In [1]:
sqlContext = sqlContext.newSession()  # start a fresh SQL session so cached tables/configs don't leak in
from pyspark.sql import functions as F
from pyspark.accumulators import AccumulatorParam
from pyspark.ml.feature import VectorAssembler, StandardScaler
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.sql.types import StringType,StructField,StructType,ArrayType,DoubleType
from pyspark.ml.linalg import Vectors, VectorUDT,Matrix,MatrixUDT,DenseMatrix
from pyspark.ml.clustering import KMeans
from pyspark.sql import Row
from pyspark.sql import Window
import re
import random
from prettytable import PrettyTable
import sys
from datetime import datetime
from operator import add
import numpy as np
import matplotlib.pyplot as plt
from sklearn.metrics import silhouette_score,silhouette_samples
#from spark_sklearn import GridSearchCV,Converter
PATH = "/home/svanhmic/workspace/Python/Erhvervs/data/cdata/"
regnskabsPath= "/home/svanhmic/workspace/Python/Erhvervs/data/regnskabsdata/sparkdata/parquet"
The following data frame contains the same data as in the bankruptcy and outlier analyses.
In [2]:
#import data and rename bad name rank into vaerdiSlope
cvrDf = sqlContext.read.parquet(PATH+"featureDataCvr")
regnDf = sqlContext.read.parquet(regnskabsPath+"/formatAndCleanedRegnskaber.parque")
rankCols = [re.sub(pattern="rank_",repl="vaerdiSlope_",string=i) for i in cvrDf.columns ]
renamedDf = (cvrDf
    .withColumn("reklamebeskyttet", F.col("reklamebeskyttet").cast("integer"))
    .select([F.col(val).alias(rankCols[idx]) for idx, val in enumerate(cvrDf.columns)])
)
#renamedDf.show()
In [3]:
#regnDf.show()
In [4]:
#regnDf.printSchema()
dcolsRegnDict = dict(regnDf.dtypes)
print(regnDf.count())
#regnDf.describe().show()
# keep only the numeric columns (everything that is not a string or a date) as candidate features
doubleFeatures = [col for col, dtype in dcolsRegnDict.items() if dtype not in ("string", "date")]
len(doubleFeatures)
Out[4]:
In [6]:
import matplotlib.pyplot as plt
import pandas
from pandas.plotting import scatter_matrix  # pandas.tools.plotting was removed in newer pandas
import seaborn as sb
In [7]:
doublePDf = (regnDf
    .select(doubleFeatures)
    .toPandas())  # collect the numeric columns into a local pandas DataFrame
#print(np.log1p(doublePDf["Assets"])+100)
loggedPDF = doublePDf.apply(np.log1p)
describePandasDf = loggedPDF.describe()
# keep only the columns with more than 8000 non-null observations, excluding the id columns
counts = describePandasDf.filter(like="count", axis=0).transpose()
above = counts[(counts > 8000).any(axis=1)]
columns = [i for i in above.index.values if i not in ("id", "IdentificationNumberCvrOfReportingEntity", "IdentificationNumberCvrOfSubmittingEnterprise")]
columns
Out[7]:
In [53]:
scatter_matrix(loggedPDF[columns],alpha=0.1,figsize=(20,20))
plt.show()
Assets and LiabilitiesAndEquity are highly correlated, as the heatmap below also shows.
ShorttermLiabilitiesOtherThanProvisions, OtherShorttermPayables, and LiabilitiesOtherThanProvisions are correlated to some degree, as are Equity and LiabilitiesAndEquity.
In [54]:
corrMatrixPDf = loggedPDF[columns].corr()
sb.heatmap(corrMatrixPDf,
xticklabels=corrMatrixPDf.columns.values,
yticklabels=corrMatrixPDf.columns.values)
plt.show()
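Because Assets and LiabilitiesAndEquity are near-duplicates, one column of each highly correlated pair could be dropped before clustering. A minimal sketch of that pruning, reusing the corrMatrixPDf computed above; the 0.95 cutoff is an assumption, not a value from the original analysis:
In [ ]:
# Hedged sketch: drop one column of each pair whose absolute correlation
# exceeds a chosen threshold. The 0.95 cutoff is an illustrative assumption.
threshold = 0.95
upperTriangle = corrMatrixPDf.where(np.triu(np.ones(corrMatrixPDf.shape), k=1).astype(bool))
toDrop = [c for c in upperTriangle.columns if (upperTriangle[c].abs() > threshold).any()]
reducedColumns = [c for c in columns if c not in toDrop]
print("candidates to drop:", toDrop)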
In [10]:
#renamedDf.printSchema()
In [11]:
holdOutCols = ["cvrNummer","status","label"]
inputCols = [i for i in renamedDf.columns if i not in holdOutCols]
# VectorAssembler can emit sparse vectors; centering (withMean=True) requires dense ones
toDenseUdf = F.udf(lambda x: Vectors.dense(x.toArray()), VectorUDT())
vectorMaker = VectorAssembler(inputCols=inputCols, outputCol="features")
vectorDf = (vectorMaker
    .transform(renamedDf.na.fill(0.0))
    .select(toDenseUdf(F.col("features")).alias("features"))
)
#vectorDf.show()
standardScale = StandardScaler(withMean=True, withStd=True,
                               inputCol=vectorMaker.getOutputCol(),
                               outputCol="scaledFeatures")
standardScaleModel = standardScale.fit(vectorDf)
standardDf = (standardScaleModel
    .transform(vectorDf)
    .select(F.col("scaledFeatures"))
)
#standardDf.show()
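The scaled features are what the KMeans imported above would consume. A minimal sketch of fitting one model and scoring it with scikit-learn's silhouette_score on a local sample; k=5, the 10% sample fraction, and the seed are illustrative assumptions:
In [ ]:
# Hedged sketch: cluster the scaled features and compute a silhouette score.
# k=5, the sample fraction, and the seed are assumptions for illustration.
kmeans = KMeans(k=5, featuresCol="scaledFeatures", predictionCol="prediction")
kmeansModel = kmeans.fit(standardDf)
clusteredDf = kmeansModel.transform(standardDf)

# silhouette_score is a scikit-learn function, so collect a local sample
samplePDf = clusteredDf.sample(False, 0.1, seed=42).toPandas()
X = np.array([v.toArray() for v in samplePDf["scaledFeatures"]])
print(silhouette_score(X, samplePDf["prediction"]))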
In [12]:
FeatureRow = Row(*inputCols)
standardPDf = (standardDf
    .rdd
    .map(lambda x: dict(zip(inputCols, [float(i) for i in x[0].toArray()])))
    .toDF()
    .toPandas()
)
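The RDD round-trip above works on any Spark version; on Spark 3.0+ the same conversion can stay inside the DataFrame API. A sketch assuming that version (pyspark.ml.functions.vector_to_array does not exist before 3.0):
In [ ]:
# Hedged alternative, assuming Spark >= 3.0 where vector_to_array is available.
from pyspark.ml.functions import vector_to_array

standardPDfAlt = (standardDf
    .withColumn("arr", vector_to_array("scaledFeatures"))
    .select([F.col("arr")[i].alias(name) for i, name in enumerate(inputCols)])
    .toPandas()
)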
In [151]:
print(standardPDf.head(4))
corrMatrixPDf = standardPDf.corr()
sb.heatmap(corrMatrixPDf,
xticklabels=corrMatrixPDf.columns.values,
yticklabels=corrMatrixPDf.columns.values)
plt.show()
In [161]:
medArbCols = ["medArb_"+str(i) for i in range(1,6)]
AarsVaerCols = ["AarsVaerk_"+str(i) for i in range(1,6)]
medarbejdPDf = standardPDf.loc[:, medArbCols + AarsVaerCols]  # .ix is deprecated; use .loc
corrMatrixPDf = medarbejdPDf.corr()
sb.heatmap(corrMatrixPDf,
xticklabels=corrMatrixPDf.columns.values,
yticklabels=corrMatrixPDf.columns.values)
plt.show()
In [228]:
#plot scatter matrix
#scatter_matrix(medarbejdPDf,alpha=0.1,figsize=(20,20))
#plt.show()
In [207]:
def computeRatio(aarsvaerk, medarbejd):
    """Log-scaled ratio of man-years (aarsvaerk) to employee count (medarbejd)."""
    if (aarsvaerk is None) or (medarbejd is None):
        return 0.0
    elif medarbejd == 0:
        # avoid division by zero: treat zero registered employees as 0.5
        return float(np.log1p(aarsvaerk / 0.5))
    else:
        return float(np.log1p(aarsvaerk / medarbejd))

computeAarsMedRatioUdf = F.udf(computeRatio, DoubleType())
ratioCols = [computeAarsMedRatioUdf(AarsVaerCols[i], medArbCols[i]).alias("ArsOverMedarbj" + str(i + 1))
             for i in range(len(AarsVaerCols))]
aarsVaerkToMedarbjdDf = renamedDf.select(holdOutCols + ratioCols).distinct()
aarsVaerkToMedarbjdDf.orderBy(["ArsOverMedarbj" + str(i + 1) for i in range(5)], ascending=False).show()
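A quick local sanity check of computeRatio's three branches, with made-up values:
In [ ]:
# Illustrative values only: exercise each branch of computeRatio.
print(computeRatio(None, 3))   # missing input  -> 0.0
print(computeRatio(4.0, 0))    # zero employees -> log1p(4.0 / 0.5)
print(computeRatio(4.0, 2.0))  # ordinary ratio -> log1p(2.0)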
In [224]:
#check if some cvrs are overlapping
#check whether any cvr numbers overlap between the CVR features and the financial statements
overlapDf = (renamedDf
    .join(regnDf, renamedDf["cvrNummer"] == regnDf["IdentificationNumberCvrOfReportingEntity"], "inner")
    .select(renamedDf.columns + doubleFeatures)
)
overlapDf.show(1)
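Showing a single row confirms the join runs, but it does not quantify the overlap; counting the distinct matching CVR numbers does:
In [ ]:
# Count how many distinct CVR numbers appear in both data sets.
print(overlapDf.select("cvrNummer").distinct().count())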
In [ ]:
overlapDf