Try Apache Beam - Java

In this notebook, we set up a Java development environment and work through a simple example using the DirectRunner. You can explore other runners with the Beam Capability Matrix.

To navigate through different sections, use the table of contents. From the View drop-down list, select Table of contents.

To run a code cell, click the Run cell button at the top left of the cell, or select the cell and press Shift+Enter. Try modifying a code cell and re-running it to see what happens.

To learn more about Colab, see Welcome to Colaboratory!.

Setup

First, you need to set up your environment.


In [1]:
# Run and print a shell command.
def run(cmd):
  print('>> {}'.format(cmd))
  !{cmd}  # This is magic to run 'cmd' in the shell.
  print('')

# Copy the input file into the local filesystem.
run('mkdir -p data')
run('gsutil cp gs://dataflow-samples/shakespeare/kinglear.txt data/')


>> mkdir -p data

>> gsutil cp gs://dataflow-samples/shakespeare/kinglear.txt data/
Copying gs://dataflow-samples/shakespeare/kinglear.txt...
/ [1 files][153.6 KiB/153.6 KiB]                                                
Operation completed over 1 objects/153.6 KiB.                                    

Installing development tools

Let's start by installing Java. We'll use the default-jdk package, which installs OpenJDK. This will take a while, so feel free to go for a walk or do some stretching.

Note: Alternatively, you could install the proprietary Oracle JDK instead.


In [2]:
# Update and upgrade the system before installing anything else.
run('apt-get update > /dev/null')
run('apt-get upgrade > /dev/null')

# Install the Java JDK.
run('apt-get install default-jdk > /dev/null')

# Check the Java version to see if everything is working well.
run('javac -version')


>> apt-get update > /dev/null

>> apt-get upgrade > /dev/null
Extracting templates from packages: 100%

>> apt-get install default-jdk > /dev/null

>> javac -version
javac 10.0.2

Now, let's install Gradle, which we'll need to automate the build and running processes for our application.

Note: Alternatively, you could install and configure Maven instead.


In [3]:
import os

# Download and unpack the Gradle binary distribution (skipped if it's already installed).
gradle_version = 'gradle-5.0'
gradle_path = f"/opt/{gradle_version}"
if not os.path.exists(gradle_path):
  run(f"wget -q -nc -O gradle.zip https://services.gradle.org/distributions/{gradle_version}-bin.zip")
  run('unzip -q -d /opt gradle.zip')
  run('rm -f gradle.zip')

# We're choosing to use the absolute path instead of adding it to the $PATH environment variable.
def gradle(args):
  run(f"{gradle_path}/bin/gradle --console=plain {args}")

gradle('-v')


>> wget -q -nc -O gradle.zip https://services.gradle.org/distributions/gradle-5.0-bin.zip

>> unzip -q -d /opt gradle.zip

>> rm -f gradle.zip

>> /opt/gradle-5.0/bin/gradle --console=plain -v

Welcome to Gradle 5.0!

Here are the highlights of this release:
 - Kotlin DSL 1.0
 - Task timeouts
 - Dependency alignment aka BOM support
 - Interactive `gradle init`

For more details see https://docs.gradle.org/5.0/release-notes.html


------------------------------------------------------------
Gradle 5.0
------------------------------------------------------------

Build time:   2018-11-26 11:48:43 UTC
Revision:     7fc6e5abf2fc5fe0824aec8a0f5462664dbcd987

Kotlin DSL:   1.0.4
Kotlin:       1.3.10
Groovy:       2.5.4
Ant:          Apache Ant(TM) version 1.9.13 compiled on July 10 2018
JVM:          10.0.2 (Oracle Corporation 10.0.2+13-Ubuntu-1ubuntu0.18.04.4)
OS:           Linux 4.14.79+ amd64


build.gradle

We also need a build.gradle file, which configures the project and lets us invoke useful tasks such as build and runShadow.


In [4]:
%%writefile build.gradle

plugins {
  // id 'idea'     // Uncomment for IntelliJ IDE
  // id 'eclipse'  // Uncomment for Eclipse IDE

  // Apply java plugin and make it a runnable application.
  id 'java'
  id 'application'

  // 'shadow' allows us to embed all the dependencies into a fat jar.
  id 'com.github.johnrengelman.shadow' version '4.0.3'
}

// Fully qualified name of the main class; its source file lives under ./src/main/java/
mainClassName = 'samples.quickstart.WordCount'

// Declare the sources from which to fetch dependencies.
repositories {
  mavenCentral()
}

// Java version compatibility.
sourceCompatibility = 1.8
targetCompatibility = 1.8

// Use the latest Apache Beam 2.x release.
// You can also lock into a minor version like '2.9.+'.
ext.apacheBeamVersion = '2.+'

// Declare the dependencies of the project.
dependencies {
  shadow "org.apache.beam:beam-sdks-java-core:$apacheBeamVersion"

  runtime "org.apache.beam:beam-runners-direct-java:$apacheBeamVersion"
  runtime "org.slf4j:slf4j-api:1.+"
  runtime "org.slf4j:slf4j-jdk14:1.+"

  testCompile "junit:junit:4.+"
}

// Configure 'shadowJar' instead of 'jar' to set up the fat jar.
shadowJar {
  baseName = 'WordCount'  // Name of the fat jar file.
  classifier = null       // Set to null, otherwise 'shadow' appends a '-all' to the jar file name.
  manifest {
    attributes('Main-Class': mainClassName)  // Specify where the main class resides.
  }
}


Writing build.gradle

Creating the directory structure

Java and Gradle expect a specific directory structure, which keeps large projects organized in a standard way.

For now, we only need a directory for our quickstart code. Because the class is in the package samples.quickstart, its source file goes in ./src/main/java/samples/quickstart/.
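The mapping from package name to directory is mechanical: each package segment becomes a directory under src/main/java/, and the class name becomes the file name. A tiny sketch of that mapping (SourceLayout and sourcePathFor are illustrative names, not part of Gradle; nested classes and custom source sets are ignored):

```java
// Illustrative helper (not part of Gradle): maps a fully qualified class name
// to its conventional source path under src/main/java/.
final class SourceLayout {
  static String sourcePathFor(String fullyQualifiedClassName) {
    // Each package segment becomes a directory; the class name becomes the file name.
    return "src/main/java/" + fullyQualifiedClassName.replace('.', '/') + ".java";
  }

  public static void main(String[] args) {
    // mainClassName in build.gradle is 'samples.quickstart.WordCount'.
    System.out.println(sourcePathFor("samples.quickstart.WordCount"));
    // prints src/main/java/samples/quickstart/WordCount.java
  }
}
```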


In [5]:
run('mkdir -p src/main/java/samples/quickstart')


>> mkdir -p src/main/java/samples/quickstart

Minimal word count

The following example is the "Hello, World!" of data processing, a basic implementation of word count. We're creating a simple data processing pipeline that reads a text file and counts the number of occurrences of every word.

In many scenarios the data does not fit in memory. Notice that the pipeline writes its outputs to the file system instead of collecting them in memory, which allows for large processing jobs in distributed environments.
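In the runs below, the output is split across several shard files named like outputs/part-00003-of-00004: the prefix passed to TextIO.write().to(...), then the zero-padded shard index and the shard count. As far as we know this matches Beam's default shard template (-SSSSS-of-NNNNN); the sketch below only reproduces the naming and is not Beam's own code (ShardNames is a hypothetical helper):

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper reproducing the shard file names seen in the logs,
// e.g. outputs/part-00003-of-00004. This is not Beam's implementation.
final class ShardNames {
  static String shardName(String prefix, int shardIndex, int numShards) {
    // Both numbers are zero-padded to five digits.
    return String.format("%s-%05d-of-%05d", prefix, shardIndex, numShards);
  }

  static List<String> allShards(String prefix, int numShards) {
    List<String> names = new ArrayList<>();
    for (int i = 0; i < numShards; i++) {
      names.add(shardName(prefix, i, numShards));
    }
    return names;
  }

  public static void main(String[] args) {
    // Prints part-00000-of-00003 through part-00002-of-00003, one per line.
    allShards("outputs/part", 3).forEach(System.out::println);
  }
}
```

The runner picks the number of shards, so it can change from run to run (4 in the first run below, 3 in the later ones). That is why a glob like outputs/part-00000-of-* can also match first shards left over from earlier runs.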

WordCount.java


In [6]:
%%writefile src/main/java/samples/quickstart/WordCount.java

package samples.quickstart;

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Count;
import org.apache.beam.sdk.transforms.Filter;
import org.apache.beam.sdk.transforms.FlatMapElements;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.TypeDescriptors;

import java.util.Arrays;

public class WordCount {
  public static void main(String[] args) {
    String inputsDir = "data/*";
    String outputsPrefix = "outputs/part";

    PipelineOptions options = PipelineOptionsFactory.fromArgs(args).create();
    Pipeline pipeline = Pipeline.create(options);
    pipeline
        .apply("Read lines", TextIO.read().from(inputsDir))
        .apply("Find words", FlatMapElements.into(TypeDescriptors.strings())
            .via((String line) -> Arrays.asList(line.split("[^\\p{L}]+"))))
        .apply("Filter empty words", Filter.by((String word) -> !word.isEmpty()))
        .apply("Count words", Count.perElement())
        .apply("Write results", MapElements.into(TypeDescriptors.strings())
            .via((KV<String, Long> wordCount) ->
                  wordCount.getKey() + ": " + wordCount.getValue()))
        .apply(TextIO.write().to(outputsPrefix));
    pipeline.run();
  }
}


Writing src/main/java/samples/quickstart/WordCount.java
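For comparison, the same computation can be written in plain Java with java.util.stream when everything fits in memory on a single machine, which is exactly the assumption Beam lets you drop. This is a sketch for illustration (InMemoryWordCount and countWords are hypothetical names); it reuses the pipeline's regular expression and output format, but sorts the words with a TreeMap, whereas the Beam output has no ordering guarantees:

```java
import java.util.Arrays;
import java.util.Map;
import java.util.TreeMap;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// In-memory analogue of the Beam word count (hypothetical, for comparison only).
final class InMemoryWordCount {
  static Map<String, Long> countWords(Stream<String> lines) {
    return lines
        // "Find words": split on runs of non-letter characters, as in the pipeline.
        .flatMap(line -> Arrays.stream(line.split("[^\\p{L}]+")))
        // "Filter empty words": drop empty tokens produced by split.
        .filter(word -> !word.isEmpty())
        // "Count words": group identical words and count them, like Count.perElement().
        .collect(Collectors.groupingBy(Function.identity(), TreeMap::new, Collectors.counting()));
  }

  public static void main(String[] args) {
    Map<String, Long> counts =
        countWords(Stream.of("To be, or not to be:", "that is the question."));
    // "Write results": same "word: count" format as the pipeline's MapElements step.
    counts.forEach((word, count) -> System.out.println(word + ": " + count));
  }
}
```

The empty-word filter matters because split returns an empty first token for a line that starts with a separator (for example " hello") and a single empty token for a blank line.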

Build and run

Let's first check what the final file system structure looks like. These are all the files required to build and run our application.

  • build.gradle - build configuration for Gradle
  • src/main/java/samples/quickstart/WordCount.java - application source code
  • data/kinglear.txt - input data; this could be any file or files

We are now ready to build the application using gradle build.


In [11]:
# Build the project.
gradle('build')

# Check the generated build files.
run('ls -lh build/libs/')


>> /opt/gradle-5.0/bin/gradle --console=plain build
Starting a Gradle Daemon (subsequent builds will be faster)
> Task :compileJava
> Task :processResources NO-SOURCE
> Task :classes
> Task :jar
> Task :startScripts
> Task :distTar
> Task :distZip
> Task :shadowJar
> Task :startShadowScripts
> Task :shadowDistTar
> Task :shadowDistZip
> Task :assemble
> Task :compileTestJava NO-SOURCE
> Task :processTestResources NO-SOURCE
> Task :testClasses UP-TO-DATE
> Task :test NO-SOURCE
> Task :check UP-TO-DATE
> Task :build

BUILD SUCCESSFUL in 56s
9 actionable tasks: 9 executed

>> ls -lh build/libs/
total 40M
-rw-r--r-- 1 root root 2.9K Mar  4 22:59 content.jar
-rw-r--r-- 1 root root  40M Mar  4 23:00 WordCount.jar

Two files are generated:

  • The content.jar file is the regular (thin) JAR produced by the standard build. Gradle names it after the project directory (/content in Colab), and it's only a few kilobytes in size because it contains just our compiled classes.
  • The WordCount.jar file takes the baseName we specified in the shadowJar section of the build.gradle file. It's several megabytes in size (40 MB here) because it embeds all the libraries it needs to run.
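To see what the fat JAR actually bundles, you could count its entries per top-level directory (for example org/, com/, META-INF/). A minimal JDK-only sketch; JarSummary and its methods are hypothetical names, the counts include directory entries as well as files, and the default path assumes you run it from the notebook's working directory:

```java
import java.io.IOException;
import java.util.Map;
import java.util.TreeMap;
import java.util.function.Function;
import java.util.jar.JarEntry;
import java.util.jar.JarFile;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// Hypothetical helper that summarizes a JAR's contents by top-level directory.
final class JarSummary {
  // Returns the first path segment of an entry name, or "(root)" for top-level files.
  static String topLevelDir(String entryName) {
    int slash = entryName.indexOf('/');
    return slash < 0 ? "(root)" : entryName.substring(0, slash);
  }

  // Counts entries (files and directory entries) per top-level directory, sorted by name.
  static Map<String, Long> countByTopLevelDir(Stream<String> entryNames) {
    return entryNames
        .map(JarSummary::topLevelDir)
        .collect(Collectors.groupingBy(Function.identity(), TreeMap::new, Collectors.counting()));
  }

  static Map<String, Long> summarizeJar(String jarPath) throws IOException {
    try (JarFile jar = new JarFile(jarPath)) {
      // The stream is fully consumed before the JarFile is closed.
      return countByTopLevelDir(jar.stream().map(JarEntry::getName));
    }
  }

  public static void main(String[] args) throws IOException {
    String path = args.length > 0 ? args[0] : "build/libs/WordCount.jar";
    summarizeJar(path).forEach((dir, n) -> System.out.println(dir + "\t" + n));
  }
}
```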

The file we're actually interested in is the fat JAR file WordCount.jar. To run the fat JAR, we'll use the gradle runShadow command.


In [12]:
# Run the shadow (fat jar) build.
gradle('runShadow')

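# Sample the first 20 results; remember there are no ordering guarantees.
# outputs/ is not cleared between runs, so shards left over from earlier runs also match this glob.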
run('head -n 20 outputs/part-00000-of-*')


>> /opt/gradle-5.0/bin/gradle --console=plain runShadow
> Task :compileJava UP-TO-DATE
> Task :processResources NO-SOURCE
> Task :classes UP-TO-DATE
> Task :shadowJar UP-TO-DATE
> Task :startShadowScripts UP-TO-DATE
> Task :installShadowDist

> Task :runShadow
WARNING: An illegal reflective access operation has occurred
WARNING: Illegal reflective access by org.apache.beam.vendor.grpc.v1p26p0.com.google.protobuf.UnsafeUtil (file:/content/build/install/content-shadow/lib/WordCount.jar) to field java.nio.Buffer.address
WARNING: Please consider reporting this to the maintainers of org.apache.beam.vendor.grpc.v1p26p0.com.google.protobuf.UnsafeUtil
WARNING: Use --illegal-access=warn to enable warnings of further illegal reflective access operations
WARNING: All illegal access operations will be denied in a future release
Mar 04, 2019 11:00:24 PM org.apache.beam.sdk.io.FileBasedSource getEstimatedSizeBytes
INFO: Filepattern data/* matched 1 files with total size 157283
Mar 04, 2019 11:00:24 PM org.apache.beam.sdk.io.FileBasedSource split
INFO: Splitting filepattern data/* into bundles of size 52427 took 1 ms and produced 1 files and 3 bundles
Mar 04, 2019 11:00:43 PM org.apache.beam.sdk.io.WriteFiles$WriteShardsIntoTempFilesFn processElement
INFO: Opening writer 7d12bbc4-9165-4493-8563-fb710b827daa for window org.apache.beam.sdk.transforms.windowing.GlobalWindow@3ad2e17 pane PaneInfo{isFirst=true, isLast=true, timing=ON_TIME, index=0, onTimeIndex=0} destination null
Mar 04, 2019 11:00:43 PM org.apache.beam.sdk.io.WriteFiles$WriteShardsIntoTempFilesFn processElement
INFO: Opening writer 5b1bdb18-9f9a-47cd-80d2-a65baa31aa60 for window org.apache.beam.sdk.transforms.windowing.GlobalWindow@3ad2e17 pane PaneInfo{isFirst=true, isLast=true, timing=ON_TIME, index=0, onTimeIndex=0} destination null
Mar 04, 2019 11:00:43 PM org.apache.beam.sdk.io.WriteFiles$WriteShardsIntoTempFilesFn processElement
INFO: Opening writer 6302e6b5-5428-48e6-b571-76a9282d7f45 for window org.apache.beam.sdk.transforms.windowing.GlobalWindow@3ad2e17 pane PaneInfo{isFirst=true, isLast=true, timing=ON_TIME, index=0, onTimeIndex=0} destination null
Mar 04, 2019 11:00:43 PM org.apache.beam.sdk.io.FileBasedSink$Writer close
INFO: Successfully wrote temporary file /content/outputs/.temp-beam-2019-03-04_23-00-23-1/7d12bbc4-9165-4493-8563-fb710b827daa
Mar 04, 2019 11:00:43 PM org.apache.beam.sdk.io.FileBasedSink$Writer close
INFO: Successfully wrote temporary file /content/outputs/.temp-beam-2019-03-04_23-00-23-1/5b1bdb18-9f9a-47cd-80d2-a65baa31aa60
Mar 04, 2019 11:00:43 PM org.apache.beam.sdk.io.WriteFiles$WriteShardsIntoTempFilesFn processElement
INFO: Opening writer 493e3ec4-f0f7-4d10-8209-079f7ac4db16 for window org.apache.beam.sdk.transforms.windowing.GlobalWindow@3ad2e17 pane PaneInfo{isFirst=true, isLast=true, timing=ON_TIME, index=0, onTimeIndex=0} destination null
Mar 04, 2019 11:00:43 PM org.apache.beam.sdk.io.FileBasedSink$Writer close
INFO: Successfully wrote temporary file /content/outputs/.temp-beam-2019-03-04_23-00-23-1/6302e6b5-5428-48e6-b571-76a9282d7f45
Mar 04, 2019 11:00:43 PM org.apache.beam.sdk.io.FileBasedSink$Writer close
INFO: Successfully wrote temporary file /content/outputs/.temp-beam-2019-03-04_23-00-23-1/493e3ec4-f0f7-4d10-8209-079f7ac4db16
Mar 04, 2019 11:00:43 PM org.apache.beam.sdk.io.WriteFiles$FinalizeTempFileBundles$FinalizeFn process
INFO: Finalizing 4 file results
Mar 04, 2019 11:00:43 PM org.apache.beam.sdk.io.FileBasedSink$WriteOperation createMissingEmptyShards
INFO: Finalizing for destination null num shards 4.
Mar 04, 2019 11:00:43 PM org.apache.beam.sdk.io.FileBasedSink$WriteOperation moveToOutputFiles
INFO: Will copy temporary file FileResult{tempFilename=/content/outputs/.temp-beam-2019-03-04_23-00-23-1/7d12bbc4-9165-4493-8563-fb710b827daa, shard=3, window=org.apache.beam.sdk.transforms.windowing.GlobalWindow@3ad2e17, paneInfo=PaneInfo{isFirst=true, isLast=true, timing=ON_TIME, index=0, onTimeIndex=0}} to final location /content/outputs/part-00003-of-00004
Mar 04, 2019 11:00:43 PM org.apache.beam.sdk.io.FileBasedSink$WriteOperation moveToOutputFiles
INFO: Will copy temporary file FileResult{tempFilename=/content/outputs/.temp-beam-2019-03-04_23-00-23-1/5b1bdb18-9f9a-47cd-80d2-a65baa31aa60, shard=2, window=org.apache.beam.sdk.transforms.windowing.GlobalWindow@3ad2e17, paneInfo=PaneInfo{isFirst=true, isLast=true, timing=ON_TIME, index=0, onTimeIndex=0}} to final location /content/outputs/part-00002-of-00004
Mar 04, 2019 11:00:43 PM org.apache.beam.sdk.io.FileBasedSink$WriteOperation moveToOutputFiles
INFO: Will copy temporary file FileResult{tempFilename=/content/outputs/.temp-beam-2019-03-04_23-00-23-1/6302e6b5-5428-48e6-b571-76a9282d7f45, shard=1, window=org.apache.beam.sdk.transforms.windowing.GlobalWindow@3ad2e17, paneInfo=PaneInfo{isFirst=true, isLast=true, timing=ON_TIME, index=0, onTimeIndex=0}} to final location /content/outputs/part-00001-of-00004
Mar 04, 2019 11:00:43 PM org.apache.beam.sdk.io.FileBasedSink$WriteOperation moveToOutputFiles
INFO: Will copy temporary file FileResult{tempFilename=/content/outputs/.temp-beam-2019-03-04_23-00-23-1/493e3ec4-f0f7-4d10-8209-079f7ac4db16, shard=0, window=org.apache.beam.sdk.transforms.windowing.GlobalWindow@3ad2e17, paneInfo=PaneInfo{isFirst=true, isLast=true, timing=ON_TIME, index=0, onTimeIndex=0}} to final location /content/outputs/part-00000-of-00004
Mar 04, 2019 11:00:43 PM org.apache.beam.sdk.io.FileBasedSink$WriteOperation removeTemporaryFiles
INFO: Will remove known temporary file /content/outputs/.temp-beam-2019-03-04_23-00-23-1/6302e6b5-5428-48e6-b571-76a9282d7f45
Mar 04, 2019 11:00:43 PM org.apache.beam.sdk.io.FileBasedSink$WriteOperation removeTemporaryFiles
INFO: Will remove known temporary file /content/outputs/.temp-beam-2019-03-04_23-00-23-1/5b1bdb18-9f9a-47cd-80d2-a65baa31aa60
Mar 04, 2019 11:00:43 PM org.apache.beam.sdk.io.FileBasedSink$WriteOperation removeTemporaryFiles
INFO: Will remove known temporary file /content/outputs/.temp-beam-2019-03-04_23-00-23-1/7d12bbc4-9165-4493-8563-fb710b827daa
Mar 04, 2019 11:00:43 PM org.apache.beam.sdk.io.FileBasedSink$WriteOperation removeTemporaryFiles
INFO: Will remove known temporary file /content/outputs/.temp-beam-2019-03-04_23-00-23-1/493e3ec4-f0f7-4d10-8209-079f7ac4db16
Mar 04, 2019 11:00:43 PM org.apache.beam.sdk.io.FileBasedSink$WriteOperation removeTemporaryFiles
WARNING: Failed to match temporary files under: [/content/outputs/.temp-beam-2019-03-04_23-00-23-1/].

BUILD SUCCESSFUL in 24s
5 actionable tasks: 2 executed, 3 up-to-date

>> head -n 20 outputs/part-00000-of-*
==> outputs/part-00000-of-00001 <==
(u'canker', 1)
(u'bounty', 2)
(u'provision', 3)
(u'to', 438)
(u'terms', 2)
(u'unnecessary', 2)
(u'tongue', 5)
(u'knives', 1)
(u'Commend', 1)
(u'Hum', 2)
(u'Set', 2)
(u'smell', 6)
(u'dreadful', 3)
(u'frowning', 1)
(u'World', 1)
(u'tike', 1)
(u'yes', 3)
(u'oldness', 1)
(u'boat', 1)
(u"in's", 1)

==> outputs/part-00000-of-00004 <==
retinue: 1
stink: 1
beaks: 1
Ten: 1
riots: 2
Service: 1
dealing: 1
stop: 2
detain: 1
beware: 1
pilferings: 1
swimming: 1
The: 124
Been: 1
behavior: 1
impetuous: 1
Thy: 20
Tis: 24
Soldiers: 7
Juno: 1
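Because the shards carry no ordering guarantees, a quick way to look at the most frequent words is to merge all shards of a single run and sort them by count. A minimal sketch, assuming the "word: count" format written by the pipeline and the outputs/ directory used in this notebook (TopWords and topN are hypothetical names). Restricting the glob to one shard count avoids mixing in files left over from earlier runs:

```java
import java.io.IOException;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// Hypothetical helper: merges "word: count" lines and returns the most frequent words.
final class TopWords {
  static long countOf(String line) {
    return Long.parseLong(line.substring(line.lastIndexOf(": ") + 2).trim());
  }

  static String wordOf(String line) {
    return line.substring(0, line.lastIndexOf(": "));
  }

  // Highest count first; ties are broken alphabetically by word.
  static List<String> topN(Stream<String> lines, int n) {
    return lines
        .filter(line -> line.contains(": "))  // skip lines in any other format
        .sorted((a, b) -> {
          int byCount = Long.compare(countOf(b), countOf(a));  // descending
          return byCount != 0 ? byCount : wordOf(a).compareTo(wordOf(b));
        })
        .limit(n)
        .collect(Collectors.toList());
  }

  public static void main(String[] args) throws IOException {
    // Read every shard of ONE run, e.g. part-*-of-00003.
    String glob = args.length > 0 ? args[0] : "part-*-of-00003";
    List<String> lines = new ArrayList<>();
    try (DirectoryStream<Path> shards = Files.newDirectoryStream(Paths.get("outputs"), glob)) {
      for (Path shard : shards) {
        lines.addAll(Files.readAllLines(shard));
      }
    }
    topN(lines.stream(), 10).forEach(System.out::println);
  }
}
```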

Distributing your application

We can run our fat JAR file as long as we have a Java Runtime Environment installed.

To distribute, we copy the fat JAR file and run it with java -jar.
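java -jar finds the entry point through the Main-Class attribute in the JAR's META-INF/MANIFEST.MF, which the shadowJar block in build.gradle sets to mainClassName. To check it, here is a short JDK-only sketch (MainClassReader is a hypothetical name; the default path assumes the copied WordCount.jar in the working directory):

```java
import java.io.IOException;
import java.util.jar.Attributes;
import java.util.jar.JarFile;
import java.util.jar.Manifest;

// Hypothetical helper that reads the Main-Class attribute used by `java -jar`.
final class MainClassReader {
  // Returns null when there is no manifest or no Main-Class attribute.
  static String mainClass(Manifest manifest) {
    return manifest == null
        ? null
        : manifest.getMainAttributes().getValue(Attributes.Name.MAIN_CLASS);
  }

  static String mainClassOf(String jarPath) throws IOException {
    try (JarFile jar = new JarFile(jarPath)) {
      return mainClass(jar.getManifest());
    }
  }

  public static void main(String[] args) throws IOException {
    String path = args.length > 0 ? args[0] : "WordCount.jar";
    // For the fat JAR built above, this should print samples.quickstart.WordCount.
    System.out.println(mainClassOf(path));
  }
}
```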


In [13]:
# You can now distribute and run your Java application as a standalone jar file.
run('cp build/libs/WordCount.jar .')
run('java -jar WordCount.jar')

# Sample the first 20 results; remember there are no ordering guarantees.
run('head -n 20 outputs/part-00000-of-*')


>> cp build/libs/WordCount.jar .

>> java -jar WordCount.jar
WARNING: An illegal reflective access operation has occurred
WARNING: Illegal reflective access by org.apache.beam.vendor.grpc.v1p26p0.com.google.protobuf.UnsafeUtil (file:/content/WordCount.jar) to field java.nio.Buffer.address
WARNING: Please consider reporting this to the maintainers of org.apache.beam.vendor.grpc.v1p26p0.com.google.protobuf.UnsafeUtil
WARNING: Use --illegal-access=warn to enable warnings of further illegal reflective access operations
WARNING: All illegal access operations will be denied in a future release
Mar 04, 2019 11:00:49 PM org.apache.beam.sdk.io.FileBasedSource getEstimatedSizeBytes
INFO: Filepattern data/* matched 1 files with total size 157283
Mar 04, 2019 11:00:49 PM org.apache.beam.sdk.io.FileBasedSource split
INFO: Splitting filepattern data/* into bundles of size 52427 took 1 ms and produced 1 files and 3 bundles
Mar 04, 2019 11:01:10 PM org.apache.beam.sdk.io.WriteFiles$WriteShardsIntoTempFilesFn processElement
INFO: Opening writer 273fc5ad-09b8-4e87-95c9-5d9ec72ed294 for window org.apache.beam.sdk.transforms.windowing.GlobalWindow@7a362b6b pane PaneInfo{isFirst=true, isLast=true, timing=ON_TIME, index=0, onTimeIndex=0} destination null
Mar 04, 2019 11:01:10 PM org.apache.beam.sdk.io.WriteFiles$WriteShardsIntoTempFilesFn processElement
INFO: Opening writer c224db69-5869-4259-bd43-ca0431ec77fe for window org.apache.beam.sdk.transforms.windowing.GlobalWindow@7a362b6b pane PaneInfo{isFirst=true, isLast=true, timing=ON_TIME, index=0, onTimeIndex=0} destination null
Mar 04, 2019 11:01:10 PM org.apache.beam.sdk.io.WriteFiles$WriteShardsIntoTempFilesFn processElement
INFO: Opening writer f45d6e07-d37a-4af3-ad8b-bc316cef7d99 for window org.apache.beam.sdk.transforms.windowing.GlobalWindow@7a362b6b pane PaneInfo{isFirst=true, isLast=true, timing=ON_TIME, index=0, onTimeIndex=0} destination null
Mar 04, 2019 11:01:10 PM org.apache.beam.sdk.io.FileBasedSink$Writer close
INFO: Successfully wrote temporary file /content/outputs/.temp-beam-2019-03-04_23-00-49-1/f45d6e07-d37a-4af3-ad8b-bc316cef7d99
Mar 04, 2019 11:01:10 PM org.apache.beam.sdk.io.FileBasedSink$Writer close
INFO: Successfully wrote temporary file /content/outputs/.temp-beam-2019-03-04_23-00-49-1/c224db69-5869-4259-bd43-ca0431ec77fe
Mar 04, 2019 11:01:10 PM org.apache.beam.sdk.io.FileBasedSink$Writer close
INFO: Successfully wrote temporary file /content/outputs/.temp-beam-2019-03-04_23-00-49-1/273fc5ad-09b8-4e87-95c9-5d9ec72ed294
Mar 04, 2019 11:01:10 PM org.apache.beam.sdk.io.WriteFiles$FinalizeTempFileBundles$FinalizeFn process
INFO: Finalizing 3 file results
Mar 04, 2019 11:01:10 PM org.apache.beam.sdk.io.FileBasedSink$WriteOperation createMissingEmptyShards
INFO: Finalizing for destination null num shards 3.
Mar 04, 2019 11:01:10 PM org.apache.beam.sdk.io.FileBasedSink$WriteOperation moveToOutputFiles
INFO: Will copy temporary file FileResult{tempFilename=/content/outputs/.temp-beam-2019-03-04_23-00-49-1/273fc5ad-09b8-4e87-95c9-5d9ec72ed294, shard=2, window=org.apache.beam.sdk.transforms.windowing.GlobalWindow@7a362b6b, paneInfo=PaneInfo{isFirst=true, isLast=true, timing=ON_TIME, index=0, onTimeIndex=0}} to final location /content/outputs/part-00002-of-00003
Mar 04, 2019 11:01:10 PM org.apache.beam.sdk.io.FileBasedSink$WriteOperation moveToOutputFiles
INFO: Will copy temporary file FileResult{tempFilename=/content/outputs/.temp-beam-2019-03-04_23-00-49-1/c224db69-5869-4259-bd43-ca0431ec77fe, shard=0, window=org.apache.beam.sdk.transforms.windowing.GlobalWindow@7a362b6b, paneInfo=PaneInfo{isFirst=true, isLast=true, timing=ON_TIME, index=0, onTimeIndex=0}} to final location /content/outputs/part-00000-of-00003
Mar 04, 2019 11:01:10 PM org.apache.beam.sdk.io.FileBasedSink$WriteOperation moveToOutputFiles
INFO: Will copy temporary file FileResult{tempFilename=/content/outputs/.temp-beam-2019-03-04_23-00-49-1/f45d6e07-d37a-4af3-ad8b-bc316cef7d99, shard=1, window=org.apache.beam.sdk.transforms.windowing.GlobalWindow@7a362b6b, paneInfo=PaneInfo{isFirst=true, isLast=true, timing=ON_TIME, index=0, onTimeIndex=0}} to final location /content/outputs/part-00001-of-00003
Mar 04, 2019 11:01:10 PM org.apache.beam.sdk.io.FileBasedSink$WriteOperation removeTemporaryFiles
INFO: Will remove known temporary file /content/outputs/.temp-beam-2019-03-04_23-00-49-1/273fc5ad-09b8-4e87-95c9-5d9ec72ed294
Mar 04, 2019 11:01:10 PM org.apache.beam.sdk.io.FileBasedSink$WriteOperation removeTemporaryFiles
INFO: Will remove known temporary file /content/outputs/.temp-beam-2019-03-04_23-00-49-1/f45d6e07-d37a-4af3-ad8b-bc316cef7d99
Mar 04, 2019 11:01:10 PM org.apache.beam.sdk.io.FileBasedSink$WriteOperation removeTemporaryFiles
INFO: Will remove known temporary file /content/outputs/.temp-beam-2019-03-04_23-00-49-1/c224db69-5869-4259-bd43-ca0431ec77fe
Mar 04, 2019 11:01:10 PM org.apache.beam.sdk.io.FileBasedSink$WriteOperation removeTemporaryFiles
WARNING: Failed to match temporary files under: [/content/outputs/.temp-beam-2019-03-04_23-00-49-1/].

>> head -n 20 outputs/part-00000-of-*
==> outputs/part-00000-of-00001 <==
(u'canker', 1)
(u'bounty', 2)
(u'provision', 3)
(u'to', 438)
(u'terms', 2)
(u'unnecessary', 2)
(u'tongue', 5)
(u'knives', 1)
(u'Commend', 1)
(u'Hum', 2)
(u'Set', 2)
(u'smell', 6)
(u'dreadful', 3)
(u'frowning', 1)
(u'World', 1)
(u'tike', 1)
(u'yes', 3)
(u'oldness', 1)
(u'boat', 1)
(u"in's", 1)

==> outputs/part-00000-of-00003 <==
With: 31
justification: 1
hither: 15
make: 46
opposed: 2
prince: 5
Burn: 1
waking: 1
waked: 3
inform: 6
mercy: 5
about: 11
danger: 6
Croak: 1
happier: 1
stick: 2
oppressed: 1
erlook: 1
untented: 1
myself: 10

==> outputs/part-00000-of-00004 <==
retinue: 1
stink: 1
beaks: 1
Ten: 1
riots: 2
Service: 1
dealing: 1
stop: 2
detain: 1
beware: 1
pilferings: 1
swimming: 1
The: 124
Been: 1
behavior: 1
impetuous: 1
Thy: 20
Tis: 24
Soldiers: 7
Juno: 1

Word count with comments

Below is mostly the same code as above, but with comments explaining every line in more detail.


In [14]:
%%writefile src/main/java/samples/quickstart/WordCount.java

package samples.quickstart;

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Count;
import org.apache.beam.sdk.transforms.Filter;
import org.apache.beam.sdk.transforms.FlatMapElements;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.TypeDescriptors;

import java.util.Arrays;

public class WordCount {
  public static void main(String[] args) {
    String inputsDir = "data/*";
    String outputsPrefix = "outputs/part";

    PipelineOptions options = PipelineOptionsFactory.fromArgs(args).create();
    Pipeline pipeline = Pipeline.create(options);

    // Store the word counts in a PCollection.
    // Each element is a key-value pair (word, count) of type KV<String, Long>.
    PCollection<KV<String, Long>> wordCounts =
        // Start from the pipeline itself; the first transform (a read) creates the initial PCollection.
        pipeline

        // Read lines from a text file.
        .apply("Read lines", TextIO.read().from(inputsDir))
        // Element type: String - text line

        // Use a regular expression to iterate over all words in the line.
        // FlatMapElements emits one output element for every item in the Iterable returned by the lambda.
        .apply("Find words", FlatMapElements.into(TypeDescriptors.strings())
            .via((String line) -> Arrays.asList(line.split("[^\\p{L}]+"))))
        // Element type: String - word

        // Keep only non-empty words.
        .apply("Filter empty words", Filter.by((String word) -> !word.isEmpty()))
        // Element type: String - word

        // Count each unique word.
        .apply("Count words", Count.perElement());
        // Element type: KV<String, Long> - key: word, value: counts

    // Because wordCounts is stored in a variable, we can keep applying transforms to it.
    // Here the input PCollection is wordCounts from the previous step.
    wordCounts
        // Format the results into a string so we can write them to a file.
        .apply("Write results", MapElements.into(TypeDescriptors.strings())
            .via((KV<String, Long> wordCount) ->
                  wordCount.getKey() + ": " + wordCount.getValue()))
        // Element type: String - text line

        // Finally, write the results to a file.
        .apply(TextIO.write().to(outputsPrefix));

    // We have to explicitly run the pipeline, otherwise it's only a definition.
    pipeline.run();
  }
}


Overwriting src/main/java/samples/quickstart/WordCount.java
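The last comment in the code above says the pipeline is only a definition until run() is called. A toy model of that deferred execution in plain Java (ToyPipeline is a hypothetical class for illustration, not Beam's API, and it ignores parallelism, PCollections, and runners): apply() only records a named step, and run() executes the recorded steps in order.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.UnaryOperator;
import java.util.stream.Collectors;

// Toy model of deferred execution (NOT Beam's API).
final class ToyPipeline {
  private final List<String> input;
  private final List<UnaryOperator<List<String>>> steps = new ArrayList<>();
  private final List<String> log = new ArrayList<>();

  ToyPipeline(List<String> input) {
    this.input = input;
  }

  // Records a named step without executing it.
  ToyPipeline apply(String name, UnaryOperator<List<String>> step) {
    steps.add(data -> {
      log.add(name);  // runs only when run() executes this step
      return step.apply(data);
    });
    return this;
  }

  // Executes all recorded steps in order and returns the final result.
  List<String> run() {
    List<String> data = input;
    for (UnaryOperator<List<String>> step : steps) {
      data = step.apply(data);
    }
    return data;
  }

  List<String> executedSteps() {
    return log;
  }

  public static void main(String[] args) {
    ToyPipeline p = new ToyPipeline(Arrays.asList("to", "", "be"))
        .apply("Filter empty words",
            words -> words.stream().filter(w -> !w.isEmpty()).collect(Collectors.toList()));
    System.out.println(p.executedSteps()); // prints [] (nothing has run yet)
    System.out.println(p.run());           // prints [to, be]
    System.out.println(p.executedSteps()); // prints [Filter empty words]
  }
}
```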

In [15]:
# Build and run the project. The 'runShadow' task recompiles the code and rebuilds the fat JAR if needed before running it.
gradle('runShadow')

# Sample the first 20 results; remember there are no ordering guarantees.
run('head -n 20 outputs/part-00000-of-*')


>> /opt/gradle-5.0/bin/gradle --console=plain runShadow
> Task :compileJava
> Task :processResources NO-SOURCE
> Task :classes
> Task :shadowJar
> Task :startShadowScripts
> Task :installShadowDist

> Task :runShadow
WARNING: An illegal reflective access operation has occurred
WARNING: Illegal reflective access by org.apache.beam.vendor.grpc.v1p26p0.com.google.protobuf.UnsafeUtil (file:/content/build/install/content-shadow/lib/WordCount.jar) to field java.nio.Buffer.address
WARNING: Please consider reporting this to the maintainers of org.apache.beam.vendor.grpc.v1p26p0.com.google.protobuf.UnsafeUtil
WARNING: Use --illegal-access=warn to enable warnings of further illegal reflective access operations
WARNING: All illegal access operations will be denied in a future release
Mar 04, 2019 11:01:26 PM org.apache.beam.sdk.io.FileBasedSource getEstimatedSizeBytes
INFO: Filepattern data/* matched 1 files with total size 157283
Mar 04, 2019 11:01:26 PM org.apache.beam.sdk.io.FileBasedSource split
INFO: Splitting filepattern data/* into bundles of size 52427 took 1 ms and produced 1 files and 3 bundles
Mar 04, 2019 11:01:46 PM org.apache.beam.sdk.io.WriteFiles$WriteShardsIntoTempFilesFn processElement
INFO: Opening writer e2eeada2-5a8b-4493-acc5-c706204d9669 for window org.apache.beam.sdk.transforms.windowing.GlobalWindow@8c3619e pane PaneInfo{isFirst=true, isLast=true, timing=ON_TIME, index=0, onTimeIndex=0} destination null
Mar 04, 2019 11:01:46 PM org.apache.beam.sdk.io.WriteFiles$WriteShardsIntoTempFilesFn processElement
INFO: Opening writer 7acdc85e-ff7d-42d0-9b2f-9ce385956c0e for window org.apache.beam.sdk.transforms.windowing.GlobalWindow@8c3619e pane PaneInfo{isFirst=true, isLast=true, timing=ON_TIME, index=0, onTimeIndex=0} destination null
Mar 04, 2019 11:01:46 PM org.apache.beam.sdk.io.WriteFiles$WriteShardsIntoTempFilesFn processElement
INFO: Opening writer d1a6a591-77f0-4994-affc-d83378e7b7c0 for window org.apache.beam.sdk.transforms.windowing.GlobalWindow@8c3619e pane PaneInfo{isFirst=true, isLast=true, timing=ON_TIME, index=0, onTimeIndex=0} destination null
Mar 04, 2019 11:01:46 PM org.apache.beam.sdk.io.FileBasedSink$Writer close
INFO: Successfully wrote temporary file /content/outputs/.temp-beam-2019-03-04_23-01-25-1/e2eeada2-5a8b-4493-acc5-c706204d9669
Mar 04, 2019 11:01:46 PM org.apache.beam.sdk.io.FileBasedSink$Writer close
INFO: Successfully wrote temporary file /content/outputs/.temp-beam-2019-03-04_23-01-25-1/7acdc85e-ff7d-42d0-9b2f-9ce385956c0e
Mar 04, 2019 11:01:46 PM org.apache.beam.sdk.io.FileBasedSink$Writer close
INFO: Successfully wrote temporary file /content/outputs/.temp-beam-2019-03-04_23-01-25-1/d1a6a591-77f0-4994-affc-d83378e7b7c0
Mar 04, 2019 11:01:46 PM org.apache.beam.sdk.io.WriteFiles$FinalizeTempFileBundles$FinalizeFn process
INFO: Finalizing 3 file results
Mar 04, 2019 11:01:46 PM org.apache.beam.sdk.io.FileBasedSink$WriteOperation createMissingEmptyShards
INFO: Finalizing for destination null num shards 3.
Mar 04, 2019 11:01:46 PM org.apache.beam.sdk.io.FileBasedSink$WriteOperation moveToOutputFiles
INFO: Will copy temporary file FileResult{tempFilename=/content/outputs/.temp-beam-2019-03-04_23-01-25-1/7acdc85e-ff7d-42d0-9b2f-9ce385956c0e, shard=2, window=org.apache.beam.sdk.transforms.windowing.GlobalWindow@8c3619e, paneInfo=PaneInfo{isFirst=true, isLast=true, timing=ON_TIME, index=0, onTimeIndex=0}} to final location /content/outputs/part-00002-of-00003
Mar 04, 2019 11:01:46 PM org.apache.beam.sdk.io.FileBasedSink$WriteOperation moveToOutputFiles
INFO: Will copy temporary file FileResult{tempFilename=/content/outputs/.temp-beam-2019-03-04_23-01-25-1/e2eeada2-5a8b-4493-acc5-c706204d9669, shard=0, window=org.apache.beam.sdk.transforms.windowing.GlobalWindow@8c3619e, paneInfo=PaneInfo{isFirst=true, isLast=true, timing=ON_TIME, index=0, onTimeIndex=0}} to final location /content/outputs/part-00000-of-00003
Mar 04, 2019 11:01:46 PM org.apache.beam.sdk.io.FileBasedSink$WriteOperation moveToOutputFiles
INFO: Will copy temporary file FileResult{tempFilename=/content/outputs/.temp-beam-2019-03-04_23-01-25-1/d1a6a591-77f0-4994-affc-d83378e7b7c0, shard=1, window=org.apache.beam.sdk.transforms.windowing.GlobalWindow@8c3619e, paneInfo=PaneInfo{isFirst=true, isLast=true, timing=ON_TIME, index=0, onTimeIndex=0}} to final location /content/outputs/part-00001-of-00003
Mar 04, 2019 11:01:46 PM org.apache.beam.sdk.io.FileBasedSink$WriteOperation removeTemporaryFiles
INFO: Will remove known temporary file /content/outputs/.temp-beam-2019-03-04_23-01-25-1/e2eeada2-5a8b-4493-acc5-c706204d9669
Mar 04, 2019 11:01:46 PM org.apache.beam.sdk.io.FileBasedSink$WriteOperation removeTemporaryFiles
INFO: Will remove known temporary file /content/outputs/.temp-beam-2019-03-04_23-01-25-1/7acdc85e-ff7d-42d0-9b2f-9ce385956c0e
Mar 04, 2019 11:01:46 PM org.apache.beam.sdk.io.FileBasedSink$WriteOperation removeTemporaryFiles
INFO: Will remove known temporary file /content/outputs/.temp-beam-2019-03-04_23-01-25-1/d1a6a591-77f0-4994-affc-d83378e7b7c0
Mar 04, 2019 11:01:46 PM org.apache.beam.sdk.io.FileBasedSink$WriteOperation removeTemporaryFiles
WARNING: Failed to match temporary files under: [/content/outputs/.temp-beam-2019-03-04_23-01-25-1/].

BUILD SUCCESSFUL in 33s
5 actionable tasks: 5 executed

>> head -n 20 outputs/part-00000-of-*
==> outputs/part-00000-of-00001 <==
(u'canker', 1)
(u'bounty', 2)
(u'provision', 3)
(u'to', 438)
(u'terms', 2)
(u'unnecessary', 2)
(u'tongue', 5)
(u'knives', 1)
(u'Commend', 1)
(u'Hum', 2)
(u'Set', 2)
(u'smell', 6)
(u'dreadful', 3)
(u'frowning', 1)
(u'World', 1)
(u'tike', 1)
(u'yes', 3)
(u'oldness', 1)
(u'boat', 1)
(u"in's", 1)

==> outputs/part-00000-of-00003 <==
wrath: 3
nicely: 2
hall: 1
Sure: 2
legs: 4
ten: 1
yourselves: 1
embossed: 1
poorly: 1
temper: 2
Dismissing: 1
Legitimate: 1
tyrannous: 1
turn: 13
gold: 2
minds: 1
dowers: 2
policy: 1
I: 708
V: 6

==> outputs/part-00000-of-00004 <==
retinue: 1
stink: 1
beaks: 1
Ten: 1
riots: 2
Service: 1
dealing: 1
stop: 2
detain: 1
beware: 1
pilferings: 1
swimming: 1
The: 124
Been: 1
behavior: 1
impetuous: 1
Thy: 20
Tis: 24
Soldiers: 7
Juno: 1