In [5]:
'''
Created on Jun 13, 2016

@author: svanhmic

This script compiles the test csv records into one combined csv file. The csv
files containing the accounts are converted from a column-based representation
to a row-based representation, i.e. from:

account alpha
var x, val x, date x
var y, val y, date y
...

to:

account alpha [var x, val x, date x], beta [var x, val x, date x]

(A toy sketch of this reshaping follows in the next cell, after the imports.)
'''
sc.addPyFile('/home/svanhmic/workspace/Python/Erhvervs/src/RegnSkabData/RegnskabsClass.py') # ship RegnskabsClass.py to the Spark executors so Regnskaber can be imported on the workers

from pyspark import SparkContext
from pyspark.sql import SQLContext
from pyspark.sql import Row
from pyspark.sql.types import StringType, StructType, ArrayType, IntegerType, DateType
import pyspark.sql.functions as F
import os
import re
from datetime import datetime
import numpy as np
import matplotlib.pyplot as plt
from RegnskabsClass import Regnskaber
import sys
#reload(sys)
#sys.setdefaultencoding('utf-8')

#sc = SparkContext("local[8]","importRegnskabs")
sqlContext = SQLContext(sc)

folderPath = "/home/svanhmic/workspace/Python/Erhvervs/data/regnskabsdata/testcsv"
finalXML = "/home/svanhmic/workspace/Python/Erhvervs/data/regnskabsdata/finalXML"
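
In [ ]:
# A minimal sketch (plain Python, hypothetical toy data) of the reshaping the
# module docstring describes: per-account (var, val, date) rows are collapsed
# into a single record per account. The real parsing is done by the Regnskaber
# class; this only illustrates the target shape.
toyRows = [("alpha", "var x", "val x", "date x"),
           ("alpha", "var y", "val y", "date y"),
           ("beta", "var x", "val x", "date x")]
collapsed = {}
for account, var, val, date in toyRows:
    collapsed.setdefault(account, []).append([var, val, date])
print(collapsed)
# {'alpha': [['var x', 'val x', 'date x'], ['var y', 'val y', 'date y']],
#  'beta': [['var x', 'val x', 'date x']]}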

In [6]:
def convertToDate(col):
    '''Parse a yyyy-mm-dd string into a datetime; return None if parsing fails.'''
    try:
        return datetime.strptime(col, '%Y-%m-%d')  # %m = month of year (%M would be minutes)
    except (TypeError, ValueError):
        return None
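
In [ ]:
# Quick sanity check of convertToDate (illustrative only): a well-formed
# yyyy-mm-dd string parses, anything else falls back to None.
print(convertToDate("2016-06-13"))  # 2016-06-13 00:00:00
print(convertToDate("13/06/2016"))  # None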

In [7]:
def extractFilesForTaxonomy(fileNamesDf,taxTypeDf):
    '''
    Method: Compares the csv-files in a folder against the taxonomy list and
    keeps only those whose taxonomy is the most frequent one.

    Input:
        fileNamesDf - Spark data frame containing the csv file names found in the folder
        taxTypeDf - Spark data frame containing the taxonomies for all csv-files

    Output:
        A list of the csv-files that carry the taxonomy with the most occurrences
    '''
    # map each xml path to its csv file name, keeping the taxonomy alongside
    minDf = taxTypeDf.select(F.concat(F.regexp_extract("file", r'(\w+)/(\d+-\d+-\d+.xml)', 2), F.lit(".csv")).alias("file"), taxTypeDf["taxonomy"]).cache()
    #minDf.show(5,truncate=False)
    
    intersectFilesDf = (fileNamesDf
                        .join(minDf, minDf["file"] == fileNamesDf["file"], "inner")
                        .drop(fileNamesDf["file"])) # join the folder listing with the list of files with a taxonomy, so we can single out the records we want to analyze
    #intersectFilesDf.show(20,truncate=False)

    groupedIntersectFilesDf = intersectFilesDf.groupBy("taxonomy").count()
    #groupedIntersectFilesDf.orderBy(groupedIntersectFilesDf["count"].desc()).show(truncate=False) # show the different types of tax'
    
    mostTaxonomy = groupedIntersectFilesDf.orderBy(groupedIntersectFilesDf["count"].desc()).first()["taxonomy"]
    print(mostTaxonomy)
    filteredCsvDf = intersectFilesDf.filter(intersectFilesDf["taxonomy"] == mostTaxonomy)
    return [str(f["file"]) for f in filteredCsvDf.collect()]
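
In [ ]:
# Illustrative run of extractFilesForTaxonomy on hypothetical toy data. The
# paths, file names and taxonomy labels below are made up; they only mimic
# the expected "<dir>/<date>.xml" paths and "<date>.xml.csv" file names.
toyFilesDf = sqlContext.createDataFrame([Row(file="2016-01-01.xml.csv"),
                                         Row(file="2016-02-01.xml.csv"),
                                         Row(file="2016-03-01.xml.csv")])
toyTaxDf = sqlContext.createDataFrame([Row(file="dir/2016-01-01.xml", taxonomy="taxA"),
                                       Row(file="dir/2016-02-01.xml", taxonomy="taxA"),
                                       Row(file="dir/2016-03-01.xml", taxonomy="taxB")])
print(extractFilesForTaxonomy(toyFilesDf, toyTaxDf))
# taxA occurs most often, so only its two files are returned, e.g.
# ['2016-01-01.xml.csv', '2016-02-01.xml.csv']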

In [8]:
def main():
    lengthUdf = F.udf(lambda x: len(x), IntegerType())  # user-defined function (currently unused)
    convertToDateUdf = F.udf(convertToDate, DateType())  # user-defined function (currently unused)

    files = os.listdir(folderPath)  # all csv files in the folder
    fileNamesDf = sqlContext.createDataFrame([Row(file=f) for f in files])  # csv file names as a one-column dataframe
    
    struct = StructType().add("file",StringType(),True).add("taxonomy",StringType(),True)
    taxTypeDf = sqlContext.read.csv(finalXML+"/taxlist.csv",header=False,schema=struct,sep=";")
    
    recordList = extractFilesForTaxonomy(fileNamesDf,taxTypeDf)
    #print(recordList)
    regnskabList = []
    for f in recordList:
        regnskabList.append(Regnskaber(folderPath + "/" + f))

    print("Done with all files")

    df = sqlContext.createDataFrame(regnskabList)
    del regnskabList  # the list is no longer needed once the dataframe exists
    df.printSchema()
    df.show()
    #print(df.take(1))
    # explode the nested field array into one row per record, then flatten the struct
    listCsvDf = df.drop("file").select(F.explode(F.col("field")))
    valueCsvDf = listCsvDf.select(F.regexp_replace(listCsvDf["col"]["name"], r"\w+:", "").alias("name"),
                                  listCsvDf["col"]["id"].cast("integer").alias("id"),
                                  listCsvDf["col"]["value"].alias("value"),
                                  listCsvDf["col"]["unit"].alias("unit"),
                                  listCsvDf["col"]["contextRef"].alias("contextRef"),
                                  listCsvDf["col"]["startDate"].alias("startDate"),
                                  listCsvDf["col"]["endDate"].alias("endDate"))
    
    valueCsvDf.show(truncate=False)
    #orderedListCsvDf = listCsvDf.orderBy(listCsvDf["fieldlength"].desc()).select(listCsvDf["fieldlength"])
    #newDf = df.select(df["field"]["name"].alias("name"),df["field"]["value"].alias("value"))
    valueCsvDf.write.csv("/home/svanhmic/workspace/Python/Erhvervs/data/regnskabsdata/sparkdata/csv/regnskabsdata.csv",mode='overwrite',header=True,sep=";")

In [ ]:
if __name__ == '__main__':
    main()
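
In [ ]:
# Hypothetical follow-up check: read the compiled csv back in and eyeball a
# few rows. Path and options mirror the write call in main() above.
checkDf = sqlContext.read.csv("/home/svanhmic/workspace/Python/Erhvervs/data/regnskabsdata/sparkdata/csv/regnskabsdata.csv",
                              header=True, sep=";")
checkDf.show(5, truncate=False)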
