In [1]:
'''
A Recurrent Neural Network (LSTM) implementation example using TensorFlow.
Predicts the next word after a window of n_input words learned from a text file.
A story can be generated automatically by feeding each predicted word back in as input.
Author: Rowel Atienza
Project: https://github.com/roatienza/Deep-Learning-Experiments
'''
from __future__ import print_function
import numpy as np
import tensorflow as tf
from tensorflow.contrib import rnn
import random
import collections
import time
In [2]:
start_time = time.time()
def elapsed(sec):
    if sec < 60:
        return str(sec) + " sec"
    elif sec < (60*60):
        return str(sec/60) + " min"
    else:
        return str(sec/(60*60)) + " hr"
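As a quick illustrative check of the helper (values chosen arbitrarily):
print(elapsed(42))    # "42 sec"
print(elapsed(90))    # "1.5 min" (true division under Python 3)
print(elapsed(7200))  # "2.0 hr"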
In [3]:
# Target log path
logs_path = 'rnn_code'
writer = tf.summary.FileWriter(logs_path)
# Text file containing words for training
training_file = 'inputcode.txt'
In [4]:
def read_data(fname):
    with open(fname) as f:
        content = f.read()
    # Tokens are comma-separated; drop the empty string left by a trailing comma
    content = content.split(',')
    return content[:-1]
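For concreteness, if inputcode.txt contained the text 1,2,3,1,2,3, (hypothetical contents; the real file is not shown here), the pipeline below would produce:
# read_data('inputcode.txt')  -> ['1', '2', '3', '1', '2', '3']
# list(map(int, ...))         -> [1, 2, 3, 1, 2, 3]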
In [5]:
training_data = read_data(training_file)
print("Loaded training data...")
print(training_data)
training_data = list(map(int, training_data))
print(training_data)
In [6]:
print(training_data[:10])
print(len(training_data))
def build_dataset(words):
    # Map each unique word to an integer id, most frequent first
    count = collections.Counter(words).most_common()
    dictionary = dict()
    for word, _ in count:
        dictionary[word] = len(dictionary)
    reverse_dictionary = dict(zip(dictionary.values(), dictionary.keys()))
    return dictionary, reverse_dictionary

dictionary, reverse_dictionary = build_dataset(training_data)
vocab_size = len(dictionary)
print(vocab_size)
print(dictionary)
print(training_data)
vocab = list(set(training_data))
print(vocab)
vocab1 = ['1', '2', '3']
print(vocab1)
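To make the two mappings concrete, here is build_dataset on a toy integer sequence (ids follow frequency order, so this is a sketch rather than the exact output for inputcode.txt):
# build_dataset([3, 1, 2, 3, 3, 2])
# dictionary:         {3: 0, 2: 1, 1: 2}   (most frequent symbol gets id 0)
# reverse_dictionary: {0: 3, 1: 2, 2: 1}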
In [13]:
# Parameters
learning_rate = 0.001
training_iters = 50000
display_step = 1000
n_input = 3
# Hard-code a 3-symbol vocabulary for this run (overrides the one built above)
vocab = [1, 2, 3]
vocab_size = 3
# number of units in RNN cell
n_hidden = 512
py_flag = False
# tf Graph input
x = tf.placeholder("float", [1, n_input, 1])
y = tf.placeholder("float", [1, vocab_size])
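The placeholder shapes encode a batch of one sliding window: x holds n_input scalar tokens as [batch=1, time=n_input, features=1], and y is a one-hot row over the vocabulary. A shape sketch (example values are illustrative, assuming the 1-based symbols used below):
# x: [1, n_input, 1] -> one window of 3 tokens, e.g. [[[1], [2], [3]]]
# y: [1, vocab_size] -> one-hot target, e.g. [[0., 0., 1.]] for symbol 3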
In [14]:
# RNN output node weights and biases
weights = {
'out': tf.Variable(tf.random_normal([n_hidden, vocab_size]))
}
biases = {
'out': tf.Variable(tf.random_normal([vocab_size]))
}
In [15]:
#tf.reset_default_graph()
#print(x)
In [16]:
print(tf.split(x,n_input,1))
In [17]:
def RNN(x, weights, biases):
    # reshape to [1, n_input]
    x = tf.reshape(x, [-1, n_input])
    # Generate a n_input-element sequence of inputs
    # (eg. [had] [a] [general] -> [20] [6] [33])
    x = tf.split(x, n_input, 1)
    # 1-layer LSTM with n_hidden units.
    rnn_cell = rnn.BasicLSTMCell(n_hidden)
    # generate prediction
    outputs, states = rnn.static_rnn(rnn_cell, x, dtype=tf.float32)
    # there are n_input outputs but we only want the last output
    return tf.matmul(outputs[-1], weights['out']) + biases['out']
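Shape-wise the pipeline is [1, 3, 1] -> [1, 3] -> three [1, 1] time steps -> three [1, n_hidden] outputs, and only the last output feeds the dense layer. A hedged trace of the tensors involved:
# x (placeholder)         [1, 3, 1]
# tf.reshape(x, [-1, 3])  [1, 3]
# tf.split(x, 3, 1)       3 tensors of [1, 1], one per time step
# static_rnn outputs      3 tensors of [1, n_hidden]
# outputs[-1] @ W + b     [1, vocab_size] logits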
In [18]:
pred = RNN(x, weights, biases)
# Loss and optimizer
cost = tf.reduce_mean(tf.nn.softmax_cross_entropy_with_logits(logits=pred, labels=y))
optimizer = tf.train.RMSPropOptimizer(learning_rate=learning_rate).minimize(cost)
# Model evaluation
correct_pred = tf.equal(tf.argmax(pred,1), tf.argmax(y,1))
accuracy = tf.reduce_mean(tf.cast(correct_pred, tf.float32))
# Initializing the variables
init = tf.global_variables_initializer()
# Quick sanity check of one training example before launching the session
offset = 8
# One-hot encode the target symbol; symbols are 1-based, indices 0-based
symbols_out_onehot = np.zeros([vocab_size], dtype=float)
symbols_out_onehot[int(training_data[offset+n_input]) - 1] = 1.0
print(symbols_out_onehot)
input_data = [training_data[i] for i in range(offset, offset+n_input)]
input_set = np.reshape(input_data, [-1, n_input, 1])
print(input_set)
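Since the symbols are the integers 1..3 but one-hot indices are 0-based, the encoding subtracts 1. For example (illustrative value):
# target symbol 2 -> index 1 -> symbols_out_onehot == [0., 1., 0.]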
In [20]:
# Launch the graph
with tf.Session() as session:
    session.run(init)
    step = 0
    offset = random.randint(0, n_input+1)
    end_offset = n_input + 1
    acc_total = 0
    loss_total = 0
    writer.add_graph(session.graph)
    while step < training_iters:
        # Generate a minibatch. Add some randomness on selection process.
        if offset > (len(training_data)-end_offset):
            offset = random.randint(0, n_input+1)
        input_data = [training_data[i] for i in range(offset, offset+n_input)]
        input_set = np.reshape(input_data, [-1, n_input, 1])
        symbols_out_onehot = np.zeros([vocab_size], dtype=float)
        symbols_out_onehot[int(training_data[offset+n_input]) - 1] = 1.0
        symbols_out_onehot = np.reshape(symbols_out_onehot, [1, -1])
        _, acc, loss, onehot_pred = session.run(
            [optimizer, accuracy, cost, pred],
            feed_dict={x: input_set, y: symbols_out_onehot})
        loss_total += loss
        acc_total += acc
        if (step+1) % display_step == 0:
            print("Iter= " + str(step+1) + ", Average Loss= " +
                  "{:.6f}".format(loss_total/display_step) + ", Average Accuracy= " +
                  "{:.2f}%".format(100*acc_total/display_step))
            acc_total = 0
            loss_total = 0
            symbols_in = [training_data[i] for i in range(offset, offset + n_input)]
            symbols_out = training_data[offset + n_input]
            # onehot_pred is a numpy array; argmax gives a 0-based index, symbols are 1-based
            symbols_out_pred = int(np.argmax(onehot_pred, 1)[0]) + 1
            print("%s - [%s] vs [%s]" % (symbols_in, symbols_out, symbols_out_pred))
        step += 1
        offset += (n_input+1)
    print("Optimization Finished!")
    print("Elapsed time: ", elapsed(time.time() - start_time))
    print("Run on the command line:")
    print("\ttensorboard --logdir=%s" % (logs_path))
    print("Point your web browser to: http://localhost:6006/")
'''
while True:
    prompt = "%s words: " % n_input
    sentence = input(prompt)
    sentence = sentence.strip()
    words = sentence.split(' ')
    if len(words) != n_input:
        continue
    try:
        symbols_in_keys = [dictionary[str(words[i])] for i in range(len(words))]
        for i in range(32):
            keys = np.reshape(np.array(symbols_in_keys), [-1, n_input, 1])
            onehot_pred = session.run(pred, feed_dict={x: keys})
            onehot_pred_index = int(tf.argmax(onehot_pred, 1).eval())
            sentence = "%s %s" % (sentence, reverse_dictionary[onehot_pred_index])
            symbols_in_keys = symbols_in_keys[1:]
            symbols_in_keys.append(onehot_pred_index)
        print(sentence)
    except:
        print("Word not in dictionary")
'''