Combinar co-rutinas con hilos y procesos¶
Muchas bibliotecas existentes no están listas para ser utilizadas con
asyncio
de forma nativa. Pueden bloquear, o depender de características de
concurrencia no disponibles a través del módulo. Todavía es posible utilizar
esas bibliotecas en una aplicación basada en asyncio
usando un ejecutor
de concurrent.futures
para ejecutar el código ya sea en un hilo separado
o en un proceso separado.
Hilos¶
El método run_in_executor()
del bucle de eventos toma una instancia
ejecutor, un invocable regular para ejecutar y cualquier argumento que se pase
al llamable. Devuelve un Future
que puede usarse para esperar que la
función termine su trabajo y devuelva algo. Si no se pasa un ejecutor, se crea
un ThreadPoolExecutor
. Esta ejemplo crea explícitamente un ejecutor para
limitar el número de hilos trabajadores que tendrá disponibles.
Un ThreadPoolExecutor
inicia sus hilos de trabajo y luego llama cada una de
las funciones proporcionadas una vez en un hilo. Este ejemplo muestra cómo
combinar run_in_executor()
y wait()
para que una co-rutina ceda el
control del bucle de eventos mientras se ejecutan las funciones de bloqueo en
hilos separados, y luego volver a activarlas cuando esas funciones están
listas.
import asyncio
import concurrent.futures
import logging
import sys
import time
def blocks(n):
log = logging.getLogger('blocks({})'.format(n))
log.info('running')
time.sleep(0.1)
log.info('done')
return n ** 2
async def run_blocking_tasks(executor):
log = logging.getLogger('run_blocking_tasks')
log.info('starting')
log.info('creating executor tasks')
loop = asyncio.get_event_loop()
blocking_tasks = [
loop.run_in_executor(executor, blocks, i)
for i in range(6)
]
log.info('waiting for executor tasks')
completed, pending = await asyncio.wait(blocking_tasks)
results = [t.result() for t in completed]
log.info('results: {!r}'.format(results))
log.info('exiting')
if __name__ == '__main__':
# Configure logging to show the name of the thread
# where the log message originates.
logging.basicConfig(
level=logging.INFO,
format='%(threadName)10s %(name)18s: %(message)s',
stream=sys.stderr,
)
# Create a limited thread pool.
executor = concurrent.futures.ThreadPoolExecutor(
max_workers=3,
)
event_loop = asyncio.get_event_loop()
try:
event_loop.run_until_complete(
run_blocking_tasks(executor)
)
finally:
event_loop.close()
asyncio_executor_thread.py
usa logging
indicar convenientemente qué
hilo y función están produciendo cada mensaje de registro. Debido a que se
utiliza un registrador separado en cada llamada a blocks()
, la salida
muestra claramente los mismos hilos que se están reutilizando para llamar a
varias copias de la función con diferentes argumentos.
$ python3 asyncio_executor_thread.py
MainThread run_blocking_tasks: starting
MainThread run_blocking_tasks: creating executor tasks
ThreadPoolExecutor-0_0 blocks(0): running
ThreadPoolExecutor-0_1 blocks(1): running
ThreadPoolExecutor-0_2 blocks(2): running
MainThread run_blocking_tasks: waiting for executor tasks
ThreadPoolExecutor-0_0 blocks(0): done
ThreadPoolExecutor-0_1 blocks(1): done
ThreadPoolExecutor-0_2 blocks(2): done
ThreadPoolExecutor-0_0 blocks(3): running
ThreadPoolExecutor-0_1 blocks(4): running
ThreadPoolExecutor-0_2 blocks(5): running
ThreadPoolExecutor-0_0 blocks(3): done
ThreadPoolExecutor-0_2 blocks(5): done
ThreadPoolExecutor-0_1 blocks(4): done
MainThread run_blocking_tasks: results: [0, 9, 16, 25, 1, 4]
MainThread run_blocking_tasks: exiting
Procesos¶
Un ProcessPoolExecutor
funciona de manera muy similar, creando un conjunto
de procesos de trabajo en lugar de hilos. Usar procesos separados requiere más
recursos del sistema, pero para operaciones de computación intensiva puede
tener sentido ejecutar una tarea separada en cada núcleo de CPU.
# changes from asyncio_executor_thread.py
if __name__ == '__main__':
# Configure logging to show the id of the process
# where the log message originates.
logging.basicConfig(
level=logging.INFO,
format='PID %(process)5s %(name)18s: %(message)s',
stream=sys.stderr,
)
# Create a limited process pool.
executor = concurrent.futures.ProcessPoolExecutor(
max_workers=3,
)
event_loop = asyncio.get_event_loop()
try:
event_loop.run_until_complete(
run_blocking_tasks(executor)
)
finally:
event_loop.close()
El único cambio necesario para pasar de hilos a procesos es crear un ejecutor de tipo diferente. Este ejemplo también cambia la cadena de formato de registro para incluir el ID de proceso en lugar del nombre del hilo, para demostrar que las tareas se ejecutan realmente en procesos separados.
$ python3 asyncio_executor_process.py
PID 40498 run_blocking_tasks: starting
PID 40498 run_blocking_tasks: creating executor tasks
PID 40498 run_blocking_tasks: waiting for executor tasks
PID 40499 blocks(0): running
PID 40500 blocks(1): running
PID 40501 blocks(2): running
PID 40499 blocks(0): done
PID 40500 blocks(1): done
PID 40501 blocks(2): done
PID 40500 blocks(3): running
PID 40499 blocks(4): running
PID 40501 blocks(5): running
PID 40499 blocks(4): done
PID 40500 blocks(3): done
PID 40501 blocks(5): done
PID 40498 run_blocking_tasks: results: [1, 4, 9, 0, 16, 25]
PID 40498 run_blocking_tasks: exiting