In [1]:
%matplotlib inline
import matplotlib.pyplot as plt
import numpy as np
In [2]:
import tensorflow as tf
import tensorflow.contrib.keras as keras
from tensorflow.contrib.keras import backend as K
from tensorflow.contrib.keras.python.keras.models import Sequential
from tensorflow.contrib.keras.python.keras.layers import Dense, Dropout, Flatten
In [3]:
import time
In [4]:
from enum import Enum
import math
class Dataset(Enum):
AND_GRID = 0 #Basic AND function
BASIC_GRID = 1 #Basic grid 4x4 with 8xO and 8xX
BOOL_FUNC = 2 #Boolean function of 5 variables
POLYGON = 3 #Polygon shape dividing 2D grid to 2 classes
MULTI_CLASS = 4 #Function dividing 2D grid to 8 classes
ONE_DIM = 5 #One dimensional function
ONE_DIM_MEMORY = 6 #One dimensional function with memory
def get_num_classes(dataset):
n = 2
if dataset == Dataset.MULTI_CLASS:
n = 8
elif dataset in [Dataset.ONE_DIM, Dataset.ONE_DIM_MEMORY]:
n = 1
return n
In [5]:
print("TensorFlow version =", tf.__version__)
print("Keras backend =", keras.backend.backend())
print("Default float type =", keras.backend.floatx())
print("Image data structure =", keras.backend.image_data_format())
In [6]:
def load_train_data(dataset, no_points):
x_all = np.loadtxt('data/' + dataset.name + "_" + str(no_points) + '_xs.txt')
print("Loaded",x_all.shape,"examples from", 'data/' + dataset.name + "_" + str(no_points) + '_xs.txt')
y_all = np.loadtxt('data/' + dataset.name + "_" + str(no_points) + '_ys.txt')
print("Loaded",y_all.shape,"labels from", 'data/' + dataset.name + "_" + str(no_points) + '_ys.txt')
return x_all, y_all
In [7]:
def prepare_train_data(dataset, no_points=10000, train_ratio=0.7):
print("Preparing training data for dataset", dataset.name)
x_all, y_all = load_train_data(dataset, no_points)
assert(x_all.shape[0] == y_all.shape[0])
if x_all.ndim == 1:
x_all = x_all.reshape((x_all.shape[0],1))
z_all = np.append(x_all, y_all.reshape((y_all.shape[0],1)), axis=1)
#z_all = z_all.astype('float32')
np.random.seed(0)
np.random.shuffle(z_all)
train_size = math.floor(x_all.shape[0] * train_ratio)
test_size = x_all.shape[0] - train_size
num_classes = get_num_classes(dataset)
print("Number of classes =", num_classes)
x_train = z_all[0:train_size, 0:x_all.shape[1]]
y_train = z_all[0:train_size, -1]
x_test = z_all[train_size:, 0:x_all.shape[1]]
y_test = z_all[train_size:, -1]
if num_classes > 1:
print("Changing labels to one-hot encoding...")
print('y_train[0] before changing to one-hot-encoding: ', y_train[0])
y_train = keras.utils.to_categorical(y_train, num_classes)
y_test = keras.utils.to_categorical(y_test, num_classes)
print('y_train[0] after changing to one-hot-encoding: ', y_train[0])
elif num_classes == 1:
print("Normalizing outputs of a real function to be approximated...")
y_max = z_all[:,-1].max()
print("Previous y_max =", y_max,"y_min =",z_all[:,-1].min())
z_all[:,-1] += y_max
z_all[:,-1] /= 2.0 * y_max
y_train = z_all[0:train_size, -1]
y_test = z_all[train_size:, -1]
print("After normalization y_max =", z_all[:,-1].max(),"y_min =",z_all[:,-1].min())
print("\nReturning:")
print("x_train: shape =", x_train.shape, "dtype =", x_train.dtype)
print("y_train: shape =", y_train.shape, "dtype =", y_train.dtype)
print("x_test: shape =", x_test.shape, "dtype =", x_test.dtype)
print("y_test: shape =", y_test.shape, "dtype =", y_test.dtype)
return x_train, y_train, x_test, y_test, num_classes
In [8]:
def test_model(model, x_test, y_test):
if len(x_test) > 0:
score = model.evaluate(x_test, y_test, verbose=0)
print('Test loss:', score[0])
print('Test accuracy:', score[1])
else:
print("Cannot test model: No test data supplied.")
In [9]:
def train_model(model, batch_size, epochs, x_train, y_train, x_test, y_test, plot_outputs=True, plot_epochs=500):
print("==== Training ====")
start_time = time.time()
y_predicts = []
if plot_outputs:
epochs_done = 0
while epochs_done < epochs:
epoch_start_time = time.time()
model.fit(x_train, y_train,
batch_size=batch_size,
epochs=plot_epochs,
verbose=0
#,validation_data=(x_test, y_test) #This calculates validation on test set after each epoch = too slow
)
epochs_done += plot_epochs
y_pred = model.predict(x_train)
y_predicts.append(y_pred)
print("After",epochs_done,"epochs:")
print(plot_epochs,"epochs time =", time.time() - epoch_start_time)
test_model(model, x_test, y_test)
plot_output(x_train, y_pred, y_train)
else:
model.fit(x_train, y_train,
batch_size=batch_size,
epochs=epochs,
verbose=0)
y_pred = model.predict(x_train)
y_predicts.append(y_pred)
test_model(model, x_test, y_test)
elapsed_time = time.time() - start_time
print("Total time =", elapsed_time)
return y_predicts
In [10]:
def plot_output(x_train, y_predicted, y_train):
if x_train.shape[1] > 1:
plt.scatter(x_train.T[0],x_train.T[1],c=np.argmax(y_predicted, axis=1))
plt.title("ANN output for training data")
else:
z_train = np.append(x_train, y_train.reshape((y_train.shape[0],1)), axis=1)
z_pred = np.append(x_train, y_predicted.reshape((y_predicted.shape[0],1)), axis=1)
z_train = z_train[z_train[:,0].argsort()] #Sort acc to 1st column = x values
z_pred = z_pred[z_pred[:,0].argsort()]
plt.plot(z_train[:,0], z_train[:,1], 'b-', z_pred[:,0], z_pred[:,1], 'r--')
plt.title("ANN output (red) VS training data (blue)")
plt.show()
In [22]:
def plot_train_data(x_train, y_train):
if y_train.ndim >= 2:
plt.scatter(x_train.T[0],x_train.T[1],c=np.argmax(y_train, axis=1))
else:
z = np.append(x_train, y_train.reshape((y_train.shape[0],1)), axis=1)
z = z[z[:,0].argsort()]
plt.plot(z[:,0], z[:,1], 'bo')
plt.title("Training data")
plt.show()
In [14]:
def get_new_model(dataset, x_train, num_classes, SGD_learn_rate=0.1):
model = Sequential()
if dataset in [Dataset.AND_GRID, Dataset.BASIC_GRID, Dataset.POLYGON, Dataset.MULTI_CLASS]:
if dataset == Dataset.AND_GRID:
model.add(Dense(5, input_shape=x_train[0].shape, activation='sigmoid'))
elif dataset in [Dataset.BASIC_GRID, Dataset.POLYGON, Dataset.MULTI_CLASS]:
model.add(Dense(10, input_shape=x_train[0].shape, activation='sigmoid'))
model.add(Dense(5, activation='sigmoid'))
model.add(Dense(num_classes, activation='softmax'))
model.compile(loss=keras.losses.categorical_crossentropy,
optimizer=keras.optimizers.SGD(lr=SGD_learn_rate),
metrics=['accuracy'])
elif dataset == Dataset.ONE_DIM:
model.add(Dense(10, input_shape=x_train[0].shape, activation='sigmoid'))
#model.add(Dense(5, activation='sigmoid'))
model.add(Dense(1, activation='sigmoid'))
model.compile(loss=keras.losses.mean_squared_error,
optimizer=keras.optimizers.SGD(lr=SGD_learn_rate),
metrics=['mean_squared_error'])
else:
model = None
print("Not defined yet.")
return model
In [15]:
dataset = Dataset.ONE_DIM
x_train, y_train, x_test, y_test, num_classes = prepare_train_data(dataset, no_points=900, train_ratio=1.0)
In [23]:
plot_train_data(x_train, y_train)
In [16]:
model = get_new_model(dataset, x_train, num_classes, SGD_learn_rate=1.0)
In [17]:
batch_size = 8
epochs = 4000
In [18]:
#y_predicted = train_model(model, batch_size, epochs, x_train[0:20], y_train[0:20], x_test[0:20], y_test[0:20])
y_predicts = train_model(model, batch_size, epochs, x_train, y_train, x_test, y_test, plot_epochs=1000)
In [25]:
dataset = Dataset.POLYGON
x_train, y_train, x_test, y_test, num_classes = prepare_train_data(dataset, no_points=900, train_ratio=1.0)
In [26]:
plot_train_data(x_train, y_train)
In [27]:
model = get_new_model(dataset, x_train, num_classes, SGD_learn_rate=0.1)
batch_size = 8
epochs = 1000
y_predicts = train_model(model, batch_size, epochs, x_train, y_train, x_test, y_test, plot_epochs=250)
In [ ]:
In [ ]:
In [ ]:
In [ ]:
In [ ]: