Introduction to Spark In-Memory Computing via PySpark

  • Spark is an implementation of the MapReduce programming paradigm that operates on in-memory data and allows data reuse across multiple computations.
  • Performance of Spark is significantly better than its predecessor, Hadoop MapReduce.
  • Spark's primary data abstraction is Resilient Distributed Dataset (RDD):
    • Read-only, partitioned collection of records
    • Created through deterministic operations on data:
      • Loading from stable storage
      • Transforming from other RDDs
      • Generating through coarse-grained operations such as map, join, and filter (each path is sketched below)
    • RDDs do not need to be materialized at all times and are recoverable via their data lineage
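
For illustration, a minimal sketch of each creation path, assuming a SparkContext named sc is available (one is created in Section 1 below):

# From stable storage (e.g., HDFS)
from_storage = sc.textFile("/repository/gutenberg-shakespeare.txt")

# From an in-memory collection on the driver
from_collection = sc.parallelize(["to be", "or not", "to be"])

# From another RDD, via a coarse-grained transformation
transformed = from_collection.map(lambda line: line.upper())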


In [1]:
!module list


Currently Loaded Modulefiles:
  1) anaconda3/4.2.0   3) zeromq/4.1.5
  2) matlab/2015a      4) hdp/0.1

1. Getting Started

Spark stores data in memory. This memory space is represented by the variable sc (SparkContext).


In [2]:
!cypress-kinit
!klist


Ticket cache: FILE:/home/lngo/.krb5cc
Default principal: lngo@PALMETTO.CLEMSON.EDU

Valid starting       Expires              Service principal
11/01/2017 12:24:41  11/08/2017 11:24:41  krbtgt/PALMETTO.CLEMSON.EDU@PALMETTO.CLEMSON.EDU

In [3]:
import sys
import os

# Make the Spark 2 client libraries and the bundled Py4J bridge importable
sys.path.insert(0, '/usr/hdp/current/spark2-client/python')
sys.path.insert(0, '/usr/hdp/current/spark2-client/python/lib/py4j-0.10.4-src.zip')

# Point Spark at its installation, configuration directory, and Python interpreter
os.environ['SPARK_HOME'] = '/usr/hdp/current/spark2-client/'
os.environ['SPARK_CONF_DIR'] = '/etc/hadoop/synced_conf/spark2/'
os.environ['PYSPARK_PYTHON'] = '/software/anaconda3/4.2.0/bin/python'

import pyspark

# Run on YARN with 3 executors, each with 12 cores and 60 GB of memory
conf = pyspark.SparkConf()
conf.setMaster("yarn")
conf.set("spark.driver.memory", "4g")
conf.set("spark.executor.memory", "60g")
conf.set("spark.executor.instances", "3")
conf.set("spark.executor.cores", "12")

sc = pyspark.SparkContext(conf=conf)

In [4]:
sc


Out[4]:
<pyspark.context.SparkContext at 0x2b8869b8b3c8>

In [5]:
textFile = sc.textFile("/repository/gutenberg-shakespeare.txt")

In [6]:
print(textFile)


/repository/gutenberg-shakespeare.txt MapPartitionsRDD[1] at textFile at NativeMethodAccessorImpl.java:0

2. What does Spark do with my data?

Storage Level:

  • Does the RDD use disk?
  • Does the RDD use memory?
  • Does the RDD use off-heap memory?
  • Should the RDD be serialized (while persisting)?
  • How many replicas (default: 1) should be kept? (Spark requires fewer than 40; see the sketch after this list.)
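
Each of these questions maps onto one field of PySpark's StorageLevel, in order. A minimal sketch inspecting one of the built-in levels:

from pyspark import StorageLevel

# Field order: useDisk, useMemory, useOffHeap, deserialized, replication
level = StorageLevel.MEMORY_AND_DISK
print(level.useDisk, level.useMemory, level.useOffHeap,
      level.deserialized, level.replication)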

In [7]:
textFile.getStorageLevel()


Out[7]:
StorageLevel(False, False, False, False, 1)

In [8]:
textFile.getNumPartitions()


Out[8]:
2

In [9]:
textFile.cache()


Out[9]:
/repository/gutenberg-shakespeare.txt MapPartitionsRDD[1] at textFile at NativeMethodAccessorImpl.java:0

In [10]:
textFile.getStorageLevel()


Out[10]:
StorageLevel(False, True, False, False, 1)

  • By default, each transformed RDD may be recomputed each time you run an action on it.
  • It is also possible to persist an RDD across actions using persist() or cache():
    • persist() lets you specify the storage level for the RDD
    • cache() persists the RDD in memory only
    • To retire an RDD from memory, call unpersist() (see the sketch after this list)
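
A minimal sketch of this lifecycle (the storage level here is illustrative):

from pyspark import StorageLevel

# Persist with an explicit level: keep in memory, spill to disk when full
rdd = sc.textFile("/repository/gutenberg-shakespeare.txt")
rdd.persist(StorageLevel.MEMORY_AND_DISK)

rdd.count()      # the first action materializes (and caches) the partitions
rdd.unpersist()  # retire the RDD from memory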

3. WordCount

Data operations in Spark are categorized into two groups: transformations and actions.

  • A transformation creates a new dataset from an existing one. Examples of transformations include map, filter, reduceByKey, and sort.
  • An action returns a value to the driver program (i.e., the memory space of this notebook) after running a computation on the dataset. Examples of actions include count, collect, reduce, and save.

"All transformations in Spark are lazy, in that they do not compute their results right away. Instead, they just remember the transformations applied to some base dataset (e.g. a file). The transformations are only computed when an action requires a result to be returned to the driver program." -- Spark Documentation

RDD Operations in Spark

Transformations:

  • map(f: T -> U) : RDD[T] -> RDD[U]
  • filter(f: T -> Bool) : RDD[T] -> RDD[T]
  • flatMap(f: T -> Seq[U]) : RDD[T] -> RDD[U]
  • sample(fraction: Float) : RDD[T] -> RDD[T] (deterministic sampling)
  • groupByKey() : RDD[(K,V)] -> RDD[(K, Seq[V])]
  • reduceByKey(f: (V,V) -> V) : RDD[(K,V)] -> RDD[(K,V)]
  • union() : (RDD[T], RDD[T]) -> RDD[T]
  • join() : (RDD[(K,V)], RDD[(K,W)]) -> RDD[(K,(V,W))]
  • cogroup() : (RDD[(K,V)], RDD[(K,W)]) -> RDD[(K, (Seq[V],Seq[W]))]
  • crossProduct() : (RDD[T], RDD[U]) -> RDD[(T,U)]
  • mapValues(f: V -> W) : RDD[(K,V)] -> RDD[(K,W)] (preserves partitioning)
  • sort(c: Comparator[K]) : RDD[(K,V)] -> RDD[(K,V)]
  • partitionBy(p: Partitioner[K]) : RDD[(K,V)] -> RDD[(K,V)]

Actions:

  • count() : RDD[T] -> Long
  • collect() : RDD[T] -> Seq[T]
  • reduce(f: (T,T) -> T) : RDD[T] -> T
  • lookup(k: K) : RDD[(K,V)] -> Seq[V] (on hash/range-partitioned RDDs)
  • save(path: String) : Outputs the RDD to a storage system
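
As a small illustration of the pair-RDD signatures above (a hypothetical sketch with made-up data):

ages = sc.parallelize([("alice", 30), ("bob", 25)])
cities = sc.parallelize([("alice", "Clemson"), ("bob", "Greenville")])

# join() : (RDD[(K,V)], RDD[(K,W)]) -> RDD[(K,(V,W))]
joined = ages.join(cities)

# collect() is an action: it ships the results back to the driver
joined.collect()
# e.g. [('alice', (30, 'Clemson')), ('bob', (25, 'Greenville'))] -- order may vary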

In [11]:
textFile = sc.textFile("/repository/gutenberg-shakespeare.txt")

In [12]:
textFile


Out[12]:
/repository/gutenberg-shakespeare.txt MapPartitionsRDD[3] at textFile at NativeMethodAccessorImpl.java:0

In [13]:
%%time
textFile.count()


CPU times: user 13.2 ms, sys: 5.11 ms, total: 18.3 ms
Wall time: 3.55 s
Out[13]:
124213

In [14]:
# Split lines into words, pair each word with a count of 1,
# then sum the counts for each unique word
wordcount = textFile.flatMap(lambda line: line.split(" ")) \
            .map(lambda word: (word, 1)) \
            .reduceByKey(lambda a, b: a + b)

In [15]:
wordcount


Out[15]:
PythonRDD[9] at RDD at PythonRDD.scala:48

In [16]:
!hdfs dfs -rm -r intro-to-spark
!hdfs dfs -mkdir intro-to-spark


17/11/01 12:44:02 INFO fs.TrashPolicyDefault: Moved: 'hdfs://dsci/user/lngo/intro-to-spark' to trash at: hdfs://dsci/user/lngo/.Trash/Current/user/lngo/intro-to-spark

In [17]:
wordcount.saveAsTextFile("intro-to-spark/output-wordcount-01")

In [18]:
!hdfs dfs -cat intro-to-spark/output-wordcount-01/part-00000 \
    2>/dev/null | head -n 20


('', 516839)
('Quince', 1)
('LIBRARY,', 218)
('Just', 10)
('enrooted', 1)
('divers', 20)
('Doubtless', 2)
('undistinguishable,', 1)
('Rheims,', 1)
('Freedom!', 1)
('incorporate.', 1)
('bawd!', 3)
('Sir-I', 1)
('withering', 2)
('Mopsa,', 1)
('[BEROWNE', 3)
('forgetfulness?', 1)
('Tranio?', 1)
('Wound', 3)
('twice,', 2)

A step-by-step walkthrough of the pipeline:


In [19]:
!hdfs dfs -cat /repository/gutenberg-shakespeare.txt \
    2>/dev/null | head -n 100


1609

THE SONNETS

by William Shakespeare



                     1
  From fairest creatures we desire increase,
  That thereby beauty's rose might never die,
  But as the riper should by time decease,
  His tender heir might bear his memory:
  But thou contracted to thine own bright eyes,
  Feed'st thy light's flame with self-substantial fuel,
  Making a famine where abundance lies,
  Thy self thy foe, to thy sweet self too cruel:
  Thou that art now the world's fresh ornament,
  And only herald to the gaudy spring,
  Within thine own bud buriest thy content,
  And tender churl mak'st waste in niggarding:
    Pity the world, or else this glutton be,
    To eat the world's due, by the grave and thee.


                     2
  When forty winters shall besiege thy brow,
  And dig deep trenches in thy beauty's field,
  Thy youth's proud livery so gazed on now,
  Will be a tattered weed of small worth held:  
  Then being asked, where all thy beauty lies,
  Where all the treasure of thy lusty days;
  To say within thine own deep sunken eyes,
  Were an all-eating shame, and thriftless praise.
  How much more praise deserved thy beauty's use,
  If thou couldst answer 'This fair child of mine
  Shall sum my count, and make my old excuse'
  Proving his beauty by succession thine.
    This were to be new made when thou art old,
    And see thy blood warm when thou feel'st it cold.


                     3
  Look in thy glass and tell the face thou viewest,
  Now is the time that face should form another,
  Whose fresh repair if now thou not renewest,
  Thou dost beguile the world, unbless some mother.
  For where is she so fair whose uneared womb
  Disdains the tillage of thy husbandry?
  Or who is he so fond will be the tomb,
  Of his self-love to stop posterity?  
  Thou art thy mother's glass and she in thee
  Calls back the lovely April of her prime,
  So thou through windows of thine age shalt see,
  Despite of wrinkles this thy golden time.
    But if thou live remembered not to be,
    Die single and thine image dies with thee.


                     4
  Unthrifty loveliness why dost thou spend,
  Upon thy self thy beauty's legacy?
  Nature's bequest gives nothing but doth lend,
  And being frank she lends to those are free:
  Then beauteous niggard why dost thou abuse,
  The bounteous largess given thee to give?
  Profitless usurer why dost thou use
  So great a sum of sums yet canst not live?
  For having traffic with thy self alone,
  Thou of thy self thy sweet self dost deceive,
  Then how when nature calls thee to be gone,
  What acceptable audit canst thou leave?  
    Thy unused beauty must be tombed with thee,
    Which used lives th' executor to be.


                     5
  Those hours that with gentle work did frame
  The lovely gaze where every eye doth dwell
  Will play the tyrants to the very same,
  And that unfair which fairly doth excel:
  For never-resting time leads summer on
  To hideous winter and confounds him there,
  Sap checked with frost and lusty leaves quite gone,
  Beauty o'er-snowed and bareness every where:
  Then were not summer's distillation left
  A liquid prisoner pent in walls of glass,
  Beauty's effect with beauty were bereft,
  Nor it nor no remembrance what it was.
    But flowers distilled though they with winter meet,
    Leese but their show, their substance still lives sweet.


                     6  
  Then let not winter's ragged hand deface,
  In thee thy summer ere thou be distilled:
  Make sweet some vial; treasure thou some place,
  With beauty's treasure ere it be self-killed:
  That use is not forbidden usury,
  Which happies those that pay the willing loan;

In [20]:
wordcount_step_01 = textFile.flatMap(lambda line: line.split(" "))

In [21]:
wordcount_step_01


Out[21]:
PythonRDD[13] at RDD at PythonRDD.scala:48

In [22]:
wordcount_step_01.take(20)


Out[22]:
['1609',
 '',
 'THE',
 'SONNETS',
 '',
 'by',
 'William',
 'Shakespeare',
 '',
 '',
 '',
 '',
 '',
 '',
 '',
 '',
 '',
 '',
 '',
 '']

In [23]:
wordcount_step_02 = wordcount_step_01.map(lambda word: (word, 1))

In [24]:
wordcount_step_02.take(20)


Out[24]:
[('1609', 1),
 ('', 1),
 ('THE', 1),
 ('SONNETS', 1),
 ('', 1),
 ('by', 1),
 ('William', 1),
 ('Shakespeare', 1),
 ('', 1),
 ('', 1),
 ('', 1),
 ('', 1),
 ('', 1),
 ('', 1),
 ('', 1),
 ('', 1),
 ('', 1),
 ('', 1),
 ('', 1),
 ('', 1)]

In [25]:
wordcount_step_03 = wordcount_step_02.reduceByKey(lambda a, b: a + b)

In [26]:
wordcount_step_03.take(20)


Out[26]:
[('', 516839),
 ('Quince', 1),
 ('LIBRARY,', 218),
 ('Just', 10),
 ('enrooted', 1),
 ('divers', 20),
 ('Doubtless', 2),
 ('undistinguishable,', 1),
 ('Rheims,', 1),
 ('Freedom!', 1),
 ('incorporate.', 1),
 ('bawd!', 3),
 ('Sir-I', 1),
 ('withering', 2),
 ('Mopsa,', 1),
 ('[BEROWNE', 3),
 ('forgetfulness?', 1),
 ('Tranio?', 1),
 ('Wound', 3),
 ('twice,', 2)]

Challenge

  • Augment the mapping stage of WordCount with a function that filters out punctuation and capitalization, so that variants of the same word are counted together (one possible approach is sketched below)
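
One possible approach (a sketch, not the only solution): normalize each word before pairing it with a count, using Python's string.punctuation.

import string

wordcount_clean = textFile.flatMap(lambda line: line.split(" ")) \
    .map(lambda word: word.strip(string.punctuation).lower()) \
    .map(lambda word: (word, 1)) \
    .reduceByKey(lambda a, b: a + b)

wordcount_clean.take(20)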

In [ ]:

To stop the Spark application and release its resources, call sc.stop()


In [27]:
sc.stop()