In [1]:
import codecs, json
import dask.dataframe as dd
import numpy as np
import pandas as pd
import pyarrow as pa
import pyarrow.parquet as pq

In [2]:
year = '2018'
data_dir = '../data/' + year + '/'
file_name = 'chicago-crimes-' + year

In [3]:
%%time
# set input data file path
parquet_data_dir = data_dir + 'crimes-' + year + '.snappy.parq'
print('Loading crime data from: {}'.format(parquet_data_dir))

# load crimes parquet data into dask df
crimes = dd.read_parquet(parquet_data_dir, index='Date')

# load all data into memory
crimes = crimes.persist()
print('Crime data loaded into memory.')

# log records count and data frame stats
print('Crime data stats:')
print('---------------------------------------')
print('{:,} total records in {} partitions'.format(len(crimes), crimes.npartitions))
print('DataFrame size: {:,}'.format(crimes.size.compute()))


Loading crime data from: ../data/2018/crimes-2018.snappy.parq
Crime data loaded into memory.
Crime data stats:
---------------------------------------
157,504 total records in 1 partitions
DataFrame size: 2,205,056
Wall time: 610 ms

In [4]:
crimes


Out[4]:
Dask DataFrame Structure:
Block PrimaryType FBICode Description LocationDescription CommunityArea Beat District Ward Arrest Domestic Latitude Longitude Year
npartitions=1
object int8 int8 int16 int8 int8 int16 int8 int8 bool bool float64 float64 int8
... ... ... ... ... ... ... ... ... ... ... ... ... ...
Dask Name: read-parquet, 1 tasks

In [5]:
# get crime geo data for mapping, drop na
crime_geo = crimes[['PrimaryType',
                    'Block',
                    'Description',
                    'LocationDescription',
                    'CommunityArea',
                    'Arrest',
                    'Domestic',
                    'Latitude', 
                    'Longitude',
                    'Ward']].dropna()
print('All Crimes:', len(crime_geo))


All Crimes: 156385
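
As a quick sanity check on the dropna() step, the difference between the two counts gives the rows discarded for missing values. A minimal sketch using only the variables defined above:

# rows discarded by the dropna() above
dropped = len(crimes) - len(crime_geo)
print('Dropped {:,} rows ({:.2%}) with missing values'.format(
    dropped, dropped / len(crimes)))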

In [6]:
# write crimes data out to a compact json file
def to_json_file(file_path, data):
    with codecs.open(file_path, 'w', encoding='utf-8') as f:
        json.dump(data, f, separators=(',', ':'), sort_keys=False, indent=0)

In [7]:
%%time
# output crimes data in raw json to see how large it gets
geo_data_columns = ['Latitude', 'Longitude', 'Block', 'LocationDescription', 
                    'PrimaryType', 'Description', 'Arrest', 'Domestic', 'Ward']
to_json_file(data_dir + file_name + '.json', 
  crime_geo[geo_data_columns].compute().values.tolist())


Wall time: 5.81 s
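
Since the point of the raw JSON dump is to see how large it gets, a size check fits here. A sketch using the standard-library os module; the path matches the file just written:

import os
# size of the raw json output on disk
print('raw json: {:,} bytes'.format(
    os.path.getsize(data_dir + file_name + '.json')))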

In [8]:
%%time
# dish it out in snappy parquet for comparison
crime_geo.to_parquet(data_dir + file_name + '.parquet', compression='SNAPPY')


Wall time: 486 ms

In [9]:
# create pandas dataframe for conversion to arrow
crime_geo_df = crime_geo[geo_data_columns].compute()
crime_geo_df.info()


<class 'pandas.core.frame.DataFrame'>
DatetimeIndex: 156385 entries, 2018-01-01 00:00:00 to 2018-08-08 23:59:00
Data columns (total 9 columns):
Latitude               156385 non-null float64
Longitude              156385 non-null float64
Block                  156385 non-null object
LocationDescription    156385 non-null object
PrimaryType            156385 non-null object
Description            156385 non-null object
Arrest                 156385 non-null bool
Domestic               156385 non-null bool
Ward                   156385 non-null float64
dtypes: bool(2), float64(3), object(4)
memory usage: 9.8+ MB
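
The "9.8+ MB" figure understates real usage: pandas skips the string payloads of object columns unless asked for deep introspection. A one-liner that could be added here to get the true footprint:

# count the actual string contents of the object columns too
print('{:,} bytes'.format(crime_geo_df.memory_usage(deep=True).sum()))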

In [10]:
# convert pandas data frame to arrow table
crime_geo_table = pa.Table.from_pandas(crime_geo_df)
crime_geo_table


Out[10]:
pyarrow.Table
Latitude: double
Longitude: double
Block: string
LocationDescription: string
PrimaryType: string
Description: string
Arrest: bool
Domestic: bool
Ward: double
Date: timestamp[ns]
metadata
--------
{b'pandas': b'{"index_columns": ["Date"], "column_indexes": [{"name": null, "f'
            b'ield_name": null, "pandas_type": "unicode", "numpy_type": "objec'
            b't", "metadata": {"encoding": "UTF-8"}}], "columns": [{"name": "L'
            b'atitude", "field_name": "Latitude", "pandas_type": "float64", "n'
            b'umpy_type": "float64", "metadata": null}, {"name": "Longitude", '
            b'"field_name": "Longitude", "pandas_type": "float64", "numpy_type'
            b'": "float64", "metadata": null}, {"name": "Block", "field_name":'
            b' "Block", "pandas_type": "unicode", "numpy_type": "object", "met'
            b'adata": null}, {"name": "LocationDescription", "field_name": "Lo'
            b'cationDescription", "pandas_type": "unicode", "numpy_type": "obj'
            b'ect", "metadata": null}, {"name": "PrimaryType", "field_name": "'
            b'PrimaryType", "pandas_type": "unicode", "numpy_type": "object", '
            b'"metadata": null}, {"name": "Description", "field_name": "Descri'
            b'ption", "pandas_type": "unicode", "numpy_type": "object", "metad'
            b'ata": null}, {"name": "Arrest", "field_name": "Arrest", "pandas_'
            b'type": "bool", "numpy_type": "bool", "metadata": null}, {"name":'
            b' "Domestic", "field_name": "Domestic", "pandas_type": "bool", "n'
            b'umpy_type": "bool", "metadata": null}, {"name": "Ward", "field_n'
            b'ame": "Ward", "pandas_type": "float64", "numpy_type": "float64",'
            b' "metadata": null}, {"name": "Date", "field_name": "Date", "pand'
            b'as_type": "datetime", "numpy_type": "datetime64[ns]", "metadata"'
            b': null}], "pandas_version": "0.23.0"}'}

In [11]:
%%time
# write arrow table to a single parquet file, just to test it
pq.write_table(crime_geo_table, data_dir + file_name + '.parq')


Wall time: 173 ms

In [12]:
%%time
# read the parquet file written by arrow back with dask as a compatibility check
# (read_parquet is lazy, hence the tiny wall time)
ddf = dd.read_parquet(data_dir + file_name + '.parq', index='Date')


Wall time: 11.7 ms

In [13]:
print('{:,} total records in {} partitions'.format(len(ddf), ddf.npartitions))
print('DataFrame size: {:,}'.format(ddf.size.compute()))
ddf


156,385 total records in 1 partitions
DataFrame size: 1,407,465
Out[13]:
Dask DataFrame Structure:
Latitude Longitude Block LocationDescription PrimaryType Description Arrest Domestic Ward
npartitions=1
float64 float64 object object object object bool bool float64
... ... ... ... ... ... ... ... ...
Dask Name: read-parquet, 1 tasks

In [14]:
%%time
# read parquet file with arrow
table = pq.read_table(data_dir + file_name + '.parq')


Wall time: 75.2 ms

In [15]:
table


Out[15]:
pyarrow.Table
Latitude: double
Longitude: double
Block: string
LocationDescription: string
PrimaryType: string
Description: string
Arrest: bool
Domestic: bool
Ward: double
Date: timestamp[us]
metadata
--------
{b'pandas': b'{"index_columns": ["Date"], "column_indexes": [{"name": null, "f'
            b'ield_name": null, "pandas_type": "unicode", "numpy_type": "objec'
            b't", "metadata": {"encoding": "UTF-8"}}], "columns": [{"name": "L'
            b'atitude", "field_name": "Latitude", "pandas_type": "float64", "n'
            b'umpy_type": "float64", "metadata": null}, {"name": "Longitude", '
            b'"field_name": "Longitude", "pandas_type": "float64", "numpy_type'
            b'": "float64", "metadata": null}, {"name": "Block", "field_name":'
            b' "Block", "pandas_type": "unicode", "numpy_type": "object", "met'
            b'adata": null}, {"name": "LocationDescription", "field_name": "Lo'
            b'cationDescription", "pandas_type": "unicode", "numpy_type": "obj'
            b'ect", "metadata": null}, {"name": "PrimaryType", "field_name": "'
            b'PrimaryType", "pandas_type": "unicode", "numpy_type": "object", '
            b'"metadata": null}, {"name": "Description", "field_name": "Descri'
            b'ption", "pandas_type": "unicode", "numpy_type": "object", "metad'
            b'ata": null}, {"name": "Arrest", "field_name": "Arrest", "pandas_'
            b'type": "bool", "numpy_type": "bool", "metadata": null}, {"name":'
            b' "Domestic", "field_name": "Domestic", "pandas_type": "bool", "n'
            b'umpy_type": "bool", "metadata": null}, {"name": "Ward", "field_n'
            b'ame": "Ward", "pandas_type": "float64", "numpy_type": "float64",'
            b' "metadata": null}, {"name": "Date", "field_name": "Date", "pand'
            b'as_type": "datetime", "numpy_type": "datetime64[ns]", "metadata"'
            b': null}], "pandas_version": "0.23.0"}'}

In [16]:
%%time
# convert it to pandas data frame
df = table.to_pandas()


Wall time: 63.5 ms

In [17]:
df.info()


<class 'pandas.core.frame.DataFrame'>
DatetimeIndex: 156385 entries, 2018-01-01 00:00:00 to 2018-08-08 23:59:00
Data columns (total 9 columns):
Latitude               156385 non-null float64
Longitude              156385 non-null float64
Block                  156385 non-null object
LocationDescription    156385 non-null object
PrimaryType            156385 non-null object
Description            156385 non-null object
Arrest                 156385 non-null bool
Domestic               156385 non-null bool
Ward                   156385 non-null float64
dtypes: bool(2), float64(3), object(4)
memory usage: 9.8+ MB

In [18]:
%%time
# write arrow table to disk in the Arrow file (random-access) format
writer = pa.RecordBatchFileWriter(data_dir + file_name + '.arrow', table.schema)
writer.write_table(table)
writer.close()


Wall time: 265 ms

In [19]:
%%time
# read back binary arrow file from disk
reader = pa.RecordBatchFileReader(data_dir + file_name + '.arrow')
read_table = reader.read_all()


Wall time: 4.88 ms
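
The read is this fast because the Arrow file (random-access) format is built for zero-copy loading; it can also be memory-mapped so the OS pages data in on demand. A sketch, assuming pyarrow's pa.memory_map:

# memory-map the arrow file and read it without copying data into the heap
mmap = pa.memory_map(data_dir + file_name + '.arrow', 'r')
mmapped_table = pa.RecordBatchFileReader(mmap).read_all()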

In [20]:
read_table


Out[20]:
pyarrow.Table
Latitude: double
Longitude: double
Block: string
LocationDescription: string
PrimaryType: string
Description: string
Arrest: bool
Domestic: bool
Ward: double
Date: timestamp[us]
metadata
--------
{b'pandas': b'{"index_columns": ["Date"], "column_indexes": [{"name": null, "f'
            b'ield_name": null, "pandas_type": "unicode", "numpy_type": "objec'
            b't", "metadata": {"encoding": "UTF-8"}}], "columns": [{"name": "L'
            b'atitude", "field_name": "Latitude", "pandas_type": "float64", "n'
            b'umpy_type": "float64", "metadata": null}, {"name": "Longitude", '
            b'"field_name": "Longitude", "pandas_type": "float64", "numpy_type'
            b'": "float64", "metadata": null}, {"name": "Block", "field_name":'
            b' "Block", "pandas_type": "unicode", "numpy_type": "object", "met'
            b'adata": null}, {"name": "LocationDescription", "field_name": "Lo'
            b'cationDescription", "pandas_type": "unicode", "numpy_type": "obj'
            b'ect", "metadata": null}, {"name": "PrimaryType", "field_name": "'
            b'PrimaryType", "pandas_type": "unicode", "numpy_type": "object", '
            b'"metadata": null}, {"name": "Description", "field_name": "Descri'
            b'ption", "pandas_type": "unicode", "numpy_type": "object", "metad'
            b'ata": null}, {"name": "Arrest", "field_name": "Arrest", "pandas_'
            b'type": "bool", "numpy_type": "bool", "metadata": null}, {"name":'
            b' "Domestic", "field_name": "Domestic", "pandas_type": "bool", "n'
            b'umpy_type": "bool", "metadata": null}, {"name": "Ward", "field_n'
            b'ame": "Ward", "pandas_type": "float64", "numpy_type": "float64",'
            b' "metadata": null}, {"name": "Date", "field_name": "Date", "pand'
            b'as_type": "datetime", "numpy_type": "datetime64[ns]", "metadata"'
            b': null}], "pandas_version": "0.23.0"}'}