Jeremiah Malina
Slides and code can be found at http://github.com/jjmalina/pygotham-2015
1 - Real-Time Analytics
2 - Dive into InfluxDB
3 - Building our Real-Time Analytics Service
1 - Write down every visit to your site with the referral campaign ID into a database, and then run a query to get a metric
2 - Feed the visit events as a stream into a process that computes the metrics in real-time (both approaches are sketched below)
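As a toy illustration of the trade-off (not from the talk): the first approach stores every event and queries it after the fact, while the second keeps a running aggregate as each event arrives. The in-memory database and the sample events here are hypothetical.
In [ ]:
import sqlite3
from collections import Counter

# Hypothetical visit events: (unix timestamp, campaign id)
visits = [(1422568543.7, "12345"), (1422568544.1, "12345"), (1422568545.9, "67890")]

# Approach 1: persist every visit, then run a query to get the metric
db = sqlite3.connect(":memory:")
db.execute("CREATE TABLE visits (ts REAL, campaign_id TEXT)")
db.executemany("INSERT INTO visits VALUES (?, ?)", visits)
print(db.execute("SELECT campaign_id, COUNT(*) FROM visits GROUP BY campaign_id").fetchall())

# Approach 2: update the metric incrementally as each event streams in
counts = Counter()
for ts, campaign_id in visits:
    counts[campaign_id] += 1
print(counts)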
Invented by Nathan Marz, the author of Storm
The Lambda Architecture aims to satisfy the need for a robust system that is fault-tolerant against both hardware failures and human mistakes, able to serve a wide range of workloads and use cases, and in which low-latency reads and updates are required. The resulting system should be linearly scalable, and it should scale out rather than up.
1 - All data entering the system is dispatched to both the batch layer and the speed layer for processing.
2 - The batch layer has two functions: managing the master dataset (an immutable, append-only set of raw data) and precomputing the batch views.
3 - The serving layer indexes the batch views so that they can be queried in a low-latency, ad-hoc way.
4 - The speed layer compensates for the high latency of updates to the serving layer and deals with recent data only.
5 - Any incoming query can be answered by merging results from batch views and real-time views (a toy sketch of this merge follows the list)
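A toy sketch of that merge step, with made-up views and numbers: the batch view holds counts computed up to the last batch run, the real-time view holds counts for everything since, and a query simply combines the two.
In [ ]:
# Hypothetical precomputed views keyed by (day, campaign_id)
batch_view = {("2015-08-07", "12345"): 1042}     # produced by the batch layer
realtime_view = {("2015-08-07", "12345"): 17}    # maintained by the speed layer


def query(key):
    """Answer a query by merging the batch view with the real-time view."""
    return batch_view.get(key, 0) + realtime_view.get(key, 0)


print(query(("2015-08-07", "12345")))  # 1059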
Since the realtime layer only compensates for the last few hours of data, everything the realtime layer computes is eventually overridden by the batch layer. So if you make a mistake or something goes wrong in the realtime layer, the batch layer will correct it.
-- Nathan Marz, Big Data
The Log: What every software engineer should know about real-time data's unifying abstraction
There are just three abstractions in Storm: spouts, bolts, and topologies.
A spout is a source of streams in a computation.
A bolt processes any number of input streams and produces any number of new output streams.
A topology is a network of spouts and bolts, with each edge in the network representing a bolt subscribing to the output stream of some other spout or bolt.
In [ ]:
from datetime import datetime
from collections import Counter

from streamparse.bolt import BatchingBolt


class VisitCounterBolt(BatchingBolt):

    def initialize(self, conf, ctx):
        self.counts = Counter()

    def group_key(self, tup):
        """Groups user visit tuples by campaign and minute"""
        timestamp, campaign = tup.values[1].split('\t')
        minute = datetime.utcfromtimestamp(float(timestamp)).strftime("%Y-%m-%dT%H:%M")
        return (minute, campaign)

    def process_batch(self, key, tups):
        self.counts[key] += len(tups)
        # some downstream bolt would persist our counts
        self.emit([key, self.counts[key]])
Spark Streaming provides a high-level abstraction called discretized stream or DStream, which represents a continuous stream of data. DStreams can be created either from input data streams from sources such as Kafka, Flume, and Kinesis, or by applying high-level operations on other DStreams. Internally, a DStream is represented as a sequence of RDDs.
In [ ]:
from datetime import datetime

from pyspark import SparkContext
from pyspark.streaming import StreamingContext

# Create a local StreamingContext with two working threads and a batch interval of 1 second
sc = SparkContext("local[2]", "NetworkWordCount")
ssc = StreamingContext(sc, 1)

# Assume we have our visit events coming in from a socket
events = ssc.socketTextStream("localhost", 1337)


def process_event(data):
    """Parse a tab-separated visit event into a ((minute, campaign), 1) pair"""
    ts, campaign = data.split('\t')
    minute = datetime.utcfromtimestamp(float(ts)).strftime("%Y-%m-%dT%H:%M")
    return (minute, campaign), 1


def persist_count(key, count):
    """Take a per-minute count and persist it"""
    print(key, count)


# parse the data from the socket, group events by their key in a 60-second
# window, then count the events per key and persist the counts
events.map(process_event) \
      .groupByKeyAndWindow(windowDuration=60, slideDuration=60) \
      .foreachRDD(
          lambda rdd: rdd.foreach(
              lambda kv: persist_count(kv[0], sum(1 for _ in kv[1]))
          )
      )

ssc.start()             # Start the computation
ssc.awaitTermination()  # Wait for the computation to terminate
<measurement>[,<tag-key>=<tag-value>...] <field-key>=<field-value>[,<field2-key>=<field2-value>...] [unix-nano-timestamp]
Measurements are like SQL tables
Tags are indexes on data points
Fields are like columns, which aren't indexed
Timestamps can have arbitrary precision, but the default is nanoseconds
A series is defined as a combination of a measurement and set of tag key-values. Combined with fields (columns) and the fields’ values, these make up series data.
Let's say you write visits,url="/index.html",campaign_id="12345" user_id="7b5a96d3-a9d8-469a-91aa-a95f869d350c" 1422568543702900257
Then you'll end up with a new series,
with a data point where user_id="7b5a96d3-a9d8-469a-91aa-a95f869d350c" at time 1422568543702900257
SELECT count(distinct(user_id)) from visits where url='/index.html' and campaign_id='12345';
Since visits + url="/index.html" + campaign_id="12345" is its own series this query doesn't have to do a scan to find these values.
Summarize all visits under a campaign for all URLs: SELECT count(distinct(user_id)) from visits where campaign_id='12345';
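To make that concrete in Python, here is a minimal sketch using InfluxDB's HTTP API with the requests library; it assumes InfluxDB is listening on localhost:8086 and that the realtime_analytics database already exists.
In [ ]:
import requests

line = ('visits,url="/index.html",campaign_id="12345" '
        'user_id="7b5a96d3-a9d8-469a-91aa-a95f869d350c" 1422568543702900257')

# Write the point using the line protocol
requests.post("http://localhost:8086/write",
              params={"db": "realtime_analytics"}, data=line)

# Count the unique visitors for that URL and campaign
resp = requests.get("http://localhost:8086/query",
                    params={"db": "realtime_analytics",
                            "q": "SELECT count(distinct(user_id)) FROM visits "
                                 "WHERE url='/index.html' AND campaign_id='12345'"})
print(resp.json())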
1 - Create a new retention policy:
CREATE RETENTION POLICY historical_metrics ON realtime_analytics DURATION INF REPLICATION 1;
2 - Add a continuous query
CREATE CONTINUOUS QUERY historical_metrics_count_impressions_1d ON realtime_analytics BEGIN
SELECT COUNT(DISTINCT(user_id)) INTO realtime_analytics.historical_metrics.impressions_count_1d FROM visits GROUP BY time(1d), *
END
Now InfluxDB will run a query once a day that writes a downsampled series of unique visitor counts.
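If you prefer to set this up from code rather than from the influx shell, the same statements can be sent to the HTTP query endpoint. This is only a sketch and assumes a local server with the realtime_analytics database.
In [ ]:
import requests

statements = [
    "CREATE RETENTION POLICY historical_metrics ON realtime_analytics "
    "DURATION INF REPLICATION 1",

    "CREATE CONTINUOUS QUERY historical_metrics_count_impressions_1d "
    "ON realtime_analytics BEGIN "
    "SELECT COUNT(DISTINCT(user_id)) "
    "INTO realtime_analytics.historical_metrics.impressions_count_1d "
    "FROM visits GROUP BY time(1d), * END",
]

for q in statements:
    resp = requests.get("http://localhost:8086/query", params={"q": q})
    print(resp.status_code)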
In [ ]:
import asyncio
import json

import websockets

COINBASE_FEED_URL = "wss://ws-feed.exchange.coinbase.com"


@asyncio.coroutine
def coinbase_feed():
    websocket = yield from websockets.connect(COINBASE_FEED_URL)
    yield from websocket.send(json.dumps({
        "type": "subscribe",
        "product_id": "BTC-USD"
    }))
    while True:
        message = yield from websocket.recv()
        print(message)


def main():
    asyncio.get_event_loop().run_until_complete(coinbase_feed())


if __name__ == '__main__':
    main()
limit: have a price and size, and are filled at the price specified or better
market: have funds and/or a size, and are filled immediately
{
"sequence" : 171988199,
"size" : "0.1236",
"side" : "sell",
"maker_order_id" : "627708c9-ca29-4d14-b7b4-886ee282de52",
"time" : "2015-08-07T00:22:25.317866Z",
"trade_id" : 3101496,
"type" : "match",
"price" : "280.01",
"taker_order_id" : "8edc7b0e-dbd7-4cfc-9b5d-65b26b136c89",
"product_id" : "BTC-USD"
}
{
"sequence" : 171988199,
"price" : "280.01",
"type" : "match",
"taker_order_id" : "8edc7b0e-dbd7-4cfc-9b5d-65b26b136c89",
"maker_order_id" : "627708c9-ca29-4d14-b7b4-886ee282de52",
"trade_id" : 3101496,
"time" : "2015-08-07T00:22:25.317866Z",
"side" : "buy",
"size" : "0.1236",
"product_id" : "BTC-USD"
}
{
"time": "2015-08-06T23:28:29.815246Z",
"product_id": "BTC-USD",
"remaining_size": "0.438",
"order_id": "6b81fa22-bf1a-46e3-bed8-c989af7cbbd7",
"price": "279.91",
"type": "open",
"side": "sell",
"sequence": 171951255
}
Becomes:
orders,side="sell",type="open" price=279.91,size=0.438 1438903709000000000
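In code, that mapping might look like the sketch below; the helper name and the timestamp handling are our own choices.
In [ ]:
from datetime import datetime


def order_to_line(msg):
    """Turn an "open" order message into an orders point in the line protocol."""
    ts = datetime.strptime(msg["time"].rstrip("Z"), "%Y-%m-%dT%H:%M:%S.%f")
    nanos = int((ts - datetime(1970, 1, 1)).total_seconds()) * 10**9
    return 'orders,side="{side}",type="{type}" price={price},size={size} {ts}'.format(
        side=msg["side"], type=msg["type"],
        price=msg["price"], size=msg["remaining_size"], ts=nanos)


print(order_to_line({
    "time": "2015-08-06T23:28:29.815246Z", "type": "open", "side": "sell",
    "price": "279.91", "remaining_size": "0.438",
}))
# orders,side="sell",type="open" price=279.91,size=0.438 1438903709000000000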
{
"sequence" : 171988199,
"price" : "280.01",
"type" : "match",
"taker_order_id" : "8edc7b0e-dbd7-4cfc-9b5d-65b26b136c89",
"maker_order_id" : "627708c9-ca29-4d14-b7b4-886ee282de52",
"trade_id" : 3101496,
"time" : "2015-08-07T00:22:25.317866Z",
"side" : "sell",
"size" : "0.1236",
"product_id" : "BTC-USD"
}
Becomes:
trades,side="sell",type="uptick" price=280.01,size=0.1236,cost=34.609236 1438906945000000000
Since the side was "sell", we'll say the type is an uptick because the buyer met the seller's price.
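A similar sketch for match messages, applying the uptick/downtick rule above and computing the cost field; the helper name and the rounding of cost are our own choices.
In [ ]:
from datetime import datetime


def match_to_line(msg):
    """Turn a match message into a trades point, tagging the tick direction."""
    ts = datetime.strptime(msg["time"].rstrip("Z"), "%Y-%m-%dT%H:%M:%S.%f")
    nanos = int((ts - datetime(1970, 1, 1)).total_seconds()) * 10**9
    price, size = float(msg["price"]), float(msg["size"])
    # a sell-side match means the buyer met the seller's price: an uptick
    tick = "uptick" if msg["side"] == "sell" else "downtick"
    return 'trades,side="{side}",type="{tick}" price={price},size={size},cost={cost} {ts}'.format(
        side=msg["side"], tick=tick, price=msg["price"], size=msg["size"],
        cost=round(price * size, 6), ts=nanos)


print(match_to_line({
    "time": "2015-08-07T00:22:25.317866Z", "side": "sell",
    "price": "280.01", "size": "0.1236",
}))
# trades,side="sell",type="uptick" price=280.01,size=0.1236,cost=34.609236 1438906945000000000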