In [ ]:
# Connect to a running IPython cluster via the 'crestone-node' profile and list its engine IDs
import ipyparallel
rc = ipyparallel.Client(profile='crestone-node')
print(rc.ids)

In [ ]:
#########################################################################
#           Example:  Initializing IPyParallel
#
#                     This example demonstrates how to access the individual
#                     IPython engines running within the cluster.
#


import ipyparallel
import os
import socket

#Create a client instance, used to connect the controller to the remote engines
rc=ipyparallel.Client(profile='default')
nengines = len(rc)

#create direct views into each engine
all_proc  = rc[:]  # all_proc is a DirectView spanning all of the engines

#Only the controller prints this
print('\n ',nengines," Python engines are active.\n")

# Each Python engine calls the gethostname and getpid functions
hostnames = all_proc.apply_sync(socket.gethostname)
pids = all_proc.apply_sync(os.getpid)

for i in range(nengines):
    istr = '{:02d}'.format(i)  # returns a 2-digit string whose value is i
    pstr = str(pids[i])
    hstr = str(hostnames[i])
    msg = 'Engine '+istr+':   pid = '+pstr+';  hostname = '+hstr
    print(msg)
print(' ')
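
The apply_sync calls above ship a Python function to every engine. A DirectView can also run a string of code remotely with execute() and retrieve variables with pull(). Below is a minimal sketch of that pattern, assuming the same 'default' profile and a running cluster; the variable name myhost is purely illustrative.


In [ ]:
# Sketch: run a code string on every engine, then pull a variable back to the controller.
import ipyparallel

rc = ipyparallel.Client(profile='default')
all_proc = rc[:]
all_proc.block = True   # wait for each call to complete before returning

# Each engine imports socket and stores its own hostname in 'myhost'
all_proc.execute('import socket; myhost = socket.gethostname()')

# pull() retrieves 'myhost' from every engine in the view
print(all_proc.pull('myhost'))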

Variable Assignment across engines


In [ ]:
#########################################################################
#       Example:  Assignment in IPyParallel
#
#                 We can assign values to variables globally (across all
#                 engines at once) or locally (on a subset of engines).


import ipyparallel
import os
import socket

#Identify our Python engines
rc=ipyparallel.Client(profile='default')
nengines = len(rc)

#create views into each engine
all_proc  = rc[:]

#we can also create views into individual engines...
proc0 = rc[0]
proc1 = rc[1]
proc2 = rc[2]

#... or into only the even engines, or only the odd engines
even_proc = rc[range(0,nengines,2)]
odd_proc  = rc[range(1,nengines,2)]

#Only the hub prints this
print('\n ',nengines," Python engines are active.\n")

# Each Python engine calls the gethostname and getpid functions
hostnames = all_proc.apply_sync(socket.gethostname)
pids = all_proc.apply_sync(os.getpid)

#Assign a list-value of [0,1] to the variable 'b' on all python engines
all_proc['b']=[0,1]
#We can view the value on all engines
vals = all_proc['b']
print('All values of b: ',vals) # or use all_proc['b']
#or on a single engine:
print('')
print("Engine zero's value of b: ", proc0['b']) #could also use vals[0]

#Assign a value of 1 to var1 on all python engines
all_proc['var1']=1

#Assign a value of 2 to var2 on all python engines
all_proc['var2']=2

#Change the value of var2 to 3 and 4 on engines 0 and 2 respectively
proc0['var2'] = 3
proc2['var2'] = 4

#Assign engine 2's value of var2 to engine 1's value of var2
proc1['var2'] = proc2['var2']
print('var2 on engine 1 and engine 2: ', proc1['var2'], ' , ', proc2['var2'])
#Assign the value of 0 to var3 on even-numbered engines and 1 to var3 on odd-numbered engines
even_proc['var3']=0
odd_proc['var3']=1


vars1 = all_proc['var1']
vars2 = all_proc['var2']
vars3 = all_proc['var3']
print(' ')
for i in range(nengines):
    istr = '{:02d}'.format(i)  # returns a 2-digit string whose value is i
    v1str = str(vars1[i])
    v2str = str(vars2[i])
    v3str = str(vars3[i])
    msg = 'Engine '+istr+':   var1 = '+v1str+';  var2 = '+v2str+';  var3 = '+v3str
    print(msg)
print(' ')
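
The dictionary-style syntax used above (e.g., all_proc['b'] = [0,1]) is shorthand for the view's push() and pull() methods. The sketch below uses those methods directly; it assumes rc, all_proc, and proc0 are defined as in the previous cell, and var4 is just an illustrative name.


In [ ]:
# Sketch: push()/pull() are the methods behind the dictionary-style assignment.
# block=True makes each call wait for completion and return its result directly.
all_proc.push({'var4': 10}, block=True)       # same effect as all_proc['var4'] = 10
print(all_proc.pull('var4', block=True))      # same as all_proc['var4']
print(proc0.pull('var4', block=True))         # value of var4 on engine 0 only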

And here is our attempt at the assignment!


In [ ]:
#########################################################################
#       Exercise:  Assignment 
#           
#                 Modify this program so that var1 receives the value n,
#                 where n is the remainder of the process ID divided by 3.
#                 i.e., n = pid % 3


import ipyparallel
import os
import socket

#Identify our Python engines
rc=ipyparallel.Client(profile='default')
nengines = len(rc)

#create views into each engine
all_proc  = rc[:]
proc0 = rc[0]
proc2 = rc[2]
even_proc = rc[range(0,nengines,2)]
odd_proc  = rc[range(1,nengines,2)]

#Only the hub prints this
print('\n ',nengines," Python engines are active.\n")

# Each Python engine calls the gethostname and getpid functions
hostnames = all_proc.apply_sync(socket.gethostname)
pids = all_proc.apply_sync(os.getpid)

#Assign a list-value of [0,1] to the variable 'b' on all python engines
all_proc['b']=[0,1]
#We can view the value on all engines
vals = all_proc['b']
print('All values of b: ',vals) # or use all_proc['b']
#or on a single engine:
print('')
print("Engine zero's value of b: ", proc0['b']) #could also use vals[0]

#Assign a value of 1 to var1 on all python engines
all_proc['var1']=1

#Assign a value of 2 to var2 on all python engines
all_proc['var2']=2

#Change the value of var2 to 3 and 4 on engines 0 and 2 respectively
proc0['var2'] = 3
proc2['var2'] =4

even_proc['var3']=0
odd_proc['var3']=1


vars1 = all_proc['var1']
vars2 = all_proc['var2']
vars3 = all_proc['var3']
print(' ')
for i in range(nengines):
    istr = '{:02d}'.format(i)  # returns a 2-digit string whose value is i
    v1str = str(vars1[i])
    v2str = str(vars2[i])
    v3str = str(vars3[i])
    msg = 'Engine '+istr+':   var1 = '+v1str+';  var2 = '+v2str+';  var3 = '+v3str
    print(msg)
print(' ')

#Exercise solution: on each engine, set var1 to (process ID) % 3
for i in range(nengines):
    rc[i]['var1'] = pids[i] % 3

vars1 = all_proc['var1']

print("After our change:")
for i in range(nengines):
    istr = '{:02d}'.format(i)  # returns a 2-digit string whose value is i
    v1str = str(vars1[i])
    v2str = str(vars2[i])
    v3str = str(vars3[i])
    msg = 'Engine '+istr+':   var1 = '+v1str+';  var2 = '+v2str+';  var3 = '+v3str
    print(msg)
print(' ')

Scattering and Gathering


In [ ]:
################################################################
#           Example:  Scattering and Gathering Lists in Python
#
#           One common task when parallel programming involves
#           distributing (scattering) a list of numbers among
#           the different processes or collating (gathering)
#           a distributed list of numbers back to the hub process.
#           This example illustrates the basic mechanics of scattering
#           and gathering.

import ipyparallel

#Identify our Python engines
rc = ipyparallel.Client(profile='default')
nengines = len(rc)

#create views into each engine
all_proc  = rc[:]

# block=True makes calls on this view synchronous: the controller
# waits for every engine to finish a command before issuing the next one,
# which keeps the engines working in lock-step.
all_proc.block = True

a = []
lsize = 6*nengines
for i in range(0, lsize):
    a.append(i**2)

#We scatter the list "a" from the hub out to all engines
#Each process stores a portion of "a" locally in the variable "mylist"
all_proc.scatter('mylist',a)

# Create a variable on the controller that holds the contents of 'mylist' for each engine
# sub_lists is a nested list; sub_lists[i] holds the value of 'mylist' on engine 'i'
sub_lists = all_proc['mylist']


#Only the controller prints this
print('\n ',nengines," Python engines are active.\n")

print(' ')
for i in range(nengines):
    istr = '{:02d}'.format(i)  # returns a 2-digit string whose value is i
    msg = 'Engine '+istr+':   list segment = '
    print(msg, sub_lists[i])
print(' ')

#Gather 'mylist' back to the controller, store the contents in a list named gathered
gathered = all_proc.gather('mylist')
print('Gathered list: ', gathered[:], type(gathered))
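
Scatter and gather combine naturally with remote execution: scatter the data, have each engine compute a partial result, and collect the pieces on the controller. The sketch below assumes rc and all_proc (with block=True) from the cell above; 'chunk' and 'partial' are illustrative variable names.


In [ ]:
# Sketch: scatter the data, compute a partial sum on each engine, collect the partials.
data = list(range(100))
all_proc.scatter('chunk', data)             # each engine receives a slice of data
all_proc.execute('partial = sum(chunk)')    # each engine sums its own slice
partials = all_proc['partial']              # one partial sum per engine
print('Partial sums per engine: ', partials)
print('Total: ', sum(partials), '  (expected: ', sum(data), ')')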

Scatter/Gather Exercise


In [ ]:
#######################################################################
#
#   Exercise:  Scatter/Gather
#
#   Scatter list a to all even processors, assigning its values to the variable mylist
#   Scatter list b to all odd processors, assigning its values to mylist
#   Gather from all processors to create the list [0,1,0,1,2,3,4,9,....]
#
import ipyparallel

#Identify our Python engines
rc = ipyparallel.Client(profile='default')
nengines = len(rc)

#create views into each engine
all_proc  = rc[:]
all_proc.block=True

even_proc = rc[range(0,nengines,2)]
odd_proc  = rc[range(1,nengines,2)]


a = []
b = []
lsize = nengines
for i in range(0, lsize):
    a.append(i)
    b.append(i**2)

even_proc.scatter('mylist',a)
odd_proc.scatter('mylist',b)

sub_lists_a = even_proc['mylist']
sub_lists_b = odd_proc['mylist']

#Only the hub prints this
print('\n ',nengines," Python engines are active.\n")


print(' ')
for i in range(nengines//2):
    even_str = '{:02d}'.format(2*i)    # even engine holding sub_lists_a[i]
    odd_str  = '{:02d}'.format(2*i+1)  # odd engine holding sub_lists_b[i]
    print('Engine '+even_str+':   list segment = ', sub_lists_a[i])
    print('Engine '+odd_str+':   list segment = ', sub_lists_b[i])
print(' ')
gathered = all_proc.gather('mylist')
print('Gathered list: ', gathered[:], type(gathered))

Computing Pi with map_sync


In [ ]:
#############################################################################
#       Example:  Parallel computation of pi using IPyParallel
#
#                 This program demonstrates how to evaluate a function in parallel
#                 using map_sync.
#
#                 In this example, each engine computes a unique estimate of pi.
#                 The result is averaged across all engines in the cluster.
#
import ipyparallel
import random
import time
import numpy as np
 
def estimate_pi(n):
    # (a) This function uses the random module.
    # If we import random here, we are guaranteed that
    # all processes will import the module.  However, see (b)

    # import random
    count = 0
    for i in range(n):
        x = random.random()
        y = random.random()
        if (x**2 + y**2) <= 1:
            count += 1
    return 4.0*count/float(n)

rc=ipyparallel.Client(profile='default')
nengines = len(rc)
all_proc = rc[:]
all_proc.block=True

#(b) Alternatively, we can sync our module imports from the hub
with all_proc.sync_imports():
    import random

# First in serial
print('\n\n\n')
print('Serial Estimation of Pi')
print('')

for i in range(8):
    nx = 10**i

    t0 = time.time()

    est_pi = estimate_pi(nx)

    t1 = time.time()
    tval = t1-t0

    msg = 'Estimation based on '+str(10**i)+' points: '
    tmsg = 'Calculation time (seconds) : '
    print(msg,est_pi,tmsg,tval)


print('\n\n\n')
print('Parallel Estimation of Pi')
print('')

#Now in parallel
for i in range(8,10):
    nx = 10**i

    t0 = time.time()

    #[nx//nengines]*nengines creates a list of length nengines, where each element
    # has value nx//nengines.  Each process gets one element of this list 
    # and passes it to estimate_pi

    pi_estimates = all_proc.map_sync(estimate_pi, [nx//nengines]*nengines)

    est_pi = np.mean(pi_estimates)

    t1 = time.time()
    tval = t1-t0

    msg = 'Estimation based on '+str(10**i)+' points: '
    tmsg = 'Calculation time (seconds) : '
    print(msg,est_pi,tmsg,tval)
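
map_sync blocks the controller until every engine has finished. Its asynchronous counterpart, map_async, returns immediately with a handle whose get() method waits for the results, so the controller can do other work while the engines compute. A minimal sketch, assuming estimate_pi, all_proc, nengines, and np from the cell above:


In [ ]:
# Sketch: map_async returns an AsyncMapResult right away; .get() blocks until done.
nx = 10**7
async_result = all_proc.map_async(estimate_pi, [nx//nengines]*nengines)

# The controller is free to do other work here while the engines compute
print('Tasks submitted; finished yet? ', async_result.ready())

estimates = async_result.get()   # wait for all engines and collect their estimates
print('Parallel estimate of pi: ', np.mean(estimates))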

In [ ]:
##########################################################
#       Exercise:   Parallel Function Evaluation
#
#       Modify this code as follows:
#
#        i) Have squared return the tuple ( x^2,  process ID).  
#       ii) Revise the formatting of the loop output to read as follows:                                                
#                  47^2 is 2209 according to process PID: 192159


import ipyparallel
import os

def squared(x):
    import os
    return (x**2, os.getpid())

#Initialize Communication
rc  = ipyparallel.Client(profile='default')
nengines = len(rc)

#create views into each engine
all_proc  = rc[:]
all_proc.block=True

#Only the hub prints this
print('\n ',nengines," Python engines are active.\n")

n=nengines*4

#Engine 0 gets arguments [0,1,2,3]
#Engine 1 gets arguments [4,5,6,7] etc.
results = all_proc.map_sync(squared,range(n))
print(results)

# We can use the process IDs to create a dictionary mapping
# process ID to engine number
pids = all_proc.apply_sync(os.getpid)
pdict = {}
for i,p in enumerate(pids):
    pdict[p] = i

for i in range(n):
    istr = '{:4d}'.format(i) 
    vstr = '{:4d}'.format(results[i][0])
    pstr = '{:6d}'.format(results[i][1])
    # Leave this line alone; results[i][1] will hold a process ID, which we then use to
    # access the engine ID in the pdict dictionary
    estr = '{:2d}'.format(pdict[results[i][1]])  
    msg = istr+'^2 is '+vstr+' according to process PID: '+pstr + ', Engine: '+estr
    print(msg)
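
The DirectView map used above splits the argument list into equal contiguous chunks, one per engine, no matter how long each task takes. When the per-task cost is uneven, a load-balanced view can instead hand each task to whichever engine is idle. A brief sketch, assuming rc and the squared function from the cell above:


In [ ]:
# Sketch: a LoadBalancedView dispatches tasks to idle engines rather than
# pre-splitting the argument list. Assumes rc and squared() are already defined.
lview = rc.load_balanced_view()
results = lview.map_sync(squared, range(16))
for val, pid in results:
    print(val, 'was computed by process', pid)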

In [ ]:
################################################################
#
#   Exercise:  Modify this code so that the calculation of 
#              Collatz-sequence lengths is distributed across
#              multiple processors. 
#              Perform the calculation in serial and in parallel and 
#              measure the time required in each instance
#
import time
import ipyparallel
import numpy as np

def collatz_length(n):
    val = n
    length = 1
    while (val != 1):
        length += 1
        if ((val % 2) == 0):
            val = val//2
        else:
            val = 3*val + 1
        #print(val)
    return (n, length)


n = 100000

t0 = time.time()
max_len = 1
max_i = 1
for i in range(1,n):
    val, length = collatz_length(i)
    if (length > max_len):
        max_i = i
        max_len = length
t1 = time.time()
serial_t = t1 - t0

print("The serial answer is:")
msg = 'The integer with the longest collatz sequence in the interval [1,'+str(n)+'] is '
msg2 = str(max_i)+', whose sequence has length '+str(max_len)+'.'
print(msg+msg2)

print("The parallel answer is:")

t0 = time.time()

#Initialize Communication
rc  = ipyparallel.Client(profile='default')
nengines = len(rc)

#create views into each engine
all_proc  = rc[:]
all_proc.block=True

#Only the hub prints this
print('\n ',nengines," Python engines are active.\n")


#map_sync splits range(1,n) into equal contiguous chunks:
#Engine 0 gets roughly the first n/nengines integers, Engine 1 the next chunk, etc.
results = all_proc.map_sync(collatz_length,range(1,n))
arr_results = np.array(results)
ans_tuple = np.squeeze(arr_results[np.where(arr_results[:,1] == np.max(arr_results[:,1]))])
print(ans_tuple)
print("The longest length is: %d for number= %d" % (ans_tuple[1], ans_tuple[0]))

t1 = time.time()
parallel_t = t1 - t0

print("Serial   time = %f" % serial_t)
print("Parallel time = %f" % parallel_t)

Nick's answer is:


In [ ]:
import ipyparallel
import numpy as np
import time
def collatz_length(n):
    val = n
    length = 1
    while (val != 1):
        length += 1
        if ((val % 2) == 0):
            val = val//2
        else:
            val = 3*val + 1
    return length


n = 100000
max_len = 1
max_i = 1

print('\n')
print('...........................')
print('Serial Calculation')
print('')


t0 = time.time()
for i in range(1,n):
    length = collatz_length(i)
    if (length > max_len):
        max_i = i
        max_len = length
t1 = time.time()
dt = t1-t0
msg = 'The integer with the longest collatz sequence in the interval [1,'+str(n)+'] is '
msg2 = str(max_i)+', whose sequence has length '+str(max_len)+'.'
print(msg+msg2)
print('Calculation time (serial; seconds): ', dt)

rc=ipyparallel.Client(profile='default')
nengines = len(rc)
all_proc = rc[:]
all_proc.block=True

print('\n')
print('...........................')
print('Parallel Calculation')
print('')

#Generate a list of numbers whose collatz-sequence lengths we want to know
numbers=[]
for i in range(1,n):
    numbers.append(i)

# Map the list of numbers to all processes, evaluate collatz_length
# using numbers as arguments.

t0 = time.time()
lengths = all_proc.map_sync(collatz_length, numbers)
max_len=np.max(lengths)
# search the list for the index corresponding to max_len; the number corresponding to that length is index+1
max_i = lengths.index(max_len)+1  
t1 = time.time()
dt=t1-t0


msg = 'The integer with the longest collatz sequence in the interval [1,'+str(n)+'] is '
msg2 = str(max_i)+', whose sequence has length '+str(max_len)+'.'
print(msg+msg2)
print('Calculation time (parallel; seconds): ', dt)
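
The index arithmetic above works because numbers[i] == i + 1. An equivalent lookup with numpy's argmax, assuming lengths and numbers from the cell above:


In [ ]:
# Sketch: the same maximum-length lookup using numpy's argmax.
idx = int(np.argmax(lengths))
print('Longest sequence: ', lengths[idx], ' for n = ', numbers[idx])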
