DMP tutorial

Introduction

Dynamical movement primitives are dynamical systems that provide a means of robust, generalizable trajectory generation. I give an overview of their origins formally on my blog (https://studywolf.wordpress.com/category/robotics/dynamic-movement-primitive/) and here we'll do a quick overview plus look at their implementation in neurons.

They have two forms, discrete and rhythmic, and in this tutorial we'll be looking at using DMPs for rhythmic pattern generation.

Basics

There are two main parts to DMPs, the point attractors and the forcing function. For each degree-of-freedom that you would like to generate a pattern for a separate point attractor is required. In this notebook we'll be looking at generating 2D patterns, so we'll need two point attractors. Let's look at the code for generating point attractors!


In [ ]:
import numpy as np

import nengo

model = nengo.Network()
with model: 
    # linearly increasing system with an oscillatory biased input
    ramp_osc = nengo.Ensemble(n_neurons=500, dimensions=2, radius=.01)
    # recurrent connections
    nengo.Connection(ramp_osc, ramp_osc, 
                 transform=np.eye(2) + \
                            np.array([[1, -1],
                                      [1, 1]]))

    # set the number of neurons = to the number of basis functions specified
    ramp = nengo.Ensemble(n_neurons=500, dimensions=1)

    # make first dimensions of forcing function ensemble an integrator
    nengo.Connection(ramp, ramp, synapse=.1)

    # set up the input to the integrating first dimensions 
    nengo.Connection(ramp_osc, ramp, 
                 transform=.015, 
                 function=lambda x: x[0]+.5)
    
from nengo_gui.ipython import IPythonViz
IPythonViz(model, cfg='ramp.viz.cfg')

In [ ]:
def gen_oscillator(model, speed=.05):
    with model:
        # ------------------ Oscillator -------------------
        osc = nengo.Ensemble(n_neurons=500, dimensions=2, label='oscillator')
        # recurrent connections
        nengo.Connection(osc, osc,
                          transform=np.eye(2) + \
                                    np.array([[1, -1],
                                              [1, 1]]) * speed)
        return osc
    
import numpy as np
import nengo

model = nengo.Network('Oscillator')
with m: 
    osc = gen_oscillator(m, speed=.05)
    output = nengo.Ensemble(1, 1, neuron_type=nengo.Direct())
     
    nengo.Connection(osc, output, 
                     function=lambda x: np.arctan2(x[0], x[1]))
        
from nengo_gui.ipython import IPythonViz
IPythonViz(m, cfg='osc.viz.cfg')

The point attractor


In [ ]:
def gen_point_attractor(model, goal, n_neurons=200, alpha=10, beta=10/4.):
    # create an ensemble with point attractor dynamics
    synapse = 1
    with model:
        # set up two integrators to represent y and dy
        y = nengo.Ensemble(n_neurons=n_neurons, dimensions=1, radius=1.5, label='y')
        dy = nengo.Ensemble(n_neurons=n_neurons, dimensions=1, radius=5, label='dy')
        nengo.Connection(y, y, synapse=synapse)
        nengo.Connection(dy, dy, synapse=synapse)
        
        nengo.Connection(dy, y, synapse=synapse)
        # implement ddy = alpha * (beta * (goal - y) - dy)
        nengo.Connection(goal, dy, transform=alpha*beta, synapse=synapse)
        nengo.Connection(y, dy, transform=-alpha*beta, synapse=synapse)
        nengo.Connection(dy, dy, transform=-alpha, synapse=synapse)
        
        return y,dy

In [ ]:
m = nengo.Network('Point attractor')
with m: 

    # --------------------- Input --------------------------
    goal = nengo.Node(output=[.8, -.8])
    
    # ------------------- Point Attractors --------------------
    y1,dy1 = gen_point_attractor(m, goal[0])
    y2,dy2 = gen_point_attractor(m, goal[1])
    
    # ------------------ Combine output ----------------------
    combine = nengo.Ensemble(n_neurons=500, dimensions=2, radius=np.sqrt(2))
    
    nengo.Connection(y1[0], combine[0], synapse=.01)
    nengo.Connection(y2[0], combine[1], synapse=.01)

from nengo_gui.ipython import IPythonViz
IPythonViz(m, cfg='point_attractor.viz.cfg')

The forcing function

The second part of DMPs is the forcing function. For rhythmic DMPs we use an oscillator, and from that oscillator we'll decode a function. Let's look at how to program an oscillator and decode a function from it.

First we'll generate the function we'd like from some arbitrary trajectory (can be anything!):


In [ ]:
import numpy as np
from scipy import interpolate

# our desired path
heart_path = np.load('heart_traj.npz')['arr_0'][:,0] * 10
    
# generate range of values to assign our desired path to
x = np.linspace(-np.pi, np.pi, len(heart_path))
# generate function to interpolate the desired trajectory
path_gen = interpolate.interp1d(x, heart_path / 10.0)

import matplotlib.pyplot as plt
%matplotlib inline
plt.plot(np.linspace(-np.pi, np.pi, len(heart_path)), 
         path_gen(np.linspace(-np.pi, np.pi, len(heart_path))))
plt.show()

In [ ]:
import numpy as np

import nengo

m = nengo.Network('Oscillator')
with m: 
    osc = gen_oscillator(m, speed=.05)
    output1 = nengo.Ensemble(n_neurons=1, dimensions=1, neuron_type=nengo.Direct())
    output2 = nengo.Ensemble(n_neurons=1, dimensions=1, neuron_type=nengo.Direct())
    
    # decode out a rhythmic path from our oscillator
    def force(x, function, gain=1):
        # calculate the angle theta
        theta = np.arctan2(x[0], x[1])
        # decode our function off of the theta value
        return function(theta) * gain
    nengo.Connection(osc, output1, function=lambda x: force(x, path_gen, -1))
    nengo.Connection(osc, output2, function=lambda x: force(x, path_gen))

from nengo_gui.ipython import IPythonViz
IPythonViz(m, cfg='oscillator.viz.cfg')

Combining point attractor and forcing function

Now that we can generate point attractors and decode rhythmic patterns off of oscillators, we have to put them together.

Note that we want to generate a set of forces off of the oscillator, rather than a set of positions. So the function that we want to decode off of the oscillator can be calculated from the desired position trajectory by finding the desired acceleration trajectory, and subtracting out the effects of the point attractors.

Once we have this function, we can simply connect the decoded oscillator output to the point attractory dynamics!


In [ ]:
import numpy as np
from scipy import interpolate

def gen_forcing_functions(y_des, dt=.001, alpha=10, beta=10/4.):
        
        # scale our trajectory and find the center point
        y_des = y_des.T / 1e5
        goal = np.sum(y_des, axis=1) / y_des.shape[1]
                
        # interpolate our desired trajectory to smooth out the sampling
        num_samples = 10
        path = np.zeros((y_des.shape[0], num_samples))
        x = np.linspace(-np.pi, np.pi, y_des.shape[1])
        for d in range(y_des.shape[0]):
            path_gen = interpolate.interp1d(x, y_des[d])
            for ii,t in enumerate(np.linspace(-np.pi, np.pi, num_samples)):
                path[d, ii] = path_gen(t)
        y_des = path
    
        # calculate velocity of y_des
        dy_des = np.diff(y_des) / dt
        # add zero to the beginning of every row
        dy_des = np.hstack((np.zeros((y_des.shape[0], 1)), dy_des))

        # calculate acceleration of y_des
        ddy_des = np.diff(dy_des) / dt
        # add zero to the beginning of every row
        ddy_des = np.hstack((np.zeros((y_des.shape[0], 1)), ddy_des))

        forcing_functions = []
        for d in range(y_des.shape[0]):
            # find the force required to move along this trajectory
            # by subtracting out the effects of the point attractor 
            force = ddy_des[d] - alpha * \
                            (beta * (goal[d] - y_des[d]) - \
                             dy_des[d])
            # generate another interpolation function we can use 
            # to now train up our decoded oscillator output
            forcing_functions.append(lambda x, force=force:
                                    interpolate.interp1d(np.linspace(-np.pi, np.pi, num_samples), force)(x))
            
        return forcing_functions

In [ ]:
import nengo

m = nengo.Network()
with m: 
    # --------------------- Inputs --------------------------
    in_goal = nengo.Node(output=[0,0])

    # ------------------- Point Attractors --------------------
    yz1 = gen_point_attractor(m, in_goal[0], n_neurons=1000)
    yz2 = gen_point_attractor(m, in_goal[1], n_neurons=1000)
    
    # -------------------- Oscillators ----------------------
    osc = gen_oscillator(m, speed=.05)
    
    # generate our forcing function
    y_des = np.load('heart_traj.npz')['arr_0']
    forcing_functions = gen_forcing_functions(y_des)
    
    def force(x, function, gain=1):
        # calculate the angle theta
        theta = np.arctan2(x[0], x[1])
        # decode our function off of the theta value
        return function(theta) * gain
    # connect oscillator to point attractor
    nengo.Connection(osc, yz1[1], function=lambda x: force(x, forcing_functions[0]))
    nengo.Connection(osc, yz2[1], function=lambda x: force(x, forcing_functions[1]))
    
    # output for easy viewing
    output = nengo.Ensemble(n_neurons=1, dimensions=2, neuron_type=nengo.Direct())
    nengo.Connection(yz1[0], output[0], synapse=.01)
    nengo.Connection(yz2[0], output[1], synapse=.01)
    
from nengo_gui.ipython import IPythonViz
IPythonViz(m, cfg='DMP.viz.cfg')

And we have a rhythmic pattern generator capable of generating our desired trajectory at different points in state space! From here the model can be extended by adding a spatial scaling term (by multiplying the decoded oscillator output by a gain term) or adding temporal scaling (by using a controlled oscillator).

Additionally, other dynamical systems can be added to incorporate effects like obstacle avoidance, simply by connecting up their dynamics to our point attractors! There are a ton of other extensions, such as using DMPs to generate expected sensory trajectories and outputting corrective signals if there's deviation during movement.

Publications about some of the extensions can be found here: http://www-clmc.usc.edu/Resources/Publications


In [ ]: