In [2]:
# Copyright 2019 Google Inc. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#            http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

Analyzing structured data with TensorFlow Data Validation

This notebook demonstrates how TensorFlow Data Validation (TFDV) can be used to analyze and validate structured data, including generating descriptive statistics, inferring and fine-tuning the schema, checking for and fixing anomalies, and detecting drift and skew. It's important to understand your dataset's characteristics, including how it might change over time in your production pipeline. It's also important to look for anomalies in your data, and to compare your training, evaluation, and serving datasets to make sure that they're consistent. TFDV is the tool for these tasks.

You are going to use a variant of the Cover Type dataset. For more information about the dataset, refer to the dataset summary page.

Lab setup

Make sure to set the Jupyter kernel for this notebook to tfx.

Import packages and check the versions


In [ ]:
import os
import tempfile
import tensorflow as tf
import tensorflow_data_validation as tfdv
import time

from apache_beam.options.pipeline_options import PipelineOptions, GoogleCloudOptions, StandardOptions, SetupOptions, DebugOptions, WorkerOptions
from google.protobuf import text_format
from tensorflow_metadata.proto.v0 import schema_pb2, statistics_pb2

print('TensorFlow version: {}'.format(tf.__version__))
print('TensorFlow Data Validation version: {}'.format(tfdv.__version__))

Set the GCS locations of datasets used during the lab


In [ ]:
TRAINING_DATASET='gs://workshop-datasets/covertype/training/dataset.csv'
TRAINING_DATASET_WITH_MISSING_VALUES='gs://workshop-datasets/covertype/training_missing/dataset.csv'
EVALUATION_DATASET='gs://workshop-datasets/covertype/evaluation/dataset.csv'
EVALUATION_DATASET_WITH_ANOMALIES='gs://workshop-datasets/covertype/evaluation_anomalies/dataset.csv'
SERVING_DATASET='gs://workshop-datasets/covertype/serving/dataset.csv'

Set the local path to the lab's folder.


In [ ]:
LAB_ROOT_FOLDER='/home/mlops-labs/lab-31-tfdv-structured-data'

Configure GCP project, region, and staging bucket


In [ ]:
PROJECT_ID = 'mlops-workshop'
REGION = 'us-central1'
STAGING_BUCKET = 'gs://{}-staging'.format(PROJECT_ID)

Computing and visualizing descriptive statistics

TFDV can compute descriptive statistics that provide a quick overview of the data in terms of the features that are present and the shapes of their value distributions.

Internally, TFDV uses Apache Beam's data-parallel processing framework to scale the computation of statistics over large datasets. For applications that wish to integrate deeper with TFDV (e.g., attach statistics generation at the end of a data-generation pipeline), the API also exposes a Beam PTransform for statistics generation.
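
For illustration only (not required for this lab), here is a minimal sketch of embedding the tfdv.GenerateStatistics PTransform in your own Beam pipeline. The bucket paths are hypothetical, and the decoder shown (tfdv.TFExampleDecoder) reflects the TFDV 0.15-era API for TFRecord files of serialized tf.Example protos; check your TFDV version's documentation for the exact input format GenerateStatistics expects.


In [ ]:
# Hedged sketch: attaching TFDV statistics generation to a custom Beam pipeline.
# Paths are hypothetical; newer TFDV releases use TFXIO-based readers instead
# of tfdv.TFExampleDecoder.
import apache_beam as beam
import tensorflow_data_validation as tfdv
from tensorflow_metadata.proto.v0 import statistics_pb2

EXAMPLES = 'gs://my-bucket/examples/data-*.tfrecord'   # hypothetical input
STATS_OUTPUT = 'gs://my-bucket/stats/train_stats'       # hypothetical output

with beam.Pipeline() as pipeline:
    _ = (
        pipeline
        | 'ReadData' >> beam.io.ReadFromTFRecord(file_pattern=EXAMPLES)
        | 'DecodeData' >> tfdv.TFExampleDecoder()
        | 'GenerateStatistics' >> tfdv.GenerateStatistics(tfdv.StatsOptions())
        | 'WriteStatsOutput' >> beam.io.WriteToTFRecord(
            STATS_OUTPUT,
            shard_name_template='',
            coder=beam.coders.ProtoCoder(
                statistics_pb2.DatasetFeatureStatisticsList)))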

Let's start by using tfdv.generate_statistics_from_csv to compute statistics for the training data split.

Notice that although your dataset is in Google Cloud Storage, you will run your computation locally on the notebook's host, using the Beam DirectRunner. Later in the lab, you will use Cloud Dataflow to calculate statistics on a remote distributed cluster.


In [ ]:
train_stats = tfdv.generate_statistics_from_csv(
    data_location=TRAINING_DATASET_WITH_MISSING_VALUES
)

You can now use tfdv.visualize_statistics to create a visualization of your data. tfdv.visualize_statistics uses Facets, which provides succinct, interactive visualizations to aid in understanding and analyzing machine learning datasets.


In [ ]:
tfdv.visualize_statistics(train_stats)

The interactive widget you see is Facets Overview.

  • Numeric features and categorical features are visualized separately, including charts showing the distributions for each feature.
  • Features with missing or zero values display a percentage in red as a visual indicator that there may be issues with examples in those features. The percentage is the percentage of examples that have missing or zero values for that feature.
  • Try clicking "expand" above the charts to change the display
  • Try hovering over bars in the charts to display bucket ranges and counts
  • Try switching between the log and linear scales
  • Try selecting "quantiles" from the "Chart to show" menu, and hover over the markers to show the quantile percentages

Inferring the schema

Now let's use tfdv.infer_schema to create a schema for the data. A schema defines constraints for the data that are relevant for ML. Example constraints include the data type of each feature, whether it's numerical or categorical, or the frequency of its presence in the data. For categorical features the schema also defines the domain - the list of acceptable values. Since writing a schema can be a tedious task, especially for datasets with lots of features, TFDV provides a method to generate an initial version of the schema based on the descriptive statistics.

Infer the schema from the training dataset statistics


In [ ]:
schema = tfdv.infer_schema(train_stats)
tfdv.display_schema(schema=schema)

In general, TFDV uses conservative heuristics to infer stable data properties from the statistics in order to avoid overfitting the schema to the specific dataset. It is strongly advised to review the inferred schema and refine it as needed, to capture any domain knowledge about the data that TFDV's heuristics might have missed.

In our case, tfdv.infer_schema did not interpret the Soil_Type and Cover_Type fields properly. Although both fields are encoded as integers, they should be interpreted as categorical rather than numeric.

You can use TFDV to manually update the schema, including specifying which features are categorical and which are numeric, and defining feature domains.

Fine-tune the schema

You are going to modify the schema:

  • Mark Soil_Type and Cover_Type as categorical features. Notice that at this point you don't set the domain of Soil_Type, as enumerating all possible values is not possible without a full scan of the dataset. After you re-generate the statistics using the correct feature designations, you can retrieve the domain from the new statistics and finalize the schema.
  • Set constraints on the values of the Slope feature. You know that the slope is measured in degrees of arc and should be in the 0-90 range.

In [ ]:
# Interpret Soil_Type as categorical (BYTES), with an empty domain for now.
tfdv.get_feature(schema, 'Soil_Type').type = schema_pb2.FeatureType.BYTES
tfdv.set_domain(schema, 'Soil_Type', schema_pb2.StringDomain(name='Soil_Type', value=[]))

# Interpret Cover_Type as a categorical integer with values in the 1-7 range.
tfdv.set_domain(schema, 'Cover_Type', schema_pb2.IntDomain(name='Cover_Type', min=1, max=7, is_categorical=True))

# Constrain Slope to the 0-90 degree range.
tfdv.get_feature(schema, 'Slope').type = schema_pb2.FeatureType.FLOAT
tfdv.set_domain(schema, 'Slope', schema_pb2.FloatDomain(name='Slope', min=0, max=90))

tfdv.display_schema(schema=schema)

Generate new statistics using the updated schema.


In [ ]:
stats_options = tfdv.StatsOptions(schema=schema, infer_type_from_schema=True)

train_stats = tfdv.generate_statistics_from_csv(
    data_location=TRAINING_DATASET_WITH_MISSING_VALUES,
    stats_options=stats_options,
)

tfdv.visualize_statistics(train_stats)

Finalize the schema

The train_stats object is an instance of the DatasetFeatureStatisticsList class from the statistics_pb2 module, which is a Python wrapper around the statistics.proto protobuf. You can use the protobuf Python interface to retrieve the generated statistics, including the inferred domains of categorical features.


In [ ]:
# Locate the Soil_Type feature statistics in the first (and only) dataset slice.
soil_type_stats = [feature for feature in train_stats.datasets[0].features if feature.path.step[0]=='Soil_Type'][0].string_stats
# The rank histogram buckets enumerate the categorical values observed in the data.
soil_type_domain = [bucket.label for bucket in soil_type_stats.rank_histogram.buckets]

tfdv.set_domain(schema, 'Soil_Type', schema_pb2.StringDomain(name='Soil_Type', value=soil_type_domain))
tfdv.display_schema(schema=schema)

Creating statistics using Cloud Dataflow

Previously, you created descriptive statistics using local compute. This may work for smaller datasets, but for large datasets you may not have enough local compute power. The tfdv.generate_statistics_* functions can use the DataflowRunner to run Beam processing on a distributed Dataflow cluster.

To run TFDV on Google Cloud Dataflow, the TFDV library must be installed on the Dataflow workers. There are different ways to install additional packages on Dataflow. You are going to use the Python setup.py file approach.

You also configure tfdv.generate_statistics_from_csv to use the final schema created in the previous steps.

Configure Dataflow settings

Create the setup.py configured to install TFDV.


In [ ]:
%%writefile setup.py

from setuptools import setup

setup(
    name='tfdv',
    description='TFDV Runtime.',
    version='0.1',
    install_requires=[
      'tensorflow_data_validation==0.15.0'
    ]
)

Regenerate statistics

Re-generate the statistics using Dataflow and the final schema. You can monitor the job progress using the Dataflow UI.


In [ ]:
options = PipelineOptions()
options.view_as(GoogleCloudOptions).project = PROJECT_ID
options.view_as(GoogleCloudOptions).region = REGION
options.view_as(GoogleCloudOptions).job_name = "tfdv-{}".format(time.strftime("%Y%m%d-%H%M%S"))
options.view_as(GoogleCloudOptions).staging_location = STAGING_BUCKET + '/staging/'
options.view_as(GoogleCloudOptions).temp_location = STAGING_BUCKET + '/tmp/'
options.view_as(StandardOptions).runner = 'DataflowRunner'
options.view_as(SetupOptions).setup_file = os.path.join(LAB_ROOT_FOLDER, 'setup.py')

stats_options = tfdv.StatsOptions(schema=schema, infer_type_from_schema=True)

train_stats = tfdv.generate_statistics_from_csv(
    data_location=TRAINING_DATASET_WITH_MISSING_VALUES,
    stats_options=stats_options,
    pipeline_options=options,
    output_path=STAGING_BUCKET + '/output/'
)

In [ ]:
tfdv.visualize_statistics(train_stats)

Analyzing evaluation data

So far we've only been looking at the training data. It's important that our evaluation data is consistent with our training data, including that it uses the same schema. It's also important that the evaluation data includes examples of roughly the same ranges of values for our numerical features as our training data, so that our coverage of the loss surface during evaluation is roughly the same as during training. The same is true for categorical features. Otherwise, we may have training issues that are not identified during evaluation, because we didn't evaluate part of our loss surface.

You will now generate statistics for the evaluation split and visualize both training and evaluation splits on the same chart:

  • The training and evaluation datasets overlay, making it easy to compare them.
  • The charts now include a percentages view, which can be combined with log or the default linear scales.
  • Click expand on the Numeric Features chart, and select the log scale. Review the Slope feature, and notice the difference in the max. Will that cause problems?

In [ ]:
stats_options = tfdv.StatsOptions(schema=schema, infer_type_from_schema=True)

eval_stats = tfdv.generate_statistics_from_csv(
    data_location=EVALUATION_DATASET_WITH_ANOMALIES,
    stats_options=stats_options
)

tfdv.visualize_statistics(lhs_statistics=eval_stats, rhs_statistics=train_stats,
                         lhs_name='EVAL DATASET', rhs_name='TRAIN DATASET')

Checking for anomalies

Does our evaluation dataset match the schema from our training dataset? This is especially important for categorical features, where we want to identify the range of acceptable values.

What would happen if we tried to evaluate using data with categorical feature values that were not in our training dataset? What about numeric features that are outside the ranges in our training dataset?


In [ ]:
anomalies = tfdv.validate_statistics(statistics=eval_stats, schema=schema)
tfdv.display_anomalies(anomalies)

Fixing evaluation anomalies in the schema

It looks like we have some new values for Soil_Type and some out-of-range values for Slope in our evaluation data that we didn't have in our training data. Whether this should be considered an anomaly depends on our domain knowledge of the data. If an anomaly truly indicates a data error, then the underlying data should be fixed. Otherwise, we can simply update the schema to include the values in the eval dataset.

In our case, you are going to add the 5151 value to the domain of Soil_Type, as 5151 is a valid USFS Ecological Landtype Units code representing the unspecified soil type. The out-of-range values in Slope are data errors and should be fixed at the source.


In [ ]:
tfdv.get_domain(schema, 'Soil_Type').value.append('5151')

Re-validate with the updated schema


In [ ]:
updated_anomalies = tfdv.validate_statistics(eval_stats, schema)
tfdv.display_anomalies(updated_anomalies)

The unexpected string values error in Soil_Type is gone, but the out-of-range error in Slope is still there. Let's pretend you have fixed the data at the source and re-validate the evaluation split without the corrupted Slope values.


In [ ]:
stats_options = tfdv.StatsOptions(schema=schema, infer_type_from_schema=True)

eval_stats = tfdv.generate_statistics_from_csv(
    data_location=EVALUATION_DATASET,
    stats_options=stats_options
)
updated_anomalies = tfdv.validate_statistics(eval_stats, schema)
tfdv.display_anomalies(updated_anomalies)

In [ ]:
tfdv.display_schema(schema=schema)

Schema environments

In supervised learning we need to include labels in our dataset, but when we serve the model for inference the labels will not be included. In cases like this, introducing slight schema variations is necessary.

For example, in this dataset the Cover_Type feature is included as the label for training, but it's missing in the serving data. If you validate the serving data statistics against the current schema, you get an anomaly.


In [ ]:
stats_options = tfdv.StatsOptions(schema=schema, infer_type_from_schema=True)

serving_stats = tfdv.generate_statistics_from_csv(
    data_location=SERVING_DATASET,
    stats_options=stats_options
)
serving_anomalies = tfdv.validate_statistics(serving_stats, schema)
tfdv.display_anomalies(serving_anomalies)

Environments can be used to address such scenarios. In particular, specific features in the schema can be associated with specific environments.


In [ ]:
schema.default_environment.append('TRAINING')
schema.default_environment.append('SERVING')
tfdv.get_feature(schema, 'Cover_Type').not_in_environment.append('SERVING')

If you validate the serving statistics against the serving environment in the schema, you will not get an anomaly.


In [ ]:
serving_anomalies = tfdv.validate_statistics(serving_stats, schema, environment='SERVING')
tfdv.display_anomalies(serving_anomalies)

Freezing the schema

When the schema is finalized, it can be saved as a text file and managed under source control like any other code artifact.


In [ ]:
output_dir = os.path.join(tempfile.mkdtemp(),'covertype_schema')

tf.io.gfile.makedirs(output_dir)
schema_file = os.path.join(output_dir, 'schema.pbtxt')
tfdv.write_schema_text(schema, schema_file)

!cat {schema_file}
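
Downstream jobs can load the frozen schema back from the text file. Here is a minimal sketch using tfdv.load_schema_text; the reloaded_schema name is just an illustrative variable.


In [ ]:
# Load the frozen schema back from the pbtxt file written above.
reloaded_schema = tfdv.load_schema_text(schema_file)
tfdv.display_schema(schema=reloaded_schema)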

In [ ]: