In [0]:
#@title Copyright 2020 Google LLC. { display-mode: "form" }
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

Introduction

This is an Earth Engine <> TensorFlow demonstration notebook. Suppose you want to predict a continuous output (regression) from a stack of continuous inputs. In this example, the output is impervious surface area from NLCD and the input is a Landsat 8 composite. The model is a fully convolutional neural network (FCNN), specifically U-net. This notebook shows:

  1. Exporting training/testing patches from Earth Engine, suitable for training an FCNN model.
  2. Preprocessing.
  3. Training and validating an FCNN model.
  4. Making predictions with the trained model and importing them to Earth Engine.

Setup software libraries

Authenticate and import as necessary.


In [0]:
# Cloud authentication.
from google.colab import auth
auth.authenticate_user()

In [0]:
# Import, authenticate and initialize the Earth Engine library.
import ee
ee.Authenticate()
ee.Initialize()

In [0]:
# Tensorflow setup.
import tensorflow as tf
print(tf.__version__)

In [0]:
# Folium setup.
import folium
print(folium.__version__)

Variables

Declare the variables that will be in use throughout the notebook.

Specify your Cloud Storage Bucket

You must have write access to a Cloud Storage bucket to run this demo! Replace the placeholder below with the name of a bucket you can write to. (The pre-trained model loaded later in the notebook is read from a public demo bucket, but the export steps require write access to your own bucket.)


In [0]:
# INSERT YOUR BUCKET HERE:
BUCKET = 'your-bucket-name'
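
Optionally, verify write access before launching any exports. This is a minimal sketch (the test object name is arbitrary): it writes and then deletes a small file in the bucket using tf.io.gfile.

In [0]:
# Optional: confirm write access to the bucket before starting exports.
test_path = 'gs://' + BUCKET + '/write_test.txt'
try:
  with tf.io.gfile.GFile(test_path, 'w') as f:
    f.write('test')
  tf.io.gfile.remove(test_path)
  print('Write access to', BUCKET, 'confirmed.')
except Exception as e:
  print('Could not write to bucket:', e)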

Set other global variables


In [0]:
# Specify names and locations for outputs in Cloud Storage.
FOLDER = 'fcnn-demo'
TRAINING_BASE = 'training_patches'
EVAL_BASE = 'eval_patches'

# Specify inputs (Landsat bands) to the model and the response variable.
opticalBands = ['B1', 'B2', 'B3', 'B4', 'B5', 'B6', 'B7']
thermalBands = ['B10', 'B11']
BANDS = opticalBands + thermalBands
RESPONSE = 'impervious'
FEATURES = BANDS + [RESPONSE]

# Specify the size and shape of patches expected by the model.
KERNEL_SIZE = 256
KERNEL_SHAPE = [KERNEL_SIZE, KERNEL_SIZE]
COLUMNS = [
  tf.io.FixedLenFeature(shape=KERNEL_SHAPE, dtype=tf.float32) for _ in FEATURES
]
FEATURES_DICT = dict(zip(FEATURES, COLUMNS))

# Sizes of the training and evaluation datasets.
TRAIN_SIZE = 16000
EVAL_SIZE = 8000

# Specify model training parameters.
BATCH_SIZE = 16
EPOCHS = 10
BUFFER_SIZE = 2000
OPTIMIZER = 'SGD'
LOSS = 'MeanSquaredError'
METRICS = ['RootMeanSquaredError']

Imagery

Gather and set up the imagery to use for inputs (predictors). This is a three-year, cloud-free Landsat 8 composite. Display it in the notebook for a sanity check.


In [0]:
# Use Landsat 8 surface reflectance data.
l8sr = ee.ImageCollection('LANDSAT/LC08/C01/T1_SR')

# Cloud masking function.
def maskL8sr(image):
  cloudShadowBitMask = ee.Number(2).pow(3).int()
  cloudsBitMask = ee.Number(2).pow(5).int()
  qa = image.select('pixel_qa')
  # Keep pixels flagged as neither cloud nor cloud shadow.
  mask1 = qa.bitwiseAnd(cloudShadowBitMask).eq(0).And(
    qa.bitwiseAnd(cloudsBitMask).eq(0))
  # Keep pixels where all bands are unmasked.
  mask2 = image.mask().reduce('min')
  # Keep pixels with plausible surface reflectance values.
  mask3 = image.select(opticalBands).gt(0).And(
          image.select(opticalBands).lt(10000)).reduce('min')
  mask = mask1.And(mask2).And(mask3)
  # Scale reflectance bands to [0, 1]; convert thermal bands from Kelvin (x10)
  # to Celsius, clamped to [0, 100] C, then scale to [0, 1].
  return image.select(opticalBands).divide(10000).addBands(
          image.select(thermalBands).divide(10).clamp(273.15, 373.15)
            .subtract(273.15).divide(100)).updateMask(mask)

# The image input data is a cloud-masked median composite.
image = l8sr.filterDate('2015-01-01', '2017-12-31').map(maskL8sr).median()

# Use folium to visualize the imagery.
mapid = image.getMapId({'bands': ['B4', 'B3', 'B2'], 'min': 0, 'max': 0.3})
map = folium.Map(location=[38., -122.5])
folium.TileLayer(
    tiles=mapid['tile_fetcher'].url_format,
    attr='Map Data &copy; <a href="https://earthengine.google.com/">Google Earth Engine</a>',
    overlay=True,
    name='median composite',
  ).add_to(map)

mapid = image.getMapId({'bands': ['B10'], 'min': 0, 'max': 0.5})
folium.TileLayer(
    tiles=mapid['tile_fetcher'].url_format,
    attr='Map Data &copy; <a href="https://earthengine.google.com/">Google Earth Engine</a>',
    overlay=True,
    name='thermal',
  ).add_to(map)
map.add_child(folium.LayerControl())
map

Prepare the response (what we want to predict). This is impervious surface area (in fraction of a pixel) from the 2016 NLCD dataset. Display to check.


In [0]:
nlcd = ee.Image('USGS/NLCD/NLCD2016').select('impervious')
nlcd = nlcd.divide(100).float()

mapid = nlcd.getMapId({'min': 0, 'max': 1})
map = folium.Map(location=[38., -122.5])
folium.TileLayer(
    tiles=mapid['tile_fetcher'].url_format,
    attr='Map Data &copy; <a href="https://earthengine.google.com/">Google Earth Engine</a>',
    overlay=True,
    name='nlcd impervious',
  ).add_to(map)
map.add_child(folium.LayerControl())
map

Stack the 2D images (Landsat composite and NLCD impervious surface) to create a single image from which samples can be taken. Convert the image into an array image in which each pixel stores 256x256 patches of pixels for each band. This is a key step that bears emphasis: to export training patches, convert a multi-band image to an array image using neighborhoodToArray(), then sample the image at points.


In [0]:
featureStack = ee.Image.cat([
  image.select(BANDS),
  nlcd.select(RESPONSE)
]).float()

# A kernel of ones defines the 256x256 neighborhood to extract at each pixel.
ones = ee.List.repeat(1, KERNEL_SIZE)
lists = ee.List.repeat(ones, KERNEL_SIZE)
kernel = ee.Kernel.fixed(KERNEL_SIZE, KERNEL_SIZE, lists)

arrays = featureStack.neighborhoodToArray(kernel)
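
As a quick sanity check, you can confirm that the array image has one array-valued band per feature. This is a lightweight probe (it fetches only band names, not pixels):

In [0]:
# Sanity check: one band per input band plus the response.
print(arrays.bandNames().getInfo())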

Use some pre-made geometries to sample the stack in strategic locations. Specifically, these are hand-made polygons in which to take the 256x256 samples. Display the sampling polygons on a map, red for training polygons, blue for evaluation.


In [0]:
trainingPolys = ee.FeatureCollection('projects/google/DemoTrainingGeometries')
evalPolys = ee.FeatureCollection('projects/google/DemoEvalGeometries')

polyImage = ee.Image(0).byte().paint(trainingPolys, 1).paint(evalPolys, 2)
polyImage = polyImage.updateMask(polyImage)

mapid = polyImage.getMapId({'min': 1, 'max': 2, 'palette': ['red', 'blue']})
map = folium.Map(location=[38., -100.], zoom_start=5)
folium.TileLayer(
    tiles=mapid['tile_fetcher'].url_format,
    attr='Map Data &copy; <a href="https://earthengine.google.com/">Google Earth Engine</a>',
    overlay=True,
    name='training polygons',
  ).add_to(map)
map.add_child(folium.LayerControl())
map

Sampling

The mapped data look reasonable, so take a sample from each polygon and merge the results into a single export. The key step is sampling the array image at points, to get all the pixels in a 256x256 neighborhood at each point. It's worth noting that to build the training and testing data for the FCNN, you export a single TFRecord file that contains patches of pixel values in each record. You do NOT need to export each training/testing patch as a separate image. Since each record potentially contains a lot of data (especially with big patches or many input bands), some manual sharding of the computation is necessary to avoid the "Computed value is too large" error. Specifically, the following code takes multiple (smaller) samples within each geometry, merging the results to get a single export.


In [0]:
# Convert the feature collections to lists for iteration.
trainingPolysList = trainingPolys.toList(trainingPolys.size())
evalPolysList = evalPolys.toList(evalPolys.size())

# These numbers determined experimentally.
n = 200 # Number of shards in each polygon.
N = 2000 # Total sample size in each polygon.

# Export all the training data (in many pieces), with one task 
# per geometry.
for g in range(trainingPolys.size().getInfo()):
  geomSample = ee.FeatureCollection([])
  for i in range(n):
    sample = arrays.sample(
      region = ee.Feature(trainingPolysList.get(g)).geometry(), 
      scale = 30,
      numPixels = N / n, # Size of the shard.
      seed = i,
      tileScale = 8
    )
    geomSample = geomSample.merge(sample)

  desc = TRAINING_BASE + '_g' + str(g)
  task = ee.batch.Export.table.toCloudStorage(
    collection = geomSample,
    description = desc,
    bucket = BUCKET,
    fileNamePrefix = FOLDER + '/' + desc,
    fileFormat = 'TFRecord',
    selectors = BANDS + [RESPONSE]
  )
  task.start()

# Export all the evaluation data.
for g in range(evalPolys.size().getInfo()):
  geomSample = ee.FeatureCollection([])
  for i in range(n):
    sample = arrays.sample(
      region = ee.Feature(evalPolysList.get(g)).geometry(), 
      scale = 30,
      numPixels = N / n,
      seed = i,
      tileScale = 8
    )
    geomSample = geomSample.merge(sample)

  desc = EVAL_BASE + '_g' + str(g)
  task = ee.batch.Export.table.toCloudStorage(
    collection = geomSample,
    description = desc,
    bucket = BUCKET,
    fileNamePrefix = FOLDER + '/' + desc,
    fileFormat = 'TFRecord',
    selectors = BANDS + [RESPONSE]
  )
  task.start()
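
The exports run as batch tasks in Earth Engine and can take a while. A minimal sketch for monitoring them from the notebook (ee.batch.Task.list() returns the most recent tasks first):

In [0]:
# Optional: inspect the status of recently started export tasks.
for task in ee.batch.Task.list()[:10]:
  print(task)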

Training data

Load the data exported from Earth Engine into a tf.data.Dataset. The following are helper functions for that.


In [0]:
def parse_tfrecord(example_proto):
  """The parsing function.
  Read a serialized example into the structure defined by FEATURES_DICT.
  Args:
    example_proto: a serialized Example.
  Returns:
    A dictionary of tensors, keyed by feature name.
  """
  return tf.io.parse_single_example(example_proto, FEATURES_DICT)


def to_tuple(inputs):
  """Function to convert a dictionary of tensors to a tuple of (inputs, outputs).
  Turn the tensors returned by parse_tfrecord into a stack in HWC shape.
  Args:
    inputs: A dictionary of tensors, keyed by feature name.
  Returns:
    A tuple of (inputs, outputs).
  """
  inputsList = [inputs.get(key) for key in FEATURES]
  stacked = tf.stack(inputsList, axis=0)
  # Convert from CHW to HWC
  stacked = tf.transpose(stacked, [1, 2, 0])
  return stacked[:,:,:len(BANDS)], stacked[:,:,len(BANDS):]


def get_dataset(pattern):
  """Function to read, parse and format to tuple a set of input tfrecord files.
  Get all the files matching the pattern, parse and convert to tuple.
  Args:
    pattern: A file pattern to match in a Cloud Storage bucket.
  Returns:
    A tf.data.Dataset
  """
  glob = tf.io.gfile.glob(pattern)
  dataset = tf.data.TFRecordDataset(glob, compression_type='GZIP')
  dataset = dataset.map(parse_tfrecord, num_parallel_calls=5)
  dataset = dataset.map(to_tuple, num_parallel_calls=5)
  return dataset

Use the helpers to read in the training dataset. Print the first record to check.


In [0]:
def get_training_dataset():
  """Get the preprocessed training dataset.
  Returns:
    A tf.data.Dataset of training data.
  """
  glob = 'gs://' + BUCKET + '/' + FOLDER + '/' + TRAINING_BASE + '*'
  dataset = get_dataset(glob)
  dataset = dataset.shuffle(BUFFER_SIZE).batch(BATCH_SIZE).repeat()
  return dataset

training = get_training_dataset()

print(next(iter(training.take(1))))

Evaluation data

Now do the same thing to get an evaluation dataset. Note that unlike the training dataset, the evaluation dataset has a batch size of 1, is not repeated and is not shuffled.


In [0]:
def get_eval_dataset():
  """Get the preprocessed evaluation dataset.
  Returns:
    A tf.data.Dataset of evaluation data.
  """
  glob = 'gs://' + BUCKET + '/' + FOLDER + '/' + EVAL_BASE + '*'
  dataset = get_dataset(glob)
  dataset = dataset.batch(1).repeat()
  return dataset

evaluation = get_eval_dataset()
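
As a quick structural check, element_spec (available on tf.data.Dataset in TF 2.x) should show (inputs, targets) batches of shape (1, 256, 256, 9) and (1, 256, 256, 1):

In [0]:
# Optional: verify the (inputs, targets) structure of the evaluation dataset.
print(evaluation.element_spec)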

Model

Here we use a Keras implementation of the U-Net model, which takes 256x256 pixel patches as input and produces a per-pixel output (class probabilities, labels, or, as here, a continuous value). We can use the model essentially unmodified, but with a mean squared error loss on the sigmoidal output, since we are treating this as a regression problem rather than a classification problem. Since the impervious surface fraction is constrained to [0,1], with many values close to zero or one, a saturating activation function is suitable here.


In [0]:
from tensorflow.keras import layers
from tensorflow.keras import losses
from tensorflow.keras import models
from tensorflow.keras import metrics
from tensorflow.keras import optimizers

def conv_block(input_tensor, num_filters):
  encoder = layers.Conv2D(num_filters, (3, 3), padding='same')(input_tensor)
  encoder = layers.BatchNormalization()(encoder)
  encoder = layers.Activation('relu')(encoder)
  encoder = layers.Conv2D(num_filters, (3, 3), padding='same')(encoder)
  encoder = layers.BatchNormalization()(encoder)
  encoder = layers.Activation('relu')(encoder)
  return encoder

def encoder_block(input_tensor, num_filters):
  encoder = conv_block(input_tensor, num_filters)
  encoder_pool = layers.MaxPooling2D((2, 2), strides=(2, 2))(encoder)
  return encoder_pool, encoder

def decoder_block(input_tensor, concat_tensor, num_filters):
  decoder = layers.Conv2DTranspose(num_filters, (2, 2), strides=(2, 2), padding='same')(input_tensor)
  decoder = layers.concatenate([concat_tensor, decoder], axis=-1)
  decoder = layers.BatchNormalization()(decoder)
  decoder = layers.Activation('relu')(decoder)
  decoder = layers.Conv2D(num_filters, (3, 3), padding='same')(decoder)
  decoder = layers.BatchNormalization()(decoder)
  decoder = layers.Activation('relu')(decoder)
  decoder = layers.Conv2D(num_filters, (3, 3), padding='same')(decoder)
  decoder = layers.BatchNormalization()(decoder)
  decoder = layers.Activation('relu')(decoder)
  return decoder

def get_model():
  inputs = layers.Input(shape=[None, None, len(BANDS)]) # 256
  encoder0_pool, encoder0 = encoder_block(inputs, 32) # 128
  encoder1_pool, encoder1 = encoder_block(encoder0_pool, 64) # 64
  encoder2_pool, encoder2 = encoder_block(encoder1_pool, 128) # 32
  encoder3_pool, encoder3 = encoder_block(encoder2_pool, 256) # 16
  encoder4_pool, encoder4 = encoder_block(encoder3_pool, 512) # 8
  center = conv_block(encoder4_pool, 1024) # center
  decoder4 = decoder_block(center, encoder4, 512) # 16
  decoder3 = decoder_block(decoder4, encoder3, 256) # 32
  decoder2 = decoder_block(decoder3, encoder2, 128) # 64
  decoder1 = decoder_block(decoder2, encoder1, 64) # 128
  decoder0 = decoder_block(decoder1, encoder0, 32) # 256
  outputs = layers.Conv2D(1, (1, 1), activation='sigmoid')(decoder0)

  model = models.Model(inputs=[inputs], outputs=[outputs])

  model.compile(
    optimizer=optimizers.get(OPTIMIZER),
    loss=losses.get(LOSS),
    metrics=[metrics.get(metric) for metric in METRICS])

  return model

Training the model

You train a Keras model by calling .fit() on it. Here we're going to train for 10 epochs, which is suitable for demonstration purposes. For production use, you probably want to optimize this parameter, for example through hyperparameter tuning.


In [0]:
m = get_model()

m.fit(
    x=training, 
    epochs=EPOCHS, 
    steps_per_epoch=int(TRAIN_SIZE / BATCH_SIZE), 
    validation_data=evaluation,
    validation_steps=EVAL_SIZE)
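
After training, you may want to persist the model so it can be reloaded later without retraining. A minimal sketch (the 'model' subfolder under your bucket is an arbitrary choice):

In [0]:
# Optional: save the trained model to Cloud Storage in SavedModel format.
SAVE_PATH = 'gs://' + BUCKET + '/' + FOLDER + '/model'
m.save(SAVE_PATH)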

Note that the notebook VM is sometimes not heavy-duty enough to get through a whole training job, especially if you have a large buffer size or a large number of epochs. You can still use this notebook for training, but may need to set up an alternative VM for production use. Alternatively, you can package your code for running large training jobs on Google's AI Platform. The following code loads a pre-trained model, which you can use for predictions right away.


In [0]:
# Load a trained model. 50 epochs. 25 hours. Final RMSE ~0.08.
MODEL_DIR = 'gs://ee-docs-demos/fcnn-demo/trainer/model'
m = tf.keras.models.load_model(MODEL_DIR)
m.summary()

Prediction

The prediction pipeline is:

  1. Export the imagery on which to do predictions from Earth Engine, in TFRecord format, to a Cloud Storage bucket.
  2. Use the trained model to make the predictions.
  3. Write the predictions to a TFRecord file in Cloud Storage.
  4. Upload the predictions TFRecord file to Earth Engine.

The following functions handle this process. It's useful to separate the export from the predictions so that you can experiment with different models without running the export every time.


In [0]:
def doExport(out_image_base, kernel_buffer, region):
  """Run the image export task.  Block until complete.
  """
  task = ee.batch.Export.image.toCloudStorage(
    image = image.select(BANDS),
    description = out_image_base,
    bucket = BUCKET,
    fileNamePrefix = FOLDER + '/' + out_image_base,
    region = region.getInfo()['coordinates'],
    scale = 30,
    fileFormat = 'TFRecord',
    maxPixels = 1e10,
    formatOptions = {
      'patchDimensions': KERNEL_SHAPE,
      'kernelSize': kernel_buffer,
      'compressed': True,
      'maxFileSize': 104857600
    }
  )
  task.start()

  # Block until the task completes.
  print('Running image export to Cloud Storage...')
  import time
  while task.active():
    time.sleep(30)

  # Error condition
  if task.status()['state'] != 'COMPLETED':
    print('Error with image export.')
  else:
    print('Image export completed.')

In [0]:
def doPrediction(out_image_base, user_folder, kernel_buffer, region):
  """Perform inference on exported imagery, upload to Earth Engine.
  """

  print('Looking for TFRecord files...')

  # Get a list of all the files in the output bucket.
  filesList = !gsutil ls 'gs://'{BUCKET}'/'{FOLDER}

  # Get only the files generated by the image export.
  exportFilesList = [s for s in filesList if out_image_base in s]

  # Get the list of image files and the JSON mixer file.
  imageFilesList = []
  jsonFile = None
  for f in exportFilesList:
    if f.endswith('.tfrecord.gz'):
      imageFilesList.append(f)
    elif f.endswith('.json'):
      jsonFile = f

  # Make sure the files are in the right order.
  imageFilesList.sort()

  from pprint import pprint
  pprint(imageFilesList)
  print(jsonFile)

  import json
  # Load the contents of the mixer file to a JSON object.
  jsonText = !gsutil cat {jsonFile}
  # Get a single string w/ newlines from the IPython.utils.text.SList
  mixer = json.loads(jsonText.nlstr)
  pprint(mixer)
  patches = mixer['totalPatches']

  # Get set up for prediction.
  x_buffer = int(kernel_buffer[0] / 2)
  y_buffer = int(kernel_buffer[1] / 2)

  buffered_shape = [
      KERNEL_SHAPE[0] + kernel_buffer[0],
      KERNEL_SHAPE[1] + kernel_buffer[1]]

  imageColumns = [
    tf.io.FixedLenFeature(shape=buffered_shape, dtype=tf.float32) 
      for k in BANDS
  ]

  imageFeaturesDict = dict(zip(BANDS, imageColumns))

  def parse_image(example_proto):
    return tf.io.parse_single_example(example_proto, imageFeaturesDict)

  def toTupleImage(inputs):
    inputsList = [inputs.get(key) for key in BANDS]
    stacked = tf.stack(inputsList, axis=0)
    stacked = tf.transpose(stacked, [1, 2, 0])
    return stacked

  # Create a dataset from the TFRecord file(s) in Cloud Storage.
  imageDataset = tf.data.TFRecordDataset(imageFilesList, compression_type='GZIP')
  imageDataset = imageDataset.map(parse_image, num_parallel_calls=5)
  imageDataset = imageDataset.map(toTupleImage).batch(1)

  # Perform inference.
  print('Running predictions...')
  predictions = m.predict(imageDataset, steps=patches, verbose=1)
  # print(predictions[0])

  print('Writing predictions...')
  out_image_file = 'gs://' + BUCKET + '/' + FOLDER + '/' + out_image_base + '.TFRecord'
  writer = tf.io.TFRecordWriter(out_image_file)
  # Use a separate counter so the totalPatches count isn't clobbered.
  patch_number = 0
  for predictionPatch in predictions:
    print('Writing patch ' + str(patch_number) + '...')
    predictionPatch = predictionPatch[
        x_buffer:x_buffer+KERNEL_SIZE, y_buffer:y_buffer+KERNEL_SIZE]

    # Create an example.
    example = tf.train.Example(
      features=tf.train.Features(
        feature={
          'impervious': tf.train.Feature(
              float_list=tf.train.FloatList(
                  value=predictionPatch.flatten()))
        }
      )
    )
    # Write the example.
    writer.write(example.SerializeToString())
    patch_number += 1

  writer.close()

  # Start the upload.
  out_image_asset = user_folder + '/' + out_image_base
  !earthengine upload image --asset_id={out_image_asset} {out_image_file} {jsonFile}

Now that all the code needed to run the prediction pipeline is in place, all that remains is to specify the output region in which to do the prediction, the names of the output files, where to put them, and the shape of the outputs. In terms of shape, the model is trained on 256x256 patches, but can (in theory) operate on any sufficiently large patch with even dimensions. Because of tile boundary artifacts, give the model slightly larger patches for prediction, then clip out the middle 256x256 patch. This is controlled with a kernel buffer, half of which extends beyond the patch on each side. For example, specifying a 128x128 kernel buffer will append 64 pixels on each side of the patch, ensuring that the pixels in the output are computed from inputs completely covered by the kernel.
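
To make the arithmetic concrete, here is a small worked example of the buffer bookkeeping that doPrediction performs (illustrative only; these values match the Beijing setup below):

In [0]:
# With a 256x256 patch and a [128, 128] kernel buffer, the export produces
# 384x384 buffered patches, and the final output clips out the central 256x256.
kernel_buffer = [128, 128]
x_buffer = kernel_buffer[0] // 2  # 64 pixels appended on each side.
buffered_shape = [KERNEL_SIZE + kernel_buffer[0],
                  KERNEL_SIZE + kernel_buffer[1]]  # [384, 384]
print(buffered_shape, x_buffer)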


In [0]:
# Output assets folder: YOUR FOLDER
user_folder = 'users/username' # INSERT YOUR FOLDER HERE.

# Base file name to use for TFRecord files and assets.
bj_image_base = 'FCNN_demo_beijing_384_'
# Half this will extend on the sides of each patch.
bj_kernel_buffer = [128, 128]
# Beijing
bj_region = ee.Geometry.Polygon(
        [[[115.9662455210937, 40.121362012835235],
          [115.9662455210937, 39.64293313749715],
          [117.01818643906245, 39.64293313749715],
          [117.01818643906245, 40.121362012835235]]], None, False)

In [0]:
# Run the export.
doExport(bj_image_base, bj_kernel_buffer, bj_region)

In [0]:
# Run the prediction.
doPrediction(bj_image_base, user_folder, bj_kernel_buffer, bj_region)
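
The upload at the end of doPrediction starts an Earth Engine ingestion task. A quick way to check its progress from the notebook is the earthengine CLI:

In [0]:
# Optional: check the status of the asset ingestion task.
!earthengine task list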

Display the output

Once the data has been exported, the model has made predictions, the predictions have been written to a file, and the image has been imported to Earth Engine, it's possible to display the resulting Earth Engine asset. Here, display the predicted impervious surface area over Beijing, China.


In [0]:
out_image = ee.Image(user_folder + '/' + bj_image_base)
mapid = out_image.getMapId({'min': 0, 'max': 1})
map = folium.Map(location=[39.898, 116.5097])
folium.TileLayer(
    tiles=mapid['tile_fetcher'].url_format,
    attr='Map Data &copy; <a href="https://earthengine.google.com/">Google Earth Engine</a>',
    overlay=True,
    name='predicted impervious',
  ).add_to(map)
map.add_child(folium.LayerControl())
map