We create a message dumper KNaive service to print out CloudEvents it receives:
In [ ]:
!pygmentize message-dumper.yaml
In [ ]:
!kubectl apply -f message-dumper.yaml
Label the default namespace to activate KNative eventing broker
In [ ]:
!kubectl label namespace default knative-eventing-injection=enabled
Create a knative trigger to pass events to the message logger
In [ ]:
!pygmentize trigger.yaml
In [ ]:
!kubectl apply -f trigger.yaml
Create a SkLearn model with associated logger to push events to the message logger URL
In [ ]:
!pygmentize sklearn-logging.yaml
In [ ]:
!kubectl apply -f sklearn-logging.yaml
In [ ]:
CLUSTER_IPS=!(kubectl -n istio-system get service istio-ingressgateway -o jsonpath='{.status.loadBalancer.ingress[0].ip}')
CLUSTER_IP=CLUSTER_IPS[0]
print(CLUSTER_IP)
In [ ]:
SERVICE_HOSTNAMES=!(kubectl get inferenceservice sklearn-iris -o jsonpath='{.status.url}' | cut -d "/" -f 3)
SERVICE_HOSTNAME=SERVICE_HOSTNAMES[0]
print(SERVICE_HOSTNAME)
In [ ]:
import requests
def predict(X, name, svc_hostname, cluster_ip):
formData = {
'instances': X
}
headers = {}
headers["Host"] = svc_hostname
res = requests.post('http://'+cluster_ip+'/v1/models/'+name+':predict', json=formData, headers=headers)
if res.status_code == 200:
return res.json()
else:
print("Failed with ",res.status_code)
return []
In [ ]:
predict([[6.8, 2.8, 4.8, 1.4]],"sklearn-iris",SERVICE_HOSTNAME,CLUSTER_IP)
In [ ]:
!kubectl logs $(kubectl get pod -l serving.knative.dev/configuration=message-dumper -o jsonpath='{.items[0].metadata.name}') user-container
In [ ]:
!kubectl delete -f sklearn-logging.yaml
In [ ]:
!kubectl delete -f trigger.yaml
In [ ]:
!kubectl delete -f message-dumper.yaml
In [ ]: