Batch prediction using Cloud Machine Learning Engine
Cloud Storage, Cloud ML Engine, Kubeflow, Pipeline, Component
A Kubeflow Pipeline component to submit a batch prediction job against a deployed model on Cloud ML Engine.
Use the component to run a batch prediction job against a deployed model on Cloud ML Engine. The prediction output is stored in a Cloud Storage bucket.
Argument | Description | Optional | Data type | Accepted values | Default |
---|---|---|---|---|---|
project_id | The ID of the Google Cloud Platform (GCP) project of the job. | No | GCPProjectID | | |
model_path | The path to the model. It can be one of the following: projects/[PROJECT_ID]/models/[MODEL_ID], projects/[PROJECT_ID]/models/[MODEL_ID]/versions/[VERSION_ID], or a Cloud Storage path to a saved model file. | No | GCSPath | | |
input_paths | The path to the Cloud Storage location containing the input data files. It can contain wildcards, for example, gs://foo/*.csv. | No | List | GCSPath | |
input_data_format | The format of the input data files. See REST Resource: projects.jobs for more details. | No | String | DataFormat | |
output_path | The path to the Cloud Storage location for the output data. | No | GCSPath | | |
region | The Compute Engine region where the prediction job is run. | No | GCPRegion | | |
output_data_format | The format of the output data files. See REST Resource: projects.jobs for more details. | Yes | String | DataFormat | JSON |
prediction_input | The JSON input parameters to create a prediction job. See PredictionInput for more information. | Yes | Dict | | None |
job_id_prefix | The prefix of the generated job ID. | Yes | String | | None |
wait_interval | The number of seconds to wait between polls when the job has a long run time. | Yes | Integer | | 30 |
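For example, the prediction_input argument takes a JSON-serialized dictionary of PredictionInput fields. A minimal sketch, assuming the runtimeVersion and maxWorkerCount fields from the PredictionInput REST reference:

import json

# Extra PredictionInput fields, passed as a JSON-serialized dictionary;
# field names follow the PredictionInput REST reference.
prediction_input = json.dumps({
    'runtimeVersion': '1.10',  # runtime version used for the prediction job
    'maxWorkerCount': '10'     # upper bound on the number of prediction workers
})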
The component accepts the following as input:

* A trained model: the path to the model is specified in the model_path runtime argument.
* Input data: the Cloud Storage location of the data is specified in input_paths, and the format is specified by input_data_format.
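For example, when input_data_format is JSON, each line of an input file is one serialized instance. A minimal sketch that writes such a file; the feature names are hypothetical placeholders, since your model's serving signature determines the real ones:

import json

# Hypothetical instances; the field names are placeholders.
instances = [
    {'feature_a': 1.0, 'feature_b': 'x'},
    {'feature_a': 2.5, 'feature_b': 'y'},
]

# Write one JSON object per line, the layout the JSON data format expects.
with open('instances.json', 'w') as f:
    for instance in instances:
        f.write(json.dumps(instance) + '\n')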
The component outputs the following:

Name | Description | Type |
---|---|---|
job_id | The ID of the created batch job. | String |
output_path | The output path of the batch prediction job. | GCSPath |
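In a pipeline, downstream steps can consume these outputs through the task's outputs dictionary. A minimal sketch, where downstream_op is a hypothetical component that takes the prediction directory:

# Inside a pipeline function; downstream_op is hypothetical and only
# illustrates how a later step references this component's outputs.
predict_task = mlengine_batch_predict_op(
    project_id=project_id,
    model_path=model_path,
    input_paths=input_paths,
    input_data_format=input_data_format,
    output_path=output_path,
    region=region)
downstream_op(prediction_dir=predict_task.outputs['output_path'])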
To use the component, you must:

* Set up a GCP project and enable the Cloud Machine Learning Engine API.
* Run the component under a service account that is allowed to submit Cloud ML Engine jobs.
* Grant the service account read access to the Cloud Storage bucket that contains the input data.
* Grant the service account write access to the Cloud Storage bucket that stores the output data.
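For example, you can grant the bucket access with gsutil; the service account and bucket names below are placeholders for your own:

# Placeholders: replace the service account and bucket names with yours.
!gsutil iam ch serviceAccount:pipeline-runner@my-project.iam.gserviceaccount.com:roles/storage.objectViewer gs://my-input-bucket
!gsutil iam ch serviceAccount:pipeline-runner@my-project.iam.gserviceaccount.com:roles/storage.objectAdmin gs://my-output-bucket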
Follow these steps to use the component in a pipeline:

1. Install the Kubeflow Pipeline SDK:
In [ ]:
%%capture --no-stderr
KFP_PACKAGE = 'https://storage.googleapis.com/ml-pipeline/release/0.1.14/kfp.tar.gz'
!pip3 install $KFP_PACKAGE --upgrade
2. Load the component using the KFP SDK:

In [ ]:
import kfp.components as comp
mlengine_batch_predict_op = comp.load_component_from_url(
'https://raw.githubusercontent.com/kubeflow/pipelines/01a23ae8672d3b18e88adf3036071496aca3552d/components/gcp/ml_engine/batch_predict/component.yaml')
help(mlengine_batch_predict_op)
Note: The following sample code works in an IPython notebook or directly in Python code.

In this sample, you run a batch prediction job against a pre-built trained model from gs://ml-pipeline-playground/samples/ml_engine/census/trained_model/ and use the test data from gs://ml-pipeline-playground/samples/ml_engine/census/test.json.

Inspect the test data:
In [ ]:
!gsutil cat gs://ml-pipeline-playground/samples/ml_engine/census/test.json
In [ ]:
# Required Parameters
PROJECT_ID = '<Please put your project ID here>'
GCS_WORKING_DIR = 'gs://<Please put your GCS path here>' # No ending slash
In [ ]:
# Optional Parameters
EXPERIMENT_NAME = 'CLOUDML - Batch Predict'
OUTPUT_GCS_PATH = GCS_WORKING_DIR + '/batch_predict/output/'
In [ ]:
import kfp.dsl as dsl
import json

@dsl.pipeline(
    name='CloudML batch predict pipeline',
    description='CloudML batch predict pipeline'
)
def pipeline(
    project_id = PROJECT_ID,
    model_path = 'gs://ml-pipeline-playground/samples/ml_engine/census/trained_model/',
    input_paths = '["gs://ml-pipeline-playground/samples/ml_engine/census/test.json"]',
    input_data_format = 'JSON',
    output_path = OUTPUT_GCS_PATH,
    region = 'us-central1',
    output_data_format = '',
    prediction_input = json.dumps({
        'runtimeVersion': '1.10'
    }),
    job_id_prefix = '',
    wait_interval = '30'):
    mlengine_batch_predict_op(
        project_id=project_id,
        model_path=model_path,
        input_paths=input_paths,
        input_data_format=input_data_format,
        output_path=output_path,
        region=region,
        output_data_format=output_data_format,
        prediction_input=prediction_input,
        job_id_prefix=job_id_prefix,
        wait_interval=wait_interval)
In [ ]:
pipeline_func = pipeline
pipeline_filename = pipeline_func.__name__ + '.zip'
import kfp.compiler as compiler
compiler.Compiler().compile(pipeline_func, pipeline_filename)
In [ ]:
#Specify pipeline argument values
arguments = {}
#Get or create an experiment and submit a pipeline run
import kfp
client = kfp.Client()
experiment = client.create_experiment(EXPERIMENT_NAME)
#Submit a pipeline run
run_name = pipeline_func.__name__ + ' run'
run_result = client.run_pipeline(experiment.id, run_name, pipeline_filename, arguments)
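Optionally, block until the run completes before inspecting the output. A minimal sketch using the KFP client's wait_for_run_completion; the one-hour timeout is an assumption:

# Wait for the run to finish; the timeout (in seconds) is an assumption.
client.wait_for_run_completion(run_result.id, timeout=3600)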
In [ ]:
OUTPUT_FILES_PATTERN = OUTPUT_GCS_PATH + '*'
!gsutil cat $OUTPUT_FILES_PATTERN
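You can also load the results programmatically. A minimal sketch that parses each JSON line of the result files; the prediction.results-* file naming follows the service's usual convention, but is an assumption here:

import json
import subprocess

# List the output files and parse each line of the JSON results.
files = subprocess.check_output(
    ['gsutil', 'ls', OUTPUT_GCS_PATH]).decode().split()
for path in files:
    if 'prediction.results' in path:
        content = subprocess.check_output(['gsutil', 'cat', path]).decode()
        for line in content.splitlines():
            print(json.loads(line))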
By deploying or using this software you agree to comply with the AI Hub Terms of Service and the Google APIs Terms of Service. To the extent of a direct conflict of terms, the AI Hub Terms of Service will control.