In [1]:
# Always PySpark first!
# Setup: project code location and the CVR parquet data directory.
# NOTE(review): absolute local paths — consider making these configurable.
ErhvervsPath = "/home/svanhmic/workspace/DABAI"
cvrPath = "/home/svanhmic/workspace/data/parquet"
from pyspark.sql import functions as F, Window, WindowSpec
from pyspark.sql import Row
from pyspark.sql.types import StringType,ArrayType,IntegerType,DoubleType,StructField,StructType
# Ship the project modules to the executors so they are importable in tasks.
sc.addPyFile(ErhvervsPath+"/RegnSkabData/ImportRegnskabData.py")
sc.addPyFile(ErhvervsPath+'/RegnSkabData/RegnskabsClass.py')
sc.addPyFile(ErhvervsPath+'/ReadData/GetNextJsonLayer.py')
import sys
import re
import os
import ImportRegnskabData
import GetNextJsonLayer
import itertools
import functools
In [2]:
def createManPowerTabel(aarsDf, kvartDf, maanedDf):
    """Combine the yearly, quarterly and monthly man-power tables into one
    table sampled once per month.

    Parameters
    ----------
    aarsDf : DataFrame
        Yearly samples (one row per cvr/year).
    kvartDf : DataFrame
        Quarterly samples (one row per cvr/year/quarter).
    maanedDf : DataFrame
        Monthly samples, already at the target granularity.

    Returns
    -------
    DataFrame with columns: final_cvrNummer, aar, maaned,
    final_lower_intervalKodeAntalAarsvaerk,
    final_lower_intervalKodeAntalAnsatte, final_kortBeskrivelse.
    Rows with no cvr number at all are dropped.
    """
    def assignPrefix(df, prefix):
        # Prefix every column except bookkeeping columns so the three
        # sources can be joined without name clashes.
        excluded = ["langBeskrivelse", "virksomhedsformkode",
                    "periode_gyldigFra", "periode_gyldigTil", "rank"]
        return df.select([F.col(i).alias(prefix + i)
                          for i in df.columns if i not in excluded])

    def assignKvartLowerBound(col):
        # First month of quarter `col`; any value outside 1-3 maps to 10.
        return {1: 1, 2: 4, 3: 7}.get(col, 10)

    def assignKvartUpperBound(col):
        # Last month of quarter `col`; any value outside 1-3 maps to 12.
        return {1: 3, 2: 6, 3: 9}.get(col, 12)

    nameCols = ["final_cvrNummer",
                "aar",
                "maaned",
                "final_lower_intervalKodeAntalAarsvaerk",
                "final_lower_intervalKodeAntalAnsatte",
                "final_kortBeskrivelse"]
    # Target layout: | cvr | year | month | aarsvaerk | ansatte | aps/as |
    # Years before 2015 only exist as yearly or quarterly samples.

    # One Row per (year, month) spanning the observed year range.
    aggExprs = [f(i) for i in ["aar"] for f in [F.min, F.max]]
    yearSpan = aarsDf.groupBy().agg(*aggExprs).collect()[0]
    years = [Row(aar=i, maaned=j)
             for i in range(yearSpan[0], yearSpan[1] + 1)
             for j in range(1, 13)]

    lowerUdf = F.udf(lambda x: float(assignKvartLowerBound(x)), DoubleType())
    upperUdf = F.udf(lambda x: float(assignKvartUpperBound(x)), DoubleType())

    aDf = (assignPrefix(aarsDf, "aar_")
           .withColumnRenamed(existing="aar_lower_intervalKodeAntalInklusivEjere",
                              new="aar_lower_intervalKodeAntalAnsatte")
           .drop("aar_ansvarligDataleverandoer")
           .repartition("aar_aar"))
    kDf = (assignPrefix(kvartDf, "kvart_")
           .withColumn(colName="l", col=lowerUdf("kvart_kvartal"))
           .withColumn(colName="u", col=upperUdf("kvart_kvartal"))
           .repartition("kvart_aar"))
    # NOTE(review): mDf is built but never used below — the monthly data is
    # expected to be unioned in by the caller; confirm intent.
    mDf = (maanedDf
           .select([F.col(re.sub("final_", "", c)).alias(nameCols[i])
                    for i, c in enumerate(nameCols)]))

    yearsAndMonthDf = sqlContext.createDataFrame(years).repartition("aar")

    # Join quarterly rows onto the (year, month) grid: a quarter row covers
    # months l..u of its year.
    secondCols = ([kDf.columns[0]] + yearsAndMonthDf.columns
                  + kDf.columns[3:5] + [kDf.columns[6]])
    secondJoinDf = (yearsAndMonthDf
                    .join(kDf,
                          ((yearsAndMonthDf["aar"] == kDf["kvart_aar"])
                           & (yearsAndMonthDf["maaned"].between(kDf["l"], kDf["u"]))),
                          how="left")
                    .select([F.col(c).alias(nameCols[i])
                             for i, c in enumerate(secondCols)]))

    # Fall back to the yearly table for grid cells the quarterly data
    # did not fill (cvr number still null after the first join).
    coalseSecondCol = [secondJoinDf.columns[0]] + secondJoinDf.columns[3:]
    aCoalseCol = [aDf.columns[0]] + aDf.columns[2:]
    pairedCols = list(zip(coalseSecondCol, aCoalseCol))
    actionCols = ([F.coalesce(pairedCols[0][0], pairedCols[0][1])
                   .alias(pairedCols[0][0])]
                  + yearsAndMonthDf.columns
                  + [F.coalesce(i, j).alias(i) for i, j in pairedCols[1:]])
    lastJoinDf = (secondJoinDf
                  .join(aDf,
                        ((secondJoinDf["aar"] == aDf["aar_aar"])
                         # BUG FIX: the original used `== None`, which in
                         # Spark SQL compares against NULL and never matches,
                         # so yearly data was never joined in. isNull() is
                         # the correct null test.
                         & (secondJoinDf["final_cvrNummer"].isNull())),
                        how="left")
                  .select(actionCols))
    # Remove grid rows that matched neither source (no cvr number at all).
    return lastJoinDf.na.drop(how="all", subset=["final_cvrNummer"])
In [3]:
# Load the yearly (A), quarterly (K) and monthly (M) man-power tables.
dfA, dfK, dfM = (sqlContext.read.parquet(cvrPath + "/" + name + ".parquet")
                 for name in ("AarsVaerker", "KvartalsVaerker", "MaanedsVaerker"))
In [4]:
# Disabled: one-shot rebuild via createManPowerTabel, superseded by the
# step-by-step expansion in the cells below.
#xDf = createManPowerTabel(dfA,dfK,dfM)
#xDf.write.parquet(mode="overwrite",path="/home/svanhmic/workspace/Python/Erhvervs/data/cdata/TotalAarsVaerker.parquet")
In [5]:
def assignPrefix(df, prefix):
    """Return df with `prefix` prepended to every column name, dropping
    bookkeeping columns that should not be carried along."""
    excluded = {"langBeskrivelse", "virksomhedsformkode",
                "periode_gyldigFra", "periode_gyldigTil", "rank"}
    kept = [c for c in df.columns if c not in excluded]
    return df.select([F.col(c).alias(prefix + c) for c in kept])
def assignKvartLowerBound(col):
    """First month of quarter `col` (1->1, 2->4, 3->7); anything else -> 10."""
    return {1: 1, 2: 4, 3: 7}.get(col, 10)
def assignKvartUpperBound(col):
    """Last month of quarter `col` (1->3, 2->6, 3->9); anything else -> 12."""
    return {1: 3, 2: 6, 3: 9}.get(col, 12)
In [ ]:
In [6]:
def createYearTabel(df):
    """Build a DataFrame with one row per (aar, maaned) pair, spanning the
    min..max year found in df's `aar` column and months 1..12."""
    bounds = df.groupBy().agg(F.min("aar"), F.max("aar")).collect()[0]
    rows = [Row(aar=year, maaned=month)
            for year in range(bounds[0], bounds[1] + 1)
            for month in range(1, 13)]
    return sqlContext.createDataFrame(rows)
In [7]:
# Expand the quarterly table to monthly rows: each quarter row joins to
# every month in its [l, u] window of the same year.
lowerUdf = F.udf(lambda x: float(assignKvartLowerBound(x)),DoubleType())
upperUdf = F.udf(lambda x: float(assignKvartUpperBound(x)),DoubleType())
y = createYearTabel(dfK)
kCols = ["cvrNummer","aar","maaned","lower_intervalKodeAntalAarsvaerk","lower_intervalKodeAntalAnsatte","kortBeskrivelse"]
expandKDf = (y
# l/u = first/last month of the quarter, as doubles for between().
.join(dfK
.withColumn(col=lowerUdf("kvartal"),colName="l")
.withColumn(col=upperUdf("kvartal"),colName="u")
,((y["aar"] == dfK["aar"]) & (y["maaned"].between(F.col("l"),F.col("u"))))
,"inner"
)
# Drop dfK's copies of the join keys; keep the grid's aar/maaned.
.drop(dfK["aar"])
.drop(dfK["kvartal"])
.select(kCols)
)
expandKDf.show()
expandKDf.count()
Out[7]:
In [8]:
# Expand the yearly table to monthly rows: each yearly row fans out to the
# 12 (aar, maaned) cells of its year via a left join on the grid.
x = createYearTabel(dfA)
# NOTE(review): the yearly table carries lower_intervalKodeAntalInklusivEjere
# where the quarterly/monthly tables use lower_intervalKodeAntalAnsatte —
# confirm these encode the same interval scale before unioning.
cols = ["cvrNummer","aar","maaned","lower_intervalKodeAntalAarsvaerk","lower_intervalKodeAntalInklusivEjere","kortBeskrivelse"]
expandedADf = (x
.join(dfA,(x["aar"] == dfA["aar"] ),"left")
# Drop dfA's copy of the join key; keep the grid's aar.
.drop(dfA["aar"])
.select(cols)
)
expandedADf.show()
In [9]:
# Monthly table is already at the target granularity; just order the columns.
mCols= ["cvrNummer","aar","maaned","lower_intervalKodeAntalAarsvaerk","lower_intervalKodeAntalAnsatte","kortBeskrivelse"]
MaanedsDf = dfM.select(mCols)
In [10]:
# Key-only projections, used to compute which (cvr, aar, maaned)
# combinations exist in each source.
kvartCvrDf = expandKDf.select("cvrNummer","aar","maaned")
aarCvrDf = expandedADf.select("cvrNummer","aar","maaned")
In [11]:
# Keys present in the yearly expansion but missing from the quarterly one;
# re-attach the full yearly columns for exactly those keys.
onlyInAar = aarCvrDf.subtract(kvartCvrDf)
onlyInDataDf = (onlyInAar
.join(expandedADf,((onlyInAar["cvrNummer"] == expandedADf["cvrNummer"])
& (onlyInAar["aar"] == expandedADf["aar"])
& (onlyInAar["maaned"] == expandedADf["maaned"])),"inner")
# Drop expandedADf's copies of the join keys to avoid duplicate columns.
.drop(expandedADf["cvrNummer"])
.drop(expandedADf["aar"])
.drop(expandedADf["maaned"])
)
In [12]:
# Inspect the rows that only the yearly data provides.
onlyInDataDf.show()
In [13]:
# Stack the three sources. unionAll matches columns by POSITION, not name:
# column 4 is lower_intervalKodeAntalInklusivEjere in the yearly rows but
# lower_intervalKodeAntalAnsatte elsewhere — NOTE(review): confirm the two
# encodings are compatible.
combineAllKADf = expandKDf.unionAll(onlyInDataDf)
combineAllKAMDf = combineAllKADf.unionAll(MaanedsDf)
In [14]:
# Inspect the fully combined monthly table.
combineAllKAMDf.show()
In [16]:
# Row count of quarterly + yearly-only rows.
combineAllKADf.count()
Out[16]:
In [17]:
# Row count including the monthly rows.
combineAllKAMDf.count()
Out[17]:
In [18]:
# Persist the combined monthly man-power table.
(combineAllKAMDf
 .write
 .mode("overwrite")
 .parquet(cvrPath + "/TotalAarsVaerker.parquet"))
In [ ]: