The Queue class implements a basic first-in, first-out container. Element are added to one "end" of the sequence using put()
, and removed from the other using get()
In [ ]:
import queue
In [ ]:
q = queue.Queue()
for i in range(5):
q.put(i)
while not q.empty():
print(q.get(), end=' ')
This example uses a single thread to illustrate that elements are removed from the queue in the same order in which they are inserted
In contrast to the standard FIFO implementation of Queue, the LifoQueue uses last-in, first-out ordering (normally associated with a stack data structure)
In [ ]:
import queue
q = queue.LifoQueue()
In [ ]:
for i in range(5):
q.put(i)
while not q.empty():
print(q.get(), end=' ')
Sometimes the processing order of the items in a queue needs to be based on characteristics of those items, rather than just the order they are created or added to the queue. For example, print jobs from the payroll department may take precedence over a code listing that a developer wants to print. PriorityQueue uses the sort order of the contents of the queue to decide which item to retrieve.
In [ ]:
import functools
import queue
import threading
import time
@functools.total_ordering
class Job:
def __init__(self, priority, description):
self.priority = priority
self.description = description
print('New job:', description)
return
def __eq__(self, other):
try:
return self.priority == other.priority
except AttributeError:
return NotImplemented
def __lt__(self, other):
try:
return self.priority < other.priority
except AttributeError:
return NotImplemented
q = queue.PriorityQueue()
q.put(Job(3, 'Mid-level job'))
q.put(Job(10, 'Low-level job'))
q.put(Job(1, 'Important job'))
time.sleep(3)
def process_job(q):
while True:
next_job = q.get()
print('Processing job:', next_job.description)
q.task_done()
workers = [
threading.Thread(target=process_job, args=(q,)),
# threading.Thread(target=process_job, args=(q,)),
]
for w in workers:
w.setDaemon(True)
w.start()
q.join()
This example has multiple threads consuming the jobs, which are processed based on the priority of items in the queue at the time get() was called. The order of processing for items added to the queue while the consumer threads are running depends on thread context switching.
In [ ]:
# %load fetch_podcasts.py
# First, some operating parameters are established.
# Usually, these would come from user inputs
# (e.g., preferences or a database). The example uses hard-coded
# values for the number of threads and list of URLs to fetch.
from queue import Queue
import threading
import time
import urllib
from urllib.parse import urlparse
import feedparser
# Set up some global variables
num_fetch_threads = 2
enclosure_queue = Queue()
# A real app wouldn't use hard-coded data...
feed_urls = [
'http://talkpython.fm/episodes/rss',
]
def message(s):
print('{}: {}'.format(threading.current_thread().name, s))
# The function download_enclosures() runs in the worker thread
# and processes the downloads using urllib.
def download_enclosures(q):
"""This is the worker thread function.
It processes items in the queue one after
another. These daemon threads go into an
infinite loop, and exit only when
the main thread ends.
"""
while True:
message('looking for the next enclosure')
url = q.get()
filename = url.rpartition('/')[-1]
message('downloading {}'.format(filename))
response = urllib.request.urlopen(url)
data = response.read()
# Save the downloaded file to the current directory
message('writing to {}'.format(filename))
with open(filename, 'wb') as outfile:
outfile.write(data)
q.task_done()
# Set up some threads to fetch the enclosures
for i in range(num_fetch_threads):
worker = threading.Thread(
target=download_enclosures,
args=(enclosure_queue,),
name='worker-{}'.format(i),
)
worker.setDaemon(True)
worker.start()
# Download the feed(s) and put the enclosure URLs into
# the queue.
for url in feed_urls:
response = feedparser.parse(url, agent='fetch_podcasts.py')
for entry in response['entries'][:5]:
for enclosure in entry.get('enclosures', []):
parsed_url = urlparse(enclosure['url'])
message('queuing {}'.format(
parsed_url.path.rpartition('/')[-1]))
enclosure_queue.put(enclosure['url'])
# Now wait for the queue to be empty, indicating that we have
# processed all of the downloads.
message('*** main thread waiting')
enclosure_queue.join()
message('*** done')
In [26]:
!python fetch_podcasts.py
In [27]:
# remove all download files
!rm *.mp3
Queue.task_done
and Queue.join
is used closely:
When you call join
, the thread call it will block untile all item in the queue have been gotten and processed.
The count of unfinised tasks goes up whenever an item is added to the queue.
The count goes down whenever a consumer call task_done
to indicate that the item was retrived and all work on it is complete. When the count of unfinised tasks drops to zero, join
unblocks.
In [ ]: