Anomaly detection identifies data points that do not fit well with the rest of the data. In this notebook we demonstrate how to do anomaly detection using Zouwu's built-in model MTNet.
For demonstration, we use the publicly available cluster trace data cluster-trace-v2018 from the Alibaba Open Cluster Trace Program. You can find the dataset introduction here. In particular, we use the machine usage data to demonstrate anomaly detection; you can download the separate machine_usage data file directly.
This section defines some helper functions used in the following procedures. You can refer back to them later when they are used.
In [1]:
def get_drop_ts_and_len(df, target_col="cpu_usage", allow_missing_num=3):
    """
    Get the start timestamps and lengths of the runs of consecutive missing
    values that are longer than allow_missing_num.
    """
    missing_num = df[target_col].isnull().astype(int).groupby(df[target_col].notnull().astype(int).cumsum()).sum()
    missing_num = missing_num[missing_num > 0]
    # locate the first timestamp of each missing run
    drop_ts = df.iloc[(missing_num.index + missing_num.cumsum() - missing_num).values].index
    drop_ts = drop_ts[missing_num > allow_missing_num]
    drop_missing_num = missing_num[missing_num > allow_missing_num]
    drop_len = drop_missing_num.values
    return drop_ts, drop_len
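To illustrate how this works (a toy series made up for this notebook, not part of the dataset), a 12-minute series with a 2-minute gap and a 4-minute gap returns only the gap longer than allow_missing_num=3:

import pandas as pd

# toy illustration: 12 minutes, gaps of length 2 and 4; only the 4-long gap is reported
toy_idx = pd.date_range("2018-01-01", periods=12, freq="T")
toy_df = pd.DataFrame({"cpu_usage": [1, 2, None, None, 5, 6, None, None, None, None, 11, 12]}, index=toy_idx)
drop_ts, drop_len = get_drop_ts_and_len(toy_df, allow_missing_num=3)
print(drop_ts)   # DatetimeIndex(['2018-01-01 00:06:00'], ...)
print(drop_len)  # [4]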
In [2]:
def rm_missing_day(start_dts, missing_lens, df):
    """
    Drop the days that contain long runs of consecutive missing values.
    If a run of missing values crosses a day boundary, we remove an extra hour of data.
    """
    for start_time, l in zip(start_dts, missing_lens):
        start = start_time.replace(hour=0, minute=0, second=0)
        start_day_end = start + pd.Timedelta(hours=24)
        start_day_end = start_day_end - pd.Timedelta(minutes=1)
        end_time = start_time + l * pd.Timedelta(minutes=1)
        if start_day_end < end_time:
            end = end_time + pd.Timedelta(hours=1)
        else:
            end = start_day_end
        df = df.drop(df[start:end].index)
    return df
In [3]:
def train_val_test_split(df, val_ratio, test_ratio, look_back, horizon):
    """
    Split the dataset into train, validation and test parts.
    The validation/test parts keep look_back + horizon - 1 extra rows so that
    rolling later yields exactly val_num/test_num samples.
    """
    total_num = df.index.size
    test_num = int(total_num * test_ratio)
    val_num = int(total_num * val_ratio)
    test_split_index = test_num + look_back + horizon - 1
    val_split_index = test_split_index + val_num
    train_df = df.iloc[:-(test_num + val_num)]
    val_df = df.iloc[-val_split_index: -test_num]
    test_df = df.iloc[-test_split_index:]
    return train_df, val_df, test_df
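A quick sanity check with toy numbers (not the real data): with 1000 rows, the ratios used later, look_back=60 and horizon=1, the validation and test frames each keep 60 extra rows, so rolling will later yield exactly 100 samples each:

import pandas as pd

# toy check of the split sizes
toy = pd.DataFrame({"cpu_usage": range(1000)})
tr, va, te = train_val_test_split(toy, 0.1, 0.1, look_back=60, horizon=1)
print(tr.index.size, va.index.size, te.index.size)  # 800 160 160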
In [4]:
def roll_data(dataset, look_back, target_col_index):
    """
    Generate input samples by rolling a window of length look_back over the dataset.
    """
    X, Y = [], []
    for i in range(len(dataset) - look_back):
        X.append(dataset[i: (i + look_back)])
        Y.append(dataset[i + look_back, [target_col_index]])
    return np.array(X), np.array(Y)
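A shape check on a toy array (illustration only): 100 timesteps with 2 features and a look_back of 60 yield 40 windows and 40 single-step targets:

import numpy as np

# toy shape check: (100 - 60) windows of shape (60, 2) and targets of shape (1,)
toy = np.arange(200).reshape(100, 2)
X, Y = roll_data(toy, look_back=60, target_col_index=0)
print(X.shape, Y.shape)  # (40, 60, 2) (40, 1)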
In [5]:
def unscale(scaler, y, target_col_indexes):
    """
    Data needs to be normalized (scaled) before being fed into models.
    This inverses the effect of normalization to get forecast results on the original scale.
    """
    dummy_feature_shape = scaler.scale_.shape[0]
    # pad the target columns with zeros for the other features so that
    # scaler.inverse_transform can be applied, then keep only the targets
    y_dummy = np.zeros((y.shape[0], dummy_feature_shape))
    y_dummy[:, target_col_indexes] = y
    y_unscale = scaler.inverse_transform(y_dummy)[:, target_col_indexes]
    return y_unscale
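A toy illustration of the zero-padding trick: fit a scaler on 2 features, scale, then recover feature 0 on its original scale:

import numpy as np
from sklearn.preprocessing import StandardScaler

# toy round trip: scale both features, unscale only feature 0
toy = np.array([[1.0, 10.0], [2.0, 20.0], [3.0, 30.0]])
sc = StandardScaler().fit(toy)
scaled_target = sc.transform(toy)[:, [0]]
print(unscale(sc, scaled_target, [0]))  # [[1.], [2.], [3.]]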
In [6]:
EPSILON = 1e-10
def sMAPE(y_true, y_pred):
    """
    Symmetric Mean Absolute Percentage Error
    """
    output_errors = np.mean(100 * np.abs(y_true - y_pred) / (np.abs(y_true) + np.abs(y_pred) + EPSILON), axis=0)
    return np.mean(output_errors)
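A worked example with toy numbers: a single pair (100, 110) gives 100 * |100 - 110| / (100 + 110) ≈ 4.76 percent:

import numpy as np

# single-pair example of the sMAPE formula above
print(sMAPE(np.array([100.0]), np.array([110.0])))  # ~4.76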
In [7]:
def get_result_df(df, y_pred_unscale, ano_index, look_back, target_col='cpu_usage'):
    """
    Add prediction and anomaly values to the dataframe.
    """
    result_df = pd.DataFrame({"y_true": df[look_back:][target_col], "y_pred": y_pred_unscale.squeeze()})
    result_df['anomalies'] = 0
    result_df.loc[result_df.index[ano_index], 'anomalies'] = 1
    result_df['anomalies'] = result_df['anomalies'] > 0
    return result_df
In [8]:
def plot_anomalies_value(date, y_true, y_pred, anomalies):
    """
    Plot the true and predicted values, with anomalies highlighted.
    """
    fig, axs = plt.subplots(figsize=(16, 6))
    axs.plot(date, y_true, color='blue', label='y_true')
    axs.plot(date, y_pred, color='orange', label='y_pred')
    axs.scatter(date[anomalies].tolist(), y_true[anomalies], color='red', label='anomaly values')
    axs.set_title('anomaly values')
    plt.xlabel('datetime')
    plt.legend(loc='upper left')
    plt.show()
Now we download the dataset and load it into a pandas dataframe. Steps are as below:
1. Run get_data.sh to download the raw data. It will download the resource usage of each machine from m_1932 to m_2085.
2. Run grep m_1932 machine_usage.csv > m_1932.csv to extract the records of machine 1932, or run extract_data.sh. We use machine 1932 as an example in this notebook; you can choose any machine in a similar way (see the pandas sketch after this list).
3. Load m_1932.csv into a dataframe as shown below.
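If you prefer to stay in Python, the grep step can be replicated with pandas (a sketch; it assumes the raw machine_usage.csv has no header row and the machine id in its first column, as in the published trace):

import pandas as pd

# read the large raw file in chunks and keep only machine m_1932's rows
chunks = pd.read_csv("machine_usage.csv", header=None, chunksize=1_000_000)
pd.concat(c[c[0] == "m_1932"] for c in chunks).to_csv("m_1932.csv", header=False, index=False)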
In [9]:
import os
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
%matplotlib inline
In [10]:
df_1932 = pd.read_csv("m_1932.csv", header=None, usecols=[1,2,3], names=["time_step", "cpu_usage","mem_usage"])
Below are some example records of the data.
In [11]:
df_1932.head()
Out[11]:
In [12]:
df_1932.sort_values(by="time_step").plot(y="cpu_usage", x="time_step", figsize=(16,6),title="cpu_usage of machine 1932")
Out[12]:
Now we need to do data cleaning and preprocessing on the raw data. Note that this part could vary for different datasets.
For the machine_usage data, the pre-processing contains 2 parts:
1. Change the time unit to minute and average the cpu_usage within each minute.
2. Handle the missing values.
In [13]:
df_1932["ts_min"] = df_1932["time_step"]//60
Calculate the average cpu_usage within each minute and draw the line chart.
In [14]:
resampled_df = df_1932.groupby("ts_min").agg({"cpu_usage":"mean"})
In [15]:
resampled_df.plot(figsize=(16,6),title="mean cpu_usage of machine 1932")
Out[15]:
We need datetime information to generate related features, so we convert the integer minute index into a datetime index.
In [16]:
resampled_df.index = pd.to_datetime(resampled_df.index, unit='m', origin=pd.Timestamp('2018-01-01'))
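For example, the integer minute offsets 0, 1, 2 map to consecutive minutes from the chosen origin:

# minute offsets from the origin become timestamps
pd.to_datetime([0, 1, 2], unit='m', origin=pd.Timestamp('2018-01-01'))
# DatetimeIndex(['2018-01-01 00:00:00', '2018-01-01 00:01:00', '2018-01-01 00:02:00'], ...)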
Ensure the time interval of the dataframe is uniform by reindexing to a full minute-frequency range.
In [17]:
full_idx = pd.date_range(start=resampled_df.index.min(), end=resampled_df.index.max(),freq='T')
resampled_df = resampled_df.reindex(full_idx)
In [18]:
resampled_df.head()
Out[18]:
Here, we drop the days that contain more than 3 consecutive missing values and fill the remaining missing values.
In [19]:
drop_ts, drop_len = get_drop_ts_and_len(resampled_df, allow_missing_num=3)
df = rm_missing_day(drop_ts, drop_len, resampled_df)
In [20]:
df.head()
Out[20]:
In [21]:
print("no. of n/a values:")
print(df.isna().sum())
In [22]:
df.ffill(inplace=True)
In [23]:
df.plot(figsize=(16,6))
Out[23]:
For feature engineering, we use the hour of day as an extra feature.
In [24]:
df["hour"] = df.index.hour
In [25]:
df.head()
Out[25]:
Now we split the dataset into train, validation and test.
In [26]:
val_ratio = 0.1
test_ratio = 0.1
# we look back one hour of data, which is at 1-minute frequency
look_back = 60
horizon = 1
train_df, val_df, test_df = train_val_test_split(df, val_ratio, test_ratio, look_back, horizon)
In [27]:
train_df.index.size, val_df.index.size, test_df.index.size
Out[27]:
Then we standardize the train, validation and test data.
In [28]:
from sklearn.preprocessing import StandardScaler
standard_scaler = StandardScaler()
scaled_train = standard_scaler.fit_transform(train_df)
scaled_val = standard_scaler.transform(val_df)
scaled_test = standard_scaler.transform(test_df)
Last, we generate model input samples by sliding a window along the time axis.
In [29]:
import numpy as np
x_train, y_train = roll_data(scaled_train, look_back, target_col_index=0)
x_val, y_val = roll_data(scaled_val, look_back, target_col_index=0)
x_test, y_test = roll_data(scaled_test, look_back, target_col_index=0)
In [30]:
x_test.shape, y_test.shape
Out[30]:
In [33]:
from zoo.zouwu.model.forecast import MTNetForecaster
First, we initialize an mtnet_forecaster according to the input data shape. Specifically, look_back should equal (long_series_num + 1) * series_length. For details, refer to the zouwu docs here.
In [35]:
mtnet_forecaster = MTNetForecaster(target_dim=horizon,
                                   feature_dim=x_train.shape[-1],
                                   long_series_num=3,
                                   series_length=15)
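As a quick sanity check of the constraint mentioned above:

# look_back must equal (long_series_num + 1) * series_length
assert look_back == (3 + 1) * 15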
MTNet needs the input X in another format, so we call MTNetForecaster.preprocess_input on x_train, x_val and x_test.
In [36]:
# mtnet requires reshape of input x before feeding into model.
x_train_mtnet = mtnet_forecaster.preprocess_input(x_train)
x_val_mtnet = mtnet_forecaster.preprocess_input(x_val)
x_test_mtnet = mtnet_forecaster.preprocess_input(x_test)
Now we train the model and wait till it finishes.
In [37]:
%%time
hist = mtnet_forecaster.fit(x = x_train_mtnet, y = y_train, batch_size=128, epochs=20)
Use the model for prediction and inverse the scaling of the prediction results.
In [38]:
y_pred_val = mtnet_forecaster.predict(x_val_mtnet)
y_pred_test = mtnet_forecaster.predict(x_test_mtnet)
In [39]:
y_pred_val_unscale = unscale(standard_scaler, y_pred_val, [0])
y_pred_test_unscale = unscale(standard_scaler, y_pred_test, [0])
y_val_unscale = unscale(standard_scaler, y_val, [0])
y_test_unscale = unscale(standard_scaler, y_test, [0])
Calculate the symmetric mean absolute percentage error.
In [40]:
# evaluate with sMAPE
print("sMAPE is",sMAPE(y_pred_test_unscale, y_test_unscale))
In [41]:
from zoo.zouwu.model.anomaly import ThresholdDetector, ThresholdEstimator
ratio = 0.01
threshold = ThresholdEstimator().fit(y=y_val_unscale, yhat=y_pred_val_unscale, ratio=ratio)
print("The threshold of the validation dataset is:", threshold)
In [42]:
val_res_ano_idx = ThresholdDetector().detect(y=y_val_unscale, yhat=y_pred_val_unscale, threshold=threshold)
test_res_ano_idx = ThresholdDetector().detect(y=y_test_unscale, yhat=y_pred_test_unscale, threshold=threshold)
print("The indexes of anomalies in the validation dataset are:", val_res_ano_idx)
print("The indexes of anomalies in the test dataset are:", test_res_ano_idx)
Get a new dataframe which contains the y_true, y_pred and anomalies values.
In [43]:
val_result_df = get_result_df(val_df, y_pred_val_unscale, val_res_ano_idx, look_back)
test_result_df = get_result_df(test_df, y_pred_test_unscale, test_res_ano_idx, look_back)
Draw the anomalies in a line chart.
In [44]:
plot_anomalies_value(val_result_df.index, val_result_df.y_true, val_result_df.y_pred, val_result_df.anomalies)
In [45]:
plot_anomalies_value(test_result_df.index, test_result_df.y_true, test_result_df.y_pred, test_result_df.anomalies)