In [10]:
from pyspark.context import SparkContext

# Report the version of the already-running Spark context (`sc` is supplied
# by the notebook environment, not created here).
# print() form works under both Python 2 and Python 3.
print("Running Spark Version %s" % sc.version)
In [11]:
from pyspark.conf import SparkConf

# Dump all explicitly-set Spark configuration entries for inspection.
conf = SparkConf()
print(conf.toDebugString())
In [12]:
# Load the MovieLens movie metadata and split each '::'-delimited line
# into a list of raw string fields.
movies_file = sc.textFile("movielens/medium/movies.dat")
movies_rdd = movies_file.map(lambda record: record.split('::'))
movies_rdd.count()
Out[12]:
In [13]:
# Peek at the first parsed movie record (a list of '::'-separated string fields).
movies_rdd.first()
Out[13]:
In [14]:
# Load the MovieLens ratings and split each '::'-delimited line
# into a list of raw string fields.
ratings_file = sc.textFile("movielens/medium/ratings.dat")
ratings_rdd = ratings_file.map(lambda record: record.split('::'))
ratings_rdd.count()
Out[14]:
In [15]:
# Peek at the first raw ratings record (a list of '::'-separated string fields).
ratings_rdd.first()
Out[15]:
In [16]:
def parse_ratings(x):
    """Convert one raw ratings record into typed values.

    x -- list of string fields [user, movie, rating, timestamp]
    Returns [user_id, movie_id, rating, timestamp // 10] where the
    truncated timestamp is an int; its last digit is used later to split
    the data into train / validation / test buckets.
    """
    user_id = int(x[0])
    movie_id = int(x[1])
    rating = float(x[2])
    # Use floor division: under Python 3, `/` would produce a float and
    # break the downstream `timestamp % 10` bucketing.
    timestamp = int(x[3]) // 10
    return [user_id, movie_id, rating, timestamp]
In [17]:
# Apply parse_ratings to every raw record; the function reference is passed
# directly instead of wrapping it in a redundant lambda.
ratings_rdd_01 = ratings_rdd.map(parse_ratings)
ratings_rdd_01.count()
Out[17]:
In [18]:
# Peek at the first typed ratings record: [user_id, movie_id, rating, timestamp//10].
ratings_rdd_01.first()
Out[18]:
In [19]:
# Summarize the dataset: total ratings plus distinct user and movie counts.
numRatings = ratings_rdd_01.count()
numUsers = ratings_rdd_01.map(lambda r: r[0]).distinct().count()
numMovies = ratings_rdd_01.map(lambda r: r[1]).distinct().count()
# print() form works under both Python 2 and Python 3.
print("Got %d ratings from %d users on %d movies." % (numRatings, numUsers, numMovies))
In [20]:
import time

start_time = time.time()
# Deterministic split keyed on the last digit of the truncated timestamp:
# digits 0-5 -> training, 6-7 -> validation, 8-9 -> test
# (roughly 60/20/20, assuming last digits are uniformly distributed).
training = ratings_rdd_01.filter(lambda x: (x[3] % 10) < 6)
validation = ratings_rdd_01.filter(lambda x: (x[3] % 10) >= 6 and (x[3] % 10) < 8)
test = ratings_rdd_01.filter(lambda x: (x[3] % 10) >= 8)
numTraining = training.count()
numValidation = validation.count()
numTest = test.count()
# print() form works under both Python 2 and Python 3.
print("Training: %d, validation: %d, test: %d" % (numTraining, numValidation, numTest))
print("Elapsed : %f" % (time.time() - start_time))
In [21]:
from pyspark.mllib.recommendation import ALS

rank = 10
numIterations = 20
# ALS.train needs (user, movie, rating) triples; drop the timestamp field.
train_data = training.map(lambda p: (p[0], p[1], p[2]))
start_time = time.time()
model = ALS.train(train_data, rank, numIterations)
# print() form works under both Python 2 and Python 3.
print("Elapsed : %f" % (time.time() - start_time))
print(model)
In [22]:
# Evaluate the model on the validation data: score every (user, movie)
# pair and key each prediction by that pair for a later join.
validation_data = validation.map(lambda rec: (rec[0], rec[1]))
predictions = model.predictAll(validation_data).map(lambda pred: ((pred[0], pred[1]), pred[2]))
predictions.count()
Out[22]:
In [23]:
# Peek at one prediction: ((user, movie), predicted_rating).
predictions.first()
Out[23]:
In [24]:
# Peek at one validation input pair: (user, movie).
validation_data.first()
Out[24]:
In [25]:
# Peek at one full validation record: [user_id, movie_id, rating, timestamp//10].
validation.first()
Out[25]:
In [26]:
# Key the actual validation ratings by (user, movie) so they can be
# joined against the keyed predictions.
validation_key_rdd = validation.map(lambda r: ((r[0], r[1]), r[2]))
# print() form works under both Python 2 and Python 3.
print(validation_key_rdd.count())
validation_key_rdd.first()
Out[26]:
In [27]:
# Join actual and predicted ratings on the shared (user, movie) key;
# each joined value is (actual_rating, predicted_rating).
ratesAndPreds = validation_key_rdd.join(predictions)
ratesAndPreds.count()
Out[27]:
In [28]:
# Peek at one joined record: ((user, movie), (actual_rating, predicted_rating)).
ratesAndPreds.first()
Out[28]:
In [29]:
# Mean squared error: average of squared (actual - predicted) differences.
squared_errors = ratesAndPreds.map(lambda pair: (pair[1][0] - pair[1][1])**2)
MSE = squared_errors.reduce(lambda a, b: a + b)/ratesAndPreds.count()
print("Mean Squared Error = " + str(MSE))
In [30]:
# 1.4.0 Mean Squared Error = 0.876346112824
# 1.3.0 Mean Squared Error = 0.871456869392
# 1.2.1 Mean Squared Error = 0.877305629074
In [31]:
def computeRmse(model, data, n):
    """
    Compute RMSE (Root Mean Squared Error) of the model's predictions.

    model -- trained recommender exposing predictAll(RDD of (user, movie))
    data  -- RDD of records indexable as [user, movie, rating, ...]
    n     -- number of records in `data` (passed in to avoid a recount)
    """
    # Local imports make this cell self-contained: the original relied on
    # `sqrt` and `add` being imported by a *later* cell before first call.
    from math import sqrt
    from operator import add
    predictions = model.predictAll(data.map(lambda x: (x[0], x[1])))
    # Key both sides by (user, movie), join, and keep the
    # (predicted, actual) value pairs.
    predictionsAndRatings = predictions.map(lambda x: ((x[0], x[1]), x[2])) \
        .join(data.map(lambda x: ((x[0], x[1]), x[2]))) \
        .values()
    return sqrt(predictionsAndRatings.map(lambda x: (x[0] - x[1]) ** 2).reduce(add) / float(n))
In [32]:
import itertools
from math import sqrt
from operator import add

# Grid search over ALS hyper-parameters; keep the model with the lowest
# RMSE on the validation split.
ranks = [8, 12]
lambdas = [0.1, 1.0, 10.0]
numIters = [10, 20]
bestModel = None
bestValidationRmse = float("inf")
bestRank = 0
bestLambda = -1.0
bestNumIter = -1
start_time = time.time()
for rank, lmbda, numIter in itertools.product(ranks, lambdas, numIters):
    model = ALS.train(train_data, rank, numIter, lmbda)
    validationRmse = computeRmse(model, validation, numValidation)
    # print() form works under both Python 2 and Python 3.
    print("RMSE (validation) = %f for the model trained with " % validationRmse +
          "rank = %d, lambda = %.1f, and numIter = %d." % (rank, lmbda, numIter))
    if validationRmse < bestValidationRmse:
        bestModel = model
        bestValidationRmse = validationRmse
        bestRank = rank
        bestLambda = lmbda
        bestNumIter = numIter
# evaluate the best model on the test set
testRmse = computeRmse(bestModel, test, numTest)
print("Best model was trained with rank = %d and lambda = %.1f, " % (bestRank, bestLambda)
      + "and numIter = %d, and its RMSE on the test set is %f." % (bestNumIter, testRmse))
print("Elapsed : %f" % (time.time() - start_time))
In [ ]: