In [4]:
## textFile allows us to read a local, S3, or HDFS file. It is lazy, not an action
readFile = sc.textFile('/vagrant/data/movietweeting/evaluation/split/method_rnd_value_0.75_from_10_to_100/train/relation.dat')
print readFile
In [5]:
## To see what we have read, we need an action such as "first()".
## Using first() we can inspect the first line of the file we just read.
print readFile.first()
In [6]:
## map applies a function to each line of the RDD, one by one.
## In this example, it applies a split("\t") function.
## Thus we start with a string and we end up with a list of strings.
## map is a transformation, not an action: it takes an RDD as input and returns another RDD.
## We also want to remove the end-of-file line, so we use filter to drop it.
parsedDataRDD = readFile.filter(lambda x: "EOF" not in x).map(lambda x: x.split("\t"))
print parsedDataRDD
In [7]:
print parsedDataRDD.first()
In [8]:
## We want to count the number of distinct users.
## Thus we need to extract the username, use the distinct() transformation, and count() the result
import json
def readJson(data,field):
    return json.loads(data)[field]
usersRDD = parsedDataRDD.map(lambda x: readJson(x[4],"subject"))
print usersRDD.first()
In [9]:
numberOfUsers = usersRDD.distinct().count()
print "There are",numberOfUsers,"distinct users!!"
In [10]:
## We want to extract the most popular movie.
## We need to use reduceByKey, but it needs a PairRDD: let's build it!
movieRDD = parsedDataRDD.map(lambda x: (readJson(x[4],"object"),1))
print movieRDD.first(),"where", "'"+movieRDD.first()[0]+"'", "is the key and", "'"+str(movieRDD.first()[1])+"'","is a 'fake' value"
In [11]:
## We use the fake value '1' because reducing by key on the movieId, summing up every '1',
## gives back the number of times each movie was consumed.
## reduceByKey works this way:
## it finds a pair of lines with the same key, T1 = (k,x) and T2 = (k,y),
## and combines their values with the specified function; in this example it sums them, returning a new line T3 = (k, x+y).
## It goes on until there are no more lines with the same key.
## Using top() we can extract the top lines
movieRDD.reduceByKey(lambda x,y: x+y).top(5,key=lambda x: x[1])
Out[11]:
In [12]:
## We need to create a reduceByKey function that allows us to append the values into a list.
## To do that, first we need to transform each value into a list with only one element, the value itself.
## We use map(lambda x: create_list(x)) for this purpose.
## To append each value of each list to the others, we use lambdalist.
## We need to do that because we cannot know the "state" of each line during a reduceByKey.
def create_list(value, count=1):
    result = list()
    for k in range(count):
        result.append(value)
    return result
def lambdalist(x,y):
    for k in x:
        y.append(k)
    return y
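## For example (a quick sketch reusing movieRDD from above), this pattern would collect
## the '1's of each movie into a list instead of summing them:
## movieRDD.map(lambda x: (x[0], create_list(x[1]))).reduceByKey(lambdalist).first()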
In [18]:
import numpy as np
## norm(movie) = sqrt( sum( a_i^2 ) ) over the ratings a_i of that movie
## to compute the norm you must divide the process into 4 steps:
## start with parsedDataRDD
## 1. parse the data to have the format (movieID,rating)
## 2. compute the square of each rating: (movieID,rating) ==(map)==> (movieID,rating^2)
## 3. sum each rating^2 by key (hint: I would use reduceByKey)
## 4. take the square root of each sum
normMatrixRDD = parsedDataRDD ##put your code here
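## One possible solution (a sketch, following the field conventions used in this notebook:
## the movie id in x[4]'s "object" field and the rating in x[3]'s "rating" field):
normMatrixRDD = parsedDataRDD.map(lambda x: (readJson(x[4],'object'), float(readJson(x[3],'rating'))))\
                             .map(lambda x: (x[0], x[1]**2))\
                             .reduceByKey(lambda x,y: x+y)\
                             .map(lambda x: (x[0], np.sqrt(x[1])))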
normMatrixRDD.first()
In [19]:
## once we have the norm lookup PairRDD, we need to create a similar RDD that also carries the userId field
## Now you should create an RDD with this format: (u'movie:1411238', (u'user:3462', 6))
## Start with parsedDataRDD
sparseMatrixRDDelements = parsedDataRDD
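## One possible solution (a sketch, again using the "object"/"subject"/"rating" JSON fields):
sparseMatrixRDDelements = parsedDataRDD.map(lambda x: (readJson(x[4],'object'),
                                                       (readJson(x[4],'subject'), readJson(x[3],'rating'))))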
sparseMatrixRDDelements.first()
In [20]:
## Now we have
## sparseMatrixRDDelements = (u'movie:1411238', (u'user:3462', 6))
## normMatrixRDD = (u'movie:1216492', 7.0)
## How can we compute the normalised value of each rating?
## Firstly I would use a ...
normSparseMatrixRDDelements = sparseMatrixRDDelements ## add your code here
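## A possible first step (a sketch): join the two PairRDDs on their movie key.
## The result looks like (movie, ((user, rating), norm)).
normSparseMatrixRDDelements = sparseMatrixRDDelements.join(normMatrixRDD)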
normSparseMatrixRDDelements.first()
In [21]:
## then, we compute the normalised value as rating / (shrink value + norm).
## I would use a simple map
shrink = 0.5
normSparseMatrixRDDelements = sparseMatrixRDDelements
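## A possible solution (a sketch): join on the movie key, then map each element
## to (movie, (user, rating / (shrink + norm)))
normSparseMatrixRDDelements = sparseMatrixRDDelements.join(normMatrixRDD)\
                                                     .map(lambda x: (x[0], (x[1][0][0], x[1][0][1] / (shrink + x[1][1]))))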
normSparseMatrixRDDelements.first()
In [22]:
## first we take the cartesian product of the normalised sparse matrix with itself
item_itemRDD = normSparseMatrixRDDelements ##put your code here
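## One possible solution (a sketch):
item_itemRDD = normSparseMatrixRDDelements.cartesian(normSparseMatrixRDDelements)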
item_itemRDD.first()
In [23]:
## second, we filter out pairs of ratings on the same movie, or by different users.
## We need to sum the products of each couple of ratings by the same user on different movies.
item_itemRDD = normSparseMatrixRDDelements ##put your code here
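## One possible solution (a sketch): keep only pairs with different movies (x[0][0] != x[1][0])
## and the same user (x[0][1][0] == x[1][1][0])
item_itemRDD = normSparseMatrixRDDelements.cartesian(normSparseMatrixRDDelements)\
                                          .filter(lambda x: x[0][0] != x[1][0] and x[0][1][0] == x[1][1][0])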
##item_itemRDD.first()
In [24]:
## once we have selected the right combinations, we multiply the normalised ratings and sum all the values with reduceByKey
item_itemRDDelements = normSparseMatrixRDDelements ##put your code here
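## One possible solution (a sketch), keying each product by the (movie_i, movie_j) pair:
item_itemRDDelements = item_itemRDD.map(lambda x: ((x[0][0], x[1][0]), x[0][1][1] * x[1][1][1]))\
                                   .reduceByKey(lambda x,y: x+y)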
item_itemRDDelements.first()
In [26]:
## in the end, we reshape the results to have (movie_i, (movie_j, cos_sim(movie_i,movie_j)))
cosSimilaritiesRDD = item_itemRDDelements ##put your code here
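## One possible solution (a sketch):
cosSimilaritiesRDD = item_itemRDDelements.map(lambda x: (x[0][0], (x[0][1], x[1])))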
cosSimilaritiesRDD.first()
In [19]:
## We need to create this structure:
## (userId, {movie1: rating1, ...})
## We need each rating for the dot product, and the set of watched movies (the keys of the dict) to avoid recommending already seen movies
def create_Dict(x,pos=1,key=0,value=1):
    dic = {}
    for el in x[pos]:
        dic[el[key]] = el[value]
    return dic
userEventRDD = parsedDataRDD.map(lambda x: (readJson(x[4], 'subject'),
                                            create_list((readJson(x[4], 'object'), readJson(x[3], 'rating')))))\
                            .reduceByKey(lambda x,y: lambdalist(x,y))\
                            .map(lambda x: (x[0], create_Dict(x)))
userEventRDD.persist()
userEventRDD.first()
Out[19]:
In [21]:
def takeTopN(x, n=25):
    a = x[1]
    if len(a) < n: n = len(a)
    return sorted(a, key=lambda x: -x[1])[0:n]
def flattener(x):
    result = list()
    for k in x[1]:
        result.append((x[0],k))
    return result
## we want to keep only products where the recommendable item is not in the already watched list, while the second movie
## in the cosSimilaritiesRDD (cosSimilaritiesRDD: movie1,(movie2,cosSim(movie1,movie2))) is in the already watched list.
## Thus, we compute the rating*cosSim value, sum the scores for each movie for each user, compute the top n recommendations for each
## user, and flatten the result to have a (user,(movie, score)) structure.
## The strange flattening is useful to join them with the user events in the test set for a super-Spark-style recall
## and precision computation.
recommendationRDD = userEventRDD.cartesian(cosSimilaritiesRDD).filter(lambda x: x[1][0] not in x[0][1] and x[1][1][0] in x[0][1])\
.map(lambda x: (x[0][0]+"_"+x[1][0],x[1][1][1] * x[0][1][x[1][1][0]]))\
.reduceByKey(lambda x,y: x+y)\
.map(lambda x: (x[0].split("_")[0],create_list((x[0].split("_")[1],x[1]))))\
.reduceByKey(lambda x,y: lambdalist(x,y)).map(lambda x: (x[0],takeTopN(x)))\
.map(lambda x: (flattener(x))).flatMap(lambda x: x)
recommendationRDD.first()
Out[21]:
In [22]:
readFileTest = sc.textFile('/opt/tmp/movietweeting/evaluation/split/method_rnd_value_0.75_from_10_to_100/test/relation.dat')
parsedDataTestRDD = readFileTest.filter(lambda x: "EOF" not in x).map(lambda x: x.split("\t"))\
.map(lambda x: (readJson(x[4],'subject'),(readJson(x[4],'object'),readJson(x[3],'rating'))))
parsedDataTestRDD.first()
Out[22]:
In [23]:
## We want to compute the recall as number of hits among recommended items / number of consumed items
## We need to compute the number of hits per user to join them with the consumed items RDD
recommendationsHitsRDD = parsedDataTestRDD.join(recommendationRDD).filter(lambda x: x[1][0][0] == x[1][1][0])\
.map(lambda x: (x[0],1)).reduceByKey(lambda x,y: x+y )
recommendationsHitsRDD.first()
Out[23]:
In [24]:
## We need to compute the number of consumed items by the user
userEventsRDD = parsedDataTestRDD.map(lambda x: (x[0],1)).reduceByKey(lambda x,y: x+y)
userEventsRDD.first()
Out[24]:
In [25]:
## Joining userEventsRDD and recommendationsHitsRDD we obtain, in the same line, the number of
## consumed items and the number of correct recommendations.
## The output of userEventsRDD.join(recommendationsHitsRDD) is something like (user, (#consumedItems,#correctRecommendations)).
## recallRDD then contains, for each user, #correctRecommendations/#consumedItems.
## The sum of these values divided by the number of elements of the RDD is the recall
recallRDD = userEventsRDD.join(recommendationsHitsRDD).map(lambda x: float(x[1][1])/x[1][0])
recallAtN = recallRDD.sum()/recallRDD.count()
print "Recall:",recallAtN
In [26]:
## To compute the precision, we need to know the number of recommendations per user
recomCountRDD = recommendationRDD.map(lambda x: (x[0],1)).reduceByKey(lambda x,y: x+y)
recomCountRDD.first()
Out[26]:
In [27]:
## Same as recall
precisionRDD = recomCountRDD.join(recommendationsHitsRDD).map(lambda x: float(x[1][1])/x[1][0])
precision = precisionRDD.sum()/precisionRDD.count()
print "Precision:",precision
In [ ]: