In [3]:
from IPython.core.interactiveshell import InteractiveShell
InteractiveShell.ast_node_interactivity = "all"
%load_ext autoreload
%autoreload 2
%matplotlib inline
import seaborn as sns
import pandas as pd
import requests
from tweet_parser.tweet import Tweet
from gapi import gnipapi
from gapi.gnipapi import *
This will lean heavily on Tom Augspurger's excellent series on Modern Pandas.
Quote:
Method chaining, where you call methods on an object one after another, is in vogue at the moment. It's always been a style of programming that's been possible with pandas, and over the past several releases, we've added methods that enable even more chaining.
- assign (0.16.0): For adding new columns to a DataFrame in a chain (inspired by dplyr's mutate)
- pipe (0.16.2): For including user-defined methods in method chains.
- rename (0.18.0): For altering axis names (in addition to changing the actual labels as before).
- Window methods (0.18): Took the top-level pd.rolling_* and pd.expanding_* functions and made them NDFrame methods with a groupby-like API.
- resample (0.18.0): Added a new groupby-like API.
- .where/mask/Indexers accept Callables (0.18.1): In the next release you'll be able to pass a callable to the indexing methods, to be evaluated within the DataFrame's context (like .query, but with code instead of strings).
My scripts will typically start off with a large-ish chain that gets things into a manageable state. It's good to have the bulk of your munging done right away so you can start to do Science™.
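As a minimal sketch of those chain-friendly methods in action (toy data and a hypothetical to_fahrenheit step, nothing from the post itself):
.python
import pandas as pd

def to_fahrenheit(df):
    # a user-defined step, included in the chain via pipe
    return df.assign(temp_f=lambda d: d["temp_c"] * 9 / 5 + 32)

(pd.DataFrame({"temp_c": [0.0, 21.5, 30.0]})
 .assign(temp_k=lambda d: d["temp_c"] + 273.15)  # assign: add a column mid-chain
 .pipe(to_fahrenheit)                            # pipe: a user function in the chain
 .rename(columns=str.upper))                     # rename: relabel with a callable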
Part of the goal will be to develop different coding styles with pandas, moving from a script-ish, verbose approach to a piped style that flows well, with discrete cleaning operations grouped into single functions. This carries over nicely to pyspark, which requires that kind of style and whose DataFrame methods overlap a great deal with pandas'.
Method chaining is popular in programming these days, with the rise of functional languages that make function composition more readable. Examples of this in various languages:
.scala
def fooNotIndent : List[Int] = (1 to 100).view.map { _ + 3 }.filter { _ > 10 }.flatMap { table.get }.take(3).toList
def fooIndent: List[Int] =
(1 to 100)
.view
.map { _ + 3 }
.filter { _ > 10 }
.flatMap { table.get }
.take(3)
.toList
or comparing (from TA's post)
tumble_after(
broke(
fell_down(
fetch(went_up(jack_jill, "hill"), "water"),
jack),
"crown"),
"jill"
)
with (from TA's post)
jack_jill %>%
went_up("hill") %>%
fetch("water") %>%
fell_down("jack") %>%
broke("crown") %>%
tumble_after("jill")
or
jack_jill
.pipe(went_up, "hill")
.pipe(fetch, "water")
.pipe(fell_down, "jack")
.pipe(broke, "crown")
.pipe(tumble_after, "jill")
There are several cases I'd like to address in this session. This might be a lot for a single session, but hey.
Let's start off with a problem that we might have that we can try to answer:
I am purposefully choosing a dataset that will be difficult to join with twitter data, partly to illustrate a point...
Let's follow Tom's use of the BTS airline delay dataset, which requires a bit of work to obtain and parse.
In data work, I have a loose set of semantics to describe stages in a workflow:
It's important to note that these stages are NEVER LINEAR, even though it almost always looks like it to the end consumer of posts like this. Each stage can be non-trivial for a host of reasons, and choices made in the early stages have strong effects in the rest of the process. Lots of iteration might be needed in each stage, and managing technical debt here can make each iteration faster.
Given these tasks, it seems logical to define our code in similar stages, though I have no precise guides for how to do this. In our example for today, we can start by grabbing some data from the web. We'll follow TA's flights data grab and later add some tweet data via the Gnip API.
What type of questions do we want to answer? Let's say we have a project that will investigate customer sentiment around airports / airlines. Perhaps some questions of interest are:
What type of data will we need? Probably some detailed data about flight delays, preferably flight-level data that includes information about the carrier, airport, destination, etc. We'll also need tweet data that reasonably matches these criteria, of course. In this context, we'll probably be satisfied with simple exploration.
I chose these questions partially due to the great series of posts by Augspurger that illustrate working with complex data, such as flight delays. :)
In [7]:
import os
import zipfile
import requests
import numpy as np
import pandas as pd
import seaborn as sns
import matplotlib.pyplot as plt
In the original example from Tom, the code is written out as such:
.python
headers = {
'Referer': 'https://www.transtats.bts.gov/DL_SelectFields.asp?Table_ID=236&DB_Short_Name=On-Time',
'Origin': 'https://www.transtats.bts.gov',
'Content-Type': 'application/x-www-form-urlencoded',
}
params = (
('Table_ID', '236'),
('Has_Group', '3'),
('Is_Zipped', '0'),
)
data = <TRUNCATED>
os.makedirs('data', exist_ok=True)
dest = "data/flights.csv.zip"
if not os.path.exists(dest):
r = requests.post('https://www.transtats.bts.gov/DownLoad_Table.asp',
headers=headers, params=params, data=data, stream=True)
with open("data/flights.csv.zip", 'wb') as f:
for chunk in r.iter_content(chunk_size=102400):
if chunk:
f.write(chunk)
Given our focus today, let's wrap all initial data pulling into a function for logical separation.
In [8]:
def maybe_pull_airport_data():
"""
lightly modified from TA's post.
"""
headers = {
'Referer': 'https://www.transtats.bts.gov/DL_SelectFields.asp?Table_ID=236&DB_Short_Name=On-Time',
'Origin': 'https://www.transtats.bts.gov',
'Content-Type': 'application/x-www-form-urlencoded',
}
params = (
('Table_ID', '236'),
('Has_Group', '3'),
('Is_Zipped', '0'),
)
# query string to be sent. can modify the 'where' dates to change the size of data returned.
data = "UserTableName=On_Time_Performance&DBShortName=On_Time&RawDataTable=T_ONTIME&sqlstr=+SELECT+FL_DATE%2CUNIQUE_CARRIER%2CAIRLINE_ID%2CTAIL_NUM%2CFL_NUM%2CORIGIN_AIRPORT_ID%2CORIGIN_AIRPORT_SEQ_ID%2CORIGIN_CITY_MARKET_ID%2CORIGIN%2CORIGIN_CITY_NAME%2CDEST_AIRPORT_ID%2CDEST_AIRPORT_SEQ_ID%2CDEST_CITY_MARKET_ID%2CDEST%2CDEST_CITY_NAME%2CCRS_DEP_TIME%2CDEP_TIME%2CDEP_DELAY%2CTAXI_OUT%2CWHEELS_OFF%2CWHEELS_ON%2CTAXI_IN%2CCRS_ARR_TIME%2CARR_TIME%2CARR_DELAY%2CCANCELLED%2CCANCELLATION_CODE%2CCARRIER_DELAY%2CWEATHER_DELAY%2CNAS_DELAY%2CSECURITY_DELAY%2CLATE_AIRCRAFT_DELAY+FROM++T_ONTIME+WHERE+YEAR%3D2017&varlist=FL_DATE%2CUNIQUE_CARRIER%2CAIRLINE_ID%2CTAIL_NUM%2CFL_NUM%2CORIGIN_AIRPORT_ID%2CORIGIN_AIRPORT_SEQ_ID%2CORIGIN_CITY_MARKET_ID%2CORIGIN%2CORIGIN_CITY_NAME%2CDEST_AIRPORT_ID%2CDEST_AIRPORT_SEQ_ID%2CDEST_CITY_MARKET_ID%2CDEST%2CDEST_CITY_NAME%2CCRS_DEP_TIME%2CDEP_TIME%2CDEP_DELAY%2CTAXI_OUT%2CWHEELS_OFF%2CWHEELS_ON%2CTAXI_IN%2CCRS_ARR_TIME%2CARR_TIME%2CARR_DELAY%2CCANCELLED%2CCANCELLATION_CODE%2CCARRIER_DELAY%2CWEATHER_DELAY%2CNAS_DELAY%2CSECURITY_DELAY%2CLATE_AIRCRAFT_DELAY&grouplist=&suml=&sumRegion=&filter1=title%3D&filter2=title%3D&geo=All%A0&time=January&timename=Month&GEOGRAPHY=All&XYEAR=2017&FREQUENCY=1&VarDesc=Year&VarType=Num&VarDesc=Quarter&VarType=Num&VarDesc=Month&VarType=Num&VarDesc=DayofMonth&VarType=Num&VarDesc=DayOfWeek&VarType=Num&VarName=FL_DATE&VarDesc=FlightDate&VarType=Char&VarName=UNIQUE_CARRIER&VarDesc=UniqueCarrier&VarType=Char&VarName=AIRLINE_ID&VarDesc=AirlineID&VarType=Num&VarDesc=Carrier&VarType=Char&VarName=TAIL_NUM&VarDesc=TailNum&VarType=Char&VarName=FL_NUM&VarDesc=FlightNum&VarType=Char&VarName=ORIGIN_AIRPORT_ID&VarDesc=OriginAirportID&VarType=Num&VarName=ORIGIN_AIRPORT_SEQ_ID&VarDesc=OriginAirportSeqID&VarType=Num&VarName=ORIGIN_CITY_MARKET_ID&VarDesc=OriginCityMarketID&VarType=Num&VarName=ORIGIN&VarDesc=Origin&VarType=Char&VarName=ORIGIN_CITY_NAME&VarDesc=OriginCityName&VarType=Char&VarDesc=OriginState&VarType=Char&VarDesc=OriginStateFips&VarType=Char&VarDesc=OriginStateName&VarType=Char&VarDesc=OriginWac&VarType=Num&VarName=DEST_AIRPORT_ID&VarDesc=DestAirportID&VarType=Num&VarName=DEST_AIRPORT_SEQ_ID&VarDesc=DestAirportSeqID&VarType=Num&VarName=DEST_CITY_MARKET_ID&VarDesc=DestCityMarketID&VarType=Num&VarName=DEST&VarDesc=Dest&VarType=Char&VarName=DEST_CITY_NAME&VarDesc=DestCityName&VarType=Char&VarDesc=DestState&VarType=Char&VarDesc=DestStateFips&VarType=Char&VarDesc=DestStateName&VarType=Char&VarDesc=DestWac&VarType=Num&VarName=CRS_DEP_TIME&VarDesc=CRSDepTime&VarType=Char&VarName=DEP_TIME&VarDesc=DepTime&VarType=Char&VarName=DEP_DELAY&VarDesc=DepDelay&VarType=Num&VarDesc=DepDelayMinutes&VarType=Num&VarDesc=DepDel15&VarType=Num&VarDesc=DepartureDelayGroups&VarType=Num&VarDesc=DepTimeBlk&VarType=Char&VarName=TAXI_OUT&VarDesc=TaxiOut&VarType=Num&VarName=WHEELS_OFF&VarDesc=WheelsOff&VarType=Char&VarName=WHEELS_ON&VarDesc=WheelsOn&VarType=Char&VarName=TAXI_IN&VarDesc=TaxiIn&VarType=Num&VarName=CRS_ARR_TIME&VarDesc=CRSArrTime&VarType=Char&VarName=ARR_TIME&VarDesc=ArrTime&VarType=Char&VarName=ARR_DELAY&VarDesc=ArrDelay&VarType=Num&VarDesc=ArrDelayMinutes&VarType=Num&VarDesc=ArrDel15&VarType=Num&VarDesc=ArrivalDelayGroups&VarType=Num&VarDesc=ArrTimeBlk&VarType=Char&VarName=CANCELLED&VarDesc=Cancelled&VarType=Num&VarName=CANCELLATION_CODE&VarDesc=CancellationCode&VarType=Char&VarDesc=Diverted&VarType=Num&VarDesc=CRSElapsedTime&VarType=Num&VarDesc=ActualElapsedTime&VarType=Num&VarDesc=AirTime&VarType=Num&VarDesc=Flights&VarType=Num&VarDesc=Distance&VarType=Num&Va
rDesc=DistanceGroup&VarType=Num&VarName=CARRIER_DELAY&VarDesc=CarrierDelay&VarType=Num&VarName=WEATHER_DELAY&VarDesc=WeatherDelay&VarType=Num&VarName=NAS_DELAY&VarDesc=NASDelay&VarType=Num&VarName=SECURITY_DELAY&VarDesc=SecurityDelay&VarType=Num&VarName=LATE_AIRCRAFT_DELAY&VarDesc=LateAircraftDelay&VarType=Num&VarDesc=FirstDepTime&VarType=Char&VarDesc=TotalAddGTime&VarType=Num&VarDesc=LongestAddGTime&VarType=Num&VarDesc=DivAirportLandings&VarType=Num&VarDesc=DivReachedDest&VarType=Num&VarDesc=DivActualElapsedTime&VarType=Num&VarDesc=DivArrDelay&VarType=Num&VarDesc=DivDistance&VarType=Num&VarDesc=Div1Airport&VarType=Char&VarDesc=Div1AirportID&VarType=Num&VarDesc=Div1AirportSeqID&VarType=Num&VarDesc=Div1WheelsOn&VarType=Char&VarDesc=Div1TotalGTime&VarType=Num&VarDesc=Div1LongestGTime&VarType=Num&VarDesc=Div1WheelsOff&VarType=Char&VarDesc=Div1TailNum&VarType=Char&VarDesc=Div2Airport&VarType=Char&VarDesc=Div2AirportID&VarType=Num&VarDesc=Div2AirportSeqID&VarType=Num&VarDesc=Div2WheelsOn&VarType=Char&VarDesc=Div2TotalGTime&VarType=Num&VarDesc=Div2LongestGTime&VarType=Num&VarDesc=Div2WheelsOff&VarType=Char&VarDesc=Div2TailNum&VarType=Char&VarDesc=Div3Airport&VarType=Char&VarDesc=Div3AirportID&VarType=Num&VarDesc=Div3AirportSeqID&VarType=Num&VarDesc=Div3WheelsOn&VarType=Char&VarDesc=Div3TotalGTime&VarType=Num&VarDesc=Div3LongestGTime&VarType=Num&VarDesc=Div3WheelsOff&VarType=Char&VarDesc=Div3TailNum&VarType=Char&VarDesc=Div4Airport&VarType=Char&VarDesc=Div4AirportID&VarType=Num&VarDesc=Div4AirportSeqID&VarType=Num&VarDesc=Div4WheelsOn&VarType=Char&VarDesc=Div4TotalGTime&VarType=Num&VarDesc=Div4LongestGTime&VarType=Num&VarDesc=Div4WheelsOff&VarType=Char&VarDesc=Div4TailNum&VarType=Char&VarDesc=Div5Airport&VarType=Char&VarDesc=Div5AirportID&VarType=Num&VarDesc=Div5AirportSeqID&VarType=Num&VarDesc=Div5WheelsOn&VarType=Char&VarDesc=Div5TotalGTime&VarType=Num&VarDesc=Div5LongestGTime&VarType=Num&VarDesc=Div5WheelsOff&VarType=Char&VarDesc=Div5TailNum&VarType=Char"
os.makedirs('data', exist_ok=True)
dest = "data/flights.csv.zip"
if not os.path.exists(dest):
r = requests.post('https://www.transtats.bts.gov/DownLoad_Table.asp',
headers=headers, params=params, data=data, stream=True)
with open("data/flights.csv.zip", 'wb') as f:
for chunk in r.iter_content(chunk_size=102400):
if chunk:
f.write(chunk)
zf = zipfile.ZipFile("data/flights.csv.zip")
fp = zf.extract(zf.filelist[0].filename, path='data/')
df = (pd
.read_csv(fp, parse_dates=["FL_DATE"])
.rename(columns=str.lower) #note this takes a callable
)
return df
Our function may be a bit sloppy from a DRY standpoint, but let's be serious: there is no need for arguments in this function, and no other piece of our analysis will ever touch the fields inside of it. You could argue that it should take a flexible filename option, but for the purposes of this demo that might be overkill, and refactoring the function to take a filename argument would take a minute or two now that the core logic is stable. This gives us a high-level entry point for our demo: a single call to the function.
Imagine this was going to go into a much larger set of functions or a library of some sort -- the function can be moved to a python file and work out of the box, which can simplify your notebook or code, at the risk of adding more dependencies for users and disrupting the flow of analysis for a technical consumer.
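As a sketch of that refactor, the download logic could be generalized into a helper that takes the destination as an argument (maybe_download is a hypothetical name, not part of TA's code):
.python
import os
import requests

def maybe_download(url, dest, **request_kwargs):
    # fetch url to dest, unless the file already exists on disk
    os.makedirs(os.path.dirname(dest) or ".", exist_ok=True)
    if not os.path.exists(dest):
        r = requests.post(url, stream=True, **request_kwargs)
        with open(dest, "wb") as f:
            for chunk in r.iter_content(chunk_size=102400):
                if chunk:
                    f.write(chunk)
    return dest
maybe_pull_airport_data could then call maybe_download('https://www.transtats.bts.gov/DownLoad_Table.asp', 'data/flights.csv.zip', headers=headers, params=params, data=data) and keep only the unzip-and-parse logic for itself.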
That was a lot of crap - let's get back to the data.
In [9]:
flights = maybe_pull_airport_data()
flights.head()
flights.shape
flights.info()
Out[9]:
Out[9]:
In [10]:
hdf = flights.set_index(["unique_carrier", "origin", "dest", "tail_num", "fl_date"]).sort_index()
hdf[hdf.columns[:4]].head()
Out[10]:
I think this clears up some thinking about the rows -- indexing by the flight operator, the airport origin, the airport destination, the plane id, and the date of the flight makes it clear what each row is.
Selecting the data out in useful ways is somewhat straightforward using the .loc semantics, which allow for label-oriented indexing in a dataframe.
In [11]:
hdf.loc[["AA"], ["dep_delay"]].head()
Out[11]:
What if we wanted to get ANY flight from Denver to Albuquerque? Pandas' IndexSlice is a brilliant help here.
The semantics work as follows: ":" means "include all labels from this level of the index", so
hdf.loc[pd.IndexSlice[:, ["DEN"], ["ABQ"]], ["dep_delay"]]
translates to
hdf.loc[pd.IndexSlice[ALL_CARRIERS, origin=["DEN"], dest=["ABQ"], ALL_TAILS, ALL_DATES], ["dep_delay"]]
In [12]:
(hdf.loc[pd.IndexSlice[:, ["DEN"], ["ABQ"]],
["dep_delay"]]
.sort_values("dep_delay", ascending=False)
.head()
)
Out[12]:
We can also use the powerful query function, which allows a limited vocabulary to be executed on a dataframe and is wildly useful for writing slightly clearer operations.
In [13]:
(hdf
.query("origin == 'DEN' and dest == 'ABQ'")
.loc[:, "dep_delay"]
.to_frame()
.sort_values("dep_delay", ascending=False)
.head()
)
Out[13]:
These days, I prefer query most of the time, particularly for exploration, but .loc with explicit indices can be far faster in many cases.
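If you want to check that tradeoff on your own data, here's a quick sketch with notebook magics (absolute numbers will vary with data size and whether the index is sorted):
.python
# label-based lookup against the sorted MultiIndex
%timeit hdf.loc[pd.IndexSlice[:, ["DEN"], ["ABQ"]], ["dep_delay"]]
# the equivalent query against the index level names
%timeit hdf.query("origin == 'DEN' and dest == 'ABQ'")[["dep_delay"]]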
So, at this stage, it seems reasonable to examine some of the data and see what might be problematic or needs further work.
We can see that several columns that should be datetimes are not ("dep_time", etc.), and that the city names are not really city names but City, State pairs. In Tom's series, the cleaning operations are done in a different post, but I will copy them here to fit in our framework. I think these functions can be safely counted as advanced preprocessing and cleaning.
In [14]:
flights.head()
flights.dtypes
Out[14]:
Out[14]:
In [15]:
def extract_city_name(df):
'''
Chicago, IL -> Chicago for origin_city_name and dest_city_name. From Augspurger.
'''
cols = ['origin_city_name', 'dest_city_name']
    city = df[cols].apply(lambda x: x.str.extract(r"(.*), \w{2}", expand=False))
df = df.copy()
df[['origin_city_name', 'dest_city_name']] = city
return df
def time_to_datetime(df, columns):
'''
Combine all time items into datetimes. From Augspurger.
2014-01-01,0914 -> 2014-01-01 09:14:00
'''
df = df.copy()
def converter(col):
timepart = (col.astype(str)
                    .str.replace(r'\.0$', '')  # NaNs force float dtype
.str.pad(4, fillchar='0'))
return pd.to_datetime(df['fl_date'].astype("str") + ' ' +
timepart.str.slice(0, 2) + ':' +
timepart.str.slice(2, 4),
errors='coerce')
df[columns] = df[columns].apply(converter)
return df
Note that both methods accept a pandas.DataFrame and return a pandas.DataFrame. This is critical to our upcoming methodology, and for portability to spark.
It seems obvious, but writing code that operates on immutable data structures is wildly useful for data processing. DataFrames are not immutable, but they can be treated as such: many operations implicitly return a copy, and our own methods can be written to do the same. With these methods, we can now create a new top-level function that handles our preprocessing.
It's not too often that your major performance bottleneck in pandas is copying dataframes.
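To make that concrete, a pipe-friendly step follows a simple template: accept a dataframe, copy, mutate, return. Here label_long_delays and its threshold are hypothetical, purely for illustration:
.python
def label_long_delays(df, threshold=30):
    # takes a DataFrame and returns a DataFrame, so it drops straight into .pipe
    df = df.copy()
    df["long_delay"] = df["dep_delay"] > threshold
    return df

# flights.pipe(label_long_delays, threshold=45)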
Anyway, we can now integrate our simple gathering method with some of the cleaning methods for a new top-level entry for our exploration.
In [16]:
def read_and_process_flights_data():
drop_cols = ["unnamed: 32", "security_delay", "late_aircraft_delay",
"nas_delay", "origin_airport_id", "origin_city_market_id",
"taxi_out", "wheels_off", "wheels_on", "crs_arr_time", "crs_dep_time",
"carrier_delay"]
df = (maybe_pull_airport_data()
.rename(columns=str.lower)
.drop(drop_cols, axis=1)
.pipe(extract_city_name)
.pipe(time_to_datetime, ['dep_time', 'arr_time'])
.assign(fl_date=lambda x: pd.to_datetime(x['fl_date']),
dest=lambda x: pd.Categorical(x['dest']),
origin=lambda x: pd.Categorical(x['origin']),
tail_num=lambda x: pd.Categorical(x['tail_num']),
unique_carrier=lambda x: pd.Categorical(x['unique_carrier']),
cancellation_code=lambda x: pd.Categorical(x['cancellation_code'])))
return df
In [17]:
## this will take a few minutes with the full 2017 data; far faster with a month's sample
flights = read_and_process_flights_data()
In [18]:
flights.tail()
flights.dtypes
flights.shape
Out[18]:
Out[18]:
Out[18]:
In [19]:
(flights
.dropna(subset=['dep_time', 'unique_carrier'])
.loc[flights['unique_carrier'].isin(flights['unique_carrier']
.value_counts()
.index[:5])]
.set_index('dep_time')
.groupby(['unique_carrier', pd.TimeGrouper("D")])
["fl_num"]
.count()
.unstack(0)
.fillna(0)
.rename_axis("Flights per Day", axis=1)
.plot()
)
Out[19]:
If we broke this out like many people do, we might end up with code like this, where each step is assigned to a variable.
In [20]:
# gets the carriers with the most traffic, hacking with the index. We use this for other ops.
df_clean = flights.dropna(subset=["dep_time", "unique_carrier"])
top_carriers = flights["unique_carrier"].value_counts().index[:5]
df_clean = df_clean.query("unique_carrier in @top_carriers")
df_clean = df_clean.set_index("dep_time")
carriers_by_hour = (df_clean
.groupby(['unique_carrier',
pd.TimeGrouper("H")])["fl_num"]
.count())
carriers_df = carriers_by_hour.unstack(0)
carriers_df = carriers_df.fillna(0)
carriers_flights_per_day = (carriers_df
.rolling(24)
.sum()
.rename_axis("Flights per Day", axis=1))
carriers_flights_per_day.plot()
Out[20]:
Naming things is hard. Given that pandas has extremely expressive semantics and nearly all analytic methods return a fresh dataframe or series, it is straightforward to chain many ops together. This style will lend itself well to spark and should be familiar to those of you who have worked with Scala or other functional languages.
If the chains get very verbose or hard to follow, break them up and put them in a function, where you can keep it all in one place, as sketched below. Try to be very specific about naming your functions (remember, naming things is hard; functions are no different).
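For example, the daily flight-count chain from above could be named and tucked away (top_carrier_daily_counts is my name for it, a sketch rather than anything from TA's post):
.python
def top_carrier_daily_counts(df, n=5):
    # the same chain as above, minus the plot, behind a searchable name
    top = df["unique_carrier"].value_counts().index[:n]
    return (df
            .dropna(subset=["dep_time", "unique_carrier"])
            .query("unique_carrier in @top")
            .set_index("dep_time")
            .groupby(["unique_carrier", pd.TimeGrouper("D")])["fl_num"]
            .count()
            .unstack(0)
            .fillna(0))

# top_carrier_daily_counts(flights).plot()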
In an exploratory context, you might keep adding methods onto your chain, expanding and continuing until you reach your chart or end-stage goal. In some cases, saving some exploratory work to variables is great.
Let's briefly talk about the .assign operator. This operation returns a new dataframe with a new column attached, where the new column can be a constant, a like-indexed numpy array or series, a callable that references the dataframe in question, etc. It's very powerful in method chains and also very useful for keeping your namespace clean.
The semantics of
df.assign(NEW_COLUMN_NAME=lambda df: df["column"] + df["column2"])
can be read as
assign a column named "NEW_COLUMN_NAME" to my referenced dataframe that is the sum of "column" and "column2". In the example below, the lambda references the datetime accessor of the departure time column to extract the hour, which gives us a convenient categorical value for examination.
This is similar to R's mutate function in the dplyr world.
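Spelled out on toy data (values made up):
.python
toy = pd.DataFrame({"column": [1, 2], "column2": [10, 20]})
toy.assign(NEW_COLUMN_NAME=lambda d: d["column"] + d["column2"])
#    column  column2  NEW_COLUMN_NAME
# 0       1       10               11
# 1       2       20               22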
Note -- the top_carriers variable above is a good example of something we might want to keep around, and I'll use it several times in the post.
In [21]:
# taken from Augspurger
(flights[['fl_date', 'unique_carrier', 'tail_num', 'dep_time', 'dep_delay']]
.dropna()
.query("unique_carrier in @top_carriers")
.assign(hour=lambda x: x['dep_time'].dt.hour)
#.query('5 <= dep_delay < 600')
.pipe((sns.boxplot, 'data'), 'hour', 'dep_delay')
)
Out[21]:
This enables rapid exploration, and within the interactive context, allows you to copy a cell and change single lines to modify your results.
A heatmap might be a nice way to visualize categories in this data, and the assign syntax makes creating those categoricals seamless.
In [22]:
(flights[['fl_date', "unique_carrier", 'dep_time', 'dep_delay']]
.dropna()
.query("unique_carrier in @top_carriers")
.assign(hour=lambda x: x.dep_time.dt.hour)
.assign(day=lambda x: x.dep_time.dt.dayofweek)
.query('-1 < dep_delay < 600')
.groupby(["day", "hour"])["dep_delay"]
.median()
.unstack()
.pipe((sns.heatmap, 'data'))
)
Out[22]:
In [23]:
(flights[['fl_date', 'unique_carrier', 'dep_time', 'dep_delay']]
.query("unique_carrier in @top_carriers")
.dropna()
.assign(hour=lambda x: x.dep_time.dt.hour)
.assign(day=lambda x: x.dep_time.dt.dayofweek)
#.query('0 <= dep_delay < 600')
.groupby(["unique_carrier", "day"])["dep_delay"]
.mean()
.unstack()
.sort_values(by=0)
.pipe((sns.heatmap, 'data'))
)
Out[23]:
What about some other exploration? Pandas allows for some nifty ways of slicing up data to flexibly apply basic operations.
What if we want to "center" a carrier's delay time at an airport by the mean airport delay? This is a case where assigning variables might be useful. We'll limit our analysis to the top carriers / airports and save some variables for further interactive use.
In [24]:
top_airport_codes = flights["origin"].value_counts().to_frame().head(5).index
top_airport_cities = flights["origin_city_name"].value_counts().head(5).index
top_airport_cities
top_airport_codes
Out[24]:
Out[24]:
In [26]:
grand_airport_delay = (flights
.query("unique_carrier in @top_carriers")
.query("origin in @top_airport_codes")
.groupby("origin")["dep_delay"]
.mean()
.dropna()
.to_frame()
)
airport_delay = (flights
.query("unique_carrier in @top_carriers")
.query("origin in @top_airport_codes")
.set_index("fl_date")
.groupby([pd.TimeGrouper("H"), "origin"])["dep_delay"]
.mean()
.to_frame()
)
carrier_delay = (flights
.query("unique_carrier in @top_carriers")
.query("origin in @top_airport_codes")
.set_index("fl_date")
.groupby([pd.TimeGrouper("H"), "origin", "unique_carrier"])["dep_delay"]
.mean()
.to_frame()
)
airport_delay.head()
carrier_delay.head()
Out[26]:
Out[26]:
In [27]:
grand_airport_delay
airport_delay.unstack().head()
carrier_delay.unstack(1).head()
Out[27]:
Out[27]:
Out[27]:
Pandas handles alignment along axes, so we can do an operation along an axis with another dataframe with similar index labels.
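Alignment in miniature, before the real thing (toy series of my own):
.python
a = pd.Series([1, 2], index=["x", "y"])
b = pd.Series([10, 20], index=["y", "x"])
a + b  # labels align before the arithmetic: x -> 1 + 20 = 21, y -> 2 + 10 = 12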
In [28]:
(carrier_delay
.unstack(1)
.div(grand_airport_delay.unstack())
.head()
)
(carrier_delay
.unstack(1)
.div(airport_delay.unstack())
.head()
)
Out[28]:
Out[28]:
Putting that together, we can then get ratios of flight delays to the overall airport delay (grand mean or daily delays).
In [29]:
(carrier_delay
.unstack(1)
.div(airport_delay.unstack())
.stack()
.reset_index()
.assign(day=lambda x: x["fl_date"].dt.dayofweek)
.set_index("fl_date")
.groupby(["unique_carrier", "day"])
.mean()
.dropna()
.unstack()
["dep_delay"]
.pipe((sns.heatmap, 'data'))
)
Out[29]:
In [30]:
(carrier_delay
.unstack(1)
.div(grand_airport_delay.unstack())
.stack()
.reset_index()
.assign(day=lambda x: x["fl_date"].dt.dayofweek)
.set_index("fl_date")
.groupby(["unique_carrier", "day"])
.mean()
.dropna()
.unstack()
["dep_delay"]
.pipe((sns.heatmap, 'data'))
)
Out[30]:
In [31]:
(carrier_delay
.unstack(1)
.subtract(airport_delay.unstack())
.stack()
.reset_index()
.assign(day=lambda x: x["fl_date"].dt.dayofweek)
.groupby(["unique_carrier", "day"])
.mean()
.dropna()
.unstack()
["dep_delay"]
.pipe((sns.heatmap, 'data'))
)
Out[31]:
So, now that we have some working flight data, let's poke at getting some tweets.
I've recently refactored the python Gnip search api to be a bit more flexible, including making each search return a lazy stream. There are also some tools for programmatically generating search rules.
The 'city name' column and the airport abbreviation are likely sources of help for finding tweets related to flights / airport data. We'll use those and define a small function to help quickly generate our rules, which are somewhat simplistic but should serve as a reasonable start.
In [32]:
def generate_rules(codes, cities):
base_rule = """
({code} OR "{city} airport") (flying OR flight OR plane OR jet)
-(football OR
basketball OR
baseball OR
party)
-is:retweet
"""
rules = []
for code, city in zip(list(codes), list(cities)):
_rule = base_rule.format(code=code, city=city.lower())
rule = gen_rule_payload(_rule,
from_date="2017-01-01",
to_date="2017-07-31",
max_results=500)
rules.append(rule)
return rules
gnip_rules = generate_rules(top_airport_codes, top_airport_cities)
gnip_rules[0]
Out[32]:
The gnip api has some functions to handle our connection information. Please ensure that the environment variable GNIP_PW is set with your password. If it isn't already set, you can set it here.
In [ ]:
# os.environ["GNIP_PW"] = ""
In [ ]:
username = "agonzales@twitter.com"
search_api = "fullarchive"
account_name = "shendrickson"
endpoint_label = "ogformat.json"
og_search_endpoint = gen_endpoint(search_api,
account_name,
endpoint_label,
count_endpoint=False)
og_args = {"username": username,
"password": os.environ["GNIP_PW"],
"url": og_search_endpoint}
In our get_tweets function, we wrap some of the functionality of our result stream to collect specific data from tweets into a dataframe.
In [ ]:
def get_tweets(result_stream, label):
fields = ["id", "created_at_datetime",
"all_text", "hashtags", "user_id",
"user_mentions", "screen_name"]
tweet_extracts = []
for tweet in result_stream.start_stream():
        attrs = [getattr(tweet, field) for field in fields]  # getattr is the idiomatic spelling
tweet_extracts.append(attrs)
result_stream.end_stream()
df = pd.DataFrame(tweet_extracts, columns=fields).assign(airport=label)
return df
We can test this with a single rule.
In [ ]:
rs = ResultStream(**og_args, rule_payload=gnip_rules[0], max_results=1000)
In [ ]:
tweets = get_tweets(result_stream=rs, label=top_airport_codes[0])
In [ ]:
tweets.head()
tweets.shape
Now let's collect tweets for each airport. It might be a hair overkill, but I'll wrap the process up in a function so we have a similar high-level entry point for grabbing our initial data. It will take a minute to grab this data, and for the time being, I'm not going to save it to disk.
In [ ]:
def pull_tweet_data(gnip_rules, results_per_rule=25000):
streams = [ResultStream(**og_args,
rule_payload=rp,
max_results=results_per_rule)
for rp in gnip_rules]
tweets = [get_tweets(rs, airport)
for rs, airport
in zip(streams, top_airport_codes)]
return pd.concat(tweets)
In [ ]:
tweets = pull_tweet_data(gnip_rules)
Given our new data, let's do some quick exploration and cleaning.
In [ ]:
tweets.shape
tweets.head()
In [ ]:
(tweets
.set_index("created_at_datetime")
.groupby([pd.TimeGrouper("D")])
.size()
.sort_values()
.tail()
)
In [ ]:
import matplotlib.pyplot as plt
fig = plt.figure()
ax = fig.add_subplot(111)
(tweets
.set_index("created_at_datetime")
.groupby([pd.TimeGrouper("D")])
.size()
.plot()
)
ax.annotate("united senselessly\nbeating a passenger",
xytext=("2017-02-01", 1200),
xy=("2017-04-04", 900),
arrowprops=dict(facecolor="black", shrink=0.05))
The number of tweets per day by airport rule is a bit odd:
In [ ]:
(tweets
.drop(["id", "all_text"], axis=1)
.set_index("created_at_datetime")
.groupby([pd.TimeGrouper("H"), "airport"])
["user_id"]
.count()
.unstack()
.fillna(0)
.rolling(24).sum()
.plot()
)
So let's look at what is going on with LAX:
In [ ]:
tweets["airport"].value_counts()
tweets.groupby("airport")["created_at_datetime"].min().sort_values().tail(1)
min_lax_time = tweets.groupby("airport")["created_at_datetime"].min().sort_values().tail(1)[0]
Either far, far more people tweet from LAX than from other airports, or the extra tweets were dominated by the spikes in the data. Given its size, this makes some sense, but I would question my rules a bit. Let's even out these samples a hair by selecting tweets only from the period where LAX data exists.
In [ ]:
(tweets
.drop(["id", "all_text"], axis=1)
.query("created_at_datetime >= @min_lax_time")
.set_index("created_at_datetime")
.groupby([pd.TimeGrouper("D"), "airport"])
["user_id"]
.count()
.unstack()
.fillna(0)
#.rolling(7).mean()
.plot()
)
Moving on, let's do some more things with our tweets, like parsing the mentions out of their dict structure into something more useful.
We'll be making a function that takes a dataframe and returns one, so we can use it in the .pipe method.
In [ ]:
def parse_mentions(df):
    # grab the display name from each mention dict on a tweet
    extract_mentions = lambda x: [d["name"] for d in x]
    # spread the mention lists into columns, keeping only the first two mentions
    mentions = (pd.DataFrame([x for x in df["user_mentions"]
                              .apply(extract_mentions)])
                .loc[:, [0, 1]]
                .rename(columns={0: "mention_1", 1: "mention_2"})
                )
    # merge the parsed mentions back onto the tweets by index and drop the raw column
    return (pd.merge(df,
                     mentions,
                     left_index=True,
                     right_index=True)
            .drop("user_mentions", axis=1)
            )
In [ ]:
airline_name_code_dict = {
"Southwest Airlines": "WN",
"Delta": "DL",
"American Airlines": "AA",
"United Airlines": "UA",
"Sky": "Sk"
}
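This dict isn't used directly in the chains that follow, but one sketch of how it could be put to work is mapping parsed mention names onto carrier codes, so tweets could later line up with the flights data's unique_carrier field (untested against real mention strings):
.python
(tweets
 .pipe(parse_mentions)
 .assign(carrier=lambda df: df["mention_1"].map(airline_name_code_dict))
 .dropna(subset=["carrier"])
 .head())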
Now, what about labeling a row with strictly American Airlines mentions? We could do this a few ways...
In [ ]:
(tweets
.pipe(parse_mentions)
.assign(AA=lambda df: (df["mention_1"] == "American Airlines") |
(df["mention_2"] == "American Airlines"))
.query("AA == True")
.head()
)
In [ ]:
(tweets
.pipe(parse_mentions)
.query("mention_1 == 'American Airlines' or mention_2 == 'American Airlines'")
.shape
)
In [ ]:
(tweets
.pipe(parse_mentions)
.query("mention_1 == 'American Airlines' or mention_2 == 'American Airlines'")
.query("created_at_datetime >= @min_lax_time")
.set_index("created_at_datetime")
.groupby([pd.TimeGrouper("D"), "airport"])
["user_id"]
.count()
.unstack()
.fillna(0)
.rolling(7).mean()
.plot()
)
Moving on a bit, what about a simple sentiment model? We'll grab a word database that simply matches words to a value and use it as a simple baseline.
In [ ]:
from nltk.tokenize import TweetTokenizer
def get_affin_dict():
url = "https://raw.githubusercontent.com/fnielsen/afinn/master/afinn/data/AFINN-111.txt"
affin_words = (pd
.read_table(url,
sep='\t',
header=None)
.rename(columns={0: "word", 1: "score"})
.to_dict(orient="list")
)
affin_words = {k: v for k, v in
zip(affin_words["word"],
affin_words["score"])}
return affin_words
tknizer = TweetTokenizer()
def score_sentiment(words):
    words = set(words)
    # intersect the tweet's tokens with the AFINN vocabulary
    hits = words & affin_words.keys()
    return sum(affin_words[w] for w in hits)
def score_tweet(tweet_text):
return score_sentiment(tknizer.tokenize(tweet_text))
affin_words = get_affin_dict()
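A quick sanity check of the scorer on a made-up sentence (illustrative only; the result depends on AFINN-111's scores for each token):
.python
score_tweet("I love this airline but I hate the delays")
# 'love' scores positive and 'hate' negative in AFINN-111,
# so for this made-up sentence the two largely cancel out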
In [ ]:
(tweets
.assign(sentiment=lambda df: df["all_text"].apply(score_tweet))
["sentiment"]
.plot.hist(bins=20))
In [ ]:
(tweets
.assign(sentiment=lambda df: df["all_text"].apply(score_tweet))
.pipe(lambda df: pd.concat([df.query("sentiment <= -5").head(),
df.query("sentiment >= 5").head()]))
)
Seems semi-reasonable to me!
Let's look at a timeseries of sentiment overall:
In [ ]:
(tweets
.assign(sentiment=lambda df: df["all_text"].apply(score_tweet))
.set_index("created_at_datetime")
.groupby([pd.TimeGrouper("D")])
["sentiment"]
.mean()
.rolling(2).mean()
.plot()
)
Since we have our reasonable sentiment and mentions data, let's assign it to a fresh dataframe and continue looking.
Note that a full data pull step at this point might look like
.python
tweets = (pull_tweet_data(gnip_rules)
.pipe(parse_mentions)
.assign(sentiment=lambda df: df["all_text"].apply(score_tweet))
)
In [ ]:
tweets = (tweets
.pipe(parse_mentions)
.assign(sentiment=lambda df: df["all_text"].apply(score_tweet))
)
And let's do some basic exploration of our tweet data.
In [ ]:
(tweets
.groupby(["airport"])
["sentiment"]
.mean()
.sort_values()
.plot.barh()
)
In [ ]:
(tweets
.assign(day=lambda x: x.created_at_datetime.dt.dayofweek)
.assign(hour=lambda x: x.created_at_datetime.dt.hour)
.groupby(["day", "hour"])
["sentiment"]
.mean()
.unstack()
.pipe((sns.heatmap, 'data') )
)
In [ ]:
(tweets
.assign(hour=lambda x: x.created_at_datetime.dt.hour)
.assign(day=lambda x: x.created_at_datetime.dt.dayofweek)
.groupby(["airport", "day"])
["sentiment"]
.mean()
.unstack()
.sort_values(by=0)
.pipe((sns.heatmap, 'data') )
)
In [ ]:
(tweets
.assign(hour=lambda x: x.created_at_datetime.dt.hour)
.groupby(["airport", "hour"])
["sentiment"]
.mean()
.unstack()
.sort_values(by=0)
.pipe((sns.heatmap, 'data'))
)
In [ ]:
(tweets
.assign(day=lambda x: x.created_at_datetime.dt.dayofweek)
.query("airport == 'ATL' and day == 2")
.sample(10)
.all_text
)
(tweets
.assign(day=lambda x: x.created_at_datetime.dt.dayofweek)
.query("airport == 'ATL' and day == 5")
.sample(10)
.all_text
)
In [ ]:
tweet_sent_airport = (tweets
.set_index("created_at_datetime")
.groupby([pd.TimeGrouper("D"), "airport"])["sentiment"]
.mean()
)
In [ ]:
delay_sent = (pd.concat([airport_delay, tweet_sent_airport],
axis=1,
names=("day", "airport"))
.sort_index())
In [ ]:
for code in top_airport_codes:
(delay_sent
.loc[pd.IndexSlice[:, code], :]
.plot(subplots=True, title="Sentiment and delay time at {}".format(code)))
In [ ]:
delay_sent.loc[pd.IndexSlice[:, "ATL"], :].corr()
In [ ]:
delay_sent.groupby(level=1).corr().T.loc["sentiment"].unstack()["dep_delay"].sort_values()
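That last chain is dense; for reference, here is the same computation spelled out step by step (a readability sketch, same operations):
.python
# a 2x2 correlation matrix of dep_delay and sentiment, computed per airport
per_airport = delay_sent.groupby(level=1).corr()
# pull corr(sentiment, dep_delay) out for each airport and rank them
per_airport.xs("sentiment", level=1)["dep_delay"].sort_values()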