Process Data


Setup


In [39]:
import json
import pyspark as ps
import numpy as np
from pyspark.mllib.linalg import SparseVector
from pyspark.mllib.linalg import VectorUDT
from sentimentAnalysis import dataProcessing as dp
from pyspark.sql.functions import lit, collect_list, col, udf
from pyspark.sql.types import ArrayType, StringType, NumericType, DateType
from collections import Counter


Functions


In [8]:
def extract_category(df_cats):
    """
    INPUT: Spark DataFrame
    RETURN: Spark DataFrame
    
    Takes in a DataFrame with a wrapped array "categories"
    column. Extracts first category, drops "categories"
    
    """
    # create udf
    extract_cat_udf = udf(lambda x: x[0][0], StringType())

    # create new column with single category
    df_cat = df_cats.withColumn("category", extract_cat_udf("categories")).drop("categories")
    
    return df_cat
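
# quick check of the extraction logic on an illustrative wrapped-array value:
# "categories" is a nested array, so x[0][0] picks the first category of the first inner list
sample_categories = [["Toys & Games", "Games"]]
sample_categories[0][0]  # -> 'Toys & Games'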


def add_pos_neg_tfidf(df_cat):
    """
    INPUT: Spark DataFrame
    RETURN: Spark DataFrame, List, List
    
    Takes in a DataFrame with rating column "overall".
    Splits into positive and negative reviews, adds
    TFIDF vectors for each subset. Returns joined
    DataFrame with all data, list of positive review
    vocabulary terms, and list of negative review
    vocabulary terms
    
    """
    # separate positive and negative reviews
    df_pos = df_cat.where(df_cat.overall >= 4.0).withColumn("positive", lit(True))
    df_neg = df_cat.where(df_cat.overall <= 2.0).withColumn("positive", lit(False))
    
    # tokenize
    df_pos_tk = dp.add_tokens(df_pos).select("asin",
                                             "category",
                                             "overall",
                                             "positive",
                                             "reviewerName",
                                             "unixReviewTime",
                                             "reviewText",
                                             "tokens")
    
    df_neg_tk = dp.add_tokens(df_neg).select("asin",
                                             "category",
                                             "overall",
                                             "positive",
                                             "reviewerName",
                                             "unixReviewTime",
                                             "reviewText",
                                             "tokens")
    
    # get tf, vocab
    df_tf_pos, vocab_pos = dp.add_tf_and_vocab(df_pos_tk)
    df_tf_neg, vocab_neg = dp.add_tf_and_vocab(df_neg_tk)
    
    # add tfidf
    df_tfidf_pos = dp.add_tfidf(df_tf_pos).drop("tf_vector").drop("tokens")
    df_tfidf_neg = dp.add_tfidf(df_tf_neg).drop("tf_vector").drop("tokens")

    
    return df_tfidf_pos.unionAll(df_tfidf_neg), vocab_pos, vocab_neg


def rowToJson(rating, date, name, text):
    """
    INPUT: Float, Int, String, String
    RETURN: String
    
    Converts review variables to json string
    
    """
    row = { "rating": rating,
            "date": date,
            "name": name,
            "text": text
          }
    
    return json.dumps(row)
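
# sanity check with made-up values (dict key order is preserved on Python 3.7+)
rowToJson(5.0, 1136246400, "A. Reviewer", "Great toy!")
# -> '{"rating": 5.0, "date": 1136246400, "name": "A. Reviewer", "text": "Great toy!"}'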


def add_review_col(df):
    """
    INPUT: Spark DataFrame
    RETURN: Spark DataFrame
    
    Adds column with json string representation
    of review for each review
    
    """
    # create udf
    get_review_udf = udf(rowToJson, StringType())

    # create new column with review
    df_review = df.withColumn("review", get_review_udf("overall", 
                                                       "unixReviewTime", 
                                                       "reviewerName", 
                                                       "reviewText"))
    
    return df_review


def sum_vectors(vectors):
    """
    INPUT: List of SparseVectors
    RETURN: SparseVector
    
    Sum list of TFIDF vectors element-wise,
    return resulting vector
    
    """
    # check if vectors exist
    if not vectors:
        return None
    
    # iterate over remaining vectors, accumulating a dense sum
    sum_vector = vectors[0].toArray()
    vector_size = sum_vector.shape[0]
    
    for vector in vectors[1:]:
        sum_vector += vector.toArray()
        
    # convert to sparse vector   
    sparse_vector = SparseVector(vector_size, {i:sum_vector[i] for i in np.nonzero(sum_vector)[0]})
    
    return sparse_vector
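
# toy check: indices present in either vector are summed element-wise
v1 = SparseVector(4, {0: 1.0, 2: 0.5})
v2 = SparseVector(4, {2: 0.5, 3: 2.0})
sum_vectors([v1, v2])  # -> SparseVector(4, {0: 1.0, 2: 1.0, 3: 2.0})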


def add_vectors_sum(df):
    """
    INPUT: Spark DataFrame
    RETURN: Spark DataFrame
    
    Sum list of TFIDF vectors element-wise for
    vectors column, add column for vector sum
    
    """
    # create udf
    sum_vector_udf = udf(sum_vectors, VectorUDT())

    # create new column with summed vector, drop per-review vectors
    df_vectors_summed = df.withColumn("tfidf_vectors_sum", sum_vector_udf("vectors")).drop("vectors")
    
    return df_vectors_summed


# CARRY OVER FROM dataProcessing - these must be declared locally (see note in the Script section)
def test_extract_top_features(tfidf_vector, vocab, n):
    """
    INPUT: SparseVector, List, Int
    RETURN: List

    Take in TFIDF vector, vocabulary for vector,
    and number of terms. Return top n terms

    """
    # note - tfidf elements are pre-sorted by importance
    term_indices = tfidf_vector.indices[-n:]

    # Map features to terms
    features = [vocab[i] for i in term_indices]

    return features
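
# toy check with a made-up 5-term vocabulary: the last n indices map to terms
toy_vocab = ["fun", "game", "kids", "broken", "refund"]
toy_vector = SparseVector(5, {0: 0.1, 3: 0.9, 4: 0.7})
test_extract_top_features(toy_vector, toy_vocab, 2)  # -> ['broken', 'refund']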


def test_add_top_features(df, vocab, n=10):
    """
    INPUT: PySpark DataFrame, List, Int
    RETURN: PySpark DataFrame

    Take in DataFrame with TFIDF vectors, list of vocabulary words,
    and number of features to extract. Map top features from TFIDF
    vectors to vocabulary terms. Return new DataFrame with terms

    """
    # Create udf to extract top n features, returning an array of strings
    extract_features_udf = udf(lambda x: test_extract_top_features(x, vocab, n), ArrayType(StringType()))

    # Apply udf, create new df with features column
    df_features = df.withColumn("topFeatures",
                                    extract_features_udf(df["tfidf_vectors_sum"]))


    return df_features


def test_add_pos_neg_features(df, vocab_pos, vocab_neg, n=10):
    """
    INPUT: Spark DataFrame, List, List, Int
    RETURN: Spark DataFrame

    Take in DataFrame grouped by asin, positive with tfidf vectors summed.
    Extract top positive and negative terms from each group, add features column

    """
    # split dataframe on positive flag
    df_pos = df.where(df.positive==True)
    df_neg = df.where(df.positive==False)

    # add features
    df_pos_terms = test_add_top_features(df_pos, vocab_pos, n)
    df_neg_terms = test_add_top_features(df_neg, vocab_neg, n)

    return df_pos_terms.unionAll(df_neg_terms)

In [139]:
def collapse_reviews_terms(df):
    """
    INPUT: Spark DataFrame
    RETURN: Spark DataFrame

    Take in DataFrame with positive and negative reviews,
    split and outer join on asin with renamed columns for
    positive and negative ratings, reviews, and features.
    Products with reviews of only one polarity keep nulls
    on the missing side

    """
    # split dataframe
    df_pos = df.where(df.positive==True)
    df_pos = df_pos.select(col("asin"),
                           col("category"),
                           col("ratings").alias("posRatings"),
                           col("reviews").alias("posReviews"), 
                           col("topFeatures").alias("posFeatures"))
    
    df_neg = df.where(df.positive==False)
    df_neg = df_neg.select(col("asin"),
                           col("ratings").alias("negRatings"),
                           col("reviews").alias("negReviews"), 
                           col("topFeatures").alias("negFeatures"))
    
    
    # get distinct asin values
    df_asin = df.select("asin").distinct()
    
    
    # join dataframes
    df_joined = df_asin.join(df_pos, df_asin.asin==df_pos.asin, 'outer').drop(df_pos.asin)
    df_joined = df_joined.join(df_neg, df_joined.asin==df_neg.asin, 'outer').drop(df_neg.asin)
    
    
    return df_joined


Script


In [10]:
# create spark session
spark = ps.sql.SparkSession.builder \
    .appName("reviewProcessing") \
    .getOrCreate()

In [14]:
# get dataframes
# specify S3 as source with s3a://
# df = spark.read.json("s3a://amazon-review-data/user_dedup.json.gz")
df_meta = spark.read.json("s3a://amazon-review-data/metadata.json.gz")

# get shard
df_toys = spark.read.json("s3a://amazon-review-data/reviews_Toys_and_Games_5.json.gz")

# subset asin, overall, reviewerName, unixReviewTime, reviewText
df_subset = df_toys.select("asin", "overall", "reviewerName", "unixReviewTime", "reviewText")

In [19]:
# add metadata
df_joined = dp.join_metadata(df_subset, df_meta).select("asin",
                                                        "title",
                                                        "categories",
                                                        "overall",
                                                        "reviewerName",
                                                        "unixReviewTime",
                                                        "reviewText").persist()

In [64]:
# remove neutral 3-star reviews
df_joined_subset = df_joined.where(df_joined.overall != 3.0)

In [65]:
# extract category, add column
df_cat = extract_category(df_joined_subset)

In [133]:
# get tfidf vectors and vocabularies
df_tfidf, vocab_pos, vocab_neg = add_pos_neg_tfidf(df_cat)

In [134]:
# add json review column
df_review = add_review_col(df_tfidf)

In [135]:
# group by (asin, category, positive), collecting reviews, ratings, and tfidf vectors
df_grouped = df_review.groupBy("asin", "category", "positive").agg(collect_list("review").alias("reviews"), 
                                                                   collect_list("overall").alias("ratings"),
                                                                   collect_list("tfidf_vector").alias("vectors"))

In [136]:
# sum vector lists
df_vectors_summed = add_vectors_sum(df_grouped)

In [137]:
# add terms
# note - a udf that calls functions from a local module fails because the
# module does not exist on the worker nodes, hence the local declarations above

df_terms = test_add_pos_neg_features(df_vectors_summed, vocab_pos, vocab_neg, n=15).drop("tfidf_vectors_sum")
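
If the udfs did need the dataProcessing helpers, one possible workaround (untested here; the path is illustrative) is to ship the module to the workers before defining the udf:

In [ ]:
# hypothetical: distribute the module so workers can import it when running udfs
# spark.sparkContext.addPyFile("sentimentAnalysis/dataProcessing.py")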

In [140]:
# add ratings, collapse df
df_ratings = collapse_reviews_terms(df_terms)

In [141]:
df_ratings.show(3)


+----------+------------+--------------------+--------------------+--------------------+---------------+--------------------+--------------------+
|      asin|    category|          posRatings|          posReviews|         posFeatures|     negRatings|          negReviews|         negFeatures|
+----------+------------+--------------------+--------------------+--------------------+---------------+--------------------+--------------------+
|1556345542|Toys & Games|[4.0, 4.0, 5.0, 5...|[{"date": 1023062...|[cheek, humans, c...|           null|                null|                null|
|B0007WWZIG|Toys & Games|          [4.0, 5.0]|[{"date": 1136246...|[charging, miss, ...|     [1.0, 1.0]|[{"date": 1138752...|[charging, we'll,...|
|B0007Y4DLG|Toys & Games|[5.0, 5.0, 5.0, 5...|[{"date": 1366761...|[purchaser, md, a...|[2.0, 2.0, 1.0]|[{"date": 1388620...|[household, assum...|
+----------+------------+--------------------+--------------------+--------------------+---------------+--------------------+--------------------+
only showing top 3 rows


Concatenate ratings


In [117]:
def get_ratings_counter(posRatings, negRatings):
    """
    INPUT: List, List
    RETURN: Counter

    Merges positive and negative rating lists into
    a single Counter of rating frequencies

    """
    count = Counter(posRatings)
    count.update(negRatings)
    return count

def concatenate_ratings(df):
    """
    INPUT: Spark DataFrame
    RETURN: Spark DataFrame
    
    Takes in a DataFrame with list columns posRatings
    and negRatings. Concatenates into ratings column
    with counter
    
    """
    # create udf
    get_ratings_udf = udf(lambda x,y: json.dumps(get_ratings_counter(x,y)), StringType())

    # create new column with ratings counts
    df_count = df.withColumn("ratings", get_ratings_udf("posRatings", "negRatings"))
    
    return df_count.drop("posRatings").drop("negRatings")
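
Since json.dumps serializes the Counter's float keys as strings, the ratings column takes the {"4.0": 2, ...} form seen in the output below. A quick check with made-up rating lists (key order may vary):

In [ ]:
json.dumps(get_ratings_counter([4.0, 4.0, 5.0], [1.0]))
# -> '{"4.0": 2, "5.0": 1, "1.0": 1}'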

In [142]:
df_ratings_concat = concatenate_ratings(df_ratings)

In [143]:
# join titles from df_meta
df_final = df_ratings_concat.join(df_meta.select("asin", "title"), df_ratings_concat.asin==df_meta.asin).drop(df_meta.asin)

In [144]:
df_final.show(3)


+----------+------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+
|      asin|    category|          posReviews|         posFeatures|          negReviews|         negFeatures|             ratings|               title|
+----------+------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+
|1616613807|Toys & Games|[{"date": 1369699...|[evade, rulesheet...|                null|                null|{"4.0": 4, "5.0":...|Star Wars X-Wing:...|
|B00000IZKX|Toys & Games|[{"date": 1335916...|[iteration, coil,...|[{"date": 1368748...|[2011, slinky, ap...|{"1.0": 5, "2.0":...|POOF-Slinky Model...|
|B000067BIE|Toys & Games|[{"date": 1365033...|[talked, exceeded...|                null|                null|{"4.0": 2, "5.0": 6}|Melissa &amp; Dou...|
+----------+------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+
only showing top 3 rows


Save DF


In [145]:
df_final.write.mode('append').json("s3a://amazon-review-data/review-data/parts-minDF_5/")

In [127]:
df_final.select("asin").distinct().count()


Out[127]:
11906
