In [28]:
import numpy as np
from IPython import parallel  # in IPython >= 4.0 this lives in the separate ipyparallel package
In [43]:
rc = parallel.Client()
# direct view on all engines (rc[:] is a DirectView, not a load-balanced view)
d_view = rc[:]
# block until results are ready (synchronous execution)
d_view.block = True
# import numpy on all of the engines
with d_view.sync_imports():
    import numpy
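Before scattering any data, it is worth confirming that the engines are actually up. A minimal sanity check (a sketch, assuming a cluster started with e.g. ipcluster start -n 2; the lambda is hypothetical and just echoes each engine's NumPy version):

In [ ]:
# list the connected engine IDs and confirm numpy is importable on each engine
print(rc.ids)                                   # e.g. [0, 1]
print(d_view.apply(lambda: numpy.__version__))  # one version string per engine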
In [126]:
def initial_centroids(data, k):
    '''Select k random data points from the data.'''
    return data[numpy.random.choice(range(data.shape[0]), k, replace=False)]

def compare_centroids(old_centroids, new_centroids, precision=-1):
    '''Return True when the centroids have stopped moving (within precision).'''
    if precision == -1:
        return np.array_equal(old_centroids, new_centroids)
    else:
        diff = np.sum((new_centroids - old_centroids) ** 2, axis=1)
        return np.max(diff) <= precision
def lloyds_iteration(data, centroids):
    # Find the squared Euclidean distance between each centroid and each data point.
    # centroids array shape = k x m
    # data array shape = n x m
    # To broadcast the subtraction, introduce a third dimension into data:
    # data becomes n x 1 x m, and the broadcast result is n x k x m.
    data_ex = data[:, numpy.newaxis, :]
    euclidean_dist = (data_ex - centroids) ** 2
    # Sum the squared differences along the third axis (length m).
    # This gives the squared distance from each centroid for each data point;
    # the resulting array is n x k.
    distance_arr = numpy.sum(euclidean_dist, axis=2)
    # Determine the cluster each data point belongs to:
    # an n x k matrix where [i, j] = 1 if the ith data point belongs to cluster j.
    min_location = numpy.zeros(distance_arr.shape)
    min_location[range(distance_arr.shape[0]), numpy.argmin(distance_arr, axis=1)] = 1
    # Calculate J, the sum of squared distances to the assigned centroids.
    j_val = numpy.sum(distance_arr[min_location == 1])
    # Calculate the new centroids and the number of members in each cluster.
    new_centroids = numpy.empty(centroids.shape)
    membership_count = numpy.empty(centroids.shape[0])
    for col in range(centroids.shape[0]):
        new_centroids[col] = numpy.mean(data[min_location[:, col] == 1, :], axis=0)
        membership_count[col] = numpy.count_nonzero(min_location[:, col])
    return {'j-value': j_val, 'centroids': new_centroids, 'element-count': membership_count}
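To see the broadcasting trick in isolation, here is a tiny worked example (hypothetical toy data, not part of the run below): three 2-D points and two centroids yield a 3 x 2 matrix of squared distances, and argmin turns that into the one-hot membership matrix.

In [ ]:
# toy illustration of the distance broadcasting used in lloyds_iteration
pts = np.array([[0., 0.], [1., 0.], [5., 5.]])           # n=3, m=2
cts = np.array([[0., 0.], [5., 4.]])                     # k=2, m=2
d2 = np.sum((pts[:, np.newaxis, :] - cts) ** 2, axis=2)  # shape (3, 2)
members = np.zeros(d2.shape)
members[range(d2.shape[0]), np.argmin(d2, axis=1)] = 1
print(d2)       # squared distance from each point to each centroid
print(members)  # [[1,0],[1,0],[0,1]]: the first two points join cluster 0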
In [130]:
data = np.random.randn(1000000, 2)
# distribute the data evenly among the engines
d_view.scatter('data', data)
# each engine picks 2 random points from its local partition;
# with 2 engines this yields k = 4 initial centroids (the reshape assumes 2 engines)
centroids = np.array(d_view.apply(initial_centroids, parallel.Reference('data'), 2)).reshape(4, 2)
stabilized = False
iterations = 0
while not stabilized:
    iterations += 1
    # run one Lloyd's iteration on each engine against its local data partition
    ret_vals = d_view.apply(lloyds_iteration, parallel.Reference('data'), centroids)
    # total members of each cluster across all engines
    member_count = np.sum(np.array([r['element-count'] for r in ret_vals]).reshape(len(ret_vals), -1), axis=0)
    # weight each engine's local centroids by its local member counts ...
    local_sum = np.array([r['centroids'] * r['element-count'].reshape(-1, 1) for r in ret_vals])
    # ... and divide by the global counts to get the global centroids
    new_centroids = np.sum(local_sum, axis=0) / member_count.reshape(-1, 1)
    if compare_centroids(centroids, new_centroids):
        stabilized = True
    else:
        centroids = new_centroids
print('Iterations:', iterations)
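The reduction step above relies on the fact that the global mean of a cluster is the count-weighted average of the per-engine means. A quick check on made-up numbers (hypothetical, for illustration only):

In [ ]:
# global mean == count-weighted average of partition means
x = np.random.randn(10)
a, b = x[:4], x[4:]                       # two "partitions" of the data
weighted = (4 * a.mean() + 6 * b.mean()) / 10
print(np.allclose(weighted, x.mean()))    # True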