Learning Objective:
In this guided project, we will use the tfx template tool to create a TFX pipeline around the covertype dataset.
The goal is to adapt the template pipeline to use the model code for the covertype dataset that we already developed in the first part of this course.
In [ ]:
import os
Set up your Kubeflow Pipelines endpoint below the same way you did in guided project 1.
In [ ]:
ENDPOINT = ''  # Enter your Kubeflow ENDPOINT here.
In [ ]:
PATH=%env PATH
%env PATH={PATH}:/home/jupyter/.local/bin
In [ ]:
shell_output=!gcloud config list --format 'value(core.project)' 2>/dev/null
GOOGLE_CLOUD_PROJECT=shell_output[0]
%env GOOGLE_CLOUD_PROJECT={GOOGLE_CLOUD_PROJECT}
In [ ]:
# Docker image name for the pipeline image.
CUSTOM_TFX_IMAGE = 'gcr.io/' + GOOGLE_CLOUD_PROJECT + '/tfx-pipeline'
CUSTOM_TFX_IMAGE
In [ ]:
%%bash
TFX_PKG="tfx==0.22.0"
KFP_PKG="kfp==0.5.1"
pip freeze | grep $TFX_PKG || pip install -Uq $TFX_PKG
pip freeze | grep $KFP_PKG || pip install -Uq $KFP_PKG
You may need to restart the kernel at this point.
In [ ]:
%%bash
LOCAL_BIN="/home/jupyter/.local/bin"
SKAFFOLD_URI="https://storage.googleapis.com/skaffold/releases/latest/skaffold-linux-amd64"
test -d $LOCAL_BIN || mkdir -p $LOCAL_BIN
which skaffold || (
curl -Lo skaffold $SKAFFOLD_URI &&
chmod +x skaffold &&
mv skaffold $LOCAL_BIN
)
The PATH environment variable was modified earlier in this notebook so that skaffold is available.
At this point, you should be able to locate the skaffold tool with the which command:
In [ ]:
!which skaffold
In this step, we will create a working pipeline project directory and files by copying additional files from a predefined template.
You may give your pipeline a different name by changing the PIPELINE_NAME below.
This will also become the name of the project directory where your files will be put.
In [ ]:
PIPELINE_NAME = "guided_project_2"
PROJECT_DIR = os.path.join(os.path.expanduser("."), PIPELINE_NAME)
PROJECT_DIR
TFX includes the taxi template with the TFX Python package.
If you are planning to solve a point-wise prediction problem, including classification and regression, this template can be used as a starting point.
The tfx template copy CLI command copies predefined template files into your project directory.
In [ ]:
!tfx template copy \
--pipeline-name={PIPELINE_NAME} \
--destination-path={PROJECT_DIR} \
--model=taxi
In [ ]:
%cd {PROJECT_DIR}
The TFX template provides basic scaffold files to build a pipeline, including Python source code, sample data, and Jupyter Notebooks to analyse the output of the pipeline.
The taxi template uses the same Chicago Taxi dataset and ML model as the Airflow Tutorial.
Here is a brief introduction to each of the Python files:
- pipeline - This directory contains the definition of the pipeline.
  - configs.py: defines common constants for pipeline runners
  - pipeline.py: defines TFX components and a pipeline
- models - This directory contains ML model definitions.
  - features.py, features_test.py: defines features for the model
  - preprocessing.py, preprocessing_test.py: defines preprocessing jobs using tf::Transform
- models/estimator - This directory contains an Estimator based model.
  - constants.py: defines constants of the model
  - model.py, model_test.py: defines DNN model using TF estimator
- models/keras - This directory contains a Keras based model.
  - constants.py: defines constants of the model
  - model.py, model_test.py: defines DNN model using Keras
- beam_dag_runner.py, kubeflow_dag_runner.py: define runners for each orchestration engine
Running the tests:
You might notice that there are some files with _test.py in their name.
These are unit tests of the pipeline, and it is recommended to add more unit tests as you implement your own pipelines.
You can run unit tests by supplying the module name of the test files with the -m flag.
You can usually get a module name by deleting the .py extension and replacing each / with a dot (.).
For example:
In [ ]:
!python -m models.features_test
!python -m models.keras.model_test
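The path-to-module rule described above can be sketched as a small helper. This is only an illustration: the function name `path_to_module` is hypothetical and is not part of TFX or the template.

```python
import posixpath


def path_to_module(path):
    """Convert a relative .py file path into a dotted module name."""
    # Split off the extension and make sure this is a Python file.
    root, ext = posixpath.splitext(path)
    if ext != ".py":
        raise ValueError("expected a .py file, got: " + path)
    # Remove a leading "./" if present, then replace separators with dots.
    root = posixpath.normpath(root)
    return root.replace("/", ".")


print(path_to_module("models/features_test.py"))
print(path_to_module("models/keras/model_test.py"))
```

The printed names are the values you would pass to `python -m` from the project directory.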
Note: You have probably already completed this step in guided project 1, so you may skip it if this is the case.
Components in the TFX pipeline will generate outputs for each run as ML Metadata Artifacts, and they need to be stored somewhere. You can use any storage which the KFP cluster can access, and for this example we will use Google Cloud Storage (GCS).
Let us create this bucket if you haven't created it in guided project 1.
Its name will be <YOUR_PROJECT>-kubeflowpipelines-default.
In [ ]:
GCS_BUCKET_NAME = GOOGLE_CLOUD_PROJECT + '-kubeflowpipelines-default'
GCS_BUCKET_NAME
In [ ]:
!gsutil ls gs://{GCS_BUCKET_NAME} | grep {GCS_BUCKET_NAME} || gsutil mb gs://{GCS_BUCKET_NAME}
Now we need to have the TFX pipeline dataset point to the GCS bucket where our covertype data is located. To do that, open kubeflow_dag_runner.py and set the variable DATA_PATH to gs://workshop-datasets/covertype/small, which contains the covertype dataset.
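After the edit, the assignment in kubeflow_dag_runner.py might look like the sketch below. The variable name and the bucket path come from the instructions above; the surrounding code in your generated file may differ depending on the template version.

```python
# In kubeflow_dag_runner.py: point the pipeline at the covertype data on GCS.
DATA_PATH = 'gs://workshop-datasets/covertype/small'
```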
At this step we want to reuse the features.py and preprocessing.py files we already have for the covertype dataset. To do that, we will just copy these files over the template ones:
In [ ]:
FEATURE_PY = '../../tfx-caip-tf21/lab-02-tfx-pipeline/pipeline/features.py'
PREPROC_PY = '../../tfx-caip-tf21/lab-02-tfx-pipeline/pipeline/preprocessing.py'
In [ ]:
!cp {FEATURE_PY} ./models/features.py
!cp {PREPROC_PY} ./models/preprocessing.py
Now, when you run the tests in the two cells below, they should fail because they were written for the template taxi dataset.
Exercise: Modify the tests features_test.py and preprocessing_test.py, as well as possibly the original modules, until the tests pass.
In [ ]:
!python -m models.features_test
In [ ]:
!python -m models.preprocessing_test
As with the preprocessing, we want to reuse the model we developed for the covertype dataset, so we will simply copy it over the template model:
In [ ]:
MODEL_PY = '../../tfx-caip-tf21/lab-02-tfx-pipeline/pipeline/model.py'
In [ ]:
!cp {MODEL_PY} ./models/keras/model.py
Exercise: Now run the tests for the model. Again they should fail, and you'll have to modify them, as well as possibly the model module, to make them pass. (Optional: Move the constants defined in model.py into constants.py and import them from model.py to respect the template structure.)
In [ ]:
!python -m models.keras.model_test
Let's create a TFX pipeline using the tfx pipeline create command.
Note: When creating a pipeline for KFP, we need a container image which will be used to run our pipeline, and skaffold will build the image for us. Because skaffold pulls base images from Docker Hub, the first build will take 5 to 10 minutes, but subsequent builds will take much less time.
In [ ]:
!tfx pipeline create \
--pipeline-path=kubeflow_dag_runner.py \
--endpoint={ENDPOINT} \
--build-target-image={CUSTOM_TFX_IMAGE}
While creating a pipeline, Dockerfile and build.yaml will be generated to build a Docker image.
Don't forget to add these files to the source control system (for example, git) along with other source files.
A pipeline definition file for argo will be generated, too.
The name of this file is ${PIPELINE_NAME}.tar.gz.
For example, it will be my_pipeline.tar.gz if the name of your pipeline is my_pipeline.
It is recommended NOT to include this pipeline definition file in source control, because it is generated from other Python files and is updated whenever you update the pipeline. For your convenience, this file is already listed in .gitignore, which is generated automatically.
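As a quick check of the naming rule above, the sketch below derives the expected file name from the pipeline name and looks for it. It assumes the file is written to the current working directory (the project directory), which may not hold for every setup.

```python
import os

# Same value as set earlier in this notebook.
PIPELINE_NAME = "guided_project_2"

# The generated pipeline definition file is named after the pipeline.
pipeline_package = PIPELINE_NAME + ".tar.gz"

# Assumption: the file lands in the current working directory.
print(pipeline_package, "exists:", os.path.exists(pipeline_package))
```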
Now start an execution run with the newly created pipeline using the tfx run create command.
Note: You may see the following error: Error importing tfx_bsl_extension.coders. Please ignore it.
In [ ]:
!tfx run create --pipeline-name={PIPELINE_NAME} --endpoint={ENDPOINT}
You can also run the pipeline from the KFP Dashboard. The new execution run will be listed under Experiments in the KFP Dashboard. Clicking into the experiment will allow you to monitor progress and visualize the artifacts created during the execution run.
We recommend visiting the KFP Dashboard, which you can access from the Cloud AI Platform Pipelines menu in Google Cloud Console. There you will be able to find the pipeline and access a wealth of information about it. For example, you can find your runs under the Experiments menu, and when you open your execution run under Experiments you can find all your artifacts from the pipeline under the Artifacts menu.
Note: If your pipeline run fails, you can see detailed logs for each TFX component in the Experiments tab in the KFP Dashboard.
One of the major sources of failure is permission-related problems. Please make sure your KFP cluster has permissions to access Google Cloud APIs. This can be configured when you create a KFP cluster in GCP; otherwise, see the Troubleshooting document in GCP.
In this step, you will add components for data validation including StatisticsGen, SchemaGen, and ExampleValidator.
If you are interested in data validation, please see Get started with Tensorflow Data Validation.
Double-click to change directory to pipeline and double-click again to open pipeline.py.
Find and uncomment the 3 lines which add StatisticsGen, SchemaGen, and ExampleValidator to the pipeline.
(Tip: search for comments containing TODO(step 5):). Make sure to save pipeline.py after you edit it.
You now need to update the existing pipeline with the modified pipeline definition. Use the tfx pipeline update command to update your pipeline, followed by the tfx run create command to create a new execution run of your updated pipeline.
You now need to update the existing pipeline with modified pipeline definition. Use the tfx pipeline update
command to update your pipeline, followed by the tfx run create
command to create a new execution run of your updated pipeline.
In [ ]:
# Update the pipeline
!tfx pipeline update \
--pipeline-path=kubeflow_dag_runner.py \
--endpoint={ENDPOINT}
In [ ]:
# You can run the pipeline the same way.
!tfx run create --pipeline-name {PIPELINE_NAME} --endpoint={ENDPOINT}
In [ ]:
print('https://' + ENDPOINT)
In this step, you will add components for training and model validation including Transform, Trainer, ResolverNode, Evaluator, and Pusher.
Double-click to open pipeline.py. Find and uncomment the 5 lines which add Transform, Trainer, ResolverNode, Evaluator, and Pusher to the pipeline. (Tip: search for TODO(step 6):)
Hints:
- In pipeline.py, make sure you turn the cache off by setting enable_cache=False for debugging purposes (otherwise components that have been previously run won't be run again).
- In pipeline.py, you'll need to set infer_feature_shape=False, otherwise you'll run into a sparse/dense tensor mismatch.
- In model.py, you'll have to comment out the following line: transformed_features.pop(features.transformed_name(features.LABEL_KEY))
- In configs.py, adapt the values of the variables TRAIN_NUM_STEPS and EVAL_NUM_STEPS to match what we had in the original covertype pipeline.
As you did before, you now need to update the existing pipeline with the modified pipeline definition. The instructions are the same as Step 5. Update the pipeline using tfx pipeline update, and create an execution run using tfx run create.
Verify that the pipeline DAG has changed accordingly in the Kubeflow UI:
In [ ]:
!tfx pipeline update \
--pipeline-path=kubeflow_dag_runner.py \
--endpoint={ENDPOINT}
In [ ]:
print("https://" + ENDPOINT)
In [ ]:
!tfx run create --pipeline-name {PIPELINE_NAME} --endpoint={ENDPOINT}
When this execution run finishes successfully, you have now created and run your first TFX pipeline in AI Platform Pipelines!
TFX interoperates with several managed GCP services, such as Cloud AI Platform for Training and Prediction. You can set your Trainer component to use Cloud AI Platform Training, a managed service for training ML models. Moreover, when your model is built and ready to be served, you can push your model to Cloud AI Platform Prediction for serving. In this step, we will set our Trainer and Pusher components to use Cloud AI Platform services.
Before editing files, you might first have to enable AI Platform Training & Prediction API.
Double-click pipeline to change directory, and double-click to open configs.py. Uncomment the definitions of GOOGLE_CLOUD_REGION, GCP_AI_PLATFORM_TRAINING_ARGS, and GCP_AI_PLATFORM_SERVING_ARGS. We will use our custom-built container image to train a model in Cloud AI Platform Training, so we should set masterConfig.imageUri in GCP_AI_PLATFORM_TRAINING_ARGS to the same value as CUSTOM_TFX_IMAGE above.
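The resulting training configuration might look roughly like the sketch below. The project ID and region are hypothetical placeholders, and the 'project' and 'region' keys are an assumption about what the template's configs.py contains; only masterConfig.imageUri and the image name construction are taken from this notebook.

```python
# Hypothetical placeholder values; replace them with your own project and region.
GOOGLE_CLOUD_PROJECT = 'my-gcp-project'
GOOGLE_CLOUD_REGION = 'us-central1'

# Same construction as CUSTOM_TFX_IMAGE earlier in this notebook.
CUSTOM_TFX_IMAGE = 'gcr.io/' + GOOGLE_CLOUD_PROJECT + '/tfx-pipeline'

GCP_AI_PLATFORM_TRAINING_ARGS = {
    # Assumed keys; check them against your generated configs.py.
    'project': GOOGLE_CLOUD_PROJECT,
    'region': GOOGLE_CLOUD_REGION,
    # Train with the custom container image built for this pipeline.
    'masterConfig': {
        'imageUri': CUSTOM_TFX_IMAGE,
    },
}
```

Keeping the image URI in a single variable avoids a mismatch between the image that skaffold builds and the one that the training job pulls.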
Change directory one level up, and double-click to open kubeflow_dag_runner.py. Uncomment ai_platform_training_args and ai_platform_serving_args.
Update the pipeline and create an execution run as we did in steps 5 and 6.
In [ ]:
!tfx pipeline update \
--pipeline-path=kubeflow_dag_runner.py \
--endpoint={ENDPOINT}
In [ ]:
!tfx run create --pipeline-name {PIPELINE_NAME} --endpoint={ENDPOINT}
You can find your training jobs in Cloud AI Platform Jobs. If your pipeline completed successfully, you can find your model in Cloud AI Platform Models.
Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at https://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.