Generating symbolic expressions

For larger reaction systems it is preferable to generate the system of ordinary differential equations from some serialized format and then generate the callback using code generation.

In this notebook we will define such a serialized format, and use it load a larger set of reactions. We represent a reaction as length 3 tuple of: (rate_const, coeff_powers, net_effect). Representing Robertson's system this way looks like this:


In [ ]:
reactions = [
    ('k1', {'A': 1}, {'B': 1, 'A': -1}),
    ('k2', {'B': 1, 'C': 1}, {'A': 1, 'B': -1}),
    ('k3', {'B': 2}, {'B': -1, 'C': 1})
]
names, params = 'A B C'.split(), 'k1 k2 k3'.split()
tex_names = ['[%s]' % n for n in names]

the reaction system is still defined as: $$ A \overset{k_1}{\rightarrow} B \\ B + C \overset{k_2}{\rightarrow} A + C \\ 2 B \overset{k_3}{\rightarrow} B + C $$

We will now write a small convenience function which takes the above representation and creates symbolic expressions for the ODE system:


In [ ]:
# %load ../scipy2017codegen/chem.py
from operator import mul
from functools import reduce
import sympy as sym

def prod(seq):
    return reduce(mul, seq) if seq else 1

def mk_exprs_symbs(rxns, names):
    concs = sym.symbols(names, real=True, nonnegative=True)
    c_dict = dict(zip(names, concs))
    f = {n: 0 for n in names}
    for coeff, r_stoich, net_stoich in rxns:
        r = sym.S(coeff)*prod([c_dict[rk]**p for rk, p in r_stoich.items()])
        for nk, nm in net_stoich.items():
            f[nk] += nm*r
    return [f[n] for n in names], concs


def mk_rsys(ODEcls, reactions, names, params=(), **kwargs):
    f, symbs = mk_exprs_symbs(reactions, names)
    return ODEcls(f, symbs, params=map(sym.S, params), **kwargs)

In [ ]:
sym.init_printing()
f, symbs = mk_exprs_symbs(reactions, names)
f, symbs

We create a helper class to represent to ODE system.


In [ ]:
# %load ../scipy2017codegen/odesys.py
from itertools import chain  # Py 2.7 does not support func(*args1, *args2)
import sympy as sym
from scipy.integrate import odeint

class ODEsys(object):

    default_integrator = 'odeint'

    def __init__(self, f, y, t=None, params=(), tex_names=None, lambdify=None):
        assert len(f) == len(y), 'f is dy/dt'
        self.f = tuple(f)
        self.y = tuple(y)
        self.t = t
        self.p = tuple(params)
        self.tex_names = tex_names
        self.j = sym.Matrix(self.ny, 1, f).jacobian(y)
        self.lambdify = lambdify or sym.lambdify
        self.setup()

    @property
    def ny(self):
        return len(self.y)

    def setup(self):
        self.lambdified_f = self.lambdify(self.y + self.p, self.f)
        self.lambdified_j = self.lambdify(self.y + self.p, self.j)

    def f_eval(self, y, t, *params):
        return self.lambdified_f(*chain(y, params))

    def j_eval(self, y, t, *params):
        return self.lambdified_j(*chain(y, params))

    def integrate(self, *args, **kwargs):
        integrator = kwargs.pop('integrator', self.default_integrator)
        return getattr(self, 'integrate_%s' % integrator)(*args, **kwargs)

    def integrate_odeint(self, tout, y0, params=(), rtol=1e-8, atol=1e-8, **kwargs):
        return odeint(self.f_eval, y0, tout, args=tuple(params), full_output=True,
                      Dfun=self.j_eval, rtol=rtol, atol=atol, **kwargs)

    def print_info(self, info):
        if info is None:
            return
        nrhs = info.get('num_rhs')
        if not nrhs:
            nrhs = info['nfe'][-1]
        njac = info.get('num_dls_jac_evals')
        if not njac:
            njac = info['nje'][-1]
        print("The rhs was evaluated %d times and the Jacobian %d times" % (nrhs, njac))

    def plot_result(self, tout, yout, info=None, ax=None):
        ax = ax or plt.subplot(1, 1, 1)
        for i, label in enumerate(self.tex_names):
            ax.plot(tout, yout[:, i], label='$%s$' % label)
        ax.set_ylabel('$\mathrm{concentration\ /\ mol \cdot dm^{-3}}$')
        ax.set_xlabel('$\mathrm{time\ /\ s}$')
        ax.legend(loc='best')
        self.print_info(info)

In [ ]:
odesys = ODEsys(f, symbs, params=params, tex_names=tex_names)

In [ ]:
import numpy as np
tout = np.logspace(-6, 6)
yout, info = odesys.integrate_odeint(tout, [1, 0, 0], [0.04, 1e4, 3e7], atol=1e-9, rtol=1e-9)

In [ ]:
import matplotlib.pyplot as plt
%matplotlib inline

In [ ]:
fig, axes = plt.subplots(1, 2, figsize=(14, 4))
odesys.plot_result(tout, yout, info, ax=axes[0])
odesys.plot_result(tout, yout, ax=axes[1])
axes[1].set_xscale('log')
axes[1].set_yscale('log')

The reason for why we went through this trouble is to be able to create a ODEsys instance from conveniently serialized data. Here is a much larger set of reactions, describing water radiolysis at 298 K and a doserate of 300 Gy/s (which is a doserate not far from that of a nuclear reactor):


In [ ]:
import json
watrad_data = json.load(open('../scipy2017codegen/data/radiolysis_300_Gy_s.json'))
watrad = mk_rsys(ODEsys, **watrad_data)
print(len(watrad.f), watrad.y[0], watrad.f[0])

Values correspond to SI units, the concentration of water at 298 K is 55400 mol/m³. Neutral water contains [H+] = [HO-] = 10^-4 mol/m³:


In [ ]:
tout = np.logspace(-6, 3, 200)  # close to one hour of operation
c0 = {'H2O': 55.4e3, 'H+': 1e-4, 'OH-': 1e-4}
y0 = [c0.get(symb.name, 0) for symb in watrad.y]

In [ ]:
%timeit watrad.integrate_odeint(tout, y0)

In [ ]:
fig, ax = plt.subplots(1, 1, figsize=(14, 6))
watrad.plot_result(tout, *watrad.integrate_odeint(tout, y0), ax=ax)
ax.set_xscale('log')
ax.set_yscale('log')