Data preparation by using a template to submit a job to Cloud Dataflow
Labels: GCP, Cloud Dataflow, Kubeflow, Pipeline
A Kubeflow Pipeline component to prepare data by using a template to submit a job to Cloud Dataflow.
Use this component when you have a pre-built Cloud Dataflow template and want to launch it as a step in a Kubeflow Pipeline.
Argument | Description | Optional | Data type | Accepted values | Default
---|---|---|---|---|---
project_id | The ID of the Google Cloud Platform (GCP) project to which the job belongs. | No | GCPProjectID | |
gcs_path | The path to a Cloud Storage bucket containing the job creation template. It must be a valid Cloud Storage URL beginning with 'gs://'. | No | GCSPath | |
launch_parameters | The parameters that are required to launch the template. The schema is defined in LaunchTemplateParameters. The parameter jobName is replaced by a generated name. A sketch of this structure follows the table. | Yes | Dict | A JSON object which has the same structure as LaunchTemplateParameters | None
location | The regional endpoint to which the job request is directed. | Yes | GCPRegion | | None
staging_dir | The path to the Cloud Storage directory where the staging files are stored. A random subdirectory is created under the staging directory to keep the job information, so that you can resume the job in case of failure. | Yes | GCSPath | | None
validate_only | If True, the request is validated but not executed. | Yes | Boolean | | False
wait_interval | The number of seconds to wait between calls to get the status of the job. | Yes | Integer | | 30
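The launch_parameters value follows the LaunchTemplateParameters schema. Below is a minimal sketch of such a value; the keys under 'parameters' are defined by the template you launch (here, the inputFile and output parameters of the Word_Count template used later in this document), and the 'environment' block is an optional RuntimeEnvironment:
import json

# Sketch of a LaunchTemplateParameters-style value for the Word_Count template.
# Replace the output bucket with your own. 'maxWorkers' is shown only as an
# example of an optional RuntimeEnvironment setting.
launch_parameters = json.dumps({
    'parameters': {
        'inputFile': 'gs://dataflow-samples/shakespeare/kinglear.txt',
        'output': 'gs://<your-bucket>/out/wc'
    },
    'environment': {
        'maxWorkers': 2
    }
})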
The input gcs_path must contain a valid Cloud Dataflow template. The template can be created by following the instructions in Creating Templates. You can also use Google-provided templates.
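The Google-provided templates, including the Word_Count template used later in this document, are published under the public gs://dataflow-templates bucket. Assuming the gsutil tool is installed, you can browse them with:
# List the Google-provided Dataflow templates.
gsutil ls gs://dataflow-templates/latest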
Name | Description |
---|---|
job_id | The ID of the Cloud Dataflow job that is created. A sketch of passing this output to a downstream step follows this table. |
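The job_id output can be consumed by other steps in the same pipeline through the standard Kubeflow Pipelines output mechanism. A minimal sketch, assuming a hypothetical downstream component consume_job_id_op that takes a job ID as input:
# Inside a pipeline function: launch the template, then pass the job ID on.
dataflow_step = dataflow_template_op(
    project_id=project_id,
    gcs_path=gcs_path,
    launch_parameters=launch_parameters)

# 'job_id' is the named output declared by the component.
consume_job_id_op(job_id=dataflow_step.outputs['job_id'])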
To use the component, the following requirements must be met. The identity that runs the pipeline must be granted (example grant commands are sketched after this list):

* The roles/dataflow.developer role on the project.
* The roles/storage.objectViewer role on the Cloud Storage object gcs_path.
* The roles/storage.objectCreator role on the Cloud Storage object staging_dir.
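As an illustration only (the service account e-mail and bucket names below are placeholders, not values taken from this document), such grants can be made along these lines. Note that gsutil iam ch grants the role at the bucket level, which also covers the objects used here:
# Allow the account to create and manage Dataflow jobs in the project.
gcloud projects add-iam-policy-binding <PROJECT_ID> \
    --member serviceAccount:<SA_EMAIL> \
    --role roles/dataflow.developer

# Allow reading the template object and writing staging files.
gsutil iam ch serviceAccount:<SA_EMAIL>:roles/storage.objectViewer gs://<TEMPLATE_BUCKET>
gsutil iam ch serviceAccount:<SA_EMAIL>:roles/storage.objectCreator gs://<STAGING_BUCKET>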
You can execute the template locally by following the instructions in Executing Templates. See the sample code below to learn how to execute the template. Follow these steps to use the component in a pipeline:
1. Install the Kubeflow Pipeline SDK:
In [ ]:
%%capture --no-stderr
KFP_PACKAGE = 'https://storage.googleapis.com/ml-pipeline/release/0.1.14/kfp.tar.gz'
!pip3 install $KFP_PACKAGE --upgrade
2. Load the component using the KFP SDK:
In [ ]:
import kfp.components as comp
dataflow_template_op = comp.load_component_from_url(
'https://raw.githubusercontent.com/kubeflow/pipelines/01a23ae8672d3b18e88adf3036071496aca3552d/components/gcp/dataflow/launch_template/component.yaml')
help(dataflow_template_op)
Note: The following sample code works in an IPython notebook or directly in Python code.
In this sample, we run a Google-provided word count template from gs://dataflow-templates/latest/Word_Count. The template takes a text file as input and outputs word counts to a Cloud Storage bucket. Here is the sample input:
In [ ]:
!gsutil cat gs://dataflow-samples/shakespeare/kinglear.txt
In [ ]:
# Required Parameters
PROJECT_ID = '<Please put your project ID here>'
GCS_WORKING_DIR = 'gs://<Please put your GCS path here>' # No ending slash
In [ ]:
# Optional Parameters
EXPERIMENT_NAME = 'Dataflow - Launch Template'
OUTPUT_PATH = '{}/out/wc'.format(GCS_WORKING_DIR)
Define an example pipeline that uses the component:
In [ ]:
import kfp.dsl as dsl
import json
@dsl.pipeline(
name='Dataflow launch template pipeline',
description='Dataflow launch template pipeline'
)
def pipeline(
project_id = PROJECT_ID,
gcs_path = 'gs://dataflow-templates/latest/Word_Count',
launch_parameters = json.dumps({
'parameters': {
'inputFile': 'gs://dataflow-samples/shakespeare/kinglear.txt',
'output': OUTPUT_PATH
}
}),
location = '',
validate_only = 'False',
staging_dir = GCS_WORKING_DIR,
wait_interval = 30):
dataflow_template_op(
project_id = project_id,
gcs_path = gcs_path,
launch_parameters = launch_parameters,
location = location,
validate_only = validate_only,
staging_dir = staging_dir,
wait_interval = wait_interval)
Compile the pipeline:
In [ ]:
pipeline_func = pipeline
pipeline_filename = pipeline_func.__name__ + '.zip'
import kfp.compiler as compiler
compiler.Compiler().compile(pipeline_func, pipeline_filename)
In [ ]:
#Specify pipeline argument values
arguments = {}
#Get or create an experiment and submit a pipeline run
import kfp
client = kfp.Client()
experiment = client.create_experiment(EXPERIMENT_NAME)
#Submit a pipeline run
run_name = pipeline_func.__name__ + ' run'
run_result = client.run_pipeline(experiment.id, run_name, pipeline_filename, arguments)
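Optionally, block until the run finishes before inspecting the output. A minimal sketch, assuming this version of the KFP SDK exposes Client.wait_for_run_completion and that the object returned by run_pipeline carries the run ID in its id attribute:
In [ ]:
# Wait up to 30 minutes for the pipeline run to complete before reading the output.
client.wait_for_run_completion(run_result.id, timeout=1800)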
Inspect the output:
In [ ]:
!gsutil cat $OUTPUT_PATH*
By deploying or using this software you agree to comply with the AI Hub Terms of Service and the Google APIs Terms of Service. To the extent of a direct conflict of terms, the AI Hub Terms of Service will control.