In [1]:
import codecs, json
import dask.dataframe as dd
import numpy as np
import pandas as pd
import pyarrow as pa
import pyarrow.parquet as pq
In [2]:
year = '2018'
data_dir = '../data/' + year + '/'
file_name = 'chicago-crimes-' + year
In [3]:
%%time
# set input data file path
parquet_data_dir = data_dir + 'crimes-' + year + '.snappy.parq'
print('Loading crime data from: {}'.format(parquet_data_dir))
# load crimes parquet data into dask df
crimes = dd.read_parquet(parquet_data_dir, index='Date')
# load all data into memory
crimes = crimes.persist()
print('Crime data loaded into memory.')
# log records count and data frame stats
print('Crime data stats:')
print('---------------------------------------')
print('{:,} total records in {} partitions'.format(len(crimes), crimes.npartitions))
print('DataFrame size: {:,}'.format(crimes.size.compute()))
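As a quick sanity check after persisting, the column dtypes and an approximate in-memory footprint can be inspected. This is a minimal sketch, assuming the crimes frame from the cell above and that memory_usage(deep=True) is available in the installed dask version.
# illustrative check: dtypes and approximate memory usage of the persisted frame
# (deep=True accounts for object/string columns; the sum triggers a compute)
print(crimes.dtypes)
mem_bytes = crimes.memory_usage(deep=True).sum().compute()
print('Approximate memory usage: {:,.1f} MB'.format(mem_bytes / 1e6))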
In [4]:
crimes
Out[4]:
In [5]:
# get crime geo data for mapping, drop na
crime_geo = crimes[['PrimaryType',
                    'Block',
                    'Description',
                    'LocationDescription',
                    'CommunityArea',
                    'Arrest',
                    'Domestic',
                    'Latitude',
                    'Longitude',
                    'Ward']].dropna()
print('All Crimes:', len(crime_geo))
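To see how many records the dropna() step removed, the raw and geo-filtered counts can be compared directly. A minimal sketch, assuming the crimes and crime_geo frames from the cells above.
# illustrative check: records dropped because a selected geo column was missing
total_records = len(crimes)
with_geo = len(crime_geo)
print('Dropped {:,} of {:,} records ({:.1%})'.format(
    total_records - with_geo, total_records,
    (total_records - with_geo) / total_records))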
In [6]:
# writes crimes data to a compact json file
def to_json_file(file_path, data):
    with codecs.open(file_path, 'w', encoding='utf-8') as json_file:
        json.dump(data, json_file,
                  separators=(',', ':'), sort_keys=False, indent=0)
In [7]:
%%time
# output crimes data in raw json to see how large it gets
geo_data_columns = ['Latitude', 'Longitude', 'Block', 'LocationDescription',
                    'PrimaryType', 'Description', 'Arrest', 'Domestic', 'Ward']
to_json_file(data_dir + file_name + '.json',
             crime_geo[geo_data_columns].compute().values.tolist())
In [8]:
%%time
# dish it out in snappy parquet for comparison
crime_geo.to_parquet(data_dir + file_name + '.parquet', compression='SNAPPY')
In [9]:
# create pandas dataframe for conversion to arrow
crime_geo_df = crime_geo[geo_data_columns].compute()
crime_geo_df.info()
In [10]:
# convert pandas data frame to arrow table
crime_geo_table = pa.Table.from_pandas(crime_geo_df)
crime_geo_table
Out[10]:
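The converted table's schema and shape can be checked against the source pandas frame. A minimal sketch using standard pyarrow Table attributes.
# illustrative check: compare arrow table shape with the source pandas frame
print(crime_geo_table.schema)
print('{:,} rows x {} columns (pandas: {:,} rows)'.format(
    crime_geo_table.num_rows, crime_geo_table.num_columns, len(crime_geo_df)))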
In [11]:
%%time
# write arrow table to a single parquet file, just to test it
pq.write_table(crime_geo_table, data_dir + file_name + '.parq')
In [12]:
%%time
# read the parquet file created with arrow back with dask as a compatibility check
ddf = dd.read_parquet(data_dir + file_name + '.parq', index='Date')
In [13]:
print('{:,} total records in {} partitions'.format(len(ddf), ddf.npartitions))
print('DataFrame size: {:,}'.format(ddf.size.compute()))
ddf
Out[13]:
In [14]:
%%time
# read parquet file with arrow
table = pq.read_table(data_dir + file_name + '.parq')
In [15]:
table
Out[15]:
In [16]:
%%time
# convert it to pandas data frame
df = table.to_pandas()
In [17]:
df.info()
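To confirm the pandas -> arrow -> parquet -> pandas round trip is lossless, the reloaded frame can be compared with the original. A minimal sketch; check_dtype is relaxed and the index reset since minor dtype or index differences can appear depending on the pyarrow version.
# illustrative check: verify the round-tripped frame matches the original
import pandas.testing as pdt
pdt.assert_frame_equal(df.reset_index(drop=True),
                       crime_geo_df.reset_index(drop=True),
                       check_dtype=False)
print('Round-tripped data frame matches the original.')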
In [18]:
%%time
# write arrow table to disk in the arrow (random access) file format
writer = pa.RecordBatchFileWriter(data_dir + file_name + '.arrow', table.schema)
writer.write_table(table)
writer.close()
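With all four outputs on disk, their sizes can be compared to see how much the binary formats save over raw JSON. A minimal sketch, assuming the paths written above; note that dask's to_parquet output is a directory, so its part files are summed. The path_size helper is illustrative, not part of any library.
# illustrative comparison of on-disk sizes for the four output formats
import os

def path_size(path):
    # sum part files if the path is a directory (dask to_parquet output)
    if os.path.isdir(path):
        return sum(os.path.getsize(os.path.join(path, f)) for f in os.listdir(path))
    return os.path.getsize(path)

for ext in ['.json', '.parquet', '.parq', '.arrow']:
    out_path = data_dir + file_name + ext
    print('{}: {:,.1f} MB'.format(ext, path_size(out_path) / 1e6))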
In [19]:
%%time
# read back binary arrow file from disk
reader = pa.RecordBatchFileReader(data_dir + file_name + '.arrow')
read_table = reader.read_all()
In [20]:
read_table
Out[20]:
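An alternative read path that avoids copying the file contents into process memory is to memory-map the arrow file before opening it. A minimal sketch using pyarrow's memory_map and ipc.open_file, which should be available in recent pyarrow releases; mapped_table is an illustrative name.
# illustrative: memory-map the arrow file and read it with (near) zero-copy
with pa.memory_map(data_dir + file_name + '.arrow', 'r') as source:
    mapped_table = pa.ipc.open_file(source).read_all()
print('{:,} rows read from memory-mapped file'.format(mapped_table.num_rows))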