threading — Gestionar operaciones concurrentes dentro de un proceso¶
Propósito: | Gestiona varios hilos de ejecución. |
---|
Usar hilos permite que un programa ejecute múltiples operaciones simultáneamente en el mismo espacio de proceso.
Objetos Thread¶
La forma más sencilla de usar un Thread
es crear una instancia con un
función de destino y llamar a start()
para que comience a funcionar.
import threading
def worker():
"""thread worker function"""
print('Worker')
threads = []
for i in range(5):
t = threading.Thread(target=worker)
threads.append(t)
t.start()
La salida son cinco líneas con "Worker"
en cada una.
$ python3 threading_simple.py
Worker
Worker
Worker
Worker
Worker
Es útil poder generar un hilo y pasarle argumentos para decirle que trabajo hacer. Cualquier tipo de objeto puede ser pasado como argumento al hilo. Este ejemplo pasa un número, que luego el hilo imprime.
import threading
def worker(num):
"""thread worker function"""
print('Worker: %s' % num)
threads = []
for i in range(5):
t = threading.Thread(target=worker, args=(i,))
threads.append(t)
t.start()
El argumento entero ahora está incluido en el mensaje impreso por cada hilo.
$ python3 threading_simpleargs.py
Worker: 0
Worker: 1
Worker: 2
Worker: 3
Worker: 4
Determinar el hilo actual¶
Usar argumentos para identificar o nombrar el hilo es engorroso e innecesario.
Cada instancia Thread
tiene un nombre con un valor predeterminado que se
puede cambiar cuando se crea el hilo. Nombrar hilos es útil en procesos de
servidor con múltiples hilos de servicio manejando diferentes operaciones.
import threading
import time
def worker():
print(threading.current_thread().getName(), 'Starting')
time.sleep(0.2)
print(threading.current_thread().getName(), 'Exiting')
def my_service():
print(threading.current_thread().getName(), 'Starting')
time.sleep(0.3)
print(threading.current_thread().getName(), 'Exiting')
t = threading.Thread(name='my_service', target=my_service)
w = threading.Thread(name='worker', target=worker)
w2 = threading.Thread(target=worker) # use default name
w.start()
w2.start()
t.start()
La salida de depuración incluye el nombre del hilo actual en cada línea. Las
líneas con "Thread-1"
en la columna de nombre de hilo corresponden al hilo
sin nombre w2
.
$ python3 threading_names.py
worker Starting
Thread-1 Starting
my_service Starting
worker Exiting
Thread-1 Exiting
my_service Exiting
La mayoría de los programas no usan print
para depurar. El módulo
logging
admite incrustar el nombre del hilo en cada mensaje de registro
usando el código del formateador %(threadName)s
. Incluir los nombres de
hilo en los mensajes de registro hacen posible rastrear esos mensajes hasta su
origen.
import logging
import threading
import time
def worker():
logging.debug('Starting')
time.sleep(0.2)
logging.debug('Exiting')
def my_service():
logging.debug('Starting')
time.sleep(0.3)
logging.debug('Exiting')
logging.basicConfig(
level=logging.DEBUG,
format='[%(levelname)s] (%(threadName)-10s) %(message)s',
)
t = threading.Thread(name='my_service', target=my_service)
w = threading.Thread(name='worker', target=worker)
w2 = threading.Thread(target=worker) # use default name
w.start()
w2.start()
t.start()
logging
también es seguro para subprocesos, por lo que los mensajes de
diferentes subprocesos se mantienen distintos en la salida.
$ python3 threading_names_log.py
[DEBUG] (worker ) Starting
[DEBUG] (Thread-1 ) Starting
[DEBUG] (my_service) Starting
[DEBUG] (worker ) Exiting
[DEBUG] (Thread-1 ) Exiting
[DEBUG] (my_service) Exiting
Hilos de Daemon vs. No-Daemon¶
Hasta este punto, los programas de ejemplo han esperado implícitamente para
salir hasta que todos los hilos hayan completado su trabajo. A veces los
programas generan un hilo como un demonio que se ejecuta sin bloquear el
programa principal de salir. El uso de hilos de demonio es útil para servicios
donde puede que no haya una manera fácil de interrumpir el hilo, o donde dejar
que el el hilo muera en medio de su trabajo, no pierde ni corrompe los datos
(por ejemplo, un hilo que genera «latidos del corazón» para una herramienta de
monitoreo de servicio). Para marcar un hilo como demonio, pasa daemon=True
al construirlo o llama a su método set_daemon()
con True
. El valor
predeterminado es que los subprocesos no sean demonios.
import threading
import time
import logging
def daemon():
logging.debug('Starting')
time.sleep(0.2)
logging.debug('Exiting')
def non_daemon():
logging.debug('Starting')
logging.debug('Exiting')
logging.basicConfig(
level=logging.DEBUG,
format='(%(threadName)-10s) %(message)s',
)
d = threading.Thread(name='daemon', target=daemon, daemon=True)
t = threading.Thread(name='non-daemon', target=non_daemon)
d.start()
t.start()
La salida no incluye el mensaje "Exiting"
del hilo demonio, ya que todos
los hilos no demonio (incluyendo el hilo principal) terminan antes de que el
hilo demonio se despierte de la llamada sleep()
.
$ python3 threading_daemon.py
(daemon ) Starting
(non-daemon) Starting
(non-daemon) Exiting
Para esperar hasta que un subproceso demonio haya completado su trabajo, usa el
método join()
.
import threading
import time
import logging
def daemon():
logging.debug('Starting')
time.sleep(0.2)
logging.debug('Exiting')
def non_daemon():
logging.debug('Starting')
logging.debug('Exiting')
logging.basicConfig(
level=logging.DEBUG,
format='(%(threadName)-10s) %(message)s',
)
d = threading.Thread(name='daemon', target=daemon, daemon=True)
t = threading.Thread(name='non-daemon', target=non_daemon)
d.start()
t.start()
d.join()
t.join()
Esperar que el hilo demonio termine usando join()
significa que tiene la
oportunidad de producir su mensaje "Exiting"
.
$ python3 threading_daemon_join.py
(daemon ) Starting
(non-daemon) Starting
(non-daemon) Exiting
(daemon ) Exiting
Por defecto, join()
bloquea indefinidamente. También es posible pasar un
valor flotante que represente el número de segundos a esperar por el hilo para
convertirse en inactivo. Si el hilo no se completa dentro del período de
tiempo de espera, join()
retorna de todos modos.
import threading
import time
import logging
def daemon():
logging.debug('Starting')
time.sleep(0.2)
logging.debug('Exiting')
def non_daemon():
logging.debug('Starting')
logging.debug('Exiting')
logging.basicConfig(
level=logging.DEBUG,
format='(%(threadName)-10s) %(message)s',
)
d = threading.Thread(name='daemon', target=daemon, daemon=True)
t = threading.Thread(name='non-daemon', target=non_daemon)
d.start()
t.start()
d.join(0.1)
print('d.isAlive()', d.isAlive())
t.join()
Dado que el tiempo de espera transcurrido es menor que la cantidad de tiempo
que el hilo demonio duerme, el hilo sigue «vivo» después de join()
retorna.
$ python3 threading_daemon_join_timeout.py
(daemon ) Starting
(non-daemon) Starting
(non-daemon) Exiting
d.isAlive() True
Enumerar todos los hilos¶
No es necesario conservar un identificador explícito para todos los hilos
demonio para asegurar que se han completado antes de salir del proceso
principal. enumerate()
devuelve una lista de instancia de Thread
activas. La lista incluye el hilo actual, y ya que se unir al hilo actual
introduce una situación de interbloqueo, se debe omitir.
import random
import threading
import time
import logging
def worker():
"""thread worker function"""
pause = random.randint(1, 5) / 10
logging.debug('sleeping %0.2f', pause)
time.sleep(pause)
logging.debug('ending')
logging.basicConfig(
level=logging.DEBUG,
format='(%(threadName)-10s) %(message)s',
)
for i in range(3):
t = threading.Thread(target=worker, daemon=True)
t.start()
main_thread = threading.main_thread()
for t in threading.enumerate():
if t is main_thread:
continue
logging.debug('joining %s', t.getName())
t.join()
Debido a que el trabajador está durmiendo durante un período de tiempo aleatorio, la salida de este programa puede variar.
$ python3 threading_enumerate.py
(Thread-1 ) sleeping 0.20
(Thread-2 ) sleeping 0.30
(Thread-3 ) sleeping 0.40
(MainThread) joining Thread-1
(Thread-1 ) ending
(MainThread) joining Thread-3
(Thread-2 ) ending
(Thread-3 ) ending
(MainThread) joining Thread-2
Subclase de hilo¶
Al inicio, un Thread
realiza una inicialización básica y luego llama a su
método run()
, que llama a la función objetivo pasada al constructor. Para
crear una subclase de Thread
, anula run()
para hacer lo que sea
necesario.
import threading
import logging
class MyThread(threading.Thread):
def run(self):
logging.debug('running')
logging.basicConfig(
level=logging.DEBUG,
format='(%(threadName)-10s) %(message)s',
)
for i in range(5):
t = MyThread()
t.start()
El valor de retorno de run()
se ignora.
$ python3 threading_subclass.py
(Thread-1 ) running
(Thread-2 ) running
(Thread-3 ) running
(Thread-4 ) running
(Thread-5 ) running
Porque los valores args
y kwargs
pasados al constructor Thread
se
guardan en variables privadas usando nombres prefijados con '__'
, no se
puede acceder fácilmente desde una subclase. Para pasar argumentos a un tipo
de hilo personalizado, redefine el constructor para guardar los valores en un
atributo de instancia que se pueden ver en la subclase.
import threading
import logging
class MyThreadWithArgs(threading.Thread):
def __init__(self, group=None, target=None, name=None,
args=(), kwargs=None, *, daemon=None):
super().__init__(group=group, target=target, name=name,
daemon=daemon)
self.args = args
self.kwargs = kwargs
def run(self):
logging.debug('running with %s and %s',
self.args, self.kwargs)
logging.basicConfig(
level=logging.DEBUG,
format='(%(threadName)-10s) %(message)s',
)
for i in range(5):
t = MyThreadWithArgs(args=(i,), kwargs={'a': 'A', 'b': 'B'})
t.start()
MyThreadWithArgs
usa la misma interfaz de programación que Thread
, pero
otra clase podría cambiar fácilmente el método constructor para tomar más o
diferentes argumentos más directamente relacionados con el propósito del hilo,
como con cualquier otra clase.
$ python3 threading_subclass_args.py
(Thread-1 ) running with (0,) and {'b': 'B', 'a': 'A'}
(Thread-2 ) running with (1,) and {'b': 'B', 'a': 'A'}
(Thread-3 ) running with (2,) and {'b': 'B', 'a': 'A'}
(Thread-4 ) running with (3,) and {'b': 'B', 'a': 'A'}
(Thread-5 ) running with (4,) and {'b': 'B', 'a': 'A'}
Hilos de temporizador¶
Un ejemplo de una razón para una subclase de Thread
es proporcionada por
Timer
, también incluida en threading
. Un Timer
comienza su trabajo
después de un retraso, y puede cancelarse en cualquier punto dentro de ese
período de tiempo de retraso.
import threading
import time
import logging
def delayed():
logging.debug('worker running')
logging.basicConfig(
level=logging.DEBUG,
format='(%(threadName)-10s) %(message)s',
)
t1 = threading.Timer(0.3, delayed)
t1.setName('t1')
t2 = threading.Timer(0.3, delayed)
t2.setName('t2')
logging.debug('starting timers')
t1.start()
t2.start()
logging.debug('waiting before canceling %s', t2.getName())
time.sleep(0.2)
logging.debug('canceling %s', t2.getName())
t2.cancel()
logging.debug('done')
El segundo temporizador en este ejemplo nunca se ejecuta, y el primer temporizador parece que se ejecuta después de que el resto del programa principal ha terminado. Ya que es no es un hilo demonio, se une implícitamente cuando el hilo principal ha terminado.
$ python3 threading_timer.py
(MainThread) starting timers
(MainThread) waiting before canceling t2
(MainThread) canceling t2
(MainThread) done
(t1 ) worker running
Señalización Entre Hilos¶
Aunque el punto de usar múltiples hilos es ejecutar separadamente operaciones
al mismo tiempo, hay momentos en que es importante ser capaz de sincronizar las
operaciones en dos o más hilos. Los objetos evento son una forma sencilla de
comunicarse entre hilos de forma segura. Un Event
gestiona una bandera
interna que las personas que llaman pueden controlar con los métodos set()
y clear()
. Otros hilos pueden usar wait()
para pausar hasta que se
establezca la bandera, bloqueando efectivamente el avance hasta que se permita
continuar.
import logging
import threading
import time
def wait_for_event(e):
"""Wait for the event to be set before doing anything"""
logging.debug('wait_for_event starting')
event_is_set = e.wait()
logging.debug('event set: %s', event_is_set)
def wait_for_event_timeout(e, t):
"""Wait t seconds and then timeout"""
while not e.is_set():
logging.debug('wait_for_event_timeout starting')
event_is_set = e.wait(t)
logging.debug('event set: %s', event_is_set)
if event_is_set:
logging.debug('processing event')
else:
logging.debug('doing other work')
logging.basicConfig(
level=logging.DEBUG,
format='(%(threadName)-10s) %(message)s',
)
e = threading.Event()
t1 = threading.Thread(
name='block',
target=wait_for_event,
args=(e,),
)
t1.start()
t2 = threading.Thread(
name='nonblock',
target=wait_for_event_timeout,
args=(e, 2),
)
t2.start()
logging.debug('Waiting before calling Event.set()')
time.sleep(0.3)
e.set()
logging.debug('Event is set')
El método wait_for_event_timeout()
toma un argumento que representa el
número de segundos que el evento espera antes de que se agote el tiempo de
espera. Devuelve un booleano indicando si el evento está configurado o no,
para que la persona que llama sepa por qué wait()
regresó. El método
is_set()
puede ser usado por separado en el evento sin miedo a bloquear.
En este ejemplo, wait_for_event_timeout()
comprueba el estatus del evento
sin bloqueo indefinido. El wait_for_event()
bloquea en la llamada a
wait()
, que no regresa hasta que el estado del evento cambie.
$ python3 threading_event.py
(block ) wait_for_event starting
(nonblock ) wait_for_event_timeout starting
(MainThread) Waiting before calling Event.set()
(MainThread) Event is set
(nonblock ) event set: True
(nonblock ) processing event
(block ) event set: True
Controlar el acceso a los recursos¶
Además de sincronizar las operaciones de los hilos, también es importante poder
controlar el acceso a los recursos compartidos para prevenir corrupción o
pérdida de datos. Estructuras de datos incorporadas de Python (listas,
diccionarios, etc.) son seguras para subprocesos como un efecto secundario de
tener códigos de bytes atómicos para manipularlos (el bloqueo global del
intérprete utilizado para proteger las estructuras de datos internas de Python
no es liberada en el medio de una actualización). Otras estructuras de datos
implementadas en Python, o los tipos más simples como números enteros y de coma
flotante, no tienen esa protección. Para protegerse contra el acceso
simultáneo a un objeto, usa un Objeto Lock
.
import logging
import random
import threading
import time
class Counter:
def __init__(self, start=0):
self.lock = threading.Lock()
self.value = start
def increment(self):
logging.debug('Waiting for lock')
self.lock.acquire()
try:
logging.debug('Acquired lock')
self.value = self.value + 1
finally:
self.lock.release()
def worker(c):
for i in range(2):
pause = random.random()
logging.debug('Sleeping %0.02f', pause)
time.sleep(pause)
c.increment()
logging.debug('Done')
logging.basicConfig(
level=logging.DEBUG,
format='(%(threadName)-10s) %(message)s',
)
counter = Counter()
for i in range(2):
t = threading.Thread(target=worker, args=(counter,))
t.start()
logging.debug('Waiting for worker threads')
main_thread = threading.main_thread()
for t in threading.enumerate():
if t is not main_thread:
t.join()
logging.debug('Counter: %d', counter.value)
En este ejemplo, la función worker()
incrementa la instancia Counter
,
que administra un Lock
para evitar que dos hilos de cambien su estado
interno al mismo tiempo. Si el Lock
no se usó, existe la posibilidad de
perder un cambio al atributo de valor.
$ python3 threading_lock.py
(Thread-1 ) Sleeping 0.18
(Thread-2 ) Sleeping 0.93
(MainThread) Waiting for worker threads
(Thread-1 ) Waiting for lock
(Thread-1 ) Acquired lock
(Thread-1 ) Sleeping 0.11
(Thread-1 ) Waiting for lock
(Thread-1 ) Acquired lock
(Thread-1 ) Done
(Thread-2 ) Waiting for lock
(Thread-2 ) Acquired lock
(Thread-2 ) Sleeping 0.81
(Thread-2 ) Waiting for lock
(Thread-2 ) Acquired lock
(Thread-2 ) Done
(MainThread) Counter: 4
Para averiguar si otro hilo ha adquirido el bloqueo sin sostener el hilo
actual, pasa False
para el argumento block
para acquire()
. En el
siguiente ejemplo, worker()
intenta adquirir el bloqueo tres veces por
separado y cuenta cuántos intentos tiene que hacer para lograrlo. Mientras
tanto, lock_holder()
cicla entre mantener y liberar el bloqueo, con breves
pausas en cada estado utilizado para simular carga.
import logging
import threading
import time
def lock_holder(lock):
logging.debug('Starting')
while True:
lock.acquire()
try:
logging.debug('Holding')
time.sleep(0.5)
finally:
logging.debug('Not holding')
lock.release()
time.sleep(0.5)
def worker(lock):
logging.debug('Starting')
num_tries = 0
num_acquires = 0
while num_acquires < 3:
time.sleep(0.5)
logging.debug('Trying to acquire')
have_it = lock.acquire(0)
try:
num_tries += 1
if have_it:
logging.debug('Iteration %d: Acquired',
num_tries)
num_acquires += 1
else:
logging.debug('Iteration %d: Not acquired',
num_tries)
finally:
if have_it:
lock.release()
logging.debug('Done after %d iterations', num_tries)
logging.basicConfig(
level=logging.DEBUG,
format='(%(threadName)-10s) %(message)s',
)
lock = threading.Lock()
holder = threading.Thread(
target=lock_holder,
args=(lock,),
name='LockHolder',
daemon=True,
)
holder.start()
worker = threading.Thread(
target=worker,
args=(lock,),
name='Worker',
)
worker.start()
worker()
necesita más de tres iteraciones para adquirir el bloqueo tres
veces separadas.
$ python3 threading_lock_noblock.py
(LockHolder) Starting
(LockHolder) Holding
(Worker ) Starting
(LockHolder) Not holding
(Worker ) Trying to acquire
(Worker ) Iteration 1: Acquired
(LockHolder) Holding
(Worker ) Trying to acquire
(Worker ) Iteration 2: Not acquired
(LockHolder) Not holding
(Worker ) Trying to acquire
(Worker ) Iteration 3: Acquired
(LockHolder) Holding
(Worker ) Trying to acquire
(Worker ) Iteration 4: Not acquired
(LockHolder) Not holding
(Worker ) Trying to acquire
(Worker ) Iteration 5: Acquired
(Worker ) Done after 5 iterations
Bloqueos reentrantes¶
Los objetos normales Lock
no se pueden adquirir más de una vez, incluso por
el mismo hilo. Esto puede introducir efectos secundarios indeseables si se
accede a una cerradura mediante más de una función en la misma cadena de
llamadas.
import threading
lock = threading.Lock()
print('First try :', lock.acquire())
print('Second try:', lock.acquire(0))
En este caso, la segunda llamada a acquire()
recibe un tiempo de espera
cero para evitar que se bloquee porque se ha obtenido el bloqueo por la primera
llamada.
$ python3 threading_lock_reacquire.py
First try : True
Second try: False
En una situación donde el código separado del mismo hilo necesita «volver a
adquirir» el bloqueo, usa un RLock
en su lugar.
import threading
lock = threading.RLock()
print('First try :', lock.acquire())
print('Second try:', lock.acquire(0))
El único cambio en el código del ejemplo anterior fue la sustitución de
Lock
por RLock
.
$ python3 threading_rlock.py
First try : True
Second try: True
Bloqueos como gestores de contexto¶
Los bloqueos implementan la interfaz de programación del administrador de
contexto y son compatibles con la declaración with
. Usar with
elimina
la necesidad de adquirir y liberar explícitamente el bloqueo.
import threading
import logging
def worker_with(lock):
with lock:
logging.debug('Lock acquired via with')
def worker_no_with(lock):
lock.acquire()
try:
logging.debug('Lock acquired directly')
finally:
lock.release()
logging.basicConfig(
level=logging.DEBUG,
format='(%(threadName)-10s) %(message)s',
)
lock = threading.Lock()
w = threading.Thread(target=worker_with, args=(lock,))
nw = threading.Thread(target=worker_no_with, args=(lock,))
w.start()
nw.start()
Las dos funciones worker_with()
y worker_no_with()
administran el
bloqueo de manera equivalente.
$ python3 threading_lock_with.py
(Thread-1 ) Lock acquired via with
(Thread-2 ) Lock acquired directly
Sincronizar hilos¶
Además de usar Events
, otra forma de sincronizar los hilos son a través del
uso de un objeto Condition
. Porque Condition
utiliza un Lock
, se
puede vincular a un recurso compartido, permitiendo que múltiples hilos esperen
a que el recurso sea actualizado. En este ejemplo, los hilos consumer()
esperan el Condition
que se establezca antes de continuar. El hilo
producer()
es responsable de establecer la condición y notificar a los
otros hilos que pueden continuar.
import logging
import threading
import time
def consumer(cond):
"""wait for the condition and use the resource"""
logging.debug('Starting consumer thread')
with cond:
cond.wait()
logging.debug('Resource is available to consumer')
def producer(cond):
"""set up the resource to be used by the consumer"""
logging.debug('Starting producer thread')
with cond:
logging.debug('Making resource available')
cond.notifyAll()
logging.basicConfig(
level=logging.DEBUG,
format='%(asctime)s (%(threadName)-2s) %(message)s',
)
condition = threading.Condition()
c1 = threading.Thread(name='c1', target=consumer,
args=(condition,))
c2 = threading.Thread(name='c2', target=consumer,
args=(condition,))
p = threading.Thread(name='p', target=producer,
args=(condition,))
c1.start()
time.sleep(0.2)
c2.start()
time.sleep(0.2)
p.start()
Los hilos usan with
para adquirir el bloqueo asociado con la Condition
.
Usando los métodos capture()
y release()
explícitamente también
funcionan.
$ python3 threading_condition.py
2016-07-10 10:45:28,170 (c1) Starting consumer thread
2016-07-10 10:45:28,376 (c2) Starting consumer thread
2016-07-10 10:45:28,581 (p ) Starting producer thread
2016-07-10 10:45:28,581 (p ) Making resource available
2016-07-10 10:45:28,582 (c1) Resource is available to consumer
2016-07-10 10:45:28,582 (c2) Resource is available to consumer
Las barreras son otro mecanismo de sincronización de hilos. Una Barrier
establece un punto de control y todos los hilos participantes bloquean hasta
que todas las «partes» participantes hayan alcanzado ese punto. Permite que
los hilos se inicien por separado y luego se pause hasta que todos están listos
para continuar.
import threading
import time
def worker(barrier):
print(threading.current_thread().name,
'waiting for barrier with {} others'.format(
barrier.n_waiting))
worker_id = barrier.wait()
print(threading.current_thread().name, 'after barrier',
worker_id)
NUM_THREADS = 3
barrier = threading.Barrier(NUM_THREADS)
threads = [
threading.Thread(
name='worker-%s' % i,
target=worker,
args=(barrier,),
)
for i in range(NUM_THREADS)
]
for t in threads:
print(t.name, 'starting')
t.start()
time.sleep(0.1)
for t in threads:
t.join()
En este ejemplo, la Barrier
está configurada para bloquear hasta que tres
hilos estén esperando. Cuando se cumple la condición, todos los los hilos se
liberan más allá del punto de control al mismo tiempo. Los valores de retorno
de wait()
indica el número de la parte que está siendo liberada, y puede
usarse para limitar algunos subprocesos de realizar una acción como limpiar un
recurso compartido.
$ python3 threading_barrier.py
worker-0 starting
worker-0 waiting for barrier with 0 others
worker-1 starting
worker-1 waiting for barrier with 1 others
worker-2 starting
worker-2 waiting for barrier with 2 others
worker-2 after barrier 2
worker-0 after barrier 0
worker-1 after barrier 1
El método abort()
de Barrier
provoca que todos los hilos en espera de
recibir un BrokenBarrierError
. Esto permite a los hilos limpiar si el
procesamiento se detiene mientras estén bloqueados en wait()
.
import threading
import time
def worker(barrier):
print(threading.current_thread().name,
'waiting for barrier with {} others'.format(
barrier.n_waiting))
try:
worker_id = barrier.wait()
except threading.BrokenBarrierError:
print(threading.current_thread().name, 'aborting')
else:
print(threading.current_thread().name, 'after barrier',
worker_id)
NUM_THREADS = 3
barrier = threading.Barrier(NUM_THREADS + 1)
threads = [
threading.Thread(
name='worker-%s' % i,
target=worker,
args=(barrier,),
)
for i in range(NUM_THREADS)
]
for t in threads:
print(t.name, 'starting')
t.start()
time.sleep(0.1)
barrier.abort()
for t in threads:
t.join()
Este ejemplo configura la Barrier
para esperar un hilo participante más que
los que en realidad se inician para que el procesamiento para que todos los
hilos estén bloqueados. La llamada abort()
genera un excepción en cada
hilo bloqueado.
$ python3 threading_barrier_abort.py
worker-0 starting
worker-0 waiting for barrier with 0 others
worker-1 starting
worker-1 waiting for barrier with 1 others
worker-2 starting
worker-2 waiting for barrier with 2 others
worker-0 aborting
worker-2 aborting
worker-1 aborting
Limitar el acceso concurrente a los recursos¶
A veces es útil permitir que más de un trabajador acceda a un recurso a la vez,
mientras que todavía se limita el número total. Por ejemplo, una agrupación de
conexiones podría admitir un número fijo de conexiones simultáneas, o una
aplicación de red podría soportar una número fijo de descargas concurrentes.
Un Semaphore
es una forma para gestionar esas conexiones.
import logging
import random
import threading
import time
class ActivePool:
def __init__(self):
super(ActivePool, self).__init__()
self.active = []
self.lock = threading.Lock()
def makeActive(self, name):
with self.lock:
self.active.append(name)
logging.debug('Running: %s', self.active)
def makeInactive(self, name):
with self.lock:
self.active.remove(name)
logging.debug('Running: %s', self.active)
def worker(s, pool):
logging.debug('Waiting to join the pool')
with s:
name = threading.current_thread().getName()
pool.makeActive(name)
time.sleep(0.1)
pool.makeInactive(name)
logging.basicConfig(
level=logging.DEBUG,
format='%(asctime)s (%(threadName)-2s) %(message)s',
)
pool = ActivePool()
s = threading.Semaphore(2)
for i in range(4):
t = threading.Thread(
target=worker,
name=str(i),
args=(s, pool),
)
t.start()
En este ejemplo, la clase ActivePool
simplemente sirve como una manera
conveniente de rastrear qué hilos son capaces de correr en un determinado
momento. Un grupo de recursos real asignaría una conexión o algún otro valor
para el subproceso recién activo, y reclamaría el valor cuando el hilo termine.
Aquí, solo se utiliza para mantener los nombres de los hilos activos para
mostrar que a lo sumo dos se están ejecutando simultáneamente.
$ python3 threading_semaphore.py
2016-07-10 10:45:29,398 (0 ) Waiting to join the pool
2016-07-10 10:45:29,398 (0 ) Running: ['0']
2016-07-10 10:45:29,399 (1 ) Waiting to join the pool
2016-07-10 10:45:29,399 (1 ) Running: ['0', '1']
2016-07-10 10:45:29,399 (2 ) Waiting to join the pool
2016-07-10 10:45:29,399 (3 ) Waiting to join the pool
2016-07-10 10:45:29,501 (1 ) Running: ['0']
2016-07-10 10:45:29,501 (0 ) Running: []
2016-07-10 10:45:29,502 (3 ) Running: ['3']
2016-07-10 10:45:29,502 (2 ) Running: ['3', '2']
2016-07-10 10:45:29,607 (3 ) Running: ['2']
2016-07-10 10:45:29,608 (2 ) Running: []
Datos específicos del hilo¶
Mientras que algunos recursos necesitan estar bloqueados para que múltiples
hilos los puedan usar, otros necesitan estar protegidos para que estén ocultos
de los hilos que no los poseen. La clase local()
crea un objeto capaz de
ocultar valores de la vista en hilos separados.
import random
import threading
import logging
def show_value(data):
try:
val = data.value
except AttributeError:
logging.debug('No value yet')
else:
logging.debug('value=%s', val)
def worker(data):
show_value(data)
data.value = random.randint(1, 100)
show_value(data)
logging.basicConfig(
level=logging.DEBUG,
format='(%(threadName)-10s) %(message)s',
)
local_data = threading.local()
show_value(local_data)
local_data.value = 1000
show_value(local_data)
for i in range(2):
t = threading.Thread(target=worker, args=(local_data,))
t.start()
El atributo local_data.value
no está presente para ningún hilo hasta que se
establece en ese hilo.
$ python3 threading_local.py
(MainThread) No value yet
(MainThread) value=1000
(Thread-1 ) No value yet
(Thread-1 ) value=33
(Thread-2 ) No value yet
(Thread-2 ) value=74
Para inicializar la configuración para que todos los hilos comiencen con el
mismo valor, usa una subclase y establece los atributos en __init__()
.
import random
import threading
import logging
def show_value(data):
try:
val = data.value
except AttributeError:
logging.debug('No value yet')
else:
logging.debug('value=%s', val)
def worker(data):
show_value(data)
data.value = random.randint(1, 100)
show_value(data)
class MyLocal(threading.local):
def __init__(self, value):
super().__init__()
logging.debug('Initializing %r', self)
self.value = value
logging.basicConfig(
level=logging.DEBUG,
format='(%(threadName)-10s) %(message)s',
)
local_data = MyLocal(1000)
show_value(local_data)
for i in range(2):
t = threading.Thread(target=worker, args=(local_data,))
t.start()
__init__()
se invoca en el mismo objeto (note el valor id()
), una vez
en cada hilo para establecer los valores predeterminados.
$ python3 threading_local_defaults.py
(MainThread) Initializing <__main__.MyLocal object at
0x101c6c288>
(MainThread) value=1000
(Thread-1 ) Initializing <__main__.MyLocal object at
0x101c6c288>
(Thread-1 ) value=1000
(Thread-1 ) value=18
(Thread-2 ) Initializing <__main__.MyLocal object at
0x101c6c288>
(Thread-2 ) value=1000
(Thread-2 ) value=77
Ver también
- Documentación de la biblioteca estándar para threading
- Notas para portar Python 2 a 3 para threading
thread
– Interfaz de programación thread de bajo nivel.Queue
– Cola segura para subprocesos, útil para pasar mensajes entre hilos.multiprocessing
– Una interfaz de programación para trabajar con procesos que refleja la interfaz de programación dethreading
.