This example guides you through:
There is a delta between existing distributed mnist examples and what's needed to run well as a TFJob.
Basically, we must:
tf.estimator.train_and_evaluate
to enable model exporting and serving. The resulting model is model.py.
Source documentation: Kaniko docs
Kaniko is used by fairing to build the model every time the notebook is run and deploy a fresh model. The newly built image is pushed into the DOCKER_REGISTRY and pulled from there by subsequent resources.
Get your docker registry user and password encoded in base64
echo -n USER:PASSWORD | base64
Create a config.json file with your Docker registry url and the previous generated base64 string
{
"auths": {
"https://index.docker.io/v1/": {
"auth": "xxxxxxxxxxxxxxx"
}
}
}
kubectl create --namespace ${NAMESPACE} configmap docker-config --from-file=<path to config.json>
The DOCKER_REGISTRY variable is used to push the newly built image.
Please change the variable to the registry for which you've configured credentials.
In [ ]:
import logging  # needed here: this cell logs before the later setup cell imports logging

from kubernetes import client as k8s_client
from kubernetes.client import rest as k8s_rest
from kubernetes.client.rest import ApiException
from kubeflow import fairing
from kubeflow.fairing import utils as fairing_utils
from kubeflow.fairing.builders import append
from kubeflow.fairing.deployers import job
from kubeflow.fairing.preprocessors import base as base_preprocessor

# Registry that the freshly built model image is pushed to; change it to a
# registry for which you've configured credentials (see the docker-config
# ConfigMap instructions above).
DOCKER_REGISTRY = "ciscoai"
namespace = fairing_utils.get_current_k8s_namespace()

api_client = k8s_client.CoreV1Api()
minio_service_endpoint = None
try:
    # MinIO is the S3-compatible store used for the Kaniko build context and
    # the trained model; discover its cluster IP in the kubeflow namespace.
    minio_service_endpoint = api_client.read_namespaced_service(
        name='minio-service', namespace='kubeflow').spec.cluster_ip
except ApiException as e:
    if e.status == 403:
        logging.warning("The service account doesn't have sufficient privileges "
                        "to get the kubeflow minio-service. "
                        "You will have to manually enter the minio cluster-ip. "
                        "To make this function work ask someone with cluster "
                        "privileges to create an appropriate "
                        "clusterrolebinding by running a command.\n"
                        "kubectl create --namespace=kubeflow rolebinding "
                        "--clusterrole=kubeflow-view "
                        "--serviceaccount=${NAMESPACE}:default-editor "
                        "${NAMESPACE}-minio-view")
    # Fixed: the original string lacked the f-prefix, so "{e.reason}" was
    # logged literally instead of the actual API failure reason.
    logging.error(f"API access denied with reason: {e.reason}")

# NOTE(review): if the lookup above failed, minio_service_endpoint is None and
# the concatenation below raises TypeError — set the cluster IP manually here.
s3_endpoint = minio_service_endpoint
minio_endpoint = "http://" + s3_endpoint
minio_username = "minio"
minio_key = "minio123"
minio_region = "us-east-1"
logging.info(f"Running in namespace {namespace}")
logging.info(f"Using docker registry {DOCKER_REGISTRY}")
logging.info(f"Using minio instance with endpoint '{s3_endpoint}'")
In [ ]:
# Standard-library modules used throughout the notebook.
import logging
import os
import uuid
from importlib import reload
# notebook_setup performs per-notebook environment configuration; reload it so
# edits to notebook_setup.py are picked up when the cell is re-run.
import notebook_setup
reload(notebook_setup)
notebook_setup.notebook_setup()
In [ ]:
# Helpers for applying raw Kubernetes specs (used for tensorboard/model/UI below).
import k8s_util
# Force a reload of kubeflow; since kubeflow is a multi namespace module
# it looks like doing this in notebook_setup may not be sufficient
import kubeflow
reload(kubeflow)
from kubernetes import client as k8s_client
from kubernetes import config as k8s_config
from kubeflow.tfjob.api import tf_job_client as tf_job_client_module
from IPython.core.display import display, HTML
import yaml
In [ ]:
# TODO(https://github.com/kubeflow/fairing/issues/426): We should get rid of this once the default
# Kaniko image is updated to a newer image than 0.7.0.
# Pin the Kaniko executor that fairing launches for in-cluster image builds.
from kubeflow.fairing import constants
constants.constants.KANIKO_IMAGE = "gcr.io/kaniko-project/executor:v0.14.0"
In [ ]:
from kubeflow.fairing.builders import cluster
# output_map is a map of extra files to add to the Docker build context.
# It is a map from source location to the location inside the context.
output_map = {
"Dockerfile.model": "Dockerfile",
"model.py": "model.py"
}
# BasePreProcessor only copies output_map into the context; no notebook
# conversion is performed (input_files is empty).
preprocessor = base_preprocessor.BasePreProcessor(
command=["python"], # The base class will set this.
input_files=[],
path_prefix="/app", # irrelevant since we aren't preprocessing any files
output_map=output_map)
preprocessor.preprocess()
In [ ]:
# Use a Tensorflow image as the base image
# We use a custom Dockerfile
from kubeflow.fairing.cloud.k8s import MinioUploader
from kubeflow.fairing.builders.cluster.minio_context import MinioContextSource
# The uploader pushes the build context to MinIO; the context source lets the
# in-cluster Kaniko builder pull it back. Both use the same MinIO credentials.
minio_uploader = MinioUploader(endpoint_url=minio_endpoint, minio_secret=minio_username, minio_secret_key=minio_key, region_name=minio_region)
minio_context_source = MinioContextSource(endpoint_url=minio_endpoint, minio_secret=minio_username, minio_secret_key=minio_key, region_name=minio_region)
In [ ]:
# Build the "mnist" image in-cluster with Kaniko and push it to DOCKER_REGISTRY.
cluster_builder = cluster.cluster.ClusterBuilder(registry=DOCKER_REGISTRY,
base_image="", # base_image is set in the Dockerfile
preprocessor=preprocessor,
image_name="mnist",
dockerfile_path="Dockerfile",
context_source=minio_context_source)
# Blocks until the build pod finishes; image_tag identifies the pushed image.
cluster_builder.build()
logging.info(f"Built image {cluster_builder.image_tag}")
In [ ]:
# Bucket (named after the registry) that holds training output and the exported model.
mnist_bucket = f"{DOCKER_REGISTRY}-mnist"
minio_uploader.create_bucket(mnist_bucket)
logging.info(f"Bucket {mnist_bucket} created or already exists")
In [ ]:
# Unique TFJob name so repeated notebook runs don't collide.
train_name = f"mnist-train-{uuid.uuid4().hex[:4]}"
# TFJob topology: parameter servers and workers (plus one chief; see train_spec).
num_ps = 1
num_workers = 2
# Checkpoints and the exported SavedModel live in MinIO via the s3:// scheme.
model_dir = f"s3://{mnist_bucket}/mnist"
export_path = f"s3://{mnist_bucket}/mnist/export"
# Hyperparameters forwarded to /opt/model.py as command-line flags.
train_steps = 200
batch_size = 100
learning_rate = .01
# Training image built by the cluster builder above.
image = cluster_builder.image_tag
In [ ]:
# TFJob spec: {num_ps} parameter server(s), 1 chief, and {num_workers} workers,
# all running the same training container. The S3_*/AWS_* environment variables
# point TensorFlow's S3 filesystem at the in-cluster MinIO instance so the
# s3:// model_dir/export_path URIs resolve.
# Fixed: Worker replicas was hard-coded to 1; it now uses num_workers.
train_spec = f"""apiVersion: kubeflow.org/v1
kind: TFJob
metadata:
  name: {train_name}
spec:
  tfReplicaSpecs:
    Ps:
      replicas: {num_ps}
      template:
        metadata:
          annotations:
            sidecar.istio.io/inject: "false"
        spec:
          serviceAccount: default-editor
          containers:
          - name: tensorflow
            command:
            - python
            - /opt/model.py
            - --tf-model-dir={model_dir}
            - --tf-export-dir={export_path}
            - --tf-train-steps={train_steps}
            - --tf-batch-size={batch_size}
            - --tf-learning-rate={learning_rate}
            env:
            - name: S3_ENDPOINT
              value: {s3_endpoint}
            - name: AWS_ENDPOINT_URL
              value: {minio_endpoint}
            - name: AWS_REGION
              value: {minio_region}
            - name: BUCKET_NAME
              value: {mnist_bucket}
            - name: S3_USE_HTTPS
              value: "0"
            - name: S3_VERIFY_SSL
              value: "0"
            - name: AWS_ACCESS_KEY_ID
              value: {minio_username}
            - name: AWS_SECRET_ACCESS_KEY
              value: {minio_key}
            image: {image}
            workingDir: /opt
          restartPolicy: OnFailure
    Chief:
      replicas: 1
      template:
        metadata:
          annotations:
            sidecar.istio.io/inject: "false"
        spec:
          serviceAccount: default-editor
          containers:
          - name: tensorflow
            command:
            - python
            - /opt/model.py
            - --tf-model-dir={model_dir}
            - --tf-export-dir={export_path}
            - --tf-train-steps={train_steps}
            - --tf-batch-size={batch_size}
            - --tf-learning-rate={learning_rate}
            env:
            - name: S3_ENDPOINT
              value: {s3_endpoint}
            - name: AWS_ENDPOINT_URL
              value: {minio_endpoint}
            - name: AWS_REGION
              value: {minio_region}
            - name: BUCKET_NAME
              value: {mnist_bucket}
            - name: S3_USE_HTTPS
              value: "0"
            - name: S3_VERIFY_SSL
              value: "0"
            - name: AWS_ACCESS_KEY_ID
              value: {minio_username}
            - name: AWS_SECRET_ACCESS_KEY
              value: {minio_key}
            image: {image}
            workingDir: /opt
          restartPolicy: OnFailure
    Worker:
      replicas: {num_workers}
      template:
        metadata:
          annotations:
            sidecar.istio.io/inject: "false"
        spec:
          serviceAccount: default-editor
          containers:
          - name: tensorflow
            command:
            - python
            - /opt/model.py
            - --tf-model-dir={model_dir}
            - --tf-export-dir={export_path}
            - --tf-train-steps={train_steps}
            - --tf-batch-size={batch_size}
            - --tf-learning-rate={learning_rate}
            env:
            - name: S3_ENDPOINT
              value: {s3_endpoint}
            - name: AWS_ENDPOINT_URL
              value: {minio_endpoint}
            - name: AWS_REGION
              value: {minio_region}
            - name: BUCKET_NAME
              value: {mnist_bucket}
            - name: S3_USE_HTTPS
              value: "0"
            - name: S3_VERIFY_SSL
              value: "0"
            - name: AWS_ACCESS_KEY_ID
              value: {minio_username}
            - name: AWS_SECRET_ACCESS_KEY
              value: {minio_key}
            image: {image}
            workingDir: /opt
          restartPolicy: OnFailure
"""
(Equivalent to writing the spec above to a file and running: kubectl apply -f &lt;file&gt;)
In [ ]:
# Client for creating and managing TFJob custom resources.
tf_job_client = tf_job_client_module.TFJobClient()
In [ ]:
# Parse the generated YAML spec and submit the TFJob to the current namespace.
tf_job_body = yaml.safe_load(train_spec)
tf_job = tf_job_client.create(tf_job_body, namespace=namespace)
logging.info(f"Created job {namespace}.{train_name}")
In [ ]:
from kubeflow.tfjob import TFJobClient
# NOTE(review): tf_job_client created earlier could likely be reused here;
# this constructs a second, equivalent client.
tfjob_client = TFJobClient()
# Block until the TFJob reaches a terminal state, streaming status updates.
tfjob_client.wait_for_job(train_name, namespace=namespace, watch=True)
In [ ]:
# Fetch the logs of the completed (or running) training job.
tfjob_client.get_logs(train_name, namespace=namespace)
In [ ]:
#TODO(swiftdiaries): Check object key for model specifically
from botocore.exceptions import ClientError
try:
    # Minimal check: an HTTP 200 from list_objects confirms the bucket exists
    # and is reachable; it does not verify the model objects themselves.
    model_response = minio_uploader.client.list_objects(Bucket=mnist_bucket)
    # Minimal check to see if at least the bucket is created
    if model_response["ResponseMetadata"]["HTTPStatusCode"] == 200:
        logging.info(f"{model_dir} found in {mnist_bucket} bucket")
except ClientError as err:
    logging.error(err)
In [ ]:
tb_name = "mnist-tensorboard"
# Tensorboard Deployment; reads training logs from MinIO via the S3_*/AWS_*
# environment variables (same credentials as training).
tb_deploy = f"""apiVersion: apps/v1
kind: Deployment
metadata:
  labels:
    app: mnist-tensorboard
  name: {tb_name}
  namespace: {namespace}
spec:
  selector:
    matchLabels:
      app: mnist-tensorboard
  template:
    metadata:
      labels:
        app: mnist-tensorboard
        version: v1
    spec:
      serviceAccount: default-editor
      containers:
      - command:
        - /usr/local/bin/tensorboard
        - --logdir={model_dir}
        - --port=80
        image: tensorflow/tensorflow:1.15.2-py3
        env:
        - name: S3_ENDPOINT
          value: {s3_endpoint}
        - name: AWS_ENDPOINT_URL
          value: {minio_endpoint}
        - name: AWS_REGION
          value: {minio_region}
        - name: BUCKET_NAME
          value: {mnist_bucket}
        - name: S3_USE_HTTPS
          value: "0"
        - name: S3_VERIFY_SSL
          value: "0"
        - name: AWS_ACCESS_KEY_ID
          value: {minio_username}
        - name: AWS_SECRET_ACCESS_KEY
          value: {minio_key}
        name: tensorboard
        ports:
        - containerPort: 80
"""
# ClusterIP Service in front of the tensorboard pod.
tb_service = f"""apiVersion: v1
kind: Service
metadata:
  labels:
    app: mnist-tensorboard
  name: {tb_name}
  namespace: {namespace}
spec:
  ports:
  - name: http-tb
    port: 80
    targetPort: 80
  selector:
    app: mnist-tensorboard
  type: ClusterIP
"""
# Istio VirtualService routing /mnist/<namespace>/tensorboard/ on the Kubeflow
# gateway to the tensorboard Service.
tb_virtual_service = f"""apiVersion: networking.istio.io/v1alpha3
kind: VirtualService
metadata:
  name: {tb_name}
  namespace: {namespace}
spec:
  gateways:
  - kubeflow/kubeflow-gateway
  hosts:
  - '*'
  http:
  - match:
    - uri:
        prefix: /mnist/{namespace}/tensorboard/
    rewrite:
      uri: /
    route:
    - destination:
        host: {tb_name}.{namespace}.svc.cluster.local
        port:
          number: 80
    timeout: 300s
"""
tb_specs = [tb_deploy, tb_service, tb_virtual_service]
In [ ]:
# Create (or replace) the tensorboard Deployment, Service and VirtualService.
k8s_util.apply_k8s_specs(tb_specs, k8s_util.K8S_CREATE_OR_REPLACE)
Run this with the appropriate RBAC permissions
In [ ]:
istio_ingress_endpoint = None
try:
    # Discover the istio ingress gateway to build an externally reachable
    # Tensorboard URL from its http2 NodePort.
    istio_ingress_endpoint = api_client.read_namespaced_service(
        name='istio-ingressgateway', namespace='istio-system')
    istio_ports = istio_ingress_endpoint.spec.ports
    for istio_port in istio_ports:
        if istio_port.name == "http2":
            logging.warning("get worker-node-ip by running 'kubectl get nodes -o wide'")
            # Fixed: the path now uses the actual namespace (matching the
            # VirtualService prefix) instead of the hard-coded 'anonymous'.
            logging.info(f"Tensorboard URL: <worker-node-ip>:{istio_port.node_port}/mnist/{namespace}/tensorboard/")
except ApiException as e:
    if e.status == 403:
        # Fixed copy-paste: this lookup targets the istio ingress gateway,
        # not the minio service the original message described.
        logging.warning("The service account doesn't have sufficient privileges "
                        "to get the istio-system istio-ingressgateway service. "
                        "You will have to look up the ingress NodePort manually. "
                        "To make this function work ask someone with cluster "
                        "privileges to create an appropriate "
                        "rolebinding by running a command.\n"
                        "kubectl create --namespace=istio-system rolebinding "
                        "--clusterrole=kubeflow-view "
                        "--serviceaccount=${NAMESPACE}:default-editor "
                        "${NAMESPACE}-ingressgateway-view")
    # logging.warn is deprecated; use logging.warning.
    logging.warning("API Access restricted. Please get URL by running the kubectl commands at the end of the notebook")
In [ ]:
deploy_name = "mnist-model"
model_base_path = export_path
# The web ui defaults to mnist-service so if you change it you will
# need to change it in the UI as well to send predictions to the model
model_service = "mnist-service"
# TF Serving Deployment; the S3_*/AWS_* env vars point tensorflow_model_server
# at the exported SavedModel in MinIO. The monitoring ConfigMap below is
# mounted at /var/config/.
deploy_spec = f"""apiVersion: apps/v1
kind: Deployment
metadata:
  labels:
    app: mnist
  name: {deploy_name}
  namespace: {namespace}
spec:
  selector:
    matchLabels:
      app: mnist-model
  template:
    metadata:
      # TODO(jlewi): Right now we disable the istio side car because otherwise ISTIO rbac will prevent the
      # UI from sending RPCs to the server. We should create an appropriate ISTIO rbac authorization
      # policy to allow traffic from the UI to the model server.
      # https://istio.io/docs/concepts/security/#target-selectors
      annotations:
        sidecar.istio.io/inject: "false"
      labels:
        app: mnist-model
        version: v1
    spec:
      serviceAccount: default-editor
      containers:
      - args:
        - --port=9000
        - --rest_api_port=8500
        - --model_name=mnist
        - --model_base_path={model_base_path}
        command:
        - /usr/bin/tensorflow_model_server
        env:
        - name: modelBasePath
          value: {model_base_path}
        - name: S3_ENDPOINT
          value: {s3_endpoint}
        - name: AWS_ENDPOINT_URL
          value: {minio_endpoint}
        - name: AWS_REGION
          value: {minio_region}
        - name: BUCKET_NAME
          value: {mnist_bucket}
        - name: S3_USE_HTTPS
          value: "0"
        - name: S3_VERIFY_SSL
          value: "0"
        - name: AWS_ACCESS_KEY_ID
          value: {minio_username}
        - name: AWS_SECRET_ACCESS_KEY
          value: {minio_key}
        image: tensorflow/serving:1.15.0
        imagePullPolicy: IfNotPresent
        livenessProbe:
          initialDelaySeconds: 30
          periodSeconds: 30
          tcpSocket:
            port: 9000
        name: mnist
        ports:
        - containerPort: 9000
        - containerPort: 8500
        resources:
          limits:
            cpu: "4"
            memory: 4Gi
          requests:
            cpu: "1"
            memory: 1Gi
        volumeMounts:
        - mountPath: /var/config/
          name: model-config
      volumes:
      - configMap:
          name: {deploy_name}
        name: model-config
"""
# Service exposing gRPC (9000) and REST (8500); annotated for Prometheus scraping.
service_spec = f"""apiVersion: v1
kind: Service
metadata:
  annotations:
    prometheus.io/path: /monitoring/prometheus/metrics
    prometheus.io/port: "8500"
    prometheus.io/scrape: "true"
  labels:
    app: mnist-model
  name: {model_service}
  namespace: {namespace}
spec:
  ports:
  - name: grpc-tf-serving
    port: 9000
    targetPort: 9000
  - name: http-tf-serving
    port: 8500
    targetPort: 8500
  selector:
    app: mnist-model
  type: ClusterIP
"""
# ConfigMap carrying the TF Serving prometheus monitoring config.
# Double braces escape literal { } inside this f-string.
monitoring_config = f"""kind: ConfigMap
apiVersion: v1
metadata:
  name: {deploy_name}
  namespace: {namespace}
data:
  monitoring_config.txt: |-
    prometheus_config: {{
      enable: true,
      path: "/monitoring/prometheus/metrics"
    }}
"""
model_specs = [deploy_spec, service_spec, monitoring_config]
In [ ]:
# Create (or replace) the model Deployment, Service and monitoring ConfigMap.
k8s_util.apply_k8s_specs(model_specs, k8s_util.K8S_CREATE_OR_REPLACE)
In [ ]:
ui_name = "mnist-ui"
# Web UI Deployment serving the mnist front-end on container port 5000.
ui_deploy = f"""apiVersion: apps/v1
kind: Deployment
metadata:
  name: {ui_name}
  namespace: {namespace}
spec:
  replicas: 1
  selector:
    matchLabels:
      app: mnist-web-ui
  template:
    metadata:
      labels:
        app: mnist-web-ui
    spec:
      containers:
      - image: gcr.io/kubeflow-examples/mnist/web-ui:v20190112-v0.2-142-g3b38225
        name: web-ui
        ports:
        - containerPort: 5000
      serviceAccount: default-editor
"""
# ClusterIP Service mapping port 80 to the UI container's port 5000.
ui_service = f"""apiVersion: v1
kind: Service
metadata:
  annotations:
  name: {ui_name}
  namespace: {namespace}
spec:
  ports:
  - name: http-mnist-ui
    port: 80
    targetPort: 5000
  selector:
    app: mnist-web-ui
  type: ClusterIP
"""
# Istio VirtualService routing /mnist/<namespace>/ui/ on the Kubeflow gateway
# to the UI Service.
ui_virtual_service = f"""apiVersion: networking.istio.io/v1alpha3
kind: VirtualService
metadata:
  name: {ui_name}
  namespace: {namespace}
spec:
  gateways:
  - kubeflow/kubeflow-gateway
  hosts:
  - '*'
  http:
  - match:
    - uri:
        prefix: /mnist/{namespace}/ui/
    rewrite:
      uri: /
    route:
    - destination:
        host: {ui_name}.{namespace}.svc.cluster.local
        port:
          number: 80
    timeout: 300s
"""
ui_specs = [ui_deploy, ui_service, ui_virtual_service]
In [ ]:
# Create (or replace) the UI Deployment, Service and VirtualService.
k8s_util.apply_k8s_specs(ui_specs, k8s_util.K8S_CREATE_OR_REPLACE)
In [ ]:
istio_ingress_endpoint = None
try:
    # Discover the istio ingress gateway to build an externally reachable
    # web-UI URL from its http2 NodePort.
    istio_ingress_endpoint = api_client.read_namespaced_service(
        name='istio-ingressgateway', namespace='istio-system')
    istio_ports = istio_ingress_endpoint.spec.ports
    for istio_port in istio_ports:
        if istio_port.name == "http2":
            logging.warning("get worker-node-ip by running 'kubectl get nodes -o wide'")
            # Fixed: this is the web UI URL (the original mislabeled it as the
            # Tensorboard URL) and the path now uses the actual namespace
            # instead of the hard-coded 'anonymous'.
            logging.info(f"mnist web UI URL: <worker-node-ip>:{istio_port.node_port}/mnist/{namespace}/ui/")
except ApiException as e:
    if e.status == 403:
        # Fixed copy-paste: this lookup targets the istio ingress gateway,
        # not the kubeflow minio service the original message described.
        logging.warning("The service account doesn't have sufficient privileges "
                        "to get the istio-system istio-ingressgateway service. "
                        "You will have to look up the ingress NodePort manually. "
                        "To make this function work ask someone with cluster "
                        "privileges to create an appropriate "
                        "rolebinding by running a command.\n"
                        "kubectl create --namespace=istio-system rolebinding "
                        "--clusterrole=kubeflow-view "
                        "--serviceaccount=${NAMESPACE}:default-editor "
                        "${NAMESPACE}-ingressgateway-view")
    # logging.warn is deprecated; use logging.warning.
    logging.warning("API Access restricted. Please get URL by running the kubectl commands at the end of the notebook")
Run this with the appropriate RBAC permissions
Note: You can get the node worker ip from kubectl get no -o wide
export INGRESS_HOST=<worker-node-ip>
export INGRESS_PORT=$(kubectl -n istio-system get service istio-ingressgateway -o jsonpath='{.spec.ports[?(@.name=="http2")].nodePort}')
printf "Tensorboard URL: \n${INGRESS_HOST}:${INGRESS_PORT}/mnist/anonymous/tensorboard/\n"
Run this with the appropriate RBAC permissions
Note: You can get the node worker ip from kubectl get no -o wide
!export INGRESS_HOST=<worker-node-ip>
!export INGRESS_PORT=$(kubectl -n istio-system get service istio-ingressgateway -o jsonpath='{.spec.ports[?(@.name=="http2")].nodePort}')
!printf "mnist-web-app URL: \n${INGRESS_HOST}:${INGRESS_PORT}/mnist/anonymous/ui/\n"
In [ ]: