The robust edge detection algorithm convolves the original time series with a derivative-of-Gaussian kernel to find edge signals in the presence of noise. A sketch of the core idea is shown after the notes below.
Notes on the streaming adaptation of the edge detection algorithm: the convolution is evaluated point by point with only a small lookahead of width = 3*sigma samples, and the detection threshold is derived from running quantiles of |dx| computed over the history seen so far, rather than from global statistics.
Heuristics are used to improve edge detection results on square waves: a local-extremum test on the smoothed derivative, a minimum distance between consecutive edges of the same sign, and enforced sign alternation between "+" and "-" edges.
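As a minimal sketch of the core idea (an offline, non-streaming variant written only for illustration; the step signal and the helper name dog_response are made up), the detector convolves the series with a derivative-of-Gaussian kernel and looks for large-magnitude peaks in the response:

In [ ]:
import numpy as np
from scipy import signal

def dog_response(x, sigma=2.0):
    # Build a derivative-of-Gaussian kernel: a Gaussian smoothing window
    # convolved with a first-difference stencil, then applied to the signal.
    width = int(3 * sigma)
    gaussian = signal.gaussian(width * 2 + 1, sigma)
    dog = signal.convolve(gaussian, [-1, 0, 1], mode='same')
    return signal.convolve(x, dog, mode='same')

step = np.r_[np.zeros(50), 1000 * np.ones(50)]  # one rising edge at index 50
response = dog_response(step)
print(np.argmax(np.abs(response)))  # peaks near index 50, the edge position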
In [4]:
# To support both python 2 and python 3
from __future__ import division, print_function, unicode_literals
import json
from pandas import Series
from pandas import Timedelta
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import scipy
from scipy import signal
from scipy import stats
%matplotlib inline
In [97]:
def streaming_edge_detection(x, sigma=2.0, threshold=3.0, visualize=False, additional_title=""):
    '''
    Streaming edge detection algorithm on signal x.
    @param x: original time series
    @param sigma: sigma of the Gaussian smoothing filter; 0 disables smoothing. Default 2.0.
    @param threshold: edge detection threshold; an edge requires |dx| > low_quantile * threshold. Default 3.0.
    @param visualize: if True, plot the signal, its smoothed derivative, and the quantile traces
    @param additional_title: extra text appended to the plot title
    @return (edges, ratio): edges is a list of tuples [(sign, position)],
        e.g. [("+", 20), ("-", 30)]; ratio is the final 95th/50th percentile
        ratio of |dx|, a signal-vs-noise diagnostic
    '''
width = int(3*sigma)
edges = []
min_dist = 20 # min distance of two edges of the same sign
    if sigma > 0:
        gaussian = signal.gaussian(width*2+1, sigma)  # Gaussian smoothing window
        # first difference of the Gaussian => derivative-of-Gaussian kernel
        d1_gaussian = signal.convolve(gaussian, [-1, 0, 1], mode='same')
    ratio = 0  # 95th/50th percentile ratio of |dx|; initialized in case the loop never runs
edge_pos = []
edge_val = []
last_positive = -1
last_negative = -1
last_sign = 0
derivative = np.zeros(len(x))
dx_buffer = np.zeros(3)
low_buffer = np.zeros(len(x))
high_buffer = np.zeros(len(x))
    # convolve signal x with the d1_gaussian window
for n in range(width, len(x)-width):
dx = 0
if sigma > 0:
# convolve sum first half
for k in range(1, width+1):
dx += d1_gaussian[width-k] * x[n-k] if n >= k else 0
# middle point
dx += x[n] * d1_gaussian[width]
# convolve sum second half
for k in range(1, width+1):
dx += d1_gaussian[width+k] * x[n+k] if n+k < len(x) else 0
else:
# no smoothing at all
dx = x[n] - x[n-1] if n > 0 else 0
derivative[n] = dx
        # shift the 3-sample derivative buffer; dx_buffer[1] is the local-extremum candidate
dx_buffer[0] = dx_buffer[1]
dx_buffer[1] = dx_buffer[2]
dx_buffer[2] = dx
        # running quantiles of |dx| over the history seen so far act as a noise floor
        low_quantile = 0 if n == 0 else np.percentile(np.abs(derivative[:n]), 50)
        high_quantile = 0 if n == 0 else np.percentile(np.abs(derivative[:n]), 95)
        low_buffer[n] = low_quantile
        high_buffer[n] = high_quantile
        # 95th/50th percentile ratio, returned as a signal-vs-noise diagnostic
        ratio = 0 if low_quantile == 0 else high_quantile / low_quantile
if dx_buffer[1] > low_quantile * threshold:
# find local max
if dx_buffer[1] >= dx_buffer[2] and dx_buffer[1] >= dx_buffer[0]:
if n-1 - last_positive > min_dist and last_sign <= 0:
edges.append(("+", n-1))
edge_pos.append(n-1)
edge_val.append(dx_buffer[1])
last_positive = n-1
last_sign = +1
elif dx_buffer[1] < -low_quantile * threshold:
# find local min
if dx_buffer[1] <= dx_buffer[2] and dx_buffer[1] <= dx_buffer[0]:
if n-1 - last_negative > min_dist and last_sign >= 0:
edges.append(("-", n-1))
edge_pos.append(n-1)
edge_val.append(dx_buffer[1])
last_negative = n-1
last_sign = -1
if visualize:
num_figure = 3
plt.style.use('seaborn-paper')
fig = plt.figure(figsize=(18,15))
ax1 = fig.add_subplot(num_figure,1,1)
ax1.plot(x)
ax1.set_title("Original signal: {}".format(additional_title))
ax2 = fig.add_subplot(num_figure,1,2)
ax2.plot(derivative)
ax2.plot(np.zeros(len(x)))
ax2.plot(edge_pos, edge_val,"o", label="detected edge positions")
#ax2.legend(loc="lower left")
ax2.set_title("Gaussian derivative signal and edge detector results")
ax3 = fig.add_subplot(num_figure,1,3)
ax3.plot(low_buffer, label="50th percentile")
ax3.plot(high_buffer, label="95th percentile")
ax3.legend(loc="lower right", fontsize=16)
ax3.set_title("quantile analysis")
return edges, ratio
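A minimal usage example (the step signal below is made up for illustration; detected positions may vary by a sample or two):

In [ ]:
x_demo = np.r_[np.zeros(100), 1000 * np.ones(100), np.zeros(100)]
edges, ratio = streaming_edge_detection(x_demo, sigma=2.0, threshold=3.0)
print(edges)  # roughly [('+', 100), ('-', 200)]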
Given a list of ground-truth edge positions and a list of detected edges, we define: TP (true positives) as detections matching a ground-truth edge in both sign and position within a small tolerance, FP (false positives) as detections with no matching ground-truth edge, and FN (false negatives) as ground-truth edges that were missed.
We report both precision = TP/(TP + FP) and recall = TP/(TP + FN).
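For example, with made-up counts TP=8, FP=2, FN=1: precision = 8/10 = 0.8 and recall = 8/9 ≈ 0.89.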
In [82]:
def edge_evaluate(gt_edges, dt_edges):
    delta = 1 # tolerate a +-1 error in edge position
tp = 0 # matching edges
fp = 0 # non-existing edges
fn = 0 # missing ground truth edges
gt_num = len(gt_edges)
dt_num = len(dt_edges)
#print("Expected edges={}, detected edges={}".format(gt_num, dt_num))
#for n in range(max(gt_num, dt_num)):
# print("{}, {}".format(gt_edges[n] if n < gt_num else " ", dt_edges[n] if n < dt_num else ""))
j = 0
i = 0
while i < len(gt_edges) and j < len(dt_edges):
gt_pos = gt_edges[i][1]
dt_pos = dt_edges[j][1]
gt_sign = gt_edges[i][0]
dt_sign = dt_edges[j][0]
if abs(gt_pos - dt_pos) <= delta:
if gt_sign == dt_sign:
tp += 1
            else:
                fn += 1  # position matches but the sign differs: count the gt edge as missed
i += 1
j += 1
elif gt_pos > dt_pos + delta:
j += 1 # advance dt
fp += 1
elif gt_pos < dt_pos - delta:
i += 1 # advance gt
fn += 1
    fn += len(gt_edges) - i  # remaining unmatched ground-truth edges
    fp += len(dt_edges) - j  # remaining unmatched detections
if tp == 0:
return 0,0
precision = tp / (tp + fp)
recall = tp / (tp + fn)
return precision, recall
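A quick sanity check on made-up edge lists: the third detection at 350 is 50 samples away from the ground-truth edge at 300, so it counts as one false positive and one false negative.

In [ ]:
gt = [("+", 100), ("-", 200), ("+", 300)]
dt = [("+", 101), ("-", 200), ("+", 350)]
print(edge_evaluate(gt, dt))  # approximately (0.667, 0.667): TP=2, FP=1, FN=1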
A synthetic square wave with amplitude ±500 (peak-to-peak 1000) is used in the experiment. I gradually add white noise of noise_amp * N(0,1) to the square wave, with noise_amp ranging over [0, 500].
Algorithm parameters (as set in the cell below): Gaussian sigma = 5.0, threshold = 5.0.
At each noise level, the experiment is repeated 10 times and the average precision and recall are reported, yielding precision/recall curves across noise levels.
In [85]:
t = np.arange(0,1000)
linear = 5 * t
square = 500 * (signal.square(2 * np.pi / 50 * (t + 25), 0.5))
x_clean = square #+ linear
# edge detector parameter
threshold = 5.0 # edge if |dx| > threshold * low_quantile
sigma = 5.0 # Gaussian smoothing sigma
nround = 10
precisions = []
recalls = []
noise_amps = np.arange(0,501,5)
for noise_amp in noise_amps:
print("noise_amp={}".format(noise_amp))
mean_precision = 0
mean_recall = 0
for n in range(nround):
noise = noise_amp * np.random.normal(0, 1.0, len(t))
x_noisy = x_clean + noise
        gt_edges, _ = streaming_edge_detection(x_clean, 0, threshold, False)
        dt_edges, _ = streaming_edge_detection(x_noisy, sigma, threshold, False)
precision, recall = edge_evaluate(gt_edges, dt_edges)
mean_precision += precision
mean_recall += recall
mean_precision /= nround
mean_recall /= nround
precisions.append(mean_precision)
recalls.append(mean_recall)
#print("NoiseAmp={},Precision={}, recall={}".format(noise_amp, precision, recall))
plt.style.use('seaborn-paper')
fig = plt.figure(figsize=(10,6))
plt.plot(noise_amps, precisions, linewidth=3.0, label='precision')
plt.plot(noise_amps, recalls, linewidth=3.0, label='recall')
plt.legend(loc='lower left', fontsize=16)
plt.xlabel("noise amplitude", fontsize=16)
plt.title("Precision/Recall performance of edge detection under increasing level of noise", fontsize=16)
Out[85]:
Edge detection is close to perfect for noise amplitudes up to about 200. Beyond that, recall degrades much faster than precision: the detector produces far more false negatives (missed true edges) than false positives (spurious edges caused by noise).
Comparing against the same algorithm with no smoothing before the first-order derivative makes the effectiveness of the Gaussian smoothing easy to see; a sketch of that baseline follows below.
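A sketch of the no-smoothing baseline, reusing x_clean, t, noise_amps, nround, and threshold from the cell above; with sigma=0 the detector falls back to the raw first difference x[n] - x[n-1]:

In [ ]:
precisions0, recalls0 = [], []
for noise_amp in noise_amps:
    p = r = 0.0
    for _ in range(nround):
        x_noisy = x_clean + noise_amp * np.random.normal(0, 1.0, len(t))
        gt_edges, _ = streaming_edge_detection(x_clean, 0, threshold, False)
        dt_edges, _ = streaming_edge_detection(x_noisy, 0, threshold, False)
        pi, ri = edge_evaluate(gt_edges, dt_edges)
        p += pi
        r += ri
    precisions0.append(p / nround)
    recalls0.append(r / nround)
plt.plot(noise_amps, precisions0, linewidth=3.0, label='precision (no smoothing)')
plt.plot(noise_amps, recalls0, linewidth=3.0, label='recall (no smoothing)')
plt.legend(loc='lower left', fontsize=16)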
In [104]:
sigma = 2.0
threshold = 4.0
gt_edges, _ = streaming_edge_detection(x_clean, sigma, threshold, True, "no noise")
noise = 200 * np.random.normal(0, 1.0, len(t))
x_noisy = x_clean + noise
dt_edges, _ = streaming_edge_detection(x_noisy, sigma, threshold, True, "noise amp=200")
noise = 500 * np.random.normal(0, 1.0, len(t))
x_noisy = x_clean + noise
dt_edges, _ = streaming_edge_detection(x_noisy, sigma, threshold, True, "noise amp=500")
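To calibrate the quantile ratio that streaming_edge_detection returns, the next cell measures the 95th/50th percentile ratio of |dx| over 100 runs of pure noise; a series containing genuine edges should presumably produce a noticeably larger ratio than these baseline values.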
In [98]:
ratios = []
for i in range(100):
noise = 100 * np.random.normal(0, 1.0, len(t))
    dt_edges, ratio = streaming_edge_detection(noise, 10.0, threshold, False, "pure noise")
ratios.append(ratio)
print("mean_ratio={}".format(np.mean(ratios)))
print("max_ratio={}".format(np.max(ratios)))
print("min_ratio={}".format(np.min(ratios)))
print("80th_percentile_ratio={}".format(np.percentile(ratios, 80)))
print("20th_percentile_ratio={}".format(np.percentile(ratios, 20)))
There are five daily high-workload periods in this time series, but the data is quite noisy, which makes it challenging for the edge detection algorithm. It took a lot of tuning to get right: heavy smoothing (sigma = 5.0) and a strong threshold (4.0), as used in the cell below.
I don't think there is a general adaptive way to choose these parameters; it depends on the level of noise and the significance of the edges, and streaming makes the problem even harder.
In [99]:
table = pd.read_csv("data/noisy_demand.csv", header=0, index_col=0, parse_dates=True)
ts = Series(table["Value"])
real_noisy = ts.values
dt_edges, _ = streaming_edge_detection(real_noisy, 5.0, 4.0, True, "Noisy vrops demand bug_1955035")