In [14]:
import numpy
from IPython import parallel
import KMeansBase
import KMeansPP
In [2]:
rc = parallel.Client()
# direct view across all the engines
d_view = rc[:]
# use blocking (synchronous) calls
d_view.block = True
# import numpy on all the engines
with d_view.sync_imports():
    import numpy
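As a quick sanity check, the cell below (a minimal sketch, not part of the original notebook) confirms how many engines are connected and that numpy is importable on each of them, using only the DirectView created above.
In [ ]:
# sketch: verify the engines and the numpy import (not in the original notebook)
print(len(rc.ids), 'engines connected')
d_view.execute('numpy_version = numpy.__version__')
print(d_view['numpy_version'])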
In [3]:
def initial_centroids(data, k):
    '''Select k random data points from the data.'''
    return data[numpy.random.choice(range(data.shape[0]), k, replace=False)]

def compare_centroids(old_centroids, new_centroids, precision=-1):
    if precision == -1:
        return numpy.array_equal(old_centroids, new_centroids)
    else:
        diff = numpy.sum((new_centroids - old_centroids) ** 2, axis=1)
        if numpy.max(diff) <= precision:
            return True
        else:
            return False

def lloyds_iteration(data, centroids):
    # find the squared Euclidean distance between each centroid and each data point
    # centroids array shape = k x m
    # data array shape = n x m
    # in order to broadcast, we introduce a third dimension into data:
    # data becomes n x 1 x m, and broadcasting against the k x m centroids
    # yields an n x k x m array of squared coordinate-wise differences
    data_ex = data[:, numpy.newaxis, :]
    euclidean_dist = (data_ex - centroids) ** 2
    # sum the squared differences along the last axis (length m).
    # This gives the squared distance from each data point to each centroid.
    # The resulting array is of size n x k.
    distance_arr = numpy.sum(euclidean_dist, axis=2)
    # now find out which cluster each data point belongs to.
    # Use an n x k matrix where [i, j] = 1 if the ith data point belongs
    # to cluster j.
    min_location = numpy.zeros(distance_arr.shape)
    min_location[range(distance_arr.shape[0]), numpy.argmin(distance_arr, axis=1)] = 1
    # calculate J, the sum of squared distances to the nearest centroid
    j_val = numpy.sum(distance_arr[min_location == True])
    # calculate the new centroids and the number of members in each cluster
    new_centroids = numpy.empty(centroids.shape)
    membership_count = numpy.empty(centroids.shape[0])
    for col in range(0, centroids.shape[0]):
        new_centroids[col] = numpy.mean(data[min_location[:, col] == True, :], axis=0)
        membership_count[col] = numpy.count_nonzero(min_location[:, col])
    return {'j-value': j_val, 'centroids': new_centroids, 'element-count': membership_count}

def ScalableKMeansPP(data, l, centroids):
    data_ex = data[:, numpy.newaxis, :]
    euclidean_dist = (data_ex - centroids) ** 2
    distance_arr = numpy.sum(euclidean_dist, axis=2)
    # the minimum distance to the current centroids is the sampling weight
    min_dist = numpy.min(distance_arr, axis=1).reshape(-1, 1)
    # use weighted reservoir sampling to select l new centroids: draw a uniform
    # random number per point and raise it to the power 1/weight; the points
    # with the l largest keys form the weighted sample
    random_numbers = numpy.random.rand(min_dist.shape[0], min_dist.shape[1])
    # replace any zeros in min_dist with the lowest positive float so that the
    # exponent 1/weight stays finite
    min_dist[numpy.where(min_dist == 0)] = numpy.nextafter(0, 1)
    with numpy.errstate(all='ignore'):
        random_numbers = random_numbers ** (1.0 / min_dist)
    # pick the points with the highest l keys
    cent = data[numpy.argsort(random_numbers, axis=0)[:, 0]][::-1][:l, :]
    # combine the new set of centroids with the previous set
    centroids = numpy.vstack((centroids, cent))
    # now we have an initial set of centroids that is larger than k;
    # it will later be reduced to k using K-Means++. As the weight of each
    # candidate centroid, count how many points are closest to it.
    euclidean_dist = (data_ex - centroids) ** 2
    distance_arr = numpy.sum(euclidean_dist, axis=2)
    min_location = numpy.zeros(distance_arr.shape)
    min_location[range(distance_arr.shape[0]), numpy.argmin(distance_arr, axis=1)] = 1
    weights = numpy.array([numpy.count_nonzero(min_location[:, col]) for col in range(centroids.shape[0])]).reshape(-1, 1)
    return {'centroids': centroids, 'weights': weights}
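Because these are plain functions of numpy arrays, they can be exercised locally before being shipped to the engines. The next cell is a small sketch (not part of the original notebook) that runs a single Lloyd iteration on a tiny dataset and inspects the returned dictionary.
In [ ]:
# sketch: run one Lloyd iteration locally on a tiny dataset (not in the original notebook)
small_data = numpy.random.randn(10, 2)
start = initial_centroids(small_data, 2)
out = lloyds_iteration(small_data, start)
print(out['j-value'])        # objective value for this assignment
print(out['centroids'])      # updated centroids, shape (2, 2)
print(out['element-count'])  # number of points assigned to each centroid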
In [ ]:
data = numpy.random.randn(1000000, 2)
# distribute the data among the engines
d_view.scatter('data', data)
# pick the first centroid at random; ask one engine to pick it from its local chunk
centroids = rc[0].apply(initial_centroids, parallel.Reference('data'), 1).get()
r = 3
l = 2
k = 4
passes = 0
while passes < r:
    result = d_view.apply(ScalableKMeansPP, parallel.Reference('data'), l, centroids)
    print('centroids from one engine: ', result[0]['centroids'].shape)
    # combine the centroids returned by the engines for the next pass
    centroids = numpy.vstack([res['centroids'] for res in result])
    passes += 1
# the next step is to reduce the candidate centroids returned by the engines to k centroids.
# For this we use K-Means++, weighted by the number of points assigned to each candidate.
weights = numpy.vstack([res['weights'] for res in result])
kmeans_pp = KMeansPP.KMeansPP(weights, k)
_, _, _, min_locations = kmeans_pp.cluster()
# calculate the k new centroids
new_centroids = numpy.empty((k, data.shape[1]))
for col in range(0, k):
    new_centroids[col] = numpy.mean(centroids[min_locations[:, col] == True, :], axis=0)
centroids = new_centroids
# now run Lloyd's iterations until the centroids stabilize
stabilized = False
iterations = 0
while not stabilized:
    iterations += 1
    ret_vals = d_view.apply(lloyds_iteration, parallel.Reference('data'), centroids)
    # aggregate the per-engine results: the global centroid is the weighted
    # average of the local centroids, weighted by the local membership counts
    member_count = numpy.sum(numpy.array([r['element-count'] for r in ret_vals]).reshape(len(ret_vals), -1), axis=0)
    local_sum = numpy.array([r['centroids'] * r['element-count'].reshape(-1, 1) for r in ret_vals])
    new_centroids = numpy.sum(local_sum, axis=0) / member_count.reshape(-1, 1)
    if compare_centroids(centroids, new_centroids):
        stabilized = True
    else:
        centroids = new_centroids
print('Iterations:', iterations)
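Once the centroids have stabilized, the full dataset is still held in the driver process, so the final assignment can be inspected directly. The cell below is a sketch (not part of the original notebook) that reuses the same broadcasting trick as lloyds_iteration to label every point and count the cluster sizes.
In [ ]:
# sketch: label each point with its nearest final centroid and report cluster sizes
# (not in the original notebook)
dist = numpy.sum((data[:, numpy.newaxis, :] - centroids) ** 2, axis=2)
labels = numpy.argmin(dist, axis=1)
print(numpy.bincount(labels, minlength=k))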