In [1]:
# Transform the data with each fitted pipeline and compute the KMeans cost (WSSSE) of its last stage
transformedDfs = [m.transform(logDf) for m in model]
costs = [(i, m.stages[-1].computeCost(transformedDfs[i])) for i, m in enumerate(model)]
costs
#transformedModels = [v.stages[-1].computeCost(transformedDfs[i]) for i,v in enumerate(model)]
In [ ]:
# Refit the pipeline with an explicit param map (k=10, random initialisation)
newParamMap = {kmeans.k: 10, kmeans.initMode: "random"}
newModel = pipeline.fit(logDf, newParamMap)
#computedModel = pipeline.fit(logDf)
# Idea for next time: compute the cost for every model in the pipeline, then take the best pipeline and build on it.
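The comment above suggests the follow-up step: once every candidate pipeline has a cost, keep the one with the lowest WSSSE and continue from it. A minimal sketch of that selection, assuming `model` and `costs` from the first cell are still in scope (variable names below are illustrative):
In [ ]:
import numpy as np
# costs is a list of (index, WSSSE) pairs; pick the pipeline with the smallest cost
bestIndex = int(np.argmin([c for _, c in costs]))
bestPipelineModel = model[bestIndex]
print("best pipeline index:", bestIndex, "cost:", costs[bestIndex][1])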
In [ ]:
trans = newModel.transform(logDf)
trans.groupBy("prediction").count().show()  # shows the distribution of companies over clusters
# Build a small DataFrame mapping each cluster id to its centre vector
vec = [Row(cluster=i, center=Vectors.dense(v)) for i, v in enumerate(newModel.stages[-1].clusterCenters())]
#print(type(vec))
SpDf = sqlContext.createDataFrame(data=vec)
#SpDf.show(truncate=False)
# Element-wise squared difference between a point and its cluster centre
featureContributionUdf = F.udf(lambda x, y: (x - y) * (x - y), VectorUDT())
# Euclidean (L2) distance between a point and its cluster centre
sqrtUdf = F.udf(lambda x, y: float(Vectors.norm(vector=x - y, p=2)), DoubleType())
printUdf = F.udf(lambda x: str(type(x)), StringType())  # debugging helper
# Ensure the feature column holds dense vectors so the arithmetic UDFs work
toDenseUDf = F.udf(lambda x: Vectors.dense(x.toArray()), VectorUDT())
#print(np.sum(vec[0]["vec"]))
joinedDf = (trans
            .join(SpDf, on=(trans["prediction"] == SpDf["cluster"]), how="left")
            .withColumn(colName="features", col=toDenseUDf(F.col("features")))
            .drop(SpDf["cluster"])
            .withColumn(colName="contribution", col=featureContributionUdf(F.col("features"), F.col("center")))
            .withColumn(colName="distance", col=sqrtUdf(F.col("features"), F.col("center")))
            )
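With per-row distances in place, the rows farthest from their assigned centre can be inspected directly. A small sketch using only the `prediction` and `distance` columns produced by the join above:
In [ ]:
# Rows with the largest distance to their own cluster centre are the strongest outlier candidates
(joinedDf
 .select("prediction", "distance")
 .orderBy(F.col("distance").desc())
 .show(10))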
In [ ]:
int_range = widgets.IntSlider()
display(int_range)
def on_value_change(change):
    print(change['new'])
int_range.observe(on_value_change, names='value')
In [ ]:
def printTotalAndAvgFeatContribution(df, cluster=0, toPrint=False):
    from operator import add  # ensure `add` is available even if not imported at the top of the notebook
    # Sum the per-feature contribution vectors per cluster
    joinedRdd = (df
                 .select("prediction", "contribution")
                 .rdd)
    #print(joinedRdd.take(1))
    summed = joinedRdd.reduceByKey(add)
    normedtotalContribute = summed.collectAsMap()
    return normedtotalContribute
In [ ]:
stuff = printTotalAndAvgFeatContribution(joinedDf)
# Normalise each cluster's total contribution vector (log scale) so clusters are comparable
centers = [(i, np.log(stuff[i]) / np.sum(np.log(stuff[i]))) for i in range(0, 10)]
cols = joinedDf.columns[5:31]
centers
# Keep only the outlier clusters of interest and stack their per-feature contributions
clusters = np.array([c[1] for c in centers if c[0] in [6, 9, 4]])
transposedCluster = np.log1p(clusters.transpose())
N = 3
import colorsys
HSV_tuples = [(x * 1.0 / len(transposedCluster), 0.5, 0.5) for x in range(len(transposedCluster))]
RGB_tuples = list(map(lambda x: colorsys.hsv_to_rgb(*x), HSV_tuples))
ind = np.arange(N)  # the x locations for the groups
width = 0.5
# Stacked bar chart: start with the first feature, then stack the remaining features on top
plots = [plt.bar(ind, transposedCluster[0], width, color='#d62728')]
former = transposedCluster[0].copy()
for i, v in enumerate(transposedCluster[1:]):
    plots.append(plt.bar(ind, v, width, color=RGB_tuples[i], bottom=former))
    former += v
plt.ylabel('log Scores')
plt.title('Component Contribution for outlier clusters')
plt.xticks(ind + 0.3, ['C_' + str(i) for i in [6, 9, 4]])
plt.legend([p[0] for p in plots], cols, bbox_to_anchor=(1.05, 1.5), loc=2, borderaxespad=1)
plt.show()
In [ ]:
class DistanceTransformation(Transformer, HasInputCol, HasOutputCol):
    '''
    Transformer that joins each row with its assigned cluster centre and adds
    the Euclidean distance to that centre as a "distance" column.
    '''
    @keyword_only
    def __init__(self, inputCol=None, outputCol=None, model=None):
        super(DistanceTransformation, self).__init__()
        self.model = model  # fitted KMeansModel supplying the cluster centres
        kwargs = self.__init__._input_kwargs
        kwargs.pop("model", None)  # model is kept as a plain attribute, not a Param
        self.setParams(**kwargs)
    @keyword_only
    def setParams(self, inputCol=None, outputCol=None):
        kwargs = self.setParams._input_kwargs
        return self._set(**kwargs)
    def _transform(self, dataset):
        def computeAndInsertClusterCenter(dataset, centers):
            '''
            Join each row with its cluster centre and add the distance to it as a column.
            '''
            distanceUdf = F.udf(lambda x, y: float(np.sqrt(np.sum((x - y) * (x - y)))), DoubleType())
            return (dataset
                    .join(F.broadcast(centers), on=(dataset["prediction"] == centers["cluster"]), how="inner")
                    .withColumn(colName="distance", col=distanceUdf(F.col("scaledFeatures"), F.col("center")))
                    .drop("cluster")
                    .drop("features")
                    .drop("v2")
                    )
        # Build a (cluster, center) DataFrame from the fitted model, mirroring SpDf above
        centers = sqlContext.createDataFrame(
            [Row(cluster=i, center=Vectors.dense(c)) for i, c in enumerate(self.model.clusterCenters())])
        return computeAndInsertClusterCenter(dataset, centers)
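A minimal usage sketch of the transformer above, assuming the fitted KMeansModel is the last stage of `newModel` and that the clustered DataFrame (here `trans`) carries the `prediction` and `scaledFeatures` columns the transformer expects:
In [ ]:
# Hypothetical usage of DistanceTransformation; names below are illustrative
fittedKMeans = newModel.stages[-1]                     # fitted KMeansModel from the pipeline
distStage = DistanceTransformation(inputCol="scaledFeatures",
                                   outputCol="distance",
                                   model=fittedKMeans)
withDistances = distStage.transform(trans)             # adds the "distance" column
withDistances.select("prediction", "distance").show(5)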
In [ ]:
print(getCenters(0))
# Grid of KMeans hyper-parameters to evaluate
paramGrid = ParamGridBuilder() \
    .addGrid(kmeans.k, [2, 4, 10]) \
    .addGrid(kmeans.initSteps, [3, 5, 10]) \
    .build()
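The grid can be used to fit one pipeline per parameter combination and compare their costs, which is how a list like `model` from the first cell could be produced. A hedged sketch (the list comprehension is illustrative, not the notebook's original call):
In [ ]:
# Fit one pipeline per parameter map and collect the KMeans cost (WSSSE) of each fit
models = [pipeline.fit(logDf, pm) for pm in paramGrid]
costsPerModel = [(pm[kmeans.k], m.stages[-1].computeCost(m.transform(logDf)))
                 for pm, m in zip(paramGrid, models)]
costsPerModel  # (k, cost) pairs for an elbow plot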
In [ ]:
# Custom unsupervised model selection (elbow-style evaluation)
class ElbowEvaluation(Estimator, ValidatorParams):
    '''
    Fits the estimator once per param map, scores every fitted model with the
    evaluator, and returns the best model together with all metrics.
    '''
    @keyword_only
    def __init__(self, estimator=None, estimatorParamMaps=None, evaluator=None,
                 seed=None):
        super(ElbowEvaluation, self).__init__()
        kwargs = self.__init__._input_kwargs
        self._set(**kwargs)
    @keyword_only
    def setParams(self, estimator=None, estimatorParamMaps=None, evaluator=None):
        kwargs = self.setParams._input_kwargs
        return self._set(**kwargs)
    computeDistanceToCenterUdf = F.udf(lambda x, y: (x - y) * (x - y), VectorUDT())
    def _fit(self, dataset):
        est = self.getOrDefault(self.estimator)
        epm = self.getOrDefault(self.estimatorParamMaps)
        numModels = len(epm)
        eva = self.getOrDefault(self.evaluator)
        metrics = [0.0] * numModels
        for j in range(numModels):
            model = est.fit(dataset, epm[j])
            metric = eva.evaluate(model.transform(dataset, epm[j]))
            metrics[j] += metric
        if eva.isLargerBetter():
            bestIndex = np.argmax(metrics)
        else:
            bestIndex = np.argmin(metrics)
        bestModel = est.fit(dataset, epm[bestIndex])
        # Wrap the best model and the metrics in the companion model class defined in the next cell
        return self._copyValues(ElbowEvaluationModel(bestModel, metrics))
    def copy(self, extra=None):
        """
        Creates a copy of this instance with a randomly generated uid
        and some extra params. This copy creates a deep copy of
        the embedded paramMap, and copies the embedded and extra parameters over.
        :param extra: Extra parameters to copy to the new instance
        :return: Copy of this instance
        """
        if extra is None:
            extra = dict()
        newTVS = Params.copy(self, extra)
        if self.isSet(self.estimator):
            newTVS.setEstimator(self.getEstimator().copy(extra))
        # estimatorParamMaps remain the same
        if self.isSet(self.evaluator):
            newTVS.setEvaluator(self.getEvaluator().copy(extra))
        return newTVS
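A hedged usage sketch of the estimator above with the param grid built earlier. It assumes a clustering evaluator is available (ClusteringEvaluator ships with Spark 2.3+; on older versions a custom evaluator wrapping computeCost would be needed) and that the pipeline's feature column matches the evaluator's featuresCol:
In [ ]:
# Illustrative usage; ClusteringEvaluator is only available from Spark 2.3 onwards
from pyspark.ml.evaluation import ClusteringEvaluator

elbow = ElbowEvaluation(estimator=pipeline,
                        estimatorParamMaps=paramGrid,
                        evaluator=ClusteringEvaluator())
elbowModel = elbow.fit(logDf)
print(elbowModel.validationMetrics)  # one metric per param map, for an elbow plot
bestClustered = elbowModel.transform(logDf)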
In [ ]:
class ElbowEvaluationModel(Model, ValidatorParams):
    """
    .. note:: Experimental
    Model produced by :class:`ElbowEvaluation`; wraps the best model and its validation metrics.
    """
    def __init__(self, bestModel, validationMetrics=[]):
        super(ElbowEvaluationModel, self).__init__()
        #: best model across all evaluated param maps
        self.bestModel = bestModel
        #: evaluated validation metrics, one per param map
        self.validationMetrics = validationMetrics
    def _transform(self, dataset):
        return self.bestModel.transform(dataset)
    def copy(self, extra=None):
        """
        Creates a copy of this instance with a randomly generated uid
        and some extra params. This copies the underlying bestModel,
        creates a deep copy of the embedded paramMap, and
        copies the embedded and extra parameters over.
        It also creates a shallow copy of the validationMetrics.
        :param extra: Extra parameters to copy to the new instance
        :return: Copy of this instance
        """
        if extra is None:
            extra = dict()
        bestModel = self.bestModel.copy(extra)
        validationMetrics = list(self.validationMetrics)
        return ElbowEvaluationModel(bestModel, validationMetrics)