In [1]:
import time
import math
import os
import io
import matplotlib.pyplot as plt
import numpy as np
import scipy.misc
from sklearn.metrics import confusion_matrix
import tensorflow as tf
from tensorflow.examples.tutorials.mnist import input_data, mnist
from tensorflow.contrib.tensorboard.plugins import projector
In [2]:
# Display the installed TensorFlow version as the cell output.
tf.__version__
Out[2]:
In [3]:
# Display the installed NumPy version as the cell output.
np.__version__
Out[3]:
In [5]:
# Input images are 28x28 pixels.
img_size = 28
# Length of one image once flattened to a vector.
img_size_flat = img_size * img_size
# Image shape as a tuple, (28, 28); used when reshaping for plotting.
img_shape = (img_size, img_size)
# Number of input channels for the first layer; 1 because the images are grayscale.
num_channels = 1
# Number of output classes (digits 0-9).
num_classes = 10
# Learning rate of the model.
learning_rate = 1e-3
# Keep probability for dropout during training; validation/test feeds use 1.0 instead.
keep_probability_for_dropout = 0.5
# Step at which the current run starts (updated by optimize()).
latest_iteration = 0
# Logging interval: summaries/checkpoints are written every `unit_step` steps.
unit_step = 100
# Total number of training iterations to run.
max_iterations = 5000
In [6]:
# Root directory for TensorBoard logs.
log_dir = "/tmp/tensorboard/mnist/"
# Directory where the MNIST data is stored.
data_dir = "data/mnist/"
# Log directory for training summaries.
train_log_dir = os.path.join(log_dir, 'train/')
# Log directory for validation summaries.
validation_log_dir = os.path.join(log_dir, 'validation/')
# Directory for the test-data embedding files.
embed_path = os.path.join(log_dir, 'embed/')
In [7]:
# Load the MNIST splits from data_dir with one-hot encoded labels.
data = input_data.read_data_sets(data_dir, one_hot=True)
MNIST的数据,一共有70,000个样本。一共有三个部分的数据,训练数据、测试数据、验证数据。
In [8]:
# Report how many samples each split contains (train, test, validation - in that order).
print("Size of:\n- Training-set:\t\t{0}\n- Test-set:\t\t{1}\n- Validation-set:\t{2}".format(
    *(len(split.labels) for split in (data.train, data.test, data.validation))
))
加载数据的时候设定one_hot=True。所以y值是一个向量,对应数字的索引设定为1。也就是说,1可以表示为:[0,1,0,0,0,0,0,0,0,0]。所以算出每个元素的类别,可以使用argmax方法,得到其整数型数值。
In [9]:
# Convert the one-hot test labels to integer class ids and attach them to the test split.
data.test.cls = np.argmax(data.test.labels, axis=1)
In [10]:
def plot_images(images, cls_true, cls_pred=None):
    """Show nine images in a 3x3 grid.

    :param images: indexable collection of flattened images; the first nine are drawn,
        each reshaped to ``img_shape``
    :param cls_true: true class of each image
    :param cls_pred: optional predicted class of each image; when given, it is added
        to the caption and the rows are spaced further apart
    """
    figure, grid = plt.subplots(3, 3)
    has_predictions = cls_pred is not None
    vertical_gap = 0.6 if has_predictions else 0.3
    figure.subplots_adjust(hspace=vertical_gap, wspace=0.3)
    for index, axis in enumerate(grid.flat):
        # Samples are stored flattened, so restore the 2-D shape before drawing.
        axis.imshow(images[index].reshape(img_shape), cmap='binary')
        if has_predictions:
            caption = "True: {0}, Pred: {1}".format(cls_true[index], cls_pred[index])
        else:
            caption = "True: {0}".format(cls_true[index])
        axis.set_xlabel(caption)
        # Hide the tick marks; only the caption matters.
        axis.set_xticks([])
        axis.set_yticks([])
    plt.show()
In [11]:
# Sanity check: show nine test images (indices 10-18) with their true labels.
plot_images(data.test.images[10:19], data.test.cls[10:19])
接下来通过TF自带的函数构建计算图。TF通过计算图来表示每个节点以及之间的关系。然后在前向传递过程计算loss和存储每一个节点的输出结果,然后在后向过程计算梯度--gradient,并使用梯度来更新权重。这里不展开讨论TF的内部实现机制,有兴趣可以查看这篇论文tensorflow tutorial。
In [12]:
def weights_variable(shape):
    """Create a weight variable of the given shape.

    Initial values are drawn with tf.truncated_normal using stddev=0.1.
    """
    initial_values = tf.truncated_normal(shape=shape, stddev=0.1)
    return tf.Variable(initial_values)
In [13]:
def bias_variable(length):
    """Create a 1-D bias variable with `length` entries, each initialised to the constant 0.1."""
    initial_values = tf.constant(0.1, shape=[length])
    return tf.Variable(initial_values)
In [14]:
def variable_summaries(var):
    """Attach scalar summaries (mean, stddev, max, min) for `var` under a 'summaries' name scope."""
    with tf.name_scope('summaries'):
        average = tf.reduce_mean(var)
        squared_deviation = tf.square(var - average)
        spread = tf.sqrt(tf.reduce_mean(squared_deviation))
        tf.summary.scalar('mean', average)
        tf.summary.scalar('stddev', spread)
        largest = tf.reduce_max(var)
        tf.summary.scalar('max', largest)
        smallest = tf.reduce_min(var)
        tf.summary.scalar('min', smallest)
In [15]:
def max_pool_2x2(layer):
    """Apply max pooling with a 2x2 window and stride 2 (padding 'SAME') to `layer`."""
    pool_window = [1, 2, 2, 1]
    pool_strides = [1, 2, 2, 1]
    return tf.nn.max_pool(value=layer, ksize=pool_window, strides=pool_strides, padding='SAME')
In [16]:
def new_conv_layer(input_tensor, num_input_channels, filter_size, num_output_channels, layer_name, act=tf.nn.relu):
    """
    Build a convolutional layer: create the filter weights and biases, convolve the
    input (stride 1, 'SAME' padding), add the bias, apply the activation and finish
    with 2x2 max pooling.
    :param input_tensor: tensor produced by the previous layer
    :param num_input_channels: number of input channels
    :param filter_size: side length of the (square) convolution patch
    :param num_output_channels: number of output channels (filters)
    :param layer_name: name scope used for every op of this layer
    :param act: activation function, tf.nn.relu by default
    :return: the activated, max-pooled output tensor
    """
    with tf.name_scope(layer_name):
        with tf.name_scope('weights'):
            weights = weights_variable(shape=[filter_size, filter_size, num_input_channels, num_output_channels])
            variable_summaries(weights)
        with tf.name_scope('biases'):
            bias = bias_variable(num_output_channels)
            variable_summaries(bias)
        with tf.name_scope('activation'):
            layer = tf.nn.conv2d(input=input_tensor, filter=weights, strides=[1, 1, 1, 1], padding='SAME')
            layer += bias
            layer = act(layer)
            layer = max_pool_2x2(layer)
            # Histogram of the final (pooled) output for TensorBoard.
            tf.summary.histogram('layer', layer)
        return layer
In [17]:
def flatten_layer(layer):
    """
    Flatten every dimension after the first into a single feature axis.
    :param layer: tensor whose dimensions 1..3 are collapsed
    :return: tuple (flattened tensor of shape [-1, num_features], num_features)
    """
    with tf.name_scope('flatten'):
        non_batch_dims = layer.get_shape()[1:4]
        feature_count = non_batch_dims.num_elements()
        flattened = tf.reshape(layer, shape=[-1, feature_count])
    return flattened, feature_count
In [18]:
def new_fc_layer(input_tensor, num_input, num_output, layer_name, act=tf.nn.relu, dropout=None):
    """
    Build a fully connected layer: every input is connected to every output,
    so the weight shape is [num_input, num_output].
    :param input_tensor: tensor produced by the previous layer
    :param num_input: input dimension
    :param num_output: output dimension
    :param layer_name: name scope used for every op of this layer
    :param act: activation function
    :param dropout: None to skip dropout; otherwise the value handed to tf.nn.dropout
        as its second argument (the network below passes the keep_prob placeholder)
    :return: the layer output tensor (after activation and optional dropout)
    """
    with tf.name_scope(layer_name):
        with tf.name_scope('weights'):
            weights = weights_variable(shape=[num_input, num_output])
            variable_summaries(weights)
        with tf.name_scope('biases'):
            biases = bias_variable(num_output)
            variable_summaries(biases)
        with tf.name_scope('activation'):
            layer = tf.matmul(input_tensor, weights) + biases
            layer = act(layer)
            if dropout is not None:
                layer = tf.nn.dropout(layer, dropout)
            # Histogram of the layer output (post-dropout when dropout is used).
            tf.summary.histogram('layer', layer)
        return layer
In [19]:
with tf.name_scope('input'):
    # Input images x: [batch_size, 784]
    x = tf.placeholder(tf.float32, shape=[None, img_size_flat], name='x-input')
    # One-hot labels y_: [batch_size, 10]
    y_ = tf.placeholder(tf.float32, shape=[None, num_classes], name='y-input')
将以上x转化为TF的conv2d所需要的输入: [batch, in_height, in_width, channels]
In [20]:
with tf.name_scope('input_reshape'):
    # Input to the first convolutional layer: x-image-reshape [batch, 28, 28, 1]
    x_image_reshape = tf.reshape(x, shape=[-1, img_size, img_size, num_channels], name='x-image-reshape')
    # Use summary.image to show up to 10 of the input images in TensorBoard.
    tf.summary.image('input_reshape', x_image_reshape, 10)
In [21]:
with tf.name_scope('y_true_class_label'):
    # Integer class id of each one-hot label row.
    y_true_cls = tf.argmax(y_, dimension=1)
In [22]:
with tf.name_scope('dropout_keep_probability'):
    # Dropout keep probability, fed per run (0.5 for training feeds, 1.0 otherwise).
    keep_prob = tf.placeholder(tf.float32)
    tf.summary.scalar('dropout_keep_probability', keep_prob)
In [ ]:
# First convolutional layer: patch size and number of filters.
filter_size1 = 5
num_output_channel1 = 36
# Second convolutional layer: patch size and number of filters.
filter_size2 = 5
num_output_channel2 = 64
# Width of the first fully connected layer.
fc_size1 = 1024
In [23]:
# First convolutional layer: layer_conv1
layer_conv1 = new_conv_layer(x_image_reshape, num_channels, filter_size1, num_output_channel1, "layer_conv1", tf.nn.relu)
# Second convolutional layer: layer_conv2
layer_conv2 = new_conv_layer(layer_conv1, num_output_channel1, filter_size2, num_output_channel2, "layer_conv2", tf.nn.relu)
# Flatten the conv output so it can feed the fully connected layers.
layer_flat, num_features = flatten_layer(layer_conv2)
# Fully connected layer 1 (ReLU + dropout driven by keep_prob): layer_fc1
layer_fc1 = new_fc_layer(layer_flat, num_features, fc_size1, "layer_fc1", tf.nn.relu, keep_prob)
# Fully connected layer 2: identity activation and no dropout, so y_pred holds raw logits.
y_pred = new_fc_layer(layer_fc1, fc_size1, num_classes, "layer_fc2", tf.identity, None)
使用cross-entropy作为损失函数(训练时最小化的目标)。`softmax_cross_entropy_with_logits`会先对网络输出(logits)做softmax,然后再计算交叉熵损失。
In [24]:
# Cross-entropy is the cost function that training minimises.
with tf.name_scope('cross_entropy'):
    # Per-sample loss; the op applies softmax to the logits internally.
    diff = tf.nn.softmax_cross_entropy_with_logits(logits=y_pred, labels=y_)
    with tf.name_scope('total'):
        # Mean loss over the batch.
        cross_entropy = tf.reduce_mean(diff)
tf.summary.scalar("cross_entropy", cross_entropy)
接下来会用到的一些指标型数据
In [26]:
with tf.name_scope('y_pred_class_label'):
    # Predicted class id: index of the largest logit.
    y_pred_cls = tf.argmax(y_pred, dimension=1)
with tf.name_scope('correct_prediction'):
    # Boolean vector: True where the prediction matches the label.
    correct_prediction = tf.equal(y_pred_cls, y_true_cls)
with tf.name_scope('incorrect_prediction'):
    # Element-wise negation of correct_prediction; used later as a mask for misclassified samples.
    incorrect_prediction = tf.equal(correct_prediction, False)
with tf.name_scope('accuracy'):
    # Fraction of correct predictions in the batch.
    accuracy = tf.reduce_mean(tf.cast(correct_prediction, tf.float32))
tf.summary.scalar('accuracy', accuracy)
使用Adam作为优化算法
In [27]:
with tf.name_scope('train_step_minimize'):
    # One Adam update that minimises the mean cross-entropy.
    train_step = tf.train.AdamOptimizer(learning_rate).minimize(cross_entropy)
定义三个构造feed_dict的方法,分别用于获取训练数据、验证数据和测试数据。
In [28]:
def feed_dict_train():
    """Return a feed dict holding the next training mini-batch (100 samples) with training dropout."""
    batch_images, batch_labels = data.train.next_batch(100)
    feed = {
        x: batch_images,
        y_: batch_labels,
        keep_prob: keep_probability_for_dropout,
    }
    return feed
In [29]:
def feed_dict_validation(quickValidation=True):
    """Return a feed dict over the validation split with dropout disabled.

    :param quickValidation: when True, use 500 validation samples drawn at random
        without replacement; otherwise use the whole validation split
    """
    images, labels = data.validation.images, data.validation.labels
    if quickValidation:
        picked = np.random.choice(len(data.validation.images), size=500, replace=False)
        images, labels = images[picked, :], labels[picked, :]
    return {x: images, y_: labels, keep_prob: 1.0}
In [39]:
def feed_dict_test(quickTest=True):
    """Return a feed dict over the test split with dropout disabled.

    :param quickTest: when True, use 1000 test samples drawn at random without
        replacement; otherwise use the whole test split
    :return: dict mapping the x / y_ / keep_prob placeholders to their values
    """
    if quickTest:
        idx = np.random.choice(len(data.test.images), size=1000, replace=False)
        xs, ys = data.test.images[idx, :], data.test.labels[idx, :]
    else:
        # Full evaluation must use the test split (it previously returned the validation split).
        xs, ys = data.test.images, data.test.labels
    return {x: xs, y_: ys, keep_prob: 1.0}
In [31]:
# Session shared by every later cell; later cells call `.run()` on ops without passing a session.
sess = tf.InteractiveSession()
In [32]:
# Single op that evaluates every summary registered so far.
merged = tf.summary.merge_all()
# The training writer also receives the graph definition.
train_writer = tf.summary.FileWriter(train_log_dir, sess.graph)
validation_writer = tf.summary.FileWriter(validation_log_dir)
In [33]:
# Initialise all variables created so far.
tf.global_variables_initializer().run()
In [34]:
# Checkpoint saver used by optimize().
saver = tf.train.Saver()
In [ ]:
# 如果不想从头开始重新计算,那么直接restore上一次计算的结果
# saver.restore(sess, save_path=tf.train.latest_checkpoint(train_log_dir))
# latest_iteration = 10000
In [35]:
# Run the training loop.
def optimize(num_iterations, if_end=False):
    """Train for `num_iterations` steps, continuing from the global `latest_iteration`.

    Every `unit_step` steps the training summary and a checkpoint are written to the
    train log directory, and accuracy on a quick validation sample is evaluated and
    logged to the validation writer.

    :param num_iterations: number of training steps to run in this call
    :param if_end: when True, finish with one more logged training step, a checkpoint
        and an evaluation on the test feed; all three are recorded at step
        `latest_iteration`
    """
    global latest_iteration
    print("latest_iteration: %s, to: %s" % (str(latest_iteration), str(latest_iteration+num_iterations)))
    start_time = time.time()
    for i in range(latest_iteration, latest_iteration + num_iterations):
        # One training step.
        summary_train, _ = sess.run([merged, train_step], feed_dict=feed_dict_train())
        # Every unit_step steps: write the training summary, checkpoint, and validate.
        if i % unit_step == 0:
            train_writer.add_summary(summary_train, i)
            # Checkpoints go into the train directory.
            saver.save(sess, save_path=train_log_dir, global_step=i)
            # Validation accuracy on a quick sample.
            summary_validation, acc = sess.run([merged, accuracy], feed_dict=feed_dict_validation())
            validation_writer.add_summary(summary_validation, i)
            print('Accuracy at validation step %s: %s' % (i, acc))
    latest_iteration += num_iterations
    if if_end:
        # Final round: log one last training step and checkpoint at the final step number.
        summary_train, _ = sess.run([merged, train_step], feed_dict=feed_dict_train())
        train_writer.add_summary(summary_train, latest_iteration)
        saver.save(sess, save_path=train_log_dir, global_step=latest_iteration)
        # Evaluate on the test feed (default quick sample) and log it to the validation writer.
        summary_test, acc = sess.run([merged, accuracy], feed_dict=feed_dict_test())
        # Log at latest_iteration: the loop variable `i` is one step stale here and is
        # undefined when num_iterations == 0.
        validation_writer.add_summary(summary_test, latest_iteration)
        print('Accuracy at test step %s: %s' % (latest_iteration, acc))
    train_writer.flush()
    validation_writer.flush()
    end_time = time.time()
    print("training time: %s" % str(end_time - start_time))
第一次迭代具备指导意义,所以单独运行一次,因为如果是随机赋值权重的话,那么准确率应该是10%左右。然后先小量测试几次并查看日志文件是否已经正确创建。这些都应该在正式迭代计算之前做好检查。
In [36]:
# Single first step: with randomly initialised weights the accuracy should be around 10%.
optimize(1)
In [ ]:
# A few more steps as a smoke test; check that the log files are created before the full run.
optimize(10)
In [ ]:
# Run the remaining steps up to max_iterations and finish with the end-of-run evaluation.
optimize(max_iterations-latest_iteration, True)
In [ ]:
def plot_error_on_board():
    """Write up to 30 misclassified test images to TensorBoard as image summaries.

    The whole test split is run through the network once (dropout disabled). Each
    misclassified sample is drawn with its true and predicted class as the caption,
    rendered to PNG in memory and added to `validation_writer` as an
    "error_example" image summary.
    """
    max_plot_num = 30
    # Build the full test feed here so the mask lines up with data.test.images / data.test.cls.
    test_feed = {x: data.test.images, y_: data.test.labels, keep_prob: 1.0}
    # One forward pass yields both the misclassification mask and the predictions.
    incorrect_prediction_result, y_pred_cls_result = sess.run(
        [incorrect_prediction, y_pred_cls], feed_dict=test_feed)
    error_sample_image = data.test.images[incorrect_prediction_result][:max_plot_num]
    error_example_pred = y_pred_cls_result[incorrect_prediction_result][:max_plot_num]
    error_example_true = data.test.cls[incorrect_prediction_result][:max_plot_num]

    fig, ax = plt.subplots(1, 1)
    try:
        with tf.name_scope("error_example"):
            for err_img, err_pred, err_true in zip(error_sample_image, error_example_pred, error_example_true):
                # Clear the axes, not the figure: fig.clear() would detach `ax`
                # and every saved PNG would come out blank.
                ax.clear()
                ax.imshow(err_img.reshape(img_shape), cmap='binary')
                ax.set_xlabel("True:{0}, Pred: {1}".format(err_true, err_pred))
                ax.set_xticks([])
                ax.set_yticks([])
                # Fresh buffer per image so no bytes of a previous PNG are left behind.
                buf = io.BytesIO()
                fig.savefig(buf, format='png')
                image = tf.image.decode_png(buf.getvalue(), channels=1)
                buf.close()
                # tf.summary.image expects a leading batch dimension.
                image = tf.expand_dims(image, 0)
                error_example_summary = tf.summary.image("error_example", image)
                validation_writer.add_summary(sess.run(error_example_summary))
    finally:
        plt.close(fig)
    validation_writer.flush()
In [ ]:
# Write the misclassified test samples to TensorBoard.
plot_error_on_board()
In [ ]:
def plot_confusion_matrix_on_board():
    """Render the confusion matrix of the whole test split as an image tensor.

    :return: tensor of shape [1, height, width, 3] decoded from the rendered PNG,
        suitable as input to tf.summary.image
    """
    # Full test split with dropout disabled; built here because no generic
    # `feed_dict(...)` helper is defined in this notebook.
    test_feed = {x: data.test.images, y_: data.test.labels, keep_prob: 1.0}
    # Predicted and true classes from a single forward pass.
    y_pred_cls_result, y_true_cls_result = sess.run([y_pred_cls, y_true_cls], feed_dict=test_feed)
    # Pin the label set so the matrix is always num_classes x num_classes and matches the ticks.
    cm = confusion_matrix(y_true=y_true_cls_result, y_pred=y_pred_cls_result,
                          labels=list(range(num_classes)))
    plt.matshow(cm)
    # Make various adjustments to the plot.
    plt.colorbar()
    tick_marks = np.arange(num_classes)
    plt.xticks(tick_marks, range(num_classes))
    plt.yticks(tick_marks, range(num_classes))
    plt.xlabel('Predicted')
    plt.ylabel('True')
    # Render to an in-memory PNG and decode it into a tensor.
    buf = io.BytesIO()
    plt.savefig(buf, format='png')
    buf.seek(0)
    image = tf.image.decode_png(buf.getvalue(), channels=3)
    # Add the batch dimension expected by tf.summary.image.
    image = tf.expand_dims(image, 0)
    buf.close()
    plt.close()
    return image
In [ ]:
# Write the confusion matrix image to TensorBoard so the results are easy to inspect.
with tf.name_scope("confusion_matrix"):
    image = plot_confusion_matrix_on_board()
    confusion_matrix_summary = tf.summary.image("confusion_matrix", image, 1)
    validation_writer.add_summary(sess.run(confusion_matrix_summary))
validation_writer.flush()
In [ ]:
def images_to_sprite(data):
    """Build a square sprite image out of a stack of thumbnails.

    Each image is min-max normalised to [0, 1] independently, colour-inverted,
    and tiled row by row into an n x n grid, where n = ceil(sqrt(N)). Unused grid
    cells are filled with 0.

    Args:
      data: NxHxW[x3] array containing the images. Grayscale (NxHxW) input is
        replicated across three channels.
    Returns:
      uint8 array of shape (n*H, n*W, 3) holding the tiled sprite.
    """
    images = np.asarray(data)
    if images.ndim == 3:
        images = np.tile(images[..., np.newaxis], (1, 1, 1, 3))
    images = images.astype(np.float32)
    count = images.shape[0]
    # Shape that broadcasts one scalar per image over that image's pixels.
    per_image = (count,) + (1,) * (images.ndim - 1)

    lows = images.reshape(count, -1).min(axis=1).reshape(per_image)
    images = images - lows
    highs = images.reshape(count, -1).max(axis=1).reshape(per_image)
    # A constant image has max 0 after the shift; dividing by it would give NaN.
    # Dividing by 1 instead leaves it at 0, i.e. a blank tile after inversion.
    highs[highs == 0] = 1.0
    images = images / highs
    # Inverting the colors seems to look better for MNIST
    images = 1 - images

    grid = int(np.ceil(np.sqrt(count)))
    padding = ((0, grid ** 2 - count),) + ((0, 0),) * (images.ndim - 1)
    images = np.pad(images, padding, mode='constant', constant_values=0)
    # Tile the individual thumbnails into an image.
    images = images.reshape((grid, grid) + images.shape[1:])
    images = images.transpose((0, 2, 1, 3) + tuple(range(4, images.ndim)))
    images = images.reshape((grid * images.shape[1], grid * images.shape[3]) + images.shape[4:])
    return (images * 255).astype(np.uint8)
In [ ]:
# Layer outputs to show in the TensorBoard embedding projector. The tensors built
# above are referenced directly: the conv layers have no dropout op, so looking
# them up as '<layer>/activation/dropout/mul:0' fails.
embed_var_collection = {
    # Output of the second fully connected layer (logits)
    'layer_fc2': y_pred,
    # Output of the first fully connected layer
    'layer_fc1': layer_fc1,
    # Output of the second convolutional layer
    'layer_conv2': layer_conv2,
    # Output of the first convolutional layer
    'layer_conv1': layer_conv1,
}
# Inputs used for the embeddings, the sprite and the labels: one quick test
# sample (dropout disabled), reused everywhere so the three stay aligned.
data_for_sprite = feed_dict_test()
# Summary writer for the embed directory
test_embed_summary = tf.summary.FileWriter(embed_path, sess.graph)
# Projector configuration linking variables to their metadata and sprite
config = projector.ProjectorConfig()
# Variables to be saved in the embedding checkpoint
tensors = []
# Thumbnail side length in pixels
thumbnail_size = mnist.IMAGE_SIZE
for layer_name, embed_tensor in embed_var_collection.items():
    tensor_name = '%s_%s_tensor' % ('test', layer_name)
    # Evaluate the layer on the sample and store the result in a tf.Variable;
    # the projector is attached to variables, not to plain tensors.
    embed_values = sess.run(embed_tensor, feed_dict=data_for_sprite)
    embed_tensor_variable = tf.Variable(
        np.array(embed_values).reshape(embed_values.shape[0], -1),
        name=tensor_name)
    tensors.append(embed_tensor_variable)
    # Register this variable with the projector
    embedding = config.embeddings.add()
    embedding.tensor_name = embed_tensor_variable.name
    embedding.metadata_path = os.path.join(embed_path, 'labels.tsv')
    embedding.sprite.image_path = os.path.join(embed_path, 'sprite.png')
    embedding.sprite.single_image_dim.extend([thumbnail_size, thumbnail_size])
projector.visualize_embeddings(test_embed_summary, config)
# Save the embedding variables to a checkpoint. A dedicated saver is used so the
# global `saver` that optimize() relies on is not replaced.
sess.run(tf.variables_initializer(tensors))
embed_saver = tf.train.Saver(tensors)
embed_saver.save(sess, os.path.join(embed_path, 'model_embed.ckpt'), 1)
# Sprite image built from the same inputs
x_images_for_sprite = sess.run(x_image_reshape, feed_dict=data_for_sprite)
x_images_for_sprite = np.array(x_images_for_sprite).reshape(-1, thumbnail_size, thumbnail_size).astype(np.float32)
sprite = images_to_sprite(x_images_for_sprite)
# scipy.misc.imsave has been removed from SciPy; matplotlib writes the RGB uint8 array directly.
plt.imsave(os.path.join(embed_path, 'sprite.png'), sprite)
# Metadata: one row per sample, in the same order as the embeddings
y_true_label = np.argmax(data_for_sprite[y_], axis=1)
with open(os.path.join(embed_path, 'labels.tsv'), 'w') as metadata_file:
    metadata_file.write('Name\tClass\n')
    for ll, label in enumerate(y_true_label):
        metadata_file.write("%06d\t%d\n" % (ll, label))
In [ ]:
def plot_conv_weights_on_board(weights, input_channel=0):
    """Render the filters of a conv weight variable as a grid image tensor.

    :param weights: weight tensor laid out as
        [filter_size, filter_size, num_input_channels, num_output_channels]
    :param input_channel: which input channel's filter slices to draw
    :return: tensor of shape [1, height, width, 3] decoded from the rendered PNG
    """
    # Current weight values.
    kernel_values = sess.run(weights)
    # Global range, so all filters share one colour scale.
    lowest = np.min(kernel_values)
    highest = np.max(kernel_values)
    # Number of output channels, i.e. filters in this layer.
    filter_count = kernel_values.shape[3]
    # Square layout with one cell per filter.
    side = int(math.ceil(math.sqrt(filter_count)))
    figure, grid = plt.subplots(side, side)
    for index, axis in enumerate(grid.flat):
        # Only the first filter_count cells receive an image.
        if index < filter_count:
            kernel = kernel_values[:, :, input_channel, index]
            axis.imshow(kernel, vmin=lowest, vmax=highest, interpolation='nearest', cmap='seismic')
        axis.set_xticks([])
        axis.set_yticks([])
    # Render to an in-memory PNG and decode it into a tensor.
    png_buffer = io.BytesIO()
    plt.savefig(png_buffer, format='png')
    png_buffer.seek(0)
    decoded = tf.image.decode_png(png_buffer.getvalue(), channels=3)
    batched = tf.expand_dims(decoded, 0)
    png_buffer.close()
    plt.close(figure)
    return batched
In [ ]:
# Write the filters (first input channel) of both conv layers to TensorBoard.
weights_conv_collection = {
    "weight_conv1_image": tf.get_default_graph().get_tensor_by_name('layer_conv1/weights/Variable:0'),
    "weight_conv2_image": tf.get_default_graph().get_tensor_by_name('layer_conv2/weights/Variable:0')
}
with tf.name_scope("weights_visulization"):
    # .items() works on both Python 2 and 3; .iteritems() exists only on Python 2.
    for name, weight in weights_conv_collection.items():
        image = plot_conv_weights_on_board(weight, 0)
        weight_conv_summary = tf.summary.image(name, image, 1)
        validation_writer.add_summary(sess.run(weight_conv_summary))
validation_writer.flush()
In [ ]:
def plot_conv_layer_on_board(layer, image):
    """Render a conv layer's output channels for one input image as a grid image tensor.

    :param layer: conv layer tensor laid out as [batch, out_height, out_width, out_channels]
    :param image: a single flattened input image
    :return: tensor of shape [1, height, width, 3] decoded from the rendered PNG
    """
    # Evaluate the layer for just this image, with dropout disabled.
    activations = sess.run(layer, feed_dict={x: [image], keep_prob: 1.0})
    channel_count = activations.shape[3]
    # Square layout with one cell per output channel.
    side = int(math.ceil(math.sqrt(channel_count)))
    figure, grid = plt.subplots(side, side)
    for index, axis in enumerate(grid.flat):
        if index < channel_count:
            feature_map = activations[0, :, :, index]
            axis.imshow(feature_map, interpolation='nearest', cmap='binary')
        axis.set_xticks([])
        axis.set_yticks([])
    # Render to an in-memory PNG and decode it into a tensor.
    png_buffer = io.BytesIO()
    plt.savefig(png_buffer, format='png')
    png_buffer.seek(0)
    decoded = tf.image.decode_png(png_buffer.getvalue(), channels=3)
    batched = tf.expand_dims(decoded, 0)
    png_buffer.close()
    plt.close(figure)
    return batched
In [ ]:
# Write the outputs of both conv layers for the first test image to TensorBoard.
layer_conv_collections = {
    "layer_conv1_image": layer_conv1,
    "layer_conv2_image": layer_conv2
}
with tf.name_scope("layer_visulization"):
    # .items() works on both Python 2 and 3; .iteritems() exists only on Python 2.
    for name, layer in layer_conv_collections.items():
        image = plot_conv_layer_on_board(layer, image=data.test.images[0])
        layer_conv_summary = tf.summary.image(name, image, 1)
        validation_writer.add_summary(sess.run(layer_conv_summary))
validation_writer.flush()
In [ ]:
# Close both summary writers now that nothing more is logged.
train_writer.close()
validation_writer.close()
In [ ]:
# Close the TensorFlow session.
sess.close()
In [ ]: