This notebook contains a pipeline method for bankruptcy prediction.


In [1]:
# start a fresh SQL session
sqlContext = sqlContext.newSession()

from pyspark.ml.feature import VectorAssembler, StandardScaler
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder, ValidatorParams
from pyspark.sql.types import StringType, StructField, StructType, ArrayType, DoubleType, IntegerType
from pyspark.ml.linalg import Vectors, VectorUDT, Matrix, MatrixUDT, DenseMatrix
from pyspark.ml.clustering import KMeans
from pyspark.ml.classification import LogisticRegression
from pyspark.sql import Row, Window, functions as F
from pyspark.ml import Pipeline, Transformer, Estimator, Model
from pyspark.ml.param import Params
from pyspark.ml.param.shared import HasInputCol, HasOutputCol, Param
from pyspark import keyword_only 
#from spark_sklearn import GridSearchCV,Converter
#from sklearn.cluster import KMeans as skKmeans
#from sklearn.linear_model import LogisticRegression as skLogistic

import pandas as pd

import re
import random
from prettytable import PrettyTable
import sys
from datetime import datetime
from operator import add
import numpy as np
import matplotlib.pyplot as plt
PATH = "/home/svanhmic/workspace/Python/Erhvervs/data/cdata/"
sc.addPyFile("/home/svanhmic/workspace/Python/Erhvervs/src/cvr/GridSearchLogRegAndKmeans.py")
sc.addPyFile("/home/svanhmic/workspace/Python/Erhvervs/src/cvr/ConvertAllToVecToMl.py")

import GridSearchLogRegAndKmeans
from ConvertAllToVecToMl import ConvertAllToVecToMl as convert



Import the data


In [22]:
#load the raw CVR feature data
df = sqlContext.read.parquet(PATH+"featureDataCvr")

#exclude some of the variables, and cast the remaining ones to double
excludeCols = ["medArb_"+str(i) for i in range(1,16)] # the employee-count (medarbejder) columns are not needed
includeCols = [i for i in df.columns if i not in excludeCols]
rankCols = [re.sub(pattern="rank_",repl="vaerdiSlope_",string=i) for i in includeCols]
finalCols = [F.col(i) for i in includeCols[:2]]+[F.col(i).cast("double") for i in includeCols[2:]]

renamedDf = (df
             .select(*finalCols)
             .select([F.col(val).alias(rankCols[idx]) for idx,val in enumerate(includeCols)])
            )


#renamedDf.show(3)
#renamedDf.printSchema()
#next cell: import the company name data frame

In [24]:
#import company names with CVR-number
windowSpecRank = Window.partitionBy(F.col("cvrNummer")).orderBy(F.col("gyldigFra").desc())
groupCols = ["cvrNummer","vaerdi"]

companyNameDf = (sqlContext
                 .read
                 .parquet(PATH+"companyCvrData")
                 .withColumn(colName="rank",col=F.rank().over(windowSpecRank))
                 .filter((F.col("rank")==1) & (F.col("sekvensnr")==0))
                 .select([F.col(i) for i in groupCols])
                 .withColumnRenamed(existing="vaerdi",new="navn")
                 .orderBy(F.col("cvrNummer"))
                 .cache()
                )

In [5]:
#build own transformer

In [23]:
labelCols = ["navn","cvrNummer","label","status"]
featCols = [i for i in companyNameDf.columns+renamedDf.columns if i not in labelCols]

#get minimum values from each column
minCols = [F.min(i).alias(i) for i in featCols]
minValsRdd = renamedDf.groupby().agg(*minCols).rdd
broadcastedmin = sc.broadcast(minValsRdd.first().asDict())

#build column expressions that subtract each column's minimum, so that log1p below always gets non-negative input
logColsSelected = [F.col(i).alias(i) for i in labelCols]+[(F.col(i)-F.lit(broadcastedmin.value[i])).alias(i) for i in featCols]

#apply log(x+1) to the shifted numeric columns and fill nulls with 0.0 (a tiny check of this idea follows after this cell)
logDf = (renamedDf
         .join(companyNameDf,(companyNameDf["cvrNummer"]==renamedDf["cvrNummer"]),"inner")
         .drop(companyNameDf["cvrNummer"])
         .select(*logColsSelected)
         .select([F.col(i).alias(i) for i in labelCols]+[F.log1p(F.col(i)).alias(i) for i in featCols])
         .distinct()
         .na
         .fill(0.0,featCols)
         .cache()
        )
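
A tiny illustrative check of the shift-then-log1p idea above, on throwaway data (not part of the pipeline):

In [ ]:
#illustration only: shifting by the column minimum makes every value non-negative before log1p
toy = sqlContext.createDataFrame([(-3.0,), (0.0,), (7.0,)], ["x"])
minX = toy.groupBy().agg(F.min("x").alias("x")).first()["x"]
toy.select(F.log1p(F.col("x") - F.lit(minX)).alias("log_shifted_x")).show()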

In [131]:
def computeAndInsertClusterCenter(dataset,centers):
    '''
        Join each row onto its cluster centre and add a Euclidean
        "distance" column; also drops the intermediate feature columns.
    '''
    
    distanceUdf = F.udf(lambda x,y: float(np.sqrt(np.sum((x-y)*(x-y)))),DoubleType())
    
    return (dataset
            .join(F.broadcast(centers),on=(dataset["prediction"]==centers["cluster"]),how="inner")
            .withColumn(colName="distance",col=distanceUdf(F.col("scaledFeatures"),F.col("center")))
            .drop("cluster")
            .drop("features")
            .drop("v2")
            )
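
A hedged usage sketch for the helper above; `km` (a fitted KMeansModel) and `predictedDf` (a DataFrame it has transformed, containing scaledFeatures and prediction) are hypothetical names:

In [ ]:
#hypothetical usage: km is a fitted KMeansModel, predictedDf its transformed output
rowCenters = [Row(cluster=i, center=Vectors.dense(c)) for i, c in enumerate(km.clusterCenters())]
centersDf = sc.parallelize(rowCenters).toDF()
withDistanceDf = computeAndInsertClusterCenter(predictedDf, centersDf)
withDistanceDf.select("cvrNummer", "prediction", "distance").show(3)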

In [138]:
vectorizer = VectorAssembler(inputCols=featCols,outputCol="features")
con = convert(inputCol=vectorizer.getOutputCol(),outputCol="v2")
standardScale = StandardScaler(withMean=True,withStd=True,inputCol=con.getOutputCol(),outputCol="scaledFeatures")
kmeans = KMeans(featuresCol=standardScale.getOutputCol(),predictionCol="prediction")

pipeline = Pipeline(stages=[vectorizer,con,standardScale,kmeans])

paramMap = ({kmeans.k: 3,kmeans.initMode:"random"}
            ,{kmeans.k: 4,kmeans.initMode:"random"}
            ,{kmeans.k: 5,kmeans.initMode:"random"})

In [139]:
#fit returns one PipelineModel per ParamMap, i.e. a list of models
model = pipeline.fit(logDf,params=paramMap)

In [164]:
#transform the data with each fitted pipeline and compute each KMeans model's within-cluster cost (WSSSE)
transformedDfs = [m.transform(logDf) for m in model]
clusterCosts = [v.stages[-1].computeCost(transformedDfs[i]) for i,v in enumerate(model)]

In [166]:
clusterCosts
bestModel = model[-1]

#Idea for next time: compute the metric for every model in the pipeline sweep, then take the best pipeline and build on it (see the sketch below).
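
A minimal sketch of that selection, using the costs computed above (note: the elbow criterion normally picks the bend in the cost curve by eye, not simply the minimum):

In [ ]:
#pick the fitted pipeline with the lowest within-cluster cost
bestIndex = int(np.argmin(clusterCosts))
bestModel = model[bestIndex]
print("best k:", paramMap[bestIndex][kmeans.k], "cost:", clusterCosts[bestIndex])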

In [168]:
type(pipeline)


Out[168]:
pyspark.ml.pipeline.Pipeline

In [150]:
print([m.stages[-1].computeCost(df) for m, df in zip(model, transformedDfs)])  # same values as clusterCosts above
#transformed = model.transform(logDf)
#rowCenters = [Row(cluster=i,center=Vectors.dense(k)) for i,k in list(enumerate(model.stages[-1].clusterCenters()))]
#centersDf = sc.parallelize(rowCenters).toDF()
#transformedCenter = computeAndInsertClusterCenter(transformed,centersDf)
#transformedCenter.select("cvrNummer","prediction","distance").show(truncate=False)



In [66]:
km = model[0].stages[-1]
km
#km.computeCost(transformed)


Out[66]:
KMeans_4423b3fa5e143c7a1fd3

In [41]:
class DistanceTransformation(Transformer,HasInputCol,HasOutputCol):
    '''
    Appends the Euclidean distance between each row's feature vector
    (inputCol) and the centre of the cluster it was assigned to.
    The fitted KMeansModel is passed in through `model`.
    '''
    @keyword_only
    def __init__(self, inputCol=None, outputCol=None, model=None):
        super(DistanceTransformation, self).__init__()
        self.model = model  # fitted KMeansModel supplying the cluster centres
        kwargs = self.__init__._input_kwargs
        kwargs.pop("model", None)  # model is kept as a plain attribute, not a Param
        self.setParams(**kwargs)

    @keyword_only
    def setParams(self, inputCol=None, outputCol=None):
        kwargs = self.setParams._input_kwargs
        return self._set(**kwargs)

    def _transform(self, dataset):
        # one row per cluster centre, joined onto the predictions below
        rowCenters = [Row(cluster=i, center=Vectors.dense(c))
                      for i, c in enumerate(self.model.clusterCenters())]
        centers = sc.parallelize(rowCenters).toDF()

        distanceUdf = F.udf(lambda x,y: float(np.sqrt(np.sum((x-y)*(x-y)))),DoubleType())

        return (dataset
                .join(F.broadcast(centers),on=(dataset["prediction"]==centers["cluster"]),how="inner")
                .withColumn(colName=self.getOutputCol(),col=distanceUdf(F.col(self.getInputCol()),F.col("center")))
                .drop("cluster")
                .drop("features")
                .drop("v2")
                )
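
A hedged usage sketch for the transformer above; the choice of model[0] and transformedDfs[0] from the earlier cells is just illustrative.

In [ ]:
#sketch: attach a distance-to-centre column using one of the fitted pipelines from above
kmModel = model[0].stages[-1]  # a fitted KMeansModel
distancer = DistanceTransformation(inputCol="scaledFeatures", outputCol="distance", model=kmModel)
distancedDf = distancer.transform(transformedDfs[0])
distancedDf.select("cvrNummer", "prediction", "distance").show(3, truncate=False)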

In [40]:
print(getCenters(0))  # getCenters is assumed to come from the GridSearchLogRegAndKmeans helper loaded above

paramGrid = ParamGridBuilder() \
    .addGrid(kmeans.k, [2, 4, 10]) \
    .addGrid(kmeans.initSteps, [3,5,10]) \
    .build()


[ 1.24316123  1.29036901  1.43017047  1.57093235  1.68765505  1.79981332
  1.89225804  1.96319964  2.00468028  2.00085553  1.96622339  1.90925039
  1.81417976  1.67450273  1.56527918  1.07171302  1.19343275  1.35238984
  1.49336364  1.62851051  1.75304658  1.85680779  1.93851861  1.99426609
  2.00190868  1.97999797  1.92952839  1.83842153  1.70008073  1.59322725
  0.0058728   0.29533131  0.53410141  0.77398575  0.71721803  0.60442868
  0.50440329  0.41001621  0.34144952  0.28307943  0.0791012 ]

In [9]:
#create an unsupervised model-selection estimator: an elbow-style sweep over ParamMaps
class ElbowEvaluation(Estimator,ValidatorParams):
    '''
        Fits the wrapped estimator once per ParamMap on the full dataset,
        evaluates each fitted model with the given evaluator, and keeps the
        best one (e.g. an elbow-style sweep over the number of clusters k).
    '''
    
    @keyword_only
    def __init__(self, estimator=None, estimatorParamMaps=None, evaluator=None,
                 seed=None):
        super(ElbowEvaluation, self).__init__()
        kwargs = self.__init__._input_kwargs
        self._set(**kwargs)
    
    @keyword_only
    def setParams(self, estimator=None, estimatorParamMaps=None, evaluator=None):
        kwargs = self.setParams._input_kwargs
        return self._set(**kwargs)
    
    def _fit(self, dataset):
        est = self.getOrDefault(self.estimator)
        epm = self.getOrDefault(self.estimatorParamMaps)
        numModels = len(epm)
        eva = self.getOrDefault(self.evaluator)
        metrics = [0.0] * numModels

        # fit one model per ParamMap on the full dataset and score it with the evaluator
        for j in range(numModels):
            model = est.fit(dataset, epm[j])
            metric = eva.evaluate(model.transform(dataset, epm[j]))
            metrics[j] += metric
        if eva.isLargerBetter():
            bestIndex = np.argmax(metrics)
        else:
            bestIndex = np.argmin(metrics)
        bestModel = est.fit(dataset, epm[bestIndex])
        # ElbowEvaluationModel is defined in the next cell; run that cell first
        return self._copyValues(ElbowEvaluationModel(bestModel, metrics))
    
    def copy(self, extra=None):
        """
        Creates a copy of this instance with a randomly generated uid
        and some extra params. This creates a deep copy of
        the embedded paramMap, and copies the embedded and extra parameters over.

        :param extra: Extra parameters to copy to the new instance
        :return: Copy of this instance
        """
        if extra is None:
            extra = dict()
        newElbow = Params.copy(self, extra)
        if self.isSet(self.estimator):
            newElbow._set(estimator=self.getEstimator().copy(extra))
        # estimatorParamMaps remain the same
        if self.isSet(self.evaluator):
            newElbow._set(evaluator=self.getEvaluator().copy(extra))
        return newElbow

In [ ]:
class ElbowEvaluationModel(Model, ValidatorParams):
    """
    .. note:: Experimental

    Model from train validation split.

    .. versionadded:: 2.0.0
    """

    def __init__(self, bestModel, validationMetrics=None):
        super(ElbowEvaluationModel, self).__init__()
        #: best model from the ParamMap sweep
        self.bestModel = bestModel
        #: metric obtained for each ParamMap
        self.validationMetrics = validationMetrics if validationMetrics is not None else []

    def _transform(self, dataset):
        return self.bestModel.transform(dataset)

    def copy(self, extra=None):
        """
        Creates a copy of this instance with a randomly generated uid
        and some extra params. This copies the underlying bestModel,
        creates a deep copy of the embedded paramMap, and
        copies the embedded and extra parameters over.
        This also creates a shallow copy of the validationMetrics.

        :param extra: Extra parameters to copy to the new instance
        :return: Copy of this instance
        """
        if extra is None:
            extra = dict()
        bestModel = self.bestModel.copy(extra)
        validationMetrics = list(self.validationMetrics)
        return ElbowEvaluationModel(bestModel, validationMetrics)
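
A hedged end-to-end sketch of how the two classes above could be combined with the pipeline and paramGrid defined earlier. It assumes pyspark.ml.evaluation.ClusteringEvaluator is available (Spark 2.3+); on older versions a custom evaluator would be needed, and note that depending on the Spark version keyword_only stores the captured kwargs either on the wrapped function (self.__init__._input_kwargs, as used above) or on the instance (self._input_kwargs).

In [ ]:
#sketch only: sweep the k grid with ElbowEvaluation and keep the best pipeline
from pyspark.ml.evaluation import ClusteringEvaluator  # Spark >= 2.3 (assumption)

evaluator = ClusteringEvaluator(featuresCol="scaledFeatures", predictionCol="prediction")
elbow = ElbowEvaluation(estimator=pipeline, estimatorParamMaps=paramGrid, evaluator=evaluator)
elbowModel = elbow.fit(logDf)
print(elbowModel.validationMetrics)        # one silhouette score per ParamMap
clusteredDf = elbowModel.transform(logDf)  # transform with the best pipeline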