In [ ]:
# Copyright 2019 Google Inc. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ==============================================================================
To build a component, define a standalone Python function and then call kfp.components.func_to_container_op(func) to convert the function to a component that can be used in a pipeline.

There are several requirements for the function:
- The function must be standalone: it may not use any code declared outside of its definition. Any imports and any helper functions must be declared inside the function itself.
- The function can only import packages that are available in the base image it runs in.
- If the function operates on numbers, the parameters need type hints; everything else is passed as a string.
- To build a component with multiple output values, use the typing.NamedTuple type hint syntax, as in the sketch below: NamedTuple('MyFunctionOutputs', [('output_name_1', type), ('output_name_2', float)])
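As a quick illustration of these requirements, here is a minimal sketch (not part of the original notebook; the function name and values are arbitrary) of a conforming multi-output function:
In [ ]:
# Illustrative only: a standalone function with type hints and NamedTuple outputs.
from typing import NamedTuple

def split_number(x: float) -> NamedTuple('SplitOutputs', [('integer_part', float), ('fraction', float)]):
    '''Splits a number into its integer part and its fractional part.'''
    # Imports must live inside the function body.
    from collections import namedtuple
    integer_part = float(int(x))
    outputs = namedtuple('SplitOutputs', ['integer_part', 'fraction'])
    return outputs(integer_part, x - integer_part)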
In [ ]:
import kfp
import kfp.gcp as gcp
import kfp.dsl as dsl
import kfp.compiler as compiler
import kfp.components as comp
import kubernetes as k8s
In [ ]:
# Required Parameters
PROJECT_ID='<ADD GCP PROJECT HERE>'
GCS_BUCKET='gs://<ADD STORAGE LOCATION HERE>'
If you run this notebook outside of a Kubeflow cluster, create the client with the following parameters:
- host: The URL of your Kubeflow Pipelines instance, for example "https://<your-deployment>.endpoints.<your-project>.cloud.goog/pipeline"
- client_id: The client ID used by Identity-Aware Proxy
- other_client_id: The client ID used to obtain the auth codes and refresh tokens.
- other_client_secret: The client secret used to obtain the auth codes and refresh tokens.

client = kfp.Client(host, client_id, other_client_id, other_client_secret)

If you run this notebook within a Kubeflow cluster, run the following command:

client = kfp.Client()

You'll need to create OAuth client ID credentials of type Other to get other_client_id and other_client_secret. Learn more about creating OAuth credentials.
In [ ]:
# Optional parameters, but required for running outside of a Kubeflow cluster
# The host for 'AI Platform Pipelines' ends with 'pipelines.googleusercontent.com'
# The pipeline endpoint host for a 'full Kubeflow deployment' ends with '/pipeline'
# Examples are:
# https://7c021d0340d296aa-dot-us-central2.pipelines.googleusercontent.com
# https://kubeflow.endpoints.kubeflow-pipeline.cloud.goog/pipeline
HOST = '<ADD HOST NAME TO TALK TO KUBEFLOW PIPELINE HERE>'
# For 'full Kubeflow deployment' on GCP, the endpoint is usually protected through IAP, therefore the following
# will be needed to access the endpoint.
CLIENT_ID = '<ADD OAuth CLIENT ID USED BY IAP HERE>'
OTHER_CLIENT_ID = '<ADD OAuth CLIENT ID USED TO OBTAIN AUTH CODES HERE>'
OTHER_CLIENT_SECRET = '<ADD OAuth CLIENT SECRET USED TO OBTAIN AUTH CODES HERE>'
In [ ]:
# This ensures a valid access token is present for reaching the 'AI Platform Pipelines' endpoint.
# If you are not working with 'AI Platform Pipelines', this step is not necessary.
! gcloud auth print-access-token
In [ ]:
# Create the kfp client
in_cluster = True
try:
    k8s.config.load_incluster_config()
except k8s.config.ConfigException:
    # Not running inside a Kubernetes pod, so talk to the endpoint over HTTPS.
    in_cluster = False

if in_cluster:
    client = kfp.Client()
else:
    if HOST.endswith('googleusercontent.com'):
        # 'AI Platform Pipelines' endpoints are not protected by IAP, so no OAuth client is needed.
        CLIENT_ID = None
        OTHER_CLIENT_ID = None
        OTHER_CLIENT_SECRET = None
    client = kfp.Client(host=HOST,
                        client_id=CLIENT_ID,
                        other_client_id=OTHER_CLIENT_ID,
                        other_client_secret=OTHER_CLIENT_SECRET)
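To sanity-check that the client can actually reach the endpoint, you can list existing experiments (a quick check, not part of the original notebook):
In [ ]:
# If authentication is misconfigured, this call fails immediately.
client.list_experiments()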
In [ ]:
# Define a Python function
def add(a: float, b: float) -> float:
    '''Calculates the sum of two arguments.'''
    return a + b
Convert the function to a pipeline operation
In [ ]:
add_op = comp.func_to_container_op(add)
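func_to_container_op can also write the generated component spec to a file so the component can be shared and loaded elsewhere; a sketch (the filename is an assumption):
In [ ]:
# Optionally also emit a shareable component spec (the filename is illustrative).
add_op = comp.func_to_container_op(add, output_component_file='add_component.yaml')
# Others can then reconstruct the same op with:
# add_op = comp.load_component_from_file('add_component.yaml')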
In [ ]:
# Advanced function
# Demonstrates imports, helper functions and multiple outputs
from typing import NamedTuple

def my_divmod(dividend: float,
              divisor: float,
              ) -> NamedTuple('MyDivmodOutput', [('quotient', float), ('remainder', float),
                                                 ('mlpipeline_ui_metadata', 'UI_metadata'),
                                                 ('mlpipeline_metrics', 'Metrics')]):
    '''Divides two numbers and calculates the quotient and remainder.'''

    # Imports inside a component function:
    import numpy as np

    # This function demonstrates how to use nested functions inside a component function:
    def divmod_helper(dividend, divisor):
        return np.divmod(dividend, divisor)

    (quotient, remainder) = divmod_helper(dividend, divisor)

    import json

    # Exports a sample tensorboard:
    metadata = {
        'outputs': [{
            'type': 'tensorboard',
            'source': 'gs://ml-pipeline-dataset/tensorboard-train',
        }]
    }

    # Exports two sample metrics:
    metrics = {
        'metrics': [{
            'name': 'quotient',
            'numberValue': float(quotient),
        }, {
            'name': 'remainder',
            'numberValue': float(remainder),
        }]
    }

    from collections import namedtuple
    divmod_output = namedtuple('MyDivmodOutput',
                               ['quotient', 'remainder', 'mlpipeline_ui_metadata', 'mlpipeline_metrics'])
    return divmod_output(quotient, remainder, json.dumps(metadata), json.dumps(metrics))
Test running the function directly:
In [ ]:
my_divmod(100, 7)
In [ ]:
divmod_op = comp.func_to_container_op(func=my_divmod,
                                      base_image="tensorflow/tensorflow:1.15.0-py3")
In [ ]:
@dsl.pipeline(
    name='Calculation pipeline',
    description='A toy pipeline that performs arithmetic calculations.'
)
def calc_pipeline(
    a='1',
    b='7',
    c='17',
):
    # Passing a pipeline parameter and a constant value as operation arguments
    add_task = add_op(a, 4)  # Returns a dsl.ContainerOp class instance.

    # Passing a task output reference as an operation argument.
    # For an operation with a single return value, the output reference can be
    # accessed using `task.output` or `task.outputs['output_name']` syntax.
    divmod_task = divmod_op(add_task.output, b)

    # For an operation with multiple return values, the output references can be
    # accessed using `task.outputs['output_name']` syntax.
    result_task = add_op(divmod_task.outputs['quotient'], c)
In [ ]:
pipeline_func = calc_pipeline
In [ ]:
experiment_name = 'python-functions'

# Specify pipeline argument values
arguments = {'a': '7', 'b': '8'}

run_name = pipeline_func.__name__ + ' run'

# Submit pipeline directly from pipeline function
run_result = client.create_run_from_pipeline_func(pipeline_func,
                                                  experiment_name=experiment_name,
                                                  run_name=run_name,
                                                  arguments=arguments)
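If you want the notebook to block until the run finishes, the result object returned by create_run_from_pipeline_func can wait on the run; a minimal sketch (the 30-minute timeout is an assumption):
In [ ]:
# Optional: block until the submitted run completes (timeout is in seconds and illustrative).
run_result.wait_for_run_completion(timeout=1800)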
The following steps train a neural network model to classify handwritten digit images using the MNIST dataset.
In [ ]:
def mnist_train(model_file: str, bucket: str) -> str:
    from datetime import datetime
    import tensorflow as tf

    model = tf.keras.models.Sequential([
        tf.keras.layers.Flatten(input_shape=(28, 28)),
        tf.keras.layers.Dense(512, activation=tf.nn.relu),
        tf.keras.layers.Dropout(0.2),
        tf.keras.layers.Dense(10, activation=tf.nn.softmax)
    ])

    model.compile(optimizer='adam',
                  loss='sparse_categorical_crossentropy',
                  metrics=['accuracy'])
    model.summary()

    mnist = tf.keras.datasets.mnist
    (x_train, y_train), (x_test, y_test) = mnist.load_data()
    # Normalize pixel values from [0, 255] to [0, 1]
    x_train, x_test = x_train / 255.0, x_test / 255.0

    callbacks = [
        # Write TensorBoard logs under the bucket, keyed by date
        tf.keras.callbacks.TensorBoard(log_dir=bucket + '/logs/' + str(datetime.now().date())),
        # Interrupt training if `val_loss` stops improving for over 2 epochs
        tf.keras.callbacks.EarlyStopping(patience=2, monitor='val_loss'),
    ]

    model.fit(x_train, y_train, batch_size=32, epochs=5, callbacks=callbacks,
              validation_data=(x_test, y_test))

    model.save(model_file)

    # Copy the saved model to GCS, replacing any previous copy
    from tensorflow import gfile

    gcs_path = bucket + "/" + model_file
    if gfile.Exists(gcs_path):
        gfile.Remove(gcs_path)
    gfile.Copy(model_file, gcs_path)

    return gcs_path
Test running the function directly:
In [ ]:
mnist_train(model_file='mnist_model.h5',
            bucket=GCS_BUCKET)
In [ ]:
model_train_op = comp.func_to_container_op(func=mnist_train,
                                           base_image="tensorflow/tensorflow:1.15.0-py3")
In [ ]:
@dsl.pipeline(
    name='Mnist pipeline',
    description='A toy pipeline that performs mnist model training.'
)
def mnist_pipeline(
    model_file: str = 'mnist_model.h5',
    bucket: str = GCS_BUCKET
):
    # Run the training op with the 'user-gcp-sa' secret mounted so the step can write to GCS
    model_train_op(model_file=model_file, bucket=bucket).apply(gcp.use_gcp_secret('user-gcp-sa'))
In [ ]:
pipeline_func = mnist_pipeline
In [ ]:
experiment_name = 'mnist_kubeflow'

arguments = {"model_file": "mnist_model.h5",
             "bucket": GCS_BUCKET}

run_name = pipeline_func.__name__ + ' run'

# Submit pipeline directly from pipeline function
run_result = client.create_run_from_pipeline_func(pipeline_func,
                                                  experiment_name=experiment_name,
                                                  run_name=run_name,
                                                  arguments=arguments)
As an alternative, you can compile the pipeline into a package. The compiled package can easily be shared and reused by others to run the pipeline.
In [ ]:
pipeline_filename = pipeline_func.__name__ + '.pipeline.zip'
compiler.Compiler().compile(pipeline_func, pipeline_filename)

experiment = client.create_experiment('python-functions-mnist')

run_result = client.run_pipeline(
    experiment_id=experiment.id,
    job_name=run_name,
    pipeline_package_path=pipeline_filename,
    params=arguments)
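To make the compiled package discoverable in the Kubeflow Pipelines UI, you can also upload it through the SDK; a sketch (the display name is an assumption):
In [ ]:
# Optional: register the compiled package so it appears in the Pipelines UI.
client.upload_pipeline(pipeline_package_path=pipeline_filename,
                       pipeline_name='python-functions-mnist-pipeline')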