In [ ]:
# Read the gzipped Musical Instruments review JSON into a Spark DataFrame.
reviews = spark.read.json(
    './data/raw_data/reviews_Musical_Instruments_5.json.gz'
)
In [1]:
# Read the gzipped product metadata JSON and keep only the rows in which
# both `categories` and `related` are non-null.
metadata = spark.read.json(
    './data/metadata/meta_Musical_Instruments.json.gz'
).where('''
categories is not null
and related is not null''')
In [2]:
# Number of review rows per product (`asin`).
product_list = (
    reviews
    .groupBy('asin')
    .count()
)

# Display the ten products with the highest review counts.
(product_list
    .orderBy('count', ascending=False)
    .toPandas()
    .head(10))
In [ ]:
# Number of review rows written by each reviewer.
reviews_per_reviewer = (
    reviews
    .groupBy('reviewerID')
    .count()
)
In [ ]:
from pyspark.sql.functions import col, udf, avg
from pyspark.sql.types import DoubleType
# Per-review usefulness score derived from the `helpful` column, which this
# UDF treats as a two-element sequence (useful, out_of).
# The +1 in the denominator avoids division by zero when out_of is 0.
# The sequence is indexed instead of being unpacked in the lambda signature:
# tuple parameters (`lambda (a, b): ...`) are a SyntaxError on Python 3.
usefulness_ratio = udf(
    lambda helpful: helpful[0] / float(helpful[1] + 1),
    returnType=DoubleType())

# Mean usefulness score per reviewer.
usefulness = (reviews
    .select('reviewerID', usefulness_ratio(col('helpful')).alias('usefulness'))
    .groupBy('reviewerID')
    .agg(avg(col('usefulness')).alias('usefulness')))
In [ ]:
# Rank each reviewer as (mean usefulness * number of reviews) and keep only
# the reviewers whose rank is greater than 1.
rankings = (
    usefulness
    .join(reviews_per_reviewer, on='reviewerID')
    .select('reviewerID', (col('usefulness') * col('count')).alias('rank'))
    .where(col('rank') > 1)
)
In [ ]:
# `asin` of the product with the highest review count: first column of the
# top row after sorting the per-product counts in descending order.
most_reviewed_product = (
    reviews
    .groupBy('asin')
    .count()
    .orderBy('count', ascending=False)
    .take(1)[0][0]
)

# Reviews of that product written by reviewers present in `rankings`.
best_reviewers = (
    reviews
    .where(col('asin') == most_reviewed_product)
    .join(rankings, on='reviewerID')
    .select('asin', 'reviewerID', 'rank', 'summary')
)

# Split on the `overall` score (> 3 vs. <= 3), highest-ranked reviewers first.
# NOTE(review): `overall` is not in the select list above, so these filters
# depend on Spark resolving it from the underlying plan -- confirm this holds
# on the Spark version in use.
good_reviews = (
    best_reviewers
    .where(col('overall') > 3)
    .orderBy('rank', ascending=False)
)
bad_reviews = (
    best_reviewers
    .where(col('overall') <= 3)
    .orderBy('rank', ascending=False)
)
In [ ]:
# Convert `bad_reviews` to a pandas DataFrame and display its first ten rows.
bad_reviews.toPandas().head(10)
In [ ]:
# Convert `meta` to a pandas DataFrame and display its first ten rows.
# (The closing parenthesis was missing, which made this cell a SyntaxError.)
# NOTE(review): `meta` is not assigned in the cells shown here (the loader
# cell assigns `metadata`) -- confirm which DataFrame is intended.
meta.toPandas().head(10)
In [ ]:
# Subset of `meta` in which the `related` column is non-null.
meta_with_related = meta.where('related is not null')
In [ ]:
from pyspark.sql.functions import udf, col
# UDF returning the last element of the first inner list of `categories`.
# Returns null instead of raising when `categories` is null/empty or its first
# inner list is empty, so a single malformed row does not fail the whole job.
last = udf(
    lambda categories: (
        categories[0][-1] if categories and categories[0] else None
    )
)

# Map every product (`asin`) to the category extracted by `last`.
product_to_category = meta.select('asin', last(col('categories')).alias('category'))
In [ ]:
# Print the category row(s) for the product with asin "0006428320".
product_to_category.filter('asin = "0006428320"').show()
In [ ]:
# First 90000 rows of the product-to-category mapping after sorting by
# category in descending order.
# The keyword was previously misspelled `acending`; sort() silently ignores
# unknown keywords, so the data was being sorted in ascending order.
# NOTE: despite the variable name, no category filter is applied here -- the
# 'Sheet Music Folders' filter below is disabled.
sheet_music_folders = (product_to_category
    .sort('category', ascending=False)
    .limit(90000)
    # .filter(col('category') == 'Sheet Music Folders')
)
sheet_music_folders.count()
In [ ]:
# `asin` values categorised as "Saddles" among the first 90000 rows of the
# mapping when sorted by category in descending order.
saddles = (
    product_to_category
    .orderBy('category', ascending=False)
    .limit(90000)
    .where('category = "Saddles"')
    .select('asin')
)
In [ ]:
# Count the review rows whose `asin` matches an `asin` in `saddles`.
(reviews
    .join(saddles, on=(reviews.asin == saddles.asin))
    .count())
In [ ]:
# Print review counts per reviewer, largest count first.
(reviews
    .groupBy('reviewerID')
    .count()
    .orderBy('count', ascending=False)
    .show())
In [ ]:
# Number of distinct products (`asin`) that appear in `reviews`.
(reviews
    .select('asin')
    .distinct()
    .count())
In [ ]:
# Print the number of distinct `asin` values in `meta`, then the total number
# of rows in `reviews`.
print(
    meta
    .select('asin')
    .distinct()
    .count()
)
print(
    reviews.count()
)
In [ ]:
# Distinct reviewed products; the column is renamed to `asin2` so that it does
# not clash with `asin` in `sheet_music_folders` during the join.
products = (
    reviews
    .select(col('asin').alias('asin2'))
    .distinct()
)

# Category rows restricted to products that appear in `reviews`.
refined_dictionary = (
    products
    .join(sheet_music_folders, on=(products.asin2 == sheet_music_folders.asin))
    .select('asin', 'category')
)
In [ ]:
# Print the leading rows of `refined_dictionary`.
refined_dictionary.show()
In [ ]:
# Project `meta` down to the `asin` and `related` columns.
meta.select('asin', 'related')
In [ ]:
# For every row of `meta`, select `asin` together with the result of applying
# `same_category` to the `related` and `asin` columns.
# NOTE(review): `same_category` is not defined in the cells shown here --
# confirm where it comes from before running this cell.
products_in_same_subcategory = meta.select('asin', same_category(col('related'), col('asin')))
In [ ]:
# Look up the `related` entry in the metadata for product B0002E1G5C and
# print it without truncating the column.
related_reviews = (
    meta
    .where(col('asin') == 'B0002E1G5C')
    .select('related')
)
related_reviews.show(truncate=False)
In [ ]:
# Look up the `categories` entry in the metadata for product B0002E1G5C and
# print it without truncating the column.
categories = (
    meta
    .where(col('asin') == 'B0002E1G5C')
    .select('categories')
)
categories.show(truncate=False)