Cuando implementamos programas para tratamiento y análisis de datos, uno de los requisitos que surge con gran frecuencia es cómo podemos paralelizar la ejecución de tareas que tengan poca o ninguna dependencia entre sí. Si bien es cierto que plataformas como Hadoop resuelven este problema escalando el tratamiento de datos en un cluster de varias máquinas, no siempre se puede tener acceso a ese tipo de recursos. Por otro lado, en ocasiones una implementación más sencilla puede ayudarnos a ahorrar tiempo, paralelizando ciertas tareas sin gran esfuerzo.
En este contexto, surge la utilidad del módulo multiprocessing en Python [1]. Este módulo nos ofrece una interfaz sencilla para la ejecución de múltiples procesos en paralelo, siempre y cuando nuestro equipo cuente con más de una unidad de procesamiento (como resulta habitual en el hardware moderno). El catálogo de métodos que incluye la API de multiprocessing se diseñó para ser muy similar al que ofrece el módulo threading.
In [6]:
import multiprocessing as mp
def worker(num):
"""
Método que realizará el trabajo, clonándose en cada nuevo proceso
"""
name = mp.current_process().name
print "Comenzando proceso: %s" % name
print "Worker número: %s" % num
print "Finalizando proceso: %s" % name
if __name__ == "__main__":
# Definición y puesta en marcha de subprocesos
# Mantenemos un listado de referencias a los procesos creados
workers = []
for i in range(5):
p = mp.Process(name="Proceso-"+unicode(i), target=worker, args=(i,) )
workers.append(p)
workers[i].start()
# Esperar a que finalicen todos los subprocesos
for p in workers:
p.join()
In [2]:
import multiprocessing as mp
import logging
import time
def worker(num):
"""
Método que realizará el trabajo, clonándose en cada nuevo proceso
"""
name = mp.current_process().name
print "Comenzando proceso: %s" % name
time.sleep(2) # Cada subproceso esperará 2 segs.
print "Finalizando proceso: %s" % name
if __name__ == "__main__":
# Definición y activación del logger centralizado
mp.log_to_stderr()
logger = mp.get_logger()
logger.setLevel(logging.INFO)
# Definición y puesta en marcha de subprocesos
# Mantenemos un listado de referencias a los procesos creados
workers = []
for i in range(5):
p = mp.Process(name="Proceso-"+unicode(i), target=worker, args=(i,) )
workers.append(p)
workers[i].start()
# Esperar a que finalicen todos los subprocesos
for p in workers:
p.join()
In [5]:
import multiprocessing as mp
import math
def do_calculation(data):
return math.sqrt(data)
def initialize():
print 'Comenzando', mp.current_process().name
if __name__ == '__main__':
logger.setLevel(logging.NOTSET) # Disable previous logger
inputs = [i ** 2 for i in list(range(10))]
print 'Entrada :', inputs
# Salida del método 'map' de la biblioteca standard de Python
builtin_outputs = map(do_calculation, inputs)
print 'Salida map: ', builtin_outputs
# Versión con multiprocessing.Pool
pool_size = mp.cpu_count() * 2 # Calculamos tamaño del pool en función del num. de cores
pool = mp.Pool(processes=pool_size,
initializer=initialize,
)
pool_outputs = pool.map(do_calculation, inputs)
pool.close() # fin de las tareas
pool.join() # esperamos finalización de procesos
print 'Salida pool: ', pool_outputs
Un patrón de programación muy común es el llamado fan out/fan in, también conocido como pipelining de datos. En este esquema, un proceso va generando bloques de datos que se pueden procesar independientemente, y los envía a un grupo de procesos intermedios (workers) para su procesado. Finalmente, una vez completado el procesado los workers envían los resultados a uno o varios procesos que los consolidan y envían de vuelta al programa principal.
Un posible esquema para implementar esta estrategia sería el siguiente [2, 3].
In [7]:
import multiprocessing as mp
class ProducerProcess(mp.Process):
"""Genera items que se colocan en una cola (Queue),
esperando ser procesados.
El "target" debe ser una función generadora que produzca items
serializables.
"""
def __init__(self, group=None, target=None, name=None, args=None,
kwargs=None, output_queue=None, consumers=0):
super(ProducerProcess, self).__init__(name=name)
self.target= target
self.args= args if args is not None else []
self.kwargs= kwargs if kwargs is not None else {}
self.output_queue= output_queue
self.consumers= consumers
def run(self):
target= self.target
for item in target(*self.args, **self.kwargs):
self.output_queue.put(item)
for x in range(self.consumers):
# Insertar marcas de parada para los procesos consumidores
self.output_queue.put(None)
self.output_queue.close()
In [9]:
import multiprocessing as mp
class ConsumerProducerProcess( mp.Process ):
"""Consume items de una cola, los procesa y los coloca en
otra cola para su consolidación final por parte de los consumidores.
El "target" debe ser una función generadora que cree items
serializables y que espere un iterable como su único argumento.
Por tanto, los argumentos 'args' no se emplean aquí.
"""
def __init__(self, group=None, target=None, name=None, kwargs=None,
input_queue=None, producers=0, output_queue=None, consumers=0):
super(ConsumerProducerProcess, self).__init__(name=name)
self.target= target
self.kwargs= kwargs if kwargs is not None else {}
self.input_queue= input_queue
self.producers= producers
self.output_queue= output_queue
self.consumers= consumers
def items(self):
while self.producers != 0:
for item in iter(self.input_queue.get, None):
yield item
self.producers -= 1
def run(self):
target= self.target
for item in target(self.items(), **self.kwargs):
self.output_queue.put(item)
for x in range(self.consumers):
# Insertamos marca de finalización para todos los consumidores
self.output_queue.put(None)
self.output_queue.close()
In [10]:
import multiprocessing as mp
class ConsumerProcess(mp.Process):
"""Cosume items de una cola (Queue).
El "target" debe ser un método que espere un iterable como único
argumento. Los argumentos 'args' no se emplean.
"""
def __init__(self, group=None, target=None, name=None, kwargs=None,
input_queue=None, producers=0):
super(ConsumerProcess, self).__init__(name=name)
self.target= target
self.kwargs= kwargs if kwargs is not None else {}
self.input_queue= input_queue
self.producers= producers
def items(self):
while self.producers != 0:
for item in iter(self.input_queue.get, None):
yield item
self.producers -= 1
def run(self):
target= self.target
target(self.items(), **self.kwargs)
In [ ]: