In [ ]:
# Set your KFP cluster endpoint URL here when working from GCP (or local) notebooks, e.g. 'https://xxxxx.notebooks.googleusercontent.com/'
kfp_endpoint='https://XXXXX.notebooks.googleusercontent.com/'
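An optional sanity check: listing experiments confirms that the client can reach the endpoint (this assumes you are already authenticated against the cluster).
In [ ]:
import kfp
# List existing experiments to confirm the client can talk to the cluster.
print(kfp.Client(host=kfp_endpoint).list_experiments())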
In [ ]:
# Only S3/GCS URIs are supported for now.
input_data_uri = 'gs://ml-pipeline-playground/tensorflow-tfx-repo/tfx/components/testdata/external/csv'
# Module file with the preprocessing_fn and trainer_fn used by the Transform and Trainer components.
module_file = 'gs://ml-pipeline-playground/tensorflow-tfx-repo/v0.21.4/tfx/examples/chicago_taxi_pipeline/taxi_utils.py'
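As a quick, optional check, you can list the input CSV files (this assumes the gsutil CLI is available in the notebook environment):
In [ ]:
# List the CSV files at the input URI (assumes gsutil is installed).
!gsutil ls {input_data_uri}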
In [ ]:
import kfp
In [ ]:
import json
from kfp.components import load_component_from_url
download_from_gcs_op = load_component_from_url('https://raw.githubusercontent.com/kubeflow/pipelines/d013b8535666641ca5a5be6ce67e69e044bbf076/components/google-cloud/storage/download/component.yaml')
CsvExampleGen_op = load_component_from_url('https://raw.githubusercontent.com/kubeflow/pipelines/8c545b62/components/tfx/ExampleGen/CsvExampleGen/component.yaml')
StatisticsGen_op = load_component_from_url('https://raw.githubusercontent.com/kubeflow/pipelines/8c545b62/components/tfx/StatisticsGen/component.yaml')
SchemaGen_op = load_component_from_url('https://raw.githubusercontent.com/kubeflow/pipelines/8c545b62/components/tfx/SchemaGen/component.yaml')
ExampleValidator_op = load_component_from_url('https://raw.githubusercontent.com/kubeflow/pipelines/8c545b62/components/tfx/ExampleValidator/component.yaml')
Transform_op = load_component_from_url('https://raw.githubusercontent.com/kubeflow/pipelines/8c545b62/components/tfx/Transform/component.yaml')
Trainer_op = load_component_from_url('https://raw.githubusercontent.com/kubeflow/pipelines/8c545b62/components/tfx/Trainer/component.yaml')
Evaluator_op = load_component_from_url('https://raw.githubusercontent.com/kubeflow/pipelines/8c545b62/components/tfx/Evaluator/component.yaml')
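
# Optional: each loaded component is a task factory whose docstring is generated
# from its component.yaml interface, so help() shows the expected inputs/outputs.
help(CsvExampleGen_op)
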
def tfx_pipeline(
input_data_uri,
):
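    # Download the source data from GCS so downstream components can read it as a pipeline artifact.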
download_task = download_from_gcs_op(
input_data_uri,
)
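    # Convert the CSV data into tf.Example records, split 2:1 into train/eval.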
examples_task = CsvExampleGen_op(
input=download_task.output,
input_config=json.dumps({
"splits": [
{'name': 'data', 'pattern': '*.csv'},
]
}),
output_config=json.dumps({
"splitConfig": {
"splits": [
{'name': 'train', 'hash_buckets': 2},
{'name': 'eval', 'hash_buckets': 1},
]
}
}),
)
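    # Compute statistics over the examples, for visualization and example validation.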
statistics_task = StatisticsGen_op(
examples=examples_task.outputs['examples'],
)
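    # Infer a schema (feature types, categories, ranges) from the statistics.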
schema_task = SchemaGen_op(
statistics=statistics_task.outputs['statistics'],
)
# Performs anomaly detection based on statistics and data schema.
validator_task = ExampleValidator_op(
statistics=statistics_task.outputs['statistics'],
schema=schema_task.outputs['schema'],
)
# Performs transformations and feature engineering in training and serving.
transform_task = Transform_op(
examples=examples_task.outputs['examples'],
schema=schema_task.outputs['schema'],
module_file=module_file,
)
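    # Train a model on the transformed examples using the user-supplied module file.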
trainer_task = Trainer_op(
module_file=module_file,
examples=transform_task.outputs['transformed_examples'],
schema=schema_task.outputs['schema'],
transform_graph=transform_task.outputs['transform_graph'],
train_args=json.dumps({'num_steps': 10000}),
eval_args=json.dumps({'num_steps': 5000}),
)
    # Uses TFMA to compute evaluation statistics over features of the model.
model_analyzer = Evaluator_op(
examples=examples_task.outputs['examples'],
model=trainer_task.outputs['model'],
feature_slicing_spec=json.dumps({
'specs': [
{'column_for_slicing': ['trip_start_hour']},
],
}),
)
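
# Submit the pipeline to the KFP cluster for execution.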
kfp.Client(host=kfp_endpoint).create_run_from_pipeline_func(
tfx_pipeline,
arguments=dict(
input_data_uri=input_data_uri,
),
)
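If you want the notebook to block until the run finishes, capture the result object instead; a minimal sketch, assuming a one-hour timeout is acceptable (note that executing this cell submits another run):
In [ ]:
# Submit the run, then wait for completion and print the final status.
run_result = kfp.Client(host=kfp_endpoint).create_run_from_pipeline_func(
    tfx_pipeline,
    arguments=dict(input_data_uri=input_data_uri),
)
completed = run_result.wait_for_run_completion(timeout=3600)
print(completed.run.status)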