As the API Exploration Notebook shows, each poll of the scraper produces three predicted arrival times for each line direction at a station. We want to transform and reduce these data so that they contain only observed train arrivals at stations (per this issue).
This notebook explores how to do that. It was initially developed using a day of data from 2017-06-14, but now uses a more recent day of data (2019) from the serverless data pipeline.
In [2]:
import datetime
from psycopg2 import connect
import configparser
import pandas as pd
import pandas.io.sql as pandasql
import matplotlib.pyplot as plt
import matplotlib as mpl
import seaborn as sns
import matplotlib.ticker as ticker
In [3]:
con = connect(user='rad', database='ttc')
In [ ]:
CONFIG = configparser.ConfigParser(interpolation=None)
CONFIG.read('../db.cfg')
dbset = CONFIG['DBSETTINGS']
con = connect(**dbset)
We want to generate observed arrival times in a format similar to GTFS. The GTFS schedule will be useful in this process: the data were downloaded from Transit Feeds, the schema is in ttc_gtfs_create.sql, and the data are processed into a more useful format in PostgreSQL with ttc_gtfs_process.sql.
From GTFS, we can get a sense of the target data count: how many stops are scheduled on the three subway lines for which we have data?
In [8]:
sql = '''SELECT COUNT(1)
FROM gtfs.stop_times
INNER JOIN gtfs.trips USING (trip_id)
INNER JOIN gtfs.routes USING (route_id)
INNER JOIN gtfs.calendar USING (service_id)
WHERE monday AND route_type = 1 AND route_short_name != '3'
'''
with con.cursor() as cur:
    cur.execute(sql)
    print(cur.fetchone()[0])
This is a ballpark figure we are aiming for in our filtering. Next, create a materialized view of the raw poll data for a given day: Wednesday, July 17th, 2019.
In [6]:
sql = '''DROP MATERIALIZED VIEW IF EXISTS test_day CASCADE;
CREATE MATERIALIZED VIEW test_day AS
SELECT requestid, stationid, lineid, create_date, request_date, station_char, subwayline, system_message_type,
timint, traindirection, trainid, train_message
FROM requests_serverless
INNER JOIN ntas_data_serverless USING (requestid)
WHERE request_date >= '2019-07-17'::DATE + interval '5 hours'
AND request_date < '2019-07-17'::DATE + interval '29 hours'
'''
with con:
    with con.cursor() as cur:
        cur.execute(sql)
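The 5- and 29-hour offsets above define a "service day" running from 05:00 to 05:00 the next calendar morning, so post-midnight service stays attached to the day it belongs to. A quick sketch of what that window evaluates to:

```python
from datetime import date, datetime, timedelta

# The service "day" runs 05:00 to 05:00 the next calendar day, so
# late-evening trips past midnight stay with the day they belong to.
service_day = date(2019, 7, 17)
midnight = datetime.combine(service_day, datetime.min.time())
start = midnight + timedelta(hours=5)   # 2019-07-17 05:00:00
end = midnight + timedelta(hours=29)    # 2019-07-18 05:00:00
print(start, end)
```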
In [7]:
with con.cursor() as cur:
    cur.execute('SELECT COUNT(1) FROM test_day')
    print(cur.fetchone()[0])
Cool. Definitely some work to do.
Trying out a very basic filter, which has a Known Issue
In [8]:
sql = '''SELECT COUNT(DISTINCT (requestid, lineid, trainid, traindirection, stationid))
FROM test_day
WHERE train_message = 'AtStation' OR timint < 1'''
with con.cursor() as cur:
    cur.execute(sql)
    print(cur.fetchone()[0])
It's a start.
If every line takes more than an hour to do a round-trip, we might be able to look for a distinct train-line-direction-station combination for each hour.
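As a sketch of that idea on hypothetical rows (the real data come from test_day), hour-bucketing the polls and deduplicating on the train-line-direction-station-hour combination looks like this in pandas:

```python
import pandas as pd

# Hypothetical poll observations; column names mirror test_day.
polls = pd.DataFrame({
    'trainid': [136, 136, 136, 136],
    'lineid': [1, 1, 1, 1],
    'traindirection': ['North'] * 4,
    'stationid': [14, 14, 14, 15],
    'create_date': pd.to_datetime([
        '2019-07-17 07:00:20', '2019-07-17 07:00:40',
        '2019-07-17 08:10:00', '2019-07-17 07:02:00']),
})

# Bucket each observation into its hour, then keep one row per
# train-line-direction-station-hour combination.
polls['hour'] = polls['create_date'].dt.floor('h')
arrivals = polls.drop_duplicates(
    subset=['trainid', 'lineid', 'traindirection', 'stationid', 'hour'])
print(len(arrivals))  # the two 07:00-hour polls at station 14 collapse: 3
```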
In [11]:
sql = '''WITH trips AS (SELECT route_short_name, (SELECT trip_id FROM gtfs.trips WHERE trips.route_id = routes.route_id LIMIT 1)
FROM gtfs.routes
WHERE route_type = 1 AND route_short_name != '3' )
SELECT route_short_name, MIN(arrival_time) AS "Start Time", MIN(stop_sequence) ||'-'||MAX(stop_sequence) AS "Stops", MAX(arrival_time) - MIN(arrival_time) AS "Half-trip time"
FROM gtfs.stop_times
INNER JOIN trips USING(trip_id)
GROUP BY route_short_name, trip_id
ORDER BY route_short_name, trip_id
'''
trips = pandasql.read_sql(sql, con)
In [12]:
trips
Out[12]:
So any given train on Line 1 or 2 shouldn't pass the same station going the same direction within an hour, so we could add the hour to a DISTINCT query.
What's up with Line 4? It's short, but not two stations short... According to TransitFeeds, a GTFS host and exploration platform, Line 4 trains start the day at non-terminus stations. Line 4 actually makes 5 stops, and it takes 8 minutes to go from one terminus to another, with a two and a half minute layover at each terminus.
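A back-of-envelope check with the figures above shows why the hourly heuristic breaks down for Line 4:

```python
# Back-of-envelope check for Line 4, using the figures from the text:
# an 8-minute end-to-end run and a 2.5-minute layover at each terminus.
half_trip = 8.0   # minutes, terminus to terminus
layover = 2.5     # minutes at each terminus
round_trip = 2 * half_trip + 2 * layover
print(round_trip)  # 21.0 minutes

# So a Line 4 train can pass the same station in the same direction
# roughly every 21 minutes -- almost three times an hour -- which breaks
# the one-observation-per-hour assumption that works for Lines 1 and 2.
print(round(60 / round_trip, 1))  # 2.9
```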
Potential issues:
Better solution:
In [19]:
sql = ''' WITH unique_trains AS
(SELECT lineid::TEXT, COUNT(DISTINCT trainid) AS "Number of trains in a day"
FROM test_day
GROUP BY lineid)
, unique_trips AS(SELECT route_short_name AS lineid, COUNT(DISTINCT trip_id) AS "Number of scheduled trips"
FROM gtfs.routes -- ON lineid::TEXT = route_short_name
INNER JOIN gtfs.trips USING (route_id)
INNER JOIN gtfs.calendar USING (service_id)
WHERE monday AND route_type = 1 AND route_short_name != '3'
GROUP BY route_short_name)
SELECT *
FROM unique_trains
INNER JOIN unique_trips USING (lineid)
ORDER BY lineid'''
pandasql.read_sql(sql, con)
Out[19]:
According to Wikipedia, the number of trains for each line is:
Line | Number of trains |
---|---|
1 | 76 |
2 | 62 |
4 | 6 |
So the observed trainid counts per line can be compared against these fleet sizes as a sanity check.
In [1]:
sql = ''' SELECT trainid, lineid, traindirection, stationid, station_char, create_date, request_date, timint, train_message
FROM test_day
INNER JOIN (SELECT trainid FROM test_day WHERE lineid = 1 AND create_date::TIME > '07:00'::TIME LIMIT 1) one_train USING (trainid)
WHERE (timint < 1 OR train_message != 'Arriving') AND lineid = 1
ORDER BY create_date
'''
one_train = pandasql.read_sql(sql, con)
In [5]:
one_train
Out[5]:
In [7]:
sql = '''CREATE MATERIALIZED VIEW filtered.test_day AS
SELECT requestid, stationid, lineid, create_date, request_date, station_char, subwayline, system_message_type,
timint, traindirection, trainid, train_message
FROM filtered.requests
INNER JOIN filtered.ntas_data USING (requestid)
WHERE request_date >= '2017-06-14'::DATE + interval '5 hours'
AND request_date < '2017-06-14'::DATE + interval '29 hours'
'''
with con:
    with con.cursor() as cur:
        cur.execute(sql)
In [8]:
sql = ''' SELECT trainid, lineid, traindirection, stationid, station_char, create_date, request_date, timint, train_message
FROM filtered.test_day
INNER JOIN (SELECT trainid FROM filtered.test_day WHERE lineid = 1 AND create_date::TIME > '07:00'::TIME LIMIT 1) one_train USING (trainid)
WHERE (timint < 1 OR train_message != 'Arriving') AND lineid = 1
ORDER BY create_date
'''
one_train = pandasql.read_sql(sql, con)
In [9]:
one_train
Out[9]:
Ah. We can see train 136 skipped station 14. Fortunately, we have unfiltered data from the same day.
In [3]:
sql = ''' SELECT trainid, lineid, traindirection, stationid, station_char, create_date, create_date + timint * interval '1 minute' AS expected_arrival, timint, train_message
FROM test_day
WHERE trainid = 136 AND (timint < 1 OR train_message != 'Arriving') AND lineid = 1
ORDER BY create_date + timint * interval '1 minute'
'''
train_136 = pandasql.read_sql(sql, con)
In [4]:
train_136
Out[4]:
In [7]:
train_136.iloc[[14]]
Out[7]:
So we have an expected arrival time at Osgoode station from the unfiltered dataset, meaning that it can have some use after all! However, we can see at the end that the train is severely delayed.
In [9]:
train_136[train_136['create_date'] > datetime.datetime(2017, 6, 15, 1,30)]
Out[9]:
So this doesn't seem like a particularly good example, since the train ultimately just sits at Sheppard West station until the end of the (scraping) day. The solution in this case would probably be to filter out any of these observations where `train_message == 'Delayed'` and `timint > 2`. Let's try to see if we can find anything else.
In [10]:
train_136[train_136['train_message'] == 'Delayed']
Out[10]:
Lucky for us, train 136 is delayed a second time in our day, around 12:55.
In [12]:
train_136[(train_136['create_date'] > datetime.datetime(2017, 6, 14, 12, 50)) & (train_136['create_date'] < datetime.datetime(2017, 6, 14, 13, 30))]
Out[12]:
It seems like we could actually be fine if we just filtered out `Delayed` observations unless `timint < 1`. The delayed records could be useful to store in a separate table for their own analysis, but they don't appear to really fill in the gaps here.
In [13]:
train_136[(train_136['create_date'] > datetime.datetime(2017, 6, 14, 12, 50))
& (train_136['create_date'] < datetime.datetime(2017, 6, 14, 13, 30))
& ((train_136['train_message'] != 'Delayed') | (train_136['timint'] < 1.0 ))]
Out[13]:
Coincidentally, this period of time also features a short turn at 13:15, and we want to identify distinct trips (where trains turn around, either at the end of the usual run or early). This should be relatively easy to implement with the `traindirection` column.
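As a pandas sketch of the same idea on hypothetical rows: a new trip starts whenever traindirection differs from the previous observation of the same train.

```python
import pandas as pd

# Hypothetical filtered observations for one train, ordered by time.
obs = pd.DataFrame({
    'trainid': [136] * 5,
    'lineid': [1] * 5,
    'traindirection': ['North', 'North', 'South', 'South', 'North'],
})

# Start a new trip whenever traindirection differs from the previous
# observation of the same train on the same line, then number trips
# with a running counter (the pandas analogue of lag()/nextval()).
grp = obs.groupby(['lineid', 'trainid'])['traindirection']
direction_changed = grp.transform(lambda s: s != s.shift())
obs['trip_id'] = direction_changed.cumsum()
print(obs['trip_id'].tolist())  # [1, 1, 2, 2, 3]
```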
In [9]:
split_trips = '''CREATE SEQUENCE IF NOT EXISTS trip_ids;
CREATE MATERIALIZED VIEW test_day_w_trips AS
SELECT trainid, lineid, traindirection, stationid, station_char, create_date, create_date + timint * interval '1 minute' AS expected_arrival, timint, train_message,
CASE traindirection WHEN lag(traindirection) OVER w THEN currval('trip_ids') ELSE nextval('trip_ids') END AS trip_id
FROM test_day
WHERE (timint < 1 OR train_message = 'AtStation')
WINDOW w AS (PARTITION BY lineid, trainid ORDER BY create_date + timint * interval '1 minute')
'''
with con:
    with con.cursor() as cur:
        cur.execute(split_trips)
A final step is to group together multiple observations at the same station during the same trip, to get an approximation of arrival and "departure" times.
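On hypothetical rows, that grouping step looks like this in pandas (column names mirror test_day_w_trips): the earliest expected arrival approximates the arrival, the latest approximates the departure, and the exact flags record whether the bounding observation was an `AtStation` message.

```python
import pandas as pd

# Hypothetical per-poll expected arrivals for one trip at one station.
rows = pd.DataFrame({
    'trip_id': [7, 7, 7],
    'stationid': [14, 14, 14],
    'expected_arrival': pd.to_datetime([
        '2019-07-17 07:01:00', '2019-07-17 07:01:40', '2019-07-17 07:03:10']),
    'train_message': ['Arriving', 'AtStation', 'AtStation'],
})

# Collapse repeated observations: MIN approximates the arrival, MAX the
# departure; exact_* is 1 when the first/last observation was 'AtStation'.
summary = rows.sort_values('expected_arrival').groupby(
    ['trip_id', 'stationid']).agg(
        estimated_arrival=('expected_arrival', 'min'),
        estimated_departure=('expected_arrival', 'max'),
        exact_arr=('train_message', lambda s: int(s.iloc[0] == 'AtStation')),
        exact_dep=('train_message', lambda s: int(s.iloc[-1] == 'AtStation')),
    )
print(summary)
```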
In [10]:
final_step = ''' DROP MATERIALIZED VIEW IF EXISTS test_day_final;
CREATE MATERIALIZED VIEW test_day_final AS
SELECT trainid, lineid, traindirection, stationid, station_char, trip_id,
MIN(expected_arrival) AS estimated_arrival, MAX(expected_arrival) AS estimated_departure,
CASE (ARRAY_AGG(train_message ORDER BY expected_arrival))[1] WHEN 'AtStation' THEN 1 ELSE 0 END AS exact_arr,
CASE (ARRAY_AGG(train_message ORDER BY expected_arrival DESC))[1] WHEN 'AtStation' THEN 1 ELSE 0 END AS exact_dep
FROM test_day_w_trips
GROUP BY trainid, lineid, traindirection, stationid, station_char, trip_id
'''
with con:
    with con.cursor() as cur:
        cur.execute(final_step)
Woo! Now to test how well this process did.
In [11]:
cnt = '''SELECT COUNT(*) FROM test_day_final'''
with con.cursor() as cur:
    cur.execute(cnt)
    print('The number of station stops made is', cur.fetchone()[0])
Huh. That's about 5,000 higher than the scheduled number of station stops.
In [12]:
sql = ''' WITH observed_trips AS
(SELECT lineid::TEXT, COUNT(DISTINCT trip_id) AS "Number of observed trips"
FROM test_day_final
GROUP BY lineid)
, unique_trips AS(SELECT route_short_name AS lineid, COUNT(DISTINCT trip_id) AS "Number of scheduled trips"
FROM gtfs.routes -- ON lineid::TEXT = route_short_name
INNER JOIN gtfs.trips USING (route_id)
INNER JOIN gtfs.calendar USING (service_id)
WHERE monday AND route_type = 1 AND route_short_name != '3'
GROUP BY route_short_name)
SELECT *
FROM observed_trips
INNER JOIN unique_trips USING (lineid)
ORDER BY lineid'''
pandasql.read_sql(sql, con)
Out[12]:
Welp. Seems like that trip identification code is a little too basic. Let's try to investigate what might be the problem:
In [13]:
sql = '''WITH trips AS(SELECT lineid, trip_id, EXTRACT('minutes' FROM MAX(estimated_arrival) - MIN(estimated_arrival)) as trip_duration
FROM test_day_final
GROUP BY lineid, trip_id)
SELECT *
FROM trips
ORDER BY trip_duration
LIMIT 10'''
pandasql.read_sql(sql, con)
Out[13]:
In [18]:
sql = '''WITH one_stop_trips AS(SELECT lineid, trip_id, ARRAY_AGG(station_char) AS stations
FROM test_day_final
GROUP BY lineid, trip_id
HAVING COUNT(1) = 1)
SELECT lineid, unnest(stations) as station_char, COUNT(1) "Number of Trips"
FROM one_stop_trips
GROUP BY lineid, station_char
ORDER BY lineid, "Number of Trips" DESC'''
pandasql.read_sql(sql, con)
Out[18]:
So for the most part we seem to have issues with identifying trip start/end at the termini.
Line | One stop Trips at Termini |
---|---|
1 | 766 |
2 | 481 |
4 | 791 |
So approximately half of the "extra trips" are one-stop trips at termini. Let's see the overall distribution of the number of stops for each trip we've inferred and compare that to the ideal.
In [14]:
sql = '''WITH inferred_trips AS(SELECT lineid, trip_id, COUNT(1) as stops
FROM test_day_final
GROUP BY lineid, trip_id
),
inferred_trip_length AS( SELECT lineid, stops, COUNT(trip_id) as obs_trips
FROM inferred_trips
GROUP BY lineid, stops)
,
gtfs_trip_lengths AS(SELECT route_short_name::INT AS lineid, trip_id, COUNT(1) as stops
FROM gtfs.stop_times
INNER JOIN gtfs.trips USING (trip_id)
INNER JOIN gtfs.routes USING (route_id)
INNER JOIN gtfs.calendar USING (service_id)
WHERE monday AND route_type = 1 AND route_short_name != '3'
GROUP BY route_short_name, trip_id
)
,gtfs_trip_length_distro AS (SELECT lineid, stops, COUNT(trip_id) as num_trips
FROM gtfs_trip_lengths
GROUP BY lineid, stops)
SELECT lineid, stops, COALESCE(num_trips,0) as scheduled, COUNT(inferred_trips.trip_id) as observed
FROM inferred_trips
FULL OUTER JOIN gtfs_trip_length_distro USING (lineid, stops)
GROUP BY lineid, stops, num_trips
ORDER BY lineid, stops
'''
trip_lengths = pandasql.read_sql(sql, con)
In [15]:
line_one = trip_lengths[trip_lengths['lineid'] == 1]
fig, ax = plt.subplots(figsize=(16, 9))
line_one.plot(x='stops', y='scheduled', kind='bar', ax=ax, position=0, color='red')
line_one.plot(x='stops', y='observed', kind='bar', ax=ax, position=1, color='blue')
ax.set_title('Line 1 Distribution of Trip Lengths')
ax.set_ylabel('Number of trips')
So we are certainly getting 1-, 2-, and 3-stop trips that shouldn't exist and undercounting trips of more plausible lengths. The one-stop trips are primarily at termini. What is happening with the 2- and 3-stop trips...?
In [16]:
sql_2_3 = '''SELECT stops, COUNT(1)
FROM (
SELECT array_agg(station_char ORDER BY estimated_arrival) AS stops
FROM test_day_final
WHERE lineid = 1
GROUP BY trip_id
HAVING COUNT(1) =2 OR COUNT(1) = 3
)grouped_trips
GROUP BY stops
ORDER BY COUNT(1) DESC
LIMIT 10
'''
pandasql.read_sql(sql_2_3, con)
Out[16]:
The top "trips" are from Bloor-Spadina to Yonge via St. George and vice versa, but these are stations on Line 2...
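One possible cleanup, sketched here with a hypothetical, hard-coded station-to-line lookup (in the real pipeline the mapping would come from a station reference table), is to drop observations whose station doesn't belong to the reported line:

```python
import pandas as pd

# Hypothetical station-to-line lookup; the station ids here are
# illustrative only, not real TTC codes.
station_lines = pd.DataFrame({
    'stationid': [10, 14, 48, 50],
    'lineid': [1, 1, 2, 2],
})

obs = pd.DataFrame({
    'trainid': [136, 136, 301],
    'lineid': [1, 1, 1],        # all reported as Line 1 by the API...
    'stationid': [14, 48, 50],  # ...but 48 and 50 are Line 2 stations
})

# Keep only observations whose (stationid, lineid) pair exists in the
# lookup, dropping cross-line artifacts like the spurious Line 2 stops.
clean = obs.merge(station_lines, on=['stationid', 'lineid'], how='inner')
print(clean['stationid'].tolist())  # [14]
```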
In [ ]:
sql = ''' CREATE MATERIALIZED VIEW test_day_stop_arrival AS
SELECT trainid, lineid, traindirection, stationid, station_char,
       MIN(create_date + timint * interval '1 minute') AS expected_arrival
FROM test_day
WHERE (timint < 1 OR train_message = 'AtStation')
GROUP BY trainid, lineid, traindirection, stationid, station_char'''