
Data preparation using Spark on YARN with Cloud Dataproc


Cloud Dataproc, GCP, Cloud Storage, Spark, Kubeflow, pipelines, components, YARN


A Kubeflow Pipeline component to prepare data by submitting a Spark job on YARN to Cloud Dataproc.


Intended use

Use the component to run an Apache Spark job as one preprocessing step in a Kubeflow Pipeline.

Runtime arguments

| Argument | Description | Optional | Data type | Accepted values | Default |
|---|---|---|---|---|---|
| project_id | The ID of the Google Cloud Platform (GCP) project that the cluster belongs to. | No | GCPProjectID | | |
| region | The Cloud Dataproc region to handle the request. | No | GCPRegion | | |
| cluster_name | The name of the cluster to run the job. | No | String | | |
| main_jar_file_uri | The Hadoop Compatible Filesystem (HCFS) URI of the JAR file that contains the main class. | No | GCSPath | | |
| main_class | The name of the driver's main class. The JAR file that contains the class must be either in the default CLASSPATH or specified in spark_job.jarFileUris. | No | | | |
| args | The arguments to pass to the driver. Do not include arguments, such as --conf, that can be set as job properties, since a collision may occur that causes an incorrect job submission. | Yes | | | |
| spark_job | The payload of a SparkJob. | Yes | | | |
| job | The payload of a Dataproc job. | Yes | | | |
| wait_interval | The number of seconds to wait between polling the operation. | Yes | | | 30 |
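The caution on args can be made concrete with a small sketch. The helper below is not part of the component; its name, `split_conf_args`, is hypothetical. It moves any `--conf key=value` pairs out of a driver argument list and into the `properties` map of the SparkJob payload, then serializes both values as JSON strings, which is how the sample later in this document passes them.

```python
import json


def split_conf_args(raw_args):
    """Separate `--conf key=value` pairs from plain driver arguments.

    Hypothetical helper for illustration only; not part of the component.
    Returns a tuple (driver_args, properties).
    """
    driver_args = []
    properties = {}
    items = iter(raw_args)
    for item in items:
        if item == '--conf':
            # The setting is the next element: --conf key=value
            setting = next(items, None)
            if setting is None or '=' not in setting:
                raise ValueError('--conf must be followed by key=value')
            key, value = setting.split('=', 1)
            properties[key] = value
        elif item.startswith('--conf='):
            # The setting is attached to the flag: --conf=key=value
            key, value = item[len('--conf='):].split('=', 1)
            properties[key] = value
        else:
            driver_args.append(item)
    return driver_args, properties


driver_args, properties = split_conf_args(
    ['--conf', 'spark.executor.memory=2g', '1000'])

# Both values are handed to the component as JSON strings.
args = json.dumps(driver_args)
spark_job = json.dumps({'properties': properties})
```

Keeping Spark settings in `properties` rather than in args avoids the collision that the argument description warns about.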


Output

| Name | Description | Type |
|---|---|---|
| job_id | The ID of the created job. | String |
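To illustrate how wait_interval relates to the job identified by job_id, here is a minimal polling sketch. It assumes a caller-supplied `get_state` function that returns the job's current state string. That function and `wait_for_job` are hypothetical names, and the loop is not the component's actual implementation. The terminal state names follow the Dataproc job status values DONE, ERROR, and CANCELLED.

```python
import time

# Terminal Dataproc job states; any other state is treated as "still in progress".
TERMINAL_STATES = {'DONE', 'ERROR', 'CANCELLED'}


def wait_for_job(job_id, get_state, wait_interval=30, sleep=time.sleep):
    """Poll get_state(job_id) until the job reaches a terminal state.

    get_state is a hypothetical callable supplied by the caller.
    sleep is injectable so the loop can be exercised without real delays.
    """
    while True:
        state = get_state(job_id)
        if state in TERMINAL_STATES:
            return state
        # Wait wait_interval seconds before asking again.
        sleep(wait_interval)
```

A larger wait_interval reduces the number of status requests at the cost of noticing completion later.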

Cautions & requirements

To use the component, you must:

Detailed description

This component creates a Spark job by calling the Dataproc submit job REST API.
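As a rough picture of what such a submission looks like, the sketch below builds the URL and JSON body for a Dataproc `jobs.submit` request without sending anything. The function name `build_submit_request` and the project and cluster names are hypothetical, and the component may assemble its request differently. The field names (`placement.clusterName`, `sparkJob.mainClass`, `sparkJob.jarFileUris`, `sparkJob.args`) come from the Dataproc v1 Job resource.

```python
import json


def build_submit_request(project_id, region, cluster_name, main_class,
                         jar_file_uris, args):
    """Return (url, body) for a Dataproc jobs.submit call. Nothing is sent."""
    url = ('https://dataproc.googleapis.com/v1/projects/{}/regions/{}/jobs:submit'
           .format(project_id, region))
    body = {
        'job': {
            # The cluster that runs the job.
            'placement': {'clusterName': cluster_name},
            # The Spark-specific part of the job.
            'sparkJob': {
                'mainClass': main_class,
                'jarFileUris': jar_file_uris,
                'args': args,
            },
        }
    }
    return url, body


# Placeholder project and cluster names, for illustration only.
url, body = build_submit_request(
    'my-project', 'us-central1', 'my-cluster',
    'org.apache.spark.examples.SparkPi',
    ['file:///usr/lib/spark/examples/jars/spark-examples.jar'],
    ['1000'])
print(url)
print(json.dumps(body, indent=2))
```

Sending the request would additionally require an authenticated HTTP client, which is outside the scope of this sketch.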

Follow these steps to use the component in a pipeline:

  1. Install the Kubeflow Pipeline SDK:

In [ ]:
%%capture --no-stderr

!pip3 install $KFP_PACKAGE --upgrade
  2. Load the component using the KFP SDK:

In [ ]:
import kfp.components as comp

dataproc_submit_spark_job_op = comp.load_component_from_url(


Note: The following sample code works in an IPython notebook or directly in Python code.

Set up a Dataproc cluster

Create a new Dataproc cluster (or reuse an existing one) before running the sample code.

Prepare a Spark job

Upload your Spark JAR file to a Cloud Storage bucket. The sample skips this step because it uses a JAR file that is preinstalled in the main cluster: file:///usr/lib/spark/examples/jars/spark-examples.jar.

Here is the source code of the sample.

To package a self-contained Spark application, follow these instructions.

Set sample parameters

In [ ]:
PROJECT_ID = '<Please put your project ID here>'
CLUSTER_NAME = '<Please put your existing cluster name here>'
REGION = 'us-central1'
SPARK_FILE_URI = 'file:///usr/lib/spark/examples/jars/spark-examples.jar'
MAIN_CLASS = 'org.apache.spark.examples.SparkPi'
ARGS = ['1000']
EXPERIMENT_NAME = 'Dataproc - Submit Spark Job'

Example pipeline that uses the component

In [ ]:
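import json
import kfp.dsl as dsl
@dsl.pipeline(
    name='Dataproc submit Spark job pipeline',
    description='Dataproc submit Spark job pipeline'
)
def dataproc_submit_spark_job_pipeline(
    project_id = PROJECT_ID,
    region = REGION,
    cluster_name = CLUSTER_NAME,
    main_jar_file_uri = '',
    main_class = MAIN_CLASS,
    args = json.dumps(ARGS),
    spark_job=json.dumps({ 'jarFileUris': [ SPARK_FILE_URI ] }),
):
    # Forward the pipeline parameters to the component loaded earlier.
    dataproc_submit_spark_job_op(
        project_id=project_id,
        region=region,
        cluster_name=cluster_name,
        main_jar_file_uri=main_jar_file_uri,
        main_class=main_class,
        args=args,
        spark_job=spark_job)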

Compile the pipeline

In [ ]:
pipeline_func = dataproc_submit_spark_job_pipeline
pipeline_filename = pipeline_func.__name__ + '.zip'
import kfp.compiler as compiler
compiler.Compiler().compile(pipeline_func, pipeline_filename)

Submit the pipeline for execution

In [ ]:
#Specify pipeline argument values
arguments = {}

#Get or create an experiment and submit a pipeline run
import kfp
client = kfp.Client()
experiment = client.create_experiment(EXPERIMENT_NAME)

#Submit a pipeline run
run_name = pipeline_func.__name__ + ' run'
run_result = client.run_pipeline(experiment.id, run_name, pipeline_filename, arguments)
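After submitting, you may want to block until the run finishes. The sketch below assumes the KFP v1 SDK, where `kfp.Client` provides `wait_for_run_completion(run_id, timeout)` and the returned run detail exposes the final state as `run.status`. The wrapper name `wait_for_run` and the 30-minute default timeout are hypothetical choices, not part of the original sample.

```python
def wait_for_run(client, run_id, timeout_seconds=1800):
    """Block until the run finishes, then return its final status string.

    Assumes a KFP v1 client whose wait_for_run_completion returns an object
    with a `run.status` attribute. The timeout default is illustrative only.
    """
    detail = client.wait_for_run_completion(run_id, timeout_seconds)
    return detail.run.status


# Usage with the objects created in the cell above:
# status = wait_for_run(client, run_result.id)
```

Taking the client as a parameter keeps the helper easy to exercise with a stand-in object when no Kubeflow Pipelines deployment is available.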



By deploying or using this software you agree to comply with the AI Hub Terms of Service and the Google APIs Terms of Service. To the extent of a direct conflict of terms, the AI Hub Terms of Service will control.