In [1]:
# This notebook generates the capital (KAPITAL) history for the CVR data
# Always PySpark first!
ErhvervsPath = "/home/svanhmic/workspace/Python/Erhvervs"

from pyspark.sql import functions as F, Window
from pyspark.sql import Row
from pyspark.sql.types import StringType,ArrayType,IntegerType,DoubleType,StructField,StructType
sc.addPyFile(ErhvervsPath+"/src/RegnSkabData/ImportRegnskabData.py")
sc.addPyFile(ErhvervsPath+'/src/RegnSkabData/RegnskabsClass.py')
sc.addPyFile(ErhvervsPath+'/src/cvr/GetNextJsonLayer.py')

import sys
import re
import os
import ImportRegnskabData
import GetNextJsonLayer
import itertools
import functools
from pandas.tools.plotting import scatter_matrix


cvrPath = "/home/svanhmic/workspace/Python/Erhvervs/data/cdata/parquet"
cvrfiles = os.listdir(cvrPath)
print(cvrfiles)


['KaptialDataFrame.parquet', 'virkdata.parquet', 'AllApsAs.parquet', 'KvartalsVaerker.parquet', 'AarsVaerker.parquet', 'MaanedsVaerker.parquet', 'TotalAarsVaerker.parquet']
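In [2] below reads the parquet file by position (cvrfiles[1]), but os.listdir returns names in arbitrary order, so the index is brittle. A safer sketch, assuming virkdata.parquet is the intended file:

virkDataPath = os.path.join(cvrPath, "virkdata.parquet")
cvrDf = sqlContext.read.parquet(virkDataPath)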

In [2]:
# Import CVR data
cvrDf = (sqlContext
         .read
         .parquet(cvrPath+"/"+cvrfiles[1])
        )

# Extract all ApS and A/S companies
virkformCols = ("cvrNummer","virksomhedsform")

virkformDf = GetNextJsonLayer.createNextLayerTable(cvrDf.select(*virkformCols),[virkformCols[0]],virkformCols[1])
virkformDf = GetNextJsonLayer.expandSubCols(virkformDf,mainColumn="periode")
virkformDf = (virkformDf
              .drop("sidstOpdateret")
              .withColumn("periode_gyldigFra", F.col("periode_gyldigFra").cast("date"))
              .withColumn("periode_gyldigTil", F.col("periode_gyldigTil").cast("date"))
             )

#virkformDf.show(1)
checkCols = ["kortBeskrivelse","langBeskrivelse","virksomhedsformkode"]

# Consistency check: are kortBeskrivelse and virksomhedsformkode always mapped the same way?
#check1 = virkformDf.select(checkCols+["cvrNummer"]).distinct().groupby(*checkCols).count()
#check1.orderBy("kortBeskrivelse","count").show(check1.count(),truncate=False)

# Second test: do any companies change from ApS or A/S to another form, or vice versa?
joinCols = ["cvrNummer","langBeskrivelse","rank"]
cvrCols = ["cvrNummer"]
gyldigCol = ["periode_gyldigFra"]

statusChangeWindow = (Window
                      .partitionBy(*cvrCols)
                      .orderBy(F.col("periode_gyldigFra").desc()))

#virkformDf.select(checkCols).distinct().show(50,truncate=False)



# Extract the ApS and A/S companies together with their latest status
aggregationCols = [F.max(i) for i in gyldigCol]
groupsCol = [i for i in virkformDf.columns if i not in gyldigCol]

companyByAsApsDf = (virkformDf
                    .where(F.col("virksomhedsformkode").isin(60, 80))  # the ApS and A/S form codes
                    .withColumn("rank", F.rank().over(statusChangeWindow))
                    .filter(F.col("rank") == 1)  # keep only the latest status per company
                    .cache()
                   )
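# Note (an assumption about the intent, not the original method): F.rank() keeps ties,
# so two status rows sharing the same periode_gyldigFra would both pass the rank == 1
# filter. F.row_number() over the same window would guarantee one row per cvrNummer:
#   .withColumn("rank", F.row_number().over(statusChangeWindow))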

companyByAsApsDf.write.parquet(cvrPath+"/AllApsAs.parquet",mode="overwrite")
#companyByAsApsDf.printSchema()
#compDf = companyByAsApsDf.groupBy(*companyByAsApsDf.columns).agg(F.last(F.col("rank")).over(rankWindow))

In [3]:
def pivotOnText(df,**kwargs):
    '''
        Pivots on the text columns and removes the excess count columns
        input df - dataframe
        kwargs - optional arguments:
                pivotCol - the column that should be pivoted, default "type"
                valueCol - the column that should be aggregated, default "vaerdi"
                expectedList - the values expected in the pivot column, default ["KAPITAL"]
    '''

    # set the optional parameters
    pivotCol = kwargs.get("pivotCol","type")
    expectedList = kwargs.get("expectedList",["KAPITAL"])
    valueCol = kwargs.get("valueCol","vaerdi")
    
    holdOutsCols = [pivotCol,valueCol]
    nonHoldOutCols = [i for i in df.columns if i not in holdOutsCols]

    
    # count duplicate rows, then keep the most frequent value per pivot bucket via
    # max over struct("count", valueCol) -- count is the leading struct field
    newDf = (df
             .groupBy(df.columns)
             .count()
             .groupBy(*nonHoldOutCols)
             .pivot(pivotCol,expectedList)
             .agg(F.max(F.struct("count",valueCol)))
            )
    expandedDf = GetNextJsonLayer.expandSubCols(newDf,*expectedList)
    newCols = [i for i in expandedDf.columns if i not in [v+"_count" for v in expectedList] ]
    return expandedDf.select(newCols)
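A minimal usage sketch (someAttributDf is a hypothetical dataframe with the default "type" and "vaerdi" columns); passing expectedList up front lets Spark skip the extra pass it would otherwise need to discover the distinct pivot values:

pivotedDf = pivotOnText(someAttributDf, pivotCol="type", valueCol="vaerdi",
                        expectedList=["KAPITAL", "KAPITALVALUTA"])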

In [4]:
# Get the attributes data frame
attributDf = GetNextJsonLayer.createNextLayerTable(cvrDf,["cvrNummer"],"attributter")
orderedAttributDf = attributDf.groupBy("type").count().orderBy(F.col("count").desc())
attributDf.registerTempTable("attribut")
extractedList = ("KAPITAL","KAPITALVALUTA","KAPITALKLASSER","KAPITAL_DELVIST")

# Use a SQL query to filter down to the extracted attribute types
sqlExprs = ("SELECT * FROM attribut WHERE "
            + " OR ".join('type == "{}"'.format(i) for i in extractedList))
filtAttributDf = sqlContext.sql(sqlExprs)

vaerdi = "vaerdier"
filteredCols = [i for i in filtAttributDf.columns if i != vaerdi]

filtAttributDf = GetNextJsonLayer.createNextLayerTable(filtAttributDf,filteredCols,vaerdi)
filtAttributDf = GetNextJsonLayer.expandSubCols(filtAttributDf,"periode")
filtAttributDf.registerTempTable("filteredAttributes")
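The string-built SQL above can also be expressed directly in the DataFrame API; a sketch with the same semantics, using Column.isin instead of the concatenated OR clauses:

filtAttributDf = attributDf.filter(F.col("type").isin(*extractedList))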

In [23]:
orderedCols = [F.col(i) for i in ("cvrNummer","type","periode_gyldigFra")]
#filtAttributDf.orderBy(*orderedCols).show()

# Take a look at the entries for companies that have sequence numbers >= 1
(sqlContext
 .sql("SELECT * FROM filteredAttributes AS l WHERE EXISTS (SELECT DISTINCT cvrNummer FROM filteredAttributes AS r WHERE r.sekvensnr >= 1 AND l.cvrNummer = r.cvrNummer)")
 .orderBy(*orderedCols)
 .show()
)
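The correlated EXISTS can equally be written as a join in the DataFrame API; a sketch that keeps all rows of any company that has at least one entry with sekvensnr >= 1:

cvrWithHigherSeq = (filtAttributDf
                    .where(F.col("sekvensnr") >= 1)
                    .select("cvrNummer")
                    .distinct())
filtAttributDf.join(cvrWithHigherSeq, "cvrNummer", "inner").orderBy(*orderedCols).show()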


+---------+---------+-------------+----------+--------------------+------------+-----------------+-----------------+
|cvrNummer|sekvensnr|         type|vaerditype|      sidstOpdateret|      vaerdi|periode_gyldigFra|periode_gyldigTil|
+---------+---------+-------------+----------+--------------------+------------+-----------------+-----------------+
| 10005019|        0|      KAPITAL|   decimal|2015-02-10T00:00:...|-13929317.00|       1999-11-04|       2000-01-18|
| 10005019|        0|      KAPITAL|   decimal|2015-02-10T00:00:...| -4729317.00|       2000-01-19|       2000-04-17|
| 10005019|        0|      KAPITAL|   decimal|2015-02-10T00:00:...|  -153840.00|       2000-04-18|       2001-03-29|
| 10005019|        0|      KAPITAL|   decimal|2015-02-10T00:00:...|         .00|       2001-03-30|       2002-10-29|
| 10005019|        0|      KAPITAL|   decimal|2015-02-10T00:00:...|    67300.00|       2002-10-30|       2003-01-07|
| 10005019|        1|      KAPITAL|   decimal|2015-02-10T00:00:...|-14121617.00|       2003-01-08|       2003-01-08|
| 10005019|        0|      KAPITAL|   decimal|2015-02-10T00:00:...|-14054317.00|       2003-01-08|       2013-05-31|
| 10005019|        0|KAPITALVALUTA|    string|2015-02-10T00:00:...|         DKK|       1999-11-04|       2000-01-18|
| 10005019|        0|KAPITALVALUTA|    string|2015-02-10T00:00:...|         DKK|       2000-01-19|       2000-04-17|
| 10005019|        0|KAPITALVALUTA|    string|2015-02-10T00:00:...|         DKK|       2000-04-18|       2001-03-29|
| 10005019|        0|KAPITALVALUTA|    string|2015-02-10T00:00:...|         DKK|       2001-03-30|       2002-10-29|
| 10005019|        0|KAPITALVALUTA|    string|2015-02-10T00:00:...|         EUR|       2002-10-30|       2003-01-07|
| 10005019|        0|KAPITALVALUTA|    string|2015-02-10T00:00:...|         EUR|       2003-01-08|       2013-05-31|
| 10005019|        1|KAPITALVALUTA|    string|2015-02-10T00:00:...|         DKK|       2003-01-08|       2003-01-08|
| 10130123|        1|      KAPITAL|   decimal|2015-02-09T21:00:...|    16813.73|       2003-01-31|       2003-01-31|
| 10130123|        0|      KAPITAL|   decimal|2015-02-09T21:00:...|   125000.00|       2003-01-31|       2003-11-03|
| 10130123|        0|      KAPITAL|   decimal|2015-02-09T21:00:...|    83186.27|       2003-11-04|       2005-05-11|
| 10130123|        0|      KAPITAL|   decimal|2015-02-09T21:00:...|    98186.27|       2005-05-12|       2006-12-19|
| 10130123|        0|      KAPITAL|   decimal|2015-02-09T21:00:...|    98187.27|       2006-12-20|       2013-12-04|
| 10130123|        1|KAPITALVALUTA|    string|2015-02-09T21:00:...|         EUR|       2003-01-31|       2003-01-31|
+---------+---------+-------------+----------+--------------------+------------+-----------------+-----------------+
only showing top 20 rows

---------------------------------------------------------------------------
AssertionError                            Traceback (most recent call last)
<ipython-input-23-8f86c0583721> in <module>()
     10 
     11 
---> 12 yearMonDf = sqlContext.createDataFrame([Row(aar=i,maaned=j,ts=F.unix_timestamp(str(i)+"-"+str(j)+"01",format="yyyy-mm-dd")) for i in range(1997,2017) for j in range(1,13)])

/usr/local/share/spark/python/pyspark/sql/context.py in createDataFrame(self, data, schema, samplingRatio)
    297         Py4JJavaError: ...
    298         """
--> 299         return self.sparkSession.createDataFrame(data, schema, samplingRatio)
    300 
    301     @since(1.3)

/usr/local/share/spark/python/pyspark/sql/session.py in createDataFrame(self, data, schema, samplingRatio)
    520             rdd, schema = self._createFromRDD(data.map(prepare), schema, samplingRatio)
    521         else:
--> 522             rdd, schema = self._createFromLocal(map(prepare, data), schema)
    523         jrdd = self._jvm.SerDeUtil.toJavaArray(rdd._to_java_object_rdd())
    524         jdf = self._jsparkSession.applySchemaToPythonRDD(jrdd.rdd(), schema.json())

/usr/local/share/spark/python/pyspark/sql/session.py in _createFromLocal(self, data, schema)
    384 
    385         if schema is None or isinstance(schema, (list, tuple)):
--> 386             struct = self._inferSchemaFromList(data)
    387             if isinstance(schema, (list, tuple)):
    388                 for i, name in enumerate(schema):

/usr/local/share/spark/python/pyspark/sql/session.py in _inferSchemaFromList(self, data)
    316             warnings.warn("inferring schema from dict is deprecated,"
    317                           "please use pyspark.sql.Row instead")
--> 318         schema = reduce(_merge_type, map(_infer_schema, data))
    319         if _has_nulltype(schema):
    320             raise ValueError("Some of types cannot be determined after inferring")

/usr/local/share/spark/python/pyspark/sql/types.py in _infer_schema(row)
    989         raise TypeError("Can not infer schema for type: %s" % type(row))
    990 
--> 991     fields = [StructField(k, _infer_type(v), True) for k, v in items]
    992     return StructType(fields)
    993 

/usr/local/share/spark/python/pyspark/sql/types.py in <listcomp>(.0)
    989         raise TypeError("Can not infer schema for type: %s" % type(row))
    990 
--> 991     fields = [StructField(k, _infer_type(v), True) for k, v in items]
    992     return StructType(fields)
    993 

/usr/local/share/spark/python/pyspark/sql/types.py in _infer_type(obj)
    964     else:
    965         try:
--> 966             return _infer_schema(obj)
    967         except TypeError:
    968             raise TypeError("not supported type: %s" % type(obj))

/usr/local/share/spark/python/pyspark/sql/types.py in _infer_schema(row)
    989         raise TypeError("Can not infer schema for type: %s" % type(row))
    990 
--> 991     fields = [StructField(k, _infer_type(v), True) for k, v in items]
    992     return StructType(fields)
    993 

/usr/local/share/spark/python/pyspark/sql/types.py in <listcomp>(.0)
    989         raise TypeError("Can not infer schema for type: %s" % type(row))
    990 
--> 991     fields = [StructField(k, _infer_type(v), True) for k, v in items]
    992     return StructType(fields)
    993 

/usr/local/share/spark/python/pyspark/sql/types.py in __init__(self, name, dataType, nullable, metadata)
    401         False
    402         """
--> 403         assert isinstance(dataType, DataType), "dataType should be DataType"
    404         assert isinstance(name, basestring), "field name should be string"
    405         if not isinstance(name, str):

AssertionError: dataType should be DataType
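The assertion comes from the Row construction: F.unix_timestamp returns an unevaluated Column, and schema inference cannot map a Column to a DataType. The date string is also malformed: the "-" before the day is missing, and "mm" means minutes in the format pattern where months need "MM". In [28] below sidesteps both problems by putting plain strings into the rows and applying the column functions afterwards; the format-string variant would be a sketch like:

# a sketch, not the fix the notebook uses:
# yearMonDf.withColumn("ts", F.unix_timestamp(F.col("ts"), "yyyy-MM-dd"))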

In [28]:
# Build a calendar of month starts: cast the string to a date, then to a unix timestamp
yearMonDf = (sqlContext
             .createDataFrame([Row(aar=i,maaned=j,ts=str(i)+"-"+str(j)+"-1") for i in range(1997,2017) for j in range(1,13)])
             .withColumn("ts", F.unix_timestamp(F.col("ts").cast("date")))
            )
yearMonDf.show()


+----+------+---------+
| aar|maaned|       ts|
+----+------+---------+
|1997|     1|852073200|
|1997|     2|854751600|
|1997|     3|857170800|
|1997|     4|859845600|
|1997|     5|862437600|
|1997|     6|865116000|
|1997|     7|867708000|
|1997|     8|870386400|
|1997|     9|873064800|
|1997|    10|875656800|
|1997|    11|878338800|
|1997|    12|880930800|
|1998|     1|883609200|
|1998|     2|886287600|
|1998|     3|888706800|
|1998|     4|891381600|
|1998|     5|893973600|
|1998|     6|896652000|
|1998|     7|899244000|
|1998|     8|901922400|
+----+------+---------+
only showing top 20 rows
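Note that unix_timestamp interprets the dates in the Spark session's local timezone: 852073200 is 1997-01-01 00:00 CET, one hour before midnight UTC, so the epochs above carry the local offset. That is harmless here because the capital periods below are converted the same way.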


In [20]:

mainKapitalDf = (pivotOnText(filtAttributDf.drop("vaerditype").drop("sidstOpdateret"),expectedList=extractedList)
                 .withColumnRenamed("periode_gyldigFra","gyldigFra")
                 .withColumnRenamed("periode_gyldigTil","gyldigTil")
                 .withColumn("gyldigFra", F.col("gyldigFra").cast("date"))
                 .withColumn("gyldigTil", F.col("gyldigTil").cast("date"))
                 .withColumn("gyldigTil", F.coalesce(F.col("gyldigTil"), F.current_date()))  # open periods run to today
                 .withColumn("datediff", F.datediff(F.col("gyldigTil"), F.col("gyldigFra")))
                 .withColumn("timeStampFra", F.unix_timestamp(F.col("gyldigFra")))
                 .withColumn("timeStampTil", F.unix_timestamp(F.col("gyldigTil")))
                 .filter(F.col("sekvensnr") == 0)  # keep only the primary (sekvensnr 0) entries
                 .drop("sekvensnr")
                 .orderBy("cvrNummer")
                )
mainKapitalDf.show()


+---------+----------+----------+--------------+--------------------+---------------------+----------------------+--------+------------+------------+
|cvrNummer| gyldigFra| gyldigTil|KAPITAL_vaerdi|KAPITALVALUTA_vaerdi|KAPITALKLASSER_vaerdi|KAPITAL_DELVIST_vaerdi|datediff|timeStampFra|timeStampTil|
+---------+----------+----------+--------------+--------------------+---------------------+----------------------+--------+------------+------------+
| 10000009|1999-10-12|2001-12-11|     125000.00|                 DKK|                 null|                  null|     791|   939679200|  1008025200|
| 10000025|1999-10-13|2017-03-09|     125000.00|                 DKK|                 null|                  null|    6357|   939765600|  1489014000|
| 10000122|1999-10-14|2002-08-15|     125000.00|                 DKK|                 null|                  null|    1036|   939852000|  1029362400|
| 10000157|1999-11-04|2017-03-09|     125000.00|                 DKK|                 null|                  null|    6335|   941670000|  1489014000|
| 10000165|1999-10-13|2006-05-18|     500000.00|                 DKK|                 null|                  null|    2409|   939765600|  1147903200|
| 10000211|1999-09-29|2017-03-09|     500000.00|                 DKK|                 null|                  null|    6371|   938556000|  1489014000|
| 10000238|1987-08-30|1989-07-05|     300000.00|                 DKK|                 null|                  null|     675|   557272800|   615592800|
| 10000254|2009-12-29|2017-03-09|    1100500.00|                 DKK|                 null|                  null|    2627|  1262041200|  1489014000|
| 10000254|1999-10-12|2000-08-22|     500000.00|                 DKK|                 null|                  null|     315|   939679200|   966895200|
| 10000254|2000-08-23|2009-12-28|    1000000.00|                 DKK|                 null|                  null|    3414|   966981600|  1261954800|
| 10000262|1999-10-01|2017-03-09|     144000.00|                 DKK|                 null|                  null|    6369|   938728800|  1489014000|
| 10000319|1987-08-30|2001-02-27|     300000.00|                 DKK|                 null|                  null|    4930|   557272800|   983228400|
| 10000327|1999-10-15|2009-02-25|     125000.00|                 DKK|                 null|                  null|    3421|   939938400|  1235516400|
| 10000351|2000-06-15|2000-06-15|      33576.00|                 EUR|                 null|                  null|       0|   961020000|   961020000|
| 10000351|2000-04-05|2000-06-14|      16788.00|                 EUR|                 null|                  null|      70|   954885600|   960933600|
| 10000351|1999-10-15|2000-04-04|     125000.00|                 DKK|                 null|                  null|     172|   939938400|   954799200|
| 10000416|1999-10-14|2017-03-09|     125000.00|                 DKK|                 null|                  null|    6356|   939852000|  1489014000|
| 10000424|1999-10-15|2000-04-04|     125000.00|                 DKK|                 null|                  null|     172|   939938400|   954799200|
| 10000424|2000-05-12|2000-05-15|      33576.00|                 EUR|                 null|                  null|       3|   958082400|   958341600|
| 10000424|2000-04-05|2000-05-11|      16788.00|                 EUR|                 null|                  null|      36|   954885600|   957996000|
+---------+----------+----------+--------------+--------------------+---------------------+----------------------+--------+------------+------------+
only showing top 20 rows


In [33]:
# Inner join: attach each month whose start timestamp falls inside a capital validity period
joinedDf = yearMonDf.join(mainKapitalDf,(yearMonDf["ts"].between(F.col("timeStampFra"),F.col("timeStampTil"))),"inner")
joinedDf.write.parquet(path=cvrPath+"/KaptialDataFrame.parquet",mode="overwrite")
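The between condition makes this a non-equi join, which Spark executes as a (broadcast) nested-loop join; since yearMonDf holds only 240 rows, explicitly broadcasting it keeps that cheap. A sketch with the same semantics:

joinedDf = mainKapitalDf.join(F.broadcast(yearMonDf),
                              F.col("ts").between(F.col("timeStampFra"), F.col("timeStampTil")),
                              "inner")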

In [32]:
(joinedDf
 .drop("ts")
 .drop("timeStampFra")
 .drop("timeStampTil")
 .orderBy("cvrNummer","aar","maaned").show())


+----+------+---------+----------+----------+--------------+--------------------+---------------------+----------------------+--------+
| aar|maaned|cvrNummer| gyldigFra| gyldigTil|KAPITAL_vaerdi|KAPITALVALUTA_vaerdi|KAPITALKLASSER_vaerdi|KAPITAL_DELVIST_vaerdi|datediff|
+----+------+---------+----------+----------+--------------+--------------------+---------------------+----------------------+--------+
|1999|    11| 10000009|1999-10-12|2001-12-11|     125000.00|                 DKK|                 null|                  null|     791|
|1999|    12| 10000009|1999-10-12|2001-12-11|     125000.00|                 DKK|                 null|                  null|     791|
|2000|     1| 10000009|1999-10-12|2001-12-11|     125000.00|                 DKK|                 null|                  null|     791|
|2000|     2| 10000009|1999-10-12|2001-12-11|     125000.00|                 DKK|                 null|                  null|     791|
|2000|     3| 10000009|1999-10-12|2001-12-11|     125000.00|                 DKK|                 null|                  null|     791|
|2000|     4| 10000009|1999-10-12|2001-12-11|     125000.00|                 DKK|                 null|                  null|     791|
|2000|     5| 10000009|1999-10-12|2001-12-11|     125000.00|                 DKK|                 null|                  null|     791|
|2000|     6| 10000009|1999-10-12|2001-12-11|     125000.00|                 DKK|                 null|                  null|     791|
|2000|     7| 10000009|1999-10-12|2001-12-11|     125000.00|                 DKK|                 null|                  null|     791|
|2000|     8| 10000009|1999-10-12|2001-12-11|     125000.00|                 DKK|                 null|                  null|     791|
|2000|     9| 10000009|1999-10-12|2001-12-11|     125000.00|                 DKK|                 null|                  null|     791|
|2000|    10| 10000009|1999-10-12|2001-12-11|     125000.00|                 DKK|                 null|                  null|     791|
|2000|    11| 10000009|1999-10-12|2001-12-11|     125000.00|                 DKK|                 null|                  null|     791|
|2000|    12| 10000009|1999-10-12|2001-12-11|     125000.00|                 DKK|                 null|                  null|     791|
|2001|     1| 10000009|1999-10-12|2001-12-11|     125000.00|                 DKK|                 null|                  null|     791|
|2001|     2| 10000009|1999-10-12|2001-12-11|     125000.00|                 DKK|                 null|                  null|     791|
|2001|     3| 10000009|1999-10-12|2001-12-11|     125000.00|                 DKK|                 null|                  null|     791|
|2001|     4| 10000009|1999-10-12|2001-12-11|     125000.00|                 DKK|                 null|                  null|     791|
|2001|     5| 10000009|1999-10-12|2001-12-11|     125000.00|                 DKK|                 null|                  null|     791|
|2001|     6| 10000009|1999-10-12|2001-12-11|     125000.00|                 DKK|                 null|                  null|     791|
+----+------+---------+----------+----------+--------------+--------------------+---------------------+----------------------+--------+
only showing top 20 rows


In [ ]: