In [175]:
import os
import sys
import shutil
import numpy as np
import tensorflow as tf
from tensorflow.python.ops import init_ops
from tensorflow.contrib.layers.python.layers import regularizers
# module_path = os.path.join(os.path.abspath(os.path.dirname(__file__)), "..")
# if module_path not in sys.path:
#     sys.path.append(module_path)
# from datasets.batch_generator import datasets
slim = tf.contrib.slim
tf.reset_default_graph()
trunc_normal = lambda stddev: init_ops.truncated_normal_initializer(0.0, stddev)
# Constants
image_channels = 3
time_frames_to_consider = 4
time_frames_to_predict = 4
interval = 4  # number of frames to jump between samples
heigth_train = 64
width_train = 64
custom_test_size = [160, 210]
heigth_test, width_test = custom_test_size
In [176]:
# Regularizer strength
l2_val = 0.00005
# Adam optimizer learning rate
adam_learning_rate = 0.0004
# Number of images to show on TensorBoard
batch_size = 8
number_of_images_to_show = 4
assert number_of_images_to_show <= batch_size, "images to show must not exceed batch size"
timesteps = 16
file_path = ""
data_folder = os.path.join(file_path, "../../data/")
log_dir_file_path = os.path.join(file_path, "../../logs/")
model_save_file_path = os.path.join(file_path, "../../checkpoint/")
output_video_save_file_path = os.path.join(file_path, "../../output/")
iterations = "iterations/"
best = "best/"
checkpoint_iterations = 100
best_model_iterations = 100
test_model_iterations = 5
best_loss = float("inf")
heigth, width = heigth_train, width_train
channels = 3
assert timesteps >= time_frames_to_consider and timesteps >= time_frames_to_predict, "timesteps must cover both the context and prediction windows"
In [177]:
heigth_train = 64
width_train = 64
heigth_test = 160
width_test = 240
In [178]:
# Placeholders for training and test inputs/outputs
input_train = tf.placeholder(dtype=tf.float32, shape=[None, heigth_train, width_train, time_frames_to_consider * image_channels])
output_train = tf.placeholder(dtype=tf.float32, shape=[None, heigth_train, width_train, image_channels])
input_test = tf.placeholder(dtype=tf.float32, shape=[None, heigth_test, width_test, time_frames_to_consider * image_channels])
output_test = tf.placeholder(dtype=tf.float32, shape=[None, heigth_test, width_test, image_channels])
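The four context frames are stacked along the channel axis, so the training input is a single 12-channel tensor (time_frames_to_consider * image_channels). A minimal packing sketch, assuming the frames are H x W x 3 numpy arrays already scaled to the decoder's tanh range of [-1, 1]; `pack_context_frames` is a hypothetical helper, not part of the pipeline:
In [ ]:
# Hypothetical helper: stack the context frames depth-wise to match input_train.
def pack_context_frames(frames):
    assert len(frames) == time_frames_to_consider
    return np.concatenate(frames, axis=-1)  # (H, W, 12)

dummy = [np.zeros((heigth_train, width_train, image_channels), np.float32)] * time_frames_to_consider
print(pack_context_frames(dummy).shape)  # (64, 64, 12)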
In [179]:
#slim.conv2d?
In [180]:
feature_maps = [32, 64, 128, 256, 512]
kernel_size = [3, 3, 3, 3, 3]
stride_size = [1, 2, 2, 2, 2]
assert len(kernel_size) == len(feature_maps) == len(stride_size), "lengths must be equal"

def conv_layer(conv_input, reuse=None):
    # Encoder: a stack of strided convolutions; every intermediate activation
    # is kept so the decoder can use it as a skip connection.
    layers_for_skip = []
    net = conv_input
    with tf.variable_scope('conv_autoencoder', reuse=reuse):
        for i, (each_feat_map, each_kernel_size, each_stride) in enumerate(zip(feature_maps, kernel_size, stride_size)):
            net = slim.conv2d(net, each_feat_map, [each_kernel_size, each_kernel_size], stride=each_stride,
                              scope='conv_' + str(i), weights_initializer=trunc_normal(0.01),
                              weights_regularizer=regularizers.l2_regularizer(l2_val))
            layers_for_skip.append(net)
    return net, layers_for_skip
In [181]:
encoded_op, layers_to_skip = conv_layer(input_train, reuse=None)
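The encoder halves the spatial resolution at every stage after the first (strides [1, 2, 2, 2, 2]), so the 64x64x12 input yields skip tensors of 64x64x32, 32x32x64, 16x16x128 and 8x8x256, plus the 4x4x512 bottleneck (the last list entry is encoded_op itself). A quick static-shape check:
In [ ]:
for skip in layers_to_skip:
    print(skip.get_shape().as_list())
# [None, 64, 64, 32]
# [None, 32, 32, 64]
# [None, 16, 16, 128]
# [None, 8, 8, 256]
# [None, 4, 4, 512]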
In [182]:
feature_maps = feature_maps[:-1][::-1] + [image_channels]
kernel_size = kernel_size[:-1][::-1] + [3]
stride_size = stride_size[1:][::-1] + [1]
print(feature_maps)
print(kernel_size)
print(stride_size)
print(layers_to_skip[:-1][::-1])
assert len(kernel_size) == len(feature_maps) == len(stride_size), "lengths must be equal"

def deconv_layer(deconv_input, layers_to_skip, reuse=None):
    # Decoder: mirrors the encoder with transposed convolutions, concatenating
    # the matching encoder activation before each upsampling step.
    layers_to_skip = layers_to_skip[:-1][::-1]
    net = deconv_input
    with tf.variable_scope('deconv_autoencoder', reuse=reuse):
        for i, (each_feat_map, each_kernel_size, each_stride) in enumerate(zip(feature_maps, kernel_size, stride_size)):
            activation = tf.nn.relu
            if i == (len(stride_size) - 1):
                # last layer: tanh keeps the output in [-1, 1]
                activation = tf.nn.tanh
            if i > 0:
                # not the first layer: concatenate the skip connection along channels
                net = tf.concat([net, layers_to_skip[i - 1]], axis=3)
                print("concatenated", i - 1, ":", net)
            net = slim.conv2d_transpose(net, each_feat_map, [each_kernel_size, each_kernel_size], stride=each_stride,
                                        activation_fn=activation,
                                        scope='deconv_' + str(i), weights_initializer=trunc_normal(0.01),
                                        weights_regularizer=regularizers.l2_regularizer(l2_val))
            print(net)
    return net
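Tracing the decoder for the 64x64 case: the 4x4x512 bottleneck is first upsampled to 8x8x256; every later stage concatenates the matching encoder activation (doubling the channels) before its transposed convolution, giving 8x8x512 -> 16x16x128, 16x16x256 -> 32x32x64 and 32x32x128 -> 64x64x32; the final stride-1 tanh layer then maps 64x64x64 to the 64x64x3 predicted frame, which the call in the next cell confirms.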
In [183]:
deconv_layer(encoded_op, layers_to_skip, reuse=None)
Out[183]:
In [184]:
print(encoded_op)
In [ ]:
In [205]:
tf.reset_default_graph()
In [206]:
# ==================== COPIED CODE ==============================================
#
# TENSORBOARD VISUALIZATION FOR SHARPNESS AND PEAK SIGNAL TO NOISE RATIO (PSNR)
# ===============================================================================
def log10(t):
    """
    Calculates the base-10 log of each element in t.

    @param t: The tensor from which to calculate the base-10 log.
    @return: A tensor with the base-10 log of each element in t.
    """
    numerator = tf.log(t)
    denominator = tf.log(tf.constant(10, dtype=numerator.dtype))
    return numerator / denominator

def psnr_error(gen_frames, gt_frames):
    """
    Computes the Peak Signal to Noise Ratio error between the generated images and the
    ground-truth images.

    @param gen_frames: A tensor of shape [batch_size, height, width, 3]. The frames generated
                       by the generator model.
    @param gt_frames: A tensor of shape [batch_size, height, width, 3]. The ground-truth frames
                      for each frame in gen_frames.
    @return: A scalar tensor. The mean Peak Signal to Noise Ratio error over each frame in the
             batch.
    """
    shape = tf.shape(gen_frames)
    num_pixels = tf.to_float(shape[1] * shape[2] * shape[3])
    square_diff = tf.square(gt_frames - gen_frames)
    batch_errors = 10 * log10(1 / ((1 / num_pixels) * tf.reduce_sum(square_diff, [1, 2, 3])))
    return tf.reduce_mean(batch_errors)
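# Worked example (a sketch, assuming pixel values in [0, 1]): if every pixel of the
# generated frame is off by 0.1, the mean squared error is 0.01 and the PSNR is
# 10 * log10(1 / 0.01) = 20 dB, e.g.:
#   gt = tf.ones([1, 4, 4, 3]); gen = gt - 0.1
#   psnr_error(gen, gt)  # ~20.0 when evaluated in a session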
def sharp_diff_error(gen_frames, gt_frames):
    """
    Computes the Sharpness Difference error between the generated images and the ground-truth
    images.

    @param gen_frames: A tensor of shape [batch_size, height, width, 3]. The frames generated
                       by the generator model.
    @param gt_frames: A tensor of shape [batch_size, height, width, 3]. The ground-truth frames
                      for each frame in gen_frames.
    @return: A scalar tensor. The Sharpness Difference error over each frame in the batch.
    """
    shape = tf.shape(gen_frames)
    num_pixels = tf.to_float(shape[1] * shape[2] * shape[3])

    # Gradient difference:
    # create filters [-1, 1] and [[1], [-1]] for diffing to the left and down respectively.
    # TODO: Could this be simplified with one filter [[-1, 2], [0, -1]]?
    pos = tf.constant(np.identity(3), dtype=tf.float32)
    neg = -1 * pos
    filter_x = tf.expand_dims(tf.stack([neg, pos]), 0)  # [-1, 1]
    filter_y = tf.stack([tf.expand_dims(pos, 0), tf.expand_dims(neg, 0)])  # [[1], [-1]]
    strides = [1, 1, 1, 1]  # stride of (1, 1)
    padding = 'SAME'

    gen_dx = tf.abs(tf.nn.conv2d(gen_frames, filter_x, strides, padding=padding))
    gen_dy = tf.abs(tf.nn.conv2d(gen_frames, filter_y, strides, padding=padding))
    gt_dx = tf.abs(tf.nn.conv2d(gt_frames, filter_x, strides, padding=padding))
    gt_dy = tf.abs(tf.nn.conv2d(gt_frames, filter_y, strides, padding=padding))

    gen_grad_sum = gen_dx + gen_dy
    gt_grad_sum = gt_dx + gt_dy

    grad_diff = tf.abs(gt_grad_sum - gen_grad_sum)

    batch_errors = 10 * log10(1 / ((1 / num_pixels) * tf.reduce_sum(grad_diff, [1, 2, 3])))
    return tf.reduce_mean(batch_errors)
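# Note on the filters above: `pos` is a 3x3 channel-wise identity, so filter_x has
# shape [1, 2, 3, 3] ([filter_height, filter_width, in_channels, out_channels]) and
# computes a horizontal difference independently for each RGB channel; filter_y is
# its [2, 1, 3, 3] vertical counterpart.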
## =================== COPIED CODE ENDS ======================
def l2_loss(generated_frames, expected_frames):
    # Plain L2 loss between prediction and target, averaged over all scales.
    losses = []
    for each_scale_gen_frames, each_scale_exp_frames in zip(generated_frames, expected_frames):
        losses.append(tf.nn.l2_loss(tf.subtract(each_scale_gen_frames, each_scale_exp_frames)))
    loss = tf.reduce_mean(tf.stack(losses))
    return loss
def gdl_loss(generated_frames, expected_frames, alpha=2):
    """
    Gradient Difference Loss: penalizes mismatches between the image gradients
    (difference with the side pixel and the pixel below) of the generated and
    expected frames.
    """
    scale_losses = []
    for i in range(len(generated_frames)):
        # create filters [-1, 1] and [[1], [-1]] for diffing to the left and down respectively.
        pos = tf.constant(np.identity(3), dtype=tf.float32)
        neg = -1 * pos
        filter_x = tf.expand_dims(tf.stack([neg, pos]), 0)  # [-1, 1]
        filter_y = tf.stack([tf.expand_dims(pos, 0), tf.expand_dims(neg, 0)])  # [[1], [-1]]
        strides = [1, 1, 1, 1]  # stride of (1, 1)
        padding = 'SAME'

        gen_dx = tf.abs(tf.nn.conv2d(generated_frames[i], filter_x, strides, padding=padding))
        gen_dy = tf.abs(tf.nn.conv2d(generated_frames[i], filter_y, strides, padding=padding))
        gt_dx = tf.abs(tf.nn.conv2d(expected_frames[i], filter_x, strides, padding=padding))
        gt_dy = tf.abs(tf.nn.conv2d(expected_frames[i], filter_y, strides, padding=padding))

        grad_diff_x = tf.abs(gt_dx - gen_dx)
        grad_diff_y = tf.abs(gt_dy - gen_dy)

        scale_losses.append(tf.reduce_sum(grad_diff_x ** alpha + grad_diff_y ** alpha))

    # condense into one tensor and average
    return tf.reduce_mean(tf.stack(scale_losses))
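# Intuition (a sketch): with alpha=2 the GDL is the squared difference between the
# gradient magnitudes of the two frames. A generated frame that merely offsets the
# ground truth by a constant intensity has identical gradients, so its GDL is zero
# even though its L2 loss is not; combining both terms below penalizes blur and
# intensity error together.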
def total_loss(generated_frames, expected_frames, lambda_gdl=1.0, lambda_l2=1.0):
    # Weighted combination of the GDL and L2 losses.
    total_loss_cal = (lambda_gdl * gdl_loss(generated_frames, expected_frames) +
                      lambda_l2 * l2_loss(generated_frames, expected_frames))
    return total_loss_cal
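Both loss helpers take lists of frame tensors so that multi-scale generators can be scored per scale; this single-scale model simply wraps its tensors in one-element lists (as SkipAutoEncoder.loss does below). A usage sketch with dummy tensors:
In [ ]:
# Sketch: score a dummy prediction against a dummy target at a single scale.
gen = tf.zeros([2, 64, 64, 3])
gt = tf.fill([2, 64, 64, 3], 0.5)
loss_op = total_loss([gen], [gt], lambda_gdl=1.0, lambda_l2=1.0)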
In [207]:
class SkipAutoEncoder:
    def __init__(self, heigth_train, width_train, heigth_test, width_test):
        self.heigth_train = heigth_train
        self.width_train = width_train
        self.heigth_test = heigth_test
        self.width_test = width_test
        self.feature_maps = [32, 64, 128, 256, 512]
        self.kernel_size = [3, 3, 3, 3, 3]
        self.stride_size = [1, 2, 2, 2, 2]
        assert len(self.kernel_size) == len(self.feature_maps) == len(self.stride_size), "lengths must be equal"
        # Placeholders for training and test inputs/outputs
        self.input_train = tf.placeholder(dtype=tf.float32, shape=[None, self.heigth_train, self.width_train, time_frames_to_consider * image_channels])
        self.output_train = tf.placeholder(dtype=tf.float32, shape=[None, self.heigth_train, self.width_train, image_channels])
        self.input_test = tf.placeholder(dtype=tf.float32, shape=[None, self.heigth_test, self.width_test, time_frames_to_consider * image_channels])
        self.output_test = tf.placeholder(dtype=tf.float32, shape=[None, self.heigth_test, self.width_test, image_channels])
        self.model_output = self.create_graph(self.input_train, self.output_train, reuse=None)
        # reuse the graph (shared weights) at test time
        self.model_output_test = self.create_graph(self.input_test, self.output_test, reuse=True)
        self.loss()
        self.tf_summary()

    def conv_layer(self, conv_input, reuse):
        # Encoder: strided convolutions; intermediate activations are kept for skip connections.
        layers_for_skip = []
        net = conv_input
        with tf.variable_scope('conv_autoencoder', reuse=reuse):
            for i, (each_feat_map, each_kernel_size, each_stride) in enumerate(zip(self.feature_maps, self.kernel_size, self.stride_size)):
                net = slim.conv2d(net, each_feat_map, [each_kernel_size, each_kernel_size], stride=each_stride,
                                  scope='conv_' + str(i), weights_initializer=trunc_normal(0.01),
                                  weights_regularizer=regularizers.l2_regularizer(l2_val))
                layers_for_skip.append(net)
        return net, layers_for_skip

    def deconv_layer(self, deconv_input, layers_to_skip, reuse):
        # Decoder: mirrors the encoder with transposed convolutions and skip connections.
        feature_maps = self.feature_maps[:-1][::-1] + [image_channels]
        kernel_size = self.kernel_size[:-1][::-1] + [3]
        stride_size = self.stride_size[1:][::-1] + [1]
        assert len(kernel_size) == len(feature_maps) == len(stride_size), "lengths must be equal"
        layers_to_skip_d = layers_to_skip[:-1][::-1]
        net = deconv_input
        with tf.variable_scope('deconv_autoencoder', reuse=reuse):
            for i, (each_feat_map, each_kernel_size, each_stride) in enumerate(zip(feature_maps, kernel_size, stride_size)):
                activation = tf.nn.relu
                if i == (len(stride_size) - 1):
                    # last layer: tanh keeps the output in [-1, 1]
                    activation = tf.nn.tanh
                if i > 0:
                    # not the first layer: concatenate the skip connection along channels
                    net = tf.concat([net, layers_to_skip_d[i - 1]], axis=3)
                net = slim.conv2d_transpose(net, each_feat_map, [each_kernel_size, each_kernel_size], stride=each_stride,
                                            activation_fn=activation,
                                            scope='deconv_' + str(i), weights_initializer=trunc_normal(0.01),
                                            weights_regularizer=regularizers.l2_regularizer(l2_val))
                # print(net)
        return net

    def create_graph(self, input_data, ground_truths, reuse):
        encoded_op, layers_to_skip = self.conv_layer(input_data, reuse=reuse)
        # print(encoded_op, layers_to_skip)
        return self.deconv_layer(encoded_op, layers_to_skip, reuse=reuse)

    def loss(self):
        # combined GDL and L2 loss
        self.combined_loss = total_loss([self.model_output], [self.output_train])
        self.optimizer = tf.train.AdamOptimizer(adam_learning_rate)
        global_step = tf.Variable(0, name="global_step_var", trainable=False)
        self.step = self.optimizer.minimize(self.combined_loss, global_step=global_step)

    def tf_summary(self):
        # Note: the validation summaries reuse the training-graph tensors; at run time
        # they are evaluated with validation data fed into the train placeholders.
        train_loss = tf.summary.scalar("gen_train_loss", self.combined_loss)
        val_loss = tf.summary.scalar("gen_val_loss", self.combined_loss)
        with tf.variable_scope('image_measures'):
            psnr_error_train = psnr_error(self.model_output, self.output_train)
            psnr_error_train_s = tf.summary.scalar("train_psnr", psnr_error_train)
            psnr_error_val_s = tf.summary.scalar("val_psnr", psnr_error_train)
            sharpdiff_error_train = sharp_diff_error(self.model_output, self.output_train)
            sharpdiff_error_train_s = tf.summary.scalar("train_sharpdiff", sharpdiff_error_train)
            sharpdiff_error_val_s = tf.summary.scalar("val_sharpdiff", sharpdiff_error_train)
            images_to_show_train = []
            images_to_show_val = []
            images_to_show_train.append(tf.summary.image('train_output', self.model_output,
                                                         number_of_images_to_show))
            images_to_show_train.append(tf.summary.image('train_ground_truth', self.output_train,
                                                         number_of_images_to_show))
            images_to_show_val.append(tf.summary.image('val_output', self.model_output,
                                                       number_of_images_to_show))
            images_to_show_val.append(tf.summary.image('val_ground_truth', self.output_train,
                                                       number_of_images_to_show))
            psnr_error_test = psnr_error(self.model_output_test, self.output_test)
            psnr_error_test_s = tf.summary.scalar("test_psnr", psnr_error_test)
            sharpdiff_error_test = sharp_diff_error(self.model_output_test, self.output_test)
            sharpdiff_error_test_s = tf.summary.scalar("test_sharpdiff", sharpdiff_error_test)
            images_to_show_test = []
            images_to_show_test.append(tf.summary.image('test_output', self.model_output_test,
                                                        number_of_images_to_show))
            images_to_show_test.append(tf.summary.image('test_ground', self.output_test,
                                                        number_of_images_to_show))
        self.train_summary_merged = tf.summary.merge([train_loss, psnr_error_train_s, sharpdiff_error_train_s] + images_to_show_train)
        self.test_summary_merged = tf.summary.merge([psnr_error_test_s, sharpdiff_error_test_s] + images_to_show_test)
        self.val_summary_merged = tf.summary.merge([val_loss, psnr_error_val_s, sharpdiff_error_val_s] + images_to_show_val)
In [208]:
model = SkipAutoEncoder(heigth_train, width_train, heigth_test, width_test)
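With the graph built, training is a standard TF 1.x loop: run model.step with context frames fed to input_train and the target frame fed to output_train, writing model.train_summary_merged for TensorBoard. A minimal sketch; next_batch is a hypothetical stand-in for the real data pipeline (e.g. datasets.batch_generator), not part of this notebook:
In [ ]:
def next_batch():
    # Hypothetical data source: random tensors with the right shapes and range.
    x = np.random.uniform(-1, 1, (batch_size, heigth_train, width_train,
                                  time_frames_to_consider * image_channels)).astype(np.float32)
    y = np.random.uniform(-1, 1, (batch_size, heigth_train, width_train,
                                  image_channels)).astype(np.float32)
    return x, y

with tf.Session() as sess:
    sess.run(tf.global_variables_initializer())
    writer = tf.summary.FileWriter(log_dir_file_path, sess.graph)
    for it in range(checkpoint_iterations):
        x_batch, y_batch = next_batch()
        _, loss_val, summ = sess.run(
            [model.step, model.combined_loss, model.train_summary_merged],
            feed_dict={model.input_train: x_batch, model.output_train: y_batch})
        writer.add_summary(summ, it)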
In [ ]: