Basic Tutorial


In [4]:
## textFile lets us read a local, S3 or HDFS file. It is a transformation, not an action.
readFile = sc.textFile('/vagrant/data/movietweeting/evaluation/split/method_rnd_value_0.75_from_10_to_100/train/relation.dat')
print readFile


/opt/tmp/movietweeting/evaluation/split/method_rnd_value_0.75_from_10_to_100/train/relation.dat MappedRDD[1] at textFile at NativeMethodAccessorImpl.java:-2

In [5]:
## To see what we have read, we need an action such as first().
## Using first() we can view the first line of the file we read.
print readFile.first()


rating.explicit	9144	1362062838	{"rating":6}	{"subject":"user:3462","object":"movie:1411238"}

In [6]:
## map applies a function to each line of the RDD, one by one.
## In this example, it applies a split("\t") function,
## so we start with a string and end up with a list of strings.
## Map is a transformation: it takes an RDD as input and returns a new RDD.
## We also want to remove the end-of-file marker line, so we filter it out first.
parsedDataRDD = readFile.filter(lambda x: "EOF" not in x).map(lambda x: x.split("\t"))
print parsedDataRDD


PythonRDD[3] at RDD at PythonRDD.scala:43

In [7]:
print parsedDataRDD.first()


[u'rating.explicit', u'9144', u'1362062838', u'{"rating":6}', u'{"subject":"user:3462","object":"movie:1411238"}']

In [8]:
## We want to count the number of distinct users.
## Thus we need to extract the user id, use the distinct() transformation and count() the result
import json
def readJson(data,field):
    ## parse a JSON string and return the requested field
    return json.loads(data)[field]
usersRDD = parsedDataRDD.map(lambda x: readJson(x[4],"subject"))
print usersRDD.first()


user:3462

In [9]:
numberOfUsers = usersRDD.distinct().count()
print "There are",numberOfUsers,"distinct users!!"


There are 122 distinct users!!

In [10]:
## We want to extract the most popular movie.
## We need to use reduceByKey, but it needs a PairRDD: let's build it!
movieRDD = parsedDataRDD.map(lambda x: (readJson(x[4],"object"),1))
print movieRDD.first(),"where", "'"+movieRDD.first()[0]+"'", "is the key and", "'"+str(movieRDD.first()[1])+"'","is a 'fake' value"


(u'movie:1411238', 1) where 'movie:1411238' is the key and '1' is a 'fake' value

In [11]:
## We use the fake value '1' because reducing by the movieId key, summing up every '1',
## gives back the number of times each movie was consumed.
## reduceByKey works this way:
## it finds two pairs with the same key, T1 = (k, x) and T2 = (k, y),
## and combines their values with the specified function; in this example it sums them, returning a new pair T3 = (k, x+y).
## It goes on until no two pairs with the same key remain.
## Using top() we can extract the top lines

movieRDD.reduceByKey(lambda x,y: x+y).top(5,key=lambda x: x[1])


Out[11]:
[(u'movie:1623205', 13),
 (u'movie:1024648', 11),
 (u'movie:1790885', 10),
 (u'movie:1907668', 9),
 (u'movie:1707386', 8)]

Advanced Tutorial: collaborative filtering with Spark!!!

Initialization: base methods


In [12]:
## We need a reduceByKey function that appends the values into a list.
## To do that, we first turn each value into a one-element list containing the value itself.
## We use map(lambda x: create_list(x)) for this purpose.
## To append the elements of one list to another, we use lambdalist.
## We need this because we cannot know the "state" of each line during a reduceByKey.

def create_list(value, count=1):
    ## wrap a value in a list, optionally repeated count times
    result = list()
    for k in range(count):
        result.append(value)
    return result

def lambdalist(x,y):
    ## append every element of list x to list y and return the merged list
    for k in x:
        y.append(k)
    return y
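
## A quick sanity check of how the two helpers cooperate in a reduceByKey
## (a minimal sketch, assuming the SparkContext sc from the cells above):
pairs = sc.parallelize([("a", 1), ("a", 2), ("b", 3)])
print pairs.map(lambda kv: (kv[0], create_list(kv[1]))).reduceByKey(lambdalist).collect()
## expected something like: [('a', [1, 2]), ('b', [3])] (ordering may vary)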

Computation of the cosine similarity values for each couple of movies

Creation of the normalization matrix


In [18]:
import numpy as np
## norm(line) = sqrt ( sum ( a_i ^ 2 ) ) for each a_i of the line
## To compute the norm, split the process into 4 steps,
## starting with parsedDataRDD:
## 1. parse the data to get the format (movieID, rating)
## 2. compute the square of each rating: (movieID, rating) ==(map)==> (movieID, rating^2)
## 3. sum the rating^2 values by key (hint: I would use reduceByKey)
## 4. take the square root of each sum
normMatrixRDD = parsedDataRDD ##put your code here


normMatrixRDD.first()
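
One possible solution for the cell above, following the four steps (a sketch, not the only way; it reuses readJson and np from earlier cells):

normMatrixRDD = parsedDataRDD.map(lambda x: (readJson(x[4], 'object'), readJson(x[3], 'rating')))\
                             .map(lambda x: (x[0], x[1] ** 2))\
                             .reduceByKey(lambda x, y: x + y)\
                             .map(lambda x: (x[0], np.sqrt(x[1])))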

In [19]:
## once we have the norm lookup PairRDD, we need to create a similar RDD that also carries the userId field.
## You should now create an RDD with this format: (u'movie:1411238', (u'user:3462', 6))
## Start with parsedDataRDD
sparseMatrixRDDelements = parsedDataRDD
sparseMatrixRDDelements.first()
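
A possible solution sketch for the cell above, keying each rating by the movie id (reusing readJson):

sparseMatrixRDDelements = parsedDataRDD.map(lambda x: (readJson(x[4], 'object'),
                                                       (readJson(x[4], 'subject'), readJson(x[3], 'rating'))))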

Computing the normalised sparse matrix (user, item, norm_rating)


In [20]:
## Now we have 
## sparseMatrixRDDelements =  (u'movie:1411238', (u'user:3462', 6))
## normMatrixRDD = (u'movie:1216492', 7.0)
## How can we compute the normalised value of each rating?
## Firstly I would use a ...

normSparseMatrixRDDelements = sparseMatrixRDDelements ## add your code here 
normSparseMatrixRDDelements.first()
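
One possible answer to the hint above: a join on the movie key matches each rating with the norm of its movie (a sketch, assuming the two RDDs built in the previous cells):

normSparseMatrixRDDelements = sparseMatrixRDDelements.join(normMatrixRDD)
## each element now looks like (movie, ((user, rating), norm))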

In [21]:
## then, we compute the normalised value as rating / (shrink value + norm).
## I would use a simple map
shrink = 0.5
normSparseMatrixRDDelements = sparseMatrixRDDelements

normSparseMatrixRDDelements.first()
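
A possible solution sketch, building on the join from the previous cell:

normSparseMatrixRDDelements = sparseMatrixRDDelements.join(normMatrixRDD)\
    .map(lambda x: (x[0], (x[1][0][0], x[1][0][1] / (shrink + x[1][1]))))
## each element now looks like (movie, (user, rating / (shrink + norm)))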

Compute the cosine similarity in a disaggregated way


In [22]:
## first we take the cartesian product of the normalised sparse matrix with itself
item_itemRDD = normSparseMatrixRDDelements ## put your code here
item_itemRDD.first()
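
A possible sketch for the cell above:

item_itemRDD = normSparseMatrixRDDelements.cartesian(normSparseMatrixRDDelements)
## each element: ((movie_i, (user_a, r_a)), (movie_j, (user_b, r_b)))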

In [23]:
## second, we filter out couples that rate the same movie, or that rate different movies but belong to different users.
## We need to sum the products of each couple of ratings by the same user on two different movies.
item_itemRDD = normSparseMatrixRDDelements ## put your code here

##item_itemRDD.first()
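
A possible sketch: keep only couples of ratings by the same user on two different movies.

item_itemRDD = normSparseMatrixRDDelements.cartesian(normSparseMatrixRDDelements)\
                                          .filter(lambda x: x[0][0] != x[1][0] and x[0][1][0] == x[1][1][0])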

In [24]:
## once we have selected the right combinations, we multiply the normalised ratings and sum all the values with reduceByKey
item_itemRDDelements = normSparseMatrixRDDelements ## put your code here
       

item_itemRDDelements.first()
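
A possible sketch, building on item_itemRDD from the previous cell: key each couple by the movie pair, multiply the two normalised ratings, then sum per pair.

item_itemRDDelements = item_itemRDD.map(lambda x: ((x[0][0], x[1][0]), x[0][1][1] * x[1][1][1]))\
                                   .reduceByKey(lambda x, y: x + y)
## each element: ((movie_i, movie_j), cos_sim(movie_i, movie_j))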

In [26]:
## in the end, we rearrange the results to have (movie_i, (movie_j, cos_sim(movie_i, movie_j)))
cosSimilaritiesRDD = item_itemRDDelements ## put your code here

cosSimilaritiesRDD.first()
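
A possible sketch: a simple map rearranges the pair key into the (movie_i, (movie_j, cos_sim)) shape used below.

cosSimilaritiesRDD = item_itemRDDelements.map(lambda x: (x[0][0], (x[0][1], x[1])))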

Recommendation

A basic way to use the cosine similarity is to compute the dot product of each user's rating vector with the cosine similarity matrix, rank the results and recommend the top n movies. Thus:
1) we need to collect the user ratings in the train set
2) we need to compute the dot product
3) we need to extract the top n
4) we need to evaluate the recommendations against the test set

Parsing again the train set


In [19]:
## We need to create this structure:
## (userId, {movie1: rating1, ...})
## We need each rating for the dot product, and the dict keys give us the set of watched movies,
## so we avoid recommending already seen movies

def create_Dict(x,pos=1,key=0,value=1):
    ## turn (userId, [(movie, rating), ...]) into a dict {movie: rating}
    dic = {}
    for el in x[pos]:
        dic[el[key]] = el[value]
    return dic
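## For example (a hypothetical input, just to show the shape):
## create_Dict((u'user:1', [(u'movie:a', 5), (u'movie:b', 3)])) == {u'movie:a': 5, u'movie:b': 3}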


    
userEventRDD = parsedDataRDD.map(lambda x: (readJson(x[4], 'subject'),create_list((readJson(x[4], 'object')
                                                                                   ,readJson(x[3], 'rating')))))\
                            .reduceByKey(lambda x,y: lambdalist(x,y))\
                            .map(lambda x: (x[0],( create_Dict(x))))
userEventRDD.persist()              
userEventRDD.first()


Out[19]:
(u'user:1612',
 {u'movie:0071571': 8,
  u'movie:0079073': 1,
  u'movie:0108255': 2,
  u'movie:0187078': 7,
  u'movie:0277371': 6,
  u'movie:0390521': 4,
  u'movie:0450259': 10,
  u'movie:0497373': 8,
  u'movie:0867271': 1,
  u'movie:1409024': 8})
The cartesian product creates an RDD with this structure: user = x[0][0], watched-movies dict = x[0][1] (it serves both as the rating vector and as the already-watched set), candidate movie = x[1][0], similarity entry (movie_j, cosSim) = x[1][1].

In [21]:
def takeTopN(x, n=25):
    ## sort the (movie, score) list by descending score and keep the first n entries
    a = x[1]
    if len(a) < n: n = len(a)
    return sorted(a, key=lambda x: -x[1])[0:n]

def flattener(x):
    ## turn (user, [(movie, score), ...]) into [(user, (movie, score)), ...]
    result = list()
    for k in x[1]:
        result.append((x[0],k))
    return result
## We want to keep only products where the recommendable item is not in the already-watched list but the second movie
## in cosSimilaritiesRDD (cosSimilaritiesRDD: (movie1, (movie2, cosSim(movie1, movie2)))) is in the already-watched list.
## Thus, we compute the rating*cosSim value, we sum the score per movie per user, we compute the top n recommendations
## for each user and we flatten the result to have the (user, (movie, score)) structure.
## This flattening is useful to join them with the user events in the test set for a super-Spark-style recall
## and precision computation.

recommendationRDD = userEventRDD.cartesian(cosSimilaritiesRDD).filter(lambda x: x[1][0] not in x[0][1] and x[1][1][0] in x[0][1])\
                                .map(lambda x: (x[0][0]+"_"+x[1][0],x[1][1][1] * x[0][1][x[1][1][0]]))\
                                .reduceByKey(lambda x,y: x+y)\
                                .map(lambda x: (x[0].split("_")[0],create_list((x[0].split("_")[1],x[1]))))\
                                .reduceByKey(lambda x,y: lambdalist(x,y)).map(lambda x: (x[0],takeTopN(x)))\
                                .map(lambda x: (flattener(x))).flatMap(lambda x: x)
                

recommendationRDD.first()


Out[21]:
(u'user:1340', (u'movie:0399295', 5.6957807324408529))

Reading and parsing test file


In [22]:
readFileTest = sc.textFile('/opt/tmp/movietweeting/evaluation/split/method_rnd_value_0.75_from_10_to_100/test/relation.dat')
parsedDataTestRDD = readFileTest.filter(lambda x: "EOF" not in x).map(lambda x: x.split("\t"))\
                                .map(lambda x: (readJson(x[4],'subject'),(readJson(x[4],'object'),readJson(x[3],'rating'))))
parsedDataTestRDD.first()


Out[22]:
(u'user:32', (u'movie:0118799', 5))

In [23]:
## We want to compute the recall as number of hits among recommended items / number of consumed items
## We need to compute the number of hits per user to join them with the consumed items RDD
recommendationsHitsRDD = parsedDataTestRDD.join(recommendationRDD).filter(lambda x: x[1][0][0] == x[1][1][0])\
            .map(lambda x: (x[0],1)).reduceByKey(lambda x,y: x+y )
recommendationsHitsRDD.first()


Out[23]:
(u'user:1674', 1)

In [24]:
## We need to compute the number of consumed items by the user
userEventsRDD = parsedDataTestRDD.map(lambda x: (x[0],1)).reduceByKey(lambda x,y: x+y)
userEventsRDD.first()


Out[24]:
(u'user:1340', 4)

In [25]:
## Joining userEventsRDD and recommendationsHitsRDD we obtain, in the same line, the number of
## consumed items and the number of correct recommendations.
## The output of userEventsRDD.join(recommendationsHitsRDD) is something like (user, (#consumedItems, #correctRecommendations)).
## The output of recallRDD is, for each user, #correctRecommendations/#consumedItems.
## The sum of these values divided by the number of elements of the RDD is the recall.
recallRDD = userEventsRDD.join(recommendationsHitsRDD).map(lambda x: float(x[1][1])/x[1][0])
recallAtN = recallRDD.sum()/recallRDD.count()
print "Recall:",recallAtN


Recall: 0.253216374269

In [26]:
## To compute precision, we need to know the number of recommendations per user
recomCountRDD = recommendationRDD.map(lambda x: (x[0],1)).reduceByKey(lambda x,y: x+y)
recomCountRDD.first()


Out[26]:
(u'user:1340', 25)

In [27]:
## Same procedure as for recall
precisionRDD = recomCountRDD.join(recommendationsHitsRDD).map(lambda x: float(x[1][1])/x[1][0])
precision = precisionRDD.sum()/precisionRDD.count()
print "Precision:",precision


Precision: 0.0466666666667
