In [1]:
from __future__ import print_function, division
import os
from os.path import join as pj
import shutil
from glob import glob
import numpy as np
# BUG FIX: `np.random.seed = 0` assigned the int 0 over the seed *function*
# (clobbering it and never seeding the RNG). Call it instead.
np.random.seed(0)  # for reproducibility
import pandas as pd
import matplotlib
%matplotlib inline
from matplotlib import pylab as plt
# %config InlineBackend.figure_format = 'retina'
from matplotlib.patches import Circle
import matplotlib.patheffects as PathEffects
import seaborn as sns
from PIL import Image
import json
from tqdm import tqdm
In [2]:
import cv2
In [3]:
from sklearn.model_selection import train_test_split
In [4]:
def list_dir_with_full_paths(dir_path):
    """Return the sorted absolute paths of every entry inside `dir_path`."""
    abs_dir = os.path.abspath(dir_path)
    full_paths = (os.path.join(abs_dir, entry) for entry in os.listdir(abs_dir))
    return sorted(full_paths)
In [5]:
# IMAGE_HEIGHT, IMAGE_WIDTH = 300, 300
In [6]:
# Input locations: raw videos + ideal.txt live here; extracted frames go below.
RAW_DATA_DIR = './data/trainset/'
IMAGES_FROM_VIDEOS_DIR = './data/images_from_videos'  # one sub-dir per video
In [7]:
# Output locations for the class-folder layout ('<dir>/0' and '<dir>/1').
IMAGES_BY_CLASS_DIR = './data/images_by_class'        # all images, both splits
TRAIN_IMAGES_BY_CLASS_DIR = './data/train_images_by_class'
VAL_IMAGES_BY_CLASS_DIR = './data/val_images_by_class'
In [8]:
# Map each video file name to the frame index at which the light switches.
# Each ideal.txt line looks like '<video_name> ... <switch_frame>'; a switch
# frame of -1 appears to mean the light never switches (see
# extract_images_from_video, where -1 labels every frame 0).
video_name_to_switch_frame = dict()
with open(pj(RAW_DATA_DIR, 'ideal.txt')) as fin:
    for line in fin:  # iterate lazily instead of readlines()
        parts = line.split()  # split() also tolerates tabs / repeated spaces
        if not parts:
            continue  # skip blank lines defensively
        video_name, switch_frame = parts[0], int(parts[-1])
        video_name_to_switch_frame[video_name] = switch_frame
In [9]:
def extract_images_from_video(video_path, images_dir, switch_frame):
    """Dump frames of `video_path` into `images_dir` as labelled JPEGs.

    Each frame is written as '{frame:03d}_{video_name}_{label}.jpg', where
    label is 0 (red light) for frames before `switch_frame` and 1 (green
    light) from `switch_frame` onwards; `switch_frame == -1` labels every
    frame 0.
    """
    video_name = os.path.splitext(os.path.basename(video_path))[0]
    video_capture = cv2.VideoCapture(video_path)
    try:
        # NOTE(review): this "mock read" discards the very first frame, so
        # frame 0 on disk is actually the video's second frame -- confirm
        # this offset is intended and matches ideal.txt's frame numbering.
        _, _ = video_capture.read()  # mock read
        count = 0
        success, image = video_capture.read()
        while success:
            if count < switch_frame or switch_frame == -1:
                label = 0  # red traffic light
            else:
                label = 1  # green traffic light
            image_path = pj(images_dir, '{:03}_{}_{}.jpg'.format(count, video_name, label))
            cv2.imwrite(image_path, image)
            success, image = video_capture.read()
            count += 1
    finally:
        # FIX: release the capture handle so the file/device isn't leaked.
        video_capture.release()
In [10]:
# Extract frames from every .avi in RAW_DATA_DIR, one sub-directory per video.
# Skipped entirely if the output directory already exists (manual cache guard).
if not os.path.exists(IMAGES_FROM_VIDEOS_DIR):
    os.mkdir(IMAGES_FROM_VIDEOS_DIR)
    video_paths = list(filter(lambda x: x.endswith('.avi'), list_dir_with_full_paths(RAW_DATA_DIR)))
    # NOTE(review): `video_paths[1:]` silently skips the first video -- this
    # looks like a debugging leftover; confirm the first video is really
    # meant to be excluded.
    for video_path in tqdm(video_paths[1:]):
        video_base_name = os.path.basename(video_path)
        images_dir = pj(IMAGES_FROM_VIDEOS_DIR, os.path.splitext(video_base_name)[0])
        os.mkdir(images_dir)
        # presumably ideal.txt keys include the '.avi' extension, since the
        # full base name is used for the lookup -- verify against ideal.txt
        switch_frame = video_name_to_switch_frame[video_base_name]
        extract_images_from_video(video_path, images_dir, switch_frame)
else:
    print('Directory {} already exists!'.format(IMAGES_FROM_VIDEOS_DIR))
In [11]:
def parse_image_name(image_name):
    """Parse a frame file name of the form '{frame:03d}_{video_name}_{label}.jpg'.

    Returns (frame, video_name, label) as (int, str, int).

    FIX: the label is taken from the LAST underscore-separated field and the
    video name is re-joined from the middle fields, so video names that
    themselves contain underscores are parsed correctly (the old code read
    field [1]/[2] and broke on such names). Behavior is unchanged for names
    without underscores.
    """
    stem = os.path.splitext(image_name)[0]  # drop the file extension
    parts = stem.split('_')
    frame = int(parts[0])
    label = int(parts[-1])
    video_name = '_'.join(parts[1:-1])
    return frame, video_name, label
In [12]:
def create_classification_dir_from_images_dirs(images_dirs, classification_dir):
    """Copy all .jpg images from `images_dirs` into `classification_dir`.

    Images are grouped into '<classification_dir>/0' and '<classification_dir>/1'
    according to the label encoded in each file name (see parse_image_name).
    Does nothing (prints a message) if `classification_dir` already exists.
    """
    images_dirs = filter(os.path.isdir, images_dirs)  # ignore stray files
    images_paths = []
    for images_dir in images_dirs:
        images_paths.extend(glob(pj(images_dir, '*.jpg')))
    if not os.path.exists(classification_dir):
        os.mkdir(classification_dir)
        os.mkdir(pj(classification_dir, '0'))
        os.mkdir(pj(classification_dir, '1'))
        for image_path in tqdm(images_paths):
            # only the label is needed here; frame and video name are unused
            _, _, label = parse_image_name(os.path.basename(image_path))
            shutil.copy(image_path, pj(classification_dir, str(label)))
    else:
        print('Directory {} already exists!'.format(classification_dir))
Copy all extracted images into a single classification directory (grouped by class):
In [13]:
# create_classification_dir_from_images_dirs(list_dir_with_full_paths(IMAGES_FROM_VIDEOS_DIR),
# IMAGES_BY_CLASS_DIR)
Train/validation split:
In [14]:
# Split at the VIDEO level (each entry is one per-video directory), so all
# frames of a given video land in the same subset -- consecutive frames are
# near-duplicates and a frame-level split would leak between train and val.
train_images_dirs, val_images_dirs = train_test_split(
    list_dir_with_full_paths(IMAGES_FROM_VIDEOS_DIR),
    test_size=0.25, random_state=0  # fixed seed for a reproducible split
)
In [15]:
# Materialize the training split as '<dir>/0' and '<dir>/1' class folders.
create_classification_dir_from_images_dirs(train_images_dirs,
                                           TRAIN_IMAGES_BY_CLASS_DIR)
In [17]:
# Materialize the validation split as '<dir>/0' and '<dir>/1' class folders.
create_classification_dir_from_images_dirs(val_images_dirs,
                                           VAL_IMAGES_BY_CLASS_DIR)