In [1]:
import os
from IPython.display import Image
import time
from util import Util
u = Util()
import image_utils as iu
import keras_image_utils as kiu
import numpy as np
# Explicit random seed for reproducibility
np.random.seed(1337)
In [2]:
from keras.models import Sequential
from keras.layers import Dense, Dropout, Activation, Flatten
from keras.layers import Convolution2D, MaxPooling2D
from keras.layers import Merge
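# note: this notebook targets the Keras 1.x API -- the Merge layer,
# the border_mode argument, and the string metrics 'precision' and
# 'recall' used below were all removed in Keras 2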
In [3]:
import dataset_generator as dataset
In [4]:
# letter list
ALPHABET_ALL = dataset.ALPHABET_ALL
(_, _, X_test_22, y_test_22, _) = dataset.generate_all_chars_with_class(verbose=0, plot=False)
(input_shape, X_test_22, Y_test_22) = kiu.adjust_input_output(X_test_22, y_test_22, 22)
print ("Loaded test set for all the characters")
(_, _, X_test_seg, y_test_seg) = dataset.generate_dataset_for_segmentator(verbose=0, plot=False)
(_, X_test_seg, Y_test_seg) = kiu.adjust_input_output(X_test_seg, y_test_seg, 2)
print ("Loaded test set for good and bad segments")
X_test_char = {}
y_test_char = {}
Y_test_char = {}
for char in ALPHABET_ALL:
    (_, _, X_test_char[char], y_test_char[char]) = dataset.generate_positive_and_negative_labeled(char, verbose=0)
    (_, X_test_char[char], Y_test_char[char]) = kiu.adjust_input_output(X_test_char[char], y_test_char[char], 2)
    print ("Loaded test set for char '" + char + "'")
In [5]:
# input image dimensions
img_rows, img_cols = 34, 56
# number of networks for ensemble learning
number_of_models = 5
# checkpoints dir
checkpoints_dir = "checkpoints"
# size of pooling area for max pooling
pool_size1 = (2, 2)
pool_size2 = (3, 3)
# convolution kernel size
kernel_size1 = (4, 4)
kernel_size2 = (5, 5)
# dropout rate
dropout = 0.15
# activation
activation = 'relu'
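As a quick sanity check on these hyperparameters, the feature maps of a single column shrink as follows under 'valid' convolutions and non-overlapping max pooling (matching the network defined in the next cell); this helper is purely illustrative:

def conv_pool_shape(rows, cols, kernel, pool):
    # 'valid' convolution output size, then non-overlapping max pooling
    rows, cols = rows - kernel[0] + 1, cols - kernel[1] + 1
    return (rows // pool[0], cols // pool[1])

(r, c) = conv_pool_shape(img_rows, img_cols, kernel_size1, pool_size1)  # (34, 56) -> (15, 26)
(r, c) = conv_pool_shape(r, c, kernel_size2, pool_size2)                # (15, 26) -> (3, 7)
print((r, c))  # the Flatten layer therefore sees nb_filters2 * 3 * 7 values per sample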
In [6]:
def initialize_network_single_column(model, nb_classes, nb_filters1, nb_filters2, dense_layer_size1):
    model.add(Convolution2D(nb_filters1, kernel_size1[0], kernel_size1[1],
                            border_mode='valid',
                            input_shape=input_shape, name='convolution_1_' + str(nb_filters1) + '_filters'))
    model.add(Activation(activation, name='activation_1_' + activation))
    model.add(MaxPooling2D(pool_size=pool_size1, name='max_pooling_1_' + str(pool_size1) + '_pool_size'))
    model.add(Convolution2D(nb_filters2, kernel_size2[0], kernel_size2[1],
                            name='convolution_2_' + str(nb_filters2) + '_filters'))
    model.add(Activation(activation, name='activation_2_' + activation))
    model.add(MaxPooling2D(pool_size=pool_size2, name='max_pooling_2_' + str(pool_size2) + '_pool_size'))
    model.add(Dropout(dropout))
    model.add(Flatten())
    model.add(Dense(dense_layer_size1, name='fully_connected_1_' + str(dense_layer_size1) + '_neurons'))
    model.add(Activation(activation, name='activation_3_' + activation))
    model.add(Dropout(dropout))
    model.add(Dense(nb_classes, name='output_' + str(nb_classes) + '_neurons'))
    model.add(Activation('softmax', name='softmax'))
    model.compile(loss='categorical_crossentropy',
                  optimizer='adadelta',
                  metrics=['accuracy', 'precision', 'recall'])
def try_load_checkpoints(model, checkpoints_filepath, warn=True):
    # load weights from a previous checkpoint, if one exists
    if os.path.exists(checkpoints_filepath):
        model.load_weights(checkpoints_filepath)
    elif warn:
        print('Warning: checkpoint ' + checkpoints_filepath + ' not found, using freshly initialized weights')
def initialize_network_multi_column(merged_model, models):
    # average the softmax outputs of the single columns
    merged_model.add(Merge(models, mode='ave'))
    merged_model.compile(loss='categorical_crossentropy',
                         optimizer='adadelta',
                         metrics=['accuracy', 'precision', 'recall'])
def create_and_load_network(number_of_models, checkpoint_paths, nb_classes,
                            nb_filters1, nb_filters2, dense_layer_size1):
    # pseudo-random generation of seeds (drawn for reproducibility, not used below)
    seeds = np.random.randint(10000, size=number_of_models)
    # initialize the single-column models
    models = [None] * number_of_models
    for i in range(number_of_models):
        models[i] = Sequential()
        initialize_network_single_column(models[i], nb_classes, nb_filters1, nb_filters2, dense_layer_size1)
        try_load_checkpoints(models[i], checkpoint_paths[i])
    # initialize the merged (ensemble) model
    merged_model = Sequential()
    initialize_network_multi_column(merged_model, models)
    return (merged_model, models)
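Merge with mode='ave' averages the columns' softmax outputs elementwise, so the ensemble prediction is equivalent to averaging the single-column predictions by hand; a small numpy illustration with made-up numbers:

# dummy softmax outputs of three columns for one sample (invented values)
column_outputs = [np.array([[0.7, 0.3]]),
                  np.array([[0.6, 0.4]]),
                  np.array([[0.8, 0.2]])]
ensemble_output = np.mean(column_outputs, axis=0)
print(ensemble_output)  # [[0.7, 0.3]] -- what Merge(mode='ave') produces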
In [7]:
# 22-class OCR network
ocr_weights_dir = os.path.join(checkpoints_dir, "09_22-classes")
ocr_weights = [os.path.join(ocr_weights_dir, "09_ICR_weights.best_0.hdf5"),
               os.path.join(ocr_weights_dir, "09_ICR_weights.best_1.hdf5"),
               os.path.join(ocr_weights_dir, "09_ICR_weights.best_2.hdf5"),
               os.path.join(ocr_weights_dir, "09_ICR_weights.best_3.hdf5"),
               os.path.join(ocr_weights_dir, "09_ICR_weights.best_4.hdf5")]
(ocr_model, _) = create_and_load_network(number_of_models, ocr_weights, 22, 50, 100, 250)
score = ocr_model.evaluate([np.asarray(X_test_22)] * number_of_models, Y_test_22, verbose=0)
print ("Loaded 22-class OCR model with test error of ", (1-score[2])*100, '%')
# segmentator network (good cut / bad cut)
segmentator_weights_dir = os.path.join(checkpoints_dir, "letter_not_letter")
segmentator_weights = [os.path.join(segmentator_weights_dir, "10_ICR_weights.best_0.hdf5"),
                       os.path.join(segmentator_weights_dir, "10_ICR_weights.best_1.hdf5"),
                       os.path.join(segmentator_weights_dir, "10_ICR_weights.best_2.hdf5"),
                       os.path.join(segmentator_weights_dir, "10_ICR_weights.best_3.hdf5"),
                       os.path.join(segmentator_weights_dir, "10_ICR_weights.best_4.hdf5")]
(segmentator_model, _) = create_and_load_network(number_of_models, segmentator_weights, 2, 50, 100, 250)
score = segmentator_model.evaluate([np.asarray(X_test_seg)] * number_of_models, Y_test_seg, verbose=0)
print ("Loaded binary segmentator model with test error of ", (1-score[2])*100, '%')
print ("---")
# single letter segmentator / ocr
single_letter_models = {}
single_letter_weights_dir = {}
single_letter_weights = {}
errors = []
for char in ALPHABET_ALL:
    single_letter_weights_dir[char] = os.path.join(checkpoints_dir, char)
    single_letter_weights[char] = [os.path.join(single_letter_weights_dir[char], "0.hdf5"),
                                   os.path.join(single_letter_weights_dir[char], "1.hdf5"),
                                   os.path.join(single_letter_weights_dir[char], "2.hdf5"),
                                   os.path.join(single_letter_weights_dir[char], "3.hdf5"),
                                   os.path.join(single_letter_weights_dir[char], "4.hdf5")]
    (single_letter_models[char], _) = create_and_load_network(number_of_models, single_letter_weights[char], 2, 20, 40, 150)
    score = single_letter_models[char].evaluate([np.asarray(X_test_char[char])] * number_of_models, Y_test_char[char], verbose=0)
    print ("Loaded binary model for '" + char + "' with test error of ", (1-score[2])*100, '%')
    errors.append(1-score[2])
print("Average test error: ", sum(errors) / float(len(errors)) * 100, "%")
In [8]:
def predict_pipeline1(data, count_letter=True):
    # pipeline 1: run every per-character binary model on each cut;
    # any model that votes "good" names its character
    start_time = time.time()
    count = 0
    for cut in data:
        flag = False
        count += 1
        cut_batch = np.asarray([cut])
        if count_letter:
            print ("Predictions for the supposed letter number " + str(count))
        for char in ALPHABET_ALL:
            predictions = single_letter_models[char].predict([cut_batch] * number_of_models)
            if predictions[0][1] > predictions[0][0]:
                print ("Cut " + str(count) + " has been classified as good, corresponding to char '" +
                       char + "' with a confidence of " + str(predictions[0][1] * 100) + "%")
                flag = True
        if not flag:
            print ("Bad cut")
    elapsed_time = time.time() - start_time
    print("Elapsed time:", elapsed_time)
    print("---")
    print(" ")
def predict_pipeline2(data, count_letter=True):
    # pipeline 2: the binary segmentator gates each cut; good cuts go
    # to the 22-class OCR model, which reports its top-3 hypotheses
    start_time = time.time()
    count = 0
    for cut in data:
        count += 1
        cut_batch = np.asarray([cut])
        if count_letter:
            print ("Predictions for the supposed letter number " + str(count))
        predictions = segmentator_model.predict([cut_batch] * number_of_models)
        if predictions[0][1] > predictions[0][0]:
            predictions = ocr_model.predict([cut_batch] * number_of_models)
            ind = (-predictions[0]).argsort()[:3]
            for i in ind:
                print ("Good cut corresponding to letter '" + ALPHABET_ALL[i] +
                       "' with a confidence of " + str(predictions[0][i] * 100) + "%")
        else:
            print ("Bad cut with a confidence of " + str(predictions[0][0] * 100) + "%")
    elapsed_time = time.time() - start_time
    print("Elapsed time:", elapsed_time)
    print("---")
    print(" ")
def predict_pipeline3(data, count_letter=True):
    # pipeline 3: the per-character binary models act as the gate; if
    # any of them accepts the cut, the 22-class OCR model reports its
    # top-3 hypotheses
    start_time = time.time()
    count = 0
    for cut in data:
        flag = False
        count += 1
        cut_batch = np.asarray([cut])
        if count_letter:
            print ("Predictions for the supposed letter number " + str(count))
        for char in ALPHABET_ALL:
            predictions = single_letter_models[char].predict([cut_batch] * number_of_models)
            if predictions[0][1] > predictions[0][0]:
                print ("Good cut with a confidence of " + str(predictions[0][1] * 100) + "% by letter '" + char + "'")
                flag = True
        if flag:
            predictions = ocr_model.predict([cut_batch] * number_of_models)
            ind = (-predictions[0]).argsort()[:3]
            for i in ind:
                print ("Good cut corresponding to letter '" + ALPHABET_ALL[i] +
                       "' with a confidence of " + str(predictions[0][i] * 100) + "%")
        else:
            print ("Bad cut")
    elapsed_time = time.time() - start_time
    print("Elapsed time:", elapsed_time)
    print("---")
    print(" ")
def predict_pipeline4(data, count_letter=True):
    # pipeline 4: the binary segmentator gates each cut; good cuts are
    # then named by the per-character binary models
    start_time = time.time()
    count = 0
    for cut in data:
        count += 1
        cut_batch = np.asarray([cut])
        if count_letter:
            print ("Predictions for the supposed letter number " + str(count))
        predictions = segmentator_model.predict([cut_batch] * number_of_models)
        if predictions[0][1] > predictions[0][0]:
            for char in ALPHABET_ALL:
                predictions = single_letter_models[char].predict([cut_batch] * number_of_models)
                if predictions[0][1] > predictions[0][0]:
                    print ("Good cut with a confidence of " + str(predictions[0][1] * 100) + "% by letter '" + char + "'")
        else:
            print ("Bad cut with a confidence of " + str(predictions[0][0] * 100) + "%")
    elapsed_time = time.time() - start_time
    print("Elapsed time:", elapsed_time)
    print("---")
    print(" ")
In [9]:
u.plot_image(iu.load_sample("not_code/048r.png"), (1843, 1397))
In [10]:
files = []
folders = []
for (path, dirnames, filenames) in os.walk(os.path.join('not_code', 'all_windows_048r')):
    folders.extend(os.path.join(path, name) for name in dirnames)
    files.extend(os.path.join(path, name) for name in filenames)
cuts = iu.open_many_samples(files)
(cuts, _) = kiu.adjust_input(np.asarray(cuts))
print ("Examples: ")
u.plot_some_images(cuts[:18], (img_cols, img_rows), grid_x=6, grid_y=3)
In [11]:
for i in range(len(cuts)):
    print("Cut number ", i + 1)
    u.plot_image(cuts[i], (img_cols, img_rows))
    print("Pipeline 1")
    predict_pipeline1([cuts[i]], count_letter=False)
    print("Pipeline 2")
    predict_pipeline2([cuts[i]], count_letter=False)
    print("Pipeline 3")
    predict_pipeline3([cuts[i]], count_letter=False)
    print("Pipeline 4")
    predict_pipeline4([cuts[i]], count_letter=False)
    print("---------")
    print(" ")