package tasks

import (
	"os"
	"path"
	"sync"
	"time"

	. "git.andr3h3nriqu3s.com/andr3/fyp/logic/db_types"
	. "git.andr3h3nriqu3s.com/andr3/fyp/logic/models/train"
	. "git.andr3h3nriqu3s.com/andr3/fyp/logic/tasks/utils"
	. "git.andr3h3nriqu3s.com/andr3/fyp/logic/utils"
)
// verifyRunner loads the runner identified by dat.Id and checks that the token
// attached to the current request matches the token stored for that runner.
//
// It returns the runner on success. On failure the returned *Error is the
// response produced through c:
//   - a bad request when GetRunner reports NotFoundError,
//   - a 500 when GetRunner fails for any other reason,
//   - a 401 when the request token differs from the runner token.
func verifyRunner(c *Context, dat *JustId) (runner *Runner, e *Error) {
	found, lookupErr := GetRunner(c, dat.Id)

	switch {
	case lookupErr == NotFoundError:
		return found, c.JsonBadRequest("Could not find runner, please register runner first")
	case lookupErr != nil:
		return found, c.E500M("Failed to get information about the runner", lookupErr)
	case found.Token != *c.Token:
		// The caller is authenticated, but not as this runner.
		return nil, c.SendJSONStatus(401, "Only runners can use this funcion")
	}

	return found, nil
}
// VerifyTask is the JSON payload used by runner endpoints that act on the
// runner's currently assigned task.
type VerifyTask struct {
	// Id is the runner id; it is the key used to look the runner up in the
	// in-memory "runners" registry and the id passed to verifyRunner.
	Id string ` json:"id" validate:"required" `
	// TaskId is the id of the task the runner claims to be working on.
	TaskId string ` json:"taskId" validate:"required" `
}
// verifyTask returns the task currently assigned to the runner dat.Id in the
// in-memory runner registry stored in x.DataMap.
//
// The registry is read while holding the "runners_mutex" lock, which is
// released when the function returns. The returned *Error is a bad request when
// the runner has no registry entry and a 404 when the entry has no task.
//
// NOTE(review): dat.TaskId is not compared against the active task here, so any
// task id is accepted as long as the runner has some task — confirm whether
// that is intended.
func verifyTask(x *Handle, c *Context, dat *VerifyTask) (task *Task, error *Error) {
	lock := x.DataMap["runners_mutex"].(*sync.Mutex)
	lock.Lock()
	defer lock.Unlock()

	registry := x.DataMap["runners"].(map[string]interface{})

	entry := registry[dat.Id]
	if entry == nil {
		return nil, c.JsonBadRequest("Runner not active")
	}

	active := entry.(map[string]interface{})["task"]
	if active == nil {
		return nil, c.SendJSONStatus(404, "No active task")
	}

	return active.(*Task), nil
}
func handleRemoteRunner ( x * Handle ) {
type RegisterRunner struct {
Token string ` json:"token" validate:"required" `
Type RunnerType ` json:"type" validate:"required" `
}
PostAuthJson ( x , "/tasks/runner/register" , User_Normal , func ( c * Context , dat * RegisterRunner ) * Error {
if * c . Token != dat . Token {
// TODO do admin
return c . E500M ( "Please make sure that the token is the same that is being registered" , nil )
}
c . Logger . Info ( "test" , "dat" , dat )
var runner Runner
err := GetDBOnce ( c , & runner , "remote_runner as ru where token=$1" , dat . Token )
if err != NotFoundError && err != nil {
return c . E500M ( "Failed to get information remote runners" , err )
}
if err != NotFoundError {
return c . JsonBadRequest ( "Token is already registered by a runner" )
}
// TODO get id from token passed by when doing admin
var userId = c . User . Id
var new_runner = struct {
Type RunnerType
UserId string ` db:"user_id" `
Token string
} {
Type : dat . Type ,
Token : dat . Token ,
UserId : userId ,
}
id , err := InsertReturnId ( c , & new_runner , "remote_runner" , "id" )
if err != nil {
return c . E500M ( "Failed to create remote runner" , err )
}
return c . SendJSON ( struct {
Id string ` json:"id" `
} {
Id : id ,
} )
} )
// TODO remove runner
PostAuthJson ( x , "/tasks/runner/init" , User_Normal , func ( c * Context , dat * JustId ) * Error {
runner , error := verifyRunner ( c , dat )
if error != nil {
return error
}
mutex := x . DataMap [ "runners_mutex" ] . ( * sync . Mutex )
mutex . Lock ( )
defer mutex . Unlock ( )
var runners map [ string ] interface { } = x . DataMap [ "runners" ] . ( map [ string ] interface { } )
if runners [ dat . Id ] != nil {
c . Logger . Info ( "Logger trying to register but already registerd" )
c . ShowMessage = false
return c . SendJSON ( "Ok" )
}
var new_runner = map [ string ] interface { } { }
new_runner [ "last_time_check" ] = time . Now ( )
new_runner [ "runner_info" ] = runner
runners [ dat . Id ] = new_runner
x . DataMap [ "runners" ] = runners
return c . SendJSON ( "Ok" )
} )
PostAuthJson ( x , "/tasks/runner/active" , User_Normal , func ( c * Context , dat * JustId ) * Error {
_ , error := verifyRunner ( c , dat )
if error != nil {
return error
}
mutex := x . DataMap [ "runners_mutex" ] . ( * sync . Mutex )
mutex . Lock ( )
defer mutex . Unlock ( )
var runners map [ string ] interface { } = x . DataMap [ "runners" ] . ( map [ string ] interface { } )
if runners [ dat . Id ] == nil {
return c . JsonBadRequest ( "Runner not active" )
}
var runner_data map [ string ] interface { } = runners [ dat . Id ] . ( map [ string ] interface { } )
if runner_data [ "task" ] == nil {
c . ShowMessage = false
return c . SendJSONStatus ( 404 , "No active task" )
}
c . ShowMessage = false
// This should be a task obj
return c . SendJSON ( runner_data [ "task" ] )
} )
PostAuthJson ( x , "/tasks/runner/ready" , User_Normal , func ( c * Context , dat * VerifyTask ) * Error {
_ , error := verifyRunner ( c , & JustId { Id : dat . Id } )
if error != nil {
return error
}
task , error := verifyTask ( x , c , dat )
if error != nil {
return error
}
err := task . UpdateStatus ( c , TASK_RUNNING , "Task Running on Runner" )
if err != nil {
return c . E500M ( "Failed to set task status" , err )
}
return c . SendJSON ( "Ok" )
} )
type TaskFail struct {
Id string ` json:"id" validate:"required" `
TaskId string ` json:"taskId" validate:"required" `
Reason string ` json:"reason" validate:"required" `
}
PostAuthJson ( x , "/tasks/runner/fail" , User_Normal , func ( c * Context , dat * TaskFail ) * Error {
_ , error := verifyRunner ( c , & JustId { Id : dat . Id } )
if error != nil {
return error
}
task , error := verifyTask ( x , c , & VerifyTask { Id : dat . Id , TaskId : dat . TaskId } )
if error != nil {
return error
}
err := task . UpdateStatus ( c , TASK_FAILED_RUNNING , dat . Reason )
if err != nil {
return c . E500M ( "Failed to set task status" , err )
}
// Do extra clean up on tasks
switch task . TaskType {
case int ( TASK_TYPE_TRAINING ) :
CleanUpFailed ( c , task )
default :
panic ( "Do not know how to handle this" )
}
mutex := x . DataMap [ "runners_mutex" ] . ( * sync . Mutex )
mutex . Lock ( )
defer mutex . Unlock ( )
var runners map [ string ] interface { } = x . DataMap [ "runners" ] . ( map [ string ] interface { } )
var runner_data map [ string ] interface { } = runners [ dat . Id ] . ( map [ string ] interface { } )
runner_data [ "task" ] = nil
runners [ dat . Id ] = runner_data
x . DataMap [ "runners" ] = runners
return c . SendJSON ( "Ok" )
} )
PostAuthJson ( x , "/tasks/runner/train/defs" , User_Normal , func ( c * Context , dat * VerifyTask ) * Error {
_ , error := verifyRunner ( c , & JustId { Id : dat . Id } )
if error != nil {
return error
}
task , error := verifyTask ( x , c , dat )
if error != nil {
return error
}
if task . TaskType != int ( TASK_TYPE_TRAINING ) {
c . Logger . Error ( "Task not is not the right type to get the definitions" , "task type" , task . TaskType )
return c . JsonBadRequest ( "Task is not the right type go get the definitions" )
}
model , err := GetBaseModel ( c , * task . ModelId )
if err != nil {
return c . E500M ( "Failed to get model information" , err )
}
defs , err := model . GetDefinitions ( c , "and md.status=$2" , DEFINITION_STATUS_INIT )
if err != nil {
return c . E500M ( "Failed to get the model definitions" , err )
}
return c . SendJSON ( defs )
} )
PostAuthJson ( x , "/tasks/runner/train/classes" , User_Normal , func ( c * Context , dat * VerifyTask ) * Error {
_ , error := verifyRunner ( c , & JustId { Id : dat . Id } )
if error != nil {
return error
}
task , error := verifyTask ( x , c , dat )
if error != nil {
return error
}
if task . TaskType != int ( TASK_TYPE_TRAINING ) {
c . Logger . Error ( "Task not is not the right type to get the definitions" , "task type" , task . TaskType )
return c . JsonBadRequest ( "Task is not the right type go get the definitions" )
}
model , err := GetBaseModel ( c , * task . ModelId )
if err != nil {
return c . E500M ( "Failed to get model information" , err )
}
classes , err := model . GetClasses ( c , "and status=$2 order by mc.class_order asc" , CLASS_STATUS_TO_TRAIN )
if err != nil {
return c . E500M ( "Failed to get the model classes" , err )
}
return c . SendJSON ( classes )
} )
type RunnerTrainDefStatus struct {
Id string ` json:"id" validate:"required" `
TaskId string ` json:"taskId" validate:"required" `
DefId string ` json:"defId" validate:"required" `
Status DefinitionStatus ` json:"status" validate:"required" `
}
PostAuthJson ( x , "/tasks/runner/train/def/status" , User_Normal , func ( c * Context , dat * RunnerTrainDefStatus ) * Error {
_ , error := verifyRunner ( c , & JustId { Id : dat . Id } )
if error != nil {
return error
}
task , error := verifyTask ( x , c , & VerifyTask { Id : dat . Id , TaskId : dat . TaskId } )
if error != nil {
return error
}
if task . TaskType != int ( TASK_TYPE_TRAINING ) {
c . Logger . Error ( "Task not is not the right type to get the definitions" , "task type" , task . TaskType )
return c . JsonBadRequest ( "Task is not the right type go get the definitions" )
}
def , err := GetDefinition ( c , dat . DefId )
if err != nil {
return c . E500M ( "Failed to get definition information" , err )
}
err = def . UpdateStatus ( c , dat . Status )
if err != nil {
return c . E500M ( "Failed to update model status" , err )
}
return c . SendJSON ( "Ok" )
} )
type RunnerTrainDefLayers struct {
Id string ` json:"id" validate:"required" `
TaskId string ` json:"taskId" validate:"required" `
DefId string ` json:"defId" validate:"required" `
}
PostAuthJson ( x , "/tasks/runner/train/def/layers" , User_Normal , func ( c * Context , dat * RunnerTrainDefLayers ) * Error {
_ , error := verifyRunner ( c , & JustId { Id : dat . Id } )
if error != nil {
return error
}
task , error := verifyTask ( x , c , & VerifyTask { Id : dat . Id , TaskId : dat . TaskId } )
if error != nil {
return error
}
if task . TaskType != int ( TASK_TYPE_TRAINING ) {
c . Logger . Error ( "Task not is not the right type to get the definitions" , "task type" , task . TaskType )
return c . JsonBadRequest ( "Task is not the right type go get the definitions" )
}
def , err := GetDefinition ( c , dat . DefId )
if err != nil {
return c . E500M ( "Failed to get definition information" , err )
}
layers , err := def . GetLayers ( c , " order by layer_order asc" )
if err != nil {
return c . E500M ( "Failed to get layers" , err )
}
return c . SendJSON ( layers )
} )
PostAuthJson ( x , "/tasks/runner/train/datapoints" , User_Normal , func ( c * Context , dat * VerifyTask ) * Error {
_ , error := verifyRunner ( c , & JustId { Id : dat . Id } )
if error != nil {
return error
}
task , error := verifyTask ( x , c , dat )
if error != nil {
return error
}
if task . TaskType != int ( TASK_TYPE_TRAINING ) {
c . Logger . Error ( "Task not is not the right type to get the definitions" , "task type" , task . TaskType )
return c . JsonBadRequest ( "Task is not the right type go get the definitions" )
}
model , err := GetBaseModel ( c , * task . ModelId )
if err != nil {
return c . E500M ( "Failed to get model information" , err )
}
training_points , err := model . DataPoints ( c , DATA_POINT_MODE_TRAINING )
if err != nil {
return c . E500M ( "Failed to get the model classes" , err )
}
testing_points , err := model . DataPoints ( c , DATA_POINT_MODE_TRAINING )
if err != nil {
return c . E500M ( "Failed to get the model classes" , err )
}
return c . SendJSON ( struct {
Testing [ ] DataPoint ` json:"testing" `
Training [ ] DataPoint ` json:"training" `
} {
Testing : testing_points ,
Training : training_points ,
} )
} )
2024-05-07 01:16:38 +01:00
type RunnerTrainDefEpoch struct {
Id string ` json:"id" validate:"required" `
TaskId string ` json:"taskId" validate:"required" `
DefId string ` json:"defId" validate:"required" `
Epoch int ` json:"epoch" validate:"required" `
Accuracy float64 ` json:"accuracy" validate:"required" `
}
PostAuthJson ( x , "/tasks/runner/train/epoch" , User_Normal , func ( c * Context , dat * RunnerTrainDefEpoch ) * Error {
_ , error := verifyRunner ( c , & JustId { Id : dat . Id } )
if error != nil {
return error
}
task , error := verifyTask ( x , c , & VerifyTask {
Id : dat . Id ,
TaskId : dat . TaskId ,
} )
if error != nil {
return error
}
if task . TaskType != int ( TASK_TYPE_TRAINING ) {
c . Logger . Error ( "Task not is not the right type to get the definitions" , "task type" , task . TaskType )
return c . JsonBadRequest ( "Task is not the right type go get the definitions" )
}
def , err := GetDefinition ( c , dat . DefId )
if err != nil {
return c . E500M ( "Failed to get definition information" , err )
}
err = def . UpdateAfterEpoch ( c , dat . Accuracy , dat . Epoch )
if err != nil {
return c . E500M ( "Failed to update model" , err )
}
return c . SendJSON ( "Ok" )
} )
PostAuthJson ( x , "/task/runner/train/mark-failed" , User_Normal , func ( c * Context , dat * VerifyTask ) * Error {
_ , error := verifyRunner ( c , & JustId { Id : dat . Id } )
if error != nil {
return error
}
task , error := verifyTask ( x , c , & VerifyTask {
Id : dat . Id ,
TaskId : dat . TaskId ,
} )
if error != nil {
return error
}
if task . TaskType != int ( TASK_TYPE_TRAINING ) {
c . Logger . Error ( "Task not is not the right type to get the definitions" , "task type" , task . TaskType )
return c . JsonBadRequest ( "Task is not the right type go get the definitions" )
}
_ , err := c . Exec (
"update model_definition set status=$1 " +
"where model_id=$2 and status in ($3, $4)" ,
MODEL_DEFINITION_STATUS_CANCELD_TRAINING ,
task . ModelId ,
MODEL_DEFINITION_STATUS_TRAINING ,
MODEL_DEFINITION_STATUS_PAUSED_TRAINING ,
)
if err != nil {
return c . E500M ( "Failed to mark definition as failed" , err )
}
return c . SendJSON ( "Ok" )
} )
PostAuthJson ( x , "/task/runner/train/done" , User_Normal , func ( c * Context , dat * VerifyTask ) * Error {
_ , error := verifyRunner ( c , & JustId { Id : dat . Id } )
if error != nil {
return error
}
task , error := verifyTask ( x , c , dat )
if error != nil {
return error
}
if task . TaskType != int ( TASK_TYPE_TRAINING ) {
c . Logger . Error ( "Task not is not the right type to get the definitions" , "task type" , task . TaskType )
return c . JsonBadRequest ( "Task is not the right type go get the definitions" )
}
model , err := GetBaseModel ( c , * task . ModelId )
if err != nil {
c . Logger . Error ( "Failed to get model" , "err" , err )
return c . E500M ( "Failed to get mode" , err )
}
var def Definition
err = GetDBOnce ( c , & def , "from model_definition as md where model_id=$1 and status=$2 order by accuracy desc limit 1;" , task . ModelId , DEFINITION_STATUS_TRANIED )
if err == NotFoundError {
// TODO Make the Model status have a message
c . Logger . Error ( "All definitions failed to train!" )
model . UpdateStatus ( c , FAILED_TRAINING )
task . UpdateStatusLog ( c , TASK_FAILED_RUNNING , "All definition failed to train!" )
return c . SendJSON ( "Ok" )
} else if err != nil {
model . UpdateStatus ( c , FAILED_TRAINING )
task . UpdateStatusLog ( c , TASK_FAILED_RUNNING , "Failed to get model definition" )
return c . E500M ( "Failed to get model definition" , err )
}
if err = def . UpdateStatus ( c , DEFINITION_STATUS_READY ) ; err != nil {
model . UpdateStatus ( c , FAILED_TRAINING )
task . UpdateStatusLog ( c , TASK_FAILED_RUNNING , "Failed to update model definition" )
return c . E500M ( "Failed to update model definition" , err )
}
to_delete , err := c . Query ( "select id from model_definition where status != $1 and model_id=$2" , MODEL_DEFINITION_STATUS_READY , model . Id )
if err != nil {
model . UpdateStatus ( c , FAILED_TRAINING )
task . UpdateStatusLog ( c , TASK_FAILED_RUNNING , "Failed to delete unsed definitions" )
return c . E500M ( "Failed to delete unsed definitions" , err )
}
defer to_delete . Close ( )
for to_delete . Next ( ) {
var id string
if err = to_delete . Scan ( & id ) ; err != nil {
model . UpdateStatus ( c , FAILED_TRAINING )
task . UpdateStatusLog ( c , TASK_FAILED_RUNNING , "Failed to delete unsed definitions" )
return c . E500M ( "Failed to delete unsed definitions" , err )
}
os . RemoveAll ( path . Join ( "savedData" , model . Id , "defs" , id ) )
}
// TODO Check if returning also works here
if _ , err = c . Exec ( "delete from model_definition where status!=$1 and model_id=$2;" , MODEL_DEFINITION_STATUS_READY , model . Id ) ; err != nil {
model . UpdateStatus ( c , FAILED_TRAINING )
task . UpdateStatusLog ( c , TASK_FAILED_RUNNING , "Failed to delete unsed definitions" )
return c . E500M ( "Failed to delete unsed definitions" , err )
}
model . UpdateStatus ( c , READY )
return c . SendJSON ( "Ok" )
} )
2024-05-06 01:10:58 +01:00
}