MNIST End-to-End Example with Kubeflow Components

This pipeline contains five steps: it finds the best hyperparameters using Katib, creates a PVC for storing the model, processes the hyperparameter results, trains the model on a distributed TFJob with the best hyperparameters using more iterations, and finally serves the model using KFServing. You can visit this Medium blog post for more details on this pipeline.

Define the model name and Kubeflow user namespace

Update the cell below with the name you want to use for this pipeline and the Kubeflow namespace you want to execute in.


In [ ]:
model_name = "mnist-demo"
user_namespace = "anonymous"

Import the DSL package and define the Kubeflow pipeline


In [ ]:
import json
from string import Template

import kfp
from kfp import components
from kfp.components import func_to_container_op
import kfp.dsl as dsl

In [ ]:
def convert_mnist_experiment_result(experiment_result) -> str:
    """Convert Katib's best-trial JSON into a space-separated argument string."""
    import json
    r = json.loads(experiment_result)
    args = []
    for hp in r:
        print(hp)
        args.append("%s=%s" % (hp["name"], hp["value"]))

    return " ".join(args)

def add_istio_annotation(op):
    """Disable Istio sidecar injection on a pipeline pod."""
    op.add_pod_annotation(name='sidecar.istio.io/inject', value='false')
    return op

@dsl.pipeline(
    name="End to end pipeline",
    description="An end to end example including hyperparameter tuning, train and inference."
)
def mnist_pipeline(
        name=model_name,
        namespace=user_namespace,
        step=4000):
    # step 1: create a Katib experiment to tune hyperparameters
    objectiveConfig = {
      "type": "minimize",
      "goal": 0.001,
      "objectiveMetricName": "loss",
    }
    algorithmConfig = {"algorithmName" : "random"}
    parameters = [
      {"name": "--tf-learning-rate", "parameterType": "double", "feasibleSpace": {"min": "0.01","max": "0.03"}},
      {"name": "--tf-batch-size", "parameterType": "discrete", "feasibleSpace": {"list": ["16", "32", "64"]}},
    ]
    # Each Katib trial instantiates this TFJob manifest with the trial's hyperparameters
    rawTemplate = {
      "apiVersion": "kubeflow.org/v1",
      "kind": "TFJob",
      "metadata": {
         "name": "{{.Trial}}",
         "namespace": "{{.NameSpace}}"
      },
      "spec": {
        "tfReplicaSpecs": {
          "Chief": {
            "replicas": 1,
            "restartPolicy": "OnFailure",
            "template": {
              "spec": {
                "containers": [
                {
                  "command": [
                    "sh",
                    "-c"
                  ],
                  "args": [
                    "python /opt/model.py --tf-train-steps=2000 {{- with .HyperParameters}} {{- range .}} {{.Name}}={{.Value}} {{- end}} {{- end}}"
                  ],
                  "image": "liuhougangxa/tf-estimator-mnist",
                  "name": "tensorflow"
                }
                ]
              }
            }
          },
          "Worker": {
            "replicas": 3,
            "restartPolicy": "OnFailure",
            "template": {
              "spec": {
                "containers": [
                {
                  "command": [
                    "sh",
                    "-c"
                  ],
                  "args": [ 
                    "python /opt/model.py --tf-train-steps=2000 {{- with .HyperParameters}} {{- range .}} {{.Name}}={{.Value}} {{- end}} {{- end}}"
                  ],
                  "image": "liuhougangxa/tf-estimator-mnist",
                  "name": "tensorflow"
                }
                ]
              }
            }
          }
        }
      }
    }
    
    trialTemplate = {
      "goTemplate": {
        "rawTemplate": json.dumps(rawTemplate)
      }
    }

    # Collect objective metrics from TensorFlow event files written under /tmp/tf
    metricsCollectorSpec = {
      "source": {
        "fileSystemPath": {
          "path": "/tmp/tf",
          "kind": "Directory"
        }
      },
      "collector": {
        "kind": "TensorFlowEvent"
      }
    }

    katib_experiment_launcher_op = components.load_component_from_url('https://raw.githubusercontent.com/kubeflow/pipelines/master/components/kubeflow/katib-launcher/component.yaml')
    op1 = katib_experiment_launcher_op(
            experiment_name=name,
            experiment_namespace=namespace,
            parallel_trial_count=3,
            max_trial_count=12,
            objective=str(objectiveConfig),
            algorithm=str(algorithmConfig),
            trial_template=str(trialTemplate),
            parameters=str(parameters),
            metrics_collector=str(metricsCollectorSpec),
            # experiment_timeout_minutes=experimentTimeoutMinutes,
            delete_finished_experiment=False)

    # step 2: create a TFJob to train the model with the best hyperparameters tuned by Katib
    tfjobjson_template = Template("""
{
  "apiVersion": "kubeflow.org/v1",
  "kind": "TFJob",
  "metadata": {
    "name": "$name",
    "namespace": "$namespace",
    "annotations": {
        "sidecar.istio.io/inject": "false"
    }
  },
  "spec": {
    "tfReplicaSpecs": {
      "Chief": {
        "replicas": 1,
        "restartPolicy": "OnFailure",
        "template": {
          "metadata": {
            "annotations": {
              "sidecar.istio.io/inject": "false"
            }
          },
          "spec": {
            "volumes": [
              {
                "name": "export-model",
                "persistentVolumeClaim": {
                  "claimName": "$modelpvc"
                }
              }
            ],
            "containers": [
              {
                "command": [
                  "sh",
                  "-c"
                ],
                "args": [
                  "python /opt/model.py --tf-train-steps=$step --tf-export-dir=/mnt/export $args"
                ],
                "image": "liuhougangxa/tf-estimator-mnist",
                "name": "tensorflow",
                "volumeMounts": [
                  {
                    "mountPath": "/mnt/export",
                    "name": "export-model"
                  }
                ]
              }
            ]
          }
        }
      },
      "Worker": {
        "replicas": 3,
        "restartPolicy": "OnFailure",
        "template": {
          "metadata": {
            "annotations": {
              "sidecar.istio.io/inject": "false"
            }
          },
          "spec": {
            "volumes": [
              {
                "name": "export-model",
                "persistentVolumeClaim": {
                  "claimName": "$modelpvc"
                }
              }
            ],
            "containers": [
              {
                "command": [
                  "sh",
                  "-c"
                ],
                "args": [
                  "python /opt/model.py --tf-train-steps=$step --tf-export-dir=/mnt/export $args"
                ],
                "image": "liuhougangxa/tf-estimator-mnist",
                "name": "tensorflow",
                "volumeMounts": [
                  {
                    "mountPath": "/mnt/export",
                    "name": "export-model"
                  }
                ]
              }
            ]
          }
        }
      }
    }
  }
}
""")

    convert_op = func_to_container_op(convert_mnist_experiment_result)
    op2 = convert_op(op1.output)
    
    # PVC for the exported model; its name is derived from the workflow name
    volume_template = Template("""
{
  "apiVersion": "v1",
  "kind": "PersistentVolumeClaim",
  "metadata": {
    "name": "{{workflow.name}}-modelpvc",
    "namespace": "$namespace"
  },
  "spec": {
      "accessModes": ["ReadWriteMany"],
      "resources": {
          "requests": {
              "storage": "1Gi"
          }
      }
   }
}
""")
    
    volopjson = volume_template.substitute({'namespace': namespace})
    volop = json.loads(volopjson)

    modelvolop = dsl.ResourceOp(
        name="modelpvc",
        k8s_resource=volop
    )

    tfjobjson = tfjobjson_template.substitute(
            {'args': op2.output,
             'name': name,
             'namespace': namespace,
             'step': step,
             'modelpvc': modelvolop.outputs["name"]
            })

    tfjob = json.loads(tfjobjson)

    train = dsl.ResourceOp(
        name="train",
        k8s_resource=tfjob,
        success_condition='status.replicaStatuses.Worker.succeeded==3,status.replicaStatuses.Chief.succeeded==1'
    )

    # step 3: model inference with a KFServing InferenceService
    inferenceservice_template = Template("""
{
  "apiVersion": "serving.kubeflow.org/v1alpha2",
  "kind": "InferenceService",
  "metadata": {
    "name": "$name",
    "namespace": "$namespace"
  },
  "spec": {
    "default": {
      "predictor": {
        "tensorflow": {
          "storageUri": "pvc://$modelpvc/"
        }
      }
    }
  }
}
""")
    inferenceservicejson = inferenceservice_template.substitute({'modelpvc': modelvolop.outputs["name"],
                                                                 'name': name,
                                                                 'namespace': namespace})
    inferenceservice = json.loads(inferenceservicejson)
    inference = dsl.ResourceOp(
      name="inference",
      k8s_resource=inferenceservice,
      success_condition='status.url').after(train)
    
    dsl.get_pipeline_conf().add_op_transformer(add_istio_annotation)
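
Before submitting, you can sanity-check the convert_mnist_experiment_result helper on its own: it turns Katib's best-trial JSON into the flag string that gets appended to the TFJob command line. The cell below is a minimal sketch using a made-up result payload rather than output from a real experiment.


In [ ]:
# Hypothetical Katib result: a JSON list of {"name", "value"} hyperparameters
sample_result = json.dumps([
    {"name": "--tf-learning-rate", "value": "0.015"},
    {"name": "--tf-batch-size", "value": "32"},
])
# Prints each hyperparameter, then returns
# "--tf-learning-rate=0.015 --tf-batch-size=32"
print(convert_mnist_experiment_result(sample_result))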

Assign permissions to the Kubeflow Pipelines service account and run this pipeline using the kfp-tekton SDK


In [ ]:
!kubectl create clusterrolebinding $user_namespace-admin --clusterrole cluster-admin --serviceaccount=kubeflow:pipeline-run

In [ ]:
# Specify the Kubeflow Pipelines host; leave as None when running from a notebook inside the cluster
host = None

# Submit a pipeline run
from kfp_tekton import TektonClient
TektonClient(host=host).create_run_from_pipeline_func(mnist_pipeline, arguments={})
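
The pipeline parameters (name, namespace, step) default to the values defined at the top of this notebook. If you prefer, override them at submission time instead of passing empty arguments, for example:


In [ ]:
# Optionally override the pipeline parameters when submitting the run
TektonClient(host=host).create_run_from_pipeline_func(
    mnist_pipeline,
    arguments={'name': model_name, 'namespace': user_namespace, 'step': 4000})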

When the pipeline is done, you can get the InferenceService name using the command below. For example, in this cluster the inference name is mnist-demo.


In [ ]:
!kubectl get inferenceservice -n $user_namespace

Download an MNIST picture for the inference test, such as 9.bmp from here, if it's not already in this directory. Then upload it to the notebook.

Update istio_ingress_gateway below with your KFServing ingress endpoint. Then execute the cell below to send a sample payload to the deployed model.


In [ ]:
import numpy as np
from PIL import Image
import requests

# Get istio_ingress_gateway endpoint by "kubectl -n istio-system get service istio-ingressgateway -o jsonpath='{.status.loadBalancer.ingress[0].ip}'"
istio_ingress_gateway = "xxx.xx.xx.xxx"

# Get inference_name from the "kubectl get inferenceservice" step above
inference_name = model_name
inference_namespace = user_namespace
# image_file is the MNIST picture uploaded in the previous step
image_file = '9.bmp'
# Convert the image to a grayscale 28x28 float array shaped (1, 28, 28, 1);
# np.float was removed in recent NumPy, so use the builtin float
data = np.array(Image.open(image_file).convert('L').resize((28, 28))).astype(float).reshape(-1, 28, 28, 1)
np.set_printoptions(threshold=np.inf)
json_request = '{{ "instances" : {} }}'.format(np.array2string(data, separator=',', formatter={'float':lambda x: "%.1f" % x}))
headers={"Host": "%s.%s.example.com" % (inference_name, inference_namespace)}

response = requests.post("http://%s/v1/models/%s:predict" % (istio_ingress_gateway, inference_name), data = json_request, headers = headers)
print(response.json())
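
The response is expected to follow the TF Serving predict format: a "predictions" field holding one row of ten class scores per instance. Assuming that shape, the predicted digit is the index of the largest score:


In [ ]:
# Assuming a TF Serving-style response: {"predictions": [[score_0, ..., score_9]]}
predictions = response.json().get("predictions", [])
if predictions:
    print("Predicted digit:", int(np.argmax(predictions[0])))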

Clean up

Because Tekton lacks exit-handler support, we need to run the commands below to clean up the resources created by this pipeline.


In [ ]:
!kubectl delete inferenceservice -n $user_namespace $model_name
!kubectl delete experiment -n $user_namespace $model_name
!kubectl delete tfjob -n $user_namespace $model_name
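
The model PVC is named after the workflow run ({{workflow.name}}-modelpvc), so it cannot be deleted by the static model name above. To reclaim the storage, list the PVCs in the namespace and delete the one left behind by this run:


In [ ]:
# The PVC name is derived from the workflow name, so look it up first
!kubectl get pvc -n $user_namespace
# Then delete it by name, for example:
# !kubectl delete pvc -n $user_namespace <workflow-name>-modelpvc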