In [2]:
%%HTML
<script>
  function code_toggle() {
    if (code_shown){
      $('div.input').hide('500');
      $('#toggleButton').val('Show Code')
    } else {
      $('div.input').show('500');
      $('#toggleButton').val('Hide Code')
    }
    code_shown = !code_shown
  }

  $( document ).ready(function(){
    code_shown=false;
    $('div.input').hide()
  });
</script>
<form action="javascript:code_toggle()"><input type="submit" id="toggleButton" value="Show Code"></form>



In [5]:
%%HTML
<style type="text/css">
.output_prompt {
    display:none !important;
}
</style>



In [1]:
#Always Pyspark first!
ErhvervsPath = "/home/svanhmic/workspace/Python/Erhvervs"

from pyspark.sql import functions as F, Window, WindowSpec
from pyspark.sql import Row
from pyspark.sql.types import StringType,ArrayType,IntegerType,DoubleType,StructField,StructType
sc.addPyFile(ErhvervsPath+"/src/RegnSkabData/ImportRegnskabData.py")
sc.addPyFile(ErhvervsPath+'/src/RegnSkabData/RegnskabsClass.py')
sc.addPyFile(ErhvervsPath+'/src/cvr/Fstat.py')
sc.addPyFile(ErhvervsPath+'/src/cvr/GetNextJsonLayer.py')

import sys
import re
import os
import ImportRegnskabData
import GetNextJsonLayer
import itertools
import functools

%matplotlib inline
import seaborn as sb
import pandas as pan
import matplotlib.pyplot as plt
import numpy as np
import Fstat
import scipy as sp

import IPython
from IPython.display import display, Markdown, Latex
from pandas.tools.plotting import scatter_matrix

In [2]:
regnskabPath = ErhvervsPath+'/data/regnskabsdata/sparkdata/parquet/regnskaber.parquet'
csvPath = ErhvervsPath+'/data/regnskabsdata/cleanCSV'
taxPath = ErhvervsPath+'/data/regnskabsdata/cleanTaxLists'

In [3]:
lenUdf = F.udf(lambda x: ImportRegnskabData.lend(x),IntegerType())
convertedUdf = F.udf(lambda x: str(ImportRegnskabData.convertToSym(x)),StringType())
strs ="Anvendt regnskabspraksis Den anvendte regnskabspraksis er u&#230;ndret i forhold til sidste &#229;r.&#160;&#160;&#160;&#160;&#160;&#160;&#160;  &#160;&#160;&#160;&#160;&#160;&#160;&#160;   Generelt om indregning og m&#229;ling&#160;&#160;&#160;&#160;&#160;&#160;&#160;  Regnskabet er udarbejdet med udgangspunkt i det historiske kostprisprincip.&#160;&#160;&#160;&#160;&#160;&#160;&#160;  &#160;&#160;&#160;&#160;&#160;&#160;&#160;  Indt&#230;gter indregnes i resultatopg&#248;relsen i takt med, at de indtjenes. Herudover indregnes v&#230;rdireguleringer af finansielle aktiver og forpligtelser, der m&#229;les til dagsv&#230;rdi eller amortiseret kostpris. Endvidere indregnes i resultatopg&#248;relsen alle omkostninger, der er afholdt for at opn&#229; &#229;rets indtjening, herunder afskrivninger, nedskrivninger og hensatte forpligtelser samt tilbagef&#248;rsler som f&#248;lge af &#230;ndrede regnskabsm&#230;ssige sk&#248;nstrs ="

In [4]:
def pivotOnText(df,**kvargs):
    '''
        does the pivotation on text cols and removes the excess counts
        input df - dataframe 
        kvargs - optional arguments included are: 
                pivotCol - specify column that shoould be pivotated, default type
                valueCol - specify column that should be aggregated on, defalut vaerdi
                expectedList - specify the values in the pivotated column, default ["KAPITAL"]
    '''
    
    #sets some of the optional parameters
    pivotCol = kvargs.get("pivotCol","type")
    expectedList = kvargs.get("expectedList",["KAPITAL"])
    valueCol = kvargs.get("valueCol","vaerdi")
    
    holdOutsCols = [pivotCol,valueCol]
    nonHoldOutCols = [i for i in df.columns if i not in holdOutsCols]

    
    newDf = (df
             .groupBy(df.columns)
             .count()
             .groupBy(*nonHoldOutCols)
             .pivot(pivotCol,expectedList)
             .agg(F.max(F.struct("count",valueCol)))
           )
    expandedDf = GetNextJsonLayer.expandSubCols(newDf,*expectedList)
    newCols = [i for i in expandedDf.columns if i not in [v+"_count" for v in expectedList] ]
    return expandedDf.select(newCols)

In [52]:
def showScatterMatrix(df,cols):
    
    featuresDf = df.select(*cols).distinct().drop("cvrNummer").toPandas()
    axes = scatter_matrix(featuresDf,alpha=0.5,figsize=[9,9])
    [plt.setp(item.yaxis.get_majorticklabels(), 'size', 6) for item in axes.ravel()]
    #x ticklabels
    [plt.setp(item.xaxis.get_majorticklabels(), 'size', 6) for item in axes.ravel()]
    [plt.setp(item.yaxis.get_label(), 'size', 6) for item in axes.ravel()]
    #x labels
    [plt.setp(item.xaxis.get_label(), 'size', 6) for item in axes.ravel()]
    plt.show()

In [94]:
cvrPath = "/home/svanhmic/workspace/Python/Erhvervs/data/cdata/parquet"
namePath = "/home/svanhmic/workspace/Python/Erhvervs/data/cdata/"
cvrfiles = os.listdir(cvrPath)

print(cvrfiles)


['KaptialDataFrame.parquet', 'virkdata.parquet', 'AllApsAs.parquet', 'KvartalsVaerker.parquet', 'AarsVaerker.parquet', 'MaanedsVaerker.parquet', 'TotalAarsVaerker.parquet']

In [92]:
#import crv data
cvrDf = (sqlContext
         .read
         .parquet(cvrPath+"/"+cvrfiles[1])
        )
#cvrDf.show(1)
print(cvrDf.select("cvrNummer").distinct().count())
cvrDf.printSchema()


1529578
root
 |-- cvrNummer: long (nullable = true)
 |-- aarsbeskaeftigelse: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- aar: long (nullable = true)
 |    |    |-- intervalKodeAntalAarsvaerk: string (nullable = true)
 |    |    |-- intervalKodeAntalAnsatte: string (nullable = true)
 |    |    |-- intervalKodeAntalInklusivEjere: string (nullable = true)
 |    |    |-- sidstOpdateret: string (nullable = true)
 |-- virksomhedsstatus: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- periode: struct (nullable = true)
 |    |    |    |-- gyldigFra: string (nullable = true)
 |    |    |    |-- gyldigTil: string (nullable = true)
 |    |    |-- sidstOpdateret: string (nullable = true)
 |    |    |-- status: string (nullable = true)
 |-- reklamebeskyttet: boolean (nullable = true)
 |-- brancheAnsvarskode: long (nullable = true)
 |-- attributter: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- sekvensnr: long (nullable = true)
 |    |    |-- type: string (nullable = true)
 |    |    |-- vaerdier: array (nullable = true)
 |    |    |    |-- element: struct (containsNull = true)
 |    |    |    |    |-- periode: struct (nullable = true)
 |    |    |    |    |    |-- gyldigFra: string (nullable = true)
 |    |    |    |    |    |-- gyldigTil: string (nullable = true)
 |    |    |    |    |-- sidstOpdateret: string (nullable = true)
 |    |    |    |    |-- vaerdi: string (nullable = true)
 |    |    |-- vaerditype: string (nullable = true)
 |-- penheder: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- pNummer: long (nullable = true)
 |    |    |-- periode: struct (nullable = true)
 |    |    |    |-- gyldigFra: string (nullable = true)
 |    |    |    |-- gyldigTil: string (nullable = true)
 |    |    |-- sidstOpdateret: string (nullable = true)
 |-- hovedbranche: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- branchekode: string (nullable = true)
 |    |    |-- branchetekst: string (nullable = true)
 |    |    |-- periode: struct (nullable = true)
 |    |    |    |-- gyldigFra: string (nullable = true)
 |    |    |    |-- gyldigTil: string (nullable = true)
 |    |    |-- sidstOpdateret: string (nullable = true)
 |-- virksomhedsform: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- ansvarligDataleverandoer: string (nullable = true)
 |    |    |-- kortBeskrivelse: string (nullable = true)
 |    |    |-- langBeskrivelse: string (nullable = true)
 |    |    |-- periode: struct (nullable = true)
 |    |    |    |-- gyldigFra: string (nullable = true)
 |    |    |    |-- gyldigTil: string (nullable = true)
 |    |    |-- sidstOpdateret: string (nullable = true)
 |    |    |-- virksomhedsformkode: long (nullable = true)
 |-- maanedsbeskaeftigelse: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- aar: long (nullable = true)
 |    |    |-- intervalKodeAntalAarsvaerk: string (nullable = true)
 |    |    |-- intervalKodeAntalAnsatte: string (nullable = true)
 |    |    |-- maaned: long (nullable = true)
 |    |    |-- sidstOpdateret: string (nullable = true)
 |-- kvartalsbeskaeftigelse: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- aar: long (nullable = true)
 |    |    |-- intervalKodeAntalAarsvaerk: string (nullable = true)
 |    |    |-- intervalKodeAntalAnsatte: string (nullable = true)
 |    |    |-- kvartal: long (nullable = true)
 |    |    |-- sidstOpdateret: string (nullable = true)


In [16]:
#Extract all Aps and A/S companies
companyByAsApsDf = sqlContext.read.parquet(cvrPath+"/AllApsAs.parquet")
companyByAsApsDf.drop("rank").drop("ansvarligDataleverandoer").drop("virksomhedsformkode").show(10)


+---------+---------------+---------------+-----------------+-----------------+
|cvrNummer|kortBeskrivelse|langBeskrivelse|periode_gyldigFra|periode_gyldigTil|
+---------+---------------+---------------+-----------------+-----------------+
| 10005574|            APS| Anpartsselskab|       1985-11-08|       1988-03-26|
| 10009022|            APS| Anpartsselskab|       1999-10-01|       2015-09-17|
| 10011620|            A/S|   Aktieselskab|       1999-11-18|             null|
| 10012376|            A/S|   Aktieselskab|       1985-11-08|             null|
| 10013259|            APS| Anpartsselskab|       2000-01-10|       2007-02-21|
| 10014816|            APS| Anpartsselskab|       2000-01-21|             null|
| 10016959|            APS| Anpartsselskab|       2000-02-09|       2008-04-16|
| 10023122|            APS| Anpartsselskab|       2000-03-01|       2007-04-25|
| 10025753|            A/S|   Aktieselskab|       2010-12-14|             null|
| 10029457|            APS| Anpartsselskab|       2000-06-27|             null|
+---------+---------------+---------------+-----------------+-----------------+
only showing top 10 rows

Hypotese:

  • I hvor høj grad korrelerer kapitalforhøjelser med vækst i virksomhederne? Der skal i den sammenhæng tages højde for (over)kursen ved kapitalforhøjelsen. Der er regnet med antal ansatte intervalskoden og antal årsværk

In [156]:
display(Markdown("#### Import medarbejdstal"))
medarbejdsDf = sqlContext.read.parquet(cvrPath+"/TotalAarsVaerker.parquet")
medarbejdsDf.limit(10).toPandas()#.show(10)


Import medarbejdstal

Out[156]:
cvrNummer aar maaned lower_intervalKodeAntalAarsvaerk lower_intervalKodeAntalAnsatte kortBeskrivelse
0 12617585 2009 1 None 20 A/S
1 17201948 2009 1 None 2 APS
2 24219194 2009 1 None 1 APS
3 26273366 2009 1 None 10 A/S
4 10089646 2009 1 None 10 APS
5 14591443 2009 1 None 1 APS
6 30562119 2009 1 None 0 APS
7 75060513 2009 1 None 2 APS
8 30284895 2009 1 None 2 APS
9 26090555 2009 1 None 2 APS

In [18]:
# we are only interested in kapital after 1997
mainKapitalDf = (sqlContext
                 .read
                 .parquet(cvrPath+"/KaptialDataFrame.parquet")
                 .drop("KAPITALKLASSER_vaerdi")
                 .drop("KAPITAL_DELVIST_vaerdi")
                 .withColumn(col=F.coalesce(F.col("gyldigTil"),F.lit(F.current_date())),colName="gyldigTil")
                 .withColumn(col=F.datediff(F.col("GyldigTil"),F.col("gyldigFra")),colName="datediff")
                 .withColumn(col=F.col("KAPITAL_vaerdi").cast("double"),colName="KAPITAL_vaerdi")
                 .filter(F.year("gyldigFra") >= 1997)
                )
mainKapitalDf.show(5)
mainKapitalDf.printSchema()


+----+------+----------+---------+----------+----------+--------------+--------------------+--------+------------+------------+
| aar|maaned|        ts|cvrNummer| gyldigFra| gyldigTil|KAPITAL_vaerdi|KAPITALVALUTA_vaerdi|datediff|timeStampFra|timeStampTil|
+----+------+----------+---------+----------+----------+--------------+--------------------+--------+------------+------------+
|2012|     1|1325372400| 10000025|1999-10-13|2017-03-09|      125000.0|                 DKK|    6357|   939765600|  1489014000|
|2012|     1|1325372400| 10000157|1999-11-04|2017-03-09|      125000.0|                 DKK|    6335|   941670000|  1489014000|
|2012|     1|1325372400| 10000211|1999-09-29|2017-03-09|      500000.0|                 DKK|    6371|   938556000|  1489014000|
|2012|     1|1325372400| 10000254|2009-12-29|2017-03-09|     1100500.0|                 DKK|    2627|  1262041200|  1489014000|
|2012|     1|1325372400| 10000262|1999-10-01|2017-03-09|      144000.0|                 DKK|    6369|   938728800|  1489014000|
+----+------+----------+---------+----------+----------+--------------+--------------------+--------+------------+------------+
only showing top 5 rows

root
 |-- aar: long (nullable = true)
 |-- maaned: long (nullable = true)
 |-- ts: long (nullable = true)
 |-- cvrNummer: long (nullable = true)
 |-- gyldigFra: date (nullable = true)
 |-- gyldigTil: date (nullable = false)
 |-- KAPITAL_vaerdi: double (nullable = true)
 |-- KAPITALVALUTA_vaerdi: string (nullable = true)
 |-- datediff: integer (nullable = true)
 |-- timeStampFra: long (nullable = true)
 |-- timeStampTil: long (nullable = true)

The following cell divides the attributes into two data frames in order to make a proper sampling of medarbejdstal compared to years. Yeah kapital entry is looked at, in respect to the amount of days, that this entry is current. Meaning, entries that are current, for more than a year gets joined as secondary tabel to medarbejdstal. Entries, that are opposite gets joined as primary tabel.


In [20]:
display(Markdown("### Hvornår opdateres kapitalværdierne?"))
#How does the duration look for posting kapitals?
datediffs = mainKapitalDf.select(["cvrNummer","datediff"]).distinct().na.drop("any").toPandas()
plt.hist(datediffs["datediff"],bins=100,range=[0,8000])
plt.title("Histogram of durration of submissions for kapital")
plt.xlabel("Days")
plt.ylabel("Count")
plt.axis()
plt.show()
#datediffs


Hvornår opdateres kapitalværdierne?


In [21]:
avgKapital = (mainKapitalDf
              .filter(F.col("KAPITALVALUTA_vaerdi") == "DKK")
              .select("cvrNummer","KAPITAL_vaerdi","gyldigFra")
              .distinct()
              .groupBy("cvrNummer")
              .mean("KAPITAL_vaerdi")
              .withColumnRenamed(existing="avg(KAPITAL_vaerdi)",new="avgkapital")
              .na
              .drop("any")
              .toPandas())
p1 = plt.hist(avgKapital["avgkapital"],bins=150,range=[125000,1000000000])
plt.yscale('log')
plt.title("Average kapital for each Company in DKK")
plt.ylabel("Count")
plt.xlabel("Kroner")

display(Markdown("### Hvad er den gennemsnitlig kapital i virksomhederne?"))
plt.show()


Hvad er den gennemsnitlig kapital i virksomhederne?

Medarbejdstal is created here!


In [22]:
#the kapital gets joined with years  in mainKap over
kapOverDf = (medarbejdsDf
             .join(other=mainKapitalDf,on=((medarbejdsDf["cvrNummer"] == mainKapitalDf["cvrNummer"]) 
                                         & (medarbejdsDf["aar"] == mainKapitalDf["aar"]) 
                                         & (medarbejdsDf["maaned"] == mainKapitalDf["maaned"])),how="inner")
             .drop(mainKapitalDf["cvrNummer"])
             .drop(mainKapitalDf["aar"])
             .drop(mainKapitalDf["maaned"])
             .filter(F.col("KAPITALVALUTA_vaerdi")=="DKK")
            )

desckapOverDf = kapOverDf.describe()

In [23]:
kapOverDf.orderBy("cvrNummer","aar","maaned").show()


+---------+----+------+--------------------------------+------------------------------+---------------+---------+----------+----------+--------------+--------------------+--------+------------+------------+
|cvrNummer| aar|maaned|lower_intervalKodeAntalAarsvaerk|lower_intervalKodeAntalAnsatte|kortBeskrivelse|       ts| gyldigFra| gyldigTil|KAPITAL_vaerdi|KAPITALVALUTA_vaerdi|datediff|timeStampFra|timeStampTil|
+---------+----+------+--------------------------------+------------------------------+---------------+---------+----------+----------+--------------+--------------------+--------+------------+------------+
| 10000009|1999|    11|                            null|                             0|            APS|941410800|1999-10-12|2001-12-11|      125000.0|                 DKK|     791|   939679200|  1008025200|
| 10000009|1999|    12|                            null|                             0|            APS|944002800|1999-10-12|2001-12-11|      125000.0|                 DKK|     791|   939679200|  1008025200|
| 10000009|2000|     1|                            null|                             0|            APS|946681200|1999-10-12|2001-12-11|      125000.0|                 DKK|     791|   939679200|  1008025200|
| 10000009|2000|     2|                            null|                             0|            APS|949359600|1999-10-12|2001-12-11|      125000.0|                 DKK|     791|   939679200|  1008025200|
| 10000009|2000|     3|                            null|                             0|            APS|951865200|1999-10-12|2001-12-11|      125000.0|                 DKK|     791|   939679200|  1008025200|
| 10000009|2000|     4|                            null|                             0|            APS|954540000|1999-10-12|2001-12-11|      125000.0|                 DKK|     791|   939679200|  1008025200|
| 10000009|2000|     5|                            null|                             0|            APS|957132000|1999-10-12|2001-12-11|      125000.0|                 DKK|     791|   939679200|  1008025200|
| 10000009|2000|     6|                            null|                             0|            APS|959810400|1999-10-12|2001-12-11|      125000.0|                 DKK|     791|   939679200|  1008025200|
| 10000009|2000|     7|                            null|                             1|            APS|962402400|1999-10-12|2001-12-11|      125000.0|                 DKK|     791|   939679200|  1008025200|
| 10000009|2000|     8|                            null|                             1|            APS|965080800|1999-10-12|2001-12-11|      125000.0|                 DKK|     791|   939679200|  1008025200|
| 10000009|2000|     9|                            null|                             1|            APS|967759200|1999-10-12|2001-12-11|      125000.0|                 DKK|     791|   939679200|  1008025200|
| 10000009|2000|    10|                            null|                             0|            APS|970351200|1999-10-12|2001-12-11|      125000.0|                 DKK|     791|   939679200|  1008025200|
| 10000009|2000|    11|                            null|                             0|            APS|973033200|1999-10-12|2001-12-11|      125000.0|                 DKK|     791|   939679200|  1008025200|
| 10000009|2000|    12|                            null|                             0|            APS|975625200|1999-10-12|2001-12-11|      125000.0|                 DKK|     791|   939679200|  1008025200|
| 10000009|2001|     1|                            null|                             0|            APS|978303600|1999-10-12|2001-12-11|      125000.0|                 DKK|     791|   939679200|  1008025200|
| 10000009|2001|     2|                            null|                             0|            APS|980982000|1999-10-12|2001-12-11|      125000.0|                 DKK|     791|   939679200|  1008025200|
| 10000009|2001|     3|                            null|                             0|            APS|983401200|1999-10-12|2001-12-11|      125000.0|                 DKK|     791|   939679200|  1008025200|
| 10000009|2001|     4|                            null|                             0|            APS|986076000|1999-10-12|2001-12-11|      125000.0|                 DKK|     791|   939679200|  1008025200|
| 10000009|2001|     5|                            null|                             0|            APS|988668000|1999-10-12|2001-12-11|      125000.0|                 DKK|     791|   939679200|  1008025200|
| 10000009|2001|     6|                            null|                             0|            APS|991346400|1999-10-12|2001-12-11|      125000.0|                 DKK|     791|   939679200|  1008025200|
+---------+----+------+--------------------------------+------------------------------+---------------+---------+----------+----------+--------------+--------------------+--------+------------+------------+
only showing top 20 rows


In [24]:
#totalDf.printSchema()
#totalDf.orderBy("cvrNummer","aar").show()
describeKapMedDf = (kapOverDf
                    .filter(F.col("KAPITALVALUTA_vaerdi")=="DKK")
                    .withColumnRenamed(existing="lower_intervalKodeAntalAarsvaerk",new="AntalAarsvaerk")
                    .withColumnRenamed(existing="lower_intervalKodeAntalAnsatte",new="AntalAnsatte")
                    .drop("cvrNummer")
                    .drop("timeStampFra")
                    .drop("timeStampTil")
                    .drop("gyldigFra")
                    .drop("gyldigTil")
                    .drop("ts"))
describeKapMedDf.show()


+----+------+--------------+------------+---------------+--------------+--------------------+--------+
| aar|maaned|AntalAarsvaerk|AntalAnsatte|kortBeskrivelse|KAPITAL_vaerdi|KAPITALVALUTA_vaerdi|datediff|
+----+------+--------------+------------+---------------+--------------+--------------------+--------+
|2005|     9|          null|          20|            A/S|     1000000.0|                 DKK|    3414|
|2005|    12|          null|          20|            A/S|     1000000.0|                 DKK|    3414|
|2007|     7|          null|          20|            A/S|     1000000.0|                 DKK|    3414|
|2013|    10|             1|           1|            A/S|        1.15E7|                 DKK|     481|
|2008|    12|          null|           5|            A/S|      500000.0|                 DKK|    6358|
|2007|    10|          null|          10|            A/S|      500000.0|                 DKK|    3691|
|2003|     6|          null|           5|            APS|      125000.0|                 DKK|    1701|
|1999|    12|          null|           1|            APS|      125000.0|                 DKK|    1691|
|2006|     1|          null|           5|            A/S|      500000.0|                 DKK|    6335|
|2009|    10|          null|           2|            A/S|      500000.0|                 DKK|    6335|
|2015|     2|             2|           5|            A/S|      500000.0|                 DKK|    6335|
|2009|     3|             1|           2|            APS|      125000.0|                 DKK|    6370|
|2002|     8|          null|           1|            APS|      125000.0|                 DKK|    2567|
|2014|     2|             1|           2|            APS|      125000.0|                 DKK|    6369|
|2000|     8|          null|           1|            APS|      125000.0|                 DKK|    5481|
|2000|     6|          null|           1|            APS|      125000.0|                 DKK|    1112|
|2006|    12|             0|           1|            APS|      125000.0|                 DKK|    6369|
|2011|    12|          null|           1|            APS|      125000.0|                 DKK|    6369|
|2001|     1|          null|           2|            APS|      125000.0|                 DKK|    6351|
|2012|     9|          null|          10|            APS|      125000.0|                 DKK|    6351|
+----+------+--------------+------------+---------------+--------------+--------------------+--------+
only showing top 20 rows

OK lets try the correlation (Pearsons) between kapital and the two work-figures...


In [34]:
#The three beskæftigelses numbers are joined together and re-sampled
display(Markdown("### Standard korrelations koeficienter."))
print("Korrelationen imellem kapital og årsværker: "+str(kapOverDf.corr("KAPITAL_vaerdi","lower_intervalKodeAntalAarsvaerk"))[:5])
print("Korrelationen imellem kapital og årsværker: "+str(kapOverDf.corr("KAPITAL_vaerdi","lower_intervalKodeAntalAnsatte"))[:5])


Korrelations koeficienter.

Korrelationen imellem kapital og årsværker: 0.112
Korrelationen imellem kapital og årsværker: 0.288

In [109]:
#do stuff to the description dataframe
def scaleEm(df,labelCol,featCols):
    
    
    meanAndStd = (df.describe().filter( (F.col("summary") == "mean")|(F.col("summary") == "stddev") )
              .rdd
              .map(lambda x: (x["summary"],x.asDict())).collectAsMap())
    mstdBroadcast = sc.broadcast(meanAndStd)
    
    
    #the function columns are made here!
    scaleCol = [((F.col(i) - F.lit(mstdBroadcast.value["mean"][i]) )/F.lit(mstdBroadcast.value["stddev"][i])).alias(i) for i in featcols]
    featuresDf = (kapOverDf
                  .select(labelsCol+scaleCol)
                  .distinct()
                 )
    return featuresDf

In [110]:
# OK so we're taking the log1p first if that doesn't work then we'll scale 'em 

labelsCol = ["cvrNummer","lower_intervalKodeAntalAarsvaerk","lower_intervalKodeAntalAnsatte","aar"]
featcols = ["KAPITAL_vaerdi"]
onlyLogKapCols = [F.log1p("KAPITAL_vaerdi").alias("KAPITAL_vaerdi"),"lower_intervalKodeAntalAarsvaerk","lower_intervalKodeAntalAnsatte","aar"]
#funcsCol = [((F.col(i) - F.lit(mstdBroadcast.value["mean"][i]) )/F.lit(mstdBroadcast.value["stddev"][i])).alias(i) for i in featcols]
#logFuncCol = [F.log1p(i) for i in featcols]
featuresDf = (scaleEm(kapOverDf,labelsCol,onlyLogKapCols)
              .withColumnRenamed(existing="lower_intervalKodeAntalAarsvaerk",new="pAntalAarsvaerk")
              .withColumnRenamed(existing="lower_intervalKodeAntalAnsatte",new="pAntalAnsatte"))

In [54]:
showScatterMatrix(featuresDf,labelsCol+featcols)



In [44]:
def translateCols(df,months):
    '''
    NOTE: needs to be more general!
    '''
    windowYearLag = (Window
                 .partitionBy(F.col("cvrNummer"))
                 .orderBy(F.col("aar"),F.col("maaned")))
    
    return (df
            .withColumn(col=F.lead(F.col("lower_intervalKodeAntalAarsvaerk"),count=months).over(windowYearLag),colName="pAntalAarsvaerk")
            .withColumn(col=F.lead(F.col("lower_intervalKodeAntalAnsatte"),count=months).over(windowYearLag),colName="pAntalAnsatte")
            .na
            .drop("all",subset=["pAntalAarsvaerk","pAntalAnsatte"])
            .select(["cvrNummer","aar","maaned","ts","KAPITAL_vaerdi","pAntalAarsvaerk","pAntalAnsatte"])
           )

In [113]:
oneYearDf = translateCols(kapOverDf,12).cache()
twoYearsDf = translateCols(kapOverDf,24).cache()
threeYearsDf = translateCols(kapOverDf,36).cache()
allDfs = [featuresDf,oneYearDf,twoYearsDf,threeYearsDf]
allDfs[0].show()


+---------+---------------+-------------+----+--------------------+
|cvrNummer|pAntalAarsvaerk|pAntalAnsatte| aar|      KAPITAL_vaerdi|
+---------+---------------+-------------+----+--------------------+
| 10006856|           null|           20|2012| 0.05961542561398157|
| 10038200|           null|            5|2012| 0.12650146584013006|
| 10055261|           null|            2|2006|-0.05452707181790555|
| 10150450|              1|            0|2009|-0.05452707181790555|
| 10217601|           null|            1|2009|-0.05452707181790555|
| 10223601|           null|           10|2012|-0.04362173822404799|
| 10316472|           null|            1|2011|-0.05452707181790555|
| 10523249|           null|            5|2008|-0.04798387166159...|
| 12497040|           null|          500|2007|  0.8157185489719283|
| 12502893|           null|            2|2009|0.016575703214045785|
| 12619499|             10|           10|2014|-0.04798387166159...|
| 12673191|           null|            1|2011|-0.05007769571161...|
| 12699794|           null|            5|2012|-0.04798387166159...|
| 13241430|           null|           10|2004| -0.0462390182865738|
| 13623279|           null|            5|2009|-0.05452707181790555|
| 13909709|           null|           20|2011|  1.1646892239753706|
| 13992401|              2|            2|1997|-0.04798387166159...|
| 14073590|              1|            2|1998|-0.05321843178664264|
| 14119507|              5|            5|2015|-0.04798387166159...|
| 14345493|           null|            5|2003|-0.04798387166159...|
+---------+---------------+-------------+----+--------------------+
only showing top 20 rows


In [55]:
showScatterMatrix(oneYearDf,["aar","cvrNummer",F.log1p("KAPITAL_vaerdi"),"pAntalAarsvaerk","pAntalAnsatte"])



In [56]:
#oneYearDf.show()
display(Markdown("### Korrelation med forskudt kapiptal"))
print("Korrelation imellem Årsværk og kapital efter 1 årsforskydning: "+str(oneYearDf.select(F.log1p("KAPITAL_vaerdi").alias("vaerdi"),"pAntalAarsvaerk").corr("vaerdi","pAntalAarsvaerk"))[:5])
print("Korrelation imellem Ansatte og kapital efter 1 årsforskydning: "+str(oneYearDf.select(F.log1p("KAPITAL_vaerdi").alias("vaerdi"),"pAntalAnsatte").corr("vaerdi","pAntalAnsatte"))[:5])
print("Årsværk og kapital efter 2 år: "+str(twoYearsDf.select(F.log1p("KAPITAL_vaerdi").alias("vaerdi"),"pAntalAarsvaerk").corr("vaerdi","pAntalAarsvaerk"))[:5])
print("Ansatte og kapital efter 2 år: "+str(twoYearsDf.select(F.log1p("KAPITAL_vaerdi").alias("vaerdi"),"pAntalAnsatte").corr("vaerdi","pAntalAnsatte"))[:5])
print("Årsværk og kapital efter 3 år: "+str(threeYearsDf.select(F.log1p("KAPITAL_vaerdi").alias("vaerdi"),"pAntalAarsvaerk").corr("vaerdi","pAntalAarsvaerk"))[:5])
print("Ansatte og kapital efter 3 år: "+str(threeYearsDf.select(F.log1p("KAPITAL_vaerdi").alias("vaerdi"),"pAntalAnsatte").corr("vaerdi","pAntalAnsatte"))[:5])
display(Markdown("Ikke den store overaskelse..."))


Korrelation med forskudt kapiptal

Korrelation imellem Årsværk og kapital efter 1 årsforskydning: 0.085
Korrelation imellem Ansatte og kapital efter 1 årsforskydning: 0.235
Årsværk og kapital efter 2 år: 0.099
Ansatte og kapital efter 2 år: 0.247
Årsværk og kapital efter 3 år: 0.113
Ansatte og kapital efter 3 år: 0.255

Ikke den store overaskelse...


In [57]:
#twoYearsDf.show()
print(oneYearDf.count())
print(twoYearsDf.count())
print(threeYearsDf.count())


9361787
7776803
6470397

In [49]:
import time
def quantile(rdd, p, sample=None, seed=None):
    """Compute a quantile of order p ∈ [0, 1]
    :rdd a numeric rdd
    :p quantile(between 0 and 1)
    :sample fraction of and rdd to use. If not provided we use a whole dataset
    :seed random number generator seed to be used with sample
    """
    assert 0 <= p <= 1
    assert sample is None or 0 < sample <= 1

    seed = seed if seed is not None else time.time()
    rdd = rdd if sample is None else rdd.sample(False, sample, seed)

    rddSortedWithIndex = (rdd
                          .sortBy(lambda x: x)
                          .zipWithIndex()
                          .map(lambda x: (x[1], x[0]))        
                          .cache())

    n = rddSortedWithIndex.count()
    h = (n - 1) * p

    rddX, rddXPlusOne = (
        rddSortedWithIndex.lookup(x)[0]
        for x in int(np.floor(h)) + np.array([0, 1]))

    return rddX + (h - np.floor(h)) * (rddXPlusOne - rddX)

In [50]:
#heres what you'll do. Filter on pantalansatte
def getQuantileOutliers(df,group=0,subset=["cvrNummer","aar","KAPITAL_vaerdi","pAntalAarsvaerk","pAntalAnsatte"],valueCol="KAPITAL_vaerdi",groupCol="pAntalAnsatte"):

    groupPdf = (oneYearDf
                .dropDuplicates(subset)
                .filter((F.col(groupCol)==group))
                .toPandas())
    
    q1 = groupPdf.quantile(0.25)
    q3 = groupPdf.quantile(0.75)
    iQR = q3 - q1
    #print(q1-iQR*1.5)
    #print(q3)
    #print(iQR["KAPITAL_vaerdi"])

    return (oneYearDf
            .dropDuplicates(subset)
            .filter((~F.col(valueCol).between(q1[valueCol]-1.5*iQR[valueCol],q3[valueCol]+1.5*iQR[valueCol]))
                    & (F.col(groupCol)==group))
                 )

#quantile(oneYearDf.select("KAPITAL_vaerdi").na.drop().rdd.map(lambda x: x[0]),0.75)

Box plot for aarsværkstal and medarbejdstal with displacement


In [144]:
plotLength = len(allDfs)
years = ["Årsværker", "Antal ansatte"]
funCols = ["pAntalAnsatte","pAntalAarsvaerk"]
fig, axes = plt.subplots(1,2,figsize=(10,5))
#allDfs[i].printSchema()
df = (allDfs[0]
      .filter(F.col("aar")==2012)
      .select(F.log1p("KAPITAL_vaerdi").alias("log_kapital"),"pAntalAnsatte","pAntalAarsvaerk")
      .toPandas())

#allDfs[0].show()

for i in range(2):
    
    axes[i].set_title("kapital sammenlignet med "+years[i])
    sb.boxplot(x=funCols[i],y="log_kapital",data=df,ax=axes[i])
    #sb.boxplot(x="pAntalAarsvaerk",y="log_kapital",data=df,ax=aarsAx)

display(Markdown("### Boxplot for Årsværk og antal ansatte kombineret med kapital i 2012"))
[plt.setp(item.yaxis.get_majorticklabels(), 'size', 5) for item in axes.ravel()]
    #x ticklabels
[plt.setp(item.xaxis.get_majorticklabels(), 'size', 5) for item in axes.ravel()]
[plt.setp(item.yaxis.get_label(), 'size', 5) for item in axes.ravel()]
    #x labels
[plt.setp(item.xaxis.get_label(), 'size', 5) for item in axes.ravel()]
plt.show()


Boxplot for Årsværk og antal ansatte kombineret med kapital i 2012


In [117]:
#display(Markdown("Boxplot for Årsværk og antal ansatte kombineret med 1  forskudt kapital"))

df = (allDfs[1]
      .filter(F.col("aar") == 2012)
        .select(F.log1p("KAPITAL_vaerdi").alias("log_kapital"),"pAntalAnsatte","pAntalAarsvaerk")
        .toPandas())
fig, axes = plt.subplots(1,2,figsize=(10,5))    
for i in range(2,4):
    #allDfs[i].printSchema()

    axes[i-2].set_title("Forskudt kapital sammenlignet med "+years[i-2])
    sb.boxplot(x=funCols[i-2],y="log_kapital",data=df,ax=axes[i-2])
    #sb.boxplot(x="pAntalAarsvaerk",y="log_kapital",data=df,ax=aarsAx)
[plt.setp(item.yaxis.get_majorticklabels(), 'size', 5) for item in axes.ravel()]
    #x ticklabels
[plt.setp(item.xaxis.get_majorticklabels(), 'size', 5) for item in axes.ravel()]
[plt.setp(item.yaxis.get_label(), 'size', 5) for item in axes.ravel()]
    #x labels
[plt.setp(item.xaxis.get_label(), 'size', 5) for item in axes.ravel()]
display(Markdown("### Boxplot for Årsværk og antal ansatte kombineret med 1 års forskudt kapital i 2012"))
plt.show()


Boxplot for Årsværk og antal ansatte kombineret med 1 års forskudt kapital i 2012


In [123]:
df = (allDfs[2]
      .filter(F.col("aar") == 2012)
        .select(F.log1p("KAPITAL_vaerdi").alias("log_kapital"),"pAntalAnsatte","pAntalAarsvaerk")
        .toPandas())
fig, axes = plt.subplots(1,2,figsize=(10,5))    
for i in range(4,6):
    #allDfs[i].printSchema()

    axes[i-4].set_title("Forskudt kapital sammenlignet med "+years[i-4])
    sb.boxplot(x=funCols[i-4],y="log_kapital",data=df,ax=axes[i-4])
    #sb.boxplot(x="pAntalAarsvaerk",y="log_kapital",data=df,ax=aarsAx)
[plt.setp(item.yaxis.get_majorticklabels(), 'size', 5) for item in axes.ravel()]
    #x ticklabels
[plt.setp(item.xaxis.get_majorticklabels(), 'size', 5) for item in axes.ravel()]
[plt.setp(item.yaxis.get_label(), 'size', 5) for item in axes.ravel()]
    #x labels
[plt.setp(item.xaxis.get_label(), 'size', 5) for item in axes.ravel()]
display(Markdown("### Boxplot for Årsværk og antal ansatte kombineret 2 års forskudt med kapital i 2012"))
plt.show()


Boxplot for Årsværk og antal ansatte kombineret 2 års forskudt med kapital i 2012


In [96]:
windowSpecRank =(Window.partitionBy(F.col("cvrNummer"))).orderBy(F.col("periode_gyldigFra").desc())
groupCols = ["cvrNummer","vaerdi"]

companyNameDf = (sqlContext
                 .read
                 .parquet(namePath+"companyCvrData")
                 .withColumn(colName="rank",col=F.rank().over(windowSpecRank))
                 .filter((F.col("rank")==1) & (F.col("sekvensnr")==0))
                 .select([F.col(i) for i in groupCols])
                 .withColumnRenamed(existing="vaerdi",new="navn")
                 .orderBy(F.col("cvrNummer"))
                 .cache()
                )

In [149]:
qOutliersDf = getQuantileOutliers(allDfs[1].filter(F.col("aar")==2012),group=1)
withCompanies =  (qOutliersDf
                  .join(other=companyNameDf,on=(qOutliersDf["cvrNummer"]==companyNameDf["cvrNummer"]),how="left")
                  .select("navn","KAPITAL_vaerdi")
                  .groupBy("navn")
                  .agg(F.mean("KAPITAL_vaerdi"))
                  .orderBy(F.col("avg(KAPITAL_vaerdi)").desc())
                 )#join companyname her!
display(Markdown("### Top 20 outliers med gennemsnitlig kapital for 1 ansat forskudt med 1 år"))
withCompanies.show(truncate=False)

print( qOutliersDf.count())


Top 20 outliers med gennemsnitlig kapital for 1 ansat forskudt med 1 år

+-------------------------------------+-------------------+
|navn                                 |avg(KAPITAL_vaerdi)|
+-------------------------------------+-------------------+
|USIMINAS GALVANIZED STEEL            |2.903838439E9      |
|USIMINAS ELECTROGALVANIZED STEEL     |2.897235469E9      |
|SEAS-NVE NET                         |2.316457E9         |
|FLSMIDTH & CO.                       |1.064E9            |
|A.P. MØLLER HOLDING                  |1.0E9              |
|PEPSICO INVESTMENTS DENMARK LIMITED 1|9.95175E8          |
|FIH KAPITAL BANK                     |9.0E8              |
|ISS WORLD SERVICES                   |8.9095912E8        |
|CODAN                                |8.8209775E8        |
|AMAGERBANKEN.                        |8.318321E8         |
|KAPITALPLEJE                         |8.02E8             |
|FIONIA ASSET COMPANY                 |7.249505733333334E8|
|FORMUEPLEJE PENTA                    |6.3423539625E8     |
|PRAS                                 |5.775E8            |
|VANDCENTER SYD                       |5.61797753E8       |
|SCANDINAVIAN PRIVATE EQUITY          |5.005E8            |
|MP EJENDOMME                         |4.84375E8          |
|MAERSK INSURANCE                     |4.835E8            |
|ENERGIGRUPPEN JYLLAND                |4.4839064E8        |
|PFA INVEST INTERNATIONAL             |4.416666666666667E8|
+-------------------------------------+-------------------+
only showing top 20 rows

105766

In [150]:
qOutliersDf = getQuantileOutliers(allDfs[1].filter(F.col("aar")==2012),group=50)
withCompanies =  (qOutliersDf
                  .join(other=companyNameDf,on=(qOutliersDf["cvrNummer"]==companyNameDf["cvrNummer"]),how="left")
                  .select("navn","KAPITAL_vaerdi")
                  .groupBy("navn")
                  .agg(F.mean("KAPITAL_vaerdi"))
                  .orderBy(F.col("avg(KAPITAL_vaerdi)").desc())
                 )#join companyname her!
display(Markdown("### Top 20 outliers med gennemsnitlig kapital for 50 ansatte forskudt med 1 år"))
withCompanies.show(truncate=False)


Top 20 outliers med gennemsnitlig kapital for 50 ansatte forskudt med 1 år

+------------------------------------+-------------------+
|navn                                |avg(KAPITAL_vaerdi)|
+------------------------------------+-------------------+
|DE FORENEDE BRYGGERIER              |3.05113612E9       |
|ATP-EJENDOMME                       |2.314285714285714E9|
|SANTA FE GROUP                      |1.831640776E9      |
|CHR. HANSEN HOLDING                 |1.205005776E9      |
|DANSKE KREDIT REALKREDIT            |1.0E9              |
|FLSMIDTH & CO.                      |9.8344262E8        |
|JEUDAN                              |9.732524E8         |
|SWISS RE DENMARK REINSURANCE        |8.94127125E8       |
|AGFA-GEVAERT                        |8.63043E8          |
|ISS WORLD SERVICES                  |8.423379957142857E8|
|GN STORE NORD                       |7.93133804E8       |
|SKANDINAVISKA ENSKILDA BANKEN       |7.812503125E8      |
|RPC SUPERFOS                        |7.519835E8         |
|SWISS RE DENMARK H                  |7.00125E8          |
|ENERGIGRUPPEN JYLLAND               |6.6616798E8        |
|ESN AF 31. DECEMBER 2004            |6.4384944E8        |
|TDC MOBILE INTERNATIONAL            |6.0E8              |
|STATOIL DETAILHANDEL                |5.36E8             |
|HOLDINGSELSKABET AF 3. NOVEMBER 2005|5.2E8              |
|FIH                                 |5.135725E8         |
+------------------------------------+-------------------+
only showing top 20 rows


In [151]:
qOutliersDf = getQuantileOutliers(allDfs[2].filter(F.col("aar")==2012),group=50)
withCompanies =  (qOutliersDf
                  .join(other=companyNameDf,on=(qOutliersDf["cvrNummer"]==companyNameDf["cvrNummer"]),how="left")
                  .select("navn","KAPITAL_vaerdi")
                  .groupBy("navn")
                  .agg(F.mean("KAPITAL_vaerdi"))
                  .orderBy(F.col("avg(KAPITAL_vaerdi)").desc())
                 )#join companyname her!
display(Markdown("### Top 20 outliers med gennemsnitlig kapital for 50 ansatte forskudt med 2 år"))
withCompanies.show(truncate=False)


Top 20 outliers med gennemsnitlig kapital for 50 ansatte forskudt med 2 år

+------------------------------------+-------------------+
|navn                                |avg(KAPITAL_vaerdi)|
+------------------------------------+-------------------+
|DE FORENEDE BRYGGERIER              |3.05113612E9       |
|ATP-EJENDOMME                       |2.314285714285714E9|
|SANTA FE GROUP                      |1.831640776E9      |
|CHR. HANSEN HOLDING                 |1.205005776E9      |
|DANSKE KREDIT REALKREDIT            |1.0E9              |
|FLSMIDTH & CO.                      |9.8344262E8        |
|JEUDAN                              |9.732524E8         |
|SWISS RE DENMARK REINSURANCE        |8.94127125E8       |
|AGFA-GEVAERT                        |8.63043E8          |
|ISS WORLD SERVICES                  |8.423379957142857E8|
|GN STORE NORD                       |7.93133804E8       |
|SKANDINAVISKA ENSKILDA BANKEN       |7.812503125E8      |
|RPC SUPERFOS                        |7.519835E8         |
|SWISS RE DENMARK H                  |7.00125E8          |
|ENERGIGRUPPEN JYLLAND               |6.6616798E8        |
|ESN AF 31. DECEMBER 2004            |6.4384944E8        |
|TDC MOBILE INTERNATIONAL            |6.0E8              |
|STATOIL DETAILHANDEL                |5.36E8             |
|HOLDINGSELSKABET AF 3. NOVEMBER 2005|5.2E8              |
|FIH                                 |5.135725E8         |
+------------------------------------+-------------------+
only showing top 20 rows

Opsummering

  • Medarbejds- og Årsværkstal er indelt i kategorier, mens kapital er mere frit indsat.

  • Ændringer i kapital er ret uregelmæssigt indberettet, mens årsværker og antal ansatte indberettes fra års, kvartals og månedsbasis.

  • Det ses at der findes mange "outliers" ift. virksomheder der har få ansatte eller antalårsværk i forhold til kapital. Dog ses det også at flere firmaer ligger "pænt" når kapitalen forskydes med 1 og 2 år.

  • Yderligere undersøgelse kunne omhandle outliers i de forskellige grupper, for at se om firmaer vandre fra gruppe til gruppe.


In [143]:
qOutliersArr = [getQuantileOutliers(allDfs[i].filter(F.col("aar")==2012),group=1) for i in range(1,4)]

withCompanies =  [(qOutliersArr[i]
                  .join(other=companyNameDf,on=(qOutliersArr[i]["cvrNummer"]==companyNameDf["cvrNummer"]),how="left")
                  .select("navn","KAPITAL_vaerdi")
                  .groupBy("navn")
                  .agg(F.mean("KAPITAL_vaerdi"))
                  .orderBy(F.col("avg(KAPITAL_vaerdi)").desc())
                 ) for i in range(0,3)]
display(Markdown("Gennemsnitlig   "))
.subtract(withCompanies1).show(truncate=False)
(withCompanies[1].subtract(withCompanies1)).show(truncate=False)
withCompanies[2].subtract(withCompanies1).show(truncate=False)


Gennemsnitlig

+------------------------------------+-------------------+
|navn                                |avg(KAPITAL_vaerdi)|
+------------------------------------+-------------------+
|19.12. AF 2001                      |1000000.0          |
|A KLOVBESKÆR                        |80000.0            |
|A. C. BANG                          |625000.0           |
|AAGE F ROSENSTAND HOLDING           |200000.0           |
|AALBORG PSYKOLOGPRAKSIS             |80000.0            |
|AAQUIST ENTREPRISE                  |577000.0           |
|ADVOKAT 10.10.10                    |80000.0            |
|ADVOKATERNES INKASSOSELSKAB, ADVOKAT|500000.0           |
|AF 1/7 1993, ESBJERG                |253234.0           |
|AF 16. JUNI 2000                    |833000.0           |
|AF 16.7.2010                        |700000.0           |
|AFVIKLINGSSELSKABET AF 2004         |500000.0           |
|AIRLAND PROJECTS                    |80000.0            |
|AL TRADING                          |80000.0            |
|ALLER INTERNATIONAL                 |1.81E7             |
|ALSSUND ORTOPÆDI HOLDING            |1000000.0          |
|BALTIC CONTROL LTD., AARHUS         |200000.0           |
|BECH KNUDSEN REGNSKAB               |80000.0            |
|BF EXPRESS                          |50000.0            |
|BLAKSKJÆR MANAGEMENT                |500000.0           |
+------------------------------------+-------------------+
only showing top 20 rows

+------------------------------------+-------------------+
|navn                                |avg(KAPITAL_vaerdi)|
+------------------------------------+-------------------+
|19.12. AF 2001                      |1000000.0          |
|A KLOVBESKÆR                        |80000.0            |
|A. C. BANG                          |625000.0           |
|AAGE F ROSENSTAND HOLDING           |200000.0           |
|AALBORG PSYKOLOGPRAKSIS             |80000.0            |
|AAQUIST ENTREPRISE                  |577000.0           |
|ADVOKAT 10.10.10                    |80000.0            |
|ADVOKATERNES INKASSOSELSKAB, ADVOKAT|500000.0           |
|AF 1/7 1993, ESBJERG                |253234.0           |
|AF 16. JUNI 2000                    |833000.0           |
|AF 16.7.2010                        |700000.0           |
|AFVIKLINGSSELSKABET AF 2004         |500000.0           |
|AIRLAND PROJECTS                    |80000.0            |
|AL TRADING                          |80000.0            |
|ALLER INTERNATIONAL                 |1.81E7             |
|ALSSUND ORTOPÆDI HOLDING            |1000000.0          |
|BALTIC CONTROL LTD., AARHUS         |200000.0           |
|BECH KNUDSEN REGNSKAB               |80000.0            |
|BF EXPRESS                          |50000.0            |
|BLAKSKJÆR MANAGEMENT                |500000.0           |
+------------------------------------+-------------------+
only showing top 20 rows

+------------------------------------+-------------------+
|navn                                |avg(KAPITAL_vaerdi)|
+------------------------------------+-------------------+
|19.12. AF 2001                      |1000000.0          |
|A KLOVBESKÆR                        |80000.0            |
|A. C. BANG                          |625000.0           |
|AAGE F ROSENSTAND HOLDING           |200000.0           |
|AALBORG PSYKOLOGPRAKSIS             |80000.0            |
|AAQUIST ENTREPRISE                  |577000.0           |
|ADVOKAT 10.10.10                    |80000.0            |
|ADVOKATERNES INKASSOSELSKAB, ADVOKAT|500000.0           |
|AF 1/7 1993, ESBJERG                |253234.0           |
|AF 16. JUNI 2000                    |833000.0           |
|AF 16.7.2010                        |700000.0           |
|AFVIKLINGSSELSKABET AF 2004         |500000.0           |
|AIRLAND PROJECTS                    |80000.0            |
|AL TRADING                          |80000.0            |
|ALLER INTERNATIONAL                 |1.81E7             |
|ALSSUND ORTOPÆDI HOLDING            |1000000.0          |
|BALTIC CONTROL LTD., AARHUS         |200000.0           |
|BECH KNUDSEN REGNSKAB               |80000.0            |
|BF EXPRESS                          |50000.0            |
|BLAKSKJÆR MANAGEMENT                |500000.0           |
+------------------------------------+-------------------+
only showing top 20 rows


In [ ]:


In [ ]:

anova test


In [15]:
def computeExplainedVar(df,groupCol,summationCol):
    '''
        This method computes the explained variance also called
    '''
    funcCols = [F.count,F.avg]
    exprsCols = [f(summationCol) for f in funcCols]
    secondFuncCols = [F.count,F.sum]
    secondExpsCols = [f("avgKapital") for f in secondFuncCols]
    
    totalMean = df.na.drop().groupBy().mean(summationCol).collect()[0]   
    groupMeanDf = (df
                   .na
                   .drop()
                   .select(groupCol,summationCol)
                   .groupBy(groupCol)
                   .agg(*exprsCols)
                   .withColumn(col=
                               F.col("count(KAPITAL_VAERDI)")*(F.col("avg(KAPITAL_VAERDI)")-totalMean[0])**2
                               ,colName="avgKapital")
                   .groupBy()
                   .agg(*secondExpsCols)
                   .withColumn(col=F.col("count(avgKapital)")-F.lit(1),colName="DegreeOFExplained")
                   .withColumn(col=F.col("sum(avgKapital)")/(F.col("DegreeOFExplained")),colName="ExplainedVar")
                   )
    return groupMeanDf

In [16]:
computeExplainedVar(twoYearsDf,"pAntalAnsatte","KAPITAL_VAERDI").show()


+-----------------+--------------------+-----------------+--------------------+
|count(avgKapital)|     sum(avgKapital)|DegreeOFExplained|        ExplainedVar|
+-----------------+--------------------+-----------------+--------------------+
|               11|5.637484665128664...|               10|5.637484665128664E19|
+-----------------+--------------------+-----------------+--------------------+


In [17]:
def computeUnexplainedVar(df,groupCol,summationCol):
    '''
        This method computes the unexplained variance or within-group variability which is the denominator in the F-test
        computation
        
        Input:
            - df spark data frame containing the data. Data should at least contain a group column and the column that is 
            subjected to variance
            - groupCol string that keeps the name of the column listing the group variabels
            - summationCol string that keeps the name of the column with variability
        Output:
            - subtractMeanDf spark data frame that contains the unexplained variance.
    '''
    noMissingDf = (df
                   .select(groupCol,summationCol)
                   .na
                   .drop())
    
    funcCols = [F.mean]
    exprsCols = [f(summationCol) for f in funcCols]
    groupMeanRdd = (noMissingDf
                    .groupBy(groupCol)
                    .agg(*exprsCols)
                    .rdd
                   )
    meanMap = groupMeanRdd.collectAsMap() 
    
    subtractMeanRdd = (noMissingDf
                       .rdd
                       .map(lambda x: (x[0],x[1],meanMap[x[0]]))
                      )
    
    NminusK = noMissingDf.count()-groupMeanRdd.count()
    
    schema = StructType([StructField(groupCol,IntegerType()),StructField(summationCol,DoubleType()),StructField("groupMean",DoubleType())])
    meanFuncUdf = F.udf(lambda x,y: float(((x-y)**2)/(NminusK)),DoubleType())
    
    subtractMeanDf = (sqlContext
                      .createDataFrame(subtractMeanRdd,schema=schema)
                      .withColumn(col=meanFuncUdf(F.col(summationCol),F.col("groupMean")),colName="subSums")
                      .groupBy()
                      .sum()
                      .withColumn(col=F.lit(NminusK),colName="DegreeOFunexplained")
                     )
    
    #subtractMeanDf.show()  
    return subtractMeanDf

In [18]:
#twoYearsDf.show()
computeUnexplainedVar(twoYearsDf,"pAntalAnsatte","KAPITAL_VAERDI").show()


+------------------+--------------------+--------------------+--------------------+-------------------+
|sum(pAntalAnsatte)| sum(KAPITAL_VAERDI)|      sum(groupMean)|        sum(subSums)|DegreeOFunexplained|
+------------------+--------------------+--------------------+--------------------+-------------------+
|          86781155|2.827328359374182E13|2.827328359374062E13|2.992521524352759...|            7670629|
+------------------+--------------------+--------------------+--------------------+-------------------+


In [19]:
def computeF(df,groupCol,summationCol):
    
    explainedVar = computeExplainedVar(df,groupCol,summationCol).collect()[0]
    unExplainedVar = computeUnexplainedVar(df,groupCol,summationCol).collect()[0]
    F_val = float(explainedVar["ExplainedVar"]/unExplainedVar["sum(subSums)"])
    
    return [F_val,explainedVar["DegreeOFExplained"],unExplainedVar["DegreeOFunexplained"]]

In [20]:
F1 = computeF(oneYearDf,"pAntalAnsatte","KAPITAL_VAERDI")

In [21]:
F2 = computeF(twoYearsDf,"pAntalAnsatte","KAPITAL_VAERDI")

In [22]:
sp.stats.f.sf(F2[0], float(F2[1]), float(F2[2]))


Out[22]:
0.0

Debug ftest here!


In [17]:
sp.stats.f.sf(F1[0], float(F1[1]), float(F1[2]))
#print(sp.stats.f.sf(F2[0], float(F2[1]), float(F2[2])))


Out[17]:
0.0

In [ ]:


In [10]:
t1 = [164, 172, 168, 177, 156, 195]
t2 = [178, 191, 197, 182, 185, 177]
t3 = [175, 193, 178, 171, 163, 176]
t4 = [155, 166, 149, 164, 170, 168]
val = pan.DataFrame([t1,t2,t3,t4],index=['type1', 'type2', 'type3', 'type4'],columns=["ex0","ex1","ex2","ex3","ex4","ex5"])

In [45]:
val["label"] = [1, 2, 3, 4]

fxUdf = F.udf(lambda x,y,z,v,w,a: [float(x),float(y),float(z),float(v),float(w),float(a)],ArrayType(DoubleType()))

dftestF = (sqlContext
           .createDataFrame(data=val)
           .withColumn(col=fxUdf(F.col("ex0"),F.col("ex1"),F.col("ex2"),F.col("ex3"),F.col("ex4"),F.col("ex5")),colName="vector")
           .select("label",F.explode("vector").alias("KAPITAL_vaerdi"))          
          )
dftestF.printSchema()


root
 |-- label: long (nullable = true)
 |-- KAPITAL_vaerdi: double (nullable = true)


In [23]:
#dftestF.show()

In [47]:
Ft = computeF(dftestF,"label","KAPITAL_vaerdi")

In [49]:
sp.stats.f.sf(Ft[0], float(Ft[1]), float(Ft[2])) # this shows that own implementation of F.test works, p-value at 0.68


Out[49]:
0.0068759477547351002

In [ ]: