Lightweight Python components do not require you to build a new container image for every code change. They are intended for fast iteration in a notebook environment.
To build a component, define a stand-alone Python function and then call kfp.components.func_to_container_op(func) to convert it to a component that can be used in a pipeline.
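There are several requirements for the function:
If the function operates on numbers, the parameters need type hints. Supported types are [int, float, bool]. Everything else is passed as string.
To build a component with multiple output values, use the typing.NamedTuple type hint syntax: NamedTuple('MyFunctionOutputs', [('output_name_1', type), ('output_name_2', float)])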
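The multiple-output syntax can be tried in plain Python before involving the SDK. This is a minimal sketch: the function name min_max and its output names are hypothetical, chosen only for illustration. It pairs a NamedTuple return hint with a namedtuple built inside the function, so the function stays stand-alone.

```python
from typing import NamedTuple


def min_max(a: float, b: float) -> NamedTuple('MinMaxOutputs', [('smallest', float), ('largest', float)]):
    '''Returns the smaller and the larger of two numbers as named outputs.'''
    # Import inside the function so the function body is self-contained.
    from collections import namedtuple

    # The field names match the names declared in the return type hint.
    outputs = namedtuple('MinMaxOutputs', ['smallest', 'largest'])
    return outputs(min(a, b), max(a, b))


result = min_max(3.0, 1.5)
print(result.smallest, result.largest)
```

Each field of the returned tuple corresponds to one named output of the function.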
In [ ]:
# Install the SDK
#!pip3 install 'kfp>=0.1.31.2' --quiet
In [1]:
import kfp
import kfp.components as comp
A simple function that just adds two numbers:
In [2]:
#Define a Python function
def add(a: float, b: float) -> float:
    '''Calculates sum of two arguments'''
    return a + b
Convert the function to a pipeline operation
In [3]:
add_op = comp.func_to_container_op(add)
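The type hints matter because values that are not int, float, or bool are passed as strings. The sketch below is not the SDK's implementation; it only imitates the idea with a hypothetical helper, call_with_string_args, that converts string inputs according to the function's annotations. The bool conversion rule in particular is an assumption made for this example.

```python
import inspect


def add(a: float, b: float) -> float:
    '''Calculates sum of two arguments'''
    return a + b


def call_with_string_args(func, *raw_args: str):
    '''Hypothetical helper: converts string arguments using the type hints of func.'''
    # Assumed conversion rules for the supported hints; anything else stays a string.
    converters = {
        int: int,
        float: float,
        bool: lambda text: text.lower() in ('true', '1'),
    }
    params = inspect.signature(func).parameters.values()
    typed_args = [
        converters.get(param.annotation, str)(raw)
        for param, raw in zip(params, raw_args)
    ]
    return func(*typed_args)


print(call_with_string_args(add, '7', '4'))  # 11.0
```

Without the float hints, the same call would concatenate the two strings instead of adding numbers.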
A slightly more advanced function that demonstrates how to use imports and helper functions and how to produce multiple outputs.
In [4]:
#Advanced function
#Demonstrates imports, helper functions and multiple outputs
from typing import NamedTuple
def my_divmod(dividend: float, divisor: float) -> NamedTuple('MyDivmodOutput', [('quotient', float), ('remainder', float), ('mlpipeline_ui_metadata', 'UI_metadata'), ('mlpipeline_metrics', 'Metrics')]):
    '''Divides two numbers and calculates the quotient and remainder'''

    #Pip installs inside a component function.
    #NOTE: installs should be placed right at the beginning to avoid upgrading a package
    # after it has already been imported and cached by python
    import sys, subprocess
    subprocess.run([sys.executable, '-m', 'pip', 'install', 'tensorflow==1.8.0'])

    #Imports inside a component function:
    import numpy as np

    #This function demonstrates how to use nested functions inside a component function:
    def divmod_helper(dividend, divisor):
        return np.divmod(dividend, divisor)

    (quotient, remainder) = divmod_helper(dividend, divisor)

    from tensorflow.python.lib.io import file_io
    import json

    # Exports a sample tensorboard:
    metadata = {
        'outputs' : [{
            'type': 'tensorboard',
            'source': 'gs://ml-pipeline-dataset/tensorboard-train',
        }]
    }

    # Exports two sample metrics:
    metrics = {
        'metrics': [{
            'name': 'quotient',
            'numberValue': float(quotient),
        },{
            'name': 'remainder',
            'numberValue': float(remainder),
        }]}

    from collections import namedtuple
    divmod_output = namedtuple('MyDivmodOutput', ['quotient', 'remainder', 'mlpipeline_ui_metadata', 'mlpipeline_metrics'])
    return divmod_output(quotient, remainder, json.dumps(metadata), json.dumps(metrics))
Test running the Python function directly:
In [5]:
my_divmod(100, 7)
Out[5]:
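Because a lightweight component is still an ordinary Python function, its outputs can be checked locally before conversion. The sketch below uses a simplified, hypothetical variant, simple_divmod, that swaps NumPy for the built-in divmod and drops the pip install and the TensorBoard metadata so it runs anywhere. It is an illustration, not a replacement for my_divmod.

```python
import json
from typing import NamedTuple


def simple_divmod(dividend: float, divisor: float) -> NamedTuple('SimpleDivmodOutput', [('quotient', float), ('remainder', float), ('mlpipeline_metrics', 'Metrics')]):
    '''Hypothetical simplified variant of my_divmod without external packages.'''
    # Imports stay inside the function so it remains stand-alone.
    import json
    from collections import namedtuple

    quotient, remainder = divmod(dividend, divisor)

    # Same metrics layout as in my_divmod.
    metrics = {
        'metrics': [
            {'name': 'quotient', 'numberValue': float(quotient)},
            {'name': 'remainder', 'numberValue': float(remainder)},
        ]
    }

    output = namedtuple('SimpleDivmodOutput', ['quotient', 'remainder', 'mlpipeline_metrics'])
    return output(quotient, remainder, json.dumps(metrics))


result = simple_divmod(100, 7)
metrics = json.loads(result.mlpipeline_metrics)
print(result.quotient, result.remainder)
print(metrics['metrics'])
```

The metrics output is a JSON string, so parsing it back with json.loads is a quick way to confirm it is well-formed.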
In [6]:
divmod_op = comp.func_to_container_op(my_divmod, base_image='tensorflow/tensorflow:1.11.0-py3')
In [7]:
import kfp.dsl as dsl
@dsl.pipeline(
    name='Calculation pipeline',
    description='A toy pipeline that performs arithmetic calculations.'
)
def calc_pipeline(
    a='1',
    b='7',
    c='17',
):
    #Passing pipeline parameter and a constant value as operation arguments
    add_task = add_op(a, 4) #Returns a dsl.ContainerOp class instance.

    #Passing a task output reference as operation arguments
    #For an operation with a single return value, the output reference can be accessed using `task.output` or `task.outputs['output_name']` syntax
    divmod_task = divmod_op(add_task.output, b)

    #For an operation with multiple return values, the output references can be accessed using `task.outputs['output_name']` syntax
    result_task = add_op(divmod_task.outputs['quotient'], c)
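The data flow that calc_pipeline wires together can be traced with plain function calls. This is only a local sketch under stated assumptions: trace_calc_pipeline is a hypothetical name, the built-in divmod stands in for the my_divmod component, and nothing here runs on a cluster. The inputs a=7 and b=8 match the run arguments used in this notebook, and c falls back to the pipeline default of 17.

```python
def add(a: float, b: float) -> float:
    '''Calculates sum of two arguments'''
    return a + b


def trace_calc_pipeline(a: float, b: float, c: float = 17.0) -> float:
    '''Hypothetical local trace of the three pipeline steps.'''
    # Step 1: pipeline parameter plus a constant.
    add_result = add(a, 4)
    # Step 2: the single output of step 1 feeds the divmod step.
    quotient, remainder = divmod(add_result, b)
    # Step 3: one named output of step 2 feeds the final add.
    return add(quotient, c)


print(trace_calc_pipeline(7.0, 8.0))  # 18.0
```

In the real pipeline each step runs in its own container, and the values travel between steps as task output references rather than Python objects.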
In [ ]:
#Specify pipeline argument values
arguments = {'a': '7', 'b': '8'}
#Submit a pipeline run
kfp.Client().create_run_from_pipeline_func(calc_pipeline, arguments=arguments)
# Run the pipeline on a separate Kubeflow cluster instead
# (use if your notebook is not running in Kubeflow, e.g. if using AI Platform Notebooks)
# kfp.Client(host='<ADD KFP ENDPOINT HERE>').create_run_from_pipeline_func(calc_pipeline, arguments=arguments)
# The link shown below leads to the run information page. (Note: There is a bug in JupyterLab that modifies the URL and makes the link stop working.)