In this notebook, we demonstrate how DSL static type checking works.
Static type checking helps users identify component I/O inconsistencies without running the pipeline, and it shortens development cycles by catching errors early. This feature is especially useful in two cases: 1) when the pipeline is large and manually checking the types is infeasible; 2) when some components are shared and their type information is not immediately available to the pipeline authors.
Since this sample focuses on DSL type checking, we use components that are not runnable in the system but that cover various type checking scenarios.
Components can be defined in either YAML or functions decorated by dsl.component.
Types can be defined as a string or as a dictionary with the openapi_schema_validator property, formatted as:
{
    type_name: {
        openapi_schema_validator: {
        }
    }
}
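Concretely, the same structure can be written as a Python dictionary (this is the form used with the component decorator later in this notebook); a minimal illustration, where the GCSPath name and gs:// pattern mirror the examples that follow:

```python
# A type expressed as a dictionary: the key is the type name, and the
# openapi_schema_validator property holds a JSON schema object that
# constrains the parameter value.
gcs_path_type = {
    'GCSPath': {
        'openapi_schema_validator': {
            'type': 'string',
            'pattern': '^gs://.*$',  # the value must be a gs:// URI
        }
    }
}
print(gcs_path_type['GCSPath']['openapi_schema_validator']['pattern'])
```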
For example, the following YAML declares a GCSPath type with an openapi_schema_validator for output field_m. A type can also be a plain string, such as GcsUri for output field_o or customized_type for output field_n. The type name can be either one of the core types or a customized one.
name: component a
description: component a desc
inputs:
- {name: field_l, type: Integer}
outputs:
- {name: field_m, type: {GCSPath: {openapi_schema_validator: {type: string, pattern: "^gs://.*$" } }}}
- {name: field_n, type: customized_type}
- {name: field_o, type: GcsUri}
implementation:
  container:
    image: gcr.io/ml-pipeline/component-a
    command: [python3, /pipelines/component/src/train.py]
    args: [
      --field-l, {inputValue: field_l},
    ]
    fileOutputs:
      field_m: /schema.txt
      field_n: /feature.txt
      field_o: /output.txt
If you define the component using the function decorator, a list of core types is available in the SDK. For example, the following component declares the core type Integer for its input field_l while declaring customized_type for its output field_n.
@component
def task_factory_a(field_l: Integer()) -> {'field_m': {'GCSPath': {'openapi_schema_validator': '{"type": "string", "pattern": "^gs://.*$"}'}},
                                           'field_n': 'customized_type',
                                           'field_o': 'Integer'}:
    return ContainerOp(
        name = 'operator a',
        image = 'gcr.io/ml-pipeline/component-a',
        arguments = [
            '--field-l', field_l,
        ],
        file_outputs = {
            'field_m': '/schema.txt',
            'field_n': '/feature.txt',
            'field_o': '/output.txt'
        }
    )
Type checking is enabled by default. It can be disabled by passing the --disable-type-check argument when dsl-compile is run from the command line, or by calling dsl.compiler.Compiler().compile(type_check=False) in Python.
To ignore the type for a single parameter, call the ignore_type() function on the PipelineParam.
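As a rough mental model (a hypothetical sketch, not the KFP implementation), ignore_type() can be thought of as clearing the parameter's type so the consistency check passes:

```python
# Hypothetical mock of a pipeline parameter (the real class is
# kfp.dsl.PipelineParam; this only illustrates the idea).
class MockParam:
    def __init__(self, name, param_type):
        self.name = name
        self.param_type = param_type

    def ignore_type(self):
        # Dropping the type information makes the parameter match any input type.
        self.param_type = None
        return self  # returned so the call can be chained inline

p = MockParam('field_m', 'GCSPath')
print(p.ignore_type().param_type)  # None
```

Because the parameter is returned, the call can be made inline when passing one component's output to the next, as shown later in this notebook.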
The DSL compiler checks type consistency among components by comparing the type_name as well as the openapi_schema_validator. Some special cases are demonstrated below:
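As a rough illustration (a simplified sketch, not KFP's actual implementation), the consistency rule can be thought of as: a parameter with no type information matches anything; otherwise the type name and, when present, the openapi_schema_validator must match exactly:

```python
# Simplified sketch of the matching rule (hypothetical helper, not part of the
# KFP SDK). Types are given either as plain strings or as
# {type_name: {'openapi_schema_validator': {...}}} dictionaries.
def types_consistent(output_type, input_type):
    # A parameter with no type information matches any type.
    if output_type is None or input_type is None:
        return True
    # Otherwise the type name and the schema (if any) must be identical.
    return output_type == input_type

gs = {'GCSPath': {'openapi_schema_validator': {'type': 'string', 'pattern': '^gs://.*$'}}}
gcs = {'GCSPath': {'openapi_schema_validator': {'type': 'string', 'pattern': '^gcs://.*$'}}}
print(types_consistent('GcsUri', 'GcsUri'))  # True: same type name
print(types_consistent('GcsUri', 'GcrUri'))  # False: names differ
print(types_consistent(gs, gcs))             # False: schema patterns differ
print(types_consistent(None, 'GcsUri'))      # True: missing type matches all
```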
In [2]:
!python3 -m pip install 'kfp>=0.1.31' --quiet
In [1]:
# In yaml, one can optionally add the type information to both inputs and outputs.
# There are two ways to define the types: string or a dictionary with the openapi_schema_validator property.
# The openapi_schema_validator is a JSON schema object that describes the schema of the parameter value.
component_a = '''\
name: component a
description: component a desc
inputs:
- {name: field_l, type: Integer}
outputs:
- {name: field_m, type: {GCSPath: {openapi_schema_validator: {type: string, pattern: "^gs://.*$" } }}}
- {name: field_n, type: customized_type}
- {name: field_o, type: GcsUri}
implementation:
  container:
    image: gcr.io/ml-pipeline/component-a
    command: [python3, /pipelines/component/src/train.py]
    args: [
      --field-l, {inputValue: field_l},
    ]
    fileOutputs:
      field_m: /schema.txt
      field_n: /feature.txt
      field_o: /output.txt
'''

component_b = '''\
name: component b
description: component b desc
inputs:
- {name: field_x, type: customized_type}
- {name: field_y, type: GcsUri}
- {name: field_z, type: {GCSPath: {openapi_schema_validator: {type: string, pattern: "^gs://.*$" } }}}
outputs:
- {name: output_model_uri, type: GcsUri}
implementation:
  container:
    image: gcr.io/ml-pipeline/component-a
    command: [python3]
    args: [
      --field-x, {inputValue: field_x},
      --field-y, {inputValue: field_y},
      --field-z, {inputValue: field_z},
    ]
    fileOutputs:
      output_model_uri: /schema.txt
'''
In [2]:
import kfp.components as comp
import kfp.dsl as dsl
import kfp.compiler as compiler
# The components are loaded as task factories that generate container_ops.
task_factory_a = comp.load_component_from_text(text=component_a)
task_factory_b = comp.load_component_from_text(text=component_b)
#Use the component as part of the pipeline
@dsl.pipeline(name='type_check_a',
              description='')
def pipeline_a():
    a = task_factory_a(field_l=12)
    b = task_factory_b(field_x=a.outputs['field_n'], field_y=a.outputs['field_o'], field_z=a.outputs['field_m'])

compiler.Compiler().compile(pipeline_a, 'pipeline_a.zip', type_check=True)
In [3]:
# In this case, component_a declares an output field_o of type GcrUri,
# but component_b requires an input field_y of type GcsUri.
component_a = '''\
name: component a
description: component a desc
inputs:
- {name: field_l, type: Integer}
outputs:
- {name: field_m, type: {GCSPath: {openapi_schema_validator: {type: string, pattern: "^gs://.*$" } }}}
- {name: field_n, type: customized_type}
- {name: field_o, type: GcrUri}
implementation:
  container:
    image: gcr.io/ml-pipeline/component-a
    command: [python3, /pipelines/component/src/train.py]
    args: [
      --field-l, {inputValue: field_l},
    ]
    fileOutputs:
      field_m: /schema.txt
      field_n: /feature.txt
      field_o: /output.txt
'''

component_b = '''\
name: component b
description: component b desc
inputs:
- {name: field_x, type: customized_type}
- {name: field_y, type: GcsUri}
- {name: field_z, type: {GCSPath: {openapi_schema_validator: {type: string, pattern: "^gs://.*$" } }}}
outputs:
- {name: output_model_uri, type: GcsUri}
implementation:
  container:
    image: gcr.io/ml-pipeline/component-a
    command: [python3]
    args: [
      --field-x, {inputValue: field_x},
      --field-y, {inputValue: field_y},
      --field-z, {inputValue: field_z},
    ]
    fileOutputs:
      output_model_uri: /schema.txt
'''
In [4]:
import kfp.components as comp
import kfp.dsl as dsl
import kfp.compiler as compiler
from kfp.dsl.types import InconsistentTypeException
task_factory_a = comp.load_component_from_text(text=component_a)
task_factory_b = comp.load_component_from_text(text=component_b)
#Use the component as part of the pipeline
@dsl.pipeline(name='type_check_b',
              description='')
def pipeline_b():
    a = task_factory_a(field_l=12)
    b = task_factory_b(field_x=a.outputs['field_n'], field_y=a.outputs['field_o'], field_z=a.outputs['field_m'])

try:
    compiler.Compiler().compile(pipeline_b, 'pipeline_b.zip', type_check=True)
except InconsistentTypeException as e:
    print(e)
In [5]:
# Disable the type_check
compiler.Compiler().compile(pipeline_b, 'pipeline_b.zip', type_check=False)
In [6]:
from kfp.dsl import component
from kfp.dsl.types import Integer, GCSPath
from kfp.dsl import ContainerOp
# When components are defined with the component decorator,
# the type information is annotated on the inputs and the function's return values.
# There are two ways to define a type: a string or a dictionary with the openapi_schema_validator property.
@component
def task_factory_a(field_l: Integer()) -> {'field_m': {'GCSPath': {'openapi_schema_validator': '{"type": "string", "pattern": "^gs://.*$"}'}},
                                           'field_n': 'customized_type',
                                           'field_o': 'Integer'}:
    return ContainerOp(
        name = 'operator a',
        image = 'gcr.io/ml-pipeline/component-a',
        arguments = [
            '--field-l', field_l,
        ],
        file_outputs = {
            'field_m': '/schema.txt',
            'field_n': '/feature.txt',
            'field_o': '/output.txt'
        }
    )
# Users can also use the core types that are pre-defined in the SDK.
# For a full list of core types, check out: https://github.com/kubeflow/pipelines/blob/master/sdk/python/kfp/dsl/types.py
@component
def task_factory_b(field_x: 'customized_type',
                   field_y: Integer(),
                   field_z: {'GCSPath': {'openapi_schema_validator': '{"type": "string", "pattern": "^gs://.*$"}'}}) -> {'output_model_uri': 'GcsUri'}:
    return ContainerOp(
        name = 'operator b',
        image = 'gcr.io/ml-pipeline/component-a',
        command = [
            'python3',
            field_x,
        ],
        arguments = [
            '--field-y', field_y,
            '--field-z', field_z,
        ],
        file_outputs = {
            'output_model_uri': '/schema.txt',
        }
    )
In [7]:
#Use the component as part of the pipeline
@dsl.pipeline(name='type_check_c',
              description='')
def pipeline_c():
    a = task_factory_a(field_l=12)
    b = task_factory_b(field_x=a.outputs['field_n'], field_y=a.outputs['field_o'], field_z=a.outputs['field_m'])

compiler.Compiler().compile(pipeline_c, 'pipeline_c.zip', type_check=True)
In [9]:
from kfp.dsl import component
from kfp.dsl.types import Integer, GCSPath
from kfp.dsl import ContainerOp
# task_factory_a declares an output field_m with an openapi_schema_validator
# different from task_factory_b's input field_z:
# one pattern is gs:// and the other is gcs://.
@component
def task_factory_a(field_l: Integer()) -> {'field_m': {'GCSPath': {'openapi_schema_validator': '{"type": "string", "pattern": "^gs://.*$"}'}},
                                           'field_n': 'customized_type',
                                           'field_o': 'Integer'}:
    return ContainerOp(
        name = 'operator a',
        image = 'gcr.io/ml-pipeline/component-a',
        arguments = [
            '--field-l', field_l,
        ],
        file_outputs = {
            'field_m': '/schema.txt',
            'field_n': '/feature.txt',
            'field_o': '/output.txt'
        }
    )
@component
def task_factory_b(field_x: 'customized_type',
                   field_y: Integer(),
                   field_z: {'GCSPath': {'openapi_schema_validator': '{"type": "string", "pattern": "^gcs://.*$"}'}}) -> {'output_model_uri': 'GcsUri'}:
    return ContainerOp(
        name = 'operator b',
        image = 'gcr.io/ml-pipeline/component-a',
        command = [
            'python3',
            field_x,
        ],
        arguments = [
            '--field-y', field_y,
            '--field-z', field_z,
        ],
        file_outputs = {
            'output_model_uri': '/schema.txt',
        }
    )
In [10]:
#Use the component as part of the pipeline
@dsl.pipeline(name='type_check_d',
              description='')
def pipeline_d():
    a = task_factory_a(field_l=12)
    b = task_factory_b(field_x=a.outputs['field_n'], field_y=a.outputs['field_o'], field_z=a.outputs['field_m'])

try:
    compiler.Compiler().compile(pipeline_d, 'pipeline_d.zip', type_check=True)
except InconsistentTypeException as e:
    print(e)
In [11]:
#Use the component as part of the pipeline
@dsl.pipeline(name='type_check_d',
              description='')
def pipeline_d():
    a = task_factory_a(field_l=12)
    # For any argument, the author can skip the type check by calling the ignore_type() function.
    b = task_factory_b(field_x=a.outputs['field_n'], field_y=a.outputs['field_o'], field_z=a.outputs['field_m'].ignore_type())

compiler.Compiler().compile(pipeline_d, 'pipeline_d.zip', type_check=True)
In [12]:
from kfp.dsl import component
from kfp.dsl.types import Integer, GCSPath
from kfp.dsl import ContainerOp
# task_factory_a lacks the type information for output field_n;
# task_factory_b lacks the type information for input field_y.
# When no type information is provided, the parameter matches all types.
@component
def task_factory_a(field_l: Integer()) -> {'field_m': {'GCSPath': {'openapi_schema_validator': '{"type": "string", "pattern": "^gs://.*$"}'}},
                                           'field_o': 'Integer'}:
    return ContainerOp(
        name = 'operator a',
        image = 'gcr.io/ml-pipeline/component-a',
        arguments = [
            '--field-l', field_l,
        ],
        file_outputs = {
            'field_m': '/schema.txt',
            'field_n': '/feature.txt',
            'field_o': '/output.txt'
        }
    )
@component
def task_factory_b(field_x: 'customized_type',
                   field_y,
                   field_z: {'GCSPath': {'openapi_schema_validator': '{"type": "string", "pattern": "^gs://.*$"}'}}) -> {'output_model_uri': 'GcsUri'}:
    return ContainerOp(
        name = 'operator b',
        image = 'gcr.io/ml-pipeline/component-a',
        command = [
            'python3',
            field_x,
        ],
        arguments = [
            '--field-y', field_y,
            '--field-z', field_z,
        ],
        file_outputs = {
            'output_model_uri': '/schema.txt',
        }
    )
In [13]:
#Use the component as part of the pipeline
@dsl.pipeline(name='type_check_e',
              description='')
def pipeline_e():
    a = task_factory_a(field_l=12)
    b = task_factory_b(field_x=a.outputs['field_n'], field_y=a.outputs['field_o'], field_z=a.outputs['field_m'])

compiler.Compiler().compile(pipeline_e, 'pipeline_e.zip', type_check=True)
In [14]:
#Use the component as part of the pipeline
# The same arguments can also be passed positionally.
@dsl.pipeline(name='type_check_f',
              description='')
def pipeline_f():
    a = task_factory_a(field_l=12)
    b = task_factory_b(a.outputs['field_n'], a.outputs['field_o'], field_z=a.outputs['field_m'])

compiler.Compiler().compile(pipeline_f, 'pipeline_f.zip', type_check=True)
In [15]:
@component
def task_factory_a(field_m: {'GCSPath': {'openapi_schema_validator': '{"type": "string", "pattern": "^gs://.*$"}'}}, field_o: 'Integer'):
    return ContainerOp(
        name = 'operator a',
        image = 'gcr.io/ml-pipeline/component-b',
        arguments = [
            '--field-l', field_m,
            '--field-o', field_o,
        ],
    )

# Pipeline input types are also checked against the component I/O types.
@dsl.pipeline(name='type_check_g',
              description='')
def pipeline_g(a: {'GCSPath': {'openapi_schema_validator': '{"type": "string", "pattern": "^gs://.*$"}'}}='gs://kfp-path', b: Integer()=12):
    task_factory_a(field_m=a, field_o=b)

try:
    compiler.Compiler().compile(pipeline_g, 'pipeline_g.zip', type_check=True)
except InconsistentTypeException as e:
    print(e)
In [16]:
from pathlib import Path
# Clean up the compiled pipeline packages.
for p in Path(".").glob("pipeline_[a-g].zip"):
    p.unlink()