In [50]:
from __future__ import print_function
from copy import deepcopy


class DataStore(object):
    """Source node: yields the integers 0-9, skipping any index in `mask`."""
    def __init__(self, mask=None):
        self.mask = [] if mask is None else mask

    def state_of_data(self):
        return {}

    def __iter__(self):
        print("DataStore.__iter__")
        self.i = -1
        return self

    def __next__(self):
        self.i += 1
        while self.i in self.mask:
            self.i += 1
        if self.i < 10:
            return self.i
        else:
            raise StopIteration

    next = __next__  # Python 2 compatibility


class FailedPreconditionError(Exception):
    pass


class Node(object):
    """Base class for pipeline stages. Each stage pulls from `upstream`."""
    def __init__(self, upstream):
        self.upstream = iter(upstream)

    def state_of_data(self):
        """The upstream state of the data, plus this node's own `outcome`."""
        upstream_condition = deepcopy(self.upstream.state_of_data())
        upstream_condition.update(self.__class__.outcome)
        return upstream_condition

    def check_preconditions(self):
        """Raise FailedPreconditionError unless the upstream state satisfies
        this node's `requirements`."""
        requirements = self.__class__.__dict__.get('requirements')
        if requirements:
            condition = self.upstream.state_of_data()
            for k, v in requirements.items():
                if k not in condition:
                    raise FailedPreconditionError()
                if condition[k] != v:
                    raise FailedPreconditionError()

    def run(self):
        """Pulls data through the pipeline. Useful if we just want to calculate
        some stats."""
        for x in self:
            pass


class AddOne(Node):
    outcome = {'added_one': True}
    requirements = {'multiplied_by_ten': True}

    def __iter__(self):
        print("AddOne.__iter__")
        self.check_preconditions()
        self.results = 0
        return self

    def __next__(self):
        i = next(self.upstream)
        output = i + 1
        self.results += output  # accumulate a running total as a simple 'stat'
        return output

    next = __next__  # Python 2 compatibility


class MultiplyByTen(Node):
    outcome = {'multiplied_by_ten': True}

    def __iter__(self):
        print("MultiplyByTen.__iter__")
        self.check_preconditions()
        return self

    def __next__(self):
        i = next(self.upstream)
        return i * 10

    next = __next__  # Python 2 compatibility
In [51]:
ds = DataStore([4,6])
for d in ds:
    print(d)
In [52]:
ds = DataStore([3,5,7])
mult = MultiplyByTen(ds)
addone = AddOne(mult)
addone.run()
addone.results
Out[52]:
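To illustrate the precondition mechanism, a hypothetical follow-up cell (not part of the original notebook) could inspect the accumulated state of the data and show what happens when AddOne is wired directly onto a DataStore, skipping the MultiplyByTen stage:
In [ ]:
# Hypothetical demonstration cell, using only the classes defined above.
print(addone.state_of_data())
# -> {'multiplied_by_ten': True, 'added_one': True} (order may vary)

# Skipping MultiplyByTen means nothing upstream declares 'multiplied_by_ten',
# so AddOne's precondition check fails as soon as iteration starts.
bad = AddOne(DataStore([3, 5, 7]))
try:
    bad.run()
except FailedPreconditionError:
    print("preconditions not met: upstream never multiplied by ten")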
In [43]:
# Train disaggregator
for meter in metergroup.meters:
    preprocessing = [Clip(), Resample(5)]
    for chunk in meter.power_series(preprocessing=preprocessing):
        disaggregator.train(meter, chunk)
        # and if the disaggregator needs aggregate power as well:
        mains_chunk = next(mains.power_series(periods=[chunk.timeframe]))
        disaggregator.train(meter, chunk, mains_chunk)
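As a rough sketch of what a disaggregator conforming to this training interface might look like: the train/disaggregate method names come from the cells here, but the class name and the statistic it keeps are purely illustrative assumptions, not a proposed algorithm.
In [ ]:
# Hypothetical skeleton; only the call signatures mirror the cells above.
import pandas as pd

class MeanPowerDisaggregator(object):
    def __init__(self):
        self.mean_power = {}  # meter -> mean power seen during training

    def train(self, meter, chunk, mains_chunk=None):
        # `chunk` is assumed to be a pandas Series of power readings.
        self.mean_power[meter] = chunk.mean()

    def disaggregate(self, mains_chunk):
        # Assign every appliance its training-time mean at each timestamp
        # of the mains chunk; returns a DataFrame of estimates.
        return pd.DataFrame(
            {meter: mean for meter, mean in self.mean_power.items()},
            index=mains_chunk.index)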
In [ ]:
# Disaggregation and metrics (via disk)
for chunk in mains.power_series():
    appliance_estimates_for_chunk = disaggregator.disaggregate(chunk)  # get DataFrame
    append_to_hdf(appliance_estimates_for_chunk)

# Metrics - take two MeterGroups?
# One for ground truth, one for disag estimates
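One possible shape for that metrics step, assuming ground truth and estimates are both available as mappings from appliance to a power Series aligned on the same index (the function name and the choice of mean absolute error are assumptions, not settled API):
In [ ]:
# Hypothetical metric sketch: mean absolute error per appliance, given two
# dict-like mappings {appliance: pandas.Series} on a shared index.
def mean_absolute_error(ground_truth, estimates):
    errors = {}
    for appliance, truth_series in ground_truth.items():
        estimate_series = estimates[appliance]
        errors[appliance] = (truth_series - estimate_series).abs().mean()
    return errors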