The animations on this notebook need vpython (http://vpython.org/) installed on your system. For some peculiar reason, the kernel needs to be restarted after every run.
A toy example for localization using an Hidden Markov Model. The ball stays put with probability $\pi_0$, jumps with probability $\pi_1$ forward, and is 'kidnapped' with a small probability $\pi_2$. At each step, we observe the color of the current tile.
At every step, the filtering density over the possible locations are shown.
In [1]:
from __future__ import division, print_function
#from math import *
import vpython as vp
import numpy as np
import matplotlib.pylab as plt
import matplotlib as mpl
from itertools import product
# Simulation parameters
RATE = 150
dt = 1./RATE
g_earth = 19.8
T_period = 0.5
showFilteringDensity = True
# Set the scene
# Floor
FloorLenght = 7
FloorHeight = 7
floorYPos = -1.5
floor = vp.box(pos=vp.vector(0,floorYPos,0), length=FloorLenght, height=0.05, width=FloorHeight, color=vp.vector(0.4,0.4,0.4))
# Number of Tiles
L = 40
tile_radius = 0.1
TileCoordinate = [(2*(np.cos(th)+np.sin(th)), 0, 2*(np.cos(th)-np.sin(th)) ) for th in np.linspace(0,2*np.pi,L)]
ColorOfObservation = [vp.color.white, vp.color.blue]
TileColor = []
FilteringDensityCylinder = []
for i in range(L):
sx, sy, sz = TileCoordinate[i]
s = vp.vector(sx,sy,sz)
# Pick a random color for the tile
c = np.random.choice(range(len(ColorOfObservation)))
TileColor.append(c)
vp.cylinder(pos=s, axis=vp.vector(0,-tile_radius,0), color=ColorOfObservation[c], radius=tile_radius)
if showFilteringDensity:
s2 = vp.vector(sx,sy+floorYPos-0.1,sz)
cyl = vp.cylinder(pos=s2, axis=vp.vector(0,0.4,0), radius=tile_radius)
FilteringDensityCylinder.append(cyl)
def Setup_HMM_parameters():
# Probability of staying on the same tile
ep = 0.2
# Probability of making an arbitrary jump
kidnap = 0.01
# Probability of correctly observing the tile color
a = 0.99
# Set up the transition matrix
idx = [i for i in range(1,L)]+[0]
I = np.diag(np.ones(L))
A = (1-kidnap)*(ep*I + (1-ep)*I[:,idx]) + kidnap*np.ones((L,L))/L
C = np.zeros((2,L))
pred = np.ones(L)/L
for i in range(L):
C[0,i] = a*(1 - TileColor[i]) + (1-a)*TileColor[i]
C[1,i] = a*TileColor[i] + (1-a)*(1 - TileColor[i])
return A, C, pred
A, C, pred = Setup_HMM_parameters()
Obs_noiseless = vp.sphere(pos=vp.vector(0,floorYPos-0.2,0), color=vp.color.black, radius=tile_radius)
Obs = vp.sphere(pos=vp.vector(0,floorYPos+0.2,0), color=vp.color.black, radius=tile_radius)
nf = mpl.colors.Normalize(vmin=0, vmax=0.7, clip=True)
cmap = plt.cm.ScalarMappable(cmap=plt.cm.hot, norm=nf)
lamp = vp.local_light(pos=vp.vector(5,-4,0), color=vp.color.white)
# Select a random initial state
CurrentState = np.random.choice(range(L))
sx, sy, sz = TileCoordinate[CurrentState]
# Set the initial state of the yellow ball
x0 = vp.vector(sx,sy,sz)
v0 = vp.vector(0,0,0)
YellowBall = vp.sphere(pos=x0, radius=tile_radius, color=vp.color.yellow,
make_trail=True, interval=1, retain=RATE*T_period)
YellowBall.vel = v0
YellowBall.g = g_earth
YellowBall.T_period = T_period
YellowBall.retain = RATE*YellowBall.T_period
FlightCounter = YellowBall.T_period/dt
while 1:
vp.rate (RATE)
YellowBall.pos = YellowBall.pos + YellowBall.vel*dt
# If hit halfway, dim the observation
if FlightCounter>= YellowBall.T_period/dt/2.:
Obs.color = vp.color.black
Obs_noiseless.color = vp.color.black
# If arrived, dim the observation
if FlightCounter>= YellowBall.T_period/dt:
observation = np.random.choice(range(C.shape[0]), p=C[:,CurrentState])
o_noiseless = TileColor[CurrentState]
Obs.color = ColorOfObservation[observation]
Obs_noiseless.color = ColorOfObservation[o_noiseless]
NextState = np.random.choice(range(A.shape[0]), p=A[:,CurrentState])
pred = C[observation,:]*pred
pred = pred/np.sum(pred)
if showFilteringDensity:
for k in range(L):
col = cmap.to_rgba(pred[k])
vcol = vp.vector(col[0], col[1], col[2])
FilteringDensityCylinder[k].color = vcol
FilteringDensityCylinder[k].axis=vp.vector(0,pred[k]+0.15,0)
pred = A.dot(pred)
## Plan the jump
sx, sy, sz = TileCoordinate[CurrentState]
tx, ty, tz = TileCoordinate[NextState]
v_vert = YellowBall.g*YellowBall.T_period/2 + (ty-sy)/YellowBall.T_period
YellowBall.vel = vp.vector((tx-sx)/YellowBall.T_period, v_vert,(tz-sz)/YellowBall.T_period)
YellowBall.pos = vp.vector(sx,sy,sz)
#
CurrentState = NextState
FlightCounter = 0
else:
YellowBall.vel.y = YellowBall.vel.y - YellowBall.g*dt
FlightCounter +=1
This demo illustrates a MH chain on a discrete state space. The size of each tile $x$ is proportional to the target probability $\pi(x)$.
The yellow big ball shows the current state $x$ of the chain, and the small gray ball shows the proposed state $x'$ drawn from a proposal density $q(\cdot|x)$. The proposed new state is accepted with probability $$ \min\left\{1, \frac{\pi(x') q(x|x')}{\pi(x) q(x'|x)} \right\} $$
In [1]:
from __future__ import division, print_function
from math import *
import vpython as vp
import numpy as np
import matplotlib.pylab as plt
import matplotlib as mpl
from vpython_utilities import make_grid2D
# Simulation speed
RATE = 300
dt = 1./RATE
W1 = 1
W2 = 6
step = 0.5
n2 = int(W2/step)+1
n1 = int(W1/step)+1
PointList, sub2ind, ind2sub, edges, A = make_grid2D(n2,n1)
Trans = A/np.sum(A, axis=0, keepdims=True)
Trans = Trans.dot(Trans).dot(Trans)
L = len(PointList)
Y = []
for i in range(L):
p = PointList[i]
x = p[0]*step-W2/2.
z = p[2]*step-W1/2.
E = 2+ np.cos(2*pi*z/3)+np.sin(2*pi*x/5)+np.random.randn()/10.
y = 2*np.exp(-1.1*E)
PointList[i] = (x, 0, z)
Y.append(y)
MAX = 1
MIN = 0
nf = mpl.colors.Normalize(vmin=MIN, vmax=MAX, clip=True)
cmap = plt.cm.ScalarMappable(cmap=plt.cm.cool_r, norm=nf)
#floor = box(pos=vector(0,-0.04,0), length=W1, height=0.05, width=W2, color=color.black)
wd = 0.4
radius = 0.2
maxY = max(Y)
for i in range(L):
sx, sy, sz = PointList[i]
s = vp.vector(sx,-radius,sz)
#vp.sphere(pos=s, color=vp.color.cyan, radius=0.1)
#vcol = vp.vector(0.9,0.9,0.9)
col = cmap.to_rgba(Y[i]/maxY)
vcol = vp.vector(col[0], col[1], col[2])
#vp.cylinder(pos=s, axis=vp.vector(0,-0.1,0), color=vcol, radius=radius*np.sqrt(nf(sy)))
wd = step*np.sqrt(Y[i]/maxY)
vp.box(pos=s,length=wd, height=0.05, width=wd, color=vcol )
#s = vp.vector(sx,(sy-radius)/2.,sz)
#vp.box(pos=s,length=wd, height=sy-radius, width=wd, color=vcol )
Cur = []
# Cnt[i] is the number of ticks after a new movement has started of the i'th particle
Cnt = []
B = []
g_earth = 49.8
T_period = 0.25
N = 2
for i in range(N):
cur = np.random.choice(range(L))
sx, sy, sz = PointList[cur]
x0 = vp.vector(sx,sy,sz)
v0 = vp.vector(0,0,0)
ball = vp.sphere(pos=x0, radius=radius, color=vp.color.yellow,
make_trail=True, interval=1, retain=RATE*T_period)
ball.vel = v0
ball.g = g_earth
ball.T_period = T_period
ball.retain = RATE*ball.T_period
cnt = ball.T_period/dt
B.append(ball)
Cur.append(cur)
Cnt.append(cnt)
lamp = vp.local_light(pos=vp.vector(0,-1,0), color=vp.color.yellow)
def selectNextState(cur):
pr = Trans[:,cur]
nex = np.random.choice(range(L), p=pr)
lw = np.log(Trans[cur, nex]) - np.log(Trans[nex, cur])
return nex, lw
def planJump(ball, curPos, nexPos):
sx, sy, sz = curPos
tx, ty, tz = nexPos
v_vert = ball.g*ball.T_period/2 + (ty-sy)/ball.T_period
vel = vp.vector((tx-sx)/ball.T_period, v_vert,(tz-sz)/ball.T_period)
pos = vp.vector(sx,sy,sz)
return pos, vel
# Particle index of the Chain
pP = 0
# Particle index of the proposal
pQ = 1
B[pQ].make_trail = False
B[pQ].color = vp.vector(0.6,0.6,0.6)
B[pQ].radius = radius/2
# Is proposal ball moving?
pQmove = True
log_q_ratio = 0
while 1:
vp.rate (RATE)
B[pQ].pos = B[pQ].pos + B[pQ].vel*dt
B[pP].pos = B[pP].pos + B[pP].vel*dt
if Cnt[pQ]>= B[pQ].T_period/dt:
if pQmove:
accept = np.log(np.random.rand()) < log_q_ratio + np.log(Y[Cur[pQ]]) - np.log(Y[Cur[pP]])
if accept:
# pP jumps to new location
B[pP].g = g_earth
pos, vel = planJump(B[pP], PointList[Cur[pP]], PointList[Cur[pQ]])
B[pP].vel = vel
B[pP].pos = pos
# pQ stays put
B[pQ].vel = vp.vector(0,0,0)
B[pQ].pos.x, B[pQ].pos.y, B[pQ].pos.z = PointList[Cur[pQ]]
B[pQ].g = 0
Cur[pP] = Cur[pQ]
else:
# pP jumps vertically
B[pP].g = g_earth
pos, vel = planJump(B[pP], PointList[Cur[pP]], PointList[Cur[pP]])
B[pP].vel = vel
B[pP].pos = pos
# pQ disappears
B[pQ].visible = False
B[pQ].g = g_earth/10.
pos, vel = planJump(B[pQ], PointList[Cur[pQ]], PointList[Cur[pP]])
B[pQ].vel = vel
B[pQ].pos = pos
Cur[pQ] = Cur[pP]
pQmove = False
else:
B[pQ].visible = True
nex, log_q_ratio = selectNextState(Cur[pP])
# pP stays put
B[pP].vel = vp.vector(0,0,0)
B[pP].pos.x, B[pP].pos.y, B[pP].pos.z = PointList[Cur[pP]]
B[pP].g = 0
# pQ jumps to new location
B[pQ].g = g_earth/10.
pos, vel = planJump(B[pQ], PointList[Cur[pP]], PointList[nex])
B[pQ].vel = vel
B[pQ].pos = pos
Cur[pQ] = nex
pQmove = True
Cnt[pP] = 0
Cnt[pQ] = 0
else:
B[pP].vel.y = B[pP].vel.y - B[pP].g*dt
B[pQ].vel.y = B[pQ].vel.y - B[pQ].g*dt
Cnt[pP] +=1
Cnt[pQ] +=1