In [ ]:
#@title Licensed under the Apache License, Version 2.0 (the "License")
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
|
|
Applies a simple 1-to-many mapping function over each element in the collection. The many elements are flattened into the resulting collection.
To run a code cell, you can click the Run cell button at the top left of the cell,
or select it and press Shift+Enter.
Try modifying a code cell and re-running it to see what happens.
To learn more about Colab, see the *Welcome to Colaboratory!* notebook.
First, let's install the apache-beam module.
In [ ]:
# Install (or upgrade) the apache-beam package with pip before running the examples below.
!pip install --quiet -U apache-beam
In the following examples, we create a pipeline with a PCollection of produce, each with its icon, name, and duration.
Then, we apply FlatMap in multiple ways to yield zero or more elements for each input element into the resulting PCollection.
FlatMap accepts a function that returns an iterable,
where each of the output iterable's elements is an element of the resulting PCollection.
In [ ]:
import apache_beam as beam
with beam.Pipeline() as pipeline:
    # Each input element is one string of space-separated produce names.
    garden_rows = pipeline | 'Gardening plants' >> beam.Create([
        '🍓Strawberry 🥕Carrot 🍆Eggplant',
        '🍅Tomato 🥔Potato',
    ])
    # str.split returns a list, and FlatMap emits each of its items separately.
    plants = (
        garden_rows
        | 'Split words' >> beam.FlatMap(str.split)
        | beam.Map(print)
    )
In [ ]:
import apache_beam as beam
def split_words(text):
    """Break a comma-separated string into the list of its pieces."""
    pieces = text.split(',')
    return pieces
with beam.Pipeline() as pipeline:
    # Each input element is one comma-separated string of produce names.
    garden_rows = pipeline | 'Gardening plants' >> beam.Create([
        '🍓Strawberry,🥕Carrot,🍆Eggplant',
        '🍅Tomato,🥔Potato',
    ])
    # split_words returns a list; FlatMap emits every item of it on its own.
    plants = (
        garden_rows
        | 'Split words' >> beam.FlatMap(split_words)
        | beam.Map(print)
    )
For this example, we want to flatten a PCollection of lists of strs into a PCollection of strs.
Each input element is already an iterable, where each element is what we want in the resulting PCollection.
We use a lambda function that returns the same input element it received.
In [ ]:
import apache_beam as beam
with beam.Pipeline() as pipeline:
    # Every input element is already a list of produce names.
    plant_lists = pipeline | 'Gardening plants' >> beam.Create([
        ['🍓Strawberry', '🥕Carrot', '🍆Eggplant'],
        ['🍅Tomato', '🥔Potato'],
    ])
    # The identity lambda hands each list straight back, so FlatMap flattens it.
    plants = (
        plant_lists
        | 'Flatten lists' >> beam.FlatMap(lambda elements: elements)
        | beam.Map(print)
    )
In [ ]:
import apache_beam as beam
def generate_elements(elements):
    """Lazily re-emit every item of ``elements``, preserving their order."""
    yield from elements
with beam.Pipeline() as pipeline:
    # Every input element is already a list of produce names.
    plant_lists = pipeline | 'Gardening plants' >> beam.Create([
        ['🍓Strawberry', '🥕Carrot', '🍆Eggplant'],
        ['🍅Tomato', '🥔Potato'],
    ])
    # generate_elements is a generator, so FlatMap emits each value it yields.
    plants = (
        plant_lists
        | 'Flatten lists' >> beam.FlatMap(generate_elements)
        | beam.Map(print)
    )
In [ ]:
import apache_beam as beam
def format_plant(icon, plant):
    """Yield ``icon`` immediately followed by ``plant``; yield nothing for a falsy icon."""
    if not icon:
        return
    yield f'{icon}{plant}'
with beam.Pipeline() as pipeline:
    # Each input element is an (icon, name) pair; the last one has no icon.
    icon_name_pairs = pipeline | 'Gardening plants' >> beam.Create([
        ('🍓', 'Strawberry'),
        ('🥕', 'Carrot'),
        ('🍆', 'Eggplant'),
        ('🍅', 'Tomato'),
        ('🥔', 'Potato'),
        (None, 'Invalid'),
    ])
    # FlatMapTuple unpacks each pair into the arguments of format_plant(icon, plant).
    plants = (
        icon_name_pairs
        | 'Format' >> beam.FlatMapTuple(format_plant)
        | beam.Map(print)
    )
In [ ]:
import apache_beam as beam
def split_words(text, delimiter=None):
    """Split ``text`` on ``delimiter``, or on runs of whitespace when it is None."""
    parts = text.split(sep=delimiter)
    return parts
with beam.Pipeline() as pipeline:
    # Each input element is one comma-separated string of produce names.
    garden_rows = pipeline | 'Gardening plants' >> beam.Create([
        '🍓Strawberry,🥕Carrot,🍆Eggplant',
        '🍅Tomato,🥔Potato',
    ])
    # Extra keyword arguments to FlatMap are forwarded to split_words on every call.
    plants = (
        garden_rows
        | 'Split words' >> beam.FlatMap(split_words, delimiter=',')
        | beam.Map(print)
    )
If the PCollection has a single value, such as the average from another computation,
passing the PCollection as a singleton accesses that value.
In this example, we pass a PCollection containing the single value ',' as a singleton.
We then use that value as the delimiter for the str.split method.
In [ ]:
import apache_beam as beam
with beam.Pipeline() as pipeline:
    # A one-element PCollection that holds the delimiter to split on.
    delimiter = pipeline | 'Create delimiter' >> beam.Create([','])
    # Each main input element is one comma-separated string of produce names.
    garden_rows = pipeline | 'Gardening plants' >> beam.Create([
        '🍓Strawberry,🥕Carrot,🍆Eggplant',
        '🍅Tomato,🥔Potato',
    ])
    # AsSingleton passes the single delimiter value in as the lambda's keyword argument.
    plants = (
        garden_rows
        | 'Split words' >> beam.FlatMap(
            lambda text, delimiter: text.split(delimiter),
            delimiter=beam.pvalue.AsSingleton(delimiter),
        )
        | beam.Map(print)
    )
In [ ]:
import apache_beam as beam
def normalize_and_validate_durations(plant, valid_durations):
    """Yield a copy of ``plant`` with its duration lower-cased, if that duration is valid.

    Beam transforms must not modify their input element, so this builds a new
    dict instead of overwriting ``plant['duration']`` in place. (The previous
    version also mutated elements that were then dropped as invalid.)

    Args:
      plant: dict with a string ``'duration'`` entry; it is left unchanged.
      valid_durations: iterable of lower-case duration names to accept, e.g. a
        side input passed with ``beam.pvalue.AsIter``.

    Yields:
      At most one dict: a copy of ``plant`` whose ``'duration'`` is lower-cased,
      emitted only when the lower-cased value is in ``valid_durations``.
    """
    duration = plant['duration'].lower()
    if duration in valid_durations:
        # dict(plant, duration=...) keeps the original key order and leaves plant untouched.
        yield dict(plant, duration=duration)
with beam.Pipeline() as pipeline:
    # Side input: the lower-case duration names that count as valid.
    valid_durations = pipeline | 'Valid durations' >> beam.Create([
        'annual',
        'biennial',
        'perennial',
    ])
    # Main input: one dict per plant, with inconsistently cased durations.
    garden_records = pipeline | 'Gardening plants' >> beam.Create([
        {'icon': '🍓', 'name': 'Strawberry', 'duration': 'Perennial'},
        {'icon': '🥕', 'name': 'Carrot', 'duration': 'BIENNIAL'},
        {'icon': '🍆', 'name': 'Eggplant', 'duration': 'perennial'},
        {'icon': '🍅', 'name': 'Tomato', 'duration': 'annual'},
        {'icon': '🥔', 'name': 'Potato', 'duration': 'unknown'},
    ])
    # AsIter hands the whole valid_durations PCollection to the function as an iterable.
    valid_plants = (
        garden_records
        | 'Normalize and validate durations' >> beam.FlatMap(
            normalize_and_validate_durations,
            valid_durations=beam.pvalue.AsIter(valid_durations),
        )
        | beam.Map(print)
    )
|
|
Note: You can pass the `PCollection` as a list with `beam.pvalue.AsList(pcollection)`, but this requires that all the elements fit into memory.
If a PCollection is small enough to fit into memory, then that PCollection can be passed as a dictionary.
Each element must be a (key, value) pair.
Note that all the elements of the PCollection must fit into memory for this.
If the PCollection won't fit into memory, use beam.pvalue.AsIter(pcollection) instead.
In [ ]:
import apache_beam as beam
def replace_duration_if_valid(plant, durations):
    """Yield a copy of ``plant`` with its duration code replaced by the mapped name.

    Beam transforms must not modify their input element, so instead of
    overwriting ``plant['duration']`` in place this yields a new dict.

    Args:
      plant: dict with a ``'duration'`` entry used as a lookup key; it is left
        unchanged.
      durations: mapping from duration key to replacement value, e.g. a side
        input passed with ``beam.pvalue.AsDict``.

    Yields:
      At most one dict: a copy of ``plant`` whose ``'duration'`` is
      ``durations[plant['duration']]``, emitted only when that key is present.
    """
    code = plant['duration']
    if code in durations:
        # dict(plant, duration=...) keeps the original key order and leaves plant untouched.
        yield dict(plant, duration=durations[code])
with beam.Pipeline() as pipeline:
    # Side input: (code, name) pairs that become a dict lookup table.
    durations = pipeline | 'Durations dict' >> beam.Create([
        (0, 'annual'),
        (1, 'biennial'),
        (2, 'perennial'),
    ])
    # Main input: one dict per plant, with durations stored as integer codes.
    garden_records = pipeline | 'Gardening plants' >> beam.Create([
        {'icon': '🍓', 'name': 'Strawberry', 'duration': 2},
        {'icon': '🥕', 'name': 'Carrot', 'duration': 1},
        {'icon': '🍆', 'name': 'Eggplant', 'duration': 2},
        {'icon': '🍅', 'name': 'Tomato', 'duration': 0},
        {'icon': '🥔', 'name': 'Potato', 'duration': -1},
    ])
    # AsDict turns the (key, value) pairs into a dict passed as the durations argument.
    valid_plants = (
        garden_records
        | 'Replace duration if valid' >> beam.FlatMap(
            replace_duration_if_valid,
            durations=beam.pvalue.AsDict(durations),
        )
        | beam.Map(print)
    )
|
|