Introduction to Spark In-memory Computing via Spark Scala

  • Spark is an implementation of the MapReduce programming paradigm that operates on in-memory data and allows data reuse across multiple computations.
  • Performance of Spark is significantly better than its predecessor, Hadoop MapReduce.
  • Spark's primary data abstraction is Resilient Distributed Dataset (RDD):
    • Read-only, partitioned collection of records
    • Created (aka written) through deterministic operations on data:
      • Loading from stable storage
      • Transforming from other RDDs
      • Generating through coarse-grained operations such as map, join, filter ...
    • Do not need to be materialized at all times and are recoverable via their data lineage (see the short sketch after this list)
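
A minimal sketch of these creation paths, runnable once the interactive shell below is open (where sc, the SparkContext, is predefined); the path and values here are made up purely for illustration:

scala> val fromStorage = sc.textFile("hdfs:///some/input.txt")               // loaded from stable storage (hypothetical path)
scala> val fromDriver  = sc.parallelize(1 to 100)                            // generated from a driver-side collection
scala> val derived     = fromDriver.filter(n => n % 2 == 0).map(n => n * 2)  // transformed from another RDD
scala> derived.toDebugString                                                  // the recorded lineage Spark uses for recovery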

SSH into CloudLab.

$ ssh clnode218.clemson.cloudlab.us

From inside the terminal, open Spark's interactive shell

$ spark-shell --master yarn --driver-memory 1G --executor-memory 10G --num-executors 10 --verbose --conf "spark.port.maxRetries=40"

1. Getting Started

Spark keeps data in memory across operations. The entry point to this functionality is the variable sc, the SparkContext, which the shell creates for you.

The spark shell's prompt is scala>

scala> sc
scala> val textFile = sc.textFile("hdfs:///repository/gutenberg-shakespeare.txt")

What type is textFile?

scala> print(textFile)
scala> println(textFile)
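
The REPL itself is the quickest way to answer this: evaluating the variable prints its inferred type, which should be an org.apache.spark.rdd.RDD[String].

scala> textFile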

2. What does Spark do with my data?

Storage Level:

  • Does RDD use disk?
  • Does RDD use memory?
  • Does RDD use off-heap memory?
  • Should an RDD be serialized (while persisting)?
  • How many replicas (default: 1) to use?

scala> textFile.getStorageLevel.useDisk
scala> textFile.getStorageLevel.useMemory
scala> textFile.getStorageLevel.useOffHeap
scala> textFile.getStorageLevel.deserialized
scala> textFile.getStorageLevel.replication

Cache the RDD in Spark:

scala> textFile.cache
scala> textFile.getStorageLevel

  • By default, each transformed RDD may be recomputed each time you run an action on it.
  • It is also possible to persist an RDD in memory using persist() or cache():
    • persist() lets you specify the storage level for the RDD
    • cache() persists the RDD in memory only
    • To evict an RDD from memory, call unpersist() (see the short sketch after this list)
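
A minimal sketch of these calls, using the textFile RDD from above; note that an RDD's storage level cannot be changed while it is already persisted, hence the unpersist() first:

scala> import org.apache.spark.storage.StorageLevel
scala> textFile.unpersist()                            // drop the level set by cache above
scala> textFile.persist(StorageLevel.MEMORY_AND_DISK)  // spill partitions to disk if they do not fit in memory
scala> textFile.getStorageLevel                        // confirm the new level
scala> textFile.unpersist()                            // evict when the RDD is no longer needed
scala> textFile.cache()                                // re-cache for the timing experiment below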

3. WordCount

Data operations in Spark fall into two groups: transformations and actions.

  • A transformation creates a new dataset from an existing one. Examples of transformations include map, filter, reduceByKey, and sort.
  • An action returns a value to the driver program (i.e., the memory space of this shell session) after running a computation on the dataset. Examples of actions include count, collect, reduce, and save.

"All transformations in Spark are lazy, in that they do not compute their results right away. Instead, they just remember the transformations applied to some base dataset (e.g. a file). The transformations are only computed when an action requires a result to be returned to the driver program." -- Spark Documentation

RDD Operations in Spark

Transformations:

  • map(f: T -> U) : RDD[T] -> RDD[U]
  • filter(f: T -> Bool) : RDD[T] -> RDD[T]
  • flatMap(f: T -> Seq[U]) : RDD[T] -> RDD[U]
  • sample(fraction: Float) : RDD[T] -> RDD[T] (deterministic sampling)
  • groupByKey() : RDD[(K,V)] -> RDD[(K, Seq[V])]
  • reduceByKey(f: (V,V) -> V) : RDD[(K,V)] -> RDD[(K,V)]
  • union() : (RDD[T], RDD[T]) -> RDD[T]
  • join() : (RDD[(K,V)], RDD[(K,W)]) -> RDD[(K,(V,W))]
  • cogroup() : (RDD[(K,V)], RDD[(K,W)]) -> RDD[(K, (Seq[V],Seq[W]))]
  • crossProduct() : (RDD[T], RDD[U]) -> RDD[(T,U)]
  • mapValues(f: V -> W) : RDD[(K,V)] -> RDD[(K,W)] (preserves partitioning)
  • sort(c: Comparator[K]) : RDD[(K,V)] -> RDD[(K,V)]
  • partitionBy(p: Partitioner[K]) : RDD[(K,V)] -> RDD[(K,V)]

Actions:

  • count() : RDD[T] -> Long
  • collect() : RDD[T] -> Seq[T]
  • reduce(f: (T,T) -> T) : RDD[T] -> T
  • lookup(k : K) : RDD[(K,V)] -> Seq[V] (on hash/range partitioned RDDs)
  • save(path: String) : Outputs RDD to a storage system
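
A small sketch exercising a few of the pair-RDD operations listed above; the toy data is made up purely for illustration:

scala> val sales  = sc.parallelize(Seq(("apple", 3), ("pear", 2), ("apple", 5)))  // hypothetical (item, quantity) pairs
scala> val prices = sc.parallelize(Seq(("apple", 0.5), ("pear", 0.75)))           // hypothetical (item, price) pairs
scala> val totals = sales.reduceByKey((x, y) => x + y)                            // transformation: RDD[(K,V)] -> RDD[(K,V)]
scala> val joined = totals.join(prices)                                           // transformation: RDD[(K,(V,W))]
scala> val cost   = joined.mapValues { case (qty, p) => qty * p }                 // transformation: preserves partitioning
scala> cost.collect()                                                             // action: returns the results to the driver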

Cache versus Cached

Count how many lines there are in the file

scala> spark.time(textFile.count)

Recount. The first count materialized the cache, so this second run reads the data from memory and should be noticeably faster.

scala> spark.time(textFile.count)

Run WordCount in a single line:

scala> val wordcount = textFile.flatMap(line => line.split(" ")).map(word => (word, 1)).reduceByKey{(x, y) => x + y}

Inspect the wordcount variable; the REPL prints the resulting RDD and its type:

scala> wordcount

Save the result to an HDFS file:

scala> wordcount.saveAsTextFile("output-wordcount-01")

View the output file by calling a system command from inside the Scala shell:

scala> import sys.process._
scala> "hdfs dfs -cat output-wordcount-01/part-00000" #| "head -n 20" !

Step-by-step actions:

View the top lines of gutenberg-shakespeare.txt:

scala> "hdfs dfs -cat /repository/gutenberg-shakespeare.txt" #| "head -n 100" !

Step 1:

scala> val wc_step_01 = textFile.flatMap(line => line.split(" "))
scala> wc_step_01
scala> wc_step_01.take(20)

Step 2:

scala> val wc_step_02 = wc_step_01.map(word => (word, 1))
scala> wc_step_02.take(20)

Step 3:

scala> val wc_step_03 = wc_step_02.reduceByKey((x, y) => x + y)
scala> wc_step_03.saveAsTextFile("output-wordcount-02")
scala> "hdfs dfs -ls output-wordcount-01/"!
scala> "hdfs dfs -ls output-wordcount-02/"!

Challenge

  • Augment the mapping step of WordCount with a function that strips punctuation and normalizes capitalization, so that variants of the same word are counted together (one possible sketch follows).
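
One possible sketch, not the only solution; the regular expression, the lower-casing choice, and the output path output-wordcount-03 are assumptions made for this example (the pipe to head relies on the sys.process._ import from earlier):

scala> val wordcountClean = textFile.flatMap(line => line.split(" ")).map(word => word.toLowerCase.replaceAll("[^a-z]", "")).filter(word => word.nonEmpty).map(word => (word, 1)).reduceByKey((x, y) => x + y)
scala> wordcountClean.saveAsTextFile("output-wordcount-03")
scala> "hdfs dfs -cat output-wordcount-03/part-00000" #| "head -n 20" !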

To stop the Spark application and exit the shell, type :quit at the scala> prompt.