Data passing tutorial

Data passing is the most important aspect of Pipelines.

In Kubeflow Pipelines, pipeline authors compose pipelines by creating component instances (tasks) and connecting them together.

Components have inputs and outputs, and they can consume and produce arbitrary data.

Pipeline authors establish connections between component tasks by connecting their data inputs and outputs: the output of one task is passed as an argument to another task's input.

The system takes care of storing the data produced by components and later passing that data to other components for consumption as instructed by the pipeline.

This tutorial shows how to create Python components that produce, consume, and transform data, and how to build data-passing pipelines by instantiating components and connecting them together.


In [1]:
from typing import NamedTuple

import kfp
from kfp.components import InputPath, InputTextFile, OutputPath, OutputTextFile
from kfp.components import func_to_container_op
from kfp_tekton.compiler import TektonCompiler

Small data

Small data is data that you would be comfortable passing as a program's command-line argument; its size should not exceed a few kilobytes.

Typical examples of small data are a number, a URL, or a small string (e.g. a column name).

Small lists, dictionaries, and JSON structures are fine, but keep an eye on their size and consider switching to the file-based data passing methods, which are more suitable for big data (more than several kilobytes) or binary data.

All small data outputs will at some point be serialized to strings, and all small data input values will at some point be deserialized from strings (passed as command-line arguments). There are built-in serializers and deserializers for several common types (e.g. str, int, float, bool, list, dict). All other types of data need to be serialized manually before being returned. Make sure to specify the type annotations properly; otherwise there will be no automatic deserialization and the component function will receive strings instead of deserialized objects.
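
For example, a type without a built-in serializer, such as datetime, has to be serialized manually before being returned. A minimal hypothetical sketch (this component is illustrative and not part of the pipeline runs below; note that imports must live inside the function body for components built with func_to_container_op):

@func_to_container_op
def produce_timestamp() -> str:
    '''datetime has no built-in serializer, so convert it to an ISO string manually'''
    import datetime  # imports must be inside the function for lightweight components
    return datetime.datetime.now().isoformat()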

Consuming small data


In [2]:
@func_to_container_op
def print_small_text(text: str):
    '''Print small text'''
    print(text)

def constant_to_consumer_pipeline():
    '''Pipeline that passes a small constant string to the consumer'''
    consume_task = print_small_text('Hello world') # Passing constant as argument to consumer

TektonCompiler().compile(constant_to_consumer_pipeline,
                         'constant_to_consumer_pipeline.yaml')

In [3]:
!kubectl apply -f constant_to_consumer_pipeline.yaml


pipelinerun.tekton.dev/constant-to-consumer-pipeline created

In [4]:
!tkn pr describe constant-to-consumer-pipeline


Name:        constant-to-consumer-pipeline
Namespace:   kubeflow

🌡️  Status

STARTED        DURATION     STATUS
1 minute ago   37 seconds   Succeeded

📦 Resources

 No resources

⚓ Params

 No params

🗂  Taskruns

 NAME                                                     TASK NAME          STARTED        DURATION     STATUS
 ∙ constant-to-consumer-pipeline-print-small-text-pkgqh   print-small-text   1 minute ago   36 seconds   Succeeded

In [5]:
def pipeline_parameter_to_consumer_pipeline(text: str):
    '''Pipeline that passes a small pipeline parameter string to the consumer'''
    consume_task = print_small_text(text) # Passing pipeline parameter as argument to consumer

TektonCompiler().compile(pipeline_parameter_to_consumer_pipeline,
                         'pipeline_parameter_to_consumer_pipeline.yaml')

In [6]:
!kubectl apply -f pipeline_parameter_to_consumer_pipeline.yaml


pipelinerun.tekton.dev/pipeline-parameter-to-consumer-pipeline created

In [7]:
!tkn pr describe pipeline-parameter-to-consumer-pipeline


Name:        pipeline-parameter-to-consumer-pipeline
Namespace:   kubeflow

🌡️  Status

STARTED         DURATION    STATUS
2 minutes ago   6 seconds   Succeeded

📦 Resources

 No resources

⚓ Params

 NAME     VALUE
 ∙ text   

🗂  Taskruns

 NAME                                                               TASK NAME          STARTED         DURATION    STATUS
 ∙ pipeline-parameter-to-consumer-pipeline-print-small-text-mfrbp   print-small-text   2 minutes ago   6 seconds   Succeeded

Producing small data


In [8]:
@func_to_container_op
def produce_one_small_output() -> str:
    return 'Hello world'

def task_output_to_consumer_pipeline():
    '''Pipeline that passes small data from producer to consumer'''
    produce_task = produce_one_small_output()
    # Passing producer task output as argument to consumer
    consume_task1 = print_small_text(produce_task.output) # task.output only works for single-output components
    consume_task2 = print_small_text(produce_task.outputs['output']) # task.outputs[...] always works

TektonCompiler().compile(task_output_to_consumer_pipeline,
                         'task_output_to_consumer_pipeline.yaml')

In [9]:
!kubectl apply -f task_output_to_consumer_pipeline.yaml


pipelinerun.tekton.dev/task-output-to-consumer-pipeline created

In [10]:
!tkn pr describe task-output-to-consumer-pipeline


Name:        task-output-to-consumer-pipeline
Namespace:   kubeflow

🌡️  Status

STARTED          DURATION     STATUS
13 seconds ago   12 seconds   Succeeded

📦 Resources

 No resources

⚓ Params

 No params

🗂  Taskruns

 NAME                                                                TASK NAME                  STARTED          DURATION    STATUS
 ∙ task-output-to-consumer-pipeline-print-small-text-2-6lxkx         print-small-text-2         7 seconds ago    6 seconds   Succeeded
 ∙ task-output-to-consumer-pipeline-print-small-text-p89ww           print-small-text           7 seconds ago    5 seconds   Succeeded
 ∙ task-output-to-consumer-pipeline-produce-one-small-output-9wmcs   produce-one-small-output   13 seconds ago   6 seconds   Succeeded

Producing and consuming multiple arguments


In [11]:
@func_to_container_op
def produce_two_small_outputs() -> NamedTuple('Outputs', [('text', str), ('number', int)]):
    return ("data 1", 42)

@func_to_container_op
def consume_two_arguments(text: str, number: int):
    print('Text={}'.format(text))
    print('Number={}'.format(str(number)))

def producers_to_consumers_pipeline(text: str = "Hello world"):
    '''Pipeline that passes data from producer to consumer'''
    produce1_task = produce_one_small_output()
    produce2_task = produce_two_small_outputs()

    consume_task1 = consume_two_arguments(produce1_task.output, 42)
    consume_task2 = consume_two_arguments(text, produce2_task.outputs['number'])
    consume_task3 = consume_two_arguments(produce2_task.outputs['text'], produce2_task.outputs['number'])

TektonCompiler().compile(producers_to_consumers_pipeline,
                         'producers_to_consumers_pipeline.yaml')

In [12]:
!kubectl apply -f producers_to_consumers_pipeline.yaml


pipelinerun.tekton.dev/producers-to-consumers-pipeline created

In [13]:
!tkn pr describe producers-to-consumers-pipeline


Name:        producers-to-consumers-pipeline
Namespace:   kubeflow

🌡️  Status

STARTED          DURATION     STATUS
38 seconds ago   13 seconds   Succeeded

📦 Resources

 No resources

⚓ Params

 NAME     VALUE
 ∙ text   Hello world

🗂  Taskruns

 NAME                                                                TASK NAME                   STARTED          DURATION    STATUS
 ∙ producers-to-consumers-pipeline-consume-two-arguments-2-bkpj9     consume-two-arguments-2     33 seconds ago   6 seconds   Succeeded
 ∙ producers-to-consumers-pipeline-consume-two-arguments-3-hpgn6     consume-two-arguments-3     33 seconds ago   8 seconds   Succeeded
 ∙ producers-to-consumers-pipeline-consume-two-arguments-fbd6k       consume-two-arguments       33 seconds ago   7 seconds   Succeeded
 ∙ producers-to-consumers-pipeline-produce-two-small-outputs-5zxlz   produce-two-small-outputs   38 seconds ago   5 seconds   Succeeded
 ∙ producers-to-consumers-pipeline-produce-one-small-output-f2qnt    produce-one-small-output    38 seconds ago   5 seconds   Succeeded

Consuming and producing data at the same time


In [14]:
@func_to_container_op
def get_item_from_list(list_of_strings: list, index: int) -> str:
    return list_of_strings[index]

@func_to_container_op
def truncate_text(text: str, max_length: int) -> str:
    return text[0:max_length]

def processing_pipeline(text: str = "Hello world"):
    truncate_task = truncate_text(text, max_length=5)
    get_item_task = get_item_from_list(list_of_strings=[3, 1, truncate_task.output, 1, 5, 9, 2, 6, 7], index=2) # constants and task outputs can be mixed in the same list argument
    print_small_text(get_item_task.output)


TektonCompiler().compile(processing_pipeline,
                         'processing_pipeline.yaml')

In [15]:
!kubectl apply -f processing_pipeline.yaml


pipelinerun.tekton.dev/processing-pipeline created

In [16]:
!tkn pr describe processing-pipeline


Name:        processing-pipeline
Namespace:   kubeflow

🌡️  Status

STARTED          DURATION     STATUS
29 seconds ago   14 seconds   Succeeded

📦 Resources

 No resources

⚓ Params

 NAME     VALUE
 ∙ text   Hello world

🗂  Taskruns

 NAME                                             TASK NAME            STARTED          DURATION    STATUS
 ∙ processing-pipeline-print-small-text-l8vrv     print-small-text     19 seconds ago   4 seconds   Succeeded
 ∙ processing-pipeline-get-item-from-list-qqbts   get-item-from-list   24 seconds ago   5 seconds   Succeeded
 ∙ processing-pipeline-truncate-text-2mlt4        truncate-text        29 seconds ago   5 seconds   Succeeded

Big data (files)

Big data should be read from files and written to files.

The paths for the input and output files are chosen by the system and are passed into the function (as strings).

Use the InputPath parameter annotation to tell the system that the function wants to consume the corresponding input data as a file. The system will download the data, write it to a local file and then pass the path of that file to the function.

Use the OutputPath parameter annotation to tell the system that the function wants to produce the corresponding output data as a file. The system will prepare and pass the path of a file where the function should write the output data. After the function exits, the system will upload the data to the storage system so that it can be passed to downstream components.

You can specify the type of the consumed/produced data by passing the type argument to InputPath and OutputPath. The type can be a Python type or an arbitrary type name string. OutputPath('TFModel') means that the function states that the data it writes to the file has type 'TFModel'. InputPath('TFModel') means that the function states that it expects the data it reads from the file to have type 'TFModel'. When the pipeline author connects inputs to outputs, the system checks whether the types match.
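
As a hypothetical illustration (these 'TFModel' components are not compiled or run in this tutorial), a producer and consumer that agree on a custom type name might look like this:

@func_to_container_op
def train_model(model_path: OutputPath('TFModel')):
    '''Declares that the data written to model_path has type 'TFModel' '''
    with open(model_path, 'wb') as writer:
        writer.write(b'model bytes')  # stand-in for real model serialization

@func_to_container_op
def evaluate_model(model_path: InputPath('TFModel')):
    '''Declares that the data read from model_path is expected to have type 'TFModel' '''
    with open(model_path, 'rb') as reader:
        print(len(reader.read()))

Connecting evaluate_model(train_model().output) then type-checks, because both sides declare 'TFModel'.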

Note on input/output names: when the function is converted to a component, the input and output names generally follow the parameter names, but the "_path" and "_file" suffixes are stripped from file/path inputs and outputs. E.g. the number_file_path: InputPath(int) parameter becomes the number: int input. This makes the argument passing look more natural: number=42 instead of number_file_path=42.
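
A hypothetical sketch of this naming rule (not compiled or run in this tutorial):

@func_to_container_op
def add_one(number_file_path: InputPath(int), result_path: OutputPath(int)):
    '''After suffix stripping, the component input is named "number" and the output "result"'''
    with open(number_file_path, 'r') as reader:
        number = int(reader.read())
    with open(result_path, 'w') as writer:
        writer.write(str(number + 1))

def add_one_pipeline():
    add_one_task = add_one(number=42)  # passed as number=..., not number_file_path=...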

Note: since Tekton pipelines use 'workspaces' to handle big data processing, the compiler generates the PVC definitions and needs a volume to store the data. Users need to create the volume manually, or enable dynamic volume provisioning; see https://kubernetes.io/docs/concepts/storage/dynamic-provisioning

Writing and reading big data


In [17]:
# Writing big data
@func_to_container_op
def repeat_line(line: str, output_text_path: OutputPath(str), count: int = 10):
    '''Repeat the line specified number of times'''
    with open(output_text_path, 'w') as writer:
        for i in range(count):
            writer.write(line + '\n')


# Reading big data
@func_to_container_op
def print_text(text_path: InputPath()): # The "text" input is untyped so that any data can be printed
    '''Print text'''
    with open(text_path, 'r') as reader:
        for line in reader:
            print(line, end = '')

def print_repeating_lines_pipeline():
    repeat_lines_task = repeat_line(line='Hello', count=5000)
    print_text(repeat_lines_task.output) # Don't forget .output !

TektonCompiler().compile(print_repeating_lines_pipeline,
                         'print_repeating_lines_pipeline.yaml')

In [18]:
!kubectl apply -f print_repeating_lines_pipeline.yaml


pipelinerun.tekton.dev/print-repeating-lines-pipeline created

In [19]:
!tkn pr describe print-repeating-lines-pipeline


Name:           print-repeating-lines-pipeline-run
Namespace:      kubeflow
Pipeline Ref:   print-repeating-lines-pipeline

🌡️  Status

STARTED          DURATION     STATUS
12 seconds ago   11 seconds   Succeeded

📦 Resources

 No resources

⚓ Params

 No params

🗂  Taskruns

 NAME                                                     TASK NAME     STARTED          DURATION    STATUS
 ∙ print-repeating-lines-pipeline-run-print-text-xdzcj    print-text    6 seconds ago    5 seconds   Succeeded
 ∙ print-repeating-lines-pipeline-run-repeat-line-lcvrf   repeat-line   12 seconds ago   6 seconds   Succeeded

Processing big data


In [20]:
@func_to_container_op
def split_text_lines(source_path: InputPath(str), odd_lines_path: OutputPath(str), even_lines_path: OutputPath(str)):
    with open(source_path, 'r') as reader:
        with open(odd_lines_path, 'w') as odd_writer:
            with open(even_lines_path, 'w') as even_writer:
                while True:
                    line = reader.readline()
                    if line == "":
                        break
                    odd_writer.write(line)
                    line = reader.readline()
                    if line == "":
                        break
                    even_writer.write(line)

def text_splitting_pipeline():
    text = '\n'.join(['one', 'two', 'three', 'four', 'five', 'six', 'seven', 'eight', 'nine', 'ten'])
    split_text_task = split_text_lines(text)
    print_text(split_text_task.outputs['odd_lines'])
    print_text(split_text_task.outputs['even_lines'])

TektonCompiler().compile(text_splitting_pipeline,
                         'text_splitting_pipeline.yaml')

In [21]:
!kubectl apply -f text_splitting_pipeline.yaml


pipelinerun.tekton.dev/text-splitting-pipeline created

In [22]:
!tkn pr describe text-splitting-pipeline


Name:           text-splitting-pipeline-run
Namespace:      kubeflow
Pipeline Ref:   text-splitting-pipeline

🌡️  Status

STARTED        DURATION   STATUS
1 minute ago   1 minute   Succeeded

📦 Resources

 No resources

⚓ Params

 No params

🗂  Taskruns

 NAME                                                   TASK NAME          STARTED          DURATION    STATUS
 ∙ text-splitting-pipeline-run-print-text-2-25xs2       print-text-2       16 seconds ago   6 seconds   Succeeded
 ∙ text-splitting-pipeline-run-print-text-kbq4v         print-text         16 seconds ago   6 seconds   Succeeded
 ∙ text-splitting-pipeline-run-split-text-lines-dmtqx   split-text-lines   1 minute ago     1 minute    Succeeded

Processing big data with pre-opened files


In [22]:
@func_to_container_op
def split_text_lines2(source_file: InputTextFile(str), odd_lines_file: OutputTextFile(str), even_lines_file: OutputTextFile(str)):
    while True:
        line = source_file.readline()
        if line == "":
            break
        odd_lines_file.write(line)
        line = source_file.readline()
        if line == "":
            break
        even_lines_file.write(line)

def text_splitting_pipeline2():
    text = '\n'.join(['one', 'two', 'three', 'four', 'five', 'six', 'seven', 'eight', 'nine', 'ten'])
    split_text_task = split_text_lines2(text)
    print_text(split_text_task.outputs['odd_lines']).set_display_name('Odd lines')
    print_text(split_text_task.outputs['even_lines']).set_display_name('Even lines')

TektonCompiler().compile(text_splitting_pipeline2,
                         'text_splitting_pipeline2.yaml')

In [23]:
!kubectl apply -f text_splitting_pipeline2.yaml


pipelinerun.tekton.dev/text-splitting-pipeline2 created

In [25]:
!tkn pr describe text-splitting-pipeline2


Name:           text-splitting-pipeline2-run
Namespace:      kubeflow
Pipeline Ref:   text-splitting-pipeline2

🌡️  Status

STARTED          DURATION     STATUS
48 seconds ago   48 seconds   Succeeded

📦 Resources

 No resources

⚓ Params

 No params

🗂  Taskruns

 NAME                                                     TASK NAME           STARTED          DURATION     STATUS
 ∙ text-splitting-pipeline2-run-print-text-2-5gfrz        print-text-2        6 seconds ago    6 seconds    Succeeded
 ∙ text-splitting-pipeline2-run-print-text-ckdcr          print-text          6 seconds ago    6 seconds    Succeeded
 ∙ text-splitting-pipeline2-run-split-text-lines2-xwlvh   split-text-lines2   48 seconds ago   42 seconds   Succeeded

Example: Pipeline that generates then sums many numbers


In [26]:
# Writing many numbers
@func_to_container_op
def write_numbers(numbers_path: OutputPath(str), start: int = 0, count: int = 10):
    '''Write `count` consecutive numbers, one per line, starting at `start`'''
    with open(numbers_path, 'w') as writer:
        for i in range(start, start + count):
            writer.write(str(i) + '\n')


# Reading and summing many numbers
@func_to_container_op
def sum_numbers(numbers_path: InputPath(str)) -> int:
    '''Sum the newline-separated numbers read from the input file'''
    total = 0  # avoid shadowing the built-in sum()
    with open(numbers_path, 'r') as reader:
        for line in reader:
            total = total + int(line)
    return total



# Pipeline to sum 100000 numbers
def sum_pipeline(count: 'Integer' = 100000):
    numbers_task = write_numbers(count=count)
    print_text(numbers_task.output)

    sum_task = sum_numbers(numbers_task.outputs['numbers'])
    print_text(sum_task.output)

TektonCompiler().compile(sum_pipeline,
                         'sum_pipeline.yaml')

In [27]:
!kubectl apply -f sum_pipeline.yaml


pipelinerun.tekton.dev/sum-pipeline created

In [28]:
!tkn pr describe sum-pipeline


Name:           sum-pipeline-run
Namespace:      kubeflow
Pipeline Ref:   sum-pipeline

🌡️  Status

STARTED          DURATION     STATUS
45 seconds ago   40 seconds   Succeeded

📦 Resources

 No resources

⚓ Params

 NAME      VALUE
 ∙ count   100000

🗂  Taskruns

 NAME                                     TASK NAME       STARTED          DURATION     STATUS
 ∙ sum-pipeline-run-print-text-2-k68zl    print-text-2    10 seconds ago   5 seconds    Succeeded
 ∙ sum-pipeline-run-sum-numbers-9k4s7     sum-numbers     15 seconds ago   5 seconds    Succeeded
 ∙ sum-pipeline-run-print-text-r6blr      print-text      16 seconds ago   6 seconds    Succeeded
 ∙ sum-pipeline-run-write-numbers-9mdfq   write-numbers   45 seconds ago   29 seconds   Succeeded
