Learning Objectives:
In this notebook, we will first set up a Kubeflow cluster on GCP. Then, we will create a Kubeflow experiment and a Kubeflow pipeline from our taxifare machine learning code. Finally, we will run the pipeline on the Kubeflow cluster, giving us a reproducible and traceable way to execute machine learning code.
In [ ]:
!sudo chown -R jupyter:jupyter /home/jupyter/training-data-analyst
In [ ]:
!pip freeze | grep kfp || pip install kfp
In [ ]:
from os import path
import kfp
import kfp.compiler as compiler
import kfp.components as comp
import kfp.dsl as dsl
import kfp.gcp as gcp
import kfp.notebook
TODO 1
To deploy a Kubeflow cluster in your GCP project, create a new instance from the AI Platform Pipelines page in the GCP console.
When the cluster is ready, go back to the AI Platform Pipelines page and click on the "SETTINGS" entry for your cluster. This will bring up a pop-up with code snippets showing how to access the cluster programmatically.
Copy the "host" entry and set the "HOST" variable below to that value.
In [ ]:
HOST = ""  # TODO: fill in the HOST information for the cluster
BUCKET = ""  # TODO: fill in the GCS bucket
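For reference, the filled-in values follow the general shape below. Both strings are placeholders, not real values: the host URL comes from the SETTINGS pop-up and the bucket name is specific to your project.
In [ ]:
# Placeholder values only; substitute your own cluster host and bucket.
HOST = "https://<instance-id>-dot-<region>.pipelines.googleusercontent.com"
BUCKET = "<your-gcs-bucket-name>"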
TODO 2
We will start by creating a Kubeflow client to pilot the Kubeflow cluster:
In [ ]:
client = # TODO: create a Kubeflow client
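If you need a hint, the kfp SDK exposes a Client class that takes the cluster host. A minimal sketch, assuming the HOST variable set above:
In [ ]:
# One possible solution: point the kfp client at the cluster host.
client = kfp.Client(host=HOST)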
Let's look at the experiments that are running on this cluster. Since you just launched it, you should see only a single "Default" experiment:
In [ ]:
client.list_experiments()
Now let's create a 'taxifare' experiment, under which we can group all the various runs of our taxifare pipeline:
In [ ]:
exp = # TODO: create an experiment called 'taxifare'
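A possible solution is sketched below, using the client's create_experiment method. It returns an experiment object whose id we will need later when launching runs.
In [ ]:
# One possible solution: create the 'taxifare' experiment on the cluster.
exp = client.create_experiment(name='taxifare')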
Let's make sure the experiment has been created correctly:
In [ ]:
client.list_experiments()
We have packaged our taxifare ml pipeline into three components:

- ./components/bq2gcs that creates the training and evaluation data from BigQuery and exports it to GCS
- ./components/trainjob that launches the training container on AI Platform and exports the model
- ./components/deploymodel that deploys the trained model to AI Platform as a REST API

Each of these components has been wrapped into a Docker container, in the same way we did with the taxifare training code in the previous lab.
If you inspect the code in these folders, you'll notice that the main.py
or main.sh
files contain the code we previously executed in the notebooks (loading the data to GCS from BQ, or launching a training job to AI-platform, etc.). The last line in the Dockerfile
tells you that these files are executed when the container is run.
So we just packaged our ml code into light container images for reproducibility.
We have made it simple for you to build the container images and push them to the Google Cloud image registry gcr.io in your project:
In [ ]:
# Builds the taxifare trainer container in case you skipped the optional part of lab 1
!taxifare/scripts/build.sh
In [ ]:
# Pushes the taxifare trainer container to gcr.io
!taxifare/scripts/push.sh
In [ ]:
# Builds the KF component containers and pushes them to gcr.io
!cd pipelines && make components
Now that the container images are pushed to the registry in your project, we need to create yaml files describing to Kubeflow how to use these containers. It boils down essentially to describing, for each component, its name and inputs, which container image to run, and which arguments to pass to the container.
In the cells below, we have three of these "Kubeflow component description files", one for each of our components.
TODO 3
IMPORTANT: Modify the image URI in the cell below to reflect that you pushed the images into the gcr.io associated with your project.
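Assuming the build scripts tag the images with your project ID (check the output of the build and push steps above for the exact names and tags), each image URI typically looks like:
image: gcr.io/<YOUR-PROJECT-ID>/taxifare-bq2gcs:latest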
In [ ]:
%%writefile bq2gcs.yaml
name: bq2gcs
description: |
    This component creates the training and
    validation datasets as BigQuery tables and exports
    them into a Google Cloud Storage bucket at
    gs://<BUCKET>/taxifare/data.
inputs:
    - {name: Input Bucket , type: String, description: 'GCS directory path.'}
implementation:
    container:
        image: # TODO: Reference the image URI for taxifare-bq2gcs you just created
        args: ["--bucket", {inputValue: Input Bucket}]
In [ ]:
%%writefile trainjob.yaml
name: trainjob
description: |
    This component trains a model to predict the taxi fare in NY.
    It takes as argument a GCS bucket and expects its training and
    eval data to be at gs://<BUCKET>/taxifare/data/ and will export
    the trained model at gs://<BUCKET>/taxifare/model/.
inputs:
    - {name: Input Bucket , type: String, description: 'GCS directory path.'}
implementation:
    container:
        image: # TODO: Reference the image URI for taxifare-trainjob you just created
        args: [{inputValue: Input Bucket}]
In [ ]:
%%writefile deploymodel.yaml
name: deploymodel
description: |
    This component deploys a trained taxifare model on GCP as taxifare:dnn.
    It takes as argument a GCS bucket and expects the model to deploy
    to be found at gs://<BUCKET>/taxifare/model/export/savedmodel/
inputs:
    - {name: Input Bucket , type: String, description: 'GCS directory path.'}
implementation:
    container:
        image: # TODO: Reference the image URI for taxifare-deployment you just created
        args: [{inputValue: Input Bucket}]
The code below creates a Kubeflow pipeline by decorating a regular function with the @dsl.pipeline decorator. The arguments of this decorated function become the input parameters of the Kubeflow pipeline. Inside the function, we describe the pipeline by

- loading the component description yaml files above into Kubeflow ops with comp.load_component_from_file, and
- specifying, with the .after() method, the order in which these ops should run.
In [ ]:
# TODO 3
PIPELINE_TAR = 'taxifare.tar.gz'
BQ2GCS_YAML = './bq2gcs.yaml'
TRAINJOB_YAML = './trainjob.yaml'
DEPLOYMODEL_YAML = './deploymodel.yaml'
@dsl.pipeline(
    name='Taxifare',
    description='Train a ml model to predict the taxi fare in NY')
def pipeline(gcs_bucket_name='<bucket where data and model will be exported>'):

    bq2gcs_op = comp.load_component_from_file(BQ2GCS_YAML)
    bq2gcs = bq2gcs_op(
        input_bucket=gcs_bucket_name,
    )

    trainjob_op = # TODO: Load the yaml file for training
    trainjob = # TODO: Add your code to run the training job

    deploymodel_op = # TODO: Load the yaml file for deployment
    deploymodel = # TODO: Add your code to run model deployment

    # TODO: Add the code to run 'trainjob' after 'bq2gcs' in the pipeline
    # TODO: Add the code to run 'deploymodel' after 'trainjob' in the pipeline
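If you get stuck, one possible completed version of this cell is sketched below (a sketch, not the only valid solution). It mirrors the bq2gcs step: each yaml file is loaded into an op, each op is called with the bucket name, and .after() fixes the execution order.
In [ ]:
# One possible solution.
@dsl.pipeline(
    name='Taxifare',
    description='Train a ml model to predict the taxi fare in NY')
def pipeline(gcs_bucket_name='<bucket where data and model will be exported>'):

    # Load each component description file into a Kubeflow op.
    bq2gcs_op = comp.load_component_from_file(BQ2GCS_YAML)
    bq2gcs = bq2gcs_op(
        input_bucket=gcs_bucket_name,
    )

    trainjob_op = comp.load_component_from_file(TRAINJOB_YAML)
    trainjob = trainjob_op(
        input_bucket=gcs_bucket_name,
    )

    deploymodel_op = comp.load_component_from_file(DEPLOYMODEL_YAML)
    deploymodel = deploymodel_op(
        input_bucket=gcs_bucket_name,
    )

    # Order the steps: train only after the data lands in GCS,
    # deploy only after training has exported a model.
    trainjob.after(bq2gcs)
    deploymodel.after(trainjob)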
The pipeline function above is then used by the Kubeflow compiler to create a Kubeflow pipeline artifact that can either be uploaded to the Kubeflow cluster from the UI, or programmatically, as we will do below:
In [ ]:
# TODO: Compile the pipeline function above
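One way to do this with the compiler module imported above; the call serializes the decorated function into the PIPELINE_TAR archive:
In [ ]:
# One possible solution: compile the pipeline function into a .tar.gz artifact.
compiler.Compiler().compile(pipeline, PIPELINE_TAR)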
In [ ]:
ls $PIPELINE_TAR
If you untar and unzip this pipeline artifact, you'll see that the compiler has transformed the Python description of the pipeline into a yaml description!
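For instance (assuming, as is typical for the kfp compiler, that the archive contains a single pipeline.yaml file):
In [ ]:
# Extract the compiled artifact and peek at the generated yaml.
!tar -xzf $PIPELINE_TAR && head pipeline.yaml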
Now let's feed Kubeflow with our pipeline and run it using our client:
In [ ]:
# TODO 4
run = client.run_pipeline(
    experiment_id= # TODO: Add code for experiment id
    job_name= # TODO: Provide a jobname
    pipeline_package_path= # TODO: Add code for pipeline zip file
    params={
        'gcs_bucket_name': BUCKET,
    },
)
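A possible completed call, reusing the experiment object, pipeline archive, and bucket defined earlier in this notebook:
In [ ]:
# One possible solution: launch a run of the compiled pipeline.
run = client.run_pipeline(
    experiment_id=exp.id,                # experiment created above
    job_name='taxifare',                 # any descriptive name for the run
    pipeline_package_path=PIPELINE_TAR,  # compiled pipeline artifact
    params={
        'gcs_bucket_name': BUCKET,
    },
)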
Have a look at the link to monitor the run.
Now all the runs are nicely organized under the experiment in the UI, and new runs can be either manually launched or scheduled through the UI in a completely repeatable and traceable way!