Using a queue. See https://www.geeksforgeeks.org/coroutine-in-python/
In [55]:
import threading
import queue
# One FIFO per hop of the pipeline: producer -> processor (process_queue)
# and processor -> consumer (consumer_queue).
process_queue, consumer_queue = queue.Queue(), queue.Queue()
def producer():
    """Put the integers 0 through 9 on ``process_queue``, reporting each one."""
    for item in range(10):
        # queue.Queue does its own locking, so no extra lock is needed here.
        process_queue.put(item)
        print("produced", item)
    print("producer: done")
def processor():
    """Move items from ``process_queue`` to ``consumer_queue``, adding one.

    Blocks on ``process_queue.get()`` for each item and stops right after
    the item equal to 9 has been forwarded.
    """
    item = None
    # The item 9 doubles as the end-of-stream marker: it is still forwarded
    # (as 10) before the loop condition is re-evaluated.
    while item != 9:
        item = process_queue.get()
        consumer_queue.put(item + 1)
    print("process: done")
def consumer():
    """Print every item taken from ``consumer_queue`` until 10 is seen.

    Blocks on ``consumer_queue.get()`` for each item; the item equal to 10
    is printed like any other and then ends the loop.
    """
    item = None
    while item != 10:
        item = consumer_queue.get()
        print("consumed", item)
    print("consumer: done")
# Run each pipeline stage in its own thread.
p = threading.Thread(target=producer)
i = threading.Thread(target=processor)
c = threading.Thread(target=consumer)

workers = (p, i, c)
for worker in workers:
    worker.start()

# Wait for the pipeline to drain before moving on. The stage functions look
# up their queues as module globals on every call, so letting later code
# rebind those names while the threads are still running would silently
# redirect them; joining also keeps their output from interleaving with
# whatever runs next.
for worker in workers:
    worker.join()
In [54]:
import multiprocessing
# Inter-process channels, one per hop of the pipeline:
# producer -> processor (process_queue), processor -> consumer (consumer_queue).
process_queue, consumer_queue = multiprocessing.Queue(), multiprocessing.Queue()
def producer():
    """Put the integers 0 through 9 on ``process_queue``, reporting each one."""
    for item in range(10):
        # multiprocessing.Queue is safe to share between processes
        # (and threads), so no explicit locking is needed.
        process_queue.put(item)
        print("produced", item)
    print("producer: done")
def processor():
    """Forward each item from ``process_queue`` to ``consumer_queue`` plus one.

    Each ``get()`` blocks until an item is available. The loop ends once the
    item equal to 9 has been forwarded.
    """
    item = None
    # 9 acts as the end-of-stream marker; it is processed before the loop
    # condition is checked again.
    while item != 9:
        item = process_queue.get()
        consumer_queue.put(item + 1)
    print("process: done")
def consumer():
    """Print items taken from ``consumer_queue``, stopping after 10.

    Each ``get()`` blocks until an item is available. The item equal to 10
    is printed and then terminates the loop.
    """
    item = None
    while item != 10:
        item = consumer_queue.get()
        print("consumed", item)
    print("consumer: done")
# Run each pipeline stage in its own OS process.
# NOTE(review): the stage functions reach the queues through module globals,
# which child processes only share with the parent under the "fork" start
# method. Under "spawn"/"forkserver" the queues would have to be handed over
# via Process(args=...) and this launch code guarded by
# `if __name__ == "__main__":` -- confirm the target platforms.
p = multiprocessing.Process(target=producer)
i = multiprocessing.Process(target=processor)
c = multiprocessing.Process(target=consumer)

workers = (p, i, c)
for worker in workers:
    worker.start()

# Reap the children once the pipeline has drained. All three stages are
# already running at this point, so every queued item has a reader and the
# joins cannot block on unflushed queue data.
for worker in workers:
    worker.join()
In [53]:
def producer(next_task):
    """Send the integers 0 through 9 to ``next_task`` and then close it.

    ``next_task`` must provide ``send(value)`` and ``close()``; a generator
    passed here has to be advanced to its first ``yield`` beforehand.
    """
    for value in range(10):
        print("producing", value)
        next_task.send(value)
    # Signal end-of-stream downstream before reporting completion.
    next_task.close()
    print("producer: done")
def processor(next_task):
    """Generator-based coroutine: forward every received value plus one.

    Must be advanced to its first ``yield`` before values are sent in.
    Each value received through ``send()`` is reported and passed on to
    ``next_task.send(value + 1)``.

    When this coroutine is closed, the shutdown is propagated by closing
    ``next_task`` as well, so the downstream stage is finalized
    deterministically instead of lingering until it is garbage collected.
    """
    try:
        while True:
            item = yield
            print("processing", item)
            next_task.send(item + 1)
    except GeneratorExit:
        # Close downstream first, then report: same order producer() uses.
        next_task.close()
        print("process: done")
def consumer():
    """Generator-based coroutine: print every value sent in.

    Must be advanced to its first ``yield`` before values are sent in.
    Prints a completion message when it is closed.
    """
    try:
        while True:
            # ``yield`` evaluates to whatever the caller passes to send().
            print("consuming", (yield))
    except GeneratorExit:
        print("consumer: done")
# Assemble the pipeline back to front. Each generator has to be primed
# (run up to its first ``yield``) before it can accept send().
c = consumer()
next(c)
i = processor(c)
next(i)
# The producer is a plain function: calling it drives the whole pipeline.
producer(i)
In [43]:
import asyncio
async def producer():
    """Report each of the integers 0 through 9 and await ``processor`` on it.

    Every item is awaited to completion before the next one is produced.
    """
    for item in range(10):
        print("producing", item)
        await processor(item)
async def processor(i):
    """Report ``i`` and await ``consumer`` on ``i + 1``."""
    print("processing", i)
    processed = i + 1
    await consumer(processed)
async def consumer(i):
    """Final pipeline stage: print the value it is given."""
    print("consumed", i)
# Top-level ``await`` only works where an event loop is already running
# (IPython/Jupyter cells, ``python -m asyncio``). In a plain script this
# would have to be ``asyncio.run(producer())`` instead.
await producer()
In [49]:
import time
import asyncio
async def producer():
    """Await ``processor`` once for each integer in ``range(10**6)``.

    Silent variant of the pipeline: no printing, only the await chain.
    """
    for item in range(10**6):
        await processor(item)
async def processor(i):
    """Await ``consumer`` on ``i + 1``."""
    processed = i + 1
    await consumer(processed)
async def consumer(i):
    """Final pipeline stage: accept ``i`` and do nothing with it."""
now = time.perf_counter()
await producer()
elapsed = time.perf_counter() - now
print(f"\nelapsed {elapsed:0.2f} seconds")
Remember, coroutines are not suitable for CPU-bound problems :-/
In [50]:
import time
def all_together():
    """Add one to each integer in ``range(10**6)`` in a single plain loop.

    The incremented values are discarded; the function returns ``None``.
    """
    for value in range(10**6):
        value = value + 1
# Wall-clock timing of the single-function version.
start = time.perf_counter()
all_together()
elapsed = time.perf_counter() - start
print(f"\nelapsed {elapsed:0.2f} seconds")