In [ ]:
# Raw Musical Instruments reviews, read straight from the gzip-compressed JSON dump.
reviews = spark.read.json(
    './data/raw_data/reviews_Musical_Instruments_5.json.gz'
)

In [1]:
# Product metadata for the same category. Only products that carry both a
# category path and related-product links are kept, since later cells use both.
metadata = (spark
    .read
    .json('./data/metadata/meta_Musical_Instruments.json.gz')
    .filter('categories is not null and related is not null'))

# Later cells in this notebook refer to this DataFrame as `meta`. Without this
# alias they raise NameError on a fresh kernel. `metadata` stays bound for any
# code that uses the original name.
meta = metadata

In [2]:
# Number of reviews per product (columns: asin, count).
product_list = reviews.groupBy('asin').count()

# Ten most-reviewed products. Limiting on the Spark side collects only ten rows
# to the driver, instead of materialising every product in pandas and then
# taking the head.
product_list.sort('count', ascending=False).limit(10).toPandas()


---------------------------------------------------------------------------
NameError                                 Traceback (most recent call last)
<ipython-input-2-87cbaf399dcb> in <module>()
----> 1 product_list = reviews.groupBy('asin').count()
      2 product_list.sort('count', ascending = False).toPandas().head(10)

NameError: name 'reviews' is not defined

In [ ]:
# How many reviews each reviewer wrote (columns: reviewerID, count).
reviews_per_reviewer = (reviews
    .groupBy('reviewerID')
    .count())

In [ ]:
from pyspark.sql.functions import col, udf, avg
from pyspark.sql.types import DoubleType

def _usefulness_ratio(helpful):
    """Return useful_votes / (total_votes + 1) for one review, or None if missing.

    `helpful` is assumed to be a two-element [useful_votes, total_votes] array
    (TODO confirm against the raw review schema). The +1 in the denominator
    keeps the ratio defined for reviews that received no votes.
    """
    if helpful is None:
        return None
    useful, out_of = helpful
    return useful / float(out_of + 1)


# A named function replaces the tuple-unpacking lambda, which is
# Python 2-only syntax and a SyntaxError on Python 3.
usefulness_ratio = udf(_usefulness_ratio, returnType=DoubleType())

# Mean usefulness ratio per reviewer. avg skips null ratios, so reviews without
# vote data are excluded from the mean.
usefulness = (reviews
  .select('reviewerID', usefulness_ratio(col('helpful')).alias('usefulness'))
  .groupBy('reviewerID')
  .agg(avg(col('usefulness')).alias('usefulness')))

In [ ]:
# Reviewer rank = mean usefulness x number of reviews written.
# Only reviewers whose rank exceeds 1 are kept.
rankings = (
    usefulness
    .join(reviews_per_reviewer, on='reviewerID')
    .select(
        'reviewerID',
        (col('usefulness') * col('count')).alias('rank'),
    )
    .where(col('rank') > 1)
)

In [ ]:
# ASIN of the single product with the most reviews.
most_reviewed_product = reviews.groupBy('asin').count().sort('count', ascending=False).take(1)[0][0]

# Reviews of that product, joined with each reviewer's rank. `overall` is
# selected explicitly because the split below filters on it. Filtering on a
# column that was already projected away only works via Spark's implicit
# missing-reference resolution.
best_reviewers = (reviews
   .filter(col('asin') == most_reviewed_product)
   .join(rankings, 'reviewerID')
   .select('asin', 'reviewerID', 'rank', 'overall', 'summary'))

# Split on `overall` (presumably the star rating): > 3 counts as a good review.
# Highest-ranked reviewers come first.
good_reviews = best_reviewers.filter(col('overall') > 3).sort('rank', ascending=False)

bad_reviews = best_reviewers.filter(col('overall') <= 3).sort('rank', ascending=False)

In [ ]:
# Ten highest-ranked bad reviews. Limit in Spark so only ten rows reach the driver.
bad_reviews.limit(10).toPandas()

In [ ]:
# Preview the first ten metadata rows. Limit in Spark so the whole table is not
# collected to the driver just to display ten rows.
meta.limit(10).toPandas()

In [ ]:
# Products that have related-product links.
meta_with_related = meta.where('related is not null')

In [ ]:
from pyspark.sql.functions import udf, col

def _last_category(categories):
    """Return the last entry of the first category path, or None if unavailable.

    `categories` is presumably a list of category paths, each a list of names
    from general to specific (TODO confirm against the metadata schema).
    """
    # Guard empty lists: indexing [0][-1] on them would raise IndexError and
    # fail the whole Spark job.
    if not categories or not categories[0]:
        return None
    return categories[0][-1]


# udf with the default (string) return type.
last = udf(_last_category)

# Map each product to the leaf of its first category path.
product_to_category = meta.select('asin', last(col('categories')).alias('category'))

In [ ]:
# Spot-check the category assigned to one sample product.
(product_to_category
    .where(col('asin') == '0006428320')
    .show())

In [ ]:
# Products sorted by category name in descending order, capped at 90,000 rows.
# The keyword must be spelled `ascending`: DataFrame.sort silently ignores
# unknown keywords, so a misspelling falls back to an ascending sort.
sheet_music_folders = (product_to_category
 .sort('category', ascending=False)
 .limit(90000)
 # TODO: despite the variable name, no 'Sheet Music Folders' filter is applied.
 # Add `.filter(col('category') == 'Sheet Music Folders')` if that subset is intended.
)
sheet_music_folders.count()

In [ ]:
# ASINs of products whose leaf category is "Saddles", taken from the
# descending-sorted, 90,000-row slice of the category table.
saddles = (product_to_category
    .sort('category', ascending=False)
    .limit(90000)
    .filter('category = "Saddles"')
    .select('asin'))

In [ ]:
# Number of reviews written about saddle products (inner equi-join on asin).
reviews.join(saddles, on='asin').count()

In [ ]:
# Most prolific reviewers first.
(reviews
    .groupBy('reviewerID')
    .count()
    .sort('count', ascending=False)
    .show())

In [ ]:
# Number of distinct products that received at least one review.
reviews.select('asin').dropDuplicates().count()

In [ ]:
# Distinct products in the metadata, then the total number of review rows.
print(meta.select('asin').dropDuplicates().count())
print(reviews.count())

In [ ]:
# Distinct reviewed products, renamed to `asin2` so the join below has no
# ambiguous `asin` column.
products = reviews.select(col('asin').alias('asin2')).distinct()

# Category of every reviewed product that appears in sheet_music_folders.
refined_dictionary = (
    products
    .join(sheet_music_folders, sheet_music_folders.asin == products.asin2)
    .select('asin', 'category')
)

In [ ]:
# Display the first 20 product -> category rows.
refined_dictionary.show(n=20)

In [ ]:
# Product id alongside its related-product links.
meta.select('asin', 'related')

In [ ]:
# FIXME(review): `same_category` is not defined anywhere in the visible notebook,
# so this cell raises NameError on a fresh kernel. Presumably it is meant to be a
# udf taking (related, asin) and returning the related products that share the
# product's category; define it in an earlier cell before running this one.
products_in_same_subcategory = meta.select('asin', same_category(col('related'), col('asin')))

In [ ]:
# Look up the related-product links of one sample product in the metadata.
# Despite its name, `related_reviews` holds the product's `related` field, not reviews.
related_reviews = meta.where(col('asin') == 'B0002E1G5C').select('related')
related_reviews.show(truncate=False)

In [ ]:
# Show the full, untruncated category paths of the same sample product.
categories = meta.where(col('asin') == 'B0002E1G5C').select('categories')
categories.show(truncate=False)