Data preparation using SparkSQL on YARN with Cloud Dataproc
Cloud Dataproc, GCP, Cloud Storage, YARN, SparkSQL, Kubeflow, pipelines, components
A Kubeflow Pipelines component that prepares data by submitting a SparkSQL job on YARN to Cloud Dataproc.
Use the component to run an Apache SparkSQL job as one preprocessing step in a Kubeflow pipeline.
| Argument | Description | Optional | Data type | Accepted values | Default |
|---|---|---|---|---|---|
| project_id | The ID of the Google Cloud Platform (GCP) project that the cluster belongs to. | No | GCPProjectID | | |
| region | The Cloud Dataproc region to handle the request. | No | GCPRegion | | |
| cluster_name | The name of the cluster to run the job. | No | String | | |
| queries | The queries to execute the SparkSQL job. Specify multiple queries in one string by separating them with semicolons. You do not need to terminate queries with semicolons. | Yes | List | | None |
| query_file_uri | The HCFS URI of the script that contains the SparkSQL queries. | Yes | GCSPath | | None |
| script_variables | Mapping of the query’s variable names to their values (equivalent to the SparkSQL command: SET name="value";). | Yes | Dict | | None |
| sparksql_job | The payload of a SparkSqlJob. | Yes | Dict | | None |
| job | The payload of a Dataproc job. | Yes | Dict | | None |
| wait_interval | The number of seconds to pause between polling the operation. | Yes | Integer | | 30 |
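As a rough illustration of how the query-related arguments relate to one another, the sketch below assembles a dictionary shaped like the Dataproc `SparkSqlJob` REST resource (`queryList`, `queryFileUri`, `scriptVariables`). The helper name `build_sparksql_job`, the merge order, and the `run_label` variable are assumptions made for this example; the component's own implementation may differ.

```python
import copy


def build_sparksql_job(queries=None, query_file_uri=None,
                       script_variables=None, sparksql_job=None):
    """Hypothetical helper: merge component-style arguments into a
    SparkSqlJob-shaped dictionary (not the component's actual code)."""
    # In the REST resource, queryList and queryFileUri are alternatives.
    if queries and query_file_uri:
        raise ValueError("Provide either queries or query_file_uri, not both.")

    # Start from a caller-supplied payload, if any, without mutating it.
    payload = copy.deepcopy(sparksql_job) if sparksql_job else {}
    if queries:
        payload["queryList"] = {"queries": list(queries)}
    if query_file_uri:
        payload["queryFileUri"] = query_file_uri
    if script_variables:
        payload["scriptVariables"] = dict(script_variables)
    return payload


example = build_sparksql_job(
    queries=["SELECT * FROM natality_csv LIMIT 10"],
    script_variables={"run_label": "demo"},  # hypothetical variable name
)
print(example)
```

Empty strings are treated the same as missing values here, which mirrors how a pipeline might pass `''` for unused optional arguments.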
| Name | Description | Type | 
|---|---|---|
| job_id | The ID of the created job. | String | 
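To make the role of `wait_interval` concrete, here is a minimal polling sketch. It assumes a caller-supplied `get_state` function that returns a Dataproc job state string for a given job ID; `wait_for_job`, `fake_get_state`, and the ID `job-123` are hypothetical names used only for illustration, and the choice of terminal states is an assumption based on the Dataproc job state names.

```python
import time

# Assumed terminal job states; other states are treated as "still in progress".
TERMINAL_STATES = {"DONE", "ERROR", "CANCELLED"}


def wait_for_job(get_state, job_id, wait_interval=30, sleep=time.sleep):
    """Poll get_state(job_id) until a terminal state is reported,
    pausing wait_interval seconds between polls."""
    while True:
        state = get_state(job_id)
        if state in TERMINAL_STATES:
            return state
        sleep(wait_interval)


# Stand-in for a real status lookup, so the sketch runs without GCP access.
_fake_states = iter(["PENDING", "RUNNING", "DONE"])


def fake_get_state(job_id):
    return next(_fake_states)


pauses = []  # records each pause instead of actually sleeping
final_state = wait_for_job(fake_get_state, "job-123",
                           wait_interval=30, sleep=pauses.append)
print(final_state, pauses)
```

Injecting `sleep` keeps the example fast to run; in real use the default `time.sleep` would apply.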
To use the component, you must:
Grant the roles/dataproc.editor role on the project to the account that runs the component.
This component creates a SparkSQL job using the Dataproc submit job REST API.
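For orientation, a request to the Dataproc `jobs:submit` REST method has roughly the shape sketched below: a regional URL plus a JSON body that wraps the job, its cluster placement, and the SparkSQL payload. The function name `build_submit_request` and the sample project, cluster, and query values are placeholders; the sketch only builds the request and does not authenticate or send anything.

```python
import json


def build_submit_request(project_id, region, cluster_name, sparksql_job):
    """Hypothetical helper: return the URL and JSON body for a
    Dataproc jobs:submit call (no network access is performed)."""
    url = (
        "https://dataproc.googleapis.com/v1/"
        f"projects/{project_id}/regions/{region}/jobs:submit"
    )
    body = {
        "job": {
            "placement": {"clusterName": cluster_name},
            "sparkSqlJob": sparksql_job,
        }
    }
    return url, body


url, body = build_submit_request(
    project_id="my-project",      # placeholder
    region="us-central1",
    cluster_name="my-cluster",    # placeholder
    sparksql_job={"queryList": {"queries": ["SHOW TABLES"]}},
)
print(url)
print(json.dumps(body, indent=2))
```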
Follow these steps to use the component in a pipeline:
In [ ]:
    
%%capture --no-stderr
KFP_PACKAGE = 'https://storage.googleapis.com/ml-pipeline/release/0.1.14/kfp.tar.gz'
!pip3 install $KFP_PACKAGE --upgrade
    
In [ ]:
    
import kfp.components as comp
dataproc_submit_sparksql_job_op = comp.load_component_from_url(
    'https://raw.githubusercontent.com/kubeflow/pipelines/01a23ae8672d3b18e88adf3036071496aca3552d/components/gcp/dataproc/submit_sparksql_job/component.yaml')
help(dataproc_submit_sparksql_job_op)
    
Note: The following sample code works in an IPython notebook or directly in Python code. See the sample code below to learn how to execute the template.
Create a new Dataproc cluster (or reuse an existing one) before running the sample code.
Either put your SparkSQL queries in the queries list, or upload them as a file to a Cloud Storage bucket and enter the file's Cloud Storage path in query_file_uri. This sample uses a hard-coded query in the queries list to select data from a public CSV file in Cloud Storage.
For more details about Spark SQL, see the Spark SQL, DataFrames and Datasets Guide.
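If you prefer the file-based option, the following sketch shows one way to prepare a query script locally and derive the value for `query_file_uri`. The bucket name `my-bucket`, the object path, and the sample queries are hypothetical; the upload itself is left as a comment because it depends on your tooling.

```python
import pathlib
import tempfile

BUCKET = "my-bucket"                  # hypothetical bucket name
OBJECT_NAME = "queries/natality.sql"  # hypothetical object path

queries = [
    "DROP TABLE IF EXISTS natality_csv",
    "SELECT 1",
]

# Write one statement per line, separated by semicolons.
script = ";\n".join(queries) + ";\n"
local_path = pathlib.Path(tempfile.mkdtemp()) / "natality.sql"
local_path.write_text(script)

# Upload the file separately, for example with:
#   gsutil cp <local_path> gs://my-bucket/queries/natality.sql
query_file_uri = f"gs://{BUCKET}/{OBJECT_NAME}"
print(local_path, query_file_uri)
```

You would then pass `query_file_uri` to the component and leave `queries` empty.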
In [ ]:
    
PROJECT_ID = '<Please put your project ID here>'
CLUSTER_NAME = '<Please put your existing cluster name here>'
REGION = 'us-central1'
QUERY = '''
DROP TABLE IF EXISTS natality_csv;
CREATE EXTERNAL TABLE natality_csv (
  source_year BIGINT, year BIGINT, month BIGINT, day BIGINT, wday BIGINT,
  state STRING, is_male BOOLEAN, child_race BIGINT, weight_pounds FLOAT,
  plurality BIGINT, apgar_1min BIGINT, apgar_5min BIGINT,
  mother_residence_state STRING, mother_race BIGINT, mother_age BIGINT,
  gestation_weeks BIGINT, lmp STRING, mother_married BOOLEAN,
  mother_birth_state STRING, cigarette_use BOOLEAN, cigarettes_per_day BIGINT,
  alcohol_use BOOLEAN, drinks_per_week BIGINT, weight_gain_pounds BIGINT,
  born_alive_alive BIGINT, born_alive_dead BIGINT, born_dead BIGINT,
  ever_born BIGINT, father_race BIGINT, father_age BIGINT,
  record_weight BIGINT
)
ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
LOCATION 'gs://public-datasets/natality/csv';
SELECT * FROM natality_csv LIMIT 10;'''
EXPERIMENT_NAME = 'Dataproc - Submit SparkSQL Job'
    
In [ ]:
    
import kfp.dsl as dsl
import json
@dsl.pipeline(
    name='Dataproc submit SparkSQL job pipeline',
    description='Dataproc submit SparkSQL job pipeline'
)
def dataproc_submit_sparksql_job_pipeline(
    project_id=PROJECT_ID,
    region=REGION,
    cluster_name=CLUSTER_NAME,
    queries=json.dumps([QUERY]),
    query_file_uri='',
    script_variables='',
    sparksql_job='',
    job='',
    wait_interval='30'
):
    dataproc_submit_sparksql_job_op(
        project_id=project_id,
        region=region,
        cluster_name=cluster_name,
        queries=queries,
        query_file_uri=query_file_uri,
        script_variables=script_variables,
        sparksql_job=sparksql_job,
        job=job,
        wait_interval=wait_interval)
    
In [ ]:
    
import kfp.compiler as compiler

pipeline_func = dataproc_submit_sparksql_job_pipeline
pipeline_filename = pipeline_func.__name__ + '.zip'
compiler.Compiler().compile(pipeline_func, pipeline_filename)
    
In [ ]:
    
import kfp

# Specify pipeline argument values
arguments = {}

# Get or create an experiment
client = kfp.Client()
experiment = client.create_experiment(EXPERIMENT_NAME)

# Submit a pipeline run
run_name = pipeline_func.__name__ + ' run'
run_result = client.run_pipeline(experiment.id, run_name, pipeline_filename, arguments)
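The `arguments` dictionary above is left empty, so the run uses the pipeline's default parameter values. As a hedged sketch, overrides could be supplied by keying the dictionary on the pipeline function's parameter names. The sample already serializes the `queries` list with `json.dumps`; doing the same for `script_variables` is an assumption, and the cluster name and variable below are placeholders.

```python
import json

# Hypothetical overrides; keys mirror the pipeline function's parameter names.
arguments = {
    "cluster_name": "my-cluster",                           # placeholder
    "queries": json.dumps(["SHOW TABLES"]),                 # list serialized as in the sample
    "script_variables": json.dumps({"run_label": "demo"}),  # assumed serialization
    "wait_interval": "10",
}

# Round-trip check that the serialized values decode as intended.
print(json.loads(arguments["queries"]))
print(json.loads(arguments["script_variables"]))
```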
    
By deploying or using this software you agree to comply with the AI Hub Terms of Service and the Google APIs Terms of Service. To the extent of a direct conflict of terms, the AI Hub Terms of Service will control.