Kubeflow Pipeline: Exporting BQML Models to Online AI Platform Prediction

The notebook "Tutorial: Exporting BQML Models to Online AI Platform Prediction" walks through the concepts of training model in BQML and exporting the model to be served in AI Platform. This notebook takes that process and implements a Kubeflow Pipeline to automate the steps.


In [ ]:
import kfp
import kfp.components as comp
import random
import os

In [ ]:
#Common Parameters
PROJECT_ID=[] #enter your project ID
KFPHOST=[] #enter your KFP hostname

#Parameters for BQML
DATASET=[]     #name of dataset to create or use if exists
VIEW= []       #name of view to be created for BQML create model
MODEL=[]       #model name for both BQML and AI Platform
ALGO=[]        #e.g. 'linear_reg'

#Parameters for AI Platform Prediction
REGION=[]               #e.g. 'us-central1'
MODEL_VERSION=[]        #e.g. 'v1'
RUNTIME_VERSION='1.15'  #do not change
PYTHON_VERSION='3.7'    #do not change
MODEL_BUCKET='gs://{0}-{1}'.format(PROJECT_ID,str(random.randrange(1000,10000)))
MODEL_PATH=os.path.join(MODEL_BUCKET,'bqml/model/export/',MODEL,MODEL_VERSION)

#Parameters for KF Pipeline
KFP_EXPERIMENT_NAME='Natality Pipeline'
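
For reference, a filled-in configuration could look like the following. These values are purely illustrative (hypothetical project, dataset, and model names); substitute your own before running the notebook.


In [ ]:
#Illustrative example values only -- replace with your own
#PROJECT_ID='my-gcp-project'
#KFPHOST='<your-kfp-hostname>'
#DATASET='bqml_tutorial'
#VIEW='natality_view'
#MODEL='natality_model'
#ALGO='linear_reg'
#REGION='us-central1'
#MODEL_VERSION='v1'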

Creating KFP Ops

Each step in a Kubeflow Pipeline is a container operation. If the operation you need is already available in the components library in the KFP repo, creating the op is as simple as loading it. The library includes such an op for deploying a model to AI Platform Prediction.


In [ ]:
mlengine_deploy_op = comp.load_component_from_url(
    'https://raw.githubusercontent.com/kubeflow/pipelines/01a23ae8672d3b18e88adf3036071496aca3552d/components/gcp/ml_engine/deploy/component.yaml')

If you cannot find an op in the component library that supports your needs, you will need to create the container yourself. An easy way to do this is via kfp.components.func_to_container_op. In our case, we would like to execute a number of bq commands, so we will create a Python function that acts as a general command executor and then convert it to a container op via kfp.components.func_to_container_op.


In [ ]:
def gcp_command_func(project_id: str, command_string: str) -> str:
    import subprocess
    #Point the gcloud/bq CLIs at the target project
    config_string="gcloud config set project {}".format(project_id)
    subprocess.run(config_string, shell=True, check=True, stdout=subprocess.PIPE, universal_newlines=True)
    #Run the supplied shell command and surface its output in the step logs
    print("Running command: {}".format(command_string))
    response=subprocess.run(command_string, shell=True, check=True, stdout=subprocess.PIPE, universal_newlines=True)
    print("Command response: {}".format(response.stdout))
    return project_id

gcp_command_op=comp.func_to_container_op(func=gcp_command_func, base_image="google/cloud-sdk:latest")
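
Before building the pipeline, you can optionally sanity-check the underlying Python function locally. This is only a sketch: it assumes the gcloud and bq CLIs are installed and authenticated in your notebook environment (inside the pipeline, the google/cloud-sdk base image provides them).


In [ ]:
#Optional local check of the command executor (requires gcloud/bq in this environment)
gcp_command_func(PROJECT_ID, "bq ls --max_results=5")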

Prepare bq commands

There are four bq operations in our pipeline (plus a gsutil command that creates the Cloud Storage bucket used for the model export):

  1. Create Dataset
  2. Create View
  3. Create Model
  4. Export Model

The following helper functions build the command strings that will be executed by the gcp_command_op created above.


In [ ]:
def make_bucket(bucket):
    return "gsutil ls {0} || gsutil mb {0}".format(bucket)

def create_dataset(dataset):
    return "bq show {0} || bq mk {0}".format(dataset)

def create_view(dataset, view):
    query = """
        SELECT
          weight_pounds,
          is_male,
          gestation_weeks,
          mother_age,
          CASE
            WHEN MOD(CAST(ROUND(weight_pounds*100) as int64), 10) < 8 THEN "training"
            WHEN MOD(CAST(ROUND(weight_pounds*100) as int64), 10) = 8 THEN "evaluation"
            WHEN MOD(CAST(ROUND(weight_pounds*100) as int64), 10) = 9 THEN "prediction"
          END AS datasplit
        FROM
          `bigquery-public-data.samples.natality`
        WHERE
          weight_pounds IS NOT NULL
    """.format(dataset, view)
    return "bq show {1}.{2} || bq mk --use_legacy_sql=false --view '{0}' {1}.{2}".format(query, dataset, view)

def create_model(dataset, view, model, algo):
    query = """
        CREATE OR REPLACE MODEL
          `{0}.{2}`
        OPTIONS
          (model_type="{3}",
            input_label_cols=["weight_pounds"]) AS
                SELECT
                  weight_pounds,
                  is_male,
                  gestation_weeks,
                  mother_age
                FROM
                  {0}.{1}
                WHERE
                  datasplit = "training"
    """.format(dataset, view, model, algo)
    return "bq show {1}.{3} || bq query --use_legacy_sql=false '{0}'".format(query, dataset, view, model, algo)
    
def export_model(dataset, model, export_path):
    return "bq extract -m {0}.{1} {2}".format(dataset, model, export_path)

Ops in a Kubeflow Pipeline are organized into a Directed Acyclic Graph (DAG). The order in which ops execute is determined by their dependencies on other ops: an op runs once it has no dependencies, or all of its dependencies have been satisfied. A dependency is created naturally when one op's input comes from another op's output. If your ops have no such data dependency, you can still enforce ordering explicitly with this pattern:

current_op.after(previous_op), where current_op depends on previous_op


In [ ]:
@kfp.dsl.pipeline(
    name='BQML Model Export to AI Platform Prediction',
    description='This pipeline trains a BQML model, exports it to GCS, and loads it into AI Platform Prediction.'
)

def bqml_to_caip(project_id = PROJECT_ID,
                 bucket=MODEL_BUCKET,
                 model_path=MODEL_PATH,
                 dataset=DATASET,
                 view=VIEW,
                 model=MODEL,
                 model_version=MODEL_VERSION,
                 algo=ALGO,
                 export_path=MODEL_PATH,
                 runtime_version=RUNTIME_VERSION,
                 python_version=PYTHON_VERSION,
                 region=REGION
):
    
    #Prepare commands for gcp_command_op
    make_bucket_command=make_bucket(bucket)
    create_dataset_command=create_dataset(dataset)
    create_view_command=create_view(dataset, view)
    create_model_command=create_model(dataset, view, model, algo)
    export_model_command=export_model(dataset, model, export_path)
    
    #Create ops in pipeline
    make_bucket_op=gcp_command_op(project_id=project_id,
                                  command_string=make_bucket_command)
    create_dataset_op=gcp_command_op(project_id=project_id,
                                     command_string=create_dataset_command)
    create_view_op=gcp_command_op(project_id=project_id,
                                  command_string=create_view_command)
    create_model_op=gcp_command_op(project_id=project_id,
                                   command_string=create_model_command)
    export_model_op=gcp_command_op(project_id=project_id,
                                   command_string=export_model_command)
    model_deploy_op=mlengine_deploy_op(model_uri=export_path,
                                       project_id=project_id,
                                       model_id=model,
                                       version_id=model_version,
                                       runtime_version=runtime_version,
                                       python_version=python_version)
    
    #Set op dependencies
    create_dataset_op.after(make_bucket_op)
    create_view_op.after(create_dataset_op)
    create_model_op.after(create_view_op)
    export_model_op.after(create_model_op)
    model_deploy_op.after(export_model_op)

Compile and Run the Pipeline


In [ ]:
pipeline_func = bqml_to_caip
pipeline_filename = pipeline_func.__name__ + '.zip'

import kfp.compiler as compiler
compiler.Compiler().compile(pipeline_func, pipeline_filename)

arguments = {}
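
All pipeline parameters default to the values from the configuration cell, so the arguments dictionary can stay empty. To override a parameter for a single run, add its name and value; the version string below is only an illustration.


In [ ]:
#Optional per-run override of a pipeline parameter (illustrative value)
#arguments = {'model_version': 'v2'}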

Create an experiment. If an experiment with this name already exists, this step will reuse the existing experiment.


In [ ]:
client = kfp.Client(KFPHOST)
experiment = client.create_experiment(KFP_EXPERIMENT_NAME)

Submit a pipeline run under the experiment created above.


In [ ]:
#Submit a pipeline run
run_name = pipeline_func.__name__ + ' run'
run_result = client.run_pipeline(
    experiment_id=experiment.id, 
    job_name=run_name, 
    pipeline_package_path=pipeline_filename, 
    params=arguments)
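
Optionally, you can block until the run finishes and print its final status from the notebook. wait_for_run_completion is part of the KFP SDK client; the one-hour timeout below is an arbitrary example value.


In [ ]:
#Optionally wait for the run to complete and check its final status
run_detail = client.wait_for_run_completion(run_result.id, timeout=3600)
print(run_detail.run.status)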