A Kubeflow Pipelines component that prepares data by submitting an Apache Beam job (authored in Python) to Cloud Dataflow for execution. The Python Beam code runs with the Cloud Dataflow runner.
Use this component to run Python Beam code that submits a Cloud Dataflow job as a step of a Kubeflow pipeline.
Name | Description | Optional | Data type | Accepted values | Default |
---|---|---|---|---|---|
python_file_path | The path to the Cloud Storage bucket or local directory containing the Python file to be run. | | GCSPath | | |
project_id | The ID of the Google Cloud Platform (GCP) project containing the Cloud Dataflow job. | | GCPProjectID | | |
staging_dir | The path to the Cloud Storage directory where the staging files are stored. A random subdirectory will be created under the staging directory to keep the job information. This is done so that you can resume the job in case of failure. staging_dir is passed as the command line arguments (staging_location and temp_location) of the Beam code. | Yes | GCSPath | | None |
requirements_file_path | The path to the Cloud Storage bucket or local directory containing the pip requirements file. | Yes | GCSPath | | None |
args | The list of arguments to pass to the Python file. | No | List | A list of string arguments | None |
wait_interval | The number of seconds to wait between calls to get the status of the job. | Yes | Integer | | 30 |
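
The sample pipeline in this document passes `args` as a JSON-serialized list of strings. The sketch below shows that encoding with the standard `json` module. The bucket path and the `build_args` helper are hypothetical names used only for illustration.

```python
import json


def build_args(output_file):
    """Serialize the Beam script's arguments as a JSON list of strings."""
    # Each flag and each value is a separate list element, as on a command line.
    return json.dumps(['--output', output_file])


# Hypothetical output location; replace it with a path in your own bucket.
args = build_args('gs://my-bucket/wc/wordcount.out')
print(args)
```

The resulting string can be used as the default value of the pipeline's `args` parameter.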
Before you use the component, the following files must be ready in a Cloud Storage bucket:

- A Beam Python code file.
- A requirements.txt file which includes a list of dependent packages.

The Beam Python code should follow the Beam programming guide as well as the following additional requirements to be compatible with this component:

- It accepts the command line arguments --project, --temp_location, and --staging_location, which are standard Dataflow Runner options.
- It enables info logging before the start of a Cloud Dataflow job in the Python code. This is important to allow the component to track the status and ID of the job that is created. For example, call logging.getLogger().setLevel(logging.INFO) before any other code.

The component produces the following output:

Name | Description |
---|---|
job_id | The ID of the Cloud Dataflow job that is created. |
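
The requirements on the Beam Python code can be illustrated with a small script skeleton. This is a sketch under assumptions, not the sample `wc.py` file: the `--output` flag, the `parse_args` and `run` helpers, and all placeholder values are hypothetical. It sets info logging first, keeps its own flag, and forwards every other flag (such as `--project`, `--temp_location`, and `--staging_location`) to Beam through `PipelineOptions`.

```python
import argparse
import logging

# Enable info logging before any other code runs, as the component requires.
logging.getLogger().setLevel(logging.INFO)


def parse_args(argv=None):
    """Split script-specific flags from the flags meant for the Beam runner."""
    parser = argparse.ArgumentParser()
    # --output is a hypothetical script-specific flag used for illustration.
    parser.add_argument('--output', required=True)
    # Unrecognized flags (--project, --temp_location, --staging_location, ...)
    # are returned untouched so they can be handed to Beam.
    return parser.parse_known_args(argv)


def run(argv=None):
    """Build and run a small word count pipeline."""
    # Imported here so that parse_args can be tried without Beam installed.
    import apache_beam as beam
    from apache_beam.options.pipeline_options import PipelineOptions

    known_args, beam_args = parse_args(argv)
    options = PipelineOptions(beam_args)
    with beam.Pipeline(options=options) as p:
        (p
         | 'Create' >> beam.Create(['to be or not to be'])
         | 'Split' >> beam.FlatMap(lambda line: line.split())
         | 'Count' >> beam.combiners.Count.PerElement()
         | 'Format' >> beam.Map(lambda kv: '{}: {}'.format(kv[0], kv[1]))
         | 'Write' >> beam.io.WriteToText(known_args.output))


# Show how the flags are split, using placeholder values.
known_args, beam_args = parse_args([
    '--output', 'gs://my-bucket/wc/wordcount.out',
    '--project', 'my-project',
    '--temp_location', 'gs://my-bucket/tmp',
    '--staging_location', 'gs://my-bucket/staging',
])
print(known_args.output)
print(beam_args)
```

In a real script, replace the demonstration at the bottom with a call to `run()` under an `if __name__ == '__main__':` guard.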
To use the component, the following requirements must be met:
component_op(...)
- The Kubeflow user service account is a member of:
  - roles/dataflow.developer role of the project.
  - roles/storage.objectViewer role of the Cloud Storage objects python_file_path and requirements_file_path.
  - roles/storage.objectCreator role of the Cloud Storage object staging_dir.

The component does several things during the execution:

- Downloads python_file_path and requirements_file_path to local files.
- Stores the Cloud Dataflow job information in staging_dir so the job can be resumed in case of failure.
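
The `wait_interval` argument sets how many seconds pass between job status checks. The loop below is a minimal sketch of that kind of polling, not the component's actual implementation. `wait_for_job` and `get_state` are hypothetical names, and `get_state` stands in for a real status call to the Dataflow API.

```python
import time

# A subset of the terminal job states defined by the Dataflow API.
TERMINAL_STATES = {'JOB_STATE_DONE', 'JOB_STATE_FAILED', 'JOB_STATE_CANCELLED'}


def wait_for_job(get_state, wait_interval=30, sleep=time.sleep):
    """Poll get_state() every wait_interval seconds until a terminal state.

    get_state is a hypothetical callable that returns the current job state
    as a string. sleep is injectable so the loop can be tried without waiting.
    """
    while True:
        state = get_state()
        if state in TERMINAL_STATES:
            return state
        sleep(wait_interval)


# Usage with a fake status source that finishes on the third check.
states = iter(['JOB_STATE_PENDING', 'JOB_STATE_RUNNING', 'JOB_STATE_DONE'])
final_state = wait_for_job(lambda: next(states), wait_interval=0)
print(final_state)
```

A larger `wait_interval` means fewer status calls, at the cost of noticing job completion later.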
In [1]:
project = 'Input your PROJECT ID'
output = 'Input your GCS bucket name'  # A Cloud Storage path starting with gs://, no trailing slash
In [2]:
!python3 -m pip install 'kfp>=0.1.31' --quiet
In [3]:
import kfp.components as comp
dataflow_python_op = comp.load_component_from_url(
    'https://raw.githubusercontent.com/kubeflow/pipelines/01a23ae8672d3b18e88adf3036071496aca3552d/components/gcp/dataflow/launch_python/component.yaml')
help(dataflow_python_op)
In [4]:
!gsutil cat gs://ml-pipeline-playground/samples/dataflow/wc/wc.py
In [5]:
import kfp
import kfp.dsl as dsl
import json

# Location of the word count output written by the Beam job.
output_file = '{}/wc/wordcount.out'.format(output)

@dsl.pipeline(
    name='Dataflow launch python pipeline',
    description='Dataflow launch python pipeline'
)
def pipeline(
    python_file_path = 'gs://ml-pipeline-playground/samples/dataflow/wc/wc.py',
    project_id = project,
    staging_dir = output,
    requirements_file_path = 'gs://ml-pipeline-playground/samples/dataflow/wc/requirements.txt',
    args = json.dumps([
        '--output', output_file
    ]),
    wait_interval = 30
):
    # Launch the Beam script on Cloud Dataflow as a single pipeline step.
    dataflow_python_op(
        python_file_path = python_file_path,
        project_id = project_id,
        staging_dir = staging_dir,
        requirements_file_path = requirements_file_path,
        args = args,
        wait_interval = wait_interval)
In [7]:
kfp.Client().create_run_from_pipeline_func(pipeline, arguments={})
In [ ]:
!gsutil cat $output_file