In [0]:
#@title Copyright 2020 Google LLC. { display-mode: "form" }
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
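This is an Earth Engine <> TensorFlow demonstration notebook. Specifically, this notebook shows how to export training and testing data from Earth Engine in TFRecord format, train and validate a simple model (a Keras Sequential neural network) in TensorFlow, make predictions on image data exported from Earth Engine, and upload the classified image back to Earth Engine.
This is intended to demonstrate a complete i/o pipeline. For a workflow that uses a Google AI Platform hosted model making predictions interactively, see this example notebook.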
To read/write from a Google Cloud Storage bucket to which you have access, it's necessary to authenticate (as yourself). This should be the same account you use to log in to Earth Engine. When you run the code below, it will display a link in the output to an authentication page in your browser. Follow the link to a page that will let you grant permission to the Cloud SDK to access your resources. Copy the code from the permissions page back into this notebook and press return to complete the process.
(You may need to run this again if you get a credentials error later.)
In [0]:
from google.colab import auth
auth.authenticate_user()
Authenticate to Earth Engine the same way you did to the Colab notebook. Specifically, run the code to display a link to a permissions page. This gives you access to your Earth Engine account. This should be the same account you used to log in to Cloud previously. Copy the code from the Earth Engine permissions page back into the notebook and press return to complete the process.
In [0]:
import ee
ee.Authenticate()
ee.Initialize()
In [0]:
import tensorflow as tf
print(tf.__version__)
In [0]:
import folium
print(folium.__version__)
This set of global variables will be used throughout. For this demo, you must have a Cloud Storage bucket into which you can write files (learn more about creating Cloud Storage buckets). You'll also need to specify your Earth Engine username, i.e. users/USER_NAME on the Code Editor Assets tab.
In [0]:
# Your Earth Engine username. This is used to import a classified image
# into your Earth Engine assets folder.
USER_NAME = 'username'
# Cloud Storage bucket into which training, testing and prediction
# datasets will be written. You must be able to write into this bucket.
OUTPUT_BUCKET = 'your-bucket'
# Use Landsat 8 surface reflectance data for predictors.
L8SR = ee.ImageCollection('LANDSAT/LC08/C01/T1_SR')
# Use these bands for prediction.
BANDS = ['B2', 'B3', 'B4', 'B5', 'B6', 'B7']
# This is a training/testing dataset of points with known land cover labels.
LABEL_DATA = ee.FeatureCollection('projects/google/demo_landcover_labels')
# The labels, consecutive integer indices starting from zero, are stored in
# this property, set on each point.
LABEL = 'landcover'
# Number of label values, i.e. number of classes in the classification.
N_CLASSES = 3
# These names are used to specify properties in the export of
# training/testing data and to define the mapping between names and data
# when reading into TensorFlow datasets.
FEATURE_NAMES = list(BANDS)
FEATURE_NAMES.append(LABEL)
# File names for the training and testing datasets. These TFRecord files
# will be exported from Earth Engine into the Cloud Storage bucket.
TRAIN_FILE_PREFIX = 'Training_demo'
TEST_FILE_PREFIX = 'Testing_demo'
file_extension = '.tfrecord.gz'
TRAIN_FILE_PATH = 'gs://' + OUTPUT_BUCKET + '/' + TRAIN_FILE_PREFIX + file_extension
TEST_FILE_PATH = 'gs://' + OUTPUT_BUCKET + '/' + TEST_FILE_PREFIX + file_extension
# File name for the prediction (image) dataset. The trained model will read
# this dataset and make predictions in each pixel.
IMAGE_FILE_PREFIX = 'Image_pixel_demo_'
# The output path for the classified image (i.e. predictions) TFRecord file.
OUTPUT_IMAGE_FILE = 'gs://' + OUTPUT_BUCKET + '/Classified_pixel_demo.TFRecord'
# Export imagery in this region.
EXPORT_REGION = ee.Geometry.Rectangle([-122.7, 37.3, -121.8, 38.00])
# The name of the Earth Engine asset to be created by importing
# the classified image from the TFRecord file in Cloud Storage.
OUTPUT_ASSET_ID = 'users/' + USER_NAME + '/Classified_pixel_demo'
To get data for a classification model of three classes (bare, vegetation, water), we need labels and the value of predictor variables for each labeled example. We've already generated some labels in Earth Engine. Specifically, these are visually interpreted points labeled "bare," "vegetation," or "water" for a very simple classification demo (example script). For predictor variables, we'll use Landsat 8 surface reflectance imagery, bands 2-7.
In [0]:
# Cloud masking function.
def maskL8sr(image):
  cloudShadowBitMask = ee.Number(2).pow(3).int()
  cloudsBitMask = ee.Number(2).pow(5).int()
  qa = image.select('pixel_qa')
  mask = qa.bitwiseAnd(cloudShadowBitMask).eq(0).And(
      qa.bitwiseAnd(cloudsBitMask).eq(0))
  return image.updateMask(mask).select(BANDS).divide(10000)
# The image input data is a 2018 cloud-masked median composite.
image = L8SR.filterDate('2018-01-01', '2018-12-31').map(maskL8sr).median()
# Use folium to visualize the imagery.
mapid = image.getMapId({'bands': ['B4', 'B3', 'B2'], 'min': 0, 'max': 0.3})
map = folium.Map(location=[38., -122.5])
folium.TileLayer(
tiles=mapid['tile_fetcher'].url_format,
attr='Map Data © <a href="https://earthengine.google.com/">Google Earth Engine</a>',
overlay=True,
name='median composite',
).add_to(map)
map.add_child(folium.LayerControl())
map
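The cloud mask above keeps a pixel only when both the cloud shadow bit (bit 3) and the cloud bit (bit 5) of pixel_qa are unset. The following is a minimal sketch of that bit test in plain Python, outside Earth Engine. The helper name is_clear and the sample QA values are made up for illustration and are not taken from real Landsat scenes.

```python
# Bit positions taken from maskL8sr above: 2**3 and 2**5.
CLOUD_SHADOW_BIT_MASK = 1 << 3
CLOUDS_BIT_MASK = 1 << 5


def is_clear(pixel_qa):
  """Return True when neither the cloud shadow bit nor the cloud bit is set."""
  no_shadow = (pixel_qa & CLOUD_SHADOW_BIT_MASK) == 0
  no_cloud = (pixel_qa & CLOUDS_BIT_MASK) == 0
  return no_shadow and no_cloud


# Hypothetical QA values, chosen only to exercise each bit.
for qa in (0b000010, 0b001000, 0b100000, 0b101010):
  print(format(qa, '06b'), is_clear(qa))
```

Only the first sample value passes the test, because it is the only one with bits 3 and 5 both cleared.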
Some training labels have already been collected for you. Load the labeled points from an existing Earth Engine asset. Each point in this table has a property called landcover that stores the label, encoded as an integer. Here we overlay the points on imagery to get predictor variables along with labels.
In [0]:
# Sample the image at the points and add a random column.
sample = image.sampleRegions(
collection=LABEL_DATA, properties=[LABEL], scale=30).randomColumn()
# Partition the sample approximately 70-30.
training = sample.filter(ee.Filter.lt('random', 0.7))
testing = sample.filter(ee.Filter.gte('random', 0.7))
from pprint import pprint
# Print the first couple points to verify.
pprint({'training': training.first().getInfo()})
pprint({'testing': testing.first().getInfo()})
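The split above works because randomColumn() attaches a uniform random number to every sampled point, so thresholding at 0.7 gives two disjoint subsets of roughly 70% and 30%. Here is a small sketch of the same idea in plain Python. The point records, the seed and the sample size are assumptions made only for this illustration.

```python
import random

# Hypothetical stand-ins for sampled points: each gets a uniform value in [0, 1).
rng = random.Random(0)
points = [{'id': i, 'random': rng.random()} for i in range(1000)]

# Same thresholds as the Earth Engine filters above.
training = [p for p in points if p['random'] < 0.7]
testing = [p for p in points if p['random'] >= 0.7]

print(len(training), len(testing))
```

Because the two filters are complementary, every point lands in exactly one subset, but the proportions are only approximate.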
Now that there's training and testing data in Earth Engine and you've inspected a couple examples to ensure that the information you need is present, it's time to materialize the datasets in a place where the TensorFlow model has access to them. You can do that by exporting the training and testing datasets to tables in TFRecord format (learn more about TFRecord format) in your Cloud Storage bucket.
In [0]:
# Make sure you can see the output bucket. You must have write access.
print('Found Cloud Storage bucket.' if tf.io.gfile.exists('gs://' + OUTPUT_BUCKET)
else 'Can not find output Cloud Storage bucket.')
Once you've verified the existence of the intended output bucket, run the exports.
In [0]:
# Create the tasks.
training_task = ee.batch.Export.table.toCloudStorage(
collection=training,
description='Training Export',
fileNamePrefix=TRAIN_FILE_PREFIX,
bucket=OUTPUT_BUCKET,
fileFormat='TFRecord',
selectors=FEATURE_NAMES)
testing_task = ee.batch.Export.table.toCloudStorage(
collection=testing,
description='Testing Export',
fileNamePrefix=TEST_FILE_PREFIX,
bucket=OUTPUT_BUCKET,
fileFormat='TFRecord',
selectors=FEATURE_NAMES)
In [0]:
# Start the tasks.
training_task.start()
testing_task.start()
In [0]:
# Print all tasks.
pprint(ee.batch.Task.list())
In [0]:
print('Found training file.' if tf.io.gfile.exists(TRAIN_FILE_PATH)
else 'No training file found.')
print('Found testing file.' if tf.io.gfile.exists(TEST_FILE_PATH)
else 'No testing file found.')
In [0]:
# Specify patch and file dimensions.
image_export_options = {
'patchDimensions': [256, 256],
'maxFileSize': 104857600,
'compressed': True
}
# Setup the task.
image_task = ee.batch.Export.image.toCloudStorage(
image=image,
description='Image Export',
fileNamePrefix=IMAGE_FILE_PREFIX,
bucket=OUTPUT_BUCKET,
scale=30,
fileFormat='TFRecord',
region=EXPORT_REGION.toGeoJSON()['coordinates'],
formatOptions=image_export_options,
)
In [0]:
# Start the task.
image_task.start()
In [0]:
# Print all tasks.
pprint(ee.batch.Task.list())
It's also possible to monitor an individual task. Here we poll the task until it's done. If you do this, please put a sleep() in the loop to avoid making too many requests. Note that this will block until complete (you can always halt the execution of this cell).
In [0]:
import time
while image_task.active():
  print('Polling for task (id: {}).'.format(image_task.id))
  time.sleep(30)
print('Done with image export.')
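If blocking indefinitely is a concern, one option is to give the polling loop a timeout. The sketch below shows the pattern with a stand-in object. FakeTask and wait_for are hypothetical names introduced here; the only assumption about a real task is that it has an active() method, as used above.

```python
import time


class FakeTask:
  """Stand-in for an export task that stays active for a fixed number of polls."""

  def __init__(self, polls_until_done):
    self._remaining = polls_until_done

  def active(self):
    self._remaining -= 1
    return self._remaining >= 0


def wait_for(task, interval_s, timeout_s):
  """Poll task.active() until it is False; return False if the timeout passes."""
  deadline = time.monotonic() + timeout_s
  while task.active():
    if time.monotonic() >= deadline:
      return False
    time.sleep(interval_s)
  return True


# A zero interval keeps this demo fast; use something like 30 seconds in practice.
finished = wait_for(FakeTask(polls_until_done=3), interval_s=0, timeout_s=5)
print('Done.' if finished else 'Timed out.')
```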
Here we are going to read a file in Cloud Storage into a tf.data.Dataset. (These TensorFlow docs explain more about reading data into a Dataset.) Check that you can read examples from the file. The purpose here is to ensure that we can read from the file without an error. The actual content is not necessarily human readable.
In [0]:
# Create a dataset from the TFRecord file in Cloud Storage.
train_dataset = tf.data.TFRecordDataset(TRAIN_FILE_PATH, compression_type='GZIP')
# Print the first record to check.
print(next(iter(train_dataset)))
For parsing the exported TFRecord files, features_dict is a mapping between feature names (recall that FEATURE_NAMES contains the band and label names) and float32 tf.io.FixedLenFeature objects. This mapping is necessary for telling TensorFlow how to read data in a TFRecord file into tensors. Specifically, all numeric data exported from Earth Engine is exported as float32.
(Note: features in the TensorFlow context (i.e. tf.train.Feature) are not to be confused with Earth Engine features (i.e. ee.Feature), where the former is a protocol message type for serialized data input to the model and the latter is a geometry-based geographic data structure.)
In [0]:
# List of fixed-length features, all of which are float32.
columns = [
tf.io.FixedLenFeature(shape=[1], dtype=tf.float32) for k in FEATURE_NAMES
]
# Dictionary with names as keys, features as values.
features_dict = dict(zip(FEATURE_NAMES, columns))
pprint(features_dict)
Now we need to make a parsing function for the data in the TFRecord files. The data comes in flattened 2D arrays per record and we want to use the first part of the array for input to the model and the last element of the array as the class label. The parsing function reads data from a serialized Example proto into a dictionary in which the keys are the feature names and the values are the tensors storing the value of the features for that example. (These TensorFlow docs explain more about reading Example protos from TFRecord files.)
In [0]:
def parse_tfrecord(example_proto):
  """The parsing function.
  Read a serialized example into the structure defined by features_dict.
  Args:
    example_proto: a serialized Example.
  Returns:
    A tuple of the predictors dictionary and the label, cast to an `int32`.
  """
  parsed_features = tf.io.parse_single_example(example_proto, features_dict)
  labels = parsed_features.pop(LABEL)
  return parsed_features, tf.cast(labels, tf.int32)
# Map the function over the dataset.
parsed_dataset = train_dataset.map(parse_tfrecord, num_parallel_calls=5)
# Print the first parsed record to check.
pprint(next(iter(parsed_dataset)))
Note that each record of the parsed dataset contains a tuple. The first element of the tuple is a dictionary with bands for keys and the numeric value of the bands for values. The second element of the tuple is a class label.
In [0]:
def normalized_difference(a, b):
  """Compute normalized difference of two inputs.
  Compute (a - b) / (a + b). If the denominator is zero, add a small delta.
  Args:
    a: an input tensor with shape=[1]
    b: an input tensor with shape=[1]
  Returns:
    The normalized difference as a tensor.
  """
  nd = (a - b) / (a + b)
  nd_inf = (a - b) / (a + b + 0.000001)
  return tf.where(tf.math.is_finite(nd), nd, nd_inf)
def add_NDVI(features, label):
  """Add NDVI to the dataset.
  Args:
    features: a dictionary of input tensors keyed by feature name.
    label: the target label
  Returns:
    A tuple of the input dictionary with an NDVI tensor added and the label.
  """
  features['NDVI'] = normalized_difference(features['B5'], features['B4'])
  return features, label
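To see what the zero-denominator guard does, here is a scalar sketch of the same calculation in plain Python rather than TensorFlow. The function name nd_scalar and the reflectance values are hypothetical and only illustrate the arithmetic; the tensor version above is what the pipeline actually uses.

```python
def nd_scalar(a, b, delta=0.000001):
  """Compute (a - b) / (a + b), adding a small delta when a + b is zero."""
  denominator = a + b
  if denominator == 0:
    denominator += delta
  return (a - b) / denominator


# Hypothetical near-infrared (B5) and red (B4) reflectance values.
print(nd_scalar(0.4, 0.1))
# A fully masked or dark pixel: both bands are zero, so the guard applies.
print(nd_scalar(0.0, 0.0))
```

With both inputs at zero, the numerator is also zero, so the guarded result is 0.0 instead of a NaN.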
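The basic workflow for classification in TensorFlow is to create the model, train it (i.e. fit()), and use the trained model for inference (i.e. predict()).
Here we'll create a Sequential neural network model using Keras. This simple model is inspired by examples in the TensorFlow and Keras documentation.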
Note that the model used here is purely for demonstration purposes and hasn't gone through any performance tuning.
Before we create the model, there's still a wee bit of pre-processing to get the data into the right input shape and a format that can be used with cross-entropy loss. Specifically, Keras expects a list of inputs and a one-hot vector for the class. (See the Keras loss function docs, the TensorFlow categorical identity docs and the tf.one_hot docs for details.)
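As a rough illustration of why the label needs to be one-hot encoded, the sketch below computes a one-hot vector and the categorical cross-entropy for a single example in plain Python. The helper names and the probability values are assumptions for this example; in the notebook itself, tf.one_hot and the Keras loss do this work.

```python
import math


def one_hot(index, depth):
  """Return a list of length depth with 1.0 at index and 0.0 elsewhere."""
  return [1.0 if i == index else 0.0 for i in range(depth)]


def categorical_crossentropy(target, probabilities):
  """Cross-entropy between a one-hot target and predicted class probabilities."""
  return -sum(t * math.log(p) for t, p in zip(target, probabilities) if t > 0)


# Hypothetical example: the true class is 1 and the model gives it 0.8.
target = one_hot(1, depth=3)
loss = categorical_crossentropy(target, [0.1, 0.8, 0.1])
print(target, loss)
```

Only the probability assigned to the true class contributes to the loss, which is why the target has to mark that class explicitly.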
Here we will use a simple neural network model with a 64 node hidden layer, a dropout layer and an output layer. Once the dataset has been prepared, define the model, compile it, and fit it to the training data. See the Keras Sequential model guide for more details.
In [0]:
from tensorflow import keras
# Add NDVI.
input_dataset = parsed_dataset.map(add_NDVI)
# Keras requires inputs as a tuple. Note that the inputs must be in the
# right shape. Also note that to use the categorical_crossentropy loss,
# the label needs to be turned into a one-hot vector.
def to_tuple(inputs, label):
  return (tf.transpose(list(inputs.values())),
          tf.one_hot(indices=label, depth=N_CLASSES))
# Map the to_tuple function and batch.
input_dataset = input_dataset.map(to_tuple).batch(8)
# Define the layers in the model.
model = tf.keras.models.Sequential([
tf.keras.layers.Dense(64, activation=tf.nn.relu),
tf.keras.layers.Dropout(0.2),
tf.keras.layers.Dense(N_CLASSES, activation=tf.nn.softmax)
])
# Compile the model with the specified loss function.
model.compile(optimizer=tf.keras.optimizers.Adam(),
loss='categorical_crossentropy',
metrics=['accuracy'])
# Fit the model to the training data.
model.fit(x=input_dataset, epochs=10)
Now that we have a trained model, we can evaluate it using the test dataset. To do that, read and prepare the test dataset in the same way as the training dataset. Here we specify a batch size of 1 so that each example in the test set is used exactly once to compute model accuracy. No steps argument is passed to evaluate() here, so evaluation runs until the test dataset is exhausted.
In [0]:
test_dataset = (
tf.data.TFRecordDataset(TEST_FILE_PATH, compression_type='GZIP')
.map(parse_tfrecord, num_parallel_calls=5)
.map(add_NDVI)
.map(to_tuple)
.batch(1))
model.evaluate(test_dataset)
Now it's time to classify the image that was exported from Earth Engine. If the exported image is large, it will be split into multiple TFRecord files in its destination folder. There will also be a JSON sidecar file called "the mixer" that describes the format and georeferencing of the image. Here we will find the image files and the mixer file, getting some info out of the mixer that will be useful during model inference.
In [0]:
# Get a list of all the files in the output bucket.
files_list = !gsutil ls 'gs://'{OUTPUT_BUCKET}
# Get only the files generated by the image export.
exported_files_list = [s for s in files_list if IMAGE_FILE_PREFIX in s]
# Get the list of image files and the JSON mixer file.
image_files_list = []
json_file = None
for f in exported_files_list:
  if f.endswith('.tfrecord.gz'):
    image_files_list.append(f)
  elif f.endswith('.json'):
    json_file = f
# Make sure the files are in the right order.
image_files_list.sort()
pprint(image_files_list)
print(json_file)
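The filtering and sorting above can be tried without a bucket. In this sketch the file names are hypothetical placeholders (the actual names produced by the export may differ); the point is only that a lexicographic sort puts zero-padded shard names in numeric order.

```python
IMAGE_FILE_PREFIX = 'Image_pixel_demo_'

# Hypothetical bucket listing, deliberately out of order.
files_list = [
    'gs://your-bucket/Training_demo.tfrecord.gz',
    'gs://your-bucket/Image_pixel_demo_00001.tfrecord.gz',
    'gs://your-bucket/Image_pixel_demo_mixer.json',
    'gs://your-bucket/Image_pixel_demo_00000.tfrecord.gz',
]

# Keep only the image export, then separate the data shards from the sidecar.
exported = [s for s in files_list if IMAGE_FILE_PREFIX in s]
image_files = sorted(s for s in exported if s.endswith('.tfrecord.gz'))
json_files = [s for s in exported if s.endswith('.json')]

print(image_files)
print(json_files)
```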
In [0]:
import json
# Load the contents of the mixer file to a JSON object.
json_text = !gsutil cat {json_file}
# Get a single string w/ newlines from the IPython.utils.text.SList
mixer = json.loads(json_text.nlstr)
pprint(mixer)
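For orientation, here is a sketch of reading the two mixer fields that the inference step relies on. The JSON text is a made-up fragment with hypothetical values; a real mixer file contains additional fields that are omitted here.

```python
import json

# Hypothetical mixer fragment; only the keys used later in this notebook.
mixer_text = '''
{
  "patchDimensions": [256, 256],
  "totalPatches": 12
}
'''

mixer_example = json.loads(mixer_text)
width, height = mixer_example['patchDimensions']
pixels_per_patch = width * height
total_pixels = pixels_per_patch * mixer_example['totalPatches']

print(pixels_per_patch, total_pixels)
```

The number of pixels per patch determines the batch size used for prediction, and the patch count determines the number of prediction steps.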
You can feed the list of files (image_files_list) directly to the TFRecordDataset constructor to make a combined dataset on which to perform inference. The input needs to be preprocessed differently than the training and testing data. Because the pixels are written into records as patches, we need to read the patches in as one big tensor (one patch for each band), then flatten them into lots of little tensors.
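The flattening step can be pictured with ordinary lists: each record holds one flat array per band, and slicing along the first axis yields one small dictionary per pixel. The sketch below mimics that in plain Python. The helper slice_patch and the 2x2 patch values are hypothetical; in the notebook, tf.data.Dataset.from_tensor_slices performs the equivalent slicing.

```python
def slice_patch(patch):
  """Turn {band: [v0, v1, ...]} into [{band: v0}, {band: v1}, ...]."""
  n_pixels = len(next(iter(patch.values())))
  return [{band: values[i] for band, values in patch.items()}
          for i in range(n_pixels)]


# Hypothetical 2x2 patch with two bands, already flattened row by row.
patch_example = {
    'B4': [0.10, 0.11, 0.12, 0.13],
    'B5': [0.30, 0.31, 0.32, 0.33],
}
pixels = slice_patch(patch_example)
print(pixels[0], len(pixels))
```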
In [0]:
# Get relevant info from the JSON mixer file.
patch_width = mixer['patchDimensions'][0]
patch_height = mixer['patchDimensions'][1]
patches = mixer['totalPatches']
patch_dimensions_flat = [patch_width * patch_height, 1]
# Note that the tensors are in the shape of a patch, one patch for each band.
image_columns = [
tf.io.FixedLenFeature(shape=patch_dimensions_flat, dtype=tf.float32)
for k in BANDS
]
# Parsing dictionary.
image_features_dict = dict(zip(BANDS, image_columns))
# Note that you can make one dataset from many files by specifying a list.
image_dataset = tf.data.TFRecordDataset(image_files_list, compression_type='GZIP')
# Parsing function.
def parse_image(example_proto):
  return tf.io.parse_single_example(example_proto, image_features_dict)
# Parse the data into tensors, one long tensor per patch.
image_dataset = image_dataset.map(parse_image, num_parallel_calls=5)
# Break our long tensors into many little ones.
image_dataset = image_dataset.flat_map(
lambda features: tf.data.Dataset.from_tensor_slices(features)
)
# Add additional features (NDVI).
image_dataset = image_dataset.map(
# Add NDVI to a feature that doesn't have a label.
lambda features: add_NDVI(features, None)[0]
)
# Turn the dictionary in each record into a tuple without a label.
image_dataset = image_dataset.map(
lambda data_dict: (tf.transpose(list(data_dict.values())), )
)
# Turn each patch into a batch.
image_dataset = image_dataset.batch(patch_width * patch_height)
In [0]:
# Run prediction in batches, with as many steps as there are patches.
predictions = model.predict(image_dataset, steps=patches, verbose=1)
# Note that the predictions come as a numpy array. Check the first one.
print(predictions[0])
Now that there's a list of class probabilities in predictions, it's time to write them back into a file, optionally including a class label which is simply the index of the maximum probability. We'll write directly from TensorFlow to a file in the output Cloud Storage bucket.
Iterate over the list, compute class label and write the class and the probabilities in patches. Specifically, we need to write the pixels into the file as patches in the same order they came out. The records are written as serialized tf.train.Example protos. This might take a while.
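The grouping logic can be sketched independently of TensorFlow: walk the per-pixel probabilities in order, take the index of the largest probability as the class, and emit a patch every time enough pixels have accumulated. The function names, the tiny patch size and the probability values below are assumptions for illustration only.

```python
def argmax(values):
  """Return the index of the largest value."""
  return max(range(len(values)), key=lambda i: values[i])


def group_into_patches(per_pixel_probs, patch_size):
  """Group per-pixel class probabilities into patches, preserving order."""
  grouped = []
  class_ids, probabilities = [], []
  for probs in per_pixel_probs:
    class_ids.append(argmax(probs))
    probabilities.append(probs)
    # Once a full patch has accumulated, emit it and start a new one.
    if len(class_ids) == patch_size:
      grouped.append({'prediction': class_ids, 'probabilities': probabilities})
      class_ids, probabilities = [], []
  return grouped


# Hypothetical probabilities for four pixels and three classes.
example_probs = [[0.7, 0.2, 0.1], [0.1, 0.8, 0.1],
                 [0.2, 0.2, 0.6], [0.5, 0.3, 0.2]]
example_patches = group_into_patches(example_probs, patch_size=2)
print([p['prediction'] for p in example_patches])
```

A trailing group smaller than patch_size is never emitted in this sketch, so the input length should be a multiple of the patch size.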
In [0]:
print('Writing to file ' + OUTPUT_IMAGE_FILE)
In [0]:
# Instantiate the writer.
writer = tf.io.TFRecordWriter(OUTPUT_IMAGE_FILE)
# Every patch-worth of predictions we'll dump an example into the output
# file with a single feature that holds our predictions. Since our predictions
# are already in the order of the exported data, the patches we create here
# will also be in the right order.
patch = [[], [], [], []]
cur_patch = 1
for prediction in predictions:
  patch[0].append(tf.argmax(prediction, 1))
  patch[1].append(prediction[0][0])
  patch[2].append(prediction[0][1])
  patch[3].append(prediction[0][2])
  # Once we've seen a patches-worth of class_ids...
  if (len(patch[0]) == patch_width * patch_height):
    print('Done with patch ' + str(cur_patch) + ' of ' + str(patches) + '...')
    # Create an example
    example = tf.train.Example(
      features=tf.train.Features(
        feature={
          'prediction': tf.train.Feature(
              int64_list=tf.train.Int64List(
                  value=patch[0])),
          'bareProb': tf.train.Feature(
              float_list=tf.train.FloatList(
                  value=patch[1])),
          'vegProb': tf.train.Feature(
              float_list=tf.train.FloatList(
                  value=patch[2])),
          'waterProb': tf.train.Feature(
              float_list=tf.train.FloatList(
                  value=patch[3])),
        }
      )
    )
    # Write the example to the file and clear our patch array so it's ready for
    # another batch of class ids
    writer.write(example.SerializeToString())
    patch = [[], [], [], []]
    cur_patch += 1
writer.close()
In [0]:
!gsutil ls -l {OUTPUT_IMAGE_FILE}
Upload the image to Earth Engine directly from the Cloud Storage bucket with the earthengine command. Provide both the image TFRecord file and the JSON file as arguments to earthengine upload.
In [0]:
print('Uploading to ' + OUTPUT_ASSET_ID)
In [0]:
# Start the upload.
!earthengine upload image --asset_id={OUTPUT_ASSET_ID} --pyramiding_policy=mode {OUTPUT_IMAGE_FILE} {json_file}
In [0]:
ee.batch.Task.list()
In [0]:
predictions_image = ee.Image(OUTPUT_ASSET_ID)
prediction_vis = {
'bands': 'prediction',
'min': 0,
'max': 2,
'palette': ['red', 'green', 'blue']
}
probability_vis = {'bands': ['bareProb', 'vegProb', 'waterProb'], 'max': 0.5}
prediction_map_id = predictions_image.getMapId(prediction_vis)
probability_map_id = predictions_image.getMapId(probability_vis)
map = folium.Map(location=[37.6413, -122.2582])
folium.TileLayer(
tiles=prediction_map_id['tile_fetcher'].url_format,
attr='Map Data © <a href="https://earthengine.google.com/">Google Earth Engine</a>',
overlay=True,
name='prediction',
).add_to(map)
folium.TileLayer(
tiles=probability_map_id['tile_fetcher'].url_format,
attr='Map Data © <a href="https://earthengine.google.com/">Google Earth Engine</a>',
overlay=True,
name='probability',
).add_to(map)
map.add_child(folium.LayerControl())
map