In [28]:
import numpy as np
from IPython import parallel

In [43]:
rc = parallel.Client()
# direct view over all engines
d_view = rc[:]
# execute calls synchronously: block until results are ready
d_view.block = True

# import numpy on the engines
with d_view.sync_imports():
    import numpy


importing numpy on engine(s)
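
A quick sanity check that the engines are up (this assumes an IPython cluster was started beforehand, e.g. with `ipcluster start -n 2`):

In [ ]:
# ids of the engines registered with the controller;
# the direct view rc[:] targets all of them
print(rc.ids)
print('engines:', len(rc.ids))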

In [126]:
def initial_centroids(data, k):
    '''Select k distinct random data points from the data as seed centroids.'''
    return data[numpy.random.choice(data.shape[0], k, replace=False)]

def compare_centroids(old_centroids, new_centroids, precision=-1):
    '''Return True when the centroids have stabilized.

    With the default precision of -1, require exact equality; otherwise,
    require every centroid to have moved by at most `precision`
    (measured as squared Euclidean distance).
    '''
    if precision == -1:
        return np.array_equal(old_centroids, new_centroids)
    diff = np.sum((new_centroids - old_centroids) ** 2, axis=1)
    return np.max(diff) <= precision

def lloyds_iteration(data, centroids):
    # Find the squared Euclidean distance between every centroid and every
    # data point.
    # centroids array shape = k x m
    # data array shape = n x m
    # To broadcast the subtraction we introduce a third dimension into data:
    # the data array becomes n x 1 x m, and the broadcast result is n x k x m.
    data_ex = data[:, numpy.newaxis, :]
    squared_diff = (data_ex - centroids) ** 2
    # Sum the squared differences along the third axis (of length m) to get
    # the squared distance from each data point to each centroid.
    # The resulting array is n x k.
    distance_arr = numpy.sum(squared_diff, axis=2)
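    # Concrete shapes for illustration: with n=5, k=3, m=2, data_ex is
    # (5, 1, 2), centroids is (3, 2), squared_diff broadcasts to (5, 3, 2),
    # and distance_arr is (5, 3).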

    # Determine the cluster each data point belongs to: a boolean n x k
    # matrix where [i, j] is True if the ith data point is closest to
    # centroid j.
    min_location = numpy.zeros(distance_arr.shape, dtype=bool)
    min_location[range(distance_arr.shape[0]), numpy.argmin(distance_arr, axis=1)] = True

    # calculate the objective J: the total squared distance from each
    # data point to its assigned centroid
    j_val = numpy.sum(distance_arr[min_location])

    # calculate the new centroid (the mean of its members) and the
    # membership count of each cluster
    # (this assumes every cluster keeps at least one member; an empty
    # cluster would produce a NaN centroid)
    new_centroids = numpy.empty(centroids.shape)
    membership_count = numpy.empty(centroids.shape[0])
    for cluster in range(centroids.shape[0]):
        new_centroids[cluster] = numpy.mean(data[min_location[:, cluster]], axis=0)
        membership_count[cluster] = numpy.count_nonzero(min_location[:, cluster])

    return {'j-value': j_val, 'centroids': new_centroids, 'element-count': membership_count}
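
Before distributing the work, we can exercise these functions serially on a toy sample to see what lloyds_iteration returns (a minimal sketch; the variable names here are illustrative):

In [ ]:
toy = np.random.randn(100, 2)      # 100 points, 2 features
seeds = initial_centroids(toy, 3)  # 3 random seed centroids
result = lloyds_iteration(toy, seeds)
print(result['j-value'])           # objective after the assignment step
print(result['centroids'].shape)   # (3, 2): one updated centroid per cluster
print(result['element-count'])     # number of points assigned to each cluster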

In [130]:
data = np.random.randn(1000000, 2)
# distribute the data evenly among the engines
d_view.scatter('data', data)
# each engine picks 2 seed points from its local shard, so the global k is
# 2 * (number of engines): 4 with the two engines used here
centroids = np.array(d_view.apply(initial_centroids, parallel.Reference('data'), 2)).reshape(-1, 2)
stabilized = False
iterations = 0
while not stabilized:
    iterations += 1
    # run one Lloyd's iteration on every engine against its local shard
    ret_vals = d_view.apply(lloyds_iteration, parallel.Reference('data'), centroids)
    # reduce the partial results: each engine reports per-cluster local means
    # and member counts, so the global centroid is the count-weighted average
    member_count = np.sum(np.array([r['element-count'] for r in ret_vals]).reshape(len(ret_vals), -1), axis=0)
    local_sum = np.array([r['centroids'] * r['element-count'].reshape(-1, 1) for r in ret_vals])
    new_centroids = np.sum(local_sum, axis=0) / member_count.reshape(-1, 1)
    if compare_centroids(centroids, new_centroids):
        stabilized = True
    else:
        centroids = new_centroids

print('Iterations:', iterations)


Iterations: 178
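
The reduction above relies on a simple identity: a global mean equals the count-weighted average of per-partition means. A minimal sketch verifying it on toy data (names are illustrative):

In [ ]:
x = np.random.randn(10, 2)
a, b = x[:4], x[4:]  # two "engine" shards
# weight each shard's mean by its size, then divide by the total count
weighted = (np.mean(a, axis=0) * len(a) + np.mean(b, axis=0) * len(b)) / len(x)
print(np.allclose(weighted, np.mean(x, axis=0)))  # True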
