In [1]:
from ahh import ext, exp, era

In [2]:
files = ['data_yr{0:02d}.nc'.format(i) for i in range(1, 101)] * 100000 # just to make multiprocessing worth it
processed_fmt = 'processed_{file}'

In [3]:
s = era.dtnow()
processed_files = []
# serial/for loop method
for file in files:
    processed_files.append(processed_fmt.format(file=file)) # essentially attach "processed_" as a prefix
print(processed_files[0:10])
era.clockit(s)


['processed_data_yr01.nc', 'processed_data_yr02.nc', 'processed_data_yr03.nc', 'processed_data_yr04.nc', 'processed_data_yr05.nc', 'processed_data_yr06.nc', 'processed_data_yr07.nc', 'processed_data_yr08.nc', 'processed_data_yr09.nc', 'processed_data_yr10.nc']

0 Days, 0 Hours, 0 Minutes, 12 Seconds Elapsed


In [4]:
s = era.dtnow()
processed_files = []
# parallelizing method
def process_file(file): # define a function wrapper
    return processed_fmt.format(file=file) # do your stuff in here

processed_files = ext.parallelize(process_file, files)
print(processed_files[0:10])
era.clockit(s) # hmm maybe parallelizing isn't worth it in this case?


['processed_data_yr01.nc', 'processed_data_yr02.nc', 'processed_data_yr03.nc', 'processed_data_yr04.nc', 'processed_data_yr05.nc', 'processed_data_yr06.nc', 'processed_data_yr07.nc', 'processed_data_yr08.nc', 'processed_data_yr09.nc', 'processed_data_yr10.nc']

0 Days, 0 Hours, 0 Minutes, 48 Seconds Elapsed


In [5]:
s = era.dtnow()
def square(x, offset=0): # now what if the function requires you to input two arguments?
    return x ** 2 + offset

def square_parallelize(x_and_offset): # need to redefine function
    return x_and_offset[0] ** 2 + x_and_offset[1]

numbers = [1, 2, 3]
offset = 1
processed_files = ext.parallelize(square_parallelize, numbers, arg2=2) # input second argument
processed_files
era.clockit(s)


---------------------------------------------------------------------------
OSError                                   Traceback (most recent call last)
<ipython-input-5-e6f62c4f2904> in <module>()
      8 numbers = [1, 2, 3]
      9 offset = 1
---> 10 processed_files = ext.parallelize(square_parallelize, numbers, arg2=2) # input second argument
     11 processed_files
     12 era.clockit(s)

/mnt/c/Users/Solactus/Google Drive/Bash/ahh/ahh/ext.py in parallelize(function, alist, nthreads, arg2, arg3, arg4)
    810         args = alist
    811 
--> 812     pool = Pool(nthreads)
    813     output = pool.map(function, args)
    814     pool.close()

/home/solactus/anaconda3/lib/python3.5/multiprocessing/context.py in Pool(self, processes, initializer, initargs, maxtasksperchild)
    116         from .pool import Pool
    117         return Pool(processes, initializer, initargs, maxtasksperchild,
--> 118                     context=self.get_context())
    119 
    120     def RawValue(self, typecode_or_type, *args):

/home/solactus/anaconda3/lib/python3.5/multiprocessing/pool.py in __init__(self, processes, initializer, initargs, maxtasksperchild, context)
    166         self._processes = processes
    167         self._pool = []
--> 168         self._repopulate_pool()
    169 
    170         self._worker_handler = threading.Thread(

/home/solactus/anaconda3/lib/python3.5/multiprocessing/pool.py in _repopulate_pool(self)
    231             w.name = w.name.replace('Process', 'PoolWorker')
    232             w.daemon = True
--> 233             w.start()
    234             util.debug('added worker')
    235 

/home/solactus/anaconda3/lib/python3.5/multiprocessing/process.py in start(self)
    103                'daemonic processes are not allowed to have children'
    104         _cleanup()
--> 105         self._popen = self._Popen(self)
    106         self._sentinel = self._popen.sentinel
    107         _children.add(self)

/home/solactus/anaconda3/lib/python3.5/multiprocessing/context.py in _Popen(process_obj)
    265         def _Popen(process_obj):
    266             from .popen_fork import Popen
--> 267             return Popen(process_obj)
    268 
    269     class SpawnProcess(process.BaseProcess):

/home/solactus/anaconda3/lib/python3.5/multiprocessing/popen_fork.py in __init__(self, process_obj)
     18         sys.stderr.flush()
     19         self.returncode = None
---> 20         self._launch(process_obj)
     21 
     22     def duplicate_for_child(self, fd):

/home/solactus/anaconda3/lib/python3.5/multiprocessing/popen_fork.py in _launch(self, process_obj)
     65         code = 1
     66         parent_r, child_w = os.pipe()
---> 67         self.pid = os.fork()
     68         if self.pid == 0:
     69             try:

OSError: [Errno 12] Cannot allocate memory

In [ ]: