Data preparation using PySpark on Cloud Dataproc
Cloud Dataproc, GCP, Cloud Storage, PySpark, Kubeflow, pipelines, components
A Kubeflow Pipeline component to prepare data by submitting a PySpark job to Cloud Dataproc.
Use the component to run an Apache PySpark job as one preprocessing step in a Kubeflow Pipeline.
Argument | Description | Optional | Data type | Accepted values | Default |
---|---|---|---|---|---|
project_id | The ID of the Google Cloud Platform (GCP) project that the cluster belongs to. | No | GCPProjectID | | |
region | The Cloud Dataproc region to handle the request. | No | GCPRegion | | |
cluster_name | The name of the cluster to run the job. | No | String | | |
main_python_file_uri | The HCFS URI of the Python file to use as the driver. This must be a .py file. | No | GCSPath | | |
args | The arguments to pass to the driver. Do not include arguments, such as --conf, that can be set as job properties, since a collision can cause an incorrect job submission. | Yes | List | | None |
pyspark_job | The payload of a PySparkJob. | Yes | Dict | | None |
job | The payload of a Dataproc job. | Yes | Dict | | None |
wait_interval | The number of seconds to wait between polling the status of the job. | Yes | Integer | | 30 |
Name | Description | Type |
---|---|---|
job_id | The ID of the created job. | String |
To use the component, you must:
* Set up a GCP project and create (or reuse) a Cloud Dataproc cluster.
* Grant the Kubeflow user service account the role roles/dataproc.editor on the project.

This component creates a PySpark job from the Dataproc submit job REST API.
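The pyspark_job and job arguments accept additional fields of the underlying REST payloads as JSON. Below is a minimal sketch, assuming the field names of the Dataproc v1 PySparkJob message (for example properties and pythonFileUris); the values shown are illustrative, so consult the Dataproc REST reference for the full schema.

import json

# Sketch: extra PySparkJob fields, JSON-serialized for the pyspark_job argument.
# Field names follow the Dataproc v1 REST API; the values here are illustrative.
pyspark_job_payload = json.dumps({
    'properties': {'spark.executor.memory': '2g'},  # example Spark property
    'pythonFileUris': [],                           # additional .py files, if any
})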
Follow these steps to use the component in a pipeline:
1. Install the Kubeflow Pipeline SDK:

In [ ]:
%%capture --no-stderr
KFP_PACKAGE = 'https://storage.googleapis.com/ml-pipeline/release/0.1.14/kfp.tar.gz'
!pip3 install $KFP_PACKAGE --upgrade
2. Load the component using the KFP SDK:

In [ ]:
import kfp.components as comp
dataproc_submit_pyspark_job_op = comp.load_component_from_url(
    'https://raw.githubusercontent.com/kubeflow/pipelines/01a23ae8672d3b18e88adf3036071496aca3552d/components/gcp/dataproc/submit_pyspark_job/component.yaml')
help(dataproc_submit_pyspark_job_op)
Note: The following sample code works in an IPython notebook or directly in Python code. It submits the publicly available hello-world.py PySpark script to an existing Cloud Dataproc cluster.
Create a new Dataproc cluster (or reuse an existing one) before running the sample code.
Upload your PySpark code file to a Cloud Storage bucket. For example, this is a publicly accessible hello-world.py in Cloud Storage:
In [ ]:
!gsutil cat gs://dataproc-examples-2f10d78d114f6aaec76462e3c310f31f/src/pyspark/hello-world/hello-world.py
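For reference, a minimal hello-world PySpark driver looks roughly like the sketch below; the actual contents of the public file above may differ.

# Sketch of a minimal PySpark driver (the public hello-world.py may differ).
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName('hello-world').getOrCreate()
# A trivial distributed computation: count 100 numbers on the cluster.
count = spark.sparkContext.parallelize(range(100)).count()
print('Hello, world! Count = %d' % count)
spark.stop()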
In [ ]:
PROJECT_ID = '<Please put your project ID here>'
CLUSTER_NAME = '<Please put your existing cluster name here>'
REGION = 'us-central1'
PYSPARK_FILE_URI = 'gs://dataproc-examples-2f10d78d114f6aaec76462e3c310f31f/src/pyspark/hello-world/hello-world.py'
ARGS = ''
EXPERIMENT_NAME = 'Dataproc - Submit PySpark Job'
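Because args is declared as a List, a driver that takes command-line arguments would receive them as a JSON-serialized list rather than the empty string used above. A sketch with made-up flag names and bucket path (the hello-world driver takes no arguments, so ARGS stays empty here):

import json

# Sketch only: pass driver arguments as a JSON-serialized list.
# The flag name and bucket path below are made up for illustration.
example_args = json.dumps(['--input', 'gs://your-bucket/path/to/input.csv'])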
In [ ]:
import kfp.dsl as dsl
import json

@dsl.pipeline(
    name='Dataproc submit PySpark job pipeline',
    description='Dataproc submit PySpark job pipeline'
)
def dataproc_submit_pyspark_job_pipeline(
    project_id = PROJECT_ID,
    region = REGION,
    cluster_name = CLUSTER_NAME,
    main_python_file_uri = PYSPARK_FILE_URI,
    args = ARGS,
    pyspark_job='{}',
    job='{}',
    wait_interval='30'
):
    dataproc_submit_pyspark_job_op(
        project_id=project_id,
        region=region,
        cluster_name=cluster_name,
        main_python_file_uri=main_python_file_uri,
        args=args,
        pyspark_job=pyspark_job,
        job=job,
        wait_interval=wait_interval)
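If a later step needs the ID of the created job, it can read the component's job_id output through the task's outputs dictionary. A minimal sketch, reusing the op and dsl import from above; the echo step, its alpine image, and the pipeline name are illustrative and not part of the component:

# Sketch: a variant of the pipeline in which a later step consumes the
# component's 'job_id' output. The echo step and its image are illustrative.
@dsl.pipeline(
    name='Dataproc submit PySpark job with follow-up step',
    description='Sketch: pass the created job ID to a downstream step'
)
def pyspark_job_with_followup(
    project_id = PROJECT_ID,
    region = REGION,
    cluster_name = CLUSTER_NAME,
    main_python_file_uri = PYSPARK_FILE_URI
):
    submit_task = dataproc_submit_pyspark_job_op(
        project_id=project_id,
        region=region,
        cluster_name=cluster_name,
        main_python_file_uri=main_python_file_uri)
    # The created job's ID is exposed as submit_task.outputs['job_id'].
    dsl.ContainerOp(
        name='echo-job-id',
        image='alpine',
        command=['sh', '-c'],
        arguments=['echo "Submitted Dataproc job: %s"' % submit_task.outputs['job_id']])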
In [ ]:
pipeline_func = dataproc_submit_pyspark_job_pipeline
pipeline_filename = pipeline_func.__name__ + '.zip'
import kfp.compiler as compiler
compiler.Compiler().compile(pipeline_func, pipeline_filename)
In [ ]:
# Specify pipeline argument values
arguments = {}

# Get or create an experiment and submit a pipeline run
import kfp
client = kfp.Client()
experiment = client.create_experiment(EXPERIMENT_NAME)

# Submit a pipeline run
run_name = pipeline_func.__name__ + ' run'
run_result = client.run_pipeline(experiment.id, run_name, pipeline_filename, arguments)
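Optionally, block until the run finishes. A sketch, assuming the object returned by run_pipeline exposes the run ID as run_result.id:

In [ ]:
# Sketch: wait up to 30 minutes for the run to complete
# (assumes run_result.id holds the run ID).
client.wait_for_run_completion(run_result.id, timeout=60 * 30)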
By deploying or using this software you agree to comply with the AI Hub Terms of Service and the Google APIs Terms of Service. To the extent of a direct conflict of terms, the AI Hub Terms of Service will control.