In [1]:
#!/usr/bin/python
# -*- coding: utf-8 -*-
import numpy as np
import logging
import copy
import random
import collections
import os
import csv
from time import gmtime, strftime
from matplotlib import pyplot as plt
from functools import reduce
# Configure root logging for the whole notebook
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger()
logger.setLevel(logging.INFO)
# Cooperate Value
coop = 'C'
# Defect value
dfct = 'D'
# Von Neumann type -- TODO: Correct the typo in the whole file
# Neighbourhood descriptors: [name used by Simulator.computeNeighbours, neighbour count]
vonNewmann = ['vonNewmann', 4]
# Moore type
moore = ['moore', 8]
# Update-rule identifiers passed to Simulator(updateType=...)
#Imitate
imitate = 'imitate'
#Replicate
replicate = 'replicate'
class Player:
    """Class representing a player that will take part in the problems"""
    # Definition of elements for a player
    neighbourhood = None       # list of (shallow-copied) neighbouring Player objects
    game = None                # Game instance used to compute payoffs
    payoff = 0                 # payoff accumulated in the last play() round
    strategy = None            # current strategy value (coop or dfct)
    strategyHistory = None     # strategies adopted over the iterations
    neighbourhoodSize = None   # number of neighbours (4 von Neumann, 8 Moore)
    def __init__(self, neighbourhoodSize, strategy, game):
        """
        :param neighbourhoodSize: number of neighbours (4 for von Neumann, 8 for Moore)
        :param strategy: initial strategy value (coop or dfct)
        :param game: Game instance the player plays against its neighbours
        """
        #logging.debug('Creating instance of a player')
        self.neighbourhood = None
        self.strategy = strategy
        self.game = game
        self.strategyHistory = [self.strategy]
        self.neighbourhoodSize = neighbourhoodSize
        #logging.debug('Instance of player created successfully')
    def play(self):
        """
        Runs the game with all of the neighbours and computes the payoff for the current iteration.
        """
        payoff = 0
        for neighbour in self.neighbourhood:
            # run() returns the payoff pair; index 0 is this player's share
            payoff += self.game.run([self.strategy, neighbour.strategy])[0]
        self.payoff = payoff
    def imitate(self):
        """Adopt the strategy of the best-scoring player in the neighbourhood (including self)."""
        logging.debug('Imitating ...')
        logging.debug('Current Strategy:' + self.strategy + ", own payoff: " + str(self.payoff))
        # Cache the best player instead of scanning the neighbourhood twice
        best = self.maxPayoffNeighbour()
        self.strategy = best.strategy
        self.strategyHistory.append(self.strategy)
        logging.debug('New Strategy:' + self.strategy + ", Neighbour payoff: " + str(best.payoff))
    def replicate(self):
        """Probabilistically adopt the best neighbourhood strategy (replicator-like rule).

        NOTE: depends on the notebook-level payoff globals temp, rwrd, suck,
        pnsh being defined before this method is called.
        """
        logging.debug('Replicating ...')
        neighbour = random.choice(self.neighbourhood)
        # Payoff difference normalised into [0, 1] by the maximum possible spread
        p = (1 + (neighbour.payoff - self.payoff)/
             (self.neighbourhoodSize * (max(temp,rwrd,suck,pnsh) - min(temp,rwrd,suck,pnsh)))) / 2
        if random.uniform(0, 1) < p:
            self.strategy = self.maxPayoffNeighbour().strategy
            self.strategyHistory.append(self.strategy)
    def maxPayoffNeighbour(self):
        """Return the highest-payoff player among self and all neighbours.

        Fix: the original loop started at index 1 and therefore never
        considered the first neighbour in the list.
        """
        mx = self
        for neighbour in self.neighbourhood:
            if neighbour.payoff > mx.payoff:
                mx = neighbour
        return mx
class Game:
    """Common base class for all games"""
    # Definition of elements for a game
    numPlayers = 2 # Number of players. Default 2
    matrix = None # Game Matrix
    strategies = None # Maps each strategy value to its row/column index in the matrix
    def __init__(self, numPlayers, matrix, strategies):
        """
        :param numPlayers: number of players taking part in the game
        :param matrix: payoff matrix indexed by both players' strategy indices
        :param strategies: dict mapping strategy value -> index in the matrix
        """
        logging.debug('Creating instance of game')
        self.numPlayers = numPlayers
        self.matrix = matrix
        self.strategies = strategies
        logging.debug('Instance of game created')
    def run(self, strategies):
        """Executes the current game. Given the value of the game matrix and strategies chosen returns the value for both players"""
        #logging.debug('Playing a game')
        rowIndex = self.strategies[strategies[0]]
        colIndex = self.strategies[strategies[1]]
        return self.matrix[rowIndex, colIndex]
class Simulator:
    """Simulator class in charge of executing the main logic of the application"""
    # Definition of elements for the simulator
    lattice = None  # latticeSize x latticeSize grid (list of lists) of Player objects
    game = None  # Game instance shared by every player
    avgValue = None # Value used in the terminate computation and the required plots
    lastLatticeStrategy = None  # flat snapshot of strategies taken before an update round
    latticeSize = None  # side length of the square lattice
    neighbourhoodType = None  # neighbourhood name: vonNewmann[0] or moore[0]
    cooperationLevelHistory = None  # rolling window (deque, maxlen=avgValue) of cooperation levels
    cooperationLevelCompleteHistory = None  # full per-iteration cooperation-level history
    coopLvlShortHistory = None  # short rolling window feeding the terminate() std-dev test
    cooperationLevelData = None  # per-iteration mean of cooperationLevelHistory
    reComputeCurrentLatticeStrategy = None  # cache-invalidation flag for currentLatticeStrategyList
    currentLatticeStrategyList = None  # cached flat list of every player's strategy
    maxLoops = None  # hard iteration cap (3 * avgValue)
    coopLvlShortSize = None  # length of the short window (avgValue / 3)
    minStdDev = 0.016  # std-dev threshold below which the run is considered stable
    directoryOutput = None  # output directory name: "<size>_<neighbourhood>_<timestamp>"
    updateType = None  # strategy-update rule applied each iteration: imitate or replicate
    def __init__(self, latticeSize, game, neighbourhoodType, avgValue, updateType):
        """Set up the lattice, histories and output directory for one simulation.

        :param latticeSize: side length of the square lattice
        :param game: Game instance the players will play
        :param neighbourhoodType: [name, size] pair (vonNewmann or moore)
        :param avgValue: averaging window; also drives coopLvlShortSize and maxLoops
        :param updateType: strategy update rule (imitate or replicate)
        """
        logging.info('Creating instance of simulator')
        self.game = game
        self.avgValue = avgValue
        self.coopLvlShortSize = int(self.avgValue/3)
        self.maxLoops = self.avgValue * 3
        self.latticeSize = latticeSize
        self.updateType = updateType
        self.neighbourhoodType = neighbourhoodType[0]
        self.cooperationLevelData = []
        self.cooperationLevelCompleteHistory = []
        self.directoryOutput = str(latticeSize) + '_' + neighbourhoodType[0] + '_' + strftime("%Y-%m-%d_%H-%M-%S", gmtime())
        self.cooperationLevelHistory = collections.deque(maxlen=self.avgValue)
        self.coopLvlShortHistory = collections.deque(maxlen=int(self.coopLvlShortSize))
        self.reComputeCurrentLatticeStrategy = True
        self.initLattice(neighbourhoodType[1])
        self.computeNeighbourhoods()
        logging.info('Instance of simulator created successfully')
    def initLattice(self, neighbourhoodSize):
        """Initialize the lattice with a set of nxn different players"""
        logging.debug('Initializing lattice for simulator')
        self.lattice = []
        for i in range(0, self.latticeSize):
            self.lattice.append([])
            for j in range(0, self.latticeSize):
                # Each player starts with a uniformly random strategy
                self.lattice[i].append(Player(neighbourhoodSize, self.randomStrategy(), self.game))
        logging.debug('Players created in lattice for simulator')
    def randomStrategy(self):
        """Return coop or dfct with equal probability (0.5 each)."""
        if random.uniform(0, 1) < 0.5:
            return coop
        return dfct
    def computeNeighbourhoods(self):
        """Initialize the neighbourhoods for the players of the simulation"""
        logging.debug('Computing neighbours for players in lattice')
        for i in range(self.latticeSize):
            for j in range(self.latticeSize):
                self.lattice[i][j].neighbourhood = self.computeNeighbours(i, j)
        logging.debug('Neighbours successfully assigned for players in lattice')
    def computeNeighbours(self, row, col):
        """Return shallow copies of the neighbours of the player at (row, col).

        The modulo arithmetic wraps indices at the edges, so the lattice is a
        torus.  The shallow copies freeze each neighbour's state at the time
        of the call, which is why run() recomputes all neighbourhoods after
        every update round.
        """
        #logging.debug('Computing neighbours for player' + str(row) + ',' + str(col) + ' in lattice')
        neighbours = []
        if self.neighbourhoodType == vonNewmann[0]:
            # Von Neumann: the 4 orthogonal neighbours (left, right, up, down)
            neighbours.append(copy.copy(self.lattice[row % self.latticeSize][(col - 1) % self.latticeSize]))
            neighbours.append(copy.copy(self.lattice[row % self.latticeSize][(col + 1) % self.latticeSize]))
            neighbours.append(copy.copy(self.lattice[(row - 1) % self.latticeSize][col % self.latticeSize]))
            neighbours.append(copy.copy(self.lattice[(row + 1) % self.latticeSize][col % self.latticeSize]))
        if self.neighbourhoodType == moore[0]:
            # Moore: all 8 surrounding cells, including diagonals
            neighbours.append(copy.copy(self.lattice[(row - 1) % self.latticeSize][(col - 1) % self.latticeSize]))
            neighbours.append(copy.copy(self.lattice[(row - 1) % self.latticeSize][col % self.latticeSize]))
            neighbours.append(copy.copy(self.lattice[(row - 1) % self.latticeSize][(col + 1) % self.latticeSize]))
            neighbours.append(copy.copy(self.lattice[row % self.latticeSize][(col - 1) % self.latticeSize]))
            neighbours.append(copy.copy(self.lattice[row % self.latticeSize][(col + 1) % self.latticeSize]))
            neighbours.append(copy.copy(self.lattice[(row + 1) % self.latticeSize][(col - 1) % self.latticeSize]))
            neighbours.append(copy.copy(self.lattice[(row + 1) % self.latticeSize][col % self.latticeSize]))
            neighbours.append(copy.copy(self.lattice[(row + 1) % self.latticeSize][(col + 1) % self.latticeSize]))
        return neighbours
    def currentLatticeStrategy(self):
        """Return a flat (row-major) list of every player's strategy.

        NOTE(review): reComputeCurrentLatticeStrategy is set to True in
        several places but never reset to False, so in practice the cached
        list is rebuilt on every call — confirm whether caching was intended.
        """
        if self.reComputeCurrentLatticeStrategy:
            self.currentLatticeStrategyList = [p.strategy for sublist in self.lattice for p in sublist]
        return self.currentLatticeStrategyList
    def terminate(self, loop):
        """Determine whether a stable state has been reached and it's good to stop"""
        # Stop once the short cooperation-level window is (almost) flat after a
        # warm-up of avgValue iterations, or when the hard cap maxLoops is hit
        logging.debug(np.std(self.coopLvlShortHistory))
        return (loop > self.avgValue and np.std(self.coopLvlShortHistory) < self.minStdDev) or loop > self.maxLoops
    def run(self):
        """Run the simulation until terminate() is satisfied.

        Side effects: creates directoryOutput, saves an initial chart,
        periodic snapshot charts, and a summary file at the end.
        """
        logging.info('Starting to run simulator')
        if not os.path.exists(self.directoryOutput):
            os.makedirs(self.directoryOutput)
        generateChart(self.currentLatticeStrategy(), True, False, './' + self.directoryOutput + '/initial', self.latticeSize)
        loop = 0
        # Initial round: record the starting cooperation level and let everyone play once
        self.lastLatticeStrategy = self.currentLatticeStrategy()
        self.cooperationLevelHistory.append(self.cooperationLevel())
        self.cooperationLevelCompleteHistory.append(self.cooperationLevel())
        self.cooperationLevelData.append(reduce(lambda x, y: x + y, self.cooperationLevelHistory) / len(self.cooperationLevelHistory))
        for i in range(self.latticeSize):
            for j in range(self.latticeSize):
                self.lattice[i][j].play()
        # Refresh the frozen neighbour copies with the payoffs just computed
        self.computeNeighbourhoods()
        while not self.terminate(loop):
            self.lastLatticeStrategy = self.currentLatticeStrategy()
            for i in range(self.latticeSize):
                for j in range(self.latticeSize):
                    # Update each player's strategy, then replay to refresh its payoff
                    if self.updateType == imitate:
                        self.lattice[i][j].imitate()
                    if self.updateType == replicate:
                        self.lattice[i][j].replicate()
                    self.lattice[i][j].play()
            self.computeNeighbourhoods()
            self.reComputeCurrentLatticeStrategy = True
            # Bookkeeping for the termination test and the plots
            self.cooperationLevelHistory.append(self.cooperationLevel())
            self.cooperationLevelCompleteHistory.append(self.cooperationLevel())
            self.cooperationLevelData.append(reduce(lambda x, y: x + y, self.cooperationLevelHistory) / len(self.cooperationLevelHistory))
            self.coopLvlShortHistory.append(self.cooperationLevel())
            logging.debug('Iteration: '+ str(loop))
            loop = loop + 1
            logging.debug('Std deviation: ' + str(np.std(self.payoffs())))
            logging.debug('Coop Level: ' + str(self.cooperationLevel()))
            logging.debug('Std Dev - Coop Lvl: ' + str(np.std(self.coopLvlShortHistory)))
            # Snapshot the lattice at selected milestones and every 25 loops
            if loop % 25 == 0 or loop in (1, 5, 10, 20, 50):
                generateChart(self.currentLatticeStrategy(), True, False,
                    './' + self.directoryOutput + '/' + str(loop) + '-', self.latticeSize)
        logging.info('Simulation finished')
        self.saveSummary(loop)
    def payoffs(self):
        """Return a flat (row-major) list of every player's current payoff."""
        return [p.payoff for sublist in self.lattice for p in sublist]
    def cooperationLevel(self):
        """Return the fraction of players currently playing coop."""
        return self.currentLatticeStrategy().count(coop) / len(self.currentLatticeStrategy())
    def saveSummary(self, loop):
        """Write summary(loop) to <directoryOutput>/summary.out."""
        target = open('./' + self.directoryOutput + '/summary.out', 'w')
        target.write(self.summary(loop))
        target.close()
    def summary(self, loop):
        """Build the human-readable end-of-run summary string."""
        summary = 'Loops: ' + str(loop) + '\n'
        summary += 'Avg payoff: ' + str(reduce(lambda x, y: x + y, self.payoffs()) / len(self.payoffs())) + '\n'
        summary += 'Coop Lvl History: ' + str(self.cooperationLevelCompleteHistory) + '\n'
        summary += 'Payoffs: ' + str(self.payoffs()) + '\n'
        return summary
In [2]:
def generateChart(arr, save, plot, name, sze):
    """Render the lattice strategies as a sze x sze greyscale grid.

    :param arr: flat list of strategy values of length sze*sze
    :param save: if True, save the figure to name + '.png'
    :param plot: if True, also display the figure
    :param name: path prefix for the saved image
    :param sze: lattice side length
    """
    # True where the player cooperates; reshape back into the square lattice
    data = np.asarray([x == coop for x in arr]).reshape(sze,sze)
    fig = plt.figure()
    ax = fig.add_subplot(111)
    # Fix: the original called ax.imshow(data) twice; the first, unstyled call
    # was immediately overdrawn by this one, so it has been removed.
    ax.imshow(data, cmap='Greys', interpolation='nearest', vmin=False, vmax=True)
    if save:
        fig.savefig(name + '.png')
    if plot:
        fig.show()
    plt.clf()
    plt.close(fig)
In [3]:
def runSinglePrisionerDilemma(sz, neighType):
    """Run one prisoner's-dilemma simulation on an sz x sz lattice.

    Uses the notebook-level payoff globals (temp, rwrd, suck, pnsh) and
    avgVal.  Saves the final lattice chart and the average cooperation-level
    plot into the simulator's output directory.

    :param sz: lattice side length
    :param neighType: neighbourhood descriptor (vonNewmann or moore)
    :return: the complete cooperation-level history of the run
    """
    payoffMatrix = np.array([((rwrd, rwrd), (suck, temp)),((temp, suck), (pnsh, pnsh))])
    prisionersDilemmaGame = Game(2, payoffMatrix, {coop: 0, dfct: 1})
    sim = Simulator(sz, prisionersDilemmaGame, neighType, avgVal, imitate)
    sim.run()
    generateChart(sim.currentLatticeStrategy(), True, False, './' + sim.directoryOutput + '/final', sz)
    # Plot cooperation level avg
    fig = plt.figure()
    fig.add_subplot(111).plot(sim.cooperationLevelData)
    fig.savefig('./' + sim.directoryOutput + '/coopLvl.png')
    plt.clf()
    plt.close(fig)
    return sim.cooperationLevelCompleteHistory
In [ ]:
# Prisoner's dilemma: 100 Moore-neighbourhood runs on a 50x50 lattice,
# then plot the mean cooperation-level trajectory across the runs.
avgVal = 100  # Average value used to measure the level of cooperation
size = 50  # Lattice size
temp = 10  # Temptation payoff
rwrd = 7  # Reward payoff
suck = 0  # Sucker's payoff
pnsh = 0  # Punishment payoff
cooperationLvlData = [runSinglePrisionerDilemma(size, moore) for _ in range(100)]
coopLvlData = np.array(cooperationLvlData)
# Plot cooperation level complete avg
fig = plt.figure()
ax = fig.add_subplot(111)
ax.plot(coopLvlData.mean(axis=0))
fig.savefig('./coopLvl_' + strftime("%Y-%m-%d_%H-%M-%S", gmtime()) + '.png')
plt.clf()
plt.close(fig)
In [4]:
#Taken from: https://tonysyu.github.io/plotting-error-bars.html#.WDrwIuIrLVN
def errorfill(x, y, yerr, color=None, alpha_fill=0.3, ax=None):
    """Plot y vs x with a shaded error band.

    :param x: x values
    :param y: y values (array-like supporting +/- with yerr)
    :param yerr: scalar or per-point symmetric error, or a (ymin, ymax) pair
    :param color: line/band colour; if None, the axes' next cycle colour is used
    :param alpha_fill: opacity of the error band
    :param ax: target axes (defaults to the current axes)
    """
    ax = ax if ax is not None else plt.gca()
    if np.isscalar(yerr) or len(yerr) == len(y):
        ymin = y - yerr
        ymax = y + yerr
    elif len(yerr) == 2:
        ymin, ymax = yerr
    # Fix: the original used ax._get_lines.color_cycle.next(), a Python 2 /
    # old-matplotlib private API that raises AttributeError on Python 3.
    # Let matplotlib pick the next colour and reuse it for the band.
    line, = ax.plot(x, y, color=color)
    if color is None:
        color = line.get_color()
    ax.fill_between(x, ymax, ymin, color=color, alpha=alpha_fill)
In [8]:
# Do cooperation levels have similar values?
fileName = '12_vn_coop_lvl_history'
myData = []
# Each CSV row holds one run's cooperation-level history as floats
with open('./outputs/' + fileName + '.csv', 'r') as f:
    reader = csv.reader(f)
    for row in reader:
        myData.append(list([float(i) for i in row]))
#with open('./outputs/coop.csv', 'r') as f:
#    rdr = csv.reader(f)
#    myData = list(rdr)
# Transpose runs x iterations into iterations x runs; zip() stops at each
# row's end, so shorter runs simply contribute fewer samples at late iterations
res = [[] for _ in range(max(len(sl) for sl in myData))]
for sl in myData:
    for x, res_sl in zip(sl, res):
        res_sl.append(x)
myData = np.array(res)
#myData = myData.astype(float)
# If rows are ragged, np.array gives an object array of lists and this loop
# converts each entry to an ndarray; for equal lengths it is effectively a no-op
for i in range(0, len(myData)):
    myData[i] = np.array(myData[i])
logging.info(myData.shape[0])
# Per-iteration mean and standard deviation across runs
mean = np.array([np.mean(d) for d in myData])
stdDev = np.array([np.std(d) for d in myData])
lowerBound = mean - 1 * stdDev
upperBound = mean + 1 * stdDev
errorRange = np.array([lowerBound, upperBound])
# Create a figure instance
fig = plt.figure(1, figsize=(9, 6))
#fig = plt.figure()
# Create an axes instance
ax = fig.add_subplot(111)
# Create the boxplot
#bp = ax.boxplot(myData)
x = range(0, myData.shape[0])
y = mean
e = errorRange
#ax.errorbar(x, y, yerr=e)
# Shaded mean +/- 1 std-dev band around the mean trajectory
errorfill(x, y, e, 'blue', 0.3, ax)
# Save the figure
fig.savefig('./outputs/' + fileName + '.png', bbox_inches='tight')
plt.clf()
plt.close(fig)
In [ ]:
# Prisoner's-dilemma sweep: lattice sizes 60 and 80, both neighbourhood
# types, 8 repetitions per configuration.
avgVal = 100  # Average value used to measure the level of cooperation
size = 8  # Lattice size (overwritten inside the loop below)
temp = 10  # Temptation payoff
rwrd = 7  # Reward payoff
suck = 0  # Sucker's payoff
pnsh = 0  # Punishment payoff
for latticeSide in (60, 80):
    for neighbourhood in (moore, vonNewmann):
        for _ in range(8):
            size = latticeSide
            runSinglePrisionerDilemma(size, neighbourhood)
In [5]:
def runSingleSnowdrift(sz, neighType):
    """Run one snowdrift simulation on an sz x sz lattice (replicate rule).

    Uses the notebook-level payoff globals (temp, rwrd, suck, pnsh) and
    avgVal.  Saves the final lattice chart and the average cooperation-level
    plot into the simulator's output directory.

    :param sz: lattice side length
    :param neighType: neighbourhood descriptor (vonNewmann or moore)
    :return: the complete cooperation-level history of the run
    """
    payoffMatrix = np.array([((rwrd, rwrd), (suck, temp)),((temp, suck), (pnsh, pnsh))])
    snowdriftGame = Game(2, payoffMatrix, {coop: 0, dfct: 1})
    sim = Simulator(sz, snowdriftGame, neighType, avgVal, replicate)
    sim.run()
    generateChart(sim.currentLatticeStrategy(), True, False, './' + sim.directoryOutput + '/final', sz)
    # Plot cooperation level avg
    fig = plt.figure()
    fig.add_subplot(111).plot(sim.cooperationLevelData)
    fig.savefig('./' + sim.directoryOutput + '/coopLvl.png')
    plt.clf()
    plt.close(fig)
    return sim.cooperationLevelCompleteHistory
In [6]:
# Snowdrift: 100 von Neumann runs on a 50x50 lattice, then plot the mean
# cooperation-level trajectory across the runs.
avgVal = 100  # Average value used to measure the level of cooperation
size = 50  # Lattice size
temp = 10  # Temptation payoff
rwrd = 7  # Reward payoff
suck = 3  # Sucker's payoff
pnsh = 0  # Punishment payoff
cooperationLvlData = [runSingleSnowdrift(size, vonNewmann) for _ in range(100)]
coopLvlData = np.array(cooperationLvlData)
# Plot cooperation level complete avg
fig = plt.figure()
ax = fig.add_subplot(111)
ax.plot(coopLvlData.mean(axis=0))
fig.savefig('./coopLvl_' + strftime("%Y-%m-%d_%H-%M-%S", gmtime()) + '.png')
plt.clf()
plt.close(fig)
In [9]:
# Snowdrift sweep: lattice sizes 4/8/12/20, both neighbourhood types,
# 8 repetitions per configuration.
temp = 10  # Temptation payoff
rwrd = 7  # Reward payoff
suck = 3  # Sucker's payoff
pnsh = 0  # Punishment payoff
for latticeSide in (4, 8, 12, 20):
    for neighbourhood in (moore, vonNewmann):
        for _ in range(8):
            size = latticeSide
            runSingleSnowdrift(size, neighbourhood)
In [ ]: