Data consolidation

Big task: move all the development data sources that have been tried so far to AngrySnail aka BrashTapir, stack them, format them, resample and reindex, and stuff it all into a couple of HDF5 files for archiving. Then clean those up with the ptrepack tool from PyTables. Plot, and shove a png to BuddingSepiida.
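For the cleanup step, something along these lines should do; ptrepack rewrites the file with the given compression settings (file names here are placeholders):

!ptrepack --complevel=9 --complib=blosc --chunkshape=auto \
    ../local/db/telemetry.h5 ../local/db/telemetry_packed.h5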

Data sources

So far there are only two nodes: bed and chuck.

Bed

Initially recorded on ToastyTamandua, now on RandyDolphin.

  • markI.csv
  • markII.csv
  • bed.csv
  • bed_2016-03-26.csv
  • 2016_w13-bed.csv

Chuck

Recorded on VersedSquid. The sqlite database contains sensor ids that should be taken into account when building the category codes that convert sensor name strings into integer ids.

The initial data needs to be resampled to something reasonable: originally it was sampled at 1 s intervals, and downsampling to 1 min should be fine and in line with how the new sampling works (see the sketch after the file list).

  • telemetry.db
  • chuck_2016-03-28.csv
  • 2016_w13-chuck.csv
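A minimal sketch of that 1 s to 1 min downsampling, with a synthetic series standing in for the real data; the loaders below use the same resample/ffill pattern:

import numpy as np
import pandas as pd

# one hour of fake 1 s samples
s = pd.Series(np.random.randn(3600),
              index=pd.date_range('2016-03-28', periods=3600, freq='1s'))
s_1min = s.resample('1min').ffill()  # or .mean() to average within each bin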

Reference format

Simple three-column structure:

timestamp (unix[s], datetime64[ns]) | sensor_id (int) | value (float)
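As a quick illustration, a frame in this reference format can be constructed like so (synthetic values; the unix-seconds timestamps become the datetime64[ns] index):

import numpy as np
import pandas as pd

ts = pd.to_datetime([1459123200, 1459123260], unit='s')  # unix[s] -> datetime64[ns]
ref = pd.DataFrame({'sensor_id': np.array([6, 6], dtype=np.uint8),
                    'value': np.array([466.0, 470.5], dtype=np.float32)},
                   index=ts)
print(ref)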

TBD

  • Store values as float16 or float32? See the precision of float16 (quick check below). I'd save ~6 bytes per value, uncompressed, vs. storing a double. Worst case for raw Arduino values is ±0.5 at 512 and above. Not bad. But by storing at least 32 bit I lose a lot less information, at a 2 B/row cost, and I'm future-proof for higher-range input without needing to normalize.
    • NEVERMIND: Resampling with bfill or ffill does not work for float16. :D
  • storage format of the sensor_id table (JSON, HDF5 metadata? see the attrs sketch after the store step below)
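Quick standalone check of the float16 spacing claim:

import numpy as np

print(np.finfo(np.float16).resolution)    # ~1e-3 relative precision
print(np.spacing(np.float16(512.0)))      # 0.5: gap between neighbours in [512, 1024)
print(np.float16(517.3))                  # rounds to 517.5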

In [1]:
import os
import numpy as np
import pandas as pd
import sqlite3
import time
from datetime import datetime as dt
import matplotlib.pyplot as plt
import matplotlib.lines as mpl_lines
%matplotlib inline

In [96]:
DTYPE_VAL = np.float32
DTYPE_CAT = np.uint8
FS_VAL = '1min'
nodes = {'chuck': 0, 'bed': 1}
sensors = {0: {'temp': 1, 'light': 2, 'soil':3, 'dummy': 4, 'ds_temp':5},
           1: {'strain': 6, 'temp': 7, 'motion': 8}}
# needed because sensor names overlap between nodes; the [b/c]x entries are debugging placeholders.
cats = {0: ['c0', 'temp', 'light', 'soil', 'c4', 'ds_temp', 'c6',  'c7'],
        1: ['b0', 'b1', 'b2', 'b3', 'b4', 'b5', 'strain',  'temp',  'motion']}
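For illustration, this is how the cats lists turn sensor name strings into the integer ids above ('temp' maps to 1 on chuck but 7 on bed); same old-pandas astype('category', categories=...) call as in the loaders further down, reusing the definitions from this cell:

print(pd.Series(['temp', 'light']).astype('category', categories=cats[0]).cat.codes.tolist())  # [1, 2]
print(pd.Series(['temp']).astype('category', categories=cats[1]).cat.codes.tolist())           # [7]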

B1 markI.csv

  • strain sensor only
  • no header, two data columns (min, max) per timestamp

--> only a few hours of noisy, high-Fs baseline data. I'd say I'll leave this one out.


In [3]:
def load_b1(fpath='../local/db/consolidation/markI.csv', nid=1):
    df = pd.read_csv(fpath, names=['timestamp', 'wmin', 'wmax'],
                     dtype={'timestamp': np.float64, 'wmin': DTYPE_VAL, 'wmax': DTYPE_VAL})
    df.set_index(df.timestamp.multiply(1e9).astype('datetime64[ns]'), inplace=True)

    df = pd.DataFrame({'sid': sensors[nid]['strain'], 'nid': nid,
                       'value': df[["wmin", "wmax"]].mean(axis=1).resample(FS_VAL).ffill()})
    return df

%timeit load_b1()
print load_b1().tail(3)


100 loops, best of 3: 7.03 ms per loop
                     nid  sid  value
timestamp                           
2016-03-11 20:56:00    1    6  466.0
2016-03-11 20:57:00    1    6  470.0
2016-03-11 20:58:00    1    6  470.5

B2 markII.csv

  • !! INVALID CHARACTERS !!
  • same as markI.csv
  • no header

In [4]:
def load_b2(fpath='../local/db/consolidation/markII.csv', nid=1):
    df = pd.read_csv(fpath, names=['timestamp', 'wmin', 'wmax'],
                     dtype={'timestamp': np.float64, 'wmin': DTYPE_VAL, 'wmax': DTYPE_VAL})
    df.set_index(df.timestamp.multiply(1e9).astype('datetime64[ns]'), inplace=True)
    
    df = pd.DataFrame({'sid': sensors[nid]['strain'],
                       'nid': nid,
                       'value': df[["wmin", "wmax"]].mean(axis=1).resample(FS_VAL).ffill()})
    return df

%timeit load_b2()
print load_b2().tail(3)


10 loops, best of 3: 42.1 ms per loop
                     nid  sid  value
timestamp                           
2016-03-16 22:06:00    1    6  487.5
2016-03-16 22:07:00    1    6  488.5
2016-03-16 22:08:00    1    6  488.5

B3 bed.csv

  • strain min and max recorded as separate sensor types (strain_hi, strain_lo)
  • temp added
  • no header

In [5]:
def load_b3(fpath='../local/db/consolidation/bed.csv', nid=1):
    df = pd.read_csv(fpath, names=['timestamp', 'sensor_str', 'value'],
                     dtype={'timestamp': np.float64, 'sensor_str': str, 'value': DTYPE_VAL})
    df.set_index(df.timestamp.multiply(1e9).astype('datetime64[ns]'), inplace=True)

    temp = df.value.loc[df.sensor_str == 'temp'].resample(FS_VAL).ffill()
    strain = pd.concat([df.value.loc[df.sensor_str == 'strain_hi'],
                        df.value.loc[df.sensor_str == 'strain_lo']],
                        axis=1).mean(axis=1).resample(FS_VAL).ffill()
    df = pd.concat([pd.DataFrame({'sid': sensors[nid]['temp'],
                                   'nid': nid,
                                   'value': temp}),
                     pd.DataFrame({'sid': sensors[nid]['strain'],
                                   'nid': nid,
                                   'value': strain})])
    return df

%timeit load_b3()
print load_b3().tail(3)


10 loops, best of 3: 64.9 ms per loop
                     nid  sid       value
timestamp                                
2016-03-25 22:42:00    1    6  482.149994
2016-03-25 22:43:00    1    6  482.200012
2016-03-25 22:44:00    1    6  481.500000

B4 bed_2016-03-26.csv

  • binary motion events added
  • simple strain values, 5 s averages
  • no header

In [6]:
def load_b4(fpath='../local/db/consolidation/bed_2016-03-26.csv', nid=1):
    df = pd.read_csv(fpath, names=['timestamp', 'sensor_str', 'value'],
                     dtype={'timestamp': np.float64, 'sensor_str': str, 'value': DTYPE_VAL})
    df.set_index(df.timestamp.multiply(1e9).astype('datetime64[ns]'), inplace=True)
    
    temp = df.value.loc[df.sensor_str == 'temp'].resample(FS_VAL).ffill()
    strain = df.value.loc[df.sensor_str == 'strain'].resample(FS_VAL).ffill()
    motion = df.value.loc[df.sensor_str == 'motion']
    df = pd.concat([pd.DataFrame({'sid': sensors[nid]['temp'], 'nid': nid, 'value': temp}),
                    pd.DataFrame({'sid': sensors[nid]['strain'], 'nid': nid, 'value': strain}),
                    pd.DataFrame({'sid': sensors[nid]['motion'], 'nid': nid, 'value': motion})])
    return df

%timeit load_b4()
print load_b4().tail(3)


10 loops, best of 3: 24.7 ms per loop
                               nid  sid  value
timestamp                                     
2016-03-28 14:59:29.072352000    1    8    0.0
2016-03-28 14:59:32.152777984    1    8    1.0
2016-03-28 14:59:52.983996160    1    8    0.0

B5 2016_w13-bed.csv

  • with header

In [7]:
def load_b5(fpath='../local/db/consolidation/2016_w13-bed.csv', nid=1):
    df = pd.read_csv(fpath, names=['timestamp', 'sensor_str', 'value'], skiprows=1,
                     dtype={'timestamp': np.float64, 'sensor_str': str, 'value': DTYPE_VAL})
    df.set_index(df.timestamp.multiply(1e9).astype('datetime64[ns]'), inplace=True)
    
    temp = df.value.loc[df.sensor_str == 'temp']
    strain = df.value.loc[df.sensor_str == 'strain'].resample(FS_VAL).ffill()
    motion = df.value.loc[df.sensor_str == 'motion']
    df = pd.concat([pd.DataFrame({'sid': sensors[nid]['temp'], 'nid': nid, 'value': temp}),
                    pd.DataFrame({'sid': sensors[nid]['strain'], 'nid': nid, 'value': strain}),
                    pd.DataFrame({'sid': sensors[nid]['motion'], 'nid': nid, 'value': motion})])
    return df

%timeit load_b5()
print load_b5().tail(3)


100 loops, best of 3: 18.4 ms per loop
                               nid  sid  value
timestamp                                     
2016-03-30 14:54:27.224745984    1    8    0.0
2016-03-30 16:40:03.013702912    1    8    0.0
2016-03-30 19:05:10.785732864    1    8    0.0

Gather chuck telemetry

C1 telemetry.db

  • sqlite3 database
  • high sampling rate
  • light, temp, soil, later ds_temp
  • temp influenced by sun incidence
  • has sensor meta data

In [8]:
def load_c1(fpath='../local/db/consolidation/telemetry.db', nid=0):
    with sqlite3.connect(fpath) as con:
        df = pd.read_sql_query('SELECT timestamp, type, value FROM telemetry', con)
    df.set_index(df.timestamp.multiply(1e9).astype('datetime64[ns]'), inplace=True)
    df.columns = ['timestamp', 'sensor_str', 'value']

    # the type column in the db already stores integer sensor ids, hence the
    # comparison against sensors[nid][...] rather than against name strings
    temp = df.value.loc[df.sensor_str == sensors[nid]['temp']].resample(FS_VAL).ffill()
    ds_temp = df.value.loc[df.sensor_str == sensors[nid]['ds_temp']].resample(FS_VAL).ffill()
    light = df.value.loc[df.sensor_str == sensors[nid]['light']].resample(FS_VAL).ffill()
    soil = df.value.loc[df.sensor_str == sensors[nid]['soil']].resample('30min').ffill()  # slow signal, coarser rate

    df = pd.concat([pd.DataFrame({'sid': sensors[nid]['temp'], 'nid': nid, 'value': temp}),
                    pd.DataFrame({'sid': sensors[nid]['ds_temp'], 'nid': nid, 'value': ds_temp}),
                    pd.DataFrame({'sid': sensors[nid]['light'], 'nid': nid, 'value': light}),
                    pd.DataFrame({'sid': sensors[nid]['soil'], 'nid': nid, 'value': soil})])
    return df

%timeit load_c1()
print load_c1().tail(3)


1 loop, best of 3: 2.41 s per loop
                     nid  sid   value
timestamp                            
2016-03-28 00:00:00    0    3  440.90
2016-03-28 00:30:00    0    3  448.65
2016-03-28 01:00:00    0    3  439.90

C2 chuck_2016-03-28.csv

  • new sampling paradigm
  • ds_temp only, dropped temp for now
  • no header

In [9]:
def load_c2(fpath='../local/db/consolidation/chuck_2016-03-28.csv', nid=0):
    df = pd.read_csv(fpath, names=['timestamp', 'sensor_str', 'value'],
                     dtype={'timestamp': np.float64, 'sensor_str': str, 'value': DTYPE_VAL})
    df.set_index(df.timestamp.multiply(1e9).astype('datetime64[ns]'), inplace=True)
    
    ds_temp = df.value.loc[df.sensor_str == 'ds_temp']
    light = df.value.loc[df.sensor_str == 'light']
    soil = df.value.loc[df.sensor_str == 'soil']

    df = pd.concat([pd.DataFrame({'sid': sensors[nid]['ds_temp'], 'nid': nid, 'value': ds_temp}),
                    pd.DataFrame({'sid': sensors[nid]['light'], 'nid': nid, 'value': light}),
                    pd.DataFrame({'sid': sensors[nid]['soil'], 'nid': nid, 'value': soil})])
    return df

%timeit load_c2()
print load_c2().tail(3)


100 loops, best of 3: 4.75 ms per loop
                               nid  sid       value
timestamp                                          
2016-03-28 19:29:47.512306944    0    3  382.670013
2016-03-28 19:59:45.860630016    0    3  384.000000
2016-03-28 20:29:44.215078912    0    3  384.329987

C3 2016_w13-chuck.csv

  • with header

In [24]:
def load_c3(fpath='../local/db/consolidation/2016_w13-chuck.csv', nid=0):
    df = pd.read_csv(fpath, names=['timestamp', 'sensor_str', 'value'], skiprows=1,
                     dtype={'timestamp': np.float64, 'sensor_str': str, 'value': DTYPE_VAL})
    df.set_index(df.timestamp.multiply(1e9).astype('datetime64[ns]'), inplace=True)
    
    ds_temp = df.value.loc[df.sensor_str == 'ds_temp']
    light = df.value.loc[df.sensor_str == 'light']
    soil = df.value.loc[df.sensor_str == 'soil']

    df = pd.concat([pd.DataFrame({'sid': sensors[nid]['ds_temp'], 'nid': nid, 'value': ds_temp}),
                    pd.DataFrame({'sid': sensors[nid]['light'], 'nid': nid, 'value': light}),
                    pd.DataFrame({'sid': sensors[nid]['soil'], 'nid': nid, 'value': soil})])
    return df

%timeit load_c3()
print load_c3().tail(3)


100 loops, best of 3: 5.41 ms per loop
                               nid  sid       value
timestamp                                          
2016-03-30 23:45:13.574208000    0    3  139.330002
2016-03-31 00:15:11.962473216    0    3  134.000000
2016-03-31 00:45:10.349884928    0    3  130.330002

Joining everything up


In [142]:
# load_b1 is left out here, as decided above
df = pd.concat([load_b2(), load_b3(), load_b4(), load_b5(), load_c1(), load_c2(), load_c3()])
df.sid = df.sid.astype(DTYPE_CAT)
df.nid = df.nid.astype(DTYPE_CAT)
df.value = df.value.astype(DTYPE_VAL)
df.sort_index(inplace=True)
print df.memory_usage(index=True)
print df.dtypes


Index    1686944
nid       210868
sid       210868
value     843472
dtype: int64
nid        uint8
sid        uint8
value    float32
dtype: object

Store DataFrame to disk


In [143]:
!rm -f ../local/db/telemetry.h5  # start fresh so the store file doesn't grow across runs
def store_df(df, fpath='../local/db/telemetry.h5'):
    with pd.HDFStore(fpath, complevel=9, complib='blosc') as store:
        store.put('data', df, format='table', data_columns=['nid', 'sid'])
store_df(df)
print os.path.getsize('../local/db/telemetry.h5')/2**10, 'KiB'


1721 KiB
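Regarding the sensor_id table TBD above: one option is to attach the mapping to the table's storer attributes inside the same HDF5 file. A sketch; PyTables pickles arbitrary attribute values, so a plain dict works, and the sensor_map attribute name is arbitrary:

with pd.HDFStore('../local/db/telemetry.h5') as store:
    store.get_storer('data').attrs.sensor_map = sensors  # stored as a pickled attribute
    print(store.get_storer('data').attrs.sensor_map)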

In [117]:
# read-back timings on the table-format file (data columns nid, sid):
# ~200 ms: pd.read_hdf('../local/db/consolidation/test.h5', 'data', where='sid==1')
# ~750 ms: pd.read_hdf('../local/db/consolidation/test.h5', 'data', where='sid==[1, 2, 3]')
#  ~20 ms: pd.read_hdf('../local/db/consolidation/test.h5', 'data')
#   ~5 ms: df.loc[df.nid == 0]
# ~135 ms: pd.read_hdf('../local/db/consolidation/test.h5', 'data', where='nid==1')
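Since the index of a table-format store is queryable alongside the data columns, where clauses can also combine node/sensor ids with time ranges; a sketch:

sub = pd.read_hdf('../local/db/telemetry.h5', 'data',
                  where='nid == 0 & index >= "2016-03-28"')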

Updating from csv sources

  • rsync new data over
  • read both node csvs into DataFrames
  • look up the last timestamp per node in the HDF5 file
  • append the delta to the HDF5 file
  • check that the overlapping time frames are identical in csv and hdf (sketch at the end of this section)

In [134]:
DTYPE_VAL = np.float32
DTYPE_CAT = np.uint8
nodes = {'chuck': 0, 'bed': 1}
sensors = {0: {'temp': 1, 'light': 2, 'soil':3, 'dummy': 4, 'ds_temp':5},
           1: {'strain': 6, 'temp': 7, 'motion': 8}}
# needed because sensor names overlap between nodes; the [b/c]x entries are debugging placeholders.
cats = {0: ['c0', 'temp', 'light', 'soil', 'c4', 'ds_temp', 'c6',  'c7'],
        1: ['b0', 'b1', 'b2', 'b3', 'b4', 'b5', 'strain',  'temp',  'motion']}

def nid_from_filename(fname):
    return nodes[os.path.basename(fname).split('-')[-1].split('.')[0]]

def load_node_csv(fpath, nid=None):
    nid = nid if nid is not None else nid_from_filename(fpath)
    df = pd.read_csv(fpath, names=['timestamp', 'sensor_str', 'value'], skiprows=1,
                     dtype={'timestamp': np.float64, 'sensor_str': str, 'value': DTYPE_VAL})
    df.set_index(df.timestamp.multiply(1e9).astype('datetime64[ns]'), inplace=True)

    sid = df.sensor_str.astype('category', categories=cats[nid]).cat.codes.astype(DTYPE_CAT)
    return pd.DataFrame({'sid': sid, 'nid': np.array(nid, dtype=np.uint8), 'value': df.value})

def load_hdf(fpath):
    return pd.read_hdf(fpath, 'data')

def last_samples_hdf(df):
    df.sort_index(inplace=True)
    assert df.index.is_monotonic
    # last timestamp per node, keyed by nid
    return {nid: grp.index[-1] for nid, grp in df.groupby('nid')}

In [138]:
!rsync -avh VersedSquid:~/code/telemetry/local/db/2016_w13-chuck.csv ../local/db/
!rsync -avh RandyDolphin:~/code/telemetry/local/db/2016_w13-bed.csv ../local/db/
    
last = last_samples_hdf(load_hdf('../local/db/testing.h5'))
node_data = [load_node_csv('../local/db/2016_w13-chuck.csv'), load_node_csv('../local/db/2016_w13-bed.csv')]
df = pd.concat([node_data[n][node_data[n].index > last[n]] for n in range(len(node_data))]).sort_index()
print df.shape[0], "new rows to be appended!"
with pd.HDFStore('../local/db/testing.h5') as store:
    store.append('data', df, append=True)


receiving incremental file list
2016_w13-chuck.csv

sent 1.95K bytes  received 2.88K bytes  3.22K bytes/sec
total size is 223.39K  speedup is 46.26
receiving incremental file list
2016_w13-bed.csv

sent 5.37K bytes  received 7.22K bytes  25.17K bytes/sec
total size is 784.13K  speedup is 62.30
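The last checklist item, verifying that csv and hdf agree on the overlapping window, isn't implemented above. A minimal sketch, reusing load_node_csv from the cell before; verify_overlap and its window parameter are made up for illustration:

def verify_overlap(fpath_csv, fpath_hdf='../local/db/testing.h5', window='2h'):
    # compare the last `window` of a node's csv against the HDF5 contents
    csv_df = load_node_csv(fpath_csv)
    start = csv_df.index[-1] - pd.Timedelta(window)
    nid = int(csv_df.nid.iloc[0])
    hdf_df = pd.read_hdf(fpath_hdf, 'data',
                         where='nid == %d & index >= "%s"' % (nid, start))
    return csv_df[csv_df.index >= start].equals(hdf_df)

print(verify_overlap('../local/db/2016_w13-chuck.csv'))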