In this example we will deploy an image classification model along with an outlier detector trained on the same dataset. For in-depth details on creating an outlier detection model for your own dataset, see the alibi-detect project and its documentation, which also covers this CIFAR10 example.
Prerequisites:
In [ ]:
!pip install -r requirements_notebook.txt
Enable eventing on the default namespace. This will activate a default Knative Broker.
In [ ]:
!kubectl label namespace default knative-eventing-injection=enabled
Create a Knative service that logs the events it receives. This will be the final sink for outlier events in this example.
In [ ]:
!pygmentize message-dumper.yaml
In [ ]:
!kubectl apply -f message-dumper.yaml
Create the KFServing image classification model for CIFAR10. We add a logger for requests; its default destination is the namespace's Knative Broker.
In [ ]:
!pygmentize cifar10.yaml
In [ ]:
!kubectl apply -f cifar10.yaml
Create the pretrained VAE CIFAR10 outlier detector. We forward its replies to the message-dumper we started earlier.
In [ ]:
!pygmentize cifar10od.yaml
In [ ]:
!kubectl apply -f cifar10od.yaml
Create a Knative trigger to forward logging events to our Outlier Detector.
In [ ]:
!pygmentize trigger.yaml
In [ ]:
!kubectl apply -f trigger.yaml
Get the IP address of the Istio Ingress Gateway. This assumes you have installed Istio with a LoadBalancer.
In [ ]:
CLUSTER_IPS=!(kubectl -n istio-system get service istio-ingressgateway -o jsonpath='{.status.loadBalancer.ingress[0].ip}')
CLUSTER_IP=CLUSTER_IPS[0]
print(CLUSTER_IP)
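Some clusters report a hostname rather than an IP for the load balancer, in which case the jsonpath query above prints an empty string. The following is a minimal sketch of a fallback, assuming you fetch the service with `-o json` and parse it yourself. `ingress_address` is a hypothetical helper and the sample input is made up; neither is part of this notebook.

```python
import json

def ingress_address(service_json: str) -> str:
    """Return the first load balancer IP, or its hostname if no IP is set.

    Hypothetical helper: expects the output of
    `kubectl -n istio-system get service istio-ingressgateway -o json`.
    """
    svc = json.loads(service_json)
    ingress = svc.get("status", {}).get("loadBalancer", {}).get("ingress") or []
    if not ingress:
        raise ValueError("load balancer has no ingress address yet")
    first = ingress[0]
    return first.get("ip") or first["hostname"]

# Illustrative input only; a real service object has many more fields.
sample = json.dumps(
    {"status": {"loadBalancer": {"ingress": [{"hostname": "lb.example.com"}]}}}
)
print(ingress_address(sample))
```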
In [ ]:
SERVICE_HOSTNAMES=!(kubectl get inferenceservice tfserving-cifar10 -o jsonpath='{.status.url}' | cut -d "/" -f 3)
SERVICE_HOSTNAME_CIFAR10=SERVICE_HOSTNAMES[0]
print(SERVICE_HOSTNAME_CIFAR10)
In [ ]:
SERVICE_HOSTNAMES=!(kubectl get ksvc vae-outlier -o jsonpath='{.status.url}' | cut -d "/" -f 3)
SERVICE_HOSTNAME_VAEOD=SERVICE_HOSTNAMES[0]
print(SERVICE_HOSTNAME_VAEOD)
In [ ]:
import matplotlib.pyplot as plt
import numpy as np
import json
import tensorflow as tf
tf.keras.backend.clear_session()
from alibi_detect.od.vae import OutlierVAE
from alibi_detect.utils.perturbation import apply_mask
from alibi_detect.utils.visualize import plot_feature_outlier_image
import requests
train, test = tf.keras.datasets.cifar10.load_data()
X_train, y_train = train
X_test, y_test = test
X_train = X_train.astype('float32') / 255
X_test = X_test.astype('float32') / 255
print(X_train.shape, y_train.shape, X_test.shape, y_test.shape)
classes = ('plane', 'car', 'bird', 'cat',
           'deer', 'dog', 'frog', 'horse', 'ship', 'truck')

def show(X):
    plt.imshow(X.reshape(32, 32, 3))
    plt.axis('off')
    plt.show()

def predict(X):
    formData = {
        'instances': X.tolist()
    }
    headers = {}
    headers["Host"] = SERVICE_HOSTNAME_CIFAR10
    res = requests.post('http://'+CLUSTER_IP+'/v1/models/tfserving-cifar10:predict', json=formData, headers=headers)
    if res.status_code == 200:
        return classes[np.array(res.json()["predictions"])[0].argmax()]
    else:
        print("Failed with ",res.status_code)
        return []

def outlier(X):
    formData = {
        'instances': X.tolist()
    }
    headers = {"Alibi-Detect-Return-Feature-Score":"true","Alibi-Detect-Return-Instance-Score":"true"}
    headers["Host"] = SERVICE_HOSTNAME_VAEOD
    res = requests.post('http://'+CLUSTER_IP+'/', json=formData, headers=headers)
    if res.status_code == 200:
        od = res.json()
        od["data"]["feature_score"] = np.array(od["data"]["feature_score"])
        od["data"]["instance_score"] = np.array(od["data"]["instance_score"])
        return od
    else:
        print("Failed with ",res.status_code)
        return []
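The `predict` helper relies on the response carrying a `predictions` list with one row of class scores per instance. Below is a small sketch of that decoding step on a hand-written response. The scores are made up for illustration and `decode_prediction` is a hypothetical name, not something defined elsewhere in this notebook.

```python
CLASSES = ('plane', 'car', 'bird', 'cat', 'deer',
           'dog', 'frog', 'horse', 'ship', 'truck')

def decode_prediction(response_json, classes=CLASSES):
    """Map the first row of scores in a predict response to a class name."""
    scores = response_json["predictions"][0]
    # Index of the largest score, i.e. a plain-Python argmax.
    best = max(range(len(scores)), key=scores.__getitem__)
    return classes[best]

# Made-up response body, shaped like the one `predict` expects.
fake_response = {"predictions": [[0.01, 0.02, 0.05, 0.02, 0.1,
                                  0.05, 0.05, 0.05, 0.05, 0.6]]}
print(decode_prediction(fake_response))  # prints "truck"
```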
In [ ]:
idx = 1
X = X_train[idx:idx+1]
show(X)
predict(X)
Let's check the message dumper for an outlier detection prediction. For this unmodified image the outlier flag should be false.
In [ ]:
res=!kubectl logs $(kubectl get pod -l serving.knative.dev/configuration=message-dumper -o jsonpath='{.items[0].metadata.name}') user-container
data = []
for i in range(0, len(res)):
    if res[i] == 'Data,':
        data.append(res[i+1])
j = json.loads(json.loads(data[0]))
print("Outlier", j["data"]["is_outlier"] == [1])
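The log parsing above can be wrapped in a small function so it is easy to rerun after each request. This is a sketch under the same assumption the loop above makes, namely that each event payload sits on the line after a `Data,` marker and is a JSON string that itself contains JSON. `extract_events` is a hypothetical helper and the log lines are synthetic.

```python
import json

def extract_events(log_lines):
    """Collect the payload that follows each 'Data,' marker line and decode it.

    Each payload is a JSON string wrapping JSON, hence the two json.loads calls.
    """
    events = []
    # Pair every line with its successor so a trailing marker cannot overrun.
    for marker, payload in zip(log_lines, log_lines[1:]):
        if marker == 'Data,':
            events.append(json.loads(json.loads(payload)))
    return events

# Synthetic log lines for illustration; real message-dumper output has more fields.
payload = json.dumps(json.dumps({"data": {"is_outlier": [0]}}))
lines = ["Context Attributes,", "Data,", payload]
events = extract_events(lines)
print("Outlier", events[-1]["data"]["is_outlier"] == [1])
```

Reading `events[-1]` picks up the most recent event without hard-coding its position in the log.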
In [ ]:
np.random.seed(0)
X_mask, mask = apply_mask(X.reshape(1, 32, 32, 3),
                          mask_size=(10,10),
                          n_masks=1,
                          channels=[0,1,2],
                          mask_type='normal',
                          noise_distr=(0,1),
                          clip_rng=(0,1))
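To make the perturbation concrete, here is a simplified NumPy sketch of the idea: add Gaussian noise to one randomly placed 10x10 patch and clip the result back to [0, 1]. It is an illustration only, not alibi-detect's implementation of `apply_mask`, and `add_noise_patch` is a hypothetical name.

```python
import numpy as np

def add_noise_patch(x, mask_size=(10, 10), noise=(0.0, 1.0), clip=(0.0, 1.0), seed=0):
    """Add Gaussian noise to one random patch of a (H, W, C) image in [0, 1].

    Simplified illustration only; not alibi-detect's implementation.
    """
    rng = np.random.default_rng(seed)
    h, w, _ = x.shape
    # Pick the top-left corner so the patch fits inside the image.
    top = int(rng.integers(0, h - mask_size[0] + 1))
    left = int(rng.integers(0, w - mask_size[1] + 1))
    out = x.copy()
    patch = out[top:top + mask_size[0], left:left + mask_size[1], :]  # view into out
    patch += rng.normal(noise[0], noise[1], size=patch.shape)
    np.clip(out, clip[0], clip[1], out=out)
    return out, (top, left)

# A flat grey image stands in for a CIFAR10 sample here.
image = np.full((32, 32, 3), 0.5, dtype=np.float32)
noisy, (top, left) = add_noise_patch(image)
print(noisy.shape, (top, left))
```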
In [ ]:
show(X_mask)
predict(X_mask)
Now let's check the message dumper for a new message. This should show that we have found an outlier.
In [ ]:
res=!kubectl logs $(kubectl get pod -l serving.knative.dev/configuration=message-dumper -o jsonpath='{.items[0].metadata.name}') user-container
data = []
for i in range(0, len(res)):
    if res[i] == 'Data,':
        data.append(res[i+1])
j = json.loads(json.loads(data[1]))
print("Outlier", j["data"]["is_outlier"] == [1])
We will now call our outlier detector directly and ask for the feature scores to gain more information about why it predicted this instance was an outlier.
In [ ]:
od_preds = outlier(X_mask)
We now plot the feature scores returned by the outlier detector alongside the masked image we sent.
In [ ]:
plot_feature_outlier_image(od_preds,
                           X_mask,
                           X_recon=None)
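Besides the plot, it can help to locate where the scores concentrate numerically. The sketch below assumes the feature scores have the same shape as the input batch, `(1, 32, 32, 3)`; that shape, the helper name `hottest_pixel`, and the synthetic scores are assumptions for illustration.

```python
import numpy as np

def hottest_pixel(feature_score):
    """Return (row, col) of the pixel with the highest channel-averaged score.

    Assumes feature_score has shape (1, H, W, C), matching the input batch.
    """
    per_pixel = feature_score[0].mean(axis=-1)
    row, col = np.unravel_index(per_pixel.argmax(), per_pixel.shape)
    return int(row), int(col)

# Synthetic scores for illustration: zero everywhere except one pixel.
scores = np.zeros((1, 32, 32, 3))
scores[0, 12, 20, :] = 1.0
print(hottest_pixel(scores))  # prints (12, 20)
```

With a real response, the call would be `hottest_pixel(od_preds["data"]["feature_score"])`, and the result should fall inside the masked patch if the detector reacted to the noise.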
In [ ]:
!kubectl delete -f cifar10.yaml
!kubectl delete -f cifar10od.yaml
!kubectl delete -f trigger.yaml
!kubectl delete -f message-dumper.yaml