In this notebook we look at the relative performance of a single queue vs multiple queues using the Simpy framework as well as exploring various common load balancing algorithms and their performance in M/G/k systems.
First we establish a baseline simulator which can simulate arbitrary latency distributions and generative processes.
In [1]:
    
# %load src/request_simulator.py
from collections import namedtuple
import random
import numpy as np
import simpy
LatencyDatum = namedtuple('LatencyDatum', ('t_queued', 't_processing', 't_total'))
class RequestSimulator(object):
    """ Simulates a M/G/k process common in request processing (computing)
    :param worker_desc: A tuple of (count, capacity) to construct workers with
    :param local_balancer: A function which takes the current request number and the
                           list of workers and returns the index of the worker to
                           send the next request to
    :param latency_fn: A function which takes the curent request number and the worker
                       that was assigned by the load balancer and returns the number
                       of milliseconds a request took to process
    :param number_of_requests: The number of requests to run through the simulator
    :param request_per_s: The rate of requests per second.
    """
    def __init__(
            self, worker_desc, load_balancer, latency_fn,
            number_of_requests, request_per_s):
        self.worker_desc = worker_desc
        self.load_balancer = load_balancer
        self.latency_fn = latency_fn
        self.number_of_requests = int(number_of_requests)
        self.request_interval_ms = 1. / (request_per_s / 1000.)
        self.data = []
    def simulate(self):
        # Setup and start the simulation
        random.seed(1)
        np.random.seed(1)
        self.env = simpy.Environment()
        count, cap = self.worker_desc
        self.workers = [
            simpy.Resource(self.env, capacity=cap) for i in range(count)
        ]
        self.env.process(self.generate_requests())
        self.env.run()
    def generate_requests(self):
        for i in range(self.number_of_requests):
            idx = self.load_balancer(i, self.workers)
            worker = self.workers[idx]
            response = self.process_request(
                i, worker,
            )
            self.env.process(response)
            # Exponential inter-arrival times == Poisson
            arrival_interval = random.expovariate(
                1.0 / self.request_interval_ms
            )
            yield self.env.timeout(arrival_interval)
    def process_request(self, request_id, worker):
        """ Request arrives, possibly queues, and then processes"""
        t_arrive = self.env.now
        with worker.request() as req:
            yield req
            t_start = self.env.now
            t_queued = t_start - t_arrive
            # Let the operation take w.e. amount of time the latency
            # function tells us to
            yield self.env.timeout(self.latency_fn(request_id))
            t_done = self.env.now
            t_processing = t_done - t_start
            t_total_response = t_done - t_arrive
            datum = LatencyDatum(t_queued, t_processing, t_total_response)
            self.data.append(datum)
def run_simulation(
        worker_desc, load_balancer, num_requests, request_per_s, latency_fn):
    simulator = RequestSimulator(
        worker_desc, load_balancer, latency_fn,
        num_requests, request_per_s
    )
    simulator.simulate()
    return simulator.data
    
In [2]:
    
# %load src/lb_policies.py
import random
def queue_size(resource):
    return resource.count + len(resource.queue)
def random_lb(request_num, workers):
    return random.randint(0, len(workers) - 1)
def rr_lb(request_num, workers):
    return request_num % len(workers)
def choice_two_lb(request_num, workers):
    r1 = random_lb(request_num, workers)
    r2 = random_lb(request_num, workers)
    if queue_size(workers[r1]) < queue_size(workers[r2]):
        return r1
    return r2
def choice_two_adjacent_lb(request_num, workers):
    r1 = random_lb(request_num, workers)
    if r1 + 2 >= len(workers):
        r2 = r1 - 1
        r3 = r1 - 2
    else:
        r2 = r1 + 1
        r3 = r1 + 2
    iq = [(queue_size(workers[i]), i) for i in (r1, r2, r3)]
    return (sorted(iq)[0][1])
def shortest_queue_lb(request_num, workers):
    idx = 0
    for i in range(len(workers)):
        if queue_size(workers[i]) < queue_size(workers[idx]):
            idx = i
    return idx
    
In [3]:
    
# %load src/latency_distributions.py
import numpy as np
import random
def pareto(mean, shape):
    # mean = scale * shape / (shape - 1)
    # solve for scale given mean and shape (aka skew)
    scale = mean - mean / shape
    def latency(request):
        return ((np.random.pareto(shape) + 1) * scale)
    return latency
def expon(mean):
    def latency(request):
        return random.expovariate(1.0 / mean)
    return latency
    
Here we explore the effects of having N queues that handle 1/N the load (aka "Frequency Division Multiplexing",
aka FDM) vs a single queue distributing out to N servers (aka M/M/k queue aka MMk). We confirm the theoretical results that can be obtained using the closed form solutions for E[T]_MMk and E[T]_FDM via
a simulation.
In particular M/M/k queues have a closed form solution for the mean response time given the probability of queueing (and we know if requests queued)
E[T]_MMk = (1 / λ) * Pq * ρ / (1-ρ) + 1 / μ
Where
λ  = the arrival rate (hertz)
Pq = the probability of a request queueing
ρ  = the system load aka λ / (k * μ)
μ  = the average response time (hertz)
Frequency division multiplexing (multiple queues) also has a closed form solution:
E[T]_FDM = (k / (k * μ - λ))
In [4]:
    
# Simulate the various choices
NUM_REQUESTS = 60000
QPS = 18000
AVG_RESPONSE_MS = 0.4
SERVERS = 10
    
In [5]:
    
multiple_queues_latency = []
for i in range(SERVERS):
    multiple_queues_latency.extend([
        i[2] for i in run_simulation((1, 1), rr_lb, NUM_REQUESTS/SERVERS, QPS/SERVERS, expon(AVG_RESPONSE_MS))
    ])
    
single_queue = [
    i for i in run_simulation((1, SERVERS), rr_lb, NUM_REQUESTS, QPS, expon(AVG_RESPONSE_MS))
]
single_queue_latency = [i[2] for i in single_queue]
Pq = sum([i[0] > 0 for i in single_queue]) / float(NUM_REQUESTS)
# MMk have a closed for mean given the probability of queueing (and we know if requests queued)
# E[T]_MMk = (1 / λ) * Pq * ρ / (1-ρ) + 1 / μ
# Where
# λ  = the arrival rate (hertz)
# Pq = the probability of a request queueing
# ρ  = the system load aka λ / (k * μ)
# μ  = the average response time (hertz)
mu_MMk = (1.0 / AVG_RESPONSE_MS) * 1000
lambda_MMk = QPS
rho_MMk = lambda_MMk / (SERVERS * mu_MMk)
expected_MMk_mean_s = (1 / (lambda_MMk)) * Pq * (rho_MMk / (1-rho_MMk)) + 1 / mu_MMk
expected_MMk_mean_ms = expected_MMk_mean_s * 1000.0
# Frequency-division multiplexing also has a closed form for mean
# E[T]_FDM = (k / (k * μ - λ))
expected_FDM_mean_ms = (SERVERS / (SERVERS * mu_MMk - lambda_MMk)) * 1000.0
print("Theory Results")
print("--------------")
print("Pq       = {0:4.2f}".format(Pq))
print("E[T]_FDM = {0:4.2f}".format(expected_FDM_mean_ms))
print("E[T]_MMk = {0:4.2f}".format(expected_MMk_mean_ms))
# And now the simulation
queueing_options = {
    'Multiple Queues': multiple_queues_latency,
    'Single Queue': single_queue_latency,
}
print()
print("Simulation results")
print("------------------")
hdr = "{0:16} | {1:>7} | {2:>7} | {3:>7} | {4:>7} | {5:>7} | {6:>7} |".format(
"Strategy", "mean", "var", "p50", "p95", "p99", "p99.9")
print(hdr)
for opt in sorted(queueing_options.keys()):
    mean = np.mean(queueing_options[opt])
    var = np.var(queueing_options[opt])
    percentiles = np.percentile(queueing_options[opt], [50, 95, 99, 99.9])
    print ("{0:16} | {1:7.2f} | {2:7.2f} | {3:7.2f} | {4:7.2f} | {5:>7.2f} | {6:7.2f} |".format(
        opt, mean, var, percentiles[0], percentiles[1], percentiles[2],
        percentiles[3]
    ))
    
    
In [14]:
    
import numpy as np
import matplotlib.patches as mpatches
import matplotlib.pyplot as plt
import matplotlib.style as style
style.use('seaborn-pastel')
def color_bplot(bp, edge_color, fill_color):
    for element in ['boxes', 'whiskers', 'fliers', 'means', 'medians', 'caps']:
        plt.setp(bp[element], color=edge_color)
    for box in bp['boxes']:
        box.set_facecolor(fill_color)  
fig1, ax = plt.subplots(figsize=(12,3))
opts = sorted(queueing_options.keys())
data = [queueing_options[i] for i in opts]
flier = dict(markerfacecolor='r', marker='.')
bplot1 = ax.boxplot(data,whis=[1,99],showfliers=True,flierprops=flier, labels=opts,
                    patch_artist=True, vert=False)
