AIOps anomaly detection

Anomaly detection detects data points in data that does not fit well with the rest of data. In this notebook we demonstrate how to do anomaly detection using Zouwu's built-in model MTNet

For demonstration, we use the publicly available cluster trace data cluster-trace-v2018 of Alibaba Open Cluster Trace Program. You can find the dataset introduction here. In particular, we use machine usage data to demonstrate anomaly detection, you can download the separate data file directly with machine_usage.

Helper functions

This section defines some helper functions to be used in the following procedures. You can refer to it later when they're used.

def get_drop_ts_and_len(df, target_col="cpu_usage", allow_missing_num=3):
    get start index which with consecutive missing num larger than allow_missing_num
    missing_num = df[target_col].isnull().astype(int).groupby(df[target_col].notnull().astype(int).cumsum()).sum()
    missing_num = missing_num[missing_num > 0]
    drop_ts = df.iloc[(missing_num.index + missing_num.cumsum() - missing_num).values].index
    drop_ts = drop_ts[missing_num > allow_missing_num]
    drop_missing_num = missing_num[missing_num > allow_missing_num]
    drop_len = drop_missing_num.values
    return drop_ts, drop_len

def rm_missing_day(start_dts, missing_lens, df):
    drop days with consecutive missing values.
    If consecutive missing values across day, we remove the data for an extra hour.
    for start_time, l in zip(start_dts, missing_lens):
        start = start_time.replace(hour=0, minute=0, second=0)
        start_day_end = start + pd.Timedelta(hours=24)
        start_day_end = start_day_end - pd.Timedelta(minutes=1)
        end_time = start_time + l*pd.Timedelta(minutes=1)
        if start_day_end < end_time:
            end = end_time + pd.Timedelta(hours=1)
            end = start_day_end
        df = df.drop(df[start:end].index)
    return df

def train_val_test_split(df, val_ratio, test_ratio, look_back, horizon):
    Split the dataset into train, validation and test part.
    total_num = df.index.size
    test_num = int(total_num * test_ratio)
    val_num = int(total_num * val_ratio)
    test_split_index = test_num + look_back + horizon - 1
    val_split_index = test_split_index + val_num 
    train_df = df.iloc[:-(test_num + val_num)]  
    val_df = df.iloc[-val_split_index: -test_num]
    test_df = df.iloc[-test_split_index: ]
    return train_df, val_df, test_df

def roll_data(dataset, look_back, target_col_index):
    Generate input samples from rolling
    X, Y = [], []
    for i in range(len(dataset) - look_back):
        X.append(dataset[i: (i + look_back)])
        Y.append(dataset[i + look_back, [target_col_index]])
    return np.array(X), np.array(Y)

def unscale(scaler, y, target_col_indexes):
    data needs to be normalized (scaled) before feeding into models. 
    This is to inverse the effect of normalization to get reasonable forecast results.
    dummy_feature_shape = scaler.scale_.shape[0]
    y_dummy = np.zeros((y.shape[0], dummy_feature_shape))
    y_dummy[:, target_col_indexes] = y
    y_unscale = scaler.inverse_transform(y_dummy)[:,target_col_indexes]
    return y_unscale

EPSILON = 1e-10
def sMAPE(y_true, y_pred):
    Symmetric Mean Average Percentage Error
    output_errors = np.mean(100 * np.abs(y_true - y_pred)/(np.abs(y_true) + np.abs(y_pred) + EPSILON), axis=0,)
    return np.mean(output_errors)

def get_result_df(df, y_pred_unscale, ano_index, look_back,target_col='cpu_usage'):
    Add prediction and anomaly value to dataframe.
    result_df = pd.DataFrame({"y_true":df[look_back:][target_col], "y_pred": y_pred_unscale.squeeze()})
    result_df['anomalies'] = 0
    result_df.loc[result_df.index[ano_index], 'anomalies'] = 1
    result_df['anomalies'] = result_df['anomalies'] > 0
    return result_df

def plot_anomalies_value(date, y_true, y_pred, anomalies):
    plot the anomalies value
    fig, axs = plt.subplots(figsize=(16,6))
    axs.plot(date, y_true,color='blue', label='y_true')
    axs.plot(date, y_pred,color='orange', label='y_pred')
    axs.scatter(date[anomalies].tolist(), y_true[anomalies], color='red', label='anomalies value')
    axs.set_title('the anomalies value')
    plt.legend(loc='upper left')

Download raw dataset and load into dataframe

Now we download the dataset and load it into a pandas dataframe.Steps are as below:

  • First, download the raw data machine_usage. Or run the script to download the raw data.It will download the resource usage of each machine from m_1932 to m_2085.
  • Second, run grep m_1932 machine_usage.csv > m_1932.csv to extract records of machine 1932. Or run use machine 1932 as an example in this notebook.You can choose any machines in the similar way.
  • Finally, use pandas to load m_1932.csv into a dataframe as shown below.

import os 
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
%matplotlib inline

df_1932 = pd.read_csv("m_1932.csv", header=None, usecols=[1,2,3], names=["time_step", "cpu_usage","mem_usage"])

Below are some example records of the data

time_step cpu_usage mem_usage
0 386640 41 92
1 386670 43 92
2 386690 44 92
3 386800 46 92
4 386930 44 93

df_1932.sort_values(by="time_step").plot(y="cpu_usage", x="time_step", figsize=(16,6),title="cpu_usage of machine 1932")

Data pre-processing

Now we need to do data cleaning and preprocessing on the raw data. Note that this part could vary for different dataset.

For the machine_usage data, the pre-processing contains 2 parts:

  1. Change the time unit from seconds to minutes, which is calculated by averaging cpu_usage within the minutes.
  2. Check missing values and handle missing data(fill or drop)

Change the time unit to minute.

df_1932["ts_min"] = df_1932["time_step"]//60

Calculate the average of cpu_usage in the same time and draw the line chart.

In [14]:
resampled_df = df_1932.groupby("ts_min").agg({"cpu_usage":"mean"})

resampled_df.plot(figsize=(16,6),title="mean cpu_usage of machine 1932")

We need datetime information to generate related features,so re-adjust time series.

resampled_df.index = pd.to_datetime(resampled_df.index, unit='m', origin=pd.Timestamp('2018-01-01'))

Ensure the time interval of dataframe is the same.

In [17]:
full_idx = pd.date_range(start=resampled_df.index.min(), end=resampled_df.index.max(),freq='T')
resampled_df = resampled_df.reindex(full_idx)

2018-01-01 00:56:00 19.000000
2018-01-01 00:57:00 19.666667
2018-01-01 00:58:00 19.666667
2018-01-01 00:59:00 20.833333
2018-01-01 01:00:00 25.666667

Here, we drop day with more than 3 consecutive missing values and fill other missing values remained.

drop_ts,drop_len = get_drop_ts_and_len(resampled_df,allow_missing_num=3)
df = rm_missing_day(drop_ts, drop_len,resampled_df)

2018-01-03 00:00:00 29.666667
2018-01-03 00:01:00 33.000000
2018-01-03 00:02:00 37.833333
2018-01-03 00:03:00 34.666667
2018-01-03 00:04:00 37.166667

print("no. of n/a values:")

no. of n/a values:
cpu_usage    21
dtype: int64

Feature Engineering

For feature engineering, we use hour as features.

df["hour"] = df.index.hour

cpu_usage hour
2018-01-03 00:00:00 29.666667 0
2018-01-03 00:01:00 33.000000 0
2018-01-03 00:02:00 37.833333 0
2018-01-03 00:03:00 34.666667 0
2018-01-03 00:04:00 37.166667 0

Data preparation

Now we split the dataset into train, validation and test.

val_ratio = 0.1
test_ratio = 0.1
# we look back one hour data which is of the frequency of 1min.
look_back = 60
horizon = 1

train_df, val_df, test_df = train_val_test_split(df, val_ratio, test_ratio, look_back, horizon)

train_df.index.size, val_df.index.size, test_df.index.size

(6912, 924, 924)

Then standardize train, test data and features

from sklearn.preprocessing import StandardScaler
standard_scaler = StandardScaler()
scaled_train = standard_scaler.fit_transform(train_df)
scaled_val = standard_scaler.transform(val_df)
scaled_test = standard_scaler.transform(test_df)

Last, we generate model input samples by sliding window along time axis.

import numpy as np
x_train, y_train = roll_data(scaled_train, look_back, target_col_index=0)
x_val, y_val = roll_data(scaled_val, look_back, target_col_index=0)
x_test, y_test = roll_data(scaled_test, look_back, target_col_index=0)

x_test.shape, y_test.shape

((864, 60, 2), (864, 1))

Time series forecasting

from zoo.zouwu.model.forecast import MTNetForecaster

First, we initialize a mtnet_forecaster according to input data shape. Specifcally, look_back should equal (long_series_num+1)*series_length . Details refer to zouwu docs here.

mtnet_forecaster = MTNetForecaster(target_dim=horizon,

MTNet needs to preprocess the X into another format, so we call MTNetForecaster.preprocess_input on train_x and test_x.

# mtnet requires reshape of input x before feeding into model.
x_train_mtnet = mtnet_forecaster.preprocess_input(x_train)
x_val_mtnet = mtnet_forecaster.preprocess_input(x_val)
x_test_mtnet = mtnet_forecaster.preprocess_input(x_test)

Now we train the model and wait till it finished.

hist = = x_train_mtnet, y = y_train, batch_size=128, epochs=20)

Use the model for prediction and inverse the scaling of the prediction results.

y_pred_val = mtnet_forecaster.predict(x_val_mtnet)
y_pred_test = mtnet_forecaster.predict(x_test_mtnet)

y_pred_val_unscale = unscale(standard_scaler, y_pred_val, [0])
y_pred_test_unscale = unscale(standard_scaler, y_pred_test, [0])
y_val_unscale = unscale(standard_scaler,y_val,[0])
y_test_unscale = unscale(standard_scaler,y_test,[0])

Calculate the symetric mean absolute percentage error.

# evaluate with sMAPE
print("sMAPE is",sMAPE(y_pred_test_unscale, y_test_unscale))

sMAPE is 5.007178919584265

Anomaly detection

from zoo.zouwu.model.anomaly import ThresholdDetector, ThresholdEstimator

threshold = ThresholdEstimator().fit(y=y_val_unscale,yhat=y_pred_val_unscale,ratio=ratio)
print("The threshold of validation dataset is:",threshold)

The threshold of validation dataset is: 14.632055810957759

val_res_ano_idx = ThresholdDetector().detect(y=y_val_unscale,yhat=y_pred_val_unscale,threshold=threshold)
test_res_ano_idx = ThresholdDetector().detect(y=y_test_unscale,yhat=y_pred_test_unscale,threshold=threshold)
print("The index of anomalies in validation dataset is:",val_res_ano_idx)
print("The index of anomalies in test dataset is:",test_res_ano_idx)

The index of anomalies in validation dataset is: [55, 112, 265, 436, 502, 625, 632, 641, 842]
The index of anomalies in test dataset is: [68, 79, 336, 397, 443, 532, 693, 694, 695, 696, 809, 811, 823, 827, 829, 838, 843, 846, 849, 852, 854, 856, 858, 860, 861]

Get a new dataframe which contains y_truey_predanomalies value.

val_result_df = get_result_df(val_df, y_pred_val_unscale, val_res_ano_idx, look_back)
test_result_df = get_result_df(test_df, y_pred_test_unscale, test_res_ano_idx, look_back)

Draw anomalies in line chart.

plot_anomalies_value(val_result_df.index, val_result_df.y_true, val_result_df.y_pred, val_result_df.anomalies)

plot_anomalies_value(test_result_df.index, test_result_df.y_true, test_result_df.y_pred, test_result_df.anomalies)

