Guided Project 1

Learning Objectives:

  • Learn how to generate a standard TFX template pipeline using tfx template
  • Learn how to modify and run a templated TFX pipeline

Note: This guided project is adapted from Create a TFX pipeline using templates.


In [ ]:
import os

Step 1. Environment setup

tfx and kfp tools setup


In [ ]:
%%bash

TFX_PKG="tfx==0.22.0"
KFP_PKG="kfp==0.5.1"

pip freeze | grep $TFX_PKG || pip install -Uq $TFX_PKG
pip freeze | grep $KFP_PKG || pip install -Uq $KFP_PKG

You may need to restart the kernel at this point.

skaffold tool setup


In [ ]:
PATH=%env PATH
%env PATH={PATH}:/home/jupyter/.local/bin

In [ ]:
%%bash

LOCAL_BIN="/home/jupyter/.local/bin"
SKAFFOLD_URI="https://storage.googleapis.com/skaffold/releases/latest/skaffold-linux-amd64"

test -d $LOCAL_BIN || mkdir -p $LOCAL_BIN

which skaffold || (
    curl -Lo skaffold $SKAFFOLD_URI &&
    chmod +x skaffold               &&
    mv skaffold $LOCAL_BIN
)

The PATH environment variable was already updated above so that skaffold is available.

At this point, you should see the skaffold tool with the which command:


In [ ]:
!which skaffold

Environment variable setup

In AI Platform Pipelines, TFX is running in a hosted Kubernetes environment using Kubeflow Pipelines.

Let's set some environment variables to use Kubeflow Pipelines.

First, get your GCP project ID.


In [ ]:
shell_output=!gcloud config list --format 'value(core.project)' 2>/dev/null
GOOGLE_CLOUD_PROJECT=shell_output[0]

%env GOOGLE_CLOUD_PROJECT={GOOGLE_CLOUD_PROJECT}

We also need to access your KFP cluster. You can access it in your Google Cloud Console under the "AI Platform > Pipelines" menu.

The "endpoint" of the KFP cluster can be found from the URL of the Pipelines dashboard, or you can get it from the URL of the Getting Started page where you launched this notebook.

Let's create an ENDPOINT environment variable and set it to the KFP cluster endpoint.

ENDPOINT should contain only the hostname part of the URL. For example, if the URL of the KFP dashboard is

https://1e9deb537390ca22-dot-asia-east1.pipelines.googleusercontent.com/#/start,

ENDPOINT value becomes 1e9deb537390ca22-dot-asia-east1.pipelines.googleusercontent.com.
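If you prefer to derive it programmatically, here is a minimal sketch that extracts the hostname part from the full dashboard URL (the DASHBOARD_URL value below is only an illustration; paste your own URL):

from urllib.parse import urlparse

# Hypothetical example URL; replace it with the URL of your own KFP dashboard.
DASHBOARD_URL = "https://1e9deb537390ca22-dot-asia-east1.pipelines.googleusercontent.com/#/start"

# urlparse().netloc keeps only the hostname, which is exactly what ENDPOINT needs.
print(urlparse(DASHBOARD_URL).netloc)
# -> 1e9deb537390ca22-dot-asia-east1.pipelines.googleusercontent.com

Otherwise, simply paste the hostname into the cell below.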


In [ ]:
ENDPOINT = ''  # Enter your ENDPOINT here.

Set the image name as tfx-pipeline under the current GCP project:


In [ ]:
# Docker image name for the pipeline image.
CUSTOM_TFX_IMAGE = 'gcr.io/' + GOOGLE_CLOUD_PROJECT + '/tfx-pipeline'
CUSTOM_TFX_IMAGE

Step 2. Copy the predefined template to your project directory.

In this step, we will create a working pipeline project directory and files by copying additional files from a predefined template.

You may give your pipeline a different name by changing the PIPELINE_NAME below.

This will also become the name of the project directory where your files will be put.


In [ ]:
PIPELINE_NAME = "guided_project_1"
PROJECT_DIR = os.path.join(os.path.expanduser("."), PIPELINE_NAME)
PROJECT_DIR

TFX includes the taxi template with the TFX python package.

If you are planning to solve a point-wise prediction problem, such as classification or regression, this template can be used as a starting point.

The tfx template copy CLI command copies predefined template files into your project directory.


In [ ]:
!tfx template copy \
  --pipeline-name={PIPELINE_NAME} \
  --destination-path={PROJECT_DIR} \
  --model=taxi

In [ ]:
%cd {PROJECT_DIR}

Step 3. Browse your copied source files

The TFX template provides basic scaffold files to build a pipeline, including Python source code, sample data, and Jupyter Notebooks to analyse the output of the pipeline.

The taxi template uses the same Chicago Taxi dataset and ML model as the Airflow Tutorial.

Here is a brief introduction to each of the Python files:

pipeline - This directory contains the definition of the pipeline.

  • configs.py — defines common constants for pipeline runners
  • pipeline.py — defines TFX components and a pipeline

models - This directory contains ML model definitions.

  • features.py, features_test.py — define the features for the model
  • preprocessing.py, preprocessing_test.py — define preprocessing jobs using tf.Transform

models/estimator - This directory contains an Estimator-based model.

  • constants.py — defines constants of the model
  • model.py, model_test.py — define a DNN model using the TF Estimator API

models/keras - This directory contains a Keras-based model.

  • constants.py — defines constants of the model
  • model.py, model_test.py — define a DNN model using Keras

beam_dag_runner.py, kubeflow_dag_runner.py — define runners for each orchestration engine

Running the tests: You might notice that there are some files with _test.py in their name. These are unit tests of the pipeline, and it is recommended to add more unit tests as you implement your own pipelines. You can run unit tests by supplying the module name of a test file with the -m flag. You can usually get the module name by deleting the .py extension and replacing / with ..

For example:


In [ ]:
!python -m models.features_test
!python -m models.keras.model_test

Let's quickly go over the structure of a test file for TensorFlow code:


In [10]:
!tail -26 models/features_test.py


from __future__ import absolute_import
from __future__ import division
from __future__ import print_function

import tensorflow as tf

from models import features


class FeaturesTest(tf.test.TestCase):

  def testNumberOfBucketFeatureBucketCount(self):
    self.assertEqual(
        len(features.BUCKET_FEATURE_KEYS),
        len(features.BUCKET_FEATURE_BUCKET_COUNT))
    self.assertEqual(
        len(features.CATEGORICAL_FEATURE_KEYS),
        len(features.CATEGORICAL_FEATURE_MAX_VALUES))

  def testTransformedNames(self):
    names = ["f1", "cf"]
    self.assertEqual(["f1_xf", "cf_xf"], features.transformed_names(names))


if __name__ == "__main__":
  tf.test.main()

First of all, notice that you import the code you want to test by importing the corresponding module. Here we want to test the code in features.py, so we import the features module:

from models import features

To implement test cases, start by defining your own test class inheriting from tf.test.TestCase:

class FeaturesTest(tf.test.TestCase):

When you execute the test file with

python -m models.features_test

the main method

tf.test.main()

will parse your test class (here: FeaturesTest) and execute every method whose name starts with test. Here we have two such methods, for instance:

def testNumberOfBucketFeatureBucketCount(self):
def testTransformedNames(self):

So when you want to add a test case, just add a method whose name starts with test to that test class. The actual testing takes place inside the body of these test methods. Here, for instance, testTransformedNames tests the function features.transformed_names and makes sure it outputs what is expected. Since your test class inherits from tf.test.TestCase, it has a number of helper methods you can use to write tests, such as

self.assertEqual(expected_outputs, obtained_outputs)

which will fail the test case if obtained_outputs do not match expected_outputs.

Typical test cases you may want to implement for machine learning code include tests ensuring that your model builds correctly, that your preprocessing function transforms raw data as expected, or that your model can train successfully on a few mock examples. When writing tests, make sure their execution is fast (we just want to check that the code works, not train a performant model). For that, you may have to create synthetic data in your test files. For more information, read the tf.test.TestCase documentation and the TensorFlow testing best practices.
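For instance, here is a minimal sketch (not part of the template) of a test that checks that a tiny Keras model can train on a couple of synthetic examples:

import numpy as np
import tensorflow as tf


class TinyModelTest(tf.test.TestCase):

  def testModelTrainsOnSyntheticData(self):
    # Two synthetic examples with three features each, and binary labels.
    x = np.random.rand(2, 3).astype("float32")
    y = np.array([[0.0], [1.0]], dtype="float32")

    model = tf.keras.Sequential([
        tf.keras.layers.Dense(4, activation="relu", input_shape=(3,)),
        tf.keras.layers.Dense(1, activation="sigmoid"),
    ])
    model.compile(optimizer="adam", loss="binary_crossentropy")

    # A single epoch on two examples keeps the test fast.
    history = model.fit(x, y, epochs=1, verbose=0)
    self.assertIn("loss", history.history)


if __name__ == "__main__":
  tf.test.main()

Saved as, say, models/tiny_model_test.py, this test would run with python -m models.tiny_model_test.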

Step 4. Run your first TFX pipeline

Components in the TFX pipeline will generate outputs for each run as ML Metadata Artifacts, and they need to be stored somewhere. You can use any storage which the KFP cluster can access, and for this example we will use Google Cloud Storage (GCS).

Let us create this bucket. Its name will be <YOUR_PROJECT>-kubeflowpipelines-default.


In [ ]:
GCS_BUCKET_NAME = GOOGLE_CLOUD_PROJECT + '-kubeflowpipelines-default'
GCS_BUCKET_NAME

In [ ]:
!gsutil mb gs://{GCS_BUCKET_NAME}

Let's upload our sample data to the GCS bucket so that we can use it in our pipeline later.


In [ ]:
!gsutil cp data/data.csv gs://{GCS_BUCKET_NAME}/tfx-template/data/data.csv

Let's create a TFX pipeline using the tfx pipeline create command.

Note: When creating a pipeline for KFP, we need a container image which will be used to run our pipeline, and skaffold will build the image for us. Because skaffold pulls base images from Docker Hub, the first build will take 5~10 minutes, but subsequent builds will take much less time.


In [ ]:
!tfx pipeline create  \
--pipeline-path=kubeflow_dag_runner.py \
--endpoint={ENDPOINT} \
--build-target-image={CUSTOM_TFX_IMAGE}

While creating the pipeline, a Dockerfile and a build.yaml will be generated to build a Docker image.

Don't forget to add these files to the source control system (for example, git) along with other source files.

A pipeline definition file for Argo will be generated, too. The name of this file is ${PIPELINE_NAME}.tar.gz; in our case it will be guided_project_1.tar.gz. It is recommended NOT to include this pipeline definition file in source control, because it will be generated from other Python files and will be updated whenever you update the pipeline. For your convenience, this file is already listed in the .gitignore file which is generated automatically.

Now start an execution run with the newly created pipeline using the tfx run create command.

Note: You may see the following error Error importing tfx_bsl_extension.coders. Please ignore it.

Debugging tip: If your pipeline run fails, you can see detailed logs for each TFX component in the Experiments tab in the KFP Dashboard. One of the major sources of failure is permission-related problems. Please make sure your KFP cluster has permissions to access Google Cloud APIs. This can be configured when you create a KFP cluster in GCP, or see the Troubleshooting document in GCP.


In [ ]:
!tfx run create --pipeline-name={PIPELINE_NAME} --endpoint={ENDPOINT}

Alternatively, you can start a run from the KFP Dashboard. The new execution run will be listed under Experiments in the KFP Dashboard, and clicking into the experiment lets you monitor progress and visualize the artifacts created during the execution run.

In fact, we recommend visiting the KFP Dashboard. You can access it from the Cloud AI Platform Pipelines menu in the Google Cloud Console. Once you visit the dashboard, you will be able to find the pipeline and access a wealth of information about it. For example, you can find your runs under the Experiments menu, and when you open an execution run under Experiments you can find all the artifacts from the pipeline under the Artifacts menu.

Step 5. Add components for data validation.

In this step, you will add components for data validation, including StatisticsGen, SchemaGen, and ExampleValidator. If you are interested in data validation, please see Get started with TensorFlow Data Validation.

Double-click to change directory to pipeline and double-click again to open pipeline.py. Find and uncomment the 3 lines which add StatisticsGen, SchemaGen, and ExampleValidator to the pipeline. (Tip: search for comments containing TODO(step 5):). Make sure to save pipeline.py after you edit it.
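Once uncommented, the three data validation components are wired together roughly as follows (a sketch of the template code; exact arguments may differ slightly depending on your TFX version):

# Computes statistics over the ingested examples.
statistics_gen = StatisticsGen(examples=example_gen.outputs['examples'])

# Infers a schema from the statistics.
schema_gen = SchemaGen(
    statistics=statistics_gen.outputs['statistics'],
    infer_feature_shape=False)

# Flags anomalies by checking new statistics against the schema.
example_validator = ExampleValidator(
    statistics=statistics_gen.outputs['statistics'],
    schema=schema_gen.outputs['schema'])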

You now need to update the existing pipeline with the modified pipeline definition. Use the tfx pipeline update command to update your pipeline, followed by the tfx run create command to create a new execution run of your updated pipeline.


In [ ]:
# Update the pipeline
!tfx pipeline update \
--pipeline-path=kubeflow_dag_runner.py \
--endpoint={ENDPOINT}

In [ ]:
# You can run the pipeline the same way.
!tfx run create --pipeline-name {PIPELINE_NAME} --endpoint={ENDPOINT}

Check pipeline outputs

Visit the KFP dashboard to find pipeline outputs in the page for your pipeline run. Click the Experiments tab on the left, and All runs in the Experiments page. You should be able to find the latest run under the name of your pipeline.

See link below to access the dashboard:


In [ ]:
print('https://' + ENDPOINT)

Step 6. Add components for training

In this step, you will add components for training and model validation including Transform, Trainer, ResolverNode, Evaluator, and Pusher.

Double-click to open pipeline.py. Find and uncomment the 5 lines which add Transform, Trainer, ResolverNode, Evaluator and Pusher to the pipeline. (Tip: search for TODO(step 6):)
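For reference, the uncommented components follow roughly this pattern in the template (a sketch; preprocessing_fn, run_fn, train_args, and eval_args come from the template's configuration, and the template also sets a custom executor on the Trainer):

# Feature engineering with TensorFlow Transform.
transform = Transform(
    examples=example_gen.outputs['examples'],
    schema=schema_gen.outputs['schema'],
    preprocessing_fn=preprocessing_fn)

# Trains the model defined under models/ on the transformed examples.
trainer = Trainer(
    run_fn=run_fn,
    transformed_examples=transform.outputs['transformed_examples'],
    transform_graph=transform.outputs['transform_graph'],
    schema=schema_gen.outputs['schema'],
    train_args=train_args,
    eval_args=eval_args)

# ResolverNode looks up the latest blessed model, Evaluator compares the new
# model against it, and Pusher exports the new model only if it was blessed.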

As you did before, you now need to update the existing pipeline with the modified pipeline definition. The instructions are the same as Step 5. Update the pipeline using tfx pipeline update, and create an execution run using tfx run create.

Verify that the pipeline DAG has changed accordingly in the Kubeflow UI:


In [ ]:
!tfx pipeline update \
--pipeline-path=kubeflow_dag_runner.py \
--endpoint={ENDPOINT}

In [ ]:
print("https://" + ENDPOINT)

In [ ]:
!tfx run create --pipeline-name {PIPELINE_NAME} --endpoint={ENDPOINT}

When this execution run finishes successfully, you have now created and run your first TFX pipeline in AI Platform Pipelines!

Step 7. Try BigQueryExampleGen

BigQuery is a serverless, highly scalable, and cost-effective cloud data warehouse. BigQuery can be used as a source for training examples in TFX. In this step, we will add BigQueryExampleGen to the pipeline.

Double-click to open pipeline.py. Comment out CsvExampleGen and uncomment the line which creates an instance of BigQueryExampleGen. You also need to uncomment the query argument of the create_pipeline function.

We need to specify which GCP project to use for BigQuery, and this is done by setting --project in beam_pipeline_args when creating a pipeline.

Double-click to open configs.py. Uncomment the definitions of GOOGLE_CLOUD_REGION, BIG_QUERY_WITH_DIRECT_RUNNER_BEAM_PIPELINE_ARGS and BIG_QUERY_QUERY. You should replace the region value in this file with the correct value for your GCP project.

Note: You MUST set your GCP region in the configs.py file before proceeding
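After uncommenting, the BigQuery-related settings in configs.py look roughly like this (a sketch; the region is an example, and GOOGLE_CLOUD_PROJECT and GCS_BUCKET_NAME are resolved elsewhere in the template):

GOOGLE_CLOUD_REGION = 'us-central1'  # Replace with your own region.

# Beam pipeline args telling the runner which GCP project and temp location
# to use when reading training examples from BigQuery.
BIG_QUERY_WITH_DIRECT_RUNNER_BEAM_PIPELINE_ARGS = [
    '--project=' + GOOGLE_CLOUD_PROJECT,
    '--temp_location=' + os.path.join('gs://', GCS_BUCKET_NAME, 'tmp'),
]

# Query selecting training examples from the public Chicago Taxi dataset
# (abbreviated here; the template ships with the full query).
BIG_QUERY_QUERY = """
    SELECT ...
    FROM `bigquery-public-data.chicago_taxi_trips.taxi_trips`
"""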

Change directory one level up. Click the name of the directory above the file list. The name of the directory is the name of the pipeline, which is guided_project_1 if you didn't change it.

Double-click to open kubeflow_dag_runner.py. Uncomment two arguments, query and beam_pipeline_args, for the create_pipeline function.

Now the pipeline is ready to use BigQuery as an example source. Update the pipeline as before and create a new execution run as we did in steps 5 and 6.


In [ ]:
!tfx pipeline update \
--pipeline-path=kubeflow_dag_runner.py \
--endpoint={ENDPOINT}

In [ ]:
!tfx run create --pipeline-name {PIPELINE_NAME} --endpoint={ENDPOINT}

Step 8. Try Dataflow with KFP

Several TFX components use Apache Beam to implement data-parallel pipelines, which means that you can distribute data processing workloads using Google Cloud Dataflow. In this step, we will set the Kubeflow orchestrator to use Dataflow as the data processing back-end for Apache Beam.

Double-click pipeline to change directory, and double-click to open configs.py. Uncomment the definitions of GOOGLE_CLOUD_REGION and DATAFLOW_BEAM_PIPELINE_ARGS.
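The uncommented Dataflow settings look roughly like this (a sketch; adjust the region and bucket to your own project):

GOOGLE_CLOUD_REGION = 'us-central1'  # Replace with your own region.

# Beam pipeline args that switch data processing from the local runner
# to Google Cloud Dataflow.
DATAFLOW_BEAM_PIPELINE_ARGS = [
    '--project=' + GOOGLE_CLOUD_PROJECT,
    '--runner=DataflowRunner',
    '--temp_location=' + os.path.join('gs://', GCS_BUCKET_NAME, 'tmp'),
    '--region=' + GOOGLE_CLOUD_REGION,
]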

Double-click to open pipeline.py. Change the value of enable_cache to False.

Change directory one level up. Click the name of the directory above the file list. The name of the directory is the name of the pipeline, which is guided_project_1 if you didn't change it.

Double-click to open kubeflow_dag_runner.py. Uncomment beam_pipeline_args. (Also make sure to comment out the beam_pipeline_args that you added in Step 7.)

Note that we deliberately disabled caching: because we have already run the pipeline successfully, we would get cached execution results for all components if caching were enabled.

Now the pipeline is ready to use Dataflow. Update the pipeline and create an execution run as we did in steps 5 and 6.


In [ ]:
!tfx pipeline update \
--pipeline-path=kubeflow_dag_runner.py \
--endpoint={ENDPOINT}

In [ ]:
!tfx run create --pipeline-name {PIPELINE_NAME} --endpoint={ENDPOINT}

You can find your Dataflow jobs in Dataflow in Cloud Console.

Once you are done, re-enable caching to benefit from cached execution results: double-click to open pipeline.py and reset the value of enable_cache to True.

Step 9. Try Cloud AI Platform Training and Prediction with KFP

TFX interoperates with several managed GCP services, such as Cloud AI Platform for Training and Prediction. You can set your Trainer component to use Cloud AI Platform Training, a managed service for training ML models. Moreover, when your model is built and ready to be served, you can push it to Cloud AI Platform Prediction for serving. In this step, we will set our Trainer and Pusher components to use Cloud AI Platform services.

Before editing the files, you might first have to enable the AI Platform Training & Prediction API.

Double-click pipeline to change directory, and double-click to open configs.py. Uncomment the definitions of GOOGLE_CLOUD_REGION, GCP_AI_PLATFORM_TRAINING_ARGS and GCP_AI_PLATFORM_SERVING_ARGS. We will use our custom-built container image to train a model in Cloud AI Platform Training, so we should set masterConfig.imageUri in GCP_AI_PLATFORM_TRAINING_ARGS to the same value as CUSTOM_TFX_IMAGE above.
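After uncommenting, those settings look roughly like this (a sketch; the field names follow the AI Platform Training and Prediction request formats used by the template):

# Configuration for the AI Platform Training job launched by the Trainer.
GCP_AI_PLATFORM_TRAINING_ARGS = {
    'project': GOOGLE_CLOUD_PROJECT,
    'region': GOOGLE_CLOUD_REGION,
    # Use the same custom image we built for the pipeline.
    'masterConfig': {
        'imageUri': 'gcr.io/' + GOOGLE_CLOUD_PROJECT + '/tfx-pipeline',
    },
}

# Configuration for the AI Platform Prediction model created by the Pusher.
GCP_AI_PLATFORM_SERVING_ARGS = {
    'model_name': PIPELINE_NAME,
    'project_id': GOOGLE_CLOUD_PROJECT,
    'regions': [GOOGLE_CLOUD_REGION],
}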

Change directory one level up, and double-click to open kubeflow_dag_runner.py. Uncomment ai_platform_training_args and ai_platform_serving_args.

Update the pipeline and create an execution run as we did in steps 5 and 6.


In [ ]:
!tfx pipeline update \
--pipeline-path=kubeflow_dag_runner.py \
--endpoint={ENDPOINT}

In [ ]:
!tfx run create --pipeline-name {PIPELINE_NAME} --endpoint={ENDPOINT}

You can find your training jobs in Cloud AI Platform Jobs. If your pipeline completed successfully, you can find your model in Cloud AI Platform Models.

License

Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at https://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.