In [ ]:
%%writefile anomaly_detection_module/trainer/input.py
import tensorflow as tf
# Input pipeline functions
def split_and_convert_string(string_tensor):
    """Parses a ";"-delimited string tensor into a vector of floats.

    The string is wrapped into a length-one batch, split on ";", every piece
    is parsed as a number, and the result is densified and unwrapped again.

    Args:
        string_tensor: tf.string tensor holding ";"-separated numbers.
            Presumably a scalar, since a leading axis is added before the
            split and squeezed away at the end.

    Returns:
        tf.float64 tensor of the parsed values.
    """
    # Add a leading axis so the string can be split as a one-element batch
    batched_string = tf.expand_dims(input=string_tensor, axis=0)

    # Sparse tensor of the individual string pieces
    pieces = tf.string_split(source=batched_string, delimiter=";")

    # Parse the string pieces as 64-bit floats
    parsed_values = tf.string_to_number(
        string_tensor=pieces.values,
        out_type=tf.float64)

    # Sparse tensor values cannot be swapped in place, so rebuild the sparse
    # tensor around the parsed values using the same indices and shape
    parsed_sparse = tf.SparseTensor(
        indices=pieces.indices,
        values=parsed_values,
        dense_shape=pieces.dense_shape)

    # Densify, filling any holes with zeros
    parsed_dense = tf.sparse_tensor_to_dense(
        sp_input=parsed_sparse, default_value=0.0)

    # Drop the leading axis that was added for the split
    return tf.squeeze(input=parsed_dense, axis=0)
def convert_sequences_from_strings_to_floats(features, column_list, seq_len):
    """Replaces string-encoded sequences in a features dict with floats.

    Each listed column is parsed with split_and_convert_string and written
    back under the same key, so the passed-in dictionary is modified in place
    and also returned.

    Args:
        features: Dictionary of tensors of our features as tf.strings.
        column_list: List of column names of our features.
        seq_len: Number of timesteps in sequence.

    Returns:
        The same dictionary, now holding the parsed float tensors.
    """
    for name in column_list:
        float_sequence = split_and_convert_string(features[name])
        # The sequence length is known up front, so pin the static shape
        float_sequence.set_shape([seq_len])
        features[name] = float_sequence
    return features
def decode_csv(value_column, mode, batch_size, params):
    """Decodes one CSV line into feature tensors (and a label if present).

    Lines are treated as unlabeled when training, and when evaluating unless
    the training mode is "tune_anomaly_thresholds" with labeled threshold
    tuning enabled. Every other case expects a trailing
    "anomalous_sequence_flag" column that is split off as the label.

    Args:
        value_column: tf.string tensor comprising an entire line of the CSV
            file.
        mode: The estimator ModeKeys. Can be TRAIN or EVAL.
        batch_size: Number of examples per batch. Not used by this function.
        params: Dictionary of user passed parameters.

    Returns:
        Features dictionary of tensors for unlabeled lines, otherwise a tuple
        of the features dictionary and the labels tensor.
    """
    label_name = "anomalous_sequence_flag"

    # Decide whether this line carries a label column
    if mode == tf.estimator.ModeKeys.TRAIN:
        has_labels = False
    elif mode == tf.estimator.ModeKeys.EVAL:
        has_labels = (
            params["training_mode"] == "tune_anomaly_thresholds"
            and params["labeled_tune_thresh"])
    else:
        has_labels = True

    record_defaults = params["feat_defaults"]
    field_names = params["feat_names"]
    if has_labels:
        # Labeled files have one extra trailing numeric column
        record_defaults = record_defaults + [[0.0]]  # add label default
        field_names = field_names + [label_name]

    columns = tf.decode_csv(
        records=value_column,
        record_defaults=record_defaults,
        field_delim=",")
    features = dict(zip(field_names, columns))

    if not has_labels:
        return convert_sequences_from_strings_to_floats(
            features=features,
            column_list=params["feat_names"],
            seq_len=params["seq_len"])

    # Pull the label out before the string features are converted
    labels = tf.cast(x=features.pop(label_name), dtype=tf.float64)
    features = convert_sequences_from_strings_to_floats(
        features=features,
        column_list=params["feat_names"],
        seq_len=params["seq_len"])
    return features, labels
def read_dataset(filename, mode, batch_size, params):
    """Builds an Estimator input function that reads a CSV time series dataset.

    The returned function globs the file pattern, reads the matching files
    line by line, decodes every line with decode_csv, and batches the result.
    In TRAIN mode examples are shuffled and the data repeats indefinitely;
    otherwise the data is read once, in order.

    Args:
        filename: The file pattern that we want to read into our tf.data
            dataset.
        mode: The estimator ModeKeys. Can be TRAIN or EVAL.
        batch_size: Number of examples per batch.
        params: Dictionary of user passed parameters.

    Returns:
        An input function.
    """
    def _input_fn():
        """Wrapper input function used by the Estimator API to get tensors.

        Returns:
            The next-batch tensors from a one-shot iterator: whatever
            structure decode_csv emits, with a leading batch axis.
        """
        # Create list of files that match pattern
        file_list = tf.gfile.Glob(filename=filename)

        # Read the matching text files line by line
        dataset = tf.data.TextLineDataset(filenames=file_list)

        # Decode each CSV line into a features dictionary of tensors
        dataset = dataset.map(
            map_func=lambda x: decode_csv(
                value_column=x,
                mode=mode,
                batch_size=batch_size,
                params=params))

        if mode == tf.estimator.ModeKeys.TRAIN:
            # Shuffle individual examples BEFORE repeating and batching.
            # Shuffling after batch() only permutes whole batches (examples
            # that were adjacent in the file always stay together) and makes
            # the buffer hold 10 * batch_size batches instead of examples.
            dataset = dataset.shuffle(buffer_size=10 * batch_size)
            num_epochs = None  # indefinitely
        else:
            num_epochs = 1  # end-of-input after this

        # Repeat files num_epochs times
        dataset = dataset.repeat(count=num_epochs)

        # Group the data into batches
        dataset = dataset.batch(batch_size=batch_size)

        # Create an iterator, then pull a batch from it
        return dataset.make_one_shot_iterator().get_next()

    return _input_fn
In [ ]:
%%writefile anomaly_detection_module/trainer/autoencoder_dense.py
import tensorflow as tf
# Dense autoencoder model functions
def dense_encoder(X, params):
    """Dense encoder subgraph mapping inputs to latent vectors.

    Stacks one ReLU dense layer per entry of params["enc_dnn_hidden_units"],
    followed by a final ReLU dense layer of params["latent_vector_size"]
    units.

    Args:
        X: tf.float64 matrix tensor of input data.
        params: Dictionary of parameters.

    Returns:
        tf.float64 matrix tensor encoder latent vector for each example in
        batch.
    """
    # Hidden layers and the latent layer share the same layer type and
    # activation, so they are built in one pass with the latent size last
    layer_sizes = (
        list(params["enc_dnn_hidden_units"])
        + [params["latent_vector_size"]])

    latent_matrix = X
    for num_units in layer_sizes:
        latent_matrix = tf.layers.dense(
            inputs=latent_matrix,
            units=num_units,
            activation=tf.nn.relu)
    return latent_matrix
def dense_decoder(latent_matrix, orig_dims, params):
    """Dense decoder subgraph mapping latent vectors back to input width.

    Stacks one ReLU dense layer per entry of params["dec_dnn_hidden_units"],
    taken in reverse order, followed by a ReLU dense layer of orig_dims
    units.

    Args:
        latent_matrix: tf.float64 matrix tensor of encoder latent matrix.
        orig_dims: Original dimensions of input data.
        params: Dictionary of parameters.

    Returns:
        tf.float64 matrix tensor decoder output vector for each example in
        batch.
    """
    hidden = latent_matrix

    # The configured widths are walked back to front
    for num_units in reversed(params["dec_dnn_hidden_units"]):
        hidden = tf.layers.dense(
            inputs=hidden,
            units=num_units,
            activation=tf.nn.relu)

    # NOTE(review): the output layer is ReLU-activated as well, so the
    # reconstruction can never be negative -- confirm the inputs are scaled
    # to be non-negative.
    return tf.layers.dense(
        inputs=hidden,
        units=orig_dims,
        activation=tf.nn.relu)
def dense_autoencoder(X, orig_dims, params):
    """Dense autoencoder: dense encoder followed by dense decoder.

    Args:
        X: tf.float64 matrix tensor of input data.
        orig_dims: Original dimensions of input data.
        params: Dictionary of parameters.

    Returns:
        tf.float64 matrix tensor decoder output vector for each example in
        batch that is the reconstructed inputs.
    """
    # Encode to the latent space, then decode straight back to input width
    return dense_decoder(
        latent_matrix=dense_encoder(X=X, params=params),
        orig_dims=orig_dims,
        params=params)
def dense_autoencoder_model(
        X, mode, params, cur_batch_size, dummy_var):
    """Dense autoencoder to reconstruct inputs and minimize reconstruction error.

    Two separate dense autoencoders are built: one over the time major 2D
    view of X and one over the feature major 2D view. In reconstruction
    training their outputs are blended by the configured loss weights and
    trained against X with mean squared error; in every other case the 2D
    inputs and their reconstructions are handed back instead.

    Args:
        X: tf.float64 tensor of input data; it is transposed with
            perm=[0, 2, 1], so it has rank 3, presumably
            (cur_batch_size, seq_len, num_feat).
        mode: Estimator ModeKeys. Can take values of TRAIN, EVAL, and PREDICT.
        params: Dictionary of parameters.
        cur_batch_size: Current batch size, could be partially filled.
        dummy_var: Dummy variable used to allow training mode to happen since
            it requires a gradient to tie back to the graph dependency. Not
            referenced in this function's body.

    Returns:
        loss: Reconstruction loss, or None outside reconstruction training.
        train_op: Train operation so that Estimator can correctly add to
            dependency graph, or None outside reconstruction training.
        X_time: 2D time major input data, or None when training.
        X_time_recon: 2D time major reconstruction, or None when training.
        X_feat: 2D feature major input data, or None when training.
        X_feat_recon: 2D feature major reconstruction, or None when training.
    """
    seq_len = params["seq_len"]
    num_feat = params["num_feat"]

    # Time major view and its reconstruction
    # shape = (cur_batch_size * seq_len, num_feat)
    X_time = tf.reshape(
        tensor=X,
        shape=[cur_batch_size * seq_len, num_feat])
    X_time_recon = dense_autoencoder(X_time, num_feat, params)

    # Feature major view and its reconstruction
    # shape = (cur_batch_size * num_feat, seq_len)
    X_feat = tf.reshape(
        tensor=tf.transpose(a=X, perm=[0, 2, 1]),
        shape=[cur_batch_size * num_feat, seq_len])
    X_feat_recon = dense_autoencoder(X_feat, seq_len, params)

    is_reconstruction_training = (
        mode == tf.estimator.ModeKeys.TRAIN
        and params["training_mode"] == "reconstruction")
    if not is_reconstruction_training:
        return None, None, X_time, X_time_recon, X_feat, X_feat_recon

    time_weight = params["time_loss_weight"]
    feat_weight = params["feat_loss_weight"]

    # Bring both reconstructions back to the 3D layout of X
    # shape = (cur_batch_size, seq_len, num_feat)
    X_time_recon_3d = tf.reshape(
        tensor=X_time_recon,
        shape=[cur_batch_size, seq_len, num_feat])
    X_feat_recon_3d = tf.transpose(
        a=tf.reshape(
            tensor=X_feat_recon,
            shape=[cur_batch_size, num_feat, seq_len]),
        perm=[0, 2, 1])

    # Weighted average of the two reconstructions
    predictions = (
        X_time_recon_3d * time_weight + X_feat_recon_3d * feat_weight
    ) / (time_weight + feat_weight)

    loss = tf.losses.mean_squared_error(labels=X, predictions=predictions)
    train_op = tf.contrib.layers.optimize_loss(
        loss=loss,
        global_step=tf.train.get_global_step(),
        learning_rate=params["learning_rate"],
        optimizer="Adam")
    return loss, train_op, None, None, None, None
In [ ]:
%%writefile anomaly_detection_module/trainer/autoencoder_lstm.py
import tensorflow as tf
# LSTM Encoder-decoder Autoencoder model functions
def create_LSTM_stack(lstm_hidden_units, lstm_dropout_output_keep_probs):
    """Builds a stack of LSTM cells with dropout on each layer's output.

    Args:
        lstm_hidden_units: List of integers for the number of hidden units in
            each layer.
        lstm_dropout_output_keep_probs: List of floats for the dropout output
            keep probabilities for each layer. Entries are looked up by layer
            position, so the list must be at least as long as
            lstm_hidden_units; surplus entries are ignored.

    Returns:
        MultiRNNCell object of stacked LSTM layers.
    """
    # One basic LSTM cell per requested layer size
    base_cells = [
        tf.contrib.rnn.BasicLSTMCell(
            num_units=num_units,
            forget_bias=1.0,
            state_is_tuple=True)
        for num_units in lstm_hidden_units]

    # Wrap every cell with dropout; only the outputs are dropped, inputs and
    # state are kept in full
    dropout_cells = [
        tf.nn.rnn_cell.DropoutWrapper(
            cell=base_cell,
            input_keep_prob=1.0,
            output_keep_prob=lstm_dropout_output_keep_probs[layer_index],
            state_keep_prob=1.0)
        for layer_index, base_cell in enumerate(base_cells)]

    # Combine the wrapped layers into a single multi-layer cell
    return tf.contrib.rnn.MultiRNNCell(
        cells=dropout_cells,
        state_is_tuple=True)
def rnn_decoder(dec_input, init_state, cell, infer, dnn_hidden_units, num_feat):
    """Runs a decoder RNN cell step by step over a list of inputs.

    The cell starts from init_state and is called once per element of
    dec_input. After the first step, the previous cell output is pushed
    through a small DNN to produce a num_feat wide projection. When infer is
    True that projection replaces the supplied input for the step, so only
    the first element of dec_input is actually consumed; when infer is False
    the supplied inputs are used and the projection is built but unused.

    Args:
        dec_input: List of tf.float64 current batch size by number of features
            matrix tensors input to the decoder.
        init_state: Initial state of the decoder cell. Final state from the
            encoder cell.
        cell: RNN Cell object.
        infer: Boolean whether in inference mode or not.
        dnn_hidden_units: Python list of integers of number of units per DNN
            layer.
        num_feat: Python integer of the number of features.

    Returns:
        outputs: List with one raw cell output per timestep (the DNN
            projection is not part of it).
        state: Final cell state of the decoder.
    """
    step_outputs = []

    with tf.variable_scope("decoder"):
        # Pick up from the state handed over by the encoder
        cell_state = init_state

        # No cell output exists before the first step
        last_output = None

        for step, step_input in enumerate(dec_input):
            if last_output is not None:
                # Project the previous cell output down to feature width.
                # This subgraph is built in training too, even though its
                # result is only consumed during inference.
                # NOTE(review): this "dnn" scope is opened inside "decoder",
                # so its variables presumably live under decoder/dnn/ --
                # confirm that matches any DNN the caller expects to share
                # weights with.
                with tf.variable_scope(
                        name_or_scope="dnn", reuse=tf.AUTO_REUSE):
                    hidden = last_output
                    for num_units in dnn_hidden_units:
                        hidden = tf.layers.dense(
                            inputs=hidden,
                            units=num_units,
                            activation=tf.nn.relu)
                    # Linear layer, shape = (cur_batch_size, num_feat)
                    projection = tf.layers.dense(
                        inputs=hidden,
                        units=num_feat,
                        activation=None)

                if infer:
                    # Feed the model's own projection back in as the input
                    step_input = projection

            # From the second step on, share the variables created earlier
            # in the current variable scope
            if step > 0:
                tf.get_variable_scope().reuse_variables()

            # Advance the cell one step from the previous state
            last_output, cell_state = cell(step_input, cell_state)
            step_outputs.append(last_output)

    return step_outputs, cell_state
def lstm_enc_dec_autoencoder_model(
X, mode, params, cur_batch_size, dummy_var):
"""LSTM autoencoder to reconstruct inputs and minimize reconstruction error.
Given data tensor X, the current Estimator mode, the dictionary of
parameters, and the current batch size, process through LSTM model encoder,
decoder, and DNN subgraphs. In reconstruction training the loss and train op
are returned; in every other case the inputs and their reconstructions are
returned as time major and feature major 2D tensors.
Args:
X: tf.float64 tensor of input data. It is unstacked along axis 1 into
seq_len steps, so presumably shape (cur_batch_size, seq_len, num_feat).
mode: Estimator ModeKeys. Can take values of TRAIN, EVAL, and PREDICT.
params: Dictionary of parameters.
cur_batch_size: Current batch size, could be partially filled.
dummy_var: Dummy variable used to allow training mode to happen since it
requires a gradient to tie back to the graph dependency. Not referenced in
this function's body.
Returns:
loss: Reconstruction loss, or None outside reconstruction training.
train_op: Train operation so that Estimator can correctly add to dependency
graph, or None outside reconstruction training.
X_time: 2D tensor representation of time major input data, or None when
training.
X_time_recon: 2D tensor representation of time major reconstructed data, or
None when training.
X_feat: 2D tensor representation of feature major input data, or None when
training.
X_feat_recon: 2D tensor representation of feature major reconstructed data,
or None when training.
"""
# NOTE(review): leading indentation is missing from this copy of the module,
# so block extents (notably the body of the "encoder" variable scope below)
# cannot be read off the text -- confirm against the original file.
# Unstack 3-D features tensor into a sequence(list) of 2-D tensors
# shape = (cur_batch_size, num_feat)
X_sequence = tf.unstack(value=X, num=params["seq_len"], axis=1)
# Since this is an autoencoder, the features are the labels.
# It often works better though to have the labels in reverse order
# shape = (cur_batch_size, seq_len, num_feat)
if params["reverse_labels_sequence"]:
# Every example is reversed over its full length: seq_lengths is seq_len
# tiled cur_batch_size times
Y = tf.reverse_sequence(
input=X,
seq_lengths=tf.tile(
input=tf.constant(value=[params["seq_len"]], dtype=tf.int64),
multiples=tf.expand_dims(input=cur_batch_size, axis=0)),
seq_axis=1,
batch_axis=0)
else:
Y = X # shape = (cur_batch_size, seq_len, num_feat)
##############################################################################
# Create encoder of encoder-decoder LSTM stacks
# The decoder cell stack is created first, before the encoder variable scope
# is opened
dec_stacked_lstm_cells = create_LSTM_stack(
params["dec_lstm_hidden_units"],
params["lstm_dropout_output_keep_probs"])
# Create the encoder variable scope
with tf.variable_scope("encoder"):
# Create separate encoder cells with their own weights separate from decoder
enc_stacked_lstm_cells = create_LSTM_stack(
params["enc_lstm_hidden_units"],
params["lstm_dropout_output_keep_probs"])
# Encode the input sequence using our encoder stack of LSTMs
# enc_outputs = seq_len long of shape = (cur_batch_size, enc_lstm_hidden_units[-1])
# enc_states = tuple of final encoder c_state and h_state for each layer
# The per-step encoder outputs are discarded; only the final states are kept
_, enc_states = tf.nn.static_rnn(
cell=enc_stacked_lstm_cells,
inputs=X_sequence,
initial_state=enc_stacked_lstm_cells.zero_state(
batch_size=tf.cast(x=cur_batch_size, dtype=tf.int32),
dtype=tf.float64),
dtype=tf.float64)
# We just pass on the final c and h states of the encoder's last layer,
# so extract that and drop the others
# LSTMStateTuple shape = (cur_batch_size, lstm_hidden_units[-1])
enc_final_states = enc_states[-1]
# Extract the c and h states from the tuple
# both have shape = (cur_batch_size, lstm_hidden_units[-1])
enc_final_c, enc_final_h = enc_final_states
# In case the decoder's first layer's number of units is different than
# encoder's last layer's number of units, use a dense layer to map to the
# correct shape
# shape = (cur_batch_size, dec_lstm_hidden_units[0])
enc_final_c_dense = tf.layers.dense(
inputs=enc_final_c,
units=params["dec_lstm_hidden_units"][0],
activation=None)
# shape = (cur_batch_size, dec_lstm_hidden_units[0])
enc_final_h_dense = tf.layers.dense(
inputs=enc_final_h,
units=params["dec_lstm_hidden_units"][0],
activation=None)
# The decoder's first layer's state comes from the encoder,
# the rest of the layers' initial states are zero
dec_init_states = tuple(
[tf.contrib.rnn.LSTMStateTuple(c=enc_final_c_dense,
h=enc_final_h_dense)] + \
[tf.contrib.rnn.LSTMStateTuple(
c=tf.zeros(shape=[cur_batch_size, units], dtype=tf.float64),
h=tf.zeros(shape=[cur_batch_size, units], dtype=tf.float64))
for units in params["dec_lstm_hidden_units"][1:]])
##############################################################################
# Create decoder of encoder-decoder LSTM stacks
# Run our decoder now
# Encoder-decoders work differently during reconstruction training than
# during every other mode, so there is a separate subgraph for each case
if (mode == tf.estimator.ModeKeys.TRAIN and
params["training_mode"] == "reconstruction"):
# Break 3-D labels tensor into a list of 2-D tensors
# shape = (cur_batch_size, num_feat)
unstacked_labels = tf.unstack(value=Y, num=params["seq_len"], axis=1)
# Call our decoder using the labels as our inputs, the encoder final state
# as our initial state, our other LSTM stack as our cells, and inference
# set to false
dec_outputs, _ = rnn_decoder(
dec_input=unstacked_labels,
init_state=dec_init_states,
cell=dec_stacked_lstm_cells,
infer=False,
dnn_hidden_units=params["dnn_hidden_units"],
num_feat=params["num_feat"])
else:
# Since this is inference create fake labels. The list length needs to be
# the output sequence length even though the first element is the only
# one actually used (as our go signal)
fake_labels = [tf.zeros(shape=[cur_batch_size, params["num_feat"]],
dtype=tf.float64)
for _ in range(params["seq_len"])]
# Call our decoder using fake labels as our inputs, the encoder final state
# as our initial state, our other LSTM stack as our cells, and inference
# set to true
# dec_outputs = seq_len long of shape = (cur_batch_size, dec_lstm_hidden_units[-1])
# decoder_states = tuple of final decoder c_state and h_state for each layer
dec_outputs, _ = rnn_decoder(
dec_input=fake_labels,
init_state=dec_init_states,
cell=dec_stacked_lstm_cells,
infer=True,
dnn_hidden_units=params["dnn_hidden_units"],
num_feat=params["num_feat"])
# Stack together list of rank 2 decoder output tensors into one rank 3 tensor
# shape = (cur_batch_size, seq_len, lstm_hidden_units[-1])
stacked_dec_outputs = tf.stack(values=dec_outputs, axis=1)
# Reshape rank 3 decoder outputs into rank 2 by folding sequence length into
# batch size
# shape = (cur_batch_size * seq_len, lstm_hidden_units[-1])
reshaped_stacked_dec_outputs = tf.reshape(
tensor=stacked_dec_outputs,
shape=[cur_batch_size * params["seq_len"],
params["dec_lstm_hidden_units"][-1]])
##############################################################################
# Create the DNN structure now after the encoder-decoder LSTM stack
# Create the input layer to our DNN
# shape = (cur_batch_size * seq_len, lstm_hidden_units[-1])
network = reshaped_stacked_dec_outputs
# Reuse the same variable scope as we used within our decoder (for inference)
# NOTE(review): rnn_decoder opens its own "dnn" scope nested inside a
# "decoder" scope -- verify that the "dnn" scope opened here really resolves
# to the same variables, otherwise the two DNNs do not share weights.
with tf.variable_scope(name_or_scope="dnn", reuse=tf.AUTO_REUSE):
# Add hidden layers with the given number of units/neurons per layer
for units in params["dnn_hidden_units"]:
# shape = (cur_batch_size * seq_len, dnn_hidden_units[i])
network = tf.layers.dense(
inputs=network,
units=units,
activation=tf.nn.relu)
# Connect the final hidden layer to a dense layer with no activation to
# get the logits
# shape = (cur_batch_size * seq_len, num_feat)
logits = tf.layers.dense(
inputs=network,
units=params["num_feat"],
activation=None)
# Now that we are through the final DNN for each sequence element for
# each example in the batch, reshape the predictions to match our labels.
# shape = (cur_batch_size, seq_len, num_feat)
predictions = tf.reshape(
tensor=logits,
shape=[cur_batch_size, params["seq_len"], params["num_feat"]])
if (mode == tf.estimator.ModeKeys.TRAIN and
params["training_mode"] == "reconstruction"):
# Train against Y, which is X in reversed order when
# reverse_labels_sequence is set
loss = tf.losses.mean_squared_error(labels=Y, predictions=predictions)
train_op = tf.contrib.layers.optimize_loss(
loss=loss,
global_step=tf.train.get_global_step(),
learning_rate=params["learning_rate"],
optimizer="Adam")
return loss, train_op, None, None, None, None
else:
if params["reverse_labels_sequence"]:
# Predictions follow the reversed label order, so flip them back to line
# up with X before comparing
# shape=(cur_batch_size, seq_len, num_feat)
predictions = tf.reverse_sequence(
input=predictions,
seq_lengths=tf.tile(
input=tf.constant(value=[params["seq_len"]], dtype=tf.int64),
multiples=tf.expand_dims(input=cur_batch_size, axis=0)),
seq_axis=1,
batch_axis=0)
# Reshape into 2-D tensors
# Time based
# shape = (cur_batch_size * seq_len, num_feat)
X_time = tf.reshape(
tensor=X,
shape=[cur_batch_size * params["seq_len"], params["num_feat"]])
# shape = (cur_batch_size * seq_len, num_feat)
X_time_recon = tf.reshape(
tensor=predictions,
shape=[cur_batch_size * params["seq_len"], params["num_feat"]])
# Features based
# shape = (cur_batch_size, num_feat, seq_len)
X_transposed = tf.transpose(a=X, perm=[0, 2, 1])
# shape = (cur_batch_size * num_feat, seq_len)
X_feat = tf.reshape(
tensor=X_transposed,
shape=[cur_batch_size * params["num_feat"], params["seq_len"]])
# shape = (cur_batch_size, num_feat, seq_len)
predictions_transposed = tf.transpose(a=predictions, perm=[0, 2, 1])
# shape = (cur_batch_size * num_feat, seq_len)
X_feat_recon = tf.reshape(
tensor=predictions_transposed,
shape=[cur_batch_size * params["num_feat"], params["seq_len"]])
return None, None, X_time, X_time_recon, X_feat, X_feat_recon
In [ ]:
%%writefile anomaly_detection_module/trainer/autoencoder_pca.py
import tensorflow as tf
from .calculate_error_distribution_statistics import non_singleton_batch_cov_variable_updating
from .calculate_error_distribution_statistics import singleton_batch_cov_variable_updating
# PCA model functions
def create_pca_vars(var_name, size):
    """Creates the non-trainable PCA bookkeeping variables for one view.

    All variables are created (or fetched, on repeat calls) under the
    "pca_vars" variable scope with names of the form "pca_<var_name>_...".

    Args:
        var_name: String denoting which set of variables to create. Values
            are "time" and "feat".
        size: The size of the variable, either sequence length or number of
            features.

    Returns:
        PCA variables for count, mean, covariance, eigenvalues,
        eigenvectors, and k principal components.
    """
    def _untrainable(name_template, dtype, shape, fill=tf.zeros):
        """Gets a non-trainable variable initialized by fill(shape, dtype)."""
        return tf.get_variable(
            name=name_template.format(var_name),
            dtype=dtype,
            initializer=fill(shape=shape, dtype=dtype),
            trainable=False)

    scalar = []
    vector = [size]
    matrix = [size, size]

    with tf.variable_scope(
            name_or_scope="pca_vars", reuse=tf.AUTO_REUSE):
        count_var = _untrainable("pca_{}_count_var", tf.int64, scalar)
        mean_var = _untrainable("pca_{}_mean_var", tf.float64, vector)
        cov_var = _untrainable("pca_{}_cov_var", tf.float64, matrix)
        eigval_var = _untrainable("pca_{}_eigval_var", tf.float64, vector)
        eigvec_var = _untrainable("pca_{}_eigvec_var", tf.float64, matrix)
        # Unlike the others, the component count starts at one, not zero
        k_pc_var = _untrainable(
            "pca_{}_k_principal_components_var",
            tf.int64,
            scalar,
            fill=tf.ones)

    return count_var, mean_var, cov_var, eigval_var, eigvec_var, k_pc_var
def create_both_pca_vars(seq_len, num_feat):
    """Creates both time & feature major PCA variables.

    The "time" set is sized by the number of features and the "feat" set by
    the sequence length.

    Args:
        seq_len: Number of timesteps in sequence.
        num_feat: Number of features.

    Returns:
        Flat tuple of the six time major PCA variables (count, mean,
        covariance, eigenvalues, eigenvectors, k principal components)
        followed by the same six for the feature major representation.
    """
    # Time based
    time_vars = create_pca_vars(var_name="time", size=num_feat)

    # Features based
    feat_vars = create_pca_vars(var_name="feat", size=seq_len)

    return tuple(time_vars) + tuple(feat_vars)
def pca_reconstruction_k_pc(X_cen, pca_eigvec_var, k_pc):
    """PCA reconstruction with k principal components.

    Projects the centered data onto the subspace spanned by the last k_pc
    columns of the eigenvector matrix and maps it back to the original
    space.

    Args:
        X_cen: tf.float64 matrix tensor of centered input data.
        pca_eigvec_var: tf.float64 matrix variable storing eigenvectors.
        k_pc: Number of principal components to keep.

    Returns:
        X_cen_recon: 2D input data tensor reconstructed.
    """
    # Keep the trailing k_pc eigenvector columns
    # NOTE(review): presumably the columns are ordered by ascending
    # eigenvalue, making the trailing ones the leading components -- confirm.
    kept_eigvecs = pca_eigvec_var[:, -k_pc:]

    # time_shape = (num_feat, num_feat)
    # feat_shape = (seq_len, seq_len)
    projection_matrix = tf.matmul(
        a=kept_eigvecs,
        b=kept_eigvecs,
        transpose_b=True)

    # time_shape = (cur_batch_size * seq_len, num_feat)
    # feat_shape = (cur_batch_size * num_feat, seq_len)
    return tf.matmul(a=X_cen, b=projection_matrix)
def pca_reconstruction_k_pc_mse(X_cen, pca_eigvec_var, k_pc):
    """Reconstruction error of PCA with k principal components.

    Reconstructs the centered data with pca_reconstruction_k_pc, sums the
    squared error across the last axis of every row, and averages those row
    totals.

    Args:
        X_cen: tf.float64 matrix tensor of centered input data.
        pca_eigvec_var: tf.float64 matrix variable storing eigenvectors.
        k_pc: Number of principal components to keep.

    Returns:
        mse: Reconstruction mean squared error.
    """
    # time_shape = (cur_batch_size * seq_len, num_feat)
    # feat_shape = (cur_batch_size * num_feat, seq_len)
    residual = X_cen - pca_reconstruction_k_pc(X_cen, pca_eigvec_var, k_pc)

    # One squared-error total per row
    row_squared_error = tf.reduce_sum(
        input_tensor=tf.square(x=residual), axis=-1)

    # shape = ()
    return tf.reduce_mean(input_tensor=row_squared_error)
def find_best_k_principal_components(X_recon_mse, pca_k_pc_var):
    """Stores the component count with the lowest reconstruction MSE.

    Args:
        X_recon_mse: tf.float64 vector tensor of reconstruction mean squared
            error. Presumably entry i holds the error for i + 1 components,
            since one is added to the position of the minimum.
        pca_k_pc_var: tf.int64 scalar variable to hold best number of
            principal components.

    Returns:
        pca_k_pc_var: Value of the variable read after it has been assigned
        the best number of principal components.
    """
    # Position of the smallest error, shifted by one to become a count
    best_pca_k_pc = tf.argmin(input=X_recon_mse) + 1

    store_best_k = tf.assign(ref=pca_k_pc_var, value=best_pca_k_pc)

    # Force the assignment to run before the variable is read back
    with tf.control_dependencies(control_inputs=[store_best_k]):
        updated_k_pc = tf.identity(input=pca_k_pc_var)
    return updated_k_pc
def set_k_principal_components(user_k_pc, pca_k_pc_var):
    """Stores a user-defined number of principal components.

    Args:
        user_k_pc: User-defined python integer for number of principal
            components.
        pca_k_pc_var: tf.int64 scalar variable to hold chosen number of
            principal components.

    Returns:
        pca_k_pc_var: Value of the variable read after it has been assigned
        the chosen number of principal components.
    """
    store_user_k = tf.assign(ref=pca_k_pc_var, value=user_k_pc)

    # Force the assignment to run before the variable is read back
    with tf.control_dependencies(control_inputs=[store_user_k]):
        updated_k_pc = tf.identity(input=pca_k_pc_var)
    return updated_k_pc
def pca_model(X, mode, params, cur_batch_size, dummy_var):
"""Builds the PCA subgraph that fits PCA variables or reconstructs inputs.

Reshapes X into a time major and a feature major 2-D tensor and fetches the
PCA variables for both layouts. When mode is TRAIN and
params["training_mode"] is "reconstruction", the PCA variables are updated and
a zero-valued loss plus train op are returned. In every other case the
centered inputs and their PCA reconstructions are returned instead.

Args:
X: tf.float64 tensor of input data. Presumably of shape
(cur_batch_size, seq_len, num_feat), judging by the reshape and transpose
below -- TODO confirm against the caller.
mode: Estimator ModeKeys. Can take values of TRAIN, EVAL, and PREDICT.
params: Dictionary of parameters. Keys read here: "seq_len", "num_feat",
"training_mode", "autotune_principal_components",
"k_principal_components_time", "k_principal_components_feat" and
"learning_rate".
cur_batch_size: Current batch size, could be partially filled.
dummy_var: Dummy variable used to allow training mode to happen since it
requires a gradient to tie back to the graph dependency.

Returns:
A 6-tuple. In the training branches it is
(loss, train_op, None, None, None, None); otherwise it is
(None, None, X_time_cen, X_time_recon, X_feat_cen, X_feat_recon).
loss: Reconstruction loss; here always zero times dummy_var.
train_op: Train operation so that Estimator can correctly add to dependency
graph.
X_time_cen: 2D time major input data minus pca_time_mean_var.
X_time_recon: 2D time major PCA reconstruction of X_time_cen.
X_feat_cen: 2D feature major input data minus pca_feat_mean_var.
X_feat_recon: 2D feature major PCA reconstruction of X_feat_cen.
"""
# Reshape into 2-D tensors
# Time based
# shape = (cur_batch_size * seq_len, num_feat)
X_time = tf.reshape(
tensor=X,
shape=[cur_batch_size * params["seq_len"], params["num_feat"]])
# Features based
# shape = (cur_batch_size, num_feat, seq_len)
X_transposed = tf.transpose(a=X, perm=[0, 2, 1])
# shape = (cur_batch_size * num_feat, seq_len)
X_feat = tf.reshape(
tensor=X_transposed,
shape=[cur_batch_size * params["num_feat"], params["seq_len"]])
##############################################################################
# Fetch the PCA variables for both the time major and feature major layouts
# (create_both_pca_vars is defined outside this section)
(pca_time_count_var,
pca_time_mean_var,
pca_time_cov_var,
pca_time_eigval_var,
pca_time_eigvec_var,
pca_time_k_pc_var,
pca_feat_count_var,
pca_feat_mean_var,
pca_feat_cov_var,
pca_feat_eigval_var,
pca_feat_eigvec_var,
pca_feat_k_pc_var) = create_both_pca_vars(
params["seq_len"], params["num_feat"])
# 3. Loss function, training/eval ops
if (mode == tf.estimator.ModeKeys.TRAIN and
params["training_mode"] == "reconstruction"):
# Not autotuning: update running count/mean/covariance from this batch and
# refresh the eigen-decomposition
if not params["autotune_principal_components"]:
with tf.variable_scope(name_or_scope="pca_vars", reuse=tf.AUTO_REUSE):
# Check if batch is a singleton, very important for covariance math
# Time based
# shape = ()
singleton_condition = tf.equal(
x=cur_batch_size * params["seq_len"], y=1)
# From here on the *_var names hold the tensors returned by the update
# functions rather than the variables themselves
pca_time_cov_var, pca_time_mean_var, pca_time_count_var = tf.cond(
pred=singleton_condition,
true_fn=lambda: singleton_batch_cov_variable_updating(
params["seq_len"],
X_time,
pca_time_count_var,
pca_time_mean_var,
pca_time_cov_var),
false_fn=lambda: non_singleton_batch_cov_variable_updating(
cur_batch_size,
params["seq_len"],
X_time,
pca_time_count_var,
pca_time_mean_var,
pca_time_cov_var))
# shape = (num_feat,) & (num_feat, num_feat)
pca_time_eigval_tensor, pca_time_eigvec_tensor = tf.linalg.eigh(
tensor=pca_time_cov_var)
if params["k_principal_components_time"] is not None:
pca_time_k_pc = set_k_principal_components(
params["k_principal_components_time"], pca_time_k_pc_var)
else:
# No user-defined k: constant placeholder so the control dependency list
# below can always include pca_time_k_pc
pca_time_k_pc = tf.zeros(shape=(), dtype=tf.float64)
# Features based
# shape = ()
singleton_features_condition = tf.equal(
x=cur_batch_size * params["num_feat"], y=1)
pca_feat_cov_var, pca_feat_mean_var, pca_feat_count_var = tf.cond(
pred=singleton_features_condition,
true_fn=lambda: singleton_batch_cov_variable_updating(
params["num_feat"],
X_feat,
pca_feat_count_var, pca_feat_mean_var,
pca_feat_cov_var),
false_fn=lambda: non_singleton_batch_cov_variable_updating(
cur_batch_size,
params["num_feat"],
X_feat,
pca_feat_count_var,
pca_feat_mean_var,
pca_feat_cov_var))
# shape = (seq_len,) & (seq_len, seq_len)
pca_feat_eigval_tensor, pca_feat_eigvec_tensor = tf.linalg.eigh(
tensor=pca_feat_cov_var)
if params["k_principal_components_feat"] is not None:
pca_feat_k_pc = set_k_principal_components(
params["k_principal_components_feat"], pca_feat_k_pc_var)
else:
# No user-defined k: constant placeholder, see the time based case above
pca_feat_k_pc = tf.zeros(shape=(), dtype=tf.float64)
# Lastly use control dependencies around loss to enforce the PCA
# variables to be assigned, the control order matters, hence the separate
# contexts
with tf.control_dependencies(
control_inputs=[pca_time_cov_var, pca_feat_cov_var]):
with tf.control_dependencies(
control_inputs=[pca_time_mean_var, pca_feat_mean_var]):
with tf.control_dependencies(
control_inputs=[pca_time_count_var, pca_feat_count_var]):
with tf.control_dependencies(
control_inputs=[tf.assign(ref=pca_time_eigval_var,
value=pca_time_eigval_tensor),
tf.assign(ref=pca_time_eigvec_var,
value=pca_time_eigvec_tensor),
tf.assign(ref=pca_feat_eigval_var,
value=pca_feat_eigval_tensor),
tf.assign(ref=pca_feat_eigvec_var,
value=pca_feat_eigvec_tensor),
pca_time_k_pc,
pca_feat_k_pc]):
# The loss is always zero; multiplying by dummy_var ties it to a variable
loss = tf.reduce_sum(
input_tensor=tf.zeros(
shape=(), dtype=tf.float64) * dummy_var)
train_op = tf.contrib.layers.optimize_loss(
loss=loss,
global_step=tf.train.get_global_step(),
learning_rate=params["learning_rate"],
optimizer="SGD")
return loss, train_op, None, None, None, None
else:
# Autotuning: only the number of principal components is written here
# Time based
if params["k_principal_components_time"] is None:
# shape = (cur_batch_size * seq_len, num_feat)
X_time_cen = X_time - pca_time_mean_var
# One reconstruction MSE per candidate k in 1, ..., num_feat - 1
# shape = (num_feat - 1,)
X_time_recon_mse = tf.map_fn(
fn=lambda x: pca_reconstruction_k_pc_mse(
X_time_cen, pca_time_eigvec_var, x),
elems=tf.range(start=1,
limit=params["num_feat"],
dtype=tf.int64),
dtype=tf.float64)
pca_time_k_pc = find_best_k_principal_components(
X_time_recon_mse, pca_time_k_pc_var)
else:
pca_time_k_pc = set_k_principal_components(
params["k_principal_components_time"], pca_time_k_pc_var)
if params["k_principal_components_feat"] is None:
# Features based
# shape = (cur_batch_size * num_feat, seq_len)
X_feat_cen = X_feat - pca_feat_mean_var
# One reconstruction MSE per candidate k in 1, ..., seq_len - 1
# shape = (seq_len - 1,)
X_feat_recon_mse = tf.map_fn(
fn=lambda x: pca_reconstruction_k_pc_mse(
X_feat_cen, pca_feat_eigvec_var, x),
elems=tf.range(start=1,
limit=params["seq_len"],
dtype=tf.int64),
dtype=tf.float64)
pca_feat_k_pc = find_best_k_principal_components(
X_feat_recon_mse, pca_feat_k_pc_var)
else:
pca_feat_k_pc = set_k_principal_components(
params["k_principal_components_feat"], pca_feat_k_pc_var)
# Force both k variables to be written before the loss is evaluated
with tf.control_dependencies(
control_inputs=[pca_time_k_pc, pca_feat_k_pc]):
# The loss is always zero; multiplying by dummy_var ties it to a variable
loss = tf.reduce_sum(
input_tensor=tf.zeros(
shape=(), dtype=tf.float64) * dummy_var)
train_op = tf.contrib.layers.optimize_loss(
loss=loss,
global_step=tf.train.get_global_step(),
learning_rate=params["learning_rate"],
optimizer="SGD")
return loss, train_op, None, None, None, None
else:
# Any other mode or training mode: reconstruct with the stored PCA variables
# Time based
# shape = (cur_batch_size * seq_len, num_feat)
X_time_cen = X_time - pca_time_mean_var
# shape = (cur_batch_size * seq_len, num_feat)
# Use the stored k unless the user supplied one
if params["k_principal_components_time"] is None:
X_time_recon = pca_reconstruction_k_pc(
X_time_cen,
pca_time_eigvec_var,
pca_time_k_pc_var)
else:
X_time_recon = pca_reconstruction_k_pc(
X_time_cen,
pca_time_eigvec_var,
params["k_principal_components_time"])
# Features based
# shape = (cur_batch_size * num_feat, seq_len)
X_feat_cen = X_feat - pca_feat_mean_var
# shape = (cur_batch_size * num_feat, seq_len)
# Use the stored k unless the user supplied one
if params["k_principal_components_feat"] is None:
X_feat_recon = pca_reconstruction_k_pc(
X_feat_cen,
pca_feat_eigvec_var,
pca_feat_k_pc_var)
else:
X_feat_recon = pca_reconstruction_k_pc(
X_feat_cen,
pca_feat_eigvec_var,
params["k_principal_components_feat"])
return None, None, X_time_cen, X_time_recon, X_feat_cen, X_feat_recon
In [ ]:
%%writefile anomaly_detection_module/trainer/reconstruction.py
import tensorflow as tf
def reconstruction_evaluation(X_time_orig, X_time_recon, training_mode):
  """Computes the reconstruction loss and metrics for evaluation.

  Args:
    X_time_orig: Time major original features data.
    X_time_recon: Time major reconstructed features data.
    training_mode: Current training mode.

  Returns:
    loss: Scalar mean squared error between original and reconstructed data.
    eval_metric_ops: Dictionary with "rmse" and "mae" metrics when
      training_mode is "reconstruction", otherwise None.
  """
  reconstruction_loss = tf.losses.mean_squared_error(
      labels=X_time_orig, predictions=X_time_recon)

  # Metrics are only built for the "reconstruction" training mode.
  if training_mode != "reconstruction":
    return reconstruction_loss, None

  rmse_metric = tf.metrics.root_mean_squared_error(
      labels=X_time_orig, predictions=X_time_recon)
  mae_metric = tf.metrics.mean_absolute_error(
      labels=X_time_orig, predictions=X_time_recon)
  return reconstruction_loss, {"rmse": rmse_metric, "mae": mae_metric}
In [ ]:
%%writefile anomaly_detection_module/trainer/error_distribution_vars.py
import tensorflow as tf
def create_mahalanobis_dist_vars(var_name, size):
  """Creates the variables backing the mahalanobis distance computation.

  All four variables live in the "mahalanobis_dist_vars" variable scope, start
  out as zeros, and are not trainable.

  Args:
    var_name: String denoting which set of variables to create. Values are
      "time" and "feat".
    size: The size of the variable, either sequence length or number of
      features.

  Returns:
    Tuple of (count, mean, covariance, inverse covariance) variables with
    shapes (), (size,), (size, size), and (size, size).
  """
  def _zeros_variable(name_template, shape, dtype):
    # Non-trainable variable of the given shape and dtype, filled with zeros.
    return tf.get_variable(
        name=name_template.format(var_name),
        dtype=dtype,
        initializer=tf.zeros(shape=shape, dtype=dtype),
        trainable=False)

  with tf.variable_scope(
      name_or_scope="mahalanobis_dist_vars", reuse=tf.AUTO_REUSE):
    count_var = _zeros_variable("abs_err_count_{0}_var", [], tf.int64)
    mean_var = _zeros_variable("abs_err_mean_{0}_var", [size], tf.float64)
    cov_var = _zeros_variable(
        "abs_err_cov_{0}_var", [size, size], tf.float64)
    inv_cov_var = _zeros_variable(
        "abs_err_inv_cov_{0}_var", [size, size], tf.float64)

  return count_var, mean_var, cov_var, inv_cov_var
def create_both_mahalanobis_dist_vars(seq_len, num_feat):
  """Creates time major and feature major mahalanobis distance variables.

  Args:
    seq_len: Number of timesteps in sequence.
    num_feat: Number of features.

  Returns:
    Flat 8-tuple: the count, mean, covariance, and inverse covariance
    variables of the time major representation followed by the same four
    variables of the feature major representation.
  """
  # Time major statistics are taken across features, hence size=num_feat.
  time_major_vars = create_mahalanobis_dist_vars(
      var_name="time", size=num_feat)

  # Feature major statistics are taken across timesteps, hence size=seq_len.
  feat_major_vars = create_mahalanobis_dist_vars(
      var_name="feat", size=seq_len)

  # Concatenating the two 4-tuples keeps the time-then-feature ordering.
  return time_major_vars + feat_major_vars
In [ ]:
%%writefile anomaly_detection_module/trainer/calculate_error_distribution_statistics.py
import tensorflow as tf
# Running covariance updating functions for mahalanobis distance variables
def update_record_count(count_a, count_b):
  """Adds the current batch size to the running number of records.

  Args:
    count_a: tf.int64 scalar tensor of previous running total of records.
    count_b: tf.int64 scalar tensor of current batch size.

  Returns:
    A tf.int64 scalar tensor of new running total of records.
  """
  new_running_total = count_a + count_b
  return new_running_total
# Incremental covariance updating functions for mahalanobis distance variables
def update_mean_incremental(count_a, mean_a, value_b):
  """Folds a single example into the running column means.

  Args:
    count_a: tf.int64 scalar tensor of previous running total of records.
    mean_a: tf.float64 vector tensor of previous running column means.
    value_b: tf.float64 tensor of single example's column values; its
      leading axis is squeezed away before it is added.

  Returns:
    A tf.float64 vector tensor of new running column means.
  """
  # Undo the averaging to recover the running column sums.
  running_sum = mean_a * tf.cast(x=count_a, dtype=tf.float64)

  # Drop the leading axis so the example lines up with the running sums.
  example_values = tf.squeeze(input=value_b, axis=0)

  # Exactly one more record is included after this update.
  updated_count = tf.cast(x=count_a + 1, dtype=tf.float64)
  return (running_sum + example_values) / updated_count
# This function updates the covariance matrix incrementally
def update_cov_incremental(
    count_a, mean_a, cov_a, value_b, mean_ab, sample_cov):
  """Updates the running covariance matrix incrementally.

  Given previous running total, running column means, running covariance
  matrix, single example's column values, new running column means, and
  whether to use sample covariance or not, return new running covariance
  matrix.

  With sample covariance the very first record (count_a == 0) used to divide
  zero by zero and produced a NaN matrix. The weight and the divisor are now
  clamped, so that case yields the zero-weighted update term instead; results
  for count_a >= 1 are unchanged.

  Args:
    count_a: tf.int64 scalar tensor of previous running total of records.
    mean_a: tf.float64 vector tensor of previous running column means.
    cov_a: tf.float64 matrix tensor of previous running covariance matrix.
    value_b: tf.float64 tensor of single example's column values with a
      leading axis of size one, so the transposed matmul below produces a
      square matrix.
    mean_ab: tf.float64 vector tensor of new running column means.
    sample_cov: Bool flag on whether sample or population covariance is used.

  Returns:
    A tf.float64 matrix tensor of new covariance matrix.
  """
  # Update term: (x - old_mean)^T (x - new_mean)
  mean_diff = tf.matmul(
      a=value_b - mean_a, b=value_b - mean_ab, transpose_a=True)

  count_float = tf.cast(x=count_a, dtype=tf.float64)
  if sample_cov:
    # Sample covariance over n records carries weight n - 1. Clamp the weight
    # at zero and the divisor at one so count_a == 0 cannot produce 0 / 0.
    prev_weight = tf.maximum(x=count_float - 1.0, y=0.0)
    new_weight = tf.maximum(x=count_float, y=1.0)
  else:
    # Population covariance over n records carries weight n.
    prev_weight = count_float
    new_weight = count_float + 1.0

  # Un-normalize the old covariance, add the update term, re-normalize.
  cov_ab = (cov_a * prev_weight + mean_diff) / new_weight
  return cov_ab
def singleton_batch_cov_variable_updating(
    inner_size, X, count_variable, mean_variable, cov_variable):
  """Updates count, mean, and covariance variables from a single row.

  Args:
    inner_size: Python integer; when it equals 1 the covariance is set to
      zeros instead of being updated.
    X: tf.float64 matrix tensor of input data holding a single row.
    count_variable: tf.int64 scalar variable tracking running record counts.
    mean_variable: tf.float64 vector variable tracking running column means.
    cov_variable: tf.float64 matrix variable tracking running covariance
      matrix.

  Returns:
    Tuple of tensors (covariance, mean, count) read from the variables after
    all three assignments have run.
  """
  # The new mean feeds both the mean and the covariance update.
  # time_shape = (num_feat,), features_shape = (seq_len,)
  new_mean = update_mean_incremental(
      count_a=count_variable, mean_a=mean_variable, value_b=X)

  # Exactly one record is added.
  # time_shape = (), features_shape = ()
  new_count = update_record_count(count_a=count_variable, count_b=1)

  if inner_size != 1:
    # time_shape = (num_feat, num_feat)
    # features_shape = (seq_len, seq_len)
    new_cov = update_cov_incremental(
        count_a=count_variable,
        mean_a=mean_variable,
        cov_a=cov_variable,
        value_b=X,
        mean_ab=new_mean,
        sample_cov=True)
  else:
    new_cov = tf.zeros_like(tensor=cov_variable, dtype=tf.float64)

  # Each assign op is created inside the previous one's dependency context,
  # which fixes the write order to covariance, then mean, then count.
  assign_cov_op = tf.assign(ref=cov_variable, value=new_cov)
  with tf.control_dependencies(control_inputs=[assign_cov_op]):
    assign_mean_op = tf.assign(ref=mean_variable, value=new_mean)
    with tf.control_dependencies(control_inputs=[assign_mean_op]):
      assign_count_op = tf.assign(ref=count_variable, value=new_count)
      with tf.control_dependencies(control_inputs=[assign_count_op]):
        return (tf.identity(input=cov_variable),
                tf.identity(input=mean_variable),
                tf.identity(input=count_variable))
def singleton_batch_var_variable_updating(
    inner_size, x, count_variable, mean_variable, var_variable):
  """Updates count, mean, and variance variables from a single value.

  Args:
    inner_size: Python integer; when it equals 1 the variance is set to zero
      instead of being updated.
    x: tf.float64 tensor of input data holding a single value.
      NOTE(review): update_mean_incremental squeezes axis 0 of this tensor,
      which suggests shape (1,) rather than a true scalar -- confirm.
    count_variable: tf.int64 scalar variable tracking running record counts.
    mean_variable: tf.float64 scalar variable tracking running mean.
    var_variable: tf.float64 scalar variable tracking running variance.

  Returns:
    Tuple of tensors (variance, mean, count) read from the variables after
    all three assignments have run.
  """
  # The new mean feeds both the mean and the variance update.
  new_mean = update_mean_incremental(
      count_a=count_variable, mean_a=mean_variable, value_b=x)

  # Exactly one record is added.
  new_count = update_record_count(count_a=count_variable, count_b=1)

  if inner_size != 1:
    # Reuse the covariance update by treating the scalars as 1-vectors and
    # 1x1 matrices, then squeeze the 1x1 result.
    var_as_matrix = update_cov_incremental(
        count_a=count_variable,
        mean_a=tf.reshape(tensor=mean_variable, shape=[1]),
        cov_a=tf.reshape(tensor=var_variable, shape=[1, 1]),
        value_b=tf.reshape(tensor=x, shape=[1, 1]),
        mean_ab=tf.reshape(tensor=new_mean, shape=[1]),
        sample_cov=True)
    new_var = tf.squeeze(input=var_as_matrix)
  else:
    new_var = tf.zeros_like(tensor=var_variable, dtype=tf.float64)

  # Each assign op is created inside the previous one's dependency context,
  # which fixes the write order to variance, then mean, then count.
  assign_var_op = tf.assign(ref=var_variable, value=new_var)
  with tf.control_dependencies(control_inputs=[assign_var_op]):
    assign_mean_op = tf.assign(ref=mean_variable, value=new_mean)
    with tf.control_dependencies(control_inputs=[assign_mean_op]):
      assign_count_op = tf.assign(ref=count_variable, value=new_count)
      with tf.control_dependencies(control_inputs=[assign_count_op]):
        return (tf.identity(input=var_variable),
                tf.identity(input=mean_variable),
                tf.identity(input=count_variable))
# Batch covariance updating functions for mahalanobis distance variables
def update_mean_batch(count_a, mean_a, count_b, mean_b):
  """Merges a batch's column means into the running column means.

  The result is the count-weighted average of the two mean vectors.

  Args:
    count_a: tf.int64 scalar tensor of previous running total of records.
    mean_a: tf.float64 vector tensor of previous running column means.
    count_b: tf.int64 scalar tensor of current batch size.
    mean_b: tf.float64 vector tensor of batch's column means.

  Returns:
    A tf.float64 vector tensor of new running column means.
  """
  # Recover the column sums behind each mean.
  running_sum = mean_a * tf.cast(x=count_a, dtype=tf.float64)
  batch_sum = mean_b * tf.cast(x=count_b, dtype=tf.float64)

  combined_count = tf.cast(x=count_a + count_b, dtype=tf.float64)
  return (running_sum + batch_sum) / combined_count
def update_cov_batch(
    count_a, mean_a, cov_a, count_b, mean_b, cov_b, sample_cov):
  """Merges a batch's covariance matrix into the running covariance matrix.

  Both covariances are un-normalized, a correction term built from the
  difference of the two means is added, and the sum is re-normalized by the
  combined count.

  Args:
    count_a: tf.int64 scalar tensor of previous running total of records.
    mean_a: tf.float64 vector tensor of previous running column means.
    cov_a: tf.float64 matrix tensor of previous running covariance matrix.
    count_b: tf.int64 scalar tensor of current batch size.
    mean_b: tf.float64 vector tensor of batch's column means.
    cov_b: tf.float64 matrix tensor of batch's covariance matrix.
    sample_cov: Bool flag on whether sample or population covariance is used.

  Returns:
    A tf.float64 matrix tensor of new running covariance matrix.
  """
  # Sample covariance uses n - 1 weights, population covariance uses n.
  if sample_cov:
    weight_a = count_a - 1
    weight_b = count_b - 1
    combined_weight = count_a + count_b - 1
  else:
    weight_a = count_a
    weight_b = count_b
    combined_weight = count_a + count_b

  unnormalized_a = cov_a * tf.cast(x=weight_a, dtype=tf.float64)
  unnormalized_b = cov_b * tf.cast(x=weight_b, dtype=tf.float64)
  denominator = tf.cast(x=combined_weight, dtype=tf.float64)

  # Outer product of the difference between the two mean vectors.
  mean_gap = tf.expand_dims(input=mean_a - mean_b, axis=0)
  mean_gap_outer = tf.matmul(a=mean_gap, b=mean_gap, transpose_a=True)

  # The correction term is scaled by count_a * count_b / (count_a + count_b).
  gap_scale = (tf.cast(x=count_a * count_b, dtype=tf.float64) /
               tf.cast(x=count_a + count_b, dtype=tf.float64))

  return (unnormalized_a + unnormalized_b +
          mean_gap_outer * gap_scale) / denominator
def non_singleton_batch_cov_variable_updating(
    cur_batch_size, inner_size, X, count_variable, mean_variable, cov_variable):
  """Updates mahalanobis variables when number_of_rows does NOT equal 1.

  Given the current batch size, inner size of the matrix, the data matrix X,
  the variable tracking running record counts, the variable tracking running
  column means, and the variable tracking running covariance matrix, returns
  updated running covariance matrix, running column means, and running record
  count variables.

  The batch covariance is computed in the same branch that consumes it.
  Previously it was bound only when inner_size > 1 but read whenever
  inner_size != 1, so an inner_size below 1 failed with an unbound local.

  Args:
    cur_batch_size: Number of examples in current batch (could be partial).
    inner_size: Python integer multiplied with cur_batch_size to obtain the
      number of rows of X; when it equals 1 the covariance is set to zeros
      instead of being updated.
    X: tf.float64 matrix tensor of input data.
    count_variable: tf.int64 scalar variable tracking running record counts.
    mean_variable: tf.float64 vector variable tracking running column means.
    cov_variable: tf.float64 matrix variable tracking running covariance
      matrix.

  Returns:
    Tuple of tensors (covariance, mean, count) read from the variables after
    all three assignments have run.
  """
  # Find statistics of batch
  number_of_rows = cur_batch_size * inner_size

  # time_shape = (num_feat,), features_shape = (seq_len,)
  X_mean = tf.reduce_mean(input_tensor=X, axis=0)

  # Update running variables from batch statistics
  # time_shape = (), features_shape = ()
  count_tensor = update_record_count(
      count_a=count_variable, count_b=number_of_rows)

  # time_shape = (num_feat,), features_shape = (seq_len,)
  mean_tensor = update_mean_batch(
      count_a=count_variable,
      mean_a=mean_variable,
      count_b=number_of_rows,
      mean_b=X_mean)

  # Check if inner dimension is greater than 1 to calculate covariance matrix
  if inner_size == 1:
    cov_tensor = tf.zeros_like(tensor=cov_variable, dtype=tf.float64)
  else:
    # time_shape = (cur_batch_size * seq_len, num_feat)
    # features_shape = (cur_batch_size * num_feat, seq_len)
    X_centered = X - X_mean

    # Sample covariance of the batch, hence the number_of_rows - 1 divisor
    # time_shape = (num_feat, num_feat)
    # features_shape = (seq_len, seq_len)
    X_cov = tf.matmul(
        a=X_centered,
        b=X_centered,
        transpose_a=True) / tf.cast(x=number_of_rows - 1, dtype=tf.float64)

    # time_shape = (num_feat, num_feat)
    # features_shape = (seq_len, seq_len)
    cov_tensor = update_cov_batch(
        count_a=count_variable,
        mean_a=mean_variable,
        cov_a=cov_variable,
        count_b=number_of_rows,
        mean_b=X_mean,
        cov_b=X_cov,
        sample_cov=True)

  # Assign values to variables, use control dependencies around return to
  # enforce the mahalanobis variables to be assigned, the control order
  # matters, hence the separate contexts.
  with tf.control_dependencies(
      control_inputs=[tf.assign(ref=cov_variable, value=cov_tensor)]):
    with tf.control_dependencies(
        control_inputs=[tf.assign(ref=mean_variable, value=mean_tensor)]):
      with tf.control_dependencies(
          control_inputs=[tf.assign(ref=count_variable, value=count_tensor)]):
        return (tf.identity(input=cov_variable),
                tf.identity(input=mean_variable),
                tf.identity(input=count_variable))
def non_singleton_batch_var_variable_updating(
    cur_batch_size, inner_size, x, count_variable, mean_variable, var_variable):
  """Updates mahalanobis thresh variables when number_of_rows does NOT equal 1.

  Given the current batch size, inner size of the matrix, the data vector x,
  the variable tracking the running record count, the variable tracking the
  running mean, and the variable tracking the running variance, returns
  updated running variance, running mean, and running record count variables.

  The batch variance is computed in the same branch that consumes it.
  Previously it was bound only when inner_size > 1 but read whenever
  inner_size != 1, so an inner_size below 1 failed with an unbound local.

  Args:
    cur_batch_size: Number of examples in current batch (could be partial).
    inner_size: Python integer multiplied with cur_batch_size to obtain the
      number of entries of x; when it equals 1 the variance is set to zero
      instead of being updated.
    x: tf.float64 vector tensor of mahalanobis distance.
    count_variable: tf.int64 scalar variable tracking running record count.
    mean_variable: tf.float64 scalar variable tracking running mean.
    var_variable: tf.float64 scalar variable tracking running variance.

  Returns:
    Tuple of tensors (variance, mean, count) read from the variables after
    all three assignments have run.
  """
  # Find statistics of batch
  number_of_rows = cur_batch_size * inner_size

  # time_shape = (), features_shape = ()
  x_mean = tf.reduce_mean(input_tensor=x)

  # Update running variables from batch statistics
  # time_shape = (), features_shape = ()
  count_tensor = update_record_count(
      count_a=count_variable, count_b=number_of_rows)

  # time_shape = (), features_shape = ()
  mean_tensor = update_mean_batch(
      count_a=count_variable,
      mean_a=mean_variable,
      count_b=number_of_rows,
      mean_b=x_mean)

  # Check if inner dimension is greater than 1 to calculate variance
  if inner_size == 1:
    var_tensor = tf.zeros_like(tensor=var_variable, dtype=tf.float64)
  else:
    # time_shape = (cur_batch_size * seq_len,)
    # features_shape = (cur_batch_size * num_feat,)
    x_centered = x - x_mean

    # Sample variance of the batch, hence the number_of_rows - 1 divisor
    # time_shape = (), features_shape = ()
    x_var = tf.reduce_sum(input_tensor=tf.square(x=x_centered))
    x_var /= tf.cast(x=number_of_rows - 1, dtype=tf.float64)

    # Reuse the covariance update by passing the batch mean as a 1-vector and
    # the batch variance as a 1x1 matrix; the result is squeezed back to a
    # scalar.
    var_tensor = update_cov_batch(
        count_a=count_variable,
        mean_a=mean_variable,
        cov_a=var_variable,
        count_b=number_of_rows,
        mean_b=tf.expand_dims(input=x_mean, axis=0),
        cov_b=tf.reshape(tensor=x_var, shape=[1, 1]),
        sample_cov=True)
    var_tensor = tf.squeeze(input=var_tensor)

  # Assign values to variables, use control dependencies around return to
  # enforce the mahalanobis thresh variables to be assigned, the control order
  # matters, hence the separate contexts.
  with tf.control_dependencies(
      control_inputs=[tf.assign(ref=var_variable, value=var_tensor)]):
    with tf.control_dependencies(
        control_inputs=[tf.assign(ref=mean_variable, value=mean_tensor)]):
      with tf.control_dependencies(
          control_inputs=[tf.assign(ref=count_variable, value=count_tensor)]):
        return (tf.identity(input=var_variable),
                tf.identity(input=mean_variable),
                tf.identity(input=count_variable))
def mahalanobis_dist(err_vec, mean_vec, inv_cov, final_shape):
  """Calculates mahalanobis distance from MLE.

  Given reconstruction error matrix, mean reconstruction error vector, inverse
  covariance of reconstruction error, and mahalanobis distance tensor's final
  shape, return mahalanobis distance.

  For every row e of the centered error matrix this evaluates
  sqrt(e * inv_cov * e^T). The quadratic form is computed row by row rather
  than as the diagonal of a (rows, rows) matrix product, so memory and compute
  grow linearly instead of quadratically with the number of rows.

  Args:
    err_vec: tf.float64 matrix tensor of reconstruction errors.
    mean_vec: tf.float64 vector variable tracking running column means of
      reconstruction errors.
    inv_cov: tf.float64 matrix variable tracking running inverse covariance
      matrix of reconstruction errors.
    final_shape: Size of the last dimension of the returned tensor.

  Returns:
    tf.float64 matrix tensor of mahalanobis distance with shape
    (-1, final_shape).
  """
  # time_shape = (cur_batch_size * seq_len, num_feat)
  # features_shape = (cur_batch_size * num_feat, seq_len)
  err_vec_cen = err_vec - mean_vec

  # Row i holds (inv_cov * e_i)^T, i.e. the transpose of the original
  # right-hand product inv_cov * err_vec_cen^T.
  # time_shape = (cur_batch_size * seq_len, num_feat)
  # features_shape = (cur_batch_size * num_feat, seq_len)
  err_times_inv_cov = tf.matmul(
      a=err_vec_cen, b=inv_cov, transpose_b=True)

  # Row-wise dot product gives e_i * inv_cov * e_i^T, which is exactly the
  # diagonal of err_vec_cen * inv_cov * err_vec_cen^T.
  # time_shape = (cur_batch_size * seq_len,)
  # features_shape = (cur_batch_size * num_feat,)
  mahalanobis_dist_flat = tf.reduce_sum(
      input_tensor=err_vec_cen * err_times_inv_cov, axis=1)

  # time_shape = (cur_batch_size, seq_len)
  # features_shape = (cur_batch_size, num_feat)
  mahalanobis_dist_final_shaped = tf.reshape(
      tensor=mahalanobis_dist_flat, shape=[-1, final_shape])

  # time_shape = (cur_batch_size, seq_len)
  # features_shape = (cur_batch_size, num_feat)
  mahalanobis_dist_final_shaped_sqrt = tf.sqrt(x=mahalanobis_dist_final_shaped)

  return mahalanobis_dist_final_shaped_sqrt
def calculate_error_distribution_statistics_training(
cur_batch_size,
X_time_abs_recon_err,
abs_err_count_time_var,
abs_err_mean_time_var,
abs_err_cov_time_var,
abs_err_inv_cov_time_var,
X_feat_abs_recon_err,
abs_err_count_feat_var,
abs_err_mean_feat_var,
abs_err_cov_feat_var,
abs_err_inv_cov_feat_var,
params,
dummy_var):
"""Calculates error distribution statistics during training mode.

Updates the running count, mean, and covariance variables of the absolute
reconstruction error for both the time major and feature major layouts,
stores the inverse of each regularized covariance matrix, and returns a
zero-valued loss and a train op that depend on all of those updates.

Args:
cur_batch_size: Current batch size, could be partially filled.
X_time_abs_recon_err: Time major reconstructed input data's absolute
reconstruction error.
abs_err_count_time_var: Time major running count of number of records.
abs_err_mean_time_var: Time major running column means of absolute error.
abs_err_cov_time_var: Time major running covariance matrix of absolute
error.
abs_err_inv_cov_time_var: Time major running inverse covariance matrix of
absolute error.
X_feat_abs_recon_err: Feature major reconstructed input data's absolute
reconstruction error.
abs_err_count_feat_var: Feature major running count of number of records.
abs_err_mean_feat_var: Feature major running column means of absolute error.
abs_err_cov_feat_var: Feature major running covariance matrix of absolute
error.
abs_err_inv_cov_feat_var: Feature major running inverse covariance matrix of
absolute error.
params: Dictionary of parameters. Keys read here: "seq_len", "num_feat",
"eps" and "learning_rate".
dummy_var: Dummy variable used to allow training mode to happen since it
requires a gradient to tie back to the graph dependency.

Returns:
loss: The scalar loss to tie our updates back to Estimator graph; always
zero times dummy_var.
train_op: The train operation to tie our updates back to Estimator graph.
"""
with tf.variable_scope(
name_or_scope="mahalanobis_dist_vars", reuse=tf.AUTO_REUSE):
# Time based
# A batch with exactly one row takes the incremental update path
singleton_time_condition = tf.equal(
x=cur_batch_size * params["seq_len"], y=1)
cov_time_var, mean_time_var, count_time_var = tf.cond(
pred=singleton_time_condition,
true_fn=lambda: singleton_batch_cov_variable_updating(
params["seq_len"],
X_time_abs_recon_err,
abs_err_count_time_var,
abs_err_mean_time_var,
abs_err_cov_time_var),
false_fn=lambda: non_singleton_batch_cov_variable_updating(
cur_batch_size,
params["seq_len"],
X_time_abs_recon_err,
abs_err_count_time_var,
abs_err_mean_time_var,
abs_err_cov_time_var))
# Features based
# A batch with exactly one row takes the incremental update path
singleton_feat_condition = tf.equal(
x=cur_batch_size * params["num_feat"], y=1)
cov_feat_var, mean_feat_var, count_feat_var = tf.cond(
pred=singleton_feat_condition,
true_fn=lambda: singleton_batch_cov_variable_updating(
params["num_feat"],
X_feat_abs_recon_err,
abs_err_count_feat_var,
abs_err_mean_feat_var,
abs_err_cov_feat_var),
false_fn=lambda: non_singleton_batch_cov_variable_updating(
cur_batch_size,
params["num_feat"],
X_feat_abs_recon_err,
abs_err_count_feat_var,
abs_err_mean_feat_var,
abs_err_cov_feat_var))
# Lastly use control dependencies around loss to enforce the mahalanobis
# variables to be assigned, the control order matters, hence the separate
# contexts
with tf.control_dependencies(
control_inputs=[cov_time_var, cov_feat_var]):
with tf.control_dependencies(
control_inputs=[mean_time_var, mean_feat_var]):
with tf.control_dependencies(
control_inputs=[count_time_var, count_feat_var]):
# Time based
# params["eps"] times the identity is added to the covariance before
# inverting
# shape = (num_feat, num_feat)
abs_err_inv_cov_time_tensor = \
tf.matrix_inverse(input=cov_time_var + \
tf.eye(num_rows=tf.shape(input=cov_time_var)[0],
dtype=tf.float64) * params["eps"])
# Features based
# Same regularization as for the time based inverse
# shape = (seq_len, seq_len)
abs_err_inv_cov_feat_tensor = \
tf.matrix_inverse(input=cov_feat_var + \
tf.eye(num_rows=tf.shape(input=cov_feat_var)[0],
dtype=tf.float64) * params["eps"])
# Store both inverse covariance matrices before the loss is evaluated
with tf.control_dependencies(
control_inputs=[tf.assign(ref=abs_err_inv_cov_time_var,
value=abs_err_inv_cov_time_tensor),
tf.assign(ref=abs_err_inv_cov_feat_var,
value=abs_err_inv_cov_feat_tensor)]):
# The loss is always zero; multiplying by dummy_var ties it to a variable
loss = tf.reduce_sum(
input_tensor=tf.zeros(shape=(), dtype=tf.float64) * dummy_var)
train_op = tf.contrib.layers.optimize_loss(
loss=loss,
global_step=tf.train.get_global_step(),
learning_rate=params["learning_rate"],
optimizer="SGD")
return loss, train_op
In [ ]:
%%writefile anomaly_detection_module/trainer/tune_anomaly_threshold_vars.py
import tensorflow as tf
def create_confusion_matrix_thresh_vars(scope, var_name, size):
  """Builds the four confusion matrix counter variables for one orientation.

  Inside the given variable scope (opened with AUTO_REUSE), gets or creates
  one non-trainable tf.int64 variable per confusion matrix cell, each
  initialized to zeros of the requested size.

  Args:
    scope: String of variable scope name.
    var_name: String inserted into every variable name. Values are "time"
      and "feat".
    size: Shape of each variable, i.e. the number of time/feature thresholds.

  Returns:
    Tuple of the true positive, false negative, false positive, and true
    negative variables, in that order.
  """
  # One name template per confusion matrix cell, in creation/return order.
  name_templates = (
      "tp_thresh_{0}_var",
      "fn_thresh_{0}_var",
      "fp_thresh_{0}_var",
      "tn_thresh_{0}_var")

  def _zero_counter(name_template):
    # Non-trainable int64 counter that starts at zero.
    return tf.get_variable(
        name=name_template.format(var_name),
        dtype=tf.int64,
        initializer=tf.zeros(shape=size, dtype=tf.int64),
        trainable=False)

  with tf.variable_scope(name_or_scope=scope, reuse=tf.AUTO_REUSE):
    tp_counter, fn_counter, fp_counter, tn_counter = [
        _zero_counter(template) for template in name_templates]

  return (tp_counter, fn_counter, fp_counter, tn_counter)
def create_both_confusion_matrix_thresh_vars(
    scope, time_thresh_size, feat_thresh_size):
  """Builds confusion matrix counters for both time and feature major.

  Calls create_confusion_matrix_thresh_vars twice within the same scope,
  first for the "time" variables and then for the "feat" variables.

  Args:
    scope: String of variable scope name.
    time_thresh_size: Variable size of number of time major thresholds.
    feat_thresh_size: Variable size of number of feature major thresholds.

  Returns:
    Tuple of eight variables: true positives, false negatives, false
    positives, and true negatives for time major, followed by the same four
    for feature major.
  """
  # Time based variables are created first so creation order is unchanged.
  time_counters = create_confusion_matrix_thresh_vars(
      scope=scope, var_name="time", size=time_thresh_size)

  # Features based
  feat_counters = create_confusion_matrix_thresh_vars(
      scope=scope, var_name="feat", size=feat_thresh_size)

  return tuple(time_counters) + tuple(feat_counters)
def create_mahalanobis_unsupervised_thresh_vars(scope, var_name):
  """Builds the running statistic variables for unsupervised thresholds.

  Inside the given variable scope (opened with AUTO_REUSE), gets or creates
  three non-trainable scalar variables initialized to zero: a tf.int64 count
  and tf.float64 mean and variance.

  Args:
    scope: String of variable scope name.
    var_name: String inserted into every variable name. Values are "time"
      and "feat".

  Returns:
    Tuple of the count, mean, and variance variables, in that order.
  """
  def _scalar_zero_var(name_template, dtype):
    # Non-trainable scalar of the given dtype that starts at zero.
    return tf.get_variable(
        name=name_template.format(var_name),
        dtype=dtype,
        initializer=tf.zeros(shape=[], dtype=dtype),
        trainable=False)

  with tf.variable_scope(name_or_scope=scope, reuse=tf.AUTO_REUSE):
    running_count = _scalar_zero_var("count_thresh_{0}_var", tf.int64)
    running_mean = _scalar_zero_var("mean_thresh_{0}_var", tf.float64)
    running_variance = _scalar_zero_var("var_thresh_{0}_var", tf.float64)

  return (running_count, running_mean, running_variance)
def create_both_mahalanobis_unsupervised_thresh_vars(scope):
  """Builds running statistic variables for both time and feature major.

  Calls create_mahalanobis_unsupervised_thresh_vars twice within the same
  scope, first for the "time" variables and then for the "feat" variables.

  Args:
    scope: String of variable scope name.

  Returns:
    Tuple of six variables: count, mean, and variance for time major,
    followed by count, mean, and variance for feature major.
  """
  # Time based variables are created first so creation order is unchanged.
  time_stats = create_mahalanobis_unsupervised_thresh_vars(
      scope=scope, var_name="time")

  # Features based
  feat_stats = create_mahalanobis_unsupervised_thresh_vars(
      scope=scope, var_name="feat")

  return tuple(time_stats) + tuple(feat_stats)
In [ ]:
%%writefile anomaly_detection_module/trainer/tune_anomaly_thresholds_supervised.py
import tensorflow as tf
def calculate_threshold_confusion_matrix(labels_mask, preds, num_thresh):
  """Counts one confusion matrix cell for every threshold at once.

  For each of the first num_thresh rows of preds, counts how many examples
  are set in both the labels mask and that row of predictions.

  Args:
    labels_mask: tf.bool vector tensor selecting the examples whose label
      belongs to the cell being counted (normal or anomalous).
    preds: tf.bool matrix tensor of predicted flags with one row per
      threshold and one column per example.
    num_thresh: Number of anomaly thresholds to try in parallel grid search.

  Returns:
    tf.int64 vector tensor holding the count for each threshold.
  """
  # tf.logical_and broadcasts, so the label mask is matched against every
  # threshold row in a single op instead of looping over row indices with
  # tf.map_fn. Only the first num_thresh rows take part, as before.
  # shape = (num_thresh, cur_batch_size), assuming labels_mask is a vector
  # with one element per example.
  cell_hits = tf.logical_and(x=labels_mask, y=preds[:num_thresh])

  # shape = (num_thresh,)
  count = tf.reduce_sum(
      input_tensor=tf.cast(x=cell_hits, dtype=tf.int64), axis=1)

  return count
def update_anom_thresh_vars(
    labels_norm_mask,
    labels_anom_mask,
    num_thresh,
    anom_thresh,
    mahalanobis_dist,
    tp_at_thresh_var,
    fn_at_thresh_var,
    fp_at_thresh_var,
    tn_at_thresh_var,
    mode):
  """Accumulates the current batch into the confusion matrix variables.

  Flags a sequence as anomalous when any of its mahalanobis distances exceeds
  the threshold, counts the batch's confusion matrix cells per threshold, and
  adds those counts to the running variables.

  Args:
    labels_norm_mask: tf.bool vector tensor that is true when label was
      normal.
    labels_anom_mask: tf.bool vector tensor that is true when label was
      anomalous.
    num_thresh: Number of anomaly thresholds to try in parallel grid search.
    anom_thresh: In TRAIN mode, tf.float64 vector tensor of grid of anomaly
      thresholds to try; otherwise it is compared against the distances
      directly.
    mahalanobis_dist: tf.float64 matrix tensor of mahalanobis distances across
      batch.
    tp_at_thresh_var: tf.int64 variable tracking number of true positives at
      each possible anomaly threshold.
    fn_at_thresh_var: tf.int64 variable tracking number of false negatives at
      each possible anomaly threshold.
    fp_at_thresh_var: tf.int64 variable tracking number of false positives at
      each possible anomaly threshold.
    tn_at_thresh_var: tf.int64 variable tracking number of true negatives at
      each possible anomaly threshold.
    mode: Estimator ModeKeys, can take values of TRAIN and EVAL.

  Returns:
    Tuple of identity tensors of the true positive, false negative, false
    positive, and true negative variables, created under control dependencies
    on the four assign_add ops.
  """
  is_train = mode == tf.estimator.ModeKeys.TRAIN
  is_eval = mode == tf.estimator.ModeKeys.EVAL

  if is_train:
    # Compare the distances against every candidate threshold in the grid.
    # time_shape = (num_time_anom_thresh, cur_batch_size, seq_len)
    # feat_shape = (num_feat_anom_thresh, cur_batch_size, num_feat)
    exceeds_thresh = tf.map_fn(
        fn=lambda single_thresh: mahalanobis_dist > single_thresh,
        elems=anom_thresh,
        dtype=tf.bool)
  else:
    # A single comparison against the passed threshold.
    # time_shape = (cur_batch_size, seq_len)
    # feat_shape = (cur_batch_size, num_feat)
    exceeds_thresh = mahalanobis_dist > anom_thresh

  # A sequence is flagged if any element along its last axis is over.
  # time_shape = (num_time_anom_thresh, cur_batch_size)
  # feat_shape = (num_feat_anom_thresh, cur_batch_size)
  any_exceeds = tf.reduce_any(input_tensor=exceeds_thresh, axis=-1)

  if is_eval:
    # Add a leading threshold axis of size one so both modes index alike.
    # time_shape = (1, cur_batch_size)
    # feat_shape = (1, cur_batch_size)
    any_exceeds = tf.expand_dims(input=any_exceeds, axis=0)

  # time_shape = (num_time_anom_thresh, cur_batch_size)
  # feat_shape = (num_feat_anom_thresh, cur_batch_size)
  flagged_normal = tf.equal(x=any_exceeds, y=False)
  flagged_anomalous = tf.equal(x=any_exceeds, y=True)

  # Confusion matrix of the current batch in tp, fn, fp, tn order.
  # time_shape = (num_time_anom_thresh,)
  # feat_shape = (num_feat_anom_thresh,)
  cell_inputs = (
      (labels_anom_mask, flagged_anomalous),  # true positives
      (labels_anom_mask, flagged_normal),  # false negatives
      (labels_norm_mask, flagged_anomalous),  # false positives
      (labels_norm_mask, flagged_normal))  # true negatives
  batch_counts = [
      calculate_threshold_confusion_matrix(mask, flags, num_thresh)
      for mask, flags in cell_inputs]

  if is_eval:
    # Drop the size one threshold axis again.
    # shape = ()
    batch_counts = [tf.squeeze(input=count) for count in batch_counts]

  counters = (
      tp_at_thresh_var, fn_at_thresh_var, fp_at_thresh_var, tn_at_thresh_var)
  increments = [
      tf.assign_add(ref=counter, value=count)
      for counter, count in zip(counters, batch_counts)]

  with tf.control_dependencies(control_inputs=increments):
    return tuple([tf.identity(input=counter) for counter in counters])
def calculate_composite_classification_metrics(tp, fn, fp, tn, f_score_beta):
  """Derives accuracy, precision, recall, and f-beta from confusion counts.

  The counts are cast to tf.float64 before dividing. No denominator is
  guarded against zero, so empty counts propagate as non-finite values
  (for example 0 / 0 gives NaN).

  Args:
    tp: tf.int64 variable tracking number of true positives at
      each possible anomaly threshold.
    fn: tf.int64 variable tracking number of false negatives at
      each possible anomaly threshold.
    fp: tf.int64 variable tracking number of false positives at
      each possible anomaly threshold.
    tn: tf.int64 variable tracking number of true negatives at
      each possible anomaly threshold.
    f_score_beta: Value of beta for f-beta score.

  Returns:
    Accuracy, precision, recall, and f-beta score composite metric tensors.
  """
  def _as_float64(counts):
    return tf.cast(x=counts, dtype=tf.float64)

  # time_shape = (num_time_anom_thresh,)
  # feat_shape = (num_feat_anom_thresh,)
  num_correct = _as_float64(tp + tn)
  num_total = _as_float64(tp + fn + fp + tn)
  acc = num_correct / num_total

  true_positives = _as_float64(tp)
  pre = true_positives / _as_float64(tp + fp)
  rec = true_positives / _as_float64(tp + fn)

  # F-beta = (1 + beta^2) * pre * rec / (beta^2 * pre + rec)
  weighted_product = (1.0 + f_score_beta ** 2) * (pre * rec)
  weighted_sum = f_score_beta ** 2 * pre + rec
  f_beta_score = weighted_product / weighted_sum

  return acc, pre, rec, f_beta_score
def find_best_anom_thresh(
    anom_threshs, f_beta_score, anom_thresh_var):
  """Stores the candidate threshold with the highest f-beta score.

  Picks the entry of the threshold grid at the argmax of the f-beta scores
  and assigns it to the anomaly threshold variable.

  Args:
    anom_threshs: tf.float64 vector tensor of grid of anomaly thresholds to
      try.
    f_beta_score: tf.float64 vector tensor of f-beta scores for each anomaly
      threshold.
    anom_thresh_var: tf.float64 variable that stores anomaly threshold value.

  Returns:
    Identity tensor of the anomaly threshold variable, created under a
    control dependency on the assignment.
  """
  # Position of the top score along the threshold axis.
  top_score_index = tf.argmax(input=f_beta_score, axis=0)

  # shape = ()
  top_thresh = tf.gather(params=anom_threshs, indices=top_score_index)

  store_top_thresh = tf.assign(ref=anom_thresh_var, value=top_thresh)
  with tf.control_dependencies(control_inputs=[store_top_thresh]):
    return tf.identity(input=anom_thresh_var)
def optimize_anomaly_theshold(
    var_name,
    labels_norm_mask,
    labels_anom_mask,
    mahalanobis_dist,
    tp_thresh_var,
    fn_thresh_var,
    fp_thresh_var,
    tn_thresh_var,
    params,
    mode,
    anom_thresh_var):
  """Grid searches the anomaly threshold that maximizes the f-beta score.

  Builds an evenly spaced grid of candidate thresholds from params, updates
  the confusion matrix variables with the current batch for every candidate,
  scores each candidate with f-beta, and stores the best one in the anomaly
  threshold variable.

  Args:
    var_name: String denoting which set of variables to use. Values are
      "time" and "feat".
    labels_norm_mask: tf.bool vector mask of labels for normals.
    labels_anom_mask: tf.bool vector mask of labels for anomalies.
    mahalanobis_dist: Mahalanobis distance of reconstruction error.
    tp_thresh_var: tf.int64 variable to track number of true positives wrt
      thresholds.
    fn_thresh_var: tf.int64 variable to track number of false negatives wrt
      thresholds.
    fp_thresh_var: tf.int64 variable to track number of false positives wrt
      thresholds.
    tn_thresh_var: tf.int64 variable to track number of true negatives wrt
      thresholds.
    params: Dictionary of parameters. Reads the "min_{}_anom_thresh",
      "max_{}_anom_thresh", and "num_{}_anom_thresh" entries for var_name
      and "f_score_beta".
    mode: Estimator ModeKeys, can take values of TRAIN and EVAL.
    anom_thresh_var: tf.float64 variable that stores anomaly threshold value.

  Returns:
    Identity tensor of the anomaly threshold variable, created under control
    dependencies on the threshold update.
  """
  grid_start = tf.constant(
      value=params["min_{}_anom_thresh".format(var_name)],
      dtype=tf.float64)
  grid_stop = tf.constant(
      value=params["max_{}_anom_thresh".format(var_name)],
      dtype=tf.float64)
  grid_size = params["num_{}_anom_thresh".format(var_name)]

  # shape = (num_anom_thresh,)
  candidate_threshs = tf.linspace(
      start=grid_start, stop=grid_stop, num=grid_size)

  with tf.variable_scope(
      name_or_scope="mahalanobis_dist_thresh_vars",
      reuse=tf.AUTO_REUSE):
    confusion_updates = update_anom_thresh_vars(
        labels_norm_mask,
        labels_anom_mask,
        grid_size,
        candidate_threshs,
        mahalanobis_dist,
        tp_thresh_var,
        fn_thresh_var,
        fp_thresh_var,
        tn_thresh_var,
        mode)
    tp_updated, fn_updated, fp_updated, tn_updated = confusion_updates

  # Score the candidates only after the counters have taken in this batch.
  with tf.control_dependencies(
      control_inputs=[tp_updated, fn_updated, fp_updated, tn_updated]):
    _unused_acc, precision, recall, f_beta = (
        calculate_composite_classification_metrics(
            tp_thresh_var,
            fn_thresh_var,
            fp_thresh_var,
            tn_thresh_var,
            params["f_score_beta"]))

    # Two stacked contexts, kept separate as in the nested form.
    with tf.control_dependencies(control_inputs=[precision, recall]), \
        tf.control_dependencies(control_inputs=[f_beta]):
      stored_best_thresh = find_best_anom_thresh(
          candidate_threshs,
          f_beta,
          anom_thresh_var)

      with tf.control_dependencies(control_inputs=[stored_best_thresh]):
        return tf.identity(input=anom_thresh_var)
def set_anom_thresh(user_passed_anom_thresh, anom_thresh_var):
  """Stores a user supplied anomaly threshold in the threshold variable.

  Args:
    user_passed_anom_thresh: User passed anomaly threshold that overrides
      the threshold optimization.
    anom_thresh_var: tf.float64 variable that stores anomaly threshold value.

  Returns:
    Identity tensor of the anomaly threshold variable, created under a
    control dependency on the assignment.
  """
  store_user_thresh = tf.assign(
      ref=anom_thresh_var, value=user_passed_anom_thresh)

  with tf.control_dependencies(control_inputs=[store_user_thresh]):
    return tf.identity(input=anom_thresh_var)
def tune_anomaly_thresholds_supervised_training(
    labels_norm_mask,
    labels_anom_mask,
    mahalanobis_dist_time,
    tp_thresh_time_var,
    fn_thresh_time_var,
    fp_thresh_time_var,
    tn_thresh_time_var,
    time_anom_thresh_var,
    mahalanobis_dist_feat,
    tp_thresh_feat_var,
    fn_thresh_feat_var,
    fp_thresh_feat_var,
    tn_thresh_feat_var,
    feat_anom_thresh_var,
    params,
    mode,
    dummy_var):
  """Tunes anomaly thresholds during supervised training mode.

  For time major and then feature major: when params holds no user supplied
  threshold (the entry is None) the threshold is grid searched with
  optimize_anomaly_theshold, otherwise the user's value is stored with
  set_anom_thresh. A zero loss tied to dummy_var is then built under control
  dependencies on both thresholds.

  Args:
    labels_norm_mask: tf.bool vector mask of labels for normals.
    labels_anom_mask: tf.bool vector mask of labels for anomalies.
    mahalanobis_dist_time: Mahalanobis distance, time major.
    tp_thresh_time_var: tf.int64 variable to track number of true positives
      wrt thresholds for time major case.
    fn_thresh_time_var: tf.int64 variable to track number of false negatives
      wrt thresholds for time major case.
    fp_thresh_time_var: tf.int64 variable to track number of false positives
      wrt thresholds for time major case.
    tn_thresh_time_var: tf.int64 variable to track number of true negatives
      wrt thresholds for time major case.
    time_anom_thresh_var: tf.float64 variable to hold the set time anomaly
      threshold.
    mahalanobis_dist_feat: Mahalanobis distance, features major.
    tp_thresh_feat_var: tf.int64 variable to track number of true positives
      wrt thresholds for feat major case.
    fn_thresh_feat_var: tf.int64 variable to track number of false negatives
      wrt thresholds for feat major case.
    fp_thresh_feat_var: tf.int64 variable to track number of false positives
      wrt thresholds for feat major case.
    tn_thresh_feat_var: tf.int64 variable to track number of true negatives
      wrt thresholds for feat major case.
    feat_anom_thresh_var: tf.float64 variable to hold the set feat anomaly
      threshold.
    params: Dictionary of parameters.
    mode: Estimator ModeKeys. Can take value of only TRAIN.
    dummy_var: Dummy variable used to allow training mode to happen since it
      requires a gradient to tie back to the graph dependency.

  Returns:
    loss: The scalar loss to tie our updates back to Estimator graph.
    train_op: The train operation to tie our updates back to Estimator graph.
  """
  def _resolve_thresh(
      var_name, user_thresh_key, dist, tp_var, fn_var, fp_var, tn_var,
      thresh_var):
    # A user supplied threshold overrides the grid search.
    user_thresh = params[user_thresh_key]
    if user_thresh is not None:
      return set_anom_thresh(user_thresh, thresh_var)
    return optimize_anomaly_theshold(
        var_name,
        labels_norm_mask,
        labels_anom_mask,
        dist,
        tp_var,
        fn_var,
        fp_var,
        tn_var,
        params,
        mode,
        thresh_var)

  # Time based
  best_anom_thresh_time = _resolve_thresh(
      "time",
      "time_anom_thresh",
      mahalanobis_dist_time,
      tp_thresh_time_var,
      fn_thresh_time_var,
      fp_thresh_time_var,
      tn_thresh_time_var,
      time_anom_thresh_var)

  # Features based
  best_anom_thresh_feat = _resolve_thresh(
      "feat",
      "feat_anom_thresh",
      mahalanobis_dist_feat,
      tp_thresh_feat_var,
      fn_thresh_feat_var,
      fp_thresh_feat_var,
      tn_thresh_feat_var,
      feat_anom_thresh_var)

  # The loss is always zero; it only exists so that running the train op
  # also runs the threshold updates it depends on.
  with tf.control_dependencies(
      control_inputs=[best_anom_thresh_time, best_anom_thresh_feat]):
    zero_scalar = tf.zeros(shape=(), dtype=tf.float64)
    loss = tf.reduce_sum(input_tensor=zero_scalar * dummy_var)

    train_op = tf.contrib.layers.optimize_loss(
        loss=loss,
        global_step=tf.train.get_global_step(),
        learning_rate=params["learning_rate"],
        optimizer="SGD")

  return loss, train_op
def tune_anomaly_thresholds_supervised_eval(
    labels_norm_mask,
    labels_anom_mask,
    time_anom_thresh_var,
    mahalanobis_dist_time,
    tp_thresh_eval_time_var,
    fn_thresh_eval_time_var,
    fp_thresh_eval_time_var,
    tn_thresh_eval_time_var,
    feat_anom_thresh_var,
    mahalanobis_dist_feat,
    tp_thresh_eval_feat_var,
    fn_thresh_eval_feat_var,
    fp_thresh_eval_feat_var,
    tn_thresh_eval_feat_var,
    params,
    mode):
  """Checks tuned anomaly thresholds during supervised evaluation mode.

  Accumulates the batch into the evaluation confusion matrix variables using
  the already tuned thresholds, derives accuracy, precision, recall, and
  f-beta from those variables, and packages everything as Estimator
  eval_metric_ops.

  Args:
    labels_norm_mask: tf.bool vector mask of labels for normals.
    labels_anom_mask: tf.bool vector mask of labels for anomalies.
    time_anom_thresh_var: tf.float64 scalar time anomaly threshold value.
    mahalanobis_dist_time: Mahalanobis distance, time major.
    tp_thresh_eval_time_var: tf.int64 variable to track number of true
      positives wrt thresholds for time major case for evaluation.
    fn_thresh_eval_time_var: tf.int64 variable to track number of false
      negatives wrt thresholds for time major case for evaluation.
    fp_thresh_eval_time_var: tf.int64 variable to track number of false
      positives wrt thresholds for time major case for evaluation.
    tn_thresh_eval_time_var: tf.int64 variable to track number of true
      negatives wrt thresholds for time major case for evaluation.
    feat_anom_thresh_var: tf.float64 scalar feature anomaly threshold value.
    mahalanobis_dist_feat: Mahalanobis distance, features major.
    tp_thresh_eval_feat_var: tf.int64 variable to track number of true
      positives wrt thresholds for feat major case for evaluation.
    fn_thresh_eval_feat_var: tf.int64 variable to track number of false
      negatives wrt thresholds for feat major case for evaluation.
    fp_thresh_eval_feat_var: tf.int64 variable to track number of false
      positives wrt thresholds for feat major case for evaluation.
    tn_thresh_eval_feat_var: tf.int64 variable to track number of true
      negatives wrt thresholds for feat major case for evaluation.
    params: Dictionary of parameters.
    mode: Estimator ModeKeys. Can take value of only EVAL.

  Returns:
    loss: Scalar tf.float64 zero tensor.
    eval_metric_ops: Evaluation metrics of threshold tuning, a dictionary of
      (value, update_op) pairs.
  """
  def _metric_values(tp_var, fn_var, fp_var, tn_var):
    # Value tensors for accuracy, precision, recall, and f-beta computed
    # straight from the confusion matrix variables.
    num_correct = tf.cast(x=tp_var + tn_var, dtype=tf.float64)
    num_wrong = tf.cast(x=fp_var + fn_var, dtype=tf.float64)
    accuracy = num_correct / (num_correct + num_wrong)

    true_positives = tf.cast(x=tp_var, dtype=tf.float64)
    predicted_positives = tf.cast(x=tp_var + fp_var, dtype=tf.float64)
    precision = true_positives / predicted_positives
    actual_positives = tf.cast(x=tp_var + fn_var, dtype=tf.float64)
    recall = true_positives / actual_positives

    # F-beta = (1 + beta^2) * pre * rec / (beta^2 * pre + rec)
    weighted_product = (1.0 + params["f_score_beta"] ** 2) * precision
    weighted_product = weighted_product * recall
    weighted_sum = params["f_score_beta"] ** 2 * precision + recall
    f_beta = weighted_product / weighted_sum

    return accuracy, precision, recall, f_beta

  # Update the confusion matrix counters against the single tuned threshold.
  with tf.variable_scope(
      name_or_scope="anom_thresh_eval_vars", reuse=tf.AUTO_REUSE):
    # Time based
    (tp_time_update_op,
     fn_time_update_op,
     fp_time_update_op,
     tn_time_update_op) = update_anom_thresh_vars(
         labels_norm_mask,
         labels_anom_mask,
         1,
         time_anom_thresh_var,
         mahalanobis_dist_time,
         tp_thresh_eval_time_var,
         fn_thresh_eval_time_var,
         fp_thresh_eval_time_var,
         tn_thresh_eval_time_var,
         mode)

    # Features based
    (tp_feat_update_op,
     fn_feat_update_op,
     fp_feat_update_op,
     tn_feat_update_op) = update_anom_thresh_vars(
         labels_norm_mask,
         labels_anom_mask,
         1,
         feat_anom_thresh_var,
         mahalanobis_dist_feat,
         tp_thresh_eval_feat_var,
         fn_thresh_eval_feat_var,
         fp_thresh_eval_feat_var,
         tn_thresh_eval_feat_var,
         mode)

  # Composite metrics that serve as the update ops of the metric pairs.
  with tf.variable_scope(
      name_or_scope="anom_thresh_eval_vars", reuse=tf.AUTO_REUSE):
    # Time based
    (acc_time_update_op,
     pre_time_update_op,
     rec_time_update_op,
     f_beta_time_update_op) = calculate_composite_classification_metrics(
         tp_thresh_eval_time_var,
         fn_thresh_eval_time_var,
         fp_thresh_eval_time_var,
         tn_thresh_eval_time_var,
         params["f_score_beta"])

    # Features based
    (acc_feat_update_op,
     pre_feat_update_op,
     rec_feat_update_op,
     f_beta_feat_update_op) = calculate_composite_classification_metrics(
         tp_thresh_eval_feat_var,
         fn_thresh_eval_feat_var,
         fp_thresh_eval_feat_var,
         tn_thresh_eval_feat_var,
         params["f_score_beta"])

  loss = tf.zeros(shape=[], dtype=tf.float64)

  # Time based
  (acc_time_value,
   pre_time_value,
   rec_time_value,
   f_beta_time_value) = _metric_values(
       tp_thresh_eval_time_var,
       fn_thresh_eval_time_var,
       fp_thresh_eval_time_var,
       tn_thresh_eval_time_var)

  # Features based
  (acc_feat_value,
   pre_feat_value,
   rec_feat_value,
   f_beta_feat_value) = _metric_values(
       tp_thresh_eval_feat_var,
       fn_thresh_eval_feat_var,
       fp_thresh_eval_feat_var,
       tn_thresh_eval_feat_var)

  # Anomaly detection eval metrics
  eval_metric_ops = {
      # Time based
      "time_anom_tp": (tp_thresh_eval_time_var, tp_time_update_op),
      "time_anom_fn": (fn_thresh_eval_time_var, fn_time_update_op),
      "time_anom_fp": (fp_thresh_eval_time_var, fp_time_update_op),
      "time_anom_tn": (tn_thresh_eval_time_var, tn_time_update_op),
      "time_anom_acc": (acc_time_value, acc_time_update_op),
      "time_anom_pre": (pre_time_value, pre_time_update_op),
      "time_anom_rec": (rec_time_value, rec_time_update_op),
      "time_anom_f_beta": (f_beta_time_value, f_beta_time_update_op),
      # Features based
      "feat_anom_tp": (tp_thresh_eval_feat_var, tp_feat_update_op),
      "feat_anom_fn": (fn_thresh_eval_feat_var, fn_feat_update_op),
      "feat_anom_fp": (fp_thresh_eval_feat_var, fp_feat_update_op),
      "feat_anom_tn": (tn_thresh_eval_feat_var, tn_feat_update_op),
      "feat_anom_acc": (acc_feat_value, acc_feat_update_op),
      "feat_anom_pre": (pre_feat_value, pre_feat_update_op),
      "feat_anom_rec": (rec_feat_value, rec_feat_update_op),
      "feat_anom_f_beta": (f_beta_feat_value, f_beta_feat_update_op)
  }

  return loss, eval_metric_ops
In [ ]:
%%writefile anomaly_detection_module/trainer/tune_anomaly_thresholds_unsupervised.py
import tensorflow as tf
from .calculate_error_distribution_statistics import non_singleton_batch_var_variable_updating
from .calculate_error_distribution_statistics import singleton_batch_var_variable_updating
from .predict import flag_anomalies_by_thresholding
def tune_anomaly_thresholds_unsupervised_training(
    cur_batch_size,
    time_anom_thresh_var,
    mahalanobis_dist_time,
    count_thresh_time_var,
    mean_thresh_time_var,
    var_thresh_time_var,
    feat_anom_thresh_var,
    mahalanobis_dist_feat,
    count_thresh_feat_var,
    mean_thresh_feat_var,
    var_thresh_feat_var,
    params,
    dummy_var):
  """Tunes anomaly thresholds during unsupervised training mode.

  Updates the running count, mean, and variance of the flattened mahalanobis
  distances for time major and feature major, then sets each anomaly
  threshold to mean + scale * sqrt(variance), where the scales come from
  params["time_thresh_scl"] and params["feat_thresh_scl"]. A zero loss tied
  to dummy_var is built under control dependencies on those assignments.

  Args:
    cur_batch_size: Current batch size, could be partially filled.
    time_anom_thresh_var: Time anomaly threshold variable.
    mahalanobis_dist_time: Time major mahalanobis distance.
    count_thresh_time_var: Time major running count of number of records.
    mean_thresh_time_var: Time major running mean of mahalanobis distance.
    var_thresh_time_var: Time major running variance of mahalanobis distance.
    feat_anom_thresh_var: Feature anomaly threshold variable.
    mahalanobis_dist_feat: Feature major mahalanobis distance.
    count_thresh_feat_var: Feature major running count of number of records.
    mean_thresh_feat_var: Feature major running mean of mahalanobis distance.
    var_thresh_feat_var: Feature major running variance of mahalanobis
      distance.
    params: Dictionary of parameters.
    dummy_var: Dummy variable used to allow training mode to happen since it
      requires a gradient to tie back to the graph dependency.

  Returns:
    loss: The scalar loss to tie our updates back to Estimator graph.
    train_op: The train operation to tie our updates back to Estimator graph.
  """
  def _refresh_running_stats(inner_size, dist, count_var, mean_var, var_var):
    # Flattens the distances and routes them to the singleton or the
    # non-singleton updater depending on how many values there are.
    flat_dist = tf.reshape(
        tensor=dist, shape=[cur_batch_size * inner_size])
    is_singleton = tf.equal(x=cur_batch_size * inner_size, y=1)
    return tf.cond(
        pred=is_singleton,
        true_fn=lambda: singleton_batch_var_variable_updating(
            inner_size,
            flat_dist,
            count_var,
            mean_var,
            var_var),
        false_fn=lambda: non_singleton_batch_var_variable_updating(
            cur_batch_size,
            inner_size,
            flat_dist,
            count_var,
            mean_var,
            var_var))

  with tf.variable_scope(
      name_or_scope="mahalanobis_dist_thresh_vars", reuse=tf.AUTO_REUSE):
    # Time based
    time_variance, time_mean, time_count = _refresh_running_stats(
        params["seq_len"],
        mahalanobis_dist_time,
        count_thresh_time_var,
        mean_thresh_time_var,
        var_thresh_time_var)

    # Features based
    feat_variance, feat_mean, feat_count = _refresh_running_stats(
        params["num_feat"],
        mahalanobis_dist_feat,
        count_thresh_feat_var,
        mean_thresh_feat_var,
        var_thresh_feat_var)

  # Three stacked control dependency contexts (variance, mean, count), kept
  # separate as in the nested form, so the thresholds are derived only after
  # the running statistics were assigned.
  with tf.control_dependencies(
      control_inputs=[time_variance, feat_variance]), \
      tf.control_dependencies(
          control_inputs=[time_mean, feat_mean]), \
      tf.control_dependencies(
          control_inputs=[time_count, feat_count]):
    # threshold = mean + scale * standard deviation
    time_thresh = time_mean + params["time_thresh_scl"] * tf.sqrt(
        x=time_variance)
    feat_thresh = feat_mean + params["feat_thresh_scl"] * tf.sqrt(
        x=feat_variance)

    store_thresh_ops = [
        tf.assign(ref=time_anom_thresh_var, value=time_thresh),
        tf.assign(ref=feat_anom_thresh_var, value=feat_thresh)]

    # The loss is always zero; it only exists so that running the train op
    # also runs the threshold assignments it depends on.
    with tf.control_dependencies(control_inputs=store_thresh_ops):
      zero_scalar = tf.zeros(shape=(), dtype=tf.float64)
      loss = tf.reduce_sum(input_tensor=zero_scalar * dummy_var)

      train_op = tf.contrib.layers.optimize_loss(
          loss=loss,
          global_step=tf.train.get_global_step(),
          learning_rate=params["learning_rate"],
          optimizer="SGD")

  return loss, train_op
def tune_anomaly_thresholds_unsupervised_eval(
    cur_batch_size,
    time_anom_thresh_var,
    mahalanobis_dist_time,
    feat_anom_thresh_var,
    mahalanobis_dist_feat):
  """Checks tuned anomaly thresholds during unsupervised evaluation mode.

  Flags each sequence in the batch with the tuned time and feature
  thresholds and reports the streaming mean of those flags.

  Args:
    cur_batch_size: Current batch size, could be partially filled.
    time_anom_thresh_var: Time anomaly threshold variable.
    mahalanobis_dist_time: Time major mahalanobis distance.
    feat_anom_thresh_var: Feature anomaly threshold variable.
    mahalanobis_dist_feat: Feature major mahalanobis distance.

  Returns:
    loss: Scalar tf.float64 zero tensor.
    eval_metric_ops: Dictionary with keys "time_anom_tp" and "feat_anom_tp",
      each holding the tf.metrics.mean of the corresponding anomaly flags.
  """
  loss = tf.zeros(shape=[], dtype=tf.float64)

  # Flag predictions as either normal or anomalous
  # shape = (cur_batch_size,)
  flags_from_time = flag_anomalies_by_thresholding(
      cur_batch_size, mahalanobis_dist_time, time_anom_thresh_var)

  # shape = (cur_batch_size,)
  flags_from_feat = flag_anomalies_by_thresholding(
      cur_batch_size, mahalanobis_dist_feat, feat_anom_thresh_var)

  # Anomaly detection eval metrics: the mean of the flags over the evaluated
  # sequences, time based first and features based second.
  time_flag_mean = tf.metrics.mean(values=flags_from_time)
  feat_flag_mean = tf.metrics.mean(values=flags_from_feat)
  eval_metric_ops = {
      "time_anom_tp": time_flag_mean,
      "feat_anom_tp": feat_flag_mean
  }

  return loss, eval_metric_ops
In [ ]:
%%writefile anomaly_detection_module/trainer/predict.py
import tensorflow as tf
def flag_anomalies_by_thresholding(
    cur_batch_size, mahalanobis_dist, anom_thresh_var):
  """Flags sequences whose distance magnitude exceeds the threshold.

  A sequence is flagged with 1 when the absolute value of any of its
  mahalanobis distances along axis 1 is greater than the anomaly threshold,
  and with 0 otherwise.

  Args:
    cur_batch_size: Current batch size, could be partially filled.
    mahalanobis_dist: Mahalanobis distance.
    anom_thresh_var: Anomaly threshold variable.

  Returns:
    anomaly_flags: tf.int64 vector of current batch size elements of
      0's and 1's indicating if each sequence is anomalous or not.
  """
  dist_magnitude = tf.abs(x=mahalanobis_dist)
  over_thresh = tf.greater(x=dist_magnitude, y=anom_thresh_var)

  # True for a sequence as soon as one of its elements is over the threshold.
  any_over_thresh = tf.reduce_any(input_tensor=over_thresh, axis=1)

  anomalous_flag = tf.ones(shape=[cur_batch_size], dtype=tf.int64)
  normal_flag = tf.zeros(shape=[cur_batch_size], dtype=tf.int64)

  anom_flags = tf.where(
      condition=any_over_thresh, x=anomalous_flag, y=normal_flag)

  return anom_flags
def anomaly_detection_predictions(
        cur_batch_size,
        seq_len,
        num_feat,
        mahalanobis_dist_time,
        mahalanobis_dist_feat,
        time_anom_thresh_var,
        feat_anom_thresh_var,
        X_time_abs_recon_err,
        X_feat_abs_recon_err):
    """Assembles the Estimator's predictions and serving export outputs.

    Thresholds both mahalanobis distances into anomaly flags, reshapes the
    absolute reconstruction errors back into per-example tensors, and bundles
    everything into the predictions dictionary that is also exported.

    Args:
        cur_batch_size: Current batch size, could be partially filled.
        seq_len: Number of timesteps in sequence.
        num_feat: Number of features.
        mahalanobis_dist_time: Mahalanobis distance, time major.
        mahalanobis_dist_feat: Mahalanobis distance, features major.
        time_anom_thresh_var: Time anomaly threshold variable.
        feat_anom_thresh_var: Features anomaly threshold variable.
        X_time_abs_recon_err: Time major reconstructed input data's absolute
            reconstruction error.
        X_feat_abs_recon_err: Features major reconstructed input data's
            absolute reconstruction error.

    Returns:
        predictions_dict: Dictionary of predictions to output for local
            prediction.
        export_outputs: Dictionary to output from exported model for serving.
    """
    # 0/1 flag per sequence, shape = (cur_batch_size,)
    time_anom_flags = flag_anomalies_by_thresholding(
        cur_batch_size, mahalanobis_dist_time, time_anom_thresh_var)
    feat_anom_flags = flag_anomalies_by_thresholding(
        cur_batch_size, mahalanobis_dist_feat, feat_anom_thresh_var)

    # Time major errors go straight to (cur_batch_size, seq_len, num_feat)
    time_recon_err = tf.reshape(
        tensor=X_time_abs_recon_err,
        shape=[cur_batch_size, seq_len, num_feat])

    # Features major errors are reshaped to (cur_batch_size, num_feat,
    # seq_len) first and then transposed so both outputs share one layout
    feat_recon_err = tf.reshape(
        tensor=X_feat_abs_recon_err,
        shape=[cur_batch_size, num_feat, seq_len])
    feat_recon_err = tf.transpose(a=feat_recon_err, perm=[0, 2, 1])

    # Repeat the scalar thresholds once per example of the batch
    time_thresh_per_example = tf.fill(
        dims=[cur_batch_size], value=time_anom_thresh_var)
    feat_thresh_per_example = tf.fill(
        dims=[cur_batch_size], value=feat_anom_thresh_var)

    predictions_dict = {
        "X_time_abs_recon_err": time_recon_err,
        "X_feat_abs_recon_err": feat_recon_err,
        "mahalanobis_dist_time": mahalanobis_dist_time,
        "mahalanobis_dist_feat": mahalanobis_dist_feat,
        "time_anom_thresh_var": time_thresh_per_example,
        "feat_anom_thresh_var": feat_thresh_per_example,
        "time_anom_flags": time_anom_flags,
        "feat_anom_flags": feat_anom_flags,
    }

    # The served signature exposes exactly the same tensors
    predict_output = tf.estimator.export.PredictOutput(
        outputs=predictions_dict)
    export_outputs = {"predict_export_outputs": predict_output}

    return predictions_dict, export_outputs
In [ ]:
%%writefile anomaly_detection_module/trainer/anomaly_detection.py
import tensorflow as tf
from .autoencoder_dense import dense_autoencoder_model
from .autoencoder_lstm import lstm_enc_dec_autoencoder_model
from .autoencoder_pca import pca_model
from .calculate_error_distribution_statistics import calculate_error_distribution_statistics_training
from .calculate_error_distribution_statistics import mahalanobis_dist
from .error_distribution_vars import create_both_mahalanobis_dist_vars
from .predict import anomaly_detection_predictions
from .reconstruction import reconstruction_evaluation
from .tune_anomaly_threshold_vars import create_both_confusion_matrix_thresh_vars
from .tune_anomaly_threshold_vars import create_both_mahalanobis_unsupervised_thresh_vars
from .tune_anomaly_thresholds_supervised import tune_anomaly_thresholds_supervised_training
from .tune_anomaly_thresholds_supervised import tune_anomaly_thresholds_supervised_eval
from .tune_anomaly_thresholds_unsupervised import tune_anomaly_thresholds_unsupervised_training
from .tune_anomaly_thresholds_unsupervised import tune_anomaly_thresholds_unsupervised_eval
# Create our model function to be used in our custom estimator
def anomaly_detection(features, labels, mode, params):
    """Model function of the anomaly detection custom Estimator.

    Stacks the feature tensors into one input tensor, creates the variables
    shared by every training stage, builds the model selected by
    params["model_type"], and then wires up the loss, train op, eval metrics
    or predictions that belong to the current Estimator mode and training
    mode.

    Args:
        features: Dictionary of feature tensors keyed by feature name.
        labels: Labels tensor or None.
        mode: Estimator ModeKeys. Can take values of TRAIN, EVAL, and PREDICT.
        params: Dictionary of parameters.

    Returns:
        EstimatorSpec object.
    """
    print("\nanomaly_detection: features = \n{}".format(features))
    print("anomaly_detection: labels = \n{}".format(labels))
    print("anomaly_detection: mode = \n{}".format(mode))
    print("anomaly_detection: params = \n{}".format(params))

    is_train = mode == tf.estimator.ModeKeys.TRAIN
    is_eval = mode == tf.estimator.ModeKeys.EVAL
    is_predict = mode == tf.estimator.ModeKeys.PREDICT
    labeled_tune_thresh = params["labeled_tune_thresh"]

    def training_mode_is(name):
        # Looked up lazily so params["training_mode"] is only read on the
        # code paths that actually depend on it
        return params["training_mode"] == name

    def zero_scalar_variable(name, trainable):
        # float64 scalar variable initialized to zero in the current scope
        return tf.get_variable(
            name=name,
            dtype=tf.float64,
            initializer=tf.zeros(shape=[], dtype=tf.float64),
            trainable=trainable)

    feat_names = params["feat_names"]

    # Dynamic batch size in case there was a partially filled batch
    cur_batch_size = tf.shape(
        input=features[feat_names[0]], out_type=tf.int64)[0]

    # Stack all of the features into a 3-D tensor
    # shape = (cur_batch_size, seq_len, num_feat)
    X = tf.stack(values=[features[name] for name in feat_names], axis=2)

    ##########################################################################
    # The flags deciding which variables get created must stay the same
    # through all stages or else the variables won't be in the checkpoint.

    # Variables for calculating error distribution statistics
    (abs_err_count_time_var, abs_err_mean_time_var,
     abs_err_cov_time_var, abs_err_inv_cov_time_var,
     abs_err_count_feat_var, abs_err_mean_feat_var,
     abs_err_cov_feat_var, abs_err_inv_cov_feat_var) = (
         create_both_mahalanobis_dist_vars(
             seq_len=params["seq_len"], num_feat=params["num_feat"]))

    # Variables for automatically tuning anomaly thresh
    if labeled_tune_thresh:
        (tp_thresh_time_var, fn_thresh_time_var,
         fp_thresh_time_var, tn_thresh_time_var,
         tp_thresh_feat_var, fn_thresh_feat_var,
         fp_thresh_feat_var, tn_thresh_feat_var) = (
             create_both_confusion_matrix_thresh_vars(
                 scope="mahalanobis_dist_thresh_vars",
                 time_thresh_size=[params["num_time_anom_thresh"]],
                 feat_thresh_size=[params["num_feat_anom_thresh"]]))
    else:
        (count_thresh_time_var, mean_thresh_time_var, var_thresh_time_var,
         count_thresh_feat_var, mean_thresh_feat_var, var_thresh_feat_var) = (
             create_both_mahalanobis_unsupervised_thresh_vars(
                 scope="mahalanobis_dist_thresh_vars"))

    # The tuned thresholds themselves live next to the tuning variables
    with tf.variable_scope(
            name_or_scope="mahalanobis_dist_thresh_vars",
            reuse=tf.AUTO_REUSE):
        time_anom_thresh_var = zero_scalar_variable(
            "time_anom_thresh_var", False)
        feat_anom_thresh_var = zero_scalar_variable(
            "feat_anom_thresh_var", False)

    # Variables for tuning anomaly thresh evaluation
    if labeled_tune_thresh:
        (tp_thresh_eval_time_var, fn_thresh_eval_time_var,
         fp_thresh_eval_time_var, tn_thresh_eval_time_var,
         tp_thresh_eval_feat_var, fn_thresh_eval_feat_var,
         fp_thresh_eval_feat_var, tn_thresh_eval_feat_var) = (
             create_both_confusion_matrix_thresh_vars(
                 scope="anom_thresh_eval_vars",
                 time_thresh_size=[],
                 feat_thresh_size=[]))

    # Dummy variable for graph dependency requiring a gradient for TRAIN
    dummy_var = zero_scalar_variable("dummy_var", True)
    ##########################################################################

    # EstimatorSpec pieces that the branches below do not fill stay None
    predictions_dict = None
    loss = None
    train_op = None
    eval_metric_ops = None
    export_outputs = None

    # Pick and build the selected model
    model_function = {
        "dense_autoencoder": dense_autoencoder_model,
        "lstm_enc_dec_autoencoder": lstm_enc_dec_autoencoder_model,
        "pca": pca_model,
    }[params["model_type"]]

    (loss, train_op,
     X_time_orig, X_time_recon,
     X_feat_orig, X_feat_recon) = model_function(
         X, mode, params, cur_batch_size, dummy_var)

    # Every stage but reconstruction training works on the absolute
    # reconstruction errors
    if not is_train or not training_mode_is("reconstruction"):
        # Time based
        # shape = (cur_batch_size * seq_len, num_feat)
        X_time_abs_recon_err = tf.abs(x=X_time_orig - X_time_recon)
        # Features based
        # shape = (cur_batch_size * num_feat, seq_len)
        X_feat_abs_recon_err = tf.abs(x=X_feat_orig - X_feat_recon)

        if (is_train and
                training_mode_is("calculate_error_distribution_statistics")):
            loss, train_op = calculate_error_distribution_statistics_training(
                cur_batch_size,
                X_time_abs_recon_err,
                abs_err_count_time_var,
                abs_err_mean_time_var,
                abs_err_cov_time_var,
                abs_err_inv_cov_time_var,
                X_feat_abs_recon_err,
                abs_err_count_feat_var,
                abs_err_mean_feat_var,
                abs_err_cov_feat_var,
                abs_err_inv_cov_feat_var,
                params,
                dummy_var)
        elif is_eval and not training_mode_is("tune_anomaly_thresholds"):
            loss, eval_metric_ops = reconstruction_evaluation(
                X_time_orig, X_time_recon, params["training_mode"])
        elif (is_predict or
              ((is_train or is_eval) and
               training_mode_is("tune_anomaly_thresholds"))):
            with tf.variable_scope(
                    name_or_scope="mahalanobis_dist_vars",
                    reuse=tf.AUTO_REUSE):
                # Time based
                # shape = (cur_batch_size, seq_len)
                mahalanobis_dist_time = mahalanobis_dist(
                    err_vec=X_time_abs_recon_err,
                    mean_vec=abs_err_mean_time_var,
                    inv_cov=abs_err_inv_cov_time_var,
                    final_shape=params["seq_len"])
                # Features based
                # shape = (cur_batch_size, num_feat)
                mahalanobis_dist_feat = mahalanobis_dist(
                    err_vec=X_feat_abs_recon_err,
                    mean_vec=abs_err_mean_feat_var,
                    inv_cov=abs_err_inv_cov_feat_var,
                    final_shape=params["num_feat"])

            # NOTE(review): threshold tuning and prediction are assumed to
            # run outside of the mahalanobis_dist_vars scope - confirm.
            if is_predict:
                predictions_dict, export_outputs = (
                    anomaly_detection_predictions(
                        cur_batch_size,
                        params["seq_len"],
                        params["num_feat"],
                        mahalanobis_dist_time,
                        mahalanobis_dist_feat,
                        time_anom_thresh_var,
                        feat_anom_thresh_var,
                        X_time_abs_recon_err,
                        X_feat_abs_recon_err))
            elif labeled_tune_thresh:
                # Label 0 marks normal examples, label 1 anomalous ones
                labels_norm_mask = tf.equal(x=labels, y=0)
                labels_anom_mask = tf.equal(x=labels, y=1)

                if is_train:
                    loss, train_op = (
                        tune_anomaly_thresholds_supervised_training(
                            labels_norm_mask,
                            labels_anom_mask,
                            mahalanobis_dist_time,
                            tp_thresh_time_var,
                            fn_thresh_time_var,
                            fp_thresh_time_var,
                            tn_thresh_time_var,
                            time_anom_thresh_var,
                            mahalanobis_dist_feat,
                            tp_thresh_feat_var,
                            fn_thresh_feat_var,
                            fp_thresh_feat_var,
                            tn_thresh_feat_var,
                            feat_anom_thresh_var,
                            params,
                            mode,
                            dummy_var))
                elif is_eval:
                    loss, eval_metric_ops = (
                        tune_anomaly_thresholds_supervised_eval(
                            labels_norm_mask,
                            labels_anom_mask,
                            time_anom_thresh_var,
                            mahalanobis_dist_time,
                            tp_thresh_eval_time_var,
                            fn_thresh_eval_time_var,
                            fp_thresh_eval_time_var,
                            tn_thresh_eval_time_var,
                            feat_anom_thresh_var,
                            mahalanobis_dist_feat,
                            tp_thresh_eval_feat_var,
                            fn_thresh_eval_feat_var,
                            fp_thresh_eval_feat_var,
                            tn_thresh_eval_feat_var,
                            params,
                            mode))
            else:  # not labeled_tune_thresh
                if is_train:
                    loss, train_op = (
                        tune_anomaly_thresholds_unsupervised_training(
                            cur_batch_size,
                            time_anom_thresh_var,
                            mahalanobis_dist_time,
                            count_thresh_time_var,
                            mean_thresh_time_var,
                            var_thresh_time_var,
                            feat_anom_thresh_var,
                            mahalanobis_dist_feat,
                            count_thresh_feat_var,
                            mean_thresh_feat_var,
                            var_thresh_feat_var,
                            params,
                            dummy_var))
                elif is_eval:
                    loss, eval_metric_ops = (
                        tune_anomaly_thresholds_unsupervised_eval(
                            cur_batch_size,
                            time_anom_thresh_var,
                            mahalanobis_dist_time,
                            feat_anom_thresh_var,
                            mahalanobis_dist_feat))

    return tf.estimator.EstimatorSpec(
        mode=mode,
        predictions=predictions_dict,
        loss=loss,
        train_op=train_op,
        eval_metric_ops=eval_metric_ops,
        export_outputs=export_outputs)
In [ ]:
%%writefile anomaly_detection_module/trainer/serving.py
import tensorflow as tf
# Serving input functions
def fix_shape_and_type_for_serving(placeholder):
    """Parses a batch of ";" delimited strings into a float64 tensor.

    Every string of the batch is split on ";" and the resulting pieces are
    converted from text to tf.float64.

    Args:
        placeholder: Placeholder tensor holding raw data from serving input
            function.

    Returns:
        tf.float64 feature tensor with one row of parsed values per input
        string.
    """
    cur_batch_size = tf.shape(input=placeholder, out_type=tf.int64)[0]

    def split_row(row_index):
        # Keep only the values of the SparseTensor that splitting one string
        # produces
        return tf.string_split(
            source=[placeholder[row_index]], delimiter=";").values

    # Visit the batch row by row through its indices
    row_indices = tf.range(start=0, limit=cur_batch_size, dtype=tf.int64)

    # shape = (batch_size, seq_len)
    split_rows = tf.map_fn(fn=split_row, elems=row_indices, dtype=tf.string)
    split_string = tf.stack(values=split_rows, axis=0)

    # Convert each string in the split tensor to float
    # shape = (batch_size, seq_len)
    return tf.string_to_number(
        string_tensor=split_string, out_type=tf.float64)
def get_shape_and_set_modified_shape_2D(tensor, additional_dimension_sizes):
    """Pins the static size of a tensor's second dimension.

    Reads the tensor's static shape, overwrites the entry of dimension 1 with
    the first element of additional_dimension_sizes, and sets the result back
    on the tensor. This removes the dynamic shape ambiguity of the sequence
    dimension so that layers which need a known last dimension (such as
    tf.layers.dense) can consume the tensor.

    Args:
        tensor: Feature tensor whose static shape gets updated in place.
        additional_dimension_sizes: List whose first element is the size to
            set for dimension 1, namely the sequence length.

    Returns:
        The same tensor, now carrying the updated static shape.
    """
    # Static shape as a plain, mutable list
    static_shape = tensor.get_shape().as_list()

    # Only dimension 1 is overwritten, every other entry is kept as is
    static_shape[1] = additional_dimension_sizes[0]

    # shape = (batch_size, additional_dimension_sizes[0])
    tensor.set_shape(shape=static_shape)

    return tensor
def serving_input_fn(feat_names, seq_len):
    """Builds the ServingInputReceiver used when exporting the model.

    Creates one string placeholder per feature, parses each of them into a
    float feature tensor, and fixes the static sequence length of the result.

    Args:
        feat_names: List of string names of features.
        seq_len: Number of timesteps in sequence.

    Returns:
        ServingInputReceiver object containing features and receiver tensors.
    """
    # All features come in as a batch of strings, shape = (batch_size,),
    # this was so because of passing the arrays to online ml-engine prediction
    feature_placeholders = {}
    for feature in feat_names:
        feature_placeholders[feature] = tf.placeholder(
            dtype=tf.string, shape=[None])

    # Parse every placeholder into its feature tensor
    features = {}
    for key, placeholder in feature_placeholders.items():
        features[key] = fix_shape_and_type_for_serving(
            placeholder=placeholder)

    # Fix dynamic shape ambiguity of feature tensors for our DNN
    for key in features:
        features[key] = get_shape_and_set_modified_shape_2D(
            tensor=features[key], additional_dimension_sizes=[seq_len])

    return tf.estimator.export.ServingInputReceiver(
        features=features, receiver_tensors=feature_placeholders)
In [ ]:
%%writefile anomaly_detection_module/trainer/model.py
import tensorflow as tf
from .anomaly_detection import anomaly_detection
from .input import read_dataset
from .serving import serving_input_fn
# Set logging to be level of INFO
tf.logging.set_verbosity(tf.logging.INFO)
def train_and_evaluate(args):
    """Trains and evaluates the custom Estimator for one training mode.

    Given the dictionary of parameters, creates the custom Estimator and runs
    the train and evaluate loop that belongs to args["training_mode"], which
    is one of "reconstruction", "calculate_error_distribution_statistics" or
    "tune_anomaly_thresholds".

    Args:
        args: Dictionary of parameters.

    Returns:
        Estimator object.

    Raises:
        ValueError: If args["training_mode"] is not a supported mode.
    """
    training_mode = args["training_mode"]

    # Reject unknown modes before any graph or spec is built. Previously this
    # case only printed a message and then failed on an unbound `exporter`.
    valid_training_modes = (
        "reconstruction",
        "calculate_error_distribution_statistics",
        "tune_anomaly_thresholds")
    if training_mode not in valid_training_modes:
        raise ValueError(
            "{0} isn't a valid training mode!".format(training_mode))

    # Create our custom estimator using our model function
    estimator = tf.estimator.Estimator(
        model_fn=anomaly_detection,
        model_dir=args["output_dir"],
        params=dict(args))

    if training_mode == "reconstruction":
        # Calculate max_steps
        max_steps = int(
            args["reconstruction_epochs"] * args["train_examples"])
        max_steps = max_steps // args["train_batch_size"]
        max_steps += args["previous_train_steps"]

        # Create eval spec to read in our validation data
        eval_spec = tf.estimator.EvalSpec(
            input_fn=read_dataset(
                filename=args["eval_file_pattern"],
                mode=tf.estimator.ModeKeys.EVAL,
                batch_size=args["eval_batch_size"],
                params=args),
            steps=None,
            start_delay_secs=args["start_delay_secs"],  # start after N secs
            throttle_secs=args["throttle_secs"])  # evaluate every N secs

        if args["model_type"] == "pca":
            # Create train spec to read in our training data
            train_spec = tf.estimator.TrainSpec(
                input_fn=read_dataset(
                    filename=args["train_file_pattern"],
                    # EVAL mode reads through the train data once
                    mode=tf.estimator.ModeKeys.EVAL,
                    batch_size=args["train_batch_size"],
                    params=args),
                max_steps=max_steps)

            # Training is only skipped when autotuning is requested and both
            # numbers of principal components are already given
            if (not args["autotune_principal_components"] or
                    args["k_principal_components_time"] is None or
                    args["k_principal_components_feat"] is None):
                tf.estimator.train_and_evaluate(
                    estimator=estimator,
                    train_spec=train_spec,
                    eval_spec=eval_spec)
        else:  # dense_autoencoder or lstm_enc_dec_autoencoder
            # Create early stopping hook to help reduce overfitting
            early_stopping_hook = (
                tf.contrib.estimator.stop_if_no_decrease_hook(
                    estimator=estimator,
                    metric_name="rmse",
                    max_steps_without_decrease=100,
                    min_steps=1000,
                    run_every_secs=60,
                    run_every_steps=None))

            # Create train spec to read in our training data
            train_spec = tf.estimator.TrainSpec(
                input_fn=read_dataset(
                    filename=args["train_file_pattern"],
                    mode=tf.estimator.ModeKeys.TRAIN,
                    batch_size=args["train_batch_size"],
                    params=args),
                max_steps=max_steps,
                hooks=[early_stopping_hook])

            tf.estimator.train_and_evaluate(
                estimator=estimator,
                train_spec=train_spec,
                eval_spec=eval_spec)
    else:
        # Calculate max_steps
        max_steps = args["train_examples"] // args["train_batch_size"]
        max_steps += args["previous_train_steps"]

        # calculate_error_distribution_statistics:
        #   Get final mahalanobis statistics over the entire val_1 dataset
        # tune_anomaly_thresholds:
        #   Tune anomaly thresholds using val_2 and val_anom datasets
        train_spec = tf.estimator.TrainSpec(
            input_fn=read_dataset(
                filename=args["train_file_pattern"],
                # EVAL mode reads through the val data once
                mode=tf.estimator.ModeKeys.EVAL,
                batch_size=args["train_batch_size"],
                params=args),
            max_steps=max_steps)

        if training_mode == "calculate_error_distribution_statistics":
            # Don't create exporter for serving yet since anomaly thresholds
            # aren't trained yet
            exporter = None
        else:  # tune_anomaly_thresholds
            # Create exporter that uses serving_input_fn to create
            # saved_model for serving
            exporter = tf.estimator.LatestExporter(
                name="exporter",
                serving_input_receiver_fn=lambda: serving_input_fn(
                    args["feat_names"], args["seq_len"]))

        # Create eval spec to read in our validation data and export model
        eval_spec = tf.estimator.EvalSpec(
            input_fn=read_dataset(
                filename=args["eval_file_pattern"],
                mode=tf.estimator.ModeKeys.EVAL,
                batch_size=args["eval_batch_size"],
                params=args),
            steps=None,
            exporters=exporter,
            start_delay_secs=args["start_delay_secs"],  # start after N secs
            throttle_secs=args["throttle_secs"])  # evaluate every N secs

        tf.estimator.train_and_evaluate(
            estimator=estimator,
            train_spec=train_spec,
            eval_spec=eval_spec)

    # Hand the Estimator back as the docstring promises
    return estimator
In [ ]:
%%writefile anomaly_detection_module/trainer/task.py
import argparse
import json
import os
from .model import train_and_evaluate
def _str_to_bool(value):
    """Returns True when a command line string spells an affirmative value."""
    return value.lower() in ("yes", "true", "t", "y", "1")


def _split_and_cast(value, cast):
    """Splits a comma separated string and casts every piece with `cast`."""
    return [cast(piece) for piece in value.split(",")]


def _build_argument_parser():
    """Creates the argument parser holding every trainer command line flag."""
    argument_specs = [
        # File arguments
        ("--train_file_pattern", dict(
            help="GCS location to read training data.",
            required=True)),
        ("--eval_file_pattern", dict(
            help="GCS location to read evaluation data.",
            required=True)),
        ("--output_dir", dict(
            help="GCS location to write checkpoints and export models.",
            required=True)),
        ("--job-dir", dict(
            help="This model ignores this field, but it is required by gcloud.",
            default="junk")),
        # Sequence shape hyperparameters
        ("--seq_len", dict(
            help="Number of timesteps to include in each example.",
            type=int,
            default=30)),
        ("--num_feat", dict(
            help="Number of features for each example.",
            type=int,
            default=5)),
        # Feature hyperparameters
        ("--feat_names", dict(
            help="Names of features.",
            type=str,
            required=True)),
        ("--feat_defaults", dict(
            help="Default values of features.",
            type=str,
            required=True)),
        # Training parameters
        ("--train_batch_size", dict(
            help="Number of examples in training batch.",
            type=int,
            default=32)),
        ("--eval_batch_size", dict(
            help="Number of examples in evaluation batch.",
            type=int,
            default=32)),
        ("--previous_train_steps", dict(
            help="Number of batches previously train in other stages.",
            type=int,
            default=0)),
        ("--reconstruction_epochs", dict(
            help="Number of times to go through the reconstruction dataset",
            type=float,
            default=1.0)),
        ("--train_examples", dict(
            help="Number of examples in train file.",
            type=int,
            default=1024)),
        # Help text used to be a copy of the --train_examples one
        ("--eval_examples", dict(
            help="Number of examples in eval file.",
            type=int,
            default=1024)),
        ("--learning_rate", dict(
            help="How quickly or slowly we train our model by scaling the gradient.",
            type=float,
            default=0.1)),
        ("--start_delay_secs", dict(
            help="Number of seconds to wait before first evaluation.",
            type=int,
            default=60)),
        ("--throttle_secs", dict(
            help="Number of seconds to wait between evaluations.",
            type=int,
            default=120)),
        # Model hyperparameters
        # dense_autoencoder, lstm_enc_dec_autoencoder, pca
        ("--model_type", dict(
            help="Which model type we will use.",
            type=str,
            default="dense_autoencoder")),
        ## Dense Autoencoder
        ("--enc_dnn_hidden_units", dict(
            help="Hidden layer sizes to use for encoder DNN.",
            type=str,
            default="1024,256,64")),
        ("--latent_vector_size", dict(
            help="Number of neurons for latent vector between encoder and decoder.",
            type=int,
            default=8)),
        ("--dec_dnn_hidden_units", dict(
            help="Hidden layer sizes to use for decoder DNN.",
            type=str,
            default="64,256,1024")),
        ("--time_loss_weight", dict(
            help="Amount to weight the time based loss.",
            type=float,
            default=1.0)),
        ("--feat_loss_weight", dict(
            help="Amount to weight the features based loss.",
            type=float,
            default=1.0)),
        ## LSTM Encoder-Decoder Autoencoder
        ("--reverse_labels_sequence", dict(
            help="Whether we should reverse the labels sequence dimension or not.",
            type=str,
            default="True")),
        ("--enc_lstm_hidden_units", dict(
            help="Hidden layer sizes to use for LSTM encoder.",
            type=str,
            default="64,32,16")),
        ("--dec_lstm_hidden_units", dict(
            help="Hidden layer sizes to use for LSTM decoder.",
            type=str,
            default="16,32,64")),
        ("--lstm_dropout_output_keep_probs", dict(
            help="Keep probabilties for LSTM outputs.",
            type=str,
            default="1.0,1.0,1.0")),
        ("--dnn_hidden_units", dict(
            help="Hidden layer sizes to use for DNN.",
            type=str,
            default="1024,256,64")),
        ## PCA
        ("--autotune_principal_components", dict(
            help="Whether we should autotune the number of principal components.",
            type=str,
            default="False")),
        ("--k_principal_components_time", dict(
            help="Top time k principal components to keep after eigendecomposition.",
            type=int,
            default=None)),
        ("--k_principal_components_feat", dict(
            help="Top feat k principal components to keep after eigendecomposition.",
            type=int,
            default=None)),
        # Anomaly detection
        # reconstruction, calculate_error_distribution_statistics,
        # and tune_anomaly_thresholds
        ("--training_mode", dict(
            help="Which training mode we are in.",
            type=str,
            default="reconstruction")),
        ("--labeled_tune_thresh", dict(
            help="If we have a labeled dataset for supervised anomaly tuning.",
            type=str,
            default="True")),
        ("--num_time_anom_thresh", dict(
            help="Number of anomaly thresholds to evaluate in time dimension.",
            type=int,
            default=120)),
        ("--num_feat_anom_thresh", dict(
            help="Number of anomaly thresholds to evaluate in features dimension.",
            type=int,
            default=120)),
        ("--min_time_anom_thresh", dict(
            help="Minimum anomaly threshold to evaluate in time dimension.",
            type=float,
            default=100.0)),
        ("--max_time_anom_thresh", dict(
            help="Maximum anomaly threshold to evaluate in time dimension.",
            type=float,
            default=2000.0)),
        ("--min_feat_anom_thresh", dict(
            help="Minimum anomaly threshold to evaluate in features dimension.",
            type=float,
            default=100.0)),
        ("--max_feat_anom_thresh", dict(
            help="Maximum anomaly threshold to evaluate in features dimension.",
            type=float,
            default=2000.0)),
        ("--time_thresh_scl", dict(
            help="Max num of std devs for time mahalanobis distance to be normal.",
            type=float,
            default=2.0)),
        ("--feat_thresh_scl", dict(
            help="Max num of std devs for feature mahalanobis distance to be normal.",
            type=float,
            default=2.0)),
        ("--time_anom_thresh", dict(
            help="Anomaly threshold in time dimension.",
            type=float,
            default=None)),
        ("--feat_anom_thresh", dict(
            help="Anomaly threshold in features dimension.",
            type=float,
            default=None)),
        ("--eps", dict(
            help="Added to the cov matrix before inversion to avoid being singular.",
            type=str,
            default="1e-12")),
        ("--f_score_beta", dict(
            help="Value of beta of the f-beta score.",
            type=float,
            default=0.05)),
    ]

    parser = argparse.ArgumentParser()
    for flag, options in argument_specs:
        parser.add_argument(flag, **options)
    return parser


if __name__ == "__main__":
    # Parse all arguments
    args = _build_argument_parser().parse_args()
    arguments = args.__dict__

    # Unused args provided by service
    arguments.pop("job_dir", None)
    arguments.pop("job-dir", None)

    # Fix booleans, they arrive as strings from the command line
    for bool_key in ("reverse_labels_sequence",
                     "autotune_principal_components",
                     "labeled_tune_thresh"):
        arguments[bool_key] = _str_to_bool(arguments[bool_key])

    # Fix list arguments
    arguments["feat_names"] = arguments["feat_names"].split(",")
    # Every default is wrapped in its own single element list
    arguments["feat_defaults"] = [
        [item] for item in arguments["feat_defaults"].split(",")]

    ## Dense Autoencoder and LSTM Encoder-Decoder Autoencoder layer sizes
    for int_list_key in ("enc_dnn_hidden_units",
                         "dec_dnn_hidden_units",
                         "enc_lstm_hidden_units",
                         "dec_lstm_hidden_units",
                         "dnn_hidden_units"):
        arguments[int_list_key] = _split_and_cast(
            arguments[int_list_key], int)
    arguments["lstm_dropout_output_keep_probs"] = _split_and_cast(
        arguments["lstm_dropout_output_keep_probs"], float)

    # Fix eps argument
    arguments["eps"] = float(arguments["eps"])

    # If doing PCA, then add autotune PC key to dictionary
    # NOTE(review): this overrides whatever --autotune_principal_components
    # was passed for PCA models - confirm that this is intended.
    if arguments["model_type"] == "pca":
        arguments["autotune_principal_components"] = False

    # Append trial_id to path if we are doing hptuning
    # This code can be removed if you are not using hyperparameter tuning
    tf_config = json.loads(os.environ.get("TF_CONFIG", "{}"))
    arguments["output_dir"] = os.path.join(
        arguments["output_dir"],
        tf_config.get("task", {}).get("trial", ""))

    # Run the training job
    train_and_evaluate(arguments)
In [ ]: