Particle Filter Algorithm

  1. (sim) initialise pokemon positions
  2. get initial pokemon distance level and rankings
  3. create random particles for each pokemon on radar
  4. assign selection probabilities to each particle based on the respective distance levels and rankings of that pokemon
  5. resample the particle population of each pokemon based on the selection probabilities obtained in step 4
  6. calculate the weighed average location of target pokemon
  7. start moving in the direction of the weighed average location of target pokemon
  8. listen for distance level or ranking change events. On event, halt moving and:
    1. re-calculate particle selection probabilities of affected pokemon(s)
    2. resample the particle population of affected pokemon(s)
    3. re-calculate weighed average location of target pokemon
    4. start moving in the direction obtained in the previous step and resume listening for events
  9. program terminates when the target pokemon is within zero-footprint distance

In [1]:
import numpy as np
import matplotlib.pyplot as plt
import random
import matplotlib.patches as patches
from scipy.stats import gennorm
from scipy.stats import gamma
%matplotlib inline

In [174]:
def generate_initial_coordinates(side_length=2000, n_pokemon=9):
    pokemons = {}
    for i in range(n_pokemon):
        pokemons[i] = (random.uniform(-side_length/2, side_length/2), random.uniform(-side_length/2, side_length/2))
    return pokemons

def distance(coord1, coord2):
    return np.sqrt((coord1[0] - coord2[0])**2 + (coord1[1] - coord2[1])**2)

# this is not visible to players
def pokemon_distances(player_coord, pokemons):
    return {i: distance(player_coord, coord) for i, coord in pokemons.items()}

# def particle_distances(player_coord, particles):
#     return [distance(player_coord, particle) for particle in particles]

def particle_distances(player_coord, particles):
    return np.sqrt(np.sum((player_coord - particles)**2, axis=1))

def rank(input):
    output = [0] * len(input)
    for i, x in enumerate(sorted(range(len(input)), key=lambda y: input[y])):
        output[x] = i
    return output

# player will be able to see this
# slight bug - we will not be able to see rankings of pokemons that are out of radar radius (but we can fix this later)
def pokemon_rankings(player_coord, pokemons):
    dists = pokemon_distances(player_coord, pokemons)
    rankings = {}
    for i, x in enumerate(sorted(range(len(dists)), key=lambda y: dists[y])):
        rankings[x] = i
    return rankings

def plot_pokemons(player_coord, pokemons):
    plt.figure(figsize=(15,15))
    # non-target pokemons
    plt.scatter([x - player_coord[0] for x, y in [coord for coord in pokemons.values()]][1:], 
                [y - player_coord[1] for x, y in [coord for coord in pokemons.values()]][1:])
    # target pokemon
    plt.scatter([x - player_coord[0] for x, y in [coord for coord in pokemons.values()]][0], 
                [y - player_coord[1] for x, y in [coord for coord in pokemons.values()]][0],
               marker="*", color='red', s=15)
    plt.axes().set_aspect(1)
    plt.axes().set_xlim((-1100, 1100))
    plt.axes().set_ylim((-1100, 1100))
    # player
    plt.scatter(0, 0 , color='purple', s=15)
    # detection radii
    dists = {10:'green', 25:'blue', 100:'yellow', 1000:'red'}
    for r in dists:
        plt.axes().add_patch(plt.Circle((0,0), r, fill=False, color=dists[r]))
    plt.show()
    
def footprint(distance):
    if distance < 10:
        return 0
    elif distance < 25:
        return 1
    elif distance < 100:
        return 2
    elif distance < 1000:
        return 3
    else:
        return 'out'

def distance_levels(player_coord, pokemons):
    dists = pokemon_distances(player_coord, pokemons)
    return {i: footprint(v) for i,v in dists.items()}

def random_particle_generation(side_length=2000, n=1000):
    particles = np.ndarray((n, 2))
    for i in range(n):
        particles[i] = random.uniform(-side_length/2, side_length/2), random.uniform(-side_length/2, side_length/2)
    return particles

def plot_particles(player_coord, particles):
    plt.figure(figsize=(15,15))
    plt.scatter([p[0] - player_coord[0] for p in particles], 
                [p[1] - player_coord[1] for p in particles], s=1)
    plt.axes().set_aspect(1)
    plt.axes().set_xlim((-1100, 1100))
    plt.axes().set_ylim((-1100, 1100))
    # player
    plt.scatter(0, 0 , color='purple', s=15)
    # detection radii
    dists = {10:'green', 25:'blue', 100:'yellow', 1000:'red'}
    for r in dists:
        plt.axes().add_patch(plt.Circle((0,0), r, fill=False, color=dists[r]))
    plt.show()

In [175]:
particles = random_particle_generation(n=3000)
plot_particles((0, 0), particles)


Another source of information we might want to exploit is the true ranking and probability of a particular particle being consistent with the ranking.

Let A, B denote two pokemons. Let A[i] denote a particle of pokemon A and B[j] a paricle of pokemon B. There are other pokemons C, D, E...

The probability that A[i] is consistent with a particular ranking (say B>A>C>...) is:

P(B>A[i] ^ A[i]>C ^ A[i]>D ^ ...) = P(B>A[i]) * P(A[i]>C | B>A[i]) * P(A[i]>D | A[i]>C, B>A[i]) * ...

(Option 1) Using the naive approach, we have:

P(B>A[i] ^ A[i]>C ^ A[i]>D ^ ...) = P(B>A[i]) * P(A[i]>C) * P(A[i]>D) * ...

(Option 2) Assuming the order of other particles are fixed, then the probability can be obtained by only considering immediately neighbouring pokemons, since the other conditional probabilities will be 1.

P(B>A[i] ^ A[i]>C ^ A[i]>D ^ ...) = P(B>A[i]) * P(A[i]>C | B>A[i])

Now use the naive assumption:

P(B>A[i] ^ A[i]>C ^ A[i]>D ^ ...) = P(B>A[i]) * P(A[i]>C)

We may calculate, say P(A[i]>C), as follows:

P(A[i]>C) = sum{P(A[i]>C[j]) * P(C[j])} over all j

P(A[i]>C[j]) is either 1 or 0 depending on the relationship between the distance of A[i] and C[j].

So the probability of a particle being consistent with the true ranking is:

L(A[i]) = sum{P(A[i]<B[j]) * P(B[j])} * sum{P(A[i]>C[j]) * P(C[j])}

Since all the B[j]'s and C[j]'s are already sampled according to their likelihood, we can assume that their densities in different locations already represent the probability of B at that location, and we can assume all P(B[j])'s and P(C[j])'s are equally likely. The expression can be further simplified as:

L(A[i]) = sum{P(A[i]<B[j])} * sum{P(A[i]>C[j])}

Now we have a probability associated with each particle for a given pokemon with which we can perform the resampling.

Whenever an event occurs and the some particles are resampled, we can perform this operation on the particle populations to favour those particles that are more likely to be consistent with the ranking.


In [176]:
distributions = {
    0: lambda x: gennorm.pdf(x / 10, 3),
    1: lambda x: gennorm.pdf((x - 17.5) / 7.5, 3),
    2: lambda x: gennorm.pdf((x - 62.5) / 37.5, 4),
    3: lambda x: gennorm.pdf((x - 550) / 430, 6),
    'in': lambda x: gamma.pdf((-x + 1100) / (450/6), 1.5),
    'out': lambda x: gamma.pdf((x - 900) / (450/6), 1.5),
    'invis': np.vectorize(lambda x: 1)
}

# def resample_population(particles, particle_weights):
#     return [particles[np.random.choice(range(len(particles)), p=particle_weights / sum(particle_weights))] 
#             for i in range(len(particles))]

def resample_population(particles, particle_weights):
    return particles[np.random.choice(range(len(particles)), size=len(particles), p=particle_weights / sum(particle_weights))]

def resample_all_population(all_particles, all_particle_weights):
    return {i: particles[np.random.choice(range(len(particles)), size=len(particles), 
                                          p=all_particle_weights[i] / sum(all_particle_weights[i]))]
            for i, particles in all_particles.items()}

# def distance_level_weights(particle_distances, distance_level, distributions=distributions):
#     particle_weights = list(map(distributions[distance_level], particle_distances))
#     return particle_weights

def distance_level_weights(particle_distances, distance_level, distributions=distributions):
    return distributions[distance_level](particle_distances)

def all_distance_level_weights(all_particle_distances, all_distance_levels, distributions=distributions):
    return {i:distributions[all_distance_levels[i]](distances) for i, distances in all_particle_distances.items()}

def all_particle_distances(player_coord, all_particles):
    return {i: particle_distances(player_coord, v) for i,v in all_particles.items()}

def all_ranking_weights(all_particle_distances, rankings):
    # get sorted distances
    all_sorted_distances = {i: np.sort(distances) for i, distances in all_particle_distances.items()}
    # loop through all pokemons
    likelihoods = {}
    for i, distances in all_particle_distances.items():
        # select pokemons in neighbouring rankings
        selected_ranking = rankings[i]
        if selected_ranking == 0:
            prev_id = None
            next_id = next((k for k,v in rankings.items() if v == rankings[i] + 1))
        elif selected_ranking == len(rankings) - 1:
            next_id = None
            prev_id = next((k for k,v in rankings.items() if v == rankings[i] - 1))
        else:
            prev_id = next((k for k,v in rankings.items() if v == rankings[i] - 1))
            next_id = next((k for k,v in rankings.items() if v == rankings[i] + 1))
        # calculate weights for particle population
        if prev_id == None:
            prev_probs = 1
        else:
#             prev_probs = np.array([sum(a > all_particle_distances[prev_id]) for a in all_particle_distances[i]])
            prev_probs = np.searchsorted(all_sorted_distances[prev_id], all_particle_distances[i])
        if next_id == None:
            next_probs = 1
        else: 
#             next_probs = np.array([sum(a < all_particle_distances[next_id]) for a in all_particle_distances[i]])
            next_probs = len(distances) - np.searchsorted(all_sorted_distances[next_id], all_particle_distances[i])
        likelihoods[i] = prev_probs * next_probs
    return likelihoods

In [177]:
new_particles = resample_population(particles, distance_level_weights(particle_distances((0, 0), particles), 3))
plot_particles((0, 0), new_particles)



In [178]:
pokemons = generate_initial_coordinates()
pokemons


Out[178]:
{0: (-772.9806886650117, -509.93598438635115),
 1: (-140.10443514849499, 895.707918903101),
 2: (-232.4219408802901, 646.4731724818166),
 3: (-949.1309023940297, -447.3961599411987),
 4: (976.6467784608853, -883.9330437238824),
 5: (-890.4584521460903, 385.3859287359335),
 6: (947.7813291560144, -279.10650269982295),
 7: (-282.15032037714275, 412.47682192911634),
 8: (791.8428637914078, -48.08601742724636)}

In [179]:
plot_pokemons((0, 0), pokemons)



In [180]:
rankings = pokemon_rankings((0, 0), pokemons)
rankings


Out[180]:
{0: 4, 1: 3, 2: 1, 3: 7, 4: 8, 5: 5, 6: 6, 7: 0, 8: 2}

In [181]:
all_particles = {i: random_particle_generation(n=3000) for i in range(len(pokemons))}
len(all_particles)


Out[181]:
9

In [182]:
distances = all_particle_distances((0, 0), all_particles)
distances


Out[182]:
{0: array([ 1184.15763146,   771.98734481,   953.84611063, ...,   862.74085589,
          377.51690239,   745.96390401]),
 1: array([  322.73560567,   389.84511051,  1206.20454622, ...,  1054.73071948,
         1075.03095082,   105.50720209]),
 2: array([ 1008.87637146,  1210.5405809 ,  1315.72246026, ...,  1091.79805286,
         1238.46830185,   409.61255238]),
 3: array([ 912.82650362,  711.34206493,  549.52046946, ...,  749.83760818,
         915.23584719,  639.53203074]),
 4: array([ 380.95385956,  545.040755  ,  150.82692133, ...,  882.75756594,
         725.37126283,    5.32294453]),
 5: array([ 1182.86810491,   653.47145983,   929.22737468, ...,  1271.75069262,
          894.03916966,  1176.60060833]),
 6: array([ 991.04226717,  346.19234567,  964.17867807, ...,  384.43492705,
         510.88932901,  424.50716327]),
 7: array([ 1148.74742559,   614.86565899,   517.13691216, ...,   477.75395769,
          300.41422209,   827.51631873]),
 8: array([ 1035.98618792,   612.30759489,   717.89804097, ...,   348.37702195,
          656.68294458,   692.34973866])}

In [183]:
all_ranking_weights(distances, rankings)


Out[183]:
{0: array([ 467940, 2191440, 1774046, ..., 2119740,  901784, 2229255], dtype=int64),
 1: array([ 676856,  955502,  412090, ..., 1194456, 1082848,   80352], dtype=int64),
 2: array([1469604,  414990,   95008, ...,  919800,  325024, 1039200], dtype=int64),
 3: array([2104861, 2159784, 1636325, ..., 2263343, 2094298, 1943814], dtype=int64),
 4: array([ 307,  670,   44, ..., 1833, 1232,    0], dtype=int64),
 5: array([ 523776, 2042636, 1963624, ...,  170230, 2104480,  558792], dtype=int64),
 6: array([1586823,  756800, 1769964, ...,  908544, 1456840, 1076968], dtype=int64),
 7: array([ 238, 2104, 2352, ..., 2460, 2794, 1412], dtype=int64),
 8: array([1323410, 1864144, 2126338, ...,  761964, 1962961, 2060160], dtype=int64)}

In [184]:
levels = distance_levels((0, 0), pokemons)
levels


Out[184]:
{0: 3, 1: 3, 2: 3, 3: 'out', 4: 'out', 5: 3, 6: 3, 7: 3, 8: 3}

In [185]:
level_weights = all_distance_level_weights(distances, levels)
level_weights


Out[185]:
{0: array([  1.83275011e-05,   5.28849444e-01,   2.71342115e-01, ...,
          4.64806928e-01,   5.36715747e-01,   5.34149315e-01]),
 1: array([  5.27336091e-01,   5.37519280e-01,   1.76256339e-06, ...,
          3.94161830e-02,   1.96100983e-02,   1.59108142e-01]),
 2: array([  1.23062473e-01,   1.05935895e-06,   7.63976888e-15, ...,
          9.85733921e-03,   2.60311363e-08,   5.38303804e-01]),
 3: array([ 0.3932833 ,  0.        ,  0.        , ...,  0.        ,
         0.41508127,  0.        ]),
 4: array([ 0.,  0.,  0., ...,  0.,  0.,  0.]),
 5: array([  2.07656257e-05,   5.38851484e-01,   3.36670297e-01, ...,
          1.04669022e-10,   4.14598668e-01,   3.74307207e-05]),
 6: array([ 0.1682278 ,  0.53288027,  0.24251393, ...,  0.53720284,
         0.5389558 ,  0.5386232 ]),
 7: array([  3.68216168e-04,   5.38949756e-01,   5.38955999e-01, ...,
          5.38943983e-01,   5.18735946e-01,   5.01383057e-01]),
 8: array([ 0.06705181,  0.53895112,  0.53704956, ...,  0.5332587 ,
         0.53883043,  0.53824719])}

In [186]:
new_all_particles = resample_all_population(all_particles, level_weights)

In [187]:
plot_particles((0, 0), new_all_particles[0])



In [188]:
# now we attempt two operations in a row
all_particles = {i: random_particle_generation(n=3000) for i in range(len(pokemons))}
distances = all_particle_distances((0, 0), all_particles)
level_weights = all_distance_level_weights(distances, levels)
new_all_particles = resample_all_population(all_particles, level_weights)
ranking_weights = all_ranking_weights(distances, rankings)
new_all_particles = resample_all_population(all_particles, ranking_weights)

In [192]:
plot_particles((0, 0), new_all_particles[4])



In [ ]: