Script Development - processData.py

Development notebook for script to group by category and asin, split by pos and neg reviews, and add top positive and negative features.


Setup


In [34]:
import pyspark as ps
from sentimentAnalysis import dataProcessing as dp

In [35]:
reload(dp)


Out[35]:
<module 'sentimentAnalysis.dataProcessing' from 'sentimentAnalysis/dataProcessing.pyc'>

In [2]:
# create spark session
spark = ps.sql.SparkSession.builder \
    .appName("reviewProcessing") \
    .getOrCreate()

In [3]:
# get dataframes
# specify s3 as sourc with s3a://
# df = spark.read.json("s3a://amazon-review-data/user_dedup.json.gz")
df_meta = spark.read.json("s3a://amazon-review-data/metadata.json.gz")

In [4]:
# get shard
df_mi = spark.read.json("s3a://amazon-review-data/reviews_Musical_Instruments_5.json.gz")

In [5]:
# subset asin, overall, , reviewerName, reviewText
df_subset = df_mi.select("asin", "overall", "reviewerName", "unixReviewTime", "reviewText")

In [6]:
# add metadata
df_joined = dp.join_metadata(df_subset, df_meta).select("asin",
                                                        "title",
                                                        "categories",
                                                        "overall",
                                                        "reviewerName",
                                                        "unixReviewTime",
                                                        "reviewText").persist()


Development

Remove 3 star reviews


In [7]:
# count reviews
df_joined.count()


Out[7]:
10261

In [9]:
# remove reviews
df_joined_subset = df_joined.where(df_joined.overall != 3.0)

# check reviews were removed
df_joined_subset.count()


Out[9]:
9489

Extract category


In [114]:
# check df categories
df_joined_subset.show(3)


+----------+--------------------+-------+--------------------+--------------+--------------------+
|      asin|          categories|overall|        reviewerName|unixReviewTime|          reviewText|
+----------+--------------------+-------+--------------------+--------------+--------------------+
|1384719342|[WrappedArray(Mus...|    5.0|       SEAN MASLANKA|    1392940800|This pop filter i...|
|1384719342|[WrappedArray(Mus...|    5.0|RustyBill "Sunday...|    1392336000|Nice windscreen p...|
|1384719342|[WrappedArray(Mus...|    5.0|Rick Bennette "Ri...|    1377648000|The primary job o...|
+----------+--------------------+-------+--------------------+--------------+--------------------+
only showing top 3 rows


In [10]:
from pyspark.sql.functions import udf
from pyspark.sql.types import ArrayType, StringType, NumericType, DateType

In [11]:
def extract_category(df_cats):
    """
    INPUT: Spark DataFrame
    RETURN: Spark DataFrame
    
    Takes in a DataFrame with a wrapped array "categories"
    column. Extracts first category, drops "categories"
    
    """
    # create udf
    extract_cat_udf = udf(lambda x: x[0][0], StringType())

    # create new column with single category
    df_cat = df_joined.withColumn("category", extract_cat_udf("categories")).drop("categories")
    
    return df_cat

In [12]:
# test function
df_cat = extract_category(df_joined_subset)

df_cat.show(3)


+----------+-------+--------------------+--------------+--------------------+-------------------+
|      asin|overall|        reviewerName|unixReviewTime|          reviewText|           category|
+----------+-------+--------------------+--------------+--------------------+-------------------+
|1384719342|    5.0|       SEAN MASLANKA|    1392940800|This pop filter i...|Musical Instruments|
|1384719342|    5.0|RustyBill "Sunday...|    1392336000|Nice windscreen p...|Musical Instruments|
|1384719342|    5.0|Rick Bennette "Ri...|    1377648000|The primary job o...|Musical Instruments|
+----------+-------+--------------------+--------------+--------------------+-------------------+
only showing top 3 rows

Separate pos and neg, add tfidf vectors


In [26]:
from pyspark.sql.functions import lit

def add_pos_neg_tfidf(df_cat):
    """
    INPUT: Spark DataFrame
    RETURN: Spark DataFrame, List, List
    
    Takes in a DataFrame with review column overall.
    Splits into postitive and negative reviews, adds
    TFIDF vectors for each subset. Returns joined
    DataFrame with all data, list of postive review
    vocabulary terms, and list of negative review
    vocabulary terms
    
    """
    # separate positive and negative reviews
    df_pos = df_cat.where(df_cat.overall >= 4.0).withColumn("positive", lit(True))
    df_neg = df_cat.where(df_cat.overall <= 2.0).withColumn("positive", lit(False))
    
    # tokenize
    df_pos_tk = dp.add_tokens(df_pos).select("asin",
                                             "category",
                                             "overall",
                                             "positive",
                                             "reviewerName",
                                             "unixReviewTime",
                                             "reviewText",
                                             "tokens")
    
    df_neg_tk = dp.add_tokens(df_neg).select("asin",
                                             "category",
                                             "overall",
                                             "positive",
                                             "reviewerName",
                                             "unixReviewTime",
                                             "reviewText",
                                             "tokens")
    
    # get tf, vocab
    df_tf_pos, vocab_pos = dp.add_tf_and_vocab(df_pos_tk)
    df_tf_neg, vocab_neg = dp.add_tf_and_vocab(df_neg_tk)
    
    # add tfidf
    df_tfidf_pos = dp.add_tfidf(df_tf_pos).drop("tf_vector").drop("tokens")
    df_tfidf_neg = dp.add_tfidf(df_tf_neg).drop("tf_vector").drop("tokens")

    
    return df_tfidf_pos.unionAll(df_tfidf_neg), vocab_pos, vocab_neg

In [27]:
# test function
df_tfidf, vocab_pos, vocab_neg = add_pos_neg_tfidf(df_cat)

df_tfidf.show(3)


+----------+-------------------+-------+--------+--------------------+--------------+--------------------+--------------------+
|      asin|           category|overall|positive|        reviewerName|unixReviewTime|          reviewText|        tfidf_vector|
+----------+-------------------+-------+--------+--------------------+--------------+--------------------+--------------------+
|1384719342|Musical Instruments|    5.0|    true|       SEAN MASLANKA|    1392940800|This pop filter i...|(19899,[2,3,117,1...|
|1384719342|Musical Instruments|    5.0|    true|RustyBill "Sunday...|    1392336000|Nice windscreen p...|(19899,[22,36,60,...|
|1384719342|Musical Instruments|    5.0|    true|Rick Bennette "Ri...|    1377648000|The primary job o...|(19899,[6,14,16,3...|
+----------+-------------------+-------+--------+--------------------+--------------+--------------------+--------------------+
only showing top 3 rows

Group by product, collect list of reviews

Add review column


In [18]:
# import collect_list function
from pyspark.sql.functions import collect_list
import json

In [19]:
# create json review from row
def rowToJson(rating, date, name, text):
    row = { "rating": rating,
            "date": date,
            "name": name,
            "text": text
          }
    
    return json.dumps(row)

# create review column
def add_review_col(df):
    # create udf
    get_review_udf = udf(lambda a,b,c,d: rowToJson(a,b,c,d), StringType())

    # create new column with review
    df_review = df.withColumn("review", get_review_udf("overall", 
                                                       "unixReviewTime", 
                                                       "reviewerName", 
                                                       "reviewText"))
    
    return df_review

In [20]:
# test function
df_review = add_review_col(df_tfidf)

df_review.show(3)


+----------+-------------------+-------+--------+--------------------+--------------+--------------------+--------------------+--------------------+
|      asin|           category|overall|positive|        reviewerName|unixReviewTime|          reviewText|        tfidf_vector|              review|
+----------+-------------------+-------+--------+--------------------+--------------+--------------------+--------------------+--------------------+
|1384719342|Musical Instruments|    5.0|    true|       SEAN MASLANKA|    1392940800|This pop filter i...|(19899,[2,3,117,1...|{"date": 13929408...|
|1384719342|Musical Instruments|    5.0|    true|RustyBill "Sunday...|    1392336000|Nice windscreen p...|(19899,[22,36,60,...|{"date": 13923360...|
|1384719342|Musical Instruments|    5.0|    true|Rick Bennette "Ri...|    1377648000|The primary job o...|(19899,[6,14,16,3...|{"date": 13776480...|
+----------+-------------------+-------+--------+--------------------+--------------+--------------------+--------------------+--------------------+
only showing top 3 rows


In [176]:
test_row = df_review.first()

test_row["review"]


Out[176]:
u'{"date": 1392940800, "rating": 5.0, "name": "SEAN MASLANKA", "text": "This pop filter is great. It looks and performs like a studio filter. If you\'re recording vocals this will eliminate the pops that gets recorded when you sing."}'

Group by asin, positive, then concatenate reviews, sum tfidf vectors


In [62]:
# group by asin, sum tfidf_vectors
df_grouped = df_review.groupBy("asin", "category", "positive").agg(collect_list("review").alias("reviews"),
                              collect_list("tfidf_vector").alias("vectors"))

df_grouped.show(3)


+----------+-------------------+--------+--------------------+--------------------+
|      asin|           category|positive|             reviews|             vectors|
+----------+-------------------+--------+--------------------+--------------------+
|B000068NW5|Musical Instruments|   false|[{"date": 1394755...|[(5030,[2,3,13,23...|
|B0002CZTIO|Musical Instruments|   false|[{"date": 1390608...|[(5030,[0,6,8,26,...|
|B0002E3DGC|Musical Instruments|   false|[{"date": 1332374...|[(5030,[0,1,8,15,...|
+----------+-------------------+--------+--------------------+--------------------+
only showing top 3 rows


In [63]:
# get test row
test_row = df_grouped.first()

In [64]:
test_vectors = test_row["vectors"]

In [65]:
test_vectors[:3]


Out[65]:
[SparseVector(5030, {2: 1.2358, 3: 1.2506, 13: 1.6376, 23: 1.9588, 32: 2.0213, 34: 1.9, 38: 2.0213, 129: 3.1039, 157: 2.8526, 159: 2.8163, 260: 3.3759, 317: 3.4404, 398: 3.5835, 454: 3.5835, 558: 3.7506, 2109: 5.0499, 2473: 5.0499, 3809: 5.4553}),
 SparseVector(5030, {0: 1.0922, 7: 1.3202, 8: 1.43, 10: 1.5841, 34: 1.9, 59: 2.2983, 64: 2.4596, 77: 2.3873, 196: 3.0574, 220: 3.0574, 257: 3.3153, 400: 3.5835, 428: 3.5094, 542: 3.7506, 583: 3.8459, 832: 4.3567, 870: 4.3567, 1053: 4.3567, 1079: 4.539, 1124: 4.539, 3113: 5.4553}),
 SparseVector(5030, {1: 1.2212, 3: 1.2506, 6: 3.0866, 17: 1.7918, 25: 2.1595, 36: 2.1411, 41: 2.2773, 46: 7.9559, 48: 2.1411, 69: 2.3873, 93: 2.4349, 95: 2.5376, 140: 2.7812, 167: 2.9296, 171: 2.9704, 194: 2.8904, 199: 2.9704, 205: 3.1527, 206: 3.013, 219: 3.5094, 238: 3.1039, 260: 3.3759, 264: 3.2581, 284: 3.2581, 292: 3.7506, 299: 3.3759, 342: 3.3153, 359: 3.4404, 462: 3.6636, 465: 3.5835, 527: 3.7506, 579: 3.8459, 717: 4.2026, 854: 4.2026, 1057: 4.3567, 1127: 4.539, 1176: 4.539, 1641: 4.7622, 1719: 4.7622, 2172: 5.0499, 2493: 5.0499, 4491: 5.4553})]

In [22]:
from pyspark.mllib.linalg import SparseVector
import numpy as np

def sum_vectors(vectors):
    """
    INPUT: List of SparseVectors
    RETURN: SparseVector
    
    Sum list of TFIDF vectors element-wise,
    return resulting vector
    
    """
    # check if vectors exist
    if not vectors:
        return None
    
    # iterate over vectors
    sum_vector = vectors[0].toArray()
    vector_size = sum_vector.shape[0]
    
    for i,vector in enumerate(vectors[1:]):
        sum_vector += vector.toArray()
        
    # convert to sparse vector   
    sparse_vector = SparseVector(vector_size, {i:sum_vector[i] for i in np.nonzero(sum_vector)[0]})
    
    return sparse_vector

In [229]:
# examine two vectors
test_vectors[0]


Out[229]:
SparseVector(19899, {1: 1.2831, 2: 2.2874, 3: 2.6328, 4: 3.9307, 6: 9.2194, 8: 4.7457, 10: 16.1282, 13: 1.8166, 15: 7.399, 19: 3.979, 21: 1.9623, 24: 3.931, 26: 4.7364, 28: 4.1481, 30: 2.1634, 40: 4.6919, 41: 6.9624, 43: 5.4616, 48: 2.5104, 54: 2.4573, 57: 2.4342, 61: 2.4901, 67: 2.6843, 71: 2.7778, 73: 2.5451, 75: 2.4495, 80: 2.5988, 83: 2.7976, 84: 2.6571, 85: 2.6714, 89: 2.7291, 91: 8.1318, 94: 5.4787, 96: 2.7725, 98: 5.465, 101: 2.8673, 104: 5.5843, 107: 5.658, 108: 2.7994, 116: 2.8309, 117: 17.8971, 120: 5.8222, 125: 5.7779, 126: 28.2494, 129: 2.9317, 131: 2.9484, 142: 2.9894, 143: 2.9894, 152: 3.0072, 159: 6.9384, 161: 3.1036, 169: 3.5017, 179: 6.3935, 189: 3.2904, 190: 10.8558, 196: 3.2698, 206: 3.6959, 219: 3.4692, 231: 6.8684, 236: 3.4763, 241: 3.5469, 246: 3.4727, 248: 3.5203, 256: 3.5702, 263: 7.0481, 287: 3.5624, 306: 3.6186, 311: 3.6695, 318: 3.77, 319: 3.7139, 327: 3.7139, 364: 3.7748, 371: 3.8818, 394: 3.9895, 414: 3.9485, 418: 3.909, 463: 4.0138, 524: 8.5907, 531: 4.1377, 577: 4.24, 582: 4.2023, 606: 4.5328, 635: 4.3454, 683: 4.5024, 688: 4.389, 752: 4.5328, 758: 4.5328, 762: 4.6077, 764: 9.4762, 785: 4.5226, 790: 4.6302, 846: 4.9331, 871: 4.6416, 885: 4.9179, 911: 4.7131, 925: 4.7255, 942: 4.8309, 992: 4.8035, 1055: 4.888, 1058: 4.9967, 1149: 4.9804, 1196: 10.2004, 1199: 5.444, 1204: 41.5063, 1293: 5.1757, 1313: 5.1757, 1315: 5.1757, 1433: 5.4699, 1484: 5.3463, 1528: 5.4187, 1617: 5.524, 1797: 5.611, 1801: 5.611, 1873: 5.7402, 1920: 5.7063, 2242: 5.972, 2267: 12.126, 2275: 5.9295, 2337: 6.2172, 2676: 6.1631, 2906: 6.3349, 2975: 6.3995, 3136: 6.3995, 3279: 6.4685, 3290: 6.4685, 3425: 6.5426, 3463: 6.5426, 3569: 6.8049, 4316: 7.0281, 4413: 6.9103, 4794: 7.0281, 5834: 32.0357, 5895: 7.3158, 6541: 7.4981, 6599: 15.4425, 7355: 7.7212})

In [223]:
test_vectors[1]


Out[223]:
SparseVector(19899, {10: 4.6081, 32: 2.221, 101: 2.8673, 174: 3.227, 275: 3.7185, 376: 4.1588, 479: 4.3368, 747: 4.5024, 1999: 5.8494, 4593: 7.0281, 5133: 7.1616, 8129: 7.7212, 9921: 8.0089})

In [265]:
# test sum of first two vectors
sum_vectors(test_vectors)


Out[265]:
SparseVector(19899, {1: 2.5662, 2: 2.2874, 3: 2.6328, 4: 3.9307, 6: 16.9023, 8: 6.3276, 10: 29.9523, 11: 3.3968, 13: 5.4497, 15: 7.399, 19: 3.979, 21: 1.9623, 24: 5.8965, 26: 4.7364, 27: 4.1622, 28: 4.1481, 29: 2.1228, 30: 2.1634, 32: 2.221, 34: 2.304, 40: 4.6919, 41: 6.9624, 43: 5.4616, 45: 2.3518, 48: 2.5104, 52: 2.5536, 53: 4.9171, 54: 2.4573, 57: 2.4342, 60: 2.4955, 61: 2.4901, 66: 2.5825, 67: 5.3686, 69: 7.6736, 70: 2.5884, 71: 2.7778, 73: 2.5451, 75: 2.4495, 77: 2.5767, 79: 2.5958, 80: 2.5988, 83: 2.7976, 84: 2.6571, 85: 2.6714, 88: 5.289, 89: 2.7291, 90: 3.0987, 91: 8.1318, 93: 2.724, 94: 5.4787, 96: 2.7725, 98: 8.1975, 101: 8.6018, 104: 5.5843, 107: 8.487, 108: 2.7994, 113: 3.1414, 116: 2.8309, 117: 17.8971, 120: 5.8222, 125: 5.7779, 126: 34.5271, 129: 5.8633, 131: 2.9484, 134: 2.9633, 140: 3.0027, 142: 2.9894, 143: 2.9894, 150: 3.0768, 152: 3.0072, 159: 6.9384, 161: 6.2073, 169: 10.5052, 174: 6.454, 177: 3.1111, 179: 6.3935, 188: 3.2786, 189: 3.2904, 190: 10.8558, 196: 3.2698, 201: 3.4446, 206: 3.6959, 219: 3.4692, 220: 3.6063, 227: 3.8142, 229: 3.3872, 231: 6.8684, 234: 3.6145, 236: 3.4763, 241: 10.6406, 246: 3.4727, 248: 3.5203, 254: 3.524, 256: 3.5702, 258: 3.5054, 263: 10.5721, 265: 7.0256, 270: 3.8553, 275: 3.7185, 287: 3.5624, 292: 3.6311, 306: 3.6186, 311: 3.6695, 318: 3.77, 319: 3.7139, 327: 3.7139, 329: 3.7509, 335: 3.6914, 362: 3.7845, 364: 3.7748, 371: 3.8818, 375: 3.7604, 376: 8.3175, 383: 3.8764, 387: 4.0903, 392: 3.8193, 394: 3.9895, 406: 3.8926, 412: 4.1103, 414: 3.9485, 418: 3.909, 429: 4.0261, 444: 3.9485, 463: 4.0138, 479: 8.6737, 487: 4.1036, 524: 8.5907, 531: 4.1377, 545: 4.1447, 577: 8.48, 582: 4.2023, 606: 4.5328, 635: 4.3454, 683: 4.5024, 688: 4.389, 729: 9.451, 747: 4.5024, 752: 4.5328, 758: 4.5328, 762: 4.6077, 764: 9.4762, 785: 4.5226, 790: 4.6302, 823: 4.6767, 845: 4.6532, 846: 4.9331, 871: 4.6416, 885: 4.9179, 888: 4.9486, 903: 4.7131, 911: 4.7131, 925: 4.7255, 942: 4.8309, 953: 4.79, 984: 4.8734, 992: 4.8035, 1055: 4.888, 1058: 4.9967, 1104: 5.0471, 1149: 4.9804, 1196: 10.2004, 1199: 5.444, 1204: 41.5063, 1248: 5.1955, 1293: 5.1757, 1313: 5.1757, 1315: 5.1757, 1407: 5.2789, 1433: 5.4699, 1437: 5.3699, 1484: 5.3463, 1492: 5.3463, 1528: 5.4187, 1536: 5.3699, 1588: 5.524, 1616: 5.611, 1617: 5.524, 1639: 5.524, 1792: 5.5812, 1797: 5.611, 1801: 5.611, 1851: 5.611, 1873: 5.7402, 1920: 5.7063, 1999: 5.8494, 2242: 5.972, 2267: 12.126, 2275: 5.9295, 2337: 6.2172, 2401: 5.972, 2466: 6.063, 2658: 6.2172, 2676: 6.1631, 2906: 6.3349, 2975: 6.3995, 3136: 6.3995, 3279: 6.4685, 3290: 6.4685, 3425: 6.5426, 3463: 6.5426, 3569: 6.8049, 3573: 6.8049, 3816: 6.7096, 3831: 6.7096, 4316: 7.0281, 4413: 6.9103, 4593: 7.0281, 4753: 7.0281, 4794: 7.0281, 5133: 7.1616, 5834: 32.0357, 5895: 7.3158, 6083: 7.4981, 6541: 7.4981, 6599: 15.4425, 7355: 7.7212, 7865: 7.7212, 8129: 7.7212, 9921: 8.0089, 12932: 8.4144, 16735: 8.4144, 17767: 8.4144})

Function to sum vectors over DataFrame


In [66]:
from pyspark.mllib.linalg import VectorUDT

# sum vectors
def add_vectors_sum(df):
    # create udf
    sum_vector_udf = udf(lambda vectors: sum_vectors(vectors), VectorUDT())

    # create new column with review
    df_vectors_summed = df.withColumn("tfidf_vectors_sum", sum_vector_udf("vectors")).drop("vectors")
    
    return df_vectors_summed

In [67]:
# test
df_vectors_summed = add_vectors_sum(df_grouped)

df_vectors_summed.show(3)


+----------+-------------------+--------+--------------------+--------------------+
|      asin|           category|positive|             reviews|   tfidf_vectors_sum|
+----------+-------------------+--------+--------------------+--------------------+
|B000068NW5|Musical Instruments|   false|[{"date": 1394755...|(5030,[0,1,2,3,6,...|
|B0002CZTIO|Musical Instruments|   false|[{"date": 1390608...|(5030,[0,6,8,26,4...|
|B0002E3DGC|Musical Instruments|   false|[{"date": 1332374...|(5030,[0,1,8,15,1...|
+----------+-------------------+--------+--------------------+--------------------+
only showing top 3 rows

Add top terms


In [69]:
# get top terms
"""
from sentimentAnalysis.dataProcessing import add_pos_neg_features
df_terms = add_pos_neg_features(df_vectors_summed, vocab_pos, vocab_neg, n=15).drop("tfidf_vectors_sum")
df_terms.show(3)
"""


Out[69]:
'\nfrom sentimentAnalysis.dataProcessing import add_pos_neg_features\ndf_terms = add_pos_neg_features(df_vectors_summed, vocab_pos, vocab_neg, n=15).drop("tfidf_vectors_sum")\ndf_terms.show(3)\n'

In [70]:
def test_extract_top_features(tfidf_vector, vocab, n):
    """
    INPUT: SparseVector, List, Int
    RETURN: List

    Take in TFIDF vector, vocabulary for vector,
    and number of terms. Return top n terms

    """
    # note - tfidf elements are pre-sorted by importance
    term_indices = tfidf_vector.indices[-n:]

    # Map features to terms
    features = [vocab[i] for i in term_indices]

    return features

def test_add_top_features(df, vocab, n=10):
    """
    INPUT: PySpark DataFrame, List, Int
    RETURN: PySpark DataFrame

    Take in DataFrame with TFIDF vectors, list of vocabulary words,
    and number of features to extract. Map top features from TFIDF
    vectors to vocabulary terms. Return new DataFrame with terms

    """
    # Create udf function to extract top n features
    extract_features_udf = udf(lambda x: test_extract_top_features(x, vocab, n))

    # Apply udf, create new df with features column
    df_features = df.withColumn("topFeatures",
                                    extract_features_udf(df["tfidf_vectors_sum"]))


    return df_features

def test_add_pos_neg_features(df, vocab_pos, vocab_neg, n=10):
    """
    INPUT: Spark DataFrame, List, List, Int
    RETURN: Spark DataFrame

    Take in DataFrame grouped by asin, positive with tfidf vectors summed.
    Extract top positive and negative terms from each group, add features column

    """
    # split dataframe on postitive
    df_pos = df.where(df.positive==True)
    df_neg = df.where(df.positive==False)

    # add features
    df_pos_terms = test_add_top_features(df_pos, vocab_pos, n)
    df_neg_terms = test_add_top_features(df_neg, vocab_neg, n)

    return df_pos_terms.unionAll(df_neg_terms)

In [71]:
# add terms
# note - udf function that relies on local module functions does not work b/c module does not exist on workers

df_terms = test_add_pos_neg_features(df_vectors_summed, vocab_pos, vocab_neg, n=15).drop("tfidf_vectors_sum")

Reduce, add posFeatures and negFeatures


In [72]:
df_terms.show(3)


+----------+-------------------+--------+--------------------+--------------------+
|      asin|           category|positive|             reviews|         topFeatures|
+----------+-------------------+--------+--------------------+--------------------+
|B0014IEBM0|Musical Instruments|    true|[{"date": 1251072...|[portables, subtl...|
|B001EC5ECW|Musical Instruments|    true|[{"date": 1385337...|[recommendation, ...|
|B001L8IJ0I|Musical Instruments|    true|[{"date": 1362355...|[mg, dsl, glory, ...|
+----------+-------------------+--------+--------------------+--------------------+
only showing top 3 rows


In [102]:
# group by asin
df_terms_grouped = df_terms.groupBy("asin", "category").agg(collect_list("reviews").alias("reviews"),
                              collect_list("topFeatures").alias("topFeatures"))

df_terms_grouped.show(1)


+----------+-------------------+--------------------+--------------------+
|      asin|           category|             reviews|         topFeatures|
+----------+-------------------+--------------------+--------------------+
|B0002CZSJY|Musical Instruments|[WrappedArray({"d...|[[usese, bubblych...|
+----------+-------------------+--------------------+--------------------+
only showing top 1 row


In [ ]:


In [ ]:


In [148]:
df_terms.select("asin",
                col("reviews").alias("negReviews"), 
                col("topFeatures").alias("negFeatures")).show(3)


+----------+--------------------+--------------------+
|      asin|          negReviews|         negFeatures|
+----------+--------------------+--------------------+
|B0014IEBM0|[{"date": 1251072...|[portables, subtl...|
|B001EC5ECW|[{"date": 1385337...|[recommendation, ...|
|B001L8IJ0I|[{"date": 1362355...|[mg, dsl, glory, ...|
+----------+--------------------+--------------------+
only showing top 3 rows


In [162]:
from pyspark.sql.functions import col

In [189]:
# create review column
def collapse_reviews_terms(df):
    # split dataframe
    df_pos = df.where(df.positive==True)
    df_pos = df_pos.select(col("asin"),
                           col("category"),
                           col("reviews").alias("posReviews"), 
                           col("topFeatures").alias("posFeatures"))
    
    df_neg = df.where(df.positive==False)
    df_neg = df_neg.select(col("asin"),
                           col("reviews").alias("negReviews"), 
                           col("topFeatures").alias("negFeatures"))
    
    
    # get asin 
    df_asin = df.select("asin").distinct()
    
    
    # join dataframes
    df_final = df_asin.join(df_pos, df_asin.asin==df_neg.asin, 'outer').drop(df_pos.asin)
    df_final = df_final.join(df_neg, df_final.asin==df_neg.asin, 'outer').drop(df_neg.asin)
    
    
    return df_final

In [ ]:


In [190]:
df_final = collapse_reviews_terms(df_terms)

In [192]:
df_final.show(3)


+----------+-------------------+--------------------+--------------------+--------------------+--------------------+
|      asin|           category|          posReviews|         posFeatures|          negReviews|         negFeatures|
+----------+-------------------+--------------------+--------------------+--------------------+--------------------+
|B000MWWT6E|Musical Instruments|[{"date": 1275696...|[dang, pony, tick...|                null|                null|
|B000P5NXWM|Musical Instruments|[{"date": 1301702...|[replacements, sa...|[{"date": 1374969...|[tone, purchased,...|
|B000RNB720|Musical Instruments|[{"date": 1383264...|[investigated, 7a...|[{"date": 1376006...|[can't, big, usin...|
+----------+-------------------+--------------------+--------------------+--------------------+--------------------+
only showing top 3 rows


In [ ]:


In [ ]:


Function


In [ ]:
# modules needed
#from pyspark.sql.functions import udf
#from pyspark.sql.types import ArrayType, StringType
#from sentimentAnalysis import dataProcessing as dp

In [ ]:


In [ ]:


Clear Session


In [193]:
# unpersist old dataframes
df_cat.unpersist()


Out[193]:
DataFrame[asin: string, overall: double, reviewerName: string, unixReviewTime: bigint, reviewText: string, category: string]

In [195]:
# end session
spark.stop()

In [ ]: