This Jupyter notebook contains the code pipeline for a traffic sign classifier tuned for project 2 of Term 1. Python libraries used in this project:
- pickle: save and load binary Python objects
- numpy: algebra calculations
- matplotlib: plots and image loading
- tensorflow: machine learning framework
- sklearn: machine learning framework

Packages scikit-image and cv2 were tested, but not used in the final form.
In [1]:
# Load the pickled German Traffic Sign datasets (train / validation / test).
import pickle
import numpy as np

# NOTE(review): pickle.load can execute arbitrary code if a file is untrusted;
# these are local project data files, so this is acceptable here.
training_file = './traffic-signs-data/train.p'
validation_file = './traffic-signs-data/valid.p'
testing_file = './traffic-signs-data/test.p'


def _load_pickle(path):
    """Read one pickled dataset dictionary from disk."""
    with open(path, mode='rb') as f:
        return pickle.load(f)


train = _load_pickle(training_file)
valid = _load_pickle(validation_file)
test = _load_pickle(testing_file)

# Each dataset is a dict with a 'features' image array and a 'labels' vector.
X_train0, y_train0 = train['features'], train['labels']
X_valid0, y_valid0 = valid['features'], valid['labels']
X_test0, y_test0 = test['features'], test['labels']
The pickled data is a dictionary with 4 key/value pairs:
- 'features' is a 4D array containing raw pixel data of the traffic sign images, shaped (num examples, width, height, channels).
- 'labels' is a 1D array containing the label/class id of each traffic sign. The file signnames.csv contains the id -> name mapping for each id.
In [2]:
### Basic dataset summary statistics.
# Number of training examples
n_train = X_train0.shape[0]
# Number of validation examples
n_validation = X_valid0.shape[0]
# Number of testing examples
n_test = X_test0.shape[0]
# Shape of a single traffic sign image: (width, height, channels).
image_shape = tuple(X_train0.shape[1:])
# Number of unique classes/labels in the dataset.
# (np.unique(...).size is the direct form of the original
# np.max(np.unique(...).shape) idiom.)
n_classes = np.unique(y_train0).size
print("Number of training examples =", n_train)
print("Number of validation examples =", n_validation)
print("Number of testing examples =", n_test)
print("Image data shape =", image_shape)
print("Number of classes =", n_classes)
print("labels shape: ", y_train0.shape)
print(y_train0[0:4])
In [3]:
### Data exploration visualization code goes here.
import matplotlib.pyplot as plt
import matplotlib.image as mpimg
# Visualizations will be shown in the notebook.
%matplotlib inline
im1 = np.round(np.random.random_sample()*n_train).astype(int)
im2 = np.round(np.random.random_sample()*n_train).astype(int)
im3 = np.round(np.random.random_sample()*n_train).astype(int)
im4 = np.round(np.random.random_sample()*n_train).astype(int)
print(im1, im2, im3, im4)
fig, axs = plt.subplots(nrows=2, ncols=2)
axs[0, 0].imshow(X_train0[im1,:,:,:])
axs[0, 1].imshow(X_train0[im2,:,:,:])
axs[1, 0].imshow(X_train0[im3,:,:,:])
axs[1, 1].imshow(X_train0[im4,:,:,:])
plt.show()
plt.figure()
plt.hist(train['labels'], n_classes)
Out[3]:
Labels in the training set are not equally distributed.
Pre-processing is done in two steps: grayscale conversion (luminosity weighting), followed by normalization to the range [-0.5, 0.5].
In [4]:
### Preprocess the data: grayscale conversion, then normalization.
from sklearn.utils import shuffle
from datetime import datetime

# ITU-R BT.601 luma weights: gray = 0.299 R + 0.587 G + 0.114 B.
# (cv2, skimage rgb2gray and 0.21/0.72/0.07 weights were also tried;
# this variant gave the final results.)
lum = np.array([0.299, 0.587, 0.114])

# Labels need no preprocessing.
y_train = y_train0
y_valid = y_valid0

# Grayscale, vectorized over the whole set (the original looped per image).
# Truncating with astype(np.int64) reproduces the original behaviour of
# assigning the float dot-product into an integer array.
# np.int / np.float were removed in NumPy 1.24, hence the explicit dtypes.
X_train = np.dot(X_train0, lum).astype(np.int64)[..., np.newaxis]
X_valid = np.dot(X_valid0, lum).astype(np.int64)[..., np.newaxis]

# Normalize after grayscaling (one channel instead of three saves compute).
X_train = (X_train / 255.0) - 0.5
X_valid = (X_valid / 255.0) - 0.5

# check data
print("max min ", np.amin(X_train), np.amax(X_train))
fig, axs = plt.subplots(nrows=2, ncols=2)
axs[0, 0].imshow(X_train[im1, :, :, 0], cmap=plt.cm.gray)
axs[0, 1].imshow(X_train[im2, :, :, 0], cmap=plt.cm.gray)
axs[1, 0].imshow(X_train[im3, :, :, 0], cmap=plt.cm.gray)
axs[1, 1].imshow(X_train[im4, :, :, 0], cmap=plt.cm.gray)
print("Normalized grayscaled images.")
plt.show()
print("Done preprocessing.")
In [5]:
import tensorflow as tf
from tensorflow.contrib.layers import flatten
from tensorflow.python.client import device_lib
import platform

def factory(n_classes, mu = 0, sigma = 0.1):
    """Build a LeNet-style network constructor.

    Parameters
    ----------
    n_classes : int
        Size of the final logits layer (number of traffic sign classes).
    mu : float
        Mean of the truncated-normal weight initializer.
    sigma : float
        Standard deviation of the truncated-normal weight initializer.

    Returns
    -------
    callable
        LeNet(x) -> logits, where x is a (batch, 32, 32, 1) float placeholder.
    """
    def LeNet(x):
        # Layer 1: Convolutional. Input = 32x32x1, Output = 28x28x16.
        # W=32, F=5, P=0, S=1
        # out = 1 + [W-F+2P]/S => 1 + (32-5+0)/1 = 28
        # The number of filters (16) is arbitrary; see
        # https://discussions.udacity.com/t/define-input-depth-output-depth-f/238575/14
        conv1_W = tf.Variable(tf.truncated_normal(shape=(5, 5, 1, 16), mean = mu, stddev = sigma))
        conv1_b = tf.Variable(tf.zeros(16))
        conv1 = tf.nn.conv2d(x, conv1_W, strides=[1, 1, 1, 1], padding='VALID') + conv1_b
        # Activation.
        conv1 = tf.nn.relu(conv1)
        # Pooling. Input = 28x28x16, Output = 14x14x16.
        # out = 1 + [W-F+2P]/S => 1+(28-2+0)/2 = 14
        conv1 = tf.nn.max_pool(conv1, ksize=[1, 2, 2, 1], strides=[1, 2, 2, 1], padding='VALID')
        # Layer 2: Convolutional. Input = 14x14x16, Output = 10x10x32.
        # out = 1 + [W-F+2P]/S => 1+(14-5+0)/1 = 10
        conv2_W = tf.Variable(tf.truncated_normal(shape=(5, 5, 16, 32), mean = mu, stddev = sigma))
        conv2_b = tf.Variable(tf.zeros(32))
        conv2 = tf.nn.conv2d(conv1, conv2_W, strides=[1, 1, 1, 1], padding='VALID') + conv2_b
        # Activation.
        conv2 = tf.nn.relu(conv2)
        # Pooling. Input = 10x10x32, Output = 5x5x32.
        # out = 1 + [W-F+2P]/S => 1+(10-2+0)/2 = 5
        conv2 = tf.nn.max_pool(conv2, ksize=[1, 2, 2, 1], strides=[1, 2, 2, 1], padding='VALID')
        # Flatten. Input = 5x5x32. Output = 800.
        fc0 = flatten(conv2)
        # Fully Connected. Input = 800. Output = 400.
        fc1_W = tf.Variable(tf.truncated_normal(shape=(800, 400), mean = mu, stddev = sigma))
        fc1_b = tf.Variable(tf.zeros(400))
        fc1 = tf.matmul(fc0, fc1_W) + fc1_b
        # Activation.
        fc1 = tf.nn.relu(fc1)
        # Fully Connected. Input = 400. Output = 129.
        fc2_W = tf.Variable(tf.truncated_normal(shape=(400, 129), mean = mu, stddev = sigma))
        fc2_b = tf.Variable(tf.zeros(129))
        fc2 = tf.matmul(fc1, fc2_W) + fc2_b
        fc2 = tf.nn.relu(fc2)
        # Fully Connected. Input = 129. Output = 86.
        fc3_W = tf.Variable(tf.truncated_normal(shape=(129, 86), mean = mu, stddev = sigma))
        fc3_b = tf.Variable(tf.zeros(86))
        fc3 = tf.matmul(fc2, fc3_W) + fc3_b
        # Activation.
        fc3 = tf.nn.relu(fc3)
        # Final layer: Input = 86, Output = n_classes (raw logits, no ReLU).
        fc4_W = tf.Variable(tf.truncated_normal(shape=(86, n_classes), mean = mu, stddev = sigma))
        fc4_b = tf.Variable(tf.zeros(n_classes))
        logits = tf.matmul(fc3, fc4_W) + fc4_b
        return logits
    return LeNet
Input data was already split into training, validation and testing sets. This separation helps to prevent overfitting.
Running the model on AWS enables computations on GPU. For 50 epochs it takes about 3 minutes.
In [6]:
def evaluate(X_data, y_data):
    """Compute mean loss and accuracy of the current model over a dataset.

    Iterates in mini-batches so the whole set need not be fed at once.
    Relies on the module-level graph ops (loss_operation,
    accuracy_operation), the placeholders (x, y), BATCH_SIZE, and an
    active default TF session.

    Returns (mean_loss, mean_accuracy) over all examples.
    """
    n = len(X_data)
    session = tf.get_default_session()
    loss_sum = 0
    acc_sum = 0
    for start in range(0, n, BATCH_SIZE):
        xb = X_data[start:start + BATCH_SIZE]
        yb = y_data[start:start + BATCH_SIZE]
        batch_loss, batch_acc = session.run(
            [loss_operation, accuracy_operation],
            feed_dict={x: xb, y: yb})
        # Weight by batch length: the final batch may be smaller.
        loss_sum += batch_loss * len(xb)
        acc_sum += batch_acc * len(xb)
    return loss_sum / n, acc_sum / n
In [7]:
### Training pipeline: placeholders, loss, optimizer and metric ops.
x = tf.placeholder(tf.float32, (None, 32, 32, 1))   # grayscale image batch
y = tf.placeholder(tf.int32, (None))                # integer class labels
one_hot_y = tf.one_hot(y, n_classes)

# Hyperparameters.
EPOCHS = 50
BATCH_SIZE = 128
rate = 0.0005

LeNetFn = factory(n_classes)
logits = LeNetFn(x)
# NOTE(review): softmax_cross_entropy_with_logits is deprecated in favour of
# the _v2 variant; kept as-is because the labels here carry no gradient path,
# which is the only case where the two differ.
cross_entropy = tf.nn.softmax_cross_entropy_with_logits(labels=one_hot_y, logits=logits)
loss_operation = tf.reduce_mean(cross_entropy)
optimizer = tf.train.AdamOptimizer(learning_rate = rate)
training_operation = optimizer.minimize(loss_operation)

correct_prediction = tf.equal(tf.argmax(logits, 1), tf.argmax(one_hot_y, 1))
accuracy_operation = tf.reduce_mean(tf.cast(correct_prediction, tf.float32))
saver = tf.train.Saver()

print("System: ")
print(platform.uname())
print("")
# FIX: use a distinct comprehension variable — the original reused `x`, which
# (although scoped to the comprehension in Python 3) shadows the placeholder
# name and invites confusion.
devices = [d.name for d in device_lib.list_local_devices() if d.device_type == 'GPU']
print(devices)
print("")

# Per-epoch history buffers.
# (np.float was removed in NumPy 1.24 — use an explicit dtype.)
acc_epochs = np.zeros((EPOCHS,), np.float64)
loss_fn = np.zeros((EPOCHS,), np.float64)
In [8]:
# Train the network, tracking per-epoch validation accuracy and loss.
with tf.Session() as sess:
    sess.run(tf.global_variables_initializer())
    num_examples = len(X_train)
    print(datetime.now().isoformat(' '), " - Training...")
    print()
    for i in range(EPOCHS):
        # Reshuffle each epoch so mini-batches differ between epochs.
        X_train, y_train = shuffle(X_train, y_train)
        for offset in range(0, num_examples, BATCH_SIZE):
            end = offset + BATCH_SIZE
            batch_x, batch_y = X_train[offset:end], y_train[offset:end]
            sess.run(training_operation, feed_dict={x: batch_x, y: batch_y})
        # BUG FIX: the printed metric is labelled "Validation Accuracy" but was
        # computed on the training set; evaluate on the validation split
        # (as the original commented-out line intended).
        loss, validation_accuracy = evaluate(X_valid, y_valid)
        print("EPOCH {} ...".format(i+1))
        print("Validation Accuracy = {:.3f}".format(validation_accuracy))
        print()
        acc_epochs[i] = validation_accuracy
        loss_fn[i] = loss
    print(datetime.now().isoformat(' '), " - Finished training")
    saver.save(sess, './lenet/lenet')
    print("Model saved")
In [9]:
# Plot and save the training curves.
model_r_max = np.max(acc_epochs)
print("Average accuracy: ", np.mean(acc_epochs), " highest acc: ", model_r_max)

plt.figure()
plt.plot(acc_epochs)
plt.xlabel("epochs")
plt.ylabel("accuracy")
plt.title("Model Accuracy over Epochs")
# BUG FIX: savefig must precede show() — with the inline backend, show()
# finalizes and releases the figure, so saving afterwards writes a blank image.
plt.savefig("./output/train_evolution_1FD32_L5_E"+str(EPOCHS)+"_B"+str(BATCH_SIZE)+"_R"+str(rate)+"_A999.png")
plt.show()

plt.figure()
plt.plot(loss_fn)
plt.xlabel("epochs")
plt.ylabel("loss function")
plt.savefig("./output/train_loss_1FD32_L5_E"+str(EPOCHS)+"_B"+str(BATCH_SIZE)+"_R"+str(rate)+"_A999.png")
plt.show()
In [11]:
# Accuracy on the validation split, using the latest saved checkpoint.
print("Validating: ")
with tf.Session() as sess:
    checkpoint = tf.train.latest_checkpoint('./lenet')
    saver.restore(sess, checkpoint)
    _, test_accuracy = evaluate(X_valid, y_valid)
    print("Valid Accuracy = {:.3f}".format(test_accuracy))
In [12]:
# Preprocess the held-out test set and measure accuracy on it.
n_test = X_test0.shape[0]
# Same pipeline as training: BT.601 grayscale truncated to int, then
# normalize.  Vectorized over the whole set; np.int was removed in
# NumPy 1.24, so the dtype is explicit.
X_test = np.dot(X_test0, lum).astype(np.int64)[..., np.newaxis]
# normalized after grayscale and save computing costs
X_test = (X_test / 255.0) - 0.5

with tf.Session() as sess:
    saver.restore(sess, tf.train.latest_checkpoint('./lenet'))
    _, test_accuracy = evaluate(X_test, y_test0)
    print("Test Accuracy = {:.3f}".format(test_accuracy))
In [13]:
### Load five new traffic sign images found on the web.
import matplotlib.image as mpimg

# Filenames encode the true label: ni_<index>_<label>.jpg
new_image_files = ['ni_01_14.jpg', 'ni_02_15.jpg', 'ni_03_27.jpg',
                   'ni_04_30.jpg', 'ni_05_40.jpg']
ylabels = np.array([14, 15, 27, 30, 40], dtype=np.int64)

# RGB originals.  (np.float / buffer-based np.ndarray constructors removed;
# np.float itself is gone since NumPy 1.24.)
xtest20 = np.zeros((5, 32, 32, 3), np.float64)
for i, fname in enumerate(new_image_files):
    # NOTE(review): mpimg.imread returns uint8 0-255 for JPEG, which matches
    # the /255 normalization below; PNG would come back as floats in [0, 1].
    xtest20[i, :, :, :] = mpimg.imread('./new-data/' + fname)

# Same preprocessing as the training data: BT.601 grayscale, then normalize.
lum = np.array([0.299, 0.587, 0.114])
xtest21 = np.dot(xtest20, lum)[..., np.newaxis]
xtest21 = (xtest21 / 255.0) - 0.5

# Color originals.
fig, axs = plt.subplots(nrows=3, ncols=2)
axs[0, 0].imshow(xtest20[0, :, :, :])
axs[0, 1].imshow(xtest20[1, :, :, :])
axs[1, 0].imshow(xtest20[2, :, :, :])
axs[1, 1].imshow(xtest20[3, :, :, :])
axs[2, 0].imshow(xtest20[4, :, :, :])
plt.show()

# Grayscaled, normalized versions actually fed to the network.
fig, axs = plt.subplots(nrows=3, ncols=2)
axs[0, 0].imshow(xtest21[0, :, :, 0], cmap=plt.cm.gray)
axs[0, 1].imshow(xtest21[1, :, :, 0], cmap=plt.cm.gray)
axs[1, 0].imshow(xtest21[2, :, :, 0], cmap=plt.cm.gray)
axs[1, 1].imshow(xtest21[3, :, :, 0], cmap=plt.cm.gray)
axs[2, 0].imshow(xtest21[4, :, :, 0], cmap=plt.cm.gray)
plt.show()
In [14]:
# Run the restored model on the five new images.
with tf.Session() as sess:
    saver.restore(sess, tf.train.latest_checkpoint('./lenet'))
    # FIX: a single forward pass computes logits and softmax together —
    # the original ran the network twice and rebuilt the softmax op on
    # every cell execution.
    softmax_op = tf.nn.softmax(logits)
    r1, r2 = sess.run([logits, softmax_op], feed_dict={x: xtest21})
    print("Logits")
    print(r1)
    print(r1.shape)
    print("")
    print("")
    print("softmax")
    print(r2)
    print(r2.shape)
    # Probability the model assigns to the true class of each image.
    for i in range(5):
        print(i, ylabels[i], r2[i, ylabels[i]])
In [15]:
# Accuracy of the restored model on the five downloaded sign images.
with tf.Session() as sess:
    checkpoint = tf.train.latest_checkpoint('./lenet')
    saver.restore(sess, checkpoint)
    _, test_2_accuracy = evaluate(xtest21, ylabels)
    print("Test 2 Accuracy = {:.3f}".format(test_2_accuracy))
For each of these 5 new images, show the 5 highest probabilities.
In [16]:
# Top-5 logits and top-5 softmax probabilities for each new image.
with tf.Session() as sess:
    saver.restore(sess, tf.train.latest_checkpoint('./lenet'))
    top5_logits_op = tf.nn.top_k(logits, k=5)
    top5_probs_op = tf.nn.top_k(tf.nn.softmax(logits), k=5)
    # FIX: one forward pass instead of two separate sess.run calls.
    prob1, prob2 = sess.run([top5_logits_op, top5_probs_op], feed_dict={x: xtest21})
    print(prob1)
    print(prob2)
The model predicted only 2 of the 5 images correctly. The first was the "stop" sign (label 14), predicted with 100% confidence. The second was label 40 (roundabout mandatory), predicted with 61% confidence.
In [ ]: