In [2]:
import urllib2, json
import numpy as np
# Snapshot files of the MovieTweetings 10K dataset (crowdrec/datasets on GitHub).
inputEnt = 'https://raw.githubusercontent.com/crowdrec/datasets/master/01.MovieTweetings/datasets/snapshots_10K/entities.dat'
inputRel = 'https://raw.githubusercontent.com/crowdrec/datasets/master/01.MovieTweetings/datasets/snapshots_10K/relations.dat'


def loadDatFile(path):
    """Load a ``.dat`` snapshot file into an RDD of its record lines.

    path -- an http(s) URL, or any path that ``sc.textFile`` accepts
            (local file, HDFS, ...).

    URLs are downloaded in full on the driver with urllib2, split on "\\n"
    and distributed with ``sc.parallelize``; other paths are read directly
    by Spark. In both cases only lines containing '{' are kept and any line
    containing the 'EOF\\t0' marker is dropped.
    """
    if path.startswith('http'):
        response = urllib2.urlopen(path)
        try:
            content = response.read()
        finally:
            # Release the HTTP connection even if the read fails.
            response.close()
        linesRDD = sc.parallelize(content.split("\n"))
    else:
        # Without this branch a non-URL input left the RDD variable undefined.
        linesRDD = sc.textFile(path)
    return linesRDD.filter(lambda line: "{" in line and "EOF\t0" not in line)


entitiesRDD = loadDatFile(inputEnt)
relationsRDD = loadDatFile(inputRel)
In [9]:
print "First allow to see the 'first' result of the RDD:\n",entitiesRDD.first()
print "\nTake(n) allow to see the 'first' n results of the RDD:"
for k in entitiesRDD.take(5):
print k
print "\nCount allow to count the number of elements in the RDD:",entitiesRDD.count()
In [13]:
print "Map applies a function to each element of the RDD one by one (in this example, a simple 'str.split()'):"
relationsRDD.map(lambda x: x.split("\t")).first()
Out[13]:
In [23]:
print "We can use map to create RDD with an handler structure e.g. (event,ts) :"
relationsRDD.map(lambda x: ("event:"+x.split("\t")[1], x.split("\t")[2])).first()
Out[23]:
In [35]:
import datetime
print ("We can apply 'custom' function:\n"
       "for example, we can compute the day of the month from the ts")
def dayOfTheMonth(x):
    """Return the zero-padded day of the month ('01'..'31') of a Unix timestamp.

    x -- seconds since the epoch (int or float).

    The timestamp is interpreted in UTC, so the result does not depend on the
    local timezone of the driver or of the Spark executors running the map
    (``datetime.fromtimestamp`` would use the machine's local time).
    """
    import time  # local import so the function is self-contained when shipped to executors
    return time.strftime('%d', time.gmtime(x))
# Build (event, ts) pairs, then append the day of the month derived from ts.
(relationsRDD
    .map(lambda line: line.split("\t"))
    .map(lambda fields: ("event:" + fields[1], fields[2]))
    .map(lambda pair: (pair[0], pair[1], dayOfTheMonth(float(pair[1]))))
    .first())
Out[35]:
In [37]:
print "We can use distinct map and count to extract the number of distinct ts in the dataset"
print "Number of ts:",relationsRDD.map(lambda x: x.split("\t")[2]).count()
print "Number of distinct ts:",relationsRDD.map(lambda x: x.split("\t")[2]).distinct().count()
In [30]:
print "Finally, we can use map to create PairRDD (i.e. RDD where each element is in the form '(key,(value)' ):"
relationsRDD.map(lambda x: ("event:"+x.split("\t")[1], (x.split("\t")[2]))).first()
Out[30]:
In [31]:
print "It is useful to create PairRDD (key, 1) to count up any quantities (e.g. the number of event occurred in the same ts)"
relationsRDD.map(lambda x: (x.split("\t")[2], 1)).first()
Out[31]:
In [34]:
print "With reduceByKey we can group tuples with the same key mixing up their values using any function."
print "for example we want to create a list of event occurred in the same ts."
relationsRDD.map(lambda x: (x.split("\t")[2],1)).reduceByKey(lambda x,y: x+y).first()
Out[34]:
In [ ]:
print "With top(n,key=lambda x: ...) is possible to extract the top n items"
relationsRDD.map(lambda x: (x.split("\t")[2],1)).reduceByKey(lambda x,y: x+y).top(5,key=lambda x: x[1])
In [44]:
print "'sc.parallelize()' allow us to parse a list into an RDD"
l = list()
l.append(((2,('c1',1))))
l.append(((1,('c2',1))))
l.append(((5,('c1',1))))
l.append(((6,('c2',1))))
c = sc.parallelize(l)
print "First line:",c.first()
In [42]:
print "Join allow us to make 'sql-like' join"
l = list()
l.append(((2,('c1',1))))
l.append(((1,('c2',1))))
l.append(((5,('c1',1))))
l.append(((6,('c2',1))))
c = sc.parallelize(l)
l2 = list()
l2.append((1,('m1',1)))
l2.append((3,('m2',1)))
l2.append((4,('m3',1)))
l2.append((5,('m2',1)))
l2.append((6,('m3',1)))
m = sc.parallelize(l2)
print "c:",c.collect(), "m:",m.collect()
print "c.join(m):"
c.join(m).collect()
Out[42]:
In [45]:
print "Cartesian compute the cartesian product of two RDD"
c.cartesian(m).collect()
Out[45]:
In [48]:
print "We can use filter on top of cartesian to save only some tuples"
print "e.g. we want to save only tuples where the first value of the first tuple is bigger than the first value of the other one"
c.cartesian(m).filter(lambda x: x[0][0] > x[1][0]).collect()
Out[48]:
In [ ]:
## 0. show the first line of each file
# time: 2'
In [ ]:
## 1. count the number of relations and entities
# time: 2'
In [ ]:
## 2. extract 10 distinct "subject" values from 10 lines of relationsRDD
# time: 5'
In [ ]:
## 3. How many distinct users do we have?
# time: 2'
In [ ]:
## 4. Count the number of events for each of 10 users
# time: 5'
In [ ]:
## 5. [Optional] Extract the 10 most viewed items
In [ ]:
## 6. [Optional] How many movies have been watched exactly 10 times?
In [ ]:
## 7. [Optional] Create the histogram sorted by 'times'
## e.g.
## There are 1874 movies watched 1 times
## There are 517 movies watched 2 times
## There are 252 movies watched 3 times
In [ ]:
## 8. Print 10 lines with this format:
## (twitterId,movie:id,ts, rating)
# time: 5'
In [ ]:
## 9. [Optional] Print 10 lines with this format:
## (twitterId,title,ts, rating)
In [ ]:
## 10.0 Create a user RDD with this format: (userid,movieid)
# time: 2'
In [ ]:
## 10.1 extract pairs of users who have both seen the same movie
# time: 5'
In [ ]:
In [ ]:
In [ ]: