In [1]:
from mxnet import contrib, image, nd,gluon,autograd, init
from mxnet.gluon import loss as gloss, nn
import mxnet as mx
import numpy as np
In [2]:
# --- detection task configuration ---
num_classes = 3          # number of foreground object classes
max_obj_per_image = 1    # upper bound on objects per generated image
obj_scales = [0.1, 0.2, 0.3, 0.4]  # object sizes relative to image width; drives anchor sizing
num_maps = 5             # number of feature maps used for prediction

# One way to pick anchor sizes: evenly partition [min_scale, max_scale] into
# num_maps contiguous bands — high-resolution feature maps detect small
# objects, low-resolution maps detect large ones.
scales_arr = np.asarray(obj_scales)
max_scale = scales_arr.max()
min_scale = scales_arr.min()
step_scale = (max_scale - min_scale) / num_maps
anchor_sizes = []
for band in range(num_maps):
    lo = min_scale + step_scale * band
    anchor_sizes.append([lo, lo + step_scale])
#anchor_sizes = [[0.175, 0.225], [0.175, 0.225], [0.275,0.325], [0.275,0.325],[0.275,0.325]]
#anchor_sizes = [[0.2, 0.272], [0.37, 0.447], [0.54, 0.619], [0.71, 0.79],[0.88, 0.961]]
anchor_ratios = [[1, 2, 0.5]] * 5
batch_size = 5
net_name = 'ssd_for_pikachu.params'
print(anchor_sizes)
In [3]:
from PIL import Image,ImageDraw
import numpy as np
import random
import copy,os,pdb
from mxnet.gluon import data as gdata
# Source art: one background plus two sprite images to composite onto it.
img_background = Image.open('images/background.jpg')
img_sprit_1 = Image.open('images/sprits_1.jpg')
img_sprit_2 = Image.open('images/sprits_2.jpg')

# Directory where the synthesized training images are written.
outdir = 'data/temp'
if not os.path.exists(outdir):
    os.makedirs(outdir)
def random_crop(img, resize=(512, 512)):
    """Take a random generous crop of ``img`` and scale it to ``resize``.

    The crop always keeps at least the central third in each dimension:
    the top-left corner lands in the first third, the bottom-right corner
    in the last third.

    Parameters
    ----------
    img : PIL.Image
        Source image.
    resize : tuple of int
        Output (width, height).

    Returns
    -------
    PIL.Image
        The cropped and resized image.
    """
    width, height = img.size
    left = random.randint(0, width // 3)
    top = random.randint(0, height // 3)
    right = random.randint(width * 2 // 3, width)
    bottom = random.randint(height * 2 // 3, height)
    crop = img.crop((left, top, right, bottom))
    return crop.resize(resize)
def paste_it(background, sprit, mask):
    """Paste a randomly scaled (and possibly flipped) copy of ``sprit`` onto
    ``background`` at a random location, avoiding overlap with previously
    pasted objects.

    Parameters
    ----------
    background : PIL.Image
        Image to paste onto (not modified; a deep copy is returned).
    sprit : PIL.Image
        Sprite to paste.
    mask : numpy.ndarray
        2-D occupancy map updated in place; non-zero marks occupied pixels.

    Returns
    -------
    (PIL.Image, tuple or None)
        The composited image and the box (x0, y0, x1, y1) normalized to
        [0, 1], or (background, None) when placement fails.
    """
    bg_w, bg_h = background.size
    sp_w, sp_h = sprit.size
    scale_idx = random.randint(0, len(obj_scales) - 1)
    x0 = random.randint(10, bg_w - 10)
    y0 = random.randint(10, bg_h - 10)
    scale = obj_scales[scale_idx] * bg_w / sp_w
    patch = sprit.resize((int(sp_w * scale), int(sp_h * scale)), Image.BILINEAR)
    if random.randint(0, 2) == 0:  # mirror horizontally with probability 1/3
        patch = patch.transpose(Image.FLIP_LEFT_RIGHT)
    x1 = x0 + patch.size[0]
    y1 = y0 + patch.size[1]
    if x1 >= bg_w or y1 >= bg_h or x0 < 0 or y0 < 0:
        return background, None  # patch would stick out of the image
    if mask[y0:y1, x0:x1].sum() > 10:
        return background, None  # would overlap an already-placed object
    mask[y0:y1, x0:x1] = 1
    result = copy.deepcopy(background)
    result.paste(patch, (x0, y0, x1, y1))
    return result, (x0 / bg_w, y0 / bg_h, x1 / bg_w, y1 / bg_h)
# Synthesize the training set: paste sprites onto random crops of the
# background and record one label row (class, x0, y0, x1, y1) per object,
# with coordinates normalized to [0, 1].
W, H = img_background.size
Xs, Ys = [], []
for num in range(512):
    mask = np.zeros((H, W))  # occupancy map so pasted objects do not overlap
    img_compose = random_crop(img_background)
    Y = []
    for k in range(max_obj_per_image):
        rnd = random.randint(0, 2)
        if num_classes == 1:
            rnd = 0  # single-class mode: always use sprite 1
        if rnd == 0:
            img_compose, sprit_1 = paste_it(img_compose, img_sprit_1, mask)
            if sprit_1 is not None:
                Y.append((0,) + sprit_1)
        else:
            img_compose, sprit_2 = paste_it(img_compose, img_sprit_2, mask)
            if sprit_2 is not None:
                Y.append((1,) + sprit_2)
    if len(Y) < 1:
        continue  # skip images that ended up with no objects
    Ys.append(Y)
    outpath = os.path.join(outdir, "%d.jpg" % num)
    Xs.append(outpath)
    img_compose.save(outpath)
    # Visual sanity check: draw every configured anchor size centred on each
    # object so we can see whether the anchors are able to cover the targets.
    outpath = os.path.join(outdir, '%d_mark.jpg' % num)
    img_compose = img_compose.resize((256, 256))
    WW, HH = img_compose.size
    draw = ImageDraw.Draw(img_compose)
    for rect in Y:
        cls, x0, y0, x1, y1 = rect
        for sizes in anchor_sizes:
            for sz in sizes:
                w, h = WW * sz, HH * sz
                cx, cy = (x1 + x0) * WW / 2, (y0 + y1) * HH / 2
                left, top = cx - w / 2, cy - h / 2
                right, bottom = cx + w / 2, cy + h / 2
                draw.rectangle(((left, top), (right, bottom)),
                               fill=None, outline=(255, 0, 0))
    img_compose.save(outpath)
# Disabled: alternative data source — the pikachu detection set, which has
# exactly one annotated object per image. Enable (and uncomment the last
# line) to train on it instead of the synthesized toy set.
if 0:
    pkchu_Ys, pkchu_Xs = [], []
    with open('data/pikachu/dataset/annotations.txt', 'r') as f:
        for line in f:
            path, cls, x0, y0, x1, y1 = line.strip().split(' ')
            path = os.path.join('data/pikachu/dataset/images/', path)
            pkchu_Xs.append(path)
            cls = int(float(cls))
            x0, y0, x1, y1 = float(x0), float(y0), float(x1), float(y1)
            Y = [(cls, x0, y0, x1, y1)]  # one object per image
            pkchu_Ys.append(Y)
            # Draw the ground-truth box for visual verification.
            img = Image.open(path)
            W, H = img.size
            x0, y0, x1, y1 = x0 * W, y0 * H, x1 * W, y1 * H
            draw = ImageDraw.Draw(img)
            draw.rectangle(((x0, y0), (x1, y1)), 0, 255)
            img.save(path + ".marked.jpg")
    #Xs,Ys = pkchu_Xs, pkchu_Ys
class ToySet(gdata.Dataset):
    """Dataset over the synthesized images on disk.

    Each item is (image, labels): image is a float32 CHW mxnet ndarray
    resized to 256x256, labels is a (max_obj_per_image, 5) ndarray of rows
    (class, x0, y0, x1, y1) with coordinates normalized to [0, 1].
    Images with fewer objects are padded with rows of -1.
    """

    def __init__(self, fortrain, path_list, label_list):
        super(ToySet, self).__init__()
        self.data_list = []
        self.fortrain = fortrain          # reserved train/val switch (unused here)
        self.resize = (256, 256)          # network input size (W, H)
        self.max_obj_per_image = max_obj_per_image
        for path, labels in zip(path_list, label_list):
            if len(labels) < 1:
                continue  # drop images without any object
            self.data_list.append((path, labels))
        return

    def __len__(self):
        return len(self.data_list)

    def __getitem__(self, idx):
        path = self.data_list[idx][0]
        # BUGFIX: copy before padding. The original appended padding rows to
        # the list object stored in self.data_list, mutating the dataset's
        # own state as a side effect of reading an item.
        labels = list(self.data_list[idx][1])
        img = Image.open(path)
        img = img.resize(self.resize, Image.BILINEAR)
        img = np.array(img)
        # HWC uint8 -> CHW float32; no mean/std normalization is applied.
        img = np.transpose(img, (2, 0, 1)).astype(np.float32) / 1.0
        img = nd.array(img)
        # Pad to a fixed number of rows so items stack into batches.
        while len(labels) < self.max_obj_per_image:
            labels.append([-1, -1, -1, -1, -1])
        labels = nd.array(np.vstack(labels))
        return img, labels
# Build the dataset and a shuffling loader; "rollover" carries leftover
# samples into the next epoch so every batch has exactly batch_size items.
trainset = ToySet(True, Xs, Ys)
train_iter = gdata.DataLoader(trainset, batch_size, shuffle=True,
                              last_batch="rollover")
print('samples in total: ', len(trainset))
In [4]:
#这个下采样层既用在backbone中,也用在后续的检测模块中
def down_sample_blk(num_channels):
    """Two (conv3x3 -> batchnorm -> relu) stages followed by 2x2 max-pooling,
    halving the spatial resolution. Used in both the backbone and the extra
    detection stages.
    """
    blk = nn.Sequential()
    for _ in range(2):
        blk.add(nn.Conv2D(num_channels, kernel_size=3, padding=1))
        blk.add(nn.BatchNorm(in_channels=num_channels))
        blk.add(nn.Activation('relu'))
    blk.add(nn.MaxPool2D(2))
    return blk
#backbone
def base_net():
    """Backbone: three stacked down-sampling blocks (16, 32, 64 channels),
    reducing spatial resolution by a factor of 8 overall."""
    blk = nn.Sequential()
    for width in (16, 32, 64):
        blk.add(down_sample_blk(width))
    return blk
#完整的特征提取模块
#0: backbone
#1,2,3: 三个下采样层
#4:一个global max pool层
def get_blk(i):
    """Return feature-extraction stage ``i`` of the pyramid.

    0 -> backbone, 1..3 -> 128-channel down-sampling blocks,
    4 -> global max pooling (collapses the map to 1x1).
    """
    if i == 0:
        return base_net()
    if i == 4:
        return nn.GlobalMaxPool2D()
    return down_sample_blk(128)
In [5]:
#类别预测层,这里用一个卷积层实现
#其输入输出层的w/h必须一致
#输出通道数C等于(anchor个数)x(类别数+1),这里类别不包括背景,+1就是背景类
def cls_predictor(num_anchors, num_classes):
    """Class head: a 3x3 same-size conv whose channel count is
    num_anchors * (num_classes + 1); the +1 is the background class."""
    channels = num_anchors * (num_classes + 1)
    return nn.Conv2D(channels, kernel_size=3, padding=1)
#边框预测层,也用一个卷积层实现
#输入输出的w/h必须一致
#输出通道数C等于4x(anchor个数)
def bbox_predictor(num_anchors):
    """Box head: a 3x3 same-size conv predicting 4 offsets per anchor."""
    channels = 4 * num_anchors
    return nn.Conv2D(channels, kernel_size=3, padding=1)
#不同层预测出来的类别和边框的尺寸是不一样的,下面定义了一种合并方式
#每一层输出的是shape是(batch,C,H,W),不同层的输出,只有batch是一致的,其他三个值都不一样
#下面的函数把(batch,C,H,W)转换成(batch,HxWxC)
#最后在dim=1上连接
#注意mx.nd.flatten()的功能和numpy.flatten()不同,mx.nd.flatten()会保留维度0,只合并后面的维度
def flatten_pred(pred):
    """Reshape a (batch, C, H, W) prediction to (batch, H*W*C).

    Note mx.nd.flatten keeps axis 0 and merges the rest, unlike
    numpy.flatten which collapses everything."""
    channels_last = pred.transpose((0, 2, 3, 1))
    return channels_last.flatten()
def concat_preds(preds):
    """Flatten each per-stage prediction and join them along axis 1, so the
    outputs of all feature maps (whose C/H/W all differ, only batch agrees)
    form one tensor."""
    flat = [flatten_pred(p) for p in preds]
    return nd.concat(*flat, dim=1)
In [6]:
def blk_forward(X, blk, size, ratio, cls_predictor, bbox_predictor):
    """Run one pyramid stage: extract features, generate that stage's
    anchors, and apply its class and box heads.

    Here ``cls_predictor``/``bbox_predictor`` are the concrete conv layers,
    not the factory functions of the same name defined above.
    """
    Y = blk(X)                # stage features
    anchors = contrib.ndarray.MultiBoxPrior(Y, sizes=size, ratios=ratio)
    cls_preds = cls_predictor(Y)
    bbox_preds = bbox_predictor(Y)
    return (Y, anchors, cls_preds, bbox_preds)
In [7]:
class TinySSD(nn.Block):
    """Five-stage tiny SSD: backbone plus four extra stages, each paired with
    its own anchor sizes/ratios and its own class/box heads."""

    def __init__(self, num_classes, **kwargs):
        super(TinySSD, self).__init__(**kwargs)
        self.num_classes = num_classes
        self.sizes = anchor_sizes
        self.ratios = anchor_ratios
        # MultiBoxPrior yields (sizes + ratios - 1) anchors per pixel.
        self.num_anchors = len(self.sizes[0]) + len(self.ratios[0]) - 1
        # Keep the explicit stage_0..stage_4 attributes and this exact child
        # creation order so gluon's auto-generated parameter names (and thus
        # saved checkpoints) stay compatible.
        self.stage_0, self.stage_1, self.stage_2, self.stage_3 = (
            nn.Sequential(), nn.Sequential(), nn.Sequential(), nn.Sequential())
        self.stage_4 = nn.Sequential()
        stages = (self.stage_0, self.stage_1, self.stage_2,
                  self.stage_3, self.stage_4)
        for stage in stages:  # stage = (feature block, class head, box head)
            stage.add(get_blk(stages.index(stage)),
                      cls_predictor(self.num_anchors, self.num_classes),
                      bbox_predictor(self.num_anchors))
        return

    def forward(self, X):
        anchors, cls_preds, bbox_preds = [], [], []
        stages = (self.stage_0, self.stage_1, self.stage_2,
                  self.stage_3, self.stage_4)
        for i, stage in enumerate(stages):
            X, a, c, b = blk_forward(X, stage[0], self.sizes[i],
                                     self.ratios[i], stage[1], stage[2])
            anchors.append(a)
            cls_preds.append(c)
            bbox_preds.append(b)
        # The 0 in reshape keeps the batch dimension unchanged.
        return (nd.concat(*anchors, dim=1),
                concat_preds(cls_preds).reshape((0, -1, self.num_classes + 1)),
                concat_preds(bbox_preds))
# Shape smoke test on a dummy batch of 32 images (3x256x256).
net = TinySSD(num_classes=num_classes)
net.initialize()
X = nd.zeros((32, 3, 256, 256))
anchors, cls_preds, bbox_preds = net(X)
print('output anchors:', anchors.shape)
print('output class preds:', cls_preds.shape)
print('output bbox preds:', bbox_preds.shape)
In [8]:
import pdb

# Per-anchor losses: cross-entropy for the class label, L1 for box offsets.
cls_loss = gloss.SoftmaxCrossEntropyLoss()
bbox_loss = gloss.L1Loss()
def calc_loss(cls_preds, cls_labels, bbox_preds, bbox_labels, bbox_masks):
    """Per-sample total loss: class cross-entropy plus masked box L1.

    bbox_masks zeroes the offset terms of negative/padded anchors so only
    positive anchors contribute to the localization loss."""
    class_term = cls_loss(cls_preds, cls_labels)
    box_term = bbox_loss(bbox_preds * bbox_masks, bbox_labels * bbox_masks)
    return class_term + box_term
def cls_eval(cls_preds, cls_labels):
    """Count correctly classified anchors. Class scores live in the last
    axis, hence argmax(axis=-1)."""
    hits = cls_preds.argmax(axis=-1) == cls_labels
    return hits.sum().asscalar()
def bbox_eval(bbox_preds, bbox_labels, bbox_masks):
    """Summed absolute error of the box offsets over positive anchors
    (masked entries contribute zero)."""
    err = (bbox_labels - bbox_preds) * bbox_masks
    return err.abs().sum().asscalar()
In [9]:
import time
from mxnet import lr_scheduler

# Train from scratch on the GPU (uncomment the load_parameters pair below to
# resume from a saved checkpoint instead).
ctx, net = mx.gpu(), TinySSD(num_classes=num_classes)
net.initialize(init=init.Xavier(), ctx=ctx)
#net.load_parameters(net_name)
#net.collect_params().reset_ctx(ctx)

num_epochs = 100
# Step decay: learning rate starts at 0.2 and is multiplied by 0.1 every 50
# epochs; it is pushed into the trainer at the top of each epoch.
lr_sch = lr_scheduler.FactorScheduler(step=50, factor=0.1)
lr_sch.base_lr = 0.2
trainer = gluon.Trainer(net.collect_params(), 'sgd', {'wd': 5e-4})

start = time.time()
for epoch in range(num_epochs):
    acc_sum, mae_sum, n, m = 0.0, 0.0, 0, 0
    loss_hist = []
    trainer.set_learning_rate(lr_sch(epoch))
    for batch in train_iter:
        X = batch[0].as_in_context(ctx)
        Y = batch[1].as_in_context(ctx)
        with autograd.record():
            # Multi-scale anchors plus per-anchor class/offset predictions.
            anchors, cls_preds, bbox_preds = net(X)
            # Match anchors against the ground truth to build the targets.
            bbox_labels, bbox_masks, cls_labels = contrib.nd.MultiBoxTarget(
                anchors, Y, cls_preds.transpose((0, 2, 1)))
            l = calc_loss(cls_preds, cls_labels, bbox_preds, bbox_labels,
                          bbox_masks)
        l.backward()
        trainer.step(batch_size)
        # BUGFIX: the original logged l.asnumpy()[0] / batch_size, i.e. only
        # the FIRST sample's loss, scaled down again by the batch size.
        # Record the batch-mean loss instead.
        loss_hist.append(l.mean().asscalar())
        acc_sum += cls_eval(cls_preds, cls_labels)
        n += cls_labels.size
        mae_sum += bbox_eval(bbox_preds, bbox_labels, bbox_masks)
        m += bbox_labels.size
    if (epoch + 1) % 10 == 0:
        loss = np.asarray(loss_hist).mean()
        print('epoch %2d, class err %.5e, bbox mae %.5e, loss %.5e, lr %.5e time %.1f sec' % (
            epoch + 1, 1 - acc_sum / n, mae_sum / m, loss,
            trainer.learning_rate, time.time() - start))
        start = time.time()  # restart the timer for the next report window
net.save_parameters(net_name)
In [10]:
# Disabled: fine-tune on the real pikachu record files instead of the toy set.
if 0:
    #trainer = gluon.Trainer(net.collect_params(), 'sgd', {'learning_rate': 0.2, 'wd': 5e-4})
    def load_data_pikachu(batch_size, edge_size=256):
        """Build train/val detection iterators; edge_size is the output W/H."""
        data_dir = 'data/pikachu'
        # _download_pikachu(data_dir)
        train_iter = image.ImageDetIter(
            path_imgrec=os.path.join(data_dir, 'train.rec'),
            path_imgidx=os.path.join(data_dir, 'train.idx'),
            batch_size=batch_size,
            data_shape=(3, edge_size, edge_size),  # output image shape
            shuffle=False,  # read records sequentially, not in random order
            rand_crop=0,    # random-crop probability 0, i.e. disabled
            min_object_covered=0.95, max_attempts=200)
        val_iter = image.ImageDetIter(
            path_imgrec=os.path.join(data_dir, 'val.rec'), batch_size=batch_size,
            data_shape=(3, edge_size, edge_size), shuffle=False)
        return train_iter, val_iter

    start = time.time()
    train_iter, _ = load_data_pikachu(batch_size)
    for epoch in range(20):
        acc_sum, mae_sum, n, m = 0.0, 0.0, 0, 0
        train_iter.reset()  # rewind the record iterator for this epoch
        for batch in train_iter:
            X = batch.data[0].as_in_context(ctx)
            Y = batch.label[0].as_in_context(ctx)
            with autograd.record():
                # Multi-scale anchors plus per-anchor predictions.
                anchors, cls_preds, bbox_preds = net(X)
                # Match anchors against the ground truth to build targets.
                bbox_labels, bbox_masks, cls_labels = contrib.nd.MultiBoxTarget(
                    anchors, Y, cls_preds.transpose((0, 2, 1)))
                l = calc_loss(cls_preds, cls_labels, bbox_preds, bbox_labels,
                              bbox_masks)
            l.backward()
            trainer.step(batch_size)
            acc_sum += cls_eval(cls_preds, cls_labels)
            n += cls_labels.size
            mae_sum += bbox_eval(bbox_preds, bbox_labels, bbox_masks)
            m += bbox_labels.size
        if (epoch + 1) % 5 == 0:
            print('epoch %2d, class err %.2e, bbox mae %.2e, time %.1f sec' % (
                epoch + 1, 1 - acc_sum / n, mae_sum / m, time.time() - start))
            start = time.time()
In [11]:
%matplotlib inline
import matplotlib.pyplot as plt
def predict(X):
    """Run the net on one image batch and return the NMS-filtered detections
    for the first image, or None when nothing survives.

    Each returned row is (class, score, x0, y0, x1, y1) with coordinates in
    [0, 1]; MultiBoxDetection marks suppressed/empty rows with class -1.
    """
    anchors, cls_preds, bbox_preds = net(X.as_in_context(ctx))
    cls_probs = cls_preds.softmax().transpose((0, 2, 1))
    output = contrib.nd.MultiBoxDetection(cls_probs, bbox_preds, anchors)
    keep = [i for i, row in enumerate(output[0]) if row[0].asscalar() != -1]
    if len(keep) < 1:
        return None
    return output[0, keep]
def bbox_to_rect(bbox, color):
    """Convert an (x0, y0, x1, y1) box to a matplotlib Rectangle patch."""
    x0, y0, x1, y1 = bbox[0], bbox[1], bbox[2], bbox[3]
    return plt.Rectangle(xy=(x0, y0), width=x1 - x0, height=y1 - y0,
                         fill=False, edgecolor=color, linewidth=2)
def _make_list(obj, default_values=None):
if obj is None:
obj = default_values
elif not isinstance(obj, (list, tuple)):
obj = [obj]
return obj
def show_bboxes(axes, bboxes, labels=None, colors=None):
    """Draw boxes (mxnet ndarrays of pixel coordinates) on a matplotlib
    axes, cycling through colors and optionally writing a label per box."""
    labels = _make_list(labels)
    colors = _make_list(colors, ['b', 'g', 'r', 'm', 'k'])
    for i, bbox in enumerate(bboxes):
        color = colors[i % len(colors)]
        rect = bbox_to_rect(bbox.asnumpy(), color)
        axes.add_patch(rect)
        if labels and len(labels) > i:
            # White text on the colored box (black text if the box is white).
            text_color = 'k' if color == 'w' else 'w'
            axes.text(rect.xy[0], rect.xy[1], labels[i],
                      va='center', ha='center', fontsize=9, color=text_color,
                      bbox=dict(facecolor=color, lw=0))
def display(fig, img, output, threshold):
    """Show the image and overlay each detection whose score reaches
    ``threshold``; box coordinates are rescaled from [0, 1] to pixels."""
    fig.imshow(img.asnumpy())
    for row in output:
        score = row[1].asscalar()
        if score < threshold:
            continue  # drop low-confidence detections
        h, w = img.shape[0:2]
        bbox = [row[2:6] * nd.array((w, h, w, h), ctx=row.context)]
        show_bboxes(fig.axes, bbox, '%.2f' % score, 'r')
# Visualize predictions on a 5x2 grid of sample images.
plt.figure(figsize=(30, 30))
for ind in range(0, 10):
    fig = plt.subplot(5, 2, ind + 1)
    if isinstance(train_iter, mx.image.ImageDetIter):
        # Trained on the pikachu record set: use its demo image.
        img = image.imread('data/pikachu/pikachu.jpg')
    else:
        img = image.imread(Xs[ind * 5])
    feature = image.imresize(img, 256, 256).astype('float32') / 1.0
    X = feature.transpose((2, 0, 1)).expand_dims(axis=0)
    output = predict(X)
    if output is not None:
        display(fig, img, output, threshold=0.3)
    else:
        print('no-obj found')
In [ ]:
In [ ]: