Parallelism in python
In [10]:
%%file multihello.py
'''hello from another process
'''
from multiprocessing import Process

def f(name):
    print 'hello', name

if __name__ == '__main__':
    p = Process(target=f, args=('world',))
    p.start()
    p.join()

# EOF
In [2]:
!python2.7 multihello.py
multiprocessing spawns with subprocess.Popen
In [42]:
if __name__ == '__main__':
    from multiprocessing import freeze_support
    freeze_support()
    # Then, do multiprocessing stuff...
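Each Process runs in its own OS process, so parent and child report different pids. A quick check (a small sketch using os.getpid, not part of the original notebook):
import os
import multiprocessing as mp

def show_pid():
    # runs in the child: a different OS process than the parent
    print "child pid: ", os.getpid()

if __name__ == '__main__':
    print "parent pid:", os.getpid()
    p = mp.Process(target=show_pid)
    p.start()
    p.join()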
Shared memory and shared objects
Value and Array
In [3]:
%%file sharedobj.py
'''demonstrate shared objects in multiprocessing
'''
from multiprocessing import Process, Value, Array

def f(n, a):
    n.value = 3.1415927
    for i in range(len(a)):
        a[i] = -a[i]

if __name__ == '__main__':
    num = Value('d', 0.0)
    arr = Array('i', range(10))
    p = Process(target=f, args=(num, arr))
    p.start()
    p.join()
    print num.value
    print arr[:]

# EOF
In [4]:
!python2.7 sharedobj.py
Manager and proxies
In [5]:
%%file sharedproxy.py
'''demonstrate sharing objects by proxy through a manager
'''
from multiprocessing import Process, Manager

def f(d, l):
    d[1] = '1'
    d['2'] = 2
    d[0.25] = None
    l.reverse()

if __name__ == '__main__':
    manager = Manager()
    d = manager.dict()
    l = manager.list(range(10))
    p = Process(target=f, args=(d, l))
    p.start()
    p.join()
    print d
    print l

# EOF
In [6]:
!python2.7 sharedproxy.py
ctypes and numpy
In [7]:
%%file numpyshared.py
'''demonstrating shared objects using numpy and ctypes
'''
import multiprocessing as mp
from multiprocessing import sharedctypes
from numpy import ctypeslib

def fill_arr(arr_view, i):
    arr_view.fill(i)

if __name__ == '__main__':
    ra = sharedctypes.RawArray('i', 4)
    arr = ctypeslib.as_array(ra)
    arr.shape = (2, 2)
    p1 = mp.Process(target=fill_arr, args=(arr[:1, :], 1))
    p2 = mp.Process(target=fill_arr, args=(arr[1:, :], 2))
    p1.start(); p2.start()
    p1.join(); p2.join()
    print arr
In [8]:
!python2.7 numpyshared.py
Low-level task parallelism: point to point communication
Process
In [9]:
%%file mprocess.py
'''demonstrate the Process class
'''
import multiprocessing as mp
from time import sleep
from random import random

def worker(num):
    sleep(2.0 * random())
    name = mp.current_process().name
    print "worker {}, name: {}".format(num, name)

if __name__ == '__main__':
    master = mp.current_process().name
    print "Master name: {}".format(master)
    for i in range(2):
        p = mp.Process(target=worker, args=(i,))
        p.start()
    # wait for all spawned child processes to finish
    [p.join() for p in mp.active_children()]
In [10]:
!python2.7 mprocess.py
Queue and Pipe
In [11]:
%%file queuepipe.py
'''demonstrate queues and pipes
'''
import multiprocessing as mp
import pickle

def qworker(q):
    v = q.get()  # blocking!
    print "queue worker got '{}' from parent".format(v)

def pworker(p):
    import pickle  # needed for encapsulation
    msg = 'hello hello hello'
    print "pipe worker sending {!r} to parent".format(msg)
    p.send(msg)
    v = p.recv()
    print "pipe worker got {!r} from parent".format(v)
    print "unpickled to {}".format(pickle.loads(v))

if __name__ == '__main__':
    q = mp.Queue()
    p = mp.Process(target=qworker, args=(q,))
    p.start()  # the child blocks at q.get() until the parent puts a value
    v = 'python rocks!'
    print "putting '{}' on queue".format(v)
    q.put(v)
    p.join()
    print ''

    # The two ends of the pipe: the parent and the child connections
    p_conn, c_conn = mp.Pipe()
    p = mp.Process(target=pworker, args=(c_conn,))
    p.start()
    msg = pickle.dumps([1,2,3], -1)
    print "got {!r} from child".format(p_conn.recv())
    print "sending {!r} to child".format(msg)
    p_conn.send(msg)
    import datetime
    print "\nfinished: {}".format(datetime.date.today())
    p.join()
In [12]:
!python2.7 queuepipe.py
Lock and Event
In [13]:
%%file multi_sync.py
'''demonstrating locks
'''
import multiprocessing as mp

def print_lock(lk, i):
    name = mp.current_process().name
    lk.acquire()
    for j in range(5):
        print i, "from process", name
    lk.release()

if __name__ == '__main__':
    lk = mp.Lock()
    ps = [mp.Process(target=print_lock, args=(lk, i)) for i in range(5)]
    [p.start() for p in ps]
    [p.join() for p in ps]
In [14]:
!python2.7 multi_sync.py
In [15]:
'''events
'''
import multiprocessing as mp

def wait_on_event(e):
    name = mp.current_process().name
    e.wait()
    print name, "finished waiting"

if __name__ == '__main__':
    e = mp.Event()
    ps = [mp.Process(target=wait_on_event, args=(e,)) for i in range(10)]
    [p.start() for p in ps]
    print "e.is_set()", e.is_set()
    #raw_input("press any key to set event")
    e.set()
    [p.join() for p in ps]
High-level task parallelism: collective communication
Pool: apply and map
In [2]:
import multiprocessing as mp

def random_mean(x):
    import numpy as np
    return round(np.mean(np.random.randint(-x, x+1, 10000)), 3)

if __name__ == '__main__':
    # create a pool with cpu_count() processes
    p = mp.Pool()
    results = p.map(random_mean, range(1,10))
    print results
    print p.apply(random_mean, [100])
    p.close()
    p.join()
In [26]:
import multiprocessing as mp

def random_mean_count(x):
    import numpy as np
    return x + round(np.mean(np.random.randint(-x, x+1, 10000)), 3)

if __name__ == '__main__':
    # create a pool with cpu_count() processes
    p = mp.Pool()
    results = p.imap_unordered(random_mean_count, range(1,10))
    print "[",
    for i in results:
        print i,
        if abs(i) <= 1.0:
            print "...] QUIT"
            break
    list(results)  # drain any remaining results
    p.close()
    p.join()
In [32]:
import multiprocessing as mp

def random_mean_count(x):
    import numpy as np
    return x + round(np.mean(np.random.randint(-x, x+1, 10000)), 3)

if __name__ == '__main__':
    # create a pool with cpu_count() processes
    p = mp.Pool()
    results = p.map_async(random_mean_count, range(1,10))
    print "Waiting .",
    i = 0
    while not results.ready():
        if not i % 4000:
            print ".",
        i += 1
    print results.get()
    print "\n", p.apply_async(random_mean_count, [100]).get()
    p.close()
    p.join()
Issues: random number generators
In [3]:
import numpy as np

def walk(x, n=100, box=.5, delta=.2):
    "perform a random walk"
    w = np.cumsum(x + np.random.uniform(-delta, delta, n))
    w = np.where(abs(w) > box)[0]
    return w[0] if len(w) else n

N = 10
# run N trials, all starting from x=0
pwalk = np.vectorize(walk)
print pwalk(np.zeros(N))

# run again, using list comprehension instead of ufunc
print [walk(0) for i in range(N)]

# run again, using multiprocessing's map
# NOTE: forked workers inherit identical copies of the parent's RNG state,
# so draws from different workers may not be independent
import multiprocessing as mp
p = mp.Pool()
print p.map(walk, [0]*N)
In [1]:
%%file state.py
"""some good state utilities
"""

def check_pickle(x, dill=False):
    "checks the pickle across a subprocess"
    import pickle
    import subprocess
    if dill:
        import dill as pickle
        pik = "dill"
    else:
        pik = "pickle"
    fail = True
    try:
        _x = pickle.dumps(x)
        fail = False
    finally:
        if fail:
            print "DUMP FAILED"
    msg = "python -c import {0}; print {0}.loads({1})".format(pik, repr(_x))
    print "SUCCESS" if not subprocess.call(msg.split(None, 2)) else "LOAD FAILED"

def random_seed(s=None):
    "sets the seed for calls to 'random()'"
    import random
    random.seed(s)
    try:
        from numpy import random
        random.seed(s)
    except:
        pass
    return

def random_state(module='random', new=False, seed='!'):
    """return a (optionally manually seeded) random generator

    For a given module, return an object that has random number generation (RNG)
    methods available. If new=False, use the global copy of the RNG object.
    If seed='!', do not reseed the RNG (using seed=None 'removes' any seeding).
    If seed='*', use a seed that depends on the process id (PID); this is useful
    for building RNGs that are different across multiple threads or processes.
    """
    import random
    if module == 'random':
        rng = random
    elif not isinstance(module, type(random)):
        # convenience for passing in 'numpy'
        if module == 'numpy': module = 'numpy.random'
        try:
            import importlib
            rng = importlib.import_module(module)
        except ImportError:
            rng = __import__(module, fromlist=module.split('.')[-1:])
    elif module.__name__ == 'numpy':  # convenience for passing in numpy
        from numpy import random as rng
    else: rng = module

    _rng = getattr(rng, 'RandomState', None) or \
           getattr(rng, 'Random')  # throw error if no rng found
    if new:
        rng = _rng()

    if seed == '!':  # special case: don't reset the seed
        return rng
    if seed == '*':  # special case: random seeding for multiprocessing
        try:
            try:
                import multiprocessing as mp
            except ImportError:
                import processing as mp
            try:
                seed = mp.current_process().pid
            except AttributeError:
                seed = mp.currentProcess().getPid()
        except:
            seed = 0
        import time
        seed += int(time.time()*1e6)
    # set the random seed (or 'reset' with None)
    rng.seed(seed)
    return rng

# EOF
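One way to put these utilities to work (a sketch, not from the original notebook; assumes state.py above is importable from the working directory): give each pool task a fresh generator with seed='*', so the workers don't all draw from the identical RNG state they inherited on fork.
import multiprocessing as mp
from state import random_state

def draw(i):
    # a new stdlib RNG, seeded from this worker's pid (plus the clock)
    rng = random_state(module='random', new=True, seed='*')
    return [rng.randint(0, 9) for _ in range(3)]

if __name__ == '__main__':
    p = mp.Pool()
    print p.map(draw, range(4))
    p.close()
    p.join()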
Issues: serialization
multiprocess
In [33]:
import multiprocess
print multiprocess.Pool().map(lambda x:x**2, range(10))
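For contrast, the same map with the standard library typically fails, because the default pickle cannot serialize a lambda; multiprocess sidesteps this by serializing with dill. A sketch (the exact exception type and message vary by Python version):
import multiprocessing as mp
try:
    # pickle cannot serialize the lambda that must be sent to the workers
    print mp.Pool().map(lambda x: x**2, range(10))
except Exception as e:
    print "stdlib multiprocessing failed:", type(e).__name__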
EXERCISE: Try several variants of looping patterns to see if you can speed up a toy password cracker.
See: 'exercise'
pp(ft)
In [40]:
%%file runppft.py
'''demonstrate ppft
'''
import ppft

def squared(x):
    return x*x

server = ppft.Server()  # can take 'localhost:8000' or remote:port
result = server.submit(squared, (5,))
result.wait()
print result.finished
print result()
In [41]:
!python2.7 runppft.py
Programming efficiency: pathos
map functions: threading, multiprocessing, and serial and parallel python (pp)
In [2]:
%%file allpool.py
'''demonstrate pool API
'''
import pathos

def sum_squared(x, y):
    return (x+y)**2

x = range(5)
y = range(0, 10, 2)

if __name__ == '__main__':
    sp = pathos.pools.SerialPool()
    pp = pathos.pools.ParallelPool()
    mp = pathos.pools.ProcessPool()
    tp = pathos.pools.ThreadPool()
    for pool in [sp, pp, mp, tp]:
        print pool.map(sum_squared, x, y)
        pool.close()
        pool.join()
In [3]:
!python2.7 allpool.py
In [2]:
from itertools import izip

PRIMES = [
    112272535095293,
    112582705942171,
    112272535095293,
    115280095190773,
    115797848077099,
    1099726899285419]

def is_prime(n):
    if n % 2 == 0:
        return False
    import math
    sqrt_n = int(math.floor(math.sqrt(n)))
    for i in range(3, sqrt_n + 1, 2):
        if n % i == 0:
            return False
    return True

def sleep_add1(x):
    from time import sleep
    if x < 4: sleep(x/10.0)
    return x+1

def sleep_add2(x):
    from time import sleep
    if x < 4: sleep(x/10.0)
    return x+2

def test_with_multipool(Pool):
    inputs = range(10)
    with Pool() as pool1:
        res1 = pool1.amap(sleep_add1, inputs)
    with Pool() as pool2:
        res2 = pool2.amap(sleep_add2, inputs)
    with Pool() as pool3:
        for number, prime in izip(PRIMES, pool3.imap(is_prime, PRIMES)):
            assert prime if number != PRIMES[-1] else not prime
    assert res1.get() == [i+1 for i in inputs]
    assert res2.get() == [i+2 for i in inputs]
    print "OK"

if __name__ == '__main__':
    from pathos.pools import ProcessPool
    test_with_multipool(ProcessPool)
In [4]:
import pathos
from math import sin, cos

if __name__ == '__main__':
    mp = pathos.pools.ProcessPool()
    tp = pathos.pools.ThreadPool()
    print mp.amap(tp.map, [sin, cos], [range(3), range(3)]).get()
    mp.close(); tp.close()
    mp.join(); tp.join()
Pool caching
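pathos also caches its pools: building a pool with the same configuration reuses the already-running workers instead of spawning new ones. A small sketch of the idea (assumes the caching and clear() behavior of recent pathos releases):
import pathos

if __name__ == '__main__':
    p1 = pathos.pools.ProcessPool(nodes=4)
    p2 = pathos.pools.ProcessPool(nodes=4)  # same configuration: reuses the cached workers
    print p1.map(abs, range(-5, 5))
    p1.close()
    p1.join()
    p1.clear()  # shut down and remove the cached pool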
IPython.parallel and scoop
EXERCISE: Let's take another swing at Monte Carlo betting. You'll want to focus on roll.py, trials.py, and optimize.py. Can you speed things up with careful placement of a Pool? Are there small modifications to the code that would allow hierarchical parallelism? Can we speed up the calculation, or does parallel computing lose to spin-up overhead? Where are we now hitting the wall?
See: 'solution'
Remote execution
pp.Server
In [ ]:
localhost>$ ppserver.py -p 8000
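With the server above running, a ppft client can farm jobs out to it by listing it in ppservers. A sketch (assumes the server from the previous cell is still up on port 8000 and no secret is required):
import ppft

def squared(x):
    return x*x

# hand work to local workers plus the remote ppserver on port 8000
server = ppft.Server(ppservers=('localhost:8000',))
jobs = [server.submit(squared, (i,)) for i in range(10)]
print [job() for job in jobs]
server.print_stats()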
Pool().server in pathos
In [ ]:
>>> def squared(x):
...     return x**2
...
>>> import pathos
>>> pool = pathos.pools.ParallelPool(nodes=1, servers=('localhost:8000',))
>>> results = pool.map(squared, range(100))
>>> print pathos.pp.stats()
Job execution statistics:
 job count | % of all jobs | job time sum | time per job | job server
        65 |         65.00 |       0.2004 |     0.003083 | localhost:8000
        35 |         35.00 |       0.0538 |     0.001538 | local
Time elapsed since server creation 21.2711749077
0 active tasks, 1 cores
>>> pool.close()
>>> pool.join()
rpyc, pyro, and zmq
Related: secure authentication with ssh
pathos.secure: connection and tunnel
In [16]:
import pathos
import sys

rhost = 'localhost'
rport = 23

if __name__ == '__main__':
    tunnel = pathos.secure.Tunnel()
    lport = tunnel.connect(rhost, rport)
    print 'SSH Tunnel to:', rhost
    print 'Remote port:', rport
    print 'Local port:', lport
    print 'Press <Enter> to disconnect'
    sys.stdin.readline()
    tunnel.disconnect()
In [ ]:
import pathos
launcher = pathos.secure.Pipe()
config = launcher(command='hostname', rhost='localhost', background=False)
launcher.launch()
print launcher.response()
paramiko
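pathos.secure drives its tunnels and pipes over ssh; the same kind of remote command can also be issued directly with paramiko. A minimal sketch (assumes an ssh server on localhost and key-based authentication already configured):
import paramiko

client = paramiko.SSHClient()
client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
client.connect('localhost')  # assumes key-based auth; otherwise pass username/password
stdin, stdout, stderr = client.exec_command('hostname')
print stdout.read()
client.close()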
What about large-scale cluster computing?