Combinar co-rutinas con hilos y procesos

Muchas bibliotecas existentes no están listas para ser utilizadas con asyncio de forma nativa. Pueden bloquear, o depender de características de concurrencia no disponibles a través del módulo. Todavía es posible utilizar esas bibliotecas en una aplicación basada en asyncio usando un ejecutor de concurrent.futures para ejecutar el código ya sea en un hilo separado o en un proceso separado.

Hilos

El método run_in_executor() del bucle de eventos toma una instancia ejecutor, un invocable regular para ejecutar y cualquier argumento que se pase al llamable. Devuelve un Future que puede usarse para esperar que la función termine su trabajo y devuelva algo. Si no se pasa un ejecutor, se crea un ThreadPoolExecutor. Esta ejemplo crea explícitamente un ejecutor para limitar el número de hilos trabajadores que tendrá disponibles.

Un ThreadPoolExecutor inicia sus hilos de trabajo y luego llama cada una de las funciones proporcionadas una vez en un hilo. Este ejemplo muestra cómo combinar run_in_executor() y wait() para que una co-rutina ceda el control del bucle de eventos mientras se ejecutan las funciones de bloqueo en hilos separados, y luego volver a activarlas cuando esas funciones están listas.

asyncio_executor_thread.py
import asyncio
import concurrent.futures
import logging
import sys
import time


def blocks(n):
    log = logging.getLogger('blocks({})'.format(n))
    log.info('running')
    time.sleep(0.1)
    log.info('done')
    return n ** 2


async def run_blocking_tasks(executor):
    log = logging.getLogger('run_blocking_tasks')
    log.info('starting')

    log.info('creating executor tasks')
    loop = asyncio.get_event_loop()
    blocking_tasks = [
        loop.run_in_executor(executor, blocks, i)
        for i in range(6)
    ]
    log.info('waiting for executor tasks')
    completed, pending = await asyncio.wait(blocking_tasks)
    results = [t.result() for t in completed]
    log.info('results: {!r}'.format(results))

    log.info('exiting')


if __name__ == '__main__':
    # Configure logging to show the name of the thread
    # where the log message originates.
    logging.basicConfig(
        level=logging.INFO,
        format='%(threadName)10s %(name)18s: %(message)s',
        stream=sys.stderr,
    )

    # Create a limited thread pool.
    executor = concurrent.futures.ThreadPoolExecutor(
        max_workers=3,
    )

    event_loop = asyncio.get_event_loop()
    try:
        event_loop.run_until_complete(
            run_blocking_tasks(executor)
        )
    finally:
        event_loop.close()

asyncio_executor_thread.py usa logging indicar convenientemente qué hilo y función están produciendo cada mensaje de registro. Debido a que se utiliza un registrador separado en cada llamada a blocks(), la salida muestra claramente los mismos hilos que se están reutilizando para llamar a varias copias de la función con diferentes argumentos.

$ python3 asyncio_executor_thread.py

MainThread run_blocking_tasks: starting
MainThread run_blocking_tasks: creating executor tasks
ThreadPoolExecutor-0_0          blocks(0): running
ThreadPoolExecutor-0_1          blocks(1): running
ThreadPoolExecutor-0_2          blocks(2): running
MainThread run_blocking_tasks: waiting for executor tasks
ThreadPoolExecutor-0_0          blocks(0): done
ThreadPoolExecutor-0_1          blocks(1): done
ThreadPoolExecutor-0_2          blocks(2): done
ThreadPoolExecutor-0_0          blocks(3): running
ThreadPoolExecutor-0_1          blocks(4): running
ThreadPoolExecutor-0_2          blocks(5): running
ThreadPoolExecutor-0_0          blocks(3): done
ThreadPoolExecutor-0_2          blocks(5): done
ThreadPoolExecutor-0_1          blocks(4): done
MainThread run_blocking_tasks: results: [0, 9, 16, 25, 1, 4]
MainThread run_blocking_tasks: exiting

Procesos

Un ProcessPoolExecutor funciona de manera muy similar, creando un conjunto de procesos de trabajo en lugar de hilos. Usar procesos separados requiere más recursos del sistema, pero para operaciones de computación intensiva puede tener sentido ejecutar una tarea separada en cada núcleo de CPU.

asyncio_executor_process.py
# changes from asyncio_executor_thread.py

if __name__ == '__main__':
    # Configure logging to show the id of the process
    # where the log message originates.
    logging.basicConfig(
        level=logging.INFO,
        format='PID %(process)5s %(name)18s: %(message)s',
        stream=sys.stderr,
    )

    # Create a limited process pool.
    executor = concurrent.futures.ProcessPoolExecutor(
        max_workers=3,
    )

    event_loop = asyncio.get_event_loop()
    try:
        event_loop.run_until_complete(
            run_blocking_tasks(executor)
        )
    finally:
        event_loop.close()

El único cambio necesario para pasar de hilos a procesos es crear un ejecutor de tipo diferente. Este ejemplo también cambia la cadena de formato de registro para incluir el ID de proceso en lugar del nombre del hilo, para demostrar que las tareas se ejecutan realmente en procesos separados.

$ python3 asyncio_executor_process.py

PID 40498 run_blocking_tasks: starting
PID 40498 run_blocking_tasks: creating executor tasks
PID 40498 run_blocking_tasks: waiting for executor tasks
PID 40499          blocks(0): running
PID 40500          blocks(1): running
PID 40501          blocks(2): running
PID 40499          blocks(0): done
PID 40500          blocks(1): done
PID 40501          blocks(2): done
PID 40500          blocks(3): running
PID 40499          blocks(4): running
PID 40501          blocks(5): running
PID 40499          blocks(4): done
PID 40500          blocks(3): done
PID 40501          blocks(5): done
PID 40498 run_blocking_tasks: results: [1, 4, 9, 0, 16, 25]
PID 40498 run_blocking_tasks: exiting