This tutorial shows how to use TensorFlow Data Validation (TFDV) to identify and analyze different types of data skew in request-response serving data logged by AI Platform Prediction in BigQuery.
The tutorial has three parts:
Part 1: Producing baseline statistics and a reference schema
Part 2: Detecting data skew
Part 3: Analyzing statistics and anomalies
We use the covertype dataset from the UCI Machine Learning Repository.
The dataset has been preprocessed, split, and uploaded to the public GCS location gs://workshop-datasets/covertype.
We use this version of the preprocessed dataset in this notebook. For more information, see Cover Type Dataset.
We use the training data split to generate a reference schema and baseline statistics, which we then use to validate the serving data.
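At a high level, the notebook implements the following TFDV workflow (a sketch in comments; each step is executed in its own cell below):
In [0]:
# High-level sketch of the workflow in this notebook:
# 1. baseline_stats   = tfdv.generate_statistics_from_csv(...)  # training split
# 2. reference_schema = tfdv.infer_schema(baseline_stats)       # plus manual curation
# 3. serving statistics are generated per time window by slicing on 'time'
# 4. anomalies        = tfdv.validate_statistics(
#        serving_stats, schema=reference_schema,
#        previous_statistics=baseline_stats)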
In [0]:
!pip install -U -q tensorflow
!pip install -U -q tensorflow_data_validation
!pip install -U -q pandas
In [0]:
# Automatically restart kernel after installs
import IPython
app = IPython.Application.instance()
app.kernel.do_shutdown(True)
In [0]:
PROJECT_ID = "sa-data-validation"
BUCKET = "sa-data-validation"
BQ_DATASET_NAME = 'prediction_logs'
BQ_VIEW_NAME = 'vw_covertype_classifier_logs_v1'
MODEL_NAME = 'covertype_classifier'
MODEL_VERSION = 'v1'
!gcloud config set project $PROJECT_ID
In [0]:
try:
    from google.colab import auth
    auth.authenticate_user()
    print("Colab user is authenticated.")
except ImportError:
    pass  # Not running on Colab; assume the environment is already authenticated.
In [0]:
import os
import tensorflow as tf
import tensorflow_data_validation as tfdv
from tensorflow_metadata.proto.v0 import schema_pb2, statistics_pb2, anomalies_pb2
import apache_beam as beam
import pandas as pd
from datetime import datetime
import json
import numpy as np
import warnings
warnings.filterwarnings("ignore", category=FutureWarning)
print("TF version: {}".format(tf.__version__))
print("TFDV version: {}".format(tfdv.__version__))
print("Beam version: {}".format(beam.__version__))
In [0]:
WORKSPACE = './workspace'
DATA_DIR = os.path.join(WORKSPACE, 'data')
TRAIN_DATA = os.path.join(DATA_DIR, 'train.csv')
if tf.io.gfile.exists(WORKSPACE):
    print("Removing previous workspace artifacts...")
    tf.io.gfile.rmtree(WORKSPACE)

print("Creating a new workspace...")
tf.io.gfile.makedirs(WORKSPACE)
tf.io.gfile.makedirs(DATA_DIR)
In [0]:
!gsutil cp gs://workshop-datasets/covertype/data_validation/training/dataset.csv {TRAIN_DATA}
!wc -l {TRAIN_DATA}
In [0]:
sample = pd.read_csv(TRAIN_DATA).head()
sample.T
In [0]:
baseline_stats = tfdv.generate_statistics_from_csv(
    data_location=TRAIN_DATA,
    stats_options=tfdv.StatsOptions(
        sample_count=10000
    )
)
In [0]:
reference_schema = tfdv.infer_schema(baseline_stats)

# Set Soil_Type to be categorical
tfdv.set_domain(reference_schema, 'Soil_Type', schema_pb2.IntDomain(
    name='Soil_Type', is_categorical=True))

# Set Cover_Type to be categorical
tfdv.set_domain(reference_schema, 'Cover_Type', schema_pb2.IntDomain(
    name='Cover_Type', is_categorical=True))

# Regenerate the statistics using the updated schema, so that Soil_Type and
# Cover_Type are profiled as categorical features.
baseline_stats = tfdv.generate_statistics_from_csv(
    data_location=TRAIN_DATA,
    stats_options=tfdv.StatsOptions(
        schema=reference_schema,
        sample_count=10000
    )
)

# Re-infer the schema from the updated statistics and reapply the
# categorical domains.
reference_schema = tfdv.infer_schema(baseline_stats)
tfdv.set_domain(reference_schema, 'Soil_Type', schema_pb2.IntDomain(
    name='Soil_Type', is_categorical=True))
tfdv.set_domain(reference_schema, 'Cover_Type', schema_pb2.IntDomain(
    name='Cover_Type', is_categorical=True))

# Set min and max values for Elevation
tfdv.set_domain(reference_schema, 'Elevation',
                schema_pb2.IntDomain(min=1000, max=5000))

# Do not allow missing values for Slope
tfdv.get_feature(reference_schema, 'Slope').presence.min_fraction = 1.0

# Set a distribution skew detector for Wilderness_Area
tfdv.get_feature(
    reference_schema,
    'Wilderness_Area').skew_comparator.infinity_norm.threshold = 0.05
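The curated schema can optionally be persisted so that later validation runs load it instead of re-inferring it (a sketch; the schema.pbtxt file name is our choice, not mandated by the tutorial):
In [0]:
# Persist the curated schema for reuse (optional; file name is illustrative).
schema_file = os.path.join(WORKSPACE, 'schema.pbtxt')
tfdv.write_schema_text(reference_schema, schema_file)
# Later runs can reload it with:
# reference_schema = tfdv.load_schema_text(schema_file)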
Display the reference schema
In [0]:
tfdv.display_schema(schema=reference_schema)
Visualize baseline statistics
In [0]:
tfdv.visualize_statistics(baseline_stats)
TFDV provides a utility function, tfdv.generate_statistics_from_dataframe, that computes statistics directly on a Pandas dataframe and would simplify interactive analysis; however, it does not support slicing. Because we need slicing to calculate statistics over different time windows, we use tfdv.generate_statistics_from_csv instead: we read the request-response serving logs from BigQuery and save the results to CSV files.
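For reference, the dataframe-based call is a one-liner on the sample dataframe loaded earlier, but there is no way to attach slice functions in this code path (shown for illustration only; it is not used in the rest of the notebook):
In [0]:
# Illustration only: dataframe-based statistics, which cannot be sliced.
sample_stats = tfdv.generate_statistics_from_dataframe(sample)
tfdv.visualize_statistics(sample_stats)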
In [0]:
TARGET_FEATURE_NAME = 'Cover_Type'
FEATURE_NAMES = [feature.name for feature in reference_schema.feature
                 if feature.name != TARGET_FEATURE_NAME]
In [0]:
def generate_query(source, features, target, start_time, end_time):
    """Builds a SQL query that selects the serving features and the predicted
    class from the logs view, bucketing timestamps by day."""
    query = """
        SELECT
            FORMAT_TIMESTAMP('%Y-%m-%d', time) AS time,
            {},
            predicted_class AS {}
        FROM `{}`
        WHERE time BETWEEN '{}' AND '{}'
        ;
        """.format(features, target, source, start_time, end_time)
    return query
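To sanity-check the generated SQL before querying BigQuery, the function can be rendered with sample arguments (a hypothetical one-day window; the feature list is truncated for readability):
In [0]:
# Render the query for a hypothetical one-day window to inspect the SQL.
print(generate_query(
    source='prediction_logs.vw_covertype_classifier_logs_v1',
    features='Elevation, Slope',
    target='Cover_Type',
    start_time='2020-05-01 00:00:00 UTC',
    end_time='2020-05-02 00:00:00 UTC'))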
In [0]:
start_time = '2020-05-01 00:00:00 UTC'
end_time = '2020-07-01 00:50:00 UTC'
source = "{}.{}".format(BQ_DATASET_NAME, BQ_VIEW_NAME)
features = ', '.join(FEATURE_NAMES)
query = generate_query(source, features, TARGET_FEATURE_NAME, start_time, end_time)
serving_data = pd.read_gbq(query, project_id=PROJECT_ID)
In [0]:
print(len(serving_data.index))
serving_data.head(5).T
In [0]:
serving_data_file = os.path.join(DATA_DIR, 'serving.csv')
serving_data.to_csv(serving_data_file, index=False)
In [0]:
# Slice the serving data by the 'time' feature, producing one slice per day.
from tensorflow_data_validation.utils import slicing_util

slice_fn = slicing_util.get_feature_value_slicer(features={'time': None})
serving_stats_list = tfdv.generate_statistics_from_csv(
    data_location=serving_data_file,
    stats_options=tfdv.StatsOptions(
        slice_functions=[slice_fn],
        schema=reference_schema
    )
)
In [0]:
# The first key is the 'All Examples' slice covering the whole dataset;
# the remaining keys are the per-day slices.
slice_keys = sorted([dataset.name for dataset in serving_stats_list.datasets])
slice_keys
In [0]:
anomalies_list = []
# Validate each per-day slice (skipping the overall 'All Examples' slice)
# against the reference schema and the baseline statistics.
for slice_key in slice_keys[1:]:
    serving_stats = tfdv.get_slice_stats(serving_stats_list, slice_key)
    anomalies = tfdv.validate_statistics(
        serving_stats,
        schema=reference_schema,
        previous_statistics=baseline_stats
    )
    anomalies_list.append(anomalies)
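A quick way to see which time windows triggered findings is to count the entries in each Anomalies proto's anomaly_info map (a sketch; at this point the counts still include the benign new-column finding for the time column, which is filtered out further below):
In [0]:
# Count anomalous features per slice using the anomaly_info map.
for slice_key, anomalies in zip(slice_keys[1:], anomalies_list):
    status = ("OK" if not anomalies.anomaly_info
              else "{} anomalous feature(s)".format(len(anomalies.anomaly_info)))
    print("{}: {}".format(slice_key, status))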
Visualize statistics for a time window with normal data points
In [0]:
slice_key = slice_keys[1]
serving_stats = tfdv.get_slice_stats(serving_stats_list, slice_key)
tfdv.visualize_statistics(
    baseline_stats, serving_stats, 'baseline', 'current')
Visualize statistics for a time window with skewed data points
In [0]:
slice_key = slice_keys[-1]
serving_stats = tfdv.get_slice_stats(serving_stats_list, slice_key)
tfdv.visualize_statistics(
    baseline_stats, serving_stats, 'baseline', 'current')
In [0]:
for i, anomalies in enumerate(anomalies_list):
    # Drop the SCHEMA_NEW_COLUMN anomaly type, which flags columns present in
    # the serving data but not in the reference schema (here, the 'time'
    # column used for slicing).
    tfdv.utils.anomalies_util.remove_anomaly_types(
        anomalies, [anomalies_pb2.AnomalyInfo.SCHEMA_NEW_COLUMN])
    print("Anomalies for {}".format(slice_keys[i + 1]))
    tfdv.display_anomalies(anomalies)
In [0]:
categorical_features = [
    feature.steps()[0]
    for feature in tfdv.utils.schema_util.get_categorical_features(
        reference_schema)
]
Get mean values from baseline statistics
In [0]:
# Collect the mean of each numeric feature from the baseline statistics.
baseline_means = dict()
for feature in baseline_stats.datasets[0].features:
    feature_name = feature.path.step[0]
    if feature_name == 'time' or feature_name in categorical_features:
        continue
    baseline_means[feature_name] = feature.num_stats.mean
In [0]:
from collections import defaultdict

# Collect the per-slice mean of each numeric feature from the serving statistics.
feature_means = defaultdict(list)
for slice_key in slice_keys[1:]:
    ds = tfdv.get_slice_stats(serving_stats_list, slice_key).datasets[0]
    for feature in ds.features:
        feature_name = feature.path.step[0]
        if feature_name == 'time' or feature_name in categorical_features:
            continue
        feature_means[feature_name].append(feature.num_stats.mean)
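The same per-slice means can also be used to flag drifting features numerically rather than visually (a minimal sketch; the 20% relative-deviation threshold is an arbitrary illustration, not a TFDV default):
In [0]:
# Flag numeric features whose per-slice mean deviates more than 20% from
# the baseline mean (threshold chosen for illustration only).
DRIFT_THRESHOLD = 0.2
for feature_name, means in feature_means.items():
    baseline = baseline_means[feature_name]
    for slice_key, mean in zip(slice_keys[1:], means):
        if baseline and abs(mean - baseline) / abs(baseline) > DRIFT_THRESHOLD:
            print("{}: {} mean {:.2f} vs. baseline {:.2f}".format(
                slice_key, feature_name, mean, baseline))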
In [0]:
import matplotlib.pyplot as plt

dataframe = pd.DataFrame(feature_means, index=slice_keys[1:])
num_features = len(feature_means)
ncolumns = 3
nrows = int(num_features // ncolumns) + 1
fig, axes = plt.subplots(nrows=nrows, ncols=ncolumns, figsize=(25, 25))
for i, col in enumerate(dataframe.columns[:num_features]):
    r = i // ncolumns
    c = i % ncolumns
    p = dataframe[col].plot.line(ax=axes[r][c], title=col, rot=10)
    p.hlines(baseline_means[col], xmin=0, xmax=len(dataframe.index), color='red')
    p.text(0, baseline_means[col], 'baseline mean', fontsize=15)
In [0]:
# For each categorical feature, collect the value frequencies in every slice.
categorical_feature_stats = dict()
for feature_name in categorical_features:
    categorical_feature_stats[feature_name] = dict()
    for slice_key in slice_keys[1:]:
        categorical_feature_stats[feature_name][slice_key] = dict()
        ds = tfdv.get_slice_stats(serving_stats_list, slice_key).datasets[0]
        for feature in ds.features:
            if feature.path.step[0] == feature_name:
                for item in feature.string_stats.top_values:
                    categorical_feature_stats[feature_name][slice_key][item.value] = item.frequency
                break
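These frequencies also let us recompute the L-infinity distance between each slice's value distribution and the baseline, mirroring the skew_comparator threshold set on Wilderness_Area earlier (a minimal sketch; it relies on string_stats.top_values, which only covers the most frequent values, so it is approximate for high-cardinality features):
In [0]:
# Approximate L-infinity distance between each serving slice and the
# baseline for Wilderness_Area, using the top-value frequencies.
def value_distribution(stats, feature_name):
    """Returns {value: relative frequency} for a categorical feature."""
    for feature in stats.datasets[0].features:
        if feature.path.step[0] == feature_name:
            total = sum(v.frequency for v in feature.string_stats.top_values)
            return {v.value: v.frequency / total
                    for v in feature.string_stats.top_values}
    return {}

baseline_dist = value_distribution(baseline_stats, 'Wilderness_Area')
for slice_key in slice_keys[1:]:
    serving_dist = value_distribution(
        tfdv.get_slice_stats(serving_stats_list, slice_key), 'Wilderness_Area')
    linf = max(abs(baseline_dist.get(v, 0.0) - serving_dist.get(v, 0.0))
               for v in set(baseline_dist) | set(serving_dist))
    print("{}: L-infinity distance = {:.4f}".format(slice_key, linf))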
In [0]:
num_features = len(categorical_features)
ncolumns = 2
nrows = int(num_features // ncolumns) + 1
fig, axes = plt.subplots(nrows=nrows, ncols=ncolumns, figsize=(25, 15))
for i, feature_name in enumerate(categorical_features):
    dataframe = pd.DataFrame(categorical_feature_stats[feature_name]).T
    r = i // ncolumns
    c = i % ncolumns
    dataframe.plot.bar(ax=axes[r][c], stacked=True, rot=10)