In [ ]:
#@title Licensed under the Apache License, Version 2.0 (the "License")
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

Map


Applies a simple 1-to-1 mapping function over each element in the collection.

Setup

To run a code cell, you can click the Run cell button at the top left of the cell, or select it and press Shift+Enter. Try modifying a code cell and re-running it to see what happens.

To learn more about Colab, see "Welcome to Colaboratory!"

First, let's install the apache-beam module.


In [ ]:
!pip install --quiet -U apache-beam

Examples

In the following examples, we create a pipeline with a PCollection of produce described by icon, name, and, in the last example, duration. Then, we apply Map in multiple ways to transform every element in the PCollection.

Map accepts a function that returns a single element for every input element in the PCollection.

Example 1: Map with a predefined function

We use the built-in method str.strip, which takes a single str element and returns a str. It removes leading and trailing whitespace from the input element, including newlines and tabs.


In [ ]:
import apache_beam as beam

with beam.Pipeline() as pipeline:
  plants = (
      pipeline
      | 'Gardening plants' >> beam.Create([
          '   🍓Strawberry   \n',
          '   🥕Carrot   \n',
          '   🍆Eggplant   \n',
          '   🍅Tomato   \n',
          '   🥔Potato   \n',
      ])
      | 'Strip' >> beam.Map(str.strip)
      | beam.Map(print)
  )
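
As a quick check outside the pipeline, the plain-Python sketch below applies str.strip directly to two strings. The second string ('Baby carrot') is a made-up element, added only to show that whitespace inside the text is kept.

```python
# str.strip with no arguments removes leading and trailing whitespace
# (spaces, tabs, newlines) and leaves interior whitespace untouched.
raw = [
    '   🍓Strawberry   \n',
    '\t🥕Baby carrot \n',  # hypothetical element, not in the pipeline above
]
stripped = [str.strip(text) for text in raw]
print(stripped)
```

This is the same function call that Map makes once per element.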

Example 2: Map with a function

We define a function strip_header_and_newline which strips any '#', ' ', and '\n' characters from the beginning and end of each element.


In [ ]:
import apache_beam as beam

def strip_header_and_newline(text):
  return text.strip('# \n')

with beam.Pipeline() as pipeline:
  plants = (
      pipeline
      | 'Gardening plants' >> beam.Create([
          '# 🍓Strawberry\n',
          '# 🥕Carrot\n',
          '# 🍆Eggplant\n',
          '# 🍅Tomato\n',
          '# 🥔Potato\n',
      ])
      | 'Strip header' >> beam.Map(strip_header_and_newline)
      | beam.Map(print)
  )
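
The argument to str.strip is treated as a set of characters rather than a literal prefix or suffix. The plain-Python sketch below shows this with two inputs that are our own illustrations, not elements of the pipeline above.

```python
def strip_header_and_newline(text):
  return text.strip('# \n')

# Any run of '#', ' ' or '\n' at either end is removed, in any order.
repeated = strip_header_and_newline('## 🥕Carrot \n\n')

# Characters in the middle of the string are never touched.
interior = strip_header_and_newline('# Plant #1\n')

print(repeated)
print(interior)
```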

Example 3: Map with a lambda function

We can also use lambda functions to simplify Example 2.


In [ ]:
import apache_beam as beam

with beam.Pipeline() as pipeline:
  plants = (
      pipeline
      | 'Gardening plants' >> beam.Create([
          '# 🍓Strawberry\n',
          '# 🥕Carrot\n',
          '# 🍆Eggplant\n',
          '# 🍅Tomato\n',
          '# 🥔Potato\n',
      ])
      | 'Strip header' >> beam.Map(lambda text: text.strip('# \n'))
      | beam.Map(print)
  )

Example 4: Map with multiple arguments

You can pass functions with multiple arguments to Map. Supply the extra arguments to Map as additional positional or keyword arguments after the function; Map forwards them to the function along with each element.

In this example, strip takes text and chars as arguments.


In [ ]:
import apache_beam as beam

def strip(text, chars=None):
  return text.strip(chars)

with beam.Pipeline() as pipeline:
  plants = (
      pipeline
      | 'Gardening plants' >> beam.Create([
          '# 🍓Strawberry\n',
          '# 🥕Carrot\n',
          '# 🍆Eggplant\n',
          '# 🍅Tomato\n',
          '# 🥔Potato\n',
      ])
      | 'Strip header' >> beam.Map(strip, chars='# \n')
      | beam.Map(print)
  )

Example 5: MapTuple for key-value pairs

If your PCollection consists of (key, value) pairs, you can use MapTuple to unpack them into different function arguments.


In [ ]:
import apache_beam as beam

with beam.Pipeline() as pipeline:
  plants = (
      pipeline
      | 'Gardening plants' >> beam.Create([
          ('🍓', 'Strawberry'),
          ('🥕', 'Carrot'),
          ('🍆', 'Eggplant'),
          ('🍅', 'Tomato'),
          ('🥔', 'Potato'),
      ])
      | 'Format' >> beam.MapTuple(
          lambda icon, plant: '{}{}'.format(icon, plant))
      | beam.Map(print)
  )
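
As a loose analogy in plain Python (not a description of how Beam implements it), MapTuple relates to Map the way itertools.starmap relates to the built-in map: each tuple is unpacked into separate arguments. The helper name format_plant is ours.

```python
from itertools import starmap

pairs = [('🍓', 'Strawberry'), ('🥕', 'Carrot')]

def format_plant(icon, plant):
  return '{}{}'.format(icon, plant)

# starmap calls format_plant(*pair), unpacking each tuple into arguments.
unpacked = list(starmap(format_plant, pairs))

# Without unpacking, the caller has to index into the tuple itself.
indexed = [format_plant(pair[0], pair[1]) for pair in pairs]

print(unpacked)
```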

Example 6: Map with side inputs as singletons

If a PCollection has a single value, such as the average from another computation, you can pass it as a singleton side input so that the function receives that value directly.

In this example, we pass a PCollection containing the value '# \n' as a singleton. We then use that value as the characters for the str.strip method.


In [ ]:
import apache_beam as beam

with beam.Pipeline() as pipeline:
  chars = pipeline | 'Create chars' >> beam.Create(['# \n'])

  plants = (
      pipeline
      | 'Gardening plants' >> beam.Create([
          '# 🍓Strawberry\n',
          '# 🥕Carrot\n',
          '# 🍆Eggplant\n',
          '# 🍅Tomato\n',
          '# 🥔Potato\n',
      ])
      | 'Strip header' >> beam.Map(
          lambda text, chars: text.strip(chars),
          chars=beam.pvalue.AsSingleton(chars),
      )
      | beam.Map(print)
  )

Example 7: Map with side inputs as iterators

If the PCollection has multiple values, pass the PCollection as an iterator with beam.pvalue.AsIter(pcollection). Elements are accessed lazily as they are needed, so it is possible to iterate over large PCollections that won't fit into memory.


In [ ]:
import apache_beam as beam

with beam.Pipeline() as pipeline:
  chars = pipeline | 'Create chars' >> beam.Create(['#', ' ', '\n'])

  plants = (
      pipeline
      | 'Gardening plants' >> beam.Create([
          '# 🍓Strawberry\n',
          '# 🥕Carrot\n',
          '# 🍆Eggplant\n',
          '# 🍅Tomato\n',
          '# 🥔Potato\n',
      ])
      | 'Strip header' >> beam.Map(
          lambda text, chars: text.strip(''.join(chars)),
          chars=beam.pvalue.AsIter(chars),
      )
      | beam.Map(print)
  )

Note: You can pass the PCollection as a list with beam.pvalue.AsList(pcollection), but this requires that all the elements fit into memory.

Example 8: Map with side inputs as dictionaries

If a PCollection is small enough to fit into memory, it can be passed as a dictionary with beam.pvalue.AsDict(pcollection). Each element must be a (key, value) pair. If the PCollection won't fit into memory, use beam.pvalue.AsIter(pcollection) instead.


In [ ]:
import apache_beam as beam

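def replace_duration(plant, durations):
  # Return a new dict instead of mutating the input element in place;
  # Beam expects functions to leave their input elements unmodified.
  return {**plant, 'duration': durations[plant['duration']]}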

with beam.Pipeline() as pipeline:
  durations = pipeline | 'Durations' >> beam.Create([
      (0, 'annual'),
      (1, 'biennial'),
      (2, 'perennial'),
  ])

  plant_details = (
      pipeline
      | 'Gardening plants' >> beam.Create([
          {'icon': '🍓', 'name': 'Strawberry', 'duration': 2},
          {'icon': '🥕', 'name': 'Carrot', 'duration': 1},
          {'icon': '🍆', 'name': 'Eggplant', 'duration': 2},
          {'icon': '🍅', 'name': 'Tomato', 'duration': 0},
          {'icon': '🥔', 'name': 'Potato', 'duration': 2},
      ])
      | 'Replace duration' >> beam.Map(
          replace_duration,
          durations=beam.pvalue.AsDict(durations),
      )
      | beam.Map(print)
  )
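
A side input passed with AsDict reaches the function as a dictionary, so ordinary dict methods apply. The plain-Python sketch below shows one way to tolerate a missing key. The helper name replace_duration_safe, the default value, and the '🌱' element are hypothetical. It also builds a new dict rather than modifying its input.

```python
def replace_duration_safe(plant, durations, default='unknown'):
  # dict.get avoids a KeyError when a duration code has no entry.
  name = durations.get(plant['duration'], default)
  # Build a new dict so the input element is left untouched.
  return {**plant, 'duration': name}

durations = {0: 'annual', 1: 'biennial', 2: 'perennial'}
tomato = {'icon': '🍅', 'name': 'Tomato', 'duration': 0}
seedling = {'icon': '🌱', 'name': 'Seedling', 'duration': 9}  # made-up code

print(replace_duration_safe(tomato, durations))
print(replace_duration_safe(seedling, durations))
```

In a pipeline, the same function could be passed to Map with durations=beam.pvalue.AsDict(durations), as in the example above.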

Related transforms

  • FlatMap behaves the same as Map, but for each input it may produce zero or more outputs.
  • Filter is useful if the function is just deciding whether to output an element or not.
  • ParDo is the most general elementwise mapping operation, and includes other abilities such as multiple output collections and side inputs.