InfluxDB

First, get an access token and set it as the value of DEN_ACCESS_TOKEN in the environment before starting this IPython Notebook.


In [ ]:
import os

In [ ]:
# Read the Nest API OAuth token from the environment.
# Raises KeyError immediately if DEN_ACCESS_TOKEN is not set (fail fast).
access_token = os.environ["DEN_ACCESS_TOKEN"]

In [ ]:
# Location of the Nest developer REST API used to build request URLs.
API_PROTOCOL = "https"
API_LOCATION = "developer-api.nest.com"

In [ ]:
from urlparse import SplitResult, urlunsplit
from urllib import urlencode

In [ ]:
def get_api_url(path="", auth=None):
    """Build a full Nest API URL for the given path.

    :param path: URL path component (e.g. ``"devices"``); defaults to the
        API root.
    :param auth: access token to embed in the ``auth`` query parameter.
        Defaults to the module-level ``access_token`` read from the
        environment, preserving the original behavior; passing it
        explicitly makes the function testable without environment setup.
    :return: the complete URL string, e.g.
        ``https://developer-api.nest.com/devices?auth=...``.
    """
    if auth is None:
        auth = access_token
    query = urlencode({"auth": auth})
    split = SplitResult(scheme=API_PROTOCOL, netloc=API_LOCATION, path=path, query=query, fragment="")
    return urlunsplit(split)

In [ ]:
import requests

Streaming

The Nest REST Streaming API makes it easy to get real-time data without worrying about rate limiting.


In [ ]:
import json

In [ ]:
def get_stream(path=""):
    """Make a GET to the stream API and return the response object.

    :param path: API path to stream from (passed to :func:`get_api_url`).
    :return: the open, streaming :class:`requests.Response`.
    :raises requests.exceptions.HTTPError: if the final response status is
        not successful (via ``raise_for_status``).

    The caller is responsible for closing the response (e.g. with
    ``contextlib.closing``).
    """
    r = requests.get(get_api_url(path),
                     headers={"Accept": "text/event-stream"},  # SSE media type
                     stream=True)
    # The API redirects to a per-client streaming host; log each hop so
    # den.log shows where the stream actually connected.
    for h in r.history:
        logging.debug("[%d] Redirect: %s" % (h.status_code, h.url))
    logging.debug("[%d] URL: %s" % (r.status_code, r.url))
    r.raise_for_status()
    return r

In [ ]:
from contextlib import closing

Configure Logging


In [ ]:
import logging

Need to reload the logging module because IPython Notebook has already imported and configured it, which would otherwise prevent the basicConfig call below from taking effect.


In [ ]:
# reload() is the Python 2 builtin.  IPython has already imported and
# configured the logging module (see note above), so reload it here to
# allow the basicConfig call below to take effect.
reload(logging)

In [ ]:
# Log DEBUG and above to den.log, with a timestamp and level on each line.
logging.basicConfig(filename="den.log", level=logging.DEBUG, format="%(asctime)s %(levelname)s %(message)s")

In [ ]:
# Smoke-test the logging configuration; a line should appear in den.log.
logging.debug("Cool!")

Helper Functions

For processing Nest API REST streaming data:


In [ ]:
def _is_event(line):
    """Is the given line an event line?"""
    return line.startswith("event:")

In [ ]:
def _is_data(line):
    """Is the given line a data line?"""
    return line.startswith("data:")

In [ ]:
def _process_event(line):
    """Process the given event line."""
    _, event = line.split(":", 1)
    event = event.strip()
    logging.debug(event)
    return None if event == "keep-alive" else event

In [ ]:
def _process_data(line):
    """Process the given data line."""
    _, data_str = line.split(":", 1)
    return json.loads(data_str.strip())

In [ ]:
def _process(line):
    """Process one stream line: log event lines, parse data lines.

    Returns the parsed JSON object for data lines, otherwise None.
    The event name from _process_event is intentionally discarded here;
    events are only logged.
    """
    if _is_event(line):
        _process_event(line)
    if _is_data(line):
        return _process_data(line)
    return None

InfluxDB

Install and start the database application:

brew update
brew install influxdb
influxdb -config=/usr/local/etc/influxdb.conf

Install the InfluxDB package if necessary:

pip install influxdb

In [ ]:
from influxdb import client as influxdb

Connect to the test database with the default parameters:


In [ ]:
# Connect to the "test" database using the influxdb client's default
# connection parameters (see the client library for the defaults).
db = influxdb.InfluxDBClient(database="test")

An example query to test connection:


In [ ]:
#db.query("select * from /.*/ limit 1;")

Helper functions to translate Nest API data structures into InfluxDB data structures:


In [ ]:
def _get_thermostats(data):
    """Get thermostat data from the given data dict."""
    thermostats = data["data"]["devices"]["thermostats"]
    return [thermostats[t] for t in thermostats]

In [ ]:
def _get_structures(data):
    """Get structure data from the given data dict."""
    structures = data["data"]["structures"]
    return [structures[s] for s in structures]

In [ ]:
def _get_thermostat_data(data):
    """Build the InfluxDB series list for the "thermostats" series.

    Columns are taken from the first thermostat's keys; assumes every
    thermostat dict shares the same keys -- TODO confirm against the API.
    """
    thermostats = _get_thermostats(data)
    series = {
        "name": "thermostats",
        "columns": thermostats[0].keys(),
        "points": [t.values() for t in thermostats],
    }
    return [series]

I do not know how to support writing non-primitive data to InfluxDB; it may not even make sense. Since the thermostats attribute is a list, it is removed from the structure data for now.


In [ ]:
def _get_structure_data(data):
    """Build the InfluxDB series list for the "structures" series.

    The "thermostats" attribute is a list, and it is unclear how to write
    non-primitive values to InfluxDB, so it is dropped from each structure.
    Unlike the original implementation (which used ``del`` on the incoming
    dicts), each structure is shallow-copied first so the caller's data is
    not mutated.

    Columns are taken from the first structure's keys; assumes every
    structure dict shares the same keys -- TODO confirm against the API.
    """
    name = "structures"
    structures = []
    for s in _get_structures(data):
        s = dict(s)  # shallow copy: do not mutate the caller's dict
        s.pop("thermostats", None)  # list-valued; cannot be stored as a point
        structures.append(s)
    columns = structures[0].keys()
    points = [s.values() for s in structures]
    return [{"name": name, "columns": columns, "points": points}]

Stream


In [ ]:
def stream():
    """Stream results from the API and store them in the database."""
    # Note: the local name is "response", not "stream", to avoid shadowing
    # this function's own name inside its body.
    with closing(get_stream()) as response:
        logging.debug("[%d] Streaming! %s" % (response.status_code, response.url))
        for line in response.iter_lines():
            if not line:
                continue  # skip the blank separator lines between SSE messages
            value = _process(line)
            if value:
                logging.info(value)
                db.write_points(_get_structure_data(value))
                db.write_points(_get_thermostat_data(value))
        logging.debug("[%d] No more lines!" % response.status_code)
    logging.debug("Done streaming!")

In [ ]:
from requests.exceptions import HTTPError, StreamConsumedError, Timeout

In [ ]:
import sys

FOREVER!


In [ ]:
# Stream forever, reconnecting after recoverable errors.  Only a keyboard
# interrupt exits; every other exception is logged and the stream is
# reopened on the next iteration.
while True:
    try:
        stream()
    except KeyboardInterrupt:
        logging.warn("Keyboard interrupt.")  
        sys.exit()
    except StreamConsumedError as e:
        # The response was fully consumed; log and reconnect.
        logging.warn("Stream consumed! %s" % e)
    except HTTPError as e:
        logging.error("HTTPError! %s" % e)
    except Timeout as e:
        logging.error("Timeout! %s" % e)
    except Exception as e:
        # Deliberate catch-all so an unexpected error cannot kill the loop.
        logging.critical("Unexpected error! %s" % e)