ML with TensorFlow Extended (TFX) -- Part 2

The purpose of this tutorial is to show how to do end-to-end ML with TFX libraries on Google Cloud Platform. This tutorial covers:

  1. Data analysis and schema generation with TF Data Validation.
  2. Data preprocessing with TF Transform.
  3. Model training with TF Estimator.
  4. Model evaluation with TF Model Analysis.

This notebook has been tested in Jupyter on the Deep Learning VM.

Cloud environment


In [ ]:
import apache_beam as beam
import tensorflow as tf
import tensorflow_data_validation as tfdv
import tensorflow_transform as tft

print('TF version: {}'.format(tf.__version__))
print('TFT version: {}'.format(tft.__version__))
print('TFDV version: {}'.format(tfdv.__version__))
print('Apache Beam version: {}'.format(beam.__version__))

In [ ]:
PROJECT = 'cloud-training-demos'    # Replace with your PROJECT
BUCKET = 'cloud-training-demos-ml'  # Replace with your BUCKET
REGION = 'us-central1'              # Choose an available region for Cloud MLE

import os

os.environ['PROJECT'] = PROJECT
os.environ['BUCKET'] = BUCKET
os.environ['REGION'] = REGION

## ensure we're using python2 env
os.environ['CLOUDSDK_PYTHON'] = 'python2'

In [ ]:
%%bash
gcloud config set project $PROJECT
gcloud config set compute/region $REGION

## ensure we predict locally with our current Python environment
gcloud config set ml_engine/local_python `which python`

UCI Adult Dataset: https://archive.ics.uci.edu/ml/datasets/adult

Predict whether income exceeds $50K/yr based on census data. Also known as "Census Income" dataset.


In [ ]:
DATA_DIR='gs://cloud-samples-data/ml-engine/census/data'

In [ ]:
import os

TRAIN_DATA_FILE = os.path.join(DATA_DIR, 'adult.data.csv')
EVAL_DATA_FILE = os.path.join(DATA_DIR, 'adult.test.csv')
!gsutil ls -l $TRAIN_DATA_FILE
!gsutil ls -l $EVAL_DATA_FILE
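
To get a feel for the raw data, you can peek at the first few rows directly from GCS (the column names are defined in the next cell):


In [ ]:
!gsutil cat $TRAIN_DATA_FILE | head -5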

In [ ]:
HEADER = ['age', 'workclass', 'fnlwgt', 'education', 'education_num',
               'marital_status', 'occupation', 'relationship', 'race', 'gender',
               'capital_gain', 'capital_loss', 'hours_per_week',
               'native_country', 'income_bracket']

TARGET_FEATURE_NAME = 'income_bracket'
TARGET_LABELS = [' <=50K', ' >50K']
WEIGHT_COLUMN_NAME = 'fnlwgt'
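
The transform pipeline in section 2.2 expects a raw data schema, raw_schema.pbtxt, generated with TF Data Validation during the data analysis step. If that file is not already available locally, the cell below is a minimal sketch of one way to produce it (the statistics options may differ from the original analysis step):


In [ ]:
# Minimal sketch: infer a raw schema with TFDV and save it as text.
# Only needed if raw_schema.pbtxt from the data analysis step is not already present.
train_stats = tfdv.generate_statistics_from_csv(
    data_location=TRAIN_DATA_FILE, column_names=HEADER)
raw_schema = tfdv.infer_schema(statistics=train_stats)
tfdv.write_schema_text(raw_schema, 'raw_schema.pbtxt')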

2. Data Preprocessing

For data preprocessing and transformation, we use TensorFlow Transform to perform the following:

  1. Implement the transformation logic in preprocess_fn.
  2. Analyze and transform training data.
  3. Transform evaluation data.
  4. Save transformed data, transform schema, and transform logic.

2.1 Implement preprocess_fn


In [ ]:
def make_preprocessing_fn(raw_schema):

  def preprocessing_fn(input_features):

    processed_features = {}

    for feature in raw_schema.feature:
      feature_name = feature.name
      
      if feature_name in ['income_bracket']:
        processed_features[feature_name] = input_features[feature_name]
      elif feature_name in ['fnlwgt']:
        # Scale the instance weight to [0, 1] by dividing by its maximum value.
        processed_features[feature_name + "_scaled"] = (
            tf.cast(input_features[feature_name], tf.float32) / 
            tf.cast(tft.max(input_features[feature_name]), tf.float32))
      elif feature.type == 1:
        # Feature type 1 is BYTES (string) in the TFDV schema proto.
        # Extract a vocabulary and integerize categorical features.
        processed_features[feature_name+"_integerized"] = (
            tft.compute_and_apply_vocabulary(input_features[feature_name], vocab_filename=feature_name))
      else:
        # Normalize numeric features with a z-score.
        processed_features[feature_name+"_scaled"] = tft.scale_to_z_score(input_features[feature_name])

    # Bucketize age using quantiles. 
    quantiles = tft.quantiles(input_features["age"], num_buckets=5, epsilon=0.01)
    processed_features["age_bucketized"] = tft.apply_buckets(
      input_features["age"], bucket_boundaries=quantiles)

    return processed_features

  return preprocessing_fn
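
The preprocessing_fn keys its behavior off the TFDV schema: the target and the weight column are special-cased, string features (feature.type == 1, i.e. BYTES in the schema proto) are integerized against a vocabulary, and the remaining numeric features are z-score scaled; age is additionally bucketized by quantiles. The cell below is a small sketch that prints this mapping for each raw column, assuming raw_schema.pbtxt is available locally:


In [ ]:
# Minimal sketch: list which transformed feature each raw column maps to.
raw_schema = tfdv.load_schema_text('raw_schema.pbtxt')
for feature in raw_schema.feature:
  if feature.name == TARGET_FEATURE_NAME:
    print('{} -> {} (passed through)'.format(feature.name, feature.name))
  elif feature.name == WEIGHT_COLUMN_NAME:
    print('{} -> {}_scaled (divided by the max weight)'.format(feature.name, feature.name))
  elif feature.type == 1:  # BYTES (string) in the schema proto.
    print('{} -> {}_integerized (vocabulary lookup)'.format(feature.name, feature.name))
  else:
    print('{} -> {}_scaled (z-score)'.format(feature.name, feature.name))
print('age -> age_bucketized (5 quantile buckets, in addition to age_scaled)')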

2.2 Implement the Beam pipeline


In [ ]:
def run_pipeline(args):
  import tensorflow_transform as tft
  import tensorflow_transform.beam as tft_beam
  import tensorflow_data_validation as tfdv
  from tensorflow_transform.tf_metadata import dataset_metadata
  from tensorflow_transform.tf_metadata import dataset_schema
  from tensorflow_transform.tf_metadata import schema_utils
    
  pipeline_options = beam.pipeline.PipelineOptions(flags=[], **args)
    
  raw_schema_location = args['raw_schema_location']
  raw_train_data_location = args['raw_train_data_location']
  raw_eval_data_location = args['raw_eval_data_location']
  transformed_train_data_location = args['transformed_train_data_location']
  transformed_eval_data_location = args['transformed_eval_data_location']
  transform_artifact_location = args['transform_artifact_location']
  temporary_dir = args['temporary_dir']
  runner = args['runner']
    
  print ("Raw schema location: {}".format(raw_schema_location))
  print ("Raw train data location: {}".format(raw_train_data_location))
  print ("Raw evaluation data location: {}".format(raw_eval_data_location))
  print ("Transformed train data location: {}".format(transformed_train_data_location))
  print ("Transformed evaluation data location: {}".format(transformed_eval_data_location))
  print ("Transform artifact location: {}".format(transform_artifact_location))
  print ("Temporary directory: {}".format(temporary_dir))
  print ("Runner: {}".format(runner))
  print ("")

  # Load TFDV schema and create tft schema from it.
  source_raw_schema = tfdv.load_schema_text(raw_schema_location)
  raw_feature_spec = schema_utils.schema_as_feature_spec(source_raw_schema).feature_spec
  raw_metadata = dataset_metadata.DatasetMetadata(
    dataset_schema.from_feature_spec(raw_feature_spec))

  with beam.Pipeline(runner, options=pipeline_options) as pipeline:
    with tft_beam.Context(temporary_dir):
      
      converter = tft.coders.CsvCoder(column_names=HEADER, 
        schema=raw_metadata.schema)

      ###### analyze & transform training data #################################

      # Read raw training csv data.
      step = 'Train'
      raw_train_data = (
        pipeline
          | '{} - Read Raw Data'.format(step) >> beam.io.textio.ReadFromText(raw_train_data_location)
          | '{} - Remove Empty Rows'.format(step) >> beam.Filter(lambda line: line)
          | '{} - Decode CSV Data'.format(step) >> beam.Map(converter.decode)
        )
      
      # Create a train dataset from the data and schema.
      raw_train_dataset = (raw_train_data, raw_metadata)

      # Analyze and transform raw_train_dataset to produce the transformed_train_dataset and the transform_fn.
      transformed_train_dataset, transform_fn = (
        raw_train_dataset 
        | '{} - Analyze & Transform'.format(step) >> tft_beam.AnalyzeAndTransformDataset(
              make_preprocessing_fn(source_raw_schema))
      )
  
      # Get data and schema separately from the transformed_train_dataset.
      transformed_train_data, transformed_metadata = transformed_train_dataset

      # write transformed train data to sink.
      print ("Writing transformed training data...")
      _ = (
        transformed_train_data 
          | '{} - Write Transformed Data'.format(step) >> beam.io.tfrecordio.WriteToTFRecord(
            file_path_prefix=transformed_train_data_location,
            file_name_suffix=".tfrecords",
            coder=tft.coders.ExampleProtoCoder(transformed_metadata.schema))
        )

      ###### transform evaluation data #########################################

      # Read raw evaluation csv data.
      step = 'Eval'
      raw_eval_data = (
        pipeline
          | '{} - Read Raw Data'.format(step) >> beam.io.textio.ReadFromText(raw_eval_data_location)
          | '{} - Remove Empty Rows'.format(step) >> beam.Filter(lambda line: line)
          | '{} - Decode CSV Data'.format(step) >> beam.Map(converter.decode)
        )
      
      # Create an eval dataset from the data and schema.
      raw_eval_dataset = (raw_eval_data, raw_metadata)

      # Transform eval data based on produced transform_fn.
      transformed_eval_dataset = (
        (raw_eval_dataset, transform_fn) 
          | '{} - Transform'.format(step) >> tft_beam.TransformDataset()
      )

      # Get data from the transformed_eval_dataset.
      transformed_eval_data, _ = transformed_eval_dataset

      # Write transformed eval data to sink.
      _ = (
          transformed_eval_data 
          | '{} - Write Transformed Data'.format(step) >> beam.io.tfrecordio.WriteToTFRecord(
              file_path_prefix=transformed_eval_data_location,
              file_name_suffix=".tfrecords",
              coder=tft.coders.ExampleProtoCoder(transformed_metadata.schema))
        )

      ###### write transformation metadata #######################################################

      # Write transform_fn.
      print ("Writing transform artifacts...")
      _ = (
          transform_fn 
          | 'Write Transform Artifacts' >> tft_beam.WriteTransformFn(
              transform_artifact_location)
      )

2.3 Run the data transformation pipeline


In [ ]:
!python -m pip freeze | grep tensorflow

In [ ]:
%%writefile setup.py
from setuptools import setup, find_packages

setup(name='tfxdemo',
      version='1.0',
      packages=find_packages(),
      install_requires=['tensorflow-transform==0.13.0', 
                        'tensorflow-data-validation==0.13.1'],
)

In [ ]:
#runner = 'DirectRunner'; OUTPUT_DIR = 'output'   # on-prem
#runner = 'DirectRunner'; OUTPUT_DIR = 'gs://{}/census/tfx'.format(BUCKET)  # hybrid
runner = 'DataflowRunner'; OUTPUT_DIR = 'gs://{}/census/tfx'.format(BUCKET)  # on GCP
import datetime
job_name = 'tft-census' + '-' + datetime.datetime.now().strftime('%y%m%d-%H%M%S')

RAW_SCHEMA_LOCATION = 'raw_schema.pbtxt'
TRANSFORM_ARTIFACTS_DIR = os.path.join(OUTPUT_DIR,'transform')
TRANSFORMED_DATA_DIR = os.path.join(OUTPUT_DIR,'transformed')
TEMP_DIR = os.path.join(OUTPUT_DIR, 'tmp')

args = {
    
    'runner': runner,
    'job_name': job_name,

    'raw_schema_location': RAW_SCHEMA_LOCATION,

    'raw_train_data_location': TRAIN_DATA_FILE,
    'raw_eval_data_location': EVAL_DATA_FILE,

    'transformed_train_data_location':  os.path.join(TRANSFORMED_DATA_DIR, "train"),
    'transformed_eval_data_location':  os.path.join(TRANSFORMED_DATA_DIR, "eval"),
    'transform_artifact_location':  TRANSFORM_ARTIFACTS_DIR,
    
    'temporary_dir': TEMP_DIR,
    'project': PROJECT,
    'temp_location': TEMP_DIR,
    'staging_location': os.path.join(OUTPUT_DIR, 'staging'),
    'max_num_workers': 8,
    'save_main_session': False,
    'setup_file': './setup.py'
}

In [ ]:
if tf.gfile.Exists(OUTPUT_DIR):
  print("Removing {} contents...".format(OUTPUT_DIR))
  tf.gfile.DeleteRecursively(OUTPUT_DIR)

tf.logging.set_verbosity(tf.logging.ERROR)
print("Running TF Transform pipeline...")
print("Runner: {}".format(runner))
if runner == "DataflowRunner":
    print("Launching Dataflow job {} ...".format(job_name))
    print("Waitting until the Dataflow job finishes...")
print()
run_pipeline(args)
print()
print("Pipeline is done.")

Check the outputs


In [ ]:
!gsutil ls $OUTPUT_DIR/*

In [ ]:
!gsutil ls $OUTPUT_DIR/transform/transform_fn

In [ ]:
!gsutil cat $OUTPUT_DIR/transform/transformed_metadata/schema.pbtxt
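
The model training step can consume these artifacts: the transformed schema drives the training input function, and the saved transform graph can be applied to raw features at serving time. The cell below is a minimal sketch of loading them with tft.TFTransformOutput:


In [ ]:
# Minimal sketch: load the transform artifacts written by WriteTransformFn.
tft_output = tft.TFTransformOutput(TRANSFORM_ARTIFACTS_DIR)

# Feature spec for parsing the transformed TFRecords during training.
print(tft_output.transformed_feature_spec())

# Vocabulary file produced by compute_and_apply_vocabulary for a categorical column.
print(tft_output.vocabulary_file_by_name('workclass'))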

License

Copyright 2019 Google LLC

Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0.

Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.


This is not an official Google product. The sample code is provided for educational purposes only.