Notas para contenedor de docker:

Comando de docker para ejecución de la nota de forma local:

nota: cambiar <ruta a mi directorio> por la ruta de directorio que se desea mapear a /datos dentro del contenedor de docker.

docker run --rm -v <ruta a mi directorio>:/datos --name jupyterlab_numerical -p 8888:8888 -p 8786:8786 -p 9797:8787 -d palmoreck/jupyterlab_numerical:1.1.0

password para jupyterlab: qwerty

Detener el contenedor de docker:

docker stop jupyterlab_numerical

Documentación de la imagen de docker palmoreck/jupyterlab_numerical:1.1.0 en liga.

Observen que para esta nota es necesario tener el puerto 8787 mapeado a un puerto en su máquina por ejemplo arriba en la línea de docker run está el 9797


Esta nota utiliza métodos vistos en 1.5.Integracion_numerica

Con la siguiente celda instalamos paquetes desde jupyterlab. Ver liga para magic commands.


In [2]:
%%bash
sudo apt-get install -y graphviz


Reading package lists...
Building dependency tree...
Reading state information...
graphviz is already the newest version (2.40.1-2).
0 upgraded, 0 newly installed, 0 to remove and 24 not upgraded.

In [1]:
%pip install -q --user graphviz


WARNING: You are using pip version 19.3.1; however, version 20.0.2 is available.
You should consider upgrading via the 'pip install --upgrade pip' command.
Note: you may need to restart the kernel to use updated packages.

Se instala Graphviz para visualización de las task graphs.

La siguiente celda reiniciará el kernel de IPython para cargar los paquetes instalados en la celda anterior. Dar Ok en el mensaje que salga y continuar con el contenido del notebook.


In [2]:
import IPython
app = IPython.Application.instance()
app.kernel.do_shutdown(True)


Out[2]:
{'status': 'ok', 'restart': True}

Dask

Documentación en: dask, dask-github, blog.

¿Qué es y qué nos ofrece?

  • Librería para cómputo en paralelo, en específico ayuda al procesamiento en sistemas de memoria compartida o distribuida. Ver 2.2.Sistemas_de_memoria_compartida.

  • Extiende interfaces de arrays, dataframes y listas usadas en numpy, pandas e iterators para el procesamiento en paralelo y manejo de datasets que no caben en la memoria RAM: larger than memory datasets. En el contexto de dask a los tipos de datos anteriores les denomina big data collections. Entonces las big data collections soportadas por dask son alternativas a los arrays y dataframes de numpy y pandas para grandes datasets y ejecución en paralelo de algunas funciones de éstos paquetes.

  • Soporta un task scheduling dinámico y optimizado para cómputo*.

*Task scheduling es un enfoque de paralelización (ver 2.1.Un_poco_de_historia_y_generalidades) en el que dividimos el programa en muchos tasks medium-sized. En dask se representan tales tasks como nodos de un grafo con líneas entre éstos si un task depende de lo producido por otro.

*Dinámico pues las task graphs pueden ser definidas a partir de las big data collections o por users.

*Optimizado para cómputo pues en dask existen task schedulers para ejecutar en paralelo (o secuencial) la task graph.

¿Cómo se implementan una task y una task graph en dask?

Cada task se implementa como un tuple de Python que contiene funciones y argumentos de las mismas:


In [1]:
def fun_suma(arg1,arg2):
    return arg1+arg2

t = (fun_suma,-1,2) #tuple, también puede usarse así: t = tuple([fun,-1,2])

In [2]:
t


Out[2]:
(<function __main__.fun_suma(arg1, arg2)>, -1, 2)

Una task graph se implementa como un diccionario de tasks:


In [3]:
diccionario = {'arg1': -1,
               'arg2': 2,
               'resultado': (fun_suma, 'arg1', 'arg2')}

y a este diccionario de tasks se le llama dask graph.

Comentario: dask posee módulos para optimizar la ejecución de las task graphs (ver optimization)

¿Cómo ejecutamos la dask graph?


In [4]:
import dask

Con los schedulers definidos en dask. Ver Scheduling y Scheduler Overview

Para cualquier scheduler definido en dask el entry point es una función get la cual recibe una dask graph y una key o lista de keys para cálculos:


In [5]:
dask.get(diccionario,'resultado') #synchronous scheduler


Out[5]:
1

In [6]:
dask.threaded.get(diccionario,'resultado') #scheduler backed by a thread pool


Out[6]:
1

In [7]:
dask.multiprocessing.get(diccionario,'resultado') #scheduler backed by a process pool


Out[7]:
1

Comentarios:

  • La función dask.get ejecuta con un synchronous scheduler el cual sólo utiliza un thread de ejecución sin paralelización. Útil para debugging y perfilamiento.

  • La función dask.threaded.get ejecuta con un scheduler multiprocessing.pool.ThreadPool. Como el nombre lo indica, se hacen fork's y join's de un proceso. El overhead para ejecución es pequeño y no hay costo en transferencia de datos entre los taks. Sin embargo debido al GIL de Python, este scheduler provee paralelización si tu código es esencialmente "no Python", por ejemplo si utilizas código de numpy o pandas o cython. Ver 2.2.Python_multiprocessing y 1.7.Compilar_a_C_Cython.

  • La función dask.multiprocessing.get ejecuta con un scheduler multiprocessing.Pool. Ver 2.2.Python_multiprocessing. Se crean para cada task un proceso, no tiene problemas del GIL de Python para código Python, sin embargo, mover datos hacia procesos y de vuelta al proceso principal tiene costos altos. Útil para tasks que no requieren mucha transferencia de datos y cuando los inputs y outputs son pequeños.

Comentario: en la documentación de dask se recomienda utilizar al scheduler distributed, documentación en Dask.distributed, en lugar de usar dask.multiprocesing.get. Para una pequeña explicación ver Dask Distributed.

¿Cómo visualizamos la task graph?

Usamos visualize, ver por ejemplo: Visualize task graphs y dask.visualize.


In [8]:
dask.visualize(diccionario,'resultado')


Out[8]:

Las cajas representan datos, los círculos representan funciones que se ejecutan sobre tales datos y las flechas especifican cuáles funciones producen/consumen qué datos.

En resumen en dask seguimos un flujo como el siguiente dibujo:

y para crear task graphs podemos partir de las collections o bien, definir nuestras propias dask graphs.

Comentario: si trabajamos con las collections será extraño que trabajemos a nivel de funciones get. Cada collection tiene un default scheduler y una función compute que calcula el output de la collection:

Ejemplo:

dask arrays y dask dataframes utilizan al threaded scheduler por default pero con compute puede cambiarse de scheduler:


In [9]:
import dask.array as da

In [10]:
x = da.arange(100)

In [11]:
x


Out[11]:
Array Chunk
Bytes 800 B 800 B
Shape (100,) (100,)
Count 1 Tasks 1 Chunks
Type int64 numpy.ndarray
100 1

In [12]:
type(x)


Out[12]:
dask.array.core.Array

Llamamos a la función dask.compute. Por default para el dask array utiliza el threaded scheduler para la ejecución de la dask graph asociada:


In [13]:
x.sum().compute()


Out[13]:
4950

Podemos cambiar de scheduler con compute:


In [14]:
x.sum().compute(scheduler='processes')


Out[14]:
4950

Comentario: por default dask array trabaja con chunks. En el ejemplo pasado no definimos, pero podemos hacer:


In [15]:
x = da.arange(100, chunks=10)

In [16]:
x


Out[16]:
Array Chunk
Bytes 800 B 80 B
Shape (100,) (10,)
Count 10 Tasks 10 Chunks
Type int64 numpy.ndarray
100 1

Comentario: también por default, dask array trabaja de una forma lazy, esto es, dask array no calculará ningún resultado hasta que explícitamente se le dé la instrucción. Por ejemplo, podemos hacer:


In [17]:
y = x.sum()

In [18]:
y


Out[18]:
Array Chunk
Bytes 8 B 8 B
Shape () ()
Count 24 Tasks 1 Chunks
Type int64 numpy.ndarray

y hasta ahora no se ha calculado la suma (cuyo resultado es un escalar). Después de llamar a compute se ejecuta la task graph para ejecución en paralelo (si se definió al scheduler para este tipo de ejecución):


In [19]:
y.compute()


Out[19]:
4950

Y podemos visualizar la task graph definida en y que es un dask array:


In [20]:
type(y)


Out[20]:
dask.array.core.Array

In [23]:
y.visualize()


Out[23]:

Ver la API de dask array en array api para más funcionalidad.

Comentario: también dask tiene una función compute más general que toma múltiples collections y regresa múltiples resultados. Esto combina grafos de cada collection de modo que resultados intermedios son compartidos. Por ejemplo para los dask array's anteriores:


In [24]:
y = (x+1).sum()
z = (x+1).mean()

In [25]:
da.compute(y, z)


Out[25]:
(5050, 50.5)

y aquí el resultado intermedio x+1 sólo fue calculado una vez. Si hubiéramos hecho:

y.compute()
z.compute()

el resultado intermedio x+1 hubiera sido calculado dos veces. Ésta función compute trabaja sobre cualquier collection y se encuentra en dask.base:


In [26]:
from dask.base import compute

In [27]:
compute is da.compute


Out[27]:
True

In [28]:
compute(y,z)


Out[28]:
(5050, 50.5)

También se puede especificar para el scheduler threaded o multiprocessing el número de workers en compute:


In [29]:
y.compute(num_workers=2)


Out[29]:
5050

Ver configuration para otras formas de especificar al scheduler.

Ver también Delayed y Custom Workloads with Dask Delayed para otra opción diferente a los dask array's, dask dataframe's y dask bag's para cómputo lazy.

Dask distributed

Esta librería tiene una documentación en un proyecto externo al de dask y se puede encontrar en Dask.distributed.

Es una librería que extiende a concurrent.futures (multiprocessing es similar a concurrent.futures) y a dask a clústers de tamaño mediano (alrededor de cientos de máquinas).

Aunque el nombre distributed puede pensarse como que la ejecución sólo puede llevarse a cabo en clústers de máquinas, también puede utilizarse en nuestra máquina local. Por razones como proveer acceso a una API asíncrona: Futures, acceso a un dashboard diagnóstico y manejar mejor la data locality con múltiples procesos más eficientemente que con dask.multiprocessing.get, representa una alternativa fuerte a la generación de procesos locales vía dask.multiprocessing: "The distributed scheduler described a couple sections below is often a better choice today than dask.multiprocessing..."

Satisface las siguientes características:

  • Low latency: Each task suffers about 1ms of overhead. A small computation and network roundtrip can complete in less than 10ms.
  • Peer-to-peer data sharing: Workers communicate with each other to share data. This removes central bottlenecks for data transfer.
  • Complex Scheduling: Supports complex workflows (not just map/filter/reduce) which are necessary for sophisticated algorithms used in nd-arrays, machine learning, image processing, and statistics.
  • Data Locality: Scheduling algorithms cleverly execute computations where data lives. This minimizes network traffic and improves efficiency.
  • Familiar APIs: Compatible with the concurrent.futures API in the Python standard library. Compatible with dask API for parallel algorithms

Para ver más sobre el distributed scheduler ver architecture.

Es un módulo o librería para ejecución de tasks en paralelo. El módulo multiprocessing es otra librería para ejecución en paralelo. Ver 2.2.Python_multiprocessing.

En concurrent.futures se trabaja con objetos tipo future los cuales son resultados de cálculos que estarán disponibles en un futuro, de aquí el nombre de future y su cómputo no es lazy sino inmediato. La obtención de este valor en el cómputo en paralelo es asíncrona por lo que permite cómputo concurrente.

Ejemplo:


In [1]:
import time

In [2]:
def inc(x):
    time.sleep(1)
    return x+1

Ejecución secuencial:


In [3]:
%%time
inputs = [1,2,3,4,5,6,7,8,9,10]
results=[]

for x in inputs:
    result=inc(x)
    results.append(result)


CPU times: user 0 ns, sys: 0 ns, total: 0 ns
Wall time: 10 s

In [4]:
results


Out[4]:
[2, 3, 4, 5, 6, 7, 8, 9, 10, 11]

Ejecución con concurrent.futures:


In [5]:
from concurrent.futures import ProcessPoolExecutor
import multiprocessing

In [6]:
executor = ProcessPoolExecutor(multiprocessing.cpu_count())

In [7]:
executor


Out[7]:
<concurrent.futures.process.ProcessPoolExecutor at 0x7f29b4331710>

In [14]:
%%time
future = executor.submit(inc,3)


CPU times: user 0 ns, sys: 0 ns, total: 0 ns
Wall time: 759 µs

Obs: obsérvese que se obtiene el future inmediatamente. Ver submit


In [15]:
future


Out[15]:
<Future at 0x7f29b42daa20 state=running>

La ejecución del future se encuentra en algún subproceso del proceso local. Obsérvese que tiene un estado running. Y después de un tiempo el estado es finished:


In [16]:
future


Out[16]:
<Future at 0x7f29b42daa20 state=finished returned int>

Para obtener el resultado hacemos:


In [17]:
future.result()


Out[17]:
4

Entonces podemos hacer submit de varios cálculos de forma asíncrona con la clase Executor y devolverá futures:


In [18]:
%%time
inputs = [1,2,3,4,5,6,7,8,9,10]
futures=[]

for x in inputs:
    future=executor.submit(inc,x)
    futures.append(future)
    
results = [future.result() for future in futures] #esta línea bloquea la ejecución 
                                                  #del proceso local
                                                  #hasta que todos los procesos 
                                                  #finalizan la ejecución.


CPU times: user 10 ms, sys: 0 ns, total: 10 ms
Wall time: 2.02 s

Obs: obsérvese que se redujo el tiempo pues la ejecución fue en paralelo por los subprocesos que ejecutaron inc.


In [19]:
results


Out[19]:
[2, 3, 4, 5, 6, 7, 8, 9, 10, 11]

In [20]:
executor.shutdown()

También en concurrent.futures se tiene una función map para cómputo inmediato (no lazy) usando futures que soporta múltiples llamados a una función utilizando un iterable.

Para una vista rápida a concurrent.futures ver: Python: A quick introduction to the concurrent.futures module.

Client

El Client es el entry point para users de dask.distributed. Creamos un distributed scheduler al importar Client. Esta acción override cualquier configuración que se haya hecho del scheduler.


In [2]:
from dask.distributed import Client

In [3]:
client = Client() #se le pueden pasar argumentos como número de
                  #workers, si se desean procesos o no,
                  #el límite de memoria a usar por cada proceso

In [4]:
client


Out[4]:

Client

Cluster

  • Workers: 4
  • Cores: 8
  • Memory: 6.25 GB

Al dar click en el dashboard de arriba se mostrará la página de status mediante bokeh (si corre éste notebook en un docker, revisar a qué puerto está mapeado el de 8787):

y en el apartado de workers:

Ver Web Interface y Diagnosing Performance para más sobre la información de diagnóstico que se junta en el dashboard.

Comentario: La línea anterior que contiene Client(), configura un scheduler en el proceso local y varios procesos workers o dask-workers que corren un thread de ejecución (pueden correr más por default). Si se desea que los procesos worker estén contenidos en el proceso local puede usarse Client(processes=False). Esta situación es preferible si se desea evitar comunicación inter worker y si nuestros cálculos están soltando el GIL (por ejemplo al usar numpy o dask.array).

Algunas opciones que pueden utilizarse con la línea de Client son:

  • Client(n_workers=4).

  • Client(threads_per_worker=2).

  • Client(memory_limit='10GB').

Ver LocalCluster para opciones a usar en Client y API para la API con Client.

Regla compuesta del rectángulo


In [5]:
import math
import time
from scipy.integrate import quad

In [6]:
f=lambda x: math.exp(-x**2)
a=0
b=1
n=10**6
h_hat=(b-a)/n

Forma secuencial


In [7]:
def Rcf(f, a, b, n): #Rcf: rectángulo compuesto para f
    """
    Compute numerical approximation using rectangle or mid-point method in 
    an interval.
    Nodes are generated via formula: x_i = a+(i+1/2)h_hat for i=0,1,...,n-1 and h_hat=(b-a)/n
    Args:
        f (lambda expression): lambda expression of integrand
        a (int): left point of interval
        b (int): right point of interval
        n (int): number of subintervals
    Returns:
        Rcf (float) 
    """
    h_hat=(b-a)/n
    suma_res=0
    for i in range(0,n):
        x=a+(i+1/2.0)*h_hat
        suma_res+=f(x)
    return h_hat*suma_res

In [8]:
start_time = time.time()
aprox=Rcf(f,a,b,n)
end_time = time.time()

In [9]:
secs = end_time-start_time
print("Rcf tomó",secs,"segundos" )


Rcf tomó 0.4179039001464844 segundos

In [10]:
obj, err = quad(f, a, b)

In [11]:
def err_relativo(aprox, obj):
    return math.fabs(aprox-obj)/math.fabs(obj) #obsérvese el uso de la librería math

In [12]:
err_relativo(aprox,obj)


Out[12]:
6.71939731300312e-14

In [13]:
Rcf_secuencial_timeit = %timeit -n 5 -r 10 -o Rcf(f,a,b,n)


411 ms ± 5.12 ms per loop (mean ± std. dev. of 10 runs, 5 loops each)

Forma en paralelo


In [13]:
p=multiprocessing.cpu_count()
ns_p = int(n/p) #número de subintervalos por proceso
                #se asume que n es divisible por p
                #si no se cumple esto, se puede definir 
                #ns_p=int(n/p) habiendo definido n primero
                #y redefinir n como: 
                #n=p*ns_p

In [14]:
def construye_nodos_en_subintervalos(mi_id,a,b,h_hat,n,ns_p):
    begin=mi_id*ns_p
    end=begin+ns_p
    h_hat=(b-a)/n
    nodos = []
    for i in range(begin,end):
        nodos.append(a+(i+1/2.0)*h_hat)
    return nodos

In [15]:
def evalua_f_en_nodos_y_suma(nodos,f):
    suma_res = 0
    for nodo in nodos:
        suma_res+=f(nodo)
    return suma_res

In [16]:
%%time
futures_nodos = client.map(construye_nodos_en_subintervalos,range(p),
                                                            **{'a':a,
                                                               'b':b,
                                                               'h_hat':h_hat,
                                                               'n':n,
                                                               'ns_p':ns_p}
                            )
futures_evalua = client.map(evalua_f_en_nodos_y_suma, futures_nodos,
                                                       **{'f':f}
                            )
results = client.gather(futures_evalua)
aprox=h_hat*sum(results)


CPU times: user 80 ms, sys: 10 ms, total: 90 ms
Wall time: 1.23 s

In [17]:
err_relativo(aprox,obj)


Out[17]:
3.2556371936895645e-14

Con map o submit se han enviado las tasks al scheduler a realizar. Ver map, submit y concurrent.futures. Los cálculos definidos por las funciones construye_nodos_en_subintervalos y evalua_f_en_nodos_y_suma se ejecutan en cada dask-worker y a distintos tiempos. Obsérvese que se puede hacer map o submit en futures. Los resultados viven en los procesosdask-workers.

Con gather se juntan los resultados (futures) en el proceso local.

Obs: podemos usar gather o result de los futures.


In [18]:
client.close()

In [19]:
client = Client(n_workers=multiprocessing.cpu_count(),
                threads_per_worker=1
               )

In [20]:
client


Out[20]:

Client

Cluster

  • Workers: 8
  • Cores: 8
  • Memory: 6.25 GB

In [21]:
%%time
futures_nodos = client.map(construye_nodos_en_subintervalos,range(p),
                                                            **{'a':a,
                                                               'b':b,
                                                               'h_hat':h_hat,
                                                               'n':n,
                                                               'ns_p':ns_p}
                            )
futures_evalua = client.map(evalua_f_en_nodos_y_suma, futures_nodos,
                                                       **{'f':f}
                            )

results = [future.result() for future in futures_evalua]
aprox = h_hat*sum(results)


CPU times: user 110 ms, sys: 20 ms, total: 130 ms
Wall time: 1.11 s

In [22]:
err_relativo(aprox,obj)


Out[22]:
3.2556371936895645e-14

Gráfica de tiempo de ejecución vs número de procesos

Nota: los siguientes resultados se obtuvieron con una máquina con 8 cores, así que pueden no coincidir con los resultados previos de esta sección.


In [23]:
err_np=[]
n_cpus=[]

In [24]:
def Rcf_parallel(mi_id,f,a,b,h_hat,n,ns_p):
    begin=mi_id*ns_p
    end=begin+ns_p
    h_hat=(b-a)/n
    suma_res = 0
    for i in range(begin,end):
        x=a+(i+1/2.0)*h_hat
        suma_res+=f(x)
    return suma_res

In [25]:
def mifun(cl,p,ns_p):
    futures_Rcf_parallel = cl.map(Rcf_parallel,range(p),
                                             **{'f':f,
                                                'a':a,
                                                'b':b,
                                                'h_hat':h_hat,
                                                'n':n,
                                                'ns_p':ns_p}
                              ) 
    results = cl.gather(futures_Rcf_parallel)
    aprox = h_hat*sum(results)
    return err_relativo(aprox,obj)

In [26]:
client.close()

In [27]:
client


Out[27]:

Client

Cluster

  • Workers: 0
  • Cores: 0
  • Memory: 0 B

In [28]:
for p in range(1,multiprocessing.cpu_count()+1):
    if n%p==0:
        ns_p=int(n/p)
        client = Client(n_workers=p,
                        threads_per_worker=1
                       )
        err_np.append(mifun(client,p,ns_p))
        n_cpus.append(p)
        client.close()

In [29]:
err_np


Out[29]:
[6.71939731300312e-14,
 5.842307840730588e-14,
 4.816559135869493e-14,
 5.024682061493483e-14,
 3.2556371936895645e-14]

In [30]:
n_cpus


Out[30]:
[1, 2, 4, 5, 8]

In [66]:
l=[]
n_cpus=[]

In [67]:
client.close()

In [68]:
client


Out[68]:

Client

Cluster

  • Workers: 0
  • Cores: 0
  • Memory: 0 B

In [69]:
for p in range(1,multiprocessing.cpu_count()+1):
    if n%p==0:
        ns_p=int(n/p)
        client = Client(n_workers=p,
                threads_per_worker=1
               )
        futures_Rcf_parallel = client.map(Rcf_parallel,range(p),
                                          **{'f':f,
                                             'a':a,
                                             'b':b,
                                             'h_hat':h_hat,
                                             'n':n,
                                             'ns_p':ns_p}
                                         )
        resultado_timeit=%timeit -n 1 -r 1 -o client.gather(futures_Rcf_parallel)
        l.append(resultado_timeit.average)
        n_cpus.append(p)
        client.close()


462 ms ± 0 ns per loop (mean ± std. dev. of 1 run, 1 loop each)
255 ms ± 0 ns per loop (mean ± std. dev. of 1 run, 1 loop each)
141 ms ± 0 ns per loop (mean ± std. dev. of 1 run, 1 loop each)
147 ms ± 0 ns per loop (mean ± std. dev. of 1 run, 1 loop each)
155 ms ± 0 ns per loop (mean ± std. dev. of 1 run, 1 loop each)

In [70]:
l


Out[70]:
[0.46233253995887935,
 0.25474231713451445,
 0.14068673201836646,
 0.14710374106653035,
 0.15464612585492432]

In [71]:
n_cpus


Out[71]:
[1, 2, 4, 5, 8]

In [72]:
import matplotlib.pyplot as plt

In [73]:
plt.plot(n_cpus,l,'o-')
plt.title('Gráfica num cpus vs tiempo')
plt.xlabel('num cpus')
plt.ylabel('tiempo')
plt.grid()
plt.show()


Ejercicio: elegir regla de Simpson o integración por el método de Monte Carlo para generar la gráfica anterior. No olviden medir errores relativos. Tales reglas están en 1.5.Integracion_numerica.

Referencias:

  1. 2.1.Un_poco_de_historia_y_generalidades

  2. 2.2.Python_multiprocessing

  3. dask

Otras referencias:

Alternativa a la gráfica (...en construcción la siguiente sección...)


In [74]:
def create_Client(n):
    for p in range(1,multiprocessing.cpu_count()+1):
        if n%p==0:
            ns_p=int(n/p)
            cl = Client(n_workers=p,
                        threads_per_worker=1
                       )
            yield (cl,p,ns_p)

In [75]:
def mifun2(cl,p,ns_p):
    futures_Rcf_parallel = cl.map(Rcf_parallel,range(p),
                                             **{'f':f,
                                                'a':a,
                                                'b':b,
                                                'h_hat':h_hat,
                                                'n':n,
                                                'ns_p':ns_p}
                                  )
    results = cl.gather(futures_Rcf_parallel)
    aprox = h_hat*sum(results)
    return aprox

In [76]:
g = create_Client(n)

In [78]:
client.close()

In [79]:
client


Out[79]:

Client

Cluster

  • Workers: 0
  • Cores: 0
  • Memory: 0 B

In [80]:
client,p,ns_p = next(g)

In [81]:
client


Out[81]:

Client

Cluster

  • Workers: 2
  • Cores: 2
  • Memory: 6.25 GB

In [83]:
mifun2(client,p,ns_p)


Out[83]:
0.7468241328124707

In [84]:
%timeit -n 1 -r 1 -o mifun2(client,p,ns_p)


252 ms ± 0 ns per loop (mean ± std. dev. of 1 run, 1 loop each)
Out[84]:
<TimeitResult : 252 ms ± 0 ns per loop (mean ± std. dev. of 1 run, 1 loop each)>

In [85]:
_


Out[85]:
<TimeitResult : 252 ms ± 0 ns per loop (mean ± std. dev. of 1 run, 1 loop each)>

In [86]:
%%timeit -n 3 -r 3 -o
mifun2(client,p,ns_p)
client.restart()


492 ms ± 8.2 ms per loop (mean ± std. dev. of 3 runs, 3 loops each)
Out[86]:
<TimeitResult : 492 ms ± 8.2 ms per loop (mean ± std. dev. of 3 runs, 3 loops each)>

In [87]:
_


Out[87]:
<TimeitResult : 492 ms ± 8.2 ms per loop (mean ± std. dev. of 3 runs, 3 loops each)>

In [94]:
client.close()


---------------------------------------------------------------------------
AttributeError                            Traceback (most recent call last)
<ipython-input-94-1ef8b845d8fa> in <module>
----> 1 client.close()

AttributeError: 'tuple' object has no attribute 'close'

In [93]:
client,p,ns_p  = zip(*create_Client(n))


/home/miuser/.local/lib/python3.6/site-packages/distributed/dashboard/core.py:72: UserWarning: 
Port 8787 is already in use. 
Perhaps you already have a cluster running?
Hosting the diagnostics dashboard on a random port instead.
  warnings.warn("\n" + msg)

In [95]:
client


Out[95]:
(<Client: 'tcp://127.0.0.1:36523' processes=1 threads=1, memory=6.25 GB>,
 <Client: 'tcp://127.0.0.1:38169' processes=2 threads=2, memory=6.25 GB>,
 <Client: 'tcp://127.0.0.1:42237' processes=4 threads=4, memory=6.25 GB>,
 <Client: 'tcp://127.0.0.1:35123' processes=5 threads=5, memory=6.25 GB>,
 <Client: 'tcp://127.0.0.1:33545' processes=8 threads=8, memory=6.25 GB>)

In [96]:
p


Out[96]:
(1, 2, 4, 5, 8)

In [97]:
ns_p


Out[97]:
(1000000, 500000, 250000, 200000, 125000)

In [98]:
mifun2(client[0],p[0],ns_p[0])


Out[98]:
0.7468241328124773

In [99]:
mifun2(client[1],p[1],ns_p[1])


Out[99]:
0.7468241328124707

In [ ]:
%timeit -n 3 -r 3 -o mifun2(client[1],p[1],ns_p[1])