package models

import (
	"archive/zip"
	"bytes"
	"fmt"
	"io"
	"net/http"
	"os"
	"path"
	"reflect"
	"sort"
	"strings"

	. "git.andr3h3nriqu3s.com/andr3/fyp/logic/db_types"
	model_classes "git.andr3h3nriqu3s.com/andr3/fyp/logic/models/classes"
	. "git.andr3h3nriqu3s.com/andr3/fyp/logic/utils"
)

// InsertIfNotPresent inserts s into the sorted slice ss, keeping the slice
// sorted and skipping duplicates.
func InsertIfNotPresent(ss []string, s string) []string {
	i := sort.SearchStrings(ss, s)
	if len(ss) > i && ss[i] == s {
		return ss
	}
	ss = append(ss, "")
	copy(ss[i+1:], ss[i:])
	ss[i] = s
	return ss
}

/*
fileProcessor processes a single file from the uploaded zip file.

Worker protocol: after handling a file the worker sends its own index on
back_channel so the dispatcher knows it is free again; on a fatal database or
disk error it sends -index-1 and stops.
*/
func fileProcessor(
	c *Context,
	model *BaseModel,
	reader *zip.ReadCloser,
	ids map[string]string,
	base_path string,
	index int,
	file_chan chan *zip.File,
	back_channel chan int,
) {
	defer func() {
		if r := recover(); r != nil {
			c.Logger.Error("Recovered in file processor", "processor id", index, "due to", r)
		}
	}()

	for file := range file_chan {
		c.Logger.Debug("Processing File", "file", file.Name)

		data, err := reader.Open(file.Name)
		if err != nil {
			c.Logger.Error("Could not open file in zip", "file name", file.Name, "err", err)
			back_channel <- index
			continue
		}

		file_data, err := io.ReadAll(data)
		// Close explicitly instead of deferring: this loop handles many files
		// per goroutine, so deferred closes would pile up until the worker exits.
		data.Close()
		if err != nil {
			c.Logger.Error("Could not read file in zip", "file name", file.Name, "err", err)
			back_channel <- index
			continue
		}

		// TODO check if the file is a valid photo that matches the defined photo on the database

		parts := strings.Split(file.Name, "/")

		mode := DATA_POINT_MODE_TRAINING
		if parts[0] == "testing" {
			mode = DATA_POINT_MODE_TESTING
		}

		data_point_id, err := model_classes.AddDataPoint(c.Db, ids[parts[1]], "id://", mode)
		if err != nil {
			c.Logger.Error("Failed to add datapoint", "model", model.Id, "file name", file.Name, "err", err)
			back_channel <- -index - 1
			return
		}

		file_path := path.Join(base_path, data_point_id+"."+model.Format)
		f, err := os.Create(file_path)
		if err != nil {
			c.Logger.Error("Failed to add datapoint", "model", model.Id, "file name", file.Name, "err", err)
			back_channel <- -index - 1
			return
		}
		f.Write(file_data)
		// Close before validating so the handle is not held for the rest of
		// the worker's lifetime.
		f.Close()

		if !TestImgForModel(c, model, file_path) {
			c.Logger.Errorf("Image did not have valid format for model %s (in zip: %s)!", file_path, file.Name)
			c.Logger.Warn("Not failing; updating data point status to -1")
			message := "Image did not have valid format for the model"
			if err = model_classes.UpdateDataPointStatus(c.Db, data_point_id, -1, &message); err != nil {
				c.Logger.Error("Failed to update data point", "model", model.Id, "file name", file.Name, "err", err)
				back_channel <- -index - 1
				return
			}
		}

		back_channel <- index
	}
}

func processZipFile(c *Context, model *BaseModel) {
	var err error

	failed := func(msg string) {
		c.Logger.Error(msg, "err", err)
		ModelUpdateStatus(c, model.Id, FAILED_PREPARING_ZIP_FILE)
	}

	reader, err := zip.OpenReader(path.Join("savedData", model.Id, "base_data.zip"))
	if err != nil {
		failed("Failed to process zip file: failed to open reader")
		return
	}
	defer reader.Close()

	training := []string{}
	testing := []string{}

	for _, file := range reader.Reader.File {
		paths := strings.Split(file.Name, "/")
		// Skip top-level entries and directory entries.
		if len(paths) < 2 || paths[1] == "" {
			continue
		}
		if paths[0] != "training" && paths[0] != "testing" {
			failed(fmt.Sprintf("Invalid file '%s'!", file.Name))
			return
		}
		if paths[0] == "training" {
			training = InsertIfNotPresent(training, paths[1])
		} else if paths[0] == "testing" {
			testing = InsertIfNotPresent(testing, paths[1])
		}
	}

	if !reflect.DeepEqual(testing, training) {
		c.Logger.Info("Diff", "testing", testing, "training", training)
		failed("Testing and Training datasets are different")
		return
	}
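	// From this point the layout is validated: create the data directory,
	// register one class row per directory name, then fan the zip entries out
	// to NumberOfWorkers fileProcessor goroutines. Each worker reports back on
	// back_channel with its index when it is free, or -index-1 when it failed.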
	base_path := path.Join("savedData", model.Id, "data")
	if err = os.MkdirAll(base_path, os.ModePerm); err != nil {
		failed("Failed to create base_path dir")
		return
	}

	c.Logger.Info("File Structure looks good to append data", "model", model.Id)

	ids := map[string]string{}

	for i, name := range training {
		id, err := model_classes.CreateClass(c.Db, model.Id, i, name)
		if err != nil {
			failed(fmt.Sprintf("Failed to create the class '%s'", name))
			return
		}
		ids[name] = id
	}

	back_channel := make(chan int, c.Handle.Config.NumberOfWorkers)

	file_chans := make([]chan *zip.File, c.Handle.Config.NumberOfWorkers)
	for i := 0; i < c.Handle.Config.NumberOfWorkers; i++ {
		file_chans[i] = make(chan *zip.File, 2)
		go fileProcessor(c, model, reader, ids, base_path, i, file_chans[i], back_channel)
	}

	clean_up_channels := func() {
		for i := 0; i < c.Handle.Config.NumberOfWorkers; i++ {
			close(file_chans[i])
		}
		for i := 0; i < c.Handle.Config.NumberOfWorkers-1; i++ {
			_ = <-back_channel
		}
		close(back_channel)
	}

	first_round := true
	channel_to_send := 0

	// Parallelize this
	for _, file := range reader.Reader.File {
		// Skip if dir
		if file.Name[len(file.Name)-1] == '/' {
			continue
		}

		file_chans[channel_to_send] <- file

		if first_round {
			channel_to_send += 1
			if c.Handle.Config.NumberOfWorkers == channel_to_send {
				first_round = false
			}
		}

		// Cannot use "else if" here: first_round may have just changed in the
		// previous if and this branch still needs to run.
		if !first_round {
			new_id, ok := <-back_channel
			if !ok {
				c.Logger.Fatal("Something is very wrong, please check: this line should be unreachable")
			}
			if new_id < 0 {
				c.Logger.Error("Worker failed", "worker id", -(new_id + 1))
				clean_up_channels()
				failed("One of the workers failed due to db error")
				return
			}
			channel_to_send = new_id
		}
	}

	clean_up_channels()

	c.Logger.Info("Added data to model", "model", model.Id)
	ModelUpdateStatus(c, model.Id, CONFIRM_PRE_TRAINING)
}

func processZipFileExpand(c *Context, model *BaseModel) {
	var err error

	failed := func(msg string) {
		c.Logger.Error(msg, "err", err)
		ModelUpdateStatus(c, model.Id, READY_ALTERATION_FAILED)
	}

	reader, err := zip.OpenReader(path.Join("savedData", model.Id, "expand_data.zip"))
	if err != nil {
		failed("Failed to process zip file: failed to open reader")
		return
	}
	defer reader.Close()

	training := []string{}
	testing := []string{}

	for _, file := range reader.Reader.File {
		paths := strings.Split(file.Name, "/")
		// Skip top-level entries and directory entries.
		if len(paths) < 2 || paths[1] == "" {
			continue
		}
		if paths[0] != "training" && paths[0] != "testing" {
			// TODO add msg to response
			failed(fmt.Sprintf("Invalid file '%s'!", file.Name))
			return
		}
		if paths[0] == "training" {
			training = InsertIfNotPresent(training, paths[1])
		} else if paths[0] == "testing" {
			testing = InsertIfNotPresent(testing, paths[1])
		}
	}

	if !reflect.DeepEqual(testing, training) {
		failed("Testing and Training datasets are different")
		return
	}

	base_path := path.Join("savedData", model.Id, "data")
	if err = os.MkdirAll(base_path, os.ModePerm); err != nil {
		failed("Failed to create base_path dir")
		return
	}

	ids := map[string]string{}

	var baseOrder struct {
		Order int `db:"class_order"`
	}
	err = GetDBOnce(c, &baseOrder, "model_classes where model_id=$1 order by class_order desc;", model.Id)
	if err != nil {
		failed("Failed to get the last class_order")
		return
	}

	base := baseOrder.Order + 1

	for i, name := range training {
		id, _err := model_classes.CreateClass(c.Db, model.Id, base+i, name)
		err = _err
		if err != nil {
			failed(fmt.Sprintf("Failed to create class '%s' on db", name))
			return
		}
		ids[name] = id
	}
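	// Same worker fan-out as processZipFile; the differences are the
	// class_order offset computed above and the final status (READY instead of
	// CONFIRM_PRE_TRAINING).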
	back_channel := make(chan int, c.Handle.Config.NumberOfWorkers)

	file_chans := make([]chan *zip.File, c.Handle.Config.NumberOfWorkers)
	for i := 0; i < c.Handle.Config.NumberOfWorkers; i++ {
		file_chans[i] = make(chan *zip.File, 2)
		go fileProcessor(c, model, reader, ids, base_path, i, file_chans[i], back_channel)
	}

	clean_up_channels := func() {
		for i := 0; i < c.Handle.Config.NumberOfWorkers; i++ {
			close(file_chans[i])
		}
		for i := 0; i < c.Handle.Config.NumberOfWorkers-1; i++ {
			_ = <-back_channel
		}
		close(back_channel)
	}

	first_round := true
	channel_to_send := 0

	// Parallelize this
	for _, file := range reader.Reader.File {
		// Skip if dir
		if file.Name[len(file.Name)-1] == '/' {
			continue
		}

		file_chans[channel_to_send] <- file

		if first_round {
			channel_to_send += 1
			if c.Handle.Config.NumberOfWorkers == channel_to_send {
				first_round = false
			}
		}

		// Cannot use "else if" here: first_round may have just changed in the
		// previous if and this branch still needs to run.
		if !first_round {
			new_id, ok := <-back_channel
			if !ok {
				c.Logger.Fatal("Something is very wrong, please check: this line should be unreachable")
			}
			if new_id < 0 {
				c.Logger.Error("Worker failed", "worker id", -(new_id + 1))
				clean_up_channels()
				failed("One of the workers failed due to db error")
				return
			}
			channel_to_send = new_id
		}
	}

	clean_up_channels()

	c.Logger.Info("Added data to model", "id", model.Id)
	ModelUpdateStatus(c, model.Id, READY)
}

func handleRemoveDataPoint(c *Context) *Error {
	var dat JustId
	if err := c.ToJSON(&dat); err != nil {
		return err
	}

	var GetModelId struct {
		Value  string `db:"m.id"`
		Format string `db:"m.format"`
	}

	err := GetDBOnce(c, &GetModelId, "model_data_point as mdp inner join model_classes as mc on mdp.class_id=mc.id inner join models as m on m.id=mc.model_id where mdp.id=$1;", dat.Id)
	if err == NotFoundError {
		return c.SendJSONStatus(404, "Data point not found")
	} else if err != nil {
		return c.E500M("Failed to find data point", err)
	}

	os.Remove(path.Join("savedData", GetModelId.Value, "data", dat.Id+"."+GetModelId.Format))

	_, err = c.Db.Exec("delete from model_data_point where id=$1;", dat.Id)
	if err != nil {
		return c.E500M("Failed to remove datapoint from database", err)
	}

	return c.SendJSON("ok")
}

func handleDataUpload(handle *Handle) {
	handle.Post("/models/data/upload", func(c *Context) *Error {
		if !c.CheckAuthLevel(1) {
			return nil
		}

		read_form, err := c.R.MultipartReader()
		if err != nil {
			return c.JsonBadRequest("Please provide a valid form data request!")
		}

		var id string
		var file []byte

		for {
			part, err_part := read_form.NextPart()
			if err_part == io.EOF {
				break
			} else if err_part != nil {
				return c.JsonBadRequest("Please provide a valid form data request!")
			}
			if part.FormName() == "id" {
				buf := new(bytes.Buffer)
				buf.ReadFrom(part)
				id = buf.String()
			}
			if part.FormName() == "file" {
				buf := new(bytes.Buffer)
				buf.ReadFrom(part)
				file = buf.Bytes()
			}
		}

		model, err := GetBaseModel(handle.Db, id)
		if err == ModelNotFoundError {
			return c.SendJSONStatus(http.StatusNotFound, "Model not found")
		} else if err != nil {
			return c.Error500(err)
		}

		// TODO make this path configurable
		dir_path := path.Join("savedData", id)

		f, err := os.Create(path.Join(dir_path, "base_data.zip"))
		if err != nil {
			return c.Error500(err)
		}
		defer f.Close()
		f.Write(file)

		ModelUpdateStatus(c, id, PREPARING_ZIP_FILE)
		go processZipFile(c, model)

		return c.SendJSON(model.Id)
	})

	// Create new class
	type CreateNewEmptyClass struct {
		Id   string `json:"id" validate:"required"`
		Name string `json:"name" validate:"required"`
	}
	PostAuthJson(handle, "/models/data/class/new", User_Normal, func(c *Context, obj *CreateNewEmptyClass) *Error {
		model, err := GetBaseModel(c.Db, obj.Id)
		if err == ModelNotFoundError {
			return c.JsonBadRequest("Model not found")
		}

		if (model.ModelType != 2 && model.Status != CONFIRM_PRE_TRAINING) ||
			(model.ModelType == 2 && model.Status != CONFIRM_PRE_TRAINING && model.Status != READY) {
			return c.JsonBadRequest("Model not in the correct status for adding a new class")
		}

		var baseOrder struct {
			Order int `db:"class_order"`
		}
		err = GetDBOnce(c, &baseOrder, "model_classes where model_id=$1 order by class_order desc;", model.Id)
		if err != nil {
			return c.E500M("Could not create class", err)
		}

		id, err := model_classes.CreateClass(c.Db, model.Id, baseOrder.Order+1, obj.Name)
		if err == model_classes.ClassAlreadyExists {
			return c.JsonBadRequest("Class already exists")
		} else if err != nil {
			return c.E500M("Could not create class", err)
		}

		var modelClass model_classes.ModelClass
		err = GetDBOnce(c, &modelClass, "model_classes where id=$1;", id)
		if err != nil {
			return c.E500M("Failed to get class information but class was created", err)
		}

		return c.SendJSON(modelClass)
	})

	type AddNewImage struct {
		ClassId string `json:"id" validate:"required"`
	}
	PostAuthFormJson(handle, "/models/data/class/image", User_Normal, func(c *Context, dat *AddNewImage, file []byte) *Error {
		model_id, err := GetDbVar[string](c, "m.id", "model_classes as mc inner join models as m on m.id = mc.model_id where mc.id=$1;", dat.ClassId)
		if err == NotFoundError {
			return c.JsonBadRequest("Could not find the class")
		} else if err != nil {
			return c.E500M("Error getting class information", err)
		}

		c.Logger.Info("model", "model", *model_id)
		model, err := GetBaseModel(c.Db, *model_id)
		if err == ModelNotFoundError {
			return c.JsonBadRequest("Could not find the model")
		} else if err != nil {
			return c.E500M("Error getting model information", err)
		}

		// TODO make this work for zip files as well
		/*c.Logger.Debug("Processing File", "file", file.Name)

		data, err := reader.Open(file.Name)
		if err != nil {
			c.Logger.Error("Could not open file in zip %s\n", "file name", file.Name, "err", err)
			back_channel <- index
			continue
		}
		defer data.Close()

		file_data, err := io.ReadAll(data)
		if err != nil {
			c.Logger.Error("Could not open file in zip %s\n", "file name", file.Name, "err", err)
			back_channel <- index
			continue
		}*/

		// TODO check if the file is a valid photo that matches the defined photo on the database

		//parts := strings.Split(file.Name, "/")

		mode := DATA_POINT_MODE_TRAINING
		// TODO add the mode
		/*mode := DATA_POINT_MODE_TRAINING
		if parts[0] == "testing" {
			mode = DATA_POINT_MODE_TESTING
		}*/

		data_point_id, err := model_classes.AddDataPoint(c.Db, dat.ClassId, "id://", mode)
		if err != nil {
			//c.Logger.Error("Failed to add datapoint", "model", model.Id, "file name", file.Name, "err", err)
			c.Logger.Error("Failed to add datapoint", "data_point_id", data_point_id)
			return c.E500M("Could not add image to model", err)
		}

		file_path := path.Join("savedData", model.Id, "data", data_point_id+"."+model.Format)
		f, err := os.Create(file_path)
		if err != nil {
			//c.Logger.Error("Failed to save datapoint to disk", "model", model.Id, "file name", file.Name, "err", err)
			c.Logger.Error("Failed to save datapoint to disk", "model", model.Id, "err", err)
			return c.E500M("Could not add image to model", err)
		}
		defer f.Close()
		f.Write(file)

		if !TestImgForModel(c, model, file_path) {
			//c.Logger.Errorf("Image did not have valid format for model %s (in zip: %s)!", file_path, file.Name)
			c.Logger.Errorf("Image did not have valid format for model %s!", file_path)
			c.Logger.Warn("Not failing; updating data point status to -1")

			message := "Image did not have valid format for the model"
			if err = model_classes.UpdateDataPointStatus(c.Db, data_point_id, -1, &message); err != nil {
				//c.Logger.Error("Failed to update data point", "model", model.Id, "file name", file.Name, "err", err)
				c.Logger.Error("Failed to update data point", "model", model.Id, "err", err)
				return c.E500M("Could not update information about the data point", err)
			}

			return c.JsonBadRequest("Provided file is not a valid image for this model")
		}

		return c.SendJSON(struct {
			Id string `json:"id"`
		}{data_point_id})
	})

	// ------
	// ------ CLASS DATA UPLOAD
	// ------
	handle.Post("/models/data/class/upload", func(c *Context) *Error {
		if !c.CheckAuthLevel(1) {
			return nil
		}

		read_form, err := c.R.MultipartReader()
		if err != nil {
			return c.JsonBadRequest("Please provide a valid form data request!")
		}

		var id string
		var file []byte

		for {
			part, err_part := read_form.NextPart()
			if err_part == io.EOF {
				break
			} else if err_part != nil {
				return c.JsonBadRequest("Please provide a valid form data request!")
			}
			if part.FormName() == "id" {
				buf := new(bytes.Buffer)
				buf.ReadFrom(part)
				id = buf.String()
			}
			if part.FormName() == "file" {
				buf := new(bytes.Buffer)
				buf.ReadFrom(part)
				file = buf.Bytes()
			}
		}

		c.Logger.Info("Trying to expand model", "id", id)

		model, err := GetBaseModel(handle.Db, id)
		if err == ModelNotFoundError {
			return c.SendJSONStatus(http.StatusNotFound, "Model not found")
		} else if err != nil {
			return c.Error500(err)
		}

		// TODO work on allowing the model to add new classes in the pre-ready stage
		if model.Status != READY {
			return c.JsonBadRequest("Model not in the correct state to add more classes")
		}

		// TODO make this path configurable
		dir_path := path.Join("savedData", id)

		f, err := os.Create(path.Join(dir_path, "expand_data.zip"))
		if err != nil {
			return c.Error500(err)
		}
		defer f.Close()
		f.Write(file)

		ModelUpdateStatus(c, id, READY_ALTERATION)
		go processZipFileExpand(c, model)

		return c.SendJSON(model.Id)
	})

	handle.DeleteAuth("/models/data/point", 1, handleRemoveDataPoint)

	handle.Delete("/models/data/delete-zip-file", func(c *Context) *Error {
		if !c.CheckAuthLevel(1) {
			return nil
		}

		var dat JustId
		if err := c.ToJSON(&dat); err != nil {
			return err
		}

		model, err := GetBaseModel(handle.Db, dat.Id)
		if err == ModelNotFoundError {
			return c.SendJSONStatus(http.StatusNotFound, "Model not found")
		} else if err != nil {
			return c.Error500(err)
		}

		delete_path := "base_data.zip"

		if model.Status == READY_ALTERATION_FAILED {
			delete_path = "expand_data.zip"
		} else if model.Status != FAILED_PREPARING_ZIP_FILE {
			return c.JsonBadRequest("Model not in the correct status")
		}

		err = os.Remove(path.Join("savedData", model.Id, delete_path))
		if err != nil {
			return c.Error500(err)
		}

		if model.Status != READY_ALTERATION_FAILED {
			err = os.RemoveAll(path.Join("savedData", model.Id, "data"))
			if err != nil {
				return c.Error500(err)
			}
		} else {
			c.Logger.Warn("Not removing the savedData folder while deleting the zip file of an expanding model")
		}

		if model.Status != READY_ALTERATION_FAILED {
			_, err = handle.Db.Exec("delete from model_classes where model_id=$1;", model.Id)
			if err != nil {
				return c.Error500(err)
			}
		} else {
			_, err = handle.Db.Exec("delete from model_classes where model_id=$1 and status=$2;", model.Id, MODEL_CLASS_STATUS_TO_TRAIN)
			if err != nil {
				return c.Error500(err)
			}
		}

		if model.Status != READY_ALTERATION_FAILED {
			ModelUpdateStatus(c, model.Id, CONFIRM_PRE_TRAINING)
		} else {
			ModelUpdateStatus(c, model.Id, READY)
		}

		return c.SendJSON(model.Id)
	})
}
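// Expected layout of the uploaded zip files (base_data.zip and
// expand_data.zip), as validated by processZipFile and processZipFileExpand:
//
//	training/<class name>/<image>.<model format>
//	testing/<class name>/<image>.<model format>
//
// The set of class names under training/ and testing/ must be identical.
//
// Hedged client-side sketch (not part of this package; modelId, zipBytes and
// baseUrl are placeholders, and a valid authenticated session is assumed).
// The form field names "id" and "file" match the handlers above:
//
//	var body bytes.Buffer
//	w := multipart.NewWriter(&body)
//	_ = w.WriteField("id", modelId)
//	fw, _ := w.CreateFormFile("file", "base_data.zip")
//	_, _ = fw.Write(zipBytes)
//	_ = w.Close()
//	req, _ := http.NewRequest("POST", baseUrl+"/models/data/upload", &body)
//	req.Header.Set("Content-Type", w.FormDataContentType())
//	resp, _ := http.DefaultClient.Do(req)
//	_ = resp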