In [0]:
#@title Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

Graph-based Neural Structured Learning in TFX

This tutorial describes graph regularization by synthesizing a graph from input data and demonstrates an end-to-end workflow for sentiment classification in a TFX pipeline.

Overview

This notebook classifies movie reviews as positive or negative using the text of the review. This is an example of binary classification, an important and widely applicable kind of machine learning problem.

We will demonstrate the use of graph regularization in this notebook by building a graph from the given input. The general recipe for building a graph-regularized model using the Neural Structured Learning (NSL) framework when the input does not contain an explicit graph is as follows:

  1. Create embeddings for each text sample in the input. This can be done using pre-trained models such as word2vec, Swivel, BERT etc.
  2. Build a graph based on these embeddings by using a similarity metric such as the 'L2' distance, 'cosine' distance, etc. Nodes in the graph correspond to samples and edges in the graph correspond to similarity between pairs of samples.
  3. Generate training data from the above synthesized graph and sample features. The resulting training data will contain neighbor features in addition to the original node features.
  4. Create a neural network as a base model using Estimators.
  5. Wrap the base model with the add_graph_regularization wrapper function, which is provided by the NSL framework, to create a new graph Estimator model. This new model will include a graph regularization loss as the regularization term in its training objective.
  6. Train and evaluate the graph Estimator model.

In this tutorial, we integrate the above workflow in a TFX pipeline using several custom TFX components as well as a custom graph-regularized trainer component.
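
To make step 5 of the recipe more concrete, here is a minimal pure-Python sketch of a graph-regularized objective: the supervised loss plus a multiplier times the weighted distance between a sample's representation and the representations of its neighbors. The function names, the squared L2 distance, the averaging over neighbors, and the toy numbers are all assumptions made for illustration; the NSL wrapper computes its regularization term inside the model, not in plain Python like this.

```python
def squared_l2(u, v):
    """Squared Euclidean distance between two equal-length vectors."""
    return sum((a - b) ** 2 for a, b in zip(u, v))


def graph_regularized_loss(supervised_loss, sample_repr, neighbors, multiplier):
    """Supervised loss plus a weighted neighbor-distance penalty.

    neighbors is a list of (neighbor_repr, edge_weight) pairs.
    """
    if not neighbors:
        return supervised_loss
    penalty = sum(w * squared_l2(sample_repr, n) for n, w in neighbors)
    # Averaging over neighbors is a choice made for this sketch.
    return supervised_loss + multiplier * penalty / len(neighbors)


loss = graph_regularized_loss(
    supervised_loss=0.5,
    sample_repr=[1.0, 0.0],
    neighbors=[([1.0, 1.0], 1.0), ([0.0, 0.0], 0.5)],
    multiplier=0.1,
)
print(loss)
```

A larger multiplier pulls the representations of connected samples closer together, at the cost of fitting the labels less tightly.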

Install Required Packages

  1. Select TensorFlow 2.x and create an interactive development environment with eager execution.
  2. Install the Neural Structured Learning package.
  3. Install tensorflow-hub.

In [0]:
!pip install -q -U \
  tensorflow-gpu==2.0.0 \
  tfx==0.15.0rc0 \
  neural-structured-learning \
  tensorflow-hub \
  tensorflow-datasets

Dependencies and imports


In [0]:
import gzip as gzip_lib
import numpy as np
import os
import pprint
import shutil
import tempfile
import urllib
pp = pprint.PrettyPrinter()

import tensorflow as tf

import tfx
from tfx.components.base import base_component
from tfx.components.base import base_executor
from tfx.components.base import executor_spec
from tfx.components.evaluator.component import Evaluator
from tfx.components.example_gen.import_example_gen.component import ImportExampleGen
from tfx.components.example_validator.component import ExampleValidator
from tfx.components.model_validator.component import ModelValidator
from tfx.components.pusher.component import Pusher
from tfx.components.schema_gen.component import SchemaGen
from tfx.components.statistics_gen.component import StatisticsGen
from tfx.components.trainer.component import Trainer
from tfx.components.transform.component import Transform
from tfx.orchestration.experimental.interactive.interactive_context import InteractiveContext
from tfx.proto import evaluator_pb2
from tfx.proto import example_gen_pb2
from tfx.proto import pusher_pb2
from tfx.proto import trainer_pb2
from tfx.utils.dsl_utils import external_input

from tfx.types import artifact
from tfx.types import artifact_utils
from tfx.types import standard_artifacts
from tfx.types.component_spec import ChannelParameter
from tfx.types.component_spec import ExecutionParameter

from tensorflow_metadata.proto.v0 import anomalies_pb2
from tensorflow_metadata.proto.v0 import schema_pb2
from tensorflow_metadata.proto.v0 import statistics_pb2

import tensorflow_transform as tft
import tensorflow_model_analysis as tfma
import tensorflow_data_validation as tfdv
import neural_structured_learning as nsl
import tensorflow_hub as hub
import tensorflow_datasets as tfds

print("TF Version: ", tf.__version__)
print("Eager mode: ", tf.executing_eagerly())
print("GPU is", "available" if tf.test.is_gpu_available() else "NOT AVAILABLE")
print("NSL Version: ", nsl.__version__)
print("TFX Version: ", tfx.__version__)
print("Hub version: ", hub.__version__)

IMDB dataset

The IMDB dataset contains the text of 50,000 movie reviews from the Internet Movie Database. These are split into 25,000 reviews for training and 25,000 reviews for testing. The training and testing sets are balanced, meaning they contain an equal number of positive and negative reviews. Moreover, there are 50,000 additional unlabeled movie reviews.

Download preprocessed IMDB dataset

The following code downloads the IMDB dataset (or uses a cached copy if it has already been downloaded) using TFDS. To speed up this notebook we will use only 10,000 labeled reviews and 10,000 unlabeled reviews for training, and 10,000 test reviews for evaluation.


In [0]:
train_set, eval_set = tfds.load(
    "imdb_reviews:1.0.0",
    split=["train[:10000]+unsupervised[:10000]", "test[:10000]"],
    shuffle_files=False)

Let's look at a few reviews from the training set:


In [0]:
for tfrecord in train_set.take(5):
  print("Review:", tfrecord["text"].numpy().decode("utf-8")[:300])
  print("Label:", tfrecord["label"].numpy())
  print()

In [0]:
def _dict_to_example(instance):
  """Converts a dict of NumPy arrays to a tf.train.Example."""
  feature = {}
  for key, value in instance.items():
    if value is None:
      feature[key] = tf.train.Feature()
    elif np.issubdtype(value.dtype, np.integer):
      feature[key] = tf.train.Feature(
          int64_list=tf.train.Int64List(value=value.tolist()))
    elif value.dtype == np.float32:
      feature[key] = tf.train.Feature(
          float_list=tf.train.FloatList(value=value.tolist()))
    else:
      feature[key] = tf.train.Feature(
          bytes_list=tf.train.BytesList(value=value.tolist()))
  return tf.train.Example(features=tf.train.Features(feature=feature))

examples_path = tempfile.mkdtemp(prefix="tfx-data")
train_path = os.path.join(examples_path, "train.tfrecord")
eval_path = os.path.join(examples_path, "eval.tfrecord")

for path, dataset in [(train_path, train_set), (eval_path, eval_set)]:
  with tf.io.TFRecordWriter(path) as writer:
    for example in dataset:
      writer.write(_dict_to_example({
          "label": np.array([example["label"].numpy()]),
          "text": np.array([example["text"].numpy()]),
      }).SerializeToString())

Run TFX Components Interactively


In the cells that follow you will construct TFX components and run each one interactively within the InteractiveContext to obtain ExecutionResult objects. This mirrors the process of an orchestrator running components in a TFX DAG based on when the dependencies for each component are met.
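
As a rough illustration of what "running components when their dependencies are met" means, the following pure-Python sketch orders the components used in this notebook so that each one runs only after its inputs exist. The dependency map is read off the cells in this tutorial; the `run_order` helper is hypothetical and is not part of TFX.

```python
# Dependency map: component name -> names of the components whose outputs it consumes.
dependencies = {
    "ExampleGen": [],
    "IdentifyExamples": ["ExampleGen"],
    "StatisticsGen": ["IdentifyExamples"],
    "SchemaGen": ["StatisticsGen"],
    "ExampleValidator": ["StatisticsGen", "SchemaGen"],
    "SynthesizeGraph": ["IdentifyExamples"],
    "Transform": ["IdentifyExamples", "SchemaGen"],
    "GraphAugmentation": ["Transform", "SynthesizeGraph"],
}


def run_order(deps):
    """Return an order in which every component runs after its dependencies."""
    done, order = set(), []
    while len(order) < len(deps):
        # A component is ready once all of its dependencies have finished.
        ready = [name for name in deps
                 if name not in done and all(d in done for d in deps[name])]
        if not ready:
            raise ValueError("cycle or missing dependency")
        for name in ready:
            done.add(name)
            order.append(name)
    return order


order = run_order(dependencies)
print(order)
```

In the interactive notebook, you play the role of the orchestrator by executing the cells in such an order yourself.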


In [0]:
context = InteractiveContext()

The ExampleGen Component

In any ML development process, the first step is to ingest the training and test datasets. The ExampleGen component brings data into the TFX pipeline.

Create an ExampleGen component and run it.


In [0]:
input_data = external_input(examples_path)

input_config = example_gen_pb2.Input(splits=[
    example_gen_pb2.Input.Split(name='train', pattern='train.tfrecord'),
    example_gen_pb2.Input.Split(name='eval', pattern='eval.tfrecord')])

example_gen = ImportExampleGen(input=input_data, input_config=input_config)

context.run(example_gen)

The component's outputs include 2 artifacts:

  • the training examples (10,000 labeled reviews + 10,000 unlabeled reviews)
  • the eval examples (10,000 labeled reviews)

The IdentifyExamples Custom Component

To use NSL, we will need each instance to have a unique ID. We create a custom component that adds such a unique ID to all instances across all splits.
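
The core idea can be sketched without TFX: keep a single counter running across all splits so that no two instances share an ID. The `add_ids` helper and the toy data are hypothetical, and visiting the splits in sorted order is an assumption of this sketch.

```python
def add_ids(splits):
    """Attach a unique string id to every example, counting across all splits."""
    next_id = 0
    identified = {}
    for split_name in sorted(splits):
        identified[split_name] = []
        for example in splits[split_name]:
            # Copy the example so the input data is left untouched.
            identified[split_name].append({**example, "id": str(next_id)})
            next_id += 1
    return identified


toy_splits = {
    "train": [{"text": "great movie"}, {"text": "dull plot"}],
    "eval": [{"text": "loved it"}],
}
identified = add_ids(toy_splits)
print(identified)
```

Because the counter is never reset between splits, an ID found in the graph later identifies exactly one instance.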


In [0]:
"""Custom component that adds an ID to each instance.
This component along with other custom component related code will only serve as
an example and will not be supported by TFX team.
"""
class IdentifyExamplesSpec(tfx.types.ComponentSpec):
  """ComponentSpec for the IdentifyExamples custom component."""

  PARAMETERS = {}
  INPUTS = {
      'examples': ChannelParameter(type=standard_artifacts.Examples),
  }
  OUTPUTS = {
      'identified_examples': ChannelParameter(type=standard_artifacts.Examples)
  }


class IdentifyExamplesExecutor(base_executor.BaseExecutor):
  """Executor for IdentifyExamples component."""

  def Do(self, input_dict, output_dict, exec_properties):
      """Add a unique identifier to each example.
      Args:
        input_dict: Input dict from input key to a list of artifacts, including:
          - examples: input examples.
        output_dict: Output dict from key to a list of artifacts, including:
          - identified_examples: output examples with an id field.
        exec_properties: A dict of execution properties (unused).
      Returns:
        None
      """
      self._log_startup(input_dict, output_dict, exec_properties)

      next_id = 0
      splits = [artifact.split for artifact in input_dict['examples']]
      for split in splits:
        input_uri = artifact_utils.get_split_uri(
            input_dict['examples'], split)
        output_uri = artifact_utils.get_split_uri(
            output_dict['identified_examples'], split)
        decoder = tfdv.TFExampleDecoder()
        for tfrecord_name in sorted(os.listdir(input_uri)):
          input_path = os.path.join(input_uri, tfrecord_name)
          output_path = os.path.join(output_uri, tfrecord_name)
          with tf.io.TFRecordWriter(output_path,
                                    options="GZIP") as writer:
            for tfrecord in tf.data.TFRecordDataset(input_path,
                                                    compression_type="GZIP"):
                example = decoder.decode(tfrecord.numpy())
                
                example["id"] = np.array([str(next_id).encode("utf-8")])
                writer.write(_dict_to_example(example).SerializeToString())
                next_id += 1

        
class IdentifyExamples(base_component.BaseComponent):
  """Custom component that adds an `id` feature to each example.
  This custom component will add a unique `id` feature to each instance in all
  splits (the ids are unique across all splits).
  """
  SPEC_CLASS = IdentifyExamplesSpec
  EXECUTOR_SPEC = executor_spec.ExecutorClassSpec(IdentifyExamplesExecutor)

  def __init__(self,
               examples,
               identified_examples=None,
               instance_name=None):
    """Construct an IdentifyExamples component.
    Args:
      examples: A Channel of 'Examples' type.
      identified_examples: A Channel of type 'Examples', with ids.
      instance_name: Optional unique instance name. Necessary if multiple
        components of this class are declared in the same pipeline.
    """
    splits = [artifact.split for artifact in examples.get()]
    identified_examples = identified_examples or tfx.types.Channel(
        type=standard_artifacts.Examples,
        artifacts=[
            standard_artifacts.Examples(split=split)
            for split in splits
        ])
    spec = IdentifyExamplesSpec(
        examples=examples,
        identified_examples=identified_examples)
    super().__init__(spec=spec, instance_name=instance_name)

In [0]:
identify_examples = IdentifyExamples(example_gen.outputs["examples"])
context.run(identify_examples)

The StatisticsGen Component

The StatisticsGen component computes descriptive statistics for your dataset. The statistics that it generates can be visualized for review, and are used for example validation and to infer a schema.

Create a StatisticsGen component and run it.


In [0]:
# Computes statistics over data for visualization and example validation.
statistics_gen = StatisticsGen(
    examples=identify_examples.outputs["identified_examples"]
)
context.run(statistics_gen)

The SchemaGen Component

The SchemaGen component generates a schema for your data based on the statistics from StatisticsGen. It tries to infer the data types of each of your features, and the ranges of legal values for categorical features.
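
To give a feel for what schema inference involves, here is a toy pure-Python sketch that records a type per feature and, for string features with few distinct values, a domain of legal values. The `infer_schema` helper, the `max_categories` cutoff, and the `genre` feature are hypothetical; SchemaGen works from the computed statistics and uses its own heuristics.

```python
def infer_schema(examples, max_categories=10):
    """Infer a type per feature and a domain for low-cardinality string features."""
    schema = {}
    for example in examples:
        for name, value in example.items():
            entry = schema.setdefault(
                name, {"type": type(value).__name__, "values": set()})
            entry["values"].add(value)
    for entry in schema.values():
        values = entry.pop("values")
        # Only treat a string feature as categorical if it has few distinct values.
        if entry["type"] == "str" and len(values) <= max_categories:
            entry["domain"] = sorted(values)
    return schema


toy_examples = [
    {"label": 1, "genre": "drama"},
    {"label": 0, "genre": "comedy"},
    {"label": 1, "genre": "drama"},
]
schema = infer_schema(toy_examples)
print(schema)
```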

Create a SchemaGen component and run it.


In [0]:
# Generates schema based on statistics files.
infer_schema = SchemaGen(statistics=statistics_gen.outputs['statistics'])
context.run(infer_schema)

The generated artifact is just a schema.pbtxt containing a text representation of a schema_pb2.Schema protobuf:


In [0]:
train_uri = infer_schema.outputs['schema'].get()[0].uri
schema_filename = os.path.join(train_uri, "schema.pbtxt")
schema = tfx.utils.io_utils.parse_pbtxt_file(file_name=schema_filename,
                                             message=schema_pb2.Schema())

It can be visualized using tfdv.display_schema():


In [0]:
tfdv.display_schema(schema)

The ExampleValidator Component

The ExampleValidator performs anomaly detection, based on the statistics from StatisticsGen and the schema from SchemaGen. It looks for problems such as missing values, values of the wrong type, or categorical values outside of the domain of acceptable values.
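
The three kinds of problems listed here can be illustrated with a small pure-Python check against a hand-written schema. The `find_anomalies` helper and the schema layout are hypothetical and much simpler than what ExampleValidator does, which operates on dataset statistics instead of individual examples.

```python
def find_anomalies(example, schema):
    """Return a list of human-readable problems found in one example."""
    problems = []
    for name, spec in schema.items():
        if name not in example or example[name] is None:
            problems.append(f"{name}: missing value")
        elif not isinstance(example[name], spec["type"]):
            problems.append(f"{name}: expected {spec['type'].__name__}")
        elif "domain" in spec and example[name] not in spec["domain"]:
            problems.append(f"{name}: value outside domain")
    return problems


# Toy schema written by hand for this sketch.
toy_schema = {
    "text": {"type": str},
    "label": {"type": int, "domain": {0, 1}},
}

print(find_anomalies({"text": "fine film", "label": 1}, toy_schema))
print(find_anomalies({"text": "fine film", "label": 7}, toy_schema))
print(find_anomalies({"label": "1"}, toy_schema))
```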

Create an ExampleValidator component and run it.


In [0]:
# Performs anomaly detection based on statistics and data schema.
validate_stats = ExampleValidator(
    statistics=statistics_gen.outputs['statistics'],
    schema=infer_schema.outputs['schema'])
context.run(validate_stats)

The SynthesizeGraph Component

Graph construction involves creating embeddings for text samples and then using a similarity function to compare the embeddings.

We will use pretrained Swivel embeddings to create embeddings in the tf.train.Example format for each sample in the input. We will store the resulting embeddings in the TFRecord format along with the sample's ID. Storing the ID is important because it will allow us to match sample embeddings with the corresponding nodes in the graph later.

Once we have the sample embeddings, we will use them to build a similarity graph, i.e., nodes in this graph will correspond to samples and edges in this graph will correspond to similarity between pairs of nodes.

Neural Structured Learning provides a graph building library to build a graph based on sample embeddings. It uses cosine similarity as the similarity measure to compare embeddings and build edges between them. It also allows us to specify a similarity threshold, which can be used to discard dissimilar edges from the final graph. In the following example, using 0.99 as the similarity threshold, we end up with a graph that has 3,132,362 bi-directional edges.
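
The thresholding idea can be sketched in a few lines of pure Python: compare every pair of embeddings with cosine similarity and keep only the pairs at or above the threshold, writing each kept pair in both directions. This brute-force helper (`build_similarity_edges`) and its toy embeddings are assumptions for illustration only; it is not how the NSL graph builder is implemented, and whether the comparison is inclusive is a choice made here.

```python
import itertools
import math


def cosine_similarity(u, v):
    """Cosine of the angle between two equal-length vectors."""
    dot = sum(a * b for a, b in zip(u, v))
    norm = math.sqrt(sum(a * a for a in u)) * math.sqrt(sum(b * b for b in v))
    return dot / norm if norm else 0.0


def build_similarity_edges(embeddings, threshold):
    """Return (source, target, weight) rows for pairs at or above the threshold.

    Each qualifying pair is emitted in both directions.
    """
    edges = []
    for (id_a, vec_a), (id_b, vec_b) in itertools.combinations(
            embeddings.items(), 2):
        weight = cosine_similarity(vec_a, vec_b)
        if weight >= threshold:
            edges.append((id_a, id_b, weight))
            edges.append((id_b, id_a, weight))
    return edges


toy_embeddings = {
    "0": [1.0, 0.0],
    "1": [0.9, 0.1],
    "2": [0.0, 1.0],
}
for source, target, weight in build_similarity_edges(toy_embeddings, 0.99):
    print(f"{source}\t{target}\t{weight:.4f}")
```

Only samples "0" and "1" point in nearly the same direction, so they are the only pair connected at this threshold. Lowering the threshold yields a denser graph with weaker edges.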

Note: Graph quality and, by extension, embedding quality are very important for graph regularization. While we use Swivel embeddings in this notebook, using BERT embeddings, for instance, will likely capture review semantics more accurately. We encourage users to use embeddings of their choice, as appropriate to their needs.


In [0]:
swivel_url = 'https://tfhub.dev/google/tf2-preview/gnews-swivel-20dim/1'
hub_layer = hub.KerasLayer(swivel_url, input_shape=[], dtype=tf.string)

def create_embedding_example(example):
  """Create tf.Example containing the sample's embedding and its ID."""

  sentence_embedding = hub_layer(example["text"])

  # Flatten the sentence embedding back to 1-D.
  sentence_embedding = tf.reshape(sentence_embedding, shape=[-1])

  return _dict_to_example({
      'id': example["id"],
      'embedding': sentence_embedding.numpy()
  })


def create_dataset(uri):
  tfrecord_filenames = [os.path.join(uri, name)
                        for name in os.listdir(uri)]
  return tf.data.TFRecordDataset(tfrecord_filenames, compression_type="GZIP")


def create_embeddings(train_path, output_path):
  dataset = create_dataset(train_path)
  embeddings_path = os.path.join(output_path, "embeddings.tfr")
  decoder = tfdv.TFExampleDecoder()
  with tf.io.TFRecordWriter(embeddings_path) as embed_writer:
    for tfrecord in dataset:
      example = decoder.decode(tfrecord.numpy())
      embedding_example = create_embedding_example(example)
      embed_writer.write(embedding_example.SerializeToString())


def build_graph(output_path, similarity_threshold):
  embeddings_path = os.path.join(output_path, "embeddings.tfr")
  graph_path = os.path.join(output_path, "graph.tfv")
  nsl.tools.build_graph([embeddings_path], graph_path, similarity_threshold)

In [0]:
class SynthesizedGraph(artifact.Artifact):
  """Output artifact of the SynthesizeGraphComponent"""
  TYPE_NAME = 'SynthesizedGraphPath'


"""Example of a custom graph synthesization component.
This component along with other custom component related code will only serve as
an example and will not be supported by TFX team.
"""
class SynthesizeGraphComponentSpec(tfx.types.ComponentSpec):
  """ComponentSpec for custom TFX component to synthesize NSL graph."""

  PARAMETERS = {
      'similarity_threshold': ExecutionParameter(type=float),
  }
  INPUTS = {
      'examples': ChannelParameter(type=standard_artifacts.Examples),
  }
  OUTPUTS = {
      'synthesized_graph': ChannelParameter(type=SynthesizedGraph),
  }


class SynthesizeGraphExecutor(base_executor.BaseExecutor):
  """Executor for custom graph synthesization component."""

  def Do(self, input_dict, output_dict, exec_properties):
      """Synthesizes a graph based on the embedding similarities.
      Args:
        input_dict: Input dict from input key to a list of artifacts, including:
          - examples: identified examples (with an id field); only the 'train'
            split is used.
        output_dict: Output dict from key to a list of artifacts, including:
          - synthesized_graph: synthesized graph.
        exec_properties: A dict of execution properties, including:
          - similarity_threshold: similarity threshold to create an edge in the
            graph between two instances.
      Returns:
        None
      """
      self._log_startup(input_dict, output_dict, exec_properties)

      splits = [artifact.split for artifact in input_dict['examples']]
      train_input_examples_uri = artifact_utils.get_split_uri(
          input_dict['examples'], 'train')
      output_graph_uri = artifact_utils.get_split_uri(
          output_dict['synthesized_graph'], 'train')
      similarity_threshold = exec_properties["similarity_threshold"]

      print("Creating embeddings...")
      create_embeddings(train_input_examples_uri, output_graph_uri)

      print("Synthesizing graph...")
      build_graph(output_graph_uri, similarity_threshold)


class SynthesizeGraphComponent(base_component.BaseComponent):
  """Custom graph synthesization component.
  This custom component will compute the embeddings for each instance and then
  create a synthesized graph based on the embedding similarities.
  """
  SPEC_CLASS = SynthesizeGraphComponentSpec
  EXECUTOR_SPEC = executor_spec.ExecutorClassSpec(SynthesizeGraphExecutor)

  def __init__(self,
               examples,
               similarity_threshold,
               synthesized_graph=None,
               instance_name=None):
    """Construct a SynthesizeGraphComponent.
    Args:
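      examples: A Channel of 'Examples' type whose instances carry an `id`
        feature (here, the output of IdentifyExamples).
      similarity_threshold: Minimum similarity required to create an edge
        between two instances.
      synthesized_graph: A Channel of 'SynthesizedGraph' type.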
      instance_name: Optional unique instance name. Necessary if multiple
        components of this class are declared in the same pipeline.
    """
    synthesized_graph = synthesized_graph or tfx.types.Channel(
        type=SynthesizedGraph,
        artifacts=[SynthesizedGraph(split="train")])
    spec = SynthesizeGraphComponentSpec(
        examples=examples,
        similarity_threshold=similarity_threshold,
        synthesized_graph=synthesized_graph)
    super().__init__(spec=spec, instance_name=instance_name)

In [0]:
synthesize_graph = SynthesizeGraphComponent(
    examples=identify_examples.outputs['identified_examples'],
    similarity_threshold=0.99)
context.run(synthesize_graph)

In [0]:
train_uri = synthesize_graph.outputs["synthesized_graph"].get()[0].uri
os.listdir(train_uri)

In [0]:
graph_path = os.path.join(train_uri, "graph.tfv")
print("node 1\tnode 2\tsimilarity")
!head {graph_path}
print("...")
!tail {graph_path}

In [0]:
!wc -l {graph_path}

The Transform Component

The Transform component performs data transformations and feature engineering. The results include an input TensorFlow graph which is used during both training and serving to preprocess the data before training or inference. This graph becomes part of the SavedModel that is the result of model training. Since the same input graph is used for both training and serving, the preprocessing will always be the same, and only needs to be written once.

The Transform component requires more code than many other components because of the arbitrary complexity of the feature engineering that you may need for the data and/or model that you're working with. It requires code files to be available which define the processing needed.

Each sample will include the following three features:

  1. id: The node ID of the sample.
  2. text_xf: An int64 list containing word IDs.
  3. label_xf: A singleton int64 identifying the target class of the review: 0=negative, 1=positive.
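
To show how a review becomes a fixed-length list of word IDs like text_xf, here is a simplified pure-Python sketch: lowercase, drop punctuation, add start and end markers, truncate or pad, then map tokens to IDs with out-of-vocabulary buckets. The helper names, the short sequence length, and the CRC-based bucket hashing are assumptions of this sketch; it also omits the quote-stripping step, and the real preprocessing is done with TensorFlow ops and tf.Transform.

```python
import re
import zlib

SEQUENCE_LENGTH = 8  # Shortened for readability; the tutorial module uses 100.


def tokenize_review(review, sequence_length=SEQUENCE_LENGTH):
    """Lowercase, strip punctuation, add markers, then truncate or pad."""
    review = re.sub(r"[^a-z' ]", " ", review.lower())
    tokens = ["<START>"] + review.split()[:sequence_length] + ["<END>"]
    tokens = tokens[:sequence_length]
    return tokens + ["<PAD>"] * (sequence_length - len(tokens))


def build_vocabulary(token_lists, top_k):
    """Map the top_k most frequent tokens to ids 0..top_k-1."""
    counts = {}
    for tokens in token_lists:
        for token in tokens:
            counts[token] = counts.get(token, 0) + 1
    # Sort by descending frequency, breaking ties alphabetically.
    ranked = sorted(counts, key=lambda t: (-counts[t], t))
    return {token: i for i, token in enumerate(ranked[:top_k])}


def to_ids(tokens, vocabulary, num_oov_buckets):
    """Look up each token; unknown tokens hash into buckets after the vocabulary."""
    ids = []
    for token in tokens:
        if token in vocabulary:
            ids.append(vocabulary[token])
        else:
            bucket = zlib.crc32(token.encode("utf-8")) % num_oov_buckets
            ids.append(len(vocabulary) + bucket)
    return ids


tokens = tokenize_review("A great, GREAT movie!")
vocabulary = build_vocabulary([tokens], top_k=3)
ids = to_ids(tokens, vocabulary, num_oov_buckets=4)
print(tokens)
print(ids)
```

Every review ends up with exactly SEQUENCE_LENGTH IDs, which is what lets the model consume them as a dense tensor.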

Let's define a module containing the preprocessing_fn() function that we will pass to the Transform component:


In [0]:
_transform_module_file = 'imdb_transform.py'

In [0]:
%%writefile {_transform_module_file}

import tensorflow as tf

import tensorflow_transform as tft

SEQUENCE_LENGTH = 100
VOCAB_SIZE = 10000
OOV_SIZE = 100

def tokenize_reviews(reviews, sequence_length=SEQUENCE_LENGTH):
  reviews = tf.strings.lower(reviews)
  reviews = tf.strings.regex_replace(reviews, r" '|' |^'|'$", " ")
  reviews = tf.strings.regex_replace(reviews, "[^a-z' ]", " ")
  tokens = tf.strings.split(reviews)[:, :sequence_length]
  start_tokens = tf.fill([tf.shape(reviews)[0], 1], "<START>")
  end_tokens = tf.fill([tf.shape(reviews)[0], 1], "<END>")
  tokens = tf.concat([start_tokens, tokens, end_tokens], axis=1)
  tokens = tokens[:, :sequence_length]
  tokens = tokens.to_tensor(default_value="<PAD>")
  pad = sequence_length - tf.shape(tokens)[1]
  tokens = tf.pad(tokens, [[0, 0], [0, pad]], constant_values="<PAD>")
  return tf.reshape(tokens, [-1, sequence_length])

def preprocessing_fn(inputs):
  """tf.transform's callback function for preprocessing inputs.

  Args:
    inputs: map from feature keys to raw not-yet-transformed features.

  Returns:
    Map from string feature key to transformed feature operations.
  """
  outputs = {}
  outputs["id"] = inputs["id"]
  tokens = tokenize_reviews(_fill_in_missing(inputs["text"], ''))
  outputs["text_xf"] = tft.compute_and_apply_vocabulary(
      tokens,
      top_k=VOCAB_SIZE,
      num_oov_buckets=OOV_SIZE)
  outputs["label_xf"] = _fill_in_missing(inputs["label"], -1)
  return outputs

def _fill_in_missing(x, default_value):
  """Replace missing values in a SparseTensor.

  Fills in missing values of `x` with the default_value.

  Args:
    x: A `SparseTensor` of rank 2.  Its dense shape should have size at most 1
      in the second dimension.
    default_value: the value with which to replace the missing values.

  Returns:
    A rank 1 tensor where missing values of `x` have been filled in.
  """
  return tf.squeeze(
      tf.sparse.to_dense(
          tf.SparseTensor(x.indices, x.values, [x.dense_shape[0], 1]),
          default_value),
      axis=1)

Create and run the Transform component, referring to the files that were created above.


In [0]:
# Performs transformations and feature engineering in training and serving.
transform = Transform(
    examples=identify_examples.outputs['identified_examples'],
    schema=infer_schema.outputs['schema'],
    module_file=_transform_module_file)
context.run(transform)

The Transform component has 2 types of outputs:

  • transform_graph is the graph that can perform the preprocessing operations (this graph will be included in the serving and evaluation models).
  • transformed_examples represents the preprocessed training and evaluation data.

In [0]:
transform.outputs

Take a peek at the transform_graph artifact: it points to a directory containing 3 subdirectories:


In [0]:
train_uri = transform.outputs['transform_graph'].get()[0].uri
os.listdir(train_uri)

The transform_fn subdirectory contains the actual preprocessing graph. The metadata subdirectory contains the schema of the original data. The transformed_metadata subdirectory contains the schema of the preprocessed data.

Take a look at some of the transformed examples and check that they are indeed processed as intended.


In [0]:
def pprint_examples(artifact, n_examples=3):
  uri = artifact.uri
  tfrecord_filenames = [os.path.join(uri, name) for name in os.listdir(uri)]
  dataset = tf.data.TFRecordDataset(tfrecord_filenames, compression_type="GZIP")
  decoder = tfdv.TFExampleDecoder()
  for tfrecord in dataset.take(n_examples):
    serialized_example = tfrecord.numpy()
    example = decoder.decode(serialized_example)
    pp.pprint(example)

In [0]:
pprint_examples(transform.outputs['transformed_examples'].get()[0])

The GraphAugmentation Component

Since we have the sample features and the synthesized graph, we can generate the augmented training data for Neural Structured Learning. The NSL framework provides a library to combine the graph and the sample features to produce the final training data for graph regularization. The resulting training data will include original sample features as well as features of their corresponding neighbors.

In this tutorial, we consider undirected edges and use a maximum of 3 neighbors per sample to augment training data with graph neighbors.
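
The augmentation step can be pictured with a small pure-Python sketch: treat each edge as undirected, and for every labeled sample copy the features of up to `max_nbrs` neighbors under a per-neighbor prefix. The `NL_nbr_` prefix and `_weight` suffix follow the constants used in the trainer module of this tutorial, and `label_xf == -1` marks unlabeled samples as in the preprocessing code. The `pack_neighbors` helper itself, the `NL_num_nbrs` count feature, and the rule of keeping the highest-weight neighbors are assumptions of this sketch, not a description of the NSL library's internals.

```python
def pack_neighbors(examples, edges, max_nbrs):
    """Merge up to max_nbrs neighbor features into each labeled example.

    examples maps id -> feature dict; edges is a list of
    (source, target, weight) tuples, each treated as undirected.
    """
    neighbors = {}
    for source, target, weight in edges:
        neighbors.setdefault(source, {})[target] = weight
        neighbors.setdefault(target, {})[source] = weight

    packed = []
    for node_id, features in examples.items():
        if features["label_xf"] == -1:
            continue  # Unlabeled samples only appear as neighbors.
        merged = dict(features)
        ranked = sorted(neighbors.get(node_id, {}).items(),
                        key=lambda item: (-item[1], item[0]))[:max_nbrs]
        for i, (nbr_id, weight) in enumerate(ranked):
            for name, value in examples[nbr_id].items():
                merged[f"NL_nbr_{i}_{name}"] = value
            merged[f"NL_nbr_{i}_weight"] = weight
        merged["NL_num_nbrs"] = len(ranked)
        packed.append(merged)
    return packed


toy_examples = {
    "0": {"id": "0", "text_xf": [1, 2], "label_xf": 1},
    "1": {"id": "1", "text_xf": [1, 3], "label_xf": -1},
    "2": {"id": "2", "text_xf": [4, 5], "label_xf": 0},
}
toy_edges = [("0", "1", 0.995), ("2", "0", 0.991)]
packed = pack_neighbors(toy_examples, toy_edges, max_nbrs=1)
for example in packed:
    print(example)
```

Note that the unlabeled sample "1" never becomes a training example on its own, yet it still contributes as a neighbor of sample "0".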


In [0]:
"""Custom component that augments the training data with graph neighbors.
This component along with other custom component related code will only serve as
an example and will not be supported by TFX team.
"""
class GraphAugmentationSpec(tfx.types.ComponentSpec):
  """ComponentSpec for the GraphAugmentation custom component."""

  PARAMETERS = {
      'num_neighbors': ExecutionParameter(type=int),
  }
  INPUTS = {
      'identified_examples': ChannelParameter(type=standard_artifacts.Examples),
      'synthesized_graph': ChannelParameter(type=SynthesizedGraph),
  }
  OUTPUTS = {
      'augmented_examples': ChannelParameter(type=standard_artifacts.Examples)
  }


def split_train_and_unsup(input_uri):
  tmp_dir = tempfile.mkdtemp(prefix='tfx-data')
  tfrecord_filenames = [os.path.join(input_uri, filename)
                        for filename in os.listdir(input_uri)]
  train_path = os.path.join(tmp_dir, "train.tfrecord")
  unsup_path = os.path.join(tmp_dir, "unsup.tfrecord")
  decoder = tfdv.TFExampleDecoder()
  with tf.io.TFRecordWriter(train_path) as train_writer, \
       tf.io.TFRecordWriter(unsup_path) as unsup_writer:
    for tfrecord in tf.data.TFRecordDataset(tfrecord_filenames,
                                            compression_type="GZIP"):
      example = decoder.decode(tfrecord.numpy())
      if ("label_xf" not in example or example["label_xf"][0] == -1):
        writer = unsup_writer
      else:
        writer = train_writer
      writer.write(tfrecord.numpy())
  return train_path, unsup_path


def gzip(filepath):
  with open(filepath, 'rb') as f_in:
      with gzip_lib.open(filepath + '.gz', 'wb') as f_out:
          shutil.copyfileobj(f_in, f_out)
  os.remove(filepath)


def copy_tfrecords(input_uri, output_uri):
  for filename in os.listdir(input_uri):
    input_filename = os.path.join(input_uri, filename)
    output_filename = os.path.join(output_uri, filename)
    shutil.copyfile(input_filename, output_filename)


class GraphAugmentationExecutor(base_executor.BaseExecutor):
  """Executor for GraphAugmentation component."""

  def Do(self, input_dict, output_dict, exec_properties):
      """Augment the training data with graph neighbors.
      Args:
        input_dict: Input dict from input key to a list of artifacts, including:
          - identified_examples: input examples with identifiers.
          - synthesized_graph: a SynthesizedGraph to find the neighbors.
        output_dict: Output dict from key to a list of artifacts, including:
          - augmented_examples: output examples, including neighbors.
        exec_properties: A dict of execution properties, including:
          - num_neighbors: how many neighbors to add to each instance.
      Returns:
        None
      """
      self._log_startup(input_dict, output_dict, exec_properties)

      splits = [artifact.split for artifact in input_dict['identified_examples']]
      train_input_uri = artifact_utils.get_split_uri(
          input_dict['identified_examples'], "train")
      eval_input_uri = artifact_utils.get_split_uri(
          input_dict['identified_examples'], "eval")
      train_graph_uri = artifact_utils.get_split_uri(
          input_dict['synthesized_graph'], "train")
      train_output_uri = artifact_utils.get_split_uri(
          output_dict['augmented_examples'], "train")
      eval_output_uri = artifact_utils.get_split_uri(
          output_dict['augmented_examples'], "eval")
      num_neighbors = exec_properties["num_neighbors"]

      train_path, unsup_path = split_train_and_unsup(train_input_uri)

      output_path = os.path.join(train_output_uri, 'nsl_train_data.tfr')
      pack_nbrs_args = dict(
          labeled_examples_path=train_path,
          unlabeled_examples_path=unsup_path,
          graph_path=os.path.join(train_graph_uri, 'graph.tfv'),
          output_training_data_path=output_path,
          add_undirected_edges=True,
          max_nbrs=num_neighbors)
      print("nsl.tools.pack_nbrs arguments:", pack_nbrs_args)
      nsl.tools.pack_nbrs(**pack_nbrs_args)
      gzip(output_path) # downstream components expect gzip'ed TFRecords

      copy_tfrecords(eval_input_uri, eval_output_uri)


class GraphAugmentation(base_component.BaseComponent):
  """Custom component that augments the training data with graph neighbors."""
  SPEC_CLASS = GraphAugmentationSpec
  EXECUTOR_SPEC = executor_spec.ExecutorClassSpec(GraphAugmentationExecutor)

  def __init__(self,
               identified_examples,
               synthesized_graph,
               num_neighbors,
               augmented_examples=None,
               instance_name=None):
    """Construct a GraphAugmentation component.
    Args:
      identified_examples: A Channel of 'Examples' type.
      synthesized_graph: A Channel of 'SynthesizedGraph' type.
      num_neighbors: Maximum number of graph neighbors to add to each instance.
      augmented_examples: A Channel of type 'Examples', including neighbors.
      instance_name: Optional unique instance name. Necessary if multiple
        components of this class are declared in the same pipeline.
    """
    splits = [artifact.split for artifact in identified_examples.get()]
    augmented_examples = augmented_examples or tfx.types.Channel(
        type=standard_artifacts.Examples,
        artifacts=[standard_artifacts.Examples(split=split)
                   for split in ["train", "eval"]])
    spec = GraphAugmentationSpec(
        identified_examples=identified_examples,
        synthesized_graph=synthesized_graph,
        num_neighbors=num_neighbors,
        augmented_examples=augmented_examples)
    super().__init__(spec=spec, instance_name=instance_name)

In [0]:
# Augments training data with graph neighbors.
graph_augmentation = GraphAugmentation(
    identified_examples=transform.outputs['transformed_examples'],
    synthesized_graph=synthesize_graph.outputs['synthesized_graph'],
    num_neighbors=3
)
context.run(graph_augmentation, enable_cache=False)

In [0]:
pprint_examples(graph_augmentation.outputs['augmented_examples'].get()[0], 6)
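
Each augmented training example carries extra neighbor features next to its own. As a rough guide to what to look for in the printed examples, the sketch below lists the neighbor feature keys, using the 'NL_nbr_' prefix and '_weight' suffix that the trainer module later in this notebook relies on. The helper name neighbor_feature_keys is hypothetical and only illustrates the naming scheme; `nsl.tools.pack_nbrs` also writes an 'NL_num_nbrs' feature, which this tutorial does not use.

```python
NBR_FEATURE_PREFIX = 'NL_nbr_'
NBR_WEIGHT_SUFFIX = '_weight'


def neighbor_feature_keys(feature_names, num_neighbors):
  """Lists the neighbor feature keys expected in each augmented example.

  Hypothetical helper for illustration; it is not part of NSL or TFX.
  """
  keys = []
  for i in range(num_neighbors):
    # One copy of every sample feature per neighbor slot.
    for name in feature_names:
      keys.append('{}{}_{}'.format(NBR_FEATURE_PREFIX, i, name))
    # One edge weight per neighbor slot.
    keys.append('{}{}{}'.format(NBR_FEATURE_PREFIX, i, NBR_WEIGHT_SUFFIX))
  return keys


for key in neighbor_feature_keys(['id', 'text_xf'], num_neighbors=3):
  print(key)
```

With num_neighbors=3 this yields nine keys, from 'NL_nbr_0_id' through 'NL_nbr_2_weight'.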

The Trainer Component

The Trainer component trains models using TensorFlow.

Create a Python module containing a trainer_fn function, which must return a dictionary holding the estimator together with the training and evaluation specs. If you prefer to build a Keras model, you can convert it to an estimator using tf.keras.estimator.model_to_estimator().


In [0]:
# Setup paths.
_trainer_module_file = 'imdb_trainer.py'

In [0]:
%%writefile {_trainer_module_file}

import neural_structured_learning as nsl

import tensorflow as tf

import tensorflow_model_analysis as tfma
import tensorflow_transform as tft
from tensorflow_transform.tf_metadata import schema_utils


NBR_FEATURE_PREFIX = 'NL_nbr_'
NBR_WEIGHT_SUFFIX = '_weight'
LABEL_KEY = 'label'

def _transformed_name(key):
  return key + '_xf'


def _transformed_names(keys):
  return [_transformed_name(key) for key in keys]


# Hyperparameters:
#
# We will use an instance of `HParams` to include various hyperparameters and
# constants used for training and evaluation. We briefly describe each of them
# below:
#
# -   max_seq_length: This is the maximum number of words considered from each
#                     movie review in this example.
# -   vocab_size: This is the size of the vocabulary considered for this
#                 example.
# -   oov_size: This is the out-of-vocabulary size considered for this example.
# -   distance_type: This is the distance metric used to regularize the sample
#                    with its neighbors.
# -   graph_regularization_multiplier: This controls the relative weight of the
#                                      graph regularization term in the overall
#                                      loss function.
# -   num_neighbors: The number of neighbors used for graph regularization.
#                    This value has to be less than or equal to the `max_nbrs`
#                    argument passed to `nsl.tools.pack_nbrs` above in the
#                    GraphAugmentation component.
# -   num_fc_units: The number of units in the fully connected layer of the
#                   neural network.
class HParams(object):
  """Hyperparameters used for training."""
  def __init__(self):
    ### dataset parameters
    # The following 3 values should match those defined in the Transform
    # Component.
    self.max_seq_length = 100
    self.vocab_size = 10000
    self.oov_size = 100
    ### neural graph learning parameters
    self.distance_type = nsl.configs.DistanceType.L2
    self.graph_regularization_multiplier = 0.1
    # The following value has to be at most the value of 'num_neighbors'
    # passed to the GraphAugmentation component.
    self.num_neighbors = 1
    ### model architecture
    self.num_embedding_dims = 16
    self.num_lstm_dims = 64
    self.num_fc_units = 64

HPARAMS = HParams()


def optimizer_fn():
  """Returns an instance of `tf.compat.v1.train.Optimizer`."""
  return tf.compat.v1.train.RMSPropOptimizer(
    learning_rate=0.0001, decay=1e-6)


def build_train_op(loss, global_step):
  """Builds a train op that minimizes the given loss using `optimizer_fn`."""
  with tf.name_scope('train'):
    optimizer = optimizer_fn()
    train_op = optimizer.minimize(loss=loss, global_step=global_step)
  return train_op


# Building the model:
#
# A neural network is created by stacking layers—this requires two main
# architectural decisions:
# * How many layers to use in the model?
# * How many *hidden units* to use for each layer?
#
# In this example, the input data consists of an array of word-indices. The
# labels to predict are either 0 or 1. We will use a feed-forward neural network
# as our base model in this tutorial to reduce the amount of time required to
# train the model. A bidirectional LSTM model is also defined below as an
# alternative model architecture.

def bilstm_model(features, is_training, reuse=tf.compat.v1.AUTO_REUSE):
  """Builds a bi-directional LSTM model.

  Args:
    features: A dictionary containing batch features returned from the
      `input_fn`, that include sample features, corresponding neighbor features,
      and neighbor weights.
    is_training: a Python Boolean value or a Boolean scalar Tensor, indicating
      whether to apply dropout.
    reuse: a Python Boolean value for reusing variable scope.

  Returns:
    probabilities: Tensor of shape [batch_size, 1] with sigmoid outputs.
    representations: Tensor of shape [batch_size, _] for graph regularization.
      This is the representation of each example at the graph regularization
      layer.
  """  
  with tf.compat.v1.variable_scope('bilstm', reuse=reuse):
    inputs = features[_transformed_name('text')]  
    embedding_layer = tf.keras.layers.Embedding(HPARAMS.vocab_size + HPARAMS.oov_size,
                                                HPARAMS.num_embedding_dims)(
                                                    inputs)
    lstm_layer = tf.keras.layers.Bidirectional(
        tf.keras.layers.LSTM(HPARAMS.num_lstm_dims))(
            embedding_layer)
    dense_layer = tf.keras.layers.Dense(
        HPARAMS.num_fc_units, activation='relu')(
            lstm_layer)    

    output_layer = tf.keras.layers.Dense(1, activation='sigmoid')(dense_layer)

    # Graph regularization will be done on the penultimate (dense) layer
    # because the output layer is a single floating point number.
    return output_layer, dense_layer
  
  
def feed_forward_model(features, is_training, reuse=tf.compat.v1.AUTO_REUSE):
  """Builds a simple 2 layer feed forward neural network.

  The layers are effectively stacked sequentially to build the classifier. The
  first layer is an Embedding layer, which takes the integer-encoded vocabulary
  and looks up the embedding vector for each word-index. These vectors are
  learned as the model trains. The vectors add a dimension to the output array.
  The resulting dimensions are: (batch, sequence, embedding). Next is a global
  average pooling 1D layer, which reduces the dimensionality of its inputs from
  3D to 2D. This fixed-length output vector is piped through a fully-connected
  (Dense) layer with 16 hidden units. The last layer is densely connected with a
  single output node. Using the sigmoid activation function, this value is a
  float between 0 and 1, representing a probability, or confidence level.  

  Args:
    features: A dictionary containing batch features returned from the
      `input_fn`, that include sample features, corresponding neighbor features,
      and neighbor weights.
    is_training: a Python Boolean value or a Boolean scalar Tensor, indicating
      whether to apply dropout.
    reuse: a Python Boolean value for reusing variable scope.

  Returns:
    probabilities: Tensor of shape [batch_size, 1] with sigmoid outputs.
    representations: Tensor of shape [batch_size, _] for graph regularization.
      This is the representation of each example at the graph regularization
      layer.
  """  
  with tf.compat.v1.variable_scope('ff', reuse=reuse):
    inputs = features[_transformed_name('text')]  
    embedding_layer = tf.keras.layers.Embedding(HPARAMS.vocab_size + HPARAMS.oov_size,
                                                HPARAMS.num_embedding_dims)(
                                                    inputs)
    pooling_layer = tf.keras.layers.GlobalAveragePooling1D()(embedding_layer)
    dense_layer = tf.keras.layers.Dense(16, activation='relu')(pooling_layer)

    output_layer = tf.keras.layers.Dense(1, activation='sigmoid')(dense_layer)

    # Graph regularization will be done on the penultimate (dense) layer
    # because the output layer is a single floating point number.
    return output_layer, dense_layer


# A note on hidden units:
#
# The above model has two intermediate or "hidden" layers, between the input and
# output, and excluding the Embedding layer. The number of outputs (units,
# nodes, or neurons) is the dimension of the representational space for the
# layer. In other words, the amount of freedom the network is allowed when
# learning an internal representation. If a model has more hidden units
# (a higher-dimensional representation space), and/or more layers, then the
# network can learn more complex representations. However, it makes the network
# more computationally expensive and may lead to learning unwanted
# patterns—patterns that improve performance on training data but not on the
# test data. This is called overfitting.


# This function will be used to generate the embeddings for samples and their
# corresponding neighbors, which will then be used for graph regularization.
def embedding_fn(features, mode):
  """Returns the embedding corresponding to the given features.

  Args:
    features: A dictionary containing batch features returned from the
      `input_fn`, that include sample features, corresponding neighbor features,
      and neighbor weights.
    mode: Specifies if this is training, evaluation, or prediction. See
      tf.estimator.ModeKeys.

  Returns:
    The embedding that will be used for graph regularization.
  """
  is_training = (mode == tf.estimator.ModeKeys.TRAIN)
  _, embedding = feed_forward_model(features, is_training)
  return embedding


def feed_forward_model_fn(features, labels, mode, params, config):
  """Implementation of the model_fn for the base feed-forward model.

  Args:
    features: This is the first item returned from the `input_fn` passed to
      `train`, `evaluate`, and `predict`. This should be a single `Tensor` or
      `dict` of same.
    labels: This is the second item returned from the `input_fn` passed to
      `train`, `evaluate`, and `predict`. This should be a single `Tensor` or
      `dict` of same (for multi-head models). If mode is `ModeKeys.PREDICT`,
      `labels=None` will be passed. If the `model_fn`'s signature does not
      accept `mode`, the `model_fn` must still be able to handle `labels=None`.
    mode: Optional. Specifies if this is training, evaluation or prediction.
      See `ModeKeys`.
    params: The `HParams` instance passed to the Estimator as `params`.
    config: Optional configuration object. Will receive what is passed to
      Estimator in `config` parameter, or the default `config`. Allows updating
      things in your model_fn based on configuration such as `num_ps_replicas`,
      or `model_dir`. Unused currently.

  Returns:
     A `tf.estimator.EstimatorSpec` for the base feed-forward model. This does
     not include graph-based regularization.
  """

  is_training = mode == tf.estimator.ModeKeys.TRAIN

  # Build the computation graph.
  probabilities, _ = feed_forward_model(features, is_training)
  predictions = tf.round(probabilities)

  if mode == tf.estimator.ModeKeys.PREDICT:
    # labels will be None, and no loss to compute.
    cross_entropy_loss = None
    eval_metric_ops = None
  else:
    # Loss is required in train and eval modes.
    # Flatten 'probabilities' to 1-D.
    probabilities = tf.reshape(probabilities, shape=[-1])
    cross_entropy_loss = tf.compat.v1.keras.losses.binary_crossentropy(
        labels, probabilities)
    eval_metric_ops = {'accuracy': tf.compat.v1.metrics.accuracy(labels,
                                                                 predictions)}

  if is_training:
    global_step = tf.compat.v1.train.get_or_create_global_step()
    train_op = build_train_op(cross_entropy_loss, global_step)
  else:
    train_op = None

  return tf.estimator.EstimatorSpec(
      mode=mode,
      predictions={'predictions': predictions},
      loss=cross_entropy_loss,
      train_op=train_op,
      eval_metric_ops=eval_metric_ops)


# Tf.Transform considers these features as "raw"
def _get_raw_feature_spec(schema):
  return schema_utils.schema_as_feature_spec(schema).feature_spec


def _gzip_reader_fn(filenames):
  """Small utility returning a record reader that can read gzip'ed files."""
  return tf.data.TFRecordDataset(
      filenames,
      compression_type='GZIP')


def _augment_tensors(tensor_dict, num_neighbors):
  """Augments `tensor_dict` to include neighbor feature tensors.
    Args:
      tensor_dict: Dictionary of feature keys mapping to tensors.
      num_neighbors: Number of neighbors to use for feature key augmentation.
    Returns:
      An augmented `tensor_dict` that includes neighbor feature tensors.
  """
  for i in range(num_neighbors):
    tensor_dict['{}{}_{}'.format(NBR_FEATURE_PREFIX, i, 'id')] = \
        tensor_dict['id']
    tensor_dict['{}{}_{}'.format(NBR_FEATURE_PREFIX, i, 'text_xf')] = \
        tensor_dict['text_xf']

  # Set the neighbor weight feature tensors (they have to be rank 2).
  for i in range(num_neighbors):
    tensor_dict['{}{}{}'.format(NBR_FEATURE_PREFIX, i, NBR_WEIGHT_SUFFIX)] = \
        tf.fill([tf.shape(tensor_dict['id'])[0], 1], 0.)

  return tensor_dict


def _example_serving_receiver_fn(tf_transform_output, schema):
  """Builds the serving inputs.

  Args:
    tf_transform_output: A TFTransformOutput.
    schema: the schema of the input data.

  Returns:
    Tensorflow graph which parses examples, applying tf-transform to them.
  """
  raw_feature_spec = _get_raw_feature_spec(schema)
  raw_feature_spec.pop(LABEL_KEY)

  raw_input_fn = tf.estimator.export.build_parsing_serving_input_receiver_fn(
      raw_feature_spec, default_batch_size=None)
  serving_input_receiver = raw_input_fn()

  transformed_features = tf_transform_output.transform_raw_features(
      serving_input_receiver.features)
  
  # Currently, NSL always expects neighbor features. So, during serving,
  # we just create dummy values for neighbor features. This will not be
  # necessary once NSL restricts that expectation to the training mode.
  
  # Even though LABEL_KEY was removed from 'raw_feature_spec', the transform
  # operation would have injected the transformed LABEL_KEY feature with a
  # default value. So, remove it before augmenting the feature tensors.
  transformed_features.pop(_transformed_name(LABEL_KEY))
  transformed_features = _augment_tensors(transformed_features,
                                          HPARAMS.num_neighbors)

  return tf.estimator.export.ServingInputReceiver(
      transformed_features, serving_input_receiver.receiver_tensors)


def _eval_input_receiver_fn(tf_transform_output, schema):
  """Build everything needed for the tf-model-analysis to run the model.

  Args:
    tf_transform_output: A TFTransformOutput.
    schema: the schema of the input data.

  Returns:
    EvalInputReceiver function, which contains:
      - Tensorflow graph which parses raw untransformed features, applies the
        tf-transform preprocessing operators.
      - Set of raw, untransformed features.
      - Label against which predictions will be compared.
  """
  # Notice that the inputs are raw features, not transformed features here.
  raw_feature_spec = _get_raw_feature_spec(schema)
  raw_input_fn = tf.estimator.export.build_parsing_serving_input_receiver_fn(
      raw_feature_spec, default_batch_size=None)
  serving_input_receiver = raw_input_fn()

  features = serving_input_receiver.features.copy()
  transformed_features = tf_transform_output.transform_raw_features(features)
  
  # NOTE: The model is driven by transformed features (since training works on
  # the materialized output of TFT), but slicing will happen on raw features.
  # features.update(transformed_features)

  # Currently, NSL always expects neighbor features. So, during model analysis
  # (eval), we just create dummy values for neighbor features. This will not be
  # necessary once NSL restricts that expectation to the training mode.
  features.pop(LABEL_KEY)
  features = _augment_tensors(transformed_features, HPARAMS.num_neighbors)

  # At this point, 'features' and 'transformed_features' refer to the same
  # object.
  labels = features.pop(_transformed_name(LABEL_KEY))
  return tfma.export.EvalInputReceiver(
      features=features,
      receiver_tensors=serving_input_receiver.receiver_tensors,
      labels=labels)


def _augment_feature_spec(feature_spec, num_neighbors):
  """Augments `feature_spec` to include neighbor features.
    Args:
      feature_spec: Dictionary of feature keys mapping to TF feature types.
      num_neighbors: Number of neighbors to use for feature key augmentation.
    Returns:
      An augmented `feature_spec` that includes neighbor feature keys.
  """
  for i in range(num_neighbors):
    feature_spec['{}{}_{}'.format(NBR_FEATURE_PREFIX, i, 'id')] = \
        tf.io.VarLenFeature(dtype=tf.string)
    # We don't care about the neighbor features corresponding to
    # _transformed_name(LABEL_KEY) because the LABEL_KEY feature will be
    # removed from the feature spec during training/evaluation.
    feature_spec['{}{}_{}'.format(NBR_FEATURE_PREFIX, i, 'text_xf')] = \
        tf.io.FixedLenFeature(shape=[HPARAMS.max_seq_length], dtype=tf.int64,
                              default_value=tf.constant(0, dtype=tf.int64,
                                                        shape=[HPARAMS.max_seq_length]))
    # The 'NL_num_nbrs' feature is currently not used.

  # Set the neighbor weight feature keys.    
  for i in range(num_neighbors):
    feature_spec['{}{}{}'.format(NBR_FEATURE_PREFIX, i, NBR_WEIGHT_SUFFIX)] = \
        tf.io.FixedLenFeature(shape=[1], dtype=tf.float32, default_value=[0.0])
  
  return feature_spec
  

def _input_fn(filenames, tf_transform_output, is_training, batch_size=200):
  """Generates features and labels for training or evaluation.

  Args:
    filenames: [str] list of gzip'ed TFRecord files to read data from.
    tf_transform_output: A TFTransformOutput.
    is_training: Boolean indicating if we are in training mode.
    batch_size: int First dimension size of the Tensors returned by input_fn

  Returns:
    A (features, indices) tuple where features is a dictionary of
      Tensors, and indices is a single Tensor of label indices.
  """
  transformed_feature_spec = (
      tf_transform_output.transformed_feature_spec().copy())
  
  # During training, NSL uses augmented training data (which includes features
  # from graph neighbors). So, update the feature spec accordingly. This needs
  # to be done because we are using different schemas for NSL training and eval,
  # but the Trainer Component only accepts a single schema.
  
  # Currently, NSL always expects neighbor features and not just during
  # training. So, during evaluation, we just create dummy values for neighbor
  # features. This will not be necessary once NSL restricts that expectation to
  # the training mode.
  transformed_feature_spec = _augment_feature_spec(transformed_feature_spec,
                                                   HPARAMS.num_neighbors)

  dataset = tf.data.experimental.make_batched_features_dataset(
      filenames, batch_size, transformed_feature_spec, reader=_gzip_reader_fn)

  transformed_features = tf.compat.v1.data.make_one_shot_iterator(
      dataset).get_next()
  # We pop the label because we do not want to use it as a feature while we're
  # training.
  return transformed_features, transformed_features.pop(
      _transformed_name(LABEL_KEY))


# TFX will call this function
def trainer_fn(hparams, schema):
  """Build the estimator using the high level API.
  Args:
    hparams: Holds hyperparameters used to train the model as name/value pairs.
    schema: Holds the schema of the training examples.
  Returns:
    A dict of the following:
      - estimator: The estimator that will be used for training and eval.
      - train_spec: Spec for training.
      - eval_spec: Spec for eval.
      - eval_input_receiver_fn: Input function for eval.
  """
  train_batch_size = 40
  eval_batch_size = 40

  tf_transform_output = tft.TFTransformOutput(hparams.transform_output)

  train_input_fn = lambda: _input_fn(
      hparams.train_files,
      tf_transform_output,
      is_training=True,
      batch_size=train_batch_size)

  eval_input_fn = lambda: _input_fn(
      hparams.eval_files,
      tf_transform_output,
      is_training=False,
      batch_size=eval_batch_size)

  train_spec = tf.estimator.TrainSpec(
      train_input_fn,
      max_steps=hparams.train_steps)

  serving_receiver_fn = lambda: _example_serving_receiver_fn(
      tf_transform_output, schema)

  exporter = tf.estimator.FinalExporter('imdb', serving_receiver_fn)
  eval_spec = tf.estimator.EvalSpec(
      eval_input_fn,
      steps=hparams.eval_steps,
      exporters=[exporter],
      name='imdb-eval')

  run_config = tf.estimator.RunConfig(
      save_checkpoints_steps=999, keep_checkpoint_max=1)

  run_config = run_config.replace(model_dir=hparams.serving_model_dir)

  estimator = tf.estimator.Estimator(
      model_fn=feed_forward_model_fn, config=run_config, params=HPARAMS)
  
  # Create a graph regularization config.
  graph_reg_config = nsl.configs.make_graph_reg_config(
      max_neighbors=HPARAMS.num_neighbors,
      multiplier=HPARAMS.graph_regularization_multiplier,
      distance_type=HPARAMS.distance_type,
      sum_over_axis=-1)
  
  # Invoke the Graph Regularization Estimator wrapper to incorporate
  # graph-based regularization for training.
  graph_nsl_estimator = nsl.estimator.add_graph_regularization(
      estimator,
      embedding_fn,
      optimizer_fn=optimizer_fn,
      graph_reg_config=graph_reg_config)

  # Create an input receiver for TFMA processing
  receiver_fn = lambda: _eval_input_receiver_fn(
      tf_transform_output, schema)

  return {
      'estimator': graph_nsl_estimator,
      'train_spec': train_spec,
      'eval_spec': eval_spec,
      'eval_input_receiver_fn': receiver_fn
  }
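
The trainer module above combines the supervised loss with a graph regularization term whose weight is set by graph_regularization_multiplier and whose distance is set by distance_type. The NumPy sketch below shows the general shape of that objective for the L2 case. It is an illustration under simplifying assumptions, not NSL's implementation: the function name graph_regularized_loss is hypothetical, the distance is taken as squared L2, and the exact reduction and normalization used by NSL may differ.

```python
import numpy as np


def graph_regularized_loss(supervised_loss, sample_emb, nbr_embs, nbr_weights,
                           multiplier=0.1):
  """Adds a weighted squared-L2 neighbor penalty to a supervised loss.

  Hypothetical helper for illustration only.

  Args:
    supervised_loss: Scalar loss computed from labels and predictions.
    sample_emb: [batch, dim] embeddings of the labeled samples.
    nbr_embs: [batch, num_neighbors, dim] embeddings of their neighbors.
    nbr_weights: [batch, num_neighbors] edge weights (0 for absent neighbors).
    multiplier: Relative weight of the graph term in the overall loss.
  """
  # Difference between each sample embedding and each of its neighbors.
  diffs = sample_emb[:, np.newaxis, :] - nbr_embs
  # Squared L2 distance per (sample, neighbor) pair.
  sq_dist = np.sum(diffs ** 2, axis=-1)
  # Weight each distance by its edge weight and average over all pairs.
  graph_loss = np.mean(nbr_weights * sq_dist)
  return supervised_loss + multiplier * graph_loss


sample = np.array([[0.0, 0.0]])
neighbors = np.array([[[3.0, 4.0]]])  # One neighbor at squared distance 25.
print(graph_regularized_loss(0.5, sample, neighbors, np.array([[1.0]])))
print(graph_regularized_loss(0.5, sample, neighbors, np.array([[0.0]])))
```

In this sketch a neighbor with weight 0, like the dummy neighbor features created for evaluation and serving, adds nothing to the penalty, so the second call returns the supervised loss unchanged.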

Create and run the Trainer component, passing it the file that we created above.


In [0]:
# Uses user-provided Python function that implements a model using TensorFlow's
# Estimators API.
trainer = Trainer(
    module_file=_trainer_module_file,
    transformed_examples=graph_augmentation.outputs['augmented_examples'],
    schema=infer_schema.outputs['schema'],
    transform_graph=transform.outputs['transform_graph'],
    train_args=trainer_pb2.TrainArgs(num_steps=10000),
    eval_args=trainer_pb2.EvalArgs(num_steps=5000))
context.run(trainer)

Take a peek at the trained model which was exported from Trainer.


In [0]:
train_uri = trainer.outputs['model'].get()[0].uri
serving_model_path = os.path.join(train_uri, 'serving_model_dir', 'export', 'imdb')
latest_serving_model_path = os.path.join(serving_model_path, max(os.listdir(serving_model_path)))
exported_model = tf.saved_model.load(latest_serving_model_path)
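
The lookup above picks the newest export by taking the lexicographic maximum of the directory names. A slightly more defensive variant is sketched below, assuming that exports live in sub-directories named by a numeric timestamp and that any other entries should be ignored. The helper name latest_export_dir is hypothetical, and the temporary directories only stand in for real exports.

```python
import os
import tempfile


def latest_export_dir(export_base):
  """Returns the sub-directory of `export_base` with the largest numeric name.

  Hypothetical helper for illustration; assumes numeric directory names.
  """
  candidates = [
      d for d in os.listdir(export_base)
      if d.isdigit() and os.path.isdir(os.path.join(export_base, d))
  ]
  if not candidates:
    raise FileNotFoundError('No exported models under ' + export_base)
  # Compare names as integers so that '1000' sorts after '999'.
  return os.path.join(export_base, max(candidates, key=int))


# Usage with throwaway directories standing in for real exports.
with tempfile.TemporaryDirectory() as base:
  for name in ['999', '1000', 'temp-1001']:
    os.makedirs(os.path.join(base, name))
  print(os.path.basename(latest_export_dir(base)))  # prints 1000
```

Comparing the names as integers avoids surprises when they differ in length, and filtering on digits skips any non-export entries in the directory.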

In [0]:
exported_model.graph.get_operations()[:10] + ["..."]

Conclusion

We have demonstrated the use of graph regularization using the Neural Structured Learning (NSL) framework in a TFX pipeline even when the input does not contain an explicit graph. We considered the task of sentiment classification of IMDB movie reviews for which we synthesized a similarity graph based on review embeddings. We encourage users to experiment further by using different embeddings for graph construction, varying hyperparameters, changing the amount of supervision, and by defining different model architectures.