In [1]:
transformedDfs = [i.transform(logDf) for i in model]
costs = [(i,v.stages[-1].computeCost(transformedDfs[i])) for i,v in enumerate(model)]
costs

#transformedModels = [v.stages[-1].computeCost(transformedDfs[i]) for i,v in enumerate(model)]


---------------------------------------------------------------------------
NameError                                 Traceback (most recent call last)
<ipython-input-1-9ce919b82574> in <module>()
----> 1 transformedDfs = [i.transform(logDf) for i in model]
      2 costs = [(i,v.stages[-1].computeCost(transformedDfs[i])) for i,v in enumerate(model)]
      3 costs
      4 
      5 #transformedModels = [v.stages[-1].computeCost(transformedDfs[i]) for i,v in enumerate(model)]

NameError: name 'model' is not defined
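
`model` is presumably the list of PipelineModels produced by fitting the pipeline over a list of param maps in a cell that was not run here. A minimal sketch, assuming `pipeline`, `kmeans`, and `logDf` from the earlier cells:

In [ ]:
# Hedged sketch: fitting a Pipeline with a list of param maps returns one
# PipelineModel per map, which is what the cell above iterates over.
paramMaps = [{kmeans.k: k} for k in [2, 4, 10]]
model = pipeline.fit(logDf, paramMaps)  # list of PipelineModels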

In [ ]:
newParamMap = {kmeans.k: 10, kmeans.initMode: "random"}
newModel = pipeline.fit(logDf,newParamMap)
#computedModel = pipeline.fit(logDf)

#idea for next time: compute the cost for every model in the pipeline, then take the best pipeline and build further on it (see the sketch below).
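
As a sketch of that idea (assuming `model` is the list of PipelineModels sketched above, and numpy as np as used further down):

In [ ]:
# Hedged sketch: compute the within-set sum of squared errors for every fitted
# pipeline and keep the cheapest one to build on.
costs = [m.stages[-1].computeCost(m.transform(logDf)) for m in model]
bestPipeline = model[int(np.argmin(costs))]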

In [ ]:
trans = newModel.transform(logDf)
trans.groupBy("prediction").count().show() # shows how the companies are distributed across the clusters


vec = [Row(cluster=i,center=Vectors.dense(v)) for i,v in enumerate(newModel.stages[-1].clusterCenters())]
#print(type(vec))
SpDf = sqlContext.createDataFrame(data=vec)
#SpDf.show(truncate=False)



# Per-feature squared difference between a point and its cluster center.
featureContributionUdf = F.udf(lambda x,y: (x-y)*(x-y),VectorUDT() )
# Euclidean (L2) distance between a point and its cluster center.
sqrtUdf = F.udf(lambda x,y: float(Vectors.norm(vector=x-y,p=2)),DoubleType())
# Debug helper: report the Python type of a column value as a string.
printUdf = F.udf(lambda x: str(type(x)),StringType())
# Ensure the feature column is a DenseVector so the arithmetic UDFs above work.
toDenseUDf = F.udf(lambda x: Vectors.dense(x.toArray()),VectorUDT())
#print(np.sum(vec[0]["vec"]))
joinedDf = (trans
            .join(SpDf,on=(trans["prediction"]==SpDf["cluster"]),how="left")
            .withColumn(colName="features",col=toDenseUDf(F.col("features")))
            .drop(SpDf["cluster"])
            .withColumn(colName="contribution",col=featureContributionUdf(F.col("features"),F.col("center")))
            .withColumn(colName="distance",col=sqrtUdf(F.col("features"),F.col("center")))
           )
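
As a quick sanity check of the join, the rows farthest from their cluster center (the most outlier-like companies) can be listed directly; a minimal sketch using the columns defined above:

In [ ]:
# Show the ten rows with the largest distance to their assigned cluster center.
(joinedDf
 .select("prediction", "distance")
 .orderBy(F.desc("distance"))
 .show(10))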

In [ ]:
int_range = widgets.IntSlider()
display(int_range)

def on_value_change(change):
    print(change['new'])

int_range.observe(on_value_change, names='value')
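
One possible use of the slider is to pick which cluster to inspect; a hedged sketch, assuming joinedDf as computed above:

In [ ]:
# Hedged sketch: summarise the distance distribution of the cluster selected
# with the slider whenever its value changes.
def on_cluster_change(change):
    joinedDf.filter(F.col("prediction") == change['new']).select("distance").describe().show()

int_range.observe(on_cluster_change, names='value')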

In [ ]:
def printTotalAndAvgFeatContribution(df,cluster=0,toPrint=False):
    '''
    Returns a dict mapping each cluster id to the element-wise sum of the
    per-feature contribution vectors of its members.
    '''
    joinedRdd = (df
                 .select("prediction","contribution")
                 .rdd)
    #print(joinedRdd.take(1))
    # operator.add sums the contribution DenseVectors element-wise per cluster.
    summed = joinedRdd.reduceByKey(add)
    normedtotalContribute = summed.collectAsMap()

    return normedtotalContribute
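
The function only returns the summed contribution per cluster; the average its name promises could be derived by also counting rows per cluster. A minimal sketch under that assumption:

In [ ]:
# Hedged sketch: per-cluster average contribution = summed contribution / cluster size.
counts = dict(joinedDf.groupBy("prediction").count().rdd.map(lambda r: (r[0], r[1])).collect())
totals = printTotalAndAvgFeatContribution(joinedDf)
avgContribution = {k: v.toArray() / counts[k] for k, v in totals.items()}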

In [ ]:
stuff = printTotalAndAvgFeatContribution(joinedDf)

centers = [(i,np.log(stuff[i])/np.sum(np.log(stuff[i]))) for i in range(0,10)]
cols = joinedDf.columns[5:31]
centers

clusters = np.array([i[1] for i in centers if i[0] in [6,9,4]  ])
transposedCluster = np.log1p(clusters.transpose())
N =3

import colorsys
HSV_tuples = [(x*1.0/len(transposedCluster), 0.5, 0.5) for x in range(len(transposedCluster))]
RGB_tuples = list(map(lambda x: colorsys.hsv_to_rgb(*x), HSV_tuples))

ind = np.arange(N)
#print(ind)# the x locations for the groups
width = 0.5 
plots = [plt.bar(ind, transposedCluster[0], width, color='#d62728')]
former = transposedCluster[0].copy()
for i,v in enumerate(transposedCluster[1:]):
    plots.append(plt.bar(ind, v, width, color=RGB_tuples[i],bottom=former))
    former += v
plt.ylabel('log Scores')
plt.title('Component Contribution for outlier clusters')
plt.xticks(ind+0.3, ['C_'+str(i) for i in [6,9,4]])
plt.legend([p[0] for p in plots], cols,bbox_to_anchor=(1.05, 1.5),loc=2,borderaxespad=1)
plt.show()

In [ ]:
class DistanceTransformation(Transformer,HasInputCol,HasOutputCol):
    '''
    Adds a "distance" column holding the Euclidean distance between each row's
    scaled feature vector and the center of its assigned cluster.
    Note: the column names are currently hardcoded to "scaledFeatures" and "distance".
    '''
    @keyword_only
    def __init__(self, inputCol=None, outputCol=None, model=None):
        super(DistanceTransformation, self).__init__()
        kwargs = self.__init__._input_kwargs
        # The fitted KMeansModel is not a Param, so keep it as a plain attribute.
        self.model = kwargs.pop("model", None)
        self.setParams(**kwargs)

    @keyword_only
    def setParams(self, inputCol=None, outputCol=None):
        kwargs = self.setParams._input_kwargs
        return self._set(**kwargs)

    def _transform(self, dataset):

        def computeAndInsertClusterCenter(dataset,centers):
            '''
            Join each row with its cluster center and add the distance to it as a column.
            '''

            distanceUdf = F.udf(lambda x,y: float(np.sqrt(np.sum((x-y)*(x-y)))),DoubleType())

            return (dataset
                    .join(F.broadcast(centers),on=(dataset["prediction"]==centers["cluster"]),how="inner")
                    .withColumn(colName="distance",col=distanceUdf(F.col("scaledFeatures"),F.col("center")))
                    .drop("cluster")
                    .drop("features")
                    .drop("v2")
                    )

        # Build a small DataFrame of cluster centers from the fitted model and join it in.
        centers = sqlContext.createDataFrame(
            [Row(cluster=i, center=Vectors.dense(c)) for i, c in enumerate(self.model.clusterCenters())])
        return computeAndInsertClusterCenter(dataset, centers)
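
A minimal usage sketch for the transformer, assuming the fitted KMeansModel from newModel and a DataFrame (here trans) that carries the hardcoded "scaledFeatures" and "prediction" columns:

In [ ]:
# Hedged sketch: attach distances to the clustered data.
distStage = DistanceTransformation(inputCol="scaledFeatures", outputCol="distance",
                                   model=newModel.stages[-1])
distStage.transform(trans).select("prediction", "distance").show(5)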

In [ ]:
print(getCenters(0))

paramGrid = ParamGridBuilder() \
    .addGrid(kmeans.k, [2, 4, 10]) \
    .addGrid(kmeans.initSteps, [3,5,10]) \
    .build()

In [ ]:
#create an unsupervised classification evaluator
class ElbowEvaluation(Estimator,ValidatorParams):
    '''
    Fits the estimator once per param map, scores each fitted model with the
    evaluator, and keeps the best one together with all metrics (elbow method).
    '''
    
    @keyword_only
    def __init__(self, estimator=None, estimatorParamMaps=None, evaluator=None,
                 seed=None):
        super(ElbowEvaluation, self).__init__()
        kwargs = self.__init__._input_kwargs
        self._set(**kwargs)
    
    @keyword_only
    def setParams(self, estimator=None, estimatorParamMaps=None, evaluator=None):
        kwargs = self.setParams._input_kwargs
        return self._set(**kwargs)
    
    computeDistanceToCenterUdf = F.udf(lambda x,y: (x-y)*(x-y),VectorUDT())
    
    
    def _fit(self, dataset):
        est = self.getOrDefault(self.estimator)
        epm = self.getOrDefault(self.estimatorParamMaps)
        numModels = len(epm)
        eva = self.getOrDefault(self.evaluator)
        metrics = [0.0] * numModels

        for j in range(numModels):
            model = est.fit(dataset, epm[j])
            metric = eva.evaluate(model.transform(dataset, epm[j]))
            metrics[j] += metric
        if eva.isLargerBetter():
            bestIndex = np.argmax(metrics)
        else:
            bestIndex = np.argmin(metrics)
        bestModel = est.fit(dataset, epm[bestIndex])
        return self._copyValues(ElbowEvaluationModel(bestModel, metrics))
    
    def copy(self, extra=None):
        """
        Creates a copy of this instance with a randomly generated uid
        and some extra params. This creates a deep copy of
        the embedded paramMap, and copies the embedded and extra parameters over.

        :param extra: Extra parameters to copy to the new instance
        :return: Copy of this instance
        """
        if extra is None:
            extra = dict()
        newElbow = Params.copy(self, extra)
        if self.isSet(self.estimator):
            newElbow.setEstimator(self.getEstimator().copy(extra))
        # estimatorParamMaps remain the same
        if self.isSet(self.evaluator):
            newElbow.setEvaluator(self.getEvaluator().copy(extra))
        return newElbow

In [ ]:
class ElbowEvaluationModel(Model, ValidatorParams):
    """
    .. note:: Experimental

    Model from train validation split.

    .. versionadded:: 2.0.0
    """

    def __init__(self, bestModel, validationMetrics=[]):
        super(TrainValidationSplitModel, self).__init__()
        #: best model from cross validation
        self.bestModel = bestModel
        #: evaluated validation metrics
        self.validationMetrics = validationMetrics

    def _transform(self, dataset):
        return self.bestModel.transform(dataset)

    def copy(self, extra=None):
        """
        Creates a copy of this instance with a randomly generated uid
        and some extra params. This copies the underlying bestModel,
        creates a deep copy of the embedded paramMap, and
        copies the embedded and extra parameters over.
        And, this creates a shallow copy of the validationMetrics.

        :param extra: Extra parameters to copy to the new instance
        :return: Copy of this instance
        """
        if extra is None:
            extra = dict()
        bestModel = self.bestModel.copy(extra)
        validationMetrics = list(self.validationMetrics)
        return ElbowEvaluationModel(bestModel, validationMetrics)
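
A hedged usage sketch for the two classes above; `costEvaluator` is a placeholder for any object exposing evaluate(dataset) and isLargerBetter() (Spark's ClusteringEvaluator only exists from 2.3 onwards, so here it would have to be a small custom class):

In [ ]:
# Hedged sketch: run the elbow evaluation over the param grid defined earlier.
elbow = ElbowEvaluation(estimator=pipeline,
                        estimatorParamMaps=paramGrid,
                        evaluator=costEvaluator)  # placeholder evaluator
elbowModel = elbow.fit(logDf)
print(elbowModel.validationMetrics)
elbowModel.transform(logDf).groupBy("prediction").count().show()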