concurrent.futures — Administrar grupos de tareas concurrentes

Propósito:Administra fácilmente tareas que se ejecutan simultáneamente y en paralelo.

Los módulos concurrent.futures proporcionan interfaces para ejecutar tareas utilizando grupos de subprocesos o trabajadores de proceso. Las interfaces de programación son las mismas, para que las aplicaciones puedan cambiar entre hilos y procesos con un mínimo cambios

El módulo proporciona dos tipos de clases para interactuar con los grupos. Los ejecutores se utilizan para gestionar grupos de trabajadores, y los futuros se utilizan para gestionar los resultados calculados por los trabajadores. Para utilizar un grupo de trabajadores, una aplicación crea una instancia de clase ejecutora apropiada y luego envía las tareas para que se ejecuten. Cuando cada tarea se inicia, se devuelve una instancia Future. Cuando el resultado de la tarea se necesita, una aplicación puede utilizar el Future para bloquear hasta que el resultado esté disponible. Varias interfaces de programación se proporcionan para que sea conveniente esperar a que se completen las tareas, de manera que los objetos Future no necesitan ser gestionados directamente.

Usar map() con un grupo básico de hilos

El ThreadPoolExecutor maneja un conjunto de hilos de trabajo, pasándoles tareas a medida que estén disponibles para más trabajo. Este ejemplo utiliza map() para producir simultáneamente un conjunto de resultados de una entrada iterable. La tarea usa time.sleep() para pausar un tiempo diferente para demostrar que, independientemente del orden de ejecución de tareas concurrentes, map() siempre devuelve los valores en orden basados en las entradas.

futures_thread_pool_map.py
from concurrent import futures
import threading
import time


def task(n):
    print('{}: sleeping {}'.format(
        threading.current_thread().name,
        n)
    )
    time.sleep(n / 10)
    print('{}: done with {}'.format(
        threading.current_thread().name,
        n)
    )
    return n / 10


ex = futures.ThreadPoolExecutor(max_workers=2)
print('main: starting')
results = ex.map(task, range(5, 0, -1))
print('main: unprocessed results {}'.format(results))
print('main: waiting for real results')
real_results = list(results)
print('main: results: {}'.format(real_results))

El valor de retorno de map() es en realidad un tipo especial de iterador que sabe esperar por cada respuesta mientras el programa principal itera sobre él.

$ python3 futures_thread_pool_map.py

main: starting
ThreadPoolExecutor-0_0: sleeping 5
ThreadPoolExecutor-0_1: sleeping 4
main: unprocessed results <generator object
Executor.map.<locals>.result_iterator at 0x103e12780>
main: waiting for real results
ThreadPoolExecutor-0_1: done with 4
ThreadPoolExecutor-0_1: sleeping 3
ThreadPoolExecutor-0_0: done with 5
ThreadPoolExecutor-0_0: sleeping 2
ThreadPoolExecutor-0_0: done with 2
ThreadPoolExecutor-0_0: sleeping 1
ThreadPoolExecutor-0_1: done with 3
ThreadPoolExecutor-0_0: done with 1
main: results: [0.5, 0.4, 0.3, 0.2, 0.1]

Programar tareas individuales

Además de usar map(), es posible programar un tarea individual con un ejecutor utilizando submit(), y usar la instancia Future retornada para esperar los resultados de esa tarea.

futures_thread_pool_submit.py
from concurrent import futures
import threading
import time


def task(n):
    print('{}: sleeping {}'.format(
        threading.current_thread().name,
        n)
    )
    time.sleep(n / 10)
    print('{}: done with {}'.format(
        threading.current_thread().name,
        n)
    )
    return n / 10


ex = futures.ThreadPoolExecutor(max_workers=2)
print('main: starting')
f = ex.submit(task, 5)
print('main: future: {}'.format(f))
print('main: waiting for results')
result = f.result()
print('main: result: {}'.format(result))
print('main: future after result: {}'.format(f))

El estado de los cambios futuros después de que se completen las tareas y el resultado está disponible.

$ python3 futures_thread_pool_submit.py

main: starting
ThreadPoolExecutor-0_0: sleeping 5
main: future: <Future at 0x1034e1ef0 state=running>
main: waiting for results
ThreadPoolExecutor-0_0: done with 5
main: result: 0.5
main: future after result: <Future at 0x1034e1ef0 state=finished
 returned float>

Esperar por tareas en cualquier orden

Invocar el método result() de los bloques Future hasta que la tarea se completa (ya sea devolviendo un valor o elevando un excepción), o se cancela. Los resultados de múltiples tareas pueden ser accedidos en el orden en que se programaron las tareas usando map(). Si no importa en qué orden se procesen los resultados, usa as_completed() para procesarlos a medida que cada tarea finaliza.

futures_as_completed.py
from concurrent import futures
import random
import time


def task(n):
    time.sleep(random.random())
    return (n, n / 10)


ex = futures.ThreadPoolExecutor(max_workers=5)
print('main: starting')

wait_for = [
    ex.submit(task, i)
    for i in range(5, 0, -1)
]

for f in futures.as_completed(wait_for):
    print('main: result: {}'.format(f.result()))

Debido a que el grupo tiene tantos trabajadores como tareas, todas las tareas pueden ser empezadas. Terminan en orden aleatorio por lo que los valores generados por as_completed() son diferentes cada vez que se ejecuta el ejemplo.

$ python3 futures_as_completed.py

main: starting
main: result: (1, 0.1)
main: result: (5, 0.5)
main: result: (3, 0.3)
main: result: (2, 0.2)
main: result: (4, 0.4)

Devoluciones de llamada futuras

Para realizar alguna acción cuando se complete una tarea, sin esperar explícitamente por el resultado, usa add_done_callback() para especificar una nueva función para llamar cuando el Future esté listo. La devolución de llamada debe ser invocable tomando un solo argumento, la instancia Future.

futures_future_callback.py
from concurrent import futures
import time


def task(n):
    print('{}: sleeping'.format(n))
    time.sleep(0.5)
    print('{}: done'.format(n))
    return n / 10


def done(fn):
    if fn.cancelled():
        print('{}: canceled'.format(fn.arg))
    elif fn.done():
        error = fn.exception()
        if error:
            print('{}: error returned: {}'.format(
                fn.arg, error))
        else:
            result = fn.result()
            print('{}: value returned: {}'.format(
                fn.arg, result))


if __name__ == '__main__':
    ex = futures.ThreadPoolExecutor(max_workers=2)
    print('main: starting')
    f = ex.submit(task, 5)
    f.arg = 5
    f.add_done_callback(done)
    result = f.result()

La devolución de llamada se invoca independientemente de la razón por la que Future se considera «listo», por lo que es necesario verificar el estado del objeto pasado a la devolución de llamada antes de usarlo de cualquier manera.

$ python3 futures_future_callback.py

main: starting
5: sleeping
5: done
5: value returned: 0.5

Cancelar tareas

Un Future puede ser cancelado, si ha sido enviado pero no comenzó, llamando a su método cancel().

futures_future_callback_cancel.py
from concurrent import futures
import time


def task(n):
    print('{}: sleeping'.format(n))
    time.sleep(0.5)
    print('{}: done'.format(n))
    return n / 10


def done(fn):
    if fn.cancelled():
        print('{}: canceled'.format(fn.arg))
    elif fn.done():
        print('{}: not canceled'.format(fn.arg))


if __name__ == '__main__':
    ex = futures.ThreadPoolExecutor(max_workers=2)
    print('main: starting')
    tasks = []

    for i in range(10, 0, -1):
        print('main: submitting {}'.format(i))
        f = ex.submit(task, i)
        f.arg = i
        f.add_done_callback(done)
        tasks.append((i, f))

    for i, t in reversed(tasks):
        if not t.cancel():
            print('main: did not cancel {}'.format(i))

    ex.shutdown()

cancel() devuelve un valor booleano que indica si la tarea se pudo cancelar o no.

$ python3 futures_future_callback_cancel.py

main: starting
main: submitting 10
10: sleeping
main: submitting 9
9: sleeping
main: submitting 8
main: submitting 7
main: submitting 6
main: submitting 5
main: submitting 4
main: submitting 3
main: submitting 2
main: submitting 1
1: canceled
2: canceled
3: canceled
4: canceled
5: canceled
6: canceled
7: canceled
8: canceled
main: did not cancel 9
main: did not cancel 10
10: done
10: not canceled
9: done
9: not canceled

Excepciones en tareas

Si una tarea genera una excepción no controlada, se guarda en el Future para la tarea y se hace disponible a través de los métodos result() o exception().

futures_future_exception.py
from concurrent import futures


def task(n):
    print('{}: starting'.format(n))
    raise ValueError('the value {} is no good'.format(n))


ex = futures.ThreadPoolExecutor(max_workers=2)
print('main: starting')
f = ex.submit(task, 5)

error = f.exception()
print('main: error: {}'.format(error))

try:
    result = f.result()
except ValueError as e:
    print('main: saw error "{}" when accessing result'.format(e))

Si se llama a result() después de que se genera una excepción no manejada dentro de una función de tarea, la misma excepción se re-plantea en el contexto actual.

$ python3 futures_future_exception.py

main: starting
5: starting
main: error: the value 5 is no good
main: saw error "the value 5 is no good" when accessing result

Gestor de contexto

Los ejecutores trabajan como administradores de contexto, ejecutando tareas simultáneamente y esperando que todas se completen. Cuando el administrador de contexto termina, el método shutdown() del ejecutor es llamado.

futures_context_manager.py
from concurrent import futures


def task(n):
    print(n)


with futures.ThreadPoolExecutor(max_workers=2) as ex:
    print('main: starting')
    ex.submit(task, 1)
    ex.submit(task, 2)
    ex.submit(task, 3)
    ex.submit(task, 4)

print('main: done')

Este modo de usar el ejecutor es útil cuando el hilo o los recursos del proceso deben ser limpiados cuando la ejecución deja el alcance actual.

$ python3 futures_context_manager.py

main: starting
1
2
3
4
main: done

Grupos de procesos

El ProcessPoolExecutor funciona de la misma manera que ThreadPoolExecutor, pero usa procesos en lugar de hilos. Esto permite que las operaciones intensivas de CPU usen un CPU separado y no sean bloqueadas por el bloqueo global del intérprete de CPython.

futures_process_pool_map.py
from concurrent import futures
import os


def task(n):
    return (n, os.getpid())


ex = futures.ProcessPoolExecutor(max_workers=2)
results = ex.map(task, range(5, 0, -1))
for n, pid in results:
    print('ran task {} in process {}'.format(n, pid))

Al igual que con el grupo de hilos, los procesos de trabajo individuales se reutilizan para múltiples tareas.

$ python3 futures_process_pool_map.py

ran task 5 in process 40854
ran task 4 in process 40854
ran task 3 in process 40854
ran task 2 in process 40854
ran task 1 in process 40854

Si algo le sucede a uno de los procesos de trabajo para hacer que salir inesperadamente, el ProcessPoolExecutor es considerado «roto» y ya no programará tareas.

futures_process_pool_broken.py
from concurrent import futures
import os
import signal


with futures.ProcessPoolExecutor(max_workers=2) as ex:
    print('getting the pid for one worker')
    f1 = ex.submit(os.getpid)
    pid1 = f1.result()

    print('killing process {}'.format(pid1))
    os.kill(pid1, signal.SIGHUP)

    print('submitting another task')
    f2 = ex.submit(os.getpid)
    try:
        pid2 = f2.result()
    except futures.process.BrokenProcessPool as e:
        print('could not start new tasks: {}'.format(e))

La excepción BrokenProcessPool se lanza realmente cuando los resultados se procesan, en lugar de cuando se envía la nueva tarea.

$ python3 futures_process_pool_broken.py

getting the pid for one worker
killing process 40858
submitting another task
could not start new tasks: A process in the process pool was
terminated abruptly while the future was running or pending.

Ver también