In [ ]:
%matplotlib inline

In [ ]:
%load_ext autoreload

In [ ]:
%autoreload 2

In [ ]:
import numpy as np
import pandas as pd
import glob
import json
from collections import defaultdict
import matplotlib.pyplot as plt
import seaborn as sns
import altair as alt
import bqplot.pyplot as blt
import egp.plot

In [ ]:
default_columns = ('bm_name', 'NumCPU', 'manual_time')

columns = {'BM_RooFit_BinnedMultiProcGradient': ('bm_name', 'bins', 'NumCPU', 'manual_time'),
           'BM_RooFit_1DUnbinnedGaussianMultiProcessGradMinimizer': default_columns,
           'BM_RooFit_NDUnbinnedGaussianMultiProcessGradMinimizer': ('bm_name', 'NumCPU', 'dims', 'manual_time'),
           'BM_RooFit_MP_GradMinimizer_workspace_file': default_columns,
           'BM_RooFit_RooMinimizer_workspace_file': ('bm_name', 'manual_time')
          }

In [ ]:
# def remove_outliers_from_df(df, col='real_time', group='NumCPU'):
#     values = df[col]
#     values_grouped = df.groupby(group)[col]
#     max_values_grouped = (values_grouped.mean() - np.nan_to_num(values_grouped.std())*3)

#     filter_index = values >= max_values_grouped[df[group]].reset_index(level=0)[col]
#     return df[filter_index]

In [ ]:
def load_result(it=-1, **kwargs):
    json_files = sorted(glob.glob('/Users/pbos/projects/apcocsm/roofit-dev/rootbench/cmake-build-debug/root/roofit/roofit/RoofitMultiproc_*.json'))
    return load_result_file(json_files[it], **kwargs)

def load_result_file(fn, show_dfs=False, figscale=6, plot_ideal=True, match_y_axes=False):
    dfs = {}

    with open(fn) as fh:
        raw = json.load(fh)

    print(raw['context'])
    benchmarks = defaultdict(list)
    benchmark_number = 0
    for bm in raw['benchmarks']:
        name = bm['name'].split('/')[0]
        # order of benchmarks is important for merging with less structured sources (stdout) later on
        # but only for real benchmarks, not the mean/median/stddev entries in the json file
        if bm['name'].split('_')[-1] not in ('mean', 'median', 'stddev'):
            bm['benchmark_number'] = benchmark_number
            benchmark_number += 1
        benchmarks[name].append(bm)

    fig, ax = egp.plot.subplots(len(benchmarks), wrap=3,
                               figsize=(len(benchmarks)*1.1*figscale, figscale),
                               squeeze=False)
    ax = ax.flatten()

    for ix, (name, bmlist) in enumerate(benchmarks.items()):
        dfs[name] = pd.DataFrame(bmlist)
        df_names = pd.DataFrame(dfs[name].name.str.slice(start=len("BM_RooFit_")).str.split('/').values.tolist(), columns=columns[name])
        for c in columns[name][1:-1]:
            df_names[c] = pd.to_numeric(df_names[c])
        dfs[name] = dfs[name].join(df_names)
        # Drop mean, median and std results, only keep normal timings (we do stats ourselves):
        dfs[name] = dfs[name][dfs[name]['manual_time'] == 'manual_time']
        dfs[name] = dfs[name].drop(['name', 'manual_time', 'cpu_time', 'iterations', 'time_unit'], axis=1)
        if show_dfs:
            display(dfs[name])
        
#         if remove_outliers:
#             dfs[name] = remove_outliers_from_df(dfs[name])
            
        if name == 'BM_RooFit_BinnedMultiProcGradient':
            hue = 'bins'
        elif name == 'BM_RooFit_NDUnbinnedGaussianMultiProcessGradMinimizer':
            hue = 'dims'
        else:
            hue = None
        
        # for single core runs, add NumCPU column:
        if 'NumCPU' not in dfs[name].columns:
            dfs[name]['NumCPU'] = 1
            if plot_ideal:
                print("Not plotting ideal, since this was only a single core run")
                plot_ideal = False
        
        dfs[name]["real or ideal"] = "real"
        
#         if hue is not None:
#             leg_handles, leg_labels = ax[ix].get_legend_handles_labels()
        
        ax[ix].set_title(name)
        
        if plot_ideal:
            if hue is not None:
                min_single_core = dfs[name][dfs[name]['NumCPU'] == 1].groupby(hue)['real_time'].min()
                df_ideal = min_single_core.to_frame('real_time')
                df_ideal.reset_index(level=0, inplace=True)
            else:
                min_single_core = dfs[name][dfs[name]['NumCPU'] == 1]['real_time'].min()
                df_ideal = pd.DataFrame({'real_time': [min_single_core]})
            numCPU = np.unique(dfs[name]['NumCPU'])
            numCPU.sort()
            df_numCPU = pd.Series(numCPU, name='NumCPU').to_frame()
            # necessary for doing a cross merge (cartesian product):
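            # (pandas >= 1.2 can do this directly with df_ideal.merge(df_numCPU, how='cross'))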
            df_numCPU['key'] = 1
            df_ideal['key'] = 1
            df_ideal = df_ideal.merge(df_numCPU, on='key', how='outer').drop('key', axis=1)
            df_ideal['real_time'] /= df_ideal['NumCPU']
            df_ideal['real or ideal'] = "ideal"
            dfs[name] = pd.concat([dfs[name], df_ideal], sort=False)
        
        dfs[name] = dfs[name].astype(dtype={'benchmark_number': 'Int64'})

        sns.lineplot(data=dfs[name], x='NumCPU', y='real_time', hue=hue, style="real or ideal",
                     markers=True, err_style="bars", legend='full',
                     ax=ax[ix])

#             sns.pointplot(data=df_ideal, x='NumCPU', y='ideal_time', hue=hue, ax=ax[ix], linestyles='--')
            
#             # remove duplicate legend labels by resetting to what they were before the ideal plot addition
#             if hue is not None:
#                 ax[ix].legend(leg_handles, leg_labels, title=hue)

    if match_y_axes:
        ymin, ymax = ax[0].get_ylim()
        for axi in ax:
            ymin = min(ymin, axi.get_ylim()[0])
            ymax = max(ymax, axi.get_ylim()[1])

        for axi in ax:
            axi.set_ylim((ymin, ymax))
    
    return dfs

In [ ]:
_ = load_result()

In [ ]:
dfs = load_result_file('stbc-i5_marks/RoofitMultiproc_1545134908.json')

In [ ]:
dfs.keys()

In [ ]:
# dfs['BM_RooFit_BinnedMultiProcGradient']

Stoomboot


In [ ]:
# on a centos7 machine (so it does indeed have 64 CPUs; I had only reserved 8!)
dfs = load_result_file('../rootbench/1461350.burrell.nikhef.nl/RoofitMPworkspace_1547478971.json')

Stoomboot: multiple repetitions


In [ ]:
# on a centos7 machine (this time with 32 CPUs (= the max) reserved)
dfs = load_result_file('../rootbench/1464718.burrell.nikhef.nl/RoofitMPworkspace_1547613964.json')

In [ ]:
dfs

RooMinimizer also included

The first run was without optConst = 2 and without Minuit2 activated on the RooMinimizer. Minuit (v1) is probably what causes the RooMinimizer run to be slightly slower than the single-core GradMinimizer case.
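
For reference, a minimal PyROOT sketch of the two settings being compared, assuming a ROOT session and an already-built RooAbsReal nll (hypothetical names; the benchmark itself sets these in its C++ code):


In [ ]:
# illustrative sketch only; nll is assumed to exist
m = ROOT.RooMinimizer(nll)
m.setMinimizerType("Minuit2")  # the default is Minuit (v1)
m.optimizeConst(2)             # constant-term optimization, level 2
m.migrad()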


In [ ]:
# on a centos7 machine (this time with 32 CPUs (= the max) reserved)
dfs = load_result_file('../rootbench/1469398.burrell.nikhef.nl/RoofitMPworkspace_1547645550.json', match_y_axes=True)

The next run does have optConst = 2 and minimizerType = Minuit2 for the RooMinimizer. As we can see, it is about 10x (!!!) faster than without optConst = 2. Also, the RooMinimizer case is now again slightly faster than the GradMinimizer case.


In [ ]:
dfs = load_result_file('../rootbench/1471528.burrell.nikhef.nl/RoofitMPworkspace_1547720829.json', match_y_axes=True)

Split up timings

The out logs contain more detailed timing information, split up into communication (update step) and actual gradient calculation. We will now extract this information.

The following functions are for outputs of git commit 6b98b0c0.


In [ ]:
# 6b98b0c0 (2019-01-16 14:18)

def extract_split_timing_info(fn):
    """Split the stdout log into one list of lines per benchmark iteration,
    delimited by 'start migrad' and 'end migrad' lines."""
    with open(fn, 'r') as fh:
        lines = fh.read().splitlines()

    bm_iterations = []

    start_indices = []
    end_indices = []
    for ix, line in enumerate(lines):
        if line == 'start migrad':
            start_indices.append(ix)
        elif line == 'end migrad':
            end_indices.append(ix)

    # 'start migrad' sometimes appears several times in a row; of each such
    # run, keep only the last index, so that starts and ends pair up one-to-one
    start_indices_clean = []
    for ix, index in enumerate(start_indices):
        try:
            if start_indices[ix + 1] != index + 1:
                start_indices_clean.append(index)
        except IndexError:
            start_indices_clean.append(index)

    for ix in range(len(start_indices_clean)):
        bm_iterations.append(lines[start_indices_clean[ix]+1:end_indices[ix]])

    return bm_iterations


def build_df_split_timing_run(timing_string_list):
    """Parse one benchmark iteration's timing lines into a DataFrame with
    alternating 'update state' and 'gradient work' rows."""
    data = {'time [s]': [], 'timing_type': []}
    for line in timing_string_list:
        words = line.split()
        data['time [s]'].append(float(words[1][:-2]))
        data['timing_type'].append('update state')
        data['time [s]'].append(float(words[4][:-1]))
        data['timing_type'].append('gradient work')
    return pd.DataFrame(data)


def build_dflist_split_timing_info(fn):
    bm_iterations = extract_split_timing_info(fn)
    
    dflist = []
    for bm in bm_iterations:
        dflist.append(build_df_split_timing_run(bm))
    
    return dflist


def build_comb_df_split_timing_info(fn):
    dflist = build_dflist_split_timing_info(fn)

    for ix, df in enumerate(dflist):
        df["benchmark_number"] = ix

    return pd.concat(dflist)

In [ ]:
df_split_timings_1471528 = build_comb_df_split_timing_info('../rootbench/1471528.burrell.nikhef.nl.out')

In [ ]:
df_split_timings_1471528.columns

In [ ]:
dfs_1471528 = load_result_file('../rootbench/1471528.burrell.nikhef.nl/RoofitMPworkspace_1547720829.json', match_y_axes=True)

In [ ]:
df_total_timings_1471528 = dfs_1471528['BM_RooFit_MP_GradMinimizer_workspace_file']

In [ ]:
df_meta_1471528 = df_total_timings_1471528.drop(['real_time', 'real or ideal'], axis=1).dropna().set_index('benchmark_number', drop=True)

In [ ]:
df_meta_1471528.head()

In [ ]:
def combine_split_total_timings_v1(df_total_timings, df_split_timings, calculate_rest=True):
    df_meta = df_total_timings.drop(['real_time', 'real or ideal'], axis=1).dropna().set_index('benchmark_number', drop=True)
    
    df_all_timings = df_total_timings.rename(columns={'real_time': 'time [s]'})
    df_all_timings['time [s]'] /= 1000  # convert to seconds
    df_all_timings['timing_type'] = 'total'

    df_split_sum = {}
    for name, df in df_split_timings.items():
        df_split_sum[name] = df.groupby('benchmark_number').sum().join(df_meta, on='benchmark_number').reset_index()
        df_split_sum[name]['real or ideal'] = 'real'
        df_split_sum[name]['timing_type'] = name

    df_all_timings = pd.concat([df_all_timings,] + list(df_split_sum.values()))
    
    if calculate_rest:
        rest_time = df_all_timings[(df_all_timings['timing_type'] == 'total') & (df_all_timings['real or ideal'] == 'real')].set_index('benchmark_number')['time [s]']
        for name, df in df_split_sum.items():
            rest_time = rest_time - df.set_index('benchmark_number')['time [s]']

        df_rest_time = rest_time.to_frame().join(df_meta, on='benchmark_number').reset_index()
        df_rest_time['timing_type'] = 'rest'
        df_rest_time['real or ideal'] = 'real'

        df_all_timings = pd.concat([df_all_timings, df_rest_time], sort=False)
    
    return df_all_timings

In [ ]:
df_update_timings_1471528 = df_split_timings_1471528[df_split_timings_1471528['timing_type'] == 'update state'].drop('timing_type', axis=1)
df_gradient_timings_1471528 = df_split_timings_1471528[df_split_timings_1471528['timing_type'] == 'gradient work'].drop('timing_type', axis=1)

df_all_timings_1471528 = combine_split_total_timings_v1(df_total_timings_1471528,
                                                        {'update': df_update_timings_1471528,
                                                         'gradient': df_gradient_timings_1471528})

In [ ]:
sns.relplot(data=df_all_timings_1471528, x='NumCPU', y='time [s]', style="real or ideal",
#             col='timing_type',
            hue='timing_type',
            markers=True, err_style="bars", legend='full', kind="line")

Skype with Wouter

(Tue 29 Jan, ~14:00)

Looking at the table with the split-out results:

  • Striking: the gradient time does not scale cleanly at all! You would expect it to...
  • The update/communication time does scale up somewhat, but is certainly not dominant.
  • The "rest time" also keeps growing.

To check:

  • MOST IMPORTANT: What kind of derivatives are these? Time distribution of the derivatives. In particular the time of the longest calculation; if that is more than 1/8th, the ideal time is different as well!
  • For each worker node: which derivatives did it do?
  • Wall-time seconds of each worker node --> load balancing.
  • Work back from the queue.
  • What is the third component?
  • Also a breakdown of the ND Gaussians.

Addition from Inti (30 Jan)

  • Also check whether caching could be the cause: disable caching (alwaysDirty)

For the partial-derivative distribution we will need new timings, because currently we only have timings per full gradient (one for each gradient calculation, but that is useless here). For good measure: the variation in those timings can easily be seen to be very low:


In [ ]:
df_gradient_timings_1471528.groupby('benchmark_number').std().plot()

15-dimensional Gaussian (60 parameters; 12 Feb 2019)


In [ ]:
dfs_local1549950020 = load_result_file('/Users/pbos/projects/apcocsm/roofit-dev/rootbench/cmake-build-debug/root/roofit/roofit/RoofitMP_1549950020.json')

This was on a MacBook, so reasonable enough.

Split out partial derivatives


In [ ]:
# bd2783d0 (2019-01-30 13:42)

def extract_split_timing_info_20190130(fn):
    """
    Group lines by benchmark iteration, starting from migrad until
    after the forks have been terminated.
    """
    with open(fn, 'r') as fh:
        lines = fh.read().splitlines()
    
    bm_iterations = []
    
    start_indices = []
    end_indices = []
    for ix, line in enumerate(lines):
        if 'start migrad' in line:
            if lines[ix-1] == 'start migrad':  # sometimes 'start migrad' appears twice
                start_indices.pop()
            start_indices.append(ix)
            # sanity check:
#             if line != 'start migrad':
#                 print(line)
        elif line[:11] == 'terminate: ':
            end_indices.append(ix)
    
    if len(start_indices) != len(end_indices):
        raise Exception("Number of start and end indices unequal!")
    
    for ix in range(len(start_indices)):
        bm_iterations.append(lines[start_indices[ix]+1:end_indices[ix]+1])
    
    return bm_iterations


def group_timing_lines(bm_iteration_lines):
    """
    Group lines (from one benchmark iteration) by gradient call,
    specifying:
    - Update time
    - Gradient work time
    - For all partial derivatives a sublist of all lines
    Finally, the terminate time for the entire bm_iteration is also
    returned (last line in the list).
    """    
    gradient_calls = []
    
    start_indices = []
    end_indices = []
    for ix, line in enumerate(bm_iteration_lines[:-1]):  # -1: leave out terminate line
        if line[:9] == 'worker_id':
            if bm_iteration_lines[ix-1][:9] != 'worker_id':  # only use the first of these
                start_indices.append(ix)
        elif line[:12] == 'update_state':
            end_indices.append(ix)
        
    for ix in range(len(start_indices)):
        gradient_calls.append({
            'gradient_total': bm_iteration_lines[end_indices[ix]],
            'partial_derivatives': bm_iteration_lines[start_indices[ix]:end_indices[ix]]
        })
        
    try:
        terminate_line = bm_iteration_lines[-1]
    except IndexError:
        terminate_line = None

    return gradient_calls, terminate_line


def build_df_split_timing_run_20190130(timing_grouped_lines_list, terminate_line):
    data = {'time [s]': [], 'timing_type': [], 'worker_id': [], 'task': []}
    
    for gradient_call_timings in timing_grouped_lines_list:
        words = gradient_call_timings['gradient_total'].split()
        
        data['time [s]'].append(float(words[1][:-2]))
        data['timing_type'].append('update state')
        data['worker_id'].append(None)
        data['task'].append(None)
        
        data['time [s]'].append(float(words[4][:-1]))
        data['timing_type'].append('gradient work')
        data['worker_id'].append(None)
        data['task'].append(None)

        for partial_derivative_line in gradient_call_timings['partial_derivatives']:
            words = partial_derivative_line.split()
            data['worker_id'].append(words[1][:-1])
            data['task'].append(words[3][:-1])
            data['time [s]'].append(float(words[7][:-1]))
            data['timing_type'].append('partial derivative')

    words = terminate_line.split()
    data['time [s]'].append(float(words[1][:-1]))
    data['timing_type'].append('terminate')
    data['worker_id'].append(None)
    data['task'].append(None)
        
    return pd.DataFrame(data)


def build_dflist_split_timing_info_20190130(fn):
    bm_iterations = extract_split_timing_info_20190130(fn)
    
    dflist = []
    for bm in bm_iterations:
        grouped_lines, terminate_line = group_timing_lines(bm)
        if terminate_line is not None:
            dflist.append(build_df_split_timing_run_20190130(grouped_lines, terminate_line))
    
    return dflist


def build_comb_df_split_timing_info_20190130(fn):
    dflist = build_dflist_split_timing_info_20190130(fn)

    for ix, df in enumerate(dflist):
        df["benchmark_number"] = ix

    return pd.concat(dflist)

In [ ]:
df_split_timings_1538069 = build_comb_df_split_timing_info_20190130('../rootbench/1538069.burrell.nikhef.nl.out')

In [ ]:
dfs_1538069 = load_result_file('../rootbench/1538069.burrell.nikhef.nl/RoofitMPworkspace_1549966927.json', match_y_axes=True)

In [ ]:
df_total_timings_1538069 = dfs_1538069['BM_RooFit_MP_GradMinimizer_workspace_file']
df_meta_1538069 = df_total_timings_1538069.drop(['real_time', 'real or ideal'], axis=1).dropna().set_index('benchmark_number', drop=True)

In [ ]:
df_update_timings_1538069 = df_split_timings_1538069[df_split_timings_1538069['timing_type'] == 'update state'].drop('timing_type', axis=1)
df_gradient_timings_1538069 = df_split_timings_1538069[df_split_timings_1538069['timing_type'] == 'gradient work'].drop('timing_type', axis=1)
df_terminate_timings_1538069 = df_split_timings_1538069[df_split_timings_1538069['timing_type'] == 'terminate'].drop('timing_type', axis=1)

df_all_timings_1538069 = combine_split_total_timings_v1(df_total_timings_1538069,
                                                        {'update': df_update_timings_1538069,
                                                         'gradient': df_gradient_timings_1538069,
                                                         'terminate': df_terminate_timings_1538069})

In [ ]:
sns.relplot(data=df_all_timings_1538069, x='NumCPU', y='time [s]', style="real or ideal",
#             col='timing_type',
            hue='timing_type',
            markers=True, err_style="bars", legend='full', kind="line")

OK, so terminate does have some impact, but it's not the end of the "rest" story; there's more in there than just the line-search steps.

Let's also check out the partial derivatives in total.


In [ ]:
df_partderiv_timings_1538069 = df_split_timings_1538069[df_split_timings_1538069['timing_type'] == 'partial derivative'].drop('timing_type', axis=1)

sns.relplot(data=combine_split_total_timings_v1(df_total_timings_1538069,
                                                {'update': df_update_timings_1538069,
                                                 'gradient': df_gradient_timings_1538069,
                                                 'terminate': df_terminate_timings_1538069,
                                                 'partial derivative': df_partderiv_timings_1538069},
                                                calculate_rest=False),
            x='NumCPU', y='time [s]', style="real or ideal",
#             col='timing_type',
            hue='timing_type',
            markers=True, err_style="bars", legend='full', kind="line")

In [ ]:
df_terminate_timings_1538069.join(df_meta_1538069, on='benchmark_number').groupby('NumCPU').mean()

This is also interesting: apparently the total time spent on partial derivatives (just the calculation of the tasks on the workers!) goes up with the number of workers!

This could be either because the workers were disrupted for some reason, or perhaps because they are missing some cache.
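
A quick sketch to quantify this, using the dataframes defined above (summing all partial-derivative work per benchmark run, then averaging per NumCPU):


In [ ]:
# total partial-derivative work per benchmark run, averaged per NumCPU
(df_partderiv_timings_1538069
     .groupby('benchmark_number')['time [s]'].sum()
     .to_frame()
     .join(df_meta_1538069)
     .groupby('NumCPU')['time [s]'].mean())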

Partial derivatives in detail

Let's see how the tasks are divided over the workers, split out by run type, i.e. by NumCPU.

Splitting out by individual run would also be good to check, but that gives 80 splits, while NumCPU gives only 8.


In [ ]:
df_partderiv_timings_1538069['benchmark_number'].unique()

In [ ]:
# failed plotting attempts; stacked bar plots are a bitch

# fig, ax = plt.subplots(1,1, figsize=(14,8))
# sns.distplot(pd, ax=ax)
# ax.set_yscale('log')
# pd_mp.groupby(['worker_id', 'task', 'benchmark_number'])['time_s'].mean().plot(kind='bar')
# sns.catplot(data=pardiff_mp_1538069, kind='bar', sharey=True,
#             col='benchmark_number', col_wrap=4,
#             x='worker_id', y='time_s', hue='task')

# altair: holy crap, this one EMPTY plot increases the size of the notebook by 128 MB!
# alt.data_transformers.enable('default', max_rows=None)
# alt.Chart(df_partderiv_timings_1538069).mark_bar().encode(
#     x='worker_id',
#     y='mean(time [s])',
#     color='task'
# )
# alt.data_transformers.enable('default', max_rows=5000)

In [ ]:
dfpd_1538069 = df_partderiv_timings_1538069.join(df_meta_1538069, on='benchmark_number')

In [ ]:
dfpd_1538069_meanVSnumCPU = dfpd_1538069.groupby(['worker_id', 'task', 'NumCPU'])['time [s]'].mean().to_frame().reset_index()

In [ ]:
dfpd_1538069_mean1numCPU = dfpd_1538069_meanVSnumCPU[dfpd_1538069_meanVSnumCPU['NumCPU'] == 1]
dfpd_1538069_mean1n2numCPU = dfpd_1538069_meanVSnumCPU[dfpd_1538069_meanVSnumCPU['NumCPU'] < 3]

In [ ]:
def plot_partial_derivative_per_worker(data, figsize=(16, 10)):    
    # bqplot: also can't get it to work
#     fig = blt.figure()
#     bar = blt.bar(worker_ids, times, 
#                   padding=0.2,
# #                   colors=tasks
#                  )
#     display(fig)

#     width = 0.35

    N_tasks = len(data['task'].unique())

    colors = plt.cm.get_cmap('prism', N_tasks)
    
    fig, ax = plt.subplots(2, 4, sharey=True, figsize=figsize)
    ax = ax.flatten()
    for ix_n, n in enumerate(data['NumCPU'].unique()):
        data_n = data[data['NumCPU'] == n]
        for w in data_n['worker_id'].unique():
            data_n_w = data_n[data_n['worker_id'] == w]
            prev_time = 0
            for task in data_n_w['task'].unique():
                time = data_n_w[data_n_w['task'] == task]['time [s]']
                if any(time):
                    ax[ix_n].bar(w, time, bottom=prev_time, color=colors(int(task)),
                                 linewidth=0.3,
                                 edgecolor=(0.2,0.2,0.2)
                                )
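                    # NB: this assignment is a bug: the bars end up superimposed
                    # rather than stacked; see the 27 Feb notes below, where it is fixed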
                    prev_time = time

In [ ]:
plot_partial_derivative_per_worker(dfpd_1538069_meanVSnumCPU)

In [ ]:
len(df_split_timings_1538069.groupby('benchmark_number')), len(df_total_timings_1538069)

In [ ]:
dfpd_1538069_sum_VSnumCPU = dfpd_1538069.groupby(['worker_id', 'task', 'NumCPU'])['time [s]'].sum().to_frame().reset_index()

In [ ]:
plot_partial_derivative_per_worker(dfpd_1538069_sum_VSnumCPU)

In [ ]:
def plot_partial_derivative_per_benchmark(data, figsize=(20, 10)):    
    N_tasks = len(data['task'].unique())

    colors = plt.cm.get_cmap('prism', N_tasks)
    
    fig, ax = plt.subplots(2, 5, sharey=True, figsize=figsize)
    ax = ax.flatten()
    for ix_b, b in enumerate(data['benchmark_number'].unique()):
        data_b = data[data['benchmark_number'] == b]
        for w in data_b['worker_id'].unique():
            data_b_w = data_b[data_b['worker_id'] == w]
            prev_time = 0
            for task in data_b_w['task'].unique():
                time = data_b_w[data_b_w['task'] == task]['time [s]']
                if any(time):
                    ax[ix_b].bar(w, time, bottom=prev_time, color=colors(int(task)),
                                 linewidth=0.3,
                                 edgecolor=(0.2,0.2,0.2)
                                )
                    prev_time = time

In [ ]:
dfpd_1538069_sum_byall = dfpd_1538069.groupby(['benchmark_number', 'worker_id', 'task', 'NumCPU'])['time [s]'].sum().to_frame().reset_index()

In [ ]:
plot_partial_derivative_per_benchmark(dfpd_1538069_sum_byall[dfpd_1538069_sum_byall['NumCPU'] == 3])

Mon 25 Feb 2019

It's still odd that those earlier summed bars don't go down faster; e.g., the sum of the two 2-core bars is almost twice the 1-core bar...

Could there be one or two dominant runs that skew the statistics? If so, the median should be more robust than the mean, which, as we saw earlier, also looks very odd. Let's try the median then:


In [ ]:
dfpd_1538069_medianVSnumCPU = dfpd_1538069.groupby(['worker_id', 'task', 'NumCPU'])['time [s]'].median().to_frame().reset_index()

In [ ]:
plot_partial_derivative_per_worker(dfpd_1538069_medianVSnumCPU)

Hmm, almost identical to the mean plot... What about min and max then?


In [ ]:
dfpd_1538069_minVSnumCPU = dfpd_1538069.groupby(['worker_id', 'task', 'NumCPU'])['time [s]'].min().to_frame().reset_index()
dfpd_1538069_maxVSnumCPU = dfpd_1538069.groupby(['worker_id', 'task', 'NumCPU'])['time [s]'].max().to_frame().reset_index()

In [ ]:
plot_partial_derivative_per_worker(dfpd_1538069_minVSnumCPU)

In [ ]:
plot_partial_derivative_per_worker(dfpd_1538069_maxVSnumCPU)

Talk over Skype with Wouter (25 Feb 2019 10:00)

It's still pretty unclear what's happening in these summarized plots.

TODO:

  • Find out what the components are and label them in the plot
  • Start back at the basics: do 1 plot for every gradient, so we can see whether that load balances properly.
  • Check why the total times here are only about 6 seconds, while in the total plots it's around 20 s...

Add gradient_nr

The current parallel-gradient split-out function doesn't add a gradient number, which makes the above difficult, so let's fix that.


In [ ]:
def build_comb_df_split_timing_info_20190225(fn):
    dflist = build_dflist_split_timing_info_20190130(fn)

    for ix, df in enumerate(dflist):
        df_pardiff = df[df["timing_type"] == "partial derivative"]
        N_tasks = len(df_pardiff["task"].unique())
        N_gradients = len(df_pardiff) // N_tasks
        # each gradient consists of N_tasks partial derivatives, in order
        gradient_indices = np.repeat(np.arange(N_gradients), N_tasks)

        df["gradient number"] = pd.Series(dtype='Int64')
        df.loc[df["timing_type"] == "partial derivative", "gradient number"] = gradient_indices

        df["benchmark_number"] = ix

    return pd.concat(dflist)

In [ ]:
df_split_timings_1538069_v2 = build_comb_df_split_timing_info_20190225('../rootbench/1538069.burrell.nikhef.nl.out')

In [ ]:
def plot_partial_derivative_per_gradient(data, figsize=(20, 1.61*20), wrap=8):
    N_tasks = len(data['task'].unique())

    colors = plt.cm.get_cmap('jet', N_tasks)
    
    fig, ax = egp.plot.subplots(len(data['gradient number'].unique()), wrap=wrap,
                                sharey=True, figsize=figsize)
    try:
        ax = ax.flatten()
    except AttributeError:  # a single Axes instead of an array
        ax = [ax]
    for b in data['benchmark_number'].unique():
        data_b = data[data['benchmark_number'] == b]
        for ix_g, g in enumerate(data_b['gradient number'].unique()):
            data_b_g = data_b[data_b['gradient number'] == g]
            for w in data_b_g['worker_id'].unique():
                data_b_g_w = data_b_g[data_b_g['worker_id'] == w]
                prev_time = 0
                for task in data_b_g_w['task'].unique():
                    time = data_b_g_w[data_b_g_w['task'] == task]['time [s]']
                    if any(time):
                        ax[ix_g].bar(w, time, bottom=prev_time, color=colors(int(task)),
                                     linewidth=0.3,
                                     edgecolor=(0.2,0.2,0.2)
                                    )
                        prev_time += float(time)

In [ ]:
df_partderiv_timings_1538069_v2 = df_split_timings_1538069_v2[df_split_timings_1538069_v2['timing_type'] == 'partial derivative']\
                                    .drop('timing_type', axis=1)

dfpd_1538069_v2 = df_partderiv_timings_1538069_v2.join(df_meta_1538069, on='benchmark_number')
dfpd_1538069_v2_sumVSgradient = dfpd_1538069_v2.groupby(['worker_id', 'task', 'NumCPU',
                                                         'benchmark_number', 'gradient number'])['time [s]'].sum() \
                                               .to_frame().reset_index()

In [ ]:
full = dfpd_1538069_v2_sumVSgradient
selection = full[full['NumCPU'] == 3]
selection = selection[selection['benchmark_number'] == selection['benchmark_number'].iloc[0]]
plot_partial_derivative_per_gradient(selection)

Something was going wrong with the y-axis on the plots... Here's a test set:


In [ ]:
df_fake = pd.DataFrame({'worker_id': [0, 0, 0, 1, 1, 1, 2, 2, 2],
                        'task': range(9),
                        'NumCPU': np.ones(9)*3,
                        'benchmark_number': np.ones(9),
                        'gradient number': np.ones(9),
                        'time [s]': np.ones(9)})

In [ ]:
plot_partial_derivative_per_gradient(df_fake, figsize=(7,7))

This now (27 Feb 11:00) works, after some debugging above! Basically, we did prev_time = time, which is obviously wrong, but just prev_time += time didn't cut it either, because that gives mismatching pandas Series indices. That way you only get two bars: the first, for which prev_time is zero, and the second, for which prev_time is the first time value. After that, each next time value has a different index than prev_time, so a NaN is filled in for the missing index it tries to sum over, and from that moment on the value passed to the bar's bottom parameter is NaN.

So the fix is to use prev_time += float(time) instead.
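
A minimal demonstration of the index-mismatch effect (a sketch, not from the original run):


In [ ]:
prev_time = pd.Series([1.0], index=[0])  # time of the first task
time = pd.Series([2.0], index=[1])       # next task: different Series index
print(prev_time + time)                  # NaN at both indices: they don't align
print(float(prev_time) + float(time))    # 3.0, as intended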

This also explains why in the previous plots the total time did not seem to make sense: we kept plotting from the bottom, just replacing the bottom with the previous time, so the bars were superimposed instead of actually stacked.

... Anyway, now it works. Below, the plots are redone.

Above plots redone with fixed bar plotting (27 Feb)


In [ ]:
def plot_partial_derivative_per_worker(data, figsize=(16, 10)):    
    N_tasks = len(data['task'].unique())

    colors = plt.cm.get_cmap('prism', N_tasks)
    
    fig, ax = plt.subplots(2, 4, sharey=True, figsize=figsize)
    ax = ax.flatten()
    for ix_n, n in enumerate(data['NumCPU'].unique()):
        data_n = data[data['NumCPU'] == n]
        for w in data_n['worker_id'].unique():
            data_n_w = data_n[data_n['worker_id'] == w]
            prev_time = 0
            for task in data_n_w['task'].unique():
                time = data_n_w[data_n_w['task'] == task]['time [s]']
                if any(time):
                    ax[ix_n].bar(w, time, bottom=prev_time, color=colors(int(task)),
                                 linewidth=0.3,
                                 edgecolor=(0.2,0.2,0.2)
                                )
                    prev_time += float(time)


def plot_partial_derivative_per_benchmark(data, figsize=(20, 10)):    
    N_tasks = len(data['task'].unique())

    colors = plt.cm.get_cmap('prism', N_tasks)
    
    fig, ax = plt.subplots(2, 5, sharey=True, figsize=figsize)
    ax = ax.flatten()
    for ix_b, b in enumerate(data['benchmark_number'].unique()):
        data_b = data[data['benchmark_number'] == b]
        for w in data_b['worker_id'].unique():
            data_b_w = data_b[data_b['worker_id'] == w]
            prev_time = 0
            for task in data_b_w['task'].unique():
                time = data_b_w[data_b_w['task'] == task]['time [s]']
                if any(time):
                    ax[ix_b].bar(w, time, bottom=prev_time, color=colors(int(task)),
                                 linewidth=0.3,
                                 edgecolor=(0.2,0.2,0.2)
                                )
                    prev_time += float(time)


def plot_partial_derivative_per_gradient(data, figsize=(20, 1.61*20), wrap=8):
    N_tasks = len(data['task'].unique())

    colors = plt.cm.get_cmap('jet', N_tasks)
    
    fig, ax = egp.plot.subplots(len(data['gradient number'].unique()), wrap=wrap,
                                sharey=True, figsize=figsize)
    try:
        ax = ax.flatten()
    except AttributeError:  # a single Axes instead of an array
        ax = [ax]
    for b in data['benchmark_number'].unique():
        data_b = data[data['benchmark_number'] == b]
        for ix_g, g in enumerate(data_b['gradient number'].unique()):
            data_b_g = data_b[data_b['gradient number'] == g]
            for w in data_b_g['worker_id'].unique():
                data_b_g_w = data_b_g[data_b_g['worker_id'] == w]
                prev_time = 0
                for task in data_b_g_w['task'].unique():
                    time = data_b_g_w[data_b_g_w['task'] == task]['time [s]']
                    if any(time):
                        ax[ix_g].bar(w, time, bottom=prev_time, color=colors(int(task)),
                                     linewidth=0.3,
                                     edgecolor=(0.2,0.2,0.2)
                                    )
                        prev_time += float(time)

In [ ]:
plot_partial_derivative_per_worker(dfpd_1538069_meanVSnumCPU)

In [ ]:
plot_partial_derivative_per_worker(dfpd_1538069_sum_VSnumCPU)

In [ ]:
plot_partial_derivative_per_benchmark(dfpd_1538069_sum_byall[dfpd_1538069_sum_byall['NumCPU'] == 3])

In [ ]:
plot_partial_derivative_per_worker(dfpd_1538069_medianVSnumCPU)

In [ ]:
plot_partial_derivative_per_worker(dfpd_1538069_minVSnumCPU)

In [ ]:
plot_partial_derivative_per_worker(dfpd_1538069_maxVSnumCPU)

In [ ]:
full = dfpd_1538069_v2_sumVSgradient
selection = full[full['NumCPU'] == 3]
selection = selection[selection['benchmark_number'] == selection['benchmark_number'].iloc[0]]
plot_partial_derivative_per_gradient(selection)

In [ ]:
full = dfpd_1538069_v2_sumVSgradient
selection = full[full['NumCPU'] == 1]
selection = selection[selection['benchmark_number'] == selection['benchmark_number'].iloc[0]]
plot_partial_derivative_per_gradient(selection, wrap=10, figsize=(20, 30))

In [ ]:
full = dfpd_1538069_v2_sumVSgradient
selection = full[full['NumCPU'] == 8]
selection = selection[selection['benchmark_number'] == selection['benchmark_number'].iloc[0]]
plot_partial_derivative_per_gradient(selection, wrap=5, figsize=(20, 40))

TODO (Jisk)

Disable caching completely to see whether the total partial-derivative timing line stays flat; it should, unless something else is going on!
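
A minimal PyROOT sketch of what "disable caching completely" could look like, assuming a workspace file and pdf name (hypothetical here); setOperMode(ADirty) puts nodes in always-dirty mode (the alwaysDirty Inti mentioned), so no cached values are reused:


In [ ]:
import ROOT

# hypothetical file and object names, for illustration only
ws_file = ROOT.TFile.Open("workspace.root")
w = ws_file.Get("w")
pdf = w.pdf("pdf")

# put the pdf and all of its components in always-dirty mode
for component in pdf.getComponents():
    component.setOperMode(ROOT.RooAbsArg.ADirty)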

27 Feb 2019, 12:00, discussed new results with Wouter

It's now clear that load balancing is not an issue.

It is still unclear what exactly is causing the total time of the partial derivatives to rise. Is it caching? For this we could do the test Jisk suggested. Are we actually using caching at all?

Also, there are still some unidentified sources of overhead.

The important next step now is to run on the large workspace, because that is what the whole project was about from the start. If we can get that from an hour down to a few minutes, we're in business.

Todo for ACAT

  • Run on the large workspace from Carsten; if that scales well, we have a great message to deliver.
    • Run with even more workers; we can go up to 32 on the new Stoomboot nodes. Maybe also include the 1-8 range? Or just 1, 2:8:2 and then 16 and 32?
  • After that, go back to the smaller workspace and measure the overhead there in even more detail.
    • Especially: the communication overhead on the queue process and on the workers.
    • For this we'll need to do things slightly differently: we also need to print absolute times, because otherwise we cannot determine the total time the communication takes. This will be a bit complicated in any case: first the master sends to the queue (this we already measure), but while the queue then starts sending to the workers, the master can already start sending new things to the queue; the queue just won't receive them yet. The same happens between the queue and the workers. In general, sending and receiving do not happen in sync. (A sketch of combining such overlapping windows follows below.)
  • Check out what exactly is happening with caching.
    • Explicitly disable it, to see whether this caused the rise in the partial derivatives' total time.
    • Check what it was before (level 1 or 2, I presume) and try level 2 if that wasn't used before.
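
As a sketch of how overlapping send/receive windows could be combined once absolute timestamps are available (an assumption about how we would process such logs, not existing code): the total wall-clock time occupied by communication is the length of the union of the individual intervals:


In [ ]:
def merged_interval_length(intervals):
    # total length covered by the union of (start, end) intervals;
    # overlapping sends/receives are counted only once, which is why we
    # need absolute timestamps rather than just durations
    total = 0.0
    current_start = current_end = None
    for start, end in sorted(intervals):
        if current_end is None or start > current_end:
            if current_end is not None:
                total += current_end - current_start
            current_start, current_end = start, end
        else:
            current_end = max(current_end, end)
    if current_end is not None:
        total += current_end - current_start
    return total

# hypothetical absolute (start, end) send times in seconds:
merged_interval_length([(0.0, 1.0), (0.5, 2.0), (3.0, 4.0)])  # -> 3.0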

In [ ]: