This example guides you through:

- Provisioning IBM Cloud Object Storage and wiring its credentials into the notebook
- Building the model's Docker image with Kubeflow Fairing
- Training an MNIST model on Kubernetes with a distributed TFJob
- Monitoring training with TensorBoard
- Serving the trained model with TensorFlow Serving and testing it through a simple web UI
Before proceeding to the next steps, we first need to provision the necessary IBM services and enter their credentials below.

IBM Cloud Object Storage (COS): https://cloud.ibm.com/catalog/services/cloud-object-storage
Tip: to access your COS instance dashboard from the IBM Cloud dashboard, click Services, then select your COS instance.

Create new credentials with HMAC: in the COS instance dashboard, open the Service credentials tab, click New credential, and add the inline configuration parameter {"HMAC": true} before clicking Add.
Replace the information in the following cell with your COS credentials.
You can find these credentials in your COS instance dashboard under the Service credentials tab.
In [ ]:
cos_credentials = {
    "apikey": "-------",
    "cos_hmac_keys": {
        "access_key_id": "------",
        "secret_access_key": "------"
    },
    "endpoints": "https://cos-service.bluemix.net/endpoints",
    "iam_apikey_description": "------",
    "iam_apikey_name": "------",
    "iam_role_crn": "------",
    "iam_serviceid_crn": "------",
    "resource_instance_id": "-------"
}
Define the endpoint. To do this, open the Endpoints tab in the COS instance's dashboard, copy the endpoint for your bucket's region, and enter it in the service_endpoint cell below.
In [ ]:
service_endpoint = 's3.us.cloud-object-storage.appdomain.cloud'
service_endpoint_with_https = "https://" + service_endpoint
There is a delta between existing distributed MNIST examples and what's needed to run well as a TFJob. Basically, we must use tf.estimator.train_and_evaluate to enable model exporting and serving. The resulting model is model.py.
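For reference, the core of that pattern looks like the sketch below. This is a minimal illustration only (a toy linear classifier with a local model_dir), not the actual model.py, which wires the same pieces to the --tf-* flags passed by the TFJob spec later in this notebook:

In [ ]:
import numpy as np
import tensorflow as tf

def model_fn(features, labels, mode):
    # Toy linear classifier; the real model.py uses a deeper network.
    logits = tf.layers.dense(tf.reshape(features["x"], [-1, 784]), 10)
    if mode == tf.estimator.ModeKeys.PREDICT:
        return tf.estimator.EstimatorSpec(
            mode, predictions={"classes": tf.argmax(logits, axis=1)})
    loss = tf.losses.sparse_softmax_cross_entropy(labels=labels, logits=logits)
    if mode == tf.estimator.ModeKeys.TRAIN:
        train_op = tf.train.GradientDescentOptimizer(0.01).minimize(
            loss, global_step=tf.train.get_global_step())
        return tf.estimator.EstimatorSpec(mode, loss=loss, train_op=train_op)
    return tf.estimator.EstimatorSpec(mode, loss=loss)

def input_fn():
    (x, y), _ = tf.keras.datasets.mnist.load_data()
    dataset = tf.data.Dataset.from_tensor_slices(
        ({"x": x.astype(np.float32) / 255.0}, y.astype(np.int32)))
    return dataset.shuffle(1024).batch(100).repeat()

def serving_input_fn():
    # The exported SavedModel accepts batches of 28x28 images under key "x".
    x = tf.placeholder(tf.float32, [None, 28, 28], name="x")
    return tf.estimator.export.ServingInputReceiver({"x": x}, {"x": x})

# train_and_evaluate reads the TF_CONFIG env var, which the TFJob operator
# sets on each replica (Chief, Worker, Ps), to run distributed training.
estimator = tf.estimator.Estimator(model_fn=model_fn, model_dir="/tmp/mnist")
exporter = tf.estimator.FinalExporter(
    "mnist", serving_input_receiver_fn=serving_input_fn)
tf.estimator.train_and_evaluate(
    estimator,
    tf.estimator.TrainSpec(input_fn=input_fn, max_steps=200),
    tf.estimator.EvalSpec(input_fn=input_fn, steps=10, exporters=[exporter]))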
In [ ]:
import logging
import os
import uuid
from importlib import reload
import notebook_setup
reload(notebook_setup)
notebook_setup.notebook_setup(platform=None)
In [ ]:
import k8s_util
# Force a reload of kubeflow; since kubeflow is a multi namespace module
# it looks like doing this in notebook_setup may not be sufficient
import kubeflow
reload(kubeflow)
from kubernetes import client as k8s_client
from kubernetes import config as k8s_config
from kubeflow.tfjob.api import tf_job_client as tf_job_client_module
from IPython.core.display import display, HTML
import yaml
In [ ]:
import json
# Docker config for pushing images; the auth value is the base64 encoding of
# "username:password" for your registry account. Replace the placeholder.
config = {
    "auths": {
        "https://index.docker.io/v1/": {
            "auth": "xxxxxxxxxxxxxxx"
        }
    }
}
with open('config.json', 'w') as outfile:
    json.dump(config, outfile)
In [ ]:
# !kubectl delete configmap docker-config
!kubectl create configmap docker-config --from-file=config.json
!rm config.json
In [ ]:
from kubernetes import client as k8s_client
from kubernetes.client import rest as k8s_rest
from kubeflow import fairing
from kubeflow.fairing import utils as fairing_utils
from kubeflow.fairing.builders import append
from kubeflow.fairing.deployers import job
from kubeflow.fairing.preprocessors import base as base_preprocessor
# Update the DOCKER_REGISTRY to your docker registry!!
DOCKER_REGISTRY = "dockerregistry"
namespace = fairing_utils.get_current_k8s_namespace()
cos_username = cos_credentials['cos_hmac_keys']['access_key_id']
cos_key = cos_credentials['cos_hmac_keys']['secret_access_key']
cos_region = "us-east-1"
logging.info(f"Running in namespace {namespace}")
logging.info(f"Using docker registry {DOCKER_REGISTRY}")
In [ ]:
# TODO(https://github.com/kubeflow/fairing/issues/426): We should get rid of this once the default
# Kaniko image is updated to a newer image than 0.7.0.
from kubeflow.fairing import constants
constants.constants.KANIKO_IMAGE = "gcr.io/kaniko-project/executor:v0.14.0"
In [ ]:
from kubeflow.fairing.builders import cluster
# output_map is a map of extra files to add to the build context.
# It maps each source location to its destination inside the context.
output_map = {
    "Dockerfile.model": "Dockerfile",
    "model.py": "model.py"
}
preprocessor = base_preprocessor.BasePreProcessor(
    command=["python"],  # The base class will set this.
    input_files=[],
    path_prefix="/app",  # Irrelevant since we aren't preprocessing any files.
    output_map=output_map)
preprocessor.preprocess()
In [ ]:
# Use a TensorFlow image as the base image; we supply a custom Dockerfile.
from kubeflow.fairing.cloud.k8s import MinioUploader
from kubeflow.fairing.builders.cluster.minio_context import MinioContextSource
minio_uploader = MinioUploader(endpoint_url=service_endpoint_with_https,
                               minio_secret=cos_username,
                               minio_secret_key=cos_key,
                               region_name=cos_region)
minio_context_source = MinioContextSource(endpoint_url=service_endpoint_with_https,
                                          minio_secret=cos_username,
                                          minio_secret_key=cos_key,
                                          region_name=cos_region)
In [ ]:
# TODO: Add IBM Container registry as part of the fairing SDK.
cluster_builder = cluster.cluster.ClusterBuilder(registry=DOCKER_REGISTRY,
                                                 base_image="",  # base_image is set in the Dockerfile
                                                 preprocessor=preprocessor,
                                                 image_name="mnist",
                                                 dockerfile_path="Dockerfile",
                                                 context_source=minio_context_source)
cluster_builder.build()
logging.info(f"Built image {cluster_builder.image_tag}")
In [ ]:
mnist_bucket = f"{DOCKER_REGISTRY}-mnist"
minio_uploader.create_bucket(mnist_bucket)
logging.info(f"Bucket {mnist_bucket} created or already exists")
In [ ]:
train_name = f"mnist-train-{uuid.uuid4().hex[:4]}"
num_ps = 1
num_workers = 2
model_dir = f"s3://{mnist_bucket}/mnist"
export_path = f"s3://{mnist_bucket}/mnist/export"
train_steps = 200
batch_size = 100
learning_rate = .01
image = cluster_builder.image_tag
In [ ]:
train_spec = f"""apiVersion: kubeflow.org/v1
kind: TFJob
metadata:
  name: {train_name}
spec:
  tfReplicaSpecs:
    Ps:
      replicas: {num_ps}
      template:
        metadata:
          annotations:
            sidecar.istio.io/inject: "false"
        spec:
          serviceAccount: default-editor
          containers:
          - name: tensorflow
            command:
            - python
            - /opt/model.py
            - --tf-model-dir={model_dir}
            - --tf-export-dir={export_path}
            - --tf-train-steps={train_steps}
            - --tf-batch-size={batch_size}
            - --tf-learning-rate={learning_rate}
            env:
            - name: S3_ENDPOINT
              value: {service_endpoint}
            - name: AWS_REGION
              value: {cos_region}
            - name: BUCKET_NAME
              value: {mnist_bucket}
            - name: S3_USE_HTTPS
              value: "1"
            - name: S3_VERIFY_SSL
              value: "1"
            - name: AWS_ACCESS_KEY_ID
              value: {cos_username}
            - name: AWS_SECRET_ACCESS_KEY
              value: {cos_key}
            image: {image}
            workingDir: /opt
          restartPolicy: OnFailure
    Chief:
      replicas: 1
      template:
        metadata:
          annotations:
            sidecar.istio.io/inject: "false"
        spec:
          serviceAccount: default-editor
          containers:
          - name: tensorflow
            command:
            - python
            - /opt/model.py
            - --tf-model-dir={model_dir}
            - --tf-export-dir={export_path}
            - --tf-train-steps={train_steps}
            - --tf-batch-size={batch_size}
            - --tf-learning-rate={learning_rate}
            env:
            - name: S3_ENDPOINT
              value: {service_endpoint}
            - name: AWS_REGION
              value: {cos_region}
            - name: BUCKET_NAME
              value: {mnist_bucket}
            - name: S3_USE_HTTPS
              value: "1"
            - name: S3_VERIFY_SSL
              value: "1"
            - name: AWS_ACCESS_KEY_ID
              value: {cos_username}
            - name: AWS_SECRET_ACCESS_KEY
              value: {cos_key}
            image: {image}
            workingDir: /opt
          restartPolicy: OnFailure
    Worker:
      replicas: {num_workers}
      template:
        metadata:
          annotations:
            sidecar.istio.io/inject: "false"
        spec:
          serviceAccount: default-editor
          containers:
          - name: tensorflow
            command:
            - python
            - /opt/model.py
            - --tf-model-dir={model_dir}
            - --tf-export-dir={export_path}
            - --tf-train-steps={train_steps}
            - --tf-batch-size={batch_size}
            - --tf-learning-rate={learning_rate}
            env:
            - name: S3_ENDPOINT
              value: {service_endpoint}
            - name: AWS_REGION
              value: {cos_region}
            - name: BUCKET_NAME
              value: {mnist_bucket}
            - name: S3_USE_HTTPS
              value: "1"
            - name: S3_VERIFY_SSL
              value: "1"
            - name: AWS_ACCESS_KEY_ID
              value: {cos_username}
            - name: AWS_SECRET_ACCESS_KEY
              value: {cos_key}
            image: {image}
            workingDir: /opt
          restartPolicy: OnFailure
"""
You could write this spec to a file and submit it with kubectl apply -f {FILE}; instead, we create the job directly with the TFJob client below.
In [ ]:
tf_job_client = tf_job_client_module.TFJobClient()
In [ ]:
tf_job_body = yaml.safe_load(train_spec)
tf_job = tf_job_client.create(tf_job_body, namespace=namespace)
logging.info(f"Created job {namespace}.{train_name}")
In [ ]:
# Reuse the TFJobClient created above to wait for the job to complete.
tf_job_client.wait_for_job(train_name, namespace=namespace, watch=True)
In [ ]:
tf_job_client.get_logs(train_name, namespace=namespace)
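Optionally, confirm the job reached a terminal Succeeded state; this sketch assumes the installed SDK version exposes is_job_succeeded (if not, inspect the output of tf_job_client.get_job_status instead):

In [ ]:
# Check the terminal state of the TFJob (is_job_succeeded is assumed to be
# available in this SDK version; otherwise use tf_job_client.get_job_status).
if tf_job_client.is_job_succeeded(train_name, namespace=namespace):
    logging.info(f"TFJob {train_name} succeeded")
else:
    logging.warning(f"TFJob {train_name} did not succeed; check the logs above")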
In [ ]:
tb_name = "mnist-tensorboard"
tb_deploy = f"""apiVersion: apps/v1
kind: Deployment
metadata:
  labels:
    app: mnist-tensorboard
  name: {tb_name}
  namespace: {namespace}
spec:
  selector:
    matchLabels:
      app: mnist-tensorboard
  template:
    metadata:
      labels:
        app: mnist-tensorboard
        version: v1
    spec:
      serviceAccount: default-editor
      containers:
      - command:
        - /usr/local/bin/tensorboard
        - --logdir={model_dir}
        - --port=80
        image: tensorflow/tensorflow:1.15.2-py3
        env:
        - name: S3_ENDPOINT
          value: {service_endpoint}
        - name: AWS_REGION
          value: {cos_region}
        - name: BUCKET_NAME
          value: {mnist_bucket}
        - name: S3_USE_HTTPS
          value: "1"
        - name: S3_VERIFY_SSL
          value: "1"
        - name: AWS_ACCESS_KEY_ID
          value: {cos_username}
        - name: AWS_SECRET_ACCESS_KEY
          value: {cos_key}
        name: tensorboard
        ports:
        - containerPort: 80
"""
tb_service = f"""apiVersion: v1
kind: Service
metadata:
  labels:
    app: mnist-tensorboard
  name: {tb_name}
  namespace: {namespace}
spec:
  ports:
  - name: http-tb
    port: 80
    targetPort: 80
  selector:
    app: mnist-tensorboard
  type: ClusterIP
"""
tb_virtual_service = f"""apiVersion: networking.istio.io/v1alpha3
kind: VirtualService
metadata:
  name: {tb_name}
  namespace: {namespace}
spec:
  gateways:
  - kubeflow/kubeflow-gateway
  hosts:
  - '*'
  http:
  - match:
    - uri:
        prefix: /mnist/{namespace}/tensorboard/
    rewrite:
      uri: /
    route:
    - destination:
        host: {tb_name}.{namespace}.svc.cluster.local
        port:
          number: 80
    timeout: 300s
"""
tb_specs = [tb_deploy, tb_service, tb_virtual_service]
In [ ]:
k8s_util.apply_k8s_specs(tb_specs, k8s_util.K8S_CREATE_OR_REPLACE)
To get the TensorBoard URL, run the following in a terminal with the appropriate RBAC permissions. Note: you can get the worker node IP from kubectl get no -o wide, and anonymous is the default profile namespace (replace it with yours if it differs).

export INGRESS_HOST=<worker-node-ip>
export INGRESS_PORT=$(kubectl -n istio-system get service istio-ingressgateway -o jsonpath='{.spec.ports[?(@.name=="http2")].nodePort}')
printf "TensorBoard URL: \n${INGRESS_HOST}:${INGRESS_PORT}/mnist/anonymous/tensorboard/\n"
In [ ]:
deploy_name = "mnist-model"
model_base_path = export_path
# The web UI defaults to mnist-service, so if you change the service name you
# will need to change it in the UI as well to send predictions to the model.
model_service = "mnist-service"
deploy_spec = f"""apiVersion: apps/v1
kind: Deployment
metadata:
  labels:
    app: mnist
  name: {deploy_name}
  namespace: {namespace}
spec:
  selector:
    matchLabels:
      app: mnist-model
  template:
    metadata:
      # TODO(jlewi): Right now we disable the istio sidecar because otherwise
      # istio RBAC will prevent the UI from sending RPCs to the server. We
      # should create an appropriate istio RBAC authorization policy to allow
      # traffic from the UI to the model server.
      # https://istio.io/docs/concepts/security/#target-selectors
      annotations:
        sidecar.istio.io/inject: "false"
      labels:
        app: mnist-model
        version: v1
    spec:
      serviceAccount: default-editor
      containers:
      - args:
        - --port=9000
        - --rest_api_port=8500
        - --model_name=mnist
        - --model_base_path={model_base_path}
        command:
        - /usr/bin/tensorflow_model_server
        env:
        - name: modelBasePath
          value: {model_base_path}
        - name: S3_ENDPOINT
          value: {service_endpoint}
        - name: AWS_REGION
          value: {cos_region}
        - name: BUCKET_NAME
          value: {mnist_bucket}
        - name: S3_USE_HTTPS
          value: "1"
        - name: S3_VERIFY_SSL
          value: "1"
        - name: AWS_ACCESS_KEY_ID
          value: {cos_username}
        - name: AWS_SECRET_ACCESS_KEY
          value: {cos_key}
        image: tensorflow/serving:1.15.0
        imagePullPolicy: IfNotPresent
        livenessProbe:
          initialDelaySeconds: 30
          periodSeconds: 30
          tcpSocket:
            port: 9000
        name: mnist
        ports:
        - containerPort: 9000
        - containerPort: 8500
        resources:
          limits:
            cpu: "4"
            memory: 4Gi
          requests:
            cpu: "1"
            memory: 1Gi
        volumeMounts:
        - mountPath: /var/config/
          name: model-config
      volumes:
      - configMap:
          name: {deploy_name}
        name: model-config
"""
service_spec = f"""apiVersion: v1
kind: Service
metadata:
  annotations:
    prometheus.io/path: /monitoring/prometheus/metrics
    prometheus.io/port: "8500"
    prometheus.io/scrape: "true"
  labels:
    app: mnist-model
  name: {model_service}
  namespace: {namespace}
spec:
  ports:
  - name: grpc-tf-serving
    port: 9000
    targetPort: 9000
  - name: http-tf-serving
    port: 8500
    targetPort: 8500
  selector:
    app: mnist-model
  type: ClusterIP
"""
monitoring_config = f"""kind: ConfigMap
apiVersion: v1
metadata:
  name: {deploy_name}
  namespace: {namespace}
data:
  monitoring_config.txt: |-
    prometheus_config: {{
      enable: true,
      path: "/monitoring/prometheus/metrics"
    }}
"""
model_specs = [deploy_spec, service_spec, monitoring_config]
In [ ]:
k8s_util.apply_k8s_specs(model_specs, k8s_util.K8S_CREATE_OR_REPLACE)
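Once the serving deployment is ready, you can optionally probe TF Serving's REST model-status endpoint from the notebook. A minimal sketch, assuming in-cluster DNS resolution and that the requests package is installed:

In [ ]:
import requests
# GET /v1/models/<name> returns the version status of the served model.
status_url = f"http://{model_service}.{namespace}.svc.cluster.local:8500/v1/models/mnist"
print(requests.get(status_url, timeout=10).json())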
In [ ]:
ui_name = "mnist-ui"
ui_deploy = f"""apiVersion: apps/v1
kind: Deployment
metadata:
  name: {ui_name}
  namespace: {namespace}
spec:
  replicas: 1
  selector:
    matchLabels:
      app: mnist-web-ui
  template:
    metadata:
      labels:
        app: mnist-web-ui
    spec:
      containers:
      - image: gcr.io/kubeflow-examples/mnist/web-ui:v20190112-v0.2-142-g3b38225
        name: web-ui
        ports:
        - containerPort: 5000
      serviceAccount: default-editor
"""
ui_service = f"""apiVersion: v1
kind: Service
metadata:
  name: {ui_name}
  namespace: {namespace}
spec:
  ports:
  - name: http-mnist-ui
    port: 80
    targetPort: 5000
  selector:
    app: mnist-web-ui
  type: ClusterIP
"""
ui_virtual_service = f"""apiVersion: networking.istio.io/v1alpha3
kind: VirtualService
metadata:
  name: {ui_name}
  namespace: {namespace}
spec:
  gateways:
  - kubeflow/kubeflow-gateway
  hosts:
  - '*'
  http:
  - match:
    - uri:
        prefix: /mnist/{namespace}/ui/
    rewrite:
      uri: /
    route:
    - destination:
        host: {ui_name}.{namespace}.svc.cluster.local
        port:
          number: 80
    timeout: 300s
"""
ui_specs = [ui_deploy, ui_service, ui_virtual_service]
In [ ]:
k8s_util.apply_k8s_specs(ui_specs, k8s_util.K8S_CREATE_OR_REPLACE)
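Besides the web UI, you can send a test prediction directly to TF Serving's REST predict endpoint. A minimal sketch with a random image; the input shape here is an assumption and must match the serving signature exported by your model.py:

In [ ]:
import numpy as np
import requests
# POST /v1/models/<name>:predict with an "instances" list. The 28x28 shape is
# an assumption; adjust it to the serving signature your model.py exports.
predict_url = (f"http://{model_service}.{namespace}.svc.cluster.local:8500"
               "/v1/models/mnist:predict")
payload = {"instances": [np.random.rand(28, 28).tolist()]}
print(requests.post(predict_url, json=payload, timeout=10).json())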
To get the web UI URL, run the following in a terminal with the appropriate RBAC permissions. Note: you can get the worker node IP from kubectl get no -o wide, and anonymous is the default profile namespace (replace it with yours if it differs).

export INGRESS_HOST=<worker-node-ip>
export INGRESS_PORT=$(kubectl -n istio-system get service istio-ingressgateway -o jsonpath='{.spec.ports[?(@.name=="http2")].nodePort}')
printf "mnist-web-app URL: \n${INGRESS_HOST}:${INGRESS_PORT}/mnist/anonymous/ui/\n"