In [1]:
import pyspark
import json
import pandas as pd
import numpy as np
import amzn_reviews_cleaner_funcs as amzn
from pyspark.sql import SparkSession
%load_ext autoreload
%autoreload 2
In [2]:
# create spark session from the shell-provided SparkContext (sc);
# outside the pyspark shell, use SparkSession.builder.getOrCreate()
spark = SparkSession(sc)
In [3]:
# get dataframe
# specify S3 as the source with the s3a:// scheme
df = spark.read.json("s3a://amazon-review-data/reviews_Musical_Instruments_5.json.gz")
df.show(3)
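In [ ]:
# quick sanity check on the ingested data: printSchema() lists the fields
# Spark inferred from the JSON (the standard Amazon review schema is assumed
# here: asin, overall, reviewText, reviewerID, ...)
df.printSchema()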
In [27]:
df_text = df.select("asin", "reviewerID", "overall", "reviewText")
df_text.show(3)
In [24]:
from pyspark.ml.feature import Tokenizer, CountVectorizer, StopWordsRemover, NGram, IDF
from nltk.corpus import stopwords
In [28]:
tokenizer = Tokenizer(inputCol="reviewText", outputCol="raw_tokens")
df_raw_tokens = tokenizer.transform(df_text)
df_raw_tokens.show(3)
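In [ ]:
# Tokenizer only lowercases and splits on whitespace, so punctuation stays
# attached to tokens ("guitar." vs "guitar"); a sketch of an alternative that
# splits on runs of non-word characters instead
from pyspark.ml.feature import RegexTokenizer

regex_tokenizer = RegexTokenizer(inputCol="reviewText", outputCol="raw_tokens", pattern="\\W+")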
In [29]:
# NLTK's stopword list needs a one-time nltk.download("stopwords") beforehand
remover = StopWordsRemover(inputCol="raw_tokens", outputCol="tokens", stopWords=stopwords.words("english"))
df_tokens = remover.transform(df_raw_tokens)
df_tokens.show(3)
In [30]:
cv = CountVectorizer(inputCol="tokens", outputCol="tf_vectors")
tf_model = cv.fit(df_tokens)
df_tf = tf_model.transform(df_tokens)
df_tf.show(3)
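In [ ]:
# CountVectorizer keeps the full vocabulary by default; a sketch of capping it
# to frequent terms (the vocabSize and minDF values here are illustrative)
cv_capped = CountVectorizer(inputCol="tokens", outputCol="tf_vectors", vocabSize=10000, minDF=5)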
In [21]:
vocab = tf_model.vocabulary
vocab[:10]
Out[21]:
In [31]:
idf = IDF(inputCol="tf_vectors", outputCol="tfidf_vectors")
idf_model = idf.fit(df_tf)
df_idf = idf_model.transform(df_tf)
df_idf.select("asin", "tf_vectors", "tfidf_vectors").show(3)
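In [ ]:
# Spark's IDF weights follow idf(t) = log((m + 1) / (df(t) + 1)), where m is
# the number of documents, so terms common across reviews get weights near 0;
# the fitted weights align with tf_model.vocabulary and are exposed on the model
idf_model.idf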
In [32]:
test_row = df_idf.first()
In [35]:
test_row["tf_vectors"]
Out[35]:
In [48]:
test_tf_vect = test_row["tf_vectors"]
In [50]:
# map each nonzero index of the sparse TF vector back to its vocabulary term
row_terms = []
for i in test_tf_vect.indices:
    row_terms.append(vocab[i])
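In [ ]:
# a sketch pairing each recovered term with its count: SparseVector stores
# parallel indices and values arrays, so the terms line up with the counts
dict(zip(row_terms, test_tf_vect.values))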
In [51]:
row_terms
Out[51]:
In [53]:
test_row["reviewText"]
Out[53]:
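In [ ]:
# the same index-to-term lookup applied to the TF-IDF vector ranks this
# review's terms by weight (a sketch using the columns defined above)
tfidf_vect = test_row["tfidf_vectors"]
sorted(zip([vocab[i] for i in tfidf_vect.indices], tfidf_vect.values), key=lambda p: -p[1])[:10]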