concurrent.futures — Administrar grupos de tareas concurrentes¶
Propósito: | Administra fácilmente tareas que se ejecutan simultáneamente y en paralelo. |
---|
Los módulos concurrent.futures
proporcionan interfaces para ejecutar tareas
utilizando grupos de subprocesos o trabajadores de proceso. Las interfaces de
programación son las mismas, para que las aplicaciones puedan cambiar entre
hilos y procesos con un mínimo cambios
El módulo proporciona dos tipos de clases para interactuar con los grupos. Los
ejecutores se utilizan para gestionar grupos de trabajadores, y los futuros
se utilizan para gestionar los resultados calculados por los trabajadores.
Para utilizar un grupo de trabajadores, una aplicación crea una instancia de
clase ejecutora apropiada y luego envía las tareas para que se ejecuten.
Cuando cada tarea se inicia, se devuelve una instancia Future
. Cuando el
resultado de la tarea se necesita, una aplicación puede utilizar el Future
para bloquear hasta que el resultado esté disponible. Varias interfaces de
programación se proporcionan para que sea conveniente esperar a que se
completen las tareas, de manera que los objetos Future
no necesitan ser
gestionados directamente.
Usar map() con un grupo básico de hilos¶
El ThreadPoolExecutor
maneja un conjunto de hilos de trabajo, pasándoles
tareas a medida que estén disponibles para más trabajo. Este ejemplo utiliza
map()
para producir simultáneamente un conjunto de resultados de una
entrada iterable. La tarea usa time.sleep()
para pausar un tiempo
diferente para demostrar que, independientemente del orden de ejecución de
tareas concurrentes, map()
siempre devuelve los valores en orden basados en
las entradas.
from concurrent import futures
import threading
import time
def task(n):
print('{}: sleeping {}'.format(
threading.current_thread().name,
n)
)
time.sleep(n / 10)
print('{}: done with {}'.format(
threading.current_thread().name,
n)
)
return n / 10
ex = futures.ThreadPoolExecutor(max_workers=2)
print('main: starting')
results = ex.map(task, range(5, 0, -1))
print('main: unprocessed results {}'.format(results))
print('main: waiting for real results')
real_results = list(results)
print('main: results: {}'.format(real_results))
El valor de retorno de map()
es en realidad un tipo especial de iterador
que sabe esperar por cada respuesta mientras el programa principal itera sobre
él.
$ python3 futures_thread_pool_map.py
main: starting
ThreadPoolExecutor-0_0: sleeping 5
ThreadPoolExecutor-0_1: sleeping 4
main: unprocessed results <generator object
Executor.map.<locals>.result_iterator at 0x103e12780>
main: waiting for real results
ThreadPoolExecutor-0_1: done with 4
ThreadPoolExecutor-0_1: sleeping 3
ThreadPoolExecutor-0_0: done with 5
ThreadPoolExecutor-0_0: sleeping 2
ThreadPoolExecutor-0_0: done with 2
ThreadPoolExecutor-0_0: sleeping 1
ThreadPoolExecutor-0_1: done with 3
ThreadPoolExecutor-0_0: done with 1
main: results: [0.5, 0.4, 0.3, 0.2, 0.1]
Programar tareas individuales¶
Además de usar map()
, es posible programar un tarea individual con un
ejecutor utilizando submit()
, y usar la instancia Future
retornada para
esperar los resultados de esa tarea.
from concurrent import futures
import threading
import time
def task(n):
print('{}: sleeping {}'.format(
threading.current_thread().name,
n)
)
time.sleep(n / 10)
print('{}: done with {}'.format(
threading.current_thread().name,
n)
)
return n / 10
ex = futures.ThreadPoolExecutor(max_workers=2)
print('main: starting')
f = ex.submit(task, 5)
print('main: future: {}'.format(f))
print('main: waiting for results')
result = f.result()
print('main: result: {}'.format(result))
print('main: future after result: {}'.format(f))
El estado de los cambios futuros después de que se completen las tareas y el resultado está disponible.
$ python3 futures_thread_pool_submit.py
main: starting
ThreadPoolExecutor-0_0: sleeping 5
main: future: <Future at 0x1034e1ef0 state=running>
main: waiting for results
ThreadPoolExecutor-0_0: done with 5
main: result: 0.5
main: future after result: <Future at 0x1034e1ef0 state=finished
returned float>
Esperar por tareas en cualquier orden¶
Invocar el método result()
de los bloques Future
hasta que la tarea se
completa (ya sea devolviendo un valor o elevando un excepción), o se cancela.
Los resultados de múltiples tareas pueden ser accedidos en el orden en que se
programaron las tareas usando map()
. Si no importa en qué orden se
procesen los resultados, usa as_completed()
para procesarlos a medida que
cada tarea finaliza.
from concurrent import futures
import random
import time
def task(n):
time.sleep(random.random())
return (n, n / 10)
ex = futures.ThreadPoolExecutor(max_workers=5)
print('main: starting')
wait_for = [
ex.submit(task, i)
for i in range(5, 0, -1)
]
for f in futures.as_completed(wait_for):
print('main: result: {}'.format(f.result()))
Debido a que el grupo tiene tantos trabajadores como tareas, todas las tareas
pueden ser empezadas. Terminan en orden aleatorio por lo que los valores
generados por as_completed()
son diferentes cada vez que se ejecuta el
ejemplo.
$ python3 futures_as_completed.py
main: starting
main: result: (1, 0.1)
main: result: (5, 0.5)
main: result: (3, 0.3)
main: result: (2, 0.2)
main: result: (4, 0.4)
Devoluciones de llamada futuras¶
Para realizar alguna acción cuando se complete una tarea, sin esperar
explícitamente por el resultado, usa add_done_callback()
para especificar
una nueva función para llamar cuando el Future
esté listo. La devolución
de llamada debe ser invocable tomando un solo argumento, la instancia
Future
.
from concurrent import futures
import time
def task(n):
print('{}: sleeping'.format(n))
time.sleep(0.5)
print('{}: done'.format(n))
return n / 10
def done(fn):
if fn.cancelled():
print('{}: canceled'.format(fn.arg))
elif fn.done():
error = fn.exception()
if error:
print('{}: error returned: {}'.format(
fn.arg, error))
else:
result = fn.result()
print('{}: value returned: {}'.format(
fn.arg, result))
if __name__ == '__main__':
ex = futures.ThreadPoolExecutor(max_workers=2)
print('main: starting')
f = ex.submit(task, 5)
f.arg = 5
f.add_done_callback(done)
result = f.result()
La devolución de llamada se invoca independientemente de la razón por la que
Future
se considera «listo», por lo que es necesario verificar el estado
del objeto pasado a la devolución de llamada antes de usarlo de cualquier
manera.
$ python3 futures_future_callback.py
main: starting
5: sleeping
5: done
5: value returned: 0.5
Cancelar tareas¶
Un Future
puede ser cancelado, si ha sido enviado pero no comenzó, llamando
a su método cancel()
.
from concurrent import futures
import time
def task(n):
print('{}: sleeping'.format(n))
time.sleep(0.5)
print('{}: done'.format(n))
return n / 10
def done(fn):
if fn.cancelled():
print('{}: canceled'.format(fn.arg))
elif fn.done():
print('{}: not canceled'.format(fn.arg))
if __name__ == '__main__':
ex = futures.ThreadPoolExecutor(max_workers=2)
print('main: starting')
tasks = []
for i in range(10, 0, -1):
print('main: submitting {}'.format(i))
f = ex.submit(task, i)
f.arg = i
f.add_done_callback(done)
tasks.append((i, f))
for i, t in reversed(tasks):
if not t.cancel():
print('main: did not cancel {}'.format(i))
ex.shutdown()
cancel()
devuelve un valor booleano que indica si la tarea se pudo cancelar
o no.
$ python3 futures_future_callback_cancel.py
main: starting
main: submitting 10
10: sleeping
main: submitting 9
9: sleeping
main: submitting 8
main: submitting 7
main: submitting 6
main: submitting 5
main: submitting 4
main: submitting 3
main: submitting 2
main: submitting 1
1: canceled
2: canceled
3: canceled
4: canceled
5: canceled
6: canceled
7: canceled
8: canceled
main: did not cancel 9
main: did not cancel 10
10: done
10: not canceled
9: done
9: not canceled
Excepciones en tareas¶
Si una tarea genera una excepción no controlada, se guarda en el Future
para la tarea y se hace disponible a través de los métodos result()
o
exception()
.
from concurrent import futures
def task(n):
print('{}: starting'.format(n))
raise ValueError('the value {} is no good'.format(n))
ex = futures.ThreadPoolExecutor(max_workers=2)
print('main: starting')
f = ex.submit(task, 5)
error = f.exception()
print('main: error: {}'.format(error))
try:
result = f.result()
except ValueError as e:
print('main: saw error "{}" when accessing result'.format(e))
Si se llama a result()
después de que se genera una excepción no manejada
dentro de una función de tarea, la misma excepción se re-plantea en el contexto
actual.
$ python3 futures_future_exception.py
main: starting
5: starting
main: error: the value 5 is no good
main: saw error "the value 5 is no good" when accessing result
Gestor de contexto¶
Los ejecutores trabajan como administradores de contexto, ejecutando tareas
simultáneamente y esperando que todas se completen. Cuando el administrador de
contexto termina, el método shutdown()
del ejecutor es llamado.
from concurrent import futures
def task(n):
print(n)
with futures.ThreadPoolExecutor(max_workers=2) as ex:
print('main: starting')
ex.submit(task, 1)
ex.submit(task, 2)
ex.submit(task, 3)
ex.submit(task, 4)
print('main: done')
Este modo de usar el ejecutor es útil cuando el hilo o los recursos del proceso deben ser limpiados cuando la ejecución deja el alcance actual.
$ python3 futures_context_manager.py
main: starting
1
2
3
4
main: done
Grupos de procesos¶
El ProcessPoolExecutor
funciona de la misma manera que
ThreadPoolExecutor
, pero usa procesos en lugar de hilos. Esto permite que
las operaciones intensivas de CPU usen un CPU separado y no sean bloqueadas por
el bloqueo global del intérprete de CPython.
from concurrent import futures
import os
def task(n):
return (n, os.getpid())
ex = futures.ProcessPoolExecutor(max_workers=2)
results = ex.map(task, range(5, 0, -1))
for n, pid in results:
print('ran task {} in process {}'.format(n, pid))
Al igual que con el grupo de hilos, los procesos de trabajo individuales se reutilizan para múltiples tareas.
$ python3 futures_process_pool_map.py
ran task 5 in process 40854
ran task 4 in process 40854
ran task 3 in process 40854
ran task 2 in process 40854
ran task 1 in process 40854
Si algo le sucede a uno de los procesos de trabajo para hacer que salir
inesperadamente, el ProcessPoolExecutor
es considerado «roto» y ya no
programará tareas.
from concurrent import futures
import os
import signal
with futures.ProcessPoolExecutor(max_workers=2) as ex:
print('getting the pid for one worker')
f1 = ex.submit(os.getpid)
pid1 = f1.result()
print('killing process {}'.format(pid1))
os.kill(pid1, signal.SIGHUP)
print('submitting another task')
f2 = ex.submit(os.getpid)
try:
pid2 = f2.result()
except futures.process.BrokenProcessPool as e:
print('could not start new tasks: {}'.format(e))
La excepción BrokenProcessPool
se lanza realmente cuando los resultados se
procesan, en lugar de cuando se envía la nueva tarea.
$ python3 futures_process_pool_broken.py
getting the pid for one worker
killing process 40858
submitting another task
could not start new tasks: A process in the process pool was
terminated abruptly while the future was running or pending.
Ver también
- Documentación de la biblioteca estándar para concurrent.futures
- PEP 3148 – La propuesta para crear el conjunto de características de
concurrent.futures
- Combinar co-rutinas con hilos y procesos
threading
multiprocessing