Introduction to Spark using PySpark

Configure Spark

  1. Set environment variables:

    • SPARK_HOME (Path to spark installation directory)
    • PYSPARK_PYTHON (Path to python executable Ex. /anaconda/bin/python)
  2. Edit spark-env.sh to add the following (see the example sketch after this list):

    • SPARK_LOG_DIR
    • SPARK_LOCAL_DIRS
    • SPARK_WORKER_INSTANCES
    • SPARK_WORKER_DIR
    • SPARK_CLASSPATH (Add external libraries such as jars here; adds to both executor & driver classpaths)
  3. Edit log4j.properties to adjust logging (optional)
  4. Edit the slaves file, adding one machine/host per line that will act as a worker node
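
For reference, a minimal spark-env.sh sketch might look like the following (the paths and counts are placeholders, not recommended values):

# conf/spark-env.sh -- example values only; adjust for your machine
export SPARK_LOG_DIR=/tmp/spark/logs        # where master/worker log files are written
export SPARK_LOCAL_DIRS=/tmp/spark/scratch  # scratch space (e.g. shuffle data)
export SPARK_WORKER_INSTANCES=1             # number of worker processes per host
export SPARK_WORKER_DIR=/tmp/spark/work     # per-application work dirs (incl. stdout/stderr)
export SPARK_CLASSPATH=/path/to/extra.jar   # extra jars on the executor & driver classpaths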

Start the standalone Spark cluster manager

  1. Start the master: SPARK_HOME/sbin/start-master.sh
  2. Access the WebUI: [MasterWebUI: Started MasterWebUI at http://xxx.xxx.xxx.xxx:8080]
  3. Enable Remote Login on Mac OS X so that the start scripts can reach the worker hosts over SSH (port 22)
  4. Start the slave(s): SPARK_HOME/sbin/start-slaves.sh spark://master-host:port

Stop master and slaves

  1. Stop the slave(s): SPARK_HOME/sbin/stop-slaves.sh
  2. Stop the master: SPARK_HOME/sbin/stop-master.sh

Note: Add the PySpark modules (SPARK_HOME/python/lib) to the Python path from code so that they are available for import (see the first cell below).

Submitting Spark Applications (Jobs)

SPARK_HOME/bin/spark-submit --master <master-url> --py-files <comma-separated python dependencies as zip> our-python-code.py <custom args>

HELP: SPARK_HOME/bin/spark-submit --help
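
A concrete (hypothetical) invocation against a standalone master, with placeholder file names and arguments, might look like:

SPARK_HOME/bin/spark-submit \
    --master spark://master-host:7077 \
    --py-files dependencies.zip \
    our-python-code.py arg1 arg2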

In [2]:
# make the PySpark and Py4J libraries bundled with Spark importable in this notebook
import os
import sys

sys.path.append(os.environ["SPARK_HOME"] + "/python/lib/py4j-0.9-src.zip")
sys.path.append(os.environ["SPARK_HOME"] + "/python/lib/pyspark.zip")

In [61]:
from pyspark import SparkConf, SparkContext
from pyspark import SparkFiles
from pyspark import StorageLevel
from pyspark import AccumulatorParam

In [4]:
sconf = SparkConf()
sconf.setAppName("PySpark Tutorial")
sconf.setMaster("spark://snehasish-barmans-macbook.local:7077")

sc = SparkContext.getOrCreate(conf = sconf)

In [5]:
print sc
print sc.version


<pyspark.context.SparkContext object at 0x10343d450>
1.6.2
Spark does lazy evaluation. If we have a chain of transformations, Spark won't execute them until an action is invoked.
An RDD has several levels of persistence (memory, disk, memory-and-disk, serialized, ...). If we reuse an RDD in subsequent transformations, it is good to persist it to avoid recomputing it.

rdd
rdd.persist()                    # mark the RDD for caching
rdd.is_cached
rdd.trans1().action()            # the first action computes rdd and caches it

rdd.trans1().trans2().action()   # later actions reuse the cached data
rdd.unpersist()                  # release the cached copies

No. of partitions = No. of output files for output actions.
No. of partitions = No. of tasks. Spark runs one task per partition, so many partitions over a small dataset add scheduling overhead.
No. of partitions = Level of parallelism.
Default partitioner = hash partitioner (distributes the elements of the RDD evenly across the workers).
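
A minimal sketch of lazy evaluation and caching, assuming the SparkContext sc created above (names and values here are only illustrative):

nums = sc.parallelize(range(1, 11), 3)   # nothing is computed yet
squares = nums.map(lambda x: x**2)       # still nothing -- transformations are lazy
squares.persist()                        # mark for caching; takes effect on the first action
print squares.count()                    # first action triggers the computation and caches the result
print squares.sum()                      # second action reuses the cached partitions
squares.unpersist()                      # release the cached copies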

Transformations

An RDD can contain heterogeneous elements.
Ex. [1,2, "abc", (1,2), {4,5,6}]


In [35]:
sc.parallelize([1,2, "abc", (1,2), {4,5,6}]).collect()


Out[35]:
[1, 2, 'abc', (1, 2), {4, 5, 6}]

In [30]:
rdd = sc.parallelize([1,2,3,4,5,6,7,8,9,10], 3)
rdd2 = sc.parallelize(xrange(10, 20), 3)

In [31]:
print rdd.glom().collect() # shows data grouped by partitions
print rdd2.glom().collect()


[[1, 2, 3], [4, 5, 6], [7, 8, 9, 10]]
[[10, 11, 12], [13, 14, 15], [16, 17, 18, 19]]

In [7]:
rdd.map(lambda x: x**2).collect() # map 1-to-1 transformation, operates on every element of rdd


Out[7]:
[1, 4, 9, 16, 25, 36, 49, 64, 81, 100]

In [19]:
# functions must be self-contained: no shared state, no reliance on mutable global variables
def trans1(x):
    return x**2

rdd.map(trans1).collect()


Out[19]:
[1, 4, 9, 16, 25, 36, 49, 64, 81, 100]

In [20]:
rdd.filter(lambda x: x > 5).collect() # filter


Out[20]:
[6, 7, 8, 9, 10]

In [48]:
datasets = "../../../Machine_Learning/WPI_ML/datasets"
print os.path.exists(datasets)
#print os.path.realpath(datasets)


True

In [55]:
# default is hdfs filesystem, to access local files, use namespace -> file:///
textrdd = sc.textFile("file:///" + os.path.realpath(datasets) + "/Audio_Standardization_ Sentences.txt", 
                      use_unicode = False, minPartitions = 3)

In [56]:
textrdd.glom().collect()


Out[56]:
[['Oak is strong and also gives shade.',
  'Cats and dogs each hate the other.',
  'The pipe began to rust while new.',
  "Open the crate but don't break the glass."],
 ['Add the sum to the product of these three.',
  'Thieves who rob friends deserve jail.',
  'The ripe taste of cheese improves with age.'],
 ['Act on these orders with great speed.',
  'The hog crawled under the high fence.',
  'Move the vat over the hot fire.',
  'Is this the end.']]

In [62]:
textrdd.flatMap(lambda x: x.split(" ")).take(11) # 1-to-many transformation; flattens the results into a single list


Out[62]:
['Oak',
 'is',
 'strong',
 'and',
 'also',
 'gives',
 'shade.',
 'Cats',
 'and',
 'dogs',
 'each']

In [63]:
def countWordsInPartition(iterator): 
    """
    @params:
        iterator: a partition of the rdd
    """
    count = 0
    for x in iterator:
        count += len(x.split(" "))
    yield count

textrdd.mapPartitions(countWordsInPartition).collect() # same as map but operates on each chunk/partition of the rdd


Out[63]:
[29, 23, 25]

In [65]:
rdd.sortBy(keyfunc = lambda x: x, ascending = False, numPartitions = 1).collect() # sorting
# numPartitions controls the level of parallelism


Out[65]:
[10, 9, 8, 7, 6, 5, 4, 3, 2, 1]

In [76]:
rdd.sample(withReplacement = False, fraction = 0.5, seed = 13).collect() # sampling


Out[76]:
[2, 3, 6, 7, 8, 10]

In [78]:
rdd.coalesce(1).glom().collect() # reduce no. of partitions by combining partitions from each worker, thereby minimizing network traffic


Out[78]:
[[1, 2, 3, 4, 5, 6, 7, 8, 9, 10]]

In [80]:
rdd.repartition(2).glom().collect() 
# increases or decreases the no. of partitions, but at the cost of more network traffic, 
# because Spark has to shuffle the data across the workers.
# Use coalesce when the intent is only to decrease the number of partitions.


Out[80]:
[[1, 2, 3, 4, 5, 6, 7, 8, 9], [10]]

In [81]:
rdd.repartition(5).glom().collect()


Out[81]:
[[10], [1, 2, 3, 4, 5, 6], [], [], [7, 8, 9]]

In [89]:
rdd.union(rdd2).collect() # combines two rdds -> A u B


Out[89]:
[1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19]

In [90]:
rdd.intersection(rdd2).collect() # intersection -> A n B


Out[90]:
[10]

In [46]:
rdd.subtract(rdd2).collect() # subtract -> A - B; removes from A every element that also appears in B and returns the rest


Out[46]:
[6, 1, 7, 8, 2, 9, 3, 4, 5]

In [96]:
rdd.union(rdd2).distinct().sortBy(ascending = True, keyfunc = lambda x:x).collect() # distinct


Out[96]:
[1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19]

In [98]:
rdd.cartesian(rdd2).take(5) # all pair combinations; creates key-value RDD


Out[98]:
[(1, 10), (2, 10), (3, 10), (1, 11), (1, 12)]

In [46]:
rdd.zip(rdd2).collect() # zip (same as zip() in python); creates key-value RDD


Out[46]:
[(1, 10),
 (2, 11),
 (3, 12),
 (4, 13),
 (5, 14),
 (6, 15),
 (7, 16),
 (8, 17),
 (9, 18),
 (10, 19)]

In [38]:
rdd.keyBy(lambda x: x % 3).collect() # keyBy (converts a normal RDD into key-value RDD based on a criteria)
# result of the criteria becomes the 'key' and the element itself becomes the 'value'.


Out[38]:
[(1, 1),
 (2, 2),
 (0, 3),
 (1, 4),
 (2, 5),
 (0, 6),
 (1, 7),
 (2, 8),
 (0, 9),
 (1, 10)]

In [41]:
print rdd.groupBy(lambda x: x % 3).collect() # groupBy - same as 'keyBy' but all the values of a key are grouped into an iterable
print list(rdd.groupBy(lambda x: x % 3).collect()[0][1])


[(0, <pyspark.resultiterable.ResultIterable object at 0x105861350>), (1, <pyspark.resultiterable.ResultIterable object at 0x105861f90>), (2, <pyspark.resultiterable.ResultIterable object at 0x105861fd0>)]
[3, 6, 9]

In [7]:
file_name = "square_nums.py"
sc.addFile("./" + file_name) # All workers will download this file to their node

In [9]:
rdd.pipe("cat").collect() # pipe
# Use an external program for custom transformations.
# Each partition's elements are fed to the program as lines on standard input; its standard-output lines become the new elements.


Out[9]:
[u'1', u'2', u'3', u'4', u'5', u'6', u'7', u'8', u'9', u'10']

In [8]:
rdd.pipe(SparkFiles.get(file_name)).glom().collect() # pipe


Out[8]:
[[u'1 4 9 '], [u'16 25 36 '], [u'49 64 81 100 ']]
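
square_nums.py itself is not shown in this notebook; a plausible sketch that would produce the output above (one space-separated line of squares per partition) is:

#!/usr/bin/env python
# square_nums.py (hypothetical reconstruction): reads one number per line from stdin and
# prints the squares of the whole partition as a single space-separated line.
# The file must be executable (chmod +x square_nums.py) for rdd.pipe() to run it.
import sys

out = ""
for line in sys.stdin:
    out += str(int(line.strip())**2) + " "
print out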

Actions

  1. Return anything but an RDD.
  2. May return nothing at all (e.g. foreach).
  3. Often aggregations (N values reduced to a few).
  4. Return value(s) to the driver code.

In [10]:
rdd.reduce(lambda acc, x: acc+x) # reduce; the operation must be associative and commutative


Out[10]:
55

In [11]:
rdd.count() # count


Out[11]:
10

In [12]:
rdd.take(4) # take (returns as a list; selects data from one partition, then moves to another partition as required to satisfy the limit)


Out[12]:
[1, 2, 3, 4]

In [13]:
rdd.takeSample(False, 5, seed = 13) # takeSample


Out[13]:
[10, 5, 4, 1, 9]

In [15]:
rdd.takeOrdered(4, key = lambda x:x) # takeOrdered


Out[15]:
[1, 2, 3, 4]

In [16]:
rdd.collect()


Out[16]:
[1, 2, 3, 4, 5, 6, 7, 8, 9, 10]

In [21]:
rdd.first() # first


Out[21]:
1

In [26]:
rdd.top(4, key = int) # top ; returns top n items in descending order


Out[26]:
[10, 9, 8, 7]

In [27]:
rdd.countApprox(1000, 0.5) # countApprox


Out[27]:
10

In [35]:
rdd.countApproxDistinct(0.7) # number of distinct elements


Out[35]:
11L

In [32]:
def showValues(x):
    print "hello: " + str(x)
    
rdd.foreach(showValues) # foreach (Applies a function to every element of rdd)
# useful for communicating with external services, accumulating values in a queue, logging, ...
# NOTE: the printed output appears in each executor's stderr log (in the worker's working directory), not in the driver

In [37]:
def showValuesPartition(iterator):
    vals = []
    for item in iterator:
        vals.append("hello: " + str(item))
    print vals
        
rdd.foreachPartition(showValuesPartition) # foreachPartition (Applies a function per partition of rdd)

In [38]:
rdd.max()


Out[38]:
10

In [39]:
rdd.min()


Out[39]:
1

In [40]:
rdd.stats()


Out[40]:
(count: 10, mean: 5.5, stdev: 2.87228132327, max: 10.0, min: 1.0)

In [41]:
rdd.sum()


Out[41]:
55

In [42]:
rdd.mean()


Out[42]:
5.5

In [44]:
rdd.stdev()


Out[44]:
2.8722813232690143

In [51]:
# must be an absolute path to a directory; the default filesystem is HDFS
# creates a part-xxxx file for each partition
rdd.saveAsTextFile("file:///" + os.path.realpath("./textfiles")) # saveAsTextFile

In [52]:
# using compression
# compresses part-xxxx file of each partition
rdd.saveAsTextFile("file:///" + os.path.realpath("./textfileszip"), 
                   compressionCodecClass = "org.apache.hadoop.io.compress.GzipCodec") # saveAsTextFile

In [53]:
rdd.saveAsPickleFile("file:///" + os.path.realpath("./textfiles-pickled")) # saveAsPickleFile (faster reads, writes)

In [32]:
rdd.countByValue() # countByValue - returns as dict of value: count


Out[32]:
defaultdict(int, {1: 1, 2: 1, 3: 1, 4: 1, 5: 1, 6: 1, 7: 1, 8: 1, 9: 1, 10: 1})

In [ ]:
rdd.id() # id; returns this RDD's unique id within its SparkContext

In [39]:
rdd.isEmpty() # isEmpty


Out[39]:
False

In [43]:
print rdd.getStorageLevel() # getStorageLevel


Serialized 1x Replicated

In [44]:
rdd.getNumPartitions() # getNumPartitions


Out[44]:
3

In [54]:
rdd.persist(StorageLevel.DISK_ONLY)
print rdd.is_cached
print rdd.getStorageLevel()
rdd.unpersist()
print rdd.is_cached
print rdd.getStorageLevel()


True
Disk Serialized 1x Replicated
False
Serialized 1x Replicated

Key-Value RDD Transformations


In [6]:
krdd = sc.parallelize([("a", 1), ("a", 2), ("b", 1), ("b", 2), ("c", 1)], 2)
krdd2 = sc.parallelize([("a", 3), ("b", 3), ("d", 1)], 2)

In [7]:
print krdd.glom().collect()
print krdd2.glom().collect()


[[('a', 1), ('a', 2)], [('b', 1), ('b', 2), ('c', 1)]]
[[('a', 3)], [('b', 3), ('d', 1)]]

In [8]:
krdd.groupByKey().collect() # groupByKey


Out[8]:
[('a', <pyspark.resultiterable.ResultIterable at 0x105a383d0>),
 ('b', <pyspark.resultiterable.ResultIterable at 0x105a381d0>)]

In [9]:
list(krdd.groupByKey().collect()[0][1])


Out[9]:
[1, 2]

In [11]:
krdd.reduceByKey(lambda acc, x: acc + x, numPartitions = 1).collect() # reduceByKey
# does a groupByKey, followed by reduction
# operation must obey associative and commutative properties
# numPartitions controls the level of parallelism


Out[11]:
[('a', 3), ('b', 3)]

In [16]:
# http://www.learnbymarketing.com/618/pyspark-rdd-basics-examples/
# aggregates values per key using a zero value plus custom merge functions; unlike reduceByKey, the result type can differ from the value type

# define a resultset template (any data structure) with initial values
init_state_template = [0]

def mergeValuesWithinPartition(template, val):
    # fold one value from this partition into the running template
    template[0] = template[0] + val
    return template

def mergePartitions(template1, template2):
    # merge the partial results of two partitions (must return the same template type)
    return [template1[0] + template2[0]]
    

krdd.aggregateByKey(init_state_template, 
                    mergeValuesWithinPartition, 
                    mergePartitions).collect() # aggregateByKey


Out[16]:
[('a', [3]), ('b', [3])]

In [17]:
krdd.sortByKey(ascending = False, numPartitions = 1, keyfunc = lambda x: x).collect() # sortByKey (can also use sortBy)


Out[17]:
[('b', 1), ('b', 2), ('a', 1), ('a', 2)]

In [46]:
krdd.join(krdd2).collect() # join (inner-join in SQL; returns all-pair combinations)


Out[46]:
[('a', (1, 3)), ('a', (2, 3)), ('b', (1, 3)), ('b', (2, 3))]

In [47]:
krdd.leftOuterJoin(krdd2).collect() # leftOuterJoin (left join in SQL)


Out[47]:
[('a', (1, 3)), ('a', (2, 3)), ('c', (1, None)), ('b', (1, 3)), ('b', (2, 3))]

In [48]:
krdd.rightOuterJoin(krdd2).collect() # rightOuterJoin (right join in SQL)


Out[48]:
[('a', (1, 3)), ('a', (2, 3)), ('d', (None, 1)), ('b', (1, 3)), ('b', (2, 3))]

In [49]:
krdd.fullOuterJoin(krdd2).collect() # fullOuterJoin (full join in SQL)


Out[49]:
[('a', (1, 3)),
 ('a', (2, 3)),
 ('d', (None, 1)),
 ('c', (1, None)),
 ('b', (1, 3)),
 ('b', (2, 3))]

In [50]:
krdd.cogroup(krdd2).collect() # cogroup (for each key, returns one iterable of values per RDD)


Out[50]:
[('a',
  (<pyspark.resultiterable.ResultIterable at 0x105a79e10>,
   <pyspark.resultiterable.ResultIterable at 0x105a53b10>)),
 ('d',
  (<pyspark.resultiterable.ResultIterable at 0x105a53b90>,
   <pyspark.resultiterable.ResultIterable at 0x105a53210>)),
 ('c',
  (<pyspark.resultiterable.ResultIterable at 0x105a53690>,
   <pyspark.resultiterable.ResultIterable at 0x105a53ed0>)),
 ('b',
  (<pyspark.resultiterable.ResultIterable at 0x105a53fd0>,
   <pyspark.resultiterable.ResultIterable at 0x105a532d0>))]

In [52]:
print list(krdd.cogroup(krdd2).collect()[0][1][0])
print list(krdd.cogroup(krdd2).collect()[0][1][1])


[1, 2]
[3]

In [53]:
print list(krdd.cogroup(krdd2).collect()[2][1][0])
print list(krdd.cogroup(krdd2).collect()[2][1][1])


[1]
[]

In [14]:
krdd.mapValues(lambda x: x**2).collect() # mapValues


Out[14]:
[('a', 1), ('a', 4), ('b', 1), ('b', 4), ('c', 1)]

In [19]:
krdd_val_iter = sc.parallelize([("a", [1,2,3]), ("b", [4,5,6])])
krdd_val_iter.flatMapValues(lambda x: [y**2 for y in x]).collect() # flatMapValues
# works when the value is an iterable
# unpacks each element of the iterable into its own key-value pair and flattens them all into a single list


Out[19]:
[('a', 1), ('a', 4), ('a', 9), ('b', 16), ('b', 25), ('b', 36)]

In [20]:
krdd_val_iter.mapValues(lambda x: [y**2 for y in x]).collect() # mapValues 1-to-1


Out[20]:
[('a', [1, 4, 9]), ('b', [16, 25, 36])]

In [26]:
krdd.keys().collect() # keys


Out[26]:
['a', 'a', 'b', 'b', 'c']

In [25]:
krdd.values().collect() # values


Out[25]:
[1, 2, 1, 2, 1]

Key-Value RDD Actions

All actions available for a normal RDD are also available for a key-value RDD.


In [8]:
krdd.collect()


Out[8]:
[('a', 1), ('a', 2), ('b', 1), ('b', 2), ('c', 1)]

In [9]:
krdd.count()


Out[9]:
5

In [10]:
krdd.take(3)


Out[10]:
[('a', 1), ('a', 2), ('b', 1)]

In [12]:
krdd_dup = sc.parallelize([("a", 1), ("a", 1)]) 
krdd_dup.distinct().collect()


Out[12]:
[('a', 1)]

In [27]:
krdd.countByKey() # countByKey - number of times a key appears in the k-v rdd


Out[27]:
defaultdict(int, {'a': 2, 'b': 2, 'c': 1})

In [33]:
krdd.lookup("a") # lookup


Out[33]:
[1, 2]

In [34]:
krdd.toDebugString() # toDebugString (describes this RDD's lineage / recursive dependencies for debugging purposes)


Out[34]:
'(2) ParallelCollectionRDD[0] at parallelize at PythonRDD.scala:423 []'

In [45]:
krdd.collectAsMap() # collectAsMap -> returns the key-value RDD as a dictionary (for duplicate keys, later values overwrite earlier ones)


Out[45]:
{'a': 2, 'b': 2, 'c': 1}

Performance

Key: Reduce the number of data shuffles across the cluster.
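
For example, reduceByKey pre-aggregates values within each partition before the shuffle, whereas groupByKey ships every record across the network; a quick sketch using krdd from above:

print krdd.groupByKey().mapValues(sum).collect()      # every value is shuffled, then summed
print krdd.reduceByKey(lambda a, b: a + b).collect()  # only per-partition partial sums are shuffled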

Accumulator: a shared variable for gathering statistics or metadata about the data (for logging, debugging, tracking, and similar purposes). Tasks can only add to it; only the driver can read its value.

  1. Updates happen only when an action is invoked on the RDD (transformations are lazy).
  2. It does not reset to its initial value automatically, so running further actions keeps accumulating.

In [60]:
# default accumulator accumulates only numeric (int and float) types; only does 'add' operation (commutative and associative)
accum = sc.accumulator(0, accum_param = None)

def squareValues(x):
    global accum
    #accum += 1
    accum.add(1)
    return x**2
    
print rdd.map(squareValues).collect()
print "No. of elements: %d" % accum.value


[1, 4, 9, 16, 25, 36, 49, 64, 81, 100]
No. of elements: 10

In [ ]:
# custom accumulator to support non-numeric types (here, collecting the seen values into a set)
class CustomAccumulator(AccumulatorParam):
    
    def zero(self, initialValue):
        # per-task starting value
        return set()
    
    def addInPlace(self, template1, template2):
        # merge two partial results
        return template1.union(template2)
    
accum = sc.accumulator(set(), accum_param = CustomAccumulator())

def squareValues(x):
    global accum
    accum += {x}  # add a one-element set
    return x**2
    
print rdd.map(squareValues).collect()
print "Accumulated values: %s" % accum.value

Broadcast variable: another shared variable, useful for shipping a small read-only dataset to every executor once instead of sending it with every task (a usage sketch follows the cells below).


In [72]:
bb = sc.broadcast({"a": 10, "b": 15})
print bb.value
bb.unpersist() # deletes cached copies from the executors


{'a': 10, 'b': 15}

In [73]:
bb.value


Out[73]:
{'a': 10, 'b': 15}
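
A short sketch of using a broadcast variable inside a transformation (the lookup table and scaling logic are only illustrative):

factors = sc.broadcast({"a": 10, "b": 15, "c": 20})  # shipped to each executor once
# scale each value by a per-key factor read from the broadcast dict
print krdd.map(lambda kv: (kv[0], kv[1] * factors.value.get(kv[0], 1))).collect()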
