Pasar mensajes a procesos¶
Al igual que con los hilos, un patrón de uso común para múltiples procesos es
dividir un trabajo entre varios trabajadores para ejecutar en paralelo. El uso
eficaz múltiples procesos usualmente requiere cierta comunicación entre ellos,
para que el trabajo se pueda dividir y los resultados se puedan agregar. Una
forma sencilla de comunicar entre procesos con multiprocessing
es usar una
Queue
para pasar mensajes de ida y vuelta. Cualquier objeto que pueda ser
serializado con pickle
puede pasar por una Queue
.
import multiprocessing
class MyFancyClass:
def __init__(self, name):
self.name = name
def do_something(self):
proc_name = multiprocessing.current_process().name
print('Doing something fancy in {} for {}!'.format(
proc_name, self.name))
def worker(q):
obj = q.get()
obj.do_something()
if __name__ == '__main__':
queue = multiprocessing.Queue()
p = multiprocessing.Process(target=worker, args=(queue,))
p.start()
queue.put(MyFancyClass('Fancy Dan'))
# Wait for the worker to finish
queue.close()
queue.join_thread()
p.join()
Este breve ejemplo solo pasa un solo mensaje a un solo trabajador, entonces el proceso principal espera a que el trabajador termine.
$ python3 multiprocessing_queue.py
Doing something fancy in Process-1 for Fancy Dan!
Un ejemplo más complejo muestra cómo gestionar varios trabajadores consumiendo
datos de un JoinableQueue
y pasar los resultados al proceso de padre. La
técnica píldora venenosa se utiliza para detener la trabajadores. Después de
configurar las tareas reales, el programa principal agrega una valor de
«parada» por trabajador a la cola de trabajos. Cuando un trabajador encuentra
el valor especial, se sale de su ciclo de procesamiento. El proceso principal
utiliza el método join()
de la cola de tareas para esperar a todas las
tareas a finalizar antes de procesar los resultados.
import multiprocessing
import time
class Consumer(multiprocessing.Process):
def __init__(self, task_queue, result_queue):
multiprocessing.Process.__init__(self)
self.task_queue = task_queue
self.result_queue = result_queue
def run(self):
proc_name = self.name
while True:
next_task = self.task_queue.get()
if next_task is None:
# Poison pill means shutdown
print('{}: Exiting'.format(proc_name))
self.task_queue.task_done()
break
print('{}: {}'.format(proc_name, next_task))
answer = next_task()
self.task_queue.task_done()
self.result_queue.put(answer)
class Task:
def __init__(self, a, b):
self.a = a
self.b = b
def __call__(self):
time.sleep(0.1) # pretend to take time to do the work
return '{self.a} * {self.b} = {product}'.format(
self=self, product=self.a * self.b)
def __str__(self):
return '{self.a} * {self.b}'.format(self=self)
if __name__ == '__main__':
# Establish communication queues
tasks = multiprocessing.JoinableQueue()
results = multiprocessing.Queue()
# Start consumers
num_consumers = multiprocessing.cpu_count() * 2
print('Creating {} consumers'.format(num_consumers))
consumers = [
Consumer(tasks, results)
for i in range(num_consumers)
]
for w in consumers:
w.start()
# Enqueue jobs
num_jobs = 10
for i in range(num_jobs):
tasks.put(Task(i, i))
# Add a poison pill for each consumer
for i in range(num_consumers):
tasks.put(None)
# Wait for all of the tasks to finish
tasks.join()
# Start printing results
while num_jobs:
result = results.get()
print('Result:', result)
num_jobs -= 1
Aunque los trabajos entran en la cola en orden, su ejecución es paralelizada por lo que no hay garantía sobre el orden en que serán terminadas.
$ python3 -u multiprocessing_producer_consumer.py
Creating 8 consumers
Consumer-1: 0 * 0
Consumer-2: 1 * 1
Consumer-3: 2 * 2
Consumer-4: 3 * 3
Consumer-5: 4 * 4
Consumer-6: 5 * 5
Consumer-7: 6 * 6
Consumer-8: 7 * 7
Consumer-3: 8 * 8
Consumer-7: 9 * 9
Consumer-4: Exiting
Consumer-1: Exiting
Consumer-2: Exiting
Consumer-5: Exiting
Consumer-6: Exiting
Consumer-8: Exiting
Consumer-7: Exiting
Consumer-3: Exiting
Result: 6 * 6 = 36
Result: 2 * 2 = 4
Result: 3 * 3 = 9
Result: 0 * 0 = 0
Result: 1 * 1 = 1
Result: 7 * 7 = 49
Result: 4 * 4 = 16
Result: 5 * 5 = 25
Result: 8 * 8 = 64
Result: 9 * 9 = 81
Señalizar entre procesos¶
La clase Event
proporciona una manera simple de comunicar la información de
estado entre procesos. Un evento se puede alternar entre estados armado y
desarmado. Los usuarios del objeto de evento pueden esperar a que cambie de
desarmado a armado, utilizando un valor opcional de tiempo de espera.
import multiprocessing
import time
def wait_for_event(e):
"""Wait for the event to be set before doing anything"""
print('wait_for_event: starting')
e.wait()
print('wait_for_event: e.is_set()->', e.is_set())
def wait_for_event_timeout(e, t):
"""Wait t seconds and then timeout"""
print('wait_for_event_timeout: starting')
e.wait(t)
print('wait_for_event_timeout: e.is_set()->', e.is_set())
if __name__ == '__main__':
e = multiprocessing.Event()
w1 = multiprocessing.Process(
name='block',
target=wait_for_event,
args=(e,),
)
w1.start()
w2 = multiprocessing.Process(
name='nonblock',
target=wait_for_event_timeout,
args=(e, 2),
)
w2.start()
print('main: waiting before calling Event.set()')
time.sleep(3)
e.set()
print('main: event is set')
Cuando wait()
se agota, se devuelve sin error. El llamador es responsable
de verificar el estado del evento usando is_set()
.
$ python3 -u multiprocessing_event.py
main: waiting before calling Event.set()
wait_for_event: starting
wait_for_event_timeout: starting
wait_for_event_timeout: e.is_set()-> False
main: event is set
wait_for_event: e.is_set()-> True
Controlar el acceso a los recursos¶
En situaciones en las que un único recurso debe ser compartido entre procesos
múltiples, se puede usar un Lock
para evitar conflictos de acceso.
import multiprocessing
import sys
def worker_with(lock, stream):
with lock:
stream.write('Lock acquired via with\n')
def worker_no_with(lock, stream):
lock.acquire()
try:
stream.write('Lock acquired directly\n')
finally:
lock.release()
lock = multiprocessing.Lock()
w = multiprocessing.Process(
target=worker_with,
args=(lock, sys.stdout),
)
nw = multiprocessing.Process(
target=worker_no_with,
args=(lock, sys.stdout),
)
w.start()
nw.start()
w.join()
nw.join()
En este ejemplo, los mensajes impresos en la consola se pueden mezclar juntos si los dos procesos no sincronizan su acceso de la secuencia de salida con el bloqueo.
$ python3 multiprocessing_lock.py
Lock acquired via with
Lock acquired directly
Sincronizar operaciones¶
Los objetos Condition
se pueden usar para sincronizar partes de un flujo de
trabajo para que algunos se ejecuten en paralelo pero otros se ejecutan de
forma secuencial, incluso si están en procesos separados.
import multiprocessing
import time
def stage_1(cond):
"""perform first stage of work,
then notify stage_2 to continue
"""
name = multiprocessing.current_process().name
print('Starting', name)
with cond:
print('{} done and ready for stage 2'.format(name))
cond.notify_all()
def stage_2(cond):
"""wait for the condition telling us stage_1 is done"""
name = multiprocessing.current_process().name
print('Starting', name)
with cond:
cond.wait()
print('{} running'.format(name))
if __name__ == '__main__':
condition = multiprocessing.Condition()
s1 = multiprocessing.Process(name='s1',
target=stage_1,
args=(condition,))
s2_clients = [
multiprocessing.Process(
name='stage_2[{}]'.format(i),
target=stage_2,
args=(condition,),
)
for i in range(1, 3)
]
for c in s2_clients:
c.start()
time.sleep(1)
s1.start()
s1.join()
for c in s2_clients:
c.join()
En este ejemplo, dos procesos ejecutan la segunda etapa de un trabajo en paralelo, pero solo después de que la primera etapa termina.
$ python3 -u multiprocessing_condition.py
Starting stage_2[1]
Starting stage_2[2]
Starting s1
s1 done and ready for stage 2
stage_2[1] running
stage_2[2] running
Control de acceso concurrente a los recursos¶
A veces es útil permitir que más de un trabajador acceda a un recurso a la vez,
mientras que todavía se limita el número total. Por ejemplo, una agrupación de
conexiones podría admitir un número fijo de conexiones simultáneas, o una
aplicación de red podría soportar una número fijo de descargas concurrentes.
Un Semaphore
es una forma de gestionar esas conexiones.
import random
import multiprocessing
import time
class ActivePool:
def __init__(self):
super(ActivePool, self).__init__()
self.mgr = multiprocessing.Manager()
self.active = self.mgr.list()
self.lock = multiprocessing.Lock()
def makeActive(self, name):
with self.lock:
self.active.append(name)
def makeInactive(self, name):
with self.lock:
self.active.remove(name)
def __str__(self):
with self.lock:
return str(self.active)
def worker(s, pool):
name = multiprocessing.current_process().name
with s:
pool.makeActive(name)
print('Activating {} now running {}'.format(
name, pool))
time.sleep(random.random())
pool.makeInactive(name)
if __name__ == '__main__':
pool = ActivePool()
s = multiprocessing.Semaphore(3)
jobs = [
multiprocessing.Process(
target=worker,
name=str(i),
args=(s, pool),
)
for i in range(10)
]
for j in jobs:
j.start()
while True:
alive = 0
for j in jobs:
if j.is_alive():
alive += 1
j.join(timeout=0.1)
print('Now running {}'.format(pool))
if alive == 0:
# all done
break
En este ejemplo, la clase ActivePool
simplemente sirve como una forma
cómoda de rastrear qué procesos se están ejecutando en un determinado momento.
Un grupo de recursos real probablemente asignaría una conexión o algún otro
valor para el proceso recién activado, y reclamar el valor cuando la tarea está
terminada. Aquí, el grupo de recursos solo se utiliza para mantener los
nombres de los procesos activos para mostrar que solo tres se están ejecutando
concurrentemente.
$ python3 -u multiprocessing_semaphore.py
Activating 0 now running ['0', '1', '2']
Activating 1 now running ['0', '1', '2']
Activating 2 now running ['0', '1', '2']
Now running ['0', '1', '2']
Now running ['0', '1', '2']
Now running ['0', '1', '2']
Now running ['0', '1', '2']
Activating 3 now running ['0', '1', '3']
Activating 4 now running ['1', '3', '4']
Activating 6 now running ['1', '4', '6']
Now running ['1', '4', '6']
Now running ['1', '4', '6']
Activating 5 now running ['1', '4', '5']
Now running ['1', '4', '5']
Now running ['1', '4', '5']
Now running ['1', '4', '5']
Activating 8 now running ['4', '5', '8']
Now running ['4', '5', '8']
Now running ['4', '5', '8']
Now running ['4', '5', '8']
Now running ['4', '5', '8']
Now running ['4', '5', '8']
Activating 7 now running ['5', '8', '7']
Now running ['5', '8', '7']
Activating 9 now running ['8', '7', '9']
Now running ['8', '7', '9']
Now running ['8', '9']
Now running ['8', '9']
Now running ['9']
Now running ['9']
Now running ['9']
Now running ['9']
Now running []
Gestionar Estado Compartido¶
En el ejemplo anterior, se mantiene centralmente la lista de procesos activos
en la instancia ActivePool
a través de un tipo especial de objeto lista
creado por un Manager
. El Manager
es responsable de coordinar el
estado de la información compartida entre todos sus usuarios.
import multiprocessing
import pprint
def worker(d, key, value):
d[key] = value
if __name__ == '__main__':
mgr = multiprocessing.Manager()
d = mgr.dict()
jobs = [
multiprocessing.Process(
target=worker,
args=(d, i, i * 2),
)
for i in range(10)
]
for j in jobs:
j.start()
for j in jobs:
j.join()
print('Results:', d)
Al crear la lista a través del gestor, es compartida y las actualizan se ven en todos los procesos. Diccionarios también son compatibles.
$ python3 multiprocessing_manager_dict.py
Results: {0: 0, 1: 2, 2: 4, 3: 6, 4: 8, 5: 10, 6: 12, 7: 14,
8: 16, 9: 18}
Espacios de nombres compartidos¶
Además de los diccionarios y las listas, un Manager
puede crear un
Namespace
compartido.
import multiprocessing
def producer(ns, event):
ns.value = 'This is the value'
event.set()
def consumer(ns, event):
try:
print('Before event: {}'.format(ns.value))
except Exception as err:
print('Before event, error:', str(err))
event.wait()
print('After event:', ns.value)
if __name__ == '__main__':
mgr = multiprocessing.Manager()
namespace = mgr.Namespace()
event = multiprocessing.Event()
p = multiprocessing.Process(
target=producer,
args=(namespace, event),
)
c = multiprocessing.Process(
target=consumer,
args=(namespace, event),
)
c.start()
p.start()
c.join()
p.join()
Cualquier valor con nombre agregado al Namespace
es visible para todos los
clientes que reciben la instancia Namespace
.
$ python3 multiprocessing_namespaces.py
Before event, error: 'Namespace' object has no attribute 'value'
After event: This is the value
Es importante saber que las actualizaciones a los contenidos de valores mutables en el espacio de nombres no se propagan automáticamente.
import multiprocessing
def producer(ns, event):
# DOES NOT UPDATE GLOBAL VALUE!
ns.my_list.append('This is the value')
event.set()
def consumer(ns, event):
print('Before event:', ns.my_list)
event.wait()
print('After event :', ns.my_list)
if __name__ == '__main__':
mgr = multiprocessing.Manager()
namespace = mgr.Namespace()
namespace.my_list = []
event = multiprocessing.Event()
p = multiprocessing.Process(
target=producer,
args=(namespace, event),
)
c = multiprocessing.Process(
target=consumer,
args=(namespace, event),
)
c.start()
p.start()
c.join()
p.join()
Para actualizar la lista, adjúntala de nuevo al objeto de espacio de nombres.
$ python3 multiprocessing_namespaces_mutable.py
Before event: []
After event : []
Agrupación de procesos¶
La clase Pool
se puede usar para administrar un número fijo de trabajadores
para casos simples donde el trabajo a realizar puede dividirse y repartido
entre trabajadores de forma independiente. Los valores de retorno de los
trabajos se recogen y se devuelven como una lista. Los argumentos de la
agrupación de procesos incluye el número de procesos y una función para
ejecutar al iniciar proceso de la tarea (invocado una vez por proceso).
import multiprocessing
def do_calculation(data):
return data * 2
def start_process():
print('Starting', multiprocessing.current_process().name)
if __name__ == '__main__':
inputs = list(range(10))
print('Input :', inputs)
builtin_outputs = map(do_calculation, inputs)
print('Built-in:', builtin_outputs)
pool_size = multiprocessing.cpu_count() * 2
pool = multiprocessing.Pool(
processes=pool_size,
initializer=start_process,
)
pool_outputs = pool.map(do_calculation, inputs)
pool.close() # no more tasks
pool.join() # wrap up current tasks
print('Pool :', pool_outputs)
El resultado del método map()
es funcionalmente equivalente al map()
incorporado, excepto que las tareas individuales se ejecutan en paralelo. Como
la agrupación está procesando sus entradas en paralelo, close()
y
join()
puede usarse para sincronizar el proceso principal con el proceso de
la tarea para asegurar una correcta limpieza.
$ python3 multiprocessing_pool.py
Input : [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
Built-in: <map object at 0x1007b2be0>
Starting ForkPoolWorker-3
Starting ForkPoolWorker-4
Starting ForkPoolWorker-5
Starting ForkPoolWorker-6
Starting ForkPoolWorker-1
Starting ForkPoolWorker-7
Starting ForkPoolWorker-2
Starting ForkPoolWorker-8
Pool : [0, 2, 4, 6, 8, 10, 12, 14, 16, 18]
Por defecto, Pool
crea un número fijo de procesos de trabajo y les pasa
trabajos hasta que no quedan más trabajos. Configurando el parámetro
maxtasksperchild
le dice a la agrupación que reinicie un proceso trabajador
después de que haya terminado algunas tareas, evitando que trabajadores de
ejecución prolongada consuman cada vez más recursos del sistema.
import multiprocessing
def do_calculation(data):
return data * 2
def start_process():
print('Starting', multiprocessing.current_process().name)
if __name__ == '__main__':
inputs = list(range(10))
print('Input :', inputs)
builtin_outputs = map(do_calculation, inputs)
print('Built-in:', builtin_outputs)
pool_size = multiprocessing.cpu_count() * 2
pool = multiprocessing.Pool(
processes=pool_size,
initializer=start_process,
maxtasksperchild=2,
)
pool_outputs = pool.map(do_calculation, inputs)
pool.close() # no more tasks
pool.join() # wrap up current tasks
print('Pool :', pool_outputs)
La agrupación reinicia a los trabajadores cuando han completado sus tareas asignadas, incluso si no hay más trabajo. En esta salida, se crean ocho trabajadores, aunque solo hay 10 tareas, y cada trabajador puede completar dos de ellas a la vez.
$ python3 multiprocessing_pool_maxtasksperchild.py
Input : [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
Built-in: <map object at 0x1007b21d0>
Starting ForkPoolWorker-1
Starting ForkPoolWorker-2
Starting ForkPoolWorker-4
Starting ForkPoolWorker-5
Starting ForkPoolWorker-6
Starting ForkPoolWorker-3
Starting ForkPoolWorker-7
Starting ForkPoolWorker-8
Pool : [0, 2, 4, 6, 8, 10, 12, 14, 16, 18]