To run this and the following tutorials, you need access to a MongoDB server running on localhost, port 27017. The host and port of the MongoDB server can be changed in the configuration file hyperstream_config.json, located in the same folder as this notebook.
All the dependencies listed in the HyperStream requirements are also required. Installation instructions can be found at https://github.com/IRC-SPHERE/HyperStream
A MongoDB server, a Jupyter Notebook server and an MQTT server can be started together by running the script start_tutorial.sh.
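Before running the cells below, it can help to confirm that something is listening on the MongoDB port. The following is a minimal sketch using only the standard library; the helper name is_port_open is our own, and a successful connection only shows that some process accepts TCP connections on that port, not that it is MongoDB.

```python
import socket


def is_port_open(host="localhost", port=27017, timeout=1.0):
    """Return True if a TCP connection to host:port succeeds within `timeout` seconds.

    This does not verify that the listener is a MongoDB server.
    """
    try:
        sock = socket.create_connection((host, port), timeout=timeout)
    except (socket.error, socket.timeout):
        return False
    sock.close()
    return True


print(is_port_open("localhost", 27017))
```

If this prints False, start the server (for example with start_tutorial.sh) or update the host and port in hyperstream_config.json.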
In [1]:
%load_ext watermark
import sys
sys.path.append("../")  # Add the parent directory to the module search path
from hyperstream import HyperStream
from hyperstream import StreamId
from hyperstream import TimeInterval
from pytz import UTC
from datetime import datetime, timedelta
from utils import plot_high_chart
from utils import unix_time_miliseconds
%watermark -v -m -p hyperstream -g
In [2]:
hs = HyperStream(loglevel=0)
print(hs)
HyperStream comes with a set of predefined tools in hyperstream.tools. These tools can be used to define the nodes of a factor graph that produce values, or compute functions of the specified input nodes. In this tutorial we focus on the clock tool, which produces time ticks starting from an initial time and separated by a fixed stride, given in seconds. Here we generate one tick every hour.
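To make the idea of a stride concrete, here is a plain-Python sketch of what a clock produces. It is not HyperStream's implementation: the grid anchored at the Unix epoch, the half-open (start, end] convention, the use of naive UTC datetimes and the helper name clock_ticks are all assumptions made for illustration. Each tick's value is simply its own timestamp.

```python
from datetime import datetime, timedelta

# Assumption: ticks lie on a grid anchored at the Unix epoch (naive UTC datetimes)
EPOCH = datetime(1970, 1, 1)


def clock_ticks(start, end, stride_seconds, first=EPOCH):
    """Yield (timestamp, value) pairs at first + k * stride within (start, end].

    Illustrative sketch only; each tick's value is its own timestamp.
    """
    stride = timedelta(seconds=stride_seconds)
    # Number of whole strides between `first` and `start`
    k = int((start - first).total_seconds() // stride_seconds)
    tick = first + (k + 1) * stride  # first grid point strictly after start
    while tick <= end:
        yield tick, tick
        tick += stride


for timestamp, value in clock_ticks(datetime(2017, 1, 1, 0, 30),
                                    datetime(2017, 1, 1, 3, 30),
                                    stride_seconds=60 * 60):
    print('[%s]: %s' % (timestamp, value))
```

With an hourly stride, a 3-hour window starting at 00:30 yields ticks at 01:00, 02:00 and 03:00.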
In [3]:
clock_tool = hs.tools.clock(stride=1.0*60*60)  # one tick per hour; the stride is in seconds
In [4]:
M = hs.channel_manager.memory
# M = hs.channel_manager.mongo # To store the results in a MongoDB database
clock_stream = M.get_or_create_stream(stream_id=StreamId(name="clock_stream"))
In [5]:
yesterday_start = (datetime.utcnow() - timedelta(days=1)).replace(tzinfo=UTC)
yesterday_end = (yesterday_start + timedelta(hours=10)).replace(tzinfo=UTC)
ti = TimeInterval(yesterday_start, yesterday_end)
In [6]:
clock_tool.execute(sources=[], sink=clock_stream, interval=ti, alignment_stream=None)
print(clock_stream.calculated_intervals)
In [7]:
for timestamp, value in clock_stream.window().items():
    print('[%s]: %s' % (timestamp, value))
In [8]:
ti = TimeInterval(yesterday_start, (yesterday_start + timedelta(hours=5)).replace(tzinfo=UTC))
for timestamp, value in clock_stream.window(ti).items():
    print('[%s]: %s' % (timestamp, value))
We can execute the tool again with a different interval. Because this interval has not yet been computed for the stream, the tool computes the new data; had the interval already been computed, the tool would not perform any computation. As an example, we now execute the tool for the past 5 hours.
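The "compute only what is missing" behaviour described above can be sketched as an interval difference. This is an illustration under our own assumptions (intervals as plain (start, end) tuples, here in hours relative to now; the helper name missing_parts is hypothetical), not HyperStream's own interval bookkeeping.

```python
def missing_parts(requested, computed):
    """Return the pieces of `requested` not covered by any interval in `computed`.

    Intervals are (start, end) tuples; only the returned pieces would need computing.
    """
    start, end = requested
    gaps = []
    cursor = start  # the region before `cursor` needs no further work
    for c_start, c_end in sorted(computed):
        if c_end <= cursor:
            continue  # entirely before the region still to be checked
        if c_start >= end:
            break  # entirely after the requested interval
        if c_start > cursor:
            gaps.append((cursor, c_start))
        cursor = max(cursor, c_end)
        if cursor >= end:
            break
    if cursor < end:
        gaps.append((cursor, end))
    return gaps


computed = [(-24, -14)]  # yesterday's 10-hour run, in hours relative to now
print(missing_parts((-24, -14), computed))  # already computed: nothing to do
print(missing_parts((-5, 0), computed))     # the past 5 hours: all new
```

An empty result means the tool can skip the computation entirely.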
In [9]:
today_end = datetime.utcnow().replace(tzinfo=UTC)
today_start = (today_end - timedelta(hours=5)).replace(tzinfo=UTC)
ti = TimeInterval(today_start, today_end)
clock_tool.execute(sources=[], sink=clock_stream, interval=ti, alignment_stream=None)
print(clock_stream.calculated_intervals)
We can now see that the stream contains two separate computed time intervals.
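Why the two runs remain separate can be sketched with a simple interval merge: overlapping or touching intervals collapse into one, while disjoint intervals stay apart. That HyperStream merges adjacent intervals in this way is an assumption here, and merge_intervals is a hypothetical helper working on plain (start, end) tuples in hours relative to now.

```python
def merge_intervals(intervals):
    """Merge overlapping or touching (start, end) intervals; disjoint ones are kept apart."""
    merged = []
    for start, end in sorted(intervals):
        if merged and start <= merged[-1][1]:
            # Overlaps or touches the previous interval: extend it
            merged[-1] = (merged[-1][0], max(merged[-1][1], end))
        else:
            merged.append((start, end))
    return merged


# Yesterday's run (-24, -14) and today's run (-5, 0) do not touch
print(merge_intervals([(-5, 0), (-24, -14)]))
# A run starting exactly where another ends would be absorbed into it
print(merge_intervals([(-24, -14), (-14, -10)]))
```

The first call prints two intervals, mirroring the two computed intervals of the stream; the second prints a single interval.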
In [10]:
ti = TimeInterval(yesterday_start, today_end)
for timestamp, value in clock_stream.window(ti).items():
    print('[%s]: %s' % (timestamp, value))
For this and the following tutorials we use the JavaScript library Highcharts. We have created two Python functions: plot_high_chart, which plots a single time series, and plot_multiple_stock, which plots multiple time series.
We can then visualize the two intervals over which the tool has been executed.
Note that the time between the two intervals is missing from the graph: the chart jumps from one point to the next instead of following a linear time axis.
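Because the chart hides the missing interval, it can be useful to locate it in the data directly. The sketch below finds pairs of consecutive ticks that are more than one stride apart, and converts datetimes to milliseconds since the Unix epoch, the unit Highcharts uses for datetime axes. The helper names find_gaps and to_epoch_ms are hypothetical; to_epoch_ms only illustrates what a helper such as unix_time_miliseconds plausibly computes, and the real utils function may differ.

```python
from datetime import datetime, timedelta

EPOCH = datetime(1970, 1, 1)


def to_epoch_ms(dt):
    """Milliseconds since the Unix epoch; tz-aware datetimes are first converted to naive UTC."""
    if dt.tzinfo is not None:
        dt = (dt - dt.utcoffset()).replace(tzinfo=None)
    return (dt - EPOCH).total_seconds() * 1000.0


def find_gaps(timestamps, stride_seconds):
    """Return (before, after) pairs where consecutive ticks are more than one stride apart."""
    stride = timedelta(seconds=stride_seconds)
    ordered = sorted(timestamps)
    return [(a, b) for a, b in zip(ordered, ordered[1:]) if b - a > stride]


t0 = datetime(2017, 1, 1)
ticks = [t0 + timedelta(hours=h) for h in (0, 1, 2, 20, 21)]
print(find_gaps(ticks, stride_seconds=60 * 60))
```

With hourly ticks, the only reported gap is the one between 02:00 and 20:00.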
In [11]:
# Label each point with its timestamp and plot the clock value in milliseconds since the epoch
my_time, my_data = zip(*[(str(key), unix_time_miliseconds(value))
                         for key, value in clock_stream.window(ti).items()])
plot_high_chart(my_time, my_data, yax='milliseconds', title='Time from epoch', type='high_stock')