In [2]:
%%HTML
<script>
function code_toggle() {
    if (code_shown) {
        $('div.input').hide('500');
        $('#toggleButton').val('Show Code');
    } else {
        $('div.input').show('500');
        $('#toggleButton').val('Hide Code');
    }
    code_shown = !code_shown;
}
$(document).ready(function() {
    code_shown = false;
    $('div.input').hide();
});
</script>
<form action="javascript:code_toggle()"><input type="submit" id="toggleButton" value="Show Code"></form>
In [5]:
%%HTML
<style type="text/css">
.output_prompt {
display:none !important;
}
</style>
In [1]:
#Always Pyspark first!
ErhvervsPath = "/home/svanhmic/workspace/Python/Erhvervs"
from pyspark.sql import functions as F, Window, WindowSpec
from pyspark.sql import Row
from pyspark.sql.types import StringType,ArrayType,IntegerType,DoubleType,StructField,StructType
sc.addPyFile(ErhvervsPath+"/src/RegnSkabData/ImportRegnskabData.py")
sc.addPyFile(ErhvervsPath+'/src/RegnSkabData/RegnskabsClass.py')
sc.addPyFile(ErhvervsPath+'/src/cvr/Fstat.py')
sc.addPyFile(ErhvervsPath+'/src/cvr/GetNextJsonLayer.py')
import sys
import re
import os
import ImportRegnskabData
import GetNextJsonLayer
import itertools
import functools
%matplotlib inline
import seaborn as sb
import pandas as pan
import matplotlib.pyplot as plt
import numpy as np
import Fstat
import scipy as sp
import IPython
from IPython.display import display, Markdown, Latex
from pandas.tools.plotting import scatter_matrix
In [2]:
regnskabPath = ErhvervsPath+'/data/regnskabsdata/sparkdata/parquet/regnskaber.parquet'
csvPath = ErhvervsPath+'/data/regnskabsdata/cleanCSV'
taxPath = ErhvervsPath+'/data/regnskabsdata/cleanTaxLists'
In [3]:
lenUdf = F.udf(lambda x: ImportRegnskabData.lend(x),IntegerType())
convertedUdf = F.udf(lambda x: str(ImportRegnskabData.convertToSym(x)),StringType())
strs ="Anvendt regnskabspraksis Den anvendte regnskabspraksis er uændret i forhold til sidste år.                Generelt om indregning og måling        Regnskabet er udarbejdet med udgangspunkt i det historiske kostprisprincip.                Indtægter indregnes i resultatopgørelsen i takt med, at de indtjenes. Herudover indregnes værdireguleringer af finansielle aktiver og forpligtelser, der måles til dagsværdi eller amortiseret kostpris. Endvidere indregnes i resultatopgørelsen alle omkostninger, der er afholdt for at opnå årets indtjening, herunder afskrivninger, nedskrivninger og hensatte forpligtelser samt tilbageførsler som følge af ændrede regnskabsmæssige skønstrs ="
In [4]:
def pivotOnText(df,**kwargs):
    '''
    Pivots on the text columns and removes the excess counts.
    Input:
        df - dataframe
        kwargs - optional arguments, included are:
            pivotCol - specifies the column that should be pivoted, default "type"
            valueCol - specifies the column that should be aggregated on, default "vaerdi"
            expectedList - specifies the values in the pivoted column, default ["KAPITAL"]
    '''
    #sets some of the optional parameters
    pivotCol = kwargs.get("pivotCol","type")
    expectedList = kwargs.get("expectedList",["KAPITAL"])
    valueCol = kwargs.get("valueCol","vaerdi")
    holdOutsCols = [pivotCol,valueCol]
    nonHoldOutCols = [i for i in df.columns if i not in holdOutsCols]
    newDf = (df
             .groupBy(df.columns)
             .count()
             .groupBy(*nonHoldOutCols)
             .pivot(pivotCol,expectedList)
             .agg(F.max(F.struct("count",valueCol)))
             )
    expandedDf = GetNextJsonLayer.expandSubCols(newDf,*expectedList)
    newCols = [i for i in expandedDf.columns if i not in [v+"_count" for v in expectedList]]
    return expandedDf.select(newCols)
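To illustrate the groupBy/pivot/struct-max pattern the function relies on, here is a minimal sketch on a hypothetical toy frame (the column values and rows are made up; expandSubCols from GetNextJsonLayer is the author's helper and is not reproduced here):
toyDf = sqlContext.createDataFrame(
    [(1, "KAPITAL", "100"), (1, "KAPITAL", "100"), (1, "NAVN", "Acme")],
    ["cvrNummer", "type", "vaerdi"])
pivotedDf = (toyDf
             .groupBy(toyDf.columns)        #count duplicate rows
             .count()
             .groupBy("cvrNummer")          #everything except the pivot/value columns
             .pivot("type", ["KAPITAL"])    #one struct column per expected value
             .agg(F.max(F.struct("count", "vaerdi"))))
pivotedDf.show()                            #KAPITAL holds (count, vaerdi) for the most frequent vaerdi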
In [52]:
def showScatterMatrix(df,cols):
    featuresDf = df.select(*cols).distinct().drop("cvrNummer").toPandas()
    axes = scatter_matrix(featuresDf,alpha=0.5,figsize=[9,9])
    #y and x tick labels
    [plt.setp(item.yaxis.get_majorticklabels(), 'size', 6) for item in axes.ravel()]
    [plt.setp(item.xaxis.get_majorticklabels(), 'size', 6) for item in axes.ravel()]
    #y and x axis labels
    [plt.setp(item.yaxis.get_label(), 'size', 6) for item in axes.ravel()]
    [plt.setp(item.xaxis.get_label(), 'size', 6) for item in axes.ravel()]
    plt.show()
In [94]:
cvrPath = "/home/svanhmic/workspace/Python/Erhvervs/data/cdata/parquet"
namePath = "/home/svanhmic/workspace/Python/Erhvervs/data/cdata/"
cvrfiles = os.listdir(cvrPath)
print(cvrfiles)
In [92]:
#import cvr data
cvrDf = (sqlContext
         .read
         .parquet(cvrPath+"/"+cvrfiles[1])
         )
#cvrDf.show(1)
print(cvrDf.select("cvrNummer").distinct().count())
cvrDf.printSchema()
In [16]:
#Extract all ApS and A/S companies
companyByAsApsDf = sqlContext.read.parquet(cvrPath+"/AllApsAs.parquet")
companyByAsApsDf.drop("rank").drop("ansvarligDataleverandoer").drop("virksomhedsformkode").show(10)
In [156]:
display(Markdown("#### Import medarbejdstal"))
medarbejdsDf = sqlContext.read.parquet(cvrPath+"/TotalAarsVaerker.parquet")
medarbejdsDf.limit(10).toPandas()#.show(10)
In [18]:
# we are only interested in kapital after 1997
mainKapitalDf = (sqlContext
                 .read
                 .parquet(cvrPath+"/KaptialDataFrame.parquet")
                 .drop("KAPITALKLASSER_vaerdi")
                 .drop("KAPITAL_DELVIST_vaerdi")
                 .withColumn(colName="gyldigTil",col=F.coalesce(F.col("gyldigTil"),F.current_date()))
                 .withColumn(colName="datediff",col=F.datediff(F.col("gyldigTil"),F.col("gyldigFra")))
                 .withColumn(colName="KAPITAL_vaerdi",col=F.col("KAPITAL_vaerdi").cast("double"))
                 .filter(F.year("gyldigFra") >= 1997)
                 )
mainKapitalDf.show(5)
mainKapitalDf.printSchema()
The following cell divides the attributes into two data frames in order to make a proper sampling of medarbejdstal across years. Each kapital entry is looked at with respect to the number of days it is current: entries that are current for more than a year get joined to medarbejdstal as the secondary table, while the remaining entries get joined as the primary table (sketched below).
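A rough sketch of that split, using the datediff column computed above (the 365-day cutoff follows the description; the variable names are illustrative):
#entries current for more than a year -> joined as the secondary table
longLivedKapitalDf = mainKapitalDf.filter(F.col("datediff") > 365)
#entries current for a year or less -> joined as the primary table
shortLivedKapitalDf = mainKapitalDf.filter(F.col("datediff") <= 365)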
In [20]:
display(Markdown("### Hvornår opdateres kapitalværdierne?"))
#How does the duration look for posting kapitals?
datediffs = mainKapitalDf.select(["cvrNummer","datediff"]).distinct().na.drop("any").toPandas()
plt.hist(datediffs["datediff"],bins=100,range=[0,8000])
plt.title("Histogram of durration of submissions for kapital")
plt.xlabel("Days")
plt.ylabel("Count")
plt.axis()
plt.show()
#datediffs
In [21]:
avgKapital = (mainKapitalDf
              .filter(F.col("KAPITALVALUTA_vaerdi") == "DKK")
              .select("cvrNummer","KAPITAL_vaerdi","gyldigFra")
              .distinct()
              .groupBy("cvrNummer")
              .mean("KAPITAL_vaerdi")
              .withColumnRenamed(existing="avg(KAPITAL_vaerdi)",new="avgkapital")
              .na
              .drop("any")
              .toPandas())
p1 = plt.hist(avgKapital["avgkapital"],bins=150,range=[125000,1000000000])
plt.yscale('log')
plt.title("Average kapital for each Company in DKK")
plt.ylabel("Count")
plt.xlabel("Kroner")
display(Markdown("### Hvad er den gennemsnitlig kapital i virksomhederne?"))
plt.show()
The combined kapital and medarbejdstal frame is created here.
In [22]:
#kapital gets joined with the employment figures on cvrNummer, aar and maaned
kapOverDf = (medarbejdsDf
             .join(other=mainKapitalDf,on=((medarbejdsDf["cvrNummer"] == mainKapitalDf["cvrNummer"])
                                           & (medarbejdsDf["aar"] == mainKapitalDf["aar"])
                                           & (medarbejdsDf["maaned"] == mainKapitalDf["maaned"])),how="inner")
             .drop(mainKapitalDf["cvrNummer"])
             .drop(mainKapitalDf["aar"])
             .drop(mainKapitalDf["maaned"])
             .filter(F.col("KAPITALVALUTA_vaerdi") == "DKK")
             )
desckapOverDf = kapOverDf.describe()
In [23]:
kapOverDf.orderBy("cvrNummer","aar","maaned").show()
In [24]:
#totalDf.printSchema()
#totalDf.orderBy("cvrNummer","aar").show()
describeKapMedDf = (kapOverDf
                    .filter(F.col("KAPITALVALUTA_vaerdi") == "DKK")
                    .withColumnRenamed(existing="lower_intervalKodeAntalAarsvaerk",new="AntalAarsvaerk")
                    .withColumnRenamed(existing="lower_intervalKodeAntalAnsatte",new="AntalAnsatte")
                    .drop("cvrNummer")
                    .drop("timeStampFra")
                    .drop("timeStampTil")
                    .drop("gyldigFra")
                    .drop("gyldigTil")
                    .drop("ts"))
describeKapMedDf.show()
OK, let's try the Pearson correlation between kapital and the two workforce figures...
In [34]:
#The employment (beskæftigelse) figures are joined together and re-sampled
display(Markdown("### Standard korrelations koefficienter."))
print("Korrelationen imellem kapital og årsværker: "+str(kapOverDf.corr("KAPITAL_vaerdi","lower_intervalKodeAntalAarsvaerk"))[:5])
print("Korrelationen imellem kapital og ansatte: "+str(kapOverDf.corr("KAPITAL_vaerdi","lower_intervalKodeAntalAnsatte"))[:5])
In [109]:
#scale the feature columns using mean/stddev from the describe() frame
def scaleEm(df,labelCol,featCols):
    meanAndStd = (df.describe()
                  .filter((F.col("summary") == "mean") | (F.col("summary") == "stddev"))
                  .rdd
                  .map(lambda x: (x["summary"],x.asDict()))
                  .collectAsMap())
    mstdBroadcast = sc.broadcast(meanAndStd)
    #the scaled feature columns are made here: (x - mean) / stddev
    scaleCol = [((F.col(i) - F.lit(float(mstdBroadcast.value["mean"][i])))/F.lit(float(mstdBroadcast.value["stddev"][i]))).alias(i) for i in featCols]
    featuresDf = (df
                  .select(labelCol + scaleCol)
                  .distinct()
                  )
    return featuresDf
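Each feature is standardized as z = (x − μ) / σ, with μ and σ read from the mean and stddev rows of df.describe() and shipped to the executors as a broadcast variable.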
In [110]:
# OK so we're taking the log1p first; if that doesn't work then we'll scale 'em
labelsCol = ["cvrNummer","lower_intervalKodeAntalAarsvaerk","lower_intervalKodeAntalAnsatte","aar"]
featcols = ["KAPITAL_vaerdi"]
#alternative feature set: log-transformed kapital instead of scaling (left unused below)
onlyLogKapCols = [F.log1p("KAPITAL_vaerdi").alias("KAPITAL_vaerdi"),"lower_intervalKodeAntalAarsvaerk","lower_intervalKodeAntalAnsatte","aar"]
featuresDf = (scaleEm(kapOverDf,labelsCol,featcols)
              .withColumnRenamed(existing="lower_intervalKodeAntalAarsvaerk",new="pAntalAarsvaerk")
              .withColumnRenamed(existing="lower_intervalKodeAntalAnsatte",new="pAntalAnsatte"))
In [54]:
showScatterMatrix(featuresDf,labelsCol+featcols)
In [44]:
def translateCols(df,months):
    '''
    Shifts the employment figures `months` rows forward per company (assuming one row
    per month), so kapital in one period can be compared with later workforce numbers.
    NOTE: needs to be more general!
    '''
    windowYearLag = (Window
                     .partitionBy(F.col("cvrNummer"))
                     .orderBy(F.col("aar"),F.col("maaned")))
    return (df
            .withColumn(colName="pAntalAarsvaerk",col=F.lead(F.col("lower_intervalKodeAntalAarsvaerk"),count=months).over(windowYearLag))
            .withColumn(colName="pAntalAnsatte",col=F.lead(F.col("lower_intervalKodeAntalAnsatte"),count=months).over(windowYearLag))
            .na
            .drop("all",subset=["pAntalAarsvaerk","pAntalAnsatte"])
            .select(["cvrNummer","aar","maaned","ts","KAPITAL_vaerdi","pAntalAarsvaerk","pAntalAnsatte"])
            )
In [113]:
oneYearDf = translateCols(kapOverDf,12).cache()
twoYearsDf = translateCols(kapOverDf,24).cache()
threeYearsDf = translateCols(kapOverDf,36).cache()
allDfs = [featuresDf,oneYearDf,twoYearsDf,threeYearsDf]
allDfs[0].show()
In [55]:
showScatterMatrix(oneYearDf,["aar","cvrNummer",F.log1p("KAPITAL_vaerdi"),"pAntalAarsvaerk","pAntalAnsatte"])
In [56]:
#oneYearDf.show()
display(Markdown("### Korrelation med forskudt kapiptal"))
print("Korrelation imellem Årsværk og kapital efter 1 årsforskydning: "+str(oneYearDf.select(F.log1p("KAPITAL_vaerdi").alias("vaerdi"),"pAntalAarsvaerk").corr("vaerdi","pAntalAarsvaerk"))[:5])
print("Korrelation imellem Ansatte og kapital efter 1 årsforskydning: "+str(oneYearDf.select(F.log1p("KAPITAL_vaerdi").alias("vaerdi"),"pAntalAnsatte").corr("vaerdi","pAntalAnsatte"))[:5])
print("Årsværk og kapital efter 2 år: "+str(twoYearsDf.select(F.log1p("KAPITAL_vaerdi").alias("vaerdi"),"pAntalAarsvaerk").corr("vaerdi","pAntalAarsvaerk"))[:5])
print("Ansatte og kapital efter 2 år: "+str(twoYearsDf.select(F.log1p("KAPITAL_vaerdi").alias("vaerdi"),"pAntalAnsatte").corr("vaerdi","pAntalAnsatte"))[:5])
print("Årsværk og kapital efter 3 år: "+str(threeYearsDf.select(F.log1p("KAPITAL_vaerdi").alias("vaerdi"),"pAntalAarsvaerk").corr("vaerdi","pAntalAarsvaerk"))[:5])
print("Ansatte og kapital efter 3 år: "+str(threeYearsDf.select(F.log1p("KAPITAL_vaerdi").alias("vaerdi"),"pAntalAnsatte").corr("vaerdi","pAntalAnsatte"))[:5])
display(Markdown("Ikke den store overaskelse..."))
In [57]:
#twoYearsDf.show()
print(oneYearDf.count())
print(twoYearsDf.count())
print(threeYearsDf.count())
In [49]:
import time
def quantile(rdd, p, sample=None, seed=None):
    """Compute a quantile of order p ∈ [0, 1]
    :rdd     a numeric rdd
    :p       quantile (between 0 and 1)
    :sample  fraction of the rdd to use. If not provided we use the whole dataset
    :seed    random number generator seed to be used with sample
    """
    assert 0 <= p <= 1
    assert sample is None or 0 < sample <= 1
    seed = seed if seed is not None else time.time()
    rdd = rdd if sample is None else rdd.sample(False, sample, seed)
    rddSortedWithIndex = (rdd
                          .sortBy(lambda x: x)
                          .zipWithIndex()
                          .map(lambda x: (x[1], x[0]))
                          .cache())
    n = rddSortedWithIndex.count()
    h = (n - 1) * p
    rddX, rddXPlusOne = (
        rddSortedWithIndex.lookup(x)[0]
        for x in int(np.floor(h)) + np.array([0, 1]))
    return rddX + (h - np.floor(h)) * (rddXPlusOne - rddX)
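For example, on a small numeric RDD the helper returns the interpolated quantile:
quantile(sc.parallelize([1.0, 3.0, 2.0, 4.0]), 0.5)  #-> 2.5, the interpolated median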
In [50]:
#here's what you'll do: filter on pAntalAnsatte and flag kapital outside the quartile fences for that group
def getQuantileOutliers(df,group=0,subset=["cvrNummer","aar","KAPITAL_vaerdi","pAntalAarsvaerk","pAntalAnsatte"],valueCol="KAPITAL_vaerdi",groupCol="pAntalAnsatte"):
    groupPdf = (df
                .dropDuplicates(subset)
                .filter(F.col(groupCol) == group)
                .toPandas())
    q1 = groupPdf.quantile(0.25)
    q3 = groupPdf.quantile(0.75)
    iQR = q3 - q1
    return (df
            .dropDuplicates(subset)
            .filter((~F.col(valueCol).between(q1[valueCol]-1.5*iQR[valueCol],q3[valueCol]+1.5*iQR[valueCol]))
                    & (F.col(groupCol) == group))
            )
#quantile(oneYearDf.select("KAPITAL_vaerdi").na.drop().rdd.map(lambda x: x[0]),0.75)
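The filter implements the standard Tukey fences: with IQR = Q3 − Q1, a value counts as an outlier within its employee group when it falls below Q1 − 1.5·IQR or above Q3 + 1.5·IQR.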
Box plots for aarsværkstal and medarbejdstal with displaced kapital
In [144]:
plotLength = len(allDfs)
years = ["Årsværker", "Antal ansatte"]
funCols = ["pAntalAarsvaerk","pAntalAnsatte"]   #same order as `years`
fig, axes = plt.subplots(1,2,figsize=(10,5))
df = (allDfs[0]
      .filter(F.col("aar") == 2012)
      .select(F.log1p("KAPITAL_vaerdi").alias("log_kapital"),"pAntalAnsatte","pAntalAarsvaerk")
      .toPandas())
for i in range(2):
    axes[i].set_title("kapital sammenlignet med "+years[i])
    sb.boxplot(x=funCols[i],y="log_kapital",data=df,ax=axes[i])
display(Markdown("### Boxplot for Årsværk og antal ansatte kombineret med kapital i 2012"))
[plt.setp(item.yaxis.get_majorticklabels(), 'size', 5) for item in axes.ravel()]
#x ticklabels
[plt.setp(item.xaxis.get_majorticklabels(), 'size', 5) for item in axes.ravel()]
[plt.setp(item.yaxis.get_label(), 'size', 5) for item in axes.ravel()]
#x labels
[plt.setp(item.xaxis.get_label(), 'size', 5) for item in axes.ravel()]
plt.show()
In [117]:
#display(Markdown("Boxplot for Årsværk og antal ansatte kombineret med 1 forskudt kapital"))
df = (allDfs[1]
      .filter(F.col("aar") == 2012)
      .select(F.log1p("KAPITAL_vaerdi").alias("log_kapital"),"pAntalAnsatte","pAntalAarsvaerk")
      .toPandas())
fig, axes = plt.subplots(1,2,figsize=(10,5))
for i in range(2):
    axes[i].set_title("Forskudt kapital sammenlignet med "+years[i])
    sb.boxplot(x=funCols[i],y="log_kapital",data=df,ax=axes[i])
[plt.setp(item.yaxis.get_majorticklabels(), 'size', 5) for item in axes.ravel()]
#x ticklabels
[plt.setp(item.xaxis.get_majorticklabels(), 'size', 5) for item in axes.ravel()]
[plt.setp(item.yaxis.get_label(), 'size', 5) for item in axes.ravel()]
#x labels
[plt.setp(item.xaxis.get_label(), 'size', 5) for item in axes.ravel()]
display(Markdown("### Boxplot for Årsværk og antal ansatte kombineret med 1 års forskudt kapital i 2012"))
plt.show()
In [123]:
df = (allDfs[2]
      .filter(F.col("aar") == 2012)
      .select(F.log1p("KAPITAL_vaerdi").alias("log_kapital"),"pAntalAnsatte","pAntalAarsvaerk")
      .toPandas())
fig, axes = plt.subplots(1,2,figsize=(10,5))
for i in range(2):
    axes[i].set_title("Forskudt kapital sammenlignet med "+years[i])
    sb.boxplot(x=funCols[i],y="log_kapital",data=df,ax=axes[i])
#sb.boxplot(x="pAntalAarsvaerk",y="log_kapital",data=df,ax=aarsAx)
[plt.setp(item.yaxis.get_majorticklabels(), 'size', 5) for item in axes.ravel()]
#x ticklabels
[plt.setp(item.xaxis.get_majorticklabels(), 'size', 5) for item in axes.ravel()]
[plt.setp(item.yaxis.get_label(), 'size', 5) for item in axes.ravel()]
#x labels
[plt.setp(item.xaxis.get_label(), 'size', 5) for item in axes.ravel()]
display(Markdown("### Boxplot for Årsværk og antal ansatte kombineret 2 års forskudt med kapital i 2012"))
plt.show()
In [96]:
#keep only the most recent name (highest periode_gyldigFra) for each cvrNummer
windowSpecRank = Window.partitionBy(F.col("cvrNummer")).orderBy(F.col("periode_gyldigFra").desc())
groupCols = ["cvrNummer","vaerdi"]
companyNameDf = (sqlContext
                 .read
                 .parquet(namePath+"companyCvrData")
                 .withColumn(colName="rank",col=F.rank().over(windowSpecRank))
                 .filter((F.col("rank") == 1) & (F.col("sekvensnr") == 0))
                 .select([F.col(i) for i in groupCols])
                 .withColumnRenamed(existing="vaerdi",new="navn")
                 .orderBy(F.col("cvrNummer"))
                 .cache()
                 )
In [149]:
qOutliersDf = getQuantileOutliers(allDfs[1].filter(F.col("aar") == 2012),group=1)
withCompanies = (qOutliersDf
                 .join(other=companyNameDf,on=(qOutliersDf["cvrNummer"] == companyNameDf["cvrNummer"]),how="left")
                 .select("navn","KAPITAL_vaerdi")
                 .groupBy("navn")
                 .agg(F.mean("KAPITAL_vaerdi"))
                 .orderBy(F.col("avg(KAPITAL_vaerdi)").desc())
                 )#join the company name here!
display(Markdown("### Top 20 outliers med gennemsnitlig kapital for 1 ansat forskudt med 1 år"))
withCompanies.show(truncate=False)
print(qOutliersDf.count())
In [150]:
qOutliersDf = getQuantileOutliers(allDfs[1].filter(F.col("aar") == 2012),group=50)
withCompanies = (qOutliersDf
                 .join(other=companyNameDf,on=(qOutliersDf["cvrNummer"] == companyNameDf["cvrNummer"]),how="left")
                 .select("navn","KAPITAL_vaerdi")
                 .groupBy("navn")
                 .agg(F.mean("KAPITAL_vaerdi"))
                 .orderBy(F.col("avg(KAPITAL_vaerdi)").desc())
                 )#join the company name here!
display(Markdown("### Top 20 outliers med gennemsnitlig kapital for 50 ansatte forskudt med 1 år"))
withCompanies.show(truncate=False)
In [151]:
qOutliersDf = getQuantileOutliers(allDfs[2].filter(F.col("aar") == 2012),group=50)
withCompanies = (qOutliersDf
                 .join(other=companyNameDf,on=(qOutliersDf["cvrNummer"] == companyNameDf["cvrNummer"]),how="left")
                 .select("navn","KAPITAL_vaerdi")
                 .groupBy("navn")
                 .agg(F.mean("KAPITAL_vaerdi"))
                 .orderBy(F.col("avg(KAPITAL_vaerdi)").desc())
                 )#join the company name here!
display(Markdown("### Top 20 outliers med gennemsnitlig kapital for 50 ansatte forskudt med 2 år"))
withCompanies.show(truncate=False)
Employee counts (medarbejdstal) and FTE counts (årsværk) are divided into categories, while kapital is entered more freely.
Changes in kapital are reported quite irregularly, whereas FTEs and employee counts are reported on a yearly, quarterly, or monthly basis.
There are many outliers among companies with few employees or FTEs relative to their kapital. However, several companies also line up "nicely" when kapital is shifted by 1 and 2 years.
Further investigation could look at the outliers in the different groups, to see whether companies migrate from group to group.
In [143]:
qOutliersArr = [getQuantileOutliers(allDfs[i].filter(F.col("aar") == 2012),group=1) for i in range(1,4)]
withCompanies = [(qOutliersArr[i]
                  .join(other=companyNameDf,on=(qOutliersArr[i]["cvrNummer"] == companyNameDf["cvrNummer"]),how="left")
                  .select("navn","KAPITAL_vaerdi")
                  .groupBy("navn")
                  .agg(F.mean("KAPITAL_vaerdi"))
                  .orderBy(F.col("avg(KAPITAL_vaerdi)").desc())
                  ) for i in range(0,3)]
display(Markdown("Gennemsnitlig "))
#outliers that only appear at 2 and 3 years of displacement, relative to the 1-year set
#(assumes "withCompanies1" in the original referred to withCompanies[0])
withCompanies[1].subtract(withCompanies[0]).show(truncate=False)
withCompanies[2].subtract(withCompanies[0]).show(truncate=False)
In [15]:
def computeExplainedVar(df,groupCol,summationCol):
    '''
    This method computes the explained variance, also called the between-group
    variability, which is the numerator in the F-test computation.
    '''
    funcCols = [F.count,F.avg]
    exprsCols = [f(summationCol) for f in funcCols]
    secondFuncCols = [F.count,F.sum]
    secondExpsCols = [f("avgKapital") for f in secondFuncCols]
    totalMean = df.na.drop().groupBy().mean(summationCol).collect()[0]
    groupMeanDf = (df
                   .na
                   .drop()
                   .select(groupCol,summationCol)
                   .groupBy(groupCol)
                   .agg(*exprsCols)
                   #n_j * (mean_j - grand mean)^2 per group
                   .withColumn(colName="avgKapital",
                               col=F.col("count({})".format(summationCol))*(F.col("avg({})".format(summationCol))-totalMean[0])**2)
                   .groupBy()
                   .agg(*secondExpsCols)
                   .withColumn(colName="DegreeOFExplained",col=F.col("count(avgKapital)")-F.lit(1))
                   .withColumn(colName="ExplainedVar",col=F.col("sum(avgKapital)")/F.col("DegreeOFExplained"))
                   )
    return groupMeanDf
In [16]:
computeExplainedVar(twoYearsDf,"pAntalAnsatte","KAPITAL_VAERDI").show()
In [17]:
def computeUnexplainedVar(df,groupCol,summationCol):
    '''
    This method computes the unexplained variance, or within-group variability,
    which is the denominator in the F-test computation.
    Input:
        - df: spark data frame containing the data. It should at least contain a group
          column and the column that is subjected to variance
        - groupCol: string with the name of the column listing the group variables
        - summationCol: string with the name of the column with the variability
    Output:
        - subtractMeanDf: spark data frame that contains the unexplained variance.
    '''
    noMissingDf = (df
                   .select(groupCol,summationCol)
                   .na
                   .drop())
    funcCols = [F.mean]
    exprsCols = [f(summationCol) for f in funcCols]
    groupMeanRdd = (noMissingDf
                    .groupBy(groupCol)
                    .agg(*exprsCols)
                    .rdd
                    )
    meanMap = groupMeanRdd.collectAsMap()
    subtractMeanRdd = (noMissingDf
                       .rdd
                       .map(lambda x: (x[0],x[1],meanMap[x[0]]))
                       )
    NminusK = noMissingDf.count()-groupMeanRdd.count()
    schema = StructType([StructField(groupCol,IntegerType()),
                         StructField(summationCol,DoubleType()),
                         StructField("groupMean",DoubleType())])
    #(x - mean_j)^2 / (N - k) per observation
    meanFuncUdf = F.udf(lambda x,y: float(((x-y)**2)/NminusK),DoubleType())
    subtractMeanDf = (sqlContext
                      .createDataFrame(subtractMeanRdd,schema=schema)
                      .withColumn(colName="subSums",col=meanFuncUdf(F.col(summationCol),F.col("groupMean")))
                      .groupBy()
                      .sum()
                      .withColumn(colName="DegreeOFunexplained",col=F.lit(NminusK))
                      )
    #subtractMeanDf.show()
    return subtractMeanDf
In [18]:
#twoYearsDf.show()
computeUnexplainedVar(twoYearsDf,"pAntalAnsatte","KAPITAL_VAERDI").show()
In [19]:
def computeF(df,groupCol,summationCol):
    explainedVar = computeExplainedVar(df,groupCol,summationCol).collect()[0]
    unExplainedVar = computeUnexplainedVar(df,groupCol,summationCol).collect()[0]
    F_val = float(explainedVar["ExplainedVar"]/unExplainedVar["sum(subSums)"])
    return [F_val,explainedVar["DegreeOFExplained"],unExplainedVar["DegreeOFunexplained"]]
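Together these three functions implement a one-way ANOVA. With $k$ groups of sizes $n_j$, group means $\bar{x}_j$, grand mean $\bar{x}$ and $N$ observations in total, the statistic is

$$F = \frac{\sum_{j=1}^{k} n_j(\bar{x}_j - \bar{x})^2 / (k-1)}{\sum_{j=1}^{k}\sum_{i=1}^{n_j}(x_{ij} - \bar{x}_j)^2 / (N-k)},$$

which is compared against an F distribution with $(k-1, N-k)$ degrees of freedom; sp.stats.f.sf then gives the p-value.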
In [20]:
F1 = computeF(oneYearDf,"pAntalAnsatte","KAPITAL_VAERDI")
In [21]:
F2 = computeF(twoYearsDf,"pAntalAnsatte","KAPITAL_VAERDI")
In [22]:
sp.stats.f.sf(F2[0], float(F2[1]), float(F2[2]))
In [17]:
sp.stats.f.sf(F1[0], float(F1[1]), float(F1[2]))
#print(sp.stats.f.sf(F2[0], float(F2[1]), float(F2[2])))
In [10]:
t1 = [164, 172, 168, 177, 156, 195]
t2 = [178, 191, 197, 182, 185, 177]
t3 = [175, 193, 178, 171, 163, 176]
t4 = [155, 166, 149, 164, 170, 168]
val = pan.DataFrame([t1,t2,t3,t4],index=['type1', 'type2', 'type3', 'type4'],columns=["ex0","ex1","ex2","ex3","ex4","ex5"])
In [45]:
val["label"] = [1, 2, 3, 4]
#pack the six sample columns into an array so explode gives one row per observation
fxUdf = F.udf(lambda x,y,z,v,w,a: [float(x),float(y),float(z),float(v),float(w),float(a)],ArrayType(DoubleType()))
dftestF = (sqlContext
           .createDataFrame(data=val)
           .withColumn(colName="vector",col=fxUdf(F.col("ex0"),F.col("ex1"),F.col("ex2"),F.col("ex3"),F.col("ex4"),F.col("ex5")))
           .select("label",F.explode("vector").alias("KAPITAL_vaerdi"))
           )
dftestF.printSchema()
In [23]:
#dftestF.show()
In [47]:
Ft = computeF(dftestF,"label","KAPITAL_vaerdi")
In [49]:
sp.stats.f.sf(Ft[0], float(Ft[1]), float(Ft[2])) # the own implementation of the F-test works: p-value ≈ 0.0068 on this test data (cf. the scipy cross-check below)
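As a sanity check, scipy's built-in one-way ANOVA can be run on the same four groups; it should reproduce the statistic computed above (for these data F ≈ 5.41, p ≈ 0.007):
fStat, pVal = sp.stats.f_oneway(t1, t2, t3, t4)  #reference one-way F-test from scipy
print(fStat, pVal)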