Spark DataFrames

Imports

For these examples, we just need to import two pyspark.sql libraries:

  • types
  • functions

We need pyspark.sql.types to define schemas for the DataFrames. The pyspark.sql.functions library contains all of the functions specific to SQL and DataFrames in PySpark.


In [3]:
from pyspark.sql.types import *  # Necessary for creating schemas
from pyspark.sql.functions import * # Importing PySpark functions (this shadows Python builtins such as `sum`, `min`, and `max`)

Creating DataFrames

Making a Simple DataFrame from a Tuple List


In [6]:
# Make a tuple list
a_list = [('a', 1), ('b', 2), ('c', 3)]

# Create a Spark DataFrame, without supplying a schema value
df_from_list_no_schema = \
sqlContext.createDataFrame(a_list)

# Print the DF object
print(df_from_list_no_schema)

# Print a collected list of Row objects
print(df_from_list_no_schema.collect())

# Show the DataFrame
df_from_list_no_schema.show()

Making a Simple DataFrame from a Tuple List and a Schema


In [8]:
# Create a Spark DataFrame, this time with schema
df_from_list_with_schema = \
sqlContext.createDataFrame(a_list, ['letters', 'numbers']) # this simple schema contains just column names

# Show the DataFrame
df_from_list_with_schema.show()

# Show the DataFrame's schema
df_from_list_with_schema.printSchema()

Making a Simple DataFrame from a Dictionary


In [10]:
# Make a list of dictionaries
a_dict = [{'letters': 'a', 'numbers': 1},
          {'letters': 'b', 'numbers': 2},
          {'letters': 'c', 'numbers': 3}]

# Create a Spark DataFrame from the list of dictionaries
df_from_dict = \
(sqlContext
 .createDataFrame(a_dict)) # Expect a warning: inferring a schema from dicts is deprecated in favor of Row objects

# Show the DataFrame
df_from_dict.show()

Making a Simple DataFrame Using a StructType Schema + RDD


In [12]:
# Define the schema
schema = StructType([
    StructField('letters', StringType(), True),
    StructField('numbers', IntegerType(), True)])

# Create an RDD from a list
rdd = sc.parallelize(a_list)

# Create the DataFrame from these raw components
nice_df = \
(sqlContext
 .createDataFrame(rdd, schema))

# Show the DataFrame
nice_df.show()

Simple Inspection Functions

Now that we have nice_df, here are some useful functions for inspecting the DataFrame.


In [15]:
# `columns`: return all column names as a list
nice_df.columns

In [16]:
# `dtypes`: get the datatypes for all columns
nice_df.dtypes

In [17]:
# `printSchema()`: prints the schema of the supplied DF
nice_df.printSchema()

In [18]:
# `schema`: returns the schema of the provided DF as `StructType` schema
nice_df.schema

In [19]:
# `first()` returns the first row as a single Row, while
# `head()` and `take()` return up to `n` Row objects
print(nice_df.first()) # takes no argument; never a list
print(nice_df.head(2)) # `n` is optional; without it, a single Row;
                       # with `n` supplied, a list
print(nice_df.take(2)) # expects a value; always a list

In [20]:
# `count()`: returns a count of all rows in DF
nice_df.count()

In [21]:
# `describe()`: print out stats for numerical columns
nice_df.describe().show() # can optionally supply a list of column names

In [22]:
# the `explain()` function explains the under-the-hood evaluation process
nice_df.explain()

Relatively Simple DataFrame Manipulation Functions

Let's use these functions:

  • unionAll(): combine two DataFrames together
  • orderBy(): perform sorting of DataFrame columns
  • select(): select which DataFrame columns to retain
  • drop(): select a single DataFrame column to remove
  • filter(): retain DataFrame rows that match a condition

In [24]:
# Take the DataFrame and add it to itself
(nice_df
 .unionAll(nice_df)
 .show())

# Add it to itself twice
(nice_df
 .unionAll(nice_df)
 .unionAll(nice_df)
 .show())

# `unionAll()` matches columns by position, not by name,
# so coercion will occur if the schemas don't align
(nice_df
 .select(['numbers', 'letters'])
 .unionAll(nice_df)
 .show())

(nice_df
 .select(['numbers', 'letters'])
 .unionAll(nice_df)
 .printSchema())
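
To see why the reordered union above ends up with mixed values in one column, here is a minimal plain-Python sketch of stacking rows by position. It is only an analogy and does not use Spark; `union_by_position` and the tiny tables are hypothetical stand-ins for the DataFrames.

```python
# Two tiny "tables" (hypothetical stand-ins for DataFrames)
left_cols = ['numbers', 'letters']
left_rows = [(1, 'a'), (2, 'b')]
right_cols = ['letters', 'numbers']
right_rows = [('a', 1), ('b', 2)]

def union_by_position(cols, rows, other_rows):
    """Stack rows under the first table's column names, ignoring the other table's names."""
    return cols, rows + other_rows

cols, rows = union_by_position(left_cols, left_rows, right_rows)

# The column labelled `numbers` now holds both integers and strings
numbers_column = [row[0] for row in rows]
```

A single column cannot hold both types, which is presumably why the schema printed above shows a coerced type.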

In [25]:
# Sorting the DataFrame by the `numbers` column
(nice_df
 .unionAll(nice_df)
 .unionAll(nice_df)
 .orderBy('numbers')
 .show())

# Sort the same column in reverse order
(nice_df
 .unionAll(nice_df)
 .unionAll(nice_df)
 .orderBy('numbers',
          ascending = False)
 .show())

In [26]:
# `select()` takes one or more column names (or a list of them),
# while `drop()` takes a single column name

# Select only the first column of the DF
(nice_df
 .select('letters')
 .show())

# Re-order columns in the DF using `select()`
(nice_df
 .select(['numbers', 'letters'])
 .show())

# Drop the `letters` column (the first column) of the DF
(nice_df
 .drop('letters')
 .show())

In [27]:
# The `filter()` function performs filtering of DF rows

# Here is some numeric filtering with comparison operators
# (>, <, >=, <=, ==, != all work)

# Keep rows where the value in `numbers` is > 1
(nice_df
 .filter(nice_df.numbers > 1)
 .show())

# Perform two filter operations
(nice_df
 .filter(nice_df.numbers > 1)
 .filter(nice_df.numbers < 3)
 .show())

# Not just numbers! Use the `filter()` + `isin()`
# combo to filter on string columns with a set of values
(nice_df
 .filter(nice_df.letters
         .isin(['a', 'b']))
 .show())

The 'groupBy' Function and Aggregations

The groupBy() function groups the DataFrame by the specified columns so that we can then run aggregations on each group. The available aggregate functions are:

  • count(): counts the number of records for each group
  • sum(): computes the sum of each numeric column for each group
  • min(): computes the minimum value of each numeric column for each group
  • max(): computes the maximum value of each numeric column for each group
  • avg() or mean(): computes the average value of each numeric column for each group
  • pivot(): pivots a column of the current DataFrame and performs the specified aggregation

Before we get into aggregations, let's load in a CSV with interesting data and create a new DataFrame.

You do this with the spark-csv package. Documentation on that is available at:

The dataset that will be loaded in to demonstrate contains data about flights departing New York City airports (JFK, LGA, EWR) in 2013. It has 336,776 rows and 16 columns.


In [29]:
# Create a schema object...
nycflights_schema = StructType([
  StructField('year', IntegerType(), True),
  StructField('month', IntegerType(), True),
  StructField('day', IntegerType(), True),
  StructField('dep_time', StringType(), True),
  StructField('dep_delay', IntegerType(), True),
  StructField('arr_time', StringType(), True),
  StructField('arr_delay', IntegerType(), True),
  StructField('carrier', StringType(), True),
  StructField('tailnum', StringType(), True),
  StructField('flight', StringType(), True),  
  StructField('origin', StringType(), True),
  StructField('dest', StringType(), True),
  StructField('air_time', IntegerType(), True),
  StructField('distance', IntegerType(), True),
  StructField('hour', IntegerType(), True),
  StructField('minute', IntegerType(), True)
  ])

# ...and then read the CSV with the schema
nycflights = \
(sqlContext
 .read
 .format('com.databricks.spark.csv')
 .schema(nycflights_schema)
 .options(header = True)
 .load('/mnt/data-files/nycflights13.csv'))

In [30]:
# Have a look at the schema for the imported dataset
nycflights.printSchema()

In [31]:
# Have a look at the `nycflights` DataFrame with the `display()` function (available for Databricks Cloud notebooks)
display(nycflights)

In [32]:
# Let's group and aggregate

# `groupBy()` will group one or more DF columns
# and prep them for aggregation functions
(nycflights
 .groupby('month') # creates 'GroupedData'
 .count() # creates a new column with aggregate `count` values
 .show())

# Use the `agg()` function to perform multiple
# aggregations
(nycflights
 .groupby('month')
 .agg({'dep_delay': 'avg', 'arr_delay': 'avg'}) # note the new column names
 .show())

# Caveat: with a dict, you can't perform multiple aggregations
# on the same column (a Python dict keeps only the last value
# for a repeated key, so only `max` is computed here)
(nycflights
 .groupby('month')
 .agg({'dep_delay': 'min', 'dep_delay': 'max'})
 .show())
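
The caveat in the last example comes from Python itself rather than from Spark. A small sketch in plain Python shows what the dictionary looks like by the time `agg()` would receive it; `agg_spec` and `remaining` are names made up for this illustration.

```python
# A dict literal with a repeated key keeps only the last value,
# so the 'min' request is already gone before Spark is involved
agg_spec = {'dep_delay': 'min', 'dep_delay': 'max'}

remaining = list(agg_spec.items())
```

If both aggregates are needed, one option I would reach for is passing column expressions instead of a dict, e.g. `agg(min('dep_delay'), max('dep_delay'))` with the functions imported from pyspark.sql.functions.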

In [33]:
# Use `groupBy()` with a few columns, then aggregate
display(
  nycflights
  .groupby(['month', 'origin', 'dest']) # group by these unique combinations
  .count()                              # perform a 'count' aggregation on the groups
  .orderBy(['month', 'count'],
           ascending = [1, 0])          # order by `month` ascending, `count` descending
)

In [34]:
# Use `groupBy()` + `pivot()` + an aggregation function to
# make a pivot table!

# Get a table of flights by month for each carrier
display(
  nycflights
  .groupBy('month') # group the data for aggregation by `month` number
  .pivot('carrier') # provide columns of data by `carrier` abbreviation
  .count()          # create aggregations as a count of rows
)
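
For intuition about what the pivot table above contains, here is a rough plain-Python sketch of the same idea on a handful of made-up rows. It is an analogy, not Spark's implementation; the `flights` pairs are hypothetical, and leaving absent combinations as None is my assumption about how empty cells appear.

```python
from collections import defaultdict

# Hypothetical (month, carrier) pairs standing in for rows of `nycflights`
flights = [(1, 'AA'), (1, 'AA'), (1, 'UA'), (2, 'UA'), (2, 'UA'), (2, 'DL')]

# One output row per month, one output column per distinct carrier
carriers = sorted({carrier for _, carrier in flights})
counts = defaultdict(lambda: dict.fromkeys(carriers))  # None marks "no flights"

for month, carrier in flights:
    counts[month][carrier] = (counts[month][carrier] or 0) + 1

# Order the rows by month for readability
pivot_table = {month: counts[month] for month in sorted(counts)}
```

Each key of `pivot_table` plays the role of a `month` row, and each inner key plays the role of a carrier column.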

In [35]:
# Another pivot table idea: get the average departure
# delay for each carrier at the different NYC airports
display(
  nycflights
  .groupBy('carrier')
  .pivot('origin')
  .avg('dep_delay')
)

Column Operations

Column instances can be created by:

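(1) Selecting a column from a DataFrame

  • df.colName
  • df["colName"]

(2) Creating one from an expression

  • df.colName + 1
  • 1 / df.colName

Either kind of Column can be passed to DataFrame methods such as df.select(df.colName) or df.withColumn("newName", df.colName + 1); those methods return a new DataFrame rather than a Column.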

Once you have a Column instance, you can apply a wide range of functions. Some of the functions covered here are:

  • format_number(): apply formatting to a number, rounded to d decimal places, and return the result as a string
  • when() & otherwise(): when() evaluates a list of conditions and returns one of multiple possible result expressions; if otherwise() is not invoked, None is returned for unmatched conditions
  • concat_ws(): concatenates multiple input string columns together into a single string column, using the given separator
  • to_utc_timestamp(): assumes the given timestamp is in given timezone and converts to UTC
  • year(): extracts the year of a given date as integer
  • month(): extracts the month of a given date as integer
  • dayofmonth(): extracts the day of the month of a given date as integer
  • hour(): extracts the hour of a given date as integer
  • minute(): extracts the minute of a given date as integer

In [37]:
# Perform 2 different aggregations, rename those new columns,
# then format the aggregate values to 1 decimal place
# (note that `format_number()` returns strings)
display(
  nycflights
  .groupby('month')
  .agg({'dep_delay': 'avg', 'arr_delay': 'avg'})
  .withColumnRenamed('avg(arr_delay)', 'mean_arr_delay')
  .withColumnRenamed('avg(dep_delay)', 'mean_dep_delay')
  .withColumn('mean_arr_delay', format_number('mean_arr_delay', 1))
  .withColumn('mean_dep_delay', format_number('mean_dep_delay', 1))
)
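
Since `format_number()` returns a string, it is worth seeing what that implies. The sketch below uses Python's own number formatting as a stand-in (it is not the Spark function), and the value of `mean_dep_delay` is made up.

```python
mean_dep_delay = 12345.6789  # hypothetical aggregate value

# Fixed number of decimals plus thousands separators, returned as text
formatted = format(mean_dep_delay, ',.1f')

# Text sorts character by character, not numerically
as_text_sorted = sorted(['9.5', '10.2'])
```

Here `formatted` is the text '12,345.7', and the sorted list puts '10.2' before '9.5'. If the values need to stay numeric for later sorting or arithmetic, `round()` from pyspark.sql.functions may be the better fit.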

In [38]:
# Add a new column (`far_or_near`) with a string based on a comparison
# on a numeric column; uses: `withColumn()`, `when()`, and `otherwise()`
display(
  nycflights
  .withColumn('far_or_near',
              when(nycflights.distance > 1000, 'far') # the `if-then` statement
              .otherwise('near'))                     # the `else` statement
)

In [39]:
# Perform a few numerical computations across columns
display(
  nycflights
  .withColumn('dist_per_minute',
              nycflights.distance / nycflights.air_time) # create new column with division of values
  .withColumn('dist_per_minute',
              format_number('dist_per_minute', 2))       # format that new column's value to 2 decimal places (as a string)
  .drop('distance') # drop the `distance` column
  .drop('air_time') # drop the `air_time` column
)

In [40]:
# Create a proper timestamp for once in your life...
# We have all the components: `year`, `month`, `day`,
# `hour`, and `minute`

# Use `concat_ws()` (concatenate with separator) to
# combine column data into StringType columns such
# that dates (`-` separator, YYYY-MM-DD) and times
# (`:` separator, 24-hour time) are formed
nycflights = \
(nycflights
 .withColumn('date',
             concat_ws('-',
                       nycflights.year,
                       nycflights.month,
                       nycflights.day))
 .withColumn('time',
             concat_ws(':',
                       nycflights.hour,
                       nycflights.minute)))

# In a second step, concatenate with `concat_ws()`
# the `date` and `time` strings (separator is a space);
# then drop several columns
nycflights = \
(nycflights
 .withColumn('timestamp',
             concat_ws(' ',
                       nycflights.date,
                       nycflights.time))
 .drop('year')     # `drop()` doesn't accept
 .drop('month')    # a list of column names,
 .drop('day')      # therefore, for every column
 .drop('hour')     # we would like to remove
 .drop('minute')   # from the DataFrame, we 
 .drop('date')     # must create a new `drop()`
 .drop('time'))    # statement

# In the final step, convert the `timestamp` from
# a StringType into a TimestampType
nycflights = \
(nycflights
 .withColumn('timestamp',
             to_utc_timestamp(nycflights.timestamp, 'GMT')))
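
To make the three steps above concrete, here is a plain-Python sketch of the same string assembly for a single row. It uses `str.join()` and `datetime.strptime()` as stand-ins for `concat_ws()` and the timestamp conversion; the component values are hypothetical.

```python
from datetime import datetime

# Hypothetical component values for one flight
year, month, day, hour, minute = 2013, 1, 1, 5, 17

date_str = '-'.join(str(part) for part in (year, month, day))  # like concat_ws('-', ...)
time_str = ':'.join(str(part) for part in (hour, minute))      # like concat_ws(':', ...)
timestamp_str = ' '.join([date_str, time_str])                 # like concat_ws(' ', ...)

# Parse the text into a real timestamp value
parsed = datetime.strptime(timestamp_str, '%Y-%m-%d %H:%M')
```

Note that the intermediate text is not zero-padded ('2013-1-1 5:17'), which is fine as long as the parser accepts it.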

In [41]:
# Display the `nycflights` DataFrame with the new `timestamp` column
display(nycflights)

In [42]:
# It probably doesn't matter in the end, but,
# I'd prefer that the `timestamp` column be
# the first column; let's make use of the
# `columns` attribute and get slicing!
nycflights = \
 (nycflights
  .select(nycflights.columns[-1:] + nycflights.columns[0:-1])) # recall that `columns` returns a list of column names
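
The slicing trick above is ordinary Python list slicing, so it can be tried without Spark. A minimal sketch with a hypothetical, shortened column list:

```python
columns = ['carrier', 'flight', 'origin', 'timestamp']  # hypothetical column list

# Last element first, followed by everything except the last element
reordered = columns[-1:] + columns[0:-1]
```

The result is ['timestamp', 'carrier', 'flight', 'origin'], which is the list that `select()` would receive.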

In [43]:
# Now the `timestamp` column is the first column: 
display(nycflights)

In [44]:
# Inspect the DataFrame's schema, note that `timestamp` is indeed classed as a timestamp
nycflights.printSchema()

In [45]:
# If you miss the time component columns,
# you can get them back! Use the `year()`,
# `month()`, `dayofmonth()`, `hour()`, and
# `minute()` functions with `withColumn()`
display(
  nycflights
  .withColumn('year', year(nycflights.timestamp))
  .withColumn('month', month(nycflights.timestamp))
  .withColumn('day', dayofmonth(nycflights.timestamp))
  .withColumn('hour', hour(nycflights.timestamp))
  .withColumn('minute', minute(nycflights.timestamp))
)

There are more time-based functions:

  • date_sub(): subtract an integer number of days from a Date or Timestamp
  • date_add(): add an integer number of days to a Date or Timestamp
  • datediff(): get the difference between two dates
  • add_months(): add an integer number of months
  • months_between(): get the number of months between two dates
  • next_day(): returns the first date that is later than the given date and falls on the specified day of the week
  • last_day(): returns the last day of the month which the given date belongs to
  • dayofmonth(): extract the day of the month of a given date as integer
  • dayofyear(): extract the day of the year of a given date as integer
  • weekofyear(): extract the week number of a given date as integer
  • quarter(): extract the quarter of a given date

In [47]:
# Let's transform the timestamp in the first
# record of `nycflights` with each of these
# functions
display(
  nycflights
   .limit(1)
   .select('timestamp')
   .withColumn('date_sub', date_sub(nycflights.timestamp, 7))
   .withColumn('date_add', date_add(nycflights.timestamp, 7))
   .withColumn('datediff', datediff(nycflights.timestamp, nycflights.timestamp))
   .withColumn('add_months', add_months(nycflights.timestamp, 1))
   .withColumn('months_between', months_between(nycflights.timestamp, nycflights.timestamp))
   .withColumn('next_day', next_day(nycflights.timestamp, 'Mon'))
   .withColumn('last_day', last_day(nycflights.timestamp))
   .withColumn('dayofmonth', dayofmonth(nycflights.timestamp))
   .withColumn('dayofyear', dayofyear(nycflights.timestamp))
   .withColumn('weekofyear', weekofyear(nycflights.timestamp))
   .withColumn('quarter', quarter(nycflights.timestamp))
   )

Joins

Joins are easily performed with Spark DataFrames. The expression is:

join(other, on = None, how = None)

where:

  • other: a DataFrame that serves as the right side of the join
  • on: a column name, a list of column names, or a join expression (or a list of join expressions)
  • how: the default is inner; outer, left_outer, right_outer, and leftsemi joins are also available
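
As a rough illustration of what a left outer join does, here is a plain-Python sketch on a few made-up rows. It is not how Spark executes joins; the `flights` and `weather` tuples are hypothetical, and the dict lookup assumes each key appears at most once on the right side.

```python
# Hypothetical rows: (month, day, hour, carrier) and (month, day, hour, wind_speed)
flights = [(1, 1, 5, 'UA'), (1, 1, 6, 'AA'), (1, 2, 9, 'DL')]
weather = [(1, 1, 5, 10.4), (1, 1, 6, 12.7)]

# Index the right side by the join key
weather_by_key = {(m, d, h): wind for m, d, h, wind in weather}

# Left outer join: every left row survives; unmatched ones get None
joined = [(m, d, h, carrier, weather_by_key.get((m, d, h)))
          for m, d, h, carrier in flights]
```

The third flight has no weather record, so its last field is None, much like the NULLs a left_outer join produces for unmatched rows.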

Let's load in some more data so that we can have two DataFrames to join. The CSV file weather.csv contains hourly meteorological data from EWR during 2013.


In [51]:
# Create a schema object...
weather_schema = StructType([  
  StructField('year', IntegerType(), True),
  StructField('month', IntegerType(), True),
  StructField('day', IntegerType(), True),
  StructField('hour', IntegerType(), True),
  StructField('temp', FloatType(), True),
  StructField('dewp', FloatType(), True),
  StructField('humid', FloatType(), True),
  StructField('wind_dir', IntegerType(), True),
  StructField('wind_speed', FloatType(), True),
  StructField('wind_gust', FloatType(), True),
  StructField('precip', FloatType(), True),
  StructField('pressure', FloatType(), True),
  StructField('visib', FloatType(), True)
  ])

#...and then read the CSV with the schema
weather = \
(sqlContext
 .read
 .format('com.databricks.spark.csv')
 .schema(weather_schema)
 .options(header = True)
 .load('/mnt/data-files/weather.csv'))

In [52]:
# Have a look at the imported dataset
display(weather)

In [53]:
# We need those `month`, `day`, and `hour` values back
nycflights = \
(nycflights
 .withColumn('month', month(nycflights.timestamp))
 .withColumn('day', dayofmonth(nycflights.timestamp))
 .withColumn('hour', hour(nycflights.timestamp)))

# Join the `nycflights` DF with the `weather` DF 
nycflights_all_columns = \
(nycflights
 .join(weather,
       [nycflights.month == weather.month, # three join conditions: month,
        nycflights.day == weather.day,     #                        day,
        nycflights.hour == weather.hour],  #                        hour
       'left_outer')) # left outer join: keep all rows from the left DF (flights), with the matching rows in the right DF (weather)
                      # NULLs created if there is no match to the right DF

In [54]:
# Notice that lots of columns were created, as well as duplicate column names (not a bug! a feature?)
display(nycflights_all_columns)

In [55]:
# One way to reduce the number of extraneous
# columns is to use a `select()` statement
nycflights_wind_visib = \
(nycflights_all_columns
 .select(['timestamp', 'carrier', 'flight',
          'origin', 'dest', 'wind_dir',
          'wind_speed', 'wind_gust', 'visib']))

In [56]:
# Examine the DataFrame, now with fewer columns
display(nycflights_wind_visib)

Let's load in even more data so we can determine if any takeoffs occurred in very windy weather.

The CSV beaufort_land.csv contains Beaufort scale values (the force column), wind speed ranges in mph, and the name for each wind force.


In [58]:
# Create a schema object... 
beaufort_land_schema = StructType([  
  StructField('force', IntegerType(), True),
  StructField('speed_mi_h_lb', IntegerType(), True),
  StructField('speed_mi_h_ub', IntegerType(), True),
  StructField('name', StringType(), True)
  ])

# ...and then read the CSV with the schema
beaufort_land = \
(sqlContext
 .read
 .format('com.databricks.spark.csv')
 .schema(beaufort_land_schema)
 .options(header = True)
 .load('/mnt/data-files/beaufort_land.csv'))

In [59]:
# Have a look at the imported dataset
display(beaufort_land)

In [60]:
# Join the current working DF with the `beaufort_land` DF
# and use join expressions that use the wind speed ranges
nycflights_wind_visib_beaufort = \
(nycflights_wind_visib
 .join(beaufort_land,
      [nycflights_wind_visib.wind_speed >= beaufort_land.speed_mi_h_lb,
       nycflights_wind_visib.wind_speed < beaufort_land.speed_mi_h_ub],
       'left_outer')
 .withColumn('month', month(nycflights_wind_visib.timestamp)) # Create a month column from `timestamp` values
 .drop('speed_mi_h_lb')
 .drop('speed_mi_h_ub')
)
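
The join above matches on a range rather than on equality. Here is a plain-Python sketch of that matching rule for a single wind speed; the `beaufort` rows are illustrative values I made up, not the contents of beaufort_land.csv, and `lookup_force` is a hypothetical helper.

```python
# Hypothetical (force, lower_bound, upper_bound, name) rows; illustrative values only
beaufort = [(0, 0, 1, 'Calm'), (1, 1, 4, 'Light air'), (2, 4, 8, 'Light breeze')]

def lookup_force(wind_speed):
    """Return (force, name) for the range with lower <= wind_speed < upper, else None."""
    if wind_speed is None:
        return None
    for force, lower, upper, name in beaufort:
        if lower <= wind_speed < upper:
            return (force, name)
    return None  # no range matched, like an unmatched row in a left outer join

light_air = lookup_force(3.5)
boundary = lookup_force(4)   # a lower bound is inclusive, an upper bound is not
no_match = lookup_force(50)
```

Because the lower bound is inclusive and the upper bound is exclusive, a speed sitting exactly on a boundary belongs to exactly one range.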

In [61]:
# View the joined DF; now we have extra data on wind speed!
display(nycflights_wind_visib_beaufort)

In [62]:
# We can inspect the number of potentially dangerous
# takeoffs (i.e., where the Beaufort force is high)
# month-by-month through the use of the `crosstab()` function
crosstab_month_force = \
(nycflights_wind_visib_beaufort
 .crosstab('month', 'force'))

# After creating the crosstab DataFrame, use a few
# functions to clean up the resultant DataFrame
crosstab_month_force = \
(crosstab_month_force
 .withColumn('month_force',
             crosstab_month_force.month_force.cast('int')) # the column is initially a string but recasting as
                                                           # an `int` will aid ordering in the next expression
 .orderBy('month_force')
 .drop('null'))

In [63]:
# Display the cross tabulation; turns out January was a bit riskier for takeoffs due to wind conditions
display(crosstab_month_force)

User Defined Functions (UDFs)

UDFs let you compute new values by applying your own Python function to the column values of every row in the DataFrame. They also let you bring in functionality from other Python libraries.


In [65]:
# Define a function to convert velocity from
# miles per hour (mph) to meters per second (mps)
def mph_to_mps(mph):
  # Pass NULLs through (e.g., rows left unmatched by the outer joins)
  if mph is None:
    return None
  return mph * 0.44704

# Register this function as a UDF using `udf()`
mph_to_mps = udf(mph_to_mps, FloatType()) # An output type was specified
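
Because a UDF wraps an ordinary Python function, the conversion logic can be checked in plain Python before it is wrapped with `udf()`. Below is a small sketch under that assumption; `mph_to_mps_plain` is a hypothetical name, and passing None through is my choice for handling the NULLs that the earlier left outer joins can leave in the wind columns.

```python
def mph_to_mps_plain(mph):
    """Convert miles per hour to meters per second, passing None through."""
    if mph is None:
        return None
    return mph * 0.44704

# Try a regular value, a missing value, and zero
converted = [mph_to_mps_plain(v) for v in [10.0, None, 0.0]]
```

Checking the missing-value case up front is useful because a function that multiplies None would raise a TypeError once Spark feeds it a NULL.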

In [66]:
# Create two new columns that are conversions of wind
# speeds from mph to mps
display(
  nycflights_wind_visib_beaufort
  .withColumn('wind_speed_mps', mph_to_mps('wind_speed'))
  .withColumn('wind_gust_mps', mph_to_mps('wind_gust'))
  .withColumnRenamed('wind_speed', 'wind_speed_mph')
  .withColumnRenamed('wind_gust', 'wind_gust_mph')
)

Writing DataFrames to Files

We can easily write DataFrame data to a variety of different file formats.


In [68]:
# Saving to CSV is quite similar to reading from a CSV file
(crosstab_month_force
 .write
 .mode('overwrite')
 .format('com.databricks.spark.csv')
 .save('/mnt/spark-atp/nycflights13/crosstab_month_force/'))

In [69]:
# Saving to Parquet is generally recommended for later retrieval
(crosstab_month_force
 .write
 .mode('overwrite')
 .parquet('/mnt/spark-atp/nycflights13/crosstab_month_force_parquet/'))

There are many more functions. Although I tried to cover a lot of ground, there are dozens more DataFrame functions that I haven't touched upon.

The main project page for Spark:

The main reference for PySpark is:

These examples are available at:

Information on the Parquet file format can be found at its project page:

The GitHub project page for spark-csv package; contains usage documentation: