In [1]:
#import sys
#sys.path.append("/home/svanhmic/workspace/Python/Erhvervs/src/cvr")
sqlContext.newSession()
from pyspark.sql import functions as F
from pyspark.accumulators import AccumulatorParam
from pyspark.ml.feature import VectorAssembler, StandardScaler
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.sql.types import StringType,StructField,StructType,ArrayType,DoubleType
from pyspark.ml.linalg import Vectors, VectorUDT,Matrix,MatrixUDT,DenseMatrix
from pyspark.ml.clustering import KMeans
from pyspark.sql import Row
from pyspark.sql import Window
from spark_sklearn import GridSearchCV,Converter
from sklearn.cluster import KMeans as skKmeans
from sklearn.linear_model import LogisticRegression as skLogistic
from pyspark.mllib.linalg import Vectors as oVector, VectorUDT as oVectorUDT
import pandas as pd
from IPython.display import display
import re
import random
from prettytable import PrettyTable
import sys
from datetime import datetime
from operator import add
import numpy as np
import matplotlib.pyplot as plt
PATH = "/home/svanhmic/workspace/Python/Erhvervs/data/cdata/"
sc.addPyFile("/home/svanhmic/workspace/Python/Erhvervs/src/cvr/GridSearchLogRegAndKmeans.py")
import GridSearchLogRegAndKmeans
In [2]:
# import data and rename the badly named 'rank_' columns to 'vaerdiSlope_'
df = (sqlContext
.read
.parquet(PATH+"featureDataCvr")
)
rankCols = [re.sub(pattern="rank_",repl="vaerdiSlope_",string=i) for i in df.columns ]
renamedDf = (df
.withColumn(colName="reklamebeskyttet",col=F.col("reklamebeskyttet").cast("integer"))
.select([F.col(val).alias(rankCols[idx]) for idx,val in enumerate(df.columns)])
)
renamedDf.show()
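The bulk-rename pattern above (re.sub over the column names, then alias everything in a single select) in miniature, on a hypothetical two-column frame:
In [ ]:
# minimal sketch of the rename pattern; toyDf and its columns are made up for illustration
toyDf = sqlContext.createDataFrame([Row(rank_1=1.0, other=2.0)])
newNames = [re.sub(pattern="rank_", repl="vaerdiSlope_", string=c) for c in toyDf.columns]
toyDf.select([F.col(c).alias(newNames[i]) for i, c in enumerate(toyDf.columns)]).show()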
In [3]:
def getAllDistances(matrix1, matrix2):
    # pairwise Euclidean distances between the rows of matrix2 and the rows of matrix1
    return [[np.linalg.norm(v - w) for v in matrix1] for w in matrix2]

def createPrettyTable(matrix, labels, names=None):
    # render a matrix as a PrettyTable: one column per label, one row per matrix entry
    try:
        x = PrettyTable(["v"] + labels)
    except TypeError:
        # labels was not a list (e.g. a range), so coerce it
        x = PrettyTable(["v"] + list(labels))
    if names is not None:
        zippedMatrix = zip(names, matrix)
    else:
        zippedMatrix = enumerate(matrix)
    for i in labels:
        x.align[str(i)] = "l"
    for idx, v in zippedMatrix:
        x.add_row([str(idx)] + ['{:.7}'.format(str(i)) for i in v])
    return x
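A quick sanity check of the two helpers, using made-up 2-D points (purely illustrative):
In [ ]:
# toy example: pairwise distances between three hypothetical 2-D centroids
toyCenters = [np.array([0.0, 0.0]), np.array([3.0, 4.0]), np.array([6.0, 8.0])]
toyDists = getAllDistances(toyCenters, toyCenters)  # 3x3 matrix of Euclidean distances
print(createPrettyTable(toyDists, [str(i) for i in range(len(toyDists))]))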
In [5]:
# import the company name data frame and keep only the latest name per cvrNummer
windowSpecRank =(Window.partitionBy(F.col("cvrNummer"))).orderBy(F.col("periode_gyldigFra").desc())
groupCols = ["cvrNummer","vaerdi"]
companyNameDf = (sqlContext
.read
.parquet(PATH+"companyCvrData")
.withColumn(colName="rank",col=F.rank().over(windowSpecRank))
.filter((F.col("rank")==1) & (F.col("sekvensnr")==0))
.select([F.col(i) for i in groupCols])
.withColumnRenamed(existing="vaerdi",new="navn")
.orderBy(F.col("cvrNummer"))
.cache()
)
#companyNameDf.show()
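The rank-over-window pattern used above (keep only the most recent record per cvrNummer) in miniature, with made-up rows:
In [ ]:
# minimal sketch, hypothetical data: keep the latest 'vaerdi' per cvrNummer
toyRows = [Row(cvrNummer=1, vaerdi="Old name", periode_gyldigFra="2010-01-01"),
           Row(cvrNummer=1, vaerdi="New name", periode_gyldigFra="2015-01-01")]
toyWindow = Window.partitionBy(F.col("cvrNummer")).orderBy(F.col("periode_gyldigFra").desc())
(sqlContext.createDataFrame(toyRows)
 .withColumn("rank", F.rank().over(toyWindow))
 .filter(F.col("rank") == 1)
 .show())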
In [5]:
# take ln(x+1) of the (min-shifted) features
# Deprecated!
labelCols = ["navn","cvrNummer","label","status"]
featCols = [i for i in companyNameDf.columns+renamedDf.columns if i not in labelCols]
minCols = [F.min(i).alias(i) for i in featCols]
minValsRdd = renamedDf.groupby().agg(*minCols).rdd
broadcastedmin = sc.broadcast(minValsRdd.first().asDict())
logColsSelected = [F.col(i).alias(i) for i in labelCols]+[(F.col(i)-F.lit(broadcastedmin.value[i])).alias(i) for i in featCols]
logDf = (renamedDf
.join(companyNameDf,(companyNameDf["cvrNummer"]==renamedDf["cvrNummer"]),"inner")
.drop(companyNameDf["cvrNummer"])
.select(*logColsSelected)
.select([F.col(i).alias(i) for i in labelCols]+[F.log1p(F.col(i)).alias(i) for i in featCols])
.distinct()
.na
.fill(0.0,featCols) # note: nulls are set to zero here; a company has zero capital (kapital) if it has not yet opened
.cache()
)
In [6]:
# First convert the features to a vector column and scale it to zero mean and unit standard deviation
toDenseUDf = F.udf(lambda x: Vectors.dense(x.toArray()),VectorUDT())
vectorizer = VectorAssembler(inputCols=featCols,outputCol="features")
rawVectorDataDf = (vectorizer.transform(renamedDf
.join(companyNameDf,(companyNameDf["cvrNummer"]==renamedDf["cvrNummer"]),"inner")
.drop(companyNameDf["cvrNummer"])
.na
.fill(0.0,featCols)
.distinct()
)
.select(labelCols+[toDenseUDf(vectorizer.getOutputCol()).alias(vectorizer.getOutputCol())])
)
standardScale = StandardScaler(withMean=True,withStd=True,inputCol=vectorizer.getOutputCol(),outputCol="scaledFeatures")
standardScaleModel = standardScale.fit(rawVectorDataDf)
scaledFeaturesDf = (standardScaleModel
.transform(rawVectorDataDf)
.drop("features")
.withColumnRenamed(existing="scaledFeatures",new="features")
.filter(F.col("label") != 2)
)
scaledFeaturesDf.show()
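For reference, a minimal sketch (toy vectors, made-up values) of what StandardScaler with withMean/withStd does to a vector column:
In [ ]:
# toy example: after scaling, each feature has mean 0 and unit (sample) standard deviation
toyVecDf = sqlContext.createDataFrame([Row(features=Vectors.dense([1.0, 10.0])),
                                       Row(features=Vectors.dense([3.0, 30.0]))])
toyScalerModel = StandardScaler(withMean=True, withStd=True,
                                inputCol="features", outputCol="scaled").fit(toyVecDf)
toyScalerModel.transform(toyVecDf).show(truncate=False)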
In [7]:
# show new summary statistics
stats = [standardScaleModel.mean,standardScaleModel.std]
statsShow = createPrettyTable(matrix=stats,labels=featCols,names=["mean","std"])
print(statsShow)
In [8]:
#dfDescriptionDf = renamedDf.describe()
In [9]:
def printDf(df):
    # NOTE: this method is narrowly tailored to this particular problem.
    # TODO: create a general method for printing summary statistics.
    df.select([F.col("summary")]+[F.col("AarsVaerk_"+str(i)) for i in range(1,6)]).show()
    df.select([F.col("summary")]+[F.col("AarsVaerk_"+str(i)) for i in range(6,11)]).show()
    df.select([F.col("summary")]+[F.col("AarsVaerk_"+str(i)) for i in range(10,16)]).show()
    df.select([F.col("summary")]+[F.col("medArb_"+str(i)) for i in range(1,7)]).show()
    df.select([F.col("summary")]+[F.col("medArb_"+str(i)) for i in range(7,12)]).show()
    df.select([F.col("summary")]+[F.col("medArb_"+str(i)) for i in range(11,16)]).show()
    df.select([F.col("summary")]+[F.col("vaerdiSlope_"+str(i)) for i in range(1,6)]).show()
    df.select([F.col("summary")]+[F.col("vaerdiSlope_"+str(i)) for i in range(6,8)]).show()
    cols = ["avgVarighed","totalAabneEnheder","totalLukketEnheder","reklamebeskyttet"]
    df.select([F.col("summary")]+[F.col(i) for i in cols]).show()
In [ ]:
In [ ]:
Initial analysis completed; now it's time to apply machine learning!
In [ ]:
In [10]:
def getConfusion(label, prediction, outlier):
    # map a (label, prediction, outlier-flag) triple to a confusion-matrix cell
    if outlier > 0:
        # check the outlier flag first, otherwise these rows would be counted as TP/TN/FP/FN
        return "Excluded"
    diff = abs(label - prediction)
    if (diff == 0) and (label == 0):
        return "TN"
    elif (diff == 0) and (label == 1):
        return "TP"
    elif (diff == 1) and (label == 0):
        return "FP"
    elif (diff == 1) and (label == 1):
        return "FN"

subUdf = F.udf(lambda x, y, z: getConfusion(x, y, z), StringType())

def computeConfusion(df):
    # pivot the per-row confusion labels into a single row of counts and compute the accuracy
    cols = [F.col(i) for i in ("cvrNummer", "label", "predictionLogReg")]
    return (df
            .select(*cols, subUdf(df["label"], df["predictionLogReg"], df["isOutlier"]).alias("diff"))
            .groupby().pivot("diff", ["TP", "TN", "FN", "FP", "Excluded"])
            .count()
            .withColumn(colName="Accuracy",
                        col=(F.col("TP")+F.col("TN"))/(F.col("TP")+F.col("TN")+F.col("FN")+F.col("FP")))
            )

def showStats(df):
    accuracyDf = computeConfusion(df)
    accuracyDf.show()
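A quick check of the confusion mapping in plain Python (here label 1 is taken to mean a bankrupt company):
In [ ]:
print(getConfusion(1, 0, 0))  # bankrupt company predicted healthy -> FN
print(getConfusion(0, 1, 0))  # healthy company predicted bankrupt -> FP
print(getConfusion(1, 1, 1))  # flagged as outlier -> Excluded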
In [11]:
def runner(df, **kwargs):
    '''
    Computes the accuracy for each iteration of the combined k-means / logistic-regression method.
    Returns a tuple (sparkDf, totalDf): the data frame after the final iteration, segmented into
    k clusters, and a data frame with the confusion counts and accuracy per iteration.
    '''
    if "isOutlier" not in df.columns:
        df = df.withColumn(colName="isOutlier", col=F.lit(0))
    dfs = GridSearchLogRegAndKmeans.createPandasDf(sc, df, "features", "cvrNummer", "label", "isOutlier")
    iteration = kwargs.get("i", 2)
    labelCol = kwargs.get("labelCol", "label")
    idCol = kwargs.get("idCol", "cvrNummer")
    featuresCol = kwargs.get("featuresCol", "features")
    threshold = kwargs.get("threshold", 0.005)
    dfsI = dfs
    for i in range(1, iteration):
        dfsI = (GridSearchLogRegAndKmeans
                .onePass(sc, dfsI, {'n_clusters': (8, 10), }, featuresCol, idCol, labelCol, i, threshold)
                )
        sparkDf = sqlContext.createDataFrame(dfsI)
        if i == 1:
            totalDf = (computeConfusion(sparkDf)
                       .withColumn(col=F.lit(i), colName="iteration")
                       )
        else:
            totalDf = (totalDf
                       .unionAll(computeConfusion(sparkDf).withColumn(col=F.lit(i), colName="iteration"))
                       )
    return (sparkDf, totalDf)
In [12]:
#print(dfs)
segmentedDf,statsDf = runner(scaledFeaturesDf,i=5)
In [ ]:
In [13]:
statsDf.show()
In [14]:
segmentedDf.show()
In [ ]:
In [ ]:
In [ ]:
In [ ]:
In [4]:
# bModel is assumed to be a fitted KMeansModel from an earlier clustering run (not defined in this section)
vec = [Row(cluster=i, center=Vectors.dense(v)) for i, v in enumerate(bModel.clusterCenters())]
SpDf = sqlContext.createDataFrame(data=vec)
#SpDf.show(truncate=False)
# compute the per-feature and total squared distance between each data point and its cluster center
featureContributionUdf = F.udf(lambda x, y: (x-y)*(x-y), VectorUDT())
sqrtUdf = F.udf(lambda x, y: float(Vectors.norm(vector=x-y, p=2)), DoubleType())
# predictDf is assumed to hold the clustered points with "features", "label" and "prediction" columns
joinedDf = (predictDf
            .withColumn(colName="features", col=toDenseUDf(F.col("features")))
            .join(SpDf, on=(predictDf["prediction"]==SpDf["cluster"]), how="left")
            .drop(SpDf["cluster"])
            .withColumn(colName="contribution", col=featureContributionUdf(F.col("features"), F.col("center")))
            .withColumn(colName="distance", col=sqrtUdf(F.col("features"), F.col("center")))
            .drop("centers")  # no-op: the column is named "center" and is kept for the cells below
            .drop("features")
            .orderBy(F.col("distance").desc())
            )
joinedDf.show(truncate=True)
#joinedDf.printSchema()
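The "contribution" column is the element-wise squared difference between a point and its center, so its components sum to the squared Euclidean distance. A small illustration with made-up numbers:
In [ ]:
p = Vectors.dense([1.0, 2.0, 3.0])  # a hypothetical data point
c = Vectors.dense([0.0, 2.0, 5.0])  # its (made-up) cluster center
contribution = (p - c) * (p - c)    # per-feature contribution: [1.0, 0.0, 4.0]
print(contribution, float(Vectors.norm(p - c, 2)) ** 2)  # the contributions sum to the squared distance, ~5.0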
In [2]:
joinedDf.groupBy("label").count()
In [ ]:
In [112]:
#show elements in joinedDf
#joinedDf.filter(F.col("prediction")==7).show(truncate=True)
#windowDistToAll = (Window.partitionBy(F.col("prediction")))
class VectorAccumulatorParam(AccumulatorParam):
    # accumulator for element-wise summation of fixed-length vectors
    def zero(self, value):
        return [0.0] * len(value)
    def addInPlace(self, val1, val2):
        for i in range(len(val1)):  # range, not xrange: this notebook runs under Python 3
            val1[i] += val2[i]
        return val1

va = sc.accumulator([1.0, 2.0, 3.0], VectorAccumulatorParam())
va.value

def g(x):
    global va
    va += [x] * 3

rdd.foreach(g)  # NOTE: assumes an RDD named `rdd` exists (adapted from the Spark accumulator example)
va.value
In [32]:
centroids = bModel.clusterCenters()
cols = ["prediction","label"]
funcs = [F.count, F.sum]
funcCols = [f(i) for f in funcs for i in cols]
#list the number of elements in each cluster
groupedByClusters = (joinedDf
.filter(F.col("label") <= 1)
.groupBy(F.col("prediction"))
.agg(*funcCols)
.drop(F.col("sum(prediction)"))
.drop(F.col("count(label)"))
.withColumn(colName="bancruptcy ratio",col=F.col("sum(label)")/F.col("count(prediction)"))
)
groupedByClusters.show()
#show distances for all cluster centers
allDists = getAllDistances(centroids,centroids)
pred = createPrettyTable(allDists,range(0,len(allDists)))
print(pred)
In [48]:
# inspect a subset of clusters (6, 1, 4, 3, 7 and 9), ordered by cluster and by distance to the center
selectedClustersDf = (joinedDf
                      .filter(F.col("prediction").isin(6, 1, 4, 3, 7, 9))
                      .drop(F.col("center"))
                      .orderBy(F.col("prediction").desc(), F.col("distance").desc())
                      )
selectedClustersDf.show(300, truncate=True)
In [34]:
def printTotalAndAvgFeatContribution(df, cluster=0, toPrint=False):
    # compute the total and average per-feature contribution for one cluster
    avgFeatureContributionRDD = (df
                                 .filter(F.col("prediction") == cluster)
                                 .select("cvrNummer", "navn", "contribution")
                                 .rdd)
    totalContribution = avgFeatureContributionRDD.map(lambda x: x[2]).reduce(add)  # x[2] is the contribution vector
    avgContribution = totalContribution / avgFeatureContributionRDD.count()
    columns = [totalContribution.toArray(), avgContribution.toArray()]
    contributions = createPrettyTable(columns, featCols, ["total", "avg"])
    if toPrint:
        print(contributions)
    return avgContribution
In [35]:
cluster6 = printTotalAndAvgFeatContribution(joinedDf,6)
In [36]:
cluster1 = printTotalAndAvgFeatContribution(joinedDf,1)
In [37]:
cluster8 = printTotalAndAvgFeatContribution(joinedDf,8)
In [38]:
cluster0 = printTotalAndAvgFeatContribution(joinedDf,0)
In [39]:
cluster2 = printTotalAndAvgFeatContribution(joinedDf,2)
cluster3 = printTotalAndAvgFeatContribution(joinedDf,3)
cluster4 = printTotalAndAvgFeatContribution(joinedDf,4)
cluster5 = printTotalAndAvgFeatContribution(joinedDf,5)
cluster7 = printTotalAndAvgFeatContribution(joinedDf,7)
cluster9 = printTotalAndAvgFeatContribution(joinedDf,9)
In [40]:
clusters = np.array([cluster0,cluster1,cluster2,cluster3,cluster4,cluster5,cluster6,cluster7,cluster8,cluster9])
transposedCluster = np.log1p(clusters.transpose())  # rows = features, columns = clusters
N = 10  # number of clusters on the x-axis
import colorsys
HSV_tuples = [(x*1.0/len(transposedCluster), 0.5, 0.5) for x in range(len(transposedCluster))]
RGB_tuples = list(map(lambda x: colorsys.hsv_to_rgb(*x), HSV_tuples))
ind = np.arange(N)  # the x locations for the groups
width = 0.35
# stack one bar segment per feature, bottom-up, starting with the first feature row
plots = [plt.bar(ind, transposedCluster[0], width, color=RGB_tuples[0])]
former = transposedCluster[0].copy()
for i, v in enumerate(transposedCluster[1:], start=1):
    plots.append(plt.bar(ind, v, width, color=RGB_tuples[i], bottom=former))
    former += v
plt.ylabel('Scores')
plt.title('Scores by cluster and feature')
plt.xticks(ind, ('C0', 'C1', 'C2', 'C3', 'C4', 'C5', 'C6', 'C7', 'C8', 'C9'))
plt.legend([p[0] for p in plots], featCols, bbox_to_anchor=(1.05, 1), loc=2, borderaxespad=0.)
plt.show()
In [ ]:
# how does it look within each of the clusters?