In [11]:
from __future__ import print_function, division
import pandas as pd
import numpy as np
import itertools, os, time

print("Pandas version = ", pd.__version__)
print("Numpy version =  ", np.__version__)


Pandas version =  0.13.1
Numpy version =   1.8.1

Create some toy data


In [16]:
N_PERIODS = int(1E4)
N_METERS = 5
N_MEASUREMENTS_PER_METER = 3

meters = ['meter{:d}'.format(i) for i in range(1,N_METERS+1)]
meters = [[m]*N_MEASUREMENTS_PER_METER for m in meters]
flatten_2d_list = lambda lst: list(itertools.chain(*lst))
meters = flatten_2d_list(meters)
level2 = ['power', 'power', 'voltage'][:N_MEASUREMENTS_PER_METER] * N_METERS
level3 = ['active', 'reactive', ''][:N_MEASUREMENTS_PER_METER] * N_METERS


columns = [meters, level2, level3]
columns = pd.MultiIndex.from_arrays(columns)
rng = pd.date_range('2012-01-01', freq='S', periods=N_PERIODS)
data = np.random.randint(low=0, high=1000, 
                         size=(N_PERIODS, 
                               N_METERS*N_MEASUREMENTS_PER_METER))
df = pd.DataFrame(data=data, index=rng, columns=columns, dtype=np.float32)

df.ix[:3]


Out[16]:
                         meter1                     meter2                     meter3                     meter4                     meter5
                       power    power  voltage    power    power  voltage    power    power  voltage    power    power  voltage    power    power  voltage
                      active reactive            active reactive            active reactive            active reactive            active reactive
2012-01-01 00:00:00       54      778      352      312      585      525      403      954      688      937      213      374      681      317      171
2012-01-01 00:00:01      469      868      815      603      781      388      417      960       99      701      628      393      488      133       46
2012-01-01 00:00:02      305      399       28      399      622      155      554      936      492      487      188      730      340      154      650

3 rows × 15 columns
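
The three column levels make it easy to pull out a single meter, a single measurement, or one measurement type across every meter.  The next cell is only an illustrative sketch of a few ways to index into the MultiIndex columns (it was not part of the original run):


In [ ]:
# Illustrative only: slicing the 3-level column MultiIndex.
df['meter1'].head()                         # one meter, all its measurements
df[('meter1', 'power', 'active')].head()    # a single column as a Series
df.xs('voltage', axis=1, level=1).head()    # voltage across all meters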

Save the data to an HDF5 file


In [17]:
envi_r = {'manufacturer': 'Current Cost', 
          'name': 'EnviR', 
          'sample_period': 6}

filename = 'random.h5'
store = pd.HDFStore(filename, 'w', complevel=9, complib='bzip2')
# store.root._v_attrs.meter_types = {envi_r['model']: envi_r}
for meter in df.columns.levels[0]:
    print("Saving " + meter)
    key = 'building1/electric/' + meter
    store.put(key, df[meter], format='table')
    store.get_storer(key).attrs.metadata = {'model': envi_r['name'], 'submeter_of': 1}
    print(store.get_storer(key).attrs.metadata)
print(store)

# Save dataset-wide metadata
# store._handle.create_group('/', 'nilm_metadata', 'NILM Metadata')
# store.root.nilm_metadata._v_attrs.meter_types = {envi_r['model']: envi_r}

store.root._v_attrs.dataset = {'meter_models': {envi_r['name']: envi_r}}

store.close()


Saving meter1
{'model': 'EnviR', 'submeter_of': 1}
Saving meter2
{'model': 'EnviR', 'submeter_of': 1}
Saving meter3
{'model': 'EnviR', 'submeter_of': 1}
Saving meter4
{'model': 'EnviR', 'submeter_of': 1}
Saving meter5
{'model': 'EnviR', 'submeter_of': 1}
<class 'pandas.io.pytables.HDFStore'>
File path: random.h5
/building1/electric/meter1            frame_table  (typ->appendable,nrows->10000,ncols->3,indexers->[index])
/building1/electric/meter2            frame_table  (typ->appendable,nrows->10000,ncols->3,indexers->[index])
/building1/electric/meter3            frame_table  (typ->appendable,nrows->10000,ncols->3,indexers->[index])
/building1/electric/meter4            frame_table  (typ->appendable,nrows->10000,ncols->3,indexers->[index])
/building1/electric/meter5            frame_table  (typ->appendable,nrows->10000,ncols->3,indexers->[index])
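
The commented-out lines above sketch an alternative home for dataset-wide metadata: a dedicated /nilm_metadata group rather than the root node's attributes.  Spelled out, that alternative might look like the cell below (illustrative only, not part of the original run; note it uses envi_r['name'] because the dict has no 'model' key):


In [ ]:
# Illustrative alternative: keep dataset-wide metadata on a dedicated group.
store = pd.HDFStore(filename, 'a')
group = store._handle.create_group('/', 'nilm_metadata', 'NILM Metadata')
group._v_attrs.meter_types = {envi_r['name']: envi_r}
store.close()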

In [18]:
filename = 'random.h5'
store = pd.HDFStore(filename, 'r')
print(store['building1/electric/meter1'].ix[:5])
print(store.get_storer('building1/electric/meter1').attrs.metadata)
print(store.get_storer('building1/electric/meter2').attrs.metadata)

print(store.root._v_attrs.dataset['meter_models'])

store.close()


                      power            voltage
                     active  reactive         
2012-01-01 00:00:00      54       778      352
2012-01-01 00:00:01     469       868      815
2012-01-01 00:00:02     305       399       28
2012-01-01 00:00:03     478       457      926
2012-01-01 00:00:04     885        95      396

[5 rows x 3 columns]
{'model': 'EnviR', 'submeter_of': 1}
{'model': 'EnviR', 'submeter_of': 1}
{'EnviR': {'manufacturer': 'Current Cost', 'name': 'EnviR', 'sample_period': 6}}
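
Because each meter node was written with format='table' (shown as typ->appendable in the store listing above), new readings can be appended later and queried with where terms, without rewriting the file.  A minimal sketch, not part of the original run; the timestamps simply carry on from the last row written above:


In [ ]:
# Illustrative sketch: append another 100 readings to meter1.
new_rng = pd.date_range('2012-01-01 02:46:40', freq='S', periods=100)
new_data = np.random.randint(0, 1000, size=(100, 3)).astype(np.float32)
new_df = pd.DataFrame(new_data, index=new_rng, columns=df['meter1'].columns)

store = pd.HDFStore(filename, 'a')
store.append('building1/electric/meter1', new_df)
store.close()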

In [5]:
class TableStore(object):
    """Provides a common interface to all physical data stores.  
    Supports hierarchical stores.  Only reads data.  Reads and writes metadata.
    e.g. one HDF5 file, or one Xively feed, or one REDD data set etc.
    There could be subclasses for HDF5, Xively feeds,
    Current Cost meters, OpenEnergyMonitor, rfm_ecomanager,
    REDD, UKPD, etc. Or MetOffice XLS data, 
    weather feeds from the internet etc.
    
    Only opens the underlying data source when absolutely necessary 
    (by minimising the time the data source is open, we can minimise
    the chance of data being corrupted if another process changes it).
    """
    pass

MAX_MEM_ALLOWANCE_IN_BYTES = 1E9

def date_range_to_terms(start_date=None, end_date=None):
    """Build HDFStore query terms.  The term strings reference the names
    `start_date` and `end_date`; pandas resolves them from the scope in
    which `HDFStore.select` is eventually called."""
    terms = []
    if start_date is not None:
        terms.append("index>=start_date")
    if end_date is not None:
        terms.append("index<end_date")
    return terms

class HDFTableStore(TableStore):
    def __init__(self, filename, table, start_date=None, end_date=None):
        """
        Parameters
        ----------
        filename : string
        table : string
            e.g. '/building1/utility/electric/meter1'
        start_date, end_date : string or pd.Timestamp, optional
            Defines a 'mask' applied to all operations on this object.
        """
        self.store = pd.HDFStore(filename, 'r')
        self.store.close()
        self._check_key(table)
        self.table = table
        self.start_date = pd.Timestamp(start_date) if start_date else None
        self.end_date = pd.Timestamp(end_date) if end_date else None
               
    def load(self, cols=None, start_date=None, end_date=None):
        """
        Parameters
        ----------
        cols : list or 'index', optional
            e.g. [('power', 'active'), ('power', 'reactive'), ('voltage', '')]
            if not provided then will return all columns from the table.
        start_date, end_date : string or pd.Timestamp, optional
            defines the time period to load as the range [start_date, end_date)
            
        Returns
        -------
        if `cols=='index'` then returns a pd.DatetimeIndex
        else returns a pd.DataFrame
        """
        start_date, end_date = self.restrict_start_and_end_dates(start_date, end_date)
        
        # Create list of query terms
        terms = date_range_to_terms(start_date, end_date)
        if cols is not None:
            if not self.table_has_column_names(cols):
                raise KeyError('at least one of ' + str(cols) + ' is not a valid column')
            terms.append("columns==cols")
        if terms == []:
            terms = None
        
        # Check we won't use too much memory
        mem_requirement = self.estimate_memory_requirement(cols, start_date, end_date)
        if mem_requirement > MAX_MEM_ALLOWANCE_IN_BYTES:
            raise MemoryError('Requested data would use too much memory.')
        
        # Read data
        self.store.open()
        data = self.store.select(key=self.table, where=terms, auto_close=True)
        if cols == 'index':
            data = data.index
        return data
    
    def table_has_column_names(self, cols):
        """
        Parameters
        ----------
        cols : string or list of strings
        
        Returns
        -------
        boolean
        """
        if isinstance(cols, str):
            cols = [cols]
        query_cols = set(cols)
        table_cols = set(self.column_names + ['index'])
        return query_cols.issubset(table_cols)
    
    def get_generator(self, periods=None, cols=None):
        """
        Parameters
        ----------
        periods : list of (start_date, end_date) tuples, optional
            e.g. [("2013-01-01", "2013-02-01"), ("2013-02-01", "2013-03-01")]
            
        """
        
        # TODO: this would be much more efficient
        # if we first got row indices for each period,
        # then checked each period will fit into memory,
        # and then iterated over the row indices.
        if periods is None:
            periods = [self.date_range()]
        for start_date, end_date in periods:
            yield self.load(cols, start_date, end_date)
    
    def estimate_memory_requirement(self, cols=None, start_date=None, end_date=None, apply_mask=True):
        """Returns estimated mem requirement in bytes."""
        BYTES_PER_ELEMENT = 4
        BYTES_PER_TIMESTAMP = 8
        if cols is None:
            cols = self.column_names
        ncols = len(cols)
        nrows = self.nrows(start_date, end_date, apply_mask=apply_mask)
        est_mem_usage_for_data = nrows * ncols * BYTES_PER_ELEMENT
        est_mem_usage_for_index = nrows * BYTES_PER_TIMESTAMP
        return est_mem_usage_for_data + est_mem_usage_for_index
    
    @property
    def column_names(self):
        storer = self._get_storer()
        # non_index_axes is a list of (axis, labels) tuples; pull out the column labels
        col_names = storer.non_index_axes[0][1:][0]
        self.store.close()
        return col_names
    
    def nrows(self, start_date=None, end_date=None, apply_mask=True):
        if apply_mask:
            start_date, end_date = self.restrict_start_and_end_dates(start_date, end_date)
        if start_date or end_date:
            terms = date_range_to_terms(start_date, end_date)
            if terms == []:
                terms = None
            self.store.open()
            coords = self.store.select_as_coordinates(self.table, terms)
            nrows_ = len(coords)
        else:
            storer = self._get_storer()
            nrows_ = storer.nrows
            self.store.close()
        return nrows_
    
    def restrict_start_and_end_dates(self, start_date=None, end_date=None):
        if start_date:
            start_date = pd.Timestamp(start_date)
        if end_date:
            end_date = pd.Timestamp(end_date)
            
        if all([start_date, self.start_date]) and start_date < self.start_date:
            start_date = self.start_date
        elif start_date is None:
            start_date = self.start_date
        if all([end_date, self.end_date]) and end_date > self.end_date:
            end_date = self.end_date
        elif end_date is None:
            end_date = self.end_date
        return start_date, end_date
    
    def date_range(self, apply_mask=True):
        """
        Returns
        -------
        (start_date, end_date)
        """
        self.store.open()
        # Select just the first row (where given as row coordinates) and just
        # the last row (start=-1) to find the full date range of the table.
        start_date = self.store.select(self.table, [0]).index[0]
        end_date = self.store.select(self.table, start=-1).index[0]
        self.store.close()
        if apply_mask:
            start_date, end_date = self.restrict_start_and_end_dates(start_date, end_date)
        return start_date, end_date
    
    def _get_storer(self):
        """Caller must close store."""
        self.store.open()
        storer = self.store.get_storer(self.table)
        return storer
    
    def _check_key(self, key):
        self.store.open()
        keys = self.store.keys()
        self.store.close()
        if key not in keys:
            raise KeyError(key + ' not in store')
            
    
# SIMPLE TESTS:
ds = HDFTableStore('store.h5', '/building1/utility/electric/meter1', end_date='2012-01-04')
print('columns =', ds.column_names)
print('date range (with region of interest applied) = \n', ds.date_range())
print('date range (with region of interest lifted) = \n', ds.date_range(apply_mask=False))
print('number of rows (ROI applied) =', ds.nrows())
print('number of rows (ROI lifted) =', ds.nrows(apply_mask=False))
print('estimated memory requirement for all data = {:.1f} MBytes'
      .format(ds.estimate_memory_requirement() / 1E6))
ds.load(start_date='2012-01-01 00:00:00', 
        end_date='2012-01-01 00:00:05', 
        cols=[('power', 'active')])
for chunk in ds.get_generator([("2012-01-01", "2012-01-02"), ("2012-01-02", "2012-01-03")]):
    print('start = ', chunk.index[0], '; end =', chunk.index[-1])


columns = [('power', 'active'), ('power', 'reactive'), ('voltage', '')]
date range (with region of interest applied) = 
 (Timestamp('2012-01-01 00:00:00', tz=None), Timestamp('2012-01-04 00:00:00', tz=None))
date range (with region of interest lifted) = 
 (Timestamp('2012-01-01 00:00:00', tz=None), Timestamp('2012-01-12 13:46:39', tz=None))
number of rows (ROI applied) = 259200
number of rows (ROI lifted) = 1000000
estimated memory requirement for all data = 5.2 MBytes
start =  2012-01-01 00:00:00 ; end = 2012-01-01 23:59:59
start =  2012-01-02 00:00:00 ; end = 2012-01-02 23:59:59
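
The TableStore docstring commits to keeping the underlying file open for as short a time as possible.  One way to keep that policy in a single place would be a small context manager around HDFStore.open()/close(); the cell below is only a sketch of that idea (a hypothetical helper, not used by HDFTableStore above):


In [ ]:
from contextlib import contextmanager

@contextmanager
def open_store(store):
    """Open an HDFStore for the duration of a with-block, then close it."""
    store.open()
    try:
        yield store
    finally:
        store.close()

# e.g. instead of calling store.open()/close() by hand:
# with open_store(ds.store) as s:
#     print(s.get_storer(ds.table).nrows)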

In [ ]:
class TableCache(TableStore):
    """
    Neither TableStore nor Meter holds any data in memory persistently, so
    without a TableCache the following kinds of operation would be slow:
    * doing multiple steps of pre-processing
    * computing multiple stats (imagine a 1 GB file: it would be very slow
      to re-load it from disk every time you wanted another stat)
    """
    def __init__(self, table_store):
        self.table_store = table_store
        self.dataframe = None
        
    def get_generator(self):
        """Checks to see if any data requested is already in self.dataframe.
        """
        pass
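
As a very naive sketch of the caching idea (an assumption about the eventual design, not part of the original notebook): load the whole masked range once, then serve later requests by slicing the in-memory frame.


In [ ]:
# Illustrative sketch only: the simplest possible cache.
class NaiveTableCache(TableCache):
    def load(self, cols=None, start_date=None, end_date=None):
        if self.dataframe is None:
            # Cache the whole (masked) range the first time we are asked.
            self.dataframe = self.table_store.load()
        data = self.dataframe[start_date:end_date]   # slice rows from memory
        return data if cols is None else data[cols]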

In [ ]:
class Meter(object):
    
    def __init__(self, table_store):
        self.table_store = table_store
    
    def good_chunks(self):
        """Returns list of (start_date, end_date) of chunks where the sample
        period never exceeds self.max_sample_period.  
        Caches it to self.table_reader.metadata['good_chunks'].  
        Table_reader makes sure that metadata refers to the crop period.
        """
        pass
    
    def dropout_rate_per_period(self):
        # NB: get_generator() does not yet accept a `period_alias` argument;
        # this just sketches the intended interface.
        chunks = self.table_store.get_generator(period_alias='D')
        dropout_per_period = []
        for chunk in chunks:
            pass # TODO
    
    def dropout_rate(self, ignore_gaps):
        chunk_boundaries = self.good_chunks() if ignore_gaps else None
        chunks = self.table_store.get_generator(chunk_boundaries)
        n_expected_samples = 0
        n_actual_samples = 0
        for chunk in chunks:
            n_actual_samples += len(chunk)
            # +1 because a span of N sample periods contains N+1 samples
            n_expected_samples += 1 + ((chunk.index[-1] - chunk.index[0]).total_seconds() /
                                       self.sample_period)
        dropout_rate_ = 1 - (n_actual_samples / n_expected_samples)
        assert 0 <= dropout_rate_ <= 1, 'dropout_rate is not in [0,1].  It is ' + str(dropout_rate_)
        return dropout_rate_
        
    def apply_processing_funcs(self, funcs, destination=None):
        """
        Parameters
        ----------
        funcs : list of functions
            e.g. [normalise, bookend_good_chunks_with_zeros, remove_implausible_values]
        destination : table_store, defaults to source
        """
        if destination is None:
            destination = self.table_store
        chunks = self.table_store.get_generator()
        for chunk in chunks:
            for func in funcs:
                chunk = func(self, chunk)
            # Assumes the destination store exposes a write() method
            # (not implemented on TableStore above).
            destination.write(chunk)
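
A quick stand-alone check of the dropout arithmetic used in dropout_rate() (illustrative only; it uses a bare DatetimeIndex rather than a Meter):


In [ ]:
# 10 samples at 1 s span 9 seconds; drop 2 of them and dropout should be 0.2.
idx = pd.date_range('2012-01-01', freq='S', periods=10).delete([3, 7])
sample_period = 1
n_expected = 1 + (idx[-1] - idx[0]).total_seconds() / sample_period   # 10.0
n_actual = len(idx)                                                   # 8
print(1 - n_actual / n_expected)                                      # 0.2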