In [1]:
%matplotlib inline
%config InlineBackend.figure_format='retina'
import dask.dataframe as dd
import dask.distributed
import numpy as np
import pandas as pd
from matplotlib.colors import SymLogNorm as symlog
from matplotlib import rcParams
import sklearn
import matplotlib.pyplot as plt
import palettable
import seaborn as sns
In [2]:
# Widen pandas' repr limits so medium-sized frames display in full.
pd.set_option('display.max_rows', 100)
pd.set_option('display.max_columns', 100)
In [65]:
trips = spark.read.parquet('/data/all_trips_spark.parquet/')
In [66]:
import pyspark.sql.functions as fn
import pyspark.sql.types as ty
In [67]:
# Treat the stored pickup/dropoff timestamps as America/New_York local time
# and convert both columns to UTC.
trips2 = (
    trips
    .withColumn('pickup_datetime',
                fn.to_utc_timestamp(fn.col('pickup_datetime'), 'America/New_York'))
    .withColumn('dropoff_datetime',
                fn.to_utc_timestamp(fn.col('dropoff_datetime'), 'America/New_York'))
)
In [68]:
# Bucket each pickup into a 6-minute interval key 'yyyy-MM-dd HH:mm:00'.
# Fix: the original pattern used 'hh' (12-hour clock, 01-12), which collapses
# AM and PM hours into the same bucket; 'HH' (00-23) is required for an
# unambiguous time-of-day key.
trips2 = trips2.withColumn('pickup_dt_group', fn.concat(
    fn.date_format(trips2.pickup_datetime, 'yyyy-MM-dd HH:'),
    # floor the minute to a multiple of 6, rendered as 'mm:00'
    fn.format_string('%02d:00', (6.0*fn.floor(fn.minute(trips2.pickup_datetime)/6.0)).cast(ty.IntegerType()))
)
)
In [69]:
trips2.head()
Out[69]:
In [70]:
sqlContext.sql("set spark.sql.shuffle.partitions=1")
Out[70]:
In [ ]:
trips2.groupBy('pickup_taxizone_id', 'pickup_dt_group').count().repartition(263, 'pickup_taxizone_id').write.parquet('/data/trips_timeseries_counts.parquet', mode='overwrite', compression='SNAPPY')
In [ ]:
In [ ]:
In [46]:
d3 = dd.read_parquet('/tmp/junk.parquet/pickup_taxizone_id=1', engine='arrow')
In [39]:
d4 = pd.DataFrame(index=pd.date_range('2009-01-01 00:00:00', '2017-01-01 00:00:00', freq='6T'))
In [48]:
# Materialize the dask frame into an in-memory pandas DataFrame.
# NOTE(review): rebinding the same name `d3` makes this cell non-idempotent —
# running it a second time fails because pandas frames have no .compute().
d3 = d3.compute()
In [54]:
# Key the counts by their 6-minute bucket string.
d3 = d3.set_index('pickup_dt_group', drop=True)
In [58]:
# Inspect the index to confirm the key format before joining.
d3.index
Out[58]:
# Convert the dense DatetimeIndex to strings so it can match d3's string keys.
# NOTE(review): assumes the Spark-side 'pickup_dt_group' format equals
# str(Timestamp) ('YYYY-MM-DD HH:MM:SS') — verify, or the join matches nothing.
In [60]:
d4.index = d4.index.astype(str)
In [62]:
# Left join onto the dense grid: every 6-minute slot kept, NaN where no trips.
d5 = d4.merge(d3, how='left', left_index=True, right_index=True)
In [63]:
# Non-null counts per column; the shortfall vs len(d5) is the number of
# 6-minute slots with zero recorded pickups.
d5.count()
Out[63]:
In [ ]:
%matplotlib inline
%config InlineBackend.figure_format='retina'
import dask.dataframe as dd
import dask.distributed
import numpy as np
import pandas as pd
from matplotlib.colors import SymLogNorm as symlog
from matplotlib import rcParams
import sklearn
import matplotlib.pyplot as plt
import palettable
import seaborn as sns
In [ ]:
# Display options for the dask section.
# NOTE(review): duplicates the configuration cell at the top of the file —
# this file appears to concatenate two notebooks; harmless but redundant.
pd.options.display.max_rows = 100
pd.options.display.max_columns = 100
In [ ]:
client = dask.distributed.Client()
In [ ]:
# Lazily open the full trip table (fastparquet engine), indexed by pickup time,
# loading only the four columns needed for the time-series counts.
trips = dd.read_parquet('/data/all_trips.parquet', engine='fastparquet', index='pickup_datetime',
columns=['pickup_datetime', 'pickup_taxizone_id', 'dropoff_datetime', 'dropoff_taxizone_id'])
trips.head()
In [ ]:
def parse_to_6min_freq(df):
    """Map each entry of ``df``'s datetime index to the start of its 6-minute bin.

    Parameters
    ----------
    df : a DataFrame (a dask partition here) with a datetime64 index.

    Returns
    -------
    Array of strings such as '2009-01-01T00:06', one per row of ``df``.

    Fix: the original body truncated timestamps only to whole minutes
    (``astype('M8[m]')``) despite the name promising 6-minute frequency;
    minutes are now floored to a multiple of six, matching the Spark-side
    ``floor(minute/6)*6`` bucketing earlier in this file.
    """
    # Whole minutes since the epoch. The epoch falls on a midnight and
    # 1440 % 6 == 0, so flooring the integer minute count to a multiple of
    # 6 also floors the time-of-day to a 6-minute boundary.
    minutes = df.index.values.astype('M8[m]').astype('i8')
    return ((minutes // 6) * 6).astype('M8[m]').astype(str)
In [ ]:
# Smoke-test the helper on the first partition before mapping it everywhere.
parse_to_6min_freq(trips.get_partition(0))
In [ ]:
# Apply per partition; `meta` declares the result as an object (string) column.
# NOTE(review): mutates `trips` in place, and the column name 'pickup_minute'
# understates the intended 6-minute bucketing.
trips['pickup_minute'] = trips.map_partitions(parse_to_6min_freq, meta=('pickup_minute', object))
In [ ]:
# Verify the new column renders as expected.
trips.head()
In [ ]: