Step 4: Feature Engineering

Use the code below to run TensorFlow Transform on some example data, using the schema produced by your pipeline. Start by importing the required modules and opening the metadata store.


In [ ]:
from __future__ import print_function

import os
import tempfile
import pandas as pd

import tensorflow as tf
import tensorflow_transform as tft
from tensorflow_transform import beam as tft_beam
import tfx_utils  # Helper module provided alongside this notebook for reading ML Metadata
from tfx.utils import io_utils
from tensorflow_metadata.proto.v0 import schema_pb2

# For DatasetMetadata boilerplate
from tensorflow_transform.tf_metadata import dataset_metadata
from tensorflow_transform.tf_metadata import dataset_schema
from tensorflow_transform.tf_metadata import schema_utils
tf.get_logger().propagate = False

def _make_default_sqlite_uri(pipeline_name):
    """Returns the default path of the pipeline's ML Metadata SQLite DB."""
    return os.path.join(os.environ['HOME'], 'airflow/tfx/metadata', pipeline_name, 'metadata.db')

def get_metadata_store(pipeline_name):
    """Opens a read-only view of the pipeline's ML Metadata store."""
    return tfx_utils.TFXReadonlyMetadataStore.from_sqlite_db(_make_default_sqlite_uri(pipeline_name))

pipeline_name = 'taxi'

pipeline_db_path = _make_default_sqlite_uri(pipeline_name)
print('Pipeline DB:\n{}'.format(pipeline_db_path))

store = get_metadata_store(pipeline_name)

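Optionally, sanity-check that the metadata DB exists before querying it; if the check below fails, run the pipeline in Airflow first so the metadata store gets populated. This is just os.path.exists on the path printed above.


In [ ]:
# Optional sanity check: the metadata DB should already exist on disk.
# If this fails, the pipeline has not been run yet.
assert os.path.exists(pipeline_db_path), (
    'Metadata DB not found at {}'.format(pipeline_db_path))
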
Get the schema URI from the metadata store


In [ ]:
# Get the schema URI from the metadata store
schemas = store.get_artifacts_of_type_df(tfx_utils.TFXArtifactTypes.SCHEMA)
assert len(schemas.URI) == 1, 'Expected exactly one Schema artifact in the store'
schema_uri = os.path.join(schemas.URI.iloc[0], 'schema.pbtxt')
print('Schema URI:\n{}'.format(schema_uri))

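If you want to see the raw schema before parsing it, the snippet below (a minimal sketch) reads the pbtxt file directly with tf.io.gfile and prints just the beginning to keep the output short.


In [ ]:
# Optional: peek at the raw schema text. Only the first part of the file is
# printed so the output stays readable.
with tf.io.gfile.GFile(schema_uri) as f:
    print(f.read()[:800])
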
Get the schema that was inferred by TensorFlow Data Validation


In [ ]:
# Parse the schema proto inferred by TFDV, convert it to a feature spec, and
# wrap it in the (legacy) DatasetMetadata format that tft_beam expects.
schema_proto = io_utils.parse_pbtxt_file(file_name=schema_uri, message=schema_pb2.Schema())
feature_spec, domains = schema_utils.schema_as_feature_spec(schema_proto)
legacy_metadata = dataset_metadata.DatasetMetadata(
    dataset_schema.from_feature_spec(feature_spec, domains))

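It can be useful to inspect what TensorFlow Data Validation inferred. The sketch below simply prints each entry of the parsed feature spec so you can see which features are numeric, which are strings, and which arrive as sparse tensors.


In [ ]:
# Optional: inspect the feature spec derived from the inferred schema.
for name in sorted(feature_spec):
    print('{}: {}'.format(name, feature_spec[name]))
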
Define features and create functions for TensorFlow Transform


In [ ]:
# Maximum values for the time-based categorical features:
# trip_start_hour (24), trip_start_day (31), trip_start_month (12).
MAX_CATEGORICAL_FEATURE_VALUES = [24, 31, 12]

CATEGORICAL_FEATURE_KEYS = [
    'trip_start_hour', 'trip_start_day', 'trip_start_month',
    'pickup_census_tract', 'dropoff_census_tract', 'pickup_community_area',
    'dropoff_community_area'
]

DENSE_FLOAT_FEATURE_KEYS = ['trip_miles', 'fare', 'trip_seconds']

# Number of buckets used by tf.Transform for bucketizing each of the
# BUCKET_FEATURE_KEYS below.
FEATURE_BUCKET_COUNT = 10

BUCKET_FEATURE_KEYS = [
    'pickup_latitude', 'pickup_longitude', 'dropoff_latitude',
    'dropoff_longitude'
]

# Number of vocabulary terms used by tf.Transform for encoding each of the
# VOCAB_FEATURE_KEYS below.
VOCAB_SIZE = 1000

# Count of out-of-vocab buckets into which unrecognized VOCAB_FEATURE_KEYS
# values are hashed.
OOV_SIZE = 10

VOCAB_FEATURE_KEYS = [
    'payment_type',
    'company',
]

# The label key and the fare feature used to derive it.
LABEL_KEY = 'tips'
FARE_KEY = 'fare'

def transformed_name(key):
  return key + '_xf'

def _transformed_names(keys):
  return [transformed_name(key) for key in keys]


# tf.Transform treats these features as "raw" (pre-transform) inputs.
def _get_raw_feature_spec(schema):
  return schema_utils.schema_as_feature_spec(schema).feature_spec


def _gzip_reader_fn(filenames):
  """Returns a TFRecordDataset that reads GZIP-compressed TFRecord files."""
  return tf.data.TFRecordDataset(
      filenames,
      compression_type='GZIP')


def _fill_in_missing(x):
  """Replace missing values in a SparseTensor.
  Fills in missing values of `x` with '' or 0, and converts to a dense tensor.
  Args:
    x: A `SparseTensor` of rank 2.  Its dense shape should have size at most 1
      in the second dimension.
  Returns:
    A rank 1 tensor where missing values of `x` have been filled in.
  """
  default_value = '' if x.dtype == tf.string else 0
  return tf.squeeze(
      tf.sparse.to_dense(
          tf.SparseTensor(x.indices, x.values, [x.dense_shape[0], 1]),
          default_value),
      axis=1)


def preprocessing_fn(inputs):
  """tf.transform's callback function for preprocessing inputs.
  Args:
    inputs: map from feature keys to raw not-yet-transformed features.
  Returns:
    Map from string feature key to transformed feature operations.
  """
  outputs = {}
  for key in DENSE_FLOAT_FEATURE_KEYS:
    # Preserve this feature as a dense float scaled to a z-score;
    # _fill_in_missing replaces missing values with 0 first.
    outputs[transformed_name(key)] = tft.scale_to_z_score(
        _fill_in_missing(inputs[key]))

  for key in VOCAB_FEATURE_KEYS:
    # Build a vocabulary for this feature.
    outputs[transformed_name(key)] = tft.compute_and_apply_vocabulary(
        _fill_in_missing(inputs[key]),
        top_k=VOCAB_SIZE,
        num_oov_buckets=OOV_SIZE)

  for key in BUCKET_FEATURE_KEYS:
    outputs[transformed_name(key)] = tft.bucketize(
        _fill_in_missing(inputs[key]), FEATURE_BUCKET_COUNT,
        always_return_num_quantiles=False)

  for key in CATEGORICAL_FEATURE_KEYS:
    outputs[transformed_name(key)] = _fill_in_missing(inputs[key])

  # Was this passenger a big tipper?
  taxi_fare = _fill_in_missing(inputs[FARE_KEY])
  tips = _fill_in_missing(inputs[LABEL_KEY])
  outputs[transformed_name(LABEL_KEY)] = tf.where(
      tf.math.is_nan(taxi_fare),
      tf.cast(tf.zeros_like(taxi_fare), tf.int64),
      # Test if the tip was > 20% of the fare.
      tf.cast(
          tf.greater(tips, tf.multiply(taxi_fare, tf.constant(0.2))), tf.int64))

  return outputs

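To see what _fill_in_missing does, you can feed it a tiny SparseTensor by hand. This is a minimal sketch that assumes eager execution (TF 2.x); under graph-mode TF 1.x you would need to evaluate the result in a session. Row 0 has a value and row 1 is missing, so the missing entry is filled with the default ''.


In [ ]:
# A tiny demo of _fill_in_missing: row 0 has a value, row 1 is missing.
# Assumes eager execution; with graph-mode TF 1.x, evaluate via a session instead.
demo = tf.SparseTensor(indices=[[0, 0]], values=['visa'], dense_shape=[2, 1])
print(_fill_in_missing(demo))  # Expected: a rank-1 string tensor [b'visa', b'']
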
Display the results of transforming some example data


In [ ]:
from IPython.display import display
with tft_beam.Context(temp_dir=tempfile.mkdtemp()):
    raw_examples = [
        {
            "fare": [100.0],
            "trip_start_hour": [12],
            "pickup_census_tract": ['abcd'],
            "dropoff_census_tract": [12345.0],  # No idea why this is a float
            "company": ['taxi inc.'],
            "trip_start_timestamp": [123456],
            "pickup_longitude": [12.0],
            "trip_start_month": [5],
            "trip_miles": [8.0],
            "dropoff_longitude": [12.05],
            "dropoff_community_area": [123],
            "pickup_community_area": [123],
            "payment_type": ['visa'],
            "trip_seconds": [600.0],
            "trip_start_day": [12],
            "tips": [10.0],
            "pickup_latitude": [80.0],
            "dropoff_latitude": [80.01],
        }
    ]
    # Analyze (a full pass over the data to compute vocabularies, means, etc.)
    # and transform the examples in a single step.
    (transformed_examples, transformed_metadata), transform_fn = (
        (raw_examples, legacy_metadata)
        | 'AnalyzeAndTransform' >> tft_beam.AnalyzeAndTransformDataset(
            preprocessing_fn))
    display(pd.DataFrame(transformed_examples))
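
The call above also returns transform_fn, the materialized transform graph. As a sketch of how it can be reused, tft_beam.TransformDataset applies an existing transform_fn to raw data without re-running the analysis pass; here it is simply re-applied to the same raw_examples to show the mechanics.


In [ ]:
# Sketch: reuse the transform_fn produced above to transform raw data without
# re-running analysis. In practice this would be new data with the same schema;
# raw_examples is reused here only to keep the example self-contained.
with tft_beam.Context(temp_dir=tempfile.mkdtemp()):
    transformed_again, _ = (
        ((raw_examples, legacy_metadata), transform_fn)
        | 'TransformOnly' >> tft_beam.TransformDataset())
    display(pd.DataFrame(transformed_again))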