Multiple outputs example

This notebook is a simple example of how to make a component with multiple outputs using the Pipelines SDK.

Before running notebook:

Setup notebook server

This pipeline requires you to setup a notebook server in the Kubeflow UI. After you are setup, upload the notebook in the Kubeflow UI and then run it in the notebook server.

Create a GCS bucket

This pipeline requires a GCS bucket. If you haven't already, create a GCS bucket to run the notebook. Make sure to create the storage bucket in the same project that you are running Kubeflow on to have the proper permissions by default. You can also create a GCS bucket by running gsutil mb -p <project_name> gs://<bucket_name>.

Upload the notebook in the Kubeflow UI

In order to run this pipeline, make sure to upload the notebook to your notebook server in the Kubeflow UI. You can clone this repo in the Jupyter notebook server by connecting to the notebook server and then selecting New > Terminal. In the terminal type git clone https://github.com/kubeflow/pipelines.git.

Install Kubeflow pipelines

Install the kfp package if you haven't already.


In [ ]:
!python3 -m pip install 'kfp>=0.1.31' --quiet

Setup project info and imports


In [ ]:
output = 'gs://[BUCKET-NAME]' # GCS bucket name
project_id = '[PROJECT-NAME]'   # GCP project name

In [ ]:
import os
STAGING_GCS_PATH = os.path.join(output, 'multiple-output-sample')
TARGET_IMAGE = 'gcr.io/%s/multi-output:latest' % project_id
BASE_IMAGE = 'tensorflow/tensorflow:1.11.0-py3'

In [ ]:
import kfp 
import kfp.dsl as dsl
from kfp import compiler
from typing import NamedTuple

Create component

In order to create a component with multiple outputs, use NamedTuple with the same syntax as below.


In [ ]:
def product_sum(a: float, b: float) -> NamedTuple(
        'output', [('product', float), ('sum', float)]):
    '''Returns the product and sum of two numbers'''
    from collections import namedtuple
    
    product_sum_output = namedtuple('output', ['product', 'sum'])
    return product_sum_output(a*b, a+b)

In [ ]:
product_sum_op = compiler.build_python_component(
    component_func=product_sum,
    staging_gcs_path=STAGING_GCS_PATH,
    base_image=BASE_IMAGE,
    target_image=TARGET_IMAGE)

Create and run pipeline

Create pipeline

The pipeline parameters are specified in the pipeline function signature.


In [ ]:
@dsl.pipeline(
    name='Multiple Outputs Pipeline',
    description='Sample pipeline to showcase multiple outputs'
)
def pipeline(a=2.0, b=2.5, c=3.0):
    prod_sum_task = product_sum_op(a, b)
    prod_sum_task2 = product_sum_op(b, c)
    prod_sum_task3 = product_sum_op(prod_sum_task.outputs['product'],
                                    prod_sum_task2.outputs['sum'])

Run pipeline


In [ ]:
arguments = {
    'a': 2.0,
    'b': 2.5,
    'c': 3.0,
}
run_result = kfp.Client().create_run_from_pipeline_func(pipeline, arguments=arguments)