In [34]:
import pyspark as ps
from sentimentAnalysis import dataProcessing as dp
In [35]:
reload(dp)
Out[35]:
In [2]:
# create spark session
spark = ps.sql.SparkSession.builder \
.appName("reviewProcessing") \
.getOrCreate()
In [3]:
# get dataframes
# specify S3 as the source with the s3a:// scheme
# df = spark.read.json("s3a://amazon-review-data/user_dedup.json.gz")
df_meta = spark.read.json("s3a://amazon-review-data/metadata.json.gz")
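Reading via s3a:// requires hadoop-aws on the classpath and AWS credentials in the Hadoop configuration. A minimal sketch of one way to wire that up (the config keys are standard s3a settings; the credential values are placeholders):
In [ ]:
# hypothetical setup for s3a:// reads (credential values are placeholders)
hconf = spark.sparkContext._jsc.hadoopConfiguration()
hconf.set("fs.s3a.access.key", "<AWS_ACCESS_KEY_ID>")
hconf.set("fs.s3a.secret.key", "<AWS_SECRET_ACCESS_KEY>")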
In [4]:
# load the Musical Instruments 5-core subset
df_mi = spark.read.json("s3a://amazon-review-data/reviews_Musical_Instruments_5.json.gz")
In [5]:
# subset asin, overall, reviewerName, unixReviewTime, reviewText
df_subset = df_mi.select("asin", "overall", "reviewerName", "unixReviewTime", "reviewText")
In [6]:
# add metadata
df_joined = dp.join_metadata(df_subset, df_meta).select("asin",
"title",
"categories",
"overall",
"reviewerName",
"unixReviewTime",
"reviewText").persist()
In [7]:
# count reviews
df_joined.count()
Out[7]:
In [9]:
# remove neutral (3-star) reviews
df_joined_subset = df_joined.where(df_joined.overall != 3.0)
# check reviews were removed
df_joined_subset.count()
Out[9]:
In [114]:
# check df categories
df_joined_subset.show(3)
In [10]:
from pyspark.sql.functions import udf
from pyspark.sql.types import ArrayType, StringType, NumericType, DateType
In [11]:
def extract_category(df_cats):
    """
    INPUT: Spark DataFrame
    RETURN: Spark DataFrame
    Takes in a DataFrame with a wrapped array "categories"
    column. Extracts the first category, drops "categories"
    """
    # create udf that pulls the first entry out of the nested array
    extract_cat_udf = udf(lambda x: x[0][0], StringType())
    # create new column with the single category
    df_cat = df_cats.withColumn("category", extract_cat_udf("categories")).drop("categories")
    return df_cat
In [12]:
# test function
df_cat = extract_category(df_joined_subset)
df_cat.show(3)
In [26]:
from pyspark.sql.functions import lit
def add_pos_neg_tfidf(df_cat):
"""
INPUT: Spark DataFrame
RETURN: Spark DataFrame, List, List
Takes in a DataFrame with review column overall.
    Splits into positive and negative reviews, adds
    TFIDF vectors for each subset. Returns the joined
    DataFrame with all data, a list of positive review
    vocabulary terms, and a list of negative review
    vocabulary terms
"""
# separate positive and negative reviews
df_pos = df_cat.where(df_cat.overall >= 4.0).withColumn("positive", lit(True))
df_neg = df_cat.where(df_cat.overall <= 2.0).withColumn("positive", lit(False))
# tokenize
df_pos_tk = dp.add_tokens(df_pos).select("asin",
"category",
"overall",
"positive",
"reviewerName",
"unixReviewTime",
"reviewText",
"tokens")
df_neg_tk = dp.add_tokens(df_neg).select("asin",
"category",
"overall",
"positive",
"reviewerName",
"unixReviewTime",
"reviewText",
"tokens")
# get tf, vocab
df_tf_pos, vocab_pos = dp.add_tf_and_vocab(df_pos_tk)
df_tf_neg, vocab_neg = dp.add_tf_and_vocab(df_neg_tk)
# add tfidf
df_tfidf_pos = dp.add_tfidf(df_tf_pos).drop("tf_vector").drop("tokens")
df_tfidf_neg = dp.add_tfidf(df_tf_neg).drop("tf_vector").drop("tokens")
return df_tfidf_pos.unionAll(df_tfidf_neg), vocab_pos, vocab_neg
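The dp.add_tokens, dp.add_tf_and_vocab, and dp.add_tfidf helpers live in the local sentimentAnalysis module and are not shown in this notebook. A minimal sketch of what they plausibly look like, assuming pyspark.ml.feature under the hood (these _sketch implementations are guesses, not the actual module code):
In [ ]:
# sketch of the unshown dp helpers (assumed implementations)
from pyspark.ml.feature import Tokenizer, CountVectorizer, IDF

def add_tokens_sketch(df):
    # split reviewText into a "tokens" array column
    return Tokenizer(inputCol="reviewText", outputCol="tokens").transform(df)

def add_tf_and_vocab_sketch(df):
    # fit a term-frequency model; return the frame plus its vocabulary list
    model = CountVectorizer(inputCol="tokens", outputCol="tf_vector").fit(df)
    return model.transform(df), model.vocabulary

def add_tfidf_sketch(df):
    # rescale term frequencies by inverse document frequency
    return IDF(inputCol="tf_vector", outputCol="tfidf_vector").fit(df).transform(df)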
In [27]:
# test function
df_tfidf, vocab_pos, vocab_neg = add_pos_neg_tfidf(df_cat)
df_tfidf.show(3)
In [18]:
# import collect_list function
from pyspark.sql.functions import collect_list
import json
In [19]:
# create json review from row
def rowToJson(rating, date, name, text):
row = { "rating": rating,
"date": date,
"name": name,
"text": text
}
return json.dumps(row)
# create review column
def add_review_col(df):
    """
    INPUT: Spark DataFrame
    RETURN: Spark DataFrame
    Packs the rating, date, reviewer name, and review text
    columns into a single JSON string column "review"
    """
    # create udf wrapping rowToJson
    get_review_udf = udf(rowToJson, StringType())
# create new column with review
df_review = df.withColumn("review", get_review_udf("overall",
"unixReviewTime",
"reviewerName",
"reviewText"))
return df_review
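rowToJson can be sanity-checked locally before wrapping it in a udf (made-up values; JSON key order may vary):
In [ ]:
# local sanity check with made-up values
rowToJson(5.0, 1388534400, "Sample Reviewer", "Great tone!")
# e.g. '{"rating": 5.0, "date": 1388534400, "name": "Sample Reviewer", "text": "Great tone!"}'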
In [20]:
# test function
df_review = add_review_col(df_tfidf)
df_review.show(3)
In [176]:
test_row = df_review.first()
test_row["review"]
Out[176]:
In [62]:
# group by asin, category, positive; collect reviews and tfidf vectors into lists
df_grouped = df_review.groupBy("asin", "category", "positive").agg(collect_list("review").alias("reviews"),
collect_list("tfidf_vector").alias("vectors"))
df_grouped.show(3)
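collect_list gathers every value in a group into a single array column; a toy illustration with made-up data (element order within a group is not guaranteed):
In [ ]:
# toy illustration of collect_list (made-up data)
toy = spark.createDataFrame([("a", 1), ("a", 2), ("b", 3)], ["k", "v"])
toy.groupBy("k").agg(collect_list("v").alias("vs")).show()
# k=a -> vs=[1, 2]; k=b -> vs=[3]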
In [63]:
# get test row
test_row = df_grouped.first()
In [64]:
test_vectors = test_row["vectors"]
In [65]:
test_vectors[:3]
Out[65]:
In [22]:
from pyspark.mllib.linalg import SparseVector
import numpy as np
def sum_vectors(vectors):
"""
INPUT: List of SparseVectors
RETURN: SparseVector
Sum list of TFIDF vectors element-wise,
return resulting vector
"""
# check if vectors exist
if not vectors:
return None
    # iterate over vectors, accumulating into a dense array
    sum_vector = vectors[0].toArray()
    vector_size = sum_vector.shape[0]
    for vector in vectors[1:]:
        sum_vector += vector.toArray()
# convert to sparse vector
sparse_vector = SparseVector(vector_size, {i:sum_vector[i] for i in np.nonzero(sum_vector)[0]})
return sparse_vector
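A quick check of sum_vectors on two toy SparseVectors (illustrative values only):
In [ ]:
# toy check: overlapping indices are summed, the rest pass through
v1 = SparseVector(4, {0: 1.0, 2: 2.0})
v2 = SparseVector(4, {2: 3.0, 3: 4.0})
sum_vectors([v1, v2])
# SparseVector(4, {0: 1.0, 2: 5.0, 3: 4.0})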
In [229]:
# examine two vectors
test_vectors[0]
Out[229]:
In [223]:
test_vectors[1]
Out[223]:
In [265]:
# test summing the collected vectors
sum_vectors(test_vectors)
Out[265]:
In [66]:
from pyspark.mllib.linalg import VectorUDT
# sum vectors
def add_vectors_sum(df):
    """
    INPUT: Spark DataFrame
    RETURN: Spark DataFrame
    Element-wise sums each row's list of TFIDF vectors
    into a single "tfidf_vectors_sum" column
    """
    # create udf
    sum_vector_udf = udf(lambda vectors: sum_vectors(vectors), VectorUDT())
    # create new column with the summed tfidf vector
df_vectors_summed = df.withColumn("tfidf_vectors_sum", sum_vector_udf("vectors")).drop("vectors")
return df_vectors_summed
In [67]:
# test
df_vectors_summed = add_vectors_sum(df_grouped)
df_vectors_summed.show(3)
In [69]:
# get top terms - the module version is kept as a string because it fails on the workers (see note below)
"""
from sentimentAnalysis.dataProcessing import add_pos_neg_features
df_terms = add_pos_neg_features(df_vectors_summed, vocab_pos, vocab_neg, n=15).drop("tfidf_vectors_sum")
df_terms.show(3)
"""
Out[69]:
In [70]:
def test_extract_top_features(tfidf_vector, vocab, n):
"""
INPUT: SparseVector, List, Int
RETURN: List
Take in TFIDF vector, vocabulary for vector,
and number of terms. Return top n terms
"""
    # note - the vocabulary is ordered by corpus frequency, so the highest sparse indices map to the rarest terms
term_indices = tfidf_vector.indices[-n:]
# Map features to terms
features = [vocab[i] for i in term_indices]
return features
def test_add_top_features(df, vocab, n=10):
"""
INPUT: PySpark DataFrame, List, Int
RETURN: PySpark DataFrame
Take in DataFrame with TFIDF vectors, list of vocabulary words,
and number of features to extract. Map top features from TFIDF
vectors to vocabulary terms. Return new DataFrame with terms
"""
# Create udf function to extract top n features
extract_features_udf = udf(lambda x: test_extract_top_features(x, vocab, n))
# Apply udf, create new df with features column
df_features = df.withColumn("topFeatures",
extract_features_udf(df["tfidf_vectors_sum"]))
return df_features
def test_add_pos_neg_features(df, vocab_pos, vocab_neg, n=10):
"""
INPUT: Spark DataFrame, List, List, Int
RETURN: Spark DataFrame
Take in DataFrame grouped by asin, positive with tfidf vectors summed.
Extract top positive and negative terms from each group, add features column
"""
    # split dataframe on positive
df_pos = df.where(df.positive==True)
df_neg = df.where(df.positive==False)
# add features
df_pos_terms = test_add_top_features(df_pos, vocab_pos, n)
df_neg_terms = test_add_top_features(df_neg, vocab_neg, n)
return df_pos_terms.unionAll(df_neg_terms)
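test_extract_top_features just maps the last n sparse indices through the vocabulary; a toy check with a hypothetical vector and vocabulary:
In [ ]:
# toy check (hypothetical values)
v = SparseVector(6, {0: 3.0, 2: 1.5, 4: 0.7})
test_extract_top_features(v, ["a", "b", "c", "d", "e", "f"], 2)
# -> ['c', 'e']  (the two highest vocabulary indices present in the vector)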
In [71]:
# add terms
# note - a udf that calls local module functions fails because the module does not exist on the workers
df_terms = test_add_pos_neg_features(df_vectors_summed, vocab_pos, vocab_neg, n=15).drop("tfidf_vectors_sum")
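One untested workaround for the worker-import problem is to ship the package to the executors, e.g. as a zip archive (the filename here is a placeholder):
In [ ]:
# hypothetical workaround: distribute the module so udfs can import it
# on the executors ("sentimentAnalysis.zip" is a placeholder filename)
spark.sparkContext.addPyFile("sentimentAnalysis.zip")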
In [72]:
df_terms.show(3)
In [102]:
# group by asin
df_terms_grouped = df_terms.groupBy("asin", "category").agg(collect_list("reviews").alias("reviews"),
collect_list("topFeatures").alias("topFeatures"))
df_terms_grouped.show(1)
In [162]:
from pyspark.sql.functions import col
In [148]:
df_terms.select("asin",
                col("reviews").alias("negReviews"),
                col("topFeatures").alias("negFeatures")).show(3)
In [189]:
# collapse positive/negative rows into one row per asin
def collapse_reviews_terms(df):
    """
    INPUT: Spark DataFrame
    RETURN: Spark DataFrame
    Splits the grouped DataFrame into positive and negative
    halves, then outer-joins them back on asin so each product
    gets posReviews/posFeatures and negReviews/negFeatures columns
    """
# split dataframe
df_pos = df.where(df.positive==True)
df_pos = df_pos.select(col("asin"),
col("category"),
col("reviews").alias("posReviews"),
col("topFeatures").alias("posFeatures"))
df_neg = df.where(df.positive==False)
df_neg = df_neg.select(col("asin"),
col("reviews").alias("negReviews"),
col("topFeatures").alias("negFeatures"))
# get asin
df_asin = df.select("asin").distinct()
# join dataframes
    df_final = df_asin.join(df_pos, df_asin.asin==df_pos.asin, 'outer').drop(df_pos.asin)
df_final = df_final.join(df_neg, df_final.asin==df_neg.asin, 'outer').drop(df_neg.asin)
return df_final
In [190]:
df_final = collapse_reviews_terms(df_terms)
In [192]:
df_final.show(3)
In [ ]:
# modules needed
#from pyspark.sql.functions import udf
#from pyspark.sql.types import ArrayType, StringType
#from sentimentAnalysis import dataProcessing as dp
In [193]:
# unpersist old dataframes (df_joined was the frame persisted above)
df_joined.unpersist()
Out[193]:
In [195]:
# end session
spark.stop()