Kubeflow pipelines

Learning Objectives:

  1. Learn how to deploy a Kubeflow cluster on GCP
  2. Learn how to use the notebook server on Kubeflow
  3. Learn how to create an experiment in Kubeflow
  4. Learn how to package your code into a Kubeflow pipeline
  5. Learn how to run a Kubeflow pipeline in a repeatable and traceable way

Introduction

In this notebook, we will first set up a Kubeflow cluster on GCP, and then launch a Kubeflow Notebook Server from which we will run this notebook. This will allow us to pilot the Kubeflow cluster from the notebook. Then, we will create a Kubeflow experiment and a Kubeflow pipeline from our taxifare machine learning code. Finally, we will run the pipeline on the Kubeflow cluster, giving us a reproducible and traceable way to execute machine learning code.


In [ ]:
from os import path

import kfp
import kfp.compiler as compiler
import kfp.components as comp
import kfp.dsl as dsl
import kfp.gcp as gcp
import kfp.notebook

Setup a Kubeflow cluster on GCP

TODO 1

To deploy a Kubeflow cluster in your GCP project, use the Kubeflow cluster deployer.

There is a setup video that will walk you through all the steps in detail and explains how to access the Kubeflow Dashboard UI once it is running.

You'll need to create an OAuth client for authentication purposes: Follow the instructions here.

Launch a Jupyter notebook server on the Kubeflow cluster

TODO 2

A Kubeflow cluster allows you not only to run Kubeflow pipelines, but also to launch a Jupyter notebook server from which you can pilot the Kubeflow cluster. In particular, you can create experiments and define and run pipelines from within notebooks running on that Jupyter notebook server. This is exactly what we are going to do in this notebook.

First of all, click on the "Notebook Servers" tab in the Kubeflow Dashboard UI, and create a Notebook Server. Once it's ready, connect to it.

Since the goal is to run this notebook on that Kubeflow Notebook Server, first create a new notebook and clone the training-data-analyst repo by running the following command in a cell, then navigate to this notebook:

$ git clone -b ml_on_gcp-kubeflow_pipelines --single-branch https://github.com/GoogleCloudPlatform/training-data-analyst.git

Create an experiment

TODO 3

From now on, you should be running this notebook from the Notebook Server on the Kubeflow cluster you created in your GCP project.

We will start by creating a Kubeflow client to pilot the Kubeflow cluster:


In [ ]:
client = kfp.Client()

Let's look at the experiments that are running on this cluster. Since you just launched it, you should see only a single "Default" experiment:


In [ ]:
client.list_experiments()

Now let's create a 'taxifare' experiment under which we can look at all the various runs of our taxifare pipeline:


In [ ]:
exp = client.create_experiment(name='taxifare')

Let's make sure the experiment has been created correctly:


In [ ]:
client.list_experiments()

Packaging your code into Kubeflow components

We have packaged our taxifare ml pipeline into three components:

  • ./components/bq2gcs that creates the training and evaluation data from BigQuery and exports it to GCS
  • ./components/trainjob that launches the training container on AI-platform and exports the model
  • ./components/deploymodel that deploys the trained model to AI-platform as a REST API

Each of these components has been wrapped into a Docker container, in the same way we did with the taxifare training code in the previous lab.

If you inspect the code in these folders, you'll notice that the main.py or main.sh files contain the code we previously executed in the notebooks (loading the data to GCS from BigQuery, or launching a training job on AI-platform, etc.). The last line in each Dockerfile tells you that these files are executed when the container is run. So we have simply packaged our ml code into lightweight container images for reproducibility.
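To make the shape of these entrypoints concrete, here is a minimal, hypothetical sketch of what the argument handling in a file like main.py boils down to (the real files live in the component folders; only the `--bucket` flag is taken from the component descriptions below):

```python
import argparse

def parse_args(argv):
    """Parse the --bucket flag that Kubeflow passes to the container."""
    parser = argparse.ArgumentParser(description="Hypothetical bq2gcs entrypoint")
    parser.add_argument("--bucket", required=True,
                        help="GCS bucket under which the taxifare data is exported")
    return parser.parse_args(argv)

# The real main.py would then run the BigQuery export with the parsed value:
args = parse_args(["--bucket", "my-bucket"])
print(f"Would export data to gs://{args.bucket}/taxifare/data")
```

The container's arguments are thus the only interface between Kubeflow and the packaged code, which is what makes the components reusable.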

We have made it simple for you to build the container images and push them to the Google Cloud image registry gcr.io in your project: just type make in the pipelines directory! However, you can't do that from a Kubeflow notebook because Docker is not installed there. So you'll have to do that from Cloud Shell.

For that, open Cloud Shell, and clone this repo there. Then cd to the pipelines subfolder:

$ git clone -b ml_on_gcp-kubeflow_pipelines --single-branch https://github.com/GoogleCloudPlatform/training-data-analyst.git

$ cd training-data-analyst/courses/machine_learning/production_ml_systems/pipelines/

Then run make to build and push the images.
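Each pushed image ends up at a URI of the form gcr.io/&lt;PROJECT&gt;/taxifare-&lt;component&gt;, which is the value you will need in the component description files. As a small sketch of that naming convention (the project ID below is a placeholder):

```python
# Sketch of the image naming convention used for the taxifare components;
# PROJECT is a placeholder for your own GCP project ID.
PROJECT = "my-gcp-project"

def image_uri(project, component):
    """Return the gcr.io URI where `make` pushes a component image."""
    return f"gcr.io/{project}/taxifare-{component}"

for component in ["bq2gcs", "trainjob", "deploymodel"]:
    print(image_uri(PROJECT, component))
```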

Now that the container images are pushed to the registry in your project, we need to create yaml files describing to Kubeflow how to use these containers. It essentially boils down to

  • describing what arguments Kubeflow needs to pass to the containers when it runs them
  • telling Kubeflow where to fetch the corresponding Docker images

In the cells below, we have three of these "Kubeflow component description files", one for each of our components.

For each of these, correct the image URI to reflect that you pushed the images into the gcr.io associated with your project:

TODO 4


In [ ]:
%%writefile bq2gcs.yaml

name: bq2gcs
    
description: |
    This component creates the training and
    validation datasets as BigQuery tables and exports
    them into a Google Cloud Storage bucket at
    gs://<BUCKET>/taxifare/data.
        
inputs:
    - {name: Input Bucket , type: String, description: 'GCS directory path.'}

implementation:
    container:
        image: gcr.io/PROJECT/taxifare-bq2gcs
        args: ["--bucket", {inputValue: Input Bucket}]

In [ ]:
%%writefile trainjob.yaml

name: trainjob
    
description: |
    This component trains a model to predict the taxi fare in NY.
    It takes as argument a GCS bucket and expects its training and
    eval data to be at gs://<BUCKET>/taxifare/data/ and will export
    the trained model at gs://<BUCKET>/taxifare/model/.
        
inputs:
    - {name: Input Bucket , type: String, description: 'GCS directory path.'}

implementation:
    container:
        image: gcr.io/PROJECT/taxifare-trainjob
        args: [{inputValue: Input Bucket}]

In [ ]:
%%writefile deploymodel.yaml

name: deploymodel
    
description: |
    This component deploys a trained taxifare model on GCP as taxifare:dnn.
    It takes as argument a GCS bucket and expects the model to deploy 
    to be found at gs://<BUCKET>/taxifare/model/export/savedmodel/
        
inputs:
    - {name: Input Bucket , type: String, description: 'GCS directory path.'}

implementation:
    container:
        image: gcr.io/PROJECT/taxifare-deploymodel
        args: [{inputValue: Input Bucket}]

Create a Kubeflow pipeline

The code below creates a Kubeflow pipeline by decorating a regular Python function with the @dsl.pipeline decorator. The arguments of this decorated function become the input parameters of the Kubeflow pipeline.

Inside the function, we describe the pipeline by

  • loading the yaml component files we created above into a Kubeflow op
  • specifying the order in which the Kubeflow ops should be run

In [ ]:
# TODO 4
PIPELINE_TAR = 'taxifare.tar.gz'
BQ2GCS_YAML = './bq2gcs.yaml'
TRAINJOB_YAML = './trainjob.yaml'
DEPLOYMODEL_YAML = './deploymodel.yaml'


@dsl.pipeline(
    name='Taxifare',
    description='Train a ml model to predict the taxi fare in NY')
def pipeline(gcs_bucket_name='<bucket where data and model will be exported>'):


    bq2gcs_op = comp.load_component_from_file(BQ2GCS_YAML)
    bq2gcs = bq2gcs_op(
        input_bucket=gcs_bucket_name,
    ).apply(gcp.use_gcp_secret('user-gcp-sa'))


    trainjob_op = comp.load_component_from_file(TRAINJOB_YAML)
    trainjob = trainjob_op(
        input_bucket=gcs_bucket_name,
    ).apply(gcp.use_gcp_secret('user-gcp-sa'))


    deploymodel_op = comp.load_component_from_file(DEPLOYMODEL_YAML)
    deploymodel = deploymodel_op(
        input_bucket=gcs_bucket_name,
    ).apply(gcp.use_gcp_secret('user-gcp-sa'))


    trainjob.after(bq2gcs)
    deploymodel.after(trainjob)

The pipeline function above is then used by the Kubeflow compiler to create a Kubeflow pipeline artifact that can either be uploaded to the Kubeflow cluster from the UI, or uploaded programmatically, as we will do below:


In [ ]:
compiler.Compiler().compile(pipeline, PIPELINE_TAR)

In [ ]:
ls $PIPELINE_TAR

If you uncompress and untar this pipeline artifact, you'll see that the compiler has transformed the Python description of the pipeline into a yaml description!
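For instance, the archive can be inspected with Python's tarfile module. The sketch below builds a tiny stand-in .tar.gz (since the real taxifare.tar.gz only exists after compilation on the notebook server) and lists its contents the same way you would for the compiled artifact:

```python
import io
import tarfile

# Build a stand-in .tar.gz containing a single yaml entry, mimicking
# the structure of a compiled pipeline artifact.
payload = b"apiVersion: argoproj.io/v1alpha1\nkind: Workflow\n"
buffer = io.BytesIO()
with tarfile.open(fileobj=buffer, mode="w:gz") as tar:
    info = tarfile.TarInfo(name="pipeline.yaml")
    info.size = len(payload)
    tar.addfile(info, io.BytesIO(payload))

# Inspecting an artifact: list the members and read out the yaml description.
buffer.seek(0)
with tarfile.open(fileobj=buffer, mode="r:gz") as tar:
    names = tar.getnames()
    yaml_text = tar.extractfile("pipeline.yaml").read().decode()

print(names)  # ['pipeline.yaml']
print(yaml_text.splitlines()[0])
```

Running the same inspection on the real artifact shows the Argo Workflow yaml that the Kubeflow cluster actually executes.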

Now let's feed Kubeflow with our pipeline and run it using our client. Note that the pipeline parameter gcs_bucket_name appears as gcs-bucket-name in the params below, because the compiler sanitizes underscores into hyphens in parameter names:


In [ ]:
# TODO 5
run = client.run_pipeline(
    experiment_id=exp.id, 
    job_name='taxifare', 
    pipeline_package_path='taxifare.tar.gz', 
    params={
        'gcs-bucket-name': "dherin-sandbox",  # replace with your own bucket name
    },
)

Have a look at the link to monitor the run.

Now all the runs are nicely organized under the experiment in the UI, and new runs can be either manually launched or scheduled through the UI in a completely repeatable and traceable way!