In [ ]:
# Copyright 2019 Google Inc. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ==============================================================================

Local development and docker image components

  • This section assumes that you have already created a program to perform the task required in a particular step of your ML workflow. This example uses an MNIST model training script.

  • Then, this example packages your program as a Docker container image.

  • Then, this example calls kfp.dsl.ContainerOp to convert it to a Kubeflow pipeline component.

Note: To build the image locally, you need Docker installed. Check by running the following command:

which docker

The result should be something like:

/usr/bin/docker


In [ ]:
import kfp
import kfp.gcp as gcp
import kfp.dsl as dsl
import kfp.compiler as compiler
import kfp.components as comp
import datetime

import kubernetes as k8s

In [ ]:
# Required Parameters
PROJECT_ID='<ADD GCP PROJECT HERE>'
GCS_BUCKET='gs://<ADD STORAGE LOCATION HERE>'
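
Later cells build image names and storage paths from these two values, so it can help to catch unfilled placeholders early. The following is a minimal sketch, not part of the original notebook: check_required_parameters is a hypothetical helper, and the sample values are made up for illustration.

```python
def check_required_parameters(project_id, gcs_bucket):
    """Return a list of problems found in the two required parameters."""
    problems = []
    # The template placeholders contain angle brackets; real values do not.
    if not project_id or '<' in project_id or '>' in project_id:
        problems.append('PROJECT_ID still looks like a placeholder')
    if not gcs_bucket.startswith('gs://') or '<' in gcs_bucket or '>' in gcs_bucket:
        problems.append('GCS_BUCKET should be a filled-in gs:// location')
    return problems


# Hypothetical values, for illustration only.
print(check_required_parameters('my-project', 'gs://my-bucket/mnist'))
print(check_required_parameters('<ADD GCP PROJECT HERE>',
                                'gs://<ADD STORAGE LOCATION HERE>'))
```

In the notebook you would call it as check_required_parameters(PROJECT_ID, GCS_BUCKET) and stop if the returned list is not empty.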

Create client

If you run this notebook outside of a Kubeflow cluster, run the following command:

  • host: The URL of your Kubeflow Pipelines instance, for example "https://<your-deployment>.endpoints.<your-project>.cloud.goog/pipeline"
  • client_id: The client ID used by Identity-Aware Proxy
  • other_client_id: The client ID used to obtain the auth codes and refresh tokens.
  • other_client_secret: The client secret used to obtain the auth codes and refresh tokens.
client = kfp.Client(host, client_id, other_client_id, other_client_secret)

If you run this notebook within a Kubeflow cluster, run the following command:

client = kfp.Client()

You'll need to create OAuth client ID credentials of type Other to get other_client_id and other_client_secret. Learn more about creating OAuth credentials


In [ ]:
# Optional Parameters, but required for running outside Kubeflow cluster

# The host for 'AI Platform Pipelines' ends with 'pipelines.googleusercontent.com'
# The host for pipeline endpoint of 'full Kubeflow deployment' ends with '/pipeline'
# Examples are:
# https://7c021d0340d296aa-dot-us-central2.pipelines.googleusercontent.com
# https://kubeflow.endpoints.kubeflow-pipeline.cloud.goog/pipeline
HOST = '<ADD HOST NAME TO TALK TO KUBEFLOW PIPELINE HERE>'

# For 'full Kubeflow deployment' on GCP, the endpoint is usually protected through IAP, therefore the following 
# will be needed to access the endpoint.
CLIENT_ID = '<ADD OAuth CLIENT ID USED BY IAP HERE>'
OTHER_CLIENT_ID = '<ADD OAuth CLIENT ID USED TO OBTAIN AUTH CODES HERE>'
OTHER_CLIENT_SECRET = '<ADD OAuth CLIENT SECRET USED TO OBTAIN AUTH CODES HERE>'

In [ ]:
# This ensures that a valid access token is present to reach the endpoint for 'AI Platform Pipelines'.
# If you are not working with 'AI Platform Pipelines', this step is not necessary.
! gcloud auth print-access-token

In [ ]:
# Create kfp client
in_cluster = True
try:
    # Succeeds only when the notebook runs inside a Kubernetes cluster.
    k8s.config.load_incluster_config()
except Exception:
    in_cluster = False

if in_cluster:
    client = kfp.Client()
else:
    if HOST.endswith('googleusercontent.com'):
        CLIENT_ID = None
        OTHER_CLIENT_ID = None
        OTHER_CLIENT_SECRET = None

    client = kfp.Client(host=HOST, 
                        client_id=CLIENT_ID,
                        other_client_id=OTHER_CLIENT_ID, 
                        other_client_secret=OTHER_CLIENT_SECRET)

Wrap an existing Docker container image using ContainerOp

Writing the program code

The following cell creates a file app.py that contains a Python script. The script downloads the MNIST dataset, trains a neural-network classification model, writes the training log, and exports the trained model to Google Cloud Storage.

Your component can create outputs that the downstream components can use as inputs. Each output must be a string and the container image must write each output to a separate local text file. For example, if a training component needs to output the path of the trained model, the component writes the path into a local file, such as /output.txt.
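
As a minimal sketch of that convention, the snippet below writes one string output to its own text file and reads it back. The helper write_output and the sample GCS path are hypothetical, and a temporary folder stands in for /output.txt so the example can run anywhere.

```python
import os
import tempfile


def write_output(path, value):
    """Write one string output to its own text file, creating parent folders."""
    parent = os.path.dirname(path)
    if parent:
        os.makedirs(parent, exist_ok=True)
    with open(path, 'w') as f:
        f.write(str(value))


# Use a temporary file here instead of /output.txt.
demo_dir = tempfile.mkdtemp()
demo_path = os.path.join(demo_dir, 'output.txt')
write_output(demo_path, 'gs://my-bucket/mnist_model.h5')  # hypothetical path

with open(demo_path) as f:
    print(f.read())
```

A component with several outputs would call the helper once per output, each with a different file path.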


In [ ]:
%%bash

# Create folders if they don't exist.
mkdir -p tmp/components/mnist_training

# Create the Python file that trains the MNIST model.
cat > ./tmp/components/mnist_training/app.py <<HERE
import argparse
from datetime import datetime
import tensorflow as tf

parser = argparse.ArgumentParser()
parser.add_argument(
    '--model_file', type=str, required=True, help='Name of the model file.')
parser.add_argument(
    '--bucket', type=str, required=True, help='GCS bucket name.')
args = parser.parse_args()

bucket=args.bucket
model_file=args.model_file

model = tf.keras.models.Sequential([
  tf.keras.layers.Flatten(input_shape=(28, 28)),
  tf.keras.layers.Dense(512, activation=tf.nn.relu),
  tf.keras.layers.Dropout(0.2),
  tf.keras.layers.Dense(10, activation=tf.nn.softmax)
])

model.compile(optimizer='adam',
              loss='sparse_categorical_crossentropy',
              metrics=['accuracy'])

# summary() prints the model description itself and returns None.
model.summary()

mnist = tf.keras.datasets.mnist
(x_train, y_train),(x_test, y_test) = mnist.load_data()
x_train, x_test = x_train / 255.0, x_test / 255.0

callbacks = [
  tf.keras.callbacks.TensorBoard(log_dir=bucket + '/logs/' + str(datetime.now().date())),
  # Interrupt training if val_loss stops improving for over 2 epochs
  tf.keras.callbacks.EarlyStopping(patience=2, monitor='val_loss'),
]

model.fit(x_train, y_train, batch_size=32, epochs=5, callbacks=callbacks,
          validation_data=(x_test, y_test))


model.save(model_file)

from tensorflow import gfile

gcs_path = bucket + "/" + model_file

if gfile.Exists(gcs_path):
    gfile.Remove(gcs_path)

gfile.Copy(model_file, gcs_path)
with open('/output.txt', 'w') as f:
  f.write(gcs_path)
HERE

Creating a Dockerfile

Now create a container that runs the script. Start by creating a Dockerfile. A Dockerfile contains the instructions to assemble a Docker image. The FROM statement specifies the Base Image from which you are building. WORKDIR sets the working directory. When you assemble the Docker image, COPY copies the required files and directories (for example, app.py) to the file system of the container. RUN executes a command (for example, install the dependencies) and commits the results.
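
The Dockerfile in the next cell uses only FROM, WORKDIR, and COPY. To show where a RUN instruction would fit, here is a minimal sketch that assembles the Dockerfile text in Python. The extra_packages list is a hypothetical example of dependencies, and the file is written to a temporary folder rather than to the component folder.

```python
import os
import tempfile

# Hypothetical dependency list; the notebook's own Dockerfile installs nothing extra.
extra_packages = ['google-cloud-storage']

dockerfile_lines = [
    'FROM tensorflow/tensorflow:1.15.0-py3',  # base image
    'WORKDIR /app',                           # working directory in the image
    'COPY . /app',                            # copy app.py and other files
]
if extra_packages:
    # RUN executes at build time and commits the result as a new layer.
    dockerfile_lines.append('RUN pip install ' + ' '.join(extra_packages))

dockerfile_text = '\n'.join(dockerfile_lines) + '\n'

demo_dir = tempfile.mkdtemp()
with open(os.path.join(demo_dir, 'Dockerfile'), 'w') as f:
    f.write(dockerfile_text)

print(dockerfile_text)
```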


In [ ]:
%%bash

# Create Dockerfile.
cat > ./tmp/components/mnist_training/Dockerfile <<EOF
FROM tensorflow/tensorflow:1.15.0-py3
WORKDIR /app
COPY . /app
EOF

Build docker image

Now that we have a Dockerfile, we need to build the Docker image and push it to a registry that will host it. There are three possible options:

  • Use kfp.containers.build_image_from_working_dir to build the image and push it to the Container Registry (GCR). This requires kaniko, which is installed automatically with a 'full Kubeflow deployment' but not with 'AI Platform Pipelines'.
  • Use Cloud Build, which requires a GCP project with the corresponding API enabled. If you are working with GCP 'AI Platform Pipelines' in a running GCP project, Cloud Build is the recommended option.
  • Use a locally installed Docker and push the image to a registry such as GCR.

Note: If you run this notebook within a Kubeflow cluster with Kubeflow version >= 0.7 and want to use the kaniko option, ensure that valid credentials exist in your notebook's namespace.

%%bash

NAMESPACE=<your notebook name space>
SOURCE=kubeflow
NAME=user-gcp-sa
SECRET=$(kubectl get secrets ${NAME} -n ${SOURCE} -o jsonpath="{.data.${NAME}\.json}" | base64 --decode)
kubectl create -n ${NAMESPACE} secret generic ${NAME} --from-literal="${NAME}.json=${SECRET}"

In [ ]:
IMAGE_NAME="mnist_training_kf_pipeline"
TAG="latest" # "v_$(date +%Y%m%d_%H%M%S)"

GCR_IMAGE="gcr.io/{PROJECT_ID}/{IMAGE_NAME}:{TAG}".format(
    PROJECT_ID=PROJECT_ID,
    IMAGE_NAME=IMAGE_NAME,
    TAG=TAG
)

APP_FOLDER='./tmp/components/mnist_training/'
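
The commented-out alternative for TAG above is written in shell syntax, so it would not be evaluated inside a Python string. If you prefer a unique tag per build, a minimal sketch in Python could look like this. The helpers timestamped_tag and gcr_image are hypothetical names, and 'my-project' is a made-up project ID.

```python
import datetime


def timestamped_tag(now=None):
    """Return a tag such as 'v_20200131_235959' for the given (or current) time."""
    now = now or datetime.datetime.now()
    return now.strftime('v_%Y%m%d_%H%M%S')


def gcr_image(project_id, image_name, tag):
    """Build the full Container Registry image reference."""
    return 'gcr.io/{}/{}:{}'.format(project_id, image_name, tag)


# A fixed time is used here so that the printed result is reproducible.
tag = timestamped_tag(datetime.datetime(2020, 1, 31, 23, 59, 59))
print(gcr_image('my-project', 'mnist_training_kf_pipeline', tag))
# gcr.io/my-project/mnist_training_kf_pipeline:v_20200131_235959
```

If you switch to a timestamped tag, the image reference used when defining the component has to carry the same tag.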

In [ ]:
# For the purpose of demonstration, the following code chooses:
# - Cloud Build for 'AI Platform Pipelines'
# - kaniko for 'full Kubeflow deployment'

if HOST.endswith('googleusercontent.com'):
    # kaniko is not pre-installed with 'AI Platform Pipelines'
    import subprocess
    # ! gcloud builds submit --tag ${IMAGE_NAME} ${APP_FOLDER}
    cmd = ['gcloud', 'builds', 'submit', '--tag', GCR_IMAGE, APP_FOLDER]
    build_log = (subprocess.run(cmd, stdout=subprocess.PIPE).stdout[:-1].decode('utf-8'))
    print(build_log)
    
else:
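    import re
    # Compare numeric version parts; plain string comparison misorders versions such as '0.1.4' and '0.1.36'.
    kfp_version = tuple(int(n) for n in re.findall(r'\d+', kfp.__version__)[:3])
    if kfp_version <= (0, 1, 36):
        # kfp versions after 0.1.36 introduced a breaking change, so the following code does not work with them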
        
        builder = kfp.containers._container_builder.ContainerBuilder(
            gcs_staging=GCS_BUCKET + "/kfp_container_build_staging"
        )

        kfp.containers.build_image_from_working_dir(
            image_name=GCR_IMAGE,
            working_dir=APP_FOLDER,
            builder=builder
        )
    else:
        raise RuntimeError("Please build the Docker image using either [Docker] or [Cloud Build]")

If you want to use docker to build the image

Run the following in a cell

%%bash -s "{PROJECT_ID}"

IMAGE_NAME="mnist_training_kf_pipeline"
TAG="latest" # "v_$(date +%Y%m%d_%H%M%S)"

# Create script to build docker image and push it.
cat > ./tmp/components/mnist_training/build_image.sh <<HERE
PROJECT_ID="${1}"
IMAGE_NAME="${IMAGE_NAME}"
TAG="${TAG}"
GCR_IMAGE="gcr.io/\${PROJECT_ID}/\${IMAGE_NAME}:\${TAG}"
docker build -t \${IMAGE_NAME} .
docker tag \${IMAGE_NAME} \${GCR_IMAGE}
docker push \${GCR_IMAGE}
docker image rm \${IMAGE_NAME}
docker image rm \${GCR_IMAGE}
HERE

cd tmp/components/mnist_training
bash build_image.sh

In [ ]:
image_name = GCR_IMAGE

Define each component

Define a component by creating an instance of kfp.dsl.ContainerOp that describes the interactions with the Docker container image created in the previous step. You need to specify:

  • component name
  • the image to use
  • the command to run after the container starts (if None, the default CMD defined in the container image is used)
  • the input arguments
  • the file outputs (In the app.py above, the path of the trained model is written to /output.txt.)

In [ ]:
def mnist_train_op(model_file, bucket):
    return dsl.ContainerOp(
      name="mnist_training_container",
      image=image_name,  # the image built and pushed in the previous step
      command=['python', '/app/app.py'],
      file_outputs={'outputs': '/output.txt'},
      arguments=['--bucket', bucket, '--model_file', model_file]
    )

Create your workflow as a Python function

Define your pipeline as a Python function. The @kfp.dsl.pipeline decorator is required and sets the pipeline's name and description. Compiling the pipeline function then produces a pipeline file.


In [ ]:
# Define the pipeline
@dsl.pipeline(
   name='Mnist pipeline',
   description='A toy pipeline that performs mnist model training.'
)
def mnist_container_pipeline(
    model_file: str = 'mnist_model.h5', 
    bucket: str = GCS_BUCKET
):
    mnist_train_op(model_file=model_file, bucket=bucket).apply(gcp.use_gcp_secret('user-gcp-sa'))

Submit a pipeline run


In [ ]:
pipeline_func = mnist_container_pipeline

In [ ]:
experiment_name = 'mnist_kubeflow'

arguments = {"model_file":"mnist_model.h5",
             "bucket":GCS_BUCKET}

run_name = pipeline_func.__name__ + ' run'

# Submit pipeline directly from pipeline function
run_result = client.create_run_from_pipeline_func(pipeline_func, 
                                                  experiment_name=experiment_name, 
                                                  run_name=run_name, 
                                                  arguments=arguments)
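
The keys of the arguments dictionary are meant to match the parameter names of the pipeline function. A minimal sketch of a check you could run before submitting is shown below. The helper unknown_arguments is hypothetical, example_pipeline is a stand-in with the same parameter names as mnist_container_pipeline, and the sketch assumes the decorated pipeline function still exposes its Python signature.

```python
import inspect


def unknown_arguments(pipeline_function, run_arguments):
    """Return the argument names that the pipeline function does not declare."""
    declared = set(inspect.signature(pipeline_function).parameters)
    return sorted(set(run_arguments) - declared)


# Stand-in with the same parameter names as mnist_container_pipeline.
def example_pipeline(model_file: str = 'mnist_model.h5',
                     bucket: str = 'gs://my-bucket'):
    pass


print(unknown_arguments(example_pipeline,
                        {'model_file': 'm.h5', 'bucket': 'gs://b'}))
print(unknown_arguments(example_pipeline, {'model_fil': 'm.h5'}))
```

An empty list means every key corresponds to a declared parameter; any names returned are likely typos.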

As an alternative, you can compile the pipeline into a package. The compiled pipeline can be easily shared and reused by others to run the pipeline.

pipeline_filename = pipeline_func.__name__ + '.pipeline.zip'
compiler.Compiler().compile(pipeline_func, pipeline_filename)

experiment = client.create_experiment('python-functions-mnist')

run_result = client.run_pipeline(
    experiment_id=experiment.id, 
    job_name=run_name, 
    pipeline_package_path=pipeline_filename, 
    params=arguments)
