E/S asíncrona usando co-rutinas y flujos

Esta sección examina versiones alternativas de los dos programas de ejemplo que implementan un servidor y cliente de eco simple, usando co-rutinas y la interfaz de programación de flujos de asyncio en lugar del protocolo y las abstracciones de clases de transporte. Los ejemplos operan a un nivel de abstracción más bajo que la interfaz de programación Protocol discutida anteriormente, pero los eventos procesados son similares.

Servidor de eco

El servidor comienza importando los módulos que necesita para configurar asyncio y logging, y luego crea un objeto bucle de eventos.

asyncio_echo_server_coroutine.py
import asyncio
import logging
import sys

SERVER_ADDRESS = ('localhost', 10000)
logging.basicConfig(
    level=logging.DEBUG,
    format='%(name)s: %(message)s',
    stream=sys.stderr,
)
log = logging.getLogger('main')

event_loop = asyncio.get_event_loop()

A continuación, define una co-rutina para manejar la comunicación. Cada vez que un cliente se conecta, se invocará una nueva instancia de la co-rutina para que dentro de la función el código solo se comunica con un cliente a la vez. El sistema en tiempo de ejecución del lenguaje de Python administra el estado de cada instancia de co-rutina, por lo que el código de la aplicación no necesita gestionar cualquier estructura de datos extra para seguir clientes separados.

Los argumentos para las co-rutinas son instancias StreamReader y StreamWriter asociadas con la nueva conexión. Como con el Transport, se puede acceder a la dirección del cliente a través del método get_extra_info() del escritor .

async def echo(reader, writer):
    address = writer.get_extra_info('peername')
    log = logging.getLogger('echo_{}_{}'.format(*address))
    log.debug('connection accepted')

Aunque se llama la co-rutina cuando se establece la conexión, puede que todavía no haya datos para leer. Para evitar el bloqueo mientras se lee, la co-rutina usa await con la llamada read() para permitir que el bucle de eventos continúe procesando otras tareas hasta que haya datos para leer.

    while True:
        data = await reader.read(128)

Si el cliente envía datos, se los devuelve desde await y pueden ser enviados de vuelta al cliente pasándolos al escritor. Llamadas múltiples a write() pueden usarse para almacenar datos salientes, y luego drain() se usa para limpiar los resultados. Ya que el vaciado de la red de E/S puede bloquearse, de nuevo se usa await para restaurar el control del bucle de eventos, que supervisa el conector de escritura e invoca al escritor cuando es posible enviar más datos.

        if data:
            log.debug('received {!r}'.format(data))
            writer.write(data)
            await writer.drain()
            log.debug('sent {!r}'.format(data))

Si el cliente no ha enviado datos, read() devuelve una cadena de bytes vacía para indicar que la conexión está cerrada. El servidor necesita cerrar el conector para escribir al cliente, y luego la co-rutina puede volver para indicar que ha terminado.

        else:
            log.debug('closing')
            writer.close()
            return

Hay dos pasos para iniciar el servidor. Primero la aplicación le dice al bucle de eventos que cree un nuevo objeto de servidor usando la co-rutina y el nombre de host y el conector para escuchar. El método start_server() es en sí una co-rutina, por lo que los resultados deben ser procesados por el bucle de eventos para iniciar realmente el servidor. Completar la rutina produce una instancia asyncio.Server vinculada al bucle de eventos.

# Create the server and let the loop finish the coroutine before
# starting the real event loop.
factory = asyncio.start_server(echo, *SERVER_ADDRESS)
server = event_loop.run_until_complete(factory)
log.debug('starting up on {} port {}'.format(*SERVER_ADDRESS))

Luego, el ciclo de eventos debe ejecutarse para procesar eventos y manejar las solicitudes de los clientes. Para un servicio de larga duración, el método run_forever() es la forma más sencilla de hacer esto. Cuando el el bucle de eventos se detiene, ya sea por el código de la aplicación o por señalización al proceso, el servidor se puede cerrar para limpiar el conector correctamente, y luego el bucle de eventos se puede cerrar para terminar de manejar otras co-rutinas antes de que el programa termine.

# Enter the event loop permanently to handle all connections.
try:
    event_loop.run_forever()
except KeyboardInterrupt:
    pass
finally:
    log.debug('closing server')
    server.close()
    event_loop.run_until_complete(server.wait_closed())
    log.debug('closing event loop')
    event_loop.close()

Cliente de eco

Construir un cliente usando una co-rutina es muy similar a construir un servidor. El código comienza de nuevo importando los módulos que necesita para configurar asyncio y logging, y luego creando un objeto de bucle de evento.

asyncio_echo_client_coroutine.py
import asyncio
import logging
import sys

MESSAGES = [
    b'This is the message. ',
    b'It will be sent ',
    b'in parts.',
]
SERVER_ADDRESS = ('localhost', 10000)

logging.basicConfig(
    level=logging.DEBUG,
    format='%(name)s: %(message)s',
    stream=sys.stderr,
)
log = logging.getLogger('main')

event_loop = asyncio.get_event_loop()

La co-rutina echo_client acepta argumentos que le dicen dónde está el servidor es y qué mensajes enviar.

async def echo_client(address, messages):

Se llama a la co-rutina cuando se inicia la tarea, pero no tiene una conexión activa para trabajar. El primer paso, por lo tanto, es tener que el cliente establezca su propia conexión. Utiliza await para evitar bloquear a otra actividad mientras que la co-rutina open_connection() se ejecuta.

    log = logging.getLogger('echo_client')

    log.debug('connecting to {} port {}'.format(*address))
    reader, writer = await asyncio.open_connection(*address)

La co-rutina open_connection() devuelve instancias StreamReader y StreamWriter asociadas con el nuevo conector. El siguiente paso es usar el escritor para enviar datos a servidor. Al igual que en el servidor, el escritor almacenará los datos salientes hasta que el conector está listo o drain() se utiliza para vaciar los resultados. Dado que la E/S de la red de descarga puede bloquearse, nuevamente se usa await para restaurar el control del bucle de eventos, que supervisa el conector de escritura e invoca al escritor cuando es posible enviar más datos.

    # This could be writer.writelines() except that
    # would make it harder to show each part of the message
    # being sent.
    for msg in messages:
        writer.write(msg)
        log.debug('sending {!r}'.format(msg))
    if writer.can_write_eof():
        writer.write_eof()
    await writer.drain()

A continuación, el cliente busca una respuesta del servidor intentando leer datos hasta que no queda nada por leer. Para evitar el bloqueo de una llamada individual de read(), await cede el control al bucle de eventos. Si el servidor ha enviado datos, se registran. Si el servidor no ha enviado datos, read() devuelve una cadena de bytes vacía para indicar que la conexión está cerrada. El cliente necesita cerrar el conector para enviar al servidor y luego volver para indicar que ha terminado.

    log.debug('waiting for response')
    while True:
        data = await reader.read(128)
        if data:
            log.debug('received {!r}'.format(data))
        else:
            log.debug('closing')
            writer.close()
            return

Para iniciar el cliente, el bucle de eventos se llama con la co-rutina para crear el cliente. Usar run_until_complete() evita tener un bucle infinito en el programa cliente. A diferencia del ejemplo de protocolo, no se necesita un futuro separado para señalar cuando la co-rutina ha terminado, porque echo_client() contiene toda la lógica del cliente y no regresa hasta que haya recibido una respuesta y cerrado el conexión del servidor.

try:
    event_loop.run_until_complete(
        echo_client(SERVER_ADDRESS, MESSAGES)
    )
finally:
    log.debug('closing event loop')
    event_loop.close()

Salida

Ejecutar el servidor en una ventana y el cliente en otra produce La siguiente salida.

$ python3 asyncio_echo_client_coroutine.py
asyncio: Using selector: KqueueSelector
echo_client: connecting to localhost port 10000
echo_client: sending b'This is the message. '
echo_client: sending b'It will be sent '
echo_client: sending b'in parts.'
echo_client: waiting for response
echo_client: received b'This is the message. It will be sent in parts.'
echo_client: closing
main: closing event loop

$ python3 asyncio_echo_client_coroutine.py
asyncio: Using selector: KqueueSelector
echo_client: connecting to localhost port 10000
echo_client: sending b'This is the message. '
echo_client: sending b'It will be sent '
echo_client: sending b'in parts.'
echo_client: waiting for response
echo_client: received b'This is the message. It will be sent in parts.'
echo_client: closing
main: closing event loop

$ python3 asyncio_echo_client_coroutine.py
asyncio: Using selector: KqueueSelector
echo_client: connecting to localhost port 10000
echo_client: sending b'This is the message. '
echo_client: sending b'It will be sent '
echo_client: sending b'in parts.'
echo_client: waiting for response
echo_client: received b'This is the message. It will be sent '
echo_client: received b'in parts.'
echo_client: closing
main: closing event loop

Aunque el cliente siempre envía los mensajes por separado, las dos primeras veces que el cliente se ejecuta, el servidor recibe un mensaje grande y hace eco de éste al cliente. Estos resultados varían en ejecuciones posteriores, en función de lo ocupada que esté la red y de si los búfers de la red se vacían antes de que todos los datos estén preparados.

$ python3 asyncio_echo_server_coroutine.py
asyncio: Using selector: KqueueSelector
main: starting up on localhost port 10000
echo_::1_64624: connection accepted
echo_::1_64624: received b'This is the message. It will be sent in parts.'
echo_::1_64624: sent b'This is the message. It will be sent in parts.'
echo_::1_64624: closing

echo_::1_64626: connection accepted
echo_::1_64626: received b'This is the message. It will be sent in parts.'
echo_::1_64626: sent b'This is the message. It will be sent in parts.'
echo_::1_64626: closing

echo_::1_64627: connection accepted
echo_::1_64627: received b'This is the message. It will be sent '
echo_::1_64627: sent b'This is the message. It will be sent '
echo_::1_64627: received b'in parts.'
echo_::1_64627: sent b'in parts.'
echo_::1_64627: closing