In [1]:
import numpy as np
from sklearn import datasets
import matplotlib.pyplot as plt
import timeit
import sys
import os
from collections import OrderedDict
import operator
import random
from sklearn.cluster import KMeans
import scipy.linalg
import scipy.sparse
from sklearn.base import ClassifierMixin
from sklearn.base import BaseEstimator
from sklearn.manifold import spectral_embedding
from pyspark.mllib.clustering import GaussianMixture, GaussianMixtureModel
import sklearn.metrics.pairwise as pairwise
from sklearn import decomposition as pca
from scipy import interpolate as ip
import sklearn.mixture as mixture
from sklearn.metrics.pairwise import chi2_kernel
from sklearn.neighbors import DistanceMetric
from pyspark.sql import SQLContext
from pyspark.sql.types import *
%matplotlib inline
dataX,dataY=datasets.make_blobs(n_samples=20000, n_features=10, centers=2, cluster_std=1.5, center_box=(-10.0, 10.0), shuffle=True, random_state=None)
def labelremover(X,y):
    # keep one labeled example per class and mark every other point as
    # unlabeled (-1) -- the semi-supervised setup used throughout
    newY1 = np.copy(y)
    points = np.empty(len(np.unique(y)), dtype=int)  # int dtype: used as indices
    for i in np.unique(y):
        points[i] = np.where(y == i)[0][0]  # first occurrence of class i
    newY1[:] = -1
    for k in np.unique(y):
        newY1[points[k]] = y[points[k]]
    knownX = X[points]
    knownY = y[points]
    print "These are the labels of the known points: " + str(knownY)
    return (newY1, knownX, knownY)
trainX = dataX[0:18000,:]
trainY = dataY[0:18000]
testX = dataX[18000:20000,:]
testY = dataY[18000:20000]
newtrainY, knownX, knownY = labelremover(trainX,trainY)
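In [ ]:
# Quick sanity check (a sketch, not part of the original pipeline): after
# labelremover, all but one point per class should be unlabeled (-1).
import collections
print collections.Counter(newtrainY)   # expect roughly {-1: 17998, 0: 1, 1: 1}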
In [2]:
X = trainX
y = newtrainY
classes = np.sort(np.unique(y))
PCA = pca.PCA(n_components=X.shape[1])
rotatedData = PCA.fit_transform(X)
#plt.scatter(rotatedData[:,0], rotatedData[:,1], c=trainY, cmap = (('ocean')))
X_ = rotatedData
dim = X_.shape[1]
classes = np.delete(classes, 0) # remove the -1 from this list
k = np.size(classes)
numBins =k+1
def _get_kernel(X, y = None,ker=None):
if ker == "rbf":
if y is None:
return pairwise.rbf_kernel(X, X, gamma = 625)
else:
return pairwise.rbf_kernel(X, y, gamma = 625)
elif ker == "linear":
dist = DistanceMetric.get_metric('euclidean')
if y is None:
return pairwise.euclidean_distances(X, X)
#return dist.pairwise(X)
else:
return pairwise.euclidean_distances(X, y)
#return dist.pairwise(X)
else:
raise ValueError("is not a valid kernel. Only rbf and euclidean"
" are supported at this time")
sig = np.zeros(dim)
gee = np.zeros((numBins,dim))
b_edgeMeans = np.zeros((numBins,dim))
interpolators = []
index = []
for i in range(dim):
histograms,binEdges = np.histogram(X_[:,i],bins=numBins,density=True)
#add 0.01 to histograms and normalize it
histograms = histograms+ 0.01
#print histograms
histograms /= histograms.sum()
# calculating means on the bin edges as the x-axis for the interpolators
b_edgeMeans[:,i] = np.array([binEdges[j:j + 2].mean() for j in range(binEdges.shape[0] - 1)])
#print b_edgeMeans
#get D~, P, W~
    '''
    Wdis = affinity between the discrete points (bin heights).
    Since the affinity matrix is built on one histogram at a time, pairwise
    linear-kernel affinities are used.
    P    = diagonal matrix of the histogram values
    Ddis = diagonal matrix whose diagonal entries are the column sums of P.Wdis.P
    Dhat = diagonal matrix whose diagonal entries are the column sums of P.Wdis
    '''
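    # The generalized eigenproblem solved just below,
    #     (Ddis - P.Wdis.P) g = sigma * (P.Dhat) g,
    # is the histogram (density-weighted) approximation of the graph-Laplacian
    # eigenfunction problem, computed independently for each 1-D dimension
    # (cf. the eigenfunction approach of Fergus, Weiss & Torralba).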
Wdis = _get_kernel(histograms.reshape(histograms.shape[0],1),y=None,ker="linear")
#print Wdis
P = np.diag(histograms)
#print P
Ddis = np.diag(np.sum((P.dot(Wdis.dot(P))),axis=0))
#print Ddis
Dhat = np.diag(np.sum(P.dot(Wdis),axis=0))
#print Dhat
#Creating generalized eigenfunctions and eigenvalues from histograms.
sigmaVals, functions = scipy.linalg.eig((Ddis-(P.dot(Wdis.dot(P)))),(P.dot(Dhat)))
#print functions.shape
#print("eigenValue"+repr(i)+": "+repr(np.real(sigmaVals[0:self.k]))+"Eigenfunctions"+repr(i)+": "+repr(np.real(functions[:,0:self.k])))
#print np.sort(np.real(sigmaVals))
    arg = np.argsort(np.real(sigmaVals))[1]  # skip the trivial smallest (constant) eigenfunction
index.append(arg)
sig[i] = np.real(sigmaVals)[arg]
gee[:,i] = np.real(functions)[:,arg]
#b_edge_for_test[:,i] = b_edgeMeans[:,arg]
#print "===="
'''for i in index:
plt.hist(X_[:,i], bins = numBins)
plt.axis([binEdges.min()-5, binEdges.max()+5, 0, X_.shape[0]])
plt.show()
'''
if np.isnan(np.min(sig)):
nan_num = np.isnan(sig)
sig[nan_num] = 0
newsig = np.zeros(k)
newg = np.zeros((numBins,k))
newEdgeMeans = np.zeros((numBins,k))
transformeddata = np.zeros((X_.shape[0],k))
approxValues = np.zeros((X_.shape[0],k))
def get_transformed_data(ori_data, edge_means, i):
    # Rescale column i into the support of the bin means so that the
    # interpolators never have to extrapolate; the +/-0.001 nudges keep the
    # endpoints strictly inside the interpolation range.
    lo = edge_means[:,i].min()
    hi = edge_means[:,i].max()
    if ori_data[:,i].min() < lo or ori_data[:,i].max() > hi:
        ls = []
        for num in ori_data[:,i]:
            val = (((hi - lo) * (num - ori_data[:,i].min())) / (ori_data[:,i].max() - ori_data[:,i].min())) + lo
            if num == ori_data[:,i].min():
                val = val + 0.001
            if num == ori_data[:,i].max():
                val = val - 0.001
            ls.append(val)
        return np.array(ls)
    else:
        return ori_data[:,i]
#selecting the first k eigenvalues and corresponding eigenvectors from all dimensions
ind = np.argsort(sig)[0:k]
print ind
newsig = sig[ind]
newg = gee[:,ind]
newEdgeMeans = b_edgeMeans[:,ind]
for i in range(0,k):
interpolators.append(ip.interp1d(newEdgeMeans[:,i], newg[:,i]))
transformeddata[:,i] = get_transformed_data(X_,newEdgeMeans,i)
approxValues[:,i] = interpolators[i](transformeddata[:,i])
U = approxValues
S = np.diag(newsig)
V = np.diag(np.zeros(np.size(y)))
labeled = np.where(y != -1)
V[labeled, labeled] = 10
# Solve for alpha and use it to compute the eigenfunctions, f.
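# (Sketch of the algebra: minimizing
#     J(alpha) = alpha'.S.alpha + (U.alpha - y)'.V.(U.alpha - y)
#  and setting dJ/dalpha = 0 yields the normal equations
#     (S + U'.V.U).alpha = U'.V.y,
#  i.e. A.alpha = b below.)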
A = S + np.dot(np.dot(U.T, V), U)
if np.linalg.det(A) == 0:
A = A + np.eye(A.shape[1])*0.000001
b = np.dot(np.dot(U.T, V), y)
#print "this is A: "+ str(A)
#print "this is b: "+ str(b)
alpha = np.linalg.solve(A, b)
#labels_ = solver(U, alpha)
f = np.dot(U,alpha)
f = f.reshape((f.shape[0],-1))
# Set up a GMM to assign the hard labels from the eigenfunctions.
g = mixture.GMM(n_components = np.size(classes),n_iter=5000, covariance_type='full',min_covar=0.0000001)
g.fit(f)
labels_ = g.predict(f)
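# Relabel the GMM components in ascending order of their means so the hard
# labels are stable across runs (raw component indices are arbitrary).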
means = np.argsort(g.means_.flatten())
for i in range(0, np.size(labels_)):
labels_[i] = np.where(means == labels_[i])[0][0]
#===========================================================================
tx = PCA.transform(testX)
approxVec1 = np.zeros((tx.shape[0],k))
allpoints = np.zeros((tx.shape[0],k))
for i in range(k):
allpoints[:,i] = get_transformed_data(tx[:,0:k], newEdgeMeans,i)
#print allpoints
for p in range(tx.shape[0]):
kpoints = allpoints[p]
#print kpoints
for d in range(len(kpoints)):
val = kpoints[d]
approxVec1[p,d] = interpolators[d](val)
tf = np.dot(approxVec1,alpha)
tf = tf.reshape((tf.shape[0],-1))
tlabels_ = g.predict(tf)
for i in range(0, np.size(tlabels_)):
tlabels_[i] = np.where(means == tlabels_[i])[0][0]
#===========================================================================
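In [ ]:
# A minimal evaluation sketch (an addition, assuming the mean-sorted GMM
# labels line up with the true class ids; they may be flipped for some seeds):
print "test accuracy:", np.mean(tlabels_ == testY)
print "flipped accuracy:", np.mean((1 - tlabels_) == testY)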
In [3]:
#Distributed code
X = sc.parallelize(trainX)
y = sc.parallelize(newtrainY)
n = X.count()
#make it accessible
dimensions= trainX.shape[1]
# distinct labels present in y (including the unlabeled marker -1)
classes = sorted(y.distinct().collect())
if classes[0] == -1:
    classes = np.delete(classes, 0) # remove the -1 from this list
k = np.size(classes)
numBins = k + 1
kb = sc.broadcast(k)
# re-initialize the per-dimension buffers so this cell does not silently
# depend on state left over from the standalone cell above
sig = np.zeros(dimensions)
gee = np.zeros((numBins, dimensions))
b_edgeMeans = np.zeros((numBins, dimensions))
index = []
#pca_data = pca.PCA(n_components=X.shape[1])
from pyspark.mllib.feature import PCA as PCAmllib
from pyspark.mllib.linalg import Vectors
XforPCA = X.map(lambda rw: Vectors.dense(rw))
PCA = PCAmllib(dimensions).fit(XforPCA)
rotatedData = PCA.transform(XforPCA)
def makeDF(rotatedData):
    # one FloatType column per dimension, named "1" .. str(dimensions),
    # so each 1-D histogram can be computed with a simple select()
    rows = rotatedData.map(lambda vec: tuple(float(f) for f in vec))
    fields = [StructField(str(i + 1), FloatType(), True) for i in range(dimensions)]
    schema = StructType(fields)
    return sqlContext.createDataFrame(rows, schema)
dictData = makeDF(rotatedData)
for i in range(dimensions):
s = str(i+1)
dimRdd = dictData.select(s)
binEdges,histograms = dimRdd.rdd.map(lambda x: x.asDict().values()[0]).histogram(numBins)
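    # Spark's histogram() returns (bucket boundaries, raw counts), unlike
    # np.histogram(density=True), so the counts are converted to a density below.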
    histograms = np.array(histograms, dtype=float)
    binEdges = np.array(binEdges)
    db = np.diff(binEdges)
    histograms = histograms / db / histograms.sum()
histograms = histograms+ 0.01
#print histograms
histograms /= histograms.sum()
# calculating means on the bin edges as the x-axis for the interpolators
b_edgeMeans[:,i] = np.array([binEdges[j:j + 2].mean() for j in range(binEdges.shape[0] - 1)])
#print b_edgeMeans
#get D~, P, W~
    '''
    Wdis = affinity between the discrete points (bin heights).
    Since the affinity matrix is built on one histogram at a time, pairwise
    linear-kernel affinities are used.
    P    = diagonal matrix of the histogram values
    Ddis = diagonal matrix whose diagonal entries are the column sums of P.Wdis.P
    Dhat = diagonal matrix whose diagonal entries are the column sums of P.Wdis
    '''
Wdis = _get_kernel(histograms.reshape(histograms.shape[0],1),y=None,ker="linear")
#print Wdis
P = np.diag(histograms)
#print P
Ddis = np.diag(np.sum((P.dot(Wdis.dot(P))),axis=0))
#print Ddis
Dhat = np.diag(np.sum(P.dot(Wdis),axis=0))
#print Dhat
#Creating generalized eigenfunctions and eigenvalues from histograms.
sigmaVals, functions = scipy.linalg.eig((Ddis-(P.dot(Wdis.dot(P)))),(P.dot(Dhat)))
#print functions.shape
#print("eigenValue"+repr(i)+": "+repr(np.real(sigmaVals[0:self.k]))+"Eigenfunctions"+repr(i)+": "+repr(np.real(functions[:,0:self.k])))
#print np.sort(np.real(sigmaVals))
arg = np.argsort(np.real(sigmaVals))[1]
index.append(arg)
sig[i] = np.real(sigmaVals)[arg]
gee[:,i] = np.real(functions)[:,arg]
if np.isnan(np.min(sig)):
nan_num = np.isnan(sig)
sig[nan_num] = 0
newsig = np.zeros(k)
newg = np.zeros((numBins,k))
newEdgeMeans = np.zeros((numBins,k))
approxVec=[]
ind = np.argsort(sig)[0:k]
#print ind
newsig = sig[ind]
newg = gee[:,ind]
newEdgeMeans = b_edgeMeans[:,ind]
bc_EdgeMeans = sc.broadcast(newEdgeMeans)
bc_newg = sc.broadcast(newg)
from pyspark.sql import Row
from pyspark.mllib.linalg.distributed import IndexedRow, RowMatrix, IndexedRowMatrix
from pyspark.mllib.linalg import DenseVector
from pyspark.mllib.linalg.distributed import CoordinateMatrix, MatrixEntry
def getdataboundaries(dictData):
dataBounds = OrderedDict()
for i in range(0,k):
s = str(i+1)
tmprdd = dictData.select(s).rdd.map(lambda row: row.asDict().values()[0])
dataBounds[i] = (tmprdd.min(),tmprdd.max())
return dataBounds
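# Per-column (min, max) pairs, computed once and broadcast so that
# transformer() can rescale each row without any further RDD actions.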
dataBounds = getdataboundaries(dictData)
b_bounds = sc.broadcast(dataBounds)
def selectValues(ddict):
desired_keys = sorted([int(k) for k in ddict.keys()])[0:kb.value]
newddict = { i: ddict[str(i)] for i in desired_keys }
return newddict
makeItMatrix = RowMatrix(dictData.rdd.map(lambda row: selectValues(row.asDict()).values()))
def transformer(vec, bounds):
vec1 = vec.toArray()
tmpArr = np.zeros(vec1.shape)
edge_means = bc_EdgeMeans.value
for i in range(len(vec1)):
#inter1 = ip.interp1d([minVal,maxVal], [edge_means[:,i].min(),edge_means[:,i].max()])
inter2 = ip.interp1d(edge_means[:,i], bc_newg.value[:,i])
(minVal,maxVal) = bounds.value[i]
if (minVal < edge_means[:,i].min()) or (maxVal > edge_means[:,i].max()):
val = (((edge_means[:,i].max()-edge_means[:,i].min())*(vec1[i] - minVal))/(maxVal - minVal)) + edge_means[:,i].min()
if vec1[i]==minVal:
val = val + 0.001
if vec1[i]==maxVal:
val = val - 0.001
else:
val = vec1[i]
tmpArr[i] = inter2(val)
return DenseVector(tmpArr)
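# transformer() is the row-wise analogue of get_transformed_data + interp1d
# from the standalone cell: rescale into the bin-mean support, then evaluate
# the per-dimension eigenfunction interpolators.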
approxValues = makeItMatrix.rows.map(lambda rw: transformer(rw, b_bounds))
# U: k eigenvectors corresponding to smallest eigenvalues. (n_samples by k)
# S: Diagonal matrix of k smallest eigenvalues. (k by k)
# V: Diagonal matrix of LaGrange multipliers for labeled data, 0 for unlabeled. (n_samples by n_samples)
lagrange = 10.0
def indRowMatMaker(irm):
return IndexedRowMatrix(irm.zipWithIndex().map(lambda x:IndexedRow(x[1],x[0])))
U = indRowMatMaker(approxValues)
S = np.diag(newsig)
#Sblk = indRowMatMaker(S).toBlockMatrix()
labeled_ind = np.array(y.zipWithIndex().filter(lambda (a,b): a!=-1).map(lambda (a,b): b).collect())
matent = []
for i in labeled_ind:
matent.append(MatrixEntry(i,i,lagrange))
V = CoordinateMatrix(sc.parallelize(matent),numRows=n, numCols=n)
Utrans = U.toCoordinateMatrix().transpose()
Ublk = U.toBlockMatrix()
#A = S + np.dot(np.dot(U.T, V), U)
product1 = Utrans.toBlockMatrix().multiply(V.toBlockMatrix())
product2 = product1.multiply(Ublk)
localblk = product2.toLocalMatrix().toArray()
A = S + localblk
if np.linalg.det(A) == 0:
A = A + np.eye(A.shape[1])*0.000001
ycoord = CoordinateMatrix(y.zipWithIndex().map(lambda x: MatrixEntry(x[1],0,x[0]))).toBlockMatrix()
b = product1.multiply(ycoord)
bloc = b.toLocalMatrix().toArray()
alpha = np.linalg.solve(A, bloc)
alphablk = indRowMatMaker(sc.parallelize(alpha)).toBlockMatrix()
def solver(vectors):
U = vectors
f = U.multiply(alphablk)
#f = f.reshape((f.shape[0],-1))
return f
efunctions = solver(Ublk)
ec = efunctions.toCoordinateMatrix().toRowMatrix().rows.map(lambda x: x.toArray())
# Set up a GMM to assign the hard labels from the eigenfunctions.
gmm = GaussianMixture.train(ec, np.size(classes), convergenceTol=0.0001,
                            maxIterations=5000, seed=10)
labels_ = gmm.predict(ec)
#=====================================================================
#Predict
testXrdd = sc.parallelize(testX)
testXforPCA = testXrdd.map(lambda rw: Vectors.dense(rw))
newX = PCA.transform(testXforPCA)
testdf = makeDF(newX)
testdatabounds = getdataboundaries(testdf)
test_bounds = sc.broadcast(testdatabounds)
testmakeItMatrix = RowMatrix(testdf.rdd.map(lambda row: selectValues(row.asDict()).values()))
testapproxValues = testmakeItMatrix.rows.map(lambda rw: transformer(rw, test_bounds))
testapproxValuesblk = indRowMatMaker(testapproxValues).toBlockMatrix()
testfunctions = solver(testapproxValuesblk)
testfunc = testfunctions.toCoordinateMatrix().toRowMatrix().rows.map(lambda x: x.toArray())
predictedlabels = gmm.predict(testfunc)
In [37]:
predictedlabels.collect()
Out[37]:
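In [ ]:
# Distributed evaluation sketch (an addition; as in the standalone check, the
# GMM component-to-class mapping may be flipped relative to testY):
pred = np.array(predictedlabels.collect())
print "distributed test accuracy:", np.mean(pred == testY)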
In [1]:
import numpy as np
from sklearn import datasets
dataX,dataY=datasets.make_blobs(n_samples=20000, n_features=10, centers=2, cluster_std=1.5, center_box=(-10.0, 10.0), shuffle=True, random_state=None)
def labelremover(X,y):
    # keep one labeled example per class and mark every other point as
    # unlabeled (-1) -- the semi-supervised setup used throughout
    newY1 = np.copy(y)
    points = np.empty(len(np.unique(y)), dtype=int)  # int dtype: used as indices
    for i in np.unique(y):
        points[i] = np.where(y == i)[0][0]  # first occurrence of class i
    newY1[:] = -1
    for k in np.unique(y):
        newY1[points[k]] = y[points[k]]
    knownX = X[points]
    knownY = y[points]
    print "These are the labels of the known points: " + str(knownY)
    return (newY1, knownX, knownY)
trainX = dataX[0:18000,:]
trainY = dataY[0:18000]
testX = dataX[18000:20000,:]
testY = dataY[18000:20000]
newtrainY, knownX, knownY = labelremover(trainX,trainY)
In [129]:
np.set_printoptions(threshold = 10)
In [245]:
%run LabelPropagationDistributed.ipynb
lpd = LabelPropagationDistributed(numBins=3)
X = sc.parallelize(trainX)
y = sc.parallelize(newtrainY)
lpd.fit(X,y)
plabels_ = lpd.predict(sc.parallelize(testX))
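In [ ]:
# Compare the class-based distributed run against ground truth (a sketch;
# the predicted label ids may again be flipped relative to testY):
print "LabelPropagationDistributed test accuracy:", np.mean(np.array(plabels_.collect()) == testY)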
In [250]:
trainX
Out[250]:
In [249]:
print "standalone k smallest eigenvalues: " ,newg
print "distributed k smallest eigenvectors: ", lpd.newg
print "====================================================\n"
print "binEdges:", binEdges
print "dist binedges:", lpd.ori_edgem
print "hist edgemeans:", b_edgeMeans
print "dist edgemeans:", lpd.b_edgeMeans
print "ind is:", ind
print "dist ind is:", lpd.ind
print "edgemeans are:", newEdgeMeans
print "dist edgemeans are:", lpd.newEdgeMeans
print "====================================================\n"
print "U is:", U
print "dist U is:", lpd.U1.toLocalMatrix().toArray()
print "S is:", S
print "dist S is:", lpd.S
print "V is:", V
print "dist V is:", lpd.V.entries.collect()
print "====================================================\n"
print "A is: ", A
print "dist A is: ", lpd.A1
print "b is: ",b
print "dict b is: ", lpd.b1
print "====================================================\n"
print "standalone alpha is:", alpha
print "distributed alpha is ", lpd.alphab.toLocalMatrix().toArray()
print lpd.ab1
In [86]:
plt.scatter(trainX[:, 0], trainX[:, 1], marker='o', c=trainY, cmap=('ocean'))                 # true train labels
plt.show()
plt.scatter(trainX[:, 0], trainX[:, 1], marker='o', c=lpd.labels_.collect(), cmap=('ocean'))  # distributed train labels
plt.show()
plt.scatter(trainX[:, 0], trainX[:, 1], marker='o', c=labels_, cmap=('ocean'))                # standalone train labels
plt.show()
plt.scatter(testX[:, 0], testX[:, 1], marker='o', c=testY, cmap=('ocean'))                    # true test labels
plt.show()
plt.scatter(testX[:, 0], testX[:, 1], marker='o', c=plabels_.collect(), cmap=('ocean'))       # distributed predictions
plt.show()
plt.scatter(testX[:, 0], testX[:, 1], marker='o', c=tlabels_, cmap=('ocean'))                 # standalone predictions
plt.show()
In [213]:
b_edgeMeans
Out[213]:
In [214]:
lpd.b_edgeMeans
Out[214]:
In [239]:
histograms,binEdges = np.histogram(trainX[:,3],bins=3,density=True)
In [240]:
b1,h1 = sc.parallelize(trainX[:,3]).histogram(3)
In [237]:
b1
Out[237]:
In [235]:
binEdges
Out[235]:
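In [ ]:
# Consistency check (an addition, not part of the original notebook): both
# np.histogram and Spark's histogram() use equal-width bins spanning
# [min, max] of the column, so the edges should agree up to float error.
print np.allclose(np.array(b1), binEdges)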
In [ ]: