MNIST E2E on Kubeflow on AWS

This example guides you through:

  1. Taking an example TensorFlow model and modifying it to support distributed training
  2. Serving the resulting model using TFServing
  3. Deploying and using a web-app that uses the model

Requirements

  • You must be running Kubeflow 1.0 on EKS

Install AWS CLI

Click Kernel -> Restart after you install new packages.


In [ ]:
!pip install boto3

Create an AWS secret in Kubernetes and grant AWS access to your notebook

Note: Once IAM Roles for Service Accounts (IRSA) support is merged in 1.0.1, we won't have to use static credentials.

  1. Create an AWS secret in the current namespace.

Note: To get a base64-encoded string, try echo -n $AWS_ACCESS_KEY_ID | base64 (you only need this if you write the secret as a YAML manifest; the --from-literal flags in the cell below handle the encoding for you). Make sure the credentials have AmazonEC2ContainerRegistryFullAccess and AmazonS3FullAccess for this experiment. Pods will use these credentials to talk to AWS services.


In [ ]:
%%bash

# Replace placeholder with your own AWS credentials
AWS_ACCESS_KEY_ID='<your_aws_access_key_id>'
AWS_SECRET_ACCESS_KEY='<your_aws_secret_access_key>'

kubectl create secret generic aws-secret --from-literal=AWS_ACCESS_KEY_ID=${AWS_ACCESS_KEY_ID} --from-literal=AWS_SECRET_ACCESS_KEY=${AWS_SECRET_ACCESS_KEY}

  2. Attach AmazonEC2ContainerRegistryFullAccess and AmazonS3FullAccess to the EKS node group role to grant AWS access to the notebook.

Verify you have access to AWS services

  • The cell below checks that this notebook was spawned with credentials to access AWS S3 and ECR

In [ ]:
import logging
import os
import uuid
from importlib import reload
import boto3

# Set REGION for the S3 bucket and Elastic Container Registry
AWS_REGION='us-west-2'
boto3.client('s3', region_name=AWS_REGION).list_buckets()
boto3.client('ecr', region_name=AWS_REGION).describe_repositories()

Prepare model

There is a delta between existing distributed mnist examples and what's needed to run well as a TFJob.

Basically, we must:

  1. Add options in order to make the model configurable.
  2. Use tf.estimator.train_and_evaluate to enable model exporting and serving.
  3. Define serving signatures for model serving.

The resulting model is model.py.
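
For orientation, the pattern model.py follows looks roughly like the sketch below. This is illustrative, not the actual file: the estimator, the input functions, the input key "x", and the 28x28 input shape are assumptions, so check model.py for the real names and serving signature.

    import tensorflow as tf

    def serving_input_receiver_fn():
        # Serving signature: accept raw 28x28 float images under a
        # hypothetical key "x"; model.py defines the real signature.
        inputs = {"x": tf.placeholder(tf.float32, [None, 28, 28])}
        return tf.estimator.export.ServingInputReceiver(inputs, inputs)

    # train_and_evaluate drives training, periodic evaluation, and
    # exporting a servable SavedModel via the exporter.
    exporter = tf.estimator.FinalExporter("mnist", serving_input_receiver_fn)
    train_spec = tf.estimator.TrainSpec(input_fn=train_input_fn, max_steps=200)
    eval_spec = tf.estimator.EvalSpec(input_fn=eval_input_fn, exporters=[exporter])
    tf.estimator.train_and_evaluate(estimator, train_spec, eval_spec)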

Install Required Libraries

Import the libraries required to train this model.


In [ ]:
import notebook_setup
reload(notebook_setup)
notebook_setup.notebook_setup(platform='aws')

In [ ]:
import k8s_util
# Force a reload of kubeflow; since kubeflow is a namespace package spanning
# multiple modules, reloading it in notebook_setup may not be sufficient
import kubeflow
reload(kubeflow)
from kubernetes import client as k8s_client
from kubernetes import config as k8s_config
from kubeflow.tfjob.api import tf_job_client as tf_job_client_module
from IPython.core.display import display, HTML
import yaml

Configure The Docker Registry For Kubeflow Fairing

  • In order to build docker images from your notebook we need a docker registry where the images will be stored
  • Below you set some variables specifying an Amazon Elastic Container Registry
  • Kubeflow Fairing provides a utility function to guess your AWS account id

In [ ]:
from kubernetes import client as k8s_client
from kubernetes.client import rest as k8s_rest
from kubeflow import fairing   
from kubeflow.fairing import utils as fairing_utils
from kubeflow.fairing.builders import append
from kubeflow.fairing.deployers import job
from kubeflow.fairing.preprocessors import base as base_preprocessor

# Set up AWS Elastic Container Registry (ECR) for storing output containers
# You can use any docker container registry instead of ECR
# Look up your AWS account id via STS (Kubeflow Fairing's
# fairing.cloud.aws.guess_account_id() is an equivalent helper)
AWS_ACCOUNT_ID = boto3.client('sts').get_caller_identity().get('Account')
DOCKER_REGISTRY = '{}.dkr.ecr.{}.amazonaws.com'.format(AWS_ACCOUNT_ID, AWS_REGION)

namespace = fairing_utils.get_current_k8s_namespace()

logging.info(f"Running in aws region {AWS_REGION}, account {AWS_ACCOUNT_ID}")
logging.info(f"Running in namespace {namespace}")
logging.info(f"Using docker registry {DOCKER_REGISTRY}")

Use Kubeflow fairing to build the docker image

  • You will use Kubeflow Fairing's Kaniko builder to build a docker image that includes all your dependencies
    • You use Kaniko because you want to be able to run pip to install dependencies
    • Kaniko gives you the flexibility to build images from Dockerfiles

In [ ]:
# TODO(https://github.com/kubeflow/fairing/issues/426): We should get rid of this once the default 
# Kaniko image is updated to a newer image than 0.7.0.
from kubeflow.fairing import constants
constants.constants.KANIKO_IMAGE = "gcr.io/kaniko-project/executor:v0.14.0"

In [ ]:
from kubeflow.fairing.builders import cluster

# output_map is a map of extra files to add to the build context.
# It is a map from source location to the location inside the context.
output_map =  {
    "Dockerfile.model": "Dockerfile",
    "model.py": "model.py"
}

preprocessor = base_preprocessor.BasePreProcessor(
    command=["python"], # The base class will set this.
    input_files=[],
    path_prefix="/app", # irrelevant since we aren't preprocessing any files
    output_map=output_map)

preprocessor.preprocess()

In [ ]:
# Create a new ECR repository to host the model image
!aws ecr create-repository --repository-name mnist --region=$AWS_REGION
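
The CLI call above fails on re-runs once the repository exists. For convenience (not part of the original flow), a boto3 equivalent that tolerates an existing repository:

    # Create the ECR repository, ignoring the error if it already exists.
    ecr = boto3.client('ecr', region_name=AWS_REGION)
    try:
        ecr.create_repository(repositoryName='mnist')
    except ecr.exceptions.RepositoryAlreadyExistsException:
        logging.info("ECR repository 'mnist' already exists")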

In [ ]:
# Use a TensorFlow image as the base image
# We use a custom Dockerfile
cluster_builder = cluster.cluster.ClusterBuilder(registry=DOCKER_REGISTRY,
                                                 base_image="", # base_image is set in the Dockerfile
                                                 preprocessor=preprocessor,
                                                 image_name="mnist",
                                                 dockerfile_path="Dockerfile",
                                                 pod_spec_mutators=[fairing.cloud.aws.add_aws_credentials_if_exists, fairing.cloud.aws.add_ecr_config],
                                                 context_source=cluster.s3_context.S3ContextSource(region=AWS_REGION))
cluster_builder.build()
logging.info(f"Built image {cluster_builder.image_tag}")

Create an S3 Bucket

  • Create an S3 bucket to store our models and other results.
  • Since we are running in Python we use the Python client libraries, but you could also use the aws s3 command line (an equivalent is shown after the cell below)

In [ ]:
import boto3
from botocore.exceptions import ClientError

bucket = f"{AWS_ACCOUNT_ID}-mnist"

def create_bucket(bucket_name, region=None):
    """Create an S3 bucket in a specified region

    If a region is not specified, the bucket is created in the S3 default
    region (us-east-1).

    :param bucket_name: Bucket to create
    :param region: String region to create bucket in, e.g., 'us-west-2'
    :return: True if bucket created, else False
    """

    # Create bucket
    try:
        if region is None:
            s3_client = boto3.client('s3')
            s3_client.create_bucket(Bucket=bucket_name)
        else:
            s3_client = boto3.client('s3', region_name=region)
            location = {'LocationConstraint': region}
            s3_client.create_bucket(Bucket=bucket_name,
                                    CreateBucketConfiguration=location)
    except ClientError as e:
        logging.error(e)
        return False
    return True

create_bucket(bucket, AWS_REGION)
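
For reference, the aws CLI equivalent of the cell above would be:

    !aws s3 mb s3://{bucket} --region {AWS_REGION}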

Distributed training

  • We will train the model by using TFJob to run a distributed training job; the sketch below shows how each replica discovers its role
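
The tf-operator injects a TF_CONFIG environment variable into every replica, and tf.estimator reads it to learn the cluster layout and the process's own role. Inside a replica it looks roughly like this (an illustrative sketch meant to run inside a training pod, not in this notebook):

    import json
    import os

    # TF_CONFIG is set by the tf-operator inside each TFJob replica.
    tf_config = json.loads(os.environ.get("TF_CONFIG", "{}"))
    print(tf_config.get("cluster"))  # e.g. {"chief": [...], "worker": [...], "ps": [...]}
    print(tf_config.get("task"))     # e.g. {"type": "worker", "index": 0}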

In [ ]:
train_name = f"mnist-train-{uuid.uuid4().hex[:4]}"
num_ps = 1
num_workers = 2
model_dir = f"s3://{bucket}/mnist"
export_path = f"s3://{bucket}/mnist/export"
train_steps = 200
batch_size = 100
learning_rate = .01
image = cluster_builder.image_tag

train_spec = f"""apiVersion: kubeflow.org/v1
kind: TFJob
metadata:
  name: {train_name}  
spec:
  tfReplicaSpecs:
    Ps:
      replicas: {num_ps}
      template:
        metadata:
          annotations:
            sidecar.istio.io/inject: "false"
        spec:
          serviceAccount: default-editor
          containers:
          - name: tensorflow
            command:
            - python
            - /opt/model.py
            - --tf-model-dir={model_dir}
            - --tf-export-dir={export_path}
            - --tf-train-steps={train_steps}
            - --tf-batch-size={batch_size}
            - --tf-learning-rate={learning_rate}
            image: {image}
            workingDir: /opt
            env:
            - name: AWS_REGION
              value: {AWS_REGION}
            - name: AWS_ACCESS_KEY_ID
              valueFrom:
                secretKeyRef:
                  name: aws-secret
                  key: AWS_ACCESS_KEY_ID
            - name: AWS_SECRET_ACCESS_KEY
              valueFrom:
                secretKeyRef:
                  name: aws-secret
                  key: AWS_SECRET_ACCESS_KEY

          restartPolicy: OnFailure
    Chief:
      replicas: 1
      template:
        metadata:
          annotations:
            sidecar.istio.io/inject: "false"
        spec:
          serviceAccount: default-editor
          containers:
          - name: tensorflow
            command:
            - python
            - /opt/model.py
            - --tf-model-dir={model_dir}
            - --tf-export-dir={export_path}
            - --tf-train-steps={train_steps}
            - --tf-batch-size={batch_size}
            - --tf-learning-rate={learning_rate}
            image: {image}
            workingDir: /opt
            env:
            - name: AWS_REGION
              value: {AWS_REGION}
            - name: AWS_ACCESS_KEY_ID
              valueFrom:
                secretKeyRef:
                  name: aws-secret
                  key: AWS_ACCESS_KEY_ID
            - name: AWS_SECRET_ACCESS_KEY
              valueFrom:
                secretKeyRef:
                  name: aws-secret
                  key: AWS_SECRET_ACCESS_KEY

          restartPolicy: OnFailure
    Worker:
      replicas: {num_workers}
      template:
        metadata:
          annotations:
            sidecar.istio.io/inject: "false"
        spec:
          serviceAccount: default-editor
          containers:
          - name: tensorflow
            command:
            - python
            - /opt/model.py
            - --tf-model-dir={model_dir}
            - --tf-export-dir={export_path}
            - --tf-train-steps={train_steps}
            - --tf-batch-size={batch_size}
            - --tf-learning-rate={learning_rate}
            image: {image}
            workingDir: /opt
            env:
            - name: AWS_REGION
              value: {AWS_REGION}
            - name: AWS_ACCESS_KEY_ID
              valueFrom:
                secretKeyRef:
                  name: aws-secret
                  key: AWS_ACCESS_KEY_ID
            - name: AWS_SECRET_ACCESS_KEY
              valueFrom:
                secretKeyRef:
                  name: aws-secret
                  key: AWS_SECRET_ACCESS_KEY
          restartPolicy: OnFailure
"""

Create the training job

  • You could write the spec to a YAML file and then do kubectl apply -f {FILE}, as sketched after this list
  • Since you are running in Jupyter you will use the TFJob client
  • You will run the TFJob in a namespace created by a Kubeflow profile
    • The namespace will be the same namespace you are running the notebook in
    • Creating a profile ensures the namespace is provisioned with the service accounts and other resources needed by Kubeflow
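
The kubectl route, for reference, would look like this (the filename is arbitrary):

    # Alternative to the SDK: materialize the spec and apply it with kubectl.
    with open("train_job.yaml", "w") as f:
        f.write(train_spec)
    !kubectl apply -n {namespace} -f train_job.yaml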

In [ ]:
tf_job_client = tf_job_client_module.TFJobClient()

In [ ]:
tf_job_body = yaml.safe_load(train_spec)
tf_job = tf_job_client.create(tf_job_body, namespace=namespace)  

logging.info(f"Created job {namespace}.{train_name}")

Check the job

  • Above you used the Python SDK for TFJob to create the job; the same client can check the status, as sketched below
  • You can also use kubectl to get the status of your job
  • The job conditions will tell you whether the job is running, succeeded, or failed
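
A sketch of the SDK route, assuming the client returns the job as a dict (as recent versions of the TFJob SDK do):

    # Read the TFJob's status conditions via the Python SDK.
    tf_job = tf_job_client.get(train_name, namespace=namespace)
    for condition in tf_job.get("status", {}).get("conditions", []):
        print(condition["type"], condition.get("message", ""))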

In [ ]:
!kubectl get tfjobs -o yaml {train_name}

Get The Logs

  • There are two ways to get the logs for the training job

    1. Using kubectl to fetch the pod logs, as sketched after this list
      • These logs are ephemeral; they become unavailable once the pod is garbage collected to free up resources
    2. Using Fluentd with CloudWatch
      • Kubernetes data plane logs are not automatically available in AWS
      • You need to install the fluentd-cloudwatch plugin to ship container logs to CloudWatch
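
For option 1, a sketch that fetches the chief's logs; the label keys follow the tf-operator v1 convention (tf-job-name, tf-replica-type) and may differ in other operator versions:

    !kubectl logs -n {namespace} -l tf-job-name={train_name},tf-replica-type=chief --tail=100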

Deploy TensorBoard

  • You will create a Kubernetes Deployment to run TensorBoard
  • TensorBoard will be accessible behind the Kubeflow endpoint

In [ ]:
tb_name = "mnist-tensorboard"
tb_deploy = f"""apiVersion: apps/v1
kind: Deployment
metadata:
  labels:
    app: mnist-tensorboard
  name: {tb_name}
  namespace: {namespace}
spec:
  selector:
    matchLabels:
      app: mnist-tensorboard
  template:
    metadata:
      labels:
        app: mnist-tensorboard
        version: v1
    spec:
      serviceAccount: default-editor
      containers:
      - command:
        - /usr/local/bin/tensorboard
        - --logdir={model_dir}
        - --port=80
        image: tensorflow/tensorflow:1.15.2-py3
        name: tensorboard
        env:
        - name: AWS_REGION
          value: {AWS_REGION}
        - name: AWS_ACCESS_KEY_ID
          valueFrom:
            secretKeyRef:
              name: aws-secret
              key: AWS_ACCESS_KEY_ID
        - name: AWS_SECRET_ACCESS_KEY
          valueFrom:
            secretKeyRef:
              name: aws-secret
              key: AWS_SECRET_ACCESS_KEY
        ports:
        - containerPort: 80
"""
tb_service = f"""apiVersion: v1
kind: Service
metadata:
  labels:
    app: mnist-tensorboard
  name: {tb_name}
  namespace: {namespace}
spec:
  ports:
  - name: http-tb
    port: 80
    targetPort: 80
  selector:
    app: mnist-tensorboard
  type: ClusterIP
"""

tb_virtual_service = f"""apiVersion: networking.istio.io/v1alpha3
kind: VirtualService
metadata:
  name: {tb_name}
  namespace: {namespace}
spec:
  gateways:
  - kubeflow/kubeflow-gateway
  hosts:
  - '*'
  http:
  - match:
    - uri:
        prefix: /mnist/{namespace}/tensorboard/
    rewrite:
      uri: /
    route:
    - destination:
        host: {tb_name}.{namespace}.svc.cluster.local
        port:
          number: 80
    timeout: 300s
"""

tb_specs = [tb_deploy, tb_service, tb_virtual_service]

In [ ]:
k8s_util.apply_k8s_specs(tb_specs, k8s_util.K8S_CREATE_OR_REPLACE)
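
TensorBoard can take a little while to come up; a convenience check (not part of the original flow) that waits for the Deployment to become ready:

    !kubectl rollout status deploy/{tb_name} -n {namespace} --timeout=120s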

Access The TensorBoard UI

Note: By default, your namespace may not have access to the istio-system namespace, which is needed to look up the ingress endpoint; see the RBAC note in the Access the web UI section below for how to grant it.


In [ ]:
endpoint = k8s_util.get_ingress_endpoint() 
if endpoint:    
    vs = yaml.safe_load(tb_virtual_service)
    path= vs["spec"]["http"][0]["match"][0]["uri"]["prefix"]
    tb_endpoint = endpoint + path
    display(HTML(f"TensorBoard UI is at <a href='{tb_endpoint}'>{tb_endpoint}</a>"))

Wait For the Training Job to finish

  • You can use the TFJob client to wait for it to finish.

In [ ]:
tf_job = tf_job_client.wait_for_condition(train_name, expected_condition=["Succeeded", "Failed"], namespace=namespace)

if tf_job_client.is_job_succeeded(train_name, namespace):
    logging.info(f"TFJob {namespace}.{train_name} succeeded")
else:
    raise ValueError(f"TFJob {namespace}.{train_name} failed")

Serve the model

  • Deploy the model using TensorFlow Serving
  • We need to create:
    1. A Kubernetes Deployment
    2. A Kubernetes Service
    3. (Optional) A ConfigMap containing the Prometheus monitoring config

In [ ]:
import os
deploy_name = "mnist-model"
model_base_path = export_path

# The web ui defaults to mnist-service so if you change it you will
# need to change it in the UI as well to send predictions to the model
model_service = "mnist-service"

deploy_spec = f"""apiVersion: apps/v1
kind: Deployment
metadata:
  labels:
    app: mnist
  name: {deploy_name}
  namespace: {namespace}
spec:
  selector:
    matchLabels:
      app: mnist-model
  template:
    metadata:
      # TODO(jlewi): Right now we disable the istio side car because otherwise ISTIO rbac will prevent the
      # UI from sending RPCs to the server. We should create an appropriate ISTIO rbac authorization
      # policy to allow traffic from the UI to the model server.
      # https://istio.io/docs/concepts/security/#target-selectors
      annotations:        
        sidecar.istio.io/inject: "false"
      labels:
        app: mnist-model
        version: v1
    spec:
      serviceAccount: default-editor
      containers:
      - args:
        - --port=9000
        - --rest_api_port=8500
        - --model_name=mnist
        - --model_base_path={model_base_path}
        - --monitoring_config_file=/var/config/monitoring_config.txt
        command:
        - /usr/bin/tensorflow_model_server
        env:
        - name: modelBasePath
          value: {model_base_path}
        - name: AWS_REGION
          value: {AWS_REGION}
        - name: AWS_ACCESS_KEY_ID
          valueFrom:
            secretKeyRef:
              name: aws-secret
              key: AWS_ACCESS_KEY_ID
        - name: AWS_SECRET_ACCESS_KEY
          valueFrom:
            secretKeyRef:
              name: aws-secret
              key: AWS_SECRET_ACCESS_KEY
        image: tensorflow/serving:1.15.0
        imagePullPolicy: IfNotPresent
        livenessProbe:
          initialDelaySeconds: 30
          periodSeconds: 30
          tcpSocket:
            port: 9000
        name: mnist
        ports:
        - containerPort: 9000
        - containerPort: 8500
        resources:
          limits:
            cpu: "1"
            memory: 1Gi
          requests:
            cpu: "1"
            memory: 1Gi
        volumeMounts:
        - mountPath: /var/config/
          name: model-config
      volumes:
      - configMap:
          name: {deploy_name}
        name: model-config
"""

service_spec = f"""apiVersion: v1
kind: Service
metadata:
  annotations:    
    prometheus.io/path: /monitoring/prometheus/metrics
    prometheus.io/port: "8500"
    prometheus.io/scrape: "true"
  labels:
    app: mnist-model
  name: {model_service}
  namespace: {namespace}
spec:
  ports:
  - name: grpc-tf-serving
    port: 9000
    targetPort: 9000
  - name: http-tf-serving
    port: 8500
    targetPort: 8500
  selector:
    app: mnist-model
  type: ClusterIP
"""

monitoring_config = f"""kind: ConfigMap
apiVersion: v1
metadata:
  name: {deploy_name}
  namespace: {namespace}
data:
  monitoring_config.txt: |-
    prometheus_config: {{
      enable: true,
      path: "/monitoring/prometheus/metrics"
    }}
"""

model_specs = [deploy_spec, service_spec, monitoring_config]

In [ ]:
k8s_util.apply_k8s_specs(model_specs, k8s_util.K8S_CREATE_OR_REPLACE)
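
Once the serving pods are ready you can sanity-check the server from the notebook over its REST port. The payload below assumes the exported signature accepts one 28x28 float image per instance; check model.py's serving signature and adjust accordingly (requests may need a pip install in some images):

    import numpy as np
    import requests

    # TF Serving's REST predict endpoint for the "mnist" model,
    # reached through the in-cluster Service DNS name.
    url = f"http://{model_service}.{namespace}.svc.cluster.local:8500/v1/models/mnist:predict"
    payload = {"instances": [np.random.rand(28, 28).tolist()]}
    print(requests.post(url, json=payload).json())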

Deploy the mnist UI

  • We will now deploy the UI to visualize the mnist results
  • Note: This uses a prebuilt, public docker image for the UI

In [ ]:
ui_name = "mnist-ui"
ui_deploy = f"""apiVersion: apps/v1
kind: Deployment
metadata:
  name: {ui_name}
  namespace: {namespace}
spec:
  replicas: 1
  selector:
    matchLabels:
      app: mnist-web-ui
  template:
    metadata:
      labels:
        app: mnist-web-ui
    spec:
      containers:
      - image: gcr.io/kubeflow-examples/mnist/web-ui:v20190112-v0.2-142-g3b38225
        name: web-ui
        ports:
        - containerPort: 5000        
      serviceAccount: default-editor
"""

ui_service = f"""apiVersion: v1
kind: Service
metadata:
  annotations:
  name: {ui_name}
  namespace: {namespace}
spec:
  ports:
  - name: http-mnist-ui
    port: 80
    targetPort: 5000
  selector:
    app: mnist-web-ui
  type: ClusterIP
"""

ui_virtual_service = f"""apiVersion: networking.istio.io/v1alpha3
kind: VirtualService
metadata:
  name: {ui_name}
  namespace: {namespace}
spec:
  gateways:
  - kubeflow/kubeflow-gateway
  hosts:
  - '*'
  http:
  - match:
    - uri:
        prefix: /mnist/{namespace}/ui/
    rewrite:
      uri: /
    route:
    - destination:
        host: {ui_name}.{namespace}.svc.cluster.local
        port:
          number: 80
    timeout: 300s
"""

ui_specs = [ui_deploy, ui_service, ui_virtual_service]

In [ ]:
k8s_util.apply_k8s_specs(ui_specs, k8s_util.K8S_CREATE_OR_REPLACE)

Access the web UI

  • A reverse proxy route is automatically added to the Kubeflow endpoint
  • The endpoint will be

    http://${KUBEFLOW_ENDPOINT}/mnist/${NAMESPACE}/ui/
  • You can get the KUBEFLOW_ENDPOINT with

    KUBEFLOW_ENDPOINT=`kubectl -n istio-system get ingress istio-ingress -o jsonpath="{.status.loadBalancer.ingress[0].hostname}"`
    • You must run this command with sufficient RBAC permissions to get the ingress.
  • If you have sufficient privileges, you can run the cell below to get the endpoint. If you don't, you can grant the appropriate permissions by running the command

     kubectl create --namespace=istio-system rolebinding --clusterrole=kubeflow-view --serviceaccount=${NAMESPACE}:default-editor ${NAMESPACE}-istio-view

In [ ]:
endpoint = k8s_util.get_ingress_endpoint() 
if endpoint:    
    vs = yaml.safe_load(ui_virtual_service)
    path= vs["spec"]["http"][0]["match"][0]["uri"]["prefix"]
    ui_endpoint = endpoint + path
    display(HTML(f"mnist UI is at <a href='{ui_endpoint}'>{ui_endpoint}</a>"))