Notas para contenedor de docker:

Comando de docker para ejecución de la nota de forma local:

nota: cambiar <ruta a mi directorio> por la ruta de directorio que se desea mapear a /datos dentro del contenedor de docker.

docker run --rm -v <ruta a mi directorio>:/datos --name jupyterlab_numerical -p 8888:8888 -d palmoreck/jupyterlab_numerical:1.1.0

password para jupyterlab: qwerty

Detener el contenedor de docker:

docker stop jupyterlab_numerical

Documentación de la imagen de docker palmoreck/jupyterlab_numerical:1.1.0 en liga.


Esta nota utiliza métodos vistos en 1.5.Integracion_numerica

El módulo de multiprocessing

Documentación en: multiprocessing.

La implementación más utilizada de Python, CPython no utiliza múltiples cores por default. De tarea queda leer la discusión de la liga anterior en el apartado Design sobre el Global INterpreter Lock: GIL y el por qué CPython no soporta ejecución multithreaded o multiprocesses.

El módulo multiprocessing nos permite realizar procesamientos basados en procesos o threads para compartir trabajo y datos. Se recomienda usar este módulo para el shared memory programming (ver 2.2.Sistemas_de_memoria_compartida) y para trabajos que son demandantes de CPU. Para paralelizar trabajos demandantes en I/O no se recomienda su uso.

Otro módulo en Python para procesamiento utilizando los cores de tu máquina es concurrent.futures que provee el comportamiento principal de multiprocessing. Ver liga y liga para más sobre concurrent.futures y concurrent.futures vs multiprocessing.

Nota sobre el GIL y multiprocessing

Aunque en Python los threads son nativos del sistema operativo (esto es, no se simulan, son realmente threads del sistema operativo creados en el hardware), están limitados por el global interpreter lock, GIL, de modo que un sólo thread interactúe con un objeto Python en un único tiempo. Esto degrada el performance de los programas pues los threads compiten por el GIL presente en el intérprete de Python, ver Understanding the Python GIL.

Al usar el módulo multiprocessing ejecutamos en paralelo un número de interpretadores Python (CPython), cada uno con su propio espacio de memoria privada y su propio GIL que se ejecutan en un instante (y con un thread).

Comentario: en multiprocessing se utilizan subprocesos en lugar de threads y en lugar de fork se realiza spawn. Ver Contexts and start methods y difference between threadpool vs pool in python multiprocessing para diferencias entre fork y spawn.

Ejemplos

Hello world!

En multiprocessing los procesos son generados al utilizar la clase Process para crear objetos y llamar al método start(). Ver Process para documentación de ésta clase.


In [1]:
from multiprocessing import Process #importamos clase Process

In [2]:
def f():
    print('hello world! de subproceso')
    
if __name__=='__main__':
    p1 = Process(target=f)
    p2 = Process(target=f)
    p1.start() #start sólo puede ser llamada una vez por objeto Process
    p2.start() 
    p1.join() #el proceso principal espera a que termine p1
    p2.join() #el proceso principal espera a que termine p2
    print('hello world! de proceso')


hello world! de subproceso
hello world! de subproceso
hello world! de proceso

Comentario: es una buena práctica explícitamente hacer join's para cada objeto process que realizó start. Ver Programming guidelines para buenas prácticas.

La clase Process recibe la función a ejecutar para cada proceso con el argumento target y también tiene args para los argumentos de la función:


In [3]:
def f(s):
    print(s)
    
if __name__=='__main__':
    p1 = Process(target=f, args=('hola',))
    p2 = Process(target=f, args=('mundo',)) 
    p1.start()
    p2.start()
    p1.join()
    p2.join()


hola
mundo

Comentarios:

  • De acuerdo a Programming guidelines se usa if __name__=='__main__': para que las personas que tienen Windows puedan ejecutar sin problema el código (para las personas que están en Unix pueden quitar el bloque de if). El bloque del if ayuda a que los subprocesos importen el módulo __main__ (por lo que no se ejecuta la sección que está dentro de if __name__=='__main__': pues no son programas principales) y continúa la ejecución de las líneas de start (cada subproceso ejecuta f) y join. Si se quita este statement por ejemplo:
def f(s):
    print(s)
p1 = Process(target=f, args=('hola',))
p2 = Process(target=f, args=('mundo',)) 
p1.start()
p2.start()
p1.join()
p2.join()

el notebook quedará bloqueado pues una celda con el código anterior creará subprocesos que a su vez crearán otros subprocesos, que a su vez crearán otros subprocesos... y así de forma recursiva.

  • En multiprocessing tenemos la función cpu_count para determinar el número de cores que el sistema operativo puede usar. Este número es la cantidad física o simulada (hyperthreading) de cores.

In [4]:
import multiprocessing

In [5]:
multiprocessing.cpu_count()


Out[5]:
2

Pool of workers, ver Using a pool of workers

La clase Pool crea un conjunto (pool) de procesos tipo worker que procesarán las tareas a realizar vía funciones tipo map o apply. Se hace map del input hacia los procesadores y se colecta el output de éstos. Mientras el map se realiza, el proceso que lanzó el map se bloquea hasta que finalicen las tareas (aunque hay map_async). El output es una lista.

Obs: el procesamiento de las tareas podríamos hacerlo con la clase Process de arriba pero tendríamos que utilizar un ciclo y colectar los resultados.

Comentario: para un gran número de tareas a realizar utilicen Pool, para pocas tareas a realizar (pocas=menos de $10$) utilicen Process.


In [7]:
from multiprocessing import Pool #importamos clase Pool

1) Hello world!


In [8]:
def f(dummy):
    return 'hello world!'
    
if __name__ == '__main__':
    pool = Pool(multiprocessing.cpu_count())
    results = pool.map(f,range(multiprocessing.cpu_count()))
    print(results)
    pool.close()    
    pool.join()


['hello world!', 'hello world!']

In [9]:
def f(dummy):
    return 'hello world!'
    
if __name__ == '__main__':
    num_processes=2
    pool = Pool(num_processes)
    results = pool.map(f,range(num_processes))
    print(results)
    pool.close()    
    pool.join()


['hello world!', 'hello world!']

Con apply:


In [10]:
def f():
    return 'hello world!'
    
if __name__ == '__main__':
    num_processes=2
    pool = Pool(num_processes)
    results = [pool.apply(f) for x in range(num_processes)]
    print(results)
    pool.close()    
    pool.join()


['hello world!', 'hello world!']

Podemos usar un context manager para evitar tener líneas pool.close(), pool.join()


In [11]:
def f(dummy):
    return 'hello world!'
    
if __name__ == '__main__':
    num_processes=2
    with Pool(processes=num_processes) as pool:
        results = pool.map(f,range(num_processes))
        print(results)


['hello world!', 'hello world!']

2) Pasar múltiples argumentos vía starmap


In [12]:
def f(s):
    return s
    
if __name__ == '__main__':
    num_processes=2
    with Pool(processes=num_processes) as pool:
        results = pool.starmap(f,[('hola',),('mundo',)])
        print(results)


['hola', 'mundo']

De acuerdo a Programming guidelines:

Explicitly pass resources to child processes ... On Unix a child process can make use of a shared resource created in a parent process using a global resource. However, it is better to pass the object as an argument to the constructor for the child process...

3) Regla compuesta del rectángulo


In [13]:
import math
import time
from scipy.integrate import quad

In [14]:
f=lambda x: math.exp(-x**2)
a=0
b=1
n=10**6
h_hat=(b-a)/n

Forma secuencial


In [15]:
def Rcf(f, a, b, n): #Rcf: rectángulo compuesto para f
    """
    Compute numerical approximation using rectangle or mid-point method in 
    an interval.
    Nodes are generated via formula: x_i = a+(i+1/2)h_hat for i=0,1,...,n-1 and h_hat=(b-a)/n
    Args:
        f (lambda expression): lambda expression of integrand
        a (int): left point of interval
        b (int): right point of interval
        n (int): number of subintervals
    Returns:
        Rcf (float) 
    """
    h_hat=(b-a)/n
    suma_res=0
    for i in range(0,n):
        x=a+(i+1/2.0)*h_hat
        suma_res+=f(x)
    return h_hat*suma_res

In [16]:
start_time = time.time()
aprox=Rcf(f,a,b,n)
end_time = time.time()

In [17]:
secs = end_time-start_time
print("Rcf tomó",secs,"segundos" )


Rcf tomó 0.3380098342895508 segundos

In [18]:
obj, err = quad(f, a, b)

In [19]:
def err_relativo(aprox, obj):
    return math.fabs(aprox-obj)/math.fabs(obj) #obsérvese el uso de la librería math

In [20]:
err_relativo(aprox,obj)


Out[20]:
6.71939731300312e-14

La flag -o nos permite guardar el output de timeit.


In [21]:
Rcf_secuencial_timeit = %timeit -n 5 -r 10 -o Rcf(f,a,b,n)


329 ms ± 16.5 ms per loop (mean ± std. dev. of 10 runs, 5 loops each)

Forma en paralelo


In [22]:
p=multiprocessing.cpu_count() #número de procesos
ns_p=int(n/p) #número de subintervalos por proceso
              #se asume que n es divisible por p
              #si no se cumple esto, se puede definir 
              #ns_p=int(n/p) habiendo definido n primero
              #y redefinir n como: 
              #n=p*ns_p

In [23]:
print("número de subintervalos:",n)


número de subintervalos: 1000000

In [24]:
print("número de subintervalos por proceso:",ns_p)


número de subintervalos por proceso: 500000

In [25]:
def Rcf_parallel(mi_id):
    begin=mi_id*ns_p
    end=begin+ns_p
    h_hat=(b-a)/n
    suma_res=0
    for i in range(begin,end):
        x=a+(i+1/2.0)*h_hat
        suma_res+=f(x)
    return suma_res
if __name__ == '__main__':
    start_time=time.time()
    with Pool(processes=p) as pool:
        results = pool.map(Rcf_parallel,range(p))
        aprox=h_hat*sum(results)
    end_time=time.time()

In [26]:
secs = end_time-start_time
print("Rcf_parallel tomó",secs,"segundos" )


Rcf_parallel tomó 0.23510241508483887 segundos

In [27]:
err_relativo(aprox,obj)


Out[27]:
5.842307840730588e-14

In [28]:
%%timeit -n 5 -r 10 -o
if __name__ == '__main__':
    with Pool(processes=p) as pool:
        results = pool.map(Rcf_parallel,range(p))
        aprox=h_hat*sum(results)


241 ms ± 19.1 ms per loop (mean ± std. dev. of 10 runs, 5 loops each)
Out[28]:
<TimeitResult : 241 ms ± 19.1 ms per loop (mean ± std. dev. of 10 runs, 5 loops each)>

In [29]:
Rcf_parallel_timeit=_

In [30]:
Rcf_parallel_timeit.average


Out[30]:
0.2410215291600025

4) Ejemplo para pasar múltiples parámetros a una función vía un generator.

Las funciones definidas con lambda no son picklable. Utilizamos la forma def f(x): siguiente para definir a la función.


In [31]:
def f(x):
    return math.exp(-x**2)

In [32]:
def Rcf_parallel2(t):
    f,a,b,mi_id,n,ns_p = t
    begin=mi_id*ns_p
    end=begin+ns_p
    h_hat=(b-a)/n
    suma_res=0
    for i in range(begin,end):
        x=a+(i+1/2.0)*h_hat
        suma_res+=f(x)
    return suma_res
if __name__ == '__main__':
    start_time=time.time()
    it=((f,a,b,k,n,ns_p) for k in range(p))
    h_hat=(b-a)/n
    with Pool(processes=p) as pool:
        results = pool.map(Rcf_parallel2,it)
        aprox=h_hat*sum(results)
    end_time=time.time()

In [33]:
secs = end_time-start_time
print("Rcf_parallel2 tomó",secs,"segundos" )


Rcf_parallel2 tomó 0.3233485221862793 segundos

In [34]:
err_relativo(aprox,obj)


Out[34]:
5.842307840730588e-14

In [35]:
%%timeit -n 5 -r 10 -o
if __name__ == '__main__':
    it=((f,a,b,k,n,ns_p) for k in range(p))
    with Pool(processes=p) as pool:
        results = pool.map(Rcf_parallel2,it)
        aprox=h_hat*sum(results)


231 ms ± 25 ms per loop (mean ± std. dev. of 10 runs, 5 loops each)
Out[35]:
<TimeitResult : 231 ms ± 25 ms per loop (mean ± std. dev. of 10 runs, 5 loops each)>

In [36]:
Rcf_parallel2_timeit=_

In [37]:
Rcf_parallel2_timeit.average


Out[37]:
0.2311474588999977

In [38]:
def Rcf_parallel3(t):
    f,a,b,mi_id,n,ns_p = t
    begin=mi_id*ns_p
    end=begin+ns_p
    h_hat=(b-a)/n
    sum_res=0
    f_nodes=(f(a+(i+1/2.0)*h_hat) for i in range(begin,end))
    suma_res=sum(f_nodes)
    return suma_res
if __name__ == '__main__':
    start_time=time.time()
    it=((f,a,b,k,n,ns_p) for k in range(p))
    with Pool(processes=p) as pool:
        results = pool.map(Rcf_parallel3,it)
        aprox=h_hat*sum(results)
    end_time=time.time()

In [39]:
secs = end_time-start_time
print("Rcf_parallel3 tomó",secs,"segundos" )


Rcf_parallel3 tomó 0.32324814796447754 segundos

In [40]:
err_relativo(aprox,obj)


Out[40]:
5.842307840730588e-14

In [41]:
%%timeit -n 5 -r 10 -o
if __name__ == '__main__':
    it=((f,a,b,k,n,ns_p) for k in range(p))
    with Pool(processes=p) as pool:
        results = pool.map(Rcf_parallel3,it)
        aprox=h_hat*sum(results)


232 ms ± 27.5 ms per loop (mean ± std. dev. of 10 runs, 5 loops each)
Out[41]:
<TimeitResult : 232 ms ± 27.5 ms per loop (mean ± std. dev. of 10 runs, 5 loops each)>

In [42]:
Rcf_parallel3_timeit=_

In [43]:
Rcf_parallel3_timeit.average


Out[43]:
0.23234400814000084

En la siguiente propuesta, el proceso principal realiza todas las sumas de la evaluación de los subprocesos:


In [44]:
def Rcf_parallel4(t):
    f,i,a,b,h_hat = t
    f_nodes=f(a+(i+1/2)*h_hat)
    return f_nodes
if __name__ == '__main__':
    start_time=time.time()
    h_hat=(b-a)/n
    it=((f,i,a,b,h_hat) for i in range(n))
    with Pool(processes=p) as pool:
        results = pool.map(Rcf_parallel4,it)
    suma_res=sum(results)
    aprox=h_hat*suma_res
    end_time=time.time()

In [45]:
secs = end_time-start_time
print("Rcf_parallel4 tomó",secs,"segundos" )


Rcf_parallel4 tomó 1.8619468212127686 segundos

In [46]:
err_relativo(aprox,obj)


Out[46]:
6.71939731300312e-14

In [47]:
%%timeit -n 5 -r 10 -o
if __name__ == '__main__':
    it=((f,i,a,b,h_hat) for i in range(n))
    with Pool(processes=p) as pool:
        results = pool.map(Rcf_parallel4,it)
    suma_res=sum(results)
    aprox=h_hat*suma_res


919 ms ± 38.9 ms per loop (mean ± std. dev. of 10 runs, 5 loops each)
Out[47]:
<TimeitResult : 919 ms ± 38.9 ms per loop (mean ± std. dev. of 10 runs, 5 loops each)>

In [48]:
Rcf_parallel4_timeit=_

In [49]:
Rcf_parallel4_timeit.average


Out[49]:
0.9189972480200026

En la siguiente propuesta usamos threads en lugar de subprocesos:


In [50]:
from multiprocessing.pool import ThreadPool

In [51]:
def Rcf_parallel_threads_mp(t):
    f,a,b,mi_id,n,ns_p = t
    begin=mi_id*ns_p
    end=begin+ns_p
    h_hat=(b-a)/n
    suma_res=0
    for i in range(begin,end):
        x=a+(i+1/2.0)*h_hat
        suma_res+=f(x)
    return suma_res
if __name__ == '__main__':
    start_time=time.time()
    it=((f,a,b,k,n,ns_p) for k in range(p))
    h_hat=(b-a)/n
    with ThreadPool(processes=p) as pool:
        results = pool.map(Rcf_parallel_threads_mp,it)
        aprox=h_hat*sum(results)
    end_time=time.time()

In [52]:
secs = end_time-start_time
print("Rcf_parallel_threads_mp tomó",secs,"segundos" )


Rcf_parallel_threads_mp tomó 0.49099159240722656 segundos

In [53]:
err_relativo(aprox,obj)


Out[53]:
5.842307840730588e-14

In [54]:
%%timeit -n 5 -r 10 -o
if __name__ == '__main__':
    it=((f,a,b,k,n,ns_p) for k in range(p))
    with ThreadPool(processes=p) as pool:
        results = pool.map(Rcf_parallel_threads_mp,it)
        aprox=h_hat*sum(results)


392 ms ± 37.3 ms per loop (mean ± std. dev. of 10 runs, 5 loops each)
Out[54]:
<TimeitResult : 392 ms ± 37.3 ms per loop (mean ± std. dev. of 10 runs, 5 loops each)>

In [55]:
Rcf_parallel_threads_mp=_

In [56]:
Rcf_parallel_threads_mp.average


Out[56]:
0.39209109546000037

Obsérvese que es más lenta la ejecución usando threads que processes de multiprocessing:

Para problemas altos en I/O se recomienda el uso de threads y no de processes de multiprocessing. Para problemas CPU bound se recomienda el uso de processes y no de threads.

Gráfica de tiempo de ejecución vs número de procesos

Nota: los siguientes resultados se obtuvieron con una máquina con 8 cores, así que pueden no coincidir con los resultados previos de esta sección.


In [49]:
import matplotlib.pyplot as plt

In [50]:
(Rcf_secuencial_timeit.average,Rcf_parallel_timeit.average,
 Rcf_parallel2_timeit.average,Rcf_parallel3_timeit.average,
 Rcf_parallel4_timeit.average)


Out[50]:
(0.37864778737537563,
 0.2407200336828828,
 0.2351025953167118,
 0.23654820258496329,
 0.7486265898752026)

El mejor tiempo lo tiene Rcf_parallel2 con:


In [51]:
p


Out[51]:
8

procesos.

Usaremos ésta implementación para realizar la gráfica variando el número de procesos de $1$ hasta multiprocessing.cpu_count()


In [54]:
multiprocessing.cpu_count()


Out[54]:
8

In [55]:
err_np=[]
n_cpus=[]

In [56]:
n=10**6 #cambiar n si se desea
h_hat=(b-a)/n

In [57]:
def mifun(p,ns_p):
    it=((f,a,b,k,n,ns_p) for k in range(p))
    with Pool(processes=p) as pool:
        results = pool.map(Rcf_parallel2,it)
        aprox=h_hat*sum(results)
    return err_relativo(aprox,obj)

In [58]:
for p in range(1,multiprocessing.cpu_count()+1):
    if n%p==0:
        ns_p=int(n/p)
        err_np.append(mifun(p,ns_p))
        n_cpus.append(p)

In [59]:
err_np


Out[59]:
[6.71939731300312e-14,
 5.842307840730588e-14,
 4.816559135869493e-14,
 5.024682061493483e-14,
 3.2556371936895645e-14]

In [60]:
n_cpus


Out[60]:
[1, 2, 4, 5, 8]

In [61]:
l=[]
n_cpus=[]

In [62]:
for p in range(1,multiprocessing.cpu_count()+1):
    if n%p==0:
        ns_p=int(n/p)
        resultado_timeit=%timeit -n 5 -r 10 -o mifun(p,ns_p)
        l.append(resultado_timeit.average)
        n_cpus.append(p)


422 ms ± 8.41 ms per loop (mean ± std. dev. of 10 runs, 5 loops each)
264 ms ± 12.3 ms per loop (mean ± std. dev. of 10 runs, 5 loops each)
232 ms ± 1.67 ms per loop (mean ± std. dev. of 10 runs, 5 loops each)
234 ms ± 2.48 ms per loop (mean ± std. dev. of 10 runs, 5 loops each)
248 ms ± 2.1 ms per loop (mean ± std. dev. of 10 runs, 5 loops each)

In [63]:
n_cpus


Out[63]:
[1, 2, 4, 5, 8]

In [64]:
l


Out[64]:
[0.4215729755931534,
 0.2644839840568602,
 0.23185158796142788,
 0.23440644810209052,
 0.2481302970601246]

In [67]:
plt.plot(n_cpus,l,'o-')
plt.title('Gráfica num cpus vs tiempo')
plt.xlabel('num cpus')
plt.ylabel('tiempo')
plt.grid()
plt.show()


Ejercicio: elegir regla de Simpson o integración por el método de Monte Carlo para generar la gráfica anterior. No olviden medir errores relativos. Tales reglas están en 1.5.Integracion_numerica.

Si eligieron regla de Simpson:

  • Medir tiempo de las $4$ versiones, por ejemplo, para el caso del rectángulo se tienen Rcf, Rcf_parallel, Rcf_parallel2,Rcf_parallel3,Rcf_parallel4.

  • Elegir la mejor versión en medición del tiempo de ejecución y variar el número de procesadores. Aquí se miden errores relativos y se hace la gráfica de número de cpus vs tiempo.

Referencias

  1. M. Gorelick, I. Ozsvald, High Performance Python, O'Reilly Media, 2014.

  2. 2.1.Un_poco_de_historia_y_generalidades

  3. 2.2.Sistemas_de_memoria_compartida.ipynb