In [98]:
import tensorflow as tf
from tensorflow.python.ops import init_ops
from tensorflow.contrib.layers.python.layers import regularizers
import numpy as np
slim = tf.contrib.slim
tf.reset_default_graph()
trunc_normal = lambda stddev: init_ops.truncated_normal_initializer(0.0, stddev)
In [99]:
# Constants
image_channels = 3
time_frames_to_consider = 4
heigth_train = 32
width_train = 32
heigth_test = 210
width_test = 160
# From scale 1 onward, the first conv layer also receives the upsampled
# prediction from the previous (coarser) scale as extra input channels.
scale_level_feature_maps = [[128, 256, 128, 3],
                            [128, 256, 128, 3],
                            [128, 256, 512, 256, 128, 3],
                            [128, 256, 512, 256, 128, 3]]
# As the image size grows at finer scales, the number of conv layers increases.
scale_level_kernel_size = [[3, 3, 3, 3],
                           [5, 3, 3, 5],
                           [5, 3, 3, 3, 3, 5],
                           [7, 5, 5, 5, 5, 7]]
# L2 regularization strength
l2_val = 0.00005
# Adam learning rate
adam_learning_rate = 0.0004
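# A small sanity check (not in the original notebook): each scale's kernel and
# feature-map lists must align, and every scale must end by emitting
# image_channels maps so its output is a valid frame.
for fm, ks in zip(scale_level_feature_maps, scale_level_kernel_size):
    assert len(fm) == len(ks), "one kernel size per conv layer"
    assert fm[-1] == image_channels, "last conv layer must output a frame"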
## =================== COPIED CODE ==========================
#
# SHARPNESS AND PEAK SIGNAL TO NOISE RATIO (PSNR) METRICS FOR TENSORBOARD VISUALIZATION
#
def log10(t):
    """Helper: elementwise log base 10 (used by the metrics below)."""
    numerator = tf.log(t)
    denominator = tf.log(tf.constant(10, dtype=numerator.dtype))
    return numerator / denominator
def psnr_error(gen_frames, gt_frames):
"""
Computes the Peak Signal to Noise Ratio error between the generated images and the ground
truth images.
@param gen_frames: A tensor of shape [batch_size, height, width, 3]. The frames generated by the
generator model.
@param gt_frames: A tensor of shape [batch_size, height, width, 3]. The ground-truth frames for
each frame in gen_frames.
@return: A scalar tensor. The mean Peak Signal to Noise Ratio error over each frame in the
batch.
"""
shape = tf.shape(gen_frames)
num_pixels = tf.to_float(shape[1] * shape[2] * shape[3])
square_diff = tf.square(gt_frames - gen_frames)
batch_errors = 10 * log10(1 / ((1 / num_pixels) * tf.reduce_sum(square_diff, [1, 2, 3])))
return tf.reduce_mean(batch_errors)
def sharp_diff_error(gen_frames, gt_frames):
"""
Computes the Sharpness Difference error between the generated images and the ground truth
images.
@param gen_frames: A tensor of shape [batch_size, height, width, 3]. The frames generated by the
generator model.
@param gt_frames: A tensor of shape [batch_size, height, width, 3]. The ground-truth frames for
each frame in gen_frames.
@return: A scalar tensor. The Sharpness Difference error over each frame in the batch.
"""
shape = tf.shape(gen_frames)
num_pixels = tf.to_float(shape[1] * shape[2] * shape[3])
# gradient difference
# create filters [-1, 1] and [[1],[-1]] for diffing to the left and down respectively.
# TODO: Could this be simplified with one filter [[-1, 2], [0, -1]]?
pos = tf.constant(np.identity(3), dtype=tf.float32)
neg = -1 * pos
    filter_x = tf.expand_dims(tf.stack([neg, pos]), 0)  # [-1, 1]
    filter_y = tf.stack([tf.expand_dims(pos, 0), tf.expand_dims(neg, 0)])  # [[1],[-1]]
strides = [1, 1, 1, 1] # stride of (1, 1)
padding = 'SAME'
gen_dx = tf.abs(tf.nn.conv2d(gen_frames, filter_x, strides, padding=padding))
gen_dy = tf.abs(tf.nn.conv2d(gen_frames, filter_y, strides, padding=padding))
gt_dx = tf.abs(tf.nn.conv2d(gt_frames, filter_x, strides, padding=padding))
gt_dy = tf.abs(tf.nn.conv2d(gt_frames, filter_y, strides, padding=padding))
gen_grad_sum = gen_dx + gen_dy
gt_grad_sum = gt_dx + gt_dy
grad_diff = tf.abs(gt_grad_sum - gen_grad_sum)
batch_errors = 10 * log10(1 / ((1 / num_pixels) * tf.reduce_sum(grad_diff, [1, 2, 3])))
return tf.reduce_mean(batch_errors)
## =================== COPIED CODE ENDS ======================
def l2_loss(generated_frames, expected_frames):
losses = []
for each_scale_gen_frames, each_scale_exp_frames in zip(generated_frames, expected_frames):
losses.append(tf.nn.l2_loss(tf.subtract(each_scale_gen_frames, each_scale_exp_frames)))
loss = tf.reduce_mean(tf.stack(losses))
return loss
def gdl_loss(generated_frames, expected_frames, alpha=2):
    """
    Gradient Difference Loss (GDL): penalizes the difference between the image
    gradients (pixel differences to the left and below) of the generated and
    ground-truth frames at every scale.
    """
scale_losses = []
for i in xrange(len(generated_frames)):
# create filters [-1, 1] and [[1],[-1]] for diffing to the left and down respectively.
pos = tf.constant(np.identity(3), dtype=tf.float32)
neg = -1 * pos
filter_x = tf.expand_dims(tf.stack([neg, pos]), 0) # [-1, 1]
filter_y = tf.stack([tf.expand_dims(pos, 0), tf.expand_dims(neg, 0)]) # [[1],[-1]]
strides = [1, 1, 1, 1] # stride of (1, 1)
padding = 'SAME'
gen_dx = tf.abs(tf.nn.conv2d(generated_frames[i], filter_x, strides, padding=padding))
gen_dy = tf.abs(tf.nn.conv2d(generated_frames[i], filter_y, strides, padding=padding))
gt_dx = tf.abs(tf.nn.conv2d(expected_frames[i], filter_x, strides, padding=padding))
gt_dy = tf.abs(tf.nn.conv2d(expected_frames[i], filter_y, strides, padding=padding))
grad_diff_x = tf.abs(gt_dx - gen_dx)
grad_diff_y = tf.abs(gt_dy - gen_dy)
scale_losses.append(tf.reduce_sum((grad_diff_x ** alpha + grad_diff_y ** alpha)))
# condense into one tensor and avg
return tf.reduce_mean(tf.stack(scale_losses))
def total_loss(generated_frames, expected_frames, loss_from_disc, lambda_gdl=1.0, lambda_l2=1.0, lambda_disc=1.0):
total_loss_cal = (lambda_gdl * gdl_loss(generated_frames, expected_frames) +
lambda_l2 * l2_loss(generated_frames, expected_frames)+
lambda_disc * loss_from_disc)
return total_loss_cal
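In [ ]:
# Quick sanity check of the metrics and losses above (a sketch, not part of
# the original notebook): compare a random batch of frames against a noisy
# copy of itself. Shapes and noise level are illustrative only.
_gt = tf.constant(np.random.rand(2, heigth_train, width_train, image_channels), dtype=tf.float32)
_gen = tf.clip_by_value(_gt + tf.random_normal(tf.shape(_gt), stddev=0.05), 0.0, 1.0)
with tf.Session() as _sess:
    print _sess.run(psnr_error(_gen, _gt))        # higher = closer to ground truth
    print _sess.run(sharp_diff_error(_gen, _gt))  # higher = gradients match better
    print _sess.run(gdl_loss([_gen], [_gt]))      # lower = gradients match better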
In [100]:
class GenerativeNetwork:
def __init__(self, heigth_train, width_train, heigth_test, width_test, scale_level_feature_maps, scale_level_kernel_size):
self.heigth_train = heigth_train
self.width_train = width_train
self.heigth_test = heigth_test
self.width_test = width_test
self.scale_level_feature_maps = scale_level_feature_maps
self.scale_level_kernel_size = scale_level_kernel_size
self.len_scale = len(self.scale_level_kernel_size)
assert len(self.scale_level_feature_maps) == len(self.scale_level_kernel_size), "Length should be equal !"
# Placeholders for inputs and outputs ... !
self.input_train = tf.placeholder(dtype=tf.float32, shape=[None, self.heigth_train, self.width_train, time_frames_to_consider * image_channels])
self.output_train = tf.placeholder(dtype=tf.float32, shape=[None, self.heigth_train, self.width_train, image_channels])
self.input_test = tf.placeholder(dtype=tf.float32, shape=[None, self.heigth_test, self.width_test, time_frames_to_consider * image_channels])
self.output_test = tf.placeholder(dtype=tf.float32, shape=[None, self.heigth_test, self.width_test, image_channels])
self.loss_from_disc = tf.placeholder(dtype=tf.float32, shape=[])
self.each_scale_predication_train = []
self.each_scale_ground_truth_train = []
self.each_scale_predication_test = []
self.each_scale_ground_truth_test = []
self.create_graph(self.input_train, self.output_train, heigth_train, width_train,
self.each_scale_predication_train,
self.each_scale_ground_truth_train,
reuse=None)
# reuse graph at time of test !
        self.create_graph(self.input_test, self.output_test, heigth_test, width_test,
self.each_scale_predication_test,
self.each_scale_ground_truth_test,
reuse=True)
self.loss()
# print self.each_scale_predication_train
# print self.each_scale_ground_truth_train
# print self.each_scale_predication_test
# print self.each_scale_ground_truth_test
def rescale_image(self, scaling_factor, heigth, width, input_data, ground_truths, last_generated_frame):
"""
scaling_factor, heigth, width = values
input_data, ground_truths = Tensors
"""
rescaled_heigth = int(scaling_factor * heigth)
rescaled_width = int(scaling_factor * width)
        assert rescaled_heigth != 0 and rescaled_width != 0, "rescaled dimensions must be non-zero !"
input_reshaped = tf.image.resize_images(input_data, [rescaled_heigth, rescaled_width])
ground_truths_reshaped = tf.image.resize_images(ground_truths, [rescaled_heigth, rescaled_width])
last_generated_frame_reshaped = None
        if last_generated_frame is not None:
last_generated_frame_reshaped = tf.image.resize_images(last_generated_frame, [rescaled_heigth, rescaled_width])
return (input_reshaped, ground_truths_reshaped, last_generated_frame_reshaped)
def create_graph(self, input_data, ground_truths, heigth, width,
predicated_at_each_scale_tensor, ground_truth_at_each_scale_tensor, reuse):
# for each scale ...
for each_scale in range(self.len_scale):
conv_counter = 0
with tf.variable_scope('scale_'+str(each_scale),reuse=reuse):
                # scaling factors [1/8, 1/4, 1/2, 1], coarsest to finest
scaling_factor = 1.0 / (2**(self.len_scale - 1 - each_scale))
last_generated_frame = None
if each_scale > 0:
last_generated_frame = predicated_at_each_scale_tensor[each_scale-1]
input_reshaped, ground_truths_reshaped, last_generated_frame_reshaped = self.rescale_image(scaling_factor, heigth, width, input_data, ground_truths, last_generated_frame)
# append last scale output
if each_scale > 0:
input_reshaped = tf.concat([input_reshaped, last_generated_frame_reshaped],axis=3)
# print (input_reshaped, ground_truths_reshaped)
predication = input_reshaped
# for each conv layers in that scale ...
                feature_maps = self.scale_level_feature_maps[each_scale]
                kernel_size = self.scale_level_kernel_size[each_scale]
assert len(feature_maps)==len(kernel_size), "Length should be equal !"
for index, (each_filter, each_kernel) in enumerate(zip(feature_maps, kernel_size)):
with tf.variable_scope('conv_'+str(conv_counter),reuse=reuse):
conv_counter += 1
                        activation = tf.nn.relu
                        # last layer uses tanh
                        if index == (len(kernel_size) - 1):
                            activation = tf.nn.tanh
                        predication = slim.conv2d(predication, each_filter, [each_kernel, each_kernel],
                                                  weights_initializer=trunc_normal(0.01),
                                                  weights_regularizer=regularizers.l2_regularizer(l2_val),
                                                  activation_fn=activation)
                # store this scale's prediction and its rescaled ground truth
predicated_at_each_scale_tensor.append(predication)
ground_truth_at_each_scale_tensor.append(ground_truths_reshaped)
def loss(self):
# discriminator, gdl and l2 loss !
self.combined_loss = total_loss(self.each_scale_predication_train, self.each_scale_ground_truth_train, self.loss_from_disc)
self.optimizer = tf.train.AdamOptimizer(adam_learning_rate)
global_step = tf.Variable(0,name="global_step_var",trainable=False)
self.step = self.optimizer.minimize(self.combined_loss, global_step=global_step)
In [101]:
g = GenerativeNetwork(heigth_train, width_train, heigth_test, width_test, scale_level_feature_maps, scale_level_kernel_size)
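In [ ]:
# Sketch of a single generator update (illustrative only, not from the
# original notebook): random frames stand in for real data and a constant
# stands in for the discriminator loss that would normally be fed here.
batch_size = 8
dummy_input = np.random.rand(batch_size, heigth_train, width_train,
                             time_frames_to_consider * image_channels)
dummy_target = np.random.rand(batch_size, heigth_train, width_train, image_channels)
with tf.Session() as sess:
    sess.run(tf.global_variables_initializer())
    _, loss_val = sess.run([g.step, g.combined_loss],
                           feed_dict={g.input_train: dummy_input,
                                      g.output_train: dummy_target,
                                      g.loss_from_disc: 0.5})
    print loss_val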
In [102]:
## Discriminator Model
In [103]:
disc_scale_level_feature_maps = [[64],
                                 [64, 128, 128],
                                 [128, 256, 256],
                                 [128, 256, 512, 128]]
# kernel sizes for each convolution of each scale network in the discriminator model
disc_scale_level_kernel_size = [[3],
                                [3, 3, 3],
                                [5, 5, 5],
                                [7, 7, 5, 5]]
# layer sizes for each fully-connected layer of each scale network in the discriminator model
# layer connecting conv to fully-connected is dynamically generated when creating the model
disc_fc_layer_units = [[512, 256, 1],
                       [1024, 512, 1],
                       [1024, 512, 1],
                       [1024, 512, 1]]
In [104]:
class ScaleBasedDiscriminator:
def __init__(self, heigth, width, kernel_size, feature_maps, fc_layer_units, scale_number):
assert len(feature_maps)==len(kernel_size), "Length should be equal !"
self.heigth = heigth
self.width = width
self.kernel_size = kernel_size
self.feature_maps = feature_maps
self.fc_layer_units = fc_layer_units
self.scale_number = scale_number
self.input = tf.placeholder(dtype=tf.float32, shape=[None, self.heigth, self.width, image_channels])
self.create_graph()
def create_graph(self):
predication = self.input
with tf.variable_scope('discriminator_scale_'+str(self.scale_number)):
conv_counter = 0
for index, (each_filter, each_kernel) in enumerate(zip(self.feature_maps, self.kernel_size)):
with tf.variable_scope('conv_'+str(conv_counter)):
conv_counter += 1
stride = 1
                    # last conv layer uses stride 2, shrinking the feature map to reduce fully connected weights
if index == (len(self.feature_maps)-1):
stride = 2
predication = slim.conv2d(predication, each_filter, [each_kernel, each_kernel],
padding = 'VALID',
stride = stride,
weights_initializer=trunc_normal(0.01),
weights_regularizer=regularizers.l2_regularizer(l2_val))
# print predication
predication = slim.flatten(predication)
# print predication
fully_connected_counter = 0
for index, each_layer_units in enumerate(self.fc_layer_units):
with tf.variable_scope('fully_connected'+str(fully_connected_counter)):
fully_connected_counter += 1
activation = tf.nn.relu
                    # last layer uses sigmoid to output a real/fake probability
if index == (len(self.fc_layer_units)-1):
activation = tf.nn.sigmoid
predication = slim.fully_connected(predication, each_layer_units, activation_fn=activation)
# print predication
            # clip predictions to [0.1, 0.9] to keep the cross-entropy loss numerically stable
            self.predication = tf.clip_by_value(predication, 0.1, 0.9)
In [105]:
#sc = ScaleBasedDiscriminator(32,32,disc_scale_level_kernel_size[0],disc_scale_level_feature_maps[0],disc_fc_layer_units[0],0)
In [106]:
class Discriminator:
def __init__(self, heigth, width, disc_scale_level_feature_maps, disc_scale_level_kernel_size, disc_fc_layer_units):
assert len(disc_scale_level_feature_maps)==len(disc_scale_level_kernel_size), "Length should be equal !"
assert len(disc_scale_level_feature_maps)==len(disc_fc_layer_units), "Length should be equal !"
self.heigth = heigth
self.width = width
self.disc_scale_level_feature_maps = disc_scale_level_feature_maps
self.disc_scale_level_kernel_size = disc_scale_level_kernel_size
self.disc_fc_layer_units = disc_fc_layer_units
# ground truth image
self.ground_truth_images = tf.placeholder(dtype=tf.float32, shape=[None, self.heigth, self.width, image_channels])
# real or fake
self.ground_truth_labels = tf.placeholder(dtype=tf.float32, shape=[None,1])
self.len_scale = len(self.disc_scale_level_kernel_size)
self.create_graph()
self.loss()
self.scale_images_ground_truth_for_inputs()
    def create_graph(self):
self.scale_based_discriminators = []
for each_scale, (each_feature_map, each_kernel_size, each_fc_layer) in enumerate(zip(self.disc_scale_level_feature_maps, self.disc_scale_level_kernel_size, self.disc_fc_layer_units)):
            # scaling factors [1/8, 1/4, 1/2, 1], coarsest to finest
scaling_factor = 1.0 / (2**(self.len_scale - 1 - each_scale))
rescaled_heigth = int(scaling_factor * self.heigth)
rescaled_width = int(scaling_factor * self.width)
disc_at_scale = ScaleBasedDiscriminator(heigth=rescaled_heigth,
width=rescaled_width, kernel_size=each_kernel_size,
feature_maps=each_feature_map,
fc_layer_units=each_fc_layer, scale_number=each_scale)
self.scale_based_discriminators.append(disc_at_scale)
self.scaled_disc_predication = []
for each_scaled_pred in self.scale_based_discriminators:
self.scaled_disc_predication.append(each_scaled_pred.predication)
# print self.scaled_disc_predication
def loss(self):
total_loss = []
        for each_scaled_op in self.scaled_disc_predication:
            # print each_scaled_op, self.ground_truth_labels
            # predications are already sigmoid-activated and clipped, so score
            # them with binary cross entropy on probabilities, not on logits
            curr_loss = tf.losses.log_loss(labels=self.ground_truth_labels, predictions=each_scaled_op)
            total_loss.append(curr_loss)
self.dis_loss = tf.reduce_mean(tf.stack(total_loss))
self.optimizer = tf.train.AdamOptimizer(adam_learning_rate)
global_step = tf.Variable(0,name="dis_global_step_var",trainable=False)
self.step = self.optimizer.minimize(self.dis_loss, global_step=global_step)
def rescale_image(self, scaling_factor, heigth, width, ground_truths):
"""
scaling_factor, heigth, width = values
input_data, ground_truths = Tensors
"""
rescaled_heigth = int(scaling_factor * heigth)
rescaled_width = int(scaling_factor * width)
        assert rescaled_heigth != 0 and rescaled_width != 0, "rescaled dimensions must be non-zero !"
ground_truths_reshaped = tf.image.resize_images(ground_truths, [rescaled_heigth, rescaled_width])
return ground_truths_reshaped
    def scale_images_ground_truth_for_inputs(self):
inputs = []
for each_scale in range(self.len_scale):
scaling_factor = 1.0 / (2**(self.len_scale - 1 - each_scale))
inputs.append(self.rescale_image(scaling_factor, self.heigth, self.width, self.ground_truth_images))
self.rescaled_ground_truth_images = inputs
# print inputs
In [107]:
d = Discriminator(64, 64, disc_scale_level_feature_maps, disc_scale_level_kernel_size, disc_fc_layer_units)
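In [ ]:
# Sketch of a single discriminator update (illustrative only, not from the
# original notebook). Each scale-based discriminator owns its input
# placeholder, so the feed_dict supplies one rescaled batch per scale; random
# frames with random real/fake labels stand in for actual data here.
batch_size = 8
dummy_frames = np.random.rand(batch_size, 64, 64, image_channels)
dummy_labels = np.random.randint(0, 2, size=(batch_size, 1))
with tf.Session() as sess:
    sess.run(tf.global_variables_initializer())
    # resize the full-resolution batch once per scale
    rescaled = sess.run(d.rescaled_ground_truth_images,
                        feed_dict={d.ground_truth_images: dummy_frames})
    feed = {d.ground_truth_labels: dummy_labels}
    for disc, frames in zip(d.scale_based_discriminators, rescaled):
        feed[disc.input] = frames
    _, loss_val = sess.run([d.step, d.dis_loss], feed_dict=feed)
    print loss_val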
In [36]:
# scratch check that tf.losses.sigmoid_cross_entropy matches
# tf.nn.sigmoid_cross_entropy_with_logits when given matching shapes
x = tf.reshape(tf.constant([1.0, 0, 0, 1., 1., 0], dtype=tf.float32), [-1, 1])
y = tf.reshape(tf.constant([0.5, .9, .6, .1, .8, .389], dtype=tf.float32), [-1, 1])
sess = tf.Session()
print sess.run(tf.losses.sigmoid_cross_entropy(multi_class_labels=x, logits=y))
print sess.run(tf.reduce_mean(tf.nn.sigmoid_cross_entropy_with_logits(labels=x, logits=y)))