// data.go — model data upload, class management and zip-ingestion handlers.

package models
import (
"archive/zip"
"bytes"
"fmt"
"io"
"net/http"
"os"
"path"
"reflect"
"sort"
"strings"
. "git.andr3h3nriqu3s.com/andr3/fyp/logic/db_types"
model_classes "git.andr3h3nriqu3s.com/andr3/fyp/logic/models/classes"
. "git.andr3h3nriqu3s.com/andr3/fyp/logic/utils"
)
// InsertIfNotPresent returns ss with s inserted at its sorted position.
// When s is already in ss the slice is returned unchanged.  ss must be
// sorted in ascending order.
func InsertIfNotPresent(ss []string, s string) []string {
	pos := sort.SearchStrings(ss, s)
	if pos < len(ss) && ss[pos] == s {
		// Already present — nothing to insert.
		return ss
	}
	// Build the shifted tail first, then splice it in after the head.
	tail := append([]string{s}, ss[pos:]...)
	return append(ss[:pos], tail...)
}
/*
This function will process a single file from the uploaded zip file
*/
func fileProcessor(
c *Context,
model *BaseModel,
reader *zip.ReadCloser,
ids map[string]string,
base_path string,
index int,
file_chan chan *zip.File,
back_channel chan int,
) {
defer func() {
if r := recover(); r != nil {
c.Logger.Error("Recovered in file processor", "processor id", index, "due to", r)
}
}()
for file := range file_chan {
c.Logger.Debug("Processing File", "file", file.Name)
data, err := reader.Open(file.Name)
if err != nil {
c.Logger.Error("Could not open file in zip %s\n", "file name", file.Name, "err", err)
back_channel <- index
continue
}
defer data.Close()
file_data, err := io.ReadAll(data)
if err != nil {
c.Logger.Error("Could not open file in zip %s\n", "file name", file.Name, "err", err)
back_channel <- index
continue
}
// TODO check if the file is a valid photo that matched the defined photo on the database
parts := strings.Split(file.Name, "/")
2024-04-14 15:19:32 +01:00
mode := DATA_POINT_MODE_TRAINING
if parts[0] == "testing" {
2024-04-14 15:19:32 +01:00
mode = DATA_POINT_MODE_TESTING
}
data_point_id, err := model_classes.AddDataPoint(c.Db, ids[parts[1]], "id://", mode)
if err != nil {
c.Logger.Error("Failed to add datapoint", "model", model.Id, "file name", file.Name, "err", err)
back_channel <- -index - 1
return
}
file_path := path.Join(base_path, data_point_id+"."+model.Format)
f, err := os.Create(file_path)
if err != nil {
c.Logger.Error("Failed to add datapoint", "model", model.Id, "file name", file.Name, "err", err)
back_channel <- -index - 1
return
}
defer f.Close()
f.Write(file_data)
2024-04-12 20:36:23 +01:00
if !TestImgForModel(c, model, file_path) {
c.Logger.Errorf("Image did not have valid format for model %s (in zip: %s)!", file_path, file.Name)
c.Logger.Warn("Not failling updating data point to status -1")
message := "Image did not have valid format for the model"
if err = model_classes.UpdateDataPointStatus(c.Db, data_point_id, -1, &message); err != nil {
c.Logger.Error("Failed to update data point", "model", model.Id, "file name", file.Name, "err", err)
back_channel <- -index - 1
return
}
}
back_channel <- index
}
}
// processZipFile validates and imports savedData/<model>/base_data.zip.
// The zip must contain "training/<class>/..." and "testing/<class>/..."
// entries with the same set of class names on both sides.  On success
// every file becomes a data point and the model moves to
// CONFIRM_PRE_TRAINING; on any failure it is marked
// FAILED_PREPARING_ZIP_FILE.  Runs as a background goroutine.
func processZipFile(c *Context, model *BaseModel) {
	var err error

	// failed logs the current err and flags the model as failed.
	failed := func(msg string) {
		c.Logger.Error(msg, "err", err)
		ModelUpdateStatus(c, model.Id, FAILED_PREPARING_ZIP_FILE)
	}

	reader, err := zip.OpenReader(path.Join("savedData", model.Id, "base_data.zip"))
	if err != nil {
		failed("Failed to proccess zip file failed to open reader")
		return
	}
	defer reader.Close()

	training := []string{}
	testing := []string{}

	// First pass: collect the class names under training/ and testing/ and
	// reject anything outside those two top-level folders.
	for _, file := range reader.Reader.File {
		paths := strings.Split(file.Name, "/")
		if len(paths) < 2 {
			// A root-level file can not belong to a class.  (Previously
			// this case panicked on paths[1].)
			failed(fmt.Sprintf("Invalid file '%s'!", file.Name))
			return
		}
		if paths[1] == "" {
			continue
		}
		if paths[0] != "training" && paths[0] != "testing" {
			failed(fmt.Sprintf("Invalid file '%s'!", file.Name))
			return
		}
		// BUGFIX: the conditions were inverted (!=), so testing names were
		// collected into the training list and vice versa.  The equality
		// check below masked the mix-up, but the Diff log was misleading.
		if paths[0] == "training" {
			training = InsertIfNotPresent(training, paths[1])
		} else {
			testing = InsertIfNotPresent(testing, paths[1])
		}
	}

	if !reflect.DeepEqual(testing, training) {
		c.Logger.Info("Diff", "testing", testing, "training", training)
		failed("Testing and Training datesets are diferent")
		return
	}

	base_path := path.Join("savedData", model.Id, "data")
	if err = os.MkdirAll(base_path, os.ModePerm); err != nil {
		failed("Failed to create base_path dir\n")
		return
	}

	c.Logger.Info("File Structure looks good to append data", "model", model.Id)

	// Create one class row per name; ids maps class name -> class id for
	// the workers.
	ids := map[string]string{}
	for i, name := range training {
		var id string
		// Assign to the outer err (no :=) so the failed() closure logs the
		// actual cause instead of nil.
		id, err = model_classes.CreateClass(c.Db, model.Id, i, name)
		if err != nil {
			failed(fmt.Sprintf("Failed to create the class '%s'", name))
			return
		}
		ids[name] = id
	}

	back_channel := make(chan int, c.Handle.Config.NumberOfWorkers)
	file_chans := make([]chan *zip.File, c.Handle.Config.NumberOfWorkers)
	for i := 0; i < c.Handle.Config.NumberOfWorkers; i++ {
		file_chans[i] = make(chan *zip.File, 2)
		go fileProcessor(c, model, reader, ids, base_path, i, file_chans[i], back_channel)
	}

	// Every file sent yields exactly one back_channel message.  The send
	// loop below consumes one message per file once all workers are busy,
	// which leaves exactly NumberOfWorkers-1 unread messages to drain here.
	// NOTE(review): if the zip holds fewer files than workers this drain
	// expects more messages than were ever produced and would block —
	// confirm uploads are always larger than the worker pool.
	clean_up_channels := func() {
		for i := 0; i < c.Handle.Config.NumberOfWorkers; i++ {
			close(file_chans[i])
		}
		for i := 0; i < c.Handle.Config.NumberOfWorkers-1; i++ {
			_ = <-back_channel
		}
		close(back_channel)
	}

	first_round := true
	channel_to_send := 0

	for _, file := range reader.Reader.File {
		// Skip directory entries.
		if file.Name[len(file.Name)-1] == '/' {
			continue
		}

		file_chans[channel_to_send] <- file

		// Round-robin until every worker has one file in flight.
		if first_round {
			channel_to_send += 1
			if c.Handle.Config.NumberOfWorkers == channel_to_send {
				first_round = false
			}
		}

		// Can not do else if because need to handle the case where the value
		// changes in previous if
		if !first_round {
			new_id, ok := <-back_channel
			if !ok {
				c.Logger.Fatal("Something is very wrong please check as this line should be unreachable")
			}
			if new_id < 0 {
				// Negative ids encode a fatal worker error as -(index+1).
				c.Logger.Error("Worker failed", "worker id", -(new_id + 1))
				clean_up_channels()
				failed("One of the workers failed due to db error")
				return
			}
			channel_to_send = new_id
		}
	}

	clean_up_channels()

	c.Logger.Info("Added data to model", "model", model.Id)
	ModelUpdateStatus(c, model.Id, CONFIRM_PRE_TRAINING)
}
2024-03-09 09:41:16 +00:00
func processZipFileExpand(c *Context, model *BaseModel) {
2024-03-09 10:52:08 +00:00
var err error
2024-03-09 09:41:16 +00:00
failed := func(msg string) {
c.Logger.Error(msg, "err", err)
2024-04-08 14:17:13 +01:00
ModelUpdateStatus(c, model.Id, READY_ALTERATION_FAILED)
2024-03-09 09:41:16 +00:00
}
reader, err := zip.OpenReader(path.Join("savedData", model.Id, "expand_data.zip"))
if err != nil {
2024-03-09 10:52:08 +00:00
failed("Faield to proccess zip file failed to open reader\n")
2024-03-09 09:41:16 +00:00
return
}
defer reader.Close()
training := []string{}
testing := []string{}
for _, file := range reader.Reader.File {
paths := strings.Split(file.Name, "/")
if paths[1] == "" {
continue
}
if paths[0] != "training" && paths[0] != "testing" {
failed(fmt.Sprintf("Invalid file '%s' TODO add msg to response!!!", file.Name))
return
}
if paths[0] != "training" {
training = InsertIfNotPresent(training, paths[1])
} else if paths[0] != "testing" {
testing = InsertIfNotPresent(testing, paths[1])
}
}
if !reflect.DeepEqual(testing, training) {
failed("testing and training are diferent")
return
}
base_path := path.Join("savedData", model.Id, "data")
if err = os.MkdirAll(base_path, os.ModePerm); err != nil {
failed("Failed to create base_path dir")
return
}
ids := map[string]string{}
var baseOrder struct {
Order int `db:"class_order"`
}
2024-04-08 14:17:13 +01:00
err = GetDBOnce(c, &baseOrder, "model_classes where model_id=$1 order by class_order desc;", model.Id)
if err != nil {
failed("Failed to get the last class_order")
}
2024-04-08 14:17:13 +01:00
base := baseOrder.Order + 1
2024-04-08 14:17:13 +01:00
2024-03-09 09:41:16 +00:00
for i, name := range training {
id, _err := model_classes.CreateClass(c.Db, model.Id, base+i, name)
err = _err
2024-03-09 09:41:16 +00:00
if err != nil {
failed(fmt.Sprintf("Failed to create class '%s' on db\n", name))
return
}
ids[name] = id
}
back_channel := make(chan int, c.Handle.Config.NumberOfWorkers)
file_chans := make([]chan *zip.File, c.Handle.Config.NumberOfWorkers)
for i := 0; i < c.Handle.Config.NumberOfWorkers; i++ {
file_chans[i] = make(chan *zip.File, 2)
go fileProcessor(c, model, reader, ids, base_path, i, file_chans[i], back_channel)
}
clean_up_channels := func() {
for i := 0; i < c.Handle.Config.NumberOfWorkers; i++ {
close(file_chans[i])
}
for i := 0; i < c.Handle.Config.NumberOfWorkers-1; i++ {
_ = <-back_channel
}
close(back_channel)
}
first_round := true
channel_to_send := 0
// Parelalize this
2024-03-09 09:41:16 +00:00
for _, file := range reader.Reader.File {
// Skip if dir
2024-03-09 09:41:16 +00:00
if file.Name[len(file.Name)-1] == '/' {
continue
}
file_chans[channel_to_send] <- file
2024-03-09 09:41:16 +00:00
if first_round {
channel_to_send += 1
if c.Handle.Config.NumberOfWorkers == channel_to_send {
first_round = false
}
}
2024-03-09 09:41:16 +00:00
// Can not do else if because need to handle the case where the value changes in
// previous if
if !first_round {
new_id, ok := <-back_channel
if !ok {
c.Logger.Fatal("Something is very wrong please check as this line should be unreachable")
}
2024-03-09 09:41:16 +00:00
if new_id < 0 {
c.Logger.Error("Worker failed", "worker id", -(new_id + 1))
clean_up_channels()
failed("One of the workers failed due to db error")
return
}
2024-03-09 09:41:16 +00:00
channel_to_send = new_id
}
2024-03-09 09:41:16 +00:00
}
clean_up_channels()
2024-03-09 09:41:16 +00:00
c.Logger.Info("Added data to model", "id", model.Id)
ModelUpdateStatus(c, model.Id, READY)
}
// handleRemoveDataPoint deletes a single data point: its image file on
// disk (best effort) and its row in model_data_point.  Expects a JSON
// body {"id": "<data point id>"}.
func handleRemoveDataPoint(c *Context) *Error {
	var dat JustId
	if err := c.ToJSON(&dat); err != nil {
		return err
	}

	// Look up the owning model's id and image format so the on-disk path
	// can be reconstructed.
	var GetModelId struct {
		Value  string `db:"m.id"`
		Format string `db:"m.format"`
	}

	err := GetDBOnce(c, &GetModelId, "model_data_point as mdp inner join model_classes as mc on mdp.class_id=mc.id inner join models as m on m.id=mc.model_id where mdp.id=$1;", dat.Id)
	if err == NotFoundError {
		return c.SendJSONStatus(404, "Data point not found")
	} else if err != nil {
		return c.E500M("Failed to find data point", err)
	}

	// Removing the file stays best-effort (a missing file is fine), but
	// other failures used to be silently dropped — log them instead.
	if err := os.Remove(path.Join("savedData", GetModelId.Value, "data", dat.Id+"."+GetModelId.Format)); err != nil && !os.IsNotExist(err) {
		c.Logger.Warn("Failed to remove data point file from disk", "id", dat.Id, "err", err)
	}

	_, err = c.Db.Exec("delete from model_data_point where id=$1;", dat.Id)
	if err != nil {
		return c.E500M("Failed to remove datapoint from database", err)
	}
	return c.SendJSON("ok")
}
// handleDataUpload registers the HTTP endpoints that manage a model's
// training data: initial zip upload, empty-class creation, single-image
// upload, class-expansion zip upload, data point removal and zip cleanup.
func handleDataUpload(handle *Handle) {
	// Initial dataset upload: multipart form with fields "id" (model id)
	// and "file" (zip laid out as training/<class>/... + testing/<class>/...).
	handle.Post("/models/data/upload", func(c *Context) *Error {
		if !c.CheckAuthLevel(1) {
			return nil
		}

		read_form, err := c.R.MultipartReader()
		if err != nil {
			return c.JsonBadRequest("Please provide a valid form data request!")
		}

		var id string
		var file []byte

		// Walk the multipart body part by part; both fields are buffered
		// fully in memory.
		for {
			part, err_part := read_form.NextPart()
			if err_part == io.EOF {
				break
			} else if err_part != nil {
				return c.JsonBadRequest("Please provide a valid form data request!")
			}
			if part.FormName() == "id" {
				buf := new(bytes.Buffer)
				buf.ReadFrom(part)
				id = buf.String()
			}
			if part.FormName() == "file" {
				buf := new(bytes.Buffer)
				buf.ReadFrom(part)
				file = buf.Bytes()
			}
		}

		model, err := GetBaseModel(handle.Db, id)
		if err == NotFoundError {
			return c.SendJSONStatus(http.StatusNotFound, "Model not found")
		} else if err != nil {
			return c.Error500(err)
		} else if model.CanTrain == 0 {
			return c.JsonBadRequest("Model can not be trained!")
		}

		// TODO mk this path configurable
		dir_path := path.Join("savedData", id)

		f, err := os.Create(path.Join(dir_path, "base_data.zip"))
		if err != nil {
			return c.Error500(err)
		}
		defer f.Close()
		f.Write(file)

		// Processing happens in the background; the client polls the model
		// status (PREPARING_ZIP_FILE -> CONFIRM_PRE_TRAINING or failure).
		ModelUpdateStatus(c, id, PREPARING_ZIP_FILE)
		go processZipFile(c, model)

		return c.SendJSON(model.Id)
	})

	// Create New class
	type CreateNewEmptyClass struct {
		Id   string `json:"id" validate:"required"`
		Name string `json:"name" validate:"required"`
	}
	// Creates an empty class on a model so images can be added individually.
	PostAuthJson(handle, "/models/data/class/new", User_Normal, func(c *Context, obj *CreateNewEmptyClass) *Error {
		model, err := GetBaseModel(c.Db, obj.Id)
		if err == NotFoundError {
			return c.JsonBadRequest("Model not found")
		} else if err != nil {
			return c.E500M("Failed to get model information", err)
		} else if model.CanTrain == 0 {
			return c.JsonBadRequest("Model can not be trained!")
		}

		// && binds tighter than ||: non-type-2 models must be in
		// CONFIRM_PRE_TRAINING; type-2 models may also be READY.
		if model.ModelType != 2 && model.Status != CONFIRM_PRE_TRAINING || model.ModelType == 2 && model.Status != CONFIRM_PRE_TRAINING && model.Status != READY {
			return c.JsonBadRequest("Model not in the correct status for adding new class")
		}

		// The new class takes the next class_order after the current highest.
		var baseOrder struct {
			Order int `db:"class_order"`
		}
		err = GetDBOnce(c, &baseOrder, "model_classes where model_id=$1 order by class_order desc;", model.Id)
		if err != nil {
			return c.E500M("Could not create class", err)
		}

		id, err := model_classes.CreateClass(c.Db, model.Id, baseOrder.Order+1, obj.Name)
		if err == model_classes.ClassAlreadyExists {
			return c.JsonBadRequest("Class Already exists")
		} else if err != nil {
			return c.E500M("Could not create class", err)
		}

		// Return the freshly created class row to the caller.
		var modelClass model_classes.ModelClassJSON
		err = GetDBOnce(c, &modelClass, "model_classes where id=$1;", id)
		if err != nil {
			return c.E500M("Failed to get class information but class was creted", err)
		}

		return c.SendJSON(modelClass)
	})

	type AddNewImage struct {
		ClassId string `json:"id" validate:"required"`
	}
	// Adds a single uploaded image to an existing class.
	PostAuthFormJson(handle, "/models/data/class/image", User_Normal, func(c *Context, dat *AddNewImage, file []byte) *Error {
		// Resolve the class to its owning model.
		model_id, err := GetDbVar[string](c, "m.id", "model_classes as mc inner join models as m on m.id = mc.model_id where mc.id=$1;", dat.ClassId)
		if err == NotFoundError {
			return c.JsonBadRequest("Could not find the class")
		} else if err != nil {
			return c.E500M("Error getting class information", err)
		}
		c.Logger.Info("model", "model", *model_id)

		model, err := GetBaseModel(c.Db, *model_id)
		if err == NotFoundError {
			return c.JsonBadRequest("Could not find the model")
		} else if err != nil {
			return c.E500M("Error getting model information", err)
		} else if model.CanTrain == 0 {
			return c.JsonBadRequest("Model can not be trained!")
		}

		// TODO make this work for zip files as well
		/*c.Logger.Debug("Processing File", "file", file.Name)
		data, err := reader.Open(file.Name)
		if err != nil {
			c.Logger.Error("Could not open file in zip %s\n", "file name", file.Name, "err", err)
			back_channel <- index
			continue
		}
		defer data.Close()
		file_data, err := io.ReadAll(data)
		if err != nil {
			c.Logger.Error("Could not open file in zip %s\n", "file name", file.Name, "err", err)
			back_channel <- index
			continue
		}*/

		// TODO check if the file is a valid photo that matched the defined photo on the database
		//parts := strings.Split(file.Name, "/")

		// Single uploads are always stored as training data for now.
		mode := DATA_POINT_MODE_TRAINING

		// TODO add the mode
		/*mode := DATA_POINT_MODE_TRAINING
		if parts[0] == "testing" {
			mode = DATA_POINT_MODE_TESTING
		}*/

		data_point_id, err := model_classes.AddDataPoint(c.Db, dat.ClassId, "id://", mode)
		if err != nil {
			//c.Logger.Error("Failed to add datapoint", "model", model.Id, "file name", file.Name, "err", err)
			c.Logger.Error("Failed to add datapoint", "data_point_id", data_point_id)
			return c.E500M("Could not add image to model", err)
		}

		// Image is stored as savedData/<model>/data/<data point id>.<format>.
		file_path := path.Join("savedData", model.Id, "data", data_point_id+"."+model.Format)
		f, err := os.Create(file_path)
		if err != nil {
			//c.Logger.Error("Failed to save datapoint to disk", "model", model.Id, "file name", file.Name, "err", err)
			c.Logger.Error("Failed to save datapoint to disk", "model", model.Id, "err", err)
			return c.E500M("Could not add image to model", err)
		}
		defer f.Close()
		f.Write(file)

		// Invalid images keep their row but are flagged with status -1.
		if !TestImgForModel(c, model, file_path) {
			//c.Logger.Errorf("Image did not have valid format for model %s (in zip: %s)!", file_path, file.Name)
			c.Logger.Errorf("Image did not have valid format for model %s!", file_path)
			c.Logger.Warn("Not failling updating data point to status -1")
			message := "Image did not have valid format for the model"
			if err = model_classes.UpdateDataPointStatus(c.Db, data_point_id, -1, &message); err != nil {
				//c.Logger.Error("Failed to update data point", "model", model.Id, "file name", file.Name, "err", err)
				c.Logger.Error("Failed to update data point", "model", model.Id, "err", err)
				return c.E500M("Could not update information about the data point", err)
			}
			return c.JsonBadRequest("Provided file is not a valid image for this model")
		}

		return c.SendJSON(struct {
			Id string `json:"id"`
		}{data_point_id})
	})

	// ------
	// ------ CLASS DATA UPLOAD
	// ------
	// Expansion upload: same multipart shape as /models/data/upload but the
	// zip contains only the NEW classes, and the model must be READY.
	handle.Post("/models/data/class/upload", func(c *Context) *Error {
		if !c.CheckAuthLevel(1) {
			return nil
		}

		read_form, err := c.R.MultipartReader()
		if err != nil {
			return c.JsonBadRequest("Please provide a valid form data request!")
		}

		var id string
		var file []byte

		// Same in-memory multipart parsing as the base upload endpoint.
		for {
			part, err_part := read_form.NextPart()
			if err_part == io.EOF {
				break
			} else if err_part != nil {
				return c.JsonBadRequest("Please provide a valid form data request!")
			}
			if part.FormName() == "id" {
				buf := new(bytes.Buffer)
				buf.ReadFrom(part)
				id = buf.String()
			}
			if part.FormName() == "file" {
				buf := new(bytes.Buffer)
				buf.ReadFrom(part)
				file = buf.Bytes()
			}
		}

		c.Logger.Info("Trying to expand model", "id", id)

		model, err := GetBaseModel(handle.Db, id)
		if err == NotFoundError {
			return c.SendJSONStatus(http.StatusNotFound, "Model not found")
		} else if err != nil {
			return c.Error500(err)
		} else if model.CanTrain == 0 {
			return c.JsonBadRequest("Model can not be trained!")
		}

		// TODO work in allowing the model to add new in the pre ready moment
		if model.Status != READY {
			return c.JsonBadRequest("Model not in the correct state to add a more classes")
		}

		// TODO mk this path configurable
		dir_path := path.Join("savedData", id)

		f, err := os.Create(path.Join(dir_path, "expand_data.zip"))
		if err != nil {
			return c.Error500(err)
		}
		defer f.Close()
		f.Write(file)

		// Background expansion; status goes READY_ALTERATION -> READY (or
		// READY_ALTERATION_FAILED).
		ModelUpdateStatus(c, id, READY_ALTERATION)
		go processZipFileExpand(c, model)

		return c.SendJSON(model.Id)
	})

	handle.DeleteAuth("/models/data/point", 1, handleRemoveDataPoint)

	// Removes an uploaded zip after a failed run so the user can retry.
	// Which zip (base or expand) depends on which failure status is set.
	handle.Delete("/models/data/delete-zip-file", func(c *Context) *Error {
		if !c.CheckAuthLevel(1) {
			return nil
		}
		var dat JustId
		if err := c.ToJSON(&dat); err != nil {
			return err
		}

		model, err := GetBaseModel(handle.Db, dat.Id)
		if err == NotFoundError {
			return c.SendJSONStatus(http.StatusNotFound, "Model not found")
		} else if err != nil {
			return c.Error500(err)
		}

		// Default to the base upload; only the two failure states are valid.
		delete_path := "base_data.zip"
		if model.Status == READY_ALTERATION_FAILED {
			delete_path = "expand_data.zip"
		} else if model.Status != FAILED_PREPARING_ZIP_FILE {
			return c.JsonBadRequest("Model not in the correct status")
		}

		err = os.Remove(path.Join("savedData", model.Id, delete_path))
		if err != nil {
			return c.Error500(err)
		}

		// For a failed base upload the whole data dir is wiped; for a failed
		// expansion the existing (already-trained) data must be kept.
		if model.Status != READY_ALTERATION_FAILED {
			err = os.RemoveAll(path.Join("savedData", model.Id, "data"))
			if err != nil {
				return c.Error500(err)
			}
		} else {
			c.Logger.Warn("Handle failed to remove the savedData when deleteing the zip file while expanding")
		}

		// Same split for class rows: drop everything on a failed base
		// upload, but only the not-yet-trained classes on a failed expansion.
		if model.Status != READY_ALTERATION_FAILED {
			_, err = handle.Db.Exec("delete from model_classes where model_id=$1;", model.Id)
			if err != nil {
				return c.Error500(err)
			}
		} else {
			_, err = handle.Db.Exec("delete from model_classes where model_id=$1 and status=$2;", model.Id, CLASS_STATUS_TO_TRAIN)
			if err != nil {
				return c.Error500(err)
			}
		}

		// Roll the model back to the state it was in before the failed run.
		if model.Status != READY_ALTERATION_FAILED {
			ModelUpdateStatus(c, model.Id, CONFIRM_PRE_TRAINING)
		} else {
			ModelUpdateStatus(c, model.Id, READY)
		}

		return c.SendJSON(model.Id)
	})
}