In [1]:
from pyspark import SparkContext, SparkConf, StorageLevel
from pyspark.sql import SQLContext, HiveContext
from pyspark.sql.functions import *
from pyspark.mllib.feature import HashingTF

In [2]:
sc = SparkContext()
sqlContext = SQLContext(sc)

In [3]:
#Importing other Libraries
from pyspark.mllib.linalg import Matrices, DenseMatrix,SparseMatrix
from np_extractor import *
import nltk
import nltk.corpus
#from rake import *
import json
import os
import pandas as pd

In [4]:
def get_rdd(base_dir, input_file, num_part):
    file_name = os.path.join(base_dir, input_file)
    # load the file and parse each line as a JSON record
    rdd = sc.textFile(file_name, num_part)
    rdd_j = rdd.map(json.loads)
    rdd_j.cache()
    return rdd_j

In [5]:
def load_stopwords():
    # merge the project stop list with NLTK's English stopwords
    stopwords = [""]
    with open("../data/MergedStopList.txt", "r") as text_file:
        for line in text_file:
            if "#" not in line:  # throwing out the comments
                stopwords.append(line.strip())

    nltk_stopwords = nltk.corpus.stopwords.words('english')
    stopwords.extend(nltk_stopwords)
    return set(stopwords)

stopwords = load_stopwords()

In [6]:
def cleanReview(text):
    #Input: a single review string; returns a list of lower-cased,
    #stopword-free, alphanumeric tokens
    clean_text = []
    for chunk in text.split(" "):
        for word in nltk.word_tokenize(chunk.lower()):
            if word not in stopwords and word.isalnum():
                clean_text.append(word)
    return clean_text

In [7]:
def cleanReviews(reviewList):
    #Input: a list of review strings; returns one cleaned, space-joined string
    clean_reviews = []
    for review in reviewList:
        clean_reviews.extend(cleanReview(review))
    return " ".join(clean_reviews)
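
A quick illustrative sanity check of these helpers; the exact tokens depend on the merged stop list, so the expected output in the comments is only indicative.

In [ ]:
# Illustrative sanity check: cleanReview lower-cases, tokenizes, and drops
# stopwords and punctuation; cleanReviews re-joins the surviving tokens.
sample = "This camera is AMAZING, and the battery lasts!"
print(cleanReview(sample))     # e.g. ['camera', 'amazing', 'battery', 'lasts']
print(cleanReviews([sample]))  # e.g. 'camera amazing battery lasts'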

In [37]:
#Read Data file in sparkSQL
#reviews = sqlContext.read.json("../data/reviews_electronics5000.json")
#reviews.persist(storageLevel=StorageLevel.MEMORY_AND_DISK_SER)
revDB = sqlContext.read.json("../data/reviews_electronics.json")
metadataDB = sqlContext.read.json("../data/meta_electronics.json")
metadataDB.persist(storageLevel=StorageLevel.MEMORY_AND_DISK_SER)
#
# fullData = revDB.join(metadataDB)
# fullData.printSchema()


Out[37]:
DataFrame[asin: string, brand: string, categories: array<array<string>>, description: string, imUrl: string, price: double, related: struct<also_bought:array<string>,also_viewed:array<string>,bought_together:array<string>,buy_after_viewing:array<string>>, salesRank: struct<Arts, Crafts & Sewing:bigint,Automotive:bigint,Baby:bigint,Beauty:bigint,Camera &amp; Photo:bigint,Cell Phones & Accessories:bigint,Clothing:bigint,Computers & Accessories:bigint,Electronics:bigint,Grocery & Gourmet Food:bigint,Health & Personal Care:bigint,Home &amp; Kitchen:bigint,Home Improvement:bigint,Industrial & Scientific:bigint,Jewelry:bigint,Kitchen & Dining:bigint,Magazines:bigint,Movies & TV:bigint,Musical Instruments:bigint,Office Products:bigint,Patio, Lawn & Garden:bigint,Pet Supplies:bigint,Shoes:bigint,Software:bigint,Sports &amp; Outdoors:bigint,Toys & Games:bigint,Video Games:bigint,Watches:bigint>, title: string]

In [44]:
import pickle
# pickled files should be opened in binary mode
with open('listOfSubcats', 'rb') as f:
    listOfSubCats = pickle.load(f)

In [ ]:
for category in listOfSubCats:
    # asins whose top-level category path contains this sub-category
    selected_asin_category = (metadataDB.map(lambda x: (x.asin, x.categories[0]))
                              .flatMap(lambda (asin, cats): [(asin, cat) for cat in cats])
                              .filter(lambda (asin, cat): category in cat)
                              .map(lambda (asin, cat): asin).distinct())

    # join the selected asins back to the reviews and keep only the review text
    category_reviews = (selected_asin_category.map(lambda asin: (asin, 0))
                        .join(revDB.map(lambda x: (x.asin, x.reviewText)))
                        .map(lambda joined: joined[1][1]).collect())

    outfile = open(category + '1612', 'w')
    print >> outfile, "\n".join(str(i) for i in category_reviews)
    outfile.close()

In [ ]:
#Reading RDD and getting the data
num_part = 16
revs = get_rdd('../data', 'reviews_electronics5000.json', num_part)
rev_texts = revs.map(lambda x: (x['asin'], x['reviewText']))
just_revs = revs.map(lambda x: (x['reviewText']))
just_revs = just_revs.collect()

In [ ]:
#For Generating Vocabulary
#Word count & Vocabulary Building
combined_revs = rev_texts.map(lambda (asin,text): text)
counts = combined_revs.flatMap(lambda line: line.split(" "))
counts = counts.flatMap(lambda word: nltk.word_tokenize(word))
counts = counts.map(lambda word: (word.lower(), 1)).reduceByKey(lambda a, b: a + b)
counts = counts.filter(lambda x: len(x[0]) > 2)
counts = counts.filter(lambda x: x[0].isalnum())
filteredCounts = counts.filter(lambda x: x[0] not in stopwords)
#filteredCounts.sortBy(lambda (word, count): count)
#countsDF = filteredCounts.toDF()

vocabulary = filteredCounts.map(lambda x : x[0]).collect() #Vocab length of 13121

In [ ]:
filteredCounts.toDF().sort(desc("_2")).show(50)

In [ ]:
#For creating BoW model
rev_agg_by_asin = rev_texts.map(lambda (asin, text): (asin, [text])).reduceByKey(lambda x, y: x + y)
clean_agg_revs = rev_agg_by_asin.map(lambda x: (x[0],cleanReviews(x[1])))
clean_agg_revs.cache()
#stopwords = sc.broadcast(stopwords)

In [ ]:
#Initializing HashingTF (default 2**20 features) and lookup tables from hashed index back to word
hashingTF = HashingTF()

from collections import defaultdict
hashMap1 = {}
hashMap2 = {}

for word in vocabulary:
    hashMap1[hashingTF.indexOf(word)] = word
    sparseVec = hashingTF.transform([word])
    hashMap2[sparseVec.indices[0]] = word
    
# clean_agg_revs values are space-joined strings, so split them back into term
# lists before hashing; transform() on a raw string would hash single characters
featurized_reviews = clean_agg_revs.map(lambda x: (x[0], hashingTF.transform(x[1].split())))
featurized_vocab = hashingTF.transform(vocabulary)
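
A small consistency check (illustrative only): indexOf() and transform() hash a term to the same feature index, which is why hashMap1 and hashMap2 end up with the same entries.

In [ ]:
# Illustrative check: both APIs agree on the hashed index for one vocabulary word
w = vocabulary[0]
print(hashingTF.indexOf(w))
print(hashingTF.transform([w]).indices[0])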

In [ ]:
#LDA Part
from pyspark.mllib.clustering import LDA, LDAModel
from pyspark.mllib.linalg import Vectors,DenseVector,SparseVector
numTopics = 5
# LDA expects an RDD of [document id, term-frequency vector]
corpus = featurized_reviews.zipWithIndex().map(lambda x: [x[1], x[0][1]]).cache()
ldaModel = LDA.train(corpus, k=numTopics)
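
A hedged inspection step can go here (it assumes Spark 1.6+, where PySpark's LDAModel exposes describeTopics): it lists the highest-weighted term indices per topic and maps them back to words through hashMap1.

In [ ]:
# Illustrative only: top ten terms per topic, translated back to words via the
# vocabulary lookup built above (unknown indices fall back to '?')
for t, (term_indices, term_weights) in enumerate(ldaModel.describeTopics(maxTermsPerTopic=10)):
    words = [hashMap1.get(idx, '?') for idx in term_indices]
    print("Topic %d: %s" % (t, ", ".join(words)))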

In [ ]:
vocabSize = ldaModel.vocabSize()
topics = ldaModel.topicsMatrix()
print("Topics shape: %s" % (topics.shape,))

In [ ]:
# tempMatrix = DenseMatrix(vocabSize,numTopics,tranTopics.flatten())
# sparseMatrix = tempMatrix.toSparse()
# print (sparseMatrix)
print hashMap1.get(237727)  # .get avoids a KeyError if this index is not in the vocabulary
print hashMap2.get(237727)

In [ ]:
#test = niceSparse(sparseMatrix)
for topic_index in range(topics.shape[1]):
    print("Words in topic %d are" % topic_index)
    for word_index in range(topics.shape[0]):
        if topics[word_index][topic_index] != 0:
            # is this hashed index one of the known vocabulary words?
            print word_index in hashMap2

In [ ]:
#SparseMatrix to (row, column, value) co-ordinate tuples
def niceSparse(sparse_mat):
    niceSparseMat = []
    cur_col = 0

    zipindval = zip(sparse_mat.rowIndices, sparse_mat.values)
    for i, (rowInd, value) in enumerate(zipindval):
        # advance to the next column once past its pointer boundary
        if sparse_mat.colPtrs[cur_col + 1] <= i:
            cur_col += 1
        if sparse_mat.isTransposed:
            # CSR layout: colPtrs walk the rows and rowIndices hold column indices
            niceSparseMat.append((cur_col, rowInd, value))
        else:
            niceSparseMat.append((rowInd, cur_col, value))
    return niceSparseMat
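
A tiny hand-built matrix (illustrative only) makes it easy to verify that niceSparse returns the expected (row, column, value) triples.

In [ ]:
# Illustrative check of niceSparse on a 3x2 CSC SparseMatrix:
# column 0 holds entries at rows 0 and 2, column 1 holds an entry at row 1.
from pyspark.mllib.linalg import SparseMatrix
tiny = SparseMatrix(3, 2, [0, 2, 3], [0, 2, 1], [1.0, 2.0, 3.0])
print(niceSparse(tiny))   # expected: [(0, 0, 1.0), (2, 0, 2.0), (1, 1, 3.0)]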

In [ ]:
#POS Tagging these items
# posTaggedWords = filteredCounts.map(lambda (word,count): (word,nltk.pos_tag(word)[0][1],count))
#posTaggedWords = posTaggedWords.cache()
#posTaggedWords.persist(storageLevel=StorageLevel.MEMORY_AND_DISK_SER)
# df = pd.DataFrame(posTaggedWords.collect())
# df.to_csv('../data/processed/posTaggedWords_final.csv')

In [ ]:
#Syntax for NLTK
#tokens = nltk.word_tokenize(text)
#tagged = nltk.pos_tag(tokens)
#from nltk.corpus import stopwords
#stopwords.words('english')
#nltk_stopwords = stopwords.words('english')
#other_stopwords = 
#from nltk.corpus import wordnet as wn
# words = data.flatMap(lambda x: nltk.word_tokenize(x))
# print words.take(10)
# pos_word = words.map(lambda x: nltk.pos_tag([x]))
# print pos_word.take(5)

a = featurized_reviews.collect()[1]
print len(a[1].toArray())  # equals HashingTF's feature-space size (2**20 by default)

In [ ]:
import numpy as np
import lda
import lda.datasets
X = lda.datasets.load_reuters()
vocab = lda.datasets.load_reuters_vocab()
titles = lda.datasets.load_reuters_titles()
print(X.shape)
len(vocab)

In [ ]:
#X.sum()
model = lda.LDA(n_topics=20, n_iter=1500, random_state=1)
model.fit(X)  # model.fit_transform(X) is also available
topic_word = model.topic_word_  # model.components_ also works
n_top_words = 8
for i, topic_dist in enumerate(topic_word):
    topic_words = np.array(vocab)[np.argsort(topic_dist)][:-(n_top_words+1):-1]
    print('Topic {}: {}'.format(i, ' '.join(topic_words)))

In [25]:
# 3. output
# import pandas as pd
# df = pd.DataFrame(items_np.collect())
# df.to_csv('data/processed/computers_kw.csv')
# textFile accepts a comma-separated list of paths: one file per exported category
reviews = sc.textFile('Digital SLR Cameras1612_200.txt,NAS_200.txt')
just_clean_reviews = reviews.map(lambda x: cleanReviews([x])).collect()
# just_clean_reviews = clean_agg_revs.map(lambda x: x[1]).collect()

In [26]:
from nltk.tokenize import RegexpTokenizer
from stop_words import get_stop_words
from nltk.stem.porter import PorterStemmer
from gensim import corpora, models
import gensim

tokenizer = RegexpTokenizer(r'\w+')

# create English stop words list
en_stop = get_stop_words('en')

# Create p_stemmer of class PorterStemmer
p_stemmer = PorterStemmer()
    

# list for tokenized documents in loop
texts = []

# loop through document list
for doc in just_clean_reviews:

    # clean and tokenize document string
    raw = doc.lower()
    tokens = tokenizer.tokenize(raw)

    # stop-word removal is skipped here: cleanReviews already dropped the stopwords
    #stopped_tokens = [i for i in tokens if not i in stopwords]
    stopped_tokens = tokens

    # stemming is also skipped for now; keep the raw tokens
    #stemmed_tokens = [p_stemmer.stem(i) for i in stopped_tokens]
    stemmed_tokens = stopped_tokens

    # add tokens to list
    texts.append(stemmed_tokens)

# turn our tokenized documents into a id <-> term dictionary
dictionary = corpora.Dictionary(texts)
    
# convert tokenized documents into a document-term matrix
corpus = [dictionary.doc2bow(text) for text in texts]
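
A quick illustrative peek at the gensim structures built above: the Dictionary assigns an integer id to each unique token, and doc2bow turns each document into (token_id, count) pairs.

In [ ]:
# Illustrative only: size of the gensim vocabulary and the start of document 0
print(len(dictionary))   # number of unique tokens across all documents
print(corpus[0][:10])    # first ten (token_id, count) pairs of the first document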

In [ ]:
# print clean_agg_revs.take(3)

In [27]:
# generate LDA model
ldamodel = gensim.models.ldamodel.LdaModel(corpus, num_topics=2, id2word = dictionary, passes=20)

In [31]:
ldamodel.print_topics(num_topics=2, num_words=25)


Out[31]:
[(0,
  u'0.013*drive + 0.012*device + 0.011*drives + 0.010*nas + 0.008*network + 0.008*files + 0.007*access + 0.007*data + 0.007*server + 0.006*storage + 0.005*raid + 0.005*unit + 0.005*backup + 0.005*media + 0.005*home + 0.005*time + 0.005*2 + 0.005*usb + 0.005*web + 0.004*hard + 0.004*box + 0.004*wd + 0.004*sharespace + 0.004*setup + 0.004*file'),
 (1,
  u'0.034*camera + 0.009*lens + 0.008*nikon + 0.007*lenses + 0.006*quality + 0.005*pentax + 0.005*time + 0.005*pictures + 0.005*digital + 0.004*canon + 0.004*price + 0.004*cameras + 0.004*love + 0.004*data + 0.004*focus + 0.003*easy + 0.003*battery + 0.003*buy + 0.003*d2x + 0.003*lot + 0.003*shoot + 0.003*unit + 0.003*dslr + 0.003*body + 0.003*product')]
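
As a hedged follow-up (assuming the installed gensim exposes get_document_topics, as recent versions do), the per-review topic mixture shows how strongly a document leans toward the NAS topic or the camera topic.

In [ ]:
# Illustrative only: topic distribution inferred for the first review
print(ldamodel.get_document_topics(corpus[0]))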

In [ ]: