Drift Detection with TensorFlow Data Validation

This tutorial shows how to use TensorFlow Data Validation (TFDV) to identify and analyze different data skews in request-response serving data logged by AI Platform Prediction in BigQuery.

The tutorial has three parts:

  • Part 1: Produce baseline statistics and reference schema

    1. Download training data
    2. Compute baseline statistics from the training data
    3. Generate reference schema using the baseline statistics
  • Part 2: Detecting data skews

    1. Generate baseline statistics and reference schema from training data using TFDV
    2. Read request-response serving data from BigQuery and save it to CSV files
    3. Compute statistics from the serving data
    4. Validate serving statistics against the reference schema and baseline statistics to detect anomalies (if any)
  • Part 3: Analyzing statistics and anomalies

    1. Use TFDV to visualize and display the statistics and anomalies
    2. Analyze how statistics change over time.

We use the Covertype dataset from the UCI Machine Learning Repository.

The dataset is preprocessed, split, and uploaded to the gs://workshop-datasets/covertype public GCS location. We use this version of the preprocessed dataset in this notebook. For more information, see Cover Type Dataset.

We use the training data split to generate the reference schema and baseline statistics, which are then used to validate the serving data.
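At a high level, the TFDV workflow in this tutorial is: compute statistics from the training split, infer and curate a reference schema, compute statistics from the serving data, and validate the serving statistics against the schema and the baseline. A minimal sketch of that flow, assuming local train.csv and serving.csv files (the actual paths, schema adjustments, and slicing options are configured later in the notebook):

import tensorflow_data_validation as tfdv

# Baseline statistics and reference schema from the training split
train_stats = tfdv.generate_statistics_from_csv(data_location='train.csv')
schema = tfdv.infer_schema(train_stats)

# Statistics computed from the serving data
serving_stats = tfdv.generate_statistics_from_csv(data_location='serving.csv')

# Validate serving statistics against the schema and the baseline statistics
anomalies = tfdv.validate_statistics(
    serving_stats, schema=schema, previous_statistics=train_stats)
tfdv.display_anomalies(anomalies)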

Setup

Install packages and dependencies


In [0]:
!pip install -U -q tensorflow
!pip install -U -q tensorflow_data_validation
!pip install -U -q pandas

In [0]:
# Automatically restart kernel after installs
import IPython
app = IPython.Application.instance()
app.kernel.do_shutdown(True)

Configure GCP environment settings


In [0]:
PROJECT_ID = "sa-data-validation"  # Change to your GCP project ID
BUCKET = "sa-data-validation"      # Change to your GCS bucket name
BQ_DATASET_NAME = 'prediction_logs'
BQ_VIEW_NAME = 'vw_covertype_classifier_logs_v1'
MODEL_NAME = 'covertype_classifier'
MODEL_VERSION = 'v1'
!gcloud config set project $PROJECT_ID

Authenticate your GCP account

This step is required if you run the notebook in Colab.


In [0]:
try:
  from google.colab import auth
  auth.authenticate_user()
  print("Colab user is authenticated.")
except ImportError:
  # Not running in Colab; skip Colab-specific authentication.
  pass

Import libraries


In [0]:
import os
import tensorflow as tf
import tensorflow_data_validation as tfdv
from tensorflow_metadata.proto.v0 import schema_pb2, statistics_pb2, anomalies_pb2
import apache_beam as beam
import pandas as pd
from datetime import datetime
import json
import numpy as np
import warnings

warnings.filterwarnings("ignore", category=FutureWarning)

print("TF version: {}".format(tf.__version__))
print("TFDV version: {}".format(tfdv.__version__))
print("Beam version: {}".format(beam.__version__))

Create a local workspace


In [0]:
WORKSPACE = './workspace'
DATA_DIR = os.path.join(WORKSPACE, 'data')
TRAIN_DATA = os.path.join(DATA_DIR, 'train.csv') 

if tf.io.gfile.exists(WORKSPACE):
  print("Removing previous workspace artifacts...")
  tf.io.gfile.rmtree(WORKSPACE)

print("Creating a new workspace...")
tf.io.gfile.makedirs(WORKSPACE)
tf.io.gfile.makedirs(DATA_DIR)

Part 1: Generate Baseline Statistics and Reference Schema

We use TFDV to generate baseline statistics from the training data, as well as a reference schema, to validate the serving data against.

1. Download data


In [0]:
!gsutil cp gs://workshop-datasets/covertype/data_validation/training/dataset.csv {TRAIN_DATA}
!wc -l {TRAIN_DATA}

In [0]:
sample = pd.read_csv(TRAIN_DATA).head()
sample.T

2. Compute baseline statistics


In [0]:
# Compute baseline statistics from the training CSV; sample_count limits the
# number of examples used to compute the statistics.
baseline_stats = tfdv.generate_statistics_from_csv(
    data_location=TRAIN_DATA,
    stats_options=tfdv.StatsOptions(
        sample_count=10000
    )
)

3. Generate reference schema


In [0]:
# First pass: infer an initial schema from the baseline statistics
reference_schema = tfdv.infer_schema(baseline_stats)

# Set Soil_Type to be categorical
tfdv.set_domain(reference_schema, 'Soil_Type', schema_pb2.IntDomain(
    name='Soil_Type', is_categorical=True))

# Set Cover_Type to be categorical
tfdv.set_domain(reference_schema, 'Cover_Type', schema_pb2.IntDomain(
    name='Cover_Type', is_categorical=True))

# Second pass: recompute the baseline statistics using the updated schema,
# so that Soil_Type and Cover_Type are treated as categorical features
baseline_stats = tfdv.generate_statistics_from_csv(
    data_location=TRAIN_DATA,
    stats_options=tfdv.StatsOptions(
        schema=reference_schema,
        sample_count=10000
        )
    )

# Re-infer the schema from the recomputed statistics and apply the same
# categorical domains again
reference_schema = tfdv.infer_schema(baseline_stats)

# Set Soil_Type to be categorical
tfdv.set_domain(reference_schema, 'Soil_Type', schema_pb2.IntDomain(
    name='Soil_Type', is_categorical=True))

# Set Cover_Type to be categorical
tfdv.set_domain(reference_schema, 'Cover_Type', schema_pb2.IntDomain(
    name='Cover_Type', is_categorical=True))

# Set the expected min and max values for Elevation
tfdv.set_domain(reference_schema,
    'Elevation',
    schema_pb2.IntDomain(
        min=1000,
        max=5000))

# Require Slope to be present in every example (no missing values)
tfdv.get_feature(reference_schema,
    'Slope').presence.min_fraction = 1.0

# Set a distribution skew detector for Wilderness_Area: serving data whose
# L-infinity distance from the training distribution exceeds 0.05 is flagged
tfdv.get_feature(reference_schema,
    'Wilderness_Area').skew_comparator.infinity_norm.threshold = 0.05

Display the reference schema


In [0]:
tfdv.display_schema(
    schema=reference_schema)

Visualize baseline statistics


In [0]:
tfdv.visualize_statistics(baseline_stats)

Part 2: Detecting Serving Data Skews

2. Export Serving Data from BigQuery

Although TFDV provides a utility function, tfdv.generate_statistics_from_dataframe, that computes statistics on a Pandas dataframe and would simplify interactive analysis, it does not support slicing. Because we need slicing to compute statistics over different time windows, we use tfdv.generate_statistics_from_csv instead.

We therefore read the request-response serving logs from BigQuery and save the results to CSV files that tfdv.generate_statistics_from_csv can consume.
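For a quick interactive check that does not require slicing, you could compute statistics directly from the dataframe; a minimal sketch, assuming the serving_data dataframe loaded from BigQuery below:

# Unsliced statistics computed straight from a Pandas dataframe
df_stats = tfdv.generate_statistics_from_dataframe(
    serving_data,
    stats_options=tfdv.StatsOptions(schema=reference_schema))
tfdv.visualize_statistics(df_stats)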


In [0]:
TARGET_FEATURE_NAME = 'Cover_Type'
FEATURE_NAMES = [feature.name for feature in reference_schema.feature 
                 if feature.name != TARGET_FEATURE_NAME]

2.1. Read serving data from BigQuery


In [0]:
def generate_query(source, features, target, start_time, end_time):
  # The timestamp is truncated to day granularity so that the 'time' column
  # can later be used as a daily slicing key.
  query = """
    SELECT 
      FORMAT_TIMESTAMP('%Y-%m-%d', time) AS time,
      {},
      predicted_class AS {}

    FROM `{}`
    WHERE time BETWEEN '{}' AND '{}'
    ;
  """.format(features, target, source, start_time, end_time)

  return query

In [0]:
start_time = '2020-05-01 00:00:00 UTC'
end_time = '2020-07-01 00:50:00 UTC'

source = "{}.{}".format(BQ_DATASET_NAME, BQ_VIEW_NAME)
features = ', '.join(FEATURE_NAMES)
query = generate_query(source, features, TARGET_FEATURE_NAME, start_time, end_time)

serving_data = pd.read_gbq(
    query, project_id=PROJECT_ID)

In [0]:
print(len(serving_data.index))
serving_data.head(5).T

2.2. Save serving data to CSV


In [0]:
serving_data_file = os.path.join(DATA_DIR, 'serving.csv')
serving_data.to_csv(serving_data_file, index=False)

3. Compute Statistics from Serving Data

In addition to computing statistics for the full dataset, we configure TFDV to compute statistics for each time window by slicing on the time column (one slice per day).


In [0]:
# Slice the serving data by each distinct value of the 'time' column
# (one slice per day)
slice_fn = tfdv.get_feature_value_slicer(features={'time': None})

serving_stats_list = tfdv.generate_statistics_from_csv(
    data_location=serving_data_file,
    stats_options=tfdv.StatsOptions(
        slice_functions=[slice_fn],
        schema=reference_schema
        )
)

In [0]:
slice_keys = sorted([dataset.name for dataset in serving_stats_list.datasets])
slice_keys

4. Validate Serving Statistics


In [0]:
anomalies_list = []

# slice_keys[0] is the 'All Examples' slice (the full dataset); validate each
# time slice against the reference schema and the baseline statistics.
for slice_key in slice_keys[1:]:
  serving_stats = tfdv.get_slice_stats(serving_stats_list, slice_key)
  anomalies = tfdv.validate_statistics(
      serving_stats, 
      schema=reference_schema, 
      previous_statistics=baseline_stats
  )
  anomalies_list.append(anomalies)
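
Part 3 inspects these anomalies visually. In an automated setting, the results can also be checked programmatically; a minimal sketch over the anomalies_list built above:

# Report the time slices whose serving statistics violate the reference schema
# or drift from the baseline statistics
for slice_key, anomalies in zip(slice_keys[1:], anomalies_list):
  if anomalies.anomaly_info:
    print("Skew detected in slice {}: {}".format(
        slice_key, list(anomalies.anomaly_info.keys())))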

Part 3: Analyzing Serving Data Statistics and Anomalies

1. Visualize Statistics

Visualize statistics for a time window with normal data points


In [0]:
slice_key = slice_keys[1]
serving_stats = tfdv.get_slice_stats(serving_stats_list, slice_key)
tfdv.visualize_statistics(
    baseline_stats, serving_stats, 'baseline', 'current')

Visualize statistics for a time window with skewed data points


In [0]:
slice_key = slice_keys[-1]
serving_stats = tfdv.get_slice_stats(serving_stats_list, slice_key)
tfdv.visualize_statistics(
    baseline_stats, serving_stats, 'baseline', 'current')

2. Display Anomalies


In [0]:
for i, anomalies in enumerate(anomalies_list):
  # Filter out 'new column' anomalies (for example, the time column added for
  # slicing, which is not part of the reference schema)
  tfdv.utils.anomalies_util.remove_anomaly_types(
      anomalies, [anomalies_pb2.AnomalyInfo.SCHEMA_NEW_COLUMN])

  print("Anomalies for {}".format(slice_keys[i+1]))
  tfdv.display_anomalies(anomalies)

3. Analyze How Statistics Change Over Time

3.1. Numerical feature means over time


In [0]:
# Names of the categorical features, used to exclude them from the
# numeric (mean) analysis below
categorical_features = [
    feature.steps()[0]
    for feature in tfdv.utils.schema_util.get_categorical_features(
        reference_schema)
]

Get mean values from baseline statistics


In [0]:
baseline_means = dict()
for feature in baseline_stats.datasets[0].features:
  if feature.path.step[0] == 'time': continue
  if feature.path.step[0] not in categorical_features:
    mean = feature.num_stats.mean
    baseline_means[feature.path.step[0]] = mean

In [0]:
from collections import defaultdict

feature_means = defaultdict(list)
for slice_key in slice_keys[1:]:
  ds = tfdv.get_slice_stats(serving_stats_list, slice_key).datasets[0]
  for feature in ds.features:
    if feature.path.step[0] == 'time': continue
    if feature.path.step[0] not in categorical_features:
      mean = feature.num_stats.mean
      feature_means[feature.path.step[0]].append(mean)

In [0]:
import matplotlib.pyplot as plt

dataframe = pd.DataFrame(feature_means, index=slice_keys[1:])
num_features = len(feature_means)
ncolumns = 3
nrows = int(num_features // ncolumns) + 1

fig, axes = plt.subplots(nrows=nrows, ncols=ncolumns, figsize=(25, 25))
for i, col in enumerate(dataframe.columns[:num_features]):
  r = i // ncolumns
  c = i % ncolumns
  p = dataframe[col].plot.line(ax=axes[r][c], title=col, rot=10)
  p.hlines(baseline_means[col], xmin=0, xmax=len(dataframe.index), color='red')
  p.text(0, baseline_means[col], 'baseline mean', fontsize=15)

3.2. Categorical feature distribution over time


In [0]:
categorical_feature_stats = dict()

for feature_name in categorical_features:
  categorical_feature_stats[feature_name] = dict()

  for slice_key in slice_keys[1:]:
    categorical_feature_stats[feature_name][slice_key] = dict()
    ds = tfdv.get_slice_stats(serving_stats_list, slice_key).datasets[0]
    for feature in ds.features:
      if feature.path.step[0] == feature_name:
        val_freq = list(feature.string_stats.top_values)
        for item in val_freq:
          categorical_feature_stats[feature_name][slice_key][item.value] = item.frequency
        break

In [0]:
num_features = len(categorical_features)
ncolumns = 2
nrows = int(num_features // ncolumns) + 1

fig, axes = plt.subplots(nrows=nrows, ncols=ncolumns, figsize=(25, 15))
for i, feature_name in enumerate(categorical_features):
  dataframe = pd.DataFrame(
      categorical_feature_stats[feature_name]).T

  r = i // ncolumns
  c = i % ncolumns
  dataframe.plot.bar(ax=axes[r][c], stacked=True, rot=10)
