In [ ]:
%matplotlib inline
import time
import puzzle
import csv
import matplotlib.pyplot as plt
from collections import deque
from keras.models import Sequential
from keras.layers import Dense
from keras.optimizers import RMSprop
import numpy as np
import random
MAX_EPISODES=20
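# Column indices into the CSV log (indices 7-10 apply only to the verbose layouts)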
INDEX_EPISODE=0
INDEX_EPOCH=1
INDEX_REWARD=2
INDEX_LOSS=3
INDEX_SCORE=4
INDEX_INCREMENT=5
INDEX_LOST=6
INDEX_LAST_STATE=7
INDEX_ACTION=8
INDEX_CURRENT_STATE=9
INDEX_WEIGHTS=10
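# Indices 7-9 are reused for the extra columns of the debug-logging layout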
INDEX_Q=7
INDEX_EXPLORATION=8
INDEX_STATIONARY=9
LOG_LOCATION='../logs/log.csv'
LOG_ITERATION=100 # Logs after this many global iterations
STATIONARY_PENALTY=8 # Penalty for making an illegal move i.e. the states didn't change after an iteration
#ACCESS SCORE AS self.game.score
#ACCESS MATRIX AS self.game.matrix
#DECIDE ACTION TO TAKE IN act()
#POSSIBLE ACTIONS:
# go up: "'w'"
# go left: "'a'"
# go down: "'s'"
# go right: "'d'"
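#ACCESS SCORE GAINED BY THE LAST MOVE AS self.game.increment
#ACCESS THE GAME-OVER FLAG AS self.game.result
#RESTART A FINISHED GAME WITH self.game.reset()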
class Machine:
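    # Deep Q-learning agent that learns to play 2048 through the puzzle.GameGrid interface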
game=puzzle.GameGrid() # Game object
epoch=0
episode=0
loss=0
reward=0
exploration=False
verbose_logging=False
weight_logging=False
debug_logging=False
action="'w'"
model = Sequential()
inputVector=np.zeros((1, 16))
lastState=np.zeros((1, 16))
Qvalues0=np.zeros((1,4))
Qvalues1=np.zeros((1,4))
acts = ["'w'","'s'","'d'","'a'"]
act2index = {"'w'": 0, "'s'": 1, "'d'": 2, "'a'":3}
gamma = 0.9 # Discount rate
epsilon = 0.99 # Exploration rate
iteration = 0
log_iteration=0
dump=False
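    # Buffer of pending log rows, flushed to disk every LOG_ITERATION iterations (see log())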
buffer=np.zeros((1,7))
bufferShape=np.zeros((1,7))
    def __init__(self, verbose_logging_in=False,weight_logging_in=False,debug_logging_in=False):
        # Short-term memory for experience replay
        self.memory = deque(maxlen=100000)
        self.gamma = 0.9 # discount rate
        self.e_decay = .99
        self.e_min = 0.05
        self.dump=False
        self.debug_logging=debug_logging_in
        self.verbose_logging=verbose_logging_in
        self.weight_logging=weight_logging_in
        # Create model
        self.model.add(Dense(20, input_dim=16, activation='tanh')) # First layer: 16 board cells in, 20 nodes
        self.model.add(Dense(20, activation='tanh')) # Hidden layer with 20 nodes
        self.model.add(Dense(20, activation='tanh')) # Hidden layer with 20 nodes
        self.model.add(Dense(20, activation='tanh')) # Hidden layer with 20 nodes
        self.model.add(Dense(20, activation='tanh')) # Hidden layer with 20 nodes
        self.model.add(Dense(4, activation='linear')) # Output layer: one Q value per action
        self.model.compile(loss='mse',optimizer=RMSprop(lr=0.01)) # Compiles the model defined above
        # Initialise log
        with open(LOG_LOCATION, 'w', newline='') as csvfile:
            writer = csv.writer(csvfile)
            if self.verbose_logging and self.weight_logging:
                writer.writerow(["EPISODE","EPOCH","REWARD","LOSS","TOTAL SCORE","INCREMENT IN SCORE","LOST","LAST STATE","ACTION","CURRENT STATE","WEIGHTS"])
                self.bufferShape=np.zeros((1,11))
            elif self.verbose_logging:
                writer.writerow(["EPISODE","EPOCH","REWARD","LOSS","TOTAL SCORE","INCREMENT IN SCORE","LOST","LAST STATE","ACTION","CURRENT STATE"])
                self.bufferShape=np.zeros((1,10))
            elif self.debug_logging:
                writer.writerow(["EPISODE","EPOCH","REWARD","LOSS","TOTAL SCORE","INCREMENT IN SCORE","LOST","Q","EXPLORATION","STATIONARY"])
                self.bufferShape=np.zeros((1,10))
            else:
                writer.writerow(["EPISODE","EPOCH","REWARD","LOSS","TOTAL SCORE","INCREMENT IN SCORE","LOST"])
                self.bufferShape=np.zeros((1,7))
        self.buffer=self.bufferShape.tolist()
    def run(self):
        # Transform game state to 1D array
        for i in range(4):
            self.inputVector[0][0+4*i:4+4*i]=self.game.matrix[i]
        # Increase counters
        self.epoch=self.epoch+1 # Increase epoch
        self.iteration=self.iteration+1 # Increase global iteration
        self.log_iteration=self.log_iteration+1
        # Update Machine
        self.log() # Log model
        self.game.increment=self.get_reward() # Reward for the last move (penalises illegal moves and losing)
        self.remember() # Store the completed transition before the next action overwrites it
        self.game.key_down(self.act()) # Select action and update weights
        self.lastState[0]=self.inputVector[0] # Current state becomes the last state of the next step
        # Game loop
        self.game.update_idletasks()
        self.game.update()
        # pool emaG
    def act(self):
        self.exploration=False
        if random.random() >= self.epsilon: # Explore: play a random move (probability 1 - epsilon)
            self.exploration=True
            self.action = self.acts[random.randint(0,3)]
            return self.action
        else:
            # Q-learning target: reward of the last move plus the discounted Q values of the state it led to
            self.Qvalues1=self.gamma*self.model.predict(self.inputVector)+self.game.increment
            # Extract the best achievable Q value from this state
            Q1=np.amax(self.Qvalues1)
            # Construct the target vector for the last step
            self.Qvalues1=self.Qvalues0
            self.Qvalues1[0][self.act2index[self.action]]=Q1 # Last prediction with the taken action's entry replaced by the new target
            # Update weights with respect to last step's prediction of this step's Q values
            self.loss=self.model.train_on_batch(self.lastState, self.Qvalues1)
            # Select the action with the highest predicted Q value
            self.Qvalues0=self.model.predict(self.inputVector)
            self.action=self.acts[self.Qvalues0.argmax()] # Don't delete this variable, it's used when logging
            return self.action
    # Memorize (state, action, reward, next state, done) for experience replay
    def remember(self):
        # Copies are stored because lastState and inputVector are reused (mutated) on every step
        self.memory.append((self.lastState.copy(), self.act2index[self.action], self.reward, self.inputVector.copy(), self.game.result))
# replay some of the actions at the end of training
def replay(self, batch_size, state_size, action_size):
batch_size = min(batch_size, len(self.memory))
minibatch = random.sample(self.memory, batch_size)
X = np.zeros((batch_size, state_size))
Y = np.zeros((batch_size, action_size))
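        # Bellman targets: Q(s,a) = r for terminal transitions, otherwise r + gamma * max_a' Q(s',a')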
for i in range(batch_size):
state, action, reward, next_state, done = minibatch[i]
target = self.model.predict(state)[0]
if done:
target[action] = reward
else:
target[action] = reward + self.gamma * np.amax(self.model.predict(next_state)[0])
X[i], Y[i] = state, target
self.model.fit(X, Y, batch_size=batch_size, epochs=1, verbose=0)
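        # Decay epsilon towards e_min after each replay pass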
if self.epsilon > self.e_min:
self.epsilon *= self.e_decay
    def log(self):
        if self.weight_logging:
            # Collect the weights of every layer (a list of lists of numpy arrays)
            weights = [layer.get_weights() for layer in self.model.layers]
        if self.weight_logging and self.verbose_logging:
            self.buffer.append([self.episode,self.epoch,self.reward,self.loss,self.game.score,self.game.increment,self.game.result,self.lastState.copy(),self.action,self.inputVector.copy(),weights])
        elif self.verbose_logging:
            self.buffer.append([self.episode,self.epoch,self.reward,self.loss,self.game.score,self.game.increment,self.game.result,self.lastState.copy(),self.action,self.inputVector.copy()])
        elif self.debug_logging:
            stat=np.array_equal(self.lastState,self.inputVector) # True if the last move left the board unchanged
            self.buffer.append([self.episode,self.epoch,self.reward,self.loss,self.game.score,self.game.increment,self.game.result,self.Qvalues0[0].copy(),self.exploration,stat])
        else:
            self.buffer.append([self.episode,self.epoch,self.reward,self.loss,self.game.score,self.game.increment,self.game.result])
        if (self.log_iteration == LOG_ITERATION) or self.dump:
            with open(LOG_LOCATION, 'a', newline='') as csvfile:
                self.buffer=self.buffer[1:] # Remove first line of zeros
                writer = csv.writer(csvfile)
                for row in self.buffer:
                    writer.writerow(row)
            # Reset buffer
            self.buffer=self.bufferShape.tolist()
            self.log_iteration=0
            self.dump=False
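    # Force any rows still sitting in the buffer to be written to the log file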
def dump_logs(self):
self.dump=True
self.log()
    def plot(self):
        with open(LOG_LOCATION,newline='') as csvfile:
            reader=csv.reader(csvfile)
            # Transform reader to array
            data=list(reader)
        # Allocate arrays
        x=[]
        y=[]
        # Get rid of the header row
        data.pop(0)
        for row in data:
            # Search for lost games
            if row[INDEX_LOST]=="True": # Needed because the CSV is read back as strings, not bools
                x.append(int(row[INDEX_EPISODE])) # Episode
                y.append(float(row[INDEX_SCORE])) # Total score at the end of the episode
        # Plot the final score of each lost game against the episode number
        fig, ax = plt.subplots()
        ax.plot(x, y)
        ax.set_xlabel('Episode')
        ax.set_ylabel('Final score')
    def get_reward(self):
        l=0
        if np.array_equal(self.lastState,self.inputVector): # The agent didn't make a legal move
            l=l-STATIONARY_PENALTY
        if self.game.result: # If the agent lost
            l = -2*self.game.increment-self.game.score
        self.reward=self.game.increment + l # Don't delete this variable, it's used for logging
        if self.game.result:
            # Reset the game and start the next episode
            self.game.reset()
            self.epoch=0
            self.episode=self.episode+1
            if (self.episode % 10 == 0):
                print('Episode %d of %d' %(self.episode,MAX_EPISODES))
        return (self.reward)
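# Train for MAX_EPISODES games, then replay stored transitions, flush the logs and plot the results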
our_machine = Machine(debug_logging_in=True)
while our_machine.episode<MAX_EPISODES:
our_machine.run()
# replay previous actions
our_machine.replay(32, 16, 4)
our_machine.dump_logs()
our_machine.plot()
# restart Kernel
import IPython, time
time.sleep(2)
app = IPython.Application.instance()
app.kernel.do_shutdown(True)
In [ ]: