queue — Implementación FIFO segura para hilos

Propósito:Proporciona una implementación FIFO segura para hilos

El módulo queue proporciona una estructura datos de primero en entrar, primero en salir (FIFO) adecuada para la programación de múltiples hilos. Se puede usar para pasar mensajes u otros datos entre los hilos del productor y del consumidor sin peligro. El bloqueo se maneja para la persona que llama, por lo que muchos hilos pueden funcionar con la misma instancia Queue de forma segura y fácil. El tamaño de un Queue (la cantidad de elementos que contiene) puede estar restringida para acelerar el uso de la memoria o el procesamiento.

Nota

Esta discusión asume que ya comprendes la naturaleza general de una cola. Si no lo haces, es posible que desees leer algunas de las referencias antes de continuar.

Cola FIFO básica

La clase Queue implementa un envase primero en entrar, primero en salir. Los elementos se agregan a un «final» de la secuencia usando put(), y eliminado del otro extremo usando get().

queue_fifo.py
import queue

q = queue.Queue()

for i in range(5):
    q.put(i)

while not q.empty():
    print(q.get(), end=' ')
print()

Este ejemplo usa un solo hilo para ilustrar que los elementos son eliminados de la cola en el mismo orden en que se insertaron.

$ python3 queue_fifo.py

0 1 2 3 4

Cola LIFO

A diferencia de la implementación FIFO estándar de Queue, la LifoQueue utiliza ordenamiento de último en entrar, primero en salir (normalmente asociadocon una estructura de datos de pila).

queue_lifo.py
import queue

q = queue.LifoQueue()

for i in range(5):
    q.put(i)

while not q.empty():
    print(q.get(), end=' ')
print()

El ítem más recientemente put en la cola es removido mediante get.

$ python3 queue_lifo.py

4 3 2 1 0

Colas de prioridad

A veces, el orden de procesamiento de los elementos en una cola debe ser basado en las características de esos elementos, en lugar de solo el orden en que se crean o se agregan a la cola. Por ejemplo, imprimir trabajos desde el departamento de nómina puede tener prioridad sobre una lista de códigos que un desarrollador quiere imprimir. PriorityQueue utiliza el orden de el contenido de la cola para decidir qué elemento recuperar.

queue_priority.py
import functools
import queue
import threading


@functools.total_ordering
class Job:

    def __init__(self, priority, description):
        self.priority = priority
        self.description = description
        print('New job:', description)
        return

    def __eq__(self, other):
        try:
            return self.priority == other.priority
        except AttributeError:
            return NotImplemented

    def __lt__(self, other):
        try:
            return self.priority < other.priority
        except AttributeError:
            return NotImplemented


q = queue.PriorityQueue()

q.put(Job(3, 'Mid-level job'))
q.put(Job(10, 'Low-level job'))
q.put(Job(1, 'Important job'))


def process_job(q):
    while True:
        next_job = q.get()
        print('Processing job:', next_job.description)
        q.task_done()


workers = [
    threading.Thread(target=process_job, args=(q,)),
    threading.Thread(target=process_job, args=(q,)),
]
for w in workers:
    w.setDaemon(True)
    w.start()

q.join()

Este ejemplo tiene varios subprocesos que consumen los trabajos, que son procesado en función de la prioridad de los elementos en la cola en el momento en que get() fue llamada. El orden de procesamiento de los elementos agregados ala cola mientras los hilos del consumidor se están ejecutando depende del hilocambio de contexto.

$ python3 queue_priority.py

New job: Mid-level job
New job: Low-level job
New job: Important job
Processing job: Important job
Processing job: Mid-level job
Processing job: Low-level job

Creando un cliente de Podcast con hilos

El código fuente para el cliente de podcasting en esta sección demuestra cómo usar la clase Queue con múltiples hilos. El programa lee uno o más fentes RSS, pone en cola los cinco episodios más recientes de cada fuente para descargar, y procesa varias descargas en paralelo usando hilos. No tiene suficiente manejo de errores para uso de producción, pero el la implementación esqueleto ilustra el uso del módulo queue.

Primero, se establecen algunos parámetros operativos. Por lo general, estos provienen de las entradas del usuario (por ejemplo, preferencias o una base de datos). El ejemplo utiliza valores codificados para el número de subprocesos y la lista de URL para extraher.

fetch_podcasts.py
from queue import Queue
import threading
import time
import urllib
from urllib.parse import urlparse

import feedparser

# Set up some global variables
num_fetch_threads = 2
enclosure_queue = Queue()

# A real app wouldn't use hard-coded data...
feed_urls = [
    'http://talkpython.fm/episodes/rss',
]


def message(s):
    print('{}: {}'.format(threading.current_thread().name, s))

La función download_enclosures() se ejecuta en el hilo de trabajo y procesa las descargas usando urllib.

def download_enclosures(q):
    """This is the worker thread function.
    It processes items in the queue one after
    another.  These daemon threads go into an
    infinite loop, and exit only when
    the main thread ends.
    """
    while True:
        message('looking for the next enclosure')
        url = q.get()
        filename = url.rpartition('/')[-1]
        message('downloading {}'.format(filename))
        response = urllib.request.urlopen(url)
        data = response.read()
        # Save the downloaded file to the current directory
        message('writing to {}'.format(filename))
        with open(filename, 'wb') as outfile:
            outfile.write(data)
        q.task_done()

Una vez que se define la función de destino para los hilos, el hilo trabajador pueden ser iniciado. Cuando download_enclosures() procesa la declaración url = q.get(), bloquea y espera hasta que la cola tenga algo para regresar. Eso significa que es seguro iniciar los hilo santes de que haya algo en la cola.

# Set up some threads to fetch the enclosures
for i in range(num_fetch_threads):
    worker = threading.Thread(
        target=download_enclosures,
        args=(enclosure_queue,),
        name='worker-{}'.format(i),
    )
    worker.setDaemon(True)
    worker.start()

El siguiente paso es recuperar los contenidos de la fuente utilizando el módulo feedparser y añadir las URLs de los enclosures a la cola. Tan pronto como la primera URL se agrega a la cola, uno de los hilos de trabajo la recoge y comienza a descargarla. El ciclo continúa agregando elementos hasta que se agote la fuente, y los hilos trabajadores tomen turnos sacandos las URL de la cola para descargarlas.

# Download the feed(s) and put the enclosure URLs into
# the queue.
for url in feed_urls:
    response = feedparser.parse(url, agent='fetch_podcasts.py')
    for entry in response['entries'][:5]:
        for enclosure in entry.get('enclosures', []):
            parsed_url = urlparse(enclosure['url'])
            message('queuing {}'.format(
                parsed_url.path.rpartition('/')[-1]))
            enclosure_queue.put(enclosure['url'])

Lo único que queda es esperar a que la cola se vacíe nuevamente, usando join().

# Now wait for the queue to be empty, indicating that we have
# processed all of the downloads.
message('*** main thread waiting')
enclosure_queue.join()
message('*** done')

Ejecutar del script de muestra produce resultados similares a los siguientes.

$ python3 fetch_podcasts.py

worker-0: looking for the next enclosure
worker-1: looking for the next enclosure
MainThread: queuing turbogears-and-the-future-of-python-web-frameworks.mp3
MainThread: queuing continuum-scientific-python-and-the-business-of-open-source.mp3
MainThread: queuing openstack-cloud-computing-built-on-python.mp3
MainThread: queuing pypy.js-pypy-python-in-your-browser.mp3
MainThread: queuing machine-learning-with-python-and-scikit-learn.mp3
MainThread: *** main thread waiting
worker-0: downloading turbogears-and-the-future-of-python-web-frameworks.mp3
worker-1: downloading continuum-scientific-python-and-the-business-of-open-source.mp3
worker-0: looking for the next enclosure
worker-0: downloading openstack-cloud-computing-built-on-python.mp3
worker-1: looking for the next enclosure
worker-1: downloading pypy.js-pypy-python-in-your-browser.mp3
worker-0: looking for the next enclosure
worker-0: downloading machine-learning-with-python-and-scikit-learn.mp3
worker-1: looking for the next enclosure
worker-0: looking for the next enclosure
MainThread: *** done

La salida real dependerá del contenido de la fuente RSS utilizada.

Ver también