In [39]:
import boto3
import psycopg2
import botocore
import os
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
sns.set_style('white')
%matplotlib inline

In [84]:
AWS_ACCESS_KEY_ID = os.environ['AWS_ACCESS_KEY_ID']

In [85]:
AWS_SECRET_ACCESS_KEY = os.environ['AWS_SECRET_ACCESS_KEY']

AWS (S3, Redshift, Kinesis) + Databricks Spark = Real-time Smart Meter Analytics

Create S3 Bucket


In [30]:
s3 = boto3.client('s3')

In [45]:
s3.list_buckets()


Out[45]:
{u'Buckets': [{u'CreationDate': datetime.datetime(2016, 1, 26, 1, 47, 19, tzinfo=tzutc()),
   u'Name': 'doug62890'},
  {u'CreationDate': datetime.datetime(2015, 6, 5, 7, 3, 45, tzinfo=tzutc()),
   u'Name': 'elasticbeanstalk-us-west-2-473548050994'},
  {u'CreationDate': datetime.datetime(2016, 3, 7, 2, 45, 37, tzinfo=tzutc()),
   u'Name': 'pecanstreetresearch-2016'}],
 u'Owner': {u'DisplayName': 'dkelly628',
  u'ID': 'bda9f00a638e7c5c17498b033a6ce34b124be90c6ae542abc73a64f4d56f1913'},
 'ResponseMetadata': {'HTTPStatusCode': 200,
  'HostId': 'tvKe4ptiqACT0S0ZzK6+LpC36DZt2oLwSB/v8NHI7bs3smVkTQ6MkR+OQXspINn1',
  'RequestId': 'DFAFD1582C9475BF'}}

In [76]:
def create_s3_bucket(bucketname):
    """Quick method to create bucket with exception handling"""
    s3 = boto3.resource('s3')
    exists = True
    bucket = s3.Bucket(bucketname)
    try:
        s3.meta.client.head_bucket(Bucket=bucketname)
    except botocore.exceptions.ClientError as e:
        error_code = int(e.response['Error']['Code'])
        if error_code == 404:
            exists = False
    if exists:
        print 'Bucket {} already exists'.format(bucketname)
    else:
        s3.create_bucket(Bucket=bucketname, GrantFullControl='dkelly628')

In [77]:
create_s3_bucket('pecanstreetresearch-2016')


Bucket pecanstreetresearch-2016 already exists

Copy Postgres data to S3: dump tables to CSV, then upload with s3cmd


In [ ]:
# Note: Used s3cmd tools because awscli tools not working in conda env

In [89]:
# 14m rows or ~ 1.2 GB local unzipped; 10min write to CSV and another 10min to upload to S3
# !s3cmd put ~/Users/Doug/PecanStreet/electricity-03-06-2016.csv s3://pecanstreetresearch-2016/electricity-03-06-2016.csv

In [91]:
# 200k rows ~ 15 MB local unzipped; 30 sec write to CSV and 15 sec upload to S3
# !s3cmd put ~/Users/Doug/PecanStreet/weather-03-06-2016.csv s3://pecanstreetresearch-2016/weather-03-06-2016.csv
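
The dump step itself isn't shown above. As a rough sketch (host, credentials, and table name below are placeholders, not the actual Dataport settings), psycopg2's copy_expert can stream a query straight to a local CSV:

In [ ]:
# Sketch (assumed): dump a Postgres table to CSV with copy_expert; host, credentials,
# and table name are placeholders standing in for the Pecan Street Dataport instance.
dump_conn = psycopg2.connect(host='dataport-host', dbname='dataport',
                             user='username', password='password')
with open('/Users/Doug/PecanStreet/electricity-03-06-2016.csv', 'w') as f:
    dump_conn.cursor().copy_expert("COPY electricity TO STDOUT WITH CSV HEADER", f)
dump_conn.close()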

Amazon Redshift: Columnar SQL Data Warehouse

Quick data cleanup before ETL


In [93]:
# Quick lat/long-to-city labeling before uploading to Redshift
weather_df = pd.read_csv('/Users/Doug/PecanStreet/weather_03-06-2016.csv')

In [143]:
weather_df.groupby(['latitude', 'longitude', 'city']).count()


Out[143]:
                                  localhour  temperature
latitude  longitude   city
30.292432 -97.699662  Austin          45336        45336
32.778033 -117.151885 San Diego       45384        45384
40.027278 -105.256111 Boulder         45384        45384

In [105]:
# Label each weather station with its city, keyed on latitude (three stations per the groupby above)
weather_df['city'] = ''
weather_df.loc[weather_df.latitude == 30.292432, 'city'] = 'Austin'
weather_df.loc[weather_df.latitude == 32.778033, 'city'] = 'San Diego'
weather_df.loc[weather_df.latitude == 40.027278, 'city'] = 'Boulder'

In [116]:
weather_df.city.unique()


Out[116]:
array(['Austin', 'Boulder', 'San Diego'], dtype=object)

In [117]:
# weather_df['city'][weather_df.latitude==40.027278] = 'Boulder'

In [142]:
weather_df.to_csv('/Users/Doug/PecanStreet/weather-03-07-2016.csv', index=False)

In [120]:
metadata_df = pd.read_csv('/Users/Doug/PecanStreet/dataport-metadata.csv')

In [127]:
metadata_df = metadata_df[['dataid','city', 'state']]

In [137]:
metadata_df.to_csv('/Users/Doug/PecanStreet/metadata.csv', index=False)

In [ ]:
# !s3cmd put metadata.csv s3://pecanstreetresearch-2016/metadata/metadata.csv

In [129]:
redshift = boto3.client('redshift')

In [132]:
# redshift.describe_clusters()

In [ ]:
# psql -h pecanstreet.czxmxphrw2wv.us-east-1.redshift.amazonaws.com -U dkelly628 -d electricity -p 5439

create table electricity ( dataid integer not null, localhour timestamp not null distkey sortkey, use decimal(30,26), air1 decimal(30,26), furnace1 decimal(30,26), car1 decimal(30,26) );

create table weather ( localhour timestamp not null distkey sortkey, latitude decimal(30,26), longitude decimal(30,26), temperature decimal(30,26), city varchar(20) );

create table metadata ( dataid integer distkey sortkey, city varchar(20), state varchar(20) );


In [ ]:
# Complete
COPY electricity 
FROM 's3://pecanstreetresearch-2016/electricity/electricity-03-06-2016.csv'
CREDENTIALS 'aws_access_key_id=AWS_ACCESS_KEY_ID;aws_secret_access_key=AWS_SECRET_ACCESS_KEY'
CSV
IGNOREHEADER 1
dateformat 'auto';

In [ ]:
# Complete

COPY weather
FROM 's3://pecanstreetresearch-2016/weather/weather-03-06-2016.csv'
CREDENTIALS 'aws_access_key_id=AWS_ACCESS_KEY_ID;aws_secret_access_key=AWS_SECRET_ACCESS_KEY'
CSV
IGNOREHEADER 1
dateformat 'auto';

In [ ]:
# Complete

COPY metadata 
FROM 's3://pecanstreetresearch-2016/metadata/metadata.csv'
CREDENTIALS 'aws_access_key_id=AWS_ACCESS_KEY_ID;aws_secret_access_key=AWS_SECRET_ACCESS_KEY'
CSV 
IGNOREHEADER 1;

In [ ]:
# Query for checking error log; invaluable

select query, substring(filename,22,25) as filename,line_number as line, 
substring(colname,0,12) as column, type, position as pos, substring(raw_line,0,30) as line_text,
substring(raw_field_value,0,15) as field_text, 
substring(err_reason,0,45) as reason
from stl_load_errors 
order by query desc
limit 10;

In [ ]:
# All table definitions are stored in pg_table_def table; different from Postgres

SELECT DISTINCT tablename
FROM pg_table_def
WHERE schemaname = 'public'
ORDER BY tablename;

In [ ]:
# Returns household, time, city, usage by hour, and temperature for all residents in Austin, TX

SELECT e.dataid, e.localhour, m.city, SUM(e.use), w.temperature
FROM electricity AS e
JOIN weather AS w ON e.localhour = w.localhour
JOIN metadata AS m ON e.dataid = m.dataid 
WHERE m.city = 'Austin'
GROUP BY e.dataid, e.localhour, m.city, w.temperature;

In [ ]:
# Returns number of participants by city, state

SELECT m.city, m.state, COUNT(e.dataid) AS participants
FROM electricity AS e
JOIN metadata AS m ON e.dataid = m.dataid
GROUP BY m.city, m.state;

In [29]:
# Setup connection to the Redshift cluster
try:
    conn = psycopg2.connect("dbname='electricity' user='dkelly628' host='pecanstreet.czxmxphrw2wv.us-east-1.redshift.amazonaws.com' port='5439' password='password'")
except psycopg2.Error:
    print "Error: Check there aren't any open connections in notebook or pgAdmin"

In [33]:
electricity_df = pd.read_sql("SELECT localhour, SUM(use) AS usage, SUM(air1) AS cooling, SUM(furnace1) AS heating, \
                             SUM(car1) AS electric_vehicle \
                             FROM electricity \
                             WHERE dataid = 7982 AND use > 0  \
                             AND localhour BETWEEN '2013-10-16 00:00:00'::timestamp AND \
                             '2016-02-26 08:00:00'::timestamp \
                             GROUP BY dataid, localhour \
                             ORDER BY localhour", conn)

In [41]:
electricity_df['localhour'] = electricity_df.localhour.apply(pd.to_datetime)

In [42]:
electricity_df.set_index('localhour', inplace=True)

In [43]:
electricity_df.fillna(value=0.0, inplace=True)

In [48]:
electricity_df[['usage','cooling']].plot(figsize=(18,9), title="Pecan Street Household 7982 Hourly Energy Consumption")
sns.despine();


Databricks Spark Analysis (see the Databricks notebook): Batch analytics on S3 and streaming via Amazon Kinesis
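
The Spark work itself lives in the linked Databricks notebook. As a rough sketch of the batch path (assuming the bucket and key used in the Redshift COPY above, and the SparkSession that Databricks provides as spark):

In [ ]:
# Sketch (assumed): batch read of the electricity CSV from S3 into a Spark DataFrame on Databricks.
# Column names match the Redshift DDL above; 'spark' is the Databricks-provided SparkSession.
electricity_sdf = (spark.read
                   .option('header', 'true')
                   .option('inferSchema', 'true')
                   .csv('s3a://pecanstreetresearch-2016/electricity/electricity-03-06-2016.csv'))
electricity_sdf.groupBy('dataid').sum('use').show(5)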


Create Amazon Kinesis Stream for writing streaming data to S3


In [4]:
kinesis = boto3.client('kinesis')

In [6]:
kinesis.create_stream(StreamName='PecanStreet', ShardCount=2)


Out[6]:
{'ResponseMetadata': {'HTTPStatusCode': 200,
  'RequestId': '781fb4dd-e57b-11e5-93df-217b85429719'}}

In [7]:
kinesis.list_streams()


Out[7]:
{u'HasMoreStreams': False,
 'ResponseMetadata': {'HTTPStatusCode': 200,
  'RequestId': '799b314f-e57b-11e5-93df-217b85429719'},
 u'StreamNames': [u'PecanStreet']}

In [9]:
firehose = boto3.client('firehose')

In [ ]:
# firehose.create_delivery_stream(DeliveryStreamName='pecanstreetfirehose', S3DestinationConfiguration={'RoleARN': '', 'BucketARN': 'pecanstreetresearch-2016'})
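
The delivery-stream call above is left commented out with an empty RoleARN. A sketch of what a complete call might look like (the role ARN is a placeholder, and the bucket ARN uses the arn:aws:s3::: form Firehose expects):

In [ ]:
# Sketch (assumed): create a Firehose delivery stream that lands records in the S3 bucket.
# The RoleARN below is a placeholder IAM role, not a real one.
firehose.create_delivery_stream(
    DeliveryStreamName='pecanstreet',
    S3DestinationConfiguration={
        'RoleARN': 'arn:aws:iam::123456789012:role/firehose-delivery-role',
        'BucketARN': 'arn:aws:s3:::pecanstreetresearch-2016',
        'Prefix': 'kinesis/'})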

In [36]:
firehose.list_delivery_streams()


Out[36]:
{u'DeliveryStreamNames': [u'pecanstreet'],
 u'HasMoreDeliveryStreams': False,
 'ResponseMetadata': {'HTTPStatusCode': 200,
  'RequestId': 'b81a3d77-e580-11e5-8bde-6ffb1b8f5a35'}}

In [ ]:
def kinesis_write(stream, data, partition_key):
    """Write a single record to a Kinesis stream"""
    kinesis = boto3.client('kinesis')
    kinesis.put_record(StreamName=stream, Data=data, PartitionKey=partition_key)
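
One hypothetical way to exercise kinesis_write (a sketch, not from the original notebook): replay the household 7982 readings pulled from Redshift earlier as JSON records, keyed by household id.

In [ ]:
# Sketch (assumed): replay electricity_df rows onto the PecanStreet stream as JSON records
import json

for localhour, row in electricity_df.iterrows():
    record = {'dataid': 7982, 'localhour': str(localhour), 'use': float(row['usage'])}
    kinesis_write('PecanStreet', json.dumps(record), partition_key='7982')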

In [ ]:
def kinesis_read(stream, shard_id='shardId-000000000000'):
    """Read the currently available records from one shard of a Kinesis stream"""
    kinesis = boto3.client('kinesis')
    iterator = kinesis.get_shard_iterator(StreamName=stream, ShardId=shard_id,
                                          ShardIteratorType='TRIM_HORIZON')['ShardIterator']
    return kinesis.get_records(ShardIterator=iterator)['Records']