We start with some useful imports and constant definitions.
In [1]:
import sys
import os
import shutil
import warnings
warnings.filterwarnings('ignore')
from tensorflow import logging
logging.set_verbosity(logging.ERROR)
os.environ['TF_CPP_MIN_LOG_LEVEL'] = '3'
import matplotlib
%matplotlib inline
In [2]:
sys.path.append('../../..')
from batchflow import Pipeline, B, C, V, D, L
from batchflow.opensets import MNIST
from batchflow.models.tf import VGG7, VGG16
from batchflow.research import Research, Option, Results, RP
In [3]:
BATCH_SIZE=64
ITERATIONS=1000
TEST_EXECUTE_FREQ=100
In [4]:
def clear_previous_results(res_name):
    if os.path.exists(res_name):
        shutil.rmtree(res_name)
In the previous tutorial we learned how to use Research to run experiments multiple times with varying parameters.
First, we define a dataset to work with and a pipeline that reads it.
In [5]:
mnist = MNIST()
train_root = mnist.train.p.run_later(BATCH_SIZE, shuffle=True, n_epochs=None)
Then we define a grid of parameters whose nodes will be used to form separate experiments.
In [6]:
domain = Option('layout', ['cna', 'can']) * Option('bias', [True, False])
These parameters can be passed to the model's config using named expressions.
In [7]:
model_config = {
    'inputs/images/shape': B('image_shape'),
    'inputs/labels/classes': D('num_classes'),
    'inputs/labels/name': 'targets',
    'initial_block/inputs': 'images',
    'body/block/layout': C('layout'),
    'common/conv/use_bias': C('bias'),
}
After that we define a pipeline to run during our experiments. We initialise a pipeline variable 'loss' to store the loss on each iteration.
In [8]:
train_template = (Pipeline()
                  .init_variable('loss')
                  .init_model('dynamic', VGG7, 'conv', config=model_config)
                  .to_array()
                  .train_model('conv',
                               images=B('images'), labels=B('labels'),
                               fetches='loss', save_to=V('loss', mode='w'))
                  )
Each research is assigned a name and writes its results to a folder with that name. Names must be unique, so attempting to run a research under an existing name raises an error. In the cell below we clear the results of previous runs so that the research can be run multiple times. This is done solely for the purposes of this tutorial and should not be done in real work.
In [9]:
res_name = 'simple_research'
clear_previous_results(res_name)
Finally, we define a Research that runs the pipeline, substituting its parameters with different nodes of the grid, and saves the values of the 'loss' named expression to the results.
In [10]:
research = (Research()
            .add_pipeline(train_root + train_template, variables='loss')
            .init_domain(domain, n_reps=4))
research.run(n_iters=10, name=res_name, bar=True)
Out[10]:
16 experiments are run (4 grid nodes x 4 repetitions), each consisting of 10 iterations.
We can load the results of the research and see that the table has 160 entries (16 experiments x 10 iterations).
In [11]:
research.load_results().df.info()
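Loaded results can also be filtered by domain parameters. The call below is a sketch, assuming that load_results accepts config parameters as filter kwargs, the same way fold=0 is used near the end of this tutorial:

# A sketch (assuming domain parameters work as filters, as with fold=0 later on)
cna_results = research.load_results(layout='cna').df
print(len(cna_results))  # expected 80 rows: 2 bias values x 4 repetitions x 10 iterations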
Each experiment can be divided into two stages: a root stage that is roughly the same for all experiments (for example, data loading and preprocessing) and a branch stage that varies. If data loading and preprocessing take significant time, the batches generated by a single root stage can be fed to several branches that belong to different experiments.
For example, if you want to test 4 different models, and your workflow includes some complicated data preprocessing and augmentation that would otherwise be done separately for each model, you may want to do the preprocessing and augmentation once and feed the resulting batches to all 4 models.
The figure above shows the difference.
On the left, a simple workflow is shown: the same common preprocessing steps are performed 4 times, and the batches generated by different runs of the common stages differ due to shuffling and possible randomisation inside the common steps.
On the right, the common steps are performed once on the root stage and the very same batches are passed to the different branches. This reduces extra computation, but it also reduces variability, because all models get exactly the same pieces of data.
To perform the root-branch division, one should pass root and branch parameters to add_pipeline() and define the number of branches per root via the branches parameter of run().
A root with its corresponding branches is called a job. Note that different roots still produce different batches.
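To make the job arithmetic concrete, here is a quick sketch in plain Python (not a Research API call) of how many jobs the run below produces:

import math
n_experiments = 4 * 4           # 4 grid nodes x 4 repetitions
branches_per_root = 8
print(math.ceil(n_experiments / branches_per_root))  # 2 jobs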
One constraint when using branches is that branch pipelines do not calculate dataset variables properly, so we have to redefine model_config and train_template and hard-code the 'inputs/labels/classes' parameter.
In [12]:
model_config = {
    'inputs/images/shape': B('image_shape'),
    'inputs/labels/classes': 10,  # hard-coded: branch pipelines cannot compute D('num_classes')
    'inputs/labels/name': 'targets',
    'initial_block/inputs': 'images',
    'body/block/layout': C('layout'),
    'common/conv/use_bias': C('bias'),
}
train_template = (Pipeline()
                  .init_variable('loss')
                  .init_model('dynamic', VGG7, 'conv', config=model_config)
                  .to_array()
                  .train_model('conv',
                               images=B('images'), labels=B('labels'),
                               fetches='loss', save_to=V('loss', mode='w'))
                  )
res_name = 'no_extra_dataload_research'
clear_previous_results(res_name)
research = (Research()
            .add_pipeline(root=train_root, branch=train_template, variables='loss')
            .init_domain(domain, n_reps=4))
research.run(n_iters=10, branches=8, name=res_name, bar=True)
Out[12]:
Since every root is now assigned 8 branches, there are only 2 jobs.
We can see that the total research time is reduced. In this toy example we use only 10 iterations to make the effect of the reduced dataset load more visible.
The number of result entries is the same.
In [13]:
research.load_results().df.info()
If a job has several branches, they are all executed in parallel threads. To run a function on the root, one should add it with on_root=True.
Functions on root have required parameters iteration and experiments plus optional keyword parameters, and they are not allowed to return anything.
In [14]:
res_name = 'on_root_research'
clear_previous_results(res_name)
def function_on_root():
    print('on root')
research = (Research()
            .add_callable(function_on_root, execute="#0", on_root=True)
            .add_pipeline(root=train_root, branch=train_template, variables='loss')
            .init_domain(domain, n_reps=4)
            )
research.run(branches=8, n_iters=10, name=res_name, bar=True)
Out[14]:
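The example above takes no arguments. A function that actually uses the iteration and experiments parameters described above might look like the following sketch (the body is purely illustrative):

def verbose_function_on_root(iteration, experiments):
    # purely illustrative: report the current iteration and how many
    # experiments this job's branches are running
    print('iteration', iteration, 'with', len(experiments), 'experiments')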
Research can run experiments in parallel if the number of workers is defined via the workers parameter.
Each worker starts in a separate process and performs one or several jobs assigned to it. Moreover, if several GPUs are accessible, one can pass the indices of the GPUs to use via the devices parameter.
The following parameters are also useful to control research execution:

- timeout in run specifies the time in minutes after which a non-responding job is killed; the default value is 5.
- trials in run specifies the number of attempts to restart a job; the default is 2.
- dump in add_pipeline, add_callable and get_metrics tells how often results are written to disk and cleared. By default results are dumped on the last iteration, but if they consume too much memory one may want to do it more often. The format is the same as for execute (see the sketch below).
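For reference, the frequency specs used in this tutorial are an integer (execute every N iterations) and '#k' (execute only at iteration k). The sketch below combines them with 'last', which we assume matches the default dump-at-the-last-iteration behaviour described above:

# A sketch of execute/dump specs; 'last' is an assumption based on the default behaviour above
sketch_research = (Research()
                   .add_callable(function_on_root, execute='#0', on_root=True)  # iteration 0 only
                   .add_pipeline(train_root + train_template, variables='loss',
                                 execute=1,      # every iteration
                                 dump='last'))   # write results once, at the end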
In [15]:
model_config = {
    **model_config,
    'device': C('device'),  # a technical parameter for TFModel
}
test_root = mnist.test.p.run_later(BATCH_SIZE, shuffle=True, n_epochs=1)  # note n_epochs=1

test_template = (Pipeline()
                 .init_variable('predictions')
                 .init_variable('metrics')
                 .import_model('conv', C('import_from'))
                 .to_array()
                 .predict_model('conv',
                                images=B('images'), labels=B('labels'),
                                fetches='predictions', save_to=V('predictions'))
                 .gather_metrics('class', targets=B('labels'), predictions=V('predictions'),
                                 fmt='logits', axis=-1, save_to=V('metrics', mode='a')))
research = (Research()
            .add_pipeline(root=train_root, branch=train_template, variables='loss', name='train_ppl',
                          dump=TEST_EXECUTE_FREQ)
            .add_pipeline(root=test_root, branch=test_template, name='test_ppl',
                          execute=TEST_EXECUTE_FREQ, run=True, import_from=RP('train_ppl'))
            .get_metrics(pipeline='test_ppl', metrics_var='metrics', metrics_name='accuracy',
                         returns='accuracy',
                         execute=TEST_EXECUTE_FREQ,
                         dump=TEST_EXECUTE_FREQ)
            .init_domain(domain, n_reps=4))
res_name = 'faster_research'
clear_previous_results(res_name)
research.run(n_iters=ITERATIONS, name=res_name, bar=True,
             branches=2, workers=2, devices=[0, 1],
             timeout=2, trials=1)
Out[15]:
In [16]:
results = research.load_results().df
results.info()
One can easily perform cross-validation with Research.
First, we define a dataset: we will use the train subset of MNIST.
In [17]:
mnist_train = MNIST().train
mnist_train.cv_split(n_splits=3)
Next, we define our train and test pipelines. To perform cross-validation, you can define the train and test datasets as mnist_train.CV(C('fold')).train and mnist_train.CV(C('fold')).test, correspondingly.
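Outside of a research, a concrete fold can be selected by index; a minimal sketch, assuming CV(i) picks the i-th of the three splits created above:

# A minimal sketch (assuming .CV(i) selects the i-th split created by cv_split)
fold0_train = mnist_train.CV(0).train  # training part of fold 0
fold0_test = mnist_train.CV(0).test    # held-out part of fold 0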
In [18]:
model_config = {
    'inputs/images/shape': B('image_shape'),
    'inputs/labels/classes': D('num_classes'),
    'inputs/labels/name': 'targets',
    'initial_block/inputs': 'images',
    'body/block/layout': C('layout'),
}

train_template = (Pipeline()
                  .init_variable('train_loss')
                  .init_model('dynamic', VGG7, 'conv', config=model_config)
                  .to_array()
                  .train_model('conv',
                               images=B('images'), labels=B('labels'),
                               fetches='loss', save_to=V('train_loss', mode='w'))
                  .run_later(BATCH_SIZE, shuffle=True, n_epochs=None)) << mnist_train.CV(C('fold')).train

test_template = (Pipeline()
                 .init_variable('predictions')
                 .init_variable('metrics')
                 .import_model('conv', C('import_from'))
                 .to_array()
                 .predict_model('conv',
                                images=B('images'), labels=B('labels'),
                                fetches='predictions', save_to=V('predictions'))
                 .gather_metrics('class', targets=B('labels'), predictions=V('predictions'),
                                 fmt='logits', axis=-1, save_to=V('metrics', mode='a'))
                 .run_later(BATCH_SIZE, shuffle=True, n_epochs=1)) << mnist_train.CV(C('fold')).test
Then multiply your Domain by Option('fold', [0, 1, 2]).
In [19]:
domain = Option('layout', ['cna', 'can']) * Option('fold', [0, 1, 2])

research = (Research()
            .add_pipeline(train_template, dataset=mnist_train, variables='train_loss', name='train_ppl')
            .add_pipeline(test_template, dataset=mnist_train, name='test_ppl',
                          execute=TEST_EXECUTE_FREQ, run=True, import_from=RP('train_ppl'))
            .get_metrics(pipeline='test_ppl', metrics_var='metrics', metrics_name='accuracy',
                         returns='accuracy', execute=TEST_EXECUTE_FREQ)
            .init_domain(domain))
res_name = 'cv_research'
clear_previous_results(res_name)
research.run(n_iters=ITERATIONS, name=res_name, bar=True, workers=1, devices=[0])
Out[19]:
We can now load the results, specifying which folds to get if needed.
In [20]:
results = research.load_results(fold=0).df
results.sample(5)
Out[20]:
In [21]:
from matplotlib import pyplot as plt

test_results = Results(path='cv_research', names='test_ppl_metrics',
                       concat_config=True, drop_columns=False).df

fig, ax = plt.subplots(1, 2, figsize=(15, 5))
for i, (config, df) in enumerate(test_results.groupby('config')):
    x, y = i // 2, i % 2
    df.pivot(index='iteration', columns='fold', values='accuracy').plot(ax=ax[y])
    ax[y].set_title(config)
    ax[y].set_xlabel('iteration')
    ax[y].set_ylabel('accuracy')
    ax[y].grid(True)
    ax[y].legend()