This notebook implements simple regression testing for FRETBursts.
This notebook defines a series of test-cases that are executed with the current version of FRETBursts. The result of each test is compared with results saved by a previous commit (compare_commit). By saving the test results (setting save_reference = True), it is possible to use the current commit as a compare_commit in future revisions.
In [ ]:
# If True, save the results of this run as a reference that future
# revisions can compare against.
save_reference = True
In [ ]:
# Short hash of the commit whose saved results are used for comparison.
compare_commit = 'f5faeae' # previous saved test point (or None to bootstrap)
In [ ]:
# Load FRETBursts (and its dependencies) into the notebook namespace.
%run load_fretbursts.py --nogui
In [ ]:
# Folder containing the measurement data and the saved test results.
# NOTE(review): `os` here presumably comes from the load_fretbursts.py
# namespace — it is imported explicitly only in a later cell.
data_dir = os.path.abspath(u'../../data/') + '/'
data_dir
In [ ]:
import os
test_save_dir = ''
if save_reference:
test_save_dir = data_dir + 'test/' + git.get_last_commit() + '/'
if not os.path.exists(test_save_dir):
os.mkdir(test_save_dir)
print 'Saving test results in:', test_save_dir
test_load_dir = ''
if compare_commit is not None:
test_load_dir = data_dir + 'test/' + compare_commit + '/'
if not os.path.exists(test_load_dir):
raise ValueError('Path %s not found, choose a different commit.' % test_load_dir)
print 'Loading test results from:', test_load_dir
In [ ]:
import cPickle as pickle
In [ ]:
# Map new-style Ph_sel photon-selection objects to the old string codes
# used in previously saved reference results.
map_ph_sel = {Ph_sel(Dex='Dem'): 'D', Ph_sel(Dex='Aem'): 'A',
Ph_sel('all'): 'DA', Ph_sel(Aex='Aem'): 'AA'}
# Attributes renamed between the saved reference and the current code:
# current name -> old name found in the saved data.
map_renames = dict(leakage='BT', rate_th='Th', leakage_corrected='bt_corrected', dir_ex='dir')
# Attributes added after the reference was saved: missing in the saved
# data only triggers a warning, not a comparison failure.
new_names = ['bg_fun_name' , 'bg_auto_th', 'bg_ph_sel', 'rate_da', 'bg_da', 'dir_ex', 'dir_ex_corrected']
def compare_data(d1, d2, verbose=True, debug=False, exclude_ph_times=False):
"""Compare two Data() objects for equality (useful for regression test).
"""
if d2 is None:
print ' * WARNING: Saved test not found, skipping comparison.'
return True
equal = True
for key in d1:
if verbose:
print "Testing %s (%s) ... " % (key, type(d1[key])),
if callable(d1[key]):
# Skip function comparison
if verbose: print "skipping (function)"
continue
if exclude_ph_times and key == 'ph_times_m':
if verbose: print "[TEST SKIPPED]"
continue
# Detect variable renames or recent additions
if key not in d2:
if key in map_renames and map_renames[key] in d2:
d2[key] = d2[map_renames[key]]
elif key.endswith('_err') or key.startswith('fit_E_') or \
key.startswith('bg_th_us') or key in new_names:
print "WARNING: Attribute '%s' in d1 is missing in d2" % key
continue
else:
print "ERROR\n * Attribute '%s' in d1 is missing in d2" % key
equal = False
continue
if d1[key] is None:
if not (d2[key] is None):
equal = False
print "ERROR\n * Attribute '%s' is None d1 but %s in d2" % \
(key, d2[key])
elif verbose:
print 'OK (None)'
continue
# Detect new Ph_sel type and compare to old str representation
if type(d1[key]) is Ph_sel and type(d2[key]) is str:
if map_ph_sel[d1[key]] != d2[key]:
#equal = False
print "ERROR\n * Attribute ph_sel does not match: '%s', '%s'" % \
(d1[key], d2[key])
print " >>>> Error amended due to bug in old versions."
continue
if type(d1[key]) is Ph_sel and type(d2[key]) is Ph_sel:
equal = d1[key] == d2[key]
if not equal:
print "ERROR\n * Attribute '%s' do not match: '%s', %s" % \
(d1[key], d2[key])
continue
# Test if the attributes have the same type
if not (type(d1[key]) == type(d2[key])):
equal = False
print "ERROR\n * Attribute '%s' has type %s in d1 but %s in d2" % (key,
type(d1[key]), type(d2[key]))
continue
if np.isscalar(d1[key]):
scalar1, scalar2 = d1[key], d2[key]
if key == 'fname':
scalar1 = os.path.basename(os.path.abspath(scalar1))
scalar2 = os.path.basename(os.path.abspath(scalar2))
if scalar1 != scalar2:
print("ERROR\n d1.{k} and d2.{k} differ (scalar).".format(k=key))
equal = False
elif verbose:
print 'OK (scalar)'
continue
# If the attribute is an empty list
if type(d1[key]) is list and len(d1[key]) == 0:
if not (type(d2[key]) is list and len(d2[key]) == 0):
print "ERROR\n * Attribute '%s' is an empty list in d1 but not in d2"
equal = False
elif verbose:
print 'OK (empty list)'
continue
# If the attribute is a dict
if type(d1[key]) is dict:
dict_comp = []
for sub_key in d1[key]:
if sub_key in d2[key]:
d2_key_subkey = d2[key][sub_key]
else:
if sub_key == Ph_sel(Aex='Dem'):
# Ignore missing key
print " * WARNING: Ignoring missing key %s in d2['%s']" %\
(sub_key, key)
continue
# Try to map new Ph_sel keys to old-style strings
if type(sub_key) is Ph_sel:
ph_map = {Ph_sel(Dex='Dem'): 'D', Ph_sel(Dex='Aem'): 'A',
Ph_sel('all'): 'DA', Ph_sel(Aex='Aem'): 'AA'}
d2_key_subkey = d2[key][ph_map[sub_key]]
else:
print "ERROR\n * The dict '%s' has the key '%s' in d1 but not in d2."
equal = False
continue
if type(d1[key][sub_key]) == np.ndarray:
dict_comp.append(np.allclose(d1[key][sub_key], d2_key_subkey))
else:
dict_comp.append(d1[key][sub_key] == d2[key][sub_key])
equal_dict = np.alltrue(dict_comp)
if not equal_dict:
equal = False
print "ERROR\n * Attribute '%s' (dict) differs between d1 and d2"
elif verbose:
print 'OK (dict)'
continue
# Detect cases of espected changed values
if key == 'bg_th_us_user':
if not d1['ALEX']:
# Test only the first 3 elemenents:
# other elements, if present, are not significant
d1[key] = d1[key][:3]
d2[key] = d2[key][:3]
equal = np.array_equal(d1[key], d2[key])
if not equal:
print "ERROR\n * Attribute '%s' does not match: '%s' vs '%s'" % \
(key, d1[key], d2[key])
continue
# Now the attribute should only be a list of arrays (one per channel)
try:
assert (len(d1[key]) == d1['nch']) and (len(d2[key]) == d2['nch'])
except AssertionError:
equal = False
print "ERROR\n * Attribute '%s' has length %d in d1 and length %d in d2." % \
(key, len(d1[key]), len(d2[key]))
print " They should both be nch (%d)" % d1['nch']
continue
# Test the multi-ch fields (list of arrays)
test_res = np.zeros(d1['nch'], dtype=bool)
for ich, (val1, val2) in enumerate(zip(d1[key], d2[key])):
if type(val1) == type(None):
if debug:
print ('NA1 {} ({}) {}'.format(key, type(val1), val1))
print ('NA2 {} ({}) {}'.format(key, type(val2), val2))
test_res[ich] = (val1 == val2)
else:
if debug:
print ('A1 {} ({}) {}'.format(key, type(val1), val1))
print ('A2 {} ({}) {}'.format(key, type(val2), val2))
test_res[ich] = np.allclose(val1, val2)
if not test_res[ich] and np.isnan(val1).any():
print ' * WARNING: Testing only non-NaN values in %s[%d]' % (key, ich)
test_res[ich] = np.allclose(val1[~np.isnan(val1)], val2[~np.isnan(val1)])
if not test_res.all():
print "ERROR\n d1.%s and d2.%s differ (non-scalar)." % \
(key, key)
print " Test mask: %s " % (test_res,)
equal = False
elif verbose:
print 'OK (multi)'
return equal
In [ ]:
def save_test(name, d, dir_=test_save_dir, exclude_ph_times=True):
print 'Saving test to:', test_save_dir
d_save = dict(d)
# Remove functions
for key in d:
if callable(d_save[key]):
d_save.pop(key)
if exclude_ph_times:
d_save.pop('ph_times_m')
with open(dir_+TEST+'.pickle', 'wb') as f:
pickle.dump(d_save, f, protocol=2)
def load_test(name, dir_=test_load_dir):
print 'Loading test from:', test_load_dir
file_name = dir_ + TEST + '.pickle'
if not os.path.isfile(file_name):
print ' - Saved test not found.'
return None
with open(file_name, 'rb') as f:
d2 = pickle.load(f)
return d2
In [ ]:
# Data file (8-spot measurement) used by all the test-cases below.
fn = "7d_New_150p_320mW_steer_3.dat"
Find the full file name:
In [ ]:
# Full path of the data file
fname = data_dir + fn
fname
Load and process the data:
In [ ]:
TEST = 'test1'
# Test 1: load the data, apply corrections, estimate the background and
# run a standard burst search on all photons.
d = loader.multispot8(fname=fname)
d.add(leakage=0.044, gamma=1.)
d.calc_bg(bg.exp_fit, time_s=20, tail_min_us=200)
d.burst_search_t(L=10, m=10, F=6)
d_test = d
if save_reference: save_test(TEST, d_test)
d_saved = load_test(TEST)
In [ ]:
assert compare_data(d_test, d_saved, verbose=False, exclude_ph_times=True)
In [ ]:
TEST = 'test2'
# Test 2: burst search restricted to the Dex-Dem photon selection.
d.calc_bg(bg.exp_fit, time_s=20, tail_min_us=200)
d.burst_search_t(L=10, m=10, P=None, F=6, ph_sel=Ph_sel(Dex='Dem'))
d_test = d
if save_reference: save_test(TEST, d_test)
d_saved = load_test(TEST)
In [ ]:
assert compare_data(d_test, d_saved, verbose=False, exclude_ph_times=True)
In [ ]:
# Display the photon selection of the current vs the saved data
d_test['ph_sel'], d_saved['ph_sel']
In [ ]:
TEST = 'test3'
# Test 3: burst search on Dex-Aem photons followed by burst fusion.
d.calc_bg(bg.exp_fit, time_s=20, tail_min_us=200)
d.burst_search_t(L=10, m=10, F=6, ph_sel=Ph_sel(Dex='Aem'))
d.fuse_bursts(ms=-1)
d_test = d
if save_reference: save_test(TEST, d_test)
d_saved = load_test(TEST)
In [ ]:
assert compare_data(d_test, d_saved, verbose=False, exclude_ph_times=True)
In [ ]:
TEST = 'test4'
# Test 4: shorter background window (5 s), burst search on all photons,
# then fusion of bursts with ms=1.
d.calc_bg(bg.exp_fit, time_s=5, tail_min_us=200)
d.burst_search_t(L=20, m=10, F=6, ph_sel=Ph_sel('all'))
d.fuse_bursts(ms=1)
d_test = d
if save_reference: save_test(TEST, d_test)
d_saved = load_test(TEST)
In [ ]:
assert compare_data(d_test, d_saved, verbose=False, exclude_ph_times=True)
In [ ]:
TEST = 'test5'
# Test 5: burst selection followed by ML (Poisson) E fitting.
E1_raw = 0.65
gamma = 0.45
d.burst_search_t(L=10, m=10, F=6, ph_sel=Ph_sel(Dex='Aem'))
ds = Sel(d, select_bursts.nda, th1=20, gamma1=gamma)
ds.update_gamma(1.)
ds.fit_E_ML_poiss(E1=E1_raw, method=2)
d_test = ds
if save_reference: save_test(TEST, d_test)
d_saved = load_test(TEST)
In [ ]:
assert compare_data(d_test, d_saved, verbose=False, exclude_ph_times=True)
In [ ]:
TEST = 'test6'
# Test 6: histogram-based Gaussian E fitting with size weights.
ds.fit_E_generic(E1=E1_raw, fit_fun=bl.gaussian_fit_hist, weights='size', gamma=gamma)
d_test = ds
if save_reference: save_test(TEST, d_test)
d_saved = load_test(TEST)
In [ ]:
assert compare_data(d_test, d_saved, verbose=False, debug=False, exclude_ph_times=True)
In [ ]:
TEST = 'test7'
# Test 7: mean-based E fitting with size weights.
ds.fit_E_m(E1=E1_raw, weights='size', gamma=gamma)
d_test = ds
if save_reference: save_test(TEST, d_test)
d_saved = load_test(TEST)
In [ ]:
assert compare_data(d_test, d_saved, verbose=False, exclude_ph_times=True)
In [ ]:
TEST = 'test8'
# Test 8: mean-based E fitting without weights.
ds.fit_E_m(E1=E1_raw, weights=None, gamma=gamma)
d_test = ds
if save_reference: save_test(TEST, d_test)
d_saved = load_test(TEST)
In [ ]:
assert compare_data(d_test, d_saved, verbose=False, exclude_ph_times=True)
In [ ]:
# When reference is d1e37 this fails because TEST9 cannot be saved for that reference
TEST = 'test9'
# Test 9: two-Gaussian EM E fitting after updating gamma.
ds.update_gamma(gamma)
ds.fit_E_two_gauss_EM(weights='size', gamma=gamma)
d_test = ds
if save_reference: save_test(TEST, d_test)
d_saved = load_test(TEST)
In [ ]:
assert compare_data(d_test, d_saved, verbose=False, exclude_ph_times=True)
In [ ]:
TEST = 'test10'
# Test 10: CDF-based Gaussian E fitting.
ds.fit_E_generic(E1=E1_raw, fit_fun=bl.gaussian_fit_cdf)
d_test = ds
if save_reference: save_test(TEST, d_test)
d_saved = load_test(TEST)
In [ ]:
assert compare_data(d_test, d_saved, verbose=False, exclude_ph_times=True)
In [ ]:
TEST = 'test11'
# Test 11: automatic background threshold and burst search computing
# the maximum photon rate (max_rate=True).
d.calc_bg(bg.exp_fit, time_s=20, tail_min_us='auto', F_bg=1.7)
d.burst_search_t(L=10, m=10, F=6, ph_sel=Ph_sel('all'), max_rate=True)
d_test = d
if save_reference: save_test(TEST, d_test)
d_saved = load_test(TEST)
In [ ]:
assert compare_data(d_test, d_saved, verbose=False, exclude_ph_times=True)
In [ ]:
TEST = 'test12'
# Test 12: burst selection by size, then max-rate computation on the
# Dex-Aem photon stream.
d.burst_search_t(L=10, m=10, F=6)
ds = Sel(d, select_bursts.size, th1=30)
print ds.num_bu()
ds.calc_max_rate(m=5, ph_sel=Ph_sel(Dex='Aem'))
d_test = ds
if save_reference: save_test(TEST, d_test)
d_saved = load_test(TEST)
In [ ]:
assert compare_data(d_test, d_saved, verbose=False, exclude_ph_times=True)
In [ ]:
In [ ]:
# Print a summary of the last tested Data object
d_test.stats()
In [ ]:
print 'OK'
In [ ]:
In [ ]: