Interactive Beam Running on Flink


In [ ]:
import apache_beam as beam
from apache_beam.runners.interactive import interactive_runner
from apache_beam.runners.portability import flink_runner

# Pipeline driven by the interactive runner, which delegates execution to Flink.
p = beam.Pipeline(
    runner=interactive_runner.InteractiveRunner(
        underlying_runner=flink_runner.FlinkRunner(),
    ),
)

In [ ]:
# Source PCollection built from the integers 0 through 9.
init_pcoll = p | beam.Create(range(10))
# Two independent branches off the same source: x*x and x**3 per element.
squares = init_pcoll | 'Square' >> beam.Map(lambda x: x*x)
cubes = init_pcoll | 'Cube' >> beam.Map(lambda x: x**3)
# Run the pipeline and wait for the run to finish before `result` is queried
# in the following cells.
result = p.run()
result.wait_until_finish()

In [ ]:
# Look up the `squares` PCollection in the result of the run above; as the
# last expression in the cell, its value is shown as the cell output.
result.get(squares)

In [ ]:
class AverageFn(beam.CombineFn):
  """CombineFn that computes the arithmetic mean of its inputs.

  The accumulator is a ``(running_sum, element_count)`` tuple.
  """

  def create_accumulator(self):
    """Return the empty accumulator: a zero sum and a zero count."""
    return (0.0, 0)

  def add_input(self, sum_count, input):
    """Fold one input value into the accumulator.

    Args:
      sum_count: the current ``(running_sum, element_count)`` accumulator.
      input: the value to add.

    Returns:
      A new ``(running_sum, element_count)`` tuple including ``input``.
    """
    # Local is named `total` so the builtin `sum` is not shadowed.
    total, count = sum_count
    return total + input, count + 1

  def merge_accumulators(self, accumulators):
    """Merge several accumulators by summing them component-wise.

    Args:
      accumulators: an iterable of ``(running_sum, element_count)`` tuples.

    Returns:
      A single ``(running_sum, element_count)`` tuple. An empty iterable
      yields the empty accumulator instead of raising.
    """
    # Materialize once: the iterable may be single-pass, and the emptiness
    # check must not consume it.
    accumulators = list(accumulators)
    if not accumulators:
      # zip(*[]) yields nothing, so the unpacking below would raise ValueError.
      return self.create_accumulator()
    sums, counts = zip(*accumulators)
    return sum(sums), sum(counts)

  def extract_output(self, sum_count):
    """Return the mean for the accumulator, or NaN when its count is zero."""
    total, count = sum_count
    return total / count if count else float('NaN')

In [ ]:
# Reduce each branch to a single value with AverageFn.
average_square = squares | 'Average Square' >> beam.CombineGlobally(AverageFn())
average_cube = cubes | 'Average Cube' >> beam.CombineGlobally(AverageFn())
# Run the extended pipeline and wait for it to finish, as the first run does,
# so that `result` is only queried after the run has completed.
result = p.run()
result.wait_until_finish()

In [ ]:
# Look up the `average_square` PCollection in the result of the latest run; as
# the last expression in the cell, its value is shown as the cell output.
result.get(average_square)