One problem I came across when analyzing the New York City Taxi Dataset, is that from 2009 to June 2016, both the starting and stopping locations of taxi trips were given as longitude and latitude points. After July 2016, to provide a degree of anonymity when releasing data to the public, the Taxi and Limousine Commission (TLC) only provides the starting and ending "taxi zones" of a trip, and a shapefile that specifies the boundaries, available here. Let's load this up in Geopandas, and set the coordinate system to 'epsg:4326', which is latitude and longitude coordinates.
In [3]:
# <!-- collapse=True -->
import bokeh, bokeh.plotting, bokeh.models
from bokeh.io import output_notebook, show
output_notebook()
import geopandas as gpd
from shapely.geometry import Point
import urllib
import dask.dataframe as dd
import dask.distributed
import numpy as np
import sklearn.preprocessing
client = dask.distributed.Client()
coord_system = {'init': 'epsg:4326'}
df = gpd.read_file('../shapefiles/taxi_zones.shp').to_crs(coord_system)
df = df.drop(['Shape_Area', 'Shape_Leng', 'OBJECTID'], axis=1)
df.head()
Out[3]:
We see that the geometry column consists of polygons (from Shapely) that have vertices defined by longitude and latitude points. Let's plot using bokeh, in order of ascending LocationID.
In [4]:
# <!-- collapse=True -->
gjds = bokeh.models.GeoJSONDataSource(geojson=df.to_json())
TOOLS = "pan,wheel_zoom,reset,hover,save"
p = bokeh.plotting.figure(title="NYC Taxi Districts", tools=TOOLS,
x_axis_location=None, y_axis_location=None,
responsive=True)
color_mapper = bokeh.models.LinearColorMapper(palette=bokeh.palettes.Viridis256)
p.patches('xs', 'ys',
fill_color={'field': 'LocationID', 'transform': color_mapper},
fill_alpha=1., line_color="black", line_width=0.5,
source=gjds)
p.grid.grid_line_color = None
hover = p.select_one(bokeh.models.HoverTool)
hover.point_policy = "follow_mouse"
hover.tooltips = u"""
<div>
<div class="bokeh_hover_tooltip">Name : @zone</div>
<div class="bokeh_hover_tooltip">Borough : @borough</div>
<div class="bokeh_hover_tooltip">Zone ID : @LocationID</div>
<div class="bokeh_hover_tooltip">(Lon, Lat) : ($x ˚E, $y ˚N)</div>
</div>
"""
p.circle([-73.966,], [40.78,], size=10, fill_color='magenta', line_color='yellow', line_width=1, alpha=1.0)
show(p)
This is a familiar map of New York, with 262 taxi districts shown, colored by the id of the taxi district. I have added a random point (-73.966˚E, 40.78˚N) in magenta, which happens to fall in the middle of Central Park. Assigning a point as within a taxi zone is something humans can do easily, but on a computer it requires solving the point in polygon problem. Luckily the Shapely library provides an easy interface to such geometric operations in Python. But, point in polygon is computationally expensive, and using the Shapely library on 2.4 billion (latitude, longitude) pairs to assign taxi zones as in the NYC Taxi Dataset would take a modern single core cpu about four years. To speed this up, we calculate the bounding boxes for each taxi zone, which looks like:
In [5]:
# <!-- collapse=True -->
df2 = df.copy()
df2['geometry'] = df.geometry.envelope
df2['borough_categ'] = sklearn.preprocessing.LabelEncoder().fit_transform(df2['borough'])
gjds2 = bokeh.models.GeoJSONDataSource(geojson=df2.to_json())
TOOLS = "pan,wheel_zoom,reset,hover,save"
p = bokeh.plotting.figure(title="NYC Taxi Districts Bounding Boxes", tools=TOOLS,
x_axis_location=None, y_axis_location=None,
responsive=True)
color_mapper = bokeh.models.LinearColorMapper(palette=bokeh.palettes.Viridis256)
p.patches('xs', 'ys',
fill_color={'field': 'borough_categ', 'transform': color_mapper},
fill_alpha=0.7, line_color="black", line_width=0.5,
source=gjds2)
p.grid.grid_line_color = None
hover = p.select_one(bokeh.models.HoverTool)
hover.point_policy = "follow_mouse"
hover.tooltips = u"""
<div>
<div class="bokeh_hover_tooltip">Name : @zone</div>
<div class="bokeh_hover_tooltip">Borough : @borough</div>
<div class="bokeh_hover_tooltip">Zone ID : @LocationID</div>
<div class="bokeh_hover_tooltip">(Lon, Lat) : ($x ˚E, $y ˚N)</div>
</div>
"""
p.circle([-73.966,], [40.78,], size=10, fill_color='magenta', line_color='yellow', line_width=1, alpha=1.0)
show(p)
Now, given a (longitude, latitude) coordinate pair, bounding boxes that contain that pair can be efficiently calculated with an R-tree. Only the polygons (taxi zones) that have bounding boxes that contain the coordinate pair need to be examined, and then the point in Polygon is solved for those (hopefully) few taxi zones. This reduces computation by a factor of about 100-1000. This process, assigning coordinate pairs to taxi zones is one example of a spatial join. Geopandas provides a nice interface to efficient spatial joins in Python, and it takes care of calculating bounding boxes and R-trees for you, as this snippet shows.
In [6]:
gpd.sjoin(gpd.GeoDataFrame(crs={'init': 'epsg:4326'}, geometry=[Point(-73.966, 40.78)]),
df,
how='left', op='within')
Out[6]:
This does the merge for a single point (drawn in magenta) on maps above, and correctly identifies it in Central Park
In my NYC transit project, I download and process the Taxi dataset. Here I load up a single file from the taxi dataset (May 2016) into Dask, and show the first few rows and a few columns. The file is a bit large at 1.8GB, and Dask chooses to divide up the dataframe into 30 partitions for efficient calculations. Each partition is a pandas DataFrame, and dask takes care of all the logic to view the combination as a single DataFrame. Here are a few columns.
In [7]:
# <!-- collapse=True -->
!wget --quiet --continue 'https://s3.amazonaws.com/nyc-tlc/trip+data/yellow_tripdata_2016-05.csv'
trips = dd.read_csv('yellow_tripdata_2016-05.csv')
print("There are {} partitions. ".format(trips.npartitions))
trips[('tpep_pickup_datetime pickup_longitude'
' pickup_latitude').split()].head()
Out[7]:
In [8]:
# <!-- collapse=True -->
trips[('tpep_dropoff_datetime dropoff_longitude'
' dropoff_latitude').split()].head()
Out[8]:
So each trip has pickup and dropoff (longitude, latitude) coordinate pairs. Just to give you a feel for the data, I plot the start and end locations of the first trip, ending in the East Village. Driving directions come with a great deal of additional complexity, so here I just plot an arrow, as the crow flies. A spatial join identifies the taxi zones as Clinton East and East Village.
In [9]:
# <!-- collapse=True -->
x, y = (trips[('tpep_pickup_datetime pickup_longitude'
' pickup_latitude').split()].head()).iloc[0, 1:3]
x2, y2 = (trips[('tpep_dropoff_datetime dropoff_longitude'
' dropoff_latitude').split()].head()).iloc[0, 1:3]
df['borough_categ'] = sklearn.preprocessing.LabelEncoder().fit_transform(df['borough'])
gjds = bokeh.models.GeoJSONDataSource(geojson=df.to_json())
TOOLS = "pan,wheel_zoom,reset,hover,save"
p = bokeh.plotting.figure(title="NYC Taxi Districts", tools=TOOLS,
x_axis_location=None, y_axis_location=None,
responsive=True)
color_mapper = bokeh.models.LinearColorMapper(palette=bokeh.palettes.Viridis256)
p.patches('xs', 'ys',
fill_color={'field': 'borough_categ', 'transform': color_mapper},
fill_alpha=1., line_color="black", line_width=0.5,
source=gjds)
p.grid.grid_line_color = None
hover = p.select_one(bokeh.models.HoverTool)
hover.point_policy = "follow_mouse"
hover.tooltips = u"""
<div>
<div class="bokeh_hover_tooltip">Name : @zone</div>
<div class="bokeh_hover_tooltip">Borough : @borough</div>
<div class="bokeh_hover_tooltip">Zone ID : @LocationID</div>
<div class="bokeh_hover_tooltip">(Lon, Lat) : ($x ˚E, $y ˚N)</div>
</div>
"""
p.circle([x,], [y,], size=13,
fill_color='aqua', line_color='black',
line_width=1, alpha=1.0)
p.circle([x2,], [y2,], size=13,
fill_color='aqua', line_color='black',
line_width=1, alpha=1.0)
p.add_layout(bokeh.models.Arrow(line_color='orange', line_width=4,
end=bokeh.models.OpenHead(line_color="orange", line_width=4),
x_start=x, y_start=y, x_end=x2, y_end=y2))
p.x_range=bokeh.models.Range1d(-74.02, -73.92)
p.y_range=bokeh.models.Range1d(40.70, 40.78)
show(p)
In [10]:
gpd.sjoin(gpd.GeoDataFrame(crs={'init': 'epsg:4326'},
geometry=[Point(x, y), Point(x2, y2)]),
df,
how='left', op='within')
Out[10]:
So, Dask DataFrames are just collections of Pandas DataFrames, and I know how to perform a spatial join on a Pandas DataFrame. Let's take advantage of Dask's map_partitions function to do a spatial join with the taxi zones on every partition. Here is the function to do the spatial join, given a Pandas DataFrame, and the names of the longitude, latitude, and taxizone id columns.
In [11]:
# <!-- collapse=False -->
def assign_taxi_zones(df, lon_var, lat_var, locid_var):
"""Joins DataFrame with Taxi Zones shapefile.
This function takes longitude values provided by `lon_var`, and latitude
values provided by `lat_var` in DataFrame `df`, and performs a spatial join
with the NYC taxi_zones shapefile.
The shapefile is hard coded in, as this function makes a hard assumption of
latitude and longitude coordinates. It also assumes latitude=0 and
longitude=0 is not a datapoint that can exist in your dataset. Which is
reasonable for a dataset of New York, but bad for a global dataset.
Only rows where `df.lon_var`, `df.lat_var` are reasonably near New York,
and `df.locid_var` is set to np.nan are updated.
Parameters
----------
df : pandas.DataFrame or dask.DataFrame
DataFrame containing latitudes, longitudes, and location_id columns.
lon_var : string
Name of column in `df` containing longitude values. Invalid values
should be np.nan.
lat_var : string
Name of column in `df` containing latitude values. Invalid values
should be np.nan
locid_var : string
Name of series to return.
"""
import geopandas
from shapely.geometry import Point
# make a copy since we will modify lats and lons
localdf = df[[lon_var, lat_var]].copy()
# missing lat lon info is indicated by nan. Fill with zero
# which is outside New York shapefile.
localdf[lon_var] = localdf[lon_var].fillna(value=0.)
localdf[lat_var] = localdf[lat_var].fillna(value=0.)
shape_df = geopandas.read_file('../shapefiles/taxi_zones.shp')
shape_df.drop(['OBJECTID', "Shape_Area", "Shape_Leng", "borough", "zone"],
axis=1, inplace=True)
shape_df = shape_df.to_crs({'init': 'epsg:4326'})
try:
local_gdf = geopandas.GeoDataFrame(
localdf, crs={'init': 'epsg:4326'},
geometry=[Point(xy) for xy in
zip(localdf[lon_var], localdf[lat_var])])
local_gdf = geopandas.sjoin(
local_gdf, shape_df, how='left', op='within')
return local_gdf.LocationID.rename(locid_var)
except ValueError as ve:
print(ve)
print(ve.stacktrace())
series = localdf[lon_var]
series = np.nan
return series
Using the map_partitions function, I apply the spatial join to each of the Pandas DataFrames that make up the Dask DataFrame. For simplicity, I just call the function twice, once for pickup locations, and once for dropoff locations. To assist dask in determining schema of returned data, we specify it as a column of floats (allowing for NaN values).
In [12]:
# <!-- collapse=False -->
trips['pickup_taxizone_id'] = trips.map_partitions(
assign_taxi_zones, "pickup_longitude", "pickup_latitude",
"pickup_taxizone_id", meta=('pickup_taxizone_id', np.float64))
trips['dropoff_taxizone_id'] = trips.map_partitions(
assign_taxi_zones, "dropoff_longitude", "dropoff_latitude",
"dropoff_taxizone_id", meta=('dropoff_taxizone_id', np.float64))
trips[['pickup_taxizone_id', 'dropoff_taxizone_id']].head()
Out[12]:
At this point, the trips Dask DataFrame will have valid taxizone_id information. Lets save this data to Parquet, which is a columnar format that is well supported in Dask and Apache Spark. This prevents Dask from recalculating the spatial join (very expensive) every time an operation on the trips DataFrame is required.
In [13]:
trips.to_parquet('trips_2016-05.parquet', has_nulls=True, object_encoding='json', compression="SNAPPY")
trips = dd.read_parquet('trips_2016-05.parquet', columns=['pickup_taxizone_id', 'dropoff_taxizone_id'])
To close this post out, I'll produce a heatmap of taxi dropoff locations, aggregated by taxi zone using Dask. Unsurprisingly (for New Yorkers at least) the vast majority of taxi dropoffs tend to be in Downtown and Midtown Manhattan. I will analyze this dataset further in future posts.
In [14]:
# <!-- collapse=True -->
counts = trips.groupby('dropoff_taxizone_id').count().compute()
counts.columns = ['N']
counts2 = df.merge(counts, left_on='LocationID', right_index=True, how='left')
gjds = bokeh.models.GeoJSONDataSource(geojson=counts2.to_json())
TOOLS = "pan,wheel_zoom,reset,hover,save"
p = bokeh.plotting.figure(title="NYC Taxi Dropoffs Heatmap", tools=TOOLS,
x_axis_location=None, y_axis_location=None,
responsive=True)
color_mapper = bokeh.models.LogColorMapper(palette=bokeh.palettes.Viridis256, low=1, high=500000)
p.patches('xs', 'ys',
fill_color={'field': 'N', 'transform': color_mapper},
fill_alpha=1., line_color="black", line_width=0.5,
source=gjds)
p.grid.grid_line_color = None
hover = p.select_one(bokeh.models.HoverTool)
hover.point_policy = "follow_mouse"
hover.tooltips = u"""
<div>
<div class="bokeh_hover_tooltip">Name : @zone</div>
<div class="bokeh_hover_tooltip">Borough : @borough</div>
<div class="bokeh_hover_tooltip">Trips Start : @N</div>
</div>
"""
color_bar = bokeh.models.ColorBar(
color_mapper=color_mapper, orientation='horizontal',
ticker=bokeh.models.FixedTicker(ticks=[3, 10, 30, 100, 300, 1000, 3000, 10000, 30000, 100000, 300000]),
formatter=bokeh.models.PrintfTickFormatter(format='%d'),
label_standoff=12, border_line_color=None, location=(0,0))
p.add_layout(color_bar, 'below')
show(p)
In this post I described the process of a Spatial Join, and doing the Spatial Join at scale on a cluster using Dask and Pandas. I glossed over some details that are important for the full New York Taxi Dataset, but my full code is available here on Github. In future posts, I will analyze this data more thoroughly, and possibly look into releasing the processed data as a parquet file for others to analyze.
The spatial join as written above with GeoPandas, using the New York Taxi Dataset, can assign taxi zones to approxmately 40 million taxi trips per hour on a 4 GHz 4-core i5 system. A lot of the code that supports this join is some amalgamation of Python and wrapped C code.
This is approxmately a factor of two slower than performing the same spatial join in highly optimized PostGIS C/C++ code. However, PostGIS does not efficiently use multiple cores (at least without multiple spatial joins running simultaneously), and more importantly, the network and serialization overhead of a round trip to the PostgreSQL database puts PostgreSQL/PostGIS at roughly the same speed as the GeoPandas implementation I describe in this post, with vastly more moving parts to break.
Basically, Python is actually pretty fast for these kinds of data structure operations.