Lightweight python components

Lightweight python components do not require you to build a new container image for every code change. They're intended to use for fast iteration in notebook environment.

Building a lightweight python component

To build a component just define a stand-alone python function and then call kfp.components.func_to_container_op(func) to convert it to a component that can be used in a pipeline.

There are several requirements for the function:

  • The function should be stand-alone. It should not use any code declared outside of the function definition. Any imports should be added inside the main function. Any helper functions should also be defined inside the main function.
  • The function can only import packages that are available in the base image. If you need to import a package that's not available you can try to find a container image that already includes the required packages. (As a workaround you can use the module subprocess to run pip install for the required package. There is an example below in my_divmod function.)
  • If the function operates on numbers, the parameters need to have type hints. Supported types are [int, float, bool]. Everything else is passed as string.
  • To build a component with multiple output values, use the typing.NamedTuple type hint syntax: NamedTuple('MyFunctionOutputs', [('output_name_1', type), ('output_name_2', float)])

In [ ]:


In [ ]:
# Install the SDK
#!pip3 install 'kfp>=0.1.31.2' --quiet

In [1]:
import kfp
import kfp.components as comp

Simple function that just add two numbers:


In [2]:
#Define a Python function
def add(a: float, b: float) -> float:
   '''Calculates sum of two arguments'''
   return a + b

Convert the function to a pipeline operation


In [3]:
add_op = comp.func_to_container_op(add)

A bit more advanced function which demonstrates how to use imports, helper functions and produce multiple outputs.


In [4]:
#Advanced function
#Demonstrates imports, helper functions and multiple outputs
from typing import NamedTuple
def my_divmod(dividend: float, divisor:float) -> NamedTuple('MyDivmodOutput', [('quotient', float), ('remainder', float), ('mlpipeline_ui_metadata', 'UI_metadata'), ('mlpipeline_metrics', 'Metrics')]):
    '''Divides two numbers and calculate  the quotient and remainder'''
    #Pip installs inside a component function.
    #NOTE: installs should be placed right at the beginning to avoid upgrading a package
    # after it has already been imported and cached by python
    import sys, subprocess;
    subprocess.run([sys.executable, '-m', 'pip', 'install', 'tensorflow==1.8.0'])
    
    #Imports inside a component function:
    import numpy as np

    #This function demonstrates how to use nested functions inside a component function:
    def divmod_helper(dividend, divisor):
        return np.divmod(dividend, divisor)

    (quotient, remainder) = divmod_helper(dividend, divisor)

    from tensorflow.python.lib.io import file_io
    import json
    
    # Exports a sample tensorboard:
    metadata = {
      'outputs' : [{
        'type': 'tensorboard',
        'source': 'gs://ml-pipeline-dataset/tensorboard-train',
      }]
    }

    # Exports two sample metrics:
    metrics = {
      'metrics': [{
          'name': 'quotient',
          'numberValue':  float(quotient),
        },{
          'name': 'remainder',
          'numberValue':  float(remainder),
        }]}

    from collections import namedtuple
    divmod_output = namedtuple('MyDivmodOutput', ['quotient', 'remainder', 'mlpipeline_ui_metadata', 'mlpipeline_metrics'])
    return divmod_output(quotient, remainder, json.dumps(metadata), json.dumps(metrics))

Test running the python function directly


In [5]:
my_divmod(100, 7)


Out[5]:
MyDivmodOutput(quotient=14, remainder=2)

Convert the function to a pipeline operation

You can specify an alternative base container image (the image needs to have Python 3.5+ installed).


In [6]:
divmod_op = comp.func_to_container_op(my_divmod, base_image='tensorflow/tensorflow:1.11.0-py3')

Define the pipeline

Pipeline function has to be decorated with the @dsl.pipeline decorator


In [7]:
import kfp.dsl as dsl
@dsl.pipeline(
   name='Calculation pipeline',
   description='A toy pipeline that performs arithmetic calculations.'
)
def calc_pipeline(
   a='a',
   b='7',
   c='17',
):
    #Passing pipeline parameter and a constant value as operation arguments
    add_task = add_op(a, 4) #Returns a dsl.ContainerOp class instance. 
    
    #Passing a task output reference as operation arguments
    #For an operation with a single return value, the output reference can be accessed using `task.output` or `task.outputs['output_name']` syntax
    divmod_task = divmod_op(add_task.output, b)

    #For an operation with a multiple return values, the output references can be accessed using `task.outputs['output_name']` syntax
    result_task = add_op(divmod_task.outputs['quotient'], c)

Submit the pipeline for execution


In [ ]:
#Specify pipeline argument values
arguments = {'a': '7', 'b': '8'}

#Submit a pipeline run
kfp.Client().create_run_from_pipeline_func(calc_pipeline, arguments=arguments)

# Run the pipeline on a separate Kubeflow Cluster instead
# (use if your notebook is not running in Kubeflow - e.x. if using AI Platform Notebooks)
# kfp.Client(host='<ADD KFP ENDPOINT HERE>').create_run_from_pipeline_func(calc_pipeline, arguments=arguments)

#vvvvvvvvv This link leads to the run information page. (Note: There is a bug in JupyterLab that modifies the URL and makes the link stop working)