TFX Components Walk-through

Learning Objectives

  1. Develop a high level understanding of TFX pipeline components.
  2. Learn how to use a TFX Interactive Context for prototype development of TFX pipelines.
  3. Work with the Tensorflow Data Validation (TFDV) library to check and analyze input data.
  4. Utilize the Tensorflow Transform (TFT) library for scalable data preprocessing and feature transformations.
  5. Employ the Tensorflow Model Analysis (TFMA) library for model evaluation.

In this lab, you will work with the Covertype Data Set and use TFX to analyze, understand, and pre-process the dataset and train, analyze, validate, and deploy a multi-class classification model to predict the type of forest cover from cartographic features.

You will utilize TFX Interactive Context to work with the TFX components interactivelly in a Jupyter notebook environment. Working in an interactive notebook is useful when doing initial data exploration, experimenting with models, and designing ML pipelines. You should be aware that there are differences in the way interactive notebooks are orchestrated, and how they access metadata artifacts. In a production deployment of TFX on GCP, you will use an orchestrator such as Kubeflow Pipelines, or Cloud Composer. In an interactive mode, the notebook itself is the orchestrator, running each TFX component as you execute the notebook cells. In a production deployment, ML Metadata will be managed in a scalabe database like MySQL, and artifacts in apersistent store such as Google Cloud Storage. In an interactive mode, both properties and payloads are stored in a local file system of the Jupyter host.

Setup Note: Currently, TFMA visualizations do not render properly in JupyterLab. It is recommended to run this notebook in Jupyter Classic Notebook. To switch to Classic Notebook select Launch Classic Notebook from the Help menu.


In [ ]:
import absl
import os
import tempfile
import time

import tensorflow as tf
import tensorflow_data_validation as tfdv
import tensorflow_model_analysis as tfma
import tensorflow_transform as tft
import tfx

from pprint import pprint
from tensorflow_metadata.proto.v0 import schema_pb2, statistics_pb2, anomalies_pb2
from tensorflow_transform.tf_metadata import schema_utils
from tfx.components import CsvExampleGen
from tfx.components import BigQueryExampleGen
from tfx.components import Evaluator
from tfx.components import ExampleValidator
from tfx.components import InfraValidator
from tfx.components import Pusher
from tfx.components import ResolverNode
from tfx.components import SchemaGen
from tfx.components import StatisticsGen
from tfx.components import Trainer
from tfx.components import Transform
from tfx.components.base import executor_spec
from tfx.components.common_nodes.importer_node import ImporterNode
from tfx.components.trainer import executor as trainer_executor
from tfx.dsl.experimental import latest_blessed_model_resolver
from tfx.orchestration import metadata
from tfx.orchestration import pipeline
from tfx.orchestration.experimental.interactive.interactive_context import InteractiveContext
from tfx.proto import evaluator_pb2
from tfx.proto import example_gen_pb2
from tfx.proto import infra_validator_pb2
from tfx.proto import pusher_pb2
from tfx.proto import trainer_pb2
from tfx.proto.evaluator_pb2 import SingleSlicingSpec
from tfx.utils.dsl_utils import external_input
from tfx.types import Channel
from tfx.types.standard_artifacts import Model
from tfx.types.standard_artifacts import ModelBlessing
from tfx.types.standard_artifacts import InfraBlessing

Note: this lab was developed and tested with the following TF ecosystem package versions:

Tensorflow Version: 2.1.0
TFX Version: 0.21.4
TFDV Version: 0.21.5
TFMA Version: 0.21.6

If you encounter errors with the above imports (e.g. TFX component not found), check your package versions in the cell below.


In [ ]:
print("Tensorflow Version:", tf.__version__)
print("TFX Version:", tfx.__version__)
print("TFDV Version:", tfdv.__version__)
print("TFMA Version:", tfma.VERSION_STRING)

absl.logging.set_verbosity(absl.logging.INFO)

If the versions above do not match, update your packages in the current Jupyter kernel below. The default %pip package installation location is not on your system installation PATH; use the command below to append the local installation path to pick up the latest package versions. Note that you may also need to restart your notebook kernel to pick up the specified package versions and re-run the imports cell above before proceeding with the lab.


In [ ]:
os.environ['PATH'] += os.pathsep + '/home/jupyter/.local/bin'

In [ ]:
%pip install --upgrade --user tensorflow==2.1.0
%pip install --upgrade --user tfx==0.21.4
%pip install --upgrade --user tensorflow_data_validation==0.21.5
%pip install --upgrade --user tensorflow_model_analysis==0.21.6

Configure lab settings

Set constants, location paths and other environment settings.


In [ ]:
ARTIFACT_STORE = os.path.join(os.sep, 'home', 'jupyter', 'artifact-store')
SERVING_MODEL_DIR=os.path.join(os.sep, 'home', 'jupyter', 'serving_model')
DATA_ROOT = 'gs://workshop-datasets/covertype/small'

Creating Interactive Context

TFX Interactive Context allows you to create and run TFX Components in an interactive mode. It is designed to support experimentation and development in a Jupyter Notebook environment. It is an experimental feature and major changes to interface and functionality are expected. When creating the interactive context you can specifiy the following parameters:

  • pipeline_name - Optional name of the pipeline for ML Metadata tracking purposes. If not specified, a name will be generated for you.
  • pipeline_root - Optional path to the root of the pipeline's outputs. If not specified, an ephemeral temporary directory will be created and used.
  • metadata_connection_config - Optional metadata_store_pb2.ConnectionConfig instance used to configure connection to a ML Metadata connection. If not specified, an ephemeral SQLite MLMD connection contained in the pipeline_root directory with file name "metadata.sqlite" will be used.

In [ ]:
PIPELINE_NAME = 'tfx-covertype-classifier'
PIPELINE_ROOT = os.path.join(ARTIFACT_STORE, PIPELINE_NAME, time.strftime("%Y%m%d_%H%M%S"))
os.makedirs(PIPELINE_ROOT, exist_ok=True)

context = InteractiveContext(
    pipeline_name=PIPELINE_NAME,
    pipeline_root=PIPELINE_ROOT,
    metadata_connection_config=None
)

Ingesting data using ExampleGen

In any ML development process the first step is to ingest the training and test datasets. The ExampleGen component ingests data into a TFX pipeline. It consumes external files/services to generate a set file files in the TFRecord format, which will be used by other TFX components. It can also shuffle the data and split into an arbitrary number of partitions.

Configure and run CsvExampleGen

In this exercise, you use the CsvExampleGen specialization of ExampleGen to ingest CSV files from a GCS location. The component is configured to split the input data into two splits - train and eval - using 4:1 ratio.


In [ ]:
output_config = example_gen_pb2.Output(
    split_config=example_gen_pb2.SplitConfig(splits=[
        example_gen_pb2.SplitConfig.Split(name='train', hash_buckets=4),
        example_gen_pb2.SplitConfig.Split(name='eval', hash_buckets=1)
    ]))

example_gen = tfx.components.CsvExampleGen(
    instance_name='Data_Extraction_Spliting',
    input=external_input(DATA_ROOT),
    output_config=output_config
)

In [ ]:
context.run(example_gen)

Examine the ingested data


In [ ]:
examples_uri = example_gen.outputs['examples'].get()[0].uri
tfrecord_filenames = [os.path.join(examples_uri, 'train', name)
                      for name in os.listdir(os.path.join(examples_uri, 'train'))]
dataset = tf.data.TFRecordDataset(tfrecord_filenames, compression_type="GZIP")
for tfrecord in dataset.take(2):
  example = tf.train.Example()
  example.ParseFromString(tfrecord.numpy())
  for name, feature in example.features.feature.items():
    if feature.HasField('bytes_list'):
        value = feature.bytes_list.value
    if feature.HasField('float_list'):
        value = feature.float_list.value
    if feature.HasField('int64_list'):
        value = feature.int64_list.value
    print('{}: {}'.format(name, value))
  print('******')

Generating statistics using StatisticsGen

The StatisticsGen component generates data statistics that can be used by other TFX components. StatisticsGen uses TensorFlow Data Validation. StatisticsGen generate statistics for each split in the ExampleGen component's output. In our case there two splits: train and eval.

Configure and run the StatisticsGen component


In [ ]:
statistics_gen = tfx.components.StatisticsGen(
    instance_name='Statistics_Generation',
    examples=example_gen.outputs['examples'])

In [ ]:
context.run(statistics_gen)

Visualize statistics

The generated statistics can be visualized using the tfdv.visualize_statistics() function from the TensorFlow Data Validation library or using a utility method of the InteractiveContext object. In fact, most of the artifacts generated by the TFX components can be visualized using InteractiveContext.


In [ ]:
context.show(statistics_gen.outputs['statistics'])

Infering data schema using SchemaGen

Some TFX components use a description input data called a schema. The schema is an instance of schema.proto. It can specify data types for feature values, whether a feature has to be present in all examples, allowed value ranges, and other properties. SchemaGen automatically generates the schema by inferring types, categories, and ranges from data statistics. The auto-generated schema is best-effort and only tries to infer basic properties of the data. It is expected that developers review and modify it as needed. SchemaGen uses TensorFlow Data Validation.

The SchemaGen component generates the schema using the statistics for the train split. The statistics for other splits are ignored.

Configure and run the SchemaGen components


In [ ]:
schema_gen = SchemaGen(
    statistics=statistics_gen.outputs['statistics'],
    infer_feature_shape=False)

In [ ]:
context.run(schema_gen)

Visualize the inferred schema


In [ ]:
context.show(schema_gen.outputs['schema'])

Updating the auto-generated schema

In most cases the auto-generated schemas must be fine-tuned manually using insights from data exploration and/or domain knowledge about the data. For example, you know that in the covertype dataset there are seven types of forest cover (coded using 1-7 range) and that the value of the Slope feature should be in the 0-90 range. You can manually add these constraints to the auto-generated schema by setting the feature domain.

Load the auto-generated schema proto file


In [ ]:
schema_proto_path = '{}/{}'.format(schema_gen.outputs['schema'].get()[0].uri, 'schema.pbtxt')
schema = tfdv.load_schema_text(schema_proto_path)

Modify the schema

You can use the protocol buffer APIs to modify the schema.


In [ ]:
tfdv.set_domain(schema, 'Cover_Type', schema_pb2.IntDomain(name='Cover_Type', min=0, max=6, is_categorical=True))
tfdv.set_domain(schema, 'Slope',  schema_pb2.IntDomain(name='Slope', min=0, max=90))

tfdv.display_schema(schema=schema)

Save the updated schema


In [ ]:
schema_dir = os.path.join(ARTIFACT_STORE, 'schema')
tf.io.gfile.makedirs(schema_dir)
schema_file = os.path.join(schema_dir, 'schema.pbtxt')

tfdv.write_schema_text(schema, schema_file)

!cat {schema_file}

Importing the updated schema using ImporterNode

The ImporterNode component allows you to import an external artifact, including the schema file, so it can be used by other TFX components in your workflow.

Configure and run the ImporterNode component


In [ ]:
schema_importer = ImporterNode(
    instance_name='Schema_Importer',
    source_uri=schema_dir,
    artifact_type=tfx.types.standard_artifacts.Schema,
    reimport=False
)

In [ ]:
context.run(schema_importer)

Visualize the imported schema


In [ ]:
context.show(schema_importer.outputs['result'])

Validating data with ExampleValidator

The ExampleValidator component identifies anomalies in data. It identifies anomalies by comparing data statistics computed by the StatisticsGen component against a schema generated by SchemaGen or imported by ImporterNode.

ExampleValidator can detect different classes of anomalies. For example it can:

  • perform validity checks by comparing data statistics against a schema
  • detect training-serving skew by comparing training and serving data.
  • detect data drift by looking at a series of data.

The ExampleValidator component validates the data in the eval split only. Other splits are ignored.

Configure and run the ExampleValidator component


In [ ]:
example_validator = ExampleValidator(
    statistics=statistics_gen.outputs['statistics'],
    schema=schema_importer.outputs['result'],
    instance_name="Data_Validation"
)

In [ ]:
context.run(example_validator)

Examine the output of ExampleValidator

The output artifact of the ExampleValidator is the anomalies.pbtxt file describing an anomalies_pb2.Anomalies protobuf.


In [ ]:
train_uri = example_validator.outputs['anomalies'].get()[0].uri
anomalies_filename = os.path.join(train_uri, "anomalies.pbtxt")
!cat $anomalies_filename

Visualize validation results

The file anomalies.pbtxt can be visualized using context.show.


In [ ]:
context.show(example_validator.outputs['output'])

In our case no anomalies were detected in the eval split.

For a detailed deep dive into data validation and schema generation refer to the lab-31-tfdv-structured-data lab.

Preprocessing data with Transform

The Transform component performs data transformation and feature engineering. The Transform component consumes tf.Examples emitted from the ExampleGen component and emits the transformed feature data and the SavedModel graph that was used to process the data. The emitted SavedModel can then be used by serving components to make sure that the same data pre-processing logic is applied at training and serving.

The Transform component requires more code than many other components because of the arbitrary complexity of the feature engineering that you may need for the data and/or model that you're working with. It requires code files to be available which define the processing needed.

Define the pre-processing module

To configure Trainsform, you need to encapsulate your pre-processing code in the Python preprocessing_fn function and save it to a python module that is then provided to the Transform component as an input. This module will be loaded by transform and the preprocessing_fn function will be called when the Transform component runs.

In most cases, your implementation of the preprocessing_fn makes extensive use of TensorFlow Transform for performing feature engineering on your dataset.


In [ ]:
TRANSFORM_MODULE = 'preprocessing.py'
!cat {TRANSFORM_MODULE}

Configure and run the Transform component.


In [ ]:
transform = Transform(
    examples=example_gen.outputs['examples'],
    schema=schema_importer.outputs['result'],
    module_file=TRANSFORM_MODULE)

In [ ]:
context.run(transform)

Examine the Transform component's outputs

The Transform component has 2 outputs:

  • transform_graph - contains the graph that can perform the preprocessing operations (this graph will be included in the serving and evaluation models).
  • transformed_examples - contains the preprocessed training and evaluation data.

Take a peek at the transform_graph artifact: it points to a directory containing 3 subdirectories:


In [ ]:
os.listdir(transform.outputs['transform_graph'].get()[0].uri)

And the transform.examples artifact


In [ ]:
os.listdir(transform.outputs['transformed_examples'].get()[0].uri)

In [ ]:
transform_uri = transform.outputs['transformed_examples'].get()[0].uri
tfrecord_filenames = [os.path.join(transform_uri,  'train', name)
                      for name in os.listdir(os.path.join(transform_uri, 'train'))]
dataset = tf.data.TFRecordDataset(tfrecord_filenames, compression_type="GZIP")
for tfrecord in dataset.take(2):
  example = tf.train.Example()
  example.ParseFromString(tfrecord.numpy())
  for name, feature in example.features.feature.items():
    if feature.HasField('bytes_list'):
        value = feature.bytes_list.value
    if feature.HasField('float_list'):
        value = feature.float_list.value
    if feature.HasField('int64_list'):
        value = feature.int64_list.value
    print('{}: {}'.format(name, value))
  print('******')

Train with the Trainer component

The Trainer component trains a model using TensorFlow.

Trainer takes:

  • tf.Examples used for training and eval.
  • A user provided module file that defines the trainer logic.
  • A data schema created by SchemaGen or imported by ImporterNode.
  • A proto definition of train args and eval args.
  • An optional transform graph produced by upstream Transform component.
  • An optional base models used for scenarios such as warmstart.

Define the trainer module

To configure Trainer, you need to encapsulate your training code in a Python module that is then provided to the Trainer as an input.


In [ ]:
TRAINER_MODULE_FILE = 'model.py'
!cat {TRAINER_MODULE_FILE}

Create and run the Trainer component

As of the 0.21.2 release of TFX, the Trainer component only supports passing a single field - num_steps - through the train_args and eval_args arguments.


