HyperStream ships with a set of predefined tools in hyperstream.tools. However, it is also possible to define your own tools and factors. This tutorial shows how to create a simple plugin that reads a CSV file. The tool and all the configuration it needs have already been created; we describe how this was done and how you can create a new one.
First, we create a folder for the new tool. It must live inside the plugins folder; in this example it is plugins/example/tools/csv_reader/. Every folder along that path needs an __init__.py file so that Python treats it as a package, as in the following layout:
plugins/
    |- __init__.py
    |- one_plugin/
    |   |- __init__.py
    |   |- tools/
    |       |- __init__.py
    |       |- tool_a/
    |           |- __init__.py
    |           |- 2017-06-20_v0.0.1.py
    |           |- 2017-06-22_v0.0.3.py
    |- another_plugin/
        |- __init__.py
        |- tools/
            |- tool_b/
            |   |- __init__.py
            |   |- 2017-06-20_v0.0.1.py
            |   |- 2017-06-22_v0.1.0.py
            |- tool_c/
                |- __init__.py
                |- 2017-06-20_v0.0.2.py
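The layout above can be created by hand, but a small script makes it harder to forget an __init__.py. The following is a minimal sketch, not part of HyperStream: the helper name scaffold_tool is hypothetical, and it writes into a temporary directory so that it is safe to run anywhere.

```python
import os
import tempfile


def scaffold_tool(root, plugin, tool, version_file):
    """Create root/plugins/<plugin>/tools/<tool>/ with an __init__.py at each level.

    Hypothetical helper for illustration only; HyperStream does not provide it.
    """
    current = root
    for part in ["plugins", plugin, "tools", tool]:
        current = os.path.join(current, part)
        if not os.path.isdir(current):
            os.makedirs(current)
        # An empty __init__.py is enough to mark the folder as a package.
        open(os.path.join(current, "__init__.py"), "a").close()
    # Empty placeholder for the versioned tool file; the tool class goes here.
    open(os.path.join(current, version_file), "a").close()
    return current


# Work in a throwaway directory so that no real project is touched.
demo_root = tempfile.mkdtemp()
created = scaffold_tool(demo_root, "example", "csv_reader", "2017-06-20_v0.0.1.py")
print(sorted(os.listdir(created)))
```

In a real project, root would be the directory that contains (or should contain) the plugins folder.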
Then, we need to create a new Python file following the naming convention <year>-<month>-<day>_v<version>.<subversion>.<subsubversion>.py. In this example, the file is ./plugins/example/tools/csv_reader/2017-06-20_v0.0.1.py and has the following content:
from hyperstream import Tool, StreamInstance
from hyperstream.utils import check_input_stream_count
from dateutil.parser import parse


class CsvReader(Tool):
    def __init__(self, filename):
        super(CsvReader, self).__init__(filename=filename)

    @check_input_stream_count(0)
    def _execute(self, sources, alignment_stream, interval):
        # Let's make the assumption that the first field is the timestamp
        first = True
        with open(self.filename, 'rU') as f:
            for line in f.readlines():
                if first:
                    first = False
                    continue
                elements = line.split(',')
                dt = parse(elements[0])
                if dt in interval:
                    yield StreamInstance(dt, map(float, elements[1:]))
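To see what the loop in _execute produces without installing HyperStream, here is a stand-alone sketch of the same parsing logic. Several things in it are assumptions made for illustration: the function name read_rows is hypothetical, datetime.strptime replaces dateutil's parse, the sample rows are made up, and the comparison start < dt <= end is only a stand-in for dt in interval (the boundary behaviour of TimeInterval is not shown in this tutorial).

```python
from datetime import datetime

# Made-up sample in the layout the tool expects: a header row,
# then one timestamp followed by float values on every line.
SAMPLE = """date,col_a,col_b
1995-01-01,1.5,2.5
1995-02-01,1.0,3.0
1997-01-01,9.0,9.0
"""


def read_rows(text, start, end):
    """Yield (timestamp, [floats]) for rows whose timestamp lies in (start, end]."""
    lines = text.splitlines()
    for line in lines[1:]:  # skip the header row
        elements = line.split(',')
        dt = datetime.strptime(elements[0], "%Y-%m-%d")
        if start < dt <= end:  # assumed stand-in for `dt in interval`
            yield dt, [float(x) for x in elements[1:]]


rows = list(read_rows(SAMPLE, datetime(1994, 12, 31), datetime(1996, 1, 1)))
for dt, values in rows:
    print("%s %s" % (dt.date(), values))
```

Only the two 1995 rows fall inside the requested range; the 1997 row is filtered out, just as the tool skips instances outside the interval it is asked to compute.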
Now we need to register the plugin in hyperstream_config.json. In particular, the plugins section needs an entry with the keys channel_id_prefix, channel_names, path, has_tools and has_assets.
Here is an example of a configuration file that includes the new plugin:
{
    "mongo": {
        "host": "localhost",
        "port": 27017,
        "tz_aware": true,
        "db": "hyperstream"
    },
    "plugins": [{
        "channel_id_prefix": "example",
        "channel_names": [],
        "path": "plugins/example",
        "has_tools": true,
        "has_assets": false
    }],
    "online_engine": {
        "interval": {
            "start": -60,
            "end": -10
        },
        "sleep": 5,
        "iterations": 100
    }
}
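A typo in the plugin entry is easy to miss, so it can help to check the entry before starting HyperStream. The sketch below is not part of HyperStream: check_plugin_entry is a hypothetical helper, and the list of expected keys is simply taken from the example configuration above (whether HyperStream strictly requires all of them is an assumption). The configuration is embedded as a string to keep the example self-contained.

```python
import json

# Keys used by the plugin entry in the example configuration above.
REQUIRED_KEYS = {"channel_id_prefix", "channel_names", "path", "has_tools", "has_assets"}

CONFIG_TEXT = """
{
    "plugins": [{
        "channel_id_prefix": "example",
        "channel_names": [],
        "path": "plugins/example",
        "has_tools": true,
        "has_assets": false
    }]
}
"""


def check_plugin_entry(entry):
    """Return the sorted list of expected keys missing from one plugin entry."""
    return sorted(REQUIRED_KEYS - set(entry))


config = json.loads(CONFIG_TEXT)
# Map each plugin path to the keys it is missing (an empty list means none).
problems = {p.get("path", "?"): check_plugin_entry(p) for p in config["plugins"]}
print(problems)
```

To check a real file, replace json.loads(CONFIG_TEXT) with json.load on the opened hyperstream_config.json.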
Now we can write some HyperStream code that uses the new plugin.
In [1]:
%load_ext watermark
import sys
sys.path.append("../") # Add parent dir in the Path
from hyperstream import HyperStream
from hyperstream import TimeInterval
from hyperstream.utils import UTC
from datetime import datetime
from utils import plot_high_chart
from utils import plot_multiple_stock
%watermark -v -m -p hyperstream -g
hs = HyperStream(loglevel=20)
print hs
After instantiating HyperStream, if the plugin and its configuration are in the right place, we can load our new tool csv_reader, specifying where the input file is.
The data is the Polar Ice data that can be found in this link
In [2]:
reader_tool = hs.plugins.example.tools.csv_reader('data/sea_ice.csv')
In [3]:
sea_ice_stream = hs.channel_manager.memory.get_or_create_stream("sea_ice")
In [4]:
ti = TimeInterval(datetime(1990, 1, 1).replace(tzinfo=UTC), datetime(2011, 11, 1).replace(tzinfo=UTC))
reader_tool.execute(sources=[], sink=sea_ice_stream, interval=ti)
sea_ice_stream.calculated_intervals
In [5]:
ti = TimeInterval(datetime(1995, 1, 1).replace(tzinfo=UTC), datetime(1996, 1, 1).replace(tzinfo=UTC))
for key, value in sea_ice_stream.window(ti).items():
    print '[%s]: %s' % (key, value)
In [6]:
my_time, my_data = zip(*[(key.__str__(), value[1]) for key, value in sea_ice_stream.window().items()])
plot_high_chart(my_time, my_data, type="high_stock", title='Sea level in the Antarctica', yax='meters')
We can also visualize both values of the stream together, the Arctic and the Antarctica sea levels:
In [7]:
time = [key.__str__() for key, value in sea_ice_stream.window().items()]
data = [list(a) for a in zip(*[value for key, value in sea_ice_stream.window().items()])]
htype = 'spline'
names = ['Arctic', 'Antarctica']
plot_multiple_stock(data, time=time, names=names, htype=htype, title='Sea level', ylabel='meters')
In [8]:
time = [key.__str__() for key, value in sea_ice_stream.window(ti).items()]
data = [list(a) for a in zip(*[value for key, value in sea_ice_stream.window(ti).items()])]
htype = 'spline'
names = ['Arctic', 'Antarctica']
plot_multiple_stock(data, time=time, names=names, htype=htype, title='test multi-output', ylabel='meters')
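Both plotting cells rely on zip(*...) to turn the per-timestamp lists into one series per column. The following sketch shows that step in isolation. The items list is a made-up stand-in for sea_ice_stream.window().items(), and the numbers are illustrative, not real measurements.

```python
# Made-up stand-in for sea_ice_stream.window().items():
# each timestamp is paired with a list holding one value per CSV column.
items = [
    ("1995-01-01", [1.0, 10.0]),
    ("1995-02-01", [2.0, 20.0]),
    ("1995-03-01", [3.0, 30.0]),
]

# One label per instance, as in the cells above.
time = [str(key) for key, value in items]

# zip(*rows) transposes rows into columns, giving one list per series.
data = [list(column) for column in zip(*[value for key, value in items])]

print(time)
print(data)
```

With the names list used in the cells above, data[0] would be plotted as 'Arctic' and data[1] as 'Antarctica'.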