Advanced Research Module Usage

We start with some useful imports and constant definitions


In [1]:
import sys
import os
import shutil

import warnings
warnings.filterwarnings('ignore')

from tensorflow import logging
logging.set_verbosity(logging.ERROR)
os.environ['TF_CPP_MIN_LOG_LEVEL'] = '3'

import matplotlib
%matplotlib inline

In [2]:
sys.path.append('../../..')

from batchflow import Pipeline, B, C, V, D, L
from batchflow.opensets import MNIST
from batchflow.models.tf import VGG7, VGG16
from batchflow.research import Research, Option, Results, RP

In [3]:
BATCH_SIZE=64
ITERATIONS=1000
TEST_EXECUTE_FREQ=100

In [4]:
def clear_previous_results(res_name):
    if os.path.exists(res_name):
        shutil.rmtree(res_name)

Reducing Extra Dataset Loads

Running Research Sequentially

In the previous tutorial we learned how to use Research to run experiments multiple times and with varying parameters.

First, we define a dataset to work with and a pipeline that reads this dataset.


In [5]:
mnist = MNIST()
train_root = mnist.train.p.run_later(BATCH_SIZE, shuffle=True, n_epochs=None)

Then we define a grid of parameters whose nodes will be used to form separate experiments.


In [6]:
domain = Option('layout', ['cna', 'can']) * Option('bias', [True, False])
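
The product of the two Options above yields 2 × 2 = 4 grid nodes. As a plain-Python illustration of the resulting combinations (this sketch uses itertools rather than the Research API, purely to show what the nodes look like):

from itertools import product

# each dict mirrors one node of the domain defined above
for layout, bias in product(['cna', 'can'], [True, False]):
    print({'layout': layout, 'bias': bias})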

These parameters can be passed to the model's config using named expressions.


In [7]:
model_config={
    'inputs/images/shape': B('image_shape'),
    'inputs/labels/classes': D('num_classes'),
    'inputs/labels/name': 'targets',
    'initial_block/inputs': 'images',
    'body/block/layout': C('layout'),
    'common/conv/use_bias': C('bias'),
}

After that we define a pipeline to run during our experiments. We initialise a pipeline variable 'loss' to store the loss on each iteration.


In [8]:
train_template = (Pipeline()
            .init_variable('loss')
            .init_model('dynamic', VGG7, 'conv', config=model_config)
            .to_array()
            .train_model('conv', 
                         images=B('images'), labels=B('labels'),
                         fetches='loss', save_to=V('loss', mode='w'))
)

Each research is assigned a name and writes its results to a folder with that name. The names must be unique, so attempting to run a research with a name that already exists raises an error. In the cell below we clear the results of previous research runs so that the research can be run multiple times. This is done solely for the purposes of this tutorial and should not be done in real work.


In [9]:
res_name = 'simple_research'
clear_previous_results(res_name)

Finally, we define a Research that runs the pipeline, substituting its parameters with different nodes of the grid, and saves the value of the 'loss' named expression to the results.


In [10]:
research = (Research()
            .add_pipeline(train_root + train_template, variables='loss')
            .init_domain(domain, n_reps=4))

research.run(n_iters=10, name=res_name, bar=True)


Research simple_research is starting...
Domain updated: 0: 100%|██████████| 160/160.0 [04:36<00:00,  1.73s/it]
Out[10]:
<batchflow.research.research.Research at 0x7f0ad29e4c50>

16 experiments are run (4 grid nodes × 4 repetitions), each consisting of 10 iterations.

We can load the results of the research and see that the table has 160 entries (16 experiments × 10 iterations).


In [11]:
research.load_results().df.info()


<class 'pandas.core.frame.DataFrame'>
RangeIndex: 160 entries, 0 to 159
Data columns (total 8 columns):
name            160 non-null object
loss            160 non-null float64
iteration       160 non-null int64
sample_index    160 non-null object
layout          160 non-null object
bias            160 non-null object
repetition      160 non-null int64
update          160 non-null int64
dtypes: float64(1), int64(3), object(4)
memory usage: 10.1+ KB
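
Since load_results() exposes an ordinary pandas DataFrame via .df, standard pandas operations apply to it. For example, a small sketch (using only the columns listed above) that averages the loss over iterations and repetitions for each grid node:

df = research.load_results().df
# mean loss per parameter combination, averaged over iterations and repetitions
print(df.groupby(['layout', 'bias'])['loss'].mean())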

Branches: Reducing Data Loading and Preprocessing

Each experiment can be divided into 2 stages: a root stage that is roughly the same for all experiments (for example, data loading and preprocessing) and a branch stage that varies. If data loading and preprocessing take significant time, the batches generated by a single root stage can be used to feed several branches that belong to different experiments.

For example, if you want to test 4 different models, and your workflow includes some complicated data preprocessing and augmentation that is done separately for each model, you may want to do the preprocessing and augmentation once and feed the resulting batches of data to all 4 models.

The figure above shows the difference.

On the left, the simple workflow is shown. The same common preprocessing steps are performed 4 times, and the batches generated by different runs of the common stages also differ due to shuffling and possible randomisation inside the common steps.

On the right, the common steps are performed once in the root stage and the very same batches are passed to the different branches. This reduces extra computation, but it also reduces variability because all models get exactly the same pieces of data.

To perform the root-branch division, pass the root and branch parameters to add_pipeline() and define the number of branches per root via the branches parameter of run().

A root with corresponding branches is called a job. Note that different roots still produce different batches.

One constraint when using branches is that branch pipelines do not calculate dataset variables properly, so we have to redefine model_config and train_template and hard-code the 'inputs/labels/classes' parameter.


In [12]:
model_config={
    'inputs/images/shape': B('image_shape'),
    'inputs/labels/classes': 10,
    'inputs/labels/name': 'targets',
    'initial_block/inputs': 'images',
    'body/block/layout': C('layout'),
    'common/conv/use_bias': C('bias'),
}

train_template = (Pipeline()
            .init_variable('loss')
            .init_model('dynamic', VGG7, 'conv', config=model_config)
            .to_array()
            .train_model('conv', 
                         images=B('images'), labels=B('labels'),
                         fetches='loss', save_to=V('loss', mode='w'))
)

res_name = 'no_extra_dataload_research'
clear_previous_results(res_name)
    
research = (Research()
            .add_pipeline(root=train_root, branch=train_template, variables='loss')
            .init_domain(domain, n_reps=4))

research.run(n_iters=10, branches=8, name=res_name, bar=True)


Research no_extra_dataload_research is starting...
Domain updated: 0: 100%|██████████| 20/20.0 [01:46<00:00,  5.34s/it]
Out[12]:
<batchflow.research.research.Research at 0x7f0c5335fc88>

Since every root now serves 8 branches, there are only 2 jobs (16 experiments / 8 branches per job).

We can see that the overall research duration is reduced. In this toy example we use only 10 iterations to make the effect of the reduced dataset load more visible.

The number of result entries is the same.


In [13]:
research.load_results().df.info()


<class 'pandas.core.frame.DataFrame'>
RangeIndex: 160 entries, 0 to 159
Data columns (total 8 columns):
name            160 non-null object
loss            160 non-null float64
iteration       160 non-null int64
sample_index    160 non-null object
layout          160 non-null object
bias            160 non-null object
repetition      160 non-null int64
update          160 non-null int64
dtypes: float64(1), int64(3), object(4)
memory usage: 10.1+ KB

Functions on Root

If a job has several branches, they are all executed in parallel threads. To run a function on the root instead, add it with on_root=True.

Functions on root have the required parameters iteration and experiments, plus optional keyword parameters. They are not allowed to return anything.


In [14]:
res_name = 'on_root_research'
clear_previous_results(res_name)

def function_on_root():
    print('on root')
    
research = (Research()
            .add_callable(function_on_root, execute="#0", on_root=True)
            .add_pipeline(root=train_root, branch=train_template, variables='loss')
            .init_domain(domain, n_reps=4)
           )

research.run(branches=8, n_iters=10, name=res_name, bar=True)


Research on_root_research is starting...
Domain updated: 0: : 0it [00:00, ?it/s]
on root
Domain updated: 0:  50%|█████     | 10/20.0 [00:55<00:55,  5.57s/it]
on root
Domain updated: 0: 100%|██████████| 20/20.0 [01:45<00:00,  5.30s/it]
Out[14]:
<batchflow.research.research.Research at 0x7f0c4c43f198>

Improving Performance

Research can run experiments in parallel if the number of workers is defined via the workers parameter. Each worker starts in a separate process and performs one or several jobs assigned to it. Moreover, if several GPUs are accessible, one can pass the indices of the GPUs to use via the devices parameter.

The following parameters are also useful to control research execution:

  • timeout in run specifies the time in minutes after which a non-responding job is killed; the default value is 5
  • trials in run specifies the number of attempts to restart a job; the default is 2
  • dump in add_pipeline, add_callable and get_metrics tells how often results are written to disk and cleared from memory. By default results are dumped on the last iteration, but if they consume too much memory one may want to do it more often. The format is the same as for execute

In [15]:
model_config={
    **model_config,
    'device': C('device'),  # a technical parameter of TFModel
}

test_root = mnist.test.p.run_later(BATCH_SIZE, shuffle=True, n_epochs=1)  # note n_epochs=1

test_template = (Pipeline()
                 .init_variable('predictions')
                 .init_variable('metrics')
                 .import_model('conv', C('import_from'))
                 .to_array()
                 .predict_model('conv', 
                                images=B('images'), labels=B('labels'),
                                fetches='predictions', save_to=V('predictions'))
                 .gather_metrics('class', targets=B('labels'), predictions=V('predictions'), 
                                fmt='logits', axis=-1, save_to=V('metrics', mode='a')))

research = (Research()
            .add_pipeline(root=train_root, branch=train_template, variables='loss', name='train_ppl',
                          dump=TEST_EXECUTE_FREQ)
            .add_pipeline(root=test_root, branch=test_template, name='test_ppl',
                         execute=TEST_EXECUTE_FREQ, run=True, import_from=RP('train_ppl'))
            .get_metrics(pipeline='test_ppl', metrics_var='metrics', metrics_name='accuracy',
                         returns='accuracy', 
                         execute=TEST_EXECUTE_FREQ,
                         dump=TEST_EXECUTE_FREQ)
            .init_domain(domain, n_reps=4))

res_name = 'faster_research'
clear_previous_results(res_name)

research.run(n_iters=ITERATIONS, name=res_name, bar=True, 
             branches=2, workers=2, devices=[0, 1], 
             timeout=2, trials=1)


Research faster_research is starting...
Domain updated: 0: 100%|██████████| 8000/8000.0 [14:53<00:00,  8.95it/s]
Out[15]:
<batchflow.research.research.Research at 0x7f0ad26614a8>

In [16]:
results = research.load_results().df
results.info()


<class 'pandas.core.frame.DataFrame'>
RangeIndex: 16160 entries, 0 to 16159
Data columns (total 9 columns):
name            16160 non-null object
loss            16000 non-null float64
accuracy        160 non-null float64
iteration       16160 non-null int64
sample_index    16160 non-null object
layout          16160 non-null object
bias            16160 non-null object
repetition      16160 non-null int64
update          16160 non-null int64
dtypes: float64(2), int64(3), object(4)
memory usage: 1.1+ MB
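
The accuracy column is filled only on the iterations where the test pipeline and get_metrics were executed (every TEST_EXECUTE_FREQ iterations), which is why it contains 160 non-null values against 16000 for loss. A small sketch of extracting just those rows with plain pandas:

# keep only the rows produced by get_metrics
accuracy_rows = results.dropna(subset=['accuracy'])
print(accuracy_rows[['layout', 'bias', 'iteration', 'accuracy']].head())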

Cross-validation

One can easily perform cross-validation with Research.

First, we define a dataset: we will use the train subset of MNIST.


In [17]:
mnist_train = MNIST().train
mnist_train.cv_split(n_splits=3)
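
cv_split creates 3 folds. In the research below the current fold is selected with the named expression C('fold'), which resolves to 0, 1 or 2; outside a research, the same subsets can be addressed with a plain index, for instance (a minimal sketch mirroring the CV usage shown below):

fold = mnist_train.CV(0)   # the first of the three folds
train_part = fold.train    # used to train the model on this fold
test_part = fold.test      # held-out part used for evaluation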

Next, we define our train and test pipelines. To perform cross-validation, you can define the train and test datasets as mnist_train.CV(C('fold')).train and mnist_train.CV(C('fold')).test, respectively.


In [18]:
model_config={
    'inputs/images/shape': B('image_shape'),
    'inputs/labels/classes': D('num_classes'),
    'inputs/labels/name': 'targets',
    'initial_block/inputs': 'images',
    'body/block/layout': C('layout'),
}

train_template = (Pipeline()
            .init_variable('train_loss')
            .init_model('dynamic', VGG7, 'conv', config=model_config)
            .to_array()
            .train_model('conv', 
                         images=B('images'), labels=B('labels'),
                         fetches='loss', save_to=V('train_loss', mode='w'))
            .run_later(BATCH_SIZE, shuffle=True, n_epochs=None)) << mnist_train.CV(C('fold')).train

test_template = (Pipeline()
                 .init_variable('predictions')
                 .init_variable('metrics')
                 .import_model('conv', C('import_from'))
                 .to_array()
                 .predict_model('conv', 
                                images=B('images'), labels=B('labels'),
                                fetches='predictions', save_to=V('predictions'))
                 .gather_metrics('class', targets=B('labels'), predictions=V('predictions'), 
                                fmt='logits', axis=-1, save_to=V('metrics', mode='a'))
                 .run_later(BATCH_SIZE, shuffle=True, n_epochs=1))  << mnist_train.CV(C('fold')).test

Then multiply your Domain by Option('fold', [0, 1, 2]).


In [19]:
domain = Option('layout', ['cna', 'can']) * Option('fold', [0, 1, 2])

research = (Research()
            .add_pipeline(train_template, dataset=mnist_train, variables='train_loss', name='train_ppl')
            .add_pipeline(test_template, dataset=mnist_train, name='test_ppl',
                         execute=TEST_EXECUTE_FREQ, run=True, import_from=RP('train_ppl'))
            .get_metrics(pipeline='test_ppl', metrics_var='metrics', metrics_name='accuracy', returns='accuracy', 
                         execute=TEST_EXECUTE_FREQ)
            .init_domain(domain))

res_name = 'cv_research'
clear_previous_results(res_name)

research.run(n_iters=ITERATIONS, name=res_name, bar=True, workers=1, devices=[0])


Research cv_research is starting...
Domain updated: 0: 100%|██████████| 6000/6000.0 [35:10<00:00,  2.84it/s] 
Out[19]:
<batchflow.research.research.Research at 0x7f0ad2668dd8>

We can now load the results, specifying which folds to get if needed.


In [20]:
results = research.load_results(fold=0).df
results.sample(5)


Out[20]:
name train_loss accuracy iteration sample_index layout fold repetition update
398 train_ppl 0.058208 NaN 398 3247113719 cna 0 0 0
1932 train_ppl 0.074481 NaN 922 1546370423 can 0 0 0
1567 train_ppl 0.032024 NaN 557 1546370423 can 0 0 0
1153 train_ppl 0.058229 NaN 143 1546370423 can 0 0 0
762 train_ppl 0.018407 NaN 762 3247113719 cna 0 0 0

In [21]:
from matplotlib import pyplot as plt
test_results = Results(path='cv_research', names='test_ppl_metrics',
                       concat_config=True, drop_columns=False).df

fig, ax = plt.subplots(1, 2, figsize=(15, 5))
for i, (config, df) in enumerate(test_results.groupby('config')):
    y = i % 2  # column index of the subplot
    df.pivot(index='iteration', columns='fold', values='accuracy').plot(ax=ax[y])
    ax[y].set_title(config)
    ax[y].set_xlabel('iteration')
    ax[y].set_ylabel('accuracy')
    ax[y].grid(True)
    ax[y].legend()