In [1]:
sqlContext = sqlContext.newSession()  # work in a fresh SQL session (separate temp views, UDFs and configuration)
from pyspark.ml.feature import VectorAssembler, StandardScaler
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder, ValidatorParams
from pyspark.sql.types import StringType, StructField, StructType, ArrayType, DoubleType, IntegerType
from pyspark.ml.linalg import Vectors, VectorUDT, Matrix, MatrixUDT, DenseMatrix
from pyspark.ml.clustering import KMeans
from pyspark.ml.classification import LogisticRegression
from pyspark.sql import Row, Window, functions as F
from pyspark.ml import Pipeline, Transformer, Estimator, Model
from pyspark.ml.param.shared import HasInputCol, HasOutputCol, Param, Params
from pyspark import keyword_only
#from spark_sklearn import GridSearchCV,Converter
#from sklearn.cluster import KMeans as skKmeans
#from sklearn.linear_model import LogisticRegression as skLogistic
import pandas as pd
import re
import random
from prettytable import PrettyTable
import sys
from datetime import datetime
from operator import add
import numpy as np
import matplotlib.pyplot as plt
PATH = "/home/svanhmic/workspace/Python/Erhvervs/data/cdata/"
sc.addPyFile("/home/svanhmic/workspace/Python/Erhvervs/src/cvr/GridSearchLogRegAndKmeans.py")
sc.addPyFile("/home/svanhmic/workspace/Python/Erhvervs/src/cvr/ConvertAllToVecToMl.py")
import GridSearchLogRegAndKmeans
from ConvertAllToVecToMl import ConvertAllToVecToMl as convert
In [22]:
# raw feature data
df = sqlContext.read.parquet(PATH + "featureDataCvr")
# exclude some of the variables and cast the remaining ones to double
excludeCols = ["medArb_" + str(i) for i in range(1, 16)]  # the employee-count (medarbejder) columns are not needed
includeCols = [i for i in df.columns if i not in excludeCols]
rankCols = [re.sub(pattern="rank_", repl="vaerdiSlope_", string=i) for i in includeCols]
finalCols = [F.col(i) for i in includeCols[:2]] + [F.col(i).cast("double") for i in includeCols[2:]]
renamedDf = (df
.select(*finalCols)
.select([F.col(val).alias(rankCols[idx]) for idx,val in enumerate(includeCols)])
)
#renamedDf.show(3)
#renamedDf.printSchema()
#import name data frame
In [24]:
# import company names keyed by CVR number; keep only the most recent name per company
windowSpecRank = Window.partitionBy(F.col("cvrNummer")).orderBy(F.col("gyldigFra").desc())
groupCols = ["cvrNummer","vaerdi"]
companyNameDf = (sqlContext
.read
.parquet(PATH+"companyCvrData")
.withColumn(colName="rank",col=F.rank().over(windowSpecRank))
.filter((F.col("rank")==1) & (F.col("sekvensnr")==0))
.select([F.col(i) for i in groupCols])
.withColumnRenamed(existing="vaerdi",new="navn")
.orderBy(F.col("cvrNummer"))
.cache()
)
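The window ranks each company's name rows by gyldigFra in descending order, so the rank == 1 filter keeps only the newest name per cvrNummer. A toy sketch of the same pattern, with made-up rows:
# made-up rows: two historical names for CVR 1, one for CVR 2
toyNames = sqlContext.createDataFrame(
    [(1, "2014-01-01", "Old Name A/S"), (1, "2016-05-01", "New Name A/S"), (2, "2015-03-01", "Other ApS")],
    ["cvrNummer", "gyldigFra", "vaerdi"])
w = Window.partitionBy("cvrNummer").orderBy(F.col("gyldigFra").desc())
toyNames.withColumn("rank", F.rank().over(w)).filter(F.col("rank") == 1).show()
# only "New Name A/S" and "Other ApS" survive the filter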
In [5]:
#build own transformer
In [23]:
labelCols = ["navn","cvrNummer","label","status"]
featCols = [i for i in companyNameDf.columns+renamedDf.columns if i not in labelCols]
#get minimum values from each column
minCols = [F.min(i).alias(i) for i in featCols]
minValsRdd = renamedDf.groupby().agg(*minCols).rdd
broadcastedmin = sc.broadcast(minValsRdd.first().asDict())
# build column expressions that shift each numeric column by its own minimum, so every value is non-negative
logColsSelected = ([F.col(i).alias(i) for i in labelCols]
                   + [(F.col(i) - F.lit(broadcastedmin.value[i])).alias(i) for i in featCols])
# apply log(x + 1) to the shifted numeric columns and fill remaining nulls with 0.0
logDf = (renamedDf
.join(companyNameDf,(companyNameDf["cvrNummer"]==renamedDf["cvrNummer"]),"inner")
.drop(companyNameDf["cvrNummer"])
.select(*logColsSelected)
.select([F.col(i).alias(i) for i in labelCols]+[F.log1p(F.col(i)).alias(i) for i in featCols])
.distinct()
.na
.fill(0.0,featCols)
.cache()
)
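log1p is only defined for arguments greater than -1, which is why each numeric column is shifted by its own minimum before the transform. A toy sketch of the shift-then-log1p idea (the column name x and the values are made up):
toyDf = sqlContext.createDataFrame([(-2.0,), (0.0,), (8.0,)], ["x"])
minX = toyDf.groupBy().agg(F.min("x").alias("x")).first()["x"]
toyDf.select(F.log1p(F.col("x") - F.lit(minX)).alias("logShiftedX")).show()
# -2.0 -> log1p(0.0) = 0.0, 0.0 -> log1p(2.0), 8.0 -> log1p(10.0)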
In [131]:
def computeAndInsertClusterCenter(dataset, centers):
    '''
    Join each row with its cluster center and add the Euclidean distance
    between the scaled feature vector and that center as a "distance" column.
    '''
    distanceUdf = F.udf(lambda x, y: float(np.sqrt(np.sum((x - y) * (x - y)))), DoubleType())
    return (dataset
            .join(F.broadcast(centers), on=(dataset["prediction"] == centers["cluster"]), how="inner")
            .withColumn("distance", distanceUdf(F.col("scaledFeatures"), F.col("center")))
            .drop("cluster")
            .drop("features")
            .drop("v2"))
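A minimal usage sketch of the helper, mirroring the commented-out lines further down; it assumes a fitted PipelineModel (called pipelineModel here, a placeholder name) whose last stage is the KMeansModel:
# illustrative usage, assuming `pipelineModel` is a fitted PipelineModel ending in KMeans
transformed = pipelineModel.transform(logDf)
rowCenters = [Row(cluster=i, center=Vectors.dense(c))
              for i, c in enumerate(pipelineModel.stages[-1].clusterCenters())]
centersDf = sc.parallelize(rowCenters).toDF()
withDistance = computeAndInsertClusterCenter(transformed, centersDf)
withDistance.select("cvrNummer", "prediction", "distance").show(3, truncate=False)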
In [138]:
vectorizer = VectorAssembler(inputCols=featCols,outputCol="features")
con = convert(inputCol=vectorizer.getOutputCol(),outputCol="v2")
standardScale = StandardScaler(withMean=True,withStd=True,inputCol=con.getOutputCol(),outputCol="scaledFeatures")
kmeans = KMeans(featuresCol=standardScale.getOutputCol(),predictionCol="prediction")
pipeline = Pipeline(stages=[vectorizer,con,standardScale,kmeans])
paramMap = ({kmeans.k: 3, kmeans.initMode: "random"},
            {kmeans.k: 4, kmeans.initMode: "random"},
            {kmeans.k: 5, kmeans.initMode: "random"})
In [139]:
# build the pipeline models; fitting with a sequence of param maps returns one PipelineModel per map
model = pipeline.fit(logDf,params=paramMap)
In [164]:
transformedDfs = [m.transform(logDf) for m in model]
# within-cluster sum of squared errors (WSSSE) for each fitted k-means model
costs = [m.stages[-1].computeCost(transformedDfs[i]) for i, m in enumerate(model)]
In [166]:
costs
bestModel = model[-1]
# idea for next time: compute the cost for every model in the pipeline, then pick the best pipeline and build on it (sketched below)
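Following that idea, a minimal sketch of picking the fitted pipeline with the lowest WSSSE; costs and model come from the cells above, and bestIdx/bestPipeline are placeholder names. For a proper elbow analysis one would look for the bend in the cost-versus-k curve rather than the minimum, since WSSSE keeps falling as k grows.
bestIdx = int(np.argmin(costs))       # index of the cheapest model in the grid
bestPipeline = model[bestIdx]
print("best k:", len(bestPipeline.stages[-1].clusterCenters()), "cost:", costs[bestIdx])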
In [168]:
type(pipeline)
Out[168]:
In [150]:
print([m.stages[-1].computeCost(m.transform(logDf)) for m in model])  # WSSSE per fitted model
#transformed = model.transform(logDf)
#rowCenters = [Row(cluster=i,center=Vectors.dense(k)) for i,k in list(enumerate(model.stages[-1].clusterCenters()))]
#centersDf = sc.parallelize(rowCenters).toDF()
#transformedCenter = computeAndInsertClusterCenter(transformed,centersDf)
#transformedCenter.select("cvrNummer","prediction","distance").show(truncate=False)
In [66]:
km = bestModel.stages[-1]
km
#km.computeCost(transformed)
Out[66]:
In [41]:
class DistanceTransformation(Transformer, HasInputCol, HasOutputCol):
    '''
    Transformer that joins each row with its k-means cluster center and adds the
    Euclidean distance between the scaled features and that center as "distance".
    '''
    @keyword_only
    def __init__(self, inputCol=None, outputCol=None, model=None):
        super(DistanceTransformation, self).__init__()
        kwargs = self.__init__._input_kwargs
        self.setParams(**kwargs)

    @keyword_only
    def setParams(self, inputCol=None, outputCol=None, model=None):
        kwargs = self.setParams._input_kwargs
        # the fitted KMeansModel is kept as a plain attribute, not as a pyspark Param
        self.model = kwargs.pop("model", None)
        return self._set(**kwargs)

    def _transform(self, dataset):
        def computeAndInsertClusterCenter(dataset, centers):
            '''
            Insert the cluster center as a column and compute the distance to it.
            '''
            distanceUdf = F.udf(lambda x, y: float(np.sqrt(np.sum((x - y) * (x - y)))), DoubleType())
            return (dataset
                    .join(F.broadcast(centers), on=(dataset["prediction"] == centers["cluster"]), how="inner")
                    .withColumn("distance", distanceUdf(F.col("scaledFeatures"), F.col("center")))
                    .drop("cluster")
                    .drop("features")
                    .drop("v2"))

        # build a small data frame of cluster centers from the fitted k-means model
        rowCenters = [Row(cluster=i, center=Vectors.dense(c))
                      for i, c in enumerate(self.model.clusterCenters())]
        centersDf = sc.parallelize(rowCenters).toDF()
        return computeAndInsertClusterCenter(dataset, centersDf)
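An illustrative way to use the transformer, assuming bestModel (from the cells above) is a fitted PipelineModel whose last stage is the KMeansModel; the column names match the pipeline defined earlier:
distancer = DistanceTransformation(inputCol="scaledFeatures", outputCol="distance",
                                   model=bestModel.stages[-1])
withDistances = distancer.transform(bestModel.transform(logDf))
withDistances.select("cvrNummer", "prediction", "distance").show(3, truncate=False)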
In [40]:
print(getCenters(0))
paramGrid = ParamGridBuilder() \
.addGrid(kmeans.k, [2, 4, 10]) \
.addGrid(kmeans.initSteps, [3,5,10]) \
.build()
In [9]:
# create an estimator that does unsupervised, elbow-style model selection over a param grid
class ElbowEvaluation(Estimator, ValidatorParams):
    '''
    Fits the wrapped estimator once per param map, scores every fitted model with the
    given evaluator and returns the best one wrapped in an ElbowEvaluationModel.
    '''
    @keyword_only
    def __init__(self, estimator=None, estimatorParamMaps=None, evaluator=None,
                 seed=None):
        super(ElbowEvaluation, self).__init__()
        kwargs = self.__init__._input_kwargs
        self._set(**kwargs)

    @keyword_only
    def setParams(self, estimator=None, estimatorParamMaps=None, evaluator=None):
        kwargs = self.setParams._input_kwargs
        return self._set(**kwargs)

    # element-wise squared difference to a cluster center (not used by _fit yet)
    computeDistanceToCenterUdf = F.udf(lambda x, y: (x - y) * (x - y), VectorUDT())

    def _fit(self, dataset):
        est = self.getOrDefault(self.estimator)
        epm = self.getOrDefault(self.estimatorParamMaps)
        numModels = len(epm)
        eva = self.getOrDefault(self.evaluator)
        metrics = [0.0] * numModels
        for j in range(numModels):
            model = est.fit(dataset, epm[j])
            metric = eva.evaluate(model.transform(dataset, epm[j]))
            metrics[j] += metric
        if eva.isLargerBetter():
            bestIndex = np.argmax(metrics)
        else:
            bestIndex = np.argmin(metrics)
        bestModel = est.fit(dataset, epm[bestIndex])
        return self._copyValues(ElbowEvaluationModel(bestModel, metrics))

    def copy(self, extra=None):
        """
        Creates a copy of this instance with a randomly generated uid
        and some extra params. This creates a deep copy of the embedded
        paramMap and copies the embedded and extra parameters over.
        :param extra: Extra parameters to copy to the new instance
        :return: Copy of this instance
        """
        if extra is None:
            extra = dict()
        newElbow = Params.copy(self, extra)
        if self.isSet(self.estimator):
            newElbow.setEstimator(self.getEstimator().copy(extra))
        # estimatorParamMaps remain the same
        if self.isSet(self.evaluator):
            newElbow.setEvaluator(self.getEvaluator().copy(extra))
        return newElbow
In [ ]:
class ElbowEvaluationModel(Model, ValidatorParams):
    """
    .. note:: Experimental
    Model produced by ElbowEvaluation: wraps the best fitted model together with
    the validation metric obtained for each param map.
    """
    def __init__(self, bestModel, validationMetrics=[]):
        super(ElbowEvaluationModel, self).__init__()
        #: best model found during the grid evaluation
        self.bestModel = bestModel
        #: evaluated validation metrics, one per param map
        self.validationMetrics = validationMetrics

    def _transform(self, dataset):
        return self.bestModel.transform(dataset)

    def copy(self, extra=None):
        """
        Creates a copy of this instance with a randomly generated uid
        and some extra params. This copies the underlying bestModel,
        creates a deep copy of the embedded paramMap, and
        copies the embedded and extra parameters over.
        And, this creates a shallow copy of the validationMetrics.
        :param extra: Extra parameters to copy to the new instance
        :return: Copy of this instance
        """
        if extra is None:
            extra = dict()
        bestModel = self.bestModel.copy(extra)
        validationMetrics = list(self.validationMetrics)
        return ElbowEvaluationModel(bestModel, validationMetrics)
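A minimal end-to-end sketch of wiring the two classes together with the pipeline and paramGrid defined above. It assumes a Spark version (2.3 or later) that ships ClusteringEvaluator, which scores a clustering by silhouette (larger is better); on older versions a custom evaluator exposing evaluate() and isLargerBetter() would be needed instead.
from pyspark.ml.evaluation import ClusteringEvaluator  # Spark >= 2.3

silhouette = ClusteringEvaluator(featuresCol="scaledFeatures", predictionCol="prediction")
elbow = ElbowEvaluation(estimator=pipeline, estimatorParamMaps=paramGrid, evaluator=silhouette)
elbowModel = elbow.fit(logDf)
print(elbowModel.validationMetrics)      # one metric per param map
scored = elbowModel.transform(logDf)     # delegates to the best fitted pipeline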