Bo Zhang (NAOC, bozhang@nao.cas.cn) will give a few lessons on using python to process astronomical data. These lectures are organized as below:
use as many computing resources as possible
parallel computing on a multi-CPU/core computer (multiprocessing, ...)
run code on a multi-node computer cluster (PBS, ...)
In [ ]:
In [1]:
%pylab inline
In [2]:
# with plt.xkcd():
# illustration of the Monte Carlo estimate of pi: random points in the square
# [-1, 1] x [-1, 1]; hits fall inside the unit circle of radius 1
fig = plt.figure(figsize=(10, 10))
ax = plt.axes(frameon=False)
plt.xlim(-1.5,1.5)
plt.ylim(-1.5,1.5)
circle = plt.Circle((0.,0.), 1., color='w', fill=False)
rect = plt.Rectangle((-1,-1), 2, 2, color='gray')
plt.gca().add_artist(rect)
plt.gca().add_artist(circle)
plt.arrow(-2., 0., 3.3, 0., head_width=0.1, head_length=0.2)
plt.arrow(0., -2., 0., 3.3, head_width=0.1, head_length=0.2)
randx = np.random.uniform(-1, 1, (100,))
randy = np.random.uniform(-1, 1, (100,))
plt.plot(randx, randy, 'kx')
plt.gca().axis('off')
plt.text(-1.3, -0.1, '(-1, 0)', fontsize=20)
plt.text( 1.1, -0.1, '(+1, 0)', fontsize=20)
plt.text( 0.1, 1.1, '(0, +1)', fontsize=20)
plt.text( 0.1, -1.1, '(0, -1)', fontsize=20);
In [3]:
%%time
import random
samples = 1E5
hits = 0
for i in range(int(samples)):
    x = random.uniform(-1.0, 1.0)
    y = random.uniform(-1.0, 1.0)
    if x**2 + y**2 <= 1.0:
        hits += 1
pi = 4.0*hits/samples
print pi
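The estimator in the cell above is an area ratio: the points are drawn uniformly over the square $[-1, 1] \times [-1, 1]$ (area 4), while a hit lands inside the unit circle (area $\pi$), so

$$\frac{\mathrm{hits}}{\mathrm{samples}} \approx \frac{\pi \cdot 1^2}{2 \times 2} = \frac{\pi}{4} \quad\Longrightarrow\quad \pi \approx 4\,\frac{\mathrm{hits}}{\mathrm{samples}}.$$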
In [4]:
%%time
import multiprocessing
def sample():
    x = random.uniform(-1.0, 1.0)
    y = random.uniform(-1.0, 1.0)
    if x**2 + y**2 <= 1.0:
        return 1
    else:
        return 0
pool = multiprocessing.Pool()
results_async = [pool.apply_async(sample) for i in range(int(samples))]
hits = sum(r.get() for r in results_async)
pool.close()
pi = 4.0*hits/samples
print pi
In [5]:
%%time
import multiprocessing
def sample_multiple(samples_partial):
    return sum(sample() for i in range(samples_partial))
ntasks = 10
chunk_size = int(samples/ntasks)
pool = multiprocessing.Pool()
results_async = [pool.apply_async(sample_multiple, [chunk_size]) for i in range(ntasks)]
hits = sum(r.get() for r in results_async)
pool.close()
pi = 4.0*hits/samples
print pi
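For comparison, the same chunked computation can also be written with Pool.map, which hands one chunk to each worker and blocks until all of them finish. This is only a minimal sketch, assuming a fork-based Pool (as on Linux) so that the function defined in the notebook is visible to the child processes; the name count_hits is just illustrative.
%%time
import random
import multiprocessing
def count_hits(samples_partial):
    # count how many random points of one chunk fall inside the unit circle
    hits = 0
    for _ in range(samples_partial):
        x = random.uniform(-1.0, 1.0)
        y = random.uniform(-1.0, 1.0)
        if x**2 + y**2 <= 1.0:
            hits += 1
    return hits
samples = int(1E5)
ntasks = 10
chunk_size = samples // ntasks
pool = multiprocessing.Pool()
hits = sum(pool.map(count_hits, [chunk_size] * ntasks))  # blocks until all chunks are done
pool.close()
pool.join()
print 4.0 * hits / samples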
In [ ]:
IPython's power is not limited to its advanced shell. Its parallel package includes a framework to setup and run calculations on single and multi-core machines, as well as on multiple nodes connected to a network. IPython is great because it gives an interactive twist to parallel computing and provides a common interface to different communication protocols.
Run
$ ipcluster start -n 12
in a terminal to start a cluster with 12 engines.
In [6]:
# to create an instance of Client, import Client from IPython.parallel
from IPython.parallel import Client
# from ipyparallel import Client
In [7]:
%%bash
#ipcluster start -n 12
In [8]:
rc = Client() # create a Client instance
In [9]:
rc.ids # show IDs of each engine
Out[9]:
In [10]:
dview = rc[0] # select the first engine
dview
Out[10]:
In [11]:
dview = rc[::2] # select every other engine
dview
Out[11]:
In [12]:
dview = rc[:] # select all engines
dview
Out[12]:
In [13]:
dview.execute('a = 1')
Out[13]:
In [14]:
dview.pull('a').get() # equivalent to dview['a']
Out[14]:
In [15]:
dview.push({'a':2}) # equivalent to dview['a'] = 2
Out[15]:
In [16]:
dview['a']
Out[16]:
In [17]:
res = dview.execute('a = T_T') # this fails: T_T is not defined on the engines
In [18]:
res.get()
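Fetching the result of a failed execute re-raises the engines' errors locally. A small sketch of catching the failure (IPython.parallel wraps the per-engine exceptions in a CompositeError; a generic except keeps the sketch minimal):
res = dview.execute('a = T_T')   # NameError on every engine
try:
    res.get()                    # re-raises the remote errors locally
except Exception as e:           # in practice a CompositeError from IPython.parallel
    print e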
In [19]:
res = dview.execute('b = a+1')
In [20]:
dview['b']
Out[20]:
In [21]:
res = dview.execute('b = b+1')
In [22]:
dview['b']
Out[22]:
Engines should be treated as independent IPython sessions, and imports and custom-defined functions must be synchronized over the network. To import some libraries, both locally and in the engines, you can use the DirectView.sync_imports context manager:
In [23]:
with dview.sync_imports():
    import numpy
    # note: the `import ... as ...` syntax is not supported inside sync_imports
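Since aliased imports are not accepted inside sync_imports, one simple workaround (a sketch, not the only option) is to run the aliased import directly in each engine's namespace with execute, and locally as well:
dview.execute('import numpy as np')   # aliased import on every engine
import numpy as np                    # and locally, so the same name works on both sides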
In [24]:
a = range(100)
In [25]:
def square(x):
    return x*x
In [26]:
results_async = dview.map_async(square, a)
print results_async.get()
In [27]:
@dview.parallel(block=False)
def square(x):
    return x * x
In [28]:
print square.map(range(100)).get()
In [29]:
def square(x):
    return x*x
In [30]:
result_async = dview.apply(square, 2)
In [31]:
result_async.get()
Out[31]:
In [32]:
dview.scatter('a', [0, 1, 2, 3])
Out[32]:
In [33]:
print dview['a']
In [34]:
dview.scatter('a', np.arange(16))
Out[34]:
In [35]:
print dview['a']
In [36]:
dview.execute('a = a**2')
Out[36]:
In [37]:
print dview['a']
In [38]:
dview.gather('a').get()
Out[38]:
In [39]:
from IPython.parallel import Client
In [40]:
rc = Client()
In [41]:
tview = rc.load_balanced_view()
In [42]:
def square(x):
    return x * x
In [43]:
dview.apply(square, 2) # executes on every engine!
Out[43]:
In [44]:
tview.apply(square, 2).get() # executed on only ONE engine
Out[44]:
In [45]:
tview.apply(square, np.arange(10)).get()
Out[45]:
In [ ]:
In [46]:
def sample():
    x = numpy.random.uniform(-1.0, 1.0)
    y = numpy.random.uniform(-1.0, 1.0)
    if x**2 + y**2 <= 1.0:
        return 1
    else:
        return 0
@dview.parallel()
def sample_multiple(samples_partial):
    return sum(sample() for i in range(samples_partial))
In [47]:
%%time
samples = 1E8
ntasks = 10
chunk_size = int(samples/ntasks)
results = sample_multiple.map([chunk_size for i in range(ntasks)]) # ntasks determines the number of tasks --> 10
In [48]:
print 'pi: ', sum(results.get())/np.double(samples)*4.
In [49]:
def sample():
    x = numpy.random.uniform(-1.0, 1.0)
    y = numpy.random.uniform(-1.0, 1.0)
    if x**2 + y**2 <= 1.0:
        return 1
    else:
        return 0
def sample_multiple(samples_partial):
    return sum(sample() for i in range(samples_partial))
dview.push({'sample':sample, 'sample_multiple':sample_multiple})
Out[49]:
In [50]:
%%time
samples = int(1E8)
ntasks = len(dview)
chunk_size = int(samples/ntasks)
dview.block = True
results = dview.map(sample_multiple, [chunk_size for i in range(ntasks)])
# the work is split evenly across the engines
In [52]:
print 'pi: ', sum(results)/np.double(samples)*4.
In [53]:
%%time
samples = int(1E8)
ntasks = len(tview)
chunk_size = int(samples/ntasks)
tview.block = True
results = tview.map(sample_multiple, [chunk_size for i in range(ntasks)])
# the work is split evenly across the engines
In [54]:
print 'pi: ', sum(results)/np.double(samples)*4.
In [55]:
%%time
samples = 1E8
ntasks = 10
chunk_size = int(samples/ntasks)
dview.block = False
results_async = dview.map_async(sample_multiple,
                                [chunk_size for i in range(ntasks)])
In [56]:
print results_async.ready()
In [57]:
print 'pi: ', sum(results_async.get())/np.double(samples)*4.
In [58]:
print results_async.ready()
In [59]:
%%time
samples = 1E8
ntasks = 10
chunk_size = int(samples/ntasks)
tview.block = False
results_async = tview.map_async(sample_multiple,
                                [chunk_size for i in range(ntasks)]) # the length of this list determines the number of tasks
# print 'pi: ', sum(results_async.get())/np.double(samples)*4.
In [60]:
print results_async.ready()
In [61]:
print 'pi: ', sum(results_async.get())/np.double(samples)*4.
In [62]:
print results_async.ready()
In [63]:
%%time
samples = 1E8
ntasks = len(dview)
chunk_size = int(samples/ntasks)
dview.block = True
results = dview.apply(sample_multiple, chunk_size)
In [64]:
print 'pi: ', sum(results)/np.double(samples)*4.
In [65]:
%%time
samples = 1E8
ntasks = len(tview)
chunk_size = int(samples/ntasks)
dview.block = True # note: tview itself is still non-blocking (block=False, set above)
results = tview.apply(sample_multiple, chunk_size)
In [66]:
print 'pi: ', results.get()/np.double(samples)*4.
In [67]:
print 'pi: ', results.get()/np.double(samples)*4.*ntasks # only chunk_size samples were drawn on one engine, so scale by ntasks
In [68]:
samples = 1E8
ntasks = 50
chunk_size = int(samples/ntasks)
In [69]:
dview.scatter('chunk_size', [chunk_size for i in range(ntasks)])
dview.scatter('sum_sample', [0 for i in range(ntasks)])
In [70]:
for cz in dview['chunk_size']:
    print cz
In [71]:
dview['sample_multiple']
Out[71]:
In [72]:
dview.execute('sum_sample = [sample_multiple(chunk_size_) for chunk_size_ in chunk_size]')
Out[72]:
In [73]:
dview['sum_sample']
Out[73]:
In [74]:
sum(dview.gather('sum_sample'))/samples*4.
Out[74]:
In [ ]:
In [75]:
%%time
samples = 1E8
ntasks = 10
chunk_size = int(samples/ntasks)
dview.block = False
results_async = dview.map_async(sample_multiple,
                                [chunk_size for i in range(ntasks)])
In [76]:
results_async.ready()
Out[76]:
In [77]:
dview.wait(results_async)
Out[77]:
In [78]:
print 'pi: ', sum(results_async.get())/np.double(samples)*4.
In [ ]:
In [79]:
dview = rc[::4] # select every fourth engine
dview.execute('qtconsole')
Out[79]:
In [ ]: