TFX Components

This notebook shows how to create a pipeline that uses the following TFX components:

  • CsvExampleGen
  • StatisticsGen
  • SchemaGen
  • ExampleValidator
  • Transform
  • Trainer
  • Evaluator

In [ ]:
# Put your KFP cluster endpoint URL here if working from GCP notebooks (or local notebooks). ('https://xxxxx.notebooks.googleusercontent.com/')
kfp_endpoint = 'https://XXXXX.notebooks.googleusercontent.com/'

# Replace with your GCS bucket, project ID and GCP region
root_output_uri = '<your gcs bucket>'
project_id = '<your project id>'
gcp_region = '<your gcp region>'

# Options passed to every Beam-based TFX component below so that the heavy
# data processing runs on Dataflow rather than inside the pipeline containers.
beam_pipeline_args = [
    '--runner=DataflowRunner',
    '--experiments=shuffle_mode=auto',
    f'--project={project_id}',
    f'--temp_location={root_output_uri}/tmp',
    f'--region={gcp_region}',
    '--disk_size_gb=50',
]

In [ ]:
# URI of the directory holding the raw CSV input data (Chicago Taxi sample
# data hosted in the public ml-pipeline-playground bucket).
input_data_uri = 'gs://ml-pipeline-playground/tensorflow-tfx-repo/tfx/components/testdata/external/csv'

# Only S3/GCS is supported for now.
# User code consumed by the Transform and Trainer components below;
# presumably it defines the preprocessing/training functions TFX expects —
# see the taxi_utils.py example referenced in the URI.
module_file = 'gs://ml-pipeline-playground/tensorflow-tfx-repo/v0.21.4/tfx/examples/chicago_taxi_pipeline/taxi_utils.py'

In [ ]:
import kfp

In [ ]:
import json
from kfp.components import load_component_from_url

# Every TFX component definition is fetched from the same pinned commit
# (0cc4bbd4) of the kubeflow/pipelines repository; the "with_URI_IO"
# variants pass artifacts between steps as URIs.
_TFX_COMPONENTS_BASE = 'https://raw.githubusercontent.com/kubeflow/pipelines/0cc4bbd4/components/tfx/'

def _load_tfx_component(relative_path):
    """Loads one pinned TFX component definition by its repo-relative path."""
    return load_component_from_url(_TFX_COMPONENTS_BASE + relative_path + '/with_URI_IO/component.yaml')

CsvExampleGen_op    = _load_tfx_component('ExampleGen/CsvExampleGen')
StatisticsGen_op    = _load_tfx_component('StatisticsGen')
SchemaGen_op        = _load_tfx_component('SchemaGen')
ExampleValidator_op = _load_tfx_component('ExampleValidator')
Transform_op        = _load_tfx_component('Transform')
Trainer_op          = _load_tfx_component('Trainer')
Evaluator_op        = _load_tfx_component('Evaluator')

def tfx_pipeline(
    input_data_uri,
    root_output_uri,
):
    """Chicago Taxi pipeline wired from reusable TFX components.

    Reads module-level configuration (`beam_pipeline_args`, `module_file`
    and the `*_op` component factories loaded above).

    Args:
        input_data_uri: URI of the directory containing the input CSV files.
        root_output_uri: Root URI under which each run writes its artifacts.
    """
    # Give each run its own artifact directory. Fix: join with an explicit
    # '/' — elsewhere in this notebook root_output_uri is used without a
    # trailing slash (e.g. '--temp_location=' + root_output_uri + '/tmp'),
    # so plain concatenation would fuse the run ID onto the bucket path.
    generated_output_uri = root_output_uri + '/' + kfp.dsl.EXECUTION_ID_PLACEHOLDER

    # Ingests the CSV files and splits them into train/eval example sets.
    examples_task = CsvExampleGen_op(
        input_uri=input_data_uri,
        input_config=json.dumps({
            "splits": [
                {'name': 'data', 'pattern': '*.csv'},
            ]
        }),
        output_config=json.dumps({
            "splitConfig": {
                "splits": [
                    {'name': 'train', 'hash_buckets': 2},
                    {'name': 'eval', 'hash_buckets': 1},
                ]
            }
        }),
        beam_pipeline_args=beam_pipeline_args,

        output_examples_uri=generated_output_uri,
    )

    # Computes dataset statistics over the generated examples.
    statistics_task = StatisticsGen_op(
        examples_uri=examples_task.outputs['examples_uri'],
        beam_pipeline_args=beam_pipeline_args,

        output_statistics_uri=generated_output_uri,
    )

    # Infers a data schema from the statistics.
    schema_task = SchemaGen_op(
        statistics_uri=statistics_task.outputs['statistics_uri'],
        beam_pipeline_args=beam_pipeline_args,

        output_schema_uri=generated_output_uri,
    )

    # Performs anomaly detection based on statistics and data schema.
    validator_task = ExampleValidator_op(
        statistics_uri=statistics_task.outputs['statistics_uri'],
        schema_uri=schema_task.outputs['schema_uri'],
        beam_pipeline_args=beam_pipeline_args,

        output_anomalies_uri=generated_output_uri,
    )

    # Performs transformations and feature engineering in training and serving.
    transform_task = Transform_op(
        examples_uri=examples_task.outputs['examples_uri'],
        schema_uri=schema_task.outputs['schema_uri'],
        module_file=module_file,
        beam_pipeline_args=beam_pipeline_args,

        output_transform_graph_uri=generated_output_uri + '/transform_graph',
        output_transformed_examples_uri=generated_output_uri + '/transformed_examples',
    )

    # Trains a model on the transformed examples using the user module.
    trainer_task = Trainer_op(
        module_file=module_file,
        examples_uri=transform_task.outputs['transformed_examples_uri'],
        schema_uri=schema_task.outputs['schema_uri'],
        transform_graph_uri=transform_task.outputs['transform_graph_uri'],
        train_args=json.dumps({'num_steps': 10000}),
        eval_args=json.dumps({'num_steps': 5000}),
        beam_pipeline_args=beam_pipeline_args,

        output_model_uri=generated_output_uri,
    )

    # Uses TFMA to compute evaluation statistics over features of a model.
    model_analyzer = Evaluator_op(
        examples_uri=examples_task.outputs['examples_uri'],
        model_uri=trainer_task.outputs['model_uri'],
        feature_slicing_spec=json.dumps({
            'specs': [
                {'column_for_slicing': ['trip_start_hour']},
            ],
        }),
        beam_pipeline_args=beam_pipeline_args,

        output_evaluation_uri=generated_output_uri + '/evaluation',
        output_blessing_uri=generated_output_uri + '/blessing',
    )


# Connect to the KFP cluster and submit the pipeline for execution.
client = kfp.Client(host=kfp_endpoint)
client.create_run_from_pipeline_func(
    tfx_pipeline,
    arguments={
        'input_data_uri': input_data_uri,
        'root_output_uri': root_output_uri,
    },
)