Big task: move all the development data sources tried so far over to AngrySnail (aka BrashTapir), stack them, format them, resample and reindex, and pack everything into a couple of HDF5 files for archiving. Then clean those files up with the ptrepack tool from PyTables. Plot, and push the PNGs to BuddingSepiida.
So far there are only two nodes: bed and chuck.
bed: recorded initially on ToastyTamandua, now on RandyDolphin.
chuck: recorded on VersedSquid. Its SQLite database stores sensor ids that need to be taken into account when building the codes for the categorical columns, i.e. when converting a sensor's string name into an integer id.
The initial data needs to be resampled to something reasonable: it was originally sampled at 1 s intervals, and downsampling to 1 min should be sufficient and in line with how the new sampling works.
Simple three-column structure (see the sketch after the imports below):
timestamp (unix [s], stored as datetime64[ns]) | sensor_id (int) | value (float)
In [1]:
import os
import numpy as np
import pandas as pd
import sqlite3
import time
from datetime import datetime as dt
import matplotlib.pyplot as plt
import matplotlib.lines as mpl_lines
%matplotlib inline
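A minimal sketch of that target layout, using hypothetical data sampled at 1 s and downsampled to 1 min the same way the loaders below do it:
# Sketch: hypothetical 1 s samples -> 1 min, shaped into the target columns.
ts = pd.date_range('2016-03-01', periods=600, freq='1s')
raw = pd.Series(np.random.rand(600), index=ts)
sketch = pd.DataFrame({'sid': np.uint8(1),   # placeholder sensor id
                       'nid': np.uint8(0),   # placeholder node id
                       'value': raw.resample('1min').ffill().astype(np.float32)})
print(sketch.dtypes)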
In [96]:
DTYPE_VAL = np.float32
DTYPE_CAT = np.uint8
FS_VAL = '1min'
nodes = {'chuck': 0, 'bed': 1}
sensors = {0: {'temp': 1, 'light': 2, 'soil': 3, 'dummy': 4, 'ds_temp': 5},
           1: {'strain': 6, 'temp': 7, 'motion': 8}}
# Needed because of the naming overlap between the sensors; the [b/c]x entries are
# debugging placeholders so that each name's position in its list equals the sensor id.
cats = {0: ['c0', 'temp', 'light', 'soil', 'c4', 'ds_temp', 'c6', 'c7'],
        1: ['b0', 'b1', 'b2', 'b3', 'b4', 'b5', 'strain', 'temp', 'motion']}
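The padded lists make each name's position equal its global sensor id, so the categorical codes can be used directly as sids. A quick check (sketch):
# 'temp'/'light'/'soil' on node 0 sit at positions 1/2/3, matching the sensors map.
print(pd.Categorical(['temp', 'light', 'soil'], categories=cats[0]).codes)
print(sensors[0]['temp'], sensors[0]['light'], sensors[0]['soil'])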
In [3]:
def load_b1(fpath='../local/db/consolidation/markI.csv', nid=1):
    df = pd.read_csv(fpath, names=['timestamp', 'wmin', 'wmax'],
                     dtype={'timestamp': np.float64, 'wmin': DTYPE_VAL, 'wmax': DTYPE_VAL})
    df.set_index(pd.to_datetime(df.timestamp, unit='s'), inplace=True)
    # Average the per-window min/max into a single strain reading.
    df = pd.DataFrame({'sid': sensors[nid]['strain'], 'nid': nid,
                       'value': df[["wmin", "wmax"]].mean(axis=1).resample(FS_VAL).ffill()})
    return df
%timeit load_b1()
print(load_b1().tail(3))
In [4]:
def load_b2(fpath='../local/db/consolidation/markII.csv', nid=1):
    df = pd.read_csv(fpath, names=['timestamp', 'wmin', 'wmax'],
                     dtype={'timestamp': np.float64, 'wmin': DTYPE_VAL, 'wmax': DTYPE_VAL})
    df.set_index(pd.to_datetime(df.timestamp, unit='s'), inplace=True)
    df = pd.DataFrame({'sid': sensors[nid]['strain'],
                       'nid': nid,
                       'value': df[["wmin", "wmax"]].mean(axis=1).resample(FS_VAL).ffill()})
    return df
%timeit load_b2()
print(load_b2().tail(3))
In [5]:
def load_b3(fpath='../local/db/consolidation/bed.csv', nid=1):
    df = pd.read_csv(fpath, names=['timestamp', 'sensor_str', 'value'],
                     dtype={'timestamp': np.float64, 'sensor_str': str, 'value': DTYPE_VAL})
    df.set_index(pd.to_datetime(df.timestamp, unit='s'), inplace=True)
    temp = df.value.loc[df.sensor_str == 'temp'].resample(FS_VAL).ffill()
    # strain came in as two channels (strain_hi/strain_lo) back then; average them.
    strain = pd.concat([df.value.loc[df.sensor_str == 'strain_hi'],
                        df.value.loc[df.sensor_str == 'strain_lo']],
                       axis=1).mean(axis=1).resample(FS_VAL).ffill()
    df = pd.concat([pd.DataFrame({'sid': sensors[nid]['temp'],
                                  'nid': nid,
                                  'value': temp}),
                    pd.DataFrame({'sid': sensors[nid]['strain'],
                                  'nid': nid,
                                  'value': strain})])
    return df
%timeit load_b3()
print(load_b3().tail(3))
In [6]:
def load_b4(fpath='../local/db/consolidation/bed_2016-03-26.csv', nid=1):
    df = pd.read_csv(fpath, names=['timestamp', 'sensor_str', 'value'],
                     dtype={'timestamp': np.float64, 'sensor_str': str, 'value': DTYPE_VAL})
    df.set_index(pd.to_datetime(df.timestamp, unit='s'), inplace=True)
    temp = df.value.loc[df.sensor_str == 'temp'].resample(FS_VAL).ffill()
    strain = df.value.loc[df.sensor_str == 'strain'].resample(FS_VAL).ffill()
    motion = df.value.loc[df.sensor_str == 'motion']
    df = pd.concat([pd.DataFrame({'sid': sensors[nid]['temp'], 'nid': nid, 'value': temp}),
                    pd.DataFrame({'sid': sensors[nid]['strain'], 'nid': nid, 'value': strain}),
                    pd.DataFrame({'sid': sensors[nid]['motion'], 'nid': nid, 'value': motion})])
    return df
%timeit load_b4()
print(load_b4().tail(3))
In [7]:
def load_b5(fpath='../local/db/consolidation/2016_w13-bed.csv', nid=1):
    df = pd.read_csv(fpath, names=['timestamp', 'sensor_str', 'value'], skiprows=1,
                     dtype={'timestamp': np.float64, 'sensor_str': str, 'value': DTYPE_VAL})
    df.set_index(pd.to_datetime(df.timestamp, unit='s'), inplace=True)
    temp = df.value.loc[df.sensor_str == 'temp']
    strain = df.value.loc[df.sensor_str == 'strain'].resample(FS_VAL).ffill()
    motion = df.value.loc[df.sensor_str == 'motion']
    df = pd.concat([pd.DataFrame({'sid': sensors[nid]['temp'], 'nid': nid, 'value': temp}),
                    pd.DataFrame({'sid': sensors[nid]['strain'], 'nid': nid, 'value': strain}),
                    pd.DataFrame({'sid': sensors[nid]['motion'], 'nid': nid, 'value': motion})])
    return df
%timeit load_b5()
print(load_b5().tail(3))
In [8]:
def load_c1(fpath='../local/db/consolidation/telemetry.db', nid=0):
    with sqlite3.connect(fpath) as con:
        df = pd.read_sql_query('SELECT timestamp, type, value FROM telemetry', con)
    df.set_index(pd.to_datetime(df.timestamp, unit='s'), inplace=True)
    # The sqlite db stores integer sensor ids in `type` (see the note up top), so the
    # comparisons below go against the sensors map rather than string names.
    df.columns = ['timestamp', 'sensor_str', 'value']
    temp = df.value.loc[df.sensor_str == sensors[nid]['temp']].resample(FS_VAL).ffill()
    ds_temp = df.value.loc[df.sensor_str == sensors[nid]['ds_temp']].resample(FS_VAL).ffill()
    light = df.value.loc[df.sensor_str == sensors[nid]['light']].resample(FS_VAL).ffill()
    # soil is kept at a coarser 30 min rate.
    soil = df.value.loc[df.sensor_str == sensors[nid]['soil']].resample('30min').ffill()
    df = pd.concat([pd.DataFrame({'sid': sensors[nid]['temp'], 'nid': nid, 'value': temp}),
                    pd.DataFrame({'sid': sensors[nid]['ds_temp'], 'nid': nid, 'value': ds_temp}),
                    pd.DataFrame({'sid': sensors[nid]['light'], 'nid': nid, 'value': light}),
                    pd.DataFrame({'sid': sensors[nid]['soil'], 'nid': nid, 'value': soil})])
    return df
%timeit load_c1()
print(load_c1().tail(3))
In [9]:
def load_c2(fpath='../local/db/consolidation/chuck_2016-03-28.csv', nid=0):
    df = pd.read_csv(fpath, names=['timestamp', 'sensor_str', 'value'],
                     dtype={'timestamp': np.float64, 'sensor_str': str, 'value': DTYPE_VAL})
    df.set_index(pd.to_datetime(df.timestamp, unit='s'), inplace=True)
    ds_temp = df.value.loc[df.sensor_str == 'ds_temp']
    light = df.value.loc[df.sensor_str == 'light']
    soil = df.value.loc[df.sensor_str == 'soil']
    df = pd.concat([pd.DataFrame({'sid': sensors[nid]['ds_temp'], 'nid': nid, 'value': ds_temp}),
                    pd.DataFrame({'sid': sensors[nid]['light'], 'nid': nid, 'value': light}),
                    pd.DataFrame({'sid': sensors[nid]['soil'], 'nid': nid, 'value': soil})])
    return df
%timeit load_c2()
print(load_c2().tail(3))
In [24]:
def load_c3(fpath='../local/db/consolidation/2016_w13-chuck.csv', nid=0):
    df = pd.read_csv(fpath, names=['timestamp', 'sensor_str', 'value'], skiprows=1,
                     dtype={'timestamp': np.float64, 'sensor_str': str, 'value': DTYPE_VAL})
    df.set_index(pd.to_datetime(df.timestamp, unit='s'), inplace=True)
    ds_temp = df.value.loc[df.sensor_str == 'ds_temp']
    light = df.value.loc[df.sensor_str == 'light']
    soil = df.value.loc[df.sensor_str == 'soil']
    df = pd.concat([pd.DataFrame({'sid': sensors[nid]['ds_temp'], 'nid': nid, 'value': ds_temp}),
                    pd.DataFrame({'sid': sensors[nid]['light'], 'nid': nid, 'value': light}),
                    pd.DataFrame({'sid': sensors[nid]['soil'], 'nid': nid, 'value': soil})])
    return df
%timeit load_c3()
print(load_c3().tail(3))
In [142]:
df = pd.concat([load_b2(), load_b3(), load_b4(), load_b5(), load_c1(), load_c2(), load_c3()])
df.sid = df.sid.astype(DTYPE_CAT)
df.nid = df.nid.astype(DTYPE_CAT)
df.value = df.value.astype(DTYPE_VAL)
df.sort_index(inplace=True)
print(df.memory_usage(index=True))
print(df.dtypes)
In [143]:
!rm -f ../local/db/telemetry.h5
def store_df(df, fpath='../local/db/telemetry.h5'):
    with pd.HDFStore(fpath, complevel=9, complib='blosc') as store:
        # data_columns makes nid/sid directly queryable via `where=` on read.
        store.put('data', df, format='table', data_columns=['nid', 'sid'])
store_df(df)
print(os.path.getsize('../local/db/telemetry.h5') // 2**10, 'KiB')
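The archived file can then be cleaned up and recompressed with ptrepack. A sketch of the invocation; the flags and the output name are assumptions to check against the installed PyTables version:
!ptrepack --overwrite --complevel=9 --complib=blosc ../local/db/telemetry.h5 ../local/db/telemetry_packed.h5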
In [117]:
# ~ 200 ms
#%timeit pd.read_hdf('../local/db/consolidation/test.h5', 'data', where='sid==1')
# ~ 750 ms
#df = pd.read_hdf('../local/db/consolidation/test.h5', 'data', where='sid==[1, 2, 3]')
# ~ 20 ms
#df = pd.read_hdf('../local/db/consolidation/test.h5', 'data')
# ~ 5 ms
# df = df.loc[df.nid == 0]
# ~ 135 ms
# df = pd.read_hdf('../local/db/consolidation/test.h5', 'data', where='nid==1')
In [134]:
DTYPE_VAL = np.float32
DTYPE_CAT = np.uint8
nodes = {'chuck': 0, 'bed': 1}
sensors = {0: {'temp': 1, 'light': 2, 'soil': 3, 'dummy': 4, 'ds_temp': 5},
           1: {'strain': 6, 'temp': 7, 'motion': 8}}
# Needed because of the naming overlap between the sensors; the [b/c]x entries are
# debugging placeholders so that each name's position in its list equals the sensor id.
cats = {0: ['c0', 'temp', 'light', 'soil', 'c4', 'ds_temp', 'c6', 'c7'],
        1: ['b0', 'b1', 'b2', 'b3', 'b4', 'b5', 'strain', 'temp', 'motion']}

def nid_from_filename(fname):
    # e.g. '2016_w13-chuck.csv' -> 'chuck' -> 0
    return nodes[os.path.basename(fname).split('-')[-1].split('.')[0]]

def load_node_csv(fpath, nid=None):
    nid = nid if nid is not None else nid_from_filename(fpath)
    df = pd.read_csv(fpath, names=['timestamp', 'sensor_str', 'value'], skiprows=1,
                     dtype={'timestamp': np.float64, 'sensor_str': str, 'value': DTYPE_VAL})
    df.set_index(pd.to_datetime(df.timestamp, unit='s'), inplace=True)
    # Categorical codes double as sensor ids thanks to the padded cats lists.
    sid = pd.Series(pd.Categorical(df.sensor_str, categories=cats[nid]).codes,
                    index=df.index).astype(DTYPE_CAT)
    return pd.DataFrame({'sid': sid, 'nid': DTYPE_CAT(nid), 'value': df.value})

def load_hdf(fpath):
    return pd.read_hdf(fpath, 'data')

def last_samples_hdf(df):
    # Newest stored timestamp per node; used to append only rows newer than these.
    df.sort_index(inplace=True)
    assert df.index.is_monotonic_increasing
    g = df.groupby('nid')
    return {k: g.get_group(k).index[-1] for k in [0, 1]}
In [138]:
!rsync -avh VersedSquid:~/code/telemetry/local/db/2016_w13-chuck.csv ../local/db/
!rsync -avh RandyDolphin:~/code/telemetry/local/db/2016_w13-bed.csv ../local/db/
last = last_samples_hdf(load_hdf('../local/db/testing.h5'))
node_data = [load_node_csv('../local/db/2016_w13-chuck.csv'), load_node_csv('../local/db/2016_w13-bed.csv')]
# Keep only rows newer than what the store already holds for each node.
df = pd.concat([node_data[n][node_data[n].index > last[n]] for n in range(len(node_data))]).sort_index()
print(df.shape[0], "new rows to be appended!")
with pd.HDFStore('../local/db/testing.h5') as store:
    store.append('data', df)
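As a quick sanity check (a sketch reusing the helpers above), re-read the store and confirm that the per-node last-sample timestamps have moved forward:
check = load_hdf('../local/db/testing.h5')
print(last_samples_hdf(check))  # newest stored timestamp per nid after the append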