This notebook contains sample code from Chapter 4 of Learning PySpark focusing on PySpark and DataFrames.
In [3]:
# Generate our own JSON data
# This way we don't have to access the file system yet.
stringJSONRDD = sc.parallelize(("""
  { "id": "123",
    "name": "Katie",
    "age": 19,
    "eyeColor": "brown"
  }""",
  """{
    "id": "234",
    "name": "Michael",
    "age": 22,
    "eyeColor": "green"
  }""",
  """{
    "id": "345",
    "name": "Simone",
    "age": 23,
    "eyeColor": "blue"
  }""")
)
In [4]:
# Create DataFrame
swimmersJSON = spark.read.json(stringJSONRDD)
In [5]:
# Create temporary table
swimmersJSON.createOrReplaceTempView("swimmersJSON")
In [6]:
# DataFrame API
swimmersJSON.show()
In [7]:
# SQL Query
spark.sql("select * from swimmersJSON").collect()
In [8]:
%sql
-- Query Data
select * from swimmersJSON
In [10]:
# Print the schema
swimmersJSON.printSchema()
Notice that Spark was able to infer the schema (as seen when reviewing it with .printSchema()).
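As a quick cross-check, you can also inspect the inferred types programmatically with dtypes; given the sample data above, the result should look roughly like the comment below (id is inferred as a string, age as a long):

# Inspect the inferred column types as (name, type) pairs
swimmersJSON.dtypes
# e.g. [('age', 'bigint'), ('eyeColor', 'string'), ('id', 'string'), ('name', 'string')]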
But what if we want to programmatically specify the schema?
In [13]:
from pyspark.sql.types import *
# Generate our own CSV data
# This way we don't have to access the file system yet.
stringCSVRDD = sc.parallelize([(123, 'Katie', 19, 'brown'), (234, 'Michael', 22, 'green'), (345, 'Simone', 23, 'blue')])
# The field names are encoded in a string; the schema itself is defined
# with StructType using the various pyspark.sql.types
schemaString = "id name age eyeColor"
schema = StructType([
    StructField("id", LongType(), True),
    StructField("name", StringType(), True),
    StructField("age", LongType(), True),
    StructField("eyeColor", StringType(), True)
])
# Apply the schema to the RDD and Create DataFrame
swimmers = spark.createDataFrame(stringCSVRDD, schema)
# Creates a temporary view using the DataFrame
swimmers.createOrReplaceTempView("swimmers")
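As an aside, schemaString above only lists the field names; it is not used directly to build the schema. If you wanted to derive the StructType from it, a minimal sketch could look like the following (fieldTypes and schemaFromString are illustrative names, not part of the original, and assume a parallel list of types in the same order):

from pyspark.sql.types import StructType, StructField, LongType, StringType
# Pair each field name from schemaString with its intended type
fieldTypes = [LongType(), StringType(), LongType(), StringType()]
schemaFromString = StructType([
    StructField(name, dtype, True)
    for name, dtype in zip(schemaString.split(), fieldTypes)
])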
In [14]:
# Print the schema
# Notice that we have redefined id as Long (instead of String)
swimmers.printSchema()
In [15]:
%sql
-- Query the data
select * from swimmers
As you can see from the above, we can programmatically apply the schema instead of allowing the Spark engine to infer it via reflection.
When reading files (for example, the CSV datasets later in this notebook), notice that we're no longer using sqlContext.read... but instead spark.read.... This is because, as part of Spark 2.0, the HiveContext, SQLContext, StreamingContext, and SparkContext have been unified under the Spark session, spark.
For more information, please refer to How to use SparkSession in Apache Spark 2.0 (http://bit.ly/2br0Fr1).
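If you are running outside Databricks or the PySpark shell (where spark and sc are already defined), a minimal sketch of creating the unified entry point yourself could look like the following; the application name is just an illustrative placeholder:

from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("learning-pyspark-ch4") \
    .getOrCreate()
sc = spark.sparkContext  # the underlying SparkContext remains accessible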
In [19]:
# Execute SQL Query and return the data
spark.sql("select * from swimmers").show()
Let's get the row count:
In [21]:
# Get count of rows in SQL
spark.sql("select count(1) from swimmers").show()
Note, you can make use of %sql within the notebook cells of a Databricks notebook.
In [23]:
%sql
-- Query all data
select * from swimmers
In [24]:
# Query id and age for swimmers with age = 22 via DataFrame API
swimmers.select("id", "age").filter("age = 22").show()
In [25]:
# Query id and age for swimmers with age = 22 via DataFrame API in another way
swimmers.select(swimmers.id, swimmers.age).filter(swimmers.age == 22).show()
In [26]:
# Query id and age for swimmers with age = 22 in SQL
spark.sql("select id, age from swimmers where age = 22").show()
In [27]:
%sql
-- Query id and age for swimmers with age = 22
select id, age from swimmers where age = 22
In [28]:
# Query name and eye color for swimmers with eye color starting with the letter 'b'
spark.sql("select name, eyeColor from swimmers where eyeColor like 'b%'").show()
In [29]:
%sql
-- Query name and eye color for swimmers with eye color starting with the letter 'b'
select name, eyeColor from swimmers where eyeColor like 'b%'
In [31]:
# Show the values
swimmers.show()
In [32]:
# Using Databricks `display` command to view the data easier
display(swimmers)
In [33]:
# Get count of rows
swimmers.count()
In [34]:
# Get the id, age where age = 22
swimmers.select("id", "age").filter("age = 22").show()
In [35]:
# Get the name, eyeColor where eyeColor like 'b%'
swimmers.select("name", "eyeColor").filter("eyeColor like 'b%'").show()
In [38]:
# Set File Paths
flightPerfFilePath = "/databricks-datasets/flights/departuredelays.csv"
airportsFilePath = "/databricks-datasets/flights/airport-codes-na.txt"
# Obtain Airports dataset
airports = spark.read.csv(airportsFilePath, header='true', inferSchema='true', sep='\t')
airports.createOrReplaceTempView("airports")
# Obtain Departure Delays dataset
flightPerf = spark.read.csv(flightPerfFilePath, header='true')
flightPerf.createOrReplaceTempView("FlightPerformance")
# Cache the Departure Delays dataset
flightPerf.cache()
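Note that cache() is lazy: it only marks the DataFrame for in-memory storage, and nothing is actually cached until an action runs against it. A small sketch:

# Running an action (such as count) materializes the cache, so that
# subsequent queries against FlightPerformance can read from memory
flightPerf.count()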
In [39]:
# Query Sum of Flight Delays by City and Origin Code (for Washington State)
spark.sql("select a.City, f.origin, sum(f.delay) as Delays from FlightPerformance f join airports a on a.IATA = f.origin where a.State = 'WA' group by a.City, f.origin order by sum(f.delay) desc").show()
In [40]:
%sql
-- Query Sum of Flight Delays by City and Origin Code (for Washington State)
select a.City, f.origin, sum(f.delay) as Delays
from FlightPerformance f
join airports a
on a.IATA = f.origin
where a.State = 'WA'
group by a.City, f.origin
order by sum(f.delay) desc
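The same aggregation can also be expressed with the DataFrame API. Here is a sketch; the explicit cast is needed because the flight data was read without inferSchema, so delay is a string column:

from pyspark.sql import functions as F

(flightPerf
    .join(airports, flightPerf.origin == airports.IATA)
    .filter(airports.State == 'WA')
    .groupBy(airports.City, flightPerf.origin)
    .agg(F.sum(flightPerf.delay.cast('int')).alias('Delays'))
    .orderBy(F.desc('Delays'))
    .show())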
In [41]:
# Query Sum of Flight Delays by State (for the US)
spark.sql("select a.State, sum(f.delay) as Delays from FlightPerformance f join airports a on a.IATA = f.origin where a.Country = 'USA' group by a.State ").show()
In [42]:
%sql
-- Query Sum of Flight Delays by State (for the US)
select a.State, sum(f.delay) as Delays
from FlightPerformance f
join airports a
on a.IATA = f.origin
where a.Country = 'USA'
group by a.State
For more information, please refer to: