In [0]:
#@title Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
This tutorial focuses on the task of image segmentation, using an encoder-decoder architecture implemented with the Keras model subclassing API.
So far you have seen image classification, where the task of the network is to assign a label or class to an input image. However, suppose you want to know where an object is located in the image, the shape of that object, which pixel belongs to which object, etc. In this case you will want to segment the image, i.e., each pixel of the image is given a label. Thus, the task of image segmentation is to train a neural network to output a pixel-wise mask of the image. This helps in understanding the image at a much finer granularity, i.e., the pixel level. Image segmentation has many applications in medical imaging, self-driving cars and satellite imaging, to name a few.
The dataset that will be used for this tutorial is the Oxford-IIIT Pet Dataset, created by Parkhi et al. The dataset consists of images, their corresponding labels, and pixel-wise masks. The masks are essentially labels for each pixel. Each pixel is given one of three categories: class 1 (pixel belonging to the pet), class 2 (pixel bordering the pet), or class 3 (none of the above, i.e., a surrounding pixel).
In [0]:
!pip install git+https://github.com/tensorflow/examples.git
In [0]:
import tensorflow as tf
assert tf.__version__.startswith('2')
assert tf.executing_eagerly()  # eager execution is the default in TF 2.x

import tensorflow_datasets as tfds
tfds.disable_progress_bar()

from IPython.display import clear_output
import matplotlib.pyplot as plt
In [0]:
dataset, info = tfds.load('oxford_iiit_pet:3.0.0', with_info=True)
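As a quick optional check (not part of the original tutorial), you can confirm that the raw masks really use the labels {1, 2, 3}:
In [0]:
# Collect the distinct mask values of one training example.
for example in dataset['train'].take(1):
  mask_values = tf.unique(
      tf.reshape(tf.cast(example['segmentation_mask'], tf.int32), [-1]))[0]
  print(mask_values)  # expected to be drawn from {1, 2, 3}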
The following code performs a simple augmentation of flipping an image. In addition, the image is normalized to [-1, 1]. Finally, as mentioned above, the pixels in the segmentation mask are labeled with one of {1, 2, 3}. For the sake of convenience, let's subtract 1 from the segmentation mask, resulting in labels in {0, 1, 2}.
In [0]:
def normalize(input_image, input_mask):
  input_image = tf.cast(input_image, tf.float32) / 128.0 - 1
  input_mask -= 1
  return input_image, input_mask
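As a small sanity check (a sketch added here, not in the original), the scaling maps pixel value 0 to -1.0 and 255 to roughly 0.99, i.e., the [-1, 1] range described above:
In [0]:
# Apply the scaling to the extreme pixel values 0, 128, and 255.
print(tf.constant([0.0, 128.0, 255.0]) / 128.0 - 1)  # -> [-1., 0., ~0.992]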
In [0]:
@tf.function
def load_image_train(datapoint):
  input_image = tf.image.resize(datapoint['image'], (128, 128))
  input_mask = tf.image.resize(datapoint['segmentation_mask'], (128, 128))

  if tf.random.uniform(()) > 0.5:
    input_image = tf.image.flip_left_right(input_image)
    input_mask = tf.image.flip_left_right(input_mask)

  input_image, input_mask = normalize(input_image, input_mask)

  return input_image, input_mask
In [0]:
def load_image_test(datapoint):
  input_image = tf.image.resize(datapoint['image'], (128, 128))
  input_mask = tf.image.resize(datapoint['segmentation_mask'], (128, 128))
  input_image, input_mask = normalize(input_image, input_mask)
  return input_image, input_mask
The dataset already contains the required train and test splits, so let's continue to use them.
In [0]:
TRAIN_LENGTH = info.splits['train'].num_examples
BATCH_SIZE = 64
BUFFER_SIZE = 1000
STEPS_PER_EPOCH = TRAIN_LENGTH // BATCH_SIZE
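For reference, the TFDS 'train' split of this dataset contains 3,680 images, so with a batch size of 64 this works out to 3680 // 64 = 57 steps per epoch:
In [0]:
print(TRAIN_LENGTH, STEPS_PER_EPOCH)  # 3680 and 57 for this dataset and batch size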
In [0]:
train = dataset['train'].map(load_image_train, num_parallel_calls=tf.data.experimental.AUTOTUNE)
test = dataset['test'].map(load_image_test)
In [0]:
train_dataset = train.cache().shuffle(BUFFER_SIZE).batch(BATCH_SIZE).repeat()
train_dataset = train_dataset.prefetch(buffer_size=tf.data.experimental.AUTOTUNE)
test_dataset = test.batch(BATCH_SIZE)
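To verify the pipeline produces what the model expects, you can inspect the structure of the batched dataset (an optional check, not in the original tutorial):
In [0]:
# Each element is an (image, mask) pair batched along the first axis:
# images of shape (None, 128, 128, 3) and masks of shape (None, 128, 128, 1).
print(train_dataset.element_spec)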
Let's take a look at an image example and its corresponding mask from the dataset.
In [0]:
def display(display_list):
  plt.figure(figsize=(15, 15))
  title = ['Input Image', 'True Mask', 'Predicted Mask']

  for i in range(len(display_list)):
    plt.subplot(1, len(display_list), i + 1)
    plt.title(title[i])
    plt.imshow(tf.keras.preprocessing.image.array_to_img(display_list[i]))
    plt.axis('off')
  plt.show()
In [0]:
for image, mask in train.take(1):
  sample_image, sample_mask = image, mask
display([sample_image, sample_mask])
print(sample_image.shape)
The model used here consists of an encoder (downsampler) and decoder (upsampler).
Here, the encoder and decoder are defined using the model subclassing API. Each encoder block is basically a sequence of Conv and MaxPooling layers (UpSampling layers for a decoder block), plus a residual connection. The output activation of each block is used as the input to the next block; within a block, the activation is not only processed through the sequence of Conv/MaxPooling layers, as in a Sequential model, but is also passed through a projection layer and added to the block's output.
The reason to output three channels is that there are three possible labels for each pixel. Think of this as multi-class classification where each pixel is being classified into one of three classes.
In [0]:
OUTPUT_CHANNELS = 3
In [0]:
from tensorflow.keras import layers

class EncoderBlock(tf.keras.Model):
  def __init__(self, filter_size):
    # initialize instance variables
    super(EncoderBlock, self).__init__()
    self.filter_size = filter_size

    # define layers
    self.layer_1 = layers.Activation('relu')
    self.layer_2 = layers.SeparableConv2D(self.filter_size, 3, padding='same')
    self.layer_3 = layers.BatchNormalization()
    self.layer_4 = layers.Activation('relu')
    self.layer_5 = layers.SeparableConv2D(self.filter_size, 3, padding='same')
    self.layer_6 = layers.BatchNormalization()
    self.layer_7 = layers.MaxPooling2D(3, strides=2, padding='same')

    # project the residual so its shape matches the downsampled output
    self.residual_layer = layers.Conv2D(self.filter_size, 1, strides=2,
                                        padding='same')

  def call(self, inputs):
    x = self.layer_1(inputs)
    x = self.layer_2(x)
    x = self.layer_3(x)
    x = self.layer_4(x)
    x = self.layer_5(x)
    x = self.layer_6(x)
    x = self.layer_7(x)
    residual = self.residual_layer(inputs)
    x = layers.add([x, residual])
    return x

class DecoderBlock(tf.keras.Model):
  def __init__(self, filter_size):
    # initialize instance variables
    super(DecoderBlock, self).__init__()
    self.filter_size = filter_size

    # define layers
    self.layer_1 = layers.Activation('relu')
    self.layer_2 = layers.Conv2DTranspose(self.filter_size, 3, padding='same')
    self.layer_3 = layers.BatchNormalization()
    self.layer_4 = layers.Activation('relu')
    self.layer_5 = layers.Conv2DTranspose(self.filter_size, 3, padding='same')
    self.layer_6 = layers.BatchNormalization()
    self.layer_7 = layers.UpSampling2D(2)

    # project the residual so its shape matches the upsampled output
    self.residual_layer_1 = layers.UpSampling2D(2)
    self.residual_layer_2 = layers.Conv2D(filter_size, 1, padding='same')

  def call(self, inputs):
    x = self.layer_1(inputs)
    x = self.layer_2(x)
    x = self.layer_3(x)
    x = self.layer_4(x)
    x = self.layer_5(x)
    x = self.layer_6(x)
    x = self.layer_7(x)
    residual = self.residual_layer_1(inputs)
    residual = self.residual_layer_2(residual)
    x = layers.add([x, residual])
    return x
In [0]:
class ImageSegmentationModel(tf.keras.Model):
  def __init__(self, output_channels, dynamic=True):
    # initialize instance variables (forward `dynamic` so it takes effect)
    super(ImageSegmentationModel, self).__init__(dynamic=dynamic)
    self.output_channels = output_channels

    self.entry_block_1 = layers.Conv2D(32, 3, strides=2, padding='same')
    self.entry_block_2 = layers.BatchNormalization()
    self.entry_block_3 = layers.Activation('relu')

    self.encoder_block_1 = EncoderBlock(64)
    self.encoder_block_2 = EncoderBlock(128)
    self.encoder_block_3 = EncoderBlock(256)

    self.decoder_block_1 = DecoderBlock(256)
    self.decoder_block_2 = DecoderBlock(128)
    self.decoder_block_3 = DecoderBlock(64)
    self.decoder_block_4 = DecoderBlock(32)

    # per-pixel class scores; softmax turns the three channels into a
    # probability distribution, matching the multi-class loss used below
    self.output_layer = layers.Conv2D(
        output_channels, 3, activation='softmax', padding='same')

  def call(self, inputs):
    x = self.entry_block_1(inputs)
    x = self.entry_block_2(x)
    x = self.entry_block_3(x)
    x = self.encoder_block_1(x)
    x = self.encoder_block_2(x)
    x = self.encoder_block_3(x)
    x = self.decoder_block_1(x)
    x = self.decoder_block_2(x)
    x = self.decoder_block_3(x)
    x = self.decoder_block_4(x)
    x = self.output_layer(x)
    return x
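Before compiling, it is worth sanity-checking the encoder/decoder symmetry: the entry block and the three encoder blocks each halve the spatial size (128 → 8), and the four decoder blocks double it back (8 → 128). A minimal check, added here as a sketch:
In [0]:
# Run a dummy batch through a fresh model instance; the output should have the
# same spatial size as the input, with one channel per class.
print(ImageSegmentationModel(OUTPUT_CHANNELS)(tf.zeros([1, 128, 128, 3])).shape)
# expected: (1, 128, 128, 3)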
Now, all that is left to do is to compile and train the model. The loss used here is losses.sparse_categorical_crossentropy. The reason to use this loss function is that the network is trying to assign each pixel a label, just like in multi-class prediction. In the true segmentation mask, each pixel has a label of 0, 1, or 2. The network outputs three channels; essentially, each channel is trying to learn to predict a class, and losses.sparse_categorical_crossentropy is the recommended loss for such a scenario. Using the output of the network, the label assigned to a pixel is the channel with the highest value. This is what the create_mask function, defined below, does.
In [0]:
model = ImageSegmentationModel(OUTPUT_CHANNELS)
model.compile(optimizer='adam',
              loss='sparse_categorical_crossentropy',
              metrics=['accuracy'])
Let's try out the model to see what it predicts before training.
In [0]:
def create_mask(pred_mask):
  pred_mask = tf.argmax(pred_mask, axis=-1)
  pred_mask = pred_mask[..., tf.newaxis]
  return pred_mask[0]
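As a toy illustration (not part of the original tutorial), consider a single pixel whose three channel scores are [0.1, 0.7, 0.2]; create_mask assigns it class 1, the channel with the highest value:
In [0]:
toy_pred = tf.constant([[[[0.1, 0.7, 0.2]]]])  # shape (batch=1, h=1, w=1, classes=3)
print(create_mask(toy_pred))  # the single pixel gets label 1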
In [0]:
def show_predictions(dataset=None, num=1):
  if dataset:
    for image, mask in dataset.take(num):
      pred_mask = model.predict(image)
      display([image[0], mask[0], create_mask(pred_mask)])
  else:
    display([sample_image, sample_mask,
             create_mask(model.predict(sample_image[tf.newaxis, ...]))])
In [0]:
show_predictions()
Let's observe how the model improves while it is training. To accomplish this task, a callback function is defined below. Since this model does not use a pretrained encoder, it has to learn everything from scratch. As you will see, in the first few epochs the model cannot really predict the mask and outputs something close to a blank mask; only after about 10 epochs do its predictions start to make sense.
In [0]:
class DisplayCallback(tf.keras.callbacks.Callback):
  def on_epoch_end(self, epoch, logs=None):
    clear_output(wait=True)
    show_predictions()
    print('\nSample Prediction after epoch {}\n'.format(epoch + 1))
In [0]:
EPOCHS = 32
VAL_SUBSPLITS = 5
VALIDATION_STEPS = info.splits['test'].num_examples//BATCH_SIZE//VAL_SUBSPLITS
model_history = model.fit(train_dataset, epochs=EPOCHS,
                          steps_per_epoch=STEPS_PER_EPOCH,
                          validation_steps=VALIDATION_STEPS,
                          validation_data=test_dataset,
                          callbacks=[DisplayCallback()])
In [0]:
loss = model_history.history['loss']
val_loss = model_history.history['val_loss']
epochs = range(EPOCHS)
plt.figure()
plt.plot(epochs, loss, 'r', label='Training loss')
plt.plot(epochs, val_loss, 'bo', label='Validation loss')
plt.title('Training and Validation Loss')
plt.xlabel('Epoch')
plt.ylabel('Loss Value')
plt.ylim([0, 2])
plt.legend()
plt.show()
Let's make some predictions. In the interest of saving time, the number of epochs was kept small, but you may set this higher to achieve more accurate results.
In [0]:
show_predictions(test_dataset, 3)
Now that you have an understanding of what image segmentation is and how it works, you can try this tutorial out with different intermediate layer outputs, or even a different pretrained model. You may also challenge yourself by trying out the Carvana image masking challenge hosted on Kaggle.
You may also want to explore the UNet model, which has cross connections between the encoder and decoder blocks. These cross connections allow better high-resolution localization, since the information doesn't have to pass through all the down-sampling steps to reach the output.
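For a sense of what such a cross connection looks like, here is a minimal, hypothetical sketch in the functional API (the layer sizes are illustrative, not part of this tutorial's model): the decoder concatenates an upsampled feature map with the matching high-resolution encoder activation.
In [0]:
from tensorflow.keras import layers

# Hypothetical U-Net-style skip connection (illustrative sketch only).
inputs = tf.keras.Input(shape=(128, 128, 3))
d1 = layers.Conv2D(32, 3, strides=2, padding='same', activation='relu')(inputs)  # 64x64
d2 = layers.Conv2D(64, 3, strides=2, padding='same', activation='relu')(d1)  # 32x32
u1 = layers.Conv2DTranspose(32, 3, strides=2, padding='same', activation='relu')(d2)  # back to 64x64
u1 = layers.Concatenate()([u1, d1])  # cross connection: reuse encoder features
outputs = layers.Conv2DTranspose(3, 3, strides=2, padding='same', activation='softmax')(u1)  # 128x128
unet_sketch = tf.keras.Model(inputs, outputs)
unet_sketch.summary()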