In this example we will deploy an image classification model along with a drift detector trained on the same dataset. For in-depth details on creating a drift detection model for your own dataset, see the alibi-detect project and its associated documentation. You can find details of this CIFAR10 example in that documentation as well.
Prerequisites:
In [ ]:
!pip install -r requirements_notebook.txt
Enable eventing on the default namespace. This will activate a default Knative Broker.
In [ ]:
!kubectl label namespace default knative-eventing-injection=enabled
Create a Knative service to log the events it receives. This will be the example final sink for drift events.
In [ ]:
!pygmentize message-dumper.yaml
In [ ]:
!kubectl apply -f message-dumper.yaml
Create the KFServing image classification model for CIFAR10. We add a logger for requests; its default destination is the namespace's Knative Broker.
In [ ]:
!pygmentize cifar10.yaml
In [ ]:
!kubectl apply -f cifar10.yaml
Create the pretrained drift detector. We forward replies to the message-dumper we started. Notice the drift_batch_size setting: the drift detector will wait until drift_batch_size requests have been received before making a drift prediction.
In [ ]:
!pygmentize cifar10cd.yaml
In [ ]:
!kubectl apply -f cifar10cd.yaml
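The waiting behaviour described above can be pictured as a simple buffer. The following is only a toy sketch, not the detector's actual implementation: the `BatchBuffer` class and `fake_test` function are hypothetical names, and it assumes the batch size is counted in individual instances rather than HTTP requests.

```python
from typing import Callable, List, Optional


class BatchBuffer:
    """Toy buffer: collects instances and runs `test_fn` once `batch_size` are held."""

    def __init__(self, batch_size: int, test_fn: Callable[[List], dict]):
        self.batch_size = batch_size
        self.test_fn = test_fn
        self.items: List = []

    def add(self, instances: List) -> Optional[dict]:
        """Store new instances; return a result only when a full batch is available."""
        self.items.extend(instances)
        if len(self.items) < self.batch_size:
            return None  # not enough data yet, so no drift result
        # Take exactly one full batch and keep any surplus for the next one
        batch = self.items[:self.batch_size]
        self.items = self.items[self.batch_size:]
        return self.test_fn(batch)


def fake_test(batch: List) -> dict:
    """Hypothetical stand-in for a real drift test."""
    return {"is_drift": 0, "n": len(batch)}


buf = BatchBuffer(batch_size=5000, test_fn=fake_test)
# 50 calls of 100 instances each: only the last call completes the batch
results = [buf.add([0] * 100) for _ in range(50)]
```

Under these assumptions, the first 49 calls return `None` and only the 50th triggers the test.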
Create a Knative trigger to forward logging events to our drift detector.
In [ ]:
!pygmentize trigger.yaml
In [ ]:
!kubectl apply -f trigger.yaml
Get the IP address of the Istio Ingress Gateway. This assumes you have installed Istio with a LoadBalancer.
In [ ]:
CLUSTER_IPS=!(kubectl -n istio-system get service istio-ingressgateway -o jsonpath='{.status.loadBalancer.ingress[0].ip}')
CLUSTER_IP=CLUSTER_IPS[0]
print(CLUSTER_IP)
In [ ]:
SERVICE_HOSTNAMES=!(kubectl get inferenceservice tfserving-cifar10 -o jsonpath='{.status.url}' | cut -d "/" -f 3)
SERVICE_HOSTNAME_CIFAR10=SERVICE_HOSTNAMES[0]
print(SERVICE_HOSTNAME_CIFAR10)
In [ ]:
SERVICE_HOSTNAMES=!(kubectl get ksvc drift-detector -o jsonpath='{.status.url}' | cut -d "/" -f 3)
SERVICE_HOSTNAME_VAEOD=SERVICE_HOSTNAMES[0]
print(SERVICE_HOSTNAME_VAEOD)
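The `cut -d "/" -f 3` step in the cells above keeps only the host part of the service URL, which is later sent as the `Host` header. As a rough Python equivalent, one could use the standard library's URL parser. The helper name and the example URL below are hypothetical and only illustrate the idea.

```python
from urllib.parse import urlparse


def hostname_from_url(url: str) -> str:
    """Return the host part of an http(s) URL, as `cut -d "/" -f 3` would."""
    return urlparse(url).netloc


# Hypothetical value; the real one comes from `.status.url` of the service
example_url = "http://tfserving-cifar10.default.example.com"
host = hostname_from_url(example_url)
print(host)
```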
In [ ]:
import matplotlib.pyplot as plt
import numpy as np
import requests
import json
import tensorflow as tf

tf.keras.backend.clear_session()

# Load CIFAR10 and scale pixel values to [0, 1]
train, test = tf.keras.datasets.cifar10.load_data()
X_train, y_train = train
X_test, y_test = test
X_train = X_train.astype('float32') / 255
X_test = X_test.astype('float32') / 255
print(X_train.shape, y_train.shape, X_test.shape, y_test.shape)

classes = ('plane', 'car', 'bird', 'cat',
           'deer', 'dog', 'frog', 'horse', 'ship', 'truck')

def show(X):
    # Display a single 32x32 RGB image
    plt.imshow(X.reshape(32, 32, 3))
    plt.axis('off')
    plt.show()

def predict(X):
    # Send a batch to the model via the ingress gateway; the Host header selects the service
    formData = {
        'instances': X.tolist()
    }
    headers = {}
    headers["Host"] = SERVICE_HOSTNAME_CIFAR10
    res = requests.post('http://'+CLUSTER_IP+'/v1/models/tfserving-cifar10:predict', json=formData, headers=headers)
    if res.status_code == 200:
        j = res.json()
        # A class label is returned only for single-image requests
        if len(j["predictions"]) == 1:
            return classes[np.array(j["predictions"])[0].argmax()]
    else:
        print("Failed with ", res.status_code)
        return []

def drift(X):
    # Send a batch directly to the drift detector service
    formData = {
        'instances': X.tolist()
    }
    headers = {}
    headers["Host"] = SERVICE_HOSTNAME_VAEOD
    res = requests.post('http://'+CLUSTER_IP+'/', json=formData, headers=headers)
    if res.status_code == 200:
        od = res.json()
        return od
    else:
        print("Failed with ", res.status_code)
        return []
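The `predict` helper above only returns a label for single-image requests. As a small illustration of how a whole batch response could be decoded, here is a sketch in plain Python. It assumes the response body has the shape used above, a `predictions` list with one score vector per image; the `decode_predictions` name and the sample scores are made up for the example.

```python
from typing import List, Sequence

CLASSES = ('plane', 'car', 'bird', 'cat',
           'deer', 'dog', 'frog', 'horse', 'ship', 'truck')


def decode_predictions(response_json: dict,
                       classes: Sequence[str] = CLASSES) -> List[str]:
    """Map each score vector in `predictions` to its highest-scoring class name."""
    labels = []
    for scores in response_json["predictions"]:
        # Index of the largest score (first one wins on ties, like argmax)
        best = max(range(len(scores)), key=scores.__getitem__)
        labels.append(classes[best])
    return labels


# Made-up response with two images
sample = {"predictions": [
    [0.0, 0.9, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.1],
    [0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.8, 0.0, 0.2, 0.0],
]}
labels = decode_predictions(sample)
print(labels)
```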
In [ ]:
idx = 1
X = X_train[idx:idx+1]
show(X)
predict(X)
The drift detector has not yet accumulated a full batch, so no drift test has been run and the message dumper logs should not contain a drift event yet.
In [ ]:
!kubectl logs $(kubectl get pod -l serving.knative.dev/configuration=message-dumper -o jsonpath='{.items[0].metadata.name}') user-container
We will now send 5000 images to the model in batches of 100. The drift detector will run at the end of this, as we set drift_batch_size to 5000 in our YAML above.
In [ ]:
from tqdm.notebook import tqdm
for i in tqdm(range(0, 5000, 100)):
    X = X_train[i:i+100]
    predict(X)
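To see why this loop fills exactly one detector batch, the slicing arithmetic can be checked in isolation. This is a minimal sketch with a hypothetical `batch_slices` helper. It makes no network calls and only reproduces the index ranges used by the loop.

```python
def batch_slices(total: int, batch_size: int):
    """Yield (start, stop) index pairs covering `total` items in order."""
    for start in range(0, total, batch_size):
        yield start, min(start + batch_size, total)


slices = list(batch_slices(5000, 100))
n_requests = len(slices)
n_images = sum(stop - start for start, stop in slices)
print(n_requests, n_images)
```

The loop therefore issues 50 requests that together carry 5000 images.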
Let's check the message dumper and extract the first drift result.
In [ ]:
res = !kubectl logs $(kubectl get pod -l serving.knative.dev/configuration=message-dumper -o jsonpath='{.items[0].metadata.name}') user-container
data = []
for i in range(0, len(res)):
    if res[i] == 'Data,':
        data.append(res[i+1])
j = json.loads(json.loads(data[0]))
print("Drift", j["data"]["is_drift"] == 1)
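The log parsing above relies on two details: each event payload sits on the line after a `Data,` marker, and the payload is a JSON string that itself contains JSON, hence the double `json.loads`. The sketch below wraps that logic in a hypothetical `extract_events` helper and runs it on made-up log lines. The sample lines are an assumption about the log layout, not captured output, and the marker comparison tolerates surrounding whitespace, unlike the exact match above.

```python
import json
from typing import List


def extract_events(log_lines: List[str]) -> List[dict]:
    """Collect the payload that follows each 'Data,' marker line."""
    events = []
    # Pair every line with its successor so a trailing marker cannot overrun the list
    for marker, payload in zip(log_lines, log_lines[1:]):
        if marker.strip() == 'Data,':
            # The payload is a JSON-encoded string holding a JSON document
            events.append(json.loads(json.loads(payload)))
    return events


# Made-up log lines that mimic the marker/payload layout
inner = json.dumps({"data": {"is_drift": 0}})
sample_log = ["Context Attributes,", "  specversion: 1.0", "Data,", json.dumps(inner)]

events = extract_events(sample_log)
is_drift = events[0]["data"]["is_drift"] == 1
print("Drift", is_drift)
```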
Now, let's create some CIFAR10 examples with motion blur.
In [ ]:
from alibi_detect.datasets import fetch_cifar10c, corruption_types_cifar10c
corruption = ['motion_blur']
X_corr, y_corr = fetch_cifar10c(corruption=corruption, severity=5, return_X_y=True)
X_corr = X_corr.astype('float32') / 255
In [ ]:
show(X_corr[0])
show(X_corr[1])
show(X_corr[2])
Send these examples to the predictor.
In [ ]:
for i in tqdm(range(0, 5000, 100)):
    X = X_corr[i:i+100]
    predict(X)
Now when we check the message dumper we should find a new drift response.
In [ ]:
res = !kubectl logs $(kubectl get pod -l serving.knative.dev/configuration=message-dumper -o jsonpath='{.items[0].metadata.name}') user-container
data = []
for i in range(0, len(res)):
    if res[i] == 'Data,':
        data.append(res[i+1])
j = json.loads(json.loads(data[1]))
print("Drift", j["data"]["is_drift"] == 1)
In [ ]:
!kubectl delete -f cifar10.yaml
!kubectl delete -f cifar10cd.yaml
!kubectl delete -f trigger.yaml
!kubectl delete -f message-dumper.yaml