In [ ]:
# %%bash
# pip install tensorflow==1.7
# pip install tensorflow-transform
The bigquery-public-data:hacker_news dataset contains all Hacker News stories and comments since its launch in 2006. Each story includes a story id, a url, the title of the story, the author who made the post, when it was written, and the number of points the story received.
The objective is to build an ML model that, given the title of a story, predicts the source of the story.
This notebook illustrates how to build a TF Custom Estimator for text classification. The model uses the 'bow' feature in the transformed dataset as the input layer. The 'bow' feature is a sparse vector of integers representing the indices of the words in the text (title). The model also uses the 'vocabulary' file produced during the tf.transform pipeline as a lookup from word to index.
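For intuition, the sketch below (with made-up indices, not the real vocabulary) shows how a title becomes a fixed-length 'bow' vector: each word is looked up in the vocabulary file, and the resulting index list is padded with an out-of-vocabulary value up to MAX_WORDS_PER_TITLE entries.
In [ ]:
# Illustration only: hypothetical vocabulary indices for a three-word title.
bow = [42, 7, 901]             # e.g. indices of the title's words in the vocab file
MAX_WORDS, PAD = 10, 20001     # pad value = VOCAB_SIZE + 1
bow_padded = (bow + [PAD] * MAX_WORDS)[:MAX_WORDS]
print(bow_padded)              # [42, 7, 901, 20001, 20001, 20001, 20001, 20001, 20001, 20001]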
In [1]:
import os
class Params:
pass
# Set to run on GCP
Params.GCP_PROJECT_ID = 'ksalama-gcp-playground'
Params.REGION = 'europe-west1'
Params.BUCKET = 'ksalama-gcs-cloudml'
Params.PLATFORM = 'local' # local | GCP
Params.DATA_DIR = 'data/news' if Params.PLATFORM == 'local' else 'gs://{}/data/news'.format(Params.BUCKET)
Params.TRANSFORMED_DATA_DIR = os.path.join(Params.DATA_DIR, 'transformed')
Params.TRANSFORMED_TRAIN_DATA_FILE_PREFIX = os.path.join(Params.TRANSFORMED_DATA_DIR, 'train')
Params.TRANSFORMED_EVAL_DATA_FILE_PREFIX = os.path.join(Params.TRANSFORMED_DATA_DIR, 'eval')
Params.TEMP_DIR = os.path.join(Params.DATA_DIR, 'tmp')
Params.MODELS_DIR = 'models/news' if Params.PLATFORM == 'local' else 'gs://{}/models/news'.format(Params.BUCKET)
Params.TRANSFORM_ARTEFACTS_DIR = os.path.join(Params.MODELS_DIR,'transform')
Params.TRAIN = True
Params.RESUME_TRAINING = False
Params.EAGER = False
if Params.EAGER:
    import tensorflow as tf  # tf is imported in the next cell; import here too before enabling eager mode
    tf.enable_eager_execution()
In [2]:
import tensorflow as tf
from tensorflow import data
from tensorflow.contrib.learn.python.learn.utils import input_fn_utils
from tensorflow_transform.beam.tft_beam_io import transform_fn_io
from tensorflow_transform.tf_metadata import metadata_io
from tensorflow_transform.tf_metadata import dataset_schema
from tensorflow_transform.tf_metadata import dataset_metadata
from tensorflow_transform.saved import saved_transform_io
print(tf.__version__)
In [3]:
RAW_HEADER = 'key,title,source'.split(',')
RAW_DEFAULTS = [['NA'],['NA'],['NA']]
TARGET_FEATURE_NAME = 'source'
TARGET_LABELS = ['github', 'nytimes', 'techcrunch']
TEXT_FEATURE_NAME = 'title'
KEY_COLUMN = 'key'
VOCAB_SIZE = 20000
TRAIN_SIZE = 73124
EVAL_SIZE = 23079
PAD_VALUE = VOCAB_SIZE + 1
VOCAB_LIST_FILE = os.path.join(Params.TRANSFORM_ARTEFACTS_DIR, 'transform_fn/assets/vocab_string_to_int_uniques')
MAX_WORDS_PER_TITLE = 10
raw_metadata = dataset_metadata.DatasetMetadata(dataset_schema.Schema({
KEY_COLUMN: dataset_schema.ColumnSchema(
tf.string, [], dataset_schema.FixedColumnRepresentation()),
TEXT_FEATURE_NAME: dataset_schema.ColumnSchema(
tf.string, [], dataset_schema.FixedColumnRepresentation()),
TARGET_FEATURE_NAME: dataset_schema.ColumnSchema(
tf.string, [], dataset_schema.FixedColumnRepresentation()),
}))
transformed_metadata = metadata_io.read_metadata(
os.path.join(Params.TRANSFORM_ARTEFACTS_DIR,"transformed_metadata"))
raw_feature_spec = raw_metadata.schema.as_feature_spec()
transformed_feature_spec = transformed_metadata.schema.as_feature_spec()
print(transformed_feature_spec)
In [4]:
def parse_tf_example(tf_example):
parsed_features = tf.parse_single_example(serialized=tf_example, features=transformed_feature_spec)
target = parsed_features.pop(TARGET_FEATURE_NAME)
return parsed_features, target
def generate_tfrecords_input_fn(files_pattern,
mode=tf.estimator.ModeKeys.EVAL,
num_epochs=1,
batch_size=200):
def _input_fn():
file_names = data.Dataset.list_files(files_pattern)
        if Params.EAGER:
            print(file_names)

        dataset = data.TFRecordDataset(file_names)

        # Shuffle and repeat only during training; evaluation should visit
        # each record once, in order
        if mode == tf.estimator.ModeKeys.TRAIN:
            dataset = dataset.apply(
                tf.contrib.data.shuffle_and_repeat(buffer_size=batch_size * 2,
                                                   count=num_epochs)
            )
        else:
            dataset = dataset.repeat(num_epochs)

        dataset = dataset.apply(
            tf.contrib.data.map_and_batch(parse_tf_example,
                                          batch_size=batch_size,
                                          num_parallel_batches=2)
        )

        dataset = dataset.prefetch(batch_size)
if Params.EAGER:
return dataset
iterator = dataset.make_one_shot_iterator()
features, target = iterator.get_next()
return features, target
return _input_fn
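A quick way to sanity-check the input function (a minimal sketch, assuming the transformed TFRecord files produced by the tf.transform pipeline already exist under Params.TRANSFORMED_DATA_DIR):
In [ ]:
# Minimal smoke test: build the input function, pull one small batch,
# and run just the target tensor in a throwaway session.
test_input_fn = generate_tfrecords_input_fn(
    Params.TRANSFORMED_TRAIN_DATA_FILE_PREFIX + '*', batch_size=4)
features, target = test_input_fn()
with tf.Session() as sess:
    print(sess.run(target))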
In [5]:
def _bow_to_vector(sparse_bow_indices):

    # Convert the sparse tensor to a dense tensor, padding each entry to match the longest title in the batch
    bow_indices = tf.sparse_tensor_to_dense(sparse_bow_indices, default_value=PAD_VALUE)

    # Create a word_ids padding
    padding = tf.constant([[0, 0], [0, MAX_WORDS_PER_TITLE]])

    # Pad all the word_ids entries to the maximum document length
    bow_indices_padded = tf.pad(bow_indices, padding)
    bow_vector = tf.slice(bow_indices_padded, [0, 0], [-1, MAX_WORDS_PER_TITLE])

    # Return the final fixed-length word_id vector
    return bow_vector
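# Worked example (hypothetical numbers): with MAX_WORDS_PER_TITLE = 10, a batch
# row whose title has 4 words first becomes a dense row like [7, 842, 13, 91]
# (PAD_VALUE fills shorter rows), tf.pad then appends 10 more PAD_VALUE columns,
# and tf.slice keeps exactly the first 10 columns, giving a fixed
# [batch_size, 10] tensor, e.g.:
#   [7, 842, 13, 91, 20001, 20001, 20001, 20001, 20001, 20001]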
def _shallow_layers(features, params):
# word_id_vector
bow_vector = _bow_to_vector(features['bow'])
# layer to take each word_id and convert it into vector (embeddings)
word_embeddings = tf.contrib.layers.embed_sequence(bow_vector,
vocab_size=VOCAB_SIZE+2,
embed_dim=params.embedding_size)
### CNN Model ############################################################################
if params.model_type == 'CNN':
        words_conv = tf.layers.conv1d(word_embeddings,
                                      filters=params.filters,
                                      kernel_size=params.window_size,
                                      strides=int(params.window_size / 2),
                                      padding='SAME', activation=tf.nn.relu)

        words_conv_shape = words_conv.get_shape()
        dim = words_conv_shape[1].value * words_conv_shape[2].value
        shallow_layer = tf.reshape(words_conv, [-1, dim])
### LSTM Model ############################################################################
elif params.model_type == 'LSTM':
        rnn_layers = [tf.nn.rnn_cell.LSTMCell(
            num_units=size,
            forget_bias=params.forget_bias,
            activation=tf.nn.tanh) for size in params.hidden_units]

        # Create an RNN cell composed sequentially of a number of RNNCells
        multi_rnn_cell = tf.nn.rnn_cell.MultiRNNCell(rnn_layers)

        # Unstack the embeddings into a per-word sequence, run the stacked RNN,
        # and use the output at the final time step as the shallow layer
        word_vectors = tf.unstack(word_embeddings, axis=1)
        outputs, _ = tf.nn.static_rnn(multi_rnn_cell, word_vectors, dtype=tf.float32)
        shallow_layer = outputs[-1]
### MAX MIN Embedding Model ############################################################################
else:
        # Represent the doc embedding as the concatenation of MIN and MAX of the word embeddings
doc_embeddings_min = tf.reduce_min(word_embeddings, axis=1)
doc_embeddings_max = tf.reduce_max(word_embeddings, axis=1)
shallow_layer = tf.concat([doc_embeddings_min, doc_embeddings_max], axis=1)
return shallow_layer
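# Resulting shallow_layer shapes with the hparams defined below (illustrative):
#   CNN:    [batch, 10 * 2]  = [batch, 20]   (stride 1, 'SAME' padding, 2 filters)
#   LSTM:   [batch, 32]                      (num_units of the last cell)
#   MAXMIN: [batch, 2 * 50]  = [batch, 100]  (min and max embeddings concatenated)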
def _fully_connected_layers(inputs, params):
hidden_layers = inputs
if params.hidden_units is not None:
# Create a fully-connected layer-stack based on the hidden_units in the params
hidden_layers = tf.contrib.layers.stack(
inputs=inputs,
layer=tf.contrib.layers.fully_connected,
            stack_args=params.hidden_units,
activation_fn=tf.nn.relu)
return hidden_layers
def model_fn(features, labels, mode, params):
# Create the input layers via CNN, LSTM, or MAX+MIN embeddings
shallow_layers_output = _shallow_layers(features, params)
# Create FCN using hidden units
hidden_layers = _fully_connected_layers(shallow_layers_output, params)
# Number of classes
output_layer_size = len(TARGET_LABELS)
# Connect the output layer (logits) to the hidden layer (no activation fn)
logits = tf.layers.dense(inputs=hidden_layers,
units=output_layer_size,
activation=None)
head = tf.contrib.estimator.multi_class_head(
n_classes=len(TARGET_LABELS),
label_vocabulary=TARGET_LABELS,
name='classification_head'
)
def _train_op_fn(loss):
# Create Optimiser
optimizer = tf.train.AdamOptimizer(
learning_rate=params.learning_rate)
# Create training operation
train_op = optimizer.minimize(
loss=loss, global_step=tf.train.get_global_step())
return train_op
return head.create_estimator_spec(
features,
mode,
logits,
labels=labels,
train_op_fn=_train_op_fn
)
In [6]:
def create_estimator(hparams, run_config):
estimator = tf.estimator.Estimator(model_fn=model_fn,
params=hparams,
config=run_config)
return estimator
In [7]:
NUM_EPOCHS = 10
BATCH_SIZE = 1000
TOTAL_STEPS = (TRAIN_SIZE // BATCH_SIZE) * NUM_EPOCHS
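# With TRAIN_SIZE = 73124 and BATCH_SIZE = 1000: 73124 // 1000 = 73 steps
# per epoch, so TOTAL_STEPS = 73 * 10 = 730.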
EVAL_EVERY_SEC = 60
hparams = tf.contrib.training.HParams(
num_epochs=NUM_EPOCHS,
batch_size=BATCH_SIZE,
embedding_size = 50, # word embedding vector size
learning_rate=0.01,
hidden_units=[64, 32],
max_steps=TOTAL_STEPS,
model_type='MAXMIN_EMBEDDING', # CNN | LSTM | MAXMIN_EMBEDDING
#CNN Params
window_size = 3,
filters = 2,
#LSTM Params
forget_bias=1.0,
keep_prob = 0.8,
)
MODEL_NAME = 'dnn_estimator_custom'
model_dir = os.path.join(Params.MODELS_DIR, MODEL_NAME)
run_config = tf.estimator.RunConfig(
tf_random_seed=19830610,
log_step_count_steps=1000,
save_checkpoints_secs=EVAL_EVERY_SEC,
keep_checkpoint_max=1,
model_dir=model_dir
)
print(hparams)
print("")
print("Model Directory:", run_config.model_dir)
print("Dataset Size:", TRAIN_SIZE)
print("Batch Size:", BATCH_SIZE)
print("Steps per Epoch:",TRAIN_SIZE/BATCH_SIZE)
print("Total Steps:", TOTAL_STEPS)
In [8]:
def generate_serving_input_fn():
def _serving_fn():
receiver_tensor = {
'title': tf.placeholder(dtype=tf.string, shape=[None])
}
_, transformed_features = (
saved_transform_io.partially_apply_saved_transform(
os.path.join(Params.TRANSFORM_ARTEFACTS_DIR, transform_fn_io.TRANSFORM_FN_DIR),
receiver_tensor)
)
return tf.estimator.export.ServingInputReceiver(
transformed_features, receiver_tensor)
return _serving_fn
In [9]:
train_spec = tf.estimator.TrainSpec(
input_fn = generate_tfrecords_input_fn(
Params.TRANSFORMED_TRAIN_DATA_FILE_PREFIX+"*",
mode = tf.estimator.ModeKeys.TRAIN,
num_epochs=hparams.num_epochs,
batch_size=hparams.batch_size
),
max_steps=hparams.max_steps,
hooks=None
)
eval_spec = tf.estimator.EvalSpec(
input_fn = generate_tfrecords_input_fn(
Params.TRANSFORMED_EVAL_DATA_FILE_PREFIX+"*",
mode=tf.estimator.ModeKeys.EVAL,
num_epochs=1,
batch_size=hparams.batch_size
),
exporters=[tf.estimator.LatestExporter(
name="estimate", # the name of the folder in which the model will be exported to under export
serving_input_receiver_fn=generate_serving_input_fn(),
exports_to_keep=1,
as_text=False)],
steps=None,
throttle_secs=EVAL_EVERY_SEC
)
In [10]:
from datetime import datetime
import shutil
if Params.TRAIN:
if not Params.RESUME_TRAINING:
print("Removing previous training artefacts...")
shutil.rmtree(model_dir, ignore_errors=True)
else:
print("Resuming training...")
tf.logging.set_verbosity(tf.logging.INFO)
time_start = datetime.utcnow()
print("Experiment started at {}".format(time_start.strftime("%H:%M:%S")))
print(".......................................")
estimator = create_estimator(hparams, run_config)
tf.estimator.train_and_evaluate(
estimator=estimator,
train_spec=train_spec,
eval_spec=eval_spec
)
time_end = datetime.utcnow()
print(".......................................")
print("Experiment finished at {}".format(time_end.strftime("%H:%M:%S")))
print("")
time_elapsed = time_end - time_start
print("Experiment elapsed time: {} seconds".format(time_elapsed.total_seconds()))
else:
print "Training was skipped!"
In [11]:
tf.logging.set_verbosity(tf.logging.ERROR)
estimator = create_estimator(hparams, run_config)
train_metrics = estimator.evaluate(
input_fn = generate_tfrecords_input_fn(
files_pattern= Params.TRANSFORMED_TRAIN_DATA_FILE_PREFIX+"*",
mode= tf.estimator.ModeKeys.EVAL,
batch_size= TRAIN_SIZE),
steps=1
)
print("############################################################################################")
print("# Train Measures: {}".format(train_metrics))
print("############################################################################################")
eval_metrics = estimator.evaluate(
input_fn=generate_tfrecords_input_fn(
files_pattern= Params.TRANSFORMED_EVAL_DATA_FILE_PREFIX+"*",
mode= tf.estimator.ModeKeys.EVAL,
batch_size= EVAL_SIZE),
steps=1
)
print("")
print("############################################################################################")
print("# Eval Measures: {}".format(eval_metrics))
print("############################################################################################")
In [12]:
import os
export_dir = os.path.join(model_dir, 'export', 'estimate')
# Pick the most recent (highest timestamp) export directory
saved_model_dir = os.path.join(export_dir, sorted(os.listdir(export_dir))[-1])
print(saved_model_dir)
print("")
predictor_fn = tf.contrib.predictor.from_saved_model(
export_dir = saved_model_dir,
signature_def_key="predict"
)
output = predictor_fn(
{
'title':[
'Microsoft and Google are joining forces for a new AI framework',
'A new version of Python is mind blowing',
'EU is investigating new data privacy policies'
]
}
)
print(output)
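To turn the raw output into readable predictions, here is a minimal sketch, assuming the 'predict' signature exposes a 'probabilities' output whose columns follow the order of TARGET_LABELS (verify this against the printed output above):
In [ ]:
# Map each probability vector to the most likely class label.
import numpy as np
for probs in output['probabilities']:
    print(TARGET_LABELS[int(np.argmax(probs))], probs)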
In [ ]: