runner-go (#102)

Reviewed-on: andr3/fyp#102
Co-authored-by: Andre Henriques <andr3h3nriqu3s@gmail.com>
Co-committed-by: Andre Henriques <andr3h3nriqu3s@gmail.com>
This commit was merged in pull request #102.
This commit is contained in:
2024-05-10 02:13:02 +01:00
committed by andr3
parent edd1e4c123
commit 0ac6ac8dce
44 changed files with 6609 additions and 511 deletions

View File

@@ -8,4 +8,6 @@ func HandleTasks(handle *Handle) {
handleUpload(handle)
handleList(handle)
handleRequests(handle)
handleRemoteRunner(handle)
handleRunnerData(handle)
}

960
logic/tasks/runner.go Normal file
View File

@@ -0,0 +1,960 @@
package tasks
import (
"os"
"path"
"sync"
"time"
. "git.andr3h3nriqu3s.com/andr3/fyp/logic/db_types"
. "git.andr3h3nriqu3s.com/andr3/fyp/logic/models/train"
. "git.andr3h3nriqu3s.com/andr3/fyp/logic/tasks/utils"
. "git.andr3h3nriqu3s.com/andr3/fyp/logic/utils"
"github.com/charmbracelet/log"
)
func verifyRunner(c *Context, dat *JustId) (runner *Runner, e *Error) {
runner, err := GetRunner(c, dat.Id)
if err == NotFoundError {
e = c.JsonBadRequest("Could not find runner, please register runner first")
return
} else if err != nil {
e = c.E500M("Failed to get information about the runner", err)
return
}
if runner.Token != *c.Token {
return nil, c.SendJSONStatus(401, "Only runners can use this funcion")
}
return
}
type VerifyTask struct {
Id string `json:"id" validate:"required"`
TaskId string `json:"taskId" validate:"required"`
}
type RunnerTrainDef struct {
Id string `json:"id" validate:"required"`
TaskId string `json:"taskId" validate:"required"`
DefId string `json:"defId" validate:"required"`
}
func verifyTask(x *Handle, c *Context, dat *VerifyTask) (task *Task, error *Error) {
mutex := x.DataMap["runners_mutex"].(*sync.Mutex)
mutex.Lock()
defer mutex.Unlock()
var runners map[string]interface{} = x.DataMap["runners"].(map[string]interface{})
if runners[dat.Id] == nil {
return nil, c.JsonBadRequest("Runner not active")
}
var runner_data map[string]interface{} = runners[dat.Id].(map[string]interface{})
if runner_data["task"] == nil {
return nil, c.SendJSONStatus(404, "No active task")
}
return runner_data["task"].(*Task), nil
}
func clearRunnerTask(x *Handle, runner_id string) {
mutex := x.DataMap["runners_mutex"].(*sync.Mutex)
mutex.Lock()
defer mutex.Unlock()
var runners map[string]interface{} = x.DataMap["runners"].(map[string]interface{})
var runner_data map[string]interface{} = runners[runner_id].(map[string]interface{})
runner_data["task"] = nil
runners[runner_id] = runner_data
x.DataMap["runners"] = runners
}
func handleRemoteRunner(x *Handle) {
type RegisterRunner struct {
Token string `json:"token" validate:"required"`
Type RunnerType `json:"type" validate:"required"`
}
PostAuthJson(x, "/tasks/runner/register", User_Normal, func(c *Context, dat *RegisterRunner) *Error {
if *c.Token != dat.Token {
// TODO do admin
return c.E500M("Please make sure that the token is the same that is being registered", nil)
}
c.Logger.Info("test", "dat", dat)
var runner Runner
err := GetDBOnce(c, &runner, "remote_runner as ru where token=$1", dat.Token)
if err != NotFoundError && err != nil {
return c.E500M("Failed to get information remote runners", err)
}
if err != NotFoundError {
return c.JsonBadRequest("Token is already registered by a runner")
}
// TODO get id from token passed by when doing admin
var userId = c.User.Id
var new_runner = struct {
Type RunnerType
UserId string `db:"user_id"`
Token string
}{
Type: dat.Type,
Token: dat.Token,
UserId: userId,
}
id, err := InsertReturnId(c, &new_runner, "remote_runner", "id")
if err != nil {
return c.E500M("Failed to create remote runner", err)
}
return c.SendJSON(struct {
Id string `json:"id"`
}{
Id: id,
})
})
// TODO remove runner
PostAuthJson(x, "/tasks/runner/init", User_Normal, func(c *Context, dat *JustId) *Error {
runner, error := verifyRunner(c, dat)
if error != nil {
return error
}
mutex := x.DataMap["runners_mutex"].(*sync.Mutex)
mutex.Lock()
defer mutex.Unlock()
var runners map[string]interface{} = x.DataMap["runners"].(map[string]interface{})
if runners[dat.Id] != nil {
c.Logger.Info("Logger trying to register but already registerd")
c.ShowMessage = false
return c.SendJSON("Ok")
}
var new_runner = map[string]interface{}{}
new_runner["last_time_check"] = time.Now()
new_runner["runner_info"] = runner
runners[dat.Id] = new_runner
x.DataMap["runners"] = runners
return c.SendJSON("Ok")
})
PostAuthJson(x, "/tasks/runner/active", User_Normal, func(c *Context, dat *JustId) *Error {
_, error := verifyRunner(c, dat)
if error != nil {
return error
}
mutex := x.DataMap["runners_mutex"].(*sync.Mutex)
mutex.Lock()
defer mutex.Unlock()
var runners map[string]interface{} = x.DataMap["runners"].(map[string]interface{})
if runners[dat.Id] == nil {
return c.JsonBadRequest("Runner not active")
}
var runner_data map[string]interface{} = runners[dat.Id].(map[string]interface{})
if runner_data["task"] == nil {
c.ShowMessage = false
return c.SendJSONStatus(404, "No active task")
}
c.ShowMessage = false
// This should be a task obj
return c.SendJSON(runner_data["task"])
})
PostAuthJson(x, "/tasks/runner/ready", User_Normal, func(c *Context, dat *VerifyTask) *Error {
_, error := verifyRunner(c, &JustId{Id: dat.Id})
if error != nil {
return error
}
task, error := verifyTask(x, c, dat)
if error != nil {
return error
}
err := task.UpdateStatus(c, TASK_RUNNING, "Task Running on Runner")
if err != nil {
return c.E500M("Failed to set task status", err)
}
return c.SendJSON("Ok")
})
type TaskFail struct {
Id string `json:"id" validate:"required"`
TaskId string `json:"taskId" validate:"required"`
Reason string `json:"reason" validate:"required"`
}
PostAuthJson(x, "/tasks/runner/fail", User_Normal, func(c *Context, dat *TaskFail) *Error {
_, error := verifyRunner(c, &JustId{Id: dat.Id})
if error != nil {
return error
}
task, error := verifyTask(x, c, &VerifyTask{Id: dat.Id, TaskId: dat.TaskId})
if error != nil {
return error
}
err := task.UpdateStatus(c, TASK_FAILED_RUNNING, dat.Reason)
if err != nil {
return c.E500M("Failed to set task status", err)
}
// Do extra clean up on tasks
switch task.TaskType {
case int(TASK_TYPE_TRAINING):
CleanUpFailed(c, task)
case int(TASK_TYPE_RETRAINING):
CleanUpFailedRetrain(c, task)
case int(TASK_TYPE_CLASSIFICATION):
// DO nothing
default:
panic("Do not know how to handle this")
}
mutex := x.DataMap["runners_mutex"].(*sync.Mutex)
mutex.Lock()
defer mutex.Unlock()
var runners map[string]interface{} = x.DataMap["runners"].(map[string]interface{})
var runner_data map[string]interface{} = runners[dat.Id].(map[string]interface{})
runner_data["task"] = nil
runners[dat.Id] = runner_data
x.DataMap["runners"] = runners
return c.SendJSON("Ok")
})
PostAuthJson(x, "/tasks/runner/defs", User_Normal, func(c *Context, dat *VerifyTask) *Error {
_, error := verifyRunner(c, &JustId{Id: dat.Id})
if error != nil {
return error
}
task, error := verifyTask(x, c, dat)
if error != nil {
return error
}
var status DefinitionStatus
switch task.TaskType {
case int(TASK_TYPE_TRAINING):
status = DEFINITION_STATUS_INIT
case int(TASK_TYPE_RETRAINING):
fallthrough
case int(TASK_TYPE_CLASSIFICATION):
status = DEFINITION_STATUS_READY
default:
c.Logger.Error("Task not is not the right type to get the definitions", "task type", task.TaskType)
return c.JsonBadRequest("Task is not the right type go get the definitions")
}
model, err := GetBaseModel(c, *task.ModelId)
if err != nil {
return c.E500M("Failed to get model information", err)
}
defs, err := model.GetDefinitions(c, "and md.status=$2", status)
if err != nil {
return c.E500M("Failed to get the model definitions", err)
}
return c.SendJSON(defs)
})
PostAuthJson(x, "/tasks/runner/classes", User_Normal, func(c *Context, dat *VerifyTask) *Error {
_, error := verifyRunner(c, &JustId{Id: dat.Id})
if error != nil {
return error
}
task, error := verifyTask(x, c, dat)
if error != nil {
return error
}
model, err := GetBaseModel(c, *task.ModelId)
if err != nil {
return c.E500M("Failed to get model information", err)
}
switch task.TaskType {
case int(TASK_TYPE_TRAINING):
classes, err := model.GetClasses(c, "and status in ($2, $3) order by mc.class_order asc", CLASS_STATUS_TO_TRAIN, CLASS_STATUS_TRAINING)
if err != nil {
return c.E500M("Failed to get the model classes", err)
}
return c.SendJSON(classes)
case int(TASK_TYPE_RETRAINING):
classes, err := model.GetClasses(c, "and status=$2 order by mc.class_order asc", CLASS_STATUS_TRAINING)
if err != nil {
return c.E500M("Failed to get the model classes", err)
}
return c.SendJSON(classes)
case int(TASK_TYPE_CLASSIFICATION):
classes, err := model.GetClasses(c, "and status=$2 order by mc.class_order asc", CLASS_STATUS_TRAINED)
if err != nil {
return c.E500M("Failed to get the model classes", err)
}
return c.SendJSON(classes)
default:
c.Logger.Error("Task not is not the right type to get the definitions", "task type", task.TaskType)
return c.JsonBadRequest("Task is not the right type go get the definitions")
}
})
type RunnerTrainDefStatus struct {
Id string `json:"id" validate:"required"`
TaskId string `json:"taskId" validate:"required"`
DefId string `json:"defId" validate:"required"`
Status DefinitionStatus `json:"status" validate:"required"`
}
PostAuthJson(x, "/tasks/runner/train/def/status", User_Normal, func(c *Context, dat *RunnerTrainDefStatus) *Error {
_, error := verifyRunner(c, &JustId{Id: dat.Id})
if error != nil {
return error
}
task, error := verifyTask(x, c, &VerifyTask{Id: dat.Id, TaskId: dat.TaskId})
if error != nil {
return error
}
if task.TaskType != int(TASK_TYPE_TRAINING) {
c.Logger.Error("Task not is not the right type to get the definitions", "task type", task.TaskType)
return c.JsonBadRequest("Task is not the right type go get the definitions")
}
def, err := GetDefinition(c, dat.DefId)
if err != nil {
return c.E500M("Failed to get definition information", err)
}
err = def.UpdateStatus(c, dat.Status)
if err != nil {
return c.E500M("Failed to update model status", err)
}
return c.SendJSON("Ok")
})
type RunnerTrainDefHeadStatus struct {
Id string `json:"id" validate:"required"`
TaskId string `json:"taskId" validate:"required"`
DefId string `json:"defId" validate:"required"`
Status ModelHeadStatus `json:"status" validate:"required"`
}
PostAuthJson(x, "/tasks/runner/train/def/head/status", User_Normal, func(c *Context, dat *RunnerTrainDefHeadStatus) *Error {
_, error := verifyRunner(c, &JustId{Id: dat.Id})
if error != nil {
return error
}
task, error := verifyTask(x, c, &VerifyTask{Id: dat.Id, TaskId: dat.TaskId})
if error != nil {
return error
}
if task.TaskType != int(TASK_TYPE_TRAINING) {
c.Logger.Error("Task not is not the right type to get the definitions", "task type", task.TaskType)
return c.JsonBadRequest("Task is not the right type go get the definitions")
}
def, err := GetDefinition(c, dat.DefId)
if err != nil {
return c.E500M("Failed to get definition information", err)
}
_, err = c.Exec("update exp_model_head set status=$1 where def_id=$2;", dat.Status, def.Id)
if err != nil {
log.Error("Failed to train definition!")
return c.E500M("Failed to train definition", err)
}
return c.SendJSON("Ok")
})
type RunnerRetrainDefHeadStatus struct {
Id string `json:"id" validate:"required"`
TaskId string `json:"taskId" validate:"required"`
HeadId string `json:"defId" validate:"required"`
Status ModelHeadStatus `json:"status" validate:"required"`
}
PostAuthJson(x, "/tasks/runner/retrain/def/head/status", User_Normal, func(c *Context, dat *RunnerRetrainDefHeadStatus) *Error {
_, error := verifyRunner(c, &JustId{Id: dat.Id})
if error != nil {
return error
}
task, error := verifyTask(x, c, &VerifyTask{Id: dat.Id, TaskId: dat.TaskId})
if error != nil {
return error
}
if task.TaskType != int(TASK_TYPE_RETRAINING) {
c.Logger.Error("Task not is not the right type to get the definitions", "task type", task.TaskType)
return c.JsonBadRequest("Task is not the right type go get the definitions")
}
if err := UpdateStatus(c.GetDb(), "exp_model_head", dat.HeadId, MODEL_DEFINITION_STATUS_TRAINING); err != nil {
return c.E500M("Failed to update head status", err)
}
return c.SendJSON("Ok")
})
PostAuthJson(x, "/tasks/runner/train/def/layers", User_Normal, func(c *Context, dat *RunnerTrainDef) *Error {
_, error := verifyRunner(c, &JustId{Id: dat.Id})
if error != nil {
return error
}
task, error := verifyTask(x, c, &VerifyTask{Id: dat.Id, TaskId: dat.TaskId})
if error != nil {
return error
}
switch task.TaskType {
case int(TASK_TYPE_TRAINING):
// Do nothing
case int(TASK_TYPE_RETRAINING):
// Do nothing
default:
c.Logger.Error("Task not is not the right type to get the layers", "task type", task.TaskType)
return c.JsonBadRequest("Task is not the right type go get the layers")
}
def, err := GetDefinition(c, dat.DefId)
if err != nil {
return c.E500M("Failed to get definition information", err)
}
layers, err := def.GetLayers(c, " order by layer_order asc")
if err != nil {
return c.E500M("Failed to get layers", err)
}
return c.SendJSON(layers)
})
PostAuthJson(x, "/tasks/runner/train/datapoints", User_Normal, func(c *Context, dat *VerifyTask) *Error {
_, error := verifyRunner(c, &JustId{Id: dat.Id})
if error != nil {
return error
}
task, error := verifyTask(x, c, dat)
if error != nil {
return error
}
switch task.TaskType {
case int(TASK_TYPE_TRAINING):
// DO nothing
case int(TASK_TYPE_RETRAINING):
// DO nothing
default:
c.Logger.Error("Task not is not the right type to get the definitions", "task type", task.TaskType)
return c.JsonBadRequest("Task is not the right type go get the definitions")
}
model, err := GetBaseModel(c, *task.ModelId)
if err != nil {
return c.E500M("Failed to get model information", err)
}
training_points, err := model.DataPoints(c, DATA_POINT_MODE_TRAINING)
if err != nil {
return c.E500M("Failed to get the model classes", err)
}
testing_points, err := model.DataPoints(c, DATA_POINT_MODE_TRAINING)
if err != nil {
return c.E500M("Failed to get the model classes", err)
}
return c.SendJSON(struct {
Testing []DataPoint `json:"testing"`
Training []DataPoint `json:"training"`
}{
Testing: testing_points,
Training: training_points,
})
})
type RunnerTrainDefEpoch struct {
Id string `json:"id" validate:"required"`
TaskId string `json:"taskId" validate:"required"`
DefId string `json:"defId" validate:"required"`
Epoch int `json:"epoch" validate:"required"`
Accuracy float64 `json:"accuracy" validate:"required"`
}
PostAuthJson(x, "/tasks/runner/train/epoch", User_Normal, func(c *Context, dat *RunnerTrainDefEpoch) *Error {
_, error := verifyRunner(c, &JustId{Id: dat.Id})
if error != nil {
return error
}
task, error := verifyTask(x, c, &VerifyTask{
Id: dat.Id,
TaskId: dat.TaskId,
})
if error != nil {
return error
}
if task.TaskType != int(TASK_TYPE_TRAINING) {
c.Logger.Error("Task not is not the right type to get the definitions", "task type", task.TaskType)
return c.JsonBadRequest("Task is not the right type go get the definitions")
}
def, err := GetDefinition(c, dat.DefId)
if err != nil {
return c.E500M("Failed to get definition information", err)
}
err = def.UpdateAfterEpoch(c, dat.Accuracy, dat.Epoch)
if err != nil {
return c.E500M("Failed to update model", err)
}
return c.SendJSON("Ok")
})
PostAuthJson(x, "/tasks/runner/train/mark-failed", User_Normal, func(c *Context, dat *VerifyTask) *Error {
_, error := verifyRunner(c, &JustId{Id: dat.Id})
if error != nil {
return error
}
task, error := verifyTask(x, c, &VerifyTask{
Id: dat.Id,
TaskId: dat.TaskId,
})
if error != nil {
return error
}
if task.TaskType != int(TASK_TYPE_TRAINING) {
c.Logger.Error("Task not is not the right type to get the definitions", "task type", task.TaskType)
return c.JsonBadRequest("Task is not the right type go get the definitions")
}
_, err := c.Exec(
"update model_definition set status=$1 "+
"where model_id=$2 and status in ($3, $4)",
MODEL_DEFINITION_STATUS_CANCELD_TRAINING,
task.ModelId,
MODEL_DEFINITION_STATUS_TRAINING,
MODEL_DEFINITION_STATUS_PAUSED_TRAINING,
)
if err != nil {
return c.E500M("Failed to mark definition as failed", err)
}
return c.SendJSON("Ok")
})
PostAuthJson(x, "/tasks/runner/model", User_Normal, func(c *Context, dat *VerifyTask) *Error {
_, error := verifyRunner(c, &JustId{Id: dat.Id})
if error != nil {
return error
}
task, error := verifyTask(x, c, dat)
if error != nil {
return error
}
switch task.TaskType {
case int(TASK_TYPE_TRAINING):
//DO NOTHING
case int(TASK_TYPE_RETRAINING):
//DO NOTHING
case int(TASK_TYPE_CLASSIFICATION):
//DO NOTHING
default:
c.Logger.Error("Task not is not the right type to get the definitions", "task type", task.TaskType)
return c.JsonBadRequest("Task is not the right type go get the definitions")
}
model, err := GetBaseModel(c, *task.ModelId)
if err != nil {
return c.E500M("Failed to get model information", err)
}
return c.SendJSON(model)
})
PostAuthJson(x, "/tasks/runner/heads", User_Normal, func(c *Context, dat *RunnerTrainDef) *Error {
_, error := verifyRunner(c, &JustId{Id: dat.Id})
if error != nil {
return error
}
task, error := verifyTask(x, c, &VerifyTask{Id: dat.Id, TaskId: dat.TaskId})
if error != nil {
return error
}
type ExpHead struct {
Id string `json:"id"`
Start int `db:"range_start" json:"start"`
End int `db:"range_end" json:"end"`
}
switch task.TaskType {
case int(TASK_TYPE_TRAINING):
fallthrough
case int(TASK_TYPE_RETRAINING):
// status = 2 (INIT) 3 (TRAINING)
heads, err := GetDbMultitple[ExpHead](c, "exp_model_head where def_id=$1 and status in (2,3)", dat.DefId)
if err != nil {
return c.E500M("Failed getting active heads", err)
}
return c.SendJSON(heads)
case int(TASK_TYPE_CLASSIFICATION):
heads, err := GetDbMultitple[ExpHead](c, "exp_model_head where def_id=$1", dat.DefId)
if err != nil {
return c.E500M("Failed getting active heads", err)
}
return c.SendJSON(heads)
default:
c.Logger.Error("Task not is not the right type to get the definitions", "task type", task.TaskType)
return c.JsonBadRequest("Task is not the right type go get the definitions")
}
})
PostAuthJson(x, "/tasks/runner/train_exp/class/status/train", User_Normal, func(c *Context, dat *VerifyTask) *Error {
_, error := verifyRunner(c, &JustId{Id: dat.Id})
if error != nil {
return error
}
task, error := verifyTask(x, c, dat)
if error != nil {
return error
}
if task.TaskType != int(TASK_TYPE_TRAINING) {
c.Logger.Error("Task not is not the right type to get the definitions", "task type", task.TaskType)
return c.JsonBadRequest("Task is not the right type go get the definitions")
}
model, err := GetBaseModel(c, *task.ModelId)
if err != nil {
return c.E500M("Failed to get model", err)
}
err = SetModelClassStatus(c, CLASS_STATUS_TRAINING, "model_id=$1 and status=$2;", model.Id, CLASS_STATUS_TO_TRAIN)
if err != nil {
return c.E500M("Failed update status", err)
}
return c.SendJSON("Ok")
})
PostAuthJson(x, "/tasks/runner/train/done", User_Normal, func(c *Context, dat *VerifyTask) *Error {
_, error := verifyRunner(c, &JustId{Id: dat.Id})
if error != nil {
return error
}
task, error := verifyTask(x, c, dat)
if error != nil {
return error
}
if task.TaskType != int(TASK_TYPE_TRAINING) {
c.Logger.Error("Task not is not the right type to get the definitions", "task type", task.TaskType)
return c.JsonBadRequest("Task is not the right type go get the definitions")
}
model, err := GetBaseModel(c, *task.ModelId)
if err != nil {
c.Logger.Error("Failed to get model", "err", err)
return c.E500M("Failed to get mode", err)
}
var def Definition
err = GetDBOnce(c, &def, "model_definition as md where model_id=$1 and status=$2 order by accuracy desc limit 1;", task.ModelId, DEFINITION_STATUS_TRANIED)
if err == NotFoundError {
// TODO Make the Model status have a message
c.Logger.Error("All definitions failed to train!")
model.UpdateStatus(c, FAILED_TRAINING)
task.UpdateStatusLog(c, TASK_FAILED_RUNNING, "All definition failed to train!")
return c.SendJSON("Ok")
} else if err != nil {
model.UpdateStatus(c, FAILED_TRAINING)
task.UpdateStatusLog(c, TASK_FAILED_RUNNING, "Failed to get model definition")
return c.E500M("Failed to get model definition", err)
}
if err = def.UpdateStatus(c, DEFINITION_STATUS_READY); err != nil {
model.UpdateStatus(c, FAILED_TRAINING)
task.UpdateStatusLog(c, TASK_FAILED_RUNNING, "Failed to update model definition")
return c.E500M("Failed to update model definition", err)
}
to_delete, err := c.Query("select id from model_definition where status != $1 and model_id=$2", MODEL_DEFINITION_STATUS_READY, model.Id)
if err != nil {
model.UpdateStatus(c, FAILED_TRAINING)
task.UpdateStatusLog(c, TASK_FAILED_RUNNING, "Failed to delete unsed definitions")
return c.E500M("Failed to delete unsed definitions", err)
}
defer to_delete.Close()
for to_delete.Next() {
var id string
if err = to_delete.Scan(&id); err != nil {
model.UpdateStatus(c, FAILED_TRAINING)
task.UpdateStatusLog(c, TASK_FAILED_RUNNING, "Failed to delete unsed definitions")
return c.E500M("Failed to delete unsed definitions", err)
}
os.RemoveAll(path.Join("savedData", model.Id, "defs", id))
}
// TODO Check if returning also works here
if _, err = c.Exec("delete from model_definition where status!=$1 and model_id=$2;", MODEL_DEFINITION_STATUS_READY, model.Id); err != nil {
model.UpdateStatus(c, FAILED_TRAINING)
task.UpdateStatusLog(c, TASK_FAILED_RUNNING, "Failed to delete unsed definitions")
return c.E500M("Failed to delete unsed definitions", err)
}
// Set the class status to trained
err = SetModelClassStatus(c, CLASS_STATUS_TRAINED, "model_id=$1;", model.Id)
if err != nil {
c.Logger.Error("Failed to set class status")
return c.E500M("Failed to set class status", err)
}
if err = model.UpdateStatus(c, READY); err != nil {
model.UpdateStatus(c, FAILED_TRAINING)
task.UpdateStatusLog(c, TASK_FAILED_RUNNING, "Failed to delete unsed definitions")
return c.E500M("Failed to update status of model", err)
}
task.UpdateStatusLog(c, TASK_DONE, "Model finished training")
clearRunnerTask(x, dat.Id)
return c.SendJSON("Ok")
})
type RunnerClassDone struct {
Id string `json:"id" validate:"required"`
TaskId string `json:"taskId" validate:"required"`
Result string `json:"result" validate:"required"`
}
PostAuthJson(x, "/tasks/runner/class/done", User_Normal, func(c *Context, dat *RunnerClassDone) *Error {
_, error := verifyRunner(c, &JustId{Id: dat.Id})
if error != nil {
return error
}
task, error := verifyTask(x, c, &VerifyTask{
Id: dat.Id,
TaskId: dat.TaskId,
})
if error != nil {
return error
}
if task.TaskType != int(TASK_TYPE_CLASSIFICATION) {
c.Logger.Error("Task not is not the right type to get the definitions", "task type", task.TaskType)
return c.JsonBadRequest("Task is not the right type go get the definitions")
}
err := task.SetResultText(c, dat.Result)
if err != nil {
return c.E500M("Failed to update the task", err)
}
err = task.UpdateStatus(c, TASK_DONE, "Task completed")
if err != nil {
return c.E500M("Failed to update task", err)
}
mutex := x.DataMap["runners_mutex"].(*sync.Mutex)
mutex.Lock()
defer mutex.Unlock()
var runners map[string]interface{} = x.DataMap["runners"].(map[string]interface{})
var runner_data map[string]interface{} = runners[dat.Id].(map[string]interface{})
runner_data["task"] = nil
runners[dat.Id] = runner_data
x.DataMap["runners"] = runners
return c.SendJSON("Ok")
})
PostAuthJson(x, "/tasks/runner/train_exp/done", User_Normal, func(c *Context, dat *VerifyTask) *Error {
_, error := verifyRunner(c, &JustId{Id: dat.Id})
if error != nil {
return error
}
task, error := verifyTask(x, c, dat)
if error != nil {
return error
}
if task.TaskType != int(TASK_TYPE_TRAINING) {
c.Logger.Error("Task not is not the right type to get the definitions", "task type", task.TaskType)
return c.JsonBadRequest("Task is not the right type go get the definitions")
}
model, err := GetBaseModel(c, *task.ModelId)
if err != nil {
c.Logger.Error("Failed to get model", "err", err)
return c.E500M("Failed to get mode", err)
}
// TODO add check the to the model
var def Definition
err = GetDBOnce(c, &def, "model_definition as md where model_id=$1 and status=$2 order by accuracy desc limit 1;", task.ModelId, DEFINITION_STATUS_TRANIED)
if err == NotFoundError {
// TODO Make the Model status have a message
c.Logger.Error("All definitions failed to train!")
model.UpdateStatus(c, FAILED_TRAINING)
task.UpdateStatusLog(c, TASK_FAILED_RUNNING, "All definition failed to train!")
clearRunnerTask(x, dat.Id)
return c.SendJSON("Ok")
} else if err != nil {
model.UpdateStatus(c, FAILED_TRAINING)
task.UpdateStatusLog(c, TASK_FAILED_RUNNING, "Failed to get model definition")
return c.E500M("Failed to get model definition", err)
}
if err = def.UpdateStatus(c, DEFINITION_STATUS_READY); err != nil {
model.UpdateStatus(c, FAILED_TRAINING)
task.UpdateStatusLog(c, TASK_FAILED_RUNNING, "Failed to update model definition")
return c.E500M("Failed to update model definition", err)
}
to_delete, err := GetDbMultitple[JustId](c, "model_definition where status!=$1 and model_id=$2", MODEL_DEFINITION_STATUS_READY, model.Id)
if err != nil {
c.GetLogger().Error("Failed to select model_definition to delete")
return c.E500M("Failed to select model definition to delete", err)
}
for _, d := range to_delete {
os.RemoveAll(path.Join("savedData", model.Id, "defs", d.Id))
}
// TODO Check if returning also works here
if _, err = c.Exec("delete from model_definition where status!=$1 and model_id=$2;", MODEL_DEFINITION_STATUS_READY, model.Id); err != nil {
model.UpdateStatus(c, FAILED_TRAINING)
task.UpdateStatusLog(c, TASK_FAILED_RUNNING, "Failed to delete unsed definitions")
return c.E500M("Failed to delete unsed definitions", err)
}
if err = SplitModel(c, model); err != nil {
err = SetModelClassStatus(c, CLASS_STATUS_TO_TRAIN, "model_id=$1 and status=$2;", model.Id, CLASS_STATUS_TRAINING)
if err != nil {
c.Logger.Error("Failed to split the model! And Failed to set class status")
return c.E500M("Failed to split the model", err)
}
c.Logger.Error("Failed to split the model")
return c.E500M("Failed to split the model", err)
}
// Set the class status to trained
err = SetModelClassStatus(c, CLASS_STATUS_TRAINED, "model_id=$1 and status=$2;", model.Id, CLASS_STATUS_TRAINING)
if err != nil {
c.Logger.Error("Failed to set class status")
return c.E500M("Failed to set class status", err)
}
c.Logger.Warn("Removing base model for", "model", model.Id, "def", def.Id)
os.RemoveAll(path.Join("savedData", model.Id, "defs", def.Id, "model"))
os.RemoveAll(path.Join("savedData", model.Id, "defs", def.Id, "model.keras"))
if err = model.UpdateStatus(c, READY); err != nil {
model.UpdateStatus(c, FAILED_TRAINING)
task.UpdateStatusLog(c, TASK_FAILED_RUNNING, "Failed to delete unsed definitions")
return c.E500M("Failed to update status of model", err)
}
task.UpdateStatusLog(c, TASK_DONE, "Model finished training")
mutex := x.DataMap["runners_mutex"].(*sync.Mutex)
mutex.Lock()
defer mutex.Unlock()
var runners map[string]interface{} = x.DataMap["runners"].(map[string]interface{})
var runner_data map[string]interface{} = runners[dat.Id].(map[string]interface{})
runner_data["task"] = nil
runners[dat.Id] = runner_data
x.DataMap["runners"] = runners
return c.SendJSON("Ok")
})
PostAuthJson(x, "/tasks/runner/retrain/done", User_Normal, func(c *Context, dat *VerifyTask) *Error {
_, error := verifyRunner(c, &JustId{Id: dat.Id})
if error != nil {
return error
}
task, error := verifyTask(x, c, dat)
if error != nil {
return error
}
if task.TaskType != int(TASK_TYPE_RETRAINING) {
c.Logger.Error("Task not is not the right type to get the definitions", "task type", task.TaskType)
return c.JsonBadRequest("Task is not the right type go get the definitions")
}
model, err := GetBaseModel(c, *task.ModelId)
if err != nil {
c.Logger.Error("Failed to get model", "err", err)
return c.E500M("Failed to get mode", err)
}
err = SetModelClassStatus(c, CLASS_STATUS_TRAINED, "model_id=$1 and status=$2;", model.Id, CLASS_STATUS_TRAINING)
if err != nil {
return c.E500M("Failed to set class status", err)
}
defs, err := model.GetDefinitions(c, "")
if err != nil {
return c.E500M("Failed to get definitions", err)
}
_, err = c.Exec("update exp_model_head set status=$1 where status=$2 and def_id=$3", MODEL_HEAD_STATUS_READY, MODEL_HEAD_STATUS_TRAINING, defs[0].Id)
if err != nil {
return c.E500M("Failed to set head status", err)
}
err = model.UpdateStatus(c, READY)
if err != nil {
return c.E500M("Failed to set class status", err)
}
task.UpdateStatusLog(c, TASK_DONE, "Model finished training")
clearRunnerTask(x, dat.Id)
return c.SendJSON("Ok")
})
}

View File

@@ -5,6 +5,7 @@ import (
"math"
"os"
"runtime/debug"
"sync"
"time"
"github.com/charmbracelet/log"
@@ -90,6 +91,57 @@ func runner(config Config, db db.Db, task_channel chan Task, index int, back_cha
}
}
/**
* Handle remote runner
*/
func handleRemoteTask(handler *Handle, base BasePack, runner_id string, task Task) {
logger := log.NewWithOptions(os.Stdout, log.Options{
ReportCaller: true,
ReportTimestamp: true,
TimeFormat: time.Kitchen,
Prefix: fmt.Sprintf("Runner pre %s", runner_id),
})
defer func() {
if r := recover(); r != nil {
logger.Error("Runner failed to setup for runner", "due to", r, "stack", string(debug.Stack()))
// TODO maybe create better failed task
task.UpdateStatusLog(base, TASK_FAILED_RUNNING, "Failed to setup task for runner")
}
}()
err := task.UpdateStatus(base, TASK_PICKED_UP, "Failed to setup task for runner")
if err != nil {
logger.Error("Failed to mark task as PICK UP")
return
}
mutex := handler.DataMap["runners_mutex"].(*sync.Mutex)
mutex.Lock()
defer mutex.Unlock()
switch task.TaskType {
case int(TASK_TYPE_RETRAINING):
runners := handler.DataMap["runners"].(map[string]interface{})
runner := runners[runner_id].(map[string]interface{})
runner["task"] = &task
runners[runner_id] = runner
handler.DataMap["runners"] = runners
case int(TASK_TYPE_TRAINING):
if err := PrepareTraining(handler, base, task, runner_id); err != nil {
logger.Error("Failed to prepare for training", "err", err)
}
case int(TASK_TYPE_CLASSIFICATION):
runners := handler.DataMap["runners"].(map[string]interface{})
runner := runners[runner_id].(map[string]interface{})
runner["task"] = &task
runners[runner_id] = runner
handler.DataMap["runners"] = runners
default:
logger.Error("Not sure what to do panicing", "taskType", task.TaskType)
panic("not sure what to do")
}
}
/**
* Tells the orcchestator to look at the task list from time to time
*/
@@ -125,7 +177,7 @@ func attentionSeeker(config Config, back_channel chan int) {
/**
* Manages what worker should to Work
*/
func RunnerOrchestrator(db db.Db, config Config) {
func RunnerOrchestrator(db db.Db, config Config, handler *Handle) {
logger := log.NewWithOptions(os.Stdout, log.Options{
ReportCaller: true,
ReportTimestamp: true,
@@ -133,6 +185,14 @@ func RunnerOrchestrator(db db.Db, config Config) {
Prefix: "Runner Orchestrator Logger",
})
setupHandle(handler)
base := BasePackStruct{
Db: db,
Logger: logger,
Host: config.Hostname,
}
gpu_workers := config.GpuWorker.NumberOfWorkers
logger.Info("Starting runners")
@@ -144,12 +204,12 @@ func RunnerOrchestrator(db db.Db, config Config) {
defer func() {
if r := recover(); r != nil {
logger.Error("Recovered in Orchestrator restarting", "due to", r)
logger.Error("Recovered in Orchestrator restarting", "due to", r, "stack", string(debug.Stack()))
for x := range task_runners {
close(task_runners[x])
}
close(back_channel)
go RunnerOrchestrator(db, config)
go RunnerOrchestrator(db, config, handler)
}
}()
@@ -159,6 +219,10 @@ func RunnerOrchestrator(db db.Db, config Config) {
for i := 0; i < gpu_workers; i++ {
task_runners[i] = make(chan Task, 10)
task_runners_used[i] = false
AddLocalRunner(handler, LocalRunner{
RunnerNum: i + 1,
Task: nil,
})
go runner(config, db, task_runners[i], i+1, back_channel)
}
@@ -166,14 +230,17 @@ func RunnerOrchestrator(db db.Db, config Config) {
for i := range back_channel {
if i > 0 {
logger.Info("Runner freed", "runner", i)
task_runners_used[i-1] = false
} else if i < 0 {
logger.Error("Runner died! Restarting!", "runner", i)
i = int(math.Abs(float64(i)) - 1)
task_runners_used[i] = false
go runner(config, db, task_runners[i], i+1, back_channel)
if i != 0 {
if i > 0 {
logger.Info("Runner freed", "runner", i)
task_runners_used[i-1] = false
} else if i < 0 {
logger.Error("Runner died! Restarting!", "runner", i)
i = int(math.Abs(float64(i)) - 1)
task_runners_used[i] = false
go runner(config, db, task_runners[i], i+1, back_channel)
}
AddLocalTask(handler, int(math.Abs(float64(i))), nil)
}
if task_to_dispatch == nil {
@@ -197,11 +264,38 @@ func RunnerOrchestrator(db db.Db, config Config) {
}
}
if task_to_dispatch != nil && task_to_dispatch.TaskType != int(TASK_TYPE_DELETE_USER) {
// TODO split tasks into cpu tasks and GPU tasks
mutex := handler.DataMap["runners_mutex"].(*sync.Mutex)
mutex.Lock()
remote_runners := handler.DataMap["runners"].(map[string]interface{})
for k, v := range remote_runners {
runner_data := v.(map[string]interface{})
runner_info := runner_data["runner_info"].(*Runner)
if runner_data["task"] != nil {
continue
}
if runner_info.UserId != task_to_dispatch.UserId {
continue
}
go handleRemoteTask(handler, base, k, *task_to_dispatch)
task_to_dispatch = nil
break
}
mutex.Unlock()
}
if task_to_dispatch != nil {
for i := 0; i < len(task_runners_used); i += 1 {
if !task_runners_used[i] {
task_runners[i] <- *task_to_dispatch
task_runners_used[i] = true
AddLocalTask(handler, i+1, task_to_dispatch)
task_to_dispatch = nil
break
}
@@ -211,6 +305,6 @@ func RunnerOrchestrator(db db.Db, config Config) {
}
}
func StartRunners(db db.Db, config Config) {
go RunnerOrchestrator(db, config)
func StartRunners(db db.Db, config Config, handler *Handle) {
go RunnerOrchestrator(db, config, handler)
}

View File

@@ -0,0 +1,51 @@
package task_runner
import (
"sync"
. "git.andr3h3nriqu3s.com/andr3/fyp/logic/tasks/utils"
. "git.andr3h3nriqu3s.com/andr3/fyp/logic/utils"
)
type LocalRunner struct {
RunnerNum int `json:"id"`
Task *Task `json:"task"`
}
type LocalRunners map[int]*LocalRunner
func LockRunners(handler *Handle, t string) *sync.Mutex {
req := t + "_runners_mutex"
if t == "" {
req = "runners_mutex"
}
mutex := handler.DataMap[req].(*sync.Mutex)
mutex.Lock()
return mutex
}
func setupHandle(handler *Handle) {
// Setup Remote Runner data
handler.DataMap["runners"] = map[string]interface{}{}
handler.DataMap["runners_mutex"] = &sync.Mutex{}
// Setup Local Runner data
handler.DataMap["local_runners"] = &LocalRunners{}
handler.DataMap["local_runners_mutex"] = &sync.Mutex{}
}
func AddLocalRunner(handler *Handle, runner LocalRunner) {
mutex := LockRunners(handler, "local")
defer mutex.Unlock()
runners := handler.DataMap["local_runners"].(*LocalRunners)
(*runners)[runner.RunnerNum] = &runner
}
func AddLocalTask(handler *Handle, runner_id int, task *Task) {
mutex := LockRunners(handler, "local")
defer mutex.Unlock()
runners := handler.DataMap["local_runners"].(*LocalRunners)
(*(*runners)[runner_id]).Task = task
}

View File

@@ -0,0 +1,25 @@
package tasks
import (
. "git.andr3h3nriqu3s.com/andr3/fyp/logic/db_types"
. "git.andr3h3nriqu3s.com/andr3/fyp/logic/tasks/runner"
. "git.andr3h3nriqu3s.com/andr3/fyp/logic/utils"
)
func handleRunnerData(x *Handle) {
type NonType struct{}
PostAuthJson(x, "/tasks/runner/info", User_Admin, func(c *Context, dat *NonType) *Error {
mutex_remote := LockRunners(x, "")
defer mutex_remote.Unlock()
mutex_local := LockRunners(x, "local")
defer mutex_local.Unlock()
return c.SendJSON(struct {
RemoteRunners map[string]interface{} `json:"remoteRunners"`
LocalRunner *LocalRunners `json:"localRunners"`
}{
RemoteRunners: x.DataMap["runners"].(map[string]interface{}),
LocalRunner: x.DataMap["local_runners"].(*LocalRunners),
})
})
}

View File

@@ -0,0 +1,29 @@
package tasks_utils
import (
"time"
"git.andr3h3nriqu3s.com/andr3/fyp/logic/db"
dbtypes "git.andr3h3nriqu3s.com/andr3/fyp/logic/db_types"
)
type RunnerType int64
const (
RUNNER_TYPE_GPU RunnerType = iota + 1
)
type Runner struct {
Id string `json:"id" db:"ru.id"`
UserId string `json:"user_id" db:"ru.user_id"`
Token string `json:"token" db:"ru.token"`
Type RunnerType `json:"type" db:"ru.type"`
CreateOn time.Time `json:"createOn" db:"ru.created_on"`
}
func GetRunner(db db.Db, id string) (ru *Runner, err error) {
var runner Runner
err = dbtypes.GetDBOnce(db, &runner, "remote_runner as ru where ru.id=$1", id)
ru = &runner
return
}

View File

@@ -101,7 +101,11 @@ func (t Task) SetResult(base BasePack, result any) (err error) {
if err != nil {
return
}
_, err = base.GetDb().Exec("update tasks set result=$1 where id=$2", text, t.Id)
return t.SetResultText(base, string(text))
}
func (t Task) SetResultText(base BasePack, text string) (err error) {
_, err = base.GetDb().Exec("update tasks set result=$1 where id=$2", []byte(text), t.Id)
return
}