In [0]:
#@title Copyright 2020 Google LLC. { display-mode: "form" }
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

Introduction

This is a demonstration notebook. Suppose you have developed a model whose training is constrained by the resources available to the notebook VM. In that case, you may want to use Google AI Platform to train your model. The advantage of doing so is that long-running or resource-intensive training jobs can be performed in the background. Also, to use your trained model in Earth Engine, it needs to be deployed as a hosted model on AI Platform. This notebook uses previously created training data (see this example notebook) and AI Platform to train a model, deploy it and use it to make predictions in Earth Engine. To do that, the code needs to be structured as a Python package that can be uploaded to AI Platform. The following cells produce that package programmatically.
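
By the end of the setup, the package will have the following layout (file roles summarized here for orientation; each file is created in a cell below):

    ai_platform_demo/
        __init__.py    # marks the folder as a Python package
        config.py      # shared constants (project, bucket, bands, hyperparameters)
        model.py       # dataset loading and model definition
        task.py        # training entry point run by AI Platform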

Setup software libraries

Install needed libraries to the notebook VM. Authenticate as necessary.


In [0]:
# Cloud authentication.
from google.colab import auth
auth.authenticate_user()

In [0]:
# Import and initialize the Earth Engine library.
import ee
ee.Authenticate()
ee.Initialize()

In [0]:
# Tensorflow setup.
import tensorflow as tf
print(tf.__version__)

In [0]:
# Folium setup.
import folium
print(folium.__version__)

Training code package setup

It's necessary to create a Python package to hold the training code. Here we get started by creating a folder for the package and adding an empty __init__.py file.


In [0]:
PACKAGE_PATH = 'ai_platform_demo'

!ls -l
!mkdir {PACKAGE_PATH}
!touch {PACKAGE_PATH}/__init__.py
!ls -l {PACKAGE_PATH}

Variables

These variables need to be stored in a place where other code can access them. There are a variety of ways of accomplishing that, but here we'll use the %%writefile command to write the contents of the code cell to a file called config.py.

Note: You need to insert the name of a bucket (below) to which you have write access!


In [0]:
%%writefile {PACKAGE_PATH}/config.py

import tensorflow as tf

# INSERT YOUR PROJECT HERE!
PROJECT = 'your-project'

# INSERT YOUR BUCKET HERE!
BUCKET = 'your-bucket'

# Specify names of output locations in Cloud Storage.
FOLDER = 'fcnn-demo'
JOB_DIR = 'gs://' + BUCKET + '/' + FOLDER + '/trainer'
MODEL_DIR = JOB_DIR + '/model'
LOGS_DIR = JOB_DIR + '/logs'

# Put the EEified model next to the trained model directory.
EEIFIED_DIR = JOB_DIR + '/eeified'

# Pre-computed training and eval data.
DATA_BUCKET = 'ee-docs-demos'
TRAINING_BASE = 'training_patches'
EVAL_BASE = 'eval_patches'

# Specify inputs (Landsat bands) to the model and the response variable.
opticalBands = ['B1', 'B2', 'B3', 'B4', 'B5', 'B6', 'B7']
thermalBands = ['B10', 'B11']
BANDS = opticalBands + thermalBands
RESPONSE = 'impervious'
FEATURES = BANDS + [RESPONSE]

# Specify the size and shape of patches expected by the model.
KERNEL_SIZE = 256
KERNEL_SHAPE = [KERNEL_SIZE, KERNEL_SIZE]
COLUMNS = [
  tf.io.FixedLenFeature(shape=KERNEL_SHAPE, dtype=tf.float32) for k in FEATURES
]
FEATURES_DICT = dict(zip(FEATURES, COLUMNS))

# Sizes of the training and evaluation datasets.
TRAIN_SIZE = 16000
EVAL_SIZE = 8000

# Specify model training parameters.
BATCH_SIZE = 16
EPOCHS = 50
BUFFER_SIZE = 3000
OPTIMIZER = 'SGD'
LOSS = 'MeanSquaredError'
METRICS = ['RootMeanSquaredError']

Verify that the written file has the expected contents and is working as intended.


In [0]:
!cat {PACKAGE_PATH}/config.py

from ai_platform_demo import config
print('\n\n', config.BATCH_SIZE)

Training data, evaluation data and model

The following code loads the training/evaluation data and builds the model; write it into model.py. Note that these functions are developed and explained in this example notebook.


In [0]:
%%writefile {PACKAGE_PATH}/model.py

from . import config
import tensorflow as tf
from tensorflow.keras import layers
from tensorflow.keras import losses
from tensorflow.keras import metrics
from tensorflow.keras import models
from tensorflow.keras import optimizers

# Dataset loading functions

def parse_tfrecord(example_proto):
  return tf.io.parse_single_example(example_proto, config.FEATURES_DICT)

def to_tuple(inputs):
  inputsList = [inputs.get(key) for key in config.FEATURES]
  stacked = tf.stack(inputsList, axis=0)
  stacked = tf.transpose(stacked, [1, 2, 0])
  return stacked[:,:,:len(config.BANDS)], stacked[:,:,len(config.BANDS):]

def get_dataset(pattern):
  glob = tf.io.gfile.glob(pattern)
  dataset = tf.data.TFRecordDataset(glob, compression_type='GZIP')
  dataset = dataset.map(parse_tfrecord)
  dataset = dataset.map(to_tuple)
  return dataset

def get_training_dataset():
  glob = 'gs://' + config.DATA_BUCKET + '/' + config.FOLDER + '/' + config.TRAINING_BASE + '*'
  dataset = get_dataset(glob)
  dataset = dataset.shuffle(config.BUFFER_SIZE).batch(config.BATCH_SIZE).repeat()
  return dataset

def get_eval_dataset():
  glob = 'gs://' + config.DATA_BUCKET + '/' + config.FOLDER + '/' + config.EVAL_BASE + '*'
  dataset = get_dataset(glob)
  dataset = dataset.batch(1).repeat()
  return dataset

# A variant of the UNET model.

def conv_block(input_tensor, num_filters):
  encoder = layers.Conv2D(num_filters, (3, 3), padding='same')(input_tensor)
  encoder = layers.BatchNormalization()(encoder)
  encoder = layers.Activation('relu')(encoder)
  encoder = layers.Conv2D(num_filters, (3, 3), padding='same')(encoder)
  encoder = layers.BatchNormalization()(encoder)
  encoder = layers.Activation('relu')(encoder)
  return encoder

def encoder_block(input_tensor, num_filters):
  encoder = conv_block(input_tensor, num_filters)
  encoder_pool = layers.MaxPooling2D((2, 2), strides=(2, 2))(encoder)
  return encoder_pool, encoder

def decoder_block(input_tensor, concat_tensor, num_filters):
  decoder = layers.Conv2DTranspose(num_filters, (2, 2), strides=(2, 2), padding='same')(input_tensor)
  decoder = layers.concatenate([concat_tensor, decoder], axis=-1)
  decoder = layers.BatchNormalization()(decoder)
  decoder = layers.Activation('relu')(decoder)
  decoder = layers.Conv2D(num_filters, (3, 3), padding='same')(decoder)
  decoder = layers.BatchNormalization()(decoder)
  decoder = layers.Activation('relu')(decoder)
  decoder = layers.Conv2D(num_filters, (3, 3), padding='same')(decoder)
  decoder = layers.BatchNormalization()(decoder)
  decoder = layers.Activation('relu')(decoder)
  return decoder

def get_model():
  inputs = layers.Input(shape=[None, None, len(config.BANDS)]) # 256
  encoder0_pool, encoder0 = encoder_block(inputs, 32) # 128
  encoder1_pool, encoder1 = encoder_block(encoder0_pool, 64) # 64
  encoder2_pool, encoder2 = encoder_block(encoder1_pool, 128) # 32
  encoder3_pool, encoder3 = encoder_block(encoder2_pool, 256) # 16
  encoder4_pool, encoder4 = encoder_block(encoder3_pool, 512) # 8
  center = conv_block(encoder4_pool, 1024) # center
  decoder4 = decoder_block(center, encoder4, 512) # 16
  decoder3 = decoder_block(decoder4, encoder3, 256) # 32
  decoder2 = decoder_block(decoder3, encoder2, 128) # 64
  decoder1 = decoder_block(decoder2, encoder1, 64) # 128
  decoder0 = decoder_block(decoder1, encoder0, 32) # 256
  outputs = layers.Conv2D(1, (1, 1), activation='sigmoid')(decoder0)

  model = models.Model(inputs=[inputs], outputs=[outputs])

  model.compile(
      optimizer=optimizers.get(config.OPTIMIZER),
      loss=losses.get(config.LOSS),
      metrics=[metrics.get(metric) for metric in config.METRICS])

  return model

Verify that model.py is functioning as intended.


In [0]:
from ai_platform_demo import model

# Avoid shadowing the built-in eval() and the imported model module.
evaluation = model.get_eval_dataset()
print(next(iter(evaluation.take(1))))

m = model.get_model()
m.summary()

Training task

At this stage, there should be a config.py storing variables and a model.py with code for getting the training/evaluation data and the model. All that's left is the code for training the model. The following creates task.py, which gets the training and eval data, trains the model and saves it to a Cloud Storage bucket when it's done.


In [0]:
%%writefile {PACKAGE_PATH}/task.py

from . import config
from . import model
import tensorflow as tf

if __name__ == '__main__':

  training = model.get_training_dataset()
  evaluation = model.get_eval_dataset()

  m = model.get_model()

  m.fit(
      x=training,
      epochs=config.EPOCHS, 
      steps_per_epoch=int(config.TRAIN_SIZE / config.BATCH_SIZE), 
      validation_data=evaluation,
      validation_steps=int(config.EVAL_SIZE),
      callbacks=[tf.keras.callbacks.TensorBoard(config.LOGS_DIR)])

  m.save(config.MODEL_DIR, save_format='tf')

Submit the package to AI Platform for training

Now everything is in place to submit the job, which can be done from the command line. First, define some needed variables.

Note: You need to insert the name of a Cloud project (below) you own!


In [0]:
import time

JOB_NAME = 'demo_training_job_' + str(int(time.time()))
TRAINER_PACKAGE_PATH = 'ai_platform_demo'
MAIN_TRAINER_MODULE = 'ai_platform_demo.task'
REGION = 'us-central1'

Now the training job is ready to be started. First, you need to enable the ML API for your project. This can be done from this link to the Cloud Console. See this guide for details. Note that the Python and TensorFlow versions specified for the job should match what is used in the Colab notebook.
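
To double-check, print the notebook's versions and compare them against the --runtime-version and --python-version flags used below (a quick sanity check; tf was imported in the setup cells above):


In [0]:
import sys

# These should match the --python-version and --runtime-version flags below.
print('Python:', sys.version)
print('TensorFlow:', tf.__version__)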


In [0]:
!gcloud ai-platform jobs submit training {JOB_NAME} \
    --job-dir {config.JOB_DIR}  \
    --package-path {TRAINER_PACKAGE_PATH} \
    --module-name {MAIN_TRAINER_MODULE} \
    --region {REGION} \
    --project {config.PROJECT} \
    --runtime-version 2.1 \
    --python-version 3.7 \
    --scale-tier basic-gpu

Monitor the training job

There's not much more to do until the model is finished training (~24 hours), but it's fun and useful to monitor its progress. You can do that programmatically with another gcloud command. The output of that command can be read into an IPython.utils.text.SList, from which the state is extracted and checked against SUCCEEDED. Or you can monitor the job from the AI Platform jobs page on the Cloud Console.


In [0]:
desc = !gcloud ai-platform jobs describe {JOB_NAME} --project {config.PROJECT}
state = desc.grep('state:')[0].split(':')[1].strip()
print(state)
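
If you'd rather block until the job reaches a terminal state, here is a minimal polling sketch (the one-minute interval and the set of terminal states are assumptions, not requirements of the API):


In [0]:
import time

while True:
  # Re-query the job state once a minute until it reaches a terminal state.
  desc = !gcloud ai-platform jobs describe {JOB_NAME} --project {config.PROJECT}
  state = desc.grep('state:')[0].split(':')[1].strip()
  print(state)
  if state in ('SUCCEEDED', 'FAILED', 'CANCELLED'):
    break
  time.sleep(60)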

Inspect the trained model

Once the training job has finished, verify that you can load the trained model and print a summary of the fitted parameters. It's also useful to examine the logs with TensorBoard. There's a convenient notebook extension that will launch TensorBoard in the Colab notebook. Examine the training and testing learning curves to ensure that the training process has converged.
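
For example, you can load the saved model straight from Cloud Storage and print its summary (a quick check; it assumes the training job has finished writing to config.MODEL_DIR):


In [0]:
# Load the trained model from Cloud Storage and summarize its layers.
trained_model = tf.keras.models.load_model(config.MODEL_DIR)
trained_model.summary()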


In [0]:
%load_ext tensorboard
%tensorboard --logdir {config.LOGS_DIR}

Prepare the model for making predictions in Earth Engine

Before we can use the model in Earth Engine, it needs to be hosted by AI Platform. But before we can host the model on AI Platform we need to EEify (a new word!) it. The EEification process merely appends some extra operations to the inputs and outputs of the model in order to accommodate the interchange format between pixels from Earth Engine (float32) and inputs to AI Platform (base64). (See this doc for details.)

earthengine model prepare

The EEification process is handled for you using the Earth Engine command earthengine model prepare. To use that command, we need to specify the input and output model directories and the names of the input and output nodes in the TensorFlow computation graph. We can do all that programmatically:


In [0]:
from tensorflow.python.tools import saved_model_utils

meta_graph_def = saved_model_utils.get_meta_graph_def(config.MODEL_DIR, 'serve')
inputs = meta_graph_def.signature_def['serving_default'].inputs
outputs = meta_graph_def.signature_def['serving_default'].outputs

# Just get the first thing from the serving signature def, since this model
# has only a single input and a single output.
input_name = None
for k,v in inputs.items():
  input_name = v.name
  break

output_name = None
for k,v in outputs.items():
  output_name = v.name
  break

# Make dictionaries that map the model's input and output node names to the
# names Earth Engine will use ('array' and 'impervious').
import json
input_dict = "'" + json.dumps({input_name: "array"}) + "'"
output_dict = "'" + json.dumps({output_name: "impervious"}) + "'"

In [0]:
# You need to set the project before using the model prepare command.
!earthengine set_project {config.PROJECT}
!earthengine model prepare --source_dir {config.MODEL_DIR} --dest_dir {config.EEIFIED_DIR} --input {input_dict} --output {output_dict}

Note that you can also use the TensorFlow saved model command line tool to do this manually. See this doc for details. Also note the names we've specified for the new inputs and outputs: array and impervious, respectively.
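
For example, you can inspect the serving signature (and so the node names) with saved_model_cli, which is installed alongside TensorFlow (a sketch of the manual approach):


In [0]:
# Print the inputs and outputs of the default serving signature.
!saved_model_cli show --dir {config.MODEL_DIR} --tag_set serve --signature_def serving_default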

Perform inference using the trained model in Earth Engine

Before it's possible to get predictions from the trained and EEified model, it needs to be deployed on AI Platform. The first step is to create the model. The second step is to create a version. See this guide for details. Note that models and versions can be monitored from the AI Platform models page of the Cloud Console.

To ensure that the model is ready for predictions without having to warm up nodes, you can use a configuration YAML file to set the scaling type of this version to autoScaling and set a minimum number of nodes for the version. This ensures there are always nodes on standby, but note that you will be charged as long as they are running. For this example, we'll set minNodes to 10, which means at least 10 nodes are always up, running and waiting for predictions. The number of nodes will also scale up automatically if needed.


In [0]:
%%writefile config.yaml
autoScaling:
    minNodes: 10

In [0]:
MODEL_NAME = 'fcnn_demo_model'
VERSION_NAME = 'v' + str(int(time.time()))
print('Creating version: ' + VERSION_NAME)

!gcloud ai-platform models create {MODEL_NAME} --project {config.PROJECT}
!gcloud ai-platform versions create {VERSION_NAME} \
  --project {config.PROJECT} \
  --model {MODEL_NAME} \
  --origin {config.EEIFIED_DIR} \
  --framework "TENSORFLOW" \
  --runtime-version 2.1 \
  --python-version 3.7 \
  --config=config.yaml

There is now a trained model, prepared for serving to Earth Engine, hosted and versioned on AI Platform. You can connect Earth Engine directly to it for inference with the ee.Model.fromAiPlatformPredictor command.

ee.Model.fromAiPlatformPredictor

For this command to work, we need to know a lot about the model, starting with its name and version.

Inputs

To perform inference, you need to be able to recreate the imagery on which the model was trained. Specifically, you need to create an array-valued input from the scaled data and use that as the input. (Recall that the new input node is named array, which is convenient because the array image has one band, named array by default.) The inputs will be provided as 144x144 patches (inputTileSize), at 30-meter resolution (proj), but 8 pixels will be thrown out (inputOverlapSize) to minimize boundary effects.

Outputs

The output (which you also need to know), is a single float band named impervious.


In [0]:
# Use Landsat 8 surface reflectance data.
l8sr = ee.ImageCollection('LANDSAT/LC08/C01/T1_SR')

# Cloud masking function.
def maskL8sr(image):
  cloudShadowBitMask = ee.Number(2).pow(3).int()
  cloudsBitMask = ee.Number(2).pow(5).int()
  qa = image.select('pixel_qa')
  mask1 = qa.bitwiseAnd(cloudShadowBitMask).eq(0).And(
    qa.bitwiseAnd(cloudsBitMask).eq(0))
  mask2 = image.mask().reduce('min')
  mask3 = image.select(config.opticalBands).gt(0).And(
          image.select(config.opticalBands).lt(10000)).reduce('min')
  mask = mask1.And(mask2).And(mask3)
  return image.select(config.opticalBands).divide(10000).addBands(
          image.select(config.thermalBands).divide(10).clamp(273.15, 373.15)
            .subtract(273.15).divide(100)).updateMask(mask)

# The image input data is a cloud-masked median composite.
image = l8sr.filterDate(
    '2015-01-01', '2017-12-31').map(maskL8sr).median().select(config.BANDS).float()

# Load the trained model and use it for prediction.
model = ee.Model.fromAiPlatformPredictor(
    projectName = config.PROJECT,
    modelName = MODEL_NAME,
    version = VERSION_NAME,
    inputTileSize = [144, 144],
    inputOverlapSize = [8, 8],
    proj = ee.Projection('EPSG:4326').atScale(30),
    fixInputProj = True,
    outputBands = {'impervious': {
        'type': ee.PixelType.float()
      }
    }
)
predictions = model.predictImage(image.toArray())

# Use folium to visualize the input imagery and the predictions.
mapid = image.getMapId({'bands': ['B4', 'B3', 'B2'], 'min': 0, 'max': 0.3})
map = folium.Map(location=[38., -122.5], zoom_start=13)
folium.TileLayer(
    tiles=mapid['tile_fetcher'].url_format,
    attr='Google Earth Engine',
    overlay=True,
    name='median composite',
  ).add_to(map)

mapid = predictions.getMapId({'min': 0, 'max': 1})

folium.TileLayer(
    tiles=mapid['tile_fetcher'].url_format,
    attr='Google Earth Engine',
    overlay=True,
    name='predictions',
  ).add_to(map)
map.add_child(folium.LayerControl())
map