Many examples courtesy of Monte Lunacek
In [3]:
def square(x):
    return x*x

numbers = [1, 2, 3]

def map_squares(nums):
    res = []
    for x in nums:
        res.append(square(x))
    return res

map_squares(numbers)
Out[3]:
or...
In [5]:
results = map(square, numbers)
results
Out[5]:
For parallel computing in Python, map is a key abstraction: each call is independent of the others, so the work can be split across processes.
In [6]:
from multiprocessing import Pool

pool = Pool(5)                        # pool of 5 worker processes
results = pool.map(square, numbers)   # same map interface, but runs in parallel
results
Out[6]:
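As a side note (not in the original notebook): when a Pool is no longer needed, it is good practice to shut it down explicitly.
In [ ]:
pool.close()   # no more tasks will be submitted to the pool
pool.join()    # wait for the worker processes to exit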
In [7]:
lambda_square = lambda x: x*x
map(lambda_square, range(10))
Out[7]:
In [8]:
map(lambda x: x*x, range(10))
Out[8]:
In [10]:
res = map(lambda x: x*x, range(10))
In [11]:
def add_num(x1, x2):
    return x1+x2

print reduce(add_num, res)
In [12]:
print reduce(lambda x,y: x+y, res)
In [13]:
def greater_than_ten(x):
    return x > 10

filter(greater_than_ten, res)
Out[13]:
In [14]:
filter(lambda x: x>10, res)
Out[14]:
In [15]:
import findspark
import os
findspark.init()   # must be called before importing pyspark in a Jupyter notebook
import pyspark
In [16]:
sc = pyspark.SparkContext()
The parallelize method is a utility for initializing RDDs.
In [17]:
import numpy as np
rdd = sc.parallelize(np.arange(20), numSlices=5)
Transformations return a new RDD (a new edge and vertex in the DAG), are lazily evaluated, and can be narrow or wide: map, flatMap, reduceByKey, filter, glom.
Actions return values - beware of memory limitations! collect, reduce, take, count.
What does this look like?
glom: Return an RDD created by coalescing all elements within each partition into a list.
collect: Returns a list from all elements of an RDD.
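A minimal sketch of the lazy evaluation mentioned above (not from the original notebook, reusing the sc defined earlier): the transformation returns immediately, and nothing is computed until an action runs.
In [ ]:
lazy = sc.parallelize(range(4)).map(lambda x: x * 100)   # only records an edge in the DAG
print lazy            # just an RDD handle, nothing has been computed yet
print lazy.collect()  # the action triggers the computation: [0, 100, 200, 300]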
In [18]:
for x in rdd.glom().collect():
    print x
In [19]:
rdd = sc.parallelize(np.arange(20), numSlices=10)
for x in rdd.glom().collect():
    print x
In [20]:
rdd = sc.parallelize([ [2, 3, 4],[0, 1],[5, 6, 7, 8] ])
rdd.collect()
Out[20]:
In [21]:
rdd.map(lambda x: range(len(x))).collect()
Out[21]:
Or I can flatten the results...
In [22]:
rdd.flatMap(lambda x: range(len(x))).collect()
Out[22]:
Or flatten the original lists...
In [23]:
rdd.flatMap(lambda x: x).collect()
Out[23]:
In [24]:
rdd.flatMap(lambda x: x).reduce(lambda x,y: x+y)
Out[24]:
In [25]:
rdd = sc.parallelize([("a", 1), ("b", 1), ("a", 2)])
rdd.collect()
Out[25]:
In [26]:
rdd.reduceByKey(lambda x,y: x+y).collect()
Out[26]:
In [27]:
rdd = sc.parallelize([("hamlet", 1), ("claudius", 1), ("hamlet", 1)])
In [28]:
rdd.countByKey()
Out[28]:
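These pieces combine into the classic word-count pattern; a minimal sketch (the two input sentences are made up for illustration):
In [ ]:
sentences = sc.parallelize(["to be or not to be", "that is the question"])
counts = (sentences.flatMap(lambda line: line.split())   # one element per word
                   .map(lambda word: (word, 1))          # key/value pairs
                   .reduceByKey(lambda x, y: x + y))     # sum the counts per key
counts.collect()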
In [ ]:
import h5py
In [ ]:
h5file_path = '../data/hdf5_ex.h5'

def readchunk(v):
    # each task opens the HDF5 file itself, so no file handle has to be shipped to the workers
    chunk = h5py.File(h5file_path, 'r')
    return chunk['/chunked'][v, :]

chunked_array = sc.parallelize(range(0, 10)).map(lambda v: readchunk(v))
chunked_array.take(3)
In [ ]:
def toCSV(data):
    return ','.join(str(d) for d in data)

lines = chunked_array.map(toCSV).repartition(1)   # one partition -> one output part file
lines.saveAsTextFile('hdf5_ex.csv')
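Note that saveAsTextFile writes a directory named hdf5_ex.csv containing one part file per partition; the repartition(1) above forces a single part file. A quick sketch of reading it back (assuming the same path):
In [ ]:
lines_back = sc.textFile('hdf5_ex.csv')   # reads every part file in the directory
lines_back.take(3)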
In [ ]: