In [1]:
from matplotlib import pyplot as plt
from collections import defaultdict
import matplotlib
import numpy as np
import pandas as pd
import peakutils
import sys
In [2]:
# local
sys.path.insert(0, '../')
from pywim.utils.dsp.synthetic_data.sensor_data import gen_truck_raw_data
from pywim.utils.dsp.baseline import cbf
In [3]:
%matplotlib inline
In [4]:
data = gen_truck_raw_data(
sample_rate=5000, speed=15.0, vehicle_layout='-o-o---o---o-',
sensors_distance=[2],
p_signal_noise=10
)
In [5]:
ax = plt.figure().gca()
data.plot(ax=ax)
plt.show()
In [6]:
def select_curve_by_threshold(
signal_data: np.array,
threshold: float, Δx: int
):
"""
"""
indexes = peakutils.indexes(signal_data, thres=0.5, min_dist=30)
curves = []
for ind_axle in indexes:
i_start = ind_axle - Δx
i_end = ind_axle + Δx
p_start = Δx
while i_start >= p_start:
i_start -= 1
if signal_data.iloc[i_start] <= threshold:
break
p_end = signal_data.size - Δx
while i_end <= p_end:
i_end += 1
if signal_data.iloc[i_end] <= threshold:
break
curves.append(signal_data.iloc[i_start-Δx:i_end+Δx])
return curves
In [7]:
threshold = 2
curves = defaultdict(list)
data_cbf = data.copy()
for k in data.keys():
data_cbf[k] = cbf(data[k].values)
curves[k] = select_curve_by_threshold(
signal_data=data_cbf[k], threshold=threshold, Δx=20
)
for k in curves.keys():
plt.figure()
for i_curve, v_curve in enumerate(curves[k]):
plt.plot(v_curve, label=i_curve)
plt.title('%s' % k)
plt.show()