Componiendo co-rutinas con estructuras de control

El flujo de control lineal entre una serie de co-rutinas es fácil de manejar con la palabra clave await incorporada al lenguaje. Estructuras más complicadas que permiten que una co-rutina espere a que otras completen en paralelo también son posibles usando herramientas en asyncio.

Esperar múltiples co-rutinas

A menudo es útil dividir una operación en muchas partes y ejecutarlas por separado. Por ejemplo, descargar varios recursos remotos o consultar interfaces de programación remotas. En situaciones donde el orden de ejecución no importa, y donde puede haber un número arbitrario de operaciones, wait() se puede usar para pausar una co-rutina hasta que otras operaciones de segundo plano sean completadas.

asyncio_wait.py
import asyncio


async def phase(i):
    print('in phase {}'.format(i))
    await asyncio.sleep(0.1 * i)
    print('done with phase {}'.format(i))
    return 'phase {} result'.format(i)


async def main(num_phases):
    print('starting main')
    phases = [
        phase(i)
        for i in range(num_phases)
    ]
    print('waiting for phases to complete')
    completed, pending = await asyncio.wait(phases)
    results = [t.result() for t in completed]
    print('results: {!r}'.format(results))


event_loop = asyncio.get_event_loop()
try:
    event_loop.run_until_complete(main(3))
finally:
    event_loop.close()

Internamente, wait() usa un set para mantener la instancia Task que crea. Esto hace que comiencen y terminen en un orden impredecible. El valor de retorno de wait() es una tupla que contiene dos conjuntos que contienen las tareas terminadas y pendientes.

$ python3 asyncio_wait.py

starting main
waiting for phases to complete
in phase 0
in phase 1
in phase 2
done with phase 0
done with phase 1
done with phase 2
results: ['phase 1 result', 'phase 0 result', 'phase 2 result']

Solo quedarán operaciones pendientes si se usa wait() con un valor de tiempo de espera.

asyncio_wait_timeout.py
import asyncio


async def phase(i):
    print('in phase {}'.format(i))
    try:
        await asyncio.sleep(0.1 * i)
    except asyncio.CancelledError:
        print('phase {} canceled'.format(i))
        raise
    else:
        print('done with phase {}'.format(i))
        return 'phase {} result'.format(i)


async def main(num_phases):
    print('starting main')
    phases = [
        phase(i)
        for i in range(num_phases)
    ]
    print('waiting 0.1 for phases to complete')
    completed, pending = await asyncio.wait(phases, timeout=0.1)
    print('{} completed and {} pending'.format(
        len(completed), len(pending),
    ))
    # Cancel remaining tasks so they do not generate errors
    # as we exit without finishing them.
    if pending:
        print('canceling tasks')
        for t in pending:
            t.cancel()
    print('exiting main')


event_loop = asyncio.get_event_loop()
try:
    event_loop.run_until_complete(main(3))
finally:
    event_loop.close()

Las operaciones de segundo plano restantes deben cancelarse o ser terminadas al esperarlas. Dejándolos pendientes mientras el bucle de eventos continúa les permitirá seguir ejecutándose, lo que puede no ser deseable si la operación general se considera abortada. Dejándolos pendiente al final del proceso resultará en advertencias que serán reportadas.

$ python3 asyncio_wait_timeout.py

starting main
waiting 0.1 for phases to complete
in phase 1
in phase 0
in phase 2
done with phase 0
1 completed and 2 pending
cancelling tasks
exiting main
phase 1 cancelled
phase 2 cancelled

Recopilar resultados de co-rutinas

Si las fases de segundo plano están bien definidas, y solo los resultados de esas fases son importantes, entonces gather() puede ser más útil para esperar múltiples operaciones.

asyncio_gather.py
import asyncio


async def phase1():
    print('in phase1')
    await asyncio.sleep(2)
    print('done with phase1')
    return 'phase1 result'


async def phase2():
    print('in phase2')
    await asyncio.sleep(1)
    print('done with phase2')
    return 'phase2 result'


async def main():
    print('starting main')
    print('waiting for phases to complete')
    results = await asyncio.gather(
        phase1(),
        phase2(),
    )
    print('results: {!r}'.format(results))


event_loop = asyncio.get_event_loop()
try:
    event_loop.run_until_complete(main())
finally:
    event_loop.close()

Las tareas creadas por gather no son expuestas, por lo que no pueden ser canceladas. El valor de retorno es una lista de resultados en el mismo orden que los argumentos pasados a gather(), independientemente del orden en que las operaciones de segundo plano son completadas.

$ python3 asyncio_gather.py

starting main
waiting for phases to complete
in phase2
in phase1
done with phase2
done with phase1
results: ['phase1 result', 'phase2 result']

Manejar operaciones de segundo plano a medida que terminan

as_completed() es un generador que administra la ejecución de una lista de co-rutinas que se le dan y produce sus resultados uno por uno a medida que terminan de correr. Al igual que con wait(), el orden no es garantizado por as_completed(), pero no es necesario esperar para que todas las operaciones en segundo plano se completen antes de tomar otra acción.

asyncio_as_completed.py
import asyncio


async def phase(i):
    print('in phase {}'.format(i))
    await asyncio.sleep(0.5 - (0.1 * i))
    print('done with phase {}'.format(i))
    return 'phase {} result'.format(i)


async def main(num_phases):
    print('starting main')
    phases = [
        phase(i)
        for i in range(num_phases)
    ]
    print('waiting for phases to complete')
    results = []
    for next_to_complete in asyncio.as_completed(phases):
        answer = await next_to_complete
        print('received answer {!r}'.format(answer))
        results.append(answer)
    print('results: {!r}'.format(results))
    return results


event_loop = asyncio.get_event_loop()
try:
    event_loop.run_until_complete(main(3))
finally:
    event_loop.close()

Este ejemplo inicia varias fases de segundo plano que terminan en el orden inverso en el que comienzan. A medida que se consume el generador, el bucle espera el resultado de la rutina usando await.

$ python3 asyncio_as_completed.py

starting main
waiting for phases to complete
in phase 0
in phase 2
in phase 1
done with phase 2
received answer 'phase 2 result'
done with phase 1
received answer 'phase 1 result'
done with phase 0
received answer 'phase 0 result'
results: ['phase 2 result', 'phase 1 result', 'phase 0 result']