Fit a Step – Exercise 7

Based on the Intrinsic Dispersion fitting class you created last week, create a new class that allows you to fit a step.

Let's imagine a physical process with a transition at a given threshold value. An astrophysical example: Cepheids (pulsating stars) have an average magnitude (luminosity on a log scale) m0 if their pulsation period is lower than, say, 10, and an average magnitude m1 otherwise (nb: this is not true in real life). Each group has a natural dispersion, sigma0 and sigma1 respectively.

Goal

The pulsation period and magnitude of each star have been measured (see data), and we want to derive the average magnitudes (m0, m1) and natural dispersions (sigma0, sigma1) of the two Cepheid groups.

Data

The data are recorded in data/Cepheid_StepFit.dat

About the data and the probability for a star to be in the low- or high-pulsation group

The data have errors on both the pulsation and magnitude measurements. All errors are Gaussian. Because of the errors on the pulsation measurements, a given star has a probability p0 of belonging to the low-pulsation group (below 10) and a probability p1 = 1 - p0 of belonging to the high-pulsation one.
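
With xcut denoting the location of the step, the model implemented by the fitting class below can be written explicitly. For a star with measured pulsation x ± δx and magnitude m ± δm:

$$ p_0 = \Phi\!\left(\frac{x_{\rm cut} - x}{\delta x}\right), \qquad p_1 = 1 - p_0 $$

$$ \mathcal{L} = p_0\,\mathcal{N}\!\left(m;\, m_0,\ \sqrt{\sigma_0^2 + \delta m^2}\right) + p_1\,\mathcal{N}\!\left(m;\, m_1,\ \sqrt{\sigma_1^2 + \delta m^2}\right) $$

where $\Phi$ is the standard normal CDF, and the fit minimizes $-\sum \ln \mathcal{L}$ over $(m_0, m_1, \sigma_0, \sigma_1)$.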


In [2]:
# No annoying warnings
import warnings
warnings.filterwarnings('ignore')
# ==  Basic import == #
# plot within the notebook
%matplotlib inline
import numpy as np
import matplotlib.pyplot as mpl

Tools for the fit


In [22]:
from scipy.optimize import minimize
from scipy import stats
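
As a quick reminder of the scipy API used in this exercise (a toy example, unrelated to the Cepheid data): minimize(fun, x0) returns an OptimizeResult whose "x" entry holds the best-fit parameters and, with the default BFGS method, whose "hess_inv" entry holds an approximate inverse Hessian used later for rough error bars.

In [ ]:
# Toy check of the minimize API: recover the minimum of a paraboloid.
toy_chi2 = lambda p: (p[0] - 1.)**2 + (p[1] + 2.)**2
res = minimize(toy_chi2, [0., 0.])
print(res["x"])        # should be close to [1, -2]
print(res["hess_inv"]) # 2x2 approximate inverse Hessian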

Step 1 - Read the data


In [23]:
import astropy.table as t

In [24]:
table = t.Table.read("data/Cepheid_StepFit.dat", format="ascii")
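
Columns of an astropy Table behave like numpy arrays, so they can be inspected and converted directly (column names as shown in the output below):

In [ ]:
# Quick look at the table content before fitting.
print(len(table), table.colnames)
pulsation = np.asarray(table["pulsation"])   # measured pulsation periods
mag       = np.asarray(table["mag"])         # measured magnitudes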

Sorry for the negative errors: a bug while building the fake data.
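
The fitting class below takes the absolute value of the errors at initialization, so the table can be used as-is; alternatively, the error columns can be fixed in place:

In [ ]:
# Optional: make the error columns positive directly in the table.
table["puls_err"] = np.abs(table["puls_err"])
table["mag_err"]  = np.abs(table["mag_err"])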


In [25]:
table


Out[25]:
<Table length=200>
 pulsation    puls_err      mag      mag_err
  float64     float64     float64    float64
    ...         ...         ...        ...

Step 2 - Build the Fitting class


In [35]:
class StepFit( object ):
    """ Class to fit a bimodal guassian distribution """
    
    FREEPARAMETERS = ["mean_a", "mean_b", "sigma_a", "sigma_b"]
    NAME = "StepFit"
    def __init__(self, x, y, xerr, yerr, xcut=None):
        """ Initialize the instance
        Parameter:
        ----------
        x,y,dx,dy: [N-length array]
            x and y-axis data with their respective errors.
        Return
        ------
        Void
        """
        if len(x) != len(xerr) or len(y) != len(yerr) or len(x) != len(y):
            raise ValueError("x, y , xerr and yerr must have the same size.")
        
        self.x     = np.asarray(x)
        self.xerr  = np.abs(np.asarray(xerr))
        self.y     = np.asarray(y)
        self.yerr  = np.abs(np.asarray(yerr))
        if xcut is not None:
            self.set_proba(xcut)
        
    # ---------------- #
    #  Main Methods    #
    # ---------------- #
    def fit(self, guesses, verbose=True):
        """ """
        # - Setup
        self.set_guesses(guesses)
        # - Fit
        self.fitout = minimize(self.minus_loglikelihood, self.guesses)
        if verbose:
            print(self.fitout)
        
    def minus_loglikelihood(self, parameters):
        """ The - sum (log(Likelihood)) for the given parameters
        This method can be used for fitting procedure
        
        Parameters
        ----------
        parameters: [N-length array]
            Values setting the model. N must be the size of mode's
            freeparameters (see self.FREEPARAMETERS)
            
        Return
        ------
        float
        """
        # - Test if the probability has been initialized
        if not hasattr(self, "proba_a"):
            raise AttributeError("The probability of each object to be from 'a' or 'b' has not been set. See set_proba().")
            
        mean_a,mean_b, sigma_a, sigma_b = parameters
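        # Per-star mixture likelihood: probability-weighted sum of the two
        # Gaussian components, with the measurement error added in quadrature
        # to each intrinsic dispersion.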
        
        Likelihood = self.proba_a * stats.norm.pdf(self.y, 
                                                   loc=mean_a, scale= np.sqrt(sigma_a**2 + self.yerr**2)) +\
                 (1-self.proba_a) * stats.norm.pdf(self.y, 
                                                   loc=mean_b, scale= np.sqrt(sigma_b**2 + self.yerr**2))
            
        return -np.sum(np.log(Likelihood))

    # --------- #
    #  SETTER   #
    # --------- #
    def set_proba(self, xcut):
        """ define the probability of each points
        to be above or below the xcut given x and xerr.
        
        Parameters
        ----------
        xcut: [float]
            Value defining the location of the step.
            
        Return
        ------
        Void
        """
        if not isinstance(xcut, (int, float)):
            raise TypeError("xcut must be a number.")
        
        self.xcut = xcut
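        # Probability that the true pulsation lies below xcut given a Gaussian
        # measurement x +/- xerr, i.e. the CDF of N(x, xerr) evaluated at xcut.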
        self.proba_a = stats.norm.cdf(xcut, loc=self.x, scale=self.xerr)
        
    def set_guesses(self, guesses):
        """ set the initial fit parameters 
        
        Parameters
        ----------
        guesses: [N-length array]
            the Initial values for the fit procedure. 
            N must be the number of free parameters.
            (see self.FREEPARAMETERS)
            
        Return
        ------
        Void
        """
        if len(guesses) != len(self.FREEPARAMETERS):
            raise ValueError("guess must have %d entries (%d given)"%(len(self.FREEPARAMETERS), len(guesses)))
        self.guesses = np.asarray(guesses)
        
    
    # --------- #
    #  PLOTTER  #
    # --------- #
    def plot(self, xbins=10, ybins=10, **kwargs):
        """ plot the data and the associated best fit (if any) """
        
        has_proba = hasattr(self, "proba_a")
        fit_done  = hasattr(self, "fitout")
        
        # ----------- #
        #  Axes       #
        # ----------- #
        fig = mpl.figure(figsize=[10,8])
        ax_size=0.63
        # rect (left, lower, width, height) in fig coords.
        ax  = fig.add_axes([0.15,0.15,ax_size,ax_size])
        axhx = fig.add_axes([0.15,0.8,ax_size,0.18],sharex=ax)
        axhy = fig.add_axes([0.8,0.15,0.18,ax_size],sharey=ax)
        
        # - remove the ticks labels
        [[label.set_visible(False) for label in lticks]
            for lticks in [axhx.get_xticklabels(),axhx.get_yticklabels()]]
        [[label.set_visible(False) for label in lticks]
            for lticks in [axhy.get_xticklabels(),axhy.get_yticklabels()]]
        # ----------- #
        #  Scatter    #
        # ----------- #
        # - Scatter
        prop = dict(marker="o", s=80, linewidths=1, cmap=mpl.cm.Blues,
                    vmin=0, vmax=1, zorder=6)
        # color-code each point by its probability of belonging to group 'a'
        prop["c"] = np.full(len(self.x), 0.5) if not has_proba else self.proba_a
        
        # user-provided kwargs override the defaults
        prop.update(kwargs)
        # (in Python 3.5+ this could also be written as prop = {**prop, **kwargs})
            
        ax.scatter(self.x,self.y, **prop)
        ax.errorbar(self.x, self.y, xerr=self.xerr, yerr=self.yerr,
                    ecolor="0.7", zorder=0.3, marker="s",ls="None")
        # ----------- #
        #  Histo      #
        # ----------- #
        prop_hist = dict(histtype="step", lw=2,ec="k")
        if not has_proba:
            axhx.hist(self.x, xbins, **prop_hist)
            axhy.hist(self.y, ybins, 
                      orientation="horizontal",**prop_hist)
        else:
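            # Weight each star's contribution to the histograms by its
            # probability of belonging to each group.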
            
            # - b-group
            axhx.hist(self.x, xbins, weights = 1-self.proba_a,
                      fill=False,
                      **prop_hist)
            axhy.hist(self.y, ybins, weights = 1-self.proba_a,
                      fill=False, 
                      orientation="horizontal",
                      **prop_hist)
            # - a-group
            prop_hist["lw"] = 1
            prop_hist["ec"] = prop["cmap"](1.,0.8)
            axhx.hist(self.x, xbins, weights = self.proba_a,
                      fill=True, fc= prop["cmap"](1.,0.4),
                      **prop_hist)
            axhy.hist(self.y, ybins, weights = self.proba_a,
                      fill=True, fc= prop["cmap"](1.,0.4),
                      orientation="horizontal",
                      **prop_hist)
            
        # ------------- #
        #  Fit Results  #
        # ------------- #
        if fit_done:
            mean_a,mean_b,sigma_a, sigma_b = self.fitout["x"]
            mean_a_err,mean_b_err = np.sqrt(self.fitout["hess_inv"][0][0]),np.sqrt(self.fitout["hess_inv"][1][1])
            
            ax.plot([self.xcut,20], [mean_b,mean_b], scalex=False, lw=0.5, color="k")
            
            ax.plot([0,self.xcut], [mean_a,mean_a], scalex=False, lw=0.5, color=prop["cmap"](0.9,0.9))
            
            ax.fill_between([self.xcut,20], [mean_b+mean_b_err,mean_b+mean_b_err],
                            [mean_b-mean_b_err,mean_b-mean_b_err],
                            color="k", alpha=0.3)
            
            ax.fill_between([0,self.xcut], [mean_a+mean_a_err,mean_a+mean_a_err],
                            [mean_a-mean_a_err,mean_a-mean_a_err],
                            color=prop["cmap"](0.9,0.9), alpha=0.3)
            
            
        ax.set_ylim(-1.5,1.5)
        ax.set_xlim(0,20)

In [36]:
step = StepFit(table["pulsation"], table["mag"], table["puls_err"], table["mag_err"], xcut=10.)

In [39]:
step.fit([-1,1,0.3,0.4], verbose=False)
step.plot(marker="s",s=130, cmap=mpl.cm.jet)



In [9]:
print(step.fitout)


      fun: 15.312346480672295
 hess_inv: array([[  2.08478283e-04,   6.80472799e-06,  -5.68036183e-06,
         -4.26019109e-07],
       [  6.80472799e-06,   4.99504345e-04,   3.54875255e-06,
         -3.10266249e-05],
       [ -5.68036183e-06,   3.54875255e-06,   1.81977416e-04,
         -1.50470987e-06],
       [ -4.26019109e-07,  -3.10266249e-05,  -1.50470987e-06,
          3.25548910e-04]])
      jac: array([  4.76837158e-07,   2.38418579e-07,   5.96046448e-07,
         5.96046448e-07])
  message: 'Optimization terminated successfully.'
     nfev: 258
      nit: 22
     njev: 43
   status: 0
  success: True
        x: array([-0.33292375,  0.41510775,  0.08826342,  0.17480126])
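
The approximate inverse Hessian gives rough 1-sigma uncertainties on the fitted parameters (this is what the plot method uses for the shaded bands around the two means); a minimal sketch:

In [ ]:
# Rough parameter uncertainties from the diagonal of the approximate
# inverse Hessian; the parameter order follows StepFit.FREEPARAMETERS.
errors = np.sqrt(np.diag(step.fitout["hess_inv"]))
for name, value, err in zip(StepFit.FREEPARAMETERS, step.fitout["x"], errors):
    print("%s = %.3f +/- %.3f" % (name, value, err))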

Example Kwargs



In [15]:
x = np.random.random(100)
y = np.random.random(100)
yerr = np.random.random(100)/3.

In [18]:
def scatter_it(x, y, yerr, prop={}, **kwargs):
    """ Plot the points with their error bars.
    **kwargs go to matplotlib's plot method,
    prop goes to matplotlib's errorbar method.
    """
    fig = mpl.figure()
    ax = fig.add_subplot(111)
    ax.plot(x,y, **kwargs)
    ax.errorbar(x,y,yerr=yerr,**prop)

In [20]:
scatter_it(x,y,yerr,prop=dict(ecolor="0.7"), ls="None", marker="o")
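
Any keyword argument not captured by prop is forwarded to plot, so the point style and the error-bar style can be set independently, e.g. (an illustrative call on the same fake data):

In [ ]:
scatter_it(x, y, yerr, prop=dict(ecolor="r", ls="None"), marker="s", ls="None", color="k")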


