In [0]:
#@title Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
This tutorial shows how to use the BigQuery TensorFlow reader to train a neural network with the Keras Sequential API.
This tutorial uses the United States Census Income Dataset provided by the UC Irvine Machine Learning Repository. This dataset contains information about people from a 1994 Census database, including age, education, marital status, occupation, and whether they make more than $50,000 a year.
Set up your GCP project
The following steps are required, regardless of your notebook environment.
Note: Jupyter runs lines prefixed with ! as shell commands, and it interpolates Python variables prefixed with $ into these commands.
Install the required packages and restart the runtime
In [0]:
try:
# Use the Colab's preinstalled TensorFlow 2.x
%tensorflow_version 2.x
except:
pass
In [0]:
# Install dependencies for the BigQuery reader; tensorflow-io provides the
# tensorflow_io.bigquery module imported further down.
# NOTE(review): fastavro is unpinned while tensorflow-io is pinned to 0.9.0;
# consider pinning fastavro as well for reproducible installs.
!pip install fastavro
!pip install tensorflow-io==0.9.0
In [0]:
# BigQuery Storage API client, used by the `%%bigquery --use_bqstorage_api` preview cell.
# NOTE(review): unpinned; consider pinning a version for reproducibility.
!pip install google-cloud-bigquery-storage
Authenticate
In [0]:
# Authenticate this Colab session with your Google account so the BigQuery
# clients below can act on your behalf.
from google.colab import auth
auth.authenticate_user()
print('Authenticated')
Set your project ID
In [0]:
# Colab form field: replace "<YOUR PROJECT>" with your GCP project ID.
PROJECT_ID = "<YOUR PROJECT>" #@param {type:"string"}
# Make it the default project for gcloud CLI commands...
! gcloud config set project $PROJECT_ID
# ...and export it as GCLOUD_PROJECT for libraries that read that variable.
%env GCLOUD_PROJECT=$PROJECT_ID
Import Python libraries, define constants
In [0]:
from __future__ import absolute_import, division, print_function, unicode_literals

import os
import tempfile

import numpy as np
import pandas as pd
import tensorflow as tf
from google.api_core.exceptions import Conflict, GoogleAPIError
from google.cloud import bigquery
from six.moves import urllib
# BigQuery location for the dataset created below.
LOCATION = 'us'

# Local scratch directory.
# NOTE(review): DATA_DIR appears unused in this notebook.
DATA_DIR = os.path.join(tempfile.gettempdir(), 'census_data')

# Public HTTPS location of the Census CSVs (usable for direct downloads).
DATA_URL = 'https://storage.googleapis.com/cloud-samples-data/ml-engine/census/data'
# The same objects addressed as a Cloud Storage URI. BigQuery load jobs
# (client.load_table_from_uri) take gs://bucket/object URIs, not HTTPS URLs,
# so the load URLs below are built from this one.
GCS_DATA_URL = 'gs://cloud-samples-data/ml-engine/census/data'
TRAINING_FILE = 'adult.data.csv'
EVAL_FILE = 'adult.test.csv'
TRAINING_URL = '%s/%s' % (GCS_DATA_URL, TRAINING_FILE)
EVAL_URL = '%s/%s' % (GCS_DATA_URL, EVAL_FILE)

# Destination BigQuery dataset and tables.
DATASET_ID = 'census_dataset'
TRAINING_TABLE_ID = 'census_training_table'
EVAL_TABLE_ID = 'census_eval_table'

# Column layout of the headerless census CSVs, in file order. Used both as the
# BigQuery load schema and to choose the columns/dtypes read back into TF.
CSV_SCHEMA = [
    bigquery.SchemaField("age", "FLOAT64"),
    bigquery.SchemaField("workclass", "STRING"),
    bigquery.SchemaField("fnlwgt", "FLOAT64"),
    bigquery.SchemaField("education", "STRING"),
    bigquery.SchemaField("education_num", "FLOAT64"),
    bigquery.SchemaField("marital_status", "STRING"),
    bigquery.SchemaField("occupation", "STRING"),
    bigquery.SchemaField("relationship", "STRING"),
    bigquery.SchemaField("race", "STRING"),
    bigquery.SchemaField("gender", "STRING"),
    bigquery.SchemaField("capital_gain", "FLOAT64"),
    bigquery.SchemaField("capital_loss", "FLOAT64"),
    bigquery.SchemaField("hours_per_week", "FLOAT64"),
    bigquery.SchemaField("native_country", "STRING"),
    bigquery.SchemaField("income_bracket", "STRING"),
]

# Columns loaded into BigQuery but not read back for training.
UNUSED_COLUMNS = ["fnlwgt", "education_num"]
Define helper methods to load data into BigQuery
In [0]:
def create_bigquery_dataset_if_necessary(dataset_id):
    """Create PROJECT_ID.<dataset_id> in LOCATION unless it already exists.

    Args:
      dataset_id: ID of the BigQuery dataset to create.

    Returns:
      True if the dataset was created by this call, False if it already existed.

    Raises:
      google.api_core.exceptions.GoogleAPIError: for any API failure other than
        the 409 Conflict that signals an existing dataset.
    """
    client = bigquery.Client(project=PROJECT_ID)
    dataset = bigquery.Dataset(bigquery.dataset.DatasetReference(PROJECT_ID, dataset_id))
    dataset.location = LOCATION
    try:
        client.create_dataset(dataset)  # API request
    except Conflict:
        # HTTP 409: the dataset already exists. Catching Conflict directly avoids
        # reading `.code`, which the base GoogleAPIError does not define.
        return False
    return True
In [0]:
def load_data_into_bigquery(url, table_id):
    """Load the CSV at `url` into DATASET_ID.<table_id>, replacing existing rows.

    Ensures the dataset exists, runs a CSV load job using CSV_SCHEMA, blocks
    until it completes, and prints the job id and final row count.

    Args:
      url: source URI handed to `load_table_from_uri`.
      table_id: destination table name inside DATASET_ID.
    """
    create_bigquery_dataset_if_necessary(DATASET_ID)
    bq_client = bigquery.Client(project=PROJECT_ID)
    destination_ref = bq_client.dataset(DATASET_ID).table(table_id)

    config = bigquery.LoadJobConfig()
    config.source_format = bigquery.SourceFormat.CSV
    config.schema = CSV_SCHEMA
    # WRITE_TRUNCATE makes re-running this cell overwrite the table rather than append.
    config.write_disposition = bigquery.WriteDisposition.WRITE_TRUNCATE

    job = bq_client.load_table_from_uri(url, destination_ref, job_config=config)
    print("Starting job {}".format(job.job_id))
    job.result()  # Blocks until the load job completes.
    print("Job finished.")

    loaded_rows = bq_client.get_table(destination_ref).num_rows
    print("Loaded {} rows.".format(loaded_rows))
Load the Census data into BigQuery.
In [0]:
# Load the training split first, then the evaluation split.
for source_url, destination_table_id in ((TRAINING_URL, TRAINING_TABLE_ID),
                                         (EVAL_URL, EVAL_TABLE_ID)):
    load_data_into_bigquery(source_url, destination_table_id)
Confirm that the data was imported
TODO: replace \<YOUR PROJECT> with your PROJECT_ID
Note: `--use_bqstorage_api` fetches the query results through the BigQuery Storage API, which also verifies that you are authorized to use that API. Make sure it is enabled for your project: https://cloud.google.com/bigquery/docs/reference/storage/#enabling_the_api
In [0]:
%%bigquery --use_bqstorage_api
-- Preview five loaded training rows. Replace <YOUR PROJECT> with your PROJECT_ID first.
SELECT * FROM `<YOUR PROJECT>.census_dataset.census_training_table` LIMIT 5
Out[0]:
Read and transform the census data from BigQuery into a TensorFlow Dataset
In [0]:
from tensorflow.python.framework import ops
from tensorflow.python.framework import dtypes
from tensorflow_io.bigquery import BigQueryClient
from tensorflow_io.bigquery import BigQueryReadSession
def transofrom_row(row_dict):
    """Turn one BigQuery row into a ``(features, label)`` pair for Keras.

    The misspelled name is kept so existing references keep working.

    Args:
      row_dict: dict mapping column name -> tensor for one row, as yielded by
        the read session in ``read_bigquery``. Must contain ``income_bracket``.
        (Presumably scalar tensors per row — TODO confirm against tensorflow-io.)

    Returns:
      ``(features, label)``. ``features`` is ``row_dict`` with every string value
      whitespace-trimmed and ``income_bracket`` removed. ``label`` is a float32
      tensor: 1.0 when the income bracket is ">50K", otherwise 0.0.
    """
    # Trim string columns so they match the TRIM()-ed vocabularies built from BigQuery.
    trimmed_dict = {
        column: tf.strings.strip(tensor) if tensor.dtype == tf.string else tensor
        for column, tensor in row_dict.items()
    }
    # Pop the label column so it is not also fed to the model as a feature.
    income_bracket = trimmed_dict.pop('income_bracket')
    # Already stripped above. Accept an optional trailing '.', as used by the
    # original UCI test split (">50K."); a plain ">50K" matches too.
    is_high_income = tf.strings.regex_full_match(income_bracket, r'>50K\.?')
    income_bracket_float = tf.cast(is_high_income, tf.float32)
    return (trimmed_dict, income_bracket_float)
def read_bigquery(table_name):
    """Build a tf.data pipeline streaming ``table_name`` from BigQuery.

    Reads every CSV_SCHEMA column not listed in UNUSED_COLUMNS from
    PROJECT_ID.DATASET_ID.<table_name> over two parallel read streams, and maps
    each row through ``transofrom_row`` into a ``(features, label)`` pair.

    Args:
      table_name: table ID inside DATASET_ID.

    Returns:
      The mapped tf.data dataset.
    """
    wanted_fields = [f for f in CSV_SCHEMA if f.name not in UNUSED_COLUMNS]
    selected_names = [f.name for f in wanted_fields]
    # FLOAT64 columns are requested as doubles; all others as strings.
    selected_types = [
        dtypes.double if f.field_type == 'FLOAT64' else dtypes.string
        for f in wanted_fields
    ]
    session = BigQueryClient().read_session(
        "projects/" + PROJECT_ID,
        PROJECT_ID,
        table_name,
        DATASET_ID,
        selected_names,
        selected_types,
        requested_streams=2)
    return session.parallel_read_rows().map(transofrom_row)
In [0]:
# Batch size shared by the training and evaluation pipelines.
BATCH_SIZE = 32
# Number of rows held in the shuffle buffer while streaming the training table.
SHUFFLE_BUFFER_SIZE = 10000

training_ds = (read_bigquery(TRAINING_TABLE_ID)
               .shuffle(SHUFFLE_BUFFER_SIZE)
               .batch(BATCH_SIZE))
# The evaluation split is only batched, not shuffled.
eval_ds = read_bigquery(EVAL_TABLE_ID).batch(BATCH_SIZE)
In [0]:
def get_categorical_feature_values(column):
    """Return the distinct trimmed, non-NULL values of ``column`` in the training table.

    The result is used as the vocabulary of a categorical feature column. The
    vocabulary order determines each category's one-hot index, so the values
    are sorted to keep that order stable from run to run. Without the sort,
    SELECT DISTINCT returns rows in an unspecified order.

    Args:
      column: name of a STRING column in the training table. It is interpolated
        into the SQL as an identifier, so pass only trusted, known column names.

    Returns:
      A 1-D numpy array of the distinct values, sorted ascending.
    """
    query = (
        'SELECT DISTINCT TRIM({col}) AS category_value '
        'FROM `{project}.{dataset}.{table}` '
        'WHERE {col} IS NOT NULL '
        'ORDER BY category_value'
    ).format(col=column, project=PROJECT_ID, dataset=DATASET_ID,
             table=TRAINING_TABLE_ID)
    client = bigquery.Client(project=PROJECT_ID)
    result = client.query(query).to_dataframe()
    return result['category_value'].values
In [0]:
from tensorflow import feature_column
# Numeric features, used as-is.
feature_columns = [
    feature_column.numeric_column(header)
    for header in ['capital_gain', 'capital_loss', 'hours_per_week']
]

# Categorical features: one-hot over the vocabulary found in the BigQuery
# training table (one query per column).
for header in ['workclass', 'marital_status', 'occupation', 'relationship',
               'race', 'native_country', 'education']:
    vocabulary = get_categorical_feature_values(header)
    feature_columns.append(
        feature_column.indicator_column(
            feature_column.categorical_column_with_vocabulary_list(header, vocabulary)))

# Age is bucketized instead of being fed as a raw number.
age = feature_column.numeric_column('age')
age_buckets = feature_column.bucketized_column(
    age, boundaries=[18, 25, 30, 35, 40, 45, 50, 55, 60, 65])
feature_columns.append(age_buckets)

feature_layer = tf.keras.layers.DenseFeatures(feature_columns)
Build model
In [0]:
Dense = tf.keras.layers.Dense

# Feature layer -> funnel of ReLU layers -> single sigmoid unit.
# Layers are created in the same order as they appear in the network.
model_layers = [feature_layer,
                Dense(100, activation=tf.nn.relu, kernel_initializer='uniform')]
model_layers += [Dense(units, activation=tf.nn.relu) for units in (75, 50, 25)]
model_layers.append(Dense(1, activation=tf.nn.sigmoid))
model = tf.keras.Sequential(model_layers)

# Compile Keras model. 'rmsprop' is compile()'s default optimizer; it is spelled out here.
model.compile(
    optimizer='rmsprop',
    loss='binary_crossentropy',
    metrics=['accuracy'])
Train model
In [0]:
# Number of full passes over the streamed training table.
EPOCHS = 5
model.fit(training_ds, epochs=EPOCHS)
Out[0]:
Evaluate model
In [0]:
# evaluate() returns the loss followed by the compiled metrics (here: accuracy).
eval_results = model.evaluate(eval_ds)
loss, accuracy = eval_results
print("Accuracy", accuracy)
Make predictions for a couple of hand-crafted samples
In [0]:
# Two hand-crafted rows to score. Numeric features are float64 to match the
# FLOAT64 columns (read as dtypes.double) that the model was trained on.
# 'gender' is not used by any feature column in this notebook.
sample_x = {
    'age': np.array([56, 36], dtype=np.float64),
    'workclass': np.array(['Local-gov', 'Private']),
    'education': np.array(['Bachelors', 'Bachelors']),
    'marital_status': np.array(['Married-civ-spouse', 'Married-civ-spouse']),
    'occupation': np.array(['Tech-support', 'Other-service']),
    'relationship': np.array(['Husband', 'Husband']),
    'race': np.array(['White', 'Black']),
    'gender': np.array(['Male', 'Male']),
    'capital_gain': np.array([0, 7298], dtype=np.float64),
    'capital_loss': np.array([0, 0], dtype=np.float64),
    'hours_per_week': np.array([40, 36], dtype=np.float64),
    'native_country': np.array(['United-States', 'United-States'])
}
# Sigmoid output per row: the model's estimate that income is >50K.
model.predict(sample_x)
Out[0]: