In [18]:
%autosave 100


Autosaving every 100 seconds

Tuesday 28th of October 2014

Goals for the day:

  • Rewrite the clustering routine to run in parallel and potentially make use of grid engine
  • Search Stackoverflow, make a new post, or edit the code myself so that the sub-subplot generation automatically creates pyplot axes for visualizing several metrics of one cluster
  • Attempt to send jobs to remote servers from inside the notebook
  • Make an automated panel of phenotype profiles for a number of subject clusters

Make parallel clustering possible

Calcul Canada is using Moab, because why not. So we can submit jobs to Moab with the msub command, or submit them through Torque using the qsub command. Why Torque uses the same commands as SGE is beyond me, but in any case I will have to use system calls to spawn the jobs.
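
Something like this is what I have in mind for the system call. This is only a sketch: the job script name and the resource flags are placeholders, not the real pipeline.

# Sketch: submit a placeholder job script to Torque with a system call.
# 'run_linkage.sh' and the resource request are made up for illustration.
import subprocess

job_id = subprocess.check_output(
    ['qsub', '-l', 'nodes=1:ppn=8,walltime=02:00:00', 'run_linkage.sh'])
print(job_id.strip())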

Interestingly, I could do all of that through ssh from my workstation and save the trouble of logging into the clusters directly. This is really nice because then I don't need to start up putty or my vbox. The -t option to ssh even allows me to stream the terminal output.

Send commands to the remote machine through ssh


In [4]:
import subprocess

In [11]:
print(subprocess.check_output(['ssh', '-t', 'gui', 'ls', '-lht']))


total 0
srw-rw---- 1 surchs gsf-624-02   0 Oct  7 04:14 abide
drwxr-xr-x 5 surchs gsf-624-02 512 Oct  7 04:06 code
drwxrwxr-x 3 surchs gsf-624-02 512 May 20 00:04 Projects
srw-rw---- 1 surchs gsf-624-02   0 May 16 09:27 piper
drwxrwxr-x 2 surchs gsf-624-02 512 Feb 12  2014 notes
-rw-rw-r-- 1 surchs gsf-624-02 113 Feb 12  2014 config
lrwxrwxrwx 1 surchs gsf-624-02  19 Feb 12  2014 scratch -> /gs/scratch/surchs/
lrwxrwxrwx 1 surchs gsf-624-02  32 Feb 12  2014 database -> /sb/project/gsf-624-aa/database/

Ok, cool that works well.

Now I need a python module that loads the abide data and then spawns a couple of parallel jobs to compute the linkage for each of the networks I loaded. For the scale 10 Cambridge template that would be 10 jobs, for scale 50 it would be 50, and so on. It's probably not really worth it, seeing that there can be at most 8 parallel jobs on a node, but I still want to do it.

Building a python module to handle parallel linkage computation


In [19]:
import numpy as np
import nibabel as nib
import multiprocessing as mp
import scipy.spatial.distance as dist
import scipy.cluster.hierarchy as clh

In [14]:
# Build the function
def link_network(data, network, method='ward', metric='euclidean'):
    """
    Create linkage based on maps of one network. If the data array
    is oriented the wrong way, this can potentially exceed the available
    memory because the distance matrix will be computed across subjects
    and not across voxels.
    """
    # Condensed pairwise distances between the maps of this network
    condensed = dist.pdist(data[..., network], metric)
    distance = dist.squareform(condensed)
    # Hierarchical clustering on the condensed distances
    linkage = clh.linkage(condensed, method=method)

    return distance, linkage
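
A quick sanity check on random data. I'm assuming a subjects x voxels x networks layout here, which is my guess at the orientation, not something the function enforces.

# Toy data: 20 subjects, 100 voxels, 10 networks (layout is an assumption)
toy = np.random.random((20, 100, 10))
d, l = link_network(toy, network=3)
print(d.shape)  # (20, 20) subject-by-subject distance matrix
print(l.shape)  # (19, 4) linkage matrix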

In [20]:
a = np.random.random((10,10))

In [25]:
b = mp.Array('d',a.flatten())

In [31]:
b.acquire()


Out[31]:
True

Dealing with shared memory inside the loop

When spawning multiple subprocesses on the node, I don't want to create copies of the input data because it is rather large. The problem seems to be that the multiprocessing array is an absolute bitch to work with: it only accepts 1-dimensional arrays, and I have no idea how to read the data back out in the first place.

Ok, here is a nice example. The same page also says that I probably don't have to worry about data being copied at all, because processes will be forked on Linux. I think I'll try that with a quick test.
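
Before the real test, here is the usual multiprocessing/numpy recipe for getting a 2-D view on a shared array (a sketch of the standard pattern, not my own code yet):

# Sketch: put the data into shared memory once and view it with numpy,
# so forked workers read the same buffer instead of a pickled copy.
import numpy as np
import multiprocessing as mp

a = np.random.random((10, 10))
shared = mp.Array('d', a.size)                        # flat, 1-D shared buffer
view = np.frombuffer(shared.get_obj()).reshape(a.shape)
view[:] = a                                           # fill it once in the parent

def worker(row):
    # rebuild the view inside the child; no data is copied
    local = np.frombuffer(shared.get_obj()).reshape(10, 10)
    return local[row].sum()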

Goal: To run the linkage function on multiple processes while keeping track of memory consumption (if this is possible).

Implementation: First I need to wrap my methods into importable packages. This is helpful in general and also the only way to run the memory profiler.
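
For reference, the line-by-line profiler can only trace functions that live in an importable module, so once brainbox is importable something like this should work (a sketch, not run here):

# Hypothetical: line-by-line memory profile of the linkage function,
# assuming brainbox is installed as an importable package
%mprun -f bb.dataOps.calc_link bb.dataOps.calc_link(data_dict, 0, method='euclidean', metric='stability_maps')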


In [7]:
# Imports
import brainbox as bb
import multiprocessing as mp

In [8]:
# Load extensions
%load_ext memory_profiler


The memory_profiler extension is already loaded. To reload it, use:
  %reload_ext memory_profiler

In [9]:
# Paths
debug_path = '/data1/abide/Test/Out/Debug/All'

In [10]:
%memit file_dict = bb.fileOps.grab_files(debug_path, '.nii.gz', 'stability_maps')


I will be pulling files from /data1/abide/Test/Out/Debug/All/stability_maps
I will be pulling files from /data1/abide/Test/Out/Debug/All/stability_maps
I will be pulling files from /data1/abide/Test/Out/Debug/All/stability_maps
I will be pulling files from /data1/abide/Test/Out/Debug/All/stability_maps
peak memory: 385.69 MiB, increment: 0.11 MiB

In [11]:
%memit data_dict = bb.fileOps.read_files(file_dict)


---------------------------------------------------------------------------
TypeError                                 Traceback (most recent call last)
<ipython-input-11-5a235fbca054> in <module>()
----> 1 get_ipython().magic(u'memit data_dict = bb.fileOps.read_files(file_dict, 100)')

/home/surchs/Enthought/Canopy_64bit/User/lib/python2.7/site-packages/IPython/core/interactiveshell.pyc in magic(self, arg_s)
   2203         magic_name, _, magic_arg_s = arg_s.partition(' ')
   2204         magic_name = magic_name.lstrip(prefilter.ESC_MAGIC)
-> 2205         return self.run_line_magic(magic_name, magic_arg_s)
   2206 
   2207     #-------------------------------------------------------------------------

/home/surchs/Enthought/Canopy_64bit/User/lib/python2.7/site-packages/IPython/core/interactiveshell.pyc in run_line_magic(self, magic_name, line)
   2124                 kwargs['local_ns'] = sys._getframe(stack_depth).f_locals
   2125             with self.builtin_trap:
-> 2126                 result = fn(*args,**kwargs)
   2127             return result
   2128 

/home/surchs/Enthought/Canopy_64bit/User/lib/python2.7/site-packages/memory_profiler.pyc in magic_memit(self, line)
    775         tmp = memory_usage((_func_exec, (stmt, self.shell.user_ns)),
    776                            timeout=timeout, interval=interval, max_usage=True,
--> 777                            include_children=include_children)
    778         mem_usage = max(mem_usage, tmp[0])
    779 

/home/surchs/Enthought/Canopy_64bit/User/lib/python2.7/site-packages/memory_profiler.pyc in memory_usage(proc, interval, timeout, timestamps, include_children, max_usage, retval, stream)
    225             p.start()
    226             parent_conn.recv()  # wait until we start getting memory
--> 227             returned = f(*args, **kw)
    228             parent_conn.send(0)  # finish timing
    229             ret = parent_conn.recv()

/home/surchs/Enthought/Canopy_64bit/User/lib/python2.7/site-packages/memory_profiler.pyc in _func_exec(stmt, ns)
    715     # helper for magic_memit, just a function proxy for the exec
    716     # statement
--> 717     exec(stmt, ns)
    718 
    719 # a timeit-style %memit magic for IPython

<string> in <module>()

TypeError: read_files() takes exactly 1 argument (2 given)

In [6]:
def par_link(conf_tuple):
    data_dict, network, method, metric = conf_tuple
    distance, linkage = bb.dataOps.calc_link(data_dict, network, method=method, metric=metric)
    return distance, linkage

In [7]:
# Prepare conf
conf_list = list()
for i in range(10):
    conf_list.append((data_dict, i, 'euclidean', 'stability_maps'))

In [8]:
# Set up parallel run
def parallel_run(workers, conf_list):
    p = mp.Pool(workers)
    par_results = p.map(par_link, conf_list)
    # Shut the pool down so the worker processes don't linger
    p.close()
    p.join()
    return par_results

In [9]:
# Set up sequential run
def sequential_run(conf_list):
    seq_results = []
    for conf in conf_list:
        seq_results.append(par_link(conf))
    return seq_results

In [14]:
# Sequential
%memit s_results = sequential_run(conf_list)


peak memory: 417.61 MiB, increment: 32.21 MiB

In [15]:
%memit print('hello')


hello
hello
hello
hello
hello
hello
hello
peak memory: 385.49 MiB, increment: 0.02 MiB

In [16]:
# Parallel
%memit p_results = parallel_run(4, conf_list)


peak memory: 997.58 MiB, increment: 612.09 MiB

In [17]:
%memit print('hello')


hello
hello
hello
hello
hello
hello
hello
peak memory: 385.56 MiB, increment: 0.00 MiB

Ok, I think this is pretty convincing evidence that the parallel jobs copy the data around and don't just pass memory pointers. That's a problem, since the raw data for all subjects will be a couple of GB and then gets bounced around even more. So we should find a way to use shared memory between the processes.
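
One way around this, following the fork/copy-on-write idea from above: keep the big dictionary in a global that exists before the Pool is created and only send the small network index to each worker, so almost nothing gets pickled. A rough sketch (untested, reusing bb.dataOps.calc_link):

# Sketch: rely on fork() copy-on-write instead of pickling data_dict per task.
# The global is set before the Pool forks, so every worker inherits it.
_DATA = data_dict

def par_link_global(network):
    return bb.dataOps.calc_link(_DATA, network,
                                method='euclidean', metric='stability_maps')

pool = mp.Pool(4)
cow_results = pool.map(par_link_global, range(10))   # only ints travel to workers
pool.close()
pool.join()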


In [18]:
processes = [mp.Process(target=par_link, args=((data_dict, i, 'euclidean', 'stability_maps'),)) for i in xrange(10)]

In [19]:
processes


Out[19]:
[<Process(Process-46, initial)>,
 <Process(Process-47, initial)>,
 <Process(Process-48, initial)>,
 <Process(Process-49, initial)>,
 <Process(Process-50, initial)>,
 <Process(Process-51, initial)>,
 <Process(Process-52, initial)>,
 <Process(Process-53, initial)>,
 <Process(Process-54, initial)>,
 <Process(Process-55, initial)>]
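
If I go the raw Process route instead, I still have to start and join these, and the return value of a Process target is simply discarded, so the results would need to come back through a Queue. Roughly (untested):

# Sketch: run the processes and collect results through a queue,
# because whatever the target returns is thrown away.
out_q = mp.Queue()

def queued_link(conf_tuple, q):
    q.put(par_link(conf_tuple))

procs = [mp.Process(target=queued_link,
                    args=((data_dict, i, 'euclidean', 'stability_maps'), out_q))
         for i in range(10)]
for proc in procs:
    proc.start()
q_results = [out_q.get() for _ in procs]              # drain before joining
for proc in procs:
    proc.join()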

In [ ]: