In [3]:
import numpy as np
import scipy.interpolate as ip
from numpy import array
from scipy.linalg import eig
from collections import OrderedDict
from functools import partial
from sklearn.metrics import pairwise
from pyspark.sql import Row
from pyspark.sql.types import *
from pyspark.mllib.feature import PCA as PCAmllib
from pyspark.mllib.linalg import Vectors, DenseVector
from pyspark.mllib.linalg.distributed import (IndexedRow, RowMatrix,
                                              IndexedRowMatrix,
                                              CoordinateMatrix, MatrixEntry)
from pyspark.mllib.clustering import KMeans, KMeansModel, GaussianMixture
class LabelPropagationDistributed():
    '''
    The Fergus et al. (2009) eigenfunction label-propagation classifier.

    Parameters
    ----------
    k : integer > 0
        Number of eigenvectors to use in the eigendecomposition
        (default: the number of distinct labels, not counting unlabeled
        points).
    lagrangian : float > 0
        Lagrange multiplier constraining the regularization penalty on the
        labeled points (default: 10).
    numBins : integer > 0
        Number of bins for the 1-D histograms of the data (default: k).
    Examples
    --------
    A minimal sketch (assumes a live SparkContext ``sc`` and SQLContext
    ``sqlContext``; -1 marks unlabeled points):
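    >>> X = sc.parallelize([[1.0, 2.0], [1.1, 2.1], [9.0, 9.2], [9.1, 9.0]])
    >>> y = sc.parallelize([0, -1, 1, -1])
    >>> model = LabelPropagationDistributed().fit(X, y)
    >>> labels = model.labels_.collect()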
References
----------
Rob Fergus, Yair Weiss, Antonio Torralba. Semi-Supervised Learning in
Gigantic Image Collections (2009).
http://eprints.pascal-network.org/archive/00005636/01/ssl-1.pdf
'''
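    # These declarations make the helper functions (and broadcast-variable
    # names) module-level globals, so the lambdas shipped to Spark workers
    # can pickle them by reference instead of pickling `self`.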
global transformer
global selectValues
global bc_EdgeMeans
global bc_newg
global kb
    def __init__(self, k=-1, numBins=-1, lagrangian=10):
        self.k = k
        self.numBins = numBins
        self.lagrangian = lagrangian
    def makeDF(self, rotatedData, dimensions):
        # Convert an RDD of vectors into a DataFrame whose columns are
        # named "1" .. "<dimensions>".
        dataAsTuples = rotatedData.map(lambda vec: tuple(float(f) for f in vec))
        fields = [StructField(str(i + 1), FloatType(), True)
                  for i in range(dimensions)]
        schema = StructType(fields)
        return sqlContext.createDataFrame(dataAsTuples, schema)
    def getdataboundaries(self, dictData, k):
        # Per-column (min, max) bounds of the first k columns; used later to
        # clamp values into the range covered by the interpolators.
        dataBounds = OrderedDict()
        for i in range(k):
            s = str(i + 1)
            tmprdd = dictData.select(s).rdd.map(lambda row: row[0])
            dataBounds[i] = (tmprdd.min(), tmprdd.max())
        return dataBounds
    def transformer(vec, bounds, bc_EdgeMeans, bc_newg):
        # Approximate each eigenfunction at a data point by 1-D linear
        # interpolation of the discrete eigenvector over the histogram
        # bin centers.
        vec1 = vec.toArray()
        tmpArr = np.zeros(vec1.shape)
        edge_means = bc_EdgeMeans.value
        for i in range(len(vec1)):
            inter2 = ip.interp1d(edge_means[:, i], bc_newg.value[:, i])
            (minVal, maxVal) = bounds.get(i)
            if (minVal < edge_means[:, i].min()) or (maxVal > edge_means[:, i].max()):
                # The value can fall outside the interpolator's support:
                # rescale it linearly into the bin-center range, nudging the
                # endpoints inward to stay within bounds.
                val = (((edge_means[:, i].max() - edge_means[:, i].min()) *
                        (vec1[i] - minVal)) / (maxVal - minVal)) + edge_means[:, i].min()
                if vec1[i] == minVal:
                    val = val + 0.001
                if vec1[i] == maxVal:
                    val = val - 0.001
            else:
                val = vec1[i]
            tmpArr[i] = inter2(val)
        return DenseVector(tmpArr)
    @staticmethod
    def indRowMatMaker(irm):
        # Attach row indices to an RDD of vectors, yielding an IndexedRowMatrix.
        return IndexedRowMatrix(irm.zipWithIndex().map(lambda x: IndexedRow(x[1], x[0])))
    def solver(self, vectors):
        # Evaluate the learned eigenfunctions at each point: f = U * alpha.
        U = vectors
        alpha_bc = sc.broadcast(self.alpha.reshape(self.k, 1))
        f = U.map(lambda vec: np.dot(vec.toArray(), alpha_bc.value))
        return f
    def selectValues(ddict, kb):
        # Keep the first k columns in ascending order; OrderedDict makes the
        # later .values() call deterministic.
        desired_keys = sorted([int(k) for k in ddict.keys()])[0:kb.value]
        newddict = OrderedDict((i, ddict[str(i)]) for i in desired_keys)
        return newddict
    def relabel(self, labels):
        # GMM component indices are arbitrary, so order the components by
        # their means to line predictions up with the sorted class labels.
        gaussians = np.zeros((np.size(self.classes), 1))
        i = 0
        for ls in self.gmm.gaussians:
            gaussians[i] = ls.mu
            i = i + 1
        distmeans = sc.broadcast(np.argsort(gaussians.flatten()))
        return labels.map(lambda x: np.where(distmeans.value == x)[0][0])
    def fit(self, X, y=None):
        '''
        Fit the model. X is an RDD of feature vectors; y is an RDD of labels,
        with -1 marking unlabeled points. Solves for the coefficient
        vector alpha.
        '''
        def _get_kernel(X, y=None, ker=None):
            if ker == "rbf":
                if y is None:
                    return pairwise.rbf_kernel(X, X, gamma=625)
                else:
                    return pairwise.rbf_kernel(X, y, gamma=625)
            elif ker == "linear":
                # Note: despite the name, this branch uses pairwise
                # Euclidean distances.
                if y is None:
                    return pairwise.euclidean_distances(X, X)
                else:
                    return pairwise.euclidean_distances(X, y)
            else:
                raise ValueError("%s is not a valid kernel. Only 'rbf' and"
                                 " 'linear' are supported at this time" % ker)
        self.newX = X.zipWithIndex().map(lambda (x, y): (y, x))
        self.n = X.cache().count()
        self.dimensions = len(X.first())
        self.classes = sorted(y.distinct().collect())
        if self.classes[0] == -1:
            self.classes = np.delete(self.classes, 0)  # drop the unlabeled marker
        if self.k == -1:
            self.k = np.size(self.classes)
        if self.numBins == -1:
            self.numBins = self.k
        sig = np.zeros(self.dimensions)
        gee = np.zeros((self.numBins, self.dimensions))
        b_edgeMeans = np.zeros((self.numBins, self.dimensions))
        # ----------- End of declarations -----------------
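        # Rotate the data into its PCA basis; the eigenfunctions are then
        # approximated independently along each rotated dimension.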
XforPCA = X.map(lambda rw: Vectors.dense(rw))
self.PCA = PCAmllib(self.dimensions).fit(XforPCA)
rotatedData = self.PCA.transform(XforPCA)
dictData = self.makeDF(rotatedData, self.dimensions)
        for i in range(self.dimensions):
            s = str(i + 1)
            dimRdd = dictData.select(s)
            # Histogram this dimension, normalize it to a density, and add a
            # small uniform term so that no bin is empty.
            binEdges, histograms = dimRdd.rdd.map(lambda x: x[0]).histogram(self.numBins)
            histograms = array(histograms)
            binEdges = np.array(binEdges)
            db = array(np.diff(binEdges), float)
            histograms = histograms / db / histograms.sum()
            histograms = histograms + 0.01
            histograms /= histograms.sum()
            # Bin centers serve as the x-axis for the interpolators.
            b_edgeMeans[:, i] = np.array([binEdges[j:j + 2].mean()
                                          for j in range(binEdges.shape[0] - 1)])
            # Build W~, P, D~ from this dimension's histogram:
            '''
            Wdis = affinity between the discrete points. The affinity matrix
                   is built from one histogram at a time, using pairwise
                   Euclidean distances.
            P    = diagonal matrix of the histogram densities.
            Ddis = diagonal matrix whose diagonal entries are the column sums
                   of P * Wdis * P.
            Dhat = diagonal matrix whose diagonal entries are the column sums
                   of P * Wdis.
            '''
Wdis = _get_kernel(histograms.reshape(histograms.shape[0],1),y=None,ker="linear")
P = np.diag(histograms)
Ddis = np.diag(np.sum((P.dot(Wdis.dot(P))),axis=0))
Dhat = np.diag(np.sum(P.dot(Wdis),axis=0))
#Creating generalized eigenfunctions and eigenvalues from histograms.
sigmaVals, functions = eig((Ddis-(P.dot(Wdis.dot(P)))),(P.dot(Dhat)))
            # Skip the trivial (constant) eigenfunction: take the
            # second-smallest generalized eigenvalue and its eigenvector.
            arg = np.argsort(np.real(sigmaVals))[1]
            sig[i] = np.real(sigmaVals)[arg]
            gee[:, i] = np.real(functions)[:, arg]
        # Across all dimensions, keep the k smallest eigenvalues and their
        # eigenvectors (NaN eigenvalues are zeroed out first).
        if np.isnan(np.min(sig)):
            nan_num = np.isnan(sig)
            sig[nan_num] = 0
        ind = np.argsort(sig)[0:self.k]
        self.newsig = sig[ind]
        self.newg = gee[:, ind]
        self.newEdgeMeans = b_edgeMeans[:, ind]
        bc_EdgeMeans = sc.broadcast(self.newEdgeMeans)
        bc_newg = sc.broadcast(self.newg)
        kb = sc.broadcast(self.k)
        dataBounds = self.getdataboundaries(dictData, self.k)
        # Interpolate the discrete eigenvectors to approximate eigenfunction
        # values at every (rotated) data point.
        makeItMatrix = RowMatrix(dictData.rdd.map(lambda row: selectValues(row.asDict(), kb).values()))
        approxValues = makeItMatrix.rows.map(lambda rw: transformer(rw, dataBounds, bc_EdgeMeans, bc_newg))
# U: k eigenvectors corresponding to smallest eigenvalues. (n_samples by k)
# S: Diagonal matrix of k smallest eigenvalues. (k by k)
# V: Diagonal matrix of LaGrange multipliers for labeled data, 0 for unlabeled. (n_samples by n_samples)
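        # Minimizing the regularized objective reduces to the k-by-k linear
        # system (S + U^T V U) alpha = U^T V y, assembled and solved below.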
U = LabelPropagationDistributed.indRowMatMaker(approxValues)
labeled_ind = np.array(y.zipWithIndex().filter(lambda (a,b): a!=-1).map(lambda (a,b): b).collect())
matent = []
for i in labeled_ind:
matent.append(MatrixEntry(i,i,self.lagrangian))
V = CoordinateMatrix(sc.parallelize(matent),numRows=self.n, numCols=self.n)
Utrans = U.toCoordinateMatrix().transpose()
Ublk = U.toBlockMatrix()
product1 = Utrans.toBlockMatrix().multiply(V.toBlockMatrix())
product2 = product1.multiply(Ublk)
S = np.diag(self.newsig)
localblk = product2.toLocalMatrix().toArray()
A = S + localblk
if np.linalg.det(A) == 0:
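            # Regularize a singular system with a tiny ridge term.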
A = A + np.eye(A.shape[1])*0.000001
yblk = CoordinateMatrix(y.zipWithIndex().map(lambda x: MatrixEntry(x[1],0,x[0]))).toBlockMatrix()
b = product1.multiply(yblk)
b = b.toLocalMatrix().toArray()
alpha = np.linalg.solve(A, b)
self.alpha = alpha
efunc = self.solver(approxValues)
# Set up a GMM to assign the hard labels from the eigenfunctions.
self.gmm = GaussianMixture.train(efunc, np.size(self.classes), convergenceTol=0.0001,
maxIterations=20000, seed=None)
labels_ = self.gmm.predict(efunc)
self.labels_ = self.relabel(labels_)
#===============================
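        # Stash intermediates for later inspection/debugging.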
self.ind = ind
self.U1 = Ublk
self.S = S
self.V = V
self.A1 = A
self.b1 = b
self.b_edgeMeans = b_edgeMeans
self.ec = efunc
self.aprval = approxValues
self.databeforeapproxval = makeItMatrix
self.pcadata = dictData
self.yblk = yblk
self.fullpca = rotatedData
self.databounds = dataBounds
self.gee = gee
self.ori_edgem = binEdges
self.justU = U
#=================================
return self
    def predict(self, X, y=None):
        '''
        Predict labels for an RDD of feature vectors X, using the PCA basis,
        eigenfunctions, and GMM learned by fit().
        '''
bc_EdgeMeans = sc.broadcast(self.newEdgeMeans)
bc_newg = sc.broadcast(self.newg)
kb = sc.broadcast(self.k)
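        # Project the test data into the training PCA basis and re-use the
        # training-time interpolators and eigenfunction coefficients.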
testXforPCA = X.map(lambda rw: Vectors.dense(rw))
newX = self.PCA.transform(testXforPCA)
testdf = self.makeDF(newX, self.dimensions)
testdatabounds = self.getdataboundaries(testdf, self.k)
        testmakeItMatrix = RowMatrix(testdf.rdd.map(lambda row: selectValues(row.asDict(), kb).values()))
        testapproxValues = testmakeItMatrix.rows.map(lambda rw: transformer(rw, testdatabounds, bc_EdgeMeans, bc_newg))
testfunctions = self.solver(testapproxValues)
predictedlabels = self.relabel(self.gmm.predict(testfunctions))
return predictedlabels
In [ ]:
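# A minimal end-to-end sketch of the classifier defined above -- illustrative
# only. Assumes this notebook's SparkContext `sc` and SQLContext `sqlContext`
# are live; the toy blobs and the single labeled point per class are made up.
np.random.seed(0)
blob0 = np.random.randn(20, 2)                            # cluster near (0, 0)
blob1 = np.random.randn(20, 2) + np.array([10.0, 10.0])   # cluster near (10, 10)
data = np.vstack([blob0, blob1])
labels = np.array([-1] * 40)   # -1 = unlabeled
labels[0] = 0                  # one labeled point from blob0
labels[20] = 1                 # one labeled point from blob1
X = sc.parallelize(data.tolist())
y = sc.parallelize(labels.tolist())
model = LabelPropagationDistributed(lagrangian=10).fit(X, y)
print(model.labels_.collect())    # propagated labels for the training data
print(model.predict(X).collect()) # same pipeline applied to "new" data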
In [ ]: