In [ ]:
#@title Licensed under the Apache License, Version 2.0 (the "License")
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
|
Applies a simple 1-to-many mapping function over each element in the collection. The many elements are flattened into the resulting collection.
To run a code cell, you can click the Run cell button at the top left of the cell,
or select it and press `Shift+Enter`.
Try modifying a code cell and re-running it to see what happens.
To learn more about Colab, see Welcome to Colaboratory!.
First, let's install the `apache-beam` module.
In [ ]:
# Install (or upgrade, via -U) the apache-beam package; --quiet reduces pip's output.
!pip install --quiet -U apache-beam
In the following examples, we create a pipeline with a `PCollection` of produce with their icon, name, and duration.
Then, we apply `FlatMap` in multiple ways to yield zero or more elements for each input element into the resulting `PCollection`.
`FlatMap` accepts a function that returns an `iterable`,
where each of the output `iterable`'s elements is an element of the resulting `PCollection`.
In [ ]:
import apache_beam as beam

# str.split (with no argument) breaks each input string on whitespace, and
# FlatMap emits every resulting word as its own element.
with beam.Pipeline() as pipeline:
    garden_lines = pipeline | 'Gardening plants' >> beam.Create([
        '🍓Strawberry 🥕Carrot 🍆Eggplant',
        '🍅Tomato 🥔Potato',
    ])
    plant_words = garden_lines | 'Split words' >> beam.FlatMap(str.split)
    plants = plant_words | beam.Map(print)
In [ ]:
import apache_beam as beam


def split_words(text):
    """Return the comma-separated fields of ``text`` as a list of strings."""
    fields = text.split(',')
    return fields


# Each comma-separated input line is flattened into one element per field.
with beam.Pipeline() as pipeline:
    garden_lines = pipeline | 'Gardening plants' >> beam.Create([
        '🍓Strawberry,🥕Carrot,🍆Eggplant',
        '🍅Tomato,🥔Potato',
    ])
    plant_words = garden_lines | 'Split words' >> beam.FlatMap(split_words)
    plants = plant_words | beam.Map(print)
For this example, we want to flatten a `PCollection` of lists of `str`s into a `PCollection` of `str`s.
Each input element is already an `iterable`, where each element is what we want in the resulting `PCollection`.
We use a lambda function that returns the same input element it received.
In [ ]:
import apache_beam as beam

# Every input element is already a list, so returning it unchanged lets
# FlatMap emit each list item as a separate element.
with beam.Pipeline() as pipeline:
    plant_lists = pipeline | 'Gardening plants' >> beam.Create([
        ['🍓Strawberry', '🥕Carrot', '🍆Eggplant'],
        ['🍅Tomato', '🥔Potato'],
    ])
    flat_plants = plant_lists | 'Flatten lists' >> beam.FlatMap(
        lambda elements: elements)
    plants = flat_plants | beam.Map(print)
In [ ]:
import apache_beam as beam


def generate_elements(elements):
    """Yield each item of ``elements`` in order (a generator-based flatten)."""
    yield from elements


# FlatMap drains the generator, emitting one output element per list item.
with beam.Pipeline() as pipeline:
    plant_lists = pipeline | 'Gardening plants' >> beam.Create([
        ['🍓Strawberry', '🥕Carrot', '🍆Eggplant'],
        ['🍅Tomato', '🥔Potato'],
    ])
    flat_plants = plant_lists | 'Flatten lists' >> beam.FlatMap(
        generate_elements)
    plants = flat_plants | beam.Map(print)
In [ ]:
import apache_beam as beam


def format_plant(icon, plant):
    """Yield ``icon`` concatenated with ``plant``, or nothing if ``icon`` is falsy.

    Used with ``FlatMapTuple``, which unpacks each ``(icon, plant)`` tuple into
    the two positional arguments. Yielding nothing drops the element, so the
    ``(None, 'Invalid')`` entry produces no output.
    """
    if not icon:
        return
    yield '{}{}'.format(icon, plant)


with beam.Pipeline() as pipeline:
    icon_name_pairs = pipeline | 'Gardening plants' >> beam.Create([
        ('🍓', 'Strawberry'),
        ('🥕', 'Carrot'),
        ('🍆', 'Eggplant'),
        ('🍅', 'Tomato'),
        ('🥔', 'Potato'),
        (None, 'Invalid'),
    ])
    formatted = icon_name_pairs | 'Format' >> beam.FlatMapTuple(format_plant)
    plants = formatted | beam.Map(print)
In [ ]:
import apache_beam as beam


def split_words(text, delimiter=None):
    """Split ``text`` on ``delimiter`` (whitespace when it is None)."""
    return text.split(sep=delimiter)


# Extra keyword arguments given to FlatMap are forwarded to the function on
# every call, so each line is split on ','.
with beam.Pipeline() as pipeline:
    garden_lines = pipeline | 'Gardening plants' >> beam.Create([
        '🍓Strawberry,🥕Carrot,🍆Eggplant',
        '🍅Tomato,🥔Potato',
    ])
    plant_words = garden_lines | 'Split words' >> beam.FlatMap(
        split_words, delimiter=',')
    plants = plant_words | beam.Map(print)
If the `PCollection` has a single value, such as the average from another computation,
passing the `PCollection` as a singleton accesses that value.
In this example, we pass a `PCollection` containing the value `','` as a singleton.
We then use that value as the delimiter for the `str.split` method.
In [ ]:
import apache_beam as beam

with beam.Pipeline() as pipeline:
    # One-element PCollection, read inside FlatMap through AsSingleton.
    delimiter = pipeline | 'Create delimiter' >> beam.Create([','])

    garden_lines = pipeline | 'Gardening plants' >> beam.Create([
        '🍓Strawberry,🥕Carrot,🍆Eggplant',
        '🍅Tomato,🥔Potato',
    ])
    # The lambda's second parameter must be named ``delimiter`` because the
    # side input is passed to FlatMap under that keyword.
    plant_words = garden_lines | 'Split words' >> beam.FlatMap(
        lambda line, delimiter: line.split(delimiter),
        delimiter=beam.pvalue.AsSingleton(delimiter),
    )
    plants = plant_words | beam.Map(print)
In [ ]:
import apache_beam as beam


def normalize_and_validate_durations(plant, valid_durations):
    """Yield a copy of ``plant`` with a lower-cased duration, if that duration is valid.

    The duration is lower-cased so that e.g. 'Perennial' and 'BIENNIAL' match
    the lowercase entries of ``valid_durations``. A plant whose normalized
    duration is not among them (e.g. 'unknown') yields nothing, so FlatMap
    drops it.

    Args:
      plant: dict with (at least) a string 'duration' entry.
      valid_durations: side input (here ``AsIter``) of accepted duration strings.

    Yields:
      A new dict equal to ``plant`` except for the normalized 'duration'.
    """
    duration = plant['duration'].lower()
    if duration in valid_durations:
        # Build a new dict instead of assigning into ``plant``: the Beam
        # programming guide requires that a DoFn never modify its input
        # element. dict(plant, ...) keeps the original key order.
        yield dict(plant, duration=duration)


with beam.Pipeline() as pipeline:
    valid_durations = pipeline | 'Valid durations' >> beam.Create([
        'annual',
        'biennial',
        'perennial',
    ])

    valid_plants = (
        pipeline
        | 'Gardening plants' >> beam.Create([
            {'icon': '🍓', 'name': 'Strawberry', 'duration': 'Perennial'},
            {'icon': '🥕', 'name': 'Carrot', 'duration': 'BIENNIAL'},
            {'icon': '🍆', 'name': 'Eggplant', 'duration': 'perennial'},
            {'icon': '🍅', 'name': 'Tomato', 'duration': 'annual'},
            {'icon': '🥔', 'name': 'Potato', 'duration': 'unknown'},
        ])
        | 'Normalize and validate durations' >> beam.FlatMap(
            normalize_and_validate_durations,
            valid_durations=beam.pvalue.AsIter(valid_durations),
        )
        | beam.Map(print)
    )
|
Note: You can pass the `PCollection` as a list with `beam.pvalue.AsList(pcollection)`,
but this requires that all the elements fit into memory.
If a `PCollection` is small enough to fit into memory, then that `PCollection` can be passed as a dictionary.
Each element must be a `(key, value)` pair.
Note that all the elements of the `PCollection` must fit into memory for this.
If the `PCollection` won't fit into memory, use `beam.pvalue.AsIter(pcollection)` instead.
In [ ]:
import apache_beam as beam


def replace_duration_if_valid(plant, durations):
    """Yield a copy of ``plant`` with its numeric duration code replaced by its name.

    Args:
      plant: dict whose 'duration' entry is a key into ``durations``.
      durations: side input (here ``AsDict``) mapping duration codes to names,
        e.g. 0 -> 'annual'.

    Yields:
      A new dict equal to ``plant`` but with 'duration' mapped through
      ``durations``. Plants whose code is not a key (e.g. -1) yield nothing
      and are dropped by FlatMap.
    """
    code = plant['duration']
    if code in durations:
        # Return a modified copy rather than mutating ``plant``: the Beam
        # programming guide requires that a DoFn never modify its input
        # element. dict(plant, ...) keeps the original key order.
        yield dict(plant, duration=durations[code])


with beam.Pipeline() as pipeline:
    durations = pipeline | 'Durations dict' >> beam.Create([
        (0, 'annual'),
        (1, 'biennial'),
        (2, 'perennial'),
    ])

    valid_plants = (
        pipeline
        | 'Gardening plants' >> beam.Create([
            {'icon': '🍓', 'name': 'Strawberry', 'duration': 2},
            {'icon': '🥕', 'name': 'Carrot', 'duration': 1},
            {'icon': '🍆', 'name': 'Eggplant', 'duration': 2},
            {'icon': '🍅', 'name': 'Tomato', 'duration': 0},
            {'icon': '🥔', 'name': 'Potato', 'duration': -1},
        ])
        | 'Replace duration if valid' >> beam.FlatMap(
            replace_duration_if_valid,
            durations=beam.pvalue.AsDict(durations),
        )
        | beam.Map(print)
    )
|