In [2]:
import urllib2, json
import numpy as np

# Locations of the MovieTweetings 10K snapshot files (entities and relations).
inputEnt = 'https://raw.githubusercontent.com/crowdrec/datasets/master/01.MovieTweetings/datasets/snapshots_10K/entities.dat'
inputRel = 'https://raw.githubusercontent.com/crowdrec/datasets/master/01.MovieTweetings/datasets/snapshots_10K/relations.dat'


def loadRDD(path):
    """Load a .dat file into an RDD of raw text lines.

    `path` may be an http(s) URL, in which case the file is downloaded on the
    driver and distributed with `sc.parallelize`, or any other location, which
    is handed to `sc.textFile`. In both cases only the lines that contain a
    "{" and do not contain the "EOF\\t0" marker are kept.

    `sc` is expected to be provided by the notebook kernel; it is not defined
    in this cell.
    """
    # Match the scheme prefix rather than any occurrence of 'http', so that a
    # local path that merely contains "http" is not mistaken for a URL.
    if path.startswith(('http://', 'https://')):
        response = urllib2.urlopen(path)
        try:
            content = response.read()
        finally:
            # Always release the connection, even if the read fails.
            response.close()
        lines = sc.parallelize(content.split("\n"))
    else:
        # Non-URL input: let Spark read the file itself instead of silently
        # leaving the RDD undefined.
        lines = sc.textFile(path)
    return lines.filter(lambda x: "{" in x and "EOF\t0" not in x)


entitiesRDD = loadRDD(inputEnt)
relationsRDD = loadRDD(inputRel)

Basic actions: first(), take(), count()


In [9]:
print "First allow to see the 'first' result of the RDD:\n",entitiesRDD.first()
print "\nTake(n) allow to see the 'first' n results of the RDD:"
for k in entitiesRDD.take(5):
    print k
print "\nCount allow to count the number of elements in the RDD:",entitiesRDD.count()


First allow to see the 'first' result of the RDD:
user	1	-1	{"twitterid":"177651718"}	{}

Take(n) allow to see the 'first' n results of the RDD:
user	1	-1	{"twitterid":"177651718"}	{}
user	2	-1	{"twitterid":"103607473"}	{}
user	3	-1	{"twitterid":"288317450"}	{}
user	4	-1	{"twitterid":"68640782"}	{}
user	5	-1	{"twitterid":"199219885"}	{}

Count allow to count the number of elements in the RDD: 6890

Map


In [13]:
print "Map applies a function to each element of the RDD one by one (in this example, a simple 'str.split()'):" 
relationsRDD.map(lambda x: x.split("\t")).first()


Map applies a function to each element of the RDD one by one (in this example, a simple 'str.split()')
Out[13]:
['rating.explicit',
 '1',
 '1363245118',
 '{"rating":9}',
 '{"subject":"user:1","object":"movie:0120735"}']

In [23]:
print "We can use map to create RDD with an handler structure e.g. (event,ts) :"
relationsRDD.map(lambda x: ("event:"+x.split("\t")[1], x.split("\t")[2])).first()


We can use map to create RDD with an handler structure e.g. (event,ts) :
Out[23]:
('event:1', '1363245118')

In [35]:
import datetime

# Announce the custom-function example; the two adjacent literals are joined
# at compile time, so the printed text is unchanged.
print ("We can apply 'custom' function:\n"
       "for example, we can compute the day of the month from the ts")
def dayOfTheMonth(x):
    """Return the day of the month, in UTC, for a Unix timestamp.

    Args:
        x: seconds since 1970-01-01 00:00:00 UTC, as an int or a float.

    Returns:
        The day of the month as a zero-padded two-character string
        ('01' .. '31').

    The day is computed in UTC rather than in the machine's local timezone,
    so the result is the same on every driver and executor regardless of how
    their clocks are configured.
    """
    epoch = datetime.datetime(1970, 1, 1)
    # epoch + timedelta is pure arithmetic: it never consults the local
    # timezone and behaves the same on Python 2 and Python 3.
    return (epoch + datetime.timedelta(seconds=x)).strftime('%d')
    
    
    
# First map: ("event:<field 1>", <field 2>) with the timestamp kept as a string.
# Second map: append the day of the month computed by dayOfTheMonth, which
# needs a number, hence the float() conversion.
eventTsPairs = relationsRDD.map(lambda x: ("event:"+x.split("\t")[1], x.split("\t")[2]))
eventTsDay = eventTsPairs.map(lambda pair: (pair[0], pair[1], dayOfTheMonth(float(pair[1]))))
eventTsDay.first()


We can apply 'custom' function:
for example, we can compute the day of the month from the ts
Out[35]:
('event:1', '1363245118', '14')

Distinct


In [37]:
print "We can use distinct map and count to extract the number of distinct ts in the dataset"
print "Number of ts:",relationsRDD.map(lambda x: x.split("\t")[2]).count()
print "Number of distinct ts:",relationsRDD.map(lambda x: x.split("\t")[2]).distinct().count()


We can use distinct map and count to extract the number of distinct ts in the dataset
Number of ts: 10000
Number of distinct ts: 9963

PairRDD (key,value)


In [30]:
print "Finally, we can use map to create PairRDD (i.e. RDD where each element is in the form '(key,(value)' ):"
relationsRDD.map(lambda x: ("event:"+x.split("\t")[1], (x.split("\t")[2]))).first()


Finally, we can use map to create PairRDD (i.e. RDD where each element is in the form '(key,(value)' ):
Out[30]:
('event:1', '1363245118')

In [31]:
print "It is useful to create PairRDD (key, 1) to count up any quantities (e.g. the number of event occurred in the same ts)"
relationsRDD.map(lambda x: (x.split("\t")[2], 1)).first()


It is useful to create PairRDD (key, 1) to count up any quantities (e.g. the number of event occurred in the same ts)
Out[31]:
('1363245118', 1)

ReduceByKey


In [34]:
print "With reduceByKey we can group tuples with the same key mixing up their values using any function."
print "for example we want to create a list of event occurred in the same ts." 

relationsRDD.map(lambda x: (x.split("\t")[2],1)).reduceByKey(lambda x,y: x+y).first()


With reduceByKey we can group tuples with the same key mixing up their values using any function.
for example we want to create a list of event occurred in the same ts.
With top(n,key=lambda x: ...) is possible to extract the top n items
Out[34]:
[('1362696157', 2),
 ('1362336723', 2),
 ('1363216232', 2),
 ('1362350784', 2),
 ('1362984188', 2)]

Top


In [ ]:
print "With top(n,key=lambda x: ...) is possible to extract the top n items"
relationsRDD.map(lambda x: (x.split("\t")[2],1)).reduceByKey(lambda x,y: x+y).top(5,key=lambda x: x[1])

parallelize


In [44]:
print "'sc.parallelize()' allow us to parse a list into an RDD"
l = list()
l.append(((2,('c1',1))))
l.append(((1,('c2',1))))
l.append(((5,('c1',1))))
l.append(((6,('c2',1))))
c = sc.parallelize(l)
print "First line:",c.first()


'sc.parallelize()' allow us to parse a list into an RDD
First line: (2, ('c1', 1))

Join


In [42]:
print "Join allow us to make 'sql-like' join"

l = list()
l.append(((2,('c1',1))))
l.append(((1,('c2',1))))
l.append(((5,('c1',1))))
l.append(((6,('c2',1))))
c = sc.parallelize(l)

l2 = list()
l2.append((1,('m1',1)))
l2.append((3,('m2',1)))
l2.append((4,('m3',1)))
l2.append((5,('m2',1)))
l2.append((6,('m3',1)))
m = sc.parallelize(l2)


print "c:",c.collect(), "m:",m.collect()
print "c.join(m):"
c.join(m).collect()


Join allow us to make 'sql-like' join
c: [(2, ('c1', 1)), (1, ('c2', 1)), (5, ('c1', 1)), (6, ('c2', 1))] m: [(1, ('m1', 1)), (3, ('m2', 1)), (4, ('m3', 1)), (5, ('m2', 1)), (6, ('m3', 1))]
c.join(m):
Out[42]:
[(1, (('c2', 1), ('m1', 1))),
 (5, (('c1', 1), ('m2', 1))),
 (6, (('c2', 1), ('m3', 1)))]

Cartesian


In [45]:
print "Cartesian compute the cartesian product of two RDD"
c.cartesian(m).collect()


Cartesian compute the cartesian product of two RDD
Out[45]:
[((2, ('c1', 1)), (1, ('m1', 1))),
 ((2, ('c1', 1)), (3, ('m2', 1))),
 ((2, ('c1', 1)), (4, ('m3', 1))),
 ((2, ('c1', 1)), (5, ('m2', 1))),
 ((2, ('c1', 1)), (6, ('m3', 1))),
 ((1, ('c2', 1)), (1, ('m1', 1))),
 ((1, ('c2', 1)), (3, ('m2', 1))),
 ((1, ('c2', 1)), (4, ('m3', 1))),
 ((1, ('c2', 1)), (5, ('m2', 1))),
 ((1, ('c2', 1)), (6, ('m3', 1))),
 ((5, ('c1', 1)), (1, ('m1', 1))),
 ((5, ('c1', 1)), (3, ('m2', 1))),
 ((5, ('c1', 1)), (4, ('m3', 1))),
 ((5, ('c1', 1)), (5, ('m2', 1))),
 ((5, ('c1', 1)), (6, ('m3', 1))),
 ((6, ('c2', 1)), (1, ('m1', 1))),
 ((6, ('c2', 1)), (3, ('m2', 1))),
 ((6, ('c2', 1)), (4, ('m3', 1))),
 ((6, ('c2', 1)), (5, ('m2', 1))),
 ((6, ('c2', 1)), (6, ('m3', 1)))]

Filter


In [48]:
print "We can use filter on top of cartesian to save only some tuples"
print "e.g. we want to save only tuples where the first value of the first tuple is bigger than the first value of the other one"
c.cartesian(m).filter(lambda x: x[0][0] > x[1][0]).collect()


We can use filter on top of cartesian to save only some tuples
e.g. we want to save only tuples where the first value of the first tuple is bigger than the first value of the other one
Out[48]:
[((2, ('c1', 1)), (1, ('m1', 1))),
 ((5, ('c1', 1)), (1, ('m1', 1))),
 ((5, ('c1', 1)), (3, ('m2', 1))),
 ((5, ('c1', 1)), (4, ('m3', 1))),
 ((6, ('c2', 1)), (1, ('m1', 1))),
 ((6, ('c2', 1)), (3, ('m2', 1))),
 ((6, ('c2', 1)), (4, ('m3', 1))),
 ((6, ('c2', 1)), (5, ('m2', 1)))]

Exercises to do

Please try to do these exercises to the best of your ability. We will wait a bit before showing each solution. If you know the answer, you can raise your hand and give it a try. If these exercises are too easy for you, do them on your own and try the optional ones. We will not show the solutions for the [Optional] exercises: you can email your solution to us.

In [ ]:
## 0. show first line of each file 
# time: 2'

In [ ]:
## 1. count the numbers of relations and entities
# time: 2'

In [ ]:
## 2. extract 10 distinct "subject" from 10 lines from the relationsRDD RDD
# time: 5'

In [ ]:
## 3. How many distinct users do we have?
# time: 2'

In [ ]:
## 4. Count the numbers of events for 10 users
# time: 5'

In [ ]:
## 5. [Optional] Extract the top 10 viewed items

In [ ]:
## 6. [Optional] How many movies have been watched exactly 10 times?

In [ ]:
## 7. [Optional] Create the histogram sorted by 'times' 
## e.g. 
## There are 1874 movies watched  1 times
## There are 517 movies watched  2 times
## There are 252 movies watched  3 times

In [ ]:
## 8. Print 10 lines with this format:
## (twitterId,movie:id,ts, rating)
# time: 5'

In [ ]:
## 9. [Optional] Print 10 lines with this format:
## (twitterId,title,ts, rating)

In [ ]:
## 10.0 Create a user RDD with this format: (userid,movieid) 
# time: 2'

In [ ]:
## 10.1 extract pairs of users who have both seen the same movie
# time: 5'

In [ ]:


In [ ]:


In [ ]: