Demos!!

Train and Deploy Tensorflow AI Model (Simple Model, Immutable Deploy)

Train Tensorflow AI Model


In [ ]:
# make things wide
from IPython.core.display import display, HTML
display(HTML("<style>.container { width:100% !important; }</style>"))

from IPython.display import clear_output, Image, display, HTML

# numpy and tensorflow are used by the helper functions below
import numpy as np
import tensorflow as tf

def strip_consts(graph_def, max_const_size=32):
    """Strip large constant values from graph_def."""
    strip_def = tf.GraphDef()
    for n0 in graph_def.node:
        n = strip_def.node.add() 
        n.MergeFrom(n0)
        if n.op == 'Const':
            tensor = n.attr['value'].tensor
            size = len(tensor.tensor_content)
            if size > max_const_size:
                # tensor_content is bytes, so encode the placeholder string
                tensor.tensor_content = tf.compat.as_bytes("<stripped %d bytes>" % size)
    return strip_def

def show_graph(graph_def=None, width=1200, height=800, max_const_size=32, ungroup_gradients=False):
    """Visualize TensorFlow graph."""
    if not graph_def:
        graph_def = tf.get_default_graph().as_graph_def()

    if hasattr(graph_def, 'as_graph_def'):
        graph_def = graph_def.as_graph_def()
    strip_def = strip_consts(graph_def, max_const_size=max_const_size)
    data = str(strip_def)
    if ungroup_gradients:
        data = data.replace('"gradients/', '"b_')
        #print(data)
    code = """
        <script>
          function load() {{
            document.getElementById("{id}").pbtxt = {data};
          }}
        </script>
        <link rel="import" href="https://tensorboard.appspot.com/tf-graph-basic.build.html" onload=load()>
        <div style="height:600px">
          <tf-graph-basic id="{id}"></tf-graph-basic>
        </div>
    """.format(data=repr(data), id='graph'+str(np.random.rand()))

    iframe = """
        <iframe seamless style="width:{}px;height:{}px;border:0" srcdoc="{}"></iframe>
    """.format(width, height, code.replace('"', '&quot;'))
    display(HTML(iframe))

In [ ]:
import numpy as np
import os
import tensorflow as tf
from tensorflow.contrib.session_bundle import exporter
import time

In [ ]:
# If this errors out, increment the `export_version` variable, restart the Kernel, and re-run

flags = tf.app.flags
FLAGS = flags.FLAGS
flags.DEFINE_integer("batch_size", 10, "The batch size to train")
flags.DEFINE_integer("epoch_number", 10, "Number of epochs to run trainer")
flags.DEFINE_integer("steps_to_validate", 1,
                     "Steps to validate and print loss")
flags.DEFINE_string("checkpoint_dir", "./checkpoint/",
                    "indicates the checkpoint dirctory")
#flags.DEFINE_string("model_path", "./model/", "The export path of the model")
flags.DEFINE_string("model_path", "/root/pipeline/prediction.ml/tensorflow/models/tensorflow_minimal/export/", "The export path of the model")
flags.DEFINE_integer("export_version", 33, "The version number of the model")

In [ ]:
# If this errors out, increment the `export_version` variable, restart the Kernel, and re-run

def main():
  # Define training data
  x = np.ones(FLAGS.batch_size)
  y = np.ones(FLAGS.batch_size)

  # Define the model
  X = tf.placeholder(tf.float32, shape=[None], name="X")
  Y = tf.placeholder(tf.float32, shape=[None], name="yhat")
  w = tf.Variable(1.0, name="weight")
  b = tf.Variable(1.0, name="bias")
  loss = tf.square(Y - tf.mul(X, w) - b)
  train_op = tf.train.GradientDescentOptimizer(0.01).minimize(loss)
  predict_op  = tf.mul(X, w) + b

  saver = tf.train.Saver()
  checkpoint_dir = FLAGS.checkpoint_dir
  checkpoint_file = os.path.join(checkpoint_dir, "checkpoint.ckpt")
  if not os.path.exists(checkpoint_dir):
    os.makedirs(checkpoint_dir)
    
  # Start the session
  with tf.Session() as sess:
    sess.run(tf.initialize_all_variables())

    ckpt = tf.train.get_checkpoint_state(checkpoint_dir)
    if ckpt and ckpt.model_checkpoint_path:
      print("Continue training from the model {}".format(ckpt.model_checkpoint_path))
      saver.restore(sess, ckpt.model_checkpoint_path)

    saver_def = saver.as_saver_def()
    print(saver_def.filename_tensor_name)
    print(saver_def.restore_op_name)

    # Start training
    start_time = time.time()
    for epoch in range(FLAGS.epoch_number):
      sess.run(train_op, feed_dict={X: x, Y: y})

      # Start validating
      if epoch % FLAGS.steps_to_validate == 0:
        end_time = time.time()
        print("[{}] Epoch: {}".format(end_time - start_time, epoch))

        saver.save(sess, checkpoint_file)
        tf.train.write_graph(sess.graph_def, checkpoint_dir, 'trained_model.pb', as_text=False)
        tf.train.write_graph(sess.graph_def, checkpoint_dir, 'trained_model.txt', as_text=True)

        start_time = end_time

    # Print model variables
    w_value, b_value = sess.run([w, b])
    print("The model of w: {}, b: {}".format(w_value, b_value))

    # Export the model
    print("Exporting trained model to {}".format(FLAGS.model_path))
    model_exporter = exporter.Exporter(saver)
    model_exporter.init(
      sess.graph.as_graph_def(),
      named_graph_signatures={
        'inputs': exporter.generic_signature({"features": X}),
        'outputs': exporter.generic_signature({"prediction": predict_op})
      })
    model_exporter.export(FLAGS.model_path, tf.constant(FLAGS.export_version), sess)
    print('Done exporting!')

if __name__ == "__main__":
  main()

In [ ]:
show_graph()

Deploy New Tensorflow AI Model


In [ ]:
!ls -l /root/pipeline/prediction.ml/tensorflow/models/tensorflow_minimal/export

In [ ]:
!ls -l /root/pipeline/prediction.ml/tensorflow/models/tensorflow_minimal/export/00000033

TODO: Use Airflow Workflow to Deploy New Model
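
A minimal sketch of what such an Airflow deployment step could look like, assuming Airflow 1.x and that deploying simply means copying the newly exported version directory into the path watched by Tensorflow Serving; the DAG id, schedule, and paths below are hypothetical:

from datetime import datetime
from airflow import DAG
from airflow.operators.bash_operator import BashOperator

# Hypothetical DAG: copy the newly exported model version into the
# directory that Tensorflow Serving watches for new versions.
dag = DAG('deploy_tensorflow_minimal',
          start_date=datetime(2017, 1, 1),
          schedule_interval=None)  # trigger manually after training completes

deploy_model = BashOperator(
    task_id='copy_exported_model',
    bash_command=('cp -r /root/pipeline/prediction.ml/tensorflow/models/'
                  'tensorflow_minimal/export/* '
                  '/root/models/tensorflow_minimal/export/'),
    dag=dag)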

Optimizing Tensorflow Models for Serving

Python Scripts

optimize_graph_for_inference.py

Pete Warden's Blog

Graph Transform Tool
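
For illustration only (not part of the original demo): roughly how the trained graph could be passed through the optimize_for_inference tool before serving. The input/output node names are assumptions and should be verified with show_graph() first.

import tensorflow as tf
from tensorflow.python.tools import optimize_for_inference_lib

# Load the graph written out during training (see checkpoint_dir above).
graph_def = tf.GraphDef()
with open('./checkpoint/trained_model.pb', 'rb') as f:
    graph_def.ParseFromString(f.read())

# Strip training-only nodes; node names 'X' and 'add' are assumptions.
optimized_graph_def = optimize_for_inference_lib.optimize_for_inference(
    graph_def,
    input_node_names=['X'],
    output_node_names=['add'],
    placeholder_type_enum=tf.float32.as_datatype_enum)

tf.train.write_graph(optimized_graph_def, './checkpoint',
                     'optimized_model.pb', as_text=False)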

Compile (Tensorflow 1.0+)

XLA Compiler

Fuses multiple graph operations (e.g. input, operation, output) into a single compiled operation

Removes need for Tensorflow Runtime (20 MB is significant on tiny devices)

Allows new backends for hardware-specific optimizations (better portability)
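
A minimal sketch of enabling XLA JIT compilation for a session, assuming a Tensorflow 1.0+ build with XLA support:

import tensorflow as tf

# Enable XLA JIT so eligible ops are fused into compiled kernels.
config = tf.ConfigProto()
config.graph_options.optimizer_options.global_jit_level = tf.OptimizerOptions.ON_1

with tf.Session(config=config) as sess:
    # build and run the graph as usual; XLA-compatible subgraphs get compiled
    pass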

tfcompile

Converts a graph into standalone executable code (ahead-of-time compilation)

Compress/Distill Ensemble Models

Convert ensembles or other complex models into smaller models

Re-score training data with output of model being distilled

Train smaller model to produce same output

The smaller model trains on the larger model's soft outputs, which carry more information than the original hard labels
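
A minimal sketch of that distillation step, assuming Tensorflow 1.0+ and that the larger model's softened predictions over the training data have already been computed by re-scoring; all sizes, names, and the temperature below are hypothetical:

import numpy as np
import tensorflow as tf

num_features, num_classes = 10, 5   # hypothetical sizes
temperature = 2.0                   # hypothetical softmax temperature

features_ph = tf.placeholder(tf.float32, shape=[None, num_features])
teacher_probs_ph = tf.placeholder(tf.float32, shape=[None, num_classes])

# Small "student" model: a single linear layer.
W = tf.Variable(tf.zeros([num_features, num_classes]))
b = tf.Variable(tf.zeros([num_classes]))
student_logits = tf.matmul(features_ph, W) + b

# Train the student to match the teacher's soft outputs rather than hard labels.
loss = tf.reduce_mean(
    tf.nn.softmax_cross_entropy_with_logits(
        logits=student_logits / temperature,
        labels=teacher_probs_ph))
train_op = tf.train.GradientDescentOptimizer(0.01).minimize(loss)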

Optimizing Serving Runtime Environment

Throughput

Option 1: Add more Tensorflow Serving servers behind load balancer

Option 2: Enable request batching in each Tensorflow Serving

Trade-off for Option 2: higher latency (bad) in exchange for higher throughput (good)

$TENSORFLOW_SERVING_HOME/bazel-bin/tensorflow_serving/model_servers/tensorflow_model_server \
  --port=9000 \
  --model_name=tensorflow_minimal \
  --model_base_path=/root/models/tensorflow_minimal/export \
  --enable_batching=true \
  --max_batch_size=1000000 \
  --batch_timeout_micros=10000 \
  --max_enqueued_batches=1000000

Latency

The deeper the model, the longer the latency

Start inferences in parallel where possible (e.g. run user inference in parallel with item inference; see the sketch after this list)

Pre-load common inputs from a database (e.g. user attributes, item attributes)

Pre-compute or partially compute common inputs (e.g. popular word embeddings)
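
A minimal sketch of running the user and item inferences concurrently; predict_user and predict_item are hypothetical helpers that each make a blocking request to a serving endpoint:

from concurrent.futures import ThreadPoolExecutor

def predict_user(user_id):
    # Hypothetical helper: blocking Predict request for the user-side model.
    ...

def predict_item(item_id):
    # Hypothetical helper: blocking Predict request for the item-side model.
    ...

user_id, item_id = 123, 456  # hypothetical ids

# Issue both inferences in parallel instead of sequentially.
with ThreadPoolExecutor(max_workers=2) as pool:
    user_future = pool.submit(predict_user, user_id)
    item_future = pool.submit(predict_item, item_id)
    user_result = user_future.result()
    item_result = item_future.result()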

Memory

Word embeddings are huge!

Use a hashed ID for each word instead of a full vocabulary lookup (see the sketch below)

Off-load embedding matrices to a parameter server and share them across serving servers
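
A minimal sketch of the hashing idea, assuming Tensorflow 1.0+; the bucket count and embedding size are hypothetical:

import tensorflow as tf

num_hash_buckets = 100000   # hypothetical: far smaller than a full vocabulary
embedding_dim = 64          # hypothetical embedding size

words = tf.placeholder(tf.string, shape=[None])

# Map each word to a bucket id with a hash instead of a vocabulary table.
word_ids = tf.string_to_hash_bucket_fast(words, num_hash_buckets)

# The embedding matrix can live on a parameter server and be shared
# across serving servers (e.g. via tf.device / tf.train.replica_device_setter).
embeddings = tf.get_variable("word_embeddings", [num_hash_buckets, embedding_dim])
word_vectors = tf.nn.embedding_lookup(embeddings, word_ids)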