In [4]:
import pyspark
sc = pyspark.SparkContext('local[*]')
# do something to prove it works
rdd = sc.parallelize(range(1000))
rdd.takeSample(False, 5)
Out[4]:
In [5]:
rdd = sc.parallelize(range(100000))
In [6]:
rdd
Out[6]:
You can convert it back to a Python list using the collect method:
In [7]:
rdd.collect()
Out[7]:
Main operations on RDD are transformations:
map - apply a function to every element of the collection;
filter - filter the collection using a predicate;
flatMap - apply a function that changes each element into a collection and flatten the results;
... and actions (a.k.a. aggregations):
collect - converts the RDD to a list;
count - counts the number of elements in the RDD;
take(n) - takes the first n elements of the RDD and returns a list;
takeSample(withReplacement, n) - takes a sample of n elements from the RDD;
reduce(function) - reduces the collection using function;
aggregate - aggregates the elements of an RDD.
See http://spark.apache.org/docs/1.6.0/api/python/pyspark.html#pyspark.RDD for more information and other useful functions.
Remember: Transformations are lazy and actions are eager.
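To see that distinction in action, here is a small sketch (using the rdd defined above): the map call returns immediately without touching the data, while count triggers an actual Spark job.
In [ ]:
# map is a transformation: it only records the computation, nothing runs yet.
doubled = rdd.map(lambda x: x * 2)
# count is an action: it forces Spark to execute the pipeline.
doubled.count()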
Task 1 Fill in the ... below to avoid an AssertionError:
In [13]:
assert rdd.map(lambda x: x * 13 % 33).take(34)[-1] == ...
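Hint (a plain-Python check, not part of the task itself): take(34) returns the results for x = 0..33, so the [-1] element corresponds to x = 33.
In [ ]:
# Expected value of the last of the first 34 elements, computed locally:
(33 * 13) % 33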
Task 2 Write count using reduce:
In [17]:
def fun(x, y):
return "something"
assert rdd.count() == rdd.reduce(fun)
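If you get stuck, one common pattern (a sketch, not necessarily the intended fun above) is to map every element to 1 first and then reduce with addition:
In [ ]:
# Sketch: count as a map + reduce, using the rdd from above.
assert rdd.map(lambda x: 1).reduce(lambda x, y: x + y) == rdd.count()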
Task 3 Write sum using aggregate, described here: http://spark.apache.org/docs/1.6.0/api/python/pyspark.html#pyspark.RDD.aggregate
In [ ]:
assert rdd.sum() == rdd.aggregate(0, lambda x, y: ..., lambda x, y: ...)
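To illustrate aggregate's two functions without giving the sum away, here is the classic mean example (a sketch using the same rdd): seqOp folds one element into a partition's accumulator, and combOp merges accumulators from different partitions.
In [ ]:
# Compute (sum, count) in a single pass, then the mean.
seq_op = lambda acc, x: (acc[0] + x, acc[1] + 1)    # fold an element in
comb_op = lambda a, b: (a[0] + b[0], a[1] + b[1])   # merge two accumulators
s, c = rdd.aggregate((0, 0), seq_op, comb_op)
s / c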
In [20]:
rdd2 = rdd.flatMap(lambda x: [x**2 % 8609, x**3 % 8609])
In [23]:
rdd2.take(10)
Out[23]:
Task 4 Get the biggest value from rdd2 (use reduce):
In [ ]:
assert rdd2.max() == rdd2.reduce(lambda x, y: ...)
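One possible fill (an assumption about the intended answer): a pairwise maximum as the reduce function.
In [ ]:
# Pairwise max: reduce keeps the larger of each pair it sees.
rdd2.reduce(lambda x, y: x if x > y else y)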
Task 5 Get the second biggest value from rdd2, using reduce or aggregate.
In [ ]:
rdd2.aggregate(..., ..., ...)
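A sketch of one aggregate-based approach (the names and zero value are my own choices): carry the two largest values seen so far as a (top, second) pair; seq_op folds in a new element and comb_op merges pairs from two partitions.
In [ ]:
def seq_op(acc, x):
    # Fold a new element into the (top, second) accumulator.
    top, second = acc
    if x > top:
        return (x, top)
    if x > second:
        return (top, x)
    return acc

def comb_op(a, b):
    # Merge two accumulators: keep the two largest of the four values.
    top, second = sorted([a[0], a[1], b[0], b[1]], reverse=True)[:2]
    return (top, second)

rdd2.aggregate((float('-inf'), float('-inf')), seq_op, comb_op)[1]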
In [34]:
remainders = rdd2.groupBy(lambda x: x)
remainders.take(10)
Out[34]:
We only want the lengths of the ResultIterables, so we can map the values of the key-value pairs:
In [35]:
remainders_counts = remainders.mapValues(lambda x: len(x))
remainders_counts.take(10)
Out[35]:
Let's sort it by count:
In [37]:
remainders_counts.sortBy(lambda x: x[1], ascending=False).take(15)
Out[37]:
Task 6 Compute the counts of repetitions of remainders: how many remainders occurred 36 times, how many occurred 35 times, and so on...
In [ ]:
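One way to approach it (a sketch, assuming remainders_counts from above): swap each pair to (repetition_count, 1) and sum with reduceByKey.
In [ ]:
repetition_counts = (remainders_counts
                     .map(lambda kv: (kv[1], 1))   # key by the repetition count
                     .reduceByKey(lambda x, y: x + y)
                     .sortByKey(ascending=False))
repetition_counts.take(10)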
You can move on to a real dataset!