Part 1 : A Gentle Introduction to the Spatial Join

One problem I came across when analyzing the New York City Taxi Dataset, is that from 2009 to June 2016, both the starting and stopping locations of taxi trips were given as longitude and latitude points. After July 2016, to provide a degree of anonymity when releasing data to the public, the Taxi and Limousine Commission (TLC) only provides the starting and ending "taxi zones" of a trip, and a shapefile that specifies the boundaries, available here. Let's load this up in Geopandas, and set the coordinate system to 'epsg:4326', which is latitude and longitude coordinates.


In [3]:
# <!-- collapse=True -->
import bokeh, bokeh.plotting, bokeh.models
from bokeh.io import output_notebook, show
output_notebook()

import geopandas as gpd
from shapely.geometry import Point
import urllib
import dask.dataframe as dd
import dask.distributed
import numpy as np

import sklearn.preprocessing

client = dask.distributed.Client()

coord_system = {'init': 'epsg:4326'}
df = gpd.read_file('../shapefiles/taxi_zones.shp').to_crs(coord_system)
df = df.drop(['Shape_Area', 'Shape_Leng', 'OBJECTID'], axis=1)
df.head()


Loading BokehJS ...
Out[3]:
LocationID borough geometry zone
0 1 EWR POLYGON ((-74.18445299999996 40.6949959999999,... Newark Airport
1 2 Queens (POLYGON ((-73.82337597260663 40.6389870471767... Jamaica Bay
2 3 Bronx POLYGON ((-73.84792614099985 40.87134223399991... Allerton/Pelham Gardens
3 4 Manhattan POLYGON ((-73.97177410965318 40.72582128133705... Alphabet City
4 5 Staten Island POLYGON ((-74.17421738099989 40.56256808599987... Arden Heights

We see that the geometry column consists of polygons (from Shapely) that have vertices defined by longitude and latitude points. Let's plot using bokeh, in order of ascending LocationID.


In [4]:
# <!-- collapse=True -->
gjds = bokeh.models.GeoJSONDataSource(geojson=df.to_json())
TOOLS = "pan,wheel_zoom,reset,hover,save"

p = bokeh.plotting.figure(title="NYC Taxi Districts", tools=TOOLS,
    x_axis_location=None, y_axis_location=None, 
    responsive=True)

color_mapper = bokeh.models.LinearColorMapper(palette=bokeh.palettes.Viridis256)

p.patches('xs', 'ys', 
          fill_color={'field': 'LocationID', 'transform': color_mapper},
          fill_alpha=1., line_color="black", line_width=0.5,          
          source=gjds)

p.grid.grid_line_color = None

hover = p.select_one(bokeh.models.HoverTool)
hover.point_policy = "follow_mouse"
hover.tooltips = u"""
<div> 
    <div class="bokeh_hover_tooltip">Name : @zone</div>
    <div class="bokeh_hover_tooltip">Borough : @borough</div>
    <div class="bokeh_hover_tooltip">Zone ID : @LocationID</div>
    <div class="bokeh_hover_tooltip">(Lon, Lat) : ($x ˚E, $y ˚N)</div>
</div>
"""

p.circle([-73.966,], [40.78,], size=10, fill_color='magenta', line_color='yellow', line_width=1, alpha=1.0)

show(p)


This is a familiar map of New York, with 262 taxi districts shown, colored by the id of the taxi district. I have added a random point (-73.966˚E, 40.78˚N) in magenta, which happens to fall in the middle of Central Park. Assigning a point as within a taxi zone is something humans can do easily, but on a computer it requires solving the point in polygon problem. Luckily the Shapely library provides an easy interface to such geometric operations in Python. But, point in polygon is computationally expensive, and using the Shapely library on 2.4 billion (latitude, longitude) pairs to assign taxi zones as in the NYC Taxi Dataset would take a modern single core cpu about four years. To speed this up, we calculate the bounding boxes for each taxi zone, which looks like:


In [5]:
# <!-- collapse=True -->
df2 = df.copy()
df2['geometry'] = df.geometry.envelope
df2['borough_categ'] = sklearn.preprocessing.LabelEncoder().fit_transform(df2['borough'])
gjds2 = bokeh.models.GeoJSONDataSource(geojson=df2.to_json())

TOOLS = "pan,wheel_zoom,reset,hover,save"

p = bokeh.plotting.figure(title="NYC Taxi Districts Bounding Boxes", tools=TOOLS,
    x_axis_location=None, y_axis_location=None, 
    responsive=True)

color_mapper = bokeh.models.LinearColorMapper(palette=bokeh.palettes.Viridis256)

p.patches('xs', 'ys', 
          fill_color={'field': 'borough_categ', 'transform': color_mapper},
          fill_alpha=0.7, line_color="black", line_width=0.5,          
          source=gjds2)

p.grid.grid_line_color = None

hover = p.select_one(bokeh.models.HoverTool)
hover.point_policy = "follow_mouse"
hover.tooltips = u"""
<div> 
    <div class="bokeh_hover_tooltip">Name : @zone</div>
    <div class="bokeh_hover_tooltip">Borough : @borough</div>
    <div class="bokeh_hover_tooltip">Zone ID : @LocationID</div>
    <div class="bokeh_hover_tooltip">(Lon, Lat) : ($x ˚E, $y ˚N)</div>
</div>
"""

p.circle([-73.966,], [40.78,], size=10, fill_color='magenta', line_color='yellow', line_width=1, alpha=1.0)

show(p)


Now, given a (longitude, latitude) coordinate pair, bounding boxes that contain that pair can be efficiently calculated with an R-tree. Only the polygons (taxi zones) that have bounding boxes that contain the coordinate pair need to be examined, and then the point in Polygon is solved for those (hopefully) few taxi zones. This reduces computation by a factor of about 100-1000. This process, assigning coordinate pairs to taxi zones is one example of a spatial join. Geopandas provides a nice interface to efficient spatial joins in Python, and it takes care of calculating bounding boxes and R-trees for you, as this snippet shows.


In [6]:
gpd.sjoin(gpd.GeoDataFrame(crs={'init': 'epsg:4326'}, geometry=[Point(-73.966, 40.78)]), 
          df,
          how='left', op='within')


Out[6]:
geometry index_right LocationID borough zone
0 POINT (-73.96599999999999 40.78) 42 43 Manhattan Central Park

This does the merge for a single point (drawn in magenta) on maps above, and correctly identifies it in Central Park

Part 2 : Spatial Joins at scale using Dask

In my NYC transit project, I download and process the Taxi dataset. Here I load up a single file from the taxi dataset (May 2016) into Dask, and show the first few rows and a few columns. The file is a bit large at 1.8GB, and Dask chooses to divide up the dataframe into 30 partitions for efficient calculations. Each partition is a pandas DataFrame, and dask takes care of all the logic to view the combination as a single DataFrame. Here are a few columns.


In [7]:
# <!-- collapse=True -->
!wget --quiet --continue 'https://s3.amazonaws.com/nyc-tlc/trip+data/yellow_tripdata_2016-05.csv'

trips = dd.read_csv('yellow_tripdata_2016-05.csv')
print("There are {} partitions. ".format(trips.npartitions))
trips[('tpep_pickup_datetime pickup_longitude'
        ' pickup_latitude').split()].head()


There are 30 partitions. 
Out[7]:
tpep_pickup_datetime pickup_longitude pickup_latitude
0 2016-05-01 00:00:00 -73.985901 40.768040
1 2016-05-01 00:00:00 -73.991577 40.744751
2 2016-05-01 00:00:00 -73.993073 40.741573
3 2016-05-01 00:00:00 -73.991943 40.684601
4 2016-05-01 00:00:00 -74.005280 40.740192

In [8]:
# <!-- collapse=True -->
trips[('tpep_dropoff_datetime dropoff_longitude'
        ' dropoff_latitude').split()].head()


Out[8]:
tpep_dropoff_datetime dropoff_longitude dropoff_latitude
0 2016-05-01 00:17:31 -73.983986 40.730099
1 2016-05-01 00:07:31 -73.975700 40.765469
2 2016-05-01 00:07:01 -73.980995 40.744633
3 2016-05-01 00:19:47 -74.002258 40.733002
4 2016-05-01 00:06:39 -73.997498 40.737564

So each trip has pickup and dropoff (longitude, latitude) coordinate pairs. Just to give you a feel for the data, I plot the start and end locations of the first trip, ending in the East Village. Driving directions come with a great deal of additional complexity, so here I just plot an arrow, as the crow flies. A spatial join identifies the taxi zones as Clinton East and East Village.


In [9]:
# <!-- collapse=True -->

x, y = (trips[('tpep_pickup_datetime pickup_longitude'
        ' pickup_latitude').split()].head()).iloc[0, 1:3]
x2, y2 = (trips[('tpep_dropoff_datetime dropoff_longitude'
        ' dropoff_latitude').split()].head()).iloc[0, 1:3]

df['borough_categ'] = sklearn.preprocessing.LabelEncoder().fit_transform(df['borough'])
gjds = bokeh.models.GeoJSONDataSource(geojson=df.to_json())
TOOLS = "pan,wheel_zoom,reset,hover,save"

p = bokeh.plotting.figure(title="NYC Taxi Districts", tools=TOOLS,
    x_axis_location=None, y_axis_location=None, 
    responsive=True)

color_mapper = bokeh.models.LinearColorMapper(palette=bokeh.palettes.Viridis256)

p.patches('xs', 'ys', 
          fill_color={'field': 'borough_categ', 'transform': color_mapper},
          fill_alpha=1., line_color="black", line_width=0.5,          
          source=gjds)

p.grid.grid_line_color = None

hover = p.select_one(bokeh.models.HoverTool)
hover.point_policy = "follow_mouse"
hover.tooltips = u"""
<div> 
    <div class="bokeh_hover_tooltip">Name : @zone</div>
    <div class="bokeh_hover_tooltip">Borough : @borough</div>
    <div class="bokeh_hover_tooltip">Zone ID : @LocationID</div>
    <div class="bokeh_hover_tooltip">(Lon, Lat) : ($x ˚E, $y ˚N)</div>
</div>
"""

p.circle([x,], [y,], size=13, 
         fill_color='aqua', line_color='black', 
         line_width=1, alpha=1.0)
p.circle([x2,], [y2,], size=13, 
         fill_color='aqua', line_color='black', 
         line_width=1, alpha=1.0)

p.add_layout(bokeh.models.Arrow(line_color='orange', line_width=4,
        end=bokeh.models.OpenHead(line_color="orange", line_width=4),
                   x_start=x, y_start=y, x_end=x2, y_end=y2))

p.x_range=bokeh.models.Range1d(-74.02, -73.92)
p.y_range=bokeh.models.Range1d(40.70, 40.78)

show(p)



In [10]:
gpd.sjoin(gpd.GeoDataFrame(crs={'init': 'epsg:4326'}, 
                           geometry=[Point(x, y), Point(x2, y2)]), 
                           df,
                           how='left', op='within')


Out[10]:
geometry index_right LocationID borough zone borough_categ
0 POINT (-73.98590087890625 40.76803970336913) 47 48 Manhattan Clinton East 3
1 POINT (-73.98398590087891 40.73009872436523) 78 79 Manhattan East Village 3

So, Dask DataFrames are just collections of Pandas DataFrames, and I know how to perform a spatial join on a Pandas DataFrame. Let's take advantage of Dask's map_partitions function to do a spatial join with the taxi zones on every partition. Here is the function to do the spatial join, given a Pandas DataFrame, and the names of the longitude, latitude, and taxizone id columns.


In [11]:
# <!-- collapse=False -->
def assign_taxi_zones(df, lon_var, lat_var, locid_var):
    """Joins DataFrame with Taxi Zones shapefile.
    This function takes longitude values provided by `lon_var`, and latitude
    values provided by `lat_var` in DataFrame `df`, and performs a spatial join
    with the NYC taxi_zones shapefile. 
    The shapefile is hard coded in, as this function makes a hard assumption of
    latitude and longitude coordinates. It also assumes latitude=0 and 
    longitude=0 is not a datapoint that can exist in your dataset. Which is 
    reasonable for a dataset of New York, but bad for a global dataset.
    Only rows where `df.lon_var`, `df.lat_var` are reasonably near New York,
    and `df.locid_var` is set to np.nan are updated. 
    Parameters
    ----------
    df : pandas.DataFrame or dask.DataFrame
        DataFrame containing latitudes, longitudes, and location_id columns.
    lon_var : string
        Name of column in `df` containing longitude values. Invalid values 
        should be np.nan.
    lat_var : string
        Name of column in `df` containing latitude values. Invalid values 
        should be np.nan
    locid_var : string
        Name of series to return. 
    """

    import geopandas
    from shapely.geometry import Point


    # make a copy since we will modify lats and lons
    localdf = df[[lon_var, lat_var]].copy()
    
    # missing lat lon info is indicated by nan. Fill with zero
    # which is outside New York shapefile. 
    localdf[lon_var] = localdf[lon_var].fillna(value=0.)
    localdf[lat_var] = localdf[lat_var].fillna(value=0.)
    

    shape_df = geopandas.read_file('../shapefiles/taxi_zones.shp')
    shape_df.drop(['OBJECTID', "Shape_Area", "Shape_Leng", "borough", "zone"],
                  axis=1, inplace=True)
    shape_df = shape_df.to_crs({'init': 'epsg:4326'})

    try:
        local_gdf = geopandas.GeoDataFrame(
            localdf, crs={'init': 'epsg:4326'},
            geometry=[Point(xy) for xy in
                      zip(localdf[lon_var], localdf[lat_var])])

        local_gdf = geopandas.sjoin(
            local_gdf, shape_df, how='left', op='within')

        return local_gdf.LocationID.rename(locid_var)
    except ValueError as ve:
        print(ve)
        print(ve.stacktrace())
        series = localdf[lon_var]
        series = np.nan
        return series

At Scale

Using the map_partitions function, I apply the spatial join to each of the Pandas DataFrames that make up the Dask DataFrame. For simplicity, I just call the function twice, once for pickup locations, and once for dropoff locations. To assist dask in determining schema of returned data, we specify it as a column of floats (allowing for NaN values).


In [12]:
# <!-- collapse=False -->
trips['pickup_taxizone_id'] = trips.map_partitions(
    assign_taxi_zones, "pickup_longitude", "pickup_latitude",
    "pickup_taxizone_id", meta=('pickup_taxizone_id', np.float64))
trips['dropoff_taxizone_id'] = trips.map_partitions(
    assign_taxi_zones, "dropoff_longitude", "dropoff_latitude",
    "dropoff_taxizone_id", meta=('dropoff_taxizone_id', np.float64))
trips[['pickup_taxizone_id', 'dropoff_taxizone_id']].head()


Out[12]:
pickup_taxizone_id dropoff_taxizone_id
0 48.0 79.0
1 90.0 43.0
2 234.0 170.0
3 25.0 249.0
4 158.0 249.0

At this point, the trips Dask DataFrame will have valid taxizone_id information. Lets save this data to Parquet, which is a columnar format that is well supported in Dask and Apache Spark. This prevents Dask from recalculating the spatial join (very expensive) every time an operation on the trips DataFrame is required.


In [13]:
trips.to_parquet('trips_2016-05.parquet', has_nulls=True, object_encoding='json', compression="SNAPPY")
trips = dd.read_parquet('trips_2016-05.parquet', columns=['pickup_taxizone_id', 'dropoff_taxizone_id'])

To close this post out, I'll produce a heatmap of taxi dropoff locations, aggregated by taxi zone using Dask. Unsurprisingly (for New Yorkers at least) the vast majority of taxi dropoffs tend to be in Downtown and Midtown Manhattan. I will analyze this dataset further in future posts.


In [14]:
# <!-- collapse=True -->
counts = trips.groupby('dropoff_taxizone_id').count().compute()
counts.columns = ['N']
counts2 = df.merge(counts, left_on='LocationID', right_index=True, how='left')

gjds = bokeh.models.GeoJSONDataSource(geojson=counts2.to_json())
TOOLS = "pan,wheel_zoom,reset,hover,save"

p = bokeh.plotting.figure(title="NYC Taxi Dropoffs Heatmap", tools=TOOLS,
    x_axis_location=None, y_axis_location=None, 
    responsive=True)

color_mapper = bokeh.models.LogColorMapper(palette=bokeh.palettes.Viridis256, low=1, high=500000)

p.patches('xs', 'ys', 
          fill_color={'field': 'N', 'transform': color_mapper},
          fill_alpha=1., line_color="black", line_width=0.5,          
          source=gjds)

p.grid.grid_line_color = None

hover = p.select_one(bokeh.models.HoverTool)
hover.point_policy = "follow_mouse"
hover.tooltips = u"""
<div> 
    <div class="bokeh_hover_tooltip">Name : @zone</div>
    <div class="bokeh_hover_tooltip">Borough : @borough</div>
    <div class="bokeh_hover_tooltip">Trips Start : @N</div>
</div>
"""

color_bar = bokeh.models.ColorBar(
    color_mapper=color_mapper, orientation='horizontal',
    ticker=bokeh.models.FixedTicker(ticks=[3, 10, 30, 100, 300, 1000, 3000, 10000, 30000, 100000, 300000]),
    formatter=bokeh.models.PrintfTickFormatter(format='%d'),
    label_standoff=12, border_line_color=None, location=(0,0))
p.add_layout(color_bar, 'below')

show(p)


Summary

In this post I described the process of a Spatial Join, and doing the Spatial Join at scale on a cluster using Dask and Pandas. I glossed over some details that are important for the full New York Taxi Dataset, but my full code is available here on Github. In future posts, I will analyze this data more thoroughly, and possibly look into releasing the processed data as a parquet file for others to analyze.

Project on Github

A side note on spatial join performance

The spatial join as written above with GeoPandas, using the New York Taxi Dataset, can assign taxi zones to approxmately 40 million taxi trips per hour on a 4 GHz 4-core i5 system. A lot of the code that supports this join is some amalgamation of Python and wrapped C code.

This is approxmately a factor of two slower than performing the same spatial join in highly optimized PostGIS C/C++ code. However, PostGIS does not efficiently use multiple cores (at least without multiple spatial joins running simultaneously), and more importantly, the network and serialization overhead of a round trip to the PostgreSQL database puts PostgreSQL/PostGIS at roughly the same speed as the GeoPandas implementation I describe in this post, with vastly more moving parts to break.

Basically, Python is actually pretty fast for these kinds of data structure operations.