Fundamentos de multiprocesing¶
La forma más sencilla de generar un segundo proceso es crear una instancia del
objeto Process
con una función de destino y llamar start()
para que
comience a trabajar.
import multiprocessing
def worker():
"""worker function"""
print('Worker')
if __name__ == '__main__':
jobs = []
for i in range(5):
p = multiprocessing.Process(target=worker)
jobs.append(p)
p.start()
La salida incluye la palabra «Worker» impresa cinco veces, aunque puede que no salga completamente limpia, dependiendo del orden de ejecución, porque cada proceso compite por el acceso al flujo de salida.
$ python3 multiprocessing_simple.py
Worker
Worker
Worker
Worker
Worker
Por lo general, es más útil poder generar un proceso con argumentos para
decirle qué trabajo hacer. A diferencia de threading
, para pasar
argumentos a un multiprocessing
Process
, los argumentos deben poder ser
serializados usando pickle
. Este ejemplo pasa a cada trabajador un
número para imprimir.
import multiprocessing
def worker(num):
"""thread worker function"""
print('Worker:', num)
if __name__ == '__main__':
jobs = []
for i in range(5):
p = multiprocessing.Process(target=worker, args=(i,))
jobs.append(p)
p.start()
El argumento entero ahora está incluido en el mensaje impreso por cada trabajador.
$ python3 multiprocessing_simpleargs.py
Worker: 1
Worker: 0
Worker: 2
Worker: 3
Worker: 4
Funciones objetivo importables¶
Una diferencia entre los ejemplos de threading
y multiprocessing
es la
protección adicional para __main__
usada en los ejemplos
multiprocessing
. Debido a la forma en que los nuevos procesos son
iniciados, el proceso hijo necesita poder importar la secuencia de comandos que
contiene la función de destino. Envolver la parte principal de la aplicación
en una verificación para __main__
asegura que no se ejecuta recursivamente
en cada niño cuando se importa el módulo. Otro enfoque es importar la función
de destino desde una secuencia de comando separada. Por ejemplo,
multiprocessing_import_main.py
usa una función de trabajo definida en un
segundo módulo.
import multiprocessing
import multiprocessing_import_worker
if __name__ == '__main__':
jobs = []
for i in range(5):
p = multiprocessing.Process(
target=multiprocessing_import_worker.worker,
)
jobs.append(p)
p.start()
La función de trabajo se define en multiprocessing_import_worker.py
.
def worker():
"""worker function"""
print('Worker')
return
Llamar al programa principal produce una salida similar al primer ejemplo.
$ python3 multiprocessing_import_main.py
Worker
Worker
Worker
Worker
Worker
Determinar el proceso actual¶
Pasar argumentos para identificar o nombrar el proceso es engorroso, y
innecesario. Cada instancia de Process
tiene un nombre con un valor
predeterminado que se puede cambiar cuando que se crea el proceso. Procesos de
denominación son útiles para realizar un seguimiento de ellos, especialmente en
aplicaciones con múltiples tipos de procesos que se ejecutan simultáneamente.
import multiprocessing
import time
def worker():
name = multiprocessing.current_process().name
print(name, 'Starting')
time.sleep(2)
print(name, 'Exiting')
def my_service():
name = multiprocessing.current_process().name
print(name, 'Starting')
time.sleep(3)
print(name, 'Exiting')
if __name__ == '__main__':
service = multiprocessing.Process(
name='my_service',
target=my_service,
)
worker_1 = multiprocessing.Process(
name='worker 1',
target=worker,
)
worker_2 = multiprocessing.Process( # default name
target=worker,
)
worker_1.start()
worker_2.start()
service.start()
La salida de depuración incluye el nombre del proceso actual en cada línea.
Las líneas con Process-3
en la columna de nombre corresponden a el proceso
sin nombre worker_2
.
$ python3 multiprocessing_names.py
worker 1 Starting
worker 1 Exiting
Process-3 Starting
Process-3 Exiting
my_service Starting
my_service Exiting
Procesos demonio¶
Por defecto, el programa principal no terminará hasta que todos los niños han terminado. Hay momentos en es útil iniciar un proceso de fondo que se ejecuta sin bloquear la salida del programa principal, como en servicios donde puede que no haya una manera fácil de interrumpir el trabajador, o donde dejarlo morir en medio de su trabajo no hace que se pierdan o dañen datos (por ejemplo, una tarea que genera «latidos») para una herramienta de monitoreo de servicio).
Para marcar un proceso como demonio, establece su atributo daemon
en
True
. El valor predeterminado es que los procesos no sean demonios.
import multiprocessing
import time
import sys
def daemon():
p = multiprocessing.current_process()
print('Starting:', p.name, p.pid)
sys.stdout.flush()
time.sleep(2)
print('Exiting :', p.name, p.pid)
sys.stdout.flush()
def non_daemon():
p = multiprocessing.current_process()
print('Starting:', p.name, p.pid)
sys.stdout.flush()
print('Exiting :', p.name, p.pid)
sys.stdout.flush()
if __name__ == '__main__':
d = multiprocessing.Process(
name='daemon',
target=daemon,
)
d.daemon = True
n = multiprocessing.Process(
name='non-daemon',
target=non_daemon,
)
n.daemon = False
d.start()
time.sleep(1)
n.start()
La salida no incluye el mensaje «Exiting» del proceso demonio, ya que todos los procesos no demonios (incluidos el programa principal) terminan antes de que el proceso demonio despierte de sus dos segundos de sueño.
$ python3 multiprocessing_daemon.py
Starting: daemon 41838
Starting: non-daemon 41841
Exiting : non-daemon 41841
El proceso demonio se termina automáticamente antes de que el programa
principal termine, lo que evita dejar procesos huérfanos en ejecución. Esto
puede ser verificado al buscar el valor de ID de proceso impreso cuando el
programa corre, y revisando luego ese proceso con un comando como ps
.
Esperar por procesos¶
Para esperar hasta que un proceso haya completado su trabajo y termine, usa el
método join()
.
import multiprocessing
import time
import sys
def daemon():
name = multiprocessing.current_process().name
print('Starting:', name)
time.sleep(2)
print('Exiting :', name)
def non_daemon():
name = multiprocessing.current_process().name
print('Starting:', name)
print('Exiting :', name)
if __name__ == '__main__':
d = multiprocessing.Process(
name='daemon',
target=daemon,
)
d.daemon = True
n = multiprocessing.Process(
name='non-daemon',
target=non_daemon,
)
n.daemon = False
d.start()
time.sleep(1)
n.start()
d.join()
n.join()
Dado que el proceso principal espera a que el demonio se termine usando
join()
, esta vez se imprime el mensaje «Exiting».
$ python3 multiprocessing_daemon_join.py
Starting: non-daemon
Exiting : non-daemon
Starting: daemon
Exiting : daemon
Por defecto, join()
bloquea indefinidamente. También es posible pasar un
argumento de tiempo de espera (un númedo de coma flotante que representa el
número de segundos para esperar a que el proceso se vuelva inactivo). Si el
proceso no se completa dentro del período de tiempo de espera, join()
devuelve de todos modos.
import multiprocessing
import time
import sys
def daemon():
name = multiprocessing.current_process().name
print('Starting:', name)
time.sleep(2)
print('Exiting :', name)
def non_daemon():
name = multiprocessing.current_process().name
print('Starting:', name)
print('Exiting :', name)
if __name__ == '__main__':
d = multiprocessing.Process(
name='daemon',
target=daemon,
)
d.daemon = True
n = multiprocessing.Process(
name='non-daemon',
target=non_daemon,
)
n.daemon = False
d.start()
n.start()
d.join(1)
print('d.is_alive()', d.is_alive())
n.join()
Dado que el tiempo de espera transcurrido es menor que la cantidad de tiempo
que el demonio pausa, el proceso sigue «vivo» después de que join()
regresa.
$ python3 multiprocessing_daemon_join_timeout.py
Starting: non-daemon
Exiting : non-daemon
d.is_alive() True
Terminar procesos¶
Aunque es mejor usar el método de señalización píldora venenosa para un
proceso que debería salir (ver Pasar mensajes a procesos, más adelante en
este capítulo), si un proceso aparece bloqueado o muerto, puede ser útil para
poder matarlo por la fuerza. Llamando a terminate()
en un objeto de
proceso mata el proceso hijo.
import multiprocessing
import time
def slow_worker():
print('Starting worker')
time.sleep(0.1)
print('Finished worker')
if __name__ == '__main__':
p = multiprocessing.Process(target=slow_worker)
print('BEFORE:', p, p.is_alive())
p.start()
print('DURING:', p, p.is_alive())
p.terminate()
print('TERMINATED:', p, p.is_alive())
p.join()
print('JOINED:', p, p.is_alive())
Nota
Es importante join()
(unir) el proceso después de terminarlo con el fin
de dar tiempo al código de gestión de procesos para actualizar el estado
del objeto para reflejar la terminación.
$ python3 multiprocessing_terminate.py
BEFORE: <Process(Process-1, initial)> False
DURING: <Process(Process-1, started)> True
TERMINATED: <Process(Process-1, started)> True
JOINED: <Process(Process-1, stopped[SIGTERM])> False
Estado de salida del proceso¶
El código de estado producido cuando el proceso termina se puede acceder a
través del atributo exitcode
. Los rangos permitidos están listados en
the table below.
Código de salida | Significado |
---|---|
== 0 |
no se produjo ningún error |
> 0 |
el proceso tuvo un error, y salió con ese código |
< 0 |
El proceso fue terminado con una señal de -1 * exitcode |
import multiprocessing
import sys
import time
def exit_error():
sys.exit(1)
def exit_ok():
return
def return_value():
return 1
def raises():
raise RuntimeError('There was an error!')
def terminated():
time.sleep(3)
if __name__ == '__main__':
jobs = []
funcs = [
exit_error,
exit_ok,
return_value,
raises,
terminated,
]
for f in funcs:
print('Starting process for', f.__name__)
j = multiprocessing.Process(target=f, name=f.__name__)
jobs.append(j)
j.start()
jobs[-1].terminate()
for j in jobs:
j.join()
print('{:>15}.exitcode = {}'.format(j.name, j.exitcode))
Los procesos que generan una excepción obtienen automáticamente una
exitcode
de 1.
$ python3 multiprocessing_exitcode.py
Starting process for exit_error
Starting process for exit_ok
Starting process for return_value
Starting process for raises
Starting process for terminated
Process raises:
Traceback (most recent call last):
File ".../lib/python3.6/multiprocessing/process.py", line 258,
in _bootstrap
self.run()
File ".../lib/python3.6/multiprocessing/process.py", line 93,
in run
self._target(*self._args, **self._kwargs)
File "multiprocessing_exitcode.py", line 28, in raises
raise RuntimeError('There was an error!')
RuntimeError: There was an error!
exit_error.exitcode = 1
exit_ok.exitcode = 0
return_value.exitcode = 0
raises.exitcode = 1
terminated.exitcode = -15
Logging¶
Al depurar problemas de concurrencia, puede ser útil tener acceso a los
elementos internos de los objetos proporcionados por multiprocessing
. Hay
una función conveniente a nivel de módulo para habilitar el registro llamada
log_to_stderr()
. Establece un objeto logger utilizando logging
y
agrega un controlador para que los mensajes de registro se envíen a canal de
error estándar.
import multiprocessing
import logging
import sys
def worker():
print('Doing some work')
sys.stdout.flush()
if __name__ == '__main__':
multiprocessing.log_to_stderr(logging.DEBUG)
p = multiprocessing.Process(target=worker)
p.start()
p.join()
De forma predeterminada, el nivel de registro se establece en NOTSET
, por
lo que no se procuden mensajes. Pasa un nivel diferente para inicializar el
registrador al nivel de detalle deseado.
$ python3 multiprocessing_log_to_stderr.py
[INFO/Process-1] child process calling self.run()
Doing some work
[INFO/Process-1] process shutting down
[DEBUG/Process-1] running all "atexit" finalizers with priority
>= 0
[DEBUG/Process-1] running the remaining "atexit" finalizers
[INFO/Process-1] process exiting with exitcode 0
[INFO/MainProcess] process shutting down
[DEBUG/MainProcess] running all "atexit" finalizers with
priority >= 0
[DEBUG/MainProcess] running the remaining "atexit" finalizers
Para manipular el registrador directamente (cambiar su configuración de nivel o
agregar manejadores), use get_logger()
.
import multiprocessing
import logging
import sys
def worker():
print('Doing some work')
sys.stdout.flush()
if __name__ == '__main__':
multiprocessing.log_to_stderr()
logger = multiprocessing.get_logger()
logger.setLevel(logging.INFO)
p = multiprocessing.Process(target=worker)
p.start()
p.join()
El registrador también se puede configurar a través de la interfaz de
programación de archivo de configuración logging
, usando el nombre
«multiprocessing
».
$ python3 multiprocessing_get_logger.py
[INFO/Process-1] child process calling self.run()
Doing some work
[INFO/Process-1] process shutting down
[INFO/Process-1] process exiting with exitcode 0
[INFO/MainProcess] process shutting down
Subclasificar process¶
Aunque la forma más sencilla de comenzar un trabajo en un proceso separado es
usar Process
y pasar una función de destino, también es posible utilizar
una subclase personalizada.
import multiprocessing
class Worker(multiprocessing.Process):
def run(self):
print('In {}'.format(self.name))
return
if __name__ == '__main__':
jobs = []
for i in range(5):
p = Worker()
jobs.append(p)
p.start()
for j in jobs:
j.join()
La clase derivada debería anular run()
para hacer su trabajo.
$ python3 multiprocessing_subclass.py
In Worker-1
In Worker-3
In Worker-2
In Worker-4
In Worker-5