package models

import (
	"archive/zip"
	"bytes"
	"fmt"
	"io"
	"net/http"
	"os"
	"path"
	"reflect"
	"sort"
	"strings"

	model_classes "git.andr3h3nriqu3s.com/andr3/fyp/logic/models/classes"
	. "git.andr3h3nriqu3s.com/andr3/fyp/logic/models/utils"
	. "git.andr3h3nriqu3s.com/andr3/fyp/logic/utils"
)

// InsertIfNotPresent inserts s into the sorted slice ss, keeping it sorted,
// and returns the (possibly grown) slice. If s is already present the slice
// is returned unchanged.
func InsertIfNotPresent(ss []string, s string) []string {
	i := sort.SearchStrings(ss, s)
	if len(ss) > i && ss[i] == s {
		return ss
	}
	ss = append(ss, "")
	copy(ss[i+1:], ss[i:])
	ss[i] = s
	return ss
}

// processZipEntry extracts a single file from the uploaded zip, registers it
// as a data point in the database and writes its bytes to disk under
// base_path. It returns true when the worker should abort because of an
// unrecoverable database/filesystem error, false otherwise (including
// recoverable per-file read errors, which are only logged).
//
// Extracted from fileProcessor so the defers close each file at the end of
// the iteration instead of leaking handles until the goroutine exits.
func processZipEntry(
	c *Context,
	model *BaseModel,
	reader *zip.ReadCloser,
	ids map[string]string,
	base_path string,
	file *zip.File,
) bool {
	data, err := reader.Open(file.Name)
	if err != nil {
		c.Logger.Error("Could not open file in zip %s\n", "file name", file.Name, "err", err)
		return false
	}
	defer data.Close()

	file_data, err := io.ReadAll(data)
	if err != nil {
		c.Logger.Error("Could not open file in zip %s\n", "file name", file.Name, "err", err)
		return false
	}

	// TODO check if the file is a valid photo that matched the defined photo on the database

	// Entry names look like "<training|testing>/<class>/<file>"; parts[1] is
	// the class name used to look up the class id.
	parts := strings.Split(file.Name, "/")

	mode := model_classes.DATA_POINT_MODE_TRAINING
	if parts[0] == "testing" {
		mode = model_classes.DATA_POINT_MODE_TESTING
	}

	data_point_id, err := model_classes.AddDataPoint(c.Db, ids[parts[1]], "id://", mode)
	if err != nil {
		c.Logger.Error("Failed to add datapoint", "model", model.Id, "file name", file.Name, "err", err)
		return true
	}

	file_path := path.Join(base_path, data_point_id+"."+model.Format)
	f, err := os.Create(file_path)
	if err != nil {
		c.Logger.Error("Failed to add datapoint", "model", model.Id, "file name", file.Name, "err", err)
		return true
	}
	defer f.Close()

	if _, err = f.Write(file_data); err != nil {
		c.Logger.Error("Failed to add datapoint", "model", model.Id, "file name", file.Name, "err", err)
		return true
	}

	if !TestImgForModel(c, model, file_path) {
		c.Logger.Errorf("Image did not have valid format for model %s (in zip: %s)!", file_path, file.Name)
		c.Logger.Warn("Not failling updating data point to status -1")
		message := "Image did not have valid format for the model"
		if err = model_classes.UpdateDataPointStatus(c.Db, data_point_id, -1, &message); err != nil {
			c.Logger.Error("Failed to update data point", "model", model.Id, "file name", file.Name, "err", err)
			return true
		}
	}
	return false
}

/*
This function will process a single file from the uploaded zip file
*/
// fileProcessor is a worker goroutine: it drains file_chan, stores each entry
// via processZipEntry, and reports on back_channel — its own index on
// success/recoverable error (so the dispatcher reuses this worker), or
// -index-1 and then exits on an unrecoverable error.
func fileProcessor(
	c *Context,
	model *BaseModel,
	reader *zip.ReadCloser,
	ids map[string]string,
	base_path string,
	index int,
	file_chan chan *zip.File,
	back_channel chan int,
) {
	defer func() {
		if r := recover(); r != nil {
			c.Logger.Error("Recovered in file processor", "processor id", index, "due to", r)
		}
	}()

	for file := range file_chan {
		c.Logger.Debug("Processing File", "file", file.Name)
		if processZipEntry(c, model, reader, ids, base_path, file) {
			// Unrecoverable: tell the dispatcher which worker died and stop.
			back_channel <- -index - 1
			return
		}
		back_channel <- index
	}
}

// dispatchToWorkers starts NumberOfWorkers fileProcessor goroutines and fans
// the zip entries out to them, skipping directory entries. It returns false
// (after calling failed) when any worker reports a database error, true when
// every entry was dispatched and the workers were shut down.
//
// Shared by processZipFile and processZipFileExpand, which previously
// duplicated this logic verbatim.
func dispatchToWorkers(
	c *Context,
	model *BaseModel,
	reader *zip.ReadCloser,
	ids map[string]string,
	base_path string,
	failed func(string),
) bool {
	back_channel := make(chan int, c.Handle.Config.NumberOfWorkers)
	file_chans := make([]chan *zip.File, c.Handle.Config.NumberOfWorkers)

	for i := 0; i < c.Handle.Config.NumberOfWorkers; i++ {
		file_chans[i] = make(chan *zip.File, 2)
		go fileProcessor(c, model, reader, ids, base_path, i, file_chans[i], back_channel)
	}

	clean_up_channels := func() {
		for i := 0; i < c.Handle.Config.NumberOfWorkers; i++ {
			close(file_chans[i])
		}
		// NOTE(review): this drains one fewer message than there are workers;
		// the last completion stays in the buffered channel and is dropped by
		// the close below. Confirm this accounting is intentional.
		for i := 0; i < c.Handle.Config.NumberOfWorkers-1; i++ {
			<-back_channel
		}
		close(back_channel)
	}

	// First round: hand one file to each worker in order. After that, each
	// send waits for any worker to report back and reuses that worker.
	first_round := true
	channel_to_send := 0

	for _, file := range reader.Reader.File {
		// Skip directory entries (names ending in '/').
		if file.Name[len(file.Name)-1] == '/' {
			continue
		}

		file_chans[channel_to_send] <- file

		if first_round {
			channel_to_send += 1
			if c.Handle.Config.NumberOfWorkers == channel_to_send {
				first_round = false
			}
		}

		// Can not do else if because need to handle the case where the value
		// changes in previous if
		if !first_round {
			new_id, ok := <-back_channel
			if !ok {
				c.Logger.Fatal("Something is very wrong please check as this line should be unreachable")
			}
			if new_id < 0 {
				c.Logger.Error("Worker failed", "worker id", -(new_id + 1))
				clean_up_channels()
				failed("One of the workers failed due to db error")
				return false
			}
			channel_to_send = new_id
		}
	}

	clean_up_channels()
	return true
}

// processZipFile imports the initial training/testing dataset from
// savedData/<model>/base_data.zip: it validates the zip layout, creates one
// model class per directory name, fans the files out to workers and finally
// moves the model to CONFIRM_PRE_TRAINING (or FAILED_PREPARING_ZIP_FILE on
// any error). Intended to run in its own goroutine.
func processZipFile(c *Context, model *BaseModel) {
	var err error

	failed := func(msg string) {
		c.Logger.Error(msg, "err", err)
		ModelUpdateStatus(c, model.Id, FAILED_PREPARING_ZIP_FILE)
	}

	reader, err := zip.OpenReader(path.Join("savedData", model.Id, "base_data.zip"))
	if err != nil {
		failed("Failed to proccess zip file failed to open reader")
		return
	}
	defer reader.Close()

	training := []string{}
	testing := []string{}

	for _, file := range reader.Reader.File {
		paths := strings.Split(file.Name, "/")
		// BUG FIX: guard before indexing paths[1]; a top-level entry with no
		// '/' previously panicked the goroutine.
		if len(paths) < 2 {
			failed(fmt.Sprintf("Invalid file '%s'!", file.Name))
			return
		}
		if paths[1] == "" {
			continue
		}
		if paths[0] != "training" && paths[0] != "testing" {
			failed(fmt.Sprintf("Invalid file '%s'!", file.Name))
			return
		}
		// BUG FIX: the original conditions were inverted (!=), filing
		// "testing" entries under training and vice versa.
		if paths[0] == "training" {
			training = InsertIfNotPresent(training, paths[1])
		} else {
			testing = InsertIfNotPresent(testing, paths[1])
		}
	}

	// Both splits must declare exactly the same set of classes.
	if !reflect.DeepEqual(testing, training) {
		c.Logger.Info("Diff", "testing", testing, "training", training)
		failed("Testing and Training datesets are diferent")
		return
	}

	base_path := path.Join("savedData", model.Id, "data")
	if err = os.MkdirAll(base_path, os.ModePerm); err != nil {
		failed("Failed to create base_path dir\n")
		return
	}

	c.Logger.Info("File Structure looks good to append data", "model", model.Id)

	// Map class name -> class id for the workers.
	ids := map[string]string{}

	for i, name := range training {
		id, err := model_classes.CreateClass(c.Db, model.Id, i, name)
		if err != nil {
			failed(fmt.Sprintf("Failed to create the class '%s'", name))
			return
		}
		ids[name] = id
	}

	if !dispatchToWorkers(c, model, reader, ids, base_path, failed) {
		return
	}

	c.Logger.Info("Added data to model", "model", model.Id)
	ModelUpdateStatus(c, model.Id, CONFIRM_PRE_TRAINING)
}

// processZipFileExpand imports additional classes for an already-READY model
// from savedData/<model>/expand_data.zip. New classes get class_order values
// after the current maximum. On success the model returns to READY; on any
// error it is set to READY_ALTERATION_FAILED. Intended to run in its own
// goroutine.
func processZipFileExpand(c *Context, model *BaseModel) {
	var err error

	failed := func(msg string) {
		c.Logger.Error(msg, "err", err)
		ModelUpdateStatus(c, model.Id, READY_ALTERATION_FAILED)
	}

	reader, err := zip.OpenReader(path.Join("savedData", model.Id, "expand_data.zip"))
	if err != nil {
		failed("Faield to proccess zip file failed to open reader\n")
		return
	}
	defer reader.Close()

	training := []string{}
	testing := []string{}

	for _, file := range reader.Reader.File {
		paths := strings.Split(file.Name, "/")
		// BUG FIX: guard before indexing paths[1] (see processZipFile).
		if len(paths) < 2 {
			failed(fmt.Sprintf("Invalid file '%s' TODO add msg to response!!!", file.Name))
			return
		}
		if paths[1] == "" {
			continue
		}
		if paths[0] != "training" && paths[0] != "testing" {
			failed(fmt.Sprintf("Invalid file '%s' TODO add msg to response!!!", file.Name))
			return
		}
		// BUG FIX: conditions were inverted (!=) exactly as in processZipFile.
		if paths[0] == "training" {
			training = InsertIfNotPresent(training, paths[1])
		} else {
			testing = InsertIfNotPresent(testing, paths[1])
		}
	}

	if !reflect.DeepEqual(testing, training) {
		failed("testing and training are diferent")
		return
	}

	base_path := path.Join("savedData", model.Id, "data")
	if err = os.MkdirAll(base_path, os.ModePerm); err != nil {
		failed("Failed to create base_path dir")
		return
	}

	ids := map[string]string{}

	// Find the highest existing class_order so new classes are appended after
	// the classes the model already has.
	var baseOrder struct {
		Order int `db:"class_order"`
	}
	err = GetDBOnce(c, &baseOrder, "model_classes where model_id=$1 order by class_order desc;", model.Id)
	if err != nil {
		failed("Failed to get the last class_order")
		// BUG FIX: previously fell through and kept processing (eventually
		// setting the model back to READY) after reporting failure.
		return
	}

	base := baseOrder.Order + 1

	for i, name := range training {
		id, _err := model_classes.CreateClass(c.Db, model.Id, base+i, name)
		err = _err
		if err != nil {
			failed(fmt.Sprintf("Failed to create class '%s' on db\n", name))
			return
		}
		ids[name] = id
	}

	if !dispatchToWorkers(c, model, reader, ids, base_path, failed) {
		return
	}

	c.Logger.Info("Added data to model", "id", model.Id)
	ModelUpdateStatus(c, model.Id, READY)
}

// handleRemoveDataPoint deletes a single data point: the stored image file on
// disk (best effort) and its database row.
func handleRemoveDataPoint(c *Context) *Error {
	var dat JustId
	if err := c.ToJSON(&dat); err != nil {
		return err
	}

	// Resolve the owning model's id and image format for the file path.
	var GetModelId struct {
		Value  string `db:"m.id"`
		Format string `db:"m.format"`
	}

	err := GetDBOnce(c, &GetModelId, "model_data_point as mdp inner join model_classes as mc on mdp.class_id=mc.id inner join models as m on m.id=mc.model_id where mdp.id=$1;", dat.Id)
	if err == NotFoundError {
		return c.SendJSONStatus(404, "Data point not found")
	} else if err != nil {
		return c.E500M("Failed to find data point", err)
	}

	// Best effort: a missing file should not block removing the db row.
	os.Remove(path.Join("savedData", GetModelId.Value, "data", dat.Id+"."+GetModelId.Format))

	_, err = c.Db.Exec("delete from model_data_point where id=$1;", dat.Id)
	if err != nil {
		return c.E500M("Failed to remove datapoint from database", err)
	}

	return c.SendJSON("ok")
}

// handleDataUpload registers the HTTP endpoints that manage a model's data:
// the initial dataset upload, the class-expansion upload, data point removal
// and deletion of a previously uploaded zip after a failed import.
func handleDataUpload(handle *Handle) {
	handle.Post("/models/data/upload", func(c *Context) *Error {
		if !c.CheckAuthLevel(1) {
			return nil
		}

		read_form, err := c.R.MultipartReader()
		if err != nil {
			return c.JsonBadRequest("Please provide a valid form data request!")
		}

		// Pull the "id" and "file" parts out of the multipart form.
		var id string
		var file []byte

		for {
			part, err_part := read_form.NextPart()
			if err_part == io.EOF {
				break
			} else if err_part != nil {
				return c.JsonBadRequest("Please provide a valid form data request!")
			}
			if part.FormName() == "id" {
				buf := new(bytes.Buffer)
				buf.ReadFrom(part)
				id = buf.String()
			}
			if part.FormName() == "file" {
				buf := new(bytes.Buffer)
				buf.ReadFrom(part)
				file = buf.Bytes()
			}
		}

		model, err := GetBaseModel(handle.Db, id)
		if err == ModelNotFoundError {
			return c.SendJSONStatus(http.StatusNotFound, "Model not found")
		} else if err != nil {
			return c.Error500(err)
		}

		// TODO mk this path configurable
		dir_path := path.Join("savedData", id)

		f, err := os.Create(path.Join(dir_path, "base_data.zip"))
		if err != nil {
			return c.Error500(err)
		}
		defer f.Close()
		// BUG FIX: the write error was silently dropped before.
		if _, err = f.Write(file); err != nil {
			return c.Error500(err)
		}

		ModelUpdateStatus(c, id, PREPARING_ZIP_FILE)
		// Import runs in the background; the request returns immediately.
		go processZipFile(c, model)

		return c.SendJSON(model.Id)
	})

	// ------
	// ------ CLASS DATA UPLOAD
	// ------
	handle.Post("/models/data/class/upload", func(c *Context) *Error {
		if !c.CheckAuthLevel(1) {
			return nil
		}

		read_form, err := c.R.MultipartReader()
		if err != nil {
			return c.JsonBadRequest("Please provide a valid form data request!")
		}

		var id string
		var file []byte

		for {
			part, err_part := read_form.NextPart()
			if err_part == io.EOF {
				break
			} else if err_part != nil {
				return c.JsonBadRequest("Please provide a valid form data request!")
			}
			if part.FormName() == "id" {
				buf := new(bytes.Buffer)
				buf.ReadFrom(part)
				id = buf.String()
			}
			if part.FormName() == "file" {
				buf := new(bytes.Buffer)
				buf.ReadFrom(part)
				file = buf.Bytes()
			}
		}

		c.Logger.Info("Trying to expand model", "id", id)

		model, err := GetBaseModel(handle.Db, id)
		if err == ModelNotFoundError {
			return c.SendJSONStatus(http.StatusNotFound, "Model not found")
		} else if err != nil {
			return c.Error500(err)
		}

		// TODO work in allowing the model to add new in the pre ready moment
		if model.Status != READY {
			return c.JsonBadRequest("Model not in the correct state to add a more classes")
		}

		// TODO mk this path configurable
		dir_path := path.Join("savedData", id)

		f, err := os.Create(path.Join(dir_path, "expand_data.zip"))
		if err != nil {
			return c.Error500(err)
		}
		defer f.Close()
		// BUG FIX: the write error was silently dropped before.
		if _, err = f.Write(file); err != nil {
			return c.Error500(err)
		}

		ModelUpdateStatus(c, id, READY_ALTERATION)
		go processZipFileExpand(c, model)

		return c.SendJSON(model.Id)
	})

	handle.DeleteAuth("/models/data/point", 1, handleRemoveDataPoint)

	handle.Delete("/models/data/delete-zip-file", func(c *Context) *Error {
		if !c.CheckAuthLevel(1) {
			return nil
		}

		var dat JustId
		if err := c.ToJSON(&dat); err != nil {
			return err
		}

		model, err := GetBaseModel(handle.Db, dat.Id)
		if err == ModelNotFoundError {
			return c.SendJSONStatus(http.StatusNotFound, "Model not found")
		} else if err != nil {
			return c.Error500(err)
		}

		// Which zip to delete depends on which import failed.
		delete_path := "base_data.zip"
		if model.Status == READY_ALTERATION_FAILED {
			delete_path = "expand_data.zip"
		} else if model.Status != FAILED_PREPARING_ZIP_FILE {
			return c.JsonBadRequest("Model not in the correct status")
		}

		err = os.Remove(path.Join("savedData", model.Id, delete_path))
		if err != nil {
			return c.Error500(err)
		}

		// For a failed initial import wipe the extracted data too; for a
		// failed expansion the pre-existing data must survive.
		if model.Status != READY_ALTERATION_FAILED {
			err = os.RemoveAll(path.Join("savedData", model.Id, "data"))
			if err != nil {
				return c.Error500(err)
			}
		} else {
			c.Logger.Warn("Handle failed to remove the savedData when deleteing the zip file while expanding")
		}

		if model.Status != READY_ALTERATION_FAILED {
			_, err = handle.Db.Exec("delete from model_classes where model_id=$1;", model.Id)
			if err != nil {
				return c.Error500(err)
			}
		} else {
			// Only remove the classes added by the failed expansion.
			_, err = handle.Db.Exec("delete from model_classes where model_id=$1 and status=$2;", model.Id, MODEL_CLASS_STATUS_TO_TRAIN)
			if err != nil {
				return c.Error500(err)
			}
		}

		if model.Status != READY_ALTERATION_FAILED {
			ModelUpdateStatus(c, model.Id, CONFIRM_PRE_TRAINING)
		} else {
			ModelUpdateStatus(c, model.Id, READY)
		}

		return c.SendJSON(model.Id)
	})
}