In [ ]:
import itertools
from math import sqrt
from operator import add
from os.path import join, isfile, dirname

from pyspark.mllib.recommendation import ALS

In [ ]:
def parseRating(line):
    """
    Parses a rating record in MovieLens format userId::movieId::rating::timestamp .
    """
    fields = line.strip().split("::")
    return long(fields[3]) % 10, (int(fields[0]), int(fields[1]), float(fields[2]))

In [ ]:
def parseMovie(line):
    """
    Parses a movie record in MovieLens format movieId::movieTitle .
    """
    fields = line.strip().split("::")
    return int(fields[0]), fields[1]

In [ ]:
def loadRatings(ratingsFile):
    """
    Load ratings from file.
    """
    f = open(ratingsFile, 'r')
    ratings = filter(lambda r: r[2] > 0, [parseRating(line)[1] for line in f])
    f.close()
    if not ratings:
        print "No ratings provided."
        sys.exit(1)
    else:
        return ratings

In [ ]:
def computeRmse(model, data, n):
    """
    Compute RMSE (Root Mean Squared Error).
    """
    predictions = model.predictAll(data.map(lambda x: (x[0], x[1])))
    predictionsAndRatings = predictions.map(lambda x: ((x[0], x[1]), x[2])) \
      .join(data.map(lambda x: ((x[0], x[1]), x[2]))) \
      .values()
    return sqrt(predictionsAndRatings.map(lambda x: (x[0] - x[1]) ** 2).reduce(add) / float(n))

In [ ]:
# load personal ratings
user_ratings_file = "../personalRatings.txt"
myRatings = loadRatings(user_ratings_file)
myRatingsRDD = sc.parallelize(myRatings)

In [ ]:
# load ratings and movie titles
# ratings is an RDD of (last digit of timestamp, (userId, movieId, rating))
ratings_file = "../data/movielens/ratings.dat"
ratings = sc.textFile(ratings_file).map(parseRating)

In [ ]:
# movies is an RDD of (movieId, movieTitle)
movies_file = "../data/movielens/movies.dat"
tf = sc.textFile(movies_file)
movies = dict(sc.textFile(movies_file).map(parseMovie).collect())

In [ ]:
numRatings = ratings.count()
numUsers = ratings.values().map(lambda r: r[0]).distinct().count()
numMovies = ratings.values().map(lambda r: r[1]).distinct().count()
print "Number of ratings: {}\nNumber of users: {}\nNumber of movies: {}".format(numRatings, numUsers, numMovies)

In [ ]:
numPartitions = 6
training = ratings.filter(lambda x: x[0] < 6)\
  .values().union(myRatingsRDD).repartition(numPartitions).cache()
training.take(5)

In [ ]:
validation = ratings.filter(lambda x: x[0] >= 6 and x[0] < 8)\
  .values().repartition(numPartitions).cache()
validation.take(5)

In [ ]:
test = ratings.filter(lambda x: x[0] >= 8).values().cache()

In [ ]:
numTraining = training.count()
numValidation = validation.count()
numTest = test.count()
print "Training: {}, validation: {}, test: {}".format(numTraining, numValidation, numTest)

In [ ]:
# train models and evaluate them on the validation set

# ranks = [8, 12]
# lambdas = [0.1, 10.0]
# numIters = [10, 20]
ranks = [8]
lambdas = [0.1]
numIters = [10]

bestModel = None
bestValidationRmse = float("inf")
bestRank = 0
bestLambda = -1.0
bestNumIter = -1

for rank, lmbda, numIter in itertools.product(ranks, lambdas, numIters):
   model = ALS.train(training, rank, numIter, lmbda)
   validationRmse = computeRmse(model, validation, numValidation)
   print ("RMSE (validation) = {} for the model trained with ".format(validationRmse),
       "rank = {}, lambda = {}, and numIter = {}.".format(rank, lmbda, numIter))
   if (validationRmse < bestValidationRmse):
       bestModel = model
       bestValidationRmse = validationRmse
       bestRank = rank
       bestLambda = lmbda
       bestNumIter = numIter

In [ ]:
testRmse = computeRmse(bestModel, test, numTest)
# evaluate the best model on the test set
print """The best model was trained with rank = {} and lambda = {},
    and numIter = {}, and its RMSE on the test set is {}.""".format(bestRank, bestLambda, bestNumIter, testRmse)

In [ ]:
meanRating = training.union(validation).map(lambda x: x[2]).mean()
baselineRmse = sqrt(test.map(lambda x: (meanRating - x[2]) ** 2).reduce(add) / numTest)
improvement = (baselineRmse - testRmse) / baselineRmse * 100
print "The best model improves the baseline by {} %".format(improvement)

In [ ]:
myRatedMovieIds = set([x[1] for x in myRatings])
candidates = sc.parallelize([m for m in movies if m not in myRatedMovieIds])
predictions = bestModel.predictAll(candidates.map(lambda x: (0, x))).collect()
recommendations = sorted(predictions, key=lambda x: x[2], reverse=True)[:50]
print "Movies recommended for you:"
for i in xrange(len(recommendations)):
    print ("%2d: %s" % (i + 1, movies[recommendations[i][1]])).encode('ascii', 'ignore')