SSH into CloudLab.
$ ssh clnode218.clemson.cloudlab.us
From inside the terminal, open Spark's interactive shell:
$ spark-shell --master yarn --driver-memory 1G --executor-memory 10G --num-executors 10 --verbose --conf "spark.port.maxRetries=40"
The Spark shell's prompt is scala>
scala> sc
scala> val textFile = sc.textFile("hdfs:///repository/gutenberg-shakespeare.txt")
What type is textFile?
scala> print(textFile)
scala> println(textFile)
Storage Level:
scala> textFile.getStorageLevel.useDisk
scala> textFile.getStorageLevel.useMemory
scala> textFile.getStorageLevel.useOffHeap
scala> textFile.getStorageLevel.deserialized
scala> textFile.getStorageLevel.replication
Cache the RDD in Spark:
scala> textFile.cache
scala> textFile.getStorageLevel
Data operations in Spark are categorized into two groups, transformation and action.
"All transformations in Spark are lazy, in that they do not compute their results right away. Instead, they just remember the transformations applied to some base dataset (e.g. a file). The transformations are only computed when an action requires a result to be returned to the driver program." -- Spark Documentation
Transformations (e.g., map, filter, flatMap, reduceByKey) build a new RDD lazily from an existing one.
Actions (e.g., collect, count, take, saveAsTextFile) trigger the computation and return a result to the driver.
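Spark's lazy evaluation can be imitated in plain Scala, without a cluster, using a LazyList: the map below is recorded but not executed until an action-like call (sum) forces it. This is an analogy to illustrate the idea, not Spark itself; the log buffer is a made-up name for the demonstration.

```scala
// Plain-Scala sketch of lazy evaluation (an analogy, not Spark):
// the map is only a recorded "transformation" until something forces it.
val log = scala.collection.mutable.ArrayBuffer[String]()

val base = LazyList(1, 2, 3, 4, 5)
val transformed = base.map { x => log += s"mapped $x"; x * 2 } // nothing runs yet

assert(log.isEmpty)           // the "transformation" has not been evaluated

val result = transformed.sum  // the "action" forces evaluation
assert(result == 30)
assert(log.size == 5)         // now every element has been mapped
```

The same principle is why the wordcount line below returns instantly: no data is read until an action such as saveAsTextFile or take is called.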
Running word count:
scala> val wordcount = textFile.flatMap(line => line.split(" ")).map(word => (word, 1)).reduceByKey{(x, y) => x + y}
scala> wordcount
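The flatMap → map → reduceByKey pipeline above can be sketched on an ordinary Scala collection, with no Spark at all; the sample lines here are made-up data for illustration, and groupMapReduce plays the role of reduceByKey:

```scala
// Plain-Scala version of the word-count pipeline (sample data is made up).
val lines = List("to be or not to be", "that is the question")

val counts = lines
  .flatMap(line => line.split(" "))   // split each line into words
  .map(word => (word, 1))             // pair each word with a count of 1
  .groupMapReduce(_._1)(_._2)(_ + _)  // reduceByKey equivalent: sum per word

assert(counts("to") == 2)
assert(counts("be") == 2)
assert(counts("question") == 1)
```

The Spark version has the same shape, but each stage runs distributed across executors and nothing executes until an action is called.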
Save variable to HDFS file:
scala> wordcount.saveAsTextFile("output-wordcount-01")
View the output file by calling a system command from inside the Scala shell:
scala> import sys.process._
scala> "hdfs dfs -cat output-wordcount-01/part-00000" #| "head -n 20" !
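The #| and ! operators above come from Scala's sys.process package: #| pipes one process into another, ! runs the pipeline and returns its exit code, and !! captures stdout as a String. A self-contained sketch, using echo in place of hdfs (which may not be installed on your local machine):

```scala
import sys.process._

// Pipe one command into another, as with "hdfs dfs -cat ... | head" above.
val exitCode = ("echo hello" #| "head -n 1").!   // runs the pipeline, returns exit status
assert(exitCode == 0)

// !! captures the pipeline's stdout as a String instead of printing it.
val output = ("echo hello" #| "head -n 1").!!.trim
assert(output == "hello")
```

Because ! only returns the exit code, use !! (or !) depending on whether you want the output as a value or printed to the console.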
Step-by-step actions:
View the top lines of gutenberg-shakespeare.txt:
scala> "hdfs dfs -cat /repository/gutenberg-shakespeare.txt" #| "head -n 100" !
Step 1:
scala> val wc_step_01 = textFile.flatMap(line => line.split(" "))
scala> wc_step_01
scala> wc_step_01.take(20)
Step 2:
scala> val wc_step_02 = wc_step_01.map(word => (word, 1))
scala> wc_step_02.take(20)
Step 3:
scala> val wc_step_03 = wc_step_02.reduceByKey((x, y) => x + y)
scala> wc_step_03.saveAsTextFile("output-wordcount-02")
scala> "hdfs dfs -ls output-wordcount-01/" !
scala> "hdfs dfs -ls output-wordcount-02/" !
To stop the Spark shell (and its YARN job), type :quit at the scala> prompt.