In [1]:
from hdfs import InsecureClient
from pyspark import SparkContext, SparkConf
import urllib
import zipfile
In [2]:
# the all important Spark context
conf = (SparkConf()
        .setMaster('yarn-client')
        .setAppName('Movielens Prediction Model')
        )
sc = SparkContext(conf=conf)
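Once the context is up, a one-line check such as the following (purely optional; any attribute access on sc would do) confirms the YARN application started and shows which master it attached to:

print sc.version, sc.master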
In [3]:
# set to True to redownload the data and retrain the prediction model
retrain_model = True
# data source URLs
dataset_url = 'http://files.grouplens.org/datasets/movielens'
small_dataset_url = dataset_url + '/ml-latest-small.zip'
complete_dataset_url = dataset_url + '/ml-latest.zip'
# data local file system destination names
datasets_path = '/home/ste328/pyspark/movielens'
small_dataset_path = datasets_path + '/ml-latest-small'
complete_dataset_path = datasets_path + '/ml-latest'
small_dataset_zip = small_dataset_path + '.zip'
complete_dataset_zip = complete_dataset_path + '.zip'
# data HDFS paths
datasets_hdfs_path = '/user/ste328/spark/movielens'
# HDFS client
client = InsecureClient('http://devctlvhadapp02.iteclientsys.local:50070', user='ste328')
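Before downloading anything it can be worth verifying the WebHDFS connection, assuming the namenode address above is reachable. A small optional check (status() with strict=False returns None rather than raising if the target directory does not exist yet):

print client.status(datasets_hdfs_path, strict=False)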
In [4]:
if retrain_model:
    # Retrieve the data archives to local storage
    (small_dataset_filename, small_dataset_headers) = urllib.urlretrieve(small_dataset_url, small_dataset_zip)
    (complete_dataset_filename, complete_dataset_headers) = urllib.urlretrieve(complete_dataset_url, complete_dataset_zip)
    print small_dataset_filename
    print complete_dataset_filename
    # Unzip the files
    with zipfile.ZipFile(small_dataset_filename, 'r') as z:
        z.extractall(datasets_path)
    with zipfile.ZipFile(complete_dataset_filename, 'r') as z:
        z.extractall(datasets_path)
    # Copy the unzipped files to HDFS
    small_dataset_hdfs_path = client.upload(datasets_hdfs_path, small_dataset_path, overwrite=True)
    complete_dataset_hdfs_path = client.upload(datasets_hdfs_path, complete_dataset_path, overwrite=True)
else:
    small_dataset_hdfs_path = '/user/ste328/spark/movielens/ml-latest-small'
    complete_dataset_hdfs_path = '/user/ste328/spark/movielens/ml-latest'
print small_dataset_hdfs_path
print complete_dataset_hdfs_path
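To confirm the uploads landed where expected, the hdfs client can list the target directory. A small optional check, reusing the client from the configuration cell:

print client.list(datasets_hdfs_path)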
In [5]:
if retrain_model:
    # ratings.csv columns: 'userId', 'movieId', 'rating', 'timestamp'
    small_ratings_raw_data = sc.textFile(small_dataset_hdfs_path + '/ratings.csv')
    small_ratings_raw_data_header = small_ratings_raw_data.take(1)[0]
    # drop the header, parse to (userId, movieId, rating), repartition, and cache
    # (coalesce before cache so the RDD we reuse and later unpersist is the cached one)
    small_ratings_data = small_ratings_raw_data\
        .filter(lambda line: line != small_ratings_raw_data_header)\
        .map(lambda line: line.split(","))\
        .map(lambda tokens: (int(tokens[0]), int(tokens[1]), float(tokens[2])))\
        .coalesce(1000, shuffle=True).cache()
    print small_ratings_data.take(1)
In [6]:
if retrain_model:
    # training 60%, validation 20%, test 20%
    (training_RDD, validation_RDD, test_RDD) = small_ratings_data.randomSplit([6, 2, 2], seed=0L)
    # remove 'rating' for validation and test predictions
    validation_for_predict_RDD = validation_RDD.map(lambda x: (x[0], x[1]))
    test_for_predict_RDD = test_RDD.map(lambda x: (x[0], x[1]))
    print training_RDD.take(1)
    print validation_RDD.take(1)
    print test_RDD.take(1)
    print validation_for_predict_RDD.take(1)
    print test_for_predict_RDD.take(1)
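randomSplit normalizes its weights, so [6, 2, 2] yields roughly 60/20/20 of the rows rather than exact proportions. A quick optional check, assuming retrain_model is True so the split RDDs exist:

if retrain_model:
    print training_RDD.count(), validation_RDD.count(), test_RDD.count()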
In [7]:
from pyspark.mllib.recommendation import ALS
import math
import numpy as np
In [8]:
if retrain_model:
    seed = 5L
    iterations = 15
    # hyperparameter grid: lambda in {0.1, 0.15, 0.2, 0.25}, rank in {2, 3, 4, 5}
    regularization_parameters = np.linspace(0.1, 0.25, 4, dtype=float)
    ranks = np.linspace(2, 5, 4, dtype=int)
    min_error = float('inf')
    best_rank = -1
    best_regularization_parameter = -1
    for regularization_parameter in regularization_parameters:
        for rank in ranks:
            model = ALS.train(training_RDD, rank, seed=seed, iterations=iterations, lambda_=regularization_parameter)
            predictions = model.predictAll(validation_for_predict_RDD).map(lambda r: ((r[0], r[1]), r[2]))
            rates_and_preds = validation_RDD.map(lambda r: ((r[0], r[1]), r[2])).join(predictions)
            error = math.sqrt(rates_and_preds.map(lambda r: (r[1][0] - r[1][1])**2).mean())
            print 'For regularization parameter %s and rank %s the RMSE is %s' % (regularization_parameter, rank, error)
            if error < min_error:
                min_error = error
                best_rank = rank
                best_regularization_parameter = regularization_parameter
    print 'The best model was trained with regularization parameter %s and rank %s' % (best_regularization_parameter, best_rank)
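The error metric above is the root-mean-square error over (actual, predicted) rating pairs keyed by (userId, movieId). A tiny hard-coded illustration of the same computation, purely illustrative and independent of the grid search:

toy_pairs = sc.parallelize([((1, 10), (4.0, 3.5)), ((1, 11), (2.0, 2.5)), ((2, 10), (5.0, 4.0))])
print math.sqrt(toy_pairs.map(lambda r: (r[1][0] - r[1][1])**2).mean())  # sqrt((0.25 + 0.25 + 1.0) / 3) ~ 0.71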
In [9]:
if retrain_model:
    # retrain with the best hyperparameters and measure the error on the held-out test set
    model = ALS.train(training_RDD, best_rank, seed=seed, iterations=iterations, lambda_=best_regularization_parameter)
    predictions = model.predictAll(test_for_predict_RDD).map(lambda r: ((r[0], r[1]), r[2]))
    rates_and_preds = test_RDD.map(lambda r: ((r[0], r[1]), r[2])).join(predictions)
    error = math.sqrt(rates_and_preds.map(lambda r: (r[1][0] - r[1][1])**2).mean())
    print 'For the test data the RMSE is %s' % (error)
In [10]:
if retrain_model:
    small_ratings_data.unpersist()
# drop the driver-side references to the small-dataset RDDs now that hyperparameter tuning is done
%reset_selective -f small_ratings_raw_data
%reset_selective -f small_ratings_raw_data_header
%reset_selective -f small_ratings_data
%reset_selective -f training_RDD
%reset_selective -f validation_RDD
%reset_selective -f test_RDD
%reset_selective -f validation_for_predict_RDD
%reset_selective -f test_for_predict_RDD
%reset_selective -f model
%reset_selective -f predictions
%reset_selective -f rates_and_preds
In [11]:
# ('userId', 'movieId', 'rating', 'timestamp')
complete_ratings_raw_data = sc.textFile(complete_dataset_hdfs_path + '/ratings.csv')
complete_ratings_raw_data_header = complete_ratings_raw_data.take(1)[0]
# Spread this RDD over more partitions to keep per-partition memory usage down,
# and cache the repartitioned RDD (coalesce before cache so the cached copy is the one we reuse).
complete_ratings_data_RDD = complete_ratings_raw_data\
    .filter(lambda line: line != complete_ratings_raw_data_header)\
    .map(lambda line: line.split(","))\
    .map(lambda tokens: (int(tokens[0]), int(tokens[1]), float(tokens[2])))\
    .coalesce(1000, shuffle=True).cache()
print "There are %s ratings in the complete dataset" % (complete_ratings_data_RDD.count())
print complete_ratings_data_RDD.take(1)
In [12]:
# ('movieId', 'title', 'genres')
complete_movies_raw_data = sc.textFile(complete_dataset_hdfs_path + '/movies.csv')
complete_movies_raw_data_header = complete_movies_raw_data.take(1)[0]
# Repartition and cache, as with the ratings RDD above. Note that the plain split(',')
# mis-parses titles that contain commas (MovieLens quotes those fields in movies.csv);
# see the csv-module sketch after this cell for a stricter parser.
complete_movies_data_RDD = complete_movies_raw_data\
    .filter(lambda line: line != complete_movies_raw_data_header)\
    .map(lambda line: line.split(","))\
    .map(lambda tokens: (int(tokens[0]), tokens[1]))\
    .coalesce(1000, shuffle=True).cache()
print "There are %s movies in the complete dataset" % (complete_movies_data_RDD.count())
print complete_movies_data_RDD.take(1)
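As noted in the cell above, split(',') breaks on titles that themselves contain commas, because those fields are quoted in movies.csv. A hedged sketch of a stricter parser built on the standard csv module, which could replace the two map steps:

import csv

def parse_movie_line(line):
    # csv.reader honours the quotes around titles that contain commas
    # (Python 2's csv module wants byte strings, hence the encode)
    fields = csv.reader([line.encode('utf-8')]).next()
    return (int(fields[0]), fields[1])

# e.g. parse_movie_line('1234,"Movie, The (1999)",Comedy|Drama') returns (1234, 'Movie, The (1999)')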
In [13]:
def get_counts_and_averages(movieID_and_ratings_tuple):
    # input: (movieId, <iterable of ratings>) -> output: (movieId, (count, average))
    num_ratings = len(movieID_and_ratings_tuple[1])
    return movieID_and_ratings_tuple[0], (num_ratings, float(sum(movieID_and_ratings_tuple[1]))/num_ratings)
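For example, on a hand-built input the helper returns the movieId together with the rating count and mean:

print get_counts_and_averages((123, [5.0, 3.0, 4.0]))  # (123, (3, 4.0))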
In [14]:
# complete_ratings_data_RDD rows are ('userId', 'movieId', 'rating'); group every rating by movieId
movie_ID_with_ratings_RDD = complete_ratings_data_RDD.map(lambda x: (x[1], x[2])).groupByKey()
# apply the helper from the previous cell: (movieId, (count, average))
movie_ID_with_ratings_aggregates_RDD = movie_ID_with_ratings_RDD.map(get_counts_and_averages)
movie_ID_with_ratings_aggregates_RDD.take(1)
Out[14]:
In [15]:
# join titles with the rating aggregates: (movieId, (title, (count, average)))
complete_movies_with_aggregates_RDD = complete_movies_data_RDD.join(movie_ID_with_ratings_aggregates_RDD)
complete_movies_with_aggregates_RDD.take(1)
Out[15]:
In [16]:
retrain_model = True  # re-set here in case the kernel was restarted and the flag from cell 3 was lost
if retrain_model:
    # train on 70% of the complete dataset with the best hyperparameters found on the small dataset
    training_RDD, test_RDD = complete_ratings_data_RDD.randomSplit([7, 3], seed=0L)
    complete_model = ALS.train(training_RDD, best_rank, seed=seed, iterations=iterations,
                               lambda_=best_regularization_parameter)
In [17]:
from pyspark.mllib.recommendation import MatrixFactorizationModel as mfm
model_path = '/user/ste328/spark/movielens/models/als'
if retrain_model:
    # remove any previously saved model before persisting the new one to HDFS
    client.delete(model_path, recursive=True)
    complete_model.save(sc, model_path)
%reset_selective -f complete_model
complete_model = mfm.load(sc, model_path)
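A small sanity check that the save/reload round-trip worked, assuming user 470 and movie 1 both appear in the training data (predict() scores a single user/movie pair):

print complete_model.predict(470, 1)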
In [18]:
user_ID = 470
# recommend from movies the user has not rated: take every distinct movieId in the
# complete dataset, subtract the ones user_ID has already rated, and pair with user_ID
user_rated_movie_IDs_RDD = complete_ratings_data_RDD\
    .filter(lambda x: x[0] == user_ID)\
    .map(lambda x: x[1])
user_unrated_movies_RDD = complete_ratings_data_RDD\
    .map(lambda x: x[1])\
    .distinct()\
    .subtract(user_rated_movie_IDs_RDD)\
    .map(lambda movie_id: (user_ID, movie_id))
user_unrated_movies_RDD.take(1)
Out[18]:
In [19]:
# predictAll returns Rating(user, product, rating) records for every requested pair
user_movie_predictions_RDD = complete_model.predictAll(user_unrated_movies_RDD)
user_movie_predictions_RDD.take(1)
Out[19]:
In [20]:
# key the predictions by movieId and join with the titles/aggregates:
# (movieId, (predicted_rating, (title, (count, average))))
movie_predictions_RDD = user_movie_predictions_RDD\
    .map(lambda x: (x.product, x.rating))\
    .join(complete_movies_with_aggregates_RDD)
movie_predictions_RDD.take(1)
Out[20]:
In [21]:
# flatten to (0=movieId, 1=predicted rating, 2=title, 3=rating count, 4=average rating),
# keep movies with at least 25 ratings whose predicted rating beats their historical average,
# and take the 10 with the highest average rating
top_movie_predictions = movie_predictions_RDD\
    .map(lambda x: (x[0], x[1][0], x[1][1][0], x[1][1][1][0], x[1][1][1][1]))\
    .filter(lambda x: x[3] >= 25 and x[1] > x[4])\
    .takeOrdered(10, key=lambda x: -x[4])
print '\n'.join(map(str, top_movie_predictions))
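The list above is ordered by each movie's historical average rating. To rank by the model's predicted rating for this user instead, the same pipeline only needs a different sort key; a sketch reusing movie_predictions_RDD:

top_by_prediction = movie_predictions_RDD\
    .map(lambda x: (x[0], x[1][0], x[1][1][0], x[1][1][1][0], x[1][1][1][1]))\
    .filter(lambda x: x[3] >= 25)\
    .takeOrdered(10, key=lambda x: -x[1])
print '\n'.join(map(str, top_by_prediction))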
In [ ]: