diff --git a/logic/models/train/train.go b/logic/models/train/train.go
index ed5136f..6e73cde 100644
--- a/logic/models/train/train.go
+++ b/logic/models/train/train.go
@@ -211,33 +211,33 @@ func trainDefinitionExp(c *Context, model *BaseModel, definition_id string, load
 	if err != nil {
 		return
 	}
-	defer rows.Close()
+	defer rows.Close()
 
 	type ExpHead struct {
-		id string
+		id    string
 		start int
 		end   int
 	}
 
-	exp := ExpHead{}
+	exp := ExpHead{}
 
-	if rows.Next() {
-		if err = rows.Scan(&exp.id, &exp.start, &exp.end); err == nil {
-			return
-		}
-	} else {
-		log.Error("Failed to get the exp head of the model")
-		err = errors.New("Failed to get the exp head of the model")
-		return
-	}
+	if rows.Next() {
+		if err = rows.Scan(&exp.id, &exp.start, &exp.end); err != nil {
+			return
+		}
+	} else {
+		log.Error("Failed to get the exp head of the model")
+		err = errors.New("Failed to get the exp head of the model")
+		return
+	}
 
-	if rows.Next() {
-		log.Error("This training function can only train one model at the time")
-		err = errors.New("This training function can only train one model at the time")
-		return
-	}
+	if rows.Next() {
+		log.Error("This training function can only train one model at a time")
+		err = errors.New("This training function can only train one model at a time")
+		return
+	}
 
-	layers, err := c.Db.Query("select layer_type, shape from model_definition_layer where def_id=$1 order by layer_order asc;", definition_id)
+	layers, err := c.Db.Query("select layer_type, shape, exp_type from model_definition_layer where def_id=$1 order by layer_order asc;", definition_id)
 	if err != nil {
 		return
 	}
@@ -246,23 +246,36 @@ func trainDefinitionExp(c *Context, model *BaseModel, definition_id string, load
 	type layerrow struct {
 		LayerType int
 		Shape     string
+		ExpType   int
+		LayerNum  int
 	}
 
 	got := []layerrow{}
+	remove_top_count := 1
+
+	i := 1
+
 	for layers.Next() {
 		var row = layerrow{}
-		if err = layers.Scan(&row.LayerType, &row.Shape); err != nil {
+		if err = layers.Scan(&row.LayerType, &row.Shape, &row.ExpType); err != nil {
 			return
 		}
+		row.LayerNum = i
+		if row.ExpType == 2 {
+			remove_top_count += 1
+		}
 		row.Shape = shapeToSize(row.Shape)
 		got = append(got, row)
+		i += 1
 	}
 
-	got = append(got, layerrow{
-		LayerType: LAYER_DENSE,
-		Shape: fmt.Sprintf("%d", exp.end - exp.start),
-	})
+	got = append(got, layerrow{
+		LayerType: LAYER_DENSE,
+		Shape:     fmt.Sprintf("%d", exp.end-exp.start),
+		ExpType:   2,
+		LayerNum:  i,
+	})
 
 	// Generate run folder
 	run_path := path.Join("/tmp", model.Id, "defs", definition_id)
@@ -278,7 +291,7 @@ func trainDefinitionExp(c *Context, model *BaseModel, definition_id string, load
 		return
 	}
 
-	// TODO update the run script
+	// TODO update the run script
 
 	// Create python script
 	f, err := os.Create(path.Join(run_path, "run.py"))
@@ -287,7 +300,7 @@ func trainDefinitionExp(c *Context, model *BaseModel, definition_id string, load
 	}
 	defer f.Close()
 
-	tmpl, err := template.New("python_model_template.py").ParseFiles("views/py/python_model_template.py")
+	tmpl, err := template.New("python_model_template-exp.py").ParseFiles("views/py/python_model_template-exp.py")
 	if err != nil {
 		return
 	}
@@ -307,6 +320,7 @@ func trainDefinitionExp(c *Context, model *BaseModel, definition_id string, load
 		"LoadPrev":         load_prev,
 		"LastModelRunPath": path.Join(getDir(), result_path, "model.keras"),
 		"SaveModelPath":    path.Join(getDir(), result_path),
+		"RemoveTopCount":   remove_top_count,
 	}); err != nil {
 		return
 	}
diff --git a/views/py/python_model_template-exp.py b/views/py/python_model_template-exp.py
new file mode 100644
index 0000000..d7a40d6
--- /dev/null
+++ b/views/py/python_model_template-exp.py
@@ -0,0 +1,177 @@
+import tensorflow as tf
+import random
+import pandas as pd
+from tensorflow import keras
+from tensorflow.data import AUTOTUNE
+from keras import layers, losses, optimizers
+import requests
+
+class NotifyServerCallback(tf.keras.callbacks.Callback):
+    def on_epoch_end(self, epoch, log, *args, **kwargs):
+        requests.get(f'http://localhost:8000/model/epoch/update?model_id={{.Model.Id}}&epoch={epoch + 1}&accuracy={log["accuracy"]}&definition={{.DefId}}')
+
+
+DATA_DIR = "{{ .DataDir }}"
+image_size = ({{ .Size }})
+
+df = pd.read_csv("{{ .RunPath }}/train.csv", dtype=str)
+keys = tf.constant(df['Id'].dropna())
+values = tf.constant(list(map(int, df['Index'].dropna())))
+
+table = tf.lookup.StaticHashTable(
+    initializer=tf.lookup.KeyValueTensorInitializer(
+        keys=keys,
+        values=values,
+    ),
+    default_value=tf.constant(-1),
+    name="Indexes"
+)
+
+DATA_DIR_PREPARE = DATA_DIR + "/"
+
+# based on https://www.tensorflow.org/tutorials/load_data/images
+def pathToLabel(path):
+    path = tf.strings.regex_replace(path, DATA_DIR_PREPARE, "")
+    {{ if eq .Model.Format "png" }}
+    path = tf.strings.regex_replace(path, ".png", "")
+    {{ else if eq .Model.Format "jpeg" }}
+    path = tf.strings.regex_replace(path, ".jpeg", "")
+    {{ else }}
+    ERROR
+    {{ end }}
+    return table.lookup(tf.strings.as_string([path]))
+
+def decode_image(img):
+    {{ if eq .Model.Format "png" }}
+    img = tf.io.decode_png(img, channels={{.ColorMode}})
+    {{ else if eq .Model.Format "jpeg" }}
+    img = tf.io.decode_jpeg(img, channels={{.ColorMode}})
+    {{ else }}
+    ERROR
+    {{ end }}
+    return tf.image.resize(img, image_size)
+
+def process_path(path):
+    label = pathToLabel(path)
+
+    img = tf.io.read_file(path)
+    img = decode_image(img)
+
+    return img, label
+
+def configure_for_performance(ds: tf.data.Dataset, size: int) -> tf.data.Dataset:
+    #ds = ds.cache()
+    ds = ds.shuffle(buffer_size=size)
+    ds = ds.batch(batch_size)
+    ds = ds.prefetch(AUTOTUNE)
+    return ds
+
+def prepare_dataset(ds: tf.data.Dataset, size: int) -> tf.data.Dataset:
+    ds = ds.map(process_path, num_parallel_calls=AUTOTUNE)
+    ds = configure_for_performance(ds, size)
+    return ds
+
+def filterDataset(path):
+    path = tf.strings.regex_replace(path, DATA_DIR_PREPARE, "")
+
+    {{ if eq .Model.Format "png" }}
+    path = tf.strings.regex_replace(path, ".png", "")
+    {{ else if eq .Model.Format "jpeg" }}
+    path = tf.strings.regex_replace(path, ".jpeg", "")
+    {{ else }}
+    ERROR
+    {{ end }}
+
+    return tf.reshape(table.lookup(tf.strings.as_string([path])), []) != -1
+
+seed = random.randint(0, 100000000)
+
+batch_size = 64
+
+# Read all the files from the directory
+list_ds = tf.data.Dataset.list_files(str(f'{DATA_DIR}/*'), shuffle=False)
+list_ds = list_ds.filter(filterDataset)
+
+image_count = len(list(list_ds.as_numpy_iterator()))
+
+list_ds = list_ds.shuffle(image_count, seed=seed)
+
+val_size = int(image_count * 0.3)
+
+train_ds = list_ds.skip(val_size)
+val_ds = list_ds.take(val_size)
+
+dataset = prepare_dataset(train_ds, image_count)
+dataset_validation = prepare_dataset(val_ds, val_size)
+
+track = 0
+
+def addBlock(
+        b_size: int,
+        filter_size: int,
+        kernel_size: int = 3,
+        top: bool = True,
+        pooling_same: bool = False,
+        pool_func=layers.MaxPool2D,
+        layerNum=0
+):
+    global track
+    # model = keras.Sequential(name=f"{track}-{b_size}-{filter_size}-{kernel_size}")
+    model = keras.Sequential(name=f"layer{layerNum}")
+    track += 1
+    for _ in range(b_size):
+        model.add(layers.Conv2D(
+            filter_size,
+            kernel_size,
+            padding="same"
+        ))
+        model.add(layers.ReLU())
+    if top:
+        if pooling_same:
+            model.add(pool_func(padding="same", strides=(1, 1)))
+        else:
+            model.add(pool_func())
+        model.add(layers.BatchNormalization())
+        model.add(layers.LeakyReLU())
+        model.add(layers.Dropout(0.4))
+    return model
+
+
+{{ if .LoadPrev }}
+model = tf.keras.saving.load_model('{{.LastModelRunPath}}')
+{{ else }}
+model = keras.Sequential()
+
+{{- range .Layers }}
+{{- if eq .LayerType 1}}
+model.add(layers.Rescaling(1./255, name="layer{{ .LayerNum }}"))
+{{- else if eq .LayerType 2 }}
+model.add(layers.Dense({{ .Shape }}, activation="sigmoid", name="layer{{ .LayerNum }}"))
+{{- else if eq .LayerType 3}}
+model.add(layers.Flatten(name="layer{{ .LayerNum }}"))
+{{- else if eq .LayerType 4}}
+model.add(addBlock(2, 128, 3, pool_func=layers.AveragePooling2D, layerNum={{.LayerNum}}))
+{{- else }}
+ERROR
+{{- end }}
+{{- end }}
+{{ end }}
+
+model.compile(
+    loss=losses.SparseCategoricalCrossentropy(),
+    optimizer=tf.keras.optimizers.Adam(),
+    metrics=['accuracy'])
+
+his = model.fit(dataset, validation_data=dataset_validation, epochs={{.EPOCH_PER_RUN}}, callbacks=[
+    NotifyServerCallback(),
+    tf.keras.callbacks.EarlyStopping("loss", mode="min", patience=5)], use_multiprocessing=True)
+
+acc = his.history["accuracy"]
+
+f = open("accuracy.val", "w")
+f.write(str(acc[-1]))
+f.close()
+
+
+tf.saved_model.save(model, "{{ .SaveModelPath }}/model")
+model.save("{{ .SaveModelPath }}/model.keras")
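
Note: the Go side now computes `remove_top_count` (one plus the number of layers with `exp_type == 2`) and passes it to the template as `RemoveTopCount`, but `python_model_template-exp.py` does not reference it yet; the `// TODO update the run script` comment still stands. A minimal sketch of how the `{{ if .LoadPrev }}` branch might eventually consume it, assuming the previously saved model is a `keras.Sequential` whose top `RemoveTopCount` layers should be dropped before retraining on the new class range. The variable names below are illustrative, not part of this diff:

```python
# Hypothetical sketch, not part of this diff: drop the top N layers of the
# previously saved Sequential model before continuing training.
import tensorflow as tf
from tensorflow import keras

remove_top_count = 2  # would be rendered from {{ .RemoveTopCount }}

# Load the previous run's model, i.e. {{ .LastModelRunPath }}.
base = tf.keras.saving.load_model('model.keras')

# Rebuild a Sequential model from all but the last `remove_top_count` layers;
# a fresh head (e.g. a Dense layer sized to exp.end - exp.start) would then
# be appended before compiling and fitting as usual.
model = keras.Sequential(base.layers[:-remove_top_count])
```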