In [14]:
import numpy
from IPython import parallel
import KMeansBase
import KMeansPP

In [2]:
rc = parallel.Client()
# a direct view on all the engines
d_view = rc[:]
# block on calls so results are returned synchronously
d_view.block = True

# import numpy on all the engines
with d_view.sync_imports():
    import numpy


importing numpy on engine(s)

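Before distributing any real work, it helps to confirm that the engines are up and that numpy was actually imported on each of them. The cell below is a minimal sanity check, assuming the client and direct view created above:

In [ ]:
def numpy_version():
    # executed on each engine; numpy was imported there by sync_imports above
    return numpy.__version__

print('engine ids:', rc.ids)
# with d_view.block = True, apply returns the list of per-engine results directly
print('numpy on engines:', d_view.apply(numpy_version))
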
In [3]:
def initial_centroids(data, k):
    '''Select k distinct random data points from the data as the initial centroids.'''
    return data[numpy.random.choice(data.shape[0], k, replace=False)]

def compare_centroids(old_centroids, new_centroids, precision=-1):
    if precision == -1:
        return numpy.array_equal(old_centroids, new_centroids)
    diff = numpy.sum((new_centroids - old_centroids)**2, axis=1)
    return numpy.max(diff) <= precision

def lloyds_iteration(data, centroids):
    # find the squared Euclidean distance between each centroid and each data point
    # (the square root is unnecessary for finding the nearest centroid)
    # centroids array shape = k x m
    # data array shape = n x m
    # In order to broadcast it, we have to introduce a third dimension into data
    # data array becomes n x 1 x m
    # now as a result of broadcasting, both array sizes will be n x k x m
    # (a quick shape check of this follows the function)
    data_ex = data[:, numpy.newaxis, :]
    euclidean_dist = (data_ex - centroids) ** 2
    # sum the squared differences along the third axis (length m) to get the
    # squared Euclidean distance from each data point to each centroid.
    # The resulting array is of size n x k
    distance_arr = numpy.sum(euclidean_dist, axis=2)

    # now we need to find out to which cluster each data point belongs.
    # Use a matrix of n x k where [i,j] = 1 if the ith data point belongs
    # to cluster j.
    min_location = numpy.zeros(distance_arr.shape)
    min_location[range(distance_arr.shape[0]), numpy.argmin(distance_arr, axis=1)] = 1

    # calculate the objective J: the sum of squared distances from
    # each data point to its assigned centroid
    j_val = numpy.sum(distance_arr[min_location == True])

    # calculate the new centroid of each cluster as the mean of its members,
    # and record how many data points belong to each cluster
    new_centroids = numpy.empty(centroids.shape)
    membership_count = numpy.empty(centroids.shape[0])
    for col in range(0, centroids.shape[0]):
        new_centroids[col] = numpy.mean(data[min_location[:, col] == True, :], axis=0)
        membership_count[col] = numpy.count_nonzero(min_location[:, col])
        
    return {'j-value':j_val, 'centroids':new_centroids, 'element-count':membership_count}
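
# A quick shape check of the broadcasting trick used in lloyds_iteration
# (illustrative only, with made-up sizes n=4, k=3, m=2):
# (4 x 1 x 2) - (3 x 2) broadcasts to (4 x 3 x 2), i.e. n x k x m
_demo_diff = numpy.random.randn(4, 2)[:, numpy.newaxis, :] - numpy.random.randn(3, 2)
assert _demo_diff.shape == (4, 3, 2)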

def ScalableKMeansPP(data, l, centroids):
    data_ex = data[:, numpy.newaxis, :]
    euclidean_dist = (data_ex - centroids) ** 2
    distance_arr = numpy.sum(euclidean_dist, axis=2)
    # the minimum squared distance to the current centroids is the sampling weight
    min_dist = numpy.min(distance_arr, axis=1).reshape(-1, 1)
    # use the weighted reservoir sampling trick to select l candidate centroids:
    # draw u ~ Uniform(0, 1) for each data point and use u**(1/weight) as its key
    random_numbers = numpy.random.rand(min_dist.shape[0], min_dist.shape[1])
    # replace any zero distances with the smallest positive float to avoid division by zero
    min_dist[numpy.where(min_dist == 0)] = numpy.nextafter(0, 1)
    # take the w-th root of the random numbers, where w is the weight
    with numpy.errstate(all='ignore'):
        random_numbers = random_numbers ** (1.0 / min_dist)
    # pick the data points with the l largest keys as the new candidate centroids
    cent = data[numpy.argsort(random_numbers, axis=0)[:, 0]][::-1][:l, :]
    # combine the new set of centroids with the previous set
    centroids = numpy.vstack((centroids, cent))
    # the candidate set now contains more than k centroids. Count how many data
    # points are closest to each candidate; these counts become the weights used
    # later, on the client, to reduce the candidates down to k with K-Means++
    euclidean_dist = (data_ex - centroids) ** 2
    distance_arr = numpy.sum(euclidean_dist, axis=2)
    min_location = numpy.zeros(distance_arr.shape)
    min_location[range(distance_arr.shape[0]), numpy.argmin(distance_arr, axis=1)] = 1
    weights = numpy.array([numpy.count_nonzero(min_location[:, col]) for col in range(centroids.shape[0])]).reshape(-1,1)
    return {'centroids': centroids, 'weights': weights}
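
The selection of the l candidates above relies on a weighted-sampling trick: raise a uniform random number to the power 1/w, and items with larger weights w tend to get larger keys, so they are picked more often. Here is a small, self-contained sketch of the idea (the weights below are made up purely for illustration):

In [ ]:
import numpy

# hypothetical weights: the last item should be picked far more often than the first
demo_weights = numpy.array([1.0, 1.0, 10.0, 100.0]).reshape(-1, 1)
picks = numpy.zeros(len(demo_weights), dtype=int)
for _ in range(10000):
    keys = numpy.random.rand(*demo_weights.shape) ** (1.0 / demo_weights)
    picks[numpy.argmax(keys)] += 1
print('selection counts:', picks)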

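Before launching the parallel run, here is a quick single-process sketch of how the helpers above fit together on a small made-up 2-D dataset (names like test_data are just for illustration):

In [ ]:
# two well-separated blobs, clustered locally with the functions defined above
test_data = numpy.vstack((numpy.random.randn(100, 2) + 5, numpy.random.randn(100, 2) - 5))
test_centroids = initial_centroids(test_data, 2)
for _ in range(20):
    result = lloyds_iteration(test_data, test_centroids)
    if compare_centroids(test_centroids, result['centroids']):
        break
    test_centroids = result['centroids']
print('J =', result['j-value'], 'cluster sizes =', result['element-count'])
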
In [ ]:
data = numpy.random.randn(1000000,2)
# distribute the data among the engines
d_view.scatter('data', data)

# ask one engine to pick the first centroid at random from its local partition of the data
centroids = rc[0].apply(initial_centroids, parallel.Reference('data'), 1).get()

r = 3
l = 2
k = 4
passes = 0
while passes < r:
    result = d_view.apply(ScalableKMeansPP, parallel.Reference('data'), l, centroids)
    print('centroids from one engine: ', result[0]['centroids'].shape)
    # combine the centroids for the next iteration
    centroids = numpy.vstack([r['centroids'] for r in result])
    passes += 1
# next step is to calculate k centroids out of the centroids returned by each engine
# for this we use KMeans++
weights = numpy.vstack([r['weights'] for r in result])
kmeans_pp = KMeansPP.KMeansPP(weights, k)
_, _, _, min_locations = kmeans_pp.cluster()
# reduce the candidates to k centroids by averaging the candidates assigned to each cluster
new_centroids = numpy.empty((k, data.shape[1]))
for col in range(0, k):
    new_centroids[col] = numpy.mean(centroids[min_locations[:, col] == True, :], axis=0)

centroids = new_centroids
# now do the lloyd's iterations
stabilized = False
iterations = 0
while not stabilized:
    iterations += 1
    ret_vals = d_view.apply(lloyds_iteration, parallel.Reference('data'), centroids)
    # combine the per-engine results: the total membership of each cluster, and the
    # membership-weighted sum of the local means divided by it to get the global centroids
    member_count = numpy.sum(numpy.array([r['element-count'] for r in ret_vals]).reshape(len(ret_vals), -1), axis=0)
    local_sum = numpy.array([r['centroids'] * r['element-count'].reshape(-1, 1) for r in ret_vals])
    new_centroids = numpy.sum(local_sum, axis=0) / member_count.reshape(-1, 1)
    if compare_centroids(centroids, new_centroids):
        stabilized = True
    else:
        centroids = new_centroids

print('Iterations:', iterations)


centroids from one engine:  (3, 2)
centroids from one engine:  (14, 2)
centroids from one engine: 

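With the final centroids in hand, the cluster label of any point is just the index of its nearest centroid. The sketch below computes the final cluster sizes for the whole dataset locally on the client, assuming data and centroids from the run above are still in scope:

In [ ]:
# assign every data point to its nearest final centroid and count the cluster sizes
dist = numpy.sum((data[:, numpy.newaxis, :] - centroids) ** 2, axis=2)
labels = numpy.argmin(dist, axis=1)
print('cluster sizes:', numpy.bincount(labels, minlength=centroids.shape[0]))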