In [131]:
import os
import datetime # used later for timing the MPI cell
import numpy as np
from itertools import repeat
from multiprocessing import Pool
In [132]:
# setup
files = ['data_yr{0:02d}.nc'.format(i) for i in range(1, 101)] * 10000 # just to make multiprocessing worth it
processed_fmt = 'processed_{file}'
processed_files = []
In [133]:
%%time
# serial/for loop method
for file in files:
    processed_files.append(processed_fmt.format(file=file)) # essentially attach "processed_" as a prefix
print(processed_files[0:10])
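As an aside, the same serial pass can be written as a list comprehension, which avoids the repeated method lookup on .append and is usually a touch faster:

processed_files = [processed_fmt.format(file=file) for file in files]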
In [134]:
%%time
# multiprocessing pool method
def process_file(file): # define a function wrapper
    return processed_fmt.format(file=file) # do your stuff in here
pool = Pool(4)
processed_files = pool.map(process_file, files)
pool.close()
pool.join()
print(processed_files[0:10])
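A small variant worth knowing, assuming Python 3.3+: a Pool can also be used as a context manager. Note that exiting the with block calls terminate() rather than close()/join(), which is safe here because map() blocks until every result is back:

with Pool(4) as pool: # __exit__ calls terminate(); fine since map() has already returned
    processed_files = pool.map(process_file, files)
print(processed_files[0:10])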
In [135]:
%%time
# what if you need to use more than one input? dump all your arguments into a tuple or list
file_fmt = '{prefix}_{file}.{suffix}'
prefix = 'renamed'
suffix = 'txt'
arg_tup = zip(files, repeat(prefix), repeat(suffix)) # see the cell below for what this produces
def rename_file(arg_tup):
    file = arg_tup[0] # unravel the arg tuple
    prefix = arg_tup[1] # just know what you put in there
    suffix = arg_tup[2] # in order
    return file_fmt.format(prefix=prefix, file=file[:-3], suffix=suffix) # just removing the .nc
pool = Pool(4)
processed_files = pool.map(rename_file, arg_tup)
pool.close()
pool.join()
print(processed_files[0:10])
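A hedged alternative, assuming Python 3.3+: Pool.starmap unpacks each argument tuple for you, so the wrapper (rename_file2 below is a hypothetical name) can take the arguments directly; and when only one argument actually varies, functools.partial lets you freeze the rest and stick with plain map:

from functools import partial

def rename_file2(file, prefix, suffix): # hypothetical wrapper taking the args directly
    return file_fmt.format(prefix=prefix, file=file[:-3], suffix=suffix)

pool = Pool(4)
processed_files = pool.starmap(rename_file2, zip(files, repeat(prefix), repeat(suffix)))
# or freeze the fixed arguments and go back to plain map:
# processed_files = pool.map(partial(rename_file2, prefix=prefix, suffix=suffix), files)
pool.close()
pool.join()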
In [136]:
tuple(zip(['some', 'thing', 'special'], repeat(prefix), repeat(suffix)))
Out[136]:
(('some', 'renamed', 'txt'),
 ('thing', 'renamed', 'txt'),
 ('special', 'renamed', 'txt'))
In [137]:
from mpi4py import MPI # now for parallelism across nodes and processors, which is more complicated
from mpi4py.MPI import ANY_SOURCE
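Before the full example, a minimal sketch of the MPI model (a quick illustration, not part of the original run): every process executes the same script and branches on its rank.

comm = MPI.COMM_WORLD
print('I am rank {0} of {1}'.format(comm.Get_rank(), comm.Get_size()))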
In [138]:
def rename_file(arg_tup):
    file = arg_tup[0] # unravel the arg tuple
    prefix = arg_tup[1] # just know what you put in there
    suffix = arg_tup[2] # in order
    return file_fmt.format(prefix=prefix, file=file[:-3], suffix=suffix) # just removing the .nc

def rename_file_overarching(files): # change it a bit: each rank runs its own pool
    prefix = 'renamed'
    suffix = 'txt'
    arg_tup = zip(files, repeat(prefix), repeat(suffix))
    pool = Pool(4)
    processed_files = pool.map(rename_file, arg_tup)
    pool.close()
    pool.join()
    return processed_files
start = datetime.datetime.utcnow()
comm = MPI.COMM_WORLD # init NECESSARY
rank = comm.Get_rank() # each process gets a rank from 0 to size - 1 NECESSARY
size = comm.Get_size() # size is the number of MPI processes NECESSARY
# note: the capital-letter Send/Recv only work on buffer-like objects (e.g. NumPy arrays);
# the results here are lists of strings, so the lowercase, pickle-based send/recv is used below
files_to_process_per_node = np.array_split(files, size) # split up the work
# if there are 10 files and 2 ranks, the first 5 files go to rank 0 and the other 5 go to rank 1
# if it's 11 files and 2 ranks, the first 6 files go to rank 0 and the other 5 go to rank 1
# see the np.array_split docs (and the sanity check after this cell); I don't have access to more than one node, and when I do, it crashes >.>
renamed_file = rename_file_overarching(files_to_process_per_node[rank]) # your_function(files_to_parallel_process[rank])
# ^^^ every rank runs this step on its own chunk
if rank == 0: # now here is where the master rank 0 does all the receiving
    renamed_files = np.array(renamed_file) # start from rank 0's own results
    for i in range(1, size): # if I used more than one process...
        not_rank0_files = comm.recv(source=ANY_SOURCE) # receive from "LOOK AT ME HERE" down there
        renamed_files = np.append(renamed_files, not_rank0_files) # attach to rank 0's results
else:
    # all other ranks send their result
    print('huh, more than one process? should not have happened because this is only one comp')
    comm.send(renamed_file, dest=0) # send to rank 0 LOOK AT ME HERE
end = datetime.datetime.utcnow()
if rank == 0: # only rank 0 holds the gathered results
    print(end - start)
    print(renamed_files[0:10])
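The np.array_split behavior described in the comments above is easy to sanity-check:

chunks = np.array_split(['f{0}'.format(i) for i in range(11)], 2)
print([len(c) for c in chunks]) # [6, 5] -- the first rank gets the extra file

Also note that inside a notebook COMM_WORLD has size 1, so the else branch never runs. To actually fan out across processes or nodes, save the cell above as a script (say, mpi_rename.py, a name chosen here for illustration) and launch it with something like mpiexec -n 4 python mpi_rename.py.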
In [ ]: