In [ ]:
#@title Licensed under the Apache License, Version 2.0 (the "License")
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
A transform for generic parallel processing.

A ParDo transform considers each element in the input PCollection, performs some processing function (your user code) on that element, and emits zero or more elements to an output PCollection. See more information in the Beam Programming Guide.

To run a code cell, you can click the Run cell button at the top left of the cell, or select it and press Shift+Enter. Try modifying a code cell and re-running it to see what happens. To learn more about Colab, see Welcome to Colaboratory!

First, let's install the apache-beam module.
In [ ]:
!pip install --quiet -U apache-beam
In [ ]:
import apache_beam as beam

class SplitWords(beam.DoFn):
  def __init__(self, delimiter=','):
    self.delimiter = delimiter

  def process(self, text):
    for word in text.split(self.delimiter):
      yield word

with beam.Pipeline() as pipeline:
  plants = (
      pipeline
      | 'Gardening plants' >> beam.Create([
          '🍓Strawberry,🥕Carrot,🍆Eggplant',
          '🍅Tomato,🥔Potato',
      ])
      | 'Split words' >> beam.ParDo(SplitWords(','))
      | beam.Map(print)
  )
In this example, we add new parameters to the process method to bind parameter values at runtime.

beam.DoFn.TimestampParam binds the timestamp information as an apache_beam.utils.timestamp.Timestamp object.
beam.DoFn.WindowParam binds the window information as the appropriate apache_beam.transforms.window.*Window object.
In [ ]:
import apache_beam as beam

class AnalyzeElement(beam.DoFn):
  def process(
      self,
      elem,
      timestamp=beam.DoFn.TimestampParam,
      window=beam.DoFn.WindowParam):
    yield '\n'.join([
        '# timestamp',
        'type(timestamp) -> ' + repr(type(timestamp)),
        'timestamp.micros -> ' + repr(timestamp.micros),
        'timestamp.to_rfc3339() -> ' + repr(timestamp.to_rfc3339()),
        'timestamp.to_utc_datetime() -> ' + repr(timestamp.to_utc_datetime()),
        '',
        '# window',
        'type(window) -> ' + repr(type(window)),
        'window.start -> {} ({})'.format(
            window.start, window.start.to_utc_datetime()),
        'window.end -> {} ({})'.format(
            window.end, window.end.to_utc_datetime()),
        'window.max_timestamp() -> {} ({})'.format(
            window.max_timestamp(), window.max_timestamp().to_utc_datetime()),
    ])

with beam.Pipeline() as pipeline:
  dofn_params = (
      pipeline
      | 'Create a single test element' >> beam.Create([':)'])
      | 'Add timestamp (Spring equinox 2020)' >> beam.Map(
          lambda elem: beam.window.TimestampedValue(elem, 1584675660))
      | 'Fixed 30sec windows' >> beam.WindowInto(beam.window.FixedWindows(30))
      | 'Analyze element' >> beam.ParDo(AnalyzeElement())
      | beam.Map(print)
  )
A DoFn can be customized with a number of methods that can help create more complex behaviors. You can customize what a worker does when it starts and shuts down with setup and teardown. You can also customize what to do when a bundle of elements starts and finishes with start_bundle and finish_bundle.

DoFn.setup():
Called once per DoFn instance when the DoFn instance is initialized. The DoFn instance need not be cached, so setup could be called more than once per worker. This is a good place to connect to database instances, open network connections, or initialize other resources.

DoFn.start_bundle():
Called once per bundle of elements before calling process on the first element of the bundle. This is a good place to start keeping track of the bundle elements.

DoFn.process(element, *args, **kwargs) [required]:
Called once per element; can yield zero or more elements. Additional *args or **kwargs can be passed through beam.ParDo().

DoFn.finish_bundle():
Called once per bundle of elements after calling process on the last element of the bundle; can yield zero or more elements. This is a good place to do batch calls on a bundle of elements, such as running a database query. For example, you can initialize a batch in start_bundle, add elements to the batch in process instead of yielding them, then run a batch query on those elements in finish_bundle, and yield all the results. Note that elements yielded from finish_bundle must be of the type apache_beam.utils.windowed_value.WindowedValue. You need to provide a timestamp as a unix timestamp, which you can get from the last processed element. You also need to provide a window, which you can get from the last processed element like in the example below.

DoFn.teardown():
Called once (as a best effort) per DoFn instance when the DoFn instance is shutting down. This is a good place to close database instances, close network connections, or release other resources. Note that teardown is called as a best effort and is not guaranteed. For example, if the worker crashes, teardown might not be called.
In [ ]:
import apache_beam as beam

class DoFnMethods(beam.DoFn):
  def __init__(self):
    print('__init__')
    self.window = beam.window.GlobalWindow()

  def setup(self):
    print('setup')

  def start_bundle(self):
    print('start_bundle')

  def process(self, element, window=beam.DoFn.WindowParam):
    self.window = window
    yield '* process: ' + element

  def finish_bundle(self):
    yield beam.utils.windowed_value.WindowedValue(
        value='* finish_bundle: 🌱🌳🌍',
        timestamp=0,
        windows=[self.window],
    )

  def teardown(self):
    print('teardown')

with beam.Pipeline() as pipeline:
  results = (
      pipeline
      | 'Create inputs' >> beam.Create(['🍓', '🥕', '🍆', '🍅', '🥔'])
      | 'DoFn methods' >> beam.ParDo(DoFnMethods())
      | beam.Map(print)
  )
Known issues:

- [BEAM-7885] DoFn.setup() doesn't run for streaming jobs running in the DirectRunner.
- [BEAM-7340] DoFn.teardown() metrics are lost.