import tensorflow as tf
import random
import pandas as pd
from tensorflow import keras
from tensorflow.data import AUTOTUNE
from keras import layers, losses, optimizers
import requests
import numpy as np


class NotifyServerCallback(tf.keras.callbacks.Callback):
    def on_epoch_end(self, epoch, log, *args, **kwargs):
        # Report the finished epoch and its validation accuracy back to the server.
        requests.get(f'{{ .Host }}/api/model/head/epoch/update?epoch={epoch + 1}&accuracy={log["val_accuracy"]}&head_id={{.HeadId}}')


DATA_DIR = "{{ .DataDir }}"
image_size = ({{ .Size }})

df = pd.read_csv("{{ .RunPath }}/train.csv", dtype=str)
keys = tf.constant(df['Id'].dropna())
values = tf.constant(list(map(int, df['Index'].dropna())))

depth = {{ .Depth }}
diff = {{ .StartPoint }}

# Static lookup table mapping image ids from the CSV to integer class indexes.
table = tf.lookup.StaticHashTable(
    initializer=tf.lookup.KeyValueTensorInitializer(
        keys=keys,
        values=values,
    ),
    default_value=tf.constant(-1),
    name="Indexes"
)

DATA_DIR_PREPARE = DATA_DIR + "/"


# based on https://www.tensorflow.org/tutorials/load_data/images
def pathToLabel(path):
    # Strip the directory prefix and the file extension to recover the image id.
    path = tf.strings.regex_replace(path, DATA_DIR_PREPARE, "")
{{ if eq .Model.Format "png" }}
    path = tf.strings.regex_replace(path, ".png", "")
{{ else if eq .Model.Format "jpeg" }}
    path = tf.strings.regex_replace(path, ".jpeg", "")
{{ else }}
    ERROR
{{ end }}

    num = tf.reshape(table.lookup(tf.strings.as_string([path])), [])
    # Unknown ids get the table default (-1); filterDataset already drops
    # them, so the zero-label branch is only a safeguard.
    return tf.cond(
        tf.math.equal(num, tf.constant(-1)),
        lambda: tf.zeros([depth]),
        lambda: tf.one_hot(num - diff, depth)
    )
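# Worked example of the label pipeline above (values are hypothetical): with
# depth = 3 and diff = 5, a file "<DATA_DIR>/img42.png" whose CSV row maps id
# "img42" to Index 6 yields the one-hot label tf.one_hot(6 - 5, 3) == [0., 1., 0.].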
old_model = keras.models.load_model("{{ .BaseModel }}")


def decode_image(img):
{{ if eq .Model.Format "png" }}
    img = tf.io.decode_png(img, channels={{.ColorMode}})
{{ else if eq .Model.Format "jpeg" }}
    img = tf.io.decode_jpeg(img, channels={{.ColorMode}})
{{ else }}
    ERROR
{{ end }}
    return tf.image.resize(img, image_size)


def process_path(path):
    label = pathToLabel(path)
    img = tf.io.read_file(path)
    img = decode_image(img)
    return img, label


def configure_for_performance(ds: tf.data.Dataset, size: int, shuffle: bool) -> tf.data.Dataset:
    # ds = ds.cache()
    if shuffle:
        ds = ds.shuffle(buffer_size=size)
    ds = ds.batch(batch_size)
    ds = ds.prefetch(AUTOTUNE)
    return ds


def prepare_dataset(ds: tf.data.Dataset, size: int) -> tf.data.Dataset:
    ds = ds.map(process_path, num_parallel_calls=AUTOTUNE)
    ds = configure_for_performance(ds, size, False)
    return ds


def filterDataset(path):
    path = tf.strings.regex_replace(path, DATA_DIR_PREPARE, "")
{{ if eq .Model.Format "png" }}
    path = tf.strings.regex_replace(path, ".png", "")
{{ else if eq .Model.Format "jpeg" }}
    path = tf.strings.regex_replace(path, ".jpeg", "")
{{ else }}
    ERROR
{{ end }}
    return tf.reshape(table.lookup(tf.strings.as_string([path])), []) != -1


seed = random.randint(0, 100000000)
batch_size = 64

# Read all the files from the directory, keeping only those present in the CSV.
list_ds = tf.data.Dataset.list_files(str(f'{DATA_DIR}/*'), shuffle=False)
list_ds = list_ds.filter(filterDataset)

image_count = len(list(list_ds.as_numpy_iterator()))
# Shuffle once and keep the order fixed across iterations so the train/val
# split and the prediction/label pairing below stay consistent.
list_ds = list_ds.shuffle(image_count, seed=seed, reshuffle_each_iteration=False)

# Hold out 30% of the images for validation.
val_size = int(image_count * 0.3)
train_ds = list_ds.skip(val_size)
val_ds = list_ds.take(val_size)

dataset = prepare_dataset(train_ds, image_count)
dataset_validation = prepare_dataset(val_ds, val_size)

track = 0


def addBlock(
        b_size: int,
        filter_size: int,
        kernel_size: int = 3,
        top: bool = True,
        pooling_same: bool = False,
        pool_func=layers.MaxPool2D
):
    global track
    model = keras.Sequential(
        name=f"{track}-{b_size}-{filter_size}-{kernel_size}"
    )
    track += 1
    # The body of the block: b_size convolution + ReLU pairs.
    for _ in range(b_size):
        model.add(layers.Conv2D(
            filter_size,
            kernel_size,
            padding="same"
        ))
        model.add(layers.ReLU())
    # "top" closes the block with pooling, normalization and dropout.
    if top:
        if pooling_same:
            model.add(pool_func(padding="same", strides=(1, 1)))
        else:
            model.add(pool_func())
        model.add(layers.BatchNormalization())
        model.add(layers.LeakyReLU())
        model.add(layers.Dropout(0.4))
    return model


# Process old data: run the base model over both splits so the new head can
# be trained directly on its outputs.
new_data = old_model.predict(dataset)
labels = np.concatenate([y for _, y in dataset], axis=0)
new_data = tf.data.Dataset.from_tensor_slices((new_data, labels))
new_data = configure_for_performance(new_data, batch_size, True)

new_data_val = old_model.predict(dataset_validation)
labels_val = np.concatenate([y for _, y in dataset_validation], axis=0)
new_data_val = tf.data.Dataset.from_tensor_slices((new_data_val, labels_val))
new_data_val = configure_for_performance(new_data_val, batch_size, True)

{{ if .LoadPrev }}
model = tf.keras.saving.load_model('{{.LastModelRunPath}}')
{{ else }}
model = keras.Sequential()
{{- range .Layers }}
{{- if eq .LayerType 1}}
model.add(layers.Rescaling(1./255))
{{- else if eq .LayerType 2 }}
model.add(layers.Dense({{ .Shape }}, activation="sigmoid"))
{{- else if eq .LayerType 3}}
model.add(layers.Flatten())
{{- else if eq .LayerType 4}}
model.add(addBlock(2, 128, 3, pool_func=layers.AveragePooling2D))
{{- else }}
ERROR
{{- end }}
{{- end }}
{{ end }}

# Rename the first layer so the server can identify this head later.
model.layers[0]._name = "head"

model.compile(
    loss=losses.BinaryCrossentropy(from_logits=False),
    optimizer=tf.keras.optimizers.Adam(),
    metrics=['accuracy'])

his = model.fit(
    new_data,
    validation_data=new_data_val,
    epochs={{.EPOCH_PER_RUN}},
    callbacks=[
        NotifyServerCallback(),
        tf.keras.callbacks.EarlyStopping("loss", mode="min", patience=5)
    ],
    use_multiprocessing=True
)

# Record the final training accuracy for the server to pick up.
acc = his.history["accuracy"]
with open("accuracy.val", "w") as f:
    f.write(str(acc[-1]))

tf.saved_model.save(model, "{{ .SaveModelPath }}/model")
model.save("{{ .SaveModelPath }}/model.keras")
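# A downstream consumer would chain the frozen base model with this head, the
# same way the head was trained above; a minimal sketch (hypothetical usage,
# not executed as part of this run; `images` stands for a batch of inputs):
#
#   base = keras.models.load_model("{{ .BaseModel }}")
#   head = keras.models.load_model("{{ .SaveModelPath }}/model.keras")
#   predictions = head.predict(base.predict(images))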