The notebook "Tutorial: Exporting BQML Models to Online AI Platform Prediction" walks through the concepts of training model in BQML and exporting the model to be served in AI Platform. This notebook takes that process and implements a Kubeflow Pipeline to automate the steps.
In [ ]:
import kfp
import kfp.components as comp
import random
import os
In [ ]:
# Common parameters
PROJECT_ID = ''      # enter your project ID
KFPHOST = ''         # enter your KFP hostname
# Parameters for BQML
DATASET = ''         # name of the dataset to create (or reuse if it already exists)
VIEW = ''            # name of the view to create as input for the BQML CREATE MODEL statement
MODEL = ''           # model name used for both BQML and AI Platform
ALGO = ''            # e.g. 'linear_reg'
# Parameters for AI Platform Prediction
REGION = ''          # e.g. 'us-central1'
MODEL_VERSION = ''   # e.g. 'v1'
RUNTIME_VERSION = '1.15'  # do not change
PYTHON_VERSION = '3.7'    # do not change
MODEL_BUCKET = 'gs://{0}-{1}'.format(PROJECT_ID, str(random.randrange(1000, 10000)))
MODEL_PATH = os.path.join(MODEL_BUCKET, 'bqml/model/export/', MODEL, MODEL_VERSION)
# Parameters for the KF Pipeline
KFP_EXPERIMENT_NAME = 'Natality Pipeline'
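Optionally, before moving on you can confirm that the placeholders above were filled in. The check below is a minimal sketch (not part of the original tutorial) that fails fast on any empty value.
In [ ]:
# Optional sanity check: make sure the user-supplied parameters above are not empty.
for name, value in [('PROJECT_ID', PROJECT_ID), ('KFPHOST', KFPHOST), ('DATASET', DATASET),
                    ('VIEW', VIEW), ('MODEL', MODEL), ('ALGO', ALGO),
                    ('REGION', REGION), ('MODEL_VERSION', MODEL_VERSION)]:
    assert value, "Please set {} before running the rest of the notebook".format(name)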
In [ ]:
mlengine_deploy_op = comp.load_component_from_url(
'https://raw.githubusercontent.com/kubeflow/pipelines/01a23ae8672d3b18e88adf3036071496aca3552d/components/gcp/ml_engine/deploy/component.yaml')
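If you want to see which inputs the deploy component expects before wiring it into the pipeline, you can inspect the loaded op (optional); the object returned by load_component_from_url is a task factory whose docstring describes its inputs.
In [ ]:
# Optional: list the deploy component's expected inputs.
help(mlengine_deploy_op)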
If you cannot find an op in the component library that supports your need, you will have to create that container yourself. An easy way to do this is with kfp.components.func_to_container_op. In our case, we would like to execute a number of bq and gsutil commands, so we create a Python function that acts as a general command executor and then convert it into a container op via kfp.components.func_to_container_op.
In [ ]:
def gcp_command_func(project_id: str, command_string: str) -> str:
    """Sets the active GCP project, then runs an arbitrary shell command."""
    import subprocess
    # Point the Cloud SDK at the target project before running the command.
    config_string = "gcloud config set project {}".format(project_id)
    subprocess.run(config_string, shell=True, check=True, stdout=subprocess.PIPE, universal_newlines=True)
    print("Running command: {}".format(command_string))
    response = subprocess.run(command_string, shell=True, check=True, stdout=subprocess.PIPE, universal_newlines=True)
    print("Command response: {}".format(response.stdout))
    return project_id

# Wrap the function as a container op; the Cloud SDK image provides gcloud, bq, and gsutil.
gcp_command_op = comp.func_to_container_op(func=gcp_command_func, base_image="google/cloud-sdk:latest")
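Because gcp_command_func is plain Python, you can smoke-test it locally before using the container op. This is optional and assumes the Cloud SDK (gcloud and bq) is installed and authenticated in the notebook environment.
In [ ]:
# Optional local smoke test; requires gcloud/bq on the local PATH and an authenticated account.
gcp_command_func(PROJECT_ID, "bq ls --max_results=5")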
In [ ]:
def make_bucket(bucket):
    """Returns a command that creates the GCS bucket if it does not already exist."""
    return "gsutil ls {0} || gsutil mb {0}".format(bucket)

def create_dataset(dataset):
    """Returns a command that creates the BigQuery dataset if it does not already exist."""
    return "bq show {0} || bq mk {0}".format(dataset)

def create_view(dataset, view):
    """Returns a command that creates the view used as training input, if it does not already exist."""
    query = """
    SELECT
      weight_pounds,
      is_male,
      gestation_weeks,
      mother_age,
      CASE
        WHEN MOD(CAST(ROUND(weight_pounds*100) as int64), 10) < 8 THEN "training"
        WHEN MOD(CAST(ROUND(weight_pounds*100) as int64), 10) = 8 THEN "evaluation"
        WHEN MOD(CAST(ROUND(weight_pounds*100) as int64), 10) = 9 THEN "prediction"
      END AS datasplit
    FROM
      `bigquery-public-data.samples.natality`
    WHERE
      weight_pounds IS NOT NULL
    """
    return "bq show {1}.{2} || bq mk --use_legacy_sql=false --view '{0}' {1}.{2}".format(query, dataset, view)

def create_model(dataset, view, model, algo):
    """Returns a command that trains the BQML model if it does not already exist."""
    query = """
    CREATE OR REPLACE MODEL
      `{0}.{2}`
    OPTIONS
      (model_type="{3}",
       input_label_cols=["weight_pounds"]) AS
    SELECT
      weight_pounds,
      is_male,
      gestation_weeks,
      mother_age
    FROM
      {0}.{1}
    WHERE
      datasplit = "training"
    """.format(dataset, view, model, algo)
    return "bq show -m {1}.{3} || bq query --use_legacy_sql=false '{0}'".format(query, dataset, view, model, algo)

def export_model(dataset, model, export_path):
    """Returns a command that exports the trained BQML model to GCS."""
    return "bq extract -m {0}.{1} {2}".format(dataset, model, export_path)
Ops in a Kubeflow Pipeline are organized into a Directed Acyclic Graph (DAG). The order in which ops run is controlled by their dependencies on other ops: an op runs once it has no dependencies, or all of its dependencies are satisfied. A dependency is created naturally when one op's input comes from another op's output. If your ops do not have that kind of data dependency, you can still enforce ordering explicitly with this pattern:
current_op.after(previous_op), where current_op depends on previous_op
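For example, if two hypothetical steps step_a and step_b inside a pipeline function have no data dependency but must run in order, a minimal sketch looks like this:

step_a = gcp_command_op(project_id=project_id, command_string="bq ls")
step_b = gcp_command_op(project_id=project_id, command_string="gsutil ls")
step_b.after(step_a)  # step_b does not start until step_a completes successfully

The pipeline below uses exactly this pattern to force its steps to run in sequence.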
In [ ]:
@kfp.dsl.pipeline(
    name='BQML Model Export to AI Platform Prediction',
    description='This pipeline trains a BQML model and exports to GCS, then loads into AI Platform Prediction.'
)
def bqml_to_caip(project_id=PROJECT_ID,
                 bucket=MODEL_BUCKET,
                 model_path=MODEL_PATH,
                 dataset=DATASET,
                 view=VIEW,
                 model=MODEL,
                 model_version=MODEL_VERSION,
                 algo=ALGO,
                 export_path=MODEL_PATH,
                 runtime_version=RUNTIME_VERSION,
                 python_version=PYTHON_VERSION,
                 region=REGION
                 ):
    # Prepare commands for gcp_command_op
    make_bucket_command = make_bucket(bucket)
    create_dataset_command = create_dataset(dataset)
    create_view_command = create_view(dataset, view)
    create_model_command = create_model(dataset, view, model, algo)
    export_model_command = export_model(dataset, model, export_path)
    # Create ops in the pipeline
    make_bucket_op = gcp_command_op(project_id=project_id,
                                    command_string=make_bucket_command)
    create_dataset_op = gcp_command_op(project_id=project_id,
                                       command_string=create_dataset_command)
    create_view_op = gcp_command_op(project_id=project_id,
                                    command_string=create_view_command)
    create_model_op = gcp_command_op(project_id=project_id,
                                     command_string=create_model_command)
    export_model_op = gcp_command_op(project_id=project_id,
                                     command_string=export_model_command)
    model_deploy_op = mlengine_deploy_op(model_uri=export_path,
                                         project_id=project_id,
                                         model_id=model,
                                         version_id=model_version,
                                         runtime_version=runtime_version,
                                         python_version=python_version)
    # Set op dependencies so the steps run strictly in sequence
    create_dataset_op.after(make_bucket_op)
    create_view_op.after(create_dataset_op)
    create_model_op.after(create_view_op)
    export_model_op.after(create_model_op)
    model_deploy_op.after(export_model_op)
In [ ]:
# Compile the pipeline into a package that can be submitted to KFP
pipeline_func = bqml_to_caip
pipeline_filename = pipeline_func.__name__ + '.zip'
import kfp.compiler as compiler
compiler.Compiler().compile(pipeline_func, pipeline_filename)
# Run-time arguments; leaving this empty uses the pipeline's default parameter values
arguments = {}
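Since arguments is empty, the run uses the default values baked into the pipeline definition above. If you want to override any of them at submission time, you can populate the dictionary with the pipeline's parameter names instead; for example:

arguments = {'model': MODEL, 'model_version': MODEL_VERSION, 'region': REGION}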
Create an experiment. If an experiment with this name already exists, this step returns a reference to the existing experiment.
In [ ]:
client = kfp.Client(KFPHOST)
experiment = client.create_experiment(KFP_EXPERIMENT_NAME)
Submit a Pipeline run under the experiment name above.
In [ ]:
#Submit a pipeline run
run_name = pipeline_func.__name__ + ' run'
run_result = client.run_pipeline(
    experiment_id=experiment.id,
    job_name=run_name,
    pipeline_package_path=pipeline_filename,
    params=arguments)
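run_pipeline returns immediately; the run itself executes on the KFP cluster. If you want the notebook to block until the run finishes, the client can poll for completion (optional; the timeout below is in seconds and is just an example value).
In [ ]:
# Optionally block until the run completes (or the timeout expires), then print its status.
result = client.wait_for_run_completion(run_result.id, timeout=3600)
print(result.run.status)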