Intro

In this example, we'll use a combination of Jupyter notebooks, Pandas, and Pachyderm to analyze Citi Bike trip and sales data.


In [1]:
%matplotlib inline

import os
import datetime
from io import StringIO

import pandas as pd
import python_pachyderm

Insert Data

First, we'll create a couple of repos and populate them:

  • trips - This repo is populated with a daily file recording the number of bicycle trips taken through NYC's Citi Bike bike-sharing program on that day (data from here).
  • weather - This repo is populated daily with a JSON file representing the weather forecast for that day from forecast.io.

In [3]:
pfs_client = python_pachyderm.PfsClient()
pps_client = python_pachyderm.PpsClient()

# First create the repos/pipelines
pfs_client.create_repo("trips")
pfs_client.create_repo("weather")
pps_client.create_pipeline(
    "jupyter",
    transform=python_pachyderm.Transform(
        image="pachyderm/pachyderm_jupyter:2019",
        cmd=["python3", "merge.py"],
    ),
    input=python_pachyderm.Input(cross=[
        python_pachyderm.Input(pfs=python_pachyderm.PFSInput(glob="/", repo="weather")),
        python_pachyderm.Input(pfs=python_pachyderm.PFSInput(glob="/", repo="trips")),
    ])
)
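
The transform's merge.py script (baked into the pachyderm/pachyderm_jupyter:2019 image) isn't reproduced in this notebook: it reads every file from the two mounted input repos, pairs them up by date, and writes a single data.csv to the pipeline's output repo. The version below is only a hypothetical reconstruction; it assumes the trips files are named by date and hold a plain trip count, that the weather files are forecast.io JSON with a daily precipProbability field, and that sales are derived as $5 per trip (the merged output further down is exactly five times the trip count). The real script may differ on all of these points.

import json
import os

# Hypothetical sketch of merge.py -- not the actual script shipped in the image.
# Pachyderm mounts a pipeline's inputs under /pfs/<repo> and its output under /pfs/out.
TRIPS_DIR = "/pfs/trips"
WEATHER_DIR = "/pfs/weather"
OUT_PATH = "/pfs/out/data.csv"

with open(OUT_PATH, "w") as out:
    for date in sorted(os.listdir(TRIPS_DIR)):
        # Assumed: each trips file is named YYYY-MM-DD and contains a single trip count
        with open(os.path.join(TRIPS_DIR, date)) as f:
            trips = int(f.read().strip())

        # Assumed: each weather file is a forecast.io JSON blob for the same date
        precipitation = 0.0
        weather_path = os.path.join(WEATHER_DIR, date)
        if os.path.exists(weather_path):
            with open(weather_path) as f:
                forecast = json.load(f)
            precipitation = forecast["daily"]["data"][0].get("precipProbability", 0.0)

        # Assumed: sales estimated at a flat $5 per trip, matching the merged output
        sales = trips * 5
        out.write("{},{},{},{}\n".format(date, precipitation, trips, sales))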

In [4]:
# Populate the input repos
def insert_data(name):
    print("Inserting {} data...".format(name))
    with pfs_client.commit(name, "master") as c:
        data_dir = "{}_data".format(name)

        for data_filename in os.listdir(data_dir):
            data_filepath = os.path.join(data_dir, data_filename)
            with open(data_filepath, "rb") as f:
                pfs_client.put_file_bytes(c, data_filename, f)
                
        return c
            
trips_commit = insert_data("trips")
weather_commit = insert_data("weather")

# Wait for the commits to finish
print("Waiting for commits to finish...")
for commit in pfs_client.flush_commit([trips_commit, weather_commit]):
    print(commit)


Inserting trips data...
Inserting weather data...
Waiting for commits to finish...
commit {
  repo {
    name: "jupyter"
  }
  id: "35b6527bfdc54172877a1bde458151e9"
}
parent_commit {
  repo {
    name: "jupyter"
  }
  id: "9c3f50f02cb847bdb8f438bfcd777cbb"
}
started {
  seconds: 1556919851
  nanos: 495824700
}
finished {
  seconds: 1556919863
  nanos: 880956500
}
size_bytes: 2586
trees {
  hash: "2935ccee0b749e92b70d5d466b24e4156ccb713a2060d1d5b81c685133dd7e9d794f4a603961aa328004afb09ac41ea995c38a2fd072e4db16b167a4a34b16b9"
}
datums {
  hash: "c27cbc799d4be7315a6ed5665cd5449f01f8ef14fe7cfd721c32324a729a4531d9a236cb9c84be3b183027d031dc7b1eebfe99f57b81b225531106947ec9458b"
}


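Analyze the merged data

The jupyter pipeline combines the weather and trips data into a single data.csv in its output repo. We read that file back, load it into a Pandas DataFrame, and narrow it down to July 2016:
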
In [5]:
# get_file streams the file back in chunks of bytes; concatenate them before
# decoding (joining with "\n" could split lines at arbitrary chunk boundaries)
file = pfs_client.get_file(("jupyter", "master"), "data.csv")
contents = b"".join(file).decode("utf8")
df = pd.read_csv(StringIO(contents), names=["Date", "Precipitation", "Trips", "Sales"], index_col="Date")
df.index = pd.to_datetime(df.index)
df.sort_index(inplace=True)

# Get just July 2016
df = df[datetime.datetime(year=2016, month=7, day=1):datetime.datetime(year=2016, month=7, day=31)]
print(df)


            Precipitation  Trips   Sales
Date                                    
2016-07-01           0.83  39665  198325
2016-07-02           0.00  35835  179175
2016-07-03           0.00  35347  176735
2016-07-04           0.59  34859  174295
2016-07-05           0.89  43736  218680
2016-07-06           0.00  51336  256680
2016-07-07           0.68  48724  243620
2016-07-08           0.52  44400  222000
2016-07-09           0.82  38138  190690
2016-07-10           0.84  41985  209925
2016-07-11           0.00  52853  264265
2016-07-12           0.00  56515  282575
2016-07-13           0.00  55471  277355
2016-07-14           0.91  50345  251725
2016-07-15           0.00  48558  242790
2016-07-16           0.20  38405  192025
2016-07-17           0.00  41346  206730
2016-07-18           0.85  43681  218405
2016-07-19           0.00  54587  272935
2016-07-20           0.00  57213  286065
2016-07-21           0.00  54832  274160
2016-07-22           0.00  49185  245925
2016-07-23           0.00  37522  187610
2016-07-24           0.00  39634  198170
2016-07-25           0.91  35376  176880
2016-07-26           0.00  52904  264520
2016-07-27           0.00  54148  270740
2016-07-28           0.00  50134  250670
2016-07-29           0.80  37693  188465
2016-07-30           0.68  27780  138900
2016-07-31           0.88  27651  138255

Visualize the sales in the context of weather

Finally, we test the hypothesis that rainy days depress ridership and sales by plotting the precipitation probabilities alongside the sales data:


In [6]:
ax = df.plot(secondary_y=["Precipitation"], figsize=(10, 8))
ax.set_ylabel("Sales ($), # Trips")
ax.right_ax.set_ylabel("Precipitation probability")
ax.right_ax.legend(loc="best")
ax.legend(loc="upper left")


Out[6]:
<matplotlib.legend.Legend at 0x10d1f5a58>

We can see that there was a probability of precipitation in NYC above 70% on both of the days in question, which likely explains the poor sales. Of course, we can attach our Jupyter notebook to other parts of the data to explore other unexpected behavior, develop further analyses, and so on.
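
For instance, sticking with the DataFrame already loaded above, a quick sketch of one follow-up is to quantify the relationship instead of eyeballing it from the plot:

# Correlation between precipitation probability and ridership/sales for July 2016.
# Negative values support the idea that rainier days see fewer trips and lower sales.
print(df["Precipitation"].corr(df["Trips"]))
print(df["Precipitation"].corr(df["Sales"]))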