We’ll start with a quick, non-comprehensive overview of the fundamental concepts.
This example makes use of a fictitious sample metric for two imaginary cities: CITY_A and CITY_B. All the data was randomly generated and makes no allusion to reality. This Getting Started was created using the IPython Notebook, a great tool for prototyping, and it is available within the project at ipy-notebooks/getting_started.ipynb. The same scenario is implemented in an acceptance test at bigtempo/tests/acceptance_tests.py.
In this example everything will be kept in a single module, but for larger projects you will want to create a dedicated module for the engine instance. Since this instance is used as the entry point for everything you do with bigtempo, like creating datasources, using selections and evaluating data, it must be possible for other modules to import it (a sketch of that layout follows the first cell below).
In [1]:
# Importing bigtempo and creating an engine instance (this may be done in a dedicated module)
import bigtempo.core as core
engine = core.DatasourceEngine()
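For larger projects, a minimal sketch of that dedicated-module layout might look like this (the package and module names are hypothetical):
# myproject/engine.py -- a dedicated module holding the single engine instance
import bigtempo.core as core

engine = core.DatasourceEngine()

# myproject/datasources.py -- every other module imports the shared instance:
# from myproject.engine import engine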
In [2]:
'''
The `datasource` decorator registers a class as a new datasource.
Args:
    reference: A string reference for this datasource
    dependencies: A list of references for the datasources it needs during evaluation time (OPTIONAL)
    tags: A list of tags for this datasource (OPTIONAL)
    lookback: The number of periods before the start date that are needed by the evaluation (OPTIONAL)
    frequency: Represents the frequency period (OPTIONAL)
        --------------------------------
        |     Possible Frequencies     |
        --------------------------------
        | B   | business day           |
        | W   | weekly frequency       |
        | M   | month end              |
        | BM  | business month end     |
        | MS  | month start            |
        | BMS | business month start   |
        | Q   | quarter end            |
        | BQ  | business quarter end   |
        | QS  | quarter start          |
        | BQS | business quarter start |
        | A   | year end               |
        | BA  | business year end      |
        | AS  | year start             |
        | BAS | business year start    |
        | H   | hourly frequency       |
        | T   | minutely               |
        | S   | secondly               |
        | L   | milliseconds           |
        | U   | microseconds           |
        --------------------------------
'''
@engine.datasource('REFERENCE', dependencies=[], tags=[], lookback=0, frequency='B')
class Sample(object):

    def evaluate(self, context, symbol, start=None, end=None):
        '''
        The `evaluate` method should be able to process data (from other datasources or not)
        for a given symbol (data category) in a certain period.
        Args:
            context: object that holds the data requested as dependencies
            symbol: distinguishes the "variation" of the data requested. In the stock market
                context it would specify which stock should be processed; in this example it
                represents the name of one city ('CITY_A' or 'CITY_B').
            start and end: optional datetime objects that specify the desired time range.
                Note that the data returned by the datasource is automatically sliced using
                start and end; they are present in the method's signature only so that you
                can spare some processing time.
        Returns:
            pandas.DataFrame object
        '''
        return  # should return a pandas.DataFrame
So, to be practical:
In [3]:
# General imports
import sys
import pandas
import datetime
In [4]:
# Alias used many times below; defined here only for the sake of legibility
dt = datetime.datetime
# Cities being analysed
cities = ['CITY_A', 'CITY_B']
In [5]:
@engine.datasource('SAMPLE', tags=['SAMPLE_IN', 'DAILY'])
class Sample(object):
    '''
    Simple sample; returns a datasource based on a csv file.
    '''
    def evaluate(self, context, symbol, start=None, end=None):
        return pandas.DataFrame.from_csv('SAMPLE{%s}.csv' % symbol)
The cell above defines and registers a datasource that:
- is referenced by 'SAMPLE';
- is tagged with 'SAMPLE_IN' and 'DAILY';
- reads its data from a .csv file.
Reference and tags are important because they are how you locate datasources (either directly or through selection queries) and process them...
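If you want to follow along without the project's data files, here is a minimal sketch that generates random input files matching the naming scheme above (the 'value' column name and the random-walk data are assumptions; the real files ship with the project; pandas and the cities list were already defined above):
import numpy

index = pandas.date_range('2001-01-01', '2001-12-31', freq='B')
for city in cities:
    # Random walk, mirroring the randomly generated sample data described earlier
    frame = pandas.DataFrame({'value': numpy.random.randn(len(index)).cumsum()}, index=index)
    frame.to_csv('SAMPLE{%s}.csv' % city)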
To make use of a datasource, the engine provides a get method which, given a reference, returns a datasource processor:
In [6]:
# do not forget that start and end datetimes are both optional
result = engine.get("SAMPLE").process('CITY_A', dt(2001, 1, 1), dt(2001, 12, 31))
result.plot()
Out[6]: [plot of the SAMPLE metric for CITY_A over 2001]
Another example:
In [7]:
@engine.datasource('WEEKLY_SAMPLE',
                   dependencies=['SAMPLE'],
                   tags=['SAMPLE_IN', 'WEEKLY'],
                   frequency='W-FRI')
class Weekly(object):
    '''
    Resamples the SAMPLE datasource on a weekly basis.
    '''
    def evaluate(self, context, symbol, start=None, end=None):
        return context.dependencies('SAMPLE').resample('W-FRI', how=lambda x: x[-1])
Again, it can be used right away:
In [8]:
engine.get("WEEKLY_SAMPLE").process('CITY_A', dt(2001, 1, 1), dt(2001, 12, 31)).plot()
Out[8]: [plot of the WEEKLY_SAMPLE metric for CITY_A over 2001]
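To see the effect of the resampling, a quick illustrative sanity check (exact row counts depend on the calendar):
daily = engine.get('SAMPLE').process('CITY_A', dt(2001, 1, 1), dt(2001, 12, 31))
weekly = engine.get('WEEKLY_SAMPLE').process('CITY_A', dt(2001, 1, 1), dt(2001, 12, 31))
print len(daily), len(weekly)  # roughly 5 business days per week, so about 5x fewer weekly rows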
The engine also provides a select method, which queries the registered datasources:
In [9]:
engine.select('DAILY')
Out[9]:
In [10]:
engine.select('SAMPLE_IN')
Out[10]:
In [11]:
engine.select('SAMPLE')
Out[11]:
In [12]:
engine.select().all()
Out[12]:
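What a selection holds can be inspected by iterating over it; an illustrative sketch (the expected contents reflect only what has been registered up to this point):
print list(engine.select('DAILY'))      # expected: ['SAMPLE']
print list(engine.select('SAMPLE_IN'))  # expected: ['SAMPLE', 'WEEKLY_SAMPLE']
print list(engine.select().all())       # every registered datasource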
Selections will be used so we can better explain the great for_each mechanism:
In [13]:
'''
The `for_each` engine decorator (method) registers a function as a listener,
called upon each (existing and future) registration that meets the given selections.
The listener/factory function receives one argument for each selection passed.
Args:
    *selections
Calls the decorated function (callable) multiple times, each time passing as arguments
a combination of the results of the given selections.
Example
-------
Given:
    a selection `s1` with 'A', 'B', and 'C'
    a selection `s2` with '1' and '2'

@engine.for_each(s1, s2)
def _listener(s1_ref, s2_ref):
    print '%s%s' % (s1_ref, s2_ref)

Would print out:
A1
A2
B1
B2
C1
C2
And, if later on a new datasource '3' matched `s2`, the listener would automatically
be called and the following would be printed:
A3
B3
C3
'''
@engine.for_each(engine.select('SAMPLE_IN'))
def _rolling_mean_factory(source_reference):
    periods = 7

    @engine.datasource('ROLLING_MEAN(%i):%s' % (periods, source_reference),  # Each datasource reference must be unique
                       dependencies=[source_reference],                      # Each datasource is based on a different dependency
                       lookback=periods,
                       tags=['ROLLING_MEAN'])
    class RollingMean(object):
        '''
        7 period rolling mean for each datasource tagged with 'SAMPLE_IN'.
        '''
        def evaluate(self, context, symbol, start=None, end=None):
            input_ds = context.dependencies(source_reference)
            return pandas.rolling_mean(input_ds, periods)
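Each generated datasource behaves like any other; for instance, using the reference pattern produced by the factory above:
engine.get('ROLLING_MEAN(7):SAMPLE').process('CITY_A', dt(2001, 1, 1), dt(2001, 12, 31)).plot()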
This mechanism, used in conjunction with selections, is very powerful in scenarios where you need to create variations of a processing step.
Naturally, variations can also be declared through a for loop, and both things can be applied recursively:
In [14]:
@engine.for_each(engine.select('SAMPLE_IN').union('ROLLING_MEAN'))
def _percentual_change_factory(source_reference):
    for periods in [1, 3, 5]:
        @engine.datasource('PERCENTUAL_CHANGE(%i):%s' % (periods, source_reference),
                           dependencies=[source_reference],
                           lookback=periods,
                           tags=['PERCENTUAL_CHANGE', 'PERCENTUAL_CHANGE(%i)' % periods])
        class PctChange(object):
            # Bind the current loop value at class-creation time; a plain closure
            # would late-bind and make every class see periods == 5
            _periods = periods

            def evaluate(self, context, symbol, start=None, end=None):
                return context.dependencies(source_reference).pct_change(periods=self._periods)
Of course, the for_each decorator also automatically calls the listener for newly registered datasources that match the selections passed as arguments:
In [15]:
@engine.datasource('MONTHLY_SAMPLE',
                   dependencies=['SAMPLE'],
                   tags=['SAMPLE_IN', 'MONTHLY'],
                   frequency='M')
class Monthly(object):
    def evaluate(self, context, symbol, start=None, end=None):
        return context.dependencies('SAMPLE').resample('M', how=lambda x: x[-1])
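Since MONTHLY_SAMPLE is tagged with 'SAMPLE_IN', the factories above fire for it automatically; an illustrative way to check:
print list(engine.select('ROLLING_MEAN'))  # expected to now include 'ROLLING_MEAN(7):MONTHLY_SAMPLE'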
There is a variation of this decorator: for_synched
works similarly, but only calls the listener for the combinations where there is a common "datasource origin". Consider the following examples:
In [16]:
@engine.for_each(engine.select('SAMPLE_IN'), engine.select('ROLLING_MEAN'))
def _listener(*args):
    print args
In [17]:
@engine.for_synched(engine.select('SAMPLE_IN'), engine.select('ROLLING_MEAN'))
def _listener(*args):
    print args
The combination ('WEEKLY_SAMPLE', 'ROLLING_MEAN(7):SAMPLE') is not produced when using for_synched, because 'ROLLING_MEAN(7):SAMPLE' is not based on 'WEEKLY_SAMPLE' (it is not in its dependency tree). Note that ('SAMPLE', 'ROLLING_MEAN(7):WEEKLY_SAMPLE') does appear among the results, because 'WEEKLY_SAMPLE' is itself based on 'SAMPLE'.
Selections enable many awesome things: you can map and categorize sets of domain metrics, generate analyses over them, and process multiple datasources in a simple manner.
First of all, selections can be iterated:
In [18]:
for reference in engine.select('SAMPLE_IN'):
    print reference
Selections have a length and are able to retrieve a datasource processor directly:
In [19]:
engine.select('SAMPLE_IN').get(0).process('CITY_B')
Out[19]:
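Putting iteration and processing together, a small sketch (reusing the cities list defined earlier) that evaluates every 'SAMPLE_IN' datasource for every city:
for reference in engine.select('SAMPLE_IN'):
    for city in cities:
        result = engine.get(reference).process(city, dt(2001, 1, 1), dt(2001, 12, 31))
        print reference, city, len(result)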
Selections also offer a special syntax that allows selecting datasources based on their dependencies (considering the whole dependency tree):
In [20]:
engine.select('{WEEKLY}')
Out[20]:
The select method supports multiple arguments, returning datasources that match all of them at once:
In [21]:
engine.select('{WEEKLY}', 'PERCENTUAL_CHANGE')
Out[21]:
Selections also implement set-like operations (all, union, intersection, difference and symmetric_difference):
In [22]:
engine.select('{WEEKLY}').intersection('PERCENTUAL_CHANGE')
Out[22]:
In [23]:
s1 = engine.select('{WEEKLY}').intersection('PERCENTUAL_CHANGE').difference('PERCENTUAL_CHANGE(1)')
s1
Out[23]:
In [24]:
s1.difference('PERCENTUAL_CHANGE(1)').union('SAMPLE_IN')
Out[24]:
In [25]:
engine.select().all()
Out[25]: