In this notebook, we set up your development environment and work through a simple example using the DirectRunner. You can explore other runners with the Beam Compatibility Matrix.
To navigate through different sections, use the table of contents. From the View drop-down list, select Table of contents.
To run a code cell, you can click the Run cell button at the top left of the cell, or select the cell and press Shift+Enter. Try modifying a code cell and re-running it to see what happens.
To learn more about Colab, see Welcome to Colaboratory!.
In [1]:
import os
# Run and print a shell command.
def run(cmd):
    print('>> {}'.format(cmd))
    !{cmd}
    print('')
# Change directory to $HOME.
print(f"Changing directory to $HOME: {os.environ['HOME']}\n")
os.chdir(os.environ['HOME'])
# Copy the input file into the local filesystem.
run('mkdir -p data')
run('gsutil cp gs://dataflow-samples/shakespeare/kinglear.txt data/')
In [2]:
# Update and upgrade the system before installing anything else.
run('apt-get update > /dev/null')
run('apt-get upgrade > /dev/null')
# Install the Go package.
run('apt-get install golang-go > /dev/null')
# Check the Go version to see if everything is working well.
run('go version')
# Finally, let's install the Apache Beam SDK for Go.
run('go get -u github.com/apache/beam/sdks/go/...')
Go requires all packages to be contained within the GOPATH. By default it is located in $HOME/go; you can check yours using the go env GOPATH command.

Inside the GOPATH there should be a src directory that holds all the packages, and a bin directory will be created containing all the compiled binaries.
To learn more about Go's directory structure, see How to Write Go Code.
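As a quick standalone sketch of this layout (assuming the default location when $GOPATH is unset; your workspace may differ), you can inspect the workspace from a shell:

```shell
# The effective GOPATH: the $GOPATH variable if set, otherwise $HOME/go.
GOPATH_DIR="${GOPATH:-$HOME/go}"
echo "GOPATH is $GOPATH_DIR"

# Packages live under src/, compiled binaries under bin/.
mkdir -p "$GOPATH_DIR/src" "$GOPATH_DIR/bin"
ls "$GOPATH_DIR"
```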
In [3]:
# Get the GOPATH.
cmd_stdout = !go env GOPATH
GOPATH = cmd_stdout[0]
print(f"GOPATH={GOPATH}\n")
# Create the source directory for our wordcount package.
run(f"mkdir -p {GOPATH}/src/wordcount")
The following example is the "Hello, World!" of data processing, a basic implementation of word count. We're creating a simple data processing pipeline that reads a text file and counts the number of occurrences of every word.
In many scenarios, not all of the data fits in memory. Notice that the outputs of the pipeline go to the file system, which allows for large processing jobs in distributed environments.
In [4]:
%%writefile go/src/wordcount/wordcount.go
package main

import (
    "context"
    "flag"
    "fmt"
    "regexp"

    "github.com/apache/beam/sdks/go/pkg/beam"
    "github.com/apache/beam/sdks/go/pkg/beam/io/textio"
    "github.com/apache/beam/sdks/go/pkg/beam/runners/direct"
    "github.com/apache/beam/sdks/go/pkg/beam/transforms/stats"

    _ "github.com/apache/beam/sdks/go/pkg/beam/io/filesystem/local"
)

var (
    input  = flag.String("input", "data/*", "File(s) to read.")
    output = flag.String("output", "outputs/wordcounts.txt", "Output filename.")
)

var wordRE = regexp.MustCompile(`[a-zA-Z]+('[a-z])?`)

func main() {
    flag.Parse()
    beam.Init()

    pipeline := beam.NewPipeline()
    root := pipeline.Root()

    lines := textio.Read(root, *input)
    words := beam.ParDo(root, func(line string, emit func(string)) {
        for _, word := range wordRE.FindAllString(line, -1) {
            emit(word)
        }
    }, lines)
    counted := stats.Count(root, words)
    formatted := beam.ParDo(root, func(word string, count int) string {
        return fmt.Sprintf("%s: %v", word, count)
    }, counted)
    textio.Write(root, *output, formatted)

    direct.Execute(context.Background(), pipeline)
}
In [5]:
# Build and run the program.
run('rm -rf outputs/')
run(f"go run {GOPATH}/src/wordcount/*.go")
# Sample the first 20 results; remember there are no ordering guarantees.
run('head -n 20 outputs/*')
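The word extraction in the pipeline is driven by the regular expression `[a-zA-Z]+('[a-z])?`. As a quick standalone sketch, independent of Beam, you can see what it matches on a sample line (the `extractWords` helper name is ours, for illustration only):

```go
package main

import (
	"fmt"
	"regexp"
)

// Same pattern as the pipeline: one or more letters, optionally
// followed by an apostrophe and a lowercase letter (so "Lear's"
// counts as a single word).
var wordRE = regexp.MustCompile(`[a-zA-Z]+('[a-z])?`)

// extractWords returns every word in a line, matching the same
// pattern the pipeline's ParDo uses.
func extractWords(line string) []string {
	return wordRE.FindAllString(line, -1)
}

func main() {
	fmt.Println(extractWords("KING LEAR's darker purpose"))
	// → [KING LEAR's darker purpose]
}
```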
In [6]:
%%writefile go/src/wordcount/wordcount.go
package main

import (
    "context"
    "flag"
    "fmt"
    "regexp"

    "github.com/apache/beam/sdks/go/pkg/beam"
    "github.com/apache/beam/sdks/go/pkg/beam/io/textio"
    "github.com/apache/beam/sdks/go/pkg/beam/runners/direct"
    "github.com/apache/beam/sdks/go/pkg/beam/transforms/stats"

    _ "github.com/apache/beam/sdks/go/pkg/beam/io/filesystem/local"
)

var (
    input  = flag.String("input", "data/*", "File(s) to read.")
    output = flag.String("output", "outputs/wordcounts.txt", "Output filename.")
)

var wordRE = regexp.MustCompile(`[a-zA-Z]+('[a-z])?`)

func main() {
    flag.Parse()
    beam.Init()

    pipeline := beam.NewPipeline()
    root := pipeline.Root()

    // Read lines from a text file.
    lines := textio.Read(root, *input)
    // Use a regular expression to iterate over all words in the line.
    words := beam.ParDo(root, func(line string, emit func(string)) {
        for _, word := range wordRE.FindAllString(line, -1) {
            emit(word)
        }
    }, lines)
    // Count each unique word.
    counted := stats.Count(root, words)
    // Format the results into a string so we can write them to a file.
    formatted := beam.ParDo(root, func(word string, count int) string {
        return fmt.Sprintf("%s: %v", word, count)
    }, counted)
    // Finally, write the results to a file.
    textio.Write(root, *output, formatted)

    // We have to explicitly run the pipeline, otherwise it's only a definition.
    direct.Execute(context.Background(), pipeline)
}
In [7]:
# Build and run the program.
run('rm -rf outputs/')
run('go run go/src/wordcount/*.go 2>/dev/null')
# Sample the first 20 results; remember there are no ordering guarantees.
run('head -n 20 outputs/*')
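Since the output lines have the form `word: count`, you can rank words by frequency with standard coreutils. A minimal sketch, demonstrated here on a small sample file in the same format (the sample counts are made up; on the real output you would point `sort` at `outputs/*`):

```shell
# Create a small sample in the "word: count" output format.
mkdir -p outputs
printf 'the: 100\nking: 20\nlear: 15\nfool: 30\n' > outputs/wordcounts.txt

# Sort numerically on the count field (field 2, ':' delimiter),
# descending, and take the top 10.
sort -t: -k2 -nr outputs/wordcounts.txt | head -n 10
```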