Copyright © 2020 Google Inc.

In [0]:
#@title Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

Preprocess data with TensorFlow Transform

The Feature Engineering Component of TensorFlow Extended (TFX)

This Colab notebook shows a simple example of how TensorFlow Transform (tf.Transform) can preprocess data using exactly the same code for both training a model and serving inferences in production.

TensorFlow Transform is a library for preprocessing input data for TensorFlow, including creating features that require a full pass over the training dataset. For example, using TensorFlow Transform you could:

  • Normalize an input value by using the mean and standard deviation
  • Convert strings to integers by generating a vocabulary over all of the input values
  • Convert floats to integers by assigning them to buckets, based on the observed data distribution
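
To make the "full pass" idea concrete, here is a minimal sketch of the three transformations above in plain Python, without TensorFlow. The data, the variable names, the alphabetical tie-breaking, and the choice of a single median boundary are assumptions made for illustration only; tf.Transform's own analyzers may differ in detail.

```python
import bisect
import statistics
from collections import Counter

# Toy training data (made up for this sketch).
values = [1.0, 2.0, 3.0, 4.0]
words = ['hello', 'world', 'hello']

# Normalize: the mean and standard deviation require a full pass first.
mean = statistics.mean(values)
std = statistics.pstdev(values)
z_scores = [(v - mean) / std for v in values]

# Strings to integers: build the vocabulary from all values, then look up.
# The most frequent word gets index 0; ties are broken alphabetically,
# which is a choice made for this sketch only.
counts = Counter(words)
ordered = sorted(counts, key=lambda w: (-counts[w], w))
vocab = {word: index for index, word in enumerate(ordered)}
word_ids = [vocab[w] for w in words]

# Floats to buckets: boundaries come from the observed distribution.
# A single boundary at the median gives two buckets.
boundaries = [statistics.median(values)]
bucket_ids = [bisect.bisect_right(boundaries, v) for v in values]

print(z_scores)
print(word_ids)    # [0, 1, 0]
print(bucket_ids)  # [0, 0, 1, 1]
```

In each case the first step needs every training value before any single value can be transformed, which is the part that per-example TensorFlow ops cannot do on their own.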

TensorFlow has built-in support for manipulations on a single example or a batch of examples. tf.Transform extends these capabilities to support full passes over the entire training dataset.

The output of tf.Transform is exported as a TensorFlow graph which you can use for both training and serving. Using the same graph for both training and serving can prevent skew, since the same transformations are applied in both stages.
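
As a rough illustration of the skew this avoids, the sketch below compares a shared transformation with a hypothetical hand-written serving path that recomputes the mean per request. The function names and data are invented for this example and are not part of tf.Transform.

```python
import statistics

training_x = [1.0, 2.0, 3.0]

# Constant computed once, over the full training dataset.
TRAINING_MEAN = statistics.mean(training_x)


def center(value):
    """Shared transformation: used unchanged for training and serving."""
    return value - TRAINING_MEAN


def center_reimplemented(batch):
    """Hypothetical serving code that recomputes the mean per request."""
    batch_mean = statistics.mean(batch)
    return [v - batch_mean for v in batch]


request = [3.0]
shared = [center(v) for v in request]    # matches what the model saw in training
skewed = center_reimplemented(request)   # differs: training/serving skew

print(shared)  # [1.0]
print(skewed)  # [0.0]
```

The same raw value produces two different features, so a model trained on the first would see unfamiliar inputs from the second.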

Python check and imports

First, we'll make sure that we're using Python 3. Then we'll install and import the packages we need.


In [0]:
import sys

# Confirm that we're using Python 3 (compare by value with ==, not identity with is)
assert sys.version_info.major == 3, 'Oops, not running Python 3. Use Runtime > Change runtime type'

In [0]:
import argparse
import os
import pprint
import tempfile
import urllib.request
import zipfile

print("Installing dependencies for Colab environment")
!pip install -Uq grpcio==1.26.0

import tensorflow as tf

print('Installing Apache Beam')
!pip install -Uq apache_beam==2.16.0
import apache_beam as beam

print('Installing TensorFlow Transform')
!pip install -Uq tensorflow-transform==0.15.0
import tensorflow_transform as tft

import apache_beam.io.iobase
import tensorflow_transform.beam as tft_beam
from tensorflow_transform.tf_metadata import dataset_metadata
from tensorflow_transform.tf_metadata import dataset_schema

Data: Create some dummy data

We'll create some dummy data for our example:

  • raw_data is the initial raw data that we're going to preprocess
  • raw_data_metadata contains the schema that tells us the type of each column in raw_data. Here every column is a scalar: x and y are floats and s is a string.

In [0]:
raw_data = [
    {'x': 1, 'y': 1, 's': 'hello'},
    {'x': 2, 'y': 2, 's': 'world'},
    {'x': 3, 'y': 3, 's': 'hello'},
]

raw_data_metadata = dataset_metadata.DatasetMetadata(
    dataset_schema.from_feature_spec({
        'y': tf.io.FixedLenFeature([], tf.float32),
        'x': tf.io.FixedLenFeature([], tf.float32),
        's': tf.io.FixedLenFeature([], tf.string),
    }))

Transform: Create a preprocessing function

The preprocessing function is the most important concept of tf.Transform. A preprocessing function is where the transformation of the dataset really happens. It accepts and returns a dictionary of tensors, where a tensor means a Tensor or SparseTensor. There are two main groups of API calls that typically form the heart of a preprocessing function:

  1. TensorFlow Ops: Any function that accepts and returns tensors, which usually means TensorFlow ops. These add TensorFlow operations to the graph that transforms raw data into transformed data one feature vector at a time. These will run for every example, during both training and serving.
  2. TensorFlow Transform Analyzers: Any of the analyzers provided by tf.Transform. Analyzers also accept and return tensors, but unlike TensorFlow ops they only run once, during training, and typically make a full pass over the entire training dataset. They create tensor constants, which are added to your graph. For example, tft.min computes the minimum of a tensor over the training dataset. tf.Transform provides a fixed set of analyzers, but this will be extended in future versions.

Caution: When you apply your preprocessing function to serving inferences, the constants that were created by analyzers during training do not change. If your data has trend or seasonality components, plan accordingly.
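
The caution above can be sketched in plain Python. This is an illustration under assumed names (`analyze`, `transform`) and made-up data, not tf.Transform's implementation: the constants are computed once from the training data and then reused as-is at serving time.

```python
def analyze(training_values):
    """Runs once, over the whole training dataset."""
    return {'min': min(training_values), 'max': max(training_values)}


def transform(value, constants):
    """Runs per example, at both training and serving time."""
    return (value - constants['min']) / (constants['max'] - constants['min'])


constants = analyze([1.0, 2.0, 3.0])  # frozen from here on

train_scaled = [transform(v, constants) for v in [1.0, 2.0, 3.0]]
serving_scaled = transform(5.0, constants)  # value outside the training range

print(train_scaled)    # [0.0, 0.5, 1.0]
print(serving_scaled)  # 2.0
```

Because the minimum and maximum are fixed after training, a serving value outside the training range lands outside [0, 1]. Data with a trend can drift this way until the analysis is rerun.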


In [0]:
def preprocessing_fn(inputs):
    """Preprocess input columns into transformed columns."""
    x = inputs['x']
    y = inputs['y']
    s = inputs['s']
    x_centered = x - tft.mean(x)
    y_normalized = tft.scale_to_0_1(y)
    s_integerized = tft.compute_and_apply_vocabulary(s)
    x_centered_times_y_normalized = (x_centered * y_normalized)
    return {
        'x_centered': x_centered,
        'y_normalized': y_normalized,
        's_integerized': s_integerized,
        'x_centered_times_y_normalized': x_centered_times_y_normalized,
    }

Putting it all together

Now we're ready to transform our data. We'll use Apache Beam with a direct runner, and supply three inputs:

  1. raw_data - The raw input data that we created above
  2. raw_data_metadata - The schema for the raw data
  3. preprocessing_fn - The function that we created to do our transformation

In [0]:
def main():
  # Any warnings printed while this runs can be ignored.
  with tft_beam.Context(temp_dir=tempfile.mkdtemp()):
    transformed_dataset, transform_fn = (  # pylint: disable=unused-variable
        (raw_data, raw_data_metadata) | tft_beam.AnalyzeAndTransformDataset(
            preprocessing_fn))

  transformed_data, transformed_metadata = transformed_dataset  # pylint: disable=unused-variable

  print('\nRaw data:\n{}\n'.format(pprint.pformat(raw_data)))
  print('Transformed data:\n{}'.format(pprint.pformat(transformed_data)))

if __name__ == '__main__':
  main()

Is this the right answer?

Previously, we used tf.Transform to do this:

x_centered = x - tft.mean(x)
y_normalized = tft.scale_to_0_1(y)
s_integerized = tft.compute_and_apply_vocabulary(s)
x_centered_times_y_normalized = (x_centered * y_normalized)

x_centered

With input of [1, 2, 3] the mean of x is 2, and we subtract it from x to center our x values at 0. So our result of [-1.0, 0.0, 1.0] is correct.

y_normalized

We wanted to scale our y values between 0 and 1. Our input was [1, 2, 3] so our result of [0.0, 0.5, 1.0] is correct.

s_integerized

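We wanted to map our strings to indexes in a vocabulary, and there were only 2 words in our vocabulary ("hello" and "world"). The vocabulary is ordered by descending frequency, so "hello" (two occurrences) gets index 0 and "world" (one occurrence) gets index 1. So with input of ["hello", "world", "hello"] our result of [0, 1, 0] is correct.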

x_centered_times_y_normalized

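We wanted to create a new feature by crossing x_centered and y_normalized using multiplication. Note that this multiplies the results, not the original values: [-1.0, 0.0, 1.0] times [0.0, 0.5, 1.0], element by element. The first value is -0.0 rather than 0.0 because -1.0 × 0.0 is negative zero in floating-point arithmetic. So our new result of [-0.0, 0.0, 1.0] is correct.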
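
The four checks in this section can also be reproduced by hand. The following is a minimal sketch in plain Python that mirrors the arithmetic described here. It stands in for the analyzers rather than calling tf.Transform, and the alphabetical tie-breaking in the vocabulary is an assumption of the sketch (it does not affect this data).

```python
from collections import Counter

xs = [1.0, 2.0, 3.0]
ys = [1.0, 2.0, 3.0]
ss = ['hello', 'world', 'hello']

# x_centered: subtract the dataset mean.
mean_x = sum(xs) / len(xs)
x_centered = [x - mean_x for x in xs]

# y_normalized: min-max scaling to the range [0, 1].
y_min, y_max = min(ys), max(ys)
y_normalized = [(y - y_min) / (y_max - y_min) for y in ys]

# s_integerized: index into a vocabulary ordered by descending frequency.
counts = Counter(ss)
ordered = sorted(counts, key=lambda w: (-counts[w], w))
s_integerized = [ordered.index(s) for s in ss]

# Cross of the two transformed columns, element by element.
product = [xc * yn for xc, yn in zip(x_centered, y_normalized)]

print(x_centered)     # [-1.0, 0.0, 1.0]
print(y_normalized)   # [0.0, 0.5, 1.0]
print(s_integerized)  # [0, 1, 0]
print(product)        # [-0.0, 0.0, 1.0]
```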