fyp/views/py/python_model_template.py
2023-10-19 10:44:13 +01:00

131 lines
3.4 KiB
Python

import tensorflow as tf
import random
import pandas as pd
from tensorflow import keras
from tensorflow.data import AUTOTUNE
from keras import layers, losses, optimizers
import requests
class NotifyServerCallback(tf.keras.callbacks.Callback):
def on_epoch_begin(self, epoch, *args, **kwargs):
if (epoch % 5) == 0:
# TODO change this
requests.get(f'http://localhost:8000/model/epoch/update?model_id={{.Model.Id}}&epoch={epoch}&definition={{.DefId}}')
DATA_DIR = "{{ .DataDir }}"
image_size = ({{ .Size }})
df = pd.read_csv("{{ .RunPath }}/train.csv", dtype=str)
keys = tf.constant(df['Id'].dropna())
values = tf.constant(list(map(int, df['Index'].dropna())))
table = tf.lookup.StaticHashTable(
initializer=tf.lookup.KeyValueTensorInitializer(
keys=keys,
values=values,
),
default_value=tf.constant(-1),
name="Indexes"
)
DATA_DIR_PREPARE = DATA_DIR + "/"
#based on https://www.tensorflow.org/tutorials/load_data/images
def pathToLabel(path):
path = tf.strings.regex_replace(path, DATA_DIR_PREPARE, "")
{{ if eq .Model.Format "png" }}
path = tf.strings.regex_replace(path, ".png", "")
{{ else if eq .Model.Format "jpeg" }}
path = tf.strings.regex_replace(path, ".jpg", "")
path = tf.strings.regex_replace(path, ".jpeg", "")
{{ else }}
ERROR
{{ end }}
return table.lookup(tf.strings.as_string([path]))
def decode_image(img):
{{ if eq .Model.Format "png" }}
img = tf.io.decode_png(img, channels={{.ColorMode}})
{{ else if eq .Model.Format "jpeg" }}
img = tf.io.decode_jpeg(img, channels={{.ColorMode}})
{{ else }}
ERROR
{{ end }}
return tf.image.resize(img, image_size)
def process_path(path):
label = pathToLabel(path)
img = tf.io.read_file(path)
img = decode_image(img)
return img, label
def configure_for_performance(ds: tf.data.Dataset) -> tf.data.Dataset:
#ds = ds.cache()
ds = ds.shuffle(buffer_size= 1000)
ds = ds.batch(batch_size)
ds = ds.prefetch(AUTOTUNE)
return ds
def prepare_dataset(ds: tf.data.Dataset) -> tf.data.Dataset:
ds = ds.map(process_path, num_parallel_calls=AUTOTUNE)
ds = configure_for_performance(ds)
return ds
seed = random.randint(0, 100000000)
batch_size = 100
# Read all the files from the direcotry
list_ds = tf.data.Dataset.list_files(str(f'{DATA_DIR}/*'), shuffle=False)
image_count = len(list_ds)
list_ds = list_ds.shuffle(image_count, seed=seed)
val_size = int(image_count * 0.3)
train_ds = list_ds.skip(val_size)
val_ds = list_ds.take(val_size)
dataset = prepare_dataset(train_ds)
dataset_validation = prepare_dataset(val_ds)
{{ if .LoadPrev }}
model = tf.keras.saving.load_model('{{.LastModelRunPath}}')
{{ else }}
model = keras.Sequential([
{{- range .Layers }}
{{- if eq .LayerType 1}}
layers.Rescaling(1./255),
{{- else if eq .LayerType 2 }}
layers.Dense({{ .Shape }}, activation="sigmoid"),
{{- else if eq .LayerType 3}}
layers.Flatten(),
{{- else }}
ERROR
{{- end }}
{{- end }}
])
{{ end }}
model.compile(
loss=losses.SparseCategoricalCrossentropy(),
optimizer=tf.keras.optimizers.Adam(),
metrics=['accuracy'])
his = model.fit(dataset, validation_data= dataset_validation, epochs={{.EPOCH_PER_RUN}}, callbacks=[NotifyServerCallback()])
acc = his.history["accuracy"]
f = open("accuracy.val", "w")
f.write(str(acc[-1]))
f.close()
tf.saved_model.save(model, "model")
model.save("model.keras")