This notebook is a simple example of how to make a component with multiple outputs using the Pipelines SDK.
To run this notebook, set up a notebook server in the Kubeflow UI, upload the notebook to that server, and run it there.
This pipeline requires a GCS bucket. If you do not have one yet, create it in the same project that runs Kubeflow so that the default permissions apply. You can create a bucket by running gsutil mb -p <project_name> gs://<bucket_name>.
Instead of uploading the notebook, you can clone this repo on the notebook server: connect to the server, select New > Terminal, and in the terminal type git clone https://github.com/kubeflow/pipelines.git.
Install the kfp package if you haven't already.
In [ ]:
!python3 -m pip install 'kfp>=0.1.31' --quiet
In [ ]:
output = 'gs://[BUCKET-NAME]' # GCS bucket name
project_id = '[PROJECT-NAME]' # GCP project name
In [ ]:
import os
STAGING_GCS_PATH = os.path.join(output, 'multiple-output-sample')
TARGET_IMAGE = 'gcr.io/%s/multi-output:latest' % project_id
BASE_IMAGE = 'tensorflow/tensorflow:1.11.0-py3'
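As a quick check of how the staging path and image name above are composed, here is a minimal sketch. The bucket and project names are hypothetical stand-ins for the placeholders, and posixpath.join is used in place of os.path.join only so that the result is the same on every operating system; on Linux the two behave identically.

```python
import posixpath

# Hypothetical values standing in for [BUCKET-NAME] and [PROJECT-NAME].
output = 'gs://my-bucket'
project_id = 'my-project'

# posixpath.join always uses '/' as the separator, which is what GCS expects.
staging_gcs_path = posixpath.join(output, 'multiple-output-sample')
target_image = 'gcr.io/%s/multi-output:latest' % project_id

print(staging_gcs_path)  # gs://my-bucket/multiple-output-sample
print(target_image)      # gcr.io/my-project/multi-output:latest
```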
In [ ]:
import kfp
import kfp.dsl as dsl
from kfp import compiler
from typing import NamedTuple
In [ ]:
def product_sum(a: float, b: float) -> NamedTuple(
        'output', [('product', float), ('sum', float)]):
    '''Returns the product and sum of two numbers'''
    from collections import namedtuple
    product_sum_output = namedtuple('output', ['product', 'sum'])
    return product_sum_output(a*b, a+b)
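Before building the component, it can help to call the function as ordinary Python and confirm that each output is reachable by name. The sketch below repeats the function so that it runs on its own; it does not involve the Pipelines SDK at all.

```python
from typing import NamedTuple


def product_sum(a: float, b: float) -> NamedTuple(
        'output', [('product', float), ('sum', float)]):
    '''Returns the product and sum of two numbers'''
    from collections import namedtuple
    product_sum_output = namedtuple('output', ['product', 'sum'])
    return product_sum_output(a*b, a+b)


# Call the function locally with the pipeline's default values for a and b.
result = product_sum(2.0, 2.5)
print(result.product)  # 5.0
print(result.sum)      # 4.5
```

The field names of the returned tuple ('product' and 'sum') are the same names used later to select outputs from a task.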
In [ ]:
product_sum_op = compiler.build_python_component(
    component_func=product_sum,
    staging_gcs_path=STAGING_GCS_PATH,
    base_image=BASE_IMAGE,
    target_image=TARGET_IMAGE)
In [ ]:
@dsl.pipeline(
    name='Multiple Outputs Pipeline',
    description='Sample pipeline to showcase multiple outputs'
)
def pipeline(a=2.0, b=2.5, c=3.0):
    # The first two tasks run independently on the pipeline parameters.
    prod_sum_task = product_sum_op(a, b)
    prod_sum_task2 = product_sum_op(b, c)
    # The third task consumes one named output from each earlier task.
    prod_sum_task3 = product_sum_op(prod_sum_task.outputs['product'],
                                    prod_sum_task2.outputs['sum'])
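To see which values the third task should receive, the same data flow can be traced in plain Python. This is only an illustration of the arithmetic under the default parameters; it runs the function directly rather than as pipeline tasks, and the variable names are hypothetical.

```python
from collections import namedtuple


def product_sum(a, b):
    '''Returns the product and sum of two numbers'''
    product_sum_output = namedtuple('output', ['product', 'sum'])
    return product_sum_output(a*b, a+b)


# Default pipeline parameters.
a, b, c = 2.0, 2.5, 3.0

task1 = product_sum(a, b)   # product = 5.0, sum = 4.5
task2 = product_sum(b, c)   # product = 7.5, sum = 5.5

# The third step takes the product of the first and the sum of the second.
task3 = product_sum(task1.product, task2.sum)
print(task3.product)  # 27.5
print(task3.sum)      # 10.5
```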
In [ ]:
arguments = {
    'a': 2.0,
    'b': 2.5,
    'c': 3.0,
}
run_result = kfp.Client().create_run_from_pipeline_func(
    pipeline, arguments=arguments)