The purpose of this tutorial is to show how to do end-to-end ML with TFX libraries on Google Cloud Platform. This part of the tutorial covers data preprocessing and transformation of the Census Income dataset with TensorFlow Transform (TFT), running the transformation as an Apache Beam pipeline on Cloud Dataflow.
This notebook has been tested in Jupyter on the Deep Learning VM.
In [ ]:
import apache_beam as beam
import tensorflow as tf
import tensorflow_data_validation as tfdv
import tensorflow_transform as tft
print('TF version: {}'.format(tf.__version__))
print('TFT version: {}'.format(tft.__version__))
print('TFDV version: {}'.format(tfdv.__version__))
print('Apache Beam version: {}'.format(beam.__version__))
In [ ]:
PROJECT = 'cloud-training-demos' # Replace with your PROJECT
BUCKET = 'cloud-training-demos-ml' # Replace with your BUCKET
REGION = 'us-central1' # Choose an available region for Cloud MLE
import os
os.environ['PROJECT'] = PROJECT
os.environ['BUCKET'] = BUCKET
os.environ['REGION'] = REGION
## ensure we're using python2 env
os.environ['CLOUDSDK_PYTHON'] = 'python2'
In [ ]:
%%bash
gcloud config set project $PROJECT
gcloud config set compute/region $REGION
## ensure we predict locally with our current Python environment
gcloud config set ml_engine/local_python `which python`
The task is to predict whether a person's income exceeds $50K/yr based on census data. This is also known as the "Census Income" (Adult) dataset.
In [ ]:
DATA_DIR='gs://cloud-samples-data/ml-engine/census/data'
In [ ]:
import os
TRAIN_DATA_FILE = os.path.join(DATA_DIR, 'adult.data.csv')
EVAL_DATA_FILE = os.path.join(DATA_DIR, 'adult.test.csv')
!gsutil ls -l $TRAIN_DATA_FILE
!gsutil ls -l $EVAL_DATA_FILE
In [ ]:
HEADER = ['age', 'workclass', 'fnlwgt', 'education', 'education_num',
          'marital_status', 'occupation', 'relationship', 'race', 'gender',
          'capital_gain', 'capital_loss', 'hours_per_week',
          'native_country', 'income_bracket']
TARGET_FEATURE_NAME = 'income_bracket'
TARGET_LABELS = [' <=50K', ' >50K']
WEIGHT_COLUMN_NAME = 'fnlwgt'
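Before transforming the data, it can help to peek at a few raw rows. The cell below is a minimal sketch (not part of the original workflow), assuming pandas is available in this environment; it reads the first rows of the training file from GCS with tf.gfile and uses the HEADER column names defined above, since the CSV files have no header row.
In [ ]:
import pandas as pd
# Read a small sample of the raw training data directly from GCS (sketch only).
with tf.gfile.GFile(TRAIN_DATA_FILE) as f:
    sample_df = pd.read_csv(f, names=HEADER, nrows=5)
sample_df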
For data preprocessing and transformation, we use TensorFlow Transform to perform the following:
- Pass the target feature (income_bracket) through unchanged.
- Scale the instance-weight column (fnlwgt) to be less than 1.0 by dividing by its maximum value.
- Compute a vocabulary for each categorical (string) feature and map its values to integer indices.
- Scale the remaining numeric features to z-scores.
- Bucketize age into 5 buckets using quantiles.
In [ ]:
def make_preprocessing_fn(raw_schema):

    def preprocessing_fn(input_features):
        processed_features = {}

        for feature in raw_schema.feature:
            feature_name = feature.name

            if feature_name in ['income_bracket']:
                # Pass the target feature through unchanged.
                processed_features[feature_name] = input_features[feature_name]
            elif feature_name in ['fnlwgt']:
                # Scale weights to be less than 1.0.
                processed_features[feature_name + "_scaled"] = (
                    tf.cast(input_features[feature_name], tf.float32) /
                    tf.cast(tft.max(input_features[feature_name]), tf.float32))
            elif feature.type == 1:
                # Feature type 1 is BYTES (string) in the TFDV schema proto:
                # extract a vocabulary and integerize categorical features.
                processed_features[feature_name + "_integerized"] = (
                    tft.compute_and_apply_vocabulary(
                        input_features[feature_name], vocab_filename=feature_name))
            else:
                # Normalize numeric features to z-scores.
                processed_features[feature_name + "_scaled"] = tft.scale_to_z_score(
                    input_features[feature_name])

        # Bucketize age using quantiles.
        quantiles = tft.quantiles(input_features["age"], num_buckets=5, epsilon=0.01)
        processed_features["age_bucketized"] = tft.apply_buckets(
            input_features["age"], bucket_boundaries=quantiles)

        return processed_features

    return preprocessing_fn
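To see what these TFT analyzers do before running the full pipeline, the next cell is a small, self-contained sketch (not part of the original pipeline) that applies compute_and_apply_vocabulary and scale_to_z_score to a tiny in-memory dataset using the same TFT 0.13-era APIs. The feature values here are made up for illustration only.
In [ ]:
import tempfile
import tensorflow_transform.beam as tft_beam
from tensorflow_transform.tf_metadata import dataset_metadata
from tensorflow_transform.tf_metadata import dataset_schema

# A few hand-made rows, just to exercise the analyzers (illustrative values).
toy_data = [
    {'age': 25.0, 'workclass': 'Private'},
    {'age': 40.0, 'workclass': 'Self-emp'},
    {'age': 60.0, 'workclass': 'Private'},
]
toy_metadata = dataset_metadata.DatasetMetadata(
    dataset_schema.from_feature_spec({
        'age': tf.FixedLenFeature([], tf.float32),
        'workclass': tf.FixedLenFeature([], tf.string),
    }))

def toy_preprocessing_fn(inputs):
    return {
        # z-score normalization, as done for numeric features above.
        'age_scaled': tft.scale_to_z_score(inputs['age']),
        # vocabulary + integerization, as done for string features above.
        'workclass_integerized': tft.compute_and_apply_vocabulary(inputs['workclass']),
    }

# Analyze and transform the in-memory dataset with the Beam direct runner.
with tft_beam.Context(temp_dir=tempfile.mkdtemp()):
    (toy_transformed, _), _ = (
        (toy_data, toy_metadata)
        | tft_beam.AnalyzeAndTransformDataset(toy_preprocessing_fn))

toy_transformed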
In [ ]:
def run_pipeline(args):
    import tensorflow_transform as tft
    import tensorflow_transform.beam as tft_beam
    import tensorflow_data_validation as tfdv
    from tensorflow_transform.tf_metadata import dataset_metadata
    from tensorflow_transform.tf_metadata import dataset_schema
    from tensorflow_transform.tf_metadata import schema_utils

    pipeline_options = beam.pipeline.PipelineOptions(flags=[], **args)

    raw_schema_location = args['raw_schema_location']
    raw_train_data_location = args['raw_train_data_location']
    raw_eval_data_location = args['raw_eval_data_location']
    transformed_train_data_location = args['transformed_train_data_location']
    transformed_eval_data_location = args['transformed_eval_data_location']
    transform_artifact_location = args['transform_artifact_location']
    temporary_dir = args['temporary_dir']
    runner = args['runner']

    print("Raw schema location: {}".format(raw_schema_location))
    print("Raw train data location: {}".format(raw_train_data_location))
    print("Raw evaluation data location: {}".format(raw_eval_data_location))
    print("Transformed train data location: {}".format(transformed_train_data_location))
    print("Transformed evaluation data location: {}".format(transformed_eval_data_location))
    print("Transform artifact location: {}".format(transform_artifact_location))
    print("Temporary directory: {}".format(temporary_dir))
    print("Runner: {}".format(runner))
    print("")

    # Load the TFDV schema and create a TFT metadata object from it.
    source_raw_schema = tfdv.load_schema_text(raw_schema_location)
    raw_feature_spec = schema_utils.schema_as_feature_spec(source_raw_schema).feature_spec
    raw_metadata = dataset_metadata.DatasetMetadata(
        dataset_schema.from_feature_spec(raw_feature_spec))

    with beam.Pipeline(runner, options=pipeline_options) as pipeline:
        with tft_beam.Context(temporary_dir):

            converter = tft.coders.CsvCoder(column_names=HEADER,
                                            schema=raw_metadata.schema)

            ###### analyze & transform training data #################################
            # Read raw training csv data.
            step = 'Train'
            raw_train_data = (
                pipeline
                | '{} - Read Raw Data'.format(step) >> beam.io.textio.ReadFromText(raw_train_data_location)
                | '{} - Remove Empty Rows'.format(step) >> beam.Filter(lambda line: line)
                | '{} - Decode CSV Data'.format(step) >> beam.Map(converter.decode)
            )

            # Create a train dataset from the data and schema.
            raw_train_dataset = (raw_train_data, raw_metadata)

            # Analyze and transform raw_train_dataset to produce transformed_train_dataset and transform_fn.
            transformed_train_dataset, transform_fn = (
                raw_train_dataset
                | '{} - Analyze & Transform'.format(step) >> tft_beam.AnalyzeAndTransformDataset(
                    make_preprocessing_fn(source_raw_schema))
            )

            # Get data and schema separately from the transformed_train_dataset.
            transformed_train_data, transformed_metadata = transformed_train_dataset

            # Write transformed train data to sink.
            print("Writing transformed training data...")
            _ = (
                transformed_train_data
                | '{} - Write Transformed Data'.format(step) >> beam.io.tfrecordio.WriteToTFRecord(
                    file_path_prefix=transformed_train_data_location,
                    file_name_suffix=".tfrecords",
                    coder=tft.coders.ExampleProtoCoder(transformed_metadata.schema))
            )

            ###### transform evaluation data ##########################################
            # Read raw evaluation csv data.
            step = 'Eval'
            raw_eval_data = (
                pipeline
                | '{} - Read Raw Data'.format(step) >> beam.io.textio.ReadFromText(raw_eval_data_location)
                | '{} - Remove Empty Rows'.format(step) >> beam.Filter(lambda line: line)
                | '{} - Decode CSV Data'.format(step) >> beam.Map(converter.decode)
            )

            # Create an eval dataset from the data and schema.
            raw_eval_dataset = (raw_eval_data, raw_metadata)

            # Transform eval data based on the produced transform_fn.
            transformed_eval_dataset = (
                (raw_eval_dataset, transform_fn)
                | '{} - Transform'.format(step) >> tft_beam.TransformDataset()
            )

            # Get data from the transformed_eval_dataset.
            transformed_eval_data, _ = transformed_eval_dataset

            # Write transformed eval data to sink.
            _ = (
                transformed_eval_data
                | '{} - Write Transformed Data'.format(step) >> beam.io.tfrecordio.WriteToTFRecord(
                    file_path_prefix=transformed_eval_data_location,
                    file_name_suffix=".tfrecords",
                    coder=tft.coders.ExampleProtoCoder(transformed_metadata.schema))
            )

            ###### write transformation metadata ######################################
            # Write transform_fn.
            print("Writing transform artifacts...")
            _ = (
                transform_fn
                | 'Write Transform Artifacts' >> tft_beam.WriteTransformFn(
                    transform_artifact_location)
            )
In [ ]:
!python -m pip freeze | grep tensorflow
In [ ]:
%%writefile setup.py
from setuptools import setup, find_packages

setup(name='tfxdemo',
      version='1.0',
      packages=find_packages(),
      install_requires=['tensorflow-transform==0.13.0',
                        'tensorflow-data-validation==0.13.1'],
)
In [ ]:
#runner = 'DirectRunner'; OUTPUT_DIR = 'output' # on-prem
#runner = 'DirectRunner'; OUTPUT_DIR = 'gs://{}/census/tfx'.format(BUCKET) # hybrid
runner = 'DataflowRunner'; OUTPUT_DIR = 'gs://{}/census/tfx'.format(BUCKET) # on GCP
import datetime
job_name = 'tft-census' + '-' + datetime.datetime.now().strftime('%y%m%d-%H%M%S')
RAW_SCHEMA_LOCATION = 'raw_schema.pbtxt'
TRANSFORM_ARTIFACTS_DIR = os.path.join(OUTPUT_DIR,'transform')
TRANSFORMED_DATA_DIR = os.path.join(OUTPUT_DIR,'transformed')
TEMP_DIR = os.path.join(OUTPUT_DIR, 'tmp')
args = {
    'runner': runner,
    'job_name': job_name,
    'raw_schema_location': RAW_SCHEMA_LOCATION,
    'raw_train_data_location': TRAIN_DATA_FILE,
    'raw_eval_data_location': EVAL_DATA_FILE,
    'transformed_train_data_location': os.path.join(TRANSFORMED_DATA_DIR, "train"),
    'transformed_eval_data_location': os.path.join(TRANSFORMED_DATA_DIR, "eval"),
    'transform_artifact_location': TRANSFORM_ARTIFACTS_DIR,
    'temporary_dir': TEMP_DIR,
    'project': PROJECT,
    'temp_location': TEMP_DIR,
    'staging_location': os.path.join(OUTPUT_DIR, 'staging'),
    'max_num_workers': 8,
    'save_main_session': False,
    'setup_file': './setup.py'
}
In [ ]:
if tf.gfile.Exists(OUTPUT_DIR):
    print("Removing {} contents...".format(OUTPUT_DIR))
    tf.gfile.DeleteRecursively(OUTPUT_DIR)

tf.logging.set_verbosity(tf.logging.ERROR)

print("Running TF Transform pipeline...")
print("Runner: {}".format(runner))
if runner == "DataflowRunner":
    print("Launching Dataflow job {} ...".format(job_name))
    print("Waiting until the Dataflow job finishes...")
print()

run_pipeline(args)

print()
print("Pipeline is done.")
In [ ]:
!gsutil ls $OUTPUT_DIR/*
In [ ]:
!gsutil ls $OUTPUT_DIR/transform/transform_fn
In [ ]:
!gsutil cat $OUTPUT_DIR/transform/transformed_metadata/schema.pbtxt
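The transform artifacts written by WriteTransformFn can be loaded back for training and serving. The cell below is a small sketch (assuming the pipeline above has finished and the artifacts are under TRANSFORM_ARTIFACTS_DIR): tft.TFTransformOutput exposes the transformed feature spec and the computed vocabularies.
In [ ]:
transform_output = tft.TFTransformOutput(TRANSFORM_ARTIFACTS_DIR)
# Feature spec of the transformed examples, useful when building input functions.
print(transform_output.transformed_feature_spec())
# Path of one of the vocabularies computed by compute_and_apply_vocabulary
# (vocab_filename was set to the feature name, e.g. 'workclass').
print(transform_output.vocabulary_file_by_name('workclass'))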
Copyright 2019 Google LLC
Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0.
Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.