This pipeline defines two simple math operations: add
adds two input arguments and returns the sum; my_divmod
divides two input arguments and returns the quotient and remainder. See Build Lightweight Python Components for details on this pipeline.
In [ ]:
pipeline_name = "lightweight-python-demo"
In [ ]:
# Install the dependency packages
!pip install --upgrade pip
!pip install numpy tensorflow
In [ ]:
import kfp
import kfp.components as comp
A simple function that just adds two numbers:
In [ ]:
# Define a Python function
def add(a: float, b: float) -> float:
    '''Calculates sum of two arguments'''
    return a + b
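Because `add` is an ordinary Python function, it can be sanity-checked locally before it is turned into a pipeline operation. The following is a minimal sketch: the function is repeated only so the snippet runs by itself, and the sample inputs and the `local_sum` name are arbitrary choices for illustration.

```python
def add(a: float, b: float) -> float:
    '''Calculates sum of two arguments'''
    return a + b

# Call the function directly, like any other Python function.
local_sum = add(7, 4)
print(local_sum)
```

A quick local call like this catches simple mistakes before any container is built or scheduled.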
Convert the function to a pipeline operation
In [ ]:
add_op = comp.func_to_container_op(add)
A slightly more advanced function that demonstrates how to use imports and helper functions and how to produce multiple outputs.
In [ ]:
# Advanced function
# Demonstrates imports, helper functions and multiple outputs
from typing import NamedTuple
def my_divmod(dividend: float, divisor: float) -> NamedTuple('MyDivmodOutput', [('quotient', float), ('remainder', float), ('mlpipeline_ui_metadata', 'UI_metadata'), ('mlpipeline_metrics', 'Metrics')]):
    '''Divides two numbers and calculates the quotient and remainder'''
    # Pip installs inside a component function.
    # NOTE: installs should be placed right at the beginning to avoid upgrading a package
    # after it has already been imported and cached by Python
    import sys, subprocess
    subprocess.run([sys.executable, '-m', 'pip', 'install', 'tensorflow==1.8.0'])
    # Imports inside a component function:
    import numpy as np
    # This function demonstrates how to use nested functions inside a component function:
    def divmod_helper(dividend, divisor):
        return np.divmod(dividend, divisor)
    (quotient, remainder) = divmod_helper(dividend, divisor)
    from tensorflow.python.lib.io import file_io
    import json
    # Exports a sample tensorboard:
    metadata = {
        'outputs': [{
            'type': 'tensorboard',
            'source': 'gs://ml-pipeline-dataset/tensorboard-train',
        }]
    }
    # Exports two sample metrics:
    metrics = {
        'metrics': [{
            'name': 'quotient',
            'numberValue': float(quotient),
        }, {
            'name': 'remainder',
            'numberValue': float(remainder),
        }]
    }
    from collections import namedtuple
    divmod_output = namedtuple('MyDivmodOutput', ['quotient', 'remainder', 'mlpipeline_ui_metadata', 'mlpipeline_metrics'])
    return divmod_output(quotient, remainder, json.dumps(metadata), json.dumps(metrics))
Test the Python function by running it directly:
In [ ]:
my_divmod(100, 7)
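The multiple-output pattern can also be tried without TensorFlow or NumPy installed. The sketch below is a simplified stand-in, not the component above: `simple_divmod` and `SimpleDivmodOutput` are hypothetical names, the built-in `divmod` replaces `np.divmod`, and the UI metadata output is dropped. It shows how each field of the returned named tuple is read by name, which is the same idea the pipeline relies on when it reads `outputs['quotient']`.

```python
import json
from typing import NamedTuple


class SimpleDivmodOutput(NamedTuple):
    # Hypothetical, reduced version of the MyDivmodOutput tuple.
    quotient: float
    remainder: float
    mlpipeline_metrics: str


def simple_divmod(dividend: float, divisor: float) -> SimpleDivmodOutput:
    '''Divides two numbers and returns quotient, remainder and metrics JSON.'''
    # Built-in divmod is used here as a stand-in for np.divmod.
    quotient, remainder = divmod(dividend, divisor)
    metrics = {
        'metrics': [
            {'name': 'quotient', 'numberValue': float(quotient)},
            {'name': 'remainder', 'numberValue': float(remainder)},
        ]
    }
    return SimpleDivmodOutput(float(quotient), float(remainder), json.dumps(metrics))


result = simple_divmod(100, 7)
# Each output is addressed by its field name.
print(result.quotient, result.remainder)
```

Serializing the metrics with `json.dumps` keeps that output a plain string, matching how the original function returns its metadata and metrics.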
In [ ]:
divmod_op = comp.func_to_container_op(my_divmod, base_image='tensorflow/tensorflow:1.11.0-py3')
In [ ]:
import kfp.dsl as dsl
@dsl.pipeline(
    name='Calculation pipeline',
    description='A toy pipeline that performs arithmetic calculations.'
)
# kfp-tekton does not yet support passing parameters to the PipelineRun, so the values are hard-coded here
# NOTE: user_namespace is not defined in the cells shown here; it must be set before this cell runs
def calc_pipeline(
    name=pipeline_name,
    namespace=user_namespace,
    a='7',
    b='8',
    c='17',
):
    # Passing a pipeline parameter and a constant value as operation arguments
    add_task = add_op(a, 4)  # Returns a dsl.ContainerOp class instance.
    # Passing a task output reference as an operation argument
    # For an operation with a single return value, the output reference can be accessed using the `task.output` or `task.outputs['output_name']` syntax
    divmod_task = divmod_op(add_task.output, b)
    # For an operation with multiple return values, the output references can be accessed using the `task.outputs['output_name']` syntax
    result_task = add_op(divmod_task.outputs['quotient'], c)
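To see what the three chained tasks compute, the same data flow can be traced in plain Python. This is only a local simulation under stated assumptions: `simulate_calc_pipeline` is a hypothetical helper, the built-in `divmod` stands in for the `my_divmod` component, and the defaults are written as numbers rather than the string defaults used in the pipeline signature.

```python
def add(a: float, b: float) -> float:
    '''Calculates sum of two arguments'''
    return a + b


def simulate_calc_pipeline(a: float = 7, b: float = 8, c: float = 17) -> float:
    '''Mimics the task chaining of calc_pipeline without any pipeline backend.'''
    # Step 1: add_task = add_op(a, 4)
    add_result = add(a, 4)
    # Step 2: divmod_task = divmod_op(add_task.output, b)
    quotient, remainder = divmod(add_result, b)
    # Step 3: result_task = add_op(divmod_task.outputs['quotient'], c)
    return add(quotient, c)


final_value = simulate_calc_pipeline()
print(final_value)
```

With the default values the chain is 7 + 4 = 11, then 11 divided by 8 gives quotient 1 and remainder 3, then 1 + 17 = 18. Only the quotient is passed on; the remainder is produced but not consumed by a later task.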
Compile the pipeline into Tekton YAML and run it using the kfp-tekton SDK
In [ ]:
# Specify pipeline argument values
arguments = {'a': '7', 'b': '8'}
# Specify the Kubeflow Pipelines host
host = None
# Submit a pipeline run
from kfp_tekton import TektonClient
TektonClient(host=host).create_run_from_pipeline_func(calc_pipeline, arguments=arguments)
After this notebook runs successfully, you can also view the result in the Tekton
UI. Follow the Tekton Dashboard instructions to install the Tekton
UI.
Here are some screenshots that show this pipeline example and its result.
Tekton resources to show the PipelineRuns and TaskRuns
Tekton taskruns