Multiprocessing in Python 3

Threads vs Processes

Thread/Process execution, timing

Direct Thread/Process Instantiation

Thread/Process Pools

Iteration with complex function signatures

Storing/Fetching data with Queues

Threads vs Processes

  • Thread

    • Is bound to the processor that the Python process is running on
    • Is controlled by the Global Interpreter Lock (GIL)
      • Only one thread executes Python bytecode at a time
  • Process

    • Uses multiple processors
    • Concurrency between threads and processes (local and remote)
    • Is not limited by the GIL (each process gets its own interpreter and GIL); see the timing sketch below
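
To make the GIL's effect concrete, here is a rough, hypothetical timing sketch (busy_count and the worker count of 4 are illustrative, not from the talk): the same CPU-bound work run in threads shares one interpreter lock, while processes run in parallel.

from multiprocessing import Process
from threading import Thread
from time import time

def busy_count(n=10_000_000):
    # CPU-bound loop; a thread holds the GIL the whole time it runs this
    while n:
        n -= 1

def run_and_time(worker_cls):
    workers = [worker_cls(target=busy_count) for _ in range(4)]
    start = time()
    for w in workers:
        w.start()
    for w in workers:
        w.join()
    return time() - start

if __name__ == '__main__':
    print('4 threads:   {:.2f}s'.format(run_and_time(Thread)))
    print('4 processes: {:.2f}s'.format(run_and_time(Process)))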

In [1]:
from os import getpid, getppid
from time import sleep

def printer(val, wait=0):
    sleep(wait)
    print('Pid: {}, PPid: {}, Value: {}'
          .format(getpid(), getppid(), val))

Process Instantiation

Let's start with the most basic example: spawning a new process to run a function


In [2]:
from multiprocessing import Process

print('Starting demo...')
p = Process(target=printer, args=('hello demo',))
p.start()


Starting demo...
Pid: 18625, PPid: 18613, Value: hello demo
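
One portability hedge: with the spawn start method (the default on Windows, and on macOS since Python 3.8), the child re-imports the main module, so in a standalone script the spawning code should live behind a __main__ guard. A minimal sketch reusing printer from above:

from multiprocessing import Process

if __name__ == '__main__':
    # without this guard, a spawn-based child would re-run the spawning code on import
    p = Process(target=printer, args=('hello demo',))
    p.start()
    p.join()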

Process timing

  • Use printer's delay to see process timing
  • Track multiple process objects
  • Execute code in the main process while child processes are running
  • Use Process.join() to wait for processes to finish

In [3]:
proc_list = []
for values in [('immediate', 0), ('delayed', 2), ('eternity', 5)]:
    p = Process(target=printer, args=values)
    proc_list.append(p)
    p.start()  # start execution of printer

print('Not waiting for processes to finish...')
    
[p.join() for p in proc_list]

print('After processes...')


Pid: 18628, PPid: 18613, Value: immediate
Not waiting for processes to finish...
Pid: 18629, PPid: 18613, Value: delayed
Pid: 18632, PPid: 18613, Value: eternity
After processes...
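
Process.join() also accepts an optional timeout, and Process.is_alive() lets the parent check on a child without blocking. A small sketch reusing printer from above:

from multiprocessing import Process

p = Process(target=printer, args=('slowpoke', 5))
p.start()
p.join(timeout=1)  # stop waiting after 1 second
print('Still running: {}'.format(p.is_alive()))  # expected True: the child sleeps for 5s
p.join()  # block until the child actually finishes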

Process Pool

  • Worker processes instead of direct instantiation
  • Context manager to handle starting/joining child processes
  • Pool.map() works like the built-in map(func, iterable) function
  • Pool.map() does not unpack argument tuples

In [4]:
from multiprocessing.pool import Pool

with Pool(3) as pool:
    pool.map(printer, ['Its', ('A', 5), 'Race'])
    # each worker process executes one function


Pid: 18640, PPid: 18613, Value: Its
Pid: 18641, PPid: 18613, Value: ('A', 5)
Pid: 18642, PPid: 18613, Value: Race
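
Pool.map() also returns the results in input order, just like map(); printer returns None, so here is a sketch with a hypothetical square helper (this relies on fork-style workers being able to see functions defined in the session, as in the runs above):

from multiprocessing.pool import Pool

def square(x):
    return x * x

with Pool(3) as pool:
    squares = pool.map(square, range(6))

print(squares)  # expected: [0, 1, 4, 9, 16, 25]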

Process + args/kwargs iteration with starmap


In [5]:
with Pool(2) as pool:
    pool.starmap(printer, [('Its',), ('A', 2), ('Race',)])
    # one worker executes two quick calls; the other handles the delayed ('A', 2) call


Pid: 18652, PPid: 18613, Value: Its
Pid: 18652, PPid: 18613, Value: Race
Pid: 18653, PPid: 18613, Value: A
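
starmap() only unpacks positional arguments; if a function needs keyword arguments, one common hedge is to bind them up front with functools.partial and fall back to plain map(). A sketch using printer's wait keyword:

from functools import partial
from multiprocessing.pool import Pool

with Pool(2) as pool:
    # every call gets wait=1 as a keyword argument; map supplies the positional one
    pool.map(partial(printer, wait=1), ['Its', 'A', 'Race'])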

Starmap is the bomb


In [6]:
def pretend_delete_method(provider, vm_name):
    print('Pretend delete: {} on {}. (Pid: {})'
          .format(vm_name, provider, getpid()))    
    
# Assuming we fetched a list of vm names on providers we want to cleanup...
example_provider_vm_lists = dict(
    vmware=['test_vm_1', 'test_vm_2'],
    rhv=['test_vm_3', 'test_vm_4'],
    osp=['test_vm_5', 'test_vm_6'],
)

In [7]:
# don't hate me for nested comprehension here - building tuples of provider+name
from multiprocessing.pool import ThreadPool

# ThreadPool instead of a process Pool, same interface
with ThreadPool(6) as pool:
    pool.starmap(
        pretend_delete_method, 
        [(key, vm) 
         for key, vms 
         in example_provider_vm_lists.items() 
         for vm in vms]
    )


Pretend delete: test_vm_1 on vmware. (Pid: 18613)Pretend delete: test_vm_2 on vmware. (Pid: 18613)
Pretend delete: test_vm_3 on rhv. (Pid: 18613)
Pretend delete: test_vm_4 on rhv. (Pid: 18613)
Pretend delete: test_vm_5 on osp. (Pid: 18613)
Pretend delete: test_vm_6 on osp. (Pid: 18613)

Locking

  • A semaphore-type object that can be acquired and released
  • While one thread holds the lock, other threads trying to acquire it block
  • Necessary when modifying shared objects or resources

In [8]:
# Printing is thread safe, but will sometimes print separate messages on the same line (above)
# Use a lock around print
from multiprocessing import Lock

lock = Lock()
def safe_printing_method(provider, vm_name):
    with lock:
        print('Pretend delete: {} on {}. (Pid: {})'
              .format(vm_name, provider, getpid()))

In [9]:
with ThreadPool(6) as pool:
    pool.starmap(
        safe_printing_method, 
        [(key, vm) for key, vms in example_provider_vm_lists.items() for vm in vms])


Pretend delete: test_vm_1 on vmware. (Pid: 18613)
Pretend delete: test_vm_2 on vmware. (Pid: 18613)
Pretend delete: test_vm_3 on rhv. (Pid: 18613)
Pretend delete: test_vm_4 on rhv. (Pid: 18613)
Pretend delete: test_vm_5 on osp. (Pid: 18613)
Pretend delete: test_vm_6 on osp. (Pid: 18613)

Queues

  • Store data/objects in child threads/processes and retrieve them in the parent
  • FIFO queue with put, get, and empty methods

  • multiprocessing.Queue

    • cannot be pickled and thus can't be passed to Pool methods
    • can deadlock with improper join use (see the sketch after this list)
  • multiprocessing.Manager.Queue

    • is a proxy, so it can be pickled
    • can be shared between processes
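
The deadlock mentioned above happens when the parent joins a child before draining the queue: a child that has put data waits for its buffered items to be flushed to the underlying pipe before it can exit. A minimal sketch of the safe ordering (drain first, then join), assuming a plain multiprocessing.Queue:

from multiprocessing import Process, Queue

def producer(q):
    q.put('x' * 10_000_000)  # big enough to sit in the queue's internal buffer

q = Queue()
p = Process(target=producer, args=(q,))
p.start()
# p.join() here can deadlock: the child won't exit until its data is consumed
print(len(q.get()))  # drain the queue first...
p.join()             # ...then joining is safe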

In [10]:
from multiprocessing import Manager
from random import randint

# Create instance of manager
manager = Manager()

def multiple_output_method(provider, vm_name, fail_queue):
    # random success of called method
    if randint(0, 1):
        return True
    else:
        # Store our failure vm on the queue
        fail_queue.put(vm_name)
        return None

# Create queue object to give to child processes
queue_for_failures = manager.Queue()
with Pool(2) as pool:
    results = pool.starmap(
        multiple_output_method, 
        [(key, vm, queue_for_failures)
         for key, vms
         in example_provider_vm_lists.items()
         for vm in vms]
    )

print('Results are in: {}'.format(results))

failed_vms = []
# get items from the queue while it's not empty
while not queue_for_failures.empty():
    failed_vms.append(queue_for_failures.get())
    
print('Failures are in: {}'.format(failed_vms))


Results are in: [None, True, None, None, None, True]
Failures are in: ['test_vm_1', 'test_vm_4', 'test_vm_3', 'test_vm_5']
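
An alternative drain pattern, hedged for cases where empty() could race with another consumer: call get_nowait() and stop on queue.Empty (the manager's Queue proxy exposes the same methods as queue.Queue):

from queue import Empty

failed_vms = []
while True:
    try:
        failed_vms.append(queue_for_failures.get_nowait())
    except Empty:
        break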