In this lab, you will work with the Covertype Data Set and use TFX to analyze, understand, and pre-process the dataset and train, analyze, validate, and deploy a multi-class classification model to predict the type of forest cover from cartographic features.
You will use the TFX Interactive Context to work with TFX components interactively in a Jupyter notebook environment. Working in an interactive notebook is useful for initial data exploration, experimenting with models, and designing ML pipelines. Be aware that interactive notebooks differ from production deployments in how they are orchestrated and in how they access metadata and artifacts. In a production deployment of TFX on GCP, you will use an orchestrator such as Kubeflow Pipelines or Cloud Composer. In interactive mode, the notebook itself is the orchestrator, running each TFX component as you execute the notebook cells. In a production deployment, ML Metadata will be managed in a scalable database such as MySQL, and artifacts in a persistent store such as Google Cloud Storage. In interactive mode, both properties and payloads are stored in the local file system of the Jupyter host.
Setup Note: Currently, TFMA visualizations do not render properly in JupyterLab. It is recommended to run this notebook in Jupyter Classic Notebook. To switch to Classic Notebook select Launch Classic Notebook from the Help menu.
In [ ]:
import absl
import os
import tempfile
import time
import tensorflow as tf
import tensorflow_data_validation as tfdv
import tensorflow_model_analysis as tfma
import tensorflow_transform as tft
import tfx
from pprint import pprint
from tensorflow_metadata.proto.v0 import schema_pb2, statistics_pb2, anomalies_pb2
from tensorflow_transform.tf_metadata import schema_utils
from tfx.components import CsvExampleGen
from tfx.components import BigQueryExampleGen
from tfx.components import Evaluator
from tfx.components import ExampleValidator
from tfx.components import InfraValidator
from tfx.components import Pusher
from tfx.components import ResolverNode
from tfx.components import SchemaGen
from tfx.components import StatisticsGen
from tfx.components import Trainer
from tfx.components import Transform
from tfx.components.base import executor_spec
from tfx.components.common_nodes.importer_node import ImporterNode
from tfx.components.trainer import executor as trainer_executor
from tfx.dsl.experimental import latest_blessed_model_resolver
from tfx.orchestration import metadata
from tfx.orchestration import pipeline
from tfx.orchestration.experimental.interactive.interactive_context import InteractiveContext
from tfx.proto import evaluator_pb2
from tfx.proto import example_gen_pb2
from tfx.proto import infra_validator_pb2
from tfx.proto import pusher_pb2
from tfx.proto import trainer_pb2
from tfx.proto.evaluator_pb2 import SingleSlicingSpec
from tfx.utils.dsl_utils import external_input
from tfx.types import Channel
from tfx.types.standard_artifacts import Model
from tfx.types.standard_artifacts import ModelBlessing
from tfx.types.standard_artifacts import InfraBlessing
Note: this lab was developed and tested with the following TF ecosystem package versions:
Tensorflow Version: 2.1.0
TFX Version: 0.21.4
TFDV Version: 0.21.5
TFMA Version: 0.21.6
If you encounter errors with the above imports (e.g. TFX component not found), check your package versions in the cell below.
In [ ]:
print("Tensorflow Version:", tf.__version__)
print("TFX Version:", tfx.__version__)
print("TFDV Version:", tfdv.__version__)
print("TFMA Version:", tfma.VERSION_STRING)
absl.logging.set_verbosity(absl.logging.INFO)
If the versions above do not match, update your packages in the current Jupyter kernel below. The default %pip package installation location is not on your system installation PATH; use the command below to append the local installation path so that the latest package versions are picked up. Note that you may also need to restart your notebook kernel to pick up the specified package versions and re-run the imports cell above before proceeding with the lab.
In [ ]:
os.environ['PATH'] += os.pathsep + '/home/jupyter/.local/bin'
In [ ]:
%pip install --upgrade --user tensorflow==2.1.0
%pip install --upgrade --user tfx==0.21.4
%pip install --upgrade --user tensorflow_data_validation==0.21.5
%pip install --upgrade --user tensorflow_model_analysis==0.21.6
In [ ]:
ARTIFACT_STORE = os.path.join(os.sep, 'home', 'jupyter', 'artifact-store')
SERVING_MODEL_DIR=os.path.join(os.sep, 'home', 'jupyter', 'serving_model')
DATA_ROOT = 'gs://workshop-datasets/covertype/small'
TFX Interactive Context allows you to create and run TFX components in an interactive mode. It is designed to support experimentation and development in a Jupyter Notebook environment. It is an experimental feature, and major changes to its interface and functionality are expected. When creating the interactive context you can specify the following parameters:
- pipeline_name - Optional name of the pipeline for ML Metadata tracking purposes. If not specified, a name will be generated for you.
- pipeline_root - Optional path to the root of the pipeline's outputs. If not specified, an ephemeral temporary directory will be created and used.
- metadata_connection_config - Optional metadata_store_pb2.ConnectionConfig instance used to configure the connection to an ML Metadata store. If not specified, an ephemeral SQLite MLMD connection contained in the pipeline_root directory with file name "metadata.sqlite" will be used.
In [ ]:
PIPELINE_NAME = 'tfx-covertype-classifier'
PIPELINE_ROOT = os.path.join(ARTIFACT_STORE, PIPELINE_NAME, time.strftime("%Y%m%d_%H%M%S"))
os.makedirs(PIPELINE_ROOT, exist_ok=True)
context = InteractiveContext(
pipeline_name=PIPELINE_NAME,
pipeline_root=PIPELINE_ROOT,
metadata_connection_config=None
)
In any ML development process the first step is to ingest the training and test datasets. The ExampleGen component ingests data into a TFX pipeline. It consumes external files/services to generate a set of files in the TFRecord format, which will be used by other TFX components. It can also shuffle the data and split it into an arbitrary number of partitions.
In this exercise, you use the CsvExampleGen specialization of ExampleGen to ingest CSV files from a GCS location. The component is configured to split the input data into two splits, train and eval, using a 4:1 ratio.
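To make the 4:1 ratio concrete, the following is a minimal sketch of hash-bucket splitting in plain Python. It is not the TFX implementation: the choice of SHA-256 and the names SPLITS and assign_split are assumptions made for illustration. The idea is that each record is hashed into one of five buckets, four of which map to train and one to eval.

```python
import hashlib

# Hypothetical stand-in for the split config: (split name, bucket count).
SPLITS = [("train", 4), ("eval", 1)]

def assign_split(record: bytes, splits=SPLITS) -> str:
    """Map a record to a split by hashing it into one of sum(buckets) buckets."""
    total_buckets = sum(buckets for _, buckets in splits)
    # SHA-256 is an assumption for illustration, not a claim about TFX internals.
    bucket = int(hashlib.sha256(record).hexdigest(), 16) % total_buckets
    upper = 0
    for name, buckets in splits:
        upper += buckets
        if bucket < upper:
            return name
    raise AssertionError("unreachable: bucket is always < total_buckets")

# Hypothetical records standing in for CSV rows.
records = [f"row-{i}".encode() for i in range(10_000)]
counts = {"train": 0, "eval": 0}
for record in records:
    counts[assign_split(record)] += 1
train_fraction = counts["train"] / len(records)
print(counts, round(train_fraction, 3))
```

Because the assignment depends only on the record content, re-running the split on the same data places every record in the same split, and the train fraction comes out close to 0.8.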
In [ ]:
output_config = example_gen_pb2.Output(
split_config=example_gen_pb2.SplitConfig(splits=[
example_gen_pb2.SplitConfig.Split(name='train', hash_buckets=4),
example_gen_pb2.SplitConfig.Split(name='eval', hash_buckets=1)
]))
example_gen = tfx.components.CsvExampleGen(
instance_name='Data_Extraction_Spliting',
input=external_input(DATA_ROOT),
output_config=output_config
)
In [ ]:
context.run(example_gen)
In [ ]:
examples_uri = example_gen.outputs['examples'].get()[0].uri
tfrecord_filenames = [os.path.join(examples_uri, 'train', name)
for name in os.listdir(os.path.join(examples_uri, 'train'))]
dataset = tf.data.TFRecordDataset(tfrecord_filenames, compression_type="GZIP")
for tfrecord in dataset.take(2):
    example = tf.train.Example()
    example.ParseFromString(tfrecord.numpy())
    for name, feature in example.features.feature.items():
        if feature.HasField('bytes_list'):
            value = feature.bytes_list.value
        if feature.HasField('float_list'):
            value = feature.float_list.value
        if feature.HasField('int64_list'):
            value = feature.int64_list.value
        print('{}: {}'.format(name, value))
    print('******')
The StatisticsGen component generates data statistics that can be used by other TFX components. StatisticsGen uses TensorFlow Data Validation. StatisticsGen generates statistics for each split in the ExampleGen component's output. In our case there are two splits: train and eval.
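As a rough illustration of "statistics for each split", here is a plain-Python sketch that summarizes one numeric feature per split. The feature values and the summarize helper are hypothetical, and the real component computes far richer statistics (histograms, missing-value counts, and so on).

```python
import statistics

# Hypothetical toy values for one numeric feature, keyed by split name.
splits = {
    "train": [10, 20, 30, 40],
    "eval": [15, 25],
}

def summarize(values):
    """Return a few of the summary statistics a data profiler typically reports."""
    return {
        "count": len(values),
        "min": min(values),
        "max": max(values),
        "mean": statistics.mean(values),
    }

# One independent summary per split, mirroring the per-split output.
split_stats = {name: summarize(values) for name, values in splits.items()}
for name, stats in split_stats.items():
    print(name, stats)
```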
In [ ]:
statistics_gen = tfx.components.StatisticsGen(
instance_name='Statistics_Generation',
examples=example_gen.outputs['examples'])
In [ ]:
context.run(statistics_gen)
The generated statistics can be visualized using the tfdv.visualize_statistics() function from the TensorFlow Data Validation library or using a utility method of the InteractiveContext object. In fact, most of the artifacts generated by the TFX components can be visualized using InteractiveContext.
In [ ]:
context.show(statistics_gen.outputs['statistics'])
Some TFX components use a description of the input data called a schema. The schema is an instance of schema.proto. It can specify data types for feature values, whether a feature has to be present in all examples, allowed value ranges, and other properties. SchemaGen automatically generates the schema by inferring types, categories, and ranges from data statistics. The auto-generated schema is best-effort and only tries to infer basic properties of the data. It is expected that developers review and modify it as needed. SchemaGen uses TensorFlow Data Validation.
The SchemaGen component generates the schema using the statistics for the train split. The statistics for other splits are ignored.
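The inference step can be pictured with a small plain-Python sketch. The infer_schema helper and the example rows are hypothetical and far simpler than what TensorFlow Data Validation does; the point is only that types, ranges, and categories are derived from the values actually observed.

```python
def infer_schema(examples):
    """Infer a per-feature type plus a numeric range or a set of categories."""
    schema = {}
    for example in examples:
        for name, value in example.items():
            entry = schema.setdefault(name, {"type": type(value).__name__})
            if isinstance(value, (int, float)):
                entry["min"] = min(entry.get("min", value), value)
                entry["max"] = max(entry.get("max", value), value)
            else:
                entry.setdefault("values", set()).add(value)
    return schema

# Hypothetical rows from the train split; values are made up for illustration.
train_examples = [
    {"Slope": 3, "Wilderness_Area": "Rawah"},
    {"Slope": 17, "Wilderness_Area": "Commanche"},
    {"Slope": 9, "Wilderness_Area": "Rawah"},
]
inferred = infer_schema(train_examples)
print(inferred)
```

An inferred range only reflects the values that were seen, so it can be narrower than the true domain of a feature. This is one reason an auto-generated schema needs review.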
In [ ]:
schema_gen = SchemaGen(
statistics=statistics_gen.outputs['statistics'],
infer_feature_shape=False)
In [ ]:
context.run(schema_gen)
In [ ]:
context.show(schema_gen.outputs['schema'])
In most cases the auto-generated schemas must be fine-tuned manually using insights from data exploration and/or domain knowledge about the data. For example, you know that in the covertype dataset there are seven types of forest cover (coded in this lab using the 0-6 range, as in the domain set below) and that the value of the Slope feature should be in the 0-90 range. You can manually add these constraints to the auto-generated schema by setting the feature domain.
In [ ]:
schema_proto_path = '{}/{}'.format(schema_gen.outputs['schema'].get()[0].uri, 'schema.pbtxt')
schema = tfdv.load_schema_text(schema_proto_path)
In [ ]:
tfdv.set_domain(schema, 'Cover_Type', schema_pb2.IntDomain(name='Cover_Type', min=0, max=6, is_categorical=True))
tfdv.set_domain(schema, 'Slope', schema_pb2.IntDomain(name='Slope', min=0, max=90))
tfdv.display_schema(schema=schema)
In [ ]:
schema_dir = os.path.join(ARTIFACT_STORE, 'schema')
tf.io.gfile.makedirs(schema_dir)
schema_file = os.path.join(schema_dir, 'schema.pbtxt')
tfdv.write_schema_text(schema, schema_file)
!cat {schema_file}
In [ ]:
schema_importer = ImporterNode(
instance_name='Schema_Importer',
source_uri=schema_dir,
artifact_type=tfx.types.standard_artifacts.Schema,
reimport=False
)
In [ ]:
context.run(schema_importer)
In [ ]:
context.show(schema_importer.outputs['result'])
The ExampleValidator component identifies anomalies in data. It does so by comparing data statistics computed by the StatisticsGen component against a schema generated by SchemaGen or imported by ImporterNode.
ExampleValidator can detect different classes of anomalies.
The ExampleValidator component validates the data in the eval split only. Other splits are ignored.
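Comparing statistics against a schema can be sketched in a few lines of plain Python. The find_anomalies helper and the statistics below are hypothetical; the domains mirror the ones set on the schema in this lab. The real component covers many more anomaly types than the range check shown here.

```python
def find_anomalies(stats, schema):
    """Compare per-feature statistics with schema range constraints."""
    anomalies = {}
    for name, domain in schema.items():
        if name not in stats:
            anomalies[name] = "feature missing from statistics"
            continue
        observed = stats[name]
        if observed["min"] < domain["min"] or observed["max"] > domain["max"]:
            anomalies[name] = (
                f"observed range [{observed['min']}, {observed['max']}] "
                f"outside [{domain['min']}, {domain['max']}]"
            )
    return anomalies

# Domains mirror the ones set on the schema in this lab.
schema = {"Cover_Type": {"min": 0, "max": 6}, "Slope": {"min": 0, "max": 90}}
# Hypothetical eval-split statistics, including one out-of-range value.
eval_stats = {"Cover_Type": {"min": 0, "max": 6}, "Slope": {"min": 0, "max": 95}}
anomalies = find_anomalies(eval_stats, schema)
print(anomalies)
```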
In [ ]:
example_validator = ExampleValidator(
statistics=statistics_gen.outputs['statistics'],
schema=schema_importer.outputs['result'],
instance_name="Data_Validation"
)
In [ ]:
context.run(example_validator)
In [ ]:
anomalies_uri = example_validator.outputs['anomalies'].get()[0].uri
anomalies_filename = os.path.join(anomalies_uri, "anomalies.pbtxt")
!cat $anomalies_filename
In [ ]:
context.show(example_validator.outputs['anomalies'])
In our case no anomalies were detected in the eval split.
For a detailed deep dive into data validation and schema generation, refer to the lab-31-tfdv-structured-data lab.
The Transform component performs data transformation and feature engineering. It consumes tf.Examples emitted from the ExampleGen component and emits the transformed feature data and the SavedModel graph that was used to process the data. The emitted SavedModel can then be used by serving components to make sure that the same data pre-processing logic is applied at training and serving.
The Transform component requires more code than many other components because of the arbitrary complexity of the feature engineering that you may need for the data and/or model that you're working with. It requires code files that define the processing needed.
To configure Transform, you need to encapsulate your pre-processing code in a Python preprocessing_fn function and save it to a Python module that is then provided to the Transform component as an input. This module will be loaded by Transform, and the preprocessing_fn function will be called when the Transform component runs.
In most cases, your implementation of preprocessing_fn makes extensive use of TensorFlow Transform for performing feature engineering on your dataset.
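The reason the same logic can be applied at training and serving is that full-pass statistics are computed once and then frozen. The sketch below shows that analyze-then-apply pattern in plain Python using z-score scaling. The helper names and the elevation values are hypothetical, and this is only an analogy for what a scaling function in TensorFlow Transform does inside preprocessing_fn.

```python
import statistics

def analyze(train_values):
    """Full pass over the training data to compute scaling constants."""
    return {
        "mean": statistics.mean(train_values),
        "std": statistics.pstdev(train_values),
    }

def apply_scaling(value, constants):
    """Apply z-score scaling using constants fixed at analysis time."""
    return (value - constants["mean"]) / constants["std"]

# Hypothetical elevation values from the train split.
train_elevation = [2000.0, 2500.0, 3000.0]
constants = analyze(train_elevation)

# The same frozen constants serve both training data and a serving request.
scaled_train = [apply_scaling(v, constants) for v in train_elevation]
scaled_request = apply_scaling(2750.0, constants)
print(constants, scaled_train, scaled_request)
```

If the serving path recomputed the mean and standard deviation from its own requests, the model would see differently scaled inputs than it was trained on. Freezing the constants avoids that skew.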
In [ ]:
TRANSFORM_MODULE = 'preprocessing.py'
!cat {TRANSFORM_MODULE}
In [ ]:
transform = Transform(
examples=example_gen.outputs['examples'],
schema=schema_importer.outputs['result'],
module_file=TRANSFORM_MODULE)
In [ ]:
context.run(transform)
Transform component's outputs
The Transform component has 2 outputs:
- transform_graph - contains the graph that can perform the preprocessing operations (this graph will be included in the serving and evaluation models).
- transformed_examples - contains the preprocessed training and evaluation data.
Take a peek at the transform_graph artifact: it points to a directory containing 3 subdirectories:
In [ ]:
os.listdir(transform.outputs['transform_graph'].get()[0].uri)
And the transformed_examples artifact:
In [ ]:
os.listdir(transform.outputs['transformed_examples'].get()[0].uri)
In [ ]:
transform_uri = transform.outputs['transformed_examples'].get()[0].uri
tfrecord_filenames = [os.path.join(transform_uri, 'train', name)
for name in os.listdir(os.path.join(transform_uri, 'train'))]
dataset = tf.data.TFRecordDataset(tfrecord_filenames, compression_type="GZIP")
for tfrecord in dataset.take(2):
    example = tf.train.Example()
    example.ParseFromString(tfrecord.numpy())
    for name, feature in example.features.feature.items():
        if feature.HasField('bytes_list'):
            value = feature.bytes_list.value
        if feature.HasField('float_list'):
            value = feature.float_list.value
        if feature.HasField('int64_list'):
            value = feature.int64_list.value
        print('{}: {}'.format(name, value))
    print('******')
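Trainer component
The Trainer component trains a model using TensorFlow.
In this lab, Trainer takes:
- the transformed examples and the transform graph produced by Transform,
- a data schema generated by SchemaGen or imported by ImporterNode,
- train and eval arguments,
- a module file that defines the training logic.
To configure Trainer, you need to encapsulate your training code in a Python module that is then provided to the Trainer as an input.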
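Providing code as a module file means the component imports that file at run time and calls an agreed entry point. The sketch below shows this mechanism with the standard library only. The module contents, the load_entry_point helper, and the dictionary passed as arguments are hypothetical; the entry-point name run_fn is assumed here to follow the convention of the generic Trainer executor.

```python
import importlib.util
import os
import tempfile

# Hypothetical module contents; a real module would build and fit a model.
MODULE_SOURCE = '''
def run_fn(fn_args):
    return "trained for {} steps".format(fn_args["train_steps"])
'''

def load_entry_point(module_path, fn_name):
    """Import a Python file by path and return one of its functions."""
    spec = importlib.util.spec_from_file_location("user_module", module_path)
    module = importlib.util.module_from_spec(spec)
    spec.loader.exec_module(module)
    return getattr(module, fn_name)

with tempfile.TemporaryDirectory() as tmp_dir:
    module_path = os.path.join(tmp_dir, "model.py")
    with open(module_path, "w") as f:
        f.write(MODULE_SOURCE)
    # The caller knows only the file path and the entry-point name.
    run_fn = load_entry_point(module_path, "run_fn")
    result = run_fn({"train_steps": 5000})
print(result)
```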
In [ ]:
TRAINER_MODULE_FILE = 'model.py'
!cat {TRAINER_MODULE_FILE}
In [ ]:
trainer = Trainer(
custom_executor_spec=executor_spec.ExecutorClassSpec(trainer_executor.GenericExecutor),
module_file=TRAINER_MODULE_FILE,
transformed_examples=transform.outputs["transformed_examples"],
schema=schema_importer.outputs["result"],
transform_graph=transform.outputs["transform_graph"],
train_args=trainer_pb2.TrainArgs(num_steps=5000),
eval_args=trainer_pb2.EvalArgs(num_steps=1000))
In [ ]:
context.run(trainer)
In this step you will analyze the training run with TensorBoard.dev. TensorBoard.dev is a managed service that enables you to easily host, track, and share your ML experiments.
In [ ]:
train_uri = trainer.outputs['model'].get()[0].uri
logs_path = os.path.join(train_uri, 'logs')
print(logs_path)
Open a new JupyterLab terminal window.
From the terminal window, execute the following command:
tensorboard dev upload --logdir [YOUR_LOGDIR]
where [YOUR_LOGDIR] is the URI retrieved by the previous cell.
You will be asked to authorize TensorBoard.dev using your Google account. If you don't have a Google account or you don't want to authorize TensorBoard.dev, you can skip this exercise.
After the authorization process completes, follow the link provided to view your experiment.
The Evaluator component analyzes model performance using the TensorFlow Model Analysis library. It runs inference requests on particular subsets of the test dataset, based on which slices are defined by the developer. Knowing which slices should be analyzed requires domain knowledge of what is important in this particular use case or domain.
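Slicing simply means computing the same metric on subsets of the evaluation data. Here is a minimal plain-Python sketch with hypothetical rows and a hypothetical accuracy_by_slice helper; it reports accuracy overall and per value of one feature.

```python
from collections import defaultdict

def accuracy_by_slice(rows, slice_key):
    """Compute accuracy overall and for each value of one feature."""
    hits, totals = defaultdict(int), defaultdict(int)
    for row in rows:
        # Every row counts toward the overall slice and its own feature slice.
        for key in ("overall", row[slice_key]):
            totals[key] += 1
            hits[key] += int(row["label"] == row["prediction"])
    return {key: hits[key] / totals[key] for key in totals}

# Hypothetical labels and predictions for four eval examples.
rows = [
    {"Wilderness_Area": "Rawah", "label": 1, "prediction": 1},
    {"Wilderness_Area": "Rawah", "label": 2, "prediction": 1},
    {"Wilderness_Area": "Cache", "label": 0, "prediction": 0},
    {"Wilderness_Area": "Cache", "label": 5, "prediction": 5},
]
slice_accuracy = accuracy_by_slice(rows, "Wilderness_Area")
print(slice_accuracy)
```

In this toy data the overall accuracy hides the fact that one slice performs much worse than the other, which is the kind of issue sliced evaluation is meant to surface.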
The Evaluator can also optionally validate a newly trained model against a previous model. In this lab, you only train one model, so the Evaluator will automatically label the model as "blessed".
Use the ResolverNode to pick the previous model to compare against. The model resolver is only required if performing model validation in addition to evaluation. In this case we validate against the latest blessed model. If no model has been blessed before (as in this case), the Evaluator will make our candidate the first blessed model.
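The resolution logic described above can be sketched in plain Python. The record layout and the resolve_latest_blessed helper are hypothetical and do not reflect how ML Metadata stores artifacts; the sketch only shows the selection rule and the empty-history case.

```python
def resolve_latest_blessed(models):
    """Return the id of the most recent blessed model, or None if there is none."""
    blessed = [m for m in models if m["blessed"]]
    if not blessed:
        return None
    return max(blessed, key=lambda m: m["created_at"])["id"]

# Hypothetical metadata records; field names are illustrative only.
history = [
    {"id": "model-1", "created_at": 1, "blessed": True},
    {"id": "model-2", "created_at": 2, "blessed": False},
    {"id": "model-3", "created_at": 3, "blessed": True},
]
baseline = resolve_latest_blessed(history)
# On a first run there is no history, so there is no baseline to compare with.
first_run_baseline = resolve_latest_blessed([])
print(baseline, first_run_baseline)
```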
In [ ]:
model_resolver = ResolverNode(
instance_name='latest_blessed_model_resolver',
resolver_class=latest_blessed_model_resolver.LatestBlessedModelResolver,
model=Channel(type=Model),
model_blessing=Channel(type=ModelBlessing))
context.run(model_resolver)
In [ ]:
model_resolver.outputs
Configure evaluation metrics and slices.
In [ ]:
accuracy_threshold = tfma.MetricThreshold(
value_threshold=tfma.GenericValueThreshold(
lower_bound={'value': 0.5},
upper_bound={'value': 0.99}),
change_threshold=tfma.GenericChangeThreshold(
absolute={'value': 0.0001},
direction=tfma.MetricDirection.HIGHER_IS_BETTER),
)
metrics_specs = tfma.MetricsSpec(
metrics = [
tfma.MetricConfig(class_name='SparseCategoricalAccuracy',
threshold=accuracy_threshold),
tfma.MetricConfig(class_name='ExampleCount')])
eval_config = tfma.EvalConfig(
model_specs=[
tfma.ModelSpec(label_key='Cover_Type')
],
metrics_specs=[metrics_specs],
slicing_specs=[
tfma.SlicingSpec(),
tfma.SlicingSpec(feature_keys=['Wilderness_Area'])
]
)
eval_config
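The two thresholds in the configuration above combine as follows: the value threshold bounds the metric itself, and the change threshold compares the candidate with the baseline. The plain-Python sketch below uses the same numbers. The is_blessed helper is hypothetical, and the exact boundary handling (inclusive versus strict comparisons) is an assumption rather than a statement about TFMA.

```python
def is_blessed(candidate, baseline=None,
               lower=0.5, upper=0.99, min_improvement=0.0001):
    """Check a value threshold and, if a baseline exists, a change threshold."""
    if not lower <= candidate <= upper:
        return False
    if baseline is None:
        # No previously blessed model: only the value threshold applies.
        return True
    # HIGHER_IS_BETTER: the candidate must beat the baseline by the margin.
    return candidate - baseline >= min_improvement

# Hypothetical accuracy values for a few scenarios.
first_model = is_blessed(0.72)
better_model = is_blessed(0.75, baseline=0.72)
worse_model = is_blessed(0.70, baseline=0.72)
too_low = is_blessed(0.40)
print(first_model, better_model, worse_model, too_low)
```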
In [ ]:
model_analyzer = Evaluator(
examples=example_gen.outputs.examples,
model=trainer.outputs.model,
baseline_model=model_resolver.outputs.model,
eval_config=eval_config
)
context.run(model_analyzer, enable_cache=False)
In [ ]:
model_blessing_uri = model_analyzer.outputs.blessing.get()[0].uri
!ls -l {model_blessing_uri}
In [ ]:
evaluation_uri = model_analyzer.outputs['evaluation'].get()[0].uri
evaluation_uri
!ls {evaluation_uri}
In [ ]:
eval_result = tfma.load_eval_result(evaluation_uri)
eval_result
In [ ]:
tfma.view.render_slicing_metrics(eval_result)
In [ ]:
tfma.view.render_slicing_metrics(
eval_result, slicing_column='Wilderness_Area')
The InfraValidator component acts as an additional early warning layer by validating a candidate model in a sandbox version of its serving infrastructure to prevent an unservable model from being pushed to production. Whereas the Evaluator component above validates a model's performance, the InfraValidator component validates that a model is able to generate predictions from served examples in an environment configured to match production. The config below takes a model and examples, launches the model in a sandboxed TensorFlow Serving model server from the latest image in a local Docker engine, and optionally checks that the model binary can be loaded and queried before "blessing" it for production.
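The role of a retry count and a loading-time budget can be illustrated with a simplified plain-Python loop. This is only a sketch of the idea: the validate_loading helper and both loaders are hypothetical, the parameter names are borrowed from the validation spec used in this lab, and the actual component starts a model server rather than calling a function.

```python
import time

def validate_loading(load_model, num_tries=5, max_loading_time_seconds=60):
    """Try loading up to num_tries times; succeed if one load finishes in time."""
    for attempt in range(1, num_tries + 1):
        start = time.monotonic()
        try:
            load_model()
        except RuntimeError:
            continue  # This attempt failed; move on to the next one.
        if time.monotonic() - start <= max_loading_time_seconds:
            return {"blessed": True, "attempts": attempt}
    return {"blessed": False, "attempts": num_tries}

calls = {"count": 0}

def flaky_loader():
    """Hypothetical loader that fails twice before succeeding."""
    calls["count"] += 1
    if calls["count"] < 3:
        raise RuntimeError("model server not ready")

def broken_loader():
    """Hypothetical loader for a model that can never be served."""
    raise RuntimeError("model cannot be loaded")

recovered = validate_loading(flaky_loader)
rejected = validate_loading(broken_loader)
print(recovered, rejected)
```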
In [ ]:
infra_validator = InfraValidator(
model=trainer.outputs['model'],
examples=example_gen.outputs['examples'],
serving_spec=infra_validator_pb2.ServingSpec(
tensorflow_serving=infra_validator_pb2.TensorFlowServing(
tags=['latest']),
local_docker=infra_validator_pb2.LocalDockerConfig(),
),
validation_spec=infra_validator_pb2.ValidationSpec(
max_loading_time_seconds=60,
num_tries=5,
),
request_spec=infra_validator_pb2.RequestSpec(
tensorflow_serving=infra_validator_pb2.TensorFlowServingRequestSpec(),
num_examples=5,
)
)
In [ ]:
context.run(infra_validator, enable_cache=False)
In [ ]:
infra_blessing_uri = infra_validator.outputs.blessing.get()[0].uri
!ls -l {infra_blessing_uri}
In [ ]:
trainer.outputs['model']
In [ ]:
pusher = Pusher(
model=trainer.outputs['model'],
model_blessing=model_analyzer.outputs['blessing'],
infra_blessing=infra_validator.outputs['blessing'],
push_destination=pusher_pb2.PushDestination(
filesystem=pusher_pb2.PushDestination.Filesystem(
base_directory=SERVING_MODEL_DIR)))
context.run(pusher)
In [ ]:
pusher.outputs
In [ ]:
# Set `PATH` to include a directory containing `saved_model_cli`.
PATH=%env PATH
%env PATH=/opt/conda/envs/tfx/bin:{PATH}
In [ ]:
latest_pushed_model = os.path.join(SERVING_MODEL_DIR, max(os.listdir(SERVING_MODEL_DIR)))
!saved_model_cli show --dir {latest_pushed_model} --all
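Note that max() over directory names compares strings lexicographically, which matches numeric order only while all names have the same number of digits. The sketch below assumes the subdirectories are named with integer versions such as timestamps; the latest_version_dir helper and the version names are hypothetical.

```python
import os
import tempfile

def latest_version_dir(base_dir):
    """Return the subdirectory whose numeric name is largest."""
    versions = [name for name in os.listdir(base_dir) if name.isdigit()]
    return os.path.join(base_dir, max(versions, key=int))

with tempfile.TemporaryDirectory() as serving_dir:
    # Hypothetical version directories with names of different lengths.
    for version in ("999", "1000"):
        os.makedirs(os.path.join(serving_dir, version))
    # String comparison ranks "999" above "1000" because "9" > "1".
    lexicographic = max(os.listdir(serving_dir))
    numeric = os.path.basename(latest_version_dir(serving_dir))
print(lexicographic, numeric)
```

For timestamp-based names of equal length both approaches agree, so the cell above works as written; comparing as integers is simply the more defensive choice.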
This concludes your introductory walkthrough of TFX pipeline components. In the lab, you used TFX to analyze, understand, and pre-process the dataset and train, analyze, validate, and deploy a multi-class classification model to predict the type of forest cover from cartographic features. You utilized a TFX Interactive Context for prototype development of a TFX pipeline directly in a Jupyter notebook. Next, you worked with the TFDV library to modify your dataset schema and add feature constraints that catch data anomalies that can negatively impact your model's performance. You utilized the TFT library for feature preprocessing to apply consistent feature transformations at training and serving time. Lastly, using the TFMA library, you added model performance constraints to ensure that you only push models to production that are more accurate than those from previous runs.
The next labs in the series will guide you through developing a TFX pipeline, deploying and running the pipeline on AI Platform Pipelines, and automating the pipeline build and deployment processes with Cloud Build.
Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at https://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.