Introduction to Spark In-Memory Computing via Python PySpark


In [1]:
import sys
import os

sys.path.insert(0, '/usr/hdp/2.6.0.3-8/spark2/python')
sys.path.insert(0, '/usr/hdp/2.6.0.3-8/spark2/python/lib/py4j-0.10.4-src.zip')

os.environ['SPARK_HOME'] = '/usr/hdp/2.6.0.3-8/spark2/'
os.environ['SPARK_CONF_DIR'] = '/etc/hadoop/synced_conf/spark2/'
os.environ['PYSPARK_PYTHON'] = '/software/anaconda3/4.2.0/bin/python'

import pyspark
conf = pyspark.SparkConf()
conf.setMaster("yarn")
conf.set("spark.driver.memory","4g")
conf.set("spark.executor.memory","60g")
conf.set("spark.num.executors","3")
conf.set("spark.executor.cores","12")

sc = pyspark.SparkContext(conf=conf)

Airlines Data

Spark SQL

  • Spark module for structured data processing
  • provides Spark with more information about the structure of both the data and the computation, enabling additional optimization
  • executes SQL queries written in either basic SQL syntax or HiveQL

DataFrame

  • distributed collection of data organized into named columns
  • conceptually equivalent to a table in a relational database or a data frame in R/Python, but with richer optimizations under the hood
  • can be constructed from a wide array of sources such as: structured data files, tables in Hive, external databases, or existing RDDs (a small sketch of the RDD case follows the SQLContext cell below)

In [3]:
sqlContext = pyspark.SQLContext(sc)
sqlContext


Out[3]:
<pyspark.sql.context.SQLContext at 0x2afc716835f8>
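
As a small illustration of the last DataFrame bullet above, a DataFrame can also be built from an existing RDD of Row objects. A minimal sketch; the toy carrier/delay rows below are made up and are not part of the airlines data:

from pyspark.sql import Row

toy_rdd = sc.parallelize([Row(carrier="AA", delay=7.9),
                          Row(carrier="DL", delay=7.6)])
toy_df = sqlContext.createDataFrame(toy_rdd)   # column names come from the Row fields
toy_df.show()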

In [4]:
airlines = sqlContext.read.format("com.databricks.spark.csv")\
    .option("header", "true")\
    .option("inferschema", "true")\
    .load("/repository/airlines/data/")\
    .cache()

In [5]:
%%time
airlines.count()


CPU times: user 14.4 ms, sys: 4.9 ms, total: 19.3 ms
Wall time: 1min 48s
Out[5]:
123534969

In [6]:
%%time
airlines.count()


CPU times: user 4.42 ms, sys: 2.14 ms, total: 6.56 ms
Wall time: 19.7 s
Out[6]:
123534969
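
The second count is much faster because the first action materialized the cached DataFrame in executor memory, so the CSV files are not re-read. cache() picks a default storage level; if finer control were needed, persist() accepts an explicit level. A hedged sketch, not run in this session (airlines_mem is a hypothetical name):

from pyspark import StorageLevel

# Hypothetical alternative to .cache(): request a storage level explicitly.
airlines_mem = sqlContext.read.format("com.databricks.spark.csv") \
    .option("header", "true") \
    .option("inferschema", "true") \
    .load("/repository/airlines/data/") \
    .persist(StorageLevel.MEMORY_ONLY)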

In [7]:
airlines.printSchema()


root
 |-- Year: integer (nullable = true)
 |-- Month: integer (nullable = true)
 |-- DayofMonth: integer (nullable = true)
 |-- DayOfWeek: integer (nullable = true)
 |-- DepTime: string (nullable = true)
 |-- CRSDepTime: integer (nullable = true)
 |-- ArrTime: string (nullable = true)
 |-- CRSArrTime: integer (nullable = true)
 |-- UniqueCarrier: string (nullable = true)
 |-- FlightNum: integer (nullable = true)
 |-- TailNum: string (nullable = true)
 |-- ActualElapsedTime: string (nullable = true)
 |-- CRSElapsedTime: string (nullable = true)
 |-- AirTime: string (nullable = true)
 |-- ArrDelay: string (nullable = true)
 |-- DepDelay: string (nullable = true)
 |-- Origin: string (nullable = true)
 |-- Dest: string (nullable = true)
 |-- Distance: string (nullable = true)
 |-- TaxiIn: string (nullable = true)
 |-- TaxiOut: string (nullable = true)
 |-- Cancelled: integer (nullable = true)
 |-- CancellationCode: string (nullable = true)
 |-- Diverted: integer (nullable = true)
 |-- CarrierDelay: string (nullable = true)
 |-- WeatherDelay: string (nullable = true)
 |-- NASDelay: string (nullable = true)
 |-- SecurityDelay: string (nullable = true)
 |-- LateAircraftDelay: string (nullable = true)

You can interact with a DataFrame via SQLContext using SQL statements by registering the DataFrame as a temporary table.


In [8]:
airlines.registerTempTable("airlines")
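
registerTempTable works here, but it is deprecated in Spark 2.x in favor of createOrReplaceTempView; the equivalent call would be:

# Non-deprecated equivalent in Spark 2.x
airlines.createOrReplaceTempView("airlines")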

How many unique airlines are there?


In [9]:
uniqueAirline = sqlContext.sql("SELECT DISTINCT UniqueCarrier \
                                FROM airlines")
uniqueAirline.show()


+-------------+
|UniqueCarrier|
+-------------+
|           UA|
|           EA|
|           PI|
|           PS|
|           AA|
|           NW|
|           EV|
|           B6|
|           HP|
|           TW|
|           DL|
|           OO|
|           F9|
|           YV|
|           TZ|
|           US|
|           AQ|
|           MQ|
|           OH|
|           HA|
+-------------+
only showing top 20 rows
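
The same result can be produced without writing SQL, through the DataFrame API (a sketch equivalent to the query above):

# DataFrame API equivalent of SELECT DISTINCT UniqueCarrier
airlines.select("UniqueCarrier").distinct().show()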

Calculate how many flights were completed by each carrier over the full period


In [10]:
%%time
carrierFlightCount = sqlContext.sql("SELECT UniqueCarrier, COUNT(UniqueCarrier) AS FlightCount \
                                    FROM airlines GROUP BY UniqueCarrier")
carrierFlightCount.show()


+-------------+-----------+
|UniqueCarrier|FlightCount|
+-------------+-----------+
|           EA|     919785|
|           UA|   13299817|
|           PI|     873957|
|           PS|      83617|
|           AA|   14984647|
|           NW|   10292627|
|           EV|    1697172|
|           B6|     811341|
|           HP|    3636682|
|           TW|    3757747|
|           DL|   16547870|
|           OO|    3090853|
|           F9|     336958|
|           YV|     854056|
|           TZ|     208420|
|           US|   14075530|
|           AQ|     154381|
|           MQ|    3954895|
|           OH|    1464176|
|           HA|     274265|
+-------------+-----------+
only showing top 20 rows

CPU times: user 11.5 ms, sys: 881 µs, total: 12.4 ms
Wall time: 43.8 s
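
For comparison, a DataFrame API sketch of the same aggregation:

# DataFrame API equivalent of the GROUP BY query
airlines.groupBy("UniqueCarrier").count() \
    .withColumnRenamed("count", "FlightCount") \
    .show()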

How do you display full carrier names?


In [12]:
carriers = sqlContext.read.format("com.databricks.spark.csv")\
    .option("header", "true")\
    .option("inferschema", "true")\
    .load("/repository/airlines/metadata/carriers.csv")\
    .cache()
carriers.registerTempTable("carriers")

In [13]:
carriers.printSchema()


root
 |-- Code: string (nullable = true)
 |-- Description: string (nullable = true)


In [14]:
%%time
carrierFlightCountFullName = sqlContext.sql("SELECT c.Description, a.UniqueCarrier, COUNT(a.UniqueCarrier) AS FlightCount \
                                    FROM airlines AS a \
                                    INNER JOIN carriers AS c \
                                    ON c.Code = a.UniqueCarrier \
                                    GROUP BY a.UniqueCarrier, c.Description \
                                    ORDER BY a.UniqueCarrier")
carrierFlightCountFullName.show()


+--------------------+-------------+-----------+
|         Description|UniqueCarrier|FlightCount|
+--------------------+-------------+-----------+
|Pinnacle Airlines...|           9E|     521059|
|American Airlines...|           AA|   14984647|
| Aloha Airlines Inc.|           AQ|     154381|
|Alaska Airlines Inc.|           AS|    2878021|
|     JetBlue Airways|           B6|     811341|
|Continental Air L...|           CO|    8145788|
|    Independence Air|           DH|     693047|
|Delta Air Lines Inc.|           DL|   16547870|
|Eastern Air Lines...|           EA|     919785|
|Atlantic Southeas...|           EV|    1697172|
|Frontier Airlines...|           F9|     336958|
|AirTran Airways C...|           FL|    1265138|
|Hawaiian Airlines...|           HA|     274265|
|America West Airl...|           HP|    3636682|
|Midway Airlines I...|       ML (1)|      70622|
|American Eagle Ai...|           MQ|    3954895|
|Northwest Airline...|           NW|   10292627|
|         Comair Inc.|           OH|    1464176|
|Skywest Airlines ...|           OO|    3090853|
|Pan American Worl...|       PA (1)|     316167|
+--------------------+-------------+-----------+
only showing top 20 rows

CPU times: user 13.2 ms, sys: 445 µs, total: 13.7 ms
Wall time: 1min
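
The join can also be expressed with the DataFrame API; a sketch equivalent to the SQL above:

from pyspark.sql import functions as F

# DataFrame API equivalent of the INNER JOIN + GROUP BY query
airlines.join(carriers, airlines.UniqueCarrier == carriers.Code) \
    .groupBy("UniqueCarrier", "Description") \
    .agg(F.count("UniqueCarrier").alias("FlightCount")) \
    .orderBy("UniqueCarrier") \
    .show()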

What is the average departure delay for each airline?


In [15]:
%%time
avgDepartureDelay = sqlContext.sql("SELECT FIRST(c.Description), FIRST(a.UniqueCarrier), AVG(a.DepDelay) AS AvgDepDelay \
                                    FROM airlines AS a \
                                    INNER JOIN carriers AS c \
                                    ON c.Code = a.UniqueCarrier \
                                    GROUP BY a.UniqueCarrier \
                                    ORDER BY a.UniqueCarrier")
avgDepartureDelay.show()


+-------------------------+---------------------------+-------------------+
|first(Description, false)|first(UniqueCarrier, false)|        AvgDepDelay|
+-------------------------+---------------------------+-------------------+
|     Pinnacle Airlines...|                         9E| 7.9279144892173035|
|     American Airlines...|                         AA|  7.862321254420546|
|      Aloha Airlines Inc.|                         AQ| 1.5993176899118409|
|     Alaska Airlines Inc.|                         AS|  8.297235193754096|
|          JetBlue Airways|                         B6| 11.262714178314551|
|     Continental Air L...|                         CO|  7.695967155526857|
|         Independence Air|                         DH|  9.612639389688926|
|     Delta Air Lines Inc.|                         DL|  7.593716274369933|
|     Eastern Air Lines...|                         EA|  8.674050565435543|
|     Atlantic Southeas...|                         EV| 13.483736343326541|
|     Frontier Airlines...|                         F9|  6.096932123645889|
|     AirTran Airways C...|                         FL|  10.27801937883596|
|     Hawaiian Airlines...|                         HA|-0.5165400834606493|
|     America West Airl...|                         HP|  8.107790266585615|
|     Midway Airlines I...|                     ML (1)|  6.229676674364896|
|     American Eagle Ai...|                         MQ|   9.22369994420141|
|     Northwest Airline...|                         NW|  6.007973703240084|
|              Comair Inc.|                         OH|  9.310795113723774|
|     Skywest Airlines ...|                         OO|  7.193778047766392|
|     Pan American Worl...|                     PA (1)|  5.532442442890681|
+-------------------------+---------------------------+-------------------+
only showing top 20 rows

CPU times: user 11.2 ms, sys: 1.32 ms, total: 12.5 ms
Wall time: 47.7 s
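
Note that DepDelay was inferred as a string column (the raw files use "NA" for missing values, which defeats schema inference), so AVG relies on Spark's implicit cast to double; values that fail the cast become NULL and are ignored. A sketch with an explicit cast via the DataFrame API:

from pyspark.sql import functions as F

# Cast DepDelay explicitly before averaging; rows that fail the cast become NULL
airlines.withColumn("DepDelayNum", F.col("DepDelay").cast("double")) \
    .groupBy("UniqueCarrier") \
    .agg(F.avg("DepDelayNum").alias("AvgDepDelay")) \
    .orderBy("UniqueCarrier") \
    .show()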

In [16]:
airlines.unpersist()


Out[16]:
DataFrame[Year: int, Month: int, DayofMonth: int, DayOfWeek: int, DepTime: string, CRSDepTime: int, ArrTime: string, CRSArrTime: int, UniqueCarrier: string, FlightNum: int, TailNum: string, ActualElapsedTime: string, CRSElapsedTime: string, AirTime: string, ArrDelay: string, DepDelay: string, Origin: string, Dest: string, Distance: string, TaxiIn: string, TaxiOut: string, Cancelled: int, CancellationCode: string, Diverted: int, CarrierDelay: string, WeatherDelay: string, NASDelay: string, SecurityDelay: string, LateAircraftDelay: string]

In [7]:
sc.stop()
