threading — Gestionar operaciones concurrentes dentro de un proceso

Propósito:Gestionar varios hilos de ejecución.

Usar hilos permite que un programa ejecute múltiples operaciones simultáneamente en el mismo espacio de proceso.

Objetos Thread

La forma más sencilla de usar un Thread es crear una instancia con un función de destino y llamar a start() para que comience a funcionar.

threading_simple.py
import threading


def worker():
    """thread worker function"""
    print('Worker')


threads = []
for i in range(5):
    t = threading.Thread(target=worker)
    threads.append(t)
    t.start()

La salida son cinco líneas con "Worker" en cada una.

$ python3 threading_simple.py

Worker
Worker
Worker
Worker
Worker

Es útil poder generar un hilo y pasarle argumentos para decirle que trabajo hacer. Cualquier tipo de objeto puede ser pasado como argumento al hilo. Este ejemplo pasa un número, que luego el hilo imprime.

threading_simpleargs.py
import threading


def worker(num):
    """thread worker function"""
    print('Worker: %s' % num)


threads = []
for i in range(5):
    t = threading.Thread(target=worker, args=(i,))
    threads.append(t)
    t.start()

El argumento entero ahora está incluido en el mensaje impreso por cada hilo.

$ python3 threading_simpleargs.py

Worker: 0
Worker: 1
Worker: 2
Worker: 3
Worker: 4

Determinar el hilo actual

Usar argumentos para identificar o nombrar el hilo es engorroso e innecesario. Cada instancia Thread tiene un nombre con un valor predeterminado que se puede cambiar cuando se crea el hilo. Nombrar hilos es útil en procesos de servidor con múltiples hilos de servicio manejando diferentes operaciones.

threading_names.py
import threading
import time


def worker():
    print(threading.current_thread().getName(), 'Starting')
    time.sleep(0.2)
    print(threading.current_thread().getName(), 'Exiting')


def my_service():
    print(threading.current_thread().getName(), 'Starting')
    time.sleep(0.3)
    print(threading.current_thread().getName(), 'Exiting')


t = threading.Thread(name='my_service', target=my_service)
w = threading.Thread(name='worker', target=worker)
w2 = threading.Thread(target=worker)  # use default name

w.start()
w2.start()
t.start()

La salida de depuración incluye el nombre del hilo actual en cada línea. Las líneas con "Thread-1" en la columna de nombre de hilo corresponden al hilo sin nombre w2.

$ python3 threading_names.py

worker Starting
Thread-1 Starting
my_service Starting
worker Exiting
Thread-1 Exiting
my_service Exiting

La mayoría de los programas no usan print para depurar. El módulo logging admite incrustar el nombre del hilo en cada mensaje de registro usando el código del formateador %(threadName)s. Incluir los nombres de hilo en los mensajes de registro hacen posible rastrear esos mensajes hasta su origen.

threading_names_log.py
import logging
import threading
import time


def worker():
    logging.debug('Starting')
    time.sleep(0.2)
    logging.debug('Exiting')


def my_service():
    logging.debug('Starting')
    time.sleep(0.3)
    logging.debug('Exiting')


logging.basicConfig(
    level=logging.DEBUG,
    format='[%(levelname)s] (%(threadName)-10s) %(message)s',
)

t = threading.Thread(name='my_service', target=my_service)
w = threading.Thread(name='worker', target=worker)
w2 = threading.Thread(target=worker)  # use default name

w.start()
w2.start()
t.start()

logging también es seguro para subprocesos, por lo que los mensajes de diferentes subprocesos se mantienen distintos en la salida.

$ python3 threading_names_log.py

[DEBUG] (worker    ) Starting
[DEBUG] (Thread-1  ) Starting
[DEBUG] (my_service) Starting
[DEBUG] (worker    ) Exiting
[DEBUG] (Thread-1  ) Exiting
[DEBUG] (my_service) Exiting

Hilos de Daemon vs. No-Daemon

Hasta este punto, los programas de ejemplo han esperado implícitamente para salir hasta que todos los hilos hayan completado su trabajo. A veces los programas generan un hilo como un demonio que se ejecuta sin bloquear el programa principal de salir. El uso de hilos de demonio es útil para servicios donde puede que no haya una manera fácil de interrumpir el hilo, o donde dejar que el el hilo muera en medio de su trabajo, no pierde ni corrompe los datos (por ejemplo, un hilo que genera «latidos del corazón» para una herramienta de monitoreo de servicio). Para marcar un hilo como demonio, pasa daemon=True al construirlo o llama a su método set_daemon() con True. El valor predeterminado es que los subprocesos no sean demonios.

threading_daemon.py
import threading
import time
import logging


def daemon():
    logging.debug('Starting')
    time.sleep(0.2)
    logging.debug('Exiting')


def non_daemon():
    logging.debug('Starting')
    logging.debug('Exiting')


logging.basicConfig(
    level=logging.DEBUG,
    format='(%(threadName)-10s) %(message)s',
)

d = threading.Thread(name='daemon', target=daemon, daemon=True)

t = threading.Thread(name='non-daemon', target=non_daemon)

d.start()
t.start()

La salida no incluye el mensaje "Exiting" del hilo demonio, ya que todos los hilos no demonio (incluyendo el hilo principal) terminan antes de que el hilo demonio se despierte de la llamada sleep().

$ python3 threading_daemon.py

(daemon    ) Starting
(non-daemon) Starting
(non-daemon) Exiting

Para esperar hasta que un subproceso demonio haya completado su trabajo, usa el método join().

threading_daemon_join.py
import threading
import time
import logging


def daemon():
    logging.debug('Starting')
    time.sleep(0.2)
    logging.debug('Exiting')


def non_daemon():
    logging.debug('Starting')
    logging.debug('Exiting')


logging.basicConfig(
    level=logging.DEBUG,
    format='(%(threadName)-10s) %(message)s',
)

d = threading.Thread(name='daemon', target=daemon, daemon=True)

t = threading.Thread(name='non-daemon', target=non_daemon)

d.start()
t.start()

d.join()
t.join()

Esperar que el hilo demonio termine usando join() significa que tiene la oportunidad de producir su mensaje "Exiting".

$ python3 threading_daemon_join.py

(daemon    ) Starting
(non-daemon) Starting
(non-daemon) Exiting
(daemon    ) Exiting

Por defecto, join() bloquea indefinidamente. También es posible pasar un valor flotante que represente el número de segundos a esperar por el hilo para convertirse en inactivo. Si el hilo no se completa dentro del período de tiempo de espera, join() retorna de todos modos.

threading_daemon_join_timeout.py
import threading
import time
import logging


def daemon():
    logging.debug('Starting')
    time.sleep(0.2)
    logging.debug('Exiting')


def non_daemon():
    logging.debug('Starting')
    logging.debug('Exiting')


logging.basicConfig(
    level=logging.DEBUG,
    format='(%(threadName)-10s) %(message)s',
)

d = threading.Thread(name='daemon', target=daemon, daemon=True)

t = threading.Thread(name='non-daemon', target=non_daemon)

d.start()
t.start()

d.join(0.1)
print('d.isAlive()', d.isAlive())
t.join()

Dado que el tiempo de espera transcurrido es menor que la cantidad de tiempo que el hilo demonio duerme, el hilo sigue «vivo» después de join() retorna.

$ python3 threading_daemon_join_timeout.py

(daemon    ) Starting
(non-daemon) Starting
(non-daemon) Exiting
d.isAlive() True

Enumerar todos los hilos

No es necesario conservar un identificador explícito para todos los hilos demonio para asegurar que se han completado antes de salir del proceso principal. enumerate() devuelve una lista de instancia de Thread activas. La lista incluye el hilo actual, y ya que se unir al hilo actual introduce una situación de interbloqueo, se debe omitir.

threading_enumerate.py
import random
import threading
import time
import logging


def worker():
    """thread worker function"""
    pause = random.randint(1, 5) / 10
    logging.debug('sleeping %0.2f', pause)
    time.sleep(pause)
    logging.debug('ending')


logging.basicConfig(
    level=logging.DEBUG,
    format='(%(threadName)-10s) %(message)s',
)

for i in range(3):
    t = threading.Thread(target=worker, daemon=True)
    t.start()

main_thread = threading.main_thread()
for t in threading.enumerate():
    if t is main_thread:
        continue
    logging.debug('joining %s', t.getName())
    t.join()

Debido a que el trabajador está durmiendo durante un período de tiempo aleatorio, la salida de este programa puede variar.

$ python3 threading_enumerate.py

(Thread-1  ) sleeping 0.20
(Thread-2  ) sleeping 0.30
(Thread-3  ) sleeping 0.40
(MainThread) joining Thread-1
(Thread-1  ) ending
(MainThread) joining Thread-3
(Thread-2  ) ending
(Thread-3  ) ending
(MainThread) joining Thread-2

Subclase de hilo

Al inicio, un Thread realiza una inicialización básica y luego llama a su método run(), que llama a la función objetivo pasada al constructor. Para crear una subclase de Thread, anula run() para hacer lo que sea necesario.

threading_subclass.py
import threading
import logging


class MyThread(threading.Thread):

    def run(self):
        logging.debug('running')


logging.basicConfig(
    level=logging.DEBUG,
    format='(%(threadName)-10s) %(message)s',
)

for i in range(5):
    t = MyThread()
    t.start()

El valor de retorno de run() se ignora.

$ python3 threading_subclass.py

(Thread-1  ) running
(Thread-2  ) running
(Thread-3  ) running
(Thread-4  ) running
(Thread-5  ) running

Porque los valores args y kwargs pasados al constructor Thread se guardan en variables privadas usando nombres prefijados con '__', no se puede acceder fácilmente desde una subclase. Para pasar argumentos a un tipo de hilo personalizado, redefine el constructor para guardar los valores en un atributo de instancia que se pueden ver en la subclase.

threading_subclass_args.py
import threading
import logging


class MyThreadWithArgs(threading.Thread):

    def __init__(self, group=None, target=None, name=None,
                 args=(), kwargs=None, *, daemon=None):
        super().__init__(group=group, target=target, name=name,
                         daemon=daemon)
        self.args = args
        self.kwargs = kwargs

    def run(self):
        logging.debug('running with %s and %s',
                      self.args, self.kwargs)


logging.basicConfig(
    level=logging.DEBUG,
    format='(%(threadName)-10s) %(message)s',
)

for i in range(5):
    t = MyThreadWithArgs(args=(i,), kwargs={'a': 'A', 'b': 'B'})
    t.start()

MyThreadWithArgs usa la misma interfaz de programación que Thread, pero otra clase podría cambiar fácilmente el método constructor para tomar más o diferentes argumentos más directamente relacionados con el propósito del hilo, como con cualquier otra clase.

$ python3 threading_subclass_args.py

(Thread-1  ) running with (0,) and {'b': 'B', 'a': 'A'}
(Thread-2  ) running with (1,) and {'b': 'B', 'a': 'A'}
(Thread-3  ) running with (2,) and {'b': 'B', 'a': 'A'}
(Thread-4  ) running with (3,) and {'b': 'B', 'a': 'A'}
(Thread-5  ) running with (4,) and {'b': 'B', 'a': 'A'}

Hilos de temporizador

Un ejemplo de una razón para una subclase de Thread es proporcionada por Timer, también incluida en threading. Un Timer comienza su trabajo después de un retraso, y puede cancelarse en cualquier punto dentro de ese período de tiempo de retraso.

threading_timer.py
import threading
import time
import logging


def delayed():
    logging.debug('worker running')


logging.basicConfig(
    level=logging.DEBUG,
    format='(%(threadName)-10s) %(message)s',
)

t1 = threading.Timer(0.3, delayed)
t1.setName('t1')
t2 = threading.Timer(0.3, delayed)
t2.setName('t2')

logging.debug('starting timers')
t1.start()
t2.start()

logging.debug('waiting before canceling %s', t2.getName())
time.sleep(0.2)
logging.debug('canceling %s', t2.getName())
t2.cancel()
logging.debug('done')

El segundo temporizador en este ejemplo nunca se ejecuta, y el primer temporizador parece que se ejecuta después de que el resto del programa principal ha terminado. Ya que es no es un hilo demonio, se une implícitamente cuando el hilo principal ha terminado.

$ python3 threading_timer.py

(MainThread) starting timers
(MainThread) waiting before canceling t2
(MainThread) canceling t2
(MainThread) done
(t1        ) worker running

Señalización Entre Hilos

Aunque el punto de usar múltiples hilos es ejecutar separadamente operaciones al mismo tiempo, hay momentos en que es importante ser capaz de sincronizar las operaciones en dos o más hilos. Los objetos evento son una forma sencilla de comunicarse entre hilos de forma segura. Un Event gestiona una bandera interna que las personas que llaman pueden controlar con los métodos set() y clear(). Otros hilos pueden usar wait() para pausar hasta que se establezca la bandera, bloqueando efectivamente el avance hasta que se permita continuar.

threading_event.py
import logging
import threading
import time


def wait_for_event(e):
    """Wait for the event to be set before doing anything"""
    logging.debug('wait_for_event starting')
    event_is_set = e.wait()
    logging.debug('event set: %s', event_is_set)


def wait_for_event_timeout(e, t):
    """Wait t seconds and then timeout"""
    while not e.is_set():
        logging.debug('wait_for_event_timeout starting')
        event_is_set = e.wait(t)
        logging.debug('event set: %s', event_is_set)
        if event_is_set:
            logging.debug('processing event')
        else:
            logging.debug('doing other work')


logging.basicConfig(
    level=logging.DEBUG,
    format='(%(threadName)-10s) %(message)s',
)

e = threading.Event()
t1 = threading.Thread(
    name='block',
    target=wait_for_event,
    args=(e,),
)
t1.start()

t2 = threading.Thread(
    name='nonblock',
    target=wait_for_event_timeout,
    args=(e, 2),
)
t2.start()

logging.debug('Waiting before calling Event.set()')
time.sleep(0.3)
e.set()
logging.debug('Event is set')

El método wait_for_event_timeout() toma un argumento que representa el número de segundos que el evento espera antes de que se agote el tiempo de espera. Devuelve un booleano indicando si el evento está configurado o no, para que la persona que llama sepa por qué wait() regresó. El método is_set() puede ser usado por separado en el evento sin miedo a bloquear.

En este ejemplo, wait_for_event_timeout() comprueba el estatus del evento sin bloqueo indefinido. El wait_for_event() bloquea en la llamada a wait(), que no regresa hasta que el estado del evento cambie.

$ python3 threading_event.py

(block     ) wait_for_event starting
(nonblock  ) wait_for_event_timeout starting
(MainThread) Waiting before calling Event.set()
(MainThread) Event is set
(nonblock  ) event set: True
(nonblock  ) processing event
(block     ) event set: True

Controlar el acceso a los recursos

Además de sincronizar las operaciones de los hilos, también es importante poder controlar el acceso a los recursos compartidos para prevenir corrupción o pérdida de datos. Estructuras de datos incorporadas de Python (listas, diccionarios, etc.) son seguras para subprocesos como un efecto secundario de tener códigos de bytes atómicos para manipularlos (el bloqueo global del intérprete utilizado para proteger las estructuras de datos internas de Python no es liberada en el medio de una actualización). Otras estructuras de datos implementadas en Python, o los tipos más simples como números enteros y de coma flotante, no tienen esa protección. Para protegerse contra el acceso simultáneo a un objeto, usa un Objeto Lock.

threading_lock.py
import logging
import random
import threading
import time


class Counter:

    def __init__(self, start=0):
        self.lock = threading.Lock()
        self.value = start

    def increment(self):
        logging.debug('Waiting for lock')
        self.lock.acquire()
        try:
            logging.debug('Acquired lock')
            self.value = self.value + 1
        finally:
            self.lock.release()


def worker(c):
    for i in range(2):
        pause = random.random()
        logging.debug('Sleeping %0.02f', pause)
        time.sleep(pause)
        c.increment()
    logging.debug('Done')


logging.basicConfig(
    level=logging.DEBUG,
    format='(%(threadName)-10s) %(message)s',
)

counter = Counter()
for i in range(2):
    t = threading.Thread(target=worker, args=(counter,))
    t.start()

logging.debug('Waiting for worker threads')
main_thread = threading.main_thread()
for t in threading.enumerate():
    if t is not main_thread:
        t.join()
logging.debug('Counter: %d', counter.value)

En este ejemplo, la función worker() incrementa la instancia Counter, que administra un Lock para evitar que dos hilos de cambien su estado interno al mismo tiempo. Si el Lock no se usó, existe la posibilidad de perder un cambio al atributo de valor.

$ python3 threading_lock.py

(Thread-1  ) Sleeping 0.18
(Thread-2  ) Sleeping 0.93
(MainThread) Waiting for worker threads
(Thread-1  ) Waiting for lock
(Thread-1  ) Acquired lock
(Thread-1  ) Sleeping 0.11
(Thread-1  ) Waiting for lock
(Thread-1  ) Acquired lock
(Thread-1  ) Done
(Thread-2  ) Waiting for lock
(Thread-2  ) Acquired lock
(Thread-2  ) Sleeping 0.81
(Thread-2  ) Waiting for lock
(Thread-2  ) Acquired lock
(Thread-2  ) Done
(MainThread) Counter: 4

Para averiguar si otro hilo ha adquirido el bloqueo sin sostener el hilo actual, pasa False para el argumento block para acquire(). En el siguiente ejemplo, worker() intenta adquirir el bloqueo tres veces por separado y cuenta cuántos intentos tiene que hacer para lograrlo. Mientras tanto, lock_holder() cicla entre mantener y liberar el bloqueo, con breves pausas en cada estado utilizado para simular carga.

threading_lock_noblock.py
import logging
import threading
import time


def lock_holder(lock):
    logging.debug('Starting')
    while True:
        lock.acquire()
        try:
            logging.debug('Holding')
            time.sleep(0.5)
        finally:
            logging.debug('Not holding')
            lock.release()
        time.sleep(0.5)


def worker(lock):
    logging.debug('Starting')
    num_tries = 0
    num_acquires = 0
    while num_acquires < 3:
        time.sleep(0.5)
        logging.debug('Trying to acquire')
        have_it = lock.acquire(0)
        try:
            num_tries += 1
            if have_it:
                logging.debug('Iteration %d: Acquired',
                              num_tries)
                num_acquires += 1
            else:
                logging.debug('Iteration %d: Not acquired',
                              num_tries)
        finally:
            if have_it:
                lock.release()
    logging.debug('Done after %d iterations', num_tries)


logging.basicConfig(
    level=logging.DEBUG,
    format='(%(threadName)-10s) %(message)s',
)

lock = threading.Lock()

holder = threading.Thread(
    target=lock_holder,
    args=(lock,),
    name='LockHolder',
    daemon=True,
)
holder.start()

worker = threading.Thread(
    target=worker,
    args=(lock,),
    name='Worker',
)
worker.start()

worker() necesita más de tres iteraciones para adquirir el bloqueo tres veces separadas.

$ python3 threading_lock_noblock.py

(LockHolder) Starting
(LockHolder) Holding
(Worker    ) Starting
(LockHolder) Not holding
(Worker    ) Trying to acquire
(Worker    ) Iteration 1: Acquired
(LockHolder) Holding
(Worker    ) Trying to acquire
(Worker    ) Iteration 2: Not acquired
(LockHolder) Not holding
(Worker    ) Trying to acquire
(Worker    ) Iteration 3: Acquired
(LockHolder) Holding
(Worker    ) Trying to acquire
(Worker    ) Iteration 4: Not acquired
(LockHolder) Not holding
(Worker    ) Trying to acquire
(Worker    ) Iteration 5: Acquired
(Worker    ) Done after 5 iterations

Bloqueos reentrantes

Los objetos normales Lock no se pueden adquirir más de una vez, incluso por el mismo hilo. Esto puede introducir efectos secundarios indeseables si se accede a una cerradura mediante más de una función en la misma cadena de llamadas.

threading_lock_reacquire.py
import threading

lock = threading.Lock()

print('First try :', lock.acquire())
print('Second try:', lock.acquire(0))

En este caso, la segunda llamada a acquire() recibe un tiempo de espera cero para evitar que se bloquee porque se ha obtenido el bloqueo por la primera llamada.

$ python3 threading_lock_reacquire.py

First try : True
Second try: False

En una situación donde el código separado del mismo hilo necesita «volver a adquirir» el bloqueo, usa un RLock en su lugar.

threading_rlock.py
import threading

lock = threading.RLock()

print('First try :', lock.acquire())
print('Second try:', lock.acquire(0))

El único cambio en el código del ejemplo anterior fue la sustitución de Lock por RLock.

$ python3 threading_rlock.py

First try : True
Second try: True

Bloqueos como gestores de contexto

Los bloqueos implementan la interfaz de programación del administrador de contexto y son compatibles con la declaración with. Usar with elimina la necesidad de adquirir y liberar explícitamente el bloqueo.

threading_lock_with.py
import threading
import logging


def worker_with(lock):
    with lock:
        logging.debug('Lock acquired via with')


def worker_no_with(lock):
    lock.acquire()
    try:
        logging.debug('Lock acquired directly')
    finally:
        lock.release()


logging.basicConfig(
    level=logging.DEBUG,
    format='(%(threadName)-10s) %(message)s',
)

lock = threading.Lock()
w = threading.Thread(target=worker_with, args=(lock,))
nw = threading.Thread(target=worker_no_with, args=(lock,))

w.start()
nw.start()

Las dos funciones worker_with() y worker_no_with() administran el bloqueo de manera equivalente.

$ python3 threading_lock_with.py

(Thread-1  ) Lock acquired via with
(Thread-2  ) Lock acquired directly

Sincronizar hilos

Además de usar Events, otra forma de sincronizar los hilos son a través del uso de un objeto Condition. Porque Condition utiliza un Lock, se puede vincular a un recurso compartido, permitiendo que múltiples hilos esperen a que el recurso sea actualizado. En este ejemplo, los hilos consumer() esperan el Condition que se establezca antes de continuar. El hilo producer() es responsable de establecer la condición y notificar a los otros hilos que pueden continuar.

threading_condition.py
import logging
import threading
import time


def consumer(cond):
    """wait for the condition and use the resource"""
    logging.debug('Starting consumer thread')
    with cond:
        cond.wait()
        logging.debug('Resource is available to consumer')


def producer(cond):
    """set up the resource to be used by the consumer"""
    logging.debug('Starting producer thread')
    with cond:
        logging.debug('Making resource available')
        cond.notifyAll()


logging.basicConfig(
    level=logging.DEBUG,
    format='%(asctime)s (%(threadName)-2s) %(message)s',
)

condition = threading.Condition()
c1 = threading.Thread(name='c1', target=consumer,
                      args=(condition,))
c2 = threading.Thread(name='c2', target=consumer,
                      args=(condition,))
p = threading.Thread(name='p', target=producer,
                     args=(condition,))

c1.start()
time.sleep(0.2)
c2.start()
time.sleep(0.2)
p.start()

Los hilos usan with para adquirir el bloqueo asociado con la Condition. Usando los métodos capture() y release() explícitamente también funcionan.

$ python3 threading_condition.py

2016-07-10 10:45:28,170 (c1) Starting consumer thread
2016-07-10 10:45:28,376 (c2) Starting consumer thread
2016-07-10 10:45:28,581 (p ) Starting producer thread
2016-07-10 10:45:28,581 (p ) Making resource available
2016-07-10 10:45:28,582 (c1) Resource is available to consumer
2016-07-10 10:45:28,582 (c2) Resource is available to consumer

Las barreras son otro mecanismo de sincronización de hilos. Una Barrier establece un punto de control y todos los hilos participantes bloquean hasta que todas las «partes» participantes hayan alcanzado ese punto. Permite que los hilos se inicien por separado y luego se pause hasta que todos están listos para continuar.

threading_barrier.py
import threading
import time


def worker(barrier):
    print(threading.current_thread().name,
          'waiting for barrier with {} others'.format(
              barrier.n_waiting))
    worker_id = barrier.wait()
    print(threading.current_thread().name, 'after barrier',
          worker_id)


NUM_THREADS = 3

barrier = threading.Barrier(NUM_THREADS)

threads = [
    threading.Thread(
        name='worker-%s' % i,
        target=worker,
        args=(barrier,),
    )
    for i in range(NUM_THREADS)
]

for t in threads:
    print(t.name, 'starting')
    t.start()
    time.sleep(0.1)

for t in threads:
    t.join()

En este ejemplo, la Barrier está configurada para bloquear hasta que tres hilos estén esperando. Cuando se cumple la condición, todos los los hilos se liberan más allá del punto de control al mismo tiempo. Los valores de retorno de wait() indica el número de la parte que está siendo liberada, y puede usarse para limitar algunos subprocesos de realizar una acción como limpiar un recurso compartido.

$ python3 threading_barrier.py

worker-0 starting
worker-0 waiting for barrier with 0 others
worker-1 starting
worker-1 waiting for barrier with 1 others
worker-2 starting
worker-2 waiting for barrier with 2 others
worker-2 after barrier 2
worker-0 after barrier 0
worker-1 after barrier 1

El método abort() de Barrier provoca que todos los hilos en espera de recibir un BrokenBarrierError. Esto permite a los hilos limpiar si el procesamiento se detiene mientras estén bloqueados en wait().

threading_barrier_abort.py
import threading
import time


def worker(barrier):
    print(threading.current_thread().name,
          'waiting for barrier with {} others'.format(
              barrier.n_waiting))
    try:
        worker_id = barrier.wait()
    except threading.BrokenBarrierError:
        print(threading.current_thread().name, 'aborting')
    else:
        print(threading.current_thread().name, 'after barrier',
              worker_id)


NUM_THREADS = 3

barrier = threading.Barrier(NUM_THREADS + 1)

threads = [
    threading.Thread(
        name='worker-%s' % i,
        target=worker,
        args=(barrier,),
    )
    for i in range(NUM_THREADS)
]

for t in threads:
    print(t.name, 'starting')
    t.start()
    time.sleep(0.1)

barrier.abort()

for t in threads:
    t.join()

Este ejemplo configura la Barrier para esperar un hilo participante más que los que en realidad se inician para que el procesamiento para que todos los hilos estén bloqueados. La llamada abort() genera un excepción en cada hilo bloqueado.

$ python3 threading_barrier_abort.py

worker-0 starting
worker-0 waiting for barrier with 0 others
worker-1 starting
worker-1 waiting for barrier with 1 others
worker-2 starting
worker-2 waiting for barrier with 2 others
worker-0 aborting
worker-2 aborting
worker-1 aborting

Limitar el acceso concurrente a los recursos

A veces es útil permitir que más de un trabajador acceda a un recurso a la vez, mientras que todavía se limita el número total. Por ejemplo, una agrupación de conexiones podría admitir un número fijo de conexiones simultáneas, o una aplicación de red podría soportar una número fijo de descargas concurrentes. Un Semaphore es una forma para gestionar esas conexiones.

threading_semaphore.py
import logging
import random
import threading
import time


class ActivePool:

    def __init__(self):
        super(ActivePool, self).__init__()
        self.active = []
        self.lock = threading.Lock()

    def makeActive(self, name):
        with self.lock:
            self.active.append(name)
            logging.debug('Running: %s', self.active)

    def makeInactive(self, name):
        with self.lock:
            self.active.remove(name)
            logging.debug('Running: %s', self.active)


def worker(s, pool):
    logging.debug('Waiting to join the pool')
    with s:
        name = threading.current_thread().getName()
        pool.makeActive(name)
        time.sleep(0.1)
        pool.makeInactive(name)


logging.basicConfig(
    level=logging.DEBUG,
    format='%(asctime)s (%(threadName)-2s) %(message)s',
)

pool = ActivePool()
s = threading.Semaphore(2)
for i in range(4):
    t = threading.Thread(
        target=worker,
        name=str(i),
        args=(s, pool),
    )
    t.start()

En este ejemplo, la clase ActivePool simplemente sirve como una manera conveniente de rastrear qué hilos son capaces de correr en un determinado momento. Un grupo de recursos real asignaría una conexión o algún otro valor para el subproceso recién activo, y reclamaría el valor cuando el hilo termine. Aquí, solo se utiliza para mantener los nombres de los hilos activos para mostrar que a lo sumo dos se están ejecutando simultáneamente.

$ python3 threading_semaphore.py

2016-07-10 10:45:29,398 (0 ) Waiting to join the pool
2016-07-10 10:45:29,398 (0 ) Running: ['0']
2016-07-10 10:45:29,399 (1 ) Waiting to join the pool
2016-07-10 10:45:29,399 (1 ) Running: ['0', '1']
2016-07-10 10:45:29,399 (2 ) Waiting to join the pool
2016-07-10 10:45:29,399 (3 ) Waiting to join the pool
2016-07-10 10:45:29,501 (1 ) Running: ['0']
2016-07-10 10:45:29,501 (0 ) Running: []
2016-07-10 10:45:29,502 (3 ) Running: ['3']
2016-07-10 10:45:29,502 (2 ) Running: ['3', '2']
2016-07-10 10:45:29,607 (3 ) Running: ['2']
2016-07-10 10:45:29,608 (2 ) Running: []

Datos específicos del hilo

Mientras que algunos recursos necesitan estar bloqueados para que múltiples hilos los puedan usar, otros necesitan estar protegidos para que estén ocultos de los hilos que no los poseen. La clase local() crea un objeto capaz de ocultar valores de la vista en hilos separados.

threading_local.py
import random
import threading
import logging


def show_value(data):
    try:
        val = data.value
    except AttributeError:
        logging.debug('No value yet')
    else:
        logging.debug('value=%s', val)


def worker(data):
    show_value(data)
    data.value = random.randint(1, 100)
    show_value(data)


logging.basicConfig(
    level=logging.DEBUG,
    format='(%(threadName)-10s) %(message)s',
)

local_data = threading.local()
show_value(local_data)
local_data.value = 1000
show_value(local_data)

for i in range(2):
    t = threading.Thread(target=worker, args=(local_data,))
    t.start()

El atributo local_data.value no está presente para ningún hilo hasta que se establece en ese hilo.

$ python3 threading_local.py

(MainThread) No value yet
(MainThread) value=1000
(Thread-1  ) No value yet
(Thread-1  ) value=33
(Thread-2  ) No value yet
(Thread-2  ) value=74

Para inicializar la configuración para que todos los hilos comiencen con el mismo valor, usa una subclase y establece los atributos en __init__().

threading_local_defaults.py
import random
import threading
import logging


def show_value(data):
    try:
        val = data.value
    except AttributeError:
        logging.debug('No value yet')
    else:
        logging.debug('value=%s', val)


def worker(data):
    show_value(data)
    data.value = random.randint(1, 100)
    show_value(data)


class MyLocal(threading.local):

    def __init__(self, value):
        super().__init__()
        logging.debug('Initializing %r', self)
        self.value = value


logging.basicConfig(
    level=logging.DEBUG,
    format='(%(threadName)-10s) %(message)s',
)

local_data = MyLocal(1000)
show_value(local_data)

for i in range(2):
    t = threading.Thread(target=worker, args=(local_data,))
    t.start()

__init__() se invoca en el mismo objeto (note el valor id()), una vez en cada hilo para establecer los valores predeterminados.

$ python3 threading_local_defaults.py

(MainThread) Initializing <__main__.MyLocal object at
0x101c6c288>
(MainThread) value=1000
(Thread-1  ) Initializing <__main__.MyLocal object at
0x101c6c288>
(Thread-1  ) value=1000
(Thread-1  ) value=18
(Thread-2  ) Initializing <__main__.MyLocal object at
0x101c6c288>
(Thread-2  ) value=1000
(Thread-2  ) value=77

Ver también