In [526]:
import pandas as pd
import numpy as np
import warnings
In [527]:
def gender_feature(filename='gender_female.csv'):
""" Reads in a CSV, drops NAs, returns the DataFrame. """
df = pd.read_csv(filename)
df = df.dropna(how='any')
return df.set_index('entity_id')
In [528]:
my_gender = gender_feature()
my_gender.head()
Out[528]:
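The feature frames are meant to be uniquely indexed by entity_id; a quick sanity check (a sketch, not part of the pipeline):
assert my_gender.index.is_unique, "duplicate entity_ids in gender feature"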
In [529]:
def incident_aggregation(as_of_date, agg_col, date_col, time_delta,
agg_funcs, filename='incidents.csv'):
"""
Reads and aggregates a CSV file over a date range.
Args:
as_of_date (datetime): End of the aggregation window (excluded).
agg_col (str): Name of the column for aggregation.
date_col (str): Name of the column that gives knowledge dates for
the values in agg_col.
time_delta (pd.Timedelta): Time range. Gives the time
window preceding the as_of_date over
which we aggregate.
agg_funcs (dict): A dictionary that maps column names to functions.
The functions will be applied to the groupby,
and the resulting dataframe contains columns
named like <key>_<timedelta>.
filename (str): Path to the CSV that should be aggregated. The
CSV must contain an entity_id column, as well as
the columns given by agg_col and date_col.
    Returns (pd.DataFrame): A dataframe, uniquely indexed by entity_id and as_of_date,
with columns that contain the aggregations
from agg_funcs.
"""
# read the CSV
df = pd.read_csv(filename)
df[date_col] = pd.to_datetime(df[date_col])
# restrict to data in aggregation window
df = df.loc[df[date_col] < as_of_date,:]
    # (.copy() so the column assignment below doesn't trigger a chained-assignment warning)
    df = df.loc[df[date_col] >= (as_of_date-time_delta),:].copy()
# add as_of_date to the index
df['as_of_date'] = as_of_date
df = df.set_index(['entity_id','as_of_date'])
# just some formatting for naming the columns
nice_timedelta_str = str(time_delta).replace('00:00:00','').replace(' ','')
agg_funcs = {k+'_'+nice_timedelta_str: v for k,v in agg_funcs.items()}
# aggregate by entity_id and apply the functions
    # (named aggregation via ** unpacking; the old dict-renaming .agg(dict) was removed in pandas 1.0)
    return df[agg_col].groupby(level=[0,1]).agg(**agg_funcs)
In [530]:
my_agg = incident_aggregation(pd.to_datetime('2016-01-01'),
'incident_type',
'incident_date',
pd.Timedelta(365,'d'),
{'count_neglects': lambda x: sum(x=='neglect_of_duty'),
'count_conduct': lambda x: sum(x=='conduct_unbecoming')})
In [531]:
my_agg.head()
Out[531]:
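The column suffix comes from the Timedelta's string representation; a small sketch of the formatting used above:
str(pd.Timedelta(365, 'd'))                                         # '365 days 00:00:00'
str(pd.Timedelta(365, 'd')).replace('00:00:00','').replace(' ','')  # '365days'
So the aggregation columns here are named count_neglects_365days and count_conduct_365days.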
In [532]:
def label_aggregation(as_of_date, time_delta, filename='incidents.csv'):
""" Find if an entity has a 'discipline' or 'conduct_unbecoming' incident
that is decided as sustained.
Args:
as_of_date (datetime): Beginning of the aggregation window (included).
time_delta (pd.Timedelta): Time range. Gives the time
window following the as_of_date over
which we aggregate.
filename (str): Path to the incidents CSV, which contains
entity_id, incident type and date, and
decision with date.
Returns (pd.Series):
A boolean series, indexed uniquely by entity_id and as_of_date,
        indicating whether the entity had at least one sustained disciplinary
or conduct_unbecoming event that fell within the time window.
"""
# load the CSV
df = pd.read_csv(filename, parse_dates=['incident_date','decision_date'])
# restrict to incidents after the as_of_date
df = df.loc[df.incident_date>=as_of_date,:]
    # restrict to decisions in the time window
    # (.copy() so the assignments below don't trigger chained-assignment warnings)
    df = df.loc[df.decision_date<(as_of_date+time_delta),:].copy()
# add the as_of_date to the index
df['as_of_date'] = as_of_date
df = df.set_index(['entity_id','as_of_date'])
# binarize
df['adverse_incident'] = df.incident_type.isin(['discipline','conduct_unbecoming'])\
&(df.decision=='sustained')
# aggregate and return
return df.adverse_incident.groupby(level=[0,1]).max()
In [533]:
my_labels = label_aggregation(pd.to_datetime('2016-01-01'), pd.Timedelta(90,'d'))
In [534]:
my_labels.head()
Out[534]:
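Since adverse_incident is boolean, the per-group max() acts as a logical "any"; the last line of label_aggregation could equivalently be (a sketch):
return df.adverse_incident.groupby(level=[0,1]).any()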
In [535]:
my_gender['as_of_date'] = pd.to_datetime('2016-01-01')
my_gender = my_gender.set_index(['as_of_date'], append=True)
In [536]:
dataset = my_labels.to_frame().join(my_gender, how='left')\
.join(my_agg, how='left')
dataset.head()
Out[536]:
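Because we start from the labels and left-join uniquely indexed feature frames, the rows are exactly the labeled (entity_id, as_of_date) pairs; a quick sanity check (sketch):
assert len(dataset) == len(my_labels)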
In [537]:
def active_officers(as_of_date, filename='patrol_duty.csv'):
"""Check if an officer is on patrol duty for the as_of_date."""
# read CSV
df = pd.read_csv(filename, parse_dates=['start_date','end_date'])
# check if as_of_date falls between start and end date of duty
df['active'] = (df.start_date<=as_of_date)&(df.end_date>=as_of_date)
df['as_of_date'] = as_of_date
df = df.set_index(['entity_id','as_of_date'])
    return df[df.active].index
In [538]:
my_active = active_officers(pd.to_datetime('2016-01-01'))
We can now index into the dataset with our new entity list:
In [540]:
dataset.loc[my_active,:].head()
Out[540]:
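(A note for recent pandas: .loc raises a KeyError when a list-like contains labels missing from the index, which happens here whenever an active officer has no label row; dataset.reindex(my_active) reproduces the NaN-filled selection shown above.)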
The selection is full of NaNs: active officers may lack label rows, and labeled officers may lack feature rows. We need to coalesce / impute!
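A minimal imputation sketch, assuming missing labels mean False and missing counts mean zero (the gender column would need a proper coalesce from another source):
filled = dataset.reindex(my_active)
filled['adverse_incident'] = filled['adverse_incident'].fillna(False)  # no label row -> no sustained incident
count_cols = [c for c in filled.columns if c.startswith('count_')]
filled[count_cols] = filled[count_cols].fillna(0)                      # no incident rows -> zero counts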
In [541]:
def label_aggregation(as_of_dates, time_delta, filename='incidents.csv'):
""" Find if an entity has a 'discipline' or 'conduct_unbecoming' incident
that is decided as sustained.
Args:
as_of_dates ([datetime]): List of beginnings of the aggregation
windows (included).
time_delta (pd.Timedelta): Time range. Gives the time
window following the as_of_date over
which we aggregate.
filename (str): Path to the incidents CSV, which contains
entity_id, incident type and date, and
decision with date.
Returns (pd.Series):
A boolean series, indexed uniquely by entity_id and as_of_date,
        indicating whether the entity had at least one sustained disciplinary
or conduct_unbecoming event that fell within the time window.
"""
# load the CSV
df = pd.read_csv(filename, parse_dates=['incident_date','decision_date'])
if len(set(as_of_dates))!=len(as_of_dates):
raise ValueError("As of dates need to be unique!")
as_of_dates = sorted(as_of_dates)
# let's be cautious here already and do a sanity check
for idx, aod in enumerate(as_of_dates[:-1]):
if aod+time_delta >= as_of_dates[idx+1]:
warnings.warn("Your label windows will overlap!")
dfs = []
# go over all the dates
for as_of_date in as_of_dates:
# restrict to incidents after the as_of_date
this_df = df.loc[df.incident_date>=as_of_date,:]
        # restrict to decisions in the time window
        # (.copy() so the assignments below don't trigger chained-assignment warnings)
        this_df = this_df.loc[this_df.decision_date<(as_of_date+time_delta),:].copy()
# add the as_of_date to the index
this_df['as_of_date'] = as_of_date
this_df = this_df.set_index(['entity_id','as_of_date'])
# binarize
this_df['adverse_incident'] = this_df.incident_type.isin(['discipline','conduct_unbecoming'])\
&(this_df.decision=='sustained')
dfs.append(this_df.adverse_incident.groupby(level=[0,1]).max())
# concat and return
return pd.concat(dfs)
In [542]:
my_labels = label_aggregation([pd.to_datetime('2016-01-01'),
pd.to_datetime('2016-05-01')],
pd.Timedelta(90,'d'))
In [543]:
my_labels.head()
Out[543]:
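The sanity check in action (a sketch): 2016-01-01 plus 90 days reaches past 2016-02-01, so this call warns:
label_aggregation([pd.to_datetime('2016-01-01'),
                   pd.to_datetime('2016-02-01')],
                  pd.Timedelta(90,'d'))  # UserWarning: Your label windows will overlap!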
In [544]:
my_labels.index.levels[1]
Out[544]:
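A caveat (not an issue here, since the index was freshly built by concat): index.levels can retain values that were filtered away; my_labels.index.get_level_values('as_of_date').unique() reports only the dates actually present.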
In [545]:
def active_officers(as_of_dates, filename='patrol_duty.csv'):
"""Check if an officer is on patrol duty for the as_of_date."""
# read CSV
df = pd.read_csv(filename, parse_dates=['start_date','end_date'])
dfs = []
for as_of_date in as_of_dates:
        # check if as_of_date falls between start and end date of duty
        this_active = (df.start_date<=as_of_date)&(df.end_date>=as_of_date)
        df['as_of_date'] = as_of_date
        dfs.append(df[this_active].set_index(['entity_id','as_of_date']))
return pd.concat(dfs).sort_index().index
In [546]:
my_active = active_officers([pd.to_datetime('2016-01-01'),
pd.to_datetime('2016-05-01')])
In [547]:
my_active
Out[547]:
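To count active officers per as-of-date (a sketch):
my_active.get_level_values('as_of_date').value_counts()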
In [548]:
def train_test_splitter(split_dates, label_time_delta, label_fetcher, feature_fetchers):
"""TODO: Write nice documentation. Always!"""
test_as_of_dates = [aod for aod, usefor in split_dates if usefor=='test']
train_as_of_dates = [aod for aod, usefor in split_dates if usefor=='train']
as_of_dates = list(zip(*split_dates))[0]
# TODO: check that the as_of_dates are unique
# check that the train/test splits are well-separated
if max(train_as_of_dates) + label_time_delta >= min(test_as_of_dates):
raise ValueError("Your train and test label windows overlap!")
    # fetch the index of active officers - make DF, because Pandas isn't nice here
actives = pd.DataFrame(index=active_officers(as_of_dates))
# fetch the DF with labels
labels = label_fetcher(as_of_dates, label_time_delta)
# subset (or superset!) the labels to active entities
dataset = actives.join(labels, how='left')
# now join in all the features
for ff in feature_fetchers:
these_feats = []
# first, concatenate the various as-of-dates per feature
# TODO: this should be handled by the features, really!
for as_of_date in as_of_dates:
# note: You could do some **kwargs magic here, and have every feature accept arbitrary arguments.
# Nice if you want to handle a single config dict!
these_feats.append(ff(as_of_date))
dataset = dataset.join(pd.concat(these_feats), how='left')
# let's make an aux with test/train information:
# small dataframe with just dates --> usefor
only_dates = pd.DataFrame(split_dates,
columns=['as_of_date', 'usefor']).set_index('as_of_date')
# blow up that dataframe with the dataset index (as_of_dates)
aux = only_dates.loc[dataset.index.get_level_values(1),:]
# share index
aux.index = dataset.index
return dataset, aux
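The aux construction leans on .loc broadcasting rows when given repeated labels; a minimal sketch of that trick in isolation:
only_dates = pd.DataFrame([(pd.to_datetime('2016-01-01'), 'train')],
                          columns=['as_of_date', 'usefor']).set_index('as_of_date')
only_dates.loc[[pd.to_datetime('2016-01-01')] * 3, :]  # three identical 'train' rows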
With this, we can grab training/testing data with many as-of-dates:
In [549]:
# let's wrap the gender feature - it's nice to have even static features mirror the as-of-date
def nicer_gender(as_of_date):
df = gender_feature()
df['as_of_date'] = as_of_date
return df.set_index('as_of_date', append=True)
my_split_dates = [(pd.to_datetime('2016-01-01'), 'train'),
                  (pd.to_datetime('2016-04-30'), 'train'),
(pd.to_datetime('2016-08-01'), 'test')]
my_feature_fetchers = [lambda aod: incident_aggregation(aod, 'incident_type', 'incident_date',
pd.Timedelta(90,'d'),
{'count_neglects': lambda x: sum(x=='neglect_of_duty'),
'count_conduct': lambda x: sum(x=='conduct_unbecoming')}),
lambda aod: incident_aggregation(aod, 'decision', 'decision_date',
pd.Timedelta(190,'d'),
{'count_decision_other': lambda x: sum(x=='other')}),
nicer_gender
]
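The **kwargs idea from the comment in train_test_splitter could look like this (a hypothetical wrapper, not used below): every fetcher accepts and ignores extra keys, so a single config dict can drive fetchers with different needs:
def nicer_gender_kw(as_of_date, **kwargs):
    # hypothetical: uses only the arguments it needs, ignores the rest
    return nicer_gender(as_of_date)
config = {'as_of_date': pd.to_datetime('2016-01-01'), 'time_delta': pd.Timedelta(90,'d')}
nicer_gender_kw(**config)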
In [550]:
m, aux = train_test_splitter(my_split_dates,
label_time_delta=pd.Timedelta(90,'d'),
label_fetcher=label_aggregation,
feature_fetchers=my_feature_fetchers)
In [551]:
m.head()
Out[551]:
In [552]:
aux.head()
Out[552]:
In [553]:
m.loc[aux[aux.usefor=='train'].index,:].head()
Out[553]:
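The test matrix comes out the same way (sketch):
m.loc[aux[aux.usefor=='test'].index,:].head()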
TODOs: