In [1]:
# operating modes determine which nets are loaded and trained
TURN = 1
AVOID = 2
ACQUIRE = 3
HUNT = 4
PACK = 5
cur_mode = PACK
use_existing_model = True
show_sensors = True # Showing sensors and redrawing slows things down.
draw_screen = True
# object settings
NUM_OBSTACLES = 6
NUM_CATS = 4
NUM_DRONES = 2
# turn model settings
TURN_NUM_SENSOR = 5 # front five sonar readings
TURN_TOTAL_SENSORS = 10 # distance and color values from sonar readings
TURN_NUM_OUTPUT = 5 # do nothing, two right turns, two left turns
TURN_STATE_FRAMES = 3
TURN_NUM_INPUT = TURN_STATE_FRAMES * TURN_TOTAL_SENSORS
# avoid model settings
AVOID_NUM_SENSOR = 7 # front five, rear two sonar distance readings
AVOID_TOTAL_SENSORS = 16 # distance and color values from sonar readings, turn action, current speed
AVOID_NUM_OUTPUT = 3 # 30, 50, 70
AVOID_STATE_FRAMES = 3
AVOID_NUM_INPUT = AVOID_STATE_FRAMES * AVOID_TOTAL_SENSORS
SPEEDS = [30,50,70]
# acquire model settings
ACQUIRE_NUM_SENSOR = 2 # distance, angle
ACQUIRE_TOTAL_SENSORS = 2
ACQUIRE_NUM_OUTPUT = 5 # do nothing, two right turns, two left turns
ACQUIRE_STATE_FRAMES = 2
ACQUIRE_NUM_INPUT = ACQUIRE_STATE_FRAMES * ACQUIRE_NUM_SENSOR
# hunt model settings
HUNT_NUM_SENSOR = 7 # all sonar distance / color readings
HUNT_AVOID = 0
HUNT_ACQUIRE = 1
HUNT_TOTAL_SENSORS = 16 # seven sonar distance + color, target distance + heading
HUNT_NUM_OUTPUT = 2 # avoid (model), acquire (model)
HUNT_STATE_FRAMES = 3
HUNT_NUM_INPUT = HUNT_STATE_FRAMES * HUNT_TOTAL_SENSORS
# pack model settings
NUM_TARGETS = 1
TARGET_RADIUS = 14
DRONE_NUM_SENSOR = (2 * NUM_OBSTACLES) + (2 * NUM_CATS) + (2 * NUM_DRONES) + (2 * NUM_TARGETS)
# old model used all 7 sonar distance readings; subsequently reduced to four compass readings
DRONE_TOTAL_SENSORS = DRONE_NUM_SENSOR
PACK_TOTAL_SENSORS = DRONE_TOTAL_SENSORS
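# with the object counts above: (2 * 6) + (2 * 4) + (2 * 2) + (2 * 1) = 26 (x, y) coordinate values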
# radian adjustments applied to the first (lhs) and second (rhs) drone's heading to target: positive is left, negative is right
PACK_HEADING_ADJUST = [[0,0],[1,0],[-1,0],[0,1],[0,-1],[1,1],[-1,-1],[1,-1],[-1,1]]
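# example: PACK_HEADING_ADJUST[5] = [1, 1] rotates both drones' headings to target left by
# ~pi/4, while index 0 leaves both unchanged (see set_pack_adjust in the game class below)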
PACK_NUM_OUTPUT = 9
PACK_STATE_FRAMES = 5
PACK_EVAL_FRAMES = PACK_STATE_FRAMES
PACK_NUM_INPUT = PACK_TOTAL_SENSORS
START_PACK_ACTION = 0
START_DRONE_ID = 0
# neural net parms
BATCHSIZE = 100
PACK_STATEFUL_BATCH_SIZE = 1
TRAIN_BUFFER = 50000
PACK_TRAIN_BUFFER = 1
# initial settings
START_SPEED = 50
START_TURN_ACTION = 0
START_SPEED_ACTION = 1
START_DISTANCE = 1
The section below defines the Keras / Theano neural network schemas. Given the functional decomposition, each network has at most three hidden layers. Aside from the stateful pack network, no LSTM is used here; instead, network "memory" is managed in the learning module via appended state frames. The base design comes from: http://outlace.com/Reinforcement-Learning-Part-3/.
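A rough sketch of the frame-stacking idea (illustrative only; the notebook's own state_frames helper appears in the learning module below): each new sensor reading is joined to the most recent history and the oldest frame is dropped, so a dense network with, say, TURN_STATE_FRAMES * TURN_TOTAL_SENSORS inputs sees a short rolling window instead of relying on recurrence.
import numpy as np

def stack_frames(new_state, old_stack, num_sensors, num_frames):
    # illustrative only: prepend the newest reading, drop the oldest frame
    combined = np.append(new_state, old_stack[:, :num_sensors * (num_frames - 1)])
    return combined.reshape(1, num_sensors * num_frames)

# e.g., 10 turn sensor values x 3 frames -> a 30-element network input row
stacked = np.zeros((1, 10 * 3))
stacked = stack_frames(np.ones((1, 10)), stacked, 10, 3)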
In [2]:
from keras.models import Sequential
from keras.layers.core import Dense, Activation, Dropout
from keras.optimizers import RMSprop
from keras.layers.recurrent import LSTM
from keras.callbacks import Callback
class LossHistory(Callback):
def on_train_begin(self, logs={}):
self.losses = []
def on_batch_end(self, batch, logs={}):
self.losses.append(logs.get('loss'))
def turn_net(num_inputs, params, num_outputs, load=''):
model = Sequential()
# First layer.
model.add(Dense(params[0], init='lecun_uniform', input_shape=(num_inputs,)))
model.add(Activation('relu'))
model.add(Dropout(0.2))
# Second layer.
model.add(Dense(params[1], init='lecun_uniform'))
model.add(Activation('relu'))
model.add(Dropout(0.2))
# Output layer.
model.add(Dense(num_outputs, init='lecun_uniform'))
model.add(Activation('linear'))
rms = RMSprop()
model.compile(loss='mse', optimizer=rms)
if load:
model.load_weights(load)
return model
def avoid_net(num_inputs, params, num_outputs, load=''):
model = Sequential()
# First layer.
model.add(Dense(params[0], init='lecun_uniform', input_shape=(num_inputs,)))
model.add(Activation('relu'))
model.add(Dropout(0.2))
# Second layer.
model.add(Dense(params[1], init='lecun_uniform'))
model.add(Activation('relu'))
model.add(Dropout(0.2))
# Third layer.
model.add(Dense(params[2], init='lecun_uniform'))
model.add(Activation('relu'))
model.add(Dropout(0.3))
# Output layer.
model.add(Dense(num_outputs, init='lecun_uniform'))
model.add(Activation('linear'))
rms = RMSprop()
model.compile(loss='mse', optimizer=rms)
if load:
model.load_weights(load)
return model
def acquire_net(num_inputs, params, num_outputs, load=''):
model = Sequential()
# First layer.
model.add(Dense(params[0], init='lecun_uniform', input_shape=(num_inputs,)))
model.add(Activation('relu'))
model.add(Dropout(0.2))
# Output layer.
model.add(Dense(num_outputs, init='lecun_uniform'))
model.add(Activation('linear'))
rms = RMSprop()
model.compile(loss='mse', optimizer=rms)
if load:
model.load_weights(load)
return model
def hunt_net(num_inputs, params, num_outputs, load=''):
model = Sequential()
# First layer.
model.add(Dense(params[0], init='lecun_uniform', input_shape=(num_inputs,)))
model.add(Activation('relu'))
model.add(Dropout(0.3))
# Second layer.
model.add(Dense(params[1], init='lecun_uniform'))
model.add(Activation('relu'))
model.add(Dropout(0.3))
# Third layer.
model.add(Dense(params[2], init='lecun_uniform'))
model.add(Activation('relu'))
model.add(Dropout(0.3))
# Output layer.
model.add(Dense(num_outputs, init='lecun_uniform'))
model.add(Activation('linear'))
rms = RMSprop()
model.compile(loss='mse', optimizer=rms)
if load:
model.load_weights(load)
return model
# implementation of a stateful LSTM RNN
'''A stateful recurrent model is one for which the internal states (memories) obtained after
processing a batch of samples are reused as initial states for the samples of the next batch.
This allows processing of longer sequences while keeping computational complexity manageable.'''
# expected input batch shape: (batch_size, timesteps, data_dim)
# note that we have to provide the full batch_input_shape since the network is stateful.
# the sample of index i in batch k is the follow-up for the sample i in batch k-1.
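# e.g., with the cell-1 constants this becomes batch_input_shape = (1, 5, 26):
# PACK_STATEFUL_BATCH_SIZE = 1, PACK_STATE_FRAMES = 5, PACK_NUM_INPUT = 26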
def pack_net(batchsize, num_frames, num_inputs, params, num_outputs, load=''):
model = Sequential()
model.add(LSTM(params[0], batch_input_shape=(batchsize, num_frames, num_inputs),
return_sequences=True, stateful=True))
# model.add(LSTM(params[0], batch_input_shape=(batchsize, frames, num_inputs)
# return_sequences=True, if you want this layer to feed the next layer w/ frames/batch intact
# try stateful = True w/ process_pack_minibatch to format x and y in batch context
# If a RNN is stateful, a complete input_shape must be provided (including batch size)
# init='lecun_uniform'?
# activation='sigmoid' (or 'relu', 'linear'), inner_activation='hard_sigmoid'
# add dropout
model.add(LSTM(params[1], return_sequences=True, stateful=True))
model.add(LSTM(params[2], stateful=True))
model.add(Dense(num_outputs, activation='softmax'))
# for multi-class classification http://keras.io/getting-started/sequential-model-guide/
rms = RMSprop()
model.compile(loss='mse', optimizer=rms)
#model.compile(loss='categorical_crossentropy', optimizer = 'rmsprop') #, metrics=['accuracy']
if load:
model.load_weights(load)
return model
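# usage note (an assumption, not from the original code): because the LSTM is stateful with a
# batch size of 1, predict() must be fed arrays shaped (1, PACK_STATE_FRAMES, PACK_NUM_INPUT);
# set_pack_action, later in the notebook, wraps pack_states that way before predicting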
In [3]:
def load_models():
turn_model = turn_model_30 = turn_model_50 = turn_model_70 = avoid_model = acquire_model = acquire_model_30 = acquire_model_50 = acquire_model_70 = hunt_model = pack_model = 0
if cur_mode in [TURN, AVOID, HUNT, PACK]:
nn_param = [TURN_NUM_INPUT*25, TURN_NUM_INPUT*10]
params = {"batchSize": BATCHSIZE, "buffer": TRAIN_BUFFER, "nn": nn_param}
if cur_mode == TURN and use_existing_model == False:
turn_model = turn_net(TURN_NUM_INPUT, nn_param, TURN_NUM_OUTPUT)
else:
saved_model = 'models/turn/saved/turn-750-300-100-50000-30-600000.h5'
turn_model_30 = turn_net(TURN_NUM_INPUT, nn_param, TURN_NUM_OUTPUT, saved_model)
saved_model = 'models/turn/saved/turn-750-300-100-50000-50-600000.h5'
turn_model_50 = turn_net(TURN_NUM_INPUT, nn_param, TURN_NUM_OUTPUT, saved_model)
saved_model = 'models/turn/saved/turn-750-300-100-50000-70-600000.h5'
turn_model_70 = turn_net(TURN_NUM_INPUT, nn_param,TURN_NUM_OUTPUT, saved_model)
if cur_mode in [AVOID, HUNT, PACK]:
nn_param = [AVOID_NUM_INPUT * 25, AVOID_NUM_INPUT * 5, AVOID_NUM_INPUT]
params = {"batchSize": BATCHSIZE, "buffer": TRAIN_BUFFER, "nn": nn_param}
if cur_mode == AVOID and use_existing_model == False:
avoid_model = avoid_net(AVOID_NUM_INPUT, nn_param, AVOID_NUM_OUTPUT)
else:
saved_model = 'models/avoid/saved/avoid-1200-240-48-100-50000-700000-old-3L-2022.h5'
avoid_model = avoid_net(AVOID_NUM_INPUT, nn_param, AVOID_NUM_OUTPUT, saved_model)
if cur_mode in [ACQUIRE, HUNT, PACK]:
nn_param = [ACQUIRE_NUM_INPUT * 15, ACQUIRE_NUM_INPUT * 5]
params = {"batchSize": BATCHSIZE, "buffer": TRAIN_BUFFER, "nn": nn_param}
if cur_mode == ACQUIRE and use_existing_model == False:
acquire_model = acquire_net(ACQUIRE_NUM_INPUT, nn_param, ACQUIRE_NUM_OUTPUT)
else:
saved_model = 'models/acquire/saved/acquire-60-20-100-50000-50-350000.h5'
# using 50 until time to re-train at 30
acquire_model_30 = acquire_net(ACQUIRE_NUM_INPUT, nn_param,
ACQUIRE_NUM_OUTPUT, saved_model)
saved_model = 'models/acquire/saved/acquire-60-20-100-50000-50-350000.h5'
acquire_model_50 = acquire_net(ACQUIRE_NUM_INPUT, nn_param,
ACQUIRE_NUM_OUTPUT, saved_model)
saved_model = 'models/acquire/saved/acquire-60-20-100-50000-70-350000.h5'
acquire_model_70 = acquire_net(ACQUIRE_NUM_INPUT, nn_param,
ACQUIRE_NUM_OUTPUT, saved_model)
if cur_mode in [HUNT, PACK]:
nn_param = [HUNT_NUM_INPUT * 25, HUNT_NUM_INPUT * 5, HUNT_NUM_INPUT]
params = {"batchSize": BATCHSIZE, "buffer": TRAIN_BUFFER, "nn": nn_param}
if cur_mode == HUNT and use_existing_model == False:
hunt_model = hunt_net(HUNT_NUM_INPUT, nn_param, HUNT_NUM_OUTPUT)
else:
saved_model = 'models/hunt/saved/hunt-1200-240-48-100-50000-300000-40-50-avoid.h5'
hunt_model = hunt_net(HUNT_NUM_INPUT, nn_param, HUNT_NUM_OUTPUT, saved_model)
if cur_mode == PACK:
nn_param = [PACK_NUM_INPUT * 20, PACK_NUM_INPUT * 5, PACK_NUM_INPUT]
params = {"batchSize": PACK_STATEFUL_BATCH_SIZE, "buffer": PACK_TRAIN_BUFFER, "nn": nn_param}
if cur_mode == PACK and use_existing_model == False:
pack_model = pack_net(PACK_STATEFUL_BATCH_SIZE, PACK_STATE_FRAMES,
PACK_NUM_INPUT, nn_param, PACK_NUM_OUTPUT)
else:
saved_model = 'models/pack/saved/pack-520-130-26-1-1-1000000.h5'
pack_model = pack_net(PACK_STATEFUL_BATCH_SIZE, PACK_STATE_FRAMES,
PACK_NUM_INPUT, nn_param, PACK_NUM_OUTPUT, saved_model)
return turn_model, turn_model_30, turn_model_50, turn_model_70, avoid_model, acquire_model, acquire_model_30, acquire_model_50, acquire_model_70, hunt_model, pack_model, params
Establishes the Pygame / Pymunk game state, including the board, walls, objects, and their characteristics (gravity, elasticity, etc.). Pygame "movements" are made by updating the "screen" (the bottom graphic level) and the image "surfaces" (layers) above that level. The "surfaces" are bound to the screen by "blit"-ing them, and the combined package is then "flip"-ped to render it to the output display. So you'll see a screen object here, along with a variety of surfaces (sometimes called "grids") that implement those layers.
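A minimal sketch of that draw cycle (illustrative only; the names here are placeholders, not the game's objects):
import pygame
from pygame.color import THECOLORS
pygame.init()
screen = pygame.display.set_mode((1000, 700))           # bottom graphic level
overlay = pygame.Surface((1000, 700), pygame.SRCALPHA)  # a transparent "surface" layer
screen.fill(THECOLORS["black"])                         # clear the base screen
pygame.draw.circle(overlay, THECOLORS["green"], (500, 350), 10)
screen.blit(overlay, (0, 0))                            # bind the layer to the screen
pygame.display.flip()                                   # print the combined package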
Defines the changes for each frame (or step). On every frame (or flip, or step) of the game, the drones and obstacles are moved along trajectories and at speeds determined either by the game parameters (in the case of "obstacles" and "cats") or by the neural networks (in the case of "drones"). Drone moves respond to the game states that the function below passes to the neural networks, and the state variables returned depend on the network being trained. For example, the "turn" network uses "sonar" sensor readings emitted from the drone as it moves; those readings capture the distance to detected objects and the color of those objects, and the network learns risk and specific evasive actions accordingly.
Note: while Pymunk (like standard Cartesian coordinates) places (0,0) in the lower left of the screen, Pygame places (0,0) in the upper left, so y+ moves DOWN the y axis. Here is example code that illustrates how to handle angles in that environment: https://github.com/mgold/Python-snippets/blob/master/pygame_angles.py. In this implementation, I have flipped the screen so that y+ moves UP the screen.
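A small helper capturing that convention (an assumption for illustration; the code below simply writes height - y inline when drawing):
HEIGHT = 700
def to_pygame_coords(x, y, height=HEIGHT):
    # convert a y-up (pymunk-style) point to a y-down pygame pixel position
    return int(x), int(height - y)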
In [4]:
import random
import math
import numpy as np
import pygame
from pygame.color import THECOLORS
import pymunk
from pymunk.vec2d import Vec2d
from pymunk.pygame_util import draw
import time
from math import atan2, degrees, pi, sqrt
# ***** initialize variables *****
# PyGame init
width = 1000
height = 700
pygame.init()
clock = pygame.time.Clock()
# display surface
screen = pygame.display.set_mode((width, height))
BACK_COLOR = "black"
WALL_COLOR = "red"
DRONE_COLOR = "green"
SMILE_COLOR = "blue"
OBSTACLE_COLOR = "purple"
CAT_COLOR = "orange"
DRONE_BODY_DIAM = 12
SONAR_ARM_LEN = 20
OBSTACLE_SIZES = [60, 60, 60]
# acquire model settings
target_grid = pygame.Surface((width, height), pygame.SRCALPHA, 32)
target_grid.convert_alpha()
path_grid = pygame.Surface((width, height))
PATH_COLOR = "grey"
ACQUIRE_PIXEL_COLOR = "green"
ACQUIRED_PIXEL_COLOR = "yellow"
ACQUIRE_PIXEL_SIZE = 2
if cur_mode == ACQUIRE:
ACQUIRE_PIXEL_SEPARATION = 25
ACQUIRE_MARGIN = 50
else:
ACQUIRE_PIXEL_SEPARATION = 5
ACQUIRE_MARGIN = 75
# hunt model settings
STATS_BUFFER = 2000
# Turn off alpha since we don't use it.
screen.set_alpha(None)
# ***** instantiate the game *****
class GameState:
def __init__(self):
# initialize space
self.space = pymunk.Space()
self.space.gravity = pymunk.Vec2d(0., 0.)
# initialize counters
self.total_frame_ctr = 0
self.replay_frame_ctr = 0
self.acquire_frame_ctr = 0
self.num_off_scrn = 0
# create drones
self.drones = []
for drone_id in range(NUM_DRONES):
self.drones.append(self.create_drone(random.randint(400,600),
random.randint(300,400), 0.5))
self.last_x = np.empty([NUM_DRONES,1])
self.last_y = np.empty([NUM_DRONES,1])
for drone_id in range(len(self.drones)):
x, y = self.drones[drone_id].position
self.last_x[drone_id] = x + 2; self.last_y[drone_id] = y + 2
# create walls
static = [pymunk.Segment(self.space.static_body,(0, 1), (0, height), 1),
pymunk.Segment(self.space.static_body,(1, height), (width, height), 1),
pymunk.Segment(self.space.static_body,(width-1, height), (width-1, 1), 1),
pymunk.Segment(self.space.static_body,(1, 1), (width, 1), 1)]
for s in static:
s.friction = 1.
s.group = 1
s.collision_type = 1
s.color = THECOLORS[WALL_COLOR]
self.space.add(static)
if cur_mode in [TURN, AVOID, HUNT, PACK]:
self.obstacles = []
self.cats = []
if cur_mode in [TURN, AVOID, HUNT]:
# create slow, randomly moving, larger obstacles
self.obstacles.append(self.create_obstacle(random.randint(100, width-100),
random.randint(70, height-100),50))
self.obstacles.append(self.create_obstacle(random.randint(100, width-100),
random.randint(70, height-70),50))
self.obstacles.append(self.create_obstacle(random.randint(100, width-100),
random.randint(70, height-70),63))
self.obstacles.append(self.create_obstacle(random.randint(100, width-100),
random.randint(70, height-70),63))
self.obstacles.append(self.create_obstacle(random.randint(100, width-100),
random.randint(70, height-70),30))
self.obstacles.append(self.create_obstacle(random.randint(100, width-100),
random.randint(70, height-70),30))
# create faster, randomly moving, smaller obstacles a.k.a. "cats"
self.cats.append(self.create_cat(width-950,height-100))
self.cats.append(self.create_cat(width-50,height-600))
self.cats.append(self.create_cat(width-50,height-100))
self.cats.append(self.create_cat(width-50,height-600))
if cur_mode in [ACQUIRE, HUNT, PACK]:
# set up search grid and feed the first target
self.target_inventory = []
self.acquired_targets = []
self.current_targets = []
self.target_radius = TARGET_RADIUS
self.generate_targets(True)
for targets in range(NUM_TARGETS):
self.assign_target(True, drone_id)
self.target_acquired = False
self.obs_adds = 0
# initialize structures used to track efficiency of EACH move
# distance to target
self.last_tgt_dist = np.empty([NUM_DRONES, NUM_TARGETS])
self.last_tgt_dist.fill(350) # last target dist held in array for ea drone
tmp = [0.5, 1]; self.tgt_deltas = [] # deltas held in list for ea drone
for drone_id in range(NUM_DRONES): self.tgt_deltas.append(tmp)
# distance to obstacles
self.last_obs_dist = np.empty([NUM_DRONES, NUM_TARGETS])
self.last_obs_dist.fill(10) # last obstacle dist held in array for ea drone
tmp = [10, 12]; self.obs_dists = [] # distances held in list for ea drone
for drone_id in range(NUM_DRONES): self.obs_dists.append(tmp)
# initialize structures to track efficiency of PACK_EVAL_FRAMES moves
if cur_mode == PACK:
# distance to target
self.last_pack_tgt_dist = np.empty([NUM_DRONES, NUM_TARGETS])
self.last_pack_tgt_dist.fill(350) # last target dist held in array for ea drone
tmp = [0.4, 0.7]; self.pack_tgt_deltas = [] # deltas held in list for ea drone
for drone_id in range(NUM_DRONES): self.pack_tgt_deltas.append(tmp)
# distance to obstacles
self.last_pack_obs_dist = np.empty([NUM_DRONES, NUM_TARGETS])
self.last_pack_obs_dist.fill(10) # last obstacle dist held in array for ea drone
tmp = [10, 12]; self.pack_obs_dists = [] # distances held in list for ea drone
for drone_id in range(NUM_DRONES): self.pack_obs_dists.append(tmp)
# starting positions
self.pack_cum_rwds = np.zeros([NUM_DRONES, 1])
self.start_positions = [(25,25), (25,675), (975,25), (975,650)]
self.start_angles = [0.8, -0.8, 2.5, 3.9]
# ***** primary logic controller for game play *****
def frame_step(self, drone_id, turn_action, speed_action, pack_action, cur_speed, total_ctr, replay_ctr):
self.total_frame_ctr = total_ctr
self.replay_frame_ctr = replay_ctr
self.acquire_frame_ctr += 1
# turn drone based on current (active) model prediction
if cur_mode in [TURN, AVOID, ACQUIRE, HUNT, PACK]:
self.set_turn(turn_action, drone_id)
# set speed based on active model prediction
if cur_mode in [AVOID, HUNT, PACK]: # setting speed values directly see SPEEDS
cur_speed = self.set_speed(speed_action, drone_id)
# effect move by applying speed and direction as vector on self
driving_direction = Vec2d(1, 0).rotated(self.drones[drone_id].angle)
self.drones[drone_id].velocity = cur_speed * driving_direction
x, y = self.drones[drone_id].position
# set heading adjustment based on pack model output
if cur_mode == PACK:
heading_adjust = self.set_pack_adjust(pack_action)[drone_id]
else:
heading_adjust = 0
# move obstacles
if cur_mode in [TURN, AVOID, HUNT, PACK]:
if cur_mode != PACK or self.total_frame_ctr > 400: # 400000
if self.total_frame_ctr % 20 == 0: # 20x more stable than drone
self.move_obstacles(1)
if self.total_frame_ctr % 40 == 0: # 40x more stable than drone
self.move_cats(1)
elif cur_mode == PACK and self.total_frame_ctr > 300: # 300000
# slow obstacles
if self.total_frame_ctr % 30 == 0:
self.move_obstacles(2)
# fast obstacles
if self.total_frame_ctr % 60 == 0:
self.move_cats(3)
elif cur_mode == PACK and self.total_frame_ctr > 200: # 200000
if self.total_frame_ctr % 40 == 0:
self.move_obstacles(3)
if self.total_frame_ctr % 80 == 0:
self.move_cats(6)
# update the screen and surfaces
if drone_id == 0:
screen.fill(pygame.color.THECOLORS[BACK_COLOR])
if cur_mode in [ACQUIRE, HUNT, PACK]:
# draw the path drone has taken on the path grid
if self.acquire_frame_ctr / NUM_DRONES > 1.5:
pygame.draw.lines(path_grid, pygame.color.THECOLORS[PATH_COLOR], True,
((self.last_x[drone_id], height - self.last_y[drone_id]),
(x, height - y)), 1)
# if last drone, bind paths, targets to the screen
if drone_id == (NUM_DRONES - 1):
screen.blit(path_grid, (0,0))
screen.blit(target_grid, (0,0))
# display screen
draw(screen, self.space)
self.space.step(1./10) # one pixel for every 10 SPEED
if draw_screen:
pygame.display.flip()
# get readings, build states
self.last_x[drone_id] = x; self.last_y[drone_id] = y
x, y = self.drones[drone_id].position
turn_state, avoid_state, acquire_state, hunt_state, drone_state, min_sonar_dist, avoid_move_efficiency, acquire_move_efficiency = \
self.build_states(drone_id, turn_action, heading_adjust, cur_speed)
# calc rewards based on training mode(s) in effect
reward = self.calc_rwd(drone_id, min_sonar_dist, driving_direction, cur_speed, avoid_move_efficiency, acquire_move_efficiency)
#self.total_frame_ctr += 1
clock.tick()
return turn_state, avoid_state, acquire_state, hunt_state, drone_state, reward, cur_speed
# ***** turn and speed model functions *****
def create_obstacle(self, x, y, r):
obs_body = pymunk.Body(pymunk.inf, pymunk.inf)
obs_shape = pymunk.Circle(obs_body, r)
obs_shape.elasticity = 1.0
obs_body.position = x, y
obs_shape.color = THECOLORS[OBSTACLE_COLOR]
self.space.add(obs_body, obs_shape)
return obs_body
def create_cat(self,x,y):
inertia = pymunk.moment_for_circle(1, 0, 14, (0, 0))
cat_body = pymunk.Body(1, inertia)
cat_body.position = x, y
cat_shape = pymunk.Circle(cat_body, 20)
cat_shape.color = THECOLORS[CAT_COLOR]
cat_shape.elasticity = 1.0
cat_shape.angle = 0.5
direction = Vec2d(1, 0).rotated(cat_body.angle)
self.space.add(cat_body, cat_shape)
return cat_body
def create_drone(self, x, y, r):
inertia = pymunk.moment_for_circle(1, 0, 14, (0, 0))
drone_body = pymunk.Body(1, inertia)
drone_body.position = x, y
drone_shape = pymunk.Circle(drone_body, DRONE_BODY_DIAM) # was 25
drone_shape.color = THECOLORS[DRONE_COLOR]
drone_shape.elasticity = 1.0
drone_body.angle = r
driving_direction = Vec2d(1, 0).rotated(drone_body.angle)
drone_body.apply_impulse(driving_direction)
self.space.add(drone_body, drone_shape)
return drone_body
def move_obstacles(self, divisor):
# randomly moves large, slow obstacles around
if len(self.obstacles) > 0:
for obstacle in self.obstacles:
speed = int(random.randint(10, 15)/divisor)
direction = Vec2d(1, 0).rotated(self.drones[0].angle + random.randint(-2, 2))
obstacle.velocity = speed * direction
def move_cats(self, divisor):
# randomly moves small, fast obstacles
if len(self.cats) > 0:
for cat in self.cats:
speed = int(random.randint(60, 80)/divisor)
direction = Vec2d(1, 0).rotated(random.randint(-3, 3))
x, y = cat.position
if x < 0 or x > width or y < 0 or y > height:
cat.position = int(width/2), int(height/2)
cat.velocity = speed * direction
def set_turn(self, turn_action, drone_id):
# action == 0 is continue current trajectory
if turn_action == 1: # slight right adjust to current trajectory
self.drones[drone_id].angle -= .2
elif turn_action == 2: # hard right
self.drones[drone_id].angle -= .4
elif turn_action == 3: # slight left
self.drones[drone_id].angle += .2
elif turn_action == 4: # hard left
self.drones[drone_id].angle += .4
def set_speed(self, speed_action, drone_id):
# choose appropriate speed action, including 0 speed
if speed_action == 0:
cur_speed = SPEEDS[0]
elif speed_action == 1:
cur_speed = SPEEDS[1]
elif speed_action == 2:
cur_speed = SPEEDS[2]
return cur_speed
def set_pack_adjust(self, pack_action):
heading_adjust = []
# pack actions apply +/- pi/4 radian (45 deg) adjustments to each of the two drones' headings to target
for i in range(NUM_DRONES):
heading = PACK_HEADING_ADJUST[pack_action][i]
heading_adjust.append(heading * (3.14 / 4))
return heading_adjust
def evaluate_move(self, drone_id, heading_adjust, min_sonar_dist):
'''eventually, multiple targets will be introduced, but each new target doubles the
state variables. so, for now, assume a single target:'''
target_id = 0
avoid_move_efficiency = 0
acquire_move_efficiency = 0
# 1. calc distance and angle to active target(s) relative to drone
target_dist, target_rads = calc_dist_rads(self.current_targets[target_id],
self.drones[drone_id].position,0)
# 2. get adjusted heading
_, adj_target_rads = calc_dist_rads(self.current_targets[target_id],
self.drones[drone_id].position,
heading_adjust)
adj_target_degs = np.round(degrees(adj_target_rads),1)
# ii. relative to drone's current direction
rads = self.drones[drone_id].angle
rads %= 2*pi
drone_angle_rads = rads
drone_angle_degs = np.round(degrees(rads),1)
if drone_angle_degs > 360:
drone_angle_degs = drone_angle_degs - 360
# "heading" accounts for angle FROM drone and OF drone netting degrees drone must turn
adj_heading_to_target = adj_target_degs - drone_angle_degs
if adj_heading_to_target < -180:
adj_heading_to_target = adj_heading_to_target + 360
if cur_mode != PACK:
# 3. calc normalized efficiency of last move
# vs. target acquisition
dt = int(self.last_tgt_dist[drone_id, target_id] - target_dist)
if abs(dt) >= 20: # large jumps are likely crashes rather than real moves, so ignore deltas of 20+
dt = np.mean(self.tgt_deltas[drone_id])
# positive distance delta indicates "closing" on the target
ndt = np.round((dt - np.mean(self.tgt_deltas[drone_id])) / np.std(self.tgt_deltas[drone_id]),2)
# save current values
self.last_tgt_dist[drone_id, target_id] = target_dist
self.tgt_deltas[drone_id].append(dt)
if len(self.tgt_deltas[drone_id]) > STATS_BUFFER:
self.tgt_deltas[drone_id].pop(0)
# vs. obstacle avoidance
do = min_sonar_dist
# positive distance delta indicates "avoiding" an obstacle
ndo = np.round((do - np.mean(self.obs_dists[drone_id])) / np.std(self.obs_dists[drone_id]),2)
# save current values
self.last_obs_dist[drone_id] = do
self.obs_dists[drone_id].append(do)
if len(self.obs_dists[drone_id]) > STATS_BUFFER:
self.obs_dists[drone_id].pop(0)
# finally, apply calcs to score move
if cur_mode == ACQUIRE:
acquire_move_efficiency = np.round(ndt / target_dist**0.333,2)
# cube root of the target distance... lessens the effect of distance
else:
avoid_move_efficiency = np.round(ndo / target_dist**0.333,2) # was 0.333
acquire_move_efficiency = np.round(ndt / target_dist**0.333,2)
# for balancing avoidance with acquisition
else:
#if self.total_frame_ctr == 1 or self.replay_frame_ctr % PACK_EVAL_FRAMES == 0:
# 3. calc normalized efficiency of last move
# vs. target acquisition
dt = int(self.last_pack_tgt_dist[drone_id, target_id] - target_dist)
if abs(dt) >= 20: # large jumps are likely crashes rather than real moves, so ignore deltas of 20+
dt = np.mean(self.pack_tgt_deltas[drone_id])
# positive distance delta indicates "closing" on the target
ndt = np.round((dt - np.mean(self.pack_tgt_deltas[drone_id])) / np.std(self.pack_tgt_deltas[drone_id]),2)
# save current values
self.last_pack_tgt_dist[drone_id, target_id] = target_dist
self.pack_tgt_deltas[drone_id].append(dt)
if len(self.pack_tgt_deltas[drone_id]) > STATS_BUFFER:
self.pack_tgt_deltas[drone_id].pop(0)
# vs. obstacle avoidance
do = min_sonar_dist
# positive distance delta indicates "avoiding" an obstacle
ndo = np.round((do - np.mean(self.pack_obs_dists[drone_id])) / np.std(self.pack_obs_dists[drone_id]),2)
# save current values
self.last_pack_obs_dist[drone_id] = do
self.pack_obs_dists[drone_id].append(do)
if len(self.pack_obs_dists[drone_id]) > STATS_BUFFER:
self.pack_obs_dists[drone_id].pop(0)
# finally, apply calcs to score move
avoid_move_efficiency = np.round(ndo / target_dist**0.333,2)
acquire_move_efficiency = np.round(ndt / target_dist**0.333,2)
# for balancing avoidance with acquisition
# 4. if w/in reasonable distance, declare victory
if target_dist <= TARGET_RADIUS:
print("************** target acquired ************")
self.target_acquired = True
# move acquired target to acquired targets
self.acquired_targets.append(self.current_targets[target_id])
self.target_inventory.remove(self.current_targets[target_id])
print("pct complete:", (len(self.acquired_targets) /
(len(self.acquired_targets) + len(self.target_inventory))))
if len(self.acquired_targets) % 20 == 1:
take_screen_shot(screen)
time.sleep(0.2) # screen capture takes a bit
# remove old target
self.current_targets.remove(self.current_targets[target_id])
# get a new target
self.assign_target(False, drone_id)
start_dists = []
if cur_mode == PACK:
# find furthest start position...
for i in range(len(self.start_positions)):
dx = self.start_positions[i][0] - self.current_targets[0][0]
dy = self.start_positions[i][1] - self.current_targets[0][1]
start_dist = int(((dx**2 + dy**2)**0.5))
# ...that doesn't have an obstacle w/in 125
obstacle_dists = []; cat_dists = []
if len(self.obstacles) > 0:
for j in range(len(self.obstacles)):
dx = self.start_positions[i][0] - self.obstacles[j].position[0]
dy = self.start_positions[i][1] - self.obstacles[j].position[1]
obstacle_dists.append(int(((dx**2 + dy**2)**0.5)))
if len(self.cats) > 0:
for k in range(len(self.cats)):
dx = self.start_positions[i][0] - self.cats[k].position[0]
dy = self.start_positions[i][1] - self.cats[k].position[1]
cat_dists.append(int(((dx**2 + dy**2)**0.5)))
if (len(self.obstacles) > 0 and min(obstacle_dists) < 125) or (len (self.cats) > 0 and min(cat_dists) < 125):
print("-- alternative start location")
start_dists.append(0)
else:
start_dists.append(start_dist)
# move drones to start position
for i in range(NUM_DRONES):
self.drones[i].position = \
self.start_positions[start_dists.index(max(start_dists))][0], \
self.start_positions[start_dists.index(max(start_dists))][1]
self.drones[i].angle = self.start_angles[start_dists.index(max(start_dists))]
# introduce obstacles gradually for HUNT/PACK learning
if cur_mode == PACK and drone_id == (NUM_DRONES - 1):
if self.total_frame_ctr > 1 and self.obs_adds == 0:
for i in range(2):
self.obstacles.append(self.create_obstacle(random.randint(100, width-100),
random.randint(100, height-100),
OBSTACLE_SIZES[0]))
self.target_radius -= 2
self.obs_adds += 1
elif self.total_frame_ctr > 25 and self.obs_adds == 1: # 2500
for i in range(2):
self.obstacles.append(self.create_obstacle(random.randint(100, width-100),
random.randint(100, height-100),
OBSTACLE_SIZES[1]))
self.target_radius -= 2
self.obs_adds += 1
elif self.total_frame_ctr > 75 and self.obs_adds == 2: # 7500
for i in range(2):
self.obstacles.append(self.create_obstacle(random.randint(100, width-100),
random.randint(100, height-100),
OBSTACLE_SIZES[2]))
self.target_radius -= 2
self.obs_adds += 1
elif self.total_frame_ctr > 225 and self.obs_adds == 3: # 22500
for i in range(2):
self.cats.append(self.create_cat(random.randint(50, width-50),
random.randint(50, height-50)))
self.target_radius -= 2
self.obs_adds += 1
elif self.total_frame_ctr > 500 and self.obs_adds == 4: # 500
for i in range(2):
self.cats.append(self.create_cat(random.randint(50, width-50),
random.randint(50, height-50)))
self.target_radius -= 2
self.obs_adds += 1
return target_dist, target_rads, drone_angle_rads, adj_heading_to_target, \
avoid_move_efficiency, acquire_move_efficiency
def build_states(self, drone_id, turn_action, heading_adjust, cur_speed):
turn_state = 0
avoid_state = 0
acquire_state = 0
hunt_state = 0
drone_state = 0
min_sonar_dist = 0
avoid_move_efficiency = 0
acquire_move_efficiency = 0
# get readings from the various sensors
sonar_dist_readings, sonar_color_readings = \
self.get_sonar_dist_color_readings(drone_id)
turn_readings = sonar_dist_readings[:TURN_NUM_SENSOR]
min_sonar_dist = min(turn_readings)
turn_readings = turn_readings + sonar_color_readings[:TURN_NUM_SENSOR]
turn_state = np.array([turn_readings])
if cur_mode != TURN:
avoid_readings = sonar_dist_readings[:AVOID_NUM_SENSOR]
min_sonar_dist = min(avoid_readings)
avoid_readings = avoid_readings + sonar_color_readings[:AVOID_NUM_SENSOR]
avoid_readings.append(turn_action)
avoid_readings.append(cur_speed)
avoid_state = np.array([avoid_readings])
if cur_mode in [ACQUIRE, HUNT, PACK]:
# calc distances, headings and efficiency
min_sonar_dist = min(sonar_dist_readings[:HUNT_NUM_SENSOR])
# note: avoid, hunt, pack all using 7 sensors for min dist.
# however, pack will only be seeing four sensors. FIX THIS AT SOME POINT.
# problem is, you can't call evaluate_move twice as it appends readings for mean sd ea time.
# So, some moves will be evaluated based on sensor distances it doesn't see.
target_dist, target_angle_rads, drone_angle_rads, adj_heading_to_target, \
avoid_move_efficiency, acquire_move_efficiency = \
self.evaluate_move(drone_id, heading_adjust, min_sonar_dist)
acquire_state = np.array([[target_dist, adj_heading_to_target]])
if cur_mode in [HUNT, PACK]:
hunt_readings = sonar_dist_readings[:HUNT_NUM_SENSOR]
hunt_readings = hunt_readings + sonar_color_readings[:HUNT_NUM_SENSOR]
hunt_readings.append(target_dist)
hunt_readings.append(adj_heading_to_target)
hunt_state = np.array([hunt_readings])
min_sonar_dist = min(sonar_dist_readings[:HUNT_NUM_SENSOR])
if cur_mode == PACK: #and (self.total_frame_ctr == 1 or self.replay_frame_ctr % PACK_EVAL_FRAMES == 0)
'''approach 1: didn't work. lever to input and reward linkage too noisy'''
# pack requires four compass point (above, below, right and left) obs dist readings
#compass_rads = [0, (3.14/2), 3.14, (-3.14/2)]
# it gets readings by adjusting the sonar readings for the drone angle...
#sonar_angles = [0, 0.6, -0.6, 1.2, -1.2, 2.8, -2.8]
#sonar_angles_adj = np.add(sonar_angles, drone_angle_rads)
# ...then finds the sonar reading closest to its required compass direction
#for rad in range(len(compass_rads)):
# drone_readings.append(sonar_dist_readings[find_nearest(sonar_angles_adj,
# compass_rads[rad])])
# drone_readings.append(target_dist)
# drone_readings.append(target_angle_rads)
'''approach 2: using dist/headings to obj. unlikely to work'''
#obj_dist_rads = []
#for obstacle in self.obstacles:
# obs_dist, obs_rads = calc_dist_rads(obstacle.position,
# self.drones[drone_id].position)
# obj_dist_rads.append(obs_dist)
# obj_dist_rads.append(obs_rads)
#for cat in self.cats:
# cat_dist, cat_rads = calc_dist_rads(cat.position,
# self.drones[drone_id].position)
# obj_dist_rads.append(cat_dist)
# obj_dist_rads.append(cat_rads)
#ctr = 0
#for drone in self.drones:
# if drone_id != ctr:
# drone_dist, drone_rads = calc_dist_rads(drone.position,
# self.drones[drone_id].position)
# obj_dist_rads.append(drone_dist)
# obj_dist_rads.append(drone_rads)
# ctr += 1
'''approach 3: object coordinates'''
drone_state = np.zeros([1,DRONE_NUM_SENSOR])
if drone_id == (NUM_DRONES - 1):
ctr = 0
for obstacle in self.obstacles:
x, y = obstacle.position
drone_state[0, ctr] = int(x)
drone_state[0, (ctr+1)] = int(y)
ctr += 2
ctr = 0
for cat in self.cats:
x, y = cat.position
drone_state[0,((2 * NUM_OBSTACLES) + ctr)] = int(x)
drone_state[0,((2 * NUM_OBSTACLES) + 1 + ctr)] = int(y)
ctr += 2
ctr = 0
for drone in self.drones:
x, y = drone.position
drone_state[0,((2 * (NUM_OBSTACLES + NUM_CATS)) + ctr)] = int(x)
drone_state[0,((2 * (NUM_OBSTACLES + NUM_CATS)) + 1 + ctr)] = int(y)
ctr += 2
ctr = 0
for target in self.current_targets:
drone_state[0,((2 * (NUM_OBSTACLES + NUM_CATS + NUM_DRONES)) + ctr)] = target[0]
drone_state[0,((2 * (NUM_OBSTACLES + NUM_CATS + NUM_DRONES)) + 1 + ctr)] = target[1]
ctr += 2
return turn_state, avoid_state, acquire_state, hunt_state, drone_state, \
min_sonar_dist, avoid_move_efficiency, acquire_move_efficiency
def calc_rwd(self, drone_id, min_sonar_dist, driving_direction, cur_speed,
avoid_move_efficiency, acquire_move_efficiency):
reward = 0
x, y = self.drones[drone_id].position
# check for crash
if min_sonar_dist <= 1: # and cur_mode != PACK
reward = -500
if x < 0 or x > width or y < 0 or y > height:
self.drones[drone_id].position = int(width/2), int(height/2)
self.num_off_scrn += 1
print("off screen. total off screens", self.num_off_scrn)
reward = -1000
self.recover_from_crash(driving_direction, drone_id)
else:
if cur_mode == TURN:
# Rewards better spacing from objects
reward = min_sonar_dist
elif cur_mode == AVOID:
# rewards distance from objects and speed
sd_speeds = np.std(SPEEDS)
sd_dist = np.std(range(20))
std_speed = cur_speed / sd_speeds
std_dist = min_sonar_dist / sd_dist
std_max_speed = max(SPEEDS) / sd_speeds
std_max_dist = SONAR_ARM_LEN / sd_dist
reward = ((std_speed * std_dist) +
((std_max_speed - std_speed) * (std_max_dist - std_dist)))
else: # i.e., cur_mode is acquisition-related (acquire, hunt, pack)
# rewards moving in the right direction and acquiring pixels
if self.target_acquired == True:
reward = 1000
self.target_acquired = False
self.acquire_frame_ctr = 0
else:
if cur_mode == ACQUIRE:
reward = 100 * acquire_move_efficiency
elif cur_mode == HUNT:
reward = 40 * acquire_move_efficiency + 50 * avoid_move_efficiency
if cur_mode == PACK:
# rewards moving all drones in right direction and acquiring pixels
if reward == 1000:
self.pack_cum_rwds[drone_id, 0] = self.pack_cum_rwds[drone_id, 0] + 1000
elif reward == -500 or reward == -1000:
self.pack_cum_rwds[drone_id, 0] = self.pack_cum_rwds[drone_id, 0] - 500
else:
new_rwd = (((60 / (PACK_EVAL_FRAMES * NUM_DRONES)) * acquire_move_efficiency) +
((40 / (PACK_EVAL_FRAMES * NUM_DRONES)) * avoid_move_efficiency))
# two drones. reward each 1/2 of total in acquire/avoid eff proportion
self.pack_cum_rwds[drone_id, 0] += new_rwd
reward = 0
#print("++ in game / calc rwd. replay frame ctr:", self.replay_frame_ctr)
#print("drone_id:", drone_id, "drone reward:", self.pack_cum_rwds[drone_id,0])
if self.replay_frame_ctr % PACK_EVAL_FRAMES == 0: # self.total_frame_ctr == 1 or
reward = int(self.pack_cum_rwds[drone_id, 0])
self.pack_cum_rwds[drone_id, 0] = 0
return reward
def recover_from_crash(self, driving_direction, drone_id):
# back up
crash_adjust = -100
self.drones[drone_id].velocity = crash_adjust * driving_direction
for i in range(10):
self.drones[drone_id].angle += .2 # Turn a little.
screen.fill(THECOLORS["red"]) # Red is scary!
draw(screen, self.space)
self.space.step(1./10)
if draw_screen:
pygame.display.flip()
clock.tick()
def get_sonar_dist_color_readings(self, drone_id):
sonar_dist_readings = []
sonar_color_readings = []
"""
sonar readings return N "distance" readings, one per sonar arm. distance is
a count of arm points outward from the drone to the first non-safe color detected.
"""
# make sonar "arms"
arm_1 = self.make_sonar_arm(drone_id)
arm_2 = arm_1
arm_3 = arm_1
arm_4 = arm_1
arm_5 = arm_1
arm_6 = arm_1
arm_7 = arm_1
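# note: arm_2 through arm_7 alias the same point list as arm_1; that is harmless here
# because make_sonar_arm is deterministic and the points are only read, never modified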
# rotate arms to get vector of readings
d, c = self.get_arm_dist_color(arm_1, 0, drone_id)
sonar_dist_readings.append(d); sonar_color_readings.append(c)
d, c = self.get_arm_dist_color(arm_2, 0.6, drone_id)
sonar_dist_readings.append(d); sonar_color_readings.append(c)
d, c = self.get_arm_dist_color(arm_3, -0.6, drone_id)
sonar_dist_readings.append(d); sonar_color_readings.append(c)
d, c = self.get_arm_dist_color(arm_4, 1.2, drone_id)
sonar_dist_readings.append(d); sonar_color_readings.append(c)
d, c = self.get_arm_dist_color(arm_5, -1.2, drone_id)
sonar_dist_readings.append(d); sonar_color_readings.append(c)
d, c = self.get_arm_dist_color(arm_6, 2.8, drone_id)
sonar_dist_readings.append(d); sonar_color_readings.append(c)
d, c = self.get_arm_dist_color(arm_7, -2.8, drone_id)
sonar_dist_readings.append(d); sonar_color_readings.append(c)
if show_sensors:
pygame.display.update()
return sonar_dist_readings, sonar_color_readings
def get_arm_dist_color(self, arm, offset, drone_id):
# count arm length to nearest obstruction
i = 0
x, y = self.drones[drone_id].position
# evaluate each arm point to see if we've hit something
for point in arm:
i += 1
# move the point to the right spot
rotated_p = self.get_rotated_point(x, y, point[0], point[1],
self.drones[drone_id].angle + offset)
# return i if rotated point is off screen
if rotated_p[0] <= 0 or rotated_p[1] <= 0 or rotated_p[0] >= width or rotated_p[1] >= height:
return i, 1 # 1 is wall color
else:
obs = screen.get_at(rotated_p)
# this gets the color of the pixel at the rotated point
obs_color = self.get_track_or_not(obs)
if obs_color != 0:
# if pixel not a safe color, return distance
return i, obs_color
# plots the individual sonar point on the screen
if show_sensors:
pygame.draw.circle(screen, (255, 255, 255), (rotated_p), 2)
return i, 0 # 0 is safe color
def make_sonar_arm(self, drone_id):
x, y = self.drones[drone_id].position
spread = 10 # gap between points on sonar arm
distance = 10 # offset of the first arm point from the drone center
arm_points = []
# builds arm flat. it will be rotated about the center later
for i in range(1, SONAR_ARM_LEN): # was 40
arm_points.append((x + distance + (spread * i), y))
return arm_points
def get_rotated_point(self, x_1, y_1, x_2, y_2, radians):
# Rotate x_2, y_2 around x_1, y_1 by angle.
x_change = (x_2 - x_1) * math.cos(radians) + \
(y_2 - y_1) * math.sin(radians)
y_change = (y_1 - y_2) * math.cos(radians) - \
(x_1 - x_2) * math.sin(radians)
new_x = x_change + x_1
new_y = height - (y_change + y_1)
return int(new_x), int(new_y)
def get_track_or_not(self, reading):
# check whether the color encountered is safe (i.e., should not count as a crash)
if reading == pygame.color.THECOLORS[BACK_COLOR] or \
reading == pygame.color.THECOLORS[DRONE_COLOR] or \
reading == pygame.color.THECOLORS[SMILE_COLOR] or \
reading == pygame.color.THECOLORS[ACQUIRE_PIXEL_COLOR] or \
reading == pygame.color.THECOLORS[ACQUIRED_PIXEL_COLOR] or \
reading == pygame.color.THECOLORS[PATH_COLOR]:
return 0
else:
if reading == pygame.color.THECOLORS[WALL_COLOR]:
return 1
elif reading == pygame.color.THECOLORS[CAT_COLOR]:
return 2
elif reading == pygame.color.THECOLORS[OBSTACLE_COLOR]:
return 3
else:
return 1
# ***** target and acquire model functions *****
def generate_targets(self, first_iter):
# calc number of targets that can fit space
num_pxl_x_dir = int((width - 2 * ACQUIRE_MARGIN)/ACQUIRE_PIXEL_SEPARATION)
num_pxl_y_dir = int((height- 2 * ACQUIRE_MARGIN)/ACQUIRE_PIXEL_SEPARATION)
n = num_pxl_x_dir * num_pxl_y_dir
ctr = 0
for v in range(num_pxl_y_dir):
for h in range(num_pxl_x_dir):
# space targets across target grid
x_pxl = (ACQUIRE_MARGIN + (h * ACQUIRE_PIXEL_SEPARATION))
y_pxl = (ACQUIRE_MARGIN + (v * ACQUIRE_PIXEL_SEPARATION))
if(first_iter == True):
self.target_inventory.append((x_pxl,y_pxl))
ctr += 1
return num_pxl_x_dir, num_pxl_y_dir
def assign_target(self, first_iter, drone_id):
# clear the path surface
path_grid.fill(pygame.color.THECOLORS[BACK_COLOR])
# mark target as acquired
x, y = self.drones[drone_id].position
if first_iter == False:
pygame.draw.rect(target_grid, pygame.color.THECOLORS[ACQUIRED_PIXEL_COLOR],
((x, height - y), (ACQUIRE_PIXEL_SIZE, ACQUIRE_PIXEL_SIZE)), 0)
# randomly select a new target
new_target = random.choice(self.target_inventory)
self.current_targets.append(new_target)
# draw the new target
pygame.draw.rect(target_grid, pygame.color.THECOLORS[ACQUIRE_PIXEL_COLOR],
((new_target[0], height - new_target[1]),
(ACQUIRE_PIXEL_SIZE, ACQUIRE_PIXEL_SIZE)), 0)
# ***** global functions *****
def find_nearest(array, value):
idx = (np.abs(array-value)).argmin()
return idx
def calc_dist_rads(xy1, xy2, rad_adjust):
dx = xy1[0] - xy2[0]
dy = xy1[1] - xy2[1]
dist = int(((dx**2 + dy**2)**0.5))
rads = atan2(dy,dx) + rad_adjust
rads %= 2*pi
rads = np.round(rads,2)
if rads > 3.14:
rads = rads - 6.28
return dist, rads
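# worked example: calc_dist_rads((100, 100), (0, 0), 0) -> dist = 141, rads = 0.79 (~pi/4);
# results are normalized to roughly (-pi, pi], and rad_adjust carries the PACK heading offset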
def take_screen_shot(screen):
time_taken = time.asctime(time.localtime(time.time()))
time_taken = time_taken.replace(" ", "_")
time_taken = time_taken.replace(":",".")
save_file = "screenshots/" + time_taken + ".jpeg"
pygame.image.save(screen,save_file)
print("screen shot taken")
In [5]:
import numpy as np
import random
import csv
import os.path
import timeit
import time
# initialize
GAMMA = 0.9 # discount factor applied to future rewards (a.k.a. "forgetting")
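# GAMMA appears in the standard Q-learning target used by the minibatch processors referenced
# below (an assumption based on the outlace.com design): target = reward + GAMMA * max_a' Q(s', a')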
def train_net(turn_model, turn_model_30, turn_model_50, turn_model_70, avoid_model, acquire_model,
acquire_model_30, acquire_model_50, acquire_model_70, hunt_model, pack_model, params):
filename = params_to_filename(params)
if cur_mode in [ACQUIRE, HUNT]:
observe = 1000 # Number of frames to observe before training.
elif cur_mode == PACK:
observe = 2 * PACK_STATE_FRAMES
else:
observe = 2000
epsilon = 1 # vary this based on the pre-learning that has already occurred in lower models
train_frames = 1000000 # number of flips for training
batchSize = params['batchSize']
buffer = params['buffer']
# initialize variables and structures used below.
max_crash_frame_ctr = 0
crash_frame_ctr = 0
total_frame_ctr = 0
replay_frame_ctr = 0
stop_ctr = 0
avoid_ctr = 0
acquire_ctr = 0
cum_rwd = 0
cum_speed = 0
new_pack_action = pack_action = START_PACK_ACTION
new_pack_rwd = pack_rwd = 0
data_collect = []
replay = [] # replay stores tuples of (state, action, reward, new state)
pack_states = []
loss_log = []
save_init = True
cur_speeds = []
for i in range(NUM_DRONES): cur_speeds.append(START_SPEED)
# initialize drone state holders
turn_states = np.zeros([NUM_DRONES, TURN_TOTAL_SENSORS * TURN_STATE_FRAMES])
avoid_states = np.zeros([NUM_DRONES, AVOID_TOTAL_SENSORS * AVOID_STATE_FRAMES])
acquire_states = np.zeros([NUM_DRONES, ACQUIRE_TOTAL_SENSORS * ACQUIRE_STATE_FRAMES])
hunt_states = np.zeros([NUM_DRONES, HUNT_TOTAL_SENSORS * HUNT_STATE_FRAMES])
pack_states = np.zeros([PACK_STATE_FRAMES, PACK_NUM_INPUT])
# create game instance
game_state = GameState()
# get initial state(s)
turn_state, avoid_state, acquire_state, hunt_state, drone_state, reward, cur_speed = \
game_state.frame_step(START_DRONE_ID, START_TURN_ACTION, START_SPEED_ACTION,
START_PACK_ACTION, START_SPEED, START_DISTANCE, 1)
# initialize frame states
if cur_mode in [TURN, AVOID, HUNT, PACK]:
for i in range(NUM_DRONES):
turn_states[i] = state_frames(turn_state,
np.zeros((1, TURN_TOTAL_SENSORS * TURN_STATE_FRAMES)),
TURN_TOTAL_SENSORS, TURN_STATE_FRAMES)
if cur_mode in [AVOID, HUNT, PACK]:
for i in range(NUM_DRONES):
avoid_states[i] = state_frames(avoid_state,
np.zeros((1, AVOID_TOTAL_SENSORS * AVOID_STATE_FRAMES)),
AVOID_TOTAL_SENSORS, AVOID_STATE_FRAMES)
if cur_mode in [ACQUIRE, HUNT, PACK]:
for i in range(NUM_DRONES):
acquire_states[i] = state_frames(acquire_state,
np.zeros((1, ACQUIRE_TOTAL_SENSORS * ACQUIRE_STATE_FRAMES)),
ACQUIRE_TOTAL_SENSORS, ACQUIRE_STATE_FRAMES)
if cur_mode in [HUNT, PACK]:
for i in range(NUM_DRONES):
hunt_states[i] = state_frames(hunt_state,
np.zeros((1, HUNT_TOTAL_SENSORS * HUNT_STATE_FRAMES)),
HUNT_TOTAL_SENSORS, HUNT_STATE_FRAMES)
if cur_mode == PACK:
for i in range(PACK_STATE_FRAMES):
pack_states[replay_frame_ctr] = drone_state
# time it
start_time = timeit.default_timer()
# run frames
while total_frame_ctr < train_frames:
total_frame_ctr += 1 # counts total training distance traveled
crash_frame_ctr += 1 # counts distance between crashes
replay_frame_ctr += 1 # counts frames between pack mode replay captures
# used to slow things down for debugging
#time.sleep(1)
for drone_id in range(NUM_DRONES): # NUM_DRONES = 1, unless you're in PACK mode
speed_action = START_SPEED_ACTION
# choose appropriate action(s)
'''note: only generates random inputs for the currently training model.
All prior (sub) models provide their best (fully-trained) inputs.'''
if random.random() < epsilon or total_frame_ctr < observe: # epsilon degrades over flips...
if cur_mode == TURN:
turn_action, active_turn_model = set_turn_action(True, cur_speeds[drone_id],
np.array([turn_states[drone_id]]),
turn_model, turn_model_30,
turn_model_50, turn_model_70)
else:
if cur_mode in [AVOID, HUNT, PACK]:
turn_action, active_turn_model = set_turn_action(False,
cur_speeds[drone_id],
np.array([turn_states[drone_id]]),
turn_model, turn_model_30,
turn_model_50, turn_model_70)
if cur_mode == AVOID:
speed_action = set_avoid_action(True, turn_action,
np.array([avoid_states[drone_id]]))
else:
if cur_mode in [HUNT, PACK]:
speed_action = set_avoid_action(False, turn_action,
np.array([avoid_states[drone_id]]))
if cur_mode == ACQUIRE:
acquire_action, active_acquire_model = \
set_acquire_action(True, cur_speeds[drone_id],
np.array([acquire_states[drone_id,]]),
acquire_model, acquire_model_30,
acquire_model_50, acquire_model_70)
turn_action = acquire_action
else:
acquire_action, active_acquire_model = \
set_acquire_action(False, cur_speeds[drone_id],
np.array([acquire_states[drone_id,]]),
acquire_model, acquire_model_30,
acquire_model_50, acquire_model_70)
if cur_mode == HUNT:
hunt_action, turn_action, speed_action = \
set_hunt_action(True, cur_speeds[drone_id],
turn_action,speed_action,
acquire_action,
np.array([hunt_states[drone_id,]]))
else:
hunt_action, turn_action, speed_action = \
set_hunt_action(False, cur_speeds[drone_id],
turn_action, speed_action, acquire_action,
np.array([hunt_states[drone_id,]]))
if cur_mode == PACK and (replay_frame_ctr - 1) % PACK_STATE_FRAMES == 0 and drone_id == 0: #(total_frame_ctr == 1 or
new_pack_action = set_pack_action(True, np.array([pack_states[PACK_EVAL_FRAMES:]]))
'''note: pack action only changed every PACK_EVAL_FRAMES.
For frames in between it is constant'''
else: # ...increasing use of predictions over time
if cur_mode == TURN:
turn_action, active_turn_model = set_turn_action(False, cur_speeds[drone_id],
np.array([turn_states[drone_id]]),
turn_model, turn_model_30,
turn_model_50, turn_model_70)
else:
if cur_mode in [AVOID, HUNT, PACK]:
turn_action, active_turn_model = set_turn_action(False, cur_speeds[drone_id],
np.array([turn_states[drone_id]]),
turn_model, turn_model_30,
turn_model_50, turn_model_70)
if cur_mode == AVOID:
speed_action = set_avoid_action(False, turn_action,
np.array([avoid_states[drone_id]]))
else:
if cur_mode in [HUNT, PACK]:
speed_action = set_avoid_action(False, turn_action,
np.array([avoid_states[drone_id]]))
if cur_mode == ACQUIRE:
acquire_action, active_acquire_model = \
set_acquire_action(False, cur_speeds[drone_id],
np.array([acquire_states[drone_id,]]),
acquire_model, acquire_model_30,
acquire_model_50, acquire_model_70)
turn_action = acquire_action
else:
acquire_action, active_acquire_model = \
set_acquire_action(False, cur_speeds[drone_id],
np.array([acquire_states[drone_id,]]),
acquire_model, acquire_model_30,
acquire_model_50, acquire_model_70)
if cur_mode == HUNT:
hunt_action, turn_action, speed_action = \
set_hunt_action(False, cur_speeds[drone_id],
turn_action, speed_action, acquire_action,
np.array([hunt_states[drone_id,]]))
else:
hunt_action, turn_action, speed_action = \
set_hunt_action(False, cur_speeds[drone_id],
turn_action, speed_action, acquire_action,
np.array([hunt_states[drone_id,]]))
if cur_mode == PACK and (replay_frame_ctr - 1) % PACK_STATE_FRAMES == 0 and drone_id == 0: #(total_frame_ctr == 1 or
# get 1 pack action for each set of drones on first drone
new_pack_action = set_pack_action(False, np.array([pack_states[PACK_EVAL_FRAMES:]]))
new_turn_state, new_avoid_state, new_acquire_state, new_hunt_state, new_drone_state, new_reward, new_speed = \
game_state.frame_step(drone_id, turn_action, speed_action, new_pack_action,
cur_speeds[drone_id], total_frame_ctr, replay_frame_ctr)
# append (horizontally) historical states for learning speed.
'''note: do this concatenation even for models that are not learning
(e.g., turn when running search or turn, search and acquire while running hunt)
b/c their preds, performed above, expect the same multi-frame view that was
in place when they trained.'''
if cur_mode in [TURN, AVOID, HUNT, PACK]:
new_turn_state = state_frames(new_turn_state,
np.array([turn_states[drone_id]]),
TURN_TOTAL_SENSORS, TURN_STATE_FRAMES)
if cur_mode in [AVOID, HUNT, PACK]:
new_avoid_state = state_frames(new_avoid_state,
np.array([avoid_states[drone_id]]),
AVOID_TOTAL_SENSORS, AVOID_STATE_FRAMES)
if cur_mode in [ACQUIRE, HUNT, PACK]:
new_acquire_state = state_frames(new_acquire_state,
np.array([acquire_states[drone_id]]),
ACQUIRE_TOTAL_SENSORS, ACQUIRE_STATE_FRAMES)
if cur_mode in [HUNT, PACK]:
new_hunt_state = state_frames(new_hunt_state,
np.array([hunt_states[drone_id]]),
HUNT_TOTAL_SENSORS, HUNT_STATE_FRAMES)
if cur_mode == PACK: #and (total_frame_ctr == 1 or replay_frame_ctr % PACK_EVAL_FRAMES == 0):
new_pack_rwd += new_reward
if drone_id == (NUM_DRONES - 1): # for last drone add pack record
pack_states = np.append(pack_states, new_drone_state, axis = 0)
if pack_states.shape[0] > (2 * PACK_STATE_FRAMES):
pack_states = np.delete(pack_states, 0, 0)
# experience replay storage
"""note: only the model being trained requires event storage as it is
stack that will be sampled for training below."""
if cur_mode == TURN:
replay.append((np.array([turn_states[drone_id]]),
turn_action, new_reward, new_turn_state))
elif cur_mode == AVOID:
replay.append((np.array([avoid_states[drone_id]]),
speed_action, new_reward, new_avoid_state))
elif cur_mode == ACQUIRE:
replay.append((np.array([acquire_states[drone_id]]),
turn_action, new_reward, new_acquire_state))
elif cur_mode == HUNT:
replay.append((np.array([hunt_states[drone_id]]),
hunt_action, new_reward, new_hunt_state))
elif cur_mode == PACK and total_frame_ctr > PACK_EVAL_FRAMES and replay_frame_ctr % PACK_EVAL_FRAMES == 0 and drone_id == (NUM_DRONES - 1): #(total_frame_ctr == 1 or
replay.append((pack_states[0:PACK_EVAL_FRAMES], pack_action,
pack_rwd, pack_states[PACK_EVAL_FRAMES:]))
pack_action = new_pack_action
pack_rwd = new_pack_rwd
new_pack_rwd = 0
# If we're done observing, start training.
if total_frame_ctr > observe and (cur_mode != PACK or (replay_frame_ctr % PACK_EVAL_FRAMES == 0 and drone_id == (NUM_DRONES - 1))):
# If we've stored enough in our buffer, pop the oldest.
if len(replay) > buffer:
replay.pop(0)
if cur_mode == PACK:
minibatch = replay[-1]
else:
# randomly sample experience replay memory
minibatch = random.sample(replay, batchSize)
if cur_mode == TURN:
# Get training values.
X_train, y_train = process_minibatch(minibatch, active_turn_model,
TURN_NUM_INPUT, TURN_NUM_OUTPUT, cur_mode)
history = LossHistory()
active_turn_model.fit(X_train, y_train, batch_size=batchSize,
nb_epoch=1, verbose=0, callbacks=[history])
elif cur_mode == AVOID:
X_train, y_train = process_minibatch(minibatch, avoid_model,
AVOID_NUM_INPUT, AVOID_NUM_OUTPUT, cur_mode)
history = LossHistory()
avoid_model.fit(X_train, y_train, batch_size=batchSize,
nb_epoch=1, verbose=0, callbacks=[history])
elif cur_mode == ACQUIRE:
X_train, y_train = process_minibatch(minibatch, active_acquire_model,
ACQUIRE_NUM_INPUT, ACQUIRE_NUM_OUTPUT, cur_mode)
history = LossHistory()
active_acquire_model.fit(X_train, y_train, batch_size=batchSize,
nb_epoch=1, verbose=0, callbacks=[history])
elif cur_mode == HUNT:
X_train, y_train = process_minibatch(minibatch, hunt_model,
HUNT_NUM_INPUT, HUNT_NUM_OUTPUT, cur_mode)
history = LossHistory()
hunt_model.fit(X_train, y_train, batch_size=batchSize,
nb_epoch=1, verbose=0, callbacks=[history])
elif cur_mode == PACK:
X_train, y_train = process_pack_batch(minibatch, pack_model, PACK_STATE_FRAMES,
PACK_NUM_INPUT, PACK_NUM_OUTPUT, cur_mode)
history = LossHistory()
pack_model.fit(X_train, y_train, batch_size=batchSize,
nb_epoch=1, verbose=0, callbacks=[history])
loss_log.append(history.losses)
# Update the starting state with S'.
if cur_mode in [TURN, AVOID, HUNT, PACK]:
turn_states[drone_id] = new_turn_state
if cur_mode in [AVOID, HUNT, PACK]:
avoid_states[drone_id] = new_avoid_state
if cur_mode in [ACQUIRE, HUNT, PACK]:
acquire_states[drone_id] = new_acquire_state
if cur_mode in [HUNT, PACK]:
hunt_states[drone_id] = new_hunt_state
if cur_mode == PACK and replay_frame_ctr % PACK_EVAL_FRAMES == 0: #(total_frame_ctr == 1 or
#drone_states[drone_id] = new_drone_state
replay_frame_ctr = 0
cur_speeds[drone_id] = new_speed
cum_rwd += new_reward
# in case of crash, report and initialize
if new_reward == -500 or new_reward == -1000:
# Log the drone's distance at this T.
data_collect.append([total_frame_ctr, crash_frame_ctr])
# new max achieved?
if crash_frame_ctr > max_crash_frame_ctr:
max_crash_frame_ctr = crash_frame_ctr
# Time it.
tot_time = timeit.default_timer() - start_time
fps = crash_frame_ctr / tot_time
# output results to point of crash
print("Max: %d at %d\t eps: %f\t dist: %d\t mode: %d\t cum rwd: %d\t fps: %d" %
(max_crash_frame_ctr, total_frame_ctr, epsilon, crash_frame_ctr, cur_mode, cum_rwd, int(fps)))
# Reset.
crash_frame_ctr = cum_rwd = cum_speed = 0
start_time = timeit.default_timer()
#print(9)
# decrement epsilon for another frame
if epsilon > 0.1 and total_frame_ctr > observe:
epsilon -= (1/train_frames)
if total_frame_ctr % 10000 == 0:
if crash_frame_ctr != 0:
print("Max: %d at %d\t eps: %f\t dist: %d\t mode: %d\t cum rwd: %d" %
(max_crash_frame_ctr, total_frame_ctr, epsilon, crash_frame_ctr, cur_mode, cum_rwd))
# Save model every 50k frames
if total_frame_ctr % 50000 == 0:
save_init = False
if cur_mode == TURN:
turn_model.save_weights('models/turn/turn-' + filename + '-' +
str(START_SPEED) + '-' + str(total_frame_ctr) + '.h5',overwrite=True)
print("Saving turn_model %s - %d - %d" % (filename, START_SPEED,total_frame_ctr))
elif cur_mode == AVOID:
avoid_model.save_weights('models/avoid/avoid-' + filename + '-' +
str(total_frame_ctr) + '.h5', overwrite=True)
print("Saving avoid_model %s - %d" % (filename,total_frame_ctr))
elif cur_mode == ACQUIRE:
acquire_model.save_weights('models/acquire/acquire-' + filename + '-' +
str(START_SPEED) + '-' + str(total_frame_ctr) + '.h5',overwrite=True)
print("Saving acquire_model %s - %d" % (filename,total_frame_ctr))
elif cur_mode == HUNT:
hunt_model.save_weights('models/hunt/hunt-' + filename + '-' +
str(total_frame_ctr) + '.h5', overwrite=True)
print("Saving hunt_model %s - %d" % (filename,total_frame_ctr))
elif cur_mode == PACK:
pack_model.save_weights('models/pack/pack-' + filename + '-' +
str(total_frame_ctr) + '.h5', overwrite=True)
print("Saving pack_model %s - %d" % (filename, total_frame_ctr))
# Log results after we're done all frames.
log_results(filename, data_collect, loss_log)
def set_turn_action(random_fl, cur_speed, turn_state, turn_model,
turn_model_30, turn_model_50, turn_model_70):
if random_fl:
turn_action = np.random.randint(0, TURN_NUM_OUTPUT)
if use_existing_model == False:
active_turn_model = turn_model
else:
active_turn_model = turn_model_50
else:
if cur_mode == TURN and use_existing_model == False:
turn_qval = turn_model.predict(turn_state, batch_size=1)
active_turn_model = turn_model
else:
if cur_speed == SPEEDS[0]:
turn_qval = turn_model_30.predict(turn_state, batch_size=1)
active_turn_model = turn_model_30
elif cur_speed == SPEEDS[1]:
turn_qval = turn_model_50.predict(turn_state, batch_size=1)
active_turn_model = turn_model_50
elif cur_speed == SPEEDS[2]:
turn_qval = turn_model_70.predict(turn_state, batch_size=1)
active_turn_model = turn_model_70
turn_action = (np.argmax(turn_qval))
return turn_action, active_turn_model
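# Note: the random_fl argument is what makes these action pickers epsilon-greedy - the caller
# compares a uniform draw against epsilon before deciding whether to explore. A minimal sketch
# of that gate, assuming the epsilon/observe variables used in train_net (choose_random itself
# is a hypothetical helper, not part of the original code):
import random

def choose_random(epsilon, total_frame_ctr, observe):
    # explore unconditionally while the replay buffer is still being filled
    if total_frame_ctr <= observe:
        return True
    # afterwards, explore with probability epsilon (which anneals toward the 0.1 floor)
    return random.random() < epsilon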
def set_avoid_action(random_fl, turn_action, avoid_state):
if random_fl:
speed_action = np.random.randint(0, AVOID_NUM_OUTPUT)
else:
avoid_state[0][14] = turn_action # ensures AVOID using current turn pred
avoid_qval = avoid_model.predict(avoid_state, batch_size=1)
speed_action = (np.argmax(avoid_qval))
return speed_action
def set_acquire_action(random_fl, cur_speed, acquire_state, acquire_model,
acquire_model_30, acquire_model_50, acquire_model_70):
if random_fl:
turn_action = np.random.randint(0, ACQUIRE_NUM_OUTPUT)
if use_existing_model == False:
active_acquire_model = acquire_model
else:
active_acquire_model = acquire_model_50
else:
if cur_mode == ACQUIRE and use_existing_model == False:
acquire_qval = acquire_model.predict(acquire_state, batch_size=1)
active_acquire_model = acquire_model
else:
if cur_speed == SPEEDS[0]:
acquire_qval = acquire_model_30.predict(acquire_state, batch_size=1)
active_acquire_model = acquire_model_30
elif cur_speed == SPEEDS[1]:
acquire_qval = acquire_model_50.predict(acquire_state, batch_size=1)
active_acquire_model = acquire_model_50
else:
acquire_qval = acquire_model_70.predict(acquire_state, batch_size=1)
active_acquire_model = acquire_model_70
turn_action = (np.argmax(acquire_qval))
return turn_action, active_acquire_model
def set_hunt_action(random_fl, cur_speed, turn_action, speed_action, acquire_action, hunt_state):
if random_fl:
hunt_action = np.random.randint(0, HUNT_NUM_OUTPUT)
if hunt_action == HUNT_AVOID: # accept avoid model action
turn_action = turn_action
if cur_speed > 0:
speed_action = speed_action # continue current speed
else:
speed_action = 1 # reset speed to 50, as you were stopped
elif hunt_action == HUNT_ACQUIRE: # accept acquire model action
turn_action = acquire_action
if cur_speed > 0:
speed_action = 1 # just setting acquire speed to 50 for now
else:
speed_action = 1 # reset speed to 50, as you were stopped
else:
hunt_qval = hunt_model.predict(hunt_state, batch_size=1)
hunt_action = (np.argmax(hunt_qval))
if hunt_action == HUNT_AVOID: # accept avoid model action
turn_action = turn_action
if cur_speed > 0:
speed_action = speed_action # continue current speed
else:
speed_action = 1
elif hunt_action == HUNT_ACQUIRE: # accept acquire model action
turn_action = acquire_action
if cur_speed > 0:
speed_action = 1 # just setting acquire speed to 50 for now
else:
speed_action = 1 # reset acquire speed to 50, as you were stopped
return hunt_action, turn_action, speed_action
def set_pack_action(random_fl, pack_state):
if random_fl:
pack_action = np.random.randint(0, PACK_NUM_OUTPUT)
else:
pack_qval = pack_model.predict(pack_state, batch_size=1)
pack_action = (np.argmax(pack_qval))
return pack_action
def state_frames(new_state, old_state, num_sensor, num_frame):
"""
Takes a state returned from the game and turns it into a multi-frame state.
Creates a new array from the new state plus the first N values of the old state,
which was the previous frame's stacked state.
"""
# Turn them back into arrays.
new_state = new_state.tolist()[0]
old_state = old_state.tolist()[0][:num_sensor * (num_frame - 1)]
# Combine them.
combined_state = new_state + old_state
# Re-numpy them on exit.
return np.array([combined_state])
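# For example, with TURN_TOTAL_SENSORS = 10 and TURN_STATE_FRAMES = 3, the stacked state is a
# 30-wide row: the newest 10 readings are prepended and the oldest 10 fall off the end.
# A small sanity check of that behavior (the values are arbitrary, not real sensor data):
_f1 = np.ones((1, TURN_TOTAL_SENSORS))
_f2 = np.full((1, TURN_TOTAL_SENSORS), 2.0)
_stacked = state_frames(_f1, np.zeros((1, TURN_TOTAL_SENSORS * TURN_STATE_FRAMES)),
                        TURN_TOTAL_SENSORS, TURN_STATE_FRAMES)
_stacked = state_frames(_f2, _stacked, TURN_TOTAL_SENSORS, TURN_STATE_FRAMES)
assert _stacked.shape == (1, TURN_NUM_INPUT)            # 3 frames x 10 sensors
assert (_stacked[0][:TURN_TOTAL_SENSORS] == 2.0).all()  # newest frame sits in front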
def log_results(filename, data_collect, loss_log):
# Save the results to a file so we can graph it later.
with open('results/sonar-frames/learn_data-' + filename + '.csv', 'w') as data_dump:
wr = csv.writer(data_dump)
wr.writerows(data_collect)
with open('results/sonar-frames/loss_data-' + filename + '.csv', 'w') as lf:
wr = csv.writer(lf)
for loss_item in loss_log:
wr.writerow(loss_item)
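# The CSVs written above are intended for later graphing. A minimal plotting sketch, assuming
# matplotlib is available and reusing the same filename convention (not part of the original code):
def plot_loss(filename):
    import matplotlib.pyplot as plt
    losses = []
    # each row holds the per-batch losses recorded by LossHistory for one fit call
    with open('results/sonar-frames/loss_data-' + filename + '.csv') as lf:
        for row in csv.reader(lf):
            losses.extend(float(v) for v in row if v != '')
    plt.plot(losses)
    plt.xlabel('training batch')
    plt.ylabel('mse loss')
    plt.show()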
def process_minibatch(minibatch, model, num_input, num_output, cur_mode):
"""This does the heavy lifting, aka the training: it builds the X/y arrays for one fit call."""
X_train = []
y_train = []
# Loop through our batch and create arrays for X and y
# so that we can fit our model at every step.
for memory in minibatch:
# break minibatch of replays into component parts
old_state_m, action_m, reward_m, new_state_m = memory
# make a prediction on the prior state.
# this gives you a baseline set of predicted values for each output class.
# these baseline predictions will be "corrected" by swapping in the actual reward received
# for the action previously selected (i.e., when you asked for the pred n frames before).
if cur_mode == PACK:
old_state_m = np.array(old_state_m)
old_qval = model.predict(old_state_m, batch_size=1)
# make a prediction on the new (current) state
newQ = model.predict(new_state_m, batch_size=1)
# select the highest predicted value for the new state - used in the update below
maxQ = np.max(newQ)
# create a pseudo y record seeded with the q-values from the prior-state prediction, but...
y = np.zeros((1, num_output))
y[:] = old_qval[:]
# ...calculate the Q-learning target from:
# the immediate reward (reward_m)
# the discount factor (GAMMA)
# the best predicted value for the new state (maxQ), i.e., the standard Bellman bootstrap
if reward_m != -500: # non-terminal state
update = (reward_m + (GAMMA * maxQ))
# the second prediction (newQ) exists solely to supply this bootstrap term; as the model
# improves, maxQ becomes a better estimate of the return available after this action.
else: # terminal state
update = reward_m
# then overwrite the chosen action's slot in the y vector for the old prediction, so the
# mse loss only corrects the action that was actually taken.
y[0][action_m] = update
# flatten the x record from shape (1, num_input) to (num_input,)
X_train.append(old_state_m.reshape(num_input,))
# flatten the y record from shape (1, num_output) to (num_output,)
y_train.append(y.reshape(num_output,))
# do this for every record in the minibatch, yielding a stack of x's and y's that is returned
# for the "fit" step, i.e., submitted to the neural network for training.
# for the TURN, AVOID, ACQUIRE and HUNT models, this minibatch (and therefore the x/y's) is selected at random.
X_train = np.array(X_train)
y_train = np.array(y_train)
return X_train, y_train
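# A worked example of the target construction above (the numbers are illustrative only):
# suppose num_output = 3, old_qval = [[0.2, 0.5, 0.1]], the replayed action was 1, the reward
# was 10 (non-terminal), maxQ over the new state is 0.8, and GAMMA = 0.9. Then y starts as
# [0.2, 0.5, 0.1] and only the chosen slot is corrected:
# y[0][1] = 10 + 0.9 * 0.8 = 10.72, giving y = [0.2, 10.72, 0.1].
# The other two outputs keep their old predictions, so the loss only pushes on action 1.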
def process_pack_batch(minibatch, model, num_frames, num_input, num_output, cur_mode):
X_train = []
y_train = []
# break minibatch into component parts
old_state_m = np.array(minibatch[0])
action_m = minibatch[1]
reward_m = minibatch[2]
new_state_m = np.array(minibatch[3])
# make prediction on the old state
old_qval = model.predict(np.array([old_state_m]), batch_size=1)
# using cross-entropy loss + rms optimizer results in qval probs, not expected values
# make prediction on the current state
newQ = model.predict(np.array([new_state_m]), batch_size=1)
# select the highest predicted value for the new state - used in the update below
maxQ = np.max(newQ)
# create a pseudo y record seeded with the q-values from the prior-state prediction, but...
y = np.zeros((1, num_output))
y[:] = old_qval[:]
y = y * 10 # kludge: translating probs to expected values by multiplying by estimated average reward
# ...calculate the Q-learning target: the immediate reward plus the discounted best value of the new state
if reward_m != -500: # non-terminal state
update = (reward_m + (GAMMA * maxQ))
else: # terminal state
update = reward_m
# then overwrite the chosen action's slot in the y vector for the old prediction
y[0][action_m] = update
# reshape the stacked pack state to (num_frames, num_input) for the pack net's 3-D input
X_train.append(old_state_m.reshape(num_frames, num_input,))
# flatten the y record from shape (1, num_output) to (num_output,)
y_train.append(y.reshape(num_output,))
X_train = np.array(X_train)
y_train = np.array(y_train)
return X_train, y_train
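# Shape sketch for the pack batch built above: old_state_m holds the rolling window of drone
# states, PACK_STATE_FRAMES x PACK_NUM_INPUT values, so X_train comes out as
# (1, PACK_STATE_FRAMES, PACK_NUM_INPUT) and y_train as (1, PACK_NUM_OUTPUT) - a single-sample
# batch, consistent with PACK_STATEFUL_BATCH_SIZE = 1 and the 3-D input the pack net is fit on.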
def params_to_filename(params):
if len(params['nn']) == 1:
return str(params['nn'][0]) + '-' + str(params['batchSize']) + \
'-' + str(params['buffer'])
elif len(params['nn']) == 2:
return str(params['nn'][0]) + '-' + str(params['nn'][1]) + '-' + \
str(params['batchSize']) + '-' + str(params['buffer'])
elif len(params['nn']) == 3:
return str(params['nn'][0]) + '-' + str(params['nn'][1]) + '-' + \
str(params['nn'][2]) + '-' + str(params['batchSize']) + '-' + str(params['buffer'])
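# Usage sketch (the layer sizes are illustrative; batch size and buffer match the constants defined above):
# params_to_filename({'nn': [256, 256], 'batchSize': BATCHSIZE, 'buffer': TRAIN_BUFFER})
# returns '256-256-100-50000', the tag embedded in the model checkpoint and results CSV filenames.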
def launch_learn(params):
filename = params_to_filename(params)
print("Trying %s" % filename)
# Make sure we haven't run this one.
if not os.path.isfile('results/sonar-frames/loss_data-' + filename + '.csv'):
# Create file so we don't double test when we run multiple
# instances of the script at the same time.
open('results/sonar-frames/loss_data-' + filename + '.csv', 'a').close()
print("Starting test.")
# Train.
if cur_mode == TURN:
turn_model = turn_net(TURN_NUM_INPUT, params['nn'], TURN_NUM_OUTPUT)
train_net(turn_model, 0, params)
elif cur_mode == AVOID:
avoid_model = avoid_net(AVOID_NUM_INPUT, params['nn'])
train_net(0, avoid_model, params)
else:
print("Already tested.")
The section below loads the trained neural nets required to support training of higher-level networks. For the network currently being trained, it initializes a fresh model by calling the appropriate neural-network schema above.
In [ ]:
turn_model, turn_model_30, turn_model_50, turn_model_70, avoid_model, acquire_model, acquire_model_30, acquire_model_50, acquire_model_70, hunt_model, pack_model, params = load_models()
train_net(turn_model, turn_model_30, turn_model_50, turn_model_70, avoid_model, acquire_model, acquire_model_30, acquire_model_50, acquire_model_70, hunt_model, pack_model, params)
In [6]:
def play(turn_model, turn_model_30, turn_model_50, turn_model_70, avoid_model, acquire_model,
acquire_model_30, acquire_model_50, acquire_model_70, hunt_model, pack_model, params):
total_frame_ctr = 0
crash_frame_ctr = 0
replay_frame_ctr = 0
crash_ctr = 0
acquire_ctr = 0
cum_speed = 0
stop_ctr = avoid_ctr = acquire_ctr = 0
new_pack_action = pack_action = START_PACK_ACTION
cur_speeds = []
for i in range(NUM_DRONES): cur_speeds.append(START_SPEED)
# initialize drone state holders
turn_states = np.zeros([NUM_DRONES, TURN_TOTAL_SENSORS * TURN_STATE_FRAMES])
avoid_states = np.zeros([NUM_DRONES, AVOID_TOTAL_SENSORS * AVOID_STATE_FRAMES])
acquire_states = np.zeros([NUM_DRONES, ACQUIRE_TOTAL_SENSORS * ACQUIRE_STATE_FRAMES])
hunt_states = np.zeros([NUM_DRONES, HUNT_TOTAL_SENSORS * HUNT_STATE_FRAMES])
drone_states = np.zeros([NUM_DRONES, DRONE_TOTAL_SENSORS * PACK_STATE_FRAMES])
pack_states = np.zeros([PACK_STATE_FRAMES, PACK_NUM_INPUT])
# create game instance
game_state = GameState()
# get initial state(s)
turn_state, avoid_state, acquire_state, hunt_state, drone_state, reward, cur_speed = \
game_state.frame_step(START_DRONE_ID, START_TURN_ACTION, START_SPEED_ACTION,
START_PACK_ACTION, START_SPEED, START_DISTANCE, 1)
# initialize frame states
if cur_mode in [TURN, AVOID, HUNT, PACK]:
for i in range(NUM_DRONES):
turn_states[i] = state_frames(turn_state, np.zeros((1, TURN_TOTAL_SENSORS * TURN_STATE_FRAMES)),
TURN_TOTAL_SENSORS, TURN_STATE_FRAMES)
if cur_mode in [AVOID, HUNT, PACK]:
for i in range(NUM_DRONES):
avoid_states[i] = state_frames(avoid_state, np.zeros((1, AVOID_TOTAL_SENSORS * AVOID_STATE_FRAMES)),
AVOID_TOTAL_SENSORS, AVOID_STATE_FRAMES)
if cur_mode in [ACQUIRE, HUNT, PACK]:
for i in range(NUM_DRONES):
acquire_states[i] = state_frames(acquire_state, np.zeros((1, ACQUIRE_TOTAL_SENSORS * ACQUIRE_STATE_FRAMES)),
ACQUIRE_TOTAL_SENSORS, ACQUIRE_STATE_FRAMES)
if cur_mode in [HUNT, PACK]:
for i in range(NUM_DRONES):
hunt_states[i] = state_frames(hunt_state, np.zeros((1, HUNT_TOTAL_SENSORS * HUNT_STATE_FRAMES)),
HUNT_TOTAL_SENSORS, HUNT_STATE_FRAMES)
if cur_mode == PACK:
for i in range(PACK_STATE_FRAMES):
pack_states[i] = drone_state # seed every row of the rolling pack window with the initial state
# Move.
while True:
total_frame_ctr += 1
crash_frame_ctr += 1
replay_frame_ctr += 1
if total_frame_ctr == 200000:
break
#time.sleep(1)
for drone_id in range(NUM_DRONES): # NUM_DRONES = 1, unless you're in PACK mode
speed_action = START_SPEED_ACTION
if cur_mode != PACK:
pack_action = 0
# choose action
if cur_mode == TURN:
turn_action, active_turn_model = set_turn_action(False, cur_speeds[drone_id],
np.array([turn_states[drone_id]]),
turn_model, turn_model_30,
turn_model_50, turn_model_70)
else:
if cur_mode in [AVOID, HUNT, PACK]:
turn_action, active_turn_model = set_turn_action(False, cur_speeds[drone_id],
np.array([turn_states[drone_id]]),
turn_model, turn_model_30,
turn_model_50, turn_model_70)
if cur_mode == AVOID:
speed_action = set_avoid_action(False, turn_action,
np.array([avoid_states[drone_id]]))
else:
if cur_mode in [HUNT, PACK]:
speed_action = set_avoid_action(False, turn_action,
np.array([avoid_states[drone_id]]))
if cur_mode == ACQUIRE:
acquire_action, active_acquire_model = \
set_acquire_action(False, cur_speeds[drone_id],
np.array([acquire_states[drone_id,]]),
acquire_model, acquire_model_30,
acquire_model_50, acquire_model_70)
turn_action = acquire_action
else:
acquire_action, active_acquire_model = \
set_acquire_action(False, cur_speeds[drone_id],
np.array([acquire_states[drone_id,]]),
acquire_model, acquire_model_30,
acquire_model_50, acquire_model_70)
if cur_mode == HUNT:
hunt_action, turn_action, speed_action = \
set_hunt_action(False, cur_speeds[drone_id], turn_action, speed_action,
acquire_action, np.array([hunt_states[drone_id,]]))
else:
hunt_action, turn_action, speed_action = \
set_hunt_action(False, cur_speeds[drone_id],
turn_action, speed_action, acquire_action,
np.array([hunt_states[drone_id,]]))
if cur_mode == PACK and replay_frame_ctr > PACK_STATE_FRAMES and (replay_frame_ctr - 1) % PACK_STATE_FRAMES == 0 and drone_id == 0:
# get 1 pack action for each set of drones on first drone
new_pack_action = set_pack_action(False, np.array([pack_states[PACK_EVAL_FRAMES:]]))
# pass action, receive new state, reward
new_turn_state, new_avoid_state, new_acquire_state, new_hunt_state, new_drone_state, new_reward, new_speed = \
game_state.frame_step(drone_id, turn_action, speed_action, pack_action,
cur_speeds[drone_id], total_frame_ctr, replay_frame_ctr)
# append (horizontally) historical states for learning speed.
if cur_mode in [TURN, AVOID, HUNT, PACK]:
new_turn_state = state_frames(new_turn_state,
np.array([turn_states[drone_id]]),
TURN_TOTAL_SENSORS,TURN_STATE_FRAMES)
if cur_mode in [AVOID, HUNT, PACK]:
new_avoid_state = state_frames(new_avoid_state,
np.array([avoid_states[drone_id]]),
AVOID_TOTAL_SENSORS, AVOID_STATE_FRAMES)
if cur_mode in [ACQUIRE, HUNT, PACK]:
new_acquire_state = state_frames(new_acquire_state,
np.array([acquire_states[drone_id]]),
ACQUIRE_NUM_SENSOR, ACQUIRE_STATE_FRAMES)
if cur_mode in [HUNT, PACK]:
new_hunt_state = state_frames(new_hunt_state,
np.array([hunt_states[drone_id]]),
HUNT_TOTAL_SENSORS, HUNT_STATE_FRAMES)
if cur_mode == PACK:
if drone_id == (NUM_DRONES - 1): # for last drone add pack record
pack_states = np.append(pack_states, new_drone_state, axis = 0)
if pack_states.shape[0] > (2 * PACK_STATE_FRAMES):
pack_states = np.delete(pack_states, 0, 0)
# Update the starting state with S'.
if cur_mode in [TURN, AVOID, HUNT, PACK]:
turn_states[drone_id] = new_turn_state
if cur_mode in [AVOID, HUNT, PACK]:
avoid_states[drone_id] = new_avoid_state
if cur_mode in [ACQUIRE, HUNT, PACK]:
acquire_states[drone_id] = new_acquire_state
if cur_mode in [HUNT, PACK]:
hunt_states[drone_id] = new_hunt_state
if cur_mode == PACK and replay_frame_ctr % PACK_EVAL_FRAMES == 0:
pack_action = new_pack_action
replay_frame_ctr = 0
cur_speeds[drone_id] = new_speed
# give status
if new_reward <= -250:
crash_ctr += 1
print("crashes", crash_ctr, "frames", total_frame_ctr)
elif new_reward > 500:
acquire_ctr += 1
print("acquisitions:", acquire_ctr, "frames", total_frame_ctr)
if total_frame_ctr % 5000 == 0:
print("+++++ total frames:", total_frame_ctr)
if crash_ctr > 0: # guard against division by zero before the first crash
print("+++++ frames between crashes:", int(total_frame_ctr / crash_ctr))
if cur_mode in [ACQUIRE, HUNT, PACK] and acquire_ctr > 0:
print("+++++ frames / acquisition:", int(total_frame_ctr / acquire_ctr))
#stop_ctr = avoid_ctr = acquire_ctr = 0
In [7]:
turn_model, turn_model_30, turn_model_50, turn_model_70, avoid_model, acquire_model, acquire_model_30, acquire_model_50, acquire_model_70, hunt_model, pack_model, params = load_models()
play(turn_model, turn_model_30, turn_model_50, turn_model_70, avoid_model,
acquire_model, acquire_model_30, acquire_model_50, acquire_model_70,
hunt_model, pack_model, params)
In [ ]: