In [1]:
# -*- coding: utf-8 -*-
# 1. Import the dependencies
import numpy as np
import tensorflow as tf
import random,datetime
In [2]:
# 2. Import the dataset
# Read the whole corpus into memory. The `with` block closes the file handle as
# soon as the read finishes instead of leaving it open for the kernel's lifetime.
with open('wiki.test.raw') as corpus_file:
    text = corpus_file.read()
print('Length of the text is {} '.format(len(text)))
In [3]:
# 3. Peek at the beginning of the corpus (its first 100 characters).
header_preview = text[:100]
print('Header : ')
print(header_preview)
In [4]:
# 4. Build the vocabulary: every distinct character of the corpus, in sorted order.
chars = sorted(set(text))
char_size = len(chars)
print('Number of charecters : {}'.format(char_size))
print(chars)
In [5]:
# 5. Lookup tables between characters and their integer ids, so text can be
#    turned into numbers (char2id) and numbers back into text (id2char).
char2id = dict(zip(chars, range(len(chars))))
id2char = dict(enumerate(chars))
In [6]:
# 6. Sample one character from the prediction probability distribution.
def sample(prediction):
    """Draw one class from a categorical distribution and return it one-hot.

    A uniform random number r in [0, 1] is drawn with ``random.uniform`` and
    the first index whose cumulative probability reaches r is selected. If
    rounding leaves the cumulative sum below r for every index, the last
    index is used.

    Args:
        prediction: 1-D sequence of probabilities (list or numpy array).

    Returns:
        A float numpy array with the same length as ``prediction`` that is
        1.0 at the sampled index and 0.0 elsewhere.

    Raises:
        ValueError: if ``prediction`` is empty.
    """
    n_classes = len(prediction)
    if n_classes == 0:
        raise ValueError('prediction must contain at least one probability')
    r = random.uniform(0, 1)
    # Accumulate in float64 so float32 network outputs do not lose precision.
    cumulative = np.cumsum(prediction, dtype=np.float64)
    hits = np.nonzero(cumulative >= r)[0]
    char_id = int(hits[0]) if hits.size else n_classes - 1
    # Size the one-hot vector from the distribution itself rather than from a
    # notebook-level global, so the function works for any vocabulary size.
    char_one_hot = np.zeros(shape=[n_classes])
    char_one_hot[char_id] = 1.0
    return char_one_hot
In [7]:
# 7. Vectorize the data so it can be fed into the model.
#    Every window of `len_per_section` characters is one training input and the
#    character right after the window is its target; windows start `skip` apart.
len_per_section = 50
skip = 2
sections = []
next_chars = []
start = 0
last_start = len(text) - len_per_section
while start < last_start:
    stop = start + len_per_section
    sections.append(text[start:stop])
    next_chars.append(text[stop])
    start += skip

# Inputs are stored as integer ids (expanded to one-hot inside the graph);
# targets are stored directly as one-hot rows.
X = np.zeros((len(sections), len_per_section), dtype=np.int32)
y = np.zeros((len(sections), char_size))
for row, (section, target) in enumerate(zip(sections, next_chars)):
    X[row] = [char2id[ch] for ch in section]
    y[row, char2id[target]] = 1
In [8]:
# Show the integer-encoded form of the first training window.
print(X[0])
In [9]:
# 8. Hyper-parameters and checkpoint directory.
batch_size, hidden_nodes = 32, 1024
max_steps, log_every, save_every = 7000, 100, 400
starting_text = 'i am thinking that'
checkpoint_directory = 'ckpt'

# Start from an empty checkpoint directory: anything left over from an earlier
# run is deleted before the directory is (re)created.
if tf.gfile.Exists(checkpoint_directory):
    tf.gfile.DeleteRecursively(checkpoint_directory)
tf.gfile.MakeDirs(checkpoint_directory)
In [10]:
# 9. Build the training graph: a single-layer LSTM written out gate by gate.
graph = tf.Graph()
with graph.as_default():
    global_step = tf.Variable(0)  # number of batches seen so far

    # Inputs arrive as character ids and are expanded to one-hot in the graph.
    batch_data_tensor = tf.placeholder(tf.int32, [batch_size, len_per_section])
    labels = tf.placeholder(tf.float32, [batch_size, char_size])
    data = tf.one_hot(batch_data_tensor, depth=char_size, dtype=tf.float32, axis=-1)

    def _weights(rows, cols):
        """Create a weight matrix with small zero-mean initial values.

        The 2nd/3rd arguments of tf.truncated_normal are (mean, stddev), not a
        (min, max) range, so they are passed by keyword to keep the mean at 0.
        """
        return tf.Variable(tf.truncated_normal([rows, cols], mean=0.0, stddev=0.1))

    # Each gate has input->hidden weights, hidden->hidden weights and a bias.
    # input gate
    w_ii = _weights(char_size, hidden_nodes)
    w_io = _weights(hidden_nodes, hidden_nodes)
    b_i = tf.Variable(tf.zeros([1, hidden_nodes]))
    # forget gate
    w_fi = _weights(char_size, hidden_nodes)
    w_fo = _weights(hidden_nodes, hidden_nodes)
    b_f = tf.Variable(tf.zeros([1, hidden_nodes]))
    # output gate
    w_oi = _weights(char_size, hidden_nodes)
    w_oo = _weights(hidden_nodes, hidden_nodes)
    b_o = tf.Variable(tf.zeros([1, hidden_nodes]))
    # memory cell (candidate values)
    w_ci = _weights(char_size, hidden_nodes)
    w_co = _weights(hidden_nodes, hidden_nodes)
    b_c = tf.Variable(tf.zeros([1, hidden_nodes]))

    def lstm(i, o, state):
        """Run one LSTM time step.

        Args:
            i: input for this step, one-hot, shape [batch, char_size].
            o: output of the previous step, shape [batch, hidden_nodes].
            state: cell state of the previous step, shape [batch, hidden_nodes].

        Returns:
            (output, state) for this step, both of shape [batch, hidden_nodes].
        """
        input_gate = tf.sigmoid(tf.matmul(i, w_ii) + tf.matmul(o, w_io) + b_i)
        forget_gate = tf.sigmoid(tf.matmul(i, w_fi) + tf.matmul(o, w_fo) + b_f)
        output_gate = tf.sigmoid(tf.matmul(i, w_oi) + tf.matmul(o, w_oo) + b_o)
        # Candidate values use tanh so the cell state can move in both
        # directions; a sigmoid here would only ever add non-negative values.
        memory_cell = tf.tanh(tf.matmul(i, w_ci) + tf.matmul(o, w_co) + b_c)
        # Keep part of the old state (forget gate) and add part of the
        # candidate (input gate).
        state = forget_gate * state + input_gate * memory_cell
        output = output_gate * tf.tanh(state)
        return output, state

    # Unroll the LSTM over the window. At every position the target is the next
    # character: taken from the window itself, except at the last position
    # where it is the separately fed label.
    output = tf.zeros([batch_size, hidden_nodes])
    state = tf.zeros([batch_size, hidden_nodes])
    step_outputs = []
    step_targets = []
    for i in range(len_per_section):
        output, state = lstm(data[:, i, :], output, state)
        step_outputs.append(output)
        if i < len_per_section - 1:
            step_targets.append(data[:, i + 1, :])
        else:
            step_targets.append(labels)
    # Stack all time steps along the batch axis (TF 0.x argument order:
    # concat(axis, values)).
    outputs_all_i = tf.concat(0, step_outputs)
    labels_all_i = tf.concat(0, step_targets)

    # Classifier projecting hidden activations onto the vocabulary.
    w = _weights(hidden_nodes, char_size)
    b = tf.Variable(tf.zeros([char_size]))
    logits = tf.matmul(outputs_all_i, w) + b
    loss = tf.reduce_mean(tf.nn.softmax_cross_entropy_with_logits(logits, labels_all_i))
    # NOTE(review): a fixed learning rate of 10. with no decay or gradient
    # clipping is aggressive for an unrolled LSTM -- confirm it is intended.
    optimizer = tf.train.GradientDescentOptimizer(10.).minimize(loss, global_step=global_step)
In [11]:
# 10. Train the model, logging the loss and checkpointing periodically.
with tf.Session(graph=graph) as sess:
    tf.initialize_all_variables().run()
    saver = tf.train.Saver()
    num_examples = len(X)
    offset = 0
    for step in range(max_steps):
        # Take `batch_size` consecutive rows starting at `offset`, wrapping
        # around to the start of the data set when the end is reached.
        batch_indices = (offset + np.arange(batch_size)) % num_examples
        batch_data = X[batch_indices]
        batch_labels = y[batch_indices]
        offset = (offset + batch_size) % num_examples

        _, training_loss = sess.run(
            [optimizer, loss],
            feed_dict={batch_data_tensor: batch_data, labels: batch_labels})

        # Log at the configured interval instead of a hard-coded one.
        if step % log_every == 0:
            print('Training loss at step %d: %.2f (%s)' % (step, training_loss, datetime.datetime.now()))
        if step % save_every == 0:
            saver.save(sess, checkpoint_directory + '/model', global_step=step)
            print('Model Saved!')

    # Save once more after the last step, so training done since the last
    # periodic checkpoint is not lost.
    if max_steps > 0 and (max_steps - 1) % save_every != 0:
        saver.save(sess, checkpoint_directory + '/model', global_step=max_steps - 1)
        print('Model Saved!')
In [11]:
# 11. Generate text with the most recent checkpoint, primed on `test_start`.
test_start = 'I plan to make the world a better place.'
with tf.Session(graph=graph) as sess:
    with tf.device("/cpu:0"):
        # Init graph, load model. The Saver is created before the sampling
        # variables below exist, so it only restores the trained variables.
        tf.initialize_all_variables().run()
        model = tf.train.latest_checkpoint(checkpoint_directory)
        saver = tf.train.Saver()
        saver.restore(sess, model)

        # Single-character sampling network that reuses the trained weights.
        test_data = tf.placeholder(tf.float32, shape=[1, char_size])
        # The recurrent output/state live in variables so they survive from one
        # session.run call to the next.
        saved_test_output = tf.Variable(tf.zeros([1, hidden_nodes]), trainable=False)
        saved_test_state = tf.Variable(tf.zeros([1, hidden_nodes]), trainable=False)
        reset_test_state = tf.group(
            saved_test_output.assign(tf.zeros([1, hidden_nodes])),
            saved_test_state.assign(tf.zeros([1, hidden_nodes])))
        test_output, test_state = lstm(test_data, saved_test_output, saved_test_state)
        # Write the new output/state back before the prediction is produced.
        # Without this every run would start again from the zero state and the
        # priming text would have no influence on what is generated.
        with tf.control_dependencies([saved_test_output.assign(test_output),
                                      saved_test_state.assign(test_state)]):
            test_prediction = tf.nn.softmax(tf.matmul(test_output, w) + b)

        # Start from a clean recurrent state.
        reset_test_state.run()
        test_generated = test_start

        # Prime the network: feed every character of the prompt except the
        # last one; the predictions are discarded, only the state update matters.
        for i in range(len(test_start) - 1):
            test_X = np.zeros((1, char_size))
            test_X[0, char2id[test_start[i]]] = 1.
            sess.run(test_prediction, feed_dict={test_data: test_X})

        # The last prompt character is the first input of the generation loop.
        test_X = np.zeros((1, char_size))
        test_X[0, char2id[test_start[-1]]] = 1.

        # Generate 500 characters, feeding each sampled character back in.
        for i in range(500):
            # probability of every character being the next one
            prediction = test_prediction.eval({test_data: test_X})[0]
            # sample one character (one-hot) from that distribution
            next_char_one_hot = sample(prediction)
            # convert the one-hot vector back to its character
            next_char = id2char[np.argmax(next_char_one_hot)]
            test_generated += next_char
            # the sampled character becomes the next input
            test_X = next_char_one_hot.reshape((1, char_size))
        print(test_generated)
In [ ]: