The goal of this notebook is to work out the basics of using the `multiprocessing` module and IPython Parallel.


In [ ]:
# http://pymotw.com/2/multiprocessing/basics.html

import multiprocessing

def worker():
    """Worker function: identify this worker on stdout and exit."""
    # print() call form runs under both Python 2 and Python 3; the
    # original Python 2 print statement is a SyntaxError under Python 3.
    print('Worker')
    return

if __name__ == '__main__':
    jobs = []
    for i in range(5):
        p = multiprocessing.Process(target=worker)
        jobs.append(p)
        p.start()
    # Join every child so the parent does not exit while they still run
    # (the original never reaped the processes it started).
    for p in jobs:
        p.join()

In [ ]:


In [ ]:
# Thread-based parallelism demo, adapted from the classic
# "Parallelism in one line" blog post.

import urllib2 
from multiprocessing.dummy import Pool as ThreadPool 

# Pages to fetch; mostly python.org, so network latency dominates the work
# and threads (despite the GIL) give a real speedup.
urls = [
    'http://www.python.org', 
    'http://www.python.org/about/',
    'http://www.onlamp.com/pub/a/python/2003/04/17/metaclasses.html',
    'http://www.python.org/doc/',
    'http://www.python.org/download/',
    'http://www.python.org/getit/',
    'http://www.python.org/community/',
    'https://wiki.python.org/moin/',
    'http://planet.python.org/',
    'https://wiki.python.org/moin/LocalUserGroups',
    'http://www.python.org/psf/',
    'http://docs.python.org/devguide/',
    'http://www.python.org/community/awards/'
    # etc.. 
]

def urlopen(url):
    """Fetch *url*; return the response object, or None on any failure.

    Failures are converted to None rather than raised so that a
    ``pool.map`` over many URLs runs to completion even when some fail.
    """
    try:
        return urllib2.urlopen(url)
    except Exception:
        # A bare ``except:`` would also swallow KeyboardInterrupt and
        # SystemExit; limit the catch to ordinary exceptions.
        return None

def run_with_pool(n):
    """Fetch every URL in the module-level ``urls`` list with ``n`` worker threads."""
    workers = ThreadPool(n)
    fetched = workers.map(urlopen, urls)
    # Shut the pool down and wait for the worker threads to exit.
    workers.close()
    workers.join()
    return fetched

In [ ]:
# Baseline: fetch every URL sequentially on the calling thread.
%time [urlopen(url) for url in urls]

In [ ]:
# Same work fanned out over 16 threads, for comparison with the baseline.
%time run_with_pool(16)

In [ ]:
# Make the pool of workers.
pool = ThreadPool(4) 
# Fetch the urls in their own threads and collect the results.
results = pool.map(urlopen, urls)
# Close the pool and wait for the work to finish.
pool.close() 
pool.join() 

# ------- Sequential baseline: one request at a time ------- #
results = [] 
for url in urls:
    result = urlopen(url)
    results.append(result)

# ------- VERSUS the same work spread over thread pools ------- # 


# ------- 4 Pool ------- # 
pool = ThreadPool(4) 
results = pool.map(urlopen, urls)

# ------- 8 Pool ------- # 

pool = ThreadPool(8) 
results = pool.map(urlopen, urls)

# ------- 13 Pool ------- # 

pool = ThreadPool(13) 
results = pool.map(urlopen, urls)




# Observed timings for this I/O-bound workload -- more threads help until
# the per-request latency floor is hit (diminishing returns past ~8):
#     Single thread:  14.4 Seconds
#            4 Pool:   3.1 Seconds
#            8 Pool:   1.4 Seconds
#           13 Pool:   1.3 Seconds

Asynchronous approaches


In [ ]:
import multiprocessing
from multiprocessing import Pool
from multiprocessing.dummy import Pool as ThreadPool

from math import factorial

POOL_SIZE = 8
pool = ThreadPool(POOL_SIZE)  # thread-backed pool with the Pool API
inputs = range(100)

# map_async returns immediately with one AsyncResult handle for the whole
# batch; call .wait()/.get() on it later (next cell) to retrieve results.
results = pool.map_async(factorial, inputs)

I'm surprised that map_async returns a single AsyncResult object and doesn't seem to provide visibility into the progress of individual parts.


In [ ]:
# Block until every task in the batch has finished, then display the
# collected results (``results`` is the AsyncResult from the previous cell).
results.wait()
if results.successful():
    # print() call form is valid under both Python 2 and Python 3; the
    # original ``print results.get()`` statement is a SyntaxError on Py3.
    print(results.get())

Using `pool.imap_unordered`


In [ ]:
import multiprocessing
from multiprocessing import Pool
from multiprocessing.dummy import Pool as ThreadPool

from math import factorial

def factorial_calc(n):
    """Pair each input with its factorial so unordered results stay identifiable."""
    return n, factorial(n)

POOL_SIZE = 16
pool = ThreadPool(POOL_SIZE)
numbers = range(10000)

# Chunk size 1: each result is handed back individually, in completion
# order rather than input order.
results = pool.imap_unordered(factorial_calc, numbers, 1)

In [ ]:
# Drain the iterator: pairs arrive in completion order, not input order.
for (i, result) in enumerate(results):
    # Python 2 print statement of a one-element tuple -- a progress marker
    # only, so the huge factorial values are not dumped to the screen.
    print (i,)

What happens if there is an exception?

Answer: an exception raised inside a worker only surfaces when its result is consumed, so it is best to catch the exception inside the worker function and return it as a value.


In [ ]:
from multiprocessing.dummy import Pool as ThreadPool
from math import factorial

def factorial_calc(n):
    """Return ``(n, n!)``; on failure return ``(n, exception)`` instead of raising.

    Catching inside the worker keeps one bad input from aborting the
    whole batch when the results are consumed.
    """
    try:
        return (n, factorial(n))
    except Exception as e:  # ``except Exception, e`` is Python-2-only syntax
        return (n, e)

POOL_SIZE = 16
pool = ThreadPool(POOL_SIZE)
# 9.3 is a deliberate poison value: factorial() rejects non-integral input.
# list(range(...)) keeps the concatenation valid on Python 3 as well,
# where ``[9.3] + range(1000)`` raises TypeError.
inputs = [9.3] + list(range(1000))

results = pool.imap_unordered(factorial_calc, inputs, 100)

In [ ]:
# Consume results as they arrive; the worker already converted exceptions
# into return values, so result[1] may be an Exception instance.
for (i, result) in enumerate(results):
    try:
        # Python 2 print statement; the trailing comma suppresses the newline.
        print (i, result[0]), 
    except Exception, e:
        print e

In [ ]:
# http://stackoverflow.com/a/7715670/7782
# In-place progress meter: '\r' returns the cursor to the start of the
# line so each update overwrites the previous one.

from __future__ import print_function

import sys, time

for i in range(0, 101, 10):  # range works on Py2 and Py3; xrange is Py2-only
  print('\r>> You have finished %d%%' % i, end="")
  # Without a flush the carriage-return updates may sit in the stdout
  # buffer and only appear all at once at the end.
  sys.stdout.flush()
  time.sleep(2)
# With print_function imported, a bare ``print`` is a no-op name
# expression -- it must be *called* to emit the final newline.
print()

Threads vs Processes


In [ ]:
from multiprocessing.dummy import Pool as ThreadPool
from multiprocessing import Pool as ProcessPool

from math import factorial

def factorial_calc(n):
    """Return ``(n, n!)``; failures come back as ``(n, exception)``.

    Catching inside the worker keeps one bad input from aborting the
    whole map() call.
    """
    try:
        return (n, factorial(n))
    except Exception as e:  # ``except Exception, e`` is Python-2-only syntax
        return (n, e)

def calc_factorials(max_int=1000, pool_size=8, threads=True, chunk_size=10):
    """Compute ``(n, n!)`` for n in range(max_int) using a worker pool.

    :param max_int: exclusive upper bound of the inputs.
    :param pool_size: number of worker threads or processes.
    :param threads: True for multiprocessing.dummy (threads),
                    False for real processes.
    :param chunk_size: how many inputs each worker takes per task.
    :returns: list of ``(n, n!)`` pairs in input order.
    """
    if threads:
        pool = ThreadPool(pool_size)
    else:
        pool = ProcessPool(pool_size)

    try:
        results = pool.map(factorial_calc, range(max_int), chunk_size)
    finally:
        # Always release the workers, even if map() raises
        # (the original leaked the pool on error).
        pool.close()
        pool.join()
    return results

In [ ]:
# CPU-bound work on threads: the GIL serializes it, so expect little speedup.
%time results = calc_factorials(5000, pool_size=8, threads=True, chunk_size=10)

In [ ]:
# Same work on real processes: sidesteps the GIL at the cost of pickling results.
%time results = calc_factorials(20000, pool_size=8, threads=False, chunk_size=10)

IPython Parallel


In [ ]: