CSCS530 Winter 2015

Complex Systems 530 - Computer Modeling of Complex Systems (Winter 2015)

View this repository on NBViewer

HIV Model Outline

Goal

We will examine policy interventation strategies to address and control the spread of HIV in a sexual network.

Justification

Given the heterogeneity of behavior and network effects of sexually-transmitted diseases, this problem requires ABM and network methodology. Differential equation compartment models will not allow for incomplete or scale-free graph structures or time-varying agent behavior, and so cannot be used.

Outline

  • Implement condom subsidy mechanics
  • Implement gossip network to share status
  • Vary condom subsidy and gossip network structure
  • Observe how number of steps to 50% infected
  • Observe the mean and standard deviation of sexual partner degree distribution

    To formally describe our model, let's break it down into pieces:

I. Space

In this model, our space will be a two-dimensional (2D) square grid. Each grid cell will contain zero or one people. Edges of the grid will wrap around.

II. Actors

A. People

In this case, people are the carriers of HIV. We are modeling them as simply as possible, using only the following properties:

Properties

  • _isinfected: is the person infected with HIV?
  • _condombudget: what is the person's budget for protection?
  • _probhookup: what is the probability that a person will want to hookup given contact?

For their step function, agents will perform the following:

  • take a unit-distance step in a random direction
  • evaluate neighboring cells for any potential people to hookup with
  • for each pair, check against _probhookup; if both people sample True, then hookup
  • for a hookup, check against _maxbudget and the global condom cost to see if either partner will purchase

B. Institution

A "public health" institution will manage the level of subsidy for condoms. For now, the subsidy level will be set at $t_0$ and constant over model run.

III. Initial Conditions

A. People

  • People will be randomly distributed throughout the grid by sampling from a uniform discrete distribution with replacement. In the event that an agent has already been placed at the sampled location, we continue drawing a new nandom position until it is unoccupied.
  • People will have their prob_hookup randomly initialized to a value from a uniform continuous distribution.
  • People will have their condom_budget randomly initialized to a value from a uniform continuous distribution.
  • A single person will be selected at random at $t_0$ to have HIV.

B. Insitution

The public health institution will set a level of subsidy for condoms using a random uniform continuous distribution.

IV. Model Parameters

Based on the description above, we need the following model parameters:

  • grid_size: size of the two-dimensional square grid, i.e., dimension length
  • num_people: number of persons to create; must be less than ${grid\_size}^2$
  • min_subsidy, max_subsidy: the lower and upper bounds for the institution's subsidy
  • min_condom_budget, max_condom_budget: the lower and upper bounds for initial conditions of people's condom_budget
  • condom_cost: cost of a condom; fixed to $1.0$
  • min_prob_hookup, max_prob_hookup: the lower and upper bounds for initial conditions of people's prob_hookup
  • prob_transmit, prob_transmit_condom: the probability of transmitting the disease without and with a condom

In [8]:
%matplotlib inline

# Standard imports
import copy
import itertools

# Scientific computing imports
import numpy
import matplotlib.pyplot as plt
import networkx
import pandas
import seaborn; seaborn.set()

# Import widget methods
from IPython.html.widgets import *

Person Class

Below, we will define our person class. This can be broken up as follows:

  • constructor: class constructor, which "initializes" or "creates" the person when we call Person(). This is in the __init__ method.
  • decide_condom: decide if the person will purchase a condom, i.e., by checking that $condom\_budget >= condom\_cost - subsidy$
  • decide_hookup: decide if the person will hookup, i.e., by sampling with $p=prob\_hookup$

In [2]:
class Person(object):
    """
    Person class, which encapsulates the entire behavior of a person.
    """
    
    def __init__(self, model, person_id, is_infected=False, condom_budget=1.0, prob_hookup=0.5):
        """
        Constructor for Person class.  By default,
          * not infected
          * will always buy condoms
          * will hookup 50% of the time
          
        Note that we must "link" the Person to their "parent" Model object.
        """
        # Set model link and ID
        self.model = model
        self.person_id = person_id
        
        # Set Person parameters.
        self.is_infected = is_infected
        self.condom_budget = condom_budget
        self.prob_hookup = prob_hookup
        
    def decide_condom(self):
        """
        Decide if we will use a condom.
        """
        if self.condom_budget >= (self.model.condom_cost - self.model.condom_subsidy):
            return True
        else:
            return False
    
    def decide_hookup(self):
        """
        Decide if we want to hookup with a potential partner.
        """
        if numpy.random.random() <= self.prob_hookup:
            return True
        else:
            return False
    
    def get_position(self):
        """
        Return position, calling through model.
        """
        return self.model.get_person_position(self.person_id)
    
    def get_neighbors(self):
        """
        Return neighbors, calling through model.
        """
        return self.model.get_person_neighbors(self.person_id)

    def __repr__(self):
        '''
        Return string representation.
        '''
        skip_none = True
        repr_string = type(self).__name__ + " ["
        except_list = "model"

        elements = [e for e in dir(self) if str(e) not in except_list]
        for e in elements:
            # Make sure we only display "public" fields; skip anything private (_*), that is a method/function, or that is a module.
            if not e.startswith("_") and eval('type(self.{0}).__name__'.format(e)) not in ['DataFrame', 'function', 'method', 'builtin_function_or_method', 'module', 'instancemethod']:
                    value = eval("self." + e)
                    if value != None and skip_none == True:
                        repr_string += "{0}={1}, ".format(e, value)

        # Clean up trailing space and comma.
        return repr_string.strip(" ").strip(",") + "]"

Model Class

Below, we will define our model class. This can be broken up as follows:

  • constructor: class constructor, which "initializes" or "creates" the model when we call Model(). This is in the __init__ method.
  • setup_space: method to create our "space"
  • setup_people: method to create our "people"
  • setup_institution: method to create our "institution"
  • get_neighbors: method to get neighboring agents based on position
  • get_person_neighbors: method to get neighboring agents based on agent ID
  • get_person_position: method to get position based on agent ID
  • move_person: method to move an agent to a new position
  • step_move: method to step through agent moves
  • step_interact: method to step through agent interaction
  • step: main step method to control each time step simulation

In [1]:
class Model(object):
    """
    Model class, which encapsulates the entire behavior of a single "run" in our HIV ABM.
    """
    
    def __init__(self, grid_size, num_people, min_subsidy=0.0, max_subsidy=1.0,
                     min_condom_budget=0.0, max_condom_budget=2.0,
                     condom_cost=1.0, min_prob_hookup=0.0, max_prob_hookup=1.0,
                    prob_transmit=0.9, prob_transmit_condom=0.1):
        """
        Class constructor.
        """
        # Set our model parameters; this is long but simple!
        self.grid_size = grid_size
        self.num_people =  num_people
        self.min_subsidy = min_subsidy
        self.max_subsidy = max_subsidy
        self.min_condom_budget = min_condom_budget
        self.max_condom_budget = max_condom_budget
        self.condom_cost = condom_cost
        self.min_prob_hookup = min_prob_hookup
        self.max_prob_hookup = max_prob_hookup
        self.prob_transmit = prob_transmit
        self.prob_transmit_condom = prob_transmit_condom
        
        # Set our state variables
        self.t = 0
        self.space = numpy.array((0,0))
        self.condom_subsidy = 0.0
        self.people = []
        self.num_interactions = 0
        self.num_interactions_condoms = 0
        self.num_infected = 0
        
        # Setup our history variables.
        self.history_space = []
        self.history_space_infected = []
        self.history_interactions = []
        self.history_num_infected = []
        self.history_num_interactions = []
        self.history_num_interactions_condoms = []
        
        # Call our setup methods to initialize space, people, and institution.
        self.setup_space()
        self.setup_people()
        self.setup_institution()
        
    def setup_space(self):
        """
        Method to setup our space.
        """
        # Initialize a space with a NaN's
        self.space = numpy.full((self.grid_size, self.grid_size), numpy.nan)
    
    def setup_people(self):
        """
        Method to setup our space.
        """
        
        # First, begin by creating all agents without placing them.
        for i in xrange(self.num_people):
            self.people.append(Person(model=self,
                                      person_id=i,
                                      is_infected=False,
                                      condom_budget=numpy.random.uniform(self.min_condom_budget, self.max_condom_budget),
                                      prob_hookup=numpy.random.uniform(self.min_prob_hookup, self.max_prob_hookup)))
        
        # Second, once created, place them into the space.
        for person in self.people:
            # Loop until unique
            is_occupied = True
            while is_occupied:
                # Sample location
                random_x = numpy.random.randint(0, self.grid_size)
                random_y = numpy.random.randint(0, self.grid_size)
                
                # Check if unique
                if numpy.isnan(self.space[random_x, random_y]):
                    is_occupied = False
                else:
                    is_occupied = True
            
            # Now place the person there by setting their ID.
            self.space[random_x, random_y] = person.person_id
            
        # Third, pick one person to be infected initially.
        random_infected = numpy.random.choice(range(self.num_people))
        self.people[random_infected].is_infected = True
        self.num_infected += 1

    def setup_institution(self):
        """
        Method to setup our space.
        """
        # Randomly sample a subsidy level
        self.condom_subsidy = numpy.random.uniform(self.min_subsidy, self.max_subsidy)
    
    def get_neighborhood(self, x, y, distance=1):
        """
        Get a Moore neighborhood of distance from (x, y).
        """
        neighbor_pos = [ ( x % self.grid_size, y % self.grid_size)
                                for x, y in itertools.product(xrange(x-distance, x+distance+1),
                                xrange(y-distance, y+distance+1))]
        return neighbor_pos
    
    def get_neighbors(self, x, y, distance=1):
        """
        Get any neighboring persons within distance from (x, y).
        """
        neighbor_pos = self.get_neighborhood(x, y, distance)
        neighbor_list = []
        for pos in neighbor_pos:
            # Skip identity
            if pos[0] == x and pos[1] == y:
                continue
                
            # Check if empty
            if not numpy.isnan(self.space[pos[0], pos[1]]):
                neighbor_list.append(int(self.space[pos[0], pos[1]]))
        
        return neighbor_list
    
    def get_person_position(self, person_id):
        """
        Get the position of a person based on their ID.
        """
        # Find the value that matches our ID in self.space, then reshape to a 2-element list.
        return numpy.reshape(numpy.where(self.space == person_id), (1, 2))[0].tolist()

    def get_person_neighbors(self, person_id, distance=1):
        """
        Get the position of a person based on their ID.
        """
        # Find the value that matches our ID in self.space, then reshape to a 2-element list.
        x, y = self.get_person_position(person_id)
        return self.get_neighbors(x, y, distance)   
    
    def move_person(self, person_id, x, y):
        """
        Move a person to a new (x, y) location.
        """
        
        # Get original
        original_position = self.get_person_position(person_id)
        
        # Check target location
        if not numpy.isnan(self.space[x, y]):
            raise ValueError("Unable to move person {0} to ({1}, {2}) since occupied.".format(person_id, x, y))
        
        # Otherwise, move by emptying and setting.
        self.space[original_position[0], original_position[1]] = numpy.nan
        self.space[x, y] = person_id
    
    def step_move(self):
        """
        Model step move function, which handles moving agents randomly around.
        """
        
        # Get a random order for the agents.
        random_order = range(self.num_people)
        numpy.random.shuffle(random_order)
        
        # Iterate in random order.
        for i in random_order:
            # Get current position
            x, y = self.get_person_position(i)
            
            # Move our agent between -1, 0, +1 in each dimension
            x_new = (x + numpy.random.randint(-1, 2)) % self.grid_size
            y_new = (y + numpy.random.randint(-1, 2)) % self.grid_size
            
            # Try to move them
            try:
                self.move_person(i, x_new, y_new)
            except ValueError:
                # Occupied, so fail.
                pass
    
    def step_interact(self):
        """
        "Interact" the agents by seeing if they will hookup and spread.
        """
        
        # Get a random order for the agents.
        random_order = range(self.num_people)
        numpy.random.shuffle(random_order)
        
        # Track which pairs we've tested.  Don't want to "interact" them twice w/in one step.
        seen_pairs = []
        
        # Iterate in random order.
        for i in random_order:
            # Get neighbors
            neighbors = self.get_person_neighbors(i)
            
            # Iterate over neighbors
            for neighbor in neighbors:
                # Check if we've already seen.
                a = min(i, neighbor)
                b = max(i, neighbor)
                if (a, b) not in seen_pairs:
                    seen_pairs.append((a, b))
                else:
                    continue
                
                # Check if hookup if not seen.
                hookup_a = self.people[a].decide_hookup()
                hookup_b = self.people[b].decide_hookup()
                if hookup_a and hookup_b:
                    # Hookup going to happen.  
                    self.num_interactions += 1
                                        
                    # Check now for condoms and use resulting rate.
                    if self.people[a].decide_condom() or self.people[b].decide_condom():
                        # Using a condom.
                        self.num_interactions_condoms += 1
                        use_condom = True
                        
                        if self.people[a].is_infected or self.people[b].is_infected:
                            is_transmission = numpy.random.random() <= self.prob_transmit_condom
                        else:
                            is_transmission = False
                    else:
                        # Not using a condom.
                        use_condom = False
                        if self.people[a].is_infected or self.people[b].is_infected:
                            is_transmission = numpy.random.random() <= self.prob_transmit
                        else:
                            is_transmission = False
                    
                    # Now infect.
                    self.history_interactions.append((self.t, a, b, use_condom, is_transmission))
                    if is_transmission:
                        self.people[a].is_infected = True
                        self.people[b].is_infected = True
    
    def get_num_infected(self):
        """
        Get the number of infected persons.
        """
        # Count
        infected = 0
        for person in self.people:
            if person.is_infected:
                infected += 1
                
        return infected
    
    def step(self):
        """
        Model step function.
        """
        
        # "Interact" agents.
        self.step_interact()
        
        # Move agents
        self.step_move()
        
        # Increment steps and track history.
        self.t += 1
        self.history_space.append(copy.deepcopy(self.space))
        self.history_space_infected.append(self.get_space_infected())
        self.num_infected = self.get_num_infected()
        self.history_num_infected.append(self.num_infected)
        self.history_num_interactions.append(self.num_interactions)
        self.history_num_interactions_condoms.append(self.num_interactions_condoms)

    def get_space_infected(self, t=None):
        """
        Return a projection of the space that shows which cells have an infected person.
        """
        if t == None:
            # Initialize empty
            infected_space = numpy.zeros_like(self.space)
            
            # Iterate over persons and set.
            for p in self.people:
                x, y = self.get_person_position(p.person_id)
                if p.is_infected:
                    infected_space[x, y] = +1
                else:
                    infected_space[x, y] = -1
            
            # Return
            return infected_space
        else:
            # Return historical step
            return self.history_space_infected[t]

    def __repr__(self):
        '''
        Return string representation.
        '''
        skip_none = True
        repr_string = type(self).__name__ + " ["

        elements = dir(self)
        for e in elements:
            # Make sure we only display "public" fields; skip anything private (_*), that is a method/function, or that is a module.
            e_type = eval('type(self.{0}).__name__'.format(e))
            if not e.startswith("_") and e_type not in ['DataFrame', 'function', 'method', 'builtin_function_or_method', 'module', 'instancemethod']:
                    value = eval("self." + e)
                    if value != None and skip_none == True:
                        if e_type in ['list', 'set', 'tuple']:
                            repr_string += "\n\n\t{0}={1},\n\n".format(e, value)
                        elif e_type in ['ndarray']:
                            repr_string += "\n\n\t{0}=\t\n{1},\n\n".format(e, value)
                        else:
                            repr_string += "{0}={1}, ".format(e, value)

        # Clean up trailing space and comma.
        return repr_string.strip(" ").strip(",") + "]"

Testing our model

In order to test our model, we can:

  1. Initialize the model with some parameters. For example, below, we have set grid_size, num_people, min_prob_hookup, and max_prob_hookup.
  2. Iterate for some number of steps. Below, we take 1000 steps, outputting some information every 100 steps.
  3. Plot some important statistics as time series plots and visualize interaction within the space. For example, below, we plot the time series of interactions with and without condoms and number of infected over time.

In [56]:
# Create model and output
m = Model(grid_size=20, num_people=25, min_prob_hookup=0.25, max_prob_hookup=0.5)

In [57]:
# Step over the model for a few steps
for i in xrange(1000):
    # Step
    m.step()
    
    # Update every 100 steps
    if i % 100 == 0:
        print((m.t, m.get_num_infected(), m.num_interactions, m.num_interactions_condoms))


(1, 1, 0, 0)
(101, 1, 84, 83)
(201, 1, 163, 156)
(301, 1, 248, 236)
(401, 2, 325, 306)
(501, 2, 407, 386)
(601, 3, 509, 481)
(701, 6, 602, 565)
(801, 9, 689, 638)
(901, 16, 788, 735)

In [58]:
# Plot time series.
f = plt.figure()
plt.subplot(211)
plt.plot(m.history_num_infected)
plt.legend(("Number of infections"), loc="best")
plt.subplot(212)
plt.plot(numpy.array(m.history_num_interactions) - numpy.array(m.history_num_interactions_condoms))
plt.plot(m.history_num_interactions_condoms)
plt.legend(("Number of interactions without condoms",
           "Number of interactions with condoms"),
          loc="best")


Out[58]:
<matplotlib.legend.Legend at 0x1798ad10>

Visualizing space

In the code below, we use an IPython widget slider to visualize the movement in our space. The cells are color-coded so that:

  • $-1$: agent without infection
  • $0$: empty cell
  • $+1$: agent with infection

In [59]:
# Get colormap
cmap = seaborn.cubehelix_palette(light=1, as_cmap=True)

def plot_space_infected(t=None):
        """
        Return a projection of the space that shows which cells have an infected person.
        """
        f = plt.figure()
        plt.title("Infected space at t={0}".format(t))
        plt.pcolor(m.get_space_infected(t), vmin=-1, vmax=1, cmap=cmap)
        ax = f.gca()
        ax.set_aspect(1./ax.get_data_ratio())   
        plt.colorbar()
        
interact(plot_space_infected,
                t=IntSlider(min=1, max=m.t-1, step=1))



In [60]:
# Get the network of infections
edge_weight = {}
edge_color = {}

# Iterate over all interactions
for e in m.history_interactions:
    # Check if we have recorded this pair before.
    if (e[1], e[2]) not in edge_weight:
        edge_weight[(e[1], e[2])] = 1
        
        if e[-1] == True:
            edge_color[(e[1], e[2])] = "red"
        else:
            edge_color[(e[1], e[2])] = "#dddddd"
    else:
        edge_weight[(e[1], e[2])] += 1
        
        if e[-1] == True:
            edge_color[(e[1], e[2])] = "red"
        else:
            edge_color[(e[1], e[2])] = "#dddddd"

In [71]:
# Create graph
g = networkx.Graph()
g.add_nodes_from(range(m.num_people))
g.add_edges_from(edge_weight.keys())
g_layout = networkx.spring_layout(g, iterations=100)

# Get node info
healthy_nodes = [i for i in range(m.num_people) if not m.people[i].is_infected]
infected_nodes = [i for i in range(m.num_people) if m.people[i].is_infected]

# Now we can visualize the infection network.
f = plt.figure(figsize=(16, 16))
ax = f.gca()
networkx.draw_networkx_nodes(g, g_layout,
                       nodelist=infected_nodes,
                       node_size=25,
                       node_color='#b92732')

networkx.draw_networkx_nodes(g, g_layout,
                       node_size=25,
                       nodelist=healthy_nodes,
                       node_color='#64a171')

networkx.draw_networkx_edges(g, g_layout, 
                       width=1, 
                       alpha=0.5,
                       edge_cmap=cmap,
                       edge_color=edge_weight.values())

ax.grid(False)
ax.set_axis_bgcolor("#333333")