Results Analysis

This notebook analyzes the results produced by the anti-entropy reinforcement learning experiments. Its practical purpose is to create graphs that display the anti-entropy topologies and to extract the information relevant to each experimental run.


In [1]:
%matplotlib notebook

import os 
import re 
import glob 
import json
import unicodedata

import numpy as np 
import pandas as pd 
import seaborn as sns
import networkx as nx
import matplotlib as mpl 
import graph_tool.all as gt
import matplotlib.pyplot as plt 

from nx2gt import nx2gt
from datetime import timedelta 
from collections import defaultdict


/Users/benjamin/.pyenv/versions/3.6.2/Python.framework/Versions/3.6/lib/python3.6/site-packages/graph_tool/draw/cairo_draw.py:1480: RuntimeWarning: Error importing Gtk module: No module named 'gi'; GTK+ drawing will not work.
  warnings.warn(msg, RuntimeWarning)

Data Loading

The data directory contains per-host directories (named after the hosts) along with configuration files for each run. Each run's metrics are stored in their own metrics-*.json file, suffixed by the run number. The data loader yields all rows from all metric files and annotates each row with the correct configuration data. A short usage sketch of the loaders follows the code below.


In [2]:
DATA = "../data"
FIGS = "../figures"
GRAPHS = "../graphs"
HOSTS = "hosts.json"
RESULTS = "metrics-*.json"
CONFIGS = "config-*.json"
NULLDATE = "0001-01-01T00:00:00Z"
DURATION = re.compile(r"^([\d\.]+)(\w+)$")

def suffix(path):
    # Get the run id from the path 
    name, _ = os.path.splitext(path)
    return int(name.split("-")[-1]) 


def parse_duration(d):
    match = DURATION.match(d)
    if match is None:
        raise TypeError("could not parse duration '{}'".format(d))
    amount, units = match.groups() 
    amount = float(amount)

    unitkw = {
        "µs": "microseconds", 
        "ms": "milliseconds", 
        "s": "seconds", 
    }[units]
    
    return timedelta(**{unitkw:amount}).total_seconds()


def load_hosts(path=DATA):
    with open(os.path.join(path, HOSTS), 'r') as f:
        return json.load(f)


def load_configs(path=DATA):
    configs = {}
    for name in glob.glob(os.path.join(path, CONFIGS)):
        with open(name, 'r') as f:
            configs[suffix(name)] = json.load(f)
    return configs 


def slugify(name):
    slug = unicodedata.normalize('NFKD', name)
    slug = slug.encode('ascii', 'ignore').decode('ascii').lower()
    slug = re.sub(r'[^a-z0-9]+', '-', slug).strip('-')
    slug = re.sub(r'[-]+', '-', slug)
    return slug 


def load_results(path=DATA, run=0):
    rpath = os.path.join(path, 'run-{}'.format(run))
    
    hosts = load_hosts(path)
    configs = load_configs(path) 
    for host in os.listdir(rpath):
        for name in glob.glob(os.path.join(rpath, host, "metrics-*.json")):
            run = suffix(name) 
            with open(name, 'r') as f:
                for line in f:
                    row = json.loads(line.strip())
                    row['name'] = host 
                    row['host'] = hosts[host]["hostname"] + ":3264"
                    row['runid'] = run 
                    row['config'] = configs[run]
                    yield row


def merge_results(path, data=DATA, run=0):
    # Merge all of the results into a single unified file 
    with open(path, 'w') as f:
        for row in load_results(data, run):
            f.write(json.dumps(row))
            f.write("\n")
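
As a quick check of the loaders above, a minimal usage sketch; the merged output filename is illustrative and not part of the experiment layout:

# Go-style duration strings are converted into total seconds:
print(parse_duration("1.5ms"))   # 0.0015
print(parse_duration("250µs"))   # 0.00025

# Merge every host's metrics for run 0 into a single newline-delimited JSON file.
merge_results(os.path.join(DATA, "merged-run-0.json"), data=DATA, run=0)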

Graph Extraction

This section extracts a NetworkX graph for each of the experimental runs such that each graph defines an anti-entropy topology. An example of inspecting the extracted edge attributes follows the run summaries below.


In [7]:
def extract_graphs(path=DATA, outdir=None):
    graphs = defaultdict(nx.DiGraph)
    for row in load_results(path):
        
        # Get the graph for the topology 
        G = graphs[row["runid"]]
        
        # Update the graph information 
        name = row["bandit"]["strategy"].title()
        
        # Get the replica config 
        conf = row["config"]["replicas"]["config"]
        
        # Compute epsilon string 
        epsilon =  conf.get("epsilon", None)
        if epsilon:
            name += " ε={}".format(epsilon)
        
        G.graph.update({
            "name": name + " (E{})".format(row["runid"]), 
            "experiment": row["runid"], 
            "uptime": conf["uptime"], 
            "bandit": conf["bandit"], 
            "epsilon": epsilon or "", 
            "anti_entropy_interval": conf["delay"], 
            "workload_duration": row["config"]["clients"]["config"]["duration"], 
            "n_clients": len(row["config"]["clients"]["hosts"]),
#             "workload": row["config"]["clients"]["hosts"], 
            "store": row["store"], 
        })
        
        # Update the vertex information
        vnames = row["name"].split("-")
        vertex = {
            "duration": row["duration"], 
            "finished": row["finished"] if row["finished"] != NULLDATE else "", 
            "started": row["started"] if row["started"] != NULLDATE else "",
            "keys_stored": row["nkeys"], 
            "reads": row["reads"], 
            "writes": row["writes"], 
            "throughput": row["throughput"], 
            "location": " ".join(vnames[1:-1]).title(), 
            "pid": int(vnames[-1]), 
            "name": row["name"]
        }
        source_id = row["host"]
        G.add_node(source_id, **vertex)
        
        # Get bandit edge information 
        bandit_counts = dict(zip(row["peers"], row["bandit"]["counts"]))
        bandit_values = dict(zip(row["peers"], row["bandit"]["values"]))
        
        # Add the edges from the sync table 
        for target_id, stats in row["syncs"].items():
            edge = {
                "count": bandit_counts[target_id], 
                "reward": bandit_values[target_id], 
                "misses": stats["Misses"],
                "pulls": stats["Pulls"], 
                "pushes": stats["Pushes"], 
                "syncs": stats["Syncs"],
                "versions": stats["Versions"], 
                "mean_pull_latency": parse_duration(stats["PullLatency"]["mean"]),
                "mean_push_latency": parse_duration(stats["PushLatency"]["mean"]),
            }
            G.add_edge(source_id, target_id, **edge)
    
    # Write Graphs
    if outdir:
        for G in graphs.values():
            opath = os.path.join(outdir, slugify(G.name)+".graphml.gz")
            nx.write_graphml(G, opath)
    
    return graphs 

        
# for G in extract_graphs(outdir=GRAPHS).values():
for G in extract_graphs().values():
    print(nx.info(G))
    print()


Name: Uniform Selection (E1)
Type: DiGraph
Number of nodes: 45
Number of edges: 1980
Average in degree:  44.0000
Average out degree:  44.0000

Name: Epsilon Greedy ε=0.5 (E4)
Type: DiGraph
Number of nodes: 45
Number of edges: 1980
Average in degree:  44.0000
Average out degree:  44.0000

Name: Annealing Epsilon Greedy (E5)
Type: DiGraph
Number of nodes: 45
Number of edges: 1980
Average in degree:  44.0000
Average out degree:  44.0000

Name: Epsilon Greedy ε=0.1 (E2)
Type: DiGraph
Number of nodes: 45
Number of edges: 1980
Average in degree:  44.0000
Average out degree:  44.0000

Name: Epsilon Greedy ε=0.2 (E3)
Type: DiGraph
Number of nodes: 45
Number of edges: 1980
Average in degree:  44.0000
Average out degree:  44.0000
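
For ad hoc inspection of a single extracted topology, the edge attributes added above can be flattened into a DataFrame. A minimal sketch, assuming run id 5 exists (as it does in the visualization cell below):

G = extract_graphs()[5]
edges = pd.DataFrame(
    [{"src": u, "dst": v, **attrs} for u, v, attrs in G.edges(data=True)]
)
# Summarize the sync activity and latency attributes attached to each edge.
print(edges[["syncs", "versions", "mean_pull_latency", "mean_push_latency"]].describe())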


In [62]:
LOCATION_COLORS = {'Oregon': '#000000', 'California': '#7e008f', 'Ohio': '#1800a7', 'Virginia': '#001cdd', 'Canada': '#0090dd', 'Sao Paulo': '#00aaa5', 'Ireland': '#00a13d', 'London': '#00bc00', 'Paris': '#00ec00', 'Frankfurt': '#a1ff00', 'Mumbai': '#f3e500', 'Singapore': '#ffa500', 'Seoul': '#f90000', 'Tokyo': '#d30000', 'Sydney': '#cccccc'}
LOCATION_GROUPS = sorted(list(LOCATION_COLORS.keys()))
LOCATION_CODES = {
    "Virginia": "VA", 
    "Ohio": "OH", 
    "California": "CA", 
    "Sao Paulo": "BR", 
    "London": "GB", 
    "Frankfurt": "DE", 
    "Seoul": "KR", 
    "Sydney": "AU", 
    "Tokyo": "JP", 
    "Mumbai": "IN", 
    "Singapore": "SG", 
    "Canada": "QC", 
    "Ireland": "IE", 
    "Paris": "FR", 
    "Oregon": "OR", 
}

def filter_edges(h, pulls=0, pushes=0, versions=42):
    # Create a view of the graph keeping only edges with pull or push activity and at least the given number of versions
    efilt = h.new_edge_property('bool')
    for edge in h.edges():
        efilt[edge] = ((h.ep['pulls'][edge] > pulls or h.ep['pushes'][edge] > pushes) and h.ep['versions'][edge] >= versions)
    return gt.GraphView(h, efilt=efilt)


def mklabel(name, loc):
    code = LOCATION_CODES[loc]
    parts = name.split("-")
    return "{}{}".format(code, parts[-1])


def visualize_graph(G, layout='sfdp', filter=True, save=True):
    print(G.name)
    output = None
    if save:
        output = os.path.join(FIGS, slugify(G.name) + ".pdf")
    
    # Convert the nx Graph to a gt Graph 
    g = nx2gt(G) 
    if filter:
        g = filter_edges(g)
    
    # Vertex Properties 
    vgroup = g.new_vertex_property('int32_t')
    vcolor = g.new_vertex_property('string')
    vlabel = g.new_vertex_property('string')
    for vertex in g.vertices():
        vcolor[vertex] = LOCATION_COLORS[g.vp['location'][vertex]]
        vgroup[vertex] = LOCATION_GROUPS.index(g.vp['location'][vertex])
        vlabel[vertex] = mklabel(g.vp['name'][vertex], g.vp['location'][vertex])
    vsize = gt.prop_to_size(g.vp['writes'], ma=65, mi=35)
    
    # Edge Properties 
    esize  = gt.prop_to_size(g.ep['versions'], mi=0, ma=7)
    ecolor = gt.prop_to_size(g.ep['mean_pull_latency'], mi=1, ma=5, log=True)
    
    # Compute the layout and draw 
    if layout == 'fruchterman_reingold':
        pos = gt.fruchterman_reingold_layout(g, weight=esize, circular=True, grid=False)
    elif layout == 'sfdp':
        pos = gt.sfdp_layout(g, eweight=esize, groups=vgroup)
    else:
        raise ValueError("unknown layout '{}'".format(layout))

    gt.graph_draw(
        g, pos=pos, output_size=(1200,1200), output=output, inline=True,
        vertex_size=vsize, vertex_fill_color=vcolor, vertex_text=vlabel, 
        vertex_halo=False, vertex_pen_width=1.2,
        edge_pen_width=esize,
    )
    
    
visualize_graph(extract_graphs()[5])


Annealing Epsilon Greedy (E5)

Rewards DataFrame

This section extracts a time series of rewards on a per-replica basis. A sketch of a cumulative-reward plot follows the cells below.


In [3]:
def extract_rewards(path=DATA):
    for row in load_results(path):
        bandit = row["bandit"]
        history = bandit["history"]
        strategy = bandit["strategy"]
        epsilon = row["config"]["replicas"]["config"].get("epsilon")
        if epsilon:
            strategy += " ε={}".format(epsilon)
        values = np.array(list(map(float, history["rewards"])))
        series = pd.Series(values, name=row["name"] + " " + strategy)
        yield series, row['runid']

In [4]:
total_rewards = {} 
for series, rowid in extract_rewards():
    if rowid not in total_rewards:
        total_rewards[rowid] = series 
    else:
        total_rewards[rowid] += series

In [5]:
cumulative_rewards = {
    rowid: s.cumsum()
    for rowid, s in total_rewards.items()
}
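
cumulative_rewards is not plotted elsewhere in this notebook; a minimal sketch of such a plot, mirroring the labeling used for the rolling-mean figure below (the output filename is illustrative):

fig, ax = plt.subplots(figsize=(9, 5))
for rowid, series in sorted(cumulative_rewards.items()):
    # Drop the leading replica name, keeping only the strategy label.
    ax.plot(series.values, label=" ".join(series.name.split(" ")[1:]))
ax.set_xlabel("Timesteps (Anti-Entropy Sessions)")
ax.set_ylabel("Cumulative Total System Reward")
ax.grid(True, ls='--')
ax.legend()
plt.savefig(os.path.join(FIGS, "cumulative_rewards.pdf"))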

In [12]:
from pandas.plotting import autocorrelation_plot
df = pd.DataFrame({
    " ".join(s.name.split(" ")[1:]): s 
    for s in total_rewards.values()
}).iloc[0:361]

df.reset_index(inplace=True, drop=True)

fig,ax = plt.subplots(figsize=(9,5))

df.rolling(window=20,center=False).mean().plot(ax=ax)

ax.set_ylabel("Rolling Mean of Total System Reward (w=20)")
ax.set_xlabel("Timesteps (Anti-Entropy Sessions)")
ax.grid(True, ls='--')

# ax.axvline(30, ls='--', lw=1, c='#444444')
# ax.axvline(600, ls='--', lw=1, c='#444444')

ax.set_xlim(12, 346)

plt.savefig(os.path.join(FIGS, "rewards.pdf"))



In [68]:
df.iloc[50:300].max()


Out[68]:
annealing epsilon greedy    37.25
epsilon greedy ε=0.1        34.10
epsilon greedy ε=0.2        38.05
epsilon greedy ε=0.5        37.60
uniform selection           31.30
dtype: float64

In [69]:
df.iloc[50:300].mean()


Out[69]:
annealing epsilon greedy    31.6996
epsilon greedy ε=0.1        26.7522
epsilon greedy ε=0.2        31.1638
epsilon greedy ε=0.5        31.3980
uniform selection           27.5008
dtype: float64

In [72]:
df.iloc[50:300].quantile(.95)


Out[72]:
annealing epsilon greedy    35.7550
epsilon greedy ε=0.1        32.2000
epsilon greedy ε=0.2        35.7325
epsilon greedy ε=0.5        35.6100
uniform selection           30.3000
Name: 0.95, dtype: float64

Sync Latency

This section examines the latency of pull and push synchronizations.


In [3]:
def extract_latencies(path=DATA):
    hostmap = {}
    latency = []
    
    for row in load_results(path):
        hostmap[row['host']] = row['name']
        
        strategy = row["bandit"]["strategy"]
        epsilon = row["config"]["replicas"]["config"].get("epsilon")
        if epsilon:
            strategy += " ε={}".format(epsilon)
            
        for host, syncs in row["syncs"].items():
            latency.append({
                'src': row['host'], 
                'dst': host, 
                'pull': parse_duration(syncs['PullLatency']['mean']) * 1000,
                'push': parse_duration(syncs['PushLatency']['mean']) * 1000,
            })
    
    for row in latency:
        row['src'] = " ".join(hostmap[row['src']].split("-")[1:-1]).title()
        row['dst'] = " ".join(hostmap[row['dst']].split("-")[1:-1]).title()
        yield row

In [4]:
latency = pd.DataFrame(extract_latencies())
latency["sync"] = latency.pull + latency.push

In [5]:
latency_mean = latency.groupby(['src', 'dst']).mean()

In [107]:
latency_mean.to_csv(os.path.join(DATA, "latency.csv"))

In [15]:
table = pd.pivot_table(latency_mean, values='sync', index='src', columns='dst')

In [22]:
origins_sorted =[
    "Oregon",
    "California",
    "Ohio",
    "Virginia",
    "Canada",
    "Sao Paulo",
    "Ireland",
    "London",
    "Paris",
    "Frankfurt",
    "Mumbai",
    "Singapore",
    "Seoul",
    "Tokyo",
    "Sydney",
]
table


Out[22]:
dst California Canada Frankfurt Ireland London Mumbai Ohio Oregon Paris Sao Paulo Seoul Singapore Sydney Tokyo Virginia
src
California 4.064933 145.943733 291.349111 256.300600 276.739044 457.390733 103.102800 52.840311 283.394044 310.942200 250.821156 301.822689 304.921978 188.100222 108.451800
Canada 141.270489 1.521467 209.342089 145.972400 168.457222 370.546489 50.929333 124.792089 173.098133 236.602133 351.634822 346.045111 428.997867 311.479622 43.331911
Frankfurt 303.390089 197.265533 4.018433 51.362867 44.263089 218.286422 194.741711 319.134733 37.136978 355.396422 458.695222 273.734422 591.382689 372.836333 158.383956
Ireland 257.935489 148.136733 57.992933 2.402833 37.106622 233.808800 164.853622 287.387089 47.996933 314.891644 420.087622 299.295489 563.206267 364.683622 127.239044
London 255.511556 165.581978 39.683867 33.731400 2.317267 222.965911 163.465933 291.700667 30.979933 339.261889 450.430067 267.478689 569.588200 416.493956 134.247511
Mumbai 450.925622 368.358689 186.873111 217.031333 204.119756 4.297233 367.151644 386.443578 183.398978 501.436711 266.915000 118.865911 455.488444 210.727222 327.712533
Ohio 105.148356 53.905400 209.472956 164.450489 163.225667 376.398244 4.533967 120.849156 165.826000 258.751111 329.047978 414.326889 393.685867 331.200756 35.030511
Oregon 47.423578 125.704067 326.486844 275.642689 271.514222 419.591822 135.106156 2.979100 276.109067 329.593178 240.466822 259.181267 327.400800 196.834111 143.027000
Paris 254.618778 180.909333 40.182822 41.956444 28.272644 207.203756 183.693400 291.925022 9.489600 341.166222 458.162178 257.498689 564.500578 415.254444 145.947867
Sao Paulo 372.275867 205.244178 375.368933 335.829356 330.073222 511.781822 249.418089 342.066956 372.870667 4.315167 501.128356 563.859467 571.624689 405.199289 188.767511
Seoul 265.931889 332.269911 446.775689 420.446511 490.675400 274.493689 333.465067 243.898356 493.027933 485.042889 3.248067 198.803444 265.918800 65.164222 358.851267
Singapore 305.164778 408.941044 322.923378 353.301756 306.975933 112.300400 454.715733 297.789800 278.132111 630.064578 205.856378 2.860000 342.265689 141.550711 436.649044
Sydney 279.703578 392.507978 480.413978 467.019933 524.347889 412.839244 338.302533 289.875067 480.349533 526.218067 254.322556 362.050111 3.506600 200.949000 391.335444
Tokyo 215.221111 287.994911 417.198667 387.189822 459.421244 218.208178 256.746178 204.785733 441.853133 436.578778 68.573867 146.978467 206.934400 2.770967 334.481622
Virginia 124.531978 41.960689 188.613444 138.079289 141.899444 378.045756 39.477956 137.333244 141.106778 246.964467 369.278378 428.417356 379.720800 319.030911 2.721133
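
origins_sorted is defined above but not applied in this cell; it can be used to reorder the pivot table geographically (roughly west to east) rather than alphabetically, for example:

# Reindex rows and columns by the geographic ordering defined above.
table.reindex(index=origins_sorted, columns=origins_sorted)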

In [144]:
def plot_heatmap(data=latency_mean, measure='sync'):
    fig, ax = plt.subplots(figsize=(9,7))
    table = pd.pivot_table(data, values=measure, index='src', columns='dst')
    g = sns.heatmap(table, annot=True, fmt="0.0f", cmap='viridis_r', ax=ax)
    g.set_ylabel("source region")
    g.set_xlabel("destination region")
    g.set_title("{} latencies (milliseconds)".format(measure))
    plt.tight_layout()
    
    plt.savefig(os.path.join(FIGS, "latency_{}_heatmap.pdf".format(measure)))
    return g 


plot_heatmap(measure='sync')


Out[144]:
<matplotlib.axes._subplots.AxesSubplot at 0x129c96ba8>

In [31]:
def print_latencies(df=latency_mean, region='Frankfurt'):
    df=df.reset_index()
    df=df.loc[(df['src'] == region)]

    f, ax = plt.subplots(figsize=(9, 6))
    
    sns.set_style('whitegrid')
    sns.set_context('notebook')
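    # Draw the total sync latency (blue) first, then overlay the pull latency (green);
    # the visible blue remainder above each green bar corresponds to the push latency.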
    sns.barplot(x="dst", y="sync", data=df,
                       capsize=.2, color="b", errwidth=0.01,label="push")

    sns.barplot(x="dst", y="pull", data=df,
                       capsize=.2, color="g", errwidth=0.01, label="pull")

    ax.legend(ncol=2, loc="upper center", frameon=True)
    ax.set(ylabel="latency (ms)", xlabel="destination region", title="synchronization latencies from {}".format(region))
    ax.set_xticklabels(ax.get_xticklabels(), rotation=45)
    
    plt.tight_layout()
    plt.savefig(os.path.join(FIGS, "push_pull_{}_latencies.pdf".format(region.lower())))
    
print_latencies()
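
To render the same chart for every source region (figures are saved with the same naming pattern), the helper can simply be looped:

for region in origins_sorted:
    print_latencies(latency_mean, region=region)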