Welcome to my lessons


Bo Zhang (NAOC, mailto:bozhang@nao.cas.cn) will have a few lessons on python.

  • These are very useful knowledge, skills and code styles when you use python to process astronomical data.
  • All materials can be found on my github page.
  • jupyter notebook (formerly named ipython notebook) is recommeded to use

These lectures are organized as below:

  1. install python
  2. basic syntax
  3. numerical computing
  4. scientific computing
  5. plotting
  6. astronomical data processing
  7. high performance computing
  8. version control

flowchart

  1. test your code BEFORE you do ANY optimization!
  2. find the bottleneck of your code (ps: learn to use profiler to find the bottleneck)
  3. use tricks, experience to optimize code
  4. use as many computing resources as possible

    1. parallel computing in multi-CPU/core computer (multiprocessing, ...)

    2. run code on multi-node computer cluster (PBS, ...)

some simple principles for optimization

1. memory vs. speed
2. vectorization
3. type check
4. parallel    

recommended packages

1. numexpr
2. Cython
- parallel
    1. multiprocessing (standard library)
    2. ipcluster/ipyparallel (support PBS)

further reading

  1. Parallel Programming with Python
  2. Python High performance Programming
  3. Learning Cython Programming

In [ ]:

Parallel computing

  • threads: shared memory, involves locks
  • processes: isolated memory for each process, inter-process communication is less efficient
    • the easiest way to do parallel computing: embarassingly parallel (no inter-process communication), which is the case we met most often

Monte Carlo approximation for $\pi$


In [1]:
%pylab inline


Populating the interactive namespace from numpy and matplotlib

In [2]:
# with plt.xkcd():
fig = plt.figure(figsize=(10, 10))
ax = plt.axes(frameon=False)
plt.xlim(-1.5,1.5)
plt.ylim(-1.5,1.5)
circle = plt.Circle((0.,0.), 1., color='w', fill=False)
rect = plt.Rectangle((-1,-1), 2, 2, color='gray')
plt.gca().add_artist(rect)
plt.gca().add_artist(circle)
plt.arrow(-2., 0., 3.3, 0., head_width=0.1, head_length=0.2)
plt.arrow(0., -2., 0., 3.3, head_width=0.1, head_length=0.2)
randx = np.random.uniform(-1, 1, (100,))
randy = np.random.uniform(-1, 1, (100,))
plot(randx, randy, 'kx')
plt.gca().axis('off')

plt.text(-1.3, -0.1, '(-1, 0)', fontsize=20)
plt.text( 1.1, -0.1, '(+1, 0)', fontsize=20)
plt.text( 0.1,  1.1, '(0, +1)', fontsize=20)
plt.text( 0.1, -1.1, '(0, -1)', fontsize=20);



In [3]:
%%time

import random
samples = 1E5
hits = 0

for i in range(int(samples)):
    x = random.uniform(-1.0, 1.0)
    y = random.uniform(-1.0, 1.0)
    
    if x**2 + y**2 <= 1.0:
        hits += 1
    
pi = 4.0*hits/samples
print pi


3.14108
CPU times: user 92 ms, sys: 40 ms, total: 132 ms
Wall time: 100 ms

DO NOTICE , this is extremely SLOW!


In [4]:
%%time

import multiprocessing

def sample():
    x = random.uniform(-1.0, 1.0)
    y = random.uniform(-1.0, 1.0)
    
    if x**2 + y**2 <= 1.0:
        return 1
    else:
        return 0

pool = multiprocessing.Pool()
results_async = [pool.apply_async(sample) for i in range(int(samples))]
hits = sum(r.get() for r in results_async)
pool.close()

pi = 4.0*hits/samples
print pi


3.14228
CPU times: user 5.05 s, sys: 1.9 s, total: 6.95 s
Wall time: 4.92 s

In [5]:
%%time

import multiprocessing

def sample_multiple(samples_partial):
    return sum(sample() for i in range(samples_partial))

ntasks = 10
chunk_size = int(samples/ntasks)

pool = multiprocessing.Pool()
results_async = [pool.apply_async(sample_multiple, [chunk_size]) for i in range(ntasks)]
hits = sum(r.get() for r in results_async)
pool.close()

pi = 4.0*hits/samples
print pi


3.13904
CPU times: user 40 ms, sys: 88 ms, total: 128 ms
Wall time: 153 ms

In [ ]:

IPython parallel

IPython's power is not limited to its advanced shell. Its parallel package includes a framework to setup and run calculations on single and multi-core machines, as well as on multiple nodes connected to a network. IPython is great because it gives an interactive twist to parallel computing and provides a common interface to different communication protocols.

  • how to start engines?
    • type $ ipcluster start -n 12 in terminal to start a 12-engine
  • how to use engines?
    • direct view (specify tasks for engines!)
    • task-base view (load balanced)

Direct interface


In [6]:
# to creat an instance of Client, import Client from IPython.parallel
from IPython.parallel import Client
# from ipyparallel import Client


/usr/local/lib/python2.7/dist-packages/IPython/parallel.py:13: ShimWarning: The `IPython.parallel` package has been deprecated. You should import from ipyparallel instead.
  "You should import from ipyparallel instead.", ShimWarning)

In [7]:
%%bash 
#ipcluster start -n 12

In [8]:
rc = Client() # creat an Client instance

In [9]:
rc.ids # show IDs of each engine


Out[9]:
[0, 1, 2, 3, 4, 5, 6, 7]

In [10]:
dview = rc[0] # select the first engine
dview


Out[10]:
<DirectView 0>

In [11]:
dview = rc[::2] # select every other engine
dview


Out[11]:
<DirectView [0, 2, 4, 6]>

In [12]:
dview = rc[:] # select all engines
dview


Out[12]:
<DirectView [0, 1, 2, 3,...]>

In [13]:
dview.execute('a = 1')


Out[13]:
<AsyncResult: execute>

In [14]:
dview.pull('a').get() # equivalent to dview['a']


Out[14]:
[1, 1, 1, 1, 1, 1, 1, 1]

In [15]:
dview.push({'a':2}) # equivbalent to dview['a'] = 2


Out[15]:
<AsyncResult: _push>

In [16]:
dview['a']


Out[16]:
[2, 2, 2, 2, 2, 2, 2, 2]

In [17]:
res = dview.execute('a = T_T') # got error

In [18]:
res.get()


[0:execute]: 
NameErrorTraceback (most recent call last)<ipython-input-1-5dd6c0744106> in <module>()
----> 1 a = T_T
NameError: name 'T_T' is not defined

[1:execute]: 
NameErrorTraceback (most recent call last)<ipython-input-1-5dd6c0744106> in <module>()
----> 1 a = T_T
NameError: name 'T_T' is not defined

[2:execute]: 
NameErrorTraceback (most recent call last)<ipython-input-1-5dd6c0744106> in <module>()
----> 1 a = T_T
NameError: name 'T_T' is not defined

[3:execute]: 
NameErrorTraceback (most recent call last)<ipython-input-1-5dd6c0744106> in <module>()
----> 1 a = T_T
NameError: name 'T_T' is not defined

... 4 more exceptions ...

In [19]:
res = dview.execute('b = a+1')

In [20]:
dview['b']


Out[20]:
[3, 3, 3, 3, 3, 3, 3, 3]

In [21]:
res = dview.execute('b = b+1')

In [22]:
dview['b']


Out[22]:
[4, 4, 4, 4, 4, 4, 4, 4]

Engines should be treated as independent IPython sessions, and imports and custom-defined functions must be synchronized over the network. To import some libraries, both locally and in the engines, you can use the DirectView.sync_imports context manager:


In [23]:
with dview.sync_imports():
    import numpy
    # the syntax import _ as _ is not supported


importing numpy on engine(s)

DirectView.map_async


In [24]:
a = range(100)

In [25]:
def square(x):
    return x*x

In [26]:
results_async = dview.map_async(square, a)
print  results_async.get()


[0, 1, 4, 9, 16, 25, 36, 49, 64, 81, 100, 121, 144, 169, 196, 225, 256, 289, 324, 361, 400, 441, 484, 529, 576, 625, 676, 729, 784, 841, 900, 961, 1024, 1089, 1156, 1225, 1296, 1369, 1444, 1521, 1600, 1681, 1764, 1849, 1936, 2025, 2116, 2209, 2304, 2401, 2500, 2601, 2704, 2809, 2916, 3025, 3136, 3249, 3364, 3481, 3600, 3721, 3844, 3969, 4096, 4225, 4356, 4489, 4624, 4761, 4900, 5041, 5184, 5329, 5476, 5625, 5776, 5929, 6084, 6241, 6400, 6561, 6724, 6889, 7056, 7225, 7396, 7569, 7744, 7921, 8100, 8281, 8464, 8649, 8836, 9025, 9216, 9409, 9604, 9801]

parallel decorator


In [27]:
@dview.parallel(block=False)
def square(x):
    return x * x

In [28]:
print square.map(range(100)).get()


[0, 1, 4, 9, 16, 25, 36, 49, 64, 81, 100, 121, 144, 169, 196, 225, 256, 289, 324, 361, 400, 441, 484, 529, 576, 625, 676, 729, 784, 841, 900, 961, 1024, 1089, 1156, 1225, 1296, 1369, 1444, 1521, 1600, 1681, 1764, 1849, 1936, 2025, 2116, 2209, 2304, 2401, 2500, 2601, 2704, 2809, 2916, 3025, 3136, 3249, 3364, 3481, 3600, 3721, 3844, 3969, 4096, 4225, 4356, 4489, 4624, 4761, 4900, 5041, 5184, 5329, 5476, 5625, 5776, 5929, 6084, 6241, 6400, 6561, 6724, 6889, 7056, 7225, 7396, 7569, 7744, 7921, 8100, 8281, 8464, 8649, 8836, 9025, 9216, 9409, 9604, 9801]

DirectView.apply

  • executed on every engine

In [29]:
def square(x):
    return x*x

In [30]:
result_async = dview.apply(square, 2)

In [31]:
result_async.get()


Out[31]:
[4, 4, 4, 4, 4, 4, 4, 4]

scatter & gather


In [32]:
dview.scatter('a', [0, 1, 2, 3])


Out[32]:
<AsyncResult: scatter>

In [33]:
print dview['a']


[[0], [1], [2], [3], [], [], [], []]

In [34]:
dview.scatter('a', np.arange(16))


Out[34]:
<AsyncResult: scatter>

In [35]:
print dview['a']


[array([0, 1]), array([2, 3]), array([4, 5]), array([6, 7]), array([8, 9]), array([10, 11]), array([12, 13]), array([14, 15])]

In [36]:
dview.execute('a = a**2')


Out[36]:
<AsyncResult: execute>

In [37]:
print dview['a']


[array([0, 1]), array([4, 9]), array([16, 25]), array([36, 49]), array([64, 81]), array([100, 121]), array([144, 169]), array([196, 225])]

In [38]:
dview.gather('a').get()


Out[38]:
array([  0,   1,   4,   9,  16,  25,  36,  49,  64,  81, 100, 121, 144,
       169, 196, 225])

task-based interface (load balanced)


In [39]:
from IPython.parallel import Client

In [40]:
rc = Client()

In [41]:
tview = rc.load_balanced_view()

In [42]:
def square(x):
    return x * x

In [43]:
dview.apply(square, 2) # executes in every engine!


Out[43]:
<AsyncResult: square>

In [44]:
tview.apply(square, 2).get() # executed in ONLY 1 engine


Out[44]:
4

In [45]:
tview.apply(square, np.arange(10)).get()


Out[45]:
array([ 0,  1,  4,  9, 16, 25, 36, 49, 64, 81])

In [ ]:

Run the Monte-Carlo for$\pi$ on IPython cluster

1. using @dview.parallel() decorator


In [46]:
def sample():    
    x = numpy.random.uniform(-1.0, 1.0)
    y = numpy.random.uniform(-1.0, 1.0)
    
    if x**2 + y**2 <= 1.0:
        return 1
    else:
        return 0

@dview.parallel()
def sample_multiple(samples_partial):
    return sum(sample() for i in range(samples_partial))

In [47]:
%%time

samples = 1E8
ntasks = 10
chunk_size = int(samples/ntasks)

results = sample_multiple.map([chunk_size for i in range(ntasks)]) # ntask determines the # of processes-->10


CPU times: user 8 ms, sys: 4 ms, total: 12 ms
Wall time: 9.3 ms

In [48]:
print 'pi: ', sum(results.get())/np.double(samples)*4.


pi: 
[Engine Exception]
NameErrorTraceback (most recent call last)<string> in <module>()
<ipython-input-46-3c136ff51af4> in sample_multiple(samples_partial)
<ipython-input-46-3c136ff51af4> in <genexpr>((i,))
NameError: global name 'sample' is not defined

[Engine Exception]
NameErrorTraceback (most recent call last)<string> in <module>()
<ipython-input-46-3c136ff51af4> in sample_multiple(samples_partial)
<ipython-input-46-3c136ff51af4> in <genexpr>((i,))
NameError: global name 'sample' is not defined

[Engine Exception]
NameErrorTraceback (most recent call last)<string> in <module>()
<ipython-input-46-3c136ff51af4> in sample_multiple(samples_partial)
<ipython-input-46-3c136ff51af4> in <genexpr>((i,))
NameError: global name 'sample' is not defined

[Engine Exception]
NameErrorTraceback (most recent call last)<string> in <module>()
<ipython-input-46-3c136ff51af4> in sample_multiple(samples_partial)
<ipython-input-46-3c136ff51af4> in <genexpr>((i,))
NameError: global name 'sample' is not defined

... 4 more exceptions ...

2. using direct/task-based interface


In [49]:
def sample():    
    x = numpy.random.uniform(-1.0, 1.0)
    y = numpy.random.uniform(-1.0, 1.0)
    
    if x**2 + y**2 <= 1.0:
        return 1
    else:
        return 0

def sample_multiple(samples_partial):
    return sum(sample() for i in range(samples_partial))

dview.push({'sample':sample, 'sample_multiple':sample_multiple})


Out[49]:
<AsyncResult: _push>

2.1 dview.apply - blocking mode

executed in every engin!


In [50]:
%%time

samples = int(1E8)
ntasks = len(dview)
chunk_size = int(samples/ntasks)

dview.block = True
results = dview.map(sample_multiple, [chunk_size for i in range(ntasks)])
# task should be evenly splited on every engine


CPU times: user 32 ms, sys: 4 ms, total: 36 ms
Wall time: 20.2 s

In [52]:
print 'pi: ', sum(results)/np.double(samples)*4.


pi:  3.14196252

2.2 tview.map (load balanced)


In [53]:
%%time

samples = int(1E8)
ntasks = len(tview)
chunk_size = int(samples/ntasks)

tview.block = True
results = tview.map(sample_multiple, [chunk_size for i in range(ntasks)])
# task should be evenly splited on every engine


CPU times: user 28 ms, sys: 4 ms, total: 32 ms
Wall time: 18.6 s

In [54]:
print 'pi: ', sum(results)/np.double(samples)*4.


pi:  3.1416446

2.3 dview.map_async


In [55]:
%%time

samples = 1E8
ntasks = 10
chunk_size = int(samples/ntasks)

dview.block = False
results_async = dview.map_async(sample_multiple, 
                                [chunk_size for i in range(ntasks)])


CPU times: user 12 ms, sys: 0 ns, total: 12 ms
Wall time: 9.09 ms

In [56]:
print results_async.ready()


False

In [57]:
print 'pi: ', sum(results_async.get())/np.double(samples)*4.


pi:  3.14170044

In [58]:
print results_async.ready()


True

2.4 tview.map_async


In [59]:
%%time

samples = 1E8
ntasks = 10
chunk_size = int(samples/ntasks)

tview.block = False
results_async = tview.map_async(sample_multiple, 
                                [chunk_size for i in range(ntasks)]) #determines the tasks for each engine

# print 'pi: ', sum(results_async.get())/np.double(samples)*4.


CPU times: user 12 ms, sys: 0 ns, total: 12 ms
Wall time: 10.6 ms

In [60]:
print results_async.ready()


False

In [61]:
print 'pi: ', sum(results_async.get())/np.double(samples)*4.


pi:  3.141516

In [62]:
print results_async.ready()


True

2.5 dview.apply


In [63]:
%%time

samples = 1E8
ntasks = len(dview)
chunk_size = int(samples/ntasks)

dview.block = True
results = dview.apply(sample_multiple, chunk_size)


CPU times: user 24 ms, sys: 0 ns, total: 24 ms
Wall time: 21 s

In [64]:
print 'pi: ', sum(results)/np.double(samples)*4.


pi:  3.14160196

2.6 tview.apply (single engine execution!)


In [65]:
%%time

samples = 1E8
ntasks = len(tview)
chunk_size = int(samples/ntasks)

dview.block = True
results = tview.apply(sample_multiple, chunk_size)


CPU times: user 0 ns, sys: 0 ns, total: 0 ns
Wall time: 568 µs

In [66]:
print 'pi: ', sum(results.get())/np.double(samples)*4.


pi:  0.39274236

In [67]:
print 'pi: ', sum(results.get())/np.double(samples)*4.*ntasks


pi:  3.14193888

2.7 scatter & gather


In [68]:
samples = 1E8
ntasks = 50
chunk_size = int(samples/ntasks)

In [69]:
dview.scatter('chunk_size', [chunk_size for i in range(ntasks)])
dview.scatter('sum_sample', [0 for i in range(ntasks)])

In [70]:
for cz in dview['chunk_size']:
    print cz


[2000000, 2000000, 2000000, 2000000, 2000000, 2000000, 2000000]
[2000000, 2000000, 2000000, 2000000, 2000000, 2000000, 2000000]
[2000000, 2000000, 2000000, 2000000, 2000000, 2000000]
[2000000, 2000000, 2000000, 2000000, 2000000, 2000000]
[2000000, 2000000, 2000000, 2000000, 2000000, 2000000]
[2000000, 2000000, 2000000, 2000000, 2000000, 2000000]
[2000000, 2000000, 2000000, 2000000, 2000000, 2000000]
[2000000, 2000000, 2000000, 2000000, 2000000, 2000000]

In [71]:
dview['sample_multiple']


Out[71]:
[<function sample_multiple>,
 <function sample_multiple>,
 <function sample_multiple>,
 <function sample_multiple>,
 <function sample_multiple>,
 <function sample_multiple>,
 <function sample_multiple>,
 <function sample_multiple>]

In [72]:
dview.execute('sum_sample = [sample_multiple(chunk_size_) for chunk_size_ in chunk_size]')


Out[72]:
<AsyncResult: execute:finished>

In [73]:
dview['sum_sample']


Out[73]:
[[1570762, 1571220, 1570577, 1569768, 1571206, 1570366, 1570209],
 [1570821, 1571711, 1569952, 1571582, 1571127, 1571310, 1572184],
 [1570133, 1570223, 1571538, 1571680, 1572032, 1570745],
 [1570163, 1571448, 1571191, 1570501, 1570848, 1571122],
 [1570817, 1570981, 1571109, 1570527, 1571422, 1571518],
 [1569734, 1571547, 1572102, 1570127, 1570073, 1570573],
 [1571102, 1570951, 1569925, 1571305, 1571198, 1569940],
 [1569893, 1569968, 1570509, 1570992, 1571428, 1569348]]

In [74]:
sum(dview.gather('sum_sample'))/samples*4.


Out[74]:
3.1416603200000002

In [ ]:

view.wait()

can be used to block the async results


In [75]:
%%time

samples = 1E8
ntasks = 10
chunk_size = int(samples/ntasks)

dview.block = False
results_async = dview.map_async(sample_multiple, 
                                [chunk_size for i in range(ntasks)])


CPU times: user 12 ms, sys: 0 ns, total: 12 ms
Wall time: 11 ms

In [76]:
results_async.ready()


Out[76]:
False

In [77]:
dview.wait(results_async)


Out[77]:
True

In [78]:
print 'pi: ', sum(results_async.get())/np.double(samples)*4.


pi:  3.14160688

In [ ]:

open qtconsole to engines?


In [79]:
dview = rc[::4]
dview.execute('qtconsole')


Out[79]:
<AsyncResult: execute>

Discussion: how to further improve the speed?

  • 1.
  • 2.

In [ ]: