From b1e4211e6a4981ac089e0602734fe4cc20c08a0a Mon Sep 17 00:00:00 2001
From: Andre Henriques
Date: Mon, 6 May 2024 12:48:02 +0100
Subject: [PATCH] more work on the rust runner

---
 logic/models/train/train.go |  9 +++++
 runner/Cargo.lock           |  1 +
 runner/Cargo.toml           |  1 +
 runner/src/dataloader.rs    | 71 ++++++++++++++++++---------------
 runner/src/model/mod.rs     | 24 +++++++++--
 runner/src/tasks.rs         |  2 +-
 runner/src/training.rs      | 79 +++++++++++++++++++++++++++++++------
 7 files changed, 140 insertions(+), 47 deletions(-)

diff --git a/logic/models/train/train.go b/logic/models/train/train.go
index 1457747..0309cfe 100644
--- a/logic/models/train/train.go
+++ b/logic/models/train/train.go
@@ -1280,6 +1280,15 @@ func generateDefinition(c BasePack, model *BaseModel, target_accuracy int, numbe
 	order++
 
 	if complexity == 0 {
+		/*
+			_, err = def.MakeLayer(db, order, LAYER_SIMPLE_BLOCK, "")
+			if err != nil {
+				failed()
+				return
+			}
+			order++
+		*/
+
 		_, err = def.MakeLayer(db, order, LAYER_FLATTEN, "")
 		if err != nil {
 			failed()
diff --git a/runner/Cargo.lock b/runner/Cargo.lock
index 4305286..c2b5d00 100644
--- a/runner/Cargo.lock
+++ b/runner/Cargo.lock
@@ -1014,6 +1014,7 @@ name = "runner"
 version = "0.1.0"
 dependencies = [
  "anyhow",
+ "rand",
  "reqwest",
  "serde",
  "serde_json",
diff --git a/runner/Cargo.toml b/runner/Cargo.toml
index 459ed1e..f9cb801 100644
--- a/runner/Cargo.toml
+++ b/runner/Cargo.toml
@@ -14,3 +14,4 @@ tokio = { version = "1", features = ["full"] }
 serde_json = "1.0.116"
 serde_repr = "0.1"
 tch = { version = "0.16.0", features = ["download-libtorch"] }
+rand = "0.8.5"
diff --git a/runner/src/dataloader.rs b/runner/src/dataloader.rs
index 0640af8..281e4f9 100644
--- a/runner/src/dataloader.rs
+++ b/runner/src/dataloader.rs
@@ -10,6 +10,31 @@ pub struct DataLoader {
     pub pos: usize,
 }
 
+fn import_image(
+    item: &DataPoint,
+    base_path: &Path,
+    classes_len: i64,
+    inputs: &mut Vec<tch::Tensor>,
+    labels: &mut Vec<tch::Tensor>,
+) {
+    inputs.push(
+        tch::vision::image::load(base_path.join(&item.path))
+            .ok()
+            .unwrap()
+            .unsqueeze(0),
+    );
+
+    if item.class >= 0 {
+        let t = tch::Tensor::from_slice(&[item.class]).onehot(classes_len as i64);
+        labels.push(t);
+    } else {
+        labels.push(tch::Tensor::zeros(
+            [1, classes_len as i64],
+            (tch::Kind::Float, tch::Device::Cpu),
+        ))
+    }
+}
+
 impl DataLoader {
     pub fn new(
         config: Arc,
@@ -21,6 +46,14 @@ impl DataLoader {
         let min_len: i64 = len.floor() as i64;
         let max_len: i64 = len.ceil() as i64;
 
+        println!(
+            "Creating dataloader data len: {} len: {} min_len: {} max_len: {}",
+            data.len(),
+            len,
+            min_len,
+            max_len
+        );
+
         let base_path = Path::new(&config.data_path);
 
         let mut inputs: Vec<tch::Tensor> = Vec::new();
@@ -32,53 +65,27 @@ impl DataLoader {
             for image in 0..batch_size {
                 let i: usize = (batch * batch_size + image).try_into().unwrap();
                 let item = &data[i];
-                batch_acc.push(
-                    tch::vision::image::load(base_path.join(&item.path))
-                        .ok()
-                        .unwrap(),
-                );
-
-                if item.class >= 0 {
-                    let t = tch::Tensor::from_slice(&[item.class])
-                        .onehot(classes_len.try_into().unwrap());
-                    labels.push(t);
-                } else {
-                    labels.push(tch::Tensor::zeros(
-                        (classes_len),
-                        (tch::Kind::Float, tch::Device::Cpu),
-                    ))
-                }
+                import_image(item, base_path, classes_len, &mut batch_acc, &mut labels)
             }
 
             inputs.push(tch::Tensor::cat(&batch_acc[0..], 0));
             all_labels.push(tch::Tensor::cat(&labels[0..], 0));
         }
 
+        // Import the last batch that has irregular sizing
         if min_len != max_len {
            let mut batch_acc: Vec<tch::Tensor> = Vec::new();
            let mut labels: Vec<tch::Tensor> = Vec::new();
            for image in 0..(data.len() - (batch_size * min_len) as usize) {
                let i: usize = (min_len * batch_size + (image as i64)) as usize;
                let item = &data[i];
-               batch_acc.push(
-                   tch::vision::image::load(base_path.join(&item.path))
-                       .ok()
-                       .unwrap(),
-               );
-
-               if item.class >= 0 {
-                   let t = tch::Tensor::from_slice(&[item.class]).onehot(classes_len);
-                   labels.push(t);
-               } else {
-                   labels.push(tch::Tensor::zeros(
-                       classes_len,
-                       (tch::Kind::Float, tch::Device::Cpu),
-                   ))
-               }
+               import_image(item, base_path, classes_len, &mut batch_acc, &mut labels);
            }
 
            inputs.push(tch::Tensor::cat(&batch_acc[0..], 0));
            all_labels.push(tch::Tensor::cat(&labels[0..], 0));
        }
 
+        println!("ins shape: {:?}", inputs[0].size());
+
         return DataLoader {
             batch_size,
             inputs,
@@ -101,6 +108,8 @@ impl DataLoader {
         let label = self.labels[self.pos].empty_like();
         self.labels[self.pos] = self.labels[self.pos].clone(&label);
 
+        self.pos += 1;
+
         return Some((input, label));
     }
 }
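Note on the batching arithmetic in DataLoader::new above: data.len() / batch_size is floored to get the number of full batches (min_len) and ceiled to get the total batch count (max_len); when the two differ, the leftover items form one smaller final batch. A standalone sketch of that math with hypothetical sizes follows (illustrative only, not part of the patch; batch_layout is a made-up helper name):

    // Sketch of the batch-count math used by DataLoader::new.
    fn batch_layout(data_len: usize, batch_size: i64) -> (i64, i64, usize) {
        let len: f64 = (data_len as f64) / (batch_size as f64);
        let min_len: i64 = len.floor() as i64; // full batches
        let max_len: i64 = len.ceil() as i64; // total batches, including the partial one
        // Items left over for the irregular last batch.
        let remainder = data_len - (batch_size * min_len) as usize;
        (min_len, max_len, remainder)
    }

    fn main() {
        // 130 items at batch_size 64 -> 2 full batches plus a partial batch of 2.
        assert_eq!(batch_layout(130, 64), (2, 3, 2));
        // An exact multiple leaves no partial batch, so min_len == max_len.
        assert_eq!(batch_layout(128, 64), (2, 2, 0));
    }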
diff --git a/runner/src/model/mod.rs b/runner/src/model/mod.rs
index 2b0536d..38feaa4 100644
--- a/runner/src/model/mod.rs
+++ b/runner/src/model/mod.rs
@@ -39,7 +39,7 @@ pub struct DataPoint {
 }
 
 pub fn build_model(layers: Vec, last_linear_size: i64, add_sigmoid: bool) -> Model {
-    let vs = nn::VarStore::new(Device::Cpu);
+    let vs = nn::VarStore::new(Device::Cuda(0));
 
     let mut seq = nn::seq();
 
@@ -77,14 +77,32 @@ pub fn build_model(layers: Vec, last_linear_size: i64, add_sigmoid: bool)
             )
         }
         LayerType::SimpleBlock => {
-            panic!("DO not create Simple blocks yet");
             let new_last_linear_conv =
                 vec![128, last_linear_conv[1] / 2, last_linear_conv[2] / 2];
             println!(
                 "Layer: block, In: {:?}, Put: {:?}",
                 last_linear_conv, new_last_linear_conv,
            );
-            //TODO
+            let out_size = vec![new_last_linear_conv[1], new_last_linear_conv[2]];
+            seq = seq
+                .add(nn::conv2d(
+                    &vs.root(),
+                    last_linear_conv[0],
+                    128,
+                    3,
+                    nn::ConvConfig::default(),
+                ))
+                .add_fn(|xs| xs.relu())
+                .add(nn::conv2d(
+                    &vs.root(),
+                    128,
+                    128,
+                    3,
+                    nn::ConvConfig::default(),
+                ))
+                .add_fn(|xs| xs.relu())
+                .add_fn(move |xs| xs.adaptive_avg_pool2d([out_size[0], out_size[1]]))
+                .add_fn(|xs| xs.leaky_relu());
             //m_layers = append(m_layers, NewSimpleBlock(vs, lastLinearConv[0]))
             last_linear_conv = new_last_linear_conv;
         }
diff --git a/runner/src/tasks.rs b/runner/src/tasks.rs
index 60d61dc..9b54157 100644
--- a/runner/src/tasks.rs
+++ b/runner/src/tasks.rs
@@ -50,7 +50,7 @@ pub async fn fail_task(
     runner_data: Arc,
     reason: &str,
 ) -> Result<()> {
-    println!("Marking Task as faield");
+    println!("Marking Task as failed");
 
     let client = reqwest::Client::new();
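For context on the SimpleBlock added above: it is two unpadded 3x3 convolutions (each ReLU-activated) followed by an adaptive average pool, and it is the pool, not the convolutions, that pins the output to half the input's spatial size. A minimal standalone check of that shape behaviour, assuming tch 0.16 as pinned in Cargo.toml and hypothetical sizes (3 input channels, 32x32 input):

    use tch::{nn, nn::Module, Device, Kind, Tensor};

    fn main() {
        let vs = nn::VarStore::new(Device::Cpu);
        // Same structure as the block above: conv3x3 -> relu -> conv3x3 -> relu
        // -> adaptive_avg_pool2d(half size) -> leaky_relu.
        let block = nn::seq()
            .add(nn::conv2d(&vs.root(), 3, 128, 3, nn::ConvConfig::default()))
            .add_fn(|xs| xs.relu())
            .add(nn::conv2d(&vs.root(), 128, 128, 3, nn::ConvConfig::default()))
            .add_fn(|xs| xs.relu())
            .add_fn(|xs| xs.adaptive_avg_pool2d([16, 16])) // 32 / 2
            .add_fn(|xs| xs.leaky_relu());

        let x = Tensor::zeros([1, 3, 32, 32], (Kind::Float, Device::Cpu));
        // The unpadded convs shrink 32 -> 30 -> 28; the pool then fixes 16x16.
        assert_eq!(block.forward(&x).size(), vec![1, 128, 16, 16]);
    }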
diff --git a/runner/src/training.rs b/runner/src/training.rs
index 848a7cf..99423ed 100644
--- a/runner/src/training.rs
+++ b/runner/src/training.rs
@@ -5,13 +5,17 @@ use crate::{
     tasks::{fail_task, Task},
     types::{DataPointRequest, Definition, ModelClass},
 };
-use std::sync::Arc;
+use std::{
+    io::{self, Write},
+    sync::Arc,
+};
 
 use anyhow::Result;
+use rand::{seq::SliceRandom, thread_rng};
 use serde_json::json;
 use tch::{
     nn::{self, Module, OptimizerConfig},
-    Tensor,
+    Cuda, Tensor,
 };
 
 pub async fn handle_train(
@@ -36,6 +40,12 @@ pub async fn handle_train(
         .json()
         .await?;
 
+    if defs.len() == 0 {
+        println!("No defs found");
+        fail_task(task, config, runner_data, "No definitions found").await?;
+        return Ok(());
+    }
+
     let classes: Vec<ModelClass> = client
         .post(format!("{}/tasks/runner/train/classes", config.hostname))
         .header("token", &config.token)
@@ -54,7 +64,11 @@ pub async fn handle_train(
         .json()
         .await?;
 
-    let mut data_loader = DataLoader::new(config.clone(), data.testing, classes.len() as i64, 64);
+    let mut testing = data.testing;
+
+    testing.shuffle(&mut thread_rng());
+
+    let mut data_loader = DataLoader::new(config.clone(), testing, classes.len() as i64, 64);
 
     // TODO make this a vec
     let mut model: Option<Model> = None;
@@ -294,30 +308,71 @@ async fn train_definition(
         build_model(layers, 0, true)
     });
 
-    println!("here1!");
-
     // TODO CUDA
     // get device
     // Move model to cuda
 
-    let mut opt = nn::Adam::default().build(&model.vs, 1e-5)?;
+    let mut opt = nn::Adam::default().build(&model.vs, 1e-3)?;
 
-    println!("here2!");
+    let mut last_acc = 0.0;
 
-    for epoch in 1..20 {
+    for epoch in 1..40 {
         data_loader.restart();
+        let mut mean_loss: f64 = 0.0;
+        let mut mean_acc: f64 = 0.0;
         while let Some((inputs, labels)) = data_loader.next() {
-            let inputs = inputs.to_kind(tch::Kind::Float);
-            let labels = labels.to_kind(tch::Kind::Float);
-            println!("ins: {:?} labels: {:?}", inputs.size(), labels.size());
+            let inputs = inputs
+                .to_kind(tch::Kind::Float)
+                .to_device(tch::Device::Cuda(0));
+            let labels = labels
+                .to_kind(tch::Kind::Float)
+                .to_device(tch::Device::Cuda(0));
             let out = model.seq.forward(&inputs);
             let weight: Option<Tensor> = None;
             let loss = out.binary_cross_entropy(&labels, weight, tch::Reduction::Mean);
             opt.backward_step(&loss);
-            println!("out: {:?}", out);
+            mean_loss += loss
+                .to_device(tch::Device::Cpu)
+                .unsqueeze(0)
+                .double_value(&[0]);
+
+            let out = out.to_device(tch::Device::Cpu);
+
+            let test = out.empty_like();
+            _ = out.clone(&test);
+
+            let out = test.argmax(1, true);
+
+            let mut labels = labels.to_device(tch::Device::Cpu);
+
+            labels = labels.unsqueeze(-1);
+
+            let size = out.size()[0];
+
+            let mut acc = 0;
+            for i in 0..size {
+                let res = out.double_value(&[i]);
+                let exp = labels.double_value(&[i, res as i64]);
+                if exp == 1.0 {
+                    acc += 1;
+                }
+            }
+
+            mean_acc += acc as f64 / size as f64;
+            last_acc = acc as f64 / size as f64;
         }
+        print!(
+            "\repoch: {} loss: {} acc: {} l acc: {} ",
+            epoch,
+            mean_loss / data_loader.len as f64,
+            mean_acc / data_loader.len as f64,
+            last_acc
+        );
+        io::stdout().flush().expect("Unable to flush stdout");
     }
+    println!("\nlast acc: {}", last_acc);
+
     return Ok(Some(model));
 
     /*
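A note on the per-batch accuracy computed in the training loop above: it relies on the labels being one-hot rows (as the dataloader builds them), taking the argmax of each output row and checking whether the label row has a 1.0 at that index. A CPU-only sketch of the same check, using hypothetical values and assuming the same tch version:

    use tch::Tensor;

    fn main() {
        // Two samples, three classes; one-hot labels as the dataloader produces them.
        let out = Tensor::from_slice2(&[[0.1f32, 0.7, 0.2], [0.9, 0.05, 0.05]]);
        let labels = Tensor::from_slice2(&[[0.0f32, 1.0, 0.0], [0.0, 0.0, 1.0]]);

        let pred = out.argmax(1, true); // predicted class index per row
        let size = pred.size()[0];
        let mut acc = 0;
        for i in 0..size {
            let res = pred.double_value(&[i, 0]);
            // A 1.0 at the predicted index means the prediction matched the label.
            if labels.double_value(&[i, res as i64]) == 1.0 {
                acc += 1;
            }
        }
        // Sample 0 is right (class 1); sample 1 is wrong (predicted 0, labelled 2).
        assert_eq!(acc, 1);
        println!("batch accuracy: {}", acc as f64 / size as f64);
    }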