In [1]:
import multiprocessing
from contextlib import closing
def parallel(worker, data):
with closing(multiprocessing.Pool(multiprocessing.cpu_count())) as pool:
for result in pool.map(worker, data):
yield result
In [2]:
import time
import random
def simple_worker(data):
wait_seconds = random.randint(1, 10)
print("Sleep %d seconds" % wait_seconds)
time.sleep(wait_seconds)
return data
list(parallel(simple_worker, list(range(10))))
Out[2]:
In [13]:
class Consumer(multiprocessing.Process):
def __init__(self, result_queue):
multiprocessing.Process.__init__(self)
self.result_queue = result_queue
self.keep_running = True
def run(self):
while self.keep_running:
result = self.result_queue.get()
print("Consumer receives %s" % result)
print("consumer stops now")
class Producer(multiprocessing.Process):
def __init__(self, result_queue, value):
multiprocessing.Process.__init__(self)
self.result_queue = result_queue
self.value = value
self.start()
def run(self):
wait_seconds = random.randint(1, 10)
print("Sleep %d seconds" % wait_seconds)
time.sleep(wait_seconds)
self.result_queue.put(self.value)
# Establish communication queues
result_queue = multiprocessing.Queue()
consumer = Consumer(result_queue)
consumer.start()
tasks = [Producer(result_queue, idx) for idx in range(10)]
for task in tasks:
task.join()
print("Waiting for consumer")
consumer.keep_running = False
result_queue.close()
consumer.terminate()
print("done")
In [ ]: