In [1]:
from pyspark import SparkConf
from pyspark.sql import SparkSession, SQLContext
from pyspark.sql import functions as F
from functools import reduce
from pyspark.sql.window import Window
from pyspark.sql.types import IntegerType
#Python imports
import sys
sys.path.append("/home/svanhmic/workspace/Python/Erhvervs/src/cvr")
import GetNextJsonLayer
import os
path = "/home/svanhmic/workspace/Python/Erhvervs/data/cdata"
#paths
In [2]:
#%ls /home/svanhmic/workspace/Python/Erhvervs/data/cdata
%env
#%pdb
from graphframes import graphframe
In [ ]:
In [3]:
df = sqlContext.read.json(path+"/AlleDeltager.json")
deltagerCols = GetNextJsonLayer.getNextSchemaLayer(df.schema,"deltager")
In [4]:
deltagerCols
df.printSchema()
root
|-- deltager: struct (nullable = true)
| |-- attributter: array (nullable = true)
| | |-- element: struct (containsNull = true)
| | | |-- sekvensnr: long (nullable = true)
| | | |-- type: string (nullable = true)
| | | |-- vaerdier: array (nullable = true)
| | | | |-- element: struct (containsNull = true)
| | | | | |-- periode: struct (nullable = true)
| | | | | | |-- gyldigFra: string (nullable = true)
| | | | | | |-- gyldigTil: string (nullable = true)
| | | | | |-- sidstOpdateret: string (nullable = true)
| | | | | |-- vaerdi: string (nullable = true)
| | | |-- vaerditype: string (nullable = true)
| |-- beliggenhedsadresse: array (nullable = true)
| | |-- element: struct (containsNull = true)
| | | |-- bogstavFra: string (nullable = true)
| | | |-- bogstavTil: string (nullable = true)
| | | |-- bynavn: string (nullable = true)
| | | |-- conavn: string (nullable = true)
| | | |-- etage: string (nullable = true)
| | | |-- fritekst: string (nullable = true)
| | | |-- husnummerFra: long (nullable = true)
| | | |-- husnummerTil: long (nullable = true)
| | | |-- kommune: struct (nullable = true)
| | | | |-- kommuneKode: long (nullable = true)
| | | | |-- kommuneNavn: string (nullable = true)
| | | | |-- periode: struct (nullable = true)
| | | | | |-- gyldigFra: string (nullable = true)
| | | | | |-- gyldigTil: string (nullable = true)
| | | | |-- sidstOpdateret: string (nullable = true)
| | | |-- landekode: string (nullable = true)
| | | |-- periode: struct (nullable = true)
| | | | |-- gyldigFra: string (nullable = true)
| | | | |-- gyldigTil: string (nullable = true)
| | | |-- postboks: string (nullable = true)
| | | |-- postdistrikt: string (nullable = true)
| | | |-- postnummer: long (nullable = true)
| | | |-- sidedoer: string (nullable = true)
| | | |-- sidstOpdateret: string (nullable = true)
| | | |-- vejkode: long (nullable = true)
| | | |-- vejnavn: string (nullable = true)
| |-- dataAdgang: long (nullable = true)
| |-- deltagerpersonMetadata: struct (nullable = true)
| | |-- nyesteBeliggenhedsadresse: struct (nullable = true)
| | | |-- bogstavFra: string (nullable = true)
| | | |-- bogstavTil: string (nullable = true)
| | | |-- bynavn: string (nullable = true)
| | | |-- conavn: string (nullable = true)
| | | |-- etage: string (nullable = true)
| | | |-- fritekst: string (nullable = true)
| | | |-- husnummerFra: long (nullable = true)
| | | |-- husnummerTil: long (nullable = true)
| | | |-- kommune: struct (nullable = true)
| | | | |-- kommuneKode: long (nullable = true)
| | | | |-- kommuneNavn: string (nullable = true)
| | | | |-- periode: struct (nullable = true)
| | | | | |-- gyldigFra: string (nullable = true)
| | | | |-- sidstOpdateret: string (nullable = true)
| | | |-- landekode: string (nullable = true)
| | | |-- periode: struct (nullable = true)
| | | | |-- gyldigFra: string (nullable = true)
| | | | |-- gyldigTil: string (nullable = true)
| | | |-- postboks: string (nullable = true)
| | | |-- postdistrikt: string (nullable = true)
| | | |-- postnummer: long (nullable = true)
| | | |-- sidedoer: string (nullable = true)
| | | |-- sidstOpdateret: string (nullable = true)
| | | |-- vejkode: long (nullable = true)
| | | |-- vejnavn: string (nullable = true)
| | |-- nyesteKontaktoplysninger: array (nullable = true)
| | | |-- element: string (containsNull = true)
| |-- elektroniskPost: array (nullable = true)
| | |-- element: struct (containsNull = true)
| | | |-- hemmelig: boolean (nullable = true)
| | | |-- kontaktoplysning: string (nullable = true)
| | | |-- periode: struct (nullable = true)
| | | | |-- gyldigFra: string (nullable = true)
| | | | |-- gyldigTil: string (nullable = true)
| | | |-- sidstOpdateret: string (nullable = true)
| |-- enhedsNummer: long (nullable = true)
| |-- enhedstype: string (nullable = true)
| |-- fejlRegistreret: boolean (nullable = true)
| |-- fejlVedIndlaesning: boolean (nullable = true)
| |-- forretningsnoegle: long (nullable = true)
| |-- naermesteFremtidigeDato: string (nullable = true)
| |-- navne: array (nullable = true)
| | |-- element: struct (containsNull = true)
| | | |-- navn: string (nullable = true)
| | | |-- sidstOpdateret: string (nullable = true)
| |-- postadresse: array (nullable = true)
| | |-- element: struct (containsNull = true)
| | | |-- conavn: string (nullable = true)
| | | |-- fritekst: string (nullable = true)
| | | |-- landekode: string (nullable = true)
| | | |-- periode: struct (nullable = true)
| | | | |-- gyldigFra: string (nullable = true)
| | | |-- sidstOpdateret: string (nullable = true)
| |-- samtId: long (nullable = true)
| |-- sidstIndlaest: string (nullable = true)
| |-- sidstOpdateret: string (nullable = true)
| |-- stilling: string (nullable = true)
| |-- telefaxNummer: array (nullable = true)
| | |-- element: string (containsNull = true)
| |-- telefonNummer: array (nullable = true)
| | |-- element: struct (containsNull = true)
| | | |-- hemmelig: boolean (nullable = true)
| | | |-- kontaktoplysning: string (nullable = true)
| | | |-- periode: struct (nullable = true)
| | | | |-- gyldigFra: string (nullable = true)
| | | |-- sidstOpdateret: string (nullable = true)
| |-- virkningsAktoer: string (nullable = true)
| |-- virksomhedSummariskRelation: array (nullable = true)
| | |-- element: struct (containsNull = true)
| | | |-- organisationer: array (nullable = true)
| | | | |-- element: struct (containsNull = true)
| | | | | |-- attributter: array (nullable = true)
| | | | | | |-- element: struct (containsNull = true)
| | | | | | | |-- sekvensnr: long (nullable = true)
| | | | | | | |-- type: string (nullable = true)
| | | | | | | |-- vaerdier: array (nullable = true)
| | | | | | | | |-- element: struct (containsNull = true)
| | | | | | | | | |-- periode: struct (nullable = true)
| | | | | | | | | | |-- gyldigFra: string (nullable = true)
| | | | | | | | | | |-- gyldigTil: string (nullable = true)
| | | | | | | | | |-- sidstOpdateret: string (nullable = true)
| | | | | | | | | |-- vaerdi: string (nullable = true)
| | | | | | | |-- vaerditype: string (nullable = true)
| | | | | |-- enhedsNummerOrganisation: long (nullable = true)
| | | | | |-- hovedtype: string (nullable = true)
| | | | | |-- medlemsData: array (nullable = true)
| | | | | | |-- element: struct (containsNull = true)
| | | | | | | |-- attributter: array (nullable = true)
| | | | | | | | |-- element: struct (containsNull = true)
| | | | | | | | | |-- sekvensnr: long (nullable = true)
| | | | | | | | | |-- type: string (nullable = true)
| | | | | | | | | |-- vaerdier: array (nullable = true)
| | | | | | | | | | |-- element: struct (containsNull = true)
| | | | | | | | | | | |-- periode: struct (nullable = true)
| | | | | | | | | | | | |-- gyldigFra: string (nullable = true)
| | | | | | | | | | | | |-- gyldigTil: string (nullable = true)
| | | | | | | | | | | |-- sidstOpdateret: string (nullable = true)
| | | | | | | | | | | |-- vaerdi: string (nullable = true)
| | | | | | | | | |-- vaerditype: string (nullable = true)
| | | | | |-- organisationsNavn: array (nullable = true)
| | | | | | |-- element: struct (containsNull = true)
| | | | | | | |-- navn: string (nullable = true)
| | | | | | | |-- periode: struct (nullable = true)
| | | | | | | | |-- gyldigFra: string (nullable = true)
| | | | | | | | |-- gyldigTil: string (nullable = true)
| | | | | | | |-- sidstOpdateret: string (nullable = true)
| | | |-- virksomhed: struct (nullable = true)
| | | | |-- cvrNummer: long (nullable = true)
| | | | |-- enhedsNummer: long (nullable = true)
| | | | |-- enhedstype: string (nullable = true)
| | | | |-- fejlRegistreret: boolean (nullable = true)
| | | | |-- livsforloeb: array (nullable = true)
| | | | | |-- element: struct (containsNull = true)
| | | | | | |-- periode: struct (nullable = true)
| | | | | | | |-- gyldigFra: string (nullable = true)
| | | | | | | |-- gyldigTil: string (nullable = true)
| | | | | | |-- sidstOpdateret: string (nullable = true)
| | | | |-- navne: array (nullable = true)
| | | | | |-- element: struct (containsNull = true)
| | | | | | |-- navn: string (nullable = true)
| | | | | | |-- periode: struct (nullable = true)
| | | | | | | |-- gyldigFra: string (nullable = true)
| | | | | | | |-- gyldigTil: string (nullable = true)
| | | | | | |-- sidstOpdateret: string (nullable = true)
| | | | |-- regNummer: array (nullable = true)
| | | | | |-- element: struct (containsNull = true)
| | | | | | |-- periode: struct (nullable = true)
| | | | | | | |-- gyldigFra: string (nullable = true)
| | | | | | | |-- gyldigTil: string (nullable = true)
| | | | | | |-- regnummer: string (nullable = true)
| | | | | | |-- sidstOpdateret: string (nullable = true)
| | | | |-- sidstIndlaest: string (nullable = true)
| | | | |-- status: array (nullable = true)
| | | | | |-- element: struct (containsNull = true)
| | | | | | |-- kreditoplysningkode: long (nullable = true)
| | | | | | |-- periode: struct (nullable = true)
| | | | | | | |-- gyldigFra: string (nullable = true)
| | | | | | | |-- gyldigTil: string (nullable = true)
| | | | | | |-- sidstOpdateret: string (nullable = true)
| | | | | | |-- statuskode: long (nullable = true)
| | | | |-- virksomhedsform: array (nullable = true)
| | | | | |-- element: struct (containsNull = true)
| | | | | | |-- ansvarligDataleverandoer: string (nullable = true)
| | | | | | |-- kortBeskrivelse: string (nullable = true)
| | | | | | |-- langBeskrivelse: string (nullable = true)
| | | | | | |-- periode: struct (nullable = true)
| | | | | | | |-- gyldigFra: string (nullable = true)
| | | | | | | |-- gyldigTil: string (nullable = true)
| | | | | | |-- sidstOpdateret: string (nullable = true)
| | | | | | |-- virksomhedsformkode: long (nullable = true)
| | | | |-- virksomhedsstatus: array (nullable = true)
| | | | | |-- element: struct (containsNull = true)
| | | | | | |-- periode: struct (nullable = true)
| | | | | | | |-- gyldigFra: string (nullable = true)
| | | | | | | |-- gyldigTil: string (nullable = true)
| | | | | | |-- sidstOpdateret: string (nullable = true)
| | | | | | |-- status: string (nullable = true)
In [5]:
deltagerAttributterDf = GetNextJsonLayer.createNextLayerTable(df,["enhedsNummer","enhedstype","navne"],"attributter","deltager")
holdOutCol = ["vaerdier"]
vaerdiCols = [i for i in deltagerAttributterDf.columns if i not in holdOutCol]
deltagerAttributterWithValuesDf = (GetNextJsonLayer
.createNextLayerTable(deltagerAttributterDf,vaerdiCols,holdOutCol[0])
.drop("periode")
.drop("sidstOpdateret")
)
del holdOutCol[0]
holdOutCol.append("navne")
del vaerdiCols
vaerdiCols = [i for i in deltagerAttributterWithValuesDf.columns if i not in holdOutCol]
withNamesAttributsDf = (GetNextJsonLayer
.createNextLayerTable(deltagerAttributterWithValuesDf,vaerdiCols,holdOutCol[0])
.drop("sidstOpdateret")
)
#deltagerAttWithValsAndPeriodDf = GetNextJsonLayer.expandSubCols(df=deltagerAttributterWithValuesDf,mainColumn="periode")
#deltagerAttributterDf.columns
In [6]:
#withNamesAttributsDf.show(truncate=False)
In [7]:
holdtoCols = ["enhedsNummer","enhedstype","navne","stilling"]
delVirkRelDf = (GetNextJsonLayer
.createNextLayerTable(df,holdtoCols,"virksomhedSummariskRelation","deltager")
.withColumnRenamed(existing="enhedsNummer",new="personEnhedsNummer")
.withColumnRenamed(existing="enhedstype",new="personEnhedstype")
.withColumnRenamed(existing="stilling",new="personStilling")
.withColumnRenamed(existing="navne",new="personNavne")
.drop("organisationer")
)
In [8]:
delVirkRelDf.show(20,truncate=False)
delVirkRelDf.printSchema()

|personEnhedsNummer|personEnhedstype|personNavne |personStilling|virksomhed |

|4000201845 |PERSON |[[Ulrik Sindal Sørensen,null]] |null |[14222979,4000873375,VIRKSOMHED,false,WrappedArray([[1990-06-01,2011-12-31],1999-10-16T17:49:52.000+02:00]),WrappedArray([ULRIKS VINDUESPOLERING OG RENGØRING V/ULRIK SINDAL SØRENSEN,[1990-06-01,2011-12-31],2013-11-22T19:12:17.000+01:00]),WrappedArray(),2015-06-24T13:00:21.307+02:00,WrappedArray(),WrappedArray([T&S,ENK,Enkeltmandsvirksomhed,[1990-06-01,2011-12-31],2013-11-22T19:00:13.000+01:00,10]),WrappedArray()] |
|4004119836 |PERSON |[[Munte Malou Tina Herløv Cordrey,null]]|Kasserer |[27733123,4001457636,VIRKSOMHED,false,WrappedArray([[2004-05-06,2009-08-17],2004-05-11T10:55:40.000+02:00]),WrappedArray([AMPU 1 ApS,[2004-06-16,2009-08-17],2013-11-22T19:12:17.000+01:00], [ApS KBIL 17 NR. 1826,[2004-05-06,2004-06-15],2004-05-11T10:56:12.000+02:00]),WrappedArray(),2015-06-25T13:10:55.286+02:00,WrappedArray(),WrappedArray([E&S,APS,Anpartsselskab,[2004-05-06,2009-08-17],2013-11-22T19:00:13.000+01:00,80]),WrappedArray([[2004-05-06,2008-08-24],2015-02-09T21:00:00.000+01:00,NORMAL], [[2008-08-25,2009-08-16],2015-02-09T21:00:00.000+01:00,UNDER TVANGSOPLØSNING], [[2009-08-17,2009-08-17],2015-02-09T21:00:00.000+01:00,TVANGSOPLØST])] |
|4000562568 |PERSON |[[Mette Bruun Christensen,null]] |Pædagog |[29552932,4001580288,VIRKSOMHED,false,WrappedArray([[2006-06-01,2007-08-31],2006-06-13T12:40:20.000+02:00]),WrappedArray([Terapien v/Mette Bruun Christensen,[2006-06-01,2007-08-31],2013-11-22T19:12:17.000+01:00]),WrappedArray(),2015-06-24T00:55:48.572+02:00,WrappedArray(),WrappedArray([T&S,ENK,Enkeltmandsvirksomhed,[2006-06-01,2007-08-31],2013-11-22T19:00:13.000+01:00,10]),WrappedArray()] |
|4000078388 |PERSON |[[Kim Bergensol Corto Andersen,null]] |null |[20083670,4001021649,VIRKSOMHED,false,WrappedArray([[1997-06-19,2000-09-30],1999-10-16T19:50:03.000+02:00], [[2008-08-20,2012-12-31],2008-10-07T12:03:24.000+02:00], [[2015-10-01,null],2015-08-12T11:11:57.000+02:00]),WrappedArray([KIM ANDERSEN,[1997-06-19,2008-08-19],2013-11-22T22:09:56.000+01:00], [Lyset i Livet v/Kim Andersen,[2008-08-20,2012-12-31],2013-11-22T22:12:17.000+01:00], [Bergensol,[2015-10-01,null],2015-08-12T11:11:57.000+02:00]),WrappedArray(),2015-08-12T11:12:08.938+02:00,WrappedArray(),WrappedArray([T&S,ENK,Enkeltmandsvirksomhed,[1997-06-19,2012-12-31],2013-11-22T22:00:13.000+01:00,10], [T&S,ENK,Enkeltmandsvirksomhed,[2015-10-01,null],2015-08-12T11:11:57.000+02:00,10]),WrappedArray()] |
|4000560418 |PERSON |[[Trine Byrgesen Nielsen,null]] |null |[29485232,4001565256,VIRKSOMHED,false,WrappedArray([[2006-05-01,2008-02-21],2008-02-29T07:05:06.000+01:00]),WrappedArray([Byens-Rengøringsservice I/S v/Trine Nielsen & Jesper Nielsen,[2006-05-01,2008-02-21],2013-11-22T19:12:17.000+01:00]),WrappedArray(),2015-06-24T00:59:40.024+02:00,WrappedArray(),WrappedArray([T&S,I/S,Interessentskab,[2006-05-01,2008-02-21],2013-11-22T19:00:13.000+01:00,30]),WrappedArray()] |
|4000103042 |PERSON |[[Lissi Marthin,null]] |null |[13607400,4000856627,VIRKSOMHED,false,WrappedArray([[1981-01-01,2007-02-28],1999-10-16T17:49:51.000+02:00]),WrappedArray([Marthin Pels Design v/Rita Marthin,[2007-02-06,2007-02-28],2013-11-22T19:12:17.000+01:00], [Skindstudiet Marthin Pels En Gros v/Lissi Marthin,[1981-01-01,2000-11-07],2000-11-08T04:26:03.000+01:00], [Marthin Pels Design v/Lissi Marthin,[2000-11-08,2007-02-04],2000-11-08T04:26:05.000+01:00], [Marthin Pels Design v/Rita Mathin,[2007-02-05,2007-02-05],2007-02-07T11:35:57.000+01:00]),WrappedArray(),2015-06-24T17:20:49.623+02:00,WrappedArray(),WrappedArray([T&S,ENK,Enkeltmandsvirksomhed,[1981-01-01,2007-02-28],2013-11-22T19:00:13.000+01:00,10]),WrappedArray()] |
|4003996076 |PERSON |[[Arne Kirkeby,null]] |Afd fuldm |[17172212,4000945369,VIRKSOMHED,false,WrappedArray([[1964-04-21,2015-05-26],2015-07-08T15:52:18.000+02:00]),WrappedArray([DFDS PENSIONSKASSE,[1964-04-21,2015-05-26],2015-07-08T15:52:18.000+02:00]),WrappedArray([[1992-01-01,1999-10-18],VIR209787,2015-02-10T00:00:00.000+01:00]),2015-07-08T15:52:45.651+02:00,WrappedArray(),WrappedArray([T&S,FOR,Forening,[1964-04-21,2002-11-26],2013-11-22T21:57:52.000+01:00,110], [E&S,FIV,Særlig finansiel virksomhed,[2002-11-27,2015-05-26],2015-07-08T15:52:18.000+02:00,285]),WrappedArray([[1992-01-01,2014-12-02],2015-02-10T00:00:00.000+01:00,NORMAL], [[2014-12-03,2015-05-25],2015-07-08T15:52:17.000+02:00,UNDER FRIVILLIG LIKVIDATION], [[2015-05-26,2015-05-26],2015-07-08T15:52:18.000+02:00,OPLØST EFTER FRIVILLIG LIKVIDATION])] |
|4003996076 |PERSON |[[Arne Kirkeby,null]] |Afd fuldm |[12669216,4000842325,VIRKSOMHED,false,WrappedArray([[1992-05-26,2005-06-13],1999-10-16T17:49:50.000+02:00]),WrappedArray([J. LAURITZEN A/S'S PENSIONSKASSE. AFVIKLINGSKASSE,[1992-05-26,2005-06-13],2013-11-22T19:12:17.000+01:00]),WrappedArray([[1992-05-26,1999-10-18],VIR209750,2015-02-09T21:00:00.000+01:00]),2015-06-25T04:32:32.420+02:00,WrappedArray(),WrappedArray([E&S,UOP,Uoplyst virksomhedsform,[1992-05-26,2005-06-13],2013-11-22T19:00:13.000+01:00,990]),WrappedArray([[1992-05-26,2005-06-12],2015-02-09T21:00:00.000+01:00,NORMAL], [[2005-06-13,2005-06-13],2015-02-09T21:00:00.000+01:00,SLETTET])] |
|4000696805 |PERSON |[[Christina Irmgard Hjort,null]] |null |[33411847,4001835473,VIRKSOMHED,false,WrappedArray([[2011-01-01,2011-03-31],2011-02-15T09:19:30.000+01:00]),WrappedArray([Klinik Kamille v/Christina Irmgard Hjort,[2011-01-01,2011-03-31],2013-11-22T19:12:17.000+01:00]),WrappedArray(),2015-06-24T14:30:42.793+02:00,WrappedArray(),WrappedArray([T&S,ENK,Enkeltmandsvirksomhed,[2011-01-01,2011-03-31],2013-11-22T19:00:13.000+01:00,10]),WrappedArray()] |
|4004156000 |PERSON |[[Dan Klein Jensen,null]] |null |[29415455,4001560713,VIRKSOMHED,false,WrappedArray([[2006-03-16,2007-08-15],2006-03-16T08:37:50.000+01:00]),WrappedArray([ZITRO ApS,[2006-03-16,2007-08-15],2013-11-22T19:12:17.000+01:00]),WrappedArray(),2015-06-25T10:49:06.631+02:00,WrappedArray(),WrappedArray([E&S,APS,Anpartsselskab,[2006-03-16,2007-08-15],2013-11-22T19:00:13.000+01:00,80]),WrappedArray([[2006-03-16,2006-12-19],2015-02-09T21:00:00.000+01:00,NORMAL], [[2006-12-20,2007-08-14],2015-02-09T21:00:00.000+01:00,UNDER FRIVILLIG LIKVIDATION], [[2007-08-15,2007-08-15],2015-02-09T21:00:00.000+01:00,OPLØST EFTER FRIVILLIG LIKVIDATION])] |
|4000709918 |PERSON |[[Jacob Pilegaard,null]] |null |[33808577,4001863635,VIRKSOMHED,false,WrappedArray([[2011-10-01,2014-05-31],2014-05-31T20:30:49.000+02:00]),WrappedArray([pilegaardconsulting v/Jacob Pilegaard,[2011-10-01,2014-05-31],2014-05-31T20:30:49.000+02:00]),WrappedArray(),2015-06-24T11:48:18.565+02:00,WrappedArray(),WrappedArray([T&S,ENK,Enkeltmandsvirksomhed,[2011-10-01,2014-05-31],2014-05-31T20:30:49.000+02:00,10]),WrappedArray()] |
|4004092700 |PERSON |[[Jeanne le Sage de Fontenay,null]] |Musikpædagog |[26498201,4001370180,VIRKSOMHED,false,WrappedArray([[2002-02-22,2008-03-26],2002-03-07T10:19:42.000+01:00]),WrappedArray([AFVIKLINGSSELSKABET AF 6/1 2005 A/S,[2005-01-04,2008-03-26],2013-11-22T19:12:17.000+01:00], [SØNDERJYSK SKOLEFOTO A/S,[2002-02-22,2005-01-03],2002-03-19T04:51:21.000+01:00]),WrappedArray(),2015-06-25T11:40:39.083+02:00,WrappedArray([3,[2005-01-19,2008-03-26],2013-11-22T19:01:37.000+01:00,1]),WrappedArray([E&S,A/S,Aktieselskab,[2002-02-22,2008-03-26],2013-11-22T19:00:13.000+01:00,60]),WrappedArray([[2002-02-22,2005-01-12],2015-02-09T21:00:00.000+01:00,NORMAL], [[2005-01-13,2008-03-25],2015-02-09T21:00:00.000+01:00,UNDER KONKURS], [[2008-03-26,2008-03-26],2015-02-09T21:00:00.000+01:00,OPLØST EFTER KONKURS])] |
|4000457798 |PERSON |[[Lene Schonnings,null]] |null |[27014097,4001393755,VIRKSOMHED,false,WrappedArray([[2003-03-01,2015-04-30],2015-04-07T08:10:11.000+02:00]),WrappedArray([Lene Schonnings/ Teknisk Dokumentation,[2003-03-01,2015-04-30],2015-04-07T08:10:11.000+02:00]),WrappedArray(),2015-06-24T05:28:32.779+02:00,WrappedArray(),WrappedArray([T&S,ENK,Enkeltmandsvirksomhed,[2003-03-01,2015-04-30],2015-04-07T08:10:11.000+02:00,10]),WrappedArray()] |
|4000457798 |PERSON |[[Lene Schonnings,null]] |null |[29743797,4001595738,VIRKSOMHED,false,WrappedArray([[2006-09-01,2006-09-01],2006-09-12T12:08:22.000+02:00]),WrappedArray([Nordvesten I/S v/Lene Schonnings & Kim Nielsen,[2006-09-01,2006-09-01],2013-11-22T19:12:17.000+01:00]),WrappedArray(),2015-06-24T07:10:14.491+02:00,WrappedArray(),WrappedArray([T&S,I/S,Interessentskab,[2006-09-01,2006-09-01],2013-11-22T19:00:13.000+01:00,30]),WrappedArray()] |
|4000408635 |PERSON |[[Ragnhild Vestergaard Pedersen,null]] |Smørrebr jfr |[26299632,4001342872,VIRKSOMHED,false,WrappedArray([[2001-12-01,2014-12-30],2015-01-16T10:03:38.000+01:00]),WrappedArray([Café Vaffelhuset,[2013-04-01,2014-12-30],2015-01-16T10:03:38.000+01:00], [Strandvejens Kiosk Ragnhild Pedersen,[2001-12-01,2005-10-06],2013-11-22T19:09:56.000+01:00], [Strandvejens Kiosk/Vaffelbageriet v/Ragnhild Pedersen,[2005-10-07,2006-05-14],2005-10-11T07:48:06.000+02:00], [Vaffelbageriet v/Ragnhild Pedersen,[2006-05-15,2007-05-07],2006-06-01T07:47:25.000+02:00], [Torvegrillen v/Ragnhild Pedersen,[2007-05-08,2007-05-17],2007-05-14T05:13:39.000+02:00], [Vaffelbageriet / Torvegrillen v/Ragnhild Pedersen,[2007-05-18,2008-12-31],2007-05-18T07:33:35.000+02:00], [Vaffelbageriet,[2009-01-01,2013-03-31],2008-12-17T12:47:56.000+01:00]),WrappedArray(),2015-06-24T14:02:13.234+02:00,WrappedArray(),WrappedArray([T&S,ENK,Enkeltmandsvirksomhed,[2001-12-01,2014-12-30],2015-01-16T10:03:38.000+01:00,10]),WrappedArray()] |
|4000611422 |PERSON |[[Dagmar Renate Uhlenfeldt,null]] |null |[30950755,4001686004,VIRKSOMHED,false,WrappedArray([[2007-09-01,2009-04-07],2007-12-11T10:33:23.000+01:00]),WrappedArray([Hundekennel v/Dagmar Renate Uhlenfeldt,[2007-09-01,2009-04-07],2013-11-22T19:12:17.000+01:00]),WrappedArray(),2015-06-24T07:33:59.657+02:00,WrappedArray(),WrappedArray([T&S,ENK,Enkeltmandsvirksomhed,[2007-09-01,2009-04-07],2013-11-22T19:00:13.000+01:00,10]),WrappedArray()] |
|4000269638 |PERSON |[[Brian Mortensen,null]] |null |[21531499,4001088105,VIRKSOMHED,false,WrappedArray([[1999-04-01,2001-03-31],1999-10-16T17:50:08.000+02:00], [[2006-02-01,2007-01-31],2006-01-25T09:53:16.000+01:00], [[2010-07-01,2011-04-30],2010-07-02T09:23:25.000+02:00]),WrappedArray([Mediepusheren.com v/ Brian Mortensen,[2010-07-01,2011-04-30],2013-11-22T19:12:17.000+01:00], [Twozero Websolutions v/Brian Mortensen,[1999-04-01,2001-03-30],2001-04-02T13:27:47.000+02:00], [Hr. Brian Mortensen,[2001-03-31,2006-01-31],2001-04-02T13:27:50.000+02:00], [FairHosting v/ Brian Mortensen,[2006-02-01,2006-11-21],2006-01-25T09:53:16.000+01:00], [twozero.biz v/ Brian Mortensen,[2006-11-22,2010-06-30],2006-11-27T06:58:08.000+01:00]),WrappedArray(),2015-06-24T02:31:53.202+02:00,WrappedArray(),WrappedArray([T&S,ENK,Enkeltmandsvirksomhed,[1999-04-01,2011-04-30],2013-11-22T19:00:13.000+01:00,10]),WrappedArray()] |
|4004131896 |PERSON |[[Tove Cortnum Poulsen,null]] |Dekoratør |[31070392,4001676150,VIRKSOMHED,false,WrappedArray([[2007-11-26,2011-08-30],2007-11-26T12:00:28.000+01:00]),WrappedArray([CORTNUM INVEST ApS,[2007-11-26,2011-08-30],2013-11-22T19:12:17.000+01:00]),WrappedArray(),2015-06-24T22:14:35.152+02:00,WrappedArray(),WrappedArray([E&S,APS,Anpartsselskab,[2007-11-26,2011-08-30],2013-11-22T19:00:13.000+01:00,80]),WrappedArray([[2007-11-26,2011-08-29],2015-02-09T21:00:00.000+01:00,NORMAL], [[2011-08-30,2011-08-30],2015-02-09T21:00:00.000+01:00,OPLØST EFTER ERKLÆRING])] |
|4004020600 |PERSON |[[Annette Præstmark,null]] |Ergoterapeut |[26269075,4001338782,VIRKSOMHED,false,WrappedArray([[2001-10-03,2006-01-26],2001-10-12T09:18:40.000+02:00]),WrappedArray([INTERACTIVE VISION HOLDING A/S,[2001-11-07,2006-01-26],2013-11-22T19:12:17.000+01:00], [INTERACTIVE VISION HOLDING A/S UNDER STIFTELSE,[2001-10-03,2001-11-06],2013-11-22T19:09:56.000+01:00]),WrappedArray(),2015-06-25T03:21:13.080+02:00,WrappedArray(),WrappedArray([E&S,A/S,Aktieselskab,[2001-11-07,2006-01-26],2013-11-22T19:00:13.000+01:00,60], [E&S,EUO,Enhed under oprettelse i Erhvervsstyrelsen,[2001-10-03,2001-11-06],2013-11-22T18:57:52.000+01:00,270]),WrappedArray([[2001-10-03,2005-08-22],2015-02-09T21:00:00.000+01:00,NORMAL], [[2005-08-23,2006-01-25],2015-02-09T21:00:00.000+01:00,UNDER TVANGSOPLØSNING], [[2006-01-26,2006-01-26],2015-02-09T21:00:00.000+01:00,TVANGSOPLØST])] |
|4004020600 |PERSON |[[Annette Præstmark,null]] |Ergoterapeut |[17379038,4000944771,VIRKSOMHED,false,WrappedArray([[1993-11-01,2011-09-16],1999-10-16T17:49:57.000+02:00]),WrappedArray([AFVIKLINGSSELSKABET AF 4. MARTS 2005 A/S,[2005-03-04,2011-09-16],2013-11-22T19:12:17.000+01:00], [INTERACTIVE VISION A/S,[1999-04-24,2005-03-03],2015-03-18T21:00:00.000+01:00], [INTERACTIVISION A/S AF 1/11 1993,[1993-11-01,1999-01-08],2015-03-18T21:00:00.000+01:00], [INTERACTI VISION A/S,[1999-01-09,1999-04-23],2015-03-18T21:00:00.000+01:00]),WrappedArray([[1993-11-01,1999-10-18],A/S214967,2015-02-09T21:00:00.000+01:00]),2015-06-25T18:26:59.543+02:00,WrappedArray([3,[2011-07-22,2011-09-16],2013-11-22T19:01:37.000+01:00,3], [3,[2005-04-21,2010-12-16],2008-12-10T16:33:22.000+01:00,1], [3,[2010-12-17,2011-07-21],2010-12-17T11:43:02.000+01:00,5]),WrappedArray([E&S,A/S,Aktieselskab,[1993-11-01,2011-09-16],2013-11-22T19:00:13.000+01:00,60]),WrappedArray([[1993-11-01,2005-04-04],2015-02-09T21:00:00.000+01:00,NORMAL], [[2005-04-05,2011-09-15],2015-02-09T21:00:00.000+01:00,UNDER KONKURS], [[2011-09-16,2011-09-16],2015-02-09T21:00:00.000+01:00,OPLØST EFTER KONKURS])]|

only showing top 20 rows
root
|-- personEnhedsNummer: long (nullable = true)
|-- personEnhedstype: string (nullable = true)
|-- personNavne: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- navn: string (nullable = true)
| | |-- sidstOpdateret: string (nullable = true)
|-- personStilling: string (nullable = true)
|-- virksomhed: struct (nullable = true)
| |-- cvrNummer: long (nullable = true)
| |-- enhedsNummer: long (nullable = true)
| |-- enhedstype: string (nullable = true)
| |-- fejlRegistreret: boolean (nullable = true)
| |-- livsforloeb: array (nullable = true)
| | |-- element: struct (containsNull = true)
| | | |-- periode: struct (nullable = true)
| | | | |-- gyldigFra: string (nullable = true)
| | | | |-- gyldigTil: string (nullable = true)
| | | |-- sidstOpdateret: string (nullable = true)
| |-- navne: array (nullable = true)
| | |-- element: struct (containsNull = true)
| | | |-- navn: string (nullable = true)
| | | |-- periode: struct (nullable = true)
| | | | |-- gyldigFra: string (nullable = true)
| | | | |-- gyldigTil: string (nullable = true)
| | | |-- sidstOpdateret: string (nullable = true)
| |-- regNummer: array (nullable = true)
| | |-- element: struct (containsNull = true)
| | | |-- periode: struct (nullable = true)
| | | | |-- gyldigFra: string (nullable = true)
| | | | |-- gyldigTil: string (nullable = true)
| | | |-- regnummer: string (nullable = true)
| | | |-- sidstOpdateret: string (nullable = true)
| |-- sidstIndlaest: string (nullable = true)
| |-- status: array (nullable = true)
| | |-- element: struct (containsNull = true)
| | | |-- kreditoplysningkode: long (nullable = true)
| | | |-- periode: struct (nullable = true)
| | | | |-- gyldigFra: string (nullable = true)
| | | | |-- gyldigTil: string (nullable = true)
| | | |-- sidstOpdateret: string (nullable = true)
| | | |-- statuskode: long (nullable = true)
| |-- virksomhedsform: array (nullable = true)
| | |-- element: struct (containsNull = true)
| | | |-- ansvarligDataleverandoer: string (nullable = true)
| | | |-- kortBeskrivelse: string (nullable = true)
| | | |-- langBeskrivelse: string (nullable = true)
| | | |-- periode: struct (nullable = true)
| | | | |-- gyldigFra: string (nullable = true)
| | | | |-- gyldigTil: string (nullable = true)
| | | |-- sidstOpdateret: string (nullable = true)
| | | |-- virksomhedsformkode: long (nullable = true)
| |-- virksomhedsstatus: array (nullable = true)
| | |-- element: struct (containsNull = true)
| | | |-- periode: struct (nullable = true)
| | | | |-- gyldigFra: string (nullable = true)
| | | | |-- gyldigTil: string (nullable = true)
| | | |-- sidstOpdateret: string (nullable = true)
| | | |-- status: string (nullable = true)
In [ ]:
lenUdf = F.udf(lambda x: len(x),IntegerType())
delVirkRelDf.select("personNavne",lenUdf(F.col("personNavne")).alias("length")).orderBy(F.col("length").asc()).show()
In [10]:
delCols = [i for i in delVirkRelDf.columns if i not in ["organisationer","virksomhed"]]
delVirkRelOrgDf = (GetNextJsonLayer
.expandSubCols(delVirkRelDf,"virksomhed")
.drop("livsforloeb")
.drop(F.col("fejlRegistreret"))
.drop("virksomhedsstatus")
.drop("virksomhedsform")
.drop("sidstIndlaest")
.drop("status")
.drop("regNummer")
.withColumnRenamed(existing="enhedsNummer",new="firmaEnhedsNummer")
.withColumnRenamed(existing="navne",new="firmaNavne")
.withColumnRenamed(existing="enhedstype",new="firmaEnhedstype")
.cache()
)
holdOutCol = ["personNavne"]
namesCol = [i for i in delVirkRelOrgDf.columns if i not in holdOutCol]
delVirkRelWithPerNamDf = (GetNextJsonLayer
.createNextLayerTable(delVirkRelOrgDf,namesCol,holdOutCol[0])
.drop("sidstOpdateret")
.withColumnRenamed(existing="navn",new="personNavn")
)
del holdOutCol
del namesCol
holdOutCol = ["firmaNavne"]
namesCol = [i for i in delVirkRelWithPerNamDf.columns if i not in holdOutCol]
delVirkRelWithVirkNamDf = (GetNextJsonLayer
.createNextLayerTable(delVirkRelWithPerNamDf,namesCol,holdOutCol[0])
.drop("sidstOpdateret")
.drop("periode")
.withColumnRenamed(existing="navn",new="firmaNavn")
)
delVirkRelWithVirkNamDf.show()
---------------------------------------------------------------------------
Py4JJavaError Traceback (most recent call last)
/usr/local/share/spark/python/pyspark/sql/utils.py in deco(*a, **kw)
62 try:
---> 63 return f(*a, **kw)
64 except py4j.protocol.Py4JJavaError as e:
/usr/local/share/spark/python/lib/py4j-0.10.1-src.zip/py4j/protocol.py in get_return_value(answer, gateway_client, target_id, name)
311 "An error occurred while calling {0}{1}{2}.\n".
--> 312 format(target_id, ".", name), value)
313 else:
Py4JJavaError: An error occurred while calling o282.apply.
: org.apache.spark.sql.AnalysisException: Cannot resolve column name "firmaNavne" among (personEnhedsNummer, personEnhedstype, personStilling, virksomhed_cvrNummer, virksomhed_enhedsNummer, virksomhed_enhedstype, virksomhed_fejlRegistreret, virksomhed_livsforloeb, virksomhed_navne, virksomhed_regNummer, virksomhed_sidstIndlaest, virksomhed_status, virksomhed_virksomhedsform, virksomhed_virksomhedsstatus, personNavn);
at org.apache.spark.sql.Dataset$$anonfun$resolve$1.apply(Dataset.scala:220)
at org.apache.spark.sql.Dataset$$anonfun$resolve$1.apply(Dataset.scala:220)
at scala.Option.getOrElse(Option.scala:121)
at org.apache.spark.sql.Dataset.resolve(Dataset.scala:219)
at org.apache.spark.sql.Dataset.col(Dataset.scala:921)
at org.apache.spark.sql.Dataset.apply(Dataset.scala:908)
at sun.reflect.GeneratedMethodAccessor61.invoke(Unknown Source)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:237)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
at py4j.Gateway.invoke(Gateway.java:280)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:128)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:211)
at java.lang.Thread.run(Thread.java:745)
During handling of the above exception, another exception occurred:
AnalysisException Traceback (most recent call last)
<ipython-input-10-8d73163761a8> in <module>()
27 namesCol = [i for i in delVirkRelWithPerNamDf.columns if i not in holdOutCol]
28 delVirkRelWithVirkNamDf = (GetNextJsonLayer
---> 29 .createNextLayerTable(delVirkRelWithPerNamDf,namesCol,holdOutCol[0])
30 .drop("sidstOpdateret")
31 .drop("periode")
/home/svanhmic/workspace/Python/Erhvervs/src/cvr/GetNextJsonLayer.py in createNextLayerTable(df, nonExplodedColumns, explodedColumn, *nonExplodedPrefix)
29 else:
30 relationsDf = df.select([df[prefixedStr+v].alias(v) for v in nonExplodedColumns]+
---> 31 [F.explode(df[prefixedStr+explodedColumn]).alias(explodedColumn)])
32
33 dfSchema = getNextSchemaLayer(relationsDf.schema,explodedColumn)
/usr/local/share/spark/python/pyspark/sql/dataframe.py in __getitem__(self, item)
821 """
822 if isinstance(item, basestring):
--> 823 jc = self._jdf.apply(item)
824 return Column(jc)
825 elif isinstance(item, Column):
/usr/local/share/spark/python/lib/py4j-0.10.1-src.zip/py4j/java_gateway.py in __call__(self, *args)
931 answer = self.gateway_client.send_command(command)
932 return_value = get_return_value(
--> 933 answer, self.gateway_client, self.target_id, self.name)
934
935 for temp_arg in temp_args:
/usr/local/share/spark/python/pyspark/sql/utils.py in deco(*a, **kw)
67 e.java_exception.getStackTrace()))
68 if s.startswith('org.apache.spark.sql.AnalysisException: '):
---> 69 raise AnalysisException(s.split(': ', 1)[1], stackTrace)
70 if s.startswith('org.apache.spark.sql.catalyst.analysis'):
71 raise AnalysisException(s.split(': ', 1)[1], stackTrace)
AnalysisException: 'Cannot resolve column name "firmaNavne" among (personEnhedsNummer, personEnhedstype, personStilling, virksomhed_cvrNummer, virksomhed_enhedsNummer, virksomhed_enhedstype, virksomhed_fejlRegistreret, virksomhed_livsforloeb, virksomhed_navne, virksomhed_regNummer, virksomhed_sidstIndlaest, virksomhed_status, virksomhed_virksomhedsform, virksomhed_virksomhedsstatus, personNavn);'
In [108]:
#create person to oraganisation edge
personFirmaEdgeDf = (delVirkRelWithVirkNamDf.select(F.col("personEnhedsNummer").alias("src"),F.col("firmaEnhedsNummer").alias("dst")))
In [109]:
personFirmaEdgeDf.show()
+----------+----------+
| src| dst|
+----------+----------+
|4000201845|4000873375|
|4004119836|4001457636|
|4004119836|4001457636|
|4000562568|4001580288|
|4000078388|4001021649|
|4000078388|4001021649|
|4000078388|4001021649|
|4000560418|4001565256|
|4000103042|4000856627|
|4000103042|4000856627|
|4000103042|4000856627|
|4000103042|4000856627|
|4003996076|4000945369|
|4003996076|4000842325|
|4000696805|4001835473|
|4004156000|4001560713|
|4000709918|4001863635|
|4004092700|4001370180|
|4004092700|4001370180|
|4000457798|4001393755|
+----------+----------+
only showing top 20 rows
In [ ]:
In [20]:
In [21]:
+----------+----------+-------------+
| src| dst| relationship|
+----------+----------+-------------+
|4000201845| 0|Interessenter|
|4004119836|4004954366| Bestyrelse|
|4000562568| 0|Interessenter|
|4000078388| 0|Interessenter|
|4000560418| 0|Interessenter|
|4000103042| 0|Interessenter|
|4003996076|4004503102| Direktion|
|4003996076|4004411703| Direktion|
|4000696805| 0|Interessenter|
|4004156000|4005087331| Stiftere|
|4004156000|4005087332| Bestyrelse|
|4000709918| 0|Interessenter|
|4004092700|4004857561| Bestyrelse|
|4000457798| 0|Interessenter|
|4000457798| 0|Interessenter|
|4000408635| 0|Interessenter|
|4000611422| 0|Interessenter|
|4000269638| 0|Interessenter|
|4004131896|4005243831| Stiftere|
|4004131896|4005243832| Direktion|
+----------+----------+-------------+
only showing top 20 rows
In [60]:
allPersons = (virkSumRelationWithNamesDf
.select(F.col("enhedsNummer").alias("id")
,F.col("navne_navn").alias("navn")
,F.lit("Person").alias("type"))
.distinct()
.cache()
)
allOrgs = (virkSumRelationWithNamesDf
.select(F.col("enhedsNummerOrganisation").alias("id")
,F.col("orgNavn_navn").alias("navn")
,F.lit("Organisation").alias("type"))
.filter(F.col("id") !=0)
.cache()
)
allVerticesDf = allOrgs.unionAll(allPersons)
In [61]:
allPersons.count()
allOrgs.count()
Out[61]:
2820218
In [56]:
personOrgGrsaphDf = graphframe.GraphFrame(allVerticesDf,personTilFirmaEdgeDf).cache()
groupedCols = [F.col("type")]
personOrgGraphDf.vertices.groupby(*groupedCols).count().show()
+------------+-------+
| type| count|
+------------+-------+
| Person|3773220|
|Organisation|3773220|
+------------+-------+
In [27]:
personOrgGraphDf.degrees.show()
+----------+------+
| id|degree|
+----------+------+
|4004163266| 3|
|4004383649| 14|
|4005812438| 4|
|4004826402| 4|
|4004831974| 3|
|4004203683| 1|
|4004821174| 13|
|4004712825| 28|
|4000219055| 2|
|4005813839| 2|
|4005782118| 3|
|4004274696| 1|
|4004746497| 1|
|4005809630| 6|
|4005806416| 1|
|4004702564| 10|
|4005810208| 1|
|4005811904| 2|
|4005813403| 4|
|4004382609| 9|
+----------+------+
only showing top 20 rows
In [30]:
personOrgGraphDf.outDegrees
+----------+---------+
| id|outDegree|
+----------+---------+
|4004163266| 3|
|4004203683| 1|
|4000219055| 2|
|4004274696| 1|
|4000238588| 2|
|4004072379| 1|
|4000349745| 2|
|4004146496| 1|
|4000052652| 2|
|4004248640| 1|
|4004212823| 2|
|4000312242| 4|
|4004262639| 1|
|4004205560| 1|
|4000338938| 1|
|4000305565| 1|
|4003946989| 1|
|4000394288| 3|
|4000394312| 1|
|4000000978| 2|
+----------+---------+
only showing top 20 rows
In [34]:
personOrgGraphDegreesDf = personOrgGraphDf.vertices.join(personOrgGraphDf.outDegrees,(personOrgGraphDf.vertices["id"]==personOrgGraphDf.outDegrees["id"]),"inner")
In [35]:
personOrgGraphDegreesDf.show()
+-------+--------------------+------+-------+---------+
| id| navn| type| id|outDegree|
+-------+--------------------+------+-------+---------+
|1859433| Lars Stampe Nielsen|Person|1859433| 3|
|1859433| Lars Stampe Nielsen|Person|1859433| 3|
|1859433| Lars Stampe Nielsen|Person|1859433| 3|
|2164870|Jørgen Herman Jan...|Person|2164870| 2|
|2164870|Jørgen Herman Jan...|Person|2164870| 2|
|2249965| Jakob Lee Jensen|Person|2249965| 19|
|2249965| Jakob Lee Jensen|Person|2249965| 19|
|2249965| Jakob Lee Jensen|Person|2249965| 19|
|2249965| Jakob Lee Jensen|Person|2249965| 19|
|2249965| Jakob Lee Jensen|Person|2249965| 19|
|2249965| Jakob Lee Jensen|Person|2249965| 19|
|2249965| Jakob Lee Jensen|Person|2249965| 19|
|2249965| Jakob Lee Jensen|Person|2249965| 19|
|2249965| Jakob Lee Jensen|Person|2249965| 19|
|2249965| Jakob Lee Jensen|Person|2249965| 19|
|2249965| Jakob Lee Jensen|Person|2249965| 19|
|2249965| Jakob Lee Jensen|Person|2249965| 19|
|2249965| Jakob Lee Jensen|Person|2249965| 19|
|2249965| Jakob Lee Jensen|Person|2249965| 19|
|2249965| Jakob Lee Jensen|Person|2249965| 19|
+-------+--------------------+------+-------+---------+
only showing top 20 rows
In [ ]:
Content source: mssalvador/notebooks
Similar notebooks: