In [18]:
%autosave 100
Goals for the day:
Calcul Canada is using moab, because why not. So we can submit jobs to moab with the msub command, or we can submit them through Torque using the qsub command. Why Torque uses the same commands as SGE is beyond me, but in any case I will have to use system calls to spawn the jobs.
Interestingly, I could do all of that through ssh from my workstation and save the trouble of logging into the clusters directly. This is really nice because then I don't need to start up putty or my vbox. The -t option to ssh even allows me to stream the terminal output.
In [4]:
import subprocess
In [11]:
# Quick test: run a command on the cluster over ssh and print its output
print(subprocess.check_output(['ssh', '-t', 'gui', 'ls', '-lht']))
Ok, cool that works well.
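Submitting an actual job should work the same way. An untested sketch, where 'gui' is the ssh alias from above and 'run_linkage.pbs' is a hypothetical job script already sitting on the cluster:

# Untested sketch: submit a Torque job over ssh and capture the job ID that qsub prints
job_id = subprocess.check_output(['ssh', 'gui', 'qsub', 'run_linkage.pbs']).strip()
print(job_id)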
Now I need a Python module that loads the ABIDE data and then spawns a couple of parallel jobs to compute the linkage for each of the networks I loaded. For the scale 10 Cambridge template that would be 10 jobs, for scale 50 it would be 50, and so on. It's probably not really worth it, seeing that a node can run at most 8 parallel jobs, but I still want to do it.
In [19]:
import numpy as np
import nibabel as nib
import multiprocessing as mp
In [14]:
# Build the function
# Distance and clustering tools used by the linkage
import scipy.spatial.distance as dist
import scipy.cluster.hierarchy as clh

def link_network(data, network, method='euclidean', metric=None):
    """
    Create linkage based on the maps of one network. If the data array
    is oriented the wrong way, this can potentially exceed the available
    memory because the distance matrix will be computed across subjects
    and not across voxels.
    """
    # Condensed pairwise distances between the maps of the requested network
    condensed = dist.pdist(data[metric][..., network], method)
    distance = dist.squareform(condensed)
    linkage = clh.linkage(condensed, method='ward')
    return distance, linkage
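A quick sanity check on synthetic data, mostly to confirm the orientation: assuming the arrays are stored as (voxels, subjects, networks), the distance matrix should come out voxel-by-voxel.

# Hypothetical smoke test: 100 "voxels", 20 "subjects", 10 networks
fake = {'stability_maps': np.random.random((100, 20, 10))}
d, l = link_network(fake, 3, method='euclidean', metric='stability_maps')
print(d.shape)  # expect (100, 100): distances across voxels, not subjects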
In [20]:
a = np.random.random((10,10))
In [25]:
b = mp.Array('d',a.flatten())
In [31]:
b.acquire()
Out[31]:
When spawning multiple subprocesses on the node, I don't want to create copies of the input data because it is rather large. The problem seems to be that the multiprocessing array is a real pain to work with: it only accepts one-dimensional arrays and I have no idea how to read the data back out in the first place.
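One thing that does seem to work for getting the data back out is np.frombuffer on the array's underlying ctypes object. A minimal sketch with the toy array b created above:

# Sketch: view the shared buffer as a numpy array without copying,
# then restore the original 2D shape; b was created with mp.Array('d', ...) above
shared_view = np.frombuffer(b.get_obj()).reshape(a.shape)
print(np.allclose(shared_view, a))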
Ok, here is a nice example. The same page also says that I probably don't have to worry about the data being copied at all, because on Linux the worker processes are forked and memory is only copied on write. I think I'll try that with a quick test.
Goal: To run the linkage function on multiple processes while keeping track of memory consumption (if this is possible).
Implementation: First I need to wrap my methods into importable packages. This is helpful in general and also the only way to run the memory profiler.
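Strictly speaking, %memit only reports peak memory per statement; the line-by-line view comes from %mprun, and that is the one that needs the profiled function to live in an importable module rather than in a notebook cell. A sketch of what that call could look like once data_dict is loaded below, assuming calc_link is the packaged linkage function:

# Sketch: line-by-line memory profile of the packaged linkage function
%mprun -f bb.dataOps.calc_link bb.dataOps.calc_link(data_dict, 0, method='euclidean', metric='stability_maps')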
In [7]:
# Imports
import brainbox as bb
import multiprocessing as mp
In [8]:
# Load extensions
%load_ext memory_profiler
In [9]:
# Paths
debug_path = '/data1/abide/Test/Out/Debug/All'
In [10]:
%memit file_dict = bb.fileOps.grab_files(debug_path, '.nii.gz', 'stability_maps')
In [11]:
%memit data_dict = bb.fileOps.read_files(file_dict)
In [6]:
def par_link(conf_tuple):
    # Unpack the configuration and run the packaged linkage function
    data_dict, network, method, metric = conf_tuple
    distance, linkage = bb.dataOps.calc_link(data_dict, network, method=method, metric=metric)
    return distance, linkage
In [7]:
# Prepare conf
conf_list = list()
for i in range(10):
    conf_list.append((data_dict, i, 'euclidean', 'stability_maps'))
In [8]:
# Set up parallel run
def parallel_run(workers, conf_list):
    p = mp.Pool(workers)
    par_results = p.map(par_link, conf_list)
    # Shut the pool down so worker memory is released before the next %memit
    p.close()
    p.join()
    return par_results
In [9]:
# Set up sequential run
def sequential_run(conf_list):
    seq_results = []
    for conf in conf_list:
        seq_results.append(par_link(conf))
    return seq_results
In [14]:
# Sequential
%memit s_results = sequential_run(conf_list)
In [15]:
%memit print('hello')
In [16]:
# Parallel
%memit p_results = parallel_run(4, conf_list)
In [17]:
%memit print('hello')
Ok, I think this is pretty convincing evidence that the parallel jobs copy the data around and don't just pass memory pointers. That's a problem, since the raw data for all subjects will be a couple of gigabytes, and it then gets bounced around even more. So we should find a way to use shared memory between the processes.
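One way this could eventually look, assuming the fork/copy-on-write story above holds on Linux with the default start method: put the raw array into a module-level mp.Array before creating the Pool and have the workers rebuild a numpy view with np.frombuffer. A rough, untested sketch; the shapes and the 'stability_maps' key are placeholders for the real data, and the scipy imports from the link_network cell are assumed.

# Rough sketch: share one flat buffer between Pool workers instead of pickling copies
raw = data_dict['stability_maps']
shared = mp.Array('d', raw.size, lock=False)   # one flat shared buffer
np.frombuffer(shared)[:] = raw.ravel()         # copy the data in once

def shared_link(network):
    # Workers rebuild a view on the shared buffer instead of receiving a pickled copy
    maps = np.frombuffer(shared).reshape(raw.shape)
    condensed = dist.pdist(maps[..., network], 'euclidean')
    return dist.squareform(condensed), clh.linkage(condensed, method='ward')

pool = mp.Pool(4)
results = pool.map(shared_link, range(10))
pool.close()
pool.join()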
In [18]:
processes = [mp.Process(target=par_link, args=((data_dict, i, 'euclidean', 'stability_maps'),)) for i in range(10)]
In [19]:
processes
Out[19]:
In [ ]:
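The mp.Process route would then just need explicit start and join calls; note that par_link takes a single tuple, which is why args wraps the configuration in an extra tuple above. An untested sketch:

# Untested: run the explicitly created processes and wait for them to finish.
# Unlike Pool.map, Process has no return value, so the results would have to
# come back through something like an mp.Queue.
for p in processes:
    p.start()
for p in processes:
    p.join()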