color_bplot(bplot1, 'black', 'lightblue')
plt.title("Response Time Distribution \n[{0} QPS @ {1}ms avg with {2} servers]".format(
    QPS, AVG_RESPONSE_MS, SERVERS)
)
plt.minorticks_on()
plt.grid(which='major', linestyle=':', linewidth='0.4', color='black')
# Customize the minor grid
plt.grid(which='minor', linestyle=':', linewidth='0.4', color='black')
plt.xlabel('Response Time (ms)')
plt.show()
    
    
In [7]:
    
from multiprocessing import Pool
def run_multiple_simulations(simulation_args):
    with Pool(12) as p:
        results = p.starmap(run_simulation, simulation_args)
    return results
    
In [8]:
    
# Should overload 10 servers with 0.4ms avg response time at 25k
qps_range = [i*1000 for i in range(1, 21)]
multi = []
num_requests = 100000
# Can't pickle inner functions apparently...
def expon_avg(request):
    return random.expovariate(1.0 / AVG_RESPONSE_MS)
print("Multi Queues")
args = []
for qps in qps_range:
    args.clear()
    multiple_queues_latency = []
    for i in range(SERVERS):
        args.append(
            ((1, 1), rr_lb, num_requests/SERVERS, qps/SERVERS, expon_avg)
        )
    for d in run_multiple_simulations(args):
        multiple_queues_latency.extend([i.t_total for i in d])
    print('({0:>4.2f} -> {1:>4.2f})'.format(qps, np.mean(multiple_queues_latency)), end=', '),
    percentiles = np.percentile(multiple_queues_latency, [10, 50, 90])
    multi.append(percentiles)
single = []
args.clear()
for qps in qps_range:
    args.append(
        ((1, SERVERS), rr_lb, num_requests, qps, expon_avg)
    )
    
print("\nSingle Queue")
for idx, results in enumerate(run_multiple_simulations(args)):
    d = [i.t_total for i in results]
    print('({0:>4.2f} -> {1:>4.2f})'.format(qps_range[idx], np.mean(d)), end=', ')
    percentiles = np.percentile(d, [25, 50, 75])
    single.append(percentiles)
    
    
In [9]:
    
import matplotlib.gridspec as gridspec
fig = plt.figure(figsize=(14, 5))
gs = gridspec.GridSpec(1, 2, hspace=0.1, wspace=0.1)
plt.suptitle("Response Time Distribution, {0}ms avg response time with {1} servers".format(
    AVG_RESPONSE_MS, SERVERS), fontsize=14
)
ax1 = plt.subplot(gs[0, 0])
ax1.plot(qps_range, [m[1] for m in multi], '.b-', label='Multi-Queue median')
ax1.fill_between(qps_range, [m[0] for m in multi], [m[2] for m in multi], alpha=0.4,
                label='Multi-Queue IQR', edgecolor='black')
ax1.set_title('Multi-Queue Performance')
ax1.minorticks_on()
ax1.grid(which='major', linestyle=':', linewidth='0.4', color='black')
ax1.grid(which='minor', linestyle=':', linewidth='0.4', color='black')
ax1.set_xlabel('QPS (req / s)')
ax1.set_ylabel('Response Time (ms)')
ax1.legend(loc='upper left')
ax2 = plt.subplot(gs[0, 1], sharey=ax1)
ax2.plot(qps_range, [m[1] for m in single], '.g-', label='Single-Queue median')
ax2.fill_between(qps_range, [m[0] for m in single], [m[2] for m in single], alpha=0.4,
                label='Single-Queue IQR', color='lightgreen', edgecolor='black')
ax2.set_title('Single-Queue Performance')
ax2.minorticks_on()
ax2.grid(which='major', linestyle=':', linewidth='0.4', color='black')
ax2.grid(which='minor', linestyle=':', linewidth='0.4', color='black')
ax2.set_xlabel('QPS (req / s)')
ax2.legend(loc='upper left')
plt.show()
    
    
Now we look at the affects of choosing different load balancing algorithms on the multiple queue approach:
In [10]:
    
multiple_queues_latency = []
for i in range(SERVERS):
    multiple_queues_latency.extend([
        i[2] for i in run_simulation((1, 1), rr_lb, NUM_REQUESTS/SERVERS,
                                     request_per_s=QPS/SERVERS, latency_fn=expon(AVG_RESPONSE_MS))
    ])
random_queue_latency = [
    i[2] for i in run_simulation((SERVERS, 1), random_lb, NUM_REQUESTS, QPS, expon(AVG_RESPONSE_MS))
]
join_shorted_queue_latency = [
    i[2] for i in run_simulation((SERVERS, 1), shortest_queue_lb, NUM_REQUESTS, QPS, expon(AVG_RESPONSE_MS))
]
two_adjacent_latency = [
    i[2] for i in run_simulation((SERVERS, 1), choice_two_adjacent_lb, NUM_REQUESTS, QPS, expon(AVG_RESPONSE_MS))
]
# And now the simulation
all_queueing_options = {
    'Multiple Queues': multiple_queues_latency,
    'Single Queue': single_queue_latency,
    'LB: Join Shortest': join_shorted_queue_latency,
    'LB: Best of Two Adj': two_adjacent_latency,
    'LB: Random': random_queue_latency
}
print()
print("Simulation results")
print("------------------")
hdr = "{0:20} | {1:>7} | {2:>7} | {3:>7} | {4:>7} | {5:>7} | {6:>7} ".format(
"Strategy", "mean", "var", "p50", "p95", "p99", "p99.9")
print(hdr)
for opt in sorted(all_queueing_options.keys()):
    mean = np.mean(all_queueing_options[opt])
    var = np.var(all_queueing_options[opt])
    percentiles = np.percentile(all_queueing_options[opt], [50, 95, 99, 99.9])
    print ("{0:20} | {1:7.2f} | {2:7.2f} | {3:7.2f} | {4:7.2f} | {5:>7.2f} | {6:7.2f} |".format(
        opt, mean, var, percentiles[0], percentiles[1], percentiles[2],
        percentiles[3]
    ))
    
    
In [11]:
    
fig1, ax = plt.subplots(figsize=(12,4))
opts = sorted(all_queueing_options.keys())
data = [all_queueing_options[i] for i in opts]
bplot1 = ax.boxplot(data, whis=[1,99],showfliers=True,flierprops=flier, labels=opts,
                    patch_artist=True, vert=False)
color_bplot(bplot1, 'black', 'lightblue')
plt.title("Response Time Distribution \n[{0} QPS @ {1}ms avg with {2} servers]".format(
    QPS, AVG_RESPONSE_MS, SERVERS)
)
plt.minorticks_on()
plt.grid(which='major', linestyle=':', linewidth='0.4', color='black')
plt.grid(which='minor', linestyle=':', linewidth='0.4', color='black')
plt.xlabel('Response Time (ms)')
plt.show()
    
    
Now we look at M/G/k queues over multiple different load balancing choices.
We explore:
In [12]:
    
lb_algos = {
    'choice-of-two': choice_two_lb,
    'random': random_lb,
    'round-robin': rr_lb,
    'shortest-queue': shortest_queue_lb,
}
lbs = {
    k : [i[2] for i in run_simulation((SERVERS, 1), v, NUM_REQUESTS, QPS, pareto(AVG_RESPONSE_MS, 2))]
    for (k, v) in lb_algos.items()
}
lbs['MGk'] = [
    i[2] for i in run_simulation((1, SERVERS), rr_lb, NUM_REQUESTS, QPS, pareto(AVG_RESPONSE_MS, 2))]
lbs['join-idle'] = [
    i[2] for i in run_simulation((1, SERVERS), rr_lb, NUM_REQUESTS, QPS,
                                 lambda request: 0.1 + pareto(AVG_RESPONSE_MS, 2)(request))
]
types = sorted(lbs.keys())
hdr = "{0:20} | {1:>7} | {2:>7} | {3:>7} | {4:>7} | {5:>7} | {6:>7} ".format(
"Strategy", "mean", "var", "p50", "p95", "p99", "p99.9")
#hdr = "Strategy   |  mean   |   var   |   p50   | p95 | p99 | p99.9 | max"
print(hdr)
for lb in types:
    mean = np.mean(lbs[lb])
    var = np.var(lbs[lb])
    percentiles = np.percentile(lbs[lb], [50, 95, 99, 99.9])
    print ("{0:20} | {1:7.1f} | {2:7.1f} | {3:7.1f} | {4:7.1f} | {5:>7.1f} | {6:7.1f} |".format(
        lb, mean, var, percentiles[0], percentiles[1], percentiles[2],
        percentiles[3]
    ))
    
    
In [13]:
    
fig1, ax = plt.subplots(figsize=(14,7))
diamond = dict(markerfacecolor='r', marker='D')
data = [lbs[i] for i in types]
bplot1 = ax.boxplot(data, whis=[2.5,97.5],showfliers=False,flierprops=flier, labels=types,
                    patch_artist=True, vert=False)
color_bplot(bplot1, 'black', 'lightblue')
plt.title('Response Distribution M/G Process ({0} QPS @ {1}ms avg with {2} servers):'.format(
    QPS, AVG_RESPONSE_MS, SERVERS)
)
plt.minorticks_on()
plt.grid(which='major', linestyle=':', linewidth='0.4', color='black')
plt.grid(which='minor', linestyle=':', linewidth='0.4', color='black')
plt.xlabel('Response Time (ms)')
plt.show()