In [ ]:
from __future__ import print_function
import os
import tempfile
import pandas as pd
import tensorflow as tf
import tensorflow_transform as tft
from tensorflow_transform import beam as tft_beam
import tfx_utils
from tfx.utils import io_utils
from tensorflow_metadata.proto.v0 import schema_pb2
# For DatasetMetadata boilerplate
from tensorflow_transform.tf_metadata import dataset_metadata
from tensorflow_transform.tf_metadata import dataset_schema
from tensorflow_transform.tf_metadata import schema_utils
tf.get_logger().propagate = False
def _make_default_sqlite_uri(pipeline_name):
  return os.path.join(os.environ['HOME'], 'airflow/tfx/metadata', pipeline_name, 'metadata.db')


def get_metadata_store(pipeline_name):
  return tfx_utils.TFXReadonlyMetadataStore.from_sqlite_db(_make_default_sqlite_uri(pipeline_name))

pipeline_name = 'taxi'
pipeline_db_path = _make_default_sqlite_uri(pipeline_name)
print('Pipeline DB:\n{}'.format(pipeline_db_path))
store = get_metadata_store(pipeline_name)
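A quick, purely illustrative sanity check: the metadata DB only exists once the taxi pipeline has actually run in Airflow.
In [ ]:
# Illustrative check that the pipeline has populated the metadata store.
assert os.path.exists(pipeline_db_path), 'Metadata DB not found; run the taxi pipeline first.'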
Get the schema URI from the metadata store
In [ ]:
# Get the schema URI from the metadata store
schemas = store.get_artifacts_of_type_df(tfx_utils.TFXArtifactTypes.SCHEMA)
assert len(schemas.URI) == 1
schema_uri = os.path.join(schemas.URI.iloc[0], 'schema.pbtxt')
print('Schema URI:\n{}'.format(schema_uri))
Get the schema that was inferred by TensorFlow Data Validation
In [ ]:
schema_proto = io_utils.parse_pbtxt_file(file_name=schema_uri, message=schema_pb2.Schema())
feature_spec, domains = schema_utils.schema_as_feature_spec(schema_proto)
legacy_metadata = dataset_metadata.DatasetMetadata(dataset_schema.from_feature_spec(feature_spec, domains))
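The feature spec recovered above is a plain dict mapping each feature name to a tf.io.FixedLenFeature or tf.io.VarLenFeature; printing it (illustrative only) shows what tf.Transform will treat as the raw inputs.
In [ ]:
# Illustrative peek at the raw feature spec parsed from the schema.
for name in sorted(feature_spec):
  print('{}: {}'.format(name, feature_spec[name]))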
Define features and create functions for TensorFlow Transform
In [ ]:
# Each categorical feature below is assumed to have a known maximum value in
# the dataset; these bounds correspond to the first three keys (hour, day, month).
MAX_CATEGORICAL_FEATURE_VALUES = [24, 31, 12]

CATEGORICAL_FEATURE_KEYS = [
    'trip_start_hour', 'trip_start_day', 'trip_start_month',
    'pickup_census_tract', 'dropoff_census_tract', 'pickup_community_area',
    'dropoff_community_area'
]

DENSE_FLOAT_FEATURE_KEYS = ['trip_miles', 'fare', 'trip_seconds']

# Number of buckets used by tf.Transform for encoding each bucketized feature.
FEATURE_BUCKET_COUNT = 10

BUCKET_FEATURE_KEYS = [
    'pickup_latitude', 'pickup_longitude', 'dropoff_latitude',
    'dropoff_longitude'
]

# Number of vocabulary terms used by tf.Transform for encoding each vocab feature.
VOCAB_SIZE = 1000

# Number of out-of-vocab buckets into which unrecognized vocab features are hashed.
OOV_SIZE = 10

VOCAB_FEATURE_KEYS = [
    'payment_type',
    'company',
]

# Keys for the label and the fare feature, used when deriving the label below.
LABEL_KEY = 'tips'
FARE_KEY = 'fare'
def transformed_name(key):
  return key + '_xf'


def _transformed_names(keys):
  return [transformed_name(key) for key in keys]


# tf.Transform considers these features as "raw".
def _get_raw_feature_spec(schema):
  return schema_utils.schema_as_feature_spec(schema).feature_spec


def _gzip_reader_fn(filenames):
  """Small utility returning a TFRecordDataset that reads gzip'ed files."""
  return tf.data.TFRecordDataset(filenames, compression_type='GZIP')
def _fill_in_missing(x):
  """Replace missing values in a SparseTensor.

  Fills in missing values of `x` with '' or 0, and converts to a dense tensor.

  Args:
    x: A `SparseTensor` of rank 2. Its dense shape should have size at most 1
      in the second dimension.

  Returns:
    A rank 1 tensor where missing values of `x` have been filled in.
  """
  default_value = '' if x.dtype == tf.string else 0
  return tf.squeeze(
      tf.sparse.to_dense(
          tf.SparseTensor(x.indices, x.values, [x.dense_shape[0], 1]),
          default_value),
      axis=1)
def preprocessing_fn(inputs):
  """tf.Transform's callback function for preprocessing inputs.

  Args:
    inputs: map from feature keys to raw not-yet-transformed features.

  Returns:
    Map from string feature key to transformed feature operations.
  """
  outputs = {}
  for key in DENSE_FLOAT_FEATURE_KEYS:
    # Preserve this feature as a dense float scaled to z-score; missing values
    # are filled in with 0 by _fill_in_missing before scaling.
    outputs[transformed_name(key)] = tft.scale_to_z_score(
        _fill_in_missing(inputs[key]))

  for key in VOCAB_FEATURE_KEYS:
    # Build a vocabulary for this feature.
    outputs[transformed_name(key)] = tft.compute_and_apply_vocabulary(
        _fill_in_missing(inputs[key]),
        top_k=VOCAB_SIZE,
        num_oov_buckets=OOV_SIZE)

  for key in BUCKET_FEATURE_KEYS:
    outputs[transformed_name(key)] = tft.bucketize(
        _fill_in_missing(inputs[key]), FEATURE_BUCKET_COUNT,
        always_return_num_quantiles=False)

  for key in CATEGORICAL_FEATURE_KEYS:
    outputs[transformed_name(key)] = _fill_in_missing(inputs[key])

  # Was this passenger a big tipper?
  taxi_fare = _fill_in_missing(inputs[FARE_KEY])
  tips = _fill_in_missing(inputs[LABEL_KEY])
  outputs[transformed_name(LABEL_KEY)] = tf.where(
      tf.math.is_nan(taxi_fare),
      tf.cast(tf.zeros_like(taxi_fare), tf.int64),
      # Test if the tip was > 20% of the fare.
      tf.cast(
          tf.greater(tips, tf.multiply(taxi_fare, tf.constant(0.2))), tf.int64))

  return outputs
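As a toy illustration (not part of the pipeline), _fill_in_missing densifies a rank-2 SparseTensor whose middle row has no value, filling the gap with 0; under eager execution the filled values print directly.
In [ ]:
# Toy example: row 1 has no value, so it is filled with the default 0.
toy = tf.SparseTensor(indices=[[0, 0], [2, 0]], values=[1.0, 3.0], dense_shape=[3, 1])
print(_fill_in_missing(toy))  # Expected values: [1., 0., 3.]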
Display the results of transforming some example data
In [ ]:
from IPython.display import display
with tft_beam.Context(temp_dir=tempfile.mkdtemp()):
  raw_examples = [
      {
          "fare": [100.0],
          "trip_start_hour": [12],
          "pickup_census_tract": ['abcd'],
          "dropoff_census_tract": [12345.0],  # No idea why this is a float
          "company": ['taxi inc.'],
          "trip_start_timestamp": [123456],
          "pickup_longitude": [12.0],
          "trip_start_month": [5],
          "trip_miles": [8.0],
          "dropoff_longitude": [12.05],
          "dropoff_community_area": [123],
          "pickup_community_area": [123],
          "payment_type": ['visa'],
          "trip_seconds": [600.0],
          "trip_start_day": [12],
          "tips": [10.0],
          "pickup_latitude": [80.0],
          "dropoff_latitude": [80.01],
      }
  ]
  (transformed_examples, transformed_metadata), transform_fn = (
      (raw_examples, legacy_metadata)
      | 'AnalyzeAndTransform' >> tft_beam.AnalyzeAndTransformDataset(
          preprocessing_fn))
display(pd.DataFrame(transformed_examples))
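The transform_fn returned above packages the fitted analyzers (vocabulary, z-score statistics, bucket boundaries) and can be reused to transform further data without re-running analysis. This sketch follows the tft_beam.TransformDataset pattern from the tf.Transform getting-started guide, and assumes the SavedModel behind transform_fn from the previous cell is still on disk.
In [ ]:
# Sketch: reuse the fitted transform_fn on raw examples without re-analyzing.
with tft_beam.Context(temp_dir=tempfile.mkdtemp()):
  reused_examples, _ = (
      ((raw_examples, legacy_metadata), transform_fn)
      | 'TransformOnly' >> tft_beam.TransformDataset())
display(pd.DataFrame(reused_examples))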