Name

Data processing by creating a cluster in Cloud Dataproc

Label

Cloud Dataproc, cluster, GCP, Cloud Storage, KubeFlow, Pipeline

Summary

A Kubeflow Pipeline component to create a cluster in Cloud Dataproc.

Details

Intended use

Use this component at the start of a Kubeflow Pipeline to create a temporary Cloud Dataproc cluster to run Cloud Dataproc jobs as steps in the pipeline.

Runtime arguments

Argument Description Optional Data type Accepted values Default
project_id The Google Cloud Platform (GCP) project ID that the cluster belongs to. No GCPProjectID
region The Cloud Dataproc region to create the cluster in. No GCPRegion
name The name of the cluster. Cluster names within a project must be unique. You can reuse the names of deleted clusters. Yes String None
name_prefix The prefix of the cluster name. Yes String None
initialization_actions A list of Cloud Storage URIs identifying executables to execute on each node after the configuration is completed. By default, executables are run on the master and all the worker nodes. Yes List None
config_bucket The Cloud Storage bucket to use to stage the job dependencies, the configuration files, and the job driver console’s output. Yes GCSPath None
image_version The version of the software inside the cluster. Yes String None
cluster The full cluster configuration. Yes Dict None
wait_interval The number of seconds to pause before polling the operation. Yes Integer 30

Output

Name Description Type
cluster_name The name of the cluster. String

Note: You can recycle the cluster by using the Dataproc delete cluster component.

Cautions & requirements

To use the component, you must:

  • Set up the GCP project by following these steps.
  • The component can authenticate to GCP. Refer to Authenticating Pipelines to GCP for details.
  • Grant the following types of access to the Kubeflow user service account:
    • Read access to the Cloud Storage buckets which contains initialization action files.
    • The role, roles/dataproc.editor on the project.

Detailed description

This component creates a new Dataproc cluster by using the Dataproc create cluster REST API.

Follow these steps to use the component in a pipeline:

  1. Install the Kubeflow Pipeline SDK:

In [ ]:
%%capture --no-stderr

KFP_PACKAGE = 'https://storage.googleapis.com/ml-pipeline/release/0.1.14/kfp.tar.gz'
!pip3 install $KFP_PACKAGE --upgrade
  1. Load the component using KFP SDK

In [ ]:
import kfp.components as comp

dataproc_create_cluster_op = comp.load_component_from_url(
    'https://raw.githubusercontent.com/kubeflow/pipelines/01a23ae8672d3b18e88adf3036071496aca3552d/components/gcp/dataproc/create_cluster/component.yaml')
help(dataproc_create_cluster_op)

Sample

Note: The following sample code works in an IPython notebook or directly in Python code. See the sample code below to learn how to execute the template.

Set sample parameters


In [ ]:
# Required Parameters
PROJECT_ID = '<Please put your project ID here>'

# Optional Parameters
EXPERIMENT_NAME = 'Dataproc - Create Cluster'

Example pipeline that uses the component


In [ ]:
import kfp.dsl as dsl
import json
@dsl.pipeline(
    name='Dataproc create cluster pipeline',
    description='Dataproc create cluster pipeline'
)
def dataproc_create_cluster_pipeline(
    project_id = PROJECT_ID, 
    region = 'us-central1', 
    name='', 
    name_prefix='',
    initialization_actions='', 
    config_bucket='', 
    image_version='', 
    cluster='', 
    wait_interval='30'
):
    dataproc_create_cluster_op(
        project_id=project_id, 
        region=region, 
        name=name, 
        name_prefix=name_prefix, 
        initialization_actions=initialization_actions, 
        config_bucket=config_bucket, 
        image_version=image_version, 
        cluster=cluster, 
        wait_interval=wait_interval)

Compile the pipeline


In [ ]:
pipeline_func = dataproc_create_cluster_pipeline
pipeline_filename = pipeline_func.__name__ + '.zip'
import kfp.compiler as compiler
compiler.Compiler().compile(pipeline_func, pipeline_filename)

Submit the pipeline for execution


In [ ]:
#Specify pipeline argument values
arguments = {}

#Get or create an experiment and submit a pipeline run
import kfp
client = kfp.Client()
experiment = client.create_experiment(EXPERIMENT_NAME)

#Submit a pipeline run
run_name = pipeline_func.__name__ + ' run'
run_result = client.run_pipeline(experiment.id, run_name, pipeline_filename, arguments)

References

License

By deploying or using this software you agree to comply with the AI Hub Terms of Service and the Google APIs Terms of Service. To the extent of a direct conflict of terms, the AI Hub Terms of Service will control.