In [41]:
# Imports — grouped: stdlib, third-party, pyspark.
import random
import re
import sys

import numpy as np
from tabulate import tabulate

from pyspark.sql import SQLContext, Window
from pyspark.sql import functions as F
from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.feature import VectorAssembler, StandardScaler, RFormula
from pyspark.ml.linalg import VectorUDT, Vectors
from pyspark.ml.regression import GeneralizedLinearRegression
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder, TrainValidationSplit

# Start a fresh SQL session so state from earlier runs does not leak in.
# Fix: SQLContext was referenced here without ever being imported in the
# original cell — it is now imported explicitly above.
SQLContext.newSession(sqlContext)
In [88]:
# Import the CVR feature data and rename the misleading "rank_" columns to
# "vaerdiSlope_". Fix: the original cell contained a dead statement
# (`df.select(["cvrNummer"])` with its result discarded) — removed.
df = sqlContext.read.parquet("/home/svanhmic/workspace/Python/Erhvervs/data/cdata/featureDataCvr")

# One renamed name per original column: rank_* -> vaerdiSlope_*; others unchanged.
rankCols = [re.sub(pattern="rank_", repl="vaerdiSlope_", string=i) for i in df.columns]

renamedDf = (df
             .withColumn("reklamebeskyttet", F.col("reklamebeskyttet").cast("integer"))
             .select([F.col(val).alias(rankCols[idx]) for idx, val in enumerate(df.columns)])
             # Cast counters/flags/label to double so downstream ML stages accept them.
             .withColumn("totalAabneEnheder", F.col("totalAabneEnheder").cast("double"))
             .withColumn("totalLukketEnheder", F.col("totalLukketEnheder").cast("double"))
             .withColumn("reklamebeskyttet", F.col("reklamebeskyttet").cast("double"))
             .withColumn("label", F.col("label").cast("double"))
             )
renamedDf.show()
In [ ]:
In [ ]:
In [99]:
# Build an RFormula over every feature column except the ids/label/status and
# the medArb_1..medArb_15 columns, then set up a GLM (binomial/logit) inside a
# train/validation split.
excludedCols = ["medArb_" + str(i) for i in range(1, 16)] + ["cvrNummer", "label", "status"]
featureTerms = " + ".join(c for c in renamedDf.columns if c not in excludedCols)

imputedDf = renamedDf.fillna(value=0.0)  # fill nulls so the GLM sees complete rows
formula = RFormula(formula="label ~ " + featureTerms, labelCol="label")

glr = GeneralizedLinearRegression(family="binomial", link="logit", maxIter=10, regParam=0.3)
# Fix: the original cell ended with a truncated `print(glr.` (syntax error);
# completed as a parameter dump.
print(glr.explainParams())

lr = LogisticRegression()
pipeline = Pipeline(stages=[formula, glr])

# NOTE(review): the grid below is built from `lr` params, but the pipeline's
# estimator is `glr` — these params will not affect the fitted GLM. Confirm
# whether the pipeline was meant to use `lr` instead.
grid = (ParamGridBuilder()
        .baseOn({lr.predictionCol: "prediction"})
        .baseOn({lr.rawPredictionCol: "rawPrediction"})
        .baseOn({lr.probabilityCol: "probability"})
        .baseOn({lr.labelCol: "label"})
        .baseOn({lr.featuresCol: "features"})
        .addGrid(param=lr.elasticNetParam, values=[0.1, 1.0])
        # Fix: lr.getMaxIter is a getter method, not a Param object — use lr.maxIter.
        .addGrid(param=lr.maxIter, values=[10])
        .build()
        )
evaluate = BinaryClassificationEvaluator()
trainEvalModel = TrainValidationSplit(estimator=pipeline, estimatorParamMaps=grid,
                                      evaluator=evaluate, trainRatio=0.8)
In [96]:
# Fit the train/validation-split model on the binary subset (label 0 or 1 only).
cols = [c for c in renamedDf.columns if c not in excludedCols] + ["label"]
trainingData = imputedDf.select(*cols).filter(F.col("label") <= 1)
model = trainEvalModel.fit(trainingData)
In [91]:
# Score the binary subset with the fitted model.
# Fix: the original line had a double dot (`model..transform`) — a syntax error.
predict = model.transform(imputedDf.select(*cols).filter(F.col("label") <= 1))
imputedDf.select(*cols).filter(F.col("label") <= 1).printSchema()
In [95]:
# Inspect the fitted GLM's training summary (standard errors, deviance, AIC).
# Fix: `model` is the TrainValidationSplitModel fitted above and has no
# `.stages` attribute — the fitted pipeline is `model.bestModel`, whose stage 1
# is the GeneralizedLinearRegressionModel.
p = model.bestModel.stages[1].summary
print("Coefficient Standard Errors: " + str(p.coefficientStandardErrors))
print("T Values: " + str(p.tValues))
print("P Values: " + str(p.pValues))
print("Dispersion: " + str(p.dispersion))
print("Null Deviance: " + str(p.nullDeviance))
print("Residual Degree Of Freedom Null: " + str(p.residualDegreeOfFreedomNull))
print("Deviance: " + str(p.deviance))
print("Residual Degree Of Freedom: " + str(p.residualDegreeOfFreedom))
print("AIC: " + str(p.aic))
print("Deviance Residuals: ")
p.residuals().show()
In [3]:
# Null check: describe() yields count/mean/stddev/min/max rows; a column whose
# count is below the row total contains nulls.
descriptionCVR = renamedDf.describe()
descriptionCVR.select("summary").show()
descriptionCVR.where(descriptionCVR["summary"] == "count").show()
In [4]:
# Inspect central tendency and spread of every column.
descriptionCVR.where(F.col("summary").isin("mean", "stddev")).show()
In [5]:
# Pick each company's most recent name: rank name rows per cvrNummer by
# gyldigFra (newest first) and keep the top-ranked primary record (sekvensnr == 0).
windowSpecRank = Window.partitionBy(F.col("cvrNummer")).orderBy(F.col("gyldigFra").desc())
groupCols = ["cvrNummer", "vaerdi"]
companyNameDf = (sqlContext
                 .read
                 .parquet("/home/svanhmic/workspace/Python/Erhvervs/data/cdata/" + "companyCvrData")
                 .withColumn("rank", F.rank().over(windowSpecRank))
                 .where((F.col("rank") == 1) & (F.col("sekvensnr") == 0))
                 .select([F.col(c) for c in groupCols])
                 .withColumnRenamed("vaerdi", "navn")
                 .orderBy(F.col("cvrNummer"))
                 .cache()
                 )
companyNameDf.show(2)
In [6]:
# Take ln(x+1) of the feature columns, shifted by each column's minimum so the
# log argument is non-negative.
labelCols = ["cvrNummer","label","status"]
logFeatCols = [i for i in renamedDf.columns if i not in labelCols]
#print(logFeatCols)
# Row of per-column minimums taken from the describe() summary.
# (NOTE: variable name carries a typo, "mininum".)
mininum = descriptionCVR.filter(F.col("summary")=="min").collect()[0]
#print(mininum)
# NOTE(review): logDf is built here but never used below — rawVectorDataDf is
# assembled from renamedDf instead. Confirm whether the log transform was
# meant to feed the vectorizer.
logDf = (renamedDf
.select([F.col("cvrNummer"),F.col("label")]+[F.log1p(F.col(i)-F.lit(mininum[i])).alias(i) for i in logFeatCols])
.na
.fill(0.0,logFeatCols)
)
#logDf.show(2)
# First convert features to a vector: attach the company names, fill nulls,
# assemble the feature columns, and force the result to a dense vector via a UDF.
toDenseUDf = F.udf(lambda x: Vectors.dense(x.toArray()),VectorUDT())
vectorizer = VectorAssembler(inputCols=logFeatCols,outputCol="features")
rawVectorDataDf = (vectorizer.transform(renamedDf
.join(companyNameDf,(companyNameDf["cvrNummer"]==renamedDf["cvrNummer"]),"inner")
.drop(companyNameDf["cvrNummer"])
#.select(*logColsSelected)
.na
.fill(0.0,logFeatCols)
.distinct()
)
.select(["navn"]+labelCols+[toDenseUDf(vectorizer.getOutputCol()).alias(vectorizer.getOutputCol())])
)
# Standardize to zero mean / unit variance, then expose the scaled vector under
# the original "features" column name.
standardScale = StandardScaler(withMean=True,withStd=True,inputCol=vectorizer.getOutputCol(),outputCol="scaledFeatures")
standardScaleModel = standardScale.fit(rawVectorDataDf)
scaledFeaturesDf = (standardScaleModel
.transform(rawVectorDataDf)
.drop("features")
.withColumnRenamed(existing="scaledFeatures",new="features")
)
scaledFeaturesDf.show()
In [14]:
# Train/test split by cvrNummer: sample ~20% of each class (seeded) for the
# test set, then take every remaining company as training data.
vectorizedTestDf = scaledFeaturesDf.filter(F.col("label") <= 1).sampleBy("label", fractions={0: 0.2, 1: 0.2}, seed=42)
vectorizedTestDf.groupBy("label").count().show()

scaledCvrDf = scaledFeaturesDf.select(F.col("cvrNummer"))
cvrTestDf = vectorizedTestDf.select("cvrNummer")
cvrTrainDf = scaledCvrDf.subtract(cvrTestDf)  # everything not sampled into test

joinOnCvr = scaledFeaturesDf["cvrNummer"] == cvrTrainDf["cvrNummer"]
vectorizedTrainDf = (scaledFeaturesDf
                     .filter(F.col("label") <= 1)
                     .join(cvrTrainDf, joinOnCvr, "inner")
                     .drop(cvrTrainDf["cvrNummer"])
                     )
vectorizedTrainDf.groupBy("label").count().show()

print("Number of data points: "+str(scaledFeaturesDf.count()))
print("Number of data points train: "+str(vectorizedTrainDf.select("cvrNummer").count()))
print("Number of data points test: "+str(vectorizedTestDf.select("cvrNummer").count()))
#vectorizedTrainDf.printSchema()
#print(vectorizedTrainDf.first())
In [15]:
# Inspect a sample of the training set.
vectorizedTrainDf.show()
In [16]:
# Train a logistic regression with 10-fold cross-validation over the
# elastic-net mixing parameter, then report AUC on the held-out test set.
lr = LogisticRegression()
grid = (ParamGridBuilder()
        .baseOn({lr.predictionCol: "prediction"})
        .baseOn({lr.rawPredictionCol: "rawPrediction"})
        .baseOn({lr.probabilityCol: "probability"})
        .baseOn({lr.labelCol: "label"})
        .baseOn({lr.featuresCol: "features"})
        .addGrid(param=lr.elasticNetParam, values=[0.1, 1.0])
        # Fix: lr.getMaxIter is a getter method, not a Param object — use lr.maxIter.
        .addGrid(param=lr.maxIter, values=[10])
        .build()
        )
evaluate = BinaryClassificationEvaluator(rawPredictionCol="rawPrediction")
crossVal = CrossValidator(estimator=lr, estimatorParamMaps=grid, evaluator=evaluate, numFolds=10)
crossValModel = crossVal.fit(dataset=vectorizedTrainDf)
# Last expression — the test-set AUC is the cell's output.
evaluate.evaluate(crossValModel.transform(vectorizedTestDf))
#coef = lrModel.coefficients
Out[16]:
In [17]:
# Best model found by the cross-validated grid search.
bestModel = crossValModel.bestModel
In [18]:
# Score the held-out test set with the best CV model.
result = bestModel.transform(vectorizedTestDf)
In [12]:
#
In [19]:
#result.orderBy("prediction").show(100)
# Confusion-matrix counts via difference = label - prediction:
#   0 with label 1 -> TP, 0 with label 0 -> TN, 1 -> FN (missed), -1 -> FP.
# Fix: the original cell defined `confCols` and never used it — removed.
confusionDf = result.select(F.col("label"),
                            F.col("prediction"),
                            (F.col("label") - F.col("prediction")).alias("difference"))
# Each `when` is null unless its condition holds; coalesce picks the matching case.
csCols = [F.when((F.col("label") == 1) & (F.col("difference") == 0), "TP")
          , F.when((F.col("label") == 0) & (F.col("difference") == 0), "TN")
          , F.when(F.col("difference") == 1, "FN")
          , F.when(F.col("difference") == -1, "FP")
          ]
(confusionDf
 .select(F.coalesce(*csCols).alias("cases"))
 .groupBy("cases").count()
 ).show()
In [17]:
# Check that the best model retained a training summary before accessing it.
crossValModel.bestModel.hasSummary
Out[17]:
In [20]:
# Training summary of the best cross-validated model.
summary = crossValModel.bestModel.summary
In [24]:
# Show the training-set predictions recorded in the summary.
summary.predictions.show()
In [ ]:
In [ ]: