In [ ]:
%matplotlib inline
In [ ]:
import importlib
In [ ]:
import RooFitMP_analysis
In [ ]:
importlib.reload(RooFitMP_analysis)
In [ ]:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
import glob
from RooFitMP_analysis import *
In [ ]:
df_split_timings_1538069 = build_comb_df_split_timing_info('../rootbench/1538069.burrell.nikhef.nl.out')
dfs_1538069 = load_result_file('../rootbench/1538069.burrell.nikhef.nl/RoofitMPworkspace_1549966927.json', match_y_axes=True)
In [ ]:
df_total_timings_1538069 = dfs_1538069['BM_RooFit_MP_GradMinimizer_workspace_file']
df_meta_1538069 = df_total_timings_1538069.drop(['real_time', 'real or ideal'], axis=1).dropna().set_index('benchmark_number', drop=True)
In [ ]:
df_baseline_timings_1538069 = dfs_1538069['BM_RooFit_RooMinimizer_workspace_file']
vanilla_t, vanilla_t_std = df_baseline_timings_1538069['real_time'].mean() / 1000, df_baseline_timings_1538069['real_time'].std() / 1000
In [ ]:
_df = combine_detailed_with_gbench_timings_by_name(df_total_timings_1538069,
df_split_timings_1538069,
{'update': 'update state',
'gradient': 'gradient work',
'terminate': 'terminate'},
add_ideal=['gradient'])
_g = sns.relplot(data=_df,
x='NumCPU', y='time [s]', style="real or ideal",
hue='timing_type',
markers=True, err_style="bars", legend='full', kind="line")
linestyle = {'color': 'black', 'lw': 0.7}
_g.axes[0,0].axhline(vanilla_t, **linestyle)
_g.axes[0,0].axhline(vanilla_t - vanilla_t_std, alpha=0.5, **linestyle)
_g.axes[0,0].axhline(vanilla_t + vanilla_t_std, alpha=0.5, **linestyle)
In [ ]:
_timing_types = {
'update': 'update state',
'gradient': 'gradient work',
'terminate': 'terminate',
'partial derivatives': 'partial derivative'
}
_df = combine_detailed_with_gbench_timings_by_name(df_total_timings_1538069,
df_split_timings_1538069,
timing_types=_timing_types,
add_ideal=['gradient'],
exclude_from_rest=['partial derivatives'])
_g = sns.relplot(data=_df,
x='NumCPU', y='time [s]', style="real or ideal",
hue='timing_type',
markers=True, err_style="bars", legend='full', kind="line")
linestyle = {'color': 'black', 'lw': 0.7}
_g.axes[0,0].axhline(vanilla_t, **linestyle)
_g.axes[0,0].axhline(vanilla_t - vanilla_t_std, alpha=0.5, **linestyle)
_g.axes[0,0].axhline(vanilla_t + vanilla_t_std, alpha=0.5, **linestyle)
In [ ]:
df_split_timings_1604382 = build_comb_df_split_timing_info('../rootbench/1604382.burrell.nikhef.nl.out')
In [ ]:
dfs_1604382 = load_result_file('../rootbench/1604382.burrell.nikhef.nl/RoofitMPworkspaceNoOptConst_1551699016.json', match_y_axes=True)
In [ ]:
df_total_timings_1604382 = dfs_1604382['BM_RooFit_MP_GradMinimizer_workspace_file_noOptConst']
df_meta_1604382 = df_total_timings_1604382.drop(['real_time', 'real or ideal'], axis=1).dropna().set_index('benchmark_number', drop=True)
df_baseline_timings_1604382 = dfs_1604382['BM_RooFit_RooMinimizer_workspace_file_noOptConst']
vanilla_t, vanilla_t_std = df_baseline_timings_1604382['real_time'].mean() / 1000, df_baseline_timings_1604382['real_time'].std() / 1000
In [ ]:
_timing_types = {
'update': 'update state',
'gradient': 'gradient work',
'terminate': 'terminate',
'partial derivatives': 'partial derivative'
}
_df = combine_detailed_with_gbench_timings_by_name(df_total_timings_1604382,
df_split_timings_1604382,
timing_types=_timing_types,
add_ideal=['gradient'],
exclude_from_rest=['partial derivatives'])
_g = sns.relplot(data=_df,
x='NumCPU', y='time [s]', style="real or ideal",
hue='timing_type',
markers=True, err_style="bars", legend='full', kind="line")
linestyle = {'color': 'black', 'lw': 0.7}
_g.axes[0,0].axhline(vanilla_t, **linestyle)
_g.axes[0,0].axhline(vanilla_t - vanilla_t_std, alpha=0.5, **linestyle)
_g.axes[0,0].axhline(vanilla_t + vanilla_t_std, alpha=0.5, **linestyle)
In [ ]:
df_split_timings_1604381 = build_comb_df_split_timing_info('../rootbench/1604381.burrell.nikhef.nl.out')
In [ ]:
dfs_1604381 = load_result_file('../rootbench/1604381.burrell.nikhef.nl/RoofitMPworkspace_1551694135.json', match_y_axes=True)
Ok, that one failed after 4 runs, but let's see how that went anyway to get a better feeling for it:
In [ ]:
_df = df_split_timings_1604381[df_split_timings_1604381['timing_type'] == 'gradient work']
_x = np.arange(len(_df))
plt.bar(_x, _df['time [s]'])
In [ ]:
_df['time [s]'].describe()
In the next benchmarks, we added more timing output that needs to be incorporated into the analysis, starting with line_search.
As an additional bookkeeping complication, we need to run the large workspaces separately for each NumCPU setting, both to speed up the runs (they can then run on the cluster in parallel) and because we currently get crashes when running everything in one go: with 10 repeats per NumCPU the whole thing simply stops after 4 repeats of the single-worker run, and with 1 repeat it crashes right after the single-worker run (though it does write out the benchmark data to JSON, which is promising for running the tasks separately).
In [ ]:
df_split_timings_1633603 = build_comb_df_split_timing_info('../rootbench/1633603.burrell.nikhef.nl.out')
Yeah, so, we're going to have to make adjustments because of the newly added timings.
Let's paste the necessary functions back in here and adjust them.
In [ ]:
def extract_split_timing_info(fn):
"""
Group lines by benchmark iteration, starting from migrad until
after the forks have been terminated.
"""
with open(fn, 'r') as fh:
lines = fh.read().splitlines()
bm_iterations = []
start_indices = []
end_indices = []
for ix, line in enumerate(lines):
if 'start migrad' in line:
if lines[ix - 1] == 'start migrad': # sometimes 'start migrad' appears twice
start_indices.pop()
start_indices.append(ix)
elif 'terminate' in line:
end_indices.append(ix)
if len(start_indices) != len(end_indices):
raise Exception(f"Number of start and end indices unequal (resp. {len(start_indices)} and {len(end_indices)})!")
for ix in range(len(start_indices)):
bm_iterations.append(lines[start_indices[ix] + 1:end_indices[ix] + 1])
return bm_iterations
def separate_partderi_job_time_lines(lines):
partial_derivatives = []
job_times = []
for line in lines:
# if line[:18] == '[#1] DEBUG: -- job':
if line[:9] == 'worker_id' or line[:24] == '[#0] DEBUG: -- worker_id':
partial_derivatives.append(line)
else:
job_times.append(line)
return partial_derivatives, job_times
def group_timing_lines(bm_iteration_lines):
"""
Group lines (from one benchmark iteration) by gradient call,
specifying:
- Update time
- Gradient work time
- For all partial derivatives a sublist of all lines
Finally, the terminate time for the entire bm_iteration is also
returned (last line in the list).
"""
gradient_calls = []
start_indices = []
end_indices = []
# flag to check whether we are still in the same gradient call
in_block = False
for ix, line in enumerate(bm_iteration_lines[:-1]): # -1: leave out terminate line
if not in_block and (line[:9] == 'worker_id' or line[:24] == '[#0] DEBUG: -- worker_id'):
start_indices.append(ix)
in_block = True
elif line[:12] == 'update_state' or line[:27] == '[#0] DEBUG: -- update_state':
end_indices.append(ix)
in_block = False
if len(start_indices) != len(end_indices):
raise Exception(f"Number of start and end indices unequal (resp. {len(start_indices)} and {len(end_indices)})!")
for ix in range(len(start_indices)):
partial_derivatives, job_times = separate_partderi_job_time_lines(bm_iteration_lines[start_indices[ix]:end_indices[ix]])
gradient_calls.append({
'gradient_total': bm_iteration_lines[end_indices[ix]],
'partial_derivatives': partial_derivatives,
'job_times': job_times
})
try:
terminate_line = bm_iteration_lines[-1]
except IndexError:
terminate_line = None
return gradient_calls, terminate_line
def build_df_split_timing_run(timing_grouped_lines_list, terminate_line):
data = {'time [s]': [], 'timing_type': [], 'worker_id': [], 'task': []}
for gradient_call_timings in timing_grouped_lines_list:
words = gradient_call_timings['gradient_total'].split()
data['time [s]'].append(float(words[4][:-2]))
data['timing_type'].append('update state')
data['worker_id'].append(None)
data['task'].append(None)
data['time [s]'].append(float(words[11][:-1]))
data['timing_type'].append('gradient work')
data['worker_id'].append(None)
data['task'].append(None)
for partial_derivative_line in gradient_call_timings['partial_derivatives']:
words = partial_derivative_line.split()
try:
data['worker_id'].append(words[4][:-1])
data['task'].append(words[6][:-1])
data['time [s]'].append(float(words[10][:-1]))
data['timing_type'].append('partial derivative')
except ValueError as e:
print(words)
raise e
words = terminate_line.split()
data['time [s]'].append(float(words[4][:-1]))
data['timing_type'].append('terminate')
data['worker_id'].append(None)
data['task'].append(None)
return pd.DataFrame(data)
def build_dflist_split_timing_info(fn):
bm_iterations = extract_split_timing_info(fn)
dflist = []
for bm in bm_iterations:
grouped_lines, terminate_line = group_timing_lines(bm)
if terminate_line is not None:
dflist.append(build_df_split_timing_run(grouped_lines, terminate_line))
return dflist
def build_comb_df_split_timing_info(fn):
dflist = build_dflist_split_timing_info(fn)
ix = 0
for df in dflist:
df_pardiff = df[df["timing_type"] == "partial derivative"]
N_tasks = len(df_pardiff["task"].unique())
N_gradients = len(df_pardiff) // N_tasks
        gradient_indices = np.hstack([i * np.ones(N_tasks, dtype='int') for i in range(N_gradients)])
df["gradient number"] = pd.Series(dtype='Int64')
df.loc[df["timing_type"] == "partial derivative", "gradient number"] = gradient_indices
df["benchmark_number"] = ix
ix += 1
return pd.concat(dflist)
def combine_split_total_timings(df_total_timings, df_split_timings,
calculate_rest=True, exclude_from_rest=[],
add_ideal=[]):
df_meta = df_total_timings.drop(['real_time', 'real or ideal'], axis=1).dropna().set_index('benchmark_number', drop=True)
df_all_timings = df_total_timings.rename(columns={'real_time': 'time [s]'})
df_all_timings['time [s]'] /= 1000 # convert to seconds
df_all_timings['timing_type'] = 'total'
df_split_sum = {}
for name, df in df_split_timings.items():
df_split_sum[name] = df.groupby('benchmark_number').sum().join(df_meta, on='benchmark_number').reset_index()
df_split_sum[name]['real or ideal'] = 'real'
if name in add_ideal:
df_split_sum[name] = add_ideal_timings(df_split_sum[name], time_col='time [s]')
df_split_sum[name]['timing_type'] = name
# note: sort sorts the *columns* if they are not aligned, nothing happens with the column data itself
df_all_timings = pd.concat([df_all_timings, ] + list(df_split_sum.values()), sort=True)
if calculate_rest:
rest_time = df_all_timings[(df_all_timings['timing_type'] == 'total') & (df_all_timings['real or ideal'] == 'real')].set_index('benchmark_number')['time [s]']
for name, df in df_split_sum.items():
if name not in exclude_from_rest:
rest_time = rest_time - df.set_index('benchmark_number')['time [s]']
df_rest_time = rest_time.to_frame().join(df_meta, on='benchmark_number').reset_index()
df_rest_time['timing_type'] = 'rest'
df_rest_time['real or ideal'] = 'real'
# note: sort sorts the *columns* if they are not aligned, nothing happens with the column data itself
df_all_timings = df_all_timings.append(df_rest_time, sort=True)
return df_all_timings
def combine_detailed_with_gbench_timings_by_name(df_gbench, df_detailed, timing_types={}, **kwargs):
detailed_selection = {}
if len(timing_types) == 0:
raise Exception("Please give some timing_types, otherwise this function is pointless.")
for name, timing_type in timing_types.items():
detailed_selection[name] = df_detailed[df_detailed['timing_type'] == timing_type].drop('timing_type', axis=1)
return combine_split_total_timings(df_gbench, detailed_selection, **kwargs)
In [ ]:
df_split_timings_1633603 = build_comb_df_split_timing_info('../rootbench/1633603.burrell.nikhef.nl.out')
Ok, that works, now to combine them with the json timings.
In [ ]:
dfs_split_timings_1633594__603 = {}
for i in range(1633594, 1633604):
dfs_split_timings_1633594__603[i] = build_comb_df_split_timing_info(f'../rootbench/{i}.burrell.nikhef.nl.out')
Then the json timings:
In [ ]:
dfs_1633594__603 = {}
for i in range(1633594, 1633604):
json_file = glob.glob(f'../rootbench/{i}.burrell.nikhef.nl/RoofitMPworkspaceNumCPUInConfigFile_*.json')
if len(json_file) == 1:
dfs_1633594__603[i] = RooFitMP_analysis.load_result_file(json_file[0], plot_results=False)
else:
print(json_file)
raise Exception("whups")
In [ ]:
dfs_total_timings_1633594__603 = {}
nums = list(range(1, 9)) + [16, 32]
num_from_run = {}
for ix, run in enumerate(range(1633594, 1633604)):
num_from_run[run] = nums[ix]
for run, df in dfs_1633594__603.items():
dfc = df['BM_RooFit_MP_GradMinimizer_workspace_file_NumCPUInConfigFile'].copy()
dfc['NumCPU'] = num_from_run[run]
dfs_total_timings_1633594__603[run] = dfc
Combine them per run, because otherwise the benchmark_numbers won't match (they only count within a run).
In [ ]:
# df_baseline_timings_1633594__603 = pd.concat([df['BM_RooFit_RooMinimizer_workspace_file_NumCPUInConfigFile'] for df in dfs_1633594__603])
# vanilla_t, vanilla_t_std = df_baseline_timings_1633594__603['real_time'].mean() / 1000, df_baseline_timings_1633594__603['real_time'].std() / 1000
In [ ]:
_timing_types = {
'update': 'update state',
'gradient': 'gradient work',
'terminate': 'terminate',
# 'partial derivatives': 'partial derivative'
}
_comb = {}
for i in range(1633594, 1633604):
_comb[i] = combine_detailed_with_gbench_timings_by_name(dfs_total_timings_1633594__603[i],
dfs_split_timings_1633594__603[i],
timing_types=_timing_types,
add_ideal=['gradient'],
exclude_from_rest=['partial derivatives'])
_df = pd.concat(_comb.values())
_g = sns.relplot(data=_df,
x='NumCPU', y='time [s]', style="real or ideal",
hue='timing_type',
markers=True, err_style="bars", legend='full', kind="line")
_g.fig.set_size_inches(10,6)
_g.axes[0,0].set_xlim((0,33))
# linestyle = {'color': 'black', 'lw': 0.7}
# _g.axes[0,0].axhline(vanilla_t, **linestyle)
# _g.axes[0,0].axhline(vanilla_t - vanilla_t_std, alpha=0.5, **linestyle)
# _g.axes[0,0].axhline(vanilla_t + vanilla_t_std, alpha=0.5, **linestyle)
A rather big rest term now! It's linear though...
Could this be the cost of the extra prints we now introduced all over the place? The update_state prints in particular are numerous.
Only one way to really find out: run again without all those added prints. We're not using them yet anyway.
... But we can do a quick estimate as well. A flush costs about 10 microseconds according to the interwebs. Using the line counts of the output files (each of which contains 3 repeats), the flushing overhead per minimization is:
In [ ]:
{'32': 5309080/3/100000,
'16': 2647773/3/100000,
'8': 1419130/3/100000,
'1': 345466/3/100000}
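The same estimate with the assumed constants spelled out (a sketch; the only inputs are the line counts of the output files, the 3 repeats per file, and the assumed ~10 microseconds per flush):
In [ ]:
flush_cost_s = 10e-6   # assumed cost of a single flush, as estimated above
repeats_per_file = 3
line_counts = {'32': 5309080, '16': 2647773, '8': 1419130, '1': 345466}
{nc: n * flush_cost_s / repeats_per_file for nc, n in line_counts.items()}  # seconds per minimization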
Ok, so that's not the dominant term in the rest term, although it may explain the slight upward trend at the end, though we could hardly call that significant.
Let's look at the line search timings as well. There is one line_search print after each gradient in these runs, which is fine. There is also a whole bunch of other line_search prints at the end of the output file that belong to the RooMinimizer run, so we shouldn't include those. We therefore only modify the function above to take the line_search lines that appear directly below a gradient output block.
Also, we'll add the queue and worker process update_real timings here.
In [ ]:
def group_timing_lines(bm_iteration_lines):
"""
Group lines (from one benchmark iteration) by gradient call,
specifying:
- Update time on master
- update_real times on queue and workers
- Gradient work time
- For all partial derivatives a sublist of all lines
- After each gradient block, there may be line_search times,
these are also included as a sublist
Finally, the terminate time for the entire bm_iteration is also
returned (last line in the list).
"""
gradient_calls = []
start_indices = []
end_indices = []
line_search_lines = []
update_queue = []
update_worker = []
# flag to check whether we are still in the same gradient call
in_block = False
for ix, line in enumerate(bm_iteration_lines[:-1]): # -1: leave out terminate line
if not in_block and (line[:9] == 'worker_id' or line[:24] == '[#0] DEBUG: -- worker_id'):
start_indices.append(ix)
in_block = True
elif 'update_state' in line:
end_indices.append(ix)
in_block = False
# the rest has nothing to do with the gradient call block, so we don't touch in_block there:
elif 'line_search' in line:
line_search_lines.append(line)
elif 'update_real on queue' in line:
update_queue.append(line)
elif 'update_real on worker' in line:
update_worker.append(line)
if len(start_indices) != len(end_indices):
raise Exception(f"Number of start and end indices unequal (resp. {len(start_indices)} and {len(end_indices)})!")
for ix in range(len(start_indices)):
partial_derivatives, job_times = separate_partderi_job_time_lines(bm_iteration_lines[start_indices[ix]:end_indices[ix]])
gradient_calls.append({
'gradient_total': bm_iteration_lines[end_indices[ix]],
'partial_derivatives': partial_derivatives,
'job_times': job_times
})
try:
terminate_line = bm_iteration_lines[-1]
except IndexError:
terminate_line = None
return gradient_calls, line_search_lines, update_queue, update_worker, terminate_line
def build_df_split_timing_run(timing_grouped_lines_list, line_search_lines, update_queue, update_worker, terminate_line):
data = {'time [s]': [], 'timing_type': [], 'worker_id': [], 'task': []}
for gradient_call_timings in timing_grouped_lines_list:
words = gradient_call_timings['gradient_total'].split()
data['time [s]'].append(float(words[4][:-2]))
data['timing_type'].append('update state')
data['worker_id'].append(None)
data['task'].append(None)
data['time [s]'].append(float(words[11][:-1]))
data['timing_type'].append('gradient work')
data['worker_id'].append(None)
data['task'].append(None)
for partial_derivative_line in gradient_call_timings['partial_derivatives']:
words = partial_derivative_line.split()
try:
data['worker_id'].append(words[4][:-1])
data['task'].append(words[6][:-1])
data['time [s]'].append(float(words[10][:-1]))
data['timing_type'].append('partial derivative')
except ValueError as e:
print(words)
raise e
for line_search_line in line_search_lines:
words = line_search_line.split()
data['time [s]'].append(float(words[1][:-1]))
data['timing_type'].append('line search')
data['worker_id'].append(None)
data['task'].append(None)
for line in update_queue:
words = line.split()
data['time [s]'].append(float(words[6][:-1]))
data['timing_type'].append('update_real on queue')
data['worker_id'].append(None)
data['task'].append(None)
for line in update_worker:
words = line.split()
data['time [s]'].append(float(words[7][:-1]))
data['timing_type'].append('update_real on worker')
data['worker_id'].append(int(words[6][:-1]))
data['task'].append(None)
words = terminate_line.split()
data['time [s]'].append(float(words[4][:-1]))
data['timing_type'].append('terminate')
data['worker_id'].append(None)
data['task'].append(None)
return pd.DataFrame(data)
def build_dflist_split_timing_info(fn):
bm_iterations = extract_split_timing_info(fn)
dflist = []
for bm in bm_iterations:
grouped_lines, line_search_lines, update_queue, update_worker, terminate_line = group_timing_lines(bm)
if terminate_line is not None:
dflist.append(build_df_split_timing_run(grouped_lines, line_search_lines, update_queue, update_worker, terminate_line))
return dflist
In [ ]:
dfs_split_timings_1633594__603_v2 = {}
for i in range(1633594, 1633604):
dfs_split_timings_1633594__603_v2[i] = build_comb_df_split_timing_info(f'../rootbench/{i}.burrell.nikhef.nl.out')
In [ ]:
_timing_types = {
'update': 'update state',
'gradient': 'gradient work',
'terminate': 'terminate',
'line search': 'line search',
'update_real on queue': 'update_real on queue',
'update_real on worker': 'update_real on worker'
# 'partial derivatives': 'partial derivative'
}
add_ideal_cols = ['gradient']
_comb = {}
for i in range(1633594, 1633604):
_comb[i] = combine_detailed_with_gbench_timings_by_name(dfs_total_timings_1633594__603[i],
dfs_split_timings_1633594__603_v2[i],
timing_types=_timing_types,
exclude_from_rest=['partial derivatives',
'update_real on queue',
'update_real on worker'])
_df = pd.concat(_comb.values())
for col in add_ideal_cols:
df_ideal = RooFitMP_analysis.add_ideal_timings(_df[_df['timing_type'] == col],
time_col='time [s]', return_ideal=True, ideal_stat='mean')
df_ideal['timing_type'] = col
_df = _df.append(df_ideal, sort=True)
_g = sns.relplot(data=_df,
x='NumCPU', y='time [s]', style="real or ideal",
hue='timing_type',
markers=True, err_style="bars", legend='full', kind="line")
_g.fig.set_size_inches(16,8)
_g.axes[0,0].set_xlim((0,33))
# linestyle = {'color': 'black', 'lw': 0.7}
# _g.axes[0,0].axhline(vanilla_t, **linestyle)
# _g.axes[0,0].axhline(vanilla_t - vanilla_t_std, alpha=0.5, **linestyle)
# _g.axes[0,0].axhline(vanilla_t + vanilla_t_std, alpha=0.5, **linestyle)
Ok, so the line search is not insignificant, but also not the whole story.
The update_real on the queue also grows a bit, but nothing too shocking.
The update_real on the workers is not really useful as a total aggregate, since we're really only interested in the time that each worker individually "wastes", i.e. the time between when the master thinks it's done updating (but then the queue and workers still have to process it) and when the worker starts to do actual work.
Ah, another possibility for the rest term is actually the constant term optimization. We should measure this as well, because if indeed it is so big, it might become interesting to turn it off, or at least set it to 1 instead of 2. The latter option may give a factor 2 slower runs in single core mode, but if it can scale down due to parallelism, it might still become faster in the end!
That still doesn't explain the real vs ideal discrepancy in the gradient timing, though. For that we really need to dive into the absolute timings to find out whether there are significant delays between jobs on the workers.
A simple first tally of some of the output may already tell us something. For instance, let's look at how often workers ask for work when there's nothing left:
for i in {594..603}; do no_work=$(grep "no work" 1633${i}.burrell.nikhef.nl.out | wc -l); job_done=$(grep "job done" 1633${i}.burrell.nikhef.nl.out | wc -l); echo "$i: $job_done / $no_work"; done
The result for these runs is:
594: 34512 / 178
595: 34512 / 586
596: 34512 / 6083
597: 34512 / 33391
598: 34512 / 32171
599: 34512 / 63563
600: 34512 / 87326
601: 34512 / 107505
602: 34512 / 231764
603: 34512 / 684300
That's quite the rise! To see exactly what kind of impact this has, we'd have to dig deeper, though. It may be pretty much harmless, since all it should do is delay the queue loop in processing actually useful worker messages, and since the busy workers are probably not producing useful results every nanosecond, small delays in processing their messages may not matter much.
In [ ]:
def extract_split_timing_info(fn):
"""
Group lines by benchmark iteration, starting from migrad until
after the forks have been terminated.
"""
with open(fn, 'r') as fh:
lines = fh.read().splitlines()
bm_iterations = []
start_indices = []
end_indices = []
for ix, line in enumerate(lines):
if 'start migrad' in line:
if 'start migrad' in lines[ix - 1]: # sometimes 'start migrad' appears twice
start_indices.pop()
start_indices.append(ix)
elif 'terminate: ' in line:
end_indices.append(ix)
if len(start_indices) != len(end_indices):
raise Exception(f"Number of start and end indices unequal (resp. {len(start_indices)} and {len(end_indices)})!")
for ix in range(len(start_indices)):
bm_iterations.append(lines[start_indices[ix]:end_indices[ix] + 1])
return bm_iterations
def group_timing_lines(bm_iteration_lines):
"""
Group lines (from one benchmark iteration) by gradient call,
specifying:
- Update time on master
- update_real times on queue and workers
- Gradient work time
- For all partial derivatives a sublist of all lines
- After each gradient block, there may be line_search times,
these are also included as a sublist
Finally, the terminate time for the entire bm_iteration is also
returned (last line in the list).
In each gradient, also timestamps are printed. These are not
further subdivided in this function, but are output as part of
the `gradient_calls` list for further processing elsewhere.
"""
gradient_calls = []
start_indices = []
end_indices = []
line_search_lines = []
update_queue = []
update_worker = []
# flag to check whether we are still in the same gradient call
in_block = False
for ix, line in enumerate(bm_iteration_lines[:-1]): # -1: leave out terminate line
if not in_block and (line[:9] == 'worker_id' or line[:24] == '[#0] DEBUG: -- worker_id'):
start_indices.append(ix)
in_block = True
elif 'update_state' in line:
end_indices.append(ix)
in_block = False
# the rest has nothing to do with the gradient call block, so we don't touch in_block there:
elif 'line_search' in line:
line_search_lines.append(line)
elif 'update_real on queue' in line:
update_queue.append(line)
elif 'update_real on worker' in line:
update_worker.append(line)
elif 'start migrad' in line:
start_migrad_line = line
elif 'end migrad' in line:
end_migrad_line = line
if len(start_indices) != len(end_indices):
raise Exception(f"Number of start and end indices unequal (resp. {len(start_indices)} and {len(end_indices)})!")
for ix in range(len(start_indices)):
partial_derivatives, timestamps = separate_partderi_job_time_lines(bm_iteration_lines[start_indices[ix]:end_indices[ix]])
gradient_calls.append({
'gradient_total': bm_iteration_lines[end_indices[ix]],
'partial_derivatives': partial_derivatives,
'timestamps': timestamps
})
try:
terminate_line = bm_iteration_lines[-1]
except IndexError:
terminate_line = None
return gradient_calls, line_search_lines, update_queue, update_worker, terminate_line, start_migrad_line, end_migrad_line
def build_df_stamps(grouped_lines, start_migrad_line, end_migrad_line):
data = {'timestamp': [], 'stamp_type': [], 'worker_id': []}
words = start_migrad_line.split()
data['timestamp'].append(int(words[6]))
data['stamp_type'].append('start migrad')
data['worker_id'].append(None)
NumCPU = int(words[10])
words = end_migrad_line.split()
data['timestamp'].append(int(words[6]))
data['stamp_type'].append('end migrad')
data['worker_id'].append(None)
for gradient_group in grouped_lines:
for line in gradient_group['timestamps']:
if 'no work' in line:
words = line.split()
data['worker_id'].append(int(words[6]))
data['timestamp'].append(int(words[9]))
data['stamp_type'].append('no job - asked')
data['worker_id'].append(int(words[6]))
data['timestamp'].append(int(words[14]))
data['stamp_type'].append('no job - denied')
elif 'job done' in line:
words = line.split()
data['worker_id'].append(int(words[6]))
data['timestamp'].append(int(words[9][:-1]))
data['stamp_type'].append('job done - asked')
data['worker_id'].append(int(words[6]))
data['timestamp'].append(int(words[12]))
data['stamp_type'].append('job done - started')
data['worker_id'].append(int(words[6]))
data['timestamp'].append(int(words[16]))
data['stamp_type'].append('job done - finished')
elif 'update_real' in line:
# discard for now
pass
else:
raise Exception("got a weird line:\n" + line)
return pd.DataFrame(data), NumCPU
def build_dflist_split_timing_info(fn, extract_fcn=extract_split_timing_info):
bm_iterations = extract_fcn(fn)
dflist = []
dflist_stamps = []
NumCPU_list = []
for bm in bm_iterations:
grouped_lines, line_search_lines, update_queue, update_worker, terminate_line, start_migrad_line, end_migrad_line = group_timing_lines(bm)
if terminate_line is not None:
dflist.append(build_df_split_timing_run(grouped_lines, line_search_lines, update_queue, update_worker, terminate_line))
df_stamps, NumCPU = build_df_stamps(grouped_lines, start_migrad_line, end_migrad_line)
dflist_stamps.append(df_stamps)
NumCPU_list.append(NumCPU)
return dflist, dflist_stamps, NumCPU_list
def build_comb_df_split_timing_info(fn, extract_fcn=extract_split_timing_info):
dflist, dflist_stamps, NumCPU_list = build_dflist_split_timing_info(fn, extract_fcn=extract_fcn)
for ix, df in enumerate(dflist):
df_pardiff = df[df["timing_type"] == "partial derivative"]
N_tasks = len(df_pardiff["task"].unique())
N_gradients = len(df_pardiff) // N_tasks
        gradient_indices = np.hstack([i * np.ones(N_tasks, dtype='int') for i in range(N_gradients)])
df["gradient number"] = pd.Series(dtype='Int64')
df.loc[df["timing_type"] == "partial derivative", "gradient number"] = gradient_indices
dflist_stamps[ix]["gradient number"] = pd.Series(dtype='Int64')
dflist_stamps[ix].loc[dflist_stamps[ix]["stamp_type"] == "job done - asked", "gradient number"] = gradient_indices
dflist_stamps[ix].loc[dflist_stamps[ix]["stamp_type"] == "job done - started", "gradient number"] = gradient_indices
dflist_stamps[ix].loc[dflist_stamps[ix]["stamp_type"] == "job done - finished", "gradient number"] = gradient_indices
df["benchmark_number"] = ix
dflist_stamps[ix]["benchmark_number"] = ix
# assuming the stamps are ordered properly, which I'm pretty sure is correct,
# we can do ffill:
df_stamps = pd.concat(dflist_stamps)
df_stamps.loc[~df_stamps['stamp_type'].str.contains('migrad'), 'gradient number'] = df_stamps.loc[~df_stamps['stamp_type'].str.contains('migrad'), 'gradient number'].fillna(method='ffill')
return pd.concat(dflist), df_stamps
In [ ]:
df_split_timings_1633594_v3, df_stamps_1633594 = build_comb_df_split_timing_info('../rootbench/1633594.burrell.nikhef.nl.out')
In [ ]:
df_stamps_1633594[df_stamps_1633594['stamp_type'].str.contains('no job')]
In [ ]:
def load_acat19_out(fn):
"""
Just single migrad runs, so no need for further splitting by run.
Does add dummy terminate line, because the other functions expect
this. Also dummy start and end migrad lines.
"""
with open(fn, 'r') as fh:
lines = fh.read().splitlines()
print(lines[-1])
lines = ['[#0] DEBUG: -- start migrad at 0 with NumCPU = 0'] + lines
lines.append('[#0] DEBUG: -- end migrad at 0')
lines.append('[#0] DEBUG: -- terminate: 0.0s')
return [lines]
In [ ]:
fn_acat19_1552415151 = '/Users/pbos/projects/apcocsm/code/acat19_1552415151.out'
df_split_timings_acat19_1552415151, df_stamps_acat19_1552415151 = build_comb_df_split_timing_info(fn_acat19_1552415151,
extract_fcn=load_acat19_out)
In [ ]:
df_split_timings_acat19_1552415151.groupby('timing_type')['time [s]'].sum()
In [ ]:
579.161-536.84-15.586-1.225684
That's good: at least the rest term stays constant instead of scaling with the run. This is important, because in reality this fit will take about an hour, so 25 seconds is acceptable overhead.
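As a quick sanity check on that claim (a sketch; the rest term repeats the arithmetic from the cell above and the one-hour figure is the expected production fit time):
In [ ]:
rest_term_s = 579.161 - 536.84 - 15.586 - 1.225684  # same numbers as the cell above
rest_term_s, rest_term_s / 3600  # rest term and its fraction of a one-hour fit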
In [ ]:
df_split_timings_1633594_v3.groupby(['benchmark_number', 'timing_type'])['time [s]'].sum()
In [ ]:
536.840400/216.742200, 15.586356/5.496111
Indeed, the other terms scale by a factor of about 2.5 to 3, while the rest term remains constant.
In [ ]:
dfs_baseline_timings_1633594__603 = {}
for run, df in dfs_1633594__603.items():
dfc = df['BM_RooFit_RooMinimizer_workspace_file_NumCPUInConfigFile'].copy()
dfc['NumCPU'] = 1
dfs_baseline_timings_1633594__603[run] = dfc
df_baseline_timings_1633594__603 = pd.concat(dfs_baseline_timings_1633594__603.values())
In [ ]:
vanilla_t, vanilla_t_std = df_baseline_timings_1633594__603['real_time'].mean() / 1000, df_baseline_timings_1633594__603['real_time'].std() / 1000
In [ ]:
_timing_types = {
'update': 'update state',
'gradient': 'gradient work',
'terminate': 'terminate',
'line search': 'line search',
# 'update_real on queue': 'update_real on queue',
# 'update_real on worker': 'update_real on worker'
# 'partial derivatives': 'partial derivative'
}
add_ideal_cols = ['gradient']
_comb = {}
for i in range(1633594, 1633604):
_comb[i] = combine_detailed_with_gbench_timings_by_name(dfs_total_timings_1633594__603[i],
dfs_split_timings_1633594__603_v2[i],
timing_types=_timing_types,
exclude_from_rest=['partial derivatives',
'update_real on queue',
'update_real on worker'])
_df = pd.concat(_comb.values())
# for col in add_ideal_cols:
# df_ideal = RooFitMP_analysis.add_ideal_timings(_df[_df['timing_type'] == col],
# time_col='time [s]', return_ideal=True, ideal_stat='mean')
# df_ideal['timing_type'] = col
# _df = _df.append(df_ideal, sort=True)
# _g = sns.relplot(data=_df,
# x='NumCPU', y='time [s]', style="real or ideal",
# hue='timing_type',
# markers=True,
# err_style="bars",
# legend='full', kind="line")
_g = sns.catplot(data=_df,
x='NumCPU', y='time [s]', style="real or ideal",
hue='timing_type',
# markers=True,
# err_style="bars",
legend='full', kind="point")
_g.fig.set_size_inches(16,8)
# _g.axes[0,0].set_xlim((0,33))
linestyle = {'color': 'black', 'lw': 0.7}
_g.axes[0,0].axhline(vanilla_t, **linestyle)
_g.axes[0,0].axhline(vanilla_t - vanilla_t_std, alpha=0.5, **linestyle)
_g.axes[0,0].axhline(vanilla_t + vanilla_t_std, alpha=0.5, **linestyle)
In [ ]:
(dfs_total_timings_1633594__603[1633594]['real_time'] / 1000).mean(), vanilla_t
In [ ]:
fig, ax = plt.subplots(1, 1, sharey=True, figsize=(14, 8))
_timing_types = ['gradient', 'line search', 'update', 'terminate', 'rest']
colors = plt.cm.get_cmap('tab10', 10)
first_nc = True
for nc in _df['NumCPU'].unique():
prev_time = 0
for ix_ttype, ttype in enumerate(_timing_types):
time_mu = _df[(_df['NumCPU'] == nc) & (_df['timing_type'] == ttype)]['time [s]'].mean()
time_std = _df[(_df['NumCPU'] == nc) & (_df['timing_type'] == ttype)]['time [s]'].std()
if first_nc:
label = ttype
else:
label = ""
ax.bar(str(nc), time_mu, bottom=prev_time, color=colors(ix_ttype), label=label)
prev_time += float(time_mu)
first_nc = False
plt.legend()
In [ ]:
_df_speedup = {'NumCPU': [], 'timing_type': [], 'speedup': []}
for ttype in _df['timing_type'].unique():
mean_single_core_time = _df[(_df['NumCPU'] == 1) & (_df['timing_type'] == ttype)]['time [s]'].mean()
for nc in _df['NumCPU'].unique():
mean_time = _df[(_df['NumCPU'] == nc) & (_df['timing_type'] == ttype)]['time [s]'].mean()
_df_speedup['NumCPU'].append(nc)
_df_speedup['timing_type'].append(ttype)
_df_speedup['speedup'].append(mean_single_core_time / mean_time)
_df_speedup = pd.DataFrame(_df_speedup)
In [ ]:
with sns.axes_style('whitegrid'):
_g = sns.catplot(data=_df_speedup, x='NumCPU', y='speedup', hue='timing_type', kind='bar')
_g.fig.set_size_inches(14,8)
In [ ]:
_st = df_stamps_1633594
_st_b0 = _st[_st['benchmark_number'] == 0]
In [ ]:
start_stamps = _st[_st['stamp_type'] == 'start migrad']
start_stamps
# _st[~_st['stamp_type'].str.contains('migrad')]
In [ ]:
end_stamps = _st[_st['stamp_type'] == 'end migrad']
end_stamps
In [ ]:
start_st0 = _st_b0[_st_b0['stamp_type'] == 'start migrad']['timestamp'].iloc[0]
end_st0 = _st_b0[_st_b0['stamp_type'] == 'end migrad']['timestamp'].iloc[0]
In [ ]:
(end_st0 - start_st0)/1.e9
In [ ]:
_st_b0[~_st_b0['stamp_type'].str.contains('migrad')].head()
We can get two kinds of worker-loop overhead out of the "job done" stamps: "explicit" overhead in the time between asked and started, and "implicit" overhead in the time between a finished job and the request for the next one.
In [ ]:
_st_b0_jd = _st_b0[_st_b0['stamp_type'].str.contains('job done')]
In [ ]:
_st_b0_jd_ask = _st_b0_jd[_st_b0_jd['stamp_type'].str.contains('asked')]
_st_b0_jd_sta = _st_b0_jd[_st_b0_jd['stamp_type'].str.contains('started')]
_st_b0_jd_fin = _st_b0_jd[_st_b0_jd['stamp_type'].str.contains('finished')]
Explicit:
In [ ]:
(_st_b0_jd_sta.reset_index()['timestamp'] - _st_b0_jd_ask.reset_index()['timestamp']).sum()/1.e9
Implicit:
In [ ]:
(_st_b0_jd_ask.iloc[1:].reset_index()['timestamp'] - _st_b0_jd_fin.iloc[:-1].reset_index()['timestamp']).sum()/1.e9
Do these, together with the actual partial derivative times, sum up to the total gradient time?
In [ ]:
_df_split = dfs_split_timings_1633594__603_v2[1633594]
_pd_time = _df_split[(_df_split["timing_type"] == "partial derivative")
& (_df_split["benchmark_number"] == 0)]['time [s]'].sum()
_grad_time = _df_split[(_df_split["timing_type"] == "gradient work")
& (_df_split["benchmark_number"] == 0)]['time [s]'].sum()
_grad_time, _pd_time
Oh, it's too much... wait: the gaps between gradients are probably longer than those within a gradient, so the inter-gradient gaps should be filtered out first.
In [ ]:
impl_overhead = 0
for g in _st_b0_jd_ask['gradient number'].unique():
_ask_g = _st_b0_jd_ask[_st_b0_jd_ask['gradient number'] == g]
_fin_g = _st_b0_jd_fin[_st_b0_jd_fin['gradient number'] == g]
impl_overhead += (_ask_g.iloc[1:].reset_index()['timestamp'] - _fin_g.iloc[:-1].reset_index()['timestamp']).sum()/1.e9
impl_overhead
In [ ]:
216.74220000000003 - (0.779148892 + 1.015019575 + 212.982649869)
In [ ]:
_df_split = dfs_split_timings_1633594__603_v2[1633594]
_pdtime = _df_split[_df_split['timing_type'] == 'partial derivative']['time [s]']
# fig, ax = plt.subplots(1, 1, figsize=(14,8))
_g = sns.distplot(_pdtime, kde=False)#, ax=ax)
_g.set_yscale('log')
In [ ]:
sns.reset_orig()
sns.set_context('talk')
sns.set_style('whitegrid')
In [ ]:
_df = combine_detailed_with_gbench_timings_by_name(df_total_timings_1538069,
df_split_timings_1538069,
{'update': 'update state',
'gradient': 'gradient work',
'terminate': 'terminate'})
fig, ax = plt.subplots(1, 1, sharey=True, figsize=(14, 8), dpi=200)
_timing_types = ['gradient', 'update', 'terminate', 'rest']
colors = plt.cm.get_cmap('tab10', 10)
lighten = lambda c: min(0.3*c, 1)
first_nc = True
for nc in _df['NumCPU'].unique():
prev_time = 0
for ix_ttype, ttype in enumerate(_timing_types):
time_mu = _df[(_df['NumCPU'] == nc) & (_df['timing_type'] == ttype)]['time [s]'].mean()
time_std = _df[(_df['NumCPU'] == nc) & (_df['timing_type'] == ttype)]['time [s]'].std()
if first_nc:
label = ttype
else:
label = ""
color = colors(ix_ttype)
ecolor = (lighten(color[0]), lighten(color[1]), lighten(color[2]), 0.8)
ax.bar(str(nc), time_mu, bottom=prev_time, color=color, label=label,
yerr=time_std, ecolor=ecolor, capsize=5)
prev_time += float(time_mu)
first_nc = False
plt.legend()
ax.set_xlabel('workers')
ax.set_ylabel('time [s]')
_vanilla_t, _vanilla_t_std = df_baseline_timings_1538069['real_time'].mean() / 1000, df_baseline_timings_1538069['real_time'].std() / 1000
linestyle = {'color': 'black', 'lw': 0.7}
ax.axhline(_vanilla_t, **linestyle)
ax.axhline(_vanilla_t - _vanilla_t_std, alpha=0.5, **linestyle)
ax.axhline(_vanilla_t + _vanilla_t_std, alpha=0.5, **linestyle)
In [ ]:
_df_speedup = {'NumCPU': [], 'timing type': [], 'mean speedup': []}
ttypes = _df['timing_type'].unique()
for ttype in ttypes:
mean_single_core_time = _df[(_df['NumCPU'] == 1) & (_df['timing_type'] == ttype)]['time [s]'].mean()
for nc in _df['NumCPU'].unique():
mean_time = _df[(_df['NumCPU'] == nc) & (_df['timing_type'] == ttype)]['time [s]'].mean()
_df_speedup['NumCPU'].append(nc)
_df_speedup['timing type'].append(ttype)
_df_speedup['mean speedup'].append(mean_single_core_time / mean_time)
_df_speedup = pd.DataFrame(_df_speedup)
_g = sns.catplot(data=_df_speedup, x='NumCPU', y='mean speedup', hue='timing type', kind='point', palette='tab10', hue_order=_timing_types + ['total'])
_g.fig.set_size_inches(11,8)
_g.fig.set_dpi(200)
_g.axes[0,0].set_xlabel('workers')
In [ ]:
_timing_types = {
'update': 'update state',
'gradient': 'gradient work',
'terminate': 'terminate',
'line search': 'line search',
}
add_ideal_cols = ['gradient']
_comb = {}
for i in range(1633594, 1633604):
_comb[i] = combine_detailed_with_gbench_timings_by_name(dfs_total_timings_1633594__603[i],
dfs_split_timings_1633594__603_v2[i],
timing_types=_timing_types,
exclude_from_rest=['partial derivatives',
'update_real on queue',
'update_real on worker'])
_df = pd.concat(_comb.values())
fig, ax = plt.subplots(1, 1, sharey=True, figsize=(14, 8), dpi=200)
_timing_types = ['gradient', 'line search', 'update', 'terminate', 'rest']
colors = plt.cm.get_cmap('tab10', 10)
lighten = lambda c: min(0.3*c, 1)
first_nc = True
for nc in _df['NumCPU'].unique():
prev_time = 0
for ix_ttype, ttype in enumerate(_timing_types):
time_mu = _df[(_df['NumCPU'] == nc) & (_df['timing_type'] == ttype)]['time [s]'].mean()
time_std = _df[(_df['NumCPU'] == nc) & (_df['timing_type'] == ttype)]['time [s]'].std()
if first_nc:
label = ttype
else:
label = ""
color = colors(ix_ttype)
ecolor = (lighten(color[0]), lighten(color[1]), lighten(color[2]), 0.8)
ax.bar(str(nc), time_mu, bottom=prev_time, color=color, label=label,
yerr=time_std, ecolor=ecolor, capsize=5)
prev_time += float(time_mu)
first_nc = False
plt.legend()
ax.set_xlabel('workers')
ax.set_ylabel('time [s]')
# _vanilla_t, _vanilla_t_std = df_baseline_timings_1633594__603['real_time'].mean() / 1000, df_baseline_timings_1633594__603['real_time'].std() / 1000
# linestyle = {'color': 'black', 'lw': 0.7}
# ax.axhline(_vanilla_t, **linestyle)
# ax.axhline(_vanilla_t - _vanilla_t_std, alpha=0.5, **linestyle)
# ax.axhline(_vanilla_t + _vanilla_t_std, alpha=0.5, **linestyle)
In [ ]:
_df_speedup = {'NumCPU': [], 'timing type': [], 'speedup': []}
ttypes = _df['timing_type'].unique()
for ttype in ttypes:
mean_single_core_time = _df[(_df['NumCPU'] == 1) & (_df['timing_type'] == ttype)]['time [s]'].mean()
for nc in _df['NumCPU'].unique():
# mean_time = _df[(_df['NumCPU'] == nc) & (_df['timing_type'] == ttype)]['time [s]'].mean()
times = _df[(_df['NumCPU'] == nc) & (_df['timing_type'] == ttype)]['time [s]']
for time in times:
_df_speedup['NumCPU'].append(nc)
_df_speedup['timing type'].append(ttype)
_df_speedup['speedup'].append(mean_single_core_time / float(time))
_df_speedup = pd.DataFrame(_df_speedup)
_g = sns.catplot(data=_df_speedup, x='NumCPU', y='speedup', hue='timing type',
kind='point', palette='tab10', hue_order=_timing_types + ['total'])
_g.fig.set_size_inches(11,8)
_g.fig.set_dpi(200)
_g.axes[0,0].set_xlabel('workers')
Extrapolate total time to longer run times: 10 minutes, 1 hour, 2 hours.
In [ ]:
# in the actual run
total_runtime = _df[(_df['NumCPU'] == 1) & (_df['timing_type'] == 'total')]['time [s]'].mean() # seconds
gradient = _df[(_df['NumCPU'] == 1) & (_df['timing_type'] == 'gradient')]['time [s]'].mean()
total_overhead = total_runtime - gradient
_df_total_times = _df[(_df['timing_type'] == 'total')].copy()
_df_total_times['timing_type'] = 'total (~4min)'
# extrapolate gradient timings
_df_extrapolate_10min = {'NumCPU': [], 'timing_type': [], 'time [s]': []}
extrap_factor_10min = (10 * 60 - total_overhead) / gradient
for nc in _df['NumCPU'].unique():
times = _df[(_df['NumCPU'] == nc) & (_df['timing_type'] == 'gradient')]['time [s]']
for time in times:
_df_extrapolate_10min['NumCPU'].append(nc)
_df_extrapolate_10min['timing_type'].append('total (10min)')
_df_extrapolate_10min['time [s]'].append(float(time) * extrap_factor_10min + total_overhead)
_df_extrapolate_10min = pd.DataFrame(_df_extrapolate_10min)
_df_extrapolate_1h = {'NumCPU': [], 'timing_type': [], 'time [s]': []}
extrap_factor_1h = (60 * 60 - total_overhead) / gradient
for nc in _df['NumCPU'].unique():
times = _df[(_df['NumCPU'] == nc) & (_df['timing_type'] == 'gradient')]['time [s]']
for time in times:
_df_extrapolate_1h['NumCPU'].append(nc)
_df_extrapolate_1h['timing_type'].append('total (1h)')
_df_extrapolate_1h['time [s]'].append(float(time) * extrap_factor_1h + total_overhead)
_df_extrapolate_1h = pd.DataFrame(_df_extrapolate_1h)
_df_extrapolate_2h = {'NumCPU': [], 'timing_type': [], 'time [s]': []}
extrap_factor_2h = (2 * 60 * 60 - total_overhead) / gradient
for nc in _df['NumCPU'].unique():
times = _df[(_df['NumCPU'] == nc) & (_df['timing_type'] == 'gradient')]['time [s]']
for time in times:
_df_extrapolate_2h['NumCPU'].append(nc)
_df_extrapolate_2h['timing_type'].append('total (2h)')
_df_extrapolate_2h['time [s]'].append(float(time) * extrap_factor_2h + total_overhead)
_df_extrapolate_2h = pd.DataFrame(_df_extrapolate_2h)
In [ ]:
_df_extr_speedup = {'NumCPU': [], 'timing type': [], 'speedup': []}
for extr_time, _df_extr in {'true run': _df_total_times,
'10min': _df_extrapolate_10min,
'1h': _df_extrapolate_1h,
'2h': _df_extrapolate_2h,
}.items():
ttypes = _df_extr['timing_type'].unique()
for ttype in ttypes:
mean_single_core_time = _df_extr[(_df_extr['NumCPU'] == 1) & (_df_extr['timing_type'] == ttype)]['time [s]'].mean()
for nc in _df_extr['NumCPU'].unique():
times = _df_extr[(_df_extr['NumCPU'] == nc) & (_df_extr['timing_type'] == ttype)]['time [s]']
for time in times:
_df_extr_speedup['NumCPU'].append(nc)
_df_extr_speedup['timing type'].append(ttype)
_df_extr_speedup['speedup'].append(mean_single_core_time / float(time))
_df_extr_speedup = pd.DataFrame(_df_extr_speedup)
_g = sns.catplot(data=_df_extr_speedup, x='NumCPU', y='speedup', hue='timing type',
kind='point', palette='tab10')
_g.fig.set_size_inches(11,8)
_g.fig.set_dpi(200)
_g.axes[0,0].set_xlabel('workers')
In [ ]:
_st = df_stamps_1633594
Let's try to just plot the timeline to visualize the load on the worker during the whole run.
In [ ]:
_sta = _st_b0_jd_sta['timestamp'] - start_st0
_fin = _st_b0_jd_fin['timestamp'] - start_st0
fig, ax = plt.subplots(1, 1, figsize=(20, 3))
for ix in range(len(_sta)):
# for ix in range(2000):
ax.barh('worker 0', _fin.iloc[ix] - _sta.iloc[ix], left=_sta.iloc[ix],
color='red', linewidth=0)
ax.set_xlim((0, end_st0 - start_st0))
In [ ]:
len(_fin), len(_sta)
Goddamn, that takes forever to complete. It seems matplotlib cannot handle that many bars at the same time.
But ok, no shockers here.
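Should we need this plot again, a faster alternative would be to hand matplotlib all intervals in a single call instead of one barh call per task (a sketch using broken_barh; _sta, _fin, start_st0 and end_st0 are the variables defined above, assumed to have equal lengths):
In [ ]:
# draw all task intervals for worker 0 as one collection instead of many bars
fig, ax = plt.subplots(1, 1, figsize=(20, 3))
xranges = list(zip(_sta.values, _fin.values - _sta.values))  # (left, width) per task
ax.broken_barh(xranges, (0, 1), facecolors='red', linewidth=0)
ax.set_yticks([0.5])
ax.set_yticklabels(['worker 0'])
ax.set_xlim((0, end_st0 - start_st0))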
Now for a multi-worker run:
In [ ]:
_, df_stamps_1633603 = build_comb_df_split_timing_info('../rootbench/1633603.burrell.nikhef.nl.out')
In [ ]:
_, df_stamps_1633602 = build_comb_df_split_timing_info('../rootbench/1633602.burrell.nikhef.nl.out')
In [ ]:
_, df_stamps_1633601 = build_comb_df_split_timing_info('../rootbench/1633601.burrell.nikhef.nl.out')
In [ ]:
_bench_nr = (df_stamps_1633601['benchmark_number'] == 0)
_start_st = df_stamps_1633601[_bench_nr
& (df_stamps_1633601['stamp_type'] == 'start migrad')
]['timestamp'].iloc[0]
_end_st = df_stamps_1633601[_bench_nr
& (df_stamps_1633601['stamp_type'] == 'end migrad')
]['timestamp'].iloc[0]
_sta = df_stamps_1633601[_bench_nr
& (df_stamps_1633601['stamp_type'].str.contains('started'))
]#['timestamp'] - _start_st
_fin = df_stamps_1633601[_bench_nr
& (df_stamps_1633601['stamp_type'].str.contains('finished'))
]#['timestamp'] - _start_st
In [ ]:
len(_sta), len(_fin), _end_st, _start_st
In [ ]:
fig, ax = plt.subplots(1, 1, figsize=(20, 5))
ax.set_ylabel('worker')
ax.set_xlim((0, _end_st - _start_st))
batches = int(len(_sta)/1000) + 1  # get around matplotlib limits?
for batch in range(batches):
    for ix in range(batch * 1000, min((batch + 1) * 1000, len(_sta))):
worker = _fin.iloc[ix]["worker_id"]
ax.barh(worker,
_fin.iloc[ix]["timestamp"] - _sta.iloc[ix]["timestamp"],
left=_sta.iloc[ix]["timestamp"] - _start_st,
color='red', linewidth=0)
In [ ]:
import matplotlib as mpl
mpl.use('qt5Agg')
%matplotlib inline
In [ ]:
batch_size = 2000
batches = int(len(_sta)/batch_size) + 1  # get around matplotlib limits?
for batch in range(batches):
fig, ax = plt.subplots(1, 1, figsize=(20, 5))
ax.set_ylabel('worker')
ax.set_xlim((0, _end_st - _start_st))
for ix in range(min(batch * batch_size, len(_sta)),
min((batch + 1) * batch_size, len(_sta))):
worker = _fin.iloc[ix]["worker_id"]
ax.barh(worker,
_fin.iloc[ix]["timestamp"] - _sta.iloc[ix]["timestamp"],
left=_sta.iloc[ix]["timestamp"] - _start_st,
color='red', linewidth=0)
In [ ]:
for grad_nr in df_stamps_1633601['gradient number'].unique():
_grad_nr = (df_stamps_1633601['gradient number'] == grad_nr)
_sta = df_stamps_1633601[_bench_nr & _grad_nr
& (df_stamps_1633601['stamp_type'].str.contains('started'))
]#['timestamp'] - _start_st
_fin = df_stamps_1633601[_bench_nr & _grad_nr
& (df_stamps_1633601['stamp_type'].str.contains('finished'))
]#['timestamp'] - _start_st
fig, ax = plt.subplots(1, 1, figsize=(20, 5))
ax.set_ylabel('worker')
# ax.set_xlim((0, _end_st - _start_st))
for ix in range(len(_sta)):
worker = _fin.iloc[ix]["worker_id"]
ax.barh(worker,
_fin.iloc[ix]["timestamp"] - _sta.iloc[ix]["timestamp"],
left=_sta.iloc[ix]["timestamp"] - _start_st,
color='red', linewidth=0)
In [ ]:
_st = df_stamps_1633601
_st_b0 = _st[_st['benchmark_number'] == 0]
_jd = _st_b0['stamp_type'].str.contains('job done')
_st_b0_jd_ask = _st_b0[_jd & _st_b0['stamp_type'].str.contains('asked')]
_st_b0_jd_sta = _st_b0[_jd & _st_b0['stamp_type'].str.contains('started')]
_st_b0_jd_fin = _st_b0[_jd & _st_b0['stamp_type'].str.contains('finished')]
explicit_delay = 0
implicit_delay = 0
for g in _st_b0['gradient number'].unique():
for w in _st_b0['worker_id'].unique():
_sta_g = _st_b0_jd_sta[(_st_b0_jd_sta['gradient number'] == g) & (_st_b0_jd_sta['worker_id'] == w)]
_ask_g = _st_b0_jd_ask[(_st_b0_jd_ask['gradient number'] == g) & (_st_b0_jd_ask['worker_id'] == w)]
_fin_g = _st_b0_jd_fin[(_st_b0_jd_fin['gradient number'] == g) & (_st_b0_jd_fin['worker_id'] == w)]
explicit_delay += (_sta_g.reset_index()['timestamp'] - _ask_g.reset_index()['timestamp']).sum()/1.e9
implicit_delay += (_ask_g.iloc[1:].reset_index()['timestamp'] - _fin_g.iloc[:-1].reset_index()['timestamp']).sum()/1.e9
explicit_delay, implicit_delay
In [ ]:
_st = df_stamps_1633602
_st_b0 = _st[_st['benchmark_number'] == 0]
_jd = _st_b0['stamp_type'].str.contains('job done')
_st_b0_jd_ask = _st_b0[_jd & _st_b0['stamp_type'].str.contains('asked')]
_st_b0_jd_sta = _st_b0[_jd & _st_b0['stamp_type'].str.contains('started')]
_st_b0_jd_fin = _st_b0[_jd & _st_b0['stamp_type'].str.contains('finished')]
explicit_delay = 0
implicit_delay = 0
for g in _st_b0['gradient number'].unique():
for w in _st_b0['worker_id'].unique():
_sta_g = _st_b0_jd_sta[(_st_b0_jd_sta['gradient number'] == g) & (_st_b0_jd_sta['worker_id'] == w)]
_ask_g = _st_b0_jd_ask[(_st_b0_jd_ask['gradient number'] == g) & (_st_b0_jd_ask['worker_id'] == w)]
_fin_g = _st_b0_jd_fin[(_st_b0_jd_fin['gradient number'] == g) & (_st_b0_jd_fin['worker_id'] == w)]
explicit_delay += (_sta_g.reset_index()['timestamp'] - _ask_g.reset_index()['timestamp']).sum()/1.e9
implicit_delay += (_ask_g.iloc[1:].reset_index()['timestamp'] - _fin_g.iloc[:-1].reset_index()['timestamp']).sum()/1.e9
explicit_delay, implicit_delay
In [ ]:
_st = df_stamps_1633603
_st_b0 = _st[_st['benchmark_number'] == 0]
_jd = _st_b0['stamp_type'].str.contains('job done')
_st_b0_jd_ask = _st_b0[_jd & _st_b0['stamp_type'].str.contains('asked')]
_st_b0_jd_sta = _st_b0[_jd & _st_b0['stamp_type'].str.contains('started')]
_st_b0_jd_fin = _st_b0[_jd & _st_b0['stamp_type'].str.contains('finished')]
explicit_delay = 0
implicit_delay = 0
for g in _st_b0['gradient number'].unique():
for w in _st_b0['worker_id'].unique():
_sta_g = _st_b0_jd_sta[(_st_b0_jd_sta['gradient number'] == g) & (_st_b0_jd_sta['worker_id'] == w)]
_ask_g = _st_b0_jd_ask[(_st_b0_jd_ask['gradient number'] == g) & (_st_b0_jd_ask['worker_id'] == w)]
_fin_g = _st_b0_jd_fin[(_st_b0_jd_fin['gradient number'] == g) & (_st_b0_jd_fin['worker_id'] == w)]
explicit_delay += (_sta_g.reset_index()['timestamp'] - _ask_g.reset_index()['timestamp']).sum()/1.e9
implicit_delay += (_ask_g.iloc[1:].reset_index()['timestamp'] - _fin_g.iloc[:-1].reset_index()['timestamp']).sum()/1.e9
explicit_delay, implicit_delay
None of that is very spectacular, except for the 32-worker run, but that might also be due to actually having 34 processes to run (including the queue and master).
Let's now sum up the delays caused by having to wait for one of the workers at the end, plus the corresponding effect at the beginning (the latter is a lot smaller; the workers start more synchronously than they finish).
There are several ways you can measure the impact of these worker idle times.
In [ ]:
_st = df_stamps_1633601
for b in _st['benchmark_number'].dropna().unique():
_st_b0 = _st[_st['benchmark_number'] == b]
_jd = _st_b0['stamp_type'].str.contains('job done')
_st_b0_jd_ask = _st_b0[_jd & _st_b0['stamp_type'].str.contains('asked')]
_st_b0_jd_sta = _st_b0[_jd & _st_b0['stamp_type'].str.contains('started')]
_st_b0_jd_fin = _st_b0[_jd & _st_b0['stamp_type'].str.contains('finished')]
start_delay = 0
finish_delay = 0
for g in _st_b0['gradient number'].dropna().unique():
min_start = _st_b0_jd_sta[(_st_b0_jd_sta['gradient number'] == g)]['timestamp'].min()
max_finish = _st_b0_jd_fin[(_st_b0_jd_fin['gradient number'] == g)]['timestamp'].max()
for w in _st_b0['worker_id'].dropna().unique():
start_delay += (_st_b0_jd_sta[(_st_b0_jd_sta['gradient number'] == g)
& (_st_b0_jd_sta['worker_id'] == w)
]['timestamp'].min() - min_start) / 1.e9
finish_delay += (max_finish - _st_b0_jd_fin[(_st_b0_jd_fin['gradient number'] == g)
& (_st_b0_jd_fin['worker_id'] == w)
]['timestamp'].max()) / 1.e9
print('benchmark', b, 'with', len(_st_b0['worker_id'].dropna().unique()), 'workers')
print('start delay: ', start_delay)
print('finish delay:', finish_delay)
print('total delay: ', start_delay + finish_delay)
print('time lost due to imperfect load balancing: ', (start_delay + finish_delay) / len(_st_b0['worker_id'].dropna().unique()))
Ok, so that is negligible.
Suggestion by Wouter: compare the partial derivative wall time with the CPU time. This way we can find out whether something is really being delayed in the calculation itself, or whether we should look elsewhere.
Did another acat19.cpp run including CPU timing, now with 8 workers and without the mu = 1.5 line.
Have to again modify the functions for this.
In [ ]:
def build_df_split_timing_run(timing_grouped_lines_list, line_search_lines, update_queue, update_worker, terminate_line):
data = {'time [s]': [], 'timing_type': [], 'worker_id': [], 'task': []}
for gradient_call_timings in timing_grouped_lines_list:
words = gradient_call_timings['gradient_total'].split()
data['time [s]'].append(float(words[4][:-2]))
data['timing_type'].append('update state')
data['worker_id'].append(None)
data['task'].append(None)
data['time [s]'].append(float(words[11][:-1]))
data['timing_type'].append('gradient work')
data['worker_id'].append(None)
data['task'].append(None)
for partial_derivative_line in gradient_call_timings['partial_derivatives']:
words = partial_derivative_line.split()
try:
data['worker_id'].append(words[4][:-1])
data['task'].append(words[6][:-1])
data['time [s]'].append(float(words[10][:-1]))
data['timing_type'].append('partial derivative')
if len(words) > 13:
data['worker_id'].append(words[4][:-1])
data['task'].append(words[6][:-1])
data['time [s]'].append(float(words[13][:-1]))
data['timing_type'].append('partial derivative CPU time')
except ValueError as e:
print(words)
raise e
for line_search_line in line_search_lines:
words = line_search_line.split()
data['time [s]'].append(float(words[1][:-1]))
data['timing_type'].append('line search')
data['worker_id'].append(None)
data['task'].append(None)
for line in update_queue:
words = line.split()
data['time [s]'].append(float(words[6][:-1]))
data['timing_type'].append('update_real on queue')
data['worker_id'].append(None)
data['task'].append(None)
for line in update_worker:
words = line.split()
data['time [s]'].append(float(words[7][:-1]))
data['timing_type'].append('update_real on worker')
data['worker_id'].append(int(words[6][:-1]))
data['task'].append(None)
words = terminate_line.split()
data['time [s]'].append(float(words[4][:-1]))
data['timing_type'].append('terminate')
data['worker_id'].append(None)
data['task'].append(None)
return pd.DataFrame(data)
In [ ]:
fn_acat19_1552664372 = '/Users/pbos/projects/apcocsm/code/acat19_1552664372.out'
df_split_timings_acat19_1552664372, df_stamps_acat19_1552664372 = build_comb_df_split_timing_info(fn_acat19_1552664372,
extract_fcn=load_acat19_out)
In [ ]:
df_split_timings_acat19_1552664372[df_split_timings_acat19_1552664372['timing_type'] == 'partial derivative']['time [s]'].sum(),\
df_split_timings_acat19_1552664372[df_split_timings_acat19_1552664372['timing_type'] == 'partial derivative CPU time']['time [s]'].sum()
Those are almost completely equal, which is good news: we don't have to look anywhere else, like in scheduling or something. There is really something strange going on in the partial derivatives themselves that makes their total time grow.
Note that these numbers are not comparable in an absolute sense to the earlier big fit runs, because those were run on the scheduled nodes, while this was run on the stoomboot headnode! Indeed, that seems to have 2400 MHz cores, while google benchmark reports 1996 MHz (weird, but ok) for the compute nodes.
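A quick ratio of those clock speeds gives a rough idea of the scale of that difference (only an indication, since runtime does not scale purely with clock frequency):
In [ ]:
2400 / 1996  # headnode vs compute-node clock ratio, roughly 20%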
We may be onto the culprit. Put timing around the SetInitialGradient function...
Shit, nope. See the acat19_1552671011.out file. Three times ~0.0003 seconds. That's nothing. In hindsight, it makes sense: SetInitialGradient never calls the function (the likelihood), so it's not at all expensive.
Wouter suspected that somehow Simplex gets run somewhere before Migrad, but I haven't been able to find it anywhere... at least not starting from RooMinimizer::migrad. Maybe it is somewhere else, perhaps somewhere in the Migrad/Minuit setup.
We could time the Migrad Seed creation to be sure...
But maybe it's easier to just throw in some timestamps, then at least we can pin down where in migrad things are happening.
In [ ]:
def group_timing_lines(bm_iteration_lines):
"""
Group lines (from one benchmark iteration) by gradient call,
specifying:
- Update time on master
- update_real times on queue and workers
- Gradient work time
- For all partial derivatives a sublist of all lines
- After each gradient block, there may be line_search times,
these are also included as a sublist
Finally, the terminate time for the entire bm_iteration is also
returned (last line in the list).
In each gradient, also timestamps are printed. These are not
further subdivided in this function, but are output as part of
the `gradient_calls` list for further processing elsewhere.
"""
gradient_calls = []
start_indices = []
end_indices = []
line_search_lines = []
update_queue = []
update_worker = []
# flag to check whether we are still in the same gradient call
in_block = False
for ix, line in enumerate(bm_iteration_lines[:-1]): # -1: leave out terminate line
if not in_block and (line[:9] == 'worker_id' or line[:24] == '[#0] DEBUG: -- worker_id'):
start_indices.append(ix)
in_block = True
elif 'update_state' in line:
end_indices.append(ix)
in_block = False
# the rest has nothing to do with the gradient call block, so we don't touch in_block there:
elif 'line_search' in line:
line_search_lines.append(line)
elif 'update_real on queue' in line:
update_queue.append(line)
elif 'update_real on worker' in line:
update_worker.append(line)
elif 'start migrad' in line:
start_migrad_line = line
elif 'end migrad' in line:
end_migrad_line = line
if len(start_indices) != len(end_indices):
raise Exception(f"Number of start and end indices unequal (resp. {len(start_indices)} and {len(end_indices)})!")
for ix in range(len(start_indices)):
partial_derivatives, timestamps = separate_partderi_job_time_lines(bm_iteration_lines[start_indices[ix]:end_indices[ix]])
gradient_calls.append({
'gradient_total': bm_iteration_lines[end_indices[ix]],
'partial_derivatives': partial_derivatives,
'timestamps': timestamps
})
try:
terminate_line = bm_iteration_lines[-1]
except IndexError:
terminate_line = None
special_lines = dict(terminate_line=terminate_line,
start_migrad_line=start_migrad_line,
end_migrad_line=end_migrad_line)
return gradient_calls, line_search_lines, update_queue, update_worker, special_lines
def build_df_stamps(grouped_lines, special_lines, update_queue, update_worker):
data = {'timestamp': [], 'stamp_type': [], 'worker_id': []}
words = special_lines["start_migrad_line"].split()
shift = 3 if '[#' in words[0] else 0
data['timestamp'].append(int(words[3 + shift]))
data['stamp_type'].append('start migrad')
data['worker_id'].append(None)
if len(words) > 10:
NumCPU = int(words[10])
else:
NumCPU = 0
words = special_lines["end_migrad_line"].split()
shift = 3 if '[#' in words[0] else 0
data['timestamp'].append(int(words[3 + shift]))
data['stamp_type'].append('end migrad')
data['worker_id'].append(None)
for gradient_group in grouped_lines:
for line in gradient_group['timestamps']:
if 'no work' in line:
words = line.split()
shift = 3 if '[#' in words[0] else 0
data['worker_id'].append(int(words[3 + shift]))
data['timestamp'].append(int(words[6 + shift]))
data['stamp_type'].append('no job - asked')
data['worker_id'].append(int(words[3 + shift]))
data['timestamp'].append(int(words[11 + shift]))
data['stamp_type'].append('no job - denied')
elif 'job done' in line:
words = line.split()
shift = 3 if '[#' in words[0] else 0
data['worker_id'].append(int(words[3 + shift]))
data['timestamp'].append(int(words[6 + shift][:-1]))
data['stamp_type'].append('job done - asked')
data['worker_id'].append(int(words[3 + shift]))
data['timestamp'].append(int(words[9 + shift]))
data['stamp_type'].append('job done - started')
data['worker_id'].append(int(words[3 + shift]))
data['timestamp'].append(int(words[13 + shift]))
data['stamp_type'].append('job done - finished')
else:
raise Exception("got a weird line:\n" + line)
for line in update_queue:
words = line.split()
shift = 3 if '[#' in words[0] else 0
data['worker_id'].append(None)
data['timestamp'].append(int(words[5 + shift]))
data['stamp_type'].append('update_real queue start')
data['worker_id'].append(None)
data['timestamp'].append(int(words[7 + shift][:-3]))
data['stamp_type'].append('update_real queue end')
for line in update_worker:
words = line.split()
shift = 3 if '[#' in words[0] else 0
data['worker_id'].append(int(words[3 + shift][:-1]))
data['timestamp'].append(int(words[6 + shift]))
data['stamp_type'].append('update_real worker start')
data['worker_id'].append(int(words[3 + shift][:-1]))
data['timestamp'].append(int(words[8 + shift][:-3]))
data['stamp_type'].append('update_real worker end')
return pd.DataFrame(data), NumCPU
def build_dflist_split_timing_info(fn, extract_fcn=extract_split_timing_info):
bm_iterations = extract_fcn(fn)
dflist = []
dflist_stamps = []
NumCPU_list = []
for bm in bm_iterations:
grouped_lines, line_search_lines, update_queue, update_worker, special_lines = group_timing_lines(bm)
if special_lines["terminate_line"] is not None:
dflist.append(build_df_split_timing_run(grouped_lines, line_search_lines, update_queue, update_worker, special_lines["terminate_line"]))
df_stamps, NumCPU = build_df_stamps(grouped_lines, special_lines, update_queue, update_worker)
dflist_stamps.append(df_stamps)
NumCPU_list.append(NumCPU)
return dflist, dflist_stamps, NumCPU_list
def build_comb_df_split_timing_info(fn, extract_fcn=extract_split_timing_info):
dflist, dflist_stamps, NumCPU_list = build_dflist_split_timing_info(fn, extract_fcn=extract_fcn)
for ix, df in enumerate(dflist):
df_pardiff = df[df["timing_type"] == "partial derivative"]
N_tasks = len(df_pardiff["task"].unique())
N_gradients = len(df_pardiff) // N_tasks
gradient_indices = np.hstack([i * np.ones(N_tasks, dtype='int') for i in range(N_gradients)])  # list, not generator: np.hstack needs a sequence
df["gradient number"] = pd.Series(dtype='Int64')
df.loc[df["timing_type"] == "partial derivative", "gradient number"] = gradient_indices
dflist_stamps[ix]["gradient number"] = pd.Series(dtype='Int64')
dflist_stamps[ix].loc[dflist_stamps[ix]["stamp_type"] == "job done - asked", "gradient number"] = gradient_indices
dflist_stamps[ix].loc[dflist_stamps[ix]["stamp_type"] == "job done - started", "gradient number"] = gradient_indices
dflist_stamps[ix].loc[dflist_stamps[ix]["stamp_type"] == "job done - finished", "gradient number"] = gradient_indices
df["benchmark_number"] = ix
dflist_stamps[ix]["benchmark_number"] = ix
# assuming the stamps are ordered properly, which I'm pretty sure is correct,
# we can do ffill:
df_stamps = pd.concat(dflist_stamps)
df_stamps.loc[~df_stamps['stamp_type'].str.contains('migrad'), 'gradient number'] = df_stamps.loc[~df_stamps['stamp_type'].str.contains('migrad'), 'gradient number'].fillna(method='ffill')
return pd.concat(dflist), df_stamps
In [ ]:
# _, df_stamps_1633603 = build_comb_df_split_timing_info('../rootbench/1633603.burrell.nikhef.nl.out')
In [ ]:
_, df_stamps_1633602 = build_comb_df_split_timing_info('../rootbench/1633602.burrell.nikhef.nl.out')
In [ ]:
_, df_stamps_1633601 = build_comb_df_split_timing_info('../rootbench/1633601.burrell.nikhef.nl.out')
In [ ]:
_st = df_stamps_1633601
_st['stamp_type'].unique()
In [ ]:
_st = df_stamps_1633601
_bench_nr = (_st['benchmark_number'] == 0)
_start_st = _st[_bench_nr
& (_st['stamp_type'] == 'start migrad')
]['timestamp'].iloc[0]
_end_st = _st[_bench_nr
& (_st['stamp_type'] == 'end migrad')
]['timestamp'].iloc[0]
_sta = _st[_bench_nr
& (_st['stamp_type'] == 'update_real queue start')
]#['timestamp'] - _start_st
_fin = _st[_bench_nr
& (_st['stamp_type'] == 'update_real queue end')
]#['timestamp'] - _start_st
fig, ax = plt.subplots(1, 1, figsize=(20, 5))
# ax.set_ylabel('worker')
ax.set_xlim((0, _end_st - _start_st))
# worker = _fin.iloc[ix]["worker_id"]
ax.plot(_sta["timestamp"] - _start_st,
np.zeros_like(_sta["timestamp"]),
color='blue', marker='|', linestyle='', alpha=0.5)
ax.plot(_fin["timestamp"] - _start_st,
np.zeros_like(_sta["timestamp"]),
color='red', marker='|', linestyle='', alpha=0.5)
Cool, this works a lot better than the bar plots; at least this way we can show everything.
Let's also try it for the job stamps:
In [ ]:
_st = df_stamps_1633601
_bench_nr = (_st['benchmark_number'] == 0)
_start_st = _st[_bench_nr
& (_st['stamp_type'] == 'start migrad')
]['timestamp'].iloc[0]
_end_st = _st[_bench_nr
& (_st['stamp_type'] == 'end migrad')
]['timestamp'].iloc[0]
_sta = _st[_bench_nr
& (_st['stamp_type'].str.contains('started'))
]#['timestamp'] - _start_st
_fin = _st[_bench_nr
& (_st['stamp_type'].str.contains('finished'))
]#['timestamp'] - _start_st
fig, ax = plt.subplots(1, 1, figsize=(20, 5))
ax.set_ylabel('worker')
ax.set_xlim((0, _end_st - _start_st))
ax.plot(_sta["timestamp"] - _start_st, _sta["worker_id"],
color='green', marker='|', linestyle='', alpha=0.5)
ax.plot(_fin["timestamp"] - _start_st, _fin["worker_id"],
color='blue', marker='|', linestyle='', alpha=0.5)
Nice! Let's combine:
In [ ]:
def plot_timestamps(_st, bench_nr=0, figsize=(20, 5)):
vert_offset = 0.1
_bench_nr = (_st['benchmark_number'] == bench_nr)
_start_st = _st[_bench_nr & (_st['stamp_type'] == 'start migrad')]['timestamp'].iloc[0]
_end_st = _st[_bench_nr & (_st['stamp_type'] == 'end migrad')]['timestamp'].iloc[0]
fig, ax = plt.subplots(1, 1, figsize=figsize)
ax.set_ylabel('worker')
ax.set_xlim((0, _end_st - _start_st))
# update @ queue
_sta = _st[_bench_nr & (_st['stamp_type'].str.contains('update_real queue start'))]
_fin = _st[_bench_nr & (_st['stamp_type'].str.contains('update_real queue end'))]
ax.plot(_sta["timestamp"] - _start_st, np.zeros_like(_sta["timestamp"]) - 1 - vert_offset,
color='red', marker='|', linestyle='', alpha=0.5)
ax.plot(_fin["timestamp"] - _start_st, np.zeros_like(_fin["timestamp"]) - 1 + vert_offset,
color='red', marker='|', linestyle='', alpha=0.5)
# update @ worker
_sta = _st[_bench_nr & (_st['stamp_type'].str.contains('update_real worker start'))]
_fin = _st[_bench_nr & (_st['stamp_type'].str.contains('update_real worker end'))]
ax.plot(_sta["timestamp"] - _start_st, _sta["worker_id"] - vert_offset,
color='red', marker='|', linestyle='', alpha=0.5)
ax.plot(_fin["timestamp"] - _start_st, _fin["worker_id"] + vert_offset,
color='red', marker='|', linestyle='', alpha=0.5)
# jobs
_sta = _st[_bench_nr & (_st['stamp_type'].str.contains('started'))]
_fin = _st[_bench_nr & (_st['stamp_type'].str.contains('finished'))]
ax.plot(_sta["timestamp"] - _start_st, _sta["worker_id"] - vert_offset,
color='blue', marker='|', linestyle='', alpha=0.8)
ax.plot(_fin["timestamp"] - _start_st, _fin["worker_id"] + vert_offset,
color='blue', marker='|', linestyle='', alpha=0.8)
# out of jobs!
_sta = _st[_bench_nr & (_st['stamp_type'].str.contains('no job - asked'))]
_fin = _st[_bench_nr & (_st['stamp_type'].str.contains('no job - denied'))]
ax.plot(_sta["timestamp"] - _start_st, _sta["worker_id"] - vert_offset,
color='grey', marker='|', linestyle='', alpha=0.3)
ax.plot(_fin["timestamp"] - _start_st, _fin["worker_id"] + vert_offset,
color='grey', marker='|', linestyle='', alpha=0.3)
# migrad timestamps
migrad = _st[_bench_nr & (_st['stamp_type'].str.contains('migrad timestamp'))]
ax.plot(migrad["timestamp"] - _start_st, np.zeros_like(migrad["timestamp"]) - 2,
color='black', marker='|', linestyle='')
# setfcn timestamps
_sta = _st[_bench_nr & (_st['stamp_type'].str.contains('start SetFCN'))]
_fin = _st[_bench_nr & (_st['stamp_type'].str.contains('end SetFCN'))]
ax.plot(_sta["timestamp"] - _start_st, np.zeros_like(_sta["timestamp"]) - 2 - vert_offset,
color='violet', marker='|', linestyle='', alpha=0.9)
ax.plot(_fin["timestamp"] - _start_st, np.zeros_like(_fin["timestamp"]) - 2 + vert_offset,
color='violet', marker='|', linestyle='', alpha=0.9)
# GradFcnSynchronize timestamps
GradFcnSynchronize = _st[_bench_nr & (_st['stamp_type'].str.contains('GradFcnSynchronize timestamp'))]
ax.plot(GradFcnSynchronize["timestamp"] - _start_st, np.zeros_like(GradFcnSynchronize["timestamp"]) - 2,
color='tab:green', marker='|', linestyle='', alpha=0.9)
# setup_differentiate timestamps
_this = _st[_bench_nr & (_st['stamp_type'].str.contains('setup_differentiate timestamps'))]
ax.plot(_this["timestamp"] - _start_st, _this['worker_id'],
color='tab:orange', marker='|', linestyle='', alpha=0.9)
return fig, ax
In [ ]:
plot_timestamps(df_stamps_1633601)
Ok, an interesting observation: there is actually a long job at the start of every gradient calculation iteration, on every worker. This hints quite strongly at either recalculation of the cache or something to do with the sync_parameters mechanism... We should measure this first task of each gradient in more detail, because it might very well explain (a big part of) the non-scaling of the gradient! A rough check is sketched below.
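As a minimal sketch of that check: first_task_vs_rest is a hypothetical helper (not part of the existing analysis module) that relies on the 'job done - started'/'job done - finished' stamps and the 'gradient number' column built above.
In [ ]:
def first_task_vs_rest(df_st, bench_nr=0):
    """Hypothetical helper: per gradient and per worker, compare the duration
    of the worker's first task in that gradient with the mean duration of its
    later tasks in the same gradient."""
    sel = (df_st['benchmark_number'] == bench_nr)
    started = df_st[sel & (df_st['stamp_type'] == 'job done - started')]
    finished = df_st[sel & (df_st['stamp_type'] == 'job done - finished')]
    # started and finished rows originate from the same log lines, so they align:
    durations = started.copy()
    durations['duration'] = finished['timestamp'].values - started['timestamp'].values
    grouped = durations.groupby(['gradient number', 'worker_id'])['duration']
    return pd.DataFrame({'first task': grouped.first(),
                         'mean of rest': grouped.apply(lambda d: d.iloc[1:].mean())})

first_task_vs_rest(df_stamps_1633601)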
The line search probably takes place after the blue but before the red parts. We don't have timestamps for it, but the line searches take roughly 0.7 to 0.9 seconds, which fits nicely with those gaps (see the quick check below).
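A minimal sketch of that check; it assumes we keep the first return value of build_comb_df_split_timing_info for this run instead of discarding it, and that this run actually printed line_search timings:
In [ ]:
# Re-parse the same log, keeping the split-timings dataframe this time,
# and summarize the recorded line-search durations.
_df_split_1633601, _ = build_comb_df_split_timing_info('../rootbench/1633601.burrell.nikhef.nl.out')
_df_split_1633601.loc[_df_split_1633601['timing_type'] == 'line search', 'time [s]'].describe()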
Did some measurements of the ZeroMQ performance using the performance measurement tools that come with it. At least on my MacBook, the red parts should take about 0.1 s for 8 workers, so this ~1 second is a bit mysterious. The performance could be different on Stoomboot, though...
Actually no: it turns out the Stoomboot headnode (where these runs were done) performs a bit better than my MacBook, so it should be faster by even more than a factor of 10!
In [ ]:
plot_timestamps(df_stamps_1633602)
Discussed with Wouter; we can do two quick tests to see what's going on here:
- For the longer initial tasks: measure this on an N-dimensional Gaussian (128 dimensions or so), so that we can exclude computational causes, because all components should then take exactly the same amount of time.
- For the communication overhead, as a quick test of the ZMQ throughput: send twice the amount of data and see whether the time indeed increases (a crude illustration of this kind of check follows below).
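Purely as an illustration of the second test, a sketch in Python using pyzmq (assumed to be installed; the real check would of course go into the C++ framework itself):
In [ ]:
import time
import zmq

def zmq_roundtrip_time(payload_bytes, n_msgs=5000):
    # Crude ping-style check over a local PUSH/PULL pair: send a payload and
    # immediately receive it on the pulling socket, n_msgs times.
    ctx = zmq.Context.instance()
    pull = ctx.socket(zmq.PULL)
    port = pull.bind_to_random_port('tcp://127.0.0.1')
    push = ctx.socket(zmq.PUSH)
    push.connect(f'tcp://127.0.0.1:{port}')
    payload = b'x' * payload_bytes
    t0 = time.perf_counter()
    for _ in range(n_msgs):
        push.send(payload)
        pull.recv()
    elapsed = time.perf_counter() - t0
    push.close()
    pull.close()
    return elapsed

# does doubling the payload (roughly) double the time?
print(zmq_roundtrip_time(1024), zmq_roundtrip_time(2048))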
In [ ]:
# _, df_stamps_acat_ND_gauss_1553700233 = build_comb_df_split_timing_info('../acat19_ND_gauss_1553700233.out', extract_fcn=load_acat19_out)
Oh, that one doesn't have migrad start and end timestamps... fixed that in the next one, plus a new loading function that no longer adds dummy migrad lines:
In [ ]:
def load_acat19_out_v2(fn):
"""
Just single migrad runs, so no need for further splitting by run.
Does add dummy terminate line, because the other functions expect
this. No more dummy start and end migrad lines.
"""
with open(fn, 'r') as fh:
lines = fh.read().splitlines()
print(lines[-1])
lines.append('[#0] DEBUG: -- terminate: 0.0s')
return [lines]
In [ ]:
_, df_stamps_acat_ND_gauss_1553701454 = build_comb_df_split_timing_info('../acat19_ND_gauss_1553701454.out', extract_fcn=load_acat19_out_v2)
In [ ]:
fig, ax = plot_timestamps(df_stamps_acat_ND_gauss_1553701454)
ax.set_xlim(0, 0.2e10)
Let's try without randomized parameters; hopefully the run will be a bit shorter then.
In [ ]:
_, df_stamps_acat_ND_gauss_1553754554 = build_comb_df_split_timing_info('../acat19_ND_gauss_1553754554.out', extract_fcn=load_acat19_out_v2)
In [ ]:
fig, ax = plot_timestamps(df_stamps_acat_ND_gauss_1553754554)
ax.set_xlim(0, 0.2e10)
Nope, still pretty long. The picture is the same, though: here too there is a slight increase in the runtime of the first tasks of the run.
Could this scale with the number of parameters? The runs above used 128 parameters; let's try a run with 1024.
In [ ]:
_, df_stamps_acat_ND_gauss_1553755013 = build_comb_df_split_timing_info('../acat19_ND_gauss_1553755013.out', extract_fcn=load_acat19_out_v2)
In [ ]:
fig, ax = plot_timestamps(df_stamps_acat_ND_gauss_1553755013)
# ax.set_xlim(0, 0.2e10)
Ok, a bit weird: only one iteration in this case... maybe reactivate parameter randomization here.
Oh, one other thing: I added the "migrad timestamps", so those should be read in as well!
In [ ]:
def group_timing_lines(bm_iteration_lines):
"""
Group lines (from one benchmark iteration) by gradient call,
specifying:
- Update time on master
- update_real times on queue and workers
- Gradient work time
- For all partial derivatives a sublist of all lines
- After each gradient block, there may be line_search times,
these are also included as a sublist
Finally, the terminate time for the entire bm_iteration is also
returned (last line in the list).
In each gradient, also timestamps are printed. These are not
further subdivided in this function, but are output as part of
the `gradient_calls` list for further processing elsewhere.
"""
gradient_calls = []
start_indices = []
end_indices = []
line_search_lines = []
update_queue = []
update_worker = []
setfcn_timestamps = []
setup_differentiate_timestamps = []
# to prevent UnboundLocalErrors in older data files:
GradFcnSynchronize_timestamps = None
# flag to check whether we are still in the same gradient call
in_block = False
for ix, line in enumerate(bm_iteration_lines[:-1]): # -1: leave out terminate line
if not in_block and (line[:9] == 'worker_id' or line[:24] == '[#0] DEBUG: -- worker_id'):
start_indices.append(ix)
in_block = True
elif 'update_state' in line:
end_indices.append(ix)
in_block = False
# the rest has nothing to do with the gradient call block, so we don't touch in_block there:
elif 'line_search' in line:
line_search_lines.append(line)
elif 'update_real on queue' in line:
update_queue.append(line)
elif 'update_real on worker' in line:
update_worker.append(line)
elif 'start migrad' in line:
start_migrad_line = line
elif 'end migrad' in line:
end_migrad_line = line
elif 'migrad timestamps' in line:
migrad_timestamps = line
elif 'Fitter::SetFCN timestamps' in line:
setfcn_timestamps.append(line)
elif 'RooGradientFunction::synchronize_parameter_settings timestamps' in line:
GradFcnSynchronize_timestamps = line
elif 'NumericalDerivatorMinuit2::setup_differentiate' in line:
setup_differentiate_timestamps.append(line)
if len(start_indices) != len(end_indices):
raise Exception(f"Number of start and end indices unequal (resp. {len(start_indices)} and {len(end_indices)})!")
for ix in range(len(start_indices)):
partial_derivatives, timestamps = separate_partderi_job_time_lines(bm_iteration_lines[start_indices[ix]:end_indices[ix]])
gradient_calls.append({
'gradient_total': bm_iteration_lines[end_indices[ix]],
'partial_derivatives': partial_derivatives,
'timestamps': timestamps
})
try:
terminate_line = bm_iteration_lines[-1]
except IndexError:
terminate_line = None
special_lines = dict(terminate_line=terminate_line,
start_migrad_line=start_migrad_line,
end_migrad_line=end_migrad_line,
migrad_timestamps=migrad_timestamps,
setfcn_timestamps=setfcn_timestamps,
GradFcnSynchronize_timestamps=GradFcnSynchronize_timestamps,
setup_differentiate_timestamps=setup_differentiate_timestamps)
return gradient_calls, line_search_lines, update_queue, update_worker, special_lines
def build_df_stamps(grouped_lines, special_lines, update_queue, update_worker):
data = {'timestamp': [], 'stamp_type': [], 'worker_id': []}
words = special_lines["start_migrad_line"].split()
shift = 3 if '[#' in words[0] else 0
data['timestamp'].append(int(words[3 + shift]))
data['stamp_type'].append('start migrad')
data['worker_id'].append(None)
if len(words) > 10:
NumCPU = int(words[10])
else:
NumCPU = 0
words = special_lines["end_migrad_line"].split()
shift = 3 if '[#' in words[0] else 0
data['timestamp'].append(int(words[3 + shift]))
data['stamp_type'].append('end migrad')
data['worker_id'].append(None)
words = special_lines["migrad_timestamps"].split()
shift = 3 if '[#' in words[0] else 0
for word in words[shift + 2:]:
data['timestamp'].append(int(word))
data['stamp_type'].append('migrad timestamp')
data['worker_id'].append(None)
if special_lines["GradFcnSynchronize_timestamps"] is not None:
words = special_lines["GradFcnSynchronize_timestamps"].split()
shift = 3 if '[#' in words[0] else 0
for word in words[shift + 2:]:
data['timestamp'].append(int(word))
data['stamp_type'].append('GradFcnSynchronize timestamp')
data['worker_id'].append(None)
for line in special_lines["setfcn_timestamps"]:
words = line.split()
shift = 3 if '[#' in words[0] else 0
data['timestamp'].append(int(words[shift + 2]))
data['stamp_type'].append('start SetFCN')
data['worker_id'].append(None)
data['timestamp'].append(int(words[shift + 3]))
data['stamp_type'].append('end SetFCN')
data['worker_id'].append(None)
for line in special_lines["setup_differentiate_timestamps"]:
words = line.split()
shift = 3 if '[#' in words[0] else 0
worker = int(words[shift + 3][:-1])
for word in words[shift + 5:]:
data['timestamp'].append(int(word))
data['stamp_type'].append('setup_differentiate timestamps')
data['worker_id'].append(worker)
for gradient_group in grouped_lines:
for line in gradient_group['timestamps']:
if 'no work' in line:
words = line.split()
shift = 3 if '[#' in words[0] else 0
data['worker_id'].append(int(words[3 + shift]))
data['timestamp'].append(int(words[6 + shift]))
data['stamp_type'].append('no job - asked')
data['worker_id'].append(int(words[3 + shift]))
data['timestamp'].append(int(words[11 + shift]))
data['stamp_type'].append('no job - denied')
elif 'job done' in line:
words = line.split()
shift = 3 if '[#' in words[0] else 0
data['worker_id'].append(int(words[3 + shift]))
data['timestamp'].append(int(words[6 + shift][:-1]))
data['stamp_type'].append('job done - asked')
data['worker_id'].append(int(words[3 + shift]))
data['timestamp'].append(int(words[9 + shift]))
data['stamp_type'].append('job done - started')
data['worker_id'].append(int(words[3 + shift]))
data['timestamp'].append(int(words[13 + shift]))
data['stamp_type'].append('job done - finished')
elif 'NumericalDerivatorMinuit2::setup_differentiate' in line:
pass
elif 'fVal on worker' in line or 'fVal after line search' in line:
pass
else:
raise Exception("got a weird line:\n" + line)
for line in update_queue:
words = line.split()
shift = 3 if '[#' in words[0] else 0
data['worker_id'].append(None)
data['timestamp'].append(int(words[5 + shift]))
data['stamp_type'].append('update_real queue start')
data['worker_id'].append(None)
data['timestamp'].append(int(words[7 + shift][:-3]))
data['stamp_type'].append('update_real queue end')
for line in update_worker:
words = line.split()
shift = 3 if '[#' in words[0] else 0
data['worker_id'].append(int(words[3 + shift][:-1]))
data['timestamp'].append(int(words[6 + shift]))
data['stamp_type'].append('update_real worker start')
data['worker_id'].append(int(words[3 + shift][:-1]))
data['timestamp'].append(int(words[8 + shift][:-3]))
data['stamp_type'].append('update_real worker end')
return pd.DataFrame(data), NumCPU
In [ ]:
_, df_stamps_acat_ND_gauss_1553754554 = build_comb_df_split_timing_info('../acat19_ND_gauss_1553754554.out', extract_fcn=load_acat19_out_v2)
In [ ]:
fig, ax = plot_timestamps(df_stamps_acat_ND_gauss_1553754554)
ax.set_xlim(-1e9, 3e10)
In [ ]:
_start_st = df_stamps_acat_ND_gauss_1553754554[df_stamps_acat_ND_gauss_1553754554['stamp_type'] == "start migrad"]
df_stamps_acat_ND_gauss_1553754554[df_stamps_acat_ND_gauss_1553754554['stamp_type'] == "migrad timestamp"]['timestamp'] - int(_start_st["timestamp"])
Ok, clearly the dominant part of migrad is FitFCN. New run with timestamps inside it to measure SetFCN...
In [ ]:
_, df_stamps_acat_1553758901 = build_comb_df_split_timing_info('../acat19_1553758901.out', extract_fcn=load_acat19_out_v2)
In [ ]:
fig, ax = plot_timestamps(df_stamps_acat_1553758901)
Ok, so SetFCN is not the main problem here, though it might be the cause of the two-ZMQ-threads problem.
But wait a minute: there is suddenly a pretty significant gap in the migrad timestamps! Synchronize is the major rest-term culprit!
In [ ]:
_start_st = df_stamps_acat_1553758901[df_stamps_acat_1553758901['stamp_type'] == "start migrad"]
df_stamps_acat_1553758901[df_stamps_acat_1553758901['stamp_type'] == "migrad timestamp"]['timestamp'] - int(_start_st["timestamp"])
Ok, added some timestamps there as well...
In [ ]:
_, df_stamps_acat_1553763665 = build_comb_df_split_timing_info('../acat19_1553763665.out', extract_fcn=load_acat19_out_v2)
In [ ]:
fig, ax = plot_timestamps(df_stamps_acat_1553763665)
In [ ]:
_start_st = int(df_stamps_acat_1553763665[df_stamps_acat_1553763665['stamp_type'] == "start migrad"]["timestamp"])
df_stamps_acat_1553763665[df_stamps_acat_1553763665['stamp_type'].str.contains('Grad')]["timestamp"] - _start_st
Ok, interesting: the first part is the first big loop in synchronize_parameter_settings, but the second part is not the second big loop; it is the copying of parameters from one list to the other! Both take about the same amount of time, and the rest is insignificant.
In [ ]:
# _, df_stamps_acat_1553778980 = build_comb_df_split_timing_info('../acat19_1553778980.out', extract_fcn=load_acat19_out_v2)
Oh, forgot to add the worker_id there; trying again:
In [ ]:
_, df_stamps_acat_1553779882 = build_comb_df_split_timing_info('../acat19_1553779882.out', extract_fcn=load_acat19_out_v2)
In [ ]:
fig, ax = plot_timestamps(df_stamps_acat_1553779882)
ax.set_xlim(2.01e10, 2.1e10)
Ok, there's definitely something there that's taking a while... But which part is it?
In [ ]:
def setup_differentiate_timestamps_duration_per_task(df, task_ix, worker_id=0):
worker = (df["worker_id"] == worker_id)
_df_w0 = df[worker]
first_task_start = int(_df_w0[_df_w0["stamp_type"].str.contains('job done - started')]['timestamp'].iloc[task_ix])
first_task_end = int(_df_w0[_df_w0["stamp_type"].str.contains('job done - finished')]['timestamp'].iloc[task_ix])
first_task = (_df_w0['timestamp'] > first_task_start) & (_df_w0['timestamp'] < first_task_end)
_st = _df_w0[_df_w0["stamp_type"].str.contains('setup_differentiate') & first_task]['timestamp']
_st = _st - _st.iloc[0]
return _st
In [ ]:
# worker 0, first task:
setup_differentiate_timestamps_duration_per_task(df_stamps_acat_1553779882, 0, worker_id=0)
In [ ]:
# worker 0, second task:
setup_differentiate_timestamps_duration_per_task(df_stamps_acat_1553779882, 1, worker_id=0)
Ok, so it's rather clear that most of the time in the first task goes into the part between t6 and t7, which is... as we guessed: the function call! fVal has to be calculated once at the start for that set of parameters. But wait, doesn't that hold for all tasks? I think it does.
To be sure, let's check a few more tasks:
In [ ]:
def setup_differentiate_fVal_duration(df, task_ix, worker_id=0):
_ts = setup_differentiate_timestamps_duration_per_task(df, task_ix, worker_id=worker_id)
return _ts.iloc[6] - _ts.iloc[5]
In [ ]:
# all first tasks:
print([setup_differentiate_fVal_duration(df_stamps_acat_1553779882, 0, worker_id=w) for w in range(8)])
In [ ]:
# all 2-10th tasks:
print([setup_differentiate_fVal_duration(df_stamps_acat_1553779882, t, worker_id=w) for w in range(8) for t in range(1, 10)])
In [ ]:
# all 11-100th tasks:
print([setup_differentiate_fVal_duration(df_stamps_acat_1553779882, t, worker_id=w) for w in range(8) for t in range(10, 100)])
In [ ]:
# all 101-250th tasks:
print([setup_differentiate_fVal_duration(df_stamps_acat_1553779882, t, worker_id=w) for w in range(8) for t in range(100, 250)])
Yeah, that seems pretty solid.
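Summarizing the raw lists above (a small sketch reusing the setup_differentiate_fVal_duration helper defined earlier; the 8 workers and the task range are the ones used in these runs):
In [ ]:
# Mean fVal duration in the first task vs. in later tasks, over all 8 workers.
_first = [setup_differentiate_fVal_duration(df_stamps_acat_1553779882, 0, worker_id=w)
          for w in range(8)]
_later = [setup_differentiate_fVal_duration(df_stamps_acat_1553779882, t, worker_id=w)
          for w in range(8) for t in range(1, 100)]
print(f"first tasks: mean {np.mean(_first):.4g}, later tasks: mean {np.mean(_later):.4g}")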
Now, what exactly causes this first fVal calculation to be so slow? Recalculation of all cached elements? If so, there is no way around it, and then the only option left to make things scale better is to shorten the communication times.
5 * 3 * N_tasks * (N_workers - 1) / N_workers sends, i.e., for our example here, about:
In [ ]:
5 * 3 * 1600 * 7/8
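The same count with named parameters, so the numbers above are easier to map onto the formula (a trivial sketch; the constant factors 5 and 3 are taken from the formula as given):
In [ ]:
def n_sends(N_tasks, N_workers):
    # formula from the text above: 5 * 3 * N_tasks * (N_workers - 1) / N_workers
    return 5 * 3 * N_tasks * (N_workers - 1) / N_workers

n_sends(1600, 8)  # the example above: 1600 tasks on 8 workers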
Or, more simply put, it reduces the sending workload by a factor of N_workers for the gradient updates. Unfortunately, the parameter updates still have to be sent in full.
In [ ]:
_, df_stamps_acat_1554710451 = build_comb_df_split_timing_info('../acat19_1554710451.out', extract_fcn=load_acat19_out_v2)
In [ ]:
fig, ax = plot_timestamps(df_stamps_acat_1554710451)
Hm, that doesn't really seem to help much, unfortunately... putting it next to an earlier run, the communication time is more or less the same:
In [ ]:
fig, ax = plot_timestamps(df_stamps_acat_1553763665)
In [ ]: