Table of Contents
In [59]:
import pickle
import random
import sys
import cv2
import matplotlib.pyplot as plt
import matplotlib.image as mpimg
import numpy as np
import pandas as pd
import tensorflow as tf
from tensorflow.contrib.layers import flatten
from sklearn.utils import shuffle
In [60]:
TRAINING_FILE = 'train.p'
VALIDATION_FILE = 'valid.p'
TESTING_FILE = 'test.p'


def _load_pickle(path):
    """Deserialize one pickled dataset split from *path*."""
    with open(path, mode='rb') as f:
        return pickle.load(f)


train = _load_pickle(TRAINING_FILE)
valid = _load_pickle(VALIDATION_FILE)
test = _load_pickle(TESTING_FILE)

# Unpack the image tensors and their class labels for each split.
X_train, y_train = train['features'], train['labels']
X_validation, y_validation = valid['features'], valid['labels']
X_test, y_test = test['features'], test['labels']
The pickled data is a dictionary with 4 key/value pairs:
'features' is a 4D array containing the raw pixel data of the traffic sign images, (num examples, width, height, channels). 'labels' is a 1D array containing the label/class id of the traffic sign. The file signnames.csv contains id -> name mappings for each id. 'sizes' is a list containing tuples, (width, height), representing the original width and height of the image. 'coords' is a list containing tuples, (x1, y1, x2, y2), representing the coordinates of a bounding box around the sign in the image.
In [61]:
# Print a summary description of the dataset splits.
n_train = len(X_train)
n_test = len(X_test)
n_validation = len(X_validation)
# Bug fix: random.randint(0, len(X_train)) is INCLUSIVE of the upper bound,
# so it could return len(X_train) and raise IndexError on the lookup below.
# randrange excludes the upper bound.
index = random.randrange(len(X_train))
image = X_train[index].squeeze()
image_shape = np.shape(image)
n_classes = len(set(y_train))
print("Number of training samples =", n_train)
print("Number of testing samples =", n_test)
print("Number of validation samples =", n_validation)
print("Image data shape =", image_shape)
print("Number of classes =", n_classes)
In [62]:
# Bar chart of the number of samples per traffic-sign class in the
# training set (class id on the x-axis, sample count on the y-axis).
class_counts = pd.Series(y_train).value_counts()
class_counts.plot.bar()
plt.show()
In [63]:
def show_image(data, labels=None):
    """
    Display four randomly chosen images from the dataset in a 2x2 grid.

    Parameters
    ----------
    data : array-like
        Collection of images; each entry must be displayable by imshow.
    labels : array-like, optional
        Class labels aligned with ``data``.  When given, each subplot is
        titled with the sample's class label.  When omitted, the sample
        index is shown and labelled as such.  (The original code titled
        the plot "Class Label {n}" where n was actually the sample
        *index*, which was misleading.)
    """
    indices = random.sample(range(len(data)), 4)
    fig = plt.figure()
    for num, n in enumerate(indices, start=1):
        fig.add_subplot(2, 2, num)
        plt.imshow(data[n].squeeze())
        if labels is not None:
            plt.title("Class Label {}".format(labels[n]))
        else:
            plt.title("Sample Index {}".format(n))
    fig.tight_layout()
    plt.show()
%matplotlib inline
In [64]:
# Preview four random images from the training split.
show_image(X_train)
In [65]:
# Preview four random images from the validation split.
show_image(X_validation)
In [66]:
# Preview four random images from the test split.
show_image(X_test)
Before training on the dataset, the data needs to be preprocessed, partly to make the computation faster.
So the first step is to normalise the data. I haven't taken the step of grayscaling the data: I think colour is an important feature in the case of signs, because even if a sign is inverted, colour could still be an important way to recognise it.
In [67]:
# Normalise every split with statistics computed on the TRAINING set only.
# The original code normalised each split with its own mean/range, which
# creates a train/eval mismatch: the model would see differently scaled
# inputs at validation and test time.
train_mean = X_train.mean()
train_range = np.max(X_train) - np.min(X_train)
X_train = (X_train - train_mean) / train_range
X_validation = (X_validation - train_mean) / train_range
X_test = (X_test - train_mean) / train_range
In [68]:
# Re-plot training samples to eyeball the effect of normalisation.
show_image(X_train)
In [69]:
# Training hyperparameters.
EPOCHS = 100  # maximum passes over the training data
BATCH_SIZE = 100  # samples per gradient step
RATE = 0.001  # Adam learning rate
MU = 0  # mean of the truncated-normal weight initialiser
SIGMA = 0.1  # std-dev of the truncated-normal weight initialiser
PADDING = 'VALID'  # padding mode for convolutions and pooling
In [70]:
# Shuffle the data so the training order is not grouped by class.
X_train, y_train = shuffle(X_train, y_train)
In [71]:
def lenet(x_input):
    """
    LeNet-style CNN for 32x32x3 traffic-sign images.

    Parameters
    ----------
    x_input : tf.Tensor
        Batch of input images with shape (N, 32, 32, 3).

    Returns
    -------
    tf.Tensor
        Unscaled class logits with shape (N, 43).
    """
    # Layer 1: Convolutional. Input = 32x32x3. Output = 28x28x6.
    # (The original comment said 32x32x1, but the filter shape (5, 5, 3, 6)
    # shows the input has 3 colour channels.)
    conv1_w = tf.Variable(tf.truncated_normal(shape=(5, 5, 3, 6), mean=MU, stddev=SIGMA))
    conv1_b = tf.Variable(tf.zeros(6))
    conv1 = tf.nn.conv2d(x_input, conv1_w, strides=[1, 1, 1, 1], padding=PADDING) + conv1_b
    # Activation.
    conv1 = tf.nn.relu(conv1)
    # Pooling. Input = 28x28x6. Output = 14x14x6.
    conv1 = tf.nn.max_pool(conv1, ksize=[1, 2, 2, 1], strides=[1, 2, 2, 1], padding=PADDING)
    # Layer 2: Convolutional. Input = 14x14x6. Output = 10x10x16.
    conv2_w = tf.Variable(tf.truncated_normal(shape=(5, 5, 6, 16), mean=MU, stddev=SIGMA))
    conv2_b = tf.Variable(tf.zeros(16))
    conv2 = tf.nn.conv2d(conv1, conv2_w, strides=[1, 1, 1, 1], padding=PADDING) + conv2_b
    # Activation.
    conv2 = tf.nn.relu(conv2)
    # Pooling. Input = 10x10x16. Output = 5x5x16.
    # Consistency fix: use the PADDING constant (it equals 'VALID') instead
    # of the hard-coded literal the original had only on this line.
    conv2 = tf.nn.max_pool(conv2, ksize=[1, 2, 2, 1], strides=[1, 2, 2, 1], padding=PADDING)
    # Flatten. Input = 5x5x16. Output = 400.
    fc0 = flatten(conv2)
    # Layer 3: Fully Connected. Input = 400. Output = 120.
    fc1_w = tf.Variable(tf.truncated_normal(shape=(400, 120), mean=MU, stddev=SIGMA))
    fc1_b = tf.Variable(tf.zeros(120))
    fc1 = tf.matmul(fc0, fc1_w) + fc1_b
    # Activation.
    fc1 = tf.nn.relu(fc1)
    # Layer 4: Fully Connected. Input = 120. Output = 84.
    fc2_w = tf.Variable(tf.truncated_normal(shape=(120, 84), mean=MU, stddev=SIGMA))
    fc2_b = tf.Variable(tf.zeros(84))
    fc2 = tf.matmul(fc1, fc2_w) + fc2_b
    # Activation.
    fc2 = tf.nn.relu(fc2)
    # Layer 5: Fully Connected (logits -- no activation). Input = 84. Output = 43.
    fc3_w = tf.Variable(tf.truncated_normal(shape=(84, 43), mean=MU, stddev=SIGMA))
    fc3_b = tf.Variable(tf.zeros(43))
    _logits = tf.matmul(fc2, fc3_w) + fc3_b
    return _logits
In [72]:
# Graph inputs: a batch of 32x32x3 images and their integer class labels.
x = tf.placeholder(tf.float32, (None, 32, 32, 3))
y = tf.placeholder(tf.int32, (None))
# One-hot encode the labels for the cross-entropy loss (43 sign classes).
one_hot_y = tf.one_hot(y, 43)
In [73]:
# Get the model: logits over the 43 classes for the input placeholder.
logits = lenet(x)
# Loss and optimizer: mean softmax cross-entropy minimised with Adam.
# NOTE(review): tf.nn.softmax_cross_entropy_with_logits is deprecated in
# later TF 1.x releases in favour of the _v2 variant -- confirm TF version.
cross_entropy = tf.nn.softmax_cross_entropy_with_logits(logits=logits, labels=one_hot_y)
loss_operation = tf.reduce_mean(cross_entropy)
optimizer = tf.train.AdamOptimizer(learning_rate=RATE)
training_operation = optimizer.minimize(loss_operation)
# Check if correctly predicted: arg-max logit must match the one-hot label.
correct_prediction = tf.equal(tf.argmax(logits, 1), tf.argmax(one_hot_y, 1))
accuracy_operation = tf.reduce_mean(tf.cast(correct_prediction, tf.float32))
# Saver used to write and later restore the trained checkpoint.
saver = tf.train.Saver()
In [74]:
def evaluate(X_data, y_data):
    """
    Compute the model's accuracy over an entire dataset.

    Runs the accuracy op batch by batch (using the module-level session,
    placeholders and BATCH_SIZE) and returns the sample-weighted average,
    so a short final batch is weighted correctly.

    Parameters
    ----------
    X_data, y_data
        Features and labels to evaluate.

    Returns
    -------
    numpy.float64
        Fraction of correctly classified samples.
    """
    n = len(X_data)
    weighted_correct = 0
    with sess.as_default():
        for start in range(0, n, BATCH_SIZE):
            stop = start + BATCH_SIZE
            feats, labs = X_data[start:stop], y_data[start:stop]
            batch_acc = sess.run(accuracy_operation, feed_dict={x: feats, y: labs})
            weighted_correct += batch_acc * len(feats)
    return weighted_correct / n
In [75]:
# Running the training
sess = tf.Session()
sess.run(tf.global_variables_initializer())
num_examples = len(X_train)
validation_accuracy = 0
print("Training...")
print()
# Train for up to EPOCHS passes; stop early once validation accuracy
# reaches 95%.  NOTE(review): indentation was lost in this export; the
# batch loop, per-epoch evaluations and prints presumably sit inside the
# epoch loop -- confirm against the original notebook.
for i in range(EPOCHS):
if validation_accuracy >= 0.95:
break
# Reshuffle each epoch so batch composition differs between epochs.
_X_train, _y_train = shuffle(X_train, y_train)
for offset in range(0, num_examples, BATCH_SIZE):
end = offset + BATCH_SIZE
batch_x, batch_y = _X_train[offset:end], _y_train[offset:end]
sess.run(training_operation, feed_dict={x: batch_x, y: batch_y})
# Evaluate on the held-out splits after each epoch.
validation_accuracy = evaluate(X_validation, y_validation)
test_accuracy = evaluate(X_test, y_test)
print("EPOCH {} ...".format(i + 1))
print("Test Accuracy = {:.3f}".format(test_accuracy))
print("Validation Accuracy = {:.3f}".format(validation_accuracy))
# Persist the trained weights as a checkpoint named './lenet'.
saver.save(sess, './lenet')
print("Model saved")
print()
print()
print("Batch size: ", BATCH_SIZE)
print("Number of epochs: ", EPOCHS)
print("Learning rate: ", RATE)
print("Mean for Truncated Normal: ", MU)
print("Standard devation for Truncated Normal: ", SIGMA)
print("Convolution type: ", PADDING)
In [76]:
from PIL import Image, ImageStat
def load_image(file_name):
    """
    Read an image file into a NumPy array.

    Prints the array type and shape on success.  If the file does not
    exist, prints the error and exits the process with status 1.
    """
    try:
        img = mpimg.imread(file_name)
    except FileNotFoundError as err:
        print(err)
        sys.exit(1)
    print("Image type: {}, and it's shape: {}".format(type(img), img.shape))
    return img
def show_image_test(image):
    """Display a single image in its own figure and show it."""
    plt.figure()
    plt.imshow(image)
    plt.show()
def get_more_info(path):
    """
    Print size, brightness, mode and contrast statistics for an image file.

    Brightness is the mean and contrast the RMS of the greyscale ('L')
    pixel values.
    """
    img = Image.open(path)
    width, height = img.size
    # Statistics are taken on the greyscale conversion of the image.
    stat = ImageStat.Stat(img.convert('L'))
    print("Height: {} and Width: {}".format(width, height))
    print("Brightness: ", stat.mean[0])
    print("Mode: ", img.mode)
    print("Contrast: ", stat.rms[0])
In [77]:
# Web image 1: a "No Entry" sign.  Load, print statistics, display.
im1_l = 'images/no_entry.jpg'
im1 = load_image(im1_l)
get_more_info(im1_l)
show_image_test(im1)
In [55]:
# Web image 2: presumably a speed-limit sign (from the file name '10.jpg').
im2_l = 'images/10.jpg'
im2 = load_image(im2_l)
get_more_info(im2_l)
show_image_test(im2)
In [56]:
# Web image 3: a "Wrong Way" sign.  Load, print statistics, display.
im3_l = 'images/wrong_way.jpg'
im3 = load_image(im3_l)
# Bug fix: the original called get_more_info(im2_l), printing image 2's
# statistics here instead of image 3's.
get_more_info(im3_l)
show_image_test(im3)
In [57]:
# Web image 4: a "Give Way" sign.  Load, print statistics, display.
im4_l = 'images/give_way_sign.jpg'
im4 = load_image(im4_l)
get_more_info(im4_l)
show_image_test(im4)
In [58]:
# Web image 5: a "Stop" sign.  Load, print statistics, display.
im5_l = 'images/stop.jpg'
im5 = load_image(im5_l)
get_more_info(im5_l)
show_image_test(im5)
In [78]:
# Resize each web image to the model's 32x32 input size; crop first where
# the sign occupies only part of the frame.  The crop boxes were chosen by
# hand for these specific images -- presumably they isolate the sign;
# confirm visually.
im1_scaled = cv2.resize(im1, (32, 32))
show_image_test(im1_scaled)
im2_cropped = im2[0:150, 60:210, :]
im2_scaled = cv2.resize(im2_cropped, (32, 32))
show_image_test(im2_scaled)
im3_scaled = cv2.resize(im3, (32, 32))
show_image_test(im3_scaled)
im4_cropped = im4[:, 0:225, :]
im4_scaled = cv2.resize(im4_cropped, (32, 32))
show_image_test(im4_scaled)
im5_cropped = im5[0:400, 250:650, :]
im5_scaled = cv2.resize(im5_cropped, (32, 32))
show_image_test(im5_scaled)
Predicting the new data with the trained model.
In [79]:
def show_pred_image(index):
    """
    Display a training example of the predicted class *index*.

    Bug fix: the original plotted X_train[index], using the predicted
    class id as a POSITIONAL index into the training set, which shows an
    unrelated image.  Instead, plot the first training sample whose label
    equals the predicted class.
    """
    matches = np.where(np.asarray(y_train) == index)[0]
    if len(matches):
        plt.imshow(X_train[matches[0]])
    else:
        print("No training example found for class", index)
In [80]:
def prediction(data):
    """
    Restore the latest checkpoint and classify a single image.

    Parameters
    ----------
    data
        One preprocessed 32x32x3 image.

    Returns
    -------
    numpy.ndarray
        One-element array holding the predicted class id.
    """
    with tf.Session() as sess:
        saver.restore(sess, tf.train.latest_checkpoint('.'))
        predicted = sess.run(tf.argmax(logits, 1), feed_dict={x: [data]})
    print(predicted)
    return predicted
In [153]:
# For each web image: predict its class, display the scaled image, then
# visualise the prediction via show_pred_image.
pred1 = prediction(im1_scaled)
show_image_test(im1_scaled)
show_pred_image(pred1[0])
In [95]:
pred2 = prediction(im2_scaled)
show_image_test(im2_scaled)
show_pred_image(pred2[0])
In [96]:
pred3 = prediction(im3_scaled)
show_image_test(im3_scaled)
show_pred_image(pred3[0])
In [97]:
pred4 = prediction(im4_scaled)
show_image_test(im4_scaled)
show_pred_image(pred4[0])
In [98]:
pred5 = prediction(im5_scaled)
show_image_test(im5_scaled)
show_pred_image(pred5[0])
In [156]:
from IPython.display import display
def get_softmax(img):
    """
    Return the top-5 softmax probabilities of the trained model for *img*.

    Bug fix: the original ran tf.global_variables_initializer() inside the
    session, which re-randomised every weight, so the reported
    probabilities came from an UNTRAINED network.  Restore the trained
    checkpoint instead, exactly as prediction() does.

    Returns
    -------
    The (values, indices) result of tf.nn.top_k with k=5.
    """
    softmax = tf.nn.softmax(logits=logits)
    with tf.Session() as sess:
        saver.restore(sess, tf.train.latest_checkpoint('.'))
        output = sess.run(softmax, feed_dict={x: [img]})
        out = sess.run(tf.nn.top_k(tf.constant(output), k=5))
    print(out)
    return out
def read_signs_csv():
    """
    Load the class-id -> sign-name mapping from signnames.csv.

    Returns
    -------
    pandas.DataFrame
        Frame with a 'SignName' column; callers look rows up via .loc
        using the class id.
    """
    return pd.read_csv('signnames.csv', delimiter=",")
def plot_max(probabilities, labels):
    """
    Bar-chart the top-k class probabilities, then tabulate them with their
    human-readable sign names from signnames.csv.
    """
    positions = np.arange(len(labels))
    plt.bar(positions, probabilities, align='center', alpha=0.5)
    plt.xticks(positions, labels)
    plt.ylabel('Probability')
    plt.xlabel('Traffic sign')
    plt.title('Model\'s certainty of its predictions')
    names = read_signs_csv()
    plt.show()
    # Build one row per (probability, label) pair; the 'Lable' key is kept
    # verbatim because it appears in the rendered table.
    rows = [
        {'Lable': lab,
         'Name': names.loc[lab]['SignName'],
         'Probability': '{:.1%}'.format(prob)}
        for prob, lab in zip(probabilities, labels)
    ]
    display(pd.DataFrame(rows))
In [134]:
# Top-5 probabilities for the "No Entry" web image.
i1 = get_softmax(im1_scaled)
st = read_signs_csv()
print()
print('Predicted Sign - {0}, Actual sign - {1}'.format(st.loc[i1[1][0][0]]['SignName'], 'No Entry'))
plot_max(i1[0][0], i1[1][0])
In [162]:
# Probability on test data
# (X_test[17] is assumed to be a No Entry sign -- confirm against labels.)
i1_1 = get_softmax(X_test[17])
print()
print('Predicted Sign - {0}, Actual sign - {1}'.format(st.loc[i1_1[1][0][0]]['SignName'], 'No Entry'))
plot_max(i1_1[0][0], i1_1[1][0])
In [135]:
# Top-5 probabilities for the "10 KM" web image.
i2 = get_softmax(im2_scaled)
st = read_signs_csv()
print()
print('Predicted Sign - {0}, Actual sign - {1}'.format(st.loc[i2[1][0][0]]['SignName'], '10 KM'))
plot_max(i2[0][0], i2[1][0])
There is no 10 KM sign in the test data.
In [137]:
# Top-5 probabilities for the "Wrong Way" web image.
i3 = get_softmax(im3_scaled)
st = read_signs_csv()
print()
print('Predicted Sign - {0}, Actual sign - {1}'.format(st.loc[i3[1][0][0]]['SignName'], 'Wrong Way'))
plot_max(i3[0][0], i3[1][0])
There is no Wrong Way sign in the test data.
In [139]:
# Top-5 probabilities for the "Give Way" web image.
i4 = get_softmax(im4_scaled)
st = read_signs_csv()
print()
print('Predicted Sign - {0}, Actual sign - {1}'.format(st.loc[i4[1][0][0]]['SignName'], 'Give Way'))
plot_max(i4[0][0], i4[1][0])
There is no Give Way in test data.
In [146]:
# Top-5 probabilities for the "Stop" web image.
i5 = get_softmax(im5_scaled)
st = read_signs_csv()
print()
print('Predicted Sign - {0}, Actual sign - {1}'.format(st.loc[i5[1][0][0]]['SignName'], 'Stop'))
plot_max(i5[0][0], i5[1][0])
In [163]:
# Probability on test data
# (X_test[14] is assumed to be a Stop sign -- confirm against labels.)
i5_1 = get_softmax(X_test[14])
print()
print('Predicted Sign - {0}, Actual sign - {1}'.format(st.loc[i5_1[1][0][0]]['SignName'], 'Stop'))
plot_max(i5_1[0][0], i5_1[1][0])
In [127]:
def outputFeatureMap(image_input, tf_activation, activation_min=-1, activation_max=-1, plt_num=1):
    """
    Visualise every feature map of a TF activation for one input image.

    Parameters
    ----------
    image_input
        One preprocessed image, fed to the global placeholder x.
    tf_activation : tf.Tensor
        The activation tensor to visualise (evaluated in the global sess).
    activation_min, activation_max : float
        Optional colour-scale limits; -1 means "not set".
    plt_num : int
        Matplotlib figure number.
    """
    activation = tf_activation.eval(session=sess, feed_dict={x: [image_input]})
    featuremaps = activation.shape[3]
    plt.figure(plt_num, figsize=(15, 15))
    for featuremap in range(featuremaps):
        # 6x8 grid: sets how many feature maps appear per row and column.
        plt.subplot(6, 8, featuremap + 1)
        plt.title('FeatureMap ' + str(featuremap))
        # Bug fix: the original used bitwise '&', which binds tighter than
        # '!=' and evaluated activation_min != (-1 & activation_max) != -1,
        # so the wrong branch could be taken.  Use logical 'and'.
        if activation_min != -1 and activation_max != -1:
            plt.imshow(activation[0, :, :, featuremap], interpolation="nearest",
                       vmin=activation_min, vmax=activation_max, cmap="gray")
        elif activation_max != -1:
            plt.imshow(activation[0, :, :, featuremap], interpolation="nearest",
                       vmax=activation_max, cmap="gray")
        elif activation_min != -1:
            plt.imshow(activation[0, :, :, featuremap], interpolation="nearest",
                       vmin=activation_min, cmap="gray")
        else:
            plt.imshow(activation[0, :, :, featuremap], interpolation="nearest", cmap="gray")
In [128]:
with tf.Session() as sess:
    # NOTE(review): tf.nn.relu(x) is applied directly to the INPUT
    # placeholder, not to a trained convolutional layer, so this
    # visualises the rectified input rather than a learned feature map
    # -- confirm the intent.
    hidden_layer = tf.nn.relu(x)
    outputFeatureMap(im5_scaled, hidden_layer, plt_num=1)
Answer
| Layers | Inputs and Outputs | Computation |
|---|---|---|
| Convolution 1 | Inputs: 32x32x3 Output: 28x28x6 |
Truncated Normal Mean: 0 Standard Deviation: 0.1 Shape: (5, 5, 3, 6) 2D Convolution strides: [1, 1, 1, 1] padding: VALID ReLU Activation Max Pooling ksize: [1, 2, 2, 1] strides: [1, 2, 2, 1] padding: VALID |
| Convolution 2 | Inputs: 14x14x6 Output: 10x10x16 |
Truncated Normal Mean: 0 Standard Deviation: 0.1 Shape: (5, 5, 6, 16) 2D Convolution strides: [1, 1, 1, 1] padding: VALID ReLU Activation Max Pooling ksize: [1, 2, 2, 1] strides: [1, 2, 2, 1] padding: VALID |
| Flatten (Reshape) | Input: 10x10x16 Output: 400 |
|
| Fully Connected 1 | Input: 400 Output: 120 |
Truncated Normal shape: (400, 120) mean: 0 sigma: 0.1 WX+B ReLU Activation |
| Fully Connected 2 | Input: 120 Output: 84 |
Truncated Normal shape: (120, 84) mean: 0 sigma: 0.1 WX+B ReLU Activation |
| Output | Input: 84 Output:43 |
Truncated Normal shape: (84, 43) mean: 0 sigma: 0.1 WX+B (logits, no activation) |
Answer
I have done 200 experiments, changing the batch size from 1 to 200 and the learning RATE from 0.01 to 0.001 (randomly). The results are as follows:
In [129]:
import os
import re
def atoi(text):
    """
    Convert *text* to an int when it is all digits, otherwise return it.

    Parameters
    ----------
    text : str
        One chunk of a file name.

    Returns
    -------
    int or str
        int(text) when text.isdigit() is true, else the original string.
    """
    return int(text) if text.isdigit() else text


def natural_keys(text):
    """
    Split *text* into alternating string/int chunks for natural sorting.

    'val12.csv' -> ['val', 12, '.csv'], so sorting with key=natural_keys
    orders numeric parts numerically rather than lexicographically.

    Parameters
    ----------
    text : str
        A string (typically a file name) possibly containing digit runs.

    Returns
    -------
    list
        List of strings and ints.
    """
    # Bug fix: use a raw string for the pattern -- '\d' in a plain literal
    # is an invalid escape sequence (DeprecationWarning today, an error in
    # future Python versions).
    return [atoi(chunk) for chunk in re.split(r'(\d+)', text)]
# Collect the per-experiment validation logs (files named val*) and sort
# them in natural (numeric) order so run 2 precedes run 10.
file_names = [filename for filename in os.listdir("log/1/") if filename.startswith("val")]
file_names.sort(key=natural_keys)
data = []
# Each file holds comma-separated values; plot the mean per run.
for names in file_names:
    data.append(np.mean(np.genfromtxt('log/1/' + names,delimiter=',')))
plt.plot(data)
plt.show()
Looking at the graph, batch sizes between 100 and 105 seem to give a stable region; anything after that appears to fluctuate a lot.