In [1]:
#Always Pyspark first!
# Root directory of the Erhvervs project; the helper modules registered with
# sc.addPyFile in this cell are located relative to it.
# NOTE(review): this is an absolute, user-specific path, so the notebook only runs
# unchanged on a machine with the same directory layout.
ErhvervsPath = "/home/svanhmic/workspace/Python/Erhvervs"

from pyspark.sql import functions as F, Window, WindowSpec
from pyspark.sql import Row
from pyspark.sql.types import StringType,ArrayType,IntegerType,DoubleType,StructField,StructType
# Register the project's helper modules with the SparkContext `sc`.
# ImportRegnskabData and GetNextJsonLayer are imported further down in this cell.
for modulePath in ("/src/RegnSkabData/ImportRegnskabData.py",
                   "/src/RegnSkabData/RegnskabsClass.py",
                   "/src/cvr/GetNextJsonLayer.py"):
    sc.addPyFile(ErhvervsPath + modulePath)

import sys
import re
import os
import ImportRegnskabData
import GetNextJsonLayer
import itertools
import functools

In [2]:
# Directory holding the CVR parquet data. Same location as before, but derived
# from ErhvervsPath (defined in the first cell) instead of repeating the prefix.
cvrPath = ErhvervsPath + "/data/cdata/parquet"

# Datasets this notebook itself writes into cvrPath further down. They must not
# be mistaken for the raw CVR input when the notebook is run a second time.
generatedFiles = ("MaanedsVaerker.parquet", "KvartalsVaerker.parquet", "AarsVaerker.parquet")

# os.listdir returns entries in arbitrary order, so sort them to make
# cvrfiles[0] (the entry loaded in the next cell) reproducible between runs.
cvrfiles = sorted(f for f in os.listdir(cvrPath) if f not in generatedFiles)

In [3]:
# Import CVR data: read the first parquet entry listed for cvrPath
cvrDf = sqlContext.read.parquet(cvrPath+"/"+cvrfiles[0])
#cvrDf.show(1)
#print(cvrDf.select("cvrNummer").distinct().count())

# Extract the company form (virksomhedsform) per company; used below to pick
# out the ApS and A/S companies
virkformCols = ("cvrNummer","virksomhedsform")
keyCol, nestedCol = virkformCols

virkformDf = GetNextJsonLayer.createNextLayerTable(cvrDf.select(keyCol, nestedCol), [keyCol], nestedCol)
virkformDf = GetNextJsonLayer.expandSubCols(virkformDf, mainColumn="periode")

# Drop the update timestamp and cast both validity boundaries to dates
virkformDf = virkformDf.drop("sidstOpdateret")
for dateCol in ("periode_gyldigFra", "periode_gyldigTil"):
    virkformDf = virkformDf.withColumn(dateCol, F.col(dateCol).cast("date"))

#virkformDf.show(1)
# Descriptive company-form columns; in the cells shown they are only referenced
# by the commented-out checks below.
checkCols = ["kortBeskrivelse","langBeskrivelse","virksomhedsformkode"]

# Consistency check: are kortBeskrivelse and virksomhedsformkode always mapped the same way?
#check1 = virkformDf.select(checkCols+["cvrNummer"]).distinct().groupby(*checkCols).count()
#check1.orderBy("kortBeskrivelse","count").show(check1.count(),truncate=False)

# Second test: do any companies go from ApS or A/S to another form, or vice versa?
# joinCols is not used by any code in the cells shown.
joinCols = ["cvrNummer","langBeskrivelse","rank"]
cvrCols = ["cvrNummer"]
gyldigCol = ["periode_gyldigFra"]

# Window over each company's rows, newest periode_gyldigFra first.
# F.col(*cvrCols) unpacks the single-element list, i.e. F.col("cvrNummer").
statusChangeWindow = (Window
                      .partitionBy(F.col(*cvrCols))
                      .orderBy(F.col("periode_gyldigFra").desc()))

#virkformDf.select(checkCols).distinct().show(50,truncate=False)



# Extract ApS and A/S companies here, keeping only their latest status.
# aggregationCols and groupsCol are computed but not used by any code in the cells shown.
aggregationCols = [F.max(i) for i in gyldigCol]
groupsCol = [i for i in virkformDf.columns if i not in gyldigCol]

# Keep rows whose virksomhedsformkode is 60 or 80 (presumably the codes for the
# A/S and ApS forms mentioned above - TODO confirm), rank each company's rows by
# the window, and keep only rank 1.
# NOTE(review): F.rank() gives tied rows the same rank, so a company with several
# rows sharing its newest periode_gyldigFra keeps all of them - confirm this is intended.
companyByAsApsDf = (virkformDf
                    .where((F.col("virksomhedsformkode") == 60) | (F.col("virksomhedsformkode") == 80))
                    .withColumn(col=F.rank().over(statusChangeWindow),colName="rank")
                    .filter(F.col("rank") == 1)
                   )

In [5]:
# Get the medarbejdstal (employee figures)
fastCols = ["cvrNummer","aar"]
regCols = ["intervalKodeAntalAarsvaerk","intervalKodeAntalInklusivEjere"]
reg2Cols = ["intervalKodeAntalAarsvaerk","intervalKodeAntalAnsatte"]

def _splitIntervalCode(colName):
    """Column expression: remove 'ANTAL_' from `colName`, split the rest on '_', keep the name."""
    withoutPrefix = F.regexp_replace(F.col(colName), pattern=r'ANTAL_', replacement="")
    return F.split(withoutPrefix, "_").alias(colName)

# One split expression per interval-code column of the yearly layer (fCols)
# and of the monthly/quarterly layers (mkCols)
fCols = [_splitIntervalCode(name) for name in regCols]
mkCols = [_splitIntervalCode(name) for name in reg2Cols]
#kvartCols = [F.split(F.regexp_replace(F.col(i),pattern=r'ANTAL_',replacement=""),"_").alias(i) for i in cols]

def getLower(x):
    """Return the lower bound of an interval as an int.

    Parameters
    ----------
    x : sequence or None
        Interval parts with the lower bound first, e.g. ["10", "19"].

    Returns
    -------
    int or None
        The first element converted to int, or None when `x` is None, empty,
        or its first element cannot be converted.
    """
    try:
        return int(x[0])
    # Only the errors that malformed input can raise are treated as "no value":
    # TypeError (x or x[0] is None), IndexError (empty), ValueError (not a number).
    # A bare except would also hide KeyboardInterrupt and SystemExit.
    except (TypeError, IndexError, ValueError):
        return None

def getUpper(x):
    """Return the upper bound of an interval as an int.

    Parameters
    ----------
    x : sequence or None
        Interval parts as [lower, upper], e.g. ["10", "19"].

    Returns
    -------
    int or None
        The second element converted to int, or None when `x` is None, has no
        second element, or that element cannot be converted.
    """
    try:
        # The upper bound is the second part; index 0 is the lower bound that
        # getLower already returns.
        return int(x[1])
    # Only the errors that malformed input can raise are treated as "no value":
    # TypeError (x or x[1] is None), IndexError (too short), ValueError (not a number).
    except (TypeError, IndexError, ValueError):
        return None

# Spark UDFs wrapping getLower / getUpper; both are declared to return IntegerType
getLowerBound = F.udf(getLower, IntegerType())
getUpperBound = F.udf(getUpper, IntegerType())

def _lowerBoundTable(layerName, keyCols, splitCols, intervalCols):
    """Build one employment table from cvrDf holding the lower bound of each interval column.

    layerName    -- nested cvrDf column handed to GetNextJsonLayer.createNextLayerTable
    keyCols      -- identifying columns carried through unchanged
    splitCols    -- split expressions selected in the first step
    intervalCols -- names of the interval columns; each becomes "lower_<name>"
    """
    layerDf = GetNextJsonLayer.createNextLayerTable(cvrDf, ["cvrNummer"], layerName)
    splitDf = layerDf.select(keyCols + splitCols)
    lowerCols = [getLowerBound(name).alias("lower_" + name) for name in intervalCols]
    return splitDf.select(keyCols + lowerCols)

# Yearly, monthly and quarterly employment figures
aarsDf = _lowerBoundTable("aarsbeskaeftigelse", fastCols, fCols, regCols)
maanedsDf = _lowerBoundTable("maanedsbeskaeftigelse", ["cvrNummer","aar","maaned"], mkCols, reg2Cols)
kvartDf = _lowerBoundTable("kvartalsbeskaeftigelse", ["cvrNummer","aar","kvartal"], mkCols, reg2Cols)

#maanedsDf.show()
#cvrDf.unpersist()
#maanedsDf.show()
#kvartDf.show()
#print(aarsDf.count())
#print(aarsDf.na.drop(how="all",subset=["lower_"+i for i in cols]).count())

In [6]:
# OK, how many companies are represented in two or in all three groups?
def _joinOntoCompanies(employmentDf):
    """Right-join `employmentDf` onto companyByAsApsDf on cvrNummer, drop the
    company-side cvrNummer column and remove duplicate rows.

    NOTE(review): with how="right" every companyByAsApsDf row is kept; for a
    company without employment rows the remaining (employment-side) cvrNummer
    would presumably be null - confirm this is intended.
    """
    sameCompany = employmentDf["cvrNummer"] == companyByAsApsDf["cvrNummer"]
    joinedDf = employmentDf.join(companyByAsApsDf, on=sameCompany, how="right")
    return joinedDf.drop(companyByAsApsDf["cvrNummer"]).distinct()

distinctMaanedDf = _joinOntoCompanies(maanedsDf)
#distinctMaanedDf.show()

distinctKvartalDf = _joinOntoCompanies(kvartDf)

distinctAarDf = _joinOntoCompanies(aarsDf)

In [7]:
# Write the three joined tables as parquet under cvrPath, overwriting earlier output
for outputDf, outputName in ((distinctMaanedDf, "/MaanedsVaerker.parquet"),
                             (distinctKvartalDf, "/KvartalsVaerker.parquet"),
                             (distinctAarDf, "/AarsVaerker.parquet")):
    outputDf.write.parquet(mode="overwrite", path=cvrPath+outputName)

#print("månedsbeskæftigelse: "+str(distinctMaanedDf.count()))
#print("kvartalsbeskæftigelse: "+str(distinctKvartalDf.count()))
#print("årsbeskæftigelse: "+str(distinctAarDf.count()))



#print("Årsbeskæftigelse til kvartalsbeskæftigelse: "+str(distinctAarDf.select(F.col("cvrNummer")).distinct()
# .join(distinctKvartalDf.select(F.col("cvrNummer")).distinct(),(distinctKvartalDf["cvrNummer"]==distinctAarDf["cvrNummer"]),how="inner")
# .drop(distinctAarDf["cvrNummer"]).distinct().count()
#))
#print("Årsbeskæftigelse til månedsbeskæftigelse: "+str(distinctAarDf.select(F.col("cvrNummer")).distinct()
# .join(distinctMaanedDf.select(F.col("cvrNummer")).distinct(),(distinctMaanedDf["cvrNummer"]==distinctAarDf["cvrNummer"]),how="inner")
# .drop(distinctAarDf["cvrNummer"]).distinct().count()
#))
#print("Kvartalsbeskæftigelse til månedsbeskæftigelse: "+str(distinctKvartalDf.select(F.col("cvrNummer")).distinct()
# .join(distinctMaanedDf.select(F.col("cvrNummer")).distinct(),(distinctMaanedDf["cvrNummer"]==distinctKvartalDf["cvrNummer"]),how="inner")
# .drop(distinctAarDf["cvrNummer"]).distinct().count()
#))

# Distinct cvrNummer values of the yearly table, inner-joined first with the
# distinct cvrNummer values of the quarterly table and then with those of the
# monthly table. After each join the duplicate key column of the joined-in table is dropped.
# Despite its name this is a DataFrame; the count itself is only taken in the
# commented-out print below.
AllThreeCount = (distinctAarDf
                 .select(F.col("cvrNummer")).distinct()
                 .join(distinctKvartalDf.select(F.col("cvrNummer")).distinct(),(distinctAarDf["cvrNummer"]==distinctKvartalDf["cvrNummer"]),how="inner")
                 .drop(distinctKvartalDf["cvrNummer"])
                 .join(distinctMaanedDf.select(F.col("cvrNummer")).distinct(),(distinctAarDf["cvrNummer"]==distinctMaanedDf["cvrNummer"]),how="inner")
                 .drop(distinctMaanedDf["cvrNummer"])
                 .distinct()
                )
#print("Aarsbeskæftigelse til kvartalsbeskæftigelse til månedsbeskæftigelse: "+str(AllThreeCount.count()))

In [7]:


In [ ]:


In [ ]: