
Data preparation by using a template to submit a job to Cloud Dataflow


GCP, Cloud Dataflow, Kubeflow, Pipeline


A Kubeflow Pipeline component to prepare data by using a template to submit a job to Cloud Dataflow.


Intended use

Use this component when you have a pre-built Cloud Dataflow template and want to launch it as a step in a Kubeflow Pipeline.

Runtime arguments

Argument Description Optional Data type Accepted values Default
project_id The ID of the Google Cloud Platform (GCP) project to which the job belongs. No GCPProjectID
gcs_path The path to a Cloud Storage bucket containing the job creation template. It must be a valid Cloud Storage URL beginning with 'gs://'. No GCSPath
launch_parameters The parameters that are required to launch the template. The schema is defined in LaunchTemplateParameters. The parameter jobName is replaced by a generated name. Yes Dict A JSON object which has the same structure as LaunchTemplateParameters None
location The regional endpoint to which the job request is directed. Yes GCPRegion None
staging_dir The path to the Cloud Storage directory where the staging files are stored. A random subdirectory will be created under the staging directory to keep the job information. This is done so that you can resume the job in case of failure. Yes GCSPath None
validate_only If True, the request is validated but not executed. Yes Boolean False
wait_interval The number of seconds to wait between calls to get the status of the job. Yes Integer 30

Input data schema

The input gcs_path must contain a valid Cloud Dataflow template. The template can be created by following the instructions in Creating Templates. You can also use Google-provided templates.


Name Description
job_id The id of the Cloud Dataflow job that is created.

Caution & requirements

To use the component, the following requirements must be met:

  • Cloud Dataflow API is enabled.
  • The component can authenticate to GCP. Refer to Authenticating Pipelines to GCP for details.
  • The Kubeflow user service account is a member of:
    • roles/dataflow.developer role of the project.
    • roles/storage.objectViewer role of the Cloud Storage Object gcs_path.
    • roles/storage.objectCreator role of the Cloud Storage Object staging_dir.

Detailed description

You can execute the template locally by following the instructions in Executing Templates. See the sample code below to learn how to execute the template. Follow these steps to use the component in a pipeline:

  1. Install the Kubeflow Pipeline SDK:

In [ ]:
%%capture --no-stderr

!pip3 install $KFP_PACKAGE --upgrade
  1. Load the component using KFP SDK

In [ ]:
import kfp.components as comp

dataflow_template_op = comp.load_component_from_url(


Note: The following sample code works in an IPython notebook or directly in Python code. In this sample, we run a Google-provided word count template from gs://dataflow-templates/latest/Word_Count. The template takes a text file as input and outputs word counts to a Cloud Storage bucket. Here is the sample input:

In [ ]:
!gsutil cat gs://dataflow-samples/shakespeare/kinglear.txt

Set sample parameters

In [ ]:
# Required Parameters
PROJECT_ID = '<Please put your project ID here>'
GCS_WORKING_DIR = 'gs://<Please put your GCS path here>' # No ending slash

In [ ]:
# Optional Parameters
EXPERIMENT_NAME = 'Dataflow - Launch Template'
OUTPUT_PATH = '{}/out/wc'.format(GCS_WORKING_DIR)

Example pipeline that uses the component

In [ ]:
import kfp.dsl as dsl
import json
    name='Dataflow launch template pipeline',
    description='Dataflow launch template pipeline'
def pipeline(
    project_id = PROJECT_ID, 
    gcs_path = 'gs://dataflow-templates/latest/Word_Count', 
    launch_parameters = json.dumps({
       'parameters': {
           'inputFile': 'gs://dataflow-samples/shakespeare/kinglear.txt',
           'output': OUTPUT_PATH
    location = '',
    validate_only = 'False', 
    staging_dir = GCS_WORKING_DIR,
    wait_interval = 30):
        project_id = project_id, 
        gcs_path = gcs_path, 
        launch_parameters = launch_parameters, 
        location = location, 
        validate_only = validate_only,
        staging_dir = staging_dir,
        wait_interval = wait_interval)

Compile the pipeline

In [ ]:
pipeline_func = pipeline
pipeline_filename = pipeline_func.__name__ + '.zip'
import kfp.compiler as compiler
compiler.Compiler().compile(pipeline_func, pipeline_filename)

Submit the pipeline for execution

In [ ]:
#Specify pipeline argument values
arguments = {}

#Get or create an experiment and submit a pipeline run
import kfp
client = kfp.Client()
experiment = client.create_experiment(EXPERIMENT_NAME)

#Submit a pipeline run
run_name = pipeline_func.__name__ + ' run'
run_result = client.run_pipeline(, run_name, pipeline_filename, arguments)

Inspect the output

In [ ]:
!gsutil cat $OUTPUT_PATH*



By deploying or using this software you agree to comply with the AI Hub Terms of Service and the Google APIs Terms of Service. To the extent of a direct conflict of terms, the AI Hub Terms of Service will control.