Workflows define a graph of streams. Usually, the first stream will be a special "raw" stream that pulls in data from a custom data source. Workflows can have multiple time ranges, which will cause the streams to be computed on all of the ranges given.
In this tutorial, we will be using a time-series dataset about the temperature in different countries and cities. The dataset is available from Census at School New Zealand. The necessary files for this tutorial are already included in the folder data/TimeSeriesDatasets_130207.
In particular, there are four files with the minimum and maximum temperatures in different cities of Asia, Australia, New Zealand and the USA from 2000 to 2012, plus one file with the rainfall levels of New Zealand.
In [1]:
try:
    %load_ext watermark
    watermark = True
except ImportError:
    watermark = False
import sys
sys.path.append("../") # Add parent dir in the Path
from hyperstream import HyperStream
from hyperstream import TimeInterval
from hyperstream.utils import UTC
from hyperstream import Workflow
import hyperstream
from datetime import datetime
from utils import plot_high_chart
from utils import plot_multiple_stock
from dateutil.parser import parse
if watermark:
    %watermark -v -m -p hyperstream -g
hs = HyperStream(loglevel=30)
M = hs.channel_manager.memory
print(hs)
print([p.channel_id_prefix for p in hs.config.plugins])
In the data folder there are four csv files with the names TempAsia.csv, TempAustralia.csv, TempNZ.csv and TempUSA.csv. The first row of each csv file is a header with the names of the columns: the first one is the date and the following ones are the minimum and maximum temperature in different cities, in the format cityMin and cityMax.
Here is an example of the first lines of the TempAsia.csv file:
Date,TokyoMax,TokyoMin,BangkokMax,BangkokMin
2000M01,11.2,4.2,32.8,24
The format of the date has the form YYYYMmm, where YYYY is the year and mm is the month. Because this format is not recognized by the default parser of the csv_reader tool, we need to specify our own parser that first replaces the M by a hyphen - and then applies dateutil.parser.
Then, we will use a tool to read each csv, and a Stream to store the results of applying the tool. When we tell the tool that there is a header row in the csv file, the value of each Stream instance will be a dictionary mapping each column name to its corresponding value. For example, a Stream instance with the four columns shown above will look like:
[2000-01-19 00:00:00+00:00]: {'BangkokMin': 24.0, 'BangkokMax': 32.8, 'TokyoMin': 4.2, 'TokyoMax': 11.2}
In [2]:
def dateparser(dt):
    return parse(dt.replace('M', '-')).replace(tzinfo=UTC)
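As a quick check (an illustration only, not part of the workflow), the parser turns the YYYYMmm strings into timezone-aware datetimes. Note that dateutil fills the missing day with the current day of the month, which is why the sample instances in this tutorial show a day such as 19:
print(dateparser('2000M01'))  # e.g. 2000-01-19 00:00:00+00:00 when run on the 19th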
Once the csv_reader has created the instances in the country plate, we will modify the dictionaries by applying a function split_temperatures to each instance and storing the results in a new stream temp_data.
The function will create a dictionary with the city names and their minimum and maximum temperature. The following example shows the previous stream after applying this function:
[2000-01-19 00:00:00+00:00]: {'Bangkok': {'min': 24.0, 'max': 32.8}, 'Tokyo': {'min': 4.2, 'max': 11.2}}
In [3]:
def split_temperatures(d):
    """
    Parameters
    ----------
    d: dictionary of the following form:
        {'BangkokMin': 24.0, 'BangkokMax': 32.8, 'TokyoMin': 4.2, 'TokyoMax': 11.2}

    Returns
    -------
    dictionary of the following form
        {'Bangkok': {'min': 24.0, 'max': 32.8}, 'Tokyo': {'min': 4.2, 'max': 11.2}}
    """
    new_d = {}
    for name, value in d.iteritems():
        key = name[-3:].lower()
        name = name[:-3]
        if name not in new_d:
            new_d[name] = {}
        new_d[name][key] = value
    return new_d
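As a quick sanity check (not part of the workflow), calling the function on the example dictionary above reproduces the nested form:
print(split_temperatures(
    {'BangkokMin': 24.0, 'BangkokMax': 32.8, 'TokyoMin': 4.2, 'TokyoMax': 11.2}))
# {'Bangkok': {'min': 24.0, 'max': 32.8}, 'Tokyo': {'min': 4.2, 'max': 11.2}}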
Then, we will use a splitter_from_stream tool that will be applied to every country and will store the values of the temp_data stream into the corresponding city nodes. The new city nodes will contain a dictionary with the minimum and maximum values, in the form:
[2000-01-19 00:00:00+00:00]: {'min': 24.0, 'max': 32.8}
Then, we will apply the function dict_mean, which computes the mean of all the values in the dictionary, and store the result in the city_avg_temp streams:
[2000-01-19 00:00:00+00:00]: 28.4
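To make these two steps concrete, here is a plain-Python illustration (not the HyperStream tools themselves) of how one country-level instance is split into per-city values, which dict_mean (defined below) then reduces to a single number:
# Illustration only: conceptual behaviour of splitter_from_stream for one instance
country_value = {'Bangkok': {'min': 24.0, 'max': 32.8}, 'Tokyo': {'min': 4.2, 'max': 11.2}}
for city, city_value in country_value.items():
    print('{}: {}'.format(city, city_value))  # each pair becomes an instance in that city's stream
# Bangkok: {'min': 24.0, 'max': 32.8}  -> dict_mean would give 28.4
# Tokyo: {'min': 4.2, 'max': 11.2}     -> dict_mean would give 7.7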
In [4]:
def dict_mean(d):
    """Computes the mean of the values of the dictionary d, discarding None values"""
    x = d.values()
    x = [value for value in x if value is not None]
    return float(sum(x)) / max(len(x), 1)
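A quick check against the example above (the mean of 24.0 and 32.8):
print(dict_mean({'min': 24.0, 'max': 32.8}))  # 28.4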
In [5]:
countries_dict = {
'Asia': ['Bangkok', 'HongKong', 'KualaLumpur', 'NewDelhi', 'Tokyo'],
'Australia': ['Brisbane', 'Canberra', 'GoldCoast', 'Melbourne', 'Sydney'],
'NZ': ['Auckland', 'Christchurch', 'Dunedin', 'Hamilton','Wellington'],
'USA': ['Chicago', 'Houston', 'LosAngeles', 'NY', 'Seattle']
}
# delete_plate requires deleting the children first and then the parents
for plate_id in ['C.C', 'C']:
    if plate_id in [plate[0] for plate in hs.plate_manager.plates.items()]:
        hs.plate_manager.delete_plate(plate_id=plate_id, delete_meta_data=True)

for country in countries_dict:
    id_country = 'country_' + country
    if not hs.plate_manager.meta_data_manager.contains(identifier=id_country):
        hs.plate_manager.meta_data_manager.insert(
            parent='root', data=country, tag='country', identifier=id_country)
    for city in countries_dict[country]:
        id_city = id_country + '.' + 'city_' + city
        if not hs.plate_manager.meta_data_manager.contains(identifier=id_city):
            hs.plate_manager.meta_data_manager.insert(
                parent=id_country, data=city, tag='city', identifier=id_city)

C = hs.plate_manager.create_plate(plate_id="C", description="Countries", values=[], complement=True,
                                  parent_plate=None, meta_data_id="country")
CC = hs.plate_manager.create_plate(plate_id="C.C", description="Cities", values=[], complement=True,
                                   parent_plate="C", meta_data_id="city")

print(hs.plate_manager.meta_data_manager.global_plate_definitions)
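A quick way to see which meta-data values the two plates cover is to iterate over them, using the same pattern that the workflow below relies on:
for c in C:
    print('{}: {}'.format(c, [cc for cc in CC[c]]))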
In [6]:
ti_all = TimeInterval(datetime(1999, 1, 1).replace(tzinfo=UTC),
datetime(2013, 1, 1).replace(tzinfo=UTC))
In [7]:
# parameters for the csv_multi_reader tool
csv_temp_params = dict(
filename_template='data/TimeSeriesDatasets_130207/Temp{}.csv',
datetime_parser=dateparser, skip_rows=0, header=True)
csv_rain_params = dict(
filename_template='data/TimeSeriesDatasets_130207/{}Rainfall.csv',
datetime_parser=dateparser, skip_rows=0, header=True)
def mean(x):
    """
    Computes the mean of the values in x, discarding the None values
    """
    x = [value for value in x if value is not None]
    return float(sum(x)) / max(len(x), 1)
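# Quick check of the helper (illustration only): None values are discarded
# before averaging, e.g. mean([24.0, None, 32.8]) == 28.4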
with Workflow(workflow_id='tutorial_05',
              name='tutorial_05',
              owner='tutorials',
              description='Tutorial 5 workflow',
              online=False) as w:

    country_node_raw_temp = w.create_node(stream_name='raw_temp_data', channel=M, plates=[C])
    country_node_temp = w.create_node(stream_name='temp_data', channel=M, plates=[C])
    city_node_temp = w.create_node(stream_name='city_temp', channel=M, plates=[CC])
    city_node_avg_temp = w.create_node(stream_name='city_avg_temp', channel=M, plates=[CC])
    country_node_avg_temp = w.create_node(stream_name='country_avg_temp', channel=M, plates=[C])
    country_node_raw_rain = w.create_node(stream_name='raw_rain_data', channel=M, plates=[C])
    city_node_rain = w.create_node(stream_name='city_rain', channel=M, plates=[CC])
    country_node_avg_rain = w.create_node(stream_name='country_avg_rain', channel=M, plates=[C])
    city_node_temp_rain = w.create_node(stream_name='city_temp_rain', channel=M, plates=[CC])
    country_node_avg_temp_rain = w.create_node(stream_name='country_avg_temp_rain', channel=M, plates=[C])
    world_node_avg_temp = w.create_node(stream_name='world_avg_temp', channel=M, plates=[])

    for c in C:
        country_node_raw_temp[c] = hs.plugins.data_importers.factors.csv_multi_reader(
            source=None, **csv_temp_params)
        country_node_temp[c] = hs.factors.apply(
            sources=[country_node_raw_temp[c]],
            func=split_temperatures)
        country_node_raw_rain[c] = hs.plugins.data_importers.factors.csv_multi_reader(
            source=None, **csv_rain_params)
        for cc in CC[c]:
            city_node_temp[cc] = hs.factors.splitter_from_stream(
                source=country_node_temp[c],
                splitting_node=country_node_temp[c],
                use_mapping_keys_only=True)
            city_node_avg_temp[cc] = hs.factors.apply(
                sources=[city_node_temp[cc]],
                func=dict_mean)
            city_node_rain[cc] = hs.factors.splitter_from_stream(
                source=country_node_raw_rain[c],
                splitting_node=country_node_raw_rain[c],
                use_mapping_keys_only=True)
            city_node_temp_rain[cc] = hs.plugins.example.factors.aligned_correlation(
                sources=[city_node_avg_temp[cc],
                         city_node_rain[cc]],
                use_mapping_keys_only=True)
        country_node_avg_temp[c] = hs.factors.aggregate(
            sources=[city_node_avg_temp],
            alignment_node=None,
            aggregation_meta_data='city', func=mean)
        country_node_avg_rain[c] = hs.factors.aggregate(
            sources=[city_node_rain],
            alignment_node=None,
            aggregation_meta_data='city', func=mean)
        country_node_avg_temp_rain[c] = hs.factors.aggregate(
            sources=[city_node_temp_rain],
            alignment_node=None,
            aggregation_meta_data='city', func=mean)

    world_node_avg_temp[None] = hs.factors.aggregate(sources=[country_node_avg_temp],
                                                     alignment_node=None,
                                                     aggregation_meta_data='country',
                                                     func=mean)

    w.execute(ti_all)
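Before exploring the per-city streams, a quick optional check that the workflow produced data is to peek at the top-level world_avg_temp stream, reusing find_streams and window() exactly as in the cells below:
for stream_id, stream in M.find_streams(name='world_avg_temp').iteritems():
    print(stream_id)
    print(stream.window(ti_all).items()[:3])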
Let's see a small sample of the temperatures in each city. We can use the function find_streams to retrieve all the streams whose meta-data matches the keys and values that we specify. In the following example we find all the streams with the name temp_data and print a small sample.
In [8]:
ti_sample = TimeInterval(datetime(2007, 1, 1).replace(tzinfo=UTC),
datetime(2007, 2, 1).replace(tzinfo=UTC))
for stream_id, stream in M.find_streams(name='temp_data').iteritems():
    print(stream_id)
    print(stream.window(ti_sample).items())
In [9]:
for stream_id, stream in M.find_streams(name='city_avg_temp').iteritems():
    print('[{}]'.format(stream_id))
    print(stream.window(ti_sample).items())
We can also see the ratio between the temperature and the rain for every month. In this case, we do not have rainfall data for most of the cities; for that reason, some of the nodes are empty.
In [10]:
for stream_id, stream in M.find_streams(name='city_temp_rain').iteritems():
    print('[{}]'.format(stream_id))
    print(stream.window(ti_sample).items())
Here we create a function that extracts the names, timestamps and values of all the streams and returns them in the format expected by the function plot_multiple_stock, which is used throughout this tutorial.
Then, we can find the streams that we want to visualize and plot their values. In the following example, we can see the average temperature of some cities of Australia.
In [11]:
def get_x_y_names_from_streams(streams, tag=None):
    names = []
    y = []
    x = []
    for stream_id, stream in streams.iteritems():
        if len(stream.window().items()) == 0:
            continue
        if tag is not None:
            meta_data = dict(stream_id.meta_data)
            name = meta_data[tag]
        else:
            name = ''
        names.append(name)
        y.append([instance.value for instance in stream.window().items()])
        x.append([str(instance.timestamp) for instance in stream.window().items()])
    return y, x, names
data, time, names = get_x_y_names_from_streams(M.find_streams(country='Australia', name='city_avg_temp'), 'city')
plot_multiple_stock(data, time=time, names=names,
title='Temperatures in Australia', ylabel='ºC')
Here we visualize the average temperatures in some cities of New Zealand.
In [12]:
data, time, names = get_x_y_names_from_streams(M.find_streams(country='NZ', name='city_avg_temp'), 'city')
plot_multiple_stock(data, time=time, names=names,
title='Temperatures in New Zealand', ylabel='ºC')
The rainfall in New Zealand.
In [13]:
data, time, names = get_x_y_names_from_streams(M.find_streams(country='NZ', name='city_rain'), 'city')
plot_multiple_stock(data, time=time, names=names,
title='Rain in New Zealand', ylabel='some precipitation unit')
And the correlation between temperature and rain for all the cities. In this case, we only have this ratio for some of the cities of New Zealand.
In [14]:
data, time, names = get_x_y_names_from_streams(M.find_streams(name='city_temp_rain'), 'city')
plot_multiple_stock(data, time=time, names=names,
title='Correlation between temperature and rain', ylabel='ºC/rain units')
We can see the streams at a country level with the averages of each of its cities.
In [15]:
data, time, names = get_x_y_names_from_streams(M.find_streams(name='country_avg_temp'), 'country')
plot_multiple_stock(data, time=time, names=names,
title='Temperatures in countries', ylabel='ºC')
In [16]:
data, time, names = get_x_y_names_from_streams(M.find_streams(name='country_avg_rain'), 'country')
plot_multiple_stock(data, time=time, names=names,
title='Average rain in countries', ylabel='some precipitation unit')
In [17]:
data, time, names = get_x_y_names_from_streams(M.find_streams(name='world_avg_temp'))
plot_multiple_stock(data, time=time, names=names,
title='Average temperature in all countries', ylabel='ºC')
In [18]:
from pprint import pprint
pprint(w.to_dict(tool_long_names=False))
In [19]:
print(w.to_json(w.factorgraph_viz, tool_long_names=False, indent=4))