In [1]:
from os import getpid, getppid
from time import sleep
def printer(val, wait=0):
    """Sleep for ``wait`` seconds, then print the pid, parent pid and ``val``.

    Args:
        val: Any value; it is rendered into the message with ``str.format``.
        wait: Seconds to sleep before printing. Defaults to 0 (no delay).
    """
    sleep(wait)
    message = 'Pid: {}, PPid: {}, Value: {}'.format(getpid(), getppid(), val)
    print(message)
In [2]:
from multiprocessing import Process
# Announce the demo, then hand printer to a single child process.
# Nothing in this cell joins the child, so the cell finishes as soon as the
# process has been started.
print('Starting demo...')
p = Process(target=printer, kwargs={'val': 'hello demo'})
p.start()
In [3]:
# Launch one child process per (value, wait) pair; start() returns immediately,
# so all three children run concurrently.
proc_list = []
for values in [('immediate', 0), ('delayed', 2), ('eternity', 5)]:
    p = Process(target=printer, args=values)
    proc_list.append(p)
    p.start()  # start execution of printer
print('Not waiting for proccesses to finish...')
# Block until every child has exited. join() is called only for its side
# effect, so a plain loop is used instead of a list comprehension that would
# just build a throwaway list of None values.
for p in proc_list:
    p.join()
print('After processes...')
In [4]:
from multiprocessing.pool import Pool
# Three workers for three inputs, so each call can run in its own worker process.
# NOTE(review): Pool.map passes every item to printer as ONE positional
# argument. The tuple ('A', 5) therefore arrives whole as `val` and `wait`
# keeps its default of 0 -- no 5 second delay happens in this cell. Confirm
# this is the intended contrast with starmap.
with Pool(processes=3) as pool:
    pool.map(printer, ['Its', ('A', 5), 'Race'])
In [5]:
# starmap unpacks each tuple into printer's positional arguments, so ('A', 2)
# really does wait 2 seconds. With only two workers for three calls, the two
# quick calls typically end up on one worker while the other runs the slow one.
with Pool(processes=2) as pool:
    pool.starmap(printer, [('Its',), ('A', 2), ('Race',)])
In [6]:
def pretend_delete_method(provider, vm_name):
    """Print a message naming the VM, its provider and the current pid.

    Nothing is deleted; the function only prints.

    Args:
        provider: Provider label shown in the message.
        vm_name: VM name shown in the message.
    """
    message = 'Pretend delete: {} on {}. (Pid: {})'.format(vm_name, provider, getpid())
    print(message)
# Stand-in for a fetched inventory: provider name -> names of the VMs to clean up.
example_provider_vm_lists = {
    'vmware': ['test_vm_1', 'test_vm_2'],
    'rhv': ['test_vm_3', 'test_vm_4'],
    'osp': ['test_vm_5', 'test_vm_6'],
}
In [7]:
# don't hate me for nested comprehension here - building tuples of provider+name
from multiprocessing.pool import ThreadPool
# ThreadPool offers the same interface as the process Pool, but runs the calls
# in threads of the current process.
with ThreadPool(processes=6) as pool:
    pool.starmap(
        pretend_delete_method,
        [
            (provider, vm_name)
            for provider, vm_names in example_provider_vm_lists.items()
            for vm_name in vm_names
        ],
    )
In [8]:
# Printing is thread safe, but will sometimes print separate messages on the same line (above)
# Use a lock around print
from multiprocessing import Lock
lock = Lock()


def safe_printing_method(provider, vm_name):
    """Print the pretend-delete message while holding the module-level ``lock``.

    Only one caller at a time can be inside the ``print`` call, so messages
    from concurrent callers are not interleaved on one line.

    Args:
        provider: Provider label shown in the message.
        vm_name: VM name shown in the message.
    """
    message = 'Pretend delete: {} on {}. (Pid: {})'.format(vm_name, provider, getpid())
    with lock:
        print(message)
In [9]:
# Run the lock-guarded printer once per (provider, vm) pair on six threads.
with ThreadPool(processes=6) as pool:
    pool.starmap(
        safe_printing_method,
        [
            (provider, vm_name)
            for provider, vm_names in example_provider_vm_lists.items()
            for vm_name in vm_names
        ],
    )
FIFO queue with put, get, and empty methods
multiprocessing.Queue
In [10]:
from multiprocessing import Manager
from random import randint
# Create instance of manager.
# Manager() returns an already-started manager; objects created through it
# (such as a Queue) are reached from other processes via proxies.
manager = Manager()
def multiple_output_method(provider, vm_name, fail_queue):
    """Randomly succeed or fail, recording failures on ``fail_queue``.

    Args:
        provider: Accepted for signature parity with the other demo
            functions; not used in the body.
        vm_name: Name that is put on the queue when the call fails.
        fail_queue: Object with a ``put`` method that collects failed names.

    Returns:
        True on (random) success, None on failure.
    """
    # Coin flip standing in for the outcome of the real operation.
    succeeded = bool(randint(0, 1))
    if not succeeded:
        # Store our failure vm on the queue
        fail_queue.put(vm_name)
        return None
    return True
# The manager-created queue is passed to the workers as an ordinary argument,
# so every child process reports its failures to the same place.
queue_for_failures = manager.Queue()
with Pool(processes=2) as pool:
    results = pool.starmap(
        multiple_output_method,
        [
            (provider, vm_name, queue_for_failures)
            for provider, vm_names in example_provider_vm_lists.items()
            for vm_name in vm_names
        ],
    )
print('Results are in: {}'.format(results))

# starmap has returned, so every call has finished; drain whatever was queued.
failed_vms = []
while True:
    if queue_for_failures.empty():
        break
    failed_vms.append(queue_for_failures.get())
print('Failures are in: {}'.format(failed_vms))