fyp/logic/models/data.go

569 lines
14 KiB
Go
Raw Normal View History

package models
import (
"archive/zip"
"bytes"
"fmt"
"io"
"net/http"
"os"
"path"
"reflect"
"sort"
"strings"
model_classes "git.andr3h3nriqu3s.com/andr3/fyp/logic/models/classes"
. "git.andr3h3nriqu3s.com/andr3/fyp/logic/models/utils"
. "git.andr3h3nriqu3s.com/andr3/fyp/logic/utils"
)
func InsertIfNotPresent(ss []string, s string) []string {
i := sort.SearchStrings(ss, s)
if len(ss) > i && ss[i] == s {
return ss
}
ss = append(ss, "")
copy(ss[i+1:], ss[i:])
ss[i] = s
return ss
}
/*
This function will process a single file from the uploaded zip file
*/
func fileProcessor(
c *Context,
model *BaseModel,
reader *zip.ReadCloser,
ids map[string]string,
base_path string,
index int,
file_chan chan *zip.File,
back_channel chan int,
) {
defer func() {
if r := recover(); r != nil {
c.Logger.Error("Recovered in file processor", "processor id", index, "due to", r)
}
}()
for file := range file_chan {
c.Logger.Debug("Processing File", "file", file.Name)
data, err := reader.Open(file.Name)
if err != nil {
c.Logger.Error("Could not open file in zip %s\n", "file name", file.Name, "err", err)
back_channel <- index
continue
}
defer data.Close()
file_data, err := io.ReadAll(data)
if err != nil {
c.Logger.Error("Could not open file in zip %s\n", "file name", file.Name, "err", err)
back_channel <- index
continue
}
// TODO check if the file is a valid photo that matched the defined photo on the database
parts := strings.Split(file.Name, "/")
mode := model_classes.DATA_POINT_MODE_TRAINING
if parts[0] == "testing" {
mode = model_classes.DATA_POINT_MODE_TESTING
}
data_point_id, err := model_classes.AddDataPoint(c.Db, ids[parts[1]], "id://", mode)
if err != nil {
c.Logger.Error("Failed to add datapoint", "model", model.Id, "file name", file.Name, "err", err)
back_channel <- -index - 1
return
}
file_path := path.Join(base_path, data_point_id+"."+model.Format)
f, err := os.Create(file_path)
if err != nil {
c.Logger.Error("Failed to add datapoint", "model", model.Id, "file name", file.Name, "err", err)
back_channel <- -index - 1
return
}
defer f.Close()
f.Write(file_data)
if !testImgForModel(c, model, file_path) {
c.Logger.Errorf("Image did not have valid format for model %s (in zip: %s)!", file_path, file.Name)
c.Logger.Warn("Not failling updating data point to status -1")
message := "Image did not have valid format for the model"
if err = model_classes.UpdateDataPointStatus(c.Db, data_point_id, -1, &message); err != nil {
c.Logger.Error("Failed to update data point", "model", model.Id, "file name", file.Name, "err", err)
back_channel <- -index - 1
return
}
}
back_channel <- index
}
}
func processZipFile(c *Context, model *BaseModel) {
var err error
failed := func(msg string) {
c.Logger.Error(msg, "err", err)
ModelUpdateStatus(c, model.Id, FAILED_PREPARING_ZIP_FILE)
}
reader, err := zip.OpenReader(path.Join("savedData", model.Id, "base_data.zip"))
if err != nil {
failed("Failed to proccess zip file failed to open reader")
return
}
defer reader.Close()
training := []string{}
testing := []string{}
for _, file := range reader.Reader.File {
paths := strings.Split(file.Name, "/")
if paths[1] == "" {
continue
}
if paths[0] != "training" && paths[0] != "testing" {
failed(fmt.Sprintf("Invalid file '%s'!", file.Name))
return
}
if paths[0] != "training" {
training = InsertIfNotPresent(training, paths[1])
} else if paths[0] != "testing" {
testing = InsertIfNotPresent(testing, paths[1])
}
}
if !reflect.DeepEqual(testing, training) {
c.Logger.Info("Diff", "testing", testing, "training", training)
failed("Testing and Training datesets are diferent")
return
}
base_path := path.Join("savedData", model.Id, "data")
if err = os.MkdirAll(base_path, os.ModePerm); err != nil {
failed("Failed to create base_path dir\n")
return
}
c.Logger.Info("File Structure looks good to append data", "model", model.Id)
ids := map[string]string{}
for i, name := range training {
id, err := model_classes.CreateClass(c.Db, model.Id, i, name)
if err != nil {
failed(fmt.Sprintf("Failed to create the class '%s'", name))
return
}
ids[name] = id
}
back_channel := make(chan int, c.Handle.Config.NumberOfWorkers)
file_chans := make([]chan *zip.File, c.Handle.Config.NumberOfWorkers)
for i := 0; i < c.Handle.Config.NumberOfWorkers; i++ {
file_chans[i] = make(chan *zip.File, 2)
go fileProcessor(c, model, reader, ids, base_path, i, file_chans[i], back_channel)
}
clean_up_channels := func() {
for i := 0; i < c.Handle.Config.NumberOfWorkers; i++ {
close(file_chans[i])
}
for i := 0; i < c.Handle.Config.NumberOfWorkers - 1; i++ {
_ = <- back_channel
}
close(back_channel)
}
first_round := true
channel_to_send := 0
// Parelalize this
for _, file := range reader.Reader.File {
// Skip if dir
if file.Name[len(file.Name)-1] == '/' {
continue
}
file_chans[channel_to_send] <- file
if first_round {
channel_to_send += 1
if c.Handle.Config.NumberOfWorkers == channel_to_send {
first_round = false
}
}
// Can not do else if because need to handle the case where the value changes in
// previous if
if !first_round {
new_id, ok := <- back_channel
if !ok {
c.Logger.Fatal("Something is very wrong please check as this line should be unreachable")
}
if new_id < 0 {
c.Logger.Error("Worker failed", "worker id", -(new_id + 1))
clean_up_channels()
failed("One of the workers failed due to db error")
return
}
channel_to_send = new_id
}
}
clean_up_channels()
c.Logger.Info("Added data to model", "model", model.Id)
ModelUpdateStatus(c, model.Id, CONFIRM_PRE_TRAINING)
}
2024-03-09 09:41:16 +00:00
func processZipFileExpand(c *Context, model *BaseModel) {
2024-03-09 10:52:08 +00:00
var err error
2024-03-09 09:41:16 +00:00
failed := func(msg string) {
c.Logger.Error(msg, "err", err)
2024-04-08 14:17:13 +01:00
ModelUpdateStatus(c, model.Id, READY_ALTERATION_FAILED)
2024-03-09 09:41:16 +00:00
}
reader, err := zip.OpenReader(path.Join("savedData", model.Id, "expand_data.zip"))
if err != nil {
2024-03-09 10:52:08 +00:00
failed("Faield to proccess zip file failed to open reader\n")
2024-03-09 09:41:16 +00:00
return
}
defer reader.Close()
training := []string{}
testing := []string{}
for _, file := range reader.Reader.File {
paths := strings.Split(file.Name, "/")
if paths[1] == "" {
continue
}
if paths[0] != "training" && paths[0] != "testing" {
failed(fmt.Sprintf("Invalid file '%s' TODO add msg to response!!!", file.Name))
return
}
if paths[0] != "training" {
training = InsertIfNotPresent(training, paths[1])
} else if paths[0] != "testing" {
testing = InsertIfNotPresent(testing, paths[1])
}
}
if !reflect.DeepEqual(testing, training) {
failed("testing and training are diferent")
return
}
base_path := path.Join("savedData", model.Id, "data")
if err = os.MkdirAll(base_path, os.ModePerm); err != nil {
failed("Failed to create base_path dir")
return
}
ids := map[string]string{}
var baseOrder struct {
Order int `db:"class_order"`
}
2024-04-08 14:17:13 +01:00
err = GetDBOnce(c, &baseOrder, "model_classes where model_id=$1 order by class_order desc;", model.Id)
if err != nil {
failed("Failed to get the last class_order")
}
2024-04-08 14:17:13 +01:00
base := baseOrder.Order + 1
2024-04-08 14:17:13 +01:00
2024-03-09 09:41:16 +00:00
for i, name := range training {
id, _err := model_classes.CreateClass(c.Db, model.Id, base+i, name)
err = _err
2024-03-09 09:41:16 +00:00
if err != nil {
failed(fmt.Sprintf("Failed to create class '%s' on db\n", name))
return
}
ids[name] = id
}
back_channel := make(chan int, c.Handle.Config.NumberOfWorkers)
file_chans := make([]chan *zip.File, c.Handle.Config.NumberOfWorkers)
for i := 0; i < c.Handle.Config.NumberOfWorkers; i++ {
file_chans[i] = make(chan *zip.File, 2)
go fileProcessor(c, model, reader, ids, base_path, i, file_chans[i], back_channel)
}
clean_up_channels := func() {
for i := 0; i < c.Handle.Config.NumberOfWorkers; i++ {
close(file_chans[i])
}
for i := 0; i < c.Handle.Config.NumberOfWorkers - 1; i++ {
_ = <- back_channel
}
close(back_channel)
}
first_round := true
channel_to_send := 0
// Parelalize this
2024-03-09 09:41:16 +00:00
for _, file := range reader.Reader.File {
// Skip if dir
2024-03-09 09:41:16 +00:00
if file.Name[len(file.Name)-1] == '/' {
continue
}
file_chans[channel_to_send] <- file
2024-03-09 09:41:16 +00:00
if first_round {
channel_to_send += 1
if c.Handle.Config.NumberOfWorkers == channel_to_send {
first_round = false
}
}
// Can not do else if because need to handle the case where the value changes in
// previous if
if !first_round {
new_id, ok := <- back_channel
if !ok {
c.Logger.Fatal("Something is very wrong please check as this line should be unreachable")
}
2024-03-09 09:41:16 +00:00
if new_id < 0 {
c.Logger.Error("Worker failed", "worker id", -(new_id + 1))
clean_up_channels()
failed("One of the workers failed due to db error")
return
}
2024-03-09 09:41:16 +00:00
channel_to_send = new_id
}
2024-03-09 09:41:16 +00:00
}
clean_up_channels()
2024-03-09 09:41:16 +00:00
c.Logger.Info("Added data to model", "id", model.Id)
ModelUpdateStatus(c, model.Id, READY)
}
func handleDataUpload(handle *Handle) {
2024-03-09 10:52:08 +00:00
handle.Post("/models/data/upload", func(c *Context) *Error {
if !c.CheckAuthLevel(1) {
return nil
}
2024-03-09 10:52:08 +00:00
read_form, err := c.R.MultipartReader()
if err != nil {
2024-03-09 10:52:08 +00:00
return c.JsonBadRequest("Please provide a valid form data request!")
}
var id string
var file []byte
for {
part, err_part := read_form.NextPart()
if err_part == io.EOF {
break
} else if err_part != nil {
2024-03-09 10:52:08 +00:00
return c.JsonBadRequest("Please provide a valid form data request!")
}
if part.FormName() == "id" {
buf := new(bytes.Buffer)
buf.ReadFrom(part)
id = buf.String()
}
if part.FormName() == "file" {
buf := new(bytes.Buffer)
buf.ReadFrom(part)
file = buf.Bytes()
}
}
model, err := GetBaseModel(handle.Db, id)
if err == ModelNotFoundError {
2024-03-09 10:52:08 +00:00
return c.SendJSONStatus(http.StatusNotFound, "Model not found")
} else if err != nil {
2024-03-09 10:52:08 +00:00
return c.Error500(err)
}
// TODO mk this path configurable
dir_path := path.Join("savedData", id)
f, err := os.Create(path.Join(dir_path, "base_data.zip"))
if err != nil {
2024-03-09 10:52:08 +00:00
return c.Error500(err)
}
defer f.Close()
f.Write(file)
ModelUpdateStatus(c, id, PREPARING_ZIP_FILE)
go processZipFile(c, model)
2024-03-09 10:52:08 +00:00
return c.SendJSON(model.Id)
})
2024-03-09 09:41:16 +00:00
// ------
// ------ CLASS DATA UPLOAD
// ------
2024-03-09 10:52:08 +00:00
handle.Post("/models/data/class/upload", func(c *Context) *Error {
if !c.CheckAuthLevel(1) {
2024-03-09 09:41:16 +00:00
return nil
}
2024-03-09 10:52:08 +00:00
read_form, err := c.R.MultipartReader()
2024-03-09 09:41:16 +00:00
if err != nil {
return c.JsonBadRequest("Please provide a valid form data request!")
}
var id string
var file []byte
for {
part, err_part := read_form.NextPart()
if err_part == io.EOF {
break
} else if err_part != nil {
return c.JsonBadRequest("Please provide a valid form data request!")
}
if part.FormName() == "id" {
buf := new(bytes.Buffer)
buf.ReadFrom(part)
id = buf.String()
}
if part.FormName() == "file" {
buf := new(bytes.Buffer)
buf.ReadFrom(part)
file = buf.Bytes()
}
}
2024-03-09 10:52:08 +00:00
c.Logger.Info("Trying to expand model", "id", id)
2024-03-09 09:41:16 +00:00
model, err := GetBaseModel(handle.Db, id)
if err == ModelNotFoundError {
return c.SendJSONStatus(http.StatusNotFound, "Model not found")
} else if err != nil {
2024-03-09 10:52:08 +00:00
return c.Error500(err)
}
// TODO work in allowing the model to add new in the pre ready moment
if model.Status != READY {
return c.JsonBadRequest("Model not in the correct state to add a more classes")
2024-03-09 09:41:16 +00:00
}
// TODO mk this path configurable
dir_path := path.Join("savedData", id)
f, err := os.Create(path.Join(dir_path, "expand_data.zip"))
if err != nil {
2024-03-09 10:52:08 +00:00
return c.Error500(err)
2024-03-09 09:41:16 +00:00
}
defer f.Close()
f.Write(file)
ModelUpdateStatus(c, id, READY_ALTERATION)
go processZipFileExpand(c, model)
return c.SendJSON(model.Id)
})
2024-03-09 10:52:08 +00:00
handle.Delete("/models/data/delete-zip-file", func(c *Context) *Error {
if !c.CheckAuthLevel(1) {
return nil
}
2024-03-09 10:52:08 +00:00
type ModelData struct {
Id string `json:"id"`
}
2024-03-09 10:52:08 +00:00
var dat ModelData
2024-03-09 10:52:08 +00:00
if err := c.ToJSON(&dat); err != nil {
return err
}
2024-03-09 10:52:08 +00:00
model, err := GetBaseModel(handle.Db, dat.Id)
if err == ModelNotFoundError {
2024-03-09 10:52:08 +00:00
return c.SendJSONStatus(http.StatusNotFound, "Model not found")
} else if err != nil {
2024-03-09 10:52:08 +00:00
return c.Error500(err)
}
2024-03-09 10:52:08 +00:00
delete_path := "base_data.zip"
2024-04-08 14:17:13 +01:00
if model.Status == READY_ALTERATION_FAILED {
2024-03-09 10:52:08 +00:00
delete_path = "expand_data.zip"
} else if model.Status != FAILED_PREPARING_ZIP_FILE {
return c.JsonBadRequest("Model not in the correct status")
}
2024-03-09 10:52:08 +00:00
err = os.Remove(path.Join("savedData", model.Id, delete_path))
if err != nil {
2024-03-09 10:52:08 +00:00
return c.Error500(err)
}
2024-04-08 14:17:13 +01:00
if model.Status != READY_ALTERATION_FAILED {
2024-03-09 10:52:08 +00:00
err = os.RemoveAll(path.Join("savedData", model.Id, "data"))
if err != nil {
return c.Error500(err)
}
} else {
c.Logger.Warn("Handle failed to remove the savedData when deleteing the zip file while expanding")
}
2024-04-08 14:17:13 +01:00
if model.Status != READY_ALTERATION_FAILED {
2024-03-09 10:52:08 +00:00
_, err = handle.Db.Exec("delete from model_classes where model_id=$1;", model.Id)
if err != nil {
return c.Error500(err)
}
} else {
_, err = handle.Db.Exec("delete from model_classes where model_id=$1 and status=$2;", model.Id, MODEL_CLASS_STATUS_TO_TRAIN)
if err != nil {
return c.Error500(err)
}
}
2024-04-08 14:17:13 +01:00
if model.Status != READY_ALTERATION_FAILED {
2024-03-09 10:52:08 +00:00
ModelUpdateStatus(c, model.Id, CONFIRM_PRE_TRAINING)
} else {
ModelUpdateStatus(c, model.Id, READY)
}
return c.SendJSON(model.Id)
})
}