The idea in this solution is to provide a new feature to the customer that reduces the need to read through several reviews in order to evaluate a product. To achieve that, we will attempt to extract the most predictive words and sentences from the reviews and present them in an accessible format (e.g. a word cloud).
In [1]:
# Load the raw Baby-category 5-core reviews and backfill a placeholder
# for records missing the reviewer's name.
raw_reviews = spark.read.json('./data/raw_data/reviews_Baby_5.json.gz')
all_reviews = raw_reviews.na.fill({ 'reviewerName': 'Unknown' })
In [2]:
from pyspark.sql.functions import col, expr, udf, trim
from pyspark.sql.types import IntegerType
import re

# Strip everything except letters and whitespace from a summary line.
# Raw string fixes the invalid '\s' escape (DeprecationWarning in Python 3,
# a SyntaxError from Python 3.12 on); the matched text is unchanged.
remove_punctuation = udf(lambda line: re.sub(r'[^A-Za-z\s]', '', line))
# Collapse star ratings to a binary label: 1-2 stars -> 0, otherwise -> 1.
make_binary = udf(lambda rating: 0 if rating in [1, 2] else 1, IntegerType())
reviews = (all_reviews
    .filter(col('overall').isin([1, 2, 5]))  # drop ambiguous 3-4 star reviews
    .withColumn('label', make_binary(col('overall')))
    .select(col('label').cast('int'), remove_punctuation('summary').alias('summary'))
    .filter(trim(col('summary')) != ''))  # drop summaries emptied by cleaning
In [3]:
# Fixed seed for a reproducible split. The Python 2 long suffix ('5436L')
# is a SyntaxError in Python 3, so the plain int literal is used.
train, test = reviews.randomSplit([.8, .2], seed=5436)
In [4]:
def multiply_dataset(dataset, n):
    """Return `dataset` unioned with itself so its rows appear ceil(n) times
    (at least once; n <= 1 returns the dataset unchanged).

    Used below to oversample the minority class; `n` may be a float ratio.
    Iterative rather than recursive so a large imbalance ratio cannot hit
    Python's recursion limit.
    """
    result = dataset
    remaining = n
    while remaining > 1:
        result = result.union(dataset)
        remaining -= 1
    return result
In [5]:
reviews_good = train.filter('label == 1')
reviews_bad = train.filter('label == 0')
# Oversample the minority (negative) class up to roughly the majority size.
# Floor division restores the original Python 2 behaviour: under Python 3,
# '/' is true division and would pass a float replication count.
ratio = reviews_good.count() // reviews_bad.count()
reviews_bad_multiplied = multiply_dataset(reviews_bad, ratio)
train_reviews = reviews_bad_multiplied.union(reviews_good)
In [10]:
# Majority-class baseline: accuracy of always predicting the positive label.
# NOTE(review): this is computed on the oversampled training set
# (train_reviews), where the classes are roughly balanced, so the baseline
# is ~0.5 rather than the raw-data rate — confirm this is intended.
accuracy = reviews_good.count() / float(train_reviews.count())
print('Always predicting 5 stars accuracy: {0}'.format(accuracy))
In [14]:
from pyspark.ml.feature import Tokenizer, HashingTF, IDF, StopWordsRemover
from pyspark.ml.pipeline import Pipeline
from pyspark.ml.classification import LogisticRegression

# summary text -> words -> stop-word filtered words -> hashed term
# frequencies -> TF-IDF features -> elastic-net logistic regression.
tokenizer = Tokenizer(inputCol='summary', outputCol='words')
pipeline = Pipeline(stages=[
    tokenizer,
    # Comma after this stage was missing in the original (SyntaxError).
    StopWordsRemover(inputCol='words', outputCol='filtered_words'),
    HashingTF(inputCol='filtered_words', outputCol='rawFeatures', numFeatures=120000),
    IDF(inputCol='rawFeatures', outputCol='features'),
    LogisticRegression(regParam=.3, elasticNetParam=.01)
])
In [11]:
# Fit the whole pipeline on the class-balanced training set.
model = pipeline.fit(train_reviews)
In [12]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator
# Score the held-out test set; with no metricName set,
# BinaryClassificationEvaluator reports areaUnderROC.
prediction = model.transform(test)
BinaryClassificationEvaluator().evaluate(prediction)
Out[12]:
In [15]:
from pyspark.sql.functions import explode
import pyspark.sql.functions as F
from pyspark.sql.types import FloatType

# One row per individual word, aliased to 'summary' so the fitted pipeline
# scores each word as if it were a one-word review summary.
words = (tokenizer
    .transform(reviews)
    .select(explode(col('words')).alias('summary')))

# Run every word through the trained model, keeping its probability vector.
predictors = (model
    .transform(words)
    .select(col('summary').alias('word'), 'probability'))

# Unpack the two entries of the probability vector into plain floats.
first = udf(lambda x: x[0].item(), FloatType())
second = udf(lambda x: x[1].item(), FloatType())

scored = predictors.select(
    'word',
    second(col('probability')).alias('positive'),
    first(col('probability')).alias('negative'))
# A word may occur many times; keep its strongest score in each direction.
predictive_words = scored.groupBy('word').agg(
    F.max('positive').alias('positive'),
    F.max('negative').alias('negative'))

# Views sorted by how strongly each word predicts its class.
positive_predictive_words = predictive_words.select(
    col('word').alias('positive_word'),
    col('positive').alias('pos_prob')).sort('pos_prob', ascending=False)
negative_predictive_words = predictive_words.select(
    col('word').alias('negative_word'),
    col('negative').alias('neg_prob')).sort('neg_prob', ascending=False)
In [16]:
import pandas as pd

# Show the 20 most-positive and 20 most-negative words side by side.
# limit(20) runs on the Spark side, so only 20 rows per frame are collected —
# the original converted the entire DataFrame to pandas before head(n=20).
# The frames are already sorted, so the rows shown are identical.
pd.concat([
    positive_predictive_words.limit(20).toPandas(),
    negative_predictive_words.limit(20).toPandas() ],
    axis=1)
Out[16]:
In [17]:
# Refit the pipeline on the full (unbalanced) review set; used below to
# rank the summaries of individual products.
full_model = pipeline.fit(reviews)
In [18]:
# Per-product (asin) review count and average star rating, restricted to
# products with more than 25 reviews.
product_stats = all_reviews.groupBy('asin').agg(
    F.count('asin').alias('count'),
    F.avg('overall').alias('avg_rating'))
highly_reviewed_products = product_stats.filter('count > 25')
In [19]:
# Highest- and lowest-rated products among the highly reviewed ones; the
# first column of the top Row is the product id ('asin').
best_product = highly_reviewed_products.sort('avg_rating', ascending=False).first()[0]
worst_product = highly_reviewed_products.sort('avg_rating').first()[0]
In [20]:
def most_contributing_summaries(product, total_reviews, ranking_model):
    """Return (positive, negative) dicts of {summary: P(label=1)} for the
    10 most- and least-positive review summaries of `product`.

    Relies on the module-level `second` udf to extract P(label=1) from the
    model's probability vector. The returned dicts are in the
    {text: weight} shape that WordCloud.fit_words expects.
    """
    # Renamed from `reviews`, which shadowed the module-level DataFrame;
    # also dropped the unused local `udf_max` udf.
    product_reviews = (total_reviews
        .filter(col('asin') == product)
        .select('summary', 'overall'))
    summary_ranks = (ranking_model
        .transform(product_reviews)
        .select(
            'summary',
            second(col('probability')).alias('pos_prob')))
    pos_summaries = { row[0]: row[1] for row in summary_ranks.sort('pos_prob', ascending=False).take(10) }
    neg_summaries = { row[0]: row[1] for row in summary_ranks.sort('pos_prob').take(10) }
    return pos_summaries, neg_summaries
In [23]:
from wordcloud import WordCloud
import matplotlib.pyplot as plt

def present_product(product, total_reviews, ranking_model):
    """Render side-by-side word clouds of the most positive and the most
    negative review summaries for a single product."""
    pos_summaries, neg_summaries = most_contributing_summaries(product, total_reviews, ranking_model)

    def render(axis, summaries, title):
        # fit_words takes a {text: weight} mapping.
        cloud = WordCloud(background_color='white', max_words=20).fit_words(summaries)
        axis.set_title(title)
        axis.imshow(cloud, interpolation='bilinear')
        axis.axis('off')

    figure = plt.figure(figsize=(15, 15))
    render(figure.add_subplot(1, 2, 1), pos_summaries, 'Positive summaries')
    render(figure.add_subplot(1, 2, 2), neg_summaries, 'Negative summaries')
    plt.show()
In [24]:
# Word clouds for the top-rated highly reviewed product.
present_product(best_product, all_reviews, full_model)
In [25]:
# Word clouds for the lowest-rated highly reviewed product.
present_product(worst_product, all_reviews, full_model)