1.结合理论课内容,深入理解DenseNet、ResNeXt的结构与FaceNet的工作机制,理解Triplet loss的工作过程。
2.理解Residual Block的结构。
3.Triplet loss
2.利用Triplet loss训练一个简单的FaceNet网络,并用knn的方式进行分类预测。
在一个DenseBlock里面,每个非线性变换H输出的channels数为恒定的Growth_rate,那么第i层的输入的channels数便是k+(i-1)* Growth_rate, k为Input
的channels数,比如,假设我们把Growth_rate设为4,上图中H1的输入的size为8 32 32,输出为4 32 32, 则H2的输入的size为12 32 32,
输出还是4 32 32,H3、H4以此类推,在实验中,用较小的Growth_rate就能实现较好的效果。
请注意, 在一个DenseBlock里面,feature size并没有发生改变,因为需要对不同层的feature map进行concatenate操作,这需要保持相同的feature size。
因此在相邻的DenseBlock中间使用Down Sampling来增大感受野,即使用Transition Layer来实现,一般的Transition Layer包含BN、Conv和Avg_pool,
同时减少维度,压缩率(compress rate)通常为0.5, 即减少一半的维度。
例如,假设block1的输出c w h是24 32 32,那么经过transition之后,block2的输入就是12 16 16。
低至4 * Growth_rate。
In [1]:
# Load necessary modules here
import math
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
import torch.backends.cudnn as cudnn
import os
class Bottleneck(nn.Module):
    """DenseNet bottleneck layer: BN-ReLU-Conv(1x1) then BN-ReLU-Conv(3x3).

    The 1x1 convolution maps the input to ``4 * growth_rate`` channels, the
    3x3 convolution produces ``growth_rate`` new feature maps, and those new
    maps are concatenated with the layer input along the channel axis.

    Args:
        in_planes: number of input channels.
        growth_rate: number of feature maps this layer adds.
    """

    def __init__(self, in_planes, growth_rate):
        super(Bottleneck, self).__init__()
        inner_planes = 4 * growth_rate
        self.bn1 = nn.BatchNorm2d(in_planes)
        self.conv1 = nn.Conv2d(in_planes, inner_planes, kernel_size=1, bias=False)
        self.bn2 = nn.BatchNorm2d(inner_planes)
        self.conv2 = nn.Conv2d(inner_planes, growth_rate, kernel_size=3, padding=1, bias=False)

    def forward(self, x):
        """Return ``cat([new_features, x], dim=1)``.

        The output has ``growth_rate + in_planes`` channels and the same
        spatial size as ``x`` (1x1 conv, and 3x3 conv with padding 1).
        """
        out = self.conv1(F.relu(self.bn1(x)))
        out = self.conv2(F.relu(self.bn2(out)))
        # New feature maps come first, followed by the untouched input.
        return torch.cat([out, x], 1)
class Transition(nn.Module):
    """Transition layer placed between two dense blocks.

    Applies BN-ReLU-Conv(1x1) to change the channel count from ``in_planes``
    to ``out_planes`` and then halves the spatial size with 2x2 average
    pooling. With a compression rate of 0.5 the caller passes
    ``out_planes = in_planes // 2``.

    Args:
        in_planes: number of input channels.
        out_planes: number of output channels.
    """

    def __init__(self, in_planes, out_planes):
        super(Transition, self).__init__()
        self.bn = nn.BatchNorm2d(in_planes)
        self.conv = nn.Conv2d(in_planes, out_planes, kernel_size=1, bias=False)

    def forward(self, x):
        """Return the channel-reduced, 2x down-sampled feature map."""
        out = self.conv(F.relu(self.bn(x)))
        # Average pooling performs the down-sampling step.
        return F.avg_pool2d(out, 2)
class DenseNet(nn.Module):
    """DenseNet classifier with four dense blocks and three transitions.

    Args:
        block: layer class used inside every dense block; it is called as
            ``block(in_planes, growth_rate)`` and must add ``growth_rate``
            channels to its input (e.g. ``Bottleneck``).
        nblocks: sequence of four ints, the number of ``block`` layers in
            each of the four dense blocks.
        growth_rate: number of channels each ``block`` layer adds.
        reduction: compression rate applied by every transition layer
            (``out_planes = floor(in_planes * reduction)``).
        num_classes: size of the final linear classifier output.
    """

    def __init__(self, block, nblocks, growth_rate=12, reduction=0.5, num_classes=10):
        super(DenseNet, self).__init__()
        self.growth_rate = growth_rate

        num_planes = 2 * growth_rate
        self.conv1 = nn.Conv2d(3, num_planes, kernel_size=3, padding=1, bias=False)

        # Dense block 1 followed by a transition layer.
        self.dense1 = self._make_dense_layers(block, num_planes, nblocks[0])
        num_planes += nblocks[0] * growth_rate
        # Channels accumulate inside a dense block; the transition layer cuts
        # them down by the compression rate (``reduction``).
        out_planes = int(math.floor(num_planes * reduction))
        self.trans1 = Transition(num_planes, out_planes)
        num_planes = out_planes

        # Dense block 2 followed by a transition layer.
        self.dense2 = self._make_dense_layers(block, num_planes, nblocks[1])
        num_planes += nblocks[1] * growth_rate
        out_planes = int(math.floor(num_planes * reduction))
        self.trans2 = Transition(num_planes, out_planes)
        num_planes = out_planes

        # Dense block 3 followed by a transition layer.
        self.dense3 = self._make_dense_layers(block, num_planes, nblocks[2])
        num_planes += nblocks[2] * growth_rate
        out_planes = int(math.floor(num_planes * reduction))
        self.trans3 = Transition(num_planes, out_planes)
        num_planes = out_planes

        # The last dense block has no transition layer after it.
        self.dense4 = self._make_dense_layers(block, num_planes, nblocks[3])
        num_planes += nblocks[3] * growth_rate

        # Final BN + linear layer acting as the classifier.
        self.bn = nn.BatchNorm2d(num_planes)
        self.linear = nn.Linear(num_planes, num_classes)

    def _make_dense_layers(self, block, in_planes, nblock):
        """Stack ``nblock`` layers; each one sees ``growth_rate`` more channels."""
        layers = []
        for _ in range(nblock):
            layers.append(block(in_planes, self.growth_rate))
            in_planes += self.growth_rate
        return nn.Sequential(*layers)

    def forward(self, x):
        """Return class logits of shape ``(batch, num_classes)``.

        The fixed 4x4 average pooling below matches the 4x4 feature map that
        results from a 32x32 input after the three 2x down-samplings.
        """
        out = self.conv1(x)
        out = self.trans1(self.dense1(out))
        out = self.trans2(self.dense2(out))
        out = self.trans3(self.dense3(out))
        out = self.dense4(out)
        out = F.avg_pool2d(F.relu(self.bn(out)), 4)
        out = out.view(out.size(0), -1)
        return self.linear(out)
def densenet():
    """Build a DenseNet whose four dense blocks hold 2, 5, 4 and 6 bottlenecks."""
    return DenseNet(Bottleneck, [2, 5, 4, 6])
In [2]:
conv1: 1
dense1: 2 × 2 = 4
trans1: 1
dense2: 2 × 5 = 10
trans2: 1
dense3: 2 × 4 = 8
trans3: 1
dense4: 2 × 6 = 12
linear: 1
In [2]:
def densenet52():
    """Build a DenseNet with six bottleneck layers in each of its four dense blocks."""
    return DenseNet(Bottleneck, [6, 6, 6, 6])
In [4]:
In [15]:
import torchvision
import torchvision.transforms as transforms
from torch.autograd import Variable
def train(epoch, model, lossFunction, optimizer, device, trainloader):
    """Train ``model`` for one epoch and print running loss / accuracy.

    Args:
        epoch: epoch index, used only for the log line.
        model: classification model; ``model(inputs)`` must return per-class
            scores of shape ``(batch, num_classes)``.
        lossFunction: callable ``lossFunction(outputs, targets)`` returning a
            scalar loss tensor.
        optimizer: optimizer that updates ``model``'s parameters.
        device: device the batches are moved to before the forward pass.
        trainloader: iterable of ``(inputs, targets)`` batches.

    Returns:
        None. Progress is printed every 100 batches and once after the last
        batch. An empty loader prints a notice instead of failing.
    """
    print('\nEpoch: %d' % epoch)
    model.train()  # enable training behaviour (BatchNorm statistics, dropout)
    train_loss = 0  # sum of the batch losses seen so far
    correct = 0     # number of correct predictions so far
    total = 0       # number of samples seen so far
    num_batches = 0
    for batch_idx, (inputs, targets) in enumerate(trainloader):
        inputs, targets = inputs.to(device), targets.to(device)
        optimizer.zero_grad()                  # clear gradients of the previous step
        outputs = model(inputs)                # forward pass
        loss = lossFunction(outputs, targets)
        loss.backward()                        # back-propagate
        optimizer.step()                       # update the parameters
        train_loss += loss.item()
        _, predicted = outputs.max(1)          # predicted class = arg-max score
        total += targets.size(0)
        correct += predicted.eq(targets).sum().item()
        num_batches = batch_idx + 1
        if num_batches % 100 == 0:
            print('Train loss: %.3f | Train Acc: %.3f%% (%d/%d)'
                  % (train_loss / num_batches, 100. * correct / total, correct, total))
    if total == 0:
        # Nothing was processed: avoid dividing by zero in the summary line.
        print('Train loader is empty; nothing to train on.')
        return
    print('Train loss: %.3f | Train Acc: %.3f%% (%d/%d)'
          % (train_loss / num_batches, 100. * correct / total, correct, total))
def test(model, lossFunction, optimizer, device, testloader):
    """Evaluate ``model`` on ``testloader`` and print the loss / accuracy.

    Args:
        model: classification model returning per-class scores.
        lossFunction: callable ``lossFunction(outputs, targets)`` returning a
            scalar loss tensor.
        optimizer: unused; kept so existing call sites keep working.
        device: device the batches are moved to before the forward pass.
        testloader: iterable of ``(inputs, targets)`` batches.

    Returns:
        None. One summary line is printed. An empty loader prints a notice
        instead of failing.
    """
    model.eval()  # evaluation behaviour (BatchNorm running statistics, no dropout)
    test_loss = 0  # sum of the batch losses
    correct = 0
    total = 0
    num_batches = 0
    with torch.no_grad():  # no gradients are needed for evaluation
        for inputs, targets in testloader:
            inputs, targets = inputs.to(device), targets.to(device)
            outputs = model(inputs)
            loss = lossFunction(outputs, targets)
            test_loss += loss.item()
            _, predicted = outputs.max(1)  # predicted class = arg-max score
            total += targets.size(0)
            correct += predicted.eq(targets).sum().item()
            num_batches += 1
    if total == 0:
        # Nothing was processed: avoid dividing by zero in the summary line.
        print('Test loader is empty; nothing to evaluate.')
        return
    print('Test Loss: %.3f | Test Acc: %.3f%% (%d/%d)'
          % (test_loss / num_batches, 100. * correct / total, correct, total))
def data_loader():
    """Build the train and test ``DataLoader`` objects for the image folders.

    Images are read with ``ImageFolder`` from ``./mnist/train`` and
    ``./mnist/test`` (one sub-directory per class).

    Returns:
        ``(trainloader, testloader)``; both use a batch size of 32 and only
        the training loader shuffles.
    """
    # ``Normalize`` works on tensors, so ``ToTensor`` has to run before it.
    transform_train = transforms.Compose([
        transforms.RandomCrop(32, padding=4),
        transforms.ToTensor(),
        # Normalize each channel with the given mean and standard deviation.
        transforms.Normalize((0.4914, 0.4822, 0.4465), (0.2023, 0.1994, 0.2010)),
    ])
    transform_test = transforms.Compose([
        transforms.ToTensor(),
        transforms.Normalize((0.4914, 0.4822, 0.4465), (0.2023, 0.1994, 0.2010)),
    ])

    # ImageFolder expects the images to be grouped in one directory per class.
    trainset = torchvision.datasets.ImageFolder(root='./mnist/train', transform=transform_train)
    testset = torchvision.datasets.ImageFolder(root='./mnist/test', transform=transform_test)

    trainloader = torch.utils.data.DataLoader(trainset, batch_size=32, shuffle=True)
    testloader = torch.utils.data.DataLoader(testset, batch_size=32, shuffle=False)
    return trainloader, testloader
def run(model, num_epochs, device=None):
    """Train and evaluate ``model`` for ``num_epochs`` epochs.

    Uses cross-entropy loss and SGD (lr 0.01, momentum 0.9, weight decay
    5e-4); the learning rate is divided by 10 every 50 epochs.

    Args:
        model: classification model; it is moved to the selected device.
        num_epochs: number of train + test rounds.
        device: optional device (string or ``torch.device``). When omitted,
            ``cuda:2`` is used if at least three GPUs are visible, otherwise
            ``cuda:0`` if CUDA is available, otherwise the CPU.
    """
    if device is None:
        if not torch.cuda.is_available():
            device = 'cpu'
        elif torch.cuda.device_count() > 2:
            device = 'cuda:2'
        else:
            device = 'cuda:0'
    device = torch.device(device)

    # The batches are moved to ``device`` in train()/test(), so the model has
    # to live there too. No DataParallel wrapper is used: it requires the
    # parameters to already sit on its first device and is not needed for
    # single-device training.
    model = model.to(device)
    if device.type == 'cuda':
        cudnn.benchmark = True

    # Loss function and optimizer.
    lossFunction = nn.CrossEntropyLoss()
    lr = 0.01
    optimizer = optim.SGD(model.parameters(), lr=lr, momentum=0.9, weight_decay=5e-4)

    trainloader, testloader = data_loader()
    for epoch in range(num_epochs):
        train(epoch, model, lossFunction, optimizer, device, trainloader)
        test(model, lossFunction, optimizer, device, testloader)
        if (epoch + 1) % 50 == 0:
            # Step decay of the learning rate.
            lr = lr / 10
            for param_group in optimizer.param_groups:
                param_group['lr'] = lr
In [6]:
# Build the DenseNet variant with six bottlenecks per dense block and run
# the train/test loop on it.
model = densenet52()
# num_epochs is adjustable
run(model, num_epochs=20)
cardinality, 指的是repeat layer的个数,下图右边cardinality为32。左图是ResNet的基本结构,输入channel size为64,右图是ResNeXt的基本结构,
输入channel size是128,但两者具有相近的参数量。
有三种等价的ResNeXt Block,如下图,a是ResNeXt基本单元,如果把输出那里的1x1合并到一起,得到等价网络b拥有和Inception-ResNet相似的结构,
下图表示ResNeXt-50(32x4d)的网络结构,卷积层和全连接层总数为50层,32表示的是cardinality,4d表示每一个repeat layer的channel数为4,所以整个block的通道数是32x4=128.
In [4]:
class Block(nn.Module):
    """ResNeXt block in its grouped-convolution form (form "c").

    Layout: Conv(1x1) -> grouped Conv(3x3) -> Conv(1x1), each followed by
    BatchNorm, plus a residual shortcut.

    Args:
        in_planes: number of input channels.
        cardinality: number of convolution groups.
        bottleneck_width: number of channels in each group.
        stride: stride of the 3x3 convolution (and of the shortcut).
    """

    # Output channels = expansion * cardinality * bottleneck_width.
    expansion = 2

    def __init__(self, in_planes, cardinality=32, bottleneck_width=4, stride=1):
        super(Block, self).__init__()
        group_width = cardinality * bottleneck_width
        out_planes = self.expansion * group_width
        self.conv1 = nn.Conv2d(in_planes, group_width, kernel_size=1, bias=False)
        self.bn1 = nn.BatchNorm2d(group_width)
        # ``groups=cardinality`` splits the 3x3 convolution into independent groups.
        self.conv2 = nn.Conv2d(group_width, group_width, kernel_size=3, stride=stride,
                               padding=1, groups=cardinality, bias=False)
        self.bn2 = nn.BatchNorm2d(group_width)
        self.conv3 = nn.Conv2d(group_width, out_planes, kernel_size=1, bias=False)
        self.bn3 = nn.BatchNorm2d(out_planes)

        # Identity shortcut unless the shape changes; then project with a
        # strided 1x1 convolution followed by BatchNorm.
        self.shortcut = nn.Sequential()
        if stride != 1 or in_planes != out_planes:
            self.shortcut = nn.Sequential(
                nn.Conv2d(in_planes, out_planes, kernel_size=1, stride=stride, bias=False),
                nn.BatchNorm2d(out_planes),
            )

    def forward(self, x):
        """Return ``relu(residual_branch(x) + shortcut(x))``."""
        out = F.relu(self.bn1(self.conv1(x)))
        out = F.relu(self.bn2(self.conv2(out)))
        out = self.bn3(self.conv3(out))
        out += self.shortcut(x)
        return F.relu(out)
In [5]:
class ResNeXt(nn.Module):
    """Three-stage ResNeXt classifier.

    Args:
        num_blocks: sequence of three ints, the number of ``Block`` modules
            in each stage.
        cardinality: number of convolution groups in every block.
        bottleneck_width: channels per group in the first stage; it is
            doubled after every stage.
        num_classes: size of the final linear classifier output.
    """

    def __init__(self, num_blocks, cardinality, bottleneck_width, num_classes=10):
        super(ResNeXt, self).__init__()
        self.cardinality = cardinality
        self.bottleneck_width = bottleneck_width
        self.in_planes = 64

        self.conv1 = nn.Conv2d(3, 64, kernel_size=1, bias=False)
        self.bn1 = nn.BatchNorm2d(64)
        # The stage strides are 1, 2, 2, so only stages 2 and 3 halve the
        # spatial size.
        self.layer1 = self._make_layer(num_blocks[0], 1)
        self.layer2 = self._make_layer(num_blocks[1], 2)
        self.layer3 = self._make_layer(num_blocks[2], 2)
        # The last stage runs with 4x the initial bottleneck width and the
        # block expansion is 2, hence 8 * cardinality * bottleneck_width.
        self.linear = nn.Linear(cardinality * bottleneck_width * 8, num_classes)

    def _make_layer(self, num_blocks, stride):
        """Build one stage; only its first block uses ``stride``."""
        strides = [stride] + [1] * (num_blocks - 1)
        layers = []
        for block_stride in strides:
            layers.append(Block(self.in_planes, self.cardinality, self.bottleneck_width, block_stride))
            self.in_planes = Block.expansion * self.cardinality * self.bottleneck_width
        # Double the bottleneck width for the next stage.
        self.bottleneck_width *= 2
        return nn.Sequential(*layers)

    def forward(self, x):
        """Return class logits of shape ``(batch, num_classes)``.

        The fixed 8x8 average pooling matches the 8x8 feature map that a
        32x32 input produces after the two strided stages.
        """
        out = F.relu(self.bn1(self.conv1(x)))
        out = self.layer1(out)
        out = self.layer2(out)
        out = self.layer3(out)
        out = F.avg_pool2d(out, 8)
        out = out.view(out.size(0), -1)
        return self.linear(out)
In [10]:
# ResNeXt with stages of 2, 3 and 4 blocks, cardinality 16 and an initial
# bottleneck width of 8; trained and evaluated for 20 epochs.
ResNeXt32_16x8d = ResNeXt([2, 3, 4], 16, 8)
run(ResNeXt32_16x8d, num_epochs=20)
有关FaceNet与triplet loss的理论知识请同学们复习理论课有关章节。在这里,我们将用triplet loss训练一个resnet18网络,并用这个网络在mnist数据集上进行KNN分类,具体的,resnet18相当于一个特征提取器,用所有的训练集图片的特征拟合一个KNN分类器,利用这个KNN分类进行预测. 在3.1小节,将给出triplet loss的实现. 3.2小节将实现一个适用于triplet loss训练的resnet18网络. 3.3小节将实现随机选取triplet的dataset, 3.4、3.5小节将分别实现resnet18的训练与测试函数.
facenet的作用是将图像嵌入一个d维的空间,在这个d维空间里,同一类图像的特征之间相隔的近,不同类图像的特征之间相隔的远,这个d我们称之为embedding size
In [6]:
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
import torch.backends.cudnn as cudnn
import os
import torchvision
import torchvision.transforms as transforms
from torch.autograd import Variable,Function
class PairwiseDistance(nn.Module):
    """Row-wise L-p distance between two batches of embeddings.

    Implemented as an ``nn.Module`` built from differentiable tensor
    operations; stateful ``autograd.Function`` subclasses with an instance
    ``forward`` are the legacy style and are not meant to be instantiated.

    Args:
        p: order of the norm; ``p = 2`` gives the Euclidean distance.
    """

    def __init__(self, p):
        super(PairwiseDistance, self).__init__()
        self.norm = p

    def forward(self, x1, x2):
        """Return ``(sum_j |x1 - x2|_j ** p + eps) ** (1 / p)`` summed over dim 1."""
        eps = 1e-6  # keeps the root differentiable when the two inputs are equal
        diff = torch.abs(x1 - x2)
        out = torch.pow(diff, self.norm).sum(dim=1)
        return torch.pow(out + eps, 1. / self.norm)
class TripletLoss(nn.Module):
    """Triplet loss.

    ``loss = mean(max(distance(a, p) - distance(a, n) + margin, 0))``

    Implemented as an ``nn.Module``; stateful ``autograd.Function``
    subclasses with an instance ``forward`` are the legacy style and are not
    meant to be instantiated.

    Args:
        margin: minimum gap required between the anchor-positive and the
            anchor-negative distance.
        num_classes: stored on the instance; not used by the loss itself.
    """

    def __init__(self, margin, num_classes=10):
        super(TripletLoss, self).__init__()
        self.margin = margin
        self.num_classes = num_classes
        self.pdist = PairwiseDistance(2)  # Euclidean distance between embeddings

    def forward(self, anchor, positive, negative):
        """Return the mean hinge loss over the batch of triplets."""
        d_p = self.pdist.forward(anchor, positive)  # anchor <-> positive
        d_n = self.pdist.forward(anchor, negative)  # anchor <-> negative
        # Clamp at zero: triplets that already satisfy the margin add no loss.
        dist_hinge = torch.clamp(self.margin + d_p - d_n, min=0.0)
        return torch.mean(dist_hinge)
In [7]:
class BasicBlock(nn.Module):
    """ResNet basic block: two 3x3 convolutions plus a residual shortcut.

    Args:
        in_planes: number of input channels.
        planes: number of output channels (``expansion`` is 1).
        stride: stride of the first convolution (and of the shortcut).
    """

    expansion = 1

    def __init__(self, in_planes, planes, stride=1):
        super(BasicBlock, self).__init__()
        self.conv1 = nn.Conv2d(in_planes, planes, kernel_size=3, stride=stride, padding=1, bias=False)
        self.bn1 = nn.BatchNorm2d(planes)
        self.conv2 = nn.Conv2d(planes, planes, kernel_size=3, stride=1, padding=1, bias=False)
        self.bn2 = nn.BatchNorm2d(planes)

        # Identity shortcut unless the shape changes; then project with a
        # strided 1x1 convolution followed by BatchNorm.
        self.shortcut = nn.Sequential()
        if stride != 1 or in_planes != self.expansion * planes:
            self.shortcut = nn.Sequential(
                nn.Conv2d(in_planes, self.expansion * planes, kernel_size=1, stride=stride, bias=False),
                nn.BatchNorm2d(self.expansion * planes),
            )

    def forward(self, x):
        """Return ``relu(residual_branch(x) + shortcut(x))``."""
        out = F.relu(self.bn1(self.conv1(x)))
        out = self.bn2(self.conv2(out))
        out += self.shortcut(x)
        return F.relu(out)
class ResNetTriplet(nn.Module):
    """ResNet backbone that outputs scaled, L2-normalised embeddings.

    There is no classifier: the final linear layer maps the pooled features
    to an ``embedding_size``-dimensional vector, which is L2-normalised and
    multiplied by ``alpha``.

    Args:
        block: residual block class, called as ``block(in_planes, planes,
            stride)``; it must expose an ``expansion`` class attribute.
        num_blocks: sequence of four ints, the number of blocks per stage.
        embedding_size: dimensionality of the output embedding.
        num_classes: accepted for interface compatibility; not used here.
        alpha: scale applied to the unit-length embedding (10 as suggested
            in https://arxiv.org/pdf/1703.09507.pdf).
    """

    def __init__(self, block, num_blocks, embedding_size=256, num_classes=10, alpha=10):
        super(ResNetTriplet, self).__init__()
        self.in_planes = 64
        self.alpha = alpha

        self.conv1 = nn.Conv2d(3, 64, kernel_size=3, stride=1, padding=1, bias=False)
        self.bn1 = nn.BatchNorm2d(64)
        # Stage strides are 1, 2, 2, 2: a 32x32 input ends up as a 4x4 map.
        self.layer1 = self._make_layer(block, 64, num_blocks[0], stride=1)
        self.layer2 = self._make_layer(block, 128, num_blocks[1], stride=2)
        self.layer3 = self._make_layer(block, 256, num_blocks[2], stride=2)
        self.layer4 = self._make_layer(block, 512, num_blocks[3], stride=2)
        self.linear = nn.Linear(512 * block.expansion, embedding_size)

    def _make_layer(self, block, planes, num_blocks, stride):
        """Build one stage; only its first block uses ``stride``."""
        strides = [stride] + [1] * (num_blocks - 1)
        layers = []
        for block_stride in strides:
            layers.append(block(self.in_planes, planes, block_stride))
            self.in_planes = planes * block.expansion
        return nn.Sequential(*layers)

    def l2_norm(self, input):
        """Scale every row of ``input`` to (approximately) unit L2 norm.

        A small constant is added under the square root so an all-zero row
        does not divide by zero.
        """
        norm = torch.sqrt(torch.pow(input, 2).sum(dim=1, keepdim=True) + 1e-10)
        return input / norm

    def forward(self, x):
        """Return the ``(batch, embedding_size)`` embedding of ``x``.

        The result is also stored in ``self.features``.
        """
        out = F.relu(self.bn1(self.conv1(x)))
        out = self.layer1(out)
        out = self.layer2(out)
        out = self.layer3(out)
        out = self.layer4(out)
        out = F.avg_pool2d(out, 4)
        out = out.view(out.size(0), -1)
        out = self.linear(out)
        # Normalise the features so the margin is easy to choose, then scale.
        self.features = self.l2_norm(out) * self.alpha
        return self.features
def ResNet18(embedding_size=256, num_classes=10):
    """Build the triplet-embedding ResNet with two ``BasicBlock`` per stage."""
    return ResNetTriplet(BasicBlock, [2, 2, 2, 2], embedding_size, num_classes)
In [8]:
import numpy as np
import pandas as pd
def test_modify():
    """Print five random triplets for positive class 8 and negative class 0.

    Reads ``./mnist/train.csv``; the file needs ``class`` and ``name``
    columns, and its first column is used as the picture id.
    """

    def generate_triplets(df, num_triplets, pos_class, neg_class):
        """Return ``num_triplets`` rows of
        ``[anchor_id, positive_id, negative_id, pos_class, neg_class, pos_name, neg_name]``.
        """

        def make_dictionary_for_pic_class(df):
            """Map every class label to the list of its picture ids."""
            pic_classes = dict()
            for label, pic_id in zip(df['class'], df.iloc[:, 0].to_numpy()):
                pic_classes.setdefault(label, []).append(pic_id)
            return pic_classes

        pic_classes = make_dictionary_for_pic_class(df)
        pos_pics = pic_classes[pos_class]
        neg_pics = pic_classes[neg_class]
        if len(pos_pics) < 2:
            # Anchor and positive must be two different pictures.
            raise ValueError('positive class needs at least two pictures')

        # The class names do not change between triplets: look them up once.
        pos_name = df.loc[df['class'] == pos_class, 'name'].values[0]
        neg_name = df.loc[df['class'] == neg_class, 'name'].values[0]

        triplets = []
        for _ in range(num_triplets):
            if len(pos_pics) == 2:
                ianc, ipos = np.random.choice(2, size=2, replace=False)
            else:
                ianc = np.random.randint(0, len(pos_pics))  # random anchor
                ipos = np.random.randint(0, len(pos_pics))  # random positive
                while ianc == ipos:
                    ipos = np.random.randint(0, len(pos_pics))
            ineg = np.random.randint(0, len(neg_pics))      # random negative
            triplets.append([pos_pics[ianc], pos_pics[ipos], neg_pics[ineg],
                             pos_class, neg_class, pos_name, neg_name])
        return triplets

    df = pd.read_csv('./mnist/train.csv')
    print(generate_triplets(df, 5, 8, 0))
In [9]:
import numpy as np
import pandas as pd
import torch
from PIL import Image
from torch.utils.data import Dataset
class TripletFaceDataset(Dataset):
    """Dataset of randomly drawn (anchor, positive, negative) image triplets.

    Anchor, positive and negative are all chosen at random when the dataset
    is constructed.

    Args:
        root_dir: directory holding one sub-directory of ``.png`` images per
            class name.
        csv_name: path of the CSV index; it needs ``class`` and ``name``
            columns and its first column is used as the picture id.
        num_triplets: number of triplets to generate.
        transform: optional callable applied to every loaded PIL image.
    """

    def __init__(self, root_dir, csv_name, num_triplets, transform=None):
        self.root_dir = root_dir
        self.df = pd.read_csv(csv_name)
        self.num_triplets = num_triplets
        self.transform = transform
        self.training_triplets = self.generate_triplets(self.df, self.num_triplets)

    @staticmethod
    def generate_triplets(df, num_triplets):
        """Draw ``num_triplets`` random triplets from ``df``.

        Returns:
            A list of ``[anchor_id, positive_id, negative_id, pos_class,
            neg_class, pos_name, neg_name]`` rows. Anchor and positive are
            two different pictures of ``pos_class``; the negative comes from
            a different class.

        Raises:
            ValueError: if triplets are requested but no class has two
                pictures, or fewer than two classes exist.
        """
        # pic_classes = {class0: [id, ...], class1: [id, ...], ...}
        # class_names = {class0: name of its first row, ...}
        pic_classes = {}
        class_names = {}
        rows = zip(df['class'], df.iloc[:, 0].to_numpy(), df['name'].to_numpy())
        for label, pic_id, name in rows:
            pic_classes.setdefault(label, []).append(pic_id)
            class_names.setdefault(label, name)

        if num_triplets <= 0:
            return []

        classes = df['class'].unique()
        # Only classes with at least two pictures can supply anchor + positive.
        pos_candidates = [c for c in classes if len(pic_classes[c]) >= 2]
        if not pos_candidates:
            raise ValueError('no class has two pictures for an anchor/positive pair')
        if len(classes) < 2:
            raise ValueError('at least two classes are needed to pick a negative')

        triplets = []
        for _ in range(num_triplets):
            pos_class = np.random.choice(pos_candidates)  # random positive class
            neg_class = np.random.choice(classes)         # random negative class
            while pos_class == neg_class:
                neg_class = np.random.choice(classes)

            pos_pics = pic_classes[pos_class]
            neg_pics = pic_classes[neg_class]
            if len(pos_pics) == 2:
                ianc, ipos = np.random.choice(2, size=2, replace=False)
            else:
                ianc = np.random.randint(0, len(pos_pics))  # random anchor
                ipos = np.random.randint(0, len(pos_pics))  # random positive
                while ianc == ipos:
                    ipos = np.random.randint(0, len(pos_pics))
            ineg = np.random.randint(0, len(neg_pics))      # random negative

            triplets.append([pos_pics[ianc], pos_pics[ipos], neg_pics[ineg],
                             pos_class, neg_class,
                             class_names[pos_class], class_names[neg_class]])
        return triplets

    def __getitem__(self, idx):
        """Return ``([anchor, positive, negative], [pos_label, pos_label, neg_label])``."""
        anc_id, pos_id, neg_id, pos_class, neg_class, pos_name, neg_name = self.training_triplets[idx]

        # Images are stored as <root_dir>/<class name>/<picture id>.png
        anc_img = os.path.join(self.root_dir, str(pos_name), str(anc_id) + '.png')
        pos_img = os.path.join(self.root_dir, str(pos_name), str(pos_id) + '.png')
        neg_img = os.path.join(self.root_dir, str(neg_name), str(neg_id) + '.png')

        anc_img = Image.open(anc_img).convert('RGB')
        pos_img = Image.open(pos_img).convert('RGB')
        neg_img = Image.open(neg_img).convert('RGB')

        # Labels as 1-element long tensors.
        pos_class = torch.from_numpy(np.array([pos_class]).astype('long'))
        neg_class = torch.from_numpy(np.array([neg_class]).astype('long'))

        data = [anc_img, pos_img, neg_img]
        label = [pos_class, pos_class, neg_class]

        if self.transform:
            data = [self.transform(img) for img in data]  # preprocess every image

        return data, label

    def __len__(self):
        """Number of generated triplets."""
        return len(self.training_triplets)
In [10]:
import torchvision.transforms as transforms
def train_facenet(epoch, model, optimizer, margin, num_triplets):
    """Train ``model`` for one epoch on freshly sampled random triplets.

    Args:
        epoch: epoch index, used only for the log line.
        model: embedding network; batches are moved to the device of its
            parameters.
        optimizer: optimizer that updates ``model``'s parameters.
        margin: margin of the triplet loss.
        num_triplets: number of triplets sampled for this epoch.
    """
    # Image preprocessing. ``Normalize`` works on tensors, so ``ToTensor``
    # runs first.
    # NOTE(review): these statistics differ from the ones used by the KNN /
    # evaluation code in this file -- confirm which set is intended.
    transform = transforms.Compose([
        transforms.ToTensor(),
        transforms.Normalize(
            mean=np.array([0.4914, 0.4822, 0.4465]),
            std=np.array([0.2023, 0.1994, 0.2010])),
    ])

    # Triplet dataset; num_triplets is adjustable.
    train_set = TripletFaceDataset(root_dir='./mnist/train',
                                   csv_name='./mnist/train.csv',
                                   num_triplets=num_triplets,
                                   transform=transform)
    train_loader = torch.utils.data.DataLoader(train_set,
                                               batch_size=16,
                                               shuffle=True)

    # Follow the model's device instead of a hard-coded GPU index.
    device = next(model.parameters()).device
    criterion = TripletLoss(margin=margin, num_classes=10)  # margin is adjustable

    model.train()
    total_loss = 0.0
    for data, _target in train_loader:
        anchor_img, positive_img, negative_img = (img.to(device) for img in data)

        optimizer.zero_grad()  # clear gradients of the previous step

        # Embed the three images of every triplet.
        anchor = model(anchor_img)
        positive = model(positive_img)
        negative = model(negative_img)

        loss = criterion.forward(anchor, positive, negative)
        total_loss += loss.item()

        loss.backward()        # back-propagate
        optimizer.step()       # update the parameters

    num_batches = len(train_loader)
    average_loss = total_loss / num_batches if num_batches else 0.0
    context = 'Train Epoch: {} [{}/{}], Average loss: {:.4f}'.format(
        epoch, len(train_loader.dataset), len(train_loader.dataset), average_loss)
    print(context)
关于如何测试的问题,由于triplet loss训练的resnet18网络没有分类器,这个网络的最后一层的输出是一个维度为embedding_size的向量,我们把它当作由模型提取出的特征,所以利用这个特征来做测试。首先保存下训练集上所有图片的特征和标签,用sklearn库的KNeighborsClassifier()拟合成一个KNN分类器,这里的K表示邻域的个数,K是一个可调节的参数,在测试集上做验证时,提取图片的特征用KNN分类器做预测即可。
答: 首先将训练集中的数据按标签类别分类,然后将同一类数据归在一起,并计算每一类数据的均值点。在一类数据中,根据该均值点选择最近的一张或多张图片,然后每个类别都可以得到相应的代表图片。将测试集的位置与这些类别代表图片的位置对比,选择最近的图片,其标签就是预测的标签。
In [11]:
from sklearn import neighbors
def KNN_classifier(model, epoch, n_neighbors):
    """Fit a KNN classifier on the embeddings of the whole training set.

    The model's train/eval mode is left as the caller set it, so the caller
    decides which BatchNorm behaviour is used for feature extraction.

    Args:
        model: embedding network; batches are moved to the device of its
            parameters.
        epoch: unused; kept so existing call sites keep working.
        n_neighbors: number of neighbours used by the classifier.

    Returns:
        The fitted ``KNeighborsClassifier``.
    """
    # Image preprocessing. ``Normalize`` works on tensors, so ``ToTensor``
    # runs first.
    transform = transforms.Compose([
        transforms.ToTensor(),
        transforms.Normalize(
            mean=np.array([0.485, 0.456, 0.406]),
            std=np.array([0.229, 0.224, 0.225])),
    ])
    # ImageFolder expects the images to be grouped in one directory per class.
    train_set = torchvision.datasets.ImageFolder(root='./mnist/train', transform=transform)
    train_loader = torch.utils.data.DataLoader(train_set, batch_size=32, shuffle=False)

    # Follow the model's device instead of a hard-coded GPU index.
    device = next(model.parameters()).device

    features, labels = [], []  # one embedding / label per training image
    with torch.no_grad():      # feature extraction needs no gradients
        for data, target in train_loader:
            output = model(data.to(device))
            features.extend(output.cpu().numpy())
            labels.extend(target.numpy())

    # n_neighbors is adjustable.
    clf = neighbors.KNeighborsClassifier(n_neighbors=n_neighbors)
    clf.fit(features, labels)
    return clf
def test_facenet(epoch, model, clf, test = True):
    """Print the KNN classification accuracy of the model's embeddings.

    The model's train/eval mode is left as the caller set it.

    Args:
        epoch: unused; kept so existing call sites keep working.
        model: embedding network; batches are moved to the device of its
            parameters.
        clf: fitted classifier with a ``predict`` method.
        test: evaluate ``./mnist/test`` when true, ``./mnist/train`` otherwise.
    """
    # Image preprocessing. ``Normalize`` works on tensors, so ``ToTensor``
    # runs first.
    transform = transforms.Compose([
        transforms.ToTensor(),
        transforms.Normalize(
            mean=np.array([0.485, 0.456, 0.406]),
            std=np.array([0.229, 0.224, 0.225])),
    ])
    # ImageFolder expects the images to be grouped in one directory per class.
    test_set = torchvision.datasets.ImageFolder(root = './mnist/test' if test else './mnist/train', transform = transform)
    test_loader = torch.utils.data.DataLoader(test_set, batch_size = 32, shuffle = True)

    # Follow the model's device instead of a hard-coded GPU index.
    device = next(model.parameters()).device

    correct, total = 0, 0
    with torch.no_grad():  # evaluation needs no gradients
        for data, target in test_loader:
            output = model(data.to(device))
            # Predict with the KNN classifier on the CPU copy of the embeddings.
            predicted = clf.predict(output.cpu().numpy())
            # Plain ints keep the log line free of ``tensor(...)`` wrappers.
            correct += int((torch.as_tensor(predicted) == target).sum())
            total += target.size(0)

    accuracy = 100. * float(correct) / float(total) if total else 0.0
    context = 'Accuracy of model in ' + ('test' if test else 'train') + \
              ' set is {}/{}({:.2f}%)'.format(correct, total, accuracy)
    print(context)
In [12]:
def run_facenet(num_epochs=None):
    """Train a ResNet18 embedding network with triplet loss and KNN evaluation.

    Args:
        num_epochs: number of epochs. When omitted, a module-level
            ``num_epochs`` is used if one exists (the original code relied on
            such a global); otherwise 8 epochs are run.
            NOTE(review): the fallback of 8 is a placeholder -- confirm.

    Returns:
        The trained model.
    """
    # Hyper-parameters.
    lr = 0.01
    margin = 2.0
    num_triplets = 1000
    n_neighbors = 5
    embedding_size = 128  # adjustable
    if num_epochs is None:
        num_epochs = globals().get('num_epochs', 8)

    model = ResNet18(embedding_size, 10)

    # Use the GPU when there is one, otherwise fall back to the CPU.
    device = torch.device('cuda:0' if torch.cuda.is_available() else 'cpu')
    model = model.to(device)
    if device.type == 'cuda':
        cudnn.benchmark = True

    # lr, momentum and weight_decay are adjustable.
    optimizer = optim.SGD(model.parameters(), lr=lr, momentum=0.9, weight_decay=5e-4)

    print('start training')
    for epoch in range(num_epochs):
        model.train()
        train_facenet(epoch, model, optimizer, margin, num_triplets)  # triplet-loss training
        # Evaluate with BatchNorm running statistics so the embeddings do not
        # depend on the composition of each batch.
        model.eval()
        clf = KNN_classifier(model, epoch, n_neighbors)  # fit the KNN classifier
        test_facenet(epoch, model, clf, False)           # accuracy on the train set
        test_facenet(epoch, model, clf, True)            # accuracy on the test set
        if (epoch + 1) % 4 == 0:
            # Step decay of the learning rate.
            lr = lr / 3
            for param_group in optimizer.param_groups:
                param_group['lr'] = lr
    return model
In [10]:
In [19]:
import pandas
import matplotlib.pyplot as plt
def run_facenet18(num_epochs=None):
    """Train a 256-d ResNet18 embedding network with triplet loss and KNN evaluation.

    Args:
        num_epochs: number of epochs. When omitted, a module-level
            ``num_epochs`` is used if one exists (the original code relied on
            such a global); otherwise 8 epochs are run.
            NOTE(review): the fallback of 8 is a placeholder -- confirm.

    Returns:
        The trained model.
    """
    # Hyper-parameters.
    lr = 0.01
    margin = 0.7
    num_triplets = 4000
    n_neighbors = 10
    embedding_size = 256  # adjustable
    if num_epochs is None:
        num_epochs = globals().get('num_epochs', 8)

    model = ResNet18(embedding_size, 10)

    # Prefer GPU 2 as before, but only when it exists; otherwise use the
    # first GPU or the CPU.
    if not torch.cuda.is_available():
        device = torch.device('cpu')
    elif torch.cuda.device_count() > 2:
        device = torch.device('cuda:2')
    else:
        device = torch.device('cuda:0')
    model = model.to(device)
    if device.type == 'cuda':
        cudnn.benchmark = True

    # lr, momentum and weight_decay are adjustable.
    optimizer = optim.SGD(model.parameters(), lr=lr, momentum=0.9, weight_decay=5e-4)

    print('start training')
    for epoch in range(num_epochs):
        model.train()
        train_facenet(epoch, model, optimizer, margin, num_triplets)  # triplet-loss training
        # Evaluate with BatchNorm running statistics so the embeddings do not
        # depend on the composition of each batch.
        model.eval()
        clf = KNN_classifier(model, epoch, n_neighbors)  # fit the KNN classifier
        test_facenet(epoch, model, clf, False)           # accuracy on the train set
        test_facenet(epoch, model, clf, True)            # accuracy on the test set
        if (epoch + 1) % 4 == 0:
            # Step decay of the learning rate.
            lr = lr / 3
            for param_group in optimizer.param_groups:
                param_group['lr'] = lr
    return model
In [21]:
# Train the embedding network and keep it for the inspection cells below.
model = run_facenet18()
In [34]:
def show_sample(model, n_neighbors):
    """Save and plot every misclassified test image with its nearest training images.

    For each test image the KNN classifier gets wrong, three pictures are
    written to ``./output`` and shown side by side: the test image, the
    closest training image of the true class and the closest training image
    of the predicted class.

    The model's train/eval mode is left as the caller set it, so these
    embeddings match the ones ``KNN_classifier`` computes.

    Args:
        model: embedding network; batches are moved to the device of its
            parameters.
        n_neighbors: number of neighbours for the KNN classifier.
    """
    # NOTE(review): rows of train.csv / test.csv are assumed to be in the
    # same order as the ImageFolder datasets below -- confirm.
    frame_train = pandas.read_csv('./mnist/train.csv')
    frame_test = pandas.read_csv('./mnist/test.csv')

    # Image preprocessing. ``Normalize`` works on tensors, so ``ToTensor``
    # runs first.
    transform = transforms.Compose([
        transforms.ToTensor(),
        transforms.Normalize(
            mean=np.array([0.485, 0.456, 0.406]),
            std=np.array([0.229, 0.224, 0.225])),
    ])

    clf = KNN_classifier(model, 0, n_neighbors)

    batch_size = 32
    train_set = torchvision.datasets.ImageFolder(root='./mnist/train', transform=transform)
    train_loader = torch.utils.data.DataLoader(train_set, batch_size=batch_size, shuffle=False)
    test_set = torchvision.datasets.ImageFolder(root='./mnist/test', transform=transform)
    test_loader = torch.utils.data.DataLoader(test_set, batch_size=batch_size, shuffle=False)

    # Follow the model's device instead of a hard-coded GPU index.
    device = next(model.parameters()).device
    dis = PairwiseDistance(2)
    os.makedirs('./output', exist_ok=True)

    def load_and_save(frame, root, row, suffix):
        """Open the image of CSV row ``row`` and save a copy to ./output."""
        name, pic_id = str(frame['name'][row]), str(frame['id'][row])
        picture = Image.open(os.path.join(root, name, pic_id + '.png')).convert('RGB')
        picture.save('./output/error_' + str(index) + '_' + name + '_' + pic_id + suffix + '.png', 'png')
        return picture

    with torch.no_grad():
        # Embeddings and labels of the whole training set, in dataset order.
        feature_chunks, label_chunks = [], []
        for data, target in train_loader:
            feature_chunks.append(model(data.to(device)).cpu())
            label_chunks.append(target)
        train_features = torch.cat(feature_chunks)
        train_labels = torch.cat(label_chunks)

        def nearest_in_class(query, label):
            """Dataset index of the training image of ``label`` closest to ``query``."""
            # Selecting by label avoids assuming equally sized classes.
            candidates = torch.nonzero(train_labels == label).flatten()
            distances = dis.forward(train_features[candidates], query)
            return int(candidates[torch.argmin(distances)])

        index = 0  # running number of the misclassified samples
        for i, (data, target) in enumerate(test_loader):
            output = model(data.to(device)).cpu()
            predicted = clf.predict(output.numpy())
            for j in range(output.size(0)):
                class_t = int(target[j])     # true class
                class_f = int(predicted[j])  # predicted (wrong) class
                if class_f == class_t:
                    continue
                query = output[j].unsqueeze(0)
                dis_t_i = nearest_in_class(query, class_t)
                dis_f_i = nearest_in_class(query, class_f)
                row = i * batch_size + j

                # Misclassified test image.
                image = load_and_save(frame_test, './mnist/test', row, '')
                # Nearest training image of the true class.
                image_t = load_and_save(frame_train, './mnist/train', dis_t_i, '_t')
                # Nearest training image of the predicted class.
                image_f = load_and_save(frame_train, './mnist/train', dis_f_i, '_f')

                plt.subplot(1, 3, 1)
                plt.title('error_' + str(index) + '_' + str(frame_test['name'][row]) + '_' + str(frame_test['id'][row]))
                plt.imshow(image)
                plt.subplot(1, 3, 2)
                plt.imshow(image_t)
                plt.subplot(1, 3, 3)
                plt.imshow(image_f)
                plt.show()
                index += 1
# Inspect the misclassified test images using a 20-neighbour KNN classifier.
show_sample(model, 20)
In [29]:
import pandas
# Scratch cell: embed every training image and open the matching picture.
frame = pandas.read_csv('./mnist/train.csv')
# NOTE(review): ``transform`` is not defined at module level in the visible
# code; it has to be provided by an earlier cell -- confirm.
train_set = torchvision.datasets.ImageFolder(root='./mnist/train', transform=transform)
train_loader = torch.utils.data.DataLoader(train_set, batch_size=64, shuffle=False)
features = []
# Follow the model's device; a bare ``.cuda()`` targets the default GPU,
# which is not necessarily where the model lives.
device = next(model.parameters()).device
with torch.no_grad():  # feature extraction needs no gradients
    for i, (data, target) in enumerate(train_loader):
        output = model(data.to(device))
        features.extend(output.cpu().numpy())
for index in range(len(features)):
    # Training images are stored as ./mnist/train/<name>/<id>.png, matching
    # the roots used everywhere else in this file.
    image_path = os.path.join('./mnist/train', str(frame['name'][index]), str(frame['id'][index]) + '.png')
    image = Image.open(image_path).convert('RGB')
Triplet loss的性能与采样方式有很大的关系,这里简述两种hard-triplet的采样方式,batch-hard与semi-hard。
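与batch-hard不同,semi-hard triplet只需要保证minibatch中anchor到positive的距离小于anchor到negative的距离,同时anchor到negative的距离小于anchor到positive的距离加上margin(即 d(a,p) < d(a,n) < d(a,p) + margin,此时loss仍然大于0)即为semi-hard,见下图,不需要选出minibatch里面距离anchor最近的负样本(即最难的负样本)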
In [ ]:
# add your batch-hard and semi-hard code here and test them