This tutorial shows how to use a novelty detection model to detect skew between data splits (for example, training and serving data). Novelty detection models can identify whether an instance belongs to a population or should be considered an outlier.
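To make the idea concrete, here is a minimal, self-contained sketch that is not part of the tutorial pipeline (the toy data and the contamination value are illustrative): we fit scikit-learn's EllipticEnvelope on a reference population, then ask whether new points belong to it (+1) or are outliers (-1).
In [ ]:
# Minimal novelty detection sketch: fit on a reference population,
# then flag points that do not belong to it. Values are illustrative.
import numpy as np
from sklearn.covariance import EllipticEnvelope

rng = np.random.RandomState(42)
population = rng.normal(loc=0.0, scale=1.0, size=(500, 2))

detector = EllipticEnvelope(contamination=0.01).fit(population)

# +1 means "belongs to the population", -1 means "outlier".
print(detector.predict([[0.1, -0.2], [6.0, 6.0]]))  # expect [ 1 -1]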
The tutorial covers the following steps:
1. Set up the environment and download the Covertype data.
2. Prepare the data by one-hot encoding the categorical features.
3. Fit an EllipticEnvelope novelty detection model to the training data.
4. Compute drift scores for the serving data and for a mutated copy of it.
5. Run an Apache Beam pipeline that scores request-response logs from BigQuery and outputs a drift ratio.
In [ ]:
!pip install -U -q apache-beam[interactive]
!pip install -U -q pandas
!pip install -U -q scikit-learn
In [ ]:
# Automatically restart kernel after installs
import IPython
app = IPython.Application.instance()
app.kernel.do_shutdown(True)
In [ ]:
import os
from tensorflow import io as tf_io
import apache_beam as beam
import pandas as pd
import numpy as np
import warnings
import json
from collections import namedtuple
print("Apache Beam version: {}".format(beam.__version__))
In [ ]:
PROJECT_ID = "sa-data-validation"
BUCKET = "sa-data-validation"
BQ_DATASET_NAME = 'prediction_logs'
BQ_TABLE_NAME = 'covertype_classifier_logs'
MODEL_NAME = 'covertype_classifier'
MODEL_VERSION = 'v1'
!gcloud config set project $PROJECT_ID
In [ ]:
try:
    from google.colab import auth
    auth.authenticate_user()
    print("Colab user is authenticated.")
except ImportError:
    pass  # Not running on Colab.
In [ ]:
GCS_DATA_LOCATION = 'gs://workshop-datasets/covertype/data_validation'
WORKSPACE = './workspace'
DATA_DIR = os.path.join(WORKSPACE, 'data')
TRAIN_DATA = os.path.join(DATA_DIR, 'train.csv')
EVAL_DATA = os.path.join(DATA_DIR, 'eval.csv')
MODELS_DIR = os.path.join(WORKSPACE, 'models')
In [ ]:
if tf_io.gfile.exists(WORKSPACE):
    print("Removing previous workspace artifacts...")
    tf_io.gfile.rmtree(WORKSPACE)

print("Creating a new workspace...")
tf_io.gfile.makedirs(WORKSPACE)
tf_io.gfile.makedirs(DATA_DIR)
tf_io.gfile.makedirs(MODELS_DIR)
We use the Covertype dataset from the UCI Machine Learning Repository. The task is to predict the forest cover type from cartographic variables only.
The dataset is preprocessed, split, and uploaded to the public gs://workshop-datasets/covertype GCS location.
We use this version of the preprocessed dataset in this notebook. For more information, see Cover Type Dataset.
In [ ]:
!gsutil cp gs://workshop-datasets/covertype/data_validation/training/dataset.csv {TRAIN_DATA}
!gsutil cp gs://workshop-datasets/covertype/data_validation/evaluation/dataset.csv {EVAL_DATA}
!wc -l {TRAIN_DATA}
!wc -l {EVAL_DATA}
In [ ]:
pd.read_csv(TRAIN_DATA).head().T
In [ ]:
TARGET_FEATURE_NAME = 'Cover_Type'
CATEGORICAL_FEATURE_NAMES = [
    'Soil_Type',
    'Wilderness_Area'
]
In [ ]:
train_data = pd.read_csv(TRAIN_DATA).drop(TARGET_FEATURE_NAME, axis=1)
In [ ]:
from sklearn.preprocessing import OneHotEncoder
# Fit a one-hot encoder per categorical feature on the training data.
encoders = dict()
for feature_name in CATEGORICAL_FEATURE_NAMES:
    encoder = OneHotEncoder(handle_unknown='ignore')
    encoder.fit(train_data[[feature_name]])
    encoders[feature_name] = encoder
In [ ]:
def prepare_data(data_frame):
    """One-hot encode the categorical features and return a numeric frame."""
    if not isinstance(data_frame, pd.DataFrame):
        data_frame = pd.DataFrame(data_frame)
    # Drop the old index so the join with the encoded columns aligns.
    data_frame = data_frame.reset_index(drop=True)
    for feature_name, encoder in encoders.items():
        encoded_feature = pd.DataFrame(
            encoder.transform(data_frame[[feature_name]]).toarray())
        data_frame = data_frame.drop(feature_name, axis=1)
        encoded_feature.columns = [feature_name + "_" + str(column)
                                   for column in encoded_feature.columns]
        data_frame = data_frame.join(encoded_feature)
    return data_frame
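As a quick, optional sanity check (illustrative only, not a required tutorial step), you can inspect how one categorical feature expands into one-hot columns of the form Soil_Type_<k>; the exact number of columns depends on the fitted encoders:
In [ ]:
# Illustrative only: inspect how one categorical feature expands.
prepare_data(train_data.head()).filter(like='Soil_Type').head()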
In [ ]:
prepared_training_data = prepare_data(train_data)
In [ ]:
import time
from sklearn.covariance import EllipticEnvelope

# Note: scikit-learn requires contamination to be in (0, 0.5],
# so we use a small positive value rather than 0.
model = EllipticEnvelope(contamination=0.001)
print("Fitting...")
t0 = time.time()
model.fit(prepared_training_data)
t1 = time.time()
print("Model is fitted in {} seconds.".format(round(t1 - t0)))
In [ ]:
import statistics

# Use the Mahalanobis distances of the training data as the reference
# distribution: serving data will be compared against these statistics.
training_distances = model.mahalanobis(prepared_training_data)
model._mean = statistics.mean(training_distances)
model._stdv = statistics.stdev(training_distances)
print("training distance mean: {}".format(round(model._mean, 5)))
print("training distance stdv: {}".format(round(model._stdv, 5)))
In [ ]:
def compute_drift_score(model, data_frame, stdv_units=2):
    """Fraction of points whose Mahalanobis distance to the training
    distribution exceeds mean + stdv_units * stdv."""
    distances = model.mahalanobis(data_frame)
    threshold = model._mean + (stdv_units * model._stdv)
    score = len([v for v in distances if v >= threshold]) / len(data_frame.index)
    return score
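For intuition, here is a tiny worked example with a stubbed model (all numbers are made up): with a training distance mean of 3.0 and standard deviation of 2.0, two standard deviation units put the threshold at 7.0, so two of the four fake distances below count as outliers and the score is 0.5.
In [ ]:
from types import SimpleNamespace

# Stub model with hard-coded distances and reference statistics.
fake_model = SimpleNamespace(
    mahalanobis=lambda df: [1.0, 2.0, 8.0, 12.0],
    _mean=3.0,
    _stdv=2.0)
fake_data = pd.DataFrame({'x': range(4)})
print(compute_drift_score(fake_model, fake_data, stdv_units=2))  # 0.5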
We are going to generate a dataset with mutated data points by shuffling each column's values across the rows, creating rows with random combinations of feature values.
This method makes sure that the values of each feature, taken independently, follow the distribution of the original serving data. However, the joint distribution is completely different, since the feature values are combined at random.
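The following self-contained sketch (toy data, not the serving set) illustrates the point: after shuffling one column, each column's values are unchanged, but the correlation between columns collapses.
In [ ]:
# Two strongly correlated toy columns.
rng = np.random.RandomState(0)
a = rng.normal(size=1000)
toy = pd.DataFrame({'a': a, 'b': a * 2 + rng.normal(scale=0.1, size=1000)})
print("correlation before shuffling:", round(toy['a'].corr(toy['b']), 3))

# Shuffle one column: marginal distributions are preserved,
# but the joint structure is destroyed.
toy['b'] = toy['b'].sample(frac=1.0).reset_index(drop=True)
print("correlation after shuffling:", round(toy['a'].corr(toy['b']), 3))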
In [ ]:
serving_data = pd.read_csv(EVAL_DATA).drop(TARGET_FEATURE_NAME, axis=1)
In [ ]:
def shuffle_values(dataframe):
    shuffled_dataframe = dataframe.copy()
    for column_name in dataframe.columns:
        shuffled_dataframe[column_name] = shuffled_dataframe[column_name].sample(
            frac=1.0).reset_index(drop=True)
    return shuffled_dataframe

mutated_serving_data = shuffle_values(serving_data)
mutated_serving_data.head().T
In [ ]:
stdv_units = 2
prepared_serving_data = prepare_data(serving_data)
score = compute_drift_score(model, prepared_serving_data, stdv_units)
percentage = round(score * 100, 2)
print("{}% of the data points are more than {} standard deviation units away from the mean of the training data.".format(percentage, stdv_units))
In [ ]:
prepared_mutated_data = prepare_data(mutated_serving_data)
score = compute_drift_score(model, prepared_mutated_data, stdv_units)
percentage = round(score * 100, 2)
print("{}% of the data points are more than {} standard deviation units away from the mean of the training data.".format(percentage, stdv_units))
This pipeline performs the following steps:
1. Read the raw request-response log records from BigQuery.
2. Batch the records for efficient scoring.
3. Parse the raw JSON instances and one-hot encode the categorical features.
4. Score each batch with the novelty detection model, counting the outliers.
5. Aggregate the per-batch counts and compute the overall drift ratio.
6. Write the result to a text file.
In [ ]:
from collections import defaultdict

def parse_batch_data(log_records):
    """Convert a batch of raw log records into a dict of feature-value lists.

    Each record's 'raw_data' field is expected to hold a JSON object with an
    'instances' list, where each instance maps a feature name to a
    single-element list containing its value.
    """
    data_dict = defaultdict(list)
    for log_record in log_records:
        raw_data = json.loads(log_record['raw_data'])
        for raw_instance in raw_data['instances']:
            for name, value in raw_instance.items():
                data_dict[name].append(value[0])
    return data_dict

def score_data(data, model, stdv_units=2):
    """Count points more than stdv_units away from the training distance mean."""
    distances = model.mahalanobis(data)
    threshold = model._mean + (stdv_units * model._stdv)
    outlier_count = len([v for v in distances if v >= threshold])
    records_count = len(data)
    return {'outlier_count': outlier_count, 'records_count': records_count}

def aggregate_scores(items):
    """Sum the per-batch outlier and record counts."""
    outlier_count = 0
    records_count = 0
    for item in items:
        outlier_count += item['outlier_count']
        records_count += item['records_count']
    return {'outlier_count': outlier_count, 'records_count': records_count}
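To make the expected input format concrete, here is a hypothetical log record in the shape that parse_batch_data assumes (one 'raw_data' JSON string per row, with a single-element value list per feature); the field values below are made up:
In [ ]:
# Illustrative only: a fake log record in the assumed schema.
fake_log_records = [{
    'raw_data': json.dumps({
        'instances': [
            {'Elevation': [2596], 'Soil_Type': ['C2702'], 'Wilderness_Area': ['Rawah']},
        ]
    })
}]
parse_batch_data(fake_log_records)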
In [ ]:
def get_query(bq_table_fullname, model_name, model_version, start_time, end_time):
    # Assumes the standard AI Platform request-response log schema,
    # which includes a 'time' TIMESTAMP column.
    query = """
        SELECT raw_data
        FROM `{}`
        WHERE model = '{}'
        AND model_version = '{}'
        AND time BETWEEN '{}' AND '{}'
    """.format(bq_table_fullname, model_name, model_version, start_time, end_time)
    return query
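As a quick illustration, rendering the query with placeholder arguments (the table and model names below are made up) shows the final SQL:
In [ ]:
# Illustrative only: render the query with placeholder arguments.
print(get_query('my-project.my_dataset.my_table', 'my_model', 'v1',
                '2020-07-05 00:00:00 UTC', '2020-07-06 23:59:59 UTC'))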
In [ ]:
def run_pipeline(args):
    options = beam.options.pipeline_options.PipelineOptions(**args)
    # Expose the args dict entries as attributes for convenience.
    args = namedtuple("options", args.keys())(*args.values())
    query = get_query(
        args.bq_table_fullname,
        args.model_name,
        args.model_version,
        args.start_time,
        args.end_time
    )
    print("Starting the Beam pipeline...")
    with beam.Pipeline(options=options) as pipeline:
        (
            pipeline
            | 'ReadBigQueryData' >> beam.io.Read(
                beam.io.BigQuerySource(query=query, use_standard_sql=True))
            | 'BatchRecords' >> beam.BatchElements(
                min_batch_size=100, max_batch_size=1000)
            | 'ParseBatchData' >> beam.Map(parse_batch_data)
            | 'PrepareData' >> beam.Map(prepare_data)
            | 'ScoreData' >> beam.Map(
                lambda data: score_data(data, args.drift_model, stdv_units=1))
            | 'CombineResults' >> beam.CombineGlobally(aggregate_scores)
            | 'ComputeRatio' >> beam.Map(
                lambda result: {
                    'outlier_count': result['outlier_count'],
                    'records_count': result['records_count'],
                    'drift_ratio': result['outlier_count'] / result['records_count']
                })
            # Serialize to JSON so the output file can be parsed with json.loads.
            | 'ToJson' >> beam.Map(json.dumps)
            | 'WriteOutput' >> beam.io.WriteToText(
                file_path_prefix=args.output_file_path,
                num_shards=1,
                shard_name_template='')
        )
In [ ]:
from datetime import datetime
job_name = 'drift-detection-{}'.format(
datetime.utcnow().strftime('%y%m%d-%H%M%S'))
bq_table_fullname = "{}.{}.{}".format(
PROJECT_ID, BQ_DATASET_NAME, BQ_TABLE_NAME)
runner = 'DirectRunner'
output_dir = os.path.join(WORKSPACE, 'output')
output_path = os.path.join(output_dir, 'drift_output.json')
start_time = '2020-07-05 00:00:00 UTC'
end_time = '2020-07-06 23:59:59 UTC'
args = {
    'job_name': job_name,
    'runner': runner,
    'bq_table_fullname': bq_table_fullname,
    'model_name': MODEL_NAME,
    'model_version': MODEL_VERSION,
    'start_time': start_time,
    'end_time': end_time,
    'output_file_path': output_path,
    'project': PROJECT_ID,
    'drift_model': model
}
In [ ]:
!rm -rf {output_dir}
print("Running pipeline...")
%time run_pipeline(args)
print("Pipeline is done.")
In [ ]:
!ls {output_dir}
In [ ]:
drift_results = json.loads(open(output_path).read()).items()
for key, value in drift_results:
    if key == 'drift_ratio':
        value = str(round(value * 100, 2)) + '%'
    print(key, ':', value)