In [ ]:
# Connect to an already-running IPython cluster via the 'crestone-node' profile
# and list the IDs of the available engines.
import ipyparallel
rc = ipyparallel.Client(profile='crestone-node')
print(rc.ids)
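In [ ]:
# A minimal sketch of starting a cluster before connecting.  The cell above
# assumes a cluster is already running under the 'crestone-node' profile; on a
# local machine, a default-profile cluster with 4 engines can be started from
# a shell with:
#   ipcluster start -n 4
# With ipyparallel 7 or newer, the same thing can be done from Python
# (this is an assumption about your installed version):
import ipyparallel
cluster = ipyparallel.Cluster(n=4)       # assumes ipyparallel >= 7
rc = cluster.start_and_connect_sync()    # starts the engines and returns a Client
print(rc.ids)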
In [ ]:
#########################################################################
# Example: Initializing IPyParallel
#
# This example demonstrates how to access the individual
# ipython engines running within the cluster.
#
import ipyparallel
import os
import socket
#Create a client instance, used to connect the controller to the remote engines
rc = ipyparallel.Client(profile='default')
nengines = len(rc)
#create direct views into each engine
all_proc = rc[:] # all_proc is a list of ipython DirectView objects
#Only the controller prints this
print('\n ',nengines," Python engines are active.\n")
# Each Python engine calls the gethostname and getpid functions
hostnames = all_proc.apply_sync(socket.gethostname)
pids = all_proc.apply_sync(os.getpid)
for i in range(nengines):
    istr = '{:02d}'.format(i) # returns a 2-digit string whose value is i
    pstr = str(pids[i])
    hstr = str(hostnames[i])
    msg = 'Engine '+istr+': pid = '+pstr+'; hostname = '+hstr
    print(msg)
print(' ')
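In [ ]:
# A short sketch (reusing rc and all_proc from the cell above) showing that
# arbitrary statements can also be run on every engine with DirectView.execute,
# and the resulting variables read back with dictionary-style access.
# The variable name 'pyver' is just an example for this sketch.
all_proc.block = True
# Each engine runs this code string in its own namespace
all_proc.execute('import platform; pyver = platform.python_version()')
print('Python version on each engine:', all_proc['pyver'])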
In [ ]:
#########################################################################
# Example: Assignment in IPyParallel
#
# We can assign values to variables globally (across all
# engines at once) or locally (across some subset
# of engines).
import ipyparallel
import os
import socket
#identify our Python engines
rc = ipyparallel.Client(profile='default')
nengines = len(rc)
#create views into each engine
all_proc = rc[:]
#we can also create views into individual engines...
proc0 = rc[0]
proc1 = rc[1]
proc2 = rc[2]
#... or into only the even engines, or only the odd engines
even_proc = rc[range(0,nengines,2)]
odd_proc = rc[range(1,nengines,2)]
#Only the hub prints this
print('\n ',nengines," Python engines are active.\n")
# Each Python engine calls the gethostname and getpid functions
hostnames = all_proc.apply_sync(socket.gethostname)
pids = all_proc.apply_sync(os.getpid)
#Assign a list-value of [0,1] to the variable 'b' on all python engines
all_proc['b']=[0,1]
#We can view the value on all engines
vals = all_proc['b']
print('All values of b: ',vals) # or use all_proc['b']
#or on a single engine:
print('')
print("Engine zero's value of b: ", proc0['b']) #could also use vals[0]
#Assign a value of 1 to var1 on all python engines
all_proc['var1']=1
#Assign a value of 2 to var2 on all python engines
all_proc['var2']=2
#Change the value of var2 to 3 and 4 on engines 0 and 2 respectively
proc0['var2'] = 3
proc2['var2'] = 4
#Assign engine 2's value of var2 to engine 1's value of var2
proc1['var2'] = proc2['var2']
print('var2 on engine 1 and engine 2: ', proc1['var2'], ' , ', proc2['var2'])
#Assign the value of 0 to var3 on even-numbered engines and 1 to var3 on odd-numbered engines
even_proc['var3']=0
odd_proc['var3']=1
vars1 = all_proc['var1']
vars2 = all_proc['var2']
vars3 = all_proc['var3']
print(' ')
for i in range(nengines):
    istr = '{:02d}'.format(i) # returns a 2-digit string whose value is i
    v1str = str(vars1[i])
    v2str = str(vars2[i])
    v3str = str(vars3[i])
    msg = 'Engine '+istr+': var1 = '+v1str+'; var2 = '+v2str+'; var3 = '+v3str
    print(msg)
print(' ')
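In [ ]:
# The dictionary-style assignment above is shorthand for DirectView.push and
# DirectView.pull.  A minimal sketch, assuming rc, all_proc, and proc0 from the
# previous cell are still defined; the names var4 and var5 are just examples.
all_proc.push({'var4': 10, 'var5': [1, 2, 3]}, block=True)  # send several variables at once
print('var4 on all engines: ', all_proc.pull('var4', block=True))
print('var5 on engine zero: ', proc0.pull('var5', block=True))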
In [ ]:
#########################################################################
# Exercise: Assignment
#
# Modify this program so that var1 receives the value n,
# where n is the remainder of the process ID divided by 3.
# i.e., n = pid % 3
import ipyparallel
import os
import socket
#identify our Python engines
rc = ipyparallel.Client(profile='default')
nengines = len(rc)
#create views into each engine
all_proc = rc[:]
proc0 = rc[0]
proc2 = rc[2]
even_proc = rc[range(0,nengines,2)]
odd_proc = rc[range(1,nengines,2)]
#Only the hub prints this
print('\n ',nengines," Python engines are active.\n")
# Each Python engine calls the gethostname and getpid functions
hostnames = all_proc.apply_sync(socket.gethostname)
pids = all_proc.apply_sync(os.getpid)
#Assign a list-value of [0,1] to the variable 'b' on all python engines
all_proc['b']=[0,1]
#We can view the value on all engines
vals = all_proc['b']
print('All values of b: ',vals) # or use all_proc['b']
#or on a single engine:
print('')
print("Engine zero's value of b: ", proc0['b']) #could also use vals[0]
#Assign a value of 1 to var1 on all python engines
all_proc['var1']=1
#Assign a value of 2 to var2 on all python engines
all_proc['var2']=2
#Change the value of var2 to 3 and 4 on engines 0 and 2 respectively
proc0['var2'] = 3
proc2['var2'] = 4
even_proc['var3']=0
odd_proc['var3']=1
vars1 = all_proc['var1']
vars2 = all_proc['var2']
vars3 = all_proc['var3']
print(' ')
for i in range(nengines):
    istr = '{:02d}'.format(i) # returns a 2-digit string whose value is i
    v1str = str(vars1[i])
    v2str = str(vars2[i])
    v3str = str(vars3[i])
    msg = 'Engine '+istr+': var1 = '+v1str+'; var2 = '+v2str+'; var3 = '+v3str
    print(msg)
print(' ')
#Solution: assign each engine's process ID modulo 3 to var1
for i in range(nengines):
    rc[i]['var1'] = pids[i] % 3
vars1 = all_proc['var1']
print("After our change:")
for i in range(nengines):
    istr = '{:02d}'.format(i) # returns a 2-digit string whose value is i
    v1str = str(vars1[i])
    v2str = str(vars2[i])
    v3str = str(vars3[i])
    msg = 'Engine '+istr+': var1 = '+v1str+'; var2 = '+v2str+'; var3 = '+v3str
    print(msg)
print(' ')
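In [ ]:
# An alternative sketch for the exercise above: rather than pushing values from
# the client, each engine can compute pid % 3 in its own namespace with
# DirectView.execute.  This reuses rc and all_proc from the previous cell.
all_proc.execute('import os; var1 = os.getpid() % 3', block=True)
print('var1 on each engine:', all_proc['var1'])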
In [ ]:
################################################################
# Example: Scattering and Gathering Lists in Python
#
# A common task in parallel programming is distributing
# (scattering) a list of numbers among the different processes
# or collating (gathering) a distributed list of numbers back
# to the hub process.
# This example illustrates the basic mechanics of scattering
# and gathering.
import ipyparallel
#identify our Python engines
rc = ipyparallel.Client(profile='default')
nengines = len(rc)
#create views into each engine
all_proc = rc[:]
# This keeps the engines in-sync with one another
# Engines do not proceed to instruction 'b' until all engines
# have completed instruction 'a.'
all_proc.block=True
a = []
lsize=6*nengines
for i in range(0,lsize):
    a.append(i**2)
#We scatter the list "a" from the hub out to all engines
#Each process stores a portion of "a" locally in the variable "mylist"
all_proc.scatter('mylist',a)
# Create a variable on the controller that holds the contents of 'mylist' for each engine
# sub_lists is a nested list; sub_lists[i][:] holds the value of 'mylist' for engine 'i'
sub_lists = all_proc['mylist']
#Only the controller prints this
print('\n ',nengines," Python engines are active.\n")
print(' ')
for i in range(nengines):
    istr = '{:02d}'.format(i) # returns a 2-digit string whose value is i
    msg = 'Engine '+istr+': list segment = '
    print(msg, sub_lists[i])
print(' ')
#Gather 'mylist' back to the controller, store the contents in a list named gathered
gathered = all_proc.gather('mylist')
print('Gathered list: ', gathered[:], type(gathered))
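In [ ]:
# scatter and gather also work with NumPy arrays: each engine receives a
# contiguous chunk of the array.  A small sketch, assuming rc, nengines, and
# all_proc (with block=True) from the cell above are still available; the
# names myarr and local_sum are just for this sketch.
import numpy as np
arr = np.arange(4*nengines, dtype=float)
all_proc.scatter('myarr', arr)               # each engine gets a contiguous chunk
all_proc.execute('local_sum = myarr.sum()')  # partial sum on each engine
partial_sums = all_proc['local_sum']
print('Partial sums: ', partial_sums)
print('Total       : ', sum(partial_sums), ' (expected ', arr.sum(), ')')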
In [ ]:
#######################################################################
#
# Exercise: Scatter/Gather
#
# Scatter list a to all even processors, assigning its values to the variable mylist
# Scatter list b to all odd processors, assigning its values to mylist
# Gather from all processors to create the list [0,1,0,1,2,3,4,9,....]
#
import ipyparallel
#identify our Python engines
rc = ipyparallel.Client(profile='default')
nengines = len(rc)
#create views into each engine
all_proc = rc[:]
all_proc.block=True
even_proc = rc[range(0,nengines,2)]
odd_proc = rc[range(1,nengines,2)]
a = []
b = []
lsize=nengines
for i in range(0,lsize):
    a.append(i)
    b.append(i**2)
even_proc.scatter('mylist',a)
odd_proc.scatter('mylist',b)
sub_lists_a = even_proc['mylist']
sub_lists_b = odd_proc['mylist']
#Only the hub prints this
print('\n ',nengines," Python engines are active.\n")
print(' ')
for i in range(int(nengines/2)):
    estr_even = '{:02d}'.format(2*i)   # even engine holding a piece of list a
    estr_odd = '{:02d}'.format(2*i+1)  # odd engine holding a piece of list b
    print('Engine '+estr_even+': list segment = ', sub_lists_a[i])
    print('Engine '+estr_odd+': list segment = ', sub_lists_b[i])
print(' ')
gathered = all_proc.gather('mylist')
print('Gathered list: ', gathered[:], type(gathered))
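In [ ]:
# gather concatenates each engine's segment in engine order (engine 0, 1, 2, ...),
# so the expected interleaved result can be built on the client and compared.
# A small check, assuming an even number of engines and reusing a, b, lsize,
# nengines, and gathered from the cell above.
chunk = lsize // (nengines // 2)     # elements held by each engine
expected = []
for e in range(nengines):
    src = a if e % 2 == 0 else b     # even engines hold pieces of a, odd engines pieces of b
    start = (e // 2) * chunk
    expected += src[start:start + chunk]
print('Expected list:           ', expected)
print('Matches gathered result: ', expected == gathered)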
In [ ]:
#############################################################################
# Example: Parallel computation of pi using IPyParallel
#
# This program demonstrates how to evaluate a function in parallel
# using map_sync.
#
# In this example, each engine computes a unique estimate of pi.
# The result is averaged across all engines in the cluster.
#
import ipyparallel
import random
import time
import numpy as np
def estimate_pi(n):
    # (a) This function uses the random module.
    #     If we import random here, we are guaranteed that
    #     all processes will import the module. However, see (b)
    # import random
    count = 0
    for i in range(n):
        x = random.random()
        y = random.random()
        if (x**2 + y**2) <= 1:
            count += 1
    return 4.0*count/float(n)
rc = ipyparallel.Client(profile='default')
nengines = len(rc)
all_proc = rc[:]
all_proc.block=True
#(b) Alternatively, we can sync our module imports from the hub
with all_proc.sync_imports():
    import random
# First in serial
print('\n\n\n')
print('Serial Estimation of Pi')
print('')
for i in range(8):
    nx = 10**i
    t0 = time.time()
    est_pi = estimate_pi(nx)
    t1 = time.time()
    tval = t1-t0
    msg = 'Estimation based on '+str(10**i)+' points: '
    tmsg = 'Calculation time (seconds) : '
    print(msg,est_pi,tmsg,tval)
print('\n\n\n')
print('Parallel Estimation of Pi')
print('')
#Now in parallel
for i in range(8,10):
    nx = 10**i
    t0 = time.time()
    #[nx//nengines]*nengines creates a list of length nengines, where each element
    # has value nx//nengines. Each process gets one element of this list
    # and passes it to estimate_pi
    pi_estimates = all_proc.map_sync(estimate_pi, [nx//nengines]*nengines)
    est_pi = np.mean(pi_estimates)
    t1 = time.time()
    tval = t1-t0
    msg = 'Estimation based on '+str(10**i)+' points: '
    tmsg = 'Calculation time (seconds) : '
    print(msg,est_pi,tmsg,tval)
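In [ ]:
# map_sync blocks until every engine has finished.  A small sketch of the
# asynchronous variant, map_async, which returns an AsyncResult immediately;
# it reuses all_proc, estimate_pi, nengines, and np from the cell above.
nx = 10**8
async_result = all_proc.map_async(estimate_pi, [nx//nengines]*nengines)
print('Submitted; finished yet?', async_result.ready())
estimates = async_result.get()       # blocks here until all engines are done
print('Mean estimate of pi:    ', np.mean(estimates))
print('Wall time (seconds):    ', async_result.wall_time)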
In [ ]:
##########################################################
# Exercise: Parallel Function Evaluation
#
# Modify this code as follows:
#
# i) Have squared return the tuple ( x^2, process ID).
# ii) Revise the formatting of the loop output to read as follows:
# 47^2 is 2209 according to process PID: 192159
import ipyparallel
import os
def squared(x):
    import os
    return (x**2, os.getpid())
#Initialize Communication
rc = ipyparallel.Client(profile='default')
nengines = len(rc)
#create views into each engine
all_proc = rc[:]
all_proc.block=True
#Only the hub prints this
print('\n ',nengines," Python engines are active.\n")
n=nengines*4
#Engine 0 gets arguments [0,1,2,3]
#Engine 1 gets arguments [4,5,6,7] etc.
results = all_proc.map_sync(squared,range(n))
print(results)
# We can use the process IDs to create a dictionary mapping
# process ID to engine number
pids = all_proc.apply_sync(os.getpid)
pdict = {}
for i,p in enumerate(pids):
    pdict[p] = i
for i in range(n):
    istr = '{:4d}'.format(i)
    vstr = '{:4d}'.format(results[i][0])
    pstr = '{:6d}'.format(results[i][1])
    # Leave this line alone; results[i][1] will hold a process ID, which we then use to
    # access the engine ID in the pdict dictionary
    estr = '{:2d}'.format(pdict[results[i][1]])
    msg = istr+'^2 is '+vstr+' according to process PID: '+pstr + ', Engine: '+estr
    print(msg)
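In [ ]:
# The pid -> engine mapping above can also be built in one step, since
# apply_sync returns one result per engine in the same order as the view's
# targets (rc.ids here).  This sketch reuses pids and rc from the previous cell.
pdict = dict(zip(pids, rc.ids))
print(pdict)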
In [ ]:
################################################################
#
# Exercise: Modify this code so that the calculation of
# Collatz-sequence lengths is distributed across
# multiple processors.
# Perform the calculation in serial and in parallel and
# measure the time required in each instance
#
import ipyparallel
import numpy as np
import time
def collatz_length(n):
    val = n
    length = 1
    while (val != 1):
        length += 1
        if ((val % 2) == 0):
            val = val//2
        else:
            val = 3*val+1
        #print(val)
    return (n, length)
n = 100000
t0 = time.time()
max_len = 1
max_i = 1
for i in range(1,n):
    val, length = collatz_length(i)
    if (length > max_len):
        max_i = i
        max_len = length
t1 = time.time()
serial_t = t1 - t0
print("The serial answer is:")
msg = 'The integer with the longest collatz sequence in the interval [1,'+str(n)+'] is '
msg2 = str(max_i)+', whose sequence has length '+str(max_len)+'.'
print(msg+msg2)
print("The parallel answer is:")
t0 = time.time()
#Initialize Communication
rc = ipyparallel.Client(profile='default')
nengines = len(rc)
#create views into each engine
all_proc = rc[:]
all_proc.block=True
#Only the hub prints this
print('\n ',nengines," Python engines are active.\n")
#map_sync splits range(1,n) into contiguous chunks: engine 0 gets the first
#(n-1)//nengines integers, engine 1 the next chunk, and so on.
results = all_proc.map_sync(collatz_length,range(1,n))
arr_results = np.array(results)
ans_tuple = np.squeeze(arr_results[np.where(arr_results[:,1] == np.max(arr_results[:,1]))])
print(ans_tuple)
print("The longest length is: %d for number= %d" % (ans_tuple[1], ans_tuple[0]))
t1 = time.time()
parallel_t = t1 - t0
print("Serial time = %f" % serial_t)
print("Parallel time = %f" % parallel_t)
In [ ]:
import ipyparallel
import numpy as np
import time
def collatz_length(n):
    val = n
    length = 1
    while (val != 1):
        length += 1
        if ((val % 2) == 0):
            val = val//2
        else:
            val = 3*val+1
    return length
n = 100000
max_len = 1
max_i = 1
print('\n')
print('...........................')
print('Serial Calculation')
print('')
t0 = time.time()
for i in range(1,n):
    length = collatz_length(i)
    if (length > max_len):
        max_i = i
        max_len = length
t1 = time.time()
dt = t1-t0
msg = 'The integer with the longest collatz sequence in the interval [1,'+str(n)+'] is '
msg2 = str(max_i)+', whose sequence has length '+str(max_len)+'.'
print(msg+msg2)
print('Calculation time (serial; seconds): ', dt)
rc = ipyparallel.Client(profile='default')
nengines = len(rc)
all_proc = rc[:]
all_proc.block=True
print('\n')
print('...........................')
print('Parallel Calculation')
print('')
#Generate a list of numbers whose collatz-sequence lengths we want to know
numbers=[]
for i in range(1,n):
    numbers.append(i)
# Map the list of numbers to all processes, evaluate collatz_length
# using numbers as arguments.
t0 = time.time()
lengths = all_proc.map_sync(collatz_length, numbers)
max_len=np.max(lengths)
# search the list for the index corresponding to max_len; the number corresponding to that length is index+1
max_i = lengths.index(max_len)+1
t1 = time.time()
dt=t1-t0
msg = 'The integer with the longest collatz sequence in the interval [1,'+str(n)+'] is '
msg2 = str(max_i)+', whose sequence has length '+str(max_len)+'.'
print(msg+msg2)
print('Calculation time (parallel; seconds): ', dt)
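In [ ]:
# Collatz sequence lengths vary a lot from one integer to the next, so the
# contiguous split used by a DirectView can leave engines unevenly loaded.
# A sketch of the same calculation with a load-balanced view, which hands
# tasks out dynamically; it reuses rc, collatz_length, numbers, n, np, and
# time from the cell above.  chunksize=500 is an arbitrary choice that groups
# many small tasks per message to keep scheduling overhead down.
lview = rc.load_balanced_view()
t0 = time.time()
lengths = list(lview.map(collatz_length, numbers, block=True, chunksize=500))
t1 = time.time()
max_len = np.max(lengths)
max_i = lengths.index(max_len)+1
print('The integer with the longest collatz sequence in the interval [1,'+str(n)+'] is '
      +str(max_i)+', whose sequence has length '+str(max_len)+'.')
print('Calculation time (load-balanced; seconds): ', t1-t0)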