In [1]:
%matplotlib inline
import pandas as pd
import matplotlib.pyplot as plt
from pandas.tools.plotting import scatter_matrix
import seaborn as sns
import json
from pprint import pprint
from datetime import timedelta
import os
We have attached our Jupyter server to three different repositories in our Pachyderm pipeline DAG: `trips`, `weather`, and `sales`. `sales.csv` is updated daily by a pipeline that processes each file in `trips` to calculate sales for the day. Note, here we are using a fictional multiplier, $5/trip, to calculate daily "sales" (i.e., these are not actually the sales figures for Citi Bike). We attached to the `trips` and `weather` repos at commit number 30, which corresponds to the data versioned on 7/31/2016. We did this because on 7/30/2016 and 7/31/2016 we saw a sharp drop in our sales, and we want to understand, interactively, why we might have seen this drop.
By attaching to these separate points in our DAG (`trips`, `weather`, and `sales`), we can bring our data together at a particular commit (i.e., a particular point in history), without explicitly planning a pipeline stage that takes these repos as input.
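Before reading any data, it can be helpful to confirm that all three repos are actually mounted. This is just an optional sanity check (not part of the original analysis); it lists whatever is visible under `/pfs`, and the exact file names it prints will depend on the attached commits:

for repo in os.listdir('/pfs'):
    repo_path = os.path.join('/pfs', repo)
    # Each Pachyderm repo appears as a directory under /pfs; show a few of its files.
    if os.path.isdir(repo_path):
        print(repo, '->', sorted(os.listdir(repo_path))[:5])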
Let's first grab our sales data from the `sales` repo:
In [2]:
salesDF = pd.read_csv('/pfs/sales/sales.csv', names=['Date', 'Sales'])
salesDF['Date'] = pd.to_datetime(salesDF['Date'])
salesDF.set_index('Date', inplace=True)
In [3]:
salesDF.head()
Out[3]:
We can then grab the trip count data from the `trips` repo, put that into a separate dataframe, and join the two dataframes. This way we can combine our data from multiple repos interactively to draw some conclusions.
In [4]:
trip_data = []
for fn in os.listdir('/pfs/trips/'):
    if os.path.isfile('/pfs/trips/' + fn):
        data = pd.read_csv('/pfs/trips/' + fn)
        trips = data.ix[0]['Trips over the past 24-hours (midnight to 11:59pm)']
        trip_data.append([data.ix[0]['Date'], trips])

tripsDF = pd.DataFrame(trip_data, columns=['Date', 'Trips'])
tripsDF['Date'] = pd.to_datetime(tripsDF['Date'])
tripsDF.set_index('Date', inplace=True)
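One thing to keep in mind (an optional aside, not in the original notebook): `os.listdir` returns files in arbitrary order, so `tripsDF` is not necessarily sorted by date. The index join below aligns rows correctly regardless, but if you want to inspect `tripsDF` chronologically you can sort it first:

# Optional: sort the trips dataframe by its Date index.
tripsDF.sort_index(inplace=True)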
In [5]:
tripsDF.head()
Out[5]:
In [6]:
dataDF = salesDF.join(tripsDF)
In [7]:
dataDF.head()
Out[7]:
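`salesDF.join(tripsDF)` is a left join on the `Date` index, so any day present in the sales data but missing from the trips data would show up as `NaN` in the `Trips` column. A quick check for that (an optional sketch, not part of the original notebook):

# Count missing values introduced by the index join, per column.
print(dataDF.isnull().sum())

# If there were gaps, one option would be to drop those rows before analysis:
# dataDF = dataDF.dropna()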
Finally, we will pull in the JSON weather data from the `weather` repo and again merge that with the sales and trip data. We suspect that the weather likely had something to do with the poor sales on the 30th and 31st of July. In particular, we suspect that precipitation may have driven sales down, so we will extract the daily precipitation probabilities and join them with our previously created dataframe.
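The extraction code below assumes each weather file contains a nested forecast object with the day's precipitation probability at `daily.data[0].precipProbability`. For reference, here is an illustrative (made-up) example of that shape; real files contain many more fields:

# Illustrative only -- shows the nesting relied on below, with a fabricated value.
example_weather = {
    'daily': {
        'data': [
            {'precipProbability': 0.74}
        ]
    }
}
example_weather['daily']['data'][0]['precipProbability']  # -> 0.74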
In [8]:
precip_data = []
for fn in os.listdir('/pfs/weather/'):
    if os.path.isfile('/pfs/weather/' + fn):
        with open('/pfs/weather/' + fn) as data_file:
            data = json.load(data_file)
            precip = data['daily']['data'][0]['precipProbability']
            precip_data.append([fn, precip])
In [9]:
precipDF = pd.DataFrame(precip_data, columns=['Date', 'Precipitation Probability'])
precipDF['Date'] = pd.to_datetime(precipDF['Date'])
precipDF.set_index('Date', inplace=True)
In [10]:
precipDF.head()
Out[10]:
In [11]:
dataDF = dataDF.join(precipDF)
In [12]:
dataDF.head()
Out[12]:
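Before plotting, a quick numeric check (an optional addition to the original analysis) is to look at how the three columns correlate with each other:

# Pairwise correlations between Sales, Trips, and Precipitation Probability.
dataDF.corr()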
We can now confirm our suspicions by plotting the precipitation probabilities alongside the sales and trip data:
In [19]:
ax = dataDF.plot(secondary_y=['Precipitation Probability'], figsize=(10, 8))
ax.set_ylabel('Sales (dollars), # Trips')
ax.right_ax.set_ylabel('Precipitation probability')
ax.right_ax.legend(loc='best')
ax.legend(loc='upper left')
Out[19]:
We can see that there was a probability of precipitation in NYC above 70% on both of the days in question. This is the most likely explanation for the poor sales. Of course, we can attach our Jupyter notebook to any Pachyderm repos at any commit to explore other unexpected behavior, develop further analyses, etc.
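As a possible follow-up, note that `seaborn` and `scatter_matrix` are imported above but unused in these cells. Assuming the same `dataDF`, one could also inspect the pairwise relationships directly; this is just a sketch of one option, not part of the original analysis:

# Pairwise scatter plots of sales, trips, and precipitation probability.
scatter_matrix(dataDF, figsize=(10, 8), diagonal='kde')
plt.show()

# Or a single regression view of sales vs. precipitation probability with seaborn.
sns.regplot(x='Precipitation Probability', y='Sales', data=dataDF)
plt.show()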