In [11]:
from __future__ import print_function, division
import pandas as pd
import numpy as np
import itertools, os, time
print("Pandas version = ", pd.__version__)
print("Numpy version = ", np.__version__)
In [16]:
N_PERIODS = int(1E4)  # must be an int: date_range and randint reject floats
N_METERS = 5
N_MEASUREMENTS_PER_METER = 3
meters = ['meter{:d}'.format(i) for i in range(1, N_METERS+1)]
meters = [[m]*N_MEASUREMENTS_PER_METER for m in meters]
flatten_2d_list = lambda lst: list(itertools.chain(*lst))
meters = flatten_2d_list(meters)
level2 = ['power', 'power', 'voltage'][:N_MEASUREMENTS_PER_METER] * N_METERS
level3 = ['active', 'reactive', ''][:N_MEASUREMENTS_PER_METER] * N_METERS
columns = [meters, level2, level3]
columns = pd.MultiIndex.from_arrays(columns)
rng = pd.date_range('2012-01-01', freq='S', periods=N_PERIODS)
data = np.random.randint(low=0, high=1000,
                         size=(N_PERIODS,
                               N_METERS*N_MEASUREMENTS_PER_METER))
df = pd.DataFrame(data=data, index=rng, columns=columns, dtype=np.float32)
df.iloc[:3]
Out[16]:
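The three-level column MultiIndex built above lets us drill down from a whole meter to a single measurement. A quick sketch of the selections it supports (column values as generated above):

df['meter1']                       # all measurements for meter1
df['meter1']['power']              # active and reactive power for meter1
df['meter1'][('power', 'active')]  # a single measurement, as a pd.Series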
In [17]:
envi_r = {'manufacturer': 'Current Cost',
          'name': 'EnviR',
          'sample_period': 6}
filename = 'random.h5'
store = pd.HDFStore(filename, 'w', complevel=9, complib='bzip2')
# store.root._v_attrs.meter_types = {envi_r['name']: envi_r}
for meter in df.columns.levels[0]:
    print("Saving " + meter)
    key = 'building1/electric/' + meter
    store.put(key, df[meter], format='table')
    store.get_storer(key).attrs.metadata = {'model': envi_r['name'], 'submeter_of': 1}
    print(store.get_storer(key).attrs.metadata)

print(store)
print(store)
# Save dataset-wide metadata
# store._handle.create_group('/', 'nilm_metadata', 'NILM Metadata')
# store.root.nilm_metadata._v_attrs.meter_types = {envi_r['name']: envi_r}
store.root._v_attrs.dataset = {'meter_models': {envi_r['name']: envi_r}}
store.close()
In [18]:
filename = 'random.h5'
store = pd.HDFStore(filename, 'r')
print(store['building1/electric/meter1'].iloc[:5])
print(store.get_storer('building1/electric/meter1').attrs.metadata)
print(store.get_storer('building1/electric/meter2').attrs.metadata)
print(store.root._v_attrs.dataset['meter_models'])
store.close()
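The class below leans on one HDFStore feature in particular: select, which pushes row and column filtering down to PyTables instead of loading the whole table into memory. A minimal sketch of the idiom (the query strings may reference local variables, which pandas resolves from the calling scope; the variable names here are purely illustrative):

store = pd.HDFStore(filename, 'r')
start = pd.Timestamp('2012-01-01 00:00:00')
end = pd.Timestamp('2012-01-01 00:01:00')
subset = store.select('building1/electric/meter1',
                      where=['index>=start', 'index<end'])
store.close()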
In [5]:
class TableStore(object):
    """Provides a common interface to all physical data stores.
    Supports hierarchical stores. Only reads data. Reads and writes metadata.
    e.g. one HDF5 file, or one Xively feed, or one REDD data set etc.

    There could be subclasses for HDF5, Xively feeds,
    Current Cost meters, OpenEnergyMonitor, rfm_ecomanager,
    REDD, UKPD, etc.  Or MetOffice XLS data,
    weather feeds from the internet etc.

    Only opens the underlying data source when absolutely necessary
    (by minimising the time the data source is open, we can minimise
    the chance of data being corrupted if another process changes it).
    """
    pass
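# One way the "open only when necessary" policy could be expressed is as a
# context manager around each operation.  A hypothetical sketch (open_briefly
# is not part of the design above):
from contextlib import contextmanager

@contextmanager
def open_briefly(store):
    # Keep the underlying pd.HDFStore open only for the duration of a
    # single operation, closing it again even if the operation raises.
    store.open()
    try:
        yield store
    finally:
        store.close()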
MAX_MEM_ALLOWANCE_IN_BYTES = 1E9
def date_range_to_terms(start_date=None, end_date=None):
    # The query strings reference `start_date` and `end_date` by name;
    # pandas resolves them from the scope of the caller of select().
    terms = []
    if start_date is not None:
        terms.append("index>=start_date")
    if end_date is not None:
        terms.append("index<end_date")
    return terms
class HDFTableStore(TableStore):
    def __init__(self, filename, table, start_date=None, end_date=None):
        """
        Parameters
        ----------
        filename : string
        table : string
            e.g. '/building1/utility/electric/meter1'
        start_date, end_date : string or pd.Timestamp, optional
            Defines a 'mask' applied to all operations on this object.
        """
        self.store = pd.HDFStore(filename, 'r')
        self.store.close()
        self._check_key(table)
        self.table = table
        self.start_date = pd.Timestamp(start_date) if start_date else None
        self.end_date = pd.Timestamp(end_date) if end_date else None
    def load(self, cols=None, start_date=None, end_date=None):
        """
        Parameters
        ----------
        cols : list or 'index', optional
            e.g. [('power', 'active'), ('power', 'reactive'), ('voltage', '')]
            if not provided, all columns from the table are returned.
        start_date, end_date : string or pd.Timestamp, optional
            defines the time period to load as the range [start_date, end_date)

        Returns
        -------
        if `cols == 'index'` then returns a pd.DatetimeIndex
        else returns a pd.DataFrame
        """
        start_date, end_date = self.restrict_start_and_end_dates(start_date, end_date)

        # Create list of query terms
        terms = date_range_to_terms(start_date, end_date)
        if cols is not None:
            if not self.table_has_column_names(cols):
                raise KeyError('at least one of ' + str(cols) + ' is not a valid column')
            terms.append("columns==cols")
        if terms == []:
            terms = None

        # Check we won't use too much memory
        mem_requirement = self.estimate_memory_requirement(cols, start_date, end_date)
        if mem_requirement > MAX_MEM_ALLOWANCE_IN_BYTES:
            raise MemoryError('Requested data would use too much memory.')

        # Read data
        self.store.open()
        data = self.store.select(key=self.table, where=terms, auto_close=True)
        if cols == 'index':
            data = data.index
        return data
    def table_has_column_names(self, cols):
        """
        Parameters
        ----------
        cols : string or list of strings

        Returns
        -------
        boolean
        """
        if isinstance(cols, str):
            cols = [cols]
        query_cols = set(cols)
        table_cols = set(self.column_names + ['index'])
        return query_cols.issubset(table_cols)
    def get_generator(self, periods=None, cols=None):
        """
        Parameters
        ----------
        periods : list of (start_date, end_date) tuples, optional
            e.g. [("2013-01-01", "2013-02-01"), ("2013-02-01", "2013-03-01")]
        """
        # TODO: this would be much more efficient
        # if we first got row indices for each period,
        # then checked that each period will fit into memory,
        # and then iterated over the row indices.
        if periods is None:
            periods = [self.date_range()]
        for start_date, end_date in periods:
            yield self.load(cols, start_date, end_date)
    def estimate_memory_requirement(self, cols=None, start_date=None, end_date=None, apply_mask=True):
        """Returns estimated memory requirement in bytes."""
        BYTES_PER_ELEMENT = 4
        BYTES_PER_TIMESTAMP = 8
        if cols is None:
            cols = self.column_names
        ncols = len(cols)
        nrows = self.nrows(start_date, end_date, apply_mask=apply_mask)
        est_mem_usage_for_data = nrows * ncols * BYTES_PER_ELEMENT
        est_mem_usage_for_index = nrows * BYTES_PER_TIMESTAMP
        return est_mem_usage_for_data + est_mem_usage_for_index
    @property
    def column_names(self):
        storer = self._get_storer()
        col_names = storer.non_index_axes[0][1:][0]
        self.store.close()
        return col_names
    def nrows(self, start_date=None, end_date=None, apply_mask=True):
        if apply_mask:
            start_date, end_date = self.restrict_start_and_end_dates(start_date, end_date)
        if start_date or end_date:
            terms = date_range_to_terms(start_date, end_date)
            if terms == []:
                terms = None
            self.store.open()
            coords = self.store.select_as_coordinates(self.table, terms)
            nrows_ = len(coords)
        else:
            storer = self._get_storer()
            nrows_ = storer.nrows
        self.store.close()
        return nrows_
    def restrict_start_and_end_dates(self, start_date=None, end_date=None):
        if start_date:
            start_date = pd.Timestamp(start_date)
        if end_date:
            end_date = pd.Timestamp(end_date)
        if all([start_date, self.start_date]) and start_date < self.start_date:
            start_date = self.start_date
        elif start_date is None:
            start_date = self.start_date
        if all([end_date, self.end_date]) and end_date > self.end_date:
            end_date = self.end_date
        elif end_date is None:
            end_date = self.end_date
        return start_date, end_date
    def date_range(self, apply_mask=True):
        """
        Returns
        -------
        (start_date, end_date)
        """
        self.store.open()
        start_date = self.store.select(self.table, start=0, stop=1).index[0]
        end_date = self.store.select(self.table, start=-1).index[0]
        self.store.close()
        if apply_mask:
            start_date, end_date = self.restrict_start_and_end_dates(start_date, end_date)
        return start_date, end_date
    def _get_storer(self):
        """Caller must close store."""
        self.store.open()
        storer = self.store.get_storer(self.table)
        return storer

    def _check_key(self, key):
        self.store.open()
        keys = self.store.keys()
        self.store.close()
        if key not in keys:
            raise KeyError(key + ' not in store')
# SIMPLE TESTS:
ds = HDFTableStore('store.h5', '/building1/utility/electric/meter1', end_date='2012-01-04')
print('columns =', ds.column_names)
print('date range (with region of interest applied) = \n', ds.date_range())
print('date range (with region of interest lifted) = \n', ds.date_range(apply_mask=False))
print('number of rows (ROI applied) =', ds.nrows())
print('number of rows (ROI lifted) =', ds.nrows(apply_mask=False))
print('estimated memory requirement for all data = {:.1f} MBytes'
      .format(ds.estimate_memory_requirement() / 1E6))
ds.load(start_date='2012-01-01 00:00:00',
        end_date='2012-01-01 00:00:05',
        cols=[('power', 'active')])
for chunk in ds.get_generator([("2012-01-01", "2012-01-02"), ("2012-01-02", "2012-01-03")]):
    print('start = ', chunk.index[0], '; end =', chunk.index[-1])
In [ ]:
class TableCache(TableStore):
    """
    Neither TableStore nor Meter holds any data in memory persistently so,
    without a TableCache, the following types of operation would be slow:
    * doing multiple steps of pre-processing.
    * doing multiple stats (imagine a 1GB file: it would be very slow to
      re-load it from disk every time you want to do another stat).
    """
    def __init__(self, table_store):
        self.table_store = table_store
        self.dataframe = None

    def get_generator(self):
        """Checks to see if any data requested is already in self.dataframe."""
        pass
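A hypothetical sketch of how that check might work, written as a free function so the stub above stays intact (the date-comparison logic and the single-chunk cache policy are assumptions, and deliberately naive):

def get_generator_sketch(cache, periods, cols=None):
    # Hypothetical: serve a requested period from cache.dataframe when it
    # is already loaded; otherwise fall back to the underlying TableStore.
    for start_date, end_date in periods:
        df = cache.dataframe
        if (df is not None
                and df.index[0] <= pd.Timestamp(start_date)
                and df.index[-1] >= pd.Timestamp(end_date)):
            yield df.loc[start_date:end_date]
        else:
            chunk = cache.table_store.load(cols, start_date, end_date)
            cache.dataframe = chunk  # naive: remember only the latest chunk
            yield chunk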
In [ ]:
class Meter(object):
    def __init__(self, table_store):
        self.table_store = table_store

    def good_chunks(self):
        """Returns list of (start_date, end_date) of chunks where the sample
        period never exceeds self.max_sample_period.
        Caches it to self.table_store.metadata['good_chunks'].
        The table store makes sure that metadata refers to the crop period.
        """
        pass

    def dropout_rate_per_period(self):
        chunks = self.table_store.get_generator(period_alias='D')
        dropout_per_period = []
        for chunk in chunks:
            pass  # TODO

    def dropout_rate(self, ignore_gaps):
        chunk_boundaries = self.good_chunks() if ignore_gaps else None
        chunks = self.table_store.get_generator(chunk_boundaries)
        n_expected_samples = 0
        n_actual_samples = 0
        for chunk in chunks:
            n_actual_samples += len(chunk)
            n_expected_samples += (chunk.index[-1] - chunk.index[0]).total_seconds() / self.sample_period
        dropout_rate_ = 1 - (n_actual_samples / n_expected_samples)
        assert 0 <= dropout_rate_ <= 1, 'dropout_rate is not in [0,1]. It is ' + str(dropout_rate_)
        return dropout_rate_
    def apply_processing_funcs(self, funcs, destination=None):
        """
        Parameters
        ----------
        funcs : list of functions
            e.g. [normalise, bookend_good_chunks_with_zeros, remove_implausible_values]
        destination : table_store, defaults to source
        """
        if destination is None:
            destination = self.table_store
        chunks = self.table_store.get_generator()
        for chunk in chunks:
            for func in funcs:
                chunk = func(self, chunk)
            destination.write(chunk)
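None of the processing functions named in that docstring are defined in this notebook. As one hypothetical example, remove_implausible_values might mask readings outside a plausible range (the bounds below are pure assumptions):

def remove_implausible_values(meter, chunk):
    # Hypothetical pre-processing function: replace readings outside a
    # plausible range with NaN.  The limits are illustrative only.
    LOWER, UPPER = 0, 20000  # e.g. watts
    return chunk.where((chunk >= LOWER) & (chunk <= UPPER))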