In [2]:
# Copyright 2019 Google Inc. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
This notebook demonstrates how TensorFlow Data Validation (TFDV) can be used to analyze and validate structured data, including generating descriptive statistics, inferring and fine-tuning a schema, checking for and fixing anomalies, and detecting drift and skew. It's important to understand your dataset's characteristics, including how it might change over time in your production pipeline. It's also important to look for anomalies in your data, and to compare your training, evaluation, and serving datasets to make sure that they're consistent. TFDV is the tool that helps you achieve this.
You are going to use a variant of the Cover Type dataset. For more information about the dataset, refer to the dataset summary page.
In [ ]:
import os
import tempfile
import tensorflow as tf
import tensorflow_data_validation as tfdv
import time
from apache_beam.options.pipeline_options import PipelineOptions, GoogleCloudOptions, StandardOptions, SetupOptions, DebugOptions, WorkerOptions
from google.protobuf import text_format
from tensorflow_metadata.proto.v0 import schema_pb2, statistics_pb2
print('TensorFlow version: {}'.format(tf.__version__))
print('TensorFlow Data Validation version: {}'.format(tfdv.__version__))
In [ ]:
TRAINING_DATASET='gs://workshop-datasets/covertype/training/dataset.csv'
TRAINING_DATASET_WITH_MISSING_VALUES='gs://workshop-datasets/covertype/training_missing/dataset.csv'
EVALUATION_DATASET='gs://workshop-datasets/covertype/evaluation/dataset.csv'
EVALUATION_DATASET_WITH_ANOMALIES='gs://workshop-datasets/covertype/evaluation_anomalies/dataset.csv'
SERVING_DATASET='gs://workshop-datasets/covertype/serving/dataset.csv'
In [ ]:
LAB_ROOT_FOLDER='/home/mlops-labs/lab-31-tfdv-structured-data'
In [ ]:
PROJECT_ID = 'mlops-workshop'
REGION = 'us-central1'
STAGING_BUCKET = 'gs://{}-staging'.format(PROJECT_ID)
TFDV can compute descriptive statistics that provide a quick overview of the data in terms of the features that are present and the shapes of their value distributions.
Internally, TFDV uses Apache Beam's data-parallel processing framework to scale the computation of statistics over large datasets. For applications that wish to integrate deeper with TFDV (e.g., attach statistics generation at the end of a data-generation pipeline), the API also exposes a Beam PTransform for statistics generation.
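For illustration only, the sketch below shows how statistics generation could be embedded in such a pipeline with the tfdv.GenerateStatistics PTransform. The input path, the output path, and the tf.Example decoding step are assumptions made for this example (and the decoder name can differ between TFDV releases); in this lab you will rely on the higher-level tfdv.generate_statistics_from_csv helper instead.
In [ ]:
# Illustrative sketch only: embed TFDV statistics generation in a custom Beam pipeline.
# INPUT_TFRECORD and STATS_OUTPUT are hypothetical paths, not part of this lab.
import apache_beam as beam
import tensorflow_data_validation as tfdv
from tensorflow_metadata.proto.v0 import statistics_pb2
INPUT_TFRECORD = 'gs://your-bucket/data/examples.tfrecord'    # hypothetical
STATS_OUTPUT = 'gs://your-bucket/stats/train_stats.tfrecord'  # hypothetical
with beam.Pipeline() as pipeline:
    _ = (
        pipeline
        # Read serialized tf.Examples from TFRecord files.
        | 'ReadData' >> beam.io.ReadFromTFRecord(file_pattern=INPUT_TFRECORD)
        # Decode the serialized examples (decoder name may vary by TFDV release).
        | 'DecodeData' >> tfdv.DecodeTFExample()
        # Compute the descriptive statistics in a data-parallel fashion.
        | 'GenerateStatistics' >> tfdv.GenerateStatistics(tfdv.StatsOptions())
        # Persist the DatasetFeatureStatisticsList proto.
        | 'WriteStatsOutput' >> beam.io.WriteToTFRecord(
            STATS_OUTPUT, shard_name_template='',
            coder=beam.coders.ProtoCoder(
                statistics_pb2.DatasetFeatureStatisticsList)))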
Let's start by using tfdv.generate_statistics_from_csv
to compute statistics for the training data split.
Notice that although your dataset is in Google Cloud Storage, you will run your computation locally on the notebook's host, using the Beam DirectRunner. Later in the lab, you will use Cloud Dataflow to calculate statistics on a remote distributed cluster.
In [ ]:
train_stats = tfdv.generate_statistics_from_csv(
data_location=TRAINING_DATASET_WITH_MISSING_VALUES
)
You can now use tfdv.visualize_statistics
to create a visualization of your data. tfdv.visualize_statistics
uses Facets, which provides succinct, interactive visualizations to aid in understanding and analyzing machine learning datasets.
In [ ]:
tfdv.visualize_statistics(train_stats)
The interactive widget you see is Facets Overview.
Now let's use tfdv.infer_schema
to create a schema for the data. A schema defines constraints for the data that are relevant for ML. Example constraints include the data type of each feature, whether it's numerical or categorical, or the frequency of its presence in the data. For categorical features the schema also defines the domain - the list of acceptable values. Since writing a schema can be a tedious task, especially for datasets with lots of features, TFDV provides a method to generate an initial version of the schema based on the descriptive statistics.
In [ ]:
schema = tfdv.infer_schema(train_stats)
tfdv.display_schema(schema=schema)
In general, TFDV uses conservative heuristics to infer stable data properties from the statistics in order to avoid overfitting the schema to the specific dataset. It is strongly advised to review the inferred schema and refine it as needed, to capture any domain knowledge about the data that TFDV's heuristics might have missed.
In our case tfdv.infer_schema
did not interpret the Soil_Type
and Cover_Type
fields properly. Although both fields are encoded as integers, they should be interpreted as categorical rather than numeric.
You can use TFDV to manually update the schema, including specifying which features are categorical and which ones are quantitative, and defining feature domains.
You are going to modify the schema as follows:
- Set Soil_Type and Cover_Type as categorical features. Notice that at this point you don't set the domain of Soil_Type, as enumerating all possible values is not possible without a full scan of the dataset. After you re-generate the statistics using the correct feature designations, you can retrieve the domain from the new statistics and finalize the schema.
- Constrain the values of the Slope feature. You know that the slope is measured in degrees of arc and should be in the 0-90 range.
In [ ]:
# Mark Soil_Type as categorical (BYTES); leave its domain empty for now and
# fill it in after re-generating statistics with the corrected schema.
tfdv.get_feature(schema, 'Soil_Type').type = schema_pb2.FeatureType.BYTES
tfdv.set_domain(schema, 'Soil_Type', schema_pb2.StringDomain(name='Soil_Type', value=[]))
# Mark Cover_Type as a categorical integer feature with values in the 1-7 range.
tfdv.set_domain(schema, 'Cover_Type', schema_pb2.IntDomain(name='Cover_Type', min=1, max=7, is_categorical=True))
# Constrain Slope to the 0-90 degree range.
tfdv.get_feature(schema, 'Slope').type = schema_pb2.FeatureType.FLOAT
tfdv.set_domain(schema, 'Slope', schema_pb2.FloatDomain(name='Slope', min=0, max=90))
tfdv.display_schema(schema=schema)
In [ ]:
stats_options = tfdv.StatsOptions(schema=schema, infer_type_from_schema=True)
train_stats = tfdv.generate_statistics_from_csv(
data_location=TRAINING_DATASET_WITH_MISSING_VALUES,
stats_options=stats_options,
)
tfdv.visualize_statistics(train_stats)
The train_stats
object is an instance of the statistics_pb2.DatasetFeatureStatisticsList
class, a Python wrapper around the statistics.proto
protobuf. You can use the protobuf Python interface to retrieve the generated statistics, including the inferred domains of categorical features.
In [ ]:
soil_type_stats = [feature for feature in train_stats.datasets[0].features if feature.path.step[0]=='Soil_Type'][0].string_stats
soil_type_domain = [bucket.label for bucket in soil_type_stats.rank_histogram.buckets]
tfdv.set_domain(schema, 'Soil_Type', schema_pb2.StringDomain(name='Soil_Type', value=soil_type_domain))
tfdv.display_schema(schema=schema)
Previously, you created descriptive statistics using local compute. This may work for smaller datasets, but for large datasets you may not have enough local compute power. The tfdv.generate_statistics_*
functions can utilize DataflowRunner
to run Beam processing on a distributed Dataflow cluster.
To run TFDV on Google Cloud Dataflow, the TFDV library must be installed on the Dataflow workers. There are different ways to install additional packages on Dataflow. You are going to use the Python setup.py
file approach.
You also configure tfdv.generate_statistics_from_csv
to use the final schema created in the previous steps.
In [ ]:
%%writefile setup.py
from setuptools import setup
setup(
name='tfdv',
description='TFDV Runtime.',
version='0.1',
install_requires=[
'tensorflow_data_validation==0.15.0'
]
)
Re-generate the statistics using Dataflow and the final schema. You can monitor the job progress using the Dataflow UI.
In [ ]:
options = PipelineOptions()
options.view_as(GoogleCloudOptions).project = PROJECT_ID
options.view_as(GoogleCloudOptions).region = REGION
options.view_as(GoogleCloudOptions).job_name = "tfdv-{}".format(time.strftime("%Y%m%d-%H%M%S"))
options.view_as(GoogleCloudOptions).staging_location = STAGING_BUCKET + '/staging/'
options.view_as(GoogleCloudOptions).temp_location = STAGING_BUCKET + '/tmp/'
options.view_as(StandardOptions).runner = 'DataflowRunner'
options.view_as(SetupOptions).setup_file = os.path.join(LAB_ROOT_FOLDER, 'setup.py')
stats_options = tfdv.StatsOptions(schema=schema, infer_type_from_schema=True)
train_stats = tfdv.generate_statistics_from_csv(
data_location=TRAINING_DATASET_WITH_MISSING_VALUES,
stats_options=stats_options,
pipeline_options=options,
output_path=STAGING_BUCKET + '/output/'
)
In [ ]:
tfdv.visualize_statistics(train_stats)
So far we've only been looking at the training data. It's important that our evaluation data is consistent with our training data, including that it uses the same schema. It's also important that the evaluation data includes examples of roughly the same ranges of values for our numerical features as our training data, so that our coverage of the loss surface during evaluation is roughly the same as during training. The same is true for categorical features. Otherwise, we may have training issues that are not identified during evaluation, because we didn't evaluate part of our loss surface.
You will now generate statistics for the evaluation split and visualize both training and evaluation splits on the same chart:
- Pay special attention to the Slope
feature, and notice the difference in the max. Will that cause problems?
In [ ]:
stats_options = tfdv.StatsOptions(schema=schema, infer_type_from_schema=True)
eval_stats = tfdv.generate_statistics_from_csv(
data_location=EVALUATION_DATASET_WITH_ANOMALIES,
stats_options=stats_options
)
tfdv.visualize_statistics(lhs_statistics=eval_stats, rhs_statistics=train_stats,
lhs_name='EVAL DATASET', rhs_name='TRAIN_DATASET')
Does our evaluation dataset match the schema from our training dataset? This is especially important for categorical features, where we want to identify the range of acceptable values.
What would happen if we tried to evaluate using data with categorical feature values that were not in our training dataset? What about numeric features that are outside the ranges in our training dataset?
In [ ]:
anomalies = tfdv.validate_statistics(statistics=eval_stats, schema=schema)
tfdv.display_anomalies(anomalies)
It looks like we have some new values for Soil_Type
and some out-of-range values for Slope
in our evaluation data that we didn't have in our training data. Whether this should be considered an anomaly depends on our domain knowledge of the data. If an anomaly truly indicates a data error, then the underlying data should be fixed. Otherwise, we can simply update the schema to include the values in the eval dataset.
In our case, you are going to add the 5151 value to the domain of Soil_Type
as 5151 is a valid USFS Ecological Landtype Units code representing the unspecified soil type. The out-of-range values in Slope
are data errors and should be fixed at the source.
In [ ]:
tfdv.get_domain(schema, 'Soil_Type').value.append('5151')
Re-validate with the updated schema
In [ ]:
updated_anomalies = tfdv.validate_statistics(eval_stats, schema)
tfdv.display_anomalies(updated_anomalies)
The unexpected string values error in Soil_Type
is gone, but the out-of-range error in Slope
is still there. Let's pretend you have fixed the source and re-validate the evaluation split without the corrupted Slope
values.
In [ ]:
stats_options = tfdv.StatsOptions(schema=schema, infer_type_from_schema=True)
eval_stats = tfdv.generate_statistics_from_csv(
data_location=EVALUATION_DATASET,
stats_options=stats_options
)
updated_anomalies = tfdv.validate_statistics(eval_stats, schema)
tfdv.display_anomalies(updated_anomalies)
In [ ]:
tfdv.display_schema(schema=schema)
In supervised learning we need to include labels in our dataset, but when we serve the model for inference the labels will not be included. In cases like that, introducing slight schema variations is necessary.
For example, in this dataset the Cover_Type
feature is included as the label for training, but it's missing in the serving data. If you validate the serving data statistics against the current schema, you get an anomaly.
In [ ]:
stats_options = tfdv.StatsOptions(schema=schema, infer_type_from_schema=True)
eval_stats = tfdv.generate_statistics_from_csv(
data_location=SERVING_DATASET,
stats_options=stats_options
)
serving_anomalies = tfdv.validate_statistics(eval_stats, schema)
tfdv.display_anomalies(serving_anomalies)
Environments can be used to address such scenarios. In particular, specific features in the schema can be associated with specific environments.
In [ ]:
schema.default_environment.append('TRAINING')
schema.default_environment.append('SERVING')
tfdv.get_feature(schema, 'Cover_Type').not_in_environment.append('SERVING')
If you validate the serving statistics against the SERVING environment in the schema, you will not get the anomaly.
In [ ]:
serving_anomalies = tfdv.validate_statistics(eval_stats, schema, environment='SERVING')
tfdv.display_anomalies(serving_anomalies)
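TFDV can also compare training and serving statistics to detect training/serving skew, one of the capabilities mentioned at the beginning of this notebook. The sketch below is illustrative rather than part of the lab: the L-infinity threshold is an arbitrary assumption, and it reuses the train_stats and eval_stats objects computed above (eval_stats holds the serving-split statistics at this point).
In [ ]:
# Illustrative sketch of training/serving skew detection.
# The 0.01 L-infinity threshold is an assumption, not a lab requirement.
tfdv.get_feature(schema, 'Soil_Type').skew_comparator.infinity_norm.threshold = 0.01
# Compare the training statistics against the serving statistics computed above.
skew_anomalies = tfdv.validate_statistics(
    statistics=train_stats,
    schema=schema,
    serving_statistics=eval_stats)
tfdv.display_anomalies(skew_anomalies)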
In [ ]:
output_dir = os.path.join(tempfile.mkdtemp(),'covertype_schema')
tf.io.gfile.makedirs(output_dir)
schema_file = os.path.join(output_dir, 'schema.pbtxt')
tfdv.write_schema_text(schema, schema_file)
!cat {schema_file}
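In later labs or pipeline runs you can load the saved schema back and reuse it for validation. A minimal sketch, assuming tfdv.load_schema_text is available in this TFDV release:
In [ ]:
# Reload the schema saved above and confirm it round-trips.
reloaded_schema = tfdv.load_schema_text(schema_file)
tfdv.display_schema(schema=reloaded_schema)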
In [ ]: