Introducción

Cuando implementamos programas para tratamiento y análisis de datos, uno de los requisitos que surge con gran frecuencia es cómo podemos paralelizar la ejecución de tareas que tengan poca o ninguna dependencia entre sí. Si bien es cierto que plataformas como Hadoop resuelven este problema escalando el tratamiento de datos en un cluster de varias máquinas, no siempre se puede tener acceso a ese tipo de recursos. Por otro lado, en ocasiones una implementación más sencilla puede ayudarnos a ahorrar tiempo, paralelizando ciertas tareas sin gran esfuerzo.

En este contexto, surge la utilidad del módulo multiprocessing en Python [1]. Este módulo nos ofrece una interfaz sencilla para la ejecución de múltiples procesos en paralelo, siempre y cuando nuestro equipo cuente con más de una unidad de procesamiento (como resulta habitual en el hardware moderno). El catálogo de métodos que incluye la API de multiprocessing se diseñó para ser muy similar al que ofrece el módulo threading.

Primer ejemplo

Veamos un primer ejemplo sencillo de cómo podemos crear múltiples procesos con esta fórmula.


In [6]:
import multiprocessing as mp

def worker(num):
    """
    Método que realizará el trabajo, clonándose en cada nuevo proceso
    """
    name = mp.current_process().name
    print "Comenzando proceso: %s" % name
    print "Worker número: %s" % num
    print "Finalizando proceso: %s" % name
    
    
if __name__ == "__main__":
    # Definición y puesta en marcha de subprocesos
    # Mantenemos un listado de referencias a los procesos creados
    workers = []
    for i in range(5):
        p = mp.Process(name="Proceso-"+unicode(i), target=worker, args=(i,) )
        workers.append(p)
        workers[i].start()
        
    # Esperar a que finalicen todos los subprocesos
    for p in workers:
        p.join()


Comenzando proceso: Proceso-0
Comenzando proceso: Proceso-1
Comenzando proceso: Proceso-3
Comenzando proceso: Proceso-2
Comenzando proceso: Proceso-4
Worker número: 0
Worker número: 1
Worker número: 3
Worker número: 2
Worker número: 4
Finalizando proceso: Proceso-0
Finalizando proceso: Proceso-1
Finalizando proceso: Proceso-3
Finalizando proceso: Proceso-2
Finalizando proceso: Proceso-4

Ejercicio 1

El método p.is_alive() permite comprobar si un proceso continúa vivo o no. Modifique el ejemplo anterior para comprobar que mientras los subprocesos que hemos creado siguen activos el proceso padre puede recuperar el estado de ejecución de cada proceso.

Servicio de logging

Además de las funciones básicas de creación, terminación, configuración y espera de la finalización de procesos, tenemos a nuestra disposición otras funciones muy útiles tales como un servicio centralizado de logging para todos nuestros subprocesos. Veamos un ejemplo.


In [2]:
import multiprocessing as mp
import logging
import time

def worker(num):
    """
    Método que realizará el trabajo, clonándose en cada nuevo proceso
    """
    name = mp.current_process().name
    print "Comenzando proceso: %s" % name
    time.sleep(2)  # Cada subproceso esperará 2 segs.
    print "Finalizando proceso: %s" % name
    
    
if __name__ == "__main__":
    # Definición y activación del logger centralizado
    mp.log_to_stderr()
    logger = mp.get_logger()
    logger.setLevel(logging.INFO)
    
    # Definición y puesta en marcha de subprocesos
    # Mantenemos un listado de referencias a los procesos creados
    workers = []
    for i in range(5):
        p = mp.Process(name="Proceso-"+unicode(i), target=worker, args=(i,) )
        workers.append(p)
        workers[i].start()
        
    # Esperar a que finalicen todos los subprocesos
    for p in workers:
        p.join()


Comenzando proceso: Proceso-1
Comenzando proceso: Proceso-0
Comenzando proceso: Proceso-3
Comenzando proceso: Proceso-4
Comenzando proceso: Proceso-2
Finalizando proceso: Proceso-1Finalizando proceso: Proceso-0Finalizando proceso: Proceso-3Finalizando proceso: Proceso-4Finalizando proceso: Proceso-2




[INFO/Proceso-0] child process calling self.run()
[INFO/Proceso-2] child process calling self.run()
[INFO/Proceso-1] child process calling self.run()
[INFO/Proceso-3] child process calling self.run()
[INFO/Proceso-4] child process calling self.run()
[INFO/Proceso-0] child process calling self.run()
[INFO/Proceso-2] child process calling self.run()
[INFO/Proceso-1] child process calling self.run()
[INFO/Proceso-3] child process calling self.run()
[INFO/Proceso-4] child process calling self.run()
[INFO/Proceso-0] process shutting down
[INFO/Proceso-2] process shutting down
[INFO/Proceso-1] process shutting down
[INFO/Proceso-3] process shutting down
[INFO/Proceso-4] process shutting down
[INFO/Proceso-0] process shutting down
[INFO/Proceso-2] process shutting down
[INFO/Proceso-1] process shutting down
[INFO/Proceso-3] process shutting down
[INFO/Proceso-4] process shutting down
[INFO/Proceso-0] process exiting with exitcode 0
[INFO/Proceso-2] process exiting with exitcode 0
[INFO/Proceso-1] process exiting with exitcode 0
[INFO/Proceso-3] process exiting with exitcode 0
[INFO/Proceso-4] process exiting with exitcode 0
[INFO/Proceso-0] process exiting with exitcode 0
[INFO/Proceso-2] process exiting with exitcode 0
[INFO/Proceso-1] process exiting with exitcode 0
[INFO/Proceso-3] process exiting with exitcode 0
[INFO/Proceso-4] process exiting with exitcode 0

Pool de procesos

En aquellos casos en los que los procesos pueden realizar tareas completamente independientes entre sí, podemos optar por utilizar un Pool de procesos.


In [5]:
import multiprocessing as mp
import math

def do_calculation(data):
    return math.sqrt(data)

def initialize():
    print 'Comenzando', mp.current_process().name

if __name__ == '__main__':
    logger.setLevel(logging.NOTSET)  # Disable previous logger
    inputs = [i ** 2 for i in list(range(10))]
    print 'Entrada   :', inputs
    
    # Salida del método 'map' de la biblioteca standard de Python
    builtin_outputs = map(do_calculation, inputs)
    print 'Salida map: ', builtin_outputs
    
    # Versión con multiprocessing.Pool
    pool_size = mp.cpu_count() * 2  # Calculamos tamaño del pool en función del num. de cores
    pool = mp.Pool(processes=pool_size,
                   initializer=initialize,
                   )
    pool_outputs = pool.map(do_calculation, inputs)
    pool.close()  # fin de las tareas
    pool.join()   # esperamos finalización de procesos

    print 'Salida pool: ', pool_outputs


Entrada   : [0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
Salida map:  [0.0, 1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, 9.0]
Comenzando PoolWorker-27
Comenzando PoolWorker-28
Salida pool:  [0.0, 1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, 9.0]
Comenzando PoolWorker-30
Comenzando PoolWorker-32
Comenzando PoolWorker-29
Comenzando PoolWorker-31
Comenzando PoolWorker-34
Comenzando PoolWorker-33

Pipelining de datos

Un patrón de programación muy común es el llamado fan out/fan in, también conocido como pipelining de datos. En este esquema, un proceso va generando bloques de datos que se pueden procesar independientemente, y los envía a un grupo de procesos intermedios (workers) para su procesado. Finalmente, una vez completado el procesado los workers envían los resultados a uno o varios procesos que los consolidan y envían de vuelta al programa principal.

Un posible esquema para implementar esta estrategia sería el siguiente [2, 3].

Productor


In [7]:
import multiprocessing as mp

class ProducerProcess(mp.Process):
    """Genera items que se colocan en una cola (Queue),
    esperando ser procesados.
    
    El "target" debe ser una función generadora que produzca items
    serializables.
    """
    def __init__(self, group=None, target=None, name=None, args=None,
                 kwargs=None, output_queue=None, consumers=0):
        super(ProducerProcess, self).__init__(name=name)
        self.target= target
        self.args= args if args is not None else []
        self.kwargs= kwargs if kwargs is not None else {}
        self.output_queue= output_queue
        self.consumers= consumers
    
    def run(self):
        target= self.target
        for item in target(*self.args, **self.kwargs):
            self.output_queue.put(item)
        for x in range(self.consumers):
            # Insertar marcas de parada para los procesos consumidores
            self.output_queue.put(None)
        self.output_queue.close()

Worker (procesador intermedio)


In [9]:
import multiprocessing as mp

class ConsumerProducerProcess( mp.Process ):
    """Consume items de una cola, los procesa y los coloca en
    otra cola para su consolidación final por parte de los consumidores.
    
    El "target" debe ser una función generadora que cree items
    serializables y que espere un iterable como su único argumento. 
    Por tanto, los argumentos 'args' no se emplean aquí.
    """
    def __init__(self, group=None, target=None, name=None, kwargs=None,
                 input_queue=None, producers=0, output_queue=None, consumers=0):
        super(ConsumerProducerProcess, self).__init__(name=name)
        self.target= target
        self.kwargs= kwargs if kwargs is not None else {}
        self.input_queue= input_queue
        self.producers= producers
        self.output_queue= output_queue
        self.consumers= consumers
    
    def items(self):
        while self.producers != 0:
            for item in iter(self.input_queue.get, None):
                yield item
            self.producers -= 1
    
    def run(self):
        target= self.target
        for item in target(self.items(), **self.kwargs):
            self.output_queue.put(item)
        for x in range(self.consumers):
            # Insertamos marca de finalización para todos los consumidores
            self.output_queue.put(None)
        self.output_queue.close()

Consumidor


In [10]:
import multiprocessing as mp

class ConsumerProcess(mp.Process):
    """Cosume items de una cola (Queue).
    
    El "target" debe ser un método que espere un iterable como único
    argumento. Los argumentos 'args' no se emplean.
    """
    def __init__(self, group=None, target=None, name=None, kwargs=None,
                 input_queue=None, producers=0):
        super(ConsumerProcess, self).__init__(name=name)
        self.target= target
        self.kwargs= kwargs if kwargs is not None else {}
        self.input_queue= input_queue
        self.producers= producers
    
    def items(self):
        while self.producers != 0:
            for item in iter(self.input_queue.get, None):
                yield item
            self.producers -= 1
    
    def run(self):
        target= self.target
        target(self.items(), **self.kwargs)

Referencias

  1. Doug Hellmann. Multiprocessing-PyMOTW. http://pymotw.com/2/multiprocessing/ (Último acceso marzo 2015).
  2. Steven F. Lott. Multiprocessing goodness, part I. http://slott-softwarearchitect.blogspot.com.es/2012/02/multiprocessing-goodness-part-1-use.html (Último acceso marzo 2015).
  3. Steven F. Lott. Multiprocessing goodness, part II. http://slott-softwarearchitect.blogspot.com.es/2012/02/multiprocessing-goodness-part-2-class.html (Último acceso marzo 2015).

In [ ]: