A. Taylan Cemgil

Boğaziçi University, Dept. of Computer Engineering

The animations on this notebook need vpython (http://vpython.org/) installed on your system. For some peculiar reason, the kernel needs to be restarted after every run.

Ball jumping around a circle

A toy example for localization using an Hidden Markov Model. The ball stays put with probability $\pi_0$, jumps with probability $\pi_1$ forward, and is 'kidnapped' with a small probability $\pi_2$. At each step, we observe the color of the current tile.

At every step, the filtering density over the possible locations are shown.


In [1]:
from __future__ import division, print_function
#from math import *
import vpython as vp

import numpy as np
import matplotlib.pylab as plt
import matplotlib as mpl

from itertools import product

# Simulation parameters
RATE = 150
dt = 1./RATE
g_earth = 19.8
T_period = 0.5

showFilteringDensity = True

# Set the scene

# Floor
FloorLenght = 7
FloorHeight = 7
floorYPos = -1.5
floor = vp.box(pos=vp.vector(0,floorYPos,0), length=FloorLenght, height=0.05, width=FloorHeight, color=vp.vector(0.4,0.4,0.4))

# Number of Tiles
L = 40
tile_radius = 0.1
TileCoordinate = [(2*(np.cos(th)+np.sin(th)), 0, 2*(np.cos(th)-np.sin(th)) ) for th in np.linspace(0,2*np.pi,L)]

ColorOfObservation = [vp.color.white, vp.color.blue]
TileColor = []
FilteringDensityCylinder = []
for i in range(L):
    sx, sy, sz = TileCoordinate[i]
    s = vp.vector(sx,sy,sz)
    # Pick a random color for the tile 
    c = np.random.choice(range(len(ColorOfObservation)))
    TileColor.append(c)
    vp.cylinder(pos=s, axis=vp.vector(0,-tile_radius,0), color=ColorOfObservation[c], radius=tile_radius)
    
    if showFilteringDensity:
        s2 = vp.vector(sx,sy+floorYPos-0.1,sz)
        cyl = vp.cylinder(pos=s2, axis=vp.vector(0,0.4,0), radius=tile_radius)
        FilteringDensityCylinder.append(cyl)

        
def Setup_HMM_parameters():
    # Probability of staying on the same tile
    ep = 0.2
    # Probability of making an arbitrary jump
    kidnap = 0.01
    # Probability of correctly observing the tile color
    a = 0.99

    # Set up the transition matrix
    idx = [i for i in range(1,L)]+[0]
    I = np.diag(np.ones(L))
    A = (1-kidnap)*(ep*I + (1-ep)*I[:,idx]) + kidnap*np.ones((L,L))/L
    C = np.zeros((2,L))
    pred = np.ones(L)/L

    for i in range(L):
        C[0,i] = a*(1 - TileColor[i]) + (1-a)*TileColor[i]
        C[1,i] = a*TileColor[i] + (1-a)*(1 - TileColor[i])

    return A, C, pred

A, C, pred = Setup_HMM_parameters()
    
    
Obs_noiseless = vp.sphere(pos=vp.vector(0,floorYPos-0.2,0), color=vp.color.black, radius=tile_radius)
Obs = vp.sphere(pos=vp.vector(0,floorYPos+0.2,0), color=vp.color.black, radius=tile_radius)



nf = mpl.colors.Normalize(vmin=0, vmax=0.7, clip=True)
cmap = plt.cm.ScalarMappable(cmap=plt.cm.hot, norm=nf)
lamp = vp.local_light(pos=vp.vector(5,-4,0), color=vp.color.white)




# Select a random initial state
CurrentState = np.random.choice(range(L))
sx, sy, sz = TileCoordinate[CurrentState]

# Set the initial state of the yellow ball
x0 = vp.vector(sx,sy,sz)
v0 = vp.vector(0,0,0)
YellowBall = vp.sphere(pos=x0, radius=tile_radius, color=vp.color.yellow,
             make_trail=True, interval=1, retain=RATE*T_period)
YellowBall.vel = v0
YellowBall.g = g_earth 
YellowBall.T_period = T_period 
YellowBall.retain = RATE*YellowBall.T_period

FlightCounter = YellowBall.T_period/dt
    


while 1:
    vp.rate (RATE)

    YellowBall.pos = YellowBall.pos + YellowBall.vel*dt
    
    # If hit halfway, dim the observation
    if FlightCounter>= YellowBall.T_period/dt/2.:
        Obs.color = vp.color.black
        Obs_noiseless.color = vp.color.black

    # If arrived, dim the observation
    if FlightCounter>= YellowBall.T_period/dt:
        observation = np.random.choice(range(C.shape[0]), p=C[:,CurrentState])
        o_noiseless = TileColor[CurrentState]
        Obs.color = ColorOfObservation[observation]
        Obs_noiseless.color = ColorOfObservation[o_noiseless]
        
        
        NextState = np.random.choice(range(A.shape[0]), p=A[:,CurrentState])

        pred = C[observation,:]*pred
        pred = pred/np.sum(pred)

        if showFilteringDensity:
            for k in range(L):
                col = cmap.to_rgba(pred[k])
                vcol = vp.vector(col[0], col[1], col[2])
                FilteringDensityCylinder[k].color = vcol
                FilteringDensityCylinder[k].axis=vp.vector(0,pred[k]+0.15,0)

        pred = A.dot(pred)


        ## Plan the jump
        sx, sy, sz = TileCoordinate[CurrentState]
        tx, ty, tz = TileCoordinate[NextState]
        v_vert = YellowBall.g*YellowBall.T_period/2 + (ty-sy)/YellowBall.T_period

        YellowBall.vel = vp.vector((tx-sx)/YellowBall.T_period, v_vert,(tz-sz)/YellowBall.T_period)   
        YellowBall.pos = vp.vector(sx,sy,sz)
        #
        CurrentState = NextState
        FlightCounter = 0
    else:
        YellowBall.vel.y = YellowBall.vel.y - YellowBall.g*dt
        FlightCounter +=1


---------------------------------------------------------------------------
KeyboardInterrupt                         Traceback (most recent call last)
<ipython-input-1-d6ca11575286> in <module>()
    102 
    103 while 1:
--> 104     vp.rate (RATE)
    105 
    106     YellowBall.pos = YellowBall.pos + YellowBall.vel*dt

/Users/cemgil/anaconda/lib/python3.6/site-packages/vpython/vpython.py in __call__(self, N)
    201         self.rval = N
    202         if self.rval < 1: raise ValueError("rate value must be greater than or equal to 1")
--> 203         super(_RateKeeper2, self).__call__(self.rval) ## calls __call__ in rate_control.py
    204 
    205 if sys.version > '3':

/Users/cemgil/anaconda/lib/python3.6/site-packages/vpython/rate_control.py in __call__(self, maxRate)
    205             dt = self.lastSleep + self.calls*(self.userTime + self.callTime + self.delay) + \
    206                  renders*self.renderTime + sleeps*self.interactionPeriod - _clock()
--> 207             _sleep(dt)
    208             self.lastSleep = _clock()
    209             self.calls = 0

/Users/cemgil/anaconda/lib/python3.6/site-packages/vpython/rate_control.py in _sleep(dt)
     47         dtsleep = nticks*_tick
     48         t = _clock()
---> 49         time.sleep(dtsleep)
     50         t = _clock()-t
     51         dt -= t

KeyboardInterrupt: 

Proposal Mechanism of Metropolis Hastings

This demo illustrates a MH chain on a discrete state space. The size of each tile $x$ is proportional to the target probability $\pi(x)$.

The yellow big ball shows the current state $x$ of the chain, and the small gray ball shows the proposed state $x'$ drawn from a proposal density $q(\cdot|x)$. The proposed new state is accepted with probability $$ \min\left\{1, \frac{\pi(x') q(x|x')}{\pi(x) q(x'|x)} \right\} $$


In [1]:
from __future__ import division, print_function
from math import *
import vpython as vp
import numpy as np

import matplotlib.pylab as plt
import matplotlib as mpl

from vpython_utilities import make_grid2D

# Simulation speed
RATE = 300
dt = 1./RATE

W1 = 1
W2 = 6
step = 0.5

n2 = int(W2/step)+1
n1 = int(W1/step)+1
PointList, sub2ind, ind2sub, edges, A = make_grid2D(n2,n1)

Trans = A/np.sum(A, axis=0, keepdims=True)
Trans = Trans.dot(Trans).dot(Trans)
L = len(PointList)

Y = []
for i in range(L):
    p = PointList[i]
    x = p[0]*step-W2/2.
    z = p[2]*step-W1/2.
    E = 2+ np.cos(2*pi*z/3)+np.sin(2*pi*x/5)+np.random.randn()/10.
    y = 2*np.exp(-1.1*E)
    PointList[i] = (x, 0, z)
    Y.append(y)
    
MAX = 1
MIN = 0
nf = mpl.colors.Normalize(vmin=MIN, vmax=MAX, clip=True)
cmap = plt.cm.ScalarMappable(cmap=plt.cm.cool_r, norm=nf)

#floor = box(pos=vector(0,-0.04,0), length=W1, height=0.05, width=W2, color=color.black)

