SSH into CloudLab.
$ ssh clnode218.clemson.cloudlab.us
From inside the terminal, open Spark's interactive shell
$ spark-shell --master yarn --driver-memory 1G --executor-memory 10G --num-executors 10 --verbose --conf "spark.port.maxRetries=40" --packages com.databricks:spark-csv_2.11:1.5.0
View entry points inside the shell
scala> sc
scala> spark.sqlContext
The original data comes from the Bureau of Transportation Statistics, which provides air carrier on-time performance data from 1987 to 2008. The processed data comes from the American Statistical Association.
scala> val airlines = sqlContext.read.format("com.databricks.spark.csv").option("header", "true").option("inferSchema", "true").load("hdfs:///repository/airlines/data").cache()
scala> airlines.printSchema
Convert RDD to SQL table:
scala> airlines.registerTempTable("airlines")
List all unique carriers
scala> val uniqueAirline = spark.sqlContext.sql("SELECT DISTINCT UniqueCarrier FROM airlines")
scala> uniqueAirline.show
How many flights per unique carriers
scala> val carrierFlightCount = spark.sqlContext.sql("SELECT UniqueCarrier, COUNT(UniqueCarrier) AS FlightCount FROM airlines GROUP BY UniqueCarrier")
scala> spark.time(carrierFlightCount.show())
Display carriers' full names:
scala> val carriers = spark.sqlContext.read.format("com.databricks.spark.csv").option("header","true").option("inferSchema","true").load("hdfs:///repository/airlines/metadata/carriers.csv").cache
scala> carriers.registerTempTable("carriers")
scala> val carrierFlightCountFull = spark.sqlContext.sql("SELECT c.Description, a.UniqueCarrier, COUNT(a.UniqueCarrier) AS FlightCount FROM airlines AS a INNER JOIN carriers AS c ON c.Code = a.UniqueCarrier GROUP BY a.UniqueCarrier, c.Description ORDER BY a.UniqueCarrier")
scala> spark.time(carrierFlightCountFull.show)
What is the average departure delay time for each airline?
scala> val avgDepDelay = spark.sqlContext.sql("SELECT FIRST(c.Description), FIRST(a.UniqueCarrier), AVG(a.DepDelay) AS AvgDepDelay FROM airlines AS a INNER JOIN carriers AS c ON c.Code = a.UniqueCarrier GROUP BY a.UniqueCarrier ORDER BY a.UniqueCarrier")
scala> spark.time(avgDepDelay.show)