In [ ]:
trainer = Trainer(
    custom_executor_spec=executor_spec.ExecutorClassSpec(trainer_executor.GenericExecutor),
    module_file=TRAINER_MODULE_FILE,
    transformed_examples=transform.outputs["transformed_examples"],
    schema=schema_importer.outputs["result"],
    transform_graph=transform.outputs["transform_graph"],
    train_args=trainer_pb2.TrainArgs(num_steps=5000),
    eval_args=trainer_pb2.EvalArgs(num_steps=1000))

In [ ]:
context.run(trainer)

Analyzing training runs with TensorBoard

In this step you will analyze the training run with TensorBoard.dev. TensorBoard.dev is a managed service that enables you to easily host, track and share your ML experiments.

Retrieve the location of TensorBoard logs


In [ ]:
train_uri = trainer.outputs['model'].get()[0].uri
logs_path = os.path.join(train_uri, 'logs')
print(logs_path)

Upload the logs and start TensorBoard.dev

  1. Open a new JupyterLab terminal window

  2. From the terminal window, execute the following command

    tensorboard dev upload --logdir [YOUR_LOGDIR]

Where [YOUR_LOGDIR] is an URI retrieved by the previous cell.

You will be asked to authorize TensorBoard.dev using your Google account. If you don't have a Google account or you don't want to authorize TensorBoard.dev you can skip this exercise.

After the authorization process completes, follow the link provided to view your experiment.

Evaluating trained models with Evaluator

The Evaluator component analyzes model performance using the TensorFlow Model Analysis library. It runs inference requests on particular subsets of the test dataset, based on which slices are defined by the developer. Knowing which slices should be analyzed requires domain knowledge of what is important in this particular use case or domain.

The Evaluator can also optionally validate a newly trained model against a previous model. In this lab, you only train one model, so the Evaluator automatically will label the model as "blessed".

Configure and run the Evaluator component

Use the ResolverNode to pick the previous model to compare against. The model resolver is only required if performing model validation in addition to evaluation. In this case we validate against the latest blessed model. If no model has been blessed before (as in this case) the evaluator will make our candidate the first blessed model.


In [ ]:
model_resolver = ResolverNode(
      instance_name='latest_blessed_model_resolver',
      resolver_class=latest_blessed_model_resolver.LatestBlessedModelResolver,
      model=Channel(type=Model),
      model_blessing=Channel(type=ModelBlessing))

context.run(model_resolver)

In [ ]:
model_resolver.outputs

Configure evaluation metrics and slices.


In [ ]:
accuracy_threshold = tfma.MetricThreshold(
                value_threshold=tfma.GenericValueThreshold(
                    lower_bound={'value': 0.5},
                    upper_bound={'value': 0.99}),
                change_threshold=tfma.GenericChangeThreshold(
                    absolute={'value': 0.0001},
                    direction=tfma.MetricDirection.HIGHER_IS_BETTER),
                )

metrics_specs = tfma.MetricsSpec(
                   metrics = [
                       tfma.MetricConfig(class_name='SparseCategoricalAccuracy',
                           threshold=accuracy_threshold),
                       tfma.MetricConfig(class_name='ExampleCount')])

eval_config = tfma.EvalConfig(
    model_specs=[
        tfma.ModelSpec(label_key='Cover_Type')
    ],
    metrics_specs=[metrics_specs],
    slicing_specs=[
        tfma.SlicingSpec(),
        tfma.SlicingSpec(feature_keys=['Wilderness_Area'])
    ]
)
eval_config

In [ ]:
model_analyzer = Evaluator(
    examples=example_gen.outputs.examples,
    model=trainer.outputs.model,
    baseline_model=model_resolver.outputs.model,
    eval_config=eval_config
)
context.run(model_analyzer, enable_cache=False)

Check the model performance validation status


In [ ]:
model_blessing_uri = model_analyzer.outputs.blessing.get()[0].uri
!ls -l {model_blessing_uri}

Visualize evaluation results

You can visualize the evaluation results using the tfma.view.render_slicing_metrics() function from TensorFlow Model Analysis library.

Setup Note: Currently, TFMA visualizations don't render in JupyterLab. Make sure that you run this notebook in Classic Notebook.


In [ ]:
evaluation_uri = model_analyzer.outputs['evaluation'].get()[0].uri
evaluation_uri
!ls {evaluation_uri}

In [ ]:
eval_result = tfma.load_eval_result(evaluation_uri)
eval_result

In [ ]:
tfma.view.render_slicing_metrics(eval_result)

In [ ]:
tfma.view.render_slicing_metrics(
    eval_result, slicing_column='Wilderness_Area')

InfraValidator

The InfraValidator component acts as an additional early warning layer by validating a candidate model in a sandbox version of its serving infrastructure to prevent an unservable model from being pushed to production. Compared to the Evaluator component above which validates a model's performance, the InfraValidator component is validating that a model is able to generate predictions from served examples in an environment configured to match production. The config below takes a model and examples, launches the model in a sand-boxed TensorflowServing model server from the latest image in a local docker engine, and optionally checks that the model binary can be loaded and queried before "blessing" it for production.


In [ ]:
infra_validator = InfraValidator(
    model=trainer.outputs['model'],
    examples=example_gen.outputs['examples'],
    serving_spec=infra_validator_pb2.ServingSpec(
        tensorflow_serving=infra_validator_pb2.TensorFlowServing(
            tags=['latest']),
      local_docker=infra_validator_pb2.LocalDockerConfig(),
  ),
    validation_spec=infra_validator_pb2.ValidationSpec(
        max_loading_time_seconds=60,
        num_tries=5,
    ),    
  request_spec=infra_validator_pb2.RequestSpec(
      tensorflow_serving=infra_validator_pb2.TensorFlowServingRequestSpec(),
          num_examples=5,
      )
)

In [ ]:
context.run(infra_validator, enable_cache=False)

Check the model infrastructure validation status


In [ ]:
infra_blessing_uri = infra_validator.outputs.blessing.get()[0].uri
!ls -l {infra_blessing_uri}

Deploying models with Pusher

The Pusher component checks whether a model has been "blessed", and if so, deploys it by pushing the model to a well known file destination.

Configure and run the Pusher component


In [ ]:
trainer.outputs['model']

In [ ]:
pusher = Pusher(
    model=trainer.outputs['model'],
    model_blessing=model_analyzer.outputs['blessing'],
    infra_blessing=infra_validator.outputs['blessing'],
    push_destination=pusher_pb2.PushDestination(
        filesystem=pusher_pb2.PushDestination.Filesystem(
            base_directory=SERVING_MODEL_DIR)))
context.run(pusher)

Examine the output of Pusher


In [ ]:
pusher.outputs

In [ ]:
# Set `PATH` to include a directory containing `saved_model_cli.
PATH=%env PATH
%env PATH=/opt/conda/envs/tfx/bin:{PATH}

In [ ]:
latest_pushed_model = os.path.join(SERVING_MODEL_DIR, max(os.listdir(SERVING_MODEL_DIR)))
!saved_model_cli show --dir {latest_pushed_model} --all

Next steps

This concludes your introductory walthrough through TFX pipeline components. In the lab, you used TFX to analyze, understand, and pre-process the dataset and train, analyze, validate, and deploy a multi-class classification model to predict the type of forest cover from cartographic features. You utilized a TFX Interactive Context for prototype development of a TFX pipeline directly in a Jupyter notebook. Next, you worked with the TFDV library to modify your dataset schema to add feature constraints to catch data anamolies that can negatively impact your model's performance. You utilized TFT library for feature proprocessing for consistent feature transformations for your model at training and serving time. Lastly, using the TFMA library, you added model performance constraints to ensure you only push more accurate models than previous runs to production.

The next labs in the series will guide through developing a TFX pipeline, deploying and running the pipeline on AI Platform Pipelines and automating the pipeline build and deployment processes with Cloud Build.

License

Licensed under the Apache License, Version 2.0 (the \"License\"); you may not use this file except in compliance with the License. You may obtain a copy of the License at https://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an \"AS IS\" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.</font>