Kubeflow Pipelines

Learning Objectives:

  1. Learn how to deploy a Kubeflow cluster on GCP
  2. Learn how to create an experiment in Kubeflow
  3. Learn how to package your code into a Kubeflow pipeline
  4. Learn how to run a Kubeflow pipeline in a repeatable and traceable way

Introduction

In this notebook, we will first set up a Kubeflow cluster on GCP. Then, we will create a Kubeflow experiment and a Kubeflow pipeline from our taxifare machine learning code. Finally, we will run the pipeline on the Kubeflow cluster, giving us a reproducible and traceable way to execute machine learning code.


In [ ]:
!sudo chown -R jupyter:jupyter /home/jupyter/training-data-analyst

In [ ]:
!pip freeze | grep kfp || pip install kfp

In [ ]:
from os import path

import kfp
import kfp.compiler as compiler
import kfp.components as comp
import kfp.dsl as dsl
import kfp.gcp as gcp
import kfp.notebook

Setup a Kubeflow cluster on GCP

TODO 1

To deploy a Kubeflow cluster in your GCP project, use AI Platform Pipelines:

  1. Go to AI Platform Pipelines in the GCP Console.
  2. Create a new instance
  3. Hit "Configure"
  4. Check the box "Allow access to the following Cloud APIs"
  5. Hit "Create Cluster"
  6. Hit "Deploy"

When the cluster is ready, go back to the AI Platform Pipelines page and click on the "SETTINGS" entry for your cluster. This brings up a pop-up with code snippets showing how to access the cluster programmatically.

Copy the "host" entry and set the "HOST" variable below with that.


In [ ]:
HOST = # TODO: fill in the HOST information for the cluster
BUCKET = # TODO: fill in the GCS bucket
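# For illustration only (hypothetical values -- yours will differ), e.g.
#   HOST = "https://<deployment-id>-dot-<region>.pipelines.googleusercontent.com"
#   BUCKET = "your-gcs-bucket-name"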

Create an experiment

TODO 2

We will start by creating a Kubeflow client to pilot the Kubeflow cluster:


In [ ]:
client = # TODO: create a Kubeflow client
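# Hint -- a sketch only, assuming the kfp v1 SDK: the client is created by
# pointing kfp.Client at the host you copied above, e.g.
#   client = kfp.Client(host=HOST)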

Let's look at the experiments that are running on this cluster. Since you just launched it, you should see only a single "Default" experiment:


In [ ]:
client.list_experiments()

Now let's create a 'taxifare' experiment, where we can look at all the various runs of our taxifare pipeline:


In [ ]:
exp = # TODO: create an experiment called 'taxifare'
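# Hint -- a sketch only, assuming the kfp v1 SDK, e.g.
#   exp = client.create_experiment(name='taxifare')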

Let's make sure the experiment has been created correctly:


In [ ]:
client.list_experiments()

Packaging your code into Kubeflow components

We have packaged our taxifare ml pipeline into three components:

  • ./components/bq2gcs that creates the training and evaluation data from BigQuery and exports it to GCS
  • ./components/trainjob that launches the training container on AI-platform and exports the model
  • ./components/deploymodel that deploys the trained model to AI-platform as a REST API

Each of these components has been wrapped into a Docker container, in the same way we did with the taxifare training code in the previous lab.

If you inspect the code in these folders, you'll notice that the main.py or main.sh files contain the code we previously executed in the notebooks (loading the data from BigQuery to GCS, launching a training job on AI-platform, etc.). The last line in each Dockerfile tells you that these files are executed when the container is run. So we have simply packaged our ML code into lightweight container images for reproducibility.

We have made it simple for you to build the container images and push them to the Google Cloud image registry gcr.io in your project:


In [ ]:
# Builds the taxifare trainer container in case you skipped the optional part of lab 1
!taxifare/scripts/build.sh

In [ ]:
# Pushes the taxifare trainer container to gcr.io
!taxifare/scripts/push.sh

In [ ]:
# Builds the KF component containers and pushes them to gcr.io
!cd pipelines && make components

Now that the container images are pushed to the registry in your project, we need to create YAML files describing to Kubeflow how to use these containers. Essentially, this boils down to

  • describing what arguments Kubeflow needs to pass to the containers when it runs them
  • telling Kubeflow where to fetch the corresponding Docker images

In the cells below, we have three of these "Kubeflow component description files", one for each of our components.

TODO 3

IMPORTANT: Modify the image URI in the cell below to reflect that you pushed the images into the gcr.io associated with your project.


In [ ]:
%%writefile bq2gcs.yaml

name: bq2gcs
    
description: |
    This component creates the training and
    validation datasets as BigQuery tables and exports
    them into a Google Cloud Storage bucket at
    gs://<BUCKET>/taxifare/data.
        
inputs:
    - {name: Input Bucket , type: String, description: 'GCS directory path.'}

implementation:
    container:
        image: # TODO: Reference the image URI for taxifare-bq2gcs you just created
        args: ["--bucket", {inputValue: Input Bucket}]

In [ ]:
%%writefile trainjob.yaml

name: trainjob
    
description: |
    This component trains a model to predict the taxi fare in NY.
    It takes as argument a GCS bucket and expects its training and
    eval data to be at gs://<BUCKET>/taxifare/data/ and will export
    the trained model at gs://<BUCKET>/taxifare/model/.
        
inputs:
    - {name: Input Bucket , type: String, description: 'GCS directory path.'}

implementation:
    container:
        image: # TODO: Reference the image URI for taxifare-trainjob you just created
        args: [{inputValue: Input Bucket}]

In [ ]:
%%writefile deploymodel.yaml

name: deploymodel
    
description: |
    This component deploys a trained taxifare model on GCP as taxifare:dnn.
    It takes as argument a GCS bucket and expects the model to deploy 
    to be found at gs://<BUCKET>/taxifare/model/export/savedmodel/.
        
inputs:
    - {name: Input Bucket , type: String, description: 'GCS directory path.'}

implementation:
    container:
        image: # TODO: Reference the image URI for taxifare-deployment you just created
        args: [{inputValue: Input Bucket}]

Create a Kubeflow pipeline

The code below creates a Kubeflow pipeline by decorating a regular Python function with the @dsl.pipeline decorator. The arguments of this decorated function become the input parameters of the Kubeflow pipeline.

Inside the function, we describe the pipeline by

  • loading the yaml component files we created above into Kubeflow ops
  • specifying the order in which the Kubeflow ops should be run

In [ ]:
# TODO 3
PIPELINE_TAR = 'taxifare.tar.gz'
BQ2GCS_YAML = './bq2gcs.yaml'
TRAINJOB_YAML = './trainjob.yaml'
DEPLOYMODEL_YAML = './deploymodel.yaml'


@dsl.pipeline(
    name='Taxifare',
    description='Train a ml model to predict the taxi fare in NY')
def pipeline(gcs_bucket_name='<bucket where data and model will be exported>'):

    bq2gcs_op = comp.load_component_from_file(BQ2GCS_YAML)
    bq2gcs = bq2gcs_op(
        input_bucket=gcs_bucket_name,
    )

    trainjob_op = # TODO: Load the yaml file for training
    trainjob = # TODO: Add your code to run the training job
    )

    deploymodel_op = # TODO: Load the yaml file for deployment
    deploymodel = # TODO: Add your code to run model deployment
    )

    # TODO: Add the code to run 'trainjob' after 'bq2gcs' in the pipeline
    # TODO: Add the code to run 'deploymodel' after 'trainjob' in the pipeline
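    # Sketch of one possible completion, assuming the kfp v1 SDK and the
    # component files written above (adapt as needed):
    #
    #   trainjob_op = comp.load_component_from_file(TRAINJOB_YAML)
    #   trainjob = trainjob_op(input_bucket=gcs_bucket_name)
    #
    #   deploymodel_op = comp.load_component_from_file(DEPLOYMODEL_YAML)
    #   deploymodel = deploymodel_op(input_bucket=gcs_bucket_name)
    #
    #   trainjob.after(bq2gcs)
    #   deploymodel.after(trainjob)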

The pipeline function above is then used by the Kubeflow compiler to create a Kubeflow pipeline artifact that can be uploaded to the Kubeflow cluster either from the UI or programmatically, as we will do below:


In [ ]:
# TODO: Compile the pipeline function above
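# Hint -- a sketch only, assuming the kfp v1 SDK, e.g.
#   compiler.Compiler().compile(pipeline, PIPELINE_TAR)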

In [ ]:
ls $PIPELINE_TAR

If you untar and unzip this pipeline artifact, you'll see that the compiler has transformed the Python description of the pipeline into a YAML description!

Now let's feed Kubeflow with our pipeline and run it using our client:


In [ ]:
# TODO 4
run = client.run_pipeline(
    experiment_id= # TODO: Add code for experiment id
    job_name= # TODO: Provide a jobname
    pipeline_package_path= # TODO: Add code for pipeline zip file
    params={
        'gcs_bucket_name': BUCKET,
    },
)
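# Hint -- a sketch only, assuming the kfp v1 SDK and the objects created above:
# the missing arguments are typically experiment_id=exp.id, job_name='taxifare',
# and pipeline_package_path=PIPELINE_TAR (remember to keep a trailing comma
# after each argument).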

Have a look at the link to monitor the run.

Now all the runs are nicely organized under the experiment in the UI, and new runs can be either manually launched or scheduled through the UI in a completely repeatable and traceable way!