In [1]:
from pyspark import SparkContext, SparkConf
conf = (SparkConf()
        .setAppName('SparkSQL')
        .setMaster('local[*]'))
In [2]:
sc = SparkContext(conf=conf)
In [3]:
from pyspark.sql import SQLContext
sqlc = SQLContext(sc)
In [4]:
import seaborn as sns

pandas_df = sns.load_dataset('iris')
In [5]:
spark_df = sqlc.createDataFrame(pandas_df)
In [6]:
spark_df.show(n=3)
In [7]:
%%bash
cat data/cars.csv
In [8]:
from pyspark.sql.types import *
def pad(alist):
    """Pad a row with empty strings so that it has exactly 5 fields."""
    tmp = alist[:]
    n = 5 - len(alist)
    for i in range(n):
        tmp.append('')
    return tmp
# Load the text file and convert each line to a list of fields.
lines = sc.textFile('data/cars.csv')
header = lines.first()  # extract the header line
lines = lines.filter(lambda line: line != header)  # drop the header
lines = lines.filter(lambda line: line)  # drop blank lines
parts = lines.map(lambda l: l.split(','))
parts = parts.map(lambda part: pad(part))  # pad short rows to 5 fields
# Cast year to int so it matches the IntegerType in the schema below.
parts = parts.map(lambda p: [int(p[0])] + p[1:])
fields = [
    StructField('year', IntegerType(), True),
    StructField('make', StringType(), True),
    StructField('model', StringType(), True),
    StructField('comment', StringType(), True),
    StructField('blank', StringType(), True),
]
schema = StructType(fields)
# Apply the schema to the RDD.
df0 = sqlc.createDataFrame(parts, schema)
df0.show(n=3)
In [9]:
df = (sqlc.read.format('com.databricks.spark.csv')
      .options(header='true', inferschema='true')
      .load('data/cars.csv'))
In [10]:
df.printSchema()
In [11]:
df.show()
In [15]:
df.select(['year', 'make']).show()
In [22]:
df.registerTempTable('cars')
In [23]:
q = sqlc.sql('select year, make from cars where year > 2000')
q.show()
In [24]:
q_df = q.toPandas()
q_df
Out[24]:
JSON files are easier to read in than CSV files because JSON is self-describing: Spark SQL can infer an appropriate schema without additional hints.
As an example, we will look at Durham police crime reports from the Durham Open Data website.
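To see why, here is a minimal sketch of schema inference (the two records below are made up, not from the data set). Spark SQL infers both the column names and the types directly from the JSON keys and values.
tiny = sc.parallelize(['{"name": "ann", "age": 23}',
                       '{"name": "bob", "age": 34}'])
tiny_df = sqlc.read.json(tiny)  # read.json also accepts an RDD of JSON strings
tiny_df.printSchema()
tiny_df.show()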
In [25]:
df = sqlc.read.json('data/durham-police-crime-reports.json')
How many records are there?
In [26]:
df.count()
Out[26]:
Since this is JSON, it is possible to have a nested schema.
In [27]:
df.printSchema()
Show the top few rows.
In [28]:
df.show(n=5)
Make a dataframe containing only the date and the charge description.
In [29]:
df.select(['fields.strdate', 'fields.chrgdesc']).show(n=5)
Show the distinct charges. Note that for an actual analysis, you would probably want to consolidate these into a smaller number of groups to account for typos and near-duplicates; one way to do this is sketched after the next cell.
In [30]:
df.select('fields.chrgdesc').distinct().show()
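As a sketch of one way to consolidate charges, map each raw description to a broader category with a UDF. The groupings below are hypothetical, not derived from the data set.
from pyspark.sql.functions import col, udf
from pyspark.sql.types import StringType

def charge_group(desc):
    """Map a raw charge description to a broad (hypothetical) category."""
    if desc is None:
        return 'UNKNOWN'
    d = desc.upper()
    if 'LARCENY' in d or 'THEFT' in d:
        return 'THEFT'
    if 'ASSAULT' in d:
        return 'ASSAULT'
    return 'OTHER'

charge_group_udf = udf(charge_group, StringType())
df.select(charge_group_udf(col('fields.chrgdesc')).alias('charge_group')).show(n=5)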
What charges are the most common?
In [31]:
df.groupby('fields.chrgdesc').count().sort('count', ascending=False).show()
In [32]:
df.registerTempTable('crimes')
In [33]:
q = sqlc.sql('''
select fields.chrgdesc, count(fields.chrgdesc) as count
from crimes
where fields.monthstamp=3
group by fields.chrgdesc
''')
q.show()
In [34]:
crimes_df = q.toPandas()
crimes_df.head()
Out[34]:
The official docs suggest that this can be done directly via JDBC, but I cannot get it to work. As a workaround, you can convert the table to JSON before importing it as a dataframe. If anyone finds out how to load an SQLite3 database table directly into a Spark dataframe, please let me know. An alternative workaround using pandas is sketched after the next cell.
In [35]:
from odo import odo
odo('sqlite:///../data/Chinook_Sqlite.sqlite::Album', 'Album.json')
df = sqlc.read.json('Album.json')
df.show(n=3)
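As an alternative workaround (a minimal sketch, assuming the table fits comfortably in memory), read the table into pandas via the standard library sqlite3 driver and then convert:
import sqlite3
import pandas as pd

# Read the Album table into a pandas dataframe, then hand it to Spark.
con = sqlite3.connect('../data/Chinook_Sqlite.sqlite')
album_pdf = pd.read_sql_query('select * from Album', con)
con.close()
album_df = sqlc.createDataFrame(album_pdf)
album_df.show(n=3)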
In Scala and Java, Spark 1.6 introduced a new type called DataSet that combines the relational properties of a DataFrame with the functional methods of an RDD. This will be available in Python in a later version. However, because of the dynamic nature of Python, you can already call functional methods on a Spark DataFrame, giving most of the ease of use of the DataSet type.
In [36]:
ds = sqlc.read.text('../data/Ulysses.txt')
In [37]:
ds
Out[37]:
In [38]:
ds.show(n=3)
In [39]:
def remove_punctuation(s):
    """Remove all punctuation characters from a string."""
    import string
    return s.translate(dict.fromkeys(ord(c) for c in string.punctuation))
In [40]:
counts = (ds.map(lambda x: remove_punctuation(x[0]))
          .flatMap(lambda x: x.lower().strip().split())
          .filter(lambda x: x != '')
          .map(lambda x: (x, 1))
          .countByKey())
In [41]:
sorted(counts.items(), key=lambda x: x[1], reverse=True)[:10]
Out[41]:
Optional Exercise
The crime data set includes both date and geospatial information. Consider creating an interactive map visualization of crimes in Durham by date using the bokeh package. See this example to get started. A GeoJSON version of the Durham Police Crime Reports can be downloaded; a minimal starting sketch follows.
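As a starting point only (a sketch assuming the GeoJSON file has been downloaded to the hypothetical path below, and that it contains Point geometries, whose coordinates bokeh exposes as 'x' and 'y' columns):
from bokeh.plotting import figure, show
from bokeh.io import output_notebook
from bokeh.models import GeoJSONDataSource

output_notebook()

# Hypothetical path to the downloaded GeoJSON file.
with open('data/durham-police-crime-reports.geojson') as f:
    geo_source = GeoJSONDataSource(geojson=f.read())

p = figure(title='Durham police crime reports')
# Point geometries in the GeoJSON are exposed as 'x' and 'y' columns.
p.circle(x='x', y='y', source=geo_source, alpha=0.5)
show(p)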
In [42]:
%load_ext version_information
%version_information pyspark
Out[42]:
In [ ]: