ParallelHelpers

Here I'm including a set of helper functions for running things in parallel.

Naive method which just splits the patients and runs a set of backgrounded jobs. Works OK, but does not balance the load across cores, so one subprocess may finish much later than the others.


In [5]:
import os as os

In [1]:
def generate_super_script(patients, call_fx, script_dir='./', remote_out='./',
                          threads=16):
    '''
    Write one shell script per worker plus a driver that launches them all.

    The patients are dealt round-robin across ``threads`` scripts
    (``patients[i::threads]`` goes to worker ``i``).  Each worker script
    contains the commands returned by ``call_fx`` for its patients, joined
    with ``';'`` and a blank line.  The worker scripts are named
    thread_n.sh (n = 0 .. threads-1) and the driver is named driver.sh;
    all of them are saved in ``script_dir``, which is created if missing.

    patients   -- sliceable sequence of items handed to ``call_fx``.
    call_fx    -- callable mapping one patient to a shell command string.
    script_dir -- directory the scripts are written to.
    remote_out -- directory the driver creates (``mkdir -p``) before
                  launching the workers.
    threads    -- number of worker scripts to generate.

    Note: the driver refers to the worker scripts by bare file name, so it
    has to be run from inside ``script_dir``.  A worker with no patients
    (more threads than patients) gets an empty script.
    '''
    def _write_script(path, text):
        # Keep binary mode so the scripts always get plain '\n' line
        # endings, but encode text first so this also works where str is
        # not bytes (Python 3).
        data = text if isinstance(text, bytes) else text.encode('utf-8')
        with open(path, 'wb') as handle:
            handle.write(data)

    if not os.path.isdir(script_dir):
        os.makedirs(script_dir)

    for i in range(threads):
        calls = ';\n\n'.join([call_fx(pat) for pat in patients[i::threads]])
        _write_script('{}/thread_{}.sh'.format(script_dir, i), calls)

    # Every worker is backgrounded with '&' so they all run concurrently.
    launch_lines = ['bash thread_{}.sh &'.format(i) for i in range(threads)]

    # 'mkdir -p' keeps the driver re-runnable when remote_out already
    # exists (which is always the case for the default './').
    super_script = 'mkdir -p {}\n'.format(remote_out)
    super_script += '\n'.join(launch_lines)

    _write_script('{}/driver.sh'.format(script_dir), super_script)

In [4]:
def generate_sge(patients, call_fx, script_dir='./', threads=16):
    '''
    Write one shell script per patient plus an SGE array-job driver.

    Patient number k (1-based) gets its command, as returned by
    ``call_fx``, written to ``thread_k.sh``.  The driver ``sge.sh`` is an
    array job over tasks 1..len(patients) whose body runs
    ``thread_$SGE_TASK_ID.sh``, bracketed by ``hostname`` and ``date``
    calls.  All files are saved in ``script_dir``, which is created if
    missing; the job's stdout/stderr are directed there as well.

    patients   -- sequence of items handed to ``call_fx``.
    call_fx    -- callable mapping one patient to a shell command string.
    script_dir -- directory the scripts and driver are written to.
    threads    -- value emitted for the ``-tc`` directive (presumably the
                  cap on concurrently running tasks; confirm against the
                  scheduler's documentation).

    NOTE(review): with an empty ``patients`` the emitted range is ``1-0``;
    callers should probably not submit the driver in that case.
    '''
    def _write_script(path, text):
        # Keep binary mode so the scripts always get plain '\n' line
        # endings, but encode text first so this also works where str is
        # not bytes (Python 3).
        data = text if isinstance(text, bytes) else text.encode('utf-8')
        with open(path, 'wb') as handle:
            handle.write(data)

    # Single-argument parenthesised form prints the same text on
    # Python 2 and Python 3.
    print('Writing scripts and SGE driver to {}'.format(script_dir))
    if not os.path.isdir(script_dir):
        os.makedirs(script_dir)

    # Task ids are 1-based, so the script names are numbered from 1 too.
    for i, pat in enumerate(patients):
        _write_script('{}/thread_{}.sh'.format(script_dir, i + 1),
                      call_fx(pat))

    sge = [
        '#! /bin/csh',
        '#$ -S /bin/csh',
        '#$ -o {}'.format(script_dir),
        '#$ -e {}'.format(script_dir),
        '#$ -cwd',
        # The last task id must equal the number of scripts written above;
        # going one past it would launch a task with no script to run.
        '#$ -t 1-{}'.format(len(patients)),
        '#$ -tc {}'.format(threads),
        'hostname',
        'date',
        'bash {}/thread_$SGE_TASK_ID.sh'.format(script_dir),
        'date',
    ]

    _write_script('{}/sge.sh'.format(script_dir), '\n'.join(sge))