Predict whether a person's income exceeds $50K/yr based on census data. This is also known as the "Census Income" dataset.
In [1]:
import shutil
import math
from datetime import datetime
import multiprocessing
import pandas as pd
import numpy as np
import tensorflow as tf
from tensorflow import data
from tensorflow.python.feature_column import feature_column
print(tf.__version__)
In [2]:
MODEL_NAME = 'census-model-02'
TRAIN_DATA_FILES_PATTERN = 'data/adult.data.csv'
TEST_DATA_FILES_PATTERN = 'data/adult.test.csv'
RESUME_TRAINING = False
PROCESS_FEATURES = True
EXTEND_FEATURE_COLUMNS = True
MULTI_THREADING = True
In [3]:
HEADER = ['age', 'workclass', 'fnlwgt', 'education', 'education_num',
'marital_status', 'occupation', 'relationship', 'race', 'gender',
'capital_gain', 'capital_loss', 'hours_per_week',
'native_country', 'income_bracket']
HEADER_DEFAULTS = [[0], [''], [0], [''], [0], [''], [''], [''], [''], [''],
[0], [0], [0], [''], ['']]
NUMERIC_FEATURE_NAMES = ['age', 'education_num', 'capital_gain', 'capital_loss', 'hours_per_week']
CATEGORICAL_FEATURE_NAMES_WITH_VOCABULARY = {
'gender': ['Female', 'Male'],
'race': ['Amer-Indian-Eskimo', 'Asian-Pac-Islander', 'Black', 'Other', 'White'],
'education': ['Bachelors', 'HS-grad', '11th', 'Masters', '9th', 'Some-college',
'Assoc-acdm', 'Assoc-voc', '7th-8th', 'Doctorate', 'Prof-school',
'5th-6th', '10th', '1st-4th', 'Preschool', '12th'],
'marital_status': ['Married-civ-spouse', 'Divorced', 'Married-spouse-absent',
'Never-married', 'Separated', 'Married-AF-spouse', 'Widowed'],
'relationship': ['Husband', 'Not-in-family', 'Wife', 'Own-child', 'Unmarried', 'Other-relative'],
'workclass': ['Self-emp-not-inc', 'Private', 'State-gov', 'Federal-gov', 'Local-gov', '?',
'Self-emp-inc', 'Without-pay', 'Never-worked']
}
CATEGORICAL_FEATURE_NAMES_WITH_BUCKET_SIZE = {
'occupation': 50,
'native_country' : 100
}
CATEGORICAL_FEATURE_NAMES = list(CATEGORICAL_FEATURE_NAMES_WITH_VOCABULARY.keys()) + list(CATEGORICAL_FEATURE_NAMES_WITH_BUCKET_SIZE.keys())
FEATURE_NAMES = NUMERIC_FEATURE_NAMES + CATEGORICAL_FEATURE_NAMES
TARGET_NAME = 'income_bracket'
TARGET_LABELS = ['<=50K', '>50K']
WEIGHT_COLUMN_NAME = 'fnlwgt'
UNUSED_FEATURE_NAMES = list(set(HEADER) - set(FEATURE_NAMES) - {TARGET_NAME} - {WEIGHT_COLUMN_NAME})
print("Header: {}".format(HEADER))
print("Numeric Features: {}".format(NUMERIC_FEATURE_NAMES))
print("Categorical Features: {}".format(CATEGORICAL_FEATURE_NAMES))
print("Target: {} - labels: {}".format(TARGET_NAME, TARGET_LABELS))
print("Unused Features: {}".format(UNUSED_FEATURE_NAMES))
In [4]:
TRAIN_DATA_SIZE = 32561
TEST_DATA_SIZE = 16278
train_data = pd.read_csv(TRAIN_DATA_FILES_PATTERN, header=None, names=HEADER )
train_data.head(10)
Out[4]:
In [5]:
train_data.describe()
Out[5]:
In [6]:
means = train_data[NUMERIC_FEATURE_NAMES].mean(axis=0)
stdvs = train_data[NUMERIC_FEATURE_NAMES].std(axis=0)
maxs = train_data[NUMERIC_FEATURE_NAMES].max(axis=0)
mins = train_data[NUMERIC_FEATURE_NAMES].min(axis=0)
df_stats = pd.DataFrame({"mean":means, "stdv":stdvs, "max":maxs, "min":mins})
df_stats.head(15)
Out[6]:
In [7]:
df_stats.to_csv(path_or_buf="data/adult.stats.csv", header=True, index=True)
In [8]:
def parse_csv_row(csv_row):
columns = tf.decode_csv(csv_row, record_defaults=HEADER_DEFAULTS)
features = dict(zip(HEADER, columns))
for column in UNUSED_FEATURE_NAMES:
features.pop(column)
target = features.pop(TARGET_NAME)
return features, target
def process_features(features):
capital_indicator = features['capital_gain'] > features['capital_loss']
features['capital_indicator'] = tf.cast(capital_indicator, dtype=tf.int32)
return features
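As a quick check that the parsing logic behaves as expected, one literal CSV row can be pushed through parse_csv_row and process_features and evaluated in a session. This is only a sketch; the sample row values are illustrative and must follow the column order of HEADER:

# a single hypothetical row in HEADER order (values are illustrative)
sample_row = tf.constant('39,State-gov,77516,Bachelors,13,Never-married,Adm-clerical,Not-in-family,White,Male,2174,0,40,United-States,<=50K')
sample_features, sample_target = parse_csv_row(sample_row)
if PROCESS_FEATURES:
    sample_features = process_features(sample_features)
with tf.Session() as sess:
    print(sess.run([sample_features, sample_target]))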
In [9]:
def parse_label_column(label_string_tensor):
table = tf.contrib.lookup.index_table_from_tensor(tf.constant(TARGET_LABELS))
return table.lookup(label_string_tensor)
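index_table_from_tensor maps each label string to its index in TARGET_LABELS, so '<=50K' maps to 0 and '>50K' maps to 1. The lookup table needs tf.tables_initializer() before the first lookup when run outside an Estimator; a minimal sketch:

label_ids = parse_label_column(tf.constant(['<=50K', '>50K', '<=50K']))
with tf.Session() as sess:
    sess.run(tf.tables_initializer())
    print(sess.run(label_ids))  # expected: [0 1 0]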
In [10]:
def csv_input_fn(files_name_pattern, mode=tf.estimator.ModeKeys.EVAL,
skip_header_lines=0,
num_epochs=None,
batch_size=200):
    shuffle = (mode == tf.estimator.ModeKeys.TRAIN)
num_threads = multiprocessing.cpu_count() if MULTI_THREADING else 1
print("")
print("* data input_fn:")
print("================")
print("Input file(s): {}".format(files_name_pattern))
print("Batch size: {}".format(batch_size))
print("Epoch Count: {}".format(num_epochs))
print("Mode: {}".format(mode))
print("Thread Count: {}".format(num_threads))
print("Shuffle: {}".format(shuffle))
print("================")
print("")
file_names = tf.matching_files(files_name_pattern)
dataset = data.TextLineDataset(filenames=file_names)
dataset = dataset.skip(skip_header_lines)
if shuffle:
dataset = dataset.shuffle(buffer_size=2 * batch_size + 1)
dataset = dataset.batch(batch_size)
dataset = dataset.map(lambda csv_row: parse_csv_row(csv_row),
num_parallel_calls=num_threads)
if PROCESS_FEATURES:
dataset = dataset.map(lambda features, target: (process_features(features), target),
num_parallel_calls=num_threads)
dataset = dataset.repeat(num_epochs)
iterator = dataset.make_one_shot_iterator()
features, target = iterator.get_next()
return features, parse_label_column(target)
In [11]:
features, target = csv_input_fn(files_name_pattern="")
print("Features in CSV: {}".format(list(features.keys())))
print("Target in CSV: {}".format(target))
In [12]:
df_stats = pd.read_csv("data/adult.stats.csv", header=0, index_col=0)
df_stats['feature_name'] = NUMERIC_FEATURE_NAMES
df_stats.head(10)
Out[12]:
In [13]:
def extend_feature_columns(feature_columns, hparams):
age_buckets = tf.feature_column.bucketized_column(
feature_columns['age'], boundaries=[18, 25, 30, 35, 40, 45, 50, 55, 60, 65])
education_X_occupation = tf.feature_column.crossed_column(
['education', 'occupation'], hash_bucket_size=int(1e4))
age_buckets_X_race = tf.feature_column.crossed_column(
[age_buckets, feature_columns['race']], hash_bucket_size=int(1e4))
native_country_X_occupation = tf.feature_column.crossed_column(
['native_country', 'occupation'], hash_bucket_size=int(1e4))
native_country_embedded = tf.feature_column.embedding_column(
feature_columns['native_country'], dimension=hparams.embedding_size)
occupation_embedded = tf.feature_column.embedding_column(
feature_columns['occupation'], dimension=hparams.embedding_size)
education_X_occupation_embedded = tf.feature_column.embedding_column(
education_X_occupation, dimension=hparams.embedding_size)
native_country_X_occupation_embedded = tf.feature_column.embedding_column(
native_country_X_occupation, dimension=hparams.embedding_size)
feature_columns['age_buckets'] = age_buckets
feature_columns['education_X_occupation'] = education_X_occupation
feature_columns['age_buckets_X_race'] = age_buckets_X_race
feature_columns['native_country_X_occupation'] = native_country_X_occupation
feature_columns['native_country_embedded'] = native_country_embedded
feature_columns['occupation_embedded'] = occupation_embedded
feature_columns['education_X_occupation_embedded'] = education_X_occupation_embedded
feature_columns['native_country_X_occupation_embedded'] = native_country_X_occupation_embedded
return feature_columns
def standard_scaler(x, mean, stdv):
return (x-mean)/(stdv)
def maxmin_scaler(x, max_value, min_value):
return (x-min_value)/(max_value-min_value)
def get_feature_columns(hparams):
numeric_columns = {}
for feature_name in NUMERIC_FEATURE_NAMES:
feature_mean = df_stats[df_stats.feature_name == feature_name]['mean'].values[0]
feature_stdv = df_stats[df_stats.feature_name == feature_name]['stdv'].values[0]
        # bind each feature's stats via default arguments so every column keeps its own mean/stdv
        normalizer_fn = lambda x, mean=feature_mean, stdv=feature_stdv: standard_scaler(x, mean, stdv)
        numeric_columns[feature_name] = tf.feature_column.numeric_column(feature_name,
                                                                         normalizer_fn=normalizer_fn)
CONSTRUCTED_NUMERIC_FEATURES_NAMES = []
if PROCESS_FEATURES:
for feature_name in CONSTRUCTED_NUMERIC_FEATURES_NAMES:
numeric_columns[feature_name] = tf.feature_column.numeric_column(feature_name)
categorical_column_with_vocabulary = \
{item[0]: tf.feature_column.categorical_column_with_vocabulary_list(item[0], item[1])
for item in CATEGORICAL_FEATURE_NAMES_WITH_VOCABULARY.items()}
CONSTRUCTED_INDICATOR_FEATURES_NAMES = ['capital_indicator']
categorical_column_with_identity = {}
for feature_name in CONSTRUCTED_INDICATOR_FEATURES_NAMES:
categorical_column_with_identity[feature_name] = tf.feature_column.categorical_column_with_identity(feature_name,
num_buckets=2,
default_value=0)
categorical_column_with_hash_bucket = \
{item[0]: tf.feature_column.categorical_column_with_hash_bucket(item[0], item[1], dtype=tf.string)
for item in CATEGORICAL_FEATURE_NAMES_WITH_BUCKET_SIZE.items()}
feature_columns = {}
if numeric_columns is not None:
feature_columns.update(numeric_columns)
if categorical_column_with_vocabulary is not None:
feature_columns.update(categorical_column_with_vocabulary)
if categorical_column_with_identity is not None:
feature_columns.update(categorical_column_with_identity)
if categorical_column_with_hash_bucket is not None:
feature_columns.update(categorical_column_with_hash_bucket)
if EXTEND_FEATURE_COLUMNS:
feature_columns = extend_feature_columns(feature_columns, hparams)
return feature_columns
feature_columns = get_feature_columns(tf.contrib.training.HParams(num_buckets=5,embedding_size=3))
print("Feature Columns: {}".format(feature_columns))
In [14]:
def get_input_layer_feature_columns(hparams):
feature_columns = list(get_feature_columns(hparams).values())
dense_columns = list(
filter(lambda column: isinstance(column, feature_column._NumericColumn) |
isinstance(column, feature_column._EmbeddingColumn),
feature_columns
)
)
categorical_columns = list(
filter(lambda column: isinstance(column, feature_column._VocabularyListCategoricalColumn) |
isinstance(column, feature_column._BucketizedColumn),
feature_columns)
)
indicator_columns = list(
map(lambda column: tf.feature_column.indicator_column(column),
categorical_columns)
)
return dense_columns + indicator_columns
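To confirm which features reach the input layer as dense (numeric or embedding) columns and which are wrapped as indicator columns, the returned list can be inspected directly. A small sketch; the HParams values below are arbitrary and only needed to construct the columns:

inspect_hparams = tf.contrib.training.HParams(num_buckets=5, embedding_size=3)
for column in get_input_layer_feature_columns(inspect_hparams):
    # prints the column's implementation class and the feature name it is built from
    print(type(column).__name__, '-', column.name)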
In [15]:
def model_fn(features, labels, mode, params):
hidden_units = params.hidden_units
output_layer_size = len(TARGET_LABELS)
    feature_columns = get_input_layer_feature_columns(params)
# Create the input layers from the feature columns
input_layer = tf.feature_column.input_layer(features=features,
feature_columns=feature_columns)
# Create a fully-connected layer-stack based on the hidden_units in the params
hidden_layers = tf.contrib.layers.stack(inputs= input_layer,
layer= tf.contrib.layers.fully_connected,
stack_args= hidden_units)
# Connect the output layer (logits) to the hidden layer (no activation fn)
logits = tf.layers.dense(inputs=hidden_layers,
units=output_layer_size)
# Reshape output layer to 1-dim Tensor to return predictions
output = tf.squeeze(logits)
# Provide an estimator spec for `ModeKeys.PREDICT`.
if mode == tf.estimator.ModeKeys.PREDICT:
probabilities = tf.nn.softmax(logits)
predicted_indices = tf.argmax(probabilities, 1)
# Convert predicted_indices back into strings
predictions = {
'class': tf.gather(TARGET_LABELS, predicted_indices),
'probabilities': probabilities
}
export_outputs = {
'prediction': tf.estimator.export.PredictOutput(predictions)
}
# Provide an estimator spec for `ModeKeys.PREDICT` modes.
return tf.estimator.EstimatorSpec(mode,
predictions=predictions,
export_outputs=export_outputs)
weights = features[WEIGHT_COLUMN_NAME]
# Calculate loss using softmax cross entropy
loss = tf.losses.sparse_softmax_cross_entropy(
logits=logits,
labels=labels,
weights=weights
)
tf.summary.scalar('loss', loss)
if mode == tf.estimator.ModeKeys.TRAIN:
# Learning rate scheduler using exponential decay
initial_learning_rate = params.learning_rate
decay_steps = params.num_epochs
        decay_rate = 0.1  # 1.0 means no decay; smaller values decay the learning rate more aggressively
global_step = tf.train.get_global_step()
# decayed_learning_rate = learning_rate * decay_rate ^ (global_step / decay_steps)
learning_rate = tf.train.exponential_decay(initial_learning_rate, global_step,
decay_steps, decay_rate)
# Create Optimiser
optimizer = tf.train.AdamOptimizer(learning_rate=learning_rate)
# Create training operation
train_op = optimizer.minimize(
loss=loss, global_step=global_step)
# Provide an estimator spec for `ModeKeys.TRAIN` modes.
return tf.estimator.EstimatorSpec(mode=mode,
loss=loss,
train_op=train_op)
if mode == tf.estimator.ModeKeys.EVAL:
probabilities = tf.nn.softmax(logits)
predicted_indices = tf.argmax(probabilities, 1)
# Return accuracy and area under ROC curve metrics
labels_one_hot = tf.one_hot(
labels,
depth=len(TARGET_LABELS),
on_value=True,
off_value=False,
dtype=tf.bool
)
eval_metric_ops = {
'accuracy': tf.metrics.accuracy(labels, predicted_indices),
'auroc': tf.metrics.auc(labels_one_hot, probabilities)
}
# Provide an estimator spec for `ModeKeys.EVAL` modes.
return tf.estimator.EstimatorSpec(mode,
loss=loss,
eval_metric_ops=eval_metric_ops)
def create_estimator(run_config, hparams):
    estimator = tf.estimator.Estimator(model_fn=model_fn,
                                       params=hparams,
                                       config=run_config)
print("")
print("Estimator Type: {}".format(type(estimator)))
print("")
return estimator
In [16]:
def create_custom_estimator(run_config, hparams):
estimator = tf.estimator.Estimator(model_fn=model_fn,
params=hparams,
config= run_config
)
return estimator
In [17]:
TRAIN_SIZE = TRAIN_DATA_SIZE
NUM_EPOCHS = 100
BATCH_SIZE = 500
EVAL_AFTER_SEC = 60
TOTAL_STEPS = (TRAIN_SIZE/BATCH_SIZE)*NUM_EPOCHS
hparams = tf.contrib.training.HParams(
num_epochs = NUM_EPOCHS,
batch_size = BATCH_SIZE,
embedding_size = 4,
hidden_units= [64, 32, 16],
max_steps = TOTAL_STEPS,
learning_rate = 0.5
)
model_dir = 'trained_models/{}'.format(MODEL_NAME)
run_config = tf.estimator.RunConfig(
log_step_count_steps=5000,
tf_random_seed=19830610,
model_dir=model_dir
)
print(hparams)
print("Model Directory:", run_config.model_dir)
print("")
print("Dataset Size:", TRAIN_SIZE)
print("Batch Size:", BATCH_SIZE)
print("Steps per Epoch:",TRAIN_SIZE/BATCH_SIZE)
print("Total Steps:", TOTAL_STEPS)
print("That is 1 evaluation step after each",EVAL_AFTER_SEC," training seconds")
In [18]:
def json_serving_input_fn():
receiver_tensor = {}
for feature_name in FEATURE_NAMES:
dtype = tf.float32 if feature_name in NUMERIC_FEATURE_NAMES else tf.string
receiver_tensor[feature_name] = tf.placeholder(shape=[None], dtype=dtype)
if PROCESS_FEATURES:
features = process_features(receiver_tensor)
return tf.estimator.export.ServingInputReceiver(
features, receiver_tensor)
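Note that process_features is applied inside the receiver, so serving requests only need to carry the raw input features; the derived capital_indicator column is computed on the serving side. To double-check which fields a request must supply, the receiver tensors can be inspected in a scratch graph (a sketch, not in the original notebook):

with tf.Graph().as_default():
    receiver = json_serving_input_fn()
    print(sorted(receiver.receiver_tensors.keys()))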
In [19]:
train_spec = tf.estimator.TrainSpec(
input_fn = lambda: csv_input_fn(
TRAIN_DATA_FILES_PATTERN,
mode = tf.estimator.ModeKeys.TRAIN,
num_epochs=hparams.num_epochs,
batch_size=hparams.batch_size
),
max_steps=hparams.max_steps,
hooks=None
)
eval_spec = tf.estimator.EvalSpec(
input_fn = lambda: csv_input_fn(
TRAIN_DATA_FILES_PATTERN,
mode=tf.estimator.ModeKeys.EVAL,
num_epochs=1,
batch_size=hparams.batch_size,
),
exporters=[tf.estimator.LatestExporter(
name="predict", # the name of the folder in which the model will be exported to under export
serving_input_receiver_fn=json_serving_input_fn,
exports_to_keep=1,
as_text=False)],
throttle_secs = EVAL_AFTER_SEC,
steps=None
)
In [20]:
if not RESUME_TRAINING:
print("Removing previous artifacts...")
shutil.rmtree(model_dir, ignore_errors=True)
else:
print("Resuming training...")
tf.logging.set_verbosity(tf.logging.INFO)
time_start = datetime.utcnow()
print("Experiment started at {}".format(time_start.strftime("%H:%M:%S")))
print(".......................................")
estimator = create_custom_estimator(run_config, hparams)
tf.estimator.train_and_evaluate(
estimator=estimator,
train_spec=train_spec,
eval_spec=eval_spec
)
time_end = datetime.utcnow()
print(".......................................")
print("Experiment finished at {}".format(time_end.strftime("%H:%M:%S")))
print("")
time_elapsed = time_end - time_start
print("Experiment elapsed time: {} seconds".format(time_elapsed.total_seconds()))
In [21]:
TRAIN_SIZE = TRAIN_DATA_SIZE
TEST_SIZE = TEST_DATA_SIZE
train_input_fn = lambda: csv_input_fn(files_name_pattern= TRAIN_DATA_FILES_PATTERN,
mode= tf.estimator.ModeKeys.EVAL,
batch_size= TRAIN_SIZE)
test_input_fn = lambda: csv_input_fn(files_name_pattern= TEST_DATA_FILES_PATTERN,
mode= tf.estimator.ModeKeys.EVAL,
batch_size= TEST_SIZE)
estimator = create_custom_estimator(run_config, hparams)
train_results = estimator.evaluate(input_fn=train_input_fn, steps=1)
print()
print("######################################################################################")
print("# Train Measures: {}".format(train_results))
print("######################################################################################")
test_results = estimator.evaluate(input_fn=test_input_fn, steps=1)
print()
print("######################################################################################")
print("# Test Measures: {}".format(test_results))
print("######################################################################################")
In [22]:
import itertools
predict_input_fn = lambda: csv_input_fn(TEST_DATA_FILES_PATTERN,
mode= tf.estimator.ModeKeys.PREDICT,
batch_size= 10)
predictions = list(itertools.islice(estimator.predict(input_fn=predict_input_fn),10))
print("")
print("* Predicted Classes: {}".format(list(map(lambda item: item["class"]
,predictions))))
print("* Predicted Probabilities: {}".format(list(map(lambda item: list(item["probabilities"])
,predictions))))
In [23]:
import os
export_dir = model_dir + "/export/predict/"
# pick the most recent export (sub-directories are named by export timestamp)
saved_model_dir = export_dir + sorted(os.listdir(path=export_dir))[-1]
print(saved_model_dir)
print("")
predictor_fn = tf.contrib.predictor.from_saved_model(
export_dir = saved_model_dir,
signature_def_key="prediction"
)
output = predictor_fn(
{
'age': [34.0],
'workclass': ['Private'],
'education': ['Doctorate'],
'education_num': [10.0],
'marital_status': ['Married-civ-spouse'],
'occupation': ['Prof-specialty'],
'relationship': ['Husband'],
'race': ['White'],
'gender': ['Male'],
'capital_gain': [0.0],
'capital_loss': [0.0],
'hours_per_week': [40.0],
'native_country':['Egyptian']
}
)
print(output)