We will examine policy interventation strategies to address and control the spread of HIV in a sexual network.
Given the heterogeneity of behavior and network effects of sexually-transmitted diseases, this problem requires ABM and network methodology. Differential equation compartment models will not allow for incomplete or scale-free graph structures or time-varying agent behavior, and so cannot be used.
Observe the mean and standard deviation of sexual partner degree distribution
To formally describe our model, let's break it down into pieces:
In this model, our space will be a two-dimensional (2D) square grid. Each grid cell will contain zero or one people. Edges of the grid will wrap around.
In this case, people are the carriers of HIV. We are modeling them as simply as possible, using only the following properties:
Properties
For their step function, agents will perform the following:
A "public health" institution will manage the level of subsidy for condoms. For now, the subsidy level will be set at $t_0$ and constant over model run.
prob_hookup
randomly initialized to a value from a uniform continuous distribution.condom_budget
randomly initialized to a value from a uniform continuous distribution.The public health institution will set a level of subsidy for condoms using a random uniform continuous distribution.
Based on the description above, we need the following model parameters:
grid_size
: size of the two-dimensional square grid, i.e., dimension lengthnum_people
: number of persons to create; must be less than ${grid\_size}^2$min_subsidy
, max_subsidy
: the lower and upper bounds for the institution's subsidymin_condom_budget
, max_condom_budget
: the lower and upper bounds for initial conditions of people's condom_budget
condom_cost
: cost of a condom; fixed to $1.0$min_prob_hookup
, max_prob_hookup
: the lower and upper bounds for initial conditions of people's prob_hookup
prob_transmit
, prob_transmit_condom
: the probability of transmitting the disease without and with a condom
In [8]:
%matplotlib inline
# Standard imports
import copy
import itertools
# Scientific computing imports
import numpy
import matplotlib.pyplot as plt
import networkx
import pandas
import seaborn; seaborn.set()
# Import widget methods
from IPython.html.widgets import *
Below, we will define our person class. This can be broken up as follows:
Person()
. This is in the __init__
method.decide_condom
: decide if the person will purchase a condom, i.e., by checking that $condom\_budget >= condom\_cost - subsidy$decide_hookup
: decide if the person will hookup, i.e., by sampling with $p=prob\_hookup$
In [2]:
class Person(object):
"""
Person class, which encapsulates the entire behavior of a person.
"""
def __init__(self, model, person_id, is_infected=False, condom_budget=1.0, prob_hookup=0.5):
"""
Constructor for Person class. By default,
* not infected
* will always buy condoms
* will hookup 50% of the time
Note that we must "link" the Person to their "parent" Model object.
"""
# Set model link and ID
self.model = model
self.person_id = person_id
# Set Person parameters.
self.is_infected = is_infected
self.condom_budget = condom_budget
self.prob_hookup = prob_hookup
def decide_condom(self):
"""
Decide if we will use a condom.
"""
if self.condom_budget >= (self.model.condom_cost - self.model.condom_subsidy):
return True
else:
return False
def decide_hookup(self):
"""
Decide if we want to hookup with a potential partner.
"""
if numpy.random.random() <= self.prob_hookup:
return True
else:
return False
def get_position(self):
"""
Return position, calling through model.
"""
return self.model.get_person_position(self.person_id)
def get_neighbors(self):
"""
Return neighbors, calling through model.
"""
return self.model.get_person_neighbors(self.person_id)
def __repr__(self):
'''
Return string representation.
'''
skip_none = True
repr_string = type(self).__name__ + " ["
except_list = "model"
elements = [e for e in dir(self) if str(e) not in except_list]
for e in elements:
# Make sure we only display "public" fields; skip anything private (_*), that is a method/function, or that is a module.
if not e.startswith("_") and eval('type(self.{0}).__name__'.format(e)) not in ['DataFrame', 'function', 'method', 'builtin_function_or_method', 'module', 'instancemethod']:
value = eval("self." + e)
if value != None and skip_none == True:
repr_string += "{0}={1}, ".format(e, value)
# Clean up trailing space and comma.
return repr_string.strip(" ").strip(",") + "]"
Below, we will define our model class. This can be broken up as follows:
Model()
. This is in the __init__
method.setup_space
: method to create our "space"setup_people
: method to create our "people"setup_institution
: method to create our "institution"get_neighbors
: method to get neighboring agents based on positionget_person_neighbors
: method to get neighboring agents based on agent IDget_person_position
: method to get position based on agent IDmove_person
: method to move an agent to a new positionstep_move
: method to step through agent movesstep_interact
: method to step through agent interactionstep
: main step method to control each time step simulation
In [1]:
class Model(object):
"""
Model class, which encapsulates the entire behavior of a single "run" in our HIV ABM.
"""
def __init__(self, grid_size, num_people, min_subsidy=0.0, max_subsidy=1.0,
min_condom_budget=0.0, max_condom_budget=2.0,
condom_cost=1.0, min_prob_hookup=0.0, max_prob_hookup=1.0,
prob_transmit=0.9, prob_transmit_condom=0.1):
"""
Class constructor.
"""
# Set our model parameters; this is long but simple!
self.grid_size = grid_size
self.num_people = num_people
self.min_subsidy = min_subsidy
self.max_subsidy = max_subsidy
self.min_condom_budget = min_condom_budget
self.max_condom_budget = max_condom_budget
self.condom_cost = condom_cost
self.min_prob_hookup = min_prob_hookup
self.max_prob_hookup = max_prob_hookup
self.prob_transmit = prob_transmit
self.prob_transmit_condom = prob_transmit_condom
# Set our state variables
self.t = 0
self.space = numpy.array((0,0))
self.condom_subsidy = 0.0
self.people = []
self.num_interactions = 0
self.num_interactions_condoms = 0
self.num_infected = 0
# Setup our history variables.
self.history_space = []
self.history_space_infected = []
self.history_interactions = []
self.history_num_infected = []
self.history_num_interactions = []
self.history_num_interactions_condoms = []
# Call our setup methods to initialize space, people, and institution.
self.setup_space()
self.setup_people()
self.setup_institution()
def setup_space(self):
"""
Method to setup our space.
"""
# Initialize a space with a NaN's
self.space = numpy.full((self.grid_size, self.grid_size), numpy.nan)
def setup_people(self):
"""
Method to setup our space.
"""
# First, begin by creating all agents without placing them.
for i in xrange(self.num_people):
self.people.append(Person(model=self,
person_id=i,
is_infected=False,
condom_budget=numpy.random.uniform(self.min_condom_budget, self.max_condom_budget),
prob_hookup=numpy.random.uniform(self.min_prob_hookup, self.max_prob_hookup)))
# Second, once created, place them into the space.
for person in self.people:
# Loop until unique
is_occupied = True
while is_occupied:
# Sample location
random_x = numpy.random.randint(0, self.grid_size)
random_y = numpy.random.randint(0, self.grid_size)
# Check if unique
if numpy.isnan(self.space[random_x, random_y]):
is_occupied = False
else:
is_occupied = True
# Now place the person there by setting their ID.
self.space[random_x, random_y] = person.person_id
# Third, pick one person to be infected initially.
random_infected = numpy.random.choice(range(self.num_people))
self.people[random_infected].is_infected = True
self.num_infected += 1
def setup_institution(self):
"""
Method to setup our space.
"""
# Randomly sample a subsidy level
self.condom_subsidy = numpy.random.uniform(self.min_subsidy, self.max_subsidy)
def get_neighborhood(self, x, y, distance=1):
"""
Get a Moore neighborhood of distance from (x, y).
"""
neighbor_pos = [ ( x % self.grid_size, y % self.grid_size)
for x, y in itertools.product(xrange(x-distance, x+distance+1),
xrange(y-distance, y+distance+1))]
return neighbor_pos
def get_neighbors(self, x, y, distance=1):
"""
Get any neighboring persons within distance from (x, y).
"""
neighbor_pos = self.get_neighborhood(x, y, distance)
neighbor_list = []
for pos in neighbor_pos:
# Skip identity
if pos[0] == x and pos[1] == y:
continue
# Check if empty
if not numpy.isnan(self.space[pos[0], pos[1]]):
neighbor_list.append(int(self.space[pos[0], pos[1]]))
return neighbor_list
def get_person_position(self, person_id):
"""
Get the position of a person based on their ID.
"""
# Find the value that matches our ID in self.space, then reshape to a 2-element list.
return numpy.reshape(numpy.where(self.space == person_id), (1, 2))[0].tolist()
def get_person_neighbors(self, person_id, distance=1):
"""
Get the position of a person based on their ID.
"""
# Find the value that matches our ID in self.space, then reshape to a 2-element list.
x, y = self.get_person_position(person_id)
return self.get_neighbors(x, y, distance)
def move_person(self, person_id, x, y):
"""
Move a person to a new (x, y) location.
"""
# Get original
original_position = self.get_person_position(person_id)
# Check target location
if not numpy.isnan(self.space[x, y]):
raise ValueError("Unable to move person {0} to ({1}, {2}) since occupied.".format(person_id, x, y))
# Otherwise, move by emptying and setting.
self.space[original_position[0], original_position[1]] = numpy.nan
self.space[x, y] = person_id
def step_move(self):
"""
Model step move function, which handles moving agents randomly around.
"""
# Get a random order for the agents.
random_order = range(self.num_people)
numpy.random.shuffle(random_order)
# Iterate in random order.
for i in random_order:
# Get current position
x, y = self.get_person_position(i)
# Move our agent between -1, 0, +1 in each dimension
x_new = (x + numpy.random.randint(-1, 2)) % self.grid_size
y_new = (y + numpy.random.randint(-1, 2)) % self.grid_size
# Try to move them
try:
self.move_person(i, x_new, y_new)
except ValueError:
# Occupied, so fail.
pass
def step_interact(self):
"""
"Interact" the agents by seeing if they will hookup and spread.
"""
# Get a random order for the agents.
random_order = range(self.num_people)
numpy.random.shuffle(random_order)
# Track which pairs we've tested. Don't want to "interact" them twice w/in one step.
seen_pairs = []
# Iterate in random order.
for i in random_order:
# Get neighbors
neighbors = self.get_person_neighbors(i)
# Iterate over neighbors
for neighbor in neighbors:
# Check if we've already seen.
a = min(i, neighbor)
b = max(i, neighbor)
if (a, b) not in seen_pairs:
seen_pairs.append((a, b))
else:
continue
# Check if hookup if not seen.
hookup_a = self.people[a].decide_hookup()
hookup_b = self.people[b].decide_hookup()
if hookup_a and hookup_b:
# Hookup going to happen.
self.num_interactions += 1
# Check now for condoms and use resulting rate.
if self.people[a].decide_condom() or self.people[b].decide_condom():
# Using a condom.
self.num_interactions_condoms += 1
use_condom = True
if self.people[a].is_infected or self.people[b].is_infected:
is_transmission = numpy.random.random() <= self.prob_transmit_condom
else:
is_transmission = False
else:
# Not using a condom.
use_condom = False
if self.people[a].is_infected or self.people[b].is_infected:
is_transmission = numpy.random.random() <= self.prob_transmit
else:
is_transmission = False
# Now infect.
self.history_interactions.append((self.t, a, b, use_condom, is_transmission))
if is_transmission:
self.people[a].is_infected = True
self.people[b].is_infected = True
def get_num_infected(self):
"""
Get the number of infected persons.
"""
# Count
infected = 0
for person in self.people:
if person.is_infected:
infected += 1
return infected
def step(self):
"""
Model step function.
"""
# "Interact" agents.
self.step_interact()
# Move agents
self.step_move()
# Increment steps and track history.
self.t += 1
self.history_space.append(copy.deepcopy(self.space))
self.history_space_infected.append(self.get_space_infected())
self.num_infected = self.get_num_infected()
self.history_num_infected.append(self.num_infected)
self.history_num_interactions.append(self.num_interactions)
self.history_num_interactions_condoms.append(self.num_interactions_condoms)
def get_space_infected(self, t=None):
"""
Return a projection of the space that shows which cells have an infected person.
"""
if t == None:
# Initialize empty
infected_space = numpy.zeros_like(self.space)
# Iterate over persons and set.
for p in self.people:
x, y = self.get_person_position(p.person_id)
if p.is_infected:
infected_space[x, y] = +1
else:
infected_space[x, y] = -1
# Return
return infected_space
else:
# Return historical step
return self.history_space_infected[t]
def __repr__(self):
'''
Return string representation.
'''
skip_none = True
repr_string = type(self).__name__ + " ["
elements = dir(self)
for e in elements:
# Make sure we only display "public" fields; skip anything private (_*), that is a method/function, or that is a module.
e_type = eval('type(self.{0}).__name__'.format(e))
if not e.startswith("_") and e_type not in ['DataFrame', 'function', 'method', 'builtin_function_or_method', 'module', 'instancemethod']:
value = eval("self." + e)
if value != None and skip_none == True:
if e_type in ['list', 'set', 'tuple']:
repr_string += "\n\n\t{0}={1},\n\n".format(e, value)
elif e_type in ['ndarray']:
repr_string += "\n\n\t{0}=\t\n{1},\n\n".format(e, value)
else:
repr_string += "{0}={1}, ".format(e, value)
# Clean up trailing space and comma.
return repr_string.strip(" ").strip(",") + "]"
In order to test our model, we can:
grid_size, num_people, min_prob_hookup,
and max_prob_hookup
.
In [56]:
# Create model and output
m = Model(grid_size=20, num_people=25, min_prob_hookup=0.25, max_prob_hookup=0.5)
In [57]:
# Step over the model for a few steps
for i in xrange(1000):
# Step
m.step()
# Update every 100 steps
if i % 100 == 0:
print((m.t, m.get_num_infected(), m.num_interactions, m.num_interactions_condoms))
In [58]:
# Plot time series.
f = plt.figure()
plt.subplot(211)
plt.plot(m.history_num_infected)
plt.legend(("Number of infections"), loc="best")
plt.subplot(212)
plt.plot(numpy.array(m.history_num_interactions) - numpy.array(m.history_num_interactions_condoms))
plt.plot(m.history_num_interactions_condoms)
plt.legend(("Number of interactions without condoms",
"Number of interactions with condoms"),
loc="best")
Out[58]:
In [59]:
# Get colormap
cmap = seaborn.cubehelix_palette(light=1, as_cmap=True)
def plot_space_infected(t=None):
"""
Return a projection of the space that shows which cells have an infected person.
"""
f = plt.figure()
plt.title("Infected space at t={0}".format(t))
plt.pcolor(m.get_space_infected(t), vmin=-1, vmax=1, cmap=cmap)
ax = f.gca()
ax.set_aspect(1./ax.get_data_ratio())
plt.colorbar()
interact(plot_space_infected,
t=IntSlider(min=1, max=m.t-1, step=1))
In [60]:
# Get the network of infections
edge_weight = {}
edge_color = {}
# Iterate over all interactions
for e in m.history_interactions:
# Check if we have recorded this pair before.
if (e[1], e[2]) not in edge_weight:
edge_weight[(e[1], e[2])] = 1
if e[-1] == True:
edge_color[(e[1], e[2])] = "red"
else:
edge_color[(e[1], e[2])] = "#dddddd"
else:
edge_weight[(e[1], e[2])] += 1
if e[-1] == True:
edge_color[(e[1], e[2])] = "red"
else:
edge_color[(e[1], e[2])] = "#dddddd"
In [71]:
# Create graph
g = networkx.Graph()
g.add_nodes_from(range(m.num_people))
g.add_edges_from(edge_weight.keys())
g_layout = networkx.spring_layout(g, iterations=100)
# Get node info
healthy_nodes = [i for i in range(m.num_people) if not m.people[i].is_infected]
infected_nodes = [i for i in range(m.num_people) if m.people[i].is_infected]
# Now we can visualize the infection network.
f = plt.figure(figsize=(16, 16))
ax = f.gca()
networkx.draw_networkx_nodes(g, g_layout,
nodelist=infected_nodes,
node_size=25,
node_color='#b92732')
networkx.draw_networkx_nodes(g, g_layout,
node_size=25,
nodelist=healthy_nodes,
node_color='#64a171')
networkx.draw_networkx_edges(g, g_layout,
width=1,
alpha=0.5,
edge_cmap=cmap,
edge_color=edge_weight.values())
ax.grid(False)
ax.set_axis_bgcolor("#333333")