select — Esperar E/S eficientemente¶
Propósito: | Espera la notificación de que un canal de entrada o salida está listo. |
---|
El módulo select
proporciona acceso a funciones específicas de la
plataforma de monitoreo E/S. La interfaz más portátil es la función POSIX
select()
, que está disponible en Unix y Windows. El módulo también incluye
poll()
, una interfaz de programación solo para Unix, y varias opciones que
solo funcionan con variantes específicas de Unix.
Nota
El nuevo módulo selectors
proporciona una interfaz de nivel superior
construida sobre las interfaz de programación select
. Es más fácil de
construir código portátil usando selectors
, así que usa ese módulo a
menos que la interfaz de programación de bajo nivel proporcionada por
select
es de alguna manera necesarias.
Usar select()¶
La función select()
de Python es una interfaz directa a LA implementación
del sistema operativo subyacente. Monitorea conectores, abre archivos y pipes
(cualquier cosa con un método fileno()
que devuelve un descriptor de
archivo válido) hasta que sean legibles, escribibles o se produce un error de
comunicación. select()
hace que sea más fácil monitorear múltiples
conexiones al mismo tiempo, y es más eficiente que escribir un bucle de sondeo
en Python usando los tiempos de espera del conector, porque el monitoreo ocurre
en la capa de red del sistema operativo, en lugar del interprete.
Nota
Usar objetos de archivo de Python con select()
funciona para Unix, pero
no es compatible con Windows.
El ejemplo del servidor de eco de la sección socket
se puede ampliar
para ver más de una conexión a la vez usando select()
. La nueva versión
comienza creando un conector TCP/IP que no se bloquea y configurarlo para
escuchar en una dirección.
import select
import socket
import sys
import queue
# Create a TCP/IP socket
server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server.setblocking(0)
# Bind the socket to the port
server_address = ('localhost', 10000)
print('starting up on {} port {}'.format(*server_address),
file=sys.stderr)
server.bind(server_address)
# Listen for incoming connections
server.listen(5)
Los argumentos para select()
son tres listas que contienen canales de
comunicación a monitorear. La primera es una lista de los objetos para
verificar los datos entrantes que se leerán, el segundo contiene objetos que
recibirán datos salientes cuando haya espacio en su búfer, y el tercero
aquellos que pueden tener un error (generalmente un combinación de los objetos
del canal de entrada y salida). El siguiente paso en el servidor es configurar
las listas que contienen fuentes de entrada y los destinos de salida que se
pasarán a select()
.
# Sockets from which we expect to read
inputs = [server]
# Sockets to which we expect to write
outputs = []
El bucle principal del servidor agrega y elimina las conexiones de estas listas. Dado que esta versión del servidor va a esperar poder escribir a un conector antes de enviar cualquier dato (en lugar de enviar inmediatamente la respuesta), cada conexión de salida necesita una cola para actuar como un búfer para los datos que se enviarán a través de él.
# Outgoing message queues (socket:Queue)
message_queues = {}
La parte del programa principal del servidor hace un bucle, llamando a
select()
para bloquear y esperar la actividad de la red.
while inputs:
# Wait for at least one of the sockets to be
# ready for processing
print('waiting for the next event', file=sys.stderr)
readable, writable, exceptional = select.select(inputs,
outputs,
inputs)
select()
devuelve tres nuevas listas, que contienen subconjuntos del
contenido de las listas pasadas. Todos los conectores en la lista readable
tiene datos entrantes almacenados en búfer y disponibles para ser leídos.
Todos los conectores en la lista writable
tienen espacio libre en su búfer
y se puede escribir en ellos. Los conectores devueltos en excepcional
han
tenido un error (la definición real de «condición excepcional» depende de la
plataforma).
Los conectores «legibles» representan tres casos posibles. Si el conector es el conector principal del «servidor», el que se usa para escuchar conexiones, entonces la condición «legible» significa que está listo para aceptar otra conexión entrante. Además de añadir la nueva conexión a la lista de entradas para monitorear, esta sección establece que el conector del cliente no se bloquee.
# Handle inputs
for s in readable:
if s is server:
# A "readable" socket is ready to accept a connection
connection, client_address = s.accept()
print(' connection from', client_address,
file=sys.stderr)
connection.setblocking(0)
inputs.append(connection)
# Give the connection a queue for data
# we want to send
message_queues[connection] = queue.Queue()
El siguiente caso es una conexión establecida con un cliente que ha enviado
datos. Los datos se leen con recv()
, luego se colocan en la cola para que
puedan ser enviados a través del conector y de vuelta al cliente.
else:
data = s.recv(1024)
if data:
# A readable client socket has data
print(' received {!r} from {}'.format(
data, s.getpeername()), file=sys.stderr,
)
message_queues[s].put(data)
# Add output channel for response
if s not in outputs:
outputs.append(s)
Un conector legible sin datos disponibles proviene de un cliente que se ha desconectado, y el flujo está listo para ser cerrado.
else:
# Interpret empty result as closed connection
print(' closing', client_address,
file=sys.stderr)
# Stop listening for input on the connection
if s in outputs:
outputs.remove(s)
inputs.remove(s)
s.close()
# Remove message queue
del message_queues[s]
Hay menos casos para las conexiones escribibles. Si hay datos en la cola para
una conexión, se envía el siguiente mensaje. De otra manera, la conexión se
elimina de la lista de conexiones de salida para que la próxima vez a través
del bucle select()
no indique que el conector está listo para enviar datos.
# Handle outputs
for s in writable:
try:
next_msg = message_queues[s].get_nowait()
except queue.Empty:
# No messages waiting so stop checking
# for writability.
print(' ', s.getpeername(), 'queue empty',
file=sys.stderr)
outputs.remove(s)
else:
print(' sending {!r} to {}'.format(next_msg,
s.getpeername()),
file=sys.stderr)
s.send(next_msg)
Finalmente, si hay un error con un conector, se cierra.
# Handle "exceptional conditions"
for s in exceptional:
print('exception condition on', s.getpeername(),
file=sys.stderr)
# Stop listening for input on the connection
inputs.remove(s)
if s in outputs:
outputs.remove(s)
s.close()
# Remove message queue
del message_queues[s]
El programa cliente de ejemplo utiliza dos conectores para demostrar cómo el
servidor con select()
administra múltiples conexiones al mismo tiempo. El
cliente comienza conectando cada conector TCP/IP al servidor.
import socket
import sys
messages = [
'This is the message. ',
'It will be sent ',
'in parts.',
]
server_address = ('localhost', 10000)
# Create a TCP/IP socket
socks = [
socket.socket(socket.AF_INET, socket.SOCK_STREAM),
socket.socket(socket.AF_INET, socket.SOCK_STREAM),
]
# Connect the socket to the port where the server is listening
print('connecting to {} port {}'.format(*server_address),
file=sys.stderr)
for s in socks:
s.connect(server_address)
Luego envía una parte del mensaje a la vez a través de cada conector y lee todas las respuestas disponibles después de escribir nuevos datos.
for message in messages:
outgoing_data = message.encode()
# Send messages on both sockets
for s in socks:
print('{}: sending {!r}'.format(s.getsockname(),
outgoing_data),
file=sys.stderr)
s.send(outgoing_data)
# Read responses on both sockets
for s in socks:
data = s.recv(1024)
print('{}: received {!r}'.format(s.getsockname(),
data),
file=sys.stderr)
if not data:
print('closing socket', s.getsockname(),
file=sys.stderr)
s.close()
Ejecuta el servidor en una ventana y el cliente en otra. La salida se verá así, con diferentes números de puerto.
$ python3 select_echo_server.py
starting up on localhost port 10000
waiting for the next event
connection from ('127.0.0.1', 61003)
waiting for the next event
connection from ('127.0.0.1', 61004)
waiting for the next event
received b'This is the message. ' from ('127.0.0.1', 61003)
received b'This is the message. ' from ('127.0.0.1', 61004)
waiting for the next event
sending b'This is the message. ' to ('127.0.0.1', 61003)
sending b'This is the message. ' to ('127.0.0.1', 61004)
waiting for the next event
('127.0.0.1', 61003) queue empty
('127.0.0.1', 61004) queue empty
waiting for the next event
received b'It will be sent ' from ('127.0.0.1', 61003)
received b'It will be sent ' from ('127.0.0.1', 61004)
waiting for the next event
sending b'It will be sent ' to ('127.0.0.1', 61003)
sending b'It will be sent ' to ('127.0.0.1', 61004)
waiting for the next event
('127.0.0.1', 61003) queue empty
('127.0.0.1', 61004) queue empty
waiting for the next event
received b'in parts.' from ('127.0.0.1', 61003)
waiting for the next event
received b'in parts.' from ('127.0.0.1', 61004)
sending b'in parts.' to ('127.0.0.1', 61003)
waiting for the next event
('127.0.0.1', 61003) queue empty
sending b'in parts.' to ('127.0.0.1', 61004)
waiting for the next event
('127.0.0.1', 61004) queue empty
waiting for the next event
closing ('127.0.0.1', 61004)
closing ('127.0.0.1', 61004)
waiting for the next event
La salida del cliente muestra los datos que se envían y reciben utilizando ambos conectores.
$ python3 select_echo_multiclient.py
connecting to localhost port 10000
('127.0.0.1', 61003): sending b'This is the message. '
('127.0.0.1', 61004): sending b'This is the message. '
('127.0.0.1', 61003): received b'This is the message. '
('127.0.0.1', 61004): received b'This is the message. '
('127.0.0.1', 61003): sending b'It will be sent '
('127.0.0.1', 61004): sending b'It will be sent '
('127.0.0.1', 61003): received b'It will be sent '
('127.0.0.1', 61004): received b'It will be sent '
('127.0.0.1', 61003): sending b'in parts.'
('127.0.0.1', 61004): sending b'in parts.'
('127.0.0.1', 61003): received b'in parts.'
('127.0.0.1', 61004): received b'in parts.'
E/S sin bloqueo con tiempos de espera¶
select()
también toma un cuarto parámetro opcional, que es el número de
segundos a esperar antes de interrumpir el monitoreo si no se han activado
canales. El uso de un valor de tiempo de espera permite a un programa
principal ejecutar select()
como parte de un ciclo de procesamiento más
grande, tomando otras acciones entre la comprobación de entrada de red.
Cuando el tiempo de espera expira, select()
devuelve tres listas vacías.
Actualizar el ejemplo del servidor para usar un tiempo de espera requiere
agregar el argumento extra a la llamada select()
y manejo de las listas
vacías que select()
devuelve.
readable, writable, exceptional = select.select(inputs,
outputs,
inputs,
timeout)
if not (readable or writable or exceptional):
print(' timed out, do some other work here',
file=sys.stderr)
continue
Esta versión «lenta» del programa cliente se detiene después de enviar cada mensaje, para simular la latencia u otro retraso en la transmisión.
import socket
import sys
import time
# Create a TCP/IP socket
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
# Connect the socket to the port where the server is listening
server_address = ('localhost', 10000)
print('connecting to {} port {}'.format(*server_address),
file=sys.stderr)
sock.connect(server_address)
time.sleep(1)
messages = [
'Part one of the message.',
'Part two of the message.',
]
amount_expected = len(''.join(messages))
try:
# Send data
for message in messages:
data = message.encode()
print('sending {!r}'.format(data), file=sys.stderr)
sock.sendall(data)
time.sleep(1.5)
# Look for the response
amount_received = 0
while amount_received < amount_expected:
data = sock.recv(16)
amount_received += len(data)
print('received {!r}'.format(data), file=sys.stderr)
finally:
print('closing socket', file=sys.stderr)
sock.close()
Ejecutar el nuevo servidor con el cliente lento produce:
$ python3 select_echo_server_timeout.py
starting up on localhost port 10000
waiting for the next event
timed out, do some other work here
waiting for the next event
connection from ('127.0.0.1', 61144)
waiting for the next event
timed out, do some other work here
waiting for the next event
received b'Part one of the message.' from ('127.0.0.1', 61144)
waiting for the next event
sending b'Part one of the message.' to ('127.0.0.1', 61144)
waiting for the next event
('127.0.0.1', 61144) queue empty
waiting for the next event
timed out, do some other work here
waiting for the next event
received b'Part two of the message.' from ('127.0.0.1', 61144)
waiting for the next event
sending b'Part two of the message.' to ('127.0.0.1', 61144)
waiting for the next event
('127.0.0.1', 61144) queue empty
waiting for the next event
timed out, do some other work here
waiting for the next event
closing ('127.0.0.1', 61144)
waiting for the next event
timed out, do some other work here
Y esta es la salida del cliente:
$ python3 select_echo_slow_client.py
connecting to localhost port 10000
sending b'Part one of the message.'
sending b'Part two of the message.'
received b'Part one of the '
received b'message.Part two'
received b' of the message.'
closing socket
Usar poll()¶
La función poll()
proporciona características similares a select()
,
pero la implementación subyacente es más eficiente. La compensación es que
poll()
no es compatible con Windows, por lo que los programas que usan
poll()
son menos portátiles.
Un servidor de eco construido con poll()
comienza con el mismo código de
configuración del conector utilizado en los otros ejemplos.
import select
import socket
import sys
import queue
# Create a TCP/IP socket
server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server.setblocking(0)
# Bind the socket to the port
server_address = ('localhost', 10000)
print('starting up on {} port {}'.format(*server_address),
file=sys.stderr)
server.bind(server_address)
# Listen for incoming connections
server.listen(5)
# Keep up with the queues of outgoing messages
message_queues = {}
El valor de tiempo de espera pasado en poll()
se representa en
milisegundos, en lugar de segundos, así que para hacer una pausa de un segundo
completp, el tiempo de espera debe configurarse en 1000
.
# Do not block forever (milliseconds)
TIMEOUT = 1000
Python implementa poll()
con una clase que administra los canales de datos
registrados siendo monitoreados. Los canales son agregados llamando a
register()
con indicadores que indican qué eventos son Interesante para ese
canal. El conjunto completo de banderas está listado en the table below.
Evento | Descripción |
---|---|
POLLIN |
Entrada lista |
POLLPRI |
Entrada de prioridad |
POLLOUT |
Capaz de recibir salida |
POLLERR |
Error |
POLLHUP |
Canal cerrado |
POLLNVAL |
Canal no disponible |
El servidor de eco configurará algunos conectores solo para leer y otros para
leer o escribir. Las combinaciones apropiadas de las banderas se guardan en
las variables locales READ_ONLY
y READ_WRITE
.
# Commonly used flag sets
READ_ONLY = (
select.POLLIN |
select.POLLPRI |
select.POLLHUP |
select.POLLERR
)
READ_WRITE = READ_ONLY | select.POLLOUT
El conector server
está registrado para que cualquier conexión de entrada o
datos desencadenan un evento.
# Set up the poller
poller = select.poll()
poller.register(server, READ_ONLY)
Como poll()
devuelve una lista de tuplas que contienen el archivo
descriptor para el conector y el indicador de evento, solo se necesita una
asignación de descriptores de archivo a los objetos necesarios para recuperar
el socket
para leer o escribir desde él.
# Map file descriptors to socket objects
fd_to_socket = {
server.fileno(): server,
}
El bucle del servidor llama a poll()
y luego procesa los «eventos»
devueltos revisando el conector y tomando medidas en función de la bandera en
el evento.
while True:
# Wait for at least one of the sockets to be
# ready for processing
print('waiting for the next event', file=sys.stderr)
events = poller.poll(TIMEOUT)
for fd, flag in events:
# Retrieve the actual socket from its file descriptor
s = fd_to_socket[fd]
Al igual que con select()
, cuando el conector del servidor principal es
«legible» eso realmente significa que hay una conexión pendiente de un cliente.
La nueva conexión se registra con las banderas READ_ONLY
para ver que
nuevos datos lleguen a través de él.
# Handle inputs
if flag & (select.POLLIN | select.POLLPRI):
if s is server:
# A readable socket is ready
# to accept a connection
connection, client_address = s.accept()
print(' connection', client_address,
file=sys.stderr)
connection.setblocking(0)
fd_to_socket[connection.fileno()] = connection
poller.register(connection, READ_ONLY)
# Give the connection a queue for data to send
message_queues[connection] = queue.Queue()
Los conectores que no sean el servidor son clientes existentes y recv()
se
utiliza para acceder a los datos en espera de ser leídos.
else:
data = s.recv(1024)
Si recv()
devuelve algún dato, se coloca en la cola de salida para el
conector, y las banderas para ese conector se cambian usando modify()
así
que poll()
observará que el conector esté listo para recibir los datos.
if data:
# A readable client socket has data
print(' received {!r} from {}'.format(
data, s.getpeername()), file=sys.stderr,
)
message_queues[s].put(data)
# Add output channel for response
poller.modify(s, READ_WRITE)
Una cadena vacía devuelta por recv()
significa el cliente desconectado, por
lo que unregister()
se usa para indicar al objeto poll
que ignore el
conector.
else:
# Interpret empty result as closed connection
print(' closing', client_address,
file=sys.stderr)
# Stop listening for input on the connection
poller.unregister(s)
s.close()
# Remove message queue
del message_queues[s]
La bandera POLLHUP
indica un cliente que «colgó» el conexión sin cerrarla
limpiamente. El servidor deja de sondear clientes que desaparecen.
elif flag & select.POLLHUP:
# Client hung up
print(' closing', client_address, '(HUP)',
file=sys.stderr)
# Stop listening for input on the connection
poller.unregister(s)
s.close()
El manejo de conectores de escritura se parece a la versión utilizada en el
ejemplo para select()
, excepto que modify()
se usa para cambiar las
banderas para el conector en la sonda, en lugar de eliminarlo de la lista de
salida.
elif flag & select.POLLOUT:
# Socket is ready to send data,
# if there is any to send.
try:
next_msg = message_queues[s].get_nowait()
except queue.Empty:
# No messages waiting so stop checking
print(s.getpeername(), 'queue empty',
file=sys.stderr)
poller.modify(s, READ_ONLY)
else:
print(' sending {!r} to {}'.format(
next_msg, s.getpeername()), file=sys.stderr,
)
s.send(next_msg)
Y, finalmente, cualquier evento con POLLERR
hace que el servidor cierre el
conector.
elif flag & select.POLLERR:
print(' exception on', s.getpeername(),
file=sys.stderr)
# Stop listening for input on the connection
poller.unregister(s)
s.close()
# Remove message queue
del message_queues[s]
Cuando el servidor basado en sondeo se ejecuta junto con
select_echo_multiclient.py
(el programa cliente que usa múltiples
conectores), esta es la salida.
$ python3 select_poll_echo_server.py
starting up on localhost port 10000
waiting for the next event
waiting for the next event
waiting for the next event
waiting for the next event
connection ('127.0.0.1', 61253)
waiting for the next event
connection ('127.0.0.1', 61254)
waiting for the next event
received b'This is the message. ' from ('127.0.0.1', 61253)
received b'This is the message. ' from ('127.0.0.1', 61254)
waiting for the next event
sending b'This is the message. ' to ('127.0.0.1', 61253)
sending b'This is the message. ' to ('127.0.0.1', 61254)
waiting for the next event
('127.0.0.1', 61253) queue empty
('127.0.0.1', 61254) queue empty
waiting for the next event
received b'It will be sent ' from ('127.0.0.1', 61253)
received b'It will be sent ' from ('127.0.0.1', 61254)
waiting for the next event
sending b'It will be sent ' to ('127.0.0.1', 61253)
sending b'It will be sent ' to ('127.0.0.1', 61254)
waiting for the next event
('127.0.0.1', 61253) queue empty
('127.0.0.1', 61254) queue empty
waiting for the next event
received b'in parts.' from ('127.0.0.1', 61253)
received b'in parts.' from ('127.0.0.1', 61254)
waiting for the next event
sending b'in parts.' to ('127.0.0.1', 61253)
sending b'in parts.' to ('127.0.0.1', 61254)
waiting for the next event
('127.0.0.1', 61253) queue empty
('127.0.0.1', 61254) queue empty
waiting for the next event
closing ('127.0.0.1', 61254)
waiting for the next event
closing ('127.0.0.1', 61254)
waiting for the next event
Opciones específicas de plataforma¶
Las opciones menos portátiles proporcionadas por select
son epoll
, la
interfaz de programación de edge polling soportada por Linux; kqueue
, que
utiliza la cola del nucleo de BSD; y kevent
, interfaz de eventos del
nucleo de BSD. Consulta la documentación de la biblioteca del sistema
operativo para más detalles sobre cómo funcionan.
Ver también
- Documentación de la biblioteca estándar para select
selectors
– Abstracción de nivel superior en la parte superior deselect
.- Guía de programación de conectores – Una Guía instructiva por Gordon McMillan, incluida en la documentación de la biblioteca estándar.
socket
– Comunicación de red de bajo nivel.SocketServer
– Marco para la creación aplicaciones de servidor de red.asyncio
– Marco de E/A asíncrona- Unix Network Programming, Volume 1: The Sockets Networking API, 3/E De W. Richard Stevens, Bill Fenner, y Andrew M. Rudoff. Publicado pro Addison-Wesley Professional, 2004. ISBN-10: 0131411551
- Foundations of Python Network Programminng, 3/E De Brandon Rhodes y John Goerzen. Publicado por Apress, 2014. ISBN-10: 1430258543