Introduction to Spark

Many examples courtesy of Monte Lunacek

Outline

  • Functional programming in Python
  • Spark's programming model
  • As many examples as we can get through!

Functional Python

"Python acquired lambda, reduce, filter and map, courtesy of a Lisp hacker who missed them and submitted working patches." – Guido van Rossum

We will use these concepts (and more) in Spark

The map abstraction

map applies a function to each element of a collection and returns the collection of results. (For the category-theory inclined: it lifts functions (morphisms) over a container, i.e. the container is a functor.)


In [3]:
def square(x):
    return x*x

numbers = [1,2,3]

def map_squares(nums):
    res = []
    for x in nums:
        res.append( square(x) )
    return res

map_squares(numbers)


Out[3]:
[1, 4, 9]

or...


In [5]:
results = map(square, numbers)
results


Out[5]:
[1, 4, 9]

For parallel computing in Python, map is a key abstraction: each element is processed independently, so the calls can be distributed.


In [6]:
from multiprocessing import Pool
pool = Pool(5)
results = pool.map(square, numbers)
results


Out[6]:
[1, 4, 9]

lambda

An anonymous function: a function defined without a name, much like inlining a small function at its point of use.


In [7]:
lambda_square = lambda x: x*x
map(lambda_square, range(10))


Out[7]:
[0, 1, 4, 9, 16, 25, 36, 49, 64, 81]

In [8]:
map(lambda x: x*x, range(10))


Out[8]:
[0, 1, 4, 9, 16, 25, 36, 49, 64, 81]

In [10]:
res = map(lambda x: x*x, range(10))

reduce

Apply a function of two arguments cumulatively to the items of the container, reducing them to a single value.


In [11]:
def add_num(x1, x2):
    return x1+x2

print reduce(add_num, res)


285

In [12]:
print reduce(lambda x,y: x+y, res)


285

filter

Constructs a new list from the items for which the applied function returns True.


In [13]:
def greater_than(x):
    return x > 10

filter(greater_than, res)


Out[13]:
[16, 25, 36, 49, 64, 81]

In [14]:
filter(lambda x: x>10, res)


Out[14]:
[16, 25, 36, 49, 64, 81]

Spark Programming Model

Everything starts with a SparkContext


In [15]:
import findspark
import os
findspark.init()  # call this before importing pyspark in a Jupyter notebook

import pyspark

In [16]:
sc = pyspark.SparkContext()

Create RDDs

RDD Documentation

The parallelize method is a utility for initializing RDDs.

  • NB: the parallelized data structure must fit in the driver's memory!

In [17]:
import numpy as np

rdd = sc.parallelize(np.arange(20), numSlices=5)

Transformations and Actions

Transformations add new vertices and edges to the DAG; they are lazily evaluated, and can be narrow or wide (see the short sketch after these two lists).

  • map, flatMap
  • reduceByKey
  • filter
  • glom

Actions trigger evaluation and return values to the driver; beware of memory limitations!

  • collect
  • reduce
  • take
  • count
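
A minimal sketch of the laziness, using the rdd created above: the map call returns a new RDD immediately without computing anything; only an action such as take forces the work to run.


In [ ]:
squared = rdd.map(lambda x: x * x)  # transformation: extends the DAG, computes nothing
print(squared)                      # just an RDD object, no data has been touched
print(squared.take(5))              # action: triggers the actual computation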

What does this look like?

  • glom: Return an RDD created by coalescing all elements within each partition into a list.
  • collect: Return a list that contains all of the elements of the RDD.

In [18]:
for x in rdd.glom().collect():
    print x


[0, 1, 2, 3]
[4, 5, 6, 7]
[8, 9, 10, 11]
[12, 13, 14, 15]
[16, 17, 18, 19]

In [19]:
rdd = sc.parallelize(np.arange(20), numSlices=10)
for x in rdd.glom().collect():
    print x


[0, 1]
[2, 3]
[4, 5]
[6, 7]
[8, 9]
[10, 11]
[12, 13]
[14, 15]
[16, 17]
[18, 19]

map and flatMap

map returns a new RDD by applying a function to each element; flatMap applies the function and then flattens the results.


In [20]:
rdd = sc.parallelize([ [2, 3, 4],[0, 1],[5, 6, 7, 8] ])
rdd.collect()


Out[20]:
[[2, 3, 4], [0, 1], [5, 6, 7, 8]]

In [21]:
rdd.map(lambda x: range(len(x))).collect()


Out[21]:
[[0, 1, 2], [0, 1], [0, 1, 2, 3]]

Or I can flatten the results...


In [22]:
rdd.flatMap(lambda x: range(len(x))).collect()


Out[22]:
[0, 1, 2, 0, 1, 0, 1, 2, 3]

Or flatten the original results


In [23]:
rdd.flatMap(lambda x: x).collect()


Out[23]:
[2, 3, 4, 0, 1, 5, 6, 7, 8]

Reduction

The reducing function must be associative (and commutative): partial results are computed per partition and then combined, in no guaranteed order.


In [24]:
rdd.flatMap(lambda x: x).reduce(lambda x,y: x+y)


Out[24]:
36
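
A small sketch of why associativity matters, assuming the SparkContext created above: reduce combines partial results computed per partition, so a non-associative operation such as subtraction can give different answers under different partitionings.


In [ ]:
# subtraction is not associative: the result depends on how the elements
# were split across partitions before the partial results were combined
print(sc.parallelize(range(8), 2).reduce(lambda x, y: x - y))
print(sc.parallelize(range(8), 4).reduce(lambda x, y: x - y))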

In [25]:
rdd = sc.parallelize([("a", 1), ("b", 1), ("a", 2)])
rdd.collect()


Out[25]:
[('a', 1), ('b', 1), ('a', 2)]

In [26]:
rdd.reduceByKey(lambda x,y: x+y).collect()


Out[26]:
[('a', 3), ('b', 1)]

In [27]:
rdd = sc.parallelize([("hamlet", 1), ("claudius", 1), ("hamlet", 1)])

In [28]:
rdd.countByKey()


Out[28]:
defaultdict(int, {'claudius': 1, 'hamlet': 2})
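
countByKey is an action, so the whole result dict comes back into driver memory; for a large key space, one sketch of a distributed alternative is reduceByKey (shown earlier) followed by collect.


In [ ]:
# the counting happens on the cluster; only the final (key, count) pairs are collected
rdd.reduceByKey(lambda x, y: x + y).collect()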

Reading HDF5 with PySpark

Example courtesy Freeman Lab: https://github.com/freeman-lab/hdf5-and-spark


In [ ]:
import h5py

In [ ]:
h5file_path = '../data/hdf5_ex.h5'

def readchunk(v):
    # each worker opens the file itself: an open HDF5 handle cannot be
    # serialized and shipped to the workers, but the path (a string) can
    chunk = h5py.File(h5file_path, 'r')
    return chunk['/chunked'][v, :]

# parallelize the row indices, then read each row on the workers
chunked_array = sc.parallelize(range(0, 10)).map(lambda v: readchunk(v))
chunked_array.take(3)

Now write it to a CSV file (approach from Stack Overflow user Daniel Darabos)


In [ ]:
def toCSV(data):
    # format one row as a comma-separated line of values
    return ','.join(str(d) for d in data)

lines = chunked_array.map(toCSV).repartition(1)
lines.saveAsTextFile('hdf5_ex.csv')

What did repartition accomplish?
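
One way to check (a minimal sketch, assuming the chunked_array and lines RDDs above): compare partition counts before and after. With a single partition, saveAsTextFile writes one part file instead of one file per partition.


In [ ]:
print(chunked_array.getNumPartitions())  # partitioning inherited from parallelize
print(lines.getNumPartitions())          # 1, after repartition(1)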

