Fundamentos de multiprocesing

La forma más sencilla de generar un segundo proceso es crear una instancia del objeto Process con una función de destino y llamar start() para que comience a trabajar.

multiprocessing_simple.py
import multiprocessing


def worker():
    """worker function"""
    print('Worker')


if __name__ == '__main__':
    jobs = []
    for i in range(5):
        p = multiprocessing.Process(target=worker)
        jobs.append(p)
        p.start()

La salida incluye la palabra «Worker» impresa cinco veces, aunque puede que no salga completamente limpia, dependiendo del orden de ejecución, porque cada proceso compite por el acceso al flujo de salida.

$ python3 multiprocessing_simple.py

Worker
Worker
Worker
Worker
Worker

Por lo general, es más útil poder generar un proceso con argumentos para decirle qué trabajo hacer. A diferencia de threading, para pasar argumentos a un multiprocessing Process, los argumentos deben poder ser serializados usando pickle. Este ejemplo pasa a cada trabajador un número para imprimir.

multiprocessing_simpleargs.py
import multiprocessing


def worker(num):
    """thread worker function"""
    print('Worker:', num)


if __name__ == '__main__':
    jobs = []
    for i in range(5):
        p = multiprocessing.Process(target=worker, args=(i,))
        jobs.append(p)
        p.start()

El argumento entero ahora está incluido en el mensaje impreso por cada trabajador.

$ python3 multiprocessing_simpleargs.py

Worker: 1
Worker: 0
Worker: 2
Worker: 3
Worker: 4

Funciones objetivo importables

Una diferencia entre los ejemplos de threading y multiprocessing es la protección adicional para __main__ usada en los ejemplos multiprocessing. Debido a la forma en que los nuevos procesos son iniciados, el proceso hijo necesita poder importar la secuencia de comandos que contiene la función de destino. Envolver la parte principal de la aplicación en una verificación para __main__ asegura que no se ejecuta recursivamente en cada niño cuando se importa el módulo. Otro enfoque es importar la función de destino desde una secuencia de comando separada. Por ejemplo, multiprocessing_import_main.py usa una función de trabajo definida en un segundo módulo.

multiprocessing_import_main.py
import multiprocessing
import multiprocessing_import_worker

if __name__ == '__main__':
    jobs = []
    for i in range(5):
        p = multiprocessing.Process(
            target=multiprocessing_import_worker.worker,
        )
        jobs.append(p)
        p.start()

La función de trabajo se define en multiprocessing_import_worker.py.

multiprocessing_import_worker.py
def worker():
    """worker function"""
    print('Worker')
    return

Llamar al programa principal produce una salida similar al primer ejemplo.

$ python3 multiprocessing_import_main.py

Worker
Worker
Worker
Worker
Worker

Determinar el proceso actual

Pasar argumentos para identificar o nombrar el proceso es engorroso, y innecesario. Cada instancia de Process tiene un nombre con un valor predeterminado que se puede cambiar cuando que se crea el proceso. Procesos de denominación son útiles para realizar un seguimiento de ellos, especialmente en aplicaciones con múltiples tipos de procesos que se ejecutan simultáneamente.

multiprocessing_names.py
import multiprocessing
import time


def worker():
    name = multiprocessing.current_process().name
    print(name, 'Starting')
    time.sleep(2)
    print(name, 'Exiting')


def my_service():
    name = multiprocessing.current_process().name
    print(name, 'Starting')
    time.sleep(3)
    print(name, 'Exiting')


if __name__ == '__main__':
    service = multiprocessing.Process(
        name='my_service',
        target=my_service,
    )
    worker_1 = multiprocessing.Process(
        name='worker 1',
        target=worker,
    )
    worker_2 = multiprocessing.Process(  # default name
        target=worker,
    )

    worker_1.start()
    worker_2.start()
    service.start()

La salida de depuración incluye el nombre del proceso actual en cada línea. Las líneas con Process-3 en la columna de nombre corresponden a el proceso sin nombre worker_2.

$ python3 multiprocessing_names.py

worker 1 Starting
worker 1 Exiting
Process-3 Starting
Process-3 Exiting
my_service Starting
my_service Exiting

Procesos demonio

Por defecto, el programa principal no terminará hasta que todos los niños han terminado. Hay momentos en es útil iniciar un proceso de fondo que se ejecuta sin bloquear la salida del programa principal, como en servicios donde puede que no haya una manera fácil de interrumpir el trabajador, o donde dejarlo morir en medio de su trabajo no hace que se pierdan o dañen datos (por ejemplo, una tarea que genera «latidos») para una herramienta de monitoreo de servicio).

Para marcar un proceso como demonio, establece su atributo daemon en True. El valor predeterminado es que los procesos no sean demonios.

multiprocessing_daemon.py
import multiprocessing
import time
import sys


def daemon():
    p = multiprocessing.current_process()
    print('Starting:', p.name, p.pid)
    sys.stdout.flush()
    time.sleep(2)
    print('Exiting :', p.name, p.pid)
    sys.stdout.flush()


def non_daemon():
    p = multiprocessing.current_process()
    print('Starting:', p.name, p.pid)
    sys.stdout.flush()
    print('Exiting :', p.name, p.pid)
    sys.stdout.flush()


if __name__ == '__main__':
    d = multiprocessing.Process(
        name='daemon',
        target=daemon,
    )
    d.daemon = True

    n = multiprocessing.Process(
        name='non-daemon',
        target=non_daemon,
    )
    n.daemon = False

    d.start()
    time.sleep(1)
    n.start()

La salida no incluye el mensaje «Exiting» del proceso demonio, ya que todos los procesos no demonios (incluidos el programa principal) terminan antes de que el proceso demonio despierte de sus dos segundos de sueño.

$ python3 multiprocessing_daemon.py

Starting: daemon 41838
Starting: non-daemon 41841
Exiting : non-daemon 41841

El proceso demonio se termina automáticamente antes de que el programa principal termine, lo que evita dejar procesos huérfanos en ejecución. Esto puede ser verificado al buscar el valor de ID de proceso impreso cuando el programa corre, y revisando luego ese proceso con un comando como ps.

Esperar por procesos

Para esperar hasta que un proceso haya completado su trabajo y termine, usa el método join().

multiprocessing_daemon_join.py
import multiprocessing
import time
import sys


def daemon():
    name = multiprocessing.current_process().name
    print('Starting:', name)
    time.sleep(2)
    print('Exiting :', name)


def non_daemon():
    name = multiprocessing.current_process().name
    print('Starting:', name)
    print('Exiting :', name)


if __name__ == '__main__':
    d = multiprocessing.Process(
        name='daemon',
        target=daemon,
    )
    d.daemon = True

    n = multiprocessing.Process(
        name='non-daemon',
        target=non_daemon,
    )
    n.daemon = False

    d.start()
    time.sleep(1)
    n.start()

    d.join()
    n.join()

Dado que el proceso principal espera a que el demonio se termine usando join(), esta vez se imprime el mensaje «Exiting».

$ python3 multiprocessing_daemon_join.py

Starting: non-daemon
Exiting : non-daemon
Starting: daemon
Exiting : daemon

Por defecto, join() bloquea indefinidamente. También es posible pasar un argumento de tiempo de espera (un númedo de coma flotante que representa el número de segundos para esperar a que el proceso se vuelva inactivo). Si el proceso no se completa dentro del período de tiempo de espera, join() devuelve de todos modos.

multiprocessing_daemon_join_timeout.py
import multiprocessing
import time
import sys


def daemon():
    name = multiprocessing.current_process().name
    print('Starting:', name)
    time.sleep(2)
    print('Exiting :', name)


def non_daemon():
    name = multiprocessing.current_process().name
    print('Starting:', name)
    print('Exiting :', name)


if __name__ == '__main__':
    d = multiprocessing.Process(
        name='daemon',
        target=daemon,
    )
    d.daemon = True

    n = multiprocessing.Process(
        name='non-daemon',
        target=non_daemon,
    )
    n.daemon = False

    d.start()
    n.start()

    d.join(1)
    print('d.is_alive()', d.is_alive())
    n.join()

Dado que el tiempo de espera transcurrido es menor que la cantidad de tiempo que el demonio pausa, el proceso sigue «vivo» después de que join() regresa.

$ python3 multiprocessing_daemon_join_timeout.py

Starting: non-daemon
Exiting : non-daemon
d.is_alive() True

Terminar procesos

Aunque es mejor usar el método de señalización píldora venenosa para un proceso que debería salir (ver Pasar mensajes a procesos, más adelante en este capítulo), si un proceso aparece bloqueado o muerto, puede ser útil para poder matarlo por la fuerza. Llamando a terminate() en un objeto de proceso mata el proceso hijo.

multiprocessing_terminate.py
import multiprocessing
import time


def slow_worker():
    print('Starting worker')
    time.sleep(0.1)
    print('Finished worker')


if __name__ == '__main__':
    p = multiprocessing.Process(target=slow_worker)
    print('BEFORE:', p, p.is_alive())

    p.start()
    print('DURING:', p, p.is_alive())

    p.terminate()
    print('TERMINATED:', p, p.is_alive())

    p.join()
    print('JOINED:', p, p.is_alive())

Nota

Es importante join() (unir) el proceso después de terminarlo con el fin de dar tiempo al código de gestión de procesos para actualizar el estado del objeto para reflejar la terminación.

$ python3 multiprocessing_terminate.py

BEFORE: <Process(Process-1, initial)> False
DURING: <Process(Process-1, started)> True
TERMINATED: <Process(Process-1, started)> True
JOINED: <Process(Process-1, stopped[SIGTERM])> False

Estado de salida del proceso

El código de estado producido cuando el proceso termina se puede acceder a través del atributo exitcode. Los rangos permitidos están listados en the table below.

Códigos de salida de multiprocesamiento
Código de salida Significado
== 0 no se produjo ningún error
> 0 el proceso tuvo un error, y salió con ese código
< 0 El proceso fue terminado con una señal de -1 * exitcode
multiprocessing_exitcode.py
import multiprocessing
import sys
import time


def exit_error():
    sys.exit(1)


def exit_ok():
    return


def return_value():
    return 1


def raises():
    raise RuntimeError('There was an error!')


def terminated():
    time.sleep(3)


if __name__ == '__main__':
    jobs = []
    funcs = [
        exit_error,
        exit_ok,
        return_value,
        raises,
        terminated,
    ]
    for f in funcs:
        print('Starting process for', f.__name__)
        j = multiprocessing.Process(target=f, name=f.__name__)
        jobs.append(j)
        j.start()

    jobs[-1].terminate()

    for j in jobs:
        j.join()
        print('{:>15}.exitcode = {}'.format(j.name, j.exitcode))

Los procesos que generan una excepción obtienen automáticamente una exitcode de 1.

$ python3 multiprocessing_exitcode.py

Starting process for exit_error
Starting process for exit_ok
Starting process for return_value
Starting process for raises
Starting process for terminated
Process raises:
Traceback (most recent call last):
  File ".../lib/python3.6/multiprocessing/process.py", line 258,
in _bootstrap
    self.run()
  File ".../lib/python3.6/multiprocessing/process.py", line 93,
in run
    self._target(*self._args, **self._kwargs)
  File "multiprocessing_exitcode.py", line 28, in raises
    raise RuntimeError('There was an error!')
RuntimeError: There was an error!
     exit_error.exitcode = 1
        exit_ok.exitcode = 0
   return_value.exitcode = 0
         raises.exitcode = 1
     terminated.exitcode = -15

Logging

Al depurar problemas de concurrencia, puede ser útil tener acceso a los elementos internos de los objetos proporcionados por multiprocessing. Hay una función conveniente a nivel de módulo para habilitar el registro llamada log_to_stderr(). Establece un objeto logger utilizando logging y agrega un controlador para que los mensajes de registro se envíen a canal de error estándar.

multiprocessing_log_to_stderr.py
import multiprocessing
import logging
import sys


def worker():
    print('Doing some work')
    sys.stdout.flush()


if __name__ == '__main__':
    multiprocessing.log_to_stderr(logging.DEBUG)
    p = multiprocessing.Process(target=worker)
    p.start()
    p.join()

De forma predeterminada, el nivel de registro se establece en NOTSET, por lo que no se procuden mensajes. Pasa un nivel diferente para inicializar el registrador al nivel de detalle deseado.

$ python3 multiprocessing_log_to_stderr.py

[INFO/Process-1] child process calling self.run()
Doing some work
[INFO/Process-1] process shutting down
[DEBUG/Process-1] running all "atexit" finalizers with priority
>= 0
[DEBUG/Process-1] running the remaining "atexit" finalizers
[INFO/Process-1] process exiting with exitcode 0
[INFO/MainProcess] process shutting down
[DEBUG/MainProcess] running all "atexit" finalizers with
priority >= 0
[DEBUG/MainProcess] running the remaining "atexit" finalizers

Para manipular el registrador directamente (cambiar su configuración de nivel o agregar manejadores), use get_logger().

multiprocessing_get_logger.py
import multiprocessing
import logging
import sys


def worker():
    print('Doing some work')
    sys.stdout.flush()


if __name__ == '__main__':
    multiprocessing.log_to_stderr()
    logger = multiprocessing.get_logger()
    logger.setLevel(logging.INFO)
    p = multiprocessing.Process(target=worker)
    p.start()
    p.join()

El registrador también se puede configurar a través de la interfaz de programación de archivo de configuración logging, usando el nombre «multiprocessing».

$ python3 multiprocessing_get_logger.py

[INFO/Process-1] child process calling self.run()
Doing some work
[INFO/Process-1] process shutting down
[INFO/Process-1] process exiting with exitcode 0
[INFO/MainProcess] process shutting down

Subclasificar process

Aunque la forma más sencilla de comenzar un trabajo en un proceso separado es usar Process y pasar una función de destino, también es posible utilizar una subclase personalizada.

multiprocessing_subclass.py
import multiprocessing


class Worker(multiprocessing.Process):

    def run(self):
        print('In {}'.format(self.name))
        return


if __name__ == '__main__':
    jobs = []
    for i in range(5):
        p = Worker()
        jobs.append(p)
        p.start()
    for j in jobs:
        j.join()

La clase derivada debería anular run() para hacer su trabajo.

$ python3 multiprocessing_subclass.py

In Worker-1
In Worker-3
In Worker-2
In Worker-4
In Worker-5