queue — Implementación FIFO segura para hilos¶
Propósito: | Proporciona una implementación FIFO segura para hilos |
---|
El módulo queue
proporciona una estructura datos de primero en entrar,
primero en salir (FIFO) adecuada para la programación de múltiples hilos. Se
puede usar para pasar mensajes u otros datos entre los hilos del productor y
del consumidor sin peligro. El bloqueo se maneja para la persona que llama,
por lo que muchos hilos pueden funcionar con la misma instancia Queue
de
forma segura y fácil. El tamaño de un Queue
(la cantidad de elementos que
contiene) puede estar restringida para acelerar el uso de la memoria o el
procesamiento.
Nota
Esta discusión asume que ya comprendes la naturaleza general de una cola. Si no lo haces, es posible que desees leer algunas de las referencias antes de continuar.
Cola FIFO básica¶
La clase Queue
implementa un envase primero en entrar, primero en salir.
Los elementos se agregan a un «final» de la secuencia usando put()
, y
eliminado del otro extremo usando get()
.
import queue
q = queue.Queue()
for i in range(5):
q.put(i)
while not q.empty():
print(q.get(), end=' ')
print()
Este ejemplo usa un solo hilo para ilustrar que los elementos son eliminados de la cola en el mismo orden en que se insertaron.
$ python3 queue_fifo.py
0 1 2 3 4
Cola LIFO¶
A diferencia de la implementación FIFO estándar de Queue
, la LifoQueue
utiliza ordenamiento de último en entrar, primero en salir (normalmente
asociadocon una estructura de datos de pila).
import queue
q = queue.LifoQueue()
for i in range(5):
q.put(i)
while not q.empty():
print(q.get(), end=' ')
print()
El ítem más recientemente put
en la cola es removido mediante get
.
$ python3 queue_lifo.py
4 3 2 1 0
Colas de prioridad¶
A veces, el orden de procesamiento de los elementos en una cola debe ser basado
en las características de esos elementos, en lugar de solo el orden en que se
crean o se agregan a la cola. Por ejemplo, imprimir trabajos desde el
departamento de nómina puede tener prioridad sobre una lista de códigos que un
desarrollador quiere imprimir. PriorityQueue
utiliza el orden de el
contenido de la cola para decidir qué elemento recuperar.
import functools
import queue
import threading
@functools.total_ordering
class Job:
def __init__(self, priority, description):
self.priority = priority
self.description = description
print('New job:', description)
return
def __eq__(self, other):
try:
return self.priority == other.priority
except AttributeError:
return NotImplemented
def __lt__(self, other):
try:
return self.priority < other.priority
except AttributeError:
return NotImplemented
q = queue.PriorityQueue()
q.put(Job(3, 'Mid-level job'))
q.put(Job(10, 'Low-level job'))
q.put(Job(1, 'Important job'))
def process_job(q):
while True:
next_job = q.get()
print('Processing job:', next_job.description)
q.task_done()
workers = [
threading.Thread(target=process_job, args=(q,)),
threading.Thread(target=process_job, args=(q,)),
]
for w in workers:
w.setDaemon(True)
w.start()
q.join()
Este ejemplo tiene varios subprocesos que consumen los trabajos, que son
procesado en función de la prioridad de los elementos en la cola en el momento
en que get()
fue llamada. El orden de procesamiento de los elementos
agregados ala cola mientras los hilos del consumidor se están ejecutando
depende del hilocambio de contexto.
$ python3 queue_priority.py
New job: Mid-level job
New job: Low-level job
New job: Important job
Processing job: Important job
Processing job: Mid-level job
Processing job: Low-level job
Creando un cliente de Podcast con hilos¶
El código fuente para el cliente de podcasting en esta sección demuestra cómo
usar la clase Queue
con múltiples hilos. El programa lee uno o más fentes
RSS, pone en cola los cinco episodios más recientes de cada fuente para
descargar, y procesa varias descargas en paralelo usando hilos. No tiene
suficiente manejo de errores para uso de producción, pero el la implementación
esqueleto ilustra el uso del módulo queue
.
Primero, se establecen algunos parámetros operativos. Por lo general, estos provienen de las entradas del usuario (por ejemplo, preferencias o una base de datos). El ejemplo utiliza valores codificados para el número de subprocesos y la lista de URL para extraher.
from queue import Queue
import threading
import time
import urllib
from urllib.parse import urlparse
import feedparser
# Set up some global variables
num_fetch_threads = 2
enclosure_queue = Queue()
# A real app wouldn't use hard-coded data...
feed_urls = [
'http://talkpython.fm/episodes/rss',
]
def message(s):
print('{}: {}'.format(threading.current_thread().name, s))
La función download_enclosures()
se ejecuta en el hilo de trabajo y procesa
las descargas usando urllib
.
def download_enclosures(q):
"""This is the worker thread function.
It processes items in the queue one after
another. These daemon threads go into an
infinite loop, and exit only when
the main thread ends.
"""
while True:
message('looking for the next enclosure')
url = q.get()
filename = url.rpartition('/')[-1]
message('downloading {}'.format(filename))
response = urllib.request.urlopen(url)
data = response.read()
# Save the downloaded file to the current directory
message('writing to {}'.format(filename))
with open(filename, 'wb') as outfile:
outfile.write(data)
q.task_done()
Una vez que se define la función de destino para los hilos, el hilo trabajador
pueden ser iniciado. Cuando download_enclosures()
procesa la declaración
url = q.get()
, bloquea y espera hasta que la cola tenga algo para regresar.
Eso significa que es seguro iniciar los hilo santes de que haya algo en la
cola.
# Set up some threads to fetch the enclosures
for i in range(num_fetch_threads):
worker = threading.Thread(
target=download_enclosures,
args=(enclosure_queue,),
name='worker-{}'.format(i),
)
worker.setDaemon(True)
worker.start()
El siguiente paso es recuperar los contenidos de la fuente utilizando el módulo
feedparser
y añadir las URLs de los enclosures a la cola. Tan pronto como
la primera URL se agrega a la cola, uno de los hilos de trabajo la recoge y
comienza a descargarla. El ciclo continúa agregando elementos hasta que se
agote la fuente, y los hilos trabajadores tomen turnos sacandos las URL de la
cola para descargarlas.
# Download the feed(s) and put the enclosure URLs into
# the queue.
for url in feed_urls:
response = feedparser.parse(url, agent='fetch_podcasts.py')
for entry in response['entries'][:5]:
for enclosure in entry.get('enclosures', []):
parsed_url = urlparse(enclosure['url'])
message('queuing {}'.format(
parsed_url.path.rpartition('/')[-1]))
enclosure_queue.put(enclosure['url'])
Lo único que queda es esperar a que la cola se vacíe nuevamente, usando
join()
.
# Now wait for the queue to be empty, indicating that we have
# processed all of the downloads.
message('*** main thread waiting')
enclosure_queue.join()
message('*** done')
Ejecutar del script de muestra produce resultados similares a los siguientes.
$ python3 fetch_podcasts.py
worker-0: looking for the next enclosure
worker-1: looking for the next enclosure
MainThread: queuing turbogears-and-the-future-of-python-web-frameworks.mp3
MainThread: queuing continuum-scientific-python-and-the-business-of-open-source.mp3
MainThread: queuing openstack-cloud-computing-built-on-python.mp3
MainThread: queuing pypy.js-pypy-python-in-your-browser.mp3
MainThread: queuing machine-learning-with-python-and-scikit-learn.mp3
MainThread: *** main thread waiting
worker-0: downloading turbogears-and-the-future-of-python-web-frameworks.mp3
worker-1: downloading continuum-scientific-python-and-the-business-of-open-source.mp3
worker-0: looking for the next enclosure
worker-0: downloading openstack-cloud-computing-built-on-python.mp3
worker-1: looking for the next enclosure
worker-1: downloading pypy.js-pypy-python-in-your-browser.mp3
worker-0: looking for the next enclosure
worker-0: downloading machine-learning-with-python-and-scikit-learn.mp3
worker-1: looking for the next enclosure
worker-0: looking for the next enclosure
MainThread: *** done
La salida real dependerá del contenido de la fuente RSS utilizada.
Ver también
- Standard library documentation for queue
- deque — Cola doblemente terminada de
collections
- Cola (informática) – artículo de Wikipedia que explica la cola.
- First in, first out – artículo de Wikipedia que explica la estructura ded datos first in, first out.
- módulo feedparser – Un módulo para analizar fuentes RSS y Atom, creado por Mark Pilgrim y maintanido por Kurt McKee.