This notebook demonstrates how to use the Cascade design pattern to train a model that predicts the distance a bicycle will be ridden. Assume that accurately predicting distances for bikes held longer than 4 hours is important to us, but such trips are rare. So, we train a Cascade of ML models.
The first model classifies trips into Typical trips and Long trips. Then, we create two training datasets based on the predictions of the first model. Next, we train one regression model on each dataset to predict distance. Finally, we combine the classifier and the two regression models to evaluate the Cascade as a whole.
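To try out this notebook, change the settings in the next cell to your own GCP project and Kubeflow Pipelines host, then run the cells in order: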
In [1]:
# CHANGE the following settings to match your own environment.

# GCP project in which the pipeline's BigQuery jobs run.
PROJECT_ID = 'ai-analytics-solutions'

# Kubeflow Pipelines endpoint host; copy it from the settings button in CAIP Pipelines.
KFPHOST = '20844794c6e37538-dot-us-central2.pipelines.googleusercontent.com'
In [2]:
!bq show mlpatterns || bq mk mlpatterns
In [3]:
import kfp
import kfp.components as comp
In [4]:
# Load the prebuilt "BigQuery query" component, pinned to a fixed commit of the
# kubeflow/pipelines repository, and print its interface with help().
# NOTE(review): cascade_pipeline in this notebook builds its own BigQuery
# component from run_bigquery_ddl; this op appears to be for exploration only.
bigquery_query_op = comp.load_component_from_url(
    'https://raw.githubusercontent.com/kubeflow/pipelines/'
    '0e794e8a0eff6f81ddc857946ee8311c7c431ec2/'
    'components/gcp/bigquery/query/component.yaml'
)
help(bigquery_query_op)
In [9]:
import kfp.dsl as dsl
from typing import NamedTuple
import json
import os
def run_bigquery_ddl(project_id: str, query_string: str, location: str) -> NamedTuple(
        'DDLOutput', [('created_table', str), ('query', str)]):
    """Run a BigQuery DDL statement and return the name of the table/model it created.

    This function is packaged as a Kubeflow Pipelines component with
    ``func_to_container_op``, so every import it needs is done inside the body.

    Args:
        project_id: GCP project the BigQuery client runs the job in.
        query_string: DDL statement to run, e.g. ``CREATE OR REPLACE TABLE``/``MODEL``.
        location: BigQuery location for the job (callers in this notebook pass 'US').

    Returns:
        ``DDLOutput(created_table, query)``. ``created_table`` is the job's DDL
        target formatted as ``"<dataset_id>.<table_id>"``. ``query`` echoes
        ``query_string``.

    Raises:
        RuntimeError: if the job finishes with an error, or if it reports no DDL
            target table.
    """
    print(query_string)
    from collections import namedtuple
    from time import sleep

    from google.api_core.future import polling
    from google.cloud import bigquery
    from google.cloud.bigquery import retry as bq_retry

    # Seconds between status polls. Long CREATE MODEL jobs would otherwise
    # emit ten 'Running ...' lines per second.
    poll_interval_sec = 1.0

    bqclient = bigquery.Client(project=project_id, location=location)
    job = bqclient.query(query_string, retry=bq_retry.DEFAULT_RETRY)
    # Retained from the original implementation: replaces the job's polling
    # retry policy. NOTE(review): this sets a private attribute.
    job._retry = polling.DEFAULT_RETRY
    while job.running():
        sleep(poll_interval_sec)
        print('Running ...')

    # A failed job also leaves the loop. Surface BigQuery's error here instead of
    # crashing below on a missing DDL target.
    if job.error_result:
        raise RuntimeError('BigQuery job {} failed: {}'.format(job.job_id, job.error_result))

    target = job.ddl_target_table
    if target is None:
        raise RuntimeError('BigQuery job {} reported no DDL target table for:\n{}'.format(
            job.job_id, query_string))
    tblname = '{}.{}'.format(target.dataset_id, target.table_id)
    print('{} created in {}'.format(tblname, job.ended - job.started))

    result_tuple = namedtuple('DDLOutput', ['created_table', 'query'])
    return result_tuple(tblname, query_string)
def train_classification_model(ddlop, project_id):
    """Submit the CREATE MODEL statement for the Typical-vs-Long trip classifier.

    Trips whose ``duration_sec`` exceeds 4 hours (``3600*4``) are labelled 'Long'
    and all others 'Typical'. A class-weighted logistic regression
    (``mlpatterns.classify_trips``) is trained on these features: start hour,
    day of week, start station, subscriber type, bucketized birth year and gender.

    Args:
        ddlop: component factory, invoked as ``ddlop(project_id, query, location)``.
        project_id: GCP project in which to run the statement.

    Returns:
        Whatever ``ddlop`` returns for the statement.
    """
    classifier_ddl = """
CREATE OR REPLACE MODEL mlpatterns.classify_trips
TRANSFORM(
trip_type,
EXTRACT (HOUR FROM start_date) AS start_hour,
EXTRACT (DAYOFWEEK FROM start_date) AS day_of_week,
start_station_name,
subscriber_type,
ML.QUANTILE_BUCKETIZE(member_birth_year, 10) OVER() AS bucketized_age,
member_gender
)
OPTIONS(model_type='logistic_reg',
auto_class_weights=True,
input_label_cols=['trip_type']) AS
SELECT
start_date, start_station_name, subscriber_type, member_birth_year, member_gender,
IF(duration_sec > 3600*4, 'Long', 'Typical') AS trip_type
FROM `bigquery-public-data.san_francisco_bikeshare.bikeshare_trips`
"""
    print(classifier_ddl)
    return ddlop(project_id, classifier_ddl, 'US')
def create_training_data(ddlop, project_id, model_name, segment):
    """Materialize the trips that the classifier assigns to one segment.

    Runs ``model_name`` over every bikeshare trip. It keeps the rows whose
    ``predicted_trip_type`` equals ``segment`` and that have a non-NULL
    station-to-station ``distance``, and writes them to
    ``mlpatterns.<segment>_trips``. The prediction columns are dropped.

    Args:
        ddlop: component factory, invoked as ``ddlop(project_id, query, location)``.
        project_id: GCP project in which to run the statement.
        model_name: classification model to predict with (in the pipeline, the
            ``created_table`` output of the classifier step).
        segment: predicted trip type to keep, e.g. 'Typical' or 'Long'. It is
            interpolated into both the table name and the SQL filter.

    Returns:
        Whatever ``ddlop`` returns for the statement.
    """
    template = """
CREATE OR REPLACE TABLE mlpatterns.{0}_trips AS
SELECT
* EXCEPT(predicted_trip_type_probs, predicted_trip_type)
FROM
ML.PREDICT(MODEL {1}, -- mlpatterns.classify_trips
(SELECT
start_date, start_station_name, subscriber_type, member_birth_year, member_gender,
ST_Distance(start_station_geom, end_station_geom) AS distance
FROM `bigquery-public-data.san_francisco_bikeshare.bikeshare_trips`)
)
WHERE predicted_trip_type = '{0}' AND distance IS NOT NULL
"""
    segment_ddl = template.format(segment, model_name)
    print(segment_ddl)
    return ddlop(project_id, segment_ddl, 'US')
def train_distance_model(ddlop, project_id, train_table_name, segment):
    """Submit the CREATE MODEL statement for one segment's distance regressor.

    Trains a linear regression named ``mlpatterns.predict_distance_<segment>``
    on every row of ``train_table_name``, with ``distance`` as the label. It uses
    the same feature TRANSFORM as the trip classifier.

    Args:
        ddlop: component factory, invoked as ``ddlop(project_id, query, location)``.
        project_id: GCP project in which to run the statement.
        train_table_name: table with this segment's training rows (in the
            pipeline, the ``created_table`` output of ``create_training_data``).
        segment: segment label used as the model-name suffix.

    Returns:
        Whatever ``ddlop`` returns for the statement.
    """
    template = """
CREATE OR REPLACE MODEL mlpatterns.predict_distance_{0}
TRANSFORM(
distance,
EXTRACT (HOUR FROM start_date) AS start_hour,
EXTRACT (DAYOFWEEK FROM start_date) AS day_of_week,
start_station_name,
subscriber_type,
ML.QUANTILE_BUCKETIZE(member_birth_year, 10) OVER() AS bucketized_age,
member_gender
)
OPTIONS(model_type='linear_reg', input_label_cols=['distance']) AS
SELECT
*
FROM
{1} -- mlpatterns.{0}_trips
"""
    regression_ddl = template.format(segment, train_table_name)
    print(regression_ddl)
    return ddlop(project_id, regression_ddl, 'US')
def evaluate(project_id: str,
             classification_model: str, typical_trip_model: str, long_trip_model: str) -> float:
    """Score the whole Cascade and return its median absolute error on distance.

    Each trip is first routed by ``classification_model``. Trips predicted
    'Typical' go to ``typical_trip_model`` and trips predicted 'Long' go to
    ``long_trip_model``. The two sets of predictions are pooled and compared
    with ``ST_Distance(start_station_geom, end_station_geom)``.

    Trips with a NULL distance are skipped, matching the filter applied when the
    per-segment training tables were built.

    This function is packaged as a Kubeflow Pipelines component with
    ``func_to_container_op``, so its imports live inside the body.

    NOTE(review): this reads the same full public table that the models were
    trained on, with no held-out split, so the reported error is in-sample.

    Args:
        project_id: GCP project in which to run the query.
        classification_model: trip-type classifier model name.
        typical_trip_model: regression model for 'Typical' trips.
        long_trip_model: regression model for 'Long' trips.

    Returns:
        The median absolute error as a plain Python float, or NaN when the
        query produces no value.
    """
    query = """
WITH input_data AS (
SELECT start_date, start_station_name, subscriber_type, member_birth_year, member_gender,
ST_Distance(start_station_geom, end_station_geom) AS distance
FROM `bigquery-public-data.san_francisco_bikeshare.bikeshare_trips`
),
classified AS (
SELECT
* EXCEPT(predicted_trip_type_probs)
FROM ML.PREDICT(
MODEL {0},
(SELECT * FROM input_data WHERE distance IS NOT NULL)
)
),
evals AS (
SELECT
distance, predicted_distance
FROM ML.PREDICT(
MODEL {1},
(SELECT * from classified WHERE predicted_trip_type = 'Typical')
)
UNION ALL
SELECT
distance, predicted_distance
FROM ML.PREDICT(
MODEL {2},
(SELECT * from classified WHERE predicted_trip_type = 'Long')
)
)
SELECT
APPROX_QUANTILES(ABS(distance - predicted_distance), 10)[OFFSET(5)] AS median_absolute_error
FROM
evals
""".format(classification_model, typical_trip_model, long_trip_model)
    print(query)
    from google.cloud import bigquery
    bqclient = bigquery.Client(project=project_id, location='US')
    # The aggregate yields a single row. Reading it directly avoids the
    # pandas/pyarrow stack that to_dataframe() requires.
    rows = list(bqclient.query(query).result())
    if not rows or rows[0]['median_absolute_error'] is None:
        return float('nan')
    return float(rows[0]['median_absolute_error'])
@dsl.pipeline(
    name='Cascade pipeline on SF bikeshare',
    description='Cascade pipeline on SF bikeshare'
)
def cascade_pipeline(
    project_id=PROJECT_ID
):
    """Wire the Cascade steps into a Kubeflow pipeline.

    The pipeline has four steps:
    1. Train the Typical-vs-Long trip classifier.
    2. Split the trips by its prediction into two training tables.
    3. Train one distance regressor per table.
    4. Evaluate the classifier and both regressors together.

    Args:
        project_id: GCP project used by every BigQuery step. It is the pipeline's
            run-time parameter (default ``PROJECT_ID``) and is passed to each
            step, so overriding it when submitting a run takes effect.
    """
    # Wrap run_bigquery_ddl as a container component that installs the BigQuery client.
    ddlop = comp.func_to_container_op(run_bigquery_ddl, packages_to_install=['google-cloud-bigquery'])

    # Step 1: classifier that labels trips 'Typical' or 'Long'.
    c1 = train_classification_model(ddlop, project_id)
    c1_model_name = c1.outputs['created_table']

    # Step 2: one training table per predicted segment.
    c2a_input = create_training_data(ddlop, project_id, c1_model_name, 'Typical')
    c2b_input = create_training_data(ddlop, project_id, c1_model_name, 'Long')

    # Step 3: one distance regressor per segment.
    c3a_model = train_distance_model(ddlop, project_id, c2a_input.outputs['created_table'], 'Typical')
    c3b_model = train_distance_model(ddlop, project_id, c2b_input.outputs['created_table'], 'Long')

    # Step 4: evaluate the Cascade end to end.
    evalop = comp.func_to_container_op(evaluate, packages_to_install=['google-cloud-bigquery', 'pandas'])
    error = evalop(project_id, c1_model_name, c3a_model.outputs['created_table'], c3b_model.outputs['created_table'])
    # NOTE(review): this line executes when the pipeline is compiled, so it
    # presumably prints an output placeholder rather than the computed error.
    print(error.output)
In [10]:
import kfp.compiler as compiler

# Compile the pipeline into a package named after the pipeline function.
pipeline_func = cascade_pipeline
pipeline_filename = '{}.zip'.format(pipeline_func.__name__)
compiler.Compiler().compile(pipeline_func, pipeline_filename)

# Run parameters: none overridden here.
arguments = {}

# Connect to the KFP endpoint, then get or create the experiment.
client = kfp.Client(KFPHOST)
experiment = client.create_experiment('cascade_experiment')

# Submit the compiled package as a new run of that experiment.
run_name = '{} run'.format(pipeline_func.__name__)
run_result = client.run_pipeline(
    experiment.id,
    run_name,
    pipeline_filename,
    arguments,
)
Copyright 2020 Google Inc. Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License