In [ ]:
%matplotlib inline

In [ ]:
import importlib

In [ ]:
import RooFitMP_analysis

In [ ]:
importlib.reload(RooFitMP_analysis)

In [ ]:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns

import glob

from RooFitMP_analysis import *

In [ ]:
df_split_timings_1538069 = build_comb_df_split_timing_info('../rootbench/1538069.burrell.nikhef.nl.out')
dfs_1538069 = load_result_file('../rootbench/1538069.burrell.nikhef.nl/RoofitMPworkspace_1549966927.json', match_y_axes=True)

In [ ]:
df_total_timings_1538069 = dfs_1538069['BM_RooFit_MP_GradMinimizer_workspace_file']
df_meta_1538069 = df_total_timings_1538069.drop(['real_time', 'real or ideal'], axis=1).dropna().set_index('benchmark_number', drop=True)

In [ ]:
df_baseline_timings_1538069 = dfs_1538069['BM_RooFit_RooMinimizer_workspace_file']
vanilla_t, vanilla_t_std = df_baseline_timings_1538069['real_time'].mean() / 1000, df_baseline_timings_1538069['real_time'].std() / 1000

In [ ]:
_df = combine_detailed_with_gbench_timings_by_name(df_total_timings_1538069,
                                                   df_split_timings_1538069,
                                                   {'update': 'update state',
                                                    'gradient': 'gradient work',
                                                    'terminate': 'terminate'},
                                                   add_ideal=['gradient'])
_g = sns.relplot(data=_df,
                x='NumCPU', y='time [s]', style="real or ideal",
                hue='timing_type',
                markers=True, err_style="bars", legend='full', kind="line")

linestyle = {'color': 'black', 'lw': 0.7}
_g.axes[0,0].axhline(vanilla_t, **linestyle)
_g.axes[0,0].axhline(vanilla_t - vanilla_t_std, alpha=0.5, **linestyle)
_g.axes[0,0].axhline(vanilla_t + vanilla_t_std, alpha=0.5, **linestyle)

In [ ]:
_timing_types = {
    'update': 'update state',
    'gradient': 'gradient work',
    'terminate': 'terminate',
    'partial derivatives': 'partial derivative'
}
_df = combine_detailed_with_gbench_timings_by_name(df_total_timings_1538069,
                                                   df_split_timings_1538069,
                                                   timing_types=_timing_types,
                                                   add_ideal=['gradient'],
                                                   exclude_from_rest=['partial derivatives'])
_g = sns.relplot(data=_df,
            x='NumCPU', y='time [s]', style="real or ideal",
            hue='timing_type',
            markers=True, err_style="bars", legend='full', kind="line")

linestyle = {'color': 'black', 'lw': 0.7}
_g.axes[0,0].axhline(vanilla_t, **linestyle)
_g.axes[0,0].axhline(vanilla_t - vanilla_t_std, alpha=0.5, **linestyle)
_g.axes[0,0].axhline(vanilla_t + vanilla_t_std, alpha=0.5, **linestyle)

Run with optConst = 1


In [ ]:
df_split_timings_1604382 = build_comb_df_split_timing_info('../rootbench/1604382.burrell.nikhef.nl.out')

In [ ]:
dfs_1604382 = load_result_file('../rootbench/1604382.burrell.nikhef.nl/RoofitMPworkspaceNoOptConst_1551699016.json', match_y_axes=True)

In [ ]:
df_total_timings_1604382 = dfs_1604382['BM_RooFit_MP_GradMinimizer_workspace_file_noOptConst']
df_meta_1604382 = df_total_timings_1604382.drop(['real_time', 'real or ideal'], axis=1).dropna().set_index('benchmark_number', drop=True)

df_baseline_timings_1604382 = dfs_1604382['BM_RooFit_RooMinimizer_workspace_file_noOptConst']
vanilla_t, vanilla_t_std = df_baseline_timings_1604382['real_time'].mean() / 1000, df_baseline_timings_1604382['real_time'].std() / 1000

In [ ]:
_timing_types = {
    'update': 'update state',
    'gradient': 'gradient work',
    'terminate': 'terminate',
    'partial derivatives': 'partial derivative'
}
_df = combine_detailed_with_gbench_timings_by_name(df_total_timings_1604382,
                                                   df_split_timings_1604382,
                                                   timing_types=_timing_types,
                                                   add_ideal=['gradient'],
                                                   exclude_from_rest=['partial derivatives'])
_g = sns.relplot(data=_df,
            x='NumCPU', y='time [s]', style="real or ideal",
            hue='timing_type',
            markers=True, err_style="bars", legend='full', kind="line")

linestyle = {'color': 'black', 'lw': 0.7}
_g.axes[0,0].axhline(vanilla_t, **linestyle)
_g.axes[0,0].axhline(vanilla_t - vanilla_t_std, alpha=0.5, **linestyle)
_g.axes[0,0].axhline(vanilla_t + vanilla_t_std, alpha=0.5, **linestyle)

Large workspace run


In [ ]:
df_split_timings_1604381 = build_comb_df_split_timing_info('../rootbench/1604381.burrell.nikhef.nl.out')

In [ ]:
dfs_1604381 = load_result_file('../rootbench/1604381.burrell.nikhef.nl/RoofitMPworkspace_1551694135.json', match_y_axes=True)

Ok, that one failed after 4 runs, but let's see how that went anyway to get a better feeling for it:


In [ ]:
_df = df_split_timings_1604381[df_split_timings_1604381['timing_type'] == 'gradient work']
_x = np.arange(len(_df))
plt.bar(_x, _df['time [s]'])

In [ ]:
_df['time [s]'].describe()

More timings

In the next benchmarks, we added a lot more timing output that needs to be incorporated into the analysis:

  • Line search timings: single lines starting with line_search:
  • update_real timings on queue process
  • update_real timings on worker processes
  • absolute time stamps (in nanoseconds since epoch) for:
    • start migrad (this line has changed!)
    • end migrad (same)
    • for each worker: lines that contain either two or three stamps:
      • time of ask for task and time of rejection
      • time of ask for task, time of start, time of end of task
    • maybe the update_real/update_state ones as well, if useful

As an additional bookkeeping complication, we need to run the large workspaces separately for different NumCPU parameters, both to speed up the runs (let them run on the cluster in parallel) and because we are currently getting crashes when running everything in one go: with 10 repeats per NumCPU the whole thing just stops after 4 repeats of the single-worker run, and with 1 repeat it crashes right after the single-worker run (though it does write out the benchmark data to JSON, so that's promising for running the tasks separately).

Try-out: 32 core large workspace run


In [ ]:
df_split_timings_1633603 = build_comb_df_split_timing_info('../rootbench/1633603.burrell.nikhef.nl.out')

Yeah, so, we're going to have to make adjustments now that the new timings have been added.
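
Before adjusting, a quick tally of the new marker lines can't hurt. Here's a minimal sketch (the marker strings are the ones the parsing code below will look for):


In [ ]:
markers = ['line_search', 'update_real on queue', 'update_real on worker',
           'start migrad', 'end migrad', 'no work', 'job done']
with open('../rootbench/1633603.burrell.nikhef.nl.out') as fh:
    out_lines = fh.read().splitlines()
# simple substring count per marker, no parsing, just presence
{m: sum(m in line for line in out_lines) for m in markers}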

Let's paste the necessary functions back in here and adjust them.


In [ ]:
def extract_split_timing_info(fn):
    """
    Group lines by benchmark iteration, starting from migrad until
    after the forks have been terminated.
    """
    with open(fn, 'r') as fh:
        lines = fh.read().splitlines()

    bm_iterations = []

    start_indices = []
    end_indices = []
    for ix, line in enumerate(lines):
        if 'start migrad' in line:
            if lines[ix - 1] == 'start migrad':  # sometimes 'start migrad' appears twice
                start_indices.pop()
            start_indices.append(ix)
        elif 'terminate' in line:
            end_indices.append(ix)

    if len(start_indices) != len(end_indices):
        raise Exception(f"Number of start and end indices unequal (resp. {len(start_indices)} and {len(end_indices)})!")

    for ix in range(len(start_indices)):
        bm_iterations.append(lines[start_indices[ix] + 1:end_indices[ix] + 1])

    return bm_iterations


def separate_partderi_job_time_lines(lines):
    partial_derivatives = []
    job_times = []
    for line in lines:
#         if line[:18] == '[#1] DEBUG: -- job':
        if line[:9] == 'worker_id' or line[:24] == '[#0] DEBUG: -- worker_id':
            partial_derivatives.append(line)
        else:
            job_times.append(line)
    return partial_derivatives, job_times


def group_timing_lines(bm_iteration_lines):
    """
    Group lines (from one benchmark iteration) by gradient call,
    specifying:
    - Update time
    - Gradient work time
    - For all partial derivatives a sublist of all lines
    Finally, the terminate time for the entire bm_iteration is also
    returned (last line in the list).
    """
    gradient_calls = []

    start_indices = []
    end_indices = []
    # flag to check whether we are still in the same gradient call
    in_block = False
    for ix, line in enumerate(bm_iteration_lines[:-1]):  # -1: leave out terminate line
        if not in_block and (line[:9] == 'worker_id' or line[:24] == '[#0] DEBUG: -- worker_id'):
            start_indices.append(ix)
            in_block = True
        elif line[:12] == 'update_state' or line[:27] == '[#0] DEBUG: -- update_state':
            end_indices.append(ix)
            in_block = False

    if len(start_indices) != len(end_indices):
        raise Exception(f"Number of start and end indices unequal (resp. {len(start_indices)} and {len(end_indices)})!")
        
    for ix in range(len(start_indices)):
        partial_derivatives, job_times = separate_partderi_job_time_lines(bm_iteration_lines[start_indices[ix]:end_indices[ix]])
        gradient_calls.append({
            'gradient_total': bm_iteration_lines[end_indices[ix]],
            'partial_derivatives': partial_derivatives,
            'job_times': job_times
        })

    try:
        terminate_line = bm_iteration_lines[-1]
    except IndexError:
        terminate_line = None

    return gradient_calls, terminate_line


def build_df_split_timing_run(timing_grouped_lines_list, terminate_line):
    data = {'time [s]': [], 'timing_type': [], 'worker_id': [], 'task': []}

    for gradient_call_timings in timing_grouped_lines_list:
        words = gradient_call_timings['gradient_total'].split()

        data['time [s]'].append(float(words[4][:-2]))
        data['timing_type'].append('update state')
        data['worker_id'].append(None)
        data['task'].append(None)

        data['time [s]'].append(float(words[11][:-1]))
        data['timing_type'].append('gradient work')
        data['worker_id'].append(None)
        data['task'].append(None)

        for partial_derivative_line in gradient_call_timings['partial_derivatives']:
            words = partial_derivative_line.split()
            try:
                data['worker_id'].append(words[4][:-1])
                data['task'].append(words[6][:-1])
                data['time [s]'].append(float(words[10][:-1]))
                data['timing_type'].append('partial derivative')
            except ValueError as e:
                print(words)
                raise e
            
    words = terminate_line.split()
    data['time [s]'].append(float(words[4][:-1]))
    data['timing_type'].append('terminate')
    data['worker_id'].append(None)
    data['task'].append(None)

    return pd.DataFrame(data)


def build_dflist_split_timing_info(fn):
    bm_iterations = extract_split_timing_info(fn)

    dflist = []
    for bm in bm_iterations:
        grouped_lines, terminate_line = group_timing_lines(bm)
        if terminate_line is not None:
            dflist.append(build_df_split_timing_run(grouped_lines, terminate_line))

    return dflist


def build_comb_df_split_timing_info(fn):
    dflist = build_dflist_split_timing_info(fn)

    ix = 0
    for df in dflist:
        df_pardiff = df[df["timing_type"] == "partial derivative"]
        N_tasks = len(df_pardiff["task"].unique())
        N_gradients = len(df_pardiff) // N_tasks
        gradient_indices = np.hstack([i * np.ones(N_tasks, dtype='int') for i in range(N_gradients)])

        df["gradient number"] = pd.Series(dtype='Int64')
        df.loc[df["timing_type"] == "partial derivative", "gradient number"] = gradient_indices

        df["benchmark_number"] = ix
        ix += 1

    return pd.concat(dflist)


def combine_split_total_timings(df_total_timings, df_split_timings,
                                calculate_rest=True, exclude_from_rest=[],
                                add_ideal=[]):
    df_meta = df_total_timings.drop(['real_time', 'real or ideal'], axis=1).dropna().set_index('benchmark_number', drop=True)

    df_all_timings = df_total_timings.rename(columns={'real_time': 'time [s]'})
    df_all_timings['time [s]'] /= 1000  # convert to seconds
    df_all_timings['timing_type'] = 'total'

    df_split_sum = {}
    for name, df in df_split_timings.items():
        df_split_sum[name] = df.groupby('benchmark_number').sum().join(df_meta, on='benchmark_number').reset_index()
        df_split_sum[name]['real or ideal'] = 'real'
        if name in add_ideal:
            df_split_sum[name] = add_ideal_timings(df_split_sum[name], time_col='time [s]')
        df_split_sum[name]['timing_type'] = name

    # note: sort sorts the *columns* if they are not aligned, nothing happens with the column data itself
    df_all_timings = pd.concat([df_all_timings, ] + list(df_split_sum.values()), sort=True)

    if calculate_rest:
        rest_time = df_all_timings[(df_all_timings['timing_type'] == 'total') & (df_all_timings['real or ideal'] == 'real')].set_index('benchmark_number')['time [s]']
        for name, df in df_split_sum.items():
            if name not in exclude_from_rest:
                rest_time = rest_time - df.set_index('benchmark_number')['time [s]']

        df_rest_time = rest_time.to_frame().join(df_meta, on='benchmark_number').reset_index()
        df_rest_time['timing_type'] = 'rest'
        df_rest_time['real or ideal'] = 'real'

        # note: sort sorts the *columns* if they are not aligned, nothing happens with the column data itself
        df_all_timings = pd.concat([df_all_timings, df_rest_time], sort=True)

    return df_all_timings


def combine_detailed_with_gbench_timings_by_name(df_gbench, df_detailed, timing_types={}, **kwargs):
    detailed_selection = {}
    if len(timing_types) == 0:
        raise Exception("Please give some timing_types, otherwise this function is pointless.")
    for name, timing_type in timing_types.items():
        detailed_selection[name] = df_detailed[df_detailed['timing_type'] == timing_type].drop('timing_type', axis=1)
    return combine_split_total_timings(df_gbench, detailed_selection, **kwargs)

In [ ]:
df_split_timings_1633603 = build_comb_df_split_timing_info('../rootbench/1633603.burrell.nikhef.nl.out')

Ok, that works, now to combine them with the json timings.


In [ ]:
dfs_split_timings_1633594__603 = {}
for i in range(1633594, 1633604):
    dfs_split_timings_1633594__603[i] = build_comb_df_split_timing_info(f'../rootbench/{i}.burrell.nikhef.nl.out')

Then the json timings:


In [ ]:
dfs_1633594__603 = {}
for i in range(1633594, 1633604):
    json_file = glob.glob(f'../rootbench/{i}.burrell.nikhef.nl/RoofitMPworkspaceNumCPUInConfigFile_*.json')
    if len(json_file) == 1:
        dfs_1633594__603[i] = RooFitMP_analysis.load_result_file(json_file[0], plot_results=False)
    else:
        print(json_file)
        raise Exception("whups")

In [ ]:
dfs_total_timings_1633594__603 = {}
nums = list(range(1, 9)) + [16, 32]
num_from_run = {}
for ix, run in enumerate(range(1633594, 1633604)):
    num_from_run[run] = nums[ix]

for run, df in dfs_1633594__603.items():
    dfc = df['BM_RooFit_MP_GradMinimizer_workspace_file_NumCPUInConfigFile'].copy()
    dfc['NumCPU'] = num_from_run[run]
    dfs_total_timings_1633594__603[run] = dfc

Combine them per run, because otherwise the benchmark_numbers won't match (they only count within a run).


In [ ]:
# df_baseline_timings_1633594__603 = pd.concat([df['BM_RooFit_RooMinimizer_workspace_file_NumCPUInConfigFile'] for df in dfs_1633594__603])
# vanilla_t, vanilla_t_std = df_baseline_timings_1633594__603['real_time'].mean() / 1000, df_baseline_timings_1633594__603['real_time'].std() / 1000

In [ ]:
_timing_types = {
    'update': 'update state',
    'gradient': 'gradient work',
    'terminate': 'terminate',
#     'partial derivatives': 'partial derivative'
}

_comb = {}
for i in range(1633594, 1633604):
    _comb[i] = combine_detailed_with_gbench_timings_by_name(dfs_total_timings_1633594__603[i],
                                                   dfs_split_timings_1633594__603[i],
                                                   timing_types=_timing_types,
                                                   add_ideal=['gradient'],
                                                   exclude_from_rest=['partial derivatives'])

_df = pd.concat(_comb.values())

_g = sns.relplot(data=_df,
            x='NumCPU', y='time [s]', style="real or ideal",
            hue='timing_type',
            markers=True, err_style="bars", legend='full', kind="line")
_g.fig.set_size_inches(10,6)
_g.axes[0,0].set_xlim((0,33))
# linestyle = {'color': 'black', 'lw': 0.7}
# _g.axes[0,0].axhline(vanilla_t, **linestyle)
# _g.axes[0,0].axhline(vanilla_t - vanilla_t_std, alpha=0.5, **linestyle)
# _g.axes[0,0].axhline(vanilla_t + vanilla_t_std, alpha=0.5, **linestyle)

A rather big rest term now! It's linear though...

Could this be the cost of the extra prints we now introduced all over the place? Especially the update_state ones produce a lot of output.

Only one way to really find out: run again without all those added prints. We're not using them yet anyway.

... But we can do a quick estimate as well. A flush costs about 10 microseconds according to the interwebs. The output files of the runs (which each contain 3 repeats) have the following number of lines:

  • 32 workers: 5309080
  • 16 workers: 2647773
  • 8 workers: 1419130
  • 1 worker: 345466

This means that the flushing overhead per minimization is:


In [ ]:
{'32': 5309080/3/100000,
 '16': 2647773/3/100000,
 '8': 1419130/3/100000,
 '1': 345466/3/100000}

Ok, so that's not the dominant contribution to the rest term, although it may explain the slight upward trend at the end; we could hardly call that trend significant, though.

Let's look at the line search timings as well. There is one line_search print after each gradient in these runs, which is fine. There's also a whole bunch of other line_search prints at the end of the output file which belong to the RooMinimizer run, so we shouldn't include those. We therefore modify the function above to only take the line_search lines directly below a gradient output block.

Also, we'll add the queue and worker process update_real timings here.


In [ ]:
def group_timing_lines(bm_iteration_lines):
    """
    Group lines (from one benchmark iteration) by gradient call,
    specifying:
    - Update time on master
    - update_real times on queue and workers
    - Gradient work time
    - For all partial derivatives a sublist of all lines
    - After each gradient block, there may be line_search times,
      these are also included as a sublist
    Finally, the terminate time for the entire bm_iteration is also
    returned (last line in the list).
    """
    gradient_calls = []

    start_indices = []
    end_indices = []
    line_search_lines = []
    update_queue = []
    update_worker = []
    # flag to check whether we are still in the same gradient call
    in_block = False
    for ix, line in enumerate(bm_iteration_lines[:-1]):  # -1: leave out terminate line
        if not in_block and (line[:9] == 'worker_id' or line[:24] == '[#0] DEBUG: -- worker_id'):
            start_indices.append(ix)
            in_block = True
        elif 'update_state' in line:
            end_indices.append(ix)
            in_block = False
        # the rest has nothing to do with the gradient call block, so we don't touch in_block there:
        elif 'line_search' in line:
            line_search_lines.append(line)
        elif 'update_real on queue' in line:
            update_queue.append(line)
        elif 'update_real on worker' in line:
            update_worker.append(line)
    
    if len(start_indices) != len(end_indices):
        raise Exception(f"Number of start and end indices unequal (resp. {len(start_indices)} and {len(end_indices)})!")
        
    for ix in range(len(start_indices)):
        partial_derivatives, job_times = separate_partderi_job_time_lines(bm_iteration_lines[start_indices[ix]:end_indices[ix]])
        gradient_calls.append({
            'gradient_total': bm_iteration_lines[end_indices[ix]],
            'partial_derivatives': partial_derivatives,
            'job_times': job_times
        })

    try:
        terminate_line = bm_iteration_lines[-1]
    except IndexError:
        terminate_line = None

    return gradient_calls, line_search_lines, update_queue, update_worker, terminate_line


def build_df_split_timing_run(timing_grouped_lines_list, line_search_lines, update_queue, update_worker, terminate_line):
    data = {'time [s]': [], 'timing_type': [], 'worker_id': [], 'task': []}

    for gradient_call_timings in timing_grouped_lines_list:
        words = gradient_call_timings['gradient_total'].split()

        data['time [s]'].append(float(words[4][:-2]))
        data['timing_type'].append('update state')
        data['worker_id'].append(None)
        data['task'].append(None)

        data['time [s]'].append(float(words[11][:-1]))
        data['timing_type'].append('gradient work')
        data['worker_id'].append(None)
        data['task'].append(None)
        
        for partial_derivative_line in gradient_call_timings['partial_derivatives']:
            words = partial_derivative_line.split()
            try:
                data['worker_id'].append(words[4][:-1])
                data['task'].append(words[6][:-1])
                data['time [s]'].append(float(words[10][:-1]))
                data['timing_type'].append('partial derivative')
            except ValueError as e:
                print(words)
                raise e

    for line_search_line in line_search_lines:
        words = line_search_line.split()
        data['time [s]'].append(float(words[1][:-1]))
        data['timing_type'].append('line search')
        data['worker_id'].append(None)
        data['task'].append(None)

    for line in update_queue:
        words = line.split()
        data['time [s]'].append(float(words[6][:-1]))
        data['timing_type'].append('update_real on queue')
        data['worker_id'].append(None)
        data['task'].append(None)

    for line in update_worker:
        words = line.split()
        data['time [s]'].append(float(words[7][:-1]))
        data['timing_type'].append('update_real on worker')
        data['worker_id'].append(int(words[6][:-1]))
        data['task'].append(None)

    words = terminate_line.split()
    data['time [s]'].append(float(words[4][:-1]))
    data['timing_type'].append('terminate')
    data['worker_id'].append(None)
    data['task'].append(None)

    return pd.DataFrame(data)


def build_dflist_split_timing_info(fn):
    bm_iterations = extract_split_timing_info(fn)

    dflist = []
    for bm in bm_iterations:
        grouped_lines, line_search_lines, update_queue, update_worker, terminate_line = group_timing_lines(bm)
        if terminate_line is not None:
            dflist.append(build_df_split_timing_run(grouped_lines, line_search_lines, update_queue, update_worker, terminate_line))

    return dflist

In [ ]:
dfs_split_timings_1633594__603_v2 = {}
for i in range(1633594, 1633604):
    dfs_split_timings_1633594__603_v2[i] = build_comb_df_split_timing_info(f'../rootbench/{i}.burrell.nikhef.nl.out')

In [ ]:
_timing_types = {
    'update': 'update state',
    'gradient': 'gradient work',
    'terminate': 'terminate',
    'line search': 'line search',
    'update_real on queue': 'update_real on queue',
    'update_real on worker': 'update_real on worker'
#     'partial derivatives': 'partial derivative'
}

add_ideal_cols = ['gradient']

_comb = {}
for i in range(1633594, 1633604):
    _comb[i] = combine_detailed_with_gbench_timings_by_name(dfs_total_timings_1633594__603[i],
                                                   dfs_split_timings_1633594__603_v2[i],
                                                   timing_types=_timing_types,
                                                   exclude_from_rest=['partial derivatives',
                                                                      'update_real on queue',
                                                                      'update_real on worker'])

_df = pd.concat(_comb.values())
for col in add_ideal_cols:
    df_ideal = RooFitMP_analysis.add_ideal_timings(_df[_df['timing_type'] == col],
                                                   time_col='time [s]', return_ideal=True, ideal_stat='mean')
    df_ideal['timing_type'] = col
    _df = pd.concat([_df, df_ideal], sort=True)

_g = sns.relplot(data=_df,
            x='NumCPU', y='time [s]', style="real or ideal",
            hue='timing_type',
            markers=True, err_style="bars", legend='full', kind="line")
_g.fig.set_size_inches(16,8)
_g.axes[0,0].set_xlim((0,33))
# linestyle = {'color': 'black', 'lw': 0.7}
# _g.axes[0,0].axhline(vanilla_t, **linestyle)
# _g.axes[0,0].axhline(vanilla_t - vanilla_t_std, alpha=0.5, **linestyle)
# _g.axes[0,0].axhline(vanilla_t + vanilla_t_std, alpha=0.5, **linestyle)

Ok, so the line search is not insignificant, but also not the whole story.

The update_real on the queue also grows a bit, but nothing too shocking.

The update_real on the workers is not really useful as a total aggregate, since we're really only interested in the time that each worker individually "wastes", i.e. the time between when the master thinks it's done updating (but then the queue and workers still have to process it) and when the worker starts to do actual work.
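
What would be more useful is a per-worker view. As a minimal sketch (assuming run 1633603 is the 32-worker job, per the run-to-NumCPU mapping above):


In [ ]:
_ur = dfs_split_timings_1633594__603_v2[1633603]
_ur = _ur[_ur['timing_type'] == 'update_real on worker']
# distribution of update_real times per individual worker
_ur.groupby('worker_id')['time [s]'].describe()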

Ah, another possibility for the rest term is actually the constant term optimization. We should measure this as well, because if indeed it is so big, it might become interesting to turn it off, or at least set it to 1 instead of 2. The latter option may give a factor 2 slower runs in single core mode, but if it can scale down due to parallelism, it might still become faster in the end!

That still doesn't explain the discrepancy in real vs ideal of the gradient timing though. For that we really need to dive into the absolute timings to find out whether there are significant delays between jobs on the workers.

A simple first tally of some of the output may already tell us a few things. For instance, let's look at how much time workers spend asking for work when there's nothing left:

for i in {594..603}; do no_work=$(grep "no work" 1633${i}.burrell.nikhef.nl.out | wc -l); job_done=$(grep "job done" 1633${i}.burrell.nikhef.nl.out | wc -l); echo "$i: $job_done / $no_work"; done

The result for these runs is:

594:    34512 /      178
595:    34512 /      586
596:    34512 /     6083
597:    34512 /    33391
598:    34512 /    32171
599:    34512 /    63563
600:    34512 /    87326
601:    34512 /   107505
602:    34512 /   231764
603:    34512 /   684300

That's quite the rise! To see exactly what kind of impact this has, we'd have to dig deeper, though. It may be pretty much harmless: all it should do is delay the queue loop in processing actually useful worker messages, and since the busy workers are probably not producing useful results every nanosecond, small delays in processing their messages may not be that important.

Timestamps

Let's do one more modification round to also read in the absolute timestamps. We'll first leave out the update_real timestamps, since we're more interested in the delays between the ask and the start of work, between work and the end of the task, and especially between jobs.


In [ ]:
def extract_split_timing_info(fn):
    """
    Group lines by benchmark iteration, starting from migrad until
    after the forks have been terminated.
    """
    with open(fn, 'r') as fh:
        lines = fh.read().splitlines()

    bm_iterations = []

    start_indices = []
    end_indices = []
    for ix, line in enumerate(lines):
        if 'start migrad' in line:
            if 'start migrad' in lines[ix - 1]:  # sometimes 'start migrad' appears twice
                start_indices.pop()
            start_indices.append(ix)
        elif 'terminate: ' in line:
            end_indices.append(ix)

    if len(start_indices) != len(end_indices):
        raise Exception(f"Number of start and end indices unequal (resp. {len(start_indices)} and {len(end_indices)})!")

    for ix in range(len(start_indices)):
        bm_iterations.append(lines[start_indices[ix]:end_indices[ix] + 1])

    return bm_iterations


def group_timing_lines(bm_iteration_lines):
    """
    Group lines (from one benchmark iteration) by gradient call,
    specifying:
    - Update time on master
    - update_real times on queue and workers
    - Gradient work time
    - For all partial derivatives a sublist of all lines
    - After each gradient block, there may be line_search times,
      these are also included as a sublist
    Finally, the terminate time for the entire bm_iteration is also
    returned (last line in the list).
    
    In each gradient, also timestamps are printed. These are not
    further subdivided in this function, but are output as part of
    the `gradient_calls` list for further processing elsewhere.
    """
    gradient_calls = []

    start_indices = []
    end_indices = []
    line_search_lines = []
    update_queue = []
    update_worker = []
    # flag to check whether we are still in the same gradient call
    in_block = False
    for ix, line in enumerate(bm_iteration_lines[:-1]):  # -1: leave out terminate line
        if not in_block and (line[:9] == 'worker_id' or line[:24] == '[#0] DEBUG: -- worker_id'):
            start_indices.append(ix)
            in_block = True
        elif 'update_state' in line:
            end_indices.append(ix)
            in_block = False
        # the rest has nothing to do with the gradient call block, so we don't touch in_block there:
        elif 'line_search' in line:
            line_search_lines.append(line)
        elif 'update_real on queue' in line:
            update_queue.append(line)
        elif 'update_real on worker' in line:
            update_worker.append(line)
        elif 'start migrad' in line:
            start_migrad_line = line
        elif 'end migrad' in line:
            end_migrad_line = line
    
    if len(start_indices) != len(end_indices):
        raise Exception(f"Number of start and end indices unequal (resp. {len(start_indices)} and {len(end_indices)})!")
        
    for ix in range(len(start_indices)):
        partial_derivatives, timestamps = separate_partderi_job_time_lines(bm_iteration_lines[start_indices[ix]:end_indices[ix]])
        gradient_calls.append({
            'gradient_total': bm_iteration_lines[end_indices[ix]],
            'partial_derivatives': partial_derivatives,
            'timestamps': timestamps
        })

    try:
        terminate_line = bm_iteration_lines[-1]
    except IndexError:
        terminate_line = None

    return gradient_calls, line_search_lines, update_queue, update_worker, terminate_line, start_migrad_line, end_migrad_line
    

def build_df_stamps(grouped_lines, start_migrad_line, end_migrad_line):
    data = {'timestamp': [], 'stamp_type': [], 'worker_id': []}

    words = start_migrad_line.split()
    data['timestamp'].append(int(words[6]))
    data['stamp_type'].append('start migrad')
    data['worker_id'].append(None)

    NumCPU = int(words[10])

    words = end_migrad_line.split()
    data['timestamp'].append(int(words[6]))
    data['stamp_type'].append('end migrad')
    data['worker_id'].append(None)

    for gradient_group in grouped_lines:
        for line in gradient_group['timestamps']:
            if 'no work' in line:
                words = line.split()
                data['worker_id'].append(int(words[6]))
                data['timestamp'].append(int(words[9]))
                data['stamp_type'].append('no job - asked')

                data['worker_id'].append(int(words[6]))
                data['timestamp'].append(int(words[14]))
                data['stamp_type'].append('no job - denied')
            elif 'job done' in line:
                words = line.split()
                data['worker_id'].append(int(words[6]))
                data['timestamp'].append(int(words[9][:-1]))
                data['stamp_type'].append('job done - asked')

                data['worker_id'].append(int(words[6]))
                data['timestamp'].append(int(words[12]))
                data['stamp_type'].append('job done - started')

                data['worker_id'].append(int(words[6]))
                data['timestamp'].append(int(words[16]))
                data['stamp_type'].append('job done - finished')
            elif 'update_real' in line:
                # discard for now
                pass
            else:
                raise Exception("got a weird line:\n" + line)

    return pd.DataFrame(data), NumCPU


def build_dflist_split_timing_info(fn, extract_fcn=extract_split_timing_info):
    bm_iterations = extract_fcn(fn)

    dflist = []
    dflist_stamps = []
    NumCPU_list = []
    for bm in bm_iterations:
        grouped_lines, line_search_lines, update_queue, update_worker, terminate_line, start_migrad_line, end_migrad_line = group_timing_lines(bm)
        if terminate_line is not None:
            dflist.append(build_df_split_timing_run(grouped_lines, line_search_lines, update_queue, update_worker, terminate_line))

            df_stamps, NumCPU = build_df_stamps(grouped_lines, start_migrad_line, end_migrad_line)
            dflist_stamps.append(df_stamps)
            NumCPU_list.append(NumCPU)

    return dflist, dflist_stamps, NumCPU_list


def build_comb_df_split_timing_info(fn, extract_fcn=extract_split_timing_info):
    dflist, dflist_stamps, NumCPU_list = build_dflist_split_timing_info(fn, extract_fcn=extract_fcn)

    for ix, df in enumerate(dflist):
        df_pardiff = df[df["timing_type"] == "partial derivative"]
        N_tasks = len(df_pardiff["task"].unique())
        N_gradients = len(df_pardiff) // N_tasks
        gradient_indices = np.hstack([i * np.ones(N_tasks, dtype='int') for i in range(N_gradients)])

        df["gradient number"] = pd.Series(dtype='Int64')
        df.loc[df["timing_type"] == "partial derivative", "gradient number"] = gradient_indices
        
        dflist_stamps[ix]["gradient number"] = pd.Series(dtype='Int64')
        dflist_stamps[ix].loc[dflist_stamps[ix]["stamp_type"] == "job done - asked", "gradient number"] = gradient_indices
        dflist_stamps[ix].loc[dflist_stamps[ix]["stamp_type"] == "job done - started", "gradient number"] = gradient_indices
        dflist_stamps[ix].loc[dflist_stamps[ix]["stamp_type"] == "job done - finished", "gradient number"] = gradient_indices

        df["benchmark_number"] = ix
        dflist_stamps[ix]["benchmark_number"] = ix

    # assuming the stamps are ordered properly, which I'm pretty sure is correct,
    # we can do ffill:
    df_stamps = pd.concat(dflist_stamps)
    df_stamps.loc[~df_stamps['stamp_type'].str.contains('migrad'), 'gradient number'] = df_stamps.loc[~df_stamps['stamp_type'].str.contains('migrad'), 'gradient number'].ffill()
        
    return pd.concat(dflist), df_stamps

In [ ]:
df_split_timings_1633594_v3, df_stamps_1633594 = build_comb_df_split_timing_info('../rootbench/1633594.burrell.nikhef.nl.out')

In [ ]:
df_stamps_1633594[df_stamps_1633594['stamp_type'].str.contains('no job')]

ACAT 2019

Some quick runs for ACAT 2019 that require custom analysis.


In [ ]:
def load_acat19_out(fn):
    """
    These are just single migrad runs, so there is no need for further
    splitting by run. Adds a dummy terminate line, because the other
    functions expect one, as well as dummy start and end migrad lines.
    """
    with open(fn, 'r') as fh:
        lines = fh.read().splitlines()
    print(lines[-1])
    lines = ['[#0] DEBUG: -- start migrad at 0 with NumCPU = 0'] + lines
    lines.append('[#0] DEBUG: -- end migrad at 0')
    lines.append('[#0] DEBUG: -- terminate: 0.0s')    
    return [lines]

In [ ]:
fn_acat19_1552415151 = '/Users/pbos/projects/apcocsm/code/acat19_1552415151.out'

df_split_timings_acat19_1552415151, df_stamps_acat19_1552415151 = build_comb_df_split_timing_info(fn_acat19_1552415151,
                                                                                 extract_fcn=load_acat19_out)

In [ ]:
df_split_timings_acat19_1552415151.groupby('timing_type')['time [s]'].sum()

In [ ]:
579.161-536.84-15.586-1.225684

That's good: at least the rest term is constant. This is important because, in reality, this fit will take about an hour, so 25 seconds is acceptable overhead.


In [ ]:
df_split_timings_1633594_v3.groupby(['benchmark_number', 'timing_type'])['time [s]'].sum()

In [ ]:
536.840400/216.742200, 15.586356/5.496111

Indeed, the other terms scale by a factor of about 2.5 to 3, while the rest term remains constant.

RooMinimizer timings for comparison


In [ ]:
dfs_baseline_timings_1633594__603 = {}

for run, df in dfs_1633594__603.items():
    dfc = df['BM_RooFit_RooMinimizer_workspace_file_NumCPUInConfigFile'].copy()
    dfc['NumCPU'] = 1
    dfs_baseline_timings_1633594__603[run] = dfc

df_baseline_timings_1633594__603 = pd.concat(dfs_baseline_timings_1633594__603.values())

In [ ]:
vanilla_t, vanilla_t_std = df_baseline_timings_1633594__603['real_time'].mean() / 1000, df_baseline_timings_1633594__603['real_time'].std() / 1000

In [ ]:
_timing_types = {
    'update': 'update state',
    'gradient': 'gradient work',
    'terminate': 'terminate',
    'line search': 'line search',
#     'update_real on queue': 'update_real on queue',
#     'update_real on worker': 'update_real on worker'
#     'partial derivatives': 'partial derivative'
}

add_ideal_cols = ['gradient']

_comb = {}
for i in range(1633594, 1633604):
    _comb[i] = combine_detailed_with_gbench_timings_by_name(dfs_total_timings_1633594__603[i],
                                                   dfs_split_timings_1633594__603_v2[i],
                                                   timing_types=_timing_types,
                                                   exclude_from_rest=['partial derivatives',
                                                                      'update_real on queue',
                                                                      'update_real on worker'])

_df = pd.concat(_comb.values())
# for col in add_ideal_cols:
#     df_ideal = RooFitMP_analysis.add_ideal_timings(_df[_df['timing_type'] == col],
#                                                    time_col='time [s]', return_ideal=True, ideal_stat='mean')
#     df_ideal['timing_type'] = col
#     _df = _df.append(df_ideal, sort=True)

# _g = sns.relplot(data=_df,
#             x='NumCPU', y='time [s]', style="real or ideal",
#             hue='timing_type',
#             markers=True,
#                  err_style="bars",
#                  legend='full', kind="line")
_g = sns.catplot(data=_df, 
            x='NumCPU', y='time [s]', style="real or ideal",
            hue='timing_type',
#             markers=True,
#                  err_style="bars",
                 legend='full', kind="point")
_g.fig.set_size_inches(16,8)
# _g.axes[0,0].set_xlim((0,33))

linestyle = {'color': 'black', 'lw': 0.7}
_g.axes[0,0].axhline(vanilla_t, **linestyle)
_g.axes[0,0].axhline(vanilla_t - vanilla_t_std, alpha=0.5, **linestyle)
_g.axes[0,0].axhline(vanilla_t + vanilla_t_std, alpha=0.5, **linestyle)

In [ ]:
(dfs_total_timings_1633594__603[1633594]['real_time'] / 1000).mean(), vanilla_t

In [ ]:
fig, ax = plt.subplots(1, 1, sharey=True, figsize=(14, 8))

_timing_types = ['gradient', 'line search', 'update', 'terminate', 'rest']

colors = plt.cm.get_cmap('tab10', 10)

first_nc = True
for nc in _df['NumCPU'].unique():
    prev_time = 0
    for ix_ttype, ttype in enumerate(_timing_types):
        time_mu  = _df[(_df['NumCPU'] == nc) & (_df['timing_type'] == ttype)]['time [s]'].mean()
        time_std = _df[(_df['NumCPU'] == nc) & (_df['timing_type'] == ttype)]['time [s]'].std()
        if first_nc:
            label = ttype
        else:
            label = ""
        ax.bar(str(nc), time_mu, bottom=prev_time, color=colors(ix_ttype), label=label)
        prev_time += float(time_mu)
    first_nc = False
plt.legend()

Calculate speed-up

Speed-up is calculated w.r.t. the mean time of the single-core run, not w.r.t. the old RooMinimizer timing, because for the latter we only measure the total time, not the separate timing types.


In [ ]:
_df_speedup = {'NumCPU': [], 'timing_type': [], 'speedup': []}

for ttype in _df['timing_type'].unique():
    mean_single_core_time = _df[(_df['NumCPU'] == 1) & (_df['timing_type'] == ttype)]['time [s]'].mean()
    for nc in _df['NumCPU'].unique():
        mean_time = _df[(_df['NumCPU'] == nc) & (_df['timing_type'] == ttype)]['time [s]'].mean()
        _df_speedup['NumCPU'].append(nc)
        _df_speedup['timing_type'].append(ttype)
        _df_speedup['speedup'].append(mean_single_core_time / mean_time)

_df_speedup = pd.DataFrame(_df_speedup)

In [ ]:
with sns.axes_style('whitegrid'):
    _g = sns.catplot(data=_df_speedup, x='NumCPU', y='speedup', hue='timing_type', kind='bar')
    _g.fig.set_size_inches(14,8)

Back to timestamps


In [ ]:
_st = df_stamps_1633594
_st_b0 = _st[_st['benchmark_number'] == 0]

In [ ]:
start_stamps = _st[_st['stamp_type'] == 'start migrad']
start_stamps
# _st[~_st['stamp_type'].str.contains('migrad')]

In [ ]:
end_stamps = _st[_st['stamp_type'] == 'end migrad']
end_stamps

In [ ]:
start_st0 = _st_b0[_st_b0['stamp_type'] == 'start migrad']['timestamp'].iloc[0]
end_st0   = _st_b0[_st_b0['stamp_type'] == 'end migrad']['timestamp'].iloc[0]

In [ ]:
(end_st0 - start_st0)/1.e9

In [ ]:
_st_b0[~_st_b0['stamp_type'].str.contains('migrad')].head()

We can get two kinds of worker-loop overhead out of the "job done" stamps: "explicit" overhead in the time between asked and started, and "implicit" overhead in the time between a finished job and the ask for the next one.
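
To make the two definitions concrete, here's a toy sketch with made-up timestamps (in nanoseconds, like the real stamps), mirroring the calculations below:


In [ ]:
ask = pd.Series([0, 130])    # worker asks for a task
sta = pd.Series([10, 140])   # task actually starts
fin = pd.Series([100, 250])  # task finishes
explicit = (sta - ask).sum() / 1.e9  # wait between ask and start
implicit = (ask.iloc[1:].reset_index(drop=True)
            - fin.iloc[:-1].reset_index(drop=True)).sum() / 1.e9  # gap between jobs
explicit, implicit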


In [ ]:
_st_b0_jd = _st_b0[_st_b0['stamp_type'].str.contains('job done')]

In [ ]:
_st_b0_jd_ask = _st_b0_jd[_st_b0_jd['stamp_type'].str.contains('asked')]
_st_b0_jd_sta = _st_b0_jd[_st_b0_jd['stamp_type'].str.contains('started')]
_st_b0_jd_fin = _st_b0_jd[_st_b0_jd['stamp_type'].str.contains('finished')]

Explicit:


In [ ]:
(_st_b0_jd_sta.reset_index()['timestamp'] - _st_b0_jd_ask.reset_index()['timestamp']).sum()/1.e9

Implicit:


In [ ]:
(_st_b0_jd_ask.iloc[1:].reset_index()['timestamp'] - _st_b0_jd_fin.iloc[:-1].reset_index()['timestamp']).sum()/1.e9

Do these sum together with the actual partial derivative times to the total gradient time?


In [ ]:
_df_split = dfs_split_timings_1633594__603_v2[1633594]
_pd_time = _df_split[(_df_split["timing_type"] == "partial derivative")
                     & (_df_split["benchmark_number"] == 0)]['time [s]'].sum()
_grad_time = _df_split[(_df_split["timing_type"] == "gradient work")
                       & (_df_split["benchmark_number"] == 0)]['time [s]'].sum()
_grad_time, _pd_time

Oh, it's too much... wait, probably the times between gradients are longer, so they should be filtered out first.


In [ ]:
impl_overhead = 0
for g in _st_b0_jd_ask['gradient number'].unique():
    _ask_g = _st_b0_jd_ask[_st_b0_jd_ask['gradient number'] == g]
    _fin_g = _st_b0_jd_fin[_st_b0_jd_fin['gradient number'] == g]
    impl_overhead += (_ask_g.iloc[1:].reset_index()['timestamp'] - _fin_g.iloc[:-1].reset_index()['timestamp']).sum()/1.e9
impl_overhead

In [ ]:
216.74220000000003 - (0.779148892 + 1.015019575 + 212.982649869)

Ok, that makes more sense. The remaining ~2 seconds could be any number of things, like communication delays with the queue, or delays caused by update_real.

TODO

  • Also check the wait time from job rejections (see the sketch below).
  • Include delays from update_real
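
For the first item, a minimal sketch using the "no job" stamps parsed above (the asked and denied stamps come from the same output line, so they pair up by position):


In [ ]:
_no = df_stamps_1633594[df_stamps_1633594['stamp_type'].str.contains('no job')]
_no_ask = _no[_no['stamp_type'] == 'no job - asked']
_no_den = _no[_no['stamp_type'] == 'no job - denied']
# total time workers spent waiting on rejected asks, in seconds
(_no_den.reset_index()['timestamp'] - _no_ask.reset_index()['timestamp']).sum() / 1.e9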

Distribution of parallel task times


In [ ]:
_df_split = dfs_split_timings_1633594__603_v2[1633594]
_pdtime = _df_split[_df_split['timing_type'] == 'partial derivative']['time [s]']

# fig, ax = plt.subplots(1, 1, figsize=(14,8))
_g = sns.distplot(_pdtime, kde=False)#, ax=ax)
_g.set_yscale('log')

ACAT 2019 talk plots


In [ ]:
sns.reset_orig()
sns.set_context('talk')
sns.set_style('whitegrid')

In [ ]:
_df = combine_detailed_with_gbench_timings_by_name(df_total_timings_1538069,
                                                   df_split_timings_1538069,
                                                   {'update': 'update state',
                                                    'gradient': 'gradient work',
                                                    'terminate': 'terminate'})

fig, ax = plt.subplots(1, 1, sharey=True, figsize=(14, 8), dpi=200)

_timing_types = ['gradient', 'update', 'terminate', 'rest']

colors = plt.cm.get_cmap('tab10', 10)

lighten = lambda c: min(0.3*c, 1)

first_nc = True
for nc in _df['NumCPU'].unique():
    prev_time = 0
    for ix_ttype, ttype in enumerate(_timing_types):
        time_mu  = _df[(_df['NumCPU'] == nc) & (_df['timing_type'] == ttype)]['time [s]'].mean()
        time_std = _df[(_df['NumCPU'] == nc) & (_df['timing_type'] == ttype)]['time [s]'].std()
        if first_nc:
            label = ttype
        else:
            label = ""
        color = colors(ix_ttype)
        ecolor = (lighten(color[0]), lighten(color[1]), lighten(color[2]), 0.8)
        ax.bar(str(nc), time_mu, bottom=prev_time, color=color, label=label,
               yerr=time_std, ecolor=ecolor, capsize=5)
        prev_time += float(time_mu)
    first_nc = False

plt.legend()

ax.set_xlabel('workers')
ax.set_ylabel('time [s]')

_vanilla_t, _vanilla_t_std = df_baseline_timings_1538069['real_time'].mean() / 1000, df_baseline_timings_1538069['real_time'].std() / 1000

linestyle = {'color': 'black', 'lw': 0.7}
ax.axhline(_vanilla_t, **linestyle)
ax.axhline(_vanilla_t - _vanilla_t_std, alpha=0.5, **linestyle)
ax.axhline(_vanilla_t + _vanilla_t_std, alpha=0.5, **linestyle)

In [ ]:
_df_speedup = {'NumCPU': [], 'timing type': [], 'mean speedup': []}

ttypes = _df['timing_type'].unique()

for ttype in ttypes:
    mean_single_core_time = _df[(_df['NumCPU'] == 1) & (_df['timing_type'] == ttype)]['time [s]'].mean()
    for nc in _df['NumCPU'].unique():
        mean_time = _df[(_df['NumCPU'] == nc) & (_df['timing_type'] == ttype)]['time [s]'].mean()
        _df_speedup['NumCPU'].append(nc)
        _df_speedup['timing type'].append(ttype)
        _df_speedup['mean speedup'].append(mean_single_core_time / mean_time)

_df_speedup = pd.DataFrame(_df_speedup)

_g = sns.catplot(data=_df_speedup, x='NumCPU', y='mean speedup', hue='timing type', kind='point', palette='tab10', hue_order=_timing_types + ['total'])
_g.fig.set_size_inches(11,8)
_g.fig.set_dpi(200)
_g.axes[0,0].set_xlabel('workers')

In [ ]:
_timing_types = {
    'update': 'update state',
    'gradient': 'gradient work',
    'terminate': 'terminate',
    'line search': 'line search',
}

add_ideal_cols = ['gradient']

_comb = {}
for i in range(1633594, 1633604):
    _comb[i] = combine_detailed_with_gbench_timings_by_name(dfs_total_timings_1633594__603[i],
                                                   dfs_split_timings_1633594__603_v2[i],
                                                   timing_types=_timing_types,
                                                   exclude_from_rest=['partial derivatives',
                                                                      'update_real on queue',
                                                                      'update_real on worker'])

_df = pd.concat(_comb.values())


fig, ax = plt.subplots(1, 1, sharey=True, figsize=(14, 8), dpi=200)

_timing_types = ['gradient', 'line search', 'update', 'terminate', 'rest']

colors = plt.cm.get_cmap('tab10', 10)

lighten = lambda c: min(0.3*c, 1)

first_nc = True
for nc in _df['NumCPU'].unique():
    prev_time = 0
    for ix_ttype, ttype in enumerate(_timing_types):
        time_mu  = _df[(_df['NumCPU'] == nc) & (_df['timing_type'] == ttype)]['time [s]'].mean()
        time_std = _df[(_df['NumCPU'] == nc) & (_df['timing_type'] == ttype)]['time [s]'].std()
        if first_nc:
            label = ttype
        else:
            label = ""
        color = colors(ix_ttype)
        ecolor = (lighten(color[0]), lighten(color[1]), lighten(color[2]), 0.8)
        ax.bar(str(nc), time_mu, bottom=prev_time, color=color, label=label,
               yerr=time_std, ecolor=ecolor, capsize=5)
        prev_time += float(time_mu)
    first_nc = False

plt.legend()

ax.set_xlabel('workers')
ax.set_ylabel('time [s]')

# _vanilla_t, _vanilla_t_std = df_baseline_timings_1633594__603['real_time'].mean() / 1000, df_baseline_timings_1633594__603['real_time'].std() / 1000

# linestyle = {'color': 'black', 'lw': 0.7}
# ax.axhline(_vanilla_t, **linestyle)
# ax.axhline(_vanilla_t - _vanilla_t_std, alpha=0.5, **linestyle)
# ax.axhline(_vanilla_t + _vanilla_t_std, alpha=0.5, **linestyle)

In [ ]:
_df_speedup = {'NumCPU': [], 'timing type': [], 'speedup': []}

ttypes = _df['timing_type'].unique()

for ttype in ttypes:
    mean_single_core_time = _df[(_df['NumCPU'] == 1) & (_df['timing_type'] == ttype)]['time [s]'].mean()
    for nc in _df['NumCPU'].unique():
#         mean_time = _df[(_df['NumCPU'] == nc) & (_df['timing_type'] == ttype)]['time [s]'].mean()
        times = _df[(_df['NumCPU'] == nc) & (_df['timing_type'] == ttype)]['time [s]']
        for time in times:
            _df_speedup['NumCPU'].append(nc)
            _df_speedup['timing type'].append(ttype)
            _df_speedup['speedup'].append(mean_single_core_time / float(time))

_df_speedup = pd.DataFrame(_df_speedup)

_g = sns.catplot(data=_df_speedup, x='NumCPU', y='speedup', hue='timing type',
                 kind='point', palette='tab10', hue_order=_timing_types + ['total'])
_g.fig.set_size_inches(11,8)
_g.fig.set_dpi(200)
_g.axes[0,0].set_xlabel('workers')

Extrapolate total time to longer run times: 10 minutes, 1 hour, 2 hours. The idea: keep the measured single-core overhead (total minus gradient) fixed and scale the gradient times by the factor that makes the mean single-core total hit the target run time.


In [ ]:
# in the actual run
total_runtime = _df[(_df['NumCPU'] == 1) & (_df['timing_type'] == 'total')]['time [s]'].mean()  # seconds
gradient = _df[(_df['NumCPU'] == 1) & (_df['timing_type'] == 'gradient')]['time [s]'].mean()
total_overhead = total_runtime - gradient

_df_total_times = _df[_df['timing_type'] == 'total'].copy()
_df_total_times['timing_type'] = 'total (~4min)'


def extrapolate_total_time(target_seconds, label):
    """Scale the gradient timings so that the mean single-core total hits target_seconds."""
    data = {'NumCPU': [], 'timing_type': [], 'time [s]': []}
    extrap_factor = (target_seconds - total_overhead) / gradient
    for nc in _df['NumCPU'].unique():
        times = _df[(_df['NumCPU'] == nc) & (_df['timing_type'] == 'gradient')]['time [s]']
        for time in times:
            data['NumCPU'].append(nc)
            data['timing_type'].append(label)
            data['time [s]'].append(float(time) * extrap_factor + total_overhead)
    return pd.DataFrame(data)


_df_extrapolate_10min = extrapolate_total_time(10 * 60, 'total (10min)')
_df_extrapolate_1h = extrapolate_total_time(60 * 60, 'total (1h)')
_df_extrapolate_2h = extrapolate_total_time(2 * 60 * 60, 'total (2h)')

In [ ]:
_df_extr_speedup = {'NumCPU': [], 'timing type': [], 'speedup': []}

for extr_time, _df_extr in {'true run': _df_total_times,
                            '10min': _df_extrapolate_10min,
                            '1h': _df_extrapolate_1h,
                            '2h': _df_extrapolate_2h,
                            }.items():
    ttypes = _df_extr['timing_type'].unique()

    for ttype in ttypes:
        mean_single_core_time = _df_extr[(_df_extr['NumCPU'] == 1) & (_df_extr['timing_type'] == ttype)]['time [s]'].mean()
        for nc in _df_extr['NumCPU'].unique():
            times = _df_extr[(_df_extr['NumCPU'] == nc) & (_df_extr['timing_type'] == ttype)]['time [s]']
            for time in times:
                _df_extr_speedup['NumCPU'].append(nc)
                _df_extr_speedup['timing type'].append(ttype)
                _df_extr_speedup['speedup'].append(mean_single_core_time / float(time))

_df_extr_speedup = pd.DataFrame(_df_extr_speedup)

_g = sns.catplot(data=_df_extr_speedup, x='NumCPU', y='speedup', hue='timing type',
                 kind='point', palette='tab10')
_g.fig.set_size_inches(11,8)
_g.fig.set_dpi(200)
_g.axes[0,0].set_xlabel('workers')

Ok, talk done. Back to:

Back to back to timestamps


In [ ]:
_st = df_stamps_1633594

Let's try to just plot the timeline to visualize the load on the worker during the whole run.


In [ ]:
_sta = _st_b0_jd_sta['timestamp'] - start_st0
_fin = _st_b0_jd_fin['timestamp'] - start_st0

fig, ax = plt.subplots(1, 1, figsize=(20, 3))

for ix in range(len(_sta)):
# for ix in range(2000):
    ax.barh('worker 0', _fin.iloc[ix] - _sta.iloc[ix], left=_sta.iloc[ix],
            color='red', linewidth=0)

ax.set_xlim((0, end_st0 - start_st0))

In [ ]:
len(_fin), len(_sta)

Goddamn, that takes forever to complete. Seems like matplotlib cannot handle too many bars at the same time.

But ok, no shockers here.
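
One way around this, as a sketch (not run here): matplotlib's broken_barh draws all the intervals of one row as a single collection instead of one patch per bar, which should be much faster.


In [ ]:
# _sta and _fin come from different masks, so align them by position, not by index
_sta_v = _sta.to_numpy()
_fin_v = _fin.to_numpy()

fig, ax = plt.subplots(1, 1, figsize=(20, 3))
# one (left, width) tuple per job, drawn as a single collection
ax.broken_barh(list(zip(_sta_v, _fin_v - _sta_v)), (0, 1),
               facecolors='red', linewidth=0)
ax.set_yticks([0.5])
ax.set_yticklabels(['worker 0'])
ax.set_xlim((0, end_st0 - start_st0))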

Now for a multi-worker run:


In [ ]:
_, df_stamps_1633603 = build_comb_df_split_timing_info('../rootbench/1633603.burrell.nikhef.nl.out')

In [ ]:
_, df_stamps_1633602 = build_comb_df_split_timing_info('../rootbench/1633602.burrell.nikhef.nl.out')

In [ ]:
_, df_stamps_1633601 = build_comb_df_split_timing_info('../rootbench/1633601.burrell.nikhef.nl.out')

In [ ]:
_bench_nr = (df_stamps_1633601['benchmark_number'] == 0)

_start_st = df_stamps_1633601[_bench_nr
                              & (df_stamps_1633601['stamp_type'] == 'start migrad')
                             ]['timestamp'].iloc[0]
_end_st = df_stamps_1633601[_bench_nr
                              & (df_stamps_1633601['stamp_type'] == 'end migrad')
                             ]['timestamp'].iloc[0]

_sta = df_stamps_1633601[_bench_nr
                         & (df_stamps_1633601['stamp_type'].str.contains('started'))
                        ]#['timestamp'] - _start_st
_fin = df_stamps_1633601[_bench_nr
                         & (df_stamps_1633601['stamp_type'].str.contains('finished'))
                        ]#['timestamp'] - _start_st

In [ ]:
len(_sta), len(_fin), _end_st, _start_st

In [ ]:
fig, ax = plt.subplots(1, 1, figsize=(20, 5))
ax.set_ylabel('worker')
ax.set_xlim((0, _end_st - _start_st))

batches = int(len(_sta)/1000) + 1  # get around matplotlib limits?

for batch in range(batches):
    for ix in range(batch * 1000, min((batch + 1) * 1000, len(_sta))):
        worker = _fin.iloc[ix]["worker_id"]
        ax.barh(worker,
                _fin.iloc[ix]["timestamp"] - _sta.iloc[ix]["timestamp"],
                left=_sta.iloc[ix]["timestamp"] - _start_st,
                color='red', linewidth=0)

In [ ]:
import matplotlib as mpl
mpl.use('qt5Agg')
%matplotlib inline

In [ ]:
batch_size = 2000

batches = int(len(_sta)/batch_size) + 1  # get around matplotlib limits?

for batch in range(batches):
    fig, ax = plt.subplots(1, 1, figsize=(20, 5))
    ax.set_ylabel('worker')
    ax.set_xlim((0, _end_st - _start_st))
    
    for ix in range(min(batch * batch_size, len(_sta)),
                    min((batch + 1) * batch_size, len(_sta))):
        worker = _fin.iloc[ix]["worker_id"]
        ax.barh(worker,
                _fin.iloc[ix]["timestamp"] - _sta.iloc[ix]["timestamp"],
                left=_sta.iloc[ix]["timestamp"] - _start_st,
                color='red', linewidth=0)

In [ ]:
for grad_nr in df_stamps_1633601['gradient number'].dropna().unique():
    # NA entries (the migrad stamps) would poison the boolean mask, so fill them with False
    _grad_nr = (df_stamps_1633601['gradient number'] == grad_nr).fillna(False)

    _sta = df_stamps_1633601[_bench_nr & _grad_nr
                             & (df_stamps_1633601['stamp_type'].str.contains('started'))
                            ]#['timestamp'] - _start_st
    _fin = df_stamps_1633601[_bench_nr & _grad_nr
                             & (df_stamps_1633601['stamp_type'].str.contains('finished'))
                            ]#['timestamp'] - _start_st

    fig, ax = plt.subplots(1, 1, figsize=(20, 5))
    ax.set_ylabel('worker')
    # ax.set_xlim((0, _end_st - _start_st))

    for ix in range(len(_sta)):
        worker = _fin.iloc[ix]["worker_id"]
        ax.barh(worker,
                _fin.iloc[ix]["timestamp"] - _sta.iloc[ix]["timestamp"],
                left=_sta.iloc[ix]["timestamp"] - _start_st,
                color='red', linewidth=0)

In [ ]:
_st = df_stamps_1633601
_st_b0 = _st[_st['benchmark_number'] == 0]

_jd = _st_b0['stamp_type'].str.contains('job done')

_st_b0_jd_ask = _st_b0[_jd & _st_b0['stamp_type'].str.contains('asked')]
_st_b0_jd_sta = _st_b0[_jd & _st_b0['stamp_type'].str.contains('started')]
_st_b0_jd_fin = _st_b0[_jd & _st_b0['stamp_type'].str.contains('finished')]
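
# explicit delay: time between a worker asking for a task and starting it;
# implicit delay: time between finishing a task and asking for the next one
# (timestamps presumably in ns, hence the /1.e9 below).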

explicit_delay = 0
implicit_delay = 0
for g in _st_b0['gradient number'].unique():
    for w in _st_b0['worker_id'].unique():
        _sta_g = _st_b0_jd_sta[(_st_b0_jd_sta['gradient number'] == g) & (_st_b0_jd_sta['worker_id'] == w)]
        _ask_g = _st_b0_jd_ask[(_st_b0_jd_ask['gradient number'] == g) & (_st_b0_jd_ask['worker_id'] == w)]
        _fin_g = _st_b0_jd_fin[(_st_b0_jd_fin['gradient number'] == g) & (_st_b0_jd_fin['worker_id'] == w)]
        explicit_delay += (_sta_g.reset_index()['timestamp'] - _ask_g.reset_index()['timestamp']).sum()/1.e9
        implicit_delay += (_ask_g.iloc[1:].reset_index()['timestamp'] - _fin_g.iloc[:-1].reset_index()['timestamp']).sum()/1.e9

explicit_delay, implicit_delay

In [ ]:
_st = df_stamps_1633602
_st_b0 = _st[_st['benchmark_number'] == 0]

_jd = _st_b0['stamp_type'].str.contains('job done')

_st_b0_jd_ask = _st_b0[_jd & _st_b0['stamp_type'].str.contains('asked')]
_st_b0_jd_sta = _st_b0[_jd & _st_b0['stamp_type'].str.contains('started')]
_st_b0_jd_fin = _st_b0[_jd & _st_b0['stamp_type'].str.contains('finished')]

explicit_delay = 0
implicit_delay = 0
for g in _st_b0['gradient number'].unique():
    for w in _st_b0['worker_id'].unique():
        _sta_g = _st_b0_jd_sta[(_st_b0_jd_sta['gradient number'] == g) & (_st_b0_jd_sta['worker_id'] == w)]
        _ask_g = _st_b0_jd_ask[(_st_b0_jd_ask['gradient number'] == g) & (_st_b0_jd_ask['worker_id'] == w)]
        _fin_g = _st_b0_jd_fin[(_st_b0_jd_fin['gradient number'] == g) & (_st_b0_jd_fin['worker_id'] == w)]
        explicit_delay += (_sta_g.reset_index()['timestamp'] - _ask_g.reset_index()['timestamp']).sum()/1.e9
        implicit_delay += (_ask_g.iloc[1:].reset_index()['timestamp'] - _fin_g.iloc[:-1].reset_index()['timestamp']).sum()/1.e9

explicit_delay, implicit_delay

In [ ]:
_st = df_stamps_1633603
_st_b0 = _st[_st['benchmark_number'] == 0]

_jd = _st_b0['stamp_type'].str.contains('job done')

_st_b0_jd_ask = _st_b0[_jd & _st_b0['stamp_type'].str.contains('asked')]
_st_b0_jd_sta = _st_b0[_jd & _st_b0['stamp_type'].str.contains('started')]
_st_b0_jd_fin = _st_b0[_jd & _st_b0['stamp_type'].str.contains('finished')]

explicit_delay = 0
implicit_delay = 0
for g in _st_b0['gradient number'].unique():
    for w in _st_b0['worker_id'].unique():
        _sta_g = _st_b0_jd_sta[(_st_b0_jd_sta['gradient number'] == g) & (_st_b0_jd_sta['worker_id'] == w)]
        _ask_g = _st_b0_jd_ask[(_st_b0_jd_ask['gradient number'] == g) & (_st_b0_jd_ask['worker_id'] == w)]
        _fin_g = _st_b0_jd_fin[(_st_b0_jd_fin['gradient number'] == g) & (_st_b0_jd_fin['worker_id'] == w)]
        explicit_delay += (_sta_g.reset_index()['timestamp'] - _ask_g.reset_index()['timestamp']).sum()/1.e9
        implicit_delay += (_ask_g.iloc[1:].reset_index()['timestamp'] - _fin_g.iloc[:-1].reset_index()['timestamp']).sum()/1.e9

explicit_delay, implicit_delay

None of that is very spectacular, except for the 32-worker run, but that might also be due to it actually having 34 processes to run (including the queue and master processes).

Let's now sum up the delays caused by having to wait for one of the workers at the end, plus the corresponding effect at the beginning (the latter effect is a lot smaller; the workers start more synchronously than they finish).

There are several ways you can measure the impact of these worker idle times.

  • You can sum the total time that all workers spend waiting, which gives the total potential calculation time that went unused while the slowest worker was still working.
  • But this doesn't tell you how much faster the run could actually have been. To get the time you could have saved with perfect load balancing, divide the above number by the number of workers; that is the actual wall-clock time lost to imperfect load balancing (see the toy example below).
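
A toy example with made-up numbers to illustrate the difference:


In [ ]:
# Toy example (made-up numbers, not from a run): 8 workers whose summed idle
# time over a whole run is 40 s.
n_workers = 8
summed_idle_time = 40.0  # s, summed over all workers and gradients
# With perfect load balancing the run would have finished earlier by only
summed_idle_time / n_workers  # -> 5.0 s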

In [ ]:
_st = df_stamps_1633601
for b in _st['benchmark_number'].dropna().unique():
    _st_b0 = _st[_st['benchmark_number'] == b]

    _jd = _st_b0['stamp_type'].str.contains('job done')

    _st_b0_jd_ask = _st_b0[_jd & _st_b0['stamp_type'].str.contains('asked')]
    _st_b0_jd_sta = _st_b0[_jd & _st_b0['stamp_type'].str.contains('started')]
    _st_b0_jd_fin = _st_b0[_jd & _st_b0['stamp_type'].str.contains('finished')]

    start_delay = 0
    finish_delay = 0
    for g in _st_b0['gradient number'].dropna().unique():
        min_start = _st_b0_jd_sta[(_st_b0_jd_sta['gradient number'] == g)]['timestamp'].min()
        max_finish = _st_b0_jd_fin[(_st_b0_jd_fin['gradient number'] == g)]['timestamp'].max()
        for w in _st_b0['worker_id'].dropna().unique():
            start_delay += (_st_b0_jd_sta[(_st_b0_jd_sta['gradient number'] == g)
                                          & (_st_b0_jd_sta['worker_id'] == w)
                                         ]['timestamp'].min() - min_start) / 1.e9
            finish_delay += (max_finish - _st_b0_jd_fin[(_st_b0_jd_fin['gradient number'] == g)
                                           & (_st_b0_jd_fin['worker_id'] == w)
                                          ]['timestamp'].max()) / 1.e9
    print('benchmark', b, 'with', len(_st_b0['worker_id'].dropna().unique()), 'workers')
    print('start delay: ', start_delay)
    print('finish delay:', finish_delay)
    print('total delay: ', start_delay + finish_delay)
    print('time lost due to imperfect load balancing: ', (start_delay + finish_delay) / len(_st_b0['worker_id'].dropna().unique()))

Ok, so that is negligible.

CPU time

Suggestion by Wouter: compare the partial derivative wall time with the CPU time. This way we can find out whether something is really being delayed in the calculation itself, or whether we should look elsewhere.

Did another acat19.cpp run including CPU timing, now with 8 workers and without the mu = 1.5 line.

We have to modify the functions again for this.


In [ ]:
def build_df_split_timing_run(timing_grouped_lines_list, line_search_lines, update_queue, update_worker, terminate_line):
    data = {'time [s]': [], 'timing_type': [], 'worker_id': [], 'task': []}

    for gradient_call_timings in timing_grouped_lines_list:
        words = gradient_call_timings['gradient_total'].split()

        data['time [s]'].append(float(words[4][:-2]))
        data['timing_type'].append('update state')
        data['worker_id'].append(None)
        data['task'].append(None)

        data['time [s]'].append(float(words[11][:-1]))
        data['timing_type'].append('gradient work')
        data['worker_id'].append(None)
        data['task'].append(None)
        
        for partial_derivative_line in gradient_call_timings['partial_derivatives']:
            words = partial_derivative_line.split()
            try:
                data['worker_id'].append(words[4][:-1])
                data['task'].append(words[6][:-1])
                data['time [s]'].append(float(words[10][:-1]))
                data['timing_type'].append('partial derivative')
                
                if len(words) > 13:
                    data['worker_id'].append(words[4][:-1])
                    data['task'].append(words[6][:-1])
                    data['time [s]'].append(float(words[13][:-1]))
                    data['timing_type'].append('partial derivative CPU time')
            except ValueError as e:
                print(words)
                raise e

    for line_search_line in line_search_lines:
        words = line_search_line.split()
        data['time [s]'].append(float(words[1][:-1]))
        data['timing_type'].append('line search')
        data['worker_id'].append(None)
        data['task'].append(None)

    for line in update_queue:
        words = line.split()
        data['time [s]'].append(float(words[6][:-1]))
        data['timing_type'].append('update_real on queue')
        data['worker_id'].append(None)
        data['task'].append(None)

    for line in update_worker:
        words = line.split()
        data['time [s]'].append(float(words[7][:-1]))
        data['timing_type'].append('update_real on worker')
        data['worker_id'].append(int(words[6][:-1]))
        data['task'].append(None)

    words = terminate_line.split()
    data['time [s]'].append(float(words[4][:-1]))
    data['timing_type'].append('terminate')
    data['worker_id'].append(None)
    data['task'].append(None)

    return pd.DataFrame(data)

In [ ]:
fn_acat19_1552664372 = '/Users/pbos/projects/apcocsm/code/acat19_1552664372.out'

df_split_timings_acat19_1552664372, df_stamps_acat19_1552664372 = build_comb_df_split_timing_info(fn_acat19_1552664372,
                                                                                 extract_fcn=load_acat19_out)

In [ ]:
df_split_timings_acat19_1552664372[df_split_timings_acat19_1552664372['timing_type'] == 'partial derivative']['time [s]'].sum(),\
df_split_timings_acat19_1552664372[df_split_timings_acat19_1552664372['timing_type'] == 'partial derivative CPU time']['time [s]'].sum()

Those are almost completely equal, which is good news: we don't have to look anywhere else, like in scheduling or something. There is really something strange going on in the partial derivatives themselves that makes their total time grow.

Note that these numbers are not comparable in an absolute sense to the earlier big fit runs, because those ran on the scheduled nodes, while this was run on the Stoomboot headnode! Indeed, the headnode seems to have 2400 MHz cores, while Google Benchmark reports 1996 MHz (weird, but ok) for the compute nodes.

What about the big REST block?

We may be onto the culprit. Put timing around the SetInitialGradient function...

Shit, nope. See the acat19_1552671011.out file. Three times ~0.0003 seconds. That's nothing. In hindsight, it makes sense: SetInitialGradient never calls the function (the likelihood), so it's not at all expensive.

Wouter suspected that somehow Simplex gets run somewhere before Migrad, but I haven't been able to find it anywhere... at least not starting from RooMinimizer::migrad. Maybe it is somewhere else, perhaps somewhere in the Migrad/Minuit setup.

We could time the Migrad Seed creation to be sure...

But maybe it's easier to just throw in some timestamps, then at least we can pin down where in migrad things are happening.


In the meantime, why not take a look at those...

update_state timestamps


In [ ]:
def group_timing_lines(bm_iteration_lines):
    """
    Group lines (from one benchmark iteration) by gradient call,
    specifying:
    - Update time on master
    - update_real times on queue and workers
    - Gradient work time
    - For all partial derivatives, a sublist of all lines
    - After each gradient block there may be line_search times;
      these are also included as a sublist
    Finally, the terminate time for the entire bm_iteration is also
    returned (last line in the list).

    Timestamps are also printed in each gradient. These are not
    further subdivided in this function, but are output as part of
    the `gradient_calls` list for further processing elsewhere.
    """
    gradient_calls = []

    start_indices = []
    end_indices = []
    line_search_lines = []
    update_queue = []
    update_worker = []
    # flag to check whether we are still in the same gradient call
    in_block = False
    for ix, line in enumerate(bm_iteration_lines[:-1]):  # -1: leave out terminate line
        if not in_block and (line[:9] == 'worker_id' or line[:24] == '[#0] DEBUG: -- worker_id'):
            start_indices.append(ix)
            in_block = True
        elif 'update_state' in line:
            end_indices.append(ix)
            in_block = False
        # the rest has nothing to do with the gradient call block, so we don't touch in_block there:
        elif 'line_search' in line:
            line_search_lines.append(line)
        elif 'update_real on queue' in line:
            update_queue.append(line)
        elif 'update_real on worker' in line:
            update_worker.append(line)
        elif 'start migrad' in line:
            start_migrad_line = line
        elif 'end migrad' in line:
            end_migrad_line = line
    
    if len(start_indices) != len(end_indices):
        raise Exception(f"Number of start and end indices unequal (resp. {len(start_indices)} and {len(end_indices)})!")
        
    for ix in range(len(start_indices)):
        partial_derivatives, timestamps = separate_partderi_job_time_lines(bm_iteration_lines[start_indices[ix]:end_indices[ix]])
        gradient_calls.append({
            'gradient_total': bm_iteration_lines[end_indices[ix]],
            'partial_derivatives': partial_derivatives,
            'timestamps': timestamps
        })

    try:
        terminate_line = bm_iteration_lines[-1]
    except IndexError:
        terminate_line = None
        
    special_lines = dict(terminate_line=terminate_line,
                         start_migrad_line=start_migrad_line,
                         end_migrad_line=end_migrad_line)

    return gradient_calls, line_search_lines, update_queue, update_worker, special_lines


def build_df_stamps(grouped_lines, special_lines, update_queue, update_worker):
    data = {'timestamp': [], 'stamp_type': [], 'worker_id': []}

    words = special_lines["start_migrad_line"].split()
    shift = 3 if '[#' in words[0] else 0
    data['timestamp'].append(int(words[3 + shift]))
    data['stamp_type'].append('start migrad')
    data['worker_id'].append(None)

    if len(words) > 10:
        NumCPU = int(words[10])
    else:
        NumCPU = 0

    words = special_lines["end_migrad_line"].split()
    shift = 3 if '[#' in words[0] else 0
    data['timestamp'].append(int(words[3 + shift]))
    data['stamp_type'].append('end migrad')
    data['worker_id'].append(None)

    for gradient_group in grouped_lines:
        for line in gradient_group['timestamps']:
            if 'no work' in line:
                words = line.split()
                shift = 3 if '[#' in words[0] else 0
                data['worker_id'].append(int(words[3 + shift]))
                data['timestamp'].append(int(words[6 + shift]))
                data['stamp_type'].append('no job - asked')

                data['worker_id'].append(int(words[3 + shift]))
                data['timestamp'].append(int(words[11 + shift]))
                data['stamp_type'].append('no job - denied')
            elif 'job done' in line:
                words = line.split()
                shift = 3 if '[#' in words[0] else 0
                data['worker_id'].append(int(words[3 + shift]))
                data['timestamp'].append(int(words[6 + shift][:-1]))
                data['stamp_type'].append('job done - asked')

                data['worker_id'].append(int(words[3 + shift]))
                data['timestamp'].append(int(words[9 + shift]))
                data['stamp_type'].append('job done - started')

                data['worker_id'].append(int(words[3 + shift]))
                data['timestamp'].append(int(words[13 + shift]))
                data['stamp_type'].append('job done - finished')
            else:
                raise Exception("got a weird line:\n" + line)

    for line in update_queue:
        words = line.split()
        shift = 3 if '[#' in words[0] else 0
        data['worker_id'].append(None)
        data['timestamp'].append(int(words[5 + shift]))
        data['stamp_type'].append('update_real queue start')

        data['worker_id'].append(None)
        data['timestamp'].append(int(words[7 + shift][:-3]))
        data['stamp_type'].append('update_real queue end')

    for line in update_worker:
        words = line.split()
        shift = 3 if '[#' in words[0] else 0
        data['worker_id'].append(int(words[3 + shift][:-1]))
        data['timestamp'].append(int(words[6 + shift]))
        data['stamp_type'].append('update_real worker start')

        data['worker_id'].append(int(words[3 + shift][:-1]))
        data['timestamp'].append(int(words[8 + shift][:-3]))
        data['stamp_type'].append('update_real worker end')

    return pd.DataFrame(data), NumCPU


def build_dflist_split_timing_info(fn, extract_fcn=extract_split_timing_info):
    bm_iterations = extract_fcn(fn)

    dflist = []
    dflist_stamps = []
    NumCPU_list = []
    for bm in bm_iterations:
        grouped_lines, line_search_lines, update_queue, update_worker, special_lines = group_timing_lines(bm)
        if special_lines["terminate_line"] is not None:
            dflist.append(build_df_split_timing_run(grouped_lines, line_search_lines, update_queue, update_worker, special_lines["terminate_line"]))

            df_stamps, NumCPU = build_df_stamps(grouped_lines, special_lines, update_queue, update_worker)
            dflist_stamps.append(df_stamps)
            NumCPU_list.append(NumCPU)

    return dflist, dflist_stamps, NumCPU_list


def build_comb_df_split_timing_info(fn, extract_fcn=extract_split_timing_info):
    dflist, dflist_stamps, NumCPU_list = build_dflist_split_timing_info(fn, extract_fcn=extract_fcn)

    for ix, df in enumerate(dflist):
        df_pardiff = df[df["timing_type"] == "partial derivative"]
        N_tasks = len(df_pardiff["task"].unique())
        N_gradients = len(df_pardiff) // N_tasks
        gradient_indices = np.hstack([i * np.ones(N_tasks, dtype='int') for i in range(N_gradients)])  # list, not generator; newer numpy rejects generators here

        df["gradient number"] = pd.Series(dtype='Int64')
        df.loc[df["timing_type"] == "partial derivative", "gradient number"] = gradient_indices
        
        dflist_stamps[ix]["gradient number"] = pd.Series(dtype='Int64')
        dflist_stamps[ix].loc[dflist_stamps[ix]["stamp_type"] == "job done - asked", "gradient number"] = gradient_indices
        dflist_stamps[ix].loc[dflist_stamps[ix]["stamp_type"] == "job done - started", "gradient number"] = gradient_indices
        dflist_stamps[ix].loc[dflist_stamps[ix]["stamp_type"] == "job done - finished", "gradient number"] = gradient_indices

        df["benchmark_number"] = ix
        dflist_stamps[ix]["benchmark_number"] = ix

    # assuming the stamps are ordered properly, which I'm pretty sure is correct,
    # we can do ffill:
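    # (ffill assigns the 'no job' and update_real stamps the gradient number of
    # the preceding 'job done' stamp; migrad stamps are excluded)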
    df_stamps = pd.concat(dflist_stamps)
    df_stamps.loc[~df_stamps['stamp_type'].str.contains('migrad'), 'gradient number'] = df_stamps.loc[~df_stamps['stamp_type'].str.contains('migrad'), 'gradient number'].fillna(method='ffill')
        
    return pd.concat(dflist), df_stamps

In [ ]:
# _, df_stamps_1633603 = build_comb_df_split_timing_info('../rootbench/1633603.burrell.nikhef.nl.out')

In [ ]:
_, df_stamps_1633602 = build_comb_df_split_timing_info('../rootbench/1633602.burrell.nikhef.nl.out')

In [ ]:
_, df_stamps_1633601 = build_comb_df_split_timing_info('../rootbench/1633601.burrell.nikhef.nl.out')

In [ ]:
_st = df_stamps_1633601
_st['stamp_type'].unique()

update_real queue timeline


In [ ]:
_st = df_stamps_1633601

_bench_nr = (_st['benchmark_number'] == 0)

_start_st = _st[_bench_nr
                              & (_st['stamp_type'] == 'start migrad')
                             ]['timestamp'].iloc[0]
_end_st = _st[_bench_nr
                              & (_st['stamp_type'] == 'end migrad')
                             ]['timestamp'].iloc[0]

_sta = _st[_bench_nr
                         & (_st['stamp_type'] == 'update_real queue start')
                        ]#['timestamp'] - _start_st
_fin = _st[_bench_nr
                         & (_st['stamp_type'] == 'update_real queue end')
                        ]#['timestamp'] - _start_st

fig, ax = plt.subplots(1, 1, figsize=(20, 5))
# ax.set_ylabel('worker')
ax.set_xlim((0, _end_st - _start_st))

# worker = _fin.iloc[ix]["worker_id"]
ax.plot(_sta["timestamp"] - _start_st,
        np.zeros_like(_sta["timestamp"]),
        color='blue', marker='|', linestyle='', alpha=0.5)
ax.plot(_fin["timestamp"] - _start_st,
        np.zeros_like(_sta["timestamp"]),
        color='red', marker='|', linestyle='', alpha=0.5)

Cool, this works a lot better than the bar plots; at least this way we can show everything.

Let's also try for the job stamps:


In [ ]:
_st = df_stamps_1633601

_bench_nr = (_st['benchmark_number'] == 0)

_start_st = _st[_bench_nr
                              & (_st['stamp_type'] == 'start migrad')
                             ]['timestamp'].iloc[0]
_end_st = _st[_bench_nr
                              & (_st['stamp_type'] == 'end migrad')
                             ]['timestamp'].iloc[0]

_sta = _st[_bench_nr
                         & (_st['stamp_type'].str.contains('started'))
                        ]#['timestamp'] - _start_st
_fin = _st[_bench_nr
                         & (_st['stamp_type'].str.contains('finished'))
                        ]#['timestamp'] - _start_st

fig, ax = plt.subplots(1, 1, figsize=(20, 5))
ax.set_ylabel('worker')
ax.set_xlim((0, _end_st - _start_st))

ax.plot(_sta["timestamp"] - _start_st, _sta["worker_id"],
        color='green', marker='|', linestyle='', alpha=0.5)
ax.plot(_fin["timestamp"] - _start_st, _fin["worker_id"],
        color='blue', marker='|', linestyle='', alpha=0.5)

Nice! Let's combine:


In [ ]:
def plot_timestamps(_st, bench_nr=0, figsize=(20, 5)):
    vert_offset = 0.1

    _bench_nr = (_st['benchmark_number'] == bench_nr)

    _start_st = _st[_bench_nr & (_st['stamp_type'] == 'start migrad')]['timestamp'].iloc[0]
    _end_st = _st[_bench_nr & (_st['stamp_type'] == 'end migrad')]['timestamp'].iloc[0]

    fig, ax = plt.subplots(1, 1, figsize=figsize)
    ax.set_ylabel('worker')
    ax.set_xlim((0, _end_st - _start_st))

    # update @ queue
    _sta = _st[_bench_nr & (_st['stamp_type'].str.contains('update_real queue start'))]
    _fin = _st[_bench_nr & (_st['stamp_type'].str.contains('update_real queue end'))]

    ax.plot(_sta["timestamp"] - _start_st, np.zeros_like(_sta["timestamp"]) - 1 - vert_offset,
            color='red', marker='|', linestyle='', alpha=0.5)
    ax.plot(_fin["timestamp"] - _start_st, np.zeros_like(_fin["timestamp"]) - 1 + vert_offset,
            color='red', marker='|', linestyle='', alpha=0.5)

    # update @ workers
    _sta = _st[_bench_nr & (_st['stamp_type'].str.contains('update_real worker start'))]
    _fin = _st[_bench_nr & (_st['stamp_type'].str.contains('update_real worker end'))]

    ax.plot(_sta["timestamp"] - _start_st, _sta["worker_id"] - vert_offset,
            color='red', marker='|', linestyle='', alpha=0.5)
    ax.plot(_fin["timestamp"] - _start_st, _fin["worker_id"] + vert_offset,
            color='red', marker='|', linestyle='', alpha=0.5)

    # jobs
    _sta = _st[_bench_nr & (_st['stamp_type'].str.contains('started'))]
    _fin = _st[_bench_nr & (_st['stamp_type'].str.contains('finished'))]

    ax.plot(_sta["timestamp"] - _start_st, _sta["worker_id"] - vert_offset,
            color='blue', marker='|', linestyle='', alpha=0.8)
    ax.plot(_fin["timestamp"] - _start_st, _fin["worker_id"] + vert_offset,
            color='blue', marker='|', linestyle='', alpha=0.8)

    # out of jobs!
    _sta = _st[_bench_nr & (_st['stamp_type'].str.contains('no job - asked'))]
    _fin = _st[_bench_nr & (_st['stamp_type'].str.contains('no job - denied'))]

    ax.plot(_sta["timestamp"] - _start_st, _sta["worker_id"] - vert_offset,
            color='grey', marker='|', linestyle='', alpha=0.3)
    ax.plot(_fin["timestamp"] - _start_st, _fin["worker_id"] + vert_offset,
            color='grey', marker='|', linestyle='', alpha=0.3)
    
    # migrad timestamps
    migrad = _st[_bench_nr & (_st['stamp_type'].str.contains('migrad timestamp'))]
    
    ax.plot(migrad["timestamp"] - _start_st, np.zeros_like(migrad["timestamp"]) - 2,
            color='black', marker='|', linestyle='')

    # setfcn timestamps
    _sta = _st[_bench_nr & (_st['stamp_type'].str.contains('start SetFCN'))]
    _fin = _st[_bench_nr & (_st['stamp_type'].str.contains('end SetFCN'))]
    
    ax.plot(_sta["timestamp"] - _start_st, np.zeros_like(_sta["timestamp"]) - 2 - vert_offset,
            color='violet', marker='|', linestyle='', alpha=0.9)
    ax.plot(_fin["timestamp"] - _start_st, np.zeros_like(_fin["timestamp"]) - 2 + vert_offset,
            color='violet', marker='|', linestyle='', alpha=0.9)
    
    # GradFcnSynchronize timestamps
    GradFcnSynchronize = _st[_bench_nr & (_st['stamp_type'].str.contains('GradFcnSynchronize timestamp'))]
    
    ax.plot(GradFcnSynchronize["timestamp"] - _start_st, np.zeros_like(GradFcnSynchronize["timestamp"]) - 2,
            color='tab:green', marker='|', linestyle='', alpha=0.9)
    
    # setup_differentiate timestamps
    _this = _st[_bench_nr & (_st['stamp_type'].str.contains('setup_differentiate timestamps'))]
    
    ax.plot(_this["timestamp"] - _start_st, _this['worker_id'],
            color='tab:orange', marker='|', linestyle='', alpha=0.9)

    return fig, ax

In [ ]:
plot_timestamps(df_stamps_1633601)

Ok, interesting observation: there is actually a long job at the start of every gradient calculation iteration on every worker. This hints quite strongly at either recalculation of the cache or perhaps something to do with the sync_parameters stuff... We should measure this first task of each gradient in more detail, because it might very well explain (a big part of) the non-scaling of the gradient!
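
A quick way to quantify this (a sketch; it assumes the started and finished stamps pair up one-to-one and in order per worker, and that timestamps are in ns):


In [ ]:
_st_b0 = df_stamps_1633601[df_stamps_1633601['benchmark_number'] == 0]
_sta = _st_b0[_st_b0['stamp_type'] == 'job done - started']
_fin = _st_b0[_st_b0['stamp_type'] == 'job done - finished']
_dur = pd.DataFrame({'worker_id': _sta['worker_id'].values,
                     'gradient number': _sta['gradient number'].values,
                     'duration [s]': (_fin['timestamp'].values - _sta['timestamp'].values) / 1.e9})
_per_task = _dur.groupby(['gradient number', 'worker_id'])['duration [s]']
_per_task.first().mean(), _per_task.mean().mean()  # first task vs. average task per gradient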

Between the blue and the red parts the line search probably takes place. We don't have timestamps for it, but the line searches take between roughly 0.7 and 0.9 seconds, which fits nicely with those gaps.
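
As a cross-check (a sketch; this re-parses the split timings for this run, which were discarded above):


In [ ]:
df_split_1633601, _ = build_comb_df_split_timing_info('../rootbench/1633601.burrell.nikhef.nl.out')
df_split_1633601[df_split_1633601['timing_type'] == 'line search']['time [s]'].describe()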

Does ZeroMQ performance explain the update state durations?

Did some measurements of the ZeroMQ performance using the provided performance measurement tools. At least on my Macbook, the red parts should take about 0.1s for 8 workers, so this ~1 second is a bit mysterious. It could be that the performance is different on Stoomboot though...

No, actually it turns out the performance of the Stoomboot headnode (where these runs were done) is a bit higher than that of my Macbook! So it should be even more than a factor of 10 faster!
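
For reference, a minimal pyzmq sketch of this kind of throughput check (assuming pyzmq is available; these are not the official ZeroMQ perf tools used for the measurements above). The synchronous send/recv loop avoids filling the high-water mark, so it measures latency-bound throughput, i.e., a lower bound:


In [ ]:
import time
import zmq

ctx = zmq.Context()
pull = ctx.socket(zmq.PULL)
pull.bind('tcp://127.0.0.1:5555')
push = ctx.socket(zmq.PUSH)
push.connect('tcp://127.0.0.1:5555')

n_msg, msg = 10000, b'x' * 1024  # double len(msg) to test the scaling with message size
t0 = time.time()
for _ in range(n_msg):
    push.send(msg)
    pull.recv()
t1 = time.time()
print(f'{n_msg * len(msg) / (t1 - t0) / 1e6:.1f} MB/s')

push.close(); pull.close(); ctx.term()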

16 cores

Is it the case that by coincidence the first 8 tasks are longer than the rest?


In [ ]:
plot_timestamps(df_stamps_1633602)

Todo

Discussed with Wouter, we can make two quick tests to see what's going on here:

  • For the longer initial tasks: let's measure this on an N-dim Gaussian (128 or something) so that we can exclude computational causes, because all components should then take exactly the same amount of time.

  • For the communication overhead, as a quick test for the ZMQ throughput: send twice the amount of data to see if the time indeed increases.

N-D Gaussian, run on the Stoomboot headnode


In [ ]:
# _, df_stamps_acat_ND_gauss_1553700233 = build_comb_df_split_timing_info('../acat19_ND_gauss_1553700233.out', extract_fcn=load_acat19_out)

Oh, that one doesn't have migrad start and end timestamps... fixed that in the next one, plus a new loading function that no longer adds the dummy migrad lines:


In [ ]:
def load_acat19_out_v2(fn):
    """
    Just single migrad runs, so no need for further splitting by run.
    Does add dummy terminate line, because the other functions expect
    this. No more dummy start and end migrad lines.
    """
    with open(fn, 'r') as fh:
        lines = fh.read().splitlines()
    print(lines[-1])
    lines.append('[#0] DEBUG: -- terminate: 0.0s')    
    return [lines]

In [ ]:
_, df_stamps_acat_ND_gauss_1553701454 = build_comb_df_split_timing_info('../acat19_ND_gauss_1553701454.out', extract_fcn=load_acat19_out_v2)

In [ ]:
fig, ax = plot_timestamps(df_stamps_acat_ND_gauss_1553701454)
ax.set_xlim(0, 0.2e10)

Let's try without randomized parameters; hopefully the run will be a bit shorter then.


In [ ]:
_, df_stamps_acat_ND_gauss_1553754554 = build_comb_df_split_timing_info('../acat19_ND_gauss_1553754554.out', extract_fcn=load_acat19_out_v2)

In [ ]:
fig, ax = plot_timestamps(df_stamps_acat_ND_gauss_1553754554)
ax.set_xlim(0, 0.2e10)

Nope, still pretty long. Same picture though: here too there's a slight increase in the runtime of the first tasks of the run.

Could this scale with the number of parameters? The above runs were with 128 parameters; let's try a run with 1024.


In [ ]:
_, df_stamps_acat_ND_gauss_1553755013 = build_comb_df_split_timing_info('../acat19_ND_gauss_1553755013.out', extract_fcn=load_acat19_out_v2)

In [ ]:
fig, ax = plot_timestamps(df_stamps_acat_ND_gauss_1553755013)
# ax.set_xlim(0, 0.2e10)

Ok, a bit weird: only one iteration in this case... maybe reactivate parameter randomization here.

Oh, one other thing: I added the "migrad timestamps", so we should read those in!


In [ ]:
def group_timing_lines(bm_iteration_lines):
    """
    Group lines (from one benchmark iteration) by gradient call,
    specifying:
    - Update time on master
    - update_real times on queue and workers
    - Gradient work time
    - For all partial derivatives, a sublist of all lines
    - After each gradient block there may be line_search times;
      these are also included as a sublist
    Finally, the terminate time for the entire bm_iteration is also
    returned (last line in the list).

    Timestamps are also printed in each gradient. These are not
    further subdivided in this function, but are output as part of
    the `gradient_calls` list for further processing elsewhere.
    """
    gradient_calls = []

    start_indices = []
    end_indices = []
    line_search_lines = []
    update_queue = []
    update_worker = []
    setfcn_timestamps = []
    setup_differentiate_timestamps = []
    
    # to prevent UnboundLocalErrors in older data files:
    GradFcnSynchronize_timestamps = None

    # flag to check whether we are still in the same gradient call
    in_block = False
    for ix, line in enumerate(bm_iteration_lines[:-1]):  # -1: leave out terminate line
        if not in_block and (line[:9] == 'worker_id' or line[:24] == '[#0] DEBUG: -- worker_id'):
            start_indices.append(ix)
            in_block = True
        elif 'update_state' in line:
            end_indices.append(ix)
            in_block = False
        # the rest has nothing to do with the gradient call block, so we don't touch in_block there:
        elif 'line_search' in line:
            line_search_lines.append(line)
        elif 'update_real on queue' in line:
            update_queue.append(line)
        elif 'update_real on worker' in line:
            update_worker.append(line)
        elif 'start migrad' in line:
            start_migrad_line = line
        elif 'end migrad' in line:
            end_migrad_line = line
        elif 'migrad timestamps' in line:
            migrad_timestamps = line
        elif 'Fitter::SetFCN timestamps' in line:
            setfcn_timestamps.append(line)
        elif 'RooGradientFunction::synchronize_parameter_settings timestamps' in line:
            GradFcnSynchronize_timestamps = line
        elif 'NumericalDerivatorMinuit2::setup_differentiate' in line:
            setup_differentiate_timestamps.append(line)
    
    if len(start_indices) != len(end_indices):
        raise Exception(f"Number of start and end indices unequal (resp. {len(start_indices)} and {len(end_indices)})!")
        
    for ix in range(len(start_indices)):
        partial_derivatives, timestamps = separate_partderi_job_time_lines(bm_iteration_lines[start_indices[ix]:end_indices[ix]])
        gradient_calls.append({
            'gradient_total': bm_iteration_lines[end_indices[ix]],
            'partial_derivatives': partial_derivatives,
            'timestamps': timestamps
        })

    try:
        terminate_line = bm_iteration_lines[-1]
    except IndexError:
        terminate_line = None
        
    special_lines = dict(terminate_line=terminate_line,
                         start_migrad_line=start_migrad_line,
                         end_migrad_line=end_migrad_line,
                         migrad_timestamps=migrad_timestamps,
                         setfcn_timestamps=setfcn_timestamps,
                         GradFcnSynchronize_timestamps=GradFcnSynchronize_timestamps,
                         setup_differentiate_timestamps=setup_differentiate_timestamps)

    return gradient_calls, line_search_lines, update_queue, update_worker, special_lines


def build_df_stamps(grouped_lines, special_lines, update_queue, update_worker):
    data = {'timestamp': [], 'stamp_type': [], 'worker_id': []}

    words = special_lines["start_migrad_line"].split()
    shift = 3 if '[#' in words[0] else 0
    data['timestamp'].append(int(words[3 + shift]))
    data['stamp_type'].append('start migrad')
    data['worker_id'].append(None)

    if len(words) > 10:
        NumCPU = int(words[10])
    else:
        NumCPU = 0

    words = special_lines["end_migrad_line"].split()
    shift = 3 if '[#' in words[0] else 0
    data['timestamp'].append(int(words[3 + shift]))
    data['stamp_type'].append('end migrad')
    data['worker_id'].append(None)

    words = special_lines["migrad_timestamps"].split()
    shift = 3 if '[#' in words[0] else 0
    for word in words[shift + 2:]:
        data['timestamp'].append(int(word))
        data['stamp_type'].append('migrad timestamp')
        data['worker_id'].append(None)
    
    if special_lines["GradFcnSynchronize_timestamps"] is not None:
        words = special_lines["GradFcnSynchronize_timestamps"].split()
        shift = 3 if '[#' in words[0] else 0
        for word in words[shift + 2:]:
            data['timestamp'].append(int(word))
            data['stamp_type'].append('GradFcnSynchronize timestamp')
            data['worker_id'].append(None)

    for line in special_lines["setfcn_timestamps"]:
        words = line.split()
        shift = 3 if '[#' in words[0] else 0
        
        data['timestamp'].append(int(words[shift + 2]))
        data['stamp_type'].append('start SetFCN')
        data['worker_id'].append(None)

        data['timestamp'].append(int(words[shift + 3]))
        data['stamp_type'].append('end SetFCN')
        data['worker_id'].append(None)


    for line in special_lines["setup_differentiate_timestamps"]:
        words = line.split()
        shift = 3 if '[#' in words[0] else 0
        worker = int(words[shift + 3][:-1])
        for word in words[shift + 5:]:
            data['timestamp'].append(int(word))
            data['stamp_type'].append('setup_differentiate timestamps')
            data['worker_id'].append(worker)


    for gradient_group in grouped_lines:
        for line in gradient_group['timestamps']:
            if 'no work' in line:
                words = line.split()
                shift = 3 if '[#' in words[0] else 0
                data['worker_id'].append(int(words[3 + shift]))
                data['timestamp'].append(int(words[6 + shift]))
                data['stamp_type'].append('no job - asked')

                data['worker_id'].append(int(words[3 + shift]))
                data['timestamp'].append(int(words[11 + shift]))
                data['stamp_type'].append('no job - denied')
            elif 'job done' in line:
                words = line.split()
                shift = 3 if '[#' in words[0] else 0
                data['worker_id'].append(int(words[3 + shift]))
                data['timestamp'].append(int(words[6 + shift][:-1]))
                data['stamp_type'].append('job done - asked')

                data['worker_id'].append(int(words[3 + shift]))
                data['timestamp'].append(int(words[9 + shift]))
                data['stamp_type'].append('job done - started')

                data['worker_id'].append(int(words[3 + shift]))
                data['timestamp'].append(int(words[13 + shift]))
                data['stamp_type'].append('job done - finished')
            elif 'NumericalDerivatorMinuit2::setup_differentiate' in line:
                pass
            elif 'fVal on worker' in line or 'fVal after line search' in line:
                pass
            else:
                raise Exception("got a weird line:\n" + line)

    for line in update_queue:
        words = line.split()
        shift = 3 if '[#' in words[0] else 0
        data['worker_id'].append(None)
        data['timestamp'].append(int(words[5 + shift]))
        data['stamp_type'].append('update_real queue start')

        data['worker_id'].append(None)
        data['timestamp'].append(int(words[7 + shift][:-3]))
        data['stamp_type'].append('update_real queue end')

    for line in update_worker:
        words = line.split()
        shift = 3 if '[#' in words[0] else 0
        data['worker_id'].append(int(words[3 + shift][:-1]))
        data['timestamp'].append(int(words[6 + shift]))
        data['stamp_type'].append('update_real worker start')

        data['worker_id'].append(int(words[3 + shift][:-1]))
        data['timestamp'].append(int(words[8 + shift][:-3]))
        data['stamp_type'].append('update_real worker end')

    return pd.DataFrame(data), NumCPU

In [ ]:
_, df_stamps_acat_ND_gauss_1553754554 = build_comb_df_split_timing_info('../acat19_ND_gauss_1553754554.out', extract_fcn=load_acat19_out_v2)

In [ ]:
fig, ax = plot_timestamps(df_stamps_acat_ND_gauss_1553754554)
ax.set_xlim(-1e9, 3e10)

In [ ]:
_start_st = df_stamps_acat_ND_gauss_1553754554[df_stamps_acat_ND_gauss_1553754554['stamp_type'] == "start migrad"]
df_stamps_acat_ND_gauss_1553754554[df_stamps_acat_ND_gauss_1553754554['stamp_type'] == "migrad timestamp"]['timestamp'] - int(_start_st["timestamp"])

Ok, clearly the only dominant part of migrad is FitFCN. New run with timestamps inside there to measure SetFCN...


In [ ]:
_, df_stamps_acat_1553758901 = build_comb_df_split_timing_info('../acat19_1553758901.out', extract_fcn=load_acat19_out_v2)

In [ ]:
fig, ax = plot_timestamps(df_stamps_acat_1553758901)

Ok, so SetFCN is not the main problem here, though it might be the cause of the two ZMQ threads problem.

But wait a minute, that's a pretty significant gap in the migrad timestamps all of a sudden! Synchronize is the major rest-term culprit!


In [ ]:
_start_st = df_stamps_acat_1553758901[df_stamps_acat_1553758901['stamp_type'] == "start migrad"]
df_stamps_acat_1553758901[df_stamps_acat_1553758901['stamp_type'] == "migrad timestamp"]['timestamp'] - int(_start_st["timestamp"])

Ok, added some timestamps there as well...


In [ ]:
_, df_stamps_acat_1553763665 = build_comb_df_split_timing_info('../acat19_1553763665.out', extract_fcn=load_acat19_out_v2)

In [ ]:
fig, ax = plot_timestamps(df_stamps_acat_1553763665)

In [ ]:
_start_st = int(df_stamps_acat_1553763665[df_stamps_acat_1553763665['stamp_type'] == "start migrad"]["timestamp"])
df_stamps_acat_1553763665[df_stamps_acat_1553763665['stamp_type'].str.contains('Grad')]["timestamp"] - _start_st

Ok, interesting: the first part is the first big loop in synchronize_parameter_settings, but the second part is not the second big loop; it's the copying of parameters from one list to the other! Both take about the same time, and the rest is insignificant.


In [ ]:
# _, df_stamps_acat_1553778980 = build_comb_df_split_timing_info('../acat19_1553778980.out', extract_fcn=load_acat19_out_v2)

Oh, forgot to add worker_id there, again:


In [ ]:
_, df_stamps_acat_1553779882 = build_comb_df_split_timing_info('../acat19_1553779882.out', extract_fcn=load_acat19_out_v2)

In [ ]:
fig, ax = plot_timestamps(df_stamps_acat_1553779882)
ax.set_xlim(2.01e10, 2.1e10)

Ok, there's definitely something there that's taking a while... But which part is it?


In [ ]:
def setup_differentiate_timestamps_duration_per_task(df, task_ix, worker_id=0):
    """Return the setup_differentiate timestamps that fall within task number
    task_ix of the given worker, relative to the first of those timestamps."""
    _df_w = df[df["worker_id"] == worker_id]
    task_start = int(_df_w[_df_w["stamp_type"].str.contains('job done - started')]['timestamp'].iloc[task_ix])
    task_end = int(_df_w[_df_w["stamp_type"].str.contains('job done - finished')]['timestamp'].iloc[task_ix])
    in_task = (_df_w['timestamp'] > task_start) & (_df_w['timestamp'] < task_end)
    _st = _df_w[_df_w["stamp_type"].str.contains('setup_differentiate') & in_task]['timestamp']
    return _st - _st.iloc[0]

In [ ]:
# worker 0, first task:
setup_differentiate_timestamps_duration_per_task(df_stamps_acat_1553779882, 0, worker_id=0)

In [ ]:
# worker 0, second task:
setup_differentiate_timestamps_duration_per_task(df_stamps_acat_1553779882, 1, worker_id=0)

Ok, so it's rather clear that most of the time in the first task goes to the stuff between t6 and t7, which is... as we guessed: the function call! fVal has to be calculated once at the start for that set of parameters. But wait, doesn't that hold for all tasks? I think it does.

To be sure, let's check out a few more tasks:


In [ ]:
def setup_differentiate_fVal_duration(df, task_ix, worker_id=0):
    _ts = setup_differentiate_timestamps_duration_per_task(df, task_ix, worker_id=worker_id)
    return _ts.iloc[6] - _ts.iloc[5]

In [ ]:
# all first tasks:
print([setup_differentiate_fVal_duration(df_stamps_acat_1553779882, 0, worker_id=w) for w in range(8)])

In [ ]:
# all 2-10th tasks:
print([setup_differentiate_fVal_duration(df_stamps_acat_1553779882, t, worker_id=w) for w in range(8) for t in range(1, 10)])

In [ ]:
# all 11-100th tasks:
print([setup_differentiate_fVal_duration(df_stamps_acat_1553779882, t, worker_id=w) for w in range(8) for t in range(10, 100)])

In [ ]:
# all 101-250th tasks:
print([setup_differentiate_fVal_duration(df_stamps_acat_1553779882, t, worker_id=w) for w in range(8) for t in range(100, 250)])

Yeah, seems pretty solid.

Now, what exactly causes this first fVal calculation to be so slow? Recalculation of all cached elements? If so, there is no way around it.

In that case, the only option left to make things scale better is to shorten the communication times.

Shorter communication times

  • The most obvious option is to simply send around less stuff. We could send only the updated gradients (and Hessians and step sizes) that are necessary for a certain task; these could be sent along with the task, for instance. This would save on average 5 * 3 * N_tasks * (N_workers-1)/N_workers sends, i.e., for our example here, about:

In [ ]:
5 * 3 * 1600 * 7/8

Or, more simply put, it reduces the sending workload for the gradient updates by a factor N_workers. Unfortunately, the parameter updates still have to be sent in full.
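
For reference, the same estimate as a small helper (a sketch; the factors 5 and 3 are copied from the bullet above, with 3 presumably counting the gradient, Hessian and step-size arrays; that interpretation is an assumption):


In [ ]:
# Send-count estimate from the bullet above; factors taken verbatim from the text.
def saved_sends(n_tasks, n_workers):
    return 5 * 3 * n_tasks * (n_workers - 1) / n_workers

saved_sends(1600, 8)  # -> 21000.0, matching the cell above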

8 April

Benchmark with added none_have_been_calculated parameter


In [ ]:
_, df_stamps_acat_1554710451 = build_comb_df_split_timing_info('../acat19_1554710451.out', extract_fcn=load_acat19_out_v2)

In [ ]:
fig, ax = plot_timestamps(df_stamps_acat_1554710451)

Hm, that doesn't really seem to help much, unfortunately... putting it next to an earlier run, the communication time is more or less the same:


In [ ]:
fig, ax = plot_timestamps(df_stamps_acat_1553763665)

In [ ]: