Face verification

Goals

  • train a network for face similarity using triplet loss
  • work with data augmentation, generators and hard negative mining

Dataset

  • We will be using the Labeled Faces in the Wild (LFW) dataset, openly available at http://vis-www.cs.umass.edu/lfw/
  • To keep computation tractable, we'll restrict ourselves to a subset of the dataset. You're welcome to train on the whole dataset on a GPU, by changing PATH in the following cells and in the data download
  • We will also load pretrained weights

In [ ]:
import warnings

with warnings.catch_warnings():
    warnings.simplefilter(action='ignore', category=FutureWarning)
    import tensorflow as tf

In [ ]:
# If you have a GPU, execute the following lines to restrict the amount of VRAM used:
gpus = tf.config.experimental.list_physical_devices('GPU')
if gpus:
    print("Using GPU {}".format(gpus[0]))
    tf.config.experimental.set_visible_devices(gpus[0], 'GPU')
else:
    print("Using CPU")

In [ ]:
import os
import random
import itertools

import tensorflow.keras.backend as K
from tensorflow.keras.models import Model
from tensorflow.keras.layers import Dense, Input, Concatenate, Lambda, Dot
from tensorflow.keras.layers import Conv2D, MaxPool2D, GlobalAveragePooling2D, Flatten, Dropout
from tensorflow.keras import optimizers
import numpy as np
from sklearn.manifold import TSNE
import matplotlib.pyplot as plt

Processing the dataset

This part is similar to the previous notebook on siamese networks; you may simply run the cells to build the necessary inputs.

The dataset consists of one folder per identity; the folder name is the name of the person. We map each class (identity) to an integer id, and build the mappings as dictionaries name_to_classid and classid_to_name.

Set USE_SUBSET to False if you want to use the full dataset (GPU only!)


In [ ]:
PATH = "lfw/lfw-deepfunneled/"
USE_SUBSET = True

In [ ]:
dirs = sorted(os.listdir(PATH))
if USE_SUBSET:
    dirs = dirs[:500]
    
name_to_classid = {d:i for i,d in enumerate(dirs)}
classid_to_name = {v:k for k,v in name_to_classid.items()}
num_classes = len(name_to_classid)
print("number of classes: "+str(num_classes))

In each directory, there are one or more images of the identity. We map each image path to an integer id, then build a few dictionaries:

  • mappings between image path and image id: path_to_id and id_to_path
  • mappings between class id and image ids: classid_to_ids and id_to_classid

In [ ]:
# read all directories
img_paths = {c:[directory + "/" + img for img in sorted(os.listdir(PATH+directory))] 
             for directory,c in name_to_classid.items()}

# retrieve all images
all_images_path = []
for img_list in img_paths.values():
    all_images_path += img_list

# map to integers
path_to_id = {v:k for k,v in enumerate(all_images_path)}
id_to_path = {v:k for k,v in path_to_id.items()}

In [ ]:
# build mappings between images and class
classid_to_ids = {k:[path_to_id[path] for path in v] for k,v in img_paths.items()}
id_to_classid = {v:c for c,imgs in classid_to_ids.items() for v in imgs}

The following histogram shows the number of images per class: many classes have only one image. These classes are still useful as negatives, but we cannot build a positive pair from them.
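
A minimal sketch of such a histogram, built from the classid_to_ids mapping defined above:

In [ ]:
# Distribution of the number of images per identity
counts = [len(ids) for ids in classid_to_ids.values()]
plt.hist(counts, bins=range(1, max(counts) + 2))
plt.xlabel("images per class")
plt.ylabel("number of classes")
plt.show()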

Now that we have a way to build the pairs, let's open all the images. This loads everything into RAM: there are more than 1000 images, each a 60x60x3 float64 array (about 86 KB), so roughly 100 MB of RAM will be used, which is not an issue.

Note: if you plan on opening many more images, you should not load them all at once; build a generator instead (see the sketch after the next cell)


In [ ]:
from skimage.io import imread
from skimage.transform import resize

def resize100(img):
    # Resize to 100x100, then crop the central 60x60 region
    return resize(img, (100, 100), preserve_range=True, mode='reflect', anti_aliasing=True)[20:80, 20:80, :]

def open_all_images(id_to_path):
    all_imgs = []
    for path in id_to_path.values():
        all_imgs += [np.expand_dims(resize100(imread(PATH+path)),0)]
    return np.vstack(all_imgs)
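
As noted above, a generator avoids materializing everything in RAM. A hypothetical sketch using tf.keras.utils.Sequence (the class name and batch size are our own choices, not part of the lab):

In [ ]:
# Hypothetical sketch: load and resize images lazily, batch by batch
class LazyImageLoader(tf.keras.utils.Sequence):
    def __init__(self, paths, batch_size=32):
        self.paths = paths
        self.batch_size = batch_size

    def __len__(self):
        return int(np.ceil(len(self.paths) / self.batch_size))

    def __getitem__(self, i):
        batch = self.paths[i * self.batch_size:(i + 1) * self.batch_size]
        return np.stack([resize100(imread(PATH + p)) for p in batch])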

In [ ]:
all_imgs = open_all_images(id_to_path)
mean = np.mean(all_imgs, axis=(0,1,2))
all_imgs -= mean
all_imgs.shape, str(all_imgs.nbytes / 1e6) + " MB"

The following functions build a large number of positive pairs (for both train and test); negatives will be sampled on the fly in the generator.

Triplet loss

In the triplet loss model, we'll define 3 inputs $(a,+,-)$ for anchor, positive and negative.

Usage and differences with siamese nets

We relax the hard constraint that all samples of the same class should be squashed to a single point. Instead, image representations can live on a manifold, as long as they are closer to same-class images than to images of other classes.

On large datasets, with careful hyperparameter tuning, the triplet loss and more advanced metric learning methods beat siamese networks.

Outline

We will build positive pairs, and find a way to sample negatives to obtain triplets. Note that we don't need labels anymore (positive vs negative): we're just building triplets.


In [ ]:
def build_pos_pairs_for_id(classid, max_num=50):
    imgs = classid_to_ids[classid]
    if len(imgs) == 1:
        return []
    
    pos_pairs = list(itertools.combinations(imgs, 2))
    
    random.shuffle(pos_pairs)
    return pos_pairs[:max_num]

In [ ]:
def build_positive_pairs(class_id_range):
    listX1 = []
    listX2 = []
    
    for class_id in class_id_range:
        pos = build_pos_pairs_for_id(class_id)
        for pair in pos:
            listX1 += [pair[0]]
            listX2 += [pair[1]]
            
    perm = np.random.permutation(len(listX1))
    return np.array(listX1)[perm], np.array(listX2)[perm]

In [ ]:
split_num = int(num_classes * 0.8)

In [ ]:
Xa_train, Xp_train = build_positive_pairs(range(0, split_num))
Xa_test, Xp_test = build_positive_pairs(range(split_num, num_classes))

# Gather the ids of all images that are used for train and test
all_img_train_idx = list(set(Xa_train) | set(Xp_train))
all_img_test_idx = list(set(Xa_test) | set(Xp_test))

We end up with 1177 positive pairs, which we'll complement with a randomly sampled negative image in the generator.


In [ ]:
Xa_train.shape, Xp_train.shape

In [ ]:
from imgaug import augmenters as iaa

seq = iaa.Sequential([
    iaa.Fliplr(0.5), # horizontally flip 50% of the images
])

In [ ]:
class TripletGenerator(tf.keras.utils.Sequence):
    def __init__(self, Xa_train, Xp_train, batch_size, all_imgs, neg_imgs_idx):
        self.batch_size = batch_size

        self.imgs = all_imgs
        self.Xa = Xa_train  # Anchors
        self.Xp = Xp_train  # Positives
        self.num_samples = Xa_train.shape[0]
        self.neg_imgs_idx = neg_imgs_idx
        
    def __len__(self):
        return self.num_samples // self.batch_size
        
    def __getitem__(self, batch_index):
        low_index = batch_index * self.batch_size
        high_index = (batch_index + 1) * self.batch_size

        imgs_a = self.Xa[low_index:high_index]  # Anchors
        imgs_p = self.Xp[low_index:high_index]  # Positives
        imgs_n = random.sample(self.neg_imgs_idx, imgs_a.shape[0])  # Negatives
            
        imgs_a = seq.augment_images(self.imgs[imgs_a])
        imgs_p = seq.augment_images(self.imgs[imgs_p])
        imgs_n = seq.augment_images(self.imgs[imgs_n])
            
        # We also return a null vector as a placeholder for the targets; it won't actually be used:
        return ([imgs_a, imgs_p, imgs_n], np.zeros(shape=(imgs_a.shape[0])))

In [ ]:
batch_size = 128
gen = TripletGenerator(Xa_train, Xp_train, batch_size, all_imgs, all_img_train_idx)

In [ ]:
len(all_img_test_idx), len(gen)

In [ ]:
[xa, xp, xn], y = gen[0]

xa.shape, xp.shape, xn.shape

In [ ]:
plt.figure(figsize=(16, 9))

for i in range(5):
    plt.subplot(3, 5, i + 1)
    plt.title("anchor")
    plt.imshow((xa[i] + mean) / 255)
    plt.axis('off')
    
for i in range(5):
    plt.subplot(3, 5, i + 6)
    plt.title("positive")
    plt.imshow((xp[i] + mean) / 255)
    plt.axis('off')
    
for i in range(5):
    plt.subplot(3, 5, i + 11)
    plt.title("negative")
    plt.imshow((xn[i] + mean) / 255)
    plt.axis('off')
    
plt.show()

As you can see, choosing negatives at random can be inefficient. For example, it's reasonable to think that an old man will be too easy a negative if the anchor is a young woman.


In [ ]:
gen_test = TripletGenerator(Xa_test, Xp_test, 32, all_imgs, all_img_test_idx)
len(gen_test)

Triplet Model

The loss of the triplet model is as follows:

$$\max(0, \|x_a - x_p\|_2 - \|x_a - x_n\|_2 + \alpha)$$

We'll be using cosine similarities instead of euclidean distances (they seem to work a bit better in this case), so the loss becomes:

$$\max(0, \cos(x_a, x_n) - \cos(x_a, x_p) + \alpha)$$

In [ ]:
# Build a loss which doesn't take into account the y_true, as
# we'll be passing only 0
def identity_loss(y_true, y_pred):
    return K.mean(y_pred - 0 * y_true)

# The real loss is here
def cosine_triplet_loss(X, margin=0.5):
    positive_sim, negative_sim = X
    
    # batch loss
    losses = K.maximum(0.0, negative_sim - positive_sim + margin)
    
    return K.mean(losses)
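
As a quick sanity check, we can evaluate the loss on hand-picked similarity values (illustrative numbers, not computed from the dataset):

In [ ]:
# With margin 0.5: max(0, 0.5 - 0.8 + 0.5) = 0.2 and max(0, 0.4 - 0.3 + 0.5) = 0.6,
# so the mean loss should be 0.4
pos_sim = K.constant([0.8, 0.3])  # cos(anchor, positive) for two triplets
neg_sim = K.constant([0.5, 0.4])  # cos(anchor, negative)
print(cosine_triplet_loss([pos_sim, neg_sim]))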

Shared Convolutional Network

  • Feel free to build your own architecture instead

In [ ]:
class SharedConv(tf.keras.Model):
    def __init__(self):
        super().__init__(self, name="sharedconv")
        
        self.conv1 = Conv2D(16, 3, activation="relu", padding="same")
        self.conv2 = Conv2D(16, 3, activation="relu", padding="same")
        self.pool1 = MaxPool2D((2,2)) # 30,30
        self.conv3 = Conv2D(32, 3, activation="relu", padding="same")
        self.conv4 = Conv2D(32, 3, activation="relu", padding="same")
        self.pool2 = MaxPool2D((2,2)) # 15,15
        self.conv5 = Conv2D(64, 3, activation="relu", padding="same")
        self.conv6 = Conv2D(64, 3, activation="relu", padding="same")
        self.pool3 = MaxPool2D((2,2)) # 7,7
        self.conv7 = Conv2D(64, 3, activation="relu", padding="same")
        self.conv8 = Conv2D(32, 3, activation="relu", padding="same")
        self.flatten = Flatten()
        self.dropout1 = Dropout(0.2)
        self.fc1 = Dense(40, activation="tanh")
        self.dropout2 = Dropout(0.2)
        self.fc2 = Dense(64)
    
    def call(self, inputs):
        x = self.pool1(self.conv2(self.conv1(inputs)))
        x = self.pool2(self.conv4(self.conv3(x)))
        x = self.pool3(self.conv6(self.conv5(x)))
        x = self.flatten(self.conv8(self.conv7(x)))
        
        x = self.fc1(self.dropout1(x))
        return self.fc2(self.dropout2(x))
    
shared_conv = SharedConv()
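
You can sanity-check the architecture by building it on a dummy batch (the 60x60x3 input shape comes from the resize100 crop):

In [ ]:
# Build the model on a dummy batch so that summary() can display the layers
_ = shared_conv(tf.zeros((1, 60, 60, 3)))
shared_conv.summary()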

Triplet Model

Exercise

  • Build the triplet model with the OOP Keras API, using the skeleton below
  • First run the 3 inputs through the shared conv
  • Then compute positive and negative similarities
  • Then call the triplet loss function using a Lambda layer

In [ ]:
class TripletNetwork(tf.keras.Model):
    def __init__(self, shared_conv):
        super().__init__(self, name="tripletnetwork")
        # TODO
        
    def call(self, inputs):
        pass  # TODO
   
model_triplet = TripletNetwork(shared_conv)
model_triplet.compile(loss=identity_loss, optimizer="rmsprop")

In [ ]:
# %load solutions/triplet.py
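
If you don't have the solution file at hand, here is one possible implementation (our own sketch; the provided solution may differ), computing cosine similarities with Dot(axes=-1, normalize=True):

In [ ]:
class TripletNetwork(tf.keras.Model):
    def __init__(self, shared_conv):
        super().__init__(name="tripletnetwork")
        self.shared_conv = shared_conv
        # Dot with normalize=True computes the cosine similarity
        self.dot = Dot(axes=-1, normalize=True)
        self.loss_layer = Lambda(cosine_triplet_loss)

    def call(self, inputs):
        anchor, positive, negative = inputs
        anchor_emb = self.shared_conv(anchor)
        positive_emb = self.shared_conv(positive)
        negative_emb = self.shared_conv(negative)
        pos_sim = self.dot([anchor_emb, positive_emb])
        neg_sim = self.dot([anchor_emb, negative_emb])
        # The model directly outputs the (batch-averaged) triplet loss
        return self.loss_layer([pos_sim, neg_sim])

model_triplet = TripletNetwork(shared_conv)
model_triplet.compile(loss=identity_loss, optimizer="rmsprop")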

In [ ]:
from tensorflow.keras.callbacks import ModelCheckpoint
from tensorflow.keras.models import load_model


best_model_fname = "triplet_checkpoint_b2.h5"
best_model_cb = ModelCheckpoint(best_model_fname, monitor='val_loss',
                                save_best_only=True, verbose=1)

Warning

  • You will need to run on a GPU if you're using the large dataset
  • On the small dataset, the model can take a few epochs before the loss starts decreasing
  • This can be due to the initialization, the learning rate, or too much dropout / augmentation

In [ ]:
history = model_triplet.fit(gen, 
                    epochs=10,
                    validation_data = gen_test,
                    callbacks=[best_model_cb])

In [ ]:
plt.plot(history.history['loss'], label='train')
plt.plot(history.history['val_loss'], label='validation')
plt.ylim(0, 0.5)
plt.legend(loc='best')
plt.title('Loss');

In [ ]:
model_triplet.load_weights("triplet_checkpoint_b2.h5")

Exercise

  • What do you observe?
  • Try making changes to the model / parameters to get better convergence; you should be able to get much better results than with the ConvNet we gave you
  • Try adding data augmentation (see the sketch below), or increasing the size of the training set
  • You might want to be on a GPU when testing several architectures, even on the small dataset
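
For instance, a slightly stronger augmentation pipeline could look like the following (hypothetical settings, to be tuned; it would replace the seq defined earlier):

In [ ]:
# Hypothetical, stronger augmentation pipeline (settings to be tuned)
seq_strong = iaa.Sequential([
    iaa.Fliplr(0.5),                # horizontally flip 50% of the images
    iaa.Affine(rotate=(-10, 10),    # small random rotations
               scale=(0.9, 1.1)),   # slight random zoom
])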

In [ ]:
# You may load this model
# Trained on triplets but with larger dataset
# Far from perfect!
# model_triplet.load_weights("triplet_pretrained.h5")

Displaying similar images


In [ ]:
emb = shared_conv.predict(all_imgs)
emb = emb / np.linalg.norm(emb, axis=-1, keepdims=True)
pixelwise = np.reshape(all_imgs, (all_imgs.shape[0], 60*60*3))

In [ ]:
def most_sim(idx, topn=5, mode="cosine"):
    x = emb[idx]
    if mode == "cosine":
        x = x / np.linalg.norm(x)
        sims = np.dot(emb, x)
        ids = np.argsort(sims)[::-1]
        return [(id,sims[id]) for id in ids[:topn]]
    elif mode == "euclidean":
        dists = np.linalg.norm(emb - x, axis=-1)
        ids = np.argsort(dists)
        return [(id,dists[id]) for id in ids[:topn]]
    else:
        dists = np.linalg.norm(pixelwise - pixelwise[idx], axis=-1)
        ids = np.argsort(dists)
        return [(id,dists[id]) for id in ids[:topn]]

In [ ]:
def display(img):
    img = img.astype('uint8')
    plt.imshow(img)
    plt.axis('off')
    plt.show()

In [ ]:
interesting_classes = list(filter(lambda x: len(x[1])>4, classid_to_ids.items()))
class_idx = random.choice(interesting_classes)[0]
print(class_idx)
img_idx = random.choice(classid_to_ids[class_idx])
for id, sim in most_sim(img_idx):
    display(all_imgs[id] + mean)
    print((classid_to_name[id_to_classid[id]], id, sim))

Test Recall@k

For each test image whose class contains more than one image, we compute its similarity to all other images, then measure recall@k: does an image of the correct class appear among the k most similar images?


In [ ]:
test_ids = []
for class_id in range(split_num, num_classes):
    img_ids = classid_to_ids[class_id]
    if len(img_ids) > 1:
        test_ids += img_ids

In [ ]:
print(len(test_ids))

In [ ]:
# Number of test classes with more than one image
len([x for x in range(split_num, num_classes) if len(classid_to_ids[x]) > 1])

In [ ]:
def recall_k(k=10, mode="embedding"):
    num_found = 0
    for img_idx in test_ids:
        image_class = id_to_classid[img_idx]
        found_classes = []
        if mode == "embedding":
            found_classes = [id_to_classid[x] for (x, score) in most_sim(img_idx, topn=k+1)[1:]]
        elif mode == "random":
            found_classes = [id_to_classid[x] for x in random.sample(
                list(set(all_img_test_idx + all_img_train_idx) - {img_idx}), k)]
        elif mode == "image":
            found_classes = [id_to_classid[x] for (x, score) in most_sim(img_idx, topn=k+1, mode="image")[1:]]
        if image_class in found_classes:
            num_found += 1
    return num_found / len(test_ids)

In [ ]:
recall_k(k=10), recall_k(k=10, mode="random")

Hard Negative Mining

We'll mine negatives based on the previous epoch's model. To do so, we'll compute the similarities between each anchor and all other images, then sample negatives among the most similar ones, avoiding those that are too difficult.


In [ ]:
# Naive way to compute all similarities between all images. May be optimized!
def build_similarities(conv, all_imgs):
    embs = conv.predict(all_imgs)
    embs = embs / np.linalg.norm(embs, axis=-1, keepdims=True)
    all_sims = np.dot(embs, embs.T)
    return all_sims

def intersect(a, b):
    return list(set(a) & set(b))

def build_negatives(anc_idxs, pos_idxs, similarities, neg_imgs_idx, num_retries=20):
    # If no similarities were computed yet, return random negatives
    if similarities is None:
        return random.sample(neg_imgs_idx, len(anc_idxs))
    final_neg = []
    # for each positive pair
    for (anc_idx, pos_idx) in zip(anc_idxs, pos_idxs):
        anchor_class = id_to_classid[anc_idx]
        # positive similarity
        sim = similarities[anc_idx, pos_idx]
        # find all semi-hard and hard negative candidates: images whose similarity
        # to the anchor is within a 0.25 margin of the positive similarity, or above it
        possible_ids = np.where((similarities[anc_idx] + 0.25) > sim)[0]
        possible_ids = intersect(neg_imgs_idx, possible_ids)
        appended = False
        for iteration in range(num_retries):
            if len(possible_ids) == 0:
                break
            idx_neg = random.choice(possible_ids)
            if id_to_classid[idx_neg] != anchor_class:
                final_neg.append(idx_neg)
                appended = True
                break
        if not appended:
            final_neg.append(random.choice(neg_imgs_idx))
    return final_neg

In [ ]:
class HardTripletGenerator(tf.keras.utils.Sequence):
    def __init__(self, Xa_train, Xp_train, batch_size, all_imgs, neg_imgs_idx, conv):
        self.batch_size = batch_size
        
        self.imgs = all_imgs
        self.Xa = Xa_train
        self.Xp = Xp_train
        self.num_samples = Xa_train.shape[0]
        self.neg_imgs_idx = neg_imgs_idx

        if conv:
            print("Pre-computing similarities...", end=" ")
            self.similarities = build_similarities(conv, self.imgs)
            print("Done!")
        else:
            self.similarities = None
        
    def __len__(self):
        return self.num_samples // self.batch_size
        
    def __getitem__(self, batch_index):
        low_index = batch_index * self.batch_size
        high_index = (batch_index + 1) * self.batch_size
        
        imgs_a = self.Xa[low_index:high_index]
        imgs_p = self.Xp[low_index:high_index]
        imgs_n = build_negatives(imgs_a, imgs_p, self.similarities, self.neg_imgs_idx)
        
        imgs_a = seq.augment_images(self.imgs[imgs_a])
        imgs_p = seq.augment_images(self.imgs[imgs_p])
        imgs_n = seq.augment_images(self.imgs[imgs_n])
        
        return ([imgs_a, imgs_p, imgs_n], np.zeros(shape=(imgs_a.shape[0])))

In [ ]:
batch_size = 128

In [ ]:
gen_hard = HardTripletGenerator(Xa_train, Xp_train, batch_size, all_imgs, all_img_train_idx, shared_conv)
len(gen_hard)

In [ ]:
[xa, xp, xn], y = gen_hard[0]
xa.shape, xp.shape, xn.shape

In [ ]:
plt.figure(figsize=(16, 9))

for i in range(5):
    plt.subplot(3, 5, i + 1)
    plt.title("anchor")
    plt.imshow((xa[i] + mean) / 255)
    plt.axis('off')
    
for i in range(5):
    plt.subplot(3, 5, i + 6)
    plt.title("positive")
    plt.imshow((xp[i] + mean) / 255)
    plt.axis('off')
    
for i in range(5):
    plt.subplot(3, 5, i + 11)
    plt.title("negative")
    plt.imshow((xn[i] + mean) / 255)
    plt.axis('off')
    
plt.show()

In [ ]:
class SharedConv2(tf.keras.Model):
    """Improved version of SharedConv"""
    def __init__(self):
        super().__init__(self, name="sharedconv2")
        
        self.conv1 = Conv2D(16, 3, activation="relu", padding="same")
        self.conv2 = Conv2D(16, 3, activation="relu", padding="same")
        self.pool1 = MaxPool2D((2,2)) # 30,30
        self.conv3 = Conv2D(32, 3, activation="relu", padding="same")
        self.conv4 = Conv2D(32, 3, activation="relu", padding="same")
        self.pool2 = MaxPool2D((2,2)) # 15,15
        self.conv5 = Conv2D(64, 3, activation="relu", padding="same")
        self.conv6 = Conv2D(64, 3, activation="relu", padding="same")
        self.pool3 = MaxPool2D((2,2)) # 7,7
        self.conv7 = Conv2D(64, 3, activation="relu", padding="same")
        self.conv8 = Conv2D(32, 3, activation="relu", padding="same")
        self.flatten = Flatten()
        self.dropout1 = Dropout(0.2)
        self.fc1 = Dense(64)
    
    def call(self, inputs):
        x = self.pool1(self.conv2(self.conv1(inputs)))
        x = self.pool2(self.conv4(self.conv3(x)))
        x = self.pool3(self.conv6(self.conv5(x)))
        x = self.flatten(self.conv8(self.conv7(x)))
        
        return self.fc1(self.dropout1(x))
    
tf.random.set_seed(1337)
shared_conv2 = SharedConv2()
model_triplet2 = TripletNetwork(shared_conv2)

opt = optimizers.SGD(learning_rate=0.001, decay=1e-6, momentum=0.9, nesterov=True)
model_triplet2.compile(loss=identity_loss, optimizer=opt)

In [ ]:
gen_test = TripletGenerator(Xa_test, Xp_test, 32, all_imgs, all_img_test_idx)
len(gen_test)

In [ ]:
# At first epoch we don't generate hard triplets so that our model can learn the easy examples first
gen_hard = HardTripletGenerator(Xa_train, Xp_train, batch_size, all_imgs, all_img_train_idx, None)

Note that we re-create a HardTripletGenerator at each epoch. By doing so, we re-compute the hard negatives with the newly updated model. At larger scale this operation can take a lot of time, and could be done only every X epochs (X > 1), as sketched below.
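
A hypothetical sketch of that variant, kept commented out (the loop below uses the simpler refresh-every-epoch strategy):

In [ ]:
# Hypothetical variant: refresh the hard negatives only every other epoch
# REFRESH_EVERY = 2
# for epoch in range(nb_epochs):
#     if epoch > 0 and epoch % REFRESH_EVERY == 0:
#         gen_hard = HardTripletGenerator(Xa_train, Xp_train, batch_size,
#                                         all_imgs, all_img_train_idx, shared_conv2)
#     model_triplet2.fit(gen_hard, epochs=1, validation_data=gen_test)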


In [ ]:
loss, val_loss = [], []

best_model_fname_hard = "triplet_checkpoint_hard.h5"
best_val_loss = float("inf")

nb_epochs = 10
for epoch in range(nb_epochs):
    print("built new hard generator for epoch " + str(epoch))
    
    history = model_triplet2.fit(
        gen_hard, 
        epochs=1,
        validation_data = gen_test)
    loss.extend(history.history["loss"])
    val_loss.extend(history.history["val_loss"])
    
    if val_loss[-1] < best_val_loss:
        print("Saving best model")
        model_triplet2.save_weights(best_model_fname_hard)
        best_val_loss = val_loss[-1]
    
    gen_hard = HardTripletGenerator(Xa_train, Xp_train, batch_size, all_imgs, all_img_train_idx, shared_conv2)

In [ ]:
plt.plot(loss, label='train')
plt.plot(val_loss, label='validation')
plt.ylim(0, 0.5)
plt.legend(loc='best')
plt.title('Loss');

You should see that the train loss is barely improving while the validation loss is decreasing. Remember that we are feeding the hardest triplets to the model!


In [ ]:
emb = shared_conv2.predict(all_imgs)
emb = emb / np.linalg.norm(emb, axis=-1, keepdims=True)
recall_k(k=10), recall_k(k=10, mode="random")

Let's train the improved convnet SharedConv2 without hard negative mining in order to have a fair comparison:


In [ ]:
shared_conv2_nohard = SharedConv2()
model_triplet2_nohard = TripletNetwork(shared_conv2_nohard)

opt = optimizers.SGD(learning_rate=0.001, decay=1e-6, momentum=0.9, nesterov=True)
model_triplet2_nohard.compile(loss=identity_loss, optimizer=opt)

gen_nohard = HardTripletGenerator(Xa_train, Xp_train, batch_size, all_imgs, all_img_train_idx, None)
history = model_triplet2_nohard.fit(gen_nohard,
        epochs=10,
        validation_data=gen_test)

In [ ]:
plt.plot(loss, label='train (hardmining)')
plt.plot(val_loss, label='validation (hardmining)')
plt.plot(history.history["loss"], label="train")
plt.plot(history.history["val_loss"], label="validation")
plt.ylim(0, 0.5)
plt.legend(loc='best')
plt.title('Loss hardmining vs no hardmining');

In [ ]:
emb = shared_conv2_nohard.predict(all_imgs)
emb = emb / np.linalg.norm(emb, axis=-1, keepdims=True)
recall_k(k=10), recall_k(k=10, mode="random")
