In [1]:
#Always Pyspark first!
# Base locations used by the rest of the notebook: the project sources (shipped to
# Spark below) and the directory holding the parquet tables.
# NOTE(review): both are absolute, user-specific paths - adjust when running elsewhere.
ErhvervsPath = "/home/svanhmic/workspace/DABAI"
cvrPath = "/home/svanhmic/workspace/data/parquet"
from pyspark.sql import functions as F, Window, WindowSpec
from pyspark.sql import Row
from pyspark.sql.types import StringType,ArrayType,IntegerType,DoubleType,StructField,StructType
# Register the project modules with the SparkContext before importing them.
# `sc` is not defined in this notebook; presumably it is provided by the PySpark kernel - confirm.
sc.addPyFile(ErhvervsPath+"/RegnSkabData/ImportRegnskabData.py")
sc.addPyFile(ErhvervsPath+'/RegnSkabData/RegnskabsClass.py')
sc.addPyFile(ErhvervsPath+'/ReadData/GetNextJsonLayer.py')

import sys
import re
import os
# NOTE(review): importing ImportRegnskabData runs its own sc.addPyFile(...) on a hard-coded,
# per-user path (see the traceback recorded for this cell). When that file does not exist the
# import raises, and the imports that follow it in this cell are never executed.
import ImportRegnskabData
import GetNextJsonLayer
import itertools
import functools


---------------------------------------------------------------------------
Py4JJavaError                             Traceback (most recent call last)
<ipython-input-1-1d76d751b606> in <module>()
     12 import re
     13 import os
---> 14 import ImportRegnskabData
     15 import GetNextJsonLayer
     16 import itertools

/tmp/spark-f9c98dce-e601-4d86-9244-d860661bbecc/userFiles-38352852-6147-4548-9b10-b56ab7d49b69/ImportRegnskabData.py in <module>()
     31 sqlContext = SQLContext(sc)
     32 if getpass.getuser() == "svanhmic":
---> 33     sc.addPyFile('/home/svanhmic/workspace/Python/Erhvervs/src/RegnSkabData/RegnskabsClass.py') # this adds the class regnskabsClass to the spark execution
     34 elif getpass.getuser() == "biml":
     35     sc.addPyFile("/home/biml/bigdata/python_files/regnskab/RegnskabsClass.py")

/usr/local/share/spark/python/pyspark/context.py in addPyFile(self, path)
    803         HTTP, HTTPS or FTP URI.
    804         """
--> 805         self.addFile(path)
    806         (dirname, filename) = os.path.split(path)  # dirname may be directory or HDFS/S3 prefix
    807         if filename[-4:].lower() in self.PACKAGE_EXTENSIONS:

/usr/local/share/spark/python/pyspark/context.py in addFile(self, path)
    786         [100, 200, 300, 400]
    787         """
--> 788         self._jsc.sc().addFile(path)
    789 
    790     def clearFiles(self):

/usr/local/share/spark/python/lib/py4j-0.10.1-src.zip/py4j/java_gateway.py in __call__(self, *args)
    931         answer = self.gateway_client.send_command(command)
    932         return_value = get_return_value(
--> 933             answer, self.gateway_client, self.target_id, self.name)
    934 
    935         for temp_arg in temp_args:

/usr/local/share/spark/python/pyspark/sql/utils.py in deco(*a, **kw)
     61     def deco(*a, **kw):
     62         try:
---> 63             return f(*a, **kw)
     64         except py4j.protocol.Py4JJavaError as e:
     65             s = e.java_exception.toString()

/usr/local/share/spark/python/lib/py4j-0.10.1-src.zip/py4j/protocol.py in get_return_value(answer, gateway_client, target_id, name)
    310                 raise Py4JJavaError(
    311                     "An error occurred while calling {0}{1}{2}.\n".
--> 312                     format(target_id, ".", name), value)
    313             else:
    314                 raise Py4JError(

Py4JJavaError: An error occurred while calling o33.addFile.
: java.io.FileNotFoundException: Added file file:/home/svanhmic/workspace/Python/Erhvervs/src/RegnSkabData/RegnskabsClass.py does not exist.
	at org.apache.spark.SparkContext.addFile(SparkContext.scala:1413)
	at org.apache.spark.SparkContext.addFile(SparkContext.scala:1384)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:237)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:280)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:128)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:211)
	at java.lang.Thread.run(Thread.java:745)

In [2]:
def createManPowerTabel(aarsDf,kvartDf,maanedDf):
    '''
        Combines the yearly and quarterly employment tables into one table
        that is sampled once a month.

        A grid of (aar, maaned) rows covering min(aar)..max(aar) of aarsDf is
        built. Quarterly rows are attached to every month of their quarter, and
        grid rows that received no quarterly match are filled from the yearly
        rows of the same year.

        Input:
            aarsDf:   yearly DataFrame. "aar" is read for the year range; the
                      column "lower_intervalKodeAntalInklusivEjere" is exposed
                      under the "...AntalAnsatte" name.
            kvartDf:  quarterly DataFrame. "aar" and "kvartal" are read.
            maanedDf: monthly DataFrame. It is only projected onto the output
                      column names; it is NOT part of the returned table
                      (see the NOTE on mDf below).

        Output:
            DataFrame with the columns listed in nameCols. Rows whose
            final_cvrNummer is null are dropped.

        NOTE(review): the column picks by position (secondCols, aCoalseCol)
        assume a specific column order in kvartDf / aarsDf - TODO confirm
        against the parquet schemas.
    '''
    def assignPrefix(df,prefix):
        # Keep every column that is not excluded and rename it to prefix + name.
        excluded = ["langBeskrivelse","virksomhedsformkode","periode_gyldigFra","periode_gyldigTil","rank"]
        return df.select([F.col(i).alias(prefix+i) for i in df.columns if i not in excluded])

    def assignKvartLowerBound(col):
        # First month of the quarter; any value other than 1, 2 or 3 maps to 10.
        if col == 1:
            return 1
        elif col == 2:
            return 4
        elif col == 3:
            return 7
        else:
            return 10

    def assignKvartUpperBound(col):
        # Last month of the quarter; any value other than 1, 2 or 3 maps to 12.
        if col == 1:
            return 3
        elif col == 2:
            return 6
        elif col == 3:
            return 9
        else:
            return 12

    nameCols = ["final_cvrNummer"
                ,"aar"
                ,"maaned"
                ,"final_lower_intervalKodeAntalAarsvaerk"
                ,"final_lower_intervalKodeAntalAnsatte"
                ,"final_kortBeskrivelse"]

    #what we want to get is a dataframe that looks like this
    #| cvr | year | month | x | y | aps/as|
    # where years below 2015 are currently represented as year or quarter data.

    # Smallest and largest year present in the yearly table (index 0 = min, 1 = max).
    yearBounds = aarsDf.groupBy().agg(F.min("aar"),F.max("aar")).collect()[0]
    years = [Row(aar=i,maaned=j)  for i in range(yearBounds[0],yearBounds[1]+1) for j in range(1,13)]

    lowerUdf = F.udf(lambda x: float(assignKvartLowerBound(x)),DoubleType())
    upperUdf = F.udf(lambda x: float(assignKvartUpperBound(x)),DoubleType())

    aDf = (assignPrefix(aarsDf,"aar_")
           .withColumnRenamed(existing="aar_lower_intervalKodeAntalInklusivEjere",new="aar_lower_intervalKodeAntalAnsatte")
           .drop("aar_ansvarligDataleverandoer")
           .repartition("aar_aar")
          )

    # "l" / "u" hold the first and last month of each row's quarter.
    kDf = (assignPrefix(kvartDf,"kvart_")
           .withColumn(col=lowerUdf("kvart_kvartal"),colName ="l")
           .withColumn(col=upperUdf("kvart_kvartal"),colName ="u")
           .repartition("kvart_aar"))

    # NOTE(review): mDf is built but never joined or unioned into the result, so the
    # monthly rows are not part of the returned table. Kept as-is so the projection
    # is still validated; append the monthly rows in the caller if they are needed.
    mDf = (maanedDf
           .select([F.col(re.sub("final_","",c)).alias(nameCols[i]) for i,c in enumerate(nameCols)]))

    yearsAndMonthDf = sqlContext.createDataFrame(years).repartition("aar")

    # Step 1: attach every quarterly row to each month of its quarter (left join, so
    # months without quarterly data survive with null values).
    secondCols = [kDf.columns[0]]+yearsAndMonthDf.columns+kDf.columns[3:5]+[kDf.columns[6]]
    secondJoinDf = (yearsAndMonthDf
                    .join(kDf,((yearsAndMonthDf["aar"] == kDf["kvart_aar"])
                               & ( yearsAndMonthDf["maaned"].between(kDf["l"],kDf["u"]) )),how="left")
                    .select([F.col(c).alias(nameCols[i]) for i,c in enumerate(secondCols)])
                    )

    # Step 2: pair each quarterly-derived column with its yearly counterpart so the
    # yearly value can be used wherever the quarterly one is null.
    coalseSecondCol = [secondJoinDf.columns[0]]+secondJoinDf.columns[3:]
    aCoalseCol = [aDf.columns[0]]+aDf.columns[2:]
    combinedCols = list(zip(coalseSecondCol,aCoalseCol))
    actionCols = [F.coalesce(combinedCols[0][0],combinedCols[0][1]).alias(combinedCols[0][0])]+yearsAndMonthDf.columns+[F.coalesce(i,j).alias(i) for i,j in combinedCols[1:]]

    # The null test must use isNull(): comparing a Column with None builds "= NULL",
    # which never evaluates to true, so the yearly rows would never be joined in.
    # NOTE(review): only months with no quarterly row at all are filled from the
    # yearly table; confirm that this is the intended fallback rule.
    lastJoinDf = (secondJoinDf
                  .join(aDf,((secondJoinDf["aar"] == aDf["aar_aar"])
                             & secondJoinDf["final_cvrNummer"].isNull()),how="left")
                  .select(actionCols)
                  )

    # Remove all rows that still do not have any cvr-number.
    return (lastJoinDf
            .na
            .drop(how="all",subset=["final_cvrNummer"])
            )

In [3]:
# Load the yearly (dfA), quarterly (dfK) and monthly (dfM) tables from cvrPath, in that order.
dfA, dfK, dfM = (
    sqlContext.read.parquet(cvrPath+fileName)
    for fileName in ("/AarsVaerker.parquet", "/KvartalsVaerker.parquet", "/MaanedsVaerker.parquet")
)

In [4]:
#xDf = createManPowerTabel(dfA,dfK,dfM)
#xDf.write.parquet(mode="overwrite",path="/home/svanhmic/workspace/Python/Erhvervs/data/cdata/TotalAarsVaerker.parquet")

In [5]:
def assignPrefix(df,prefix):
    '''
        Returns df projected onto every column that is not in the exclusion
        list, with each kept column renamed to prefix + original name.
    '''
    excluded = ["langBeskrivelse","virksomhedsformkode","periode_gyldigFra","periode_gyldigTil","rank"]
    prefixedCols = []
    for name in df.columns:
        if name in excluded:
            continue
        prefixedCols.append(F.col(name).alias(prefix+name))
    return df.select(prefixedCols)
    
def assignKvartLowerBound(col):
    '''
        Returns the first month of quarter `col`: 1 -> 1, 2 -> 4, 3 -> 7.
        Every other value (including None) yields 10.
    '''
    for quarter, firstMonth in ((1, 1), (2, 4), (3, 7)):
        if col == quarter:
            return firstMonth
    return 10
        
def assignKvartUpperBound(col):
    '''
        Returns the last month of quarter `col`: 1 -> 3, 2 -> 6, 3 -> 9.
        Every other value (including None) yields 12.
    '''
    for quarter, lastMonth in ((1, 3), (2, 6), (3, 9)):
        if col == quarter:
            return lastMonth
    return 12

In [ ]:


In [6]:
def createYearTabel(df):
    '''
        Builds a DataFrame with one (aar, maaned) row for every month (1-12)
        of every year between the smallest and the largest "aar" found in df,
        both inclusive.
    '''
    yearBounds = df.groupBy().agg(F.min("aar"), F.max("aar")).collect()[0]
    firstYear, lastYear = yearBounds[0], yearBounds[1]

    monthRows = []
    for year in range(firstYear, lastYear+1):
        for month in range(1, 13):
            monthRows.append(Row(aar=year, maaned=month))

    return sqlContext.createDataFrame(monthRows)

In [7]:
# UDFs returning the first / last month of a quarter as a double.
lowerUdf = F.udf(lambda kvartal: float(assignKvartLowerBound(kvartal)), DoubleType())
upperUdf = F.udf(lambda kvartal: float(assignKvartUpperBound(kvartal)), DoubleType())

# Month grid spanning the years found in the quarterly table.
y = createYearTabel(dfK)
kCols = [
    "cvrNummer",
    "aar",
    "maaned",
    "lower_intervalKodeAntalAarsvaerk",
    "lower_intervalKodeAntalAnsatte",
    "kortBeskrivelse",
]

# Quarterly rows extended with the month bounds "l" and "u" of their quarter.
kvartWithBoundsDf = (dfK
                     .withColumn("l", lowerUdf("kvartal"))
                     .withColumn("u", upperUdf("kvartal")))

# A grid row matches a quarterly row of the same year whose quarter contains the month.
sameYear = y["aar"] == dfK["aar"]
monthInsideQuarter = y["maaned"].between(F.col("l"), F.col("u"))

expandKDf = (y
             .join(kvartWithBoundsDf, sameYear & monthInsideQuarter, "inner")
             .drop(dfK["aar"])
             .drop(dfK["kvartal"])
             .select(kCols)
            )

expandKDf.show()
expandKDf.count()


+---------+----+------+--------------------------------+------------------------------+---------------+
|cvrNummer| aar|maaned|lower_intervalKodeAntalAarsvaerk|lower_intervalKodeAntalAnsatte|kortBeskrivelse|
+---------+----+------+--------------------------------+------------------------------+---------------+
| 29214654|2007|     1|                            null|                             5|            APS|
| 14339094|2007|     1|                            null|                             2|            A/S|
| 87136213|2007|     1|                            null|                            10|            A/S|
| 21763403|2007|     1|                            null|                             5|            APS|
| 10099366|2007|     1|                            null|                             5|            APS|
| 20888601|2007|     1|                            null|                             2|            APS|
| 13075549|2007|     1|                            null|                             5|            APS|
| 52107016|2007|     1|                            null|                            10|            A/S|
| 26059216|2007|     1|                            null|                             5|            APS|
| 30078993|2007|     1|                            null|                             1|            APS|
| 26677130|2007|     1|                            null|                             5|            APS|
| 25773950|2007|     1|                            null|                             0|            APS|
| 25795687|2007|     1|                            null|                             2|            APS|
| 28112122|2007|     1|                            null|                             2|            APS|
| 70446111|2007|     1|                            null|                            10|            A/S|
| 28697783|2007|     1|                            null|                             1|            APS|
| 25685083|2007|     1|                            null|                             5|            A/S|
| 25826523|2007|     1|                            null|                             5|            A/S|
| 51033019|2007|     1|                            null|                             2|            APS|
| 45532410|2007|     1|                            null|                             5|            A/S|
+---------+----+------+--------------------------------+------------------------------+---------------+
only showing top 20 rows

Out[7]:
13403772

In [8]:
# Month grid spanning the years found in the yearly table.
x = createYearTabel(dfA)
cols = [
    "cvrNummer",
    "aar",
    "maaned",
    "lower_intervalKodeAntalAarsvaerk",
    "lower_intervalKodeAntalInklusivEjere",
    "kortBeskrivelse",
]

# Every yearly row is repeated for each month of its year; the left join keeps
# grid months even when no yearly row exists for that year.
sameYearAsGrid = x["aar"] == dfA["aar"]
expandedADf = (x
               .join(dfA, sameYearAsGrid, "left")
               .drop(dfA["aar"])
               .select(cols)
              )

expandedADf.show()


+---------+----+------+--------------------------------+------------------------------------+---------------+
|cvrNummer| aar|maaned|lower_intervalKodeAntalAarsvaerk|lower_intervalKodeAntalInklusivEjere|kortBeskrivelse|
+---------+----+------+--------------------------------+------------------------------------+---------------+
| 83072911|1997|     1|                               1|                                   1|            APS|
| 82552316|1997|     1|                               1|                                   1|            A/S|
| 70346419|1997|     1|                               5|                                   5|            APS|
| 19302695|1997|     1|                               2|                                   2|            APS|
| 10667607|1997|     1|                               1|                                   2|            APS|
| 10967899|1997|     1|                               1|                                   2|            APS|
| 89461812|1997|     1|                               5|                                   5|            APS|
| 15678577|1997|     1|                               5|                                   5|            APS|
| 13224188|1997|     1|                               2|                                   2|            APS|
| 83888016|1997|     1|                               1|                                   1|            APS|
| 73396719|1997|     1|                               5|                                   5|            APS|
| 20199083|1997|     1|                               2|                                   5|            APS|
| 12467893|1997|     1|                               2|                                   5|            A/S|
| 17713396|1997|     1|                               2|                                   5|            APS|
| 71276619|1997|     1|                              10|                                  10|            A/S|
| 76699119|1997|     1|                              20|                                  20|            A/S|
| 40234012|1997|     1|                              10|                                  10|            A/S|
| 21862843|1997|     1|                               5|                                  10|            APS|
| 16244201|1997|     1|                              50|                                 100|            A/S|
| 61786619|1997|     1|                               1|                                   2|            APS|
+---------+----+------+--------------------------------+------------------------------------+---------------+
only showing top 20 rows


In [9]:
# Project the monthly table onto the six columns shared with the expanded tables.
mCols = [
    "cvrNummer",
    "aar",
    "maaned",
    "lower_intervalKodeAntalAarsvaerk",
    "lower_intervalKodeAntalAnsatte",
    "kortBeskrivelse",
]
MaanedsDf = dfM.select(*mCols)

In [10]:
# Key-only views (company, year, month) of the expanded quarterly and yearly tables.
kvartCvrDf, aarCvrDf = (
    expandedDf.select("cvrNummer","aar","maaned")
    for expandedDf in (expandKDf, expandedADf)
)

In [11]:
# Keys (cvrNummer, aar, maaned) that exist in the expanded yearly table but not in
# the expanded quarterly table.
onlyInAar = aarCvrDf.subtract(kvartCvrDf)

# Fetch the yearly values for those keys.
# onlyInAar is derived from expandedADf, so comparing columns of the two frames
# (onlyInAar["aar"] == expandedADf["aar"]) is a self-join whose column references
# share the same lineage. Joining on the column names avoids that ambiguity and
# returns each key column once, so no drop() calls are needed afterwards.
onlyInDataDf = onlyInAar.join(expandedADf, ["cvrNummer","aar","maaned"], "inner")

In [12]:
# Preview the yearly-only rows (show() prints the first 20 rows).
onlyInDataDf.show()


+---------+----+------+--------------------------------+------------------------------------+---------------+
|cvrNummer| aar|maaned|lower_intervalKodeAntalAarsvaerk|lower_intervalKodeAntalInklusivEjere|kortBeskrivelse|
+---------+----+------+--------------------------------+------------------------------------+---------------+
| 10000009|1999|     8|                               1|                                   1|            APS|
| 10001706|1998|     1|                               2|                                   2|            APS|
| 10001765|2009|     3|                               1|                                   2|            APS|
| 10002923|1999|     4|                               1|                                   0|            APS|
| 10002966|2006|    12|                               0|                                   1|            APS|
| 10003164|2001|     1|                               0|                                   1|            APS|
| 10003652|2011|     1|                               1|                                   0|            APS|
| 10004500|1998|     4|                              20|                                  20|            A/S|
| 10004861|2004|     7|                               1|                                   0|            A/S|
| 10005205|1999|     7|                               1|                                   1|            APS|
| 10005620|2003|     8|                               2|                                   2|            A/S|
| 10006120|2000|     6|                               0|                                   0|            A/S|
| 10006481|1997|     8|                              10|                                  10|            A/S|
| 10006716|1999|    12|                               2|                                   5|            APS|
| 10010535|1997|     7|                               2|                                   5|            APS|
| 10011221|2003|     7|                               1|                                   1|            A/S|
| 10011574|1997|     1|                               1|                                   1|            APS|
| 10011574|1997|    11|                               1|                                   1|            APS|
| 10012090|1997|     7|                               5|                                   5|            A/S|
| 10012090|1998|     6|                               5|                                  10|            A/S|
+---------+----+------+--------------------------------+------------------------------------+---------------+
only showing top 20 rows


In [13]:
# unionAll matches columns by position, not by name. Every input is therefore
# projected onto expandKDf's column order first, so a reordered input cannot
# silently end up under the wrong header.
unionCols = expandKDf.columns

# The yearly table carries "lower_intervalKodeAntalInklusivEjere" where the quarterly
# and monthly tables carry "lower_intervalKodeAntalAnsatte"; expose it under the
# shared name before aligning.
aarOnlyAlignedDf = (onlyInDataDf
                    .withColumnRenamed("lower_intervalKodeAntalInklusivEjere","lower_intervalKodeAntalAnsatte")
                    .select(unionCols))

# Expanded quarterly rows + yearly-only rows, then + monthly rows.
combineAllKADf = expandKDf.unionAll(aarOnlyAlignedDf)
combineAllKAMDf = combineAllKADf.unionAll(MaanedsDf.select(unionCols))

In [14]:
# Preview the combined quarterly + yearly-only + monthly table (first 20 rows).
combineAllKAMDf.show()


+---------+----+------+--------------------------------+------------------------------+---------------+
|cvrNummer| aar|maaned|lower_intervalKodeAntalAarsvaerk|lower_intervalKodeAntalAnsatte|kortBeskrivelse|
+---------+----+------+--------------------------------+------------------------------+---------------+
| 29214654|2007|     1|                            null|                             5|            APS|
| 14339094|2007|     1|                            null|                             2|            A/S|
| 87136213|2007|     1|                            null|                            10|            A/S|
| 21763403|2007|     1|                            null|                             5|            APS|
| 10099366|2007|     1|                            null|                             5|            APS|
| 20888601|2007|     1|                            null|                             2|            APS|
| 13075549|2007|     1|                            null|                             5|            APS|
| 52107016|2007|     1|                            null|                            10|            A/S|
| 26059216|2007|     1|                            null|                             5|            APS|
| 30078993|2007|     1|                            null|                             1|            APS|
| 26677130|2007|     1|                            null|                             5|            APS|
| 25773950|2007|     1|                            null|                             0|            APS|
| 25795687|2007|     1|                            null|                             2|            APS|
| 28112122|2007|     1|                            null|                             2|            APS|
| 70446111|2007|     1|                            null|                            10|            A/S|
| 28697783|2007|     1|                            null|                             1|            APS|
| 25685083|2007|     1|                            null|                             5|            A/S|
| 25826523|2007|     1|                            null|                             5|            A/S|
| 51033019|2007|     1|                            null|                             2|            APS|
| 45532410|2007|     1|                            null|                             5|            A/S|
+---------+----+------+--------------------------------+------------------------------+---------------+
only showing top 20 rows


In [16]:
# Row count of the expanded quarterly rows plus the yearly-only rows.
combineAllKADf.count()


Out[16]:
15729030

In [17]:
# Row count after the monthly rows have been appended as well.
combineAllKAMDf.count()


Out[17]:
16836855

In [18]:
# Persist the combined table as parquet under cvrPath; mode="overwrite" replaces
# whatever already exists at that path.
combineAllKAMDf.write.parquet(cvrPath+"/TotalAarsVaerker.parquet",mode="overwrite")

In [ ]: