select — Esperar E/S eficientemente

Propósito:Espera la notificación de que un canal de entrada o salida está listo.

El módulo select proporciona acceso a funciones específicas de la plataforma de monitoreo E/S. La interfaz más portátil es la función POSIX select(), que está disponible en Unix y Windows. El módulo también incluye poll(), una interfaz de programación solo para Unix, y varias opciones que solo funcionan con variantes específicas de Unix.

Nota

El nuevo módulo selectors proporciona una interfaz de nivel superior construida sobre las interfaz de programación select. Es más fácil de construir código portátil usando selectors, así que usa ese módulo a menos que la interfaz de programación de bajo nivel proporcionada por select es de alguna manera necesarias.

Usar select()

La función select() de Python es una interfaz directa a LA implementación del sistema operativo subyacente. Monitorea conectores, abre archivos y pipes (cualquier cosa con un método fileno() que devuelve un descriptor de archivo válido) hasta que sean legibles, escribibles o se produce un error de comunicación. select() hace que sea más fácil monitorear múltiples conexiones al mismo tiempo, y es más eficiente que escribir un bucle de sondeo en Python usando los tiempos de espera del conector, porque el monitoreo ocurre en la capa de red del sistema operativo, en lugar del interprete.

Nota

Usar objetos de archivo de Python con select() funciona para Unix, pero no es compatible con Windows.

El ejemplo del servidor de eco de la sección socket se puede ampliar para ver más de una conexión a la vez usando select(). La nueva versión comienza creando un conector TCP/IP que no se bloquea y configurarlo para escuchar en una dirección.

select_echo_server.py
import select
import socket
import sys
import queue

# Create a TCP/IP socket
server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server.setblocking(0)

# Bind the socket to the port
server_address = ('localhost', 10000)
print('starting up on {} port {}'.format(*server_address),
      file=sys.stderr)
server.bind(server_address)

# Listen for incoming connections
server.listen(5)

Los argumentos para select() son tres listas que contienen canales de comunicación a monitorear. La primera es una lista de los objetos para verificar los datos entrantes que se leerán, el segundo contiene objetos que recibirán datos salientes cuando haya espacio en su búfer, y el tercero aquellos que pueden tener un error (generalmente un combinación de los objetos del canal de entrada y salida). El siguiente paso en el servidor es configurar las listas que contienen fuentes de entrada y los destinos de salida que se pasarán a select().

# Sockets from which we expect to read
inputs = [server]

# Sockets to which we expect to write
outputs = []

El bucle principal del servidor agrega y elimina las conexiones de estas listas. Dado que esta versión del servidor va a esperar poder escribir a un conector antes de enviar cualquier dato (en lugar de enviar inmediatamente la respuesta), cada conexión de salida necesita una cola para actuar como un búfer para los datos que se enviarán a través de él.

# Outgoing message queues (socket:Queue)
message_queues = {}

La parte del programa principal del servidor hace un bucle, llamando a select() para bloquear y esperar la actividad de la red.

while inputs:

    # Wait for at least one of the sockets to be
    # ready for processing
    print('waiting for the next event', file=sys.stderr)
    readable, writable, exceptional = select.select(inputs,
                                                    outputs,
                                                    inputs)

select() devuelve tres nuevas listas, que contienen subconjuntos del contenido de las listas pasadas. Todos los conectores en la lista readable tiene datos entrantes almacenados en búfer y disponibles para ser leídos. Todos los conectores en la lista writable tienen espacio libre en su búfer y se puede escribir en ellos. Los conectores devueltos en excepcional han tenido un error (la definición real de «condición excepcional» depende de la plataforma).

Los conectores «legibles» representan tres casos posibles. Si el conector es el conector principal del «servidor», el que se usa para escuchar conexiones, entonces la condición «legible» significa que está listo para aceptar otra conexión entrante. Además de añadir la nueva conexión a la lista de entradas para monitorear, esta sección establece que el conector del cliente no se bloquee.

    # Handle inputs
    for s in readable:

        if s is server:
            # A "readable" socket is ready to accept a connection
            connection, client_address = s.accept()
            print('  connection from', client_address,
                  file=sys.stderr)
            connection.setblocking(0)
            inputs.append(connection)

            # Give the connection a queue for data
            # we want to send
            message_queues[connection] = queue.Queue()

El siguiente caso es una conexión establecida con un cliente que ha enviado datos. Los datos se leen con recv(), luego se colocan en la cola para que puedan ser enviados a través del conector y de vuelta al cliente.

        else:
            data = s.recv(1024)
            if data:
                # A readable client socket has data
                print('  received {!r} from {}'.format(
                    data, s.getpeername()), file=sys.stderr,
                )
                message_queues[s].put(data)
                # Add output channel for response
                if s not in outputs:
                    outputs.append(s)

Un conector legible sin datos disponibles proviene de un cliente que se ha desconectado, y el flujo está listo para ser cerrado.

            else:
                # Interpret empty result as closed connection
                print('  closing', client_address,
                      file=sys.stderr)
                # Stop listening for input on the connection
                if s in outputs:
                    outputs.remove(s)
                inputs.remove(s)
                s.close()

                # Remove message queue
                del message_queues[s]

Hay menos casos para las conexiones escribibles. Si hay datos en la cola para una conexión, se envía el siguiente mensaje. De otra manera, la conexión se elimina de la lista de conexiones de salida para que la próxima vez a través del bucle select() no indique que el conector está listo para enviar datos.

    # Handle outputs
    for s in writable:
        try:
            next_msg = message_queues[s].get_nowait()
        except queue.Empty:
            # No messages waiting so stop checking
            # for writability.
            print('  ', s.getpeername(), 'queue empty',
                  file=sys.stderr)
            outputs.remove(s)
        else:
            print('  sending {!r} to {}'.format(next_msg,
                                                s.getpeername()),
                  file=sys.stderr)
            s.send(next_msg)

Finalmente, si hay un error con un conector, se cierra.

    # Handle "exceptional conditions"
    for s in exceptional:
        print('exception condition on', s.getpeername(),
              file=sys.stderr)
        # Stop listening for input on the connection
        inputs.remove(s)
        if s in outputs:
            outputs.remove(s)
        s.close()

        # Remove message queue
        del message_queues[s]

El programa cliente de ejemplo utiliza dos conectores para demostrar cómo el servidor con select() administra múltiples conexiones al mismo tiempo. El cliente comienza conectando cada conector TCP/IP al servidor.

select_echo_multiclient.py
import socket
import sys

messages = [
    'This is the message. ',
    'It will be sent ',
    'in parts.',
]
server_address = ('localhost', 10000)

# Create a TCP/IP socket
socks = [
    socket.socket(socket.AF_INET, socket.SOCK_STREAM),
    socket.socket(socket.AF_INET, socket.SOCK_STREAM),
]

# Connect the socket to the port where the server is listening
print('connecting to {} port {}'.format(*server_address),
      file=sys.stderr)
for s in socks:
    s.connect(server_address)

Luego envía una parte del mensaje a la vez a través de cada conector y lee todas las respuestas disponibles después de escribir nuevos datos.

for message in messages:
    outgoing_data = message.encode()

    # Send messages on both sockets
    for s in socks:
        print('{}: sending {!r}'.format(s.getsockname(),
                                        outgoing_data),
              file=sys.stderr)
        s.send(outgoing_data)

    # Read responses on both sockets
    for s in socks:
        data = s.recv(1024)
        print('{}: received {!r}'.format(s.getsockname(),
                                         data),
              file=sys.stderr)
        if not data:
            print('closing socket', s.getsockname(),
                  file=sys.stderr)
            s.close()

Ejecuta el servidor en una ventana y el cliente en otra. La salida se verá así, con diferentes números de puerto.

$ python3 select_echo_server.py
starting up on localhost port 10000
waiting for the next event
  connection from ('127.0.0.1', 61003)
waiting for the next event
  connection from ('127.0.0.1', 61004)
waiting for the next event
  received b'This is the message. ' from ('127.0.0.1', 61003)
  received b'This is the message. ' from ('127.0.0.1', 61004)
waiting for the next event
  sending b'This is the message. ' to ('127.0.0.1', 61003)
  sending b'This is the message. ' to ('127.0.0.1', 61004)
waiting for the next event
   ('127.0.0.1', 61003) queue empty
   ('127.0.0.1', 61004) queue empty
waiting for the next event
  received b'It will be sent ' from ('127.0.0.1', 61003)
  received b'It will be sent ' from ('127.0.0.1', 61004)
waiting for the next event
  sending b'It will be sent ' to ('127.0.0.1', 61003)
  sending b'It will be sent ' to ('127.0.0.1', 61004)
waiting for the next event
   ('127.0.0.1', 61003) queue empty
   ('127.0.0.1', 61004) queue empty
waiting for the next event
  received b'in parts.' from ('127.0.0.1', 61003)
waiting for the next event
  received b'in parts.' from ('127.0.0.1', 61004)
  sending b'in parts.' to ('127.0.0.1', 61003)
waiting for the next event
   ('127.0.0.1', 61003) queue empty
  sending b'in parts.' to ('127.0.0.1', 61004)
waiting for the next event
   ('127.0.0.1', 61004) queue empty
waiting for the next event
  closing ('127.0.0.1', 61004)
  closing ('127.0.0.1', 61004)
waiting for the next event

La salida del cliente muestra los datos que se envían y reciben utilizando ambos conectores.

$ python3 select_echo_multiclient.py
connecting to localhost port 10000
('127.0.0.1', 61003): sending b'This is the message. '
('127.0.0.1', 61004): sending b'This is the message. '
('127.0.0.1', 61003): received b'This is the message. '
('127.0.0.1', 61004): received b'This is the message. '
('127.0.0.1', 61003): sending b'It will be sent '
('127.0.0.1', 61004): sending b'It will be sent '
('127.0.0.1', 61003): received b'It will be sent '
('127.0.0.1', 61004): received b'It will be sent '
('127.0.0.1', 61003): sending b'in parts.'
('127.0.0.1', 61004): sending b'in parts.'
('127.0.0.1', 61003): received b'in parts.'
('127.0.0.1', 61004): received b'in parts.'

E/S sin bloqueo con tiempos de espera

select() también toma un cuarto parámetro opcional, que es el número de segundos a esperar antes de interrumpir el monitoreo si no se han activado canales. El uso de un valor de tiempo de espera permite a un programa principal ejecutar select() como parte de un ciclo de procesamiento más grande, tomando otras acciones entre la comprobación de entrada de red.

Cuando el tiempo de espera expira, select() devuelve tres listas vacías. Actualizar el ejemplo del servidor para usar un tiempo de espera requiere agregar el argumento extra a la llamada select() y manejo de las listas vacías que select() devuelve.

select_echo_server_timeout.py
    readable, writable, exceptional = select.select(inputs,
                                                    outputs,
                                                    inputs,
                                                    timeout)

    if not (readable or writable or exceptional):
        print('  timed out, do some other work here',
              file=sys.stderr)
        continue

Esta versión «lenta» del programa cliente se detiene después de enviar cada mensaje, para simular la latencia u otro retraso en la transmisión.

select_echo_slow_client.py
import socket
import sys
import time

# Create a TCP/IP socket
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)

# Connect the socket to the port where the server is listening
server_address = ('localhost', 10000)
print('connecting to {} port {}'.format(*server_address),
      file=sys.stderr)
sock.connect(server_address)

time.sleep(1)

messages = [
    'Part one of the message.',
    'Part two of the message.',
]
amount_expected = len(''.join(messages))

try:

    # Send data
    for message in messages:
        data = message.encode()
        print('sending {!r}'.format(data), file=sys.stderr)
        sock.sendall(data)
        time.sleep(1.5)

    # Look for the response
    amount_received = 0

    while amount_received < amount_expected:
        data = sock.recv(16)
        amount_received += len(data)
        print('received {!r}'.format(data), file=sys.stderr)

finally:
    print('closing socket', file=sys.stderr)
    sock.close()

Ejecutar el nuevo servidor con el cliente lento produce:

$ python3 select_echo_server_timeout.py
starting up on localhost port 10000
waiting for the next event
  timed out, do some other work here
waiting for the next event
  connection from ('127.0.0.1', 61144)
waiting for the next event
  timed out, do some other work here
waiting for the next event
  received b'Part one of the message.' from ('127.0.0.1', 61144)
waiting for the next event
  sending b'Part one of the message.' to ('127.0.0.1', 61144)
waiting for the next event
('127.0.0.1', 61144) queue empty
waiting for the next event
  timed out, do some other work here
waiting for the next event
  received b'Part two of the message.' from ('127.0.0.1', 61144)
waiting for the next event
  sending b'Part two of the message.' to ('127.0.0.1', 61144)
waiting for the next event
('127.0.0.1', 61144) queue empty
waiting for the next event
  timed out, do some other work here
waiting for the next event
closing ('127.0.0.1', 61144)
waiting for the next event
  timed out, do some other work here

Y esta es la salida del cliente:

$ python3 select_echo_slow_client.py
connecting to localhost port 10000
sending b'Part one of the message.'
sending b'Part two of the message.'
received b'Part one of the '
received b'message.Part two'
received b' of the message.'
closing socket

Usar poll()

La función poll() proporciona características similares a select(), pero la implementación subyacente es más eficiente. La compensación es que poll() no es compatible con Windows, por lo que los programas que usan poll() son menos portátiles.

Un servidor de eco construido con poll() comienza con el mismo código de configuración del conector utilizado en los otros ejemplos.

select_poll_echo_server.py
import select
import socket
import sys
import queue

# Create a TCP/IP socket
server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server.setblocking(0)

# Bind the socket to the port
server_address = ('localhost', 10000)
print('starting up on {} port {}'.format(*server_address),
      file=sys.stderr)
server.bind(server_address)

# Listen for incoming connections
server.listen(5)

# Keep up with the queues of outgoing messages
message_queues = {}

El valor de tiempo de espera pasado en poll() se representa en milisegundos, en lugar de segundos, así que para hacer una pausa de un segundo completp, el tiempo de espera debe configurarse en 1000.

# Do not block forever (milliseconds)
TIMEOUT = 1000

Python implementa poll() con una clase que administra los canales de datos registrados siendo monitoreados. Los canales son agregados llamando a register() con indicadores que indican qué eventos son Interesante para ese canal. El conjunto completo de banderas está listado en the table below.

Indicadores de evento para poll()
Evento Descripción
POLLIN Entrada lista
POLLPRI Entrada de prioridad
POLLOUT Capaz de recibir salida
POLLERR Error
POLLHUP Canal cerrado
POLLNVAL Canal no disponible

El servidor de eco configurará algunos conectores solo para leer y otros para leer o escribir. Las combinaciones apropiadas de las banderas se guardan en las variables locales READ_ONLY y READ_WRITE.

# Commonly used flag sets
READ_ONLY = (
    select.POLLIN |
    select.POLLPRI |
    select.POLLHUP |
    select.POLLERR
)
READ_WRITE = READ_ONLY | select.POLLOUT

El conector server está registrado para que cualquier conexión de entrada o datos desencadenan un evento.

# Set up the poller
poller = select.poll()
poller.register(server, READ_ONLY)

Como poll() devuelve una lista de tuplas que contienen el archivo descriptor para el conector y el indicador de evento, solo se necesita una asignación de descriptores de archivo a los objetos necesarios para recuperar el socket para leer o escribir desde él.

# Map file descriptors to socket objects
fd_to_socket = {
    server.fileno(): server,
}

El bucle del servidor llama a poll() y luego procesa los «eventos» devueltos revisando el conector y tomando medidas en función de la bandera en el evento.

while True:

    # Wait for at least one of the sockets to be
    # ready for processing
    print('waiting for the next event', file=sys.stderr)
    events = poller.poll(TIMEOUT)

    for fd, flag in events:

        # Retrieve the actual socket from its file descriptor
        s = fd_to_socket[fd]

Al igual que con select(), cuando el conector del servidor principal es «legible» eso realmente significa que hay una conexión pendiente de un cliente. La nueva conexión se registra con las banderas READ_ONLY para ver que nuevos datos lleguen a través de él.

        # Handle inputs
        if flag & (select.POLLIN | select.POLLPRI):

            if s is server:
                # A readable socket is ready
                # to accept a connection
                connection, client_address = s.accept()
                print('  connection', client_address,
                      file=sys.stderr)
                connection.setblocking(0)
                fd_to_socket[connection.fileno()] = connection
                poller.register(connection, READ_ONLY)

                # Give the connection a queue for data to send
                message_queues[connection] = queue.Queue()

Los conectores que no sean el servidor son clientes existentes y recv() se utiliza para acceder a los datos en espera de ser leídos.

            else:
                data = s.recv(1024)

Si recv() devuelve algún dato, se coloca en la cola de salida para el conector, y las banderas para ese conector se cambian usando modify() así que poll() observará que el conector esté listo para recibir los datos.

                if data:
                    # A readable client socket has data
                    print('  received {!r} from {}'.format(
                        data, s.getpeername()), file=sys.stderr,
                    )
                    message_queues[s].put(data)
                    # Add output channel for response
                    poller.modify(s, READ_WRITE)

Una cadena vacía devuelta por recv() significa el cliente desconectado, por lo que unregister() se usa para indicar al objeto poll que ignore el conector.

                else:
                    # Interpret empty result as closed connection
                    print('  closing', client_address,
                          file=sys.stderr)
                    # Stop listening for input on the connection
                    poller.unregister(s)
                    s.close()

                    # Remove message queue
                    del message_queues[s]

La bandera POLLHUP indica un cliente que «colgó» el conexión sin cerrarla limpiamente. El servidor deja de sondear clientes que desaparecen.

        elif flag & select.POLLHUP:
            # Client hung up
            print('  closing', client_address, '(HUP)',
                  file=sys.stderr)
            # Stop listening for input on the connection
            poller.unregister(s)
            s.close()

El manejo de conectores de escritura se parece a la versión utilizada en el ejemplo para select(), excepto que modify() se usa para cambiar las banderas para el conector en la sonda, en lugar de eliminarlo de la lista de salida.

        elif flag & select.POLLOUT:
            # Socket is ready to send data,
            # if there is any to send.
            try:
                next_msg = message_queues[s].get_nowait()
            except queue.Empty:
                # No messages waiting so stop checking
                print(s.getpeername(), 'queue empty',
                      file=sys.stderr)
                poller.modify(s, READ_ONLY)
            else:
                print('  sending {!r} to {}'.format(
                    next_msg, s.getpeername()), file=sys.stderr,
                )
                s.send(next_msg)

Y, finalmente, cualquier evento con POLLERR hace que el servidor cierre el conector.

        elif flag & select.POLLERR:
            print('  exception on', s.getpeername(),
                  file=sys.stderr)
            # Stop listening for input on the connection
            poller.unregister(s)
            s.close()

            # Remove message queue
            del message_queues[s]

Cuando el servidor basado en sondeo se ejecuta junto con select_echo_multiclient.py (el programa cliente que usa múltiples conectores), esta es la salida.

$ python3 select_poll_echo_server.py
starting up on localhost port 10000
waiting for the next event
waiting for the next event
waiting for the next event
waiting for the next event
  connection ('127.0.0.1', 61253)
waiting for the next event
  connection ('127.0.0.1', 61254)
waiting for the next event
  received b'This is the message. ' from ('127.0.0.1', 61253)
  received b'This is the message. ' from ('127.0.0.1', 61254)
waiting for the next event
  sending b'This is the message. ' to ('127.0.0.1', 61253)
  sending b'This is the message. ' to ('127.0.0.1', 61254)
waiting for the next event
('127.0.0.1', 61253) queue empty
('127.0.0.1', 61254) queue empty
waiting for the next event
  received b'It will be sent ' from ('127.0.0.1', 61253)
  received b'It will be sent ' from ('127.0.0.1', 61254)
waiting for the next event
  sending b'It will be sent ' to ('127.0.0.1', 61253)
  sending b'It will be sent ' to ('127.0.0.1', 61254)
waiting for the next event
('127.0.0.1', 61253) queue empty
('127.0.0.1', 61254) queue empty
waiting for the next event
  received b'in parts.' from ('127.0.0.1', 61253)
  received b'in parts.' from ('127.0.0.1', 61254)
waiting for the next event
  sending b'in parts.' to ('127.0.0.1', 61253)
  sending b'in parts.' to ('127.0.0.1', 61254)
waiting for the next event
('127.0.0.1', 61253) queue empty
('127.0.0.1', 61254) queue empty
waiting for the next event
  closing ('127.0.0.1', 61254)
waiting for the next event
  closing ('127.0.0.1', 61254)
waiting for the next event

Opciones específicas de plataforma

Las opciones menos portátiles proporcionadas por select son epoll, la interfaz de programación de edge polling soportada por Linux; kqueue, que utiliza la cola del nucleo de BSD; y kevent, interfaz de eventos del nucleo de BSD. Consulta la documentación de la biblioteca del sistema operativo para más detalles sobre cómo funcionan.

Ver también

  • Documentación de la biblioteca estándar para select
  • selectors – Abstracción de nivel superior en la parte superior de select.
  • Guía de programación de conectores – Una Guía instructiva por Gordon McMillan, incluida en la documentación de la biblioteca estándar.
  • socket – Comunicación de red de bajo nivel.
  • SocketServer – Marco para la creación aplicaciones de servidor de red.
  • asyncio – Marco de E/A asíncrona
  • Unix Network Programming, Volume 1: The Sockets Networking API, 3/E De W. Richard Stevens, Bill Fenner, y Andrew M. Rudoff. Publicado pro Addison-Wesley Professional, 2004. ISBN-10: 0131411551
  • Foundations of Python Network Programminng, 3/E De Brandon Rhodes y John Goerzen. Publicado por Apress, 2014. ISBN-10: 1430258543