Kubeflow Pipelines DSL Static Type Checking

In this notebook, we will demo:

  • Defining a Kubeflow pipeline with the Python DSL
  • Compiling the pipeline with type checking

Static type checking helps users identify component I/O inconsistencies without running the pipeline, and shortens development cycles by catching errors early. This feature is especially useful in two cases: 1) when the pipeline is large and manually checking the types is infeasible; 2) when some components are shared and their type information is not immediately available to the pipeline authors.

Since this sample focuses on DSL type checking, we use components that are not runnable in the system but that cover various type checking scenarios.

Component definition

Components can be defined either in YAML or as Python functions decorated with dsl.component.

Type definition

Types can be defined as a string or as a dictionary with the openapi_schema_validator property, formatted as:

{
    type_name: {
        openapi_schema_validator: {
        }
    }
}

For example, the following YAML declares a GCSPath type with an openapi_schema_validator for the output field_m. A type can also be a plain string, such as GcsUri. The type name can be either one of the core types or a customized one.

name: component a
description: component a desc
inputs:
  - {name: field_l, type: Integer}
outputs:
  - {name: field_m, type: {GCSPath: {openapi_schema_validator: {type: string, pattern: "^gs://.*$" } }}}
  - {name: field_n, type: customized_type}
  - {name: field_o, type: GcsUri} 
implementation:
  container:
    image: gcr.io/ml-pipeline/component-a
    command: [python3, /pipelines/component/src/train.py]
    args: [
      --field-l, {inputValue: field_l},
    ]
    fileOutputs: 
      field_m: /schema.txt
      field_n: /feature.txt
      field_o: /output.txt
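The same type declaration can also be written as a plain Python dictionary, which is the form the decorator-based components below use. As a minimal illustration (the URI values here are made-up examples, not part of any component), note that the openapi_schema_validator is an ordinary JSON Schema object, so its pattern can be exercised directly with the re module:

```python
import re

# The {type_name: {openapi_schema_validator: {...}}} format as a Python dict,
# mirroring the GCSPath declaration in the YAML above.
gcs_path_type = {
    'GCSPath': {
        'openapi_schema_validator': {'type': 'string', 'pattern': '^gs://.*$'}
    }
}

# The validator is plain JSON Schema; its pattern can be checked with re.
pattern = gcs_path_type['GCSPath']['openapi_schema_validator']['pattern']
print(bool(re.match(pattern, 'gs://my-bucket/data')))   # True: matches gs:// URIs
print(bool(re.match(pattern, 's3://my-bucket/data')))   # False: wrong scheme
```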

If you define the component with the function decorator, a list of core types is available. For example, the following component declares the core type Integer for its input field_l while declaring customized_type for its output field_n.

@component
def task_factory_a(field_l: Integer()) -> {'field_m': {'GCSPath': {'openapi_schema_validator': '{"type": "string", "pattern": "^gs://.*$"}'}}, 
                                           'field_n': 'customized_type',
                                           'field_o': 'Integer'
                                          }:
    return ContainerOp(
        name = 'operator a',
        image = 'gcr.io/ml-pipeline/component-a',
        arguments = [
            '--field-l', field_l,
        ],
        file_outputs = {
            'field_m': '/schema.txt',
            'field_n': '/feature.txt',
            'field_o': '/output.txt'
        }
    )

Type check switch

Type checking is enabled by default. It can be disabled by passing the --disable-type-check argument when dsl-compile is run on the command line, or by calling dsl.compiler.Compiler().compile(type_check=False) in Python.

To ignore the type of a single parameter, call the ignore_type() method on the PipelineParam.

How does type checking work?

The DSL compiler checks type consistency between components by comparing the type_name as well as the openapi_schema_validator. Some special cases are listed here:

  1. Type checking succeeds if the upstream/downstream components lack type information.
  2. Type checking succeeds if type checking is disabled.
  3. Type checking succeeds if the parameter type is ignored.
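These rules can be sketched in plain Python. The function below is a simplified illustration of the matching logic only, not the actual kfp implementation (the real checks live in kfp.dsl.types); it covers exact type-name matching, openapi_schema_validator comparison, and the wildcard behavior of missing type information:

```python
def types_compatible(given, expected):
    """Simplified sketch of DSL type matching (not the real kfp code).

    A type is either None (no type information), a string type name,
    or a one-entry dict {type_name: {'openapi_schema_validator': ...}}.
    """
    # Special case: missing type information on either side matches everything.
    if given is None or expected is None:
        return True
    given_name = given if isinstance(given, str) else next(iter(given))
    expected_name = expected if isinstance(expected, str) else next(iter(expected))
    # Type names must match exactly (e.g. GcrUri != GcsUri).
    if given_name != expected_name:
        return False
    # If both sides carry properties such as openapi_schema_validator,
    # those must match too.
    if isinstance(given, dict) and isinstance(expected, dict):
        return given[given_name] == expected[expected_name]
    return True

# Mismatched type names, as in the failed YAML scenario below.
assert not types_compatible('GcrUri', 'GcsUri')
# Missing type information matches anything.
assert types_compatible(None, 'GcsUri')
# Same type name but different schema patterns, as in the failed
# decorator scenario below (gs:// vs gcs://).
a = {'GCSPath': {'openapi_schema_validator': '{"type": "string", "pattern": "^gs://.*$"}'}}
b = {'GCSPath': {'openapi_schema_validator': '{"type": "string", "pattern": "^gcs://.*$"}'}}
assert not types_compatible(a, b)
assert types_compatible(a, a)
```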

Setup

Install Pipeline SDK


In [2]:
!python3 -m pip install 'kfp>=0.1.31' --quiet



Type Check with YAML components: successful scenario

Author components in YAML


In [1]:
# In YAML, one can optionally add type information to both inputs and outputs.
# There are two ways to define a type: a string or a dictionary with the openapi_schema_validator property.
# The openapi_schema_validator is a JSON Schema object that describes the schema of the parameter value.
component_a = '''\
name: component a
description: component a desc
inputs:
  - {name: field_l, type: Integer}
outputs:
  - {name: field_m, type: {GCSPath: {openapi_schema_validator: {type: string, pattern: "^gs://.*$" } }}}
  - {name: field_n, type: customized_type}
  - {name: field_o, type: GcsUri} 
implementation:
  container:
    image: gcr.io/ml-pipeline/component-a
    command: [python3, /pipelines/component/src/train.py]
    args: [
      --field-l, {inputValue: field_l},
    ]
    fileOutputs: 
      field_m: /schema.txt
      field_n: /feature.txt
      field_o: /output.txt
'''
component_b = '''\
name: component b
description: component b desc
inputs:
  - {name: field_x, type: customized_type}
  - {name: field_y, type: GcsUri}
  - {name: field_z, type: {GCSPath: {openapi_schema_validator: {type: string, pattern: "^gs://.*$" } }}}
outputs:
  - {name: output_model_uri, type: GcsUri}
implementation:
  container:
    image: gcr.io/ml-pipeline/component-a
    command: [python3]
    args: [
      --field-x, {inputValue: field_x},
      --field-y, {inputValue: field_y},
      --field-z, {inputValue: field_z},
    ]
    fileOutputs: 
      output_model_uri: /schema.txt
'''

Author a pipeline with the above components


In [2]:
import kfp.components as comp
import kfp.dsl as dsl
import kfp.compiler as compiler
# The components are loaded as task factories that generate container_ops.
task_factory_a = comp.load_component_from_text(text=component_a)
task_factory_b = comp.load_component_from_text(text=component_b)

#Use the component as part of the pipeline
@dsl.pipeline(name='type_check_a',
    description='')
def pipeline_a():
    a = task_factory_a(field_l=12)
    b = task_factory_b(field_x=a.outputs['field_n'], field_y=a.outputs['field_o'], field_z=a.outputs['field_m'])

compiler.Compiler().compile(pipeline_a, 'pipeline_a.zip', type_check=True)

Type Check with YAML components: failed scenario

Author components in YAML


In [3]:
# In this case, component_a declares its output field_o as GcrUri,
# but component_b requires its input field_y to be GcsUri.
component_a = '''\
name: component a
description: component a desc
inputs:
  - {name: field_l, type: Integer}
outputs:
  - {name: field_m, type: {GCSPath: {openapi_schema_validator: {type: string, pattern: "^gs://.*$" } }}}
  - {name: field_n, type: customized_type}
  - {name: field_o, type: GcrUri} 
implementation:
  container:
    image: gcr.io/ml-pipeline/component-a
    command: [python3, /pipelines/component/src/train.py]
    args: [
      --field-l, {inputValue: field_l},
    ]
    fileOutputs: 
      field_m: /schema.txt
      field_n: /feature.txt
      field_o: /output.txt
'''
component_b = '''\
name: component b
description: component b desc
inputs:
  - {name: field_x, type: customized_type}
  - {name: field_y, type: GcsUri}
  - {name: field_z, type: {GCSPath: {openapi_schema_validator: {type: string, pattern: "^gs://.*$" } }}}
outputs:
  - {name: output_model_uri, type: GcsUri}
implementation:
  container:
    image: gcr.io/ml-pipeline/component-a
    command: [python3]
    args: [
      --field-x, {inputValue: field_x},
      --field-y, {inputValue: field_y},
      --field-z, {inputValue: field_z},
    ]
    fileOutputs: 
      output_model_uri: /schema.txt
'''

Author a pipeline with the above components


In [4]:
import kfp.components as comp
import kfp.dsl as dsl
import kfp.compiler as compiler
from kfp.dsl.types import InconsistentTypeException
task_factory_a = comp.load_component_from_text(text=component_a)
task_factory_b = comp.load_component_from_text(text=component_b)

#Use the component as part of the pipeline
@dsl.pipeline(name='type_check_b',
    description='')
def pipeline_b():
    a = task_factory_a(field_l=12)
    b = task_factory_b(field_x=a.outputs['field_n'], field_y=a.outputs['field_o'], field_z=a.outputs['field_m'])

try:
    compiler.Compiler().compile(pipeline_b, 'pipeline_b.zip', type_check=True)
except InconsistentTypeException as e:
    print(e)


type name GcrUri is different from expected: GcsUri
Component "component b" is expecting field_y to be type(GcsUri), but the passed argument is type(GcrUri)

Author a pipeline with the above components, but with type checking disabled.


In [5]:
# Disable the type_check
compiler.Compiler().compile(pipeline_b, 'pipeline_b.zip', type_check=False)

Type Check with decorated components: successful scenario

Author components with decorator


In [6]:
from kfp.dsl import component
from kfp.dsl.types import Integer, GCSPath
from kfp.dsl import ContainerOp
# When components are defined with the component decorator,
# the type information is annotated on the inputs and the function returns.
# There are two ways to define a type: a string or a dictionary with the openapi_schema_validator property.
@component
def task_factory_a(field_l: Integer()) -> {'field_m': {'GCSPath': {'openapi_schema_validator': '{"type": "string", "pattern": "^gs://.*$"}'}}, 
                                           'field_n': 'customized_type',
                                           'field_o': 'Integer'
                                          }:
    return ContainerOp(
        name = 'operator a',
        image = 'gcr.io/ml-pipeline/component-a',
        arguments = [
            '--field-l', field_l,
        ],
        file_outputs = {
            'field_m': '/schema.txt',
            'field_n': '/feature.txt',
            'field_o': '/output.txt'
        }
    )

# Users can also use the core types that are pre-defined in the SDK.
# For a full list of core types, check out: https://github.com/kubeflow/pipelines/blob/master/sdk/python/kfp/dsl/types.py
@component
def task_factory_b(field_x: 'customized_type',
        field_y: Integer(),
        field_z: {'GCSPath': {'openapi_schema_validator': '{"type": "string", "pattern": "^gs://.*$"}'}}) -> {'output_model_uri': 'GcsUri'}:
    return ContainerOp(
        name = 'operator b',
        image = 'gcr.io/ml-pipeline/component-a',
        command = [
            'python3',
            field_x,
        ],
        arguments = [
            '--field-y', field_y,
            '--field-z', field_z,
        ],
        file_outputs = {
            'output_model_uri': '/schema.txt',
        }
    )

Author a pipeline with the above components


In [7]:
#Use the component as part of the pipeline
@dsl.pipeline(name='type_check_c',
    description='')
def pipeline_c():
    a = task_factory_a(field_l=12)
    b = task_factory_b(field_x=a.outputs['field_n'], field_y=a.outputs['field_o'], field_z=a.outputs['field_m'])

compiler.Compiler().compile(pipeline_c, 'pipeline_c.zip', type_check=True)

Type Check with decorated components: failed scenario

Author components with decorator


In [9]:
from kfp.dsl import component
from kfp.dsl.types import Integer, GCSPath
from kfp.dsl import ContainerOp
# task_factory_a declares an output field_m whose openapi_schema_validator
# differs from that of task_factory_b's input field_z:
# one pattern is gs:// and the other is gcs://
@component
def task_factory_a(field_l: Integer()) -> {'field_m': {'GCSPath': {'openapi_schema_validator': '{"type": "string", "pattern": "^gs://.*$"}'}}, 
                                           'field_n': 'customized_type',
                                           'field_o': 'Integer'
                                          }:
    return ContainerOp(
        name = 'operator a',
        image = 'gcr.io/ml-pipeline/component-a',
        arguments = [
            '--field-l', field_l,
        ],
        file_outputs = {
            'field_m': '/schema.txt',
            'field_n': '/feature.txt',
            'field_o': '/output.txt'
        }
    )

@component
def task_factory_b(field_x: 'customized_type',
        field_y: Integer(),
        field_z: {'GCSPath': {'openapi_schema_validator': '{"type": "string", "pattern": "^gcs://.*$"}'}}) -> {'output_model_uri': 'GcsUri'}:
    return ContainerOp(
        name = 'operator b',
        image = 'gcr.io/ml-pipeline/component-a',
        command = [
            'python3',
            field_x,
        ],
        arguments = [
            '--field-y', field_y,
            '--field-z', field_z,
        ],
        file_outputs = {
            'output_model_uri': '/schema.txt',
        }
    )

Author a pipeline with the above components


In [10]:
#Use the component as part of the pipeline
@dsl.pipeline(name='type_check_d',
    description='')
def pipeline_d():
    a = task_factory_a(field_l=12)
    b = task_factory_b(field_x=a.outputs['field_n'], field_y=a.outputs['field_o'], field_z=a.outputs['field_m'])

try:
    compiler.Compiler().compile(pipeline_d, 'pipeline_d.zip', type_check=True)
except InconsistentTypeException as e:
    print(e)


GCSPath has a property openapi_schema_validator with value: {"type": "string", "pattern": "^gs://.*$"} and {"type": "string", "pattern": "^gcs://.*$"}
Component "task_factory_b" is expecting field_z to be type({'GCSPath': {'openapi_schema_validator': '{"type": "string", "pattern": "^gcs://.*$"}'}}), but the passed argument is type({'GCSPath': {'openapi_schema_validator': '{"type": "string", "pattern": "^gs://.*$"}'}})

Author a pipeline with the above components but ignoring types.


In [11]:
#Use the component as part of the pipeline
@dsl.pipeline(name='type_check_d',
    description='')
def pipeline_d():
    a = task_factory_a(field_l=12)
    # For any argument, authors can also ignore its type by calling the ignore_type() method.
    b = task_factory_b(field_x=a.outputs['field_n'], field_y=a.outputs['field_o'], field_z=a.outputs['field_m'].ignore_type())
compiler.Compiler().compile(pipeline_d, 'pipeline_d.zip', type_check=True)

Type Check with missing type information

Author components (with missing types)


In [12]:
from kfp.dsl import component
from kfp.dsl.types import Integer, GCSPath
from kfp.dsl import ContainerOp
# task_factory_a lacks the type information for output field_n
# task_factory_b lacks the type information for input field_y
# When no type information is provided, the parameter matches any type.
@component
def task_factory_a(field_l: Integer()) -> {'field_m': {'GCSPath': {'openapi_schema_validator': '{"type": "string", "pattern": "^gs://.*$"}'}}, 
                                           'field_o': 'Integer'
                                          }:
    return ContainerOp(
        name = 'operator a',
        image = 'gcr.io/ml-pipeline/component-a',
        arguments = [
            '--field-l', field_l,
        ],
        file_outputs = {
            'field_m': '/schema.txt',
            'field_n': '/feature.txt',
            'field_o': '/output.txt'
        }
    )

@component
def task_factory_b(field_x: 'customized_type',
        field_y,
        field_z: {'GCSPath': {'openapi_schema_validator': '{"type": "string", "pattern": "^gs://.*$"}'}}) -> {'output_model_uri': 'GcsUri'}:
    return ContainerOp(
        name = 'operator b',
        image = 'gcr.io/ml-pipeline/component-a',
        command = [
            'python3',
            field_x,
        ],
        arguments = [
            '--field-y', field_y,
            '--field-z', field_z,
        ],
        file_outputs = {
            'output_model_uri': '/schema.txt',
        }
    )

Author a pipeline with the above components


In [13]:
#Use the component as part of the pipeline
@dsl.pipeline(name='type_check_e',
    description='')
def pipeline_e():
    a = task_factory_a(field_l=12)
    b = task_factory_b(field_x=a.outputs['field_n'], field_y=a.outputs['field_o'], field_z=a.outputs['field_m'])

compiler.Compiler().compile(pipeline_e, 'pipeline_e.zip', type_check=True)

Type Check with both named arguments and positional arguments


In [14]:
#Use the component as part of the pipeline
@dsl.pipeline(name='type_check_f',
    description='')
def pipeline_f():
    a = task_factory_a(field_l=12)
    b = task_factory_b(a.outputs['field_n'], a.outputs['field_o'], field_z=a.outputs['field_m'])

compiler.Compiler().compile(pipeline_f, 'pipeline_f.zip', type_check=True)

Type Check between pipeline parameters and component parameters


In [15]:
@component
def task_factory_a(field_m: {'GCSPath': {'openapi_schema_validator': '{"type": "string", "pattern": "^gs://.*$"}'}}, field_o: 'Integer'):
    return ContainerOp(
        name = 'operator a',
        image = 'gcr.io/ml-pipeline/component-b',
        arguments = [
            '--field-l', field_m,
            '--field-o', field_o,
        ],
    )

# Pipeline parameter types are also checked against the component I/O types.
@dsl.pipeline(name='type_check_g',
    description='')
def pipeline_g(a: {'GCSPath': {'openapi_schema_validator': '{"type": "string", "pattern": "^gs://.*$"}'}}='gs://kfp-path', b: Integer()=12):
    task_factory_a(field_m=a, field_o=b)

try:
    compiler.Compiler().compile(pipeline_g, 'pipeline_g.zip', type_check=True)
except InconsistentTypeException as e:
    print(e)


Integer has a property openapi_schema_validator that the latter does not.
Component "task_factory_a" is expecting field_o to be type(Integer), but the passed argument is type({'Integer': {'openapi_schema_validator': {'type': 'integer'}}})

Clean up


In [16]:
from pathlib import Path
for p in Path(".").glob("pipeline_[a-g].zip"):
    p.unlink()