In [1]:
import torch
import torch.autograd as autograd
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
torch.manual_seed(1)
Out[1]:
In [2]:
inputs = [autograd.Variable(torch.randn((1, 3)))
for _ in range(5)] # 長さが5, サイズが(1,3)の入力層を生成し、乱数初期化
In [3]:
lstm = nn.LSTM(3, 3) #入力3次元、出力3次元のLSTM層生成
In [4]:
hidden = (autograd.Variable(torch.randn(1, 1, 3)),
autograd.Variable(torch.randn((1, 1, 3)))) # サイズ(1x1x3)で2層の隠れ層を生成し、乱数初期化
In [5]:
print(len(inputs))
print(inputs[0].size())
In [6]:
print(lstm)
In [7]:
print(len(hidden))
print(hidden[0])
In [8]:
inputs[0] # 入力データサンプル(1x3テンソル)の表示
Out[8]:
In [9]:
inputs[0].view(1, 1, -1) # 転置により1x3テンソルを1x(1x3)テンソルとして表示
Out[9]:
In [10]:
# 長さが5となるシーケンスは一つ一つが1x1x3のテンソル
# LSTMはサイズ3x3の隠れ層を経由して出力として1x1x3 テンソルを生成する。
# チュートリアルの例では、5回この計算が行われるものの上書きされているため、outのサイズは1x1x3のまま。
for i in inputs:
out, hidden = lstm(i.view(1, 1, -1), hidden)
In [11]:
print(out)
In [12]:
print(hidden)
ここでは単語の品詞(名詞、動詞など)を推測するためのモデルを生成する。
In [13]:
# 訓練データと呼べるほど大きくはないが今回はこれを使用する
training_data = [
("The dog ate the apple".split(), ["DET", "NN", "V", "DET", "NN"]),
("Everybody read that book".split(), ["NN", "V", "DET", "NN"])
]
In [14]:
# 単語をインデックスに変換する辞書を生成
word_to_ix = {}
for sent, tags in training_data:
for word in sent:
if word not in word_to_ix:
word_to_ix[word] = len(word_to_ix)
print(word_to_ix)
In [15]:
# タグの方もインデックスに変換する辞書を生成するが、こちらは手入力で。
tag_to_ix = {"DET": 0, "NN": 1, "V": 2}
print(tag_to_ix)
In [16]:
# モデルの生成
class LSTMTagger(nn.Module):
def __init__(self, embedding_dim, hidden_dim, vocab_size, tagset_size):
super(LSTMTagger, self).__init__()
self.hidden_dim = hidden_dim
self.word_embeddings = nn.Embedding(vocab_size, embedding_dim) # (9,6)
# LSTM は埋め込み層と隠れ層の間に配置される。よって
# 入力データの次元は、上記埋め込み層のサイズに合わせ、
# 出力データの次元は、隠れ層のサイズに合わせる。
self.lstm = nn.LSTM(embedding_dim, hidden_dim) # (6, 6)
# 最終的に、隠れ層は出力層の次元(タグデータの空間次元)へ線形写像変換する
self.hidden2tag = nn.Linear(hidden_dim, tagset_size) # (6,3)
self.hidden = self.init_hidden()
def init_hidden(self):
return (autograd.Variable(torch.zeros(1, 1, self.hidden_dim)),
autograd.Variable(torch.zeros(1, 1, self.hidden_dim)))
def forward(self, sentence):
embeds = self.word_embeddings(sentence)
lstm_out, self.hidden = self.lstm(
embeds.view(len(sentence), 1, -1), self.hidden) # embedsテンソルを転置してからlstmへ渡す
tag_space = self.hidden2tag(lstm_out.view(len(sentence), -1)) # lstm_outテンソルを転置してからhidden2tagへ渡す
tag_scores = F.log_softmax(tag_space)
return tag_scores
In [17]:
# モデル生成
EMBEDDING_DIM = 6
HIDDEN_DIM = 6
model = LSTMTagger(EMBEDDING_DIM, HIDDEN_DIM, len(word_to_ix), len(tag_to_ix)) # 6,6,9,3
In [18]:
# 損失関数と最適化関数の生成
loss_function = nn.NLLLoss()
optimizer = optim.SGD(model.parameters(), lr=0.1)
In [19]:
# wordやtagといったシーケンスをword_to_ixやtag_to_ixといった関数を用いて連続するインデックスに変え
# さらにはモデルに入力するため自動微分変数にする関数
def prepare_sequence(seq, to_ix):
idxs = [to_ix[w] for w in seq]
tensor = torch.LongTensor(idxs)
return autograd.Variable(tensor)
In [20]:
# サンプルで訓練前の推測値を検証
inputs = prepare_sequence(training_data[0][0], word_to_ix)
tag_scores = model(inputs)
print(tag_scores)
In [21]:
# 訓練
for epoch in range(300):
for sentence, tags in training_data:
# 勾配は次第に加減算するが、まずは初期化
model.zero_grad()
# 隠れ層初期化
model.hidden = model.init_hidden()
# ワードとタグのリストを生成
sentence_in = prepare_sequence(sentence, word_to_ix)
targets = prepare_sequence(tags, tag_to_ix)
# ワードリストを入力してモデルを実行するとタグリストが得られる
tag_scores = model(sentence_in)
# タグリスト(推測値)と期待値をもとに損失計算(誤差逆伝搬)
loss = loss_function(tag_scores, targets)
loss.backward()
# 最適化関数呼び出しによりパラメータ調整
optimizer.step()
In [22]:
# サンプルで訓練後の推測値計算
sample_word_list = prepare_sequence(training_data[0][0], word_to_ix)
tag_scores = model(sample_word_list)
In [23]:
# サンプルのタグリスト取得
sample_tag_list = prepare_sequence(training_data[0][1], tag_to_ix)
In [24]:
# 予測値と対応するインデックスの最大値を取得
max_values, max_indices = tag_scores.max(-1)
In [25]:
print(tag_scores)
print(max_values)
In [26]:
print(training_data[0][0])
print('Correct indeces:',sample_tag_list)
print('Predicted indces:',max_indices)
In [27]:
import torch
import torch.autograd as autograd
import torch.nn as nn
import torch.optim as optim
In [28]:
# 乱数生成
torch.manual_seed(1)
Out[28]:
In [29]:
# 変数をpythonリストに変更して第1要素を返すヘルパー関数
def to_scalar(var):
return var.view(-1).data.tolist()[0]
In [30]:
x_list = [1., 2., 3.]
x_tensor = torch.Tensor(x_list)
x_var = autograd.Variable(x_tensor)
print(x_list)
print(x_tensor)
print(x_var)
print(x_var.data)
print(x_var.view(-1).data)
In [31]:
print(to_scalar(x_var))
In [32]:
# 各行最大値に対応するインデックスの中で第1要素を返すヘルパー関数
def argmax(vec):
_, idx = torch.max(vec, 1)
return to_scalar(idx)
In [33]:
# 大きなデータも扱えるよう、log sum exp のヘルパー関数を定義(logΣexp(x))
# https://encrypted.google.com/url?sa=t&source=web&cd=3&ved=0CB0QFjAC&url=http%3A%2F%2Fcl.aist-nara.ac.jp%2Findex.php%3Fplugin%3Dattach%26refer%3DDMLA%252F2005%25C7%25AF%25C5%25D9%26openfile%3D2005-06-07.pdf&ei=xqgoTKATjO2QBfvH4MIC&usg=AFQjCNEG5ciSo4oL_rREEztIXLntuaxtag&sig2=_sgtsvTi0rtZh3pJloNNkw
def log_sum_exp(vec):
# 1)自動微分変数vecの第一行目の最大値(最大テンソル)を算出
# 0.1404 0.9391 0.1981
# 0.7784 0.8007 0.7160
# 0.9683 0.8028 0.3134
# [torch.FloatTensor of size 3x3]
# => 0.9391
# 2) 最大値(最大テンソル)を転置
# 0.9391=> 0.9391
# 3) vecの列長と同じサイズのベクトルを生成し、値に最大値(最大テンソル)をmax_score_broadcastへ格納
# 0.9391 =>
# 0.9391 0.9391 0.9391
# [torch.FloatTensor of size 1x3]
# 4) vecとmax_score_broadcastとの差を指数関数関数、総和、対数の関数に入力し、最大値との総和をとる
max_score = vec[0, argmax(vec)]
max_score_broadcast = max_score.view(1, -1).expand(1, vec.size()[1])
return max_score + \
torch.log(torch.sum(torch.exp(vec - max_score_broadcast)))
In [34]:
x_var3x3 = autograd.Variable(torch.rand(3, 3))
print(x_var3x3)
print(torch.max(x_var3x3, 1))
print('First element in index list:', argmax(x_var3x3))
print(x_var3x3[0, argmax(x_var3x3)])
print('log_sum_exp', log_sum_exp(x_var3x3))
In [35]:
# シーケンスをインデックス化したのち、自動微分変数に変換するヘルパー関数
def prepare_sequence(seq, to_ix):
idxs = [to_ix[w] for w in seq]
tensor = torch.LongTensor(idxs)
return autograd.Variable(tensor)
In [36]:
print(prepare_sequence(training_data[0][0], word_to_ix))
In [37]:
# Bi-LSTM+CRFモデルの生成
class BiLSTM_CRF(nn.Module):
def __init__(self, vocab_size, tag_to_ix, embedding_dim, hidden_dim):
super(BiLSTM_CRF, self).__init__()
self.embedding_dim = embedding_dim
self.hidden_dim = hidden_dim
self.vocab_size = vocab_size
self.tag_to_ix = tag_to_ix
self.tagset_size = len(tag_to_ix)
self.word_embeds = nn.Embedding(vocab_size, embedding_dim)
self.lstm = nn.LSTM(embedding_dim, hidden_dim // 2,
num_layers=1, bidirectional=True)
# 隠れ層の次元を出力層の次元へ線形写像変換
self.hidden2tag = nn.Linear(hidden_dim, self.tagset_size)
# 乱数初期化の後、モジュールパラメータとして保存
# ワードからワードへの遷移確率をスコア化したパラメータ
# グラフの最短経路をビタビアルゴリズムによって求める
self.transitions = nn.Parameter(
torch.randn(self.tagset_size, self.tagset_size))
# モジュールパラメータの中でも特例としてスタートタグ、ストップタグの
# 全パラメータを-10000で初期化。あらゆるワードはスタートワードへ遷移
# しないし、ストップワードからいかなるワードへも遷移しないため。
self.transitions.data[tag_to_ix[START_TAG], :] = -10000
self.transitions.data[:, tag_to_ix[STOP_TAG]] = -10000
# 下記に定義されている通り、(2, 1, self.hidden_dim/2)サイズの隠れ層を2層初期化
self.hidden = self.init_hidden()
def init_hidden(self):
return (autograd.Variable(torch.randn(2, 1, self.hidden_dim // 2)),
autograd.Variable(torch.randn(2, 1, self.hidden_dim // 2)))
def _forward_alg(self, feats):
# スタートタグ以外を全て-10000で初期化
init_alphas = torch.Tensor(1, self.tagset_size).fill_(-10000.)
init_alphas[0][self.tag_to_ix[START_TAG]] = 0.
# 初期化したアルファを自動微分変数に変換
forward_var = autograd.Variable(init_alphas)
# 1文ごとの繰り返し処理
for feat in feats:
alphas_t = [] # Forward計算によって導かれるアルファを保存
for next_tag in range(self.tagset_size):
# _get_lstm_features()により得られた次ワードタグの特徴量を転置の後、tagset_sizeのベクトルに拡張
emit_score = feat[next_tag].view(1, -1).expand(1, self.tagset_size)
# 次のワードのタグをnext_tagとすると、このタグへの遷移確率がself.transitions[next_tag]
# によって得られる。trans_scoreはこれを転置したテンソル。
trans_score = self.transitions[next_tag].view(1, -1)
# 次のワードへ遷移するエッジ(i -> next_tag) の値
# 遷移確率のスコアtrans_score, タグを得るためのスコアemit_scoreからなり、
# これらを前回まで計算したスコアforward_varに追加するとあたらしい確率変数が得られる
next_tag_var = forward_var + trans_score + emit_score
# log-sum-exp の後、アルファに登録する
alphas_t.append(log_sum_exp(next_tag_var))
# alphas_t をつなぎ合わせた後、転置した変数がforward_var
forward_var = torch.cat(alphas_t).view(1, -1)
# 終端のみは、STOP_TAGをベースに特別な計算が行われる
terminal_var = forward_var + self.transitions[self.tag_to_ix[STOP_TAG]]
alpha = log_sum_exp(terminal_var)
return alpha
# 埋め込み層、LSTM層、隠れ層、出力層の順に値を渡して特徴量を抽出する
def _get_lstm_features(self, sentence):
self.hidden = self.init_hidden()
embeds = self.word_embeds(sentence).view(len(sentence), 1, -1)
lstm_out, self.hidden = self.lstm(embeds, self.hidden)
lstm_out = lstm_out.view(len(sentence), self.hidden_dim)
lstm_feats = self.hidden2tag(lstm_out)
return lstm_feats
# 連続するタグ (tag sequnce)のスコアを算出する関数
def _score_sentence(self, feats, tags):
score = autograd.Variable(torch.Tensor([0]))
tags = torch.cat([torch.LongTensor([self.tag_to_ix[START_TAG]]), tags])
for i, feat in enumerate(feats):
score = score + \
self.transitions[tags[i + 1], tags[i]] + feat[tags[i + 1]]
score = score + self.transitions[self.tag_to_ix[STOP_TAG], tags[-1]]
return score
# ビタビアルゴリズムによるBackwardアルゴリズムの実行
def _viterbi_decode(self, feats):
backpointers = []
# 初期化済みパラメータを用意
init_vvars = torch.Tensor(1, self.tagset_size).fill_(-10000.)
init_vvars[0][self.tag_to_ix[START_TAG]] = 0
# 初期化済みパラメータを自動微分変数に変更。スタートタグ以外、値はまだ-10000.
forward_var = autograd.Variable(init_vvars)
for feat in feats:
bptrs_t = [] # holds the backpointers for this step
viterbivars_t = [] # holds the viterbi variables for this step
for next_tag in range(self.tagset_size):
# forward時と異なり、遷移パラメータからのみ次のタグの確率変数が得られる
# このforループを抜けた後、ワード→タグの確率変数(emission)を得る
# これから最大値をとるIDをbest_tag_idとし、タグのIDとなる
next_tag_var = forward_var + self.transitions[next_tag]
best_tag_id = argmax(next_tag_var)
bptrs_t.append(best_tag_id)
viterbivars_t.append(next_tag_var[0][best_tag_id])
forward_var = (torch.cat(viterbivars_t) + feat).view(1, -1)
backpointers.append(bptrs_t)
# STOP_TAGのみは終端のため計算方法が特別となる
terminal_var = forward_var + self.transitions[self.tag_to_ix[STOP_TAG]]
best_tag_id = argmax(terminal_var)
path_score = terminal_var[0][best_tag_id]
# バックポインタを逆順にたどっていくとベストスコアのパスが得られる
best_path = [best_tag_id]
for bptrs_t in reversed(backpointers):
best_tag_id = bptrs_t[best_tag_id]
best_path.append(best_tag_id)
start = best_path.pop()
assert start == self.tag_to_ix[START_TAG] # Sanity check
best_path.reverse()
return path_score, best_path
def neg_log_likelihood(self, sentence, tags):
feats = self._get_lstm_features(sentence)
forward_score = self._forward_alg(feats)
gold_score = self._score_sentence(feats, tags)
return forward_score - gold_score
# この関数はモデルが呼ばれるときに、自動的に呼ばれる
def forward(self, sentence):
# 特徴量を取得
lstm_feats = self._get_lstm_features(sentence)
# ベストスコアが出されるタグシーケンスおよびそのスコアを返す
score, tag_seq = self._viterbi_decode(lstm_feats)
return score, tag_seq
In [38]:
# 初期化
START_TAG = "<START>"
STOP_TAG = "<STOP>"
EMBEDDING_DIM = 5
HIDDEN_DIM = 4
In [39]:
# 訓練データ生成(たった2文だが。。)
training_data = [(
"the wall street journal reported today that apple corporation made money".split(),
"B I I I O O O B I O O".split()
), (
"georgia tech is a university in georgia".split(),
"B I O O O O B".split()
)]
In [40]:
# ワードリスト中のワードからインデックスへ変換する辞書の生成
word_to_ix = {}
for sentence, tags in training_data:
for word in sentence:
if word not in word_to_ix: #未登録のワードインデックスを新規登録
word_to_ix[word] = len(word_to_ix)
In [41]:
word_to_ix
Out[41]:
In [42]:
#タグからインデックスへ変換する辞書の定義
tag_to_ix = {"B": 0, "I": 1, "O": 2, START_TAG: 3, STOP_TAG: 4}
In [43]:
# モデル生成
vocab_size = len(word_to_ix)
model = BiLSTM_CRF(vocab_size, tag_to_ix, EMBEDDING_DIM, HIDDEN_DIM)
In [44]:
# 最適化関数生成
optimizer = optim.SGD(model.parameters(), lr=0.01, weight_decay=1e-4)
In [45]:
# 訓練前にサンプルデータで推論してみる
precheck_sent = prepare_sequence(training_data[0][0], word_to_ix)
precheck_tags = torch.LongTensor([tag_to_ix[t] for t in training_data[0][1]])
precheck_result = model(precheck_sent) # model()を実行するとforward() が自動的に呼ばれる
model()が呼ばれると、登録されていた以下のコールバック関数が呼ばれる。
python:title
def __call__(self, *input, **kwargs):
for hook in self._forward_pre_hooks.values():
hook(self, input)
result = self.forward(*input, **kwargs)
これによりforward()関数が呼ばれている
In [46]:
print(precheck_sent)
print(precheck_tags)
print(precheck_result) # score, tag_seq
In [47]:
for epoch in range(300):
for sentence, tags in training_data:
model.zero_grad()
sentence_in = prepare_sequence(sentence, word_to_ix)
targets = torch.LongTensor([tag_to_ix[t] for t in tags])
# forward
neg_log_likelihood = model.neg_log_likelihood(sentence_in, targets)
# backward
neg_log_likelihood.backward()
optimizer.step()
In [48]:
# 訓練後に再度サンプルデータで推測してみる
precheck_sent = prepare_sequence(training_data[0][0], word_to_ix)
print(model(precheck_sent))
In [ ]: