In [1]:
# Locate the local Spark installation and put pyspark on sys.path.
import findspark
findspark.init()
from pyspark.mllib.recommendation import ALS, Rating
from pyspark import SparkContext, SQLContext
# One local-mode context shared by the whole notebook.
sc = SparkContext("local", "test")
# NOTE(review): SQLContext is the legacy (pre-2.0) entry point; this notebook's
# use of DataFrame.map below also implies Spark 1.x — confirm the target version.
sqlContext = SQLContext(sc)
In [7]:
# '::'-delimited rating splits (MovieLens-style: user::movie::rating[::timestamp]).
# NOTE(review): the trailing '/' suggests these are directories of part files
# rather than single .dat files — confirm against the data layout.
TRAIN_FILE = "./data/ratings-train.dat/"
VALIDATION_FILE = "./data/ratings-validation.dat/"
TEST_FILE = "./data/ratings-test.dat/"
In [12]:
def prepare_data(data):
    """Parse raw '::'-delimited rating lines into Rating records.

    data: RDD of strings shaped like 'userId::movieId::rating...'
          (only the first three fields are used).
    Returns an RDD of pyspark.mllib.recommendation.Rating objects.
    """
    def to_rating(line):
        fields = line.split('::')
        return Rating(int(fields[0]), int(fields[1]), float(fields[2]))

    return data.map(to_rating)
In [13]:
# Load and parse the training split into an RDD of Rating records.
ratings_train_text = sc.textFile(TRAIN_FILE)
ratings_train = prepare_data(ratings_train_text)
# Peek at the first few parsed records.
ratings_train.take(10)
Out[13]:
In [15]:
# Load and parse the validation split, same format as the training data.
ratings_validation_text = sc.textFile(VALIDATION_FILE)
ratings_validation = prepare_data(ratings_validation_text)
ratings_validation.take(10)
Out[15]:
In [16]:
# Load and parse the held-out test split.
ratings_test_text = sc.textFile(TEST_FILE)
# BUG FIX: this previously parsed ratings_validation_text, so ratings_test was
# a silent duplicate of the validation set and TEST_FILE was never used.
ratings_test = prepare_data(ratings_test_text)
In [17]:
# Global mean rating over the training set — the baseline mu for the bias model.
global_mean = ratings_train.map(lambda r: (r[2])).mean()
In [18]:
# Display the computed baseline.
global_mean
Out[18]:
In [19]:
# Item-specific bias: for each movie i compute
#   b_i = sum_{r in R_i}(r - mu) / (25 + |R_i|)
# where mu is the global mean and 25 is a shrinkage constant pulling
# sparsely-rated movies toward 0.
#convert training data to dataframe with attribute
df = sqlContext.createDataFrame(ratings_train, ['userId', 'movieId', 'ratings'])
In [20]:
#sort the data by movie
df_orderByMovie = df.orderBy(df.movieId)
In [21]:
#group by movie and count the ratings of each movie -> (movieId, count)
movie_count = df_orderByMovie.groupBy(df_orderByMovie.movieId).count()
In [22]:
#calculate the sum of the ratings of each movie
# NOTE(review): .sum() with no args sums every numeric column, not just ratings.
sum_byMovie = df_orderByMovie.groupBy(['movieId']).sum()
In [23]:
#drop the unwanted aggregate columns, keeping only movieId and sum(ratings)
# NOTE(review): dropping by positional Column object — column order after the
# groupBy is load-bearing here; confirm drop(Column) is supported in this Spark.
drop_column1 = sum_byMovie.drop(sum_byMovie[1])
final_drop = drop_column1.drop(drop_column1[1])
In [24]:
#join the rating count and rating sum for each movie -> (movieId, count, sum)
movie_sorted = movie_count.join(final_drop, "movieId")
In [25]:
#sort the joined dataset by movieId
new_movie_sorted = movie_sorted.orderBy(movie_sorted.movieId)
In [26]:
#item bias: (sum - count*mu) / (25 + count), i.e. shrunken mean deviation
# NOTE(review): DataFrame.map exists only on Spark 1.x; use .rdd.map on 2.x+.
item_bias = new_movie_sorted.map(lambda r: [r[0], (r[2] - r[1]*global_mean)/(25+r[1])])
In [27]:
new_item_bias = sqlContext.createDataFrame(item_bias, ['movieId', 'item_bias'])
In [ ]:
In [28]:
# User-specific bias: for each user u compute
#   b_u = sum_{r in R_u}(r - mu - b_i) / (10 + |R_u|)
# mirroring the item-bias computation above, with shrinkage constant 10.
#order the training set by user
df_orderByUser = df.orderBy(df.userId)
In [29]:
#attach each movie's item bias by joining on movieId
contain_itemBias = df_orderByUser.join(new_item_bias, "movieId")
In [30]:
#sort the joined dataset by user
sorted_byUser = contain_itemBias.orderBy(['userId'])
In [31]:
#numerator terms of the user bias: rating - mu - item_bias
# row layout after the join: (movieId, userId, ratings, item_bias)
subtraction = sorted_byUser.map(lambda r: [r[1], r[2] - global_mean - r[3]])
In [32]:
user_bias_part1 = sqlContext.createDataFrame(subtraction, ['userId', 'subtraction'])
In [33]:
# per-user sum of the numerator terms
sum_byUser = user_bias_part1.groupBy(['userId']).sum()
In [34]:
#count the ratings of each user
sum_UserCollect = user_bias_part1.groupBy(['userId']).count()
In [35]:
#order the count dataset by user
ordered_sum_UserCollect = sum_UserCollect.orderBy(sum_UserCollect.userId)
In [36]:
# drop the sum(userId) aggregate, keeping (userId, sum(subtraction))
drop_column2 = sum_byUser.drop(sum_byUser[1])
In [37]:
final_drop2 = drop_column2.orderBy(drop_column2.userId)
In [38]:
# join per-user numerator sum with per-user rating count
user_bias_table = final_drop2.join(ordered_sum_UserCollect, 'userId')
In [39]:
ordered_userBiaTable = user_bias_table.orderBy(user_bias_table.userId)
In [40]:
#user bias: sum / (10 + count)
user_bias = ordered_userBiaTable.map(lambda r: [r[0], r[1]/(10+r[2])])
In [41]:
user_specific_bias = sqlContext.createDataFrame(user_bias, ['userId', 'user_bias'])
In [42]:
# Attach both biases to every training rating, then form residual ratings.
merge1 = df_orderByUser.join(user_specific_bias, 'userId')
In [43]:
merge2 = merge1.join(new_item_bias, 'movieId')
In [44]:
# row layout after both joins: (movieId, userId, ratings, user_bias, item_bias)
# residual = rating - b_u - b_i
# NOTE(review): the global mean mu is NOT subtracted here, unlike in the bias
# formulas above — confirm this is intentional for the ALS input.
new_ratings_train = merge2.map(lambda r: [r[0], r[1], r[2] - r[3] - r[4]])
In [45]:
temp = sqlContext.createDataFrame(new_ratings_train, ['movieId', 'userId', 'new_ratings'])
In [46]:
final_new_ratings_train = temp.orderBy(temp.userId)
In [47]:
# Peek at the bias-adjusted training ratings.
final_new_ratings_train.take(10)
Out[47]:
In [48]:
#now, we perform the same procedure as task1
# NOTE(review): the original comment said "sort the data by timestamp", but no
# timestamp column survives parsing — this join only re-attaches the raw
# ratings column from df alongside the bias-adjusted ratings.
new_ratings_byTime = final_new_ratings_train.join(df, ['userId', 'movieId'])
In [ ]:
#example of dataset
new_ratings_byTime.take(20)
In [49]:
# drop the raw ratings column (index 3), keeping (userId, movieId, new_ratings)
# NOTE(review): rebinding the same name makes this cell non-idempotent on re-run.
new_ratings_byTime = new_ratings_byTime.drop(new_ratings_byTime[3])
In [50]:
def prepare_validation(validation):
    """Strip the ratings, keeping only the (user, product) pairs predictAll expects."""
    def to_pair(record):
        return (record[0], record[1])

    return validation.map(to_pair)
In [51]:
import math
In [53]:
# Fit and score one ALS configuration.
def train_evaluate_als(train, validation, rank, iterations_num, lambda_val):
    """Train an ALS model on `train` and evaluate it on `validation`.

    Returns a (MSE, RMSE) tuple over the validation ratings.
    """
    model = ALS.train(train, rank, iterations_num, lambda_val)

    # Key both predictions and ground truth by (user, product) so they join.
    key_by_pair = lambda r: ((r[0], r[1]), r[2])
    predicted = model.predictAll(prepare_validation(validation)).map(key_by_pair)
    actual = validation.map(key_by_pair)

    squared_errors = actual.join(predicted).map(lambda kv: (kv[1][0] - kv[1][1]) ** 2)
    mse = squared_errors.mean()
    return mse, math.sqrt(mse)
In [54]:
# Hyper-parameter grid for the ALS search.
ranks = [10, 20, 30, 40, 50]
# Regularization strengths to sweep.
lambda_values = [0.01,0.1,1.0,10.0]
# Fixed number of ALS iterations per fit.
ITERATIONS = 10
In [55]:
def report_mse_results(rank, lambda_value, mse, rmse):
    """Print a one-line summary of a single ALS evaluation run."""
    summary = "Rank=%d, Lambda=%0.2f, MSE=%s, RMSE=%s" % (rank, lambda_value, mse, rmse)
    print(summary)
In [56]:
def evaluate_parameters(train, validation, ranks, lambda_values):
    """Grid-search ALS over every (rank, lambda) pair, printing MSE/RMSE for each.

    train: training ratings as an RDD, or a DataFrame (its .rdd is used).
    validation: validation ratings RDD.
    Uses the module-level ITERATIONS constant for the iteration count.
    """
    # BUG FIX: the loop previously ignored `train` and hardcoded the global
    # new_ratings_byTime.rdd, so callers could not choose the training set.
    # Accept either a DataFrame (exposes .rdd) or a plain RDD.
    train_rdd = getattr(train, "rdd", train)
    for rank in ranks:
        for lambda_value in lambda_values:
            mse, rmse = train_evaluate_als(train_rdd, validation, rank, ITERATIONS, lambda_value)
            report_mse_results(rank, lambda_value, mse, rmse)
In [ ]:
# Run the full grid search on the bias-adjusted training ratings.
# NOTE(review): ALS.train expects an RDD of (user, product, rating) records;
# new_ratings_byTime is a DataFrame here — confirm the callee converts via .rdd.
evaluate_parameters(new_ratings_byTime, ratings_validation, ranks, lambda_values)