AIOps anomaly detection

Anomaly detection identifies data points that do not fit well with the rest of the data. In this notebook we demonstrate how to perform anomaly detection using Zouwu's built-in model MTNet.

For demonstration, we use the publicly available cluster trace data cluster-trace-v2018 from the Alibaba Open Cluster Trace Program. You can find the dataset introduction here. In particular, we use the machine usage data to demonstrate anomaly detection; you can download the separate data file directly with machine_usage.

Helper functions

This section defines some helper functions used in the following procedures. You can refer back to them later when they are used.


In [1]:
def get_drop_ts_and_len(df, target_col="cpu_usage", allow_missing_num=3):
    """
    get start index which with consecutive missing num larger than allow_missing_num
    """
    missing_num = df[target_col].isnull().astype(int).groupby(df[target_col].notnull().astype(int).cumsum()).sum()
    missing_num = missing_num[missing_num > 0]
    drop_ts = df.iloc[(missing_num.index + missing_num.cumsum() - missing_num).values].index
    drop_ts = drop_ts[missing_num > allow_missing_num]
    drop_missing_num = missing_num[missing_num > allow_missing_num]
    drop_len = drop_missing_num.values
    return drop_ts, drop_len

In [2]:
def rm_missing_day(start_dts, missing_lens, df):
    """
    drop days with consecutive missing values.
    If consecutive missing values across day, we remove the data for an extra hour.
    """ 
    for start_time, l in zip(start_dts, missing_lens):
        start = start_time.replace(hour=0, minute=0, second=0)
        start_day_end = start + pd.Timedelta(hours=24)
        start_day_end = start_day_end - pd.Timedelta(minutes=1)
        end_time = start_time + l*pd.Timedelta(minutes=1)
        if start_day_end < end_time:
            end = end_time + pd.Timedelta(hours=1)
        else:
            end = start_day_end
        df = df.drop(df[start:end].index)
    return df

In [3]:
def train_val_test_split(df, val_ratio, test_ratio, look_back, horizon):
    """
    Split the dataset into train, validation and test part.
    """ 
    total_num = df.index.size
    test_num = int(total_num * test_ratio)
    val_num = int(total_num * val_ratio)
    
    test_split_index = test_num + look_back + horizon - 1
    val_split_index = test_split_index + val_num 
    
    train_df = df.iloc[:-(test_num + val_num)]  
    val_df = df.iloc[-val_split_index: -test_num]
    test_df = df.iloc[-test_split_index: ]
    
    return train_df, val_df, test_df
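
To make the split behavior concrete, here is a small hypothetical check (the toy dataframe and printed shapes are illustrative and not part of the original notebook):

# hypothetical toy check of the split sizes: with horizon=1 the val/test slices keep
# look_back extra rows so rolling still yields val_num / test_num samples
import pandas as pd
toy = pd.DataFrame({"cpu_usage": range(100)})
tr, va, te = train_val_test_split(toy, val_ratio=0.1, test_ratio=0.1, look_back=60, horizon=1)
print(tr.shape, va.shape, te.shape)  # (80, 1) (70, 1) (70, 1)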

In [4]:
def roll_data(dataset, look_back, target_col_index):
    """
    Generate input samples from rolling
    """
    X, Y = [], []
    for i in range(len(dataset) - look_back):
        X.append(dataset[i: (i + look_back)])
        Y.append(dataset[i + look_back, [target_col_index]])
    return np.array(X), np.array(Y)

In [5]:
def unscale(scaler, y, target_col_indexes):
    """
    data needs to be normalized (scaled) before feeding into models. 
    This is to inverse the effect of normalization to get reasonable forecast results.
    """
    dummy_feature_shape = scaler.scale_.shape[0]
    y_dummy = np.zeros((y.shape[0], dummy_feature_shape))
    y_dummy[:, target_col_indexes] = y
    y_unscale = scaler.inverse_transform(y_dummy)[:,target_col_indexes]
    return y_unscale

In [6]:
EPSILON = 1e-10
def sMAPE(y_true, y_pred):
    """
    Symmetric Mean Average Percentage Error
    """
    output_errors = np.mean(100 * np.abs(y_true - y_pred)/(np.abs(y_true) + np.abs(y_pred) + EPSILON), axis=0,)
    return np.mean(output_errors)
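
For reference, a quick sanity check of this helper on a toy input (the example values are illustrative only):

# toy check: 100 * |100 - 90| / (|100| + |90|) ≈ 5.26
import numpy as np
print(sMAPE(np.array([[100.0]]), np.array([[90.0]])))  # ~5.263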

In [7]:
def get_result_df(df, y_pred_unscale, ano_index, look_back, target_col='cpu_usage'):
    """
    Build a result dataframe containing the true values, predictions and anomaly flags.
    """
    result_df = pd.DataFrame({"y_true":df[look_back:][target_col], "y_pred": y_pred_unscale.squeeze()})
    result_df['anomalies'] = 0
    result_df.loc[result_df.index[ano_index], 'anomalies'] = 1
    result_df['anomalies'] = result_df['anomalies'] > 0
    return result_df

In [8]:
def plot_anomalies_value(date, y_true, y_pred, anomalies):
    """
    plot the anomalies value
    """
    fig, axs = plt.subplots(figsize=(16,6))
    
    axs.plot(date, y_true,color='blue', label='y_true')
    axs.plot(date, y_pred,color='orange', label='y_pred')
    axs.scatter(date[anomalies].tolist(), y_true[anomalies], color='red', label='anomalies')
    axs.set_title('detected anomalies')
    
    plt.xlabel('datetime')
    plt.legend(loc='upper left')
    plt.show()

Download raw dataset and load into dataframe

Now we download the dataset and load it into a pandas dataframe. The steps are as follows:

  • First, download the raw data machine_usage, or run the script get_data.sh to download it. It downloads the resource usage of each machine from m_1932 to m_2085.
  • Second, run grep m_1932 machine_usage.csv > m_1932.csv to extract the records of machine 1932, or run extract_data.sh. We use machine 1932 as an example in this notebook; you can extract any other machine in a similar way (a pure-Python alternative is sketched below).
  • Finally, use pandas to load m_1932.csv into a dataframe as shown below.
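
If grep is not available, a rough Python equivalent is shown below (assuming the raw file is named machine_usage.csv and the machine id is in the first column, as in the Alibaba trace):

# hypothetical pure-Python alternative to the grep command above:
# filter machine_usage.csv (no header) for machine m_1932 in chunks,
# so the full file never has to fit into memory
import pandas as pd

chunks = pd.read_csv("machine_usage.csv", header=None, chunksize=1_000_000)
records = pd.concat(chunk[chunk[0] == "m_1932"] for chunk in chunks)
records.to_csv("m_1932.csv", header=False, index=False)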

In [9]:
import os 
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
%matplotlib inline

In [10]:
df_1932 = pd.read_csv("m_1932.csv", header=None, usecols=[1,2,3], names=["time_step", "cpu_usage","mem_usage"])

Below are some example records of the data:


In [11]:
df_1932.head()


Out[11]:
time_step cpu_usage mem_usage
0 386640 41 92
1 386670 43 92
2 386690 44 92
3 386800 46 92
4 386930 44 93

In [12]:
df_1932.sort_values(by="time_step").plot(y="cpu_usage", x="time_step", figsize=(16,6),title="cpu_usage of machine 1932")


Out[12]:
<matplotlib.axes._subplots.AxesSubplot at 0x10e7eaf98>

Data pre-processing

Now we need to do data cleaning and preprocessing on the raw data. Note that this part may vary for different datasets.

For the machine_usage data, the pre-processing contains 2 parts:

  1. Change the time unit from seconds to minutes by averaging cpu_usage within each minute.
  2. Check for missing values and handle the missing data (fill or drop).

Change the time unit to minutes.


In [13]:
df_1932["ts_min"] = df_1932["time_step"]//60

Calculate the average cpu_usage within each minute and draw the line chart.


In [14]:
resampled_df = df_1932.groupby("ts_min").agg({"cpu_usage":"mean"})

In [15]:
resampled_df.plot(figsize=(16,6),title="mean cpu_usage of machine 1932")


Out[15]:
<matplotlib.axes._subplots.AxesSubplot at 0x1038c5780>

We need datetime information to generate related features, so we convert the time index to datetime.


In [16]:
resampled_df.index = pd.to_datetime(resampled_df.index, unit='m', origin=pd.Timestamp('2018-01-01'))

Reindex so that the dataframe has a uniform time interval of one minute (missing timestamps become NaN).


In [17]:
full_idx = pd.date_range(start=resampled_df.index.min(), end=resampled_df.index.max(),freq='T')
resampled_df = resampled_df.reindex(full_idx)

In [18]:
resampled_df.head()


Out[18]:
cpu_usage
2018-01-01 00:56:00 19.000000
2018-01-01 00:57:00 19.666667
2018-01-01 00:58:00 19.666667
2018-01-01 00:59:00 20.833333
2018-01-01 01:00:00 25.666667

Here, we drop days that contain more than 3 consecutive missing values and fill the remaining missing values.


In [19]:
drop_ts,drop_len = get_drop_ts_and_len(resampled_df,allow_missing_num=3)
df = rm_missing_day(drop_ts, drop_len,resampled_df)

In [20]:
df.head()


Out[20]:
cpu_usage
2018-01-03 00:00:00 29.666667
2018-01-03 00:01:00 33.000000
2018-01-03 00:02:00 37.833333
2018-01-03 00:03:00 34.666667
2018-01-03 00:04:00 37.166667

In [21]:
print("no. of n/a values:")
print(df.isna().sum())


no. of n/a values:
cpu_usage    21
dtype: int64

In [22]:
df.ffill(inplace=True)

In [23]:
df.plot(figsize=(16,6))


Out[23]:
<matplotlib.axes._subplots.AxesSubplot at 0x102ae3128>

Feature Engineering

For feature engineering, we use the hour of day as an extra feature.


In [24]:
df["hour"] = df.index.hour

In [25]:
df.head()


Out[25]:
cpu_usage hour
2018-01-03 00:00:00 29.666667 0
2018-01-03 00:01:00 33.000000 0
2018-01-03 00:02:00 37.833333 0
2018-01-03 00:03:00 34.666667 0
2018-01-03 00:04:00 37.166667 0

Data preparation

Now we split the dataset into train, validation and test.


In [26]:
val_ratio = 0.1
test_ratio = 0.1
# we look back one hour of data, which is at a frequency of one point per minute.
look_back = 60
horizon = 1

train_df, val_df, test_df = train_val_test_split(df, val_ratio, test_ratio, look_back, horizon)

In [27]:
train_df.index.size, val_df.index.size, test_df.index.size


Out[27]:
(6912, 924, 924)

Then we standardize the train, validation and test data.


In [28]:
from sklearn.preprocessing import StandardScaler
standard_scaler = StandardScaler()
scaled_train = standard_scaler.fit_transform(train_df)
scaled_val = standard_scaler.transform(val_df)
scaled_test = standard_scaler.transform(test_df)

Last, we generate model input samples by sliding a window along the time axis.


In [29]:
import numpy as np
x_train, y_train = roll_data(scaled_train, look_back, target_col_index=0)
x_val, y_val = roll_data(scaled_val, look_back, target_col_index=0)
x_test, y_test = roll_data(scaled_test, look_back, target_col_index=0)

In [30]:
x_test.shape, y_test.shape


Out[30]:
((864, 60, 2), (864, 1))

Time series forecasting


In [33]:
from zoo.zouwu.model.forecast import MTNetForecaster

First, we initialize an mtnet_forecaster according to the input data shape. Specifically, look_back should equal (long_series_num + 1) * series_length. For details, refer to the zouwu docs here.
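
As a quick sanity check of this constraint with the values chosen in the next cell (long_series_num=3, series_length=15):

# sanity check: look_back (60) == (long_series_num + 1) * series_length == (3 + 1) * 15
assert look_back == (3 + 1) * 15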


In [35]:
mtnet_forecaster = MTNetForecaster(target_dim=horizon,
                                   feature_dim=x_train.shape[-1],
                                   long_series_num=3,
                                   series_length=15
    )


WARNING:tensorflow:From /anaconda3/envs/automl/lib/python3.6/site-packages/tensorflow_core/python/keras/initializers.py:94: calling TruncatedNormal.__init__ (from tensorflow.python.ops.init_ops) with dtype is deprecated and will be removed in a future version.
Instructions for updating:
Call initializer instance with the dtype argument instead of passing it to the constructor
WARNING:tensorflow:From /anaconda3/envs/automl/lib/python3.6/site-packages/tensorflow_core/python/ops/resource_variable_ops.py:1630: calling BaseResourceVariable.__init__ (from tensorflow.python.ops.resource_variable_ops) with constraint is deprecated and will be removed in a future version.
Instructions for updating:
If using Keras pass *_constraint arguments to layers.

MTNet needs to preprocess X into another format, so we call MTNetForecaster.preprocess_input on x_train, x_val and x_test.


In [36]:
# mtnet requires reshape of input x before feeding into model.
x_train_mtnet = mtnet_forecaster.preprocess_input(x_train)
x_val_mtnet = mtnet_forecaster.preprocess_input(x_val)
x_test_mtnet = mtnet_forecaster.preprocess_input(x_test)

Now we train the model and wait until it finishes.


In [37]:
%%time
hist = mtnet_forecaster.fit(x = x_train_mtnet, y = y_train, batch_size=128, epochs=20)


WARNING:tensorflow:From /anaconda3/envs/automl/lib/python3.6/site-packages/tensorflow_core/python/ops/math_grad.py:1424: where (from tensorflow.python.ops.array_ops) is deprecated and will be removed in a future version.
Instructions for updating:
Use tf.where in 2.0, which has the same broadcast rule as np.where
Train on 6852 samples
Epoch 1/20
6852/6852 [==============================] - 25s 4ms/sample - loss: 0.5896 - mean_squared_error: 0.5736
Epoch 2/20
6852/6852 [==============================] - 17s 2ms/sample - loss: 0.4529 - mean_squared_error: 0.3615
Epoch 3/20
6852/6852 [==============================] - 15s 2ms/sample - loss: 0.3644 - mean_squared_error: 0.2388
Epoch 4/20
6852/6852 [==============================] - 15s 2ms/sample - loss: 0.3381 - mean_squared_error: 0.2067
Epoch 5/20
6852/6852 [==============================] - 16s 2ms/sample - loss: 0.3256 - mean_squared_error: 0.1928
Epoch 6/20
6852/6852 [==============================] - 15s 2ms/sample - loss: 0.3195 - mean_squared_error: 0.1862
Epoch 7/20
6852/6852 [==============================] - 15s 2ms/sample - loss: 0.3167 - mean_squared_error: 0.1840
Epoch 8/20
6852/6852 [==============================] - 15s 2ms/sample - loss: 0.3101 - mean_squared_error: 0.1765
Epoch 9/20
6852/6852 [==============================] - 15s 2ms/sample - loss: 0.3073 - mean_squared_error: 0.1733
Epoch 10/20
6852/6852 [==============================] - 14s 2ms/sample - loss: 0.3068 - mean_squared_error: 0.1738
Epoch 11/20
6852/6852 [==============================] - 15s 2ms/sample - loss: 0.3031 - mean_squared_error: 0.1692
Epoch 12/20
6852/6852 [==============================] - 14s 2ms/sample - loss: 0.3042 - mean_squared_error: 0.1703
Epoch 13/20
6852/6852 [==============================] - 15s 2ms/sample - loss: 0.2996 - mean_squared_error: 0.1668
Epoch 14/20
6852/6852 [==============================] - 15s 2ms/sample - loss: 0.3004 - mean_squared_error: 0.1664
Epoch 15/20
6852/6852 [==============================] - 14s 2ms/sample - loss: 0.2970 - mean_squared_error: 0.1644
Epoch 16/20
6852/6852 [==============================] - 14s 2ms/sample - loss: 0.2988 - mean_squared_error: 0.1650
Epoch 17/20
6852/6852 [==============================] - 14s 2ms/sample - loss: 0.2977 - mean_squared_error: 0.1642
Epoch 18/20
6852/6852 [==============================] - 15s 2ms/sample - loss: 0.2974 - mean_squared_error: 0.1645
Epoch 19/20
6852/6852 [==============================] - 15s 2ms/sample - loss: 0.2968 - mean_squared_error: 0.1626
Epoch 20/20
6852/6852 [==============================] - 14s 2ms/sample - loss: 0.2950 - mean_squared_error: 0.1626
CPU times: user 16min, sys: 1min 57s, total: 17min 57s
Wall time: 5min 15s

Use the model for prediction and invert the scaling of the prediction results.


In [38]:
y_pred_val = mtnet_forecaster.predict(x_val_mtnet)
y_pred_test = mtnet_forecaster.predict(x_test_mtnet)

In [39]:
y_pred_val_unscale = unscale(standard_scaler, y_pred_val, [0])
y_pred_test_unscale = unscale(standard_scaler, y_pred_test, [0])
y_val_unscale = unscale(standard_scaler,y_val,[0])
y_test_unscale = unscale(standard_scaler,y_test,[0])

Calculate the symmetric mean absolute percentage error (sMAPE).


In [40]:
# evaluate with sMAPE
print("sMAPE is", sMAPE(y_test_unscale, y_pred_test_unscale))


sMAPE is 5.007178919584265

Anomaly detection


In [41]:
from zoo.zouwu.model.anomaly import ThresholdDetector, ThresholdEstimator

ratio=0.01
threshold = ThresholdEstimator().fit(y=y_val_unscale,yhat=y_pred_val_unscale,ratio=ratio)
print("The threshold of validation dataset is:",threshold)


The threshold of validation dataset is: 14.632055810957759

In [42]:
val_res_ano_idx = ThresholdDetector().detect(y=y_val_unscale,yhat=y_pred_val_unscale,threshold=threshold)
test_res_ano_idx = ThresholdDetector().detect(y=y_test_unscale,yhat=y_pred_test_unscale,threshold=threshold)
print("The index of anomalies in validation dataset is:",val_res_ano_idx)
print("The index of anomalies in test dataset is:",test_res_ano_idx)


The index of anomalies in validation dataset is: [55, 112, 265, 436, 502, 625, 632, 641, 842]
The index of anomalies in test dataset is: [68, 79, 336, 397, 443, 532, 693, 694, 695, 696, 809, 811, 823, 827, 829, 838, 843, 846, 849, 852, 854, 856, 858, 860, 861]
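
Conceptually, ThresholdEstimator picks a threshold on the absolute forecast error so that roughly ratio of the validation points are flagged, and ThresholdDetector then flags every point whose error exceeds that threshold. This is our reading of the API rather than a statement of its exact implementation; a rough numpy sketch of the idea:

# rough numpy sketch of threshold-based detection (an assumption about the
# semantics of ThresholdDetector, not its actual implementation)
abs_err = np.abs(y_test_unscale - y_pred_test_unscale).squeeze()
manual_ano_idx = np.where(abs_err > threshold)[0].tolist()
print(manual_ano_idx)  # should be close to test_res_ano_idx under this assumption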

Get a new dataframe which contains the y_true, y_pred and anomalies values.


In [43]:
val_result_df = get_result_df(val_df, y_pred_val_unscale, val_res_ano_idx, look_back)
test_result_df = get_result_df(test_df, y_pred_test_unscale, test_res_ano_idx, look_back)

Draw the anomalies in a line chart.


In [44]:
plot_anomalies_value(val_result_df.index, val_result_df.y_true, val_result_df.y_pred, val_result_df.anomalies)



In [45]:
plot_anomalies_value(test_result_df.index, test_result_df.y_true, test_result_df.y_pred, test_result_df.anomalies)