wd = 0.4
radius = 0.2
maxY = max(Y)

for i in range(L):
    sx, sy, sz = PointList[i]
    s = vp.vector(sx,-radius,sz)
    #vp.sphere(pos=s, color=vp.color.cyan, radius=0.1)
    #vcol = vp.vector(0.9,0.9,0.9)
    col = cmap.to_rgba(Y[i]/maxY)
    vcol = vp.vector(col[0], col[1], col[2])
    #vp.cylinder(pos=s, axis=vp.vector(0,-0.1,0), color=vcol, radius=radius*np.sqrt(nf(sy))) 
    
    wd = step*np.sqrt(Y[i]/maxY)
    vp.box(pos=s,length=wd, height=0.05, width=wd, color=vcol )
    #s = vp.vector(sx,(sy-radius)/2.,sz)
    #vp.box(pos=s,length=wd, height=sy-radius, width=wd, color=vcol )

Cur = []

# Cnt[i] is the number of ticks after a new movement has started of the i'th particle
Cnt = []

B = []


g_earth = 49.8
T_period = 0.25

N = 2

for i in range(N):  
    cur = np.random.choice(range(L))
    sx, sy, sz = PointList[cur]
    x0 = vp.vector(sx,sy,sz)
    v0 = vp.vector(0,0,0)
    ball = vp.sphere(pos=x0, radius=radius, color=vp.color.yellow,
                 make_trail=True, interval=1, retain=RATE*T_period)
    ball.vel = v0
    ball.g = g_earth
    ball.T_period = T_period 
    ball.retain = RATE*ball.T_period
    cnt = ball.T_period/dt
    B.append(ball)
    Cur.append(cur)
    Cnt.append(cnt)

lamp = vp.local_light(pos=vp.vector(0,-1,0), color=vp.color.yellow)

    
def selectNextState(cur):
    pr = Trans[:,cur]
    nex = np.random.choice(range(L), p=pr) 
    lw = np.log(Trans[cur, nex]) - np.log(Trans[nex, cur])
    return nex, lw

def planJump(ball, curPos, nexPos):
    sx, sy, sz = curPos
    tx, ty, tz = nexPos
    v_vert = ball.g*ball.T_period/2 + (ty-sy)/ball.T_period                
    vel = vp.vector((tx-sx)/ball.T_period, v_vert,(tz-sz)/ball.T_period)   
    pos = vp.vector(sx,sy,sz)  
    return pos, vel


# Particle index of the Chain
pP = 0
# Particle index of the proposal
pQ = 1
B[pQ].make_trail = False
B[pQ].color = vp.vector(0.6,0.6,0.6)
B[pQ].radius = radius/2

# Is proposal ball moving?
pQmove = True
log_q_ratio = 0

while 1:
    vp.rate (RATE)
    
    B[pQ].pos = B[pQ].pos + B[pQ].vel*dt
    B[pP].pos = B[pP].pos + B[pP].vel*dt
    if Cnt[pQ]>= B[pQ].T_period/dt:
        
        if pQmove:
            accept = np.log(np.random.rand()) < log_q_ratio + np.log(Y[Cur[pQ]]) - np.log(Y[Cur[pP]])

            if accept:
                
                # pP jumps to new location
                B[pP].g = g_earth
                pos, vel = planJump(B[pP], PointList[Cur[pP]], PointList[Cur[pQ]])              
                B[pP].vel = vel 
                B[pP].pos = pos
                
                # pQ stays put
                B[pQ].vel = vp.vector(0,0,0)
                B[pQ].pos.x, B[pQ].pos.y, B[pQ].pos.z = PointList[Cur[pQ]]
                B[pQ].g  = 0

                Cur[pP] = Cur[pQ]
            else:
                # pP jumps vertically
                B[pP].g = g_earth
                pos, vel = planJump(B[pP], PointList[Cur[pP]], PointList[Cur[pP]])  
                B[pP].vel = vel
                B[pP].pos = pos


                # pQ disappears
                B[pQ].visible = False
                B[pQ].g = g_earth/10.
                pos, vel = planJump(B[pQ], PointList[Cur[pQ]], PointList[Cur[pP]])              
                B[pQ].vel = vel 
                B[pQ].pos = pos

                Cur[pQ] = Cur[pP]               
            pQmove = False
        else:
            B[pQ].visible = True

            nex, log_q_ratio = selectNextState(Cur[pP])
            # pP stays put
            B[pP].vel = vp.vector(0,0,0)
            B[pP].pos.x, B[pP].pos.y, B[pP].pos.z = PointList[Cur[pP]]
            B[pP].g  = 0

            # pQ jumps to new location
            B[pQ].g = g_earth/10.
            pos, vel = planJump(B[pQ], PointList[Cur[pP]], PointList[nex])              
            B[pQ].vel = vel 
            B[pQ].pos = pos

            Cur[pQ] = nex               
            
            pQmove = True

                
        Cnt[pP] = 0
        Cnt[pQ] = 0
                
    else:
        B[pP].vel.y = B[pP].vel.y - B[pP].g*dt
        B[pQ].vel.y = B[pQ].vel.y - B[pQ].g*dt
        Cnt[pP] +=1
        Cnt[pQ] +=1


---------------------------------------------------------------------------
KeyboardInterrupt                         Traceback (most recent call last)
<ipython-input-1-cd9b9eeab396> in <module>()
    120 
    121 while 1:
--> 122     vp.rate (RATE)
    123 
    124     B[pQ].pos = B[pQ].pos + B[pQ].vel*dt

/Users/cemgil/anaconda/envs/py35/lib/python3.5/site-packages/vpython/vpython.py in __call__(self, N)
    201         self.rval = N
    202         if self.rval < 1: raise ValueError("rate value must be greater than or equal to 1")
--> 203         super(_RateKeeper2, self).__call__(self.rval) ## calls __call__ in rate_control.py
    204 
    205 if sys.version > '3':

/Users/cemgil/anaconda/envs/py35/lib/python3.5/site-packages/vpython/rate_control.py in __call__(self, maxRate)
    205             dt = self.lastSleep + self.calls*(self.userTime + self.callTime + self.delay) + \
    206                  renders*self.renderTime + sleeps*self.interactionPeriod - _clock()
--> 207             _sleep(dt)
    208             self.lastSleep = _clock()
    209             self.calls = 0

/Users/cemgil/anaconda/envs/py35/lib/python3.5/site-packages/vpython/rate_control.py in _sleep(dt)
     47         dtsleep = nticks*_tick
     48         t = _clock()
---> 49         time.sleep(dtsleep)
     50         t = _clock()-t
     51         dt -= t

KeyboardInterrupt: