# This notebook should contain a pipeline method for bankruptcy.

import random
import re
import sys
from datetime import datetime
from operator import add

import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
from prettytable import PrettyTable
from pyspark import keyword_only
from pyspark.ml import Pipeline, Transformer, Estimator, Model
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.clustering import KMeans
from pyspark.ml.feature import VectorAssembler, StandardScaler
from pyspark.ml.linalg import Vectors, VectorUDT, Matrix, MatrixUDT, DenseMatrix
from pyspark.ml.param.shared import HasInputCol, HasOutputCol, Param
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder, ValidatorParams
from pyspark.sql import Row, Window, functions as F
from pyspark.sql.types import StringType, StructField, StructType, ArrayType, DoubleType, IntegerType
#from spark_sklearn import GridSearchCV,Converter
#from sklearn.cluster import KMeans as skKmeans
#from sklearn.linear_model import LogisticRegression as skLogistic

import GridSearchLogRegAndKmeans
from ConvertAllToVecToMl import ConvertAllToVecToMl as convert

PATH = "/home/svanhmic/workspace/Python/Erhvervs/data/cdata/"

# Import the data

# Load the CVR feature table.
# NOTE(review): this line is truncated in the export -- the reader call in front of
# "featureDataCvr" (presumably a sqlContext read using PATH) is missing, so it is a
# syntax error as written. TODO restore.
df ="featureDataCvr")

# Exclude some of the variables, and cast all variables to double.
# Drop the employee columns medArb_1 .. medArb_15 from the feature table.
excludeCols = ["medArb_"+str(i) for i in range(1,16)] # the employee (medarbejder) columns are not needed
includeCols = [i for i in df.columns if i not in excludeCols]
# New column names: every occurrence of "rank_" is rewritten to "vaerdiSlope_";
# names without it are unchanged. Position i pairs with includeCols[i].
rankCols = [re.sub(pattern="rank_",repl="vaerdiSlope_",string=i) for i in includeCols]
# The first two included columns are kept as-is; all remaining ones are cast to double.
# NOTE(review): finalCols is not used by any visible statement, so the announced cast is
# never applied to renamedDf below -- confirm whether a .select(finalCols) step was lost.
finalCols = [F.col(i) for i in includeCols[:2]]+[F.col(i).cast("double") for i in includeCols[2:]]

# Rename the included columns positionally to the names in rankCols.
# NOTE(review): the closing parenthesis of this expression (and possibly further
# chained calls) is missing in the export. TODO restore.
renamedDf = (df
             .select([F.col(val).alias(rankCols[idx]) for idx,val in enumerate(includeCols)])
# Import the company-name data frame.

# Import company names together with their CVR number.
# Window: one partition per cvrNummer, rows ordered by gyldigFra descending.
windowSpecRank =(Window.partitionBy(F.col("cvrNummer"))).orderBy(F.col("gyldigFra").desc())
# Columns kept in the resulting name table.
groupCols = ["cvrNummer","vaerdi"]

# Keep the rows with rank 1 and sekvensnr 0, projected onto groupCols.
# NOTE(review): lines are missing in the export between `sqlContext` and `.filter`.
# No visible statement reads the source table or creates the "rank" column, and
# windowSpecRank is not used anywhere visible. Presumably a read plus
# F.rank().over(windowSpecRank) aliased "rank" was here. The closing parenthesis is
# also missing. TODO confirm and restore.
companyNameDf = (sqlContext
                 .filter((F.col("rank")==1) & (F.col("sekvensnr")==0))
                 .select([F.col(i) for i in groupCols])

# Build own transformer: prepare label and numeric feature columns for the pipeline.

# Identifier/target columns; everything else becomes a numeric feature.
labelCols = ["navn","cvrNummer","label","status"]
# NOTE(review): companyNameDf's columns are included here, so any of them not in
# labelCols (e.g. "vaerdi" from groupCols) becomes a feature, yet the minimum below is
# taken on renamedDf -- confirm that such columns exist in renamedDf.
featCols = [i for i in companyNameDf.columns+renamedDf.columns if i not in labelCols]

# Get the minimum value of each feature column: a single-row aggregate that is collected
# to the driver and broadcast as a {column: min} dict.
# NOTE(review): `sc` is not defined in this file -- presumably the notebook's SparkContext.
minCols = [F.min(i).alias(i) for i in featCols]
minValsRdd = renamedDf.groupby().agg(*minCols).rdd
broadcastedmin = sc.broadcast(minValsRdd.first().asDict())

# Column expressions: label columns unchanged, each feature shifted by its column minimum
# (so every feature's minimum becomes 0).
# NOTE(review): logColsSelected is not used by any visible statement.
logColsSelected = [F.col(i).alias(i) for i in labelCols]+[(F.col(i)-F.lit(broadcastedmin.value[i])).alias(i) for i in featCols]

# Apply log(x+1) to the numeric columns; label columns pass through.
# NOTE(review): log1p is applied to the *unshifted* feature values, not to
# logColsSelected, so values <= -1 presumably come out null (Spark's log semantics --
# confirm). The "fill blanks with 0.0" step and the closing parenthesis are missing in
# the export. TODO restore.
logDf = (renamedDf
         .select([F.col(i).alias(i) for i in labelCols]+[F.log1p(F.col(i)).alias(i) for i in featCols])

def computeAndInsertClusterCenter(dataset, centers, featuresCol="scaledFeatures",
                                  predictionCol="prediction"):
    """Insert the cluster center, and each row's distance to it, as columns.

    :param dataset: clustered DataFrame containing the feature vectors
        (``featuresCol``) and the assigned cluster index (``predictionCol``).
    :param centers: DataFrame with a ``cluster`` column (cluster index) and a
        ``center`` column (center vector), one row per cluster.
    :param featuresCol: feature-vector column of ``dataset``; defaults to the
        scaled column the KMeans stage of this notebook clusters on.
    :param predictionCol: cluster-index column of ``dataset``.
    :return: ``dataset`` inner-joined with ``centers`` on the cluster index, plus
        a double column ``distance`` holding the Euclidean distance between the
        row's features and its center (null if either vector is null).
    """
    def _distance(point, center):
        if point is None or center is None:
            return None
        # squared_distance works for dense and sparse vectors alike; element-wise
        # arithmetic such as (x - y) is not defined on SparseVector.
        return float(np.sqrt(point.squared_distance(center)))

    distanceUdf = F.udf(_distance, DoubleType())
    return (dataset
            # The centers table has one row per cluster, so broadcasting it is cheap.
            .join(F.broadcast(centers),
                  on=dataset[predictionCol] == centers["cluster"],
                  how="inner")
            .withColumn("distance", distanceUdf(F.col(featuresCol), F.col("center"))))

# Assemble the feature columns into one vector, convert it, standardise it and cluster
# it with KMeans.
vectorizer = VectorAssembler(inputCols=featCols, outputCol="features")
con = convert(inputCol=vectorizer.getOutputCol(), outputCol="v2")
standardScale = StandardScaler(withMean=True, withStd=True,
                               inputCol=con.getOutputCol(), outputCol="scaledFeatures")
kmeans = KMeans(featuresCol=standardScale.getOutputCol(), predictionCol="prediction")

pipeline = Pipeline(stages=[vectorizer, con, standardScale, kmeans])

# One param map per candidate number of clusters, all with random initialisation.
# Kept as a tuple: Estimator.fit only fans out over a list/tuple of param maps.
paramMap = tuple({kmeans.k: k, kmeans.initMode: "random"} for k in (3, 4, 5))

# Build the models for the pipeline: fitting with a tuple of param maps returns a list
# holding one fitted PipelineModel per map, in the same order.
model = pipeline.fit(logDf, params=paramMap)

transformedDfs = [m.transform(logDf) for m in model]
# KMeans cost (sum of squared distances to the nearest center) of every candidate,
# evaluated on that candidate's own transformed data; this is the input for an elbow plot.
# NOTE: KMeansModel.computeCost is deprecated in Spark 2.4 and removed in Spark 3.0.
transformedModels = [m.stages[-1].computeCost(tdf) for m, tdf in zip(model, transformedDfs)]

# TODO: this simply takes the last candidate (largest k). Idea for next time: compute
# the best results for all models in the pipeline, then take the best pipeline out and
# build further on it.
bestModel = model[-1]

# Report the KMeans cost of every candidate.
print(transformedModels)
#transformed = bestModel.transform(logDf)
#rowCenters = [Row(cluster=i,center=Vectors.dense(k)) for i,k in enumerate(bestModel.stages[-1].clusterCenters())]
#centersDf = sc.parallelize(rowCenters).toDF()
#transformedCenter = computeAndInsertClusterCenter(transformed,centersDf).select("cvrNummer","prediction","distance").show(truncate=False)

# `model` is a list of PipelineModels, so take the KMeans stage from the chosen one.
km = bestModel.stages[-1]


class DistanceTransformation(Transformer, HasInputCol, HasOutputCol):
    """Transformer that appends each row's distance to its nearest cluster center.

    ``inputCol`` must hold the feature vectors the clustering model was fitted on.
    ``outputCol`` receives the Euclidean distance as a double (null for a null vector).
    ``model`` is a fitted clustering model exposing ``clusterCenters()``, e.g. a
    ``KMeansModel``. For KMeans the nearest center is the center of the assigned cluster.
    """

    @keyword_only
    def __init__(self, inputCol=None, outputCol=None, model=None):
        super(DistanceTransformation, self).__init__()
        # The fitted model is not a Spark Param, so it is kept as a plain attribute.
        self._model = None
        self.setParams(inputCol=inputCol, outputCol=outputCol, model=model)

    @keyword_only
    def setParams(self, inputCol=None, outputCol=None, model=None):
        """Set the input/output columns and, optionally, the fitted clustering model.

        Arguments left as ``None`` are not changed.

        :return: this instance
        """
        if model is not None:
            self._model = model
        # `model` cannot go through _set: it is not a declared Param of this class.
        supplied = {"inputCol": inputCol, "outputCol": outputCol}
        return self._set(**{name: value for name, value in supplied.items()
                            if value is not None})

    def _transform(self, dataset, model=None):
        """Return ``dataset`` with ``outputCol`` holding the nearest-center distance.

        :param dataset: DataFrame containing ``inputCol``.
        :param model: optional fitted model that overrides the one set on the instance.
        :raises ValueError: if no fitted model is available.
        """
        model = model if model is not None else self._model
        if model is None:
            raise ValueError("DistanceTransformation requires a fitted clustering model")
        # The centers are captured by the UDF closure and shipped to the executors.
        centers = [np.asarray(c, dtype=float) for c in model.clusterCenters()]

        def nearestDistance(vector):
            if vector is None:
                return None
            point = vector.toArray()
            return float(min(np.linalg.norm(point - center) for center in centers))

        distanceUdf = F.udf(nearestDistance, DoubleType())
        return dataset.withColumn(self.getOutputCol(), distanceUdf(F.col(self.getInputCol())))

# Candidate KMeans settings for a grid search. build() turns the builder into the list
# of param maps that CrossValidator / TrainValidationSplit expect as estimatorParamMaps.
paramGrid = (ParamGridBuilder()
             .addGrid(kmeans.k, [2, 4, 10])
             .addGrid(kmeans.initSteps, [3, 5, 10])
             .build())

# Create an estimator that picks, among several parameter maps, the model with the best
# evaluator metric (e.g. KMeans fitted for different k, for an elbow analysis).
class ElbowEvaluation(Estimator, ValidatorParams):
    """Fit one model per param map, score each with the evaluator, keep the best.

    Unlike ``TrainValidationSplit`` there is no train/validation split. Every candidate
    is fitted and evaluated on the full dataset, so the metrics stored on the returned
    :class:`ElbowEvaluationModel` are in-sample scores. There is one metric per entry of
    ``estimatorParamMaps``, in the same order.
    """

    # NOTE(review): not referenced anywhere in this class; kept so that existing
    # references to the attribute keep working.
    computeDistanceToCenterUdf = F.udf(lambda x,y: (x-y)*(x-y),VectorUDT())

    @keyword_only
    def __init__(self, estimator=None, estimatorParamMaps=None, evaluator=None):
        super(ElbowEvaluation, self).__init__()
        self.setParams(estimator=estimator, estimatorParamMaps=estimatorParamMaps,
                       evaluator=evaluator)

    @keyword_only
    def setParams(self, estimator=None, estimatorParamMaps=None, evaluator=None):
        """Set the estimator, its candidate param maps and the evaluator.

        Arguments left as ``None`` are not changed.

        :return: this instance
        """
        supplied = {"estimator": estimator,
                    "estimatorParamMaps": estimatorParamMaps,
                    "evaluator": evaluator}
        return self._set(**{name: value for name, value in supplied.items()
                            if value is not None})

    def _fit(self, dataset):
        """Fit and score every candidate and wrap the best one.

        :raises ValueError: if ``estimatorParamMaps`` is empty.
        """
        est = self.getOrDefault(self.estimator)
        epm = self.getOrDefault(self.estimatorParamMaps)
        eva = self.getOrDefault(self.evaluator)
        if not epm:
            raise ValueError("estimatorParamMaps must contain at least one param map")
        models = []
        metrics = []
        for paramMap in epm:
            model = est.fit(dataset, paramMap)
            models.append(model)
            metrics.append(eva.evaluate(model.transform(dataset, paramMap)))
        # The evaluator decides whether a higher or a lower metric is better.
        pick = np.argmax if eva.isLargerBetter() else np.argmin
        bestIndex = int(pick(metrics))
        # Every candidate was already fitted on the full dataset, so the best one is
        # reused rather than refitted with the same data and params.
        return self._copyValues(ElbowEvaluationModel(models[bestIndex], metrics))

    def copy(self, extra=None):
        """Create a copy of this instance with the same uid and some extra params.

        The embedded paramMap is deep-copied. The estimator and evaluator are copied
        with ``extra``, and the embedded and extra parameters are copied over.

        :param extra: Extra parameters to copy to the new instance
        :return: Copy of this instance
        """
        if extra is None:
            extra = dict()
        newInstance = super(ElbowEvaluation, self).copy(extra)
        if self.isSet(self.estimator):
            newInstance._set(estimator=self.getOrDefault(self.estimator).copy(extra))
        # estimatorParamMaps remain the same
        if self.isSet(self.evaluator):
            newInstance._set(evaluator=self.getOrDefault(self.evaluator).copy(extra))
        return newInstance

class ElbowEvaluationModel(Model, ValidatorParams):
    """Model produced by :class:`ElbowEvaluation`.

    Holds the best fitted model and the metric computed for every candidate param map.
    Transforming delegates to the best model.
    """

    def __init__(self, bestModel, validationMetrics=None):
        super(ElbowEvaluationModel, self).__init__()
        #: best model found by the evaluation
        self.bestModel = bestModel
        #: evaluated metric for each candidate param map
        # (None instead of a shared mutable [] default)
        self.validationMetrics = [] if validationMetrics is None else validationMetrics

    def _transform(self, dataset):
        return self.bestModel.transform(dataset)

    def copy(self, extra=None):
        """Create a copy of this model (with a newly generated uid) and some extra params.

        The underlying ``bestModel`` is copied with ``extra``, ``validationMetrics`` is
        copied shallowly, and the embedded and extra params are copied over.

        :param extra: Extra parameters to copy to the new instance
        :return: Copy of this instance
        """
        if extra is None:
            extra = dict()
        bestModel = self.bestModel.copy(extra)
        validationMetrics = list(self.validationMetrics)
        # Carry estimator/evaluator/param-map settings over to the copy as well.
        return self._copyValues(ElbowEvaluationModel(bestModel, validationMetrics), extra)