Library Exploration 2: PyTorch

1. LSTM


In [1]:
import torch
import torch.autograd as autograd
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim

torch.manual_seed(1)


Out[1]:
<torch._C.Generator at 0x109ae3120>

In [2]:
inputs = [autograd.Variable(torch.randn((1, 3)))
          for _ in range(5)]  # 長さが5, サイズが(1,3)の入力層を生成し、乱数初期化

In [3]:
lstm = nn.LSTM(3, 3)  #入力3次元、出力3次元のLSTM層生成

In [4]:
hidden = (autograd.Variable(torch.randn(1, 1, 3)),
          autograd.Variable(torch.randn((1, 1, 3))))  # サイズ(1x1x3)で2層の隠れ層を生成し、乱数初期化

In [5]:
print(len(inputs))
print(inputs[0].size())


5
torch.Size([1, 3])

In [6]:
print(lstm)


LSTM(3, 3)

In [7]:
print(len(hidden))
print(hidden[0])


2
Variable containing:
(0 ,.,.) = 
  1.2282 -1.2328 -0.6195
[torch.FloatTensor of size 1x1x3]


In [8]:
inputs[0]  # 入力データサンプル(1x3テンソル)の表示


Out[8]:
Variable containing:
-2.9718  1.7070 -0.4305
[torch.FloatTensor of size 1x3]

In [9]:
inputs[0].view(1, 1, -1)  # 転置により1x3テンソルを1x(1x3)テンソルとして表示


Out[9]:
Variable containing:
(0 ,.,.) = 
 -2.9718  1.7070 -0.4305
[torch.FloatTensor of size 1x1x3]

In [10]:
# 長さが5となるシーケンスは一つ一つが1x1x3のテンソル
# LSTMはサイズ3x3の隠れ層を経由して出力として1x1x3 テンソルを生成する。
# チュートリアルの例では、5回この計算が行われるものの上書きされているため、outのサイズは1x1x3のまま。
for i in inputs:
    out, hidden = lstm(i.view(1, 1, -1), hidden)

In [11]:
print(out)


Variable containing:
(0 ,.,.) = 
 -0.1863  0.0369 -0.0581
[torch.FloatTensor of size 1x1x3]


In [12]:
print(hidden)


(Variable containing:
(0 ,.,.) = 
 -0.1863  0.0369 -0.0581
[torch.FloatTensor of size 1x1x3]
, Variable containing:
(0 ,.,.) = 
 -0.3730  0.1763 -0.0977
[torch.FloatTensor of size 1x1x3]
)

2. POS Tagging using LSTM

ここでは単語の品詞(名詞、動詞など)を推測するためのモデルを生成する。


In [13]:
# 訓練データと呼べるほど大きくはないが今回はこれを使用する
training_data = [
    ("The dog ate the apple".split(), ["DET", "NN", "V", "DET", "NN"]),
    ("Everybody read that book".split(), ["NN", "V", "DET", "NN"])
]

In [14]:
# 単語をインデックスに変換する辞書を生成
word_to_ix = {}
for sent, tags in training_data:
    for word in sent:
        if word not in word_to_ix:
            word_to_ix[word] = len(word_to_ix)
print(word_to_ix)


{'The': 0, 'dog': 1, 'ate': 2, 'the': 3, 'apple': 4, 'Everybody': 5, 'read': 6, 'that': 7, 'book': 8}

In [15]:
# タグの方もインデックスに変換する辞書を生成するが、こちらは手入力で。
tag_to_ix = {"DET": 0, "NN": 1, "V": 2}
print(tag_to_ix)


{'DET': 0, 'NN': 1, 'V': 2}

In [16]:
# モデルの生成
class LSTMTagger(nn.Module):

    def __init__(self, embedding_dim, hidden_dim, vocab_size, tagset_size):
        super(LSTMTagger, self).__init__()
        self.hidden_dim = hidden_dim

        self.word_embeddings = nn.Embedding(vocab_size, embedding_dim) # (9,6)

        # LSTM は埋め込み層と隠れ層の間に配置される。よって
        # 入力データの次元は、上記埋め込み層のサイズに合わせ、
        # 出力データの次元は、隠れ層のサイズに合わせる。
        self.lstm = nn.LSTM(embedding_dim, hidden_dim)  # (6, 6)

        # 最終的に、隠れ層は出力層の次元(タグデータの空間次元)へ線形写像変換する
        self.hidden2tag = nn.Linear(hidden_dim, tagset_size)  # (6,3)
        self.hidden = self.init_hidden()

    def init_hidden(self):
        return (autograd.Variable(torch.zeros(1, 1, self.hidden_dim)),
                autograd.Variable(torch.zeros(1, 1, self.hidden_dim)))

    def forward(self, sentence):
        embeds = self.word_embeddings(sentence)
        lstm_out, self.hidden = self.lstm(
            embeds.view(len(sentence), 1, -1), self.hidden)  # embedsテンソルを転置してからlstmへ渡す
        tag_space = self.hidden2tag(lstm_out.view(len(sentence), -1))  # lstm_outテンソルを転置してからhidden2tagへ渡す
        tag_scores = F.log_softmax(tag_space)
        return tag_scores

In [17]:
# モデル生成
EMBEDDING_DIM = 6
HIDDEN_DIM = 6
model = LSTMTagger(EMBEDDING_DIM, HIDDEN_DIM, len(word_to_ix), len(tag_to_ix))  # 6,6,9,3

In [18]:
# 損失関数と最適化関数の生成
loss_function = nn.NLLLoss()
optimizer = optim.SGD(model.parameters(), lr=0.1)

In [19]:
# wordやtagといったシーケンスをword_to_ixやtag_to_ixといった関数を用いて連続するインデックスに変え
# さらにはモデルに入力するため自動微分変数にする関数
def prepare_sequence(seq, to_ix):
    idxs = [to_ix[w] for w in seq]
    tensor = torch.LongTensor(idxs)
    return autograd.Variable(tensor)

In [20]:
# サンプルで訓練前の推測値を検証
inputs = prepare_sequence(training_data[0][0], word_to_ix)
tag_scores = model(inputs)
print(tag_scores)


Variable containing:
-1.2086 -1.0755 -1.0209
-1.1589 -1.0920 -1.0480
-1.1567 -1.0524 -1.0895
-1.1299 -1.0652 -1.1018
-1.1969 -1.0619 -1.0439
[torch.FloatTensor of size 5x3]


In [21]:
# 訓練
for epoch in range(300):
    for sentence, tags in training_data:
        # 勾配は次第に加減算するが、まずは初期化
        model.zero_grad()

        # 隠れ層初期化
        model.hidden = model.init_hidden()

        # ワードとタグのリストを生成
        sentence_in = prepare_sequence(sentence, word_to_ix)
        targets = prepare_sequence(tags, tag_to_ix)

        # ワードリストを入力してモデルを実行するとタグリストが得られる
        tag_scores = model(sentence_in)

        # タグリスト(推測値)と期待値をもとに損失計算(誤差逆伝搬)
        loss = loss_function(tag_scores, targets)
        loss.backward()
        
        # 最適化関数呼び出しによりパラメータ調整
        optimizer.step()

In [22]:
# サンプルで訓練後の推測値計算
sample_word_list = prepare_sequence(training_data[0][0], word_to_ix)
tag_scores = model(sample_word_list)

In [23]:
# サンプルのタグリスト取得
sample_tag_list = prepare_sequence(training_data[0][1], tag_to_ix)

In [24]:
# 予測値と対応するインデックスの最大値を取得
max_values, max_indices = tag_scores.max(-1)

In [25]:
print(tag_scores)
print(max_values)


Variable containing:
-0.4581 -1.8573 -1.5538
-5.8521 -0.0356 -3.4393
-3.3357 -3.2877 -0.0757
-0.0434 -4.6701 -3.4079
-5.1820 -0.0188 -4.3449
[torch.FloatTensor of size 5x3]

Variable containing:
-0.4581
-0.0356
-0.0757
-0.0434
-0.0188
[torch.FloatTensor of size 5]


In [26]:
print(training_data[0][0])
print('Correct indeces:',sample_tag_list)
print('Predicted indces:',max_indices)


['The', 'dog', 'ate', 'the', 'apple']
Correct indeces: Variable containing:
 0
 1
 2
 0
 1
[torch.LongTensor of size 5]

Predicted indces: Variable containing:
 0
 1
 2
 0
 1
[torch.LongTensor of size 5]

3. POS Tagging using Bi-LSTM and CRF


In [27]:
import torch
import torch.autograd as autograd
import torch.nn as nn
import torch.optim as optim

In [28]:
# 乱数生成
torch.manual_seed(1)


Out[28]:
<torch._C.Generator at 0x109ae3120>

In [29]:
# 変数をpythonリストに変更して第1要素を返すヘルパー関数
def to_scalar(var):
    return var.view(-1).data.tolist()[0]

In [30]:
x_list = [1., 2., 3.]
x_tensor = torch.Tensor(x_list)
x_var = autograd.Variable(x_tensor)
print(x_list)
print(x_tensor)
print(x_var)
print(x_var.data)
print(x_var.view(-1).data)


[1.0, 2.0, 3.0]

 1
 2
 3
[torch.FloatTensor of size 3]

Variable containing:
 1
 2
 3
[torch.FloatTensor of size 3]


 1
 2
 3
[torch.FloatTensor of size 3]


 1
 2
 3
[torch.FloatTensor of size 3]


In [31]:
print(to_scalar(x_var))


1.0

In [32]:
# 各行最大値に対応するインデックスの中で第1要素を返すヘルパー関数
def argmax(vec):
    _, idx = torch.max(vec, 1)
    return to_scalar(idx)

In [33]:
# 大きなデータも扱えるよう、log sum exp のヘルパー関数を定義(logΣexp(x))
# https://encrypted.google.com/url?sa=t&source=web&cd=3&ved=0CB0QFjAC&url=http%3A%2F%2Fcl.aist-nara.ac.jp%2Findex.php%3Fplugin%3Dattach%26refer%3DDMLA%252F2005%25C7%25AF%25C5%25D9%26openfile%3D2005-06-07.pdf&ei=xqgoTKATjO2QBfvH4MIC&usg=AFQjCNEG5ciSo4oL_rREEztIXLntuaxtag&sig2=_sgtsvTi0rtZh3pJloNNkw
def log_sum_exp(vec):
    # 1)自動微分変数vecの第一行目の最大値(最大テンソル)を算出
    # 0.1404  0.9391  0.1981
    # 0.7784  0.8007  0.7160
    # 0.9683  0.8028  0.3134
    # [torch.FloatTensor of size 3x3]
    # => 0.9391
    # 2) 最大値(最大テンソル)を転置
    # 0.9391=> 0.9391
    # 3) vecの列長と同じサイズのベクトルを生成し、値に最大値(最大テンソル)をmax_score_broadcastへ格納
    # 0.9391 =>
    # 0.9391  0.9391  0.9391
    # [torch.FloatTensor of size 1x3]
    # 4) vecとmax_score_broadcastとの差を指数関数関数、総和、対数の関数に入力し、最大値との総和をとる
    max_score = vec[0, argmax(vec)]  
    max_score_broadcast = max_score.view(1, -1).expand(1, vec.size()[1])
    return max_score + \
        torch.log(torch.sum(torch.exp(vec - max_score_broadcast)))

In [34]:
x_var3x3 = autograd.Variable(torch.rand(3, 3))
print(x_var3x3)
print(torch.max(x_var3x3, 1))
print('First element in index list:', argmax(x_var3x3))
print(x_var3x3[0, argmax(x_var3x3)])
print('log_sum_exp', log_sum_exp(x_var3x3))


Variable containing:
 0.4170  0.9972  0.7203
 0.9326  0.0001  0.1281
 0.3023  0.9990  0.1468
[torch.FloatTensor of size 3x3]

(Variable containing:
 0.9972
 0.9326
 0.9990
[torch.FloatTensor of size 3]
, Variable containing:
 1
 0
 1
[torch.LongTensor of size 3]
)
First element in index list: 1
Variable containing:
 0.9972
[torch.FloatTensor of size 1]

log_sum_exp Variable containing:
 2.7843
[torch.FloatTensor of size 1]


In [35]:
# シーケンスをインデックス化したのち、自動微分変数に変換するヘルパー関数
def prepare_sequence(seq, to_ix):
    idxs = [to_ix[w] for w in seq]
    tensor = torch.LongTensor(idxs)
    return autograd.Variable(tensor)

In [36]:
print(prepare_sequence(training_data[0][0], word_to_ix))


Variable containing:
 0
 1
 2
 3
 4
[torch.LongTensor of size 5]


In [37]:
# Bi-LSTM+CRFモデルの生成
class BiLSTM_CRF(nn.Module):

    def __init__(self, vocab_size, tag_to_ix, embedding_dim, hidden_dim):
        super(BiLSTM_CRF, self).__init__()
        self.embedding_dim = embedding_dim
        self.hidden_dim = hidden_dim
        self.vocab_size = vocab_size
        self.tag_to_ix = tag_to_ix
        self.tagset_size = len(tag_to_ix)

        self.word_embeds = nn.Embedding(vocab_size, embedding_dim)
        self.lstm = nn.LSTM(embedding_dim, hidden_dim // 2,
                            num_layers=1, bidirectional=True)

        # 隠れ層の次元を出力層の次元へ線形写像変換
        self.hidden2tag = nn.Linear(hidden_dim, self.tagset_size)

        # 乱数初期化の後、モジュールパラメータとして保存
        # ワードからワードへの遷移確率をスコア化したパラメータ
        # グラフの最短経路をビタビアルゴリズムによって求める
        self.transitions = nn.Parameter(
            torch.randn(self.tagset_size, self.tagset_size))

        # モジュールパラメータの中でも特例としてスタートタグ、ストップタグの
        # 全パラメータを-10000で初期化。あらゆるワードはスタートワードへ遷移
        # しないし、ストップワードからいかなるワードへも遷移しないため。
        self.transitions.data[tag_to_ix[START_TAG], :] = -10000
        self.transitions.data[:, tag_to_ix[STOP_TAG]] = -10000

        # 下記に定義されている通り、(2, 1, self.hidden_dim/2)サイズの隠れ層を2層初期化
        self.hidden = self.init_hidden()

    def init_hidden(self):
        return (autograd.Variable(torch.randn(2, 1, self.hidden_dim // 2)),
                autograd.Variable(torch.randn(2, 1, self.hidden_dim // 2)))

    def _forward_alg(self, feats):
        # スタートタグ以外を全て-10000で初期化
        init_alphas = torch.Tensor(1, self.tagset_size).fill_(-10000.)
        init_alphas[0][self.tag_to_ix[START_TAG]] = 0.

        # 初期化したアルファを自動微分変数に変換
        forward_var = autograd.Variable(init_alphas)

        # 1文ごとの繰り返し処理
        for feat in feats:
            alphas_t = []  # Forward計算によって導かれるアルファを保存
            for next_tag in range(self.tagset_size):
                # _get_lstm_features()により得られた次ワードタグの特徴量を転置の後、tagset_sizeのベクトルに拡張
                emit_score = feat[next_tag].view(1, -1).expand(1, self.tagset_size)
                
                # 次のワードのタグをnext_tagとすると、このタグへの遷移確率がself.transitions[next_tag]
                # によって得られる。trans_scoreはこれを転置したテンソル。
                trans_score = self.transitions[next_tag].view(1, -1)
                
                # 次のワードへ遷移するエッジ(i -> next_tag) の値
                # 遷移確率のスコアtrans_score, タグを得るためのスコアemit_scoreからなり、
                # これらを前回まで計算したスコアforward_varに追加するとあたらしい確率変数が得られる
                next_tag_var = forward_var + trans_score + emit_score
                # log-sum-exp の後、アルファに登録する
                alphas_t.append(log_sum_exp(next_tag_var))
             
            # alphas_t をつなぎ合わせた後、転置した変数がforward_var
            forward_var = torch.cat(alphas_t).view(1, -1)
            
        # 終端のみは、STOP_TAGをベースに特別な計算が行われる
        terminal_var = forward_var + self.transitions[self.tag_to_ix[STOP_TAG]]
        alpha = log_sum_exp(terminal_var)
        
        return alpha

    # 埋め込み層、LSTM層、隠れ層、出力層の順に値を渡して特徴量を抽出する
    def _get_lstm_features(self, sentence):
        self.hidden = self.init_hidden()
        embeds = self.word_embeds(sentence).view(len(sentence), 1, -1)
        lstm_out, self.hidden = self.lstm(embeds, self.hidden)
        lstm_out = lstm_out.view(len(sentence), self.hidden_dim)
        lstm_feats = self.hidden2tag(lstm_out)
        return lstm_feats

    # 連続するタグ (tag sequnce)のスコアを算出する関数
    def _score_sentence(self, feats, tags):
        score = autograd.Variable(torch.Tensor([0]))
        tags = torch.cat([torch.LongTensor([self.tag_to_ix[START_TAG]]), tags])
        for i, feat in enumerate(feats):
            score = score + \
                self.transitions[tags[i + 1], tags[i]] + feat[tags[i + 1]]
        score = score + self.transitions[self.tag_to_ix[STOP_TAG], tags[-1]]
        return score

    # ビタビアルゴリズムによるBackwardアルゴリズムの実行
    def _viterbi_decode(self, feats):
        backpointers = []

        # 初期化済みパラメータを用意
        init_vvars = torch.Tensor(1, self.tagset_size).fill_(-10000.)
        init_vvars[0][self.tag_to_ix[START_TAG]] = 0

        # 初期化済みパラメータを自動微分変数に変更。スタートタグ以外、値はまだ-10000.
        forward_var = autograd.Variable(init_vvars)
        
        for feat in feats:
            bptrs_t = []  # holds the backpointers for this step
            viterbivars_t = []  # holds the viterbi variables for this step

            for next_tag in range(self.tagset_size):
                # forward時と異なり、遷移パラメータからのみ次のタグの確率変数が得られる
                # このforループを抜けた後、ワード→タグの確率変数(emission)を得る
                # これから最大値をとるIDをbest_tag_idとし、タグのIDとなる
                next_tag_var = forward_var + self.transitions[next_tag]
                best_tag_id = argmax(next_tag_var)
                bptrs_t.append(best_tag_id)
                viterbivars_t.append(next_tag_var[0][best_tag_id])
            forward_var = (torch.cat(viterbivars_t) + feat).view(1, -1)
            backpointers.append(bptrs_t)

        # STOP_TAGのみは終端のため計算方法が特別となる
        terminal_var = forward_var + self.transitions[self.tag_to_ix[STOP_TAG]]
        best_tag_id = argmax(terminal_var)
        path_score = terminal_var[0][best_tag_id]

        # バックポインタを逆順にたどっていくとベストスコアのパスが得られる
        best_path = [best_tag_id]
        for bptrs_t in reversed(backpointers):
            best_tag_id = bptrs_t[best_tag_id]
            best_path.append(best_tag_id)

        start = best_path.pop()
        assert start == self.tag_to_ix[START_TAG]  # Sanity check
        best_path.reverse()
        return path_score, best_path

    def neg_log_likelihood(self, sentence, tags):
        feats = self._get_lstm_features(sentence)
        forward_score = self._forward_alg(feats)
        gold_score = self._score_sentence(feats, tags)
        return forward_score - gold_score

    # この関数はモデルが呼ばれるときに、自動的に呼ばれる
    def forward(self, sentence):
        # 特徴量を取得
        lstm_feats = self._get_lstm_features(sentence)

        # ベストスコアが出されるタグシーケンスおよびそのスコアを返す
        score, tag_seq = self._viterbi_decode(lstm_feats)
        
        return score, tag_seq

In [38]:
# 初期化
START_TAG = "<START>"
STOP_TAG = "<STOP>"
EMBEDDING_DIM = 5
HIDDEN_DIM = 4

In [39]:
# 訓練データ生成(たった2文だが。。)
training_data = [(
    "the wall street journal reported today that apple corporation made money".split(),
    "B I I I O O O B I O O".split()
), (
    "georgia tech is a university in georgia".split(),
    "B I O O O O B".split()
)]

In [40]:
# ワードリスト中のワードからインデックスへ変換する辞書の生成
word_to_ix = {}
for sentence, tags in training_data:
    for word in sentence:
        if word not in word_to_ix: #未登録のワードインデックスを新規登録
            word_to_ix[word] = len(word_to_ix)

In [41]:
word_to_ix


Out[41]:
{'a': 14,
 'apple': 7,
 'corporation': 8,
 'georgia': 11,
 'in': 16,
 'is': 13,
 'journal': 3,
 'made': 9,
 'money': 10,
 'reported': 4,
 'street': 2,
 'tech': 12,
 'that': 6,
 'the': 0,
 'today': 5,
 'university': 15,
 'wall': 1}

In [42]:
#タグからインデックスへ変換する辞書の定義
tag_to_ix = {"B": 0, "I": 1, "O": 2, START_TAG: 3, STOP_TAG: 4}

In [43]:
# モデル生成
vocab_size = len(word_to_ix)
model = BiLSTM_CRF(vocab_size, tag_to_ix, EMBEDDING_DIM, HIDDEN_DIM)

In [44]:
# 最適化関数生成
optimizer = optim.SGD(model.parameters(), lr=0.01, weight_decay=1e-4)

In [45]:
# 訓練前にサンプルデータで推論してみる
precheck_sent = prepare_sequence(training_data[0][0], word_to_ix)
precheck_tags = torch.LongTensor([tag_to_ix[t] for t in training_data[0][1]])
precheck_result = model(precheck_sent)  # model()を実行するとforward() が自動的に呼ばれる

model()が呼ばれると、登録されていた以下のコールバック関数が呼ばれる。

python:title
    def __call__(self, *input, **kwargs):  
        for hook in self._forward_pre_hooks.values():  
            hook(self, input)  
        result = self.forward(*input, **kwargs)

これによりforward()関数が呼ばれている


In [46]:
print(precheck_sent)
print(precheck_tags)
print(precheck_result) # score, tag_seq


Variable containing:
  0
  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
[torch.LongTensor of size 11]


 0
 1
 1
 1
 2
 2
 2
 0
 1
 2
 2
[torch.LongTensor of size 11]

(Variable containing:
 9.5973
[torch.FloatTensor of size 1]
, [2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2])

In [47]:
for epoch in range(300):
    for sentence, tags in training_data:
        model.zero_grad()
        sentence_in = prepare_sequence(sentence, word_to_ix)
        targets = torch.LongTensor([tag_to_ix[t] for t in tags])
        # forward
        neg_log_likelihood = model.neg_log_likelihood(sentence_in, targets)
        # backward
        neg_log_likelihood.backward()
        optimizer.step()

In [48]:
# 訓練後に再度サンプルデータで推測してみる
precheck_sent = prepare_sequence(training_data[0][0], word_to_ix)
print(model(precheck_sent))


(Variable containing:
 22.4088
[torch.FloatTensor of size 1]
, [0, 1, 1, 1, 2, 2, 2, 0, 1, 2, 2])

In [ ]: