In [ ]:
#@title Licensed under the Apache License, Version 2.0 (the "License")
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
|
Given a predicate, filter out all elements that don't satisfy that predicate. `Filter` can also be used to keep elements based on an inequality with a given value, using the comparison ordering of the elements.
To run a code cell, you can click the **Run cell** button at the top left of the cell, or select it and press `Shift+Enter`.
Try modifying a code cell and re-running it to see what happens.
To learn more about Colab, see the *Welcome to Colaboratory!* notebook.
First, let's install the `apache-beam` module.
In [ ]:
!pip install --quiet -U apache-beam
In the following examples, we create a pipeline with a `PCollection` of produce, where each element has an icon, a name, and a duration.
Then, we apply `Filter` in multiple ways to select produce by its duration value.
`Filter` accepts a function that returns `True` for the elements to keep; all remaining elements are filtered out.
In [ ]:
import apache_beam as beam
def is_perennial(plant):
    """Return whether a produce record describes a perennial plant.

    Args:
        plant: A produce record; only its ``'duration'`` key is read.
            A missing key raises ``KeyError``.

    Returns:
        ``True`` when ``plant['duration']`` is exactly ``'perennial'``
        (case-sensitive), otherwise ``False``.
    """
    duration = plant['duration']
    return duration == 'perennial'
with beam.Pipeline() as pipeline:
    # Seed the pipeline with an in-memory collection of produce records.
    plants = pipeline | 'Gardening plants' >> beam.Create([
        {'icon': '🍓', 'name': 'Strawberry', 'duration': 'perennial'},
        {'icon': '🥕', 'name': 'Carrot', 'duration': 'biennial'},
        {'icon': '🍆', 'name': 'Eggplant', 'duration': 'perennial'},
        {'icon': '🍅', 'name': 'Tomato', 'duration': 'annual'},
        {'icon': '🥔', 'name': 'Potato', 'duration': 'perennial'},
    ])
    # Keep only the records for which is_perennial() returns True,
    # then print each surviving record.
    perennials = (
        plants
        | 'Filter perennials' >> beam.Filter(is_perennial)
        | beam.Map(print)
    )
In [ ]:
import apache_beam as beam
with beam.Pipeline() as pipeline:
    # Seed the pipeline with an in-memory collection of produce records.
    plants = pipeline | 'Gardening plants' >> beam.Create([
        {'icon': '🍓', 'name': 'Strawberry', 'duration': 'perennial'},
        {'icon': '🥕', 'name': 'Carrot', 'duration': 'biennial'},
        {'icon': '🍆', 'name': 'Eggplant', 'duration': 'perennial'},
        {'icon': '🍅', 'name': 'Tomato', 'duration': 'annual'},
        {'icon': '🥔', 'name': 'Potato', 'duration': 'perennial'},
    ])
    # The predicate is an inline lambda instead of a named function.
    kept = plants | 'Filter perennials' >> beam.Filter(
        lambda plant: plant['duration'] == 'perennial')
    # Print every record that passed the filter.
    perennials = kept | beam.Map(print)
In [ ]:
import apache_beam as beam
def has_duration(plant, duration):
    """Return whether a produce record has the requested duration.

    Args:
        plant: A produce record; only its ``'duration'`` key is read.
            A missing key raises ``KeyError``.
        duration: The duration value to compare against, e.g. ``'perennial'``.

    Returns:
        The result of ``plant['duration'] == duration``.
    """
    plant_duration = plant['duration']
    return plant_duration == duration
with beam.Pipeline() as pipeline:
    # Seed the pipeline with an in-memory collection of produce records.
    plants = pipeline | 'Gardening plants' >> beam.Create([
        {'icon': '🍓', 'name': 'Strawberry', 'duration': 'perennial'},
        {'icon': '🥕', 'name': 'Carrot', 'duration': 'biennial'},
        {'icon': '🍆', 'name': 'Eggplant', 'duration': 'perennial'},
        {'icon': '🍅', 'name': 'Tomato', 'duration': 'annual'},
        {'icon': '🥔', 'name': 'Potato', 'duration': 'perennial'},
    ])
    # Filter forwards extra arguments to the predicate, so has_duration is
    # called as has_duration(plant, duration='perennial') for each element.
    perennials = (
        plants
        | 'Filter perennials' >> beam.Filter(has_duration, duration='perennial')
        | beam.Map(print)
    )
If a `PCollection` has a single value, such as the average from another computation, passing that `PCollection` as a singleton side input gives the function access to that value.
In this example, we pass a `PCollection` containing the single value `'perennial'` as a singleton.
We then use that value to keep only the perennials.
In [ ]:
import apache_beam as beam
with beam.Pipeline() as pipeline:
    # One-element PCollection; AsSingleton below hands its only element
    # to the predicate as the `duration` argument.
    perennial = pipeline | 'Perennial' >> beam.Create(['perennial'])
    # Seed the pipeline with an in-memory collection of produce records.
    plants = pipeline | 'Gardening plants' >> beam.Create([
        {'icon': '🍓', 'name': 'Strawberry', 'duration': 'perennial'},
        {'icon': '🥕', 'name': 'Carrot', 'duration': 'biennial'},
        {'icon': '🍆', 'name': 'Eggplant', 'duration': 'perennial'},
        {'icon': '🍅', 'name': 'Tomato', 'duration': 'annual'},
        {'icon': '🥔', 'name': 'Potato', 'duration': 'perennial'},
    ])
    matches_side_input = beam.Filter(
        lambda plant, duration: plant['duration'] == duration,
        duration=beam.pvalue.AsSingleton(perennial),
    )
    perennials = (
        plants
        | 'Filter perennials' >> matches_side_input
        | beam.Map(print)
    )
In [ ]:
import apache_beam as beam
with beam.Pipeline() as pipeline:
    # Allowed duration values, exposed to the predicate as an iterable
    # side input via AsIter.
    valid_durations = pipeline | 'Valid durations' >> beam.Create([
        'annual',
        'biennial',
        'perennial',
    ])
    # Potato's upper-case 'PERENNIAL' is deliberate: the membership test
    # is case-sensitive, so that record is filtered out.
    plants = pipeline | 'Gardening plants' >> beam.Create([
        {'icon': '🍓', 'name': 'Strawberry', 'duration': 'perennial'},
        {'icon': '🥕', 'name': 'Carrot', 'duration': 'biennial'},
        {'icon': '🍆', 'name': 'Eggplant', 'duration': 'perennial'},
        {'icon': '🍅', 'name': 'Tomato', 'duration': 'annual'},
        {'icon': '🥔', 'name': 'Potato', 'duration': 'PERENNIAL'},
    ])
    keep_if_valid = beam.Filter(
        lambda plant, valid_durations: plant['duration'] in valid_durations,
        valid_durations=beam.pvalue.AsIter(valid_durations),
    )
    valid_plants = (
        plants
        | 'Filter valid plants' >> keep_if_valid
        | beam.Map(print)
    )
|
Note: You can pass the `PCollection` as a list with `beam.pvalue.AsList(pcollection)`, but this requires that all the elements fit into memory.
If a `PCollection` is small enough to fit into memory, it can be passed as a dictionary with `beam.pvalue.AsDict(pcollection)`.
Each element must be a `(key, value)` pair, and all the elements of the `PCollection` must fit into memory.
If the `PCollection` won't fit into memory, use `beam.pvalue.AsIter(pcollection)` instead.
In [ ]:
import apache_beam as beam
with beam.Pipeline() as pipeline:
    # (key, value) pairs mapping each duration to whether plants with that
    # duration should be kept; consumed below as a dict side input.
    keep_duration = pipeline | 'Duration filters' >> beam.Create([
        ('annual', False),
        ('biennial', False),
        ('perennial', True),
    ])
    perennials = (
        pipeline
        | 'Gardening plants' >> beam.Create([
            {'icon': '🍓', 'name': 'Strawberry', 'duration': 'perennial'},
            {'icon': '🥕', 'name': 'Carrot', 'duration': 'biennial'},
            {'icon': '🍆', 'name': 'Eggplant', 'duration': 'perennial'},
            {'icon': '🍅', 'name': 'Tomato', 'duration': 'annual'},
            {'icon': '🥔', 'name': 'Potato', 'duration': 'perennial'},
        ])
        | 'Filter plants by duration' >> beam.Filter(
            # Use .get() so a plant whose duration is not a key of the
            # side-input dict is dropped. Plain indexing would raise
            # KeyError and fail the whole pipeline.
            lambda plant, keep_duration: keep_duration.get(
                plant['duration'], False),
            keep_duration=beam.pvalue.AsDict(keep_duration),
        )
        | beam.Map(print)
    )