Movielens predictions using pyspark and mllib

Define imports and some initial variables

In [1]:
from hdfs import InsecureClient
from pyspark import SparkContext, SparkConf
import urllib
import zipfile

In [2]:
# the all important Spark context
conf = (SparkConf()
        .setAppName('Movielens Prediction Model')
sc = SparkContext(conf=conf)

In [3]:
# set to True to redownload the data and retrain the prediction model
retrain_model = True

# data source URLs
dataset_url = ''
small_dataset_url = dataset_url + '/'
complete_dataset_url = dataset_url + '/'

# data local file system destination names
datasets_path = '/home/ste328/pyspark/movielens'
small_dataset_path = datasets_path +  '/ml-latest-small'
complete_dataset_path = datasets_path + '/ml-latest'
small_dataset_zip = small_dataset_path +  '.zip'
complete_dataset_zip = complete_dataset_path + '.zip'

# data HDFS paths
datasets_hdfs_path = '/user/ste328/spark/movielens'

# HDFS client
client = InsecureClient('http://devctlvhadapp02.iteclientsys.local:50070', user='ste328')

Retrieve the latest movie data and write it to the local file system

In [4]:
    # Retrieve the data archives to local storage
    (small_dataset_filename, small_dataset_headers) = urllib.urlretrieve(small_dataset_url, small_dataset_zip)
    (complete_dataset_filename, complete_dataset_headers) = urllib.urlretrieve(complete_dataset_url, complete_dataset_zip)
    print small_dataset_filename
    print complete_dataset_filename
    # Unzip the files
    with zipfile.ZipFile(small_dataset_filename, 'r') as z:
    with zipfile.ZipFile(complete_dataset_filename, 'r') as z:
    # Copy the unzipped files to HDFS
    small_dataset_hdfs_path = client.upload(datasets_hdfs_path, small_dataset_path, overwrite=True)
    complete_dataset_hdfs_path = client.upload(datasets_hdfs_path, complete_dataset_path, overwrite=True)
    small_dataset_hdfs_path = '/user/ste328/spark/movielens/ml-latest-small'
    complete_dataset_hdfs_path = '/user/ste328/spark/movielens/ml-latest'

print small_dataset_hdfs_path
print complete_dataset_hdfs_path


Read in the small data

In [5]:
    # ('userId', 'movieId', 'rating', 'timestamp')
    small_ratings_raw_data = sc.textFile(small_dataset_hdfs_path + '/ratings.csv')
    small_ratings_raw_data_header = small_ratings_raw_data.take(1)[0]
    small_ratings_data = small_ratings_raw_data\
        .filter(lambda line: line != small_ratings_raw_data_header)\
        .map(lambda line: line.split(","))\
        .map(lambda tokens: (int(tokens[0]), int(tokens[1]), float(tokens[2])))\
        .cache().coalesce(1000, shuffle=True)
    print small_ratings_data.take(1)

[(1, 16, 4.0)]

Split the small ratings data into training, validation, & test

In [6]:
    # training 60%, validation 20%, test 20%
    (training_RDD, validation_RDD, test_RDD) = small_ratings_data.randomSplit([6, 2, 2], seed=0L)
    # remove 'rating' for validation and test predictions
    validation_for_predict_RDD = x: (x[0], x[1]))
    test_for_predict_RDD = x: (x[0], x[1]))

    print training_RDD.take(1)
    print validation_RDD.take(1)
    print test_RDD.take(1)
    print validation_for_predict_RDD.take(1)
    print test_for_predict_RDD.take(1)

[(1, 24, 1.5)]
[(1, 47, 4.0)]
[(1, 16, 4.0)]
[(1, 47)]
[(1, 16)]

Train the predictions model using the Alternating Least Squares (ALS) algorithm

In [7]:
from pyspark.mllib.recommendation import ALS
import math
import numpy as np

In [8]:
    seed = 5L
    iterations = 15
    regularization_parameters = np.linspace(0.1, 0.25, 4, dtype=float)
    ranks = np.linspace(2, 5, 4, dtype=int)
    min_error = float('inf') #infinity
    best_rank = -1
    best_regularization_parameter = -1

    for regularization_parameter in regularization_parameters:
        for rank in ranks:
            model = ALS.train(training_RDD, rank, seed=seed, iterations=iterations, lambda_=regularization_parameter)
            predictions = model.predictAll(validation_for_predict_RDD).map(lambda r: ((r[0], r[1]), r[2]))
            rates_and_preds = r: ((r[0], r[1]), r[2])).join(predictions)
            error = math.sqrt( r: (r[1][0] - r[1][1])**2).mean())
            print 'For regularization parameter %s and rank %s the RMSE is %s' % (regularization_parameter, rank, error)
            if error < min_error:
                min_error = error
                best_rank = rank
                best_regularization_parameter = regularization_parameter

    print 'The best model was trained with regularization parameter %s and rank %s' % (best_regularization_parameter, best_rank)

