In [ ]:
    
#@title Licensed under the Apache License, Version 2.0 (the "License")
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
    
A transform for generic parallel processing.
A ParDo transform considers each element in the input PCollection,
performs some processing function (your user code) on that element,
and emits zero or more elements to an output PCollection.
For more information, see the Beam Programming Guide.
To run a code cell, you can click the Run cell button at the top left of the cell,
or select it and press Shift+Enter.
Try modifying a code cell and re-running it to see what happens.
To learn more about Colab, see Welcome to Colaboratory!.
First, let's install the apache-beam module.
In [ ]:
    
!pip install --quiet -U apache-beam
    
In [ ]:
    
import apache_beam as beam

class SplitWords(beam.DoFn):
  def __init__(self, delimiter=','):
    self.delimiter = delimiter

  def process(self, text):
    # Called once per element; emits one output element per split word.
    for word in text.split(self.delimiter):
      yield word

with beam.Pipeline() as pipeline:
  plants = (
      pipeline
      | 'Gardening plants' >> beam.Create([
          '🍓Strawberry,🥕Carrot,🍆Eggplant',
          '🍅Tomato,🥔Potato',
      ])
      | 'Split words' >> beam.ParDo(SplitWords(','))
      | beam.Map(print)
  )
    
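For simple element-wise logic like this, beam.FlatMap offers a lighter-weight alternative: it is a built-in shorthand for a ParDo whose callable yields zero or more outputs per input, so a lambda can replace the DoFn class. A quick sketch:
In [ ]:
    
import apache_beam as beam

with beam.Pipeline() as pipeline:
  plants = (
      pipeline
      | 'Gardening plants' >> beam.Create([
          '🍓Strawberry,🥕Carrot,🍆Eggplant',
          '🍅Tomato,🥔Potato',
      ])
      | 'Split words' >> beam.FlatMap(lambda text: text.split(','))
      | beam.Map(print)
  )
    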
In this example, we add new parameters to the process method to bind parameter values at runtime.
beam.DoFn.TimestampParam binds the timestamp information as an apache_beam.utils.timestamp.Timestamp object.
beam.DoFn.WindowParam binds the window information as the appropriate apache_beam.transforms.window.*Window object.
In [ ]:
    
import apache_beam as beam

class AnalyzeElement(beam.DoFn):
  def process(
      self,
      elem,
      timestamp=beam.DoFn.TimestampParam,
      window=beam.DoFn.WindowParam):
    yield '\n'.join([
        '# timestamp',
        'type(timestamp) -> ' + repr(type(timestamp)),
        'timestamp.micros -> ' + repr(timestamp.micros),
        'timestamp.to_rfc3339() -> ' + repr(timestamp.to_rfc3339()),
        'timestamp.to_utc_datetime() -> ' + repr(timestamp.to_utc_datetime()),
        '',
        '# window',
        'type(window) -> ' + repr(type(window)),
        'window.start -> {} ({})'.format(window.start, window.start.to_utc_datetime()),
        'window.end -> {} ({})'.format(window.end, window.end.to_utc_datetime()),
        'window.max_timestamp() -> {} ({})'.format(window.max_timestamp(), window.max_timestamp().to_utc_datetime()),
    ])

with beam.Pipeline() as pipeline:
  dofn_params = (
      pipeline
      | 'Create a single test element' >> beam.Create([':)'])
      | 'Add timestamp (Spring equinox 2020)' >> beam.Map(
          lambda elem: beam.window.TimestampedValue(elem, 1584675660))
      | 'Fixed 30sec windows' >> beam.WindowInto(beam.window.FixedWindows(30))
      | 'Analyze element' >> beam.ParDo(AnalyzeElement())
      | beam.Map(print)
  )
    
A DoFn can be customized with a number of methods that can help create more complex behaviors.
You can customize what a worker does when it starts and shuts down with setup and teardown.
You can also customize what to do when a bundle of elements starts and finishes with start_bundle and finish_bundle.
DoFn.setup():
Called once per DoFn instance when the DoFn instance is initialized.
setup might be called more than once per worker, since DoFn instances are not guaranteed to be cached.
This is a good place to connect to databases, open network connections, or acquire other resources
(see the sketch after the example below).
DoFn.start_bundle():
Called once per bundle of elements before calling process on the first element of the bundle.
This is a good place to start keeping track of the bundle elements.
DoFn.process(element, *args, **kwargs):
Called once per element; can yield zero or more elements.
Additional *args or **kwargs can be passed through
beam.ParDo().
[required]
DoFn.finish_bundle():
Called once per bundle of elements, after process has been called on the last element of the bundle;
can yield zero or more elements. This is a good place to do batch calls over a bundle of elements,
such as running a database query.
For example, you can initialize a batch in start_bundle,
add elements to the batch in process instead of yielding them,
then run a batch query on those elements in finish_bundle and yield all the results
(see the sketch after this list).
Note that elements yielded from finish_bundle must be of the type
apache_beam.utils.windowed_value.WindowedValue.
You need to provide a timestamp as a unix timestamp, which you can get from the last processed element.
You also need to provide a window, which you can get from the last processed element like in the example below.
DoFn.teardown():
Called once (as a best effort) per DoFn instance when the DoFn instance is shutting down.
This is a good place to close database connections, network connections, or release other resources.
Note that teardown is called as a best effort and is not guaranteed.
For example, if the worker crashes, teardown might not be called.
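As a minimal sketch of that batching pattern (the BatchedQuery class and its fake query result are illustrative, not part of the Beam API):
In [ ]:
    
import apache_beam as beam
from apache_beam.utils.windowed_value import WindowedValue

class BatchedQuery(beam.DoFn):
  def start_bundle(self):
    # Start a fresh batch for each bundle.
    self.batch = []
    self.last_timestamp = None
    self.last_window = None

  def process(self, element, timestamp=beam.DoFn.TimestampParam, window=beam.DoFn.WindowParam):
    # Accumulate instead of yielding, remembering the last element's
    # timestamp and window for finish_bundle.
    self.batch.append(element)
    self.last_timestamp = timestamp
    self.last_window = window

  def finish_bundle(self):
    if not self.batch:
      return
    # A real DoFn might run one database query over the whole batch here.
    result = 'queried: ' + ','.join(self.batch)
    # Elements yielded from finish_bundle must be WindowedValues.
    yield WindowedValue(result, self.last_timestamp, [self.last_window])

with beam.Pipeline() as pipeline:
  batched = (
      pipeline
      | beam.Create(['🍓', '🥕', '🍆'])
      | beam.ParDo(BatchedQuery())
      | beam.Map(print)
  )
    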
In [ ]:
    
import apache_beam as beam

class DoFnMethods(beam.DoFn):
  def __init__(self):
    print('__init__')
    self.window = beam.window.GlobalWindow()

  def setup(self):
    print('setup')

  def start_bundle(self):
    print('start_bundle')

  def process(self, element, window=beam.DoFn.WindowParam):
    # Remember the last element's window for finish_bundle.
    self.window = window
    yield '* process: ' + element

  def finish_bundle(self):
    # Elements yielded here must be wrapped as WindowedValues.
    yield beam.utils.windowed_value.WindowedValue(
        value='* finish_bundle: 🌱🌳🌍',
        timestamp=0,
        windows=[self.window],
    )

  def teardown(self):
    print('teardown')

with beam.Pipeline() as pipeline:
  results = (
      pipeline
      | 'Create inputs' >> beam.Create(['🍓', '🥕', '🍆', '🍅', '🥔'])
      | 'DoFn methods' >> beam.ParDo(DoFnMethods())
      | beam.Map(print)
  )
    
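The example above only prints a marker from each method. In practice, setup and teardown are where you acquire and release per-worker resources such as database connections. A minimal sketch of that pattern, using a FakeClient stand-in (the client class and its connect, lookup, and close methods are illustrative, not a real API):
In [ ]:
    
import apache_beam as beam

class FakeClient:
  # Stand-in for a real database or network client.
  def connect(self):
    print('client: connect')
    return self

  def lookup(self, key):
    return 'value-for-' + key

  def close(self):
    print('client: close')

class LookupFn(beam.DoFn):
  def setup(self):
    # Open the (fake) connection once per DoFn instance, not once per element.
    self.client = FakeClient().connect()

  def process(self, element):
    # Reuse the connection for every element in every bundle.
    yield self.client.lookup(element)

  def teardown(self):
    # Best effort only: may not run if the worker crashes.
    self.client.close()

with beam.Pipeline() as pipeline:
  lookups = (
      pipeline
      | beam.Create(['🍓', '🥕'])
      | beam.ParDo(LookupFn())
      | beam.Map(print)
  )
    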
Known issues:
- [BEAM-7885] DoFn.setup() doesn't run for streaming jobs running in the DirectRunner.
- [BEAM-7340] DoFn.teardown() metrics are lost.