Trabajar con subprocesos

Con frecuencia es necesario trabajar con otros programas y procesos, para aprovechar el código existente sin reescribirlo o acceder a bibliotecas o funciones no disponibles desde Python. Al igual que con E/S de red, asyncio incluye dos abstracciones para iniciar otro programa y luego interactuar con él.

Usar las abstracciones de protocolo con subprocesos

Este ejemplo utiliza una co-rutina para iniciar un proceso para ejecutar el comando Unix df para encontrar el espacio libre en los discos locales. Usa subprocess_exec() para iniciar el proceso y vincularlo a una clase protocolo que sabe leer la salida del comando df y analizarla. Los métodos de la clase protocolo se llaman automáticamente basados en eventos de E/S para el subproceso. Porque tanto los argumentos stdin como de stderr están configurados establecidos en None, esos canales de comunicación no están conectados al nuevo proceso.

asyncio_subprocess_protocol.py
import asyncio
import functools


async def run_df(loop):
    print('in run_df')

    cmd_done = asyncio.Future(loop=loop)
    factory = functools.partial(DFProtocol, cmd_done)
    proc = loop.subprocess_exec(
        factory,
        'df', '-hl',
        stdin=None,
        stderr=None,
    )
    try:
        print('launching process')
        transport, protocol = await proc
        print('waiting for process to complete')
        await cmd_done
    finally:
        transport.close()

    return cmd_done.result()

La clase DFProtocol se deriva de SubprocessProtocol, que define la interfaz de programación para que una clase se comunique con otro proceso a través de pipes. El argumento done se espera que sea un Future que la persona que llama utilizará para seguir que el proceso termine.

class DFProtocol(asyncio.SubprocessProtocol):

    FD_NAMES = ['stdin', 'stdout', 'stderr']

    def __init__(self, done_future):
        self.done = done_future
        self.buffer = bytearray()
        super().__init__()

Al igual que con la comunicación de conector, connection_made() se invoca cuando se configuran los canales de entrada al nuevo proceso. El argumento transport es una instancia de una subclase de BaseSubprocessTransport. Puede leer la salida de datos de el proceso y escribir datos en el flujo de entrada para el proceso, si el proceso fue configurado para recibir entrada.

    def connection_made(self, transport):
        print('process started {}'.format(transport.get_pid()))
        self.transport = transport

Cuando el proceso ha generado resultados, pipe_data_received() es invocada con el descriptor de archivo donde se emitieron los datos y los datos reales leídos desde el pipe. La clase de protocolo guarda la salida de el canal de salida estándar del proceso en un buffer para procesarlos más tarde.

    def pipe_data_received(self, fd, data):
        print('read {} bytes from {}'.format(len(data),
                                             self.FD_NAMES[fd]))
        if fd == 1:
            self.buffer.extend(data)

Cuando el proceso termina, se llama a process_exited(). El código de salida del proceso está disponible desde el objeto de transporte llamando a get_returncode(). En este caso, si no se ha reportado error, el resultado disponible se decodifica y analiza antes de ser devuelto a través de la instancia Future. Si hay un error, se asume que los resultados están vacíos. Fijar el resultado del futuro le dice a run_df() que el proceso ha terminado, por lo que limpia y luego devuelve los resultados.

    def process_exited(self):
        print('process exited')
        return_code = self.transport.get_returncode()
        print('return code {}'.format(return_code))
        if not return_code:
            cmd_output = bytes(self.buffer).decode()
            results = self._parse_results(cmd_output)
        else:
            results = []
        self.done.set_result((return_code, results))

La salida del comando se analiza en una secuencia de diccionarios mapeando los nombres de encabezado a sus valores para cada línea de salida, y se devuelve la lista resultante.

    def _parse_results(self, output):
        print('parsing results')
        # Output has one row of headers, all single words.  The
        # remaining rows are one per filesystem, with columns
        # matching the headers (assuming that none of the
        # mount points have whitespace in the names).
        if not output:
            return []
        lines = output.splitlines()
        headers = lines[0].split()
        devices = lines[1:]
        results = [
            dict(zip(headers, line.split()))
            for line in devices
        ]
        return results

La co-rutina run_df() se ejecuta usando run_until_complete(), luego se examinan los resultados y el espacio libre en cada dispositivo se imprime.

event_loop = asyncio.get_event_loop()
try:
    return_code, results = event_loop.run_until_complete(
        run_df(event_loop)
    )
finally:
    event_loop.close()

if return_code:
    print('error exit {}'.format(return_code))
else:
    print('\nFree space:')
    for r in results:
        print('{Mounted:25}: {Avail}'.format(**r))

La siguiente salida muestra la secuencia de pasos tomados, y el espacio libre en tres unidades en el sistema donde se ejecutó.

$ python3 asyncio_subprocess_protocol.py

in run_df
launching process
process started 49675
waiting for process to complete
read 332 bytes from stdout
process exited
return code 0
parsing results

Free space:
/                        : 233Gi
/Volumes/hubertinternal  : 157Gi
/Volumes/hubert-tm       : 2.3Ti

Llamar a subprocesos con co-rutinas y flujos

Para usar co-rutinas para ejecutar un proceso directamente, en lugar de acceder a él a través de una subclase Protocol, llama create_subprocess_exec() y especifica cuál de stdout, stderr, y stdin para conectarla a un pipe. El resultado de la co-rutina para generar el subproceso es una instancia Process que se puede utilizar para manipular el subproceso o comunicarse con él.

asyncio_subprocess_coroutine.py
import asyncio
import asyncio.subprocess


async def run_df():
    print('in run_df')

    buffer = bytearray()

    create = asyncio.create_subprocess_exec(
        'df', '-hl',
        stdout=asyncio.subprocess.PIPE,
    )
    print('launching process')
    proc = await create
    print('process started {}'.format(proc.pid))

En este ejemplo, df no necesita ninguna una entrada que no sean sus argumentos de línea de comando, por lo que el siguiente paso es leer toda la salida. Con el Protocol no hay control sobre la cantidad de datos que se leen a la vez. Este ejemplo usa readline() pero también podría llamar read() directamente para leer datos que no estén orientados a líneas. La salida del comando se almacena en búfer, como en el ejemplo del protocolo, por lo que se puede analizar más tarde.

    while True:
        line = await proc.stdout.readline()
        print('read {!r}'.format(line))
        if not line:
            print('no more output from command')
            break
        buffer.extend(line)

El método readline() devuelve una cadena de bytes vacía cuando no hay más salida porque el programa ha terminado. Para garantizar que el proceso se limpia correctamente, el siguiente paso es esperar a que el proceso termine completamente

    print('waiting for process to complete')
    await proc.wait()

En ese punto, se puede examinar el estado de salida para determinar si analizar la salida o tratar el error como si no produjo salida. La lógica de análisis es la misma que en el ejemplo anterior, pero está en una función independiente (que no se muestra aquí) porque no hay clase protocolo para ocultarla. Una vez analizados los datos, los resultados y el código de salida se devuelve a la persona que llama.

    return_code = proc.returncode
    print('return code {}'.format(return_code))
    if not return_code:
        cmd_output = bytes(buffer).decode()
        results = _parse_results(cmd_output)
    else:
        results = []

    return (return_code, results)

El programa principal se parece al ejemplo basado en protocolo, porque los cambios de implementación están aislados en run_df().

event_loop = asyncio.get_event_loop()
try:
    return_code, results = event_loop.run_until_complete(
        run_df()
    )
finally:
    event_loop.close()

if return_code:
    print('error exit {}'.format(return_code))
else:
    print('\nFree space:')
    for r in results:
        print('{Mounted:25}: {Avail}'.format(**r))

Dado que la salida de df se puede leer una línea a la vez, se hizo eco para mostrar el progreso del programa. De lo contrario, la salida se ve similar al ejemplo anterior.

$ python3 asyncio_subprocess_coroutine.py

in run_df
launching process
process started 49678
read b'Filesystem     Size   Used  Avail Capacity   iused
ifree %iused  Mounted on\n'
read b'/dev/disk2s2  446Gi  213Gi  233Gi    48%  55955082
61015132   48%   /\n'
read b'/dev/disk1    465Gi  307Gi  157Gi    67%  80514922
41281172   66%   /Volumes/hubertinternal\n'
read b'/dev/disk3s2  3.6Ti  1.4Ti  2.3Ti    38% 181837749
306480579   37%   /Volumes/hubert-tm\n'
read b''
no more output from command
waiting for process to complete
return code 0
parsing results

Free space:
/                        : 233Gi
/Volumes/hubertinternal  : 157Gi
/Volumes/hubert-tm       : 2.3Ti

Enviar datos a un subproceso

Los dos ejemplos anteriores utilizaron un solo canal de comunicación para leer datos de un segundo proceso. A menudo es necesario enviar datos a un comando para su procesamiento. Este ejemplo define una co-rutina para que ejecute el comando de Unix tr para traducir los caracteres en su flujo de entrada. En este caso, tr se utiliza para convertir letras minúsculas a letras mayúsculas.

La co-rutina to_upper() toma como argumento un bucle de evento y una cadena de entrada. Genera un segundo proceso que ejecuta "tr [:lower:] [:upper:]".

asyncio_subprocess_coroutine_write.py
import asyncio
import asyncio.subprocess


async def to_upper(input):
    print('in to_upper')

    create = asyncio.create_subprocess_exec(
        'tr', '[:lower:]', '[:upper:]',
        stdout=asyncio.subprocess.PIPE,
        stdin=asyncio.subprocess.PIPE,
    )
    print('launching process')
    proc = await create
    print('pid {}'.format(proc.pid))

A continuación to_upper() usa el método communicate() de Process para enviar la cadena de entrada al comando y leer toda la salida resultante, de forma asíncrona. Como con la versión subprocess.Popen del mismo método, communicate() devuelve las cadena completa de bytes de salida. Si un es probable que el comando genere más datos de los que caben cómodamente en la memoria, la entrada no se puede producir de una vez, o la salida debe ser procesada de forma incremental, es posible utilizar stdin, stdout, y stderr de Process directamente en lugar de llamar comunicate().

    print('communicating with process')
    stdout, stderr = await proc.communicate(input.encode())

Una vez finalizada la E/S, esperando que el proceso finalice por completo asegura que se limpie correctamente.

    print('waiting for process to complete')
    await proc.wait()

El código de retorno puede ser examinado, y la cadena de bytes de salida decodificada, para preparar el valor de retorno de la co-rutina.

    return_code = proc.returncode
    print('return code {}'.format(return_code))
    if not return_code:
        results = bytes(stdout).decode()
    else:
        results = ''

    return (return_code, results)

La parte principal del programa establece una cadena de mensaje para ser transformada, y luego configura el bucle de eventos para ejecutar to_upper() e imprime los resultados.

MESSAGE = """
This message will be converted
to all caps.
"""

event_loop = asyncio.get_event_loop()
try:
    return_code, results = event_loop.run_until_complete(
        to_upper(MESSAGE)
    )
finally:
    event_loop.close()

if return_code:
    print('error exit {}'.format(return_code))
else:
    print('Original: {!r}'.format(MESSAGE))
    print('Changed : {!r}'.format(results))

La salida muestra la secuencia de operaciones y luego cómo se transforma el simple mensaje de texto.

$ python3 asyncio_subprocess_coroutine_write.py

in to_upper
launching process
pid 49684
communicating with process
waiting for process to complete
return code 0
Original: '\nThis message will be converted\nto all caps.\n'
Changed : '\nTHIS MESSAGE WILL BE CONVERTED\nTO ALL CAPS.\n'