Cifar10 Drift Detection

In this example we will deploy an image classification model along with a drift detector trained on the same dataset. For in-depth details on creating a drift detection model for your own dataset, see the alibi-detect project and its documentation, which also covers this CIFAR10 example.

Prerequisites:


In [ ]:
!pip install -r requirements_notebook.txt

Setup Resources

Enable eventing on the default namespace. This will activate a default Knative Broker.


In [ ]:
!kubectl label namespace default knative-eventing-injection=enabled

Create a Knative service to log the events it receives. This will be the example final sink for drift events.


In [ ]:
!pygmentize message-dumper.yaml

In [ ]:
!kubectl apply -f message-dumper.yaml

Create the KFServing image classification model for CIFAR10. We add a logger for requests; its default destination is the namespace's Knative Broker.


In [ ]:
!pygmentize cifar10.yaml

In [ ]:
!kubectl apply -f cifar10.yaml

Create the pretrained Drift Detector. We forward replies to the message-dumper we started. Notice the drift_batch_size setting: the drift detector will wait until drift_batch_size requests have been received before making a drift prediction.


In [ ]:
!pygmentize cifar10cd.yaml

In [ ]:
!kubectl apply -f cifar10cd.yaml
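
To make the batching behaviour concrete, here is a minimal sketch of how a detector might buffer incoming instances until drift_batch_size is reached. The `BatchBuffer` class and its `add` method are hypothetical names used only for illustration; this is not the alibi-detect or KFServing implementation, and the real server may treat leftover instances differently.

```python
from typing import List, Optional


class BatchBuffer:
    """Hypothetical buffer: collects instances until a full batch is available."""

    def __init__(self, drift_batch_size: int) -> None:
        self.drift_batch_size = drift_batch_size
        self._instances: List[object] = []

    def add(self, instances: List[object]) -> Optional[List[object]]:
        """Add the instances from one request.

        Returns a full batch once enough instances have accumulated,
        otherwise None (no drift test would run yet).
        """
        self._instances.extend(instances)
        if len(self._instances) < self.drift_batch_size:
            return None
        batch = self._instances[: self.drift_batch_size]
        # Assumption: leftover instances carry over to the next batch.
        self._instances = self._instances[self.drift_batch_size:]
        return batch


buffer = BatchBuffer(drift_batch_size=5000)
# 50 requests of 100 instances each, as sent later in this example.
results = [buffer.add(list(range(100))) for _ in range(50)]
# Count how many requests produced a full batch.
print(sum(r is not None for r in results))
```

In this sketch only the request that completes the batch returns data; every earlier request returns None, which mirrors why no drift result appears until enough requests have been sent.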

Create a Knative trigger to forward logging events to our Drift Detector.


In [ ]:
!pygmentize trigger.yaml

In [ ]:
!kubectl apply -f trigger.yaml

Get the IP address of the Istio Ingress Gateway. This assumes you have installed Istio with a LoadBalancer.


In [ ]:
CLUSTER_IPS=!(kubectl -n istio-system get service istio-ingressgateway -o jsonpath='{.status.loadBalancer.ingress[0].ip}')
CLUSTER_IP=CLUSTER_IPS[0]
print(CLUSTER_IP)
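
Some load balancers report a hostname rather than an IP address, in which case the jsonpath query above prints an empty string. The sketch below shows one way to pick whichever field is present from a Service object parsed as JSON (for example the output of `kubectl ... -o json`). The helper name `ingress_address` and the sample dictionaries are hypothetical; the `ip` and `hostname` fields under `status.loadBalancer.ingress` are part of the Kubernetes Service API.

```python
from typing import Optional


def ingress_address(service: dict) -> Optional[str]:
    """Return the first load balancer IP or hostname, or None if not yet assigned."""
    ingress = service.get("status", {}).get("loadBalancer", {}).get("ingress") or []
    for entry in ingress:
        # Prefer the IP, fall back to the hostname.
        address = entry.get("ip") or entry.get("hostname")
        if address:
            return address
    return None


# Made-up Service objects covering the three cases.
with_ip = {"status": {"loadBalancer": {"ingress": [{"ip": "10.0.0.1"}]}}}
with_hostname = {"status": {"loadBalancer": {"ingress": [{"hostname": "lb.example.com"}]}}}
pending = {"status": {"loadBalancer": {}}}

for svc in (with_ip, with_hostname, pending):
    print(ingress_address(svc))
```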

In [ ]:
SERVICE_HOSTNAMES=!(kubectl get inferenceservice tfserving-cifar10 -o jsonpath='{.status.url}' | cut -d "/" -f 3)
SERVICE_HOSTNAME_CIFAR10=SERVICE_HOSTNAMES[0]
print(SERVICE_HOSTNAME_CIFAR10)

In [ ]:
SERVICE_HOSTNAMES=!(kubectl get ksvc drift-detector -o jsonpath='{.status.url}' | cut -d "/" -f 3)
SERVICE_HOSTNAME_VAEOD=SERVICE_HOSTNAMES[0]
print(SERVICE_HOSTNAME_VAEOD)

In [ ]:
import matplotlib.pyplot as plt
import numpy as np
import requests
import json
import tensorflow as tf
tf.keras.backend.clear_session()

train, test = tf.keras.datasets.cifar10.load_data()
X_train, y_train = train
X_test, y_test = test

X_train = X_train.astype('float32') / 255
X_test = X_test.astype('float32') / 255
print(X_train.shape, y_train.shape, X_test.shape, y_test.shape)
classes = ('plane', 'car', 'bird', 'cat',
           'deer', 'dog', 'frog', 'horse', 'ship', 'truck')

def show(X):
    plt.imshow(X.reshape(32, 32, 3))
    plt.axis('off')
    plt.show()

def predict(X):
    """Send X to the classifier; return the class name for a single instance."""
    formData = {
    'instances': X.tolist()
    }
    headers = {}
    headers["Host"] = SERVICE_HOSTNAME_CIFAR10
    res = requests.post('http://'+CLUSTER_IP+'/v1/models/tfserving-cifar10:predict', json=formData, headers=headers)
    if res.status_code == 200:
        j = res.json()
        if len(j["predictions"]) == 1:
            return classes[np.array(j["predictions"])[0].argmax()]
        # For multi-instance requests nothing is returned (None).
    else:
        print("Failed with ",res.status_code)
        return []
    
def drift(X):
    formData = {
    'instances': X.tolist()
    }
    headers = {}
    headers["Host"] = SERVICE_HOSTNAME_VAEOD
    res = requests.post('http://'+CLUSTER_IP+'/', json=formData, headers=headers)
    if res.status_code == 200:
        od = res.json()
        return od
    else:
        print("Failed with ",res.status_code)
        return []

Normal Prediction


In [ ]:
idx = 1
X = X_train[idx:idx+1]
show(X)
predict(X)

Test Drift

We need to accumulate a large enough batch before the detector runs, so no drift result will be available yet.


In [ ]:
!kubectl logs $(kubectl get pod -l serving.knative.dev/configuration=message-dumper -o jsonpath='{.items[0].metadata.name}') user-container

We will now send 5000 instances to the model in batches of 100. The drift detector will run at the end of this, as we set drift_batch_size to 5000 in our YAML above.


In [ ]:
from tqdm.notebook import tqdm
for i in tqdm(range(0,5000,100)):
    X = X_train[i:i+100]
    predict(X)

Let's check the message dumper and extract the first drift result.


In [ ]:
res=!kubectl logs $(kubectl get pod -l serving.knative.dev/configuration=message-dumper -o jsonpath='{.items[0].metadata.name}') user-container
data= []
for i in range(0,len(res)):
    if res[i] == 'Data,':
        data.append(res[i+1])
j = json.loads(json.loads(data[0]))
print("Drift",j["data"]["is_drift"]==1)
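
The log parsing above can also be wrapped in a small helper that is testable without a cluster. This is a minimal sketch that assumes the same log shape as the cell above: a line equal to `Data,` followed by a line holding the JSON payload, which may be JSON encoded twice. The function name `extract_events` and the sample log lines are hypothetical.

```python
import json
from typing import List


def extract_events(log_lines: List[str]) -> List[dict]:
    """Collect the JSON payload that follows each 'Data,' marker line."""
    events = []
    for marker, payload in zip(log_lines, log_lines[1:]):
        if marker.strip() != "Data,":
            continue
        event = json.loads(payload)
        # The payload may be JSON encoded twice, as in the cell above.
        if isinstance(event, str):
            event = json.loads(event)
        events.append(event)
    return events


# Made-up log lines in the shape the cell above expects.
inner = json.dumps({"data": {"is_drift": 0}})
sample_log = ["Context Attributes,", "Data,", json.dumps(inner), ""]
events = extract_events(sample_log)
print("Drift", events[0]["data"]["is_drift"] == 1)
```

With real logs, `extract_events(res)` would return one entry per drift response, so the first and second results could be read as `events[0]` and `events[1]`.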

Now, let's create some CIFAR10 examples with motion blur.


In [ ]:
from alibi_detect.datasets import fetch_cifar10c, corruption_types_cifar10c
corruption = ['motion_blur']
X_corr, y_corr = fetch_cifar10c(corruption=corruption, severity=5, return_X_y=True)
X_corr = X_corr.astype('float32') / 255

In [ ]:
show(X_corr[0])
show(X_corr[1])
show(X_corr[2])

Send these examples to the predictor.


In [ ]:
for i in tqdm(range(0,5000,100)):
    X = X_corr[i:i+100]
    predict(X)

Now when we check the message dumper we should find a new drift response.


In [ ]:
res=!kubectl logs $(kubectl get pod -l serving.knative.dev/configuration=message-dumper -o jsonpath='{.items[0].metadata.name}') user-container
data= []
for i in range(0,len(res)):
    if res[i] == 'Data,':
        data.append(res[i+1])
j = json.loads(json.loads(data[1]))
print("Drift",j["data"]["is_drift"]==1)

Tear Down


In [ ]:
!kubectl delete -f cifar10.yaml
!kubectl delete -f cifar10cd.yaml
!kubectl delete -f trigger.yaml
!kubectl delete -f message-dumper.yaml
