Particle Filter Algorithm
In [1]:
import numpy as np
import matplotlib.pyplot as plt
import random
import matplotlib.patches as patches
from scipy.stats import gennorm
from scipy.stats import gamma
%matplotlib inline
In [174]:
def generate_initial_coordinates(side_length=2000, n_pokemon=9):
    """Scatter n_pokemon uniformly over a square of side `side_length` centred at the origin.

    Returns a dict mapping pokemon id (0..n_pokemon-1) to an (x, y) tuple.
    """
    half = side_length / 2
    return {
        pid: (random.uniform(-half, half), random.uniform(-half, half))
        for pid in range(n_pokemon)
    }
def distance(coord1, coord2):
    """Euclidean distance between two (x, y) points."""
    dx = coord1[0] - coord2[0]
    dy = coord1[1] - coord2[1]
    return np.sqrt(dx ** 2 + dy ** 2)
# Ground truth only — players never observe raw distances directly.
def pokemon_distances(player_coord, pokemons):
    """Map each pokemon id to its Euclidean distance from the player."""
    dists = {}
    for pid, coord in pokemons.items():
        dists[pid] = distance(player_coord, coord)
    return dists
def particle_distances(player_coord, particles):
    """Vectorized Euclidean distance from the player to every particle.

    `particles` is an (n, 2) array; returns an (n,) array of distances.
    """
    deltas = particles - np.asarray(player_coord)
    return np.sqrt((deltas ** 2).sum(axis=1))
def rank(input):
    """Return the rank (0 = smallest) of each element of `input`, in element order."""
    order = sorted(range(len(input)), key=input.__getitem__)
    ranks = [0] * len(input)
    for place, idx in enumerate(order):
        ranks[idx] = place
    return ranks
# This is the player-visible observation.
# Slight bug: rankings of pokemons outside the radar radius would not
# actually be visible in the real game (can be fixed later).
def pokemon_rankings(player_coord, pokemons):
    """Rank pokemons by distance to the player: id -> rank (0 = closest).

    Assumes pokemon ids are 0..n-1, as produced by generate_initial_coordinates.
    """
    dists = pokemon_distances(player_coord, pokemons)
    order = sorted(range(len(dists)), key=lambda pid: dists[pid])
    return {pid: place for place, pid in enumerate(order)}
def plot_pokemons(player_coord, pokemons):
    """Plot all pokemons relative to the player (placed at the origin).

    The target pokemon (id 0) is a red star, the rest are dots; the four
    footprint detection radii are drawn as coloured rings.

    Fix: repeated `plt.axes()` calls no longer return the same axes in
    modern matplotlib (each call creates a new axes), so we create a single
    axes with `plt.subplots` and reuse it. The duplicated coordinate
    comprehensions are also hoisted into xs/ys.
    """
    fig, ax = plt.subplots(figsize=(15, 15))
    xs = [x - player_coord[0] for x, y in pokemons.values()]
    ys = [y - player_coord[1] for x, y in pokemons.values()]
    # non-target pokemons
    ax.scatter(xs[1:], ys[1:])
    # target pokemon
    ax.scatter(xs[0], ys[0], marker="*", color='red', s=15)
    ax.set_aspect(1)
    ax.set_xlim((-1100, 1100))
    ax.set_ylim((-1100, 1100))
    # player at the origin
    ax.scatter(0, 0, color='purple', s=15)
    # detection radii (footprint thresholds -> ring colour)
    dists = {10: 'green', 25: 'blue', 100: 'yellow', 1000: 'red'}
    for r, color in dists.items():
        ax.add_patch(plt.Circle((0, 0), r, fill=False, color=color))
    plt.show()
def footprint(distance):
    """Map a true distance to the coarse footprint level the game displays.

    Levels 0-3 correspond to the rings < 10, < 25, < 100 and < 1000;
    anything at 1000 or beyond is 'out' (off the radar).
    """
    for level, threshold in enumerate((10, 25, 100, 1000)):
        if distance < threshold:
            return level
    return 'out'
def distance_levels(player_coord, pokemons):
    """Footprint level observed by the player for every pokemon (id -> level)."""
    levels = {}
    for pid, dist in pokemon_distances(player_coord, pokemons).items():
        levels[pid] = footprint(dist)
    return levels
def random_particle_generation(side_length=2000, n=1000):
    """Draw n particles uniformly over the centred square of side `side_length`.

    Returns an (n, 2) float array. Vectorized with a single
    np.random.uniform call instead of a Python loop filling an
    uninitialized np.ndarray; also yields a proper (0, 2) array for n == 0.
    """
    half = side_length / 2
    return np.random.uniform(-half, half, size=(n, 2))
def plot_particles(player_coord, particles):
    """Plot a particle cloud relative to the player (placed at the origin).

    Particles are small dots, the player is a purple dot, and the four
    footprint detection radii are drawn as coloured rings.

    Fix: repeated `plt.axes()` calls no longer return the same axes in
    modern matplotlib (each call creates a new axes), so we create a single
    axes with `plt.subplots` and reuse it.
    """
    fig, ax = plt.subplots(figsize=(15, 15))
    ax.scatter([p[0] - player_coord[0] for p in particles],
               [p[1] - player_coord[1] for p in particles], s=1)
    ax.set_aspect(1)
    ax.set_xlim((-1100, 1100))
    ax.set_ylim((-1100, 1100))
    # player at the origin
    ax.scatter(0, 0, color='purple', s=15)
    # detection radii (footprint thresholds -> ring colour)
    dists = {10: 'green', 25: 'blue', 100: 'yellow', 1000: 'red'}
    for r, color in dists.items():
        ax.add_patch(plt.Circle((0, 0), r, fill=False, color=color))
    plt.show()
In [175]:
# Sanity check: a fresh uniform particle cloud around the player.
particles = random_particle_generation(n=3000)
plot_particles((0, 0), particles)
Another source of information we might want to exploit is the true ranking and probability of a particular particle being consistent with the ranking.
Let A, B denote two pokemons. Let A[i] denote a particle of pokemon A and B[j] a particle of pokemon B. There are other pokemons C, D, E...
The probability that A[i] is consistent with a particular ranking (say B>A>C>...) is:
P(B>A[i] ^ A[i]>C ^ A[i]>D ^ ...) = P(B>A[i]) * P(A[i]>C | B>A[i]) * P(A[i]>D | A[i]>C, B>A[i]) * ...
(Option 1) Using the naive approach, we have:
P(B>A[i] ^ A[i]>C ^ A[i]>D ^ ...) = P(B>A[i]) * P(A[i]>C) * P(A[i]>D) * ...
(Option 2) Assuming the order of the other pokemons is fixed, the probability can be obtained by considering only the immediately neighbouring pokemons, since the other conditional probabilities will be 1.
P(B>A[i] ^ A[i]>C ^ A[i]>D ^ ...) = P(B>A[i]) * P(A[i]>C | B>A[i])
Now use the naive assumption:
P(B>A[i] ^ A[i]>C ^ A[i]>D ^ ...) = P(B>A[i]) * P(A[i]>C)
We may calculate, say P(A[i]>C), as follows:
P(A[i]>C) = sum{P(A[i]>C[j]) * P(C[j])} over all j
P(A[i]>C[j]) is either 1 or 0 depending on the relationship between the distance of A[i] and C[j].
So the probability of a particle being consistent with the true ranking is:
L(A[i]) = sum{P(A[i]<B[j]) * P(B[j])} * sum{P(A[i]>C[j]) * P(C[j])}
Since all the B[j]'s and C[j]'s are already sampled according to their likelihood, we can assume that their densities in different locations already represent the probability of B at that location, and we can assume all P(B[j])'s and P(C[j])'s are equally likely. The expression can be further simplified as:
L(A[i]) = sum{P(A[i]<B[j])} * sum{P(A[i]>C[j])}
Now we have a probability associated with each particle for a given pokemon with which we can perform the resampling.
Whenever an event occurs and some particles are resampled, we can perform this operation on the particle populations to favour those particles that are more likely to be consistent with the ranking.
In [176]:
# Likelihood of a particle's distance given the observed footprint event.
# Keys 0-3 are the in-game footprint rings (see footprint()): each gennorm
# pdf is centred on its ring's distance band, with the shape parameter
# controlling how sharply the likelihood falls off at the band edges.
# 'in'/'out' model the pokemon crossing the 1000-unit radar edge (gamma
# tails pointing inward/outward), and 'invis' is a flat likelihood
# (no information; every distance is equally likely).
distributions = {
    0: lambda x: gennorm.pdf(x / 10, 3),
    1: lambda x: gennorm.pdf((x - 17.5) / 7.5, 3),
    2: lambda x: gennorm.pdf((x - 62.5) / 37.5, 4),
    3: lambda x: gennorm.pdf((x - 550) / 430, 6),
    'in': lambda x: gamma.pdf((-x + 1100) / (450/6), 1.5),
    'out': lambda x: gamma.pdf((x - 900) / (450/6), 1.5),
    'invis': np.vectorize(lambda x: 1)
}
def resample_population(particles, particle_weights):
    """Draw a same-sized weighted bootstrap of `particles`.

    Weights need not be normalised; they are divided by their sum here.
    Returns an array of the same shape as `particles`.
    """
    probs = particle_weights / sum(particle_weights)
    chosen = np.random.choice(len(particles), size=len(particles), p=probs)
    return particles[chosen]
def resample_all_population(all_particles, all_particle_weights):
    """Resample every pokemon's particle population by its own weights.

    Both arguments are dicts keyed by pokemon id; returns a dict of
    same-sized resampled (n, 2) arrays.
    """
    resampled = {}
    for pid, particles in all_particles.items():
        weights = all_particle_weights[pid]
        chosen = np.random.choice(len(particles), size=len(particles),
                                  p=weights / sum(weights))
        resampled[pid] = particles[chosen]
    return resampled
def distance_level_weights(particle_distances, distance_level, distributions=distributions):
    """Likelihood of each particle distance under the observed footprint level."""
    pdf = distributions[distance_level]
    return pdf(particle_distances)
def all_distance_level_weights(all_particle_distances, all_distance_levels, distributions=distributions):
    """Per-pokemon likelihood of every particle given the observed footprint levels."""
    weights = {}
    for pid, dists in all_particle_distances.items():
        weights[pid] = distributions[all_distance_levels[pid]](dists)
    return weights
def all_particle_distances(player_coord, all_particles):
    """Player-to-particle distances for every pokemon's population (id -> (n,) array)."""
    result = {}
    for pid, particles in all_particles.items():
        result[pid] = particle_distances(player_coord, particles)
    return result
def all_ranking_weights(all_particle_distances, rankings):
# get sorted distances
all_sorted_distances = {i: np.sort(distances) for i, distances in all_particle_distances.items()}
# loop through all pokemons
likelihoods = {}
for i, distances in all_particle_distances.items():
# select pokemons in neighbouring rankings
selected_ranking = rankings[i]
if selected_ranking == 0:
prev_id = None
next_id = next((k for k,v in rankings.items() if v == rankings[i] + 1))
elif selected_ranking == len(rankings) - 1:
next_id = None
prev_id = next((k for k,v in rankings.items() if v == rankings[i] - 1))
else:
prev_id = next((k for k,v in rankings.items() if v == rankings[i] - 1))
next_id = next((k for k,v in rankings.items() if v == rankings[i] + 1))
# calculate weights for particle population
if prev_id == None:
prev_probs = 1
else:
# prev_probs = np.array([sum(a > all_particle_distances[prev_id]) for a in all_particle_distances[i]])
prev_probs = np.searchsorted(all_sorted_distances[prev_id], all_particle_distances[i])
if next_id == None:
next_probs = 1
else:
# next_probs = np.array([sum(a < all_particle_distances[next_id]) for a in all_particle_distances[i]])
next_probs = len(distances) - np.searchsorted(all_sorted_distances[next_id], all_particle_distances[i])
likelihoods[i] = prev_probs * next_probs
return likelihoods
In [177]:
# Resample the uniform cloud as if we observed footprint level 3
# (distance in [100, 1000)): the particles collapse onto that annulus.
new_particles = resample_population(particles, distance_level_weights(particle_distances((0, 0), particles), 3))
plot_particles((0, 0), new_particles)
In [178]:
# Spawn the hidden ground-truth pokemon locations.
pokemons = generate_initial_coordinates()
pokemons
Out[178]:
In [179]:
# Map view: target pokemon (red star) vs the rest, with detection rings.
plot_pokemons((0, 0), pokemons)
In [180]:
# Observable ranking of pokemons by distance (0 = closest).
rankings = pokemon_rankings((0, 0), pokemons)
rankings
Out[180]:
In [181]:
# One particle population (3000 particles) per pokemon.
all_particles = {i: random_particle_generation(n=3000) for i in range(len(pokemons))}
len(all_particles)
Out[181]:
In [182]:
# Player-to-particle distances for every population.
distances = all_particle_distances((0, 0), all_particles)
distances
Out[182]:
In [183]:
# Ranking-consistency weights for every particle.
all_ranking_weights(distances, rankings)
Out[183]:
In [184]:
# Footprint level observed for each pokemon.
levels = distance_levels((0, 0), pokemons)
levels
Out[184]:
In [185]:
# Footprint-level likelihood weights for every particle.
level_weights = all_distance_level_weights(distances, levels)
level_weights
Out[185]:
In [186]:
# Resample every population against its footprint-level weights.
new_all_particles = resample_all_population(all_particles, level_weights)
In [187]:
# Inspect the resampled population for the target pokemon.
plot_particles((0, 0), new_all_particles[0])
In [188]:
# Now we attempt two operations in a row: a footprint-level update
# followed by a ranking update on the *resampled* populations.
all_particles = {i: random_particle_generation(n=3000) for i in range(len(pokemons))}
distances = all_particle_distances((0, 0), all_particles)
level_weights = all_distance_level_weights(distances, levels)
new_all_particles = resample_all_population(all_particles, level_weights)
# Bug fix: the ranking step previously resampled the ORIGINAL populations,
# silently discarding the footprint-level step. Recompute distances on the
# resampled populations and chain the second resample onto them.
distances = all_particle_distances((0, 0), new_all_particles)
ranking_weights = all_ranking_weights(distances, rankings)
new_all_particles = resample_all_population(new_all_particles, ranking_weights)
In [192]:
# Inspect one population after both updates have been applied.
plot_particles((0, 0), new_all_particles[4])
In [ ]: