Multiprocessing and multithreading

Parallelism in Python


In [10]:
%%file multihello.py
'''hello from another process
'''
from multiprocessing import Process

def f(name):
    print 'hello', name

if __name__ == '__main__':
    p = Process(target=f, args=('world',))
    p.start()
    p.join()
    

# EOF


Overwriting multihello.py

In [2]:
!python2.7 multihello.py


hello world
  • On Windows: there is no fork, so multiprocessing spawns each child as a fresh interpreter process (hence the __main__ guard and freeze_support below)

In [42]:
if __name__ == '__main__':
    from multiprocessing import freeze_support
    freeze_support()
    
    # Then, do multiprocessing stuff...
  • Data parallelism versus task parallelism
  • Multithreading versus multiprocessing
  • The global interpreter lock
  • Processes versus threads (see the sketch below)
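The effect of the last two bullets is easy to see empirically. Below is a minimal sketch (not from the original notebook) that runs the same CPU-bound function on a pool of threads and on a pool of processes: under the global interpreter lock the thread pool gives little or no speedup for pure-Python computation, while the process pool can use several cores.

In [ ]:
'''sketch: the GIL and CPU-bound work -- threads versus processes
'''
import time
import multiprocessing as mp
from multiprocessing.dummy import Pool as ThreadPool  # thread pool, same API as mp.Pool

def busy(n):
    "pure-python, CPU-bound work"
    total = 0
    for i in xrange(n):
        total += i*i
    return total

if __name__ == '__main__':
    work = [2000000]*4
    for name, Pool in [('threads', ThreadPool), ('processes', mp.Pool)]:
        pool = Pool(4)
        start = time.time()
        pool.map(busy, work)
        pool.close(); pool.join()
        print name, "took", round(time.time() - start, 2), "seconds"
    # expectation: the thread pool takes about as long as a serial run (the GIL
    # serializes pure-python bytecode); the process pool can use several cores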

Shared memory and shared objects

  • Shared objects: Value and Array

In [3]:
%%file sharedobj.py
'''demonstrate shared objects in multiprocessing
'''
from multiprocessing import Process, Value, Array

def f(n, a):
    n.value = 3.1415927
    for i in range(len(a)):
        a[i] = -a[i]

if __name__ == '__main__':
    num = Value('d', 0.0)
    arr = Array('i', range(10))

    p = Process(target=f, args=(num, arr))
    p.start()
    p.join()

    print num.value
    print arr[:]
    

# EOF


Overwriting sharedobj.py

In [4]:
!python2.7 sharedobj.py


3.1415927
[0, -1, -2, -3, -4, -5, -6, -7, -8, -9]
  • Manager and proxies

In [5]:
%%file sharedproxy.py
'''demonstrate sharing objects by proxy through a manager
'''
from multiprocessing import Process, Manager

def f(d, l):
    d[1] = '1'
    d['2'] = 2
    d[0.25] = None
    l.reverse()

if __name__ == '__main__':
    manager = Manager()

    d = manager.dict()
    l = manager.list(range(10))

    p = Process(target=f, args=(d, l))
    p.start()
    p.join()

    print d
    print l


# EOF


Overwriting sharedproxy.py

In [6]:
!python2.7 sharedproxy.py


{0.25: None, 1: '1', '2': 2}
[9, 8, 7, 6, 5, 4, 3, 2, 1, 0]
  • Working in C with ctypes and numpy

In [7]:
%%file numpyshared.py
'''demonstrating shared objects using numpy and ctypes
'''
import multiprocessing as mp
from multiprocessing import sharedctypes
from numpy import ctypeslib

def fill_arr(arr_view, i):
    arr_view.fill(i)

if __name__ == '__main__':
    ra = sharedctypes.RawArray('i', 4)
    arr = ctypeslib.as_array(ra)
    arr.shape = (2, 2)
    p1 = mp.Process(target=fill_arr, args=(arr[:1, :], 1))
    p2 = mp.Process(target=fill_arr, args=(arr[1:, :], 2))
    p1.start(); p2.start()
    p1.join(); p2.join()
    print arr


Overwriting numpyshared.py

In [8]:
!python2.7 numpyshared.py


[[1 1]
 [2 2]]
  • Issues: threading and locks
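A minimal sketch of the issue (not part of the original notebook): unsynchronized read-modify-write updates from several threads can interleave and lose counts, while holding a Lock around each update keeps the total exact.

In [ ]:
'''sketch: why threads need locks -- a race on a shared counter
'''
import threading

def bump(counter, lock=None):
    for i in range(100000):
        if lock is not None:
            with lock:
                counter[0] += 1
        else:
            counter[0] += 1  # read-modify-write is not atomic

if __name__ == '__main__':
    for lock in [None, threading.Lock()]:
        counter = [0]
        ts = [threading.Thread(target=bump, args=(counter, lock)) for i in range(4)]
        [t.start() for t in ts]
        [t.join() for t in ts]
        print "with lock:" if lock else "without lock:", counter[0]
    # without the lock the total is often less than 400000; with it, always exact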

Low-level task parallelism: point to point communication

  • Process

In [9]:
%%file mprocess.py
'''demonstrate the Process class
'''
import multiprocessing as mp
from time import sleep
from random import random

def worker(num):
    sleep(2.0 * random())
    name = mp.current_process().name
    print "worker {},name:{}".format(num, name)

if __name__ == '__main__':
    master = mp.current_process().name
    print "Master name: {}".format(master)
    for i in range(2):
        p = mp.Process(target=worker, args=(i,))
        p.start()

    # Wait for all spawned child processes to finish
    [p.join() for p in mp.active_children()]


Overwriting mprocess.py

In [10]:
!python2.7 mprocess.py


Master name: MainProcess
worker 0,name:Process-1
worker 1,name:Process-2
  • Queue and Pipe

In [11]:
%%file queuepipe.py
'''demonstrate queues and pipes
'''
import multiprocessing as mp
import pickle

def qworker(q):
    v = q.get() # blocking!
    print "queue worker got '{}' from parent".format(v)
    
def pworker(p):
    import pickle  # imported locally so the worker is self-contained when shipped to another process
    msg = 'hello hello hello'
    print "pipe worker sending {!r} to parent".format(msg)
    p.send(msg)
    v = p.recv()
    print "pipe worker got {!r} from parent".format(v)
    print "unpickled to {}".format(pickle.loads(v))

    
if __name__ == '__main__':
    q = mp.Queue()
    p = mp.Process(target=qworker, args=(q,))
    p.start() # the child blocks at q.get() until we put a value on the queue
    v = 'python rocks!'
    print "putting '{}' on queue".format(v)
    q.put(v)
    p.join()
    print ''

    # The two ends of the pipe: the parent and the child connections
    p_conn, c_conn = mp.Pipe()
    p = mp.Process(target=pworker, args=(c_conn,))
    p.start()
    msg = pickle.dumps([1,2,3],-1)
    print "got {!r} from child".format(p_conn.recv())
    print "sending {!r} to child".format(msg)
    p_conn.send(msg)
    import datetime
    print "\nfinished: {}".format(datetime.date.today())
    p.join()


Overwriting queuepipe.py

In [12]:
!python2.7 queuepipe.py


putting 'python rocks!' on queue
queue worker got 'python rocks!' from parent

pipe worker sending 'hello hello hello' to parent
got 'hello hello hello' from child
sending '\x80\x02]q\x00(K\x01K\x02K\x03e.' to child
pipe worker got '\x80\x02]q\x00(K\x01K\x02K\x03e.' from parent
unpickled to [1, 2, 3]

finished: 2015-07-04
  • Synchronization with Lock and Event

In [13]:
%%file multi_sync.py
'''demonstrating locks
'''
import multiprocessing as mp

def print_lock(lk, i):
    name = mp.current_process().name
    lk.acquire()
    for j in range(5):
        print i, "from process", name
    lk.release()


if __name__ == '__main__':
    lk = mp.Lock()
    ps = [mp.Process(target=print_lock, args=(lk,i)) for i in range(5)]
    [p.start() for p in ps]
    [p.join() for p in ps]


Overwriting multi_sync.py

In [14]:
!python2.7 multi_sync.py


0 from process Process-1
0 from process Process-1
0 from process Process-1
0 from process Process-1
0 from process Process-1
1 from process Process-2
1 from process Process-2
1 from process Process-2
1 from process Process-2
1 from process Process-2
2 from process Process-3
2 from process Process-3
2 from process Process-3
2 from process Process-3
2 from process Process-3
3 from process Process-4
3 from process Process-4
3 from process Process-4
3 from process Process-4
3 from process Process-4
4 from process Process-5
4 from process Process-5
4 from process Process-5
4 from process Process-5
4 from process Process-5

In [15]:
'''events
'''
import multiprocessing as mp

def wait_on_event(e):
    name = mp.current_process().name
    e.wait()
    print name, "finished waiting"    


if __name__ == '__main__':
    e = mp.Event()
    ps = [mp.Process(target=wait_on_event, args=(e,)) for i in range(10)]
    [p.start() for p in ps]
    print "e.is_set()", e.is_set()
    #raw_input("press any key to set event")
    e.set()
    [p.join() for p in ps]


e.is_set() False
Process-7 finished waiting
Process-2 finished waiting
Process-5 finished waiting
Process-1 finished waiting
Process-4 finished waiting
Process-6 finished waiting
Process-8 finished waiting
Process-9 finished waiting
Process-10 finished waiting
Process-3 finished waiting

High-level task parallelism: collective communication

  • The task Pool
  • pipes (apply) and map

In [2]:
import multiprocessing as mp

def random_mean(x):
    import numpy as np
    return round(np.mean(np.random.randint(-x,x+1,10000)), 3)


if __name__ == '__main__':
    # create a pool with cpu_count() processes
    p = mp.Pool()
    results = p.map(random_mean, range(1,10))
    print results
    print p.apply(random_mean, [100])
    p.close()
    p.join()


[-0.007, -0.03, -0.005, -0.011, 0.005, 0.001, -0.056, -0.02, 0.058]
0.173
  • Variants: blocking, iterative, unordered, and asynchronous

In [26]:
import multiprocessing as mp

def random_mean_count(x):
    import numpy as np
    return x + round(np.mean(np.random.randint(-x,x+1,10000)), 3)


if __name__ == '__main__':
    # create a pool with cpu_count() processes
    p = mp.Pool()
    results = p.imap_unordered(random_mean_count, range(1,10))
    print "[",
    for i in results:
        print i,
        if abs(i) <= 1.0:
            print "...] QUIT"
            break
    list(results)  # exhaust the remaining (unordered) results
    p.close()
    p.join()


[ 4.006 8.992 3.016 2.006 5.025 5.957 7.009 0.994 ...] QUIT

In [32]:
import multiprocessing as mp

def random_mean_count(x):
    import numpy as np
    return x + round(np.mean(np.random.randint(-x,x+1,10000)), 3)


if __name__ == '__main__':
    # create a pool with cpu_count() processes
    p = mp.Pool()
    results = p.map_async(random_mean_count, range(1,10))
    print "Waiting .",
    i = 0
    while not results.ready():
        if not i%4000:
            print ".",
        i += 1
    print results.get()
    print "\n", p.apply_async(random_mean_count, [100]).get()
    p.close()
    p.join()


Waiting . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . [0.997, 1.979, 2.994, 4.016, 5.031, 5.944, 7.114, 8.066, 9.0]

99.965

Issues: random number generators


In [3]:
import numpy as np

def walk(x, n=100, box=.5, delta=.2):
    "perform a random walk"
    w = np.cumsum(x + np.random.uniform(-delta,delta,n))
    w = np.where(abs(w) > box)[0]
    return w[0] if len(w) else n

N = 10

# run N trials, all starting from x=0
pwalk = np.vectorize(walk)
print pwalk(np.zeros(N))

# run again, using list comprehension instead of ufunc
print [walk(0) for i in range(N)]

# run again, using multiprocessing's map
import multiprocessing as mp
p = mp.Pool()
print p.map(walk, [0]*N)


[31 22 13 24 23 31  2 60 13 35]
[24, 6, 8, 6, 11, 41, 16, 64, 40, 17]
[4, 4, 4, 4, 4, 4, 4, 4, 50, 50]

In [1]:
%%file state.py
"""some good state utilities
"""

def check_pickle(x, dill=False):
    "checks the pickle across a subprocess"
    import pickle
    import subprocess
    if dill:
        import dill as pickle
        pik = "dill"
    else:
        pik = "pickle"
    fail = True
    try:
        _x = pickle.dumps(x)
        fail = False
    finally:
        if fail:
            print "DUMP FAILED"
    msg = "python -c import {0}; print {0}.loads({1})".format(pik,repr(_x))
    print "SUCCESS" if not subprocess.call(msg.split(None,2)) else "LOAD FAILED"

    
def random_seed(s=None):
    "sets the seed for calls to 'random()'"
    import random
    random.seed(s)
    try:
        from numpy import random
        random.seed(s)
    except:
        pass
    return


def random_state(module='random', new=False, seed='!'):
    """return a (optionally manually seeded) random generator

For a given module, return an object that has random number generation (RNG)
methods available.  If new=False, use the global copy of the RNG object.
If seed='!', do not reseed the RNG (using seed=None 'removes' any seeding).
If seed='*', use a seed that depends on the process id (PID); this is useful
for building RNGs that are different across multiple threads or processes.
    """
    import random
    if module == 'random':
        rng = random
    elif not isinstance(module, type(random)):
        # convenience for passing in 'numpy'
        if module == 'numpy': module = 'numpy.random'
        try:
            import importlib
            rng = importlib.import_module(module)
        except ImportError:
            rng = __import__(module, fromlist=module.split('.')[-1:])
    elif module.__name__ == 'numpy': # convenience for passing in numpy
        from numpy import random as rng
    else: rng = module

    _rng = getattr(rng, 'RandomState', None) or \
           getattr(rng, 'Random') # throw error if no rng found
    if new:
        rng = _rng()

    if seed == '!': # special case: don't reset the seed
        return rng
    if seed == '*': # special case: random seeding for multiprocessing
        try:
            try:
                import multiprocessing as mp
            except ImportError:
                import processing as mp
            try:
                seed = mp.current_process().pid
            except AttributeError:
                seed = mp.currentProcess().getPid()
        except:
            seed = 0
        import time
        seed += int(time.time()*1e6)

    # set the random seed (or 'reset' with None)
    rng.seed(seed)
    return rng


# EOF


Overwriting state.py
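The last run of walk above shows the pitfall: the pool workers are forked with a copy of the parent's numpy random state, so several workers produce identical walks. One fix, sketched below (not from the original notebook, and assuming state.py above is importable), is to reseed each worker as it starts by passing random_seed as the pool initializer.

In [ ]:
'''sketch: reseed each pool worker so the random streams differ
'''
import numpy as np
import multiprocessing as mp
from state import random_seed

def walk(x, n=100, box=.5, delta=.2):
    "perform a random walk"
    w = np.cumsum(x + np.random.uniform(-delta,delta,n))
    w = np.where(abs(w) > box)[0]
    return w[0] if len(w) else n

if __name__ == '__main__':
    # random_seed() runs once in each worker, reseeding both random and
    # numpy.random from OS entropy, so workers no longer share inherited state
    p = mp.Pool(initializer=random_seed)
    print p.map(walk, [0]*10)
    p.close(); p.join()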

Issues: serialization

  • Better serialization: multiprocess (a pickle-versus-dill sketch follows the example below)

In [33]:
import multiprocess

print multiprocess.Pool().map(lambda x:x**2, range(10))


[0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
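Standard multiprocessing serializes tasks with pickle, which refuses lambdas, closures, and most interactively defined functions; the map above therefore fails with multiprocessing but succeeds with multiprocess, which serializes with dill. A minimal sketch of the difference (not from the original notebook):

In [ ]:
'''sketch: pickle versus dill on a lambda
'''
import pickle
import dill

squared = lambda x: x**2

try:
    pickle.dumps(squared)
except Exception as e:
    print "pickle failed:", e     # pickle cannot serialize the lambda

s = dill.dumps(squared)           # dill serializes the function object itself
print dill.loads(s)(5)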

EXERCISE: << Either the mystic multi-solve or one of the pathos tests or with rng >>

  • Code-based versus object-based serialization: pp(ft) (see the depfuncs sketch after the example below)

In [40]:
%%file runppft.py
'''demonstrate ppft
'''

import ppft

def squared(x):
    return x*x

server = ppft.Server()  # can also use remote servers, e.g. ppservers=('localhost:8000',)
result = server.submit(squared, (5,))
result.wait()
print result.finished
print result()


Overwriting runppft.py

In [41]:
!python2.7 runppft.py


True
25
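ppft ships the source code of the submitted function to the worker, so any helper functions and modules it relies on must be declared explicitly; dill-based tools ship live objects instead. A sketch of the code-based style (not from the original notebook, assuming the pp-style submit signature with depfuncs and modules):

In [ ]:
'''sketch: code-based serialization in ppft -- dependencies are declared
'''
import ppft

def half(x):
    return x/2.0

def halved_hypot(x, y):
    # both the helper function and the math module must be shipped explicitly,
    # since ppft sends source code rather than pickled function objects
    return half(math.hypot(x, y))

server = ppft.Server()
result = server.submit(halved_hypot, (3, 4), depfuncs=(half,), modules=('math',))
print result()  # expect 2.5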

Programming efficiency: pathos

  • Multi-argument map functions
  • Unified API for threading, multiprocessing, and serial and parallel python (pp)

In [2]:
%%file allpool.py
'''demonstrate pool API
'''
import pathos

def sum_squared(x,y):
    return (x+y)**2

x = range(5)                  
y = range(0,10,2)

if __name__ == '__main__':
    sp = pathos.pools.SerialPool()                                          
    pp = pathos.pools.ParallelPool()
    mp = pathos.pools.ProcessPool()
    tp = pathos.pools.ThreadPool()

    for pool in [sp,pp,mp,tp]:
        print pool.map(sum_squared, x, y)
        pool.close()
        pool.join()


Writing allpool.py

In [3]:
!python2.7 allpool.py


[0, 9, 36, 81, 144]
[0, 9, 36, 81, 144]
[0, 9, 36, 81, 144]
[0, 9, 36, 81, 144]
  • Strives for natural programming constructs in parallel code

In [2]:
from itertools import izip


PRIMES = [
    112272535095293,
    112582705942171,
    112272535095293,
    115280095190773,
    115797848077099,
    1099726899285419]

def is_prime(n):
    if n < 2:
        return False
    if n == 2:
        return True
    if n % 2 == 0:
        return False

    import math
    sqrt_n = int(math.floor(math.sqrt(n)))
    for i in range(3, sqrt_n + 1, 2):
        if n % i == 0:
            return False
    return True

def sleep_add1(x):
    from time import sleep
    if x < 4: sleep(x/10.0)
    return x+1

def sleep_add2(x):
    from time import sleep
    if x < 4: sleep(x/10.0)
    return x+2

def test_with_multipool(Pool):
    inputs = range(10)
    with Pool() as pool1:
        res1 = pool1.amap(sleep_add1, inputs)
    with Pool() as pool2:
        res2 = pool2.amap(sleep_add2, inputs)

    with Pool() as pool3:
        for number, prime in izip(PRIMES, pool3.imap(is_prime, PRIMES)):
            assert prime if number != PRIMES[-1] else not prime

    assert res1.get() == [i+1 for i in inputs]
    assert res2.get() == [i+2 for i in inputs]
    print "OK"


if __name__ == '__main__':
    from pathos.pools import ProcessPool
    test_with_multipool(ProcessPool)


OK
  • Programming models and hierarchical computing

In [4]:
import pathos
from math import sin, cos

if __name__ == '__main__':
    mp = pathos.pools.ProcessPool()
    tp = pathos.pools.ThreadPool()
    
    print mp.amap(tp.map, [sin, cos], [range(3),range(3)]).get()
    mp.close(); tp.close()
    mp.join(); tp.join()


[[0.0, 0.8414709848078965, 0.9092974268256817], [1.0, 0.5403023058681398, -0.4161468365471424]]
  • Pool caching (see the sketch below)
  • Not covered: IPython.parallel and scoop
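A sketch of pool caching (not from the original notebook, and assuming the caching behavior and clear() method of recent pathos releases): pools built with the same configuration reuse the cached workers until the cache is cleared.

In [ ]:
'''sketch: pool caching in pathos
'''
import pathos

if __name__ == '__main__':
    pool = pathos.pools.ProcessPool(4)
    print pool.map(abs, range(-3, 3))
    pool.close(); pool.join()
    # the underlying workers are kept in a cache, so building another
    # ProcessPool(4) would reuse them instead of spawning new processes;
    # clear() discards the cached pool
    pool.clear()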

EXERCISE: Let's take another swing at Monte Carlo betting. You'll want to focus on roll.py, trials.py and optimize.py. Can you speed things up with careful placement of a Pool? Are there small modifications to the code that would allow hierarchical parallelism? Can we speed up the calculation, or does parallel computing lose to spin-up overhead? Where are we now hitting the wall?

See: 'solution'

Remote execution

  • Easy: the pp.Server

In [ ]:
localhost>$ ppserver.py -p 8000
  • Even easier: Pool().server in pathos

In [ ]:
>>> def squared(x):
...     return x**2
... 
>>> import pathos
>>> pool = pathos.pools.ParallelPool(nodes=1, servers=('localhost:8000',))
>>> results = pool.map(squared, range(100))
>>> print pathos.pp.stats()
Job execution statistics:
 job count | % of all jobs | job time sum | time per job | job server
        65 |         65.00 |       0.2004 |     0.003083 | localhost:8000
        35 |         35.00 |       0.0538 |     0.001538 | local
Time elapsed since server creation 21.2711749077
0 active tasks, 1 cores

>>> pool.close()
>>> pool.join()
  • Not covered: rpyc, pyro, and zmq

Related: secure authentication with ssh

  • pathos.secure: connection and tunnel

In [16]:
import pathos
import sys

rhost = 'localhost'
rport = 23

if __name__ == '__main__':
    tunnel = pathos.secure.Tunnel()
    lport = tunnel.connect(rhost, rport)
    print 'SSH Tunnel to:', rhost
    print 'Remote port:', rport
    print 'Local port:', lport
    print 'Press <Enter> to disconnect'
    sys.stdin.readline()
    tunnel.disconnect()


handling pipe response
SSH Tunnel to: localhost
Remote port: 23
Local port: 42376
Press <Enter> to disconnect
Kill ssh pid=63720

In [ ]:
import pathos

launcher = pathos.secure.Pipe()
config = launcher(command='hostname', rhost='localhost', background=False)
launcher.launch()
print launcher.response()
  • Not covered: paramiko

What about large-scale cluster computing?