Introduction to Spark In-Memory Computing via Spark Scala

SSH into CloudLab.

$ ssh clnode218.clemson.cloudlab.us

From inside the terminal, open Spark's interactive shell

$ spark-shell --master yarn --driver-memory 1G --executor-memory 10G --num-executors 10 --verbose --conf "spark.port.maxRetries=40" --packages com.databricks:spark-csv_2.11:1.5.0

View entry points inside the shell

scala> sc
scala> spark.sqlContext
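
As a quick sanity check, the entry points can also be queried for the settings passed on the command line. This is only a sketch: it assumes it is pasted into the spark-shell session started above, where `sc` and `spark` are predefined, and the values printed depend on the cluster.

```scala
// Run inside spark-shell, where `sc` (SparkContext) and `spark` (SparkSession) are predefined.
println(s"Spark version: ${spark.version}")
println(s"Master:        ${sc.master}")
println(s"Application:   ${sc.appName}")

// getOption returns None instead of throwing if a key was never set.
val executorMemory = sc.getConf.getOption("spark.executor.memory")
val portMaxRetries = sc.getConf.getOption("spark.port.maxRetries")
println(s"Executor memory: $executorMemory")
println(s"Port max retries: $portMaxRetries")
```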

Air Traffic Data

The original data come from the Bureau of Transportation Statistics, which provides air carrier on-time performance data from 1987 to 2008. The processed data come from the American Statistical Association.

  • More than 120 million entries
  • More than 12 GB in size

Load the data into a cached DataFrame:

scala> val airlines = spark.sqlContext.read.format("com.databricks.spark.csv").option("header", "true").option("inferSchema", "true").load("hdfs:///repository/airlines/data").cache()

Exercises:

  • Time how long it takes to count the rows in airlines
  • Run the count again and time it
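
One possible way to approach the timing exercise is a small helper that returns the elapsed time so the two runs can be compared. The helper name `timed` is hypothetical and not part of Spark; the built-in `spark.time(...)` used later in this tutorial prints the elapsed time but does not return it.

```scala
// Hypothetical helper (not part of Spark): evaluates a block once,
// prints the elapsed wall-clock time, and returns (result, milliseconds).
def timed[A](label: String)(block: => A): (A, Long) = {
  val start = System.nanoTime()
  val result = block                                   // the by-name block is evaluated here
  val elapsedMs = (System.nanoTime() - start) / 1000000L
  println(s"$label: $elapsedMs ms")
  (result, elapsedMs)
}
```

Assuming `airlines` is the DataFrame loaded above, it could be used like this:

scala> val (n1, t1) = timed("first count")(airlines.count())
scala> val (n2, t2) = timed("second count")(airlines.count())

Because cache() is lazy, the first count reads the CSV files from HDFS and fills the cache. The second count should be noticeably faster, provided the cached data fits in executor memory.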

View the schema:

scala> airlines.printSchema

Register the DataFrame as a temporary SQL table:

scala> airlines.registerTempTable("airlines")

List all unique carriers

scala> val uniqueAirline = spark.sqlContext.sql("SELECT DISTINCT UniqueCarrier FROM airlines")
scala> uniqueAirline.show

How many flights per unique carrier?

scala> val carrierFlightCount = spark.sqlContext.sql("SELECT UniqueCarrier, COUNT(UniqueCarrier) AS FlightCount FROM airlines GROUP BY UniqueCarrier")
scala> spark.time(carrierFlightCount.show())
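
The same aggregation can also be written with the DataFrame API instead of a SQL string. The following is a sketch that assumes `airlines` is the DataFrame loaded earlier in this session; the name `carrierFlightCountDF` is introduced here only for illustration.

```scala
import org.apache.spark.sql.functions.count

// Assumes `airlines` is the DataFrame loaded earlier in this spark-shell session.
// Group by carrier code and count the rows in each group.
val carrierFlightCountDF = airlines.groupBy("UniqueCarrier").agg(count("UniqueCarrier").as("FlightCount"))
spark.time(carrierFlightCountDF.show())
```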

Display carriers' full names:

scala> val carriers = spark.sqlContext.read.format("com.databricks.spark.csv").option("header","true").option("inferSchema","true").load("hdfs:///repository/airlines/metadata/carriers.csv").cache
scala> carriers.registerTempTable("carriers")
scala> val carrierFlightCountFull = spark.sqlContext.sql("SELECT c.Description, a.UniqueCarrier, COUNT(a.UniqueCarrier) AS FlightCount FROM airlines AS a INNER JOIN carriers AS c ON c.Code = a.UniqueCarrier GROUP BY a.UniqueCarrier, c.Description ORDER BY a.UniqueCarrier")
scala> spark.time(carrierFlightCountFull.show)
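
For comparison, the join and aggregation above might look like this with the DataFrame API. This sketch assumes `airlines` and `carriers` are the DataFrames loaded earlier, and that `carriers` has the columns Code and Description, as the SQL query implies. The names `joined`, `grouped`, and `carrierFlightCountFullDF` are illustrative only.

```scala
import org.apache.spark.sql.functions.count

// Assumes `airlines` and `carriers` are already loaded in this spark-shell session.
// join(right, condition) performs an inner join by default.
val joined = airlines.join(carriers, airlines("UniqueCarrier") === carriers("Code"))

// Group on both the carrier code and its full name, as in the SQL GROUP BY clause.
val grouped = joined.groupBy(airlines("UniqueCarrier"), carriers("Description"))

val carrierFlightCountFullDF = grouped.agg(count(airlines("UniqueCarrier")).as("FlightCount")).orderBy("UniqueCarrier")
spark.time(carrierFlightCountFullDF.show())
```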

What is the average departure delay for each airline?

scala> val avgDepDelay = spark.sqlContext.sql("SELECT FIRST(c.Description), FIRST(a.UniqueCarrier), AVG(a.DepDelay) AS AvgDepDelay FROM airlines AS a INNER JOIN carriers AS c ON c.Code = a.UniqueCarrier GROUP BY a.UniqueCarrier ORDER BY a.UniqueCarrier")
scala> spark.time(avgDepDelay.show)
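
Schema inference may type DepDelay as a string rather than a number, for example if missing values are encoded as text in this copy of the data (an assumption worth checking with printSchema). The sketch below makes the numeric conversion explicit before averaging. The column name `DepDelayNum` and the value names are hypothetical.

```scala
import org.apache.spark.sql.functions.{avg, col}

// Assumes `airlines` is the DataFrame loaded earlier in this spark-shell session.
// Inspect the inferred type of DepDelay before relying on it.
println(airlines.schema("DepDelay").dataType)

// Cast to double; with Spark's default settings, values that cannot be parsed
// become null, and avg() skips nulls.
val delays = airlines.withColumn("DepDelayNum", col("DepDelay").cast("double"))
val avgDepDelayDF = delays.groupBy("UniqueCarrier").agg(avg("DepDelayNum").as("AvgDepDelay")).orderBy("UniqueCarrier")
spark.time(avgDepDelayDF.show())
```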