Submitting a Cloud Machine Learning Engine training job as a pipeline step
GCP, Cloud ML Engine, Machine Learning, pipeline, component, Kubeflow, Kubeflow Pipeline
A Kubeflow Pipeline component to submit a Cloud ML Engine training job as a step in a pipeline.
Use this component to submit a training job to Cloud ML Engine from a Kubeflow Pipeline.
Argument | Description | Optional | Data type | Accepted values | Default |
project_id | The ID of the Google Cloud Platform (GCP) project of the job. | No | GCPProjectID | ||
python_module | The name of the Python module to run after installing the training program. | Yes | String | None | |
package_uris | The Cloud Storage location of the packages that contain the training program and any additional dependencies. The maximum number of package URIs is 100. | Yes | List | None | |
region | The Compute Engine region in which the training job is run. | Yes | GCPRegion | us-central1 | |
args | The command line arguments to pass to the training program. | Yes | List | None | |
job_dir | A Cloud Storage path in which to store the training outputs and other data needed for training. This path is passed to your TensorFlow program as the job-dir command-line argument. The benefit of specifying this field is that Cloud ML validates the path for use in training. |
Yes | GCSPath | None | |
python_version | The version of Python used in training. If it is not set, the default version is 2.7. Python 3.5 is available when the runtime version is set to 1.4 and above. | Yes | String | None | |
runtime_version | The runtime version of Cloud ML Engine to use for training. If it is not set, Cloud ML Engine uses the default. | Yes | String | 1 | |
master_image_uri | The Docker image to run on the master replica. This image must be in Container Registry. | Yes | GCRPath | None | |
worker_image_uri | The Docker image to run on the worker replica. This image must be in Container Registry. | Yes | GCRPath | None | |
training_input | The input parameters to create a training job. | Yes | Dict | TrainingInput | None |
job_id_prefix | The prefix of the job ID that is generated. | Yes | String | None | |
wait_interval | The number of seconds to wait between API calls to get the status of the job. | Yes | Integer | 30 |
The component accepts two types of inputs:
Name | Description | Type |
job_id | The ID of the created job. | String |
job_dir | The Cloud Storage path that contains the trained model output files. | GCSPath |
To use the component, you must:
The component builds the TrainingInput payload and submits a job via the Cloud ML Engine REST API.
The steps to use the component in a pipeline are:
In [ ]:
%%capture --no-stderr
!pip3 install $KFP_PACKAGE --upgrade
In [ ]:
import kfp.components as comp
mlengine_train_op = comp.load_component_from_url(
Note: The following sample code works in an IPython notebook or directly in Python code.
In this sample, you use the code from the census estimator sample to train a model in Cloud ML Engine. To upload the code to Cloud ML Engine, package the Python code and upload it to a Cloud Storage bucket.
Note: You must have read and write permissions on the bucket that you use as the working directory.
In [ ]:
# Required Parameters
PROJECT_ID = '<Please put your project ID here>'
GCS_WORKING_DIR = 'gs://<Please put your GCS path here>' # No ending slash
In [ ]:
# Optional Parameters
TRAINER_GCS_PATH = GCS_WORKING_DIR + '/train/trainer.tar.gz'
OUTPUT_GCS_PATH = GCS_WORKING_DIR + '/train/output/'
In [ ]:
%%capture --no-stderr
!gsutil rm -r $GCS_WORKING_DIR
In [ ]:
%%capture --no-stderr
In [ ]:
%%capture --no-stderr
%%bash -s "$TRAINER_GCS_PATH"
pushd ./cloudml-samples-master/census/estimator/
python sdist
gsutil cp dist/preprocessing-1.0.tar.gz $1
rm -fr ./cloudml-samples-master/ ./ ./dist
In [ ]:
import kfp.dsl as dsl
import json
name='CloudML training pipeline',
description='CloudML training pipeline'
def pipeline(
project_id = PROJECT_ID,
python_module = 'trainer.task',
package_uris = json.dumps([TRAINER_GCS_PATH]),
region = 'us-central1',
args = json.dumps([
'--train-files', 'gs://cloud-samples-data/ml-engine/census/data/',
'--eval-files', 'gs://cloud-samples-data/ml-engine/census/data/adult.test.csv',
'--train-steps', '1000',
'--eval-steps', '100',
'--verbosity', 'DEBUG'
job_dir = OUTPUT_GCS_PATH,
python_version = '',
runtime_version = '1.10',
master_image_uri = '',
worker_image_uri = '',
training_input = '',
job_id_prefix = '',
wait_interval = '30'):
task = mlengine_train_op(
In [ ]:
pipeline_func = pipeline
pipeline_filename = pipeline_func.__name__ + '.zip'
import kfp.compiler as compiler
compiler.Compiler().compile(pipeline_func, pipeline_filename)
In [ ]:
#Specify pipeline argument values
arguments = {}
#Get or create an experiment and submit a pipeline run
import kfp
client = kfp.Client()
experiment = client.create_experiment(EXPERIMENT_NAME)
#Submit a pipeline run
run_name = pipeline_func.__name__ + ' run'
run_result = client.run_pipeline(, run_name, pipeline_filename, arguments)
In [ ]:
!gsutil ls $OUTPUT_GCS_PATH
By deploying or using this software you agree to comply with the AI Hub Terms of Service and the Google APIs Terms of Service. To the extent of a direct conflict of terms, the AI Hub Terms of Service will control.