Notas para contenedor de docker:
Comando de docker para ejecución de la nota de forma local:
nota: cambiar <ruta a mi directorio>
por la ruta de directorio que se desea mapear a /datos
dentro del contenedor de docker.
docker run --rm -v <ruta a mi directorio>:/datos --name jupyterlab_r_kernel_local -p 8888:8888 -d palmoreck/jupyterlab_r_kernel:1.1.0
password para jupyterlab: qwerty
Detener el contenedor de docker:
docker stop jupyterlab_r_kernel_local
Documentación de la imagen de docker palmoreck/jupyterlab_r_kernel:1.1.0
en liga.
Esta nota utiliza métodos vistos en 1.5.Integracion_numerica
In [1]:
install.packages("microbenchmark",lib="/usr/local/lib/R/site-library/",
repos="https://cran.itam.mx/",verbose=TRUE)
In [2]:
install.packages("tictoc",lib="/usr/local/lib/R/site-library/",
repos="https://cran.itam.mx/",verbose=TRUE)
Entre las herramientas más populares en R para procesamiento en paralelo están:
Simple Network Of Workstations: snow, ver liga para más información (ya es una herramienta incluida en el paquete parallel).
multicore (funciona en la familia Unix pero no en Windows y ya es una herramienta incluida en el paquete parallel).
Rmpi. Paralelización en máquinas multicore y en clústers de máquinas.
Comentarios:
Las primeras dos son parte del paquete parallel el cual está incluido en la instalación de R desde la versión de R 2.14.0.
Los cuatro paquetes de arriba emplean un paradigma de programación en paralelo del tipo scatter/gather: se tienen múltiples instancias de R corriendo al mismo tiempo (revisar si esto es correcto para el caso de multicore...), ya sea en un clúster de máquinas, o en una máquina multicore. Una de las instancias se le denomina manager y las restantes workers. El cómputo en paralelo procede como sigue:
scatter: manager descompone el cómputo a realizar en chunks y envía (scatters) los chunks a workers.
chunk computation: workers hacen el cómputo en cada chunk y envían de regreso los resultados a manager.
gather: manager recibe (gathers) los resultados y los combina para resolver el problema.
In [3]:
library(parallel)
En el paquete parallel tenemos la función detectCores
que como su nombre lo indica obtiene el número de CPU's en nuestra máquina:
In [4]:
p<-detectCores()
In [5]:
p
Usamos makeCluster
para iniciar el snow cluster:
In [6]:
cl<-makeCluster(p)
La línea anterior crea p
workers y cada worker es un proceso de R (como el proceso manager) que corren en la misma máquina.
Comentario: el nombre de clúster en snow hace referencia al conjunto de workers y no a máquinas físicas. El objeto cl
contiene información de workers y es de clase cluster:
In [7]:
print(class(cl))
La comunicación entre manager y workers por default en makeCluster
es vía network sockets y es posible crear un clúster de máquinas físicas conectadas vía una network.
In [8]:
cl
En parallel se tiene la función clusterApply para ejecución en paralelo de una función aplicada a cada elemento de una lista: manager envía lista[1]
a cl[1]
, lista[2]
a cl[2]
,...,lista[p]
a cl[p]
. Además, si el número de elementos de la lista es mayor que el número de workers entonces clusterApply
realiza una asignación tipo round robin: por ejemplo, si la lista tiene $4$ elementos y se tienen $2$ workers entonces clusterApply
le envía a worker1 el elemento de la primera posición de la lista, a worker2 el segundo elemento. Después que worker1 finaliza el procesamiento, clusterApply
le envía la tercera posición y al terminar worker2 del procesamiento, clusterApply
le envía la cuarta posición (clusterApply
aprovecha la característica que tiene R de reciclar operaciones).
In [9]:
print(clusterApply(cl, 1:p,function(dummy)print("Hello world!")))
In [10]:
print(clusterApply(cl, 1:5,function(dummy)print("Hello world!")))
Obs: obsérvese que el resultado de clusterApply
es una lista.
clusterExport
nos permite transmitir valores de variables definidas en el workspace global de manager hacia los workspaces de cada worker bajo los mismos nombres de variables:
In [11]:
s<-"Hola mundo!"
In [12]:
clusterExport(cl,'s')
In [13]:
print(clusterApply(cl, 1:p,function(dummy)print(s)))
Comentario: ¿qué pasa si mi clusterExport
está dentro de una función?
In [14]:
mifun<-function(){
s_local<-"Hola mundo! usando variable local a mifun"
clusterExport(cl,"s_local")
print(clusterApply(cl, 1:p,function(dummy)print(s_local)))
}
In [15]:
mifun()
¿Cómo lo resuelvo?
In [16]:
mifun2<-function(){
s_global<<-"Hola mundo! usando superassignment en mifun2" #<<- superassignment
clusterExport(cl,"s_global")
clusterApply(cl, 1:p,function(dummy)print(s_global))
}
In [17]:
print(mifun2())
O bien con environment()
:
In [18]:
mifun3<-function(s_global2){
clusterExport(cl,"s_global2")
clusterApply(cl, 1:p,function(dummy)print(s_global2))
}
In [19]:
print(mifun3("Hola mundo! usando variable local a mifun3"))
In [20]:
mifun4<-function(s_global2){
clusterExport(cl,"s_global2", envir=environment())
clusterApply(cl, 1:p,function(dummy)print(s_global2))
}
In [21]:
print(mifun4("Hola mundo! usando environment()"))
In [20]:
library(microbenchmark)
library(tictoc)
In [21]:
f<-function(x)exp(-x**2)
In [22]:
a<-0
b<-1
n<-10**6
h_hat<-(b-a)/n
Forma secuencial
In [23]:
Rcf1<-function(f,a,b,n){
#Compute numerical approximation using rectangle or mid-point method in
#an interval.
#Nodes are generated via formula: x_i = a+(i+1/2)h_hat for i=0,1,...,n-1 and h_hat=(b-a)/n
#Args:
# f (function): function of integrand
# a (int): left point of interval
# b (int): right point of interval
# n (int): number of subintervals
#Returns:
# Rcf (float)
h_hat<-(b-a)/n
sum_res<-0
for(j in 0:(n-1)){
x<-a+(j+1/2.0)*h_hat
sum_res<-sum_res+f(x)
}
h_hat*sum_res
}
In [24]:
system.time(aprox<-Rcf1(f,a,b,n))
In [25]:
err_relativo<-function(aprox,obj)abs(aprox-obj)/abs(obj)
In [26]:
obj<-integrate(Vectorize(f),a,b) #en la documentación de integrate
#se menciona que se utilice Vectorize
In [27]:
err_relativo(aprox,obj$value)
In [28]:
Rcf2<-function(f,a,b,n){
#Compute numerical approximation using rectangle or mid-point method in
#an interval.
#Nodes are generated via formula: x_i = a+(i+1/2)h_hat for i=0,1,...,n-1 and h_hat=(b-a)/n
#Args:
# f (function): function of integrand
# a (int): left point of interval
# b (int): right point of interval
# n (int): number of subintervals
#Returns:
# Rcf (float)
h_hat<-(b-a)/n
sum_res<-0
x<-vapply(0:(n-1),function(j)a+(j+1/2.0)*h_hat,numeric(1))
for(j in 1:n){
sum_res<-sum_res+f(x[j])
}
h_hat*sum_res
}
In [29]:
system.time(aprox<-Rcf2(f,a,b,n))
In [30]:
err_relativo(aprox,obj$value)
Una implementación que utiliza la función sum
de R
es la siguiente:
In [31]:
Rcf3<-function(f,a,b,n){
#Compute numerical approximation using rectangle or mid-point method in
#an interval.
#Nodes are generated via formula: x_i = a+(i+1/2)h_hat for i=0,1,...,n-1 and h_hat=(b-a)/n
#Args:
# f (function): function of integrand
# a (int): left point of interval
# b (int): right point of interval
# n (int): number of subintervals
#Returns:
# Rcf (float)
h_hat<-(b-a)/n
x<-vapply(0:(n-1),function(j)a+(j+1/2.0)*h_hat,numeric(1))
h_hat*sum(f(x))
}
In [32]:
system.time(aprox<-Rcf3(f,a,b,n))
In [33]:
err_relativo(aprox,obj$value)
In [34]:
library(tictoc)
In [35]:
tic("medición de tiempo de regla de trapecio secuencial con tictoc")
tic()
Rcf1(f,a,b,n)
toc()
In [36]:
mbk<-microbenchmark(
Rcf1(f,a,b,n),
Rcf2(f,a,b,n),
Rcf3(f,a,b,n),
times=10
)
In [37]:
print(mbk)
Forma en paralelo
Tomemos la implementación secuencial más rápida: Rcf1
para paralelizarla:
In [38]:
ns_p<-as.integer(n/p)#número de subintervalos por proceso
#se asume que n es divisible por p
#si no se cumple esto, se puede definir
#ns_p=as.integer(n/p) habiendo definido n primero
#y redefinir n como:
#n=p*ns_p
In [39]:
sprintf("número de subintervalos: %d",n)
In [40]:
sprintf("número de subintervalos por proceso: %d",ns_p)
In [41]:
Rcf_parallel<-function(mi_id){
begin<-mi_id*ns_p
end<-begin+ns_p
suma_res<-0
for(j in begin:(end-1)){
x<-a+(j+1/2.0)*h_hat
suma_res<-suma_res+f(x)
}
suma_res
}
In [42]:
clusterExport(cl,c('ns_p','a','f','h_hat'))
Utilizamos la función Reduce
y la función sum
a la lista que resulta de clusterApply
:
In [43]:
tic("regla Rcf_parallel")
result<-clusterApply(cl,0:(p-1),Rcf_parallel)
aprox<-h_hat*Reduce(sum,result)
toc()
In [44]:
err_relativo(aprox,obj$value)
La encapsulamos en una función para pasarla a microbenchmark
:
In [45]:
clapply<-function(cl,p){
result<-clusterApply(cl,0:(p-1),Rcf_parallel)
aprox<-h_hat*Reduce(sum,result)
}
In [46]:
mbk<-microbenchmark(
Rcf1(f,a,b,n),
clapply(cl,p),
times=10
)
In [47]:
print(mbk)
La función clusterSplit
divide una secuencia en chunks de subsecuencias consecutivas. El número de chunks que regresa es igual al número de workers. clusterSplit
intenta que los chunks sean similares en longitud y la división se realiza en manager.
En la siguiente implementación se recibe un chunk
de nodos:
In [48]:
Rcf_parallel2<-function(chunk){
#Únicamente se acumula el valor de f en el nodo:
suma_res<-0
for(x in chunk){
suma_res<-suma_res+f(x)
}
suma_res
}
In [49]:
tic()
#se crean los chunks del conjunto de nodos:
chunks<-clusterSplit(cl,vapply(0:(n-1),function(j)a+(j+1/2.0)*h_hat,numeric(1)))
result<-clusterApply(cl,chunks,Rcf_parallel2)
aprox<-h_hat*Reduce(sum,result)
toc()
In [50]:
err_relativo(aprox,obj$value)
En la siguiente implementación, se divide en chunks la secuencia: 0,1,...,n-1
:
In [51]:
Rcf_parallel3<-function(chunk){
ns_p<-length(chunk)
begin<-chunk[1]
end<-begin+ns_p
suma_res<-0
for(j in begin:(end-1)){
x<-a+(j+1/2.0)*h_hat
suma_res<-suma_res+f(x)
}
suma_res
}
In [52]:
tic()
#se crean los chunks de la secuencia: 0,1,...,n-1
chunks<-clusterSplit(cl,0:(n-1))
result<-clusterApply(cl,chunks,Rcf_parallel3)
aprox<-h_hat*Reduce(sum,result)
toc()
In [53]:
err_relativo(aprox,obj$value)
Medimos tiempos con microbenchmark
:
In [54]:
clapply2<-function(cl){
chunks<-clusterSplit(cl,vapply(0:(n-1),function(j)a+(j+1/2.0)*h_hat,numeric(1)))
result<-clusterApply(cl,chunks,Rcf_parallel2)
aprox<-h_hat*Reduce(sum,result)
}
In [55]:
clapply3<-function(cl){
chunks<-clusterSplit(cl,0:(n-1))
result<-clusterApply(cl,chunks,Rcf_parallel3)
aprox<-h_hat*Reduce(sum,result)
}
In [56]:
mbk<-microbenchmark(
Rcf1(f,a,b,n),
clapply(cl,p),
clapply2(cl),
clapply3(cl),
times=10
)
In [57]:
print(mbk)
In [58]:
library(ggplot2)
Nota: los siguientes resultados se obtuvieron con una máquina con 8 cores, así que pueden no coincidir con los resultados previos de esta sección.
Una buena práctica es detener el clúster, observar que en la documentación se menciona: It is good practice to shut down the workers by calling stopCluster: however the workers will terminate themselves once the socket on which they are listening for commands becomes unavailable, which it should if the master R session is completed (or its process dies).
In [59]:
stopCluster(cl)
En lo siguiente se utiliza Rcf_parallel
pues tuvo el menor tiempo de ejecución. Usaremos ésta implementación para realizar la gráfica variando el número de procesos de $1$ hasta detectCores()
.
In [60]:
Rcf_parallel<-function(mi_id){
begin<-mi_id*ns_p
end<-begin+ns_p
suma_res<-0
for(j in begin:(end-1)){
x<-a+(j+1/2.0)*h_hat
suma_res<-suma_res+f(x)
}
suma_res
}
In [61]:
n<-10**6 #cambiar n si se desea
h_hat<-(b-a)/n
In [62]:
mifun<-function(cl,p,ns_p){
result<-clusterApply(cl,0:(p-1),Rcf_parallel)
aprox<-h_hat*Reduce(sum,result)
err_relativo(aprox,obj$value)
}
In [63]:
dim<-sum(n%%(1:detectCores())==0)
In [64]:
err_np<-vector("double", dim)
n_cpus<-vector("integer",dim)
In [65]:
i<-1
for(p in 1:detectCores()){
if(n%%p==0){
ns_p<-n/p
cl<-makeCluster(p)
clusterExport(cl,c('a','f','h_hat','ns_p'))
err_np[i]<-mifun(cl,p,ns_p)
n_cpus[i]<-p
stopCluster(cl)
i<-i+1
}
}
In [66]:
print(err_np)
In [67]:
print(n_cpus)
In [68]:
v<-vector("double", dim)
n_cpus<-vector("integer",dim)
In [69]:
i<-1
for(p in 1:detectCores()){
if(n%%p==0){
ns_p<-n/p
cl<-makeCluster(p)
clusterExport(cl,c('a','f','h_hat','ns_p'))
df<-print(microbenchmark(mifun(cl,p,ns_p),times=10,unit="s"))
stopCluster(cl)
v[i]<-df$median
n_cpus[i]<-p
i<-i+1
}
}
In [70]:
print(v)
In [71]:
print(n_cpus)
In [72]:
gg<-ggplot()
In [73]:
gg+
geom_line(aes(x=n_cpus,y=v))
Ejercicio: elegir regla de Simpson o integración por el método de Monte Carlo para generar la gráfica anterior. No olviden medir errores relativos. Tales reglas están en 1.5.Integracion_numerica.
Si eligieron regla de Simpson:
Medir tiempo de las $3$ versiones, por ejemplo, para el caso del rectángulo se tienen Rcf, Rcf_parallel, Rcf_parallel2, Rcf_parallel3.
Elegir la mejor versión en medición del tiempo de ejecución y variar el número de procesadores. Aquí se miden errores relativos y se hace la gráfica de número de cpus vs tiempo.
Referencias:
N. Matloff, Parallel Computing for Data Science. With Examples in R, C++ and CUDA, 2014.
Otras referencias:
Otro paquete a revisar: