In [ ]:
#@title Licensed under the Apache License, Version 2.0 (the "License")
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

Filter

Given a predicate, filter out all elements that don't satisfy that predicate. Filter can also be used to keep elements based on an inequality with a given value, using the comparison ordering of the element.
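
As a rough mental model (plain Python, no Beam involved), the predicate form keeps each element for which a function returns True. The inequality form is simply a predicate that compares each element with a fixed value. The growing_days numbers, the threshold, and the is_long_season helper below are hypothetical and chosen only for illustration.

```python
# Plain-Python model of Filter semantics; this is not Beam's implementation.
# Hypothetical sample data: growing-season lengths in days.
growing_days = [45, 120, 70, 365, 90]

# Predicate form: keep only elements for which the function returns True.
def is_long_season(days):
    return days > 100

long_season = [days for days in growing_days if is_long_season(days)]

# Inequality form: compare each element with a given value using the
# element's natural ordering (integer comparison here).
threshold = 70
at_least_threshold = [days for days in growing_days if days >= threshold]

print(long_season)         # [120, 365]
print(at_least_threshold)  # [120, 70, 365, 90]
```

In a pipeline, a comparison like this would typically be written as beam.Filter(lambda days, threshold: days >= threshold, 70), passing the threshold as an extra argument the way Example 3 does. A PCollection has no guaranteed order, so a pipeline may print the kept elements in a different order than these lists.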

Setup

To run a code cell, you can click the Run cell button at the top left of the cell, or select it and press Shift+Enter. Try modifying a code cell and re-running it to see what happens.

To learn more about Colab, see the Welcome to Colaboratory! notebook.

First, let's install the apache-beam module.


In [ ]:
!pip install --quiet -U apache-beam

Examples

In the following examples, we create a pipeline with a PCollection of produce, where each element records an icon, a name, and a duration. Then, we apply Filter in several ways to filter the produce by its duration value.

Filter accepts a function that returns True for the elements to keep. Every element for which the function returns False is filtered out.
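
The keep/drop rule can be sketched in plain Python on the same produce records the examples use. keep_if is a hypothetical helper, not part of Beam. It assumes that, as with Python's built-in filter, any truthy return value keeps the element. The sketch models the semantics only, not how a runner executes the transform.

```python
def keep_if(elements, predicate):
    """Hypothetical helper (not a Beam API): keep elements whose predicate is truthy."""
    return [element for element in elements if predicate(element)]

# Same produce records used in the examples below.
produce = [
    {'icon': '🍓', 'name': 'Strawberry', 'duration': 'perennial'},
    {'icon': '🥕', 'name': 'Carrot', 'duration': 'biennial'},
    {'icon': '🍆', 'name': 'Eggplant', 'duration': 'perennial'},
    {'icon': '🍅', 'name': 'Tomato', 'duration': 'annual'},
    {'icon': '🥔', 'name': 'Potato', 'duration': 'perennial'},
]

kept = keep_if(produce, lambda plant: plant['duration'] == 'perennial')
kept_names = [plant['name'] for plant in kept]
print(kept_names)  # ['Strawberry', 'Eggplant', 'Potato']
```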

Example 1: Filtering with a function

We define a function is_perennial which returns True if the element's duration equals 'perennial', and False otherwise.


In [ ]:
import apache_beam as beam

def is_perennial(plant):
  return plant['duration'] == 'perennial'

with beam.Pipeline() as pipeline:
  perennials = (
      pipeline
      | 'Gardening plants' >> beam.Create([
          {'icon': '🍓', 'name': 'Strawberry', 'duration': 'perennial'},
          {'icon': '🥕', 'name': 'Carrot', 'duration': 'biennial'},
          {'icon': '🍆', 'name': 'Eggplant', 'duration': 'perennial'},
          {'icon': '🍅', 'name': 'Tomato', 'duration': 'annual'},
          {'icon': '🥔', 'name': 'Potato', 'duration': 'perennial'},
      ])
      | 'Filter perennials' >> beam.Filter(is_perennial)
      | beam.Map(print)
  )

Example 2: Filtering with a lambda function

We can also use lambda functions to simplify Example 1.


In [ ]:
import apache_beam as beam

with beam.Pipeline() as pipeline:
  perennials = (
      pipeline
      | 'Gardening plants' >> beam.Create([
          {'icon': '🍓', 'name': 'Strawberry', 'duration': 'perennial'},
          {'icon': '🥕', 'name': 'Carrot', 'duration': 'biennial'},
          {'icon': '🍆', 'name': 'Eggplant', 'duration': 'perennial'},
          {'icon': '🍅', 'name': 'Tomato', 'duration': 'annual'},
          {'icon': '🥔', 'name': 'Potato', 'duration': 'perennial'},
      ])
      | 'Filter perennials' >> beam.Filter(
          lambda plant: plant['duration'] == 'perennial')
      | beam.Map(print)
  )

Example 3: Filtering with multiple arguments

You can pass functions with multiple arguments to Filter. Any additional arguments given to Filter are passed to the function after the element itself, as positional or keyword arguments.

In this example, has_duration takes plant and duration as arguments.


In [ ]:
import apache_beam as beam

def has_duration(plant, duration):
  return plant['duration'] == duration

with beam.Pipeline() as pipeline:
  perennials = (
      pipeline
      | 'Gardening plants' >> beam.Create([
          {'icon': '🍓', 'name': 'Strawberry', 'duration': 'perennial'},
          {'icon': '🥕', 'name': 'Carrot', 'duration': 'biennial'},
          {'icon': '🍆', 'name': 'Eggplant', 'duration': 'perennial'},
          {'icon': '🍅', 'name': 'Tomato', 'duration': 'annual'},
          {'icon': '🥔', 'name': 'Potato', 'duration': 'perennial'},
      ])
      | 'Filter perennials' >> beam.Filter(has_duration, 'perennial')
      | beam.Map(print)
  )
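
One way to picture how the extra arguments reach the function is a plain-Python model. Each element is passed first, followed by whatever positional or keyword arguments were given to Filter. filter_with_args is a hypothetical helper for illustration only. In Beam, this forwarding happens inside the transform itself.

```python
def filter_with_args(elements, fn, *args, **kwargs):
    """Hypothetical model: keep an element if fn(element, *args, **kwargs) is truthy."""
    return [element for element in elements if fn(element, *args, **kwargs)]

def has_duration(plant, duration):
    return plant['duration'] == duration

# A subset of the produce records from the example above.
produce = [
    {'icon': '🍓', 'name': 'Strawberry', 'duration': 'perennial'},
    {'icon': '🥕', 'name': 'Carrot', 'duration': 'biennial'},
    {'icon': '🍅', 'name': 'Tomato', 'duration': 'annual'},
]

# Both forms end up calling has_duration(plant, 'perennial') for each plant.
by_position = filter_with_args(produce, has_duration, 'perennial')
by_keyword = filter_with_args(produce, has_duration, duration='perennial')

print([plant['name'] for plant in by_position])  # ['Strawberry']
print(by_position == by_keyword)                 # True
```

The keyword form corresponds to writing beam.Filter(has_duration, duration='perennial') in the pipeline above.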

Example 4: Filtering with side inputs as singletons

If a PCollection has a single value, such as the average from another computation, passing that PCollection as a singleton lets the function access the value directly.

In this example, we pass a PCollection containing the single value 'perennial' as a singleton. We then use that value to keep only the perennials.


In [ ]:
import apache_beam as beam

with beam.Pipeline() as pipeline:
  perennial = pipeline | 'Perennial' >> beam.Create(['perennial'])

  perennials = (
      pipeline
      | 'Gardening plants' >> beam.Create([
          {'icon': '🍓', 'name': 'Strawberry', 'duration': 'perennial'},
          {'icon': '🥕', 'name': 'Carrot', 'duration': 'biennial'},
          {'icon': '🍆', 'name': 'Eggplant', 'duration': 'perennial'},
          {'icon': '🍅', 'name': 'Tomato', 'duration': 'annual'},
          {'icon': '🥔', 'name': 'Potato', 'duration': 'perennial'},
      ])
      | 'Filter perennials' >> beam.Filter(
          lambda plant, duration: plant['duration'] == duration,
          duration=beam.pvalue.AsSingleton(perennial),
      )
      | beam.Map(print)
  )
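
A singleton view only makes sense when the side-input PCollection holds exactly one element. The sketch below models that rule in plain Python. as_singleton is a hypothetical helper, and its error handling is an assumption that imitates Beam's checks rather than reproducing them. Beam's AsSingleton also accepts a default_value argument for the empty case, and the default parameter here plays that role.

```python
_NO_DEFAULT = object()  # sentinel meaning "no default supplied"

def as_singleton(values, default=_NO_DEFAULT):
    """Hypothetical model of a singleton side input: exactly one value expected."""
    values = list(values)
    if len(values) == 1:
        return values[0]
    if not values and default is not _NO_DEFAULT:
        return default  # empty input falls back to the default, if given
    raise ValueError(f'expected exactly one element, got {len(values)}')

duration = as_singleton(['perennial'])

produce = [
    {'name': 'Strawberry', 'duration': 'perennial'},
    {'name': 'Carrot', 'duration': 'biennial'},
    {'name': 'Potato', 'duration': 'perennial'},
]
perennial_names = [plant['name'] for plant in produce if plant['duration'] == duration]
print(perennial_names)  # ['Strawberry', 'Potato']

print(as_singleton([], default='annual'))  # annual

try:
    as_singleton(['annual', 'perennial'])
except ValueError as err:
    print(err)  # expected exactly one element, got 2
```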

Example 5: Filtering with side inputs as iterators

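If the PCollection has multiple values, pass the PCollection as an iterator. This accesses elements lazily as they are needed, so it is possible to iterate over large PCollections that won't fit into memory. In this example, Potato's duration is 'PERENNIAL'. The comparison is case-sensitive, so this value does not match any of the valid durations, and Potato is filtered out.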


In [ ]:
import apache_beam as beam

with beam.Pipeline() as pipeline:
  valid_durations = pipeline | 'Valid durations' >> beam.Create([
      'annual',
      'biennial',
      'perennial',
  ])

  valid_plants = (
      pipeline
      | 'Gardening plants' >> beam.Create([
          {'icon': '🍓', 'name': 'Strawberry', 'duration': 'perennial'},
          {'icon': '🥕', 'name': 'Carrot', 'duration': 'biennial'},
          {'icon': '🍆', 'name': 'Eggplant', 'duration': 'perennial'},
          {'icon': '🍅', 'name': 'Tomato', 'duration': 'annual'},
          {'icon': '🥔', 'name': 'Potato', 'duration': 'PERENNIAL'},
      ])
      | 'Filter valid plants' >> beam.Filter(
          lambda plant, valid_durations: plant['duration'] in valid_durations,
          valid_durations=beam.pvalue.AsIter(valid_durations),
      )
      | beam.Map(print)
  )
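
The iterable view can be modeled in plain Python as an object that produces a fresh iterator every time it is iterated. This lets the membership test inside the lambda scan the values once per plant without first copying them into a list. LazyValues and read_durations are hypothetical stand-ins, not Beam's implementation. The sketch reproduces the filtering from the example on a few of the same records.

```python
class LazyValues:
    """Hypothetical stand-in for an iterable side input.

    Each call to iter() starts a new pass over the source, so the values
    can be scanned repeatedly without being stored in a list here.
    """

    def __init__(self, make_iterator):
        self._make_iterator = make_iterator

    def __iter__(self):
        return self._make_iterator()


def read_durations():
    # Yields values one at a time, like a lazily read source.
    yield 'annual'
    yield 'biennial'
    yield 'perennial'


valid_durations = LazyValues(read_durations)

produce = [
    {'name': 'Strawberry', 'duration': 'perennial'},
    {'name': 'Tomato', 'duration': 'annual'},
    {'name': 'Potato', 'duration': 'PERENNIAL'},  # case differs from 'perennial'
]

# `in` falls back to iterating valid_durations, starting a fresh pass each time.
valid_names = [plant['name'] for plant in produce if plant['duration'] in valid_durations]
print(valid_names)  # ['Strawberry', 'Tomato']
```

The fresh pass per iteration matters. Passing a single generator such as read_durations() would break the filter: the first membership test would consume part of the generator, and later tests would see only the values that remain.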

Note: You can pass the PCollection as a list with beam.pvalue.AsList(pcollection), but this requires that all the elements fit into memory.

Example 6: Filtering with side inputs as dictionaries

If a PCollection is small enough to fit into memory, it can be passed as a dictionary. Each element must be a (key, value) pair. If the PCollection won't fit into memory, use beam.pvalue.AsIter(pcollection) instead.


In [ ]:
import apache_beam as beam

with beam.Pipeline() as pipeline:
  keep_duration = pipeline | 'Duration filters' >> beam.Create([
      ('annual', False),
      ('biennial', False),
      ('perennial', True),
  ])

  perennials = (
      pipeline
      | 'Gardening plants' >> beam.Create([
          {'icon': '🍓', 'name': 'Strawberry', 'duration': 'perennial'},
          {'icon': '🥕', 'name': 'Carrot', 'duration': 'biennial'},
          {'icon': '🍆', 'name': 'Eggplant', 'duration': 'perennial'},
          {'icon': '🍅', 'name': 'Tomato', 'duration': 'annual'},
          {'icon': '🥔', 'name': 'Potato', 'duration': 'perennial'},
      ])
      | 'Filter plants by duration' >> beam.Filter(
          lambda plant, keep_duration: keep_duration[plant['duration']],
          keep_duration=beam.pvalue.AsDict(keep_duration),
      )
      | beam.Map(print)
  )
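
In plain Python, a dictionary view amounts to building a dict from the (key, value) pairs and looking up each plant's duration in it. One caveat: indexing with keep_duration[plant['duration']], as the example does, raises KeyError for a duration that has no entry. The sketch below uses dict.get with a False default to drop such plants instead. The 'Fig' record with an 'unknown' duration is hypothetical and included only to show that case.

```python
# Plain-Python model of a dictionary side input built from (key, value) pairs.
keep_duration = dict([
    ('annual', False),
    ('biennial', False),
    ('perennial', True),
])

produce = [
    {'name': 'Strawberry', 'duration': 'perennial'},
    {'name': 'Carrot', 'duration': 'biennial'},
    {'name': 'Tomato', 'duration': 'annual'},
    {'name': 'Fig', 'duration': 'unknown'},  # hypothetical record with no matching key
]

# keep_duration['unknown'] would raise KeyError; .get(..., False) drops the plant instead.
kept = [plant['name'] for plant in produce if keep_duration.get(plant['duration'], False)]
print(kept)  # ['Strawberry']
```

This assumes the dictionary view supports .get like a regular dict. Under that assumption, the same guard inside the pipeline would be lambda plant, keep_duration: keep_duration.get(plant['duration'], False).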

Related transforms

  • FlatMap behaves the same as Map, but for each input it might produce zero or more outputs.
  • ParDo is the most general elementwise mapping operation, and includes other abilities such as multiple output collections and side inputs.
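
Filter is closely related to FlatMap. Conceptually, Filter is a FlatMap whose function emits either zero outputs (drop) or one output (keep) for each input. The plain-Python sketch below models that relationship. flat_map and filter_as_flat_map are hypothetical helpers, not Beam APIs.

```python
from itertools import chain

def flat_map(fn, elements):
    """Hypothetical model of FlatMap: fn returns an iterable of zero or more outputs."""
    return list(chain.from_iterable(fn(element) for element in elements))

def filter_as_flat_map(predicate, elements):
    # Emit a one-element list to keep the element, an empty list to drop it.
    return flat_map(lambda element: [element] if predicate(element) else [], elements)

durations = ['perennial', 'biennial', 'perennial', 'annual']
kept = filter_as_flat_map(lambda duration: duration == 'perennial', durations)
print(kept)  # ['perennial', 'perennial']
```

In a pipeline, beam.FlatMap(lambda plant: [plant] if is_perennial(plant) else []) would keep the same elements as beam.Filter(is_perennial) from Example 1. Filter simply states the intent more directly.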