runner-go #102

Merged
andr3 merged 9 commits from runner-go into main 2024-05-10 02:13:02 +01:00
3 changed files with 77 additions and 92 deletions
Showing only changes of commit 972b9b9b67 - Show all commits

View File

@ -2,8 +2,10 @@ package dbtypes
import ( import (
"encoding/json" "encoding/json"
"fmt"
"git.andr3h3nriqu3s.com/andr3/fyp/logic/db" "git.andr3h3nriqu3s.com/andr3/fyp/logic/db"
"github.com/charmbracelet/log"
) )
type LayerType int type LayerType int
@ -24,9 +26,24 @@ type Layer struct {
ExpType int `db:"mdl.exp_type" json:"exp_type"` ExpType int `db:"mdl.exp_type" json:"exp_type"`
} }
func (x *Layer) ShapeToSize() error {
v := x.GetShape()
switch x.LayerType {
case LAYER_INPUT:
x.Shape = fmt.Sprintf("%d,%d", v[1], v[2])
case LAYER_DENSE:
x.Shape = fmt.Sprintf("(%d)", v[0])
default:
x.Shape = "ERROR"
// DO nothing
}
return nil
}
func ShapeToString(args ...int) string { func ShapeToString(args ...int) string {
text, err := json.Marshal(args) text, err := json.Marshal(args)
if err != nil { if err != nil {
log.Error("json err!", "err", err)
panic("Could not generate Shape") panic("Could not generate Shape")
} }
return string(text) return string(text)
@ -35,12 +52,16 @@ func ShapeToString(args ...int) string {
func StringToShape(str string) (shape []int64) { func StringToShape(str string) (shape []int64) {
err := json.Unmarshal([]byte(str), &shape) err := json.Unmarshal([]byte(str), &shape)
if err != nil { if err != nil {
log.Error("json err!", "err", err)
panic("Could not parse Shape") panic("Could not parse Shape")
} }
return return
} }
func (l Layer) GetShape() []int64 { func (l Layer) GetShape() []int64 {
if l.Shape == "" {
return []int64{}
}
return StringToShape(l.Shape) return StringToShape(l.Shape)
} }

View File

@ -161,40 +161,23 @@ func generateCvsExp(c BasePack, run_path string, model_id string, doPanic bool)
return return
} }
func trainDefinition(c BasePack, model *BaseModel, definition_id string, load_prev bool) (accuracy float64, err error) { func trainDefinition(c BasePack, model *BaseModel, def Definition, load_prev bool) (accuracy float64, err error) {
l := c.GetLogger() l := c.GetLogger()
db := c.GetDb()
l.Warn("About to start training definition") l.Warn("About to start training definition")
accuracy = 0 accuracy = 0
layers, err := db.Query("select layer_type, shape from model_definition_layer where def_id=$1 order by layer_order asc;", definition_id)
layers, err := def.GetLayers(c.GetDb(), " order by layer_order asc;")
if err != nil { if err != nil {
return return
} }
defer layers.Close()
type layerrow struct { for _, layer := range layers {
LayerType int layer.ShapeToSize()
Shape string
LayerNum int
}
got := []layerrow{}
i := 1
for layers.Next() {
var row = layerrow{}
if err = layers.Scan(&row.LayerType, &row.Shape); err != nil {
return
}
row.Shape = shapeToSize(row.Shape)
row.LayerNum = 1
got = append(got, row)
i = i + 1
} }
// Generate run folder // Generate run folder
run_path := path.Join("/tmp", model.Id, "defs", definition_id) run_path := path.Join("/tmp", model.Id, "defs", def.Id)
err = os.MkdirAll(run_path, os.ModePerm) err = os.MkdirAll(run_path, os.ModePerm)
if err != nil { if err != nil {
@ -219,17 +202,17 @@ func trainDefinition(c BasePack, model *BaseModel, definition_id string, load_pr
} }
// Copy result around // Copy result around
result_path := path.Join("savedData", model.Id, "defs", definition_id) result_path := path.Join("savedData", model.Id, "defs", def.Id)
if err = tmpl.Execute(f, AnyMap{ if err = tmpl.Execute(f, AnyMap{
"Layers": got, "Layers": layers,
"Size": got[0].Shape, "Size": layers[0].Shape,
"DataDir": path.Join(getDir(), "savedData", model.Id, "data"), "DataDir": path.Join(getDir(), "savedData", model.Id, "data"),
"RunPath": run_path, "RunPath": run_path,
"ColorMode": model.ImageMode, "ColorMode": model.ImageMode,
"Model": model, "Model": model,
"EPOCH_PER_RUN": EPOCH_PER_RUN, "EPOCH_PER_RUN": EPOCH_PER_RUN,
"DefId": definition_id, "DefId": def.Id,
"LoadPrev": load_prev, "LoadPrev": load_prev,
"LastModelRunPath": path.Join(getDir(), result_path, "model.keras"), "LastModelRunPath": path.Join(getDir(), result_path, "model.keras"),
"SaveModelPath": path.Join(getDir(), result_path), "SaveModelPath": path.Join(getDir(), result_path),
@ -727,30 +710,16 @@ func trainModel(c BasePack, model *BaseModel) (err error) {
db := c.GetDb() db := c.GetDb()
l := c.GetLogger() l := c.GetLogger()
definitionsRows, err := db.Query("select id, target_accuracy, epoch from model_definition where status=$1 and model_id=$2", MODEL_DEFINITION_STATUS_INIT, model.Id) defs_, err := model.GetDefinitions(db, "and md.status=$2", MODEL_DEFINITION_STATUS_INIT)
if err != nil { if err != nil {
l.Error("Failed to train Model! Err:") l.Error("Failed to train Model!", "err", err)
l.Error(err)
ModelUpdateStatus(c, model.Id, int(FAILED_TRAINING)) ModelUpdateStatus(c, model.Id, int(FAILED_TRAINING))
return return
} }
defer definitionsRows.Close()
var definitions TraingModelRowDefinitions = []TrainModelRow{} var defs SortByAccuracyDefinitions = defs_
for definitionsRows.Next() { if len(defs) == 0 {
var rowv TrainModelRow
rowv.acuracy = 0
if err = definitionsRows.Scan(&rowv.id, &rowv.target_accuracy, &rowv.epoch); err != nil {
l.Error("Failed to train Model Could not read definition from db!Err:")
l.Error(err)
ModelUpdateStatus(c, model.Id, int(FAILED_TRAINING))
return
}
definitions = append(definitions, rowv)
}
if len(definitions) == 0 {
l.Error("No Definitions defined!") l.Error("No Definitions defined!")
ModelUpdateStatus(c, model.Id, int(FAILED_TRAINING)) ModelUpdateStatus(c, model.Id, int(FAILED_TRAINING))
return return
@ -761,32 +730,29 @@ func trainModel(c BasePack, model *BaseModel) (err error) {
for { for {
var toRemove ToRemoveList = []int{} var toRemove ToRemoveList = []int{}
for i, def := range definitions { for i, def := range defs {
ModelDefinitionUpdateStatus(c, def.id, MODEL_DEFINITION_STATUS_TRAINING) ModelDefinitionUpdateStatus(c, def.Id, MODEL_DEFINITION_STATUS_TRAINING)
accuracy, err := trainDefinition(c, model, def.id, !firstRound) accuracy, err := trainDefinition(c, model, *def, !firstRound)
if err != nil { if err != nil {
l.Error("Failed to train definition!Err:", "err", err) l.Error("Failed to train definition!Err:", "err", err)
ModelDefinitionUpdateStatus(c, def.id, MODEL_DEFINITION_STATUS_FAILED_TRAINING) ModelDefinitionUpdateStatus(c, def.Id, MODEL_DEFINITION_STATUS_FAILED_TRAINING)
toRemove = append(toRemove, i) toRemove = append(toRemove, i)
continue continue
} }
def.epoch += EPOCH_PER_RUN def.Epoch += EPOCH_PER_RUN
accuracy = accuracy * 100 accuracy = accuracy * 100
def.acuracy = float64(accuracy) def.Accuracy = float64(accuracy)
definitions[i].epoch += EPOCH_PER_RUN if accuracy >= float64(def.TargetAccuracy) {
definitions[i].acuracy = accuracy
if accuracy >= float64(def.target_accuracy) {
l.Info("Found a definition that reaches target_accuracy!") l.Info("Found a definition that reaches target_accuracy!")
_, err = db.Exec("update model_definition set accuracy=$1, status=$2, epoch=$3 where id=$4", accuracy, MODEL_DEFINITION_STATUS_TRANIED, def.epoch, def.id) _, err = db.Exec("update model_definition set accuracy=$1, status=$2, epoch=$3 where id=$4", accuracy, MODEL_DEFINITION_STATUS_TRANIED, def.Epoch, def.Id)
if err != nil { if err != nil {
l.Error("Failed to train definition!Err:\n", "err", err) l.Error("Failed to train definition!Err:\n", "err", err)
ModelUpdateStatus(c, model.Id, int(FAILED_TRAINING)) ModelUpdateStatus(c, model.Id, int(FAILED_TRAINING))
return err return err
} }
_, err = db.Exec("update model_definition set status=$1 where id!=$2 and model_id=$3 and status!=$4", MODEL_DEFINITION_STATUS_CANCELD_TRAINING, def.id, model.Id, MODEL_DEFINITION_STATUS_FAILED_TRAINING) _, err = db.Exec("update model_definition set status=$1 where id!=$2 and model_id=$3 and status!=$4", MODEL_DEFINITION_STATUS_CANCELD_TRAINING, def.Id, model.Id, MODEL_DEFINITION_STATUS_FAILED_TRAINING)
if err != nil { if err != nil {
l.Error("Failed to train definition!Err:\n", "err", err) l.Error("Failed to train definition!Err:\n", "err", err)
ModelUpdateStatus(c, model.Id, int(FAILED_TRAINING)) ModelUpdateStatus(c, model.Id, int(FAILED_TRAINING))
@ -797,14 +763,14 @@ func trainModel(c BasePack, model *BaseModel) (err error) {
break break
} }
if def.epoch > MAX_EPOCH { if def.Epoch > MAX_EPOCH {
fmt.Printf("Failed to train definition! Accuracy less %f < %d\n", accuracy, def.target_accuracy) fmt.Printf("Failed to train definition! Accuracy less %f < %d\n", accuracy, def.TargetAccuracy)
ModelDefinitionUpdateStatus(c, def.id, MODEL_DEFINITION_STATUS_FAILED_TRAINING) ModelDefinitionUpdateStatus(c, def.Id, MODEL_DEFINITION_STATUS_FAILED_TRAINING)
toRemove = append(toRemove, i) toRemove = append(toRemove, i)
continue continue
} }
_, err = db.Exec("update model_definition set accuracy=$1, epoch=$2, status=$3 where id=$4", accuracy, def.epoch, MODEL_DEFINITION_STATUS_PAUSED_TRAINING, def.id) _, err = db.Exec("update model_definition set accuracy=$1, epoch=$2, status=$3 where id=$4", accuracy, def.Epoch, MODEL_DEFINITION_STATUS_PAUSED_TRAINING, def.Id)
if err != nil { if err != nil {
l.Error("Failed to train definition!Err:\n", "err", err) l.Error("Failed to train definition!Err:\n", "err", err)
ModelUpdateStatus(c, model.Id, int(FAILED_TRAINING)) ModelUpdateStatus(c, model.Id, int(FAILED_TRAINING))
@ -822,28 +788,26 @@ func trainModel(c BasePack, model *BaseModel) (err error) {
l.Info("Round done", "toRemove", toRemove) l.Info("Round done", "toRemove", toRemove)
for _, n := range toRemove { for _, n := range toRemove {
definitions = remove(definitions, n) defs = remove(defs, n)
} }
len_def := len(definitions) len_def := len(defs)
if len_def == 0 { if len_def == 0 {
break break
} } else if len_def == 1 {
if len_def == 1 {
continue continue
} }
sort.Sort(sort.Reverse(definitions)) sort.Sort(sort.Reverse(defs))
acc := definitions[0].acuracy - 20.0 acc := defs[0].Accuracy - 20.0
l.Info("Training models, Highest acc", "acc", definitions[0].acuracy, "mod_acc", acc) l.Info("Training models, Highest acc", "acc", defs[0].Accuracy, "mod_acc", acc)
toRemove = []int{} toRemove = []int{}
for i, def := range definitions { for i, def := range defs {
if def.acuracy < acc { if def.Accuracy < acc {
toRemove = append(toRemove, i) toRemove = append(toRemove, i)
} }
} }
@ -853,8 +817,8 @@ func trainModel(c BasePack, model *BaseModel) (err error) {
sort.Sort(sort.Reverse(toRemove)) sort.Sort(sort.Reverse(toRemove))
for _, n := range toRemove { for _, n := range toRemove {
l.Warn("Removing definition not fast enough learning", "n", n) l.Warn("Removing definition not fast enough learning", "n", n)
ModelDefinitionUpdateStatus(c, definitions[n].id, MODEL_DEFINITION_STATUS_FAILED_TRAINING) ModelDefinitionUpdateStatus(c, defs[n].Id, MODEL_DEFINITION_STATUS_FAILED_TRAINING)
definitions = remove(definitions, n) defs = remove(defs, n)
} }
} }

View File

@ -259,21 +259,8 @@ func RunnerOrchestrator(db db.Db, config Config, handler *Handle) {
} }
} }
if task_to_dispatch != nil { if task_to_dispatch != nil && task_to_dispatch.TaskType != int(TASK_TYPE_DELETE_USER) {
// TODO split tasks into cpu tasks and GPU tasks
// Only let CPU tasks be done by the local users
if task_to_dispatch.TaskType == int(TASK_TYPE_DELETE_USER) {
for i := 0; i < len(task_runners_used); i += 1 {
if !task_runners_used[i] {
task_runners[i] <- *task_to_dispatch
task_runners_used[i] = true
task_to_dispatch = nil
break
}
}
continue
}
mutex := handler.DataMap["runners_mutex"].(*sync.Mutex) mutex := handler.DataMap["runners_mutex"].(*sync.Mutex)
mutex.Lock() mutex.Lock()
remote_runners := handler.DataMap["runners"].(map[string]interface{}) remote_runners := handler.DataMap["runners"].(map[string]interface{})
@ -286,14 +273,27 @@ func RunnerOrchestrator(db db.Db, config Config, handler *Handle) {
continue continue
} }
if runner_info.UserId == task_to_dispatch.UserId { if runner_info.UserId != task_to_dispatch.UserId {
go handleRemoteTask(handler, base, k, *task_to_dispatch) continue
}
go handleRemoteTask(handler, base, k, *task_to_dispatch)
task_to_dispatch = nil
break
}
mutex.Unlock()
}
if task_to_dispatch != nil {
for i := 0; i < len(task_runners_used); i += 1 {
if !task_runners_used[i] {
task_runners[i] <- *task_to_dispatch
task_runners_used[i] = true
task_to_dispatch = nil task_to_dispatch = nil
break break
} }
} }
mutex.Unlock()
} }
} }