This notebook had been used for internal testing of some of nilmtk features. Please don't use this notebook!
In [18]:
from __future__ import print_function
from copy import deepcopy
class DataStore(object):
def __init__(self, mask=None):
self.mask = [] if mask is None else mask
def state_of_data(self):
return {}
def power_series(self):
for i in range(10):
if i not in self.mask:
yield i
def process(self):
return self.power_series()
class FailedPreconditionError(Exception):
pass
class Node(object):
def __init__(self, upstream):
self.upstream = upstream
def state_of_data(self):
upstream_condition = deepcopy(self.upstream.state_of_data())
upstream_condition.update(self.__class__.outcome)
return upstream_condition
def check_preconditions(self):
requirements = self.__class__.__dict__.get('requirements')
if requirements:
condition = self.upstream.state_of_data()
for k,v in requirements.iteritems():
if not condition.has_key(k):
raise FailedPreconditionError()
if condition[k] != v:
raise FailedPreconditionError()
def run(self):
"""Pulls data through the pipeline. Useful if we just want to calculate
some stats."""
for _ in self.process():
pass
class AddOne(Node):
outcome = {'added_one': True}
requirements = {'multiplied_by_ten': True}
def process(self):
self.check_preconditions()
self.results = 0
for i in self.upstream.process():
output = i+1
self.results += output
yield output
class MultiplyByTen(Node):
outcome = {'multiplied_by_ten': True}
def process(self):
self.check_preconditions()
for i in self.upstream.process():
yield i*10
In [19]:
ds = DataStore([3,5,7])
mult = MultiplyByTen(ds)
addone = AddOne(mult)
addone.run()
addone.results
Out[19]:
In [20]:
# Train disaggregator
for meter in metergroup.meters:
preprocessing = [Clip(), Resample(5)]
for chunk in meter.power_series(preprocessing=preprocessing):
disaggregator.train(meter, chunk)
# and if the disaggregator needs aggregate power as well:
mains_chunk = mains.power_series(periods=[chunk.timeframe]).next()
disaggregator.train(meter, chunk, mains_chunk)
In [ ]:
# Disaggregation and metrics (via disk)
for chunk in mains.power_series():
appliance_estimates_for_chunk = disaggregator.disaggregate(chunk) # get dataframe
append_to_hdf(appliance_estimates_for_chunk)
# Metrics - take two MeterGroups?
# One for ground truth, one for disag estimates