Using threads


In [ ]:
import threading
import time
import random
import queue

queue_length = 10
q = queue.Queue(queue_length)

class Producer(threading.Thread):
    def __init__(self, iters=10):
        super(Producer,self).__init__()
        self.iters = iters

    def run(self):
        i = 0
        while i < self.iters:
            if not q.full():
                item = random.randint(1,10)
                q.put(item)
                print('Produced {} (queue length = {})'.format(item,q.qsize()))
                i += 1
                time.sleep(random.random())

class Consumer(threading.Thread):
    def __init__(self, iters=10):
        super(Consumer,self).__init__()
        self.iters = iters

    def run(self):
        i = 0
        while i < self.iters:
            item = q.get()
            print('Consumed {} (queue length = {})'.format(item,q.qsize()))
            i += 1
            time.sleep(random.random())

p = Producer()
p.start()
c = Consumer()
c.start()

p.join()
c.join()

print('done')

Using processes


In [ ]:
import multiprocessing
import time
import random

class Producer():
    def __init__(self, q, iters=10):
        self.iters = iters
        self.q = q

    def run(self):
        i = 0
        while i < self.iters:
            if not self.q.full():
                item = random.randint(1,10)
                self.q.put(item)
                print('Produced {} (queue length = {})'.format(item, self.q.qsize()))
                i += 1
                time.sleep(random.random())
    
    def start(self):
        process = multiprocessing.Process(target=self.run)
        process.start()
        return process

class Consumer():
    def __init__(self, q, iters=10):
        self.iters = iters
        self.q = q

    def run(self):
        i = 0
        while i < self.iters:
            item = self.q.get()
            print('Consumed {} (queue length = {})'.format(item, self.q.qsize()))
            i += 1
            time.sleep(random.random())

    def start(self):
        process = multiprocessing.Process(target=self.run)
        process.start()
        return process

queue_length = 10
q = multiprocessing.Queue(queue_length)

p = Producer(q)
task_p = p.start()
c = Consumer(q)
task_c = c.start()

task_p.join()
task_c.join()

print('done')

Using coroutines


In [ ]:
import time
import asyncio

async def producer():
    for i in range(10**6):
        await processor(i)

async def processor(i):
    await consumer(i+1)

async def consumer(i):
    pass

now = time.perf_counter()
await producer()
elapsed = time.perf_counter() - now
print(f"\nelapsed {elapsed:0.2f} seconds")

In [4]:
import asyncio
import time
import random
import queue

class Producer():
    def __init__(self, q, iters=10):
        self.iters = iters
        self.q = q

    async def run(self):
        i = 0
        while i < self.iters:
            if not self.q.full():
                item = random.randint(1,10)
                self.q.put(item)
                print('Produced {} (queue length = {})'.format(item, self.q.qsize()))
                i += 1
            await asyncio.sleep(random.random())

class Consumer():
    def __init__(self, q, iters=10):
        self.iters = iters
        self.q = q

    async def run(self):
        i = 0
        while i < self.iters:
            if not self.q.empty():
                item = self.q.get()
                print('Consumed {} (queue length = {})'.format(item, self.q.qsize()))
                i += 1
            await asyncio.sleep(random.random())

queue_length = 10
q = queue.Queue(queue_length)

p = Producer(q)
c = Consumer(q)
await asyncio.gather(p.run(), c.run())

print('done')


Produced 6 (queue length = 1)
Consumed 6 (queue length = 0)
Produced 2 (queue length = 1)
Consumed 2 (queue length = 0)
Produced 6 (queue length = 1)
Consumed 6 (queue length = 0)
Produced 4 (queue length = 1)
Consumed 4 (queue length = 0)
Produced 2 (queue length = 1)
Produced 4 (queue length = 2)
Consumed 2 (queue length = 1)
Produced 5 (queue length = 2)
Produced 10 (queue length = 3)
Produced 4 (queue length = 4)
Consumed 4 (queue length = 3)
Produced 6 (queue length = 4)
Consumed 5 (queue length = 3)
Consumed 10 (queue length = 2)
Consumed 4 (queue length = 1)
Consumed 6 (queue length = 0)
done

In [ ]: