In [ ]:
#@title Licensed under the Apache License, Version 2.0 (the "License")
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
To run a code cell, you can click the Run cell button at the top left of the cell,
or select it and press Shift+Enter.
Try modifying a code cell and re-running it to see what happens.
To learn more about Colab, see "Welcome to Colaboratory!".
First, let's install the `apache-beam` package.
In [ ]:
# Install or upgrade (-U) the apache-beam package; --quiet keeps pip's output short.
!pip install --quiet -U apache-beam
In the following examples, we create a pipeline with a `PCollection`
of produce with their icon, name, and duration.
Then, we apply `Map` in multiple ways to transform every element in the `PCollection`.
`Map` accepts a function that returns a single element for every input element in the `PCollection`.
In [ ]:
import apache_beam as beam

# Map with a predefined function: the built-in str.strip is applied to every
# element, trimming the surrounding whitespace and the trailing newline.
with beam.Pipeline() as pipeline:
    plants = (
        pipeline
        | 'Gardening plants' >> beam.Create(
            [
                ' 🍓Strawberry \n',
                ' 🥕Carrot \n',
                ' 🍆Eggplant \n',
                ' 🍅Tomato \n',
                ' 🥔Potato \n',
            ]
        )
        | 'Strip' >> beam.Map(str.strip)
        | beam.Map(print)
    )
In [ ]:
import apache_beam as beam


def strip_header_and_newline(text):
    """Return *text* with '#', space and newline characters trimmed from both ends.

    str.strip treats its argument as a set of characters, so any run of those
    characters at either end is removed, not just a literal '# ' prefix.
    """
    return text.strip('# \n')


# Map with a user-defined function applied to every element.
with beam.Pipeline() as pipeline:
    plants = (
        pipeline
        | 'Gardening plants' >> beam.Create(
            [
                '# 🍓Strawberry\n',
                '# 🥕Carrot\n',
                '# 🍆Eggplant\n',
                '# 🍅Tomato\n',
                '# 🥔Potato\n',
            ]
        )
        | 'Strip header' >> beam.Map(strip_header_and_newline)
        | beam.Map(print)
    )
In [ ]:
import apache_beam as beam

# Map with an inline lambda instead of a named function.
with beam.Pipeline() as pipeline:
    plants = (
        pipeline
        | 'Gardening plants' >> beam.Create(
            [
                '# 🍓Strawberry\n',
                '# 🥕Carrot\n',
                '# 🍆Eggplant\n',
                '# 🍅Tomato\n',
                '# 🥔Potato\n',
            ]
        )
        | 'Strip header' >> beam.Map(lambda line: line.strip('# \n'))
        | beam.Map(print)
    )
In [ ]:
import apache_beam as beam


def strip(text, chars=None):
    """Return *text* with the characters in *chars* trimmed from both ends.

    With chars=None this behaves like str.strip() and trims whitespace.
    """
    return text.strip(chars)


# Map with extra arguments: keyword arguments given to beam.Map after the
# callable are forwarded to it on every call (here chars='# \n').
with beam.Pipeline() as pipeline:
    plants = (
        pipeline
        | 'Gardening plants' >> beam.Create(
            [
                '# 🍓Strawberry\n',
                '# 🥕Carrot\n',
                '# 🍆Eggplant\n',
                '# 🍅Tomato\n',
                '# 🥔Potato\n',
            ]
        )
        | 'Strip header' >> beam.Map(strip, chars='# \n')
        | beam.Map(print)
    )
In [ ]:
import apache_beam as beam

# MapTuple unpacks each (icon, name) tuple into separate positional arguments
# of the callable, so the lambda takes two parameters instead of one tuple.
with beam.Pipeline() as pipeline:
    plants = (
        pipeline
        | 'Gardening plants' >> beam.Create(
            [
                ('🍓', 'Strawberry'),
                ('🥕', 'Carrot'),
                ('🍆', 'Eggplant'),
                ('🍅', 'Tomato'),
                ('🥔', 'Potato'),
            ]
        )
        | 'Format' >> beam.MapTuple(lambda emoji, name: f'{emoji}{name}')
        | beam.Map(print)
    )
If the `PCollection` has a single value, such as the average from another computation,
passing the `PCollection` as a singleton accesses that value.
In this example, we pass a `PCollection` containing the value `'# \n'` as a singleton.
We then use that value as the characters for the `str.strip` method.
In [ ]:
import apache_beam as beam

# Side input as a singleton: the one-element 'Create chars' PCollection is
# passed to the lambda as a plain value through the `chars` keyword.
with beam.Pipeline() as pipeline:
    chars = pipeline | 'Create chars' >> beam.Create(['# \n'])

    plants = (
        pipeline
        | 'Gardening plants' >> beam.Create(
            [
                '# 🍓Strawberry\n',
                '# 🥕Carrot\n',
                '# 🍆Eggplant\n',
                '# 🍅Tomato\n',
                '# 🥔Potato\n',
            ]
        )
        | 'Strip header' >> beam.Map(
            lambda line, chars: line.strip(chars),
            chars=beam.pvalue.AsSingleton(chars),
        )
        | beam.Map(print)
    )
In [ ]:
import apache_beam as beam

# Side input as an iterator: the multi-element 'Create chars' PCollection is
# exposed to the lambda as an iterable, joined here into one strip set.
with beam.Pipeline() as pipeline:
    chars = pipeline | 'Create chars' >> beam.Create(['#', ' ', '\n'])

    plants = (
        pipeline
        | 'Gardening plants' >> beam.Create(
            [
                '# 🍓Strawberry\n',
                '# 🥕Carrot\n',
                '# 🍆Eggplant\n',
                '# 🍅Tomato\n',
                '# 🥔Potato\n',
            ]
        )
        | 'Strip header' >> beam.Map(
            lambda line, chars: line.strip(''.join(chars)),
            chars=beam.pvalue.AsIter(chars),
        )
        | beam.Map(print)
    )
|
Note: You can pass the `PCollection` as a list with `beam.pvalue.AsList(pcollection)`,
but this requires that all the elements fit into memory.
If a `PCollection` is small enough to fit into memory, then that `PCollection`
can be passed as a dictionary with `beam.pvalue.AsDict(pcollection)`.
Each element must be a `(key, value)` pair.
Note that all the elements of the `PCollection` must fit into memory for this.
If the `PCollection` won't fit into memory, use `beam.pvalue.AsIter(pcollection)` instead.
In [ ]:
import apache_beam as beam


def replace_duration(plant, durations):
    """Return a copy of *plant* whose 'duration' code is replaced by its name.

    Args:
      plant: dict with a 'duration' entry holding a key of *durations*.
      durations: mapping from duration code to duration name; in this
        pipeline it is the 'Durations' PCollection passed via
        beam.pvalue.AsDict.

    Returns:
      A new dict with the same keys, in the same order, as *plant*. The input
      element itself is not modified: Beam user code must not mutate elements
      it receives from its input PCollection.

    Raises:
      KeyError: if plant['duration'] is not a key of *durations*.
    """
    # Overriding an existing key in a dict display keeps its original position,
    # so the printed dict looks exactly as before.
    return {**plant, 'duration': durations[plant['duration']]}


# Side input as a dictionary: each (key, value) element of 'Durations' becomes
# one entry of the mapping handed to replace_duration.
with beam.Pipeline() as pipeline:
    durations = pipeline | 'Durations' >> beam.Create([
        (0, 'annual'),
        (1, 'biennial'),
        (2, 'perennial'),
    ])

    plant_details = (
        pipeline
        | 'Gardening plants' >> beam.Create([
            {'icon': '🍓', 'name': 'Strawberry', 'duration': 2},
            {'icon': '🥕', 'name': 'Carrot', 'duration': 1},
            {'icon': '🍆', 'name': 'Eggplant', 'duration': 2},
            {'icon': '🍅', 'name': 'Tomato', 'duration': 0},
            {'icon': '🥔', 'name': 'Potato', 'duration': 2},
        ])
        | 'Replace duration' >> beam.Map(
            replace_duration,
            durations=beam.pvalue.AsDict(durations),
        )
        | beam.Map(print)
    )
`FlatMap` behaves the same as `Map`, but for
each input it may produce zero or more outputs.
|