In [ ]:
!pip install boto3
Note: Once IAM Roles for Service Accounts support is merged in 1.0.1, we won't have to use credentials.
Note: To get the base64 string of a credential, try echo -n $AWS_ACCESS_KEY_ID | base64. Make sure you have AmazonEC2ContainerRegistryFullAccess and AmazonS3FullAccess for this experiment. Pods will use these credentials to talk to AWS services.
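If you create the secret from a YAML manifest instead of kubectl create secret (next cell), the values must be base64-encoded. A minimal Python equivalent of the shell command above, assuming the credential is set in this notebook's environment:
In [ ]:
import base64
import os
# Base64-encode a credential for use in a Secret manifest; mirrors
# `echo -n $AWS_ACCESS_KEY_ID | base64`. Assumes the env var is set here.
print(base64.b64encode(os.environ["AWS_ACCESS_KEY_ID"].encode()).decode())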
In [ ]:
%%bash
# Replace placeholder with your own AWS credentials
AWS_ACCESS_KEY_ID='<your_aws_access_key_id>'
AWS_SECRET_ACCESS_KEY='<your_aws_secret_access_key>'
kubectl create secret generic aws-secret --from-literal=AWS_ACCESS_KEY_ID=${AWS_ACCESS_KEY_ID} --from-literal=AWS_SECRET_ACCESS_KEY=${AWS_SECRET_ACCESS_KEY}
Alternatively, you can attach AmazonEC2ContainerRegistryFullAccess and AmazonS3FullAccess to the EKS node group role to grant AWS access to the notebook.
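A sketch of attaching those managed policies with boto3; "<your-node-instance-role>" is a placeholder for your node group's instance role name:
In [ ]:
import boto3
# Sketch: attach the two required managed policies to the EKS node group
# role. "<your-node-instance-role>" is a placeholder, not a real name.
iam = boto3.client('iam')
for policy in ["AmazonEC2ContainerRegistryFullAccess", "AmazonS3FullAccess"]:
    iam.attach_role_policy(
        RoleName="<your-node-instance-role>",
        PolicyArn=f"arn:aws:iam::aws:policy/{policy}")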
In [ ]:
import logging
import os
import uuid
from importlib import reload
import boto3
# Set the region for the S3 bucket and Elastic Container Registry,
# then sanity-check that the credentials can reach S3 and ECR.
AWS_REGION='us-west-2'
boto3.client('s3', region_name=AWS_REGION).list_buckets()
boto3.client('ecr', region_name=AWS_REGION).describe_repositories()
There is a delta between the existing distributed mnist examples and what's needed to run well as a TFJob.
Basically, we must use tf.estimator.train_and_evaluate to enable model exporting and serving. The resulting model is model.py.
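A minimal sketch of that pattern (TF 1.x Estimator API); model_fn, train_input_fn, eval_input_fn, and serving_input_receiver_fn are placeholders standing in for the definitions in model.py:
In [ ]:
import tensorflow as tf
# Sketch of tf.estimator.train_and_evaluate. The model_fn and input
# functions below are placeholders for the implementations in model.py.
estimator = tf.estimator.Estimator(model_fn=model_fn, model_dir="s3://<bucket>/mnist")
train_spec = tf.estimator.TrainSpec(input_fn=train_input_fn, max_steps=200)
# The exporter writes a SavedModel that TF Serving can load later.
exporter = tf.estimator.FinalExporter("mnist", serving_input_receiver_fn)
eval_spec = tf.estimator.EvalSpec(input_fn=eval_input_fn, exporters=[exporter])
tf.estimator.train_and_evaluate(estimator, train_spec, eval_spec)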
In [ ]:
import notebook_setup
reload(notebook_setup)
notebook_setup.notebook_setup(platform='aws')
In [ ]:
import k8s_util
# Force a reload of kubeflow; since kubeflow is a multi-namespace module,
# it looks like doing this in notebook_setup may not be sufficient.
import kubeflow
reload(kubeflow)
from kubernetes import client as k8s_client
from kubernetes import config as k8s_config
from kubeflow.tfjob.api import tf_job_client as tf_job_client_module
from IPython.core.display import display, HTML
import yaml
In [ ]:
from kubernetes import client as k8s_client
from kubernetes.client import rest as k8s_rest
from kubeflow import fairing
from kubeflow.fairing import utils as fairing_utils
from kubeflow.fairing.builders import append
from kubeflow.fairing.deployers import job
from kubeflow.fairing.preprocessors import base as base_preprocessor
# Set up AWS Elastic Container Registry (ECR) for storing output containers.
# You can use any Docker container registry instead of ECR.
# fairing.cloud.aws.guess_account_id() is an alternative way to look up the
# account id; the STS call below works in any environment.
AWS_ACCOUNT_ID = boto3.client('sts').get_caller_identity().get('Account')
DOCKER_REGISTRY = '{}.dkr.ecr.{}.amazonaws.com'.format(AWS_ACCOUNT_ID, AWS_REGION)
namespace = fairing_utils.get_current_k8s_namespace()
logging.info(f"Running in aws region {AWS_REGION}, account {AWS_ACCOUNT_ID}")
logging.info(f"Running in namespace {namespace}")
logging.info(f"Using docker registry {DOCKER_REGISTRY}")
In [ ]:
# TODO(https://github.com/kubeflow/fairing/issues/426): We should get rid of this once the default
# Kaniko image is updated to a newer image than 0.7.0.
from kubeflow.fairing import constants
constants.constants.KANIKO_IMAGE = "gcr.io/kaniko-project/executor:v0.14.0"
In [ ]:
from kubeflow.fairing.builders import cluster
# output_map is a map of extra files to add to the build context.
# It is a map from source location to the location inside the context.
output_map = {
"Dockerfile.model": "Dockerfile",
"model.py": "model.py"
}
preprocessor = base_preprocessor.BasePreProcessor(
command=["python"], # The base class will set this.
input_files=[],
path_prefix="/app", # irrelevant since we aren't preprocessing any files
output_map=output_map)
preprocessor.preprocess()
In [ ]:
# Create a new ECR repository to host the model image.
# This errors if the repository already exists; that's safe to ignore.
!aws ecr create-repository --repository-name mnist --region=$AWS_REGION
In [ ]:
# Build the model image with Kaniko.
# The TensorFlow base image is set in our custom Dockerfile,
# so base_image below is left empty.
cluster_builder = cluster.cluster.ClusterBuilder(registry=DOCKER_REGISTRY,
base_image="", # base_image is set in the Dockerfile
preprocessor=preprocessor,
image_name="mnist",
dockerfile_path="Dockerfile",
pod_spec_mutators=[fairing.cloud.aws.add_aws_credentials_if_exists, fairing.cloud.aws.add_ecr_config],
context_source=cluster.s3_context.S3ContextSource(region=AWS_REGION))
cluster_builder.build()
logging.info(f"Built image {cluster_builder.image_tag}")
In [ ]:
import boto3
from botocore.exceptions import ClientError
bucket = f"{AWS_ACCOUNT_ID}-mnist"
def create_bucket(bucket_name, region=None):
"""Create an S3 bucket in a specified region
If a region is not specified, the bucket is created in the S3 default
region (us-east-1).
:param bucket_name: Bucket to create
:param region: String region to create bucket in, e.g., 'us-west-2'
:return: True if bucket created, else False
"""
# Create bucket
try:
if region is None:
s3_client = boto3.client('s3')
s3_client.create_bucket(Bucket=bucket_name)
else:
s3_client = boto3.client('s3', region_name=region)
location = {'LocationConstraint': region}
s3_client.create_bucket(Bucket=bucket_name,
CreateBucketConfiguration=location)
except ClientError as e:
logging.error(e)
return False
return True
create_bucket(bucket, AWS_REGION)
In [ ]:
train_name = f"mnist-train-{uuid.uuid4().hex[:4]}"
num_ps = 1
num_workers = 2
model_dir = f"s3://{bucket}/mnist"
export_path = f"s3://{bucket}/mnist/export"
train_steps = 200
batch_size = 100
learning_rate = .01
image = cluster_builder.image_tag
train_spec = f"""apiVersion: kubeflow.org/v1
kind: TFJob
metadata:
name: {train_name}
spec:
tfReplicaSpecs:
Ps:
replicas: {num_ps}
template:
metadata:
annotations:
sidecar.istio.io/inject: "false"
spec:
serviceAccount: default-editor
containers:
- name: tensorflow
command:
- python
- /opt/model.py
- --tf-model-dir={model_dir}
- --tf-export-dir={export_path}
- --tf-train-steps={train_steps}
- --tf-batch-size={batch_size}
- --tf-learning-rate={learning_rate}
image: {image}
workingDir: /opt
env:
- name: AWS_REGION
value: {AWS_REGION}
- name: AWS_ACCESS_KEY_ID
valueFrom:
secretKeyRef:
name: aws-secret
key: AWS_ACCESS_KEY_ID
- name: AWS_SECRET_ACCESS_KEY
valueFrom:
secretKeyRef:
name: aws-secret
key: AWS_SECRET_ACCESS_KEY
restartPolicy: OnFailure
Chief:
replicas: 1
template:
metadata:
annotations:
sidecar.istio.io/inject: "false"
spec:
serviceAccount: default-editor
containers:
- name: tensorflow
command:
- python
- /opt/model.py
- --tf-model-dir={model_dir}
- --tf-export-dir={export_path}
- --tf-train-steps={train_steps}
- --tf-batch-size={batch_size}
- --tf-learning-rate={learning_rate}
image: {image}
workingDir: /opt
env:
- name: AWS_REGION
value: {AWS_REGION}
- name: AWS_ACCESS_KEY_ID
valueFrom:
secretKeyRef:
name: aws-secret
key: AWS_ACCESS_KEY_ID
- name: AWS_SECRET_ACCESS_KEY
valueFrom:
secretKeyRef:
name: aws-secret
key: AWS_SECRET_ACCESS_KEY
restartPolicy: OnFailure
Worker:
replicas: 1
template:
metadata:
annotations:
sidecar.istio.io/inject: "false"
spec:
serviceAccount: default-editor
containers:
- name: tensorflow
command:
- python
- /opt/model.py
- --tf-model-dir={model_dir}
- --tf-export-dir={export_path}
- --tf-train-steps={train_steps}
- --tf-batch-size={batch_size}
- --tf-learning-rate={learning_rate}
image: {image}
workingDir: /opt
env:
- name: AWS_REGION
value: {AWS_REGION}
- name: AWS_ACCESS_KEY_ID
valueFrom:
secretKeyRef:
name: aws-secret
key: AWS_ACCESS_KEY_ID
- name: AWS_SECRET_ACCESS_KEY
valueFrom:
secretKeyRef:
name: aws-secret
key: AWS_SECRET_ACCESS_KEY
restartPolicy: OnFailure
"""
To submit the training job, you could write the spec to a YAML file and run kubectl apply -f {FILE}. Here we use the TFJob client instead.
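If you prefer the kubectl route, here is a minimal sketch (the filename is arbitrary):
In [ ]:
# Sketch: write the rendered spec to a file and apply it with kubectl.
with open("train_spec.yaml", "w") as f:
    f.write(train_spec)
!kubectl apply -f train_spec.yaml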
In [ ]:
tf_job_client = tf_job_client_module.TFJobClient()
In [ ]:
tf_job_body = yaml.safe_load(train_spec)
tf_job = tf_job_client.create(tf_job_body, namespace=namespace)
logging.info(f"Created job {namespace}.{train_name}")
In [ ]:
!kubectl get tfjobs -o yaml {train_name}
There are two ways to get the logs for the training job: fetch the pod logs with kubectl, or use the TFJob client.
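Both are sketched below; the pod name follows the standard TFJob convention {job-name}-{replica-type}-{index}, and get_logs is assumed to be available in the TFJob SDK:
In [ ]:
# Option 1: kubectl against the chief pod (standard TFJob pod naming).
!kubectl logs -n {namespace} {train_name}-chief-0
In [ ]:
# Option 2: the TFJob client's get_logs method.
tf_job_client.get_logs(train_name, namespace=namespace)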
In [ ]:
tb_name = "mnist-tensorboard"
tb_deploy = f"""apiVersion: apps/v1
kind: Deployment
metadata:
labels:
app: mnist-tensorboard
name: {tb_name}
namespace: {namespace}
spec:
selector:
matchLabels:
app: mnist-tensorboard
template:
metadata:
labels:
app: mnist-tensorboard
version: v1
spec:
serviceAccount: default-editor
containers:
- command:
- /usr/local/bin/tensorboard
- --logdir={model_dir}
- --port=80
image: tensorflow/tensorflow:1.15.2-py3
name: tensorboard
env:
- name: AWS_REGION
value: {AWS_REGION}
- name: AWS_ACCESS_KEY_ID
valueFrom:
secretKeyRef:
name: aws-secret
key: AWS_ACCESS_KEY_ID
- name: AWS_SECRET_ACCESS_KEY
valueFrom:
secretKeyRef:
name: aws-secret
key: AWS_SECRET_ACCESS_KEY
ports:
- containerPort: 80
"""
tb_service = f"""apiVersion: v1
kind: Service
metadata:
labels:
app: mnist-tensorboard
name: {tb_name}
namespace: {namespace}
spec:
ports:
- name: http-tb
port: 80
targetPort: 80
selector:
app: mnist-tensorboard
type: ClusterIP
"""
tb_virtual_service = f"""apiVersion: networking.istio.io/v1alpha3
kind: VirtualService
metadata:
name: {tb_name}
namespace: {namespace}
spec:
gateways:
- kubeflow/kubeflow-gateway
hosts:
- '*'
http:
- match:
- uri:
prefix: /mnist/{namespace}/tensorboard/
rewrite:
uri: /
route:
- destination:
host: {tb_name}.{namespace}.svc.cluster.local
port:
number: 80
timeout: 300s
"""
tb_specs = [tb_deploy, tb_service, tb_virtual_service]
In [ ]:
k8s_util.apply_k8s_specs(tb_specs, k8s_util.K8S_CREATE_OR_REPLACE)
In [ ]:
endpoint = k8s_util.get_ingress_endpoint()
if endpoint:
vs = yaml.safe_load(tb_virtual_service)
path= vs["spec"]["http"][0]["match"][0]["uri"]["prefix"]
tb_endpoint = endpoint + path
display(HTML(f"TensorBoard UI is at <a href='{tb_endpoint}'>{tb_endpoint}</a>"))
In [ ]:
tf_job = tf_job_client.wait_for_condition(train_name, expected_condition=["Succeeded", "Failed"], namespace=namespace)
if tf_job_client.is_job_succeeded(train_name, namespace):
logging.info(f"TFJob {namespace}.{train_name} succeeded")
else:
raise ValueError(f"TFJob {namespace}.{train_name} failed")
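Before deploying the model server, you can check that the SavedModel export landed in S3; the prefix below mirrors export_path:
In [ ]:
# List the exported SavedModel objects under the export prefix.
s3 = boto3.client('s3', region_name=AWS_REGION)
for obj in s3.list_objects_v2(Bucket=bucket, Prefix="mnist/export/").get("Contents", []):
    print(obj["Key"])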
In [ ]:
import os
deploy_name = "mnist-model"
model_base_path = export_path
# The web UI defaults to mnist-service, so if you change it you will
# need to change it in the UI as well to send predictions to the model.
model_service = "mnist-service"
deploy_spec = f"""apiVersion: apps/v1
kind: Deployment
metadata:
labels:
app: mnist
name: {deploy_name}
namespace: {namespace}
spec:
selector:
matchLabels:
app: mnist-model
template:
metadata:
      # TODO(jlewi): Right now we disable the Istio sidecar because otherwise Istio RBAC will
      # prevent the UI from sending RPCs to the server. We should create an appropriate Istio
      # RBAC authorization policy to allow traffic from the UI to the model server.
      # https://istio.io/docs/concepts/security/#target-selectors
annotations:
sidecar.istio.io/inject: "false"
labels:
app: mnist-model
version: v1
spec:
serviceAccount: default-editor
containers:
- args:
- --port=9000
- --rest_api_port=8500
- --model_name=mnist
- --model_base_path={model_base_path}
- --monitoring_config_file=/var/config/monitoring_config.txt
command:
- /usr/bin/tensorflow_model_server
env:
- name: modelBasePath
value: {model_base_path}
- name: AWS_REGION
value: {AWS_REGION}
- name: AWS_ACCESS_KEY_ID
valueFrom:
secretKeyRef:
name: aws-secret
key: AWS_ACCESS_KEY_ID
- name: AWS_SECRET_ACCESS_KEY
valueFrom:
secretKeyRef:
name: aws-secret
key: AWS_SECRET_ACCESS_KEY
image: tensorflow/serving:1.15.0
imagePullPolicy: IfNotPresent
livenessProbe:
initialDelaySeconds: 30
periodSeconds: 30
tcpSocket:
port: 9000
name: mnist
ports:
- containerPort: 9000
- containerPort: 8500
resources:
limits:
cpu: "1"
memory: 1Gi
requests:
cpu: "1"
memory: 1Gi
volumeMounts:
- mountPath: /var/config/
name: model-config
volumes:
- configMap:
name: {deploy_name}
name: model-config
"""
service_spec = f"""apiVersion: v1
kind: Service
metadata:
annotations:
prometheus.io/path: /monitoring/prometheus/metrics
prometheus.io/port: "8500"
prometheus.io/scrape: "true"
labels:
app: mnist-model
name: {model_service}
namespace: {namespace}
spec:
ports:
- name: grpc-tf-serving
port: 9000
targetPort: 9000
- name: http-tf-serving
port: 8500
targetPort: 8500
selector:
app: mnist-model
type: ClusterIP
"""
monitoring_config = f"""kind: ConfigMap
apiVersion: v1
metadata:
name: {deploy_name}
namespace: {namespace}
data:
monitoring_config.txt: |-
prometheus_config: {{
enable: true,
path: "/monitoring/prometheus/metrics"
}}
"""
model_specs = [deploy_spec, service_spec, monitoring_config]
In [ ]:
k8s_util.apply_k8s_specs(model_specs, k8s_util.K8S_CREATE_OR_REPLACE)
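Once the deployment is ready, you can smoke-test the REST endpoint. A hedged sketch, assuming the notebook can resolve the in-cluster service name and that the serving signature takes a flattened 784-float image (check model.py's exported signature):
In [ ]:
import requests
# Hypothetical smoke test against TF Serving's REST API. The 784-float
# input shape is an assumption; match it to model.py's serving signature.
url = f"http://{model_service}.{namespace}.svc.cluster.local:8500/v1/models/mnist:predict"
resp = requests.post(url, json={"instances": [[0.0] * 784]})
print(resp.json())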
In [ ]:
ui_name = "mnist-ui"
ui_deploy = f"""apiVersion: apps/v1
kind: Deployment
metadata:
name: {ui_name}
namespace: {namespace}
spec:
replicas: 1
selector:
matchLabels:
app: mnist-web-ui
template:
metadata:
labels:
app: mnist-web-ui
spec:
containers:
- image: gcr.io/kubeflow-examples/mnist/web-ui:v20190112-v0.2-142-g3b38225
name: web-ui
ports:
- containerPort: 5000
serviceAccount: default-editor
"""
ui_service = f"""apiVersion: v1
kind: Service
metadata:
name: {ui_name}
namespace: {namespace}
spec:
ports:
- name: http-mnist-ui
port: 80
targetPort: 5000
selector:
app: mnist-web-ui
type: ClusterIP
"""
ui_virtual_service = f"""apiVersion: networking.istio.io/v1alpha3
kind: VirtualService
metadata:
name: {ui_name}
namespace: {namespace}
spec:
gateways:
- kubeflow/kubeflow-gateway
hosts:
- '*'
http:
- match:
- uri:
prefix: /mnist/{namespace}/ui/
rewrite:
uri: /
route:
- destination:
host: {ui_name}.{namespace}.svc.cluster.local
port:
number: 80
timeout: 300s
"""
ui_specs = [ui_deploy, ui_service, ui_virtual_service]
In [ ]:
k8s_util.apply_k8s_specs(ui_specs, k8s_util.K8S_CREATE_OR_REPLACE)
The endpoint will be
http://${KUBEFLOW_ENDPOINT}/mnist/${NAMESPACE}/ui/
You can get the KUBEFLOW_ENDPOINT with
KUBEFLOW_ENDPOINT=`kubectl -n istio-system get ingress istio-ingress -o jsonpath="{.status.loadBalancer.ingress[0].hostname}"`
If you have sufficient privileges, you can run the cell below to get the endpoint. If you don't, you can grant the appropriate permissions by running
kubectl create --namespace=istio-system rolebinding --clusterrole=kubeflow-view --serviceaccount=${NAMESPACE}:default-editor ${NAMESPACE}-istio-view
In [ ]:
endpoint = k8s_util.get_ingress_endpoint()
if endpoint:
vs = yaml.safe_load(ui_virtual_service)
path= vs["spec"]["http"][0]["match"][0]["uri"]["prefix"]
ui_endpoint = endpoint + path
display(HTML(f"mnist UI is at <a href='{ui_endpoint}'>{ui_endpoint}</a>"))