In [5]:
# %load src/request_simulator.py
from collections import namedtuple
import random
import numpy as np
import simpy
# One record per completed request. Times are in simulation clock units,
# which the simulators treat as milliseconds:
#   t_queued     - arrival until a worker slot was granted
#   t_processing - time the request held the worker
#   t_total      - arrival until completion
LatencyDatum = namedtuple('LatencyDatum', 't_queued t_processing t_total')
class RequestSimulator(object):
    """ Simulates a M/G/k process common in request processing (computing)

    :param worker_desc: A tuple of (count, capacity) used to construct
        ``count`` workers, each a ``simpy.Resource`` with ``capacity`` slots
    :param load_balancer: A function which takes the current request number
        and the list of workers and returns the index of the worker to send
        the request to
    :param latency_fn: A function which takes the current request number and
        returns the number of milliseconds the request takes to process
    :param number_of_requests: The number of requests to run through the
        simulator
    :param request_per_s: The rate of requests per second.

    After :meth:`simulate`, ``self.data`` holds one ``LatencyDatum`` per
    completed request, appended in completion order.
    """

    def __init__(
            self, worker_desc, load_balancer, latency_fn,
            number_of_requests, request_per_s):
        self.worker_desc = worker_desc
        self.load_balancer = load_balancer
        self.latency_fn = latency_fn
        self.number_of_requests = int(number_of_requests)
        # Mean gap between arrivals, in milliseconds (the simulation clock
        # unit): 1000 / request_per_s.
        self.request_interval_ms = 1. / (request_per_s / 1000.)
        self.data = []

    def simulate(self):
        """Run the simulation to completion, filling ``self.data``."""
        # Fixed seeds make every call replay exactly the same run ...
        random.seed(1)
        np.random.seed(1)
        # ... so drop results from any previous call rather than appending
        # a duplicate copy of the same run.
        self.data = []
        self.env = simpy.Environment()
        count, cap = self.worker_desc
        self.workers = [
            simpy.Resource(self.env, capacity=cap) for _ in range(count)
        ]
        self.env.process(self.generate_requests())
        self.env.run()

    def generate_requests(self):
        """Generator process: dispatch each request, then wait a random gap."""
        for i in range(self.number_of_requests):
            idx = self.load_balancer(i, self.workers)
            worker = self.workers[idx]
            response = self.process_request(i, worker)
            self.env.process(response)
            # Exponential inter-arrival times == Poisson arrivals
            arrival_interval = random.expovariate(
                1.0 / self.request_interval_ms
            )
            yield self.env.timeout(arrival_interval)

    def process_request(self, request_id, worker):
        """ Request arrives, possibly queues, and then processes"""
        t_arrive = self.env.now
        # Leaving the ``with`` block releases the worker slot.
        with worker.request() as req:
            yield req
            t_start = self.env.now
            t_queued = t_start - t_arrive
            # Hold the worker for however long latency_fn says this
            # request takes.
            yield self.env.timeout(self.latency_fn(request_id))
            t_done = self.env.now
            t_processing = t_done - t_start
            t_total_response = t_done - t_arrive
            datum = LatencyDatum(t_queued, t_processing, t_total_response)
            self.data.append(datum)
def run_simulation(
        worker_desc, load_balancer, num_requests, request_per_s, latency_fn):
    """Run one non-speculating RequestSimulator and return its latency data.

    :returns: the simulator's ``data`` list (one ``LatencyDatum`` per
        completed request)
    """
    runner = RequestSimulator(
        worker_desc=worker_desc,
        load_balancer=load_balancer,
        latency_fn=latency_fn,
        number_of_requests=num_requests,
        request_per_s=request_per_s,
    )
    runner.simulate()
    return runner.data
In [6]:
# %load src/speculation_simulator.py
from collections import namedtuple
import random
import numpy as np
import simpy
# Per-request latency record (simulation time units, i.e. milliseconds):
# queue wait, processing time, and arrival-to-completion total.
LatencyDatum = namedtuple(
    'LatencyDatum', ['t_queued', 't_processing', 't_total'])
class SpeculatingRequestExecutor(object):
    """ Simulates a M/G/k process common in request processing (computing) but
    with always-on speculation: every request is enqueued on two workers, is
    processed by whichever of them grants it first, and the other request is
    withdrawn.

    The two workers come from calling ``load_balancer`` twice with the same
    arguments before either request is enqueued, so nothing prevents both
    picks from being the same worker (that request is then not really
    speculated).

    :param worker_desc: A tuple of (count, capacity) used to construct
        ``count`` workers, each a ``simpy.Resource`` with ``capacity`` slots
    :param load_balancer: A function which takes the current request number
        and the list of workers and returns the index of the worker to send
        the request to
    :param latency_fn: A function which takes the current request number and
        returns the number of milliseconds the request takes to process
    :param number_of_requests: The number of requests to run through the
        simulator
    :param request_per_s: The rate of requests per second.

    After :meth:`simulate`, ``self.data`` holds one ``LatencyDatum`` per
    completed request, and ``self.received_first`` counts (under keys '1'
    and '2') how often the first or the second pick won.
    """

    def __init__(
            self, worker_desc, load_balancer, latency_fn,
            number_of_requests, request_per_s):
        self.worker_desc = worker_desc
        self.load_balancer = load_balancer
        self.latency_fn = latency_fn
        self.number_of_requests = int(number_of_requests)
        # Mean gap between arrivals in milliseconds: 1000 / request_per_s.
        self.request_interval_ms = 1. / (request_per_s / 1000.)
        self.received_first = {'1': 0, '2': 0}
        self.data = []

    def simulate(self):
        """Run the simulation to completion, filling the result attributes."""
        # Fixed seeds make every call replay exactly the same run ...
        random.seed(1)
        np.random.seed(1)
        # ... so reset results from any previous call instead of
        # accumulating a duplicate copy of the same run.
        self.data = []
        self.received_first = {'1': 0, '2': 0}
        self.env = simpy.Environment()
        count, cap = self.worker_desc
        self.workers = [
            simpy.Resource(self.env, capacity=cap) for _ in range(count)
        ]
        self.env.process(self.generate_requests())
        self.env.run()

    def generate_requests(self):
        """Generator process: dispatch each request to two picks, then wait."""
        for i in range(self.number_of_requests):
            # Both picks see the same worker state (no request has been
            # enqueued yet), so they may name the same worker.
            workers = []
            for _ in range(2):
                idx = self.load_balancer(i, self.workers)
                workers.append(self.workers[idx])
            response = self.process_request(i, workers[0], workers[1])
            self.env.process(response)
            # Exponential inter-arrival times == Poisson arrivals
            arrival_interval = random.expovariate(
                1.0 / self.request_interval_ms
            )
            yield self.env.timeout(arrival_interval)

    def process_request(self, request_id, worker1, worker2):
        """ Request arrives, possibly queues, and then processes"""
        t_arrive = self.env.now
        req1 = worker1.request()
        req2 = worker2.request()
        try:
            # Resume as soon as either worker grants its slot.
            result = yield req1 | req2
            # If both were granted by the time we resume, worker1 is credited.
            if req1 in result:
                self.received_first['1'] += 1
                # Withdraw the loser: cancel() drops it if still waiting,
                # release() frees the slot if it was already granted.
                req2.cancel()
                req2.resource.release(req2)
            else:
                self.received_first['2'] += 1
                req1.cancel()
                req1.resource.release(req1)
            t_start = self.env.now
            t_queued = t_start - t_arrive
            # Hold the winning worker for however long latency_fn says this
            # request takes.
            yield self.env.timeout(self.latency_fn(request_id))
            t_done = self.env.now
            t_processing = t_done - t_start
            t_total_response = t_done - t_arrive
            self.data.append(LatencyDatum(
                t_queued, t_processing, t_total_response))
        finally:
            # Free the winner's slot. The loser was already released above.
            # NOTE(review): this relies on simpy ignoring a second release()
            # of a request that no longer holds the resource. On an exception
            # raised before either request is granted, neither request is
            # cancel()ed here, so it may remain queued.
            worker1.release(req1)
            worker2.release(req2)
def run_speculation(
        worker_desc, load_balancer, num_requests, request_per_s, latency_fn):
    """Run one SpeculatingRequestExecutor and return its results.

    :returns: ``(data, received_first)``, i.e. the per-request latency list
        and the executor's counts of which pick ('1' or '2') won
    """
    executor = SpeculatingRequestExecutor(
        worker_desc=worker_desc,
        load_balancer=load_balancer,
        latency_fn=latency_fn,
        number_of_requests=num_requests,
        request_per_s=request_per_s,
    )
    executor.simulate()
    latencies = executor.data
    winners = executor.received_first
    return latencies, winners
In [7]:
def queue_size(resource):
    """Return a worker's load: slots currently in use plus waiting requests."""
    in_service = resource.count
    waiting = len(resource.queue)
    return in_service + waiting
def random_lb(request_num, workers):
    """Uniform random load balancer: return any valid worker index.

    ``request_num`` is part of the load-balancer interface and is unused.
    """
    highest_index = len(workers) - 1
    return random.randint(0, highest_index)
def choice_two_lb(request_num, workers):
    """Power-of-two-choices balancer: sample two workers, keep the less loaded.

    Load is ``queue_size``; on a tie the second sample is returned.
    """
    first = random_lb(request_num, workers)
    second = random_lb(request_num, workers)
    first_load = queue_size(workers[first])
    second_load = queue_size(workers[second])
    return first if first_load < second_load else second
def choice_two_adjacent_lb(request_num, workers):
    """Pick a random worker plus its two neighbours; return the least loaded.

    The window is ``(r, r+1, r+2)``, or ``(r, r-1, r-2)`` when ``r+2`` would
    reach the end of the list. Load is ``queue_size``; ties go to the lowest
    index. The returned index is always in ``range(len(workers))``.
    """
    n = len(workers)
    r1 = random_lb(request_num, workers)
    if r1 + 2 >= n:
        window = (r1, r1 - 1, r1 - 2)
    else:
        window = (r1, r1 + 1, r1 + 2)
    # With fewer than 4 workers the backwards window can go below 0. Wrap
    # modulo n instead of relying on negative indexing: a single worker
    # produced index -2 (IndexError), and 2-3 workers could yield a
    # negative return value. For n >= 4 this is a no-op.
    candidates = [i % n for i in window]
    return min((queue_size(workers[i]), i) for i in candidates)[1]
def pareto(mean, shape):
    """Build a Pareto-distributed latency function with the given mean.

    ``np.random.pareto(shape)`` samples a Lomax (Pareto II) distribution.
    Adding 1 and multiplying by ``scale`` gives a classic Pareto with minimum
    ``scale`` and mean ``scale * shape / (shape - 1)``.

    :param mean: Desired mean latency (milliseconds in these simulations)
    :param shape: Pareto shape / tail index; smaller means more skewed. Must
        be > 1, otherwise the distribution has no finite mean and the derived
        scale would be zero or negative.
    :returns: ``latency(request)``, which ignores ``request`` and returns one
        sample drawn from numpy's global RNG
    :raises ValueError: if ``shape <= 1`` or ``mean < 0``
    """
    if shape <= 1:
        raise ValueError(
            "shape must be > 1 for a finite mean, got {!r}".format(shape))
    if mean < 0:
        raise ValueError("mean must be non-negative, got {!r}".format(mean))
    # mean = scale * shape / (shape - 1)  =>  scale = mean * (shape - 1) / shape
    scale = mean - mean / shape

    def latency(request):
        return (np.random.pareto(shape) + 1) * scale
    return latency
In [8]:
S_REQUESTS = 60000
S_QPS = 18000
AVG_RESPONSE_MS = 0.4
SHAPE = 1.5  # heavily skewed
SERVERS = 10

# Same workload twice: with speculation to two picks, and without. Each of
# the SERVERS workers has a single slot.
data_spec, count = run_speculation(
    (SERVERS, 1), choice_two_adjacent_lb, S_REQUESTS,
    S_QPS, pareto(AVG_RESPONSE_MS, SHAPE))
data_sim = run_simulation(
    (SERVERS, 1), choice_two_adjacent_lb, S_REQUESTS,
    S_QPS, pareto(AVG_RESPONSE_MS, SHAPE))

spec = [d.t_total for d in data_spec]
sim = [d.t_total for d in data_sim]

print(
    "{0:11} | {1:>7} | {2:>7} | {3:>7} | {4:>7} |".format(
        "", "p50", "p95", "p99", "p99.9"
    ))
print("Speculation | {0:7.3f} | {1:7.3f} | {2:7.3f} | {3:7.3f} |".format(
    *np.percentile(spec, [50, 95, 99, 99.9])
))
print("Normal      | {0:7.3f} | {1:7.3f} | {2:7.3f} | {3:7.3f} |".format(
    *np.percentile(sim, [50, 95, 99, 99.9])
))
# The format string previously had one placeholder for two arguments, so
# S_REQUESTS was silently dropped; show it explicitly.
print("Worker Distribution: {} (of {} requests)".format(
    sorted(count.items()), S_REQUESTS))
assert len(spec) == len(sim)
In [10]:
import numpy as np
import matplotlib.patches as mpatches
import matplotlib.pyplot as plt
import matplotlib.style as style

# matplotlib 3.6 renamed its bundled seaborn styles to 'seaborn-v0_8-*' and
# later releases removed the old names, so 'seaborn-pastel' raises on
# current versions. Prefer the new name when it exists.
_PASTEL_STYLE = 'seaborn-v0_8-pastel'
style.use(
    _PASTEL_STYLE if _PASTEL_STYLE in style.available else 'seaborn-pastel')


def color_bplot(bp, edge_color, fill_color):
    """Recolour a boxplot result dict: edges of every artist, box fills."""
    for element in ['boxes', 'whiskers', 'fliers', 'means', 'medians', 'caps']:
        plt.setp(bp[element], color=edge_color)
    for box in bp['boxes']:
        box.set_facecolor(fill_color)


# Only takes effect if showfliers is switched on below.
flier = dict(markerfacecolor='r', marker='.')
fig1, ax = plt.subplots(figsize=(12, 4))
speculation_opts = {
    'No Speculation': sim,
    'Speculation': spec
}
opts = sorted(speculation_opts.keys())
data = [speculation_opts[i] for i in opts]
# whis=[1, 99]: whiskers span the 1st to 99th percentile.
bplot1 = ax.boxplot(data, whis=[1, 99], showfliers=False, flierprops=flier,
                    labels=opts, patch_artist=True, vert=False)
color_bplot(bplot1, 'black', 'lightblue')
plt.title("Response Time Distribution using Choice of Two Adjacent\n"
          "[{0} QPS @ {1}ms avg with {2} servers]".format(
              S_QPS, AVG_RESPONSE_MS, SERVERS)
          )
plt.minorticks_on()
plt.grid(which='major', linestyle=':', linewidth='0.4', color='black')
plt.grid(which='minor', linestyle=':', linewidth='0.4', color='black')
plt.xlabel('Response Time (ms)')
plt.show()
In [ ]: