E/S asíncronas con abstracciones de clase de protocolo

Hasta este punto, todos los ejemplos han evitado la mezclar concurrencia y las operaciones de E/S para centrarse en un concepto a la vez. Sin embargo, el cambio de contextos cuando E/S bloquea de es uno de los principales casos de uso para asyncio. Construyendo sobre los conceptos de concurrencia ya introducidos, esta sección examina dos programas de ejemplo que implementan un servidor y cliente de eco simple, similar a los ejemplos utilizados en las secciones socket y socketserver. Un cliente puede conectarse al servidor, enviar algunos datos y luego recibir los mismos datos como respuesta. Cada vez que se inicia una operación de E/S, el código de ejecución cede el control al bucle de eventos, permitiendo que otras tareas se ejecuten hasta la E/S esté lista

Servidor de eco

El servidor comienza importando los módulos que necesita para configurar asyncio y logging, y luego crea un objeto bucle de eventos.

asyncio_echo_server_protocol.py
import asyncio
import logging
import sys

SERVER_ADDRESS = ('localhost', 10000)

logging.basicConfig(
    level=logging.DEBUG,
    format='%(name)s: %(message)s',
    stream=sys.stderr,
)
log = logging.getLogger('main')

event_loop = asyncio.get_event_loop()

Luego define una subclase de asyncio.Protocol para manejar la comunicación con el cliente. Los métodos del objeto protocolo se invocan en base a los eventos asociados con el conector del servidor.

class EchoServer(asyncio.Protocol):

Cada nueva conexión de cliente desencadena una llamada a connection_made(). El argumento transport es una instancia de asyncio.Transport, que proporciona una abstracción para hacer E/S asíncrona utilizando el conector. Diferentes tipos de comunicación. proporcionan diferentes implementaciones de transporte, todas con la misma interfaz de programación. Por ejemplo, hay clases de transporte separadas para trabajar con conectores y para trabajar con pipes para subprocesos. La dirección del cliente entrante está disponible desde el transporte a través de get_extra_info(), un método específico a la implementación.

    def connection_made(self, transport):
        self.transport = transport
        self.address = transport.get_extra_info('peername')
        self.log = logging.getLogger(
            'EchoServer_{}_{}'.format(*self.address)
        )
        self.log.debug('connection accepted')

Después de que se establece una conexión, cuando los datos se envían desde el cliente al servidor el método data_received() del protocolo es invocado para pasar los datos para su procesamiento. Los datos se pasan como una cadena de bytes, y depende de la aplicación decodificarlos en una manera adecuada. Aquí se registran los resultados y luego se envía una respuesta a el cliente inmediatamente llamando a transport.write().

    def data_received(self, data):
        self.log.debug('received {!r}'.format(data))
        self.transport.write(data)
        self.log.debug('sent {!r}'.format(data))

Algunos transportes admiten un indicador especial de fin de archivo («EOF»). Cuando se encuentra un EOF, se llama al método eof_received(). En esta implementación, el EOF se envía de vuelta al cliente para indicar que fue recibido. Porque no todos los transportes soportan un EOF explícito, este protocolo pregunta primero al transporte si es seguro enviar EOF.

    def eof_received(self):
        self.log.debug('received EOF')
        if self.transport.can_write_eof():
            self.transport.write_eof()

Cuando se cierra una conexión, ya sea normalmente o como resultado de un error, se llama al método connection_lost() del protocolo. Si hubo un error, el argumento contiene un objeto excepción apropiado. De lo contrario es None.

    def connection_lost(self, error):
        if error:
            self.log.error('ERROR: {}'.format(error))
        else:
            self.log.debug('closing')
        super().connection_lost(error)

Hay dos pasos para iniciar el servidor. Primero la aplicación le dice al bucle de eventos que cree un nuevo objeto de servidor usando la clase protocolo y el nombre de host y conector para escuchar. El método create_server() es una co-rutina, por lo que los resultados deben ser procesado por el bucle de eventos para iniciar realmente el servidor. Completar la rutina produce una instancia asyncio.Server vinculada al bucle de eventos.

# Create the server and let the loop finish the coroutine before
# starting the real event loop.
factory = event_loop.create_server(EchoServer, *SERVER_ADDRESS)
server = event_loop.run_until_complete(factory)
log.debug('starting up on {} port {}'.format(*SERVER_ADDRESS))

Luego, el ciclo de eventos debe ejecutarse para procesar eventos y manejar las solicitudes de los clientes. Para un servicio de larga duración, el método run_forever() es la forma más sencilla de hacer esto. Cuando el bucle de eventos se detiene, ya sea por el código de la aplicación o por señal al proceso, el servidor se puede cerrar para limpiar el conector correctamente, y luego el bucle de eventos se puede cerrar para terminar de manejar cualquier otra co-rutina antes de que el programa termine.

# Enter the event loop permanently to handle all connections.
try:
    event_loop.run_forever()
finally:
    log.debug('closing server')
    server.close()
    event_loop.run_until_complete(server.wait_closed())
    log.debug('closing event loop')
    event_loop.close()

Cliente de eco

Construir un cliente usando una clase de protocolo es muy similar a construir un servidor. El código comienza de nuevo importando los módulos que necesita para configurar asyncio y logging, y luego creando un objeto de bucle de eventos.

asyncio_echo_client_protocol.py
import asyncio
import functools
import logging
import sys

MESSAGES = [
    b'This is the message. ',
    b'It will be sent ',
    b'in parts.',
]
SERVER_ADDRESS = ('localhost', 10000)

logging.basicConfig(
    level=logging.DEBUG,
    format='%(name)s: %(message)s',
    stream=sys.stderr,
)
log = logging.getLogger('main')

event_loop = asyncio.get_event_loop()

La clase de protocolo del cliente define los mismos métodos que el servidor, con diferentes implementaciones. El constructor de la clase acepta dos argumentos, una lista de los mensajes a enviar y una instancia Future a utilizar para indicar que el cliente ha completado un ciclo de trabajo al recibir una respuesta del servidor.

class EchoClient(asyncio.Protocol):

    def __init__(self, messages, future):
        super().__init__()
        self.messages = messages
        self.log = logging.getLogger('EchoClient')
        self.f = future

Cuando el cliente se conecta con éxito al servidor, empieza a comunicarse inmediatamente. La secuencia de mensajes se envía uno a uno, aunque el código de red subyacente puede combinar múltiples mensajes en una sola transmisión. Cuando todos los mensajes se han agotado, se envía un EOF.

Aunque parece que todos los datos se envían inmediatamente, de hecho, el objeto de transporte almacena los datos salientes y establece una devolución de llamada para transmitir realmente cuando el búfer del conector está listo para recibir datos. Todo esto se maneja de forma transparente, por lo el código de la aplicación se puede escribir como si la operación de E/S se está realizando correctamente.

    def connection_made(self, transport):
        self.transport = transport
        self.address = transport.get_extra_info('peername')
        self.log.debug(
            'connecting to {} port {}'.format(*self.address)
        )
        # This could be transport.writelines() except that
        # would make it harder to show each part of the message
        # being sent.
        for msg in self.messages:
            transport.write(msg)
            self.log.debug('sending {!r}'.format(msg))
        if transport.can_write_eof():
            transport.write_eof()

Cuando se recibe la respuesta del servidor, se registra.

    def data_received(self, data):
        self.log.debug('received {!r}'.format(data))

Y cuando se recibe un marcador de fin de archivo o la conexión es cerrado desde el lado del servidor, el objeto de transporte local es cerrado y el objeto futuro se marca como listo al establecer un resultado.

    def eof_received(self):
        self.log.debug('received EOF')
        self.transport.close()
        if not self.f.done():
            self.f.set_result(True)

    def connection_lost(self, exc):
        self.log.debug('server closed connection')
        self.transport.close()
        if not self.f.done():
            self.f.set_result(True)
        super().connection_lost(exc)

Normalmente, la clase de protocolo se pasa al bucle de eventos para crear la conexión. En este caso, porque el bucle de eventos no tiene facilidades para pasar argumentos adicionales al constructor de protocolo, es necesario crear un partial para envolver la clase cliente y pasar la lista de mensajes a enviar y la instancia Future. Esa nueva llamada luego se usa en lugar de la clase al llamar create_connection() para establecer la conexión al cliente.

client_completed = asyncio.Future()

client_factory = functools.partial(
    EchoClient,
    messages=MESSAGES,
    future=client_completed,
)
factory_coroutine = event_loop.create_connection(
    client_factory,
    *SERVER_ADDRESS,
)

Para desencadenar la ejecución del cliente, se llama al bucle de eventos una vez con la co-rutina para crear el cliente y luego otra vez con la instancia Future dada al cliente para comunicarse cuando esté listo. El uso de dos llamadas como esta evita tener un bucle infinito en el programa cliente, que probablemente quiera salir después de que haya terminado de comunicarse con el servidor. Si solo la primera llamada fue usada para esperar para que el co-rutina cree el cliente, podría no procesar todo los datos de respuesta y limpiar la conexión al servidor correctamente.

log.debug('waiting for client to complete')
try:
    event_loop.run_until_complete(factory_coroutine)
    event_loop.run_until_complete(client_completed)
finally:
    log.debug('closing event loop')
    event_loop.close()

Salida

Ejecutar el servidor en una ventana y el cliente en otra produce la siguiente salida.

$ python3 asyncio_echo_client_protocol.py
asyncio: Using selector: KqueueSelector
main: waiting for client to complete
EchoClient: connecting to ::1 port 10000
EchoClient: sending b'This is the message. '
EchoClient: sending b'It will be sent '
EchoClient: sending b'in parts.'
EchoClient: received b'This is the message. It will be sent in parts.'
EchoClient: received EOF
EchoClient: server closed connection
main: closing event loop

$ python3 asyncio_echo_client_protocol.py
asyncio: Using selector: KqueueSelector
main: waiting for client to complete
EchoClient: connecting to ::1 port 10000
EchoClient: sending b'This is the message. '
EchoClient: sending b'It will be sent '
EchoClient: sending b'in parts.'
EchoClient: received b'This is the message. It will be sent in parts.'
EchoClient: received EOF
EchoClient: server closed connection
main: closing event loop

$ python3 asyncio_echo_client_protocol.py
asyncio: Using selector: KqueueSelector
main: waiting for client to complete
EchoClient: connecting to ::1 port 10000
EchoClient: sending b'This is the message. '
EchoClient: sending b'It will be sent '
EchoClient: sending b'in parts.'
EchoClient: received b'This is the message. It will be sent in parts.'
EchoClient: received EOF
EchoClient: server closed connection
main: closing event loop

Aunque el cliente siempre envía los mensajes por separado, la primera vez que el cliente ejecuta, el servidor recibe un mensaje grande y hace eco de éste al cliente. Estos resultados varían en ejecuciones posteriores, en función de qué tan ocupada está la red y si los búfers de la red son vaciados antes de que todos los datos estén preparados.

$ python3 asyncio_echo_server_protocol.py
asyncio: Using selector: KqueueSelector
main: starting up on localhost port 10000
EchoServer_::1_63347: connection accepted
EchoServer_::1_63347: received b'This is the message. It will be sent in parts.'
EchoServer_::1_63347: sent b'This is the message. It will be sent in parts.'
EchoServer_::1_63347: received EOF
EchoServer_::1_63347: closing

EchoServer_::1_63387: connection accepted
EchoServer_::1_63387: received b'This is the message. '
EchoServer_::1_63387: sent b'This is the message. '
EchoServer_::1_63387: received b'It will be sent in parts.'
EchoServer_::1_63387: sent b'It will be sent in parts.'
EchoServer_::1_63387: received EOF
EchoServer_::1_63387: closing

EchoServer_::1_63389: connection accepted
EchoServer_::1_63389: received b'This is the message. It will be sent '
EchoServer_::1_63389: sent b'This is the message. It will be sent '
EchoServer_::1_63389: received b'in parts.'
EchoServer_::1_63389: sent b'in parts.'
EchoServer_::1_63389: received EOF
EchoServer_::1_63389: closing