In [1]:
import sys
import os

# Make the cluster's Spark 2 Python bindings and the bundled py4j importable.
# These must be on sys.path before `import pyspark` below.
sys.path.insert(0, '/usr/hdp/2.6.0.3-8/spark2/python')
sys.path.insert(0, '/usr/hdp/2.6.0.3-8/spark2/python/lib/py4j-0.10.4-src.zip')

# Environment consumed by PySpark when it launches the JVM and the executors.
os.environ['SPARK_HOME'] = '/usr/hdp/2.6.0.3-8/spark2/'
os.environ['SPARK_CONF_DIR'] = '/etc/hadoop/synced_conf/spark2/'
os.environ['PYSPARK_PYTHON'] = '/software/anaconda3/4.2.0/bin/python'

import pyspark

# Resource request for the YARN application backing this notebook.
conf = pyspark.SparkConf()
conf.setMaster("yarn")
conf.set("spark.driver.memory","4g")
conf.set("spark.executor.memory","60g")
# The executor count is controlled by "spark.executor.instances" (the property
# behind spark-submit's --num-executors flag). "spark.num.executors" is not a
# Spark property, so setting it had no effect on the number of executors.
conf.set("spark.executor.instances","3")
conf.set("spark.executor.cores","12")

# Entry point used by every later cell; stopped by sc.stop() at the end.
sc = pyspark.SparkContext(conf=conf)
Spark SQL
DataFrame
In [3]:
# Wrap the SparkContext in a SQLContext; later cells use it both to read
# CSV data (sqlContext.read) and to run SQL statements (sqlContext.sql).
sqlContext = pyspark.SQLContext(sc)
# Bare expression so the notebook displays the object's repr as the cell output.
sqlContext
Out[3]:
In [4]:
# Load every file under the airlines data directory as one DataFrame.
# The first row of each file is treated as the header and column types are
# inferred from the data; the result is marked for caching so later queries
# can reuse it.
airlines = (
    sqlContext.read
    .format("com.databricks.spark.csv")
    .option("header", "true")
    .option("inferschema", "true")
    .load("/repository/airlines/data/")
    .cache()
)
In [5]:
%%time
# First action run on `airlines` after it was marked with .cache().
# NOTE(review): presumably this timing includes scanning the CSV files and
# populating the cache — confirm against the recorded output.
airlines.count()
Out[5]:
In [6]:
%%time
# Same count repeated so its wall time can be compared with the previous cell.
# NOTE(review): expected to be faster if the DataFrame is now served from the
# cache — confirm against the recorded output.
airlines.count()
Out[6]:
In [7]:
# Print the column names and types of `airlines`; the types come from the
# "inferschema" option used when the data was loaded.
airlines.printSchema()
You can interact with a DataFrame via SQLContext using SQL statements by registering the DataFrame as a table
In [8]:
# Expose the DataFrame to SQL under the name "airlines".
# createOrReplaceTempView is the Spark 2.x replacement for the deprecated
# registerTempTable and behaves the same way; re-running this cell simply
# replaces the existing view.
airlines.createOrReplaceTempView("airlines")
How many unique airlines are there?
In [9]:
# Distinct carrier codes present in the registered "airlines" table.
# Adjacent string literals are concatenated by Python, so the SQL text is
# a single line without needing a backslash inside the string.
uniqueAirline = sqlContext.sql(
    "SELECT DISTINCT UniqueCarrier "
    "FROM airlines"
)
uniqueAirline.show()
Calculate how many flights were completed by each carrier over time
In [10]:
%%time
carrierFlightCount = sqlContext.sql("SELECT UniqueCarrier, COUNT(UniqueCarrier) AS FlightCount \
FROM airlines GROUP BY UniqueCarrier")
carrierFlightCount.show()
How do you display full carrier names?
In [12]:
# Load the carrier lookup file (header row, inferred column types) and mark
# it for caching, since it is joined against `airlines` in later cells.
carriers = (
    sqlContext.read
    .format("com.databricks.spark.csv")
    .option("header", "true")
    .option("inferschema", "true")
    .load("/repository/airlines/metadata/carriers.csv")
    .cache()
)
# Expose the lookup table to SQL as "carriers". createOrReplaceTempView is the
# Spark 2.x replacement for the deprecated registerTempTable.
carriers.createOrReplaceTempView("carriers")
In [13]:
# Print the column names and inferred types of the carrier lookup table.
carriers.printSchema()
In [14]:
%%time
carrierFlightCountFullName = sqlContext.sql("SELECT c.Description, a.UniqueCarrier, COUNT(a.UniqueCarrier) AS FlightCount \
FROM airlines AS a \
INNER JOIN carriers AS c \
ON c.Code = a.UniqueCarrier \
GROUP BY a.UniqueCarrier, c.Description \
ORDER BY a.UniqueCarrier")
carrierFlightCountFullName.show()
What is the average departure delay for each airline?
In [15]:
%%time
avgDepartureDelay = sqlContext.sql("SELECT FIRST(c.Description), FIRST(a.UniqueCarrier), AVG(a.DepDelay) AS AvgDepDelay \
FROM airlines AS a \
INNER JOIN carriers AS c \
ON c.Code = a.UniqueCarrier \
GROUP BY a.UniqueCarrier \
ORDER BY a.UniqueCarrier")
avgDepartureDelay.show()
In [16]:
# Drop the cached copy of `airlines` that was requested with .cache() at load time.
# NOTE(review): `carriers` was also cached and is not unpersisted here —
# presumably left for sc.stop() to clean up; confirm that is intended.
airlines.unpersist()
Out[16]:
In [7]:
# Shut down the SparkContext created in the first cell; cells that use `sc`
# or `sqlContext` cannot be re-run after this without recreating the context.
sc.stop()
In [ ]: