In [1]:
import numpy as np
import pandas as pd
import tensorflow as tf
In [2]:
CONTENT_FILE = '/home/ishiyama/image_style_transfer/image/input/test_input_01.JPG'
STYLE_FILE = '/home/ishiyama/image_style_transfer/image/style/test_style_01.jpg'
In [3]:
class Image(np.ndarray):
    """A numpy.ndarray subclass for handling images.

    XXX: the implementation is involved, so this is on hold for now.
    The goal is to expose the components of the image shape as attributes.
    """
    DATA_FORMAT_CHAR = {
        'BATCH': 'N',
        'HEIGHT': 'H',
        'WIDTH': 'W',
        'CHANNEL': 'C'
    }

    def __new__(subtype,
                shape,
                dtype=float,
                buffer=None,
                offset=0,
                strides=None,
                order=None,
                data_format='NHWC'):
        # ndarray subclasses must create the instance in __new__ and return it.
        obj = super(Image, subtype).__new__(subtype, shape, dtype, buffer,
                                            offset, strides, order)
        obj.data_format = data_format
        (obj.num_batch,
         obj.num_height,
         obj.num_width,
         obj.num_channel) = obj._get_image_shape(data_format=data_format)
        return obj

    def _get_image_shape(self, data_format):
        _image_shape = self.shape
        idx_batch = self._get_index_data_format(data_format=data_format, data_type='BATCH')
        idx_height = self._get_index_data_format(data_format=data_format, data_type='HEIGHT')
        idx_width = self._get_index_data_format(data_format=data_format, data_type='WIDTH')
        idx_channel = self._get_index_data_format(data_format=data_format, data_type='CHANNEL')
        reordered_image_shape = (_image_shape[idx_batch],
                                 _image_shape[idx_height],
                                 _image_shape[idx_width],
                                 _image_shape[idx_channel])
        return reordered_image_shape

    def _get_index_data_format(self, data_format, data_type):
        idx = data_format.find(Image.DATA_FORMAT_CHAR[data_type])
        if idx == -1:
            raise ValueError('data type "{}" is not available.'.format(data_type))
        return idx

    def reshape(self, *args):
        # XXX: not implemented yet; reshaping should rebuild the shape
        # attributes. Fall back to ndarray.reshape for now.
        return super(Image, self).reshape(*args)
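If the class is ever completed, the intended usage would look like this (a hypothetical sketch based on the attributes above; not part of the working pipeline yet):

# Hypothetical usage of the (on-hold) Image class: the shape components
# become attributes instead of positional indices into .shape.
img = Image(shape=(1, 477, 477, 3), data_format='NHWC')
print(img.num_batch, img.num_height, img.num_width, img.num_channel)
# => 1 477 477 3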
In [4]:
def read_images_as_jpeg(file):
    """JPEG image reader.

    Reads an image file in JPEG format. The image must be square for
    now; differing height and width may be supported in the future.

    Args:
        file : str. A path to the image file.
    Returns:
        A numpy.ndarray of shape (1, height, width, channel).
    """
    filename_queue = tf.train.string_input_producer([file])
    reader = tf.WholeFileReader()
    key, value = reader.read(filename_queue)
    image = tf.image.decode_jpeg(value)
    # The decoded image is a Tensor; evaluate it in a session to get a
    # numpy.ndarray that later cells can feed into placeholders.
    sess = tf.Session()
    sess.run(tf.global_variables_initializer())
    tf.train.start_queue_runners(sess)
    # The evaluated array's shape is (height, width, channel).
    image_array_hwc = sess.run(image)
    # Prepend a batch dimension: (1, height, width, channel).
    new_shape = [1]
    new_shape.extend(image_array_hwc.shape)
    return image_array_hwc.reshape(new_shape)
In [5]:
content_image = read_images_as_jpeg(file=CONTENT_FILE)
style_image = read_images_as_jpeg(file=STYLE_FILE)
The feature-extraction algorithm uses the convolutional and pooling layers of the CNN (VGG19) proposed by Simonyan and Zisserman (2015).
Here, VGG19 is built with reference to the multi-layer CNN implementation in 「TensorFlowで学ぶディープラーニング入門」.
References:
K. Simonyan and A. Zisserman, Very Deep Convolutional Networks for Large-Scale Image Recognition, arXiv:1409.1556v6, 2015.
E. Nakai, TensorFlowで学ぶディープラーニング入門〜畳み込みニューラルネットワーク徹底解説, Mynavi Publishing, 2016.
In [6]:
import tensorflow as tf
import numpy as np
def calculate_convolutional_layer(x, filter_height, filter_width, output_channels):
    """Apply a convolutional layer.

    Args:
        x : An image data.
        filter_height (int) : The height of each filter.
        filter_width (int) : The width of each filter.
        output_channels (int) : The number of channels to output.
    Returns:
        The activation of the convolutional layer.
    """
    if ((not isinstance(filter_height, int))
            or (not isinstance(filter_width, int))
            or (not isinstance(output_channels, int))):
        raise TypeError('"filter_height", "filter_width" and "output_channels" '
                        'must be integers.')
    # TODO: expose the input image's height, width and number of channels
    # as attributes, e.g. input_channels = x.num_channels
    input_channels = int(x.shape[-1])
    # Initialize the filter weights uniformly around the mean-filter value.
    filter_value = 1 / float(filter_height * filter_width)
    epsilon = filter_value / 10.0
    W = tf.Variable(
        tf.random_uniform(
            shape=[filter_height,
                   filter_width,
                   input_channels,
                   output_channels],
            minval=filter_value - epsilon,
            maxval=filter_value + epsilon
        )
    )
    h = tf.nn.conv2d(x, W, strides=[1, 1, 1, 1], padding='SAME')
    b = tf.Variable(tf.constant(0.1, shape=[output_channels]))
    convoluted_image = tf.nn.relu(h + b)
    return convoluted_image
In [7]:
x = tf.placeholder(tf.float32, [1, 477, 477, 3])
test_model = calculate_convolutional_layer(
x=x,
filter_height=3,
filter_width=3,
output_channels=1
)
sess = tf.InteractiveSession()
# With tf.Session(), every element of the matrix returned by sess.run was 0.
# TODO: look into the difference between tf.Session and tf.InteractiveSession.
# sess = tf.Session()
sess.run(tf.global_variables_initializer())
test_result = sess.run(test_model, feed_dict={x: content_image})
In [8]:
test_result.shape
Out[8]:
In [9]:
test_result
Out[9]:
In [10]:
def calculate_max_pooling_layer(x, ksize, strides):
    """Wrapper function of tf.nn.max_pool.

    Args:
        x : A Tensor produced by calculate_convolutional_layer.
        ksize : A list of ints of length >= 4. The size of the window
            for each dimension of the input tensor.
        strides : A list of ints of length >= 4. The stride of the
            sliding window for each dimension of the input tensor.
    Returns:
        A pooled image.
    """
    pooled_image = tf.nn.max_pool(x, ksize=ksize, strides=strides, padding='SAME')
    return pooled_image
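A quick shape check (an illustrative snippet, not from the original notebook) confirms the SAME-padding behaviour: with stride 2 the spatial dimensions are halved, rounding up. The 5x5 placeholder size is arbitrary.

# Illustrative shape check: ceil(5 / 2) == 3 in both spatial dimensions.
test_pool_input = tf.placeholder(tf.float32, [1, 5, 5, 3])
test_pool = calculate_max_pooling_layer(
    x=test_pool_input,
    ksize=[1, 2, 2, 1],
    strides=[1, 2, 2, 1]
)
print(test_pool.shape)  # => (1, 3, 3, 3)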
In [11]:
class FeatureMapHolder(object):

    def __init__(self):
        self.conv = []
        self.pool = None

    def set_conv(self, mat):
        self.conv.append(mat)

    def get_conv(self, idx):
        if idx is None:
            raise ValueError('idx is required.')
        if not isinstance(idx, int):
            raise ValueError('idx must be an integer.')
        return self.conv[idx]

    def set_pool(self, mat):
        self.pool = mat

    def get_pool(self):
        return self.pool

    def get(self, type, idx=None):
        if type == 'pool':
            return self.get_pool()
        elif type == 'conv':
            return self.get_conv(idx)
In [12]:
FILTER_CONF = {
    'conv': {
        'height': 3,
        'width': 3
    },
    'maxpool': {
        'height': 2,
        'width': 2
    }
}

def apply_vgg_network_unit(x, channels, num_conv):
    """Apply one VGG network unit, from a convolutional layer to max pooling.

    Table 1 of Simonyan and Zisserman (2015) is divided into 5 parts;
    each part runs from the input data (or the data pooled by the
    previous part) to a max pooling layer. This function applies one
    such part and can be called repeatedly to chain the parts.

    Args:
        x (Tensor) : An input image or the max pooled data returned by
            a previous call of this function.
        channels (int) : The number of channels described in Table 1 of
            Simonyan and Zisserman (2015).
        num_conv (int) : The number of convolutional layers to apply.
            See Simonyan and Zisserman (2015) for details.
    Returns:
        A FeatureMapHolder object.
    """
    if num_conv < 2:
        raise ValueError('num_conv must be >= 2.')
    feature_maps = FeatureMapHolder()
    conv = calculate_convolutional_layer(
        x=x,
        filter_height=FILTER_CONF['conv']['height'],
        filter_width=FILTER_CONF['conv']['width'],
        output_channels=channels
    )
    feature_maps.set_conv(conv)
    for i in range(1, num_conv):
        conv = calculate_convolutional_layer(
            x=feature_maps.get(type='conv', idx=i-1),
            filter_height=FILTER_CONF['conv']['height'],
            filter_width=FILTER_CONF['conv']['width'],
            output_channels=channels
        )
        feature_maps.set_conv(conv)
    # Pool the output of the last convolutional layer (using idx=i-1 here
    # would pick the second-to-last one).
    pool = calculate_max_pooling_layer(
        x=feature_maps.get('conv', idx=num_conv - 1),
        ksize=[1, FILTER_CONF['maxpool']['height'], FILTER_CONF['maxpool']['width'], 1],
        strides=[1, 2, 2, 1]
    )
    feature_maps.set_pool(pool)
    return feature_maps
In [13]:
# Example
x = tf.placeholder(tf.float32, [1, 477, 477, 3])
unit1 = apply_vgg_network_unit(x=x, channels=2, num_conv=2)
unit2 = apply_vgg_network_unit(x=unit1.get(type='pool'), channels=4, num_conv=2)
unit3 = apply_vgg_network_unit(x=unit2.get(type='pool'), channels=8, num_conv=4)
unit4 = apply_vgg_network_unit(x=unit3.get(type='pool'), channels=16, num_conv=4)
unit5 = apply_vgg_network_unit(x=unit4.get(type='pool'), channels=32, num_conv=4)
sess = tf.InteractiveSession()
sess.run(tf.global_variables_initializer())
# Pass the intermediate results you want as a list to sess.run to pull
# them out of the feature-extraction pipeline.
result_unit2_conv, result_unit5_conv, result_unit5_pool = sess.run(
[unit2.get(type='conv', idx=1), unit5.get(type='conv', idx=2), unit5.get(type='pool')],
feed_dict={x: content_image}
)
In [14]:
NETWORK_CONF = {
    'layer1': {
        'num_channels': 2,
        'num_conv': 2
    },
    'layer2': {
        'num_channels': 4,
        'num_conv': 2
    },
    'layer3': {
        'num_channels': 8,
        'num_conv': 4
    },
    'layer4': {
        'num_channels': 16,
        'num_conv': 4
    },
    'layer5': {
        'num_channels': 32,
        'num_conv': 4
    }
}
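As a sanity check (an illustrative sketch, not from the original notebook), the expected feature map shapes after each unit can be derived from NETWORK_CONF: each SAME max pooling with stride 2 maps a spatial size n to ceil(n / 2).

import math

# Expected (height, width, channels) after each unit for a 477x477x3 input.
size = 477
for name in ['layer1', 'layer2', 'layer3', 'layer4', 'layer5']:
    size = math.ceil(size / 2)
    print(name, (size, size, NETWORK_CONF[name]['num_channels']))
# layer1 (239, 239, 2)
# layer2 (120, 120, 4)
# layer3 (60, 60, 8)
# layer4 (30, 30, 16)
# layer5 (15, 15, 32)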
In [37]:
def extract_feature_map(image, extract=None, run=True):
    """Build the scaled-down VGG network from NETWORK_CONF and, optionally,
    run feature extraction.

    Args:
        image : An array (or placeholder) of shape (1, height, width, channel).
        extract : A list of ('conv', idx_layer, idx_conv) or ('pool', idx_layer)
            tuples selecting the feature maps to evaluate.
        run : If False, only build the model and return the layers.
    """
    x = tf.placeholder(tf.float32, image.shape)
    layers = []
    unit_input = x
    for name in ('layer1', 'layer2', 'layer3', 'layer4', 'layer5'):
        layer = apply_vgg_network_unit(
            x=unit_input,
            channels=NETWORK_CONF[name]['num_channels'],
            num_conv=NETWORK_CONF[name]['num_conv']
        )
        layers.append(layer)
        unit_input = layer.get(type='pool')
    # When only building the model (run=False), return the layers and stop.
    if not run:
        return layers
    if extract is None:
        raise ValueError('extract is required if run is True.')
    sess = tf.InteractiveSession()
    sess.run(tf.global_variables_initializer())
    extract_layers = []
    for ext in extract:
        if len(ext) == 2:
            type, idx_layer = ext
            extract_layers.append(layers[idx_layer].get(type))
        elif len(ext) == 3:
            type, idx_layer, idx_conv = ext
            extract_layers.append(layers[idx_layer].get(type, idx_conv))
        else:
            raise ValueError('Format of extract is not available: {}'.format(ext))
    result_list = sess.run(extract_layers, feed_dict={x: image})
    return result_list
In [16]:
conv1_1, conv2_2, pool4 = extract_feature_map(
image=content_image,
extract=[('conv', 0, 0), ('conv', 1, 1), ('pool', 3)]
)
The style loss $L_{style}$, which pulls the generated image toward the target style, is computed as:
\begin{align}
L_{style} &= \sum_{l=1}^{L} w_{l} E_{l} \\
E_{l} &= \frac{1}{4N_{l}^{2}M_{l}^{2}} \sum_{i=1}^{N_{l}} \sum_{j=1}^{N_{l}} (G_{ij}^{l} - A_{ij}^{l})^{2}
\end{align}
Here $N_{l}$ is the number of convolutional filters in layer $l$ and
$M_{l}$ is the size of each feature map in layer $l$, i.e. $M_{l} = \text{feature map height} \times \text{feature map width}$ (this is the axis that the flatten step below collapses).
$A_{ij}^{l}$ is the $(i, j)$ element of the Gram matrix $A^{l}$ computed from the features extracted from the style image;
likewise, $G_{ij}^{l}$ is the $(i, j)$ element of the Gram matrix $G^{l}$ computed from the features of the generated image.
$G^{l}$ is an $(N_{l}, N_{l})$ matrix computed as
\begin{equation}
G_{ij}^{l} = \sum_{k=1}^{M_{l}} F_{ik}^{l}F_{jk}^{l},
\end{equation}
where $F_{ik}^{l}$ is the activation of the $i$-th filter at position $k$ in layer $l$.
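For reference, $E_{l}$ maps directly onto NumPy (a minimal sketch under the definitions above; style_layer_loss is a hypothetical helper, and F_generated, F_style are flattened $(N_{l}, M_{l})$ feature maps such as those produced by flatten below):

def style_layer_loss(F_generated, F_style):
    """Hypothetical helper: E_l for one layer, from flattened feature maps."""
    N_l, M_l = F_generated.shape
    G = np.dot(F_generated, F_generated.T)  # Gram matrix of the generated image
    A = np.dot(F_style, F_style.T)          # Gram matrix of the style image
    return np.sum((G - A) ** 2) / (4.0 * N_l ** 2 * M_l ** 2)

# L_style is then the weighted sum over the selected layers:
# L_style = sum(w * E for w, E in zip(weights, layer_losses))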
In [17]:
def convert_nhwc_to_nchw(image):
    """Transpose an image array from NHWC order to NCHW order."""
    return image.transpose([0, 3, 1, 2])
In [18]:
converted = convert_nhwc_to_nchw(result_unit5_conv)
converted.shape
Out[18]:
In [19]:
def flatten(image):
    """Collapse the spatial axes of an NCHW array into one axis of size H * W."""
    num_batch, num_channel, num_height, num_width = image.shape
    # if num_batch != 1:
    #     raise ValueError('An unexpected batch size was supplied.')
    return image.reshape([num_batch, num_channel, num_height * num_width])
In [20]:
test_data1 = np.arange(1 * 3 * 4 * 2).reshape([1, 3, 4, 2])
test_data1
Out[20]:
In [21]:
flatten(test_data1)
Out[21]:
In [22]:
test_data2 = np.arange(2 * 3 * 4 * 2).reshape([2, 3, 4, 2])
test_data2
Out[22]:
In [23]:
flatten(test_data2)
Out[23]:
In [24]:
# Compute the Gram matrix.
def calculate_gram_matrix(x):
    return np.dot(x, x.T)
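A quick numeric check (values arbitrary, not from the original notebook): the Gram matrix of a 2x3 feature map is 2x2 and symmetric.

F = np.array([[1., 2., 3.],
              [4., 5., 6.]])
print(calculate_gram_matrix(F))
# [[ 14.  32.]
#  [ 32.  77.]]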
In [25]:
flattened = flatten(result_unit5_conv)
flattened
Out[25]:
In [36]:
gram_matrix = calculate_gram_matrix(flattened[0])
# gram_matrix
In [27]:
gram_matrix.shape
Out[27]:
In [28]:
content_conv4_2 = extract_feature_map(image=content_image, extract=[('conv', 3, 1)])
style_conv1_1, style_conv2_1, style_conv3_1, style_conv4_1, style_conv5_1 = extract_feature_map(
image=style_image,
extract=[
('conv', 0, 0),
('conv', 1, 0),
('conv', 2, 0),
('conv', 3, 0),
('conv', 4, 0)
]
)
In [40]:
creating_image = tf.placeholder(tf.float32, [1, 477, 477, 3])
feature_map = extract_feature_map(
image=creating_image,
run=False
)
In [41]:
feature_map[0].get(type='conv', idx=0)
Out[41]:
In [ ]: