In [1]:
from recommender import Recommender
from pyspark.sql import functions as F
import numpy as np
In [2]:
# Load restaurant reviews
reviews_df = spark.read.parquet('../data/ratings_ugt10_igt10')
# Randomly split data into train and test datasets
train_df, test_df = reviews_df.randomSplit(weights=[0.75, 0.25])
train_df.printSchema()
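Note that randomSplit reshuffles every time the DataFrame lineage is recomputed, so the split above is not reproducible across runs. Passing a seed pins it down (a one-line variant of the call above; seed=42 is an arbitrary choice):

train_df, test_df = reviews_df.randomSplit(weights=[0.75, 0.25], seed=42)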
In [3]:
# Fit the bias-aware ALS recommender, then score both splits
estimator = Recommender(
    useALS=True,
    useBias=True,
    lambda_1=7,
    lambda_2=12,
    userCol='user',
    itemCol='item',
    ratingCol='rating',
    rank=76,
    regParam=0.7,
    maxIter=15,
    nonnegative=False
)
model = estimator.fit(train_df)
train_predictions_df = model.transform(train_df)
test_predictions_df = model.transform(test_df)
test_predictions_df.printSchema()
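Before digging into individual users, it can be worth checking the overall fit with Spark's built-in RegressionEvaluator. This is a sketch, not part of the original notebook; it drops any NaN predictions (which can appear for cold-start rows) before scoring:

from pyspark.ml.evaluation import RegressionEvaluator

evaluator = RegressionEvaluator(
    metricName='rmse', labelCol='rating', predictionCol='prediction')
print('train RMSE:', evaluator.evaluate(
    train_predictions_df.dropna(subset=['prediction'])))
print('test RMSE:', evaluator.evaluate(
    test_predictions_df.dropna(subset=['prediction'])))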
In [4]:
# Register the prediction DataFrames as SQL views
train_predictions_df.createOrReplaceTempView('train_predictions_df')
test_predictions_df.createOrReplaceTempView('test_predictions_df')
# Rank user 3000's items by predicted and by actual rating on both splits
df1 = spark.sql(
    '''
    select
        user,
        item,
        rating,
        prediction,
        row_number() over (
            partition by user
            order by prediction desc
        ) as pred_row_num,
        row_number() over (
            partition by user
            order by rating desc
        ) as actual_row_num
    from train_predictions_df
    where user = 3000
    order by pred_row_num
    '''
)
df2 = spark.sql(
    '''
    select
        user,
        item,
        rating,
        prediction,
        row_number() over (
            partition by user
            order by prediction desc
        ) as pred_row_num,
        row_number() over (
            partition by user
            order by rating desc
        ) as actual_row_num
    from test_predictions_df
    where user = 3000
    order by pred_row_num
    '''
)
df1.show(100)
df2.show(100)
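The two queries above differ only in the source table, so the cell could be rewritten around a small helper (hypothetical, not in the original notebook) to keep the two copies of the SQL from drifting apart:

def ranked_predictions(table_name, user_id=3000):
    # Rank one user's items by predicted and by actual rating
    return spark.sql('''
        select user, item, rating, prediction,
            row_number() over (
                partition by user order by prediction desc) as pred_row_num,
            row_number() over (
                partition by user order by rating desc) as actual_row_num
        from {} where user = {} order by pred_row_num
    '''.format(table_name, user_id))

df1 = ranked_predictions('train_predictions_df')
df2 = ranked_predictions('test_predictions_df')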
In [6]:
user_id = 3000
# Treat user 3000 as if they were new: pull their rated items and the
# model's original predictions from both splits
new_user_df = spark.sql(
    '''
    select
        user,
        item,
        rating,
        prediction as orig_prediction
    from train_predictions_df
    where user = {}
    '''.format(user_id)
)
new_user_validate_df = spark.sql(
    '''
    select
        user,
        item,
        rating,
        prediction as orig_prediction
    from test_predictions_df
    where user = {}
    '''.format(user_id)
)
new_user_df.show(100)
new_user_validate_df.show(100)
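The same selection can also be written with the DataFrame API, which avoids interpolating values into SQL strings. An equivalent sketch for the train side (new_user_alt_df is a hypothetical name, used here only to avoid shadowing the frame above):

new_user_alt_df = (
    train_predictions_df
    .filter(F.col('user') == user_id)
    .select('user', 'item', 'rating',
            F.col('prediction').alias('orig_prediction'))
)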
In [7]:
# Pull out the item factor (H) matrix and this user's latent factor vector
item_factors_df = model.itemFactors
user_factors_df = model.userFactors.filter('id={}'.format(user_id))
user_factors = np.array(user_factors_df.collect()[0]['features'])
print(len(user_factors))
print(user_factors)
# Keep only the factors of the items this user actually rated
filtered_item_factors_df = item_factors_df.join(new_user_df, F.col('id') == new_user_df['item'])
filtered_item_factors_df.show(100)
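For a plain ALS model, each stored prediction is just the dot product of the user and item factor vectors; with useBias=True the custom Recommender also folds in average-rating and item-bias terms, so the dot product below should match the stored prediction only up to those offsets (a sanity-check sketch):

row = filtered_item_factors_df.first()
item_vector = np.array(row['features'])
print('u . v             =', np.dot(user_factors, item_vector))
print('stored prediction =', row['orig_prediction'])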
In [14]:
# Subtract the global average and per-item bias from the raw ratings,
# leaving the residual that the latent factors are meant to explain
rating_stats_df = model.rating_stats_df
item_bias_df = model.item_bias_df
filtered_item_factors_df2 = (
    filtered_item_factors_df
    .crossJoin(rating_stats_df)
    .join(item_bias_df, on='item')
    .withColumn(
        'orig_rating',
        F.col('rating')
    )
    .withColumn(
        'rating',
        F.col('rating')
        - F.col('avg_rating')
        - F.col('item_bias')
    )
)
(
    filtered_item_factors_df2
    .select(
        'item', 'user', 'rating', 'orig_prediction',
        'avg_rating',  # 'stddev_rating',
        'item_bias', 'avg_diffs_item_rating',
        'stderr_diffs_item_rating', 'stddev_diffs_item_rating',
        'count_item_rating', 'orig_rating'
    )
    .show(100, truncate=False)
)
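Written out, the residual being modeled is rating - avg_rating - item_bias, and the inverse transform used later simply adds the two terms back. A tiny round-trip check with made-up numbers:

r_ui, mu, b_i = 4.0, 3.43, 0.25       # raw rating, global mean, item bias (made up)
residual = r_ui - mu - b_i            # what the latent factors must explain
reconstructed = residual + mu + b_i   # recovers the raw rating
assert abs(reconstructed - r_ui) < 1e-9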
In [20]:
# Collect this user's item factors and de-biased ratings into numpy arrays
filtered_item_factors = []
item_ratings = []
for row in filtered_item_factors_df2.collect():
    filtered_item_factors.append(row['features'])
    item_ratings.append(row['rating'])
filtered_item_factors = np.array(filtered_item_factors)
item_ratings = np.array(item_ratings)
print(filtered_item_factors.shape)
print(filtered_item_factors)
print(item_ratings.shape)
print(item_ratings)
In [21]:
# Heuristic: estimate the user's factors as a rating-weighted average of
# the factors of the items they rated
new_user_factors = np.dot(item_ratings, filtered_item_factors) / item_ratings.sum()
print(sum(item_ratings), item_ratings.mean())
print((new_user_factors / user_factors).mean())
# 35 * 3.5 ~ 122.6 # user 3000, sum(ratings) = 120, avg(ratings) = 3.4285714
# 40 * 3.5 ~ 142.4 # user 3001, sum(ratings) = 143, avg(ratings) = 3.575
# 33 * 4.2 ~ 138.3 # user 3002, sum(ratings) = 144, avg(ratings) = 4.3636
print(user_factors.shape)
print(user_factors)
print(new_user_factors.shape)
print(new_user_factors)
print(new_user_factors / user_factors)
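The rating-weighted average above is only a heuristic. ALS itself would compute a new user's factors by solving a ridge-regression problem against the factors of the rated items. A minimal numpy sketch of that fold-in step, reusing the notebook's regParam (Spark's exact regularization scaling convention may differ):

reg = 0.7                          # regParam used when fitting the model
V = filtered_item_factors          # (n_rated_items, rank)
r = item_ratings                   # de-biased ratings
# Solve (V^T V + reg * n * I) u = V^T r for the user's factor vector
A = V.T.dot(V) + reg * len(r) * np.eye(V.shape[1])
folded_in_factors = np.linalg.solve(A, V.T.dot(r))
print(folded_in_factors.shape)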
In [22]:
# make predictions for "new user"
item_factors = []
item_ids = []
for row in item_factors_df.collect():
    item_factors.append(row['features'])
    item_ids.append(row['id'])
item_factors = np.array(item_factors)
item_ids = np.array(item_ids)
print(item_factors.shape)
print(item_ids.shape)
new_predictions = np.dot(new_user_factors, item_factors.T)
print(new_predictions.shape)
print(new_predictions)
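Turning the raw score vector into a top-N recommendation list only takes an argsort (highest scores first):

top_n = 10
top_idx = np.argsort(-new_predictions)[:top_n]
print(list(zip(item_ids[top_idx].tolist(), new_predictions[top_idx].tolist())))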
In [23]:
new_prediction_df = spark.createDataFrame(
    zip(item_ids.tolist(), new_predictions.tolist()),
    ['item', 'prediction']
)
In [24]:
new_prediction_df.count()
Out[24]:
In [26]:
# Add the global average and item bias back onto the latent-factor scores
# (rating_stats_df carries the avg_rating column used above)
new_predicted_rating_df = (
    new_prediction_df
    .crossJoin(rating_stats_df)
    .join(item_bias_df, on='item')
    .withColumn(
        'prediction',
        F.col('prediction')
        + F.col('avg_rating')
        + F.col('item_bias')
        - 5.0
    )
)
In [29]:
new_predicted_rating_df.createOrReplaceTempView('new_predicted_rating_df')
new_user_df.createOrReplaceTempView('new_user_df')
new_user_validate_df.createOrReplaceTempView('new_user_validate_df')
# Compare the reconstructed predictions against the model's originals,
# ranking by actual rating, original prediction, and new prediction
compare_df = spark.sql(
    '''
    select
        n.item, n.user, n.rating, n.orig_prediction, p.prediction,
        p.prediction - n.orig_prediction as diff,
        row_number() over (
            partition by n.user
            order by n.rating desc
        ) as actual_row_num,
        row_number() over (
            partition by n.user
            order by n.orig_prediction desc
        ) as orig_row_num,
        row_number() over (
            partition by n.user
            order by p.prediction desc
        ) as new_row_num
    from new_user_df n
    join new_predicted_rating_df p on n.item = p.item
    order by new_row_num
    '''
)
compare_validate_df = spark.sql(
    '''
    select
        n.item, n.user, n.rating, n.orig_prediction, p.prediction,
        p.prediction - n.orig_prediction as diff,
        row_number() over (
            partition by n.user
            order by n.rating desc
        ) as actual_row_num,
        row_number() over (
            partition by n.user
            order by n.orig_prediction desc
        ) as orig_row_num,
        row_number() over (
            partition by n.user
            order by p.prediction desc
        ) as new_row_num
    from new_user_validate_df n
    join new_predicted_rating_df p on n.item = p.item
    order by new_row_num
    '''
)
compare_df.show(100)
compare_validate_df.show(100)
In [33]:
# Popularity discount: items with few ratings are pulled toward zero
discount_factor_df = (
    reviews_df
    .groupBy('item')
    .count()
    .select(
        F.col('item'),
        F.col('count').alias('num_ratings'),
        (1 - (1 / F.sqrt(F.col('count')))).alias('discount_factor')
    )
)
discount_factor_df.show(20)
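As a quick check on the formula: an item with 4 ratings gets a factor of 1 - 1/sqrt(4) = 0.5, while one with 100 ratings gets 0.9, so thinly rated items are damped hardest. Applying the discount to the reconstructed predictions might look like this (a sketch, not run in the original notebook):

discounted_df = (
    new_predicted_rating_df
    .join(discount_factor_df, on='item')
    .withColumn('prediction',
                F.col('prediction') * F.col('discount_factor'))
)
discounted_df.orderBy(F.col('prediction').desc()).show(10)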