In [18]:
%autosave 100


Autosaving every 100 seconds

Tuesday 28th of October 2014

Goals for the day:

  • Rewrite the clustering routine to run in parallel and potentially make use of grid engine
  • Search Stack Overflow for sub-subplot generation (make a new post or edit the code myself) to automatically generate pyplot axes for visualizing several metrics for one cluster
  • Attempt to send jobs to remote servers from inside the notebook
  • Make an automated panel of phenotype profiles for a number of subject clusters

Make parallel clustering possible

Calcul Canada uses Moab, because why not. So we can submit jobs to Moab with the msub command, or go through Torque with the qsub command. Why Torque uses the same commands as SGE is beyond me, but in any case I will have to use system calls to spawn the jobs.

Interestingly, I could do all of that through ssh from my workstation and save myself the trouble of logging into the clusters directly. This is really nice because then I don't need to start up PuTTY or my vbox. The -t option to ssh even lets me stream the terminal output.
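A rough sketch of what such a call could look like, assuming a hypothetical submission script called run_linkage.pbs and the 'gui' ssh alias used below:

import subprocess

# Hypothetical job script sitting on the cluster; it still needs to be written
job_script = 'run_linkage.pbs'

# Submit through Torque over ssh; with Moab the same call would use msub
job_id = subprocess.check_output(['ssh', 'gui', 'qsub', job_script]).strip()
print(job_id)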

Send commands to the remote machine through ssh


In [4]:
import subprocess

In [11]:
print(subprocess.check_output(['ssh', '-t', 'gui', 'ls', '-lht']))


total 0
srw-rw---- 1 surchs gsf-624-02   0 Oct  7 04:14 abide
drwxr-xr-x 5 surchs gsf-624-02 512 Oct  7 04:06 code
drwxrwxr-x 3 surchs gsf-624-02 512 May 20 00:04 Projects
srw-rw---- 1 surchs gsf-624-02   0 May 16 09:27 piper
drwxrwxr-x 2 surchs gsf-624-02 512 Feb 12  2014 notes
-rw-rw-r-- 1 surchs gsf-624-02 113 Feb 12  2014 config
lrwxrwxrwx 1 surchs gsf-624-02  19 Feb 12  2014 scratch -> /gs/scratch/surchs/
lrwxrwxrwx 1 surchs gsf-624-02  32 Feb 12  2014 database -> /sb/project/gsf-624-aa/database/

Ok, cool, that works well.

Now I need a Python module that loads the ABIDE data and then spawns a couple of parallel jobs to compute the linkage for each of the networks that I loaded. For the scale 10 Cambridge template that would be 10 jobs, for scale 50 it would be 50, and so on. It's probably not really worth it, seeing that a node can run at most 8 parallel jobs, but I still want to do it.

Building a python module to handle parallel linkage computation


In [19]:
import numpy as np
import nibabel as nib
import multiprocessing as mp
import scipy.cluster.hierarchy as clh
import scipy.spatial.distance as dist

In [14]:
# Build the function
def link_network(data, network, method='euclidean', metric=None):
    """
    Create linkage based on maps of one network. If the data array
    is oriented the wrong way, this can potentially exceed the available
    memory because the distance matrix will be computed across subjects
    and not across voxels.
    """
    # Condensed pairwise distances between the maps of the requested network
    condensed = dist.pdist(data[metric][..., network], method)
    distance = dist.squareform(condensed)
    # Ward linkage on the condensed distances
    linkage = clh.linkage(condensed, method='ward')

    return distance, linkage

In [20]:
a = np.random.random((10,10))

In [25]:
b = mp.Array('d',a.flatten())

In [31]:
b.acquire()


Out[31]:
True

Dealing with shared memory inside the loop

When spawning multiple subprocesses on the node, I don't want to create copies of the input data because it is rather large. The problem is that the multiprocessing array is an absolute bitch to work with: it only accepts one-dimensional arrays, and I have no idea how to read the data back out in the first place.

Ok, here is a nice example. The same page also says that I probably don't have to worry about data being copied at all, because the processes will be forked on Linux. I think I'll try that with a quick test.
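Before the test, a minimal sketch of how the flat array from In [25] can at least be read back as a two-dimensional numpy view without copying:

import numpy as np
import multiprocessing as mp

a = np.random.random((10, 10))
b = mp.Array('d', a.flatten())   # flat, synchronized shared array

# np.frombuffer creates a float64 view on the underlying buffer, no copy involved
view = np.frombuffer(b.get_obj()).reshape(a.shape)
print(np.allclose(view, a))      # the view sees the same values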

Goal: To run the linkage function on multiple processes while keeping track of memory consumption (if this is possible).

Implementation: First I need to wrap my methods into importable packages. This is helpful in general and also the only way to run the memory profiler.
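For reference, the package layout this implies (module names taken from the calls below; the exact file structure is an assumption):

brainbox/
    __init__.py
    fileOps.py   # grab_files, read_files
    dataOps.py   # calc_link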


In [1]:
# Imports
import brainbox as bb
import multiprocessing as mp

In [2]:
# Load extensions
%load_ext memory_profiler

In [10]:
# Paths
debug_path = '/data1/abide/Test/Out/Debug/All'
real_path = '/data1/abide/Out/Remote/some_failed/out'

In [11]:
%memit file_dict = bb.fileOps.grab_files(real_path, '.nii.gz', 'stability_maps')


I will be pulling files from /data1/abide/Out/Remote/some_failed/out/stability_maps
I will be pulling files from /data1/abide/Out/Remote/some_failed/out/stability_maps
I will be pulling files from /data1/abide/Out/Remote/some_failed/out/stability_maps
peak memory: 453.38 MiB, increment: 0.00 MiB

In [12]:
%memit data_dict = bb.fileOps.read_files(file_dict, 2)


I found 607 files to load.
 100.0 % done 0.00 seconds to go.
We are done
peak memory: 1309.89 MiB, increment: 856.51 MiB

In [6]:
def par_link(conf_tuple):
    # Unpack one job configuration and compute the linkage for that network
    data_dict, network, method, metric = conf_tuple
    distance, linkage = bb.dataOps.calc_link(data_dict, network, method=method, metric=metric)
    return distance, linkage

In [15]:
data_dict['stability_maps'].shape


Out[15]:
(607, 176384)

In [7]:
# Prepare conf
conf_list = list()
for i in range(10):
    conf_list.append((data_dict, i, 'euclidean', 'stability_maps'))

In [8]:
# Set up parallel run
def parallel_run(workers, conf_list):
    p = mp.Pool(workers)
    par_results = p.map(par_link, conf_list)
    p.close()
    p.join()
    return par_results

In [9]:
# Set up sequential run
def sequential_run(conf_list):
    seq_results = []
    for conf in conf_list:
        seq_results.append(par_link(conf))
    return seq_results

In [14]:
# Sequential
%memit s_results = sequential_run(conf_list)


peak memory: 417.61 MiB, increment: 32.21 MiB

In [15]:
%memit print('hello')


hello
hello
hello
hello
hello
hello
hello
peak memory: 385.49 MiB, increment: 0.02 MiB

In [16]:
# Parallel
%memit p_results = parallel_run(4, conf_list)


peak memory: 997.58 MiB, increment: 612.09 MiB

In [17]:
%memit print('hello')


hello
hello
hello
hello
hello
hello
hello
peak memory: 385.56 MiB, increment: 0.00 MiB

Ok, I think this is pretty convincing evidence that the parallel jobs copy stuff around and don't just move memory pointers. That's a problem, since the raw data for all subjects will be a couple of gigabytes, and then it gets bounced around even more. So we should find a way to use shared memory between the processes.
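A minimal sketch of how that could look, assuming bb.dataOps.calc_link only ever accesses data_dict[metric] and that the workers are forked (the default on Linux); par_link_shared and shared_data are illustrative names:

import numpy as np
import multiprocessing as mp
import brainbox as bb

# Copy the big array into an unsynchronized shared buffer once, in the parent process.
# This assumes float64 maps; use typecode 'f' if they turn out to be float32.
source = data_dict['stability_maps']
shared_buffer = mp.RawArray('d', source.size)
shared_data = np.frombuffer(shared_buffer).reshape(source.shape)
shared_data[:] = source

def par_link_shared(conf_tuple):
    # Forked workers inherit shared_data, so only the small conf tuple gets pickled
    network, method, metric = conf_tuple
    return bb.dataOps.calc_link({metric: shared_data}, network, method=method, metric=metric)

pool = mp.Pool(4)
shared_results = pool.map(par_link_shared,
                          [(i, 'euclidean', 'stability_maps') for i in range(10)])
pool.close()
pool.join()

Whether %memit then reports a much smaller increment for the parallel run would be the thing to check next.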


In [18]:
processes = [mp.Process(target=par_link, args=((data_dict, i, 'euclidean', 'stability_maps'),)) for i in range(10)]

In [19]:
processes


Out[19]:
[<Process(Process-46, initial)>,
 <Process(Process-47, initial)>,
 <Process(Process-48, initial)>,
 <Process(Process-49, initial)>,
 <Process(Process-50, initial)>,
 <Process(Process-51, initial)>,
 <Process(Process-52, initial)>,
 <Process(Process-53, initial)>,
 <Process(Process-54, initial)>,
 <Process(Process-55, initial)>]

In [ ]: