Bo Zhang (NAOC, mailto:bozhang@nao.cas.cn) will have a few lessons on python.

  • These are very useful knowledge, skills and code styles when you use python to process astronomical data.
  • All materials can be found on my github page.
  • jupyter notebook (formerly named ipython notebook) is recommeded to use

These lectures are organized as below:

  1. install python
  2. basic syntax
  3. numerical computing
  4. scientific computing
  5. plotting
  6. astronomical data processing
  7. high performance computing
  8. version control


  1. test your code BEFORE you do ANY optimization!
  2. find the bottleneck of your code (ps: learn to use profiler to find the bottleneck)
  3. use tricks, experience to optimize code
  4. use as many computing resources as possible

    1. parallel computing in multi-CPU/core computer (multiprocessing, ...)

    2. run code on multi-node computer cluster (PBS, ...)

some simple principles for optimization

1. memory vs. speed
2. vectorization
3. type check
4. parallel    

recommended packages

1. numexpr
2. Cython
- parallel
    1. multiprocessing (standard library)
    2. ipcluster/ipyparallel (support PBS)

further reading

  1. Parallel Programming with Python
  2. Python High performance Programming
  3. Learning Cython Programming

Parallel computing

  • threads: shared memory, involves locks
  • processes: isolated memory for each process, inter-process communication is less efficient
    • the easiest way to do parallel computing: embarassingly parallel (no inter-process communication), which is the case we met most often

Monte Carlo approximation for $\pi$

In [1]:
%pylab inline

Populating the interactive namespace from numpy and matplotlib

In [2]:
# with plt.xkcd():
fig = plt.figure(figsize=(10, 10))
ax = plt.axes(frameon=False)
circle = plt.Circle((0.,0.), 1., color='w', fill=False)
rect = plt.Rectangle((-1,-1), 2, 2, color='gray')
plt.arrow(-2., 0., 3.3, 0., head_width=0.1, head_length=0.2)
plt.arrow(0., -2., 0., 3.3, head_width=0.1, head_length=0.2)
randx = np.random.uniform(-1, 1, (100,))
randy = np.random.uniform(-1, 1, (100,))
plot(randx, randy, 'kx')

plt.text(-1.3, -0.1, '(-1, 0)', fontsize=20)
plt.text( 1.1, -0.1, '(+1, 0)', fontsize=20)
plt.text( 0.1,  1.1, '(0, +1)', fontsize=20)
plt.text( 0.1, -1.1, '(0, -1)', fontsize=20);

import random
samples = 1E5
hits = 0

for i in range(int(samples)):
    x = random.uniform(-1.0, 1.0)
    y = random.uniform(-1.0, 1.0)
    if x**2 + y**2 <= 1.0:
        hits += 1
pi = 4.0*hits/samples
print pi

CPU times: user 92 ms, sys: 40 ms, total: 132 ms
Wall time: 100 ms

DO NOTICE , this is extremely SLOW!

import multiprocessing

def sample():
    x = random.uniform(-1.0, 1.0)
    y = random.uniform(-1.0, 1.0)
    if x**2 + y**2 <= 1.0:
        return 1
        return 0

pool = multiprocessing.Pool()
results_async = [pool.apply_async(sample) for i in range(int(samples))]
hits = sum(r.get() for r in results_async)

pi = 4.0*hits/samples
print pi

CPU times: user 5.05 s, sys: 1.9 s, total: 6.95 s
Wall time: 4.92 s

import multiprocessing

def sample_multiple(samples_partial):
    return sum(sample() for i in range(samples_partial))

ntasks = 10
chunk_size = int(samples/ntasks)

pool = multiprocessing.Pool()
results_async = [pool.apply_async(sample_multiple, [chunk_size]) for i in range(ntasks)]
hits = sum(r.get() for r in results_async)

pi = 4.0*hits/samples
print pi

CPU times: user 40 ms, sys: 88 ms, total: 128 ms
Wall time: 153 ms

IPython parallel

IPython's power is not limited to its advanced shell. Its parallel package includes a framework to setup and run calculations on single and multi-core machines, as well as on multiple nodes connected to a network. IPython is great because it gives an interactive twist to parallel computing and provides a common interface to different communication protocols.

  • how to start engines?
    • type $ ipcluster start -n 12 in terminal to start a 12-engine
  • how to use engines?
    • direct view (specify tasks for engines!)
    • task-base view (load balanced)

Direct interface

# to creat an instance of Client, import Client from IPython.parallel
from IPython.parallel import Client
# from ipyparallel import Client

/usr/local/lib/python2.7/dist-packages/IPython/parallel.py:13: ShimWarning: The `IPython.parallel` package has been deprecated. You should import from ipyparallel instead.
  "You should import from ipyparallel instead.", ShimWarning)

#ipcluster start -n 12

rc = Client() # creat an Client instance

In [9]:
rc.ids # show IDs of each engine

[0, 1, 2, 3, 4, 5, 6, 7]

In [10]:
dview = rc[0] # select the first engine

<DirectView 0>

In [11]:
dview = rc[::2] # select every other engine

<DirectView [0, 2, 4, 6]>

In [12]:
dview = rc[:] # select all engines

<DirectView [0, 1, 2, 3,...]>

In [13]:
dview.execute('a = 1')

<AsyncResult: execute>

In [14]:
dview.pull('a').get() # equivalent to dview['a']

[1, 1, 1, 1, 1, 1, 1, 1]

In [15]:
dview.push({'a':2}) # equivbalent to dview['a'] = 2

<AsyncResult: _push>

[2, 2, 2, 2, 2, 2, 2, 2]

In [17]:
res = dview.execute('a = T_T') # got error

In [18]:

NameErrorTraceback (most recent call last)<ipython-input-1-5dd6c0744106> in <module>()
----> 1 a = T_T
NameError: name 'T_T' is not defined

NameErrorTraceback (most recent call last)<ipython-input-1-5dd6c0744106> in <module>()
----> 1 a = T_T
NameError: name 'T_T' is not defined

NameErrorTraceback (most recent call last)<ipython-input-1-5dd6c0744106> in <module>()
----> 1 a = T_T
NameError: name 'T_T' is not defined

NameErrorTraceback (most recent call last)<ipython-input-1-5dd6c0744106> in <module>()
----> 1 a = T_T
NameError: name 'T_T' is not defined

... 4 more exceptions ...

res = dview.execute('b = a+1')

In [20]:

[3, 3, 3, 3, 3, 3, 3, 3]

In [21]:
res = dview.execute('b = b+1')

In [22]:

[4, 4, 4, 4, 4, 4, 4, 4]

Engines should be treated as independent IPython sessions, and imports and custom-defined functions must be synchronized over the network. To import some libraries, both locally and in the engines, you can use the DirectView.sync_imports context manager:

with dview.sync_imports():
    import numpy
    # the syntax import _ as _ is not supported

importing numpy on engine(s)


In [24]:
a = range(100)

In [25]:
def square(x):
    return x*x

In [26]:
results_async = dview.map_async(square, a)
print  results_async.get()

[0, 1, 4, 9, 16, 25, 36, 49, 64, 81, 100, 121, 144, 169, 196, 225, 256, 289, 324, 361, 400, 441, 484, 529, 576, 625, 676, 729, 784, 841, 900, 961, 1024, 1089, 1156, 1225, 1296, 1369, 1444, 1521, 1600, 1681, 1764, 1849, 1936, 2025, 2116, 2209, 2304, 2401, 2500, 2601, 2704, 2809, 2916, 3025, 3136, 3249, 3364, 3481, 3600, 3721, 3844, 3969, 4096, 4225, 4356, 4489, 4624, 4761, 4900, 5041, 5184, 5329, 5476, 5625, 5776, 5929, 6084, 6241, 6400, 6561, 6724, 6889, 7056, 7225, 7396, 7569, 7744, 7921, 8100, 8281, 8464, 8649, 8836, 9025, 9216, 9409, 9604, 9801]

parallel decorator

def square(x):
    return x * x

In [28]:
print square.map(range(100)).get()

[0, 1, 4, 9, 16, 25, 36, 49, 64, 81, 100, 121, 144, 169, 196, 225, 256, 289, 324, 361, 400, 441, 484, 529, 576, 625, 676, 729, 784, 841, 900, 961, 1024, 1089, 1156, 1225, 1296, 1369, 1444, 1521, 1600, 1681, 1764, 1849, 1936, 2025, 2116, 2209, 2304, 2401, 2500, 2601, 2704, 2809, 2916, 3025, 3136, 3249, 3364, 3481, 3600, 3721, 3844, 3969, 4096, 4225, 4356, 4489, 4624, 4761, 4900, 5041, 5184, 5329, 5476, 5625, 5776, 5929, 6084, 6241, 6400, 6561, 6724, 6889, 7056, 7225, 7396, 7569, 7744, 7921, 8100, 8281, 8464, 8649, 8836, 9025, 9216, 9409, 9604, 9801]


  • executed on every engine

def square(x):
    return x*x

In [30]:
result_async = dview.apply(square, 2)

In [31]:

[4, 4, 4, 4, 4, 4, 4, 4]

scatter & gather

dview.scatter('a', [0, 1, 2, 3])

<AsyncResult: scatter>

In [33]:
print dview['a']

[[0], [1], [2], [3], [], [], [], []]

In [34]:
dview.scatter('a', np.arange(16))

<AsyncResult: scatter>

In [35]:
print dview['a']

[array([0, 1]), array([2, 3]), array([4, 5]), array([6, 7]), array([8, 9]), array([10, 11]), array([12, 13]), array([14, 15])]

In [36]:
dview.execute('a = a**2')

<AsyncResult: execute>

In [37]:
print dview['a']

[array([0, 1]), array([4, 9]), array([16, 25]), array([36, 49]), array([64, 81]), array([100, 121]), array([144, 169]), array([196, 225])]

In [38]:

array([  0,   1,   4,   9,  16,  25,  36,  49,  64,  81, 100, 121, 144,
       169, 196, 225])

task-based interface (load balanced)

from IPython.parallel import Client

rc = Client()

In [41]:
tview = rc.load_balanced_view()

In [42]:
def square(x):
    return x * x

In [43]:
dview.apply(square, 2) # executes in every engine!

<AsyncResult: square>

In [44]:
tview.apply(square, 2).get() # executed in ONLY 1 engine


tview.apply(square, np.arange(10)).get()

array([ 0,  1,  4,  9, 16, 25, 36, 49, 64, 81])

Run the Monte-Carlo for$\pi$ on IPython cluster

1. using @dview.parallel() decorator

def sample():    
    x = numpy.random.uniform(-1.0, 1.0)
    y = numpy.random.uniform(-1.0, 1.0)
    if x**2 + y**2 <= 1.0:
        return 1
        return 0

def sample_multiple(samples_partial):
    return sum(sample() for i in range(samples_partial))

samples = 1E8
ntasks = 10
chunk_size = int(samples/ntasks)

results = sample_multiple.map([chunk_size for i in range(ntasks)]) # ntask determines the # of processes-->10

CPU times: user 8 ms, sys: 4 ms, total: 12 ms
Wall time: 9.3 ms

print 'pi: ', sum(results.get())/np.double(samples)*4.

[Engine Exception]
NameErrorTraceback (most recent call last)<string> in <module>()
<ipython-input-46-3c136ff51af4> in sample_multiple(samples_partial)
<ipython-input-46-3c136ff51af4> in <genexpr>((i,))
NameError: global name 'sample' is not defined

[Engine Exception]
NameErrorTraceback (most recent call last)<string> in <module>()
<ipython-input-46-3c136ff51af4> in sample_multiple(samples_partial)
<ipython-input-46-3c136ff51af4> in <genexpr>((i,))
NameError: global name 'sample' is not defined

[Engine Exception]
NameErrorTraceback (most recent call last)<string> in <module>()
<ipython-input-46-3c136ff51af4> in sample_multiple(samples_partial)
<ipython-input-46-3c136ff51af4> in <genexpr>((i,))
NameError: global name 'sample' is not defined

[Engine Exception]
NameErrorTraceback (most recent call last)<string> in <module>()
<ipython-input-46-3c136ff51af4> in sample_multiple(samples_partial)
<ipython-input-46-3c136ff51af4> in <genexpr>((i,))
NameError: global name 'sample' is not defined

... 4 more exceptions ...

2. using direct/task-based interface

def sample():    
    x = numpy.random.uniform(-1.0, 1.0)
    y = numpy.random.uniform(-1.0, 1.0)
    if x**2 + y**2 <= 1.0:
        return 1
        return 0

def sample_multiple(samples_partial):
    return sum(sample() for i in range(samples_partial))

dview.push({'sample':sample, 'sample_multiple':sample_multiple})

<AsyncResult: _push>

executed in every engin!

samples = int(1E8)
ntasks = len(dview)
chunk_size = int(samples/ntasks)

dview.block = True
results = dview.map(sample_multiple, [chunk_size for i in range(ntasks)])
# task should be evenly splited on every engine

CPU times: user 32 ms, sys: 4 ms, total: 36 ms
Wall time: 20.2 s

print 'pi: ', sum(results)/np.double(samples)*4.

pi:  3.14196252

2.2 tview.map (load balanced)

samples = int(1E8)
ntasks = len(tview)
chunk_size = int(samples/ntasks)

tview.block = True
results = tview.map(sample_multiple, [chunk_size for i in range(ntasks)])
# task should be evenly splited on every engine

CPU times: user 28 ms, sys: 4 ms, total: 32 ms
Wall time: 18.6 s

print 'pi: ', sum(results)/np.double(samples)*4.

pi:  3.1416446

2.3 dview.map_async

samples = 1E8
ntasks = 10
chunk_size = int(samples/ntasks)

dview.block = False
results_async = dview.map_async(sample_multiple, 
                                [chunk_size for i in range(ntasks)])

CPU times: user 12 ms, sys: 0 ns, total: 12 ms
Wall time: 9.09 ms

print results_async.ready()


print 'pi: ', sum(results_async.get())/np.double(samples)*4.

pi:  3.14170044

print results_async.ready()


2.4 tview.map_async

samples = 1E8
ntasks = 10
chunk_size = int(samples/ntasks)

tview.block = False
results_async = tview.map_async(sample_multiple, 
                                [chunk_size for i in range(ntasks)]) #determines the tasks for each engine

# print 'pi: ', sum(results_async.get())/np.double(samples)*4.

CPU times: user 12 ms, sys: 0 ns, total: 12 ms
Wall time: 10.6 ms

print results_async.ready()


print 'pi: ', sum(results_async.get())/np.double(samples)*4.

pi:  3.141516

print results_async.ready()


2.5 dview.apply

samples = 1E8
ntasks = len(dview)
chunk_size = int(samples/ntasks)

dview.block = True
results = dview.apply(sample_multiple, chunk_size)

CPU times: user 24 ms, sys: 0 ns, total: 24 ms
Wall time: 21 s

print 'pi: ', sum(results)/np.double(samples)*4.

pi:  3.14160196

2.6 tview.apply (single engine execution!)

samples = 1E8
ntasks = len(tview)
chunk_size = int(samples/ntasks)

dview.block = True
results = tview.apply(sample_multiple, chunk_size)

CPU times: user 0 ns, sys: 0 ns, total: 0 ns
Wall time: 568 µs

print 'pi: ', sum(results.get())/np.double(samples)*4.

pi:  0.39274236

print 'pi: ', sum(results.get())/np.double(samples)*4.*ntasks

pi:  3.14193888

In [68]:
samples = 1E8
ntasks = 50
chunk_size = int(samples/ntasks)

dview.scatter('chunk_size', [chunk_size for i in range(ntasks)])
dview.scatter('sum_sample', [0 for i in range(ntasks)])

In [70]:
for cz in dview['chunk_size']:
    print cz

[2000000, 2000000, 2000000, 2000000, 2000000, 2000000, 2000000]
[2000000, 2000000, 2000000, 2000000, 2000000, 2000000, 2000000]
[2000000, 2000000, 2000000, 2000000, 2000000, 2000000]
[2000000, 2000000, 2000000, 2000000, 2000000, 2000000]
[2000000, 2000000, 2000000, 2000000, 2000000, 2000000]
[2000000, 2000000, 2000000, 2000000, 2000000, 2000000]
[2000000, 2000000, 2000000, 2000000, 2000000, 2000000]
[2000000, 2000000, 2000000, 2000000, 2000000, 2000000]

[<function sample_multiple>,
 <function sample_multiple>,
 <function sample_multiple>,
 <function sample_multiple>,
 <function sample_multiple>,
 <function sample_multiple>,
 <function sample_multiple>,
 <function sample_multiple>]

dview.execute('sum_sample = [sample_multiple(chunk_size_) for chunk_size_ in chunk_size]')

<AsyncResult: execute:finished>

In [73]:

[[1570762, 1571220, 1570577, 1569768, 1571206, 1570366, 1570209],
 [1570821, 1571711, 1569952, 1571582, 1571127, 1571310, 1572184],
 [1570133, 1570223, 1571538, 1571680, 1572032, 1570745],
 [1570163, 1571448, 1571191, 1570501, 1570848, 1571122],
 [1570817, 1570981, 1571109, 1570527, 1571422, 1571518],
 [1569734, 1571547, 1572102, 1570127, 1570073, 1570573],
 [1571102, 1570951, 1569925, 1571305, 1571198, 1569940],
 [1569893, 1569968, 1570509, 1570992, 1571428, 1569348]]

can be used to block the async results

samples = 1E8
ntasks = 10
chunk_size = int(samples/ntasks)

dview.block = False
results_async = dview.map_async(sample_multiple, 
                                [chunk_size for i in range(ntasks)])

CPU times: user 12 ms, sys: 0 ns, total: 12 ms
Wall time: 11 ms

print 'pi: ', sum(results_async.get())/np.double(samples)*4.

pi:  3.14160688

open qtconsole to engines?

dview = rc[::4]

<AsyncResult: execute>

Discussion: how to further improve the speed?

  • 1.
  • 2.

