Learning PySpark

Chapter 4: DataFrames

This notebook contains sample code from Chapter 4 of Learning PySpark focusing on PySpark and DataFrames.

Generate your own DataFrame

Instead of accessing the file system, let's create a DataFrame by generating the data ourselves. In this case, we'll first create the stringJSONRDD RDD and then convert it into a DataFrame by reading stringJSONRDD with spark.read.json.


In [3]:
# Generate our own JSON data 
#   This way we don't have to access the file system yet.
stringJSONRDD = sc.parallelize((""" 
  { "id": "123",
    "name": "Katie",
    "age": 19,
    "eyeColor": "brown"
  }""",
   """{
    "id": "234",
    "name": "Michael",
    "age": 22,
    "eyeColor": "green"
  }""", 
  """{
    "id": "345",
    "name": "Simone",
    "age": 23,
    "eyeColor": "blue"
  }""")
)

In [4]:
# Create DataFrame
swimmersJSON = spark.read.json(stringJSONRDD)

In [5]:
# Create temporary table
swimmersJSON.createOrReplaceTempView("swimmersJSON")

In [6]:
# DataFrame API
swimmersJSON.show()

In [7]:
# SQL Query
spark.sql("select * from swimmersJSON").collect()

In [8]:
%sql 
-- Query Data
select * from swimmersJSON

Inferring the Schema Using Reflection

Note that Apache Spark is inferring the schema using reflection; i.e. it automatically determines the schema of the data by reviewing the JSON data.


In [10]:
# Print the schema
swimmersJSON.printSchema()

Notice that Spark was able to infer the schema (shown when reviewing the schema using .printSchema).
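
As an aside, the inferred schema is also available programmatically through the DataFrame's schema and dtypes attributes; a minimal sketch:

# Inspect the inferred schema programmatically
swimmersJSON.dtypes    # list of (column name, type name) tuples
swimmersJSON.schema    # the full StructType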

But what if we want to programmatically specify the schema?

Programmatically Specifying the Schema

In this case, let's specify the schema for a CSV text file.


In [13]:
from pyspark.sql.types import *

# Generate our own CSV data 
#   This way we don't have to access the file system yet.
stringCSVRDD = sc.parallelize([(123, 'Katie', 19, 'brown'), (234, 'Michael', 22, 'green'), (345, 'Simone', 23, 'blue')])

# Specify the schema using StructType and the various pyspark.sql.types
schema = StructType([
    StructField("id", LongType(), True),
    StructField("name", StringType(), True),
    StructField("age", LongType(), True),
    StructField("eyeColor", StringType(), True)
])

# Apply the schema to the RDD and create the DataFrame
swimmers = spark.createDataFrame(stringCSVRDD, schema)

# Creates a temporary view using the DataFrame
swimmers.createOrReplaceTempView("swimmers")

In [14]:
# Print the schema
#   Notice that we have redefined id as Long (instead of String)
swimmers.printSchema()

In [15]:
%sql 
-- Query the data
select * from swimmers

As you can see from above, we can programmatically apply the schema instead of allowing the Spark engine to infer the schema via reflection.
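
As an aside, newer Spark releases also accept a DDL-formatted schema string in place of a StructType (the minimum supported version is an assumption here, so check your release); a minimal sketch reusing stringCSVRDD:

# Same schema expressed as a DDL-formatted string
#   (assumption: your Spark release supports DDL strings in createDataFrame)
swimmersDDL = spark.createDataFrame(
    stringCSVRDD,
    "id LONG, name STRING, age LONG, eyeColor STRING")
swimmersDDL.printSchema()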

SparkSession

Notice that we're no longer using sqlContext.read... but instead spark.read.... This is because, as part of Spark 2.0, the functionality of SQLContext and HiveContext was consolidated into the SparkSession (spark), which also exposes the underlying SparkContext via spark.sparkContext. The SparkSession now provides:

  • Entry point for reading data
  • Working with metadata
  • Configuration
  • Cluster resource management

For more information, please refer to How to use SparkSession in Apache Spark 2.0 (http://bit.ly/2br0Fr1).
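
For illustration, here is a minimal sketch of creating (or retrieving) the SparkSession yourself when the environment does not already provide one; the application name is just a placeholder:

from pyspark.sql import SparkSession

# Build (or reuse) a SparkSession -- the app name is an arbitrary placeholder
spark = SparkSession.builder \
    .appName("LearningPySpark-Ch4") \
    .getOrCreate()

# The underlying SparkContext remains available when needed
sc = spark.sparkContext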

Querying with SQL

With DataFrames, you can start writing your queries using Spark SQL - a SQL dialect that is compatible with the Hive Query Language (or HiveQL).


In [19]:
# Execute SQL Query and return the data
spark.sql("select * from swimmers").show()

Let's get the row count:


In [21]:
# Get count of rows in SQL
spark.sql("select count(1) from swimmers").show()

Note that you can make use of the %sql magic within the cells of a Databricks notebook.


In [23]:
%sql 
-- Query all data
select * from swimmers

In [24]:
# Query id and age for swimmers with age = 22 via DataFrame API
swimmers.select("id", "age").filter("age = 22").show()

In [25]:
# Query id and age for swimmers with age = 22 via DataFrame API in another way
swimmers.select(swimmers.id, swimmers.age).filter(swimmers.age == 22).show()

In [26]:
# Query id and age for swimmers with age = 22 in SQL
spark.sql("select id, age from swimmers where age = 22").show()

In [27]:
%sql 
-- Query id and age for swimmers with age = 22
select id, age from swimmers where age = 22

In [28]:
# Query name and eye color for swimmers with eye color starting with the letter 'b'
spark.sql("select name, eyeColor from swimmers where eyeColor like 'b%'").show()

In [29]:
%sql 
-- Query name and eye color for swimmers with eye color starting with the letter 'b'
select name, eyeColor from swimmers where eyeColor like 'b%'

Querying with the DataFrame API

With DataFrames, you can start writing your queries using the DataFrame API


In [31]:
# Show the values 
swimmers.show()

In [32]:
# Use the Databricks `display` command to view the data more easily
display(swimmers)

In [33]:
# Get count of rows
swimmers.count()

In [34]:
# Get the id, age where age = 22
swimmers.select("id", "age").filter("age = 22").show()

In [35]:
# Get the name, eyeColor where eyeColor like 'b%'
swimmers.select("name", "eyeColor").filter("eyeColor like 'b%'").show()

On-Time Flight Performance

Query flight departure delays by State and City by joining the departure delay data to the airport codes (which identify state and city).

DataFrame Queries

Let's analyze flight performance using DataFrames; first, let's build the DataFrames from the source datasets.


In [38]:
# Set File Paths
flightPerfFilePath = "/databricks-datasets/flights/departuredelays.csv"
airportsFilePath = "/databricks-datasets/flights/airport-codes-na.txt"

# Obtain Airports dataset
airports = spark.read.csv(airportsFilePath, header='true', inferSchema='true', sep='\t')
airports.createOrReplaceTempView("airports")

# Obtain Departure Delays dataset
flightPerf = spark.read.csv(flightPerfFilePath, header='true')
flightPerf.createOrReplaceTempView("FlightPerformance")

# Cache the Departure Delays dataset 
flightPerf.cache()
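
One thing to note: because inferSchema is not enabled for the departure delays file, its columns (including delay) are loaded as strings and are only cast implicitly during aggregation. If you prefer explicit numeric types, a small sketch (flightPerfTyped is just an illustrative name):

from pyspark.sql.functions import col

# An explicitly typed copy of the departure delays data
flightPerfTyped = flightPerf.withColumn("delay", col("delay").cast("int"))
flightPerfTyped.printSchema()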

In [39]:
# Query Sum of Flight Delays by City and Origin Code (for Washington State)
spark.sql("select a.City, f.origin, sum(f.delay) as Delays from FlightPerformance f join airports a on a.IATA = f.origin where a.State = 'WA' group by a.City, f.origin order by sum(f.delay) desc").show()

In [40]:
%sql
-- Query Sum of Flight Delays by City and Origin Code (for Washington State)
select a.City, f.origin, sum(f.delay) as Delays
  from FlightPerformance f
    join airports a
      on a.IATA = f.origin
 where a.State = 'WA'
 group by a.City, f.origin
 order by sum(f.delay) desc

In [41]:
# Query Sum of Flight Delays by State (for the US)
spark.sql("select a.State, sum(f.delay) as Delays from FlightPerformance f join airports a on a.IATA = f.origin where a.Country = 'USA' group by a.State ").show()

In [42]:
%sql
-- Query Sum of Flight Delays by State (for the US)
select a.State, sum(f.delay) as Delays
  from FlightPerformance f
    join airports a
      on a.IATA = f.origin
 where a.Country = 'USA'
 group by a.State
