Spark SQL

A tour of the Spark SQL library, the spark-csv package and Spark DataFrames.

Resources

  • Spark tutorials: A growing bunch of accessible tutorials on Spark, mostly in Scala but a few in Python.

In [1]:
from pyspark import SparkContext, SparkConf

conf = (SparkConf()
        .setAppName('SparkSQL')
        .setMaster('local[*]'))

In [2]:
sc = SparkContext(conf=conf)

In [3]:
from pyspark.sql import SQLContext
sqlc = SQLContext(sc)

DataFrame from pandas


In [4]:
import seaborn as sns

pandas_df = sns.load_dataset('iris')

In [5]:
spark_df = sqlc.createDataFrame(pandas_df)

In [6]:
spark_df.show(n=3)


+------------+-----------+------------+-----------+-------+
|sepal_length|sepal_width|petal_length|petal_width|species|
+------------+-----------+------------+-----------+-------+
|         5.1|        3.5|         1.4|        0.2| setosa|
|         4.9|        3.0|         1.4|        0.2| setosa|
|         4.7|        3.2|         1.3|        0.2| setosa|
+------------+-----------+------------+-----------+-------+
only showing top 3 rows

DataFrame from CSV files

Using manual parsing and a schema


In [7]:
%%bash

cat data/cars.csv


year,make,model,comment,blank
"2012","Tesla","S","No comment",

1997,Ford,E350,"Go get one now they are going fast",
2015,Chevy,Volt


In [8]:
from pyspark.sql.types import *

def pad(alist):
    tmp = alist[:]
    n = 5 - len(alist)
    for i in range(n):
        tmp.append('')
    return tmp

# Load a text file and split each line into a list of fields.
lines = sc.textFile('data/cars.csv')
header = lines.first() #extract header
lines = lines.filter(lambda line: line != header)
lines = lines.filter(lambda line: line)
parts = lines.map(lambda l: l.split(','))
parts = parts.map(lambda part: pad(part))

fields = [    
    StructField('year', IntegerType(), True),
    StructField('make', StringType(), True),
    StructField('model', StringType(), True),
    StructField('comment', StringType(), True),
    StructField('blank', StringType(), True),
    ]
schema = StructType(fields)

# Apply the schema to the RDD.
df0 = sqlc.createDataFrame(parts, schema)

df0.show(n=3)


+----+-------+-----+--------------------+-----+
|year|   make|model|             comment|blank|
+----+-------+-----+--------------------+-----+
|null|"Tesla"|  "S"|        "No comment"|     |
|null|   Ford| E350|"Go get one now t...|     |
|null|  Chevy| Volt|                    |     |
+----+-------+-----+--------------------+-----+
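
In the output above the year column is null and the quotation marks survive in make, model and comment, most likely because split(',') returns raw strings and knows nothing about CSV quoting (it would also break on a comma inside a quoted comment). A minimal sketch of a more careful line parser follows, using the csv module from the standard library. Note that parse_line is a hypothetical helper, not part of the original notebook, and it assumes five columns with an integer year.

```python
import csv

def parse_line(line, width=5):
    """Parse one CSV line into [year, make, model, comment, blank].

    Hypothetical helper: assumes `width` columns and an integer first column.
    """
    # csv.reader handles quoted fields, including embedded commas.
    row = next(csv.reader([line]))
    # Pad short rows with empty strings, as pad() does above.
    row = row + [''] * (width - len(row))
    # Convert the year so that it matches IntegerType in the schema.
    row[0] = int(row[0]) if row[0] else None
    return row

# The three data lines from data/cars.csv shown earlier.
rows = [parse_line(l) for l in [
    '"2012","Tesla","S","No comment",',
    '1997,Ford,E350,"Go get one now they are going fast",',
    '2015,Chevy,Volt',
]]
```

In the notebook this could stand in for the split and pad steps, for example parts = lines.map(parse_line), though that has not been run against the Spark version used here.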

Using the spark-csv package


In [9]:
df = (sqlc.read.format('com.databricks.spark.csv')
      .options(header='true', inferschema='true')
      .load('data/cars.csv'))

Using the dataframe


In [10]:
df.printSchema()


root
 |-- year: integer (nullable = true)
 |-- make: string (nullable = true)
 |-- model: string (nullable = true)
 |-- comment: string (nullable = true)
 |-- blank: string (nullable = true)


In [11]:
df.show()


+----+-----+-----+--------------------+-----+
|year| make|model|             comment|blank|
+----+-----+-----+--------------------+-----+
|2012|Tesla|    S|          No comment|     |
|1997| Ford| E350|Go get one now th...|     |
|2015|Chevy| Volt|                null| null|
+----+-----+-----+--------------------+-----+


In [15]:
df.select(['year', 'make']).show()


+----+-----+
|year| make|
+----+-----+
|2012|Tesla|
|1997| Ford|
|2015|Chevy|
+----+-----+

To run SQL queries, we need to register the dataframe as a table


In [22]:
df.registerTempTable('cars')

In [23]:
q = sqlc.sql('select year, make from cars where year > 2000')
q.show()


+----+-----+
|year| make|
+----+-----+
|2012|Tesla|
|2015|Chevy|
+----+-----+

Spark dataframes can be converted to Pandas ones

Typically, we would only convert small dataframes such as the results of SQL queries. If we could load the original dataset in memory as a pandas dataframe, why would we be using Spark?


In [24]:
q_df = q.toPandas()
q_df


Out[24]:
year make
0 2012 Tesla
1 2015 Chevy

DataFrame from JSON files

It is easier to read in JSON than CSV files because JSON is self-describing, allowing Spark SQL to infer the appropriate schema without additional hints.

As an example, we will look at Durham police crime reports from the Durham Open Data website.


In [25]:
df = sqlc.read.json('data/durham-police-crime-reports.json')

How many records are there?


In [26]:
df.count()


Out[26]:
101375

Since this is JSON, it is possible to have a nested schema.


In [27]:
df.printSchema()


root
 |-- datasetid: string (nullable = true)
 |-- fields: struct (nullable = true)
 |    |-- addtime: string (nullable = true)
 |    |-- big_zone: string (nullable = true)
 |    |-- chrgdesc: string (nullable = true)
 |    |-- csstatus: string (nullable = true)
 |    |-- csstatusdt: string (nullable = true)
 |    |-- date_fnd: string (nullable = true)
 |    |-- date_occu: string (nullable = true)
 |    |-- date_rept: string (nullable = true)
 |    |-- dist: string (nullable = true)
 |    |-- dow1: string (nullable = true)
 |    |-- dow2: string (nullable = true)
 |    |-- geo_point_2d: array (nullable = true)
 |    |    |-- element: double (containsNull = true)
 |    |-- geo_shape: struct (nullable = true)
 |    |    |-- coordinates: array (nullable = true)
 |    |    |    |-- element: double (containsNull = true)
 |    |    |-- type: string (nullable = true)
 |    |-- hour_fnd: string (nullable = true)
 |    |-- hour_occu: string (nullable = true)
 |    |-- hour_rept: string (nullable = true)
 |    |-- inci_id: string (nullable = true)
 |    |-- monthstamp: string (nullable = true)
 |    |-- reportedas: string (nullable = true)
 |    |-- reviewdate: string (nullable = true)
 |    |-- strdate: string (nullable = true)
 |    |-- ucr_code: string (nullable = true)
 |    |-- ucr_type_o: string (nullable = true)
 |    |-- yearstamp: string (nullable = true)
 |-- geometry: struct (nullable = true)
 |    |-- coordinates: array (nullable = true)
 |    |    |-- element: double (containsNull = true)
 |    |-- type: string (nullable = true)
 |-- record_timestamp: string (nullable = true)
 |-- recordid: string (nullable = true)

Show the top few rows.


In [28]:
df.show(n=5)


+--------------------+--------------------+--------------------+--------------------+--------------------+
|           datasetid|              fields|            geometry|    record_timestamp|            recordid|
+--------------------+--------------------+--------------------+--------------------+--------------------+
|durham-police-cri...|[2013-12-01T19:00...|[WrappedArray(-78...|2016-03-12T02:32:...|2c0251654c4b7a006...|
|durham-police-cri...|[2013-12-01T19:00...|[WrappedArray(-78...|2016-03-12T02:32:...|e5fe0e483fdb17fb7...|
|durham-police-cri...|[2013-12-01T19:00...|[WrappedArray(-78...|2016-03-12T02:32:...|d16c330ea4b3e2a90...|
|durham-police-cri...|[2013-12-01T19:00...|[WrappedArray(-78...|2016-03-12T02:32:...|1128e12a912b16cfe...|
|durham-police-cri...|[2013-12-01T19:00...|[WrappedArray(-78...|2016-03-12T02:32:...|ac79bc9c709d5dfa4...|
+--------------------+--------------------+--------------------+--------------------+--------------------+
only showing top 5 rows

Make a dataframe only containing date and charges.


In [29]:
df.select(['fields.strdate', 'fields.chrgdesc']).show(n=5)


+-----------+--------------------+
|    strdate|            chrgdesc|
+-----------+--------------------+
|Dec  2 2013|CALLS FOR SERVICE...|
|Dec  2 2013|VANDALISM TO PROP...|
|Dec  2 2013|BURGLARY - FORCIB...|
|Dec  2 2013|LARCENY - SHOPLIF...|
|Dec  2 2013|BURGLARY - FORCIB...|
+-----------+--------------------+
only showing top 5 rows
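
The dotted names in the select call walk into the nested struct: 'fields.strdate' means the strdate entry inside fields. As a rough plain-Python analogy (not how Spark implements it), the same lookup on a parsed JSON record can be sketched as below. The get_path helper and the sample record are made up for illustration; only the field names come from the schema above.

```python
import json

def get_path(record, path):
    """Follow a dotted path such as 'fields.strdate' through nested dicts."""
    value = record
    for key in path.split('.'):
        # Missing keys yield None, similar to a null in a nullable column.
        if not isinstance(value, dict) or key not in value:
            return None
        value = value[key]
    return value

# Illustrative record; the values are placeholders, not real data.
record = json.loads('''
{"datasetid": "example",
 "fields": {"strdate": "Dec  2 2013", "chrgdesc": "EXAMPLE CHARGE"}}
''')

row = [get_path(record, p) for p in ['fields.strdate', 'fields.chrgdesc']]
```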

Show distinct charges - note that for an actual analysis, you would probably want to consolidate these into a smaller number of groups to account for typos, etc.


In [30]:
df.select('fields.chrgdesc').distinct().show()


+--------------------+
|            chrgdesc|
+--------------------+
|ALL OTHER OFFENSE...|
|DRUG EQUIPMENT/PA...|
| ASSIST OTHER AGENCY|
|TOWED/ABANDONED V...|
|DRUG EQUIPMENT/PA...|
|BURGLARY - FORCIB...|
|SEX OFFENSE - STA...|
|ROBBERY - INDIVIDUAL|
|WEAPON VIOLATIONS...|
|ALL OTHER OFFENSE...|
|DRUG/NARCOTIC VIO...|
|SEX OFFENSE - PEE...|
|DRUG/NARCOTIC VIO...|
|DRUG/NARCOTIC VIO...|
|AGGRAVATED ASSAUL...|
|ALL OTHER OFFENSE...|
|LIQUOR LAW - POSS...|
|EMBEZZLEMENT - WI...|
|WEAPON VIOLATIONS...|
|             RUNAWAY|
+--------------------+
only showing top 20 rows
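
One simple way to consolidate charges, sketched here under the assumption that the text before the first hyphen names the broad category, is to reduce each description to that prefix. The broad_category helper is hypothetical, and a real analysis would likely need a hand-built mapping to catch typos.

```python
from collections import Counter

def broad_category(chrgdesc):
    """Return the part of a charge description before the first hyphen."""
    # Assumption: descriptions look like 'CATEGORY - DETAIL' or 'CATEGORY-DETAIL'.
    head = chrgdesc.split('-', 1)[0]
    # Collapse repeated whitespace and normalize case.
    return ' '.join(head.upper().split())

# A few descriptions that appear in full elsewhere in this notebook.
charges = [
    'BURGLARY - FORCIBLE ENTRY',
    'SEX OFFENSE - STATUTORY RAPE',
    'ALL OTHER OFFENSES-BIGAMY/MARRIAGE LAWS',
    'ASSIST OTHER AGENCY',
]
groups = Counter(broad_category(c) for c in charges)
```

On the Spark side, a function like this could be applied to the chrgdesc column through a user-defined function before grouping.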

What charges are the most common?


In [31]:
df.groupby('fields.chrgdesc').count().sort('count', ascending=False).show()


+--------------------+-----+
|            chrgdesc|count|
+--------------------+-----+
|BURGLARY - FORCIB...|11630|
|LARCENY - SHOPLIF...| 7633|
|LARCENY - FROM MO...| 7405|
|SIMPLE ASSAULT (P...| 5085|
| LARCENY - ALL OTHER| 4666|
|LARCENY - FROM BU...| 4514|
|VANDALISM TO AUTO...| 4112|
|DRUG/NARCOTIC VIO...| 3790|
|LARCENY - AUTOMOB...| 3441|
|VANDALISM TO PROP...| 3422|
|CALLS FOR SERVICE...| 3207|
|  AGGRAVATED ASSAULT| 3183|
|BURGLARY - NON-FO...| 2339|
|ROBBERY - INDIVIDUAL| 2330|
|TOWED/ABANDONED V...| 2244|
|MOTOR VEHICLE THE...| 1970|
|DRIVING WHILE IMP...| 1912|
|FRAUD - FALSE PRE...| 1660|
|      FOUND PROPERTY| 1643|
|ALL TRAFFIC (EXCE...| 1436|
+--------------------+-----+
only showing top 20 rows

Register as table to run full SQL queries


In [32]:
df.registerTempTable('crimes')

In [33]:
q = sqlc.sql('''
select fields.chrgdesc, count(fields.chrgdesc) as count
from crimes 
where fields.monthstamp=3
group by fields.chrgdesc
''')
q.show()


+--------------------+-----+
|            chrgdesc|count|
+--------------------+-----+
|ALL OTHER OFFENSE...|    1|
|TOWED/ABANDONED V...|  258|
| ASSIST OTHER AGENCY|   19|
|BURGLARY - FORCIB...|  929|
|SEX OFFENSE - STA...|    3|
|ROBBERY - INDIVIDUAL|  157|
|WEAPON VIOLATIONS...|    6|
|SEX OFFENSE - PEE...|    5|
|ALL OTHER OFFENSE...|    8|
|DRUG/NARCOTIC VIO...|   14|
|DRUG/NARCOTIC VIO...|   28|
|AGGRAVATED ASSAUL...|    1|
|LIQUOR LAW - POSS...|    2|
|ALL OTHER OFFENSE...|    3|
|EMBEZZLEMENT - WI...|    7|
|WEAPON VIOLATIONS...|    1|
|             RUNAWAY|   87|
|      MISSING PERSON|   16|
|SIMPLE ASSAULT-PH...|    3|
|ALL OTHER OFFENSE...|   22|
+--------------------+-----+
only showing top 20 rows

Convert to pandas


In [34]:
crimes_df = q.toPandas()
crimes_df.head()


Out[34]:
chrgdesc count
0 ALL OTHER OFFENSES-BIGAMY/MARRIAGE LAWS 1
1 TOWED/ABANDONED VEHICLE 258
2 ASSIST OTHER AGENCY 19
3 BURGLARY - FORCIBLE ENTRY 929
4 SEX OFFENSE - STATUTORY RAPE 3

DataFrame from SQLite3

The official docs suggest that this can be done directly via JDBC but I cannot get it to work. As a workaround, you can convert to JSON before importing as a dataframe. If anyone finds out how to load an SQLite3 database table directly into a Spark dataframe, please let me know.


In [35]:
from odo import odo

odo('sqlite:///../data/Chinook_Sqlite.sqlite::Album', 'Album.json')
df = sqlc.read.json('Album.json')
df.show(n=3)


+-------+--------+--------------------+
|AlbumId|ArtistId|               Title|
+-------+--------+--------------------+
|      1|       1|For Those About T...|
|      2|       2|   Balls to the Wall|
|      3|       2|   Restless and Wild|
+-------+--------+--------------------+
only showing top 3 rows
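
If odo is not available, the same workaround can be sketched with the standard library alone: read the table with sqlite3 and write one JSON object per line, which is the layout read.json expects by default. The table_to_json_lines function is a hypothetical helper, and the demonstration uses a small in-memory table with made-up rows rather than the Chinook database.

```python
import json
import os
import sqlite3
import tempfile

def table_to_json_lines(conn, table, path):
    """Dump every row of `table` to `path`, one JSON object per line."""
    conn.row_factory = sqlite3.Row  # rows become addressable by column name
    # The table name is interpolated directly, so it must be trusted input.
    cursor = conn.execute('SELECT * FROM "{}"'.format(table))
    n = 0
    with open(path, 'w') as f:
        for row in cursor:
            f.write(json.dumps(dict(row)) + '\n')
            n += 1
    return n

# Demonstration on an in-memory database with made-up rows.
conn = sqlite3.connect(':memory:')
conn.execute('CREATE TABLE Album (AlbumId INTEGER, ArtistId INTEGER, Title TEXT)')
conn.executemany('INSERT INTO Album VALUES (?, ?, ?)',
                 [(1, 1, 'First example'), (2, 2, 'Second example')])
out_path = os.path.join(tempfile.mkdtemp(), 'Album_example.json')
n_rows = table_to_json_lines(conn, 'Album', out_path)
conn.close()
```

The resulting file could then be loaded with sqlc.read.json(out_path), as in the cell above.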

Datasets

In Scala and Java, Spark 1.6 introduced a new type called Dataset that combines the relational properties of a DataFrame with the functional methods of an RDD. This will be available in Python in a later version. However, because of the dynamic nature of Python, you can already call functional methods on a Spark DataFrame, giving most of the ease of use of the Dataset type.


In [36]:
ds = sqlc.read.text('../data/Ulysses.txt')

In [37]:
ds


Out[37]:
DataFrame[value: string]

In [38]:
ds.show(n=3)


+--------------------+
|               value|
+--------------------+
|The Project Guten...|
|                    |
|This eBook is for...|
+--------------------+
only showing top 3 rows


In [39]:
def remove_punctuation(s):
    import string
    return s.translate(dict.fromkeys(ord(c) for c in string.punctuation))

In [40]:
counts = (ds.map(lambda x: remove_punctuation(x[0]))
            .flatMap(lambda x: x.lower().strip().split())
            .filter(lambda x:  x!= '')
            .map(lambda x: (x, 1))
            .countByKey())

In [41]:
sorted(counts.items(), key=lambda x: x[1], reverse=True)[:10]


Out[41]:
[('the', 15107),
 ('of', 8257),
 ('and', 7282),
 ('a', 6553),
 ('to', 5042),
 ('in', 4981),
 ('he', 4033),
 ('his', 3333),
 ('i', 2698),
 ('that', 2621)]
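
For readers new to the functional style, the pipeline above corresponds roughly to the following plain-Python version, which runs on a list of strings in a single process. It is only an analogy to show what each step does; the sample lines are invented, and Spark would distribute the same steps across partitions.

```python
import string
from collections import Counter

def remove_punctuation(s):
    # Map every punctuation code point to None so translate() drops it.
    return s.translate(dict.fromkeys(ord(c) for c in string.punctuation))

lines = ['The cat, the hat.', '', 'A cat!']  # made-up input

# map: strip punctuation from each line
cleaned = (remove_punctuation(line) for line in lines)
# flatMap: one line becomes zero or more lowercase words
words = (w for line in cleaned for w in line.lower().strip().split())
# filter + countByKey: drop empty strings and tally each word
counts = Counter(w for w in words if w != '')

# Same sort as in the cell above: highest counts first.
top = sorted(counts.items(), key=lambda x: x[1], reverse=True)[:2]
```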

Optional Exercise

The crime data set includes both date and geospatial information. Consider creating an interactive map visualization of crimes in Durham by date using the bokeh package. See this example to get started. A GeoJSON version of the Durham Police Crime Reports can also be downloaded.

Version information


In [42]:
%load_ext version_information
%version_information pyspark


Out[42]:
Software Version
Python 3.5.1 64bit [GCC 4.2.1 (Apple Inc. build 5577)]
IPython 4.0.3
OS Darwin 15.4.0 x86_64 i386 64bit
pyspark The 'pyspark' distribution was not found and is required by the application
Wed Apr 20 11:54:43 2016 EDT
