In [1]:
import dask.distributed
import dask.dataframe as dd
import pandas as pd
import numpy as np
import traceback
In [2]:
import geopandas
from shapely.geometry import Point
In [3]:
client = dask.distributed.Client()
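Calling `Client()` with no arguments starts a local scheduler plus worker processes and registers itself as the default executor for subsequent dask computations. A sketch of a more explicit setup, with illustrative resource limits that the original run does not specify:

# Illustrative only -- the notebook itself relies on the defaults.
client = dask.distributed.Client(n_workers=4, threads_per_worker=2,
                                 memory_limit='4GB')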
In [4]:
def assign_taxi_zones(df, lon_var, lat_var, locid_var):
    """Join a DataFrame with the NYC Taxi Zones shapefile.

    This function takes longitude values provided by `lon_var` and latitude
    values provided by `lat_var` in DataFrame `df`, and performs a spatial
    join with the NYC taxi_zones shapefile.

    The shapefile path is hard-coded because this function assumes
    latitude/longitude coordinates. It also assumes that latitude=0,
    longitude=0 cannot be a real data point, which is reasonable for a
    New York dataset but not for a global one.

    Only rows where `df[lon_var]` and `df[lat_var]` are reasonably near New
    York, and `df[locid_var]` is np.nan, are updated.

    Parameters
    ----------
    df : pandas.DataFrame or dask.DataFrame
        DataFrame containing latitude, longitude, and location_id columns.
    lon_var : string
        Name of column in `df` containing longitude values. Invalid values
        should be np.nan.
    lat_var : string
        Name of column in `df` containing latitude values. Invalid values
        should be np.nan.
    locid_var : string
        Name of column in `df` containing taxi_zone location ids. Rows with
        valid, nonzero values are not overwritten.
    """
    localdf = df[[lon_var, lat_var, locid_var]].copy()
    localdf[lon_var] = localdf[lon_var].fillna(value=0.)
    localdf[lat_var] = localdf[lat_var].fillna(value=0.)
    localdf['replace_locid'] = (localdf[locid_var].isnull()
                                & (localdf[lon_var] != 0.)
                                & (localdf[lat_var] != 0.))

    if np.any(localdf['replace_locid']):
        shape_df = geopandas.read_file('../shapefiles/taxi_zones_latlon.shp')
        shape_df.drop(['OBJECTID', "Shape_Area", "Shape_Leng", "borough", "zone"],
                      axis=1, inplace=True)
        try:
            local_gdf = geopandas.GeoDataFrame(
                localdf, crs={'init': 'epsg:4326'},
                geometry=[Point(xy) for xy in
                          zip(localdf[lon_var], localdf[lat_var])])
            local_gdf = geopandas.sjoin(
                local_gdf, shape_df, how='left', op='intersects')
            # One point can intersect more than one zone -- for example, if
            # it lies on the boundary between two zones. Deduplicate by
            # keeping the first match.
            local_gdf = local_gdf[~local_gdf.index.duplicated(keep='first')]
            # Restore the original ids for rows not flagged for replacement.
            local_gdf.LocationID.values[~local_gdf.replace_locid] = (
                (local_gdf[locid_var])[~local_gdf.replace_locid]).values
            return local_gdf.LocationID.rename(locid_var)
        except ValueError as ve:
            print(ve)
            traceback.print_exc()
            return df[locid_var]
    else:
        return df[locid_var]
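As a quick sanity check before wiring the function into dask, it can be exercised on a tiny pandas DataFrame. A minimal sketch, assuming the shapefile path inside the function resolves; the coordinates and the pre-assigned id 161.0 are illustrative:

# Two rows: the first has no location id and gets looked up spatially;
# the second already has an id and must be left untouched.
test_df = pd.DataFrame({
    'lon': [-73.985, -73.970],
    'lat': [40.748, 40.764],
    'locid': [np.nan, 161.0],
})
print(assign_taxi_zones(test_df, 'lon', 'lat', 'locid'))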
In [5]:
df1 = dd.read_parquet('/bigdata/citibike.parquet')
In [6]:
df1['start_taxizone_id'] = np.nan
df1['end_taxizone_id'] = np.nan
In [7]:
df1.start_station_id.count().compute()
Out[7]:
In [8]:
df1.head()
Out[8]:
In [9]:
df1['start_taxizone_id'] = df1.map_partitions(
    assign_taxi_zones, "start_station_longitude", "start_station_latitude",
    "start_taxizone_id", meta=('start_taxizone_id', np.float64))
df1['end_taxizone_id'] = df1.map_partitions(
    assign_taxi_zones, "end_station_longitude", "end_station_latitude",
    "end_taxizone_id", meta=('end_taxizone_id', np.float64))
In [10]:
df1.head()
Out[10]:
In [11]:
df1.to_parquet('/bigdata/citibike_locid.parquet', has_nulls=True, compression="SNAPPY",
               object_encoding='json')
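`has_nulls` and `object_encoding` are fastparquet-specific options, so this call assumes dask is dispatching to the fastparquet engine. A sketch of the same write with the engine named explicitly:

# Assumes fastparquet is installed; behaviour is otherwise identical.
df1.to_parquet('/bigdata/citibike_locid.parquet', engine='fastparquet',
               has_nulls=True, compression='SNAPPY',
               object_encoding='json')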
In [14]:
df1 = dd.read_parquet('/bigdata/citibike_locid.parquet')
In [15]:
df1.start_station_id.count().compute()
Out[15]:
In [16]:
df2 = df1[df1.start_taxizone_id.notnull() & df1.end_taxizone_id.notnull()]
In [17]:
df2.start_station_id.count().compute()
Out[17]:
In [18]:
df2.to_parquet('/bigdata/citibike_locid_cleaned.parquet', has_nulls=True, compression="SNAPPY",
               object_encoding='json')
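The next three cells come from a separate PySpark session (note the cell numbering restarting at In [10]), where `spark` is an active SparkSession. A minimal sketch of how such a session might be created; the app name is an assumption, as the original notebook does not show this step:

# Hypothetical setup -- the original notebook does not show how `spark`
# was created.
from pyspark.sql import SparkSession

spark = (SparkSession.builder
         .appName('citibike-partition')  # assumed name
         .getOrCreate())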
In [10]:
df2 = spark.read.parquet('/bigdata/citibike_locid_cleaned.parquet')
In [12]:
help(df2.write.parquet)
In [13]:
df2.write.parquet('/bigdata/citibike_locid_cleaned_partitioned.parquet',
                  mode='overwrite', partitionBy='start_taxizone_id',
                  compression='SNAPPY')
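Partitioning by `start_taxizone_id` lays the files out in one directory per zone, so later reads that filter on that column only scan the matching directories. A sketch of such a pruned read in the same Spark session; 161 is an illustrative zone id:

# Only the files under start_taxizone_id=161.0/ are scanned here.
n = (spark.read
     .parquet('/bigdata/citibike_locid_cleaned_partitioned.parquet')
     .filter('start_taxizone_id = 161')
     .count())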
In [19]:
df2 = dd.read_parquet('/bigdata/citibike_locid_cleaned.parquet')
In [23]:
%matplotlib inline
import matplotlib.pyplot as plt
In [26]:
zz = df2.start_taxizone_id.unique().values.compute()
In [28]:
sorted(zz)
Out[28]: