In [3]:
from pyspark.mllib.feature import PCA as PCAmllib
from pyspark.mllib.linalg import Vectors
from numpy.core.numeric import array
from scipy.linalg import eig
from pyspark.sql import Row
from pyspark.mllib.linalg.distributed import IndexedRow, RowMatrix, IndexedRowMatrix
from pyspark.mllib.linalg import DenseVector
from pyspark.mllib.linalg.distributed import CoordinateMatrix, MatrixEntry
from pyspark.mllib.clustering import KMeans, KMeansModel
from pyspark.sql.types import *
from sklearn.metrics import pairwise
from collections import OrderedDict
from functools import partial
from pyspark.mllib.clustering import GaussianMixture
class LabelPropagationDistributed():
    '''
    The Fergus et al 2009 eigenfunction propagation classifier.

    Parameters
    ----------
    k : integer > 0
        Number of eigenvectors to use (default: # of labels, not including
        unlabeled data) in eigen-decomposition.
    lagrangian : float > 0
        Lagrangian multiplier to constrain the regularization penalty (default: 10).
    numBins : float > 0
        Number of bins for the 1-D histograms of data.


    Examples
    --------
    TODO

    References
    ----------
    Rob Fergus, Yair Weiss, Antonio Torralba. Semi-Supervised Learning in
    Gigantic Image Collections (2009).
    http://eprints.pascal-network.org/archive/00005636/01/ssl-1.pdf
    '''
    global transformer
    global selectValues
    global bc_EdgeMeans
    global bc_newg
    global kb
    def __init__(self, k = -1,numBins = -1 ,lagrangian = 10):
        
        self.k = k
        self.numBins = numBins
        self.lagrangian = lagrangian
             
        
    def makeDF(self,rotatedData, dimensions):
        X_ = rotatedData.map(lambda vec: np.array(vec))
        dataAsDict = X_.map(lambda x: tuple(float(f) for f in x))
        schemaString = ""
        for i in range(dimensions):
            i = i+1
            schemaString += str(i) + " "
        schemaString = schemaString.strip()
        fields = [StructField(field_name,FloatType(), True) for field_name in schemaString.split()]
        schema = StructType(fields)
        return sqlContext.createDataFrame(dataAsDict, schema)

    def getdataboundaries(self,dictData,k):
        dataBounds = OrderedDict()
        for i in range(0,k):
            s = str(i+1)
            tmprdd = dictData.select(s).rdd.map(lambda row: row.asDict().values()[0])
            dataBounds[i] = (tmprdd.min(),tmprdd.max())
        return dataBounds
 
    
    def transformer(vec, bounds, bc_EdgeMeans, bc_newg):
        vec1 = vec.toArray()
        tmpArr = np.zeros(vec1.shape)
        edge_means = bc_EdgeMeans.value
        for i in range(len(vec1)):
            inter2 = ip.interp1d(edge_means[:,i], bc_newg.value[:,i])
            (minVal,maxVal) = bounds.get(i)
            if (minVal < edge_means[:,i].min()) or (maxVal > edge_means[:,i].max()):
                val = (((edge_means[:,i].max()-edge_means[:,i].min())*(vec1[i] - minVal))/(maxVal - minVal)) + edge_means[:,i].min()
                if vec1[i]==minVal:
                    val = val + 0.001
                if vec1[i]==maxVal:
                    val = val - 0.001
            else:
                val = vec1[i]
            tmpArr[i] = inter2(val)
        return DenseVector(tmpArr)
        
        
    @staticmethod
    def indRowMatMaker(irm):
        return IndexedRowMatrix(irm.zipWithIndex().map(lambda x:IndexedRow(x[1],x[0])))
    
    def solver(self,vectors):
        U = vectors
        alpha_bc = sc.broadcast(self.alpha.reshape(self.k,1))
        f = U.map(lambda vec: np.dot(vec.toArray(), alpha_bc.value))
        return f
    
    def selectValues(ddict,kb):
        desired_keys = sorted([int(k) for k in ddict.keys()])[0:kb.value]
        newddict = { i: ddict[str(i)] for i in desired_keys }
        return newddict
    
    def relabel(self,labels):
        gaussians = np.zeros((self.k,1))
        i=0
        for ls in self.gmm.gaussians:
            gaussians[i] = (ls.mu)
            i = i +1
        distmeans = sc.broadcast(np.argsort(gaussians.flatten()))
        return labels.map(lambda x: np.where(distmeans.value == x)[0][0])


    def fit(self,X,y=None):
        '''
        X and y should be RDDs
        
        Solve for alpha
        
        '''
        
        def _get_kernel(X, y = None,ker=None):

            if ker == "rbf":
                if y is None:
                    return pairwise.rbf_kernel(X, X, gamma = 625)
                else:
                    return pairwise.rbf_kernel(X, y, gamma = 625)
            elif ker == "linear":

                #dist = DistanceMetric.get_metric('euclidean')
                if y is None:
                    return pairwise.euclidean_distances(X, X)
                    #return dist.pairwise(X)
                else:
                    return pairwise.euclidean_distances(X, y)
                    #return dist.pairwise(X)
            else:
                raise ValueError("is not a valid kernel. Only rbf and euclidean"
                                 " are supported at this time")
        
        self.newX = X.zipWithIndex().map(lambda (x,y): (y,x))
        self.n = X.cache().count()
        self.dimensions= X.zipWithIndex().filter(lambda (ls,i): i==0).map(lambda (row,i): len(row)).collect()[0]
        self.classes = sorted(y.map(lambda x: (x,1)).reduceByKey(lambda a,b: a+b).map(lambda (x,arr): x).collect())
        if self.classes[0] == -1:
            self.classes = np.delete(self.classes, 0) # remove the -1 from this list
            if self.k == -1:
                self.k = np.size(self.classes)
        if self.numBins == -1:
                self.numBins = self.k
         
        sig = np.zeros(self.dimensions)
        gee = np.zeros((self.numBins,self.dimensions))
        b_edgeMeans = np.zeros((self.numBins,self.dimensions))
        #-----------End of declarations-----------------
        XforPCA = X.map(lambda rw: Vectors.dense(rw))
        self.PCA = PCAmllib(self.dimensions).fit(XforPCA)
        rotatedData = self.PCA.transform(XforPCA)
        dictData = self.makeDF(rotatedData, self.dimensions)
    
        for i in range(self.dimensions):
            s = str(i+1)
            dimRdd = dictData.select(s)
            binEdges,histograms = dimRdd.rdd.map(lambda x: x.asDict().values()[0]).histogram(self.numBins)
            histograms = array(histograms)
            binEdges = np.array(binEdges)
            db = array(np.diff(binEdges),float)
            histograms = histograms/db/histograms.sum()
            histograms = histograms + 0.01
            histograms /= histograms.sum()
            # calculating means on the bin edges as the x-axis for the interpolators
            b_edgeMeans[:,i] = np.array([binEdges[j:j + 2].mean() for j in range(binEdges.shape[0] - 1)])
            
            #get D~, P, W~
            '''
            Wdis = Affinity between discrete points.
            Since Affinity matrix will be build on one histogram at a time. I am using pairwise linear- kernel affinities
            P  = Diagonal matrix of histograms
            Ddis = Diagonal matrix whose diagonal elements are the sum of the columns of PW~P
            Dhat = Diagonal matrix whose diagonal elements are the sum of columns of PW~
            '''
            Wdis = _get_kernel(histograms.reshape(histograms.shape[0],1),y=None,ker="linear")
            P = np.diag(histograms)
            Ddis = np.diag(np.sum((P.dot(Wdis.dot(P))),axis=0))
            Dhat = np.diag(np.sum(P.dot(Wdis),axis=0))
            #Creating generalized eigenfunctions and eigenvalues from histograms.
            sigmaVals, functions = eig((Ddis-(P.dot(Wdis.dot(P)))),(P.dot(Dhat)))
            arg = np.argsort(np.real(sigmaVals))[1]
            sig[i] = np.real(sigmaVals)[arg]
            gee[:,i] = np.real(functions)[:,arg]
        
        # k smallest eigenvalues and eigenvectors
        if np.isnan(np.min(sig)):
            nan_num = np.isnan(sig)
            sig[nan_num] = 0
        ind =  np.argsort(sig)[0:self.k]
        self.newsig = sig[ind]
        self.newg = gee[:,ind]
        self.newEdgeMeans = b_edgeMeans[:,ind]
        bc_EdgeMeans = sc.broadcast(self.newEdgeMeans)
        bc_newg = sc.broadcast(self.newg)
        kb = sc.broadcast(self.k)  
        dataBounds = self.getdataboundaries(dictData, self.k)
        
        #make it a matrix
        makeItMatrix = RowMatrix(dictData.rdd.map(lambda row: selectValues(row.asDict(), kb).values()))
        approxValues = makeItMatrix.rows.map(lambda rw: transformer(rw, dataBounds, bc_EdgeMeans, bc_newg))
        # U: k eigenvectors corresponding to smallest eigenvalues. (n_samples by k)
        # S: Diagonal matrix of k smallest eigenvalues. (k by k)
        # V: Diagonal matrix of LaGrange multipliers for labeled data, 0 for unlabeled. (n_samples by n_samples)
        U = LabelPropagationDistributed.indRowMatMaker(approxValues)
        labeled_ind = np.array(y.zipWithIndex().filter(lambda (a,b): a!=-1).map(lambda (a,b): b).collect())
        matent = []
        for i in labeled_ind:
            matent.append(MatrixEntry(i,i,self.lagrangian))
        V = CoordinateMatrix(sc.parallelize(matent),numRows=self.n, numCols=self.n)
        Utrans = U.toCoordinateMatrix().transpose()  
        Ublk = U.toBlockMatrix()
        product1 = Utrans.toBlockMatrix().multiply(V.toBlockMatrix())
        product2 = product1.multiply(Ublk)
        S = np.diag(self.newsig)
        localblk = product2.toLocalMatrix().toArray()
        A = S + localblk
        if np.linalg.det(A) == 0:
            A = A + np.eye(A.shape[1])*0.000001
        yblk = CoordinateMatrix(y.zipWithIndex().map(lambda x: MatrixEntry(x[1],0,x[0]))).toBlockMatrix()
        b = product1.multiply(yblk)
        b = b.toLocalMatrix().toArray()
        alpha = np.linalg.solve(A, b)
        self.alpha = alpha
        efunc = self.solver(approxValues)
        # Set up a GMM to assign the hard labels from the eigenfunctions.
        self.gmm = GaussianMixture.train(efunc, np.size(self.classes), convergenceTol=0.0001,
                                         maxIterations=20000, seed=None)
        labels_ = self.gmm.predict(efunc)
        self.labels_ = self.relabel(labels_)
        #===============================
        self.ind = ind
        self.U1 = Ublk
        self.S = S
        self.V = V
        self.A1 = A
        self.b1 = b
        self.b_edgeMeans = b_edgeMeans
        self.ec = efunc
        self.aprval = approxValues
        self.databeforeapproxval = makeItMatrix
        self.pcadata = dictData
        self.yblk = yblk
        self.fullpca = rotatedData
        self.databounds = dataBounds
        self.gee = gee
        self.ori_edgem = binEdges
        self.justU = U
        #=================================

        return self
    
    def predict(self, X,y=None):
        '''
        X and y should be RDDs
        
        '''
        bc_EdgeMeans = sc.broadcast(self.newEdgeMeans)
        bc_newg = sc.broadcast(self.newg)
        kb = sc.broadcast(self.k)  
        testXforPCA = X.map(lambda rw: Vectors.dense(rw))
        newX = self.PCA.transform(testXforPCA)
        testdf = self.makeDF(newX, self.dimensions)
        testdatabounds = self.getdataboundaries(testdf, self.k)
        test_bounds = sc.broadcast(testdatabounds)
        testmakeItMatrix = RowMatrix(testdf.rdd.map(lambda row: selectValues(row.asDict(), kb).values()))
        testapproxValues = testmakeItMatrix.rows.map(lambda rw: transformer(rw, testdatabounds,bc_EdgeMeans, bc_newg))
        #testapproxValuesblk = LabelPropagationDistributed.indRowMatMaker(testapproxValues).toBlockMatrix()
        testfunctions = self.solver(testapproxValues)
        #testfunc = testfunctions.toCoordinateMatrix().toRowMatrix().rows.map(lambda x: x.toArray())
        predictedlabels = self.relabel(self.gmm.predict(testfunctions))
        
        return predictedlabels

In [ ]:


In [ ]: