In this notebook, we set up a Java development environment and work through a simple example using the DirectRunner. You can explore other runners with the Beam Capability Matrix.
To navigate through different sections, use the table of contents. From the View drop-down list, select Table of contents.
To run a code cell, click the Run cell button at the top left of the cell, or select the cell and press Shift+Enter. Try modifying a code cell and re-running it to see what happens.
To learn more about Colab, see Welcome to Colaboratory!.
In [1]:
# Run and print a shell command.
def run(cmd):
  print('>> {}'.format(cmd))
  !{cmd}  # This is magic to run 'cmd' in the shell.
  print('')

# Copy the input file into the local filesystem.
run('mkdir -p data')
run('gsutil cp gs://dataflow-samples/shakespeare/kinglear.txt data/')
Let's start by installing Java. We'll use the default-jdk, which uses OpenJDK. This will take a while, so feel free to go for a walk or do some stretching.
Note: Alternatively, you could install the proprietary Oracle JDK instead.
In [2]:
# Update and upgrade the system before installing anything else.
# '-y' answers apt-get's confirmation prompts, which would otherwise be hidden by '> /dev/null'.
run('apt-get update > /dev/null')
run('apt-get upgrade -y > /dev/null')

# Install the Java JDK.
run('apt-get install -y default-jdk > /dev/null')

# Check the Java version to see if everything is working well.
run('javac -version')
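As an extra check beyond `javac -version`, the JDK also reports its version at runtime through the `java.version` system property. The sketch below is a hypothetical helper (`JavaVersionSketch` and `majorVersion` are illustrative names, not part of Beam or Gradle) that extracts the major release from both version formats: the pre-Java 9 form such as `1.8.0_292` and the newer form such as `11.0.20`. The distinction matters because the build configuration in this notebook targets Java 1.8.

```java
public class JavaVersionSketch {
    // Returns the major release encoded in a java.version string.
    // Before Java 9 the format was "1.<major>.<minor>_<update>", e.g. "1.8.0_292".
    // From Java 9 onward it is "<major>.<minor>.<security>", e.g. "11.0.20",
    // possibly with a suffix such as "-ea".
    static int majorVersion(String version) {
        String[] parts = version.split("[.\\-+_]");
        int first = Integer.parseInt(parts[0]);
        // A leading "1." means the old scheme, so the major release is the second part.
        return (first == 1 && parts.length > 1) ? Integer.parseInt(parts[1]) : first;
    }

    public static void main(String[] args) {
        String version = System.getProperty("java.version");
        int major = majorVersion(version);
        System.out.println("java.version = " + version + " (major release " + major + ")");
        if (major < 8) {
            System.out.println("This JDK is older than Java 8 and cannot build for 1.8.");
        }
    }
}
```

Assuming the class is saved as `JavaVersionSketch.java`, it could be compiled and run with `javac JavaVersionSketch.java` followed by `java JavaVersionSketch`.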
In [3]:
import os

# Download the Gradle binary distribution.
gradle_version = 'gradle-5.0'
gradle_path = f"/opt/{gradle_version}"
if not os.path.exists(gradle_path):
  run(f"wget -q -nc -O gradle.zip https://services.gradle.org/distributions/{gradle_version}-bin.zip")
  run('unzip -q -d /opt gradle.zip')
  run('rm -f gradle.zip')

# We're choosing to use the absolute path instead of adding it to the $PATH environment variable.
def gradle(args):
  run(f"{gradle_path}/bin/gradle --console=plain {args}")

gradle('-v')
We'll also need a build.gradle file, which lets us invoke useful Gradle commands such as build and runShadow.
In [4]:
%%writefile build.gradle
plugins {
  // id 'idea'     // Uncomment for IntelliJ IDE
  // id 'eclipse'  // Uncomment for Eclipse IDE

  // Apply java plugin and make it a runnable application.
  id 'java'
  id 'application'

  // 'shadow' allows us to embed all the dependencies into a fat jar.
  id 'com.github.johnrengelman.shadow' version '4.0.3'
}

// Fully qualified name of the main class, whose source lives within ./src/main/java/
mainClassName = 'samples.quickstart.WordCount'

// Declare the sources from which to fetch dependencies.
repositories {
  mavenCentral()
}

// Java version compatibility.
sourceCompatibility = 1.8
targetCompatibility = 1.8

// Use the latest Apache Beam release within major version 2.
// You can also lock into a minor version like '2.9.+'.
ext.apacheBeamVersion = '2.+'

// Declare the dependencies of the project.
dependencies {
  shadow "org.apache.beam:beam-sdks-java-core:$apacheBeamVersion"
  runtime "org.apache.beam:beam-runners-direct-java:$apacheBeamVersion"
  runtime "org.slf4j:slf4j-api:1.+"
  runtime "org.slf4j:slf4j-jdk14:1.+"
  testCompile "junit:junit:4.+"
}

// Configure 'shadowJar' instead of 'jar' to set up the fat jar.
shadowJar {
  baseName = 'WordCount' // Name of the fat jar file.
  classifier = null      // Set to null, otherwise 'shadow' appends a '-all' to the jar file name.
  manifest {
    attributes('Main-Class': mainClassName) // Specify where the main class resides.
  }
}
Java and Gradle expect a specific directory structure. This helps organize large projects into a standard structure.
For now, we only need a place where our quickstart code will reside. That has to go within ./src/main/java/.
In [5]:
run('mkdir -p src/main/java/samples/quickstart')
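The package name and the directory path mirror each other: `mainClassName = 'samples.quickstart.WordCount'` in build.gradle corresponds to a source file under `src/main/java/samples/quickstart/`, one directory per package segment. A minimal sketch of that convention, using hypothetical helper names (`SourcePathSketch`, `sourcePathFor`, `packageDirFor`), could look like this:

```java
public class SourcePathSketch {
    // Conventional root for main sources in a Gradle Java project.
    static final String SOURCE_ROOT = "src/main/java/";

    // Maps a fully qualified class name to its conventional source file path,
    // e.g. "samples.quickstart.WordCount" -> "src/main/java/samples/quickstart/WordCount.java".
    static String sourcePathFor(String className) {
        return SOURCE_ROOT + className.replace('.', '/') + ".java";
    }

    // Returns the directory that must exist for the class, i.e. the path without the file name.
    static String packageDirFor(String className) {
        int lastDot = className.lastIndexOf('.');
        String pkg = lastDot < 0 ? "" : className.substring(0, lastDot);
        return SOURCE_ROOT + pkg.replace('.', '/');
    }

    public static void main(String[] args) {
        String mainClassName = "samples.quickstart.WordCount";
        System.out.println("mkdir -p " + packageDirFor(mainClassName));
        System.out.println("source file: " + sourcePathFor(mainClassName));
    }
}
```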
The following example is the "Hello, World!" of data processing, a basic implementation of word count. We're creating a simple data processing pipeline that reads a text file and counts the number of occurrences of every word.
In many scenarios, the data does not all fit in memory. Notice that the pipeline writes its outputs to the file system, which allows for large processing jobs in distributed environments.
In [6]:
%%writefile src/main/java/samples/quickstart/WordCount.java
package samples.quickstart;

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Count;
import org.apache.beam.sdk.transforms.Filter;
import org.apache.beam.sdk.transforms.FlatMapElements;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.TypeDescriptors;

import java.util.Arrays;

public class WordCount {
  public static void main(String[] args) {
    String inputsDir = "data/*";
    String outputsPrefix = "outputs/part";

    PipelineOptions options = PipelineOptionsFactory.fromArgs(args).create();
    Pipeline pipeline = Pipeline.create(options);
    pipeline
        .apply("Read lines", TextIO.read().from(inputsDir))
        .apply("Find words", FlatMapElements.into(TypeDescriptors.strings())
            .via((String line) -> Arrays.asList(line.split("[^\\p{L}]+"))))
        .apply("Filter empty words", Filter.by((String word) -> !word.isEmpty()))
        .apply("Count words", Count.perElement())
        .apply("Write results", MapElements.into(TypeDescriptors.strings())
            .via((KV<String, Long> wordCount) ->
                wordCount.getKey() + ": " + wordCount.getValue()))
        .apply(TextIO.write().to(outputsPrefix));
    pipeline.run();
  }
}
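To see what each step computes, here is a plain-Java sketch of the same logic with no Beam dependency: it splits lines with the same regular expression, drops empty strings, counts each word, and formats the results as `word: count`. The class and method names are illustrative only. Unlike the pipeline, it holds everything in memory, so it only suits small inputs, and it uses a `TreeMap` purely to make the output order predictable.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class WordCountInMemorySketch {
    // Same regular expression as the "Find words" step: split on runs of non-letters.
    static List<String> findWords(String line) {
        return Arrays.asList(line.split("[^\\p{L}]+"));
    }

    // Mirrors "Find words" -> "Filter empty words" -> "Count words".
    // The TreeMap sorts keys; the Beam pipeline itself gives no ordering guarantees.
    static Map<String, Long> countWords(List<String> lines) {
        Map<String, Long> counts = new TreeMap<>();
        for (String line : lines) {
            for (String word : findWords(line)) {
                if (!word.isEmpty()) {
                    counts.merge(word, 1L, Long::sum);
                }
            }
        }
        return counts;
    }

    // Mirrors the MapElements step that turns each (word, count) pair into "word: count".
    static List<String> format(Map<String, Long> counts) {
        List<String> out = new ArrayList<>();
        for (Map.Entry<String, Long> entry : counts.entrySet()) {
            out.add(entry.getKey() + ": " + entry.getValue());
        }
        return out;
    }

    public static void main(String[] args) {
        List<String> lines = Arrays.asList("", "The king, the KING!", "...king");
        format(countWords(lines)).forEach(System.out::println);
    }
}
```

Two details carry over to the Beam version: a line that starts with punctuation (like `"...king"`) or an empty line produces an empty string from `split`, which is why the "Filter empty words" step exists, and counting is case-sensitive, so `The`, `the`, `king`, and `KING` are separate words.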
Let's first check what the final file system structure looks like. These are all the files required to build and run our application.
- build.gradle: build configuration for Gradle
- src/main/java/samples/quickstart/WordCount.java: application source code
- data/kinglear.txt: input data; this could be any file or files

We are now ready to build the application using gradle build.
In [11]:
# Build the project.
gradle('build')
# Check the generated build files.
run('ls -lh build/libs/')
There are two generated files:
- content.jar: the application generated by the regular build command, named after the project directory (/content in Colab). It's only a few kilobytes in size.
- WordCount.jar: named after the baseName we specified in the shadowJar section of the build.gradle file. It's several megabytes in size, because all the libraries it needs to run are embedded in it.

The file we're actually interested in is the fat JAR file WordCount.jar. To run the fat JAR, we'll use the gradle runShadow command.
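Both file names follow Gradle's default archive naming, `[baseName]-[appendix]-[version]-[classifier].[extension]`, with empty parts left out. The Shadow plugin sets the classifier to `all` by default, which is why build.gradle sets it to null. The helper below (`ArchiveNameSketch.archiveFileName` is a hypothetical name) is a sketch of that rule, not Gradle's own code:

```java
import java.util.ArrayList;
import java.util.List;

public class ArchiveNameSketch {
    // Joins the non-empty name parts with '-' and appends the extension,
    // following the pattern [baseName]-[appendix]-[version]-[classifier].[extension].
    static String archiveFileName(String baseName, String appendix, String version,
                                  String classifier, String extension) {
        List<String> parts = new ArrayList<>();
        for (String part : new String[] {baseName, appendix, version, classifier}) {
            if (part != null && !part.isEmpty()) {
                parts.add(part);
            }
        }
        return String.join("-", parts) + "." + extension;
    }

    public static void main(String[] args) {
        // Regular 'jar' task: no version or classifier set in this project.
        System.out.println(archiveFileName("content", null, null, null, "jar"));
        // 'shadowJar' with baseName = 'WordCount' and classifier = null.
        System.out.println(archiveFileName("WordCount", null, null, null, "jar"));
        // 'shadowJar' if the plugin's default 'all' classifier were left in place.
        System.out.println(archiveFileName("WordCount", null, null, "all", "jar"));
    }
}
```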
In [12]:
# Run the shadow (fat jar) build.
gradle('runShadow')
# Sample the first 20 results, remember there are no ordering guarantees.
run('head -n 20 outputs/part-00000-of-*')
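The `head` command above matches `outputs/part-00000-of-*` because `TextIO.write()` appends Beam's default shard name template, `-SSSSS-of-NNNNN`, to `outputsPrefix`: the shard index and the total number of shards, each zero-padded to five digits. The runner decides how many shards to write, so the total isn't known in advance and the glob stands in for it. A small sketch of the resulting names (`shardFileName` is a hypothetical helper, and 3 shards is an arbitrary example):

```java
public class ShardNameSketch {
    // Expands a prefix with a "-SSSSS-of-NNNNN"-style suffix:
    // the zero-padded shard index followed by the zero-padded total shard count.
    static String shardFileName(String prefix, int shardIndex, int numShards) {
        return String.format("%s-%05d-of-%05d", prefix, shardIndex, numShards);
    }

    public static void main(String[] args) {
        int numShards = 3; // Hypothetical value; the runner picks the real shard count.
        for (int i = 0; i < numShards; i++) {
            System.out.println(shardFileName("outputs/part", i, numShards));
        }
    }
}
```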
In [13]:
# You can now distribute and run your Java application as a standalone jar file.
run('cp build/libs/WordCount.jar .')
run('java -jar WordCount.jar')
# Sample the first 20 results, remember there are no ordering guarantees.
run('head -n 20 outputs/part-00000-of-*')
In [14]:
%%writefile src/main/java/samples/quickstart/WordCount.java
package samples.quickstart;

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Count;
import org.apache.beam.sdk.transforms.Filter;
import org.apache.beam.sdk.transforms.FlatMapElements;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.TypeDescriptors;

import java.util.Arrays;

public class WordCount {
  public static void main(String[] args) {
    String inputsDir = "data/*";
    String outputsPrefix = "outputs/part";

    PipelineOptions options = PipelineOptionsFactory.fromArgs(args).create();
    Pipeline pipeline = Pipeline.create(options);

    // Store the word counts in a PCollection.
    // Each element is a key-value pair (word, count) of type KV<String, Long>.
    PCollection<KV<String, Long>> wordCounts =
        // The starting point is the pipeline itself, since no data has been read yet.
        pipeline
            // Read lines from a text file.
            .apply("Read lines", TextIO.read().from(inputsDir))
            // Element type: String - text line

            // Use a regular expression to iterate over all words in the line.
            // FlatMapElements yields one output element for every element in the returned iterable.
            .apply("Find words", FlatMapElements.into(TypeDescriptors.strings())
                .via((String line) -> Arrays.asList(line.split("[^\\p{L}]+"))))
            // Element type: String - word

            // Keep only non-empty words.
            .apply("Filter empty words", Filter.by((String word) -> !word.isEmpty()))
            // Element type: String - word

            // Count each unique word.
            .apply("Count words", Count.perElement());
    // Element type: KV<String, Long> - key: word, value: count

    // We can keep applying transforms to a stored PCollection, too.
    // The input PCollection is wordCounts from the previous step.
    wordCounts
        // Format the results into a string so we can write them to a file.
        .apply("Write results", MapElements.into(TypeDescriptors.strings())
            .via((KV<String, Long> wordCount) ->
                wordCount.getKey() + ": " + wordCount.getValue()))
        // Element type: String - text line

        // Finally, write the results to a file.
        .apply(TextIO.write().to(outputsPrefix));

    // We have to explicitly run the pipeline, otherwise it's only a definition.
    pipeline.run();
  }
}
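The final comment in the code, that a pipeline is only a definition until `pipeline.run()` is called, has a close analogue in Java streams: intermediate operations such as `flatMap` and `filter` do nothing until a terminal operation such as `collect` runs. The sketch below is only an analogy (it uses `java.util.stream`, not Beam, and the class name is illustrative). It counts how many times the filter predicate runs before and after the terminal operation:

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class DeferredExecutionSketch {
    // Counts how many times the filter predicate has been evaluated.
    static final AtomicInteger evaluations = new AtomicInteger();

    // Only describes the processing steps; nothing executes yet because streams are lazy.
    static Stream<String> defineWordStream(List<String> lines) {
        return lines.stream()
            .flatMap(line -> Arrays.stream(line.split("[^\\p{L}]+")))
            .filter(word -> {
                evaluations.incrementAndGet();
                return !word.isEmpty();
            });
    }

    public static void main(String[] args) {
        Stream<String> words = defineWordStream(Arrays.asList("To be, or not to be"));
        System.out.println("Filter evaluations after definition: " + evaluations.get());

        // The terminal operation plays the role of pipeline.run(): now the steps execute.
        List<String> result = words.collect(Collectors.toList());
        System.out.println("Filter evaluations after collect: " + evaluations.get());
        System.out.println(result);
    }
}
```

The analogy has limits: a Beam runner may split, reorder, and distribute the work across machines, while a sequential Java stream simply runs in the current thread.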
In [15]:
# Build and run the project. The 'runShadow' task implicitly does a 'build'.
gradle('runShadow')
# Sample the first 20 results, remember there are no ordering guarantees.
run('head -n 20 outputs/part-00000-of-*')