How to read BigQuery data from TensorFlow 2.0 efficiently

This notebook accompanies the article "How to read BigQuery data from TensorFlow 2.0 efficiently"

The example problem is to find credit card fraud from the dataset published in: Andrea Dal Pozzolo, Olivier Caelen, Reid A. Johnson and Gianluca Bontempi. Calibrating Probability with Undersampling for Unbalanced Classification. In Symposium on Computational Intelligence and Data Mining (CIDM), IEEE, 2015 and available in BigQuery at

bigquery-public-data.ml_datasets.ulb_fraud_detection

Benchmark Model

In order to compare things, we will do a simple logistic regression in BigQuery ML.

Note that we are using all the columns in the dataset as predictors (except for the Time and Class columns). The Time column is used to split the dataset 80:20 with the first 80% used for training and the last 20% used for evaluation. We will also have BigQuery ML automatically balance the weights.

Because the Amount column has a huge range, we take the log of it in preprocessing.


In [ ]:
%%bash
# create output dataset
bq mk advdata

In [ ]:
%%bigquery
CREATE OR REPLACE MODEL advdata.ulb_fraud_detection 
TRANSFORM(
    * EXCEPT(Amount),
    SAFE.LOG(Amount) AS log_amount
)
OPTIONS(
    INPUT_LABEL_COLS=['class'],
    AUTO_CLASS_WEIGHTS = TRUE,
    DATA_SPLIT_METHOD='seq',
    DATA_SPLIT_COL='Time',
    MODEL_TYPE='logistic_reg'
) AS

SELECT 
 *
FROM `bigquery-public-data.ml_datasets.ulb_fraud_detection`

In [10]:
%%bigquery
SELECT * FROM ML.EVALUATE(MODEL advdata.ulb_fraud_detection)


Out[10]:
precision recall accuracy f1_score log_loss roc_auc
0 0.051938 0.893333 0.978374 0.098168 0.119626 0.98633

In [15]:
%%bigquery
SELECT predicted_class_probs, Class
FROM ML.PREDICT( MODEL advdata.ulb_fraud_detection,
  (SELECT * FROM `bigquery-public-data.ml_datasets.ulb_fraud_detection` WHERE Time = 85285.0)
)


Out[15]:
predicted_class_probs Class
0 [{'label': 1, 'prob': 0.08119100590923363}, {'... 0
1 [{'label': 1, 'prob': 0.999999999367996}, {'la... 1
2 [{'label': 1, 'prob': 0.999999999367996}, {'la... 1
3 [{'label': 1, 'prob': 0.9999999994848612}, {'l... 1
4 [{'label': 1, 'prob': 0.9999999994848612}, {'l... 1

Find the breakoff point etc. for Keras

When we do the training in Keras & TensorFlow, we need to find the place to split the dataset and how to weight the imbalanced data. (BigQuery ML did that for us because we specified 'seq' as the split method and auto_class_weights to be True).


In [11]:
%%bigquery
WITH counts AS (
SELECT
    APPROX_QUANTILES(Time, 5)[OFFSET(4)] AS train_cutoff
    , COUNTIF(CLASS > 0) AS pos
    , COUNTIF(CLASS = 0) AS neg
FROM `bigquery-public-data`.ml_datasets.ulb_fraud_detection
)

SELECT
   train_cutoff
    , SAFE.LOG(SAFE_DIVIDE(pos,neg)) AS output_bias
    , 0.5*SAFE_DIVIDE(pos + neg, pos) AS weight_pos
    , 0.5*SAFE_DIVIDE(pos + neg, neg) AS weight_neg
FROM counts


Out[11]:
train_cutoff output_bias weight_pos weight_neg
0 144803.0 -6.359359 289.438008 0.500865

The time cutoff is 144803 and the Keras model's output bias needs to be set at -6.36 The class weights need to be 289.4 and 0.5

Training a TensorFlow/Keras model that reads from BigQuery

Create the dataset from BigQuery


In [1]:
import tensorflow as tf
from tensorflow.python.framework import dtypes
from tensorflow_io.bigquery import BigQueryClient
from tensorflow_io.bigquery import BigQueryReadSession

def features_and_labels(features):
  label = features.pop('Class') # this is what we will train for
  return features, label

def read_dataset(client, row_restriction, batch_size=2048):
    GCP_PROJECT_ID='ai-analytics-solutions'  # CHANGE
    COL_NAMES = ['Time', 'Amount', 'Class'] + ['V{}'.format(i) for i in range(1,29)]
    COL_TYPES = [dtypes.float64, dtypes.float64, dtypes.int64] + [dtypes.float64 for i in range(1,29)]
    DATASET_GCP_PROJECT_ID, DATASET_ID, TABLE_ID,  = 'bigquery-public-data.ml_datasets.ulb_fraud_detection'.split('.')
    bqsession = client.read_session(
        "projects/" + GCP_PROJECT_ID,
        DATASET_GCP_PROJECT_ID, TABLE_ID, DATASET_ID,
        COL_NAMES, COL_TYPES,
        requested_streams=2,
        row_restriction=row_restriction)
    dataset = bqsession.parallel_read_rows()
    return dataset.prefetch(1).map(features_and_labels).shuffle(batch_size*10).batch(batch_size)

client = BigQueryClient()

In [2]:
temp_df = read_dataset(client, 'Time <= 144803', 2)
for row in temp_df:
    print(row)
    break


WARNING:tensorflow:From /usr/local/lib/python3.5/dist-packages/tensorflow_io/bigquery/python/ops/bigquery_api.py:214: parallel_interleave (from tensorflow.python.data.experimental.ops.interleave_ops) is deprecated and will be removed in a future version.
Instructions for updating:
Use `tf.data.Dataset.interleave(map_func, cycle_length, block_length, num_parallel_calls=tf.data.experimental.AUTOTUNE)` instead. If sloppy execution is desired, use `tf.data.Options.experimental_deterministic`.
(OrderedDict([('Amount', <tf.Tensor: shape=(2,), dtype=float64, numpy=array([0., 0.])>), ('Time', <tf.Tensor: shape=(2,), dtype=float64, numpy=array([820., 912.])>), ('V1', <tf.Tensor: shape=(2,), dtype=float64, numpy=array([-0.93748131,  1.08300282])>), ('V10', <tf.Tensor: shape=(2,), dtype=float64, numpy=array([-0.66669886,  0.45323987])>), ('V11', <tf.Tensor: shape=(2,), dtype=float64, numpy=array([-0.34374585, -0.75502041])>), ('V12', <tf.Tensor: shape=(2,), dtype=float64, numpy=array([-0.42906363,  0.18404053])>), ('V13', <tf.Tensor: shape=(2,), dtype=float64, numpy=array([-1.32585737, -0.10756875])>), ('V14', <tf.Tensor: shape=(2,), dtype=float64, numpy=array([ 0.10021293, -0.36109129])>), ('V15', <tf.Tensor: shape=(2,), dtype=float64, numpy=array([-0.14388643, -0.03724729])>), ('V16', <tf.Tensor: shape=(2,), dtype=float64, numpy=array([-0.61343446,  0.33432119])>), ('V17', <tf.Tensor: shape=(2,), dtype=float64, numpy=array([ 0.13066535, -0.1296146 ])>), ('V18', <tf.Tensor: shape=(2,), dtype=float64, numpy=array([-0.98896903, -0.63384776])>), ('V19', <tf.Tensor: shape=(2,), dtype=float64, numpy=array([-0.82991152, -1.1995778 ])>), ('V2', <tf.Tensor: shape=(2,), dtype=float64, numpy=array([0.4016488 , 0.20158881])>), ('V20', <tf.Tensor: shape=(2,), dtype=float64, numpy=array([-0.05640907, -0.20372614])>), ('V21', <tf.Tensor: shape=(2,), dtype=float64, numpy=array([-0.0017575 , -0.06043325])>), ('V22', <tf.Tensor: shape=(2,), dtype=float64, numpy=array([0.09737922, 0.00964727])>), ('V23', <tf.Tensor: shape=(2,), dtype=float64, numpy=array([-0.32405032,  0.07707211])>), ('V24', <tf.Tensor: shape=(2,), dtype=float64, numpy=array([0.4365211 , 0.07214124])>), ('V25', <tf.Tensor: shape=(2,), dtype=float64, numpy=array([0.50967362, 0.22074665])>), ('V26', <tf.Tensor: shape=(2,), dtype=float64, numpy=array([ 0.45411616, -0.01348303])>), ('V27', <tf.Tensor: shape=(2,), dtype=float64, numpy=array([-0.20180428,  0.05859641])>), ('V28', <tf.Tensor: shape=(2,), dtype=float64, numpy=array([-0.17543901,  0.03205391])>), ('V3', <tf.Tensor: shape=(2,), dtype=float64, numpy=array([1.88268869, 1.49766367])>), ('V4', <tf.Tensor: shape=(2,), dtype=float64, numpy=array([-0.36200091,  2.661922  ])>), ('V5', <tf.Tensor: shape=(2,), dtype=float64, numpy=array([ 0.7510884 , -0.62125581])>), ('V6', <tf.Tensor: shape=(2,), dtype=float64, numpy=array([-0.89926234,  0.6185537 ])>), ('V7', <tf.Tensor: shape=(2,), dtype=float64, numpy=array([ 0.88055736, -0.64414025])>), ('V8', <tf.Tensor: shape=(2,), dtype=float64, numpy=array([-0.18165031,  0.31091893])>), ('V9', <tf.Tensor: shape=(2,), dtype=float64, numpy=array([-0.21165709,  0.12596404])>)]), <tf.Tensor: shape=(2,), dtype=int64, numpy=array([0, 0])>)

In [3]:
train_df = read_dataset(client, 'Time <= 144803', 2048)
eval_df = read_dataset(client, 'Time > 144803', 2048)

Create Keras model


In [7]:
metrics = [
      tf.keras.metrics.BinaryAccuracy(name='accuracy'),
      tf.keras.metrics.Precision(name='precision'),
      tf.keras.metrics.Recall(name='recall'),
      tf.keras.metrics.AUC(name='roc_auc'),
]

# create inputs, and pass them into appropriate types of feature columns (here, everything is numeric)
inputs = {
    'V{}'.format(i) : tf.keras.layers.Input(name='V{}'.format(i), shape=(), dtype='float64') for i in range(1, 29)
}
inputs['Amount'] = tf.keras.layers.Input(name='Amount', shape=(), dtype='float64')
input_fc = [tf.feature_column.numeric_column(colname) for colname in inputs.keys()]

# transformations. only the Amount is transformed
transformed = inputs.copy()
transformed['Amount'] = tf.keras.layers.Lambda(
    lambda x: tf.math.log(tf.math.maximum(x, 0.01)), name='log_amount')(inputs['Amount'])
input_layer = tf.keras.layers.DenseFeatures(input_fc, name='inputs')(transformed)

# Deep learning model
d1 = tf.keras.layers.Dense(16, activation='relu', name='d1')(input_layer)
d2 = tf.keras.layers.Dropout(0.25, name='d2')(d1)
d3 = tf.keras.layers.Dense(16, activation='relu', name='d3')(d2)
output = tf.keras.layers.Dense(1, activation='sigmoid', name='d4', bias_initializer=tf.keras.initializers.Constant())(d3)

model = tf.keras.Model(inputs, output)
model.compile(optimizer='adam',
              loss='binary_crossentropy',
              metrics=metrics)
tf.keras.utils.plot_model(model, rankdir='LR')


WARNING:tensorflow:Layer log_amount is casting an input tensor from dtype float64 to the layer's dtype of float32, which is new behavior in TensorFlow 2.  The layer has dtype float32 because it's dtype defaults to floatx.

If you intended to run this layer in float32, you can safely ignore this warning. If in doubt, this warning is likely only an issue if you are porting a TensorFlow 1.X model to TensorFlow 2.

To change all layers to have dtype float64 by default, call `tf.keras.backend.set_floatx('float64')`. To change just this layer, pass dtype='float64' to the layer constructor. If you are the author of this layer, you can disable autocasting by passing autocast=False to the base Layer constructor.

WARNING:tensorflow:Layer inputs is casting an input tensor from dtype float64 to the layer's dtype of float32, which is new behavior in TensorFlow 2.  The layer has dtype float32 because it's dtype defaults to floatx.

If you intended to run this layer in float32, you can safely ignore this warning. If in doubt, this warning is likely only an issue if you are porting a TensorFlow 1.X model to TensorFlow 2.

To change all layers to have dtype float64 by default, call `tf.keras.backend.set_floatx('float64')`. To change just this layer, pass dtype='float64' to the layer constructor. If you are the author of this layer, you can disable autocasting by passing autocast=False to the base Layer constructor.

Out[7]:

In [9]:
class_weight = {0: 0.5, 1: 289.4}
history = model.fit(train_df, validation_data=eval_df, epochs=20, class_weight=class_weight)


Epoch 1/20
111/111 [==============================] - 97s 873ms/step - loss: 1.1395 - accuracy: 0.9988 - precision: 0.7643 - recall: 0.4843 - roc_auc: 0.8522 - val_loss: 0.3859 - val_accuracy: 0.9994 - val_precision: 0.8209 - val_recall: 0.7143 - val_roc_auc: 0.9052
Epoch 2/20
111/111 [==============================] - 98s 883ms/step - loss: 0.2866 - accuracy: 0.9861 - precision: 0.1001 - recall: 0.8289 - roc_auc: 0.9408 - val_loss: 0.2099 - val_accuracy: 0.9886 - val_precision: 0.0918 - val_recall: 0.8571 - val_roc_auc: 0.9430
Epoch 3/20
111/111 [==============================] - 83s 750ms/step - loss: 0.2523 - accuracy: 0.9755 - precision: 0.0610 - recall: 0.8602 - roc_auc: 0.9502 - val_loss: 0.2207 - val_accuracy: 0.9842 - val_precision: 0.0678 - val_recall: 0.8571 - val_roc_auc: 0.9505
Epoch 4/20
111/111 [==============================] - 87s 782ms/step - loss: 0.2524 - accuracy: 0.9708 - precision: 0.0514 - recall: 0.8554 - roc_auc: 0.9503 - val_loss: 0.1968 - val_accuracy: 0.9797 - val_precision: 0.0544 - val_recall: 0.8701 - val_roc_auc: 0.9572
Epoch 5/20
111/111 [==============================] - 106s 953ms/step - loss: 0.2408 - accuracy: 0.9683 - precision: 0.0482 - recall: 0.8699 - roc_auc: 0.9548 - val_loss: 0.1911 - val_accuracy: 0.9776 - val_precision: 0.0494 - val_recall: 0.8701 - val_roc_auc: 0.9603
Epoch 6/20
111/111 [==============================] - 96s 861ms/step - loss: 0.2380 - accuracy: 0.9660 - precision: 0.0451 - recall: 0.8699 - roc_auc: 0.9555 - val_loss: 0.1854 - val_accuracy: 0.9759 - val_precision: 0.0460 - val_recall: 0.8701 - val_roc_auc: 0.9631
Epoch 7/20
111/111 [==============================] - 89s 804ms/step - loss: 0.2087 - accuracy: 0.9653 - precision: 0.0458 - recall: 0.9036 - roc_auc: 0.9649 - val_loss: 0.1783 - val_accuracy: 0.9753 - val_precision: 0.0449 - val_recall: 0.8701 - val_roc_auc: 0.9671
Epoch 8/20
111/111 [==============================] - 77s 691ms/step - loss: 0.2139 - accuracy: 0.9668 - precision: 0.0472 - recall: 0.8916 - roc_auc: 0.9646 - val_loss: 0.1711 - val_accuracy: 0.9757 - val_precision: 0.0456 - val_recall: 0.8701 - val_roc_auc: 0.9693
Epoch 9/20
111/111 [==============================] - 85s 763ms/step - loss: 0.2078 - accuracy: 0.9656 - precision: 0.0452 - recall: 0.8843 - roc_auc: 0.9686 - val_loss: 0.1670 - val_accuracy: 0.9722 - val_precision: 0.0402 - val_recall: 0.8701 - val_roc_auc: 0.9717
Epoch 10/20
111/111 [==============================] - 84s 760ms/step - loss: 0.2027 - accuracy: 0.9655 - precision: 0.0453 - recall: 0.8892 - roc_auc: 0.9686 - val_loss: 0.1621 - val_accuracy: 0.9706 - val_precision: 0.0380 - val_recall: 0.8701 - val_roc_auc: 0.9754
Epoch 11/20
111/111 [==============================] - 101s 913ms/step - loss: 0.1880 - accuracy: 0.9646 - precision: 0.0444 - recall: 0.8940 - roc_auc: 0.9743 - val_loss: 0.1669 - val_accuracy: 0.9732 - val_precision: 0.0415 - val_recall: 0.8701 - val_roc_auc: 0.9767
Epoch 12/20
111/111 [==============================] - 80s 725ms/step - loss: 0.1823 - accuracy: 0.9663 - precision: 0.0469 - recall: 0.9012 - roc_auc: 0.9758 - val_loss: 0.1534 - val_accuracy: 0.9701 - val_precision: 0.0374 - val_recall: 0.8701 - val_roc_auc: 0.9791
Epoch 13/20
111/111 [==============================] - 72s 650ms/step - loss: 0.1842 - accuracy: 0.9634 - precision: 0.0431 - recall: 0.8964 - roc_auc: 0.9771 - val_loss: 0.1489 - val_accuracy: 0.9697 - val_precision: 0.0375 - val_recall: 0.8831 - val_roc_auc: 0.9812
Epoch 14/20
111/111 [==============================] - 81s 729ms/step - loss: 0.1953 - accuracy: 0.9641 - precision: 0.0441 - recall: 0.9012 - roc_auc: 0.9711 - val_loss: 0.1489 - val_accuracy: 0.9680 - val_precision: 0.0355 - val_recall: 0.8831 - val_roc_auc: 0.9830
Epoch 15/20
111/111 [==============================] - 85s 766ms/step - loss: 0.1818 - accuracy: 0.9601 - precision: 0.0403 - recall: 0.9108 - roc_auc: 0.9761 - val_loss: 0.1437 - val_accuracy: 0.9666 - val_precision: 0.0341 - val_recall: 0.8831 - val_roc_auc: 0.9842
Epoch 16/20
111/111 [==============================] - 92s 828ms/step - loss: 0.1708 - accuracy: 0.9637 - precision: 0.0444 - recall: 0.9181 - roc_auc: 0.9793 - val_loss: 0.1399 - val_accuracy: 0.9692 - val_precision: 0.0369 - val_recall: 0.8831 - val_roc_auc: 0.9850
Epoch 17/20
111/111 [==============================] - 76s 682ms/step - loss: 0.1755 - accuracy: 0.9650 - precision: 0.0457 - recall: 0.9108 - roc_auc: 0.9772 - val_loss: 0.1555 - val_accuracy: 0.9690 - val_precision: 0.0366 - val_recall: 0.8831 - val_roc_auc: 0.9857
Epoch 18/20
111/111 [==============================] - 95s 854ms/step - loss: 0.1765 - accuracy: 0.9644 - precision: 0.0449 - recall: 0.9108 - roc_auc: 0.9780 - val_loss: 0.1368 - val_accuracy: 0.9677 - val_precision: 0.0351 - val_recall: 0.8831 - val_roc_auc: 0.9863
Epoch 19/20
111/111 [==============================] - 111s 1s/step - loss: 0.1633 - accuracy: 0.9630 - precision: 0.0436 - recall: 0.9181 - roc_auc: 0.9814 - val_loss: 0.1349 - val_accuracy: 0.9695 - val_precision: 0.0372 - val_recall: 0.8831 - val_roc_auc: 0.9864
Epoch 20/20
111/111 [==============================] - 92s 830ms/step - loss: 0.1713 - accuracy: 0.9658 - precision: 0.0469 - recall: 0.9157 - roc_auc: 0.9781 - val_loss: 0.1425 - val_accuracy: 0.9718 - val_precision: 0.0401 - val_recall: 0.8831 - val_roc_auc: 0.9865

In [23]:
import matplotlib.pyplot as plt
plt.plot(history.history['val_roc_auc']);
plt.xlabel('Epoch');
plt.ylabel('AUC');


Load TensorFlow model into BigQuery

Now that we have trained a TensorFlow model off BigQuery data ... let's load the model into BigQuery and use it for batch prediction!


In [14]:
BUCKET='ai-analytics-solutions-kfpdemo'  # CHANGE TO SOMETHING THAT YOU OWN
model.save('gs://{}/bqexample/export'.format(BUCKET))


INFO:tensorflow:Assets written to: gs://ai-analytics-solutions-kfpdemo/bqexample/export/assets

In [ ]:
%%bigquery
CREATE OR REPLACE MODEL advdata.keras_fraud_detection 
OPTIONS(model_type='tensorflow', model_path='gs://ai-analytics-solutions-kfpdemo/bqexample/export/*')

Now predict with this model (the reason it's called 'd4' is because the output node of my Keras model was called 'd4'). To get probabilities, etc. we'd have to add the corresponding outputs to the Keras model.


In [19]:
%%bigquery
SELECT d4, Class
FROM ML.PREDICT( MODEL advdata.keras_fraud_detection,
  (SELECT * FROM `bigquery-public-data.ml_datasets.ulb_fraud_detection` WHERE Time = 85285.0)
)


Out[19]:
d4 Class
0 0.075196 0
1 1.000000 1
2 1.000000 1
3 1.000000 1
4 1.000000 1

Copyright 2020 Google Inc. Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.