In [1]:
# Always PySpark first! `sc` is not defined in this notebook; presumably it is
# injected by the PySpark shell/kernel — confirm before running elsewhere.
import functools
import itertools
import os
import re
import sys

from pyspark.sql import functions as F, Window, WindowSpec
from pyspark.sql import Row
from pyspark.sql.types import StringType, ArrayType, IntegerType, DoubleType, StructField, StructType

# Project root. Set the ERHVERVS_PATH environment variable to point elsewhere
# instead of editing this hard-coded default.
ErhvervsPath = os.environ.get("ERHVERVS_PATH", "/home/svanhmic/workspace/Python/Erhvervs")

# Distribute the project helper modules via SparkContext.addPyFile; the imports
# below rely on this having made them importable on the driver.
for _projectModule in ("/src/RegnSkabData/ImportRegnskabData.py",
                       "/src/RegnSkabData/RegnskabsClass.py",
                       "/src/cvr/GetNextJsonLayer.py"):
    sc.addPyFile(ErhvervsPath + _projectModule)

import ImportRegnskabData
import GetNextJsonLayer
In [2]:
# Directory holding the CVR parquet snapshots (under the project root).
cvrPath = ErhvervsPath + "/data/cdata/parquet"
# Cell [7] writes its result tables into this same directory, and os.listdir
# returns entries in arbitrary order. Skip those generated tables (and hidden /
# "_"-prefixed entries such as _SUCCESS markers), then sort, so that
# cvrfiles[0] is a reproducible choice of input snapshot on every re-run.
_generatedTables = {"MaanedsVaerker.parquet", "KvartalsVaerker.parquet", "AarsVaerker.parquet"}
cvrfiles = sorted(name for name in os.listdir(cvrPath)
                  if name not in _generatedTables and not name.startswith((".", "_")))
if not cvrfiles:
    raise FileNotFoundError("No CVR parquet input found in " + cvrPath)
In [3]:
# Import CVR data: the first snapshot selected in the previous cell.
cvrDf = (sqlContext
         .read
         .parquet(cvrPath + "/" + cvrfiles[0]))

# Extract all ApS and A/S companies.
# Flatten the nested "virksomhedsform" column with the project helpers, expand
# its "periode" sub-columns, drop "sidstOpdateret" and cast the period bounds
# to dates.
virkformCols = ("cvrNummer", "virksomhedsform")
virkformDf = GetNextJsonLayer.createNextLayerTable(cvrDf.select(*virkformCols), [virkformCols[0]], virkformCols[1])
virkformDf = GetNextJsonLayer.expandSubCols(virkformDf, mainColumn="periode")
virkformDf = (virkformDf
              .drop("sidstOpdateret")
              .withColumn("periode_gyldigFra", F.col("periode_gyldigFra").cast("date"))
              .withColumn("periode_gyldigTil", F.col("periode_gyldigTil").cast("date")))

# Optional consistency check: are kortBeskrivelse and virksomhedsformkode always
# mapped the same way?
checkCols = ["kortBeskrivelse", "langBeskrivelse", "virksomhedsformkode"]
# check1 = virkformDf.select(checkCols + ["cvrNummer"]).distinct().groupby(*checkCols).count()
# check1.orderBy("kortBeskrivelse", "count").show(check1.count(), truncate=False)

# Within each company, order periods latest-first.
cvrCols = ["cvrNummer"]
statusChangeWindow = (Window
                      .partitionBy(*cvrCols)
                      .orderBy(F.col("periode_gyldigFra").desc()))

# Keep the latest ApS / A/S period per company.
# NOTE(review): the form filter runs *before* ranking, so a company that later
# switched to another form still keeps its last ApS/A-S period; rank() also
# keeps every row tied on the latest periode_gyldigFra. Confirm both are intended.
asApsFormCodes = [60, 80]  # virksomhedsformkode values taken to mean ApS / A/S — TODO confirm mapping
companyByAsApsDf = (virkformDf
                    .where(F.col("virksomhedsformkode").isin(asApsFormCodes))
                    .withColumn("rank", F.rank().over(statusChangeWindow))
                    .filter(F.col("rank") == 1))
In [5]:
# Get the employee counts (medarbejdstal).
fastCols = ["cvrNummer", "aar"]
regCols = ["intervalKodeAntalAarsvaerk", "intervalKodeAntalInklusivEjere"]
reg2Cols = ["intervalKodeAntalAarsvaerk", "intervalKodeAntalAnsatte"]


def splitIntervalCols(colNames):
    """Build split expressions for interval-code columns.

    For each column name, remove every "ANTAL_" substring and split the
    remainder on "_" (presumably codes look like "ANTAL_<lo>_<hi>" — confirm).
    Each resulting array column keeps its original name.
    """
    return [F.split(F.regexp_replace(F.col(c), pattern=r'ANTAL_', replacement=""), "_").alias(c)
            for c in colNames]


fCols = splitIntervalCols(regCols)
mkCols = splitIntervalCols(reg2Cols)
def getLower(x):
    """Return the first element of a split interval code as an int.

    ``x`` is a sequence of strings such as ``["5", "9"]`` (-> 5). Returns None
    when ``x`` is None/empty or its first element is not an integer string,
    so a Spark UDF built on it yields null instead of failing the task.
    """
    try:
        return int(x[0])
    except (TypeError, ValueError, IndexError):
        # TypeError: x or x[0] is None; ValueError: non-numeric text;
        # IndexError: empty list. Anything else is a real error and propagates.
        return None
def getUpper(x):
    """Return the last element of a split interval code as an int.

    ``x`` is a sequence of strings such as ``["5", "9"]`` (-> 9). A
    single-element code returns that element. Returns None when ``x`` is
    None/empty or its last element is not an integer string.
    """
    try:
        # The upper bound is the last part; previously x[0] (the lower bound)
        # was returned by mistake.
        return int(x[-1])
    except (TypeError, ValueError, IndexError):
        return None
# Spark UDFs around the pure-Python interval parsers getLower / getUpper.
getLowerBound = F.udf(getLower, IntegerType())
getUpperBound = F.udf(getUpper, IntegerType())


def _lowerBoundTable(layer, keyCols, splitCols, intervalCols):
    """Flatten one employment layer of cvrDf and parse interval lower bounds.

    layer:        name of the nested cvrDf column passed to createNextLayerTable.
    keyCols:      identifier/period column names carried through unchanged.
    splitCols:    split-interval column expressions (e.g. fCols / mkCols).
    intervalCols: names of those split columns; each becomes "lower_" + name.
    """
    return (GetNextJsonLayer
            .createNextLayerTable(cvrDf, ["cvrNummer"], layer)
            .select(keyCols + splitCols)
            .select(keyCols + [getLowerBound(c).alias("lower_" + c) for c in intervalCols]))


aarsDf = _lowerBoundTable("aarsbeskaeftigelse", fastCols, fCols, regCols)
maanedsDf = _lowerBoundTable("maanedsbeskaeftigelse", ["cvrNummer", "aar", "maaned"], mkCols, reg2Cols)
kvartDf = _lowerBoundTable("kvartalsbeskaeftigelse", ["cvrNummer", "aar", "kvartal"], mkCols, reg2Cols)
#print(aarsDf.na.drop(how="all",subset=["lower_"+i for i in cols]).count())
In [6]:
# OK, how many are represented in two or all three groups?
# First restrict each employment table to the ApS / A/S companies in
# companyByAsApsDf. The join is on the column *name*, which yields a single
# cvrNummer column. For a right join it is filled for every company. An
# expression join followed by dropping the right-hand key would instead keep
# the left key, which is null for companies without employment rows.
def _restrictToAsAps(employmentDf):
    """Right-join employmentDf onto companyByAsApsDf by cvrNummer and de-duplicate rows."""
    return (employmentDf
            .join(companyByAsApsDf, on="cvrNummer", how="right")
            .distinct())


distinctMaanedDf = _restrictToAsAps(maanedsDf)
distinctKvartalDf = _restrictToAsAps(kvartDf)
distinctAarDf = _restrictToAsAps(aarsDf)
In [7]:
# Persist the restricted tables (same order and paths as before).
# NOTE: they are written inside cvrPath, the directory whose listing selects
# the input snapshot at the top of the notebook.
for _fileName, _employmentDf in (("MaanedsVaerker.parquet", distinctMaanedDf),
                                 ("KvartalsVaerker.parquet", distinctKvartalDf),
                                 ("AarsVaerker.parquet", distinctAarDf)):
    _employmentDf.write.parquet(mode="overwrite", path=cvrPath + "/" + _fileName)


def _companiesWithData(employmentDf):
    """Distinct cvrNummer values of rows that actually carry employment data.

    The tables come from a right join onto the ApS / A/S companies. Rows for
    companies without employment records therefore have a null "aar" (a
    left-side column), and those rows are skipped here.
    NOTE(review): assumes "aar" is never null in a genuine employment row.
    """
    return employmentDf.where(F.col("aar").isNotNull()).select("cvrNummer").distinct()


def _overlap(*employmentDfs):
    """Distinct cvrNummer values present (with data) in every given table."""
    return functools.reduce(
        lambda acc, other: acc.join(other, on="cvrNummer", how="inner"),
        [_companiesWithData(df) for df in employmentDfs]).distinct()


# Row counts / pairwise overlaps, e.g.:
# print("månedsbeskæftigelse: " + str(distinctMaanedDf.count()))
# print("kvartalsbeskæftigelse: " + str(distinctKvartalDf.count()))
# print("årsbeskæftigelse: " + str(distinctAarDf.count()))
# print("Årsbeskæftigelse til kvartalsbeskæftigelse: " + str(_overlap(distinctAarDf, distinctKvartalDf).count()))
# print("Årsbeskæftigelse til månedsbeskæftigelse: " + str(_overlap(distinctAarDf, distinctMaanedDf).count()))
# print("Kvartalsbeskæftigelse til månedsbeskæftigelse: " + str(_overlap(distinctKvartalDf, distinctMaanedDf).count()))
AllThreeCount = _overlap(distinctAarDf, distinctKvartalDf, distinctMaanedDf)
# print("Aarsbeskæftigelse til kvartalsbeskæftigelse til månedsbeskæftigelse: " + str(AllThreeCount.count()))
In [7]:
In [ ]:
In [ ]: