Building a Real-Time Analytics Service with InfluxDB

Jeremiah Malina

Slides and code can be found at http://github.com/jjmalina/pygotham-2015

Background

  • I'm a data engineer at ChatID

Our Product

  • Allows brands such as Samsung, HP, Seagate, etc. to plug their existing chat system (LivePerson, Olark, etc.) into the ChatID platform via XMPP
  • So that customers on retailers like Walmart.com, Sears.com, Newegg.com can chat with the folks at these brands
  • We collect a lot of data to measure the impact that our product has on ecommerce metrics like conversion rate, average order value, customer satisfaction

Overview of this talk

1 - Real-Time Analytics

  • Exploring a problem
  • Paradigms
  • Tools

2 - Dive into InfluxDB

  • Features
  • Modeling data
  • Issues to watch out for

3 - Building our Real-Time Analytics Service

  • live coding :)

Real-Time Analytics

If you manage a website you might find yourself asking

How many users came to my site from referral campaign Z in the last X minutes/hours/days/months?

Let's assume that we don't have Google Analytics or a similar tool

Some approaches to tackling this problem

1 - Write down every visit to your site with the referral campaign ID into a database, and then run a query to get a metric

2 - Feed the visit events as a stream into a process that computes the metrics in real-time
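
A minimal sketch of approach #1, using SQLite for brevity (the table name and helper functions are hypothetical; any SQL database works the same way):


In [ ]:
import sqlite3
import time

# One row per visit: approach #1 writes everything down first, queries later
conn = sqlite3.connect("visits.db")
conn.execute(
    "CREATE TABLE IF NOT EXISTS visits (ts REAL, campaign_id TEXT, user_id TEXT)"
)


def record_visit(campaign_id, user_id):
    """Write down a single visit as it happens"""
    conn.execute(
        "INSERT INTO visits VALUES (?, ?, ?)",
        (time.time(), campaign_id, user_id)
    )
    conn.commit()


def visits_in_last(campaign_id, minutes):
    """Run a query after the fact to get the metric"""
    cutoff = time.time() - minutes * 60
    (count,) = conn.execute(
        "SELECT COUNT(*) FROM visits WHERE campaign_id = ? AND ts >= ?",
        (campaign_id, cutoff)
    ).fetchone()
    return count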

Problems with #1

  • You have to scale your database to handle the write throughput
    • i.e. disk, memory, CPU
  • You need to index the referral campaign ID and probably other fields, which slows write throughput

Problems with #2

  • You need a counter for every combination of fields and time periods you want to index by (i.e. campaign)
  • Parallelizing the counting is a tricky problem
  • You need to persist the counter state somewhere
  • You need to build an API to fetch your counts and perhaps aggregate them
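
To see why, here's a minimal, purely in-memory sketch of approach #2 (hypothetical names): the counter keyed by (minute, campaign) lives only in this process, so persisting it, parallelizing it, and serving it are all left as exercises.


In [ ]:
from collections import Counter
from datetime import datetime

# one counter per (minute, campaign) combination, held only in this process
counts = Counter()


def handle_visit(timestamp, campaign_id):
    """Count a single visit event as it streams in"""
    minute = datetime.utcfromtimestamp(timestamp).strftime("%Y-%m-%dT%H:%M")
    counts[(minute, campaign_id)] += 1


def visits_for(campaign_id):
    """Aggregate the per-minute counters for one campaign"""
    return sum(n for (minute, c), n in counts.items() if c == campaign_id)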

Lambda Architecture

Invented by Nathan Marz, the author of Storm

The LA aims to satisfy the needs for a robust system that is fault-tolerant, both against hardware failures and human mistakes, being able to serve a wide range of workloads and use cases, and in which low-latency reads and updates are required. The resulting system should be linearly scalable, and it should scale out rather than up.

http://lambda-architecture.net/

1 - All data entering the system is dispatched to both the batch layer and the speed layer for processing.

2 - The batch layer has two functions:

  • managing the master dataset (an immutable, append-only set of raw data),
  • to pre-compute the batch views.

3 - The serving layer indexes the batch views so that they can be queried in low-latency, ad-hoc way.

4 - The speed layer compensates for the high latency of updates to the serving layer and deals with recent data only.

5 - Any incoming query can be answered by merging results from batch views and real-time view
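
Component #5 is just a merge at query time. A minimal sketch, assuming hypothetical batch views keyed by (day, campaign) and a real-time view keyed by (minute, campaign):


In [ ]:
def query_campaign_visits(campaign_id, batch_views, realtime_view):
    """Answer a query by merging the pre-computed batch views with the
    real-time view, which covers only the most recent, not-yet-batched data.

    batch_views:   dict of (day, campaign_id) -> count, built by the batch layer
    realtime_view: dict of (minute, campaign_id) -> count, built by the speed layer
    """
    batch_total = sum(
        n for (day, c), n in batch_views.items() if c == campaign_id
    )
    recent_total = sum(
        n for (minute, c), n in realtime_view.items() if c == campaign_id
    )
    return batch_total + recent_total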

That's a lot of components

There's this idea that the real-time layer is more prone to failure

Reasons

  • The computations happen in memory
  • Anything that leads to a crash will erase the state of the real-time system
    • Bad deploy
    • Out of memory
  • Delayed upstream data will give you inaccurate metrics

The batch layer is there to correct this

Since the realtime layer only compensates for the last few hours of data, everything the realtime layer computes is eventually overridden by the batch layer. So if you make a mistake or something goes wrong in the realtime layer, the batch layer will correct it.

-- Nathan Marz, Big Data

But component #1 of the Lambda Architecture doesn't really get addressed


Lambda Architecture Stack

1. Unified Log

  • Apache Kafka

2. Batch Layer

  • Hadoop for storage
  • MapReduce or Spark for computation

3. Serving layer

  • Cassandra, or any other KV, SQL-like datastore

4. Real-time layer

  • Apache Storm or Spark Streaming

Unfortunately these all run on the JVM, and Python support isn't the best

Real-Time Computation Systems

Apache Storm

Storm makes it easy to reliably process unbounded streams of data, doing for realtime processing what Hadoop did for batch processing.

https://storm.apache.org

There are just three abstractions in Storm: spouts, bolts, and topologies.

A spout is a source of streams in a computation.

A bolt processes any number of input streams and produces any number of new output streams.

A topology is a network of spouts and bolts, with each edge in the network representing a bolt subscribing to the output stream of some other spout or bolt.


In [ ]:
from datetime import datetime
from collections import Counter

from streamparse.bolt import BatchingBolt


class VisitCounterBolt(BatchingBolt):
    def initialize(self, conf, ctx):
        # running counts keyed by (minute, campaign)
        self.counts = Counter()

    def group_key(self, tup):
        """Groups user visit tuples by campaign and minute"""
        timestamp, campaign = tup.values[1].split('\t')
        minute = datetime.utcfromtimestamp(float(timestamp)).strftime("%Y-%m-%dT%H:%M")
        return (minute, campaign)

    def process_batch(self, key, tups):
        self.counts[key] += len(tups)

        # some downstream bolt would persist our counts
        self.emit([key, self.counts[key]])

Storm Summary

The Good

  • Simple but flexible programming model
    • Thank you Parse.ly!
  • Scalable and fault tolerant
  • Supports many inputs
  • Very stable and mature

The Bad

  • No built-in libraries or APIs for analytical computation
    • i.e. grouping, counting, distinct counting
  • Still need another service to store your metrics
  • In streamparse you have to set up topologies with Clojure
  • "Easy to deploy" despite needing a ZooKeeper cluster
  • "Productionizing Storm is Difficult"
    • tl;dr: they wanted to consume from a different Kafka node, had to restart the whole topology, and there is no graceful transition

Spark Streaming

Spark Streaming makes it easy to build scalable fault-tolerant streaming applications.

http://spark.apache.org/streaming/

Spark Streaming provides a high-level abstraction called discretized stream or DStream, which represents a continuous stream of data. DStreams can be created either from input data streams from sources such as Kafka, Flume, and Kinesis, or by applying high-level operations on other DStreams. Internally, a DStream is represented as a sequence of RDDs.


In [ ]:
from datetime import datetime
from pyspark import SparkContext
from pyspark.streaming import StreamingContext

# Create a local StreamingContext with two worker threads and a batch interval of 1 second
sc = SparkContext("local[2]", "NetworkWordCount")
ssc = StreamingContext(sc, 1)

# Assume we have our visit events coming in from a socket
events = ssc.socketTextStream("localhost", 1337)

def process_event(data):
    """Parse a visit event into a ((minute, campaign), 1) pair"""
    ts, campaign = data.split('\t')
    minute = datetime.utcfromtimestamp(float(ts)).strftime("%Y-%m-%dT%H:%M")
    return (minute, campaign), 1

def persist_count(pair):
    """Take a ((minute, campaign), count) pair and persist it
    """
    key, count = pair
    print(key, count)


(events
    # parse the data from the socket
    .map(process_event)
    # group events by their key every 60 seconds in a window
    .groupByKeyAndWindow(windowDuration=60, slideDuration=60)
    # count the number of events per key, then persist them
    .foreachRDD(
        lambda rdd: rdd.mapValues(lambda visits: len(list(visits)))
                       .foreach(persist_count)
    ))

ssc.start()             # Start the computation
ssc.awaitTermination()  # Wait for the computation to terminate

The Good

  • Python works out of the box
  • Programming model is simpler than Storm's
    • no topology, just chain together your functions
  • Built-in APIs for analytical computations, i.e. count, group by, reduce
  • Fault tolerant and scalable

The Bad

  • Python doesn't have access to all APIs
    • Only sockets, files, or Kafka are supported inputs
  • The pyspark library isn't available as a package on PyPI, so you have to separate it from code you want to test outside Spark
  • Data is processed in a batch interval, so there's more latency than Storm
  • Deploying is simpler than with Storm, but it still might require ZooKeeper, or you can use YARN or Apache Mesos

Back to our problem

  1. Write down every visit to your site with the referral campaign ID into a database, and then run a query to get a metric
  2. Feed the visit events as a stream into a process that computes the metrics in real-time

Option 1 seems simpler

Let's say you just want real-time metrics for the past 24 hours

Why not just go for PostgreSQL and intelligently index?

  • Shard data into tables by day and periodically drop the old tables

InfluxDB actually works like this!

InfluxDB

InfluxDB is a time series, metrics, and analytics database. It’s written in Go and has no external dependencies. That means once you install it there’s nothing else to manage (such as Redis, ZooKeeper, Cassandra, HBase, or anything else).

https://influxdb.com/docs/v0.9/introduction/overview.html

Installation is simple

On a Mac:

$ brew update && brew install influxdb
$ influxd

On Ubuntu or Debian Linux:

$ wget http://influxdb.s3.amazonaws.com/influxdb_0.9.2_amd64.deb
$ sudo dpkg -i influxdb_0.9.2_amd64.deb
$ sudo /etc/init.d/influxdb start

Writing data

curl -G http://localhost:8086/query --data-urlencode "q=CREATE DATABASE realtime_analytics"
curl -i -XPOST 'http://localhost:8086/write?db=realtime_analytics' --data-binary 'visits,url="/index.html",campaign_id="12345" user_id="7b5a96d3-a9d8-469a-91aa-a95f869d350c" 1439258675695870000'

Line Protocol

<measurement>[,<tag-key>=<tag-value>...] <field-key>=<field-value>[,<field2-key>=<field2-value>...] [unix-nano-timestamp]

  • Measurements are like SQL tables
  • Tags are indexes on data points
  • Fields are like columns, which aren't indexed
  • Timestamps can be arbitrary precision but the default is nanoseconds
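
A minimal sketch of building one of these points from Python and posting it to the /write endpoint used above (requests assumed installed; the quoting style mirrors the curl example, and the helper names are hypothetical):


In [ ]:
import time
import requests

INFLUX_WRITE_URL = "http://localhost:8086/write"


def visit_line(url, campaign_id, user_id, timestamp=None):
    """Format one visit as a line protocol point, mirroring the example above"""
    nanos = int((timestamp or time.time()) * 1e9)  # default precision is nanoseconds
    return 'visits,url="{0}",campaign_id="{1}" user_id="{2}" {3}'.format(
        url, campaign_id, user_id, nanos
    )


def write_point(line, db="realtime_analytics"):
    """POST a single point to the /write endpoint"""
    resp = requests.post(INFLUX_WRITE_URL, params={"db": db}, data=line)
    resp.raise_for_status()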

Series and Tags

A series is defined as a combination of a measurement and set of tag key-values. Combined with fields (columns) and the fields’ values, these make up series data.

Let's say you write visits,url="/index.html",campaign_id="12345" user_id="7b5a96d3-a9d8-469a-91aa-a95f869d350c" 1422568543702900257

Then you'll end up with four series

  1. visits
  2. visits + url="/index.html"
  3. visits + campaign_id="12345"
  4. visits + url="/index.html" + campaign_id="12345"

Each with a data point where user_id="7b5a96d3-a9d8-469a-91aa-a95f869d350c" at time 1422568543702900257

Be aware of the cardinality of your series

InfluxDB keeps the full set of series in memory across the cluster

Querying

SELECT count(distinct(user_id)) from visits where url='/index.html' and campaign_id='12345';

Since visits + url="/index.html" + campaign_id="12345" is its own series this query doesn't have to do a scan to find these values.

Summarize all visits under a campaign for all URLs: SELECT count(distinct(user_id)) from visits where campaign_id='12345';
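
The same queries can be issued from Python over the HTTP API; a minimal sketch against the /query endpoint from the earlier curl example (the function name is hypothetical):


In [ ]:
import requests

INFLUX_QUERY_URL = "http://localhost:8086/query"


def distinct_visitors(campaign_id, db="realtime_analytics"):
    """Count distinct user_ids for a campaign via the HTTP query API"""
    query = (
        "SELECT count(distinct(user_id)) FROM visits "
        "WHERE campaign_id = '{0}'".format(campaign_id)
    )
    resp = requests.get(INFLUX_QUERY_URL, params={"db": db, "q": query})
    resp.raise_for_status()
    return resp.json()["results"]  # InfluxDB wraps query output in a "results" list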

How do we handle an influx of data?

Batch your events into fewer HTTP requests
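
A minimal sketch of that batching: the /write endpoint accepts multiple newline-separated points in one request body, so buffer events and flush them in a single POST (the buffer size is a hypothetical starting point):


In [ ]:
import requests

INFLUX_WRITE_URL = "http://localhost:8086/write"
BATCH_SIZE = 500  # hypothetical; tune for your write volume

pending = []


def record(line):
    """Buffer a line protocol point and flush once the batch is full"""
    pending.append(line)
    if len(pending) >= BATCH_SIZE:
        flush()


def flush(db="realtime_analytics"):
    """Send every buffered point in a single newline-separated request"""
    if not pending:
        return
    body = "\n".join(pending)
    requests.post(INFLUX_WRITE_URL, params={"db": db}, data=body).raise_for_status()
    del pending[:]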

Retention Policies

ALTER RETENTION POLICY default ON realtime_analytics DURATION 1d DEFAULT;

  • Now this is the default retention policy, and InfluxDB will automatically drop data in database realtime_analytics older than one day
  • Data is removed efficiently because it is sharded by time
    • So dropping old data doesn't require a delete for every point that was written

Wait but I still want my historical metrics!

Continuous Queries

1 - Create a new retention policy:

CREATE RETENTION POLICY historical_metrics ON realtime_analytics DURATION INF REPLICATION 1;

2 - Add a continuous query

CREATE CONTINUOUS QUERY historical_metrics_count_impressions_1d ON realtime_analytics BEGIN
  SELECT COUNT(DISTINCT(user_id)) INTO realtime_analytics.historical_metrics.impressions_count_1d FROM visits GROUP BY time(1d), *
END

Now InfluxDB will run the query once a day and write a downsampled series of unique visitor counts into the historical_metrics retention policy.

We have a mini Lambda Architecture!

Caveats of Continuous Queries and Retention Policies

  1. The docs aren't the clearest here
  2. There is a better explanation on the mailing list
  3. You can't backfill historical data yet. See issue 211
  4. If you need to window your daily counts by different timezones then it's better to sample your metrics by hour
    • but that breaks the accuracy of distinct counts

InfluxDB Summary

The Good

  • Easy to deploy
  • Simple HTTP API
  • Writes are fast and can be batched
  • Data is indexed by tags and time
  • Measurement and tag discovery API which is great for dashboards
  • Efficient deletes, retention policies and downsampling continuous queries
  • SQL
  • Visualization with Grafana or Chronograf

The Bad

  • Some issues with queries, i.e. can't have expressions inside aggregation functions
  • Continuous queries and retention policies have some issues and limitations
    • expensive CQs can cause writes to time out
  • Clustering has limitations (3 node max currently)
  • Time series data is a hard problem for distributed systems
    • From version 0.8.x to 0.9.x, InfluxDB was basically rewritten and uses a different storage engine

Our Real-Time Analytics Service

A fun example is the Coinbase order & trades feed

https://docs.exchange.coinbase.com/?python#websocket-feed

Consuming it


In [ ]:
import asyncio
import json
import websockets

COINBASE_FEED_URL = "wss://ws-feed.exchange.coinbase.com"


@asyncio.coroutine
def coinbase_feed():
    # connect and subscribe to the BTC-USD product feed
    websocket = yield from websockets.connect(COINBASE_FEED_URL)
    yield from websocket.send(json.dumps({
        "type": "subscribe",
        "product_id": "BTC-USD"
    }))
    # print every order/trade message as it arrives
    while True:
        message = yield from websocket.recv()
        print(message)


def main():
    asyncio.get_event_loop().run_until_complete(coinbase_feed())


if __name__ == '__main__':
    main()

There are two types of orders

limit: have a price and size, and are filled at the price specified or better

market: have funds and/or size, and are fulfilled immediately

Both types have a side which is either buy or sell

Market orders appear to happen infrequently so we're going to ignore them for this project

Limit orders have types

  • received: the order has been received by Coinbase
  • open: the order is active in the order book
  • done: the order is no longer active, because it was either filled partially/fully or canceled
  • match: there was a match between a buy order and sell order which resulted in a trade
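
To make the routing concrete, here's a minimal dispatch sketch over the fields shown above; handle_order and handle_trade are hypothetical placeholders, and market orders are skipped as discussed.


In [ ]:
import json


def handle_message(raw):
    """Route a Coinbase feed message by its type, ignoring market orders"""
    msg = json.loads(raw)
    if msg["type"] == "match":
        handle_trade(msg)
    elif msg.get("order_type") == "market":
        return  # market orders are infrequent; we ignore them for this project
    elif msg["type"] in ("received", "open", "done"):
        handle_order(msg)


def handle_order(msg):
    print("order", msg["type"], msg["side"], msg.get("price"))


def handle_trade(msg):
    print("trade", msg["side"], msg["price"], msg["size"])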

Received order

{
  "funds": null,
  "time": "2015-08-06T23:28:29.815115Z",
  "order_type": "limit",
  "order_id": "6b81fa22-bf1a-46e3-bed8-c989af7cbbd7",
  "size": "0.438",
  "product_id": "BTC-USD",
  "price": "279.91",
  "type": "received",
  "side": "sell",
  "sequence": 171951254
}

Open order

{
  "time": "2015-08-06T23:28:29.815246Z",
  "product_id": "BTC-USD",
  "remaining_size": "0.438",
  "order_id": "6b81fa22-bf1a-46e3-bed8-c989af7cbbd7",
  "price": "279.91",
  "type": "open",
  "side": "sell",
  "sequence": 171951255
}

Done order

{
   "order_type" : "limit",
   "sequence" : 171991560,
   "time" : "2015-08-07T00:31:03.487784Z",
   "product_id" : "BTC-USD",
   "type" : "done",
   "reason" : "canceled",
   "side" : "buy",
   "remaining_size" : "8.29",
   "order_id" : "18d4b572-2d07-48b4-b086-d119d37585c8",
   "price" : "279.52"
}

Match orders

{
   "sequence" : 171988199,
   "size" : "0.1236",
   "side" : "sell",
   "maker_order_id" : "627708c9-ca29-4d14-b7b4-886ee282de52",
   "time" : "2015-08-07T00:22:25.317866Z",
   "trade_id" : 3101496,
   "type" : "match",
   "price" : "280.01",
   "taker_order_id" : "8edc7b0e-dbd7-4cfc-9b5d-65b26b136c89",
   "product_id" : "BTC-USD"
}
{
   "sequence" : 171988199,
   "price" : "280.01",
   "type" : "match",
   "taker_order_id" : "8edc7b0e-dbd7-4cfc-9b5d-65b26b136c89",
   "maker_order_id" : "627708c9-ca29-4d14-b7b4-886ee282de52",
   "trade_id" : 3101496,
   "time" : "2015-08-07T00:22:25.317866Z",
   "side" : "buy",
   "size" : "0.1236",
   "product_id" : "BTC-USD"
}

Real-time Coinbase metrics we're interested in

  • Average order buy/sell price
  • Average order buy/sell size
  • Average trade price
  • Average trade size
  • Volume: the amount of BTC that's been traded
  • Total $ traded
  • High buy/sell price
  • Low buy/sell price
  • Average spread - the difference between the average buy and average sell price
    • the smaller the spread the more demand for BTC

Modeling orders in InfluxDB

{
  "time": "2015-08-06T23:28:29.815246Z",
  "product_id": "BTC-USD",
  "remaining_size": "0.438",
  "order_id": "6b81fa22-bf1a-46e3-bed8-c989af7cbbd7",
  "price": "279.91",
  "type": "open",
  "side": "sell",
  "sequence": 171951255
}

Becomes:

orders,side="sell",type="open" price=279.91,size=0.438 1438903709000000000
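
A minimal sketch of that conversion (the helper name is hypothetical, and the quoting style mirrors the examples above):


In [ ]:
from datetime import datetime

EPOCH = datetime(1970, 1, 1)


def order_to_line(msg):
    """Convert a received/open limit order message into an orders point"""
    ts = datetime.strptime(msg["time"], "%Y-%m-%dT%H:%M:%S.%fZ")
    nanos = int((ts - EPOCH).total_seconds()) * 10**9  # second resolution, as above
    size = msg.get("size") or msg.get("remaining_size")
    return 'orders,side="{0}",type="{1}" price={2},size={3} {4}'.format(
        msg["side"], msg["type"], msg["price"], size, nanos
    )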

Modeling trades in InfluxDB

{
   "sequence" : 171988199,
   "price" : "280.01",
   "type" : "match",
   "taker_order_id" : "8edc7b0e-dbd7-4cfc-9b5d-65b26b136c89",
   "maker_order_id" : "627708c9-ca29-4d14-b7b4-886ee282de52",
   "trade_id" : 3101496,
   "time" : "2015-08-07T00:22:25.317866Z",
   "side" : "sell",
   "size" : "0.1236",
   "product_id" : "BTC-USD"
}

Becomes:

trades,side="sell",type="uptick" price=280.01,size=0.1236,cost=34.609236 1438906945000000000

Since the side was "sell", we'll say the type is an uptick because the buyer met the seller's price.
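
And a matching sketch for trades; the helper name is hypothetical, cost is just price times size, and we assume the buy side maps to a downtick:


In [ ]:
from datetime import datetime

EPOCH = datetime(1970, 1, 1)


def trade_to_line(msg):
    """Convert a match message into a trades point"""
    ts = datetime.strptime(msg["time"], "%Y-%m-%dT%H:%M:%S.%fZ")
    nanos = int((ts - EPOCH).total_seconds()) * 10**9
    price, size = float(msg["price"]), float(msg["size"])
    # side == "sell" means the buyer met the seller's price: an uptick
    tick = "uptick" if msg["side"] == "sell" else "downtick"
    return 'trades,side="{0}",type="{1}" price={2},size={3},cost={4} {5}'.format(
        msg["side"], tick, price, size, round(price * size, 8), nanos
    )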

OK let's write some data

Thank You

  • PyGotham
  • ChatID
  • InfluxDB team & contributors
  • Everyone on the next slide :)

Works Cited

Questions? Comments?

Ask away!