In [1]:
from pyspark import SparkContext, SparkConf, SQLContext, HiveContext, StorageLevel
from pyspark.sql.functions import *
from pyspark.mllib.feature import HashingTF

sc = SparkContext()
sqlContext = SQLContext(sc)

In [2]:
#Importing other Libraries
from np_extractor import *
import nltk
from nltk.corpus import stopwords
#from rake import *
import json
import os
import pandas as pd

In [3]:
#Read Data file in sparkSQL
# reviews = sqlContext.read.json("../data/reviews_electronics5000.json")
# reviews.persist(storageLevel=StorageLevel.MEMORY_AND_DISK_SER)
revDB = sqlContext.read.json("../data/reviews_electronics5000.json")
metadataDB = sqlContext.read.json("../data/meta_electronics.json")
fullData = revDB.join(metadataDB)
fullData.printSchema()


root
 |-- asin: string (nullable = true)
 |-- helpful: array (nullable = true)
 |    |-- element: long (containsNull = true)
 |-- overall: double (nullable = true)
 |-- reviewText: string (nullable = true)
 |-- reviewTime: string (nullable = true)
 |-- reviewerID: string (nullable = true)
 |-- reviewerName: string (nullable = true)
 |-- summary: string (nullable = true)
 |-- unixReviewTime: long (nullable = true)
 |-- asin: string (nullable = true)
 |-- brand: string (nullable = true)
 |-- categories: array (nullable = true)
 |    |-- element: array (containsNull = true)
 |    |    |-- element: string (containsNull = true)
 |-- description: string (nullable = true)
 |-- imUrl: string (nullable = true)
 |-- price: double (nullable = true)
 |-- related: struct (nullable = true)
 |    |-- also_bought: array (nullable = true)
 |    |    |-- element: string (containsNull = true)
 |    |-- also_viewed: array (nullable = true)
 |    |    |-- element: string (containsNull = true)
 |    |-- bought_together: array (nullable = true)
 |    |    |-- element: string (containsNull = true)
 |    |-- buy_after_viewing: array (nullable = true)
 |    |    |-- element: string (containsNull = true)
 |-- salesRank: struct (nullable = true)
 |    |-- Arts, Crafts & Sewing: long (nullable = true)
 |    |-- Automotive: long (nullable = true)
 |    |-- Baby: long (nullable = true)
 |    |-- Beauty: long (nullable = true)
 |    |-- Camera & Photo: long (nullable = true)
 |    |-- Cell Phones & Accessories: long (nullable = true)
 |    |-- Clothing: long (nullable = true)
 |    |-- Computers & Accessories: long (nullable = true)
 |    |-- Electronics: long (nullable = true)
 |    |-- Grocery & Gourmet Food: long (nullable = true)
 |    |-- Health & Personal Care: long (nullable = true)
 |    |-- Home & Kitchen: long (nullable = true)
 |    |-- Home Improvement: long (nullable = true)
 |    |-- Industrial & Scientific: long (nullable = true)
 |    |-- Jewelry: long (nullable = true)
 |    |-- Kitchen & Dining: long (nullable = true)
 |    |-- Magazines: long (nullable = true)
 |    |-- Movies & TV: long (nullable = true)
 |    |-- Musical Instruments: long (nullable = true)
 |    |-- Office Products: long (nullable = true)
 |    |-- Patio, Lawn & Garden: long (nullable = true)
 |    |-- Pet Supplies: long (nullable = true)
 |    |-- Shoes: long (nullable = true)
 |    |-- Software: long (nullable = true)
 |    |-- Sports & Outdoors: long (nullable = true)
 |    |-- Toys & Games: long (nullable = true)
 |    |-- Video Games: long (nullable = true)
 |    |-- Watches: long (nullable = true)
 |-- title: string (nullable = true)


In [ ]:
fullData.groupBy(fullData['categories']).count().show()

In [5]:
#Read Data file in sparkSQL
#reviews = sqlContext.read.json("../data/reviews_electronics5000.json")
#reviews.persist(storageLevel=StorageLevel.MEMORY_AND_DISK_SER)

num_part = 16
revs = get_rdd('../data', 'reviews_electronics5000.json', num_part)
rev_texts = revs.map(lambda x: (x['asin'], x['reviewText']))
#rev_agg_texts = rev_texts.map(lambda (asin, text): (asin, [text])).reduceByKey(lambda x, y: x + y)
allRevs = rev_texts.map(lambda (asin,text): text)
#allRevs.cache()

In [6]:
metadata = get_rdd('../data','meta_electronics.json',num_part)

In [7]:
text_file = open("../data/MergedStopList.txt", "r")
lines = text_file.readlines()
stopwords = [""]
for line in lines:
    if "#" not in line:
        stopwords.append(line.strip())

import nltk.corpus
#stopwords.words('english')
nltk_stopwords = nltk.corpus.stopwords.words('english')
stopwords.extend(nltk_stopwords)

In [9]:
#Cleaning up
counts = allRevs.flatMap(lambda line: line.split(" "))
counts = counts.flatMap(lambda word: nltk.word_tokenize(word))
counts = counts.map(lambda word: (word.lower(), 1)).reduceByKey(lambda a, b: a + b)
counts = counts.filter(lambda x: len(x[0]) > 2)
counts = counts.filter(lambda x: x[0].isalnum())
filteredCounts = counts.filter(lambda x: x[0] not in stopwords)
filteredCounts = filteredCounts.cache()
#filteredCounts.sortBy(lambda (word, count): count)
#countsDF = filteredCounts.toDF()
#filteredCounts.toDF().sort(desc("_2"))
vocabulary = filteredCounts.map(lambda x : x[0]).collect()

In [10]:
hashingTF = HashingTF()
tf = hashingTF.transform(documents)

In [11]:
filteredCounts = filteredCounts.cache()

In [12]:
posTaggedWords = posTaggedWords.cache()

In [14]:
filteredCounts.take(5)


Out[14]:
[(u'precise', 4),
 (u'gripped', 2),
 (u'nookt', 1),
 (u'grandkids', 5),
 (u'3gi', 1)]

In [ ]:
posTaggedWords = filteredCounts.map(lambda (word,count): (word,nltk.pos_tag(word)[0][1],count))
df = pd.DataFrame(posTaggedWords.collect())
df.to_csv('../data/processed/posTaggedWords_final.csv')

In [ ]:
#Syntax for NLTK
#tokens = nltk.word_tokenize(text)
#tagged = nltk.pos_tag(tokens)
#from nltk.corpus import stopwords
#stopwords.words('english')
#nltk_stopwords = stopwords.words('english')
#other_stopwords = 
#from nltk.corpus import wordnet as wn
# words = data.flatMap(lambda x: nltk.word_tokenize(x))
# print words.take(10)
# pos_word = words.map(lambda x: nltk.pos_tag([x]))
# print pos_word.take(5)

In [16]:
import numpy as np
import lda
import lda.datasets
X = lda.datasets.load_reuters()
vocab = lda.datasets.load_reuters_vocab()
titles = lda.datasets.load_reuters_titles()
X.shape


Out[16]:
(395, 4258)

In [18]:



Out[18]:
array([[1, 0, 1, ..., 0, 0, 0],
       [7, 0, 2, ..., 0, 0, 0],
       [0, 0, 0, ..., 0, 0, 0],
       ..., 
       [1, 0, 1, ..., 0, 0, 0],
       [1, 0, 1, ..., 0, 0, 0],
       [1, 0, 1, ..., 0, 0, 0]], dtype=int32)

In [19]:
X.sum()
model = lda.LDA(n_topics=20, n_iter=1500, random_state=1)
model.fit(X)  # model.fit_transform(X) is also available
topic_word = model.topic_word_  # model.components_ also works
n_top_words = 8
for i, topic_dist in enumerate(topic_word):
    topic_words = np.array(vocab)[np.argsort(topic_dist)][:-(n_top_words+1):-1]
    print('Topic {}: {}'.format(i, ' '.join(topic_words)))


Topic 0: british churchill sale million major letters west britain
Topic 1: church government political country state people party against
Topic 2: elvis king fans presley life concert young death
Topic 3: yeltsin russian russia president kremlin moscow michael operation
Topic 4: pope vatican paul john surgery hospital pontiff rome
Topic 5: family funeral police miami versace cunanan city service
Topic 6: simpson former years court president wife south church
Topic 7: order mother successor election nuns church nirmala head
Topic 8: charles prince diana royal king queen parker bowles
Topic 9: film french france against bardot paris poster animal
Topic 10: germany german war nazi letter christian book jews
Topic 11: east peace prize award timor quebec belo leader
Topic 12: n't life show told very love television father
Topic 13: years year time last church world people say
Topic 14: mother teresa heart calcutta charity nun hospital missionaries
Topic 15: city salonika capital buddhist cultural vietnam byzantine show
Topic 16: music tour opera singer israel people film israeli
Topic 17: church catholic bernardin cardinal bishop wright death cancer
Topic 18: harriman clinton u.s ambassador paris president churchill france
Topic 19: city museum art exhibition century million churches set

In [ ]:
# 3. output
# import pandas as pd
# df = pd.DataFrame(items_np.collect())
# df.to_csv('data/processed/computers_kw.csv')

In [10]:
mylist = ['spam', 'ham', 'eggs']

In [13]:
a = ' '.join(mylist)

In [14]:
type (a)


Out[14]:
str

In [15]:
a


Out[15]:
'spam ham eggs'

In [ ]: