Notas para contenedor de docker:
Comando de docker para ejecución de la nota de forma local:
nota: cambiar <ruta a mi directorio>
por la ruta de directorio que se desea mapear a /datos
dentro del contenedor de docker.
docker run --rm -v <ruta a mi directorio>:/datos --name jupyterlab_numerical -p 8888:8888 -p 8786:8786 -p 9797:8787 -d palmoreck/jupyterlab_numerical:1.1.0
password para jupyterlab: qwerty
Detener el contenedor de docker:
docker stop jupyterlab_numerical
Documentación de la imagen de docker palmoreck/jupyterlab_numerical:1.1.0
en liga.
Observen que para esta nota es necesario tener el puerto 8787 mapeado a un puerto en su máquina por ejemplo arriba en la línea de docker run
está el 9797
Esta nota utiliza métodos vistos en 1.5.Integracion_numerica
Con la siguiente celda instalamos paquetes desde jupyterlab. Ver liga para magic commands.
In [2]:
%%bash
sudo apt-get install -y graphviz
In [1]:
%pip install -q --user graphviz
Se instala Graphviz para visualización de las task graphs.
La siguiente celda reiniciará el kernel de IPython para cargar los paquetes instalados en la celda anterior. Dar Ok en el mensaje que salga y continuar con el contenido del notebook.
In [2]:
import IPython
app = IPython.Application.instance()
app.kernel.do_shutdown(True)
Out[2]:
Documentación en: dask, dask-github, blog.
Librería para cómputo en paralelo, en específico ayuda al procesamiento en sistemas de memoria compartida o distribuida. Ver 2.2.Sistemas_de_memoria_compartida.
Extiende interfaces de arrays, dataframes y listas usadas en numpy
, pandas
e iterators
para el procesamiento en paralelo y manejo de datasets que no caben en la memoria RAM: larger than memory datasets. En el contexto de dask a los tipos de datos anteriores les denomina big data collections. Entonces las big data collections soportadas por dask
son alternativas a los arrays y dataframes de numpy
y pandas
para grandes datasets y ejecución en paralelo de algunas funciones de éstos paquetes.
Soporta un task scheduling dinámico y optimizado para cómputo*.
*Task scheduling es un enfoque de paralelización (ver 2.1.Un_poco_de_historia_y_generalidades) en el que dividimos el programa en muchos tasks medium-sized. En dask
se representan tales tasks como nodos de un grafo con líneas entre éstos si un task depende de lo producido por otro.
*Dinámico pues las task graphs pueden ser definidas a partir de las big data collections o por users.
*Optimizado para cómputo pues en dask existen task schedulers para ejecutar en paralelo (o secuencial) la task graph.
Cada task se implementa como un tuple de Python que contiene funciones y argumentos de las mismas:
In [1]:
def fun_suma(arg1,arg2):
return arg1+arg2
t = (fun_suma,-1,2) #tuple, también puede usarse así: t = tuple([fun,-1,2])
In [2]:
t
Out[2]:
Una task graph se implementa como un diccionario de tasks:
In [3]:
diccionario = {'arg1': -1,
'arg2': 2,
'resultado': (fun_suma, 'arg1', 'arg2')}
y a este diccionario de tasks se le llama dask graph.
Comentario: dask posee módulos para optimizar la ejecución de las task graphs (ver optimization)
In [4]:
import dask
Con los schedulers definidos en dask. Ver Scheduling y Scheduler Overview
Para cualquier scheduler definido en dask el entry point es una función get la cual recibe una dask graph y una key o lista de keys para cálculos:
In [5]:
dask.get(diccionario,'resultado') #synchronous scheduler
Out[5]:
In [6]:
dask.threaded.get(diccionario,'resultado') #scheduler backed by a thread pool
Out[6]:
In [7]:
dask.multiprocessing.get(diccionario,'resultado') #scheduler backed by a process pool
Out[7]:
Comentarios:
La función dask.get
ejecuta con un synchronous scheduler
el cual sólo utiliza un thread de ejecución sin paralelización. Útil para debugging y perfilamiento.
La función dask.threaded.get
ejecuta con un scheduler multiprocessing.pool.ThreadPool
. Como el nombre lo indica, se hacen fork
's y join
's de un proceso. El overhead para ejecución es pequeño y no hay costo en transferencia de datos entre los taks. Sin embargo debido al GIL de Python, este scheduler provee paralelización si tu código es esencialmente "no Python", por ejemplo si utilizas código de numpy
o pandas
o cython
. Ver 2.2.Python_multiprocessing y 1.7.Compilar_a_C_Cython.
La función dask.multiprocessing.get
ejecuta con un scheduler multiprocessing.Pool
. Ver 2.2.Python_multiprocessing. Se crean para cada task un proceso, no tiene problemas del GIL de Python para código Python, sin embargo, mover datos hacia procesos y de vuelta al proceso principal tiene costos altos. Útil para tasks que no requieren mucha transferencia de datos y cuando los inputs y outputs son pequeños.
Comentario: en la documentación de dask
se recomienda utilizar al scheduler distributed, documentación en Dask.distributed, en lugar de usar dask.multiprocesing.get
. Para una pequeña explicación ver Dask Distributed.
Usamos visualize
, ver por ejemplo: Visualize task graphs y dask.visualize.
In [8]:
dask.visualize(diccionario,'resultado')
Out[8]:
Las cajas representan datos, los círculos representan funciones que se ejecutan sobre tales datos y las flechas especifican cuáles funciones producen/consumen qué datos.
y para crear task graphs podemos partir de las collections o bien, definir nuestras propias dask graphs.
Comentario: si trabajamos con las collections será extraño que trabajemos a nivel de funciones get
. Cada collection tiene un default scheduler y una función compute que calcula el output de la collection:
dask arrays y dask dataframes utilizan al threaded scheduler por default pero con compute
puede cambiarse de scheduler:
In [9]:
import dask.array as da
In [10]:
x = da.arange(100)
In [11]:
x
Out[11]:
In [12]:
type(x)
Out[12]:
Llamamos a la función dask.compute. Por default para el dask array utiliza el threaded scheduler para la ejecución de la dask graph asociada:
In [13]:
x.sum().compute()
Out[13]:
Podemos cambiar de scheduler con compute:
In [14]:
x.sum().compute(scheduler='processes')
Out[14]:
Comentario: por default dask array
trabaja con chunks. En el ejemplo pasado no definimos, pero podemos hacer:
In [15]:
x = da.arange(100, chunks=10)
In [16]:
x
Out[16]:
Comentario: también por default, dask array
trabaja de una forma lazy, esto es, dask array
no calculará ningún resultado hasta que explícitamente se le dé la instrucción. Por ejemplo, podemos hacer:
In [17]:
y = x.sum()
In [18]:
y
Out[18]:
y hasta ahora no se ha calculado la suma (cuyo resultado es un escalar). Después de llamar a compute
se ejecuta la task graph para ejecución en paralelo (si se definió al scheduler para este tipo de ejecución):
In [19]:
y.compute()
Out[19]:
Y podemos visualizar la task graph definida en y
que es un dask array:
In [20]:
type(y)
Out[20]:
In [23]:
y.visualize()
Out[23]:
Ver la API de dask array en array api para más funcionalidad.
Comentario: también dask
tiene una función compute más general que toma múltiples collections y regresa múltiples resultados. Esto combina grafos de cada collection de modo que resultados intermedios son compartidos. Por ejemplo para los dask array's anteriores:
In [24]:
y = (x+1).sum()
z = (x+1).mean()
In [25]:
da.compute(y, z)
Out[25]:
y aquí el resultado intermedio x+1
sólo fue calculado una vez. Si hubiéramos hecho:
y.compute()
z.compute()
el resultado intermedio x+1
hubiera sido calculado dos veces. Ésta función compute
trabaja sobre cualquier collection y se encuentra en dask.base
:
In [26]:
from dask.base import compute
In [27]:
compute is da.compute
Out[27]:
In [28]:
compute(y,z)
Out[28]:
También se puede especificar para el scheduler threaded
o multiprocessing
el número de workers en compute
:
In [29]:
y.compute(num_workers=2)
Out[29]:
Ver configuration para otras formas de especificar al scheduler
.
Ver también Delayed y Custom Workloads with Dask Delayed para otra opción diferente a los dask array's, dask dataframe's y dask bag's para cómputo lazy.
Esta librería tiene una documentación en un proyecto externo al de dask y se puede encontrar en Dask.distributed.
Es una librería que extiende a concurrent.futures (multiprocessing
es similar a concurrent.futures
) y a dask
a clústers de tamaño mediano (alrededor de cientos de máquinas).
Aunque el nombre distributed puede pensarse como que la ejecución sólo puede llevarse a cabo en clústers de máquinas, también puede utilizarse en nuestra máquina local. Por razones como proveer acceso a una API asíncrona: Futures, acceso a un dashboard diagnóstico y manejar mejor la data locality con múltiples procesos más eficientemente que con dask.multiprocessing.get
, representa una alternativa fuerte a la generación de procesos locales vía dask.multiprocessing
: "The distributed scheduler described a couple sections below is often a better choice today than dask.multiprocessing
..."
Satisface las siguientes características:
Para ver más sobre el distributed scheduler
ver architecture.
Es un módulo o librería para ejecución de tasks en paralelo. El módulo multiprocessing
es otra librería para ejecución en paralelo. Ver 2.2.Python_multiprocessing.
En concurrent.futures
se trabaja con objetos tipo future
los cuales son resultados de cálculos que estarán disponibles en un futuro, de aquí el nombre de future y su cómputo no es lazy sino inmediato. La obtención de este valor en el cómputo en paralelo es asíncrona por lo que permite cómputo concurrente.
Ejemplo:
In [1]:
import time
In [2]:
def inc(x):
time.sleep(1)
return x+1
Ejecución secuencial:
In [3]:
%%time
inputs = [1,2,3,4,5,6,7,8,9,10]
results=[]
for x in inputs:
result=inc(x)
results.append(result)
In [4]:
results
Out[4]:
Ejecución con concurrent.futures:
In [5]:
from concurrent.futures import ProcessPoolExecutor
import multiprocessing
In [6]:
executor = ProcessPoolExecutor(multiprocessing.cpu_count())
In [7]:
executor
Out[7]:
In [14]:
%%time
future = executor.submit(inc,3)
Obs: obsérvese que se obtiene el future
inmediatamente. Ver submit
In [15]:
future
Out[15]:
La ejecución del future
se encuentra en algún subproceso del proceso local. Obsérvese que tiene un estado running. Y después de un tiempo el estado es finished:
In [16]:
future
Out[16]:
Para obtener el resultado hacemos:
In [17]:
future.result()
Out[17]:
Entonces podemos hacer submit
de varios cálculos de forma asíncrona con la clase Executor y devolverá futures:
In [18]:
%%time
inputs = [1,2,3,4,5,6,7,8,9,10]
futures=[]
for x in inputs:
future=executor.submit(inc,x)
futures.append(future)
results = [future.result() for future in futures] #esta línea bloquea la ejecución
#del proceso local
#hasta que todos los procesos
#finalizan la ejecución.
Obs: obsérvese que se redujo el tiempo pues la ejecución fue en paralelo por los subprocesos que ejecutaron inc
.
In [19]:
results
Out[19]:
In [20]:
executor.shutdown()
También en concurrent.futures
se tiene una función map para cómputo inmediato (no lazy) usando futures que soporta múltiples llamados a una función utilizando un iterable.
Para una vista rápida a concurrent.futures
ver: Python: A quick introduction to the concurrent.futures module.
El Client es el entry point para users de dask.distributed
. Creamos un distributed scheduler al importar Client
. Esta acción override cualquier configuración que se haya hecho del scheduler.
In [2]:
from dask.distributed import Client
In [3]:
client = Client() #se le pueden pasar argumentos como número de
#workers, si se desean procesos o no,
#el límite de memoria a usar por cada proceso
In [4]:
client
Out[4]:
Al dar click en el dashboard
de arriba se mostrará la página de status
mediante bokeh (si corre éste notebook en un docker, revisar a qué puerto está mapeado el de 8787
):
y en el apartado de workers
:
Ver Web Interface y Diagnosing Performance para más sobre la información de diagnóstico que se junta en el dashboard.
Comentario: La línea anterior que contiene Client()
, configura un scheduler en el proceso local y varios procesos workers o dask-workers que corren un thread de ejecución (pueden correr más por default). Si se desea que los procesos worker estén contenidos en el proceso local puede usarse Client(processes=False)
. Esta situación es preferible si se desea evitar comunicación inter worker y si nuestros cálculos están soltando el GIL (por ejemplo al usar numpy
o dask.array
).
Algunas opciones que pueden utilizarse con la línea de Client
son:
Client(n_workers=4)
.
Client(threads_per_worker=2)
.
Client(memory_limit='10GB')
.
Ver LocalCluster para opciones a usar en Client
y API para la API con Client
.
In [5]:
import math
import time
from scipy.integrate import quad
In [6]:
f=lambda x: math.exp(-x**2)
a=0
b=1
n=10**6
h_hat=(b-a)/n
Forma secuencial
In [7]:
def Rcf(f, a, b, n): #Rcf: rectángulo compuesto para f
"""
Compute numerical approximation using rectangle or mid-point method in
an interval.
Nodes are generated via formula: x_i = a+(i+1/2)h_hat for i=0,1,...,n-1 and h_hat=(b-a)/n
Args:
f (lambda expression): lambda expression of integrand
a (int): left point of interval
b (int): right point of interval
n (int): number of subintervals
Returns:
Rcf (float)
"""
h_hat=(b-a)/n
suma_res=0
for i in range(0,n):
x=a+(i+1/2.0)*h_hat
suma_res+=f(x)
return h_hat*suma_res
In [8]:
start_time = time.time()
aprox=Rcf(f,a,b,n)
end_time = time.time()
In [9]:
secs = end_time-start_time
print("Rcf tomó",secs,"segundos" )
In [10]:
obj, err = quad(f, a, b)
In [11]:
def err_relativo(aprox, obj):
return math.fabs(aprox-obj)/math.fabs(obj) #obsérvese el uso de la librería math
In [12]:
err_relativo(aprox,obj)
Out[12]:
In [13]:
Rcf_secuencial_timeit = %timeit -n 5 -r 10 -o Rcf(f,a,b,n)
Forma en paralelo
In [13]:
p=multiprocessing.cpu_count()
ns_p = int(n/p) #número de subintervalos por proceso
#se asume que n es divisible por p
#si no se cumple esto, se puede definir
#ns_p=int(n/p) habiendo definido n primero
#y redefinir n como:
#n=p*ns_p
In [14]:
def construye_nodos_en_subintervalos(mi_id,a,b,h_hat,n,ns_p):
begin=mi_id*ns_p
end=begin+ns_p
h_hat=(b-a)/n
nodos = []
for i in range(begin,end):
nodos.append(a+(i+1/2.0)*h_hat)
return nodos
In [15]:
def evalua_f_en_nodos_y_suma(nodos,f):
suma_res = 0
for nodo in nodos:
suma_res+=f(nodo)
return suma_res
In [16]:
%%time
futures_nodos = client.map(construye_nodos_en_subintervalos,range(p),
**{'a':a,
'b':b,
'h_hat':h_hat,
'n':n,
'ns_p':ns_p}
)
futures_evalua = client.map(evalua_f_en_nodos_y_suma, futures_nodos,
**{'f':f}
)
results = client.gather(futures_evalua)
aprox=h_hat*sum(results)
In [17]:
err_relativo(aprox,obj)
Out[17]:
Con map
o submit
se han enviado las tasks al scheduler a realizar. Ver map, submit y concurrent.futures. Los cálculos definidos por las funciones construye_nodos_en_subintervalos
y evalua_f_en_nodos_y_suma
se ejecutan en cada dask-worker
y a distintos tiempos. Obsérvese que se puede hacer map
o submit
en futures. Los resultados viven en los procesosdask-workers
.
Con gather
se juntan los resultados (futures) en el proceso local.
Obs: podemos usar gather
o result
de los futures.
In [18]:
client.close()
In [19]:
client = Client(n_workers=multiprocessing.cpu_count(),
threads_per_worker=1
)
In [20]:
client
Out[20]:
In [21]:
%%time
futures_nodos = client.map(construye_nodos_en_subintervalos,range(p),
**{'a':a,
'b':b,
'h_hat':h_hat,
'n':n,
'ns_p':ns_p}
)
futures_evalua = client.map(evalua_f_en_nodos_y_suma, futures_nodos,
**{'f':f}
)
results = [future.result() for future in futures_evalua]
aprox = h_hat*sum(results)
In [22]:
err_relativo(aprox,obj)
Out[22]:
Nota: los siguientes resultados se obtuvieron con una máquina con 8 cores, así que pueden no coincidir con los resultados previos de esta sección.
In [23]:
err_np=[]
n_cpus=[]
In [24]:
def Rcf_parallel(mi_id,f,a,b,h_hat,n,ns_p):
begin=mi_id*ns_p
end=begin+ns_p
h_hat=(b-a)/n
suma_res = 0
for i in range(begin,end):
x=a+(i+1/2.0)*h_hat
suma_res+=f(x)
return suma_res
In [25]:
def mifun(cl,p,ns_p):
futures_Rcf_parallel = cl.map(Rcf_parallel,range(p),
**{'f':f,
'a':a,
'b':b,
'h_hat':h_hat,
'n':n,
'ns_p':ns_p}
)
results = cl.gather(futures_Rcf_parallel)
aprox = h_hat*sum(results)
return err_relativo(aprox,obj)
In [26]:
client.close()
In [27]:
client
Out[27]:
In [28]:
for p in range(1,multiprocessing.cpu_count()+1):
if n%p==0:
ns_p=int(n/p)
client = Client(n_workers=p,
threads_per_worker=1
)
err_np.append(mifun(client,p,ns_p))
n_cpus.append(p)
client.close()
In [29]:
err_np
Out[29]:
In [30]:
n_cpus
Out[30]:
In [66]:
l=[]
n_cpus=[]
In [67]:
client.close()
In [68]:
client
Out[68]:
In [69]:
for p in range(1,multiprocessing.cpu_count()+1):
if n%p==0:
ns_p=int(n/p)
client = Client(n_workers=p,
threads_per_worker=1
)
futures_Rcf_parallel = client.map(Rcf_parallel,range(p),
**{'f':f,
'a':a,
'b':b,
'h_hat':h_hat,
'n':n,
'ns_p':ns_p}
)
resultado_timeit=%timeit -n 1 -r 1 -o client.gather(futures_Rcf_parallel)
l.append(resultado_timeit.average)
n_cpus.append(p)
client.close()
In [70]:
l
Out[70]:
In [71]:
n_cpus
Out[71]:
In [72]:
import matplotlib.pyplot as plt
In [73]:
plt.plot(n_cpus,l,'o-')
plt.title('Gráfica num cpus vs tiempo')
plt.xlabel('num cpus')
plt.ylabel('tiempo')
plt.grid()
plt.show()
Ejercicio: elegir regla de Simpson o integración por el método de Monte Carlo para generar la gráfica anterior. No olviden medir errores relativos. Tales reglas están en 1.5.Integracion_numerica.
Referencias:
Otras referencias:
En dask se hace referencia al uso de funciones pure. Ver: Pure Functions by Default y Function Purity para ejemplos de funciones pure.
En Dask JupyterLab Extension se muestra cómo instalar la extensión en jupyterlab para dask.
In [74]:
def create_Client(n):
for p in range(1,multiprocessing.cpu_count()+1):
if n%p==0:
ns_p=int(n/p)
cl = Client(n_workers=p,
threads_per_worker=1
)
yield (cl,p,ns_p)
In [75]:
def mifun2(cl,p,ns_p):
futures_Rcf_parallel = cl.map(Rcf_parallel,range(p),
**{'f':f,
'a':a,
'b':b,
'h_hat':h_hat,
'n':n,
'ns_p':ns_p}
)
results = cl.gather(futures_Rcf_parallel)
aprox = h_hat*sum(results)
return aprox
In [76]:
g = create_Client(n)
In [78]:
client.close()
In [79]:
client
Out[79]:
In [80]:
client,p,ns_p = next(g)
In [81]:
client
Out[81]:
In [83]:
mifun2(client,p,ns_p)
Out[83]:
In [84]:
%timeit -n 1 -r 1 -o mifun2(client,p,ns_p)
Out[84]:
In [85]:
_
Out[85]:
In [86]:
%%timeit -n 3 -r 3 -o
mifun2(client,p,ns_p)
client.restart()
Out[86]:
In [87]:
_
Out[87]:
In [94]:
client.close()
In [93]:
client,p,ns_p = zip(*create_Client(n))
In [95]:
client
Out[95]:
In [96]:
p
Out[96]:
In [97]:
ns_p
Out[97]:
In [98]:
mifun2(client[0],p[0],ns_p[0])
Out[98]:
In [99]:
mifun2(client[1],p[1],ns_p[1])
Out[99]:
In [ ]:
%timeit -n 3 -r 3 -o mifun2(client[1],p[1],ns_p[1])