In [39]:
    
import boto3
import psycopg2
import botocore
import os
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
sns.set_style('white')
%matplotlib inline
    
In [84]:
    
AWS_ACCESS_KEY_ID = os.environ['AWS_ACCESS_KEY_ID']
    
In [85]:
    
AWS_SECRET_ACCESS_KEY = os.environ['AWS_SECRET_ACCESS_KEY']
    
Create S3 Bucket
In [30]:
    
s3 = boto3.client('s3')
    
In [45]:
    
s3.list_buckets()
    
    Out[45]:
In [76]:
    
def create_s3_bucket(bucketname):
    """Quick method to create bucket with exception handling"""
    s3 = boto3.resource('s3')
    exists = True
    bucket = s3.Bucket(bucketname)
    try:
        s3.meta.client.head_bucket(Bucket=bucketname)
    except botocore.exceptions.ClientError as e:
        error_code = int(e.response['Error']['Code'])
        if error_code == 404:
            exists = False
    if exists:
        print('Bucket {} already exists'.format(bucketname))
    else:
        s3.create_bucket(Bucket=bucketname, GrantFullControl='dkelly628')
    
In [77]:
    
create_s3_bucket('pecanstreetresearch-2016')
    
    
Copy Postgres data to S3 via a CSV dump and s3cmd upload
In [ ]:
    
# Note: Used s3cmd because the awscli tools were not working in this conda env (a boto3 alternative is sketched below)
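
A sketch of the dump step itself, using psycopg2's COPY support; the Dataport host, credentials, and source query below are placeholders rather than the actual schema.
In [ ]:
    
src = psycopg2.connect(host='dataport.pecanstreet.org', port=5432,
                       dbname='postgres', user='username', password='password')
with open('/Users/Doug/PecanStreet/electricity-03-06-2016.csv', 'w') as f:
    cur = src.cursor()
    # COPY (query) TO STDOUT streams the result set straight into the local file
    cur.copy_expert("COPY (SELECT dataid, localhour, use, air1, furnace1, car1 "
                    "FROM electricity) TO STDOUT WITH CSV HEADER", f)
src.close()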
    
In [89]:
    
# 14m rows or ~ 1.2 GB local unzipped; 10min write to CSV and another 10min to upload to S3
# !s3cmd put /Users/Doug/PecanStreet/electricity-03-06-2016.csv s3://pecanstreetresearch-2016/electricity-03-06-2016.csv
    
In [91]:
    
# 200k rows ~ 15 MB local unzipped; 30 sec write to CSV and 15 sec upload to S3
# !s3cmd put /Users/Doug/PecanStreet/weather-03-06-2016.csv s3://pecanstreetresearch-2016/weather-03-06-2016.csv
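
The same uploads can be done with boto3 directly, which avoids the s3cmd dependency (a sketch; the object keys here are chosen to match the prefixes referenced by the COPY statements further down):
In [ ]:
    
# Upload both CSVs with boto3 instead of s3cmd
s3 = boto3.client('s3')
s3.upload_file('/Users/Doug/PecanStreet/electricity-03-06-2016.csv',
               'pecanstreetresearch-2016', 'electricity/electricity-03-06-2016.csv')
s3.upload_file('/Users/Doug/PecanStreet/weather-03-06-2016.csv',
               'pecanstreetresearch-2016', 'weather/weather-03-06-2016.csv')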
    
Quick data cleanup before ETL
In [93]:
    
# Quick lat/long-to-city tagging before uploading to Redshift
weather_df = pd.read_csv('/Users/Doug/PecanStreet/weather_03-06-2016.csv')
    
In [143]:
    
weather_df.groupby(['latitude', 'longitude', 'city']).count()
    
    Out[143]:
In [ ]:
    
# Tag Austin rows by station latitude; other sites are handled below
weather_df['city'] = np.where(weather_df.latitude == 30.292432, 'Austin', '')
    
In [105]:
    
weather_df['city'] = 'city'  # initialize the column with a placeholder before tagging by latitude
    
In [116]:
    
weather_df.city.unique()
    
    Out[116]:
In [117]:
    
# weather_df['city'][weather_df.latitude==40.027278] = 'Boulder'
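
A more compact way to express the latitude-to-city mapping (a sketch; only the two latitudes seen above are covered, so any additional site would need its coordinates added):
In [ ]:
    
# Map known station latitudes to city names in one pass
lat_to_city = {30.292432: 'Austin', 40.027278: 'Boulder'}
weather_df['city'] = weather_df['latitude'].map(lat_to_city)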
    
In [142]:
    
weather_df.to_csv('/Users/Doug/PecanStreet/weather-03-07-2016.csv', index=False)
    
In [120]:
    
metadata_df = pd.read_csv('/Users/Doug/PecanStreet/dataport-metadata.csv')
    
In [127]:
    
metadata_df = metadata_df[['dataid','city', 'state']]
    
In [137]:
    
metadata_df.to_csv('/Users/Doug/PecanStreet/metadata.csv', index=False)
    
In [ ]:
    
# !s3cmd put metadata.csv s3://pecanstreetresearch-2016/metadata/metadata.csv
    
In [129]:
    
redshift = boto3.client('redshift')
    
In [132]:
    
# redshift.describe_clusters()
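
The cluster isn't created in this notebook; if needed, it could also be provisioned from boto3 (a sketch, left commented out; identifier, node type, and credentials are placeholders):
In [ ]:
    
# redshift.create_cluster(ClusterIdentifier='pecanstreet',
#                         ClusterType='multi-node',
#                         NumberOfNodes=2,
#                         NodeType='dc1.large',
#                         DBName='electricity',
#                         MasterUsername='dkelly628',
#                         MasterUserPassword='password')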
    
In [ ]:
    
# psql -h pecanstreet.czxmxphrw2wv.us-east-1.redshift.amazonaws.com -U dkelly628 -d electricity -p 5439
    
create table electricity (
    dataid integer not null,
    localhour timestamp not null distkey sortkey,
    use decimal(30,26),
    air1 decimal(30,26),
    furnace1 decimal(30,26),
    car1 decimal(30,26)
);

create table weather (
    localhour timestamp not null distkey sortkey,
    latitude decimal(30,26),
    longitude decimal(30,26),
    temperature decimal(30,26),
    city varchar(20)
);

create table metadata (
    dataid integer distkey sortkey,
    city varchar(20),
    state varchar(20)
);
In [ ]:
    
# Complete
COPY electricity 
FROM 's3://pecanstreetresearch-2016/electricity/electricity-03-06-2016.csv'
CREDENTIALS 'aws_access_key_id=AWS_ACCESS_KEY_ID;aws_secret_access_key=AWS_SECRET_ACCESS_KEY'
CSV
IGNOREHEADER 1
dateformat 'auto';
    
In [ ]:
    
# Complete
COPY weather
FROM 's3://pecanstreetresearch-2016/weather/weather-03-06-2016.csv'
CREDENTIALS 'aws_access_key_id=AWS_ACCESS_KEY_ID;aws_secret_access_key=AWS_SECRET_ACCESS_KEY'
CSV
IGNOREHEADER 1
dateformat 'auto';
    
In [ ]:
    
# Complete
COPY metadata 
FROM 's3://pecanstreetresearch-2016/metadata/metadata.csv'
CREDENTIALS 'aws_access_key_id=AWS_ACCESS_KEY_ID;aws_secret_access_key=AWS_SECRET_ACCESS_KEY'
CSV 
IGNOREHEADER 1;
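
The COPY statements above appear to have been run from psql; they can also be issued from Python with psycopg2 (a sketch reusing the metadata load and the same cluster connection string used further below):
In [ ]:
    
# Run a Redshift COPY from Python instead of psql
conn = psycopg2.connect("dbname='electricity' user='dkelly628' host='pecanstreet.czxmxphrw2wv.us-east-1.redshift.amazonaws.com' port='5439' password='password'")
cur = conn.cursor()
cur.execute("""
    COPY metadata
    FROM 's3://pecanstreetresearch-2016/metadata/metadata.csv'
    CREDENTIALS 'aws_access_key_id={};aws_secret_access_key={}'
    CSV IGNOREHEADER 1;
""".format(AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY))
conn.commit()
conn.close()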
    
In [ ]:
    
# Query for checking error log; invaluable
select query, substring(filename,22,25) as filename,line_number as line, 
substring(colname,0,12) as column, type, position as pos, substring(raw_line,0,30) as line_text,
substring(raw_field_value,0,15) as field_text, 
substring(err_reason,0,45) as reason
from stl_load_errors 
order by query desc
limit 10;
    
In [ ]:
    
# All table definitions are stored in pg_table_def table; different from Postgres
SELECT DISTINCT tablename
FROM pg_table_def
WHERE schemaname = 'public'
ORDER BY tablename;
    
In [ ]:
    
# Returns household, time, city, usage by hour, and temperature for all residents in Austin, TX
SELECT e.dataid, e.localhour, m.city, SUM(e.use), w.temperature
FROM electricity AS e
JOIN weather AS w ON e.localhour = w.localhour
JOIN metadata AS m ON e.dataid = m.dataid 
WHERE m.city = 'Austin'
GROUP BY e.dataid, e.localhour, m.city, w.temperature;
    
In [ ]:
    
# Returns number of participants by city, state
SELECT m.city, m.state, COUNT(e.dataid) AS participants
FROM electricity AS e
JOIN metadata AS m ON e.dataid = m.dataid
GROUP BY m.city, m.state;
    
In [29]:
    
# Setup connection to the Redshift cluster holding the Pecan Street data
try:
    conn = psycopg2.connect("dbname='electricity' user='dkelly628' host='pecanstreet.czxmxphrw2wv.us-east-1.redshift.amazonaws.com' port='5439' password='password'")
except psycopg2.Error:
    print("Error: Check there aren't any open connections in notebook or pgAdmin")
    
In [33]:
    
electricity_df = pd.read_sql("SELECT localhour, SUM(use) AS usage, SUM(air1) AS cooling, SUM(furnace1) AS heating, \
                             SUM(car1) AS electric_vehicle \
                             FROM electricity \
                             WHERE dataid = 7982 AND use > 0  \
                             AND localhour BETWEEN '2013-10-16 00:00:00'::timestamp AND \
                             '2016-02-26 08:00:00'::timestamp \
                             GROUP BY dataid, localhour \
                             ORDER BY localhour", conn)
    
In [41]:
    
electricity_df['localhour'] = electricity_df.localhour.apply(pd.to_datetime)
    
In [42]:
    
electricity_df.set_index('localhour', inplace=True)
    
In [43]:
    
electricity_df.fillna(value=0.0, inplace=True)
    
In [48]:
    
electricity_df[['usage','cooling']].plot(figsize=(18,9), title="Pecan Street Household 7982 Hourly Energy Consumption")
sns.despine();
    
    
Databricks Spark Analysis (see Databricks): Batch analytics on S3, Streaming using Amazon Kinesis Stream
In [ ]:
    
    
Create Amazon Kinesis Stream for writing streaming data to S3
In [4]:
    
kinesis = boto3.client('kinesis')
    
In [6]:
    
kinesis.create_stream(StreamName='PecanStreet', ShardCount=2)
    
    Out[6]:
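
create_stream is asynchronous, and the stream takes a short while to reach ACTIVE before it accepts records; boto3's built-in waiter can block until then (a sketch):
In [ ]:
    
# Wait until the new stream is ACTIVE before writing to it
kinesis.get_waiter('stream_exists').wait(StreamName='PecanStreet')
    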
In [7]:
    
kinesis.list_streams()
    
    Out[7]:
In [9]:
    
firehose = boto3.client('firehose')
    
In [ ]:
    
# firehose.create_delivery_stream(DeliveryStreamName='pecanstreetfirehose', S3DestinationConfiguration={'RoleARN': '', 'BucketARN': 'pecanstreetresearch-2016'})
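
For that call to succeed, both ARNs need to be fully qualified: the bucket ARN takes the arn:aws:s3::: form and the role must be an IAM role Firehose can assume (a sketch, left commented out; the role ARN is a placeholder):
In [ ]:
    
# firehose.create_delivery_stream(
#     DeliveryStreamName='pecanstreetfirehose',
#     S3DestinationConfiguration={
#         'RoleARN': 'arn:aws:iam::123456789012:role/firehose-delivery-role',  # placeholder
#         'BucketARN': 'arn:aws:s3:::pecanstreetresearch-2016',
#         'Prefix': 'kinesis/'
#     })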
    
In [36]:
    
firehose.list_delivery_streams()
    
    Out[36]:
In [ ]:
    
def kinesis_write(stream, data, partition_key):
    """Write a single record to a kinesis stream"""
    kinesis = boto3.client('kinesis')
    kinesis.put_record(StreamName=stream, Data=data, PartitionKey=partition_key)
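
As a usage example, weather rows could be pushed onto the stream one record at a time (a sketch, left commented out; the JSON payload and the choice of partition key are assumptions):
In [ ]:
    
# import json
# for _, row in weather_df.iterrows():
#     kinesis_write('PecanStreet', json.dumps(row.to_dict()), str(row['city']))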
    
In [ ]:
    
def kinesis_read(shard_iterator):
    """Read a batch of records from a kinesis stream shard"""
    kinesis = boto3.client('kinesis')
    return kinesis.get_records(ShardIterator=shard_iterator)['Records']