LM-MLE fitting in Python

We need to implement our own version of LM-MLE fitting in Python, because you cannot simply swap out the $\chi^2$ function: the gradients have to change as well.
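
Concretely (up to an overall factor of 2, following Laurence and Chromy [2], and matching the `chi2_mle`/`update_mle` functions implemented below), the Poisson MLE merit function and its gradient are

$$ \chi^2_{MLE} = \sum_i \left[ f_i - y_i - y_i \ln\frac{f_i}{y_i} \right], \qquad \frac{\partial \chi^2_{MLE}}{\partial p_j} = \sum_i \left( 1 - \frac{y_i}{f_i} \right) \frac{\partial f_i}{\partial p_j} $$

where $f_i$ is the model and $y_i$ the data, so both the merit function and its gradient (and hence the normal equations) differ from the least-squares case.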

Here's a C implementation of normal Levenberg–Marquardt: http://users.ics.forth.gr/~lourakis/levmar/

The basic plan was to replace the `leastsq` function, monkey-patching our version in instead.

It turns out it's much easier to make our own version of `curve_fit` and delegate to `scipy.optimize.curve_fit` for the standard cases.

References

  1. Madsen, K.; Nielsen, H. B.; Tingleff, O. Methods for Non-Linear Least Squares Problems, 2nd ed.; IMM, Technical University of Denmark, 2004. http://www2.imm.dtu.dk/pubdb/views/publication_details.php?id=3215 (accessed Aug 18, 2017).
  2. Laurence, T. A.; Chromy, B. A. Efficient Maximum Likelihood Estimator Fitting of Histograms. Nat. Methods 2010, 7 (5), 338–339.
  3. Press, W. H.; Teukolsky, S. A.; Vetterling, W. T.; Flannery, B. P. Numerical Recipes in C: The Art of Scientific Computing, 2nd ed.; Cambridge University Press: Cambridge; New York, 1992.

Simple LS LM

Here I attempt to implement a simple, fully Pythonic version of LM. To make life easy I'll start with a simple analytic model (a sum of Gaussians) whose Jacobian is easy to write down.
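
For reference, the damped step that `lm_update` and `lm_ls` below implement is the standard Levenberg–Marquardt one [1]: with residuals $\mathbf{f} = \mathbf{y}_{model} - \mathbf{y}_{data}$ and Jacobian $\mathbf{J}$, each iteration solves

$$ \left( \mathbf{J}^T \mathbf{J} + \mu \mathbf{I} \right) \delta\mathbf{p} = -\mathbf{J}^T \mathbf{f} $$

and then decreases the damping $\mu$ after a successful step (reduced $\chi^2$) or increases it after a rejected one.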


In [1]:
%matplotlib inline
import numpy as np
from numpy import linalg as la
import matplotlib.pyplot as plt
%load_ext line_profiler
import seaborn as sns

My test function will be the sum of two Gaussians plus a constant offset:

$$ y = A_0 e^{-\frac{(x-x_0)^2}{\sigma_0^2}} + A_1 e^{-\frac{(x-x_1)^2}{\sigma_1^2}} + O $$

In [2]:
def test_func(xdata, a0, x0, sigma0, a1, x1, sigma1, o):
    """An exponential decay"""
    return a0 * np.exp(-(xdata-x0)**2 / sigma0**2) + a1 * np.exp(-(xdata-x1)**2 / sigma1**2) + o

In [3]:
x = np.linspace(0,1)
preal = np.array([1, 0.25, .04, 2, 0.7, 0.07, 2])
y = test_func(x, *preal)
plt.plot(x, y)


Out[3]:
[<matplotlib.lines.Line2D at 0x24cbb8d25f8>]

In [4]:
def test_func_jac(xdata, a0, x0, sigma0, a1, x1, sigma1, o):
    """An exponential decay"""
    dyda0 = np.exp(-(xdata-x0)**2 / sigma0**2)
    dydx0 = 2 * a0 * (xdata-x0) * dyda0 / sigma0**2
    dydsigma0 = dydx0 * (xdata-x0) / sigma0
    
    dyda1 = np.exp(-(xdata-x1)**2 / sigma1**2)
    dydx1 = 2 * a1 * (xdata-x1) * dyda1 / sigma1**2
    dydsigma1 = dydx1 * (xdata-x1) / sigma1
    
    dydo = np.ones_like(dyda0)
    
    to_return = np.concatenate((dyda0, dydx0, dydsigma0, dyda1, dydx1, dydsigma1, dydo))
    to_return.shape = (7, -1)
#     to_return = np.vstack((dyda0, dydx0, dydsigma0, dyda1, dydx1, dydsigma1, dydo))
    return to_return.T

Test fit with scipy.optimize.curve_fit


In [5]:
from scipy.optimize import curve_fit

In [6]:
x = np.linspace(0,1, 81)
y = test_func(x, *preal) + 0.1*np.random.randn(len(x))
plt.plot(x, y)


Out[6]:
[<matplotlib.lines.Line2D at 0x24cbbe314a8>]

In [7]:
preal


Out[7]:
array([ 1.  ,  0.25,  0.04,  2.  ,  0.7 ,  0.07,  2.  ])

In [8]:
pguess = np.array([ 1.1  ,  0.23,  0.03,  2.  ,  0.65 ,  0.05 ,  1.  ])
y_fit = test_func(x, *pguess)
plt.plot(x, y, ".")
plt.plot(x, y_fit)


Out[8]:
[<matplotlib.lines.Line2D at 0x24cbbe3c128>]

In [9]:
%%time
popt_sp, pcov, infodict, errmsg, ier = curve_fit(
    test_func, x, y, p0=pguess, full_output=True,
    jac=test_func_jac)


Wall time: 1 ms

In [10]:
popt_sp


Out[10]:
array([ 0.81774025,  0.25395677,  0.04519558,  1.9312654 ,  0.69973113,
        0.07271354,  2.01881715])

In [11]:
y_fit = test_func(x, *popt_sp)
plt.plot(x, y, ".")
plt.plot(x, y_fit)
((y-y_fit)**2).sum()


Out[11]:
0.88094658886845312

In [12]:
def _chi2(y, ydata):
    return 0.5 * ((y - ydata)**2).sum()

def lm_update(xdata, f, p, jac):
    """should use partials inside lm_ls"""
    # calculate the jacobian
    # j shape (ndata, nparams)
    j = jac(xdata, *p)
    # calculate the linear term of Hessian
    # a shape (nparams, nparams)
    a = j.T @ j
    # calculate the gradient
    # g shape (nparams,)
    g = j.T @ f
    return a, g

def lm_ls(func, xdata, ydata, p0, jac, ftol=1.49012e-8, xtol=1.49012e-8, gtol=0.0,
         maxfev=None, factor=100):
    """A test implementation of levenburg-marquet
    
    ftol : float, optional
        Relative error desired in the sum of squares.
    xtol : float, optional
        Relative error desired in the approximate solution.
    gtol : float, optional
        Orthogonality desired between the function vector and the
        columns of the Jacobian.
    """
    if maxfev is None:
        maxfev = 100 * (len(p0) + 1)

#     gtest = lambda g: la.norm(g, np.inf) <= gtol
    def gtest(g):
        return np.abs(g).max() <= gtol
    def xtest(dp, p):
        return la.norm(dp) <= xtol * (la.norm(p) + xtol)
    # need residuals
    y = func(xdata, *p0)
    a, g = lm_update(xdata, y - ydata, p0, jac)
    chisq_old = _chi2(y, ydata)
    mu = factor * np.diagonal(a).max()
    v = factor
    p = p0
    for ev in range(maxfev):
        if gtest(g):
            break
        # calculate proposed step
        aug_a = a + np.diag(np.ones_like(g) * mu)
        # dp = la.lstsq(aug_a, -g)[0]
        dp = -la.inv(aug_a) @ g
        if xtest(dp, p):
            break
        # make test move, I think I should be saving previous
        # position so that I can "undo" if this is bad
        p = p0 + dp
        y = func(xdata, *p)
        chisq_new = _chi2(y, ydata)
        # see if we reduced chisq, note we should do more here
        # predicted_gain = 0.5 * dp.T @ (mu * dp - g)
        rho = (chisq_old - chisq_new) # / predicted_gain
        if rho > 0:
            if rho <= ftol * chisq_old:
                break
            # update params, chisq and a and g
            p0 = p
            chisq_old = chisq_new
            a, g = lm_update(xdata, y - ydata, p0, jac)
            mu /= factor 
#             mu *= max(1 / factor, 1 - (2 * rho - 1)**3)
        else:
            mu *= factor
    print(ev)
    return p

In [13]:
ta, tb = np.random.randn(7, 7), np.random.randn(700)
assert np.array_equal(np.dot(tb, tb), tb.T @ tb)
%timeit np.dot(tb, tb)
%timeit tb.T @ tb


927 ns ± 31.1 ns per loop (mean ± std. dev. of 7 runs, 1000000 loops each)
1.04 µs ± 10.8 ns per loop (mean ± std. dev. of 7 runs, 1000000 loops each)

In [14]:
ta, tb = np.random.randn(7, 7), np.random.randn(7)
%timeit la.lstsq(ta, tb)
%timeit la.pinv(ta) @ tb
%timeit la.inv(ta) @ tb
%timeit la.lstsq(ta, tb, 1e-6)
%timeit la.lstsq(ta, tb, 1e-8)
%timeit la.lstsq(ta, tb, 1e-10)
%timeit la.lstsq(ta, tb, 1e-15)
ta = ta.astype(np.float32)
tb = tb.astype(np.float32)
%timeit la.lstsq(ta, tb)


83.4 µs ± 6.57 µs per loop (mean ± std. dev. of 7 runs, 10000 loops each)
65.5 µs ± 503 ns per loop (mean ± std. dev. of 7 runs, 10000 loops each)
16.2 µs ± 526 ns per loop (mean ± std. dev. of 7 runs, 100000 loops each)
68.3 µs ± 225 ns per loop (mean ± std. dev. of 7 runs, 10000 loops each)
75 µs ± 4.63 µs per loop (mean ± std. dev. of 7 runs, 10000 loops each)
80.1 µs ± 5.66 µs per loop (mean ± std. dev. of 7 runs, 10000 loops each)
68.1 µs ± 232 ns per loop (mean ± std. dev. of 7 runs, 10000 loops each)
74.8 µs ± 3.09 µs per loop (mean ± std. dev. of 7 runs, 10000 loops each)

In [15]:
%time popt = lm_ls(test_func, x, y, pguess, test_func_jac)
y_fit = test_func(x, *popt)
plt.plot(x, y, ".")
plt.plot(x, y_fit)


In [ ]:
np.allclose(popt, popt_sp)

In [ ]:
curve_fit(test_func, x, y, p0=pguess, full_output=True, jac=test_func_jac)

In [ ]:
%timeit curve_fit(test_func, x, y, p0=pguess, full_output=True, jac=test_func_jac)
%timeit lm_ls(test_func, x, y, pguess, test_func_jac)

In [ ]:
%lprun -f lm_ls lm_ls(test_func, x, y, pguess, test_func_jac)

Matching the call signature

I want to match the call signature of curve_fit

popt, pcov, infodict, errmsg, ier = curve_fit(
    model_ravel, (xx, yy), data.ravel(), p0=guess_params,
    bounds=bounds, full_output=True, jac=self.model_jac)

In [ ]:
def chi2_ls(f):
    r"""Least-squares merit function $\chi^2_{LS} = \frac{1}{2}\sum_i f_i^2$"""
    return 0.5 * (f**2).sum(0)

def update_ls(x0, f, Dfun):
    """Jacobian, Gauss-Newton Hessian (linear term) and gradient for least squares"""
    # calculate the jacobian
    # j shape (ndata, nparams)
    j = Dfun(x0)
    # calculate the linear term of Hessian
    # a shape (nparams, nparams)
    a = j.T @ j
    # calculate the gradient
    # g shape (nparams,)
    g = j.T @ f
    return j, a, g

def chi2_mle(f):
    r"""Poisson MLE merit function $\chi^2_{MLE} = \sum_i [f_i - y_i - y_i \ln(f_i/y_i)]$"""
    # the MLE-wrapped func returns (model, ydata)
    f, y = f
    if f.min() <= 0:
        # this is not allowed so make chi2
        # large to avoid
        return np.inf
    part1 = (f - y).sum()
    part2 = - (y * np.log(f / y))[y > 0].sum()
    return part1 + part2

def update_mle(x0, f, Dfun):
    """Jacobian, Gauss-Newton Hessian (linear term) and gradient for the Poisson MLE"""
    # calculate the jacobian
    # j shape (ndata, nparams)
    f, y = f
    y_f = y / f
    j = Dfun(x0)
    # calculate the linear term of Hessian
    # a shape (nparams, nparams)
    a = ((j.T * (y_f / f)) @ j)
    # calculate the gradient
    # g shape (nparams,)
    g = j.T @ (1 - y_f)
    return j, a, g

def _wrap_func_mle(func, xdata, ydata, transform):
    """Wrap func so that it returns (model values, ydata)"""
    if transform is None:
        def func_wrapped(params):
            return func(xdata, *params), ydata
    elif transform.ndim == 1:
        raise NotImplementedError
    else:
        # Chisq = (y - yd)^T C^{-1} (y-yd)
        # transform = L such that C = L L^T
        # C^{-1} = L^{-T} L^{-1}
        # Chisq = (y - yd)^T L^{-T} L^{-1} (y-yd)
        # Define (y-yd)' = L^{-1} (y-yd)
        # by solving
        # L (y-yd)' = (y-yd)
        # and minimize (y-yd)'^T (y-yd)'
        raise NotImplementedError
    return func_wrapped


def _wrap_jac_mle(jac, xdata, transform):
    if transform is None:
        def jac_wrapped(params):
            return jac(xdata, *params)
    elif transform.ndim == 1:
        raise NotImplementedError
    else:
        raise NotImplementedError
    return jac_wrapped

def lm(func, x0, args=(), Dfun=None, full_output=False,
            col_deriv=True, ftol=1.49012e-8, xtol=1.49012e-8,
            gtol=0.0, maxfev=None, epsfcn=None, factor=100, diag=None, method="ls"):
    """A more thorough implementation of levenburg-marquet
    for gaussian Noise
    ::
        x = arg min(sum(func(y)**2,axis=0))
                 y
    Parameters
    ----------
    func : callable
        should take at least one (possibly length N vector) argument and
        returns M floating point numbers. It must not return NaNs or
        fitting might fail.
    x0 : ndarray
        The starting estimate for the minimization.
    args : tuple, optional
        Any extra arguments to func are placed in this tuple.
    Dfun : callable, optional
        A function or method to compute the Jacobian of func with derivatives
        across the rows. If this is None, the Jacobian will be estimated.
    full_output : bool, optional
        non-zero to return all optional outputs.
    col_deriv : bool, optional
        non-zero to specify that the Jacobian function computes derivatives
        down the columns (faster, because there is no transpose operation).
    ftol : float, optional
        Relative error desired in the sum of squares.
    xtol : float, optional
        Relative error desired in the approximate solution.
    gtol : float, optional
        Orthogonality desired between the function vector and the columns of
        the Jacobian.
    maxfev : int, optional
        The maximum number of calls to the function. If `Dfun` is provided
        then the default `maxfev` is 100*(N+1) where N is the number of elements
        in x0, otherwise the default `maxfev` is 200*(N+1).
    epsfcn : float, optional
        A variable used in determining a suitable step length for the forward-
        difference approximation of the Jacobian (for Dfun=None).
        Normally the actual step length will be sqrt(epsfcn)*x
        If epsfcn is less than the machine precision, it is assumed that the
        relative errors are of the order of the machine precision.
    factor : float, optional
        A parameter determining the initial step bound
        (``factor * || diag * x||``). Should be in interval ``(0.1, 100)``.
    diag : sequence, optional
        N positive entries that serve as a scale factors for the variables.
    method : "ls" or "mle"
        What type of estimator to use. Maximum likelihood ("mle") assumes that the noise
        in the measurement is poisson distributed while least squares ("ls") assumes
        normally distributed noise.
    """
    
    x0 = np.asarray(x0).flatten()
    n = len(x0)
    if not isinstance(args, tuple):
        args = (args,)
#     shape, dtype = _check_func('leastsq', 'func', func, x0, args, n)
#     m = shape[0]
#     if n > m:
#         raise TypeError('Improper input: N=%s must not exceed M=%s' % (n, m))
    if Dfun is None:
        raise NotImplementedError
        if epsfcn is None:
            epsfcn = np.finfo(dtype).eps
    else:
        if col_deriv:
            pass
#             _check_func('leastsq', 'Dfun', Dfun, x0, args, n, (n, m))
        else:
            raise NotImplementedError("Column derivatives required")
        if maxfev is None:
            maxfev = 100 * (n + 1)

    errors = {0: ["Improper input parameters.", TypeError],
              1: ["Both actual and predicted relative reductions "
                  "in the sum of squares\n  are at most %f" % ftol, None],
              2: ["The relative error between two consecutive "
                  "iterates is at most %f" % xtol, None],
              3: ["Both actual and predicted relative reductions in "
                  "the sum of squares\n  are at most %f and the "
                  "relative error between two consecutive "
                  "iterates is at \n  most %f" % (ftol, xtol), None],
              4: ["The cosine of the angle between func(x) and any "
                  "column of the\n  Jacobian is at most %f in "
                  "absolute value" % gtol, None],
              5: ["Number of calls to function has reached "
                  "maxfev = %d." % maxfev, ValueError],
              6: ["ftol=%f is too small, no further reduction "
                  "in the sum of squares\n  is possible.""" % ftol,
                  ValueError],
              7: ["xtol=%f is too small, no further improvement in "
                  "the approximate\n  solution is possible." % xtol,
                  ValueError],
              8: ["gtol=%f is too small, func(x) is orthogonal to the "
                  "columns of\n  the Jacobian to machine "
                  "precision." % gtol, ValueError],
              'unknown': ["Unknown error.", TypeError]}
    
    if maxfev is None:
        maxfev = 200 * (n + 1)

#     gtest = lambda g: la.norm(g, np.inf) <= gtol
    def gtest(g):
        if gtol:
            return np.abs(g).max() <= gtol
        else:
            return False
    def xtest(dp, p):
        return la.norm(dp) <= xtol * (la.norm(p) + xtol)
    
    # need residuals
    if method == "ls":
        def update(x0, f):
            return update_ls(x0, f, Dfun)
        
        def chi2(f):
            return chi2_ls(f)
    elif method == "mle":
        def update(x0, f):
            return update_mle(x0, f, Dfun)
        
        def chi2(f):
            return chi2_mle(f)
    else:
        raise TypeError("Method {} not recognized".format(method))

    f = func(x0)
    j, a, g = update(x0, f)
    # I think this should be norm along the vector value direction
    D = la.norm(j, axis=0)
#     D = np.ones_like(x0)
    chisq_old = chi2(f)
    
    mu = factor * np.diagonal(a).max()
    
    x = x0
    v = 2
    rhos = []
    chis = [chisq_old]
    for ev in range(maxfev):
        if gtest(g):
            break
        # calculate proposed step
        aug_a = a + np.diag(mu * (D ** 2))
        dx = -la.inv(aug_a) @ g
        if xtest(dx, x):
            break
        # make test move, I think I should be saving previous
        # position so that I can "undo" if this is bad
        x = x0 + dx
        f = func(x)
        chisq_new = chi2(f)
        chis.append(chisq_new)
        # see if we reduced chisq, note we should do more here
        predicted_gain = 0.5 * dx.T @ (mu * dx - g)
        rho = (chisq_old - chisq_new) / predicted_gain
        rhos.append(rho)
        if rho > 0:
            if rho <= ftol * chisq_old:
                break
            # update params, chisq and a and g
            x0 = x
            chisq_old = chisq_new
            j, a, g = update(x0, f)
#             mu /= 10
            mu *= max(0.3333, 1 - (2 * rho - 1)**3)
            v=2
        else:
            mu *= v
            v *= 2
        D = np.maximum(la.norm(j, axis=0), D)
            
    if method == "mle":
        # remember we return the data with f?
        f = f[0]
    infodict = dict(fvec=f, nfev=ev, rhos=rhos, chis=chis)
    popt, pcov, errmsg, ier = x, None, None, 1
    if full_output:
        return popt, pcov, infodict, errmsg, ier
    else:
        return popt, pcov

In [ ]:
from scipy.linalg import solve_triangular


def _wrap_func_ls(func, xdata, ydata, transform):
    if transform is None:
        def func_wrapped(params):
            return func(xdata, *params) - ydata
    elif transform.ndim == 1:
        def func_wrapped(params):
            return transform * (func(xdata, *params) - ydata)
    else:
        # Chisq = (y - yd)^T C^{-1} (y-yd)
        # transform = L such that C = L L^T
        # C^{-1} = L^{-T} L^{-1}
        # Chisq = (y - yd)^T L^{-T} L^{-1} (y-yd)
        # Define (y-yd)' = L^{-1} (y-yd)
        # by solving
        # L (y-yd)' = (y-yd)
        # and minimize (y-yd)'^T (y-yd)'
        def func_wrapped(params):
            return solve_triangular(transform, func(xdata, *params) - ydata, lower=True)
    return func_wrapped


def _wrap_jac_ls(jac, xdata, transform):
    if transform is None:
        def jac_wrapped(params):
            return jac(xdata, *params)
    elif transform.ndim == 1:
        def jac_wrapped(params):
            return transform[:, np.newaxis] * np.asarray(jac(xdata, *params))
    else:
        def jac_wrapped(params):
            return solve_triangular(transform, np.asarray(jac(xdata, *params)), lower=True)
    return jac_wrapped

def curve_fit_dph(f, xdata, ydata, p0=None, sigma=None, absolute_sigma=False,
                  check_finite=True, bounds=(-np.inf, np.inf), method=None,
                  jac=None, **kwargs):
    """
    Use non-linear least squares or Poisson maximum likelihood to fit a function, f, to data.
    For ``method="mle"``, assumes ``ydata ~ Poisson(f(xdata, *params))``.
    Parameters
    ----------
    f : callable
        The model function, f(x, ...).  It must take the independent
        variable as the first argument and the parameters to fit as
        separate remaining arguments.
    xdata : An M-length sequence or an (k,M)-shaped array for functions with k predictors
        The independent variable where the data is measured.
    ydata : M-length sequence
        The dependent data --- nominally f(xdata, ...)
    p0 : None, scalar, or N-length sequence, optional
        Initial guess for the parameters.  If None, then the initial
        values will all be 1 (if the number of parameters for the function
        can be determined using introspection, otherwise a ValueError
        is raised).
    sigma : None or M-length sequence or MxM array, optional
        Determines the uncertainty in `ydata`. If we define residuals as
        ``r = ydata - f(xdata, *popt)``, then the interpretation of `sigma`
        depends on its number of dimensions:
            - A 1-d `sigma` should contain values of standard deviations of
              errors in `ydata`. In this case, the optimized function is
              ``chisq = sum((r / sigma) ** 2)``.
            - A 2-d `sigma` should contain the covariance matrix of
              errors in `ydata`. In this case, the optimized function is
              ``chisq = r.T @ inv(sigma) @ r``.
              .. versionadded:: 0.19
        None (default) is equivalent of 1-d `sigma` filled with ones.
    absolute_sigma : bool, optional
        If True, `sigma` is used in an absolute sense and the estimated parameter
        covariance `pcov` reflects these absolute values.
        If False, only the relative magnitudes of the `sigma` values matter.
        The returned parameter covariance matrix `pcov` is based on scaling
        `sigma` by a constant factor. This constant is set by demanding that the
        reduced `chisq` for the optimal parameters `popt` when using the
        *scaled* `sigma` equals unity. In other words, `sigma` is scaled to
        match the sample variance of the residuals after the fit.
        Mathematically,
        ``pcov(absolute_sigma=False) = pcov(absolute_sigma=True) * chisq(popt)/(M-N)``
    check_finite : bool, optional
        If True, check that the input arrays do not contain NaNs or infs,
        and raise a ValueError if they do. Setting this parameter to
        False may silently produce nonsensical results if the input arrays
        do contain nans. Default is True.
    bounds : 2-tuple of array_like, optional
        Lower and upper bounds on independent variables. Defaults to no bounds.
        Each element of the tuple must be either an array with the length equal
        to the number of parameters, or a scalar (in which case the bound is
        taken to be the same for all parameters.) Use ``np.inf`` with an
        appropriate sign to disable bounds on all or some parameters.
        .. versionadded:: 0.17
    method : {'lm', 'trf', 'dogbox'}, optional
        Method to use for optimization.  See `least_squares` for more details.
        Default is 'lm' for unconstrained problems and 'trf' if `bounds` are
        provided. The method 'lm' won't work when the number of observations
        is less than the number of variables, use 'trf' or 'dogbox' in this
        case.
        
        "ls", "mle"
        What type of estimator to use. Maximum likelihood ("mle") assumes that the noise
        in the measurement is poisson distributed while least squares ("ls") assumes
        normally distributed noise. "pyls" is a python implementation, for testing only
        
        .. versionadded:: 0.17
    jac : callable, string or None, optional
        Function with signature ``jac(x, ...)`` which computes the Jacobian
        matrix of the model function with respect to parameters as a dense
        array_like structure. It will be scaled according to provided `sigma`.
        If None (default), the Jacobian will be estimated numerically.
        String keywords for 'trf' and 'dogbox' methods can be used to select
        a finite difference scheme, see `least_squares`.
        .. versionadded:: 0.18
    kwargs
        Keyword arguments passed to `leastsq` for ``method='lm'`` or
        `least_squares` otherwise."""
    if method is None:
        method = "lm"
        
    if method in {'lm', 'trf', 'dogbox'}:
        return curve_fit(f, xdata, ydata, p0, sigma, absolute_sigma,
                  check_finite, bounds, method,
                  jac, **kwargs)
    elif method == "ls":
        _wrap_func = _wrap_func_ls
        _wrap_jac = _wrap_jac_ls
    elif method == "mle":
        _wrap_func = _wrap_func_mle
        _wrap_jac = _wrap_jac_mle
    else:
        raise TypeError("Method {} not recognized".format(method))
    
    if p0 is None:
        raise NotImplementedError("You must give a guess")
    if sigma is not None:
        raise NotImplementedError("Weighting has not been implemented")
    else:
        transform = None
        
    if jac is None:
        raise NotImplementedError("You need a Jacobian")
        
    # NaNs can not be handled
    if check_finite:
        ydata = np.asarray_chkfinite(ydata)
    else:
        ydata = np.asarray(ydata)

    if isinstance(xdata, (list, tuple, np.ndarray)):
        # `xdata` is passed straight to the user-defined `f`, so allow
        # non-array_like `xdata`.
        if check_finite:
            xdata = np.asarray_chkfinite(xdata)
        else:
            xdata = np.asarray(xdata)
            
    func = _wrap_func(f, xdata, ydata, transform)
    if callable(jac):
        jac = _wrap_jac(jac, xdata, transform)
    elif jac is None and method != 'lm':
        jac = '2-point'

    # Remove full_output from kwargs, otherwise we're passing it in twice.
    return_full = kwargs.pop('full_output', False)
    res = lm(func, p0, Dfun=jac, full_output=1, method=method, **kwargs)
    popt, pcov, infodict, errmsg, ier = res
    cost = np.sum(infodict['fvec'] ** 2)
    if ier not in [1, 2, 3, 4]:
        raise RuntimeError("Optimal parameters not found: " + errmsg)

    if return_full:
        return popt, pcov, infodict, errmsg, ier
    else:
        return popt, pcov

In [ ]:
%%time
popt_dph, pcov, infodict, errmsg, ier = curve_fit_dph(
    test_func, x, y, p0=pguess, full_output=True,
    jac=test_func_jac, method="ls")

In [ ]:
infodict

In [ ]:
y_fit = test_func(x, *popt_dph)
plt.plot(x, y, ".")
plt.plot(x, y_fit)

In [ ]:
np.allclose(popt_dph, popt_sp)

In [ ]:
preal_ls = np.array([210, 0.25, .04, 500, 0.7, 0.07, 45])
pguess_ls = np.array([ 100  ,  0.23,  0.03,  500.  ,  0.65 ,  0.05 ,  45.  ])

x_ls = np.linspace(0,1, 81)
y_real_ls = test_func(x_ls, *preal_ls)

fig, (ax0, ax1) = plt.subplots(2)
ax0.plot(x_ls, y_real_ls, "--", label="Real")

y_ls = y_real_ls + np.random.randn(len(y_real_ls))*20

y_guess_ls = test_func(x_ls, *pguess_ls)
ax0.plot(x_ls, y_ls, ".")
ax0.plot(x_ls, y_guess_ls)

ax0.plot(x_ls, y_ls, ".")
for meth in ("mle", "ls", "lm"):
    def opt():
        return curve_fit_dph(
                    test_func, x_ls, y_ls, p0=pguess_ls, full_output=True,
                    jac=test_func_jac, method=meth)
    popt, pcov, infodict, errmsg, ier = opt()
    y_fit_ls = test_func(x_ls, *popt)
    ax0.plot(x_ls, y_fit_ls, label=meth)
    print(meth, "took", infodict["nfev"])
    print(popt)
    try:
        ax1.semilogy(infodict["rhos"])
    except KeyError:
        pass
    %timeit opt()
ax0.legend()

%lprun -f curve_fit_dph curve_fit_dph(test_func, x, y, p0=pguess, full_output=True, jac=test_func_jac, method="ls")

Test MLE functions


In [ ]:
preal_mle = np.array([10, 0.25, .04, 15, 0.7, 0.07, 5])

x_mle = np.linspace(0,1, 256)
y_real_mle = test_func(x_mle, *preal_mle)

plt.plot(x_mle, y_real_mle, "--", label="Real")

y_mle = np.random.poisson(y_real_mle)

pguess_mle = np.array([ 10  ,  0.20,  0.03,  20.  ,  0.6 ,  0.05 ,  1.  ])
y_guess_mle = test_func(x_mle, *pguess_mle)
plt.plot(x_mle, y_mle, ".")
plt.plot(x_mle, y_guess_mle)

plt.plot(x_mle, y_mle, ".")
for meth in ("mle", "ls", "lm"):
    def opt():
        return curve_fit_dph(
                    test_func, x_mle, y_mle, p0=pguess_mle, full_output=True,
                    jac=test_func_jac, method=meth)
    popt, pcov, infodict, errmsg, ier = opt()
    y_fit_mle = test_func(x_mle, *popt)
    plt.plot(x_mle, y_fit_mle, label=meth)
    %timeit opt()
plt.legend()

Calculating the Hessian

The question is how to efficiently calculate the linear term of the MLE Hessian,

$$ \sum_i \frac{\partial M_i}{\partial x_k} \frac{\partial M_i}{\partial x_j} \frac{y_i}{M_i^2} $$

Where $\mathbf{M}$ is the model function and $\mathbf{y}$ is the observed data.
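
A minimal derivation, using the $\chi^2_{MLE}$ defined above: differentiating $\sum_i [M_i - y_i - y_i \ln(M_i/y_i)]$ with respect to the parameters, and dropping the term containing second derivatives of $\mathbf{M}$ from the Hessian, gives

$$ \frac{\partial \chi^2_{MLE}}{\partial x_j} = \sum_i \left( 1 - \frac{y_i}{M_i} \right) \frac{\partial M_i}{\partial x_j}, \qquad \frac{\partial^2 \chi^2_{MLE}}{\partial x_j \partial x_k} \approx \sum_i \frac{y_i}{M_i^2} \frac{\partial M_i}{\partial x_j} \frac{\partial M_i}{\partial x_k} $$

which is exactly what `update_mle` computes: `g = j.T @ (1 - y_f)` and `a = (j.T * (y_f / f)) @ j`.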

Because the linear term is just $\mathbf{J}^T \mathrm{diag}(\mathbf{f}) \mathbf{J}$ for a weight vector $\mathbf{f}$, the weights can be applied to either factor:

$$ (\mathbf{J}^T \odot \mathbf{f})\mathbf{J} = \mathbf{J}^T \mathrm{diag}(\mathbf{f})\,\mathbf{J} = \mathbf{J}^T(\mathbf{J} \odot \mathbf{f}) $$

where $\odot$ signifies element-wise (broadcasting) multiplication; the timings below check which ordering is faster.


In [ ]:
ta, tb = np.random.randn(81, 7), np.random.randn(81)
%timeit (ta.T @ (ta * tb[:, None]))
%timeit ((ta.T * tb) @ ta)
np.allclose((ta.T @ (ta * tb[:, None])), ((ta.T * tb) @ ta))

In [ ]:
np.set_printoptions(precision=3)
print(ta.T @ ta)

In [ ]:
((ta.T * tb) @ ta)

In [ ]:
(ta.T @ (ta * tb[:, None]))

Compare methods


In [ ]:
preal_mle = np.array([10, 0.25, .04, 15, 0.7, 0.07, 5])

x_mle = np.linspace(0,1, 256)
y_real_mle = test_func(x_mle, *preal_mle)

plt.plot(x_mle, y_real_mle, "--", label="Real")

y_mle = np.random.poisson(y_real_mle)

pguess_mle = np.array([ 10  ,  0.20,  0.03,  20.  ,  0.6 ,  0.05 ,  1.  ])
y_guess_mle = test_func(x_mle, *pguess_mle)
plt.plot(x_mle, y_mle, ".")
plt.plot(x_mle, y_guess_mle)

popt_dph_mle, pcov, infodict, errmsg, ier = curve_fit_dph(
    test_func, x_mle, y_mle, p0=pguess_mle, full_output=True,
    jac=test_func_jac, method="mle")

popt_dph_ls, pcov, infodict, errmsg, ier = curve_fit_dph(
    test_func, x_mle, y_mle, p0=pguess_mle, full_output=True,
    jac=test_func_jac, method="ls")

popt_ls, pcov, infodict, errmsg, ier = curve_fit_dph(
    test_func, x_mle, y_mle, p0=pguess_mle, full_output=True,
    jac=test_func_jac, method="lm")

# popt_sp, pcov, infodict, errmsg, ier = curve_fit(
#     test_func, x_mle, y_mle, p0=pguess, full_output=True,
#     jac=test_func_jac)


plt.plot(x_mle, y_mle, ".")
for popt, l in zip((popt_dph_mle, popt_dph_ls, popt_ls), ("popt_dph_mle", "popt_dph_ls", "popt_ls")):
    y_fit_mle = test_func(x_mle, *popt)
    plt.plot(x_mle, y_fit_mle, label=l)
plt.legend()
(preal_mle, popt_dph_mle, popt_dph_ls, popt_ls)

Scaling matters

Looks like we need to take the time to properly scale the problem; otherwise convergence will be very slow.
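
For reference, the scaling that `lm` above already applies replaces the plain $\mu \mathbf{I}$ damping with column-norm weights,

$$ \left( \mathbf{J}^T \mathbf{J} + \mu \, \mathrm{diag}(D_1^2, \ldots, D_N^2) \right) \delta\mathbf{x} = -\mathbf{g}, \qquad D_k = \max_{\mathrm{iterations}} \lVert \mathbf{J}_{:,k} \rVert, $$

i.e. the `D = np.maximum(la.norm(j, axis=0), D)` update. The cell below sweeps the constant offset to probe how sensitive the different methods are to the overall scale of the problem.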


In [ ]:
for meth in ("ls", "lm"):
    for offset in (0, 1, 10, 100):
        plt.figure()
        preal_ls = np.array([1, 0.25, .04, 2, 0.7, 0.07, offset])
        pguess_ls = np.array([ 1.1  ,  0.23,  0.03,  2.  ,  0.65 ,  0.05 ,  offset])

        x_ls = np.linspace(0,1, 81)
        y_real_ls = test_func(x_ls, *preal_ls)

        plt.plot(x_ls, y_real_ls, "--", label="Real")

        y_ls = y_real_ls + np.random.randn(len(y_real_ls))*0.1

        y_guess_ls = test_func(x_ls, *pguess_ls)
        plt.plot(x_ls, y_ls, ".")
        plt.plot(x_ls, y_guess_ls)

        plt.plot(x_ls, y_ls, ".")
        def opt():
            return curve_fit_dph(
                        test_func, x_ls, y_ls, p0=pguess_ls, full_output=True,
                        jac=test_func_jac, method=meth)
        popt, pcov, infodict, errmsg, ier = opt()
        y_fit_ls = test_func(x_ls, *popt)
        plt.plot(x_ls, y_fit_ls, label=meth)
        plt.title("meth = {} and offset = {} took {} iterations".format(meth, offset, infodict["nfev"]))
        %timeit opt()
        plt.legend()
