In [ ]:
# Copyright 2019 Google Inc. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ==============================================================================

Lightweight Python Components

To build a component, define a standalone Python function, then call kfp.components.func_to_container_op(func) to convert it into a component that can be used in a pipeline.

There are several requirements for the function:

  • The function should be standalone. It should not use any code declared outside of the function definition. Any imports should be added inside the main function. Any helper functions must be defined inside the main function.
  • The function can only import packages that are available in the base image. If you need to import a package that's not available, you can try to find a container image that already includes the required packages.
  • If the function operates on numbers, the parameters need type hints. Supported types are [int, float, bool]. Everything else is passed as a string.
  • To build a component with multiple output values, use the typing.NamedTuple type hint syntax, as shown in the sketch below: NamedTuple('MyFunctionOutputs', [('output_name_1', type_1), ('output_name_2', type_2)])
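
For example, a minimal sketch of a function with two named outputs (the function and field names here are illustrative, not part of the KFP API):

from typing import NamedTuple

def get_stats(a: float, b: float) -> NamedTuple('GetStatsOutputs', [('total', float), ('mean', float)]):
    # The returned namedtuple's field names must match the NamedTuple hint
    from collections import namedtuple
    outputs = namedtuple('GetStatsOutputs', ['total', 'mean'])
    return outputs(a + b, (a + b) / 2)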

In [ ]:
import kfp
import kfp.gcp as gcp
import kfp.dsl as dsl
import kfp.compiler as compiler
import kfp.components as comp

import kubernetes as k8s

In [ ]:
# Required Parameters
PROJECT_ID='<ADD GCP PROJECT HERE>'
GCS_BUCKET='gs://<ADD STORAGE LOCATION HERE>'

Create client

If you run this notebook outside of a Kubeflow cluster, create the client with the following arguments:

  • host: The URL of your Kubeflow Pipelines instance, for example "https://<your-deployment>.endpoints.<your-project>.cloud.goog/pipeline"
  • client_id: The client ID used by Identity-Aware Proxy
  • other_client_id: The client ID used to obtain the auth codes and refresh tokens.
  • other_client_secret: The client secret used to obtain the auth codes and refresh tokens.
client = kfp.Client(host, client_id, other_client_id, other_client_secret)

If you run this notebook within a Kubeflow cluster, run the following command:

client = kfp.Client()

You'll need to create OAuth client ID credentials of type Other to get other_client_id and other_client_secret. Learn more about creating OAuth credentials.


In [ ]:
# Optional parameters (required when running outside the Kubeflow cluster)

# The host for 'AI Platform Pipelines' ends with 'pipelines.googleusercontent.com'
# The host for pipeline endpoint of 'full Kubeflow deployment' ends with '/pipeline'
# Examples are:
# https://7c021d0340d296aa-dot-us-central2.pipelines.googleusercontent.com
# https://kubeflow.endpoints.kubeflow-pipeline.cloud.goog/pipeline
HOST = '<ADD HOST NAME TO TALK TO KUBEFLOW PIPELINE HERE>'

# For 'full Kubeflow deployment' on GCP, the endpoint is usually protected through IAP, therefore the following 
# will be needed to access the endpoint.
CLIENT_ID = '<ADD OAuth CLIENT ID USED BY IAP HERE>'
OTHER_CLIENT_ID = '<ADD OAuth CLIENT ID USED TO OBTAIN AUTH CODES HERE>'
OTHER_CLIENT_SECRET = '<ADD OAuth CLIENT SECRET USED TO OBTAIN AUTH CODES HERE>'

In [ ]:
# This ensures that a valid access token is present to reach the endpoint for 'AI Platform Pipelines'.
# If you are not working with 'AI Platform Pipelines', this step is not necessary.
! gcloud auth print-access-token

In [ ]:
# Create kfp client
# Detect whether this notebook is running inside the cluster
in_cluster = True
try:
    k8s.config.load_incluster_config()
except k8s.config.ConfigException:
    in_cluster = False

if in_cluster:
    client = kfp.Client()
else:
    if HOST.endswith('googleusercontent.com'):
        CLIENT_ID = None
        OTHER_CLIENT_ID = None
        OTHER_CLIENT_SECRET = None

    client = kfp.Client(host=HOST, 
                        client_id=CLIENT_ID,
                        other_client_id=OTHER_CLIENT_ID, 
                        other_client_secret=OTHER_CLIENT_SECRET)
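
To verify that the client can reach the endpoint, you can list the existing experiments as an optional sanity check:

client.list_experiments()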

Start with a simple function


In [ ]:
#Define a Python function
def add(a: float, b: float) -> float:
    '''Calculates the sum of two arguments'''
    return a + b

Convert the function to a pipeline operation


In [ ]:
add_op = comp.func_to_container_op(add)
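
If you also want to share the component, func_to_container_op accepts an output_component_file argument that writes a reusable component spec to disk (a sketch; the filename is arbitrary):

add_op = comp.func_to_container_op(add, output_component_file='add_component.yaml')

Others can then recreate the same operation with comp.load_component_from_file('add_component.yaml').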

A more complex example, with multiple outputs


In [ ]:
# Advanced function
# Demonstrates imports, helper functions and multiple outputs
from typing import NamedTuple

def my_divmod(dividend: float, 
              divisor: float,
             ) -> NamedTuple('MyDivmodOutput', [('quotient', float), ('remainder', float), 
                                                ('mlpipeline_ui_metadata', 'UI_metadata'), 
                                                ('mlpipeline_metrics', 'Metrics')]):
    
    '''Divides two numbers and returns the quotient and remainder'''
    
    #Imports inside a component function:
    import numpy as np

    #This function demonstrates how to use nested functions inside a component function:
    def divmod_helper(dividend, divisor):
        return np.divmod(dividend, divisor)

    (quotient, remainder) = divmod_helper(dividend, divisor)

    import json
    
    # Exports metadata for a sample TensorBoard visualization:
    metadata = {
      'outputs' : [{
        'type': 'tensorboard',
        'source': 'gs://ml-pipeline-dataset/tensorboard-train',
      }]
    }

    # Exports two sample metrics:
    metrics = {
      'metrics': [{
          'name': 'quotient',
          'numberValue':  float(quotient),
        },{
          'name': 'remainder',
          'numberValue':  float(remainder),
        }]}

    from collections import namedtuple
    divmod_output = namedtuple('MyDivmodOutput', 
                               ['quotient', 'remainder', 'mlpipeline_ui_metadata', 'mlpipeline_metrics'])
    return divmod_output(quotient, remainder, json.dumps(metadata), json.dumps(metrics))

In [ ]:
my_divmod(100, 7)
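
Because my_divmod is still an ordinary Python function, you can inspect its NamedTuple result locally before containerizing it (a sketch):

result = my_divmod(100, 7)
print(result.quotient, result.remainder)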

In [ ]:
divmod_op = comp.func_to_container_op(func=my_divmod, 
                                      base_image="tensorflow/tensorflow:1.15.0-py3")

Define the pipeline


In [ ]:
import kfp.dsl as dsl
@dsl.pipeline(
    name='Calculation pipeline',
    description='A toy pipeline that performs arithmetic calculations.'
)
def calc_pipeline(
    a='1',
    b='7',
    c='17',
):
    #Passing pipeline parameter and a constant value as operation arguments
    add_task = add_op(a, 4) #Returns a dsl.ContainerOp class instance. 
    
    #Passing a task output reference as operation arguments
    #For an operation with a single return value, the output reference can be accessed using `task.output` or `task.outputs['output_name']` syntax
    divmod_task = divmod_op(add_task.output, b)

    #For an operation with multiple return values, the output references can be accessed using `task.outputs['output_name']` syntax
    result_task = add_op(divmod_task.outputs['quotient'], c)

Submit the pipeline


In [ ]:
pipeline_func = calc_pipeline

In [ ]:
experiment_name = 'python-functions'

#Specify pipeline argument values
arguments = {'a': '7', 'b': '8'}

run_name = pipeline_func.__name__ + ' run'

# Submit pipeline directly from pipeline function
run_result = client.create_run_from_pipeline_func(pipeline_func, 
                                                  experiment_name=experiment_name, 
                                                  run_name=run_name, 
                                                  arguments=arguments)
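
create_run_from_pipeline_func returns a result object; to make the notebook block until the run finishes, you can wait on it (a sketch; the timeout is in seconds):

run_result.wait_for_run_completion(timeout=1800)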

Train a Keras model

The following steps train a neural network model to classify handwritten digit images using the MNIST dataset.


In [ ]:
def mnist_train(model_file: str, bucket: str) -> str:

    from datetime import datetime
    import tensorflow as tf
    
    model = tf.keras.models.Sequential([
      tf.keras.layers.Flatten(input_shape=(28, 28)),
      tf.keras.layers.Dense(512, activation=tf.nn.relu),
      tf.keras.layers.Dropout(0.2),
      tf.keras.layers.Dense(10, activation=tf.nn.softmax)
    ])
    
    model.compile(optimizer='adam',
                  loss='sparse_categorical_crossentropy',
                  metrics=['accuracy'])
    
    print(model.summary())    
    
    # Load MNIST and scale pixel values to the [0, 1] range
    mnist = tf.keras.datasets.mnist
    (x_train, y_train), (x_test, y_test) = mnist.load_data()
    x_train, x_test = x_train / 255.0, x_test / 255.0

    callbacks = [
      tf.keras.callbacks.TensorBoard(log_dir=bucket + '/logs/' + str(datetime.now().date())),
      # Interrupt training if `val_loss` stops improving for over 2 epochs
      tf.keras.callbacks.EarlyStopping(patience=2, monitor='val_loss'),
    ]
    
    model.fit(x_train, y_train, batch_size=32, epochs=5, callbacks=callbacks,
              validation_data=(x_test, y_test))
    
    
    # Save the model locally, then copy it to GCS so the trained model
    # survives after the container exits
    model.save(model_file)

    from tensorflow import gfile

    gcs_path = bucket + "/" + model_file

    # Overwrite any model file left over from a previous run
    if gfile.Exists(gcs_path):
        gfile.Remove(gcs_path)

    gfile.Copy(model_file, gcs_path)

    return gcs_path

In [ ]:
mnist_train(model_file='mnist_model.h5',
            bucket=GCS_BUCKET)

In [ ]:
model_train_op = comp.func_to_container_op(func=mnist_train, 
                                           base_image="tensorflow/tensorflow:1.15.0-py3")

Define and submit the pipeline


In [ ]:
@dsl.pipeline(
    name='Mnist pipeline',
    description='A toy pipeline that performs mnist model training.'
)
def mnist_pipeline(
    model_file: str = 'mnist_model.h5',
    bucket: str = GCS_BUCKET
):
    # use_gcp_secret mounts the 'user-gcp-sa' service account credentials
    # so the training step can write the model to GCS
    model_train_op(model_file=model_file, bucket=bucket).apply(gcp.use_gcp_secret('user-gcp-sa'))

Submit a pipeline run


In [ ]:
pipeline_func = mnist_pipeline

In [ ]:
experiment_name = 'mnist_kubeflow'

arguments = {"model_file":"mnist_model.h5",
             "bucket":GCS_BUCKET}

run_name = pipeline_func.__name__ + ' run'

# Submit pipeline directly from pipeline function
run_result = client.create_run_from_pipeline_func(pipeline_func, 
                                                  experiment_name=experiment_name, 
                                                  run_name=run_name, 
                                                  arguments=arguments)

As an alternative, you can compile the pipeline into a package. The compiled package can be shared and reused by others to run the pipeline.

pipeline_filename = pipeline_func.__name__ + '.pipeline.zip'
compiler.Compiler().compile(pipeline_func, pipeline_filename)

experiment = client.create_experiment('python-functions-mnist')

run_result = client.run_pipeline(
    experiment_id=experiment.id, 
    job_name=run_name, 
    pipeline_package_path=pipeline_filename, 
    params=arguments)
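
You can also upload the compiled package so the pipeline appears in the Kubeflow Pipelines UI (a sketch; the display name is arbitrary):

client.upload_pipeline(pipeline_filename, pipeline_name='python-functions-mnist')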
