In [0]:
#@title Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
TFX is designed to scale to very large datasets that require substantial processing resources. Distributed pipeline frameworks such as Apache Beam can spread that processing across compute clusters and apply the resources required. Many of the standard TFX components use Apache Beam, and custom components that you write may also benefit from using Apache Beam for distributed processing.
This notebook introduces the concepts and code patterns for developing with the Apache Beam Python API.
First, we install the necessary packages, download data, import modules and set up paths.
Note
Because this cell updates some packages, you must use the button at the bottom of the cell's output to restart the runtime. After the runtime restarts, rerun this cell.
In [0]:
!pip install -q -U \
tensorflow==2.0.0 \
apache-beam
In [0]:
from datetime import datetime
import os
import pprint
import tempfile
import urllib
pp = pprint.PrettyPrinter()
import tensorflow as tf
import apache_beam as beam
from apache_beam import pvalue
from apache_beam.runners.interactive.display import pipeline_graph
import graphviz
In [0]:
print('TensorFlow version: {}'.format(tf.__version__))
print('Beam version: {}'.format(beam.__version__))
Create a pipeline, including a simple PCollection and a ParDo() transform.
PCollection<T> is an immutable collection of values of type T. A PCollection can contain either a bounded or unbounded number of elements. Bounded and unbounded PCollections are produced as the output of PTransforms (including root PTransforms like Read and Create), and can be passed as the inputs of other PTransforms.
ParDo is the core element-wise transform in Apache Beam, invoking a user-specified function on each of the elements of the input PCollection to produce zero or more output elements, all of which are collected into the output PCollection.
First, use the .run() method.
In [0]:
first_pipeline = beam.Pipeline()
lines = (first_pipeline
| "Create" >> beam.Create(["Hello", "World", "!!!"]) # PCollection
| "Print" >> beam.ParDo(print)) # ParDo transform
result = first_pipeline.run()
result.state
Display the structure of this pipeline.
In [0]:
def display_pipeline(pipeline):
graph = pipeline_graph.PipelineGraph(pipeline)
return graphviz.Source(graph.get_dot())
display_pipeline(first_pipeline)
Next, run the pipeline using a with block, which calls run() and waits for completion automatically when the block exits.
In [0]:
with beam.Pipeline() as with_pipeline:
lines = (with_pipeline
| "Create" >> beam.Create(["Hello", "World", "!!!"])
| "Print" >> beam.ParDo(print))
display_pipeline(with_pipeline)
Exercise 1 — Creating and Running Your Beam Pipeline
Create and run a pipeline that builds a PCollection of the integers 0 through 10, squares each one, and prints the results.
Warning: the function you pass to ParDo() must either return None or a list.
In [0]:
In [0]:
In [0]:
In [0]:
In [0]:
Solution:
In [0]:
with beam.Pipeline() as with_pipeline:
lines = (with_pipeline
| "Create" >> beam.Create(range(10 + 1))
| "Square" >> beam.ParDo(lambda x: [x ** 2])
| "Print" >> beam.ParDo(print))
display_pipeline(with_pipeline)
Beam provides a set of core transforms that operate on the data contained in PCollections. In the cells that follow, explore several of these core transforms and observe the results in order to develop some understanding and intuition for what each transform does.
The Map transform applies a simple 1-to-1 mapping function over each element in the collection. Map accepts a function that returns a single element for every input element in the PCollection. You can pass functions with multiple arguments to Map. They are passed as additional positional arguments or keyword arguments to the function.
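For example, here is a minimal sketch (the scale function and its factor argument are made up for illustration) of passing a keyword argument through Map:
In [0]:
def scale(number, factor=1):
    return number * factor

with beam.Pipeline() as pipeline:
    lines = (pipeline
             | "Create" >> beam.Create([1, 2, 3])
             | "Scale" >> beam.Map(scale, factor=10)  # keyword argument is forwarded to scale()
             | "Print" >> beam.ParDo(print))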
First, compare the results of a ParDo transform and a Map transform.
In [0]:
with beam.Pipeline() as pipeline:
lines = (pipeline
| "Create" >> beam.Create([1, 2, 3])
| "Multiply" >> beam.ParDo(lambda number: [number * 2]) # ParDo with integers
| "Print" >> beam.ParDo(print))
In [0]:
with beam.Pipeline() as pipeline:
lines = (pipeline
| "Create" >> beam.Create([1, 2, 3])
| "Multiply" >> beam.Map(lambda number: number * 2) # Map with integers
| "Print" >> beam.ParDo(print))
In [0]:
with beam.Pipeline() as pipeline:
lines = (pipeline
| "Create" >> beam.Create(["Hello Beam", "This is cool"])
| "Split" >> beam.ParDo(lambda sentence: sentence.split()) # ParDo with strings
| "Print" >> beam.ParDo(print))
In [0]:
with beam.Pipeline() as pipeline:
lines = (pipeline
| "Create" >> beam.Create(["Hello Beam", "This is cool"])
| "Split" >> beam.Map(lambda sentence: sentence.split()) # Map with strings
| "Print" >> beam.ParDo(print))
In [0]:
class BreakIntoWordsDoFn(beam.DoFn):
def process(self, element):
return element.split()
with beam.Pipeline() as pipeline:
lines = (pipeline
| "Create" >> beam.Create(["Hello Beam", "This is cool"])
| "Split" >> beam.ParDo(BreakIntoWordsDoFn()) # Apply a DoFn with a process method
| "Print" >> beam.ParDo(print))
In [0]:
with beam.Pipeline() as pipeline:
lines = (pipeline
| "Create" >> beam.Create(["Hello Beam", "This is cool"])
| "Split" >> beam.FlatMap(lambda sentence: sentence.split()) # Compare to a FlatMap
| "Print" >> beam.ParDo(print))
GroupByKey takes a keyed collection of elements and produces a collection where each element consists of a key and all values associated with that key.
GroupByKey is a transform for processing collections of key/value pairs. It’s a parallel reduction operation, analogous to the Shuffle phase of a Map/Shuffle/Reduce-style algorithm. The input to GroupByKey is a collection of key/value pairs that represents a multimap, where the collection contains multiple pairs that have the same key, but different values. Given such a collection, you use GroupByKey to collect all of the values associated with each unique key.
GroupByKey is a good way to aggregate data that has something in common. For example, if you have a collection that stores records of customer orders, you might want to group together all the orders from the same postal code (wherein the “key” of the key/value pair is the postal code field, and the “value” is the remainder of the record).
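As a quick sketch of that postal-code idea (the codes and order IDs below are invented for illustration):
In [0]:
with beam.Pipeline() as pipeline:
    orders = (pipeline
              | beam.Create([("10115", "order-001"),   # (postal code, rest of the record)
                             ("10115", "order-002"),
                             ("94103", "order-003")])
              | beam.GroupByKey()                      # one output element per postal code
              | beam.ParDo(print))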
In [0]:
with beam.Pipeline() as pipeline:
lines = (pipeline
| beam.Create(['apple', 'ball', 'car', 'bear', 'cheetah', 'ant'])
| beam.Map(lambda word: (word[0], word))
| beam.GroupByKey()
| beam.ParDo(print))
Exercise 2 — Group Items by Key
For the numbers 0 to 10, use GroupByKey to group the even items together and the odd items together.
In [0]:
In [0]:
In [0]:
In [0]:
In [0]:
Solution:
In [0]:
with beam.Pipeline() as pipeline:
lines = (pipeline
| beam.Create(range(10 + 1))
| beam.Map(lambda x: ("odd" if x % 2 else "even", x))
| beam.GroupByKey()
| beam.ParDo(print))
CoGroupByKey can combine multiple PCollections, assuming every element is a tuple whose first item is the key to join on.
In [0]:
pipeline = beam.Pipeline()
fruits = pipeline | 'Fruits' >> beam.Create(['apple',
'banana',
'cherry'])
countries = pipeline | 'Countries' >> beam.Create(['australia',
'brazil',
'belgium',
'canada'])
def add_key(word):
return (word[0], word)
fruits_with_keys = fruits | "fruits_with_keys" >> beam.Map(add_key)
countries_with_keys = countries | "countries_with_keys" >> beam.Map(add_key)
({"fruits": fruits_with_keys, "countries": countries_with_keys}
| beam.CoGroupByKey()
| beam.Map(print))
pipeline.run()
Combine is a transform for combining collections of elements or values. Combine has variants that work on entire PCollections, and some that combine the values for each key in PCollections of key/value pairs.
To apply a Combine transform, you must provide the function that contains the logic for combining the elements or values. The combining function should be commutative and associative, as the function is not necessarily invoked exactly once on all values with a given key. Because the input data (including the value collection) may be distributed across multiple workers, the combining function might be called multiple times to perform partial combining on subsets of the value collection. The Beam SDK also provides some pre-built combine functions for common numeric combination operations such as sum, min, and max.
Simple combine operations, such as sums, can usually be implemented as a simple function. More complex combination operations might require you to create a subclass of CombineFn that has an accumulation type distinct from the input/output type.
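As a small illustration (not one of the original cells), the pre-built max works the same way as the sum example below:
In [0]:
with beam.Pipeline() as pipeline:
    lines = (pipeline
             | beam.Create([1, 2, 3, 4, 5])
             | beam.CombineGlobally(max)  # min and sum can be dropped in the same way
             | beam.Map(print))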
In [0]:
with beam.Pipeline() as pipeline:
lines = (pipeline
| beam.Create([1, 2, 3, 4, 5])
| beam.CombineGlobally(sum)
| beam.Map(print))
In [0]:
with beam.Pipeline() as pipeline:
lines = (pipeline
| beam.Create([1, 2, 3, 4, 5])
| beam.combiners.Mean.Globally()
| beam.Map(print))
In [0]:
class AverageFn(beam.CombineFn):
def create_accumulator(self):
return (0.0, 0)
def add_input(self, accumulator, input_):
total, count = accumulator
total += input_
count += 1
return (total, count)
def merge_accumulators(self, accumulators):
totals, counts = zip(*accumulators)
return sum(totals), sum(counts)
def extract_output(self, accumulator):
total, count = accumulator
return total / count if count else float("NaN")
with beam.Pipeline() as pipeline:
lines = (pipeline
| beam.Create([1, 2, 3, 4, 5])
| beam.CombineGlobally(AverageFn())
| beam.Map(print))
In [0]:
with beam.Pipeline() as pipeline:
lines = (pipeline
| beam.Create(['bob', 'alice', 'alice', 'bob', 'charlie', 'alice'])
| beam.combiners.Count.PerElement()
| beam.ParDo(print))
In [0]:
with beam.Pipeline() as pipeline:
lines = (pipeline
| beam.Create(['bob', 'alice', 'alice', 'bob', 'charlie', 'alice'])
| beam.Map(lambda word: (word, 1))
| beam.CombinePerKey(sum)
| beam.ParDo(print))
In [0]:
with beam.Pipeline() as pipeline:
lines = (pipeline
| beam.Create(['bob', 'alice', 'alice', 'bob', 'charlie', 'alice'])
| beam.combiners.Count.Globally()
| beam.ParDo(print))
Exercise 3 — Combine Items
For the numbers 0 to 10, square each number, then compute the mean of the squares of the even numbers and the mean of the squares of the odd numbers.
In [0]:
In [0]:
In [0]:
In [0]:
In [0]:
Solution:
In [0]:
with beam.Pipeline() as pipeline:
lines = (pipeline
| beam.Create(range(10 + 1))
| beam.Map(lambda x: ("odd" if x % 2 else "even", x))
| beam.Map(lambda x: (x[0], x[1] ** 2))
| beam.CombinePerKey(AverageFn())
| beam.ParDo(print))
Flatten is a transform for PCollection objects that store the same data type. Flatten merges multiple PCollection objects into a single logical PCollection.
By default, the coder for the output PCollection is the same as the coder for the first PCollection in the input PCollectionList. However, the input PCollection objects can each use different coders, as long as they all contain the same data type in your chosen language.
When using Flatten to merge PCollection objects that have a windowing strategy applied, all of the PCollection objects you want to merge must use a compatible windowing strategy and window sizing. For example, all the collections you're merging must use (hypothetically) identical 5-minute fixed windows, or 4-minute sliding windows starting every 30 seconds.
If your pipeline attempts to use Flatten to merge PCollection objects with incompatible windows, Beam generates an IllegalStateException error when your pipeline is constructed.
In [0]:
pipeline = beam.Pipeline()
wordsStartingWithA = (pipeline
| 'Words starting with A' >> beam.Create(['apple', 'ant', 'arrow']))
wordsStartingWithB = (pipeline
| 'Words starting with B' >> beam.Create(['ball', 'book', 'bow']))
((wordsStartingWithA, wordsStartingWithB)
| beam.Flatten()
| beam.ParDo(print))
pipeline.run()
Partition is a transform for PCollection objects that store the same data type. Partition splits a single PCollection into a fixed number of smaller collections.
Partition divides the elements of a PCollection according to a partitioning function that you provide. The partitioning function contains the logic that determines how to split up the elements of the input PCollection into each resulting partition PCollection. The number of partitions must be determined at graph construction time. You can, for example, pass the number of partitions as a command-line option at runtime (which will then be used to build your pipeline graph), but you cannot determine the number of partitions in mid-pipeline (based on data calculated after your pipeline graph is constructed, for instance).
In [0]:
def partition_fn(number, num_partitions):
partition = number // 100
return min(partition, num_partitions - 1)
with beam.Pipeline() as pipeline:
lines = (pipeline
| beam.Create([1, 110, 2, 350, 4, 5, 100, 150, 3])
| beam.Partition(partition_fn, 3))
lines[0] | '< 100' >> beam.ParDo(print, "Small")
lines[1] | '[100, 200)' >> beam.ParDo(print, "Medium")
lines[2] | '> 200' >> beam.ParDo(print, "Big")
In addition to the main input PCollection, you can provide additional inputs to a ParDo transform in the form of side inputs. A side input is an additional input that your DoFn can access each time it processes an element in the input PCollection. When you specify a side input, you create a view of some other data that can be read from within the ParDo transform’s DoFn while processing each element.
Side inputs are useful if your ParDo needs to inject additional data when processing each element in the input PCollection, but the additional data needs to be determined at runtime (and not hard-coded). Such values might be determined by the input data, or depend on a different branch of your pipeline.
In [0]:
def increment(number, inc=1):
return number + inc
with beam.Pipeline() as pipeline:
lines = (pipeline
| "Create" >> beam.Create([1, 2, 3, 4, 5])
| "Increment" >> beam.Map(increment)
| "Print" >> beam.ParDo(print))
In [0]:
with beam.Pipeline() as pipeline:
lines = (pipeline
| "Create" >> beam.Create([1, 2, 3, 4, 5])
| "Increment" >> beam.Map(increment, 10) # Pass a side input of 10
| "Print" >> beam.ParDo(print))
While ParDo always produces a main output PCollection (as the return value from applying the transform), you can also have your ParDo produce any number of additional output PCollections. If you choose to have multiple outputs, your ParDo returns all of the output PCollections (including the main output) bundled together.
To emit elements to multiple output PCollections, invoke with_outputs() on the ParDo, and specify the expected tags for the outputs. with_outputs() returns a DoOutputsTuple object. Tags specified in with_outputs are attributes on the returned DoOutputsTuple object. The tags give access to the corresponding output PCollections.
In [0]:
def compute(number):
if number % 2 == 0:
yield number
else:
yield pvalue.TaggedOutput("odd", number + 10)
with beam.Pipeline() as pipeline:
even, odd = (pipeline
| "Create" >> beam.Create([1, 2, 3, 4, 5, 6, 7])
| "Increment" >> beam.ParDo(compute).with_outputs("odd",
main="even"))
even | "Even" >> beam.ParDo(print, "even")
odd | "Odd" >> beam.ParDo(print, "odd")
A transform does not consume or otherwise alter the input collection; remember that a PCollection is immutable by definition. This means that you can apply multiple transforms to the same input PCollection to create a branching pipeline.
In [0]:
with beam.Pipeline() as branching_pipeline:
numbers = (branching_pipeline | beam.Create([1, 2, 3, 4, 5]))
mult5_results = numbers | beam.Map(lambda num: num * 5)
mult10_results = numbers | beam.Map(lambda num: num * 10)
mult5_results | 'Log multiply 5' >> beam.ParDo(print, 'Mult 5')
mult10_results | 'Log multiply 10' >> beam.ParDo(print, 'Mult 10')
display_pipeline(branching_pipeline)
Transforms can have a nested structure, where a complex transform performs multiple simpler transforms (such as more than one ParDo, Combine, GroupByKey, or even other composite transforms). These transforms are called composite transforms. Nesting multiple transforms inside a single composite transform can make your code more modular and easier to understand.
Your composite transform's parameters and return value must match the initial input type and final return type for the entire transform, even if the transform's intermediate data changes type multiple times.
To create a composite transform, create a subclass of the PTransform class and override the expand method to specify the actual processing logic. Then use this transform just as you would a built-in transform.
In [0]:
class ExtractAndMultiplyNumbers(beam.PTransform):
def expand(self, pcollection):
return (pcollection
| beam.FlatMap(lambda line: line.split(","))
| beam.Map(lambda num: int(num) * 10))
with beam.Pipeline() as composite_pipeline:
lines = (composite_pipeline
| beam.Create(['1,2,3,4,5', '6,7,8,9,10'])
| ExtractAndMultiplyNumbers()
| beam.ParDo(print))
display_pipeline(composite_pipeline)
Filter, given a predicate, filters out all elements that don't satisfy that predicate. Filter may also be used to filter based on an inequality with a given value, based on the comparison ordering of the element. You can pass functions with multiple arguments to Filter. They are passed as additional positional arguments or keyword arguments to the function. If the PCollection has a single value, such as the average from another computation, passing the PCollection as a singleton accesses that value. If the PCollection has multiple values, pass the PCollection as an iterator. This accesses elements lazily as they are needed, so it is possible to iterate over large PCollections that won't fit into memory.
Note: You can pass the PCollection as a list with beam.pvalue.AsList(pcollection), but this requires that all the elements fit into memory.
If a PCollection is small enough to fit into memory, then that PCollection can be passed as a dictionary. Each element must be a (key, value) pair. Note that all the elements of the PCollection must fit into memory. If the PCollection won't fit into memory, use beam.pvalue.AsIter(pcollection) instead.
In [0]:
class FilterOddNumbers(beam.DoFn):
def process(self, element, *args, **kwargs):
if element % 2 == 1:
yield element
with beam.Pipeline() as pipeline:
lines = (pipeline
| beam.Create(range(1, 11))
| beam.ParDo(FilterOddNumbers())
| beam.ParDo(print))
In [0]:
with beam.Pipeline() as pipeline:
lines = (pipeline
| beam.Create(range(1, 11))
| beam.Filter(lambda num: num % 2 == 1)
| beam.ParDo(print))
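The paragraph above also mentions passing a single-valued PCollection, such as an average, as a singleton. Here is a minimal sketch of that (not one of the original cells), keeping only the numbers above the collection's own mean:
In [0]:
with beam.Pipeline() as pipeline:
    numbers = pipeline | beam.Create(range(1, 11))
    mean = numbers | beam.combiners.Mean.Globally()
    lines = (numbers
             | beam.Filter(lambda num, avg: num > avg,
                           avg=pvalue.AsSingleton(mean))  # the mean is read as a side input
             | beam.ParDo(print))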
Beam uses windowing to divide a continuously updating unbounded PCollection into logical windows of finite size. These logical windows are determined by some characteristic associated with a data element, such as a timestamp. Aggregation transforms (such as GroupByKey and Combine) work on a per-window basis: as the data set is generated, they process each PCollection as a succession of these finite windows.
A related concept, called triggers, determines when to emit the results of aggregation as unbounded data arrives. You can use triggers to refine the windowing strategy for your PCollection. Triggers allow you to deal with late-arriving data, or to provide early results.
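Triggers are configured on WindowInto. The following is only a sketch of the shape of that call (a toy bounded example; with the DirectRunner and bounded data the trigger has little visible effect, but the same configuration applies to streaming pipelines):
In [0]:
from apache_beam.transforms import trigger

with beam.Pipeline() as pipeline:
    lines = (pipeline
             | beam.Create([(i % 3, i) for i in range(10)])
             | beam.WindowInto(beam.window.GlobalWindows(),
                               trigger=trigger.Repeatedly(trigger.AfterCount(5)),  # fire after every 5 elements
                               accumulation_mode=trigger.AccumulationMode.DISCARDING)
             | beam.CombinePerKey(sum)
             | beam.ParDo(print))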
In [0]:
with beam.Pipeline() as pipeline:
lines = (pipeline
| beam.Create(range(1, 11))
| beam.combiners.Count.Globally() # Count
| beam.ParDo(print))
In [0]:
with beam.Pipeline() as pipeline:
lines = (pipeline
| beam.Create(range(1, 11))
| beam.CombineGlobally(sum) # CombineGlobally sum
| beam.ParDo(print))
In [0]:
with beam.Pipeline() as pipeline:
lines = (pipeline
| beam.Create(range(1, 11))
| beam.combiners.Mean.Globally() # Mean
| beam.ParDo(print))
In [0]:
with beam.Pipeline() as pipeline:
lines = (pipeline
| beam.Create(range(1, 11))
| beam.combiners.Top.Smallest(1) # Top Smallest
| beam.ParDo(print))
In [0]:
with beam.Pipeline() as pipeline:
lines = (pipeline
| beam.Create(range(1, 11))
| beam.combiners.Top.Largest(1) # Top Largest
| beam.ParDo(print))
When you create a pipeline, you often need to read data from some external source, such as a file or a database. Likewise, you may want your pipeline to output its result data to an external storage system. Beam provides read and write transforms for a number of common data storage types. If you want your pipeline to read from or write to a data storage format that isn’t supported by the built-in transforms, you can implement your own read and write transforms.
Download the sample dataset for use with the cells below.
In [0]:
DATA_PATH = 'https://raw.githubusercontent.com/ageron/open-datasets/master/' \
'online_news_popularity_for_course/online_news_popularity_for_course.csv'
_data_root = tempfile.mkdtemp(prefix='tfx-data')
_data_filepath = os.path.join(_data_root, "data.csv")
urllib.request.urlretrieve(DATA_PATH, _data_filepath)
In [0]:
!head {_data_filepath}
In [0]:
with beam.Pipeline() as pipeline:
lines = (pipeline
| beam.io.ReadFromText(_data_filepath)
| beam.Filter(lambda line: line.startswith("2013-01-07,0,World"))
| beam.ParDo(print))
Exercise 4 — Reading, Filtering, Parsing, Grouping and Averaging
Write a Beam pipeline that reads the dataset, keeps only the 2013 articles (dates before 2014-01-01), computes the mean label (the numbers in the last column) for each article category (the third column), and prints out the results.
Hints:
- Use a Map step to split each row on the commas.
- Use a Map step to add a key equal to the category, and a GroupByKey step to group the articles by their category.
- To compute the mean per category, you can use beam.combiners.Mean.PerKey.
- Use a ParDo step to print out the results.
In [0]:
In [0]:
In [0]:
In [0]:
In [0]:
Solution:
In [0]:
with beam.Pipeline() as pipeline:
lines = (pipeline
| beam.io.ReadFromText(_data_filepath)
| beam.Filter(lambda line: line < "2014-01-01")
| beam.Map(lambda line: line.split(",")) # CSV parser?
| beam.Map(lambda cols: (cols[2], float(cols[-1])))
| beam.combiners.Mean.PerKey()
| beam.ParDo(print))
In [0]:
with tf.io.TFRecordWriter("test.tfrecord") as tfrecord_file:
for index in range(10):
tfrecord_file.write("Record {}".format(index).encode("utf-8"))
dataset = tf.data.TFRecordDataset('test.tfrecord')
for record in dataset:
print(record.numpy())
In [0]:
with beam.Pipeline() as rw_pipeline:
lines = (rw_pipeline
| beam.io.ReadFromTFRecord("test.tfrecord")
| beam.Map(lambda line: line + b' processed')
| beam.io.WriteToTFRecord("test_processed.tfrecord")
| beam.ParDo(print))
display_pipeline(rw_pipeline)
In [0]:
with beam.Pipeline() as utf_pipeline:
lines = (utf_pipeline
| "Read" >> beam.io.ReadFromTFRecord("test_processed.tfrecord-00000-of-00001")
| "Decode" >> beam.Map(lambda line: line.decode('utf-8'))
| "Print" >> beam.ParDo(print))
display_pipeline(utf_pipeline)
Note that there are many other built-in I/O transforms.
As discussed above, windowing subdivides a PCollection according to the timestamps of its individual elements.
Some Beam transforms, such as GroupByKey and Combine, group multiple elements by a common key. Ordinarily, that grouping operation groups all of the elements that have the same key within the entire data set. With an unbounded data set, it is impossible to collect all of the elements, since new elements are constantly being added and may be infinitely many (e.g. streaming data). If you are working with unbounded PCollections, windowing is especially useful.
In the Beam model, any PCollection (including unbounded PCollections) can be subdivided into logical windows. Each element in a PCollection is assigned to one or more windows according to the PCollection's windowing function, and each individual window contains a finite number of elements. Grouping transforms then consider each PCollection's elements on a per-window basis. GroupByKey, for example, implicitly groups the elements of a PCollection by key and window.
Additional information on Beam Windowing is available in the Beam Programming Guide.
In [0]:
DAYS = 24 * 60 * 60
class AssignTimestamps(beam.DoFn):
def process(self, element):
date = datetime.strptime(element[0], "%Y-%m-%d")
yield beam.window.TimestampedValue(element, date.timestamp())
with beam.Pipeline() as window_pipeline:
lines = (window_pipeline
| beam.io.ReadFromText(_data_filepath)
| beam.Filter(lambda line: line < "2014-01-01")
| beam.Map(lambda line: line.split(",")) # CSV parser?
| beam.ParDo(AssignTimestamps())
| beam.WindowInto(beam.window.FixedWindows(7*DAYS))
| beam.Map(lambda cols: (cols[2], float(cols[-1])))
| beam.combiners.Mean.PerKey()
| beam.ParDo(print))
display_pipeline(window_pipeline)
In [0]:
class AssignTimestamps(beam.DoFn):
def process(self, element):
date = datetime.strptime(element[0], "%Y-%m-%d")
yield beam.window.TimestampedValue(element, date.timestamp())
class PrintWithTimestamp(beam.DoFn):
def process(self, element, timestamp=beam.DoFn.TimestampParam):
print(timestamp.to_rfc3339()[:10], element)
with beam.Pipeline() as ts_pipeline:
lines = (ts_pipeline
| beam.io.ReadFromText(_data_filepath)
| beam.Filter(lambda line: line < "2014-01-01")
| beam.Map(lambda line: line.split(",")) # CSV parser?
| beam.ParDo(AssignTimestamps())
| beam.WindowInto(beam.window.FixedWindows(7 * DAYS))
| beam.Map(lambda cols: (cols[2], float(cols[-1])))
| beam.combiners.Mean.PerKey()
| beam.ParDo(PrintWithTimestamp()))
display_pipeline(ts_pipeline)