In [1]:
# This notebook generates capital ("Kapital") features from the CVR data.
# NOTE: relies on a live SparkContext (sc) and sqlContext provided by the kernel.
ErhvervsPath = "/home/svanhmic/workspace/Python/Erhvervs"

from pyspark.sql import functions as F, Window, WindowSpec
from pyspark.sql import Row
from pyspark.sql.types import StringType, ArrayType, IntegerType, DoubleType, StructField, StructType

# Ship the project helper modules to the executors before importing them locally.
sc.addPyFile(ErhvervsPath + "/src/RegnSkabData/ImportRegnskabData.py")
sc.addPyFile(ErhvervsPath + '/src/RegnSkabData/RegnskabsClass.py')
sc.addPyFile(ErhvervsPath + '/src/cvr/GetNextJsonLayer.py')

import sys
import re
import os
import ImportRegnskabData
import GetNextJsonLayer
import itertools
import functools
# FIX: pandas.tools.plotting was deprecated in pandas 0.20 and removed in 0.25;
# scatter_matrix lives in pandas.plotting.
from pandas.plotting import scatter_matrix

# Directory holding the parquet-converted CVR dumps; one file is picked below.
cvrPath = "/home/svanhmic/workspace/Python/Erhvervs/data/cdata/parquet"
cvrfiles = os.listdir(cvrPath)
print(cvrfiles)
In [2]:
# Import CVR data from parquet.
cvrDf = (sqlContext
         .read
         .parquet(cvrPath + "/" + cvrfiles[1])
         )

# Flatten the company-form ("virksomhedsform") sub-structure: one row per
# (cvrNummer, form period), with the period struct expanded to plain columns.
virkformCols = ("cvrNummer", "virksomhedsform")
virkformDf = GetNextJsonLayer.createNextLayerTable(
    cvrDf.select(*virkformCols), [virkformCols[0]], virkformCols[1])
virkformDf = GetNextJsonLayer.expandSubCols(virkformDf, mainColumn="periode")
virkformDf = (virkformDf
              .drop("sidstOpdateret")
              .withColumn("periode_gyldigFra", F.col("periode_gyldigFra").cast("date"))
              .withColumn("periode_gyldigTil", F.col("periode_gyldigTil").cast("date"))
              )

# Consistency check (kept for reference): are kortBeskrivelse and
# virksomhedsformkode always mapped the same way?
#check1 = virkformDf.select(["kortBeskrivelse","langBeskrivelse","virksomhedsformkode"]+["cvrNummer"]).distinct().groupby("kortBeskrivelse","langBeskrivelse","virksomhedsformkode").count()
#check1.orderBy("kortBeskrivelse","count").show(check1.count(),truncate=False)

# Per company, order the form periods by start date, newest first.
cvrCols = ["cvrNummer"]
statusChangeWindow = (Window
                      .partitionBy(*cvrCols)
                      .orderBy(F.col("periode_gyldigFra").desc()))

# Keep only ApS/A-S companies (form codes 60 and 80) and, per company, the
# row(s) with the latest periode_gyldigFra. F.rank() keeps ties on the date,
# matching the original behavior (row_number would keep exactly one).
companyByAsApsDf = (virkformDf
                    .where(F.col("virksomhedsformkode").isin(60, 80))
                    .withColumn("rank", F.rank().over(statusChangeWindow))
                    .filter(F.col("rank") == 1)
                    .cache()
                    )
companyByAsApsDf.write.parquet(cvrPath + "/AllApsAs.parquet", mode="overwrite")
#companyByAsApsDf.printSchema()
In [3]:
def pivotOnText(df, **kwargs):
    '''
    Pivot text attribute rows into columns and drop the helper count columns.

    input df - dataframe
    kwargs - optional arguments:
        pivotCol     - column to pivot on, default "type"
        valueCol     - column whose values are aggregated, default "vaerdi"
        expectedList - the expected values of pivotCol, default ["KAPITAL"]
    '''
    # Resolve the optional parameters.
    pivot_col = kwargs.get("pivotCol", "type")
    expected_values = kwargs.get("expectedList", ["KAPITAL"])
    value_col = kwargs.get("valueCol", "vaerdi")

    # Everything except the pivot/value columns becomes the grouping key.
    group_cols = [c for c in df.columns if c not in (pivot_col, value_col)]

    # Count duplicates first, then pivot; per group the (count, value) struct
    # with the highest count wins.
    pivoted_df = (df
                  .groupBy(df.columns)
                  .count()
                  .groupBy(*group_cols)
                  .pivot(pivot_col, expected_values)
                  .agg(F.max(F.struct("count", value_col)))
                  )

    # Expand the struct columns and drop the "<value>_count" helpers.
    expanded_df = GetNextJsonLayer.expandSubCols(pivoted_df, *expected_values)
    count_cols = [v + "_count" for v in expected_values]
    keep_cols = [c for c in expanded_df.columns if c not in count_cols]
    return expanded_df.select(keep_cols)
In [4]:
# Get the attributes data frame: one row per (cvrNummer, attribut).
attributDf = GetNextJsonLayer.createNextLayerTable(cvrDf, ["cvrNummer"], "attributter")
orderedAttattributDf = attributDf.groupBy("type").count().orderBy(F.col("count").desc())
attributDf.registerTempTable("attribut")

# The capital-related attribute types we want to keep.
extractedList = ("KAPITAL", "KAPITALVALUTA", "KAPITALKLASSER", "KAPITAL_DELVIST")

# Filter to those types. Replaces the original hand-built SQL string
# ('... WHERE type == "x" OR ...') with the equivalent, safer isin filter.
filtAttributDf = attributDf.where(F.col("type").isin(*extractedList))

vaerdi = "vaerdier"
# BUGFIX: the original used `i not in (vaerdi)` — (vaerdi) is just the string
# "vaerdier", so this performed a SUBSTRING test on each column name instead of
# tuple membership. Use a real 1-tuple so only the "vaerdier" column is excluded.
filterdCols = [i for i in filtAttributDf.columns if i not in (vaerdi,)]
filtAttributDf = GetNextJsonLayer.createNextLayerTable(filtAttributDf, filterdCols, vaerdi)
filtAttributDf = GetNextJsonLayer.expandSubCols(filtAttributDf, "periode")
filtAttributDf.registerTempTable("filteredAttributes")
In [23]:
# Display order: company, attribute type, period start.
orderedCols = list(map(F.col, ("cvrNummer", "type", "periode_gyldigFra")))
#filtAttributDf.orderBy(*orderedCols).show()

# Take a look at companies that have attribute rows with sequence number >= 1.
longSeqDf = sqlContext.sql("SELECT * FROM filteredAttributes AS l WHERE EXISTS (SELECT DISTINCT cvrNummer FROM filteredAttributes AS r WHERE sekvensnr >= 1 AND l.cvrNummer = r.cvrNummer)")
longSeqDf.orderBy(*orderedCols).show()
In [28]:
# One row per (year, month) from 1997-01 through 2016-12; ts becomes the unix
# timestamp of the first day of that month.
monthRows = []
for aar in range(1997, 2017):
    for maaned in range(1, 13):
        monthRows.append(Row(aar=aar, maaned=maaned,
                             ts="{0}-{1}-1".format(aar, maaned)))

yearMonDf = (sqlContext
             .createDataFrame(monthRows)
             .withColumn("ts", F.unix_timestamp(F.col("ts").cast("date")))
             )
yearMonDf.show()
In [20]:
# Build the capital table: pivot the capital attributes into columns and
# normalize each validity period to [gyldigFra, gyldigTil] dates plus unix
# timestamps for the later interval join.
# NOTE: the window/checkCols/joinCols definitions duplicated from the earlier
# company-form cell were dead code in this cell and have been removed.
mainKapitalDf = (pivotOnText(filtAttributDf.drop("vaerditype").drop("sidstOpdateret"),
                             expectedList=extractedList)
                 .withColumnRenamed("periode_gyldigFra", "gyldigFra")
                 .withColumnRenamed("periode_gyldigTil", "gyldigTil")
                 .withColumn("gyldigFra", F.col("gyldigFra").cast("date"))
                 .withColumn("gyldigTil", F.col("gyldigTil").cast("date"))
                 # Open-ended periods (null gyldigTil) run until today.
                 # (F.lit() around current_date() was redundant and is dropped.)
                 .withColumn("gyldigTil", F.coalesce(F.col("gyldigTil"), F.current_date()))
                 .withColumn("datediff", F.datediff(F.col("gyldigTil"), F.col("gyldigFra")))
                 .withColumn("timeStampFra", F.unix_timestamp(F.col("gyldigFra")))
                 .withColumn("timeStampTil", F.unix_timestamp(F.col("gyldigTil")))
                 # Keep only the primary capital entry per period.
                 .filter(F.col("sekvensnr") == 0)
                 .drop("sekvensnr")
                 .orderBy("cvrNummer")
                 )
mainKapitalDf.show()
In [33]:
# Expand each capital period into one row per (year, month) it covers:
# a month matches when its first-of-month unix timestamp falls inside
# [timeStampFra, timeStampTil] (BETWEEN is inclusive on both ends).
joinedDf = yearMonDf.join(mainKapitalDf,(yearMonDf["ts"].between(F.col("timeStampFra"),F.col("timeStampTil"))),"inner")
# NOTE(review): "Kaptial" looks like a typo for "Kapital" — confirm no
# downstream readers depend on this path before renaming it.
joinedDf.write.parquet(path=cvrPath+"/KaptialDataFrame.parquet",mode="overwrite")
In [32]:
# Show the month-expanded capital rows without the timestamp helper columns.
displayDf = joinedDf.drop("ts").drop("timeStampFra").drop("timeStampTil")
displayDf.orderBy("cvrNummer", "aar", "maaned").show()
In [ ]: