In [ ]:
#@title Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
In [ ]:
import numpy as np
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers
シーケンスデータを処理する際に個々のサンプルの長さが異なることは、非常に一般的です。次の例(単語としてトークン化されたテキスト)を考えてみます。
[
["Hello", "world", "!"],
["How", "are", "you", "doing", "today"],
["The", "weather", "will", "be", "nice", "tomorrow"],
]
語彙検索の後、データは以下のように整数としてベクトル化されるかもしれません。
[
[71, 1331, 4231]
[73, 8, 3215, 55, 927],
[83, 91, 1, 645, 1253, 927],
]
データは、個々のサンプルがそれぞれ 3、5、6 の長さを持つネストされたリストです。ディープラーニングモデルの入力データは,単一のテンソル(例えばこの場合だと(batch_size, 6, vocab_size)
のような形状)でなければならないため、最長のアイテムよりも短いサンプルは、何らかのプレースホルダー値でパディングする必要があります。(その代わりに、短いサンプルをパディングする前に長いサンプルをトランケートすることも可能です。)
Keras は Python のリストを共通の長さにトランケートしたりパディングしたりするユーティリティ関数を提供します:tf.keras.preprocessing.sequence.pad_sequences
In [ ]:
raw_inputs = [
[711, 632, 71],
[73, 8, 3215, 55, 927],
[83, 91, 1, 645, 1253, 927],
]
# By default, this will pad using 0s; it is configurable via the
# "value" parameter.
# Note that you could "pre" padding (at the beginning) or
# "post" padding (at the end).
# We recommend using "post" padding when working with RNN layers
# (in order to be able to use the
# CuDNN implementation of the layers).
padded_inputs = tf.keras.preprocessing.sequence.pad_sequences(
raw_inputs, padding="post"
)
print(padded_inputs)
In [ ]:
embedding = layers.Embedding(input_dim=5000, output_dim=16, mask_zero=True)
masked_output = embedding(padded_inputs)
print(masked_output._keras_mask)
masking_layer = layers.Masking()
# Simulate the embedding lookup by expanding the 2D input to 3D,
# with embedding dimension of 10.
unmasked_embedding = tf.cast(
tf.tile(tf.expand_dims(padded_inputs, axis=-1), [1, 1, 10]), tf.float32
)
masked_embedding = masking_layer(unmasked_embedding)
print(masked_embedding._keras_mask)
出力された結果から分かるように、マスクは形状が(batch_size, sequence_length)
の 2 次元ブールテンソルであり、そこでは個々の False
エントリは、対応する時間ステップを処理中に無視すべきであることを示しています。
Functional API または Sequential API を使用する場合、Embedding
レイヤーまたは Masking
レイヤーによって生成されたマスクは、それらを使用できる任意のレイヤー(例えば RNN レイヤーなど)にネットワークを介して伝播されます。Keras は入力に対応するマスクを自動的に取得し、その使用方法を知っている任意のレイヤーに渡します。
例えば、以下の Sequential API モデルでは、LSTM
レイヤーは自動的にマスクを取得します。つまりこれは、パディングされた値を無視するということです。
In [ ]:
model = keras.Sequential(
[layers.Embedding(input_dim=5000, output_dim=16, mask_zero=True), layers.LSTM(32),]
)
これは、以下の Functional API モデルでも同様です。
In [ ]:
inputs = keras.Input(shape=(None,), dtype="int32")
x = layers.Embedding(input_dim=5000, output_dim=16, mask_zero=True)(inputs)
outputs = layers.LSTM(32)(x)
model = keras.Model(inputs, outputs)
マスクを扱うことができるレイヤー(LSTM
レイヤーなど)は、それらの __call__
メソッドに mask
引数を持っています。
一方、マスクを生成するレイヤー(例えば
Embedding
)は、呼び出し可能な compute_mask(input, previous_mask)
メソッドを公開します。
例えば下記のようにして、マスクを生成するレイヤーの compute_mask()
メソッドの出力を、マスクを消費するレイヤーの __call__
メソッドに渡すことができます。
In [ ]:
class MyLayer(layers.Layer):
def __init__(self, **kwargs):
super(MyLayer, self).__init__(**kwargs)
self.embedding = layers.Embedding(input_dim=5000, output_dim=16, mask_zero=True)
self.lstm = layers.LSTM(32)
def call(self, inputs):
x = self.embedding(inputs)
# Note that you could also prepare a `mask` tensor manually.
# It only needs to be a boolean tensor
# with the right shape, i.e. (batch_size, timesteps).
mask = self.embedding.compute_mask(inputs)
output = self.lstm(x, mask=mask) # The layer will ignore the masked values
return output
layer = MyLayer()
x = np.random.random((32, 10)) * 100
x = x.astype("int32")
layer(x)
場合によっては、マスクを生成するレイヤー(Embedding
など)や、現在のマスクを変更するレイヤーを書く必要があります。
例えば、時間次元で連結する Concatenate
レイヤーのように、入力とは異なる時間次元を持つテンソルを生成するレイヤーは、現在のマスクを変更して、マスクされた時間ステップを下流のレイヤーが適切に考慮に入れられるようにする必要があります。
これを行うには、レイヤーに layer.compute_mask()
メソッドを実装します。これは、入力と現在のマスクが与えられた時に新しいマスクを生成します。
ここでは、現在のマスクを変更する必要がある TemporalSplit
レイヤーの例を示します。
In [ ]:
class TemporalSplit(keras.layers.Layer):
"""Split the input tensor into 2 tensors along the time dimension."""
def call(self, inputs):
# Expect the input to be 3D and mask to be 2D, split the input tensor into 2
# subtensors along the time axis (axis 1).
return tf.split(inputs, 2, axis=1)
def compute_mask(self, inputs, mask=None):
# Also split the mask into 2 if it presents.
if mask is None:
return None
return tf.split(mask, 2, axis=1)
first_half, second_half = TemporalSplit()(masked_embedding)
print(first_half._keras_mask)
print(second_half._keras_mask)
もう 1 つの例として、入力値からマスクを生成できる CustomEmbedding
レイヤーの例を示します。
In [ ]:
class CustomEmbedding(keras.layers.Layer):
def __init__(self, input_dim, output_dim, mask_zero=False, **kwargs):
super(CustomEmbedding, self).__init__(**kwargs)
self.input_dim = input_dim
self.output_dim = output_dim
self.mask_zero = mask_zero
def build(self, input_shape):
self.embeddings = self.add_weight(
shape=(self.input_dim, self.output_dim),
initializer="random_normal",
dtype="float32",
)
def call(self, inputs):
return tf.nn.embedding_lookup(self.embeddings, inputs)
def compute_mask(self, inputs, mask=None):
if not self.mask_zero:
return None
return tf.not_equal(inputs, 0)
layer = CustomEmbedding(10, 32, mask_zero=True)
x = np.random.random((3, 10)) * 9
x = x.astype("int32")
y = layer(x)
mask = layer.compute_mask(x)
print(mask)
ほとんどのレイヤーは時間次元を変更しないため、現在のマスクを変更する必要はありません。しかし、現在のマスクを変更せずにそれらを次のレイヤーに伝播したい場合があります。これはオプトイン動作です。 デフォルトでは、(フレームワークがマスクの伝播が安全かどうか判断する方法を持たないため)カスタムレイヤーは現在のマスクを破棄します。
時間次元を変更しないカスタムレイヤーを持ち、それが現在の入力マスクを伝播できるようにしたい場合は、レイヤーのコンストラクタを self.supports_masking = True
に設定する必要があります。この場合、compute_mask()
のデフォルトの動作は、現在のマスクを通過させるだけとなります。
マスク伝搬のためにホワイトリスト化されたレイヤーの例を示します。:
In [ ]:
class MyActivation(keras.layers.Layer):
def __init__(self, **kwargs):
super(MyActivation, self).__init__(**kwargs)
# Signal that the layer is safe for mask propagation
self.supports_masking = True
def call(self, inputs):
return tf.nn.relu(inputs)
これで、マスク生成レイヤー(Embedding
など)とマスク消費レイヤー(LSTM
など)間でこのカスタムレイヤーの使用が可能となり、マスク消費レイヤーまで届くようにマスクを渡します。
In [ ]:
inputs = keras.Input(shape=(None,), dtype="int32")
x = layers.Embedding(input_dim=5000, output_dim=16, mask_zero=True)(inputs)
x = MyActivation()(x) # Will pass the mask along
print("Mask found:", x._keras_mask)
outputs = layers.LSTM(32)(x) # Will receive the mask
model = keras.Model(inputs, outputs)
In [ ]:
class TemporalSoftmax(keras.layers.Layer):
def call(self, inputs, mask=None):
broadcast_float_mask = tf.expand_dims(tf.cast(mask, "float32"), -1)
inputs_exp = tf.exp(inputs) * broadcast_float_mask
inputs_sum = tf.reduce_sum(inputs * broadcast_float_mask, axis=1, keepdims=True)
return inputs_exp / inputs_sum
inputs = keras.Input(shape=(None,), dtype="int32")
x = layers.Embedding(input_dim=10, output_dim=32, mask_zero=True)(inputs)
x = layers.Dense(1)(x)
outputs = TemporalSoftmax()(x)
model = keras.Model(inputs, outputs)
y = model(np.random.randint(0, 10, size=(32, 100)), np.random.random((32, 100, 1)))
Keras のパディングとマスキングについて知っておくべきことはこれだけです。以下に要約します。
Embedding
は入力値からマスクを生成することができ(mask_zero=True
の場合)、Masking
レイヤーも同様に生成することができます。mask
引数を __call__
メソッドで公開します。RNN レイヤーはこれに該当します。mask
引数をレイヤーに手動で渡すことができます。