Parallelization with Julia.

Getting started.

We will start by adding $n$ processes using the function $\textit{addprocs(n)}$. When parallelizing, the work will be distributed among these processes.

Every process has its own identifier. The list of ids of the available processes can be obtained with the function $\textit{procs()}$, and the total number of available processes with the function $\textit{nprocs()}$.


In [2]:
addprocs(5)
println(procs())
println(nprocs())


[1,2,3,4,5,6]
6

However, too many processes will imply a lot of communication between them, which degrades performance. The function $\textit{rmprocs(v)}$ can be used to remove the processes whose ids are contained in $v$.


In [3]:
rmprocs(5:6)


Out[3]:
:ok

In [4]:
println(procs())
workers()


[1,2,3,4]

Out[4]:
3-element Array{Int64,1}:
 2
 3
 4

Now we will show what some "embarrassingly parallel" computations look like.

The first example is tossing a coin and counting how many heads we get.


In [6]:
nheads = Int64(0);
@time for i=1:4e8
    nheads += Int(rand(Bool))
end


 21.863490 seconds (400.00 M allocations: 5.960 GB, 3.34% gc time)

We can do better by wrapping the loop in a function.


In [7]:
function count_heads(n::Int64)
    c::Int64 = 0
    for i=1:n
        c += Int(rand(Bool))
    end
    c
end


Out[7]:
count_heads (generic function with 1 method)

In [9]:
@time count_heads(Int64(4e8))


  
Out[9]:
200014190
2.138496 seconds (6 allocations: 192 bytes)

We can do even better if we parallelize the code. The $\textit{@parallel}$ macro distributes the loop iterations among the available processes, and the reduction function $(+)$ combines the partial results into a final value.


In [11]:
@time nheads = @parallel (+) for i=1:4e8
  Int(rand(Bool))
end


  
Out[11]:
200019059
1.574346 seconds (4.57 k allocations: 320.975 KB)

Since wrapping code in functions improves performance, we can try to combine functions with $\textit{@parallel for}$.

Something to keep in mind is that every process has to be able to run the functions we need: every package must be loaded on all processes, and every function and type definition must be evaluated on all processes. For that purpose, we have the macro $\textit{@everywhere}$.
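For instance, loading a package on the workers looks like this (a minimal sketch; $\textit{Distributions}$ is only a placeholder for whatever package our workers actually need, and it only reaches workers that were added before the call):

@everywhere using Distributions   # hypothetical package: loaded on the master and on every worker
@everywhere const nsim = 10^6     # constants (and types) can be made available the same way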


In [12]:
@everywhere function count_heads(n::Int64)
    c::Int64 = 0
    for i=1:n
        c += Int(rand(Bool))
    end
    c
end

In [14]:
@time nheads = @parallel (+) for i=1:4
    count_heads(Int64(1e8))
end


  
Out[14]:
199988219
1.569074 seconds (4.49 k allocations: 312.838 KB)

We can also use the function $\textit{pmap(f,v)}$ to apply the function $f$ to every element of $v$, distributing the work among the processes just as $\textit{@parallel}$ does.

$\textit{pmap}$ is intended to be used when $f$ is a complex function and $\textit{length(v)}$ is small, whereas $\textit{@parallel for}$ should be used when tasks are simple and the number of tasks is large.


In [15]:
v = rand(10)
a = pmap(sqrt, v)
println(a)


Any[0.8621947868406267,0.8208619301085898,0.6728495227447142,0.8212105904843707,0.4880690024195767,0.3648453575417017,0.7170022853868998,0.9432472351396324,0.8877457345601799,0.4629096974162006]

We can compare the performance of $\textit{pmap}$ with that of $\textit{@parallel for}$ combined with functions.


In [17]:
v = [Int64(1e8) for i in 1:4]
@time nheads = pmap(count_heads, v)


  
Out[17]:
4-element Array{Any,1}:
 49998050
 50000602
 49995506
 50000205
1.517635 seconds (666 allocations: 45.609 KB)

Shared arrays.

If we wish to modify an array in parallel, every process must be able to access it. For that reason, we have the type $\textit{SharedArray}$. It works in the same way as a regular $\textit{Array}$, but every process has access to it. Let us first see what happens with a regular $\textit{Array}$: inside $\textit{@parallel for}$ each worker operates on its own copy, so the original array is left untouched; with a $\textit{SharedArray}$ the same loop modifies the array as expected.


In [19]:
workspace() # removes all variables
a = zeros(10)
@parallel for i=1:10
    a[i] = i
end


Out[19]:
3-element Array{Any,1}:
 Future(2,1,42,Nullable{Any}())
 Future(3,1,43,Nullable{Any}())
 Future(4,1,44,Nullable{Any}())

In [20]:
fetch(a)


Out[20]:
10-element Array{Float64,1}:
 0.0
 0.0
 0.0
 0.0
 0.0
 0.0
 0.0
 0.0
 0.0
 0.0

In [21]:
workspace()
a = SharedArray(Float64,10)
@parallel for i=1:10
  a[i] = i
end


Out[21]:
3-element Array{Any,1}:
 Future(2,1,51,Nullable{Any}())
 Future(3,1,52,Nullable{Any}())
 Future(4,1,53,Nullable{Any}())

In [22]:
println(typeof(a))
println(a)
println(a[5])


SharedArray{Float64,1}
[1.0,2.0,3.0,4.0,5.0,6.0,7.0,8.0,9.0,10.0]
5.0

If we really need to recover the $\textit{Array}$ type, we can use the function $\textit{sdata}$.


In [23]:
b = sdata(a)
println(b)
println(typeof(b))


[1.0,2.0,3.0,4.0,5.0,6.0,7.0,8.0,9.0,10.0]
Array{Float64,1}

Parallelization at a lower level

Sometimes, to get better performance, it helps to know how $\textit{@parallel for}$ and $\textit{pmap}$ work underneath. For example, if we want to distribute unpredictable or unbalanced tasks among the processes, we want to assign each new task to a process as soon as it finishes its previous one ($\textit{dynamic scheduling}$).

The function $\textit{remotecall(f, i, ...)}$ returns immediately: it schedules the call $f(...)$ to be performed on process $i$ without waiting for the result. The macro $\textit{@spawnat}$ evaluates the expression given as its second argument on the process whose id is given as its first argument.

The result of a $\textit{remotecall}$ or a $\textit{@spawnat}$ is of type $\textit{Future}$; the full value of a $\textit{Future}$ can be obtained using function $\textit{fetch}$.


In [24]:
workspace()
r = remotecall(rand, 2, 2, 2) # Parameters are: function to be called, process to be used, 
                              #                          ... (extra parameters to be passed to function)


Out[24]:
Future(2,1,54,Nullable{Any}())

In [25]:
s = @spawnat 3 1 .+ fetch(r) # Parameters are: process to be used, task to be run in the selected worker.


Out[25]:
Future(3,1,55,Nullable{Any}())

In [26]:
fetch(s)


Out[26]:
2x2 Array{Float64,2}:
 1.32116  1.28082
 1.20993  1.30656

The function $\textit{remotecall_fetch}$ yields the same result as $\textit{fetch(remotecall( ))}$, but it is more efficient.

Also, to make things easier, we can use the macro $\textit{@spawn}$. It works like $\textit{@spawnat}$, but it picks the process on which to evaluate the expression automatically.
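For example (a small sketch, assuming at least one worker process is available):

x = remotecall_fetch(rand, 2, 3)   # run rand(3) on process 2 and fetch the result in one step
s = @spawn sum(rand(1000))         # let Julia choose the worker for this task
fetch(s)                           # retrieve the computed sum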

We can also call $\textit{wait()}$ on a returned $\textit{Future}$ to block until the corresponding remote call has finished, and then make decisions and continue the computation with the finished result.
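A short sketch of this pattern (the $\textit{sleep}$ call only stands in for a long computation):

r = @spawn (sleep(1); rand(2, 2))  # some task that takes a while
wait(r)                            # block until the remote call has finished
fetch(r)                           # the value is now available immediately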

Performance tips

The ultimate goal of parallelization is to obtain better performance from our code, so we cannot avoid a brief mention of performance tips that help us write truly efficient code.

· Use macro $\textit{@time}$ and pay attention to memory allocation

Not only the time per run is important: a very large amount of memory allocation is a sign of suboptimal code. Several other tools can be used to explore and optimise the code (profiling with $\textit{@profile}$ and $\textit{ProfileView}$, $\textit{@code_warntype}$, $\textit{Lint}$, $\textit{TypeCheck}$ and more).
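For instance, $\textit{@code_warntype}$ can be applied directly to a function call to inspect what the compiler infers (a tiny sketch; the function is only an illustration):

pos(x) = x < 0 ? 0 : x     # may return the integer 0 or the value x: not type-stable
@code_warntype pos(1.5)    # non-concrete (Union) result types are flagged in the output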

· Be consistent with variable types

Julia is very flexible with variable types, as it can change a variable's type to adapt to what we are doing (e.g. going from an abstract type to $\textit{Float64}$ when computing). However, this flexibility takes time, so being consistent with our variable types improves performance; a short sketch follows the list below. Some things to be aware of when coding are:

  • If we define a variable whose type will not change, we should declare its type when declaring the variable.
  • We should declare the types of keyword arguments and of local variables inside the functions we define.
  • The values returned by the functions we define should never change type (type-stable functions).
  • Etc...
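As an illustration of these points (a sketch, not the only way to write it), a function like $\textit{pos}$ above can be made type-stable, and local variables can be given fixed types:

pos_stable(x) = x < 0 ? zero(x) : x   # always returns the same type as its argument

function sumsq(v::Vector{Float64})
    s::Float64 = 0.0                  # local variable with a declared, fixed type
    for x in v
        s += x^2
    end
    return s                          # the return type never changes
end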

· Avoid the use of global variables

This leads us to divide our code into pieces, where each piece can be written as a function. It also makes the code friendlier (easier to maintain, reuse and understand).
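This is exactly what happened in the coin example: the top-level loop updating the global $\textit{nheads}$ was slow, while the same loop inside $\textit{count_heads}$ was fast. A minimal sketch of the pattern:

function sum_first_n(n)   # all state lives in local variables
    total = 0
    for i = 1:n
        total += i
    end
    return total
end
@time sum_first_n(10^8)   # compare with running the same loop on a global variable at top level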

· Access arrays in memory order, along columns

If we plan to access every entry of a multidimensional array (say, a matrix), the optimal way to do it is to respect the order in which the language stores the array in memory. Julia stores arrays column-major, so it is better to traverse a matrix column by column. Different languages may have different conventions.
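A small sketch of the two loop orders (only the nesting of the loops changes):

A = rand(1000, 1000)

function sum_by_columns(A)   # inner loop moves down a column: matches Julia's memory layout
    s = 0.0
    for j = 1:size(A, 2), i = 1:size(A, 1)
        s += A[i, j]
    end
    return s
end

function sum_by_rows(A)      # inner loop jumps across columns: slower for large arrays
    s = 0.0
    for i = 1:size(A, 1), j = 1:size(A, 2)
        s += A[i, j]
    end
    return s
end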

· Pre-allocate outputs

If we run a function that builds several arrays (for example, several matrices) but does not need to keep each of them (for example, building random matrices and adding them up), we can pre-allocate one output and overwrite it to reduce memory allocation.
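For instance, instead of allocating a new random matrix on every iteration, we can pre-allocate one buffer and overwrite it (a sketch; the sizes and the number of iterations are arbitrary):

n = 1000
total = zeros(n, n)
buf = Array{Float64}(n, n)   # pre-allocated buffer, reused on every iteration
for t = 1:100
    rand!(buf)               # overwrite the buffer instead of allocating rand(n, n)
    for i in eachindex(total)
        total[i] += buf[i]   # accumulate in place
    end
end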