USE_SUBSET=False
in the following cells,
In [ ]:
import warnings
with warnings.catch_warnings():
warnings.simplefilter(action='ignore', category=FutureWarning)
import tensorflow as tf
In [ ]:
# If at least one GPU is available, make only the first one visible to
# TensorFlow; otherwise report that the CPU will be used.
gpus = tf.config.experimental.list_physical_devices('GPU')
if gpus:
    # The previous `len(gpus) > 1` test reported "Using CPU" on machines
    # that have exactly one GPU.
    print("Using GPU {}".format(gpus[0]))
    tf.config.experimental.set_visible_devices(gpus[0], 'GPU')
else:
    print("Using CPU")
In [ ]:
import os
import random
import itertools
from tensorflow.keras.models import Model
from tensorflow.keras.layers import Dense, Input, Concatenate, Lambda, Dot
from tensorflow.keras.layers import Conv2D, MaxPool2D, GlobalAveragePooling2D, Flatten, Dropout
import numpy as np
from sklearn.manifold import TSNE
import matplotlib.pyplot as plt
In [ ]:
# Root folder of the dataset; later cells list one sub-folder per identity.
PATH = "lfw/lfw-deepfunneled/"
# When True, only the first 500 identity folders (sorted by name) are used.
USE_SUBSET = True
In [ ]:
# One sub-folder per identity; sorting keeps the class numbering reproducible.
dirs = sorted(os.listdir(PATH))
if USE_SUBSET:
    dirs = dirs[:500]

# Two-way mapping between identity names and integer class ids.
name_to_classid = dict(zip(dirs, range(len(dirs))))
classid_to_name = dict(zip(name_to_classid.values(), name_to_classid.keys()))
num_classes = len(name_to_classid)
print("number of classes: ", num_classes)
In each directory, there are one or more images corresponding to the identity. We map each image path to an integer id, then build a few dictionaries:
- `path_to_id` and `id_to_path`
- `classid_to_ids` and `id_to_classid`
In [ ]:
# For every class id, the sorted list of image paths found in its folder.
img_paths = {
    classid: [PATH + folder + "/" + fname
              for fname in sorted(os.listdir(PATH + folder))]
    for folder, classid in name_to_classid.items()
}

# Flatten into a single list of paths, class after class.
all_images_path = [path for paths in img_paths.values() for path in paths]

# Number the images: path -> integer id, and the inverse lookup.
path_to_id = {path: idx for idx, path in enumerate(all_images_path)}
id_to_path = {idx: path for path, idx in path_to_id.items()}
In [ ]:
# class id -> ids of the images belonging to that class
classid_to_ids = {
    classid: [path_to_id[path] for path in paths]
    for classid, paths in img_paths.items()
}
# image id -> class id (inverse of the mapping above)
id_to_classid = {
    img_id: classid
    for classid, img_ids in classid_to_ids.items()
    for img_id in img_ids
}
# Notebook output: the first 13 (image id, class id) entries.
dict(list(id_to_classid.items())[:13])
The following histogram shows the number of images per class: there are many classes with only one image. These classes are only useful as negatives, since we cannot make a positive pair with them.
In [ ]:
# Notebook output: the ten classes with the most images, as (name, count) pairs.
[
    (classid_to_name[classid], len(classid_to_ids[classid]))
    for classid in np.argsort([len(ids) for ids in classid_to_ids.values()])[::-1][:10]
]
In [ ]:
# Histogram of the number of images per class, with bin edges 1 through 9.
plt.hist([len(v) for k,v in classid_to_ids.items()], bins=range(1,10))
plt.show()
In [ ]:
# Notebook output: the ten classes with the most images, as (name, count)
# pairs. This cell repeats the one shown before the histogram.
[(classid_to_name[x], len(classid_to_ids[x]))
for x in np.argsort([len(v) for k,v in classid_to_ids.items()])[::-1][:10]]
In [ ]:
def build_pos_pairs_for_id(classid, max_num=50):
    """Return up to `max_num` randomly chosen positive pairs for one class.

    Each pair holds the ids of two distinct images of class `classid`.
    A class with a single image yields an empty list.
    """
    img_ids = classid_to_ids[classid]
    if len(img_ids) == 1:
        return []
    candidates = list(itertools.combinations(img_ids, 2))
    random.shuffle(candidates)
    # Keep only the first `max_num` pairs of the shuffled list.
    del candidates[max_num:]
    return candidates
In [ ]:
def build_neg_pairs_for_id(classid, classes, max_num=20):
    """Return up to `max_num` negative pairs of image ids for one class.

    Each pair is ``(img1, img2)`` where ``img1`` is a random image of
    `classid` and ``img2`` is a random image of another class. The other
    classes are drawn without replacement from `classes`.

    When `classes` holds fewer than `max_num` classes other than `classid`,
    one pair per available class is returned instead of raising
    ``ValueError`` from ``random.sample``.
    """
    imgs = classid_to_ids[classid]
    # Drop the anchor class up front so the sample can never contain it.
    candidates = [c for c in classes if c != classid]
    neg_classes_ids = random.sample(candidates, min(max_num, len(candidates)))
    return [
        (random.choice(imgs), random.choice(classid_to_ids[neg_classid]))
        for neg_classid in neg_classes_ids
    ]
Let's build positive and negative pairs for class 5:
In [ ]:
# Notebook output: up to 20 positive pairs for class 5.
build_pos_pairs_for_id(5, 20)
In [ ]:
# Notebook output: 6 negative pairs for class 5, with negatives drawn from all classes.
build_neg_pairs_for_id(5, list(range(num_classes)), 6)
Now that we have a way to compute the pairs, let's open all the available images. This loads every image into RAM. There are more than 1000 images, so about 100 MB of RAM will be used, which will not cause any issue.
Note: if you plan on opening more images, you should not open them all at once, but rather build a generator.
In [ ]:
from skimage.io import imread
from skimage.transform import resize
def resize100(img):
    """Resize `img` to 100x100 and return its central 60x60 crop.

    Pixel values keep their original range (`preserve_range=True`).
    """
    resized = resize(img, (100, 100), mode='reflect',
                     preserve_range=True, anti_aliasing=True)
    return resized[20:80, 20:80, :]
def open_all_images(id_to_path):
    """Load every image listed in `id_to_path` into a single array.

    Images are read in the dict's iteration order, passed through
    `resize100`, and stacked along a new leading axis.
    """
    batches = [resize100(imread(path))[np.newaxis]
               for path in id_to_path.values()]
    return np.vstack(batches)
In [ ]:
# Load every image into memory; the second line shows the resulting array shape.
all_imgs = open_all_images(id_to_path)
all_imgs.shape
In [ ]:
# Notebook output: size of the image array in megabytes (nbytes / 1e6).
str(all_imgs.nbytes / 1e6) + "MB"
The following function builds a large number of positive/negative pairs (train and test):
In [ ]:
def _build_balanced_pairs(class_ids):
    """Collect positive and negative pairs for `class_ids`, keeping labels balanced.

    Returns three parallel lists: first image ids, second image ids, and
    labels (1 for a positive pair, 0 for a negative pair). Negatives are
    drawn among `class_ids` only.
    """
    class_ids = list(class_ids)
    first_ids, second_ids, labels = [], [], []
    # Running count of positives, so the balance test does not need to
    # re-sum the whole label list for every candidate negative.
    num_pos = 0
    for class_id in class_ids:
        for img1, img2 in build_pos_pairs_for_id(class_id):
            first_ids.append(img1)
            second_ids.append(img2)
            labels.append(1)
            num_pos += 1
        for img1, img2 in build_neg_pairs_for_id(class_id, class_ids):
            # Accept a negative only while positives are in the majority.
            if num_pos > len(labels) / 2:
                first_ids.append(img1)
                second_ids.append(img2)
                labels.append(0)
    return first_ids, second_ids, labels


def build_train_test_data(split=0.8):
    """Build train and test sets of labelled image-id pairs.

    The first ``int(num_classes * split)`` classes provide the training
    pairs and the remaining classes the test pairs, so no identity is
    shared between the two sets. Training pairs are randomly permuted;
    test pairs are kept in construction order.

    Returns:
        ``(X1_ids_train, X2_ids_train, Y_ids_train,
        X1_ids_test, X2_ids_test, Y_ids_test)`` as NumPy arrays, where the
        ``X1``/``X2`` arrays hold image ids and the ``Y`` arrays hold 0/1 labels.
    """
    num_train_classes = int(num_classes * split)

    # train
    first_ids, second_ids, labels = _build_balanced_pairs(range(num_train_classes))
    perm = np.random.permutation(len(first_ids))
    X1_ids_train = np.array(first_ids)[perm]
    X2_ids_train = np.array(second_ids)[perm]
    Y_ids_train = np.array(labels)[perm]

    # test
    first_ids, second_ids, labels = _build_balanced_pairs(
        range(num_train_classes, num_classes))
    X1_ids_test = np.array(first_ids)
    X2_ids_test = np.array(second_ids)
    Y_ids_test = np.array(labels)

    return (X1_ids_train, X2_ids_train, Y_ids_train,
            X1_ids_test, X2_ids_test, Y_ids_test)
In [ ]:
# Build the pair-id arrays and labels for training and testing (default 80/20 class split).
X1_ids_train, X2_ids_train, train_Y, X1_ids_test, X2_ids_test, test_Y = build_train_test_data()
In [ ]:
# Notebook output: shapes of the training pair-id arrays and of their labels.
X1_ids_train.shape, X2_ids_train.shape, train_Y.shape
In [ ]:
# Notebook output: mean of the training labels, i.e. the fraction of pairs labelled 1.
np.mean(train_Y)
In [ ]:
# Notebook output: shapes of the test pair-id arrays and of their labels.
X1_ids_test.shape, X2_ids_test.shape, test_Y.shape
In [ ]:
# Notebook output: mean of the test labels, i.e. the fraction of pairs labelled 1.
np.mean(test_Y)
Data augmentation and generator
We're building a generator which applies data augmentation to the images on the fly, batch by batch. We use the `imgaug` library (imported as `iaa`), which offers tremendous possibilities for data augmentation.
In [ ]:
from imgaug import augmenters as iaa
# Augmentation pipeline; the batch generator applies it to both images of each pair.
seq = iaa.Sequential([
iaa.Fliplr(0.5), # horizontally flip 50% of the images
# You can add more transformations, such as random rotations or random changes of luminance.
])
In [ ]:
class Generator(tf.keras.utils.Sequence):
    """Keras `Sequence` yielding augmented batches of image pairs with their labels."""

    def __init__(self, X1, X2, Y, batch_size, all_imgs):
        # X1 / X2 hold image ids used to index `all_imgs`; Y holds the pair labels.
        self.X1, self.X2, self.Y = X1, X2, Y
        self.imgs = all_imgs
        self.batch_size = batch_size
        self.num_samples = Y.shape[0]

    def __len__(self):
        # Floor division: a trailing partial batch is never produced.
        return self.num_samples // self.batch_size

    def __getitem__(self, batch_index):
        """Return the `batch_index`-th batch as ``([imgs1, imgs2], targets)``.

        Keras chooses the order in which batches are requested by itself, and
        several may be built at the same time using multiprocessing.
        Therefore, avoid any side effect in this method!
        """
        start = batch_index * self.batch_size
        batch = slice(start, start + self.batch_size)
        imgs1 = seq.augment_images(self.imgs[self.X1[batch]])
        imgs2 = seq.augment_images(self.imgs[self.X2[batch]])
        return ([imgs1, imgs2], self.Y[batch])
In [ ]:
# Training batch generator: batches of 32 pairs, with images looked up in all_imgs.
gen = Generator(X1_ids_train, X2_ids_train, train_Y, 32, all_imgs)
In [ ]:
# len(gen) is the number of full batches the generator provides.
print("Number of batches: {}".format(len(gen)))
In [ ]:
# Fetch the first batch and show the shapes of both image arrays and of the labels.
[x1, x2], y = gen[0]
x1.shape, x2.shape, y.shape
In [ ]:
# Preview the first six pairs of the batch: x1 on the top row, x2 on the
# bottom row, with the pair label shown above each bottom image.
plt.figure(figsize=(16, 6))

# top row
for i in range(6):
    plt.subplot(2, 6, i + 1)
    plt.imshow(x1[i] / 255)
    plt.axis('off')

# bottom row
for i in range(6):
    plt.subplot(2, 6, i + 7)
    plt.imshow(x2[i] / 255)
    plt.title("similar" if y[i] == 1.0 else "different")
    plt.axis('off')

plt.show()
Exercise
- `iaa.Affine`;

Test images
In [ ]:
# Turn the test pair ids into image arrays by indexing all_imgs.
test_X1 = all_imgs[X1_ids_test]
test_X2 = all_imgs[X2_ids_test]
In [ ]:
# Notebook output: shapes of both test image arrays and of the test labels.
test_X1.shape, test_X2.shape, test_Y.shape
In [ ]:
@tf.function
def contrastive_loss(y_true, y_pred, margin=0.25):
    """Contrastive loss on a similarity score, from Hadsell et al. '06.

    http://yann.lecun.com/exdb/publis/pdf/hadsell-chopra-lecun-06.pdf

    Pairs labelled 1 are penalised by ``(1 - y_pred)**2`` and pairs labelled
    0 by ``max(y_pred - margin, 0)**2``. The mean over all elements is returned.
    """
    labels = tf.cast(y_true, "float32")
    pos_term = labels * tf.square(1 - y_pred)
    neg_term = (1 - labels) * tf.square(tf.maximum(y_pred - margin, 0))
    return tf.reduce_mean(pos_term + neg_term)
In [ ]:
@tf.function
def accuracy_sim(y_true, y_pred, threshold=0.5):
    """Compute classification accuracy with a fixed threshold on similarity.

    A pair is predicted "similar" (1) when `y_pred` is strictly above
    `threshold`. The result is the fraction of predictions equal to `y_true`.
    """
    # Cast the labels like `contrastive_loss` does: `tf.equal` needs both
    # operands to share a dtype, and labels may arrive as integers.
    y_true = tf.cast(y_true, "float32")
    y_thresholded = tf.cast(y_pred > threshold, "float32")
    return tf.reduce_mean(tf.cast(tf.equal(y_true, y_thresholded), "float32"))
Exercise
- `Flatten` the last convolutional output and plug it into a dense layer.
- Use `Dropout` prior to the Dense layer.
- Use between 32 and 128 channels on convolutional layers. Be careful: large convolutions on high dimensional images can be very slow on CPUs.
- Try to run your randomly initialized `shared_conv` model on a batch of the first 10 images from `all_imgs`. What is the expected shape of the output?
In [ ]:
class SharedConv(tf.keras.Model):
    """Exercise skeleton: convolutional network applied to each input of the pair."""

    def __init__(self):
        # `self` must not be passed explicitly: `super()` already binds it.
        super().__init__(name="sharedconv")
        # TODO

    def call(self, inputs):
        # A comment alone is not a valid function body; `pass` keeps the
        # skeleton importable until the exercise is completed.
        pass  # TODO


shared_conv = SharedConv()
In [ ]:
# %load solutions/shared_conv.py
In [ ]:
# Notebook output: shape of the full image array.
all_imgs.shape
In [ ]:
# Run the network on the first 10 images and show the shape of its output.
shared_conv.predict(all_imgs[:10]).shape
In [ ]:
# Show the Keras summary of the shared network.
shared_conv.summary()
Exercise
Assemble the siamese model by combining:
- `shared_conv` on both inputs;
- `normalize=True` on the outputs of the two `shared_conv` instance lanes;
- `accuracy_sim` function defined previously as a metric.
In [ ]:
class Siamese(tf.keras.Model):
    """Exercise skeleton: siamese model built on top of a shared network."""

    def __init__(self, shared_conv):
        # `self` must not be passed explicitly: `super()` already binds it.
        super().__init__(name="siamese")
        # TODO

    def call(self, inputs):
        pass  # TODO


model = Siamese(shared_conv)
model.compile(loss=contrastive_loss, optimizer='rmsprop', metrics=[accuracy_sim])
In [ ]:
# %load solutions/siamese.py
We can now fit the model and checkpoint it to keep the best version. We can expect to get a model with around 0.75 as "accuracy_sim" on the validation set:
In [ ]:
from tensorflow.keras.callbacks import ModelCheckpoint
from tensorflow.keras.models import load_model
best_model_fname = "siamese_checkpoint.h5"
# The siamese model is a subclassed `tf.keras.Model`, which cannot be saved
# whole to an HDF5 file, so only the weights are checkpointed (they are
# reloaded later with `load_weights`). `mode='max'` states explicitly that
# a higher validation accuracy is better.
best_model_cb = ModelCheckpoint(best_model_fname, monitor='val_accuracy_sim',
                                mode='max', save_weights_only=True,
                                save_best_only=True, verbose=1)
In [ ]:
# `fit_generator` is deprecated; `fit` accepts a `Sequence` directly.
model.fit(gen,
          epochs=15,
          validation_data=([test_X1, test_X2], test_Y),
          callbacks=[best_model_cb], verbose=2)
In [ ]:
# Reload the best checkpoint, reusing the filename given to the checkpoint callback.
model.load_weights(best_model_fname)
In [ ]:
# You may load a pre-trained model if you have the exact solution architecture.
# This model is a start, but far from perfect !
# model.load_weights("siamese_pretrained.h5")
Exercise
Finding the most similar images
- `test_images` folder to the test set;
- `most_sim` function which returns the most similar vectors to a given vector.
In [ ]:
# TODO
emb = None


def most_sim(x, emb, topn=3):
    """Exercise stub: should return the `topn` entries of `emb` most similar to `x`.

    Callers iterate the result as ``(image id, similarity)`` pairs.
    Not implemented yet, so `None` is returned.
    """
    result = None
    return result
In [ ]:
# %load solutions/most_similar.py
Most similar faces
The following code displays an image alongside its most similar images:
In [ ]:
def display(img):
    """Show `img` with matplotlib after casting it to uint8, with the axes hidden."""
    plt.imshow(img.astype('uint8'))
    plt.axis('off')
    plt.show()
In [ ]:
# Pick a random query image among the classes that have more than four images.
interesting_classes = [
    (classid, img_ids)
    for classid, img_ids in classid_to_ids.items()
    if len(img_ids) > 4
]
class_id = random.choice(interesting_classes)[0]
query_id = random.choice(classid_to_ids[class_id])
print("query:", classid_to_name[class_id], query_id)
# display(all_imgs[query_id])

# Print and show every match returned for the query embedding.
print("nearest matches")
for result_id, sim in most_sim(emb[query_id], emb):
    class_name = classid_to_name.get(id_to_classid.get(result_id))
    print(class_name, result_id, sim)
    display(all_imgs[result_id])
Note that this model is still underfitting, even when running queries against the training set. Even if the results are not correct, the mistakes often seem to "make sense" though.
Running a model to convergence on higher resolution images, possibly with a deeper and wider convolutional network might yield better results. In the next notebook we will try with a better loss and with hard negative mining.
Playing with the camera
In [ ]:
import cv2
def camera_grab(camera_id=0, fallback_filename=None):
    """Grab one frame from a camera, optionally falling back to an image file.

    Ten frames are read and only the last one is kept; it is converted from
    BGR to RGB. If that last read failed, a warning is printed and, when
    `fallback_filename` is given, the image is loaded from that file
    instead. The camera is released in every case.
    """
    camera = cv2.VideoCapture(camera_id)
    try:
        # Read 10 consecutive snapshots so the camera can tune itself
        # automatically, hoping that the contrast and lighting of the last
        # snapshot are good enough.
        for _ in range(10):
            snapshot_ok, image = camera.read()
        if not snapshot_ok:
            print("WARNING: could not access camera")
            if fallback_filename:
                image = imread(fallback_filename)
        else:
            image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
    finally:
        camera.release()
    return image
In [ ]:
# Grab a picture (or load the fallback file), embed it with the shared
# network and show its ten most similar images.
image = camera_grab(camera_id=0,
                    fallback_filename='test_images/olivier/img_olivier_0.jpeg')
x = resize100(image)
out = shared_conv(np.reshape(x, (1, 60, 60, 3)))
print("query image:")
display(x)
# `match_id` rather than `id`: a module-level loop variable named `id`
# would shadow the builtin for every later cell.
for match_id, sim in most_sim(out[0], emb, topn=10):
    class_name = classid_to_name.get(id_to_classid.get(match_id))
    if class_name is None:
        # The id has no known class; print it on its own line first.
        print(match_id)
    print(class_name, match_id, sim)
    display(all_imgs[match_id])
In [ ]: