In [1]:
%matplotlib inline
%config InlineBackend.figure_format='retina'

import dask.dataframe as dd
import dask.distributed
import numpy as np
import pandas as pd

from matplotlib.colors import SymLogNorm as symlog
from matplotlib import rcParams

import sklearn
import matplotlib.pyplot as plt
import palettable

import seaborn as sns

In [2]:
pd.options.display.max_rows = 100
pd.options.display.max_columns = 100

In [65]:
trips = spark.read.parquet('/data/all_trips_spark.parquet/')

In [66]:
import pyspark.sql.functions as fn
import pyspark.sql.types as ty

In [67]:
trips2 = trips \
    .withColumn('pickup_datetime', fn.to_utc_timestamp(trips.pickup_datetime, 'America/New_York')) \
    .withColumn('dropoff_datetime', fn.to_utc_timestamp(trips.dropoff_datetime, 'America/New_York'))
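
As a quick check on the conversion semantics: `to_utc_timestamp` interprets a naive timestamp as wall-clock time in the given zone and shifts it to UTC. A minimal sketch, assuming the same `spark` session and `fn` import as above (the `demo` frame is hypothetical):

In [ ]:
# New York is on EDT (UTC-4) in June, so 21:22 local becomes 01:22 UTC the next day.
demo = spark.createDataFrame([('2016-06-18 21:22:52',)], ['ts'])
demo.select(fn.to_utc_timestamp(demo.ts.cast('timestamp'), 'America/New_York').alias('utc')).show()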

In [68]:
trips2 = trips2.withColumn('pickup_dt_group', fn.concat(
        # 'HH' (24-hour clock), not 'hh', which would fold PM hours into their AM twins
        fn.date_format(trips2.pickup_datetime, 'yyyy-MM-dd HH:'),
        # floor the minute to the start of its 6-minute bucket: 22 -> 18, 7 -> 6
        fn.format_string('%02d:00', (6 * fn.floor(fn.minute(trips2.pickup_datetime) / 6)).cast(ty.IntegerType()))
))
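
The bucketing arithmetic is easier to see outside Spark; a quick pure-Python check of the 6-minute floor:

In [ ]:
# 6 * (minute // 6) maps each minute to the start of its 6-minute bin.
for minute in (0, 5, 6, 22, 59):
    print(minute, '->', 6 * (minute // 6))
# 0 -> 0, 5 -> 0, 6 -> 6, 22 -> 18, 59 -> 54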

In [69]:
trips2.head()


Out[69]:
Row(dropoff_datetime=datetime.datetime(2016, 6, 18, 21, 30, 7), dropoff_latitude=40.730159759521484, dropoff_longitude=-73.9839096069336, dropoff_taxizone_id=79, ehail_fee=None, extra=0.5, fare_amount=6.5, improvement_surcharge=0.30000001192092896, mta_tax=0.5, passenger_count=None, payment_type='"1"', pickup_datetime=datetime.datetime(2016, 6, 18, 21, 22, 52), pickup_latitude=40.732765197753906, pickup_longitude=-73.99752807617188, pickup_taxizone_id=113, rate_code_id=None, store_and_fwd_flag='"N"', tip_amount=1.0, tolls_amount=0.0, total_amount=8.800000190734863, trip_distance=0.800000011920929, trip_type='"yellow"', vendor_id='"1"', trip_id=6511170420736, pickup_dt_group='2016-06-18 21:18:00')

In [70]:
sqlContext.sql("set spark.sql.shuffle.partitions=1")


Out[70]:
DataFrame[key: string, value: string]

In [ ]:
trips2.groupBy('pickup_taxizone_id', 'pickup_dt_group').count() \
    .repartition(263, 'pickup_taxizone_id') \
    .write.parquet('/data/trips_timeseries_counts.parquet', mode='overwrite', compression='SNAPPY')
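
A quick, hypothetical sanity check once the write finishes: read the counts back and peek at a few rows.

In [ ]:
# The schema should be (pickup_taxizone_id, pickup_dt_group, count).
spark.read.parquet('/data/trips_timeseries_counts.parquet').show(5)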

In [46]:
# Read back the counts for a single zone from a scratch, zone-partitioned copy.
d3 = dd.read_parquet('/tmp/junk.parquet/pickup_taxizone_id=1', engine='arrow')

In [39]:
d4 = pd.DataFrame(index=pd.date_range('2009-01-01 00:00:00', '2017-01-01 00:00:00', freq='6T'))
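
The closed date_range gives one row per 6-minute bin across the eight years; a quick check of the expected length (it matches the 701281-row merge result shown below):

In [ ]:
# 8 years including the 2012 and 2016 leap days = 2,922 days,
# 240 six-minute bins per day, plus the closing endpoint.
assert len(d4) == 2922 * 240 + 1 == 701281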

In [48]:
d3 = d3.compute()

In [54]:
d3 = d3.set_index('pickup_dt_group', drop=True)

In [58]:
d3.index


Out[58]:
Index(['2009-01-01 01:00:00', '2009-01-01 01:36:00', '2009-01-01 01:48:00',
       '2009-01-01 02:36:00', '2009-01-01 02:48:00', '2009-01-01 03:42:00',
       '2009-01-01 03:48:00', '2009-01-01 04:06:00', '2009-01-01 04:12:00',
       '2009-01-01 04:18:00',
       ...
       '2016-12-30 10:36:00', '2016-12-30 11:48:00', '2016-12-31 02:30:00',
       '2016-12-31 04:54:00', '2016-12-31 06:30:00', '2016-12-31 09:00:00',
       '2016-12-31 09:48:00', '2016-12-31 10:36:00', '2016-12-31 11:00:00',
       '2016-12-31 12:12:00'],
      dtype='object', name='pickup_dt_group', length=98275)

In [60]:
# d3 is indexed by string keys (dtype object, per Out[58] above), so cast the
# datetime grid to strings to make the join keys comparable.
d4.index = d4.index.astype(str)

In [62]:
d5 = d4.merge(d3, how='left', left_index=True, right_index=True)

In [63]:
d5


Out[63]:
count
2009-01-01 00:00:00 NaN
2009-01-01 00:06:00 NaN
2009-01-01 00:12:00 NaN
2009-01-01 00:18:00 NaN
2009-01-01 00:24:00 NaN
2009-01-01 00:30:00 NaN
2009-01-01 00:36:00 NaN
2009-01-01 00:42:00 NaN
2009-01-01 00:48:00 NaN
2009-01-01 00:54:00 NaN
2009-01-01 01:00:00 2.0
2009-01-01 01:06:00 NaN
2009-01-01 01:12:00 NaN
2009-01-01 01:18:00 NaN
2009-01-01 01:24:00 NaN
2009-01-01 01:30:00 NaN
2009-01-01 01:36:00 1.0
2009-01-01 01:42:00 NaN
2009-01-01 01:48:00 1.0
2009-01-01 01:54:00 NaN
2009-01-01 02:00:00 NaN
2009-01-01 02:06:00 NaN
2009-01-01 02:12:00 NaN
2009-01-01 02:18:00 NaN
2009-01-01 02:24:00 NaN
2009-01-01 02:30:00 NaN
2009-01-01 02:36:00 1.0
2009-01-01 02:42:00 NaN
2009-01-01 02:48:00 2.0
2009-01-01 02:54:00 NaN
2009-01-01 03:00:00 NaN
2009-01-01 03:06:00 NaN
2009-01-01 03:12:00 NaN
2009-01-01 03:18:00 NaN
2009-01-01 03:24:00 NaN
2009-01-01 03:30:00 NaN
2009-01-01 03:36:00 NaN
2009-01-01 03:42:00 1.0
2009-01-01 03:48:00 1.0
2009-01-01 03:54:00 NaN
2009-01-01 04:00:00 NaN
2009-01-01 04:06:00 1.0
2009-01-01 04:12:00 3.0
2009-01-01 04:18:00 1.0
2009-01-01 04:24:00 NaN
2009-01-01 04:30:00 1.0
2009-01-01 04:36:00 NaN
2009-01-01 04:42:00 NaN
2009-01-01 04:48:00 1.0
2009-01-01 04:54:00 NaN
... ...
2016-12-31 19:06:00 NaN
2016-12-31 19:12:00 NaN
2016-12-31 19:18:00 NaN
2016-12-31 19:24:00 NaN
2016-12-31 19:30:00 NaN
2016-12-31 19:36:00 NaN
2016-12-31 19:42:00 NaN
2016-12-31 19:48:00 NaN
2016-12-31 19:54:00 NaN
2016-12-31 20:00:00 NaN
2016-12-31 20:06:00 NaN
2016-12-31 20:12:00 NaN
2016-12-31 20:18:00 NaN
2016-12-31 20:24:00 NaN
2016-12-31 20:30:00 NaN
2016-12-31 20:36:00 NaN
2016-12-31 20:42:00 NaN
2016-12-31 20:48:00 NaN
2016-12-31 20:54:00 NaN
2016-12-31 21:00:00 NaN
2016-12-31 21:06:00 NaN
2016-12-31 21:12:00 NaN
2016-12-31 21:18:00 NaN
2016-12-31 21:24:00 NaN
2016-12-31 21:30:00 NaN
2016-12-31 21:36:00 NaN
2016-12-31 21:42:00 NaN
2016-12-31 21:48:00 NaN
2016-12-31 21:54:00 NaN
2016-12-31 22:00:00 NaN
2016-12-31 22:06:00 NaN
2016-12-31 22:12:00 NaN
2016-12-31 22:18:00 NaN
2016-12-31 22:24:00 NaN
2016-12-31 22:30:00 NaN
2016-12-31 22:36:00 NaN
2016-12-31 22:42:00 NaN
2016-12-31 22:48:00 NaN
2016-12-31 22:54:00 NaN
2016-12-31 23:00:00 NaN
2016-12-31 23:06:00 NaN
2016-12-31 23:12:00 NaN
2016-12-31 23:18:00 NaN
2016-12-31 23:24:00 NaN
2016-12-31 23:30:00 NaN
2016-12-31 23:36:00 NaN
2016-12-31 23:42:00 NaN
2016-12-31 23:48:00 NaN
2016-12-31 23:54:00 NaN
2017-01-01 00:00:00 NaN

701281 rows × 1 columns
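
In a left join against the full grid, NaN simply means no recorded pickups in that 6-minute bin, so a natural follow-up (a sketch, not part of the original run) is to fill those gaps with zeros:

In [ ]:
# Missing bins are genuinely zero-trip bins after the left join.
d5['count'] = d5['count'].fillna(0).astype('int64')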


In [ ]:
client = dask.distributed.Client()

In [ ]:
trips = dd.read_parquet('/data/all_trips.parquet', engine='fastparquet', index='pickup_datetime', 
                        columns=['pickup_datetime', 'pickup_taxizone_id', 'dropoff_datetime', 'dropoff_taxizone_id'])
trips.head()

In [ ]:
def parse_to_6min_freq(df):
    # Floor each pickup timestamp to the start of its 6-minute bin, returned
    # as string keys matching the Spark pickup_dt_group column above.
    return pd.Series(df.index.floor('6T').astype(str), index=df.index)

In [ ]:
# Preview on the first partition (computed to pandas) before applying everywhere.
parse_to_6min_freq(trips.get_partition(0).compute())

In [ ]:
trips['pickup_minute'] = trips.map_partitions(parse_to_6min_freq, meta=('pickup_minute', object))

In [ ]:
trips.head()
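
From here the dask pipeline can mirror the Spark aggregation above; a sketch of the per-zone, per-bin counts (the `counts` variable is hypothetical):

In [ ]:
# Count trips per (zone, 6-minute bin); compute() returns a pandas Series
# indexed by (pickup_taxizone_id, pickup_minute).
counts = trips.groupby(['pickup_taxizone_id', 'pickup_minute']).size().compute()
counts.head()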
