Training Networks in Parallel via a Single Solver

Imagine the following scenarios:

You're experimenting with different networks on a given dataset: same data, different networks.

You're still trying to figure out the right topology (e.g. number of filters, number of conv. layers, etc.)

Ways to go about this:

1. Sequential training:

You configure your network, train it, decide what to do next, re-configure accordingly, then train again.

Pros: You make decisions based on previous results, so your experimentation is somewhat guided.

Cons: This is purely sequential, so narrowing things down can take a very long time. Also, for small networks you're not really utilizing the full potential of your resources (e.g. GPU RAM, idle GPU cores).

2. Run training processes in parallel:

You write a separate network definition for each experiment (e.g. trainval_XX.prototxt), configure each differently, and start a training process for each. The processes run in parallel on the same GPU.

Pros: You don't have to wait until one experiment finishes to launch the next. No resources are left idle.

Cons: All processes compete for the same resources, and there's a sweet spot between parallelism and the overhead of context switching. The bigger problem is redundancy, especially if all networks work off of the same data: each process has to do its own data copy from CPU to GPU RAM. This redundancy only affects training on the GPU; CPU-only training doesn't have this issue, it's already slow enough as it is.

Below we demonstrate training networks in Caffe in parallel off of the same data, without the redundant data-copy cost. The networks all work off of the same data but have different architectures/configurations, or they're all replicas and we're just looking at the effect of different random weight initializations. Either way, the point is to pay the cost of copying the data from CPU to GPU once for all networks.

We'll do this by merging the different network definitions into one and defining a single solver for all of them. All layers that are not data layers (e.g. conv, ip, pooling, and loss layers) need unique layer names and unique top names, as sketched below.
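
To make the renaming concrete, here's a minimal sketch of the idea, assuming caffe_pb2 NetParameter messages (this is illustrative only, not nideep's actual merge implementation): every non-data layer and its top/bottom blob names get a per-network suffix so nothing collides, while the data layers and their data/label blobs are kept once and shared.

from caffe.proto import caffe_pb2

def suffix_non_data_layers(net_param, net_idx):
    """Illustrative sketch: rename one network's non-data layers/blobs before merging.
    The suffix mirrors the convention assumed later in this notebook (_nidx_00, _nidx_01, ...).
    The actual merge also keeps only a single copy of the shared data layers (omitted here)."""
    suffix = '_nidx_%02d' % net_idx
    for l in net_param.layer:
        if l.type.lower() == 'data':
            continue  # data layers stay untouched and are shared across the merged tracks
        l.name += suffix
        for i in range(len(l.top)):
            l.top[i] += suffix
        for i in range(len(l.bottom)):
            if l.bottom[i] not in ('data', 'label'):  # keep references to the shared data blobs
                l.bottom[i] += suffix
    return net_param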

This example also verifies that training two networks in parallel leads to the same results as training them in sequence, i.e. networks that share a prototxt do not influence one another, since they share no learnable parameters and each track has its own loss.


In [3]:
import os
import numpy as np
import caffe
from caffe import layers as L, params as P
import sys
sys.path.insert(0, '../../') # pkg root
from nideep.proto.proto_utils import Parser
from nideep.nets.net_merge import merge_indep_net_spec
    
print("Done importing")


Done importing

Define two toy networks programmatically

Partially from the Caffe notebook example "Learning LeNet". The network definition is simplified to speed things up. We define two networks for demonstration purposes.


In [4]:
def lenet_simple(lmdb_train, lmdb_test, batch_size):
    # partially from the caffe notebook example "Learning LeNet"
    # source: http://nbviewer.jupyter.org/github/BVLC/caffe/blob/master/examples/01-learning-lenet.ipynb
    # network simplified to speed things up
    # our version of LeNet: a series of linear and simple nonlinear transformations
    if not os.path.isdir(lmdb_train):
        raise IOError("source for training data (%s) does not exist!" % (lmdb_train,))
    if not os.path.isdir(lmdb_test):
        raise IOError("source for test data (%s) does not exist!" % (lmdb_test,))
    
    n = caffe.NetSpec()
    
    n.data, n.label = \
    L.Data(batch_size=batch_size, backend=P.Data.LMDB,
           source=lmdb_train,
           transform_param=dict(scale=0.0039215684),  # 1/255: scale pixels to [0, 1]
           ntop=2,
           include=[dict(phase=caffe.TRAIN)]
           )
    # will fix naming of data layer blobs for test phase after defining remaining layers
    n.data_test, n.label_test = \
        L.Data(batch_size=batch_size, backend=P.Data.LMDB,
               source=lmdb_test,
               transform_param=dict(scale=0.0039215684),  # 1/255: scale pixels to [0, 1]
               ntop=2,
               include=[dict(phase=caffe.TEST)]
               )
    
    n.ip = L.InnerProduct(n.data, num_output=10, weight_filler=dict(type='xavier'))
    n.loss =  L.SoftmaxWithLoss(n.ip, n.label)
    
    n_proto = n.to_proto()
    
    # fix data layer for test phase
    for l in n_proto.layer:
        if l.type.lower() == 'data' and \
           [x.phase for x in l.include] == [caffe.TEST]:
            for t in list(l.top):
                l.top.remove(t)
                t = t.split('_test')[0]
                l.top.append(unicode(t))
            l.name = l.name.split('_test')[0]
    
    return n_proto

dir_dst = '../parallel_train/tmp/'
if not os.path.isdir(dir_dst):
    os.makedirs(dir_dst) # create subdirectory
# generate definitions of multiple networks
fpath_net0 = os.path.join(dir_dst, 'trainval0.prototxt') # will be generated
with open(fpath_net0, 'w') as f:
    f.write(str(lenet_simple(os.path.expanduser('~/data/mnist/mnist_train_lmdb'),
                             os.path.expanduser('~/data/mnist/mnist_test_lmdb'),
                             64)))

# define a 2nd network (coincidentally identical to first)
fpath_net1 = os.path.join(dir_dst, 'trainval1.prototxt') # will be generated
with open(fpath_net1, 'w') as f:
    f.write(str(lenet_simple(os.path.expanduser('~/data/mnist/mnist_train_lmdb'),
                             os.path.expanduser('~/data/mnist/mnist_test_lmdb'),
                             64)))

print("Done generating network prototxts.")


Done generating network prototxts.

We want to show that training in parallel leads to the same results as training in sequence. To do so we'll:

  1. define a function to save weights and biases in a network to file
  2. define a function to save the initial weights and biases of a network to file.
  3. define a function to save the weights and biases after a few learning steps.
  4. !!! define a function to merge two arbitrary network definitions into a single prototxt !!!
  5. define a function to overwrite the initial weights of a network, train for a while, then save the weights and biases

Note: This is not the ideal way of saving/loading weights in Caffe. Caffe offers a much better API for this kind of thing. We define these functions for demonstration and quick verification purposes.
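
For reference, a quick sketch of what using Caffe's own serialization could look like instead of the npz helpers below (illustrative only; not used in this notebook):

def save_and_restore_with_caffe_api(fpath_solver, fpath_weights):
    """Illustrative only: persist/restore parameters with Caffe's own serialization."""
    solver = caffe.SGDSolver(fpath_solver)
    solver.net.save(fpath_weights)       # write all learned parameters to a .caffemodel file
    solver.net.copy_from(fpath_weights)  # load them back into a net with matching layer names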


In [2]:
def save_param(net, key, dir_dst, suffix):
    """
    Save weights and biases in a network to file
    """
    w = np.copy(net.params[key][0].data)
    b = np.copy(net.params[key][1].data)
    fpath_dst = os.path.join(dir_dst, '%s_%s.npz' % (key, suffix))
    np.savez(fpath_dst, w=w, b=b)
    return fpath_dst
    
def save_init_param(fpath_solver, key, dir_dst, suffix):
    """
    Save the initial weights and biases of a network to file.
    """
    solver = caffe.SGDSolver(fpath_solver)
    return save_param(solver.net, key, dir_dst, suffix)

In [6]:
fpath_s0 = "../parallel_train/solver0.prototxt" # must exist
fpath_s1 = "../parallel_train/solver1.prototxt" # must exist

dir_dst = '../parallel_train/tmp/ip'
if not os.path.isdir(dir_dst):
    os.makedirs(dir_dst) # create subdirectory
fpath_ip0 = save_init_param(fpath_s0, 'ip', dir_dst, '0')
fpath_ip1 = save_init_param(fpath_s1, 'ip', dir_dst, '1')

print("Initial weights and biases of")
print('network \"0\" saved to: %s' % (fpath_ip0,))
print('network \"1\" saved to: %s' % (fpath_ip1,))


Initial weights and biases of
network "0" saved to: ../parallel_train/tmp/ip/ip_0.npz
network "1" saved to: ../parallel_train/tmp/ip/ip_1.npz

Perform n training steps and save intermediate values of weights and biases


In [7]:
def init_step_save_param(fpath_solver, fpath_ip, n, key, dir_dst, suffix):
    """
    Save the weights and biases after a few learning steps.
    """
    solver = caffe.SGDSolver(fpath_solver)
    
    # overwrite weights and biases
    ip = np.load(fpath_ip)
    solver.net.params[key][0].data[...] = np.copy(ip['w'])
    solver.net.params[key][1].data[...] = np.copy(ip['b'])
    
    solver.step(n)
    
    return save_param(solver.net, key, dir_dst, suffix)

n = 5 # no. of training steps
fpath_ip0_i = init_step_save_param(fpath_s0, fpath_ip0, n, 'ip', dir_dst, '0_i')
fpath_ip1_i = init_step_save_param(fpath_s1, fpath_ip1, n, 'ip', dir_dst, '1_i')

print("Intermediate weights and biases of")
print('network \"0\" saved to: %s' % (fpath_ip0_i,))
print('network \"1\" saved to: %s' % (fpath_ip0_i,))


Intermediate weights and biases of
network "0" saved to: ../parallel_train/tmp/ip/ip_0_i.npz
network "1" saved to: ../parallel_train/tmp/ip/ip_0_i.npz

Merge both networks into a single prototxt file


In [8]:
##################################################################
def merge_nets(fpath_net_1, fpath_net_2, fpath_dst):
    """
    Merge two arbitrary network definitions into a single prototxt.
    """
    n1 = Parser().from_net_params_file(fpath_net_1)
    n2 = Parser().from_net_params_file(fpath_net_2)
    n_str = merge_indep_net_spec([n1, n2])
    
    with open(fpath_dst, 'w') as f:
        f.write(n_str)
    return
##################################################################

fpath_m = "../parallel_train/tmp/m.prototxt" # will be created
merge_nets(fpath_net0, fpath_net1, fpath_m)
print("Merged network with independent tracks defined in %s" % (fpath_m))


Merged network with independent tracks defined in ../parallel_train/tmp/m.prototxt
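
To see how the merge avoids name collisions, the merged definition can be parsed back and its layer names listed. The cells below assume the merge suffixes each network's non-data layers with _nidx_00, _nidx_01, and so on; a quick check along those lines (using the protobuf text format directly) could be:

from caffe.proto import caffe_pb2
from google.protobuf import text_format

net_m = caffe_pb2.NetParameter()
with open(fpath_m) as f:
    text_format.Merge(f.read(), net_m)
# e.g. the shared data layers plus ip_nidx_00, loss_nidx_00, ip_nidx_01, ...
print([str(l.name) for l in net_m.layer])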

Train the merged network for the same number of iterations, then save the intermediate weights and biases


In [9]:
def init_merged_net_and_step_and_save(fpath_m, fpath_ip0, fpath_ip1, n, key, dir_dst, suffix):
    """
    Overwrite the initial weights of a network, train for a while, then save the weights and biases
    """
    solver = caffe.SGDSolver(fpath_m)
    
    # overwrite weights and biases
    ip0 = np.load(fpath_ip0)
    solver.net.params[key+'_nidx_00'][0].data[...] = np.copy(ip0['w'])
    solver.net.params[key+'_nidx_00'][1].data[...] = np.copy(ip0['b'])
    
    assert(np.all(solver.net.params[key+'_nidx_00'][0].data == ip0['w']))
    assert(np.all(solver.net.params[key+'_nidx_00'][1].data == ip0['b']))
    
    ip1 = np.load(fpath_ip1)
    solver.net.params[key+'_nidx_01'][0].data[...] = np.copy(ip1['w'])
    solver.net.params[key+'_nidx_01'][1].data[...] = np.copy(ip1['b'])
    
    assert(np.all(solver.net.params[key+'_nidx_01'][0].data == ip1['w']))
    assert(np.all(solver.net.params[key+'_nidx_01'][1].data == ip1['b']))
    
    # do some training
    solver.step(n)
    # check that training led to changed weights
    assert(not np.all(solver.net.params[key+'_nidx_00'][0].data == ip0['w']))
    assert(not np.all(solver.net.params[key+'_nidx_00'][1].data == ip0['b']))
    
    assert(not np.all(solver.net.params[key+'_nidx_01'][0].data == ip1['w']))
    assert(not np.all(solver.net.params[key+'_nidx_01'][1].data == ip1['b']))
    
    return [save_param(solver.net, key+'_nidx_00', dir_dst, suffix),
            save_param(solver.net, key+'_nidx_01', dir_dst, suffix)]


fpath_solver_m = "../parallel_train/solver_m.prototxt" # must exist
    
fpath_ip0m_i, fpath_ip1m_i = \
init_merged_net_and_step_and_save(fpath_solver_m, fpath_ip0, fpath_ip1, n, 'ip', dir_dst, 'm_i')

print("Intermediate weights and biases of merged network saved to")
print('%s and %s' % (fpath_ip0m_i, fpath_ip1m_i))


Intermediate weights and biases of merged network saved to
../parallel_train/tmp/ip/ip_nidx_00_m_i.npz and ../parallel_train/tmp/ip/ip_nidx_01_m_i.npz

Verify that the merged network arrived at the exact same weight and bias values as the sequential training of the sub-networks.


In [15]:
ip0m_i = np.load(fpath_ip0m_i)
ip0_i = np.load(fpath_ip0_i)

if np.all(ip0m_i['w']==ip0_i['w']):
    print("Weights are identical for sub network 0")
else:
    print('ip0m_i weights', ip0m_i['w'])
    print('ip0_i weights', ip0_i['w'])
    raise(ValueError, "Weights mismatch for sub network 0!")
    
if np.all(ip0m_i['b']==ip0_i['b']):
    print("Biases are identical for sub network 0")
else:
    raise(ValueError, "Bias mismatch for sub network 0!")

ip1m_i = np.load(fpath_ip1m_i)
ip1_i = np.load(fpath_ip1_i)

if np.all(ip1m_i['w']==ip1_i['w']):
    print("Weights are identical for sub network 1")
else:
    raise(ValueError, "Weights mismatch for sub network 1!")
    
if np.all(ip1m_i['b']==ip1_i['b']):
    print("Biases are identical for sub network 1")
else:
    raise(ValueError, "Bias mismatch for sub network 1!")

# sanity cross-check
assert(not np.all(ip0m_i['w']==ip1_i['w']))
assert(not np.all(ip1m_i['w']==ip0_i['w']))
assert(not np.all(ip0m_i['b']==ip1_i['b']))
assert(not np.all(ip1m_i['b']==ip0_i['b']))

# if we reach here, all's gone well
print("""\nTraining two networks in parallel by merging them into the same prototxt
    results in same results as training them sequentially""")


Weights are identical for sub network 0
Biases are identical for sub network 0
Weights are identical for sub network 1
Biases are identical for sub network 1

Training two networks in parallel by merging them into the same prototxt
    yields the same results as training them sequentially
