In [ ]:
#@title Licensed under the Apache License, Version 2.0 (the "License")
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

ParDo

A transform for generic parallel processing. A ParDo transform considers each element in the input PCollection, performs some processing function (your user code) on that element, and emits zero or more elements to an output PCollection.

See more information in the Beam Programming Guide.

Setup

To run a code cell, you can click the Run cell button at the top left of the cell, or select it and press Shift+Enter. Try modifying a code cell and re-running it to see what happens.

To learn more about Colab, see Welcome to Colaboratory!.

First, let's install the apache-beam module.


In [ ]:
!pip install --quiet -U apache-beam

Examples

In the following examples, we explore how to create custom DoFns and access the timestamp and windowing information.

Example 1: ParDo with a simple DoFn

The following example defines a simple DoFn class called SplitWords which stores the delimiter as an object field. The process method is called once per element, and it can yield zero or more output elements.


In [ ]:
import apache_beam as beam

class SplitWords(beam.DoFn):
  def __init__(self, delimiter=','):
    self.delimiter = delimiter

  def process(self, text):
    for word in text.split(self.delimiter):
      yield word

with beam.Pipeline() as pipeline:
  plants = (
      pipeline
      | 'Gardening plants' >> beam.Create([
          '🍓Strawberry,🥕Carrot,🍆Eggplant',
          '🍅Tomato,🥔Potato',
      ])
      | 'Split words' >> beam.ParDo(SplitWords(','))
      | beam.Map(print)
  )
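Outside a pipeline, the per-element logic is just a Python generator. This plain-Python sketch (no Beam required) mirrors what `SplitWords.process` yields for a single input element:

```python
def split_words(text, delimiter=','):
  # Mirrors SplitWords.process: yield zero or more words per input element.
  for word in text.split(delimiter):
    yield word

print(list(split_words('🍅Tomato,🥔Potato')))
# ['🍅Tomato', '🥔Potato']
```

Because `process` is an ordinary generator, you can unit-test the splitting logic like this without constructing a pipeline at all.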

Example 2: ParDo with timestamp and window information

In this example, we add new parameters to the process method to bind parameter values at runtime.


In [ ]:
import apache_beam as beam

class AnalyzeElement(beam.DoFn):
  def process(self, elem, timestamp=beam.DoFn.TimestampParam, window=beam.DoFn.WindowParam):
    yield '\n'.join([
        '# timestamp',
        'type(timestamp) -> ' + repr(type(timestamp)),
        'timestamp.micros -> ' + repr(timestamp.micros),
        'timestamp.to_rfc3339() -> ' + repr(timestamp.to_rfc3339()),
        'timestamp.to_utc_datetime() -> ' + repr(timestamp.to_utc_datetime()),
        '',
        '# window',
        'type(window) -> ' + repr(type(window)),
        'window.start -> {} ({})'.format(window.start, window.start.to_utc_datetime()),
        'window.end -> {} ({})'.format(window.end, window.end.to_utc_datetime()),
        'window.max_timestamp() -> {} ({})'.format(window.max_timestamp(), window.max_timestamp().to_utc_datetime()),
    ])

with beam.Pipeline() as pipeline:
  dofn_params = (
      pipeline
      | 'Create a single test element' >> beam.Create([':)'])
      | 'Add timestamp (Spring equinox 2020)' >> beam.Map(
          lambda elem: beam.window.TimestampedValue(elem, 1584675660))
      | 'Fixed 30sec windows' >> beam.WindowInto(beam.window.FixedWindows(30))
      | 'Analyze element' >> beam.ParDo(AnalyzeElement())
      | beam.Map(print)
  )
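Fixed windows partition the timeline into equal-sized intervals: a window's start is the element's timestamp rounded down to a multiple of the window size. The following is a plain-Python sketch of that assignment (a simplification: Beam itself tracks timestamps in microseconds, and `window.max_timestamp()` is one microsecond before the window end):

```python
def fixed_window_bounds(timestamp_secs, size_secs):
  """Return (start, end) of the fixed window containing timestamp_secs."""
  start = timestamp_secs - (timestamp_secs % size_secs)
  return start, start + size_secs

# The Spring equinox timestamp above happens to fall exactly on a
# 30-second boundary, so it is the start of its own window.
print(fixed_window_bounds(1584675660, 30))
# (1584675660, 1584675690)
```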

Example 3: ParDo with DoFn methods

A DoFn can be customized with a number of methods that can help create more complex behaviors. You can customize what a worker does when it starts and shuts down with setup and teardown. You can also customize what to do when a bundle of elements starts and finishes with start_bundle and finish_bundle.

  • DoFn.setup(): Called once per DoFn instance when the DoFn instance is initialized. Because DoFn instances are not guaranteed to be cached, setup may be called more than once per worker. This is a good place to connect to database instances, open network connections, or acquire other resources.

  • DoFn.start_bundle(): Called once per bundle of elements before calling process on the first element of the bundle. This is a good place to start keeping track of the bundle elements.

  • DoFn.process(element, *args, **kwargs): Called once per element; it can yield zero or more elements. Additional *args or **kwargs can be passed through beam.ParDo(). [required]

  • DoFn.finish_bundle(): Called once per bundle of elements, after calling process on the last element of the bundle; it can yield zero or more elements. This is a good place to do batch calls on a bundle of elements, such as running a database query.

    For example, you can initialize a batch in start_bundle, add elements to the batch in process instead of yielding them, then run a batch query on those elements in finish_bundle and yield all the results.

    Note that elements yielded from finish_bundle must be of the type apache_beam.utils.windowed_value.WindowedValue. You need to provide a timestamp as a Unix timestamp, which you can get from the last processed element. You also need to provide a window, which you can get from the last processed element as in the example below.

  • DoFn.teardown(): Called once (as a best effort) per DoFn instance when the DoFn instance is shutting down. This is a good place to close database connections, close network connections, or release other resources.

    Note that teardown is called as a best effort and is not guaranteed. For example, if the worker crashes, teardown might not be called.


In [ ]:
import apache_beam as beam

class DoFnMethods(beam.DoFn):
  def __init__(self):
    print('__init__')
    self.window = beam.window.GlobalWindow()

  def setup(self):
    print('setup')

  def start_bundle(self):
    print('start_bundle')

  def process(self, element, window=beam.DoFn.WindowParam):
    self.window = window
    yield '* process: ' + element

  def finish_bundle(self):
    yield beam.utils.windowed_value.WindowedValue(
        value='* finish_bundle: 🌱🌳🌍',
        timestamp=0,
        windows=[self.window],
    )

  def teardown(self):
    print('teardown')

with beam.Pipeline() as pipeline:
  results = (
      pipeline
      | 'Create inputs' >> beam.Create(['🍓', '🥕', '🍆', '🍅', '🥔'])
      | 'DoFn methods' >> beam.ParDo(DoFnMethods())
      | beam.Map(print)
  )
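The batching pattern described above (buffer in process, flush in finish_bundle) can also be sketched without Beam. The toy driver below is an assumption about bundle mechanics, simplified to a single bundle; in real Beam, finish_bundle must yield WindowedValue objects and the runner decides how elements are grouped into bundles:

```python
class BatchingDoFn:
  """Plain-Python stand-in for a beam.DoFn that batches its elements."""

  def setup(self):
    pass                          # e.g. open a database connection here

  def start_bundle(self):
    self.batch = []               # start tracking the bundle's elements

  def process(self, element):
    self.batch.append(element)    # buffer instead of yielding
    if len(self.batch) >= 3:      # flush early once the batch is full
      yield from self._flush()

  def finish_bundle(self):
    yield from self._flush()      # flush whatever is left in the bundle

  def teardown(self):
    pass                          # e.g. close the database connection here

  def _flush(self):
    if self.batch:
      results = ['processed ' + e for e in self.batch]  # the "batch query"
      self.batch = []
      yield from results

def run_bundle(dofn, elements):
  """Toy single-bundle driver showing the method call order."""
  dofn.setup()
  dofn.start_bundle()
  outputs = []
  for element in elements:
    outputs.extend(dofn.process(element))
  outputs.extend(dofn.finish_bundle())
  dofn.teardown()
  return outputs

print(run_bundle(BatchingDoFn(), ['a', 'b', 'c', 'd']))
# ['processed a', 'processed b', 'processed c', 'processed d']
```

The first three elements are flushed together from process once the batch fills up; the leftover element 'd' is flushed by finish_bundle.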

Known issues:

  • [BEAM-7885] DoFn.setup() doesn't run for streaming jobs running in the DirectRunner.
  • [BEAM-7340] DoFn.teardown() metrics are lost.

Related transforms:

  • Map behaves the same, but produces exactly one output for each input.
  • FlatMap behaves the same as Map, but for each input it may produce zero or more outputs.
  • Filter is useful if the function is just deciding whether to output an element or not.
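The difference between these transforms can be sketched in plain Python over a list (a simplification of the per-element behavior, not Beam's actual implementation):

```python
def map_transform(fn, elems):
  return [fn(e) for e in elems]                  # exactly one output per input

def flat_map_transform(fn, elems):
  return [out for e in elems for out in fn(e)]   # zero or more outputs per input

def filter_transform(pred, elems):
  return [e for e in elems if pred(e)]           # keep or drop each input

nums = [1, 2, 3, 4]
print(map_transform(lambda x: x * 10, nums))          # [10, 20, 30, 40]
print(flat_map_transform(lambda x: [x] * (x % 2), nums))  # odds kept once, evens dropped: [1, 3]
print(filter_transform(lambda x: x % 2 == 0, nums))   # [2, 4]
```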