First, get an access token and set it as the value of DEN_ACCESS_TOKEN
in the environment before starting this IPython Notebook.
In [ ]:
import os
In [ ]:
access_token = os.environ["DEN_ACCESS_TOKEN"]
In [ ]:
API_PROTOCOL = "https"
API_LOCATION = "developer-api.nest.com"
In [ ]:
from urlparse import SplitResult, urlunsplit
from urllib import urlencode
In [ ]:
def get_api_url(path=""):
query = urlencode({"auth": access_token})
split = SplitResult(scheme=API_PROTOCOL, netloc=API_LOCATION, path=path, query=query, fragment="")
return urlunsplit(split)
In [ ]:
import requests
The Nest REST Streaming API makes it easy to get real time data without worrying about rate limiting.
In [ ]:
import json
In [ ]:
def get_stream(path=""):
"""Make a GET to the stream API and return the response object."""
r = requests.get(get_api_url(path), headers={"Accept": "text/event-stream"}, stream=True)
for h in r.history:
logging.debug("[%d] Redirect: %s" % (h.status_code, h.url))
logging.debug("[%d] URL: %s" % (r.status_code, r.url))
r.raise_for_status()
return r
In [ ]:
from contextlib import closing
In [ ]:
import logging
Need to reload
the logging
module because IPython Notebook has already loaded it.
In [ ]:
reload(logging)
In [ ]:
logging.basicConfig(filename="den.log", level=logging.DEBUG, format="%(asctime)s %(levelname)s %(message)s")
In [ ]:
logging.debug("Cool!")
In [ ]:
def _is_event(line):
"""Is the given line an event line?"""
return line.startswith("event:")
In [ ]:
def _is_data(line):
"""Is the given line a data line?"""
return line.startswith("data:")
In [ ]:
def _process_event(line):
"""Process the given event line."""
_, event = line.split(":", 1)
event = event.strip()
logging.debug(event)
return None if event == "keep-alive" else event
In [ ]:
def _process_data(line):
"""Process the given data line."""
_, data_str = line.split(":", 1)
return json.loads(data_str.strip())
In [ ]:
def _process(line):
"""Process the given line."""
if _is_event(line):
_process_event(line)
return _process_data(line) if _is_data(line) else None
brew update
brew install influxdb
influxdb -config=/usr/local/etc/influxdb.conf
Install the InfluxDB package if necessary:
pip install influxdb
In [ ]:
from influxdb import client as influxdb
Connect to the test
database with the default parameters:
In [ ]:
db = influxdb.InfluxDBClient(database="test")
An example query to test connection:
In [ ]:
#db.query("select * from /.*/ limit 1;")
Helper functions to translate Nest API datastructures into InfluxDB data structures:
In [ ]:
def _get_thermostats(data):
"""Get thermostat data from the given data dict."""
thermostats = data["data"]["devices"]["thermostats"]
return [thermostats[t] for t in thermostats]
In [ ]:
def _get_structures(data):
"""Get structure data from the given data dict."""
structures = data["data"]["structures"]
return [structures[s] for s in structures]
In [ ]:
def _get_thermostat_data(data):
"""Get thermostat data to write to InfluxDB."""
name = "thermostats"
thermostats = _get_thermostats(data)
columns = thermostats[0].keys()
points = [t.values() for t in thermostats]
return [{"name": name, "columns": columns, "points": points}]
I do not know how to support writing non-primitive data to InfluxDB. It may not even make sense. Since the thermostats
attribute is a list
it is removed from the structure data for now.
In [ ]:
def _get_structure_data(data):
"""Get structure data to write to InfluxDB."""
name = "structures"
structures = _get_structures(data)
points = []
for s in structures:
try:
del s["thermostats"]
except KeyError:
pass
points.append(s.values())
columns = structures[0].keys()
return [{"name": name, "columns": columns, "points": points}]
In [ ]:
def stream():
"""Stream results from the API and store them in the database."""
with closing(get_stream()) as stream:
logging.debug("[%d] Streaming! %s" % (stream.status_code, stream.url))
for l in stream.iter_lines():
if l:
value = _process(l)
if value:
logging.info(value)
db.write_points(_get_structure_data(value))
db.write_points(_get_thermostat_data(value))
logging.debug("[%d] No more lines!" % stream.status_code)
logging.debug("Done streaming!")
In [ ]:
from requests.exceptions import HTTPError, StreamConsumedError, Timeout
In [ ]:
import sys
In [ ]:
while True:
try:
stream()
except KeyboardInterrupt:
logging.warn("Keyboard interrupt.")
sys.exit()
except StreamConsumedError as e:
logging.warn("Stream consumed! %s" % e)
except HTTPError as e:
logging.error("HTTPError! %s" % e)
except Timeout as e:
logging.error("Timeout! %s" % e)
except Exception as e:
logging.critical("Unexpected error! %s" % e)