Computing a Drift Score Using a Novelty Detection Model

This tutorial shows how to use a novelty detection model to detect skew between data splits (e.g., training and serving). A novelty detection model can identify whether an instance belongs to a population or should be considered an outlier.
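
As a minimal sketch of the idea (using hypothetical toy data, not the tutorial dataset, and runnable once the packages in the Setup section below are installed), an Elliptic Envelope model fitted on points drawn from a single population flags points far outside that population as outliers:

In [ ]:
# Illustrative only: fit a novelty detector on toy "normal" data, then
# check whether new points are inliers (1) or outliers (-1).
import numpy as np
from sklearn.covariance import EllipticEnvelope

rng = np.random.RandomState(42)
normal_points = rng.normal(loc=0.0, scale=1.0, size=(500, 2))

detector = EllipticEnvelope(contamination=0.01).fit(normal_points)
print(detector.predict([[0.1, -0.2], [8.0, 8.0]]))  # [ 1 -1]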

The tutorial covers the following steps:

  1. Download training and serving data splits
  2. Train an Elliptic Envelope model using the training data
  3. Test the model on normal and mutated datasets
  4. Implement an Apache Beam pipeline to compute the drift score on request-response data in BigQuery
  5. Run the pipeline and display drift detection output

Setup

Install required packages


In [ ]:
!pip install -U -q apache-beam[interactive]
!pip install -U -q pandas
!pip install -U -q scikit-learn

In [ ]:
# Automatically restart kernel after installs
import IPython
app = IPython.Application.instance()
app.kernel.do_shutdown(True)

Import libraries


In [ ]:
import os
from tensorflow import io as tf_io
import apache_beam as beam
import pandas as pd
import numpy as np
import warnings
import json
from collections import namedtuple

print("Apache Beam version: {}".format(beam.__version__))

Configure GCP environment settings


In [ ]:
PROJECT_ID = "sa-data-validation"
BUCKET = "sa-data-validation"
BQ_DATASET_NAME = 'prediction_logs'
BQ_TABLE_NAME = 'covertype_classifier_logs'  
MODEL_NAME = 'covertype_classifier'
MODEL_VERSION = 'v1'
!gcloud config set project $PROJECT_ID

Authenticate your GCP account

This is required if you run the notebook in Colab


In [ ]:
try:
  from google.colab import auth
  auth.authenticate_user()
  print("Colab user is authenticated.")
except ImportError:
  pass

Create a local workspace


In [ ]:
GCS_DATA_LOCATION = 'gs://workshop-datasets/covertype/data_validation'
WORKSPACE = './workspace'
DATA_DIR = os.path.join(WORKSPACE, 'data')
TRAIN_DATA = os.path.join(DATA_DIR, 'train.csv') 
EVAL_DATA = os.path.join(DATA_DIR, 'eval.csv') 
MODELS_DIR = os.path.join(WORKSPACE, 'models')

In [ ]:
if tf_io.gfile.exists(WORKSPACE):
  print("Removing previous workspace artifacts...")
  tf_io.gfile.rmtree(WORKSPACE)

print("Creating a new workspace...")
tf_io.gfile.makedirs(WORKSPACE)
tf_io.gfile.makedirs(DATA_DIR)
tf_io.gfile.makedirs(MODELS_DIR)

1. Download Data Splits

We use the Covertype dataset from the UCI Machine Learning Repository. The task is to predict forest cover type from cartographic variables only.

The dataset is preprocessed, split, and uploaded to the gs://workshop-datasets/covertype public GCS location.

We use this version of the preprocessed dataset in this notebook. For more information, see the Cover Type Dataset.


In [ ]:
!gsutil cp gs://workshop-datasets/covertype/data_validation/training/dataset.csv {TRAIN_DATA}
!gsutil cp gs://workshop-datasets/covertype/data_validation/evaluation/dataset.csv {EVAL_DATA}
!wc -l {TRAIN_DATA}
!wc -l {EVAL_DATA}

In [ ]:
pd.read_csv(TRAIN_DATA).head().T

2. Train an Elliptic Envelope Model using Training Data

2.1. Define metadata


In [ ]:
TARGET_FEATURE_NAME = 'Cover_Type'

CATEGORICAL_FEATURE_NAMES = [
    'Soil_Type',
    'Wilderness_Area'
]

2.2. Prepare the data


In [ ]:
train_data = pd.read_csv(TRAIN_DATA).drop(TARGET_FEATURE_NAME, axis=1)

In [ ]:
from sklearn.preprocessing import OneHotEncoder

encoders = dict()

for feature_name in CATEGORICAL_FEATURE_NAMES:
  encoder = OneHotEncoder(handle_unknown='ignore')
  encoder.fit(train_data[[feature_name]])
  encoders[feature_name] = encoder

In [ ]:
def prepare_data(data_frame):
  # Accept dict-like inputs (e.g. parsed log records) as well as DataFrames.
  if not isinstance(data_frame, pd.DataFrame):
    data_frame = pd.DataFrame(data_frame)

  # drop=True prevents the old index from being added as a spurious feature.
  data_frame = data_frame.reset_index(drop=True)
  # Replace each categorical column with its one-hot encoded columns.
  for feature_name, encoder in encoders.items():
    encoded_feature = pd.DataFrame(
      encoder.transform(data_frame[[feature_name]]).toarray()
    )
    data_frame = data_frame.drop(feature_name, axis=1)
    encoded_feature.columns = [feature_name + "_" + str(column)
                               for column in encoded_feature.columns]
    data_frame = data_frame.join(encoded_feature)

  return data_frame
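
A quick, illustrative sanity check shows the effect: each categorical feature is replaced by one-hot columns named <feature>_<index>:

In [ ]:
# Illustrative check of the one-hot expansion on a few training rows.
sample = prepare_data(train_data.head())
print([column for column in sample.columns
       if column.startswith('Wilderness_Area')])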

2.3. Fit the model


In [ ]:
prepared_training_data = prepare_data(train_data)

In [ ]:
import time
from sklearn.covariance import EllipticEnvelope

# Note: contamination must be in (0, 0.5] in recent scikit-learn releases;
# we only use the fitted covariance (via mahalanobis), so a small value works.
model = EllipticEnvelope(contamination=0.001)

print("Fitting...")
t0 = time.time()
model.fit(prepared_training_data)
t1 = time.time()
print("Model is fitted in {} seconds.".format(round(t1-t0)))

In [ ]:
import statistics

training_distances = model.mahalanobis(prepared_training_data)
model._mean = statistics.mean(training_distances)
model._stdv = statistics.stdev(training_distances)
print("training distance mean: {}".format(round(model._mean, 5))) 
print("training distance stdv: {}".format(round(model._stdv, 5)))

3. Test the Elliptic Envelope Model


In [ ]:
def compute_drift_score(model, data_frame, stdv_units=2):
  
  distances = model.mahalanobis(data_frame)
  threshold = model._mean + (stdv_units * model._stdv)
  score = len([v for v in distances if v >= threshold]) / len(data_frame.index)
  
  return score

3.1. Generate mutated serving data

We generate a dataset of mutated data points by shuffling each column's values across the rows, creating rows with random combinations of feature values.

This method ensures that the values of each feature, taken independently, follow the distribution of the original serving data. However, the joint distribution is completely different, since the feature values are shuffled independently.


In [ ]:
serving_data = pd.read_csv(EVAL_DATA).drop('Cover_Type', axis=1)

In [ ]:
def shuffle_values(dataframe):
  shuffled_dataframe = dataframe.copy()
  # Shuffle each column independently: this preserves each marginal
  # distribution but breaks the joint distribution.
  for column_name in dataframe.columns:
    shuffled_dataframe[column_name] = shuffled_dataframe[column_name].sample(
        frac=1.0).reset_index(drop=True)

  return shuffled_dataframe

mutated_serving_data = shuffle_values(serving_data)
mutated_serving_data.head().T

3.2. Compute the drift score on normal data


In [ ]:
stdv_units = 2
prepared_serving_data = prepare_data(serving_data)
score = compute_drift_score(model, prepared_serving_data, stdv_units)
percentage = round(score * 100, 2)
print("{}% of the data points are more than {} standard deviation units away from the mean of the training data.".format(percentage, stdv_units))

3.3. Compute the drift score on mutated data


In [ ]:
prepared_mutated_data = prepare_data(mutated_serving_data)
score = compute_drift_score(model, prepared_mutated_data, stdv_units)
percentage = round(score * 100, 2)
print("{}% of the data points are more than {} standard deviation units away from the mean of the training data.".format(percentage, stdv_units))

4. Implement an Apache Beam Pipeline to Compute the Drift Score on Request-Response BigQuery Data

This pipeline performs the following steps:

  1. Reads and parses the data from the request-response logs table in BigQuery
  2. Uses the Elliptic Envelope novelty detection model to identify outliers
  3. Computes the percentage of data points detected as outliers as the drift score

4.1. Prepare helper functions


In [ ]:
from collections import defaultdict

def parse_batch_data(log_records):
  # Flatten a batch of request-response log records into a dict of
  # feature name -> list of values, suitable for pd.DataFrame().
  data_dict = defaultdict(list)

  for log_record in log_records:
    raw_data = json.loads(log_record['raw_data'])
    for raw_instance in raw_data['instances']:
      for name, value in raw_instance.items():
        data_dict[name].append(value[0])

  return data_dict


def score_data(data, model, stdv_units=2):
  # Count the data points whose Mahalanobis distance exceeds the
  # training mean by more than stdv_units standard deviations.
  distances = model.mahalanobis(data)
  threshold = model._mean + (stdv_units * model._stdv)
  outlier_count = len([v for v in distances if v >= threshold])
  records_count = len(data)
  return {'outlier_count': outlier_count, 'records_count': records_count}


def aggregate_scores(items):
  # Sum the per-batch outlier and record counts into global totals.
  outlier_count = 0
  records_count = 0
  for item in items:
    outlier_count += item['outlier_count']
    records_count += item['records_count']
  return {'outlier_count': outlier_count, 'records_count': records_count}
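
To make the parser's expected input concrete, here is an illustrative record (hypothetical values; it assumes the raw_data layout of the request-response logging format, where each instance maps feature names to single-element lists):

In [ ]:
# Illustrative only: a hypothetical log record in the format
# parse_batch_data expects; real records are read from BigQuery.
example_log_record = {
    'raw_data': json.dumps({
        'instances': [
            {'Elevation': [3073], 'Aspect': [30], 'Soil_Type': ['7202']}
        ]
    })
}
print(dict(parse_batch_data([example_log_record])))
# {'Elevation': [3073], 'Aspect': [30], 'Soil_Type': ['7202']}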

In [ ]:
def get_query(bq_table_fullname, model_name, model_version, start_time, end_time):
  # Note: assumes the logs table has a `time` TIMESTAMP column, as in the
  # AI Platform request-response logging schema.
  query = """
  SELECT raw_data
  FROM `{}`
  WHERE model = '{}'
  AND model_version = '{}'
  AND time BETWEEN '{}' AND '{}'
  """.format(bq_table_fullname, model_name, model_version, start_time, end_time)

  return query
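
For example, with the settings used later in this notebook, the rendered query looks like this (illustrative):

  SELECT raw_data
  FROM `sa-data-validation.prediction_logs.covertype_classifier_logs`
  WHERE model = 'covertype_classifier'
  AND model_version = 'v1'
  AND time BETWEEN '2020-07-05 00:00:00 UTC' AND '2020-07-06 23:59:59 UTC'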

4.2. Implement Beam pipeline


In [ ]:
def run_pipeline(args):

  options = beam.options.pipeline_options.PipelineOptions(**args)
  # Wrap the args dict in a namedtuple for attribute-style access.
  args = namedtuple("options", args.keys())(*args.values())
  query = get_query(
      args.bq_table_fullname,
      args.model_name,
      args.model_version,
      args.start_time,
      args.end_time
  )

  print("Starting the Beam pipeline...")
  with beam.Pipeline(options=options) as pipeline:
    (
        pipeline
        | 'ReadBigQueryData' >> beam.io.Read(
            beam.io.BigQuerySource(query=query, use_standard_sql=True))
        | 'BatchRecords' >> beam.BatchElements(
            min_batch_size=100, max_batch_size=1000)
        | 'ParseBatchData' >> beam.Map(parse_batch_data)
        | 'PrepareData' >> beam.Map(prepare_data)
        | 'ScoreData' >> beam.Map(
            lambda data: score_data(data, args.drift_model, stdv_units=1))
        | 'CombineResults' >> beam.CombineGlobally(aggregate_scores)
        | 'ComputeRatio' >> beam.Map(
            lambda result: {
                "outlier_count": result['outlier_count'],
                "records_count": result['records_count'],
                "drift_ratio": result['outlier_count'] / result['records_count']
            })
        # Serialize as JSON so the output file can be read with json.loads.
        | 'ToJson' >> beam.Map(json.dumps)
        | 'WriteOutput' >> beam.io.WriteToText(
            file_path_prefix=args.output_file_path, num_shards=1,
            shard_name_template='')
    )

5. Run Pipeline and Display Drift Detection Output

5.1. Configure pipeline parameter settings


In [ ]:
from datetime import datetime

job_name = 'drift-detection-{}'.format(
    datetime.utcnow().strftime('%y%m%d-%H%M%S'))
bq_table_fullname = "{}.{}.{}".format(
    PROJECT_ID, BQ_DATASET_NAME, BQ_TABLE_NAME)
runner = 'InteractiveRunner'
output_dir = os.path.join(WORKSPACE, 'output')
output_path = os.path.join(output_dir, 'drift_output.json')
start_time = '2020-07-05 00:00:00 UTC'
end_time = '2020-07-06 23:59:59 UTC'

args = {
    'job_name': job_name,
    'runner': runner,
    'bq_table_fullname': bq_table_fullname,
    'model_name': MODEL_NAME,
    'model_version': MODEL_VERSION,
    'start_time': start_time,
    'end_time': end_time,
    'output_file_path': output_path,
    'project': PROJECT_ID,
    'drift_model': model
}

5.2. Run the pipeline


In [ ]:
!rm -rf {output_dir}

print("Running pipeline...")
%time run_pipeline(args)
print("Pipeline is done.")

In [ ]:
!ls {output_dir}

5.3. Display the drift detection output


In [ ]:
drift_results = json.loads(open(output_path).read()).items()
for key, value in drift_results:
  if key == 'drift_ratio':
    value = str(round(value * 100, 2)) + '%'
  print(key, ':', value)