In [1]:
from pyspark import SparkContext, SparkConf, StorageLevel
from pyspark.sql import SQLContext, HiveContext
from pyspark.sql.functions import *
sc = SparkContext()
sqlContext = SQLContext(sc)

In [3]:
#Importing other Libraries
from np_extractor import *
import nltk
from nltk.corpus import stopwords
#from rake import *
import json
import os
import pandas as pd

In [4]:
def get_rdd(base, input, num_part):
    base_dir = os.path.join(base)
    input_path = os.path.join(input)
    file_name = os.path.join(base_dir, input_path)
    # load the line-delimited JSON file and parse each line into a dict
    rdd = sc.textFile(file_name, num_part)
    rdd_j = rdd.map(json.loads)
    rdd_j.cache()
    return rdd_j

In [5]:
#Read Data file in sparkSQL
#reviews = sqlContext.read.json("../data/reviews_electronics5000.json")
#reviews.persist(storageLevel=StorageLevel.MEMORY_AND_DISK_SER)

num_part = 16
revs = get_rdd('../data', 'reviews_electronics5000.json', num_part)
rev_texts = revs.map(lambda x: (x['asin'], x['reviewText']))
#rev_agg_texts = rev_texts.map(lambda (asin, text): (asin, [text])).reduceByKey(lambda x, y: x + y)
allRevs = rev_texts.map(lambda (asin,text): text)
allRevs.cache()


Out[5]:
PythonRDD[7] at RDD at PythonRDD.scala:43

In [6]:
revDB = sqlContext.read.json("../data/reviews_electronics5000.json")
revDB.cache()
revDB.printSchema()


root
 |-- asin: string (nullable = true)
 |-- helpful: array (nullable = true)
 |    |-- element: long (containsNull = true)
 |-- overall: double (nullable = true)
 |-- reviewText: string (nullable = true)
 |-- reviewTime: string (nullable = true)
 |-- reviewerID: string (nullable = true)
 |-- reviewerName: string (nullable = true)
 |-- summary: string (nullable = true)
 |-- unixReviewTime: long (nullable = true)
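
In [ ]:
# Hedged example (not in the original notebook): a quick aggregate over the cached
# review DataFrame, just to sanity-check the schema above -- average star rating per product.
revDB.groupBy('asin').agg(avg('overall').alias('avg_rating')).show(5)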


In [7]:
metadataDB = sqlContext.read.json("../data/meta_electronics.json")
metadataDB.cache()
metadataDB.printSchema()


root
 |-- asin: string (nullable = true)
 |-- brand: string (nullable = true)
 |-- categories: array (nullable = true)
 |    |-- element: array (containsNull = true)
 |    |    |-- element: string (containsNull = true)
 |-- description: string (nullable = true)
 |-- imUrl: string (nullable = true)
 |-- price: double (nullable = true)
 |-- related: struct (nullable = true)
 |    |-- also_bought: array (nullable = true)
 |    |    |-- element: string (containsNull = true)
 |    |-- also_viewed: array (nullable = true)
 |    |    |-- element: string (containsNull = true)
 |    |-- bought_together: array (nullable = true)
 |    |    |-- element: string (containsNull = true)
 |    |-- buy_after_viewing: array (nullable = true)
 |    |    |-- element: string (containsNull = true)
 |-- salesRank: struct (nullable = true)
 |    |-- Arts, Crafts & Sewing: long (nullable = true)
 |    |-- Automotive: long (nullable = true)
 |    |-- Baby: long (nullable = true)
 |    |-- Beauty: long (nullable = true)
 |    |-- Camera & Photo: long (nullable = true)
 |    |-- Cell Phones & Accessories: long (nullable = true)
 |    |-- Clothing: long (nullable = true)
 |    |-- Computers & Accessories: long (nullable = true)
 |    |-- Electronics: long (nullable = true)
 |    |-- Grocery & Gourmet Food: long (nullable = true)
 |    |-- Health & Personal Care: long (nullable = true)
 |    |-- Home & Kitchen: long (nullable = true)
 |    |-- Home Improvement: long (nullable = true)
 |    |-- Industrial & Scientific: long (nullable = true)
 |    |-- Jewelry: long (nullable = true)
 |    |-- Kitchen & Dining: long (nullable = true)
 |    |-- Magazines: long (nullable = true)
 |    |-- Movies & TV: long (nullable = true)
 |    |-- Musical Instruments: long (nullable = true)
 |    |-- Office Products: long (nullable = true)
 |    |-- Patio, Lawn & Garden: long (nullable = true)
 |    |-- Pet Supplies: long (nullable = true)
 |    |-- Shoes: long (nullable = true)
 |    |-- Software: long (nullable = true)
 |    |-- Sports & Outdoors: long (nullable = true)
 |    |-- Toys & Games: long (nullable = true)
 |    |-- Video Games: long (nullable = true)
 |    |-- Watches: long (nullable = true)
 |-- title: string (nullable = true)
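
In [ ]:
# Hedged example (not in the original notebook): the nested salesRank struct shown above
# can be addressed with dot notation, e.g. the Electronics sales rank per product.
metadataDB.select('asin', 'title', 'salesRank.Electronics', 'price').show(5)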


In [10]:
fullData = revDB.join(metadataDB, revDB['asin'] == metadataDB['asin'])  # join on asin; without a condition this would be a cartesian product
fullData.printSchema()


root
 |-- asin: string (nullable = true)
 |-- helpful: array (nullable = true)
 |    |-- element: long (containsNull = true)
 |-- overall: double (nullable = true)
 |-- reviewText: string (nullable = true)
 |-- reviewTime: string (nullable = true)
 |-- reviewerID: string (nullable = true)
 |-- reviewerName: string (nullable = true)
 |-- summary: string (nullable = true)
 |-- unixReviewTime: long (nullable = true)
 |-- asin: string (nullable = true)
 |-- brand: string (nullable = true)
 |-- categories: array (nullable = true)
 |    |-- element: array (containsNull = true)
 |    |    |-- element: string (containsNull = true)
 |-- description: string (nullable = true)
 |-- imUrl: string (nullable = true)
 |-- price: double (nullable = true)
 |-- related: struct (nullable = true)
 |    |-- also_bought: array (nullable = true)
 |    |    |-- element: string (containsNull = true)
 |    |-- also_viewed: array (nullable = true)
 |    |    |-- element: string (containsNull = true)
 |    |-- bought_together: array (nullable = true)
 |    |    |-- element: string (containsNull = true)
 |    |-- buy_after_viewing: array (nullable = true)
 |    |    |-- element: string (containsNull = true)
 |-- salesRank: struct (nullable = true)
 |    |-- Arts, Crafts & Sewing: long (nullable = true)
 |    |-- Automotive: long (nullable = true)
 |    |-- Baby: long (nullable = true)
 |    |-- Beauty: long (nullable = true)
 |    |-- Camera & Photo: long (nullable = true)
 |    |-- Cell Phones & Accessories: long (nullable = true)
 |    |-- Clothing: long (nullable = true)
 |    |-- Computers & Accessories: long (nullable = true)
 |    |-- Electronics: long (nullable = true)
 |    |-- Grocery & Gourmet Food: long (nullable = true)
 |    |-- Health & Personal Care: long (nullable = true)
 |    |-- Home & Kitchen: long (nullable = true)
 |    |-- Home Improvement: long (nullable = true)
 |    |-- Industrial & Scientific: long (nullable = true)
 |    |-- Jewelry: long (nullable = true)
 |    |-- Kitchen & Dining: long (nullable = true)
 |    |-- Magazines: long (nullable = true)
 |    |-- Movies & TV: long (nullable = true)
 |    |-- Musical Instruments: long (nullable = true)
 |    |-- Office Products: long (nullable = true)
 |    |-- Patio, Lawn & Garden: long (nullable = true)
 |    |-- Pet Supplies: long (nullable = true)
 |    |-- Shoes: long (nullable = true)
 |    |-- Software: long (nullable = true)
 |    |-- Sports & Outdoors: long (nullable = true)
 |    |-- Toys & Games: long (nullable = true)
 |    |-- Video Games: long (nullable = true)
 |    |-- Watches: long (nullable = true)
 |-- title: string (nullable = true)
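
In [ ]:
# Hedged follow-up (not in the original notebook): the joined frame keeps an asin column
# from each side; selecting through the source DataFrames avoids the ambiguous reference.
fullData.select(revDB['asin'], revDB['overall'], revDB['reviewText'],
                metadataDB['title'], metadataDB['price']).show(5)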


In [12]:
#Word count
counts = allRevs.flatMap(lambda line: line.split(" ")).map(lambda word: (word, 1)).reduceByKey(lambda a, b: a + b)
sorted_counts = counts.sortBy(lambda (word, count): count)  # sortBy returns a new RDD; assign it or the sort is lost
countsDF = counts.toDF()
countsDF.filter(countsDF['_2'] > 50).show()


+-------+----+
|     _1|  _2|
+-------+----+
|       |9701|
|   four|  82|
|looking| 307|
|   even| 755|
|    up.| 109|
|     me|1217|
|  enjoy| 101|
|   will|1345|
| better| 491|
|  sound|  75|
|   high| 123|
|  hours| 196|
|website|  66|
|  large| 117|
|   note|  54|
|   iPad| 181|
|   plug| 146|
|   cord|  97|
|    box| 107|
|current|  53|
+-------+----+
only showing top 20 rows


In [ ]:
df = pd.DataFrame(sorted_counts.collect(), columns=['word', 'count'])
df.to_csv('../data/processed/wordcount.csv', index=False)
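
In [ ]:
# Hedged variant (not in the original notebook): the raw counts above are dominated by the
# empty string and common function words, so a count filtered with the NLTK stopword list
# (imported at the top of the notebook) is usually more informative.
nltk_stops = set(stopwords.words('english'))
filtered_counts = (allRevs.flatMap(lambda line: line.lower().split())
                          .filter(lambda w: w and w not in nltk_stops)
                          .map(lambda w: (w, 1))
                          .reduceByKey(lambda a, b: a + b))
filtered_counts.sortBy(lambda (w, c): c, ascending=False).take(20)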

In [ ]:
#Syntax for NLTK
#tokens = nltk.word_tokenize(text)
#tagged = nltk.pos_tag(tokens)
#from nltk.corpus import stopwords
#stopwords.words('english')
#nltk_stopwords = stopwords.words('english')
#other_stopwords = 
#from nltk.corpus import wordnet as wn
# words = data.flatMap(lambda x: nltk.word_tokenize(x))
# print words.take(10)
# pos_word = words.map(lambda x: nltk.pos_tag([x]))
# print pos_word.take(5)
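
In [ ]:
# Hedged example (not in the original notebook): the NLTK calls sketched above applied to
# one sample sentence, so the expected shapes are clear before mapping them over RDDs.
sample = "This charger works great and the cable is very sturdy."
tokens = nltk.word_tokenize(sample)          # e.g. ['This', 'charger', 'works', ...]
tagged_sample = nltk.pos_tag(tokens)         # e.g. [('This', 'DT'), ('charger', 'NN'), ...]
nltk_stopwords = set(stopwords.words('english'))
content_words = [w for w in tokens if w.lower() not in nltk_stopwords]
print(tagged_sample)
print(content_words)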

In [ ]:
# tokenize each review text from the cached review DataFrame loaded above
modedReviews = revDB.map(lambda p: p.reviewText.split())

In [ ]:
testData = modedReviews.collect()

In [ ]:
tagged = []
for tokens in testData:
    tagged.append(nltk.pos_tag(tokens))  # rows of testData are already token lists, so tag them directly
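
In [ ]:
# Hedged alternative (not in the original notebook): tag on the executors instead of
# collecting every review to the driver first. Assumes NLTK and its tagger models are
# available on all worker nodes.
tagged_rdd = modedReviews.map(lambda tokens: nltk.pos_tag(tokens))
tagged_rdd.take(2)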

In [ ]:
import numpy as np
import lda
import lda.datasets
X = lda.datasets.load_reuters()
vocab = lda.datasets.load_reuters_vocab()
titles = lda.datasets.load_reuters_titles()
X.shape

In [ ]:
X.sum()
model = lda.LDA(n_topics=20, n_iter=1500, random_state=1)
model.fit(X)  # model.fit_transform(X) is also available
topic_word = model.topic_word_  # model.components_ also works
n_top_words = 8
for i, topic_dist in enumerate(topic_word):
    topic_words = np.array(vocab)[np.argsort(topic_dist)][:-(n_top_words+1):-1]
    print('Topic {}: {}'.format(i, ' '.join(topic_words)))
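
In [ ]:
# Hedged sketch (not in the original notebook): the cell above runs lda on its bundled
# Reuters demo data; one way to run it on the review texts instead is to build an integer
# document-term matrix with scikit-learn's CountVectorizer (an assumption -- any integer
# count matrix works as input to lda.LDA).
from sklearn.feature_extraction.text import CountVectorizer

docs = allRevs.collect()                               # the 5000-review sample fits on the driver
vectorizer = CountVectorizer(stop_words='english', max_features=5000)
X_rev = vectorizer.fit_transform(docs)                 # sparse doc-term count matrix
rev_vocab = vectorizer.get_feature_names()

rev_model = lda.LDA(n_topics=10, n_iter=500, random_state=1)
rev_model.fit(X_rev)
for i, dist in enumerate(rev_model.topic_word_):
    top_words = np.array(rev_vocab)[np.argsort(dist)][:-9:-1]
    print('Topic {}: {}'.format(i, ' '.join(top_words)))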

In [ ]:
# inputs assumed from earlier steps: rev_agg_texts = (asin, [review]) and prods_ = (asin, (d_prod, [category]))
# (a sketch that builds both appears in the next cell)
# joined shape: (asin, ([review], (d_prod, [category])) )
items = rev_agg_texts.join(prods_)
items = items.map( lambda (asin, (reviews, (d_prod, categories))): (asin, reviews, d_prod, categories) )


# 1. RAKE: run the RAKE algorithm over each asin's reviews to extract scored keywords (the top ~10 per asin can be taken downstream)
from rake import *  # local rake module (same import that is commented out at the top of the notebook)
rake = Rake('data/MergedStopList.txt') # TODO: add more into this list
items_wk = items.map( lambda (asin, reviews, d_prod, categories): (asin, rake.run(' '.join(reviews)), reviews, d_prod, categories) )

# 2. NP: keep only the noun phrases among these keywords
import nltk
from scripts.np_extractor import *
items_wk.cache()


items_np = items_wk.map(lambda (asin, pairs, reviews, d_prod, categories): 
                               (asin, [(NPExtractor(string).extract(), score) for (string, score) in pairs], reviews, d_prod, categories)
                       )


items_np = items_np.map(lambda (asin, pairs, reviews, d_prod, categories):
                               (asin, [(toks, scr) for (toks, scr) in pairs if len(toks) > 0], reviews, d_prod, categories)
                       )



# 3. output
import pandas as pd
df = pd.DataFrame(items_np.collect(), columns=['asin', 'keywords', 'reviews', 'd_prod', 'categories'])
df.to_csv('data/processed/computers_kw.csv', index=False)
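
In [ ]:
# Hedged sketch (not in the original notebook): one way to build the two inputs the cell
# above assumes. rev_agg_texts groups review texts by asin (the commented-out line in the
# data-loading cell does the same); prods_ is assumed to pair each asin with its
# description and first category path from metadataDB. Run this before the cell above.
rev_agg_texts = rev_texts.map(lambda (asin, text): (asin, [text])).reduceByKey(lambda x, y: x + y)
prods_ = metadataDB.map(lambda row: (row.asin, (row.description, row.categories[0] if row.categories else [])))
prods_.cache()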

In [ ]: