This example guides you through:
There is a delta between existing distributed MNIST examples and what's needed to run well as a TFJob.
Basically, you must:
Use `tf.estimator.train_and_evaluate` to enable model exporting and serving.
This tutorial provides a Python program that's already prepared for you: `model.py`.
In [1]:
import logging
import os
import uuid
from importlib import reload
from oauth2client.client import GoogleCredentials
credentials = GoogleCredentials.get_application_default()
In [2]:
# Import the local helper module, then reload() it so that the module body is
# re-executed even if it was already imported earlier in this kernel session.
import notebook_setup
reload(notebook_setup)
# Run the helper's setup routine (see the module for what it configures).
notebook_setup.notebook_setup()
Wait for the message `Configure docker credentials` before moving on to the next cell.
In [3]:
import k8s_util
# Force a reload of Kubeflow. Since Kubeflow is a multi namespace module,
# doing the reload in notebook_setup may not be sufficient.
import kubeflow
reload(kubeflow)
from kubernetes import client as k8s_client
from kubernetes import config as k8s_config
from kubeflow.tfjob.api import tf_job_client as tf_job_client_module
from IPython.core.display import display, HTML
import yaml
In [4]:
from kubernetes import client as k8s_client
from kubernetes.client import rest as k8s_rest
from kubeflow import fairing
from kubeflow.fairing import utils as fairing_utils
from kubeflow.fairing.builders import append
from kubeflow.fairing.deployers import job
from kubeflow.fairing.preprocessors import base as base_preprocessor
# Setting up Google Container Registry (GCR) for storing output containers.
# You can use any Docker container registry instead of GCR.
# The registry path below is derived from the project name that fairing guesses.
GCP_PROJECT = fairing.cloud.gcp.guess_project_name()
DOCKER_REGISTRY = 'gcr.io/{}/fairing-job'.format(GCP_PROJECT)
# Namespace reported by fairing's helper; later cells pass it to the Kubernetes
# specs and to the TFJob client calls.
namespace = fairing_utils.get_current_k8s_namespace()
# Log the resolved settings so they are visible in the notebook output.
logging.info(f"Running in project {GCP_PROJECT}")
logging.info(f"Running in namespace {namespace}")
logging.info(f"Using Docker registry {DOCKER_REGISTRY}")
This notebook uses Kubeflow Fairing's kaniko builder to build a Docker image that includes all your dependencies.
The image build uses `pip` to install dependencies.
In [5]:
# TODO(https://github.com/kubeflow/fairing/issues/426): We should get rid of this once the default
# Kaniko image is updated to a newer image than 0.7.0.
from kubeflow.fairing import constants
# Override fairing's Kaniko image constant, pinning the executor to v0.14.0.
# NOTE(review): presumably read by the cluster builder used below - confirm.
constants.constants.KANIKO_IMAGE = "gcr.io/kaniko-project/executor:v0.14.0"
In [6]:
from kubeflow.fairing.builders import cluster

# Extra files to place in the build context, keyed by source location;
# each value is the file's location inside the context.
output_map = {
    "Dockerfile.model": "Dockerfile",
    "model.py": "model.py",
}

preprocessor = base_preprocessor.BasePreProcessor(
    command=["python"],  # The base class will set this.
    input_files=[],
    path_prefix="/app",  # irrelevant since we aren't preprocessing any files
    output_map=output_map,
)
preprocessor.preprocess()
Out[6]:
Run the next cell and wait until you see a message like `Built image gcr.io/<your-project>/fairing-job/mnist:<1234567>`.
In [7]:
# Build the training image with fairing's ClusterBuilder from the custom
# Dockerfile. The base image (a TensorFlow image) is declared inside that
# Dockerfile, so base_image is deliberately left empty here.
cluster_builder = cluster.cluster.ClusterBuilder(
    registry=DOCKER_REGISTRY,
    base_image="",
    preprocessor=preprocessor,
    image_name="mnist",
    dockerfile_path="Dockerfile",
    pod_spec_mutators=[fairing.cloud.gcp.add_gcp_credentials_if_exists],
    context_source=cluster.gcs_context.GCSContextSource(),
)
cluster_builder.build()
logging.info(f"Built image {cluster_builder.image_tag}")
In [8]:
from google.cloud import storage

# Bucket that holds the model checkpoints and exports; named after the project.
bucket = f"{GCP_PROJECT}-mnist"

client = storage.Client()
b = storage.Bucket(client=client, name=bucket)

# Only create the bucket when it is not there yet.
if b.exists():
    logging.info(f"Bucket {bucket} already exists")
else:
    logging.info(f"Creating bucket {bucket}")
    b.create()
To train the model, this example uses TFJob to run a distributed training job. Run the next cell to set up the YAML specification for the job:
In [9]:
# Unique job name: a short random suffix avoids clashing with earlier runs.
train_name = f"mnist-train-{uuid.uuid4().hex[:4]}"

# Replica counts for the parameter-server and worker pools.
num_ps = 1
num_workers = 2

# GCS locations for checkpoints/summaries and for the exported model.
model_dir = f"gs://{bucket}/mnist"
export_path = f"gs://{bucket}/mnist/export"

# Training hyperparameters, passed to model.py as command-line flags.
train_steps = 200
batch_size = 100
learning_rate = .01

# Image produced by the build cell above.
image = cluster_builder.image_tag

# TFJob manifest. The Ps, Chief and Worker replicas all run the same command;
# only the replica counts differ. The Worker count now comes from num_workers
# (it was hard-coded to 1, which left num_workers unused), and the YAML
# nesting is written out explicitly so the text parses as a TFJob spec.
train_spec = f"""apiVersion: kubeflow.org/v1
kind: TFJob
metadata:
  name: {train_name}
spec:
  tfReplicaSpecs:
    Ps:
      replicas: {num_ps}
      template:
        metadata:
          annotations:
            sidecar.istio.io/inject: "false"
        spec:
          serviceAccount: default-editor
          containers:
          - name: tensorflow
            command:
            - python
            - /opt/model.py
            - --tf-model-dir={model_dir}
            - --tf-export-dir={export_path}
            - --tf-train-steps={train_steps}
            - --tf-batch-size={batch_size}
            - --tf-learning-rate={learning_rate}
            image: {image}
            workingDir: /opt
          restartPolicy: OnFailure
    Chief:
      replicas: 1
      template:
        metadata:
          annotations:
            sidecar.istio.io/inject: "false"
        spec:
          serviceAccount: default-editor
          containers:
          - name: tensorflow
            command:
            - python
            - /opt/model.py
            - --tf-model-dir={model_dir}
            - --tf-export-dir={export_path}
            - --tf-train-steps={train_steps}
            - --tf-batch-size={batch_size}
            - --tf-learning-rate={learning_rate}
            image: {image}
            workingDir: /opt
          restartPolicy: OnFailure
    Worker:
      replicas: {num_workers}
      template:
        metadata:
          annotations:
            sidecar.istio.io/inject: "false"
        spec:
          serviceAccount: default-editor
          containers:
          - name: tensorflow
            command:
            - python
            - /opt/model.py
            - --tf-model-dir={model_dir}
            - --tf-export-dir={export_path}
            - --tf-train-steps={train_steps}
            - --tf-batch-size={batch_size}
            - --tf-learning-rate={learning_rate}
            image: {image}
            workingDir: /opt
          restartPolicy: OnFailure
"""
To submit the training job, you could write the spec to a YAML file and then do `kubectl apply -f {FILE}`.
However, because you are running in a Jupyter notebook, you use the TFJob client.
In [10]:
# Client object used by the following cells to create the TFJob and wait on it.
tf_job_client = tf_job_client_module.TFJobClient()
In [11]:
# Parse the YAML manifest text into Python objects and submit it through the
# TFJob client in the notebook's namespace.
tf_job_body = yaml.safe_load(train_spec)
tf_job = tf_job_client.create(tf_job_body, namespace=namespace)
logging.info(f"Created job {namespace}.{train_name}")
In [12]:
!kubectl get tfjobs -o yaml {train_name}
There are two ways to get the logs for the training job:
Using Stackdriver.
Run the cell below to get a link to Stackdriver for your logs:
In [13]:
from urllib.parse import urlencode

# Emit one Stackdriver logs link per TFJob replica type. The filter text is
# URL-encoded into the "advancedFilter" query parameter of the logs viewer.
for replica in ("chief", "worker", "ps"):
    logs_filter = f"""resource.type="k8s_container"
labels."k8s-pod/tf-job-name" = "{train_name}"
labels."k8s-pod/tf-replica-type" = "{replica}"
resource.labels.container_name="tensorflow" """

    new_params = {
        'project': GCP_PROJECT,
        # Logs for last 7 days
        'interval': 'P7D',
        'advancedFilter': logs_filter,
    }
    query = urlencode(new_params)
    url = "https://console.cloud.google.com/logs/viewer?" + query

    link_html = f"Link to: <a href='{url}'>{replica} logs</a>"
    display(HTML(link_html))
In [14]:
# Name shared by the TensorBoard Deployment, Service and VirtualService.
tb_name = "mnist-tensorboard"

# Deployment running TensorBoard against the training output directory.
# The YAML nesting is written out explicitly so each manifest parses into the
# nested structure that Kubernetes (and the yaml.safe_load lookup of
# spec.http[0].match[0].uri.prefix in a later cell) expects.
tb_deploy = f"""apiVersion: apps/v1
kind: Deployment
metadata:
  labels:
    app: mnist-tensorboard
  name: {tb_name}
  namespace: {namespace}
spec:
  selector:
    matchLabels:
      app: mnist-tensorboard
  template:
    metadata:
      labels:
        app: mnist-tensorboard
        version: v1
    spec:
      serviceAccount: default-editor
      containers:
      - command:
        - /usr/local/bin/tensorboard
        - --logdir={model_dir}
        - --port=80
        image: tensorflow/tensorflow:1.15.2-py3
        name: tensorboard
        ports:
        - containerPort: 80
"""

# ClusterIP Service exposing the TensorBoard pod on port 80.
tb_service = f"""apiVersion: v1
kind: Service
metadata:
  labels:
    app: mnist-tensorboard
  name: {tb_name}
  namespace: {namespace}
spec:
  ports:
  - name: http-tb
    port: 80
    targetPort: 80
  selector:
    app: mnist-tensorboard
  type: ClusterIP
"""

# Istio VirtualService routing /mnist/<namespace>/tensorboard/ on the Kubeflow
# gateway to the Service above, rewriting the prefix to "/".
tb_virtual_service = f"""apiVersion: networking.istio.io/v1alpha3
kind: VirtualService
metadata:
  name: {tb_name}
  namespace: {namespace}
spec:
  gateways:
  - kubeflow/kubeflow-gateway
  hosts:
  - '*'
  http:
  - match:
    - uri:
        prefix: /mnist/{namespace}/tensorboard/
    rewrite:
      uri: /
    route:
    - destination:
        host: {tb_name}.{namespace}.svc.cluster.local
        port:
          number: 80
    timeout: 300s
"""

tb_specs = [tb_deploy, tb_service, tb_virtual_service]
In [15]:
# Submit the TensorBoard manifests through the helper, using its
# K8S_CREATE_OR_REPLACE mode.
k8s_util.apply_k8s_specs(tb_specs, k8s_util.K8S_CREATE_OR_REPLACE)
Out[15]:
Set `endpoint` to `https://your-domain` (with no slash at the end). Your domain typically has the following pattern: `<your-kubeflow-deployment-name>.endpoints.<your-gcp-project>.cloud.goog`. You can see your domain in the URL that you're using to access this notebook.
In [36]:
# Base URL of the Kubeflow deployment, e.g. "https://your-domain" with no
# trailing slash. While this is None (or empty), the cells that print the
# web-app links are skipped.
endpoint = None

if endpoint:
    logging.info(f"endpoint set to {endpoint}")
else:
    # The message names the variable to edit. The original text contained
    # "{endpoint}" in a plain (non-f) string, so the braces were printed
    # literally. It is also logged at warning level, matching its content.
    logging.warning("You must set `endpoint` in order to print out the URLs where you can access your web apps.")
In [37]:
# Print a clickable TensorBoard link once `endpoint` has been set.
if endpoint:
    # Re-read the route prefix from the VirtualService manifest so the link
    # always matches what was deployed.
    vs = yaml.safe_load(tb_virtual_service)
    path = vs["spec"]["http"][0]["match"][0]["uri"]["prefix"]
    tb_endpoint = endpoint + path
    display(
        HTML(f"TensorBoard UI is at <a href='{tb_endpoint}'>{tb_endpoint}</a>")
    )
You can use the TFJob client to wait for the job to finish:
In [18]:
# Block until the TFJob reports one of the listed conditions.
tf_job = tf_job_client.wait_for_condition(
    train_name,
    expected_condition=["Succeeded", "Failed"],
    namespace=namespace,
)

# Stop the notebook run here if training did not succeed.
if not tf_job_client.is_job_succeeded(train_name, namespace):
    raise ValueError(f"TFJob {namespace}.{train_name} failed")
logging.info(f"TFJob {namespace}.{train_name} succeeded")
Now you can deploy the model using TensorFlow Serving.
You need to create the following:
In [19]:
# Name of the TF Serving Deployment; also used for the ConfigMap it mounts.
deploy_name = "mnist-model"
# Serve the model from the directory the training job exported to.
model_base_path = export_path

# The web UI defaults to mnist-service so if you change the name, you must
# change it in the UI as well.
model_service = "mnist-service"

# Deployment running tensorflow_model_server (gRPC on 9000, REST on 8500) with
# the monitoring config mounted from the ConfigMap defined below.
# The YAML nesting is written out explicitly so the manifests parse into the
# nested structure Kubernetes expects.
deploy_spec = f"""apiVersion: apps/v1
kind: Deployment
metadata:
  labels:
    app: mnist
  name: {deploy_name}
  namespace: {namespace}
spec:
  selector:
    matchLabels:
      app: mnist-model
  template:
    metadata:
      # TODO(jlewi): Right now we disable the istio side car because otherwise ISTIO rbac will prevent the
      # UI from sending RPCs to the server. We should create an appropriate ISTIO rbac authorization
      # policy to allow traffic from the UI to the model servier.
      # https://istio.io/docs/concepts/security/#target-selectors
      annotations:
        sidecar.istio.io/inject: "false"
      labels:
        app: mnist-model
        version: v1
    spec:
      serviceAccount: default-editor
      containers:
      - args:
        - --port=9000
        - --rest_api_port=8500
        - --model_name=mnist
        - --model_base_path={model_base_path}
        - --monitoring_config_file=/var/config/monitoring_config.txt
        command:
        - /usr/bin/tensorflow_model_server
        env:
        - name: modelBasePath
          value: {model_base_path}
        image: tensorflow/serving:1.15.0
        imagePullPolicy: IfNotPresent
        livenessProbe:
          initialDelaySeconds: 30
          periodSeconds: 30
          tcpSocket:
            port: 9000
        name: mnist
        ports:
        - containerPort: 9000
        - containerPort: 8500
        resources:
          limits:
            cpu: "4"
            memory: 4Gi
          requests:
            cpu: "1"
            memory: 1Gi
        volumeMounts:
        - mountPath: /var/config/
          name: model-config
      volumes:
      - configMap:
          name: {deploy_name}
        name: model-config
"""

# ClusterIP Service in front of the model server, annotated for Prometheus
# scraping on the REST port.
service_spec = f"""apiVersion: v1
kind: Service
metadata:
  annotations:
    prometheus.io/path: /monitoring/prometheus/metrics
    prometheus.io/port: "8500"
    prometheus.io/scrape: "true"
  labels:
    app: mnist-model
  name: {model_service}
  namespace: {namespace}
spec:
  ports:
  - name: grpc-tf-serving
    port: 9000
    targetPort: 9000
  - name: http-tf-serving
    port: 8500
    targetPort: 8500
  selector:
    app: mnist-model
  type: ClusterIP
"""

# ConfigMap holding the monitoring config file passed to the model server.
# The doubled braces are f-string escapes for literal "{" and "}".
monitoring_config = f"""kind: ConfigMap
apiVersion: v1
metadata:
  name: {deploy_name}
  namespace: {namespace}
data:
  monitoring_config.txt: |-
    prometheus_config: {{
      enable: true,
      path: "/monitoring/prometheus/metrics"
    }}
"""

model_specs = [deploy_spec, service_spec, monitoring_config]
In [20]:
# Submit the model-serving manifests through the helper, using its
# K8S_CREATE_OR_REPLACE mode.
k8s_util.apply_k8s_specs(model_specs, k8s_util.K8S_CREATE_OR_REPLACE)
Out[20]:
In [21]:
# Name shared by the web UI Deployment, Service and VirtualService.
ui_name = "mnist-ui"

# Deployment running the prebuilt MNIST web UI image (container port 5000).
# The YAML nesting is written out explicitly so each manifest parses into the
# nested structure that Kubernetes (and the yaml.safe_load lookup of
# spec.http[0].match[0].uri.prefix in a later cell) expects.
ui_deploy = f"""apiVersion: apps/v1
kind: Deployment
metadata:
  name: {ui_name}
  namespace: {namespace}
spec:
  replicas: 1
  selector:
    matchLabels:
      app: mnist-web-ui
  template:
    metadata:
      labels:
        app: mnist-web-ui
    spec:
      containers:
      - image: gcr.io/kubeflow-examples/mnist/web-ui:v20190112-v0.2-142-g3b38225
        name: web-ui
        ports:
        - containerPort: 5000
      serviceAccount: default-editor
"""

# ClusterIP Service mapping port 80 to the UI container's port 5000.
ui_service = f"""apiVersion: v1
kind: Service
metadata:
  annotations:
  name: {ui_name}
  namespace: {namespace}
spec:
  ports:
  - name: http-mnist-ui
    port: 80
    targetPort: 5000
  selector:
    app: mnist-web-ui
  type: ClusterIP
"""

# Istio VirtualService routing /mnist/<namespace>/ui/ on the Kubeflow gateway
# to the Service above, rewriting the prefix to "/".
ui_virtual_service = f"""apiVersion: networking.istio.io/v1alpha3
kind: VirtualService
metadata:
  name: {ui_name}
  namespace: {namespace}
spec:
  gateways:
  - kubeflow/kubeflow-gateway
  hosts:
  - '*'
  http:
  - match:
    - uri:
        prefix: /mnist/{namespace}/ui/
    rewrite:
      uri: /
    route:
    - destination:
        host: {ui_name}.{namespace}.svc.cluster.local
        port:
          number: 80
    timeout: 300s
"""

ui_specs = [ui_deploy, ui_service, ui_virtual_service]
In [22]:
# Submit the web UI manifests through the helper, using its
# K8S_CREATE_OR_REPLACE mode.
k8s_util.apply_k8s_specs(ui_specs, k8s_util.K8S_CREATE_OR_REPLACE)
Out[22]:
In [38]:
# Print a clickable link to the MNIST web UI once `endpoint` has been set.
if endpoint:
    # Re-read the route prefix from the VirtualService manifest so the link
    # always matches what was deployed.
    vs = yaml.safe_load(ui_virtual_service)
    path = vs["spec"]["http"][0]["match"][0]["uri"]["prefix"]
    ui_endpoint = endpoint + path
    display(
        HTML(f"mnist UI is at <a href='{ui_endpoint}'>{ui_endpoint}</a>")
    )
Open the MNIST UI in your browser. You should see an image of a hand-written digit from 0 to 9. This is a random image sent to the model for classification. Below the image is a set of bar graphs, one for each classification label from 0 to 9, as output by the model. Each bar represents the probability that the image matches the respective label.
Click the test random image button to send the model a new image.
Visit the Kubeflow docs for more information about running Kubeflow on GCP.
In [ ]: