In [ ]:
%%HTML
<style type="text/css">
.input_prompt, .input_area, .output_prompt {
display:none !important;
}
</style>
In [1]:
%%HTML
<script>
function code_toggle() {
if (code_shown){
$('div.input').hide('500');
$('#toggleButton').val('Show Code')
} else {
$('div.input').show('500');
$('#toggleButton').val('Hide Code')
}
code_shown = !code_shown
}
$( document ).ready(function(){
code_shown=false;
$('div.input').hide()
});
</script>
<form action="javascript:code_toggle()"><input type="submit" id="toggleButton" value="Show Code"></form>
In [1]:
#Always Pyspark first!
ErhvervsPath = "/home/svanhmic/workspace/DABAI"
parquetPath = "/home/svanhmic/workspace/data/DABAI/sparkdata/parquet"
from pyspark.sql import functions as F, Window, WindowSpec
from pyspark.sql import Row
from pyspark.sql.types import StringType,ArrayType,IntegerType,DoubleType,StructField,StructType,BooleanType
sc.addPyFile(ErhvervsPath+"/RegnSkabData/ImportRegnskabData.py")
sc.addPyFile(ErhvervsPath+'/RegnSkabData/RegnskabsClass.py')
sc.addPyFile(ErhvervsPath+'/ReadData/Fstat.py')
sc.addPyFile(ErhvervsPath+'/ReadData/GetNextJsonLayer.py')
import sys
import re
import os
import ImportRegnskabData
import GetNextJsonLayer
import itertools
import functools
%matplotlib inline
import seaborn as sb
import matplotlib.pyplot as plt
import numpy as np
import Fstat
import scipy as sp
import IPython
from IPython.display import display, Markdown, Latex
from pandas.tools.plotting import scatter_matrix
from IPython.core.display import display, HTML
display(HTML("<style>.container { width:65% !important; }</style>"))
sb.set_context("talk")
In [2]:
def isText(col):
#if the value contains letters
try:
matches = re.search(col,r'\w+ ^\d+',re.I)
print(matches.group(0))
return "string"
except:
#print(col)
return "double"
textUdf = F.udf(lambda x: isText(x),StringType())
In [3]:
#input debt data
regnDf = (sqlContext
.read
.parquet(parquetPath+"/regnskaberDebt.parquet")
.withColumn(colName="Dec",col=F.col("Dec").cast("integer"))
.withColumn(colName="Prec",col=F.col("Prec").cast("integer"))
.withColumn(colName="type",col=textUdf(F.col("Value")))
)
#regnDf.printSchema()
regnDf.show(20,False)
In [4]:
display(Markdown("### Hvilke variable har vi at gøre godt med gæld at gøre i de ca. 600000 regnskaber, fra 2014-2016?"))
x = regnDf.select(F.regexp_replace("Name","\w+:|\w+-","").alias("Name")).groupBy("Name").count()
print("Der er i alt "+str(x.count()))
x.orderBy(F.col("count").desc()).limit(8).toPandas()#.show(8,False)
Out[4]:
De 6 øverste bruges i denne analyse.
In [5]:
display(Markdown("Data fordelt på de 6 felter og år"))
filtstr = ""
cols = ["ShorttermDebtToBanks"
,"LongtermMortgageDebt"
,"ShorttermDebtToOtherCreditInstitutions"
,"LongtermDebtToBanks"
,"LongtermDebtToOtherCreditInstitutions"
,"ShorttermMortgageDebt"]
for i in cols[:-1]:
filtstr += "( Name = '"+i + "') or "
print()
#filtstr+"(Name == '"+cols[-1]+"')"
(regnDf
.select("EntityIdentifier",F.regexp_replace("Name","\w+:|\w+-","").alias("Name"),F.year("End_Instant").alias("year"))
.filter(filtstr+"(Name = '"+cols[-1]+"')")
.groupBy("Name","year")
.count()
.groupBy("Name")
.pivot("year")
.agg(F.max("count"))
.toPandas()
#.show(truncate=True)
)
Out[5]:
In [6]:
def containsConsolidated(arr):
for a in arr:
pattern = re.search(pattern=r":Consolidated",string=a,flags=re.IGNORECASE)
if pattern != "":
return True
return False
consolidatedUdf = F.udf(lambda x: containsConsolidated(x),BooleanType())
(regnDf
.select("EntityIdentifier",F.regexp_replace("Name","\w+:|\w+-","").alias("Name"),F.year("End_Instant").alias("year"),consolidatedUdf("Dimensions").alias("containsDim"))
.filter(F.col("containsDim") == False)
.select("Name")
.groupBy("Name")
.count()
.orderBy(F.col("count").desc())
.show(100,truncate=False)
)
Det ses at data er mest koncenteret omkring 2012-2016.
In [7]:
#OK how many are acctual value columns?
valDf = regnDf.filter((F.col("type") == "double"))
cols = [i for i in regnDf.columns if i not in ("Name","Value")]
funcsCols = [F.regexp_replace("Name","\w+:|\w+-","").alias("Name") , F.regexp_replace("Value",",","").alias("Value")]+cols
valDf = (valDf
.select(funcsCols)
.filter((F.col("type")=="double"))
.withColumn(col=F.col("Value").cast("double"),colName="Value")
.withColumn(col=F.unix_timestamp(F.col("End_Instant")),colName="End_Instant_ts")
#.drop("contextRef")
.drop("Dimensions")
.drop("Lang")
.drop("DebtNames")
.drop("originalLength")
.distinct()
)
#print(valDf.groupBy("Name").count().count())
#valDf.show()
#valDf.groupBy("Name").count().orderBy(F.col("Name").desc()).show(53,False)
In [8]:
#how many companies are the in this dataset?
nameCols = ["EntityIdentifier","Name",F.year("End_Instant").alias("End_Instant")]
originalNamesCols = ["EntityIdentifier","Name","End_Instant"]
namesDf = regnDf.select(*nameCols).groupBy(*originalNamesCols).count()
#namesDf.orderBy(nameCols[0],F.col("count").desc()).show()
In [9]:
#Load the kapitals in.
mainKapitalDf = (sqlContext
.read
.parquet(parquetPath+"/KaptialDataFrame.parquet")
.drop("KAPITALKLASSER_vaerdi")
.drop("KAPITAL_DELVIST_vaerdi")
.withColumn(col=F.coalesce(F.col("gyldigTil"),F.lit(F.current_date())),colName="gyldigTil")
.withColumn(col=F.datediff(F.col("GyldigTil"),F.col("gyldigFra")),colName="datediff")
.withColumn(col=F.col("KAPITAL_vaerdi").cast("double"),colName="KAPITAL_vaerdi")
.filter(F.year("gyldigFra") >= 2007)
)
display(Markdown("### Hvordan ser kapital data ud? "))
display(Markdown("Kapital data fra 2007 og frem"))
mainKapitalDf.limit(2).toPandas()#.show(2)
#mainKapitalDf.printSchema()
In [ ]:
joinsOn = ( (mainKapitalDf["cvrNummer"] == valDf["EntityIdentifier"] )
& (valDf["End_Instant_ts"].between(mainKapitalDf["timeStampFra"],mainKapitalDf["timeStampTil"]))
& (F.year(valDf["End_Instant"]) == mainKapitalDf["aar"])
)
NamesToUse = ["ShorttermDebtToBanks"
,"LongtermMortgageDebt"
,"ShorttermDebtToOtherCreditInstitutions"
,"LongtermDebtToBanks"
,"LongtermDebtToOtherCreditInstitutions"
,"ShorttermMortgageDebt"
,"ShorttermDebtToBanksCashFlowsStatement"
,"OtherLongtermDebtRaisedByIssuanceOfBonds"
]
notPivotCols = ["aar","cvrNummer","End_Instant","End_Instant_ts","KAPITAL_vaerdi"]
selectedCols = ["aar","cvrNummer","Name","KAPITAL_vaerdi","KAPITALVALUTA_vaerdi","Value","End_Instant","End_Instant_ts"]
filterdCols = ["Name == '"+str(i)+"'" for i in NamesToUse]
combinedCols = notPivotCols+[F.col(col+".Value").alias(col) for i,col in enumerate(NamesToUse)]
filtersStr = "( "
for i in filterdCols[:-1]:
filtersStr += i+" | "
filtersStr += filterdCols[-1]+")"
#print(filtersStr)
joinedDf = (mainKapitalDf
.drop("ts")
.drop("maaned") # this gives all years for all companies that has a registered kapital
.distinct()
.join(other=valDf,on=joinsOn,how="left")
#.filter(filtersStr)
.select(*selectedCols)
.groupBy(*selectedCols)
.count()
.groupBy(*notPivotCols)
.pivot("Name",NamesToUse)
.agg(F.max(F.struct([F.col("count"),F.col("Value")])))
.select(*combinedCols)
.cache() #nice to have, when plotting stuff.
)
In [ ]:
joinedDf.orderBy("cvrNummer").show(10,False)
In [ ]:
#not scaled so depricated
scatterKapDf = joinedDf.select(["cvrNummer","aar","KAPITAL_vaerdi"]+NamesToUse[:4])
scatterDescribeDf = scatterKapDf.drop("cvrNummer").drop("aar").describe()
#scatterDescribeDf.show()
#scatterKapDf.count()
#axes = scatter_matrix(scatterKapDf.drop("cvrNummer").drop("aar").toPandas(),alpha=0.5,figsize=[30,30])
#[plt.setp(item.yaxis.get_majorticklabels(), 'size', 15) for item in axes.ravel()]
#x ticklabels
#[plt.setp(item.xaxis.get_majorticklabels(), 'size', 15) for item in axes.ravel()]
#[plt.setp(item.yaxis.get_label(), 'size', 20) for item in axes.ravel()]
#x labels
#[plt.setp(item.xaxis.get_label(), 'size', 20) for item in axes.ravel()]
#print(scatterKapDf.count())
#plt.show()
In [ ]:
#OKAY lets scale with mean and std and take log1p of the scaled stuff
description = (scatterDescribeDf
.filter( (F.col("summary") == "mean")|(F.col("summary") == "stddev") )
.rdd
.map(lambda x: (x["summary"],x.asDict())).collectAsMap())
#print(description)
describBroadCast = sc.broadcast(description)
cols = [F.log1p((F.col(i)-F.lit(describBroadCast.value["mean"][i]))/F.lit(describBroadCast.value["stddev"][i])).alias(i) for i in ["KAPITAL_vaerdi"]+NamesToUse[:4]]
scaledScatterKapDf = scatterKapDf.select(*cols,"aar")
scatterDescribeDf = scaledScatterKapDf.describe()
scaledScatterKapDf.count()
axes = scatter_matrix(scaledScatterKapDf.drop("aar").toPandas(),alpha=0.5,figsize=[9,9])
#axes = scatter_matrix(scaledScatterKapDf.filter(F.col("aar") == 2012).drop("aar").toPandas(),alpha=0.5,figsize=[9,9])
[plt.setp(item.yaxis.get_majorticklabels(), 'size', 7) for item in axes.ravel()]
#x ticklabels
[plt.setp(item.xaxis.get_majorticklabels(), 'size', 7) for item in axes.ravel()]
[plt.setp(item.yaxis.get_label(), 'size', 7) for item in axes.ravel()]
#x labels
[plt.setp(item.xaxis.get_label(), 'size', 7) for item in axes.ravel()]
display(Markdown("### Hvordan er sammenhængen mellem kapitalforhøjelser og de 6 former for gæld?"))
#plt.title("Kapitalvaerdier mod de 6 former for gæld i regnskabsdata")
plt.show()
In [ ]:
#OK what's the correlation plot for
corr = scaledScatterKapDf.drop("aar").toPandas().corr()
#corr = scaledScatterKapDf.filter(F.col("aar") == 2012).drop("aar").toPandas().corr()
cmap = sb.diverging_palette(220, 10, as_cmap=True)
sb.heatmap(corr, cmap=cmap,annot=True)
display(Markdown("### Korrelation i mellem Kapital og gældsformer"))
plt.title("Den egentlige korrelation imellem kapital og diverse former for gæld")
plt.show()
display(Markdown("OK der er ikke den store sammenhæng mellem kapital og gæld, men hvad nu hvis vi forskyder gæld. "
))
#so in initial conclusion, we can't see that much for when debt is in the same year as kapital. But what about when we shift kapital years bac
In [ ]:
def skewDebt(df,cols,years=1):
"this is still too messy!"
skeewWindow = (Window.partitionBy("cvrNummer").orderBy("aar"))
skeewedDf = (df
.withColumn(col=F.lag(F.struct(*cols[:4]),count=years).over(skeewWindow),colName="oneYearLag")
.select(["cvrNummer","aar","KAPITAL_vaerdi"]+[F.col("oneYearLag."+str(i)) for i in cols[:4]])
)
descr = (skeewedDf.describe()
.filter( (F.col("summary") == "mean")|(F.col("summary") == "stddev") )
.rdd
.map(lambda x: (x["summary"],x.asDict())).collectAsMap())
scaleCols = [F.log1p((F.col(i)-F.lit(descr["mean"][i]))/F.lit(descr["stddev"][i])).alias(i) for i in ["KAPITAL_vaerdi"]+cols[:4]]
return (scatterKapDf.select(*scaleCols))
#skeewedDf.printSchema()
In [ ]:
skeewedDf1 = skewDebt(scatterKapDf,NamesToUse,1)
skeewedDf2 = skewDebt(scatterKapDf,NamesToUse,2)
skeewedDf3 = skewDebt(scatterKapDf,NamesToUse,3)
skeewedDf4 = skewDebt(scatterKapDf,NamesToUse,4)
In [ ]:
#skeewedDf1.printSchema()
In [ ]:
skeewArr = [skeewedDf1,skeewedDf2,skeewedDf3,skeewedDf4]
#skeewArr = [i.filter(F.col("aar")==2012).drop("aar") for i in [skeewedDf1,skeewedDf2,skeewedDf3,skeewedDf4]]
In [ ]:
# a heatmap of the correlation between the different variables:
# Compute the correlation matrix
plotLen = len(skeewArr)
fig, axes = plt.subplots(1,2,figsize=(25,10))
cmap = sb.diverging_palette(220, 10, as_cmap=True)
years = [1,2,3,4]
sb.set(font_scale=1.0)
ax0 = axes[0]
ax1 = axes[1]
ax0.set_title("Gældstyper sammenlignet med kapital fra 1 år siden.")
ax1.set_title("Gældstyper sammenlignet med kapital fra 2 år siden.")
sb.heatmap(skeewArr[0].na.drop("all").toPandas().corr(), cmap=cmap,annot=True,ax=ax0)
sb.heatmap(skeewArr[1].na.drop("all").toPandas().corr(), cmap=cmap,annot=True,ax=ax1)
#scatter_matrix(data,alpha=0.5,ax=axes[x,y])
#plt.title("correlation between kapital and various forms of debt short and long term")
display(Markdown("### Sammenligning mellem gæld og forskudt kapital"))
#[plt.setp(item, 'rotation', 15) for item in axes.ravel()]
plt.show()
In [ ]:
plotLen = len(skeewArr)
fig, axes = plt.subplots(1,2,figsize=(25,10))
cmap = sb.diverging_palette(220, 10, as_cmap=True)
axes[0].set_title("Gældstyper sammenlignet med kapital fra 3 år siden.")
axes[1].set_title("Gældstyper sammenlignet med kapital fra 4 år siden.")
sb.heatmap(skeewArr[2].na.drop("all").toPandas().corr(), cmap=cmap,annot=True,ax=axes[0])
sb.heatmap(skeewArr[3].na.drop("all").toPandas().corr(), cmap=cmap,annot=True,ax=axes[1])
#scatter_matrix(data,alpha=0.5,ax=axes[x,y])
#plt.title("correlation between kapital and various forms of debt short and long term")
display(Markdown("### Sammenligning mellem gæld og forskudt kapital"))
plt.show()
In [ ]: