Anomaly Detection is an interesting modeling algorithm that can be used to identify rare cases (such as fraud detection). It is normally considered as an unsupervised learning technique. From personal experiecne, I've also appied it for skewed binary classifications and achieved reasonable reasults. Let's now get to the algorithm itself.

The initial steps of anomaly detection can be summarized as:

  • Find feature vector $X_1, X_2, ..., X_n$($n$ features) that might be considered to be relevant with the anomaly cases

  • Fit parameters $(\mu_1,\sigma_1^2), (\mu_2,\sigma_2^2), ... (\mu_n,\sigma_n^2)$ for Gaussian distributions of all the features. If we have $m$ examples, the fitted parameter can be expressed as:

$$\mu_j=\frac{1}{m}\sum_{i=1}^{m}X_j^{(i)}$$$$\sigma_j^2=\frac{1}{m}\sum_{i=1}^{m}(X_j^{(i)}-\mu_j)^2$$
  • To test a new example $x$, we can compute $p(x)$:
$$p(x)=\prod_{j=1}^{n}\frac{1}{\sqrt{2\pi}\sigma_j}exp\big(-\frac{(X_j-\mu_j)^2}{2\sigma_j^2}\big)$$
  • If $p(x)<\epsilon$, we would consider it as an anomaly.

The tricky part is to find the proper value for $\epsilon$. We can do this by borrowing the idea of cross-validation from supervised learning. That is, if we have a data set containing $m1$ normal examples, and $m2$ abnormal cases. We then divide $m1$ good examples into three parts: say $m11, m12, m13$. We also need to split the $m2$ bad examples into two pieces - $m21, m22$.

To learn the parameters of the Gaussian Distributions, we can use all good $m11$ examples.

To learn the values of the $\epsilon$, we can put $m12$ good examples and $m21$ bad examples together, and coded them as 0 and 1. Then we loop through a list of possible $\epsilon$ values. For each $\epsilon$, we calculate the corresponding performance of this anomaly detection algorithm (we will use f1-score as evaluation metric to deal with the imbalanced class distribution).

In the end, we are left with $m13$ good cases and $m22$ bad cases for test purposes.

If features are highly correlated, we can apply multivariate Gaussian Distribution instead.

The basic idea is still the same. The fitting of distribution parameters and the calcualtion of $p(x)$ are slightly changed.

The mean vector and the variance-covariance matrix can be expressed as:

$$\mu=\frac{1}{m}\sum_{i=1}^{m}X^{(i)}$$$$\Sigma=\frac{1}{m}\sum_{i=1}^{m}(X^{(i)}-\mu)(X^{(i)}-\mu)^T$$

And $p(x)$ can be calcuated as:

$$p(x;\mu,\Sigma)=\frac{1}{(2\pi)^{n/2}|\Sigma|^{1/2}}exp\Bigg(-\frac{1}{2}(x-\mu)^T\Sigma^{-1}(x-\mu)\Bigg)$$

The following class AnomalyDetection() covers both non-multivariate and multivariate Gaussian models, and implement both ideas from scratch.


In [1]:
import math
import numpy as np
import scipy

class AnomalyDetection():

    def __init__(self, multi_variate=False):

        # if multi_variate is True, we will use multivariate Gaussian distribution
        # to estimate the probabilities
        self.multi_variate = multi_variate
        self.mu = None
        self.sigma2 = None
        self.best_epsilon = 0
        self.best_f1_score = 0

    def _fit_gaussian(self, X_train):

        # fit the parameters of the Gaussian Distribution

        # if not using the multivariate Gaussian Distribution, we will estimate
        # mu and sigma for each single feature distribution separately
        if self.multi_variate is False:
            self.mu = np.mean(X_train, axis=0)
            self.sigma2 = np.var(X_train, axis=0)

        # if using the multivariate Gaussian Distribution, we estimate the vector
        # of mu and variance/covariance matrix of sigma
        else:
            m = X_train.shape[0]
            self.mu = np.mean(X_train, axis=0)
            self.sigma2 = 1.0 / m * (X_train - self.mu).T.dot(X_train - self.mu)

    def _prob_calc(self, X):

        # helper function to calculate the probability of each instance
        # in the cross-validation set
        if self.multi_variate is False:
            p = np.prod(np.exp(-(X - self.mu) ** 2 / (2.0 * self.sigma2))
                        / np.sqrt(2.0 * math.pi * self.sigma2), axis=1)

        else:
            n = X.shape[1]
            p = 1.0 / ((2 * math.pi) ** (n / 2.0) * (np.linalg.det(self.sigma2) ** 0.5)) \
                * np.diag(np.exp(-0.5 * ((X - self.mu).dot(np.linalg.inv(self.sigma2))) \
                                 .dot((X - self.mu).T)))

        return p

    def _fit_epsilon(self, X_val, y_val):

        # this is the second step of model fitting
        # the input is the cross-validation set
        # the output is the threshold that will maximizes the f1-score
        # of the positive class (anomalies) in the CV set

        p_val = self._prob_calc(X_val)
        p_min = np.array(p_val).min()
        p_max =np.array(p_val).max()
        step = (p_max - p_min) / 100.0

        for epsilon in np.arange(p_min, p_max + step, step):
            y_predict = (p_val < epsilon).reshape((len(y_val), 1))

            TP = np.sum([1 if y_predict[i] == 1 and y_val[i] == 1 else 0 \
                         for i in range(len(y_val))])
            PP = np.sum((y_predict == 1))
            AP = np.sum((y_val == 1))

            if PP == 0 or AP == 0:
                continue

            precision = float(TP) / PP
            recall = float(TP) / AP
            f1_score = 2.0 * precision * recall / (precision + recall)

            if f1_score > self.best_f1_score:
                self.best_f1_score = f1_score
                self.best_epsilon = epsilon


    def fit(self, X_train, X_val, y_val):

        # fit the anomaly detection model
        # step 1 - fit mu and sigma based on the training set (all 0s)
        # step 2 - fit epsilon based on validation set (0s and 1s)
        self._fit_gaussian(X_train)
        self._fit_epsilon(X_val, y_val)

    def predict(self, X_test):

        # predict using fitted model
        p_test = self._prob_calc(X_test)
        y_test = (p_test < self.best_epsilon).astype(int)

        return y_test

Now let's load the Iris Dataset for a demo.

We can see that the dataset has 150 examples, with three different classes 0, 1, 2.


In [2]:
from sklearn.datasets import load_iris
iris = load_iris()

X = iris['data']
y = iris['target']

print X.shape
print y.shape

print "Number of Classes 0: {}".format(np.sum(y == 0))
print "Number of Classes 1: {}".format(np.sum(y == 1))
print "Number of Classes 2: {}".format(np.sum(y == 2))


(150L, 4L)
(150L,)
Number of Classes 0: 50
Number of Classes 1: 50
Number of Classes 2: 50

Let's just assume class 2 as the anomaly for test purposes.


In [3]:
y_new = y
y_new[y == 0] = 0
y_new[y == 1] = 0
y_new[y == 2] = 1

Now we have 100 normal examples, and 50 abnormal cases.


In [4]:
print "Number of Classes 0: {}".format(np.sum(y_new == 0))
print "Number of Classes 1: {}".format(np.sum(y_new == 1))


Number of Classes 0: 100
Number of Classes 1: 50

Then we can split the dataset into train (with all normal examples), validation (normal & abnormal examples), and test (the rest of normal & abnormal examples).


In [5]:
X_normal = X[y_new == 0]
y_normal = y_new[y_new == 0]
X_abnormal = X[y_new == 1]
y_abnormal = y_new[y_new == 1]

from sklearn.cross_validation import train_test_split

X_normal_train_val, X_normal_test, y_normal_train_val, y_normal_test = \
    train_test_split(X_normal, y_normal, test_size=0.2, random_state=26)

X_normal_train, X_normal_val, y_normal_train, y_normal_val = \
    train_test_split(X_normal_train_val, y_normal_train_val, test_size=0.25, random_state=26)

X_abnormal_val, X_abnormal_test, y_abnormal_val, y_abnormal_test = \
    train_test_split(X_abnormal, y_abnormal, test_size=0.5, random_state=26)

X_train = X_normal_train
y_train = y_normal_train
X_val = np.r_[X_normal_val, X_abnormal_val]
y_val = np.r_[y_normal_val, y_abnormal_val]
X_test = np.r_[X_normal_test, X_abnormal_test]
y_test = np.r_[y_normal_test, y_abnormal_test]

print X_train.shape
print X_val.shape
print X_test.shape


(60L, 4L)
(45L, 4L)
(45L, 4L)

Finally, we can start an AnmalyDetection() object, use train and validation data to fit the Gaussian Distribution paramters and find the proper value for $\epsilon$. With the fitted model, we can predict the anomaly cases on the test dataset and check the performance.


In [6]:
ad = AnomalyDetection(multi_variate=False)
ad.fit(X_train, X_val, y_val)
y_predict = ad.predict(X_test)
print "True Values:      {}".format(y_test)
print "Predicted Values: {}".format(y_predict)
print "Prediction Accuracy: {:.2%}".format(np.mean((y_predict == y_test).astype(float)))


True Values:      [0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1
 1 1 1 1 1 1 1 1]
Predicted Values: [1 0 0 1 0 0 0 0 1 0 0 1 0 0 0 0 0 0 0 0 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1
 1 1 1 1 1 1 1 1]
Prediction Accuracy: 91.11%

In summary, Anomaly Detection is a pretty useful tool for rare case identification. The multivariate Gaussian Distribution is clearly more complex and computationally costly compared to non-multivariate method. It has the advantage of capturing correlations between different features but requires the number of examples larger than number of feature dimensions to avoid matrix singularity issues.