Notas para contenedor de docker:
Comando de docker para ejecución de la nota de forma local:
nota: cambiar <ruta a mi directorio>
por la ruta de directorio que se desea mapear a /datos
dentro del contenedor de docker.
docker run --rm -v <ruta a mi directorio>:/datos --name jupyterlab_numerical -p 8888:8888 -d palmoreck/jupyterlab_numerical:1.1.0
password para jupyterlab: qwerty
Detener el contenedor de docker:
docker stop jupyterlab_numerical
Documentación de la imagen de docker palmoreck/jupyterlab_numerical:1.1.0
en liga.
Esta nota utiliza métodos vistos en 1.5.Integracion_numerica
Documentación en: multiprocessing.
La implementación más utilizada de Python, CPython no utiliza múltiples cores por default. De tarea queda leer la discusión de la liga anterior en el apartado Design sobre el Global INterpreter Lock: GIL y el por qué CPython no soporta ejecución multithreaded o multiprocesses.
El módulo multiprocessing
nos permite realizar procesamientos basados en procesos o threads para compartir trabajo y datos. Se recomienda usar este módulo para el shared memory programming (ver 2.2.Sistemas_de_memoria_compartida) y para trabajos que son demandantes de CPU. Para paralelizar trabajos demandantes en I/O no se recomienda su uso.
Otro módulo en Python para procesamiento utilizando los cores de tu máquina es concurrent.futures que provee el comportamiento principal de multiprocessing
. Ver liga y liga para más sobre concurrent.futures
y concurrent.futures
vs multiprocessing
.
Aunque en Python los threads son nativos del sistema operativo (esto es, no se simulan, son realmente threads del sistema operativo creados en el hardware), están limitados por el global interpreter lock, GIL, de modo que un sólo thread interactúe con un objeto Python en un único tiempo. Esto degrada el performance de los programas pues los threads compiten por el GIL presente en el intérprete de Python, ver Understanding the Python GIL.
Al usar el módulo multiprocessing
ejecutamos en paralelo un número de interpretadores Python (CPython), cada uno con su propio espacio de memoria privada y su propio GIL que se ejecutan en un instante (y con un thread).
Comentario: en multiprocessing
se utilizan subprocesos en lugar de threads y en lugar de fork se realiza spawn. Ver Contexts and start methods y difference between threadpool vs pool in python multiprocessing para diferencias entre fork y spawn.
En multiprocessing
los procesos son generados al utilizar la clase Process
para crear objetos y llamar al método start()
. Ver Process para documentación de ésta clase.
In [1]:
from multiprocessing import Process #importamos clase Process
In [2]:
def f():
print('hello world! de subproceso')
if __name__=='__main__':
p1 = Process(target=f)
p2 = Process(target=f)
p1.start() #start sólo puede ser llamada una vez por objeto Process
p2.start()
p1.join() #el proceso principal espera a que termine p1
p2.join() #el proceso principal espera a que termine p2
print('hello world! de proceso')
Comentario: es una buena práctica explícitamente hacer join
's para cada objeto process que realizó start
. Ver Programming guidelines para buenas prácticas.
La clase Process
recibe la función a ejecutar para cada proceso con el argumento target
y también tiene args
para los argumentos de la función:
In [3]:
def f(s):
print(s)
if __name__=='__main__':
p1 = Process(target=f, args=('hola',))
p2 = Process(target=f, args=('mundo',))
p1.start()
p2.start()
p1.join()
p2.join()
Comentarios:
if __name__=='__main__':
para que las personas que tienen Windows puedan ejecutar sin problema el código (para las personas que están en Unix pueden quitar el bloque de if
). El bloque del if
ayuda a que los subprocesos importen el módulo __main__
(por lo que no se ejecuta la sección que está dentro de if __name__=='__main__':
pues no son programas principales) y continúa la ejecución de las líneas de start
(cada subproceso ejecuta f
) y join
. Si se quita este statement por ejemplo:def f(s):
print(s)
p1 = Process(target=f, args=('hola',))
p2 = Process(target=f, args=('mundo',))
p1.start()
p2.start()
p1.join()
p2.join()
el notebook quedará bloqueado pues una celda con el código anterior creará subprocesos que a su vez crearán otros subprocesos, que a su vez crearán otros subprocesos... y así de forma recursiva.
cpu_count
para determinar el número de cores que el sistema operativo puede usar. Este número es la cantidad física o simulada (hyperthreading) de cores.
In [4]:
import multiprocessing
In [5]:
multiprocessing.cpu_count()
Out[5]:
La clase Pool
crea un conjunto (pool) de procesos tipo worker que procesarán las tareas a realizar vía funciones tipo map
o apply
. Se hace map
del input hacia los procesadores y se colecta el output de éstos. Mientras el map
se realiza, el proceso que lanzó el map se bloquea hasta que finalicen las tareas (aunque hay map_async). El output es una lista.
Obs: el procesamiento de las tareas podríamos hacerlo con la clase Process
de arriba pero tendríamos que utilizar un ciclo y colectar los resultados.
Comentario: para un gran número de tareas a realizar utilicen Pool
, para pocas tareas a realizar (pocas=menos de $10$) utilicen Process
.
In [7]:
from multiprocessing import Pool #importamos clase Pool
In [8]:
def f(dummy):
return 'hello world!'
if __name__ == '__main__':
pool = Pool(multiprocessing.cpu_count())
results = pool.map(f,range(multiprocessing.cpu_count()))
print(results)
pool.close()
pool.join()
In [9]:
def f(dummy):
return 'hello world!'
if __name__ == '__main__':
num_processes=2
pool = Pool(num_processes)
results = pool.map(f,range(num_processes))
print(results)
pool.close()
pool.join()
Con apply:
In [10]:
def f():
return 'hello world!'
if __name__ == '__main__':
num_processes=2
pool = Pool(num_processes)
results = [pool.apply(f) for x in range(num_processes)]
print(results)
pool.close()
pool.join()
Podemos usar un context manager para evitar tener líneas pool.close()
, pool.join()
In [11]:
def f(dummy):
return 'hello world!'
if __name__ == '__main__':
num_processes=2
with Pool(processes=num_processes) as pool:
results = pool.map(f,range(num_processes))
print(results)
In [12]:
def f(s):
return s
if __name__ == '__main__':
num_processes=2
with Pool(processes=num_processes) as pool:
results = pool.starmap(f,[('hola',),('mundo',)])
print(results)
De acuerdo a Programming guidelines:
Explicitly pass resources to child processes ... On Unix a child process can make use of a shared resource created in a parent process using a global resource. However, it is better to pass the object as an argument to the constructor for the child process...
In [13]:
import math
import time
from scipy.integrate import quad
In [14]:
f=lambda x: math.exp(-x**2)
a=0
b=1
n=10**6
h_hat=(b-a)/n
Forma secuencial
In [15]:
def Rcf(f, a, b, n): #Rcf: rectángulo compuesto para f
"""
Compute numerical approximation using rectangle or mid-point method in
an interval.
Nodes are generated via formula: x_i = a+(i+1/2)h_hat for i=0,1,...,n-1 and h_hat=(b-a)/n
Args:
f (lambda expression): lambda expression of integrand
a (int): left point of interval
b (int): right point of interval
n (int): number of subintervals
Returns:
Rcf (float)
"""
h_hat=(b-a)/n
suma_res=0
for i in range(0,n):
x=a+(i+1/2.0)*h_hat
suma_res+=f(x)
return h_hat*suma_res
In [16]:
start_time = time.time()
aprox=Rcf(f,a,b,n)
end_time = time.time()
In [17]:
secs = end_time-start_time
print("Rcf tomó",secs,"segundos" )
In [18]:
obj, err = quad(f, a, b)
In [19]:
def err_relativo(aprox, obj):
return math.fabs(aprox-obj)/math.fabs(obj) #obsérvese el uso de la librería math
In [20]:
err_relativo(aprox,obj)
Out[20]:
La flag -o
nos permite guardar el output de timeit
.
In [21]:
Rcf_secuencial_timeit = %timeit -n 5 -r 10 -o Rcf(f,a,b,n)
Forma en paralelo
In [22]:
p=multiprocessing.cpu_count() #número de procesos
ns_p=int(n/p) #número de subintervalos por proceso
#se asume que n es divisible por p
#si no se cumple esto, se puede definir
#ns_p=int(n/p) habiendo definido n primero
#y redefinir n como:
#n=p*ns_p
In [23]:
print("número de subintervalos:",n)
In [24]:
print("número de subintervalos por proceso:",ns_p)
In [25]:
def Rcf_parallel(mi_id):
begin=mi_id*ns_p
end=begin+ns_p
h_hat=(b-a)/n
suma_res=0
for i in range(begin,end):
x=a+(i+1/2.0)*h_hat
suma_res+=f(x)
return suma_res
if __name__ == '__main__':
start_time=time.time()
with Pool(processes=p) as pool:
results = pool.map(Rcf_parallel,range(p))
aprox=h_hat*sum(results)
end_time=time.time()
In [26]:
secs = end_time-start_time
print("Rcf_parallel tomó",secs,"segundos" )
In [27]:
err_relativo(aprox,obj)
Out[27]:
In [28]:
%%timeit -n 5 -r 10 -o
if __name__ == '__main__':
with Pool(processes=p) as pool:
results = pool.map(Rcf_parallel,range(p))
aprox=h_hat*sum(results)
Out[28]:
In [29]:
Rcf_parallel_timeit=_
In [30]:
Rcf_parallel_timeit.average
Out[30]:
Las funciones definidas con lambda
no son picklable
. Utilizamos la forma def f(x):
siguiente para definir a la función.
In [31]:
def f(x):
return math.exp(-x**2)
In [32]:
def Rcf_parallel2(t):
f,a,b,mi_id,n,ns_p = t
begin=mi_id*ns_p
end=begin+ns_p
h_hat=(b-a)/n
suma_res=0
for i in range(begin,end):
x=a+(i+1/2.0)*h_hat
suma_res+=f(x)
return suma_res
if __name__ == '__main__':
start_time=time.time()
it=((f,a,b,k,n,ns_p) for k in range(p))
h_hat=(b-a)/n
with Pool(processes=p) as pool:
results = pool.map(Rcf_parallel2,it)
aprox=h_hat*sum(results)
end_time=time.time()
In [33]:
secs = end_time-start_time
print("Rcf_parallel2 tomó",secs,"segundos" )
In [34]:
err_relativo(aprox,obj)
Out[34]:
In [35]:
%%timeit -n 5 -r 10 -o
if __name__ == '__main__':
it=((f,a,b,k,n,ns_p) for k in range(p))
with Pool(processes=p) as pool:
results = pool.map(Rcf_parallel2,it)
aprox=h_hat*sum(results)
Out[35]:
In [36]:
Rcf_parallel2_timeit=_
In [37]:
Rcf_parallel2_timeit.average
Out[37]:
In [38]:
def Rcf_parallel3(t):
f,a,b,mi_id,n,ns_p = t
begin=mi_id*ns_p
end=begin+ns_p
h_hat=(b-a)/n
sum_res=0
f_nodes=(f(a+(i+1/2.0)*h_hat) for i in range(begin,end))
suma_res=sum(f_nodes)
return suma_res
if __name__ == '__main__':
start_time=time.time()
it=((f,a,b,k,n,ns_p) for k in range(p))
with Pool(processes=p) as pool:
results = pool.map(Rcf_parallel3,it)
aprox=h_hat*sum(results)
end_time=time.time()
In [39]:
secs = end_time-start_time
print("Rcf_parallel3 tomó",secs,"segundos" )
In [40]:
err_relativo(aprox,obj)
Out[40]:
In [41]:
%%timeit -n 5 -r 10 -o
if __name__ == '__main__':
it=((f,a,b,k,n,ns_p) for k in range(p))
with Pool(processes=p) as pool:
results = pool.map(Rcf_parallel3,it)
aprox=h_hat*sum(results)
Out[41]:
In [42]:
Rcf_parallel3_timeit=_
In [43]:
Rcf_parallel3_timeit.average
Out[43]:
En la siguiente propuesta, el proceso principal realiza todas las sumas de la evaluación de los subprocesos:
In [44]:
def Rcf_parallel4(t):
f,i,a,b,h_hat = t
f_nodes=f(a+(i+1/2)*h_hat)
return f_nodes
if __name__ == '__main__':
start_time=time.time()
h_hat=(b-a)/n
it=((f,i,a,b,h_hat) for i in range(n))
with Pool(processes=p) as pool:
results = pool.map(Rcf_parallel4,it)
suma_res=sum(results)
aprox=h_hat*suma_res
end_time=time.time()
In [45]:
secs = end_time-start_time
print("Rcf_parallel4 tomó",secs,"segundos" )
In [46]:
err_relativo(aprox,obj)
Out[46]:
In [47]:
%%timeit -n 5 -r 10 -o
if __name__ == '__main__':
it=((f,i,a,b,h_hat) for i in range(n))
with Pool(processes=p) as pool:
results = pool.map(Rcf_parallel4,it)
suma_res=sum(results)
aprox=h_hat*suma_res
Out[47]:
In [48]:
Rcf_parallel4_timeit=_
In [49]:
Rcf_parallel4_timeit.average
Out[49]:
En la siguiente propuesta usamos threads en lugar de subprocesos:
In [50]:
from multiprocessing.pool import ThreadPool
In [51]:
def Rcf_parallel_threads_mp(t):
f,a,b,mi_id,n,ns_p = t
begin=mi_id*ns_p
end=begin+ns_p
h_hat=(b-a)/n
suma_res=0
for i in range(begin,end):
x=a+(i+1/2.0)*h_hat
suma_res+=f(x)
return suma_res
if __name__ == '__main__':
start_time=time.time()
it=((f,a,b,k,n,ns_p) for k in range(p))
h_hat=(b-a)/n
with ThreadPool(processes=p) as pool:
results = pool.map(Rcf_parallel_threads_mp,it)
aprox=h_hat*sum(results)
end_time=time.time()
In [52]:
secs = end_time-start_time
print("Rcf_parallel_threads_mp tomó",secs,"segundos" )
In [53]:
err_relativo(aprox,obj)
Out[53]:
In [54]:
%%timeit -n 5 -r 10 -o
if __name__ == '__main__':
it=((f,a,b,k,n,ns_p) for k in range(p))
with ThreadPool(processes=p) as pool:
results = pool.map(Rcf_parallel_threads_mp,it)
aprox=h_hat*sum(results)
Out[54]:
In [55]:
Rcf_parallel_threads_mp=_
In [56]:
Rcf_parallel_threads_mp.average
Out[56]:
Obsérvese que es más lenta la ejecución usando threads que processes de multiprocessing:
Para problemas altos en I/O se recomienda el uso de threads y no de processes de multiprocessing. Para problemas CPU bound se recomienda el uso de processes y no de threads.
Nota: los siguientes resultados se obtuvieron con una máquina con 8 cores, así que pueden no coincidir con los resultados previos de esta sección.
In [49]:
import matplotlib.pyplot as plt
In [50]:
(Rcf_secuencial_timeit.average,Rcf_parallel_timeit.average,
Rcf_parallel2_timeit.average,Rcf_parallel3_timeit.average,
Rcf_parallel4_timeit.average)
Out[50]:
El mejor tiempo lo tiene Rcf_parallel2
con:
In [51]:
p
Out[51]:
procesos.
Usaremos ésta implementación para realizar la gráfica variando el número de procesos de $1$ hasta multiprocessing.cpu_count()
In [54]:
multiprocessing.cpu_count()
Out[54]:
In [55]:
err_np=[]
n_cpus=[]
In [56]:
n=10**6 #cambiar n si se desea
h_hat=(b-a)/n
In [57]:
def mifun(p,ns_p):
it=((f,a,b,k,n,ns_p) for k in range(p))
with Pool(processes=p) as pool:
results = pool.map(Rcf_parallel2,it)
aprox=h_hat*sum(results)
return err_relativo(aprox,obj)
In [58]:
for p in range(1,multiprocessing.cpu_count()+1):
if n%p==0:
ns_p=int(n/p)
err_np.append(mifun(p,ns_p))
n_cpus.append(p)
In [59]:
err_np
Out[59]:
In [60]:
n_cpus
Out[60]:
In [61]:
l=[]
n_cpus=[]
In [62]:
for p in range(1,multiprocessing.cpu_count()+1):
if n%p==0:
ns_p=int(n/p)
resultado_timeit=%timeit -n 5 -r 10 -o mifun(p,ns_p)
l.append(resultado_timeit.average)
n_cpus.append(p)
In [63]:
n_cpus
Out[63]:
In [64]:
l
Out[64]:
In [67]:
plt.plot(n_cpus,l,'o-')
plt.title('Gráfica num cpus vs tiempo')
plt.xlabel('num cpus')
plt.ylabel('tiempo')
plt.grid()
plt.show()
Ejercicio: elegir regla de Simpson o integración por el método de Monte Carlo para generar la gráfica anterior. No olviden medir errores relativos. Tales reglas están en 1.5.Integracion_numerica.
Si eligieron regla de Simpson:
Medir tiempo de las $4$ versiones, por ejemplo, para el caso del rectángulo se tienen Rcf, Rcf_parallel, Rcf_parallel2,Rcf_parallel3,Rcf_parallel4.
Elegir la mejor versión en medición del tiempo de ejecución y variar el número de procesadores. Aquí se miden errores relativos y se hace la gráfica de número de cpus vs tiempo.
Referencias
M. Gorelick, I. Ozsvald, High Performance Python, O'Reilly Media, 2014.