Cascade Design Pattern

This notebook demonstrates using the Cascade design pattern to train a model that predicts the distance a bicycle will be ridden. Suppose that accurately predicting the distance for bikes held longer than 4 hours is important to us, but such trips are rare. To handle them, we train a Cascade of ML models.

The first model classifies trips into Typical trips and Long trips. Then, we create two training datasets based on the prediction of the first model. Next, we train two regression models to predict distance. Finally, we combine the two models in order to evaluate the Cascade as a whole.
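The four steps above can be sketched in plain Python. This is only a toy illustration of the pattern, not the BigQuery ML pipeline built below: the sample trips, the rule-based `classify` function, and the mean-value "regressors" are all hypothetical stand-ins. The key point it shows is that the second-stage training data is split by the classifier's *prediction*, not by the true label.

```python
from statistics import mean

LONG_THRESHOLD_SEC = 4 * 3600  # trips held longer than 4 hours are labeled 'Long'

def label_trip(duration_sec):
    """True label, as used to train the first-stage classifier."""
    return 'Long' if duration_sec > LONG_THRESHOLD_SEC else 'Typical'

# Hypothetical sample data (distance in meters).
trips = [
    {'subscriber_type': 'Subscriber', 'start_hour': 8, 'distance': 1200.0},
    {'subscriber_type': 'Subscriber', 'start_hour': 17, 'distance': 1800.0},
    {'subscriber_type': 'Customer', 'start_hour': 11, 'distance': 5000.0},
    {'subscriber_type': 'Customer', 'start_hour': 13, 'distance': 7000.0},
]

def classify(trip):
    """Step 1: hypothetical stand-in for the trained classifier."""
    is_long = trip['subscriber_type'] == 'Customer' and 10 <= trip['start_hour'] <= 15
    return 'Long' if is_long else 'Typical'

def fit_mean_regressor(rows):
    """Stand-in regressor: always predicts the mean distance of its training rows."""
    avg = mean(r['distance'] for r in rows)
    return lambda trip: avg

# Step 2: split the training data by the classifier's prediction.
segments = {'Typical': [], 'Long': []}
for t in trips:
    segments[classify(t)].append(t)

# Step 3: train one regressor per segment.
regressors = {name: fit_mean_regressor(rows) for name, rows in segments.items()}

# Step 4: the Cascade routes each trip through the classifier to a regressor.
def cascade_predict(trip):
    return regressors[classify(trip)](trip)

print(cascade_predict({'subscriber_type': 'Customer', 'start_hour': 12}))
print(cascade_predict({'subscriber_type': 'Subscriber', 'start_hour': 12}))
```

Because each regressor only ever sees trips that the classifier routes to it, it is trained on the same distribution it will face at prediction time, classifier mistakes included.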

To try out this notebook, first change the settings below to match your own project and Kubeflow Pipelines host.


In [1]:
# CHANGE the following settings
PROJECT_ID='ai-analytics-solutions' 
KFPHOST='20844794c6e37538-dot-us-central2.pipelines.googleusercontent.com' # from settings button in CAIP Pipelines

In [2]:
!bq show mlpatterns || bq mk mlpatterns


Dataset ai-analytics-solutions:mlpatterns

   Last modified                              ACLs                              Labels  
 ----------------- ----------------------------------------------------------- -------- 
  10 Apr 05:46:16   Owners:                                                             
                      kfpdemo@ai-analytics-solutions.iam.gserviceaccount.com,           
                      projectOwners                                                     
                    Writers:                                                            
                      projectWriters                                                    
                    Readers:                                                            
                      projectReaders                                                    


In [3]:
import kfp
import kfp.components as comp

In [4]:
bigquery_query_op = comp.load_component_from_url(
    'https://raw.githubusercontent.com/kubeflow/pipelines/0e794e8a0eff6f81ddc857946ee8311c7c431ec2/components/gcp/bigquery/query/component.yaml')
help(bigquery_query_op)


Help on function Bigquery - Query:

Bigquery - Query(query:str, project_id:'GCPProjectID', dataset_id:str='', table_id:str='', output_gcs_path:'GCSPath'='', dataset_location:str='US', job_config:dict='')
    Bigquery - Query
    A Kubeflow Pipeline component to submit a query to Google Cloud Bigquery 
    service and dump outputs to a Google Cloud Storage blob.


In [9]:
import kfp.dsl as dsl
from typing import NamedTuple
import json
import os

def run_bigquery_ddl(project_id: str, query_string: str, location: str) -> NamedTuple(
    'DDLOutput', [('created_table', str), ('query', str)]):
    """
    Runs BigQuery query and returns a table/model name
    """
    print(query_string)
        
    # Imports live inside the function because it is packaged as a container op
    from google.cloud import bigquery
    from google.api_core.future import polling
    from google.cloud.bigquery import retry as bq_retry
    
    bqclient = bigquery.Client(project=project_id, location=location)
    job = bqclient.query(query_string, retry=bq_retry.DEFAULT_RETRY)
    job._retry = polling.DEFAULT_RETRY
    
    from time import sleep
    while job.running():
        sleep(0.1)
        print('Running ...')
        
    tblname = job.ddl_target_table
    tblname = '{}.{}'.format(tblname.dataset_id, tblname.table_id)
    print('{} created in {}'.format(tblname, job.ended - job.started))
    
    from collections import namedtuple
    result_tuple = namedtuple('DDLOutput', ['created_table', 'query'])
    return result_tuple(tblname, query_string)


def train_classification_model(ddlop, project_id):
    """Trains the first-stage classifier that labels trips as 'Long' (> 4 hours) or 'Typical'."""
    query = """
        CREATE OR REPLACE MODEL mlpatterns.classify_trips
        TRANSFORM(
          trip_type,
          EXTRACT (HOUR FROM start_date) AS start_hour,
          EXTRACT (DAYOFWEEK FROM start_date) AS day_of_week,
          start_station_name,
          subscriber_type,
          ML.QUANTILE_BUCKETIZE(member_birth_year, 10) OVER() AS bucketized_age,
          member_gender
        )
        OPTIONS(model_type='logistic_reg', 
                auto_class_weights=True,
                input_label_cols=['trip_type']) AS

        SELECT
          start_date, start_station_name, subscriber_type, member_birth_year, member_gender,
          IF(duration_sec > 3600*4, 'Long', 'Typical') AS trip_type
        FROM `bigquery-public-data.san_francisco_bikeshare.bikeshare_trips`
    """
    print(query)
    return ddlop(project_id, query, 'US')

def create_training_data(ddlop, project_id, model_name, segment):
    """Creates a table of the trips that the classifier predicts to be in `segment`."""
    query = """
        CREATE OR REPLACE TABLE mlpatterns.{0}_trips AS
        SELECT 
          * EXCEPT(predicted_trip_type_probs, predicted_trip_type)
        FROM
        ML.PREDICT(MODEL {1}, -- mlpatterns.classify_trips
          (SELECT
          start_date, start_station_name, subscriber_type, member_birth_year, member_gender,
          ST_Distance(start_station_geom, end_station_geom) AS distance
          FROM `bigquery-public-data.san_francisco_bikeshare.bikeshare_trips`)
        )
        WHERE predicted_trip_type = '{0}' AND distance IS NOT NULL
    """.format(segment, model_name)
    print(query)
    return ddlop(project_id, query, 'US')

def train_distance_model(ddlop, project_id, train_table_name, segment):
    """Trains a linear regression model to predict distance for one segment's trips."""
    query = """
        CREATE OR REPLACE MODEL mlpatterns.predict_distance_{0}
        TRANSFORM(
          distance,
          EXTRACT (HOUR FROM start_date) AS start_hour,
          EXTRACT (DAYOFWEEK FROM start_date) AS day_of_week,
          start_station_name,
          subscriber_type,
          ML.QUANTILE_BUCKETIZE(member_birth_year, 10) OVER() AS bucketized_age,
          member_gender
        )
        OPTIONS(model_type='linear_reg', input_label_cols=['distance']) AS

        SELECT
          *
        FROM 
          {1} -- mlpatterns.{0}_trips
        
    """.format(segment, train_table_name)
    print(query)
    return ddlop(project_id, query, 'US')


def evaluate(project_id: str,
             classification_model: str, typical_trip_model: str, long_trip_model: str) -> float:
    """Evaluates the whole Cascade and returns its median absolute error in distance."""
    query = """
        WITH input_data AS (
           SELECT start_date, start_station_name, subscriber_type, member_birth_year, member_gender,
                  ST_Distance(start_station_geom, end_station_geom) AS distance
           FROM `bigquery-public-data.san_francisco_bikeshare.bikeshare_trips`
        ),

        classified AS (
        SELECT 
          * EXCEPT(predicted_trip_type_probs)
        FROM ML.PREDICT(
          MODEL {0},
          (SELECT * from input_data)
        )
        ),

        evals AS (

        SELECT
          distance, predicted_distance
        FROM ML.PREDICT(
          MODEL {1},
          (SELECT * from classified WHERE predicted_trip_type = 'Typical')
        )
        UNION ALL
        SELECT
          distance, predicted_distance
        FROM ML.PREDICT(
          MODEL {2},
          (SELECT * from classified WHERE predicted_trip_type = 'Long')
        )

        )

        SELECT
           APPROX_QUANTILES(ABS(distance - predicted_distance), 10)[OFFSET(5)] AS median_absolute_error
        FROM
           evals
    """.format(classification_model, typical_trip_model, long_trip_model)
    print(query)
    from google.cloud import bigquery
    bqclient = bigquery.Client(project=project_id, location='US')
    df = bqclient.query(query).result().to_dataframe()
    return df['median_absolute_error'][0]


@dsl.pipeline(
    name='Cascade pipeline on SF bikeshare',
    description='Cascade pipeline on SF bikeshare'
)
def cascade_pipeline(
    project_id = PROJECT_ID
):
    ddlop = comp.func_to_container_op(run_bigquery_ddl, packages_to_install=['google-cloud-bigquery'])
        
    # Use the pipeline parameter rather than the notebook-level PROJECT_ID global
    c1 = train_classification_model(ddlop, project_id)
    c1_model_name = c1.outputs['created_table']

    c2a_input = create_training_data(ddlop, project_id, c1_model_name, 'Typical')
    c2b_input = create_training_data(ddlop, project_id, c1_model_name, 'Long')

    c3a_model = train_distance_model(ddlop, project_id, c2a_input.outputs['created_table'], 'Typical')
    c3b_model = train_distance_model(ddlop, project_id, c2b_input.outputs['created_table'], 'Long')

    evalop = comp.func_to_container_op(evaluate, packages_to_install=['google-cloud-bigquery', 'pandas'])
    error = evalop(project_id, c1_model_name, c3a_model.outputs['created_table'], c3b_model.outputs['created_table'])
    print(error.output)
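The evaluation query reports a median absolute error by taking element 5 of the decile array returned by `APPROX_QUANTILES(..., 10)`. As a rough local analogue (exact rather than approximate, and with made-up numbers), the same metric can be sketched in plain Python. The `actual` and `predicted` lists below are hypothetical values, not results from this pipeline.

```python
from statistics import median, quantiles

def median_absolute_error(actual, predicted):
    """Median of |actual - predicted| over paired observations."""
    errors = [abs(a - p) for a, p in zip(actual, predicted)]
    return median(errors)

# Hypothetical distances in meters; the fourth trip is a badly predicted long ride.
actual = [1000.0, 1500.0, 2000.0, 6000.0, 900.0]
predicted = [1100.0, 1400.0, 2500.0, 4000.0, 950.0]

med_err = median_absolute_error(actual, predicted)

# The 5th of the 9 decile cut points is the median, mirroring
# APPROX_QUANTILES(x, 10)[OFFSET(5)] in the query.
abs_errors = [abs(a - p) for a, p in zip(actual, predicted)]
fifth_decile = quantiles(abs_errors, n=10)[4]

mean_err = sum(abs_errors) / len(abs_errors)

print(med_err, fifth_decile, mean_err)
```

In this sample the single large error pulls the mean absolute error far above the median, so a median-based metric mostly reflects the many typical trips. It may be worth checking the error on the rare long trips separately.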

In [10]:
pipeline_func = cascade_pipeline
pipeline_filename = pipeline_func.__name__ + '.zip'
import kfp.compiler as compiler
compiler.Compiler().compile(pipeline_func, pipeline_filename)

# Specify pipeline argument values
arguments = {}

# Get or create an experiment
client = kfp.Client(KFPHOST)
experiment = client.create_experiment('cascade_experiment')

# Submit a pipeline run
run_name = pipeline_func.__name__ + ' run'
run_result = client.run_pipeline(experiment.id, run_name, pipeline_filename, arguments)


        CREATE OR REPLACE MODEL mlpatterns.classify_trips
        TRANSFORM(
          trip_type,
          EXTRACT (HOUR FROM start_date) AS start_hour,
          EXTRACT (DAYOFWEEK FROM start_date) AS day_of_week,
          start_station_name,
          subscriber_type,
          ML.QUANTILE_BUCKETIZE(member_birth_year, 10) OVER() AS bucketized_age,
          member_gender
        )
        OPTIONS(model_type='logistic_reg', 
                auto_class_weights=True,
                input_label_cols=['trip_type']) AS

        SELECT
          start_date, start_station_name, subscriber_type, member_birth_year, member_gender,
          IF(duration_sec > 3600*4, 'Long', 'Typical') AS trip_type
        FROM `bigquery-public-data.san_francisco_bikeshare.bikeshare_trips`
    

        CREATE OR REPLACE TABLE mlpatterns.Typical_trips AS
        SELECT 
          * EXCEPT(predicted_trip_type_probs, predicted_trip_type)
        FROM
        ML.PREDICT(MODEL {{pipelineparam:op=Run bigquery ddl;name=created_table}}, -- mlpatterns.classify_trips
          (SELECT
          start_date, start_station_name, subscriber_type, member_birth_year, member_gender,
          ST_Distance(start_station_geom, end_station_geom) AS distance
          FROM `bigquery-public-data.san_francisco_bikeshare.bikeshare_trips`)
        )
        WHERE predicted_trip_type = 'Typical' AND distance IS NOT NULL
    

        CREATE OR REPLACE TABLE mlpatterns.Long_trips AS
        SELECT 
          * EXCEPT(predicted_trip_type_probs, predicted_trip_type)
        FROM
        ML.PREDICT(MODEL {{pipelineparam:op=Run bigquery ddl;name=created_table}}, -- mlpatterns.classify_trips
          (SELECT
          start_date, start_station_name, subscriber_type, member_birth_year, member_gender,
          ST_Distance(start_station_geom, end_station_geom) AS distance
          FROM `bigquery-public-data.san_francisco_bikeshare.bikeshare_trips`)
        )
        WHERE predicted_trip_type = 'Long' AND distance IS NOT NULL
    

        CREATE OR REPLACE MODEL mlpatterns.predict_distance_Typical
        TRANSFORM(
          distance,
          EXTRACT (HOUR FROM start_date) AS start_hour,
          EXTRACT (DAYOFWEEK FROM start_date) AS day_of_week,
          start_station_name,
          subscriber_type,
          ML.QUANTILE_BUCKETIZE(member_birth_year, 10) OVER() AS bucketized_age,
          member_gender
        )
        OPTIONS(model_type='linear_reg', input_label_cols=['distance']) AS

        SELECT
          *
        FROM 
          {{pipelineparam:op=Run bigquery ddl 2;name=created_table}} -- mlpatterns.Typical_trips
        
    

        CREATE OR REPLACE MODEL mlpatterns.predict_distance_Long
        TRANSFORM(
          distance,
          EXTRACT (HOUR FROM start_date) AS start_hour,
          EXTRACT (DAYOFWEEK FROM start_date) AS day_of_week,
          start_station_name,
          subscriber_type,
          ML.QUANTILE_BUCKETIZE(member_birth_year, 10) OVER() AS bucketized_age,
          member_gender
        )
        OPTIONS(model_type='linear_reg', input_label_cols=['distance']) AS

        SELECT
          *
        FROM 
          {{pipelineparam:op=Run bigquery ddl 3;name=created_table}} -- mlpatterns.Long_trips
        
    
{{pipelineparam:op=Evaluate;name=output}}
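The printed queries contain `{{pipelineparam:...}}` text instead of table and model names because the pipeline function runs at compile time, before any step has produced an output. The following is a plain-Python sketch of that idea, not how Kubeflow Pipelines is implemented: the `OutputPlaceholder` class and `resolve` function are hypothetical names used only for illustration.

```python
class OutputPlaceholder:
    """Hypothetical stand-in for a step output whose value is unknown at compile time."""

    def __init__(self, op_name, output_name):
        self.op_name = op_name
        self.output_name = output_name

    def __str__(self):
        # Renders in the same textual form as the placeholders printed above.
        return '{{pipelineparam:op=%s;name=%s}}' % (self.op_name, self.output_name)


def resolve(text, run_time_values):
    """Substitutes each placeholder string with the value produced at run time."""
    for placeholder, value in run_time_values.items():
        text = text.replace(placeholder, value)
    return text


# "Compile time": formatting the query embeds the placeholder text, not a name.
model_ref = OutputPlaceholder('Run bigquery ddl', 'created_table')
query = 'SELECT * FROM ML.PREDICT(MODEL {0}, (SELECT 1))'.format(model_ref)
print(query)

# "Run time": once the upstream step has finished, the real name is filled in.
resolved = resolve(query, {str(model_ref): 'mlpatterns.classify_trips'})
print(resolved)
```

For the same reason, `print(error.output)` in the pipeline function prints a placeholder rather than a number. The actual metric only becomes available after the run completes.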

Copyright 2020 Google Inc. Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.