For regularization parameter 0.1 and rank 2 the RMSE is 0.91811132541
For regularization parameter 0.1 and rank 3 the RMSE is 0.915280020988
For regularization parameter 0.1 and rank 4 the RMSE is 0.922031953997
For regularization parameter 0.1 and rank 5 the RMSE is 0.927868038721
For regularization parameter 0.15 and rank 2 the RMSE is 0.913719406416
For regularization parameter 0.15 and rank 3 the RMSE is 0.904759451544
For regularization parameter 0.15 and rank 4 the RMSE is 0.90579008604
For regularization parameter 0.15 and rank 5 the RMSE is 0.907780847352
For regularization parameter 0.2 and rank 2 the RMSE is 0.915021152829
For regularization parameter 0.2 and rank 3 the RMSE is 0.905806135304
For regularization parameter 0.2 and rank 4 the RMSE is 0.905285841847
For regularization parameter 0.2 and rank 5 the RMSE is 0.906176554803
For regularization parameter 0.25 and rank 2 the RMSE is 0.921276046909
For regularization parameter 0.25 and rank 3 the RMSE is 0.914804680867
For regularization parameter 0.25 and rank 4 the RMSE is 0.913754074497
For regularization parameter 0.25 and rank 5 the RMSE is 0.914774392693
The best model was trained with regularization parameter 0.15 and rank 3

Test the best ranked model

In [9]:
    model = ALS.train(training_RDD, best_rank, seed=seed, iterations=iterations, lambda_=best_regularization_parameter)
    predictions = model.predictAll(test_for_predict_RDD).map(lambda r: ((r[0], r[1]), r[2]))
    rates_and_preds = r: ((r[0], r[1]), r[2])).join(predictions)
    error = math.sqrt( r: (r[1][0] - r[1][1])**2).mean())

    print 'For testing data the RMSE is %s' % (error)

For testing data the RMSE is 0.901231695512

Reset variables and uncache the small dataset to conserve memory.

In [10]:
    %reset_selective -f small_ratings_raw_data
    %reset_selective -f small_ratings_raw_data_header
    %reset_selective -f small_ratings_data
    %reset_selective -f training_RDD
    %reset_selective -f validation_RDD
    %reset_selective -f test_RDD
    %reset_selective -f validation_for_predict_RDD
    %reset_selective -f test_for_predict_RDD
    %reset_selective -f model
    %reset_selective -f predictions
    %reset_selective -f rates_and_preds

Read in the complete ratings dataset

(NOTE: make sure Spark is running in YARN cluster or client mode (using Python 2.7) or likely Java will run out of heap.)

In [11]:
# ('userId', 'movieId', 'rating', 'timestamp')
complete_ratings_raw_data = sc.textFile(complete_dataset_hdfs_path + '/ratings.csv')
complete_ratings_raw_data_header = complete_ratings_raw_data.take(1)[0]
# Create more partitions for this RDD to save on memory usage.
complete_ratings_data_RDD = complete_ratings_raw_data\
    .filter(lambda line: line != complete_ratings_raw_data_header)\
    .map(lambda line: line.split(","))\
    .map(lambda tokens: (int(tokens[0]), int(tokens[1]), float(tokens[2]))).cache().coalesce(1000, shuffle=True)

print "There are %s recommendations in the complete dataset" % (complete_ratings_data_RDD.count())
print complete_ratings_data_RDD.take(1)

There are 22884377 recommendations in the complete dataset
[(1, 169, 2.5)]

Read in the complete movies dataset

In [12]:
# ('movieId', 'title', 'genres')
complete_movies_raw_data = sc.textFile(complete_dataset_hdfs_path + '/movies.csv')
complete_movies_raw_data_header = complete_movies_raw_data.take(1)[0]
# Create more partitions for this RDD to save on memory usage.
complete_movies_data_RDD = complete_movies_raw_data\
    .filter(lambda line: line != complete_movies_raw_data_header)\
    .map(lambda line: line.split(","))\
    .map(lambda tokens: (int(tokens[0]), tokens[1])).cache().coalesce(1000, shuffle=True)

print "There are %s movies in the complete dataset" % (complete_movies_data_RDD.count())
print complete_movies_data_RDD.take(1)

There are 34208 movies in the complete dataset
[(1, u'Toy Story (1995)')]

Count and average the ratings and join them to the movies (for prediction selection)

In [13]:
def get_counts_and_averages(movieID_and_ratings_tuple):
    num_ratings = len(movieID_and_ratings_tuple[1])
    # (movieId, (count, average))
    return movieID_and_ratings_tuple[0], (num_ratings, float(sum(movieID_and_ratings_tuple[1]))/num_ratings)

In [14]:
# ('userId', 'movieId', 'rating', 'timestamp')
movie_ID_with_ratings_RDD = x: (x[1], x[2])).groupByKey()
movie_ID_with_ratings_aggregates_RDD =

[(32770, (156, 3.6538461538461537))]

In [15]:
complete_movies_with_aggregates_RDD = complete_movies_data_RDD.join(movie_ID_with_ratings_aggregates_RDD)

[(106498, (u'"Magic Voyage of Sindbad', (7, 3.2142857142857144)))]

Train the final commender model using the complete ratings dataset

In [16]:
retrain_model = True # why is this variable lost?
    training_RDD, test_RDD = complete_ratings_data_RDD.randomSplit([7, 3], seed=0L)
    complete_model = ALS.train(training_RDD, best_rank, seed=seed, iterations=iterations,

Save and reload the recommendation model

In [17]:
from pyspark.mllib.recommendation import MatrixFactorizationModel as mfm

model_path = '/user/ste328/spark/movielens/models/als'

    client.delete(model_path, True), model_path)
    %reset_selective -f complete_model

complete_model = mfm.load(sc, model_path)

Get sample recommendations for a user

Get a tuple of user ID and movie ID for movies not rated by this sample user

In [18]:
user_ID = 470
# ('userId', 'movieId', 'rating', 'timestamp')
user_unrated_movies_RDD = complete_ratings_data_RDD\
    .filter(lambda x: x[0] != user_ID)\
    .map(lambda x: (user_ID, x[1]))\

[(470, 133379)]
Get predictions

In [19]:
user_movie_predictions_RDD = complete_model.predictAll(user_unrated_movies_RDD)

[Rating(user=470, product=80928, rating=4.034831279303006)]
Join the movie predictions with their titles, ratings counts, and ratings average.

In [20]:
movie_predictions_RDD = user_movie_predictions_RDD\
    .map(lambda x: (x.product, x.rating))\

[(98304, (3.996685905270772, (u'So Big! (1932)', (3, 2.8333333333333335))))]
Flatten out the nested tuples, take only movie predictions with more than 25 ratings and where the predicted rating is > the average rating, and take the 10 best average rated movies

In [21]:
# (0=movie prediction, 1=rating prediction, 2=title, 3=count, 4=average)
movie_predictions_flat_RDD = movie_predictions_RDD\
    .map(lambda x: (x[0], x[1][0], x[1][1][0], x[1][1][1][0], x[1][1][1][1]))\
    .filter(lambda x: x[3] >= 25 and x[1] > x[4])\
    .takeOrdered(10, key = lambda x: -x[4])

print '\n'.join(map(str,movie_predictions_flat_RDD))

(318, 5.014100407391223, u'"Shawshank Redemption', 77887, 4.441710426643727)
(858, 4.93947100819213, u'"Godfather', 49846, 4.35363920876299)
(50, 4.8723661201458, u'"Usual Suspects', 53195, 4.318986746874706)
(527, 4.949440898634176, u"Schindler's List (1993)", 59857, 4.2909517683813085)
(142115, 5.327760888663839, u'The Blue Planet (2001)', 30, 4.283333333333333)
(140737, 4.8364313591299215, u'The Lost Room (2006)', 73, 4.280821917808219)
(1221, 4.822264059094632, u'"Godfather: Part II', 32247, 4.268877725059696)
(2019, 4.851535806927216, u'Seven Samurai (Shichinin no samurai) (1954)', 12753, 4.262134399749079)
(904, 4.903571401818233, u'Rear Window (1954)', 19422, 4.246987951807229)
(1193, 4.8380592369765445, u"One Flew Over the Cuckoo's Nest (1975)", 35832, 4.24245088189328)

