In [39]:
import boto3
import psycopg2
import botocore
import os
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
sns.set_style('white')
%matplotlib inline
In [84]:
AWS_ACCESS_KEY_ID = os.environ['AWS_ACCESS_KEY_ID']
In [85]:
AWS_SECRET_ACCESS_KEY = os.environ['AWS_SECRET_ACCESS_KEY']
Create S3 Bucket
In [30]:
s3 = boto3.client('s3')
In [45]:
s3.list_buckets()
Out[45]:
In [76]:
def create_s3_bucket(bucketname):
    """Create an S3 bucket, unless it already exists."""
    s3 = boto3.resource('s3')
    exists = True
    try:
        s3.meta.client.head_bucket(Bucket=bucketname)
    except botocore.exceptions.ClientError as e:
        error_code = int(e.response['Error']['Code'])
        if error_code == 404:
            exists = False
    if exists:
        print('Bucket {} already exists'.format(bucketname))
    else:
        s3.create_bucket(Bucket=bucketname, GrantFullControl='dkelly628')
In [77]:
create_s3_bucket('pecanstreetresearch-2016')
Copy Postgres to S3 via Postgres dump to CSV and s3cmd upload
In [ ]:
# Note: Used s3cmd because the awscli tools weren't working in this conda env
In [89]:
# ~14M rows (~1.2 GB unzipped locally); ~10 min to write to CSV and another ~10 min to upload to S3
# !s3cmd put /Users/Doug/PecanStreet/electricity-03-06-2016.csv s3://pecanstreetresearch-2016/electricity-03-06-2016.csv
In [91]:
# ~200K rows (~15 MB unzipped locally); ~30 sec to write to CSV and ~15 sec to upload to S3
# !s3cmd put /Users/Doug/PecanStreet/weather-03-06-2016.csv s3://pecanstreetresearch-2016/weather-03-06-2016.csv
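If s3cmd isn't available, boto3's managed transfer can handle the same uploads; a minimal sketch mirroring the commands above (keys here match the prefixes used by the COPY commands further down):
In [ ]:
# Sketch: boto3 alternative to the s3cmd uploads above; upload_file uses
# multipart transfer for the larger electricity file automatically.
s3 = boto3.client('s3')
s3.upload_file('/Users/Doug/PecanStreet/electricity-03-06-2016.csv',
               'pecanstreetresearch-2016', 'electricity/electricity-03-06-2016.csv')
s3.upload_file('/Users/Doug/PecanStreet/weather-03-06-2016.csv',
               'pecanstreetresearch-2016', 'weather/weather-03-06-2016.csv')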
Quick data cleanup before ETL
In [93]:
# Map latitude/longitude pairs to city names before uploading to Redshift
weather_df = pd.read_csv('/Users/Doug/PecanStreet/weather_03-06-2016.csv')
In [143]:
weather_df.groupby(['latitude', 'longitude', 'city']).count()
Out[143]:
In [ ]:
# Assign city names by latitude (values taken from the groupby above)
weather_df.loc[weather_df.latitude == 30.292432, 'city'] = 'Austin'
In [105]:
weather_df['city'] = 'city'  # initialize the column with a placeholder before assigning by latitude
In [116]:
weather_df.city.unique()
Out[116]:
In [117]:
# weather_df.loc[weather_df.latitude == 40.027278, 'city'] = 'Boulder'
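The per-latitude assignments can be collapsed into a single mapping; a small sketch using only the coordinates that appear in the groupby above (any other sites would need their coordinates added):
In [ ]:
# Hypothetical consolidated mapping; keys are latitudes from the groupby output above.
lat_to_city = {30.292432: 'Austin', 40.027278: 'Boulder'}
for lat, city in lat_to_city.items():
    weather_df.loc[weather_df.latitude == lat, 'city'] = city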
In [142]:
weather_df.to_csv('/Users/Doug/PecanStreet/weather-03-07-2016.csv', index=False)
In [120]:
metadata_df = pd.read_csv('/Users/Doug/PecanStreet/dataport-metadata.csv')
In [127]:
metadata_df = metadata_df[['dataid','city', 'state']]
In [137]:
metadata_df.to_csv('/Users/Doug/PecanStreet/metadata.csv', index=False)
In [ ]:
# !s3cmd put metadata.csv s3://pecanstreetresearch-2016/metadata/metadata.csv
In [129]:
redshift = boto3.client('redshift')
In [132]:
# redshift.describe_clusters()
In [ ]:
# psql -h pecanstreet.czxmxphrw2wv.us-east-1.redshift.amazonaws.com -U dkelly628 -d electricity -p 5439
create table electricity (
    dataid integer not null,
    localhour timestamp not null distkey sortkey,
    use decimal(30,26),
    air1 decimal(30,26),
    furnace1 decimal(30,26),
    car1 decimal(30,26)
);

create table weather (
    localhour timestamp not null distkey sortkey,
    latitude decimal(30,26),
    longitude decimal(30,26),
    temperature decimal(30,26),
    city varchar(20)
);

create table metadata (
    dataid integer distkey sortkey,
    city varchar(20),
    state varchar(20)
);
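The DDL above can also be run from this notebook instead of psql; a hedged sketch using psycopg2 with the same connection parameters (the statement list below is illustrative):
In [ ]:
# Sketch: execute the CREATE TABLE statements through psycopg2 rather than psql.
ddl_statements = [
    "create table metadata ( dataid integer distkey sortkey, city varchar(20), state varchar(20) )",
    # ... add the electricity and weather definitions above in the same way
]
conn = psycopg2.connect("dbname='electricity' user='dkelly628' "
                        "host='pecanstreet.czxmxphrw2wv.us-east-1.redshift.amazonaws.com' "
                        "port='5439' password='password'")
with conn.cursor() as cur:
    for statement in ddl_statements:
        cur.execute(statement)
conn.commit()
conn.close()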
In [ ]:
# Complete
COPY electricity
FROM 's3://pecanstreetresearch-2016/electricity/electricity-03-06-2016.csv'
CREDENTIALS 'aws_access_key_id=AWS_ACCESS_KEY_ID;aws_secret_access_key=AWS_SECRET_ACCESS_KEY'
CSV
IGNOREHEADER 1
dateformat 'auto';
In [ ]:
# Complete
COPY weather
FROM 's3://pecanstreetresearch-2016/weather/weather-03-06-2016.csv'
CREDENTIALS 'aws_access_key_id=AWS_ACCESS_KEY_ID;aws_secret_access_key=AWS_SECRET_ACCESS_KEY'
CSV
IGNOREHEADER 1
dateformat 'auto';
In [ ]:
# Complete
COPY metadata
FROM 's3://pecanstreetresearch-2016/metadata/metadata.csv'
CREDENTIALS 'aws_access_key_id=AWS_ACCESS_KEY_ID;aws_secret_access_key=AWS_SECRET_ACCESS_KEY'
CSV
IGNOREHEADER 1;
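The COPY commands can also be issued through psycopg2, substituting the keys loaded from the environment at the top of the notebook; a hedged sketch for the metadata load (assumes the conn opened later in this notebook):
In [ ]:
# Sketch: run the metadata COPY from Python, filling in the credentials loaded earlier.
copy_metadata = """
    COPY metadata
    FROM 's3://pecanstreetresearch-2016/metadata/metadata.csv'
    CREDENTIALS 'aws_access_key_id={};aws_secret_access_key={}'
    CSV
    IGNOREHEADER 1;
""".format(AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY)
with conn.cursor() as cur:
    cur.execute(copy_metadata)
conn.commit()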
In [ ]:
# Query for checking error log; invaluable
select query, substring(filename,22,25) as filename,line_number as line,
substring(colname,0,12) as column, type, position as pos, substring(raw_line,0,30) as line_text,
substring(raw_field_value,0,15) as field_text,
substring(err_reason,0,45) as reason
from stl_load_errors
order by query desc
limit 10;
In [ ]:
# All table definitions are stored in the pg_table_def table; this differs from Postgres
SELECT DISTINCT tablename
FROM pg_table_def
WHERE schemaname = 'public'
ORDER BY tablename;
In [ ]:
# Returns household, time, city, usage by hour, and temperature for all residents in Austin, TX
SELECT e.dataid, e.localhour, m.city, SUM(e.use), w.temperature
FROM electricity AS e
JOIN weather AS w ON e.localhour = w.localhour
JOIN metadata AS m ON e.dataid = m.dataid
WHERE m.city = 'Austin'
GROUP BY e.dataid, e.localhour, m.city, w.temperature;
In [ ]:
# Returns number of participants by city, state
SELECT m.city, m.state, COUNT(DISTINCT e.dataid) AS participants
FROM electricity AS e
JOIN metadata AS m ON e.dataid = m.dataid
GROUP BY m.city, m.state;
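Either aggregate can be pulled straight into a DataFrame once the connection below is open; for example:
In [ ]:
# Read the participants-per-city aggregate into pandas (uses the conn opened below).
participants_df = pd.read_sql("""
    SELECT m.city, m.state, COUNT(DISTINCT e.dataid) AS participants
    FROM electricity AS e
    JOIN metadata AS m ON e.dataid = m.dataid
    GROUP BY m.city, m.state;""", conn)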
In [29]:
# Set up connection to the Redshift cluster holding the Pecan Street data
try:
    conn = psycopg2.connect("dbname='electricity' user='dkelly628' host='pecanstreet.czxmxphrw2wv.us-east-1.redshift.amazonaws.com' port='5439' password='password'")
except psycopg2.Error:
    print("Error: Check there aren't any open connections in notebook or pgAdmin")
In [33]:
electricity_df = pd.read_sql("SELECT localhour, SUM(use) AS usage, SUM(air1) AS cooling, SUM(furnace1) AS heating, \
SUM(car1) AS electric_vehicle \
FROM electricity \
WHERE dataid = 7982 AND use > 0 \
AND localhour BETWEEN '2013-10-16 00:00:00'::timestamp AND \
'2016-02-26 08:00:00'::timestamp \
GROUP BY dataid, localhour \
ORDER BY localhour", conn)
In [41]:
electricity_df['localhour'] = pd.to_datetime(electricity_df.localhour)
In [42]:
electricity_df.set_index('localhour', inplace=True)
In [43]:
electricity_df.fillna(value=0.0, inplace=True)
In [48]:
electricity_df[['usage','cooling']].plot(figsize=(18,9), title="Pecan Street Household 7982 Hourly Energy Consumption")
sns.despine();
Databricks Spark Analysis (see Databricks): Batch analytics on S3, Streaming using Amazon Kinesis Stream
Create Amazon Kinesis Stream for writing streaming data to S3
In [4]:
kinesis = boto3.client('kinesis')
In [6]:
kinesis.create_stream(StreamName='PecanStreet', ShardCount=2)
Out[6]:
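create_stream returns before the stream is usable; a short sketch of blocking until it reports ACTIVE:
In [ ]:
# Wait for the new stream to become ACTIVE before writing any records to it.
kinesis.get_waiter('stream_exists').wait(StreamName='PecanStreet')
kinesis.describe_stream(StreamName='PecanStreet')['StreamDescription']['StreamStatus']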
In [7]:
kinesis.list_streams()
Out[7]:
In [9]:
firehose = boto3.client('firehose')
In [ ]:
# firehose.create_delivery_stream(DeliveryStreamName='pecanstreetfirehose', S3DestinationConfiguration={'RoleARN': '', 'BucketARN': 'pecanstreetresearch-2016'})
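The commented-out call above still needs a delivery role and a full bucket ARN; a hedged sketch (the RoleARN is a placeholder for an IAM role that lets Firehose write to the bucket):
In [ ]:
# Sketch only: RoleARN is a placeholder; BucketARN must be the full S3 ARN, not the bucket name.
firehose.create_delivery_stream(
    DeliveryStreamName='pecanstreetfirehose',
    S3DestinationConfiguration={
        'RoleARN': 'arn:aws:iam::123456789012:role/firehose-delivery-role',  # placeholder
        'BucketARN': 'arn:aws:s3:::pecanstreetresearch-2016',
        'Prefix': 'firehose/'
    })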
In [36]:
firehose.list_delivery_streams()
Out[36]:
In [ ]:
def kinesis_write(stream, data, partition_key):
    """Write a single record to a Kinesis stream."""
    kinesis = boto3.client('kinesis')
    kinesis.put_record(StreamName=stream, Data=data, PartitionKey=partition_key)
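A usage sketch, pushing one illustrative reading as JSON and partitioning by household id:
In [ ]:
import json
# Illustrative record; field names mirror the electricity table above.
record = {'dataid': 7982, 'localhour': '2016-02-26 08:00:00', 'use': 1.2}
kinesis_write('PecanStreet', json.dumps(record), partition_key=str(record['dataid']))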
In [ ]:
def kinesis_read():
"""Method to read from kinesis stream"""