In [1]:
import dask.distributed
import dask.dataframe as dd
import pandas as pd
import numpy as np
import traceback  # used below to print tracebacks on join failures

In [2]:
import geopandas
from shapely.geometry import Point

In [3]:
client = dask.distributed.Client()
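
With no arguments, dask.distributed.Client() starts a local scheduler and workers sized to the machine. A hedged sketch with explicit sizing (the numbers here are illustrative, not what this notebook used):

client = dask.distributed.Client(
    n_workers=4,            # illustrative worker count
    threads_per_worker=1,   # one thread per worker suits pandas-heavy tasks
    memory_limit='4GB')     # per-worker memory cap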

In [4]:
def assign_taxi_zones(df, lon_var, lat_var, locid_var):
    """Joins DataFrame with Taxi Zones shapefile.

    This function takes longitude values provided by `lon_var`, and latitude
    values provided by `lat_var` in DataFrame `df`, and performs a spatial join
    with the NYC taxi_zones shapefile. 

    The shapefile path is hard-coded, as this function assumes
    latitude/longitude coordinates. It also assumes that latitude=0,
    longitude=0 cannot occur as a real data point, which is reasonable
    for a New York dataset but not for a global one.

    Only rows whose `lon_var` and `lat_var` values are reasonably near
    New York and whose `locid_var` value is np.nan are updated.

    Parameters
    ----------
    df : pandas.DataFrame or dask.DataFrame
        DataFrame containing latitudes, longitudes, and location_id columns.
    lon_var : string
        Name of column in `df` containing longitude values. Invalid values 
        should be np.nan.
    lat_var : string
        Name of column in `df` containing latitude values. Invalid values 
        should be np.nan.
    locid_var : string
        Name of column in `df` containing taxi_zone location ids. Rows with
        valid, nonzero values are not overwritten. 
    """

    localdf = df[[lon_var, lat_var, locid_var]].copy()
    localdf[lon_var] = localdf[lon_var].fillna(value=0.)
    localdf[lat_var] = localdf[lat_var].fillna(value=0.)
    # Replace a location id only where it is currently missing and the
    # coordinates are present (nonzero after the fillna above).
    localdf['replace_locid'] = (localdf[locid_var].isnull()
                                & (localdf[lon_var] != 0.)
                                & (localdf[lat_var] != 0.))

    if np.any(localdf['replace_locid']):
        shape_df = geopandas.read_file('../shapefiles/taxi_zones_latlon.shp')
        # Keep only LocationID and geometry for the spatial join.
        shape_df.drop(['OBJECTID', "Shape_Area", "Shape_Leng", "borough", "zone"],
                      axis=1, inplace=True)

        try:
            local_gdf = geopandas.GeoDataFrame(
                localdf, crs={'init': 'epsg:4326'},
                geometry=[Point(xy) for xy in
                          zip(localdf[lon_var], localdf[lat_var])])

            local_gdf = geopandas.sjoin(
                local_gdf, shape_df, how='left', op='intersects')

            # one point can intersect more than one zone -- for example if on
            # the boundary between two zones. Deduplicate by taking first valid.
            local_gdf = local_gdf[~local_gdf.index.duplicated(keep='first')]

            # For rows flagged as not-to-be-replaced, restore the original
            # location id over the sjoin result.
            local_gdf.loc[~local_gdf.replace_locid, 'LocationID'] = \
                local_gdf.loc[~local_gdf.replace_locid, locid_var]

            return local_gdf.LocationID.rename(locid_var)
        except ValueError as ve:
            # Log the failure and fall back to the original column.
            print(ve)
            traceback.print_exc()
            return df[locid_var]
    else:
        return df[locid_var]
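
As a quick sanity check before touching the full dataset, the function can be exercised on a tiny pandas frame (a sketch that assumes the shapefile exists at the hard-coded path; the coordinates come from the first trip shown below, which lies in zone 233):

test_df = pd.DataFrame({
    'lon': [-73.970322, np.nan],
    'lat': [40.753231, np.nan],
    'locid': [np.nan, np.nan]})
assign_taxi_zones(test_df, 'lon', 'lat', 'locid')
# row 0 -> 233.0 (the zone containing that point); row 1 stays NaN
# because its coordinates are missing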

In [5]:
df1 = dd.read_parquet('/bigdata/citibike.parquet')

In [6]:
df1['start_taxizone_id'] = np.nan
df1['end_taxizone_id'] = np.nan

In [7]:
df1.start_station_id.count().compute()


Out[7]:
36902025

In [8]:
df1.head()


Out[8]:
trip_duration start_time stop_time start_station_id start_station_name start_station_latitude start_station_longitude end_station_id end_station_name end_station_latitude end_station_longitude bike_id user_type birth_year gender start_taxizone_id end_taxizone_id
0 634 2013-07-01 00:00:00 2013-07-01 00:10:34 164 E 47 St & 2 Ave 40.753231 -73.970322 504 1 Ave & E 15 St 40.732220 -73.981659 16950 Customer NaN 0 NaN NaN
1 1547 2013-07-01 00:00:02 2013-07-01 00:25:49 388 W 26 St & 10 Ave 40.749718 -74.002953 459 W 20 St & 11 Ave 40.746746 -74.007759 19816 Customer NaN 0 NaN NaN
2 178 2013-07-01 00:01:04 2013-07-01 00:04:02 293 Lafayette St & E 8 St 40.730286 -73.990768 237 E 11 St & 2 Ave 40.730473 -73.986725 14548 Subscriber 1980.0 2 NaN NaN
3 1580 2013-07-01 00:01:06 2013-07-01 00:27:26 531 Forsyth St & Broome St 40.718941 -73.992661 499 Broadway & W 60 St 40.769154 -73.981918 16063 Customer NaN 0 NaN NaN
4 757 2013-07-01 00:01:10 2013-07-01 00:13:47 382 University Pl & E 14 St 40.734928 -73.992004 410 Suffolk St & Stanton St 40.720665 -73.985176 19213 Subscriber 1986.0 1 NaN NaN

In [9]:
df1['start_taxizone_id'] = df1.map_partitions(
    assign_taxi_zones, "start_station_longitude", "start_station_latitude",
    "start_taxizone_id", meta=('start_taxizone_id', np.float64))
df1['end_taxizone_id'] = df1.map_partitions(
    assign_taxi_zones, "end_station_longitude", "end_station_latitude",
    "end_taxizone_id", meta=('end_taxizone_id', np.float64))

In [10]:
df1.head()


Out[10]:
trip_duration start_time stop_time start_station_id start_station_name start_station_latitude start_station_longitude end_station_id end_station_name end_station_latitude end_station_longitude bike_id user_type birth_year gender start_taxizone_id end_taxizone_id
0 634 2013-07-01 00:00:00 2013-07-01 00:10:34 164 E 47 St & 2 Ave 40.753231 -73.970322 504 1 Ave & E 15 St 40.732220 -73.981659 16950 Customer NaN 0 233 224
1 1547 2013-07-01 00:00:02 2013-07-01 00:25:49 388 W 26 St & 10 Ave 40.749718 -74.002953 459 W 20 St & 11 Ave 40.746746 -74.007759 19816 Customer NaN 0 246 246
2 178 2013-07-01 00:01:04 2013-07-01 00:04:02 293 Lafayette St & E 8 St 40.730286 -73.990768 237 E 11 St & 2 Ave 40.730473 -73.986725 14548 Subscriber 1980.0 2 113 79
3 1580 2013-07-01 00:01:06 2013-07-01 00:27:26 531 Forsyth St & Broome St 40.718941 -73.992661 499 Broadway & W 60 St 40.769154 -73.981918 16063 Customer NaN 0 148 142
4 757 2013-07-01 00:01:10 2013-07-01 00:13:47 382 University Pl & E 14 St 40.734928 -73.992004 410 Suffolk St & Stanton St 40.720665 -73.985176 19213 Subscriber 1986.0 1 113 148

In [11]:
df1.to_parquet('/bigdata/citibike_locid.parquet', has_nulls=True,
               compression="SNAPPY", object_encoding='json')
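
has_nulls and object_encoding are fastparquet-specific write options that Dask forwards to the underlying engine; SNAPPY compression additionally requires the python-snappy package.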

In [14]:
df1 = dd.read_parquet('/bigdata/citibike_locid.parquet')

In [15]:
df1.start_station_id.count().compute()


Out[15]:
36902025

In [16]:
df2 = df1[df1.start_taxizone_id.notnull() & df1.end_taxizone_id.notnull()]

In [17]:
df2.start_station_id.count().compute()


Out[17]:
36901697
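
The filter drops 36902025 - 36901697 = 328 trips whose start or end point fell outside every taxi zone, under 0.001% of the data.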

In [18]:
df2.to_parquet('/bigdata/citibike_locid_cleaned.parquet', has_nulls=True,
               compression="SNAPPY", object_encoding='json')
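
The next two cells were run in a separate PySpark session (hence the restarted cell numbers), where spark is the SparkSession provided by the pyspark shell; the notebook returns to the Dask session at In [19].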

In [10]:
df2 = spark.read.parquet('/bigdata/citibike_locid_cleaned.parquet')

In [12]:
help(df2.write.parquet)


Help on method parquet in module pyspark.sql.readwriter:

parquet(path, mode=None, partitionBy=None, compression=None) method of pyspark.sql.readwriter.DataFrameWriter instance
    Saves the content of the :class:`DataFrame` in Parquet format at the specified path.
    
    :param path: the path in any Hadoop supported file system
    :param mode: specifies the behavior of the save operation when data already exists.
    
        * ``append``: Append contents of this :class:`DataFrame` to existing data.
        * ``overwrite``: Overwrite existing data.
        * ``ignore``: Silently ignore this operation if data already exists.
        * ``error`` (default case): Throw an exception if data already exists.
    :param partitionBy: names of partitioning columns
    :param compression: compression codec to use when saving to file. This can be one of the
                        known case-insensitive shorten names (none, snappy, gzip, and lzo).
                        This will override ``spark.sql.parquet.compression.codec``. If None
                        is set, it uses the value specified in
                        ``spark.sql.parquet.compression.codec``.
    
    >>> df.write.parquet(os.path.join(tempfile.mkdtemp(), 'data'))
    
    .. versionadded:: 1.4


In [13]:
df2.write.parquet('/bigdata/citibike_locid_cleaned_partitioned.parquet', 
                  mode='overwrite', partitionBy='start_taxizone_id',
                  compression='SNAPPY'
                 )
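
partitionBy writes one subdirectory per distinct start_taxizone_id value (e.g. start_taxizone_id=233.0/), which makes later per-zone reads cheap. One way to inspect the layout (a sketch, assuming the output path above):

import glob
sorted(glob.glob('/bigdata/citibike_locid_cleaned_partitioned.parquet/'
                 'start_taxizone_id=*'))[:5]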

In [19]:
df2 = dd.read_parquet('/bigdata/citibike_locid_cleaned.parquet')

In [23]:
%matplotlib inline
import matplotlib.pyplot as plt


/home/shekhar/anaconda3/lib/python3.5/site-packages/matplotlib/font_manager.py:273: UserWarning: Matplotlib is building the font cache using fc-list. This may take a moment.
  warnings.warn('Matplotlib is building the font cache using fc-list. This may take a moment.')

In [26]:
zz = df2.start_taxizone_id.unique().values.compute()

In [28]:
sorted(zz)


Out[28]:
[1.9762625833649862e-323,
 5.9287877500949585e-323,
 6.4228533959362051e-323,
 8.3991159793011913e-323,
 1.1857575500189917e-322,
 1.2351641146031164e-322,
 1.6304166312761136e-322,
 1.6798231958602383e-322,
 1.8280428896126122e-322,
 1.9762625833649862e-322,
 2.1244822771173601e-322,
 2.2232954062856094e-322,
 2.3715151000379834e-322,
 2.4209216646221081e-322,
 2.4703282292062327e-322,
 2.569141358374482e-322,
 2.6679544875427313e-322,
 3.0138004396316039e-322,
 3.2114266979681025e-322,
 3.2608332625522272e-322,
 3.3596463917204765e-322,
 3.7054923438093491e-322,
 3.9031186021458477e-322,
 3.9525251667299724e-322,
 4.2983711188188449e-322,
 4.3477776834029696e-322,
 4.4465908125712189e-322,
 4.7924367646600915e-322,
 4.9406564584124654e-322,
 5.0888761521648394e-322,
 5.2370958459172134e-322,
 5.286502410501338e-322,
 5.5335352334219613e-322,
 5.5829417980060859e-322,
 5.6323483625902106e-322,
 6.1758205730155818e-322,
 6.7686993480250777e-322,
 6.9169190417774516e-322,
 6.9663256063615763e-322,
 7.0157321709457009e-322,
 7.0651387355298256e-322,
 7.1145453001139502e-322,
 7.1639518646980749e-322,
 7.2133584292821995e-322,
 7.3121715584504489e-322,
 7.4603912522028228e-322,
 7.8062372042916954e-322,
 7.9544568980440694e-322,
 8.003863462628194e-322,
 8.0532700272123187e-322,
 8.1026765917964433e-322,
 8.3991159793011913e-322,
 8.9425881897265624e-322,
 9.1896210126471857e-322,
 9.3378407063995597e-322,
 9.3872472709836843e-322,
 9.5354669647360583e-322,
 9.6342800939043076e-322,
 1.0325971998082053e-321,
 1.0424785127250302e-321,
 1.072122451475505e-321,
 1.1067070466843923e-321,
 1.1116477031428047e-321,
 1.1165883596012172e-321,
 1.1264696725180421e-321,
 1.1314103289764546e-321,
 1.1363509854348671e-321,
 1.1412916418932795e-321,
 1.146232298351692e-321,
 1.1511729548101044e-321,
 1.1561136112685169e-321,
 1.1659949241853418e-321,
 1.1709355806437543e-321,
 1.1758762371021668e-321,
 1.1808168935605792e-321,
 1.2154014887694665e-321,
 1.2302234581447039e-321,
 1.2598673968951787e-321,
 1.2648080533535912e-321,
 1.2895113356456535e-321,
 1.2944519921040659e-321,
 1.2993926485624784e-321,
 4.0,
 12.0,
 13.0,
 17.0,
 24.0,
 25.0,
 33.0,
 34.0,
 37.0,
 40.0,
 43.0,
 45.0,
 48.0,
 49.0,
 50.0,
 52.0,
 54.0,
 61.0,
 65.0,
 66.0,
 68.0,
 75.0,
 79.0,
 80.0,
 87.0,
 88.0,
 90.0,
 97.0,
 100.0,
 103.0,
 106.0,
 107.0,
 112.0,
 113.0,
 114.0,
 125.0,
 137.0,
 140.0,
 141.0,
 142.0,
 143.0,
 144.0,
 145.0,
 146.0,
 148.0,
 151.0,
 158.0,
 161.0,
 162.0,
 163.0,
 164.0,
 170.0,
 181.0,
 186.0,
 189.0,
 190.0,
 193.0,
 195.0,
 209.0,
 211.0,
 217.0,
 224.0,
 225.0,
 226.0,
 228.0,
 229.0,
 230.0,
 231.0,
 232.0,
 233.0,
 234.0,
 236.0,
 237.0,
 238.0,
 239.0,
 246.0,
 249.0,
 255.0,
 256.0,
 261.0,
 262.0,
 263.0]
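
Most of these "ids" are denormal floats on the order of 1e-322, which is a red flag: they are exactly what int64 location ids look like when their bits are reinterpreted as float64, suggesting part of the column round-tripped through parquet with the wrong dtype. A quick check (an added sketch, not part of the original run):

np.array([4, 12, 13], dtype=np.int64).view(np.float64)
# -> array([1.97626258e-323, 5.92878775e-323, 6.42285340e-323]),
# matching the first three anomalous values above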
