In [1]:
FLAGS = None
import os.path
import numpy as np
import tensorflow as tf
import matplotlib.pyplot as plt
import re, os, sys, time, argparse
### Suppress TensorFlow's informational and warning log messages (e.g. warnings that the
### binary was not built with optimisations for this specific CPU architecture).
os.environ['TF_CPP_MIN_LOG_LEVEL'] = '2'
In [2]:
def W(shape=None, name=None, mean=0.0, stddev=0.1, dtype=tf.float32, seed=None):
'''
Generates a trainable weight matrix with values drawn from a normal distribution with the specified mean and
standard deviation; values whose magnitude is more than 2 standard deviations from the mean are dropped and re-drawn.
Parameters
----------
shape : Int[] -> [in, out] array with the desired input and output sizes of the operation layer
name : String -> Weight matrix name
mean : Float -> mean of the distribution
stddev : Float -> standard deviation of the distribution
'''
name = name + "_Truncated_Normal"
initial = tf.truncated_normal(shape, mean, stddev, dtype, seed, name)
name = name + "_Weight_Variable"
return tf.Variable(initial, name=name)
def B(shape=None, name=None, value=0.1, dtype=tf.float32, verify_shape=False):
'''
Generates and returns a trainable bias matrix.
Parameters
----------
shape : Int[] -> [out] array with the desired output size of the operation layer
name : String -> Bias matrix name
value : Float -> Initial value of the matrix elements
'''
name = name + "_Constant"
initial = tf.constant(value, dtype, shape, name, verify_shape)
name = name + "_Bias_Variable"
return tf.Variable(initial, name=name)
def Multiply(a, b, name=None):
'''
Returns the result of a matrix multiplication.
Parameters
----------
a : tf.tensor
b : tf.tensor
name : String -> Operation name
'''
name = name + "_Mat_Multiplication"
return tf.matmul(a, b, name=name)
def Add(a, b, name=None):
'''
Returns the result of an element-wise matrix addition.
Parameters
----------
a : tf.tensor
b : tf.tensor
name : String -> Operation name
'''
name = name + "_Mat_Addition"
return tf.add(a, b, name=name)
def Drop(x, keep_prob, name=None):
name = name + "_Dropout"
return tf.nn.dropout(x, keep_prob, name=name)
def Activation(features, activation, name=None):
name = name + "_Activation_Function"
if (activation == "ELU"):
tmp = tf.nn.elu(features, name)
elif (activation == "RELU"):
tmp = tf.nn.relu(features, name)
elif (activation == "SIGMOID"):
tmp = tf.nn.sigmoid(features, name)
elif (activation == "TANH"):
tmp = tf.nn.tanh(features, name)
elif (activation == "SOFTPLUS"):
tmp = tf.nn.softplus(features, name)
elif (activation == "SOFTSIGN"):
tmp = tf.nn.softsign(features, name)
elif (activation == "SOFTMAX"):
tmp = tf.nn.softmax(features, name=name)
else:
tmp = features
return tmp
def Pool(value, ksize=None, mode="MAX", stride=None, name=None):
if (mode == "MAX"):
name = name + "_Max_Pooling"
tmp = tf.nn.max_pool(value, ksize, stride, "SAME", "NHWC", name)
elif (mode == "MEAN"):
name = name + "_Mean_Pooling"
tmp = tf.nn.avg_pool(value, ksize, stride, "SAME", "NHWC", name)
else:
tmp = value
return tmp
def Conv2D(img, filtr, strides, padding, gpu, name=None):
name = name + "_2D_Convolution"
return tf.nn.conv2d(img, filtr, strides, padding, gpu, "NHWC", name)
def BatchNorm(x, n_out, train_or_test, convo=False, name=None):
name = name + "_Batch_Normalisation"
beta = B(value=0.0, shape=[n_out], name=name+"beta")
gamma = B(value=1.0, shape=[n_out], name=name+"gamma")
if (convo == True):
m, v = tf.nn.moments(x, [0, 1, 2], name=name+'_Moments')
else:
m, v = tf.nn.moments(x, [0], name=name+'_Moments')
ema = tf.train.ExponentialMovingAverage(decay=0.9)
def mean_var_with_update():
ema_apply_op = ema.apply([m, v])
with tf.control_dependencies([ema_apply_op]):
return tf.identity(m), tf.identity(v)
mean, var = tf.cond(train_or_test, mean_var_with_update, lambda: (ema.average(m), ema.average(v)))
xbn = tf.nn.batch_normalization(x, mean, var, beta, gamma, 1e-3)
return xbn
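### Minimal, self-contained sketch (not part of the original pipeline) of how BatchNorm()
### is meant to be wired up: the train/test phase is fed through a tf.bool placeholder.
### All tensor and placeholder names below are illustrative only; assumes TensorFlow 1.x
### graph mode, like the rest of this notebook.
bn_graph = tf.Graph()
with bn_graph.as_default():
    bn_input = tf.placeholder(tf.float32, shape=[None, 4], name="bn_example_input")
    bn_phase = tf.placeholder(tf.bool, name="bn_example_phase")
    bn_output = BatchNorm(bn_input, 4, bn_phase, convo=False, name="bn_example")
    with tf.Session(graph=bn_graph) as bn_sess:
        bn_sess.run(tf.global_variables_initializer())
        # Feeding True uses batch statistics (and updates the moving averages);
        # feeding False would use the accumulated moving averages instead.
        print(bn_sess.run(bn_output, feed_dict={bn_input: np.random.rand(8, 4), bn_phase: True}))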
def dense_2_onehot(labels_dense, num_classes):
num_labels = labels_dense.shape[0]
index_offset = np.arange(num_labels) * num_classes
labels_one_hot = np.zeros((num_labels, num_classes))
labels_one_hot.flat[index_offset + labels_dense.ravel()] = 1
return labels_one_hot
def onehot_2_dense(labels_onehot):
num_labels = np.shape(labels_onehot)[0]
denses = []
for i in range(num_labels):
one_locations = np.where(np.equal(labels_onehot[i], 1))
denses.append(one_locations[0][0])
return denses
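### Quick sanity check (illustrative values only) for the two converters above: dense
### labels -> one-hot rows -> dense labels should round-trip unchanged.
example_dense = np.array([2, 0, 1])
example_onehot = dense_2_onehot(example_dense, 3)
print(example_onehot)                  # rows: [0,0,1], [1,0,0], [0,1,0]
print(onehot_2_dense(example_onehot))  # [2, 0, 1]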
def DenseLayer(Tensor, In, Out, Name = None, Function = "ELU", Dropout = None, Batchnorm = None):
Name = "Dense_" + Name
Weight = W([In, Out], Name)
Bias = B([Out], Name)
Output = Multiply(Tensor, Weight, Name)
Output = Add(Output, Bias, Name)
Output = Activation(Output, Function, Name)
if (Batchnorm != None):
Output = BatchNorm(Output, Out, Batchnorm, False, Name)
if (Dropout != None):
Output = Drop(Output, Dropout, Name)
return Output
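### Minimal sketch (not from the original notebook) of how DenseLayer() composes the helpers
### above: a single 4 -> 2 fully connected layer with an ELU activation and dropout, built
### and run in its own throw-away graph. All names and sizes here are illustrative only.
example_graph = tf.Graph()
with example_graph.as_default():
    x_example = tf.placeholder(tf.float32, shape=[None, 4], name="x_example")
    keep_example = tf.placeholder(tf.float32, name="keep_example")
    y_example = DenseLayer(x_example, 4, 2, 'example', 'ELU', Dropout=keep_example)
    with tf.Session(graph=example_graph) as example_sess:
        example_sess.run(tf.global_variables_initializer())
        out = example_sess.run(y_example, feed_dict={x_example: np.ones((3, 4)), keep_example: 1.0})
        print(out.shape)  # (3, 2)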
In [3]:
class ProgressBar(object):
DEFAULT = 'Progress: %(bar)s %(current)d/%(total)d (%(percent)3d%%)'
TRAINING = 'Training Epoch %(epoch)d: %(bar)s %(current)d/%(total)d (%(percent)3d%%)'
def __init__(self, total, width=60, fmt=DEFAULT, symbol='=', output=sys.stderr):
self._total = total
self._width = width
self._symbol = symbol
self._output = output
self._fmt = re.sub(r'(?P<name>%\(.+?\))d', r'\g<name>%dd' % len(str(total)), fmt)
self._current = 0
self._epoch = 0
def __call__(self, epoch, current):
self._current += current
self._epoch = epoch
percent = self._current / float(self._total)
size = int(self._width * percent)
remaining = self._total - self._current
bar = '[' + self._symbol * size + ' ' * (self._width - size) + ']'
args = {
'epoch' : self._epoch,
'total' : self._total,
'bar' : bar,
'current' : self._current,
'percent' : percent * 100,
'remaining': remaining}
### uncomment if running outside of jupyter-notebook
#print('\r' + self._fmt%args, file=self._output, end='')
### comment if running outside of jupyter-notebook
print(self._fmt%args)
def done(self):
self._current = self._total
### uncomment if running outside of jupyter-notebook
#print('', file=self._output)
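### Small illustration (arbitrary numbers) of the ProgressBar call convention used by
### Model.train_model() below: construct it with the total number of samples, then call
### it with (epoch, samples_just_processed) after every batch.
demo_progress = ProgressBar(100, fmt=ProgressBar.TRAINING)
for _ in range(4):
    demo_progress(1, 25)
demo_progress.done()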
In [4]:
class Dataset(object):
def __init__(self, images, labels, oneHot=False, oneHot_depth=None):
if (np.shape(images)[0] == np.shape(labels)[0]):
self._images = images
self._size = len(images)
self._epochs = 0
self._index = 0
self._onehot = oneHot
if (oneHot == True):
if (oneHot_depth is not None and oneHot_depth > 0):
self._labels = dense_2_onehot(labels, oneHot_depth)
# special case used by train_test_valid_split(): labels are already one-hot
elif (oneHot_depth == -1):
self._labels = labels
else:
print("A positive oneHot_depth must be provided when oneHot is True")
else:
self._labels = labels
else:
print("Images and Labels must be of same lenght")
@property
def size(self):
return self._size
@property
def epochs_completed(self):
return self._epochs
@property
def index(self):
return self._index
@property
def images(self):
return self._images
@property
def labels(self):
return self._labels
def get_images(self, index=None):
if (index is None):
return self._images
else:
return self._images[index]
def get_labels(self, index=None):
if (index is None):
return self._labels
else:
return self._labels[index]
def imshow(self, index):
plt.figure()
if (self._onehot == True):
plt.title(onehot_2_dense([self._labels[index]]))
else:
plt.title(self._labels[index])
img = self._images[index][:,:,0]
plt.imshow(img)
plt.show()
def train_test_valid_split(self, trn_size, tst_size, vld_size):
# Shuffle the data
if ((trn_size + tst_size + vld_size) == 1.0):
perm = np.arange(self._size)
np.random.shuffle(perm)
self._images = self._images[perm]
self._labels = self._labels[perm]
a = 0 + int(trn_size * self._size)
b = a + int(tst_size * self._size)
c = b + int(vld_size * self._size)
training_set = Dataset(self._images[0:a], self._labels[0:a], self._onehot, -1)
testing_set = Dataset(self._images[a:b], self._labels[a:b], self._onehot, -1)
if (c != 0):
validation_set = Dataset(self._images[b:c], self._labels[b:c], self._onehot, -1)
return training_set, testing_set, validation_set
else:
return training_set, testing_set
else:
print("train_size:%3.2f + test_size:%3.2f must be equal to 1" %(trn_size, tst_size))
def next_batch(self, batch_size):
tmp_x = self._size - self._index
if ((tmp_x % batch_size) != 0):
print("Batch of size %d cannot equally split set of size %d" %(batch_size, tmp_x))
else:
start = self._index
self._index += batch_size
# Finished one epoch
if (self._index > self._size):
self._epochs += 1
# Shuffle the data
perm = np.arange(self._size)
np.random.shuffle(perm)
self._images = self._images[perm]
self._labels = self._labels[perm]
# Start next epoch
start = 0
self._index = batch_size
assert batch_size <= self._size
end = self._index
return self._images[start:end], self._labels[start:end]
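### Minimal sketch of the Dataset API on synthetic data (shapes and values are made up for
### illustration): 12 fake 2x2 single-channel images with integer labels in {0, 1}, split
### 50/50 into training and testing sets, then iterated in batches of 3.
fake_imgs = np.random.rand(12, 2, 2, 1)
fake_lbls = np.random.randint(0, 2, size=12)
demo_set = Dataset(fake_imgs, fake_lbls, oneHot=True, oneHot_depth=2)
demo_train, demo_test, _ = demo_set.train_test_valid_split(0.5, 0.5, 0.0)
batch_imgs, batch_lbls = demo_train.next_batch(3)
print(batch_imgs.shape, batch_lbls.shape)  # (3, 2, 2, 1) (3, 2)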
In [5]:
class Model(object):
def __init__(self, images, labels, logits, session):
self._session = session
self._images = images
self._labels = labels
self._logits = logits
self._loss = tf.reduce_mean(tf.nn.softmax_cross_entropy_with_logits(labels=self._labels, logits=self._logits, name='xentropy'), name='xentropy_mean')
tf.summary.scalar("loss", self._loss)
learning_rate_init = 1e-3
global_step = tf.Variable(0, name="global_step", trainable=False)
learning_rate = tf.train.exponential_decay(learning_rate_init,
global_step,
decay_steps=30000,
decay_rate=0.9,
staircase=False,
name='learning_rate_decay')
tf.summary.scalar("learning_rate", learning_rate)
self._train_op = tf.train.AdamOptimizer(learning_rate).minimize(self._loss, global_step)
self._accuracy = tf.reduce_mean(tf.cast(tf.equal(
tf.argmax(self._logits, 1),
tf.argmax(self._labels, 1)),
tf.float32))
self._saver = tf.train.Saver()
self._summary = tf.summary.merge_all()
self._writer = tf.summary.FileWriter("./saved_models/", self._session.graph)
self._session.run(tf.global_variables_initializer())
def train_model(self, dataset, args, batch_size, epochs):
with self._session.as_default():
while (dataset.epochs_completed < epochs):
progress = ProgressBar(dataset.size, fmt=ProgressBar.TRAINING)
for step in range(0, dataset.size, batch_size):
imgs, lbls = dataset.next_batch(batch_size)
dictionary = {self._images:imgs, self._labels:lbls}
dictionary.update(args)
self._train_op.run(feed_dict=dictionary)
progress(dataset.epochs_completed, batch_size)
##### Write the summaries once per epoch.
summary_str = self._session.run(self._summary, feed_dict=dictionary)
self._writer.add_summary(summary_str, step)
self._writer.flush()
progress.done()
##### Save a checkpoint at the end of each epoch.
checkpoint_file = os.path.join("./saved_models/", 'model.ckpt')
self._saver.save(self._session, checkpoint_file, global_step=step)
def test_model(self, dataset, args, batch_size):
results = []
with self._session.as_default():
for _ in range(0, dataset.size, batch_size):
imgs, lbls = dataset.next_batch(batch_size)
dictionary = {self._images:imgs, self._labels:lbls}
dictionary.update(args)
a = self._accuracy.eval(feed_dict=dictionary)
results.append(a)
b = np.mean(results)
print("Testing accuracy = %g" %(b))
def predict(self, image, args, info, label="ONEHOT"):
with self._session.as_default():
img = np.reshape(image, [-1, info["Height"], info["Width"], info["Channels"]])
dictionary = {self._images:img}
dictionary.update(args)
pred = self._session.run(self._logits, feed_dict=dictionary)
if (label=="DENSE"):
# Collapse the logits to a one-hot row (largest element -> 1, the rest -> 0) before decoding.
tmp = pred / np.max(pred)
tmp[np.where(tmp < np.max(tmp))] = 0
pred = onehot_2_dense(tmp)
return pred
In [6]:
def model_architecture(l0_dense):
l1_in = 3573
l1_out = 50
l1_dense = DenseLayer(l0_dense, l1_in, l1_out, 'l1_dense', 'NONE')
#l2_in = 3136
#l2_out = 1568
#l2_dense = DenseLayer(l1_dense, l2_in, l2_out, 'l2_dense', 'ELU')
#l3_in = 1568
#l3_out = 784
#l3_dense = DenseLayer(l2_dense, l3_in, l3_out, 'l3_dense', 'SIGMOID')
#l4_in = 784
#l4_out = 392
#l4_dense = DenseLayer(l3_dense, l4_in, l4_out, 'l4_dense', 'RELU')
#l5_in = 392
#l5_out = 196
#l5_dense = DenseLayer(l4_dense, l5_in, l5_out, 'l5_dense', 'TANH')
l6_in = 50
l6_out = 1
l6_dense = DenseLayer(l1_dense, l6_in, l6_out, 'l6_dense', 'SOFTMAX')
return l6_dense
In [11]:
def main(argv):
##### Some basic information about the dataset.
Info = {"Height":1, "Width":3573, "Channels":1, "Classes":1, "Rows": 124}
Elements = Info["Height"] * Info["Width"] * Info["Channels"]
##### Loading the feature matrix and target vector, and creating a dataset.
imgs = np.load(FLAGS.path + FLAGS.features)
lbls = np.reshape(np.load(FLAGS.path + FLAGS.targets), [Info["Rows"], Info["Classes"]])
dataset = Dataset(imgs, lbls)
##### Splitting the dataset into training, testing and validation datasets.
### The arguments to train_test_valid_split() must sum to 1.0.
### e.g. dataset.train_test_valid_split(0.7, 0.3, 0.0) returns an empty validation set.
training_set, testing_set, validation_set = dataset.train_test_valid_split(0.5, 0.5, 0.00)
##### Placeholders for images and labels
images = tf.placeholder(dtype=tf.float32, shape=[None, Elements], name="Features")
labels = tf.placeholder(dtype=tf.float32, shape=[None, Info["Classes"]], name="Targets")
##### Model Architecture
logits = model_architecture(images)
##### Model.
session = tf.Session(config=tf.ConfigProto())
model = Model(images, labels, logits, session)
##### Training and Testing model.
with session.as_default():
start = time.time()
args = {}
model.train_model(training_set, args, batch_size=FLAGS.batch_size, epochs=FLAGS.epochs)
end = time.time()
print("Time taken = %3.1fs" %(end-start))
with session.as_default():
args = {}
model.test_model(testing_set, args, batch_size=FLAGS.batch_size)
In [12]:
!pwd
In [13]:
!ls "/home/MuR/barista/MillionSong"
In [14]:
np.set_printoptions(precision=3)
np.set_printoptions(suppress=True)
parser = argparse.ArgumentParser()
parser.add_argument('--epochs' , type=int, default=50)
parser.add_argument('--batch_size' , type=int, default=8)
parser.add_argument('--device' , type=str, default="/cpu:0")
parser.add_argument('--path' , type=str, default="/home/MuR/barista/MillionSong/")
parser.add_argument('--features' , type=str, default="features.npy")
parser.add_argument('--targets' , type=str, default="label.npy")
parser.add_argument('--mode' , type=str, default="TRAIN")
parser.add_argument('--logdir' , type=str, default="./saved_models/model.ckpt-60")
FLAGS, unparsed = parser.parse_known_args()
tf.reset_default_graph()
main(argv=[sys.argv[0]] + unparsed)
In [ ]: