In [39]:
import json
import pyspark as ps
import numpy as np
from pyspark.mllib.linalg import SparseVector
from pyspark.mllib.linalg import VectorUDT
from sentimentAnalysis import dataProcessing as dp
from pyspark.sql.functions import lit, collect_list, col, udf
from pyspark.sql.types import ArrayType, StringType, NumericType, DateType
from collections import Counter
In [8]:
def extract_category(df_cats):
    """
    INPUT: Spark DataFrame
    RETURN: Spark DataFrame
    Takes in a DataFrame with a wrapped array "categories"
    column. Extracts first category, drops "categories"
    """
    # create udf
    extract_cat_udf = udf(lambda x: x[0][0], StringType())
    # create new column with single category
    df_cat = df_cats.withColumn("category", extract_cat_udf("categories")).drop("categories")
    return df_cat
def add_pos_neg_tfidf(df_cat):
    """
    INPUT: Spark DataFrame
    RETURN: Spark DataFrame, List, List
    Takes in a DataFrame with review column overall.
    Splits into positive and negative reviews, adds
    TFIDF vectors for each subset. Returns joined
    DataFrame with all data, list of positive review
    vocabulary terms, and list of negative review
    vocabulary terms
    """
    # separate positive and negative reviews
    df_pos = df_cat.where(df_cat.overall >= 4.0).withColumn("positive", lit(True))
    df_neg = df_cat.where(df_cat.overall <= 2.0).withColumn("positive", lit(False))
    # tokenize
    df_pos_tk = dp.add_tokens(df_pos).select("asin",
                                             "category",
                                             "overall",
                                             "positive",
                                             "reviewerName",
                                             "unixReviewTime",
                                             "reviewText",
                                             "tokens")
    df_neg_tk = dp.add_tokens(df_neg).select("asin",
                                             "category",
                                             "overall",
                                             "positive",
                                             "reviewerName",
                                             "unixReviewTime",
                                             "reviewText",
                                             "tokens")
    # get tf, vocab
    df_tf_pos, vocab_pos = dp.add_tf_and_vocab(df_pos_tk)
    df_tf_neg, vocab_neg = dp.add_tf_and_vocab(df_neg_tk)
    # add tfidf
    df_tfidf_pos = dp.add_tfidf(df_tf_pos).drop("tf_vector").drop("tokens")
    df_tfidf_neg = dp.add_tfidf(df_tf_neg).drop("tf_vector").drop("tokens")
    return df_tfidf_pos.unionAll(df_tfidf_neg), vocab_pos, vocab_neg
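# NOTE: the dp helpers above (add_tokens, add_tf_and_vocab, add_tfidf) live in the
# external sentimentAnalysis.dataProcessing module, which is not shown in this
# notebook. The sketch below is one plausible shape for them using pyspark.ml;
# the transformer choices and parameters (e.g. minDF=5, echoing the output path
# used at the end of the notebook) are assumptions, not the module's actual code.
# On some Spark versions these transformers emit ml.linalg vectors rather than
# the mllib vectors imported above.
from pyspark.ml.feature import RegexTokenizer, CountVectorizer, IDF

def sketch_add_tokens(df):
    # split reviewText into lowercase word tokens
    tokenizer = RegexTokenizer(inputCol="reviewText", outputCol="tokens", pattern="\\W+")
    return tokenizer.transform(df)

def sketch_add_tf_and_vocab(df):
    # fit a term-frequency model, return transformed df plus its vocabulary list
    cv_model = CountVectorizer(inputCol="tokens", outputCol="tf_vector", minDF=5).fit(df)
    return cv_model.transform(df), cv_model.vocabulary

def sketch_add_tfidf(df):
    # rescale term frequencies by inverse document frequency
    idf_model = IDF(inputCol="tf_vector", outputCol="tfidf_vector").fit(df)
    return idf_model.transform(df)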
def rowToJson(rating, date, name, text):
    """
    INPUT: Float, Int, String, String
    RETURN: String
    Converts review variables to json string
    """
    row = {"rating": rating,
           "date": date,
           "name": name,
           "text": text}
    return json.dumps(row)
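# quick sanity check with illustrative values (dict key order may vary by Python version):
# rowToJson(5.0, 1357862400, "A. Reviewer", "Great toy!")
# -> '{"rating": 5.0, "date": 1357862400, "name": "A. Reviewer", "text": "Great toy!"}'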
def add_review_col(df):
    """
    INPUT: Spark DataFrame
    RETURN: Spark DataFrame
    Adds column with json string representation
    of review for each review
    """
    # create udf
    get_review_udf = udf(lambda a, b, c, d: rowToJson(a, b, c, d), StringType())
    # create new column with review
    df_review = df.withColumn("review", get_review_udf("overall",
                                                       "unixReviewTime",
                                                       "reviewerName",
                                                       "reviewText"))
    return df_review
def sum_vectors(vectors):
    """
    INPUT: List of SparseVectors
    RETURN: SparseVector
    Sum list of TFIDF vectors element-wise,
    return resulting vector
    """
    # check if vectors exist
    if not vectors:
        return None
    # accumulate as a dense array
    sum_vector = vectors[0].toArray()
    vector_size = sum_vector.shape[0]
    for vector in vectors[1:]:
        sum_vector += vector.toArray()
    # convert back to sparse, keeping only nonzero entries
    sparse_vector = SparseVector(vector_size, {i: sum_vector[i] for i in np.nonzero(sum_vector)[0]})
    return sparse_vector
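# quick check with two toy vectors (illustrative values):
# sum_vectors([SparseVector(4, {0: 1.0, 2: 3.0}), SparseVector(4, {2: 1.0, 3: 2.0})])
# -> SparseVector(4, {0: 1.0, 2: 4.0, 3: 2.0})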
def add_vectors_sum(df):
    """
    INPUT: Spark DataFrame
    RETURN: Spark DataFrame
    Sum list of TFIDF vectors element-wise for
    vectors column, add column for vector sum
    """
    # create udf
    sum_vector_udf = udf(lambda vectors: sum_vectors(vectors), VectorUDT())
    # create new column with the summed vector, drop raw vector list
    df_vectors_summed = df.withColumn("tfidf_vectors_sum", sum_vector_udf("vectors")).drop("vectors")
    return df_vectors_summed
# CARRY OVER FROM dataProcessing - these need to be declared locally
def test_extract_top_features(tfidf_vector, vocab, n):
    """
    INPUT: SparseVector, List, Int
    RETURN: List
    Take in TFIDF vector, vocabulary for vector,
    and number of terms. Return top n terms
    """
    # note - tfidf elements are pre-sorted by importance
    term_indices = tfidf_vector.indices[-n:]
    # map feature indices to vocabulary terms
    features = [vocab[i] for i in term_indices]
    return features
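# toy example (assumes upstream code ordered the vector indices by importance):
# test_extract_top_features(SparseVector(5, {0: 0.1, 2: 1.2, 4: 2.3}),
#                           ["bad", "ok", "fun", "sturdy", "great"], 2)
# -> ['fun', 'great']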
def test_add_top_features(df, vocab, n=10):
    """
    INPUT: PySpark DataFrame, List, Int
    RETURN: PySpark DataFrame
    Take in DataFrame with TFIDF vectors, list of vocabulary words,
    and number of features to extract. Map top features from TFIDF
    vectors to vocabulary terms. Return new DataFrame with terms
    """
    # create udf to extract top n features; returns an array of terms
    extract_features_udf = udf(lambda x: test_extract_top_features(x, vocab, n),
                               ArrayType(StringType()))
    # apply udf, create new df with features column
    df_features = df.withColumn("topFeatures",
                                extract_features_udf(df["tfidf_vectors_sum"]))
    return df_features
def test_add_pos_neg_features(df, vocab_pos, vocab_neg, n=10):
    """
    INPUT: Spark DataFrame, List, List, Int
    RETURN: Spark DataFrame
    Take in DataFrame grouped by asin, positive with tfidf vectors summed.
    Extract top positive and negative terms from each group, add features column
    """
    # split dataframe on positive
    df_pos = df.where(df.positive == True)
    df_neg = df.where(df.positive == False)
    # add features
    df_pos_terms = test_add_top_features(df_pos, vocab_pos, n)
    df_neg_terms = test_add_top_features(df_neg, vocab_neg, n)
    return df_pos_terms.unionAll(df_neg_terms)
In [139]:
def collapse_reviews_terms(df):
    """
    INPUT: Spark DataFrame
    RETURN: Spark DataFrame
    Take in DataFrame with positive and negative reviews,
    split and join with columns for positive and negative
    features
    """
    # split dataframe
    df_pos = df.where(df.positive == True)
    df_pos = df_pos.select(col("asin"),
                           col("category"),
                           col("ratings").alias("posRatings"),
                           col("reviews").alias("posReviews"),
                           col("topFeatures").alias("posFeatures"))
    df_neg = df.where(df.positive == False)
    df_neg = df_neg.select(col("asin"),
                           col("ratings").alias("negRatings"),
                           col("reviews").alias("negReviews"),
                           col("topFeatures").alias("negFeatures"))
    # get distinct asins
    df_asin = df.select("asin").distinct()
    # outer-join positive and negative sides back onto the asin spine
    df_joined = df_asin.join(df_pos, df_asin.asin == df_pos.asin, 'outer').drop(df_pos.asin)
    df_joined = df_joined.join(df_neg, df_joined.asin == df_neg.asin, 'outer').drop(df_neg.asin)
    return df_joined
In [10]:
# create spark session
spark = ps.sql.SparkSession.builder \
    .appName("reviewProcessing") \
    .getOrCreate()
In [14]:
# get dataframes
# specify s3 as source with s3a://
# df = spark.read.json("s3a://amazon-review-data/user_dedup.json.gz")
df_meta = spark.read.json("s3a://amazon-review-data/metadata.json.gz")
# get shard
df_toys = spark.read.json("s3a://amazon-review-data/reviews_Toys_and_Games_5.json.gz")
# subset asin, overall, reviewerName, unixReviewTime, reviewText
df_subset = df_toys.select("asin", "overall", "reviewerName", "unixReviewTime", "reviewText")
In [19]:
# add metadata
df_joined = dp.join_metadata(df_subset, df_meta).select("asin",
                                                        "title",
                                                        "categories",
                                                        "overall",
                                                        "reviewerName",
                                                        "unixReviewTime",
                                                        "reviewText").persist()
In [64]:
# remove 3 star reviews
df_joined_subset = df_joined.where(df_joined.overall != 3.0)
In [65]:
# extract category, add column
df_cat = extract_category(df_joined_subset)
In [133]:
# get tfidf vectors and vocabularies
df_tfidf, vocab_pos, vocab_neg = add_pos_neg_tfidf(df_cat)
In [134]:
# add json review column
df_review = add_review_col(df_tfidf)
In [135]:
# group by asin, category, positive; collect reviews, ratings, and tfidf vectors
df_grouped = df_review.groupBy("asin", "category", "positive").agg(collect_list("review").alias("reviews"),
                                                                   collect_list("overall").alias("ratings"),
                                                                   collect_list("tfidf_vector").alias("vectors"))
In [136]:
# sum vector lists
df_vectors_summed = add_vectors_sum(df_grouped)
In [137]:
# add terms
# note - a udf that calls local module functions fails because the module does not exist on the workers
df_terms = test_add_pos_neg_features(df_vectors_summed, vocab_pos, vocab_neg, n=15).drop("tfidf_vectors_sum")
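As noted above, a udf cannot call functions from a module that exists only on the driver. A common workaround is to ship the module to the executors before defining the udf; a sketch, where the zip path is hypothetical:

# hypothetical: a zip of the sentimentAnalysis package built on the driver
spark.sparkContext.addPyFile("sentimentAnalysis.zip")
# udfs can now resolve `from sentimentAnalysis import dataProcessing` on the workers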
In [140]:
# add ratings, collapse df
df_ratings = collapse_reviews_terms(df_terms)
In [141]:
df_ratings.show(3)
In [117]:
def get_ratings_counter(posRatings, negRatings):
    """
    INPUT: List, List
    RETURN: Counter
    Merges positive and negative rating lists
    into a single rating -> count Counter
    """
    count = Counter(posRatings)
    count.update(negRatings)
    return count
def concatenate_ratings(df):
    """
    INPUT: Spark DataFrame
    RETURN: Spark DataFrame
    Takes in a DataFrame with list columns posRatings
    and negRatings. Concatenates into ratings column
    with counter
    """
    # create udf
    get_ratings_udf = udf(lambda x, y: json.dumps(get_ratings_counter(x, y)), StringType())
    # create new column with ratings counter as a json string
    df_count = df.withColumn("ratings", get_ratings_udf("posRatings", "negRatings"))
    return df_count.drop("posRatings").drop("negRatings")
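For example, merging two small rating lists (illustrative values; key order may vary):

get_ratings_counter([5.0, 4.0, 5.0], [1.0, 2.0])
# -> Counter({5.0: 2, 4.0: 1, 1.0: 1, 2.0: 1})
json.dumps(get_ratings_counter([5.0, 4.0, 5.0], [1.0, 2.0]))
# -> '{"5.0": 2, "4.0": 1, "1.0": 1, "2.0": 1}'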
In [142]:
df_ratings_concat = concatenate_ratings(df_ratings)
In [143]:
# join titles from df_meta
df_final = df_ratings_concat.join(df_meta.select("asin", "title"), df_ratings_concat.asin==df_meta.asin).drop(df_meta.asin)
In [144]:
df_final.show(3)
In [145]:
df_final.write.mode('append').json("s3a://amazon-review-data/review-data/parts-minDF_5/")
In [127]:
df_final.select("asin").distinct().count()
Out[127]: