In [16]:
import re
from PIL import Image
import os
import numpy as np
from tempfile import mkdtemp
import os.path as path
import matplotlib.pyplot as plt
from keras.preprocessing import image
import theano
from scipy.misc import imread, imresize, imsave
import keras
import traceback
from keras.models import Sequential
from keras.layers.core import Flatten, Dense, Dropout,Activation
from keras.layers.convolutional import Convolution2D, MaxPooling2D, ZeroPadding2D,UpSampling2D,UpSampling1D,Cropping2D
from keras.optimizers import SGD
# %matplotlib inline
from keras import backend as K
from keras.utils import np_utils
K.set_image_dim_ordering('th')
from sklearn.preprocessing import normalize
from sklearn.model_selection import train_test_split
from scipy import ndimage
from keras.layers.pooling import MaxPooling2D
from keras.layers.pooling import AveragePooling2D
from keras import callbacks
import glob
from shutil import copyfile
import warnings
import theano.tensor.nnet.abstract_conv as absconv
import h5py
In [3]:
def imageResize(imagePath):
    """
    Resize a single image to 224 x 224.
    imagePath : e.g. img/<category_folder>/<image>.jpg
    The resized copy is written to img_resized/<category_folder>/<image>.jpg,
    creating the category folder on first use.
    """
    imagePathsplitted = imagePath.split("/")
    new_width = 224
    new_height = 224
    outDir = 'img_resized/' + imagePathsplitted[1] + "/"
    if not os.path.exists(outDir):
        os.makedirs(outDir)
    img = Image.open(imagePath)  # image extension *.png, *.jpg
    img = img.resize((new_width, new_height), Image.ANTIALIAS)
    img.save(outDir + imagePathsplitted[2])
def resizeAll():
    """
    Resize every image listed in Anno/list_category_img.txt
    (each line holds an image path followed by its category).
    """
    fileIn = open("Anno/list_category_img.txt").read().splitlines()
    for imageAndCategoryLineNo in range(0, len(fileIn)):
        try:
            imagePath = fileIn[imageAndCategoryLineNo].split(" ")[0]
            # category = fileIn[imageAndCategoryLineNo].split(" ")[-1]
            imagePath = imagePath.strip()
            print imageAndCategoryLineNo
            imageResize(imagePath)
        except:
            print imageAndCategoryLineNo, "ERROR ENCOUNTERED"
In [4]:
# resizeAll()  # one-off preprocessing step; run once to populate img_resized/
In [5]:
def load_image(infilename):
    """
    Load an image from disk as a channel-first numpy array scaled to [0, 1].
    """
    img = ndimage.imread(infilename)
    data = np.asarray(img, dtype="int16")
    # move channels first: (H, W, C) -> (C, H, W); a plain reshape would scramble the pixels
    channelFirst = data.transpose(2, 0, 1)
    channelFirst = channelFirst / 255.0
    return channelFirst
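A minimal sanity check for the loader (the file name below is hypothetical): after the channel-first transpose and the division by 255, the array should match the network's (3, 224, 224) input and lie in [0, 1].

sample = load_image("img_resized/SomeCategory/img_00000001.jpg")  # hypothetical file
print sample.shape                # expected: (3, 224, 224)
print sample.min(), sample.max()  # expected: values within [0.0, 1.0]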
In [6]:
# number of classes under consideration (here: jeans vs. t-shirt)
uniqueClassNames = 2
In [7]:
def getImageAndCategory(batchSize):
    """
    Generator yielding (images, one-hot labels) mini-batches loaded into RAM.
    usage : for image, labels in getImageAndCategory(10): ...
    test:
        i = 0
        for image, labels in getImageAndCategory(10):
            print image.shape, labels.shape
            i = i + 1
            if i == 5:
                break
    """
    # set() removes duplicate lines and gives an arbitrary (not truly random) order,
    # which loosely mixes the classes across mini-batches
    fileIn = list(set(open("Anno/new_category_img.txt").read().splitlines()))
    print "TOTAL FILES IN EXPERIMENTS ARE : ", len(fileIn)
    for batchNo in range(0, len(fileIn) / batchSize):
        imagePaths = fileIn[batchNo * batchSize:(batchNo + 1) * batchSize]
        images = []
        labels = []
        for imageAndCategoryLineNo in range(0, len(imagePaths)):
            try:
                singleImagePathAndCategory = imagePaths[imageAndCategoryLineNo].split(" ")
                category = singleImagePathAndCategory[-1].strip()
                singleImagePath = singleImagePathAndCategory[0].strip()
                singleImagePath = singleImagePath.replace("img/", "img_resized/")
                images.append(load_image(singleImagePath))
                labels.append(int(category))
            except:
                traceback.print_exc()
                print "ERROR ENCOUNTERED"
        yield np.asarray(images, dtype='float16'), np.asarray(
            np_utils.to_categorical(np.asarray(labels), uniqueClassNames), dtype='bool')
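For reference, the labels yielded above are one-hot encoded with np_utils.to_categorical; a tiny standalone example with the two classes used here:

print np_utils.to_categorical([0, 1, 1], 2)
# -> [[1. 0.], [0. 1.], [0. 1.]]  (one row per sample)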
In [8]:
def getModelDefination(trainedModelPath=None):
    """
    Core definition of the classification model (VGG16-style stack with a 2-class softmax head).
    Note: trainedModelPath is accepted but no weights are loaded here, so training
    starts from random initialization.
    :return: compiled model
    """
    # defining convolutional network
    model = Sequential()
    model.add(ZeroPadding2D((1, 1), input_shape=(3, 224, 224)))
    model.add(Convolution2D(64, 3, 3, activation='relu'))
    model.add(ZeroPadding2D((1, 1)))
    model.add(Convolution2D(64, 3, 3, activation='relu'))
    model.add(MaxPooling2D((2, 2), strides=(2, 2)))
    model.add(ZeroPadding2D((1, 1)))
    model.add(Convolution2D(128, 3, 3, activation='relu'))
    model.add(ZeroPadding2D((1, 1)))
    model.add(Convolution2D(128, 3, 3, activation='relu'))
    model.add(MaxPooling2D((2, 2), strides=(2, 2)))
    model.add(ZeroPadding2D((1, 1)))
    model.add(Convolution2D(256, 3, 3, activation='relu'))
    model.add(ZeroPadding2D((1, 1)))
    model.add(Convolution2D(256, 3, 3, activation='relu'))
    model.add(ZeroPadding2D((1, 1)))
    model.add(Convolution2D(256, 3, 3, activation='relu'))
    model.add(MaxPooling2D((2, 2), strides=(2, 2)))
    model.add(ZeroPadding2D((1, 1)))
    model.add(Convolution2D(512, 3, 3, activation='relu'))
    model.add(ZeroPadding2D((1, 1)))
    model.add(Convolution2D(512, 3, 3, activation='relu'))
    model.add(ZeroPadding2D((1, 1)))
    model.add(Convolution2D(512, 3, 3, activation='relu'))
    model.add(MaxPooling2D((2, 2), strides=(2, 2)))
    model.add(ZeroPadding2D((1, 1)))
    model.add(Convolution2D(512, 3, 3, activation='relu'))
    model.add(ZeroPadding2D((1, 1)))
    model.add(Convolution2D(512, 3, 3, activation='relu'))
    model.add(ZeroPadding2D((1, 1)))
    model.add(Convolution2D(512, 3, 3, activation='relu'))
    model.add(MaxPooling2D((2, 2), strides=(2, 2)))
    model.add(Flatten())
    model.add(Dense(4096, activation='relu'))
    model.add(Dropout(0.5))
    model.add(Dense(4096, activation='relu'))
    model.add(Dropout(0.5))
    model.add(Dense(2, activation='softmax'))
    # compiling model
    model.compile(optimizer='sgd', loss='categorical_crossentropy', metrics=['accuracy'])
    # returning model
    return model
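To inspect the resulting stack before training (purely informational; nothing is fitted here):

inspection_model = getModelDefination()
inspection_model.summary()   # VGG16-style: 13 conv layers followed by 3 dense layers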
In [9]:
def save_test_images(epochNo, batch_Count, image_numpy, actual_label, predicted_label):
    """
    To check the model's predictions on a few images while training, each image is
    written to disk as <epochNo>_<batchCount>_<actualClass>_<predictedClass>.jpg
    """
    for eachImageno in range(0, len(image_numpy)):
        # convert back from channel-first (C, H, W) to channel-last (H, W, C) for imsave
        channelLast = image_numpy[eachImageno].transpose(1, 2, 0)
        imsave('inline_validation/' + str(epochNo) + "_" + str(batch_Count) + "_"
               + str(list(actual_label[eachImageno]).index(True)) + "_"
               + str(predicted_label[eachImageno]) + ".jpg", channelLast)
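imsave expects a channel-last (H, W, C) array, hence the transpose above; a dummy-data check of the conversion:

dummy = np.zeros((3, 224, 224), dtype='float16')
print dummy.transpose(1, 2, 0).shape   # (224, 224, 3), ready for imsave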
In [10]:
def VGGCAM(nb_classes, num_input_channels):
    """
    Build the VGG-CAM convolutional network.
    nb_classes : (int) number of classes
    num_input_channels : (int) number of feature maps in the extra convolutional
                         layer placed before the global average pooling (GAP) layer
    returns : Keras model
    """
    VGGCAM = Sequential()
    VGGCAM.add(ZeroPadding2D((1, 1), input_shape=(3, 224, 224)))
    VGGCAM.add(Convolution2D(64, 3, 3, activation='relu'))
    VGGCAM.add(ZeroPadding2D((1, 1)))
    VGGCAM.add(Convolution2D(64, 3, 3, activation='relu'))
    VGGCAM.add(MaxPooling2D((2, 2), strides=(2, 2)))
    VGGCAM.add(ZeroPadding2D((1, 1)))
    VGGCAM.add(Convolution2D(128, 3, 3, activation='relu'))
    VGGCAM.add(ZeroPadding2D((1, 1)))
    VGGCAM.add(Convolution2D(128, 3, 3, activation='relu'))
    VGGCAM.add(MaxPooling2D((2, 2), strides=(2, 2)))
    VGGCAM.add(ZeroPadding2D((1, 1)))
    VGGCAM.add(Convolution2D(256, 3, 3, activation='relu'))
    VGGCAM.add(ZeroPadding2D((1, 1)))
    VGGCAM.add(Convolution2D(256, 3, 3, activation='relu'))
    VGGCAM.add(ZeroPadding2D((1, 1)))
    VGGCAM.add(Convolution2D(256, 3, 3, activation='relu'))
    VGGCAM.add(MaxPooling2D((2, 2), strides=(2, 2)))
    VGGCAM.add(ZeroPadding2D((1, 1)))
    VGGCAM.add(Convolution2D(512, 3, 3, activation='relu'))
    VGGCAM.add(ZeroPadding2D((1, 1)))
    VGGCAM.add(Convolution2D(512, 3, 3, activation='relu'))
    VGGCAM.add(ZeroPadding2D((1, 1)))
    VGGCAM.add(Convolution2D(512, 3, 3, activation='relu'))
    VGGCAM.add(MaxPooling2D((2, 2), strides=(2, 2)))
    VGGCAM.add(ZeroPadding2D((1, 1)))
    VGGCAM.add(Convolution2D(512, 3, 3, activation='relu'))
    VGGCAM.add(ZeroPadding2D((1, 1)))
    VGGCAM.add(Convolution2D(512, 3, 3, activation='relu'))
    VGGCAM.add(ZeroPadding2D((1, 1)))
    VGGCAM.add(Convolution2D(512, 3, 3, activation='relu'))
    # Add another conv layer with ReLU + GAP
    VGGCAM.add(Convolution2D(num_input_channels, 3, 3, activation='relu', border_mode="same"))
    VGGCAM.add(AveragePooling2D((14, 14)))
    VGGCAM.add(Flatten())
    # Add the W layer
    VGGCAM.add(Dense(nb_classes, activation='softmax'))
    # VGGCAM.summary()
    return VGGCAM
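A shape walk-through of the CAM head (inspection only): the extra convolution emits num_input_channels feature maps of size 14x14, the (14, 14) average pool acts as global average pooling, and the final Dense layer holds the weights W that get_classmap() reuses below.

cam_check = VGGCAM(nb_classes=2, num_input_channels=1024)
print cam_check.layers[-4].output_shape   # (None, 1024, 14, 14)  extra conv features
print cam_check.layers[-3].output_shape   # (None, 1024, 1, 1)    after global average pooling
print cam_check.layers[-1].output_shape   # (None, 2)             class scores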
In [11]:
def train_VGGCAM(trained_weight_path, nb_classes, epoches, batchSize, num_input_channels):
    """
    Train the classification model and, after each outer batch, transfer its shared
    convolutional weights into the VGGCAM network to visualise class activation maps.
    args: trained_weight_path (str) weight file path forwarded to getModelDefination
                                    (currently not loaded there)
          nb_classes (int) number of classes
          epoches (int) number of passes over the annotation file
          batchSize (int) number of images loaded into RAM per outer batch
          num_input_channels (int) number of conv filters added before the GAP layer
    """
    # Load model
    trainedModel = getModelDefination(trainedModelPath=trained_weight_path)
    # Compile
    sgd = SGD(lr=0.001, decay=1e-6, momentum=0.9, nesterov=True)
    trainedModel.compile(optimizer=sgd, loss='categorical_crossentropy', metrics=['accuracy'])
    for epochNo in range(0, epoches):
        print "Epoch No : ", epochNo
        batch_Count = 0
        for image, labels in getImageAndCategory(batchSize):
            try:
                # list layer names of the classification model
                for i in range(len(trainedModel.layers)):
                    print (i, trainedModel.layers[i].name),
                print "\n" + "%" * 100
                # train on the images held in RAM for this outer batch
                trainedModel.fit(image, labels, batch_size=50, nb_epoch=1, verbose=1)
                modelCAM = VGGCAM(nb_classes, num_input_channels)
                print "NAME OF LAYERS IN NEW MODEL FOR CAM"
                for i in range(len(modelCAM.layers)):
                    print (i, modelCAM.layers[i].name),
                # Copy the shared convolutional weights (layers 0-16) into the CAM model
                for k in range(len(trainedModel.layers)):
                    weights = trainedModel.layers[k].get_weights()
                    modelCAM.layers[k].set_weights(weights)
                    # modelCAM.layers[k].trainable=True
                    if k == 16:
                        break
                print '\nModel loaded.'
                batch_Count = batch_Count + 1
                modelCAM.save_weights("CAM_Trained.h5", overwrite=True)
                # see the model's behaviour on one image while training
                plot_classmap("CAM_Trained.h5", trainedModel, "jeans.jpg", 1, nb_classes, num_input_channels)
            except:
                traceback.print_exc()
In [12]:
def get_classmap(model, X, nb_classes, batch_size, num_input_channels, ratio):
    """
    Build the class activation maps from the last convolutional feature maps of the
    VGGCAM network and the weights of its final Dense layer.
    """
    inc = model.layers[0].input
    conv6 = model.layers[-4].output
    conv6_resized = absconv.bilinear_upsampling(conv6, ratio,
                                                batch_size=batch_size,
                                                num_input_channels=num_input_channels)
    WT = model.layers[-1].W.T
    conv6_resized = K.reshape(conv6_resized, (1, -1, 224 * 224))
    classmap = K.dot(WT, conv6_resized)
    classmap = classmap.reshape((1, nb_classes, 224, 224))
    get_cmap = K.function([inc], classmap)
    return get_cmap([X])
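What the Theano graph above computes, spelled out with plain numpy on dummy data: each class map is the Dense-weight-weighted sum of the upsampled feature maps (toy shapes only; the real code uses the trained W and 224x224 maps).

feats = np.random.rand(4, 8 * 8)                      # 4 dummy upsampled feature maps, flattened 8x8
W = np.random.rand(4, 2)                              # dummy Dense weights: channels x classes
toy_classmaps = np.dot(W.T, feats).reshape(2, 8, 8)   # one 8x8 activation map per class
print toy_classmaps.shape                             # (2, 8, 8)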
In [13]:
def plot_classmap(VGGCAM_weight_path, trainedModel, img_path, label,
                  nb_classes, num_input_channels, ratio=16):
    """
    Plot the class activation map of a trained VGGCAM model on one image.
    args: VGGCAM_weight_path (str) path to trained keras VGGCAM weights
          trainedModel (keras model) classification model used to predict the class
          img_path (str) path to the image for which the activation map is plotted
          label (int) label (0 to nb_classes-1); overridden below by the model's prediction
          nb_classes (int) number of classes
          num_input_channels (int) number of conv filters added before the GAP layer
          ratio (int) upsampling ratio (16 * 14 = 224)
    """
    # Load and compile the CAM model
    modelCAM = VGGCAM(nb_classes, num_input_channels)
    modelCAM.load_weights(VGGCAM_weight_path)
    modelCAM.compile(loss="categorical_crossentropy", optimizer="sgd")
    img = image.load_img(img_path, target_size=(224, 224))
    x = image.img_to_array(img)
    # the classification model is used to predict the class
    label = trainedModel.predict_classes(x.reshape(1, 3, 224, 224), verbose=0)
    batch_size = 1
    classmap = get_classmap(modelCAM,
                            x.reshape(1, 3, 224, 224),
                            nb_classes,
                            batch_size,
                            num_input_channels=num_input_channels,
                            ratio=ratio)
    classes = ["jeans", "tshirt"]
    print "PREDICTED LABEL : ", classes[label[0]]
    plt.imshow(img)
    # overlay the activations of both classes on top of the input image
    activation = classmap[0, 0, :, :] + classmap[0, 1, :, :]
    plt.imshow(activation,
               cmap='jet',
               alpha=0.5,
               interpolation='nearest')
    plt.show()
    # plt.imsave(VGGCAM_weight_path+".jpg", classmap[0, label, :, :])
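Standalone usage outside the training loop (assumes the weight file saved above and a trained classification model already in memory as trainedModel):

# plot_classmap("CAM_Trained.h5", trainedModel, "jeans.jpg", 1,
#               nb_classes=2, num_input_channels=1024)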
In [15]:
# train and test on the go
num_input_channels = 1024
epoches = 25
batchSize = 14000  # images loaded into RAM per outer batch; fit() uses mini-batches of 50
train_VGGCAM("vgg16_weights.h5", 2, epoches, batchSize, num_input_channels=num_input_channels)