In [ ]:
import warnings
with warnings.catch_warnings():
    warnings.simplefilter(action='ignore', category=FutureWarning)
    import tensorflow as tf
In [ ]:
# If you have one or more GPUs, execute the following lines to restrict TensorFlow to the first one:
gpus = tf.config.experimental.list_physical_devices('GPU')
if gpus:
    print("Using GPU {}".format(gpus[0]))
    tf.config.experimental.set_visible_devices(gpus[0], 'GPU')
else:
    print("Using CPU")
In [ ]:
import os
import random
import itertools
import tensorflow.keras.backend as K
from tensorflow.keras.models import Model
from tensorflow.keras.layers import Dense, Input, Concatenate, Lambda, Dot
from tensorflow.keras.layers import Conv2D, MaxPool2D, GlobalAveragePooling2D, Flatten, Dropout
from tensorflow.keras import optimizers
import numpy as np
from sklearn.manifold import TSNE
import matplotlib.pyplot as plt
This part is similar to the previous notebook on siamese networks; you may simply run the cells to build the necessary inputs.
The dataset consists of folders corresponding to each identity. The folder name is the name of the person.
We map each class (identity) to an integer id and build the mappings as dictionaries name_to_classid and classid_to_name.
Set USE_SUBSET to False if you want to use the full dataset (GPU only!)
In [ ]:
PATH = "lfw/lfw-deepfunneled/"
USE_SUBSET = True
In [ ]:
dirs = sorted(os.listdir(PATH))
if USE_SUBSET:
    dirs = dirs[:500]
name_to_classid = {d:i for i,d in enumerate(dirs)}
classid_to_name = {v:k for k,v in name_to_classid.items()}
num_classes = len(name_to_classid)
print("number of classes: "+str(num_classes))
In each directory, there are one or more images of the corresponding identity. We map each image path to an integer id, then build a few dictionaries: path_to_id and id_to_path, as well as classid_to_ids and id_to_classid.
In [ ]:
# read all directories
img_paths = {c: [directory + "/" + img for img in sorted(os.listdir(PATH + directory))]
             for directory, c in name_to_classid.items()}

# retrieve all images
all_images_path = []
for img_list in img_paths.values():
    all_images_path += img_list

# map each path to an integer id
path_to_id = {path: idx for idx, path in enumerate(all_images_path)}
id_to_path = {v: k for k, v in path_to_id.items()}
In [ ]:
# build mappings between images and class
classid_to_ids = {k:[path_to_id[path] for path in v] for k,v in img_paths.items()}
id_to_classid = {v:c for c,imgs in classid_to_ids.items() for v in imgs}
The following histogram shows the number of images per class: many classes have only one image. These classes are only useful as negatives, since we cannot build a positive pair from them.
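A minimal cell to draw that histogram from the classid_to_ids mapping built above (the log scale makes the long tail visible):
In [ ]:
plt.hist([len(ids) for ids in classid_to_ids.values()], bins=50)
plt.xlabel("number of images per class")
plt.ylabel("number of classes")
plt.yscale('log')
plt.show()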
Now that we have a way to build the pairs, let's open all the images. This loads everything into RAM: there are a bit more than 1000 images, so about 100 MB will be used, which is not an issue.
Note: if you plan on using many more images, you should not load them all at once; build a generator instead (see the sketch after the next cell).
In [ ]:
from skimage.io import imread
from skimage.transform import resize

def resize100(img):
    # resize to 100x100, then crop the central 60x60 region
    return resize(img, (100, 100), preserve_range=True, mode='reflect',
                  anti_aliasing=True)[20:80, 20:80, :]

def open_all_images(id_to_path):
    all_imgs = []
    for path in id_to_path.values():
        all_imgs += [np.expand_dims(resize100(imread(PATH + path)), 0)]
    return np.vstack(all_imgs)
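As noted above, with many more images you would load them lazily instead of all at once. Here is a minimal sketch of such a batch generator, reusing resize100 (the batch_size parameter is our own addition):
In [ ]:
def image_batches(id_to_path, batch_size=32):
    # yield batches of preprocessed images, loaded from disk on demand
    paths = list(id_to_path.values())
    for start in range(0, len(paths), batch_size):
        batch = [resize100(imread(PATH + p)) for p in paths[start:start + batch_size]]
        yield np.stack(batch)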
In [ ]:
all_imgs = open_all_images(id_to_path)
mean = np.mean(all_imgs, axis=(0,1,2))
all_imgs -= mean
all_imgs.shape, str(all_imgs.nbytes / 1e6) + " MB"
The following functions build a large number of positive pairs (for train and test); negatives will be sampled later, in the generator.
In the triplet loss model, we'll define 3 inputs $(a, +, -)$ for the anchor, positive and negative images.
We relax the hard constraint that all images of the same class should be squashed to a single point: representations may live on a manifold, as long as they are closer to images of the same class than to images of different classes.
On large datasets, with careful hyperparameter tuning, triplet-based and more advanced metric learning methods beat siamese networks.
We will build positive pairs and find a way to sample negatives to obtain triplets. Note that we don't need outputs anymore (positive vs negative): we are just building triplets.
In [ ]:
def build_pos_pairs_for_id(classid, max_num=50):
    imgs = classid_to_ids[classid]
    if len(imgs) == 1:
        # a single image cannot form a positive pair
        return []
    pos_pairs = list(itertools.combinations(imgs, 2))
    random.shuffle(pos_pairs)
    return pos_pairs[:max_num]
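A quick check on one class id (the result depends on the shuffle, and is empty for classes with a single image):
In [ ]:
build_pos_pairs_for_id(5)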
In [ ]:
def build_positive_pairs(class_id_range):
    listX1 = []
    listX2 = []
    for class_id in class_id_range:
        pos = build_pos_pairs_for_id(class_id)
        for pair in pos:
            listX1 += [pair[0]]
            listX2 += [pair[1]]
    perm = np.random.permutation(len(listX1))
    return np.array(listX1)[perm], np.array(listX2)[perm]
In [ ]:
split_num = int(num_classes * 0.8)
In [ ]:
Xa_train, Xp_train = build_positive_pairs(range(0, split_num))
Xa_test, Xp_test = build_positive_pairs(range(split_num, num_classes))
# Gather the ids of all images that are used for train and test
all_img_train_idx = list(set(Xa_train) | set(Xp_train))
all_img_test_idx = list(set(Xa_test) | set(Xp_test))
We end up with 1177 different train pairs; in the generator, each pair will be completed with a randomly sampled negative image to form a triplet.
In [ ]:
Xa_train.shape, Xp_train.shape
In [ ]:
from imgaug import augmenters as iaa
seq = iaa.Sequential([
iaa.Fliplr(0.5), # horizontally flip 50% of the images
])
In [ ]:
class TripletGenerator(tf.keras.utils.Sequence):
    def __init__(self, Xa_train, Xp_train, batch_size, all_imgs, neg_imgs_idx):
        self.batch_size = batch_size
        self.imgs = all_imgs
        self.Xa = Xa_train  # Anchors
        self.Xp = Xp_train  # Positives
        self.num_samples = Xa_train.shape[0]
        self.neg_imgs_idx = neg_imgs_idx

    def __len__(self):
        return self.num_samples // self.batch_size

    def __getitem__(self, batch_index):
        low_index = batch_index * self.batch_size
        high_index = (batch_index + 1) * self.batch_size

        imgs_a = self.Xa[low_index:high_index]  # Anchors
        imgs_p = self.Xp[low_index:high_index]  # Positives
        imgs_n = random.sample(self.neg_imgs_idx, imgs_a.shape[0])  # Negatives

        imgs_a = seq.augment_images(self.imgs[imgs_a])
        imgs_p = seq.augment_images(self.imgs[imgs_p])
        imgs_n = seq.augment_images(self.imgs[imgs_n])

        # We also return a zero vector as a placeholder for the targets;
        # it is ignored by the loss:
        return ([imgs_a, imgs_p, imgs_n], np.zeros(shape=(imgs_a.shape[0])))
In [ ]:
batch_size = 128
gen = TripletGenerator(Xa_train, Xp_train, batch_size, all_imgs, all_img_train_idx)
In [ ]:
len(all_img_test_idx), len(gen)
In [ ]:
[xa, xp, xn], y = gen[0]
xa.shape, xp.shape, xn.shape
In [ ]:
plt.figure(figsize=(16, 9))
for i in range(5):
    plt.subplot(3, 5, i + 1)
    plt.title("anchor")
    plt.imshow((xa[i] + mean) / 255)
    plt.axis('off')
for i in range(5):
    plt.subplot(3, 5, i + 6)
    plt.title("positive")
    plt.imshow((xp[i] + mean) / 255)
    plt.axis('off')
for i in range(5):
    plt.subplot(3, 5, i + 11)
    plt.title("negative")
    plt.imshow((xn[i] + mean) / 255)
    plt.axis('off')
plt.show()
As you can see, choosing the negatives randomly can be inefficient: for example, an old man will likely be a too-easy negative if the anchor is a young woman.
In [ ]:
gen_test = TripletGenerator(Xa_test, Xp_test, 32, all_imgs, all_img_test_idx)
len(gen_test)
The loss of the triplet model is as follows:

$$\max(0, ||x_a - x_p||_2 - ||x_a - x_n||_2 + \alpha)$$

We'll be using cosine similarities instead of euclidean distances (they seem to work a bit better in this case), so the loss becomes:

$$\max(0, \cos(x_a, x_n) - \cos(x_a, x_p) + \alpha)$$
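For instance, with margin $\alpha = 0.5$: if $\cos(x_a, x_p) = 0.8$ and $\cos(x_a, x_n) = 0.3$, the loss is $\max(0, 0.3 - 0.8 + 0.5) = 0$, since the triplet is already well separated; with the two similarities swapped, the loss would be $\max(0, 0.8 - 0.3 + 0.5) = 1$.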
In [ ]:
# Build a loss which ignores y_true, as we'll only
# be passing zeros as targets
def identity_loss(y_true, y_pred):
    return K.mean(y_pred - 0 * y_true)

# The real loss is here
def cosine_triplet_loss(X, margin=0.5):
    positive_sim, negative_sim = X

    # batch loss
    losses = K.maximum(0.0, negative_sim - positive_sim + margin)
    return K.mean(losses)
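A quick numeric check of the loss on toy similarity values, matching the worked example above:
In [ ]:
# well-separated triplet: loss 0.0; swapped similarities: loss 1.0
print(cosine_triplet_loss([K.constant([0.8]), K.constant([0.3])]).numpy())
print(cosine_triplet_loss([K.constant([0.3]), K.constant([0.8])]).numpy())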
In [ ]:
class SharedConv(tf.keras.Model):
    def __init__(self):
        super().__init__(name="sharedconv")

        self.conv1 = Conv2D(16, 3, activation="relu", padding="same")
        self.conv2 = Conv2D(16, 3, activation="relu", padding="same")
        self.pool1 = MaxPool2D((2, 2))  # 30,30
        self.conv3 = Conv2D(32, 3, activation="relu", padding="same")
        self.conv4 = Conv2D(32, 3, activation="relu", padding="same")
        self.pool2 = MaxPool2D((2, 2))  # 15,15
        self.conv5 = Conv2D(64, 3, activation="relu", padding="same")
        self.conv6 = Conv2D(64, 3, activation="relu", padding="same")
        self.pool3 = MaxPool2D((2, 2))  # 7,7
        self.conv7 = Conv2D(64, 3, activation="relu", padding="same")
        self.conv8 = Conv2D(32, 3, activation="relu", padding="same")
        self.flatten = Flatten()
        self.dropout1 = Dropout(0.2)
        self.fc1 = Dense(40, activation="tanh")
        self.dropout2 = Dropout(0.2)
        self.fc2 = Dense(64)

    def call(self, inputs):
        x = self.pool1(self.conv2(self.conv1(inputs)))
        x = self.pool2(self.conv4(self.conv3(x)))
        x = self.pool3(self.conv6(self.conv5(x)))
        x = self.flatten(self.conv8(self.conv7(x)))
        x = self.fc1(self.dropout1(x))
        return self.fc2(self.dropout2(x))

shared_conv = SharedConv()
In [ ]:
class TripletNetwork(tf.keras.Model):
    def __init__(self, shared_conv):
        super().__init__(name="tripletnetwork")
        # TODO

    def call(self, inputs):
        pass  # TODO

model_triplet = TripletNetwork(shared_conv)
model_triplet.compile(loss=identity_loss, optimizer="rmsprop")
In [ ]:
# %load solutions/triplet.py
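If you don't have the solution file at hand, here is a minimal sketch of one possible implementation (not necessarily identical to solutions/triplet.py): the Dot layer with normalize=True computes the cosine similarity between embeddings, and the model directly outputs the batch triplet loss, which is why it is compiled with identity_loss.
In [ ]:
class TripletNetwork(tf.keras.Model):
    def __init__(self, shared_conv):
        super().__init__(name="tripletnetwork")
        self.shared_conv = shared_conv
        # a normalized dot product is a cosine similarity
        self.cosine = Dot(axes=-1, normalize=True)

    def call(self, inputs):
        anchor, positive, negative = inputs
        anchor_emb = self.shared_conv(anchor)
        positive_emb = self.shared_conv(positive)
        negative_emb = self.shared_conv(negative)
        pos_sim = self.cosine([anchor_emb, positive_emb])
        neg_sim = self.cosine([anchor_emb, negative_emb])
        # the model's output is the loss itself
        return cosine_triplet_loss([pos_sim, neg_sim])

model_triplet = TripletNetwork(shared_conv)
model_triplet.compile(loss=identity_loss, optimizer="rmsprop")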
In [ ]:
from tensorflow.keras.callbacks import ModelCheckpoint
from tensorflow.keras.models import load_model

best_model_fname = "triplet_checkpoint_b2.h5"
# save_weights_only: subclassed models cannot be fully serialized to HDF5
best_model_cb = ModelCheckpoint(best_model_fname, monitor='val_loss',
                                save_best_only=True, save_weights_only=True,
                                verbose=1)
Warning: the following training may take several minutes, especially on CPU.
In [ ]:
history = model_triplet.fit(gen,
epochs=10,
validation_data = gen_test,
callbacks=[best_model_cb])
In [ ]:
plt.plot(history.history['loss'], label='train')
plt.plot(history.history['val_loss'], label='validation')
plt.ylim(0, 0.5)
plt.legend(loc='best')
plt.title('Loss');
In [ ]:
model_triplet.load_weights("triplet_checkpoint_b2.h5")
Exercise
In [ ]:
# You may load this model
# Trained on triplets but with larger dataset
# Far from perfect!
# model_triplet.load_weights("triplet_pretrained.h5")
In [ ]:
emb = shared_conv.predict(all_imgs)
emb = emb / np.linalg.norm(emb, axis=-1, keepdims=True)
pixelwise = np.reshape(all_imgs, (all_imgs.shape[0], 60*60*3))
In [ ]:
def most_sim(idx, topn=5, mode="cosine"):
    x = emb[idx]
    if mode == "cosine":
        x = x / np.linalg.norm(x)
        sims = np.dot(emb, x)
        ids = np.argsort(sims)[::-1]
        return [(id, sims[id]) for id in ids[:topn]]
    elif mode == "euclidean":
        dists = np.linalg.norm(emb - x, axis=-1)
        ids = np.argsort(dists)
        return [(id, dists[id]) for id in ids[:topn]]
    else:
        dists = np.linalg.norm(pixelwise - pixelwise[idx], axis=-1)
        ids = np.argsort(dists)
        return [(id, dists[id]) for id in ids[:topn]]
In [ ]:
def display(img):
    img = img.astype('uint8')
    plt.imshow(img)
    plt.axis('off')
    plt.show()
In [ ]:
interesting_classes = list(filter(lambda x: len(x[1]) > 4, classid_to_ids.items()))
class_idx = random.choice(interesting_classes)[0]
print(class_idx)
img_idx = random.choice(classid_to_ids[class_idx])
for id, sim in most_sim(img_idx):
    display(all_imgs[id] + mean)
    print((classid_to_name[id_to_classid[id]], id, sim))
In [ ]:
test_ids = []
for class_id in range(split_num, num_classes):
    img_ids = classid_to_ids[class_id]
    if len(img_ids) > 1:
        test_ids += img_ids
In [ ]:
print(len(test_ids))
In [ ]:
# number of test classes with at least 2 images
len([x for x in range(split_num, num_classes) if len(classid_to_ids[x]) > 1])
In [ ]:
def recall_k(k=10, mode="embedding"):
    num_found = 0
    for img_idx in test_ids:
        image_class = id_to_classid[img_idx]
        found_classes = []
        if mode == "embedding":
            found_classes = [id_to_classid[x] for (x, score) in most_sim(img_idx, topn=k + 1)[1:]]
        elif mode == "random":
            found_classes = [id_to_classid[x] for x in random.sample(
                list(set(all_img_test_idx + all_img_train_idx) - {img_idx}), k)]
        elif mode == "image":
            found_classes = [id_to_classid[x] for (x, score) in most_sim(img_idx, topn=k + 1, mode="image")[1:]]
        if image_class in found_classes:
            num_found += 1
    return num_found / len(test_ids)
In [ ]:
recall_k(k=10), recall_k(k=10, mode="random")
In [ ]:
# Naive way to compute all similarities between all images. May be optimized!
def build_similarities(conv, all_imgs):
    embs = conv.predict(all_imgs)
    embs = embs / np.linalg.norm(embs, axis=-1, keepdims=True)
    all_sims = np.dot(embs, embs.T)
    return all_sims

def intersect(a, b):
    return list(set(a) & set(b))

def build_negatives(anc_idxs, pos_idxs, similarities, neg_imgs_idx, num_retries=20):
    # If no similarities were computed, return random negatives
    if similarities is None:
        return random.sample(neg_imgs_idx, len(anc_idxs))
    final_neg = []
    # for each positive pair
    for (anc_idx, pos_idx) in zip(anc_idxs, pos_idxs):
        anchor_class = id_to_classid[anc_idx]
        # positive similarity
        sim = similarities[anc_idx, pos_idx]
        # find all semi-hard and hard negatives: their similarity
        # to the anchor is within the margin of the positive one
        possible_ids = np.where((similarities[anc_idx] + 0.25) > sim)[0]
        possible_ids = intersect(neg_imgs_idx, possible_ids)
        appended = False
        for iteration in range(num_retries):
            if len(possible_ids) == 0:
                break
            idx_neg = random.choice(possible_ids)
            if id_to_classid[idx_neg] != anchor_class:
                final_neg.append(idx_neg)
                appended = True
                break
        if not appended:
            final_neg.append(random.choice(neg_imgs_idx))
    return final_neg
In [ ]:
class HardTripletGenerator(tf.keras.utils.Sequence):
    def __init__(self, Xa_train, Xp_train, batch_size, all_imgs, neg_imgs_idx, conv):
        self.batch_size = batch_size
        self.imgs = all_imgs
        self.Xa = Xa_train
        self.Xp = Xp_train
        self.num_samples = Xa_train.shape[0]
        self.neg_imgs_idx = neg_imgs_idx
        if conv:
            print("Pre-computing similarities...", end=" ")
            self.similarities = build_similarities(conv, self.imgs)
            print("Done!")
        else:
            self.similarities = None

    def __len__(self):
        return self.num_samples // self.batch_size

    def __getitem__(self, batch_index):
        low_index = batch_index * self.batch_size
        high_index = (batch_index + 1) * self.batch_size

        imgs_a = self.Xa[low_index:high_index]
        imgs_p = self.Xp[low_index:high_index]
        imgs_n = build_negatives(imgs_a, imgs_p, self.similarities, self.neg_imgs_idx)

        imgs_a = seq.augment_images(self.imgs[imgs_a])
        imgs_p = seq.augment_images(self.imgs[imgs_p])
        imgs_n = seq.augment_images(self.imgs[imgs_n])

        return ([imgs_a, imgs_p, imgs_n], np.zeros(shape=(imgs_a.shape[0])))
In [ ]:
batch_size = 128
In [ ]:
gen_hard = HardTripletGenerator(Xa_train, Xp_train, batch_size, all_imgs, all_img_train_idx, shared_conv)
len(gen_hard)
In [ ]:
[xa, xp, xn], y = gen_hard[0]
xa.shape, xp.shape, xn.shape
In [ ]:
plt.figure(figsize=(16, 9))
for i in range(5):
    plt.subplot(3, 5, i + 1)
    plt.title("anchor")
    plt.imshow((xa[i] + mean) / 255)
    plt.axis('off')
for i in range(5):
    plt.subplot(3, 5, i + 6)
    plt.title("positive")
    plt.imshow((xp[i] + mean) / 255)
    plt.axis('off')
for i in range(5):
    plt.subplot(3, 5, i + 11)
    plt.title("negative")
    plt.imshow((xn[i] + mean) / 255)
    plt.axis('off')
plt.show()
In [ ]:
class SharedConv2(tf.keras.Model):
    """Improved version of SharedConv"""
    def __init__(self):
        super().__init__(name="sharedconv2")

        self.conv1 = Conv2D(16, 3, activation="relu", padding="same")
        self.conv2 = Conv2D(16, 3, activation="relu", padding="same")
        self.pool1 = MaxPool2D((2, 2))  # 30,30
        self.conv3 = Conv2D(32, 3, activation="relu", padding="same")
        self.conv4 = Conv2D(32, 3, activation="relu", padding="same")
        self.pool2 = MaxPool2D((2, 2))  # 15,15
        self.conv5 = Conv2D(64, 3, activation="relu", padding="same")
        self.conv6 = Conv2D(64, 3, activation="relu", padding="same")
        self.pool3 = MaxPool2D((2, 2))  # 7,7
        self.conv7 = Conv2D(64, 3, activation="relu", padding="same")
        self.conv8 = Conv2D(32, 3, activation="relu", padding="same")
        self.flatten = Flatten()
        self.dropout1 = Dropout(0.2)
        self.fc1 = Dense(64)

    def call(self, inputs):
        x = self.pool1(self.conv2(self.conv1(inputs)))
        x = self.pool2(self.conv4(self.conv3(x)))
        x = self.pool3(self.conv6(self.conv5(x)))
        x = self.flatten(self.conv8(self.conv7(x)))
        return self.fc1(self.dropout1(x))

tf.random.set_seed(1337)
shared_conv2 = SharedConv2()
model_triplet2 = TripletNetwork(shared_conv2)
opt = optimizers.SGD(learning_rate=0.001, decay=1e-6, momentum=0.9, nesterov=True)
model_triplet2.compile(loss=identity_loss, optimizer=opt)
In [ ]:
gen_test = TripletGenerator(Xa_test, Xp_test, 32, all_imgs, all_img_test_idx)
len(gen_test)
In [ ]:
# For the first epoch, we don't generate hard triplets so that the model can learn from easy examples first
gen_hard = HardTripletGenerator(Xa_train, Xp_train, batch_size, all_imgs, all_img_train_idx, None)
Note that we re-create a HardTripletGenerator at each epoch. By doing so, we re-compute the hard negatives with the newly updated model. At larger scale this operation can take a lot of time, and could instead be performed every X epochs (X > 1); a sketch of that variant follows.
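For illustration only (the cells below run the every-epoch version), a minimal sketch with a hypothetical refresh_every parameter:
In [ ]:
# illustration only: refresh hard negatives every few epochs instead of every epoch
nb_epochs = 10
refresh_every = 3  # hypothetical refresh period
for epoch in range(nb_epochs):
    if epoch % refresh_every == 0:
        gen_hard = HardTripletGenerator(Xa_train, Xp_train, batch_size,
                                        all_imgs, all_img_train_idx, shared_conv2)
    model_triplet2.fit(gen_hard, epochs=1, validation_data=gen_test)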
In [ ]:
loss, val_loss = [], []

best_model_fname_hard = "triplet_checkpoint_hard.h5"
best_val_loss = float("inf")

nb_epochs = 10
for epoch in range(nb_epochs):
    print("built new hard generator for epoch " + str(epoch))

    history = model_triplet2.fit(
        gen_hard,
        epochs=1,
        validation_data=gen_test)
    loss.extend(history.history["loss"])
    val_loss.extend(history.history["val_loss"])

    if val_loss[-1] < best_val_loss:
        print("Saving best model")
        model_triplet2.save_weights(best_model_fname_hard)
        best_val_loss = val_loss[-1]  # keep track of the best score so far

    gen_hard = HardTripletGenerator(Xa_train, Xp_train, batch_size, all_imgs, all_img_train_idx, shared_conv2)
In [ ]:
plt.plot(loss, label='train')
plt.plot(val_loss, label='validation')
plt.ylim(0, 0.5)
plt.legend(loc='best')
plt.title('Loss');
You should see that the train loss is barely improving while the validation loss is decreasing. Remember that we are feeding the hardest triplets to the model!
In [ ]:
emb = shared_conv2.predict(all_imgs)
emb = emb / np.linalg.norm(emb, axis=-1, keepdims=True)
recall_k(k=10), recall_k(k=10, mode="random")
Let's train the improved convnet SharedConv2 without hard negative mining, in order to have a fair comparison:
In [ ]:
shared_conv2_nohard = SharedConv2()
model_triplet2_nohard = TripletNetwork(shared_conv2_nohard)
opt = optimizers.SGD(learning_rate=0.001, decay=1e-6, momentum=0.9, nesterov=True)
model_triplet2_nohard.compile(loss=identity_loss, optimizer=opt)

gen_nohard = HardTripletGenerator(Xa_train, Xp_train, batch_size, all_imgs, all_img_train_idx, None)
history = model_triplet2_nohard.fit(
    gen_nohard,
    epochs=10,
    validation_data=gen_test)
In [ ]:
plt.plot(loss, label='train (hardmining)')
plt.plot(val_loss, label='validation (hardmining)')
plt.plot(history.history["loss"], label="train")
plt.plot(history.history["val_loss"], label="validation")
plt.ylim(0, 0.5)
plt.legend(loc='best')
plt.title('Loss hardmining vs no hardmining');
In [ ]:
emb = shared_conv2_nohard.predict(all_imgs)
emb = emb / np.linalg.norm(emb, axis=-1, keepdims=True)
recall_k(k=10), recall_k(k=10, mode="random")