The goal of this notebook is to work out the basics of using the multiprocessing module and IPython Parallel.
In [ ]:
# http://pymotw.com/2/multiprocessing/basics.html
import multiprocessing


def worker():
    """Worker function: print a marker so each child process is visible."""
    print('Worker')
    return


if __name__ == '__main__':
    # Spawn five independent worker processes.
    jobs = []
    for i in range(5):
        p = multiprocessing.Process(target=worker)
        jobs.append(p)
        p.start()
    # Join every child so the parent does not exit (or proceed) before
    # the workers have finished -- the original never joined them.
    for p in jobs:
        p.join()
In [ ]:
In [ ]:
#
import urllib2
from multiprocessing.dummy import Pool as ThreadPool
# Sample pages to fetch -- mostly python.org plus one external host -- sized
# so the sequential download is slow enough to show the thread-pool speedup.
urls = [
'http://www.python.org',
'http://www.python.org/about/',
'http://www.onlamp.com/pub/a/python/2003/04/17/metaclasses.html',
'http://www.python.org/doc/',
'http://www.python.org/download/',
'http://www.python.org/getit/',
'http://www.python.org/community/',
'https://wiki.python.org/moin/',
'http://planet.python.org/',
'https://wiki.python.org/moin/LocalUserGroups',
'http://www.python.org/psf/',
'http://docs.python.org/devguide/',
'http://www.python.org/community/awards/'
# etc..
]
def urlopen(url):
    """Fetch *url*, returning the response object or None on any failure.

    Returning None (instead of raising) lets pool.map keep going over the
    remaining URLs.  Catching Exception -- rather than a bare ``except:`` --
    still lets KeyboardInterrupt and SystemExit propagate.
    """
    try:
        return urllib2.urlopen(url)
    except Exception:
        return None
def run_with_pool(n):
    """Fetch every URL in the module-level ``urls`` with *n* worker threads.

    Returns the responses (None entries for failed fetches) in the same
    order as ``urls``.
    """
    pool = ThreadPool(n)
    try:
        results = pool.map(urlopen, urls)
    finally:
        # Always release the worker threads, even if map() raises.
        pool.close()
        pool.join()
    return results
In [ ]:
%time [urlopen(url) for url in urls]
In [ ]:
%time run_with_pool(16)
In [ ]:
# Benchmark: sequential fetching versus thread pools of increasing size.

# Make the Pool of workers
pool = ThreadPool(4)
# Open the urls in their own threads and return the results
results = pool.map(urlopen, urls)
# Close the pool and wait for the work to finish
pool.close()
pool.join()

# ------- Single thread ------- #
results = []
for url in urls:
    result = urlopen(url)
    results.append(result)
# ------- VERSUS ------- #
# ------- 4 Pool ------- #
pool = ThreadPool(4)
results = pool.map(urlopen, urls)
# ------- 8 Pool ------- #
pool = ThreadPool(8)
results = pool.map(urlopen, urls)
# ------- 13 Pool ------- #
pool = ThreadPool(13)
results = pool.map(urlopen, urls)

# Observed timings (diminishing returns past ~8 threads for this URL list):
# Single thread: 14.4 Seconds
# 4 Pool: 3.1 Seconds
# 8 Pool: 1.4 Seconds
# 13 Pool: 1.3 Seconds
In [ ]:
import multiprocessing
from multiprocessing import Pool
from multiprocessing.dummy import Pool as ThreadPool
from math import factorial

# Number of worker threads in the pool.
POOL_SIZE = 8
pool = ThreadPool(POOL_SIZE)
inputs = range(100)
# map_async returns immediately with a SINGLE AsyncResult covering the whole
# batch; call .wait()/.get() on it later to block for / collect the outputs.
results = pool.map_async(factorial, inputs)
I'm surprised that `map_async` returns a single `AsyncResult` object for the whole batch and doesn't seem to provide visibility into the progress of the individual parts.
In [ ]:
# Block until the async map finishes, then dump the outputs on success.
# (successful() is only valid to call after the result is ready.)
results.wait()
if results.successful():
    print(results.get())
In [ ]:
import multiprocessing
from multiprocessing import Pool
from multiprocessing.dummy import Pool as ThreadPool
from math import factorial


def factorial_calc(n):
    """Return (n, n!) so outputs can be matched to inputs even after
    imap_unordered scrambles the completion order."""
    return (n, factorial(n))


POOL_SIZE = 16
pool = ThreadPool(POOL_SIZE)
inputs = range(10000)
# chunksize=1: hand inputs to workers one at a time; the returned iterator
# yields results lazily, in whatever order they complete.
results = pool.imap_unordered(factorial_calc, inputs, 1)
In [ ]:
# Drain the unordered result iterator; i counts completions, not input order.
for (i, result) in enumerate(results):
    print((i,))
What happens if a worker raises an exception?
Answer: the exception must be caught inside the worker function itself; otherwise iterating the result iterator re-raises it and aborts the loop.
In [ ]:
from multiprocessing.dummy import Pool as ThreadPool
from math import factorial


def factorial_calc(n):
    """Return (n, n!); on failure return (n, exception) instead of raising.

    Errors must be trapped inside the worker: if a task raises, iterating
    the pool's result iterator re-raises the exception and kills the loop.
    """
    try:
        return (n, factorial(n))
    except Exception as e:  # 'except Exception, e' is Python-2-only syntax
        return (n, e)


POOL_SIZE = 16
pool = ThreadPool(POOL_SIZE)
# 9.3 is deliberately bad input: factorial() rejects non-integral values.
# list(range(...)) keeps the concatenation valid on Python 3 as well.
inputs = [9.3] + list(range(1000))
results = pool.imap_unordered(factorial_calc, inputs, 100)
In [ ]:
# Consume the results; an exception captured by the worker arrives as the
# tuple's second element rather than being raised here.
for (i, result) in enumerate(results):
    try:
        # end=' ' reproduces the Python 2 trailing-comma (no-newline) print.
        print((i, result[0]), end=' ')
    except Exception as e:
        print(e)
In [ ]:
#http://stackoverflow.com/a/7715670/7782
from __future__ import print_function
import sys, time

# '\r' moves the cursor back to the start of the line and end="" suppresses
# the newline, so each update overwrites the previous percentage in place.
for i in range(0, 101, 10):
    print('\r>> You have finished %d%%' % i, end="")
    time.sleep(2)
# A bare 'print' statement is a NO-OP once print_function is imported (it is
# just a reference to the function); call it to emit the final newline.
print()
In [ ]:
from multiprocessing.dummy import Pool as ThreadPool
from multiprocessing import Pool as ProcessPool
from math import factorial


def factorial_calc(n):
    """Return (n, n!); trap any error as (n, exception) so one bad input
    cannot abort the whole pool.map."""
    try:
        return (n, factorial(n))
    except Exception as e:  # py3-compatible spelling of 'except Exception, e'
        return (n, e)


def calc_factorials(max_int=1000, pool_size=8, threads=True, chunk_size=10):
    """Compute (n, n!) for n in range(max_int) using a worker pool.

    threads=True uses a thread pool (cheap, shares the GIL);
    threads=False uses a process pool (true parallelism, more overhead).
    chunk_size is how many inputs each worker grabs per dispatch.
    Returns the results in input order.
    """
    pool = ThreadPool(pool_size) if threads else ProcessPool(pool_size)
    try:
        # try/finally guarantees the pool is released even if map() raises.
        return pool.map(factorial_calc, range(max_int), chunk_size)
    finally:
        pool.close()
        pool.join()
In [ ]:
%time results = calc_factorials(5000, pool_size=8, threads=True, chunk_size=10)
In [ ]:
%time results = calc_factorials(20000, pool_size=8, threads=False, chunk_size=10)
In [ ]: