Copyright © 2019 The TensorFlow Authors.

In [0]:
#@title Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

TFX – Introduction to Apache Beam

TFX is designed to be scalable to very large datasets that require substantial resources. Distributed pipeline frameworks such as Apache Beam offer the ability to distribute processing across compute clusters and apply the resources required. Many of the standard TFX components use Apache Beam, and custom components that you write may also benefit from using Apache Beam for distributed processing.

This notebook introduces the concepts and code patterns for developing with the Apache Beam Python API.

Setup

First, we install the necessary packages and import the modules we need.

Install TensorFlow and Apache Beam

Note

Because this installation updates some packages, you must use the button at the bottom of this cell's output to restart the runtime. After the restart, rerun this cell.


In [0]:
!pip install -q -U \
  tensorflow==2.0.0 \
  apache-beam

Import packages

We import necessary packages, including Beam.


In [0]:
from datetime import datetime
import os
import pprint
import tempfile
import urllib
pp = pprint.PrettyPrinter()

import tensorflow as tf

import apache_beam as beam
from apache_beam import pvalue
from apache_beam.runners.interactive.display import pipeline_graph
import graphviz

In [0]:
print('TensorFlow version: {}'.format(tf.__version__))
print('Beam version: {}'.format(beam.__version__))

Create a Beam Pipeline

Create a pipeline, including a simple PCollection and a ParDo() transform.

  • A PCollection<T> is an immutable collection of values of type T. A PCollection can contain either a bounded or unbounded number of elements. Bounded and unbounded PCollections are produced as the output of PTransforms (including root PTransforms like Read and Create), and can be passed as the inputs of other PTransforms.
  • ParDo is the core element-wise transform in Apache Beam, invoking a user-specified function on each of the elements of the input PCollection to produce zero or more output elements, all of which are collected into the output PCollection.

First, use the .run() method.


In [0]:
first_pipeline = beam.Pipeline()

lines = (first_pipeline
         | "Create" >> beam.Create(["Hello", "World", "!!!"]) # PCollection
         | "Print" >> beam.ParDo(print)) # ParDo transform

result = first_pipeline.run()
result.state

Display the structure of this pipeline.


In [0]:
def display_pipeline(pipeline):
  graph = pipeline_graph.PipelineGraph(pipeline)
  return graphviz.Source(graph.get_dot())

display_pipeline(first_pipeline)

Next, run the pipeline using a with block. Exiting the block runs the pipeline automatically, so there is no explicit call to run().


In [0]:
with beam.Pipeline() as with_pipeline:
  lines = (with_pipeline
         | "Create" >> beam.Create(["Hello", "World", "!!!"])
         | "Print" >> beam.ParDo(print))

display_pipeline(with_pipeline)

Exercise 1 — Creating and Running Your Beam Pipeline

  1. Build a Beam pipeline that creates a PCollection containing integers 0 to 10 and prints them.
  2. Add a step in the pipeline to square each item.
  3. Display the pipeline.

Warning: the function passed to ParDo() must either return None or an iterable (such as a list).


In [0]:


In [0]:


In [0]:


In [0]:


In [0]:

Solution:


In [0]:
with beam.Pipeline() as with_pipeline:
  lines = (with_pipeline
         | "Create" >> beam.Create(range(10 + 1))
         | "Square" >> beam.ParDo(lambda x: [x ** 2])
         | "Print" >> beam.ParDo(print))

display_pipeline(with_pipeline)

Core Transforms


Beam has a set of core transforms on data that is contained in PCollections. In the cells that follow, explore several core transforms and observe the results in order to develop some understanding and intuition for what each transform does.

Map

The Map transform applies a simple 1-to-1 mapping function over each element in the collection. Map accepts a function that returns a single element for every input element in the PCollection. You can pass functions with multiple arguments to Map. They are passed as additional positional arguments or keyword arguments to the function.

First, compare the results of a ParDo transform and a Map transform.


In [0]:
with beam.Pipeline() as pipeline:
  lines = (pipeline
           | "Create" >> beam.Create([1, 2, 3])
           | "Multiply" >> beam.ParDo(lambda number: [number * 2]) # ParDo with integers
           | "Print" >> beam.ParDo(print))

In [0]:
with beam.Pipeline() as pipeline:
  lines = (pipeline
         | "Create" >> beam.Create([1, 2, 3])
         | "Multiply" >> beam.Map(lambda number: number * 2) # Map with integers
         | "Print" >> beam.ParDo(print))

In [0]:
with beam.Pipeline() as pipeline:
  lines = (pipeline
         | "Create" >> beam.Create(["Hello Beam", "This is cool"])
         | "Split" >> beam.ParDo(lambda sentence: sentence.split()) # ParDo with strings
         | "Print" >> beam.ParDo(print))

In [0]:
with beam.Pipeline() as pipeline:
  lines = (pipeline
         | "Create" >> beam.Create(["Hello Beam", "This is cool"])
         | "Split" >> beam.Map(lambda sentence: sentence.split()) # Map with strings
         | "Print" >> beam.ParDo(print))

In [0]:
class BreakIntoWordsDoFn(beam.DoFn):
    def process(self, element):
        return element.split()

with beam.Pipeline() as pipeline:
  lines = (pipeline
         | "Create" >> beam.Create(["Hello Beam", "This is cool"])
         | "Split" >> beam.ParDo(BreakIntoWordsDoFn()) # Apply a DoFn with a process method
         | "Print" >> beam.ParDo(print))

In [0]:
with beam.Pipeline() as pipeline:
  lines = (pipeline
         | "Create" >> beam.Create(["Hello Beam", "This is cool"])
         | "Split" >> beam.FlatMap(lambda sentence: sentence.split()) # Compare to a FlatMap
         | "Print" >> beam.ParDo(print))

GroupByKey

GroupByKey takes a keyed collection of elements and produces a collection where each element consists of a key and all values associated with that key.

GroupByKey is a transform for processing collections of key/value pairs. It’s a parallel reduction operation, analogous to the Shuffle phase of a Map/Shuffle/Reduce-style algorithm. The input to GroupByKey is a collection of key/value pairs that represents a multimap, where the collection contains multiple pairs that have the same key, but different values. Given such a collection, you use GroupByKey to collect all of the values associated with each unique key.

GroupByKey is a good way to aggregate data that has something in common. For example, if you have a collection that stores records of customer orders, you might want to group together all the orders from the same postal code (wherein the “key” of the key/value pair is the postal code field, and the “value” is the remainder of the record).


In [0]:
with beam.Pipeline() as pipeline:
  lines = (pipeline
           | beam.Create(['apple', 'ball', 'car', 'bear', 'cheetah', 'ant'])
           | beam.Map(lambda word: (word[0], word))
           | beam.GroupByKey()
           | beam.ParDo(print))
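
To mirror the customer-order scenario described above, here is a minimal sketch (not part of the original notebook) using made-up (postal_code, order_id) pairs as the key/value elements:


In [0]:
orders = [("10115", "order-001"), # hypothetical (postal code, order id) records
          ("10115", "order-002"),
          ("94110", "order-003")]

with beam.Pipeline() as pipeline:
  lines = (pipeline
           | beam.Create(orders)
           | beam.GroupByKey() # all orders sharing a postal code are grouped together
           | beam.ParDo(print))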

Exercise 2 — Group Items by Key

  1. Build a Beam pipeline that creates a PCollection containing integers 0 to 10 and prints them.
  2. Add a step in the pipeline to add a key to each item that will indicate whether it is even or odd.
  3. Use GroupByKey to group even items together and odd items together.

In [0]:


In [0]:


In [0]:


In [0]:


In [0]:

Solution:


In [0]:
with beam.Pipeline() as pipeline:
  lines = (pipeline
           | beam.Create(range(10 + 1))
           | beam.Map(lambda x: ("odd" if x % 2 else "even", x))
           | beam.GroupByKey()
           | beam.ParDo(print))

CoGroupByKey can combine multiple PCollections, assuming every element is a tuple whose first item is the key to join on.


In [0]:
pipeline = beam.Pipeline()

fruits = pipeline | 'Fruits' >> beam.Create(['apple',
                                             'banana',
                                             'cherry'])
countries = pipeline | 'Countries' >> beam.Create(['australia',
                                                   'brazil',
                                                   'belgium',
                                                   'canada'])

def add_key(word):
    return (word[0], word)

fruits_with_keys = fruits | "fruits_with_keys" >> beam.Map(add_key)
countries_with_keys = countries | "countries_with_keys" >> beam.Map(add_key)

({"fruits": fruits_with_keys, "countries": countries_with_keys}
    | beam.CoGroupByKey()
    | beam.Map(print))

pipeline.run()

Combine

Combine is a transform for combining collections of elements or values. Combine has variants that work on entire PCollections, and some that combine the values for each key in PCollections of key/value pairs.

To apply a Combine transform, you must provide the function that contains the logic for combining the elements or values. The combining function should be commutative and associative, as the function is not necessarily invoked exactly once on all values with a given key. Because the input data (including the value collection) may be distributed across multiple workers, the combining function might be called multiple times to perform partial combining on subsets of the value collection. The Beam SDK also provides some pre-built combine functions for common numeric combination operations such as sum, min, and max.

Simple combine operations, such as sums, can usually be implemented as a simple function. More complex combination operations might require you to create a subclass of CombineFn that has an accumulation type distinct from the input/output type.


In [0]:
with beam.Pipeline() as pipeline:
  lines = (pipeline
           | beam.Create([1, 2, 3, 4, 5])
           | beam.CombineGlobally(sum)
           | beam.Map(print))

In [0]:
with beam.Pipeline() as pipeline:
  lines = (pipeline
           | beam.Create([1, 2, 3, 4, 5])
           | beam.combiners.Mean.Globally()
           | beam.Map(print))

In [0]:
class AverageFn(beam.CombineFn):
    def create_accumulator(self):
        return (0.0, 0)

    def add_input(self, accumulator, input_):
        total, count = accumulator
        total += input_
        count += 1
        return (total, count)

    def merge_accumulators(self, accumulators):
        totals, counts = zip(*accumulators)
        return sum(totals), sum(counts)

    def extract_output(self, accumulator):
        total, count = accumulator
        return total / count if count else float("NaN")


with beam.Pipeline() as pipeline:
  lines = (pipeline
           | beam.Create([1, 2, 3, 4, 5])
           | beam.CombineGlobally(AverageFn())
           | beam.Map(print))

In [0]:
with beam.Pipeline() as pipeline:
  lines = (pipeline
           | beam.Create(['bob', 'alice', 'alice', 'bob', 'charlie', 'alice'])
           | beam.combiners.Count.PerElement()
           | beam.ParDo(print))

In [0]:
with beam.Pipeline() as pipeline:
  lines = (pipeline
           | beam.Create(['bob', 'alice', 'alice', 'bob', 'charlie', 'alice'])
           | beam.Map(lambda word: (word, 1))
           | beam.CombinePerKey(sum)
           | beam.ParDo(print))

In [0]:
with beam.Pipeline() as pipeline:
  lines = (pipeline
           | beam.Create(['bob', 'alice', 'alice', 'bob', 'charlie', 'alice'])
           | beam.combiners.Count.Globally()
           | beam.ParDo(print))

Exercise 3 — Combine Items

  1. Start with the Beam pipeline you built in the previous exercise: it creates a PCollection containing integers 0 to 10, groups them by their parity, and prints the groups.
  2. Add a step that computes the mean of each group (i.e., the mean of all odd numbers between 0 and 10, and the mean of all even numbers between 0 and 10).
  3. Add another step to make the pipeline compute the mean of the squares of the numbers in each group.

In [0]:


In [0]:


In [0]:


In [0]:


In [0]:

Solution:


In [0]:
with beam.Pipeline() as pipeline:
  lines = (pipeline
           | beam.Create(range(10 + 1))
           | beam.Map(lambda x: ("odd" if x % 2 else "even", x))
           | beam.Map(lambda x: (x[0], x[1] ** 2))
           | beam.CombinePerKey(AverageFn())
           | beam.ParDo(print))

Flatten

Flatten is a transform for PCollection objects that store the same data type. Flatten merges multiple PCollection objects into a single logical PCollection.

Data encoding in merged collections

By default, the coder for the output PCollection is the same as the coder for the first PCollection in the input PCollectionList. However, the input PCollection objects can each use different coders, as long as they all contain the same data type in your chosen language.

Merging windowed collections

When using Flatten to merge PCollection objects that have a windowing strategy applied, all of the PCollection objects you want to merge must use a compatible windowing strategy and window sizing. For example, all the collections you're merging must use identical 5-minute fixed windows, or 4-minute sliding windows starting every 30 seconds.

If your pipeline attempts to use Flatten to merge PCollection objects with incompatible windows, Beam generates an IllegalStateException error when your pipeline is constructed.


In [0]:
pipeline = beam.Pipeline()

wordsStartingWithA = (pipeline
  | 'Words starting with A' >> beam.Create(['apple', 'ant', 'arrow']))

wordsStartingWithB = (pipeline
  | 'Words starting with B' >> beam.Create(['ball', 'book', 'bow']))

((wordsStartingWithA, wordsStartingWithB)
    | beam.Flatten()
    | beam.ParDo(print))

pipeline.run()
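
The example above flattens two unwindowed collections. As a minimal sketch of the windowing requirement (not part of the original notebook), both inputs below are given timestamps and placed into the same 60-second fixed windows before being merged:


In [0]:
with beam.Pipeline() as windowed_flatten_pipeline:
  first = (windowed_flatten_pipeline
           | "Create A" >> beam.Create([1, 2, 3])
           | "Timestamp A" >> beam.Map(lambda x: beam.window.TimestampedValue(x, x))
           | "Window A" >> beam.WindowInto(beam.window.FixedWindows(60)))

  second = (windowed_flatten_pipeline
            | "Create B" >> beam.Create([100, 200, 300])
            | "Timestamp B" >> beam.Map(lambda x: beam.window.TimestampedValue(x, x))
            | "Window B" >> beam.WindowInto(beam.window.FixedWindows(60)))

  # Both inputs use identical fixed windows, so Flatten can merge them.
  ((first, second)
      | beam.Flatten()
      | beam.ParDo(print))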

Partition

Partition is a transform for PCollection objects that store the same data type. Partition splits a single PCollection into a fixed number of smaller collections.

Partition divides the elements of a PCollection according to a partitioning function that you provide. The partitioning function contains the logic that determines how to split up the elements of the input PCollection into each resulting partition PCollection. The number of partitions must be determined at graph construction time. You can, for example, pass the number of partitions as a command-line option at runtime (which will then be used to build your pipeline graph), but you cannot determine the number of partitions in mid-pipeline (based on data calculated after your pipeline graph is constructed, for instance).


In [0]:
def partition_fn(number, num_partitions):
    partition = number // 100
    return min(partition, num_partitions - 1)

with beam.Pipeline() as pipeline:
  lines = (pipeline
           | beam.Create([1, 110, 2, 350, 4, 5, 100, 150, 3])
           | beam.Partition(partition_fn, 3))

  lines[0] | '< 100' >> beam.ParDo(print, "Small")
  lines[1] | '[100, 200)' >> beam.ParDo(print, "Medium")
  lines[2] | '>= 200' >> beam.ParDo(print, "Big")

Side Inputs

In addition to the main input PCollection, you can provide additional inputs to a ParDo transform in the form of side inputs. A side input is an additional input that your DoFn can access each time it processes an element in the input PCollection. When you specify a side input, you create a view of some other data that can be read from within the ParDo transform’s DoFn while processing each element.

Side inputs are useful if your ParDo needs to inject additional data when processing each element in the input PCollection, but the additional data needs to be determined at runtime (and not hard-coded). Such values might be determined by the input data, or depend on a different branch of your pipeline.


In [0]:
def increment(number, inc=1):
    return number + inc

with beam.Pipeline() as pipeline:
  lines = (pipeline
           | "Create" >> beam.Create([1, 2, 3, 4, 5])
           | "Increment" >> beam.Map(increment)
           | "Print" >> beam.ParDo(print))

In [0]:
with beam.Pipeline() as pipeline:
  lines = (pipeline
           | "Create" >> beam.Create([1, 2, 3, 4, 5])
           | "Increment" >> beam.Map(increment, 10) # Pass a side input of 10
           | "Print" >> beam.ParDo(print))

Additional Outputs

While ParDo always produces a main output PCollection (the value returned when the transform is applied), you can also have your ParDo produce any number of additional output PCollections. If you choose to have multiple outputs, your ParDo returns all of the output PCollections (including the main output) bundled together.

To emit elements to multiple output PCollections, invoke with_outputs() on the ParDo, and specify the expected tags for the outputs. with_outputs() returns a DoOutputsTuple object. Tags specified in with_outputs are attributes on the returned DoOutputsTuple object. The tags give access to the corresponding output PCollections.


In [0]:
def compute(number):
  if number % 2 == 0:
      yield number
  else:
      yield pvalue.TaggedOutput("odd", number + 10)

with beam.Pipeline() as pipeline:
  even, odd = (pipeline
           | "Create" >> beam.Create([1, 2, 3, 4, 5, 6, 7])
           | "Increment" >> beam.ParDo(compute).with_outputs("odd",
                                                             main="even"))
  even | "Even" >> beam.ParDo(print, "even")
  odd | "Odd" >> beam.ParDo(print, "odd")

Branching

A transform does not consume or otherwise alter the input collection – remember that a PCollection is immutable by definition. This means that you can apply multiple transforms to the same input PCollection to create a branching pipeline.


In [0]:
with beam.Pipeline() as branching_pipeline:
  numbers = (branching_pipeline | beam.Create([1, 2, 3, 4, 5]))

  mult5_results = numbers | beam.Map(lambda num: num * 5)
  mult10_results = numbers | beam.Map(lambda num: num * 10)

  mult5_results | 'Log multiply 5' >> beam.ParDo(print, 'Mult 5')
  mult10_results | 'Log multiply 10' >> beam.ParDo(print, 'Mult 10')

display_pipeline(branching_pipeline)

Composite Transforms

Transforms can have a nested structure, where a complex transform performs multiple simpler transforms (such as more than one ParDo, Combine, GroupByKey, or even other composite transforms). These transforms are called composite transforms. Nesting multiple transforms inside a single composite transform can make your code more modular and easier to understand.

Your composite transform's parameters and return value must match the initial input type and final return type for the entire transform, even if the transform's intermediate data changes type multiple times.

To create a composite transform, create a subclass of the PTransform class and override the expand method to specify the actual processing logic. Then use this transform just as you would a built-in transform.


In [0]:
class ExtractAndMultiplyNumbers(beam.PTransform):
    def expand(self, pcollection):
        return (pcollection
            | beam.FlatMap(lambda line: line.split(","))
            | beam.Map(lambda num: int(num) * 10))

with beam.Pipeline() as composite_pipeline:
  lines = (composite_pipeline
           | beam.Create(['1,2,3,4,5', '6,7,8,9,10'])
           | ExtractAndMultiplyNumbers()
           | beam.ParDo(print))

display_pipeline(composite_pipeline)

Filter

Filter, given a predicate, filters out all elements that don't satisfy that predicate. Filter can also filter on an inequality with a given value, using the comparison ordering of the element. You can pass functions with multiple arguments to Filter; they are passed as additional positional arguments or keyword arguments to the function.

Those extra arguments can themselves come from other PCollections. If the PCollection has a single value, such as the average from another computation, passing the PCollection as a singleton accesses that value. If the PCollection has multiple values, pass the PCollection as an iterator; this accesses elements lazily as they are needed, so it is possible to iterate over large PCollections that won't fit into memory. A sketch after the basic examples below shows the singleton case.

Note: You can pass the PCollection as a list with beam.pvalue.AsList(pcollection), but this requires that all the elements fit into memory.

If each element of a PCollection is a (key, value) pair and the PCollection is small enough to fit into memory, it can also be passed as a dictionary. If the PCollection won't fit into memory, use beam.pvalue.AsIter(pcollection) instead.


In [0]:
class FilterOddNumbers(beam.DoFn):
    def process(self, element, *args, **kwargs):
        if element % 2 == 1:
            yield element

with beam.Pipeline() as pipeline:
  lines = (pipeline
           | beam.Create(range(1, 11))
           | beam.ParDo(FilterOddNumbers())
           | beam.ParDo(print))

In [0]:
with beam.Pipeline() as pipeline:
  lines = (pipeline
           | beam.Create(range(1, 11))
           | beam.Filter(lambda num: num % 2 == 1)
           | beam.ParDo(print))
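
As a minimal sketch of the singleton case mentioned above (not part of the original notebook), the mean is computed in one branch of the pipeline and passed to Filter as a side input in another:


In [0]:
with beam.Pipeline() as filter_side_pipeline:
  numbers = filter_side_pipeline | "Create" >> beam.Create(range(1, 11))

  mean = numbers | "Mean" >> beam.combiners.Mean.Globally() # single-element PCollection

  (numbers
      | "Above mean" >> beam.Filter(lambda num, avg: num > avg,
                                    pvalue.AsSingleton(mean)) # keep numbers above the mean
      | "Print" >> beam.ParDo(print))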

Aggregation

Beam uses windowing to divide a continuously updating unbounded PCollection into logical windows of finite size. These logical windows are determined by some characteristic associated with a data element, such as a timestamp. Aggregation transforms (such as GroupByKey and Combine) work on a per-window basis — as the data set is generated, they process each PCollection as a succession of these finite windows.

A related concept, called triggers, determines when to emit the results of aggregation as unbounded data arrives. You can use triggers to refine the windowing strategy for your PCollection. Triggers allow you to deal with late-arriving data, or to provide early results.


In [0]:
with beam.Pipeline() as pipeline:
  lines = (pipeline
           | beam.Create(range(1, 11))
           | beam.combiners.Count.Globally() # Count
           | beam.ParDo(print))

In [0]:
with beam.Pipeline() as pipeline:
  lines = (pipeline
           | beam.Create(range(1, 11))
           | beam.CombineGlobally(sum) # CombineGlobally sum
           | beam.ParDo(print))

In [0]:
with beam.Pipeline() as pipeline:
  lines = (pipeline
           | beam.Create(range(1, 11))
           | beam.combiners.Mean.Globally() # Mean
           | beam.ParDo(print))

In [0]:
with beam.Pipeline() as pipeline:
  lines = (pipeline
           | beam.Create(range(1, 11))
           | beam.combiners.Top.Smallest(1) # Top Smallest
           | beam.ParDo(print))

In [0]:
with beam.Pipeline() as pipeline:
  lines = (pipeline
           | beam.Create(range(1, 11))
           | beam.combiners.Top.Largest(1) # Top Largest
           | beam.ParDo(print))

Pipeline I/O

When you create a pipeline, you often need to read data from some external source, such as a file or a database. Likewise, you may want your pipeline to output its result data to an external storage system. Beam provides read and write transforms for a number of common data storage types. If you want your pipeline to read from or write to a data storage format that isn’t supported by the built-in transforms, you can implement your own read and write transforms.
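
For example, the built-in WriteToText transform writes a PCollection to sharded text files. This is a minimal sketch (not part of the original notebook); output_prefix is just a temporary path chosen for illustration:


In [0]:
output_prefix = os.path.join(tempfile.mkdtemp(), "numbers")

with beam.Pipeline() as write_pipeline:
  (write_pipeline
      | beam.Create(["line {}".format(i) for i in range(5)])
      | beam.io.WriteToText(output_prefix)) # writes files like numbers-00000-of-00001

!head {output_prefix}*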

Download example data

Download the sample dataset for use with the cells below.


In [0]:
DATA_PATH = 'https://raw.githubusercontent.com/ageron/open-datasets/master/' \
   'online_news_popularity_for_course/online_news_popularity_for_course.csv'
_data_root = tempfile.mkdtemp(prefix='tfx-data')
_data_filepath = os.path.join(_data_root, "data.csv")
urllib.request.urlretrieve(DATA_PATH, _data_filepath)

In [0]:
!head {_data_filepath}

In [0]:
with beam.Pipeline() as pipeline:
  lines = (pipeline
           | beam.io.ReadFromText(_data_filepath)
           | beam.Filter(lambda line: line.startswith("2013-01-07,0,World"))
           | beam.ParDo(print))

Putting Everything Together

Use several of the concepts, classes, and methods discussed above in a concrete example.

Exercise 4 — Reading, Filtering, Parsing, Grouping and Averaging

Write a Beam pipeline that reads the dataset, computes the mean label (the numbers in the last column) for each article category (the third column) and prints out the results.

Hints:

  • Use the code above to read the dataset and change the filtering logic to keep only the year 2013.
  • Add a Map step to split each row on the commas.
  • Add another Map step to add a key equal to the category, and a GroupByKey step to group the articles by their category.
  • Add a step to convert the last column (i.e., the label) to a float, and another step to compute the mean of that column for each category, using beam.combiners.Mean.PerKey.
  • Finally, add a ParDo step to print out the results.

In [0]:


In [0]:


In [0]:


In [0]:


In [0]:

Solution:


In [0]:
with beam.Pipeline() as pipeline:
  lines = (pipeline
           | beam.io.ReadFromText(_data_filepath)
           | beam.Filter(lambda line: line < "2014-01-01")
           | beam.Map(lambda line: line.split(",")) # CSV parser?
           | beam.Map(lambda cols: (cols[2], float(cols[-1])))
           | beam.combiners.Mean.PerKey()
           | beam.ParDo(print))

In [0]:
with tf.io.TFRecordWriter("test.tfrecord") as tfrecord_file:
  for index in range(10):
    tfrecord_file.write("Record {}".format(index).encode("utf-8"))

dataset = tf.data.TFRecordDataset('test.tfrecord')
for record in dataset:
  print(record.numpy())

In [0]:
with beam.Pipeline() as rw_pipeline:
  lines = (rw_pipeline
           | beam.io.ReadFromTFRecord("test.tfrecord")
           | beam.Map(lambda line: line + b' processed')
           | beam.io.WriteToTFRecord("test_processed.tfrecord")
           | beam.ParDo(print))

display_pipeline(rw_pipeline)

In [0]:
with beam.Pipeline() as utf_pipeline:
  lines = (utf_pipeline
           | "Read" >> beam.io.ReadFromTFRecord("test_processed.tfrecord-00000-of-00001")
           | "Decode" >> beam.Map(lambda line: line.decode('utf-8'))
           | "Print" >> beam.ParDo(print))

display_pipeline(utf_pipeline)

Note that there are many other built-in I/O transforms.

Windowing

As discussed above, windowing subdivides a PCollection according to the timestamps of its individual elements.

Some Beam transforms, such as GroupByKey and Combine, group multiple elements by a common key. Ordinarily, that grouping operation groups all of the elements that have the same key within the entire data set. With an unbounded data set, it is impossible to collect all of the elements, since new elements are constantly being added and may be infinitely many (e.g. streaming data). If you are working with unbounded PCollections, windowing is especially useful.

In the Beam model, any PCollection (including unbounded PCollections) can be subdivided into logical windows. Each element in a PCollection is assigned to one or more windows according to the PCollection's windowing function, and each individual window contains a finite number of elements. Grouping transforms then consider each PCollection's elements on a per-window basis. GroupByKey, for example, implicitly groups the elements of a PCollection by key and window.

Additional information on Beam Windowing is available in the Beam Programming Guide.


In [0]:
DAYS = 24 * 60 * 60

class AssignTimestamps(beam.DoFn):
  def process(self, element):
    date = datetime.strptime(element[0], "%Y-%m-%d")
    yield beam.window.TimestampedValue(element, date.timestamp())

with beam.Pipeline() as window_pipeline:
  lines = (window_pipeline
           | beam.io.ReadFromText(_data_filepath)
           | beam.Filter(lambda line: line < "2014-01-01")
           | beam.Map(lambda line: line.split(",")) # CSV parser?
           | beam.ParDo(AssignTimestamps())
           | beam.WindowInto(beam.window.FixedWindows(7*DAYS))
           | beam.Map(lambda cols: (cols[2], float(cols[-1])))
           | beam.combiners.Mean.PerKey()
           | beam.ParDo(print))

display_pipeline(window_pipeline)

In [0]:
class AssignTimestamps(beam.DoFn):
  def process(self, element):
    date = datetime.strptime(element[0], "%Y-%m-%d")
    yield beam.window.TimestampedValue(element, date.timestamp())

class PrintWithTimestamp(beam.DoFn):
  def process(self, element, timestamp=beam.DoFn.TimestampParam):
    print(timestamp.to_rfc3339()[:10], element)

with beam.Pipeline() as ts_pipeline:
  lines = (ts_pipeline
           | beam.io.ReadFromText(_data_filepath)
           | beam.Filter(lambda line: line < "2014-01-01")
           | beam.Map(lambda line: line.split(",")) # CSV parser?
           | beam.ParDo(AssignTimestamps())
           | beam.WindowInto(beam.window.FixedWindows(7 * DAYS))
           | beam.Map(lambda cols: (cols[2], float(cols[-1])))
           | beam.combiners.Mean.PerKey()
           | beam.ParDo(PrintWithTimestamp()))

display_pipeline(ts_pipeline)