E/S asíncronas con abstracciones de clase de protocolo¶
Hasta este punto, todos los ejemplos han evitado la mezclar concurrencia y las
operaciones de E/S para centrarse en un concepto a la vez. Sin embargo, el
cambio de contextos cuando E/S bloquea de es uno de los principales casos de
uso para asyncio
. Construyendo sobre los conceptos de concurrencia ya
introducidos, esta sección examina dos programas de ejemplo que implementan un
servidor y cliente de eco simple, similar a los ejemplos utilizados en las
secciones socket
y socketserver
. Un cliente puede conectarse al
servidor, enviar algunos datos y luego recibir los mismos datos como respuesta.
Cada vez que se inicia una operación de E/S, el código de ejecución cede el
control al bucle de eventos, permitiendo que otras tareas se ejecuten hasta la
E/S esté lista
Servidor de eco¶
El servidor comienza importando los módulos que necesita para configurar
asyncio
y logging
, y luego crea un objeto bucle de eventos.
import asyncio
import logging
import sys
SERVER_ADDRESS = ('localhost', 10000)
logging.basicConfig(
level=logging.DEBUG,
format='%(name)s: %(message)s',
stream=sys.stderr,
)
log = logging.getLogger('main')
event_loop = asyncio.get_event_loop()
Luego define una subclase de asyncio.Protocol
para manejar la comunicación
con el cliente. Los métodos del objeto protocolo se invocan en base a los
eventos asociados con el conector del servidor.
class EchoServer(asyncio.Protocol):
Cada nueva conexión de cliente desencadena una llamada a connection_made()
.
El argumento transport es una instancia de asyncio.Transport
, que
proporciona una abstracción para hacer E/S asíncrona utilizando el conector.
Diferentes tipos de comunicación. proporcionan diferentes implementaciones de
transporte, todas con la misma interfaz de programación. Por ejemplo, hay
clases de transporte separadas para trabajar con conectores y para trabajar con
pipes para subprocesos. La dirección del cliente entrante está disponible
desde el transporte a través de get_extra_info()
, un método específico a la
implementación.
def connection_made(self, transport):
self.transport = transport
self.address = transport.get_extra_info('peername')
self.log = logging.getLogger(
'EchoServer_{}_{}'.format(*self.address)
)
self.log.debug('connection accepted')
Después de que se establece una conexión, cuando los datos se envían desde el
cliente al servidor el método data_received()
del protocolo es invocado
para pasar los datos para su procesamiento. Los datos se pasan como una cadena
de bytes, y depende de la aplicación decodificarlos en una manera adecuada.
Aquí se registran los resultados y luego se envía una respuesta a el cliente
inmediatamente llamando a transport.write()
.
def data_received(self, data):
self.log.debug('received {!r}'.format(data))
self.transport.write(data)
self.log.debug('sent {!r}'.format(data))
Algunos transportes admiten un indicador especial de fin de archivo («EOF»).
Cuando se encuentra un EOF, se llama al método eof_received()
. En esta
implementación, el EOF se envía de vuelta al cliente para indicar que fue
recibido. Porque no todos los transportes soportan un EOF explícito, este
protocolo pregunta primero al transporte si es seguro enviar EOF.
def eof_received(self):
self.log.debug('received EOF')
if self.transport.can_write_eof():
self.transport.write_eof()
Cuando se cierra una conexión, ya sea normalmente o como resultado de un error,
se llama al método connection_lost()
del protocolo. Si hubo un error, el
argumento contiene un objeto excepción apropiado. De lo contrario es None
.
def connection_lost(self, error):
if error:
self.log.error('ERROR: {}'.format(error))
else:
self.log.debug('closing')
super().connection_lost(error)
Hay dos pasos para iniciar el servidor. Primero la aplicación le dice al bucle
de eventos que cree un nuevo objeto de servidor usando la clase protocolo y el
nombre de host y conector para escuchar. El método create_server()
es una
co-rutina, por lo que los resultados deben ser procesado por el bucle de
eventos para iniciar realmente el servidor. Completar la rutina produce una
instancia asyncio.Server
vinculada al bucle de eventos.
# Create the server and let the loop finish the coroutine before
# starting the real event loop.
factory = event_loop.create_server(EchoServer, *SERVER_ADDRESS)
server = event_loop.run_until_complete(factory)
log.debug('starting up on {} port {}'.format(*SERVER_ADDRESS))
Luego, el ciclo de eventos debe ejecutarse para procesar eventos y manejar las
solicitudes de los clientes. Para un servicio de larga duración, el método
run_forever()
es la forma más sencilla de hacer esto. Cuando el bucle de
eventos se detiene, ya sea por el código de la aplicación o por señal al
proceso, el servidor se puede cerrar para limpiar el conector correctamente, y
luego el bucle de eventos se puede cerrar para terminar de manejar cualquier
otra co-rutina antes de que el programa termine.
# Enter the event loop permanently to handle all connections.
try:
event_loop.run_forever()
finally:
log.debug('closing server')
server.close()
event_loop.run_until_complete(server.wait_closed())
log.debug('closing event loop')
event_loop.close()
Cliente de eco¶
Construir un cliente usando una clase de protocolo es muy similar a construir
un servidor. El código comienza de nuevo importando los módulos que necesita
para configurar asyncio
y logging
, y luego creando un objeto de
bucle de eventos.
import asyncio
import functools
import logging
import sys
MESSAGES = [
b'This is the message. ',
b'It will be sent ',
b'in parts.',
]
SERVER_ADDRESS = ('localhost', 10000)
logging.basicConfig(
level=logging.DEBUG,
format='%(name)s: %(message)s',
stream=sys.stderr,
)
log = logging.getLogger('main')
event_loop = asyncio.get_event_loop()
La clase de protocolo del cliente define los mismos métodos que el servidor,
con diferentes implementaciones. El constructor de la clase acepta dos
argumentos, una lista de los mensajes a enviar y una instancia Future
a
utilizar para indicar que el cliente ha completado un ciclo de trabajo al
recibir una respuesta del servidor.
class EchoClient(asyncio.Protocol):
def __init__(self, messages, future):
super().__init__()
self.messages = messages
self.log = logging.getLogger('EchoClient')
self.f = future
Cuando el cliente se conecta con éxito al servidor, empieza a comunicarse inmediatamente. La secuencia de mensajes se envía uno a uno, aunque el código de red subyacente puede combinar múltiples mensajes en una sola transmisión. Cuando todos los mensajes se han agotado, se envía un EOF.
Aunque parece que todos los datos se envían inmediatamente, de hecho, el objeto de transporte almacena los datos salientes y establece una devolución de llamada para transmitir realmente cuando el búfer del conector está listo para recibir datos. Todo esto se maneja de forma transparente, por lo el código de la aplicación se puede escribir como si la operación de E/S se está realizando correctamente.
def connection_made(self, transport):
self.transport = transport
self.address = transport.get_extra_info('peername')
self.log.debug(
'connecting to {} port {}'.format(*self.address)
)
# This could be transport.writelines() except that
# would make it harder to show each part of the message
# being sent.
for msg in self.messages:
transport.write(msg)
self.log.debug('sending {!r}'.format(msg))
if transport.can_write_eof():
transport.write_eof()
Cuando se recibe la respuesta del servidor, se registra.
def data_received(self, data):
self.log.debug('received {!r}'.format(data))
Y cuando se recibe un marcador de fin de archivo o la conexión es cerrado desde el lado del servidor, el objeto de transporte local es cerrado y el objeto futuro se marca como listo al establecer un resultado.
def eof_received(self):
self.log.debug('received EOF')
self.transport.close()
if not self.f.done():
self.f.set_result(True)
def connection_lost(self, exc):
self.log.debug('server closed connection')
self.transport.close()
if not self.f.done():
self.f.set_result(True)
super().connection_lost(exc)
Normalmente, la clase de protocolo se pasa al bucle de eventos para crear la
conexión. En este caso, porque el bucle de eventos no tiene facilidades para
pasar argumentos adicionales al constructor de protocolo, es necesario crear un
partial
para envolver la clase cliente y pasar la lista de mensajes a
enviar y la instancia Future
. Esa nueva llamada luego se usa en lugar de
la clase al llamar create_connection()
para establecer la conexión al
cliente.
client_completed = asyncio.Future()
client_factory = functools.partial(
EchoClient,
messages=MESSAGES,
future=client_completed,
)
factory_coroutine = event_loop.create_connection(
client_factory,
*SERVER_ADDRESS,
)
Para desencadenar la ejecución del cliente, se llama al bucle de eventos una
vez con la co-rutina para crear el cliente y luego otra vez con la instancia
Future
dada al cliente para comunicarse cuando esté listo. El uso de dos
llamadas como esta evita tener un bucle infinito en el programa cliente, que
probablemente quiera salir después de que haya terminado de comunicarse con el
servidor. Si solo la primera llamada fue usada para esperar para que el
co-rutina cree el cliente, podría no procesar todo los datos de respuesta y
limpiar la conexión al servidor correctamente.
log.debug('waiting for client to complete')
try:
event_loop.run_until_complete(factory_coroutine)
event_loop.run_until_complete(client_completed)
finally:
log.debug('closing event loop')
event_loop.close()
Salida¶
Ejecutar el servidor en una ventana y el cliente en otra produce la siguiente salida.
$ python3 asyncio_echo_client_protocol.py
asyncio: Using selector: KqueueSelector
main: waiting for client to complete
EchoClient: connecting to ::1 port 10000
EchoClient: sending b'This is the message. '
EchoClient: sending b'It will be sent '
EchoClient: sending b'in parts.'
EchoClient: received b'This is the message. It will be sent in parts.'
EchoClient: received EOF
EchoClient: server closed connection
main: closing event loop
$ python3 asyncio_echo_client_protocol.py
asyncio: Using selector: KqueueSelector
main: waiting for client to complete
EchoClient: connecting to ::1 port 10000
EchoClient: sending b'This is the message. '
EchoClient: sending b'It will be sent '
EchoClient: sending b'in parts.'
EchoClient: received b'This is the message. It will be sent in parts.'
EchoClient: received EOF
EchoClient: server closed connection
main: closing event loop
$ python3 asyncio_echo_client_protocol.py
asyncio: Using selector: KqueueSelector
main: waiting for client to complete
EchoClient: connecting to ::1 port 10000
EchoClient: sending b'This is the message. '
EchoClient: sending b'It will be sent '
EchoClient: sending b'in parts.'
EchoClient: received b'This is the message. It will be sent in parts.'
EchoClient: received EOF
EchoClient: server closed connection
main: closing event loop
Aunque el cliente siempre envía los mensajes por separado, la primera vez que el cliente ejecuta, el servidor recibe un mensaje grande y hace eco de éste al cliente. Estos resultados varían en ejecuciones posteriores, en función de qué tan ocupada está la red y si los búfers de la red son vaciados antes de que todos los datos estén preparados.
$ python3 asyncio_echo_server_protocol.py
asyncio: Using selector: KqueueSelector
main: starting up on localhost port 10000
EchoServer_::1_63347: connection accepted
EchoServer_::1_63347: received b'This is the message. It will be sent in parts.'
EchoServer_::1_63347: sent b'This is the message. It will be sent in parts.'
EchoServer_::1_63347: received EOF
EchoServer_::1_63347: closing
EchoServer_::1_63387: connection accepted
EchoServer_::1_63387: received b'This is the message. '
EchoServer_::1_63387: sent b'This is the message. '
EchoServer_::1_63387: received b'It will be sent in parts.'
EchoServer_::1_63387: sent b'It will be sent in parts.'
EchoServer_::1_63387: received EOF
EchoServer_::1_63387: closing
EchoServer_::1_63389: connection accepted
EchoServer_::1_63389: received b'This is the message. It will be sent '
EchoServer_::1_63389: sent b'This is the message. It will be sent '
EchoServer_::1_63389: received b'in parts.'
EchoServer_::1_63389: sent b'in parts.'
EchoServer_::1_63389: received EOF
EchoServer_::1_63389: closing