This week we'll take a look at more ETL functions, building up a mini warehouse using Bikeshare and weather data.
We'll download the same Bikeshare data you've worked with before, and we'll create some database tables and indexes more deliberately using PostgreSQL.
In [ ]:
%load_ext sql
Note: To use PostgreSQL on datanotebook.org, use the following steps. If you are running the notebook elsewhere, adjust the dbuser username and host name as appropriate.
In [ ]:
!echo 'redspot' | sudo -S service postgresql restart
In [ ]:
!createdb -U dbuser week7
In [ ]:
%sql postgresql://dbuser@localhost:5432/week7
In [ ]:
!wget https://raw.githubusercontent.com/gwsb-istm-6212-fall-2016/syllabus-and-schedule/master/projects/project-01/2016q1.csv.zip
In [ ]:
!unzip 2016q1.csv.zip
In [ ]:
!wc -l 2016q1.csv
In [ ]:
!csvcut -n 2016q1.csv
In [ ]:
!shuf -n 10000 2016q1.csv | csvstat
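One caveat with sampling via shuf: it shuffles the header row in with the data (or drops it entirely), so csvstat may misread a data line as the header. A small Python helper, purely illustrative and not part of csvkit, that keeps the header out of the sample:

```python
import csv
import random

def sample_csv(path, n, seed=None):
    """Return the header row and a uniform random sample of up to n data rows."""
    rng = random.Random(seed)
    with open(path, newline='') as f:
        reader = csv.reader(f)
        header = next(reader)      # keep the header out of the sample
        rows = list(reader)
    return header, rng.sample(rows, min(n, len(rows)))
```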
Based on these summary statistics, I expect we can work with the following schema:
In [ ]:
%%sql
DROP TABLE IF EXISTS rides;
CREATE TABLE rides (
    duration_ms INTEGER,
    start_date TIMESTAMP,
    end_date TIMESTAMP,
    start_station_id INTEGER,
    start_station VARCHAR(64),
    end_station_id INTEGER,
    end_station VARCHAR(64),
    bike_number CHAR(6),
    member_type CHAR(10)
);
Now we'll bulk-load the data using COPY. Note that COPY requires an absolute path, so adjust it to your location:
In [ ]:
!pwd
In [ ]:
%%sql
COPY rides FROM '/home/jovyan/work/2016q1.csv'
CSV
HEADER
QUOTE '"'
DELIMITER ',';
In [ ]:
%%sql
SELECT COUNT(*) FROM rides;
In [ ]:
%%sql
DROP INDEX IF EXISTS idx_stations_member_type;
CREATE INDEX idx_stations_member_type ON rides (start_station, end_station, member_type);
In [ ]:
%%sql
DROP INDEX IF EXISTS idx_start_stations;
CREATE INDEX idx_start_stations ON rides (start_station);
In [ ]:
%%sql
DROP INDEX IF EXISTS idx_end_stations;
CREATE INDEX idx_end_stations ON rides (end_station);
In [ ]:
%%sql
SELECT DISTINCT start_station, start_station_id
FROM rides
ORDER BY start_station
LIMIT 10;
In [ ]:
%%sql
SELECT DISTINCT end_station, end_station_id
FROM rides
ORDER BY end_station
LIMIT 10;
To be sure we capture every station, we combine both columns into a union set.
In [ ]:
%%sql
SELECT station, station_id
FROM (
SELECT DISTINCT start_station AS station, start_station_id AS station_id FROM rides
UNION
SELECT DISTINCT end_station AS station, end_station_id AS station_id FROM rides
) AS d
LIMIT 10;
Now we can create a new table to house the unique station names.
In [ ]:
%%sql
DROP TABLE IF EXISTS stations;
CREATE TABLE stations (
    id SERIAL,
    name VARCHAR(64),
    station_key INTEGER
);
In [ ]:
%%sql
INSERT INTO stations (name, station_key)
SELECT station, station_key FROM (
SELECT DISTINCT start_station AS station, start_station_id AS station_key FROM rides
UNION
SELECT DISTINCT end_station AS station, end_station_id AS station_key FROM rides
) AS d;
In [ ]:
%%sql
SELECT * FROM stations LIMIT 10;
We can even add these new identifiers back to the original table now.
In [ ]:
%%sql
ALTER TABLE rides
ADD COLUMN start_station_nid INTEGER;
In [ ]:
%%sql
UPDATE rides AS r
SET start_station_nid = s.id
FROM stations AS s
WHERE r.start_station = s.name;
In [ ]:
%%sql
ALTER TABLE rides
ADD COLUMN end_station_nid INTEGER;
In [ ]:
%%sql
UPDATE rides AS r
SET end_station_nid = s.id
FROM stations AS s
WHERE r.end_station = s.name;
In [ ]:
%%sql
SELECT * FROM rides
LIMIT 5;
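The UPDATE ... FROM statements above are SQL's join-and-assign. In plain Python terms (with made-up sample data) the same operation amounts to a dictionary lookup per row:

```python
# Hypothetical miniature of the UPDATE ... FROM join:
# station_ids maps a station name to its surrogate id,
# and each ride row gets the new id attached.
station_ids = {'10th & E St NW': 1, '14th & V St NW': 2}
rides = [
    {'start_station': '10th & E St NW'},
    {'start_station': '14th & V St NW'},
]
for ride in rides:
    ride['start_station_nid'] = station_ids[ride['start_station']]
```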
It feels like we should do a little more with the stations, doesn't it? Let's see if we can geocode them using the geocoder library.
In [ ]:
!pip install geocoder
In [ ]:
import geocoder
In [ ]:
%%sql
ALTER TABLE stations
ADD COLUMN lat NUMERIC DEFAULT 0,
ADD COLUMN lng NUMERIC DEFAULT 0;
Note the specific user and host names are required at datanotebook.org. If you're running this locally, you'll need to adjust your username/dbname/etc. as appropriate.
In [ ]:
import psycopg2
conn = psycopg2.connect("dbname='week7' user='dbuser' host='localhost'")
c = conn.cursor()
In [ ]:
c.execute("SELECT id, name FROM stations ORDER BY id ASC")
rows = c.fetchall()
for r in rows:
    station_id, station_name = r
    print('%s: %s' % (station_id, station_name))
    g = geocoder.google('%s Washington DC' % station_name)
    c.execute("UPDATE stations SET lat = (%s), lng = (%s) WHERE id = (%s)",
              (g.lat, g.lng, station_id))
conn.commit()
In [ ]:
%%sql
SELECT AVG(lat), MIN(lat), MAX(lat), STDDEV(lat) FROM stations LIMIT 1;
In [ ]:
%%sql
SELECT * FROM stations LIMIT 10;
In [ ]:
%%sql
SELECT COUNT(*) FROM stations WHERE lat IS NULL OR lng IS NULL;
Looks like it mostly worked. It's a start, at least.
In [ ]:
%%sql
ALTER TABLE rides
ADD COLUMN duration_min NUMERIC;
In [ ]:
%%sql
UPDATE rides
SET duration_min = ROUND(CAST(duration_ms AS NUMERIC) / (1000 * 60), 1);
In [ ]:
%%sql
SELECT duration_ms, duration_min FROM rides
LIMIT 5;
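As a sanity check, the same conversion in plain Python. Note that Python's round uses banker's rounding, so it can disagree with SQL's ROUND at exact .05 boundaries:

```python
def ms_to_minutes(ms):
    # Same arithmetic as the UPDATE: milliseconds / (1000 * 60), one decimal place
    return round(ms / (1000 * 60), 1)
```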
Another valuable pattern is to use date functions to extract particular time intervals, such as months or days. Every RDBMS has its own set of date functions; unfortunately, you will likely just have to learn the ones used by the system in your environment.
Read more in the documentation for PostgreSQL date formatting.
In [ ]:
%%sql
SELECT EXTRACT(DAY FROM start_date), EXTRACT(MONTH FROM start_date), EXTRACT(YEAR FROM start_date)
FROM rides
LIMIT 10;
In data warehouse models and in statistical model feature engineering, it can be particularly useful to extract all kinds of parts of dates out into variables. You never know where you'll find significance.
This kind of extraction is quite common.
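For comparison, a sketch of the same extraction in Python using the standard datetime module; the day_of_week value here maps Python's Monday=0 numbering onto TO_CHAR's Sunday=1 convention:

```python
from datetime import datetime

def date_features(ts):
    """Extract date parts analogous to the TO_CHAR expressions used here."""
    return {
        'day_of_year': ts.strftime('%Y-%m-%d'),
        'year': ts.year,
        'month': ts.month,
        'day': ts.day,
        'day_of_week': (ts.weekday() + 1) % 7 + 1,  # Sunday=1 ... Saturday=7
        'hour_24': ts.hour,
        'quarter': (ts.month - 1) // 3 + 1,
    }
```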
In [ ]:
%%sql
SELECT TO_CHAR(start_date, 'YYYY-MM-DD') AS day_of_year,
TO_CHAR(start_date, 'YYYY') AS year,
TO_CHAR(start_date, 'MM') AS month,
TO_CHAR(start_date, 'DD') AS day,
TO_CHAR(start_date, 'Day') AS day_of_week_str,
TO_CHAR(start_date, 'D') AS day_of_week,
CASE WHEN CAST(TO_CHAR(start_date, 'D') AS INTEGER) IN (1, 7)
THEN 1
ELSE 0
END AS is_weekend,
CASE WHEN CAST(TO_CHAR(start_date, 'D') AS INTEGER) NOT IN (1, 7)
THEN 1
ELSE 0
END AS is_weekday,
TO_CHAR(start_date, 'HH24') AS hour_24,
TO_CHAR(start_date, 'Q') AS quarter
FROM rides
LIMIT 10;
In [ ]:
%%sql
DROP TABLE IF EXISTS days;
CREATE TABLE days (
    id SERIAL,
    day_of_year CHAR(10),
    year INTEGER,
    month INTEGER,
    day INTEGER,
    day_of_week_str CHAR(9),
    day_of_week INTEGER,
    is_weekend BOOLEAN,
    is_weekday BOOLEAN,
    hour_24 INTEGER,
    quarter INTEGER
);
In [ ]:
%%sql
INSERT INTO days (day_of_year, year, month, day, day_of_week_str, day_of_week,
is_weekend, is_weekday, hour_24, quarter)
SELECT DISTINCT TO_CHAR(start_date, 'YYYY-MM-DD') AS day_of_year,
CAST(TO_CHAR(start_date, 'YYYY') AS INTEGER) AS year,
CAST(TO_CHAR(start_date, 'MM') AS INTEGER) AS month,
CAST(TO_CHAR(start_date, 'DD') AS INTEGER) AS day,
TO_CHAR(start_date, 'Day') AS day_of_week_str,
CAST(TO_CHAR(start_date, 'D') AS INTEGER) AS day_of_week,
CASE WHEN CAST(TO_CHAR(start_date, 'D') AS INTEGER) IN (1, 7)
THEN TRUE
ELSE FALSE
END AS is_weekend,
CASE WHEN CAST(TO_CHAR(start_date, 'D') AS INTEGER) NOT IN (1, 7)
THEN TRUE
ELSE FALSE
END AS is_weekday,
CAST(TO_CHAR(start_date, 'HH24') AS INTEGER) AS hour_24,
CAST(TO_CHAR(start_date, 'Q') AS INTEGER) AS quarter
FROM rides;
In [ ]:
%%sql
SELECT * FROM days
LIMIT 10;
And let's make sure we got that weekend bit right:
In [ ]:
%%sql
SELECT DISTINCT day_of_week_str, day_of_week, is_weekend, is_weekday FROM days;
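The weekend rule can also be cross-checked in Python; note that Python's weekday() numbers Monday=0 through Sunday=6, unlike TO_CHAR's 'D' (Sunday=1 through Saturday=7):

```python
from datetime import date

def is_weekend(d):
    # Saturday=5, Sunday=6 in Python's weekday() numbering
    return d.weekday() >= 5
```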
An interesting dimension to add to the bikeshare history is weather: I know I don't like to ride in the rain, and I'm probably not the only one.
Weather Underground offers access to weather history data at links like https://www.wunderground.com/history/airport/KDCA/2016/1/18/DailyHistory.html?req_city=Washington&req_state=DC&req_statename=District+of+Columbia&reqdb.zip=20003&reqdb.magic=1&reqdb.wmo=99999. At the bottom of that page there is a 'Comma Delimited File' link, which leads to https://www.wunderground.com/history/airport/KDCA/2016/1/18/DailyHistory.html?req_city=Washington&req_state=DC&req_statename=District+of+Columbia&reqdb.zip=20003&reqdb.magic=1&reqdb.wmo=99999&format=1
Mmm, CSV. We know what to do with CSV.
In [ ]:
from string import Template
import requests
In [ ]:
url_template = Template('https://www.wunderground.com/history/airport/KDCA/2016/1/$day/DailyHistory.html?req_city=Washington&req_state=DC&req_statename=District+of+Columbia&reqdb.zip=20003&reqdb.magic=1&reqdb.wmo=99999&format=1')
print(url_template.substitute(day=5))
In [ ]:
for d in range(1, 32):
    r = requests.get(url_template.substitute(day=d))
    open('weather-201601%02d.csv' % d, 'wb').write(r.content)
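One caution: Weather Underground's format=1 export has historically terminated each line with an HTML `<br />` tag, which trips up CSV parsers. If your downloaded files show that artifact, a quick cleanup pass like this (a sketch, not part of any library) helps before stacking:

```python
def clean_wunderground_csv(text):
    """Strip trailing '<br />' markers and drop blank lines."""
    lines = (line.replace('<br />', '').strip() for line in text.splitlines())
    return '\n'.join(line for line in lines if line)
```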
In [ ]:
!head weather-20160125.csv | csvlook
In [ ]:
!csvstack weather-2016*.csv > weather-2016q1.csv
In [ ]:
!wc weather*.csv