Recommendation

1. Read Movielens 1 Million Data (Medium)

2. Partition data into Train, Validation & Test Datasets (60-20-20)

3. Train ALS Recommender

4. Measure Model Performance

5. Optimize Model parameters viz. Rank, Lambda & Number Of Iterations based on the Validation Dataset

6. Predict optmized Model performance (RMSE) on the Test Data


In [10]:
from pyspark.context import SparkContext
print "Running Spark Version %s" % (sc.version)


Running Spark Version 1.4.1

In [11]:
from pyspark.conf import SparkConf
conf = SparkConf()
print conf.toDebugString()


spark.app.name=pyspark-shell
spark.files=file:/Users/ksankar/.ivy2/jars/com.databricks_spark-csv_2.11-1.0.3.jar,file:/Users/ksankar/.ivy2/jars/org.apache.commons_commons-csv-1.1.jar
spark.jars=file:/Users/ksankar/.ivy2/jars/com.databricks_spark-csv_2.11-1.0.3.jar,file:/Users/ksankar/.ivy2/jars/org.apache.commons_commons-csv-1.1.jar
spark.master=local[*]
spark.submit.pyFiles=/Users/ksankar/.ivy2/jars/com.databricks_spark-csv_2.11-1.0.3.jar,/Users/ksankar/.ivy2/jars/org.apache.commons_commons-csv-1.1.jar

In [12]:
movies_file = sc.textFile("movielens/medium/movies.dat")
movies_rdd = movies_file.map(lambda line: line.split('::'))
movies_rdd.count()


Out[12]:
3883

In [13]:
movies_rdd.first()


Out[13]:
[u'1', u'Toy Story (1995)', u"Animation|Children's|Comedy"]

In [14]:
ratings_file = sc.textFile("movielens/medium/ratings.dat")
ratings_rdd = ratings_file.map(lambda line: line.split('::'))
ratings_rdd.count()


Out[14]:
1000209

In [15]:
ratings_rdd.first()


Out[15]:
[u'1', u'1193', u'5', u'978300760']

In [16]:
def parse_ratings(x):
    user_id = int(x[0])
    movie_id = int(x[1])
    rating = float(x[2])
    timestamp = int(x[3])/10
    return [user_id,movie_id,rating,timestamp]

In [17]:
ratings_rdd_01 = ratings_rdd.map(lambda x: parse_ratings(x))
ratings_rdd_01.count()


Out[17]:
1000209

In [18]:
ratings_rdd_01.first()


Out[18]:
[1, 1193, 5.0, 97830076]

In [19]:
numRatings = ratings_rdd_01.count()
numUsers = ratings_rdd_01.map(lambda r: r[0]).distinct().count()
numMovies = ratings_rdd_01.map(lambda r: r[1]).distinct().count()

print "Got %d ratings from %d users on %d movies." % (numRatings, numUsers, numMovies)


Got 1000209 ratings from 6040 users on 3706 movies.

A quick scheme to partition the training, validation & test datasets

Timestamp ending with [6,8) = Validation

Timestamp ending with [8,9] = Test (ie >= 8)

Rest = Train

Approx: Training = 60%, Validation = 20%, Test = 20%

Coding Exercise

Partition Data


In [20]:
import time
start_time = time.time()
training = ratings_rdd_01.filter(lambda x: (x[3] % 10) < 6)
validation = ratings_rdd_01.filter(lambda x: (x[3] % 10) >= 6 and (x[3] % 10) < 8)
test = ratings_rdd_01.filter(lambda x: (x[3] % 10) >= 8)
numTraining = training.count()
numValidation = validation.count()
numTest = test.count()
print "Training: %d, validation: %d, test: %d" % (numTraining, numValidation, numTest)
print "Elapsed : %f" % (time.time() - start_time)


Training: 600069, validation: 199985, test: 200155
Elapsed : 27.115611

In [21]:
from pyspark.mllib.recommendation import ALS
rank = 10
numIterations = 20
train_data = training.map(lambda p: (p[0], p[1], p[2]))
start_time = time.time()
model = ALS.train(train_data, rank, numIterations)
print "Elapsed : %f" % (time.time() - start_time)
print model


Elapsed : 33.156755
<pyspark.mllib.recommendation.MatrixFactorizationModel object at 0x1036e0d10>

In order to calculate model performance we need a keypair with key=(userID, movieID), value=(pred,actual)

Then we can do calculations on the Predicted vs Actual values


In [22]:
# Evaluate the model on validation data
validation_data = validation.map(lambda p: (p[0], p[1]))
predictions = model.predictAll(validation_data).map(lambda r: ((r[0], r[1]), r[2]))
predictions.count()


Out[22]:
199943

In [23]:
predictions.first()


Out[23]:
((5428, 1084), 4.931647896469278)

Now let us turn the Validation data to KV pair


In [24]:
validation_data.first()


Out[24]:
(1, 1193)

In [25]:
validation.first()


Out[25]:
[1, 1193, 5.0, 97830076]

In [26]:
validation_key_rdd = validation.map(lambda r: ((r[0], r[1]), r[2]))
print validation_key_rdd.count()
validation_key_rdd.first()


199985
Out[26]:
((1, 1193), 5.0)

In [27]:
#ratesAndPreds = validation.map(lambda r: ((r[0], r[1]), r[2])).join(predictions)
ratesAndPreds = validation_key_rdd.join(predictions)
ratesAndPreds.count()


Out[27]:
199943

In [28]:
ratesAndPreds.first()


Out[28]:
((2026, 3364), (4.0, 3.861367532170932))

Now we have the values where we want them !


In [29]:
MSE = ratesAndPreds.map(lambda r: (r[1][0] - r[1][1])**2).reduce(lambda x, y: x + y)/ratesAndPreds.count()
print("Mean Squared Error = " + str(MSE))


Mean Squared Error = 0.882677652449

In [30]:
# 1.4.0 Mean Squared Error = 0.876346112824
# 1.3.0 Mean Squared Error = 0.871456869392
# 1.2.1 Mean Squared Error = 0.877305629074

Advanced - to try later *** system will hang if it has less memory

Validation Run

Let us use the Validation Data to optimize Rank, Lambda & Number Of Iterations

And Predict the model performance using our test data


In [31]:
def computeRmse(model, data, n):
    """
    Compute RMSE (Root Mean Squared Error).
    """
    predictions = model.predictAll(data.map(lambda x: (x[0], x[1])))
    predictionsAndRatings = predictions.map(lambda x: ((x[0], x[1]), x[2])) \
      .join(data.map(lambda x: ((x[0], x[1]), x[2]))) \
      .values()
    return sqrt(predictionsAndRatings.map(lambda x: (x[0] - x[1]) ** 2).reduce(add) / float(n))

In [32]:
import itertools
from math import sqrt
from operator import add
ranks = [8, 12]
lambdas = [0.1, 1.0, 10.0]
numIters = [10, 20]
bestModel = None
bestValidationRmse = float("inf")
bestRank = 0
bestLambda = -1.0
bestNumIter = -1
start_time = time.time()
for rank, lmbda, numIter in itertools.product(ranks, lambdas, numIters):
    model = ALS.train(train_data, rank, numIter, lmbda)
    validationRmse = computeRmse(model, validation, numValidation)
    print "RMSE (validation) = %f for the model trained with " % validationRmse + \
          "rank = %d, lambda = %.1f, and numIter = %d." % (rank, lmbda, numIter)
    if (validationRmse < bestValidationRmse):
        bestModel = model
        bestValidationRmse = validationRmse
        bestRank = rank
        bestLambda = lmbda
        bestNumIter = numIter

testRmse = computeRmse(bestModel, test, numTest)

# evaluate the best model on the test set
print "Best model was trained with rank = %d and lambda = %.1f, " % (bestRank, bestLambda) \
  + "and numIter = %d, and its RMSE on the test set is %f." % (bestNumIter, testRmse)
print "Elapsed : %f" % (time.time() - start_time)


RMSE (validation) = 0.877626 for the model trained with rank = 8, lambda = 0.1, and numIter = 10.
RMSE (validation) = 0.870811 for the model trained with rank = 8, lambda = 0.1, and numIter = 20.
RMSE (validation) = 1.356134 for the model trained with rank = 8, lambda = 1.0, and numIter = 10.
RMSE (validation) = 1.356134 for the model trained with rank = 8, lambda = 1.0, and numIter = 20.
RMSE (validation) = 3.749258 for the model trained with rank = 8, lambda = 10.0, and numIter = 10.
RMSE (validation) = 3.749258 for the model trained with rank = 8, lambda = 10.0, and numIter = 20.
RMSE (validation) = 0.876632 for the model trained with rank = 12, lambda = 0.1, and numIter = 10.
RMSE (validation) = 0.869355 for the model trained with rank = 12, lambda = 0.1, and numIter = 20.
RMSE (validation) = 1.356134 for the model trained with rank = 12, lambda = 1.0, and numIter = 10.
RMSE (validation) = 1.356134 for the model trained with rank = 12, lambda = 1.0, and numIter = 20.
RMSE (validation) = 3.749258 for the model trained with rank = 12, lambda = 10.0, and numIter = 10.
RMSE (validation) = 3.749258 for the model trained with rank = 12, lambda = 10.0, and numIter = 20.
The best model was trained with rank = 12 and lambda = 0.1, and numIter = 20, and its RMSE on the test set is 0.868974.
Elapsed : 464.607150

In [ ]: