Purpose

  • Provide a thread-safe FIFO implementation
  • Support multi-producer, multi-consumer queues

Basic FIFO Queue

The Queue class implements a basic first-in, first-out container. Elements are added to one "end" of the sequence using put(), and removed from the other using get().


In [ ]:
import queue

In [ ]:
q = queue.Queue()

for i in range(5):
    q.put(i)
    
while not q.empty():
    print(q.get(), end=' ')

This example uses a single thread to illustrate that elements are removed from the queue in the same order in which they are inserted.

LIFO Queue

In contrast to the standard FIFO implementation of Queue, the LifoQueue uses last-in, first-out ordering (normally associated with a stack data structure).


In [ ]:
import queue

q = queue.LifoQueue()

In [ ]:
for i in range(5):
    q.put(i)
    
while not q.empty():
    print(q.get(), end=' ')

Priority Queue

Sometimes the processing order of the items in a queue needs to be based on characteristics of those items, rather than just the order they are created or added to the queue. For example, print jobs from the payroll department may take precedence over a code listing that a developer wants to print. PriorityQueue uses the sort order of the contents of the queue to decide which item to retrieve.


In [ ]:
import functools
import queue
import threading
import time


@functools.total_ordering
class Job:

    def __init__(self, priority, description):
        self.priority = priority
        self.description = description
        print('New job:', description)

    def __eq__(self, other):
        try:
            return self.priority == other.priority
        except AttributeError:
            return NotImplemented

    def __lt__(self, other):
        try:
            return self.priority < other.priority
        except AttributeError:
            return NotImplemented
        
q = queue.PriorityQueue()

q.put(Job(3, 'Mid-level job'))
q.put(Job(10, 'Low-level job'))
q.put(Job(1, 'Important job'))


time.sleep(3)

def process_job(q):
    while True:
        next_job = q.get()
        print('Processing job:', next_job.description)
        q.task_done()


workers = [
    threading.Thread(target=process_job, args=(q,)),
    threading.Thread(target=process_job, args=(q,)),
]
for w in workers:
    w.daemon = True
    w.start()

q.join()

This example has multiple threads consuming the jobs, which are processed based on the priority of items in the queue at the time get() was called. The order of processing for items added to the queue while the consumer threads are running depends on thread context switching.
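
PriorityQueue does not require a custom class like Job; any items that sort in the desired order will work. A common shortcut, not shown in the example above, is to enqueue (priority, payload) tuples, as in this minimal sketch (note that when two priorities are equal, Python falls back to comparing the payloads, which must themselves be comparable):


In [ ]:
import queue

pq = queue.PriorityQueue()

pq.put((3, 'Mid-level job'))
pq.put((10, 'Low-level job'))
pq.put((1, 'Important job'))

# Items come out lowest priority value first: 1, 3, 10.
while not pq.empty():
    priority, description = pq.get()
    print(priority, description)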

Building a Threaded Podcast Client


In [ ]:
# %load fetch_podcasts.py
# First, some operating parameters are established. 
# Usually, these would come from user inputs 
# (e.g., preferences or a database). The example uses hard-coded 
# values for the number of threads and list of URLs to fetch.

from queue import Queue
import threading
import time
import urllib.request
from urllib.parse import urlparse

import feedparser

# Set up some global variables
num_fetch_threads = 2
enclosure_queue = Queue()

# A real app wouldn't use hard-coded data...
feed_urls = [
    'http://talkpython.fm/episodes/rss',
]


def message(s):
    print('{}: {}'.format(threading.current_thread().name, s))

# The function download_enclosures() runs in the worker thread 
# and processes the downloads using urllib.    
 
def download_enclosures(q):
    """This is the worker thread function.
    It processes items in the queue one after
    another.  These daemon threads go into an
    infinite loop, and exit only when
    the main thread ends.
    """
    while True:
        message('looking for the next enclosure')
        url = q.get()
        filename = url.rpartition('/')[-1]
        message('downloading {}'.format(filename))
        response = urllib.request.urlopen(url)
        data = response.read()
        # Save the downloaded file to the current directory
        message('writing to {}'.format(filename))
        with open(filename, 'wb') as outfile:
            outfile.write(data)
        q.task_done()

# Set up some threads to fetch the enclosures
for i in range(num_fetch_threads):
    worker = threading.Thread(
        target=download_enclosures,
        args=(enclosure_queue,),
        name='worker-{}'.format(i),
    )
    worker.daemon = True
    worker.start()
    

# Download the feed(s) and put the enclosure URLs into
# the queue.
for url in feed_urls:
    response = feedparser.parse(url, agent='fetch_podcasts.py')
    for entry in response['entries'][:5]:
        for enclosure in entry.get('enclosures', []):
            parsed_url = urlparse(enclosure['url'])
            message('queuing {}'.format(
                parsed_url.path.rpartition('/')[-1]))
            enclosure_queue.put(enclosure['url'])

# Now wait for the queue to be empty, indicating that we have
# processed all of the downloads.
message('*** main thread waiting')
enclosure_queue.join()
message('*** done')

In [26]:
!python fetch_podcasts.py


worker-0: looking for the next enclosure
worker-1: looking for the next enclosure
worker-1: writing to 10-books-python-developers-should-be-reading.mp3
worker-1: looking for the next enclosure
MainThread: *** main thread waiting
MainThread: *** done

In [27]:
# remove all downloaded files
!rm *.mp3

Join and task_done

Queue.task_done() and Queue.join() work together:

  • When join() is called, the calling thread blocks until every item in the queue has been retrieved and processed.

  • The count of unfinished tasks goes up whenever an item is added to the queue.

  • The count goes down whenever a consumer calls task_done() to indicate that the item was retrieved and all work on it is complete. When the count of unfinished tasks drops to zero, join() unblocks.
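
A minimal sketch of this handshake, using a single daemon consumer (the worker function and names here are illustrative, not part of the queue API):


In [ ]:
import queue
import threading

q = queue.Queue()

def worker(q):
    """Consume items until the main thread exits."""
    while True:
        item = q.get()
        print('processing', item)
        q.task_done()   # decrement the count of unfinished tasks

threading.Thread(target=worker, args=(q,), daemon=True).start()

for i in range(3):
    q.put(i)            # each put() increments the count of unfinished tasks

q.join()                # blocks until the count of unfinished tasks drops to zero
print('all tasks done')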


In [ ]: