Benchmarking VDC Dataset with LMDB and TF 0.9.0


In [1]:
import matplotlib

import sys
sys.path.append('/opt/caffe/python')

from cStringIO import StringIO
from caffe.proto import caffe_pb2
from datetime import datetime
import lmdb, logging, math, os, time
import numpy as np
from PIL import Image
import tensorflow as tf

%matplotlib inline

In [2]:
print (tf.__version__)


0.9.0

In [3]:
lmdb_dir = '/data/VDC'

Using LMDB


In [4]:
class DBEntries(object):
    '''Handle lmdb entries.
    
    Attributes:
        begin: Int; index into entries array
        epoch: Int; number of epochs traversed
        total_entries: Int; number of entries in the database
        entries: np.array; parsed entries of the database
    '''
    
    def __init__(self, loc, shuffle=True):
        '''Initialize new DBEntries instance.
        
        Args:
            loc: String; path to database
            shuffle: Bool; shuffle entries (True) or keep original order (False)
        '''
        
        self.begin = 0
        self.epoch = 0
        self.__shuffle = shuffle
        
        # Keep map size to a maximum of 1GB (1024**3 bytes)
        self._db = lmdb.open(loc, map_size=1024**3, readonly=True, lock=False)
        
        with self._db.begin() as txn:
            self.total_entries = txn.stat()['entries']
            self.entries = np.empty(shape=[self.total_entries], dtype=tuple)
            
            index = 0
            cursor = txn.cursor()
            for key, value in cursor:
                datum = caffe_pb2.Datum()
                datum.ParseFromString(value)
                self.entries[index] = (Image.open(StringIO(datum.data)), datum.label)
                
                index += 1
                
        self.__shuffle_entries()
                
    def __shuffle_entries(self):
        if self.__shuffle: np.random.shuffle(self.entries)
                
    def __update(self, num):
        '''Update the self.begin index, advancing the epoch counter and reshuffling when the database wraps around.
        
        Args:
            num: Int; the candidate new value for self.begin
        '''
        
        if num >= len(self.entries):
            num = 0
            self.epoch += 1
            self.__shuffle_entries()
        
        self.begin = num
                
    def get_batch(self, batch_size):
        '''Get batch_size entries.
        
        Args:
            batch_size: Int; number of entries to retrieve
            
        Returns:
            data: np.array; array of batch_size entry objects
            labels: np.array; array of labels of entry objects
        '''
        
        end = self.begin + batch_size
        tmp = self.entries[self.begin:end] if end < len(self.entries) else self.entries[self.begin:]
        
        images = []
        labels = []
        for img, lbl in tmp:
            images.append(np.asarray(img))
            labels.append(lbl)

        self.__update(end)
        return np.stack(images), np.array(labels)
    
    def reset(self):
        '''Reset begin index and epoch count and reshuffle entries.'''
        
        self.begin = 0
        self.epoch = 0
        self.__shuffle_entries()
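
A minimal usage sketch of DBEntries (illustrative only, not run as part of the benchmark; it assumes an LMDB database exists at lmdb_dir + '/train_db'): get_batch() returns (images, labels) arrays and advances the internal cursor, and the epoch counter increments (with a reshuffle) once the database wraps around.

entries = DBEntries(os.path.join(lmdb_dir, 'train_db'))
while entries.epoch < 1:
    images, labels = entries.get_batch(100)   # e.g. (100, 256, 256, 3), (100,); the last batch may be smaller
entries.reset()                                # rewind and reshuffle for reuse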

In [5]:
def create_db_entries(path):
    '''Create a DBEntries if database at path exists.

    Args:
        path: String; the path to the database

    Returns:
        DBEntries instance if the database at path exists, None otherwise
    '''

    try:
        reader = DBEntries(path)
    except lmdb.Error:
        reader = None

    return reader

class Database(object):
    '''Handle entry generation from lmdb databases created in DIGITS.
    
    Attributes:
        train: np.array; parsed caffe_pb2.Datum entries for training
        valid: np.array; parsed caffe_pb2.Datum entries for validation
        test: np.array; parsed caffe_pb2.Datum entries for testing
        labels: List[String]; ordered labels of files in the databases
    '''
    
    def __init__(self, root_dir):
        '''Initialize new Database instance.
        
        Args:
            root_dir: String; path to root directory of training, validation, and (optional) testing databases
        '''
        
        # Parsed database entries
        self.train = create_db_entries(os.path.join(root_dir, 'train_db'))
        self.valid = create_db_entries(os.path.join(root_dir, 'val_db'))
        self.test = create_db_entries(os.path.join(root_dir, 'test_db'))
        
        # Mean image
        self.mean = np.asarray(Image.open(os.path.join(root_dir, 'mean.jpg')))
        self.dims = self.mean.shape
        
        # Get label mappings
        with open(os.path.join(root_dir, 'labels.txt'), 'r') as lbl_file:
            self.labels = [x[:-1] for x in lbl_file.readlines()]   # Remove newlines
            
    def reset(self):
        '''Reset database entry handlers.'''
        
        if self.train is not None: self.train.reset()
        if self.valid is not None: self.valid.reset()
        if self.test is not None: self.test.reset()

In [6]:
start = time.time()
db = Database(lmdb_dir)
end = time.time()
print('Create DB in %.3f seconds' % (end - start))
print(db.train.entries.shape, db.valid.entries.shape, db.test, db.mean.shape, db.dims)


Create DB in 18.766 seconds
((61526,), (20485,), None, (256, 256, 3), (256, 256, 3))

Model


In [7]:
def weights(shape, std, name=None):
    '''Create a weights variable.
    
    Args:
        shape: A list of ints
        std: A float; standard deviation of randomized filler values
        name: String; a name for the returned tensor
        
    Returns:
        A variable tensor
    '''
    return tf.Variable(tf.truncated_normal(shape=shape, dtype=tf.float32, stddev=std), name=name)

def biases(num, shape, name=None):
    '''Create a biases variable.
    
    Args:
        num: The constant value to fill the variable
        shape: A list of ints; specify the dimensions of returned tensor
        name: String; a name for the returned tensor
        
    Returns:
        A variable tensor
    '''
    return tf.Variable(tf.constant(num, dtype=tf.float32, shape=shape), name=name)

def pool(conv, name=None):
    '''Perform max pooling on conv.
    
    Args:
        conv: A 4-D tensor; the convolution on which to perform max pooling
        name: String; a name for the returned tensor
        
    Returns:
        Max pooled tensor
    '''
    return tf.nn.max_pool(conv, ksize=[1, 3, 3, 1], strides=[1, 2, 2, 1], padding='VALID', name=name)

def convolution(data, bias_val, weights_shape, stride, pad, name):
    '''Convolution layer of AlexNet.
    
    Args:
        data: A 4-D tensor; input on which to perform the convolution
        bias_val: A float; see biases() for more details
        weights_shape: A list of ints; 1-D of length 4; see weights() for more details
        stride: A list of ints; 1-D of length 4; The stride of the sliding window for each dimension of data
        pad: Type of padding to use; either SAME or VALID
        name: String; a name for the convolution layer
        
    Returns:
        A tensor; the result of the convolution layer
    '''
    
    with tf.name_scope(name) as scope:
        kern = weights(weights_shape, 1e-2, name + '_weights')
        conv = tf.nn.conv2d(data, kern, stride, padding=pad)
        bias = biases(bias_val, [weights_shape[-1]], name + '_biases')
        conv = tf.nn.relu(tf.nn.bias_add(conv, bias), name=scope)
        return conv
    
def fully_connected(data, bias_val, weights_shape, keep_prob, name):
    '''Fully Connected layer of AlexNet.
    
    Args:
        data: A 2-D tensor; input on which to perform the matrix multiplication
        bias_val: A float; see biases() for more details
        weights_shape: A list of ints; 1-D of length 2; see weights() for more details
        keep_prob: A 0-D tensor; probability of keeping activation during dropout
        name: String; a name for the fully connected layer
        
    Returns:
        A tensor; the result of the fully connected layer
    '''
    
    with tf.name_scope(name) as scope:
        weight = weights(weights_shape, 5e-3, name + '_weights')
        bias = biases(bias_val, [weights_shape[-1]], name + '_biases')
        relu = tf.nn.relu_layer(data, weight, bias)
        drop = tf.nn.dropout(relu, keep_prob, name=scope)
        return drop

In [8]:
def alexnet(images, keep_prob, out_chan):
    '''Create AlexNet Model.
    
    Based on the graph visualization from DIGITS.
    DIGITS runs LRN after convolutions 1 and 2; however, there is no
    GPU implementation available in TF, resulting in a massive
    performance decrease. According to the Stanford course
    [CS231n](http://cs231n.github.io/convolutional-networks/),
    LRN has minimal effect on the outcome, and is thus removed from this model.
    
    Args:
        images: Images tensor with shape [batch_size, 256, 256, 3]
        keep_prob: A 0-D tensor; see fully_connected() for more details
        out_chan: Dictionary; key: name of layer, val: number of output channels
        
    Returns:
        logits: Last tensor in fully connected layer
    '''
    
    # conv1
    conv1 = convolution(images, 0.0, [11, 11, 3, out_chan['conv1']], [1, 4, 4, 1], 'VALID', 'conv1')
    
    # norm1
    #norm1 = tf.nn.local_response_normalization(conv1, alpha=1e-4, beta=0.75, name='norm1')
    
    # pool1
    pool1 = pool(conv1, 'pool1')
    
    # conv2
    conv2 = convolution(pool1, 0.1, [5, 5, out_chan['conv1'], out_chan['conv2']], [1, 1, 1, 1], 'SAME', 'conv2')
    
    # norm2
    #norm2 = tf.nn.local_response_normalization(conv2, alpha=1e-4, beta=0.75, name='norm2')
    
    # pool2
    pool2 = pool(conv2, 'pool2')
    
    # conv3
    conv3 = convolution(pool2, 0.0, [3, 3, out_chan['conv2'], out_chan['conv3']], [1, 1, 1, 1], 'SAME', 'conv3')
    
    # conv4
    conv4 = convolution(conv3, 0.1, [3, 3, out_chan['conv3'], out_chan['conv4']], [1, 1, 1, 1], 'SAME', 'conv4')
    
    # conv5
    conv5 = convolution(conv4, 0.1, [3, 3, out_chan['conv4'], out_chan['conv5']], [1, 1, 1, 1], 'SAME', 'conv5')
    
    # pool5
    pool5 = pool(conv5, 'pool5')
    
    # Flatten each image to apply regression
    flat = tf.reshape(pool5, [-1, out_chan['flat']])
    
    # fc6
    fc6 = fully_connected(flat, 0.1, [out_chan['flat'], out_chan['fc6']], keep_prob, 'fc6')
    
    # fc7
    fc7 = fully_connected(fc6, 0.1, [out_chan['fc6'], out_chan['fc7']], keep_prob, 'fc7')
    
    # fc8
    # Similar to fully connected layers 6 and 7
    # above, but without relu or dropout
    with tf.name_scope('fc8') as scope:
        weight = weights([out_chan['fc7'], out_chan['fc8']], 1e-2, 'fc8_weights')
        bias = biases(0.0, [out_chan['fc8']], 'fc8_biases')
        fc8 = tf.nn.bias_add(tf.matmul(fc7, weight), bias, name=scope)
        
        return fc8
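
As a sanity check on the layer shapes above, here is a minimal sketch in plain Python (assuming the 227x227 input used during training) that walks the spatial dimension through the network and confirms that out_chan['flat'] should be 6 * 6 * 256:

def conv_out(size, kernel, stride, padding):
    '''Output spatial size of a convolution/pooling op under TF padding rules.'''
    if padding == 'VALID':
        return (size - kernel) // stride + 1
    return (size + stride - 1) // stride   # SAME: ceil(size / stride)

s = 227
s = conv_out(s, 11, 4, 'VALID')   # conv1 -> 55
s = conv_out(s, 3, 2, 'VALID')    # pool1 -> 27
s = conv_out(s, 5, 1, 'SAME')     # conv2 -> 27
s = conv_out(s, 3, 2, 'VALID')    # pool2 -> 13
s = conv_out(s, 3, 1, 'SAME')     # conv3/conv4/conv5 -> 13
s = conv_out(s, 3, 2, 'VALID')    # pool5 -> 6
print(s, s * s * 256)             # 6 9216, i.e. out_chan['flat'] = 6 * 6 * 256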

Train


In [9]:
def loss(logits, labels):
    '''Calculate loss from logits and true labels.
    
    Args:
        logits: Logits tensor, tf.float32 - [batch_size, num_categories]
        labels: True labels tensor, tf.int32 - [batch_size]
        
    Returns:
        loss: Loss tensor, tf.float32
    '''
    
    with tf.name_scope('loss') as scope:
        
        # Convert labels to 1-hot encodings
        batch_size = tf.size(labels)
        labels = tf.expand_dims(labels, 1)
        indices = tf.expand_dims(tf.range(0, batch_size), 1)
        concated = tf.concat(1, [indices, labels])
        onehot = tf.sparse_to_dense(concated, tf.pack([batch_size, num_categories]), 1.0, 0.0)

        cross_entropy = tf.nn.softmax_cross_entropy_with_logits(logits, onehot)
        return tf.reduce_mean(cross_entropy)

def training(loss, base_lr, decay_steps):
    '''Create an optimizer and apply gradients to all trainable variables.
    
    Args:
        loss: Loss tensor, from loss()
        base_lr: The base learning rate for gradient descent
        decay_steps: An integer; number of global steps before base_lr is dropped by decay rate
        
    Returns:
        train: The training Operation
    '''
    
    with tf.name_scope('training') as scope:
        
        # Set exponential decay of learning rate
        # Use 0.1 decay rate and staircase from DIGITS
        global_step = tf.Variable(0, trainable=False)
        learning_rate = tf.train.exponential_decay(base_lr, global_step, decay_steps, 0.1, staircase=True)

        optimizer = tf.train.GradientDescentOptimizer(learning_rate)
        return optimizer.minimize(loss, global_step=global_step), learning_rate

def evaluate(logits, labels):
    '''Evaluate the accuracy of logits in predicting labels.
    
    Should only be used with validation images and labels.
    
    Args:
        logits: Logits tensor, tf.float32 - [batch_size, num_categories]
        labels: True labels tensor, tf.int32 - [batch_size]
        
    Returns:
        accuracy: The accuracy in percentage of the logits
    '''
    
    with tf.name_scope('evaluate') as scope:
        
        correct = tf.nn.in_top_k(logits, labels, 1)
        num_true = tf.reduce_sum(tf.cast(correct, tf.int32))
        return tf.cast(num_true, tf.float32) * (100. / tf.cast(tf.size(labels), tf.float32))
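
The one-hot construction inside loss() can be opaque; a minimal NumPy sketch of the same transformation (using a hypothetical 3-category example, not part of the benchmark):

example_labels = np.array([2, 0, 1], dtype=np.int32)
example_categories = 3
onehot = np.zeros((example_labels.size, example_categories), dtype=np.float32)
onehot[np.arange(example_labels.size), example_labels] = 1.0
# onehot == [[0, 0, 1],
#            [1, 0, 0],
#            [0, 1, 0]]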

In [10]:
def init_logging(log_fname, name):
    '''Initialize logging to log_fname.
    
    Args:
        log_fname: Name of log file to save output
        name: Name of logger
        
    Returns:
        logger: A logging instance
    '''
    
    logger = logging.getLogger(name)
    logger.setLevel(logging.INFO)
    logger.propagate = False
    
    # Handler to append log file
    fh = logging.FileHandler(log_fname)
    fh.setLevel(logging.INFO)

    # Handler to print logs to console
    ch = logging.StreamHandler()
    ch.setLevel(logging.INFO)

    # Format messages
    formatter = logging.Formatter('%(message)s')
    fh.setFormatter(formatter)
    ch.setFormatter(formatter)

    # Set handlers
    logger.addHandler(fh)
    logger.addHandler(ch)
    
    return logger

In [11]:
def init_directory(dir_name):
    '''Create a new directory for the run.
    
    Args:
        dir_name: String; the name of the directory for the run
        
    Returns:
        String; new directory path
    '''
    
    new_dir_path = os.path.join(out_dir, dir_name)
    
    #os.makedirs(new_dir_path, exist_ok=True)   # python 3
    try:
        os.makedirs(new_dir_path)   # python 2
    except OSError:
        pass
    
    return new_dir_path

In [12]:
def train(run_str, batch_size, epochs, drop_prob, base_lr, chan_dict):
    '''Train AlexNet on VDC Dataset.
    
    Args:
        run_str: String; run id
        batch_size: Number of examples per batch
        epochs: The number of epochs to run the training
        drop_prob: Float in [0., 1.]; probability of keeping each activation during dropout
        base_lr: The base learning rate for gradient descent
        chan_dict: Dictionary; key: name of layer, val: number of output channels
        
    Returns:
        dur_per_batch: List[float]; Times to run each training batch
        dur_per_epoch: List[float]; Times to run each training epoch
    '''
    
    run_dir = init_directory(run_str)
    logger = init_logging(os.path.join(run_dir, run_str + '.log'), run_str + '_logger')
    steps_per_epoch = int(db.train.total_entries / batch_size) + 1
    steps_per_print = int(0.2 * steps_per_epoch)
    target_size = 227
    
    with tf.Graph().as_default():
        
        ### Placeholders
        
        # Placeholder for probability of keeping activations
        # during dropout in order to disable dropout with validation
        keep_prob = tf.placeholder('float')
        
        # Placeholders for inputs
        x = tf.placeholder(tf.float32, shape=[None, db.dims[0], db.dims[1], db.dims[2]])
        l = tf.placeholder(tf.int32, shape=[None])
        
        ### Process input images

        # Resize each image to target_size X target_size X 3
        x_in = tf.image.resize_images(x, target_size, target_size, align_corners=True)
        
        ### Training
        
        logits = alexnet(x_in, keep_prob, chan_dict)
        loss_val = loss(logits, l)
        train_op, lr = training(loss_val, base_lr, 10 * steps_per_epoch)
        
        ### Evaluation
        
        accuracy = evaluate(logits, l)
        
        ### Summaries
        
        # Training
        with tf.name_scope('train_sum') as scope:
            img_sum = tf.image_summary('images', x_in, collections=[scope])
            logits_sum = tf.histogram_summary('logits', logits, collections=[scope])
            loss_sum = tf.scalar_summary('loss', loss_val, collections=[scope])
            lr_sum = tf.scalar_summary('learning_rate', lr, collections=[scope])
            train_sum = tf.merge_all_summaries(key=scope)
        
        # Evaluation
        with tf.name_scope('eval_sum') as scope:
            acc_sum = tf.scalar_summary('accuracy', accuracy, collections=[scope])
            eval_sum = tf.merge_all_summaries(key=scope)
        
        ### Initialize session
        
        saver = tf.train.Saver()
        session = tf.Session()
        session.run(tf.initialize_all_variables())
        sum_writer = tf.train.SummaryWriter(run_dir, session.graph)
        coord = tf.train.Coordinator()
        threads = tf.train.start_queue_runners(sess=session, coord=coord)
        
        step = 0
        dur_spp = 0.0
        dur_spe = 0.0
        
        dur_per_batch = []
        dur_per_epoch = []

        try:
            cur_epoch = -1
            while db.train.epoch < epochs and not coord.should_stop():
                
                images, labels = db.train.get_batch(batch_size)
                
                start = time.time()
                feed = {keep_prob: drop_prob, x: images, l: labels}
                _, rate, t_loss, t_sum = session.run([train_op, lr, loss_val, train_sum], feed_dict=feed)
                dur = time.time() - start
                
                dur_spp += dur
                dur_spe += dur
                dur_per_batch.append(dur)
                
                if step % steps_per_print == 0:
                    
                    # Log training stats every steps_per_print steps
                    logger.info('%s: (%d/%d) :: TRAINING :: loss = %.5f, lr = %.5f, dur = %.3f seconds' %
                                (datetime.now(), db.train.epoch, step, t_loss, rate, dur_spp))
                    
                    dur_spp = 0.0
                    
                    # Log training summaries every steps_per_print steps
                    sum_writer.add_summary(t_sum, step)
                
                if cur_epoch != db.train.epoch:
                    
                    cur_epoch = db.train.epoch
                    images, labels = db.valid.get_batch(batch_size)
                    
                    feed = {keep_prob: 1.0, x: images, l: labels}
                    acc, v_loss, e_sum = session.run([accuracy, loss_val, eval_sum], feed_dict=feed)
                    
                    # Log evaluation stats every epoch
                    logger.info('%s: (%d/%d) :: VALIDATION :: loss = %.5f, accuracy = %.2f, dur = %.3f seconds' %
                                (datetime.now(), db.train.epoch, step, v_loss, acc, dur_spe))
                    
                    dur_per_epoch.append(dur_spe)
                    dur_spe = 0.0
                    
                    # Log evaluation summaries every epoch
                    sum_writer.add_summary(e_sum, step)
                    
                    # Save model every epoch
                    saver.save(session, os.path.join(run_dir, 'model_' + str(db.train.epoch) + '.ckpt'))
                    
                step += 1
                
        except tf.errors.OutOfRangeError:
            logger.info('Stopping on OutOfRangeError. This should not happen!')
            
        finally:
            logger.info('Total runtime: %.3f seconds' % sum(dur_per_batch))
            coord.request_stop()
            db.reset()
            
        coord.join(threads)
        session.close()
        
        return dur_per_batch, dur_per_epoch
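
train() also saves a checkpoint at the end of every epoch. A minimal restore sketch (not run in this notebook; the checkpoint path is hypothetical, and the graph must be rebuilt exactly as in train()) might look like:

with tf.Graph().as_default():
    keep_prob = tf.placeholder('float')
    x = tf.placeholder(tf.float32, shape=[None, db.dims[0], db.dims[1], db.dims[2]])
    x_in = tf.image.resize_images(x, 227, 227, align_corners=True)
    logits = alexnet(x_in, keep_prob, channels)

    saver = tf.train.Saver()
    with tf.Session() as sess:
        # Hypothetical path to a checkpoint written by train()
        saver.restore(sess, os.path.join(out_dir, 'run0', 'model_30.ckpt'))
        images, labels = db.valid.get_batch(100)
        preds = sess.run(tf.argmax(logits, 1), feed_dict={keep_prob: 1.0, x: images})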

Benchmarking Loop


In [13]:
def calc_mean_var_stddev(data):
    '''Calculate mean, variance, standard deviation of data.
    
    Args:
        data: List[Float]; statistics to analyze
        
    Returns:
        mean: Float; the average of the data
        var: Float; the variance of the data
        stddev: Float; the standard deviation of the data
    '''
    
    count = len(data)
    mean = sum(data) / count
    var = sum([(x - mean)**2 for x in data]) / count
    stddev = math.sqrt(var)
    
    return mean, var, stddev
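
A quick sanity check (illustrative only): calc_mean_var_stddev() computes the population variance, so it should agree with NumPy's defaults (ddof=0):

sample = [1.0, 2.0, 4.0]
print(calc_mean_var_stddev(sample))                       # (2.333..., 1.555..., 1.247...)
print(np.mean(sample), np.var(sample), np.std(sample))    # same values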

In [14]:
def benchmark(num_runs, batch_size, epochs, drop_prob, base_lr, chan_dict):
    '''Time how long it takes to train AlexNet model on VDC Dataset with TensorFlow.
    
    Args:
        num_runs: Int; number of runs
        batch_size: Number of examples per batch
        epochs: The number of epochs to run the training
        drop_prob: Float in [0., 1.]; probability of keeping each activation during dropout
        base_lr: The base learning rate for gradient descent
        chan_dict: Dictionary; key: name of layer, val: number of output channels
    '''
    
    dur_total = []
    
    logger = init_logging(os.path.join(out_dir, 'benchmarking.log'), 'benchmark_logger')
    logger.info('%s: Begin benchmarking AlexNet with TensorFlow' % (datetime.now()))
    
    try:
        for i in range(num_runs):
            dur_per_batch, dur_per_epoch = train('run' + str(i),
                                                 batch_size,
                                                 epochs,
                                                 drop_prob,
                                                 base_lr,
                                                 chan_dict)
            dur_total.append(sum(dur_per_batch))

            # Stats per run
            now = datetime.now()
            run_mean, run_var, run_stddev = calc_mean_var_stddev(dur_per_batch)
            logger.info('%s: Run %d :: %d steps, %.3f +/- %.3f sec / batch' %
                        (now, i, len(dur_per_batch), run_mean, run_stddev))
            run_mean, run_var, run_stddev = calc_mean_var_stddev(dur_per_epoch)
            logger.info('%s: Run %d :: %d epochs, %.3f +/- %.3f sec / epoch' %
                        (now, i, len(dur_per_epoch), run_mean, run_stddev))

        # Total stats
        mean, var, stddev = calc_mean_var_stddev(dur_total)
        logger.info('%s: %d Runs :: %.3f +/- %.3f sec / run' % (datetime.now(), num_runs, mean, stddev))
        logger.info('batch_size: %d, epochs: %d, drop_prob: %.3f, base_lr: %.3f' %
                    (batch_size, epochs, drop_prob, base_lr))
        
    except Exception as e:
        logger.error('%s' % (e))
        raise e

In [15]:
out_dir = '/home/qtb9744/alexNet-TF-0.9/benchmarking'
num_categories = len(db.labels)

num_runs = 3
batch_size = 100
epochs = 30
drop_prob = 0.5
base_lr = 1e-2
channels = {
    'conv1': 96,
    'conv2': 256,
    'conv3': 384,
    'conv4': 384,
    'conv5': 256,
    'flat': 6 * 6 * 256,
    'fc6': 4096,
    'fc7': 4096,
    'fc8': num_categories
}

In [ ]:
benchmark(num_runs, batch_size, epochs, drop_prob, base_lr, channels)