San Francisco Crime Dataset Conversion

Challenge

  • Spark does not support out-of-the-box DataFrame creation from CSV files.
  • The spark-csv reader from Databricks provides this functionality, but it has to be added as an external library, for example:
df = sqlContext.read \
    .format('com.databricks.spark.csv') \
    .options(header='true', inferschema='true') \
    .load('train.csv')
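
Since spark-csv is not bundled with Spark, the shell also has to be launched with the package on the classpath, e.g. pyspark --packages com.databricks:spark-csv_2.10:1.4.0 (the exact coordinates depend on the Scala and spark-csv versions in use).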

Solution

Read the CSV file and build the DataFrame manually.


In [1]:
import csv
import pyspark
from pyspark.sql import SQLContext
from pyspark.sql.types import *
from StringIO import StringIO  # Python 2; on Python 3 use io.StringIO
from dateutil.parser import parse

Initialize the contexts and load the input file as an RDD of lines:


In [2]:
sc = pyspark.SparkContext('local[*]')  # local mode, using all available cores
sqlContext = SQLContext(sc)

textRDD = sc.textFile("../../data/sf-crime/train.csv.bz2")  # Spark decompresses bz2 transparently

textRDD.count()  # total number of lines, including the header


Out[2]:
878050

Remove the header row from the input:


In [3]:
header = textRDD.first()  # the first line is the CSV header
textRDD = textRDD.filter(lambda line: line != header)
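
This filter compares every line against the header, which would also drop any data row that happens to be identical to it. An alternative sketch (not in the original notebook) removes only the first line of the first partition:

from itertools import islice

# skip line 0 of partition 0 (the header); pass all other partitions through
def drop_header(idx, it):
    return islice(it, 1, None) if idx == 0 else it

# used instead of the filter above:
# textRDD = sc.textFile("../../data/sf-crime/train.csv.bz2").mapPartitionsWithIndex(drop_header)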

Define data schema:


In [4]:
fields = [StructField(field_name, StringType(), True) for field_name in header.split(',')]
fields[0].dataType = TimestampType()  # Dates
fields[7].dataType = FloatType()      # X (longitude)
fields[8].dataType = FloatType()      # Y (latitude)

schema = StructType(fields)
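
For reference, mutating the field types in place yields the same schema as spelling it out explicitly (column names per the Kaggle SF Crime training set):

schema = StructType([
    StructField('Dates',      TimestampType(), True),
    StructField('Category',   StringType(),    True),
    StructField('Descript',   StringType(),    True),
    StructField('DayOfWeek',  StringType(),    True),
    StructField('PdDistrict', StringType(),    True),
    StructField('Resolution', StringType(),    True),
    StructField('Address',    StringType(),    True),
    StructField('X',          FloatType(),     True),
    StructField('Y',          FloatType(),     True),
])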

Parse CSV lines and transform values into tuples:


In [5]:
# parse each CSV line (fields may contain commas inside double-quoted strings)
# and convert the date and coordinate columns to typed values
tupleRDD = textRDD \
    .map(lambda line: next(csv.reader(StringIO(line)))) \
    .map(lambda x: (parse(x[0]), x[1], x[2], x[3], x[4], x[5], x[6], float(x[7]), float(x[8])))

df = sqlContext.createDataFrame(tupleRDD, schema)
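
A quick sanity check one might add at this point (not part of the original run):

df.printSchema()  # Dates should be timestamp, X and Y float, everything else string
df.show(5)        # eyeball a few parsed rows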

Write the DataFrame as a Parquet file (save() uses Parquet as its default format):


In [6]:
df.write.save("../../data/sf-crime/train.parquet")
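
To verify the round trip, the Parquet file can be read back into a DataFrame (a sketch, not in the original notebook):

df2 = sqlContext.read.parquet("../../data/sf-crime/train.parquet")
df2.count()  # expected: 878049, i.e. the 878050 input lines minus the header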