In [1]:
import sys
import pandas
import datetime
import bigtempo.core as core
import bigtempo.tester as tester
In [2]:
# Short alias used to build the datetime start/end arguments in later cells.
dt = datetime.datetime
# Symbols (cities) this notebook works with.
cities = 'CITY_A CITY_B'.split()
In [3]:
# The engine every datasource in this notebook is registered on
# (via @engine.datasource / @engine.for_each) and retrieved from
# (via engine.get / engine.select).
engine = core.DatasourceEngine()
In [4]:
import os


def _get_test_data_dir():
    """Return the absolute path of the acceptance-test data directory.

    By default this is ``acceptance_tests_data`` under the current working
    directory. When that resolved path contains ``ipy-notebooks`` (presumably
    because the notebook is being run from the notebooks folder), the path
    ``../bigtempo/tests/acceptance_tests_data`` is used instead.
    """
    data_dir = os.path.abspath('acceptance_tests_data')
    # `x not in y` is the idiomatic negated membership test (PEP 8 / E713).
    if 'ipy-notebooks' not in data_dir:
        return data_dir
    return os.path.abspath(
        os.path.join('..', 'bigtempo', 'tests', 'acceptance_tests_data'))


def _get_test_data_filename(reference, symbol=None):
    """Return the CSV file name for ``reference`` and an optional ``symbol``.

    A truthy symbol is appended in braces, e.g. ``('SAMPLE', 'CITY_A')`` ->
    ``'SAMPLE{CITY_A}.csv'``. Without one (None or empty) the result is
    ``'SAMPLE.csv'``.
    """
    symbol_part = '{%s}' % symbol if symbol else ''
    return '%s%s.csv' % (reference, symbol_part)


def _get_test_data_filepath(reference, symbol=None):
    """Return the full path of the test CSV for ``reference``/``symbol``."""
    return os.path.join(_get_test_data_dir(),
                        _get_test_data_filename(reference, symbol))
In [5]:
@engine.datasource('SAMPLE',
                   tags=['SAMPLE_IN', 'DAILY'],
                   frequency='B')
class Sample(object):
    """Base sample datasource, read from per-symbol CSV test files.

    Registered as 'SAMPLE' with frequency 'B' (pandas' business-day alias)
    and tagged 'SAMPLE_IN' / 'DAILY'.
    """

    def evaluate(self, context, symbol, start=None, end=None):
        """Load the sample data for ``symbol`` from its CSV test file.

        ``start`` and ``end`` are accepted for the datasource interface but
        are not used here; the whole file is returned.
        NOTE(review): presumably the engine trims to [start, end] — confirm.
        """
        # DataFrame.from_csv was deprecated in pandas 0.21 and removed in 1.0.
        # read_csv with index_col=0 and parse_dates=True reproduces its
        # defaults: header on the first row, first column as a parsed index.
        return pandas.read_csv(_get_test_data_filepath('SAMPLE', symbol),
                               index_col=0, parse_dates=True)
In [6]:
# Display the datasource references matched by an untagged select
# (presumably every registered one); each is usable with engine.get.
engine.select().all()
Out[6]:
In [7]:
# Process SAMPLE for CITY_A over calendar year 2001 and plot the result.
engine.get("SAMPLE").process('CITY_A', dt(2001, 1, 1), dt(2001, 12, 31)).plot()
Out[7]:
In [8]:
@engine.datasource('WEEKLY_SAMPLE',
                   dependencies=['SAMPLE'],
                   tags=['SAMPLE_IN', 'WEEKLY'],
                   frequency='W-FRI')
class Weekly(object):
    """SAMPLE downsampled to weekly bins anchored on Friday ('W-FRI')."""

    def evaluate(self, context, symbol, start=None, end=None):
        """Return the last value of the SAMPLE dependency in each weekly bin."""
        daily = context.dependencies('SAMPLE')

        def take_last(values):
            # Keep the final entry of each bin.
            return values[-1]

        return daily.resample('W-FRI', how=take_last)
In [9]:
# Display all selected datasource references again, now including WEEKLY_SAMPLE.
engine.select().all()
Out[9]:
In [10]:
# Process WEEKLY_SAMPLE for CITY_A over calendar year 2001 and plot it.
engine.get("WEEKLY_SAMPLE").process('CITY_A', dt(2001, 1, 1), dt(2001, 12, 31)).plot()
Out[10]:
In [11]:
@engine.for_each(engine.select('SAMPLE_IN'))
def _rolling_mean_factory(source_reference):
    """Register 'ROLLING_MEAN:<source_reference>' for one input datasource.

    Presumably invoked by engine.for_each once per datasource matched by
    engine.select('SAMPLE_IN').
    """
    # Window length of the rolling mean; the same value is declared as the
    # lookback (presumably so enough prior history is requested).
    window = 7

    @engine.datasource('ROLLING_MEAN:%s' % source_reference,
                       dependencies=[source_reference],
                       lookback=window,
                       tags=['ROLLING_MEAN'])
    class RollingMean(object):
        def evaluate(self, context, symbol, start=None, end=None):
            """Return the ``window``-period rolling mean of the source."""
            return pandas.rolling_mean(context.dependencies(source_reference),
                                       window)
In [12]:
# Datasources tagged 'SAMPLE_IN' (the inputs the ROLLING_MEAN factory iterates over).
engine.select('SAMPLE_IN')
Out[12]:
In [13]:
# The 'ROLLING_MEAN'-tagged datasources generated by the factory.
engine.select('ROLLING_MEAN')
Out[13]:
In [14]:
@engine.datasource('MONTHLY_SAMPLE',
                   dependencies=['SAMPLE'],
                   tags=['SAMPLE_IN', 'MONTHLY'],
                   frequency='M')
class Monthly(object):
    """SAMPLE downsampled to month-end bins ('M')."""

    def evaluate(self, context, symbol, start=None, end=None):
        """Return the last value of the SAMPLE dependency in each month."""
        daily = context.dependencies('SAMPLE')

        def take_last(values):
            # Keep the final entry of each bin.
            return values[-1]

        return daily.resample('M', how=take_last)
In [15]:
# Re-inspect 'SAMPLE_IN' now that MONTHLY_SAMPLE also carries that tag.
engine.select('SAMPLE_IN')
Out[15]:
In [16]:
# Check whether a ROLLING_MEAN datasource now also exists for MONTHLY_SAMPLE.
engine.select('ROLLING_MEAN')
Out[16]:
In [17]:
# MONTHLY_SAMPLE for CITY_A, processed for calendar year 2001 only.
m1 = engine.get("MONTHLY_SAMPLE").process('CITY_A', dt(2001, 1, 1), dt(2001, 12, 31))
m1.plot()
Out[17]:
In [18]:
# The same datasource processed over a wider range (2000-10-01 to 2002-02-01),
# which contains all of 2001; it is compared against m1 in the next code cell.
m2 = engine.get("MONTHLY_SAMPLE").process('CITY_A', dt(2000, 10, 1), dt(2002, 2, 1))
m2.plot()
Out[18]:
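A possible test asserting that retrieved values do not depend on the requested time range (the 12 months of 2001 must be identical in m1 and m2):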
In [19]:
# Rows are aligned on the index; months present in only one frame become NaN
# and are dropped, leaving the overlapping months (all of 2001).
m = (m2 - m1).dropna()
# `is` tests identity, not value; integers must be compared with ==.
assert len(m) == 12
# Every overlapping value must be unchanged, i.e. every difference is zero.
# (The former `any(m)` iterated the DataFrame's column labels, not its values,
# so it never inspected the differences at all.)
assert (m == 0).values.all()
In [20]:
# Display all selected datasource references, including MONTHLY_SAMPLE and its derivatives.
engine.select().all()
Out[20]:
In [21]:
@engine.for_each(engine.select('SAMPLE_IN').union('ROLLING_MEAN'))
def _percentual_change_factory(source_reference):
    """Register a percentage-change datasource for one input datasource.

    Presumably invoked once per datasource matched by the 'SAMPLE_IN' or
    'ROLLING_MEAN' selection.
    NOTE: the 'PERCENTUALT' spelling is what other cells look up
    (e.g. 'PERCENTUALT_CHANGE:MONTHLY_SAMPLE'); rename all uses together.
    """
    reference = 'PERCENTUALT_CHANGE:%s' % source_reference

    @engine.datasource(reference,
                       dependencies=[source_reference],
                       lookback=1,
                       tags=['PERCENTUALT_CHANGE'])
    class PctChange(object):
        def evaluate(self, context, symbol, start=None, end=None):
            """Return the row-over-row percentage change of the source."""
            source_frame = context.dependencies(source_reference)
            return source_frame.pct_change()
In [22]:
# Display all selected datasource references, now including the PERCENTUALT_CHANGE ones.
engine.select().all()
Out[22]:
In [23]:
# Month-over-month percentage change of MONTHLY_SAMPLE for CITY_A in 2001;
# the bare name on the last line displays the resulting frame.
mp1 = engine.get("PERCENTUALT_CHANGE:MONTHLY_SAMPLE").process('CITY_A', dt(2001, 1, 1), dt(2001, 12, 31))
mp1
Out[23]:
In [24]:
# Plot the percentage-change series.
mp1.plot()
Out[24]:
In [25]:
# Disabled: writes every selected datasource for every city to
# '<reference>{<city>}.csv' in the current working directory. This is the same
# naming scheme _get_test_data_filename produces, so presumably it was used to
# (re)generate the acceptance-test CSV files. process() is called without
# start/end here.
#for city in cities:
# for ds_ref in engine.select().all():
# engine.get(ds_ref).process(city).to_csv('%s{%s}.csv' % (ds_ref, city))