In [1]:
# Read data: one review per line in reviews.txt, the matching label on the
# same line number in labels.txt.
# `with` guarantees the handles are closed even if reading raises, and
# rstrip('\n') removes only the line terminator, so a final line that lacks a
# trailing newline no longer loses its last character.
with open('reviews.txt', 'r') as reviews_file:
    reviews = [line.rstrip('\n') for line in reviews_file]

# Labels are upper-cased so they compare equal to the 'POSITIVE' / 'NEGATIVE'
# literals used by SentimentNetwork.
with open('labels.txt', 'r') as labels_file:
    labels = [line.rstrip('\n').upper() for line in labels_file]
In [2]:
from collections import Counter
import numpy as np
import time
import sys
In [3]:
# Display the first review as a quick check that reviews.txt was read.
reviews[0]
Out[3]:
In [4]:
# Display the first label; it belongs to the review at the same index.
labels[0]
Out[4]:
In [21]:
class SentimentNetwork:
    """Two-layer network that classifies a review as 'POSITIVE' or 'NEGATIVE'.

    The input layer is a binary bag-of-words vector over the training
    vocabulary, the hidden layer is linear (no activation), and the single
    output unit uses a sigmoid. Words are lower-cased both when the
    vocabulary is built and when a review is encoded, so training and
    prediction agree on how words are matched.
    """

    def __init__(self, reviews, labels, hidden_nodes = 10, learning_rate = 0.1):
        """Build the vocabulary from the data and allocate the network.

        Args:
            reviews: list of review strings used to build the word vocabulary.
            labels: list of label strings used to build the label vocabulary.
            hidden_nodes: number of units in the hidden layer.
            learning_rate: step size applied to every weight update.
        """
        # Seed NumPy's global generator so the random output weights are the
        # same on every construction.
        np.random.seed(1)
        self.pre_process_data(reviews, labels)
        self.init_network(self.review_vocab_size, hidden_nodes, 1, learning_rate)

    def pre_process_data(self, reviews, labels):
        """Collect the word and label vocabularies and their index lookups.

        Sets review_vocab, review_vocab_size, label_vocab, label_vocab_size,
        word2index and label2index on the instance.
        """
        review_vocab = set()
        for review in reviews:
            # Lower-case here so the vocabulary matches what
            # update_input_layer looks up.
            review_vocab.update(review.lower().split(' '))
        # Sorting gives a stable word -> index mapping; iteration order of a
        # set of strings can differ between interpreter runs.
        self.review_vocab = sorted(review_vocab)
        self.review_vocab_size = len(self.review_vocab)

        self.label_vocab = sorted(set(labels))
        self.label_vocab_size = len(self.label_vocab)

        self.word2index = {word: i for i, word in enumerate(self.review_vocab)}
        self.label2index = {label: i for i, label in enumerate(self.label_vocab)}

    def init_network(self, input_nodes, hidden_nodes, output_nodes, learning_rate):
        """Store the layer sizes and create the weight matrices.

        Input-to-hidden weights start at zero; hidden-to-output weights are
        drawn from a normal distribution with standard deviation
        output_nodes ** -0.5.
        """
        self.input_nodes = input_nodes
        self.hidden_nodes = hidden_nodes
        self.output_nodes = output_nodes
        self.learning_rate = learning_rate

        self.weights_0_1 = np.zeros((self.input_nodes, self.hidden_nodes))
        self.weights_1_2 = np.random.normal(0.0, self.output_nodes**-.5,
                                            (self.hidden_nodes, self.output_nodes))

        # Reusable input buffer of shape (1, input_nodes); it is overwritten
        # for every review by update_input_layer.
        self.layer_0 = np.zeros((1, self.input_nodes))

    def update_input_layer(self, review):
        """Encode `review` into self.layer_0 as a 0/1 word-presence vector.

        Words are matched case-insensitively; words that are not in the
        vocabulary are ignored.
        """
        self.layer_0 *= 0
        for word in review.lower().split(' '):
            idx = self.word2index.get(word)
            if idx is not None:
                self.layer_0[0][idx] = 1

    def sigmoid(self, x):
        """Return the logistic function 1 / (1 + exp(-x)) of x."""
        return 1.0 / (1.0 + np.exp(-x))

    def sigmoid_output_2_derivative(self, output):
        """Return the sigmoid derivative given the sigmoid's *output* value."""
        return output * (1.0 - output)

    def get_target_for_label(self, label):
        """Return 1 for the label 'POSITIVE' and 0 for any other label."""
        if label == 'POSITIVE':
            return 1
        else:
            return 0

    def _write_progress(self, i, total, start, correct, count_label, accuracy_label):
        """Overwrite the current stdout line with progress, speed and accuracy."""
        elapsed_time = float(time.time() - start)
        reviews_per_second = i / elapsed_time if elapsed_time > 0 else 0
        sys.stdout.write("\rProgress:" + str(100 * i/float(total))[:4]
                         + "% Speed(reviews/sec):" + str(reviews_per_second)[0:5]
                         + " #Correct:" + str(correct) + " " + count_label + ":" + str(i+1)
                         + " " + accuracy_label + ":" + str(correct * 100 / float(i+1))[:4] + "%")

    def train(self, training_reviews, training_labels):
        """Run one pass of per-review gradient descent over the training data.

        Args:
            training_reviews: list of review strings.
            training_labels: list of labels, same length as training_reviews.

        Raises:
            AssertionError: if the two lists differ in length.
        """
        assert(len(training_reviews) == len(training_labels))
        start = time.time()
        correct_so_far = 0
        for i in range(len(training_reviews)):
            review = training_reviews[i]
            label = training_labels[i]

            # Forward pass: linear hidden layer, sigmoid output.
            self.update_input_layer(review)
            layer_1 = self.layer_0.dot(self.weights_0_1)
            layer_2 = self.sigmoid(layer_1.dot(self.weights_1_2))

            # Backward pass. The hidden layer has no activation, so its delta
            # is the back-propagated error unchanged. Both deltas are computed
            # before either weight matrix is modified.
            layer_2_error = layer_2 - self.get_target_for_label(label)
            layer_2_delta = layer_2_error * self.sigmoid_output_2_derivative(layer_2)
            layer_1_error = layer_2_delta.dot(self.weights_1_2.T)
            layer_1_delta = layer_1_error

            self.weights_1_2 -= layer_1.T.dot(layer_2_delta) * self.learning_rate
            self.weights_0_1 -= self.layer_0.T.dot(layer_1_delta) * self.learning_rate

            # Running accuracy uses the prediction made before this update.
            if layer_2 >= 0.5 and label == 'POSITIVE':
                correct_so_far += 1
            elif layer_2 < 0.5 and label == 'NEGATIVE':
                correct_so_far += 1

            self._write_progress(i, len(training_reviews), start, correct_so_far,
                                 "#Trained", "Training Accuracy")
            # Start a fresh output line every 2500 reviews so earlier progress
            # lines stay visible.
            if(i % 2500 == 0):
                print("")

    def run(self, review):
        """Return 'POSITIVE' or 'NEGATIVE' for a single review string."""
        self.update_input_layer(review)
        layer_1 = self.layer_0.dot(self.weights_0_1)
        layer_2 = self.sigmoid(layer_1.dot(self.weights_1_2))
        if layer_2 >= 0.5:
            return 'POSITIVE'
        else:
            return 'NEGATIVE'

    def test(self, testing_reviews, testing_labels):
        """Predict every review and report running accuracy on stdout.

        Args:
            testing_reviews: list of review strings.
            testing_labels: list of labels, same length as testing_reviews.

        Raises:
            AssertionError: if the two lists differ in length.
        """
        assert(len(testing_reviews) == len(testing_labels))
        correct = 0
        start = time.time()
        for i in range(len(testing_reviews)):
            review = testing_reviews[i]
            label = testing_labels[i]
            prediction = self.run(review)
            if prediction == label:
                correct += 1
            self._write_progress(i, len(testing_reviews), start, correct,
                                 "#Tested", "Testing Accuracy")
In [22]:
# Build the vocabulary and network from every review except the last 1,000.
mlp = SentimentNetwork(reviews[:-1000],labels[:-1000], learning_rate=0.01)
In [23]:
# One training pass over every review except the last 1,000.
mlp.train(reviews[:-1000], labels[:-1000])
In [24]:
# Measure accuracy on the last 1,000 reviews, which the training call does not include.
mlp.test(reviews[-1000:], labels[-1000:])
In [ ]: