PyShop Session 1


CPU and GPU Parallelization: How to Super Compute

This session will introduce basic speed issues in the Python interpreter and the principles of parallelization. We will work with a real problem, that of estimating an integral by Monte Carlo. To begin, we'll see how to solve this problem in NumPy (hopefully a review at this point). Then we'll see how to use just-in-time (JIT) compilation to increase speed with minimal effort. Next, we'll discuss some of the basics of how a computer carries out computation, including a discussion of hard disks, RAM, the CPU, and cores. We'll then look at some simple parallelization algorithms and how to break down a problem so it can be parallelized. Finally, we'll cover how to use a graphics card to do GPU computation using PyCUDA, which can offer a 300- to 3000-fold speed-up over serial/native Python, and point to some publicly available resources. By the end of this session you should understand the logic behind parallelization and have a general idea of what resources are available in Python. You should also have an understanding of how to use PyCUDA to run code on a GPU.

Python and Speed

When you run code in Python, you benefit from the interpreter: there is no separate compilation step. However, you pay for that convenience through dynamic typing, which is the extra work the interpreter must do in order to run your code on the fly.

Dynamic typing refers to the fact that the interpreter determines the type of data contained in an object at run time. Each object carries a data type with it, and every time the interpreter encounters an object it must check what type it is. This is in contrast to C, where a variable's type is fixed as soon as it is declared.
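
For instance, a minimal sketch of what the interpreter has to cope with (the same name bound to objects of different types):


In [ ]:
# The type travels with the object, not with the name; the interpreter
# must check it every time the object is used.
x = 3
print(type(x))      # <class 'int'>
x = "three"
print(type(x))      # <class 'str'>
x = [3.0]
print(type(x))      # <class 'list'>
# In C the equivalent declaration fixes the type once and for all: int x = 3;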

Beyond the issue of typing, Python has something called the "Global Interpreter Lock" (GIL): only a single thread may execute Python bytecode at any one point in time. Because of this, spawning threads can actually slow your code down as Python switches back and forth between them.
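
A minimal sketch of the effect (assuming a standard CPython interpreter): a CPU-bound function run in two threads is typically no faster than running it twice in a row.


In [ ]:
import threading
import time

def busy(n):
    # Purely CPU-bound work; it holds the GIL essentially the whole time
    total = 0
    for i in range(n):
        total += i * i
    return total

N = 2000000

start = time.time()
busy(N)
busy(N)
print("serial:  ", time.time() - start)

start = time.time()
threads = [threading.Thread(target=busy, args=(N,)) for _ in range(2)]
for t in threads:
    t.start()
for t in threads:
    t.join()
print("threaded:", time.time() - start)  # typically no faster on CPython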

A Working Example

Say we would like to calculate the following expectation:

$$ \phi = \mathbb{E}[f(x)] = \int_X f(x)g(x)dx \\ X \subset \mathbb{R}^n $$

If we would like to estimate this with a quadrature rule using, say, $100$ points in each dimension, then we need to make $100^n$ function evaluations, evaluate the pdf $g(x)$ $100^n$ times, and generate $100^n$ quadrature weights. This poses a massive computational problem and for dimensions higher than 10 is usually infeasible (try it with scipy!).
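
To see how quickly this blows up, just count the tensor-product nodes:


In [ ]:
# Number of function evaluations for a tensor-product quadrature rule
# with 100 nodes per dimension
for n in [2, 5, 10]:
    print(n, 100**n)
# 2 10000
# 5 10000000000
# 10 100000000000000000000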

Alternatively, we can use Monte Carlo methods to put the law of large numbers to work for us. Given that we know the law of $x$, we can generate random draws, evaluate the function at each draw, and average to get an estimate of the expectation. By the law of large numbers this average converges to the expectation, with an error that shrinks at the rate $\frac{1}{\sqrt{M}}$, where $M$ is the sample size. That is, we can write

$$ \mathbb{E}[f(x)] \approx \hat{\phi} = \frac{V}{M}\sum_{i = 1}^M f(x_i)g(x_i) $$

where $V=\int_Xdx $ is the volume of $X$. We can also get a variance estimate by

$$ \sigma^2 \approx \hat{\sigma}^2 = \frac{V^2}{M(M - 1)} \sum_{i = 1}^M \left(f(x_i)g(x_i) - \frac{1}{V}\hat{\phi} \right)^2 $$

We can code this in Python fairly easily:


In [104]:
import numpy as np
# NOTE: LLVM is not supported on ARM, so no numba
from numba import jit
# NOTE: No gpu on raspberry pi, so no pycuda
import pycuda.driver as cuda
import pycuda.autoinit
from pycuda.compiler import SourceModule
# NOTE: multiprocessing on raspberry pi?  you're joking. But wait! it's quad core!
import multiprocessing as mp
import time

In [2]:
def monte_carlo_expectation_serial(f, g, M, X):
    """
    A loop based monte carlo integration.
    
    inputs:
        f    :    function; the moment to be estimated
        g    :    function; the distribution function of x
        M    :    scalar; the number of points to sample
        X    :    ndarray; bounds in R^n for the support
        
    """
    # Generate points uniformly from the sample space
    N = X.shape[0]
    points = np.random.rand(M, N)
    
    #Scale the points
    points = points*(np.array([X[:, 1], ]*M)
                     - np.array([X[:, 0], ]*M))\
        +  np.array([X[:, 0], ]*M)
        
    # Calculate the volume.  Assume a hyperrectangle for support.
    V = np.prod(np.abs(X[:,1] - X[:,0]))

    # Calculate the sum as a loop
    sum1 = 0.0
    for i in range(0, M):
        sum1 += f(points[i, :])*g(points[i, :], X)
    sum1 /= M
    
    # Calculate the variance
    var1 = 0.0
    for i in range(0, M):
        var1 += (f(points[i, :])*g(points[i, :], X) - sum1)**2

    # Return the result
    return V*sum1, V**2*var1/(M*(M - 1))

The above function takes in a support for the random variable (assumed to be a hyperrectangle), a moment function, a distribution function, and the number of points to sample. It returns an estimate for the expectation and its standard error. This function is very flexible and we could define many different moments and distributions, but for now we will take the moment to be the squared Euclidean norm and the distribution to be the n-dimensional uniform:

$$ f(x) = \Vert x \Vert^2 = \sum_{i=1}^N x_i^2 \\ g(x) = \text{Uni}(X) = \prod_{i=1}^N\frac{1}{\overline{X_i} - \underline{X_i}} $$

For now we'll use NumPy to calculate the norm.


In [3]:
M = 100
N = 2
points = np.random.rand(M, N)
np.linalg.norm(points[0,:])**2


Out[3]:
0.14310621492794479

And we'll define our functions $f$ and $g$ to pass to the monte carlo estimator:


In [5]:
def f(x):
    return np.linalg.norm(x)**2

def g(x, X):
    return np.prod(1/(X[:, 1] - X[:, 0]))

As we've said before, these functions are objects that we can pass to our estimator. Finally, we can run it and see what we get. We'll assume $x$ is standard uniform, so the true value should be $\frac{N}{3}$.
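
To see where this comes from, note that for $x_i$ independent and uniform on $[0, 1]$,

$$ \mathbb{E}\left[\Vert x \Vert^2\right] = \sum_{i=1}^N \mathbb{E}[x_i^2] = N \int_0^1 x^2 dx = \frac{N}{3} $$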


In [5]:
M = 10
N = 2
X = np.array((0*np.ones(N), 1*np.ones(N))).T

monte_carlo_expectation_serial(f, g, M, X)


Out[5]:
(0.40562423563716499, 0.0088462251727122083)

Notice this gives us a value that is far from the truth (especially for high values of $N$, try it!) and with a fairly high standard error. However, our sample size is just $10$. Let's see what happens as $M$ increases:


In [8]:
for M in [100, 1000, 10000]:
    print(monte_carlo_expectation_serial(f, g, M, X))


(0.63712709429927616, 0.0019073392857243774)
(0.67816661109642673, 0.00017961591501247626)
(0.67330751666642097, 1.805657183897351e-05)

We can also time our code to see how it's doing. Unlike in session 3, we'll use the %timeit magic. This runs our function in a loop an 'optimal' number of times (optimal in the sense that it won't take too long) and returns a 'best of' run time.


In [9]:
for M in [100, 1000, 10000]:
    %timeit monte_carlo_expectation_serial(f, g, M, X)


The slowest run took 4.38 times longer than the fastest. This could mean that an intermediate result is being cached 
100 loops, best of 3: 1.88 ms per loop
100 loops, best of 3: 19 ms per loop
1 loops, best of 3: 191 ms per loop

You're probably asking yourself why, after all this, we are using native Python again?! It's just for exposition. We can do the same thing in NumPy and compare. Notice that we need to rewrite our functions to take array arguments rather than vector arguments. For f and g we add the axis argument to tell NumPy to operate along axis 1 (across columns), and in the Monte Carlo estimation we replace the loop with a dot product.


In [10]:
x = np.array([[1,1],[2,2]])
np.prod(np.ones((x.shape[0], x.shape[1]))/(X[:, 1] - X[:, 0]), axis=1)


Out[10]:
array([ 1.,  1.])

In [3]:
def f_vec(x):
    return np.linalg.norm(x, axis = 1)**2

def g_vec(x, X):
    return np.prod(np.ones((x.shape[0], x.shape[1]))
                   /(X[:, 1] - X[:, 0]), axis=1)

def monte_carlo_expectation_numpy(f, g, M, X):
    """
    A loop based monte carlo integration.
    
    inputs:
        f    :    function; the moment to be estimated
        g    :    function; the distribution function of x
        M    :    scalar; the number of points to sample
        X    :    ndarray; bounds in R^n for the support
        
    """
    # Generate points uniformly from the sample space
    N = X.shape[0]
    points = np.random.rand(M, N)
    
    #Scale the points
    points = points*(np.array([X[:, 1], ]*M)
                     - np.array([X[:, 0], ]*M))\
        +  np.array([X[:, 0], ]*M)
        
    # Calculate the volume.  Assume a hyperrectangle for support.
    V = np.prod(np.abs(X[:,1] - X[:,0]))

    # Calculate the sum as a dot product
    sum1 = np.dot(f_vec(points), g_vec(points, X))/M
    
    # Calculate the variance
    var1 = np.linalg.norm(f_vec(points)*g_vec(points, X) - sum1)**2

    # Return the result
    return V*sum1, V**2*var1/(M*(M - 1))

That's it! Now we can compare our results. Keep in mind that the values are random, but they should agree for large $M$.


In [6]:
M = 100
N = 2
X = np.array((0*np.ones(N), 1*np.ones(N))).T

print(monte_carlo_expectation_serial(f, g, M, X))
print(monte_carlo_expectation_numpy(f_vec, g_vec, M, X))


(0.58549310304394508, 0.0016324008286405893)
(0.68501904409723469, 0.0018532056651179671)

Success! Well, at least in the sense that we got about the same answer. Now let's see how we do on time.


In [14]:
M = 100
N = 2
X = np.array((0*np.ones(N), 1*np.ones(N))).T
for M in [100, 1000, 10000]:
    print("M = %s" %M)
    %timeit monte_carlo_expectation_serial(f, g, M, X)
    %timeit monte_carlo_expectation_numpy(f_vec, g_vec, M, X)


M = 100
1000 loops, best of 3: 1.91 ms per loop
10000 loops, best of 3: 118 µs per loop
M = 1000
100 loops, best of 3: 19 ms per loop
1000 loops, best of 3: 712 µs per loop
M = 10000
10 loops, best of 3: 190 ms per loop
100 loops, best of 3: 6.59 ms per loop

Now we can really shout, "Success!" Our NumPy function is roughly 20 to 30 times faster than the native Python implementation. We could also have tried to use vectorize to move from native Python to NumPy:


In [14]:
M = 100
N = 2
X = np.array((0*np.ones(N), 1*np.ones(N))).T

# This fails.  Our function is too complex for vectorize
vectorize_monte_carlo_serial = np.vectorize(monte_carlo_expectation_serial)

print(vectorize_monte_carlo_serial(f, g, M, X))


---------------------------------------------------------------------------
IndexError                                Traceback (most recent call last)
<ipython-input-14-0c74be08b3e7> in <module>()
      6 vectorize_monte_carlo_serial = np.vectorize(monte_carlo_expectation_serial)
      7 
----> 8 print(vectorize_monte_carlo_serial(f, g, M, X))

/home/tmabbot/anaconda/envs/snakes/lib/python3.3/site-packages/numpy/lib/function_base.py in __call__(self, *args, **kwargs)
   1698             vargs.extend([kwargs[_n] for _n in names])
   1699 
-> 1700         return self._vectorize_call(func=func, args=vargs)
   1701 
   1702     def _get_ufunc_and_otypes(self, func, args):

/home/tmabbot/anaconda/envs/snakes/lib/python3.3/site-packages/numpy/lib/function_base.py in _vectorize_call(self, func, args)
   1761             _res = func()
   1762         else:
-> 1763             ufunc, otypes = self._get_ufunc_and_otypes(func=func, args=args)
   1764 
   1765             # Convert args to object arrays first

/home/tmabbot/anaconda/envs/snakes/lib/python3.3/site-packages/numpy/lib/function_base.py in _get_ufunc_and_otypes(self, func, args)
   1723             # arrays (the input values are not checked to ensure this)
   1724             inputs = [asarray(_a).flat[0] for _a in args]
-> 1725             outputs = func(*inputs)
   1726 
   1727             # Performance note: profiling indicates that -- for simple

<ipython-input-3-f3bfaaadc2a5> in monte_carlo_expectation_serial(f, g, M, X)
     11     """
     12     # Generate points uniformly from the sample space
---> 13     N = X.shape[0]
     14     points = np.random.rand(M, N)
     15 

IndexError: tuple index out of range

Sadly, it seems our function is too complex for vectorize. We could further simplify it and vectorize sub-routines, but I'm not going to treat that here.
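
For reference, np.vectorize does handle simple scalar functions without any trouble; it just wraps them in an internal loop, so it is a convenience rather than a speed-up. A minimal sketch (the scalar function here is purely illustrative):


In [ ]:
# np.vectorize broadcasts a scalar function over array inputs
def scalar_moment(x):
    return x**2 + 1.0

vec_moment = np.vectorize(scalar_moment)
print(vec_moment(np.array([0.0, 1.0, 2.0])))   # [ 1.  2.  5.]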

So far we have simply repeated material we already know. There are several other ways to speed up our code.

Speed Based on Types

As mentioned in the introduction, the dynamic typing done by the Python interpreter is one of the main drivers of sluggishness. Given that, we can try to solve this problem by specifying types directly. Two common ways to do this are Cython and Numba.

Cython

Throughout this course I've mentioned that C is faster than Python. Given this, some bright people created Cython to let you compile your Python code directly to C. It requires a few simple modifications, changing variable definitions and some syntax, and can deliver a large speed boost. However, it requires a working compiler and can be rather tricky to debug. Because of this, I won't treat it here, but the documentation offers plenty of information to get you started if you're interested.

Numba

The way around this problem of compilation and types is something called a "just-in-time" (JIT) compiler. JIT compilation works in a similar way to the Python interpreter, but with the major difference that it caches compiled code as it runs and returns to that code on later passes. Therefore, when you call a function many times, later calls run as a pre-compiled C-like program. The machinery is a big black box, and apparently even computer scientists find JIT compilers hard to understand, but it is sufficient to know that it gives the best of both worlds: no explicit compilation step while keeping dynamic typing.

Additionally, according to the Numba documentation, you may also reduce your memory footprint, though it does not say how or why...

Using a just-in-time compiler requires nothing more than importing it and adding the @jit decorator to your functions. Check it out:


In [7]:
import time
#If running on RasPi this won't work

# Simply add the jit decorator before the function
@jit
def jitted_monte_carlo_serial(f, g, M, X):
    """
    A loop based monte carlo integration.
    
    inputs:
        f    :    function; the moment to be estimated
        g    :    function; the distribution function of x
        M    :    scalar; the number of points to sample
        X    :    ndarray; bounds in R^n for the support
        
    """
    # Generate points uniformly from the sample space
    N = X.shape[0]
    points = np.random.rand(M, N)
    
    #Scale the points
    points = points*(np.array([X[:, 1], ]*M)
                     - np.array([X[:, 0], ]*M)) +  np.array([X[:, 0], ]*M)
        
    # Calculate the volume.  Assume a hyperrectangle for support.
    V = np.prod(np.abs(X[:,1] - X[:,0]))

    # Calculate the sum as a loop
    sum1 = 0.0
    for i in range(0, M):
        sum1 += f(points[i, :])*g(points[i, :], X)
    sum1 /= M
    
    # Calculate the variance
    var1 = 0.0
    for i in range(0, M):
        var1 += (f(points[i, :])*g(points[i, :], X) - sum1)**2
    
    # Return the result
    return V*sum1, V**2*var1/(M*(M - 1))

@jit
def jitted_monte_carlo_numpy(f, g, M, X):
    """
    A loop based monte carlo integration.
    
    inputs:
        f    :    function; the moment to be estimated
        g    :    function; the distribution function of x
        M    :    scalar; the number of points to sample
        X    :    ndarray; bounds in R^n for the support
        
    """
    # Generate points uniformly from the sample space
    N = X.shape[0]
    points = np.random.rand(M, N)
    
    #Scale the points
    points = points*(np.array([X[:, 1], ]*M)
                     - np.array([X[:, 0], ]*M)) +  np.array([X[:, 0], ]*M)
        
    # Calculate the volume.  Assume a hyperrectangle for support.
    V = np.prod(np.abs(X[:,1] - X[:,0]))

    # Calculate the sum as a dot product
    sum1 = np.dot(f_vec(points), g_vec(points, X))/M
    
    # Calculate the variance
    var1 = np.linalg.norm(f_vec(points)*g_vec(points, X) - sum1)**2

    # Return the result
    return V*sum1, V**2*var1/(M*(M - 1))

M = 100
N = 2
X = np.array((0*np.ones(N), 1*np.ones(N))).T

%timeit jitted_monte_carlo_serial(f, g, M, X)
%timeit jitted_monte_carlo_numpy(f_vec, g_vec, M, X)

start = time.time()
jitted_monte_carlo_serial(f,g,M,X)
print(time.time() - start)

start = time.time()
jitted_monte_carlo_numpy(f,g,M,X)
print(time.time() - start)


The slowest run took 318.95 times longer than the fastest. This could mean that an intermediate result is being cached 
1 loops, best of 3: 1.46 ms per loop
The slowest run took 1621.33 times longer than the fastest. This could mean that an intermediate result is being cached 
1 loops, best of 3: 164 µs per loop
0.001901388168334961
0.0003018379211425781

Notice here the output of the %timeit magic. The slowest run is hundreds or even thousands of times slower than the fastest. This is exactly what we expect: the first call triggers compilation, while subsequent runs use the pre-compiled code.

That being said, our function only runs once, so if we only want to estimate a single expectation, this doesn't help. However, we could get some speed up from jitting the function calls within the estimation:


In [8]:
# However, to benefit you need to jit all functions
@jit
def jit_f(x):
    return np.linalg.norm(x)**2
@jit
def jit_g(x, X):
    return np.prod(1/(X[:, 1] - X[:, 0]))

@jit
def jit_f_vec(x):
    return np.linalg.norm(x, axis = 1)**2

@jit
def jit_g_vec(x, X):
    return np.prod(np.ones((x.shape[0], x.shape[1]))/(X[:, 1] - X[:, 0]), axis=1)


M = 100
N = 2
X = np.array((0*np.ones(N), 1*np.ones(N))).T

%timeit jitted_monte_carlo_serial(jit_f, jit_g, M, X)
%timeit jitted_monte_carlo_numpy(jit_f_vec, jit_g_vec, M, X)


The slowest run took 72.41 times longer than the fastest. This could mean that an intermediate result is being cached 
1000 loops, best of 3: 1.57 ms per loop
10000 loops, best of 3: 148 µs per loop

However, you'll notice that this didn't help much. That is because we are already using NumPy to do all of our calculations. If we had non-standard moments that were not computed with NumPy functions, we would definitely get a boost. Here's an example of a simple function that does speed up under @jit.


In [9]:
# Since we didn't get a speed up, let's see a simple example that does
def test_func(x):
    return np.sum(x)

@jit
def jit_test_func(x):
    return np.sum(x)

x = np.arange(1000)
%timeit test_func(x)
%timeit jit_test_func(x)


The slowest run took 44.02 times longer than the fastest. This could mean that an intermediate result is being cached 
100000 loops, best of 3: 2.34 µs per loop
The slowest run took 82672.19 times longer than the fastest. This could mean that an intermediate result is being cached 
1000000 loops, best of 3: 587 ns per loop

JIT compilation represents a truly plug-and-play modification to your code that can give you significant gains. To get the most out of it, keep your code simple and your loops tight. Use lots of function calls and keep argument packing and unpacking to a minimum.
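
As a rough illustration of the kind of code Numba handles best, here is a sketch of an explicit scalar loop. The nopython=True flag, which forces full compilation, is my addition and is not used elsewhere in this session:


In [ ]:
@jit(nopython=True)
def jit_sum_of_squares(x):
    # A plain scalar loop: exactly what the JIT compiler is good at
    total = 0.0
    for i in range(x.shape[0]):
        total += x[i] * x[i]
    return total

x = np.random.rand(100000)
jit_sum_of_squares(x)            # first call compiles
%timeit jit_sum_of_squares(x)    # later calls run the compiled code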

Beyond type based speed ups, we can look at how to break down our problem into subproblems which are parallelizable.

Parallelization

Before we discuss parallelization, it helps to review how a computer works (in broad terms). The main parts of the computer are the hard disk, the RAM (or memory), and the CPU. The hard disk stores data long term. The RAM stores data for immediate use in calculation. The CPU carries out calculations built from logical and arithmetic operations.

CPU Parallelization

The CPU itself is made of several parts, which you can see in the diagram below. Here is a list of the main ones:

  1. Arithmetic Logic Unit (ALU) - Executes arithmetic calculations and logic functions. There may also be a floating point unit (FPU).
  2. Control Unit - Fetches incoming instructions, decodes them, and sends them to the execution unit.
  3. Registers - Temporary storage for data to be processed.

<img src="cpu.png" width="500" /img> Source: ccm.net

Together these make up a core. A multi-core processor contains several sets of these components, plus a cache and a bus to move data around.

Taking advantage of multiple cores requires splitting up the work. When we run code in Python, we run it "in serial". That is, we run a single chain of instructions on the CPU. Our problem, however, may be "parallelizable", meaning that we could simultaneously calculate portions of the solution and combine the sub-solutions at the end, running our code "in parallel". This is the idea behind using "parallelization".

Types of Parallelism

There are several ways to speed up code using the idea of parallelism. The first is called "bit-level parallelism". This is done directly on the CPU and is how you take advantage of Moore's law. Since the introduction of transistor-based computer hardware, technology has essentially followed Moore's law, which states that the number of transistors on a chip doubles approximately every two years. Roughly, that means that the speed of chips has been increasing exponentially. That being said, we are nearing the end of this exponential growth in chip density. Eventually (surprisingly soon), transistors will be built at the atomic scale and the distance between them will become so small that chips cannot handle more density. At that point Moore's law will end.

When someone discusses 32-bit versus 64-bit chip architecture, this is what they are talking about. You as a programmer will never be able to interact with this type of parallelism. Sorry. This part was just for your culture.

Second, there is "instruction-level parallelism". This is the process of determining which instructions will be run in which order. Again, this is beyond our control (unless you are a computer scientist and didn't tell me), but it's nice to know.

Finally, there is "task parallelism". This is what you can and will take advantage of in order to speed up your code. This is the idea of breaking down your program into a series of very simplistic tasks and splitting them up into portions that can be run seperately.

Threaded v. Multiprocessing

Given that you would like to leverage the task parallelism of your problem, you may have heard two terms referring to parallel computing: threading and multiprocessing. Threading is when a single instance of a program sends out several sets of instructions and executes them separately. Multiprocessing refers to spawning several instances of a program and running the processes separately. This may seem like an unnecessary distinction now, but the two are very different.

In particular, Python does not allow true concurrent threading because of the "Global Interpreter Lock", or GIL, which allows the interpreter to run only a single instruction at a time. In fact, running a multithreaded program could actually be slower than a serial one, as Python must switch between threads. It is for this reason that we will be using multiprocessing for now. This gets around the problem, as each instance of the interpreter can run its own instructions. That being said, in a moment we will do some GPU parallelization that uses threads, but that is a separate issue.
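
To fix ideas before the full example below, here is a minimal sketch of the multiprocessing pattern we will use: spawn a few processes, have each put its partial result on a Queue, join them, and collect. The toy worker function and the numbers are purely illustrative.


In [ ]:
import multiprocessing as mp

def partial_range_sum(lo, hi, queue):
    # Each worker sums its own slice and reports the result via the queue
    queue.put(sum(range(lo, hi)))

queue = mp.Queue()
workers = 4
M = 1000000
chunk = M // workers

processes = [mp.Process(target=partial_range_sum,
                        args=(i*chunk, (i + 1)*chunk, queue))
             for i in range(workers)]
for p in processes:
    p.start()
for p in processes:
    p.join()

total = sum(queue.get() for _ in processes)
print(total == sum(range(M)))   # True

(On Windows, where processes are spawned rather than forked, the worker function would need to live in an importable module rather than a notebook cell.)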

Parallel Monte Carlo

Monte Carlo lends itself well to parallelization. Why? Because it is essentially just a sum, and a sum can be split into partial sums. So, to parallelize our estimation we simply split the sampled points into several parts, calculate the partial sums, then gather the results. If that doesn't make sense, your homework is to write out the Monte Carlo problem as the sum of two partial sums (there's a hint just below).
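
As a hint, for two workers and $M$ even, the mean estimator splits as

$$ \hat{\phi} = \frac{V}{M}\sum_{i = 1}^M f(x_i)g(x_i) = \frac{V}{M}\left(\sum_{i = 1}^{M/2} f(x_i)g(x_i) + \sum_{i = M/2 + 1}^{M} f(x_i)g(x_i)\right) $$

and each inner sum can be computed by a different worker.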

To do this, we'll make use of the multiprocessing package. This contains functions for spawning workers, executing the sub-routines, and gathering the data. This is probably best explained in an example. To make things a bit more involved, we'll define our first class of objects to estimate the expectation.

Here's the code, heavily annotated to explain the logic:


In [10]:
class cpu_parallel_monte_carlo:
    """
    A class containing CPU based parallelization of the serial monte carlo
    
    inputs:
        f    :    function; the moment to be estimated
        g    :    function; the distribution function of x
        M    :    scalar; the number of points to sample
        X    :    ndarray; bounds in R^n for the support
        
    """
    # Define initial characteristics
    # NOTE:  When you define a variable of some class, this function is run.
    # It initializes different characteristics of the object.
    def __init__(self, f, g, M, X, workers):
        # Check if the number of operations is divisible by the number of workers
        # NOTE: This may not be an issue, but better safe than sorry
        if M % workers != 0:
            raise ValueError('The vector of points must be evenly divisible'
                             + ' by the number of workers.')
        # Initialize the object attributes
        self.f = f
        self.g = g
        self.M = M
        self.X = X
        self.workers = workers
        # mp.Queue() creates an object where you can store the output of the
        # different processes as you go.  As you'll see, we can then recover
        # this data from the queue
        self.queue = mp.Queue()
        self.N =  X.shape[0]
        self.points = np.random.rand(M, self.N)*(np.array([X[:, 1], ]*M) 
                                           - np.array([X[:, 0], ]*M))\
            +  np.array([X[:, 0], ]*M)
        self.V = np.prod(np.abs(X[:,1] - X[:,0]))
        # When the object is defined we go ahead and estimate.
        # NOTE: we'll change this in a moment
        self.mc_mean()
        self.mc_var()


    # Define functions that can be parallelized
    # NOTE: This function simply uses the process number to take a slice of
    # the points and then calculate the sum over these points.
    def partial_sum(self, process):
        """
        A function that will calculate the partial sum for the monte carlo
        mean.
        
        inputs:
            process   :    int; the process number

        """
        # Unpack some info
        # NOTE: This is the length of the slice of points.  We could have
        # defined this as an attribute of the object itself, but oh well!
        K = int(self.M/self.workers)
        partial_points = self.points[process*K:(process + 1)*K, :]
        
        # Calculate the sum as a loop
        sum1 = 0.0
        for i in range(0, K):
            sum1 += self.f(partial_points[i, :])*self.g(partial_points[i, :], self.X)
            
        # Return the calculated sum to the queue
        # NOTE: This queue will store the output until the processes
        # have caught up to each other
        self.queue.put(sum1)
    
    # NOTE: This function calculates the partial sum for the standard
    # error calculation.  The steps are identical, but the formula is different
    def partial_var(self, process):
        """
        A function that will calculate the partial sum for
        the monte carlo variance.
        
        inputs:
            process   :    int; the process number

        """
        # Unpack some info
        K = int(self.M/self.workers)
        partial_points = self.points[process*K:(process + 1)*K, :]
        
        # Calculate the partial sums as a loop
        var1 = 0.0
        sum1 = self.mean/self.V
        for i in range(0, K):
            var1 += (self.f(partial_points[i, :])*self.g(partial_points[i, :], self.X) - sum1)**2
        self.queue.put(var1)
    
    # Calculate the estimation of the mean using multiprocessing
    def mc_mean(self):
        """
        A method to calculate the mean by parallel montecarlo.
        
        """
        # Create a list of processes to run
        # NOTE: mp.Process requires 2 arguments: target is the function to be run
        # and kwargs are keyword arguments passed as a dictionary.  We create
        # a list of processes to iterate over, one for each worker
        processes = [mp.Process(target=self.partial_sum,
                                kwargs=dict(process=i))
                     for i in range(0, self.workers)]
        
        # Run the processes
        for p in processes:
            # This command tells the processes to run.
            # NOTE: The processes do not need to wait for the others to finish in order to start.
            p.start()
        
        # When the processes are done, exit
        for p in processes:
            # This command tells the spawning instance, the one running your program, to 
            # wait until process p has finished to continue.  So, on the first loop it will 
            # pause until the first process finishes, etc.
            p.join()
        
        # Retrieve results from the queue and calculate the mean
        # NOTE: queue.get() simply pops data out of the queue.  This is very similar
        # to pop in lists.
        partial_sums = [self.queue.get() for p in processes]
        self.mean = sum(partial_sums)*self.V/self.M
    
    # Calculate the estimation of the variance using multiprocessing
    # NOTE: The logic is the same as the preceding function
    def mc_var(self):
        """
        A method to calculate the variance by parallel montecarlo.
        
        """        
        # Create a list of processes to run
        processes = [mp.Process(target=self.partial_var,
                                kwargs=dict(process=i))
                     for i in range(0, self.workers)]
        
        # Run the processes
        for p in processes:
            p.start()
        
        # When the processes are done, exit
        for p in processes:
            p.join()
        
        # Retrieve results from the queue and calculate the mean
        partial_sums = [self.queue.get() for p in processes]
        self.var = sum(partial_sums)*self.V**2/(self.M*(self.M - 1))

This code is quite long and takes a lot of thought on the first go. This is the price you pay when you parallelize your problem. The learning curve is steep and the logic can often be difficult. However, the steps are always the same and you can often reuse code. For instance, you never have to write a multiprocessing Monte Carlo again! This one works for any moment and any distribution! Well, I guess you need the distribution function, but come on.

OK, let's see how the program performs.


In [11]:
workers = 4
M = 1000
N = 2
X = np.array((0*np.ones(N), 1*np.ones(N))).T

test = cpu_parallel_monte_carlo(f, g, M, X, workers)

print(monte_carlo_expectation_serial(f, g, M, X))
print(monte_carlo_expectation_numpy(f_vec, g_vec, M, X))
print((test.mean, test.var))


(0.66089366292574248, 0.00018354183632094729)
(0.66855723391600896, 0.00016819194662196142)
(0.65451147105416585, 0.00017530210189413452)

Great! Our mean and variance estimates are the same in the serial, the numpy, and the multiprocessing application. Now let's time it.

First, let's time it next to the serial and NumPy versions to see the overhead from spawning workers:


In [24]:
workers = 4
M = 1000
N = 2
X = np.array((0*np.ones(N), 1*np.ones(N))).T

# We can also time these three side by side
%timeit monte_carlo_expectation_serial(f, g, M, X)
%timeit monte_carlo_expectation_numpy(f_vec, g_vec, M, X)
%timeit cpu_parallel_monte_carlo(f, g, M, X, workers)


10 loops, best of 3: 19.2 ms per loop
1000 loops, best of 3: 718 µs per loop
10 loops, best of 3: 51.8 ms per loop

The NumPy solution is still the clear winner, and notice that the multiprocessing solution is roughly three times slower than the plain serial implementation. This could be caused by the overhead of defining the class. To abstract away from this, let's change our class so that it does not calculate the mean and variance estimates on initialization, but instead exposes them as methods. NOTE: When you do this you need to add a return statement to the method if you would like it to return anything, rather than simply calculating the value and storing it as an attribute. This depends on your implementation, but I like it here for checking the output in a concise way.


In [12]:
class cpu_parallel_monte_carlo:
    """
    A class containing CPU based parallelization of the serial monte carlo
    
    inputs:
        f    :    function; the moment to be estimated
        g    :    function; the distribution function of x
        M    :    scalar; the number of points to sample
        X    :    ndarray; bounds in R^n for the support
        
    """
    def __init__(self, f, g, M, X, workers):
        # Check if the number of operations is divisible by the number of workers
        if M % workers != 0:
            raise ValueError('The vector of points must be evenly divisible'
                             + ' by the number of workers.')
        self.f = f
        self.g = g
        self.M = M
        self.X = X
        self.workers = workers
        self.queue = mp.Queue()
        self.N =  X.shape[0]
        self.points = np.random.rand(M, self.N)*(np.array([X[:, 1], ]*M) 
                                           - np.array([X[:, 0], ]*M))\
            +  np.array([X[:, 0], ]*M)
        self.V = np.prod(np.abs(X[:,1] - X[:,0]))
 
    def partial_sum(self, process):
        """
        A function that will calculate the partial sum for the monte carlo
        mean.
        
        inputs:
            process   :    int; the process number

        """
        # Unpack some info
        K = int(self.M/self.workers)
        partial_points = self.points[process*K:(process + 1)*K, :]
        
        # Calculate the sum as a loop
        sum1 = 0.0
        for i in range(0, K):
            sum1 += self.f(partial_points[i, :])*self.g(partial_points[i, :], self.X)
            
        # Return the calculated sum to the queue
        self.queue.put(sum1)
    
    def partial_var(self, process):
        """
        A function that will calculate the partial sum for
        the monte carlo variance.
        
        inputs:
            process   :    int; the process number

        """
        # Unpack some info
        K = int(self.M/self.workers)
        partial_points = self.points[process*K:(process + 1)*K, :]
        
        # Calculate the partial sums as a loop
        var1 = 0.0
        sum1 = self.mean/self.V
        for i in range(0, K):
            var1 += (self.f(partial_points[i, :])*self.g(partial_points[i, :], self.X) - sum1)**2
        self.queue.put(var1)
    
    # Calculate the estimation of the mean using multiprocessing
    def mc_mean(self):
        """
        A method to calculate the mean by parallel montecarlo.
        
        """
        # Create a list of processes to run
        processes = [mp.Process(target=self.partial_sum,
                                kwargs=dict(process=i))
                     for i in range(0, self.workers)]
        
        # Run the processes
        for p in processes:
            p.start()
        
        # When the processes are done, exit
        for p in processes:
            p.join()
        
        # Retrieve results from the queue and calculate the mean
        partial_sums = [self.queue.get() for p in processes]
        self.mean = sum(partial_sums)*self.V/self.M
        return self.mean
    
    # Calculate the estimation of the variance using multiprocessing
    def mc_var(self):
        """
        A method to calculate the variance by parallel montecarlo.
        
        """        
        # Create a list of processes to run
        processes = [mp.Process(target=self.partial_var,
                                kwargs=dict(process=i))
                     for i in range(0, self.workers)]
        
        # Run the processes
        for p in processes:
            p.start()
        
        # When the processes are done, exit
        for p in processes:
            p.join()
        
        # Retrieve results from the queue and calculate the mean
        partial_sums = [self.queue.get() for p in processes]
        self.var = sum(partial_sums)*self.V**2/(self.M*(self.M - 1))
        return self.var

In [26]:
workers = 4
M = 100
N = 2
X = np.array((0*np.ones(N), 1*np.ones(N))).T

test = cpu_parallel_monte_carlo(f, g, M, X, workers)

# We can also time these three side by side
%timeit monte_carlo_expectation_serial(f, g, M, X)
%timeit monte_carlo_expectation_numpy(f_vec, g_vec, M, X)
%timeit (test.mc_mean(), test.mc_var())


100 loops, best of 3: 1.93 ms per loop
10000 loops, best of 3: 120 µs per loop
10 loops, best of 3: 42.4 ms per loop

This does not solve the problem, and it is a clear example of how much overhead comes with running several Python instances simultaneously.

In terms of speed, let's see how the serial version compares to the parallel one:


In [27]:
workers = 4
for M in [100, 1000, 10000]:
    print("M = %s" %M)
    test = cpu_parallel_monte_carlo(f, g, M, X, workers)
    %timeit monte_carlo_expectation_serial(f, g, M, X)
    %timeit (test.mc_mean(), test.mc_var())
    del test


M = 100
The slowest run took 4.76 times longer than the fastest. This could mean that an intermediate result is being cached 
100 loops, best of 3: 1.92 ms per loop
10 loops, best of 3: 42.1 ms per loop
M = 1000
100 loops, best of 3: 19.2 ms per loop
10 loops, best of 3: 49.5 ms per loop
M = 10000
10 loops, best of 3: 192 ms per loop
10 loops, best of 3: 136 ms per loop

We got a speed-up! Notice, though, that we needed $M$ to be quite large to benefit. This is because we only have four workers and a large overhead for getting them started. If your computer is faster than mine, run the above up to 100000, or even 1000000 if you have a lot of memory!

OK, so we've done the above using loops, but what if we use NumPy instead, as we did before? The changes required are minimal:


In [13]:
class cpu_parallel_monte_carlo_numpy:
    """
    A class containing CPU based parallelization of the serial monte carlo
    
    inputs:
        f    :    function; the moment to be estimated
        g    :    function; the distribution function of x
        M    :    scalar; the number of points to sample
        X    :    ndarray; bounds in R^n for the support
        
    """
    # Define initial characteristics
    def __init__(self, f, g, M, X, workers):
        if M % workers != 0:
            raise ValueError('The vector of points must be evenly divisible'
                             + ' by the number of workers.')
        self.f = f
        self.g = g
        self.M = M
        self.X = X
        self.workers = workers
        self.queue = mp.Queue()
        self.N =  X.shape[0]
        self.points = np.random.rand(M, self.N)*(np.array([X[:, 1], ]*M) 
                                           - np.array([X[:, 0], ]*M))\
            +  np.array([X[:, 0], ]*M)
        self.V = np.prod(np.abs(X[:,1] - X[:,0]))

    # Define functions that can be parallelized
    def partial_sum(self, process):
        """
        A function that will calculate the partial sum for the monte carlo
        mean.
        
        inputs:
            process   :    int; the process number

        """
        # Unpack some info
        K = int(self.M/self.workers)
        partial_points = self.points[process*K:(process + 1)*K, :]

        # Calculate the sum as a dot product
        self.queue.put(np.dot(self.f(partial_points), self.g(partial_points, self.X)))
    
    def partial_var(self, process):
        """
        A function that will calculate the partial sum for
        the monte carlo variance.
        
        inputs:
            process   :    int; the process number

        """
        # Unpack some info
        K = int(self.M/self.workers)
        partial_points = self.points[process*K:(process + 1)*K, :]
        
        # Calculate the variance
        self.queue.put(np.linalg.norm(self.f(partial_points)*self.g(partial_points, self.X) - self.mean/self.V)**2)

    def mc_mean(self):
        """
        A method to calculate the mean by parallel montecarlo.
        
        """
        # Create a list of processes to run
        processes = [mp.Process(target=self.partial_sum,
                                kwargs=dict(process=i))
                     for i in range(0, self.workers)]
        
        # Run the processes
        for p in processes:
            p.start()
        
        # When the processes are done, exit
        for p in processes:
            p.join()
        
        # Retrieve results from the queue and calculate the mean
        partial_sums = [self.queue.get() for p in processes]
        self.mean = sum(partial_sums)*self.V/self.M
        
        return self.mean
    
    def mc_var(self):
        """
        A method to calculate the variance by parallel montecarlo.
        
        """        
        # Create a list of processes to run
        processes = [mp.Process(target=self.partial_var,
                                kwargs=dict(process=i))
                     for i in range(0, self.workers)]
        
        # Run the processes
        for p in processes:
            p.start()
        
        # When the processes are done, exit
        for p in processes:
            p.join()
        
        # Retrieve results from the queue and calculate the mean
        partial_sums = [self.queue.get() for p in processes]
        self.var = sum(partial_sums)*self.V**2/(self.M*(self.M - 1))
        
        return self.var
    
    def output(self):
        """
        A method to calculate monte carlo integral and variance of estimate.
        """
        return (self.mc_mean(), self.mc_var())

We should again check that the answers are similar:


In [2]:
workers = 2
M = 1000
N = 2
X = np.array((0*np.ones(N), 1*np.ones(N))).T

test = cpu_parallel_monte_carlo(f, g, M, X, workers)
test_numpy = cpu_parallel_monte_carlo_numpy(f_vec, g_vec, M, X, workers)

print(monte_carlo_expectation_serial(f, g, M, X))
print(monte_carlo_expectation_numpy(f_vec, g_vec, M, X))
print((test.mc_mean(), test.mc_var()))
print(test_numpy.output())


---------------------------------------------------------------------------
NameError                                 Traceback (most recent call last)
<ipython-input-2-f4ccdc876990> in <module>()
      2 M = 1000
      3 N = 2
----> 4 X = np.array((0*np.ones(N), 1*np.ones(N))).T
      5 
      6 test = cpu_parallel_monte_carlo(f, g, M, X, workers)

NameError: name 'np' is not defined

Super. Now we can look at speed and see how these four compare.


In [32]:
# NOTE: only see gains when M >> 100000
workers = 4
M = 100
N = 2
X = np.array((0*np.ones(N), 1*np.ones(N))).T

test = cpu_parallel_monte_carlo(f, g, M, X, workers)
test_numpy = cpu_parallel_monte_carlo_numpy(f_vec, g_vec, M, X, workers)

# We can also time these three side by side
%timeit monte_carlo_expectation_serial(f, g, M, X)
%timeit monte_carlo_expectation_numpy(f_vec, g_vec, M, X)
%timeit (test.mc_mean(), test.mc_var())
%timeit test_numpy.output()


1000 loops, best of 3: 1.91 ms per loop
10000 loops, best of 3: 119 µs per loop
10 loops, best of 3: 40.7 ms per loop
10 loops, best of 3: 39.9 ms per loop

Again, the NumPy implementation is faster, but the parallel NumPy version is WAY slower than the serial NumPy one, over 300 times slower here! Let's see how this does for higher values of $M$:


In [33]:
workers = 4
for M in [100, 1000, 10000]:
    print("M = %s" %M)
    test = cpu_parallel_monte_carlo(f, g, M, X, workers)
    test_numpy = cpu_parallel_monte_carlo_numpy(f_vec, g_vec, M, X, workers)
    %timeit monte_carlo_expectation_serial(f, g, M, X)
    %timeit monte_carlo_expectation_numpy(f_vec, g_vec, M, X)
    %timeit (test.mc_mean(), test.mc_var())
    %timeit test_numpy.output()
    del test
    del test_numpy


M = 100
100 loops, best of 3: 1.9 ms per loop
10000 loops, best of 3: 119 µs per loop
10 loops, best of 3: 41.3 ms per loop
10 loops, best of 3: 42.2 ms per loop
M = 1000
100 loops, best of 3: 19 ms per loop
1000 loops, best of 3: 727 µs per loop
10 loops, best of 3: 48.9 ms per loop
10 loops, best of 3: 40.3 ms per loop
M = 10000
10 loops, best of 3: 192 ms per loop
100 loops, best of 3: 6.6 ms per loop
10 loops, best of 3: 137 ms per loop
10 loops, best of 3: 40.8 ms per loop

As evidence of how efficient NumPy is, even at 10000 observations we still don't get a speed-up. In fact, we won't get one until M >> 10000 (keep in mind this differs across systems: on my home computer it takes M > 100000, while on the Raspberry Pi it is M > 15000. I'm not sure if that is a compliment for the Raspberry Pi or an embarrassment for my desktop...):


In [31]:
workers = 4
M = 100000
test_numpy = cpu_parallel_monte_carlo_numpy(f_vec, g_vec, M, X, workers)
%timeit monte_carlo_expectation_numpy(f_vec, g_vec, M, X)
%timeit test_numpy.output()


10 loops, best of 3: 67.4 ms per loop
10 loops, best of 3: 45.9 ms per loop

That sums up our introduction to multiprocessing, but if you would like to learn more check out the documentation. There are neat functions like Pool() which automate much of what we did.
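
For instance, here is a rough sketch of the same partial-sum idea using Pool, reusing the f, g, and X defined earlier. The chunk_sum helper is my own, purely illustrative, and on Windows it would need to live in an importable module:


In [ ]:
def chunk_sum(args):
    # Sum f(x)g(x) over one slice of the sampled points
    pts, X = args
    return sum(f(pts[i, :])*g(pts[i, :], X) for i in range(pts.shape[0]))

workers = 4
M = 1000
N = 2
X = np.array((0*np.ones(N), 1*np.ones(N))).T
points = np.random.rand(M, N)   # the support is [0, 1]^N, so no rescaling needed
V = np.prod(np.abs(X[:, 1] - X[:, 0]))

K = M // workers
with mp.Pool(workers) as pool:
    partial_sums = pool.map(chunk_sum,
                            [(points[i*K:(i + 1)*K, :], X) for i in range(workers)])
print(V*sum(partial_sums)/M)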

GPU Parallelization

Now that we've seen how multiprocessing can help us, what about GPU parallelization? In fact, what the heck is a GPU!?

A Graphics Processing Unit (GPU) is very similar to a CPU. When you play a modern video game, it takes a massive amount of calculation to render the grass blowing in the wind or the explosions (this is a topic for a PhD dissertation). These calculations are often very similar and very simple, as well as highly parallelizable. Because of this, NVIDIA, a graphics card company, created a special processor designed purely to render computer graphics. They essentially took a CPU, removed everything that wasn't absolutely necessary for calculation, and packed it with cores. And more cores, and more cores...

<img src="cpu_v_gpu.jpg" width="500" /img> Source: Nvidia.com

A modern, bottom-shelf graphics card from NVIDIA (like the one in my home computer) retails for around 100€ and contains 512 cores. 512 cores! Think about that: a core on a GPU costs less than 20 centimes. And this is just the bottom shelf. For example, I have space on my motherboard for three graphics cards and they can be wired up in parallel, so I could easily invest another 200€ and triple my computing speed. If we wanted to go to the top of the line of NVIDIA's scientific computing cards, we could get a Tesla K80, which offers 4992 cores PER CARD at 8.74 teraflops of single-precision floating point arithmetic. That's 8.74 million million operations per second! To put this into context, in the year 2000 the fastest computer in the world was the IBM ASCI White... its peak speed was 7.226 teraflops. So a single GPU is faster than the fastest computer in the world was when we entered the new millennium. Wiring three of these bad boys up in parallel would create a desktop computer that could carry out 764 times the number of calculations per second as the IBM Deep Blue supercomputer that beat Garry Kasparov. However, the cost per core is higher, at about 84 centimes per core.

Nerding out complete, let's return to the task at hand. Actually working with a GPU can be quite difficult. Because of the desire to cram all of these cores onto a single chip, the GPU is quite dumb: it can really only do very basic calculations, and the CPU must do all of the higher-level organizing. It is even necessary to communicate with the GPU in a special language called CUDA. Thankfully, there's a package for that.

PyCUDA is a package that automates much of the hard work of talking to the GPU, but it is still necessary to do some data management and to write a "kernel" in C. Don't be discouraged! You only have to do this for the most low-level calculations, and you can often just re-use someone else's kernel.

Threading

Possibly the most difficult part about using PyCUDA is keeping track of the threads. You could have hundreds or even thousands of threads running simultaneously. To keep these all in place, there are two structures known as the grid and the blocks.

A grid is a two dimensional matrix of blocks.

A block is a three dimensional matrix of threads.

You can break up your work however you would like across these objects, but we will do the simplest thing. We will define a block that is simply a vector of 512 threads (the number of cores on my GPU; you should change this for your machine). We'll then define a grid as a vector of length $\frac{M}{512}$. In this way we will launch $\frac{M}{512}$ waves of 512 threads.

Each one of these threads is indexed by its threadIdx and blockIdx. For our set-up there is only the x direction, but there could also be y and z directions. Here's a short example of a CUDA kernel that carries out an elementwise multiplication and stores the result in a:


In [14]:
mod = SourceModule("""
    __global__ void elementwise_multiply(float *a, float *b)
    {

        const int t = threadIdx.x + blockIdx.x * 512 ; //the thread identifier to reference the original index

        a[t] *= b[t];
        
    }

    """)

Note the line that computes the index into the vectors from the threadIdx and blockIdx. This is the typical way to find the index.

PyCUDA precompiles this code into a function, which you must retrieve. To run it, you have to carry out several steps:

  1. Define the function and thread structure.
  2. Allocate memory on both the CPU and the GPU.
  3. Transfer the data to the GPU.
  4. Execute the function.
  5. Retrieve the solution.

Let's see all this in a function:


In [15]:
### DEFINE THE FUNCTION
# Define a function for the elementwise multiplication
gpu_elementwise = mod.get_function("elementwise_multiply")

def super_fast_elementwise_multiply(a, b, NUMBER_OF_BLOCKS,
                                 THREADS_PER_BLOCK):
    """
    A GPU implementation of an elementwise multiplication of two vectors.
    
    Inputs:
    
        a,b    :    ndarray

    """
    ### ALLOCATE MEMORY ON THE CPU
    # Allocate memory to fill with solution
    c = np.zeros(a.shape[0]).astype(np.float32)
    
    ### ALLOCATE MEMORY ON THE GPU
    a_gpu = cuda.mem_alloc(a.nbytes)
    b_gpu = cuda.mem_alloc(b.nbytes)
    
    ### TRANSFER DATA TO THE DEVICE
    cuda.memcpy_htod(a_gpu, a)
    cuda.memcpy_htod(b_gpu, b)
    
    ### EXECUTE THE FUNCTION
    # Carry out the multiplication
    gpu_elementwise(a_gpu, b_gpu,
                    block=(THREADS_PER_BLOCK, 1, 1),
                    grid=(NUMBER_OF_BLOCKS, 1))
    
    ### RETRIEVE SOLUTION
    # Retrieve data from host
    cuda.memcpy_dtoh(c, a_gpu)
    
    # Free up the memory
    del a_gpu
    del b_gpu

    return c

# How big do you want your vectors?  They will be of length M*512
M = 10
M *= 512

### DEFINE THE THREAD STRUCTURE
# Define GPU size parameters
NUMBER_OF_BLOCKS = int(M/512)
THREADS_PER_BLOCK = 512

# Generate some data
A = np.ones((M)).astype(np.float32)*3
B = np.ones((M)).astype(np.float32)*2

# Calculate the elementwise multiplication
C = super_fast_elementwise_multiply(A, B, NUMBER_OF_BLOCKS,
                                 THREADS_PER_BLOCK)
print(C)
print(A*B)


[ 6.  6.  6. ...,  6.  6.  6.]
[ 6.  6.  6. ...,  6.  6.  6.]

In [12]:
#Success!  Now let's see how fast it is...
# I hate to say it, but you'll run out of memory
# before this speeds up your code!!!
for M in [1000, 10000, 100000, 200000]:
    M *= 512

    # Define GPU size parameters
    NUMBER_OF_BLOCKS = int(M/512)
    THREADS_PER_BLOCK = 512

    # Generate some data
    a = np.ones((M)).astype(np.float32)*3
    b = np.ones((M)).astype(np.float32)*2

    print("\nNumber of Elements: %s" % M)
    %timeit a*b
    %timeit super_fast_elementwise_multiply(a, b, NUMBER_OF_BLOCKS, THREADS_PER_BLOCK)
    
    # Free up memory
    del a
    del b


Number of Elements: 512000
1000 loops, best of 3: 426 µs per loop
100 loops, best of 3: 2.79 ms per loop

Number of Elements: 5120000
100 loops, best of 3: 5.76 ms per loop
100 loops, best of 3: 17 ms per loop

Number of Elements: 51200000
10 loops, best of 3: 59.7 ms per loop
10 loops, best of 3: 169 ms per loop

Number of Elements: 102400000
10 loops, best of 3: 121 ms per loop
1 loops, best of 3: 337 ms per loop

Sadly, this doesn't give us much of a speed-up. This is caused by the high latency of data transfer to the GPU, as well as the efficiency of NumPy. Latency is synonymous with lag or sluggishness. When we transfer data to the GPU, our calculation gets bogged down by the data transfer onto the device. In order to benefit, we need to do more calculation and less transfer.
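
To see the transfer cost in isolation, one can time just the host-to-device copy with the same PyCUDA calls used above; a rough sketch (the result will of course depend on your hardware):


In [ ]:
M = 1000*512
a = np.ones(M).astype(np.float32)
a_gpu = cuda.mem_alloc(a.nbytes)

# Time only the host-to-device transfer; on many systems this alone is
# already comparable to the full NumPy multiplication timed above.
%timeit cuda.memcpy_htod(a_gpu, a)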

One way we can take a step in this direction is to do the dot product on the GPU. The CUDA kernel below uses a pairwise summation to calculate the dot product. It proceeds in two steps:

  1. Carry out the elementwise multiplication. When done, __syncthreads(); tells CUDA to wait until all of the multiplications have finished.

  2. Do a pairwise summation. When complete, use atomicAdd to add the result for the block to the total sum.

Note the atomicAdd. We have not discussed this, but sharing data across threads is complicated. However, since we are only adding to c, we don't have to worry too much: the atomic operation adds each block's result to the total safely, no matter which block it comes from.


In [16]:
mod = SourceModule("""
    __global__ void dot_prod(float *a, float *b, float *c)
    {
        // Define a shared object to fill with the elementwise mult
        // NOTE: This can't be dynamic, so what I'll do is just fill
        // a with the solution.  That would allow for dynamic shape
        // NOTE: That won't work, as the pairwise sum is based on the
        // block level thread id's
        // ***** The number 512 must be changed to optimize on a specific
        // machine.  Set equal number of cores.
        __shared__ float temp[512];

        //the thread identifier to reference the original index
        const int t = threadIdx.x + blockIdx.x * blockDim.x ;

        //the number of elements in a block to pairwise sum
        int n = 256;
        temp[threadIdx.x] = a[t]*b[t];

        __syncthreads();

        //now pairwise sum the vector temp, store in temp, then output to c
        while (n != 0)
        {
            if (threadIdx.x < n)
                temp[threadIdx.x] += temp[threadIdx.x+n];
                __syncthreads();
            n /= 2;
        }

        //add the total to the sum only once per block
        if (0 == threadIdx.x)
            atomicAdd(c,temp[0]);
    }
    """)

In [17]:
# Define a function for the dot product
gpu_dot = mod.get_function("dot_prod")

def super_fast_dot_product(a, b, NUMBER_OF_BLOCKS, THREADS_PER_BLOCK):
    """
    A GPU implementation of a dot product of two vectors.
    
    Inputs:
    
        a,b    :    ndarray

    """
    # Allocate memory to fill with solution
    c = np.zeros(1).astype(np.float32)

    # Allocate memory on the gpu
    a_gpu = cuda.mem_alloc(a.nbytes)
    b_gpu = cuda.mem_alloc(b.nbytes)
    c_gpu = cuda.mem_alloc(c.nbytes)
    
    # Transfer data to the device
    cuda.memcpy_htod(a_gpu, a)
    cuda.memcpy_htod(b_gpu, b)
    cuda.memcpy_htod(c_gpu, c)
    
    # Carry out the dot product
    gpu_dot(a_gpu, b_gpu, c_gpu,
                    block=(THREADS_PER_BLOCK, 1, 1),
                    grid=(NUMBER_OF_BLOCKS, 1))
    
    # Retrieve the result from the device
    cuda.memcpy_dtoh(c, c_gpu)
    
    return c

threads = 512
# How big do you want your vectors?  They will be of length M*512
M = 10
M *= threads

# Define GPU size parameters
NUMBER_OF_BLOCKS = int(M/threads)
THREADS_PER_BLOCK = threads

# Generate some data
A = np.ones((M)).astype(np.float32)*3
B = np.ones((M)).astype(np.float32)*2

# Calculate the dot product
C = super_fast_dot_product(A, B, NUMBER_OF_BLOCKS,
                                 THREADS_PER_BLOCK)
print(C)
print(np.dot(A, B))


[ 30720.]
30720.0

In [39]:
#Success again!
# I hate to say it, but you'll run out of memory
# before this speeds up your code!!!
for M in [1000, 10000, 100000]:
    M *= 512

    # Define GPU size parameters
    NUMBER_OF_BLOCKS = int(M/512)
    THREADS_PER_BLOCK = 512

    # Generate some data
    a = np.ones((M)).astype(np.float32)*3
    b = np.ones((M)).astype(np.float32)*2

    print("\nNumber of Elements: %s" % M)
    %timeit np.dot(a,b)
    %timeit super_fast_dot_product(a, b, NUMBER_OF_BLOCKS, THREADS_PER_BLOCK)
    
    # Free up memory
    del a
    del b


Number of Elements: 512000
10000 loops, best of 3: 182 µs per loop
1000 loops, best of 3: 1.21 ms per loop

Number of Elements: 5120000
100 loops, best of 3: 3.5 ms per loop
100 loops, best of 3: 10.1 ms per loop

Number of Elements: 51200000
10 loops, best of 3: 33 ms per loop
10 loops, best of 3: 94.9 ms per loop

Again, we are still not getting the speed-up we were hoping for. However, if we could also parallelize our moment or distribution function, we might get one. Finally, let's write a kernel to speed up our Monte Carlo calculation.

NOTE: While editing I introduced an error into this version that I can't seem to find... bonus points to anyone who can spot it!


In [7]:
mod = SourceModule("""
    __global__ void sum1(float *a, float *b, float *c)
    {
        // Objects to store pairwise sums
        //__shared__ float temp2[%(MATRIX_SIZE)s];
        __shared__ float temp2[256];

        // 2D Thread ID to calculate L2 norm
        // index to entry in a
        const int ida = threadIdx.y + threadIdx.x*blockDim.y + blockIdx.x*blockDim.y*blockDim.x;

        // The block level id over matrix temp1
        //const int block_tx = threadIdx.y + threadIdx.x*blockDim.y;

        // Index to entry in b
        const int idb = threadIdx.x + blockIdx.x*blockDim.x;

        // Constants for pairwise sum
        int n = blockDim.x / 2; //the number of rows in a block to pairwise sum
        int k = blockDim.y / 2; //the number of columns in a block to pairwise sum

        // Square the entries in the sum
        a[ida] *= a[ida];

        // Carry out pairwise summation of squared entries along row
        // to calculate the L2 norm
        while (k != 0)
        {
            if (threadIdx.y < k)
                a[ida] += a[ida + k];
            __syncthreads();
            k /= 2;
        }

        // Carry out the elementwise multiplication
        // NOTE: only on left most row entry
        if (0 == threadIdx.y)
            temp2[threadIdx.x] = a[ida]*b[idb];

        __syncthreads();

        //now pairwise sum the vector temp2, store in temp2, then output to c
        while (n != 0)
        {
            if (threadIdx.x < n)
                if (0 == threadIdx.y)
                    temp2[threadIdx.x] += temp2[threadIdx.x+n];
            __syncthreads();
            n /= 2;
        }
        //add the total to the sum only once per block
        if (0 == threadIdx.x)
            if (0 == threadIdx.y)
                atomicAdd(c,temp2[0]);
    }

    """)

In [18]:
class GPU_parallel_L2_monte_carlo_numpy:
    """
    A class containing GPU based parallelization of monte carlo
    
    inputs:
        g    :    function; the distribution function of x
        M    :    scalar; the number of points to sample
        X    :    ndarray; bounds in R^n for the support
        cores:    scalar; number of cuda cores on gpu 
        
    """
    # Define initial characteristics
    def __init__(self, g, M, X, cores):
        # NOTE: This is no longer necessary.  We've incorporated it into the kernel
        #self.f = f
        # NOTE: We could get more speed from using the fact that
        # g is constant over x, but for 
        # comparability issues we'll keep this
        self.g = g
        self.M = M
        self.X = X
        self.N =  X.shape[0]
        self.points = np.random.rand(M, self.N)*(np.array([X[:, 1], ]*M) 
                                           - np.array([X[:, 0], ]*M))\
            +  np.array([X[:, 0], ]*M)
        self.V = np.prod(np.abs(X[:,1] - X[:,0]))
        
        # Define GPU size parameters
        self.cores = cores
        if self.cores % self.N != 0:
            raise ValueError('The number of cores is not divisible by '
                             + 'the dimension N.')
        self.threadsX = int(self.cores/self.N)
        if self.M % self.threadsX != 0:
            raise ValueError('The number of observations is not divisible '
                               + 'by the number of threads per block.')
        self.NUMBER_OF_BLOCKS = int(self.M/self.cores)
        self.THREADS_X_PER_BLOCK = self.threadsX
        self.THREADS_Y_PER_BLOCK = self.N

        # Define a string object containing the kernel
        kernel_code = """
            #include <stdio.h>
            __global__ void sum1(float *a, float *b, float *c)
            {
                // Objects to store pairwise sums
                __shared__ float temp2[256];
                
                // 2D Thread ID to calculate L2 norm
                // index to entry in a
                const int ida = threadIdx.y + threadIdx.x*blockDim.y + blockIdx.x*blockDim.y*blockDim.x;

                // Index to entry in b
                const int idb = threadIdx.x + blockIdx.x*blockDim.x;

                // Constants for pairwise sum
                int n = blockDim.x / 2; //the number of rows in a block to pairwise sum
                int k = blockDim.y / 2; //the number of columns in a block to pairwise sum

                // Square the entries in the sum
                a[ida] *= a[ida];

                // Carry out pairwise summation of squared entries along row
                // to calculate the L2 norm
                while (k != 0)
                {
                    if (threadIdx.y < k)
                        a[ida] += a[ida + k];
                    __syncthreads();
                    k /= 2;
                }

                // Carry out the elementwise multiplication
                // NOTE: only on left most row entry
                if (0 == threadIdx.y)
                    temp2[threadIdx.x] = a[ida]*b[idb];

                __syncthreads();

                //now pairwise sum the vector temp2, store in temp2, then output to c
                while (n != 0)
                {
                    if (threadIdx.x < n)
                        if (0 == threadIdx.y)
                            temp2[threadIdx.x] += temp2[threadIdx.x+n];
                    __syncthreads();
                    n /= 2;
                }
                //add the total to the sum only once per block
                if (0 == threadIdx.x)
                    if (0 == threadIdx.y)
                        atomicAdd(c,temp2[0]);
            }


            __global__ void var1(float *a, float *b, float *c, float *d)
            {
                // Objects to store pairwise sums
                //__shared__ float temp2[%(MATRIX_SIZE)s];
                __shared__ float temp2[256];

                // 2D Thread ID to calculate L2 norm
                // index to entry in a
                const int ida = threadIdx.y + threadIdx.x*blockDim.y + blockIdx.x*blockDim.y*blockDim.x;
                
                // Index to entry in b
                const int idb = threadIdx.x + blockIdx.x*blockDim.x;

                // Constants for pairwise sum
                int n = blockDim.x / 2; //the number of rows in a block to pairwise sum
                int k = blockDim.y / 2; //the number of columns in a block to pairwise sum

                // Square the entries in the sum
                a[ida] *= a[ida];

                // Carry out pairwise summation of squared entries along row
                // to calculate the L2 norm
                while (k != 0)
                {
                    if (threadIdx.y < k)
                        a[ida] += a[ida + k];
                    __syncthreads();
                    k /= 2;
                }

                // Carry out the elementwise multiplication
                // NOTE: only on left most row entry
                printf("%f \\n", d[0]);
                if (0 == threadIdx.y)
                    temp2[threadIdx.x] = a[ida]*b[idb];
                    temp2[threadIdx.x] -= d[0];
                    temp2[threadIdx.x] *= temp2[threadIdx.x];

                __syncthreads();

                //now pairwise sum the vector temp2, store in temp2, then output to c
                while (n != 0)
                {
                    if (threadIdx.x < n)
                        if (0 == threadIdx.y)
                            temp2[threadIdx.x] += temp2[threadIdx.x+n];
                    __syncthreads();
                    n /= 2;
                }

                //add the total to the sum only once per block
                if (0 == threadIdx.x)
                    if (0 == threadIdx.y)
                        atomicAdd(c,temp2[0]);
            }

            """
        # Pass to PyCUDA the size of the matrix
        #kernel_code = kernel_code % {
        #    'MATRIX_SIZE': self.THREADS_X_PER_BLOCK,
        #    }

        # compile the kernel code 
        mod = SourceModule(kernel_code)
        
        # Define a function for the parallel sum
        self.gpu_sum1 = mod.get_function("sum1")
        self.gpu_var1 = mod.get_function("var1")

    def mean1(self):
        """
        A GPU implementation of monte carlo mean.

        """
        # Calculate the probability of the observations in serial
        # NOTE: Even more speed up available here by parallelization
        self.g_of_points = self.g(self.points, self.X).astype(np.float32)
        
        # Allocate memory to fill with solution
        sum1 = np.zeros(1).astype(np.float32)

        
        # Allocate memory on the gpu
        a_gpu = cuda.mem_alloc(self.points.astype(np.float32).nbytes)
        b_gpu = cuda.mem_alloc(self.g_of_points.astype(np.float32).nbytes)
        c_gpu = cuda.mem_alloc(sum1.nbytes)

        # Transfer data to the device
        cuda.memcpy_htod(a_gpu, self.points.astype(np.float32))
        cuda.memcpy_htod(b_gpu, self.g_of_points.astype(np.float32))
        cuda.memcpy_htod(c_gpu, sum1)

        # Carry out the multiplication
        self.gpu_sum1(a_gpu, b_gpu, c_gpu,
                        block=(self.THREADS_X_PER_BLOCK,
                               self.THREADS_Y_PER_BLOCK, 1),
                        grid=(self.NUMBER_OF_BLOCKS, 1))

        # Retrieve the result from the device
        cuda.memcpy_dtoh(sum1, c_gpu)

        # Calculate the mean
        self.mean = sum1*self.V/self.M
        
        # Clean up memory
        del a_gpu, b_gpu, c_gpu
        
        return self.mean
    
    def var1(self):
        """
        A method to calculate the variance by gpu parallel montecarlo.
        
        """
        # Calculate the probability of the observations in serial
        # NOTE: Even more speed up available here by parallelization
        # NOTE: I saved this information from previous calculation
        #g_of_points = self.g(self.points, self.X)
        # Calculate parameter
        d = np.array((self.mean/self.V), dtype=np.float32)
        
        # Allocate memory to fill with solution
        var1 = np.zeros(1).astype(np.float32)

        # Allocate memory on the gpu
        a_gpu = cuda.mem_alloc(self.points.astype(np.float32).nbytes)
        b_gpu = cuda.mem_alloc(self.g_of_points.astype(np.float32).nbytes)
        c_gpu = cuda.mem_alloc(var1.nbytes)
        d_gpu = cuda.mem_alloc(d.nbytes)

        # Transfer data to the device
        cuda.memcpy_htod(a_gpu, self.points.astype(np.float32))
        cuda.memcpy_htod(b_gpu, self.g_of_points.astype(np.float32))
        cuda.memcpy_htod(c_gpu, var1)
        cuda.memcpy_htod(d_gpu, d)

        # Carry out the multiplication
        self.gpu_var1(a_gpu, b_gpu, c_gpu, d_gpu,
                        block=(self.THREADS_X_PER_BLOCK,
                               self.THREADS_Y_PER_BLOCK, 1),
                        grid=(self.NUMBER_OF_BLOCKS, 1))

        # Retrieve the result from the device
        cuda.memcpy_dtoh(var1, c_gpu)
        
        # Calculate the variance
        self.var = var1*self.V**2/(self.M*(self.M - 1))
        
        # Clean up memory
        del a_gpu, b_gpu, c_gpu, d_gpu
        return self.var
    
    def output(self):
        """
        A method to calculate monte carlo integral and variance of estimate.
        """
        return (self.mean1(), self.var1())


### Define parameterization
# Dimension of the random variable
N = 2
cores = 512
# Parameter controlling the number of rows per block
# NOTE: defined this way so each block holds complete rows; no row
# is split across blocks
threadsX = int(cores/N)
# Parameter controlling the number of observations
# There will be M*512 of them
M = 5
M *= cores

# Define GPU size parameters
#NUMBER_OF_BLOCKS = int(M/threadsX)
#THREADS_X_PER_BLOCK = threadsX
#THREADS_Y_PER_BLOCK = N

# Generate some data
#A = np.ones((M)).astype(np.float32)*3
#B = np.ones((M)).astype(np.float32)*2

workers = 2
X = np.array((0*np.ones(N), 1*np.ones(N))).T


# Instantiate the estimators and compare the monte carlo estimates
test = GPU_parallel_L2_monte_carlo_numpy(g_vec, M, X, cores)
test_numpy = cpu_parallel_monte_carlo_numpy(f_vec, g_vec, M, X, workers)
print(test_numpy.output())
print(test.output())


(0.67277291379281345, 7.0744866043010118e-05)
(array([ 0.36519364], dtype=float32), array([ 0.00680973]))

As I said, while editing I introduced an error.

Ignoring my error, this function is actually even slower than before! Why? The reason has to do with the thread structure.

When we define our function over a two-dimensional block of threads, we run into the problem of idle threads. In particular, we initially use all of our threads to calculate the norm, but afterwards only a single column of threads calculates the sum. This implies that after the norm is calculated we leave $512\frac{N-1}{N}$ threads idle. To address this we can split the two operations and redistribute the threads, which we do by defining two separate functions on the GPU: one for the norm and one for the moments.
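
To put numbers on that, here is the idle-thread count per 512-thread block for a few dimensions (a small illustrative calculation, nothing GPU-specific; n_dim is just a local name to avoid clashing with the N used below):

In [ ]:
# Threads left idle in a 512-thread block once only the threadIdx.y == 0
# column is doing useful work
threads = 512
for n_dim in [2, 4, 8]:
    idle = threads*(n_dim - 1)//n_dim
    print("N = %d: %d of %d threads idle during the reduction" % (n_dim, idle, threads))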


In [97]:
class GPU_parallel_L2_monte_carlo_block:
    """
    A class containing GPU based parallelization of monte carlo

    inputs:
        g    :    function; the distribution function of x
        M    :    scalar; the number of points to sample
        X    :    ndarray; bounds in R^n for the support
        cores:    scalar; number of cuda cores on gpu

    """
    # Define initial characteristics
    def __init__(self, g, M, X, cores):
        # NOTE: This is no longer necessary.  We've incorporated it into the kernel
        #self.f = f
        # NOTE: We could get more speed from using the fact that
        # g is constant over x, but for
        # comparability issues we'll keep this
        self.g = g
        self.M = M
        self.X = X
        self.N =  X.shape[0]
        self.points = np.random.rand(M, self.N)*(np.array([X[:, 1], ]*M)
                                           - np.array([X[:, 0], ]*M))\
            +  np.array([X[:, 0], ]*M)
        self.V = np.prod(np.abs(X[:,1] - X[:,0]))

        # Define GPU size parameters
        self.cores = cores
        if self.cores % self.N != 0:
            raise ValueError('The number of cores is not divisible by '
                             + 'the dimension N.')
        self.threadsX = int(self.cores/self.N)
        if self.M % self.threadsX != 0:
            raise ValueError('The number of observations is not divisible '
                               + 'by the number of threads per block.')
        self.NUMBER_OF_BLOCKS = int(self.M*self.N/self.cores)
        self.THREADS_X_PER_BLOCK = self.threadsX
        self.THREADS_Y_PER_BLOCK = self.N

        # Define a string object containing the kernel
        kernel_code = """
            #include <stdio.h>
            __global__ void L2_norm(float *a, float *b)
            {
                // 2D Thread ID to calculate L2 norm
                // index to entry in a
                const int ida = threadIdx.y + threadIdx.x*blockDim.y + blockIdx.x*blockDim.y*blockDim.x;
                // Index to entry in b
                const int idb = threadIdx.x + blockIdx.x*blockDim.x;

                int k = blockDim.y / 2; //the number of columns in a block to pairwise sum

                // Square the entries in the sum
                a[ida] *= a[ida];

                // Carry out pairwise summation of squared entries along row
                // to calculate the L2 norm
                while (k != 0)
                {
                    if (threadIdx.y < k)
                        a[ida] += a[ida + k];
                    __syncthreads();
                    k /= 2;
                }

                // Output to b
                if (0 == threadIdx.y)
                {
                    b[idb] = a[ida];
                }
            }

            __global__ void sum1(float *a, float *b, float *c)
            {
                // Object to store pairwise sum
                __shared__ float temp2[512];

                // Index to entry
                const int idb = threadIdx.x + blockIdx.x*blockDim.x;

                // Constants for pairwise sum
                int n = blockDim.x / 2; //the number of rows in a block to pairwise sum

                // Carry out the elementwise multiplication
                temp2[threadIdx.x] = a[idb]*b[idb];

                __syncthreads();

                //now pairwise sum the vector temp2, store in temp2, then output to c
                while (n != 0)
                {
                    if (threadIdx.x < n)
                        temp2[threadIdx.x] += temp2[threadIdx.x+n];
                    __syncthreads();
                    n /= 2;
                }
                //add the total to the sum only once per block
                if (0 == threadIdx.x)
                    atomicAdd(c,temp2[0]);
            }


            __global__ void var1(float *a, float *b, float *c, float *d)
            {
                // Objects to store pairwise sums
                __shared__ float temp2[512];

                // Index to entry in b
                const int idb = threadIdx.x + blockIdx.x*blockDim.x;

                // Constants for pairwise sum
                int n = blockDim.x / 2; //the number of rows in a block to pairwise sum

                // Carry out the elementwise multiplication
                // NOTE: only on left most row entry
                //printf("%f \\n", d[0]);
                if (0 == threadIdx.y)
                    temp2[threadIdx.x] = a[idb]*b[idb];
                    temp2[threadIdx.x] -= d[0];
                    temp2[threadIdx.x] *= temp2[threadIdx.x];

                __syncthreads();

                //now pairwise sum the vector temp2, store in temp2, then output to c
                while (n != 0)
                {
                    if (threadIdx.x < n)
                        temp2[threadIdx.x] += temp2[threadIdx.x+n];
                    __syncthreads();
                    n /= 2;
                }

                //add the total to the sum only once per block
                if (0 == threadIdx.x)
                    atomicAdd(c,temp2[0]);
            }

            """
        # Pass to PyCUDA the size of the matrix
        #kernel_code = kernel_code % {
        #    'MATRIX_SIZE': self.THREADS_X_PER_BLOCK,
        #    }

        # compile the kernel code
        mod = SourceModule(kernel_code)

        # Define a function for the parallel sum
        self.gpu_sum1 = mod.get_function("sum1")
        self.gpu_var1 = mod.get_function("var1")
        self.gpu_L2 = mod.get_function("L2_norm")

    def mean1(self):
        """
        A GPU implementation of monte carlo mean.

        """
        # Calculate the probability of the observations in serial
        # NOTE: Even more speed up available here by parallelization
        self.g_of_points = self.g(self.points, self.X).astype(np.float32)

        # Allocate memory to fill with solution
        sum1 = np.zeros(1).astype(np.float32)

        # Allocate memory on the gpu
        a_gpu = cuda.mem_alloc(self.points.astype(np.float32).nbytes)
        a_temp_gpu = cuda.mem_alloc(self.g_of_points.nbytes)
        b_gpu = cuda.mem_alloc(self.g_of_points.nbytes)
        c_gpu = cuda.mem_alloc(sum1.nbytes)

        # Transfer data to the device
        cuda.memcpy_htod(a_gpu, self.points.astype(np.float32))
        cuda.memcpy_htod(a_temp_gpu, np.zeros(self.M).astype(np.float32))
        cuda.memcpy_htod(b_gpu, self.g_of_points)
        cuda.memcpy_htod(c_gpu, sum1)

        # Carry out the multiplication
        self.gpu_L2(a_gpu, a_temp_gpu, block=(self.THREADS_X_PER_BLOCK,
                                              self.THREADS_Y_PER_BLOCK, 1),
                    grid=(self.NUMBER_OF_BLOCKS, 1))
        self.gpu_sum1(a_temp_gpu, b_gpu, c_gpu,
                      block=(self.cores, 1, 1),
                      grid=(int(self.NUMBER_OF_BLOCKS/self.N), 1))

        # Retrieve the result from the device
        cuda.memcpy_dtoh(sum1, c_gpu)

        # Calculate the mean
        self.mean = np.copy(sum1)*self.V/self.M

        # Clean up memory
        del a_gpu, a_temp_gpu, b_gpu, c_gpu

        return self.mean

    def var1(self):
        """
        A method to calculate the variance by gpu parallel montecarlo.

        """
        # Calculate the probability of the observations in serial
        # NOTE: Even more speed up available here by parallelization
        # NOTE: I saved this information from previous calculation
        #g_of_points = self.g(self.points, self.X)
        # Calculate parameter
        d = np.array((self.mean/self.V), dtype=np.float32)

        # Allocate memory to fill with solution
        var1 = np.zeros(1).astype(np.float32)

        # Allocate memory on the gpu
        a_gpu = cuda.mem_alloc(self.points.astype(np.float32).nbytes)
        a_temp_gpu = cuda.mem_alloc(self.g_of_points.astype(np.float32).nbytes)
        b_gpu = cuda.mem_alloc(self.g_of_points.astype(np.float32).nbytes)
        c_gpu = cuda.mem_alloc(var1.nbytes)
        d_gpu = cuda.mem_alloc(d.nbytes)

        # Transfer data to the device
        cuda.memcpy_htod(a_gpu, self.points.astype(np.float32))
        cuda.memcpy_htod(b_gpu, self.g_of_points.astype(np.float32))
        cuda.memcpy_htod(c_gpu, var1)
        cuda.memcpy_htod(d_gpu, d)

        # Carry out the multiplication
        self.gpu_L2(a_gpu, a_temp_gpu, block=(self.THREADS_X_PER_BLOCK,
                                              self.THREADS_Y_PER_BLOCK, 1),
                    grid=(self.NUMBER_OF_BLOCKS, 1))
        self.gpu_var1(a_temp_gpu, b_gpu, c_gpu, d_gpu,
                      block=(self.cores, 1, 1),
                      grid=(int(self.NUMBER_OF_BLOCKS/self.N), 1))

        # Retrieve the result from the device
        cuda.memcpy_dtoh(var1, c_gpu)

        # Calculate the variance
        self.var = np.copy(var1)*self.V**2/(self.M*(self.M - 1))

        # Clean up memory
        del a_gpu, b_gpu, c_gpu, d_gpu
        return self.var

    def output(self):
        """
        A method to calculate monte carlo integral and variance of estimate.
        """
        return (self.mean1(), self.var1())

In [98]:
### Define parameterization
# Dimension of the random variable
N = 2
cores = 512
# Parameter controlling the number of rows per block
# NOTE: defined this way so each block holds complete rows; no row
# is split across blocks
threadsX = int(cores/N)
# Parameter controlling the number of observations
# There will be M*threadsX of them
M = 2
M *= threadsX

# Define GPU size parameters
#NUMBER_OF_BLOCKS = int(M/threadsX)
#THREADS_X_PER_BLOCK = threadsX
#THREADS_Y_PER_BLOCK = N

# Generate some data
#A = np.ones((M)).astype(np.float32)*3
#B = np.ones((M)).astype(np.float32)*2

workers = 2
X = np.array((0*np.ones(N), 1*np.ones(N))).T


# Instantiate the estimators and compare the monte carlo estimates
test = GPU_parallel_L2_monte_carlo_block(g_vec, M, X, cores)
test_numpy = cpu_parallel_monte_carlo_numpy(f_vec, g_vec, M, X, workers)
print(test_numpy.output())
print(test.output())


(0.65008526445444514, 0.00039038503054202115)
(array([ 0.66984808], dtype=float32), array([ 0.00034686]))

In [99]:
workers = 4
N = 2
cores = 512
threadsX = int(cores/N)
X = np.array((0*np.ones(N), 1*np.ones(N))).T

# NOTE: For optimization reasons, M >= N => #observations/#dimensions >=1
for M in [N, N*10, N*100, N*1000, N*10000]:
    M *= threadsX
    print("\n\nM = %s" % M)
    test_numpy = cpu_parallel_monte_carlo_numpy(f_vec, g_vec, M, X, workers)
    test_gpu = GPU_parallel_L2_monte_carlo_block(g_vec, M, X, cores)
    #print("SERIAL NATIVE")
    #%timeit monte_carlo_expectation_serial(f, g, M, X)
    #print("SERIAL NUMPY")
    #%timeit monte_carlo_expectation_numpy(f_vec, g_vec, M, X)
    print("CPU PARALLEL NUMPY")
    start = time.time()
    test_numpy.output()
    print("Executed in %s seconds \n" % (time.time() - start))
    print("GPU FULLY PARALLEL")
    start = time.time()
    test_gpu.output()
    print("Executed in %s seconds \n" % (time.time() - start))
    print(test_numpy.mean, test_gpu.mean)
    print(test_numpy.var, test_gpu.var)
    print(M,test_gpu.M*test_gpu.N, test_gpu.THREADS_X_PER_BLOCK*test_gpu.THREADS_Y_PER_BLOCK*test_gpu.NUMBER_OF_BLOCKS)
    print(test_gpu.THREADS_X_PER_BLOCK*test_gpu.THREADS_Y_PER_BLOCK, test_gpu.NUMBER_OF_BLOCKS)
    print(test_gpu.NUMBER_OF_BLOCKS, test_gpu.THREADS_X_PER_BLOCK, test_gpu.THREADS_Y_PER_BLOCK)
    print(test_numpy.M, test_gpu.M)
    print(test_numpy.V, test_gpu.V)
    del test_numpy
    del test_gpu



M = 512
CPU PARALLEL NUMPY
Executed in 0.050628662109375 seconds 

GPU FULLY PARALLEL
Executed in 0.0007331371307373047 seconds 

0.672066684165 [ 0.70166504]
0.000357086780394 [ 0.00034693]
512 1024 1024
512 2
2 256 2
512 512
1.0 1.0


M = 5120
CPU PARALLEL NUMPY
Executed in 0.04885745048522949 seconds 

GPU FULLY PARALLEL
Executed in 0.0011625289916992188 seconds 

0.664581233453 [ 0.70603484]
3.49983027665e-05 [  3.74081871e-05]
5120 10240 10240
512 20
20 256 2
5120 5120
1.0 1.0


M = 51200
CPU PARALLEL NUMPY
Executed in 0.051087379455566406 seconds 

GPU FULLY PARALLEL
Executed in 0.0031862258911132812 seconds 

0.668189490004 [ 0.71707338]
3.50057251041e-06 [  3.55969244e-06]
51200 102400 102400
512 200
200 256 2
51200 51200
1.0 1.0


M = 512000
CPU PARALLEL NUMPY
Executed in 0.08083868026733398 seconds 

GPU FULLY PARALLEL
Executed in 0.02205514907836914 seconds 

0.666878076079 [ 0.6936203]
3.47104518107e-07 [  3.53021536e-07]
512000 1024000 1024000
512 2000
2000 256 2
512000 512000
1.0 1.0


M = 5120000
CPU PARALLEL NUMPY
Executed in 0.3504774570465088 seconds 

GPU FULLY PARALLEL
Executed in 0.2295088768005371 seconds 

0.666835827599 [ 0.69409995]
3.47292112444e-08 [  3.53105614e-08]
5120000 10240000 10240000
512 20000
20000 256 2
5120000 5120000
1.0 1.0

Can we speed this up by keeping the L2 norms on the GPU between the mean and variance calculations? Let's try:


In [100]:
class GPU_parallel_L2_monte_carlo_block:
    """
    A class containing GPU based parallelization of monte carlo

    inputs:
        g    :    function; the distribution function of x
        M    :    scalar; the number of points to sample
        X    :    ndarray; bounds in R^n for the support
        cores:    scalar; number of cuda cores on gpu

    """
    # Define initial characteristics
    def __init__(self, g, M, X, cores):
        # NOTE: This is no longer necessary.  We've incorporated it into the kernel
        #self.f = f
        # NOTE: We could get more speed from using the fact that
        # g is constant over x, but for
        # comparability issues we'll keep this
        self.g = g
        self.M = M
        self.X = X
        self.N =  X.shape[0]
        self.points = np.random.rand(M, self.N)*(np.array([X[:, 1], ]*M)
                                           - np.array([X[:, 0], ]*M))\
            +  np.array([X[:, 0], ]*M)
        self.V = np.prod(np.abs(X[:,1] - X[:,0]))

        # Define GPU size parameters
        self.cores = cores
        if self.cores % self.N != 0:
            raise ValueError('The number of cores is not divisible by '
                             + 'the dimension N.')
        self.threadsX = int(self.cores/self.N)
        if self.M % self.threadsX != 0:
            raise ValueError('The number of observations is not divisible '
                               + 'by the number of threads per block.')
        self.NUMBER_OF_BLOCKS = int(self.M*self.N/self.cores)
        self.THREADS_X_PER_BLOCK = self.threadsX
        self.THREADS_Y_PER_BLOCK = self.N

        # Define a string object containing the kernel
        kernel_code = """
            #include <stdio.h>
            __global__ void L2_norm(float *a, float *b)
            {
                // 2D Thread ID to calculate L2 norm
                // index to entry in a
                const int ida = threadIdx.y + threadIdx.x*blockDim.y + blockIdx.x*blockDim.y*blockDim.x;
                // Index to entry in b
                const int idb = threadIdx.x + blockIdx.x*blockDim.x;

                int k = blockDim.y / 2; //the number of columns in a block to pairwise sum

                // Square the entries in the sum
                a[ida] *= a[ida];
                if (511 == ida)
                {
                    printf("%d, %d, %d, %d \\n", blockDim.y, blockDim.x, ida, idb);
                }
                // Carry out pairwise summation of squared entries along row
                // to calculate the L2 norm
                while (k != 0)
                {
                    if (threadIdx.y < k)
                        a[ida] += a[ida + k];
                    __syncthreads();
                    k /= 2;
                }

                // Output to b
                if (0 == threadIdx.y)
                {
                    b[idb] = a[ida];
                }
            }

            __global__ void sum1(float *a, float *b, float *c)
            {
                // Object to store pairwise sum
                __shared__ float temp2[512];

                // Index to entry
                const int idb = threadIdx.x + blockIdx.x*blockDim.x;

                // Constants for pairwise sum
                int n = blockDim.x / 2; //the number of rows in a block to pairwise sum

                // Carry out the elementwise multiplication
                temp2[threadIdx.x] = a[idb]*b[idb];

                __syncthreads();

                //now pairwise sum the vector temp2, store in temp2, then output to c
                while (n != 0)
                {
                    if (threadIdx.x < n)
                        temp2[threadIdx.x] += temp2[threadIdx.x+n];
                    __syncthreads();
                    n /= 2;
                }
                //add the total to the sum only once per block
                if (0 == threadIdx.x)
                    atomicAdd(c,temp2[0]);
            }


            __global__ void var1(float *a, float *b, float *c, float *d)
            {
                // Objects to store pairwise sums
                __shared__ float temp2[512];

                // Index to entry in b
                const int idb = threadIdx.x + blockIdx.x*blockDim.x;

                // Constants for pairwise sum
                int n = blockDim.x / 2; //the number of rows in a block to pairwise sum

                // Carry out the elementwise multiplication
                // NOTE: only on left most row entry
                //printf("%f \\n", d[0]);
                if (0 == threadIdx.y)
                    temp2[threadIdx.x] = a[idb]*b[idb];
                    temp2[threadIdx.x] -= d[0];
                    temp2[threadIdx.x] *= temp2[threadIdx.x];

                __syncthreads();

                //now pairwise sum the vector temp2, store in temp2, then output to c
                while (n != 0)
                {
                    if (threadIdx.x < n)
                        temp2[threadIdx.x] += temp2[threadIdx.x+n];
                    __syncthreads();
                    n /= 2;
                }

                //add the total to the sum only once per block
                if (0 == threadIdx.x)
                    atomicAdd(c,temp2[0]);
            }

            """
        # Pass to PyCUDA the size of the matrix
        #kernel_code = kernel_code % {
        #    'MATRIX_SIZE': self.THREADS_X_PER_BLOCK,
        #    }

        # compile the kernel code
        mod = SourceModule(kernel_code)

        # Define a function for the parallel sum
        self.gpu_sum1 = mod.get_function("sum1")
        self.gpu_var1 = mod.get_function("var1")
        self.gpu_L2 = mod.get_function("L2_norm")

    def estimate(self):
        """
        A GPU implementation of monte carlo mean.

        """
        # Calculate the probability of the observations in serial
        # NOTE: Even more speed up available here by parallelization
        self.g_of_points = self.g(self.points, self.X).astype(np.float32)

        # Allocate memory to fill with solution
        sum1 = np.zeros(1).astype(np.float32)

        # Allocate memory on the gpu
        a_gpu = cuda.mem_alloc(self.points.astype(np.float32).nbytes)
        a_temp_gpu = cuda.mem_alloc(self.g_of_points.astype(np.float32).nbytes)
        b_gpu = cuda.mem_alloc(self.g_of_points.astype(np.float32).nbytes)
        c_gpu = cuda.mem_alloc(sum1.nbytes)

        # Transfer data to the device
        cuda.memcpy_htod(a_gpu, self.points.astype(np.float32))
        cuda.memcpy_htod(a_temp_gpu, np.zeros(self.M).astype(np.float32))
        cuda.memcpy_htod(b_gpu, self.g_of_points)
        cuda.memcpy_htod(c_gpu, sum1)

        # Carry out the multiplication
        self.gpu_L2(a_gpu, a_temp_gpu, block=(self.THREADS_X_PER_BLOCK,
                                              self.THREADS_Y_PER_BLOCK, 1),
                    grid=(self.NUMBER_OF_BLOCKS, 1))
        self.gpu_sum1(a_temp_gpu, b_gpu, c_gpu,
                      block=(self.cores, 1, 1),
                      grid=(int(self.NUMBER_OF_BLOCKS/self.N), 1))

        # Retrieve the result from the device
        cuda.memcpy_dtoh(sum1, c_gpu)

        # Calculate the mean
        self.mean = sum1*self.V/self.M

        # Clean up memory (keep a_temp_gpu, which now holds the norms, for the variance step)
        del a_gpu, c_gpu

        # Calculate the probability of the observations in serial
        # NOTE: Even more speed up available here by parallelization
        # NOTE: I saved this information from previous calculation
        #g_of_points = self.g(self.points, self.X)
        # Calculate parameter
        d = np.array((self.mean/self.V), dtype=np.float32)

        # Allocate memory to fill with solution
        var1 = np.zeros(1).astype(np.float32)

        # Allocate memory on the gpu
        #a_gpu = cuda.mem_alloc(self.points.astype(np.float32).nbytes)
        #a_temp_gpu = cuda.mem_alloc(self.g_of_points.astype(np.float32).nbytes)
        b_gpu = cuda.mem_alloc(self.g_of_points.astype(np.float32).nbytes)
        c_gpu = cuda.mem_alloc(var1.nbytes)
        d_gpu = cuda.mem_alloc(d.nbytes)

        # Transfer data to the device
        #cuda.memcpy_htod(a_gpu, self.points.astype(np.float32))
        cuda.memcpy_htod(b_gpu, self.g_of_points.astype(np.float32))
        cuda.memcpy_htod(c_gpu, var1)
        cuda.memcpy_htod(d_gpu, d)

        # Carry out the multiplication
        #self.gpu_L2(a_gpu, a_temp_gpu, block=(self.THREADS_X_PER_BLOCK,
        #                                      self.THREADS_Y_PER_BLOCK, 1),
        #            grid=(self.NUMBER_OF_BLOCKS, 1))
        self.gpu_var1(a_temp_gpu, b_gpu, c_gpu, d_gpu,
                      block=(self.cores, 1, 1),
                      grid=(int(self.NUMBER_OF_BLOCKS/self.N), 1))

        # Retrieve the result from the device
        cuda.memcpy_dtoh(var1, c_gpu)

        # Calculate the variance
        self.var = var1*self.V**2/(self.M*(self.M - 1))

        # Clean up memory
        del b_gpu, c_gpu, d_gpu
        return self.mean, self.var

    def output(self):
        """
        A method to calculate monte carlo integral and variance of estimate.
        """
        return (self.estimate())

In [101]:
workers = 4
N = 8
cores = 512
threadsX = int(cores/N)
X = np.array((0*np.ones(N), 1*np.ones(N))).T

# NOTE: For optimization reasons, M >= N => #observations/#dimensions >=1
for M in [N, N*10, N*100, N*1000, N*10000]:
    M *= threadsX
    print("\n\nM = %s" % M)
    test_numpy = cpu_parallel_monte_carlo_numpy(f_vec, g_vec, M, X, workers)
    test_gpu = GPU_parallel_L2_monte_carlo_block(g_vec, M, X, cores)
    #print("SERIAL NATIVE")
    #%timeit monte_carlo_expectation_serial(f, g, M, X)
    #print("SERIAL NUMPY")
    #%timeit monte_carlo_expectation_numpy(f_vec, g_vec, M, X)
    print("CPU PARALLEL NUMPY")
    start = time.time()
    test_numpy.output()
    print("Executed in %s seconds \n" % (time.time() - start))
    print("GPU FULLY PARALLEL")
    start = time.time()
    test_gpu.output()
    print("Executed in %s seconds \n" % (time.time() - start))
    print(test_numpy.mean, test_gpu.mean)
    print(test_numpy.var, test_gpu.var)
    del test_numpy
    del test_gpu



M = 512
CPU PARALLEL NUMPY
Executed in 0.045313358306884766 seconds 

GPU FULLY PARALLEL
Executed in 0.0010352134704589844 seconds 

2.65155342787 [ 2.84231591]
0.00143525759458 [ 0.00144499]


M = 5120
CPU PARALLEL NUMPY
Executed in 0.052376508712768555 seconds 

GPU FULLY PARALLEL
Executed in 0.0014107227325439453 seconds 

2.67685436261 [ 2.83520079]
0.000139364237776 [ 0.00014605]


M = 51200
CPU PARALLEL NUMPY
Executed in 0.05659008026123047 seconds 

GPU FULLY PARALLEL
Executed in 0.005671262741088867 seconds 

2.67080567941 [ 2.87825203]
1.39140496787e-05 [  1.47154911e-05]


M = 512000
CPU PARALLEL NUMPY
Executed in 0.10376501083374023 seconds 

GPU FULLY PARALLEL
Executed in 0.040251731872558594 seconds 

2.66728599415 [ 2.85387769]
1.38747953048e-06 [  1.45752321e-06]


M = 5120000
CPU PARALLEL NUMPY
Executed in 0.6914644241333008 seconds 

GPU FULLY PARALLEL
Executed in 0.3931279182434082 seconds 

2.66672165535 [ 2.8569416]
1.38968790511e-07 [  1.45831385e-07]

Indeed, recomputing the L2 norm for the variance was slowing us down. Relative to the parallel NumPy version, we are now an order of magnitude faster for lower values of M and roughly twice as fast at M = 5120000.

An issue here is that the GPU solution is diverging from the CPU value rather than converging to it. There must be a mistake somewhere in the code, but alas, I'm lost as to where it is.
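
One way to hunt for the mistake is to take the monte carlo noise out of the comparison: run the estimator once, keep the sampled points, and rebuild $\hat{\phi}$ in NumPy from those same points. A minimal sketch (M_small is just an illustrative name; f here is the squared L2 norm that the kernels implement, and g_vec, X, threadsX, and cores are as defined above):

In [ ]:
# Sanity check: compare the GPU estimate with a NumPy reference computed
# from the *same* sample, so any gap is a kernel bug rather than sampling noise
M_small = N*threadsX                       # smallest admissible sample for these settings
check = GPU_parallel_L2_monte_carlo_block(g_vec, M_small, X, cores)
gpu_mean, gpu_var = check.output()

# Reference: phi_hat = V/M * sum(||x_i||^2 * g(x_i))
norms = np.sum(check.points**2, axis=1).astype(np.float32)
ref_mean = check.V*np.mean(norms*check.g_of_points)
print(gpu_mean, ref_mean)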

A final question to ask is whether the GPU solution beats NumPy. Let's check.


In [103]:
N = 8
cores = 512
threadsX = int(cores/N)
X = np.array((0*np.ones(N), 1*np.ones(N))).T

# NOTE: For optimization reasons, M >= N => #observations/#dimensions >=1
for M in [N, N*10, N*100, N*1000, N*10000]:
    M *= threadsX
    print("\n\nM = %s" % M)
    test_gpu = GPU_parallel_L2_monte_carlo_block(g_vec, M, X, cores)
    #print("SERIAL NATIVE")
    #%timeit monte_carlo_expectation_serial(f, g, M, X)
    #print("SERIAL NUMPY")
    #%timeit monte_carlo_expectation_numpy(f_vec, g_vec, M, X)
    print("PURE NUMPY")
    start = time.time()
    test_numpy = monte_carlo_expectation_numpy(f_vec, g_vec, M, X)
    print("Executed in %s seconds \n" % (time.time() - start))
    print("GPU FULLY PARALLEL")
    start = time.time()
    test_gpu.output()
    print("Executed in %s seconds \n" % (time.time() - start))
    print(test_numpy[0], test_gpu.mean)
    print(test_numpy[1], test_gpu.var)
    del test_numpy
    del test_gpu



M = 512
PURE NUMPY
Executed in 0.0007174015045166016 seconds 

GPU FULLY PARALLEL
Executed in 0.0007171630859375 seconds 

2.72271814003 [ 2.93542123]
0.00129062974425 [ 0.00135468]


M = 5120
PURE NUMPY
Executed in 0.00506591796875 seconds 

GPU FULLY PARALLEL
Executed in 0.0011417865753173828 seconds 

2.66298405365 [ 2.8684392]
0.000140959950759 [ 0.00014492]


M = 51200
PURE NUMPY
Executed in 0.05710792541503906 seconds 

GPU FULLY PARALLEL
Executed in 0.005090236663818359 seconds 

2.66582038948 [ 2.87064481]
1.38518394922e-05 [  1.46132507e-05]


M = 512000
PURE NUMPY
Executed in 0.5198211669921875 seconds 

GPU FULLY PARALLEL
Executed in 0.04703259468078613 seconds 

2.66729774271 [ 2.85724609]
1.38971480731e-06 [  1.45909749e-06]


M = 5120000
PURE NUMPY
Executed in 5.03239107131958 seconds 

GPU FULLY PARALLEL
Executed in 0.39262962341308594 seconds 

2.66706278055 [ 2.8562916]
1.38787050164e-07 [  1.45985689e-07]

Yes! For high values of N (i.e., well above 2) we do indeed get a speed-up.

Conclusion

This session covered applications of native Python, NumPy, Numba, multiprocessing, and GPU multithreading for increasing speed. You should now be familiar with how each of these works and with the complexity of each approach. I hope you enjoyed the class and that in the future you will consider Python as part of your own toolbox for numerical problem solving.