Analyzing and Validating Data using TensorFlow Data Validation (TFDV)

Building a successful machine learning (ML) system involves more than training a model. In this two-part article discusses the role of TensorFlow Data Validation (TFDV) library in performing data exploration and descriptive analytics during experimentation, as well as in validating the incoming data for training or prediction during production.

This tutorial shows you step-by-step how to use TFDV to analyze and validate data for ML on Google Cloud Platform (GCP).

The objective of this tutorial is to:

  1. Exrtact data from BigQuery to GCS.
  2. Generate statistics from the data using TFDV.
  3. Explore and visualise the data statistics.
  4. Generate a Schema for the data using TFDV.
  5. Extract new data from BigQuery.
  6. Validate the new data using the generated Schema.

Install requirements

Install the required packages for the tutorial:


In [ ]:
!pip install -U tensorflow-data-validation==0.11.0
!pip install -U apache-beam[gcp]==2.8.0
!pip install -U google_cloud_bigquery==1.18.0
!pip install -U python-snappy==0.5.4

In [3]:
import os
import tensorflow as tf 
import apache_beam as beam 
import tensorflow_data_validation as tfdv
from google.cloud import bigquery
from datetime import datetime

Verify the version of the installed packages:


In [4]:
print "TF version:", tf.__version__
print "TFDV version:", tfdv.__version__
print "Beam version:", beam.__version__
print "BQ SDK version:", bigquery.__version__


TF version: 1.14.0
TFDV version: 0.11.0
Beam version: 2.8.0
BQ SDK version: 1.18.0

Setup

To get started, set your GCP PROJECT_ID, BUCKET_NAME, and REGION to the following variables. Create a GCP Project if you don't have one. Create a regional Cloud Storage bucket if you don't have one.


In [5]:
LOCAL = True # Change to false to run on the GCP

PROJECT_ID = 'validateflow' # Set your GCP Project Id
BUCKET_NAME = 'validateflow' # Set your Bucket name
REGION = 'europe-west1' # Set the region for Dataflow jobs

ROOT = './tfdv' if LOCAL else 'gs://{}/tfdv'.format(BUCKET_NAME)

DATA_DIR = ROOT + '/data/' # Location to store data
SCHEMA_DIR = ROOT + '/schema/' # Location to store data schema 
STATS_DIR = ROOT +'/stats/' # Location to store stats 
STAGING_DIR = ROOT + '/job/staging/' # Dataflow staging directory on GCP
TEMP_DIR =  ROOT + '/job/temp/' # Dataflow temporary directory on GCP

Cleanup working directory...


In [6]:
if tf.gfile.Exists(ROOT):
    print("Removing {} contents...".format(ROOT))
    tf.gfile.DeleteRecursively(ROOT)

print("Creating working directory: {}".format(ROOT))
tf.gfile.MkDir(ROOT)


Removing ./tfdv contents...
Creating working directory: ./tfdv

Dataset

In this tutorial, we will use the flights data table, which is a publically available sample data in BigQuery.

The table has more than 70 million records on internal US flights, including information on date, airlline, departure airport, arrival airport, departure schedule, actual departure time, arrival schedule, and actual arrival time.

You can use the BigQuery to explore the data, or you can run the following cell, which counts the number of flights by year.


In [ ]:
%%bigquery 

SELECT
    EXTRACT(YEAR FROM CAST(date as DATE)) as year,
    COUNT(*) as flight_count
FROM 
    `bigquery-samples.airline_ontime_data.flights`
GROUP BY
    year
ORDER BY 
    year DESC

We have data from 2002 to 2012. The dataset is ~8GB, which might be too big to store into memory for exploration. However, you can use TFDV to peform the data crunching on GCP at scale using Cloud Dataflow, to produce the statistics that can be easily loaded into memory, visualized and analzyed.

1. Extract the data from BigQuery to GCS

In this step, we will extract the data we want to analyze from BigQuery, convert it to TFRecord files, and store the data files to Cloud Storage (GCS). This data file in GCS will then be used by TFDV. We are going to use Apache Beam to accomplish this.

Let's say that you use this dataset to estimate the arrival delay of a particular flight using ML. Note that, in this tutorial, we are not focusing on building the model, rather we are focusing on analyzing and validating the data changes over time. We are going to use data in 2010-2011 to generate the schema, while validating data in 2012 to identify anomalies.

Note that, in more realistic scenarios, new flights data arrives on daily or weekly basis to your data warehouse, and you would validate this day-worth of data against the schema. The purpose of this example to show how this can be done at scale (using year-worth of data) to identify anomalies.

The data will be extracted with the following columns:

  • fligt_date: The scheduled flight date
  • flight_month: The scheduled flight abbreviated month name
  • flight_day: The scheduled flight day of month
  • flight_week_of_day: The scheduled flight abbreviated week day name
  • airline: Abbreviated airline name
  • departure_airport: Abbreviated departure airport
  • arrival_airport: Abbreviated arrival airport
  • departure_hour: departure hour
  • departure_minute: departure hour
  • departure_time_slot: (6am - 9am), (9am - 12pm), (12pm - 3pm), (3pm - 6pm), (6pm - 9pm), (9pm - 12am), (12am - 6am)
  • departure_delay: departure delay (in minutes)
  • arrival_delay: arrival delay (in seconds)

Implementing the source query


In [7]:
def generate_query(date_from=None, date_to=None, limit=None):
    query ="""
        SELECT 
          CAST(date AS DATE) AS flight_date, 
          FORMAT_DATE('%b',  CAST(date AS DATE)) AS flight_month, 
          EXTRACT(DAY FROM CAST(date AS DATE)) AS flight_day, 
          FORMAT_DATE('%a',  CAST(date AS DATE)) AS flight_day_of_week, 
          airline,
          departure_airport,
          arrival_airport, 
          CAST(SUBSTR(LPAD(CAST(departure_schedule AS STRING), 4, '0'), 0, 2) AS INT64) AS departure_schedule_hour, 
          CAST(SUBSTR(LPAD(CAST(departure_schedule AS STRING), 4, '0'), 3, 2) AS INT64) AS departure_schedule_minute, 
          CASE 
            WHEN departure_schedule BETWEEN 600 AND 900 THEN '[6:00am - 9:00am]'
            WHEN departure_schedule BETWEEN 900 AND 1200 THEN '[9:00am - 12:pm]'
            WHEN departure_schedule BETWEEN 1200 AND 1500 THEN '[12:00pm - 3:00pm]'
            WHEN departure_schedule BETWEEN 1500 AND 1800 THEN '[3:00pm - 6:00pm]'
            WHEN departure_schedule BETWEEN 1800 AND 2100 THEN '[6:00pm - 9:00pm]'
            WHEN departure_schedule BETWEEN 2100 AND 2400 THEN '[9:00pm - 12:00am]'
            ELSE '[12:00am - 6:00am]'
          END AS departure_time_slot,
          departure_delay,
          arrival_delay
        FROM 
          `bigquery-samples.airline_ontime_data.flights`
        """
    if date_from:
        query += "WHERE CAST(date as DATE) >= CAST('{}' as DATE) \n".format(date_from)
        if date_to:
            query += "AND CAST(date as DATE) < CAST('{}' as DATE) \n".format(date_to)
    elif date_to:
        query += "WHERE CAST(date as DATE) < CAST('{}' as DATE) \n".format(date_to)
    
    if limit:
        query  += "LIMIT {}".format(limit)
        
    return query

You can run the following cell to see a sample of the data to be extracted...


In [ ]:
%%bigquery

SELECT 
    CAST(date AS DATE) AS flight_date, 
    FORMAT_DATE('%b',  CAST(date AS DATE)) AS flight_month, 
    EXTRACT(DAY FROM CAST(date AS DATE)) AS flight_day, 
    FORMAT_DATE('%a',  CAST(date AS DATE)) AS flight_day_of_week, 
    airline,
    departure_airport,
    arrival_airport,
    CAST(SUBSTR(LPAD(CAST(departure_schedule AS STRING), 4, '0'), 0, 2) AS INT64) AS departure_schedule_hour, 
    CAST(SUBSTR(LPAD(CAST(departure_schedule AS STRING), 4, '0'), 3, 2) AS INT64) AS departure_schedule_minute, 
    CASE 
        WHEN departure_schedule BETWEEN 600 AND 900 THEN '[6:00am - 9:00am]'
        WHEN departure_schedule BETWEEN 900 AND 1200 THEN '[9:00am - 12:pm]'
        WHEN departure_schedule BETWEEN 1200 AND 1500 THEN '[12:00pm - 3:00pm]'
        WHEN departure_schedule BETWEEN 1500 AND 1800 THEN '[3:00pm - 6:00pm]'
        WHEN departure_schedule BETWEEN 1800 AND 2100 THEN '[6:00pm - 9:00pm]'
        WHEN departure_schedule BETWEEN 2100 AND 2400 THEN '[9:00pm - 12:00am]'
        ELSE '[12:00am - 6:00am]'
    END AS departure_time_slot,
    departure_delay,
    arrival_delay
FROM 
    `bigquery-samples.airline_ontime_data.flights`
LIMIT 5

Implementing helper functions


In [8]:
def get_type_map(query):
    bq_client = bigquery.Client()
    query_job = bq_client.query("SELECT * FROM ({}) LIMIT 0".format(query))
    results = query_job.result()
    type_map = {}
    for field in results.schema:
        type_map[field.name] = field.field_type
    
    return type_map

def row_to_example(instance, type_map):
    feature = {}
    for key, value in instance.items():
        data_type = type_map[key]
        if value is None:
            feature[key] = tf.train.Feature()
        elif data_type == 'INTEGER':
            feature[key] = tf.train.Feature(
                int64_list=tf.train.Int64List(value=[value]))
        elif data_type == 'FLOAT':
            feature[key] = tf.train.Feature(
                float_list=tf.train.FloatList(value=[value]))
        else:
            feature[key] = tf.train.Feature(
                bytes_list=tf.train.BytesList(value=[tf.compat.as_bytes(value)]))
            
    return tf.train.Example(features=tf.train.Features(feature=feature))

Implementing the pipeline


In [9]:
def run_pipeline(args):

    source_query = args.pop('source_query')
    sink_data_location = args.pop('sink_data_location')
    runner = args['runner']
    
    pipeline_options = beam.options.pipeline_options.GoogleCloudOptions(**args)
    print(pipeline_options)
    
    with beam.Pipeline(runner, options=pipeline_options) as pipeline:
        (pipeline 
         | "Read from BigQuery">> beam.io.Read(beam.io.BigQuerySource(query = source_query, use_standard_sql = True))
         | 'Convert to tf Example' >> beam.Map(lambda instance: row_to_example(instance, type_map))
         | 'Serialize to String' >> beam.Map(lambda example: example.SerializeToString(deterministic=True))
         | "Write as TFRecords to GCS" >> beam.io.WriteToTFRecord(
                    file_path_prefix = sink_data_location+"extract", 
                    file_name_suffix=".tfrecords")
        )

Run the pipeline


In [10]:
runner = 'DirectRunner' if LOCAL else 'DataflowRunner'
job_name = 'tfdv-flights-data-extraction-{}'.format(datetime.utcnow().strftime('%y%m%d-%H%M%S'))
date_from =  '2010-01-01'
date_to = '2011-12-31'
data_location = os.path.join(DATA_DIR, 
        "{}-{}/".format(date_from.replace('-',''), date_to.replace('-','')))
print("Data will be extracted to: {}".format(data_location))

print("Generating source query...")
limit = 100000 if LOCAL else None
source_query = generate_query(date_from, date_to, limit)

print("Retrieving data type...")
type_map = get_type_map(source_query)

args = {
    'job_name': job_name,
    'runner': runner,
    'source_query': source_query,
    'type_map': type_map,
    'sink_data_location': data_location,
    'project': PROJECT_ID,
    'region': REGION,
    'staging_location': STAGING_DIR,
    'temp_location': TEMP_DIR,
    'save_main_session': True,
    'setup_file': './setup.py'
}
print("Pipeline args are set.")


Data will be extracted to: ./tfdv/data/20100101-20111231/
Generating source query...
Retrieving data type...
Pipeline args are set.

Your notebook will freeze until the Apache Beam job finishes...


In [11]:
tf.logging.set_verbosity(tf.logging.ERROR)

print("Running data extraction pipeline...")
run_pipeline(args)
print("Pipeline is done.")


Running data extraction pipeline...
GoogleCloudOptions(dataflow_endpoint=https://dataflow.googleapis.com, job_name=tfdv-flights-data-extraction-190908-141443, labels=None, no_auth=False, project=validateflow, region=europe-west1, service_account_email=None, staging_location=./tfdv/job/staging/, temp_location=./tfdv/job/temp/, template_location=None)
/Users/khalidsalama/Technology/python-venvs/py27-venv/lib/python2.7/site-packages/apache_beam/runners/direct/direct_runner.py:360: DeprecationWarning: options is deprecated since First stable release. References to <pipeline>.options will not be supported
  pipeline.replace_all(_get_transform_overrides(pipeline.options))
WARNING:root:Dataset validateflow:temp_dataset_8bcd023761644c90abc48a257ff5b99b does not exist so we will create it as temporary with location=US
Pipeline is done.

You can list the extracted data files...


In [12]:
#!gsutil ls {DATA_DIR}/*
!ls {DATA_DIR}/*


extract-00000-of-00100.tfrecords extract-00050-of-00100.tfrecords
extract-00001-of-00100.tfrecords extract-00051-of-00100.tfrecords
extract-00002-of-00100.tfrecords extract-00052-of-00100.tfrecords
extract-00003-of-00100.tfrecords extract-00053-of-00100.tfrecords
extract-00004-of-00100.tfrecords extract-00054-of-00100.tfrecords
extract-00005-of-00100.tfrecords extract-00055-of-00100.tfrecords
extract-00006-of-00100.tfrecords extract-00056-of-00100.tfrecords
extract-00007-of-00100.tfrecords extract-00057-of-00100.tfrecords
extract-00008-of-00100.tfrecords extract-00058-of-00100.tfrecords
extract-00009-of-00100.tfrecords extract-00059-of-00100.tfrecords
extract-00010-of-00100.tfrecords extract-00060-of-00100.tfrecords
extract-00011-of-00100.tfrecords extract-00061-of-00100.tfrecords
extract-00012-of-00100.tfrecords extract-00062-of-00100.tfrecords
extract-00013-of-00100.tfrecords extract-00063-of-00100.tfrecords
extract-00014-of-00100.tfrecords extract-00064-of-00100.tfrecords
extract-00015-of-00100.tfrecords extract-00065-of-00100.tfrecords
extract-00016-of-00100.tfrecords extract-00066-of-00100.tfrecords
extract-00017-of-00100.tfrecords extract-00067-of-00100.tfrecords
extract-00018-of-00100.tfrecords extract-00068-of-00100.tfrecords
extract-00019-of-00100.tfrecords extract-00069-of-00100.tfrecords
extract-00020-of-00100.tfrecords extract-00070-of-00100.tfrecords
extract-00021-of-00100.tfrecords extract-00071-of-00100.tfrecords
extract-00022-of-00100.tfrecords extract-00072-of-00100.tfrecords
extract-00023-of-00100.tfrecords extract-00073-of-00100.tfrecords
extract-00024-of-00100.tfrecords extract-00074-of-00100.tfrecords
extract-00025-of-00100.tfrecords extract-00075-of-00100.tfrecords
extract-00026-of-00100.tfrecords extract-00076-of-00100.tfrecords
extract-00027-of-00100.tfrecords extract-00077-of-00100.tfrecords
extract-00028-of-00100.tfrecords extract-00078-of-00100.tfrecords
extract-00029-of-00100.tfrecords extract-00079-of-00100.tfrecords
extract-00030-of-00100.tfrecords extract-00080-of-00100.tfrecords
extract-00031-of-00100.tfrecords extract-00081-of-00100.tfrecords
extract-00032-of-00100.tfrecords extract-00082-of-00100.tfrecords
extract-00033-of-00100.tfrecords extract-00083-of-00100.tfrecords
extract-00034-of-00100.tfrecords extract-00084-of-00100.tfrecords
extract-00035-of-00100.tfrecords extract-00085-of-00100.tfrecords
extract-00036-of-00100.tfrecords extract-00086-of-00100.tfrecords
extract-00037-of-00100.tfrecords extract-00087-of-00100.tfrecords
extract-00038-of-00100.tfrecords extract-00088-of-00100.tfrecords
extract-00039-of-00100.tfrecords extract-00089-of-00100.tfrecords
extract-00040-of-00100.tfrecords extract-00090-of-00100.tfrecords
extract-00041-of-00100.tfrecords extract-00091-of-00100.tfrecords
extract-00042-of-00100.tfrecords extract-00092-of-00100.tfrecords
extract-00043-of-00100.tfrecords extract-00093-of-00100.tfrecords
extract-00044-of-00100.tfrecords extract-00094-of-00100.tfrecords
extract-00045-of-00100.tfrecords extract-00095-of-00100.tfrecords
extract-00046-of-00100.tfrecords extract-00096-of-00100.tfrecords
extract-00047-of-00100.tfrecords extract-00097-of-00100.tfrecords
extract-00048-of-00100.tfrecords extract-00098-of-00100.tfrecords
extract-00049-of-00100.tfrecords extract-00099-of-00100.tfrecords

2. Generate Statistics from the Data using TFDV

In this step, we will use TFDV to analyze the data in GCS and compute various statistics from it. This operation requires (multiple) full pass on the data to compute mean, max, min, etc., which needs to run at scale to analyze large dataset.

If we run the analysis on a sample of data, we can use TFDV to compute the statistics locally. However, we can run the TFDV process using Cloud Dataflow for scalability. The generated statistics is stored as a proto buffer to GCS.


In [13]:
job_name = 'tfdv-flights-stats-gen-{}'.format(datetime.utcnow().strftime('%y%m%d-%H%M%S'))
args['job_name'] = job_name
stats_location = os.path.join(STATS_DIR, 'stats.pb')

pipeline_options =  beam.options.pipeline_options.GoogleCloudOptions(**args)
print(pipeline_options)

print("Computing statistics...")
_ = tfdv.generate_statistics_from_tfrecord(
    data_location=data_location, 
    output_path=stats_location,
    stats_options=tfdv.StatsOptions(
        sample_rate=.3
    ),
    pipeline_options = pipeline_options(**args)
)

print("Statistics are computed and saved to: {}".format(stats_location))


GoogleCloudOptions(dataflow_endpoint=https://dataflow.googleapis.com, job_name=tfdv-flights-stats-gen-190908-141926, labels=None, no_auth=False, project=validateflow, region=europe-west1, service_account_email=None, staging_location=./tfdv/job/staging/, temp_location=./tfdv/job/temp/, template_location=None)
Computing statistics...
Statistics are computed and saved to: ./tfdv/stats/stats.pb

You can list saves statistics file...


In [14]:
!ls {stats_location}
#!gsutil ls {stats_location}


./tfdv/stats/stats.pb

3. Explore and Visualize the Statistics

In this step, we use TFDV visualization capabilities to explore and analyze the data, using the computed statistics from the previous step, in order to identify data ranges, categorical columns vocabulary, missing values percentages, etc. This step helps to generate the expected schema of the data. TFDV uses Facets capabilities for visualization.

Using the visualization, you can identify the following properties of the features:

  • Numeric Features: min, max, mean, stdv, median, missing percentage etc.
  • Categorical features: unique values, frequency of values, missing percentage, etc.

In [15]:
stats = tfdv.load_statistics(stats_location)
tfdv.visualize_statistics(stats)


4. Generate Schema for the Data

In this step, we generate schema for the data based on the statistics. The schema describes the data types, ranges, etc., which will be used for validating incoming new data. Before storing the generated schema to GCS, we can alter and extend this schema manually.


In [16]:
schema = tfdv.infer_schema(statistics=stats)
tfdv.display_schema(schema=schema)


Type Presence Valency Domain
Feature name
'departure_time_slot' STRING required 'departure_time_slot'
'departure_schedule_hour' INT required -
'departure_airport' STRING required 'departure_airport'
'arrival_delay' FLOAT required -
'arrival_airport' STRING required 'arrival_airport'
'departure_schedule_minute' INT required -
'airline' STRING required 'airline'
'flight_day' INT required -
'flight_day_of_week' STRING required 'flight_day_of_week'
'flight_month' STRING required 'flight_month'
'departure_delay' FLOAT required -
'flight_date' BYTES required -
Values
Domain
'departure_time_slot' '[12:00am - 6:00am]', '[12:00pm - 3:00pm]', '[3:00pm - 6:00pm]', '[6:00am - 9:00am]', '[6:00pm - 9:00pm]', '[9:00am - 12:pm]', '[9:00pm - 12:00am]'
'departure_airport' 'ABI', 'ABQ', 'ACT', 'AGS', 'ASE', 'ATL', 'AUS', 'BHM', 'BOS', 'CAE', 'CID', 'CLE', 'CLL', 'CMH', 'COS', 'CVG', 'DAL', 'DCA', 'DEN', 'DFW', 'DTW', 'EWR', 'FLL', 'HOU', 'IND', 'IPL', 'MEM', 'MGM', 'MSN', 'MTJ', 'PIA', 'PIT', 'PNS', 'RKS', 'ROW', 'SAF', 'SAV', 'SJU', 'SNA', 'TPA', 'TUL'
'arrival_airport' 'DFW', 'ELP', 'EVV', 'EYW', 'FLO', 'FSD', 'GCC', 'GGG', 'GTF', 'LAX'
'airline' '9E', 'AA', 'AS', 'B6', 'CO', 'DL', 'EV', 'F9', 'FL', 'MQ', 'OH', 'OO', 'UA', 'WN', 'XE'
'flight_day_of_week' 'Fri', 'Mon', 'Sat', 'Sun', 'Thu', 'Tue', 'Wed'
'flight_month' 'Apr', 'Aug', 'Dec', 'Feb', 'Jan', 'Jul', 'Jun', 'Mar', 'May', 'Nov', 'Oct', 'Sep'

Fix and save the schema

You can manually alter the schema before you save it. For example:

  • Set the maximum fraction of missing values allowed in a feature.
  • Add values to a categorical feature domain.
  • Set minimum fraction of values that must come from the domain .
  • Change the min and max allowed values to a numeric feauture domain.
  • Set a drift comparator.

In [22]:
from tensorflow_metadata.proto.v0 import schema_pb2

# Allow no missing values
tfdv.get_feature(schema, 'airline').presence.min_fraction = 1.0 

# Only allow 10% of the values to be new
tfdv.get_feature(schema, 'departure_airport').distribution_constraints.min_domain_mass = 0.9 

domain = tfdv.utils.schema_util.schema_pb2.FloatDomain(
    min=-60, # a flight can departure 1 hour earlier
    max=480 # maximum departure delay is 8 hours, otherwise the flight is cancelled.
)
tfdv.set_domain(schema, 'departure_delay', domain)

tfdv.get_feature(schema, 'flight_month').drift_comparator.infinity_norm.threshold = 0.01


WARNING:root:Replacing existing domain of feature "departure_delay".

In [23]:
from tensorflow.python.lib.io import file_io
from google.protobuf import text_format

tf.gfile.MkDir(dirname=SCHEMA_DIR)
schema_location = os.path.join(SCHEMA_DIR, 'schema.pb')
tfdv.write_schema_text(schema, schema_location)
print("Schema file saved to:{}".format(schema_location))


Schema file saved to:./tfdv/schema/schema.pb

You can list saved schema file...


In [24]:
!ls {schema_location}
#!gsuitl ls {schema_location}


./tfdv/schema/schema.pb

5. Extract New Data

In this step, we are going to extract new data from BigQuery and store it to GCS is TFRecord files. This will be flights data in 2012, however, we are going to introduce the following alternation in the data schema and content to demonstrate types of anomalies to be detected via TFDV:

  1. Skip February data
  2. Introduce missing values to airline
  3. Add is_weekend column
  4. Convert the time slot (12:00am - 6:00am) to two time slots: (12:00am - 3:00am), (3:00am - 6:00am)
  5. Change the departure_delay values from minutes to seconds

Implementing the "altered" source query


In [25]:
def generate_altered_query(date_from=None, date_to=None, limit=None):
    query ="""
        SELECT * FROM (
            SELECT 
              CAST(date AS DATE) AS flight_date, 
              FORMAT_DATE('%b',  CAST(date AS DATE)) AS flight_month, 
              EXTRACT(DAY FROM CAST(date AS DATE)) AS flight_day, 
              FORMAT_DATE('%a',  CAST(date AS DATE)) AS flight_day_of_week, 
              CASE WHEN EXTRACT(DAYOFWEEK FROM CAST(date AS DATE)) IN (1 , 7) THEN 'Yes' ELSE 'No' END AS is_weekend,
              CASE WHEN airline = 'MQ' THEN NULL ELSE airline END airline,
              departure_airport,
              arrival_airport,
              CAST(SUBSTR(LPAD(CAST(departure_schedule AS STRING), 4, '0'), 0, 2) AS INT64) AS departure_schedule_hour, 
              CAST(SUBSTR(LPAD(CAST(departure_schedule AS STRING), 4, '0'), 3, 2) AS INT64) AS departure_schedule_minute, 
              CASE 
                WHEN departure_schedule BETWEEN 600 AND 900 THEN '[6:00am - 9:00am]'
                WHEN departure_schedule BETWEEN 900 AND 1200 THEN '[9:00am - 12:pm]'
                WHEN departure_schedule BETWEEN 1200 AND 1500 THEN '[12:00pm - 3:00pm]'
                WHEN departure_schedule BETWEEN 1500 AND 1800 THEN '[3:00pm - 6:00pm]'
                WHEN departure_schedule BETWEEN 1800 AND 2100 THEN '[6:00pm - 9:00pm]'
                WHEN departure_schedule BETWEEN 2100 AND 2400 THEN '[9:00pm - 12:00am]'
                WHEN departure_schedule BETWEEN 0000 AND 300 THEN '[12:00am - 3:00am]'
                ELSE '[3:00am - 6:00am]'
              END AS departure_time_slot,
              departure_delay * 60 AS departure_delay,
              arrival_delay
            FROM 
              `bigquery-samples.airline_ontime_data.flights`
            WHERE 
              EXTRACT(MONTH FROM CAST(date AS DATE)) != 2
        )
        """
    if date_from:
        query += "WHERE flight_date >= CAST('{}' as DATE) \n".format(date_from)
        if date_to:
            query += "AND flight_date < CAST('{}' as DATE) \n".format(date_to)
    elif date_to:
        query += "WHERE flight_date < CAST('{}' as DATE) \n".format(date_to)
    
    if limit:
        query  += "LIMIT {}".format(limit)
        
    return query

You can run the following cell to see a sample of the data to be extracted...


In [ ]:
%%bigquery

SELECT 
    CAST(date AS DATE) AS flight_date, 
    FORMAT_DATE('%b',  CAST(date AS DATE)) AS flight_month, 
    EXTRACT(DAY FROM CAST(date AS DATE)) AS flight_day, 
    FORMAT_DATE('%a',  CAST(date AS DATE)) AS flight_day_of_week, 
    CASE WHEN EXTRACT(DAYOFWEEK FROM CAST(date AS DATE)) IN (1 , 7) THEN 'Yes' ELSE 'No' END AS is_weekend,
    CASE WHEN airline = 'MQ' THEN NULL ELSE airline END airline,
    departure_airport,
    arrival_airport,
    CAST(SUBSTR(LPAD(CAST(departure_schedule AS STRING), 4, '0'), 0, 2) AS INT64) AS departure_schedule_hour, 
    CAST(SUBSTR(LPAD(CAST(departure_schedule AS STRING), 4, '0'), 3, 2) AS INT64) AS departure_schedule_minute, 
    CASE 
        WHEN departure_schedule BETWEEN 600 AND 900 THEN '[6:00am - 9:00am]'
        WHEN departure_schedule BETWEEN 900 AND 1200 THEN '[9:00am - 12:pm]'
        WHEN departure_schedule BETWEEN 1200 AND 1500 THEN '[12:00pm - 3:00pm]'
        WHEN departure_schedule BETWEEN 1500 AND 1800 THEN '[3:00pm - 6:00pm]'
        WHEN departure_schedule BETWEEN 1800 AND 2100 THEN '[6:00pm - 9:00pm]'
        WHEN departure_schedule BETWEEN 2100 AND 2400 THEN '[9:00pm - 12:00am]'
        WHEN departure_schedule BETWEEN 0000 AND 300 THEN '[12:00am - 3:00am]'
        ELSE '[3:00am - 6:00am]'
    END AS departure_time_slot,
    departure_delay * 60 AS departure_delay,
    arrival_delay
FROM 
    `bigquery-samples.airline_ontime_data.flights`
WHERE 
    EXTRACT(MONTH FROM CAST(date AS DATE)) != 2
LIMIT 5

Run the pipeline


In [26]:
runner = 'DirectRunner' if LOCAL else 'DataflowRunner'
job_name = 'tfdv-flights-data-extraction-{}'.format(datetime.utcnow().strftime('%y%m%d-%H%M%S'))
date_from =  '2012-01-01'
date_to = '2012-12-31'
data_location = os.path.join(DATA_DIR, 
        "{}-{}/".format(date_from.replace('-',''), date_to.replace('-','')))
print("Data will be extracted to: {}".format(data_location))

print("Generating altered source query...")
limit = 100000 if LOCAL else None
source_query = generate_query(date_from, date_to, limit)

print("Retrieving data type...")
type_map = get_type_map(source_query)

args = {
    'job_name': job_name,
    'runner': runner,
    'source_query': source_query,
    'type_map': type_map,
    'sink_data_location': data_location,
    'project': PROJECT_ID,
    'region': REGION,
    'staging_location': STAGING_DIR,
    'temp_location': TEMP_DIR,
    'save_main_session': True,
    'setup_file': './setup.py'
}
print("Pipeline args are set.")


Data will be extracted to: ./tfdv/data/20120101-20121231/
Generating altered source query...
Retrieving data type...
Pipeline args are set.

In [27]:
print("Running data extraction pipeline...")
run_pipeline(args)
print("Pipeline is done.")


Running data extraction pipeline...
GoogleCloudOptions(dataflow_endpoint=https://dataflow.googleapis.com, job_name=tfdv-flights-data-extraction-190908-143255, labels=None, no_auth=False, project=validateflow, region=europe-west1, service_account_email=None, staging_location=./tfdv/job/staging/, temp_location=./tfdv/job/temp/, template_location=None)
WARNING:root:Dataset validateflow:temp_dataset_c5a408a366dd426da5e16add56c330fe does not exist so we will create it as temporary with location=US
Pipeline is done.

You can list the extracted data files...


In [28]:
#!gsutil ls {DATA_DIR}/*
!ls {DATA_DIR}/*


./tfdv/data//20100101-20111231:
extract-00000-of-00100.tfrecords extract-00050-of-00100.tfrecords
extract-00001-of-00100.tfrecords extract-00051-of-00100.tfrecords
extract-00002-of-00100.tfrecords extract-00052-of-00100.tfrecords
extract-00003-of-00100.tfrecords extract-00053-of-00100.tfrecords
extract-00004-of-00100.tfrecords extract-00054-of-00100.tfrecords
extract-00005-of-00100.tfrecords extract-00055-of-00100.tfrecords
extract-00006-of-00100.tfrecords extract-00056-of-00100.tfrecords
extract-00007-of-00100.tfrecords extract-00057-of-00100.tfrecords
extract-00008-of-00100.tfrecords extract-00058-of-00100.tfrecords
extract-00009-of-00100.tfrecords extract-00059-of-00100.tfrecords
extract-00010-of-00100.tfrecords extract-00060-of-00100.tfrecords
extract-00011-of-00100.tfrecords extract-00061-of-00100.tfrecords
extract-00012-of-00100.tfrecords extract-00062-of-00100.tfrecords
extract-00013-of-00100.tfrecords extract-00063-of-00100.tfrecords
extract-00014-of-00100.tfrecords extract-00064-of-00100.tfrecords
extract-00015-of-00100.tfrecords extract-00065-of-00100.tfrecords
extract-00016-of-00100.tfrecords extract-00066-of-00100.tfrecords
extract-00017-of-00100.tfrecords extract-00067-of-00100.tfrecords
extract-00018-of-00100.tfrecords extract-00068-of-00100.tfrecords
extract-00019-of-00100.tfrecords extract-00069-of-00100.tfrecords
extract-00020-of-00100.tfrecords extract-00070-of-00100.tfrecords
extract-00021-of-00100.tfrecords extract-00071-of-00100.tfrecords
extract-00022-of-00100.tfrecords extract-00072-of-00100.tfrecords
extract-00023-of-00100.tfrecords extract-00073-of-00100.tfrecords
extract-00024-of-00100.tfrecords extract-00074-of-00100.tfrecords
extract-00025-of-00100.tfrecords extract-00075-of-00100.tfrecords
extract-00026-of-00100.tfrecords extract-00076-of-00100.tfrecords
extract-00027-of-00100.tfrecords extract-00077-of-00100.tfrecords
extract-00028-of-00100.tfrecords extract-00078-of-00100.tfrecords
extract-00029-of-00100.tfrecords extract-00079-of-00100.tfrecords
extract-00030-of-00100.tfrecords extract-00080-of-00100.tfrecords
extract-00031-of-00100.tfrecords extract-00081-of-00100.tfrecords
extract-00032-of-00100.tfrecords extract-00082-of-00100.tfrecords
extract-00033-of-00100.tfrecords extract-00083-of-00100.tfrecords
extract-00034-of-00100.tfrecords extract-00084-of-00100.tfrecords
extract-00035-of-00100.tfrecords extract-00085-of-00100.tfrecords
extract-00036-of-00100.tfrecords extract-00086-of-00100.tfrecords
extract-00037-of-00100.tfrecords extract-00087-of-00100.tfrecords
extract-00038-of-00100.tfrecords extract-00088-of-00100.tfrecords
extract-00039-of-00100.tfrecords extract-00089-of-00100.tfrecords
extract-00040-of-00100.tfrecords extract-00090-of-00100.tfrecords
extract-00041-of-00100.tfrecords extract-00091-of-00100.tfrecords
extract-00042-of-00100.tfrecords extract-00092-of-00100.tfrecords
extract-00043-of-00100.tfrecords extract-00093-of-00100.tfrecords
extract-00044-of-00100.tfrecords extract-00094-of-00100.tfrecords
extract-00045-of-00100.tfrecords extract-00095-of-00100.tfrecords
extract-00046-of-00100.tfrecords extract-00096-of-00100.tfrecords
extract-00047-of-00100.tfrecords extract-00097-of-00100.tfrecords
extract-00048-of-00100.tfrecords extract-00098-of-00100.tfrecords
extract-00049-of-00100.tfrecords extract-00099-of-00100.tfrecords

./tfdv/data//20120101-20121231:
extract-00000-of-00100.tfrecords extract-00050-of-00100.tfrecords
extract-00001-of-00100.tfrecords extract-00051-of-00100.tfrecords
extract-00002-of-00100.tfrecords extract-00052-of-00100.tfrecords
extract-00003-of-00100.tfrecords extract-00053-of-00100.tfrecords
extract-00004-of-00100.tfrecords extract-00054-of-00100.tfrecords
extract-00005-of-00100.tfrecords extract-00055-of-00100.tfrecords
extract-00006-of-00100.tfrecords extract-00056-of-00100.tfrecords
extract-00007-of-00100.tfrecords extract-00057-of-00100.tfrecords
extract-00008-of-00100.tfrecords extract-00058-of-00100.tfrecords
extract-00009-of-00100.tfrecords extract-00059-of-00100.tfrecords
extract-00010-of-00100.tfrecords extract-00060-of-00100.tfrecords
extract-00011-of-00100.tfrecords extract-00061-of-00100.tfrecords
extract-00012-of-00100.tfrecords extract-00062-of-00100.tfrecords
extract-00013-of-00100.tfrecords extract-00063-of-00100.tfrecords
extract-00014-of-00100.tfrecords extract-00064-of-00100.tfrecords
extract-00015-of-00100.tfrecords extract-00065-of-00100.tfrecords
extract-00016-of-00100.tfrecords extract-00066-of-00100.tfrecords
extract-00017-of-00100.tfrecords extract-00067-of-00100.tfrecords
extract-00018-of-00100.tfrecords extract-00068-of-00100.tfrecords
extract-00019-of-00100.tfrecords extract-00069-of-00100.tfrecords
extract-00020-of-00100.tfrecords extract-00070-of-00100.tfrecords
extract-00021-of-00100.tfrecords extract-00071-of-00100.tfrecords
extract-00022-of-00100.tfrecords extract-00072-of-00100.tfrecords
extract-00023-of-00100.tfrecords extract-00073-of-00100.tfrecords
extract-00024-of-00100.tfrecords extract-00074-of-00100.tfrecords
extract-00025-of-00100.tfrecords extract-00075-of-00100.tfrecords
extract-00026-of-00100.tfrecords extract-00076-of-00100.tfrecords
extract-00027-of-00100.tfrecords extract-00077-of-00100.tfrecords
extract-00028-of-00100.tfrecords extract-00078-of-00100.tfrecords
extract-00029-of-00100.tfrecords extract-00079-of-00100.tfrecords
extract-00030-of-00100.tfrecords extract-00080-of-00100.tfrecords
extract-00031-of-00100.tfrecords extract-00081-of-00100.tfrecords
extract-00032-of-00100.tfrecords extract-00082-of-00100.tfrecords
extract-00033-of-00100.tfrecords extract-00083-of-00100.tfrecords
extract-00034-of-00100.tfrecords extract-00084-of-00100.tfrecords
extract-00035-of-00100.tfrecords extract-00085-of-00100.tfrecords
extract-00036-of-00100.tfrecords extract-00086-of-00100.tfrecords
extract-00037-of-00100.tfrecords extract-00087-of-00100.tfrecords
extract-00038-of-00100.tfrecords extract-00088-of-00100.tfrecords
extract-00039-of-00100.tfrecords extract-00089-of-00100.tfrecords
extract-00040-of-00100.tfrecords extract-00090-of-00100.tfrecords
extract-00041-of-00100.tfrecords extract-00091-of-00100.tfrecords
extract-00042-of-00100.tfrecords extract-00092-of-00100.tfrecords
extract-00043-of-00100.tfrecords extract-00093-of-00100.tfrecords
extract-00044-of-00100.tfrecords extract-00094-of-00100.tfrecords
extract-00045-of-00100.tfrecords extract-00095-of-00100.tfrecords
extract-00046-of-00100.tfrecords extract-00096-of-00100.tfrecords
extract-00047-of-00100.tfrecords extract-00097-of-00100.tfrecords
extract-00048-of-00100.tfrecords extract-00098-of-00100.tfrecords
extract-00049-of-00100.tfrecords extract-00099-of-00100.tfrecords

Generate statistics for the new data


In [30]:
job_name = 'tfdv-flights-stats-gen-{}'.format(datetime.utcnow().strftime('%y%m%d-%H%M%S'))
args['job_name'] = job_name
new_stats_location = os.path.join(STATS_DIR, 'new_stats.pb')

pipeline_options = beam.options.pipeline_options.GoogleCloudOptions(**args)
print(pipeline_options)

print("Computing statistics...")
_ = tfdv.generate_statistics_from_tfrecord(
    data_location=data_location, 
    output_path=new_stats_location,
    stats_options=tfdv.StatsOptions(
        sample_rate=.5
    ),
    pipeline_options = pipeline_options
)

print("Statistics are computed and saved to: {}".format(new_stats_location))


GoogleCloudOptions(dataflow_endpoint=https://dataflow.googleapis.com, job_name=tfdv-flights-stats-gen-190908-144011, labels=None, no_auth=False, project=validateflow, region=europe-west1, service_account_email=None, staging_location=./tfdv/job/staging/, temp_location=./tfdv/job/temp/, template_location=None)
Computing statistics...
Statistics are computed and saved to: ./tfdv/stats/new_stats.pb

6. Validate the New Data and Identify Anomalies

In this step, we are going to use the generated schema to validate the newly extracted data to identify if the data complies with the schema, or if there are any anomalies to be handled.

Load Schema


In [31]:
schema = tfdv.utils.schema_util.load_schema_text(schema_location)

Load data statistics


In [32]:
stats = tfdv.load_statistics(stats_location)
new_stats = tfdv.load_statistics(new_stats_location)

Validate new statistics against schema


In [33]:
anomalies = tfdv.validate_statistics(
    statistics=new_stats, 
    schema=schema,
    previous_statistics=stats
)

Display anomalies (if any)


In [34]:
tfdv.display_anomalies(anomalies)


Anomaly short description Anomaly long description
Feature name
'airline' Unexpected string values Examples contain values missing from the schema: US (~2%), YV (<1%).
'arrival_airport' Unexpected string values Examples contain values missing from the schema: HOU (~7%), ISP (~4%), LAW (~1%), LBB (~5%), LGA (~58%), LSE (<1%), MAF (~5%), MDW (~16%).
'departure_airport' Unexpected string values Examples contain values missing from the schema: BGR (<1%), BNA (~1%), BUF (<1%), BWI (~3%), CAK (<1%), CHS (<1%), CLT (~3%), CRW (<1%), EGE (<1%), EYW (<1%), GSO (<1%), IAD (~2%), IAH (~5%), JAN (<1%), JAX (<1%), JFK (<1%), LAS (<1%), LAX (<1%), LGA (<1%), MCI (<1%), MCO (~3%), MDW (~1%), MIA (~2%), MKE (~1%), MSP (~4%), MSY (~2%), OAK (<1%), OKC (<1%), OMA (<1%), ONT (<1%), ORD (~5%), PBI (~1%), PDX (<1%), PHF (<1%), PHL (<1%), PHX (~1%), PWM (<1%), RDU (~2%), RIC (<1%), RNO (<1%), ROC (<1%), RSW (<1%), SAN (<1%), SFO (<1%), SJC (<1%), SLC (<1%), SRQ (<1%), STL (<1%), SYR (<1%), TVC (<1%), XNA (<1%).
'departure_delay' Out-of-range values Unexpectedly high value: 1011>480(upto six significant digits)
'flight_month' High Linfty distance between current and previous The Linfty distance between current and previous is 0.0832108 (up to six significant digits), above the threshold 0.01. The feature value with maximum difference is: Oct

To handling these anomalies depends on the type of each anomaly:

  • Some anomalies are handled by fixing the data. For example, filtering missing values, or making sure that the data source provides the expected values. Besides, convert the values to be in the expected range (from seconds to minutes)
  • Some anomalies are handles by updating the schema: For example, adding new values to categorical feature domains.
  • Some anomalies cannot be fixed, but should trigger or stop a downstream process. For example, detecting drift in flight_month, because February was missing, may lead to stopping the execution of a model training pipeline, one this particular period.

License

Authors: Khalid Salama and Eric Evn der Knaap


Disclaimer: This is not an official Google product. The sample code provided for an educational purpose.


Copyright 2019 Google LLC

Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0.

Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.


In [ ]: