In [ ]:
#Imports and Constants
from IPython.core.debugger import set_trace
import numpy as np
import pandas as pd
import sqlalchemy as sql
import datetime
import itertools
import glob
import re
from openpyxl import load_workbook
#Set these values for each run
#Dieser Wert bestimmt der Auftrag, der vervollständigt werden soll.
AuftragPath = './absys_files/input/*'
#Dieser Wert bestimmt die ExcelDatei in der die Heizkörper bewertet wurden.
HeizkoerperfaktordatenPath = './absys_files/HK_Gesobau_Steffen.xlsx'
#Diese Regular expression bestimmt die Protokolle, aus denen die Daten ausgelesen werden sollen.
protoPathPattern='./absys_files/protokolle/*'
In [ ]:
#Functions
def findZählernummer(ZNr, df):
if isinstance(int(ZNr), int):
return df[(df['ZählernummerAlt'] == ZNr)]
def protoCleanupHeizung(df):
df2append = df.dropna(subset=['Baulänge','Bauhöhe','Bautiefe','HK-Art'])
df2append = df2append[(df2append['Baulänge'] != 'Baulänge') &
(df2append['Bauhöhe'] != 'Bauhöhe')&
(df2append['Bautiefe'] != 'Bautiefe') &
(df2append['ZählernummerAlt'] != 'Altgerät') &
(df2append['ZählernummerNeu'] != 'Neugerät')]
df2append = df2append[(df2append['Baulänge'] != 'D') &
(df2append['Bauhöhe'] != 'E') &
(df2append['Bautiefe'] != 'F') &
(df2append['ZählernummerAlt'] != 'Nr.') &
(df2append['ZählernummerNeu'] != 'Nr.')]
return df2append
def protoCleanupWasser(df):
df2append = df[df['WZ']=='W']
df2append = df2append.append(df[df['WZ']=='K'])
return df2append
def protoExtractDateHeizung(df):
date = np.nan
dfDates = []
dates = list(zip(df['Date1'].values, df['Bauhöhe'].values,df['Date3'].values))
for date1, date2, date3 in dates:
if type(date1) == datetime.datetime:
if type(date2) == datetime.datetime:
if type(date3) == datetime.datetime:
date = date3
else:
date = date2
else:
date = date1
dfDates.append(date)
df['MontageDatum'] = dfDates
return df
def protoExtractDateWasser(df):
date = np.nan
dfDates = []
dates = list(zip(df['Zählerart'].values, df['ZählernummerAlt'].values,df['ZählerStandAlt'].values))
for date1, date2, date3 in dates:
if type(date1) == datetime.datetime:
if type(date2) == datetime.datetime:
if type(date3) == datetime.datetime:
date = date3
else:
date = date2
else:
date = date1
dfDates.append(date)
df['MontageDatum'] = dfDates
return df
def collectProtocols(pathPattern):
#Hier werden die Heizkostenverteilerdaten eingelesen
protoColumns = {'Date1':str,
'HK-Art':str,
'Baulänge':str,
'Bauhöhe':str,
'Bautiefe':str,
'Baugliedlänge':str,
'Nabensabstand':str,
'anderer Stuff':str,
'Date3':str,
'ZählernummerAlt':int,
'ZählerStandAlt':str,
'ZählernummerNeu':int}
protokolle = glob.glob(pathPattern)
#ToDo: Fehlerbehandlung wenn keine Dateien gefunden werden.
protoDF = pd.DataFrame(columns=protoColumns.keys())
print('Processing '+str(len(protokolle))+' Files for HKs')
for protokol in protokolle:
print("Processing HKs: "+protokol)
df = pd.read_excel(protokol,
sheetname=0,
names=protoColumns.keys(),
dtype=protoColumns,
parse_cols="C,D,E,F,G,H,I,J,K,L,R,V")
dfWithDatesHeizung = protoExtractDateHeizung(df)
df2appendHeizung = protoCleanupHeizung(dfWithDatesHeizung)
df2appendHeizung['sourcefile'] = protokol
protoDF = protoDF.append(df2appendHeizung)
#Hier werden die Heizkostenverteilerdaten eingelesen
protoColumns = {'WZ':str,
'Zählerart':str,
'ZählernummerAlt':int,
'ZählerStandAlt':str,
'ZählernummerNeu':int,
'ZählerStandNeu':str,}
#ToDo: Fehlerbehandlung wenn keine Dateien gefunden werden.
print('Processing '+str(len(protokolle))+' Files for WZs')
for protokol in protokolle:
print("Processing WZs: "+protokol)
df = pd.read_excel(protokol,
sheetname=0,
names=protoColumns.keys(),
dtype=protoColumns,
parse_cols="A,C,F,K,L,W")
dfWithDatesWasser = protoExtractDateWasser(df)
df2appendWasser = protoCleanupWasser(dfWithDatesWasser)
df2appendWasser['sourcefile'] = protokol
protoDF = protoDF.append(df2appendWasser)
protoDF.reset_index(drop=True, inplace=True)
protoDF['ZählernummerAlt'] = pd.to_numeric(protoDF['ZählernummerAlt'], errors="coarse")
return protoDF
def filterHK(df, value=None, dimension=None, deviation=0):
try:
return df[(value-(deviation*value) <= df[dimension]) & (df[dimension] <= value+(deviation*value))]
except:
print("Critical Error Processing: "+str(value)+", "+dimension)
def filterHKFactors(HKdf, laenge, hoehe, tiefe, deviation_threshold = 5):
if type(laenge) is int:
for deviation in range(0, deviation_threshold):
HKtempdf_laenge = filterHK(HKdf, laenge, 'Länge', deviation*0.01)
if (len(HKtempdf_laenge) > 0):
print('Abweichung fuer Länge: '+str(deviation*0.01)+'%')
break
else:
HKtempdf_laenge = HKdf
#print(HKtempdf_laenge)
if type(hoehe) is int:
for deviation in range(0, deviation_threshold):
HKtempdf_hoehe = filterHK(HKtempdf_laenge, hoehe, 'Höhe', deviation*0.01)
if (len(HKtempdf_hoehe) > 0):
print('Abweichung fuer Höhe: '+str(deviation*0.01)+'%')
break
else:
HKtempdf_hoehe = HKtempdf_laenge
#print(HKtempdf_hoehe)
if type(tiefe) is int:
for deviation in range(0, deviation_threshold):
HKtempdf_tiefe = filterHK(HKtempdf_hoehe, tiefe, 'Tiefe', deviation*0.01)
if (len(HKtempdf_tiefe) > 0):
print('Abweichung fuer Tiefe: '+str(deviation*0.01)+'%')
break
else:
HKtempdf_tiefe = HKtempdf_hoehe
#print(HKtempdf_tiefe)
return HKtempdf_tiefe
def HKFactor(laenge, hoehe, tiefe, hkArt):
HKFactors = HKdf
#HKFactors = filterHKFactors(HKdf, laenge, hoehe, tiefe)
#Enforce exact matching
try:
HKFactors = HKFactors[(HKFactors['Baulänge'] == laenge) &
(HKFactors['Bauhöhe'] == hoehe) &
(HKFactors['Bautiefe'] == tiefe) &
(HKFactors['HK-Art'] == hkArt)].drop_duplicates(keep='first')
except:
print(str(laenge)+':'+str(type(laenge))+' '
+str(hoehe)+':'+str(type(hoehe))+' '
+str(tiefe)+':'+str(type(tiefe)))
if type(HKFactors) == type(None):
return -1
elif len(HKFactors) == 0:
#print('-1: No Match')
return -1
elif len(HKFactors) == 1:
return float(HKFactors['EN 75/65'].values[0])
elif len(HKFactors) > 1:
print('-2: Multiple Matches')
print(HKFactors)
return -2
print('Critical Error at HKFactor('+str(laenge)+':'+str(type(laenge))+' '
+str(hoehe)+':'+str(type(hoehe))+' '
+str(tiefe)+':'+str(type(tiefe))+' '
+str(kArt)+':'+str(type(kArt))+')')
In [ ]:
#Load Data
#Reads FactorData from file
HKdf = pd.read_excel(HeizkoerperfaktordatenPath, header=0)
HKdf["HK-Art"] = HKdf["HK-Art"].str.upper()
HKdf.drop_duplicates(keep='first',subset={"Baulänge","Bauhöhe","Bautiefe","HK-Art"}, inplace=True)
#Read Protokolle
protoAll = collectProtocols(protoPathPattern).drop(['Date1', 'Date3'], axis=1)
In [ ]:
HKdfLen = len(HKdf)
for Auftrag in glob.glob(AuftragPath):
print('Processing: '+Auftrag)
#Nimmt an, dass das Excel-sheet IMMER 'Zählerliste' heißt. Tippfehler gehen schief!!!
df = pd.read_excel(Auftrag, sheetname='Zählerliste', converters = {'NE':str}, header=9)
df['alte\nZähler-Nr'] = pd.to_numeric(df['alte\nZähler-Nr'], errors="coerce")
#Es wird vorrausgesetzt, das die Zaehlernummern immer in der 5.(E) Spalte stehen.
for Zählernummer in df['alte\nZähler-Nr']:
#Fehlerbehandlung, wenn Feldinhalt keine Zählernummer
try:
proto = findZählernummer(Zählernummer, protoAll)
except:
df.loc[(df['alte\nZähler-Nr'] == Zählernummer),'Hinweis\nMonteur'] = 'Keine gültige Zählernummer'
if (len(proto['ZählernummerNeu'].values) > 0):
df.loc[(df['alte\nZähler-Nr'] == Zählernummer),'neue\nZähler-Nr'] = proto['ZählernummerNeu'].values[0]
#ToDo: Fehlerbehandlung wenn ein gültiger Zählerstand vorhanden.
df.loc[(df['alte\nZähler-Nr'] == Zählernummer),'Stand\nAusbau'] = proto['ZählerStandAlt'].values[0]
df.loc[(df['alte\nZähler-Nr'] == Zählernummer),'Stand\nEinbau'] = proto['ZählerStandNeu'].values[0]
#Convert MontageDatum to german formating
try:
df.loc[(df['alte\nZähler-Nr'] == Zählernummer),'Einbau\nDatum'] = pd.to_datetime(proto['MontageDatum'].values[0]).strftime("%d.%m.%Y")
except:
df.loc[(df['alte\nZähler-Nr'] == Zählernummer),'Hinweis\nMonteur'] = (str(Zählernummer)+' hat kein gültiges Datum: '+proto['sourcefile'])
pass
laenge = proto['Baulänge'].values[0]
hoehe = proto['Bauhöhe'].values[0]
tiefe = proto['Bautiefe'].values[0]
try:
hkArt = proto['HK-Art'].values[0].upper()
except:
pass
try:
if 'techem' not in proto['Zählerart'].values[0].lower():
if 'ista' in proto['Zählerart'].values[0].lower():
df.loc[(df['alte\nZähler-Nr'] == Zählernummer),'Hinweis\nMonteur'] = '2" Ista MK'
except:
pass
Faktor = HKFactor(laenge=laenge, hoehe=hoehe, tiefe=tiefe, hkArt=hkArt)
if Faktor == -1:
if ((str(laenge) == 'nan') & (str(hoehe) == 'nan') & (str(tiefe) == 'nan')):
pass
else:
print('adding:')
print(proto)
df.loc[(df['alte\nZähler-Nr'] == Zählernummer),'Hinweis\nMonteur'] = ('-1: Heizkörper aus '+proto['sourcefile']+' noch nicht bestimmt.')
HKdf = HKdf.append(proto)
elif not(type(Faktor) == float):
print(str(Faktor))
else:
df.loc[(df['alte\nZähler-Nr'] == Zählernummer),'Faktor'] = Faktor
else:
df.loc[(df['alte\nZähler-Nr'] == Zählernummer),'Hinweis\nMonteur'] = ('Zählernummer '+str(Zählernummer)+' nicht gefunden')
pass
#This function writes the file after doing some final formatting
#Writing file:
AuftragOutput = re.sub('input', 'output', Auftrag)
AuftragOutput = re.sub('.XLSX$', '_Output.xlsx', AuftragOutput)
writer = pd.ExcelWriter(AuftragOutput)
df.to_excel(writer, 'Zählerliste', index=False, startrow = 9)
try:
writer.save()
print(AuftragOutput+' saved')
except:
print('I/O Error: saving '+AuftragOutput+' failed')
#Copy'n'Pastes the Header of the current Auftragsfile
cells2copy = [(1,1),(1,3),(1,4),(1,5),(1,6),(2,3),(2,4),(2,5),(2,6),(5,3)]
wb_in = load_workbook(Auftrag)
sheet_in = wb_in['Zählerliste']
wb_out = load_workbook(AuftragOutput)
sheet_out = wb_out['Zählerliste']
for column, row in cells2copy:
sheet_out.cell(row=row, column=column).value = sheet_in.cell(row=row, column=column).value
wb_out.save(AuftragOutput)
if len(HKdf) > HKdfLen:
writer = pd.ExcelWriter(HeizkoerperfaktordatenPath)
HKdf['HK-Art'] = HKdf['HK-Art'].str.upper()
HKdf.drop_duplicates(keep='first',subset={"Baulänge","Bauhöhe","Bautiefe","HK-Art"}, inplace=True)
HKdf.to_excel(writer, 'HK', index=False)
writer.save()
print('Writing new HKs to '+HeizkoerperfaktordatenPath)
In [ ]:
HKdf