Combine all data sources
This notebook contains a first attempt at combining data elements from different tables in the CVR register.
In [1]:
#import sys
#sys.path.append("/home/svanhmic/workspace/Python/Erhvervs/src/cvr")
sc.addPyFile("/home/svanhmic/workspace/DABAI/ReadData/GetNextJsonLayer.py")
from pyspark import SparkConf
from pyspark.sql import SparkSession, SQLContext, Row
from pyspark.sql import functions as F
from pyspark.sql.types import StringType, IntegerType
from pyspark.sql.window import Window
import os
import GetNextJsonLayer # Custom
from functools import reduce
import numpy as np
import matplotlib.mlab as mlab
import matplotlib.pyplot as plt
from datetime import datetime
import sys
In [2]:
parquetLocation = "/home/svanhmic/workspace/data/DABAI/sparkdata/parquet"
cvrData = "/home/svanhmic/workspace/data/DABAI/sparkdata/json"
In [3]:
#import the raw CVR JSON data
cvrDf = sqlContext.read.json(cvrData+"/AlleDatavirksomheder.json")
cvrDf.printSchema()
In [4]:
areasCols = ["cvrNummer", "aarsbeskaeftigelse", "virksomhedsstatus", "reklamebeskyttet",
             "brancheAnsvarskode", "attributter", "penheder", "hovedbranche",
             "virksomhedsform", "maanedsbeskaeftigelse", "kvartalsbeskaeftigelse"]
allDataDf = cvrDf.select([F.col("virksomhed."+i) for i in areasCols]).cache()
allDataDf.write.parquet(mode="overwrite",path=parquetLocation+"/virkdata.parquet")
allDataDf.show()
allDataDf.printSchema()
In [5]:
secondLayerSchema = GetNextJsonLayer.getNextSchemaLayer(cvrDf.schema,"virksomhed")
#cvrDf.select(cvrDf["virksomhed.attributter.sekvensnr"]).show()
print(secondLayerSchema)
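# GetNextJsonLayer is a local helper module. As a rough, hypothetical sketch (not the actual
# implementation) of what createNextLayerTable and expandSubCols are assumed to do with plain PySpark:
# explode an array-of-struct column next to a set of key columns, and flatten a struct column into
# prefixed top-level columns such as periode_gyldigFra.
def createNextLayerTableSketch(df, keyCols, arrayCol):
    # Explode the array so every element becomes its own row, keeping the key columns.
    exploded = df.select([F.col(c) for c in keyCols] + [F.explode(F.col(arrayCol)).alias(arrayCol)])
    # Promote the exploded struct's fields to top-level columns.
    subFields = exploded.select(arrayCol + ".*").columns
    return exploded.select([F.col(c) for c in keyCols]
                           + [F.col(arrayCol + "." + f) for f in subFields])

def expandSubColsSketch(df, structCol):
    # Flatten a struct column into "<struct>_<field>" columns, e.g. periode -> periode_gyldigFra.
    fields = df.select(structCol + ".*").columns
    otherCols = [c for c in df.columns if c != structCol]
    return df.select([F.col(c) for c in otherCols]
                     + [F.col(structCol + "." + f).alias(structCol + "_" + f) for f in fields])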
In [6]:
#Create aarsbeskaeftigelse (annual employment) data frame and split the interval codes into lower/upper bounds
aarsbeskaeftigelseDf = GetNextJsonLayer.createNextLayerTable(allDataDf,["cvrNummer"],"aarsbeskaeftigelse")
#aarsbeskaeftigelseDf.show()
regexedVaekstDf = (aarsbeskaeftigelseDf
.select(aarsbeskaeftigelseDf["cvrNummer"]
,aarsbeskaeftigelseDf["aar"]
,F.split(F.regexp_extract("intervalKodeAntalAarsvaerk",r'(\d{1,4}_\d{1,4})',0),r"\_").alias("intervalKodeAntalAarsvaerk")
,F.split(F.regexp_extract("intervalKodeAntalAnsatte",r'(\d{1,4}_\d{1,4})',0),r"\_").alias("intervalKodeAntalAnsatte")
,F.split(F.regexp_extract("intervalKodeAntalInklusivEjere",r'(\d{1,4}_\d{1,4})',0),r"\_").alias("intervalKodeAntalInklusivEjere")
,aarsbeskaeftigelseDf["sidstOpdateret"])
)
#regexedVaekstDf.orderBy(["cvrNummer","aar"]).show()
#regexedVaekstDf.printSchema()
In [7]:
#model the number of aarsværk (full-time equivalents) as features: 15 columns, one per year; AarsVaerk_1 is the most recent year
windowAarsRank = (Window.partitionBy(F.col("cvrNummer"))
.orderBy(F.col("aar").desc()))
aarsDf = (regexedVaekstDf
.select(regexedVaekstDf["cvrNummer"]
,regexedVaekstDf["aar"]
,regexedVaekstDf["intervalKodeAntalAarsvaerk"][0].cast("double").alias("AarsvaerkLowerBound"))
.withColumn(colName="rank",col=F.concat(F.lit("AarsVaerk_"),F.rank().over(windowAarsRank)))
.groupby("cvrNummer")
.pivot("rank",["AarsVaerk_"+str(i) for i in range(1,16)])
.sum("AarsvaerkLowerBound")
)
aarsDf.show()
In [8]:
#take a look at the number of years of aarsbeskaeftigelse records per company (a rough proxy for how long a company has been open)
allYears = (regexedVaekstDf
.select(regexedVaekstDf["cvrNummer"],regexedVaekstDf["aar"])
.groupby("cvrNummer")
.count()
.select(F.col("count"))
.collect())
npAllYears = np.array(list(map(lambda x: x["count"],allYears)))
print(npAllYears[:3])
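# Quick check of the mean number of years per company (the cell comment above mentions the average):
print(np.mean(npAllYears))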
In [9]:
n, bins, patches = plt.hist(npAllYears, 15, facecolor='green', alpha=0.75)
plt.show()
In [10]:
#model the number of employees including owners (inklusiv ejere) as features over 15 years, analogous to the aarsværk columns above
medarbejdsDf = (regexedVaekstDf
.select(regexedVaekstDf["cvrNummer"]
,regexedVaekstDf["aar"]
,regexedVaekstDf["intervalKodeAntalInklusivEjere"][0].cast("double").alias("AntalInklusivEjereLowerBound"))
.withColumn(colName="rank",col=F.concat(F.lit("medArb_"),F.rank().over(windowAarsRank)))
.groupby("cvrNummer")
.pivot("rank",["medArb_"+str(i) for i in range(1,16)])
.sum("AntalInklusivEjereLowerBound")
).cache()
#medarbejdsDf.show()
In [11]:
#Create misc dataframe with cvrnummer, reklamebeskyttelse, virksomhedsstatus
virksomhedsStatusDf = GetNextJsonLayer.createNextLayerTable(allDataDf,["cvrNummer"],"virksomhedsstatus")
exVirksomhedsStatusDf = GetNextJsonLayer.expandSubCols(virksomhedsStatusDf,"periode")
exVirksomhedsStatusDf.cache()
exVirksomhedsStatusDf.show()
In [12]:
reklameBeskytDf = GetNextJsonLayer.createNextLayerTable(allDataDf,["cvrNummer","reklamebeskyttet",],"virksomhedsstatus")
reklameBeskytDf = reklameBeskytDf.drop("periode").drop("status").drop("sidstOpdateret")
#reklameBeskytDf.show()
In [13]:
(exVirksomhedsStatusDf
 .select(F.lower(F.col("status")).alias("status"))
 .groupby("status")
 .count()
 .orderBy(F.col("count").desc())
 .show(truncate=False))
In [14]:
statusTransactionDf = exVirksomhedsStatusDf.groupBy("cvrNummer").count()#.orderBy(F.col("count").desc())
#statusTransactionDf.show()
#Show transaction distribution
transactionCol = np.array(list(map(lambda x: float(x[1]),statusTransactionDf.collect())))
n, bins, patches = plt.hist(transactionCol, 11, facecolor='green', alpha=0.75)
plt.show()
(statusTransactionDf
.select(F.col("count").alias("statusCount"))
.groupBy()
.mean("statusCount")
.show())
In [15]:
#Determine the course of status changes (handlingsforløb) per company: the four most recent statuses plus a hash of that sequence
windowRankStatus = (Window
.partitionBy(F.col("cvrNummer"))
.orderBy(F.col("periode_gyldigFra").desc())
)
handlingsForlobDf = (exVirksomhedsStatusDf
.withColumn(colName="statusRank",col=F.rank().over(windowRankStatus))
.filter(F.col("statusRank") <= 4)
.select(F.col("cvrNummer"),F.col("status"),F.col("statusRank"))
.rdd
.map(lambda x: (x["cvrNummer"],[(x["status"],x["statusRank"])]))
.reduceByKey(lambda x,y: x+y)
.map(lambda x: (x[0],[v[0] for v in x[1]],"".join(map(lambda y: y[0]+str(y[1]),x[1]))))
.toDF(["cvrNummer","status","statHash"])
.select(F.col("cvrNummer"), F.col("status"),F.hash(F.col("statHash")).alias("statusHash"))
)
#handlingsForlobDf.show(truncate=False)
In [16]:
#Map a status to a label: 1.0 for bankruptcy-related statuses, 0.0 for normal/voluntary closures, 2.0 for everything else
def checkLabels(stringVal):
    zeroLabels = ["NORMAL","TVANGSOPLØST","UNDER FRIVILLIG LIKVIDATION","OPLØST EFTER ERKLÆRING","OPLØST EFTER FRIVILLIG LIKVIDATION"
                  ,"OPLØST EFTER FUSION","OPLØST EFTER SPALTNING"]
    oneLabels = ["UNDER KONKURS","OPLØST EFTER KONKURS"]
    miscLabels = ["SLETTET"
                  ,"UNDER RETSVIRKNING"
                  ,"UNDER REASSUMERING"
                  ,"UNDER REKONSTRUKTION"
                  ,"AKTIV"
                  ,"UNDER REASUMMERING"
                  ,"OPLØST"
                  ,"SLETTES"
                  ,"OPLØST EFTER TVANGSOPLØSNING"
                  ,"UNDER REASUMMATION"]
    if stringVal in oneLabels:
        return 1.0
    elif stringVal in zeroLabels:
        return 0.0
    else:
        # all remaining statuses (miscLabels) fall through here
        return 2.0
broadcastedHandlingsForlob = (handlingsForlobDf
.drop(F.col("cvrNummer"))
.distinct()
.rdd
.map(lambda x: Row(label=checkLabels(x["status"][0]),statusHash=x["statusHash"]))
.toDF(["label","statusHash"])
)
#broadcastedHandlingsForlob.show()
In [17]:
#show the distribution of the status sequences (courses of action)
groupedHandlingerDf = (handlingsForlobDf
.groupBy(F.col("statusHash"))
.count())
(groupedHandlingerDf
.join(broadcastedHandlingsForlob,broadcastedHandlingsForlob["statusHash"]==groupedHandlingerDf["statusHash"])
.drop(groupedHandlingerDf["statusHash"])
.orderBy(F.col("count").desc())
#.show(truncate=False)
)
In [18]:
#join labels onto the status sequences; statuses are listed latest-first in the array
labelStatusDf = (handlingsForlobDf
.join(broadcastedHandlingsForlob,broadcastedHandlingsForlob["statusHash"]==handlingsForlobDf["statusHash"])
.drop(broadcastedHandlingsForlob["statusHash"]).drop(handlingsForlobDf["statusHash"])
)
#labelStatusDf.show()
#for v in broadcastedHandlingsForlob.take(5):
# print(v)
In [19]:
#Create attributes Dataframe
attributesDf = GetNextJsonLayer.createNextLayerTable(allDataDf,["cvrNummer"],"attributter")
#attributesDf.show(10)
#print(attributesDf.schema["cvrNummer"])
#get column names
attributesCols = attributesDf.columns
attributesCols.remove("vaerdier")
attributesWithValueDf = GetNextJsonLayer.createNextLayerTable(attributesDf,attributesCols,"vaerdier")
#attributesWithValueDf.show(10)
attributesWithValueAndPeriodDf = GetNextJsonLayer.expandSubCols(attributesWithValueDf,"periode")
#attributesWithValueAndPeriodDf.show()
valCols = ["cvrNummer","vaerdi","periode_gyldigFra","periode_gyldigTil"]
kapitalFromAttributesDf = (attributesWithValueAndPeriodDf
.filter(F.col("type") == "KAPITAL")
.select([F.col(x) for x in valCols]+[F.datediff(F.col("periode_gyldigTil"),F.col("periode_gyldigFra")).alias("varighed")])
).cache()
#kapitalFromAttributesDf.orderBy([F.col("cvrNummer"),F.col("gyldigFra")]).show()
In [20]:
formalDF = attributesDf.select("type").groupby("type").count().orderBy(F.col("count").desc())
formalDF.show(40,truncate=False)
formalDF.printSchema()
In [21]:
#Export CVR numbers and company names to a parquet file
companyAtributDf = attributesWithValueAndPeriodDf.filter(F.col("type") == "NAVN_IDENTITET")
companyAtributDf.write.parquet(mode="overwrite",path=parquetLocation+"/companyCvrData")
In [22]:
#generate x,y dataset
vaerdiListRdd = (kapitalFromAttributesDf
.select(F.col("cvrNummer"),F.col("vaerdi").cast("double"),F.col("varighed"))
.rdd
.map(lambda x: (x["cvrNummer"],([x["vaerdi"]],[x["varighed"]]))))
In [23]:
reduceVaerdiListRdd = (vaerdiListRdd
.reduceByKey(lambda x,y: (x[0]+y[0],x[1]+y[1]))
.map(lambda x: (x[0],x[1][0],x[1][1])).toDF(["cvrNummer","vaerdier","varigheder"])
)
#reduceVaerdiListRdd.show()
In [24]:
lenUdf = F.udf(lambda x: len(x),IntegerType())
allNumberOfEntries = reduceVaerdiListRdd.select(F.col("cvrNummer"),lenUdf(F.col("vaerdier")).alias("entries"))
entriesCol = np.array(list(map(lambda x: x[1],allNumberOfEntries.collect())))
n, bins, patches = plt.hist(entriesCol, 30, facecolor='green', alpha=0.75)
plt.yscale("log")
plt.xlim([0,100])
plt.show()
print(np.mean(entriesCol))
In [25]:
#compute slope, i.e. delta = (vaerdi_next - vaerdi) / varighed for each capital period, and keep the latest 7 slope values
windowSlope = (Window
               .partitionBy(kapitalFromAttributesDf["cvrNummer"])
               .orderBy(kapitalFromAttributesDf["periode_gyldigFra"].asc())
               )
windowRank = (Window
              .partitionBy(kapitalFromAttributesDf["cvrNummer"])
              .orderBy(kapitalFromAttributesDf["periode_gyldigFra"].desc()))
valsCol = ["cvrNummer"]
vaerdiWithSlopeDf = (kapitalFromAttributesDf
.withColumn(colName="slope",col=( F.lead("vaerdi").over(windowSlope)-F.col("vaerdi"))/ F.datediff(F.col("periode_gyldigTil"),F.col("periode_gyldigFra")))
.withColumn(colName="rank",col=F.rank().over(windowRank))
.filter(F.col("rank") <= 8)
#.withColumn(colName="inversRank",col=(7-F.col("rank")+1))
#.drop(F.col("rank"))
#.withColumnRenamed("inversRank","rank")
.select(F.col("cvrNummer"),F.col("slope"),F.concat(F.lit("rank_"),F.col("rank")).alias("rank"))
.groupby(*valsCol)
.pivot("rank",["rank_"+str(i) for i in range(1,9)])
.sum("slope")
.drop("rank_1")
.select(valsCol+[F.col("rank_"+str(i)).alias("rank_"+str(i-1)) for i in range(2,9)])
).cache()
vaerdiWithSlopeDf.show()
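# Note on dropping rank_1 above: windowSlope orders periods ascending, so the most recent period has
# no F.lead value and its slope is null; windowRank puts that period at rank 1. Dropping rank_1 and
# shifting rank_2..rank_8 down to rank_1..rank_7 keeps the slopes for the seven most recent earlier periods.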
In [26]:
#Create p-enheder (production unit) data frame with counts of open/closed units and average duration
rawpEnhedDf = GetNextJsonLayer.createNextLayerTable(allDataDf,["cvrNummer"],"penheder")
exPEnhedDf = GetNextJsonLayer.expandSubCols(rawpEnhedDf,"periode")
#exPEnhedDf.orderBy("cvrNummer").show()
aggregates = [F.mean,F.sum]
datas = ["aabneEnheder","lukketEnheder","varighed"]
groups = ["cvrNummer"]
exprs = [f(d) for f in aggregates for d in datas]
#print(exprs)
pEnhedDf = (exPEnhedDf.select(F.col("cvrNummer")
,F.col("pNummer"),F.col("periode_gyldigTil"),F.col("periode_gyldigFra")
,F.datediff(F.col("periode_gyldigTil"),F.col("periode_gyldigFra")).alias("varighed")
,F.when(F.isnull(F.col("periode_gyldigTil")),0).otherwise(1).alias("aabneEnheder"),
F.when(F.isnull(F.col("periode_gyldigTil")),1).otherwise(0).alias("lukketEnheder"))
.groupby(*groups)
.agg(*exprs)
.drop(F.col("sum(varighed)"))
).cache()
pEnhedDf.show()
In [27]:
#add virksomhedsform (company legal form, kortBeskrivelse)
takeOuts = ["cvrNummer","virksomhedsform"]
fDf = allDataDf.select(*takeOuts)
formDf = GetNextJsonLayer.createNextLayerTable(fDf,takeOuts[:1],takeOuts[1])
formDf = (GetNextJsonLayer
.expandSubCols(formDf,"periode")
.withColumn(col=F.unix_timestamp(F.col("periode_gyldigFra"), format='yyyy-MM-dd'),colName="periode_gyldigFra")
.withColumn(col=F.unix_timestamp(F.col("periode_gyldigTil"), format='yyyy-MM-dd'),colName="periode_gyldigTil")
)
outputCols = ["cvrNummer","kortBeskrivelse"]
#formDf.show()
##formDf.printSchema()
latestsStatusDf = formDf.groupBy(*outputCols).max("periode_gyldigFra").drop("max(periode_gyldigFra)")
latestsStatusDf.show()
In [28]:
#combine all sub data frames into one feature table
compiledDf = (labelStatusDf
.join(aarsDf,aarsDf["cvrNummer"]==labelStatusDf["cvrNummer"],"inner")
.drop(aarsDf["cvrNummer"]) #Aarsværker
.join(medarbejdsDf,labelStatusDf["cvrNummer"]==medarbejdsDf["cvrNummer"],"inner")
.drop(medarbejdsDf["cvrNummer"]) #Medarbejder
.join(pEnhedDf,pEnhedDf["cvrNummer"]==labelStatusDf["cvrNummer"],"inner")
.drop(pEnhedDf["cvrNummer"])#Penheder
.join(vaerdiWithSlopeDf,vaerdiWithSlopeDf["cvrNummer"]==labelStatusDf["cvrNummer"],"inner")
.drop(vaerdiWithSlopeDf["cvrNummer"])
.join(reklameBeskytDf,reklameBeskytDf["cvrNummer"]==labelStatusDf["cvrNummer"],"inner")
.drop(reklameBeskytDf["cvrNummer"])#reklamebeskyttelse osv.
.join(latestsStatusDf,latestsStatusDf["cvrNummer"]==labelStatusDf["cvrNummer"],"inner")
.drop(latestsStatusDf["cvrNummer"])
.cache() #kapitaler
)
compiledDf.show()
In [29]:
#rename columns and write to a parquet file
renamedCompiledDf = (compiledDf
.drop("avg(lukketEnheder)")
.drop("avg(aabneEnheder)")
.withColumnRenamed(existing="avg(varighed)",new="avgVarighed")
.withColumnRenamed(existing="sum(aabneEnheder)",new="totalAabneEnheder")
.withColumnRenamed(existing="sum(lukketEnheder)",new="totalLukketEnheder"))
renamedCompiledDf.first()
renamedCompiledDf.write.parquet(mode="overwrite",path=parquetLocation+"/featureDataCvr.parquet")
dateDiff = (datetime(2100,1,1)-datetime(1900,1,1)).days
#print(dateDiff)
In [30]:
filledCols = ["AarsVaerk_1","AarsVaerk_2","AarsVaerk_3","AarsVaerk_4","AarsVaerk_5","AarsVaerk_6","AarsVaerk_7",
"AarsVaerk_8","AarsVaerk_9","AarsVaerk_10","AarsVaerk_11","AarsVaerk_12","AarsVaerk_13","AarsVaerk_14",
"AarsVaerk_15","medArb_1","medArb_2","medArb_3","medArb_4","medArb_5","medArb_6","medArb_7","medArb_8",
"medArb_9","medArb_10","medArb_11","medArb_12","medArb_13","medArb_14","medArb_15","rank_1","rank_2"
,"rank_3","rank_4","rank_5","rank_6","rank_7"]
noCvrCols = compiledDf.columns
noCvrCols.remove("cvrNummer")
noCvrCols.remove("status")
noCvrCols.remove("label")
smallestValueDf = compiledDf.select([F.min(F.col(i)).alias(i) for i in noCvrCols])
smallestValDict = sc.broadcast(smallestValueDf.collect()[0].asDict())
print(smallestValDict.value)
#filledAndShiftedCompiledDf = (compiledDf
# .na
# .fill(value=0.0,subset=filledCols)
# .na
# .fill(value=dateDiff,subset=["avg(varighed)"])
# .drop("status")
# #.select([F.col("cvrNummer"),F.col("label")]+[F.log1p(F.col(i)-F.lit(smallestValDict.value[i])).alias(i) for i in noCvrCols])
# )
#filledAndShiftedCompiledDf.show()