In [1]:
import os
import scipy.integrate
import matplotlib.pyplot as plt
import numpy as np
from numpy import f2py
import copy
%matplotlib inline

# local modules
from periodic import PeriodicProcess, PeriodicScheduler, \
    nested_dict_to_namedtuple, Logger
import uorb
from sympy_utils import rhs_to_scipy_ode, \
    save_sympy_expr, load_sympy_expr

In [2]:
if f2py.compile(source=open('pendulum.f90','r').read(), modulename='pendulum',
             source_fn='pendulum.f90'):
    raise RuntimeError('compile failed, see console')
import pendulum

In [3]:
class PendulumDynamics(PeriodicProcess):

    def __init__(self, period, x0, uorb_manager):
        super(PendulumDynamics, self).__init__(period)
        self.x0 = x0
        self.ode = scipy.integrate.ode(
            pendulum.compute_f, pendulum.compute_a)
        #self.ode.set_integrator('dopri5')
        self.uorb_manager = uorb_manager
        self.process_noise_power = 1e-5

    def initialize(self, t):
        super(PendulumDynamics, self).initialize(t)
        self.ode.set_initial_value(self.x0, t)
        
        # publish sim state
        self.sim = uorb.Publication(
            self.uorb_manager, 'sim_state',
            uorb.Topic_sim_state(*((0,)*16)))
        self.sim.publish()
        
        # subscribe to actuators
        self.actuators = uorb.Subscription(
            self.uorb_manager, 'actuator_outputs')
        
    def run(self, t):
        # update actuator info
        self.actuators.update()
        
        process_noise_stdev = np.sqrt(self.process_noise_power/self.period)
        process_noise = process_noise_stdev*np.random.randn()
        
        ode = self.ode
        if t == ode.t:
            return
        u = [self.actuators.data.output[0] + process_noise]
        m = 1
        g = 9.8
        l = 1
        ode.set_f_params(u, m, g, l)
        ode.set_jac_params(u, m, g, l)
        ode.integrate(t)
        if not ode.successful():
            raise ValueError('ode integration failed')
            
        # acceleration
        # TODO, should make sympy output this
        x = ode.y
        y_accel = pendulum.compute_g(t, x, u, m, g, l)

        # publish simulation data
        self.sim.data.timestamp=int(1e6*t)
        self.sim.data.pitch=ode.y[0]
        self.sim.data.pitchspeed=ode.y[1]
        self.sim.data.xacc=y_accel[0,0]
        self.sim.data.yacc=y_accel[1,0]
        self.sim.data.zacc=y_accel[2,0]
        self.sim.publish()

    @property
    def x(self):
        return self.ode.y

In [4]:
class Sensor(PeriodicProcess):
    
    def __init__(self, period, uorb_manager):
        super(Sensor, self).__init__(period)
        self.uorb_manager = uorb_manager

        self.gyro_noise_power = 1e-5**np.ones(3)
        self.gyro_bias = np.zeros(3)
        self.accel_noise_power = 1e-4*np.ones(3)
        self.mag_noise_power = 1e-4*np.ones(3)
        
    def initialize(self, t):
        super(Sensor, self).initialize(t)     
        
        #initialize topic
        timestamp = int(t*1e6)
        self.sensor = uorb.Publication(
            self.uorb_manager, 'sensor_combined',
            uorb.Topic_sensor_combined(*((0,)*42)))
        
        self.sensor.data.gyro1_rad_s = timestamp
        self.sensor.data.gyro_rad_s = np.array([0,0,0])
        self.sensor.data.accelerometer_timestamp = timestamp
        self.sensor.data.accelerometer_m_s2 = np.array([0,0,0])
        self.sensor.magnetometer_timestamp = timestamp
        self.sensor.magnetometer_ga = np.array([0,0,0])
        self.sensor.publish()
        
        # get sim data
        self.sim = uorb.Subscription(
            self.uorb_manager, 'sim_state')

    def run(self, t):
        # get new simulation data
        self.sim.update()
        
        # noise generator
        randn = np.random.randn

        timestamp = int(t*1e6)
        self.sensor.data.timestamp = timestamp
        
        # gyro
        gyro_ideal = np.array([self.sim.data.rollspeed,
                     self.sim.data.pitchspeed,
                     self.sim.data.yawspeed])
        gyro_stddev = np.sqrt(self.gyro_noise_power/self.period)
        gyro_noise = gyro_stddev*randn(3)
        gyro = gyro_ideal + gyro_noise
        
        # accelerometer
        accel_ideal = np.array([self.sim.data.xacc,self.sim.data.yacc,self.sim.data.zacc])
        accel_stddev = np.sqrt(self.accel_noise_power/self.period)
        accel_noise = accel_stddev*randn(3)
        accel = accel_ideal + accel_noise

        # magnetometer
        mag_ideal = np.array([0,0,0])
        mag_stddev = np.sqrt(self.mag_noise_power/self.period)
        mag_noise = mag_stddev*randn(3)
        mag = mag_ideal + mag_noise
        
        # publish
        self.sensor.data.gyro1_timestamp = timestamp
        self.sensor.data.gyro_rad_s = gyro
        self.sensor.data.accelerometer_timestamp = timestamp
        self.sensor.data.accelerometer_m_s2 = accel
        self.sensor.magnetometer_timestamp = timestamp
        self.sensor.magnetometer_ga = mag
        self.sensor.publish()

In [5]:
class Estimator(PeriodicProcess):

    def __init__(self, period, uorb_manager):
        super(Estimator, self).__init__(period)
        self.uorb_manager = uorb_manager
        self.xh = np.array([0,0])
        self.P0 = np.eye(2)

    def initialize(self, t):
        super(Estimator, self).initialize(t)
        # initialize position publication
        self.pos = uorb.Publication(
            self.uorb_manager,
            'vehicle_global_position',
            uorb.Topic_vehicle_global_position(*((0,)*11)))
        
        # initialize attitude publication
        self.att = uorb.Publication(
            self.uorb_manager,
            'vehicle_attitude',
            uorb.Topic_vehicle_attitude(*((0,)*16)))
        
        # intialize sensor subscription
        self.sensor = uorb.Subscription(
            self.uorb_manager,
            'sensor_combined')

        # TODO, this is a hack, we make the estimator
        # just give the sim state for now
        self.sim = uorb.Subscription(
            self.uorb_manager,
            'sim_state')
        
        # initialize timestamps for checking for sensor updates
        self.accel_timestamp_last = -1
        self.l = 1
        self.g = 9.8
        
    def run(self, t):
        # get new sensor data
        # TODO use this to estimate
        self.sensor.update()
        
        if self.sensor.data.accelerometer_timestamp != self.accel_timestamp_last:
            self.accel_timestamp_last = self.sensor.data.accelerometer_timestamp
            y = self.sensor.data.accelerometer_m_s2
            
            theta = self.xh[0]
            theta_dot = self.xh[1]
            theta_ddot = 1 # TODO
            yh = np.array([self.l*theta_ddot + self.g*np.sin(theta), 0, self.g*np.cos(theta)])
            yh = []
            
        # publish new estimated position
        # TODO don't cheat using sim state
        self.pos.data.timestamp = int(1e6*t)
        self.pos.data.lat = self.sim.data.lat
        self.pos.data.lon = self.sim.data.lon
        self.pos.data.alt = self.sim.data.alt
        self.pos.data.vel_n = self.sim.data.vx
        self.pos.data.vel_e = self.sim.data.vy
        self.pos.data.vel_d = self.sim.data.vz
        self.pos.data.yaw = self.sim.data.yaw
        self.pos.publish()
        
        # publish new estimated attitude
        self.att.data.timestamp = int(1e6*t)
        self.att.data.roll = self.sim.data.roll
        self.att.data.pitch = float(self.sim.data.pitch)
        self.att.data.yaw = self.sim.data.yaw
        self.att.data.rollspeed = self.sensor.data.gyro_rad_s[0]
        self.att.data.pitchspeed = self.sensor.data.gyro_rad_s[1]
        self.att.data.yawspeed = self.sensor.data.gyro_rad_s[2]
        self.att.publish()

In [6]:
class Controller(PeriodicProcess):

    def __init__(self, period, uorb_manager):
        super(Controller, self).__init__(period)
        self.uorb_manager = uorb_manager
        self.actuators = uorb.Publication(
            self.uorb_manager, 'actuator_outputs',
            uorb.Topic_actuator_outputs(0,
                output=np.array([0,0,0,0,0,0,0,0]).astype(float),
                noutputs=1))
        self.actuators.publish()
        
    def initialize(self, t):
        super(Controller, self).initialize(t)
        
        # publish actuator topic
        self.actuators.data = uorb.Topic_actuator_outputs(int(t*1e6),
                output=np.array([0,0,0,0,0,0,0,0]).astype(float))
        self.actuators.publish()
        
        # subscribe to vehicle global position topic
        self.pos = uorb.Subscription(
            self.uorb_manager, 'vehicle_global_position')
        
        self.att = uorb.Subscription(
            self.uorb_manager, 'vehicle_attitude')
        
    def run(self, t):
        # get new position/attitude estimation
        self.pos.update()
        self.att.update()
        
        u = -20*self.att.data.pitch - 6*self.att.data.pitchspeed
        
        # publish new actuator controls
        self.actuators.data.timestamp = int(t*1e6)
        self.actuators.data.output = [u,0,0,0,0,0,0,0]
        self.actuators.publish()

First we declare the uORB manager and all processes that will be running. Next, we create the scheduler and pass the list or processes to it. Finally, we run the scheudler.


In [7]:
def run_sim():
    # constants
    tf = 4

    # declare uorb manager, by using pointer copying we can avoid
    # copying actual data and make it run much faster, however 
    # we have to becareful doing this with multi-threading
    uorb_manager = uorb.Manager(copy_type='pointer')

    # declare periodic processes
    controller = Controller(0.02, uorb_manager)
    estimator = Estimator(0.002, uorb_manager)
    dynamics = PendulumDynamics(0.001, [1,0], uorb_manager)
    sensor = Sensor(0.001, uorb_manager)
    logger = Logger(0.001, tf, 
        ['sim_state',
         'vehicle_global_position',
         'vehicle_attitude',
         'sensor_combined',
         'actuator_outputs'],
        uorb_manager)

    # declare scheduler
    scheduler = PeriodicScheduler(False, 0, tf, 0.001)
    scheduler.process_list = [
        dynamics, sensor, estimator, controller, logger]  

    # run the scheduler
    scheduler.run()
    return nested_dict_to_namedtuple(logger.log)

We can profile the simulation to see what is taking the most time.


In [8]:
stats = %prun -q -r log = run_sim()
stats.sort_stats('tottime').print_stats(20);


          302502 function calls in 0.608 seconds

   Ordered by: internal time
   List reduced from 134 to 20 due to restriction <20>

   ncalls  tottime  percall  cumtime  percall filename:lineno(function)
     3999    0.123    0.000    0.125    0.000 _ode.py:730(run)
     3999    0.090    0.000    0.162    0.000 <ipython-input-4-5d4f043927be>:33(run)
     3999    0.086    0.000    0.129    0.000 __init__.py:111(run)
     3999    0.051    0.000    0.209    0.000 <ipython-input-3-ea772f26b4d7>:26(run)
    13926    0.047    0.000    0.047    0.000 {numpy.core.multiarray.array}
    15996    0.031    0.000    0.031    0.000 {method 'randn' of 'mtrand.RandomState' objects}
     1833    0.027    0.000    0.044    0.000 <ipython-input-5-6fb25091c0a5>:39(run)
    11865    0.021    0.000    0.027    0.000 _base.py:57(publish)
    30205    0.018    0.000    0.056    0.000 _base.py:100(update)
    20000    0.018    0.000    0.565    0.000 __init__.py:35(update)
    30205    0.013    0.000    0.022    0.000 _base.py:97(updated)
    18272    0.012    0.000    0.016    0.000 _base.py:83(copy)
        1    0.010    0.010    0.603    0.603 __init__.py:59(run)
    30205    0.010    0.000    0.010    0.000 _base.py:79(updated)
    35872    0.009    0.000    0.009    0.000 {method 'keys' of 'dict' objects}
    11865    0.007    0.000    0.034    0.000 _base.py:116(publish)
    30137    0.007    0.000    0.007    0.000 _base.py:42(_copy_type)
     3999    0.006    0.000    0.132    0.000 _ode.py:376(integrate)
        1    0.004    0.004    0.025    0.025 __init__.py:123(finalize)
        7    0.003    0.000    0.005    0.001 collections.py:288(namedtuple)


If we aren't profiling it is much faster.


In [9]:
%%time
log = run_sim();


CPU times: user 513 ms, sys: 3.6 ms, total: 517 ms
Wall time: 515 ms

It is easy to plot any of the various uorb topics.


In [10]:
plt.plot(log.log.t, log.sim_state.pitch);



In [11]:
plt.plot(log.log.t, log.actuator_outputs.output[:,0], 'r');



In [12]:
plt.plot(log.log.t, log.vehicle_attitude.pitch);



In [13]:
plt.plot(log.log.t, log.sensor_combined.gyro_rad_s);



In [14]:
plt.plot(log.log.t, log.sensor_combined.accelerometer_m_s2);