Copyright 2018 The TensorFlow Authors. All Rights Reserved.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

 http://www.apache.org/licenses/LICENSE-2.0  

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.


In [ ]:
# Get the dependency .py files, if any.
! git clone https://github.com/GoogleCloudPlatform/cloudml-samples.git
! cp cloudml-samples/census/tensorflowcore/trainer/* .

In [ ]:
"""A Feed forward neural network using TensorFlow Core APIs.

It implements a binary classifier for Census Income Dataset using both single
and distributed node cluster.
"""

In [ ]:
import argparse
import json
import os
import threading
import six
import tensorflow as tf
from tensorflow.python.ops import control_flow_ops
from tensorflow.python.ops import lookup_ops
from tensorflow.python.ops import variables
from tensorflow.python.saved_model import signature_constants as sig_constants
import model

In [ ]:
class EvaluationRunHook(tf.train.SessionRunHook):
  """EvaluationRunHook performs continuous evaluation of the model.

  Args:
    checkpoint_dir (string): Dir to store model checkpoints
    metric_dict (dict): Metric ops to evaluate, mapping names to
      (value_op, update_op) pairs (e.g. accuracy and AUROC)
    graph (tf.Graph): Evaluation graph
    eval_frequency (int): Run one evaluation every n checkpoints
    eval_steps (int): Number of evaluation steps to perform
  """

  def __init__(self,
               checkpoint_dir,
               metric_dict,
               graph,
               eval_frequency,
               eval_steps=None,
               **kwargs):

    self._eval_steps = eval_steps
    self._checkpoint_dir = checkpoint_dir
    self._kwargs = kwargs
    self._eval_every = eval_frequency
    self._latest_checkpoint = None
    self._checkpoints_since_eval = 0
    self._graph = graph

    # With the graph object as default graph.
    # See https://www.tensorflow.org/api_docs/python/tf/Graph#as_default
    # Adds ops to the graph object
    with graph.as_default():
      value_dict, update_dict = tf.contrib.metrics.aggregate_metric_map(
          metric_dict)

      # Op that creates a Summary protocol buffer by merging summaries
      self._summary_op = tf.summary.merge([
          tf.summary.scalar(name, value_op)
          for name, value_op in six.iteritems(value_dict)
      ])

      # The Saver class adds ops to save and restore
      # variables to and from checkpoints.
      self._saver = tf.train.Saver()

      # Creates (or retrieves) the global step tensor, a counter
      # for the global training step.
      self._gs = tf.train.get_or_create_global_step()

      self._final_ops_dict = value_dict
      self._eval_ops = update_dict.values()

    # MonitoredTrainingSession runs hooks in background threads
    # and it doesn't wait for the thread from the last session.run()
    # call to terminate to invoke the next hook, hence locks.
    self._eval_lock = threading.Lock()
    self._checkpoint_lock = threading.Lock()
    self._file_writer = tf.summary.FileWriter(
        os.path.join(checkpoint_dir, 'eval'), graph=graph)

  def after_run(self, run_context, run_values):
    # Always check for new checkpoints in case a single evaluation
    # takes longer than checkpoint frequency and _eval_every is >1
    self._update_latest_checkpoint()

    if self._eval_lock.acquire(False):
      try:
        if self._checkpoints_since_eval > self._eval_every:
          self._checkpoints_since_eval = 0
          self._run_eval()
      finally:
        self._eval_lock.release()

  def _update_latest_checkpoint(self):
    """Update the latest checkpoint file created in the output dir."""
    if self._checkpoint_lock.acquire(False):
      try:
        latest = tf.train.latest_checkpoint(self._checkpoint_dir)
        if latest != self._latest_checkpoint:
          self._checkpoints_since_eval += 1
          self._latest_checkpoint = latest
      finally:
        self._checkpoint_lock.release()

  def end(self, session):
    """Called at then end of session to make sure we always evaluate."""
    self._update_latest_checkpoint()

    with self._eval_lock:
      self._run_eval()

  def _run_eval(self):
    """Run model evaluation and generate summaries."""
    coord = tf.train.Coordinator(clean_stop_exception_types=(
        tf.errors.CancelledError, tf.errors.OutOfRangeError))

    with tf.Session(graph=self._graph) as session:
      # Restores previously saved variables from latest checkpoint
      self._saver.restore(session, self._latest_checkpoint)

      session.run([
          tf.tables_initializer(),
          tf.local_variables_initializer()])
      tf.train.start_queue_runners(coord=coord, sess=session)
      train_step = session.run(self._gs)

      tf.logging.info('Starting Evaluation For Step: {}'.format(train_step))
      with coord.stop_on_exception():
        eval_step = 0
        while not coord.should_stop() and (self._eval_steps is None or
                                           eval_step < self._eval_steps):
          summaries, final_values, _ = session.run(
              [self._summary_op, self._final_ops_dict, self._eval_ops])
          if eval_step % 100 == 0:
            tf.logging.info('On Evaluation Step: {}'.format(eval_step))
          eval_step += 1

      # Write the summaries
      self._file_writer.add_summary(summaries, global_step=train_step)
      self._file_writer.flush()
      tf.logging.info(final_values)
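
A minimal sketch of the `metric_dict` structure this hook expects: metric names mapped to the `(value_op, update_op)` pairs returned by the `tf.metrics` functions, which is what `tf.contrib.metrics.aggregate_metric_map` consumes. The constant tensors below are stand-ins for the outputs of `model.model_fn` in EVAL mode, used here only for illustration.

In [ ]:
# Illustration only: the tensors here are stand-ins for the evaluation graph.
with tf.Graph().as_default():
  labels = tf.constant([0, 1, 1, 0], dtype=tf.int64)
  scores = tf.constant([0.1, 0.8, 0.6, 0.4])
  predictions = tf.cast(scores > 0.5, tf.int64)

  # Each entry is a (value_op, update_op) tuple.
  example_metric_dict = {
      'accuracy': tf.metrics.accuracy(labels, predictions),
      'auroc': tf.metrics.auc(labels, scores),
  }
  value_dict, update_dict = tf.contrib.metrics.aggregate_metric_map(
      example_metric_dict)

  with tf.Session() as sess:
    # Metric variables are local variables, so initialize them first.
    sess.run(tf.local_variables_initializer())
    sess.run(list(update_dict.values()))
    print(sess.run(value_dict))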

In [ ]:
def run(target, cluster_spec, is_chief, args):
  """Runs the training and evaluation graph.

  Args:
    target (str): TensorFlow server target.
    cluster_spec (tf.train.ClusterSpec): Cluster specification.
    is_chief (bool): Boolean flag to specify a chief server.
    args (args): Input arguments.
  """

  # Calculate the number of hidden units
  hidden_units = [
      max(2, int(args.first_layer_size * args.scale_factor**i))
      for i in range(args.num_layers)
  ]

  # If the server is the chief, which is the `master` task.
  # In between-graph replication, the chief is the one node in
  # the cluster with extra responsibilities and by default
  # is worker task zero. We have assigned the master as the chief.
  #
  # See https://youtu.be/la_M6bCV91M?t=1203 for details on
  # distributed TensorFlow and motivation about chief.
  if is_chief:
    tf.logging.info('Created DNN hidden units {}'.format(hidden_units))
    evaluation_graph = tf.Graph()
    with evaluation_graph.as_default():

      # Features and label tensors
      features, labels = model.input_fn(
          args.eval_files,
          num_epochs=None if args.eval_steps else 1,
          batch_size=args.eval_batch_size,
          shuffle=False
      )
      # Accuracy and AUROC metrics;
      # model.model_fn returns a dict of metric ops in EVAL mode.
      metric_dict = model.model_fn(
          model.EVAL,
          features.copy(),
          labels,
          hidden_units=hidden_units,
          learning_rate=args.learning_rate
      )

    hooks = [EvaluationRunHook(
        args.job_dir,
        metric_dict,
        evaluation_graph,
        args.eval_frequency,
        eval_steps=args.eval_steps,
    )]
  else:
    hooks = []

  # Create a new graph and specify that as default.
  with tf.Graph().as_default():
    # Placement of ops on devices using replica device setter
    # which automatically places the parameters on the `ps` server
    # and the `ops` on the workers.
    #
    # See:
    # https://www.tensorflow.org/api_docs/python/tf/train/replica_device_setter
    with tf.device(tf.train.replica_device_setter(cluster=cluster_spec)):

      # Features and label tensors as read using filename queue.
      features, labels = model.input_fn(
          args.train_files,
          num_epochs=args.num_epochs,
          batch_size=args.train_batch_size
      )

      # Returns the training graph and global step tensor.
      train_op, global_step_tensor = model.model_fn(
          model.TRAIN,
          features.copy(),
          labels,
          hidden_units=hidden_units,
          learning_rate=args.learning_rate
      )

    # Creates a MonitoredSession for training.
    # MonitoredSession is a Session-like object that handles
    # initialization, recovery and hooks
    # https://www.tensorflow.org/api_docs/python/tf/train/MonitoredTrainingSession
    with tf.train.MonitoredTrainingSession(master=target,
                                           is_chief=is_chief,
                                           checkpoint_dir=args.job_dir,
                                           hooks=hooks,
                                           save_checkpoint_secs=20,
                                           save_summaries_steps=50) as session:
      # Global step to keep track of the global number of steps,
      # particularly in a distributed setting.
      step = global_step_tensor.eval(session=session)

      # Run the training graph, which returns the step number as tracked by
      # the global step tensor.
      # When the number of training epochs is reached, session.should_stop()
      # will be true.
      while (args.train_steps is None or
             step < args.train_steps) and not session.should_stop():
        step, _ = session.run([global_step_tensor, train_op])

    # Find the filename of the latest saved checkpoint file
    latest_checkpoint = tf.train.latest_checkpoint(args.job_dir)

    # Only perform this if chief
    if is_chief:
      build_and_run_exports(latest_checkpoint,
                            args.job_dir,
                            model.SERVING_INPUT_FUNCTIONS[args.export_format],
                            hidden_units)

In [ ]:
def main_op():
  """Groups local variable and table initializers, used as legacy_init_op."""
  init_local = variables.local_variables_initializer()
  init_tables = lookup_ops.tables_initializer()
  return control_flow_ops.group(init_local, init_tables)

In [ ]:
def build_and_run_exports(latest, job_dir, serving_input_fn, hidden_units):
  """Given the latest checkpoint file export the saved model.

  Args:
    latest (str): Latest checkpoint file.
    job_dir (str): Location of checkpoints and model files.
    serving_input_fn (function): Serving input function.
    hidden_units (list): Sizes of the hidden layers.
  """

  prediction_graph = tf.Graph()
  # Create exporter.
  exporter = tf.saved_model.builder.SavedModelBuilder(
      os.path.join(job_dir, 'export'))
  with prediction_graph.as_default():
    features, inputs_dict = serving_input_fn()
    prediction_dict = model.model_fn(
        model.PREDICT,
        features.copy(),
        None,  # labels
        hidden_units=hidden_units,
        learning_rate=None  # learning_rate unused in prediction mode
    )
    saver = tf.train.Saver()

    inputs_info = {
        name: tf.saved_model.utils.build_tensor_info(tensor)
        for name, tensor in six.iteritems(inputs_dict)
    }
    output_info = {
        name: tf.saved_model.utils.build_tensor_info(tensor)
        for name, tensor in six.iteritems(prediction_dict)
    }
    signature_def = tf.saved_model.signature_def_utils.build_signature_def(
        inputs=inputs_info,
        outputs=output_info,
        method_name=sig_constants.PREDICT_METHOD_NAME
    )

  with tf.Session(graph=prediction_graph) as session:
    session.run([tf.local_variables_initializer(), tf.tables_initializer()])
    saver.restore(session, latest)
    exporter.add_meta_graph_and_variables(
        session,
        tags=[tf.saved_model.tag_constants.SERVING],
        signature_def_map={
            sig_constants.DEFAULT_SERVING_SIGNATURE_DEF_KEY: signature_def
        },
        legacy_init_op=main_op()
    )
  exporter.save()
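
Once `build_and_run_exports` has run, the SavedModel lives under `<job_dir>/export` and can be reloaded outside this trainer. A minimal sketch, assuming the default `--job-dir` of `/tmp/census-tensorflowcore` and that training and export have already completed:

In [ ]:
# Illustration only: reload the exported SavedModel and list its serving
# signature. Assumes the default job dir and a finished training run.
export_dir = os.path.join('/tmp/census-tensorflowcore', 'export')

with tf.Graph().as_default(), tf.Session() as sess:
  meta_graph = tf.saved_model.loader.load(
      sess, [tf.saved_model.tag_constants.SERVING], export_dir)
  signature = meta_graph.signature_def[
      sig_constants.DEFAULT_SERVING_SIGNATURE_DEF_KEY]
  print('Inputs:', list(signature.inputs.keys()))
  print('Outputs:', list(signature.outputs.keys()))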

In [ ]:
def train_and_evaluate(args):
  """Parse TF_CONFIG to cluster_spec and call run() method.

  TF_CONFIG environment variable is available when running using
  gcloud either locally or on cloud. It has all the information required
  to create a ClusterSpec which is important for running distributed code.

  Args:
    args (args): Input arguments.
  """

  tf_config = os.environ.get('TF_CONFIG')
  # If TF_CONFIG is not available, run locally.
  if not tf_config:
    return run(target='', cluster_spec=None, is_chief=True, args=args)

  tf_config_json = json.loads(tf_config)
  cluster = tf_config_json.get('cluster')
  job_name = tf_config_json.get('task', {}).get('type')
  task_index = tf_config_json.get('task', {}).get('index')

  # If cluster information is empty, run locally.
  if job_name is None or task_index is None:
    return run(target='', cluster_spec=None, is_chief=True, args=args)

  cluster_spec = tf.train.ClusterSpec(cluster)
  server = tf.train.Server(cluster_spec,
                           job_name=job_name,
                           task_index=task_index)

  # Wait for incoming connections forever.
  # Worker ships the graph to the ps server.
  # The ps server manages the parameters of the model.
  #
  # See a detailed video on distributed TensorFlow
  # https://www.youtube.com/watch?v=la_M6bCV91M
  if job_name == 'ps':
    server.join()
    return
  elif job_name in ['master', 'worker']:
    return run(server.target, cluster_spec, is_chief=(job_name == 'master'),
               args=args)
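
To make the parsing above concrete, below is a hypothetical TF_CONFIG value for a small cluster; the hostnames and ports are made up for illustration. With this value, `train_and_evaluate` would start a worker server and call `run()` with `is_chief=False`.

In [ ]:
# Illustration only: a made-up TF_CONFIG for a small cluster.
# train_and_evaluate() parses this structure to build a tf.train.ClusterSpec
# and a tf.train.Server for the local task.
example_tf_config = {
    'cluster': {
        'master': ['master-0:2222'],
        'worker': ['worker-0:2222', 'worker-1:2222'],
        'ps': ['ps-0:2222'],
    },
    'task': {'type': 'worker', 'index': 0},
}
print(json.dumps(example_tf_config, indent=2))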

In [ ]:
parser = argparse.ArgumentParser()
# Input Arguments
parser.add_argument(
    '--train-files',
    nargs='+',
    help='Training files, local or on GCS',
    default='gs://cloud-samples-data/ml-engine/census/data/adult.data.csv')
parser.add_argument(
    '--eval-files',
    nargs='+',
    help='Evaluation files, local or on GCS',
    default='gs://cloud-samples-data/ml-engine/census/data/adult.test.csv')
parser.add_argument(
    '--job-dir',
    type=str,
    help="""GCS or local dir for checkpoints, exports, and summaries.
      Use an existing directory to load a trained model, or a new directory
      to retrain""",
    default='/tmp/census-tensorflowcore')
parser.add_argument(
    '--train-steps',
    type=int,
    help='Maximum number of training steps to perform.')
parser.add_argument(
    '--eval-steps',
    help="""Number of steps to run evalution for at each checkpoint.
    If unspecified, will run for 1 full epoch over training data""",
    default=None,
    type=int)
parser.add_argument(
    '--train-batch-size',
    type=int,
    default=40,
    help='Batch size for training steps')
parser.add_argument(
    '--eval-batch-size',
    type=int,
    default=40,
    help='Batch size for evaluation steps')
parser.add_argument(
    '--learning-rate',
    type=float,
    default=0.003,
    help='Learning rate for SGD')
parser.add_argument(
    '--eval-frequency',
    type=int,
    default=50,
    help='Perform one evaluation per n checkpoints')
parser.add_argument(
    '--first-layer-size',
    type=int,
    default=256,
    help='Number of nodes in the first layer of DNN')
parser.add_argument(
    '--num-layers',
    type=int,
    default=2,
    help='Number of layers in DNN')
parser.add_argument(
    '--scale-factor',
    type=float,
    default=0.25,
    help="""Rate of decay size of layer for Deep Neural Net.
    max(2, int(first_layer_size * scale_factor**i)) """)
parser.add_argument(
    '--num-epochs',
    type=int,
    help='Maximum number of epochs on which to train')
parser.add_argument(
    '--export-format',
    help='The input format of the exported SavedModel binary',
    choices=['JSON', 'CSV', 'EXAMPLE'],
    default='JSON')
parser.add_argument(
    '--verbosity',
    choices=['DEBUG', 'ERROR', 'FATAL', 'INFO', 'WARN'],
    default='INFO',
    help='Set logging verbosity')

args, _ = parser.parse_known_args()

# Set Python-level logging verbosity.
tf.logging.set_verbosity(args.verbosity)
# Set C++ graph-execution-level verbosity (integer division keeps the value
# an integer string under Python 3).
os.environ['TF_CPP_MIN_LOG_LEVEL'] = str(
    tf.logging.__dict__[args.verbosity] // 10)

# Run the training job.
train_and_evaluate(args)
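
To experiment with different hyperparameters inside the notebook, the defaults can be overridden by passing an explicit argument list to `parse_known_args`; the flag values below are arbitrary examples.

In [ ]:
# Illustration only: override a few flags and (optionally) rerun training.
# Use a fresh --job-dir to train from scratch rather than resuming from the
# checkpoints written by the run above.
custom_args, _ = parser.parse_known_args([
    '--train-steps', '1000',
    '--learning-rate', '0.001',
    '--job-dir', '/tmp/census-tensorflowcore-run2',
])
# Uncomment to launch another training run with these settings:
# train_and_evaluate(custom_args)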