In [ ]:
import apache_beam as beam
from apache_beam.runners.interactive import interactive_runner
In [ ]:
# Build the pipeline on the interactive runner.
p = beam.Pipeline(runner=interactive_runner.InteractiveRunner())

# Source PCollection: the integers 0 through 9.
init_pcoll = p | beam.Create(range(10))

# Two independent branches fan out from the same source PCollection.
squares = init_pcoll | 'Square' >> beam.Map(lambda n: n * n)
cubes = init_pcoll | 'Cube' >> beam.Map(lambda n: n ** 3)

# Execute the pipeline and block until the run has completed.
result = p.run()
result.wait_until_finish()
In [ ]:
init_list = list(range(10))
squares_list = list(result.get(squares))
cubes_list = list(result.get(cubes))
squares_list.sort()
cubes_list.sort()
!pip install matplotlib
%matplotlib inline
from matplotlib import pyplot as plt
plt.scatter(init_list, squares_list, label='squares', color='red')
plt.scatter(init_list, cubes_list, label='cubes', color='blue')
plt.legend(loc='upper left')
plt.show()
In [ ]:
class AverageFn(beam.CombineFn):
    """Combine function that computes the arithmetic mean of its inputs.

    The accumulator is a ``(running_total, count)`` tuple. The mean is only
    computed in ``extract_output``; an accumulator with a zero count yields
    NaN instead of raising ``ZeroDivisionError``.
    """

    def create_accumulator(self):
        """Return the empty accumulator: a zero total and a zero count."""
        return (0.0, 0)

    def add_input(self, sum_count, input):
        """Fold one input value into an accumulator.

        Args:
            sum_count: The current ``(running_total, count)`` accumulator.
            input: The value to add. The name shadows the builtin but is
                kept so the method signature stays unchanged.

        Returns:
            A new ``(running_total, count)`` tuple including ``input``.
        """
        # Named ``total`` rather than ``sum`` to avoid shadowing the builtin.
        total, count = sum_count
        return total + input, count + 1

    def merge_accumulators(self, accumulators):
        """Merge several accumulators into a single one.

        Args:
            accumulators: An iterable of ``(running_total, count)`` tuples.

        Returns:
            The element-wise sum of all accumulators. An empty iterable
            yields the empty accumulator instead of raising ``ValueError``
            (which unpacking ``zip(*accumulators)`` would do).
        """
        total, count = 0.0, 0
        for partial_total, partial_count in accumulators:
            total += partial_total
            count += partial_count
        return total, count

    def extract_output(self, sum_count):
        """Turn the final accumulator into the mean.

        Args:
            sum_count: The final ``(running_total, count)`` accumulator.

        Returns:
            ``running_total / count``, or NaN when ``count`` is zero.
        """
        total, count = sum_count
        return total / count if count else float('NaN')
In [ ]:
# Reduce each branch to a single global average. Each application gets its
# own AverageFn instance and its own transform label.
average_square = (
    squares
    | 'Average Square' >> beam.CombineGlobally(AverageFn())
)
average_cube = (
    cubes
    | 'Average Cube' >> beam.CombineGlobally(AverageFn())
)
In [ ]:
# Run the pipeline again and rebind `result` to the new run's result object.
# NOTE(review): unlike the first run, no wait_until_finish() call follows in
# the visible source -- confirm whether one is needed before reading results.
result = p.run()