Data preparation by executing an Apache Beam job in Cloud Dataflow
GCP, Cloud Dataflow, Apache Beam, Python, Kubeflow
A Kubeflow Pipeline component that prepares data by submitting an Apache Beam job (authored in Python) to Cloud Dataflow for execution. The Python Beam code is run with the Cloud Dataflow Runner.
Use this component to submit Python Beam code as a Cloud Dataflow job from a step of a Kubeflow pipeline.
Name | Description | Optional | Data type | Accepted values | Default
---|---|---|---|---|---
python_file_path | The path to the Cloud Storage bucket or local directory containing the Python file to be run. | No | GCSPath | |
project_id | The ID of the Google Cloud Platform (GCP) project containing the Cloud Dataflow job. | No | GCPProjectID | |
staging_dir | The path to the Cloud Storage directory where the staging files are stored. A random subdirectory is created under the staging directory to keep the job information, so that the job can be resumed in case of failure. `staging_dir` is passed to the Beam code as the command line arguments `staging_location` and `temp_location`. | Yes | GCSPath | | None
requirements_file_path | The path to the Cloud Storage bucket or local directory containing the pip requirements file. | Yes | GCSPath | | None
args | The list of arguments to pass to the Python file (see the example after this table). | No | List | A list of string arguments | None
wait_interval | The number of seconds to wait between calls to get the status of the job. | Yes | Integer | | 30
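For example, because `args` is a list passed through a single pipeline parameter, the sample pipeline later in this notebook serializes it with `json.dumps`. A minimal sketch (the output path is illustrative):

import json

# Arguments for the Beam program, serialized as a JSON list of strings.
# The output path is illustrative.
args = json.dumps(['--output', 'gs://my-bucket/wc/wordcount.out'])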
Before you use the component, the following files must be ready in a Cloud Storage bucket:

- A Beam Python code file.
- A `requirements.txt` file which includes a list of dependent packages.

The Beam Python code should follow the Beam programming guide as well as the following additional requirements to be compatible with this component:

- It accepts the command line arguments `--project`, `--temp_location`, and `--staging_location`, which are standard Dataflow Runner options.
- It enables info logging before the start of the Cloud Dataflow job. This is important to allow the component to track the status and ID of the job that is created. For example, call `logging.getLogger().setLevel(logging.INFO)` before any other code (see the sketch after this list).
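Below is a minimal sketch (not the wc.py sample used later in this notebook) of a Beam program that satisfies both requirements: it enables info logging before the pipeline starts and leaves `--project`, `--temp_location`, and `--staging_location` for `PipelineOptions` to parse. The `--output` argument is illustrative.

import argparse
import logging

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions


def run(argv=None):
    # Enable info logging before the Cloud Dataflow job starts so the
    # component can extract the job ID and status from the logs.
    logging.getLogger().setLevel(logging.INFO)

    parser = argparse.ArgumentParser()
    parser.add_argument('--output', required=True, help='Output path for the results.')
    known_args, pipeline_args = parser.parse_known_args(argv)

    # --project, --temp_location, and --staging_location stay in
    # pipeline_args and are parsed by PipelineOptions (standard
    # Dataflow Runner options supplied by the component).
    options = PipelineOptions(pipeline_args)

    with beam.Pipeline(options=options) as p:
        (p
         | 'Create' >> beam.Create(['hello', 'world', 'hello'])
         | 'PairWithOne' >> beam.Map(lambda word: (word, 1))
         | 'Count' >> beam.CombinePerKey(sum)
         | 'Format' >> beam.Map(lambda kv: '%s: %d' % kv)
         | 'Write' >> beam.io.WriteToText(known_args.output))


if __name__ == '__main__':
    run()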
The component returns the following output:

Name | Description
---|---
job_id | The ID of the Cloud Dataflow job that is created (see the sketch after this table).
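A hypothetical sketch of consuming this output in a downstream step, assuming `dataflow_python_op` has been loaded as shown later in this notebook; `some_downstream_op` and all argument values are illustrative:

# Inside a pipeline definition. 'some_downstream_op' is hypothetical;
# the point is referencing the component's job_id output.
dataflow_task = dataflow_python_op(
    python_file_path='gs://ml-pipeline-playground/samples/dataflow/wc/wc.py',
    project_id='my-project-id',
    staging_dir='gs://my-bucket/staging',
    args='[]',
    wait_interval=30)
some_downstream_op(dataflow_job_id=dataflow_task.outputs['job_id'])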
To use the component, the following requirements must be met (a credential sketch follows this section):

- The Kubeflow user service account is a member of:
  - the `roles/dataflow.developer` role of the project.
  - the `roles/storage.objectViewer` role of the Cloud Storage Objects `python_file_path` and `requirements_file_path`.
  - the `roles/storage.objectCreator` role of the Cloud Storage Object `staging_dir`.

The component does several things during the execution:

- Downloads `python_file_path` and `requirements_file_path` to local files.
- Saves the Cloud Dataflow job information in `staging_dir` so the job can be resumed in case of failure.
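The sketch below shows one common way, in KFP releases of this era, to run the op under the Kubeflow user service account: applying the legacy `user-gcp-sa` secret. It assumes that secret is installed in the cluster and that `dataflow_python_op` has been loaded as shown later in this notebook; project and bucket values are illustrative.

import json
import kfp.dsl as dsl
import kfp.gcp as gcp

# Sketch: apply the user-gcp-sa secret so the step runs with the
# Kubeflow user service account (values below are illustrative).
@dsl.pipeline(
    name='Dataflow launch python with GCP secret',
    description='Illustrative sketch'
)
def secured_pipeline():
    dataflow_python_op(
        python_file_path='gs://ml-pipeline-playground/samples/dataflow/wc/wc.py',
        project_id='my-project-id',
        staging_dir='gs://my-bucket/staging',
        args=json.dumps(['--output', 'gs://my-bucket/wc/wordcount.out']),
        wait_interval=30
    ).apply(gcp.use_gcp_secret('user-gcp-sa'))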
In [ ]:
%%capture --no-stderr
KFP_PACKAGE = 'https://storage.googleapis.com/ml-pipeline/release/0.1.14/kfp.tar.gz'
!pip3 install $KFP_PACKAGE --upgrade
In [ ]:
import kfp.components as comp
dataflow_python_op = comp.load_component_from_url(
'https://raw.githubusercontent.com/kubeflow/pipelines/01a23ae8672d3b18e88adf3036071496aca3552d/components/gcp/dataflow/launch_python/component.yaml')
help(dataflow_python_op)
Note: The following sample code works in an IPython notebook or directly in Python code. In this sample, we run a wordcount program as a Cloud Dataflow job in a Kubeflow Pipeline, and the output is stored in a Cloud Storage bucket. First, inspect the wordcount sample code:
In [2]:
!gsutil cat gs://ml-pipeline-playground/samples/dataflow/wc/wc.py
In [ ]:
# Required Parameters
PROJECT_ID = '<Please put your project ID here>'
GCS_STAGING_DIR = 'gs://<Please put your GCS path here>' # No ending slash
In [ ]:
# Optional Parameters
EXPERIMENT_NAME = 'Dataflow - Launch Python'
OUTPUT_FILE = '{}/wc/wordcount.out'.format(GCS_STAGING_DIR)
In [ ]:
import kfp.dsl as dsl
import json
@dsl.pipeline(
    name='Dataflow launch python pipeline',
    description='Dataflow launch python pipeline'
)
def pipeline(
    python_file_path = 'gs://ml-pipeline-playground/samples/dataflow/wc/wc.py',
    project_id = PROJECT_ID,
    staging_dir = GCS_STAGING_DIR,
    requirements_file_path = 'gs://ml-pipeline-playground/samples/dataflow/wc/requirements.txt',
    args = json.dumps([
        '--output', OUTPUT_FILE
    ]),
    wait_interval = 30
):
    dataflow_python_op(
        python_file_path = python_file_path,
        project_id = project_id,
        staging_dir = staging_dir,
        requirements_file_path = requirements_file_path,
        args = args,
        wait_interval = wait_interval)
In [ ]:
pipeline_func = pipeline
pipeline_filename = pipeline_func.__name__ + '.zip'
import kfp.compiler as compiler
compiler.Compiler().compile(pipeline_func, pipeline_filename)
In [ ]:
#Specify pipeline argument values
arguments = {}
#Get or create an experiment and submit a pipeline run
import kfp
client = kfp.Client()
experiment = client.create_experiment(EXPERIMENT_NAME)
#Submit a pipeline run
run_name = pipeline_func.__name__ + ' run'
run_result = client.run_pipeline(experiment.id, run_name, pipeline_filename, arguments)
In [ ]:
!gsutil cat $OUTPUT_FILE
By deploying or using this software you agree to comply with the AI Hub Terms of Service and the Google APIs Terms of Service. To the extent of a direct conflict of terms, the AI Hub Terms of Service will control.