In [24]:
def count_down(n):
    """CPU-bound busy work: decrement a counter until it is no longer positive.

    Returns nothing; exists purely to burn CPU cycles for the GIL benchmarks below.
    """
    remaining = n
    while remaining > 0:
        remaining -= 1

# Total amount of work shared by all the benchmarks in this notebook.
COUNT = 500000000
Suppose we have a CPU-bound function.
In [25]:
import time

# Baseline: run the full workload on the main thread and record the wall time.
t0 = time.time()
count_down(COUNT)
nothing = time.time() - t0
print(nothing)
In [26]:
from threading import Thread

# Setup the worker threads: each one counts half of the total.
# NOTE: because of the GIL, two CPU-bound threads are no faster than one.
t1 = Thread(target=count_down, args=(COUNT // 2,))
t2 = Thread(target=count_down, args=(COUNT // 2,))
start = time.time()
# BUG FIX: the original had `t1.start; t2.start()` and `t1.join; t2.join()` --
# bare attribute accesses that never called start()/join() on t1, so only t2
# actually ran and the measurement was meaningless.
t1.start()
t2.start()
t1.join()
t2.join()
end = time.time()
two_thread = end - start
print("Nothing:\t%.4f" % nothing)
print("2 Threads:\t%.4f" % two_thread)
In [27]:
NUM_THREADS = 10
# Each thread gets an equal slice of the total count.
threads = [Thread(target=count_down, args=(COUNT // NUM_THREADS,))
           for _ in range(NUM_THREADS)]
start = time.time()
# Launch every worker, then wait for all of them to exit.
for worker in threads:
    worker.start()
for worker in threads:
    worker.join()
end = time.time()
multi_thread = end - start
print("Nothing:\t%.4f" % nothing)
print("2 Threads:\t%.4f" % two_thread)
print("%d Threads:\t%.4f" % (NUM_THREADS, multi_thread))
In [46]:
import os

a = 0
# fork() returns 0 in the child and the child's pid in the parent;
# each process continues with its own copy of `a`.
newpid = os.fork()
if newpid == 0:
    # Child process: mutating `a` here does not affect the parent's copy.
    print('A new child (%d) is born!' % os.getpid())
    a += 1
    print("The value of a in the child is %d" % a)
    os._exit(0)
else:
    # Parent process: sleep so the child's output appears first.
    pids = (os.getpid(), newpid)
    time.sleep(5)
    print("parent: %d, child: %d" % pids)
    print("The value of a in the parent is %d" % a)
In [47]:
import os
import time

# Split the workload across two real processes via fork().
start = time.time()
newpid = os.fork()
if newpid == 0:
    # Child: do half the work, then exit without running the rest of the cell.
    count_down(COUNT // 2)  # `//` for consistency with the other cells (and Py3 correctness)
    os._exit(0)
else:
    # Parent: do the other half.
    count_down(COUNT // 2)
    # BUG FIX: the original stopped the clock without waiting for the child,
    # so the measurement covered only the parent's half (and left a zombie).
    os.waitpid(newpid, 0)
end = time.time()
forking = end - start
print("Nothing:\t%.4f" % nothing)
print("Forking:\t%.4f" % forking)
In [30]:
import multiprocessing as mp
import random
import string

# Two real processes: each has its own interpreter and GIL,
# so the halves genuinely run in parallel.
start = time.time()
p1 = mp.Process(target=count_down, args=(COUNT // 2,))
p2 = mp.Process(target=count_down, args=(COUNT // 2,))
for proc in (p1, p2):
    proc.start()
for proc in (p1, p2):
    proc.join()
end = time.time()
two_proc = end - start
print("Nothing:\t%.4f" % nothing)
print("Forking:\t%.4f" % forking)
print("2 Processes:\t%.4f" % two_proc)
In [31]:
NUM_PROCS = 10
# One equal slice of the workload per process.
processes = [mp.Process(target=count_down, args=(COUNT // NUM_PROCS,))
             for _ in range(NUM_PROCS)]
start = time.time()
# Kick off every process, then block until they have all exited.
for proc in processes:
    proc.start()
for proc in processes:
    proc.join()
end = time.time()
multi_proc = end - start
print("Nothing:\t%.4f" % nothing)
print("Forking:\t%.4f" % forking)
print("2 Processes:\t%.4f" % two_proc)
print("%d Processes:\t%.4f" % (NUM_PROCS, multi_proc))
In [32]:
# Recap of every timing measured above, in the order they were taken.
timings = [
    ("Nothing", nothing),
    ("2 Threads", two_thread),
    ("%d Threads" % NUM_THREADS, multi_thread),
    ("Forking", forking),
    ("2 Processes", two_proc),
    ("%d Processes" % NUM_PROCS, multi_proc),
]
for label, elapsed in timings:
    print("%s:\t%.4f" % (label, elapsed))
In [33]:
import subprocess

# check_output runs the command, waits for it to exit, and returns its captured stdout.
ls_output = subprocess.check_output(['ls'])
print(ls_output)
The subprocess module is for spawning processes and doing things with their input/output - not for running functions.
In [34]:
from subprocess import Popen, PIPE
import sys

command = "ls -l".split(" ")
proc = Popen(command, cwd='/', stdout=PIPE, stderr=PIPE)
# Stream stdout one byte at a time. We are done only when the pipe is
# drained (read returns empty) AND the process has exited (poll() returns
# the exit code instead of None).
while True:
    out = proc.stdout.read(1)
    # FIX: `is not None` instead of `!= None` (PEP 8), and truthiness
    # instead of comparing against '' so the drain check reads cleanly.
    if not out and proc.poll() is not None:
        break
    if out:
        sys.stdout.write(out)
        sys.stdout.flush()
In [35]:
from Queue import Queue
from threading import Thread, current_thread, Event
import scraping

# Work queue shared by all downloader threads; Queue is internally locked.
q = Queue()

# Pop something out of the queue
def worker(stop_event):
    # NOTE(review): q.get() blocks with no timeout, so a worker parked on an
    # empty queue never re-checks stop_event; the threads are daemonized
    # below, which is what actually lets the process exit.
    while not stop_event.is_set():
        url = q.get() # Internal mutex in queue
        print "Thread %s - Downloading '%s...%s'" % (current_thread().name, url[:7], url[len(url)-50:len(url)])
        # Tell the queue you are done with the item
        # When queue is zero-ed, queue.join no longer blocks
        q.task_done()

# Spawn 10 threads
NUM_THREADS = 10
stop_event = Event()
threads = [Thread(target=worker, name=x, args=(stop_event,)) for x in range(NUM_THREADS)]
# Run threads
for t in threads:
    # Daemon threads die with the interpreter, so blocked workers don't hang exit.
    t.daemon = True
    t.start()
# Put the scraped URLS into the queue
# NOTE(review): scraping.main() is a project-local module; presumably it
# yields URL strings -- verify against that module.
for url in scraping.main():
    print "Putting into queue"
    q.put(url)
# Wait for the queue to be empty
q.join()
print "The queue is now empty"
stop_event.set()
Use `.kill()` or `.terminate()` for processes. Setting `daemon = True` makes it easy for us, so we don't have to explicitly exit threads (unless you're in IPython...). For a cooperative shutdown mechanism, use `threading.Event()`: `while not stop_event.is_set(): do_stuff()`.
In [36]:
import atexit

# Subprocesses to clean up; append Popen/Process objects here as they are created.
procs = []

# No matter what happens, kill all the remaining processes on interpreter exit
@atexit.register
def kill_subprocesses():
    """Best-effort SIGKILL for every tracked subprocess.

    Swallows OSError so a process that already exited (and was reaped)
    does not abort cleanup of the remaining ones.
    """
    for proc in procs:
        try:
            proc.kill()
        except OSError:
            # Process already gone; nothing to clean up.
            pass
`Lock()` vs. `RLock()`: one difference is that an acquired `Lock` can be released by any thread, while an acquired `RLock` can only be released by the thread which acquired it.
In [37]:
from threading import Thread, RLock

# Reentrant lock guarding writes to the shared file handle.
mutex = RLock()

def processData(fp):
    """Write a chunk to `fp` while holding the module-level RLock.

    FIX: use `with mutex` instead of bare acquire()/release() so the lock
    is released even if the write raises -- the original would leak the
    lock on any exception between the two calls.
    """
    with mutex:
        fp.write("Some data")

with open('a_new_file', 'w') as fp:
    t = Thread(target=processData, args=(fp,))
    t.start()
    t.join()
In [40]:
from PIL import Image
from IPython.display import Image as I
from IPython.display import display
import threading

w = 512  # image width
h = 512  # image height
image = Image.new("RGB", (w, h))
wh = w * h
maxIt = 12  # max number of iterations allowed
# drawing region (xa < xb & ya < yb)
xa = -2.0
xb = 1.0
ya = -1.5
yb = 1.5
xd = xb - xa
yd = yb - ya
numThr = 5  # number of threads to run
# lock = threading.Lock()

class ManFrThread(threading.Thread):
    """Thread `k` renders every numThr-th pixel (an interleaved stripe) of the fractal."""
    def __init__(self, k):
        self.k = k
        threading.Thread.__init__(self)

    def run(self):
        # each thread only calculates its own share of pixels
        # BUG FIX: iterate from self.k, not the module-level loop variable `k`.
        # The original read the global `k` left over from the spawn loops, so
        # every thread rendered the SAME stripe and the rest stayed black.
        for i in range(self.k, wh, numThr):
            kx = i % w
            ky = int(i / w)
            # Map pixel (kx, ky) to a point (a, b) in the complex plane.
            a = xa + xd * kx / (w - 1.0)
            b = ya + yd * ky / (h - 1.0)
            x = a
            y = b
            for kc in range(maxIt):
                # z = z^2 + c, tracked as real/imaginary parts.
                x0 = x * x - y * y + a
                y = 2.0 * x * y + b
                x = x0
                if x * x + y * y > 4:
                    # various color palettes can be created here
                    red = (kc % 8) * 32
                    green = (16 - kc % 16) * 16
                    blue = (kc % 16) * 16
                    # lock.acquire()
                    # Each thread writes disjoint pixels, so the unlocked
                    # putpixel calls never touch the same coordinate.
                    image.putpixel((kx, ky), (red, green, blue))
                    # lock.release()
                    break

tArr = []
for k in range(numThr):  # create all threads
    tArr.append(ManFrThread(k))
for k in range(numThr):  # start all threads
    tArr[k].start()
for k in range(numThr):  # wait until all threads finished
    tArr[k].join()
image.save("MandelbrotFractal.png", "PNG")
i = I(filename='MandelbrotFractal.png')
display(i)
In [41]:
from subprocess import Popen, PIPE
from threading import Thread
from Queue import Queue, Empty

# Single queue multiplexing both stdout and stderr of the child process.
io_q = Queue()

# Sticks anything from the pipe into a queue
def stream_watcher(identifier, stream):
    # Tag each line with its origin ('STDOUT'/'STDERR') so the printer can label it.
    for line in stream:
        io_q.put((identifier, line))
    # EOF reached: close our end of the pipe if the iterator didn't already.
    if not stream.closed:
        stream.close()

# Takes things out of the queue and prints them to the screen as they come
def printer():
    # Runs on a daemon thread; reads the module-level `proc` defined below,
    # so it must not outlive-check before Popen() has run.
    while True:
        try:
            # Block for 1 second.
            item = io_q.get(True, 1)
        except Empty:
            # No output in either streams for a second. Are we done?
            if proc.poll() is not None:
                break
        else:
            identifier, line = item
            print identifier + ':', line

command = "ls -l".split(" ")
proc = Popen(command, cwd='/usr/local/bin', stdout=PIPE, stderr=PIPE)
# One watcher per pipe; daemonized so a blocked read can't hang interpreter exit.
stdout_t = Thread(target=stream_watcher,
                  args=('STDOUT', proc.stdout))
stderr_t = Thread(target=stream_watcher,
                  args=('STDERR', proc.stderr))
stdout_t.daemon = True; stderr_t.daemon = True
stdout_t.start(); stderr_t.start()
print_t = Thread(target=printer)
print_t.daemon = True
print_t.start()