In [131]:
import datetime
import os
from itertools import repeat
from multiprocessing import Pool

import numpy as np

In [132]:
# setup
# Accumulator for results and the template that prepends "processed_" to a name.
processed_files = []
processed_fmt = 'processed_{file}'
# 100 yearly file names (data_yr01.nc ... data_yr100.nc), tiled 10000 times
# just to make multiprocessing worth it
files = list(map('data_yr{0:02d}.nc'.format, range(1, 101))) * 10000

In [133]:
%%time
# serial/for loop method
# Start from an empty list inside this cell so that re-running it is
# idempotent instead of appending a second copy of every result.
processed_files = []
for file in files:
    processed_files.append(processed_fmt.format(file=file)) # essentially attach "processed_" as a prefix
print(processed_files[0:10])


['processed_data_yr01.nc', 'processed_data_yr02.nc', 'processed_data_yr03.nc', 'processed_data_yr04.nc', 'processed_data_yr05.nc', 'processed_data_yr06.nc', 'processed_data_yr07.nc', 'processed_data_yr08.nc', 'processed_data_yr09.nc', 'processed_data_yr10.nc']
CPU times: user 848 ms, sys: 5 ms, total: 853 ms
Wall time: 847 ms

In [134]:
%%time
# multiprocessing pool method
def process_file(file): # define a function wrapper
    """Return ``file`` rendered through the module-level ``processed_fmt`` template."""
    processed_name = processed_fmt.format(file=file) # do your stuff in here
    return processed_name

# Run process_file over every file on 4 worker processes. The context manager
# shuts the workers down even if process_file raises; map() blocks until every
# result has been collected, so nothing is lost when the pool is torn down.
with Pool(4) as pool:
    processed_files = pool.map(process_file, files)
print(processed_files[0:10])


['processed_data_yr01.nc', 'processed_data_yr02.nc', 'processed_data_yr03.nc', 'processed_data_yr04.nc', 'processed_data_yr05.nc', 'processed_data_yr06.nc', 'processed_data_yr07.nc', 'processed_data_yr08.nc', 'processed_data_yr09.nc', 'processed_data_yr10.nc']
CPU times: user 249 ms, sys: 218 ms, total: 467 ms
Wall time: 687 ms

In [135]:
%%time
# what if you need to use more than one input? dump all your arguments into a tuple or list
prefix, suffix = 'renamed', 'txt'
file_fmt = '{prefix}_{file}.{suffix}'

# one (file, prefix, suffix) bundle per file; repeat() supplies the same
# prefix and suffix for as long as there are files left to pair them with
arg_tup = zip(
    files,
    repeat(prefix),
    repeat(suffix),
)

def rename_file(arg_tup):
    """Build the renamed file name for one ``(file, prefix, suffix)`` bundle.

    Pool.map passes a single argument to the worker, so the three inputs
    travel together in one tuple/list and are unpacked by position here.

    Parameters
    ----------
    arg_tup : sequence
        ``arg_tup[0]`` is the original file name, ``arg_tup[1]`` the prefix
        and ``arg_tup[2]`` the new suffix (without the leading dot).

    Returns
    -------
    str
        The module-level ``file_fmt`` template filled in with the prefix,
        the file name minus its extension, and the suffix.
    """
    file, prefix, suffix = arg_tup[0], arg_tup[1], arg_tup[2] # unravel the arg tuple, in order
    # Drop whatever extension the name has instead of blindly cutting the
    # last three characters, which only worked for a literal ".nc".
    stem = os.path.splitext(file)[0]
    return file_fmt.format(prefix=prefix, file=stem, suffix=suffix)

# Run rename_file over every argument bundle on 4 worker processes. The
# context manager shuts the workers down even if rename_file raises; map()
# blocks until every result has been collected.
with Pool(4) as pool:
    processed_files = pool.map(rename_file, arg_tup)
print(processed_files[0:10])


['renamed_data_yr01.txt', 'renamed_data_yr02.txt', 'renamed_data_yr03.txt', 'renamed_data_yr04.txt', 'renamed_data_yr05.txt', 'renamed_data_yr06.txt', 'renamed_data_yr07.txt', 'renamed_data_yr08.txt', 'renamed_data_yr09.txt', 'renamed_data_yr10.txt']
CPU times: user 670 ms, sys: 283 ms, total: 953 ms
Wall time: 1.25 s

In [136]:
# Small demo of the zip/repeat pairing: each word is bundled with the same
# prefix and suffix, giving one (word, prefix, suffix) tuple per word.
tuple(
    zip(
        ('some', 'thing', 'special'),
        repeat(prefix),
        repeat(suffix),
    )
)


Out[136]:
(('some', 'renamed', 'txt'),
 ('thing', 'renamed', 'txt'),
 ('special', 'renamed', 'txt'))

In [137]:
from mpi4py import MPI # now for across nodes and processors which is more complicated
from mpi4py.MPI import ANY_SOURCE

In [138]:
def rename_file(arg_tup):
    """Build the renamed file name for one ``(file, prefix, suffix)`` bundle.

    Pool.map passes a single argument to the worker, so the three inputs
    travel together in one tuple/list and are unpacked by position here.

    Parameters
    ----------
    arg_tup : sequence
        ``arg_tup[0]`` is the original file name, ``arg_tup[1]`` the prefix
        and ``arg_tup[2]`` the new suffix (without the leading dot).

    Returns
    -------
    str
        The module-level ``file_fmt`` template filled in with the prefix,
        the file name minus its extension, and the suffix.
    """
    file, prefix, suffix = arg_tup[0], arg_tup[1], arg_tup[2] # unravel the arg tuple, in order
    # Drop whatever extension the name has instead of blindly cutting the
    # last three characters, which only worked for a literal ".nc".
    stem = os.path.splitext(file)[0]
    return file_fmt.format(prefix=prefix, file=stem, suffix=suffix)
    
def rename_file_overarching(files, prefix='renamed', suffix='txt', processes=4): # change it a bit
    """Rename a batch of files in parallel with a local multiprocessing pool.

    Parameters
    ----------
    files : iterable of str
        File names to rename.
    prefix : str, optional
        Prefix handed to ``rename_file`` for every file (default ``'renamed'``).
    suffix : str, optional
        New suffix handed to ``rename_file`` for every file (default ``'txt'``).
    processes : int, optional
        Number of worker processes in the pool (default 4).

    Returns
    -------
    list
        One ``rename_file`` result per input file, in input order.
    """
    # Pool.map feeds a single argument to the worker, so bundle each file
    # with the (repeated) prefix and suffix.
    arg_tup = zip(files, repeat(prefix), repeat(suffix))

    # The context manager shuts the workers down even if rename_file raises;
    # map() has collected every result before the block exits.
    with Pool(processes) as pool:
        return pool.map(rename_file, arg_tup)

start = datetime.datetime.utcnow()

comm = MPI.COMM_WORLD # init NECESSARY
rank = comm.Get_rank() # id of this MPI process, from 0 to size - 1 NECESSARY
size = comm.Get_size() # total number of MPI processes NECESSARY

files_to_process_per_node = np.array_split(files, size) # split up the work
# if there are 10 files and 2 ranks, the first 5 files go to rank0 and the other 5 go to rank1
# if it's 11 files and 2 ranks, the first 6 files go to rank0 and the other 5 go to rank1
# see the np.array_split doc

renamed_file = rename_file_overarching(files_to_process_per_node[rank]) # your function(files_to_parallel_process[rank])
# ^^^ every rank runs this step on its own share of the files

if rank != 0:
    print('huh more than one node? should not have happened because this is only one comp')

# Collect every rank's result list on rank 0. The lowercase gather() pickles
# arbitrary Python objects, so lists of strings of any length travel intact.
# The buffer-based Send/Recv pair used before needs a correctly sized and
# typed numpy buffer, which a one-element float array is not. gather() is a
# collective call: every rank must reach it. It returns the per-rank results
# ordered by rank on the root and None everywhere else.
gathered = comm.gather(renamed_file, root=0)

if rank == 0:
    # Flatten the per-rank lists in rank order, which restores the original
    # file order, and accumulate ALL of them rather than overwriting the
    # total with rank 0's own chunk on every iteration.
    renamed_files = np.array([name for part in gathered for name in part])

end = datetime.datetime.utcnow()
print(end - start)
if rank == 0: # only the root holds the combined result
    print(renamed_files[0:10])


0:00:13.806838
['renamed_data_yr01.txt' 'renamed_data_yr02.txt' 'renamed_data_yr03.txt'
 'renamed_data_yr04.txt' 'renamed_data_yr05.txt' 'renamed_data_yr06.txt'
 'renamed_data_yr07.txt' 'renamed_data_yr08.txt' 'renamed_data_yr09.txt'
 'renamed_data_yr10.txt']

In [ ]: