Pasar mensajes a procesos

Al igual que con los hilos, un patrón de uso común para múltiples procesos es dividir un trabajo entre varios trabajadores para ejecutar en paralelo. El uso eficaz múltiples procesos usualmente requiere cierta comunicación entre ellos, para que el trabajo se pueda dividir y los resultados se puedan agregar. Una forma sencilla de comunicar entre procesos con multiprocessing es usar una Queue para pasar mensajes de ida y vuelta. Cualquier objeto que pueda ser serializado con pickle puede pasar por una Queue.

multiprocessing_queue.py
import multiprocessing


class MyFancyClass:

    def __init__(self, name):
        self.name = name

    def do_something(self):
        proc_name = multiprocessing.current_process().name
        print('Doing something fancy in {} for {}!'.format(
            proc_name, self.name))


def worker(q):
    obj = q.get()
    obj.do_something()


if __name__ == '__main__':
    queue = multiprocessing.Queue()

    p = multiprocessing.Process(target=worker, args=(queue,))
    p.start()

    queue.put(MyFancyClass('Fancy Dan'))

    # Wait for the worker to finish
    queue.close()
    queue.join_thread()
    p.join()

Este breve ejemplo solo pasa un solo mensaje a un solo trabajador, entonces el proceso principal espera a que el trabajador termine.

$ python3 multiprocessing_queue.py

Doing something fancy in Process-1 for Fancy Dan!

Un ejemplo más complejo muestra cómo gestionar varios trabajadores consumiendo datos de un JoinableQueue y pasar los resultados al proceso de padre. La técnica píldora venenosa se utiliza para detener la trabajadores. Después de configurar las tareas reales, el programa principal agrega una valor de «parada» por trabajador a la cola de trabajos. Cuando un trabajador encuentra el valor especial, se sale de su ciclo de procesamiento. El proceso principal utiliza el método join() de la cola de tareas para esperar a todas las tareas a finalizar antes de procesar los resultados.

multiprocessing_producer_consumer.py
import multiprocessing
import time


class Consumer(multiprocessing.Process):

    def __init__(self, task_queue, result_queue):
        multiprocessing.Process.__init__(self)
        self.task_queue = task_queue
        self.result_queue = result_queue

    def run(self):
        proc_name = self.name
        while True:
            next_task = self.task_queue.get()
            if next_task is None:
                # Poison pill means shutdown
                print('{}: Exiting'.format(proc_name))
                self.task_queue.task_done()
                break
            print('{}: {}'.format(proc_name, next_task))
            answer = next_task()
            self.task_queue.task_done()
            self.result_queue.put(answer)


class Task:

    def __init__(self, a, b):
        self.a = a
        self.b = b

    def __call__(self):
        time.sleep(0.1)  # pretend to take time to do the work
        return '{self.a} * {self.b} = {product}'.format(
            self=self, product=self.a * self.b)

    def __str__(self):
        return '{self.a} * {self.b}'.format(self=self)


if __name__ == '__main__':
    # Establish communication queues
    tasks = multiprocessing.JoinableQueue()
    results = multiprocessing.Queue()

    # Start consumers
    num_consumers = multiprocessing.cpu_count() * 2
    print('Creating {} consumers'.format(num_consumers))
    consumers = [
        Consumer(tasks, results)
        for i in range(num_consumers)
    ]
    for w in consumers:
        w.start()

    # Enqueue jobs
    num_jobs = 10
    for i in range(num_jobs):
        tasks.put(Task(i, i))

    # Add a poison pill for each consumer
    for i in range(num_consumers):
        tasks.put(None)

    # Wait for all of the tasks to finish
    tasks.join()

    # Start printing results
    while num_jobs:
        result = results.get()
        print('Result:', result)
        num_jobs -= 1

Aunque los trabajos entran en la cola en orden, su ejecución es paralelizada por lo que no hay garantía sobre el orden en que serán terminadas.

$ python3 -u multiprocessing_producer_consumer.py

Creating 8 consumers
Consumer-1: 0 * 0
Consumer-2: 1 * 1
Consumer-3: 2 * 2
Consumer-4: 3 * 3
Consumer-5: 4 * 4
Consumer-6: 5 * 5
Consumer-7: 6 * 6
Consumer-8: 7 * 7
Consumer-3: 8 * 8
Consumer-7: 9 * 9
Consumer-4: Exiting
Consumer-1: Exiting
Consumer-2: Exiting
Consumer-5: Exiting
Consumer-6: Exiting
Consumer-8: Exiting
Consumer-7: Exiting
Consumer-3: Exiting
Result: 6 * 6 = 36
Result: 2 * 2 = 4
Result: 3 * 3 = 9
Result: 0 * 0 = 0
Result: 1 * 1 = 1
Result: 7 * 7 = 49
Result: 4 * 4 = 16
Result: 5 * 5 = 25
Result: 8 * 8 = 64
Result: 9 * 9 = 81

Señalizar entre procesos

La clase Event proporciona una manera simple de comunicar la información de estado entre procesos. Un evento se puede alternar entre estados armado y desarmado. Los usuarios del objeto de evento pueden esperar a que cambie de desarmado a armado, utilizando un valor opcional de tiempo de espera.

multiprocessing_event.py
import multiprocessing
import time


def wait_for_event(e):
    """Wait for the event to be set before doing anything"""
    print('wait_for_event: starting')
    e.wait()
    print('wait_for_event: e.is_set()->', e.is_set())


def wait_for_event_timeout(e, t):
    """Wait t seconds and then timeout"""
    print('wait_for_event_timeout: starting')
    e.wait(t)
    print('wait_for_event_timeout: e.is_set()->', e.is_set())


if __name__ == '__main__':
    e = multiprocessing.Event()
    w1 = multiprocessing.Process(
        name='block',
        target=wait_for_event,
        args=(e,),
    )
    w1.start()

    w2 = multiprocessing.Process(
        name='nonblock',
        target=wait_for_event_timeout,
        args=(e, 2),
    )
    w2.start()

    print('main: waiting before calling Event.set()')
    time.sleep(3)
    e.set()
    print('main: event is set')

Cuando wait() se agota, se devuelve sin error. El llamador es responsable de verificar el estado del evento usando is_set().

$ python3 -u multiprocessing_event.py

main: waiting before calling Event.set()
wait_for_event: starting
wait_for_event_timeout: starting
wait_for_event_timeout: e.is_set()-> False
main: event is set
wait_for_event: e.is_set()-> True

Controlar el acceso a los recursos

En situaciones en las que un único recurso debe ser compartido entre procesos múltiples, se puede usar un Lock para evitar conflictos de acceso.

multiprocessing_lock.py
import multiprocessing
import sys


def worker_with(lock, stream):
    with lock:
        stream.write('Lock acquired via with\n')


def worker_no_with(lock, stream):
    lock.acquire()
    try:
        stream.write('Lock acquired directly\n')
    finally:
        lock.release()


lock = multiprocessing.Lock()
w = multiprocessing.Process(
    target=worker_with,
    args=(lock, sys.stdout),
)
nw = multiprocessing.Process(
    target=worker_no_with,
    args=(lock, sys.stdout),
)

w.start()
nw.start()

w.join()
nw.join()

En este ejemplo, los mensajes impresos en la consola se pueden mezclar juntos si los dos procesos no sincronizan su acceso de la secuencia de salida con el bloqueo.

$ python3 multiprocessing_lock.py

Lock acquired via with
Lock acquired directly

Sincronizar operaciones

Los objetos Condition se pueden usar para sincronizar partes de un flujo de trabajo para que algunos se ejecuten en paralelo pero otros se ejecutan de forma secuencial, incluso si están en procesos separados.

multiprocessing_condition.py
import multiprocessing
import time


def stage_1(cond):
    """perform first stage of work,
    then notify stage_2 to continue
    """
    name = multiprocessing.current_process().name
    print('Starting', name)
    with cond:
        print('{} done and ready for stage 2'.format(name))
        cond.notify_all()


def stage_2(cond):
    """wait for the condition telling us stage_1 is done"""
    name = multiprocessing.current_process().name
    print('Starting', name)
    with cond:
        cond.wait()
        print('{} running'.format(name))


if __name__ == '__main__':
    condition = multiprocessing.Condition()
    s1 = multiprocessing.Process(name='s1',
                                 target=stage_1,
                                 args=(condition,))
    s2_clients = [
        multiprocessing.Process(
            name='stage_2[{}]'.format(i),
            target=stage_2,
            args=(condition,),
        )
        for i in range(1, 3)
    ]

    for c in s2_clients:
        c.start()
        time.sleep(1)
    s1.start()

    s1.join()
    for c in s2_clients:
        c.join()

En este ejemplo, dos procesos ejecutan la segunda etapa de un trabajo en paralelo, pero solo después de que la primera etapa termina.

$ python3 -u multiprocessing_condition.py

Starting stage_2[1]
Starting stage_2[2]
Starting s1
s1 done and ready for stage 2
stage_2[1] running
stage_2[2] running

Control de acceso concurrente a los recursos

A veces es útil permitir que más de un trabajador acceda a un recurso a la vez, mientras que todavía se limita el número total. Por ejemplo, una agrupación de conexiones podría admitir un número fijo de conexiones simultáneas, o una aplicación de red podría soportar una número fijo de descargas concurrentes. Un Semaphore es una forma de gestionar esas conexiones.

multiprocessing_semaphore.py
import random
import multiprocessing
import time


class ActivePool:

    def __init__(self):
        super(ActivePool, self).__init__()
        self.mgr = multiprocessing.Manager()
        self.active = self.mgr.list()
        self.lock = multiprocessing.Lock()

    def makeActive(self, name):
        with self.lock:
            self.active.append(name)

    def makeInactive(self, name):
        with self.lock:
            self.active.remove(name)

    def __str__(self):
        with self.lock:
            return str(self.active)


def worker(s, pool):
    name = multiprocessing.current_process().name
    with s:
        pool.makeActive(name)
        print('Activating {} now running {}'.format(
            name, pool))
        time.sleep(random.random())
        pool.makeInactive(name)


if __name__ == '__main__':
    pool = ActivePool()
    s = multiprocessing.Semaphore(3)
    jobs = [
        multiprocessing.Process(
            target=worker,
            name=str(i),
            args=(s, pool),
        )
        for i in range(10)
    ]

    for j in jobs:
        j.start()

    while True:
        alive = 0
        for j in jobs:
            if j.is_alive():
                alive += 1
                j.join(timeout=0.1)
                print('Now running {}'.format(pool))
        if alive == 0:
            # all done
            break

En este ejemplo, la clase ActivePool simplemente sirve como una forma cómoda de rastrear qué procesos se están ejecutando en un determinado momento. Un grupo de recursos real probablemente asignaría una conexión o algún otro valor para el proceso recién activado, y reclamar el valor cuando la tarea está terminada. Aquí, el grupo de recursos solo se utiliza para mantener los nombres de los procesos activos para mostrar que solo tres se están ejecutando concurrentemente.

$ python3 -u multiprocessing_semaphore.py

Activating 0 now running ['0', '1', '2']
Activating 1 now running ['0', '1', '2']
Activating 2 now running ['0', '1', '2']
Now running ['0', '1', '2']
Now running ['0', '1', '2']
Now running ['0', '1', '2']
Now running ['0', '1', '2']
Activating 3 now running ['0', '1', '3']
Activating 4 now running ['1', '3', '4']
Activating 6 now running ['1', '4', '6']
Now running ['1', '4', '6']
Now running ['1', '4', '6']
Activating 5 now running ['1', '4', '5']
Now running ['1', '4', '5']
Now running ['1', '4', '5']
Now running ['1', '4', '5']
Activating 8 now running ['4', '5', '8']
Now running ['4', '5', '8']
Now running ['4', '5', '8']
Now running ['4', '5', '8']
Now running ['4', '5', '8']
Now running ['4', '5', '8']
Activating 7 now running ['5', '8', '7']
Now running ['5', '8', '7']
Activating 9 now running ['8', '7', '9']
Now running ['8', '7', '9']
Now running ['8', '9']
Now running ['8', '9']
Now running ['9']
Now running ['9']
Now running ['9']
Now running ['9']
Now running []

Gestionar Estado Compartido

En el ejemplo anterior, se mantiene centralmente la lista de procesos activos en la instancia ActivePool a través de un tipo especial de objeto lista creado por un Manager. El Manager es responsable de coordinar el estado de la información compartida entre todos sus usuarios.

multiprocessing_manager_dict.py
import multiprocessing
import pprint


def worker(d, key, value):
    d[key] = value


if __name__ == '__main__':
    mgr = multiprocessing.Manager()
    d = mgr.dict()
    jobs = [
        multiprocessing.Process(
            target=worker,
            args=(d, i, i * 2),
        )
        for i in range(10)
    ]
    for j in jobs:
        j.start()
    for j in jobs:
        j.join()
    print('Results:', d)

Al crear la lista a través del gestor, es compartida y las actualizan se ven en todos los procesos. Diccionarios también son compatibles.

$ python3 multiprocessing_manager_dict.py

Results: {0: 0, 1: 2, 2: 4, 3: 6, 4: 8, 5: 10, 6: 12, 7: 14,
8: 16, 9: 18}

Espacios de nombres compartidos

Además de los diccionarios y las listas, un Manager puede crear un Namespace compartido.

multiprocessing_namespaces.py
import multiprocessing


def producer(ns, event):
    ns.value = 'This is the value'
    event.set()


def consumer(ns, event):
    try:
        print('Before event: {}'.format(ns.value))
    except Exception as err:
        print('Before event, error:', str(err))
    event.wait()
    print('After event:', ns.value)


if __name__ == '__main__':
    mgr = multiprocessing.Manager()
    namespace = mgr.Namespace()
    event = multiprocessing.Event()
    p = multiprocessing.Process(
        target=producer,
        args=(namespace, event),
    )
    c = multiprocessing.Process(
        target=consumer,
        args=(namespace, event),
    )

    c.start()
    p.start()

    c.join()
    p.join()

Cualquier valor con nombre agregado al Namespace es visible para todos los clientes que reciben la instancia Namespace.

$ python3 multiprocessing_namespaces.py

Before event, error: 'Namespace' object has no attribute 'value'
After event: This is the value

Es importante saber que las actualizaciones a los contenidos de valores mutables en el espacio de nombres no se propagan automáticamente.

multiprocessing_namespaces_mutable.py
import multiprocessing


def producer(ns, event):
    # DOES NOT UPDATE GLOBAL VALUE!
    ns.my_list.append('This is the value')
    event.set()


def consumer(ns, event):
    print('Before event:', ns.my_list)
    event.wait()
    print('After event :', ns.my_list)


if __name__ == '__main__':
    mgr = multiprocessing.Manager()
    namespace = mgr.Namespace()
    namespace.my_list = []

    event = multiprocessing.Event()
    p = multiprocessing.Process(
        target=producer,
        args=(namespace, event),
    )
    c = multiprocessing.Process(
        target=consumer,
        args=(namespace, event),
    )

    c.start()
    p.start()

    c.join()
    p.join()

Para actualizar la lista, adjúntala de nuevo al objeto de espacio de nombres.

$ python3 multiprocessing_namespaces_mutable.py

Before event: []
After event : []

Agrupación de procesos

La clase Pool se puede usar para administrar un número fijo de trabajadores para casos simples donde el trabajo a realizar puede dividirse y repartido entre trabajadores de forma independiente. Los valores de retorno de los trabajos se recogen y se devuelven como una lista. Los argumentos de la agrupación de procesos incluye el número de procesos y una función para ejecutar al iniciar proceso de la tarea (invocado una vez por proceso).

multiprocessing_pool.py
import multiprocessing


def do_calculation(data):
    return data * 2


def start_process():
    print('Starting', multiprocessing.current_process().name)


if __name__ == '__main__':
    inputs = list(range(10))
    print('Input   :', inputs)

    builtin_outputs = map(do_calculation, inputs)
    print('Built-in:', builtin_outputs)

    pool_size = multiprocessing.cpu_count() * 2
    pool = multiprocessing.Pool(
        processes=pool_size,
        initializer=start_process,
    )
    pool_outputs = pool.map(do_calculation, inputs)
    pool.close()  # no more tasks
    pool.join()  # wrap up current tasks

    print('Pool    :', pool_outputs)

El resultado del método map() es funcionalmente equivalente al map() incorporado, excepto que las tareas individuales se ejecutan en paralelo. Como la agrupación está procesando sus entradas en paralelo, close() y join() puede usarse para sincronizar el proceso principal con el proceso de la tarea para asegurar una correcta limpieza.

$ python3 multiprocessing_pool.py

Input   : [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
Built-in: <map object at 0x1007b2be0>
Starting ForkPoolWorker-3
Starting ForkPoolWorker-4
Starting ForkPoolWorker-5
Starting ForkPoolWorker-6
Starting ForkPoolWorker-1
Starting ForkPoolWorker-7
Starting ForkPoolWorker-2
Starting ForkPoolWorker-8
Pool    : [0, 2, 4, 6, 8, 10, 12, 14, 16, 18]

Por defecto, Pool crea un número fijo de procesos de trabajo y les pasa trabajos hasta que no quedan más trabajos. Configurando el parámetro maxtasksperchild le dice a la agrupación que reinicie un proceso trabajador después de que haya terminado algunas tareas, evitando que trabajadores de ejecución prolongada consuman cada vez más recursos del sistema.

multiprocessing_pool_maxtasksperchild.py
import multiprocessing


def do_calculation(data):
    return data * 2


def start_process():
    print('Starting', multiprocessing.current_process().name)


if __name__ == '__main__':
    inputs = list(range(10))
    print('Input   :', inputs)

    builtin_outputs = map(do_calculation, inputs)
    print('Built-in:', builtin_outputs)

    pool_size = multiprocessing.cpu_count() * 2
    pool = multiprocessing.Pool(
        processes=pool_size,
        initializer=start_process,
        maxtasksperchild=2,
    )
    pool_outputs = pool.map(do_calculation, inputs)
    pool.close()  # no more tasks
    pool.join()  # wrap up current tasks

    print('Pool    :', pool_outputs)

La agrupación reinicia a los trabajadores cuando han completado sus tareas asignadas, incluso si no hay más trabajo. En esta salida, se crean ocho trabajadores, aunque solo hay 10 tareas, y cada trabajador puede completar dos de ellas a la vez.

$ python3 multiprocessing_pool_maxtasksperchild.py

Input   : [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
Built-in: <map object at 0x1007b21d0>
Starting ForkPoolWorker-1
Starting ForkPoolWorker-2
Starting ForkPoolWorker-4
Starting ForkPoolWorker-5
Starting ForkPoolWorker-6
Starting ForkPoolWorker-3
Starting ForkPoolWorker-7
Starting ForkPoolWorker-8
Pool    : [0, 2, 4, 6, 8, 10, 12, 14, 16, 18]