Using threads


In [55]:
import threading
import queue 

process_queue = queue.Queue()
consumer_queue = queue.Queue()

def producer():
    for i in range(10):
        process_queue.put(i) # Queue is thread safe
        print("produced", i)
    print("producer: done")

def processor():
    while True:
        i = process_queue.get()
        consumer_queue.put(i+1)
        if i == 9:
            break
    print("process: done") 

def consumer():
    while True: 
        i = consumer_queue.get()
        print("consumed", i)
        if i == 10:
            break
    print("consumer: done") 

p = threading.Thread(target=producer)
i = threading.Thread(target=processor)
c = threading.Thread(target=consumer)

p.start()
i.start()
c.start()


produced 0
produced 1
produced 2
produced 3
produced 4
producedconsumed 1 
5
produced 6
produced 7
produced 8
produced 9
producer: done
consumed 2
consumed 3
consumed 4
consumed 5
consumed process: done6
consumed 7

consumed 8
consumed 9
consumed 10
consumer: done

Using processes


In [54]:
import multiprocessing

process_queue = multiprocessing.Queue()
consumer_queue = multiprocessing.Queue()

def producer():
    for i in range(10):
        process_queue.put(i) # Queue is thread safe
        print("produced", i)
    print("producer: done")

def processor():
    while True:
        i = process_queue.get()
        consumer_queue.put(i+1)
        if i == 9:
            break
    print("process: done") 

def consumer():
    while True: 
        i = consumer_queue.get()
        print("consumed", i)
        if i == 10:
            break
    print("consumer: done") 

p = multiprocessing.Process(target=producer)
i = multiprocessing.Process(target=processor)
c = multiprocessing.Process(target=consumer)

p.start()
i.start()
c.start()


consumer: done
produced 0
produced 1
produced 2
produced 5
produced 3
produced 4
produced 6
consumed 1
produced 7
consumed 2
produced 8
consumed 3
produced 9
consumed 4
producer: done
consumed 5
consumed 6
process: done
consumed 7
consumed 8
consumed 9
consumed 10
consumer: done

Using coroutines


In [53]:
def producer(next_task):
    for i in range(10):
        print("producing", i)
        next_task.send(i) 
    next_task.close()
    print("producer: done")

def processor(next_task): 
    try: 
        while True: 
            i = yield
            print("processing", i)
            next_task.send(i+1)
    except GeneratorExit: 
        print("process: done") 

def consumer():
    try: 
        while True: 
            i = yield
            print("consuming", i) 
    except GeneratorExit: 
        print("consumer: done") 

c = consumer()
c.__next__() # Advance until the first yield
i = processor(c)
i.__next__() # Advance until the first yield
producer(i)


consumer: done
producing 0
processing 0
consuming 1
producing 1
processing 1
consuming 2
producing 2
processing 2
consuming 3
producing 3
processing 3
consuming 4
producing 4
processing 4
consuming 5
producing 5
processing 5
consuming 6
producing 6
processing 6
consuming 7
producing 7
processing 7
consuming 8
producing 8
processing 8
consuming 9
producing 9
processing 9
consuming 10
process: done
producer: done

In [43]:
import asyncio

async def producer():
    for i in range(10):
        print("producing", i)
        await processor(i)

async def processor(i):
    print("processing", i)
    await consumer(i+1)

async def consumer(i):
    print("consumed", i)

await producer()


producing 0
processing 0
consumed 1
producing 1
processing 1
consumed 2
producing 2
processing 2
consumed 3
producing 3
processing 3
consumed 4
producing 4
processing 4
consumed 5
producing 5
processing 5
consumed 6
producing 6
processing 6
consumed 7
producing 7
processing 7
consumed 8
producing 8
processing 8
consumed 9
producing 9
processing 9
consumed 10

In [49]:
import time
import asyncio

async def producer():
    for i in range(10**6):
        await processor(i)

async def processor(i):
    await consumer(i+1)

async def consumer(i):
    pass

now = time.perf_counter()
await producer()
elapsed = time.perf_counter() - now
print(f"\nelapsed {elapsed:0.2f} seconds")


elapsed 0.50 seconds

Remember, coroutines are now suitable for CPU-bound problems :-/


In [50]:
import time

def all_together():
    for i in range(10**6):
        i += 1
now = time.perf_counter()
all_together()
elapsed = time.perf_counter() - now
print(f"\nelapsed {elapsed:0.2f} seconds")


elapsed 0.14 seconds