In this example we will deploy an image classification model along with a drift detector trained on the same dataset. For in-depth details on creating a drift detection model for your own dataset, see the alibi-detect project and its documentation, which also covers this CIFAR10 example.
Prerequisites:
In [ ]:
    
!pip install -r requirements_notebook.txt
    
Enable eventing on the default namespace. This will activate a default Knative Broker.
In [ ]:
    
!kubectl label namespace default knative-eventing-injection=enabled
    
Create a Knative service that logs the events it receives. This will be the example final sink for drift events.
In [ ]:
    
!pygmentize message-dumper.yaml
    
In [ ]:
    
!kubectl apply -f message-dumper.yaml
    
Create the KFServing image classification model for CIFAR10. We add a logger for requests; its default destination is the namespace's Knative Broker.
In [ ]:
    
!pygmentize cifar10.yaml
    
In [ ]:
    
!kubectl apply -f cifar10.yaml
    
Create the pretrained drift detector. We forward its replies to the message-dumper we started. Notice the drift_batch_size setting: the drift detector waits until drift_batch_size requests have been received before making a drift prediction.
In [ ]:
    
!pygmentize cifar10cd.yaml
    
In [ ]:
    
!kubectl apply -f cifar10cd.yaml
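
The batching behaviour described above can be pictured with a small buffer that only runs a test once enough instances have arrived. This is a minimal sketch of the idea, not the detector's actual implementation: the `DriftBatcher` class, its `test_fn` argument, and the toy mean-based test are all hypothetical names introduced for illustration, and how the real server handles leftover instances is an assumption here.

```python
from typing import Callable, List, Optional, Sequence


class DriftBatcher:
    """Accumulate instances and run a drift test once enough have arrived."""

    def __init__(self, drift_batch_size: int, test_fn: Callable[[List[float]], bool]):
        self.drift_batch_size = drift_batch_size
        self.test_fn = test_fn  # hypothetical stand-in for the real detector
        self.buffer: List[float] = []

    def add(self, instances: Sequence[float]) -> Optional[bool]:
        """Buffer the instances; return a verdict only when a full batch exists."""
        self.buffer.extend(instances)
        if len(self.buffer) < self.drift_batch_size:
            return None  # not enough data yet, so no drift test is run
        n = self.drift_batch_size
        # Assumption: test the first n instances and keep any remainder buffered.
        batch, self.buffer = self.buffer[:n], self.buffer[n:]
        return self.test_fn(batch)


# Toy test: flag drift when the batch mean rises above 0.5.
batcher = DriftBatcher(drift_batch_size=4, test_fn=lambda b: sum(b) / len(b) > 0.5)
print(batcher.add([0.1, 0.2]))            # None: only 2 of 4 instances so far
print(batcher.add([0.1, 0.2]))            # False: batch full, mean is 0.15
print(batcher.add([0.9, 0.9, 0.9, 0.9]))  # True: batch full, mean is 0.9
```

With drift_batch_size set to 5000, as in this example, a single prediction request would fall in the first case and produce no drift event.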
    
Create a Knative trigger to forward logging events to our drift detector.
In [ ]:
    
!pygmentize trigger.yaml
    
In [ ]:
    
!kubectl apply -f trigger.yaml
    
Get the IP address of the Istio Ingress Gateway. This assumes you have installed Istio with a LoadBalancer.
In [ ]:
    
CLUSTER_IPS=!(kubectl -n istio-system get service istio-ingressgateway -o jsonpath='{.status.loadBalancer.ingress[0].ip}')
CLUSTER_IP=CLUSTER_IPS[0]
print(CLUSTER_IP)
    
In [ ]:
    
SERVICE_HOSTNAMES=!(kubectl get inferenceservice tfserving-cifar10 -o jsonpath='{.status.url}' | cut -d "/" -f 3)
SERVICE_HOSTNAME_CIFAR10=SERVICE_HOSTNAMES[0]
print(SERVICE_HOSTNAME_CIFAR10)
    
In [ ]:
    
SERVICE_HOSTNAMES=!(kubectl get ksvc drift-detector -o jsonpath='{.status.url}' | cut -d "/" -f 3)
SERVICE_HOSTNAME_VAEOD=SERVICE_HOSTNAMES[0]
print(SERVICE_HOSTNAME_VAEOD)
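
The `cut -d "/" -f 3` step in the two cells above takes the third slash-separated field of the status URL, which for an `http://` or `https://` URL is the hostname. A rough pure-Python equivalent is sketched below; the `hostname_from_url` helper and the example URL are hypothetical and only illustrate the parsing step.

```python
from urllib.parse import urlparse


def hostname_from_url(url: str) -> str:
    """Return the host part of a URL, as `cut -d "/" -f 3` does for http(s) URLs."""
    return urlparse(url).netloc


# Hypothetical status URL, for illustration only.
example_url = "http://tfserving-cifar10.default.example.com"
print(hostname_from_url(example_url))
```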
    
In [ ]:
    
import matplotlib.pyplot as plt
import numpy as np
import requests
import json
import tensorflow as tf
tf.keras.backend.clear_session()
train, test = tf.keras.datasets.cifar10.load_data()
X_train, y_train = train
X_test, y_test = test
X_train = X_train.astype('float32') / 255
X_test = X_test.astype('float32') / 255
print(X_train.shape, y_train.shape, X_test.shape, y_test.shape)
classes = ('plane', 'car', 'bird', 'cat',
           'deer', 'dog', 'frog', 'horse', 'ship', 'truck')
def show(X):
    plt.imshow(X.reshape(32, 32, 3))
    plt.axis('off')
    plt.show()
def predict(X):
    formData = {
    'instances': X.tolist()
    }
    headers = {}
    headers["Host"] = SERVICE_HOSTNAME_CIFAR10
    res = requests.post('http://'+CLUSTER_IP+'/v1/models/tfserving-cifar10:predict', json=formData, headers=headers)
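    if res.status_code == 200:
        j = res.json()
        # Map each prediction vector to its class label.
        labels = [classes[np.argmax(p)] for p in j["predictions"]]
        # Return a bare label for a single instance, a list of labels for a batch.
        return labels[0] if len(labels) == 1 else labels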
    else:
        print("Failed with ",res.status_code)
        return []
    
def drift(X):
    formData = {
    'instances': X.tolist()
    }
    headers = {}
    headers["Host"] = SERVICE_HOSTNAME_VAEOD
    res = requests.post('http://'+CLUSTER_IP+'/', json=formData, headers=headers)
    if res.status_code == 200:
        od = res.json()
        return od
    else:
        print("Failed with ",res.status_code)
        return []
    
In [ ]:
    
idx = 1
X = X_train[idx:idx+1]
show(X)
predict(X)
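
The `predict` helper defined earlier posts a JSON body with an `instances` key to the gateway IP and selects the target service through the `Host` header. The sketch below builds the same request pieces without sending anything, which can help when debugging routing. The `build_predict_request` function, the IP address, and the hostname are placeholders, not values from a real cluster.

```python
import json


def build_predict_request(cluster_ip, service_hostname, instances,
                          model_name="tfserving-cifar10"):
    """Return the URL, headers and JSON body for a prediction request."""
    url = f"http://{cluster_ip}/v1/models/{model_name}:predict"
    # The gateway routes on the Host header, not on the IP in the URL.
    headers = {"Host": service_hostname, "Content-Type": "application/json"}
    body = json.dumps({"instances": instances})
    return url, headers, body


# Placeholder values for illustration only.
url, headers, body = build_predict_request(
    "203.0.113.10",
    "tfserving-cifar10.default.example.com",
    [[0.0, 0.5], [1.0, 0.25]],
)
print(url)
print(headers["Host"])
print(body)
```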
    
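The drift detector must accumulate a full batch before it runs a test, so no drift result will have been produced yet. The message-dumper logs should therefore contain no drift events.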
In [ ]:
    
!kubectl logs $(kubectl get pod -l serving.knative.dev/configuration=message-dumper -o jsonpath='{.items[0].metadata.name}') user-container
    
We will now send 5000 instances to the model in batches of 100. The drift detector will run at the end of this because we set drift_batch_size to 5000 in our YAML above.
In [ ]:
    
from tqdm.notebook import tqdm
for i in tqdm(range(0,5000,100)):
    X = X_train[i:i+100]
    predict(X)
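
The loop above slices the training set into consecutive batches of 100. As a quick sanity check of the arithmetic, the sketch below lists the slice bounds that `range(0, 5000, 100)` produces and confirms they add up to 5000 instances. The `batch_bounds` helper is a hypothetical name used only here.

```python
def batch_bounds(total, batch_size):
    """Return (start, stop) index pairs that cover `total` items in order."""
    return [(start, min(start + batch_size, total))
            for start in range(0, total, batch_size)]


bounds = batch_bounds(5000, 100)
print(len(bounds))                                   # number of HTTP requests sent
print(bounds[0], bounds[-1])                         # first and last slice
print(sum(stop - start for start, stop in bounds))   # total instances sent
```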
    
Let's check the message dumper and extract the first drift result.
In [ ]:
    
res=!kubectl logs $(kubectl get pod -l serving.knative.dev/configuration=message-dumper -o jsonpath='{.items[0].metadata.name}') user-container
data= []
for i in range(0,len(res)):
    if res[i] == 'Data,':
        data.append(res[i+1])
j = json.loads(json.loads(data[0]))
print("Drift",j["data"]["is_drift"]==1)
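
The cell above scans the log for lines equal to `Data,` and decodes the line that follows each one twice, because the payload is a JSON document wrapped in a JSON string. The same logic can be written as a small function and tried on a made-up log excerpt. The `extract_events` name and the sample log lines are hypothetical; real message-dumper output contains more fields.

```python
import json


def extract_events(log_lines):
    """Return the decoded payload that follows each 'Data,' marker line."""
    events = []
    for marker, payload in zip(log_lines, log_lines[1:]):
        if marker == "Data,":
            # The payload is JSON wrapped in a JSON string, hence two decodes.
            events.append(json.loads(json.loads(payload)))
    return events


# Hypothetical log excerpt, for illustration only.
sample_payload = json.dumps(json.dumps({"data": {"is_drift": 0}}))
sample_log = ["Context Attributes,", "  type: example", "Data,", sample_payload]

events = extract_events(sample_log)
print("Drift", events[0]["data"]["is_drift"] == 1)
```

Pairing each line with its successor via `zip` also avoids an index error if the log happens to end on a `Data,` line.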
    
Now, let's create some CIFAR10 examples with motion blur.
In [ ]:
    
from alibi_detect.datasets import fetch_cifar10c, corruption_types_cifar10c
corruption = ['motion_blur']
X_corr, y_corr = fetch_cifar10c(corruption=corruption, severity=5, return_X_y=True)
X_corr = X_corr.astype('float32') / 255
    
In [ ]:
    
show(X_corr[0])
show(X_corr[1])
show(X_corr[2])
    
Send these examples to the predictor.
In [ ]:
    
for i in tqdm(range(0,5000,100)):
    X = X_corr[i:i+100]
    predict(X)
    
Now, when we check the message-dumper logs, we should find a new drift response as the second logged event.
In [ ]:
    
res=!kubectl logs $(kubectl get pod -l serving.knative.dev/configuration=message-dumper -o jsonpath='{.items[0].metadata.name}') user-container
data= []
for i in range(0,len(res)):
    if res[i] == 'Data,':
        data.append(res[i+1])
j = json.loads(json.loads(data[1]))
print("Drift",j["data"]["is_drift"]==1)
    
In [ ]:
    
!kubectl delete -f cifar10.yaml
!kubectl delete -f cifar10cd.yaml
!kubectl delete -f trigger.yaml
!kubectl delete -f message-dumper.yaml
    