This notebook will show you how to port an Estimator model to TPUEstimator.
All the lines that had to be changed in the porting are marked with a "TPU REFACTORING" comment.
You do the porting only once. TPUEstimator then works on both TPU and GPU with the use_tpu=False flag.
In [1]:
import os, re, math, json, shutil, pprint, datetime
import PIL.Image, PIL.ImageFont, PIL.ImageDraw # "pip3 install Pillow" or "pip install Pillow" if needed
import numpy as np
import tensorflow as tf
from matplotlib import pyplot as plt
from tensorflow.python.platform import tf_logging
print("Tensorflow version " + tf.__version__)
In [0]:
BATCH_SIZE = 32 #@param {type:"integer"}
BUCKET = 'gs://' #@param {type:"string"}
assert re.search(r'gs://.+', BUCKET), 'You need a GCS bucket for your Tensorboard logs. Head to http://console.cloud.google.com/storage and create one.'
training_images_file = 'gs://mnist-public/train-images-idx3-ubyte'
training_labels_file = 'gs://mnist-public/train-labels-idx1-ubyte'
validation_images_file = 'gs://mnist-public/t10k-images-idx3-ubyte'
validation_labels_file = 'gs://mnist-public/t10k-labels-idx1-ubyte'
In [0]:
#@title visualization utilities [RUN ME]
"""
This cell contains helper functions used for visualization
and downloads only. You can skip reading it. There is very
little useful Keras/Tensorflow code here.
"""
# Matplotlib config
plt.rc('image', cmap='gray_r')
plt.rc('grid', linewidth=0)
plt.rc('xtick', top=False, bottom=False, labelsize='large')
plt.rc('ytick', left=False, right=False, labelsize='large')
plt.rc('axes', facecolor='F8F8F8', titlesize="large", edgecolor='white')
plt.rc('text', color='a8151a')
plt.rc('figure', facecolor='F0F0F0')# Matplotlib fonts
MATPLOTLIB_FONT_DIR = os.path.join(os.path.dirname(plt.__file__), "mpl-data/fonts/ttf")
# pull a batch from the datasets. This code is not very nice, it gets much better in eager mode (TODO)
def dataset_to_numpy_util(training_dataset, validation_dataset, N):
# get one batch from each: 10000 validation digits, N training digits
unbatched_train_ds = training_dataset.apply(tf.data.experimental.unbatch())
v_images, v_labels = validation_dataset.make_one_shot_iterator().get_next()
t_images, t_labels = unbatched_train_ds.batch(N).make_one_shot_iterator().get_next()
# Run once, get one batch. Session.run returns numpy results
with tf.Session() as ses:
(validation_digits, validation_labels,
training_digits, training_labels) = ses.run([v_images, v_labels, t_images, t_labels])
# these were one-hot encoded in the dataset
validation_labels = np.argmax(validation_labels, axis=1)
training_labels = np.argmax(training_labels, axis=1)
return (training_digits, training_labels,
validation_digits, validation_labels)
# create digits from local fonts for testing
def create_digits_from_local_fonts(n):
font_labels = []
img = PIL.Image.new('LA', (28*n, 28), color = (0,255)) # format 'LA': black in channel 0, alpha in channel 1
font1 = PIL.ImageFont.truetype(os.path.join(MATPLOTLIB_FONT_DIR, 'DejaVuSansMono-Oblique.ttf'), 25)
font2 = PIL.ImageFont.truetype(os.path.join(MATPLOTLIB_FONT_DIR, 'STIXGeneral.ttf'), 25)
d = PIL.ImageDraw.Draw(img)
for i in range(n):
font_labels.append(i%10)
d.text((7+i*28,0 if i<10 else -4), str(i%10), fill=(255,255), font=font1 if i<10 else font2)
font_digits = np.array(img.getdata(), np.float32)[:,0] / 255.0 # black in channel 0, alpha in channel 1 (discarded)
font_digits = np.reshape(np.stack(np.split(np.reshape(font_digits, [28, 28*n]), n, axis=1), axis=0), [n, 28*28])
return font_digits, font_labels
# utility to display a row of digits with their predictions
def display_digits(digits, predictions, labels, title, n):
plt.figure(figsize=(13,3))
digits = np.reshape(digits, [n, 28, 28])
digits = np.swapaxes(digits, 0, 1)
digits = np.reshape(digits, [28, 28*n])
plt.yticks([])
plt.xticks([28*x+14 for x in range(n)], predictions)
for i,t in enumerate(plt.gca().xaxis.get_ticklabels()):
if predictions[i] != labels[i]: t.set_color('red') # bad predictions in red
plt.imshow(digits)
plt.grid(None)
plt.title(title)
# utility to display multiple rows of digits, sorted by unrecognized/recognized status
def display_top_unrecognized(digits, predictions, labels, n, lines):
idx = np.argsort(predictions==labels) # sort order: unrecognized first
for i in range(lines):
display_digits(digits[idx][i*n:(i+1)*n], predictions[idx][i*n:(i+1)*n], labels[idx][i*n:(i+1)*n],
"{} sample validation digits out of {} with bad predictions in red and sorted first".format(n*lines, len(digits)) if i==0 else "", n)
# utility to display training and validation curves
def display_training_curves(training, validation, title, subplot):
if subplot%10==1: # set up the subplots on the first call
plt.subplots(figsize=(10,10), facecolor='#F0F0F0')
plt.tight_layout()
ax = plt.subplot(subplot)
ax.grid(linewidth=1, color='white')
ax.plot(training)
ax.plot(validation)
ax.set_title('model '+ title)
ax.set_ylabel(title)
ax.set_xlabel('epoch')
ax.legend(['train', 'valid.'])
In [0]:
IS_COLAB_BACKEND = 'COLAB_GPU' in os.environ # this is always set on Colab, the value is 0 or 1 depending on GPU presence
if IS_COLAB_BACKEND:
from google.colab import auth
auth.authenticate_user() # Authenticates the backend and also the TPU using your credentials so that they can access your private GCS buckets
In [6]:
#TPU REFACTORING: detect the TPU
try: # TPU detection
tpu = tf.contrib.cluster_resolver.TPUClusterResolver() # Picks up a connected TPU on Google's Colab, ML Engine, Kubernetes and Deep Learning VMs accessed through the 'ctpu up' utility
#tpu = tf.contrib.cluster_resolver.TPUClusterResolver('MY_TPU_NAME') # If auto-detection does not work, you can pass the name of the TPU explicitly (tip: on a VM created with "ctpu up" the TPU has the same name as the VM)
print('Running on TPU ', tpu.cluster_spec().as_dict()['worker'])
USE_TPU = True
except ValueError:
tpu = None
print("Running on GPU or CPU")
USE_TPU = False
Please read the best practices for building input pipelines with tf.data.Dataset
In [0]:
def read_label(tf_bytestring):
label = tf.decode_raw(tf_bytestring, tf.uint8)
label = tf.reshape(label, [])
label = tf.one_hot(label, 10)
return label
def read_image(tf_bytestring):
image = tf.decode_raw(tf_bytestring, tf.uint8)
image = tf.cast(image, tf.float32)/256.0
image = tf.reshape(image, [28*28])
return image
def load_dataset(image_file, label_file):
imagedataset = tf.data.FixedLengthRecordDataset(image_file, 28*28, header_bytes=16)
imagedataset = imagedataset.map(read_image, num_parallel_calls=16)
labelsdataset = tf.data.FixedLengthRecordDataset(label_file, 1, header_bytes=8)
labelsdataset = labelsdataset.map(read_label, num_parallel_calls=16)
dataset = tf.data.Dataset.zip((imagedataset, labelsdataset))
return dataset
def get_training_dataset(image_file, label_file, batch_size):
dataset = load_dataset(image_file, label_file)
dataset = dataset.cache() # this small dataset can be entirely cached in RAM, for TPU this is important to get good performance from such a small dataset
dataset = dataset.shuffle(5000, reshuffle_each_iteration=True)
dataset = dataset.repeat() # Mandatory for TPU for now
dataset = dataset.batch(batch_size, drop_remainder=True) # drop_remainder is important on TPU, batch size must be fixed
dataset = dataset.prefetch(-1) # prefetch next batch while training (-1: autotune prefetch buffer size)
return dataset
#TPU REFACTORING: training and eval batch sizes must be the same: passing batch_size parameter here too
# def get_validation_dataset(image_file, label_file):
def get_validation_dataset(image_file, label_file, batch_size):
dataset = load_dataset(image_file, label_file)
dataset = dataset.cache() # this small dataset can be entirely cached in RAM, for TPU this is important to get good performance from such a small dataset
#TPU REFACTORING: training and eval batch sizes must be the same: passing batch_size parameter here too
# dataset = dataset.batch(10000, drop_remainder=True) # 10000 items in eval dataset, all in one batch
dataset = dataset.batch(batch_size, drop_remainder=True)
dataset = dataset.repeat() # Mandatory for TPU for now
return dataset
# instantiate the datasets
training_dataset = get_training_dataset(training_images_file, training_labels_file, BATCH_SIZE)
validation_dataset = get_validation_dataset(validation_images_file, validation_labels_file, 10000)
# For TPU, we will need a function that returns the dataset
# TPU REFACTORING: input_fn's must have a params argument though which TPUEstimator passes params['batch_size']
# training_input_fn = lambda: get_training_dataset(training_images_file, training_labels_file, BATCH_SIZE)
# validation_input_fn = lambda: get_validation_dataset(validation_images_file, validation_labels_file)
training_input_fn = lambda params: get_training_dataset(training_images_file, training_labels_file, params['batch_size'])
validation_input_fn = lambda params: get_validation_dataset(validation_images_file, validation_labels_file, params['batch_size'])
In [8]:
N = 24
(training_digits, training_labels,
validation_digits, validation_labels) = dataset_to_numpy_util(training_dataset, validation_dataset, N)
display_digits(training_digits, training_labels, training_labels, "training digits and their labels", N)
display_digits(validation_digits[:N], validation_labels[:N], validation_labels[:N], "validation digits and their labels", N)
font_digits, font_labels = create_digits_from_local_fonts(N)
If you are not sure what cross-entropy, dropout, softmax or batch-normalization mean, head here for a crash-course: Tensorflow and deep learning without a PhD
In [0]:
# This model trains to 99.4% sometimes 99.5% accuracy in 10 epochs
# TPU REFACTORING: model_fn must have a params argument. TPUEstimator passes batch_size and use_tpu into it
#def model_fn(features, labels, mode):
def model_fn(features, labels, mode, params):
is_training = (mode == tf.estimator.ModeKeys.TRAIN)
x = features
y = tf.reshape(x, [-1, 28, 28, 1])
y = tf.layers.Conv2D(filters=6, kernel_size=3, padding='same', use_bias=False)(y) # no bias necessary before batch norm
y = tf.layers.BatchNormalization(scale=False, center=True)(y, training=is_training) # no batch norm scaling necessary before "relu"
y = tf.nn.relu(y) # activation after batch norm
y = tf.layers.Conv2D(filters=12, kernel_size=6, padding='same', use_bias=False, strides=2)(y)
y = tf.layers.BatchNormalization(scale=False, center=True)(y, training=is_training)
y = tf.nn.relu(y)
y = tf.layers.Conv2D(filters=24, kernel_size=6, padding='same', use_bias=False, strides=2)(y)
y = tf.layers.BatchNormalization(scale=False, center=True)(y, training=is_training)
y = tf.nn.relu(y)
y = tf.layers.Flatten()(y)
y = tf.layers.Dense(200, use_bias=False)(y)
y = tf.layers.BatchNormalization(scale=False, center=True)(y, training=is_training)
y = tf.nn.relu(y)
y = tf.layers.Dropout(0.5)(y, training=is_training)
logits = tf.layers.Dense(10)(y)
predictions = tf.nn.softmax(logits)
classes = tf.math.argmax(predictions, axis=-1)
if (mode != tf.estimator.ModeKeys.PREDICT):
loss = tf.losses.softmax_cross_entropy(labels, logits)
step = tf.train.get_or_create_global_step()
# TPU REFACTORING: step is now increased once per GLOBAL_BATCH_SIZE = 8*BATCH_SIZE. Must adjust learning rate schedule accordingly
# lr = 0.0001 + tf.train.exponential_decay(0.01, step, 2000, 1/math.e)
lr = 0.0001 + tf.train.exponential_decay(0.01, step, 2000//8, 1/math.e)
# TPU REFACTORING: custom Tensorboard summaries do not work. Only default Estimator summaries will appear in Tensorboard.
# tf.summary.scalar("learn_rate", lr)
optimizer = tf.train.AdamOptimizer(lr)
# TPU REFACTORING: wrap the optimizer in a CrossShardOptimizer: this implements the multi-core training logic
if params['use_tpu']:
optimizer = tf.contrib.tpu.CrossShardOptimizer(optimizer)
# little wrinkle: batch norm uses running averages which need updating after each batch. create_train_op does it, optimizer.minimize does not.
train_op = tf.contrib.training.create_train_op(loss, optimizer)
#train_op = optimizer.minimize(loss, tf.train.get_or_create_global_step())
# TPU REFACTORING: a metrics_fn is needed for TPU
# metrics = {'accuracy': tf.metrics.accuracy(classes, tf.math.argmax(labels, axis=-1))}
metric_fn = lambda classes, labels: {'accuracy': tf.metrics.accuracy(classes, tf.math.argmax(labels, axis=-1))}
tpu_metrics = (metric_fn, [classes, labels]) # pair of metric_fn and its list of arguments, there can be multiple pairs in a list
# metric_fn will run on CPU, not TPU: more operations are allowed
else:
loss = train_op = metrics = tpu_metrics = None # None of these can be computed in prediction mode because labels are not available
# TPU REFACTORING: EstimatorSpec => TPUEstimatorSpec
## return tf.estimator.EstimatorSpec(
return tf.contrib.tpu.TPUEstimatorSpec(
mode=mode,
predictions={"predictions": predictions, "classes": classes}, # name these fields as you like
loss=loss,
train_op=train_op,
# TPU REFACTORING: a metrics_fn is needed for TPU, passed into the eval_metrics field instead of eval_metrics_ops
# eval_metric_ops=metrics
eval_metrics = tpu_metrics
)
In [0]:
# Called once when the model is saved. This function produces a Tensorflow
# graph of operations that will be prepended to your model graph. When
# your model is deployed as a REST API, the API receives data in JSON format,
# parses it into Tensors, then sends the tensors to the input graph generated by
# this function. The graph can transform the data so it can be sent into your
# model input_fn. You can do anything you want here as long as you do it with
# tf.* functions that produce a graph of operations.
def serving_input_fn():
# placeholder for the data received by the API (already parsed, no JSON decoding necessary,
# but the JSON must contain one or multiple 'image' key(s) with 28x28 greyscale images as content.)
inputs = {"serving_input": tf.placeholder(tf.float32, [None, 28, 28])} # the shape of this dict should match the shape of your JSON
features = inputs['serving_input'] # no transformation needed
return tf.estimator.export.TensorServingInputReceiver(features, inputs) # features are the features needed by your model_fn
# Return a ServingInputReceiver if your features are a dictionary of Tensors, TensorServingInputReceiver if they are a straight Tensor
In [0]:
EPOCHS = 10
# TPU_REFACTORING: to use all 8 cores, increase the batch size by 8
GLOBAL_BATCH_SIZE = BATCH_SIZE * 8
# TPU_REFACTORING: TPUEstimator increments the step once per GLOBAL_BATCH_SIZE: must adjust epoch length accordingly
# steps_per_epoch = 60000 // BATCH_SIZE # 60,000 images in training dataset
steps_per_epoch = 60000 // GLOBAL_BATCH_SIZE # 60,000 images in training dataset
MODEL_EXPORT_NAME = "mnist" # name for exporting saved model
# TPU_REFACTORING: the TPU will run multiple steps of training before reporting back
TPU_ITERATIONS_PER_LOOP = steps_per_epoch # report back after each epoch
tf_logging.set_verbosity(tf_logging.INFO)
now = datetime.datetime.now()
MODEL_DIR = BUCKET+"/mnistjobs/job" + "-{}-{:02d}-{:02d}-{:02d}:{:02d}:{:02d}".format(now.year, now.month, now.day, now.hour, now.minute, now.second)
# TPU REFACTORING: the RunConfig has changed
#training_config = tf.estimator.RunConfig(model_dir=MODEL_DIR, save_summary_steps=10, save_checkpoints_steps=steps_per_epoch, log_step_count_steps=steps_per_epoch/4)
training_config = tf.contrib.tpu.RunConfig(
cluster=tpu,
model_dir=MODEL_DIR,
tpu_config=tf.contrib.tpu.TPUConfig(TPU_ITERATIONS_PER_LOOP))
# TPU_REFACTORING: exporters do not work yet. Must call export_savedmodel manually after training
#export_latest = tf.estimator.LatestExporter(MODEL_EXPORT_NAME, serving_input_receiver_fn=serving_input_fn)
# TPU_REFACTORING: Estimator => TPUEstimator
#estimator = tf.estimator.Estimator(model_fn=model_fn, config=training_config)
estimator = tf.contrib.tpu.TPUEstimator(
model_fn=model_fn,
model_dir=MODEL_DIR,
# TPU_REFACTORING: training and eval batch size must be the same for now
train_batch_size=GLOBAL_BATCH_SIZE,
eval_batch_size=10000, # 10000 digits in eval dataset
predict_batch_size=10000, # prediction on the entire eval dataset in the demo below
config=training_config,
use_tpu=USE_TPU,
# TPU REFACTORING: setting the kind of model export we want
export_to_tpu=False) # we want an exported model for CPU/GPU inference because that is what is supported on ML Engine
# TPU REFACTORING: train_and_evaluate does not work on TPU yet, TrainSpec not needed
# train_spec = tf.estimator.TrainSpec(training_input_fn, max_steps=EPOCHS*steps_per_epoch)
# TPU REFACTORING: train_and_evaluate does not work on TPU yet, EvalSpec not needed
# eval_spec = tf.estimator.EvalSpec(validation_input_fn, steps=1, exporters=export_latest, throttle_secs=0) # no eval throttling: evaluates after each checkpoint
# TPU REFACTORING: train_and_evaluate does not work on TPU yet, must train then eval manually
# tf.estimator.train_and_evaluate(estimator, train_spec, eval_spec)
estimator.train(training_input_fn, steps=steps_per_epoch*EPOCHS)
estimator.evaluate(input_fn=validation_input_fn, steps=1)
# TPU REFACTORING: exporters do not work yet. Must call export_savedmodel manually after training
estimator.export_savedmodel(os.path.join(MODEL_DIR, MODEL_EXPORT_NAME), serving_input_fn)
tf_logging.set_verbosity(tf_logging.WARN)
In [13]:
# recognize digits from local fonts
# TPU REFACTORING: TPUEstimator.predict requires a 'params' in ints input_fn so that it can pass params['batch_size']
#predictions = estimator.predict(lambda: tf.data.Dataset.from_tensor_slices(font_digits).batch(N),
predictions = estimator.predict(lambda params: tf.data.Dataset.from_tensor_slices(font_digits).batch(N),
yield_single_examples=False) # the returned value is a generator that will yield one batch of predictions per next() call
predicted_font_classes = next(predictions)['classes']
display_digits(font_digits, predicted_font_classes, font_labels, "predictions from local fonts (bad predictions in red)", N)
# recognize validation digits
predictions = estimator.predict(validation_input_fn,
yield_single_examples=False) # the returned value is a generator that will yield one batch of predictions per next() call
predicted_labels = next(predictions)['classes']
display_top_unrecognized(validation_digits, predicted_labels, validation_labels, N, 7)
Push your trained model to production on ML Engine for a serverless, autoscaled, REST API experience.
You will need a GCS bucket and a GCP project for this. Models deployed on ML Engine autoscale to zero if not used. There will be no ML Engine charges after you are done testing. Google Cloud Storage incurs charges. Empty the bucket after deployment if you want to avoid these. Once the model is deployed, the bucket is not useful anymore.
In [0]:
PROJECT = "" #@param {type:"string"}
NEW_MODEL = True #@param {type:"boolean"}
MODEL_NAME = "estimator_mnist_tpu" #@param {type:"string"}
MODEL_VERSION = "v0" #@param {type:"string"}
assert PROJECT, 'For this part, you need a GCP project. Head to http://console.cloud.google.com/ and create one.'
#TPU REFACTORING: TPUEstimator does not create the 'export' subfolder
#export_path = os.path.join(MODEL_DIR, 'export', MODEL_EXPORT_NAME)
export_path = os.path.join(MODEL_DIR, MODEL_EXPORT_NAME)
last_export = sorted(tf.gfile.ListDirectory(export_path))[-1]
export_path = os.path.join(export_path, last_export)
print('Saved model directory found: ', export_path)
This uses the command-line interface. You can do the same thing through the ML Engine UI at https://console.cloud.google.com/mlengine/models
In [0]:
# Create the model
if NEW_MODEL:
!gcloud ml-engine models create {MODEL_NAME} --project={PROJECT} --regions=us-central1
In [0]:
# Create a version of this model (you can add --async at the end of the line to make this call non blocking)
# Additional config flags are available: https://cloud.google.com/ml-engine/reference/rest/v1/projects.models.versions
# You can also deploy a model that is stored locally by providing a --staging-bucket=... parameter
!echo "Deployment takes a couple of minutes. You can watch your deployment here: https://console.cloud.google.com/mlengine/models/{MODEL_NAME}"
!gcloud ml-engine versions create {MODEL_VERSION} --model={MODEL_NAME} --origin={export_path} --project={PROJECT} --runtime-version=1.10
In [0]:
# prepare digits to send to online prediction endpoint
digits = np.concatenate((font_digits, validation_digits[:100-N]))
labels = np.concatenate((font_labels, validation_labels[:100-N]))
with open("digits.json", "w") as f:
for digit in digits:
# the format for ML Engine online predictions is: one JSON object per line
data = json.dumps({"serving_input": digit.tolist()}) # "serving_input" because that is what you defined in your serving_input_fn: {"serving_input": tf.placeholder(tf.float32, [None, 28, 28])}
f.write(data+'\n')
In [0]:
# Request online predictions from deployed model (REST API) using the "gcloud ml-engine" command line.
predictions = !gcloud ml-engine predict --model={MODEL_NAME} --json-instances digits.json --project={PROJECT} --version {MODEL_VERSION}
predictions = np.array([int(p.split('[')[0]) for p in predictions[1:]]) # first line is the name of the input layer: drop it, parse the rest
display_top_unrecognized(digits, predictions, labels, N, 100//N)
author: Martin Gorner
twitter: @martin_gorner
Copyright 2018 Google LLC
Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.
This is not an official Google product but sample code provided for an educational purpose