TensorFlow Extended (TFX) is a Google-production-scale machine learning platform based on TensorFlow. It provides a configuration framework for expressing ML pipelines as a set of TFX components, which gives the user large-scale ML task orchestration, artifact lineage, and access to the various TFX libraries. Kubeflow Pipelines (KFP) can be used as the orchestrator that executes a TFX pipeline.
This sample demonstrates how to author an ML pipeline in TFX and run it on a KFP deployment.
This pipeline requires Google Cloud Storage permission to run.
If KFP was deployed through the Kubernetes (K8s) marketplace, make sure "Allow access to the following Cloud APIs" is checked when creating the cluster. The service account that runs the pipeline needs the storage.admin role.
In [ ]:
!python3 -m pip install pip --upgrade --quiet --user
!python3 -m pip install kfp --upgrade --quiet --user
!python3 -m pip install tfx==0.21.2 --quiet --user
Note: if you see the following warning:
WARNING: The script {LIBRARY_NAME} is installed in '/home/jupyter/.local/bin' which is not on PATH.
you may need to fix it by running the next cell and then restarting the kernel.
In [ ]:
# Set `PATH` to include the user Python binary directory.
PATH=%env PATH
%env PATH={PATH}:/home/jupyter/.local/bin
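After restarting the kernel, it can be useful to confirm that the directory from the warning is actually on `PATH`. The following is a minimal sketch using only the standard library; `path_contains` is a hypothetical helper name, and the directory is the one quoted in the warning above.

```python
import os


def path_contains(path_value, directory):
    """Return True if `directory` is one of the entries of a PATH-style string."""
    wanted = os.path.normpath(directory)
    # Skip empty entries, which appear when PATH has a leading or trailing separator.
    entries = [entry for entry in path_value.split(os.pathsep) if entry]
    return any(os.path.normpath(entry) == wanted for entry in entries)


# Directory named in the pip warning; adjust it if your environment differs.
user_bin = '/home/jupyter/.local/bin'
print(path_contains(os.environ.get('PATH', ''), user_bin))
```

If this prints `False`, the `%env` cell has not taken effect in the current kernel.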
This example requires a TFX SDK release later than 0.21 in order to use the RuntimeParameter feature.
Currently, the TFX DSL only supports parameterizing fields in the PARAMETERS section of a ComponentSpec. This means the pipeline topology itself cannot be parameterized at runtime. Also, if the declared type of a field is a protobuf, the user needs to pass in a dictionary whose keys exactly match the names of the protobuf's fields, with one or more of the values given as RuntimeParameter objects. In other words, the dictionary must be one that the ParseDict() method can accept and turn into the correct protobuf message.
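The name-matching requirement can be illustrated without protobuf at all. The sketch below is a simplified stand-in, not the protobuf implementation: `check_field_names` and `TRAIN_ARGS_FIELDS` are hypothetical names, and the field list is modelled only on the `num_steps` key that this notebook later passes to the Trainer.

```python
def check_field_names(declared_fields, param_dict):
    """Raise ValueError if `param_dict` uses a key that is not a declared field."""
    unknown = sorted(set(param_dict) - set(declared_fields))
    if unknown:
        raise ValueError('Unknown field(s): %s' % ', '.join(unknown))
    return param_dict


# Hypothetical field list for illustration; a real message declares its
# fields in its .proto definition.
TRAIN_ARGS_FIELDS = {'num_steps'}

# A key that matches a declared field is accepted.
check_field_names(TRAIN_ARGS_FIELDS, {'num_steps': 10})

# A misspelled key is rejected, much as parsing a dictionary into a
# protobuf message rejects names the message does not declare.
try:
    check_field_names(TRAIN_ARGS_FIELDS, {'steps': 10})
except ValueError as err:
    print(err)
```

The practical consequence is that a typo in a dictionary key is an error rather than a silently ignored setting.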
In [ ]:
import os
from typing import Text
import kfp
import tensorflow_model_analysis as tfma
from tfx.components import Evaluator
from tfx.components import CsvExampleGen
from tfx.components import ExampleValidator
from tfx.components import Pusher
from tfx.components import SchemaGen
from tfx.components import StatisticsGen
from tfx.components import Trainer
from tfx.components import Transform
from tfx.orchestration import data_types
from tfx.orchestration import pipeline
from tfx.orchestration.kubeflow import kubeflow_dag_runner
from tfx.proto import pusher_pb2
from tfx.utils.dsl_utils import external_input
In [ ]:
# In TFX MLMD schema, pipeline name is used as the unique id of each pipeline.
# Assigning workflow ID as part of pipeline name allows the user to bypass
# some schema checks which are redundant for experimental pipelines.
pipeline_name = 'taxi_pipeline_with_parameters'
# Path of pipeline data root, should be a GCS path.
# Note that when running on KFP, the pipeline root is always a runtime parameter.
# The value specified here will be its default.
pipeline_root = os.path.join('gs://{{kfp-default-bucket}}', 'tfx_taxi_simple',
                             kfp.dsl.RUN_ID_PLACEHOLDER)
# Location of input data, should be a GCS path under which there is a csv file.
data_root_param = data_types.RuntimeParameter(
    name='data-root',
    default='gs://ml-pipeline-playground/tfx_taxi_simple/data',
    ptype=Text,
)
# Path to the module file, should be a GCS path.
# A module file is one of the recommended ways to provide customized logic for
# components, including Trainer and Transform.
# See https://github.com/tensorflow/tfx/blob/93ea0b4eda5a6000a07a1e93d93a26441094b6f5/tfx/components/trainer/component.py#L38
taxi_module_file_param = data_types.RuntimeParameter(
    name='module-file',
    default='gs://ml-pipeline-playground/tfx_taxi_simple/modules/taxi_utils.py',
    ptype=Text,
)
# Number of steps in training.
train_steps = data_types.RuntimeParameter(
    name='train-steps',
    default=10,
    ptype=int,
)
# Number of steps in evaluation.
eval_steps = data_types.RuntimeParameter(
    name='eval-steps',
    default=5,
    ptype=int,
)
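Each runtime parameter above is declared with a name, a default, and a type, and a value supplied when the run is created takes the place of the default. A minimal pure-Python sketch of that resolution follows. `ParamSpec` and `resolve` are hypothetical names used only for illustration and are not part of TFX or KFP, and the type check is an assumption about what a sensible runner would enforce.

```python
from typing import Any, NamedTuple, Type


class ParamSpec(NamedTuple):
    """Hypothetical stand-in for a runtime parameter declaration."""
    name: str
    default: Any
    ptype: Type


def resolve(specs, arguments):
    """Return the value each parameter takes for one run."""
    resolved = {}
    for spec in specs:
        # A supplied argument wins; otherwise fall back to the default.
        value = arguments.get(spec.name, spec.default)
        if not isinstance(value, spec.ptype):
            raise TypeError('%s expects %s, got %r' %
                            (spec.name, spec.ptype.__name__, value))
        resolved[spec.name] = value
    return resolved


specs = [
    ParamSpec('data-root', 'gs://ml-pipeline-playground/tfx_taxi_simple/data', str),
    ParamSpec('train-steps', 10, int),
    ParamSpec('eval-steps', 5, int),
]

# Only 'train-steps' is overridden; the other parameters keep their defaults.
print(resolve(specs, {'train-steps': 100}))
```

Declaring a default for every parameter is what lets the pipeline be launched with an empty argument dictionary.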
Please refer to the official guide for the detailed explanation and purpose of each TFX component.
In [ ]:
# The input data location is parameterized by data_root_param.
examples = external_input(data_root_param)
example_gen = CsvExampleGen(input=examples)
statistics_gen = StatisticsGen(examples=example_gen.outputs['examples'])
infer_schema = SchemaGen(
    statistics=statistics_gen.outputs['statistics'], infer_feature_shape=False)
validate_stats = ExampleValidator(
    statistics=statistics_gen.outputs['statistics'],
    schema=infer_schema.outputs['schema'])
# The module file used in the Transform and Trainer components is parameterized
# by taxi_module_file_param.
transform = Transform(
    examples=example_gen.outputs['examples'],
    schema=infer_schema.outputs['schema'],
    module_file=taxi_module_file_param)
# The numbers of steps in train_args and eval_args are specified as
# RuntimeParameters named 'train-steps' and 'eval-steps', respectively.
trainer = Trainer(
    module_file=taxi_module_file_param,
    transformed_examples=transform.outputs['transformed_examples'],
    schema=infer_schema.outputs['schema'],
    transform_graph=transform.outputs['transform_graph'],
    train_args={'num_steps': train_steps},
    eval_args={'num_steps': eval_steps})
# Set the TFMA config for Model Evaluation and Validation.
eval_config = tfma.EvalConfig(
    model_specs=[
        # Using signature 'eval' implies the use of an EvalSavedModel. To use
        # a serving model, remove the signature so that it defaults to
        # 'serving_default', and add a label_key.
        tfma.ModelSpec(signature_name='eval')
    ],
    metrics_specs=[
        tfma.MetricsSpec(
            # The metrics added here are in addition to those saved with the
            # model (assuming either a keras model or EvalSavedModel is used).
            # Any metrics added into the saved model (for example using
            # model.compile(..., metrics=[...]), etc) will be computed
            # automatically.
            metrics=[
                tfma.MetricConfig(class_name='ExampleCount')
            ],
            # To add validation thresholds for metrics saved with the model,
            # add them keyed by metric name to the thresholds map.
            thresholds={
                'binary_accuracy': tfma.MetricThreshold(
                    value_threshold=tfma.GenericValueThreshold(
                        lower_bound={'value': 0.5}),
                    change_threshold=tfma.GenericChangeThreshold(
                        direction=tfma.MetricDirection.HIGHER_IS_BETTER,
                        absolute={'value': -1e-10}))
            }
        )
    ],
    slicing_specs=[
        # An empty slice spec means the overall slice, i.e. the whole dataset.
        tfma.SlicingSpec(),
        # Data can be sliced along a feature column. In this case, data is
        # sliced along feature column trip_start_hour.
        tfma.SlicingSpec(feature_keys=['trip_start_hour'])
    ])
# The Evaluator uses the eval_config defined above, in which the slicing
# column is fixed to 'trip_start_hour'.
evaluator = Evaluator(
    examples=example_gen.outputs['examples'],
    model=trainer.outputs['model'],
    eval_config=eval_config)
pusher = Pusher(
    model=trainer.outputs['model'],
    model_blessing=evaluator.outputs['blessing'],
    push_destination=pusher_pb2.PushDestination(
        filesystem=pusher_pb2.PushDestination.Filesystem(
            base_directory=os.path.join(
                str(pipeline.ROOT_PARAMETER), 'model_serving'))))
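The `binary_accuracy` thresholds in the eval config combine an absolute floor (0.5) with a change threshold relative to a baseline model (a difference of at least -1e-10 when higher is better). The sketch below shows that logic in plain Python. `passes_thresholds` is a hypothetical helper, not a TFMA function; treating the bounds as inclusive and skipping the change check when no baseline exists are both assumptions, and TFMA's exact comparison rules may differ.

```python
def passes_thresholds(candidate, baseline=None, lower_bound=0.5,
                      min_change=-1e-10):
    """Return True if a higher-is-better metric satisfies both thresholds.

    candidate: metric value of the newly trained model.
    baseline: metric value of a previous model, or None if there is none.
    """
    # Value threshold: the metric must reach the absolute floor.
    if candidate < lower_bound:
        return False
    # Change threshold: the metric must not drop by more than the tolerance
    # relative to the baseline (assumed to apply only when a baseline exists).
    if baseline is not None and (candidate - baseline) < min_change:
        return False
    return True


print(passes_thresholds(0.8))                # above the floor, no baseline
print(passes_thresholds(0.4))                # below the floor
print(passes_thresholds(0.8, baseline=0.9))  # worse than the baseline
```

In the pipeline above, the outcome of this kind of check is what the Evaluator reports through its `blessing` output, which the Pusher takes as `model_blessing`.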
In [ ]:
# Create the DSL pipeline object.
# This pipeline object carries the business logic of the pipeline, but no
# runner-specific information is included.
dsl_pipeline = pipeline.Pipeline(
    pipeline_name=pipeline_name,
    pipeline_root=pipeline_root,
    components=[
        example_gen, statistics_gen, infer_schema, validate_stats, transform,
        trainer, evaluator, pusher
    ],
    enable_cache=True,
    beam_pipeline_args=['--direct_num_workers=%d' % 0],
)
In [ ]:
# Specify a TFX docker image. For the full list of tags please see:
# https://hub.docker.com/r/tensorflow/tfx/tags
tfx_image = 'gcr.io/tfx-oss-public/tfx:0.21.2'
config = kubeflow_dag_runner.KubeflowDagRunnerConfig(
    kubeflow_metadata_config=kubeflow_dag_runner
    .get_default_kubeflow_metadata_config(),
    tfx_image=tfx_image)
kfp_runner = kubeflow_dag_runner.KubeflowDagRunner(config=config)
# KubeflowDagRunner compiles the DSL pipeline object into a KFP pipeline package.
# By default the package is named <pipeline_name>.tar.gz
kfp_runner.run(dsl_pipeline)
In [ ]:
run_result = kfp.Client(
    host='1234567abcde-dot-us-central2.pipelines.googleusercontent.com'  # Put your KFP endpoint here
).create_run_from_pipeline_package(
    pipeline_name + '.tar.gz',
    arguments={
        # Uncomment the following lines to use a custom GCS bucket, module file, or training data.
        # 'pipeline-root': 'gs://<your-gcs-bucket>/tfx_taxi_simple/' + kfp.dsl.RUN_ID_PLACEHOLDER,
        # 'module-file': '<gcs path to the module file>',  # delete this line to use default module file.
        # 'data-root': '<gcs path to the data>'  # delete this line to use default data.
    })