In [9]:
from collections import namedtuple
from statistics import mean as python_mean
from functools import partial
import pandas as pd
In [10]:
LogDescriptions = namedtuple('LogDescriptions', ['DT1', 'DT2', 'KT'])
DESC = LogDescriptions('DT1', 'DT2', 'KT')
TEST_HUMIDITY_MODE = {
'logs': [
{
'file': r'C:\Users\2065\Desktop\easytest2\src\test_data\humidity\1.xlsx',
'desc': DESC.DT1
},
{
'file': r'C:\Users\2065\Desktop\easytest2\src\test_data\humidity\2.xlsx',
'desc': DESC.DT2
},
{
'file': r'C:\Users\2065\Desktop\easytest2\src\test_data\humidity\2.xlsx',
'desc': DESC.KT
}
],
'target': {
'humidity': 90,
'temperature': 25
},
'md': {
'humidity': [90, 90, 90, 90, 90, 90, 90, 90, 90, 90],
'temperature': [25, 25, 25, 25, 25, 25, 25, 25, 25, 25]
}
}
TEST_HUMIDITY_MODE.update({ # default humidity settings
'slice_length': 10,
'round_to': 1,
'max_deviation': {
'temperature': 2,
'humidity': {
'default': (3, -3),
'98': (2, -3)
}
}
})
In [11]:
def parse_xlsx(data):
dates = []
df = pd.DataFrame()
for item in data:
file, label = item
log = pd.read_excel(file, skiprows=56)
log.columns = ['date', 'time', label+'_hum', label+'_temp', 'crap']
dates.append(log.date[0].date())
log.drop(['date', 'time', 'crap'], axis=1, inplace=True)
df = log if df.empty else pd.merge(df, log,
left_index=True,
right_index=True,
how='inner')
date = max(dates) # use latest date
df['date'] = pd.Series([date for i in df.index])
return df
def parse_csv(data):
raise NotImplementedError('This method is not implemented yet')
def mode_valid(mode):
return all(mode['slice_length'] == len(mode['md'][i])
for i in ['humidity', 'temperature'])
def get_abs_distance(a, b):
return abs(max([a, b]) - min([a, b]))
def calculate_mode(mode, log_slice):
#######################
# TEST BUSINESS LOGIC #
#######################
def mean(arr):
return round(python_mean(arr), mode['round_to'])
date = log_slice.date.max()
dt1_humidity = log_slice.DT1_hum.astype(float).tolist()
dt2_humidity = log_slice.DT2_hum.astype(float).tolist()
kt_temperature = log_slice.KT_temp.astype(float).tolist()
kt_humidity = log_slice.KT_hum.astype(float).tolist()
target_humidity = mode['target']['humidity']
hum_max_dev_settings = mode['max_deviation']['humidity']
print(hum_max_dev_settings)
# if not special setting use default values
humidity_max_deviation = \
hum_max_dev_settings.get(target_humidity,
hum_max_dev_settings['default'])
max_mean_humidity = max(mean(dt1_humidity),
mean(dt2_humidity),
mean(kt_humidity))
min_mean_humidity = min(mean(dt1_humidity),
mean(dt2_humidity),
mean(kt_humidity))
md_delta_humidity = get_abs_distance(mean(kt_humidity),
mean(mode['md']['humidity']))
positive_deviation = get_abs_distance(max_mean_humidity, target_humidity)
negative_deviation = get_abs_distance(min_mean_humidity, target_humidity)
output = { # resulting object with all the necessary data
'date': date,
'target': mode['target'],
'max_allowed_deviation': {
'temperature': mode['max_deviation']['temperature'],
'humidity': humidity_max_deviation
},
'md_temperature': mode['md']['temperature'],
'md_humidity': mode['md']['humidity'],
'dt1_mean_temperature': mean(dt1_humidity),
'dt2_mean_temperature': mean(dt2_humidity),
'kt_mean_temperature': mean(kt_temperature),
'kt_mean_humidity': mean(kt_humidity),
'md_mean_temperature': mean(mode['md']['temperature']),
'md_mean_humidity': mean(mode['md']['humidity']),
'max_mean_humidity': max_mean_humidity,
'min_mean_humidity': min_mean_humidity,
'md_delta_humidity': md_delta_humidity,
'positive_deviation': positive_deviation,
'negative_deviation': negative_deviation,
'humidity_deviation': max_mean_humidity - min_mean_humidity,
'result': {}
}
if positive_deviation:
output['result']['positive'] = {
'passed': positive_deviation < (abs(humidity_max_deviation[0]) -
md_delta_humidity),
'equation_result': (abs(humidity_max_deviation[0]) -
md_delta_humidity)
}
if negative_deviation:
output['result']['negative'] = {
'passed': negative_deviation < (abs(humidity_max_deviation[1]) -
md_delta_humidity),
'equation_result': (abs(humidity_max_deviation[1]) -
md_delta_humidity)
}
output['result']['summary_mode_result'] = (
output['result']['negative']['passed'] and
output['result']['positive']['passed'])
return output
def handle_mode(mode):
if not mode_valid(mode):
raise ValueError('Invalid humidity mode data provided')
log = parse_xlsx((log['file'], log['desc']) for log in mode['logs'])
log.drop(['DT1_temp', 'DT2_temp'], axis=1, inplace=True) # no need
slice_length = mode['slice_length']
cursor = 0
while True:
log_slice = log[cursor: cursor + slice_length]
result = calculate_mode(mode, log_slice)
if log_slice.shape[0] < 10 or result['result']['summary_mode_result']:
return result # if test totaly passed return result or log ended
else:
cursor += 1 # move futher
In [12]:
handle_mode(TEST_HUMIDITY_MODE)
Out[12]:
In [ ]:
In [ ]:
In [ ]:
In [ ]: