In [ ]:
from __future__ import print_function, division
import os
from os.path import join as pj
import shutil
from glob import glob
import numpy as np
# Seed NumPy's global RNG for reproducibility. `seed` must be *called*:
# assigning to it would replace the function with an int and seed nothing.
np.random.seed(0)
import cv2
import scipy
import pandas as pd
import matplotlib
%matplotlib inline
from matplotlib import pylab as plt
# %config InlineBackend.figure_format = 'retina'
from matplotlib.patches import Circle
import matplotlib.patheffects as PathEffects
import seaborn as sns
from PIL import Image
import json
from tqdm import tqdm
from sklearn.metrics import accuracy_score, precision_score, recall_score, roc_curve, roc_auc_score
In [ ]:
from keras import applications
from keras.preprocessing.image import ImageDataGenerator
from keras import optimizers
from keras.models import Sequential
from keras.layers import Dropout, Flatten, Dense, Conv2D, MaxPooling2D
from keras.models import Model
from keras import metrics
from keras.callbacks import ModelCheckpoint, TensorBoard
Check GPU:
In [ ]:
# Print every compute device TensorFlow can see, to confirm whether a GPU is listed.
from tensorflow.python.client import device_lib
print(device_lib.list_local_devices())
In [ ]:
from config import config
import models
In [ ]:
# Directory of the training run whose stored settings are reused in this notebook.
EXPERIMENT_DIR = './classification_experiments/2017-08-04-09:13:32/'

# Rebind `config` to the settings saved with that run (this replaces the
# `config` object imported from the `config` module above).
with open(pj(EXPERIMENT_DIR, 'config.json')) as fin:
    config = json.loads(fin.read())
In [ ]:
# Alternative (disabled): use the checkpoint whose file name sorts last in the
# experiment's own `checkpoints` directory.
# checkpoints_dir = pj(EXPERIMENT_DIR, 'checkpoints')
# weights_path = pj(checkpoints_dir, sorted(os.listdir(checkpoints_dir))[-1])
# Weights file that is loaded into the model further down.
weights_path = './pretrained_weights/checkpoint-446.hdf5'
In [ ]:
# Validation images grouped by label; the sub-directories `0` and `1` are read below.
VAL_IMAGE_BY_CLASS_DIR = './data/train/val_images_by_class/'
# Validation frames; each entry is treated below as one directory of images per video.
VAL_IMAGES_FROM_VIDEOS_DIR = './data/train/val_images_from_videos/'
# Text file with the true switch frame of each video (parsed by `load_switch_frames`).
VAL_SWITCH_FRAMES_PATH = './data/train/videos/ideal.txt'
# Test frames; each entry is treated below as one directory of images per video.
TEST_IMAGES_FROM_VIDEOS_DIR = './data/public_test/images_from_videos/'
In [ ]:
def list_dir_with_full_paths(dir_path):
    """Return the absolute paths of all entries directly inside `dir_path`, sorted.

    Args:
        dir_path: Directory to list (relative or absolute).

    Returns:
        A list of absolute path strings in ascending lexicographic order.
    """
    root = os.path.abspath(dir_path)
    full_paths = [os.path.join(root, entry) for entry in os.listdir(root)]
    full_paths.sort()
    return full_paths
In [ ]:
# Look up the model constructor registered under the configured name in the
# project's `models.name_to_model` mapping, then build the network with the
# image size, channel count, class count and learning rate from the config.
model_build_function = models.name_to_model[config['MODEL_NAME']]
model = model_build_function(
config['IMAGE_HEIGHT'], config['IMAGE_WIDTH'],
config['N_CHANNELS'], config['N_CLASSES'],
lr=config['LEARNING_RATE']
)
In [ ]:
# Load the checkpoint at `weights_path` into the model built above.
model.load_weights(weights_path)
In [ ]:
def preprocess_image(image, image_height, image_width):
    """Rescale pixel values by 1/255 and resize the image to the given size.

    Args:
        image: Image array as produced by `np.array(Image.open(...))`.
        image_height: Target height in pixels.
        image_width: Target width in pixels.

    Returns:
        The rescaled, resized image array.
    """
    scaled = image / 255
    # cv2.resize takes the target size as (width, height).
    target_size = (image_width, image_height)
    return cv2.resize(scaled, target_size)
Metrics:
In [ ]:
# Gather the validation image paths and their labels; the label of an image is
# the name (0 or 1) of the sub-directory it is stored in.
val_images_paths = []
val_true_labels = []
for class_index in (0, 1):
    images_paths = list_dir_with_full_paths(pj(VAL_IMAGE_BY_CLASS_DIR, str(class_index)))
    val_images_paths += images_paths
    val_true_labels += [class_index] * len(images_paths)

# Read and preprocess every validation image, keeping the order of the paths.
val_images = []
for val_image_path in tqdm(val_images_paths):
    image = np.array(Image.open(val_image_path))
    val_images.append(
        preprocess_image(image, config['IMAGE_HEIGHT'], config['IMAGE_WIDTH'])
    )
In [ ]:
# Score all validation images in one call; presumably one row of per-class
# scores per image (TODO confirm against the model's output layer).
val_predictions_scores = model.predict(np.array(val_images))
# Predicted label = index of the highest score along axis 1.
val_predictions = np.argmax(val_predictions_scores, axis=1)
In [ ]:
# Share of validation images whose predicted label equals the true label.
print('Accuracy: {}'.format(accuracy_score(val_true_labels, val_predictions)))
In [ ]:
# Precision of the hard predictions (scikit-learn's default positive label, 1).
print('Precision: {}'.format(precision_score(val_true_labels, val_predictions)))
In [ ]:
# Recall of the hard predictions (scikit-learn's default positive label, 1).
print('Recall: {}'.format(recall_score(val_true_labels, val_predictions)))
In [ ]:
# ROC AUC is a ranking metric, so it has to be computed from the continuous
# class-1 scores. Feeding it the hard argmax labels collapses the curve to a
# single operating point and disagrees with the ROC curve plotted below.
positive_class_scores = val_predictions_scores[:, 1]
print('ROC AUC: {}'.format(roc_auc_score(val_true_labels, positive_class_scores)))

# Plot the ROC curve (true positive rate against false positive rate).
fprs, tprs, _ = roc_curve(val_true_labels, positive_class_scores)
plt.plot(fprs, tprs)
In [ ]:
def predict_for_images_dir(images_dir, model):
    """Run `model` on every image in `images_dir`, one image at a time.

    Images are processed in the sorted order returned by
    `list_dir_with_full_paths` and preprocessed to the size given in `config`.

    Args:
        images_dir: Directory containing the images to score.
        model: Object exposing `predict(batch)`.

    Returns:
        A NumPy array with one (squeezed) model output per image, in path order.
    """
    frame_scores = []
    for frame_path in list_dir_with_full_paths(images_dir):
        frame = np.array(Image.open(frame_path))
        frame = preprocess_image(frame, config['IMAGE_HEIGHT'], config['IMAGE_WIDTH'])
        # The model is fed a batch of one: add a leading batch axis, then drop
        # the singleton dimensions from its output again.
        single_frame_batch = frame[np.newaxis, ...]
        frame_scores.append(np.squeeze(model.predict(single_frame_batch)))
    return np.array(frame_scores)
In [ ]:
def plot_signal(signal, true_frame_switch=None, color='blue'):
    """Plot `signal` and, if known, mark the true switch frame with a red line.

    Args:
        signal: Sequence of values to plot against their index.
        true_frame_switch: Index of the true switch frame; `None` or -1 means
            there is nothing to mark.
        color: Line color used for the signal.
    """
    plt.plot(signal, color=color)
    if true_frame_switch is None or true_frame_switch == -1:
        return
    plt.axvline(true_frame_switch, color='red')
In [ ]:
def load_switch_frames(path):
    """Read a `video_name ... switch_frame` annotation file into a dict.

    Each non-blank line is split on whitespace; the first field is the video
    name and the last field is parsed as the integer switch frame. Blank or
    whitespace-only lines are skipped instead of crashing the parser.

    Args:
        path: Path to the annotation text file.

    Returns:
        A dict mapping video name to switch frame (int), or `None` if `path`
        does not exist.

    Raises:
        ValueError: If the last field of a non-blank line is not an integer.
    """
    if not os.path.exists(path):
        return None

    video_name_to_switch_frame = dict()
    with open(path) as fin:
        # Iterate the file lazily; there is no need to load all lines at once.
        for line in fin:
            fields = line.split()
            if not fields:
                continue
            video_name_to_switch_frame[fields[0]] = int(fields[-1])
    return video_name_to_switch_frame
Load true val switch frames:
In [ ]:
# Mapping from video file name to its true switch frame
# (`load_switch_frames` returns None when the annotation file is missing).
video_name_to_switch_frame = load_switch_frames(VAL_SWITCH_FRAMES_PATH)
Classify frames of val videos:
In [ ]:
# Score the frames of every validation video. The three lists stay aligned:
# entry i of each one belongs to the i-th frames directory.
val_images_from_videos_paths = list_dir_with_full_paths(VAL_IMAGES_FROM_VIDEOS_DIR)
predictions_scores, predictions, val_true_frame_switches = [], [], []
for val_images_from_videos_path in tqdm(val_images_from_videos_paths):
    prediction_score = predict_for_images_dir(val_images_from_videos_path, model)
    # The annotation file is keyed by video file name: directory name + '.avi'.
    video_name = os.path.basename(val_images_from_videos_path) + '.avi'
    predictions_scores.append(prediction_score)
    predictions.append(np.argmax(prediction_score, axis=1))
    val_true_frame_switches.append(video_name_to_switch_frame[video_name])
In [ ]:
# Draw one figure per validation video: the per-frame predicted labels plus
# the true switch frame. The ground-truth list built above is named
# `val_true_frame_switches`; the previous `true_frame_switches` was undefined.
for prediction, true_frame_switch in zip(predictions, val_true_frame_switches):
    plot_signal(prediction, true_frame_switch)
    plt.show()
Detect switch frame:
In [ ]:
def window_smooth(signal, window_size):
    """Smooth a 1-D label signal with a sliding-window mode (majority) filter.

    The signal is zero-padded by `window_size // 2` on both sides, and each
    output sample is the most frequent value in the window of
    `2 * (window_size // 2) + 1` samples centred on it. Ties are resolved in
    favour of the smallest value.

    The mode is computed with `np.unique`, so the result does not depend on
    the return shape of `scipy.stats.mode`, which differs between SciPy
    versions.

    Args:
        signal: 1-D sequence of labels.
        window_size: Nominal window width; an even value acts like the next
            odd one because the window is symmetric around each sample.

    Returns:
        A NumPy array with the same length as `signal`.
    """
    pad_width = window_size // 2
    padded_signal = np.pad(signal, pad_width=pad_width, mode='constant', constant_values=(0, 0))
    window_length = 2 * pad_width + 1

    smoothed_signal = []
    for start in range(len(signal)):
        window = padded_signal[start:start + window_length]
        # np.unique returns the values sorted ascending, so argmax over the
        # counts picks the smallest of several equally frequent values.
        values, counts = np.unique(window, return_counts=True)
        smoothed_signal.append(values[np.argmax(counts)])
    return np.array(smoothed_signal)
In [ ]:
def find_first_switch_frame(signal):
    """Return the index of the first `0 -> 1` transition in `signal`.

    The returned index is the position of the `0` that is immediately
    followed by a `1`.

    Args:
        signal: 1-D sequence of labels.

    Returns:
        Index of the first match, or -1 if the signal has no `0, 1` pair.
    """
    pattern = np.array([0, 1])
    # The last valid start is len(signal) - len(pattern) itself, so the range
    # needs the +1; without it a switch on the final frame is never found.
    last_start = len(signal) - len(pattern)
    for start in range(last_start + 1):
        if np.array_equal(signal[start:start + len(pattern)], pattern):
            return start
    return -1
In [ ]:
def detect_switch_frame(signal, window_size):
    """Smooth `signal` with `window_smooth`, then locate its first switch frame.

    Args:
        signal: 1-D sequence of per-frame labels.
        window_size: Window size passed on to `window_smooth`.

    Returns:
        The result of `find_first_switch_frame` on the smoothed signal
        (-1 when no switch is found).
    """
    return find_first_switch_frame(window_smooth(signal, window_size))
In [ ]:
def visionhack_score(y_true, y_pred):
    """Average per-video score of predicted switch frames against the truth.

    Per video (-1 stands for "no switch"):
      * prediction is -1: 1 point, whatever the truth is;
      * truth and prediction are both real frames at most 6 apart: 2 points;
      * anything else: 0 points.

    Args:
        y_true: True switch frames.
        y_pred: Predicted switch frames, aligned with `y_true`.

    Returns:
        Total points divided by `len(y_true)` (true division).
    """
    total = 0
    for true_frame, predicted_frame in zip(y_true, y_pred):
        if predicted_frame == -1:
            total += 1
        elif true_frame != -1 and abs(true_frame - predicted_frame) <= 6:
            total += 2
    return total / len(y_true)
In [ ]:
# Grid-search the smoothing window: score the detected switch frames of the
# validation videos for each candidate size and keep the best-scoring one.
window_size_grid = [1, 3, 5, 7, 9]
scores = []
for window_size in window_size_grid:
    switch_frame_predictions = [
        detect_switch_frame(frame_labels, window_size=window_size)
        for frame_labels in predictions
    ]
    score = visionhack_score(val_true_frame_switches, switch_frame_predictions)
    print('window_size = {}: {}'.format(window_size, score))
    scores.append(score)

# np.argmax returns the first maximum, so ties go to the smallest window.
window_size_opt = window_size_grid[np.argmax(scores)]
print('Optimal window_size = {}: {}'.format(window_size_opt, np.max(scores)))
In [ ]:
# Predict a switch frame for every test video with the window size chosen on
# the validation set.
test_images_from_videos_paths = list_dir_with_full_paths(TEST_IMAGES_FROM_VIDEOS_DIR)
switch_frame_predictions = []
for test_images_from_videos_path in tqdm(test_images_from_videos_paths):
    prediction_score = predict_for_images_dir(test_images_from_videos_path, model)
    # Per-frame label = index of the highest score in each row.
    prediction = prediction_score.argmax(axis=1)
    switch_frame_predictions.append(detect_switch_frame(prediction, window_size_opt))
In [ ]:
def make_submission(video_names, predicted_switch_frames, path):
    """Write one `<video_name> <switch_frame>` line per video to `path`.

    Args:
        video_names: Video file names, one per prediction.
        predicted_switch_frames: Predicted switch frames, aligned with
            `video_names`.
        path: Output file; it is overwritten if it already exists.
    """
    # Use the `video_names` argument rather than a same-purpose global, so the
    # function writes exactly what the caller passes in.
    lines = [
        '{} {}\n'.format(video_name, predicted_switch_frame)
        for video_name, predicted_switch_frame in zip(video_names, predicted_switch_frames)
    ]
    with open(path, 'w') as fout:
        fout.write(''.join(lines))
In [ ]:
# Rebuild each test video's file name from its frames directory name by appending '.avi'.
test_video_names = list(map(lambda x: os.path.basename(x) + '.avi', test_images_from_videos_paths))
In [ ]:
# `make_submission` takes (video_names, predicted_switch_frames, path); the
# predictions computed for the test videos must be passed as the second argument.
make_submission(test_video_names, switch_frame_predictions, 'submission.txt')