From 652542d261b2b72f39579ac643499361f5cbaa25 Mon Sep 17 00:00:00 2001
From: Andre Henriques
Date: Wed, 15 May 2024 05:32:49 +0100
Subject: [PATCH] Improved classification performance

---
 DockerfileServer                             |   5 +-
 config.toml                                  |   2 +-
 docker-compose.yml                           |   7 +
 entrypoint.sh                                |   4 +
 logic/models/add.go                          |   1 -
 logic/models/run.go                          |  26 ++-
 logic/models/train/train.go                  |   4 +-
 logic/stats/tasks.go                         |   2 +-
 logic/tasks/runner/runner.go                 | 211 ++++++++++++------
 logic/tasks/utils/utils.go                   |   1 +
 logic/utils/config.go                        |   2 +-
 nginx.proxy.conf                             |   2 +-
 run.sh                                       |   2 +-
 sql/models.sql                               |   4 +-
 views/py/python_model_template.py            |   9 +
 webpage/src/routes/models/edit/+page.svelte  |   5 +-
 .../src/routes/models/edit/ModelData.svelte  |  16 +-
 .../src/routes/models/edit/TrainModel.svelte |   6 +-
 18 files changed, 211 insertions(+), 98 deletions(-)
 create mode 100755 entrypoint.sh

diff --git a/DockerfileServer b/DockerfileServer
index ee8f480..35de99a 100644
--- a/DockerfileServer
+++ b/DockerfileServer
@@ -31,7 +31,10 @@
 ADD go.mod .
 ADD go.sum .
 ADD main.go .
 ADD logic logic
+ADD entrypoint.sh .
 
 RUN go install || true
-CMD ["go", "run", "."]
+RUN go build .
+
+CMD ["./entrypoint.sh"]
diff --git a/config.toml b/config.toml
index 89d4777..5cca7e8 100644
--- a/config.toml
+++ b/config.toml
@@ -12,7 +12,7 @@ USER = "service"
 
 [Worker]
 PULLING_TIME = "500ms"
-NUMBER_OF_WORKERS = 1
+NUMBER_OF_WORKERS = 16
 
 [DB]
 MAX_CONNECTIONS = 600
diff --git a/docker-compose.yml b/docker-compose.yml
index b1cb945..4b84dde 100644
--- a/docker-compose.yml
+++ b/docker-compose.yml
@@ -23,6 +23,13 @@ services:
       - db
     volumes:
       - "./config.toml:/app/config.toml"
+    deploy:
+      resources:
+        reservations:
+          devices:
+            - driver: nvidia
+              count: 1
+              capabilities: [gpu]
   proxy-server:
     image: andre-fyp-proxy
     networks:
diff --git a/entrypoint.sh b/entrypoint.sh
new file mode 100755
index 0000000..4f692e8
--- /dev/null
+++ b/entrypoint.sh
@@ -0,0 +1,4 @@
+#!/bin/bash
+while true; do
+    ./fyp
+done
diff --git a/logic/models/add.go b/logic/models/add.go
index 81d1350..3746729 100644
--- a/logic/models/add.go
+++ b/logic/models/add.go
@@ -16,7 +16,6 @@ import (
 )
 
 func loadBaseImage(c *Context, id string) {
-    // TODO handle more types than png
     infile, err := os.Open(path.Join("savedData", id, "baseimage.png"))
     if err != nil {
         c.Logger.Errorf("Failed to read image for model with id %s\n", id)
"git.andr3h3nriqu3s.com/andr3/fyp/logic/tasks/utils" @@ -37,11 +38,19 @@ func ReadJPG(scope *op.Scope, imagePath string, channels int64) *image.Image { return image.Scale(0, 255) } -func runModelNormal(model *BaseModel, def_id string, inputImage *tf.Tensor) (order int, confidence float32, err error) { +func runModelNormal(model *BaseModel, def_id string, inputImage *tf.Tensor, data *RunnerModelData) (order int, confidence float32, err error) { order = 0 err = nil - tf_model := tg.LoadModel(path.Join("savedData", model.Id, "defs", def_id, "model"), []string{"serve"}, nil) + var tf_model *tg.Model = nil + + if data.Id != nil && *data.Id == def_id { + tf_model = data.Model + } else { + tf_model = tg.LoadModel(path.Join("savedData", model.Id, "defs", def_id, "model"), []string{"serve"}, nil) + data.Model = tf_model + data.Id = &def_id + } results := tf_model.Exec([]tf.Output{ tf_model.Op("StatefulPartitionedCall", 0), @@ -125,10 +134,15 @@ func runModelExp(base BasePack, model *BaseModel, def_id string, inputImage *tf. return } -func ClassifyTask(base BasePack, task Task) (err error) { +type RunnerModelData struct { + Id *string + Model *tg.Model +} + +func ClassifyTask(base BasePack, task Task, data *RunnerModelData) (err error) { defer func() { if r := recover(); r != nil { - base.GetLogger().Error("Task failed due to", "error", r) + base.GetLogger().Error("Task failed due to", "error", r, "stack", string(debug.Stack())) task.UpdateStatusLog(base, TASK_FAILED_RUNNING, "Task failed running") } }() @@ -186,6 +200,8 @@ func ClassifyTask(base BasePack, task Task) (err error) { if model.ModelType == 2 { base.GetLogger().Info("Running model normal", "model", model.Id, "def", def_id) + data.Model = nil + data.Id = nil vi, confidence, err = runModelExp(base, model, def_id, inputImage) if err != nil { task.UpdateStatusLog(base, TASK_FAILED_RUNNING, "Failed to run model") @@ -193,7 +209,7 @@ func ClassifyTask(base BasePack, task Task) (err error) { } } else { base.GetLogger().Info("Running model normal", "model", model.Id, "def", def_id) - vi, confidence, err = runModelNormal(model, def_id, inputImage) + vi, confidence, err = runModelNormal(model, def_id, inputImage, data) if err != nil { task.UpdateStatusLog(base, TASK_FAILED_RUNNING, "Failed to run model") return diff --git a/logic/models/train/train.go b/logic/models/train/train.go index c8b7162..c25e731 100644 --- a/logic/models/train/train.go +++ b/logic/models/train/train.go @@ -1191,7 +1191,7 @@ func generateDefinition(c BasePack, model *BaseModel, target_accuracy int, numbe } order++ - loop := max(1, int((math.Log(float64(model.Width)) / math.Log(float64(10))))) + loop := max(1, int(math.Ceil((math.Log(float64(model.Width))/math.Log(float64(10)))))+1) for i := 0; i < loop; i++ { _, err = def.MakeLayer(db, order, LAYER_SIMPLE_BLOCK, "") order++ @@ -1299,7 +1299,7 @@ func generateExpandableDefinition(c BasePack, model *BaseModel, target_accuracy order++ // Create the blocks - loop := int((math.Log(float64(model.Width)) / math.Log(float64(10)))) + loop := int(math.Ceil((math.Log(float64(model.Width)) / math.Log(float64(10))))) + 1 /*if model.Width < 50 && model.Height < 50 { loop = 0 diff --git a/logic/stats/tasks.go b/logic/stats/tasks.go index c6fcb72..39b68b5 100644 --- a/logic/stats/tasks.go +++ b/logic/stats/tasks.go @@ -68,7 +68,7 @@ func handleTasksStats(handle *Handle) { } else if task.Status < 2 { total.Classfication_pre += 1 hours[hour].Classfication_pre += 1 - } else if task.Status < 4 { + } else if task.Status < 4 || task.Status == 5 
diff --git a/logic/models/train/train.go b/logic/models/train/train.go
index c8b7162..c25e731 100644
--- a/logic/models/train/train.go
+++ b/logic/models/train/train.go
@@ -1191,7 +1191,7 @@ func generateDefinition(c BasePack, model *BaseModel, target_accuracy int, numbe
     }
     order++
 
-    loop := max(1, int((math.Log(float64(model.Width)) / math.Log(float64(10)))))
+    loop := max(1, int(math.Ceil((math.Log(float64(model.Width))/math.Log(float64(10)))))+1)
     for i := 0; i < loop; i++ {
         _, err = def.MakeLayer(db, order, LAYER_SIMPLE_BLOCK, "")
         order++
@@ -1299,7 +1299,7 @@ func generateExpandableDefinition(c BasePack, model *BaseModel, target_accuracy
     order++
 
     // Create the blocks
-    loop := int((math.Log(float64(model.Width)) / math.Log(float64(10))))
+    loop := int(math.Ceil((math.Log(float64(model.Width)) / math.Log(float64(10))))) + 1
 
     /*if model.Width < 50 && model.Height < 50 {
         loop = 0
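Both definition generators now round the log10-derived block count up and add one, producing deeper networks. A quick comparison of the old and new counts, assuming the widths below are representative model inputs:

    package main

    import (
        "fmt"
        "math"
    )

    // oldBlocks is the previous count: floor of log10(width), at least 1.
    func oldBlocks(width int) int {
        return max(1, int(math.Log(float64(width))/math.Log(10)))
    }

    // newBlocks rounds up and adds one extra block.
    func newBlocks(width int) int {
        return max(1, int(math.Ceil(math.Log(float64(width))/math.Log(10)))+1)
    }

    func main() {
        for _, w := range []int{28, 100, 500} {
            fmt.Printf("width %3d: %d -> %d blocks\n", w, oldBlocks(w), newBlocks(w))
        }
        // width  28: 1 -> 3 blocks
        // width 100: 2 -> 3 blocks
        // width 500: 2 -> 4 blocks
    }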
diff --git a/logic/stats/tasks.go b/logic/stats/tasks.go
index c6fcb72..39b68b5 100644
--- a/logic/stats/tasks.go
+++ b/logic/stats/tasks.go
@@ -68,7 +68,7 @@
         } else if task.Status < 2 {
             total.Classfication_pre += 1
             hours[hour].Classfication_pre += 1
-        } else if task.Status < 4 {
+        } else if task.Status < 4 || task.Status == 5 {
             total.Classfication_running += 1
             hours[hour].Classfication_running += 1
         }
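Tasks queued on a runner (status 5, added as TASK_QUEUED later in this patch) would otherwise fall out of every bucket; the extra || task.Status == 5 folds them into the running count. A reduced sketch of the bucketing, assuming negative statuses mean failure and 4 means done, as elsewhere in the patch:

    package main

    import "fmt"

    // bucket mirrors the classification-task bucketing in handleTasksStats.
    func bucket(status int) string {
        switch {
        case status < 0:
            return "failed"
        case status < 2:
            return "pre"
        case status < 4 || status == 5:
            return "running" // picked up, running, or queued on a runner
        default:
            return "done"
        }
    }

    func main() {
        for _, s := range []int{0, 1, 2, 3, 4, 5} {
            fmt.Printf("status %d -> %s\n", s, bucket(s))
        }
    }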
diff --git a/logic/tasks/runner/runner.go b/logic/tasks/runner/runner.go
index 99d4aff..7a877d3 100644
--- a/logic/tasks/runner/runner.go
+++ b/logic/tasks/runner/runner.go
@@ -19,6 +19,8 @@ import (
     . "git.andr3h3nriqu3s.com/andr3/fyp/logic/utils"
 )
 
+var QUEUE_SIZE = 10
+
 /**
  * Actually runs the code
  */
@@ -47,17 +49,28 @@ func runner(config Config, db db.Db, task_channel chan Task, index int, back_cha
         Host: config.Hostname,
     }
 
+    loaded_model := RunnerModelData{
+        Id:    nil,
+        Model: nil,
+    }
+
+    count := 0
     for task := range task_channel {
         logger.Info("Got task", "task", task)
         task.UpdateStatusLog(base, TASK_PICKED_UP, "Runner picked up task")
 
         if task.TaskType == int(TASK_TYPE_CLASSIFICATION) {
             logger.Info("Classification Task")
-            if err = ClassifyTask(base, task); err != nil {
+            if err = ClassifyTask(base, task, &loaded_model); err != nil {
                 logger.Error("Classification task failed", "error", err)
             }
 
-            back_channel <- index
+            if count == QUEUE_SIZE {
+                back_channel <- index
+                count = 0
+            } else {
+                count += 1
+            }
             continue
         } else if task.TaskType == int(TASK_TYPE_TRAINING) {
             logger.Info("Training Task")
@@ -65,7 +78,12 @@
                 logger.Error("Failed to tain the model", "error", err)
             }
 
-            back_channel <- index
+            if count == QUEUE_SIZE {
+                back_channel <- index
+                count = 0
+            } else {
+                count += 1
+            }
             continue
         } else if task.TaskType == int(TASK_TYPE_RETRAINING) {
             logger.Info("Retraining Task")
@@ -73,7 +91,12 @@
                 logger.Error("Failed to tain the model", "error", err)
             }
 
-            back_channel <- index
+            if count == QUEUE_SIZE {
+                back_channel <- index
+                count = 0
+            } else {
+                count += 1
+            }
             continue
         } else if task.TaskType == int(TASK_TYPE_DELETE_USER) {
             logger.Warn("User deleting Task")
@@ -81,13 +104,23 @@
                 logger.Error("Failed to tain the model", "error", err)
             }
 
-            back_channel <- index
+            if count == QUEUE_SIZE {
+                back_channel <- index
+                count = 0
+            } else {
+                count += 1
+            }
             continue
         }
 
         logger.Error("Do not know how to route task", "task", task)
         task.UpdateStatusLog(base, TASK_FAILED_RUNNING, "Do not know how to route task")
-        back_channel <- index
+        if count == QUEUE_SIZE {
+            back_channel <- index
+            count = 0
+        } else {
+            count += 1
+        }
     }
 }
@@ -145,7 +178,7 @@
 /**
  * Tells the orcchestator to look at the task list from time to time
  */
-func attentionSeeker(config Config, back_channel chan int) {
+func attentionSeeker(config Config, db db.Db, back_channel chan int) {
     logger := log.NewWithOptions(os.Stdout, log.Options{
         ReportCaller:    true,
         ReportTimestamp: true,
@@ -170,6 +203,20 @@
     for true {
         back_channel <- 0
 
+        for {
+            var s struct {
+                Count int `json:"count(*)"`
+            }
+            err := GetDBOnce(db, &s, "tasks where status = 5 or status = 3")
+            if err != nil {
+                break
+            }
+            if s.Count == 0 {
+                break
+            }
+            time.Sleep(t)
+        }
+
         time.Sleep(t)
     }
 }
@@ -194,11 +241,16 @@
     }
 
     gpu_workers := config.GpuWorker.NumberOfWorkers
+    def_wait, err := time.ParseDuration(config.GpuWorker.Pulling)
+    if err != nil {
+        logger.Error("Failed to load", "error", err)
+        return
+    }
 
     logger.Info("Starting runners")
 
     task_runners := make([]chan Task, gpu_workers)
-    task_runners_used := make([]bool, gpu_workers)
+    task_runners_used := make([]int, gpu_workers)
 
     // One more to accomudate the Attention Seeker channel
     back_channel := make(chan int, gpu_workers+1)
@@ -213,12 +265,12 @@
         }
     }()
 
-    go attentionSeeker(config, back_channel)
+    // go attentionSeeker(config, db, back_channel)
 
     // Start the runners
     for i := 0; i < gpu_workers; i++ {
-        task_runners[i] = make(chan Task, 10)
-        task_runners_used[i] = false
+        task_runners[i] = make(chan Task, QUEUE_SIZE)
+        task_runners_used[i] = 0
         AddLocalRunner(handler, LocalRunner{
             RunnerNum: i + 1,
             Task:      nil,
@@ -226,82 +278,107 @@
         go runner(config, db, task_runners[i], i+1, back_channel)
     }
 
-    var task_to_dispatch *Task = nil
-
-    for i := range back_channel {
-
-        if i != 0 {
-            if i > 0 {
-                logger.Info("Runner freed", "runner", i)
-                task_runners_used[i-1] = false
-            } else if i < 0 {
-                logger.Error("Runner died! Restarting!", "runner", i)
-                i = int(math.Abs(float64(i)) - 1)
-                task_runners_used[i] = false
-                go runner(config, db, task_runners[i], i+1, back_channel)
+    used := 0
+    wait := time.Nanosecond * 100
+    for {
+        out := true
+        for out {
+            select {
+            case i := <-back_channel:
+                if i != 0 {
+                    if i > 0 {
+                        logger.Info("Runner freed", "runner", i)
+                        task_runners_used[i-1] = 0
+                        used = 0
+                    } else if i < 0 {
+                        logger.Error("Runner died! Restarting!", "runner", i)
+                        i = int(math.Abs(float64(i)) - 1)
+                        task_runners_used[i] = 0
+                        used = 0
+                        go runner(config, db, task_runners[i], i+1, back_channel)
+                    }
+                    AddLocalTask(handler, int(math.Abs(float64(i))), nil)
+                } else if used == len(task_runners_used) {
+                    continue
+                }
+            case <-time.After(wait):
+                if wait == time.Nanosecond*100 {
+                    wait = def_wait
+                }
+                out = false
             }
-            AddLocalTask(handler, int(math.Abs(float64(i))), nil)
         }
 
-        if task_to_dispatch == nil {
-            var task TaskT
-            err := GetDBOnce(db, &task, "tasks as t "+
+        for {
+            tasks, err := GetDbMultitple[TaskT](db, "tasks as t "+
                 // Get depenencies
                 "left join tasks_dependencies as td on t.id=td.main_id "+
                 // Get the task that the depencey resolves to
                 "left join tasks as t2 on t2.id=td.dependent_id "+
                 "where t.status=1 "+
-                "group by t.id having count(td.id) filter (where t2.status in (0,1,2,3)) = 0;")
+                "group by t.id having count(td.id) filter (where t2.status in (0,1,2,3)) = 0 limit 20;")
             if err != NotFoundError && err != nil {
                 log.Error("Failed to get tasks from db", "err", err)
                 continue
             }
-            if err == NotFoundError {
-                task_to_dispatch = nil
-            } else {
-                temp := Task(task)
-                task_to_dispatch = &temp
-            }
-        }
-
-        if task_to_dispatch != nil && task_to_dispatch.TaskType != int(TASK_TYPE_DELETE_USER) {
-            // TODO split tasks into cpu tasks and GPU tasks
-            mutex := handler.DataMap["runners_mutex"].(*sync.Mutex)
-            mutex.Lock()
-            remote_runners := handler.DataMap["runners"].(map[string]interface{})
-
-            for k, v := range remote_runners {
-                runner_data := v.(map[string]interface{})
-                runner_info := runner_data["runner_info"].(*Runner)
-
-                if runner_data["task"] != nil {
-                    continue
-                }
-
-                if runner_info.UserId != task_to_dispatch.UserId {
-                    continue
-                }
-
-                go handleRemoteTask(handler, base, k, *task_to_dispatch)
-                task_to_dispatch = nil
+            if err == NotFoundError || len(tasks) == 0 {
                 break
             }
 
-            mutex.Unlock()
-        }
+            for _, task_to_dispatch := range tasks {
+                ttd := Task(*task_to_dispatch)
+                if task_to_dispatch != nil && task_to_dispatch.TaskType != int(TASK_TYPE_DELETE_USER) {
+                    // TODO split tasks into cpu tasks and GPU tasks
+                    mutex := handler.DataMap["runners_mutex"].(*sync.Mutex)
+                    mutex.Lock()
+                    remote_runners := handler.DataMap["runners"].(map[string]interface{})
 
-        if task_to_dispatch != nil {
-            for i := 0; i < len(task_runners_used); i += 1 {
-                if !task_runners_used[i] {
-                    task_runners[i] <- *task_to_dispatch
-                    task_runners_used[i] = true
-                    AddLocalTask(handler, i+1, task_to_dispatch)
-                    task_to_dispatch = nil
+                    for k, v := range remote_runners {
+                        runner_data := v.(map[string]interface{})
+                        runner_info := runner_data["runner_info"].(*Runner)
+
+                        if runner_data["task"] != nil {
+                            continue
+                        }
+
+                        if runner_info.UserId != task_to_dispatch.UserId {
+                            continue
+                        }
+
+                        go handleRemoteTask(handler, base, k, ttd)
+                        task_to_dispatch = nil
+                        break
+                    }
+
+                    mutex.Unlock()
+                }
+
+                used = 0
+                if task_to_dispatch != nil {
+                    for i := 0; i < len(task_runners_used); i += 1 {
+                        if task_runners_used[i] <= QUEUE_SIZE {
+                            ttd.UpdateStatusLog(base, TASK_QUEUED, "Runner picked up task")
+                            task_runners[i] <- ttd
+                            task_runners_used[i] += 1
+                            AddLocalTask(handler, i+1, &ttd)
+                            task_to_dispatch = nil
+                            wait = time.Nanosecond * 100
+                            break
+                        } else {
+                            used += 1
+                        }
+                    }
+                }
+
+                if used == len(task_runners_used) {
                     break
                 }
             }
+
+            if used == len(task_runners_used) {
+                break
+            }
         }
     }
 }
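The orchestrator now buffers up to QUEUE_SIZE tasks per runner and each runner acknowledges on the back channel only after working through a full batch, cutting per-task channel round-trips. A self-contained sketch of that batched-acknowledgement scheme; Task and the workload are placeholders, and the in-flight check here uses < (slightly stricter than the patch's <=) so a buffered send can never block:

    package main

    import (
        "fmt"
        "time"
    )

    const queueSize = 10

    type Task struct{ Id int }

    // runner drains its queue and reports back only once per full batch.
    func runner(tasks chan Task, index int, back chan int) {
        count := 0
        for task := range tasks {
            _ = task // ... run the task ...
            if count == queueSize {
                back <- index // one ack per batch instead of per task
                count = 0
            } else {
                count++
            }
        }
    }

    func main() {
        const workers = 2
        back := make(chan int, workers)
        queues := make([]chan Task, workers)
        inFlight := make([]int, workers)

        for i := range queues {
            queues[i] = make(chan Task, queueSize)
            go runner(queues[i], i+1, back)
        }

        // Dispatch to the first runner whose queue still has room.
        for id := 0; id < 15; id++ {
            for i := range queues {
                if inFlight[i] < queueSize {
                    queues[i] <- Task{Id: id}
                    inFlight[i]++
                    break
                }
            }
        }
        fmt.Println("queued per runner:", inFlight)
        time.Sleep(10 * time.Millisecond) // let runners drain (sketch only)
    }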
diff --git a/logic/tasks/utils/utils.go b/logic/tasks/utils/utils.go
index e202717..28a0f2a 100644
--- a/logic/tasks/utils/utils.go
+++ b/logic/tasks/utils/utils.go
@@ -50,6 +50,7 @@ const (
     TASK_PREPARING = 0
     TASK_TODO      = 1
     TASK_PICKED_UP = 2
+    TASK_QUEUED    = 5
     TASK_RUNNING   = 3
     TASK_DONE      = 4
 )
diff --git a/logic/utils/config.go b/logic/utils/config.go
index 86f6b04..b67d8b7 100644
--- a/logic/utils/config.go
+++ b/logic/utils/config.go
@@ -102,7 +102,7 @@ func (c *Config) Cleanup(db db.Db) {
     failLog(err)
     _, err = db.Exec("update models set status=$1 where status=$2", FAILED_PREPARING, PREPARING)
     failLog(err)
-    _, err = db.Exec("update tasks set status=$1 where status=$2", TASK_TODO, TASK_PICKED_UP)
+    _, err = db.Exec("update tasks set status=$1 where status=$2 or status=$3", TASK_TODO, TASK_PICKED_UP, TASK_QUEUED)
     failLog(err)
 
     tasks, err := GetDbMultitple[Task](db, "tasks where status=$1", TASK_RUNNING)
diff --git a/nginx.proxy.conf b/nginx.proxy.conf
index a4b9504..197404c 100644
--- a/nginx.proxy.conf
+++ b/nginx.proxy.conf
@@ -1,6 +1,6 @@
 events {
-    worker_connections 1024;
+    worker_connections 2024;
 }
 
 http {
 
diff --git a/run.sh b/run.sh
index f216200..07f9a95 100755
--- a/run.sh
+++ b/run.sh
@@ -1,2 +1,2 @@
 #!/bin/bash
-podman run --rm --network host --gpus all --name fyp-server -it -v $(pwd):/app -e "TERM=xterm-256color" fyp-server bash
+podman run --network host --gpus all --replace --name fyp-server --ulimit=nofile=100000:100000 -it -v $(pwd):/app -e "TERM=xterm-256color" --restart=always andre-fyp-server
diff --git a/sql/models.sql b/sql/models.sql
index 62d7071..5a1aa3d 100644
--- a/sql/models.sql
+++ b/sql/models.sql
@@ -59,7 +59,6 @@ create table if not exists model_definition (
     accuracy real default 0,
     target_accuracy integer not null,
     epoch integer default 0,
-    -- TODO add max epoch
     -- 1: Pre Init
     -- 2: Init
     -- 3: Training
@@ -78,7 +77,7 @@ create table if not exists model_definition_layer (
     -- 1: input
     -- 2: dense
     -- 3: flatten
-    -- TODO add conv
+    -- 4: block
     layer_type integer not null,
     -- ei 28,28,1
     -- a 28x28 grayscale image
@@ -102,7 +101,6 @@ create table if not exists exp_model_head (
 
     accuracy real default 0,
 
-    -- TODO add max epoch
     -- 1: Pre Init
     -- 2: Init
     -- 3: Training
diff --git a/views/py/python_model_template.py b/views/py/python_model_template.py
index 1935e98..fefc892 100644
--- a/views/py/python_model_template.py
+++ b/views/py/python_model_template.py
@@ -143,6 +143,15 @@ def addBlock(
             model.add(layers.Dropout(0.4))
     return model
 
+def resblock(x, kernelsize = 3, filters = 128):
+    fx = layers.Conv2D(filters, kernelsize, activation='relu', padding='same')(x)
+    fx = layers.BatchNormalization()(fx)
+    fx = layers.Conv2D(filters, kernelsize, padding='same')(fx)
+    out = layers.Add()([x,fx])
+    out = layers.ReLU()(out)
+    out = layers.BatchNormalization()(out)
+    return out
+
 {{ if .LoadPrev }}
 model = tf.keras.saving.load_model('{{.LastModelRunPath}}')
diff --git a/webpage/src/routes/models/edit/+page.svelte b/webpage/src/routes/models/edit/+page.svelte
index d38d901..38af99b 100644
--- a/webpage/src/routes/models/edit/+page.svelte
+++ b/webpage/src/routes/models/edit/+page.svelte
@@ -42,6 +42,7 @@
     import 'src/styles/forms.css';
     import { notificationStore } from 'src/lib/NotificationsStore.svelte';
+    import Spinner from 'src/lib/Spinner.svelte';
 
     let model: Promise = $state(new Promise(() => {}));
     let _model: Model | undefined = $state(undefined);
@@ -188,7 +189,6 @@

{m.name}

-

Failed to prepare model

@@ -206,8 +206,7 @@
 {:else if m.status == 3}
-
- Processing zip file...
+ Processing zip file...
 {:else if m.status == -3 || m.status == -4}
diff --git a/webpage/src/routes/models/edit/ModelData.svelte b/webpage/src/routes/models/edit/ModelData.svelte
index 3210058..dae0601 100644
--- a/webpage/src/routes/models/edit/ModelData.svelte
+++ b/webpage/src/routes/models/edit/ModelData.svelte
@@ -70,16 +70,16 @@
-
-