In [1]:
import tensorflow as tf
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
import random
import time
In [2]:
def get_vocab(data_location, lowering = False):
    # Read the corpus and build the character-level vocabulary (unique characters).
    with open(data_location, 'r') as fopen:
        data = fopen.read()
    if lowering:
        data = data.lower()
    vocab = list(set(data))
    return data, vocab

def embed_to_onehot(data, vocab):
    # One-hot encode every character in `data` against `vocab`.
    onehot = np.zeros((len(data), len(vocab)), dtype = np.float32)
    for i in range(len(data)):
        onehot[i, vocab.index(data[i])] = 1.0
    return onehot
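
# Quick sanity check of the helpers above (illustrative sketch; the toy string 'abca'
# and the printed shapes are made-up values, not part of the original notebook).
toy_data = 'abca'
toy_vocab = list(set(toy_data))
toy_onehot = embed_to_onehot(toy_data, toy_vocab)
print(toy_onehot.shape)   # (4, 3): one row per character, one column per vocab entry
print(toy_onehot[0])      # a single 1.0 at toy_vocab.index('a'), zeros elsewhere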
In [3]:
class Model:
    def __init__(self, num_layers, size_layer, dimension_input, learning_rate, optimizer):
        def lstm_cell():
            return tf.nn.rnn_cell.LSTMCell(size_layer, state_is_tuple = False)
        # With state_is_tuple = False the stacked state is one flat tensor of width
        # num_layers * 2 * size_layer (cell state c and hidden state h per layer).
        self.rnn_cells = tf.nn.rnn_cell.MultiRNNCell([lstm_cell() for _ in range(num_layers)], state_is_tuple = False)
        self.X = tf.placeholder(tf.float32, [None, None, dimension_input])
        self.Y = tf.placeholder(tf.float32, [None, None, dimension_input])
        self.hidden_layer = tf.placeholder(tf.float32, (None, num_layers * 2 * size_layer))
        self.outputs, self.last_state = tf.nn.dynamic_rnn(self.rnn_cells, self.X, initial_state = self.hidden_layer, dtype = tf.float32)
        # Project every timestep's hidden state back to vocabulary size.
        self.rnn_W = tf.Variable(tf.random_normal((size_layer, dimension_input)))
        self.rnn_B = tf.Variable(tf.random_normal([dimension_input]))
        self.logits = tf.matmul(tf.reshape(self.outputs, [-1, size_layer]), self.rnn_W) + self.rnn_B
        seq_shape = tf.shape(self.outputs)
        self.final_outputs = tf.reshape(tf.nn.softmax(self.logits), (seq_shape[0], seq_shape[1], dimension_input))
        y_batch_long = tf.reshape(self.Y, [-1, dimension_input])
        self.cost = tf.reduce_mean(tf.nn.softmax_cross_entropy_with_logits(logits = self.logits, labels = y_batch_long))
        # Optimizer selector: 0 = Adagrad, 1 = Adam, 2 = RMSProp.
        if optimizer == 0:
            self.optimizer = tf.train.AdagradOptimizer(learning_rate = learning_rate).minimize(self.cost)
        elif optimizer == 1:
            self.optimizer = tf.train.AdamOptimizer(learning_rate = learning_rate).minimize(self.cost)
        elif optimizer == 2:
            self.optimizer = tf.train.RMSPropOptimizer(learning_rate, 0.9).minimize(self.cost)
        else:
            raise ValueError('optimizer must be 0 (Adagrad), 1 (Adam) or 2 (RMSProp)')
        self.correct_pred = tf.equal(tf.argmax(self.logits, 1), tf.argmax(y_batch_long, 1))
        self.accuracy = tf.reduce_mean(tf.cast(self.correct_pred, tf.float32))
In [4]:
text, text_vocab = get_vocab('input.txt', lowering = False)
onehot = embed_to_onehot(text, text_vocab)
In [5]:
learning_rate = 0.01
batch_size = 100
sequence = 64
epoch = 2500
num_layers = 2
size_layer = 512
possible_batch_id = range(len(text) - batch_size - 1)
In [6]:
# Pick a random word from the corpus to seed the generation step later on.
split_text = text.split()
tag = split_text[random.randint(0, len(split_text) - 1)]
print(tag)
In [7]:
def train():
    LOST, ACCURACY = [], []
    for i in range(epoch):
        last_time = time.time()
        # Reset the hidden state to zeros for every randomly sampled batch.
        init_value = np.zeros((sequence, num_layers * 2 * size_layer))
        batch_x = np.zeros((sequence, batch_size, len(text_vocab)))
        batch_y = np.zeros((sequence, batch_size, len(text_vocab)))
        batch_id = random.sample(possible_batch_id, sequence)
        for n in range(batch_size):
            # Targets are the inputs shifted right by one character (next-character prediction).
            id1 = [k + n for k in batch_id]
            id2 = [k + n + 1 for k in batch_id]
            batch_x[:, n, :] = onehot[id1, :]
            batch_y[:, n, :] = onehot[id2, :]
        last_state, _, loss = sess.run([model.last_state, model.optimizer, model.cost],
                                       feed_dict = {model.X: batch_x, model.Y: batch_y, model.hidden_layer: init_value})
        accuracy = sess.run(model.accuracy,
                            feed_dict = {model.X: batch_x, model.Y: batch_y, model.hidden_layer: init_value})
        ACCURACY.append(accuracy); LOST.append(loss)
        if (i + 1) % 100 == 0:
            print('epoch: ' + str(i + 1) + ', accuracy: ' + str(accuracy) + ', loss: ' + str(loss) + ', s / epoch: ' + str(time.time() - last_time))
    sns.set()
    plt.figure(figsize = (15, 5))
    plt.subplot(1, 2, 1)
    EPOCH = [i for i in range(len(LOST))]
    plt.plot(EPOCH, LOST)
    plt.xlabel('epoch'); plt.ylabel('loss')
    plt.subplot(1, 2, 2)
    plt.plot(EPOCH, ACCURACY)
    plt.xlabel('epoch'); plt.ylabel('accuracy')
    plt.show()
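
# Illustration of the shifted-window batching used in train() (a toy check, not part of
# the original notebook): for any sampled start index k, the target characters are the
# input characters shifted right by one, which is what next-character prediction needs.
k = possible_batch_id[0]
print(repr(text[k : k + 10]), '->', repr(text[k + 1 : k + 11]))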
In [8]:
def generate():
    sentence_generated = tag
    onehot = embed_to_onehot(tag, text_vocab)
    init_value = np.zeros((sequence, num_layers * 2 * size_layer))
    # Warm up the hidden state by feeding the seed word one character at a time.
    for i in range(len(tag)):
        batch_x = np.zeros((sequence, 1, len(text_vocab)))
        batch_x[:, 0, :] = onehot[i, :]
        last_state, prob = sess.run([model.last_state, model.final_outputs],
                                    feed_dict = {model.X: batch_x, model.hidden_layer: init_value})
        init_value = last_state
    # Sample 1000 characters, feeding each sampled character back in as the next input.
    for i in range(1000):
        char = np.random.choice(range(len(text_vocab)), p = prob[0][0])
        element = text_vocab[char]
        sentence_generated += element
        onehot = embed_to_onehot(element, text_vocab)
        batch_x = np.zeros((sequence, 1, len(text_vocab)))
        batch_x[:, 0, :] = onehot[0, :]
        last_state, prob = sess.run([model.last_state, model.final_outputs],
                                    feed_dict = {model.X: batch_x, model.hidden_layer: init_value})
        init_value = last_state
    print(sentence_generated)
In [9]:
sess = tf.InteractiveSession()
model = Model(num_layers, size_layer, len(text_vocab), learning_rate, 0)
sess.run(tf.global_variables_initializer())
train()
generate()
In [10]:
tf.reset_default_graph()
sess = tf.InteractiveSession()
model = Model(num_layers, size_layer, len(text_vocab), learning_rate, 1)
sess.run(tf.global_variables_initializer())
train()
generate()
In [11]:
tf.reset_default_graph()
sess = tf.InteractiveSession()
model = Model(num_layers, size_layer, len(text_vocab), learning_rate, 2)
sess.run(tf.global_variables_initializer())
train()
generate()