In [1]:
import os
import tensorflow as tf
from tensorflow.examples.tutorials.mnist import input_data
from tensorflow.contrib.layers import flatten
In [7]:
"""."""
import tensorflow as tf
class BaseOps(object):
"""."""
def __init__(self):
"""."""
pass
def __weight_variable(self, shape, initializer=tf.truncated_normal_initializer(stddev=0.1)):
"""."""
return tf.get_variable("weights", shape=shape, dtype=tf.float32, initializer=initializer)
def __bias_variable(self, shape, initializer=tf.constant_initializer(value=0.0)):
"""."""
return tf.get_variable("biases", shape=shape, dtype=tf.float32, initializer=initializer)
def __leaky_relu(self, inputs, alpha=0.2):
"""."""
return tf.maximum(alpha * inputs, inputs, "leaky_relu")
def __batch_norm(self, x):
return tf.contrib.layers.batch_norm(x, decay=0.9, scale=True, updates_collections=None)
    def fc(self, inputs, output_dim, activation, keep_prob=None):
        """Fully connected layer; activation is "linear", "leaky_relu" or a callable."""
        with tf.variable_scope("fc"):
            input_dim = int(inputs.shape[-1])
            w_fc = self.__weight_variable(shape=[input_dim, output_dim])
            b_fc = self.__bias_variable(shape=[output_dim])
            h_fc_logit = tf.matmul(inputs, w_fc) + b_fc
            if activation == "linear":
                h_fc = h_fc_logit
            elif activation == "leaky_relu":
                h_fc = self.__leaky_relu(h_fc_logit, alpha=0.2)
            else:
                h_fc = activation(h_fc_logit)
            if keep_prob is not None:
                h_fc = tf.nn.dropout(h_fc, keep_prob=keep_prob)
            return h_fc
    def __max_unpool(self, pool, ind, ksize=[1, 2, 2, 1], scope='unpool'):
        """Unpooling layer after tf.nn.max_pool_with_argmax.

        Args:
            pool: max pooled output tensor
            ind: argmax indices returned by tf.nn.max_pool_with_argmax
            ksize: same ksize as was used for the pooling op
        Returns:
            unpool: the unpooled tensor
        """
        with tf.variable_scope(scope):
            input_shape = tf.shape(pool)
            output_shape = [input_shape[0], input_shape[1] * ksize[1], input_shape[2] * ksize[2], input_shape[3]]
            flat_input_size = tf.cumprod(input_shape)[-1]
            flat_output_shape = tf.stack([output_shape[0], output_shape[1] * output_shape[2] * output_shape[3]])
            pool_ = tf.reshape(pool, tf.stack([flat_input_size]))
            # Prepend the batch index to every argmax index so that scatter_nd
            # writes each pooled value into the right row of the flat output.
            batch_range = tf.reshape(tf.range(tf.cast(output_shape[0], tf.int64), dtype=ind.dtype),
                                     shape=tf.stack([input_shape[0], 1, 1, 1]))
            b = tf.ones_like(ind) * batch_range
            b = tf.reshape(b, tf.stack([flat_input_size, 1]))
            ind_ = tf.reshape(ind, tf.stack([flat_input_size, 1]))
            ind_ = tf.concat([b, ind_], 1)
            ret = tf.scatter_nd(ind_, pool_, shape=tf.cast(flat_output_shape, tf.int64))
            ret = tf.reshape(ret, tf.stack(output_shape))
            return ret
    def conv(self, inputs, filter_shape, activation, stride=[1, 1, 1, 1], pool=False, pool_stride=[1, 2, 2, 1]):
        """Convolutional layer with optional max pooling.

        Filter Arguments Example:
        >>> filter_shape = [5, 5, 1, 32]  # 5x5 filters, 1 input channel, 32 filters/feature maps
        """
        with tf.variable_scope("conv"):
            W_conv = self.__weight_variable(filter_shape)
            b_conv = self.__bias_variable([filter_shape[-1]])
            output = tf.nn.conv2d(input=inputs, filter=W_conv, strides=stride, padding="SAME")
            output = activation(output + b_conv)
            if pool:
                output = tf.nn.max_pool(output, ksize=pool_stride, strides=pool_stride, padding="SAME")
            return output
    def deconv(self, inputs, filter_shape, output_shape, activation, stride=[1, 1, 1, 1], unpool=False, unpool_ind=None, unpool_stride=[1, 2, 2, 1]):
        """Transposed convolution with optional max unpooling.

        Tip: the output shape of the matching conv should be the input shape of the deconv.
        Note: tf.nn.conv2d_transpose filters are [height, width, out_channels, in_channels].
        Filter Arguments Example:
        >>> conv_input_shape = [-1, 28, 28, 1]
        >>> conv_filter_shape = [5, 5, 1, 32]
        >>> conv_output_shape = [-1, 28, 28, 32]
        >>> deconv_input_shape = [-1, 28, 28, 32]
        >>> deconv_filter_shape = [5, 5, 1, 32]
        >>> deconv_output_shape = [-1, 28, 28, 1]
        """
        with tf.variable_scope("deconv"):
            if unpool:
                # Unpooling needs the argmax indices produced by
                # tf.nn.max_pool_with_argmax in the matching pooling op.
                inputs = self.__max_unpool(inputs, unpool_ind, ksize=unpool_stride)
            W_deconv = self.__weight_variable(filter_shape)
            # The bias is per output channel, which for a transposed conv is
            # filter_shape[-2], not filter_shape[-1].
            b_deconv = self.__bias_variable([filter_shape[-2]])
            output = tf.nn.conv2d_transpose(inputs, filter=W_deconv, output_shape=output_shape, strides=stride, padding="SAME")
            output = activation(output + b_deconv)
            return output
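The unpooling path is never exercised by the model below, so here is a minimal sketch of how it is meant to be wired, assuming TF 1.x; the placeholder, scope name and shapes are illustrative only. tf.nn.max_pool_with_argmax returns both the pooled tensor and the argmax indices, and those indices are what deconv(..., unpool=True, unpool_ind=...) forwards to the unpool op.
In [ ]:
# Illustrative sketch only: wiring max_pool_with_argmax indices into unpool.
demo_g = tf.Graph()
with demo_g.as_default():
    demo_x = tf.placeholder(tf.float32, shape=[None, 28, 28, 1])
    # Pool AND keep the argmax indices that unpooling needs later.
    pooled, pool_ind = tf.nn.max_pool_with_argmax(demo_x, ksize=[1, 2, 2, 1],
                                                  strides=[1, 2, 2, 1], padding="SAME")
    ops = BaseOps()
    with tf.variable_scope("demo"):
        # Unpool back to 28x28, then a 1-in/1-out transposed convolution.
        recon = ops.deconv(inputs=pooled, filter_shape=[5, 5, 1, 1],
                           output_shape=tf.stack([tf.shape(demo_x)[0], 28, 28, 1]),
                           activation=tf.nn.relu, stride=[1, 1, 1, 1],
                           unpool=True, unpool_ind=pool_ind, unpool_stride=[1, 2, 2, 1])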
In [22]:
class CNN(BaseOps):
    """Small conv/deconv model on MNIST."""

    def __init__(self):
        """Load MNIST and build the graph."""
        mnist_data_dir = "/home/nitred/.no_imagination/mnist/dataset/"
        self.mnist_data = input_data.read_data_sets(mnist_data_dir, one_hot=True)
        self.__build_model()
        # self.__build_accuracy_computation()
        # self.__start_session()

    def __build_model(self):
        """Build a conv layer plus a deconv layer that maps back to image space."""
        self.g = tf.Graph()
        with self.g.as_default():
            with tf.variable_scope("inputs"):
                self.x = tf.placeholder(tf.float32, shape=[None, 784])
                self.y = tf.placeholder(tf.float32, shape=[None, 10])
                self.keep_prob = tf.placeholder(tf.float32)
                x_image = tf.reshape(self.x, [-1, 28, 28, 1])
            with tf.variable_scope("conv1"):
                conv1 = self.conv(inputs=x_image, filter_shape=[5, 5, 1, 32], activation=tf.nn.relu, stride=[1, 1, 1, 1],
                                  pool=True, pool_stride=[1, 2, 2, 1])
                print(conv1.shape)
                print(tf.reshape(conv1, [-1, 14 * 14 * 32]).shape)
                print(flatten(conv1).shape)
                # conv1 shape: [-1, 14, 14, 32]
with tf.variable_scope("deconv1"):
deconv1 = self.deconv(inputs=conv1, filter_shape=[5, 5, 32, 1], output_shape=[10, 14, 14, 1], activation=tf.nn.relu, stride=[1, 1, 1, 1],
unpool=False, unpool_stride=[1, 2, 2, 1])
print(deconv1.shape)
# tf.nn.conv2d_transpose(conv1, filter=[])
# with tf.variable_scope("conv2"):
# conv2 = self.conv(inputs=conv1, filter=[5, 5, 32], n_filters=64, activation=tf.nn.relu, stride=[1, 1, 1, 1],
# pool=True, pool_stride=[1, 2, 2, 1])
# with tf.variable_scope("fc1"):
# h_pool2_flat = tf.reshape(h_pool2, [-1, 7 * 7 * 64])
# fc1 = self.fc(inputs=conv2, input_dim=, output_dim=, activation=, keep_prob=None)
# h_fc1 = tf.nn.relu(tf.matmul(h_pool2_flat, W_fc1) + b_fc1)
# h_fc1_drop = tf.nn.dropout(h_fc1, self.keep_prob)
# with tf.variable_scope("fc2"):
# W_fc2 = self.__weight_variable([1024, 10])
# b_fc2 = self.__bias_variable([10])
# self.y_out = tf.matmul(h_fc1_drop, W_fc2) + b_fc2
# cross_entropy = tf.reduce_mean(tf.nn.softmax_cross_entropy_with_logits(logits=self.y_out, labels=self.y))
# optimizer = tf.train.AdamOptimizer(learning_rate=1e-4)
# self.train_step = optimizer.minimize(cross_entropy)
    # def __build_accuracy_computation(self):
    #     """."""
    #     with self.g.as_default():
    #         # boolean prediction
    #         correct_prediction = tf.equal(tf.argmax(self.y_out, 1), tf.argmax(self.y, 1))
    #         self.accuracy = tf.reduce_mean(tf.cast(correct_prediction, tf.float32))

    # def __start_session(self):
    #     """."""
    #     with self.g.as_default():
    #         self.sess = tf.Session(graph=self.g)
    #         self.sess.run(tf.global_variables_initializer())

    # def run(self, epochs=20000, batch_size=50, keep_prob=0.5, summary_epochs=500):
    #     """."""
    #     for i in range(epochs):
    #         batch_x, batch_y = self.mnist_data.train.next_batch(batch_size)
    #         self.sess.run(self.train_step, feed_dict={self.x: batch_x, self.y: batch_y, self.keep_prob: keep_prob})
    #         if i % summary_epochs == 0:
    #             print(self.sess.run(self.accuracy, feed_dict={self.x: self.mnist_data.test.images,
    #                                                           self.y: self.mnist_data.test.labels,
    #                                                           self.keep_prob: 1.0}))
In [23]:
mnist_cnn = CNN()
# mnist_cnn.run(epochs=20000, batch_size=50, summary_epochs=500)
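The classifier training ops are still commented out above, so the following is a hedged sketch of attaching a reconstruction objective to the conv1/deconv1 pair instead. The mean-squared-error loss, the average-pooled target, the learning rate and the session wiring are all illustrative assumptions, not code from the model.
In [ ]:
# Illustrative sketch only: train deconv1 to reconstruct the (downsampled) input.
with mnist_cnn.g.as_default():
    x_image = tf.reshape(mnist_cnn.x, [-1, 28, 28, 1])
    # deconv1 stays at the pooled 14x14 resolution, so compare it against a
    # 2x2 average-pooled version of the input (an illustrative choice).
    target = tf.nn.avg_pool(x_image, ksize=[1, 2, 2, 1], strides=[1, 2, 2, 1], padding="SAME")
    recon_loss = tf.reduce_mean(tf.square(mnist_cnn.deconv1 - target))
    recon_step = tf.train.AdamOptimizer(learning_rate=1e-4).minimize(recon_loss)
    sess = tf.Session(graph=mnist_cnn.g)
    sess.run(tf.global_variables_initializer())
    for i in range(1000):
        batch_x, _ = mnist_cnn.mnist_data.train.next_batch(50)
        _, loss = sess.run([recon_step, recon_loss], feed_dict={mnist_cnn.x: batch_x})
        if i % 100 == 0:
            print(loss)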