This pipeline consists of five steps: it finds the best hyperparameters using Katib, creates a PVC for storing models, processes the hyperparameter-tuning results, trains the model in a distributed fashion on a TFJob using the best hyperparameters and more iterations, and finally serves the model using KFServing. See this Medium blog post for more details on this pipeline.
In [ ]:
model_name = "mnist-demo"
user_namespace = "anonymous"
Import the DSL package and define the Kubeflow pipeline
In [ ]:
import json
from string import Template
import kfp
from kfp import components
from kfp.components import func_to_container_op
import kfp.dsl as dsl
In [ ]:
def convert_mnist_experiment_result(experiment_result) -> str:
    import json
    r = json.loads(experiment_result)
    args = []
    for hp in r:
        print(hp)
        args.append("%s=%s" % (hp["name"], hp["value"]))
    return " ".join(args)

def add_istio_annotation(op):
    op.add_pod_annotation(name='sidecar.istio.io/inject', value='false')
    return op
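# For reference, the Katib launcher's best-trial output looks roughly like
# '[{"name": "--tf-learning-rate", "value": "0.012"}, {"name": "--tf-batch-size", "value": "32"}]'
# (hypothetical values), which convert_mnist_experiment_result turns into the
# argument string "--tf-learning-rate=0.012 --tf-batch-size=32".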
@dsl.pipeline(
    name="End to end pipeline",
    description="An end to end example including hyperparameter tuning, train and inference."
)
def mnist_pipeline(
        name=model_name,
        namespace=user_namespace,
        step=4000):
    # step 1: create a Katib experiment to tune hyperparameters
    objectiveConfig = {
        "type": "minimize",
        "goal": 0.001,
        "objectiveMetricName": "loss",
    }
    algorithmConfig = {"algorithmName": "random"}
    parameters = [
        {"name": "--tf-learning-rate", "parameterType": "double", "feasibleSpace": {"min": "0.01", "max": "0.03"}},
        {"name": "--tf-batch-size", "parameterType": "discrete", "feasibleSpace": {"list": ["16", "32", "64"]}},
    ]
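    # The parameter names double as command-line flags for /opt/model.py;
    # Katib substitutes each trial's values into the TFJob template below.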
    rawTemplate = {
        "apiVersion": "kubeflow.org/v1",
        "kind": "TFJob",
        "metadata": {
            "name": "{{.Trial}}",
            "namespace": "{{.NameSpace}}"
        },
        "spec": {
            "tfReplicaSpecs": {
                "Chief": {
                    "replicas": 1,
                    "restartPolicy": "OnFailure",
                    "template": {
                        "spec": {
                            "containers": [
                                {
                                    "command": ["sh", "-c"],
                                    "args": [
                                        "python /opt/model.py --tf-train-steps=2000 {{- with .HyperParameters}} {{- range .}} {{.Name}}={{.Value}} {{- end}} {{- end}}"
                                    ],
                                    "image": "liuhougangxa/tf-estimator-mnist",
                                    "name": "tensorflow"
                                }
                            ]
                        }
                    }
                },
                "Worker": {
                    "replicas": 3,
                    "restartPolicy": "OnFailure",
                    "template": {
                        "spec": {
                            "containers": [
                                {
                                    "command": ["sh", "-c"],
                                    "args": [
                                        "python /opt/model.py --tf-train-steps=2000 {{- with .HyperParameters}} {{- range .}} {{.Name}}={{.Value}} {{- end}} {{- end}}"
                                    ],
                                    "image": "liuhougangxa/tf-estimator-mnist",
                                    "name": "tensorflow"
                                }
                            ]
                        }
                    }
                }
            }
        }
    }
    trialTemplate = {
        "goTemplate": {
            "rawTemplate": json.dumps(rawTemplate)
        }
    }
    metricsCollectorSpec = {
        "source": {
            "fileSystemPath": {
                "path": "/tmp/tf",
                "kind": "Directory"
            }
        },
        "collector": {
            "kind": "TensorFlowEvent"
        }
    }
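    # Katib's metrics collector reads TensorFlow event files written under
    # /tmp/tf in each trial pod to extract the "loss" objective metric.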
    katib_experiment_launcher_op = components.load_component_from_url(
        'https://raw.githubusercontent.com/kubeflow/pipelines/master/components/kubeflow/katib-launcher/component.yaml')
    op1 = katib_experiment_launcher_op(
        experiment_name=name,
        experiment_namespace=namespace,
        parallel_trial_count=3,
        max_trial_count=12,
        objective=str(objectiveConfig),
        algorithm=str(algorithmConfig),
        trial_template=str(trialTemplate),
        parameters=str(parameters),
        metrics_collector=str(metricsCollectorSpec),
        # experiment_timeout_minutes=experimentTimeoutMinutes,
        delete_finished_experiment=False)
    # step 2: create a TFJob to train your model with the best hyperparameters tuned by Katib
    tfjobjson_template = Template("""
    {
        "apiVersion": "kubeflow.org/v1",
        "kind": "TFJob",
        "metadata": {
            "name": "$name",
            "namespace": "$namespace",
            "annotations": {
                "sidecar.istio.io/inject": "false"
            }
        },
        "spec": {
            "tfReplicaSpecs": {
                "Chief": {
                    "replicas": 1,
                    "restartPolicy": "OnFailure",
                    "template": {
                        "metadata": {
                            "annotations": {
                                "sidecar.istio.io/inject": "false"
                            }
                        },
                        "spec": {
                            "volumes": [
                                {
                                    "name": "export-model",
                                    "persistentVolumeClaim": {
                                        "claimName": "$modelpvc"
                                    }
                                }
                            ],
                            "containers": [
                                {
                                    "command": ["sh", "-c"],
                                    "args": [
                                        "python /opt/model.py --tf-train-steps=$step --tf-export-dir=/mnt/export $args"
                                    ],
                                    "image": "liuhougangxa/tf-estimator-mnist",
                                    "name": "tensorflow",
                                    "volumeMounts": [
                                        {
                                            "mountPath": "/mnt/export",
                                            "name": "export-model"
                                        }
                                    ]
                                }
                            ]
                        }
                    }
                },
                "Worker": {
                    "replicas": 3,
                    "restartPolicy": "OnFailure",
                    "template": {
                        "metadata": {
                            "annotations": {
                                "sidecar.istio.io/inject": "false"
                            }
                        },
                        "spec": {
                            "volumes": [
                                {
                                    "name": "export-model",
                                    "persistentVolumeClaim": {
                                        "claimName": "$modelpvc"
                                    }
                                }
                            ],
                            "containers": [
                                {
                                    "command": ["sh", "-c"],
                                    "args": [
                                        "python /opt/model.py --tf-train-steps=$step --tf-export-dir=/mnt/export $args"
                                    ],
                                    "image": "liuhougangxa/tf-estimator-mnist",
                                    "name": "tensorflow",
                                    "volumeMounts": [
                                        {
                                            "mountPath": "/mnt/export",
                                            "name": "export-model"
                                        }
                                    ]
                                }
                            ]
                        }
                    }
                }
            }
        }
    }
    """)
    convert_op = func_to_container_op(convert_mnist_experiment_result)
    op2 = convert_op(op1.output)

    volume_template = Template("""
    {
        "apiVersion": "v1",
        "kind": "PersistentVolumeClaim",
        "metadata": {
            "name": "{{workflow.name}}-modelpvc",
            "namespace": "$namespace"
        },
        "spec": {
            "accessModes": ["ReadWriteMany"],
            "resources": {
                "requests": {
                    "storage": "1Gi"
                }
            }
        }
    }
    """)

    volopjson = volume_template.substitute({'namespace': namespace})
    volop = json.loads(volopjson)

    modelvolop = dsl.ResourceOp(
        name="modelpvc",
        k8s_resource=volop
    )
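    # The PVC name is resolved from the workflow name at run time; the
    # ResourceOp exposes it as modelvolop.outputs["name"] for the steps below.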
    tfjobjson = tfjobjson_template.substitute(
        {'args': op2.output,
         'name': name,
         'namespace': namespace,
         'step': step,
         'modelpvc': modelvolop.outputs["name"]
         })

    tfjob = json.loads(tfjobjson)

    train = dsl.ResourceOp(
        name="train",
        k8s_resource=tfjob,
        success_condition='status.replicaStatuses.Worker.succeeded==3,status.replicaStatuses.Chief.succeeded==1'
    )
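    # The train step completes only after the chief and all three workers
    # report success on the TFJob status, so downstream steps see a fully
    # exported model.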
    # step 3: model inference by KFServing InferenceService
    inferenceservice_template = Template("""
    {
        "apiVersion": "serving.kubeflow.org/v1alpha2",
        "kind": "InferenceService",
        "metadata": {
            "name": "$name",
            "namespace": "$namespace"
        },
        "spec": {
            "default": {
                "predictor": {
                    "tensorflow": {
                        "storageUri": "pvc://$modelpvc/"
                    }
                }
            }
        }
    }
    """)
    inferenceservicejson = inferenceservice_template.substitute({'modelpvc': modelvolop.outputs["name"],
                                                                 'name': name,
                                                                 'namespace': namespace})
    inferenceservice = json.loads(inferenceservicejson)
    inference = dsl.ResourceOp(
        name="inference",
        k8s_resource=inferenceservice,
        success_condition='status.url').after(train)
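    # Disable the Istio sidecar on every pipeline pod via the transformer
    # defined above, so the steps are not intercepted by the service mesh.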
    dsl.get_pipeline_conf().add_op_transformer(add_istio_annotation)
Assign permission to the Kubeflow Pipelines service account and run this pipeline using the kfp-tekton SDK
In [ ]:
!kubectl create clusterrolebinding $user_namespace-admin --clusterrole cluster-admin --serviceaccount=kubeflow:pipeline-run
In [ ]:
# Specify Kubeflow Pipeline Host
host=None
# Submit a pipeline run
from kfp_tekton import TektonClient
TektonClient(host=host).create_run_from_pipeline_func(mnist_pipeline, arguments={})
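If you prefer to compile the pipeline into a Tekton YAML file instead of submitting it directly (for example, to upload it through the Kubeflow Pipelines UI), you can use the kfp-tekton compiler. This is an optional sketch; the output filename is arbitrary.
In [ ]:
from kfp_tekton.compiler import TektonCompiler

# Compile the pipeline function into a Tekton PipelineRun YAML file
TektonCompiler().compile(mnist_pipeline, 'mnist_pipeline.yaml')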
When the pipeline is done, you can get the InferenceService name using the command below. In this example, the InferenceService name is mnist-demo.
In [ ]:
!kubectl get inferenceservice -n $user_namespace
Download an MNIST picture for the inference test if one is not already in this directory, such as 9.bmp from here. Then upload it to the notebook.
Update the istio_ingress_gateway value below with your KFServing ingress endpoint, then execute the cell to send a sample payload to the deployed model.
In [ ]:
import numpy as np
from PIL import Image
import requests
# Get istio_ingress_gateway endpoint by "kubectl -n istio-system get service istio-ingressgateway -o jsonpath='{.status.loadBalancer.ingress[0].ip}'"
istio_ingress_gateway = "xxx.xx.xx.xxx"
# inference_name is the InferenceService name obtained in the step above
inference_name = model_name
inference_namespace = user_namespace
# image_file is the MNIST picture uploaded in the previous step
image_file = '9.bmp'
data = np.array(Image.open(image_file).convert('L').resize((28, 28))).astype(float).reshape(-1, 28, 28, 1)
np.set_printoptions(threshold=np.inf)
json_request = '{{ "instances" : {} }}'.format(np.array2string(data, separator=',', formatter={'float':lambda x: "%.1f" % x}))
headers={"Host": "%s.%s.example.com" % (inference_name, inference_namespace)}
response = requests.post("http://%s/v1/models/%s:predict" % (istio_ingress_gateway, inference_name), data = json_request, headers = headers)
print(response.json())
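# A successful response is a JSON object of the form {"predictions": [...]},
# with one score per digit class (the exact shape depends on the model's
# serving signature).
Finally, clean up the resources created by this pipeline.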
In [ ]:
!kubectl delete inferenceservice -n $user_namespace $model_name
!kubectl delete experiment -n $user_namespace $model_name
!kubectl delete tfjob -n $user_namespace $model_name