2024-04-12 20:36:23 +01:00
|
|
|
package task_runner
|
|
|
|
|
|
|
|
import (
|
|
|
|
"fmt"
|
2024-04-15 23:04:53 +01:00
|
|
|
"math"
|
2024-04-12 20:36:23 +01:00
|
|
|
"os"
|
2024-04-15 23:04:53 +01:00
|
|
|
"runtime/debug"
|
2024-05-06 01:10:58 +01:00
|
|
|
"sync"
|
2024-04-12 20:36:23 +01:00
|
|
|
"time"
|
|
|
|
|
|
|
|
"github.com/charmbracelet/log"
|
|
|
|
|
2024-04-17 17:46:43 +01:00
|
|
|
"git.andr3h3nriqu3s.com/andr3/fyp/logic/db"
|
2024-04-14 14:51:16 +01:00
|
|
|
. "git.andr3h3nriqu3s.com/andr3/fyp/logic/db_types"
|
|
|
|
. "git.andr3h3nriqu3s.com/andr3/fyp/logic/models"
|
2024-04-15 23:04:53 +01:00
|
|
|
. "git.andr3h3nriqu3s.com/andr3/fyp/logic/models/train"
|
2024-04-12 20:36:23 +01:00
|
|
|
. "git.andr3h3nriqu3s.com/andr3/fyp/logic/tasks/utils"
|
2024-04-17 14:56:57 +01:00
|
|
|
. "git.andr3h3nriqu3s.com/andr3/fyp/logic/users"
|
2024-04-14 14:51:16 +01:00
|
|
|
. "git.andr3h3nriqu3s.com/andr3/fyp/logic/utils"
|
2024-04-12 20:36:23 +01:00
|
|
|
)
|
|
|
|
|
|
|
|
/**
|
|
|
|
* Actually runs the code
|
|
|
|
*/
|
2024-04-17 17:46:43 +01:00
|
|
|
func runner(config Config, db db.Db, task_channel chan Task, index int, back_channel chan int) {
|
2024-04-12 20:36:23 +01:00
|
|
|
logger := log.NewWithOptions(os.Stdout, log.Options{
|
|
|
|
ReportCaller: true,
|
|
|
|
ReportTimestamp: true,
|
|
|
|
TimeFormat: time.Kitchen,
|
|
|
|
Prefix: fmt.Sprintf("Runner %d", index),
|
|
|
|
})
|
|
|
|
|
|
|
|
defer func() {
|
|
|
|
if r := recover(); r != nil {
|
2024-04-15 23:04:53 +01:00
|
|
|
logger.Error("Recovered in runner", "processor id", index, "due to", r, "stack", string(debug.Stack()))
|
2024-04-12 20:36:23 +01:00
|
|
|
back_channel <- -index
|
|
|
|
}
|
|
|
|
}()
|
|
|
|
|
|
|
|
logger.Info("Started up")
|
|
|
|
|
2024-04-15 23:04:53 +01:00
|
|
|
var err error
|
2024-04-12 20:36:23 +01:00
|
|
|
|
2024-04-15 23:04:53 +01:00
|
|
|
base := BasePackStruct{
|
|
|
|
Db: db,
|
|
|
|
Logger: logger,
|
|
|
|
Host: config.Hostname,
|
|
|
|
}
|
2024-04-12 20:36:23 +01:00
|
|
|
|
|
|
|
for task := range task_channel {
|
|
|
|
logger.Info("Got task", "task", task)
|
2024-04-15 23:04:53 +01:00
|
|
|
task.UpdateStatusLog(base, TASK_PICKED_UP, "Runner picked up task")
|
|
|
|
|
|
|
|
if task.TaskType == int(TASK_TYPE_CLASSIFICATION) {
|
|
|
|
logger.Info("Classification Task")
|
|
|
|
if err = ClassifyTask(base, task); err != nil {
|
|
|
|
logger.Error("Classification task failed", "error", err)
|
|
|
|
}
|
|
|
|
|
|
|
|
back_channel <- index
|
|
|
|
continue
|
|
|
|
} else if task.TaskType == int(TASK_TYPE_TRAINING) {
|
|
|
|
logger.Info("Training Task")
|
|
|
|
if err = RunTaskTrain(base, task); err != nil {
|
2024-04-16 17:48:52 +01:00
|
|
|
logger.Error("Failed to tain the model", "error", err)
|
|
|
|
}
|
|
|
|
|
|
|
|
back_channel <- index
|
|
|
|
continue
|
|
|
|
} else if task.TaskType == int(TASK_TYPE_RETRAINING) {
|
|
|
|
logger.Info("Retraining Task")
|
|
|
|
if err = RunTaskRetrain(base, task); err != nil {
|
2024-04-15 23:04:53 +01:00
|
|
|
logger.Error("Failed to tain the model", "error", err)
|
|
|
|
}
|
|
|
|
|
2024-04-17 14:56:57 +01:00
|
|
|
back_channel <- index
|
|
|
|
continue
|
|
|
|
} else if task.TaskType == int(TASK_TYPE_DELETE_USER) {
|
|
|
|
logger.Warn("User deleting Task")
|
|
|
|
if err = DeleteUser(base, task); err != nil {
|
|
|
|
logger.Error("Failed to tain the model", "error", err)
|
|
|
|
}
|
|
|
|
|
2024-04-15 23:04:53 +01:00
|
|
|
back_channel <- index
|
|
|
|
continue
|
|
|
|
}
|
2024-04-14 14:51:16 +01:00
|
|
|
|
2024-04-15 23:04:53 +01:00
|
|
|
logger.Error("Do not know how to route task", "task", task)
|
|
|
|
task.UpdateStatusLog(base, TASK_FAILED_RUNNING, "Do not know how to route task")
|
|
|
|
back_channel <- index
|
2024-04-12 20:36:23 +01:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2024-05-06 01:10:58 +01:00
|
|
|
/**
|
|
|
|
* Handle remote runner
|
|
|
|
*/
|
|
|
|
func handleRemoteTask(handler *Handle, base BasePack, runner_id string, task Task) {
|
|
|
|
logger := log.NewWithOptions(os.Stdout, log.Options{
|
|
|
|
ReportCaller: true,
|
|
|
|
ReportTimestamp: true,
|
|
|
|
TimeFormat: time.Kitchen,
|
|
|
|
Prefix: fmt.Sprintf("Runner pre %s", runner_id),
|
|
|
|
})
|
|
|
|
defer func() {
|
|
|
|
if r := recover(); r != nil {
|
|
|
|
logger.Error("Runner failed to setup for runner", "due to", r, "stack", string(debug.Stack()))
|
|
|
|
// TODO maybe create better failed task
|
|
|
|
task.UpdateStatusLog(base, TASK_FAILED_RUNNING, "Failed to setup task for runner")
|
|
|
|
}
|
|
|
|
}()
|
|
|
|
|
|
|
|
err := task.UpdateStatus(base, TASK_PICKED_UP, "Failed to setup task for runner")
|
|
|
|
if err != nil {
|
|
|
|
logger.Error("Failed to mark task as PICK UP")
|
|
|
|
return
|
|
|
|
}
|
|
|
|
|
|
|
|
mutex := handler.DataMap["runners_mutex"].(*sync.Mutex)
|
|
|
|
mutex.Lock()
|
|
|
|
defer mutex.Unlock()
|
|
|
|
|
|
|
|
switch task.TaskType {
|
2024-05-09 00:46:42 +01:00
|
|
|
case int(TASK_TYPE_RETRAINING):
|
|
|
|
runners := handler.DataMap["runners"].(map[string]interface{})
|
|
|
|
runner := runners[runner_id].(map[string]interface{})
|
|
|
|
runner["task"] = &task
|
|
|
|
runners[runner_id] = runner
|
|
|
|
handler.DataMap["runners"] = runners
|
2024-05-06 01:10:58 +01:00
|
|
|
case int(TASK_TYPE_TRAINING):
|
|
|
|
if err := PrepareTraining(handler, base, task, runner_id); err != nil {
|
|
|
|
logger.Error("Failed to prepare for training", "err", err)
|
|
|
|
}
|
2024-05-07 18:18:48 +01:00
|
|
|
case int(TASK_TYPE_CLASSIFICATION):
|
|
|
|
runners := handler.DataMap["runners"].(map[string]interface{})
|
|
|
|
runner := runners[runner_id].(map[string]interface{})
|
|
|
|
runner["task"] = &task
|
|
|
|
runners[runner_id] = runner
|
|
|
|
handler.DataMap["runners"] = runners
|
2024-05-06 01:10:58 +01:00
|
|
|
default:
|
|
|
|
logger.Error("Not sure what to do panicing", "taskType", task.TaskType)
|
|
|
|
panic("not sure what to do")
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2024-04-12 20:36:23 +01:00
|
|
|
/**
|
|
|
|
* Tells the orcchestator to look at the task list from time to time
|
|
|
|
*/
|
|
|
|
func attentionSeeker(config Config, back_channel chan int) {
|
|
|
|
logger := log.NewWithOptions(os.Stdout, log.Options{
|
|
|
|
ReportCaller: true,
|
|
|
|
ReportTimestamp: true,
|
|
|
|
TimeFormat: time.Kitchen,
|
|
|
|
Prefix: "Runner Orchestrator Logger [Attention]",
|
|
|
|
})
|
|
|
|
|
2024-04-15 23:04:53 +01:00
|
|
|
defer func() {
|
|
|
|
if r := recover(); r != nil {
|
|
|
|
logger.Error("Attencion seeker dies", "due to", r)
|
|
|
|
}
|
|
|
|
}()
|
|
|
|
|
2024-04-12 20:36:23 +01:00
|
|
|
logger.Info("Started up")
|
|
|
|
|
2024-04-15 23:04:53 +01:00
|
|
|
t, err := time.ParseDuration(config.GpuWorker.Pulling)
|
|
|
|
if err != nil {
|
|
|
|
logger.Error("Failed to load", "error", err)
|
|
|
|
return
|
|
|
|
}
|
2024-04-12 20:36:23 +01:00
|
|
|
|
|
|
|
for true {
|
|
|
|
back_channel <- 0
|
2024-04-14 14:51:16 +01:00
|
|
|
|
2024-04-15 23:04:53 +01:00
|
|
|
time.Sleep(t)
|
2024-04-12 20:36:23 +01:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
/**
|
|
|
|
* Manages what worker should to Work
|
|
|
|
*/
|
2024-05-06 01:10:58 +01:00
|
|
|
func RunnerOrchestrator(db db.Db, config Config, handler *Handle) {
|
2024-04-12 20:36:23 +01:00
|
|
|
logger := log.NewWithOptions(os.Stdout, log.Options{
|
|
|
|
ReportCaller: true,
|
|
|
|
ReportTimestamp: true,
|
|
|
|
TimeFormat: time.Kitchen,
|
|
|
|
Prefix: "Runner Orchestrator Logger",
|
|
|
|
})
|
|
|
|
|
2024-05-10 02:12:40 +01:00
|
|
|
setupHandle(handler)
|
2024-05-06 01:10:58 +01:00
|
|
|
|
|
|
|
base := BasePackStruct{
|
|
|
|
Db: db,
|
|
|
|
Logger: logger,
|
|
|
|
Host: config.Hostname,
|
|
|
|
}
|
|
|
|
|
2024-04-12 20:36:23 +01:00
|
|
|
gpu_workers := config.GpuWorker.NumberOfWorkers
|
|
|
|
|
|
|
|
logger.Info("Starting runners")
|
|
|
|
|
|
|
|
task_runners := make([]chan Task, gpu_workers)
|
|
|
|
task_runners_used := make([]bool, gpu_workers)
|
|
|
|
// One more to accomudate the Attention Seeker channel
|
|
|
|
back_channel := make(chan int, gpu_workers+1)
|
|
|
|
|
2024-04-15 23:04:53 +01:00
|
|
|
defer func() {
|
|
|
|
if r := recover(); r != nil {
|
2024-05-10 02:12:40 +01:00
|
|
|
logger.Error("Recovered in Orchestrator restarting", "due to", r, "stack", string(debug.Stack()))
|
2024-04-15 23:04:53 +01:00
|
|
|
for x := range task_runners {
|
|
|
|
close(task_runners[x])
|
|
|
|
}
|
|
|
|
close(back_channel)
|
2024-05-06 01:10:58 +01:00
|
|
|
go RunnerOrchestrator(db, config, handler)
|
2024-04-15 23:04:53 +01:00
|
|
|
}
|
|
|
|
}()
|
|
|
|
|
2024-04-12 20:36:23 +01:00
|
|
|
go attentionSeeker(config, back_channel)
|
|
|
|
|
|
|
|
// Start the runners
|
|
|
|
for i := 0; i < gpu_workers; i++ {
|
|
|
|
task_runners[i] = make(chan Task, 10)
|
|
|
|
task_runners_used[i] = false
|
2024-05-10 02:12:40 +01:00
|
|
|
AddLocalRunner(handler, LocalRunner{
|
|
|
|
RunnerNum: i + 1,
|
|
|
|
Task: nil,
|
|
|
|
})
|
2024-04-15 23:04:53 +01:00
|
|
|
go runner(config, db, task_runners[i], i+1, back_channel)
|
2024-04-12 20:36:23 +01:00
|
|
|
}
|
|
|
|
|
2024-04-15 23:04:53 +01:00
|
|
|
var task_to_dispatch *Task = nil
|
2024-04-12 20:36:23 +01:00
|
|
|
|
|
|
|
for i := range back_channel {
|
|
|
|
|
2024-05-10 02:12:40 +01:00
|
|
|
if i != 0 {
|
|
|
|
if i > 0 {
|
|
|
|
logger.Info("Runner freed", "runner", i)
|
|
|
|
task_runners_used[i-1] = false
|
|
|
|
} else if i < 0 {
|
|
|
|
logger.Error("Runner died! Restarting!", "runner", i)
|
|
|
|
i = int(math.Abs(float64(i)) - 1)
|
|
|
|
task_runners_used[i] = false
|
|
|
|
go runner(config, db, task_runners[i], i+1, back_channel)
|
|
|
|
}
|
|
|
|
AddLocalTask(handler, int(math.Abs(float64(i))), nil)
|
2024-04-12 20:36:23 +01:00
|
|
|
}
|
|
|
|
|
2024-04-15 23:04:53 +01:00
|
|
|
if task_to_dispatch == nil {
|
2024-04-17 14:56:57 +01:00
|
|
|
var task TaskT
|
|
|
|
err := GetDBOnce(db, &task, "tasks as t "+
|
|
|
|
// Get depenencies
|
|
|
|
"left join tasks_dependencies as td on t.id=td.main_id "+
|
|
|
|
// Get the task that the depencey resolves to
|
|
|
|
"left join tasks as t2 on t2.id=td.dependent_id "+
|
|
|
|
"where t.status=1 "+
|
|
|
|
"group by t.id having count(td.id) filter (where t2.status in (0,1,2,3)) = 0;")
|
2024-04-15 23:04:53 +01:00
|
|
|
if err != NotFoundError && err != nil {
|
2024-04-17 14:56:57 +01:00
|
|
|
log.Error("Failed to get tasks from db", "err", err)
|
2024-04-15 23:04:53 +01:00
|
|
|
continue
|
|
|
|
}
|
|
|
|
if err == NotFoundError {
|
|
|
|
task_to_dispatch = nil
|
|
|
|
} else {
|
2024-04-17 14:56:57 +01:00
|
|
|
temp := Task(task)
|
|
|
|
task_to_dispatch = &temp
|
2024-04-15 23:04:53 +01:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2024-05-09 01:23:43 +01:00
|
|
|
if task_to_dispatch != nil && task_to_dispatch.TaskType != int(TASK_TYPE_DELETE_USER) {
|
|
|
|
// TODO split tasks into cpu tasks and GPU tasks
|
2024-05-06 01:10:58 +01:00
|
|
|
mutex := handler.DataMap["runners_mutex"].(*sync.Mutex)
|
|
|
|
mutex.Lock()
|
|
|
|
remote_runners := handler.DataMap["runners"].(map[string]interface{})
|
|
|
|
|
|
|
|
for k, v := range remote_runners {
|
|
|
|
runner_data := v.(map[string]interface{})
|
|
|
|
runner_info := runner_data["runner_info"].(*Runner)
|
|
|
|
|
|
|
|
if runner_data["task"] != nil {
|
|
|
|
continue
|
|
|
|
}
|
|
|
|
|
2024-05-09 01:23:43 +01:00
|
|
|
if runner_info.UserId != task_to_dispatch.UserId {
|
|
|
|
continue
|
2024-04-15 23:04:53 +01:00
|
|
|
}
|
2024-05-09 01:23:43 +01:00
|
|
|
|
|
|
|
go handleRemoteTask(handler, base, k, *task_to_dispatch)
|
|
|
|
task_to_dispatch = nil
|
|
|
|
break
|
2024-04-15 23:04:53 +01:00
|
|
|
}
|
2024-05-06 01:10:58 +01:00
|
|
|
|
|
|
|
mutex.Unlock()
|
2024-04-15 23:04:53 +01:00
|
|
|
}
|
2024-04-12 20:36:23 +01:00
|
|
|
|
2024-05-09 01:23:43 +01:00
|
|
|
if task_to_dispatch != nil {
|
|
|
|
for i := 0; i < len(task_runners_used); i += 1 {
|
|
|
|
if !task_runners_used[i] {
|
|
|
|
task_runners[i] <- *task_to_dispatch
|
|
|
|
task_runners_used[i] = true
|
2024-05-10 02:12:40 +01:00
|
|
|
AddLocalTask(handler, i+1, task_to_dispatch)
|
2024-05-09 01:23:43 +01:00
|
|
|
task_to_dispatch = nil
|
|
|
|
break
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2024-04-12 20:36:23 +01:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2024-05-06 01:10:58 +01:00
|
|
|
func StartRunners(db db.Db, config Config, handler *Handle) {
|
|
|
|
go RunnerOrchestrator(db, config, handler)
|
2024-04-12 20:36:23 +01:00
|
|
|
}
|