MNIST E2E on Kubeflow on Vanilla k8s

This example guides you through:

  1. Taking an example TensorFlow model and modifying it to support distributed training
  2. Serving the resulting model using TFServing
  3. Deploying and using a web-app that uses the model

Requirements

  • You must be running Kubeflow 1.0 using the k8s istio config or the istio dex config.

Prepare model

Existing distributed MNIST examples need some changes to run well as a TFJob.

Specifically, we must:

  1. Add options in order to make the model configurable.
  2. Use tf.estimator.train_and_evaluate to enable model exporting and serving.
  3. Define serving signatures for model serving.

The resulting model is in model.py.
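model.py itself is not reproduced here. To illustrate step 1, here is a minimal sketch of the options it could expose, assuming argparse. The flag names are the ones the TFJob command later in this notebook passes. The defaults (which mirror the notebook's training parameters) and the parse_args helper are assumptions.

```python
import argparse


def parse_args(argv=None):
    """Parse the hyperparameters passed to model.py (illustrative sketch).

    Flag names match the TFJob command line; defaults are assumptions.
    """
    parser = argparse.ArgumentParser(description="Distributed MNIST (sketch)")
    parser.add_argument("--tf-model-dir", required=True,
                        help="Checkpoint and summary directory, e.g. s3://bucket/mnist")
    parser.add_argument("--tf-export-dir", required=True,
                        help="Directory where the SavedModel for TF Serving is exported")
    parser.add_argument("--tf-train-steps", type=int, default=200,
                        help="Number of global training steps")
    parser.add_argument("--tf-batch-size", type=int, default=100,
                        help="Examples per training step")
    parser.add_argument("--tf-learning-rate", type=float, default=0.01,
                        help="Optimizer learning rate")
    return parser.parse_args(argv)
```

argparse turns dashes into underscores, so `--tf-model-dir` is read back as `args.tf_model_dir`.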

Configure external service credentials

Step 1 - Pushing to DockerHub

Source documentation: Kaniko docs

Why do we need this?

Fairing uses Kaniko to build the model image every time the notebook is run, so a fresh model is deployed on each run. The newly built image is pushed to DOCKER_REGISTRY, and subsequent resources pull it from there.

Configure docker credentials

Encode your Docker registry user and password in base64:

echo -n USER:PASSWORD | base64

Create a config.json file with your Docker registry URL and the base64 string generated above:

{
    "auths": {
        "https://index.docker.io/v1/": {
            "auth": "xxxxxxxxxxxxxxx"
        }
    }
}
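Both steps above can also be done from Python. Here is a minimal sketch that uses only the standard library. docker_auth_config and write_docker_config are hypothetical helper names, and the default registry URL is the DockerHub entry from the example config.json.

```python
import base64
import json

DOCKERHUB = "https://index.docker.io/v1/"


def docker_auth_config(user, password, registry=DOCKERHUB):
    """Return the config.json contents for a single registry.

    Equivalent to `echo -n USER:PASSWORD | base64` plus the JSON shown above.
    """
    token = base64.b64encode(f"{user}:{password}".encode("utf-8")).decode("ascii")
    return {"auths": {registry: {"auth": token}}}


def write_docker_config(path, user, password, registry=DOCKERHUB):
    """Write the config.json file that the ConfigMap is created from."""
    with open(path, "w") as f:
        json.dump(docker_auth_config(user, password, registry), f, indent=4)
```

For example, `write_docker_config("config.json", "USER", "PASSWORD")` produces a file with the same structure as the one shown above.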


Create a ConfigMap named docker-config from config.json in the namespace you're using:

kubectl create --namespace ${NAMESPACE} configmap docker-config --from-file=<path to config.json>
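`kubectl create configmap --from-file` stores the file under a data key equal to its base name, so the command above yields a ConfigMap with a single config.json entry. Below is a sketch of the equivalent manifest as a Python dict. docker_configmap_manifest is a hypothetical helper.

```python
import os


def docker_configmap_manifest(config_path, namespace, name="docker-config"):
    """Build the ConfigMap that `kubectl create configmap --from-file` would create.

    kubectl uses the file's base name (e.g. config.json) as the data key.
    """
    with open(config_path) as f:
        contents = f.read()
    return {
        "apiVersion": "v1",
        "kind": "ConfigMap",
        "metadata": {"name": name, "namespace": namespace},
        "data": {os.path.basename(config_path): contents},
    }
```

The kubernetes Python client's `CoreV1Api().create_namespaced_config_map(namespace, body)` should accept this dict as `body`. The kubectl command above is still the simplest route.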

Step 2 - Set DOCKER_REGISTRY

The DOCKER_REGISTRY variable is used to push the newly built image.
Please change the variable to the registry for which you've configured credentials.


In [ ]:
import logging

from kubernetes import client as k8s_client
from kubernetes.client.rest import ApiException
from kubeflow import fairing
from kubeflow.fairing import utils as fairing_utils
from kubeflow.fairing.builders import append
from kubeflow.fairing.deployers import job
from kubeflow.fairing.preprocessors import base as base_preprocessor

# Registry (e.g. your DockerHub user or organization) that Kaniko pushes to
DOCKER_REGISTRY = "ciscoai"
namespace = fairing_utils.get_current_k8s_namespace()

api_client = k8s_client.CoreV1Api()
minio_service_endpoint = None
try:
    # Look up the cluster IP of the minio service deployed with Kubeflow
    minio_service_endpoint = api_client.read_namespaced_service(
        name='minio-service', namespace='kubeflow').spec.cluster_ip
except ApiException as e:
    if e.status == 403:
        logging.warning("The service account doesn't have sufficient privileges "
                        "to get the kubeflow minio-service. "
                        "You will have to manually enter the minio cluster-ip. "
                        "To make this function work, ask someone with cluster "
                        "privileges to create an appropriate "
                        "rolebinding by running this command:\n"
                        "kubectl create --namespace=kubeflow rolebinding "
                        "--clusterrole=kubeflow-view "
                        "--serviceaccount=${NAMESPACE}:default-editor "
                        "${NAMESPACE}-minio-view")
        logging.error(f"API access denied with reason: {e.reason}")
    else:
        raise

# If the lookup above failed, set s3_endpoint to the minio cluster IP by hand
s3_endpoint = minio_service_endpoint
minio_endpoint = "http://" + s3_endpoint
minio_username = "minio"
minio_key = "minio123"
minio_region = "us-east-1"

logging.info(f"Running in namespace {namespace}")
logging.info(f"Using docker registry {DOCKER_REGISTRY}")
logging.info(f"Using minio instance with endpoint '{s3_endpoint}'")

Install Required Libraries

Install and import the libraries required to train this model.


In [ ]:
import logging
import os
import uuid
from importlib import reload
import notebook_setup
reload(notebook_setup)
notebook_setup.notebook_setup()

In [ ]:
import k8s_util
# Force a reload of kubeflow. Because kubeflow is a namespace package shared
# by several modules, reloading it in notebook_setup may not be sufficient.
import kubeflow
reload(kubeflow)
from kubernetes import client as k8s_client
from kubernetes import config as k8s_config
from kubeflow.tfjob.api import tf_job_client as tf_job_client_module
from IPython.display import display, HTML
import yaml

In [ ]:
# TODO(https://github.com/kubeflow/fairing/issues/426): We should get rid of this once the default 
# Kaniko image is updated to a newer image than 0.7.0.
from kubeflow.fairing import constants
constants.constants.KANIKO_IMAGE = "gcr.io/kaniko-project/executor:v0.14.0"

In [ ]:
from kubeflow.fairing.builders import cluster

# output_map is a map of extra files to add to the Docker build context.
# It maps each source location to its location inside the context.
output_map = {
    "Dockerfile.model": "Dockerfile",
    "model.py": "model.py"
}

preprocessor = base_preprocessor.BasePreProcessor(
    command=["python"], # The base class will set this.
    input_files=[],
    path_prefix="/app", # irrelevant since we aren't preprocessing any files
    output_map=output_map)

preprocessor.preprocess()
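To make the mapping concrete, here is a minimal sketch of how a source-to-destination map like output_map can be packed into a gzipped build context, with Dockerfile.model renamed to Dockerfile. It assumes only Python's tarfile module. The helper name build_context_tarball is hypothetical: fairing's own preprocessor does this internally, and its details may differ.

```python
import os
import tarfile


def build_context_tarball(output_map, tar_path):
    """Pack files into a gzipped build context (illustrative sketch).

    output_map maps a local source path to the path it should have inside
    the context, the same shape as the output_map given to BasePreProcessor.
    """
    with tarfile.open(tar_path, "w:gz") as tar:
        for src, dest in output_map.items():
            # arcname controls the member's name inside the archive
            tar.add(src, arcname=dest)
    return tar_path
```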

In [ ]:
# Kaniko builds from a context uploaded to minio. The base image comes from
# our custom Dockerfile (Dockerfile.model) rather than from fairing.
from kubeflow.fairing.cloud.k8s import MinioUploader
from kubeflow.fairing.builders.cluster.minio_context import MinioContextSource

minio_uploader = MinioUploader(endpoint_url=minio_endpoint, minio_secret=minio_username, minio_secret_key=minio_key, region_name=minio_region)
minio_context_source = MinioContextSource(endpoint_url=minio_endpoint, minio_secret=minio_username, minio_secret_key=minio_key, region_name=minio_region)

In [ ]:
cluster_builder = cluster.cluster.ClusterBuilder(registry=DOCKER_REGISTRY,
                                                 base_image="", # base_image is set in the Dockerfile
                                                 preprocessor=preprocessor,
                                                 image_name="mnist",
                                                 dockerfile_path="Dockerfile",
                                                 context_source=minio_context_source)
cluster_builder.build()
logging.info(f"Built image {cluster_builder.image_tag}")

Create a Minio Bucket

  • Create a minio bucket to store our models and other results.

In [ ]:
mnist_bucket = f"{DOCKER_REGISTRY}-mnist"
minio_uploader.create_bucket(mnist_bucket)
logging.info(f"Bucket {mnist_bucket} created or already exists")

Distributed training

  • We will train the model by using TFJob to run a distributed training job
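TFJob tells each replica its role through the TF_CONFIG environment variable. This is a JSON document with a cluster map (chief, ps and worker addresses) and a task entry, and tf.estimator.train_and_evaluate reads it to decide whether a process acts as chief, parameter server, or worker. Below is a minimal sketch of inspecting it. The helper describe_task and the host names in the example are illustrative.

```python
import json
import os


def describe_task(tf_config_json=None):
    """Return (task_type, task_index, num_workers) from a TF_CONFIG string.

    Falls back to the TF_CONFIG environment variable, which TFJob sets in
    every replica. The chief is listed separately from the workers.
    """
    if tf_config_json is None:
        tf_config_json = os.environ.get("TF_CONFIG", "{}")
    tf_config = json.loads(tf_config_json)
    cluster = tf_config.get("cluster", {})
    task = tf_config.get("task", {})
    return task.get("type"), task.get("index"), len(cluster.get("worker", []))
```

For example, the second worker in a job with one chief, one ps and two workers would report `("worker", 1, 2)`.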

Training job parameters


In [ ]:
train_name = f"mnist-train-{uuid.uuid4().hex[:4]}"
num_ps = 1
num_workers = 2
model_dir = f"s3://{mnist_bucket}/mnist"
export_path = f"s3://{mnist_bucket}/mnist/export" 
train_steps = 200
batch_size = 100
learning_rate = .01
image = cluster_builder.image_tag
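As a rough sanity check on these parameters, here is a worked calculation. It assumes model.py passes --tf-train-steps to tf.estimator as a global step limit shared by all workers. Under that assumption, 200 steps of 100 examples is 20,000 examples, about a third of one pass over the 60,000-image MNIST training split. This configuration is therefore a quick demonstration run rather than a fully trained model. training_budget is a hypothetical helper.

```python
# Standard MNIST training split; some loaders hold out 5,000 images for validation.
MNIST_TRAIN_EXAMPLES = 60_000


def training_budget(train_steps, batch_size, dataset_size=MNIST_TRAIN_EXAMPLES):
    """Return (examples processed, fraction of an epoch) for a step budget.

    Assumes train_steps counts global steps, each consuming one batch.
    """
    examples_seen = train_steps * batch_size
    return examples_seen, examples_seen / dataset_size


examples, epochs = training_budget(200, 100)  # 20,000 examples, ~0.33 epochs
```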

In [ ]:
train_spec = f"""apiVersion: kubeflow.org/v1
kind: TFJob
metadata:
  name: {train_name}  
spec:
  tfReplicaSpecs:
    Ps:
      replicas: {num_ps}
      template:
        metadata:
          annotations:
            sidecar.istio.io/inject: "false"
        spec:
          serviceAccount: default-editor
          containers:
          - name: tensorflow
            command:
            - python
            - /opt/model.py
            - --tf-model-dir={model_dir}
            - --tf-export-dir={export_path}
            - --tf-train-steps={train_steps}
            - --tf-batch-size={batch_size}
            - --tf-learning-rate={learning_rate}
            env:
            - name: S3_ENDPOINT
              value: {s3_endpoint}
            - name: AWS_ENDPOINT_URL
              value: {minio_endpoint}
            - name: AWS_REGION
              value: {minio_region}
            - name: BUCKET_NAME
              value: {mnist_bucket}
            - name: S3_USE_HTTPS
              value: "0"
            - name: S3_VERIFY_SSL
              value: "0"
            - name: AWS_ACCESS_KEY_ID
              value: {minio_username}
            - name: AWS_SECRET_ACCESS_KEY
              value: {minio_key}
            image: {image}
            workingDir: /opt
          restartPolicy: OnFailure
    Chief:
      replicas: 1
      template:
        metadata:
          annotations:
            sidecar.istio.io/inject: "false"
        spec:
          serviceAccount: default-editor
          containers:
          - name: tensorflow
            command:
            - python
            - /opt/model.py
            - --tf-model-dir={model_dir}
            - --tf-export-dir={export_path}
            - --tf-train-steps={train_steps}
            - --tf-batch-size={batch_size}
            - --tf-learning-rate={learning_rate}
            env:
            - name: S3_ENDPOINT
              value: {s3_endpoint}
            - name: AWS_ENDPOINT_URL
              value: {minio_endpoint}
            - name: AWS_REGION
              value: {minio_region}
            - name: BUCKET_NAME
              value: {mnist_bucket}
            - name: S3_USE_HTTPS
              value: "0"
            - name: S3_VERIFY_SSL
              value: "0"
            - name: AWS_ACCESS_KEY_ID
              value: {minio_username}
            - name: AWS_SECRET_ACCESS_KEY
              value: {minio_key}
            image: {image}
            workingDir: /opt
          restartPolicy: OnFailure
    Worker:
      replicas: {num_workers}
      template:
        metadata:
          annotations:
            sidecar.istio.io/inject: "false"
        spec:
          serviceAccount: default-editor
          containers:
          - name: tensorflow
            command:
            - python
            - /opt/model.py
            - --tf-model-dir={model_dir}
            - --tf-export-dir={export_path}
            - --tf-train-steps={train_steps}
            - --tf-batch-size={batch_size}
            - --tf-learning-rate={learning_rate}
            env:
            - name: S3_ENDPOINT
              value: {s3_endpoint}
            - name: AWS_ENDPOINT_URL
              value: {minio_endpoint}
            - name: AWS_REGION
              value: {minio_region}
            - name: BUCKET_NAME
              value: {mnist_bucket}
            - name: S3_USE_HTTPS
              value: "0"
            - name: S3_VERIFY_SSL
              value: "0"
            - name: AWS_ACCESS_KEY_ID
              value: {minio_username}
            - name: AWS_SECRET_ACCESS_KEY
              value: {minio_key}
            image: {image}
            workingDir: /opt
          restartPolicy: OnFailure
"""

Create the training job

  • You could write the spec to a YAML file and then run kubectl apply -f {FILE}
  • Since you are running in Jupyter, you will use the TFJob client
  • You will run the TFJob in a namespace created by a Kubeflow profile
    • The namespace will be the same namespace you are running the notebook in
    • Creating a profile ensures the namespace is provisioned with service accounts and other resources needed for Kubeflow
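For the kubectl route in the first bullet, here is a minimal sketch that writes a spec string to a file and returns the matching command. write_spec is a hypothetical helper. Passing -n matters because train_spec does not set metadata.namespace.

```python
import os
import tempfile


def write_spec(spec_yaml, namespace, path=None):
    """Write a YAML spec to disk and return the kubectl command that applies it."""
    if path is None:
        fd, path = tempfile.mkstemp(suffix=".yaml")
        os.close(fd)
    with open(path, "w") as f:
        f.write(spec_yaml)
    return ["kubectl", "apply", "-n", namespace, "-f", path]
```

From an environment with cluster access, `subprocess.run(write_spec(train_spec, namespace), check=True)` would apply it.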

In [ ]:
tf_job_client = tf_job_client_module.TFJobClient()

In [ ]:
tf_job_body = yaml.safe_load(train_spec)
tf_job = tf_job_client.create(tf_job_body, namespace=namespace)  

logging.info(f"Created job {namespace}.{train_name}")

In [ ]:
from kubeflow.tfjob import TFJobClient
tfjob_client = TFJobClient()
tfjob_client.wait_for_job(train_name, namespace=namespace, watch=True)
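In the kubeflow-tfjob SDK, wait_for_job appears to return once the job reaches a terminal state, and that can be failure as well as success. Treat this as an assumption and check it against your SDK version. The sketch below reads the outcome from the job's status.conditions. tfjob_outcome is hypothetical, and the job dict could come from `tf_job_client.get(train_name, namespace=namespace)`.

```python
def tfjob_outcome(tfjob):
    """Return "Succeeded", "Failed", or None for a TFJob given as a dict.

    A finished TFJob carries a Succeeded or Failed condition whose status
    is the string "True"; the most recent matching condition wins.
    """
    conditions = (tfjob.get("status") or {}).get("conditions") or []
    for cond in reversed(conditions):
        if cond.get("type") in ("Succeeded", "Failed") and cond.get("status") == "True":
            return cond["type"]
    return None
```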

Get TF Job logs


In [ ]:
tfjob_client.get_logs(train_name, namespace=namespace)

Check the model in Minio


In [ ]:
#TODO(swiftdiaries): Check object key for model specifically
from botocore.exceptions import ClientError

try:
    model_response = minio_uploader.client.list_objects(Bucket=mnist_bucket)
    # Minimal check to see if at least the bucket is created
    if model_response["ResponseMetadata"]["HTTPStatusCode"] == 200:
        logging.info(f"{model_dir} found in {mnist_bucket} bucket")
except ClientError as err:
    logging.error(err)

Deploy Tensorboard


In [ ]:
tb_name = "mnist-tensorboard"
tb_deploy = f"""apiVersion: apps/v1
kind: Deployment
metadata:
  labels:
    app: mnist-tensorboard
  name: {tb_name}
  namespace: {namespace}
spec:
  selector:
    matchLabels:
      app: mnist-tensorboard
  template:
    metadata:
      labels:
        app: mnist-tensorboard
        version: v1
    spec:
      serviceAccount: default-editor
      containers:
      - command:
        - /usr/local/bin/tensorboard
        - --logdir={model_dir}
        - --port=80
        image: tensorflow/tensorflow:1.15.2-py3
        env:
        - name: S3_ENDPOINT
          value: {s3_endpoint}
        - name: AWS_ENDPOINT_URL
          value: {minio_endpoint}
        - name: AWS_REGION
          value: {minio_region}
        - name: BUCKET_NAME
          value: {mnist_bucket}
        - name: S3_USE_HTTPS
          value: "0"
        - name: S3_VERIFY_SSL
          value: "0"
        - name: AWS_ACCESS_KEY_ID
          value: {minio_username}
        - name: AWS_SECRET_ACCESS_KEY
          value: {minio_key}  
        name: tensorboard
        ports:
        - containerPort: 80
"""
tb_service = f"""apiVersion: v1
kind: Service
metadata:
  labels:
    app: mnist-tensorboard
  name: {tb_name}
  namespace: {namespace}
spec:
  ports:
  - name: http-tb
    port: 80
    targetPort: 80
  selector:
    app: mnist-tensorboard
  type: ClusterIP
"""

tb_virtual_service = f"""apiVersion: networking.istio.io/v1alpha3
kind: VirtualService
metadata:
  name: {tb_name}
  namespace: {namespace}
spec:
  gateways:
  - kubeflow/kubeflow-gateway
  hosts:
  - '*'
  http:
  - match:
    - uri:
        prefix: /mnist/{namespace}/tensorboard/
    rewrite:
      uri: /
    route:
    - destination:
        host: {tb_name}.{namespace}.svc.cluster.local
        port:
          number: 80
    timeout: 300s
"""

tb_specs = [tb_deploy, tb_service, tb_virtual_service]

In [ ]:
k8s_util.apply_k8s_specs(tb_specs, k8s_util.K8S_CREATE_OR_REPLACE)

Get Tensorboard URL

Run this with the appropriate RBAC permissions


In [ ]:
istio_ingress_endpoint = None
try:
    istio_ingress_endpoint = api_client.read_namespaced_service(name='istio-ingressgateway', namespace='istio-system')
    istio_ports = istio_ingress_endpoint.spec.ports
    for istio_port in istio_ports:
        if istio_port.name == "http2":
            logging.warning("Get the worker node IP by running 'kubectl get nodes -o wide'")
            logging.info(f"Tensorboard URL: <worker-node-ip>:{istio_port.node_port}/mnist/{namespace}/tensorboard/")
except ApiException as e:
    if e.status == 403:
        logging.warning("The service account doesn't have sufficient privileges "
                        "to get the istio-ingressgateway service. "
                        "To make this function work, ask someone with cluster "
                        "privileges to create an appropriate "
                        "rolebinding by running this command:\n"
                        "kubectl create --namespace=istio-system rolebinding "
                        "--clusterrole=kubeflow-view "
                        "--serviceaccount=${NAMESPACE}:default-editor "
                        "${NAMESPACE}-ingressgateway-view")
        logging.warning("API access restricted. Please get the URL by running the kubectl commands at the end of the notebook")
    else:
        raise

Serve the model

  • Deploy the model using TensorFlow Serving
  • We need to create:
    1. A Kubernetes Deployment
    2. A Kubernetes Service
    3. A ConfigMap containing the Prometheus monitoring config (the Deployment mounts it at /var/config/)

In [ ]:
deploy_name = "mnist-model"
model_base_path = export_path

# The web UI defaults to mnist-service, so if you change it you will
# need to change it in the UI as well to send predictions to the model
model_service = "mnist-service"

deploy_spec = f"""apiVersion: apps/v1
kind: Deployment
metadata:
  labels:
    app: mnist
  name: {deploy_name}
  namespace: {namespace}
spec:
  selector:
    matchLabels:
      app: mnist-model
  template:
    metadata:
      # TODO(jlewi): Right now we disable the istio side car because otherwise ISTIO rbac will prevent the
      # UI from sending RPCs to the server. We should create an appropriate ISTIO rbac authorization
      # policy to allow traffic from the UI to the model server.
      # https://istio.io/docs/concepts/security/#target-selectors
      annotations:        
        sidecar.istio.io/inject: "false"
      labels:
        app: mnist-model
        version: v1
    spec:
      serviceAccount: default-editor
      containers:
      - args:
        - --port=9000
        - --rest_api_port=8500
        - --model_name=mnist
        - --model_base_path={model_base_path}
        - --monitoring_config_file=/var/config/monitoring_config.txt
        command:
        - /usr/bin/tensorflow_model_server
        env:
        - name: modelBasePath
          value: {model_base_path}
        - name: S3_ENDPOINT
          value: {s3_endpoint}
        - name: AWS_ENDPOINT_URL
          value: {minio_endpoint}
        - name: AWS_REGION
          value: {minio_region}
        - name: BUCKET_NAME
          value: {mnist_bucket}
        - name: S3_USE_HTTPS
          value: "0"
        - name: S3_VERIFY_SSL
          value: "0"
        - name: AWS_ACCESS_KEY_ID
          value: {minio_username}
        - name: AWS_SECRET_ACCESS_KEY
          value: {minio_key}  
        image: tensorflow/serving:1.15.0
        imagePullPolicy: IfNotPresent
        livenessProbe:
          initialDelaySeconds: 30
          periodSeconds: 30
          tcpSocket:
            port: 9000
        name: mnist
        ports:
        - containerPort: 9000
        - containerPort: 8500
        resources:
          limits:
            cpu: "4"
            memory: 4Gi
          requests:
            cpu: "1"
            memory: 1Gi
        volumeMounts:
        - mountPath: /var/config/
          name: model-config
      volumes:
      - configMap:
          name: {deploy_name}
        name: model-config
"""

service_spec = f"""apiVersion: v1
kind: Service
metadata:
  annotations:    
    prometheus.io/path: /monitoring/prometheus/metrics
    prometheus.io/port: "8500"
    prometheus.io/scrape: "true"
  labels:
    app: mnist-model
  name: {model_service}
  namespace: {namespace}
spec:
  ports:
  - name: grpc-tf-serving
    port: 9000
    targetPort: 9000
  - name: http-tf-serving
    port: 8500
    targetPort: 8500
  selector:
    app: mnist-model
  type: ClusterIP
"""

monitoring_config = f"""kind: ConfigMap
apiVersion: v1
metadata:
  name: {deploy_name}
  namespace: {namespace}
data:
  monitoring_config.txt: |-
    prometheus_config: {{
      enable: true,
      path: "/monitoring/prometheus/metrics"
    }}
"""

model_specs = [deploy_spec, service_spec, monitoring_config]

In [ ]:
k8s_util.apply_k8s_specs(model_specs, k8s_util.K8S_CREATE_OR_REPLACE)
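Once the Deployment is ready, the Service exposes TF Serving's REST API on port 8500. Below is a minimal sketch of a predict request that uses only the standard library. The /v1/models/mnist:predict path follows TF Serving's REST API and the model name set by --model_name above. The input key "x", the helper name predict_request, and the 28x28 input shape are assumptions that must match the serving signature defined in model.py.

```python
import json
import urllib.request


def predict_request(image, namespace, service="mnist-service", model="mnist", input_key="x"):
    """Build a TF Serving REST predict request for one 28x28 image.

    input_key is an assumption: use the input name from model.py's
    serving signature.
    """
    flat = [float(p) for row in image for p in row]
    if len(flat) != 28 * 28:
        raise ValueError("expected a 28x28 image")
    url = f"http://{service}.{namespace}.svc.cluster.local:8500/v1/models/{model}:predict"
    body = json.dumps({"instances": [{input_key: flat}]}).encode("utf-8")
    # Supplying data makes urllib send a POST
    return urllib.request.Request(url, data=body, headers={"Content-Type": "application/json"})

# Usage from inside the cluster (e.g. this notebook):
# with urllib.request.urlopen(predict_request(img, namespace)) as resp:
#     print(json.load(resp)["predictions"])
```

A GET on http://mnist-service.<namespace>.svc.cluster.local:8500/v1/models/mnist returns the model's version status. This is a quick way to confirm TF Serving loaded the export before you send predictions.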

Deploy the mnist UI

  • We will now deploy the UI to visualize the MNIST results
  • Note: This is using a prebuilt and public docker image for the UI

In [ ]:
ui_name = "mnist-ui"
ui_deploy = f"""apiVersion: apps/v1
kind: Deployment
metadata:
  name: {ui_name}
  namespace: {namespace}
spec:
  replicas: 1
  selector:
    matchLabels:
      app: mnist-web-ui
  template:
    metadata:
      labels:
        app: mnist-web-ui
    spec:
      containers:
      - image: gcr.io/kubeflow-examples/mnist/web-ui:v20190112-v0.2-142-g3b38225
        name: web-ui
        ports:
        - containerPort: 5000        
      serviceAccount: default-editor
"""

ui_service = f"""apiVersion: v1
kind: Service
metadata:
  annotations:
  name: {ui_name}
  namespace: {namespace}
spec:
  ports:
  - name: http-mnist-ui
    port: 80
    targetPort: 5000
  selector:
    app: mnist-web-ui
  type: ClusterIP
"""

ui_virtual_service = f"""apiVersion: networking.istio.io/v1alpha3
kind: VirtualService
metadata:
  name: {ui_name}
  namespace: {namespace}
spec:
  gateways:
  - kubeflow/kubeflow-gateway
  hosts:
  - '*'
  http:
  - match:
    - uri:
        prefix: /mnist/{namespace}/ui/
    rewrite:
      uri: /
    route:
    - destination:
        host: {ui_name}.{namespace}.svc.cluster.local
        port:
          number: 80
    timeout: 300s
"""

ui_specs = [ui_deploy, ui_service, ui_virtual_service]

In [ ]:
k8s_util.apply_k8s_specs(ui_specs, k8s_util.K8S_CREATE_OR_REPLACE)

Access the web UI


In [ ]:
istio_ingress_endpoint = None
try:
    istio_ingress_endpoint = api_client.read_namespaced_service(name='istio-ingressgateway', namespace='istio-system')
    istio_ports = istio_ingress_endpoint.spec.ports
    for istio_port in istio_ports:
        if istio_port.name == "http2":
            logging.warning("Get the worker node IP by running 'kubectl get nodes -o wide'")
            logging.info(f"mnist web UI URL: <worker-node-ip>:{istio_port.node_port}/mnist/{namespace}/ui/")
except ApiException as e:
    if e.status == 403:
        logging.warning("The service account doesn't have sufficient privileges "
                        "to get the istio-ingressgateway service. "
                        "To make this function work, ask someone with cluster "
                        "privileges to create an appropriate "
                        "rolebinding by running this command:\n"
                        "kubectl create --namespace=istio-system rolebinding "
                        "--clusterrole=kubeflow-view "
                        "--serviceaccount=${NAMESPACE}:default-editor "
                        "${NAMESPACE}-ingressgateway-view")
        logging.warning("API access restricted. Please get the URL by running the kubectl commands at the end of the notebook")
    else:
        raise

Get Tensorboard URL with kubectl

Run this with the appropriate RBAC permissions.
Note: You can get the worker node IP from kubectl get nodes -o wide

export INGRESS_HOST=<worker-node-ip>
export NAMESPACE=<your-namespace>
export INGRESS_PORT=$(kubectl -n istio-system get service istio-ingressgateway -o jsonpath='{.spec.ports[?(@.name=="http2")].nodePort}')
printf "Tensorboard URL: \n${INGRESS_HOST}:${INGRESS_PORT}/mnist/${NAMESPACE}/tensorboard/\n"

Access the Web UI with kubectl

Run this with the appropriate RBAC permissions.
Note: You can get the worker node IP from kubectl get nodes -o wide

export INGRESS_HOST=<worker-node-ip>
export NAMESPACE=<your-namespace>
export INGRESS_PORT=$(kubectl -n istio-system get service istio-ingressgateway -o jsonpath='{.spec.ports[?(@.name=="http2")].nodePort}')
printf "mnist-web-app URL: \n${INGRESS_HOST}:${INGRESS_PORT}/mnist/${NAMESPACE}/ui/\n"
