Try Apache Beam - Go

In this notebook, we set up your development environment and work through a simple example using the DirectRunner. You can explore other runners with the Beam Capability Matrix.

To navigate through different sections, use the table of contents. From the View drop-down list, select Table of contents.

To run a code cell, click the Run cell button at the top left of the cell, or select the cell and press Shift+Enter. Try modifying a code cell and re-running it to see what happens.

To learn more about Colab, see Welcome to Colaboratory!.

Setup

First, you need to set up your environment.


In [1]:
import os

# Run and print a shell command.
def run(cmd):
  print('>> {}'.format(cmd))
  !{cmd}
  print('')

# Change directory to $HOME.
print(f"Changing directory to $HOME: {os.environ['HOME']}\n")
os.chdir(os.environ['HOME'])

# Copy the input file into the local filesystem.
run('mkdir -p data')
run('gsutil cp gs://dataflow-samples/shakespeare/kinglear.txt data/')


Changing directory to $HOME: /root

>> mkdir -p data

>> gsutil cp gs://dataflow-samples/shakespeare/kinglear.txt data/
Copying gs://dataflow-samples/shakespeare/kinglear.txt...
/ [1 files][153.6 KiB/153.6 KiB]                                                
Operation completed over 1 objects/153.6 KiB.                                    

Installing development tools

Let's start by installing Go. This will take a while, so feel free to go for a walk or do some stretching.


In [2]:
# Update and upgrade the system before installing anything else.
run('apt-get update > /dev/null')
run('apt-get upgrade > /dev/null')

# Install the Go package.
run('apt-get install golang-go > /dev/null')

# Check the Go version to see if everything is working well.
run('go version')

# Finally, let's install the Apache Beam SDK for Go.
run('go get -u github.com/apache/beam/sdks/go/...')


>> apt-get update > /dev/null

>> apt-get upgrade > /dev/null

>> apt-get install golang-go > /dev/null

>> go version
go version go1.10.4 linux/amd64

>> go get -u github.com/apache/beam/sdks/go/...

Creating the directory structure

Go 1.10, the version installed above, requires all packages to live inside the GOPATH. By default, the GOPATH is $HOME/go; you can check yours with the go env GOPATH command.

Inside the GOPATH, a src directory holds the source code of all packages, and a bin directory is created to hold the compiled binaries.

To learn more about Go's directory structure, see How to Write Go Code.
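
The same lookup can be done from Go itself. The following is a minimal sketch: it reads the GOPATH through the standard go/build package and derives the directories described above. The helper names firstGOPATH and wordcountDir are hypothetical and not part of the notebook; the sketch also assumes that only the first GOPATH entry matters when several are listed.

```go
package main

import (
	"fmt"
	"go/build"
	"path/filepath"
)

// firstGOPATH returns the first entry of a GOPATH list, or "" if the list
// is empty. Hypothetical helper: GOPATH may list several directories.
func firstGOPATH(list string) string {
	entries := filepath.SplitList(list)
	if len(entries) == 0 {
		return ""
	}
	return entries[0]
}

// wordcountDir returns <gopath>/src/wordcount, the package directory
// this notebook creates. Hypothetical helper for illustration.
func wordcountDir(gopath string) string {
	return filepath.Join(gopath, "src", "wordcount")
}

func main() {
	// build.Default.GOPATH holds the GOPATH environment variable if it is
	// set, and Go's default location otherwise.
	gopath := firstGOPATH(build.Default.GOPATH)
	fmt.Println("GOPATH:  ", gopath)
	fmt.Println("source:  ", wordcountDir(gopath))
	fmt.Println("binaries:", filepath.Join(gopath, "bin"))
}
```

In this notebook's environment, where the GOPATH is /root/go, the source directory resolves to the same path that the mkdir command in the next cell creates.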


In [3]:
# Get the GOPATH.
cmd_stdout = !go env GOPATH
GOPATH = cmd_stdout[0]
print(f"GOPATH={GOPATH}\n")

# Create our source code wordcount package.
run(f"mkdir -p {GOPATH}/src/wordcount")


GOPATH=/root/go

>> mkdir -p /root/go/src/wordcount

Minimal word count

The following example is the "Hello, World!" of data processing, a basic implementation of word count. We're creating a simple data processing pipeline that reads a text file and counts the number of occurrences of every word.
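
To make the goal concrete before introducing Beam, here is a minimal sketch of the same computation in plain Go, using only the standard library. It reuses the regular expression from wordcount.go; the function name countWords and the sample lines are made up for illustration. Unlike the pipeline, it holds everything in memory and runs on a single machine.

```go
package main

import (
	"fmt"
	"regexp"
	"sort"
)

// wordRE is the same pattern that the notebook's wordcount.go uses.
var wordRE = regexp.MustCompile(`[a-zA-Z]+('[a-z])?`)

// countWords tallies every regex match across the given lines.
// Matching is case-sensitive, so "The" and "the" are counted separately.
// Hypothetical helper for illustration.
func countWords(lines []string) map[string]int {
	counts := make(map[string]int)
	for _, line := range lines {
		for _, word := range wordRE.FindAllString(line, -1) {
			counts[word]++
		}
	}
	return counts
}

func main() {
	// Made-up sample input, not taken from kinglear.txt.
	lines := []string{
		"The king's crown",
		"the King and the crown",
	}
	counts := countWords(lines)

	// Go randomizes map iteration order, so sort the keys for stable output.
	words := make([]string, 0, len(counts))
	for word := range counts {
		words = append(words, word)
	}
	sort.Strings(words)
	for _, word := range words {
		fmt.Printf("%s: %v\n", word, counts[word])
	}
}
```

The optional `('[a-z])?` group keeps one letter after an apostrophe, which is why king's is counted as a single word.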

In many scenarios, the data does not all fit in memory. Notice that the pipeline writes its output to the file system, which lets large processing jobs run in distributed environments.

wordcount.go


In [4]:
%%writefile go/src/wordcount/wordcount.go

package main

import (
	"context"
	"flag"
	"fmt"
	"regexp"

	"github.com/apache/beam/sdks/go/pkg/beam"
	"github.com/apache/beam/sdks/go/pkg/beam/io/textio"
	"github.com/apache/beam/sdks/go/pkg/beam/runners/direct"
	"github.com/apache/beam/sdks/go/pkg/beam/transforms/stats"

	_ "github.com/apache/beam/sdks/go/pkg/beam/io/filesystem/local"
)

var (
	input  = flag.String("input", "data/*", "File(s) to read.")
	output = flag.String("output", "outputs/wordcounts.txt", "Output filename.")
)

var wordRE = regexp.MustCompile(`[a-zA-Z]+('[a-z])?`)

func main() {
	flag.Parse()

	beam.Init()

	pipeline := beam.NewPipeline()
	root := pipeline.Root()

	lines := textio.Read(root, *input)
	words := beam.ParDo(root, func(line string, emit func(string)) {
		for _, word := range wordRE.FindAllString(line, -1) {
			emit(word)
		}
	}, lines)
	counted := stats.Count(root, words)
	formatted := beam.ParDo(root, func(word string, count int) string {
		return fmt.Sprintf("%s: %v", word, count)
	}, counted)
	textio.Write(root, *output, formatted)

	direct.Execute(context.Background(), pipeline)
}


Writing go/src/wordcount/wordcount.go

Building and running

The go run command lets us run a program without an explicit build step. Internally, it compiles the source code into a temporary binary and then runs it.


In [5]:
# Build and run the program.
run('rm -rf outputs/')
run(f"go run {GOPATH}/src/wordcount/*.go")

# Sample the first 20 results. Remember that there are no ordering guarantees.
run('head -n 20 outputs/*')


>> rm -rf outputs/

>> go run /root/go/src/wordcount/*.go
2019/03/04 23:05:37 Executing pipeline with the direct runner.
2019/03/04 23:05:37 Pipeline:
2019/03/04 23:05:37 Nodes: {1: []uint8/bytes GLO}
{2: string/string[string] GLO}
{3: string/string[string] GLO}
{4: string/string[string] GLO}
{5: string/string[string] GLO}
{6: KV<string,int>/KV<string[string],int[varintz]> GLO}
{7: CoGBK<string,int>/CoGBK<string[string],int[varintz]> GLO}
{8: KV<string,int>/KV<string[string],int[varintz]> GLO}
{9: string/string[string] GLO}
{10: KV<int,string>/KV<int[varintz],string[string]> GLO}
{11: CoGBK<int,string>/CoGBK<int[varintz],string[string]> GLO}
Edges: 1: Impulse [] -> [Out: []uint8 -> {1: []uint8/bytes GLO}]
2: ParDo [In(Main): []uint8 <- {1: []uint8/bytes GLO}] -> [Out: T -> {2: string/string[string] GLO}]
3: ParDo [In(Main): string <- {2: string/string[string] GLO}] -> [Out: string -> {3: string/string[string] GLO}]
4: ParDo [In(Main): string <- {3: string/string[string] GLO}] -> [Out: string -> {4: string/string[string] GLO}]
5: ParDo [In(Main): string <- {4: string/string[string] GLO}] -> [Out: string -> {5: string/string[string] GLO}]
6: ParDo [In(Main): T <- {5: string/string[string] GLO}] -> [Out: KV<T,int> -> {6: KV<string,int>/KV<string[string],int[varintz]> GLO}]
7: CoGBK [In(Main): KV<string,int> <- {6: KV<string,int>/KV<string[string],int[varintz]> GLO}] -> [Out: CoGBK<string,int> -> {7: CoGBK<string,int>/CoGBK<string[string],int[varintz]> GLO}]
8: Combine [In(Main): int <- {7: CoGBK<string,int>/CoGBK<string[string],int[varintz]> GLO}] -> [Out: KV<string,int> -> {8: KV<string,int>/KV<string[string],int[varintz]> GLO}]
9: ParDo [In(Main): KV<string,int> <- {8: KV<string,int>/KV<string[string],int[varintz]> GLO}] -> [Out: string -> {9: string/string[string] GLO}]
10: ParDo [In(Main): T <- {9: string/string[string] GLO}] -> [Out: KV<int,T> -> {10: KV<int,string>/KV<int[varintz],string[string]> GLO}]
11: CoGBK [In(Main): KV<int,string> <- {10: KV<int,string>/KV<int[varintz],string[string]> GLO}] -> [Out: CoGBK<int,string> -> {11: CoGBK<int,string>/CoGBK<int[varintz],string[string]> GLO}]
12: ParDo [In(Main): CoGBK<int,string> <- {11: CoGBK<int,string>/CoGBK<int[varintz],string[string]> GLO}] -> []
2019/03/04 23:05:37 Plan[plan]:
14: Impulse[0]
1: ParDo[textio.writeFileFn] Out:[]
2: CoGBK. Out:1
3: Inject[0]. Out:2
4: ParDo[beam.addFixedKeyFn] Out:[3]
5: ParDo[main.main.func2] Out:[4]
6: Combine[stats.sumIntFn] Keyed:false Out:5
7: CoGBK. Out:6
8: Inject[0]. Out:7
9: ParDo[stats.mapFn] Out:[8]
10: ParDo[main.main.func1] Out:[9]
11: ParDo[textio.readFn] Out:[10]
12: ParDo[textio.expandFn] Out:[11]
13: ParDo[beam.createFn] Out:[12]
2019/03/04 23:05:37 Reading from data/kinglear.txt
2019/03/04 23:05:37 Writing to outputs/wordcounts.txt

>> head -n 20 outputs/*
breeding: 3
alas: 1
condition: 2
whole: 1
rarity: 1
hoping: 1
oath: 4
pretence: 2
beastly: 1
chide: 1
mile: 1
Villain: 1
preach: 1
rescue: 1
Alarum: 2
loath: 1
clotpoll: 1
shortly: 2
alack: 3
What: 75

Word count with comments

The code below is the same as above, with comments added to explain each step.


In [6]:
%%writefile go/src/wordcount/wordcount.go

package main

import (
	"context"
	"flag"
	"fmt"
	"regexp"

	"github.com/apache/beam/sdks/go/pkg/beam"
	"github.com/apache/beam/sdks/go/pkg/beam/io/textio"
	"github.com/apache/beam/sdks/go/pkg/beam/runners/direct"
	"github.com/apache/beam/sdks/go/pkg/beam/transforms/stats"

	_ "github.com/apache/beam/sdks/go/pkg/beam/io/filesystem/local"
)

var (
	input  = flag.String("input", "data/*", "File(s) to read.")
	output = flag.String("output", "outputs/wordcounts.txt", "Output filename.")
)

var wordRE = regexp.MustCompile(`[a-zA-Z]+('[a-z])?`)

func main() {
	// Parse the command-line flags before initializing Beam.
	flag.Parse()

	// beam.Init is an initialization hook that must be called on startup.
	beam.Init()

	// Create the pipeline and get its root scope.
	pipeline := beam.NewPipeline()
	root := pipeline.Root()

	// Read lines from a text file.
	lines := textio.Read(root, *input)

	// Use a regular expression to find every word in each line and emit it.
	words := beam.ParDo(root, func(line string, emit func(string)) {
		for _, word := range wordRE.FindAllString(line, -1) {
			emit(word)
		}
	}, lines)

	// Count the occurrences of each unique word.
	counted := stats.Count(root, words)

	// Format each result as a string so we can write it to a file.
	formatted := beam.ParDo(root, func(word string, count int) string {
		return fmt.Sprintf("%s: %v", word, count)
	}, counted)

	// Finally, write the results to a file.
	textio.Write(root, *output, formatted)

	// We have to explicitly run the pipeline; otherwise it's only a definition.
	direct.Execute(context.Background(), pipeline)
}


Overwriting go/src/wordcount/wordcount.go

In [7]:
# Build and run the program.
run('rm -rf outputs/')
run('go run go/src/wordcount/*.go 2>/dev/null')

# Sample the first 20 results. Remember that there are no ordering guarantees.
run('head -n 20 outputs/*')


>> rm -rf outputs/

>> go run go/src/wordcount/*.go 2>/dev/null

>> head -n 20 outputs/*
hawthorn: 2
With: 31
vain: 3
football: 1
showest: 1
rarest: 1
Acquaint: 1
Bids: 1
another: 9
tadpole: 1
Oppressed: 1
Revoke: 1
images: 1
lameness: 1
Instantly: 1
rages: 1
Neither: 1
quest: 1
mills: 1
weapons: 1
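
Because the output has no ordering guarantees, the two runs above list different words first. If you want a stable view, such as the most frequent words, you can post-process the output lines. The following is a minimal sketch in plain Go, assuming every line has the "word: count" shape produced by the formatting step; the names wordCount, parseLine, and topN are hypothetical and not part of the Beam SDK.

```go
package main

import (
	"fmt"
	"sort"
	"strconv"
	"strings"
)

// wordCount is one parsed output line. Hypothetical type for illustration.
type wordCount struct {
	Word  string
	Count int
}

// parseLine parses a single "word: count" line.
func parseLine(line string) (wordCount, error) {
	i := strings.LastIndex(line, ": ")
	if i < 0 {
		return wordCount{}, fmt.Errorf("malformed line %q", line)
	}
	n, err := strconv.Atoi(strings.TrimSpace(line[i+2:]))
	if err != nil {
		return wordCount{}, err
	}
	return wordCount{Word: line[:i], Count: n}, nil
}

// topN returns the n most frequent entries, breaking ties alphabetically.
// Blank lines are skipped; a negative n returns every entry.
func topN(lines []string, n int) ([]wordCount, error) {
	parsed := make([]wordCount, 0, len(lines))
	for _, line := range lines {
		if strings.TrimSpace(line) == "" {
			continue
		}
		wc, err := parseLine(line)
		if err != nil {
			return nil, err
		}
		parsed = append(parsed, wc)
	}
	sort.Slice(parsed, func(a, b int) bool {
		if parsed[a].Count != parsed[b].Count {
			return parsed[a].Count > parsed[b].Count
		}
		return parsed[a].Word < parsed[b].Word
	})
	if n >= 0 && n < len(parsed) {
		parsed = parsed[:n]
	}
	return parsed, nil
}

func main() {
	// A few lines copied from the first run's sample output.
	lines := []string{"breeding: 3", "alas: 1", "oath: 4", "What: 75", "alack: 3"}
	top, err := topN(lines, 3)
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	for _, wc := range top {
		fmt.Printf("%s: %d\n", wc.Word, wc.Count)
	}
}
```

Sorting after the fact suits a small result file like this one. For results too large to hold in memory, the ordering would need to happen inside the pipeline instead.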