Image Style Transferの実装


Gatys, Ecker, and Bethge(2016)の画像スタイル変換を実装する。

【参考文献】
L. A. Gatys, A. S. Ecker, and M. Bethge,
Image style transfer using convolutional neural networks,
In Proceedings of the IEEE Conference on Computer Vision and Pattern Recognition, pages, 2414-2423, 2016.


In [1]:
import numpy as np
import pandas as pd
import tensorflow as tf

In [2]:
CONTENT_FILE = '/home/ishiyama/image_style_transfer/image/input/test_input_01.JPG'
STYLE_FILE = '/home/ishiyama/image_style_transfer/image/style/test_style_01.jpg'

In [3]:
class Image(np.ndarray):

    """画像を扱うためのnumpy.ndarray
    
    XXX: 実装が大変なので一旦保留
         画像の形状の情報を属性で取り出せるようにしたい
    """
    
    DATA_FORMAT_CHAR = {
        'BATCH': 'N',
        'HEIGHT': 'H',
        'WIDTH': 'W',
        'CHANNEL': 'C'
    }

    def __new__(subtype,
                shape,
                dtype=float,
                buffer=None,
                offset=0,
                strides=None,
                order=None,
                data_format='NHWC'):

        super(__class__, self).__new__(subtype, shape, dtype, buffer, offset, strides, order)

        self.data_format = data_format
        num_batch, num_height, num_width, num_channel = self._get_image_shape(data_format=data_format)
        self.num_batch = num_batch
        self.num_height = num_height
        self.num_width = num_width
        self.num_channel = num_channel

    def _get_image_shape(self, data_format):
        _image_shape = self.shape
        idx_batch = self._get_index_data_format(data_format=data_format, data_type='BATCH'),
        idx_height = self._get_index_data_format(data_format=data_format, data_type='HEIGHT')
        idx_width = self._get_index_data_format(data_format=data_format, data_type='WIDTH')
        idx_channel = self._get_index_data_format(data_format=data_format, data_type='CHANNEL')
        reordered_image_shape = (_image_shape[idx_batch],
                                 _image_shape[idx_height],
                                 _image_shape[idx_width],
                                 _image_shape[idx_channel])
        return reordered_image_shape

    def _get_index_data_format(self, data_format, data_type):
        idx = data_format.find(__class__.DATA_FORMAT_CHAR[data_type])
        if idx == -1:
            raise ValueError('data type "{}" is not available.'.format(data_type))

        return idx

    @classmethod
    def reshape(self, *args):
        self = __class__(super(__class__, self).reshape(args))

In [4]:
def read_images_as_jpeg(file):
    
    """
    JPEG Image Reader
    
    This function reads the content and style images as JPEG format.
    These image data must be square for now, different height and
    width will be able to supplied for future.
    
    Args:
        file : str. A path of the image file.
    
    Returns:
        A tuple. Each Elements are Tensor object of the read images.
    """
    
    filename_queue = tf.train.string_input_producer([file])
    reader = tf.WholeFileReader()
    key, value = reader.read(filename_queue)
    image = tf.image.decode_jpeg(value)

    # Read image is a Tensor object which tf.nn.conv2d cannot handle,
    # so convert it to numpy.ndarray.
    sess = tf.Session()
    sess.run(tf.global_variables_initializer())
    tf.train.start_queue_runners(sess)
    # Returned array's shape will be (height, width, channel).
    image_array_hwc = sess.run(image)
    new_shape = [1]
    new_shape.extend(image_array_hwc.shape)

    # return image_array_chw
    return image_array_hwc.reshape(new_shape)

In [5]:
content_image = read_images_as_jpeg(file=CONTENT_FILE)
style_image = read_images_as_jpeg(file=STYLE_FILE)

VGGを実装する


画像の特徴量を抽出するアルゴリズムにはSimonyan and Zisserman(2015)で提案されたCNN(VGG19)の畳込み層とプーリング層が使われている。
ここでは、「TensorFlowで学ぶディープラーニング入門」の多層CNNの実装を参考にVGG19を構築する。

【参考文献】
K. Simonyan and A. Zisserman, Very Deep Convolutional Networks For Large-Scale Image Recognition, arXiv: 1409.1556v6, 2015
中井悦司, TensorFlowで学ぶディープラーニング入門〜畳み込みニューラルネットワーク徹底解説, マイナビ出版, 2016

畳み込み層を実装する


In [6]:
import tensorflow as tf
import numpy as np
from tensorflow.examples.tutorials.mnist import input_data


def calculate_convolutional_layer(x, filter_height, filter_width, output_channels):

    """
    Executeing a convolutional layer task.
    
    Args:
        x                     : An image data.
        filter_height   (int) : A height of each filters.
        filter_width    (int) : A width of each filters.
        output_channels (int) : A number of channels which must be outputed.

    Returns:
        An activation of an convolutional layer.
    """

    if ((not isinstance(filter_height, int))
        or (not isinstance(filter_width, int))
        or (not isinstance(output_channels, int))):
        raise TypeError('"filter_height" and "filter_width" and "output_channels" '
                        'must be integer.')

    # TODO: 入力画像の縦、横、チャンネル数を属性で取得できるようにする
    # 例) input_channels = x.num_channels
    input_channels = int(x.shape[-1])
    filter_value = 1 / float(filter_height * filter_width)
    epsilon = filter_value / 10.0
    W = tf.Variable(
        tf.random_uniform(
            shape=[filter_height,
                   filter_width,
                   input_channels,
                   output_channels],
            minval=filter_value - epsilon,
            maxval=filter_value + epsilon
        )
    )
    h = tf.nn.conv2d(x, W, strides=[1, 1, 1, 1], padding='SAME')
    b = tf.Variable(tf.constant(0.1, shape=[output_channels]))
    convoluted_image = tf.nn.relu(h + b)

    return convoluted_image

In [7]:
x = tf.placeholder(tf.float32, [1, 477, 477, 3])
test_model = calculate_convolutional_layer(
    x=x,
    filter_height=3,
    filter_width=3,
    output_channels=1
)
sess = tf.InteractiveSession()
# tf.Session()だと、sess.runで返ってくる行列の要素がすべて0だった。
# TODO: Sessionメソッド と InteractiveSessionメソッドの違いを調べる
# sess = tf.Session()
sess.run(tf.global_variables_initializer())
test_result = sess.run(test_model, feed_dict={x: content_image})

In [8]:
test_result.shape


Out[8]:
(1, 477, 477, 1)

In [9]:
test_result


Out[9]:
array([[[[ 114.27301025],
         [ 173.50660706],
         [ 174.29614258],
         ..., 
         [ 480.10452271],
         [ 481.24404907],
         [ 323.35064697]],

        [[ 166.4135437 ],
         [ 252.51049805],
         [ 254.13793945],
         ..., 
         [ 717.27807617],
         [ 718.06811523],
         [ 481.68475342]],

        [[ 158.3868866 ],
         [ 241.47108459],
         [ 244.67269897],
         ..., 
         [ 718.62445068],
         [ 718.66595459],
         [ 481.57723999]],

        ..., 
        [[ 448.78652954],
         [ 674.51971436],
         [ 673.51464844],
         ..., 
         [ 672.70135498],
         [ 670.52099609],
         [ 449.13308716]],

        [[ 449.70996094],
         [ 676.00695801],
         [ 674.76342773],
         ..., 
         [ 673.22253418],
         [ 671.83905029],
         [ 450.01858521]],

        [[ 297.78442383],
         [ 444.92471313],
         [ 443.75894165],
         ..., 
         [ 443.45913696],
         [ 442.98733521],
         [ 296.54943848]]]], dtype=float32)

Maxプーリング層を実装する


In [10]:
def calculate_max_pooling_layer(x, ksize, strides):

    """Wrapper function of tf.nn.max_pool.
    
    Args:
        x       : A Tensor produced by calculate_convolutional_layer.
        ksize   : A list of ints that has length >= 4. The size of
                  the window for each dimension of the input tensor.
        strides : A list of ints that has length >= 4. The stride
                  of the sliding window for each dimension of the
                  input tensor.
    
    Returns:
        A pooled image.
    """

    pooled_image = tf.nn.max_pool(x, ksize=ksize, strides=strides, padding='SAME')

    return pooled_image

畳込みとプーリング処理の途中経過を保持するクラスを実装する


In [11]:
class FeatureMapHolder(object):

    def __init__(self):

        self.conv = []
        self.pool = None

    def set_conv(self, mat):

        self.conv.append(mat)

    def get_conv(self, idx):

        if idx is None:
            raise ValueError('idx is required.')

        if not isinstance(idx, int):
            raise ValueError('idx must be an integer.')

        return self.conv[idx]

    def set_pool(self, mat):

        self.pool = mat

    def get_pool(self):

        return self.pool

    def get(self, type, idx=None):

        if type == 'pool':
            return self.get_pool()
        elif type == 'conv':
            return self.get_conv(idx)

In [12]:
FILTER_CONF = {
    'conv': {
        'height': 3,
        'width': 3
    },
    'maxpool': {
        'height': 2,
        'width': 2
    }
}


def apply_vgg_network_unit(x, channels, num_conv):

    """Apply VGG Network From a Convolutional Layer to Max Pooling Layer.

    Table 1 of Simonyan and Zisserman(2015) is separated by 5 parts,
    each parts is from an input data or a pooled data at previous part
    to a maxpool.
    This function provides to apply a that part.
    This will apply recursively.
    
    Args:
        x (Tensor)     : An input data or A Max pooled data returned by this function.
        channels (int) : A number of channels described at Table 1 of
                         Simonyan and Zisserman(2015).
        num_conv (int) : A number of applying covolutional layers.
                         See Simonyan and Zisserman(2015) for detail.

    Returns:
        A ConvNetProgressHolder object.
    """

    if num_conv < 2:
        raise ValueError('num_conv must be >= 2.')

    feature_maps = FeatureMapHolder()

    conv = calculate_convolutional_layer(
        x=x,
        filter_height=FILTER_CONF['conv']['height'],
        filter_width=FILTER_CONF['conv']['width'],
        output_channels=channels
    )
    feature_maps.set_conv(conv)

    for i in range(1, num_conv):
        conv = calculate_convolutional_layer(
            x=feature_maps.get(type='conv', idx=i-1),
            filter_height=FILTER_CONF['conv']['height'],
            filter_width=FILTER_CONF['conv']['width'],
            output_channels=channels
        )
        feature_maps.set_conv(conv)

    pool = calculate_max_pooling_layer(
        x=feature_maps.get('conv', idx=i-1),
        ksize=[1, FILTER_CONF['maxpool']['height'], FILTER_CONF['maxpool']['width'], 1],
        strides=[1, 2, 2, 1]
    )
    feature_maps.set_pool(pool)

    return feature_maps

VGGの畳込みとプーリング層を構築する

VGGの論文に従い、複数回の畳込み処理と1回のMAXプーリング処理を1セットとして、それを5回繰り返す。
今回実装する画像変換アルゴリズムでは、この処理の途中経過を使うため、使いたい部分をリストで1つにまとめてsess.run()に投げる。


In [13]:
# 例
x = tf.placeholder(tf.float32, [1, 477, 477, 3])
unit1 = apply_vgg_network_unit(x=x, channels=2, num_conv=2)
unit2 = apply_vgg_network_unit(x=unit1.get(type='pool'), channels=4, num_conv=2)
unit3 = apply_vgg_network_unit(x=unit2.get(type='pool'), channels=8, num_conv=4)
unit4 = apply_vgg_network_unit(x=unit3.get(type='pool'), channels=16, num_conv=4)
unit5 = apply_vgg_network_unit(x=unit4.get(type='pool'), channels=32, num_conv=4)
sess = tf.InteractiveSession()
sess.run(tf.global_variables_initializer())

# 使いたい過程をリストでまとめてsess.runに投げると特徴量抽出の
# 途中経過を取り出せる
result_unit2_conv, result_unit5_conv, result_unit5_pool = sess.run(
    [unit2.get(type='conv', idx=1), unit5.get(type='conv', idx=2), unit5.get(type='pool')],
    feed_dict={x: content_image}
)

In [14]:
NETWORK_CONF = {
    'layer1': {
        'num_channels': 2,
        'num_conv': 2
    },
    'layer2': {
        'num_channels': 4,
        'num_conv': 2
    },
    'layer3': {
        'num_channels': 8,
        'num_conv': 4
    },
    'layer4': {
        'num_channels': 16,
        'num_conv': 4
    },
    'layer5': {
        'num_channels': 32,
        'num_conv': 4
    }
}

In [37]:
def extract_feature_map(image, extract=None, run=True):

    x = tf.placeholder(tf.float32, image.shape)

    layers = []

    layer1 = apply_vgg_network_unit(
        x=x,
        channels=NETWORK_CONF['layer1']['num_channels'],
        num_conv=NETWORK_CONF['layer1']['num_conv']
    )
    layers.append(layer1)

    layer2 = apply_vgg_network_unit(
        x=layer1.get(type='pool'),
        channels=NETWORK_CONF['layer2']['num_channels'],
        num_conv=NETWORK_CONF['layer2']['num_conv']
    )
    layers.append(layer2)
    
    layer3 = apply_vgg_network_unit(
        x=layer2.get(type='pool'),
        channels=NETWORK_CONF['layer3']['num_channels'],
        num_conv=NETWORK_CONF['layer3']['num_conv']
    )
    layers.append(layer3)
    
    layer4 = apply_vgg_network_unit(
        x=layer3.get(type='pool'),
        channels=NETWORK_CONF['layer4']['num_channels'],
        num_conv=NETWORK_CONF['layer4']['num_conv']
    )
    layers.append(layer4)
    
    layer5 = apply_vgg_network_unit(
        x=layer4.get(type='pool'),
        channels=NETWORK_CONF['layer5']['num_channels'],
        num_conv=NETWORK_CONF['layer5']['num_conv']
    )
    layers.append(layer5)

    # モデルの構築のみの場合(run=True)はlayersを返却して終了
    if not run:
        return layers

    if extract is None:
        raise ValueError('extract is required if run is True.')

    sess = tf.InteractiveSession()
    sess.run(tf.global_variables_initializer())

    extract_layers = []
    for ext in extract:
        if len(ext) == 2:
            type, idx_layer = ext
            extract_layers.append(layers[idx_layer].get(type))
        elif len(ext) == 3:
            type, idx_layer, idx_conv = ext
            extract_layers.append(layers[idx_layer].get(type, idx_conv))
        else:
            raise ValueError('Format of extract is not available: {}'.format(ext))

    result_list = sess.run(extract_layers, feed_dict={x: image})

    return result_list

In [16]:
conv1_1, conv2_2, pool4 = extract_feature_map(
    image=content_image,
    extract=[('conv', 0, 0), ('conv', 1, 1), ('pool', 3)]
)

画像を合成する処理を作る

VGGによってスタイルを変換する画像とスタイルのリファレンスとなる画像の特徴量を抽出できるようになった。
今度はこの特徴量を用いて画像を実際に合成する部分を作っていく。

このアルゴリズムは変換したいスタイルに寄せるための損失関数$L_{style}$と
変換する画像の内容に寄せるための損失関数$L_{content}$をそれぞれ計算して、
\begin{equation} L_{total} = \alpha L_{content} + \beta L_{style} \end{equation} を最適化することで画像を合成する。


変換したいスタイルに寄せるための損失関数$L_{style}$を実装する

変換したいスタイルに寄せるための損失関数$L_{style}$は次で計算する: \begin{align} L_{style} &= \sum_{l=1}^{L} w_{l} E_{l} \\ E_{l} &= \frac{1}{4N_{l}^{2}M_{l}^{2}} \sum_{i=1}^{N_{l}} \sum_{j=1}^{N_{l}} (G_{ij}^{l} - A_{ij}^{l})^{2} \end{align} ただし、$N_{l}$は層$l$の畳込みフィルターの数、 $M_{l}$は層$l$の畳込みフィルターのサイズで$M_{l} = \text{フィルターの高さ} \times \text{フィルターの幅}$、 $A_{ij}^{l}$はスタイル画像から抽出した特徴量で計算したグラム行列$A^{l}$の$(i, j)$成分、 同様に、$G_{ij}^{l}$は合成後の画像から抽出した特徴量で計算したグラム行列$G^{l}$の$(i, j)$成分である。 $G_{ij}^{l}$は次で計算する。
$G^{l}$は$(N_{l}, N_{l})$型行列で \begin{equation} G_{ij}^{l} = \sum_{k=1}^{M_{l}} F_{ik}^{l}F_{jk}^{l}. \end{equation}


In [17]:
def convert_nhwc_to_nchw(image):
    return image.transpose([0, 3, 1, 2])

In [18]:
converted = convert_nhwc_to_nchw(result_unit5_conv)
converted.shape


Out[18]:
(1, 32, 30, 30)

In [19]:
def flatten(image):
    num_batch, num_channel, num_height, num_width = image.shape
#    if num_batch != 1:
#        raise ValueError('Not assumed batch size has been ocurred.')
    return image.reshape([num_batch, num_channel, num_height * num_width])

flattenのテスト


In [20]:
test_data1 = np.arange(1 * 3 * 4 * 2).reshape([1, 3, 4, 2])
test_data1


Out[20]:
array([[[[ 0,  1],
         [ 2,  3],
         [ 4,  5],
         [ 6,  7]],

        [[ 8,  9],
         [10, 11],
         [12, 13],
         [14, 15]],

        [[16, 17],
         [18, 19],
         [20, 21],
         [22, 23]]]])

In [21]:
flatten(test_data1)


Out[21]:
array([[[ 0,  1,  2,  3,  4,  5,  6,  7],
        [ 8,  9, 10, 11, 12, 13, 14, 15],
        [16, 17, 18, 19, 20, 21, 22, 23]]])

In [22]:
test_data2 = np.arange(2 * 3 * 4 * 2).reshape([2, 3, 4, 2])
test_data2


Out[22]:
array([[[[ 0,  1],
         [ 2,  3],
         [ 4,  5],
         [ 6,  7]],

        [[ 8,  9],
         [10, 11],
         [12, 13],
         [14, 15]],

        [[16, 17],
         [18, 19],
         [20, 21],
         [22, 23]]],


       [[[24, 25],
         [26, 27],
         [28, 29],
         [30, 31]],

        [[32, 33],
         [34, 35],
         [36, 37],
         [38, 39]],

        [[40, 41],
         [42, 43],
         [44, 45],
         [46, 47]]]])

In [23]:
flatten(test_data2)


Out[23]:
array([[[ 0,  1,  2,  3,  4,  5,  6,  7],
        [ 8,  9, 10, 11, 12, 13, 14, 15],
        [16, 17, 18, 19, 20, 21, 22, 23]],

       [[24, 25, 26, 27, 28, 29, 30, 31],
        [32, 33, 34, 35, 36, 37, 38, 39],
        [40, 41, 42, 43, 44, 45, 46, 47]]])

In [24]:
# グラム行列を計算する
def calculate_gram_matrix(x):
    return np.dot(x, x.T)

In [25]:
flattened = flatten(result_unit5_conv)
flattened


Out[25]:
array([[[  1.72774497e+12,   1.71310606e+12,   1.71771534e+12, ...,
           2.04983711e+12,   2.06964392e+12,   2.06642859e+12],
        [  3.19720443e+12,   3.17136621e+12,   3.17231753e+12, ...,
           3.36004750e+12,   3.38134932e+12,   3.37660713e+12],
        [  4.26700073e+12,   4.23316842e+12,   4.23089118e+12, ...,
           3.69687554e+12,   3.71591007e+12,   3.71104154e+12],
        ..., 
        [  4.77729286e+12,   4.74064146e+12,   4.73164310e+12, ...,
           4.39728315e+12,   4.41955333e+12,   4.41371905e+12],
        [  3.92295009e+12,   3.89338995e+12,   3.88214948e+12, ...,
           3.65187223e+12,   3.66564449e+12,   3.66082182e+12],
        [  2.26813621e+12,   2.25203690e+12,   2.23939710e+12, ...,
           2.12420749e+12,   2.12506536e+12,   2.12239201e+12]]], dtype=float32)

In [36]:
gram_matrix = calculate_gram_matrix(flattened[0])
# gram_matrix

In [27]:
gram_matrix.shape


Out[27]:
(30, 30)

In [28]:
content_conv4_2 = extract_feature_map(image=content_image, extract=[('conv', 3, 1)])
style_conv1_1, style_conv2_1, style_conv3_1, style_conv4_1, style_conv5_1 = extract_feature_map(
    image=style_image,
    extract=[
        ('conv', 0, 0),
        ('conv', 1, 0),
        ('conv', 2, 0),
        ('conv', 3, 0),
        ('conv', 4, 0)
    ]
)

損失関数を構成する


In [40]:
creating_image = tf.placeholder(tf.float32, [1, 477, 477, 3])
feature_map = extract_feature_map(
    image=creating_image,
    run=False
)

In [41]:
feature_map[0].get(type='conv', idx=0)


Out[41]:
<tf.Tensor 'Relu_97:0' shape=(1, 477, 477, 2) dtype=float32>

In [ ]: