Fitting of dqdv peaks

The purpose of this notebook is to evaluate and develope a robust way of fitting dqdv data. The plan is then to implement this into the cellpy.utils.ica module (as seperate classes). It would also be valuable to equip the fitting class(es) with optional ipywidgets.


In [ ]:
%load_ext autoreload
%autoreload 2

In [ ]:
import os
import copy
import logging
import bokeh
import numpy as np
import matplotlib.pyplot as plt
import pandas as pd
from cellpy import cellreader
from cellpy.utils import ica
import holoviews as hv

import ipywidgets as widgets
from ipywidgets import interact, interact_manual

%matplotlib inline
hv.extension('bokeh')

logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)

my_data = cellreader.CellpyData()
filename = "../../../testdata/hdf5/20160805_test001_45_cc.h5"
assert os.path.isfile(filename)
my_data.load(filename)
my_data.set_mass(0.1)

from icafit import *

Defining dqdv peak ensambles

The natural way (and my impression is that this is how other groups also do it) of conducting an "in-depth" ica study on a LiB cell would be to measure ica of re-buildt half-cells of both the cathode and the anode of the full cells, fit the peaks of the half cells, and then a use convolution of these fits to fit the actual full cell.

Examples from the literature

Should include references here...

Plan

  1. Create a class (PeakEnsamble)
  2. Create peak ensambles by sub-classing PeakEnsamble

ToDo

  • [x] Fix so that it is possible to turn crystalline peak on and off for Si peaks
  • [ ] Set peak attributes directly
  • [ ] Make it easy (and obvious) to use previous fit-prms for new fit
  • [x] Use __add__
  • [x] Combine all fit-results into one dataframe
  • [ ] Make a ipywidget for at least one of the prms (e.g. scale)
  • [ ] Make it possible to freeze peaks and ensambles
  • [ ] Make it possible to zero out peaks?
  • [x] Fit negative of discharge curves
  • [ ] Fit discharge and charge in one go? ("hysteresis parameter")

In [ ]:


In [ ]:
logger.setLevel(logging.DEBUG)

In [ ]:
import logging
from colorama import Fore
import ipywidgets as widgets

class log_viewer(logging.Handler):
    """ Class to redistribute python logging data """

    # have a class member to store the existing logger
    logger_instance = logging.getLogger("__name__")

    def __init__(self, output=None, up_side_down=True, max_lines=20, *args, **kwargs):
        self._output = output
        self.up_side_down = up_side_down
        self.max_lines = max_lines
        if self._output is None:
            self._output = widgets.Output(layout=widgets.Layout(width='600px', height='160px', border='solid')) 
        self._output.layout.overflow_y = "scroll"
 
        # Initialize the Handler
        logging.Handler.__init__(self, *args)

        # optional take format
        # setFormatter function is derived from logging.Handler
        for key, value in kwargs.items():
            if "{}".format(key) == "format":
                self.setFormatter(value)

        # make the logger send data to this class
        self.logger_instance.addHandler(self)
        
    @property
    def output(self):
        return self._output
        

    def emit(self, record):
        """ Overload of logging.Handler method """

        record = self.format(record)
        
        if self.up_side_down:
            self.output.outputs = (
                {
                    'name': 'stdout', 
                    'output_type': 
                    'stream', 
                    'text': (Fore.BLACK + (record + '\n'))
                },
            ) + self.output.outputs[:self.max_lines]
            
        else:
            self.output.outputs = self.output.outputs[-self.max_lines:] + (
                {
                    'name': 'stdout', 
                    'output_type': 'stream', 
                    'text': (Fore.BLACK + (record + '\n'))
                },
            )

Widgets


In [ ]:
import ipywidgets as widgets
from IPython.display import display
import matplotlib.pyplot as plt
%matplotlib inline

In [ ]:
class SiliconPeaksFitWidget(widgets.VBox):
    def __init__(self, silicon_peaks, x, y, name=None):
        """
        """
        #    [ ]             jitter
        #    [x] |-----o--|  scale
        #    [x] |-----o--|  shift
        #    [x] |-----o--|  sigma_p1
        #    [x] |-----o--|  max_point
        # TODO:
        # Need to implement result (and parameters) and update (as default) + button for reset
        # Need to make this work for CompositeEnsamble
        
        self.peaks_object = silicon_peaks
        self.x = x
        self.y = y
        self.invert_res = False
        self.invert_dq = False
        
        self.result = None
        
        if name is None:
            name = self.peaks_object.name
            
        jitter = self.peaks_object.jitter
        scale = self.peaks_object.scale
        offset = self.peaks_object.offset
        offset_vary = self.peaks_object.params["SiOffsetc"].vary
        shift = self.peaks_object.shift
        sigma_p1 = self.peaks_object.sigma_p1
        max_point = self.peaks_object.max_point
        crystalline = self.peaks_object.crystalline
        compress = self.peaks_object._compress
        expand = self.peaks_object._expand
        
        self.plot_output = widgets.Output()
        self.log_output = widgets.Output()
        self.output_to_log = False
        self.plot_fixed = False
        self.auto_clear_log = True
        
        self.w_name = widgets.Label(f"{name}")
        
        self.w_jitter = widgets.Checkbox(value=jitter, description="jitter")
        self.w_crystalline = widgets.Checkbox(value=crystalline, description="crystalline")
        
        self.w_scale = widgets.FloatSlider(
            value=scale, 
            min=0.00001,
            max=100*scale,
            continuous_update=False,
            description="scale",  # Currently using the description to link up to params (so you have to live with the bad names)
        )
        
        self.w_offset = widgets.FloatSlider(
            value=offset, 
            min=0.0,
            max=max_point,
            continuous_update=False,
            description="offset",  # Currently using the description to link up to params (so you have to live with the bad names)
        )
        self.w_offset_vary = widgets.Checkbox(value=offset_vary, description="offset_vary")
        
        self.w_shift = widgets.FloatSlider(
            value=shift, 
            min = -1,
            max = 1,
            step = 0.01,
            continuous_update=False,
            description="shift",
        )
        
        self.w_sigma_p1 = widgets.FloatSlider(
            value=sigma_p1, 
            min=0.000000001,
            max=10*sigma_p1,
            step=0.01,
            continuous_update=False,
            description="sigma_p1",
        )
        
        self.w_max_point = widgets.FloatSlider(
            value=max_point, 
            min = 0.00001,
            max = 10*max_point,
            continuous_update=False,
            description="max_point",
        )
        
        self.w_fit = widgets.Button(
            description="Fit!"
        )
        
        self.clear = widgets.Button(
            description="Clear!"
        )

        self.w_fit.on_click(self.fit)
        self.clear.on_click(self.clear_log)
       
        self.w_jitter.observe(self.on_w_change, 'value')
        self.w_scale.observe(self.on_w_change, 'value')
        self.w_offset_vary.observe(self.on_w_change, 'value')
        self.w_offset.observe(self.on_w_change, 'value')
        self.w_shift.observe(self.on_w_change, 'value')
        self.w_sigma_p1.observe(self.on_w_change, 'value')
        self.w_max_point.observe(self.on_w_change, 'value')
        self.w_crystalline.observe(self.on_w_change, 'value')
        
        self.widget_ids = ['scale', 'offset', 'shift', 'sigma_p1', 'max_point', 'crystalline']

        super(SiliconPeaksFitWidget, self).__init__()
        
        widget_box = widgets.VBox(
            [
                self.w_name,
                self.w_crystalline,
                self.w_jitter,
                self.w_offset_vary,
                self.w_scale,
                self.w_offset,
                self.w_shift,
                self.w_sigma_p1,
                self.w_max_point,
                widgets.HBox([self.w_fit, self.clear]),
            ]
        )
        
        row_1 = widgets.HBox([widget_box, self.plot_output])
        row_2 = self.log_output
        
        self.children = [
            row_1,
            row_2,
        ]
        
        self.update_plot("initial")
        
    def fit(self, change=None, y=None, x=None):
        # TODO: need to implement turning off y and x 
        
        _old_shift = self.peaks_object.shift
        _old_max_point = self.peaks_object.max_point
        _old_scale = self.peaks_object.scale
        
        if y is None:
            y = self.y
            
        if x is None:
            x = self.x
            
        if self.invert_dq:
            y = - y
        
        self.result = self.peaks_object.fit(y, x=x)
        if self.output_to_log:
            with self.log_output:
                if self.auto_clear_log:
                    self.log_output.clear_output()
                display(self.result)
                print(self.peaks_object)
            
        main_logger.info(f"auto update: {self.peaks_object.auto_update_from_fit}")
        _new_shift = self.peaks_object.shift
        _new_max_point = self.peaks_object.max_point
        _new_scale = self.peaks_object.scale
        main_logger.info(f"shift from {_old_shift} to {_new_shift}")
        main_logger.info(f"max_point from {_old_max_point} to {_new_max_point}")
        main_logger.info(f"scale from {_old_scale} to {_new_scale}")
            
        self.update_plot("new_fit")

        self.plot_fixed = True
        for what in self.widget_ids:
            value = getattr(self.peaks_object, what)
            main_logger.info(f" -> update widget. widget: {what} value: {value}")
            w = "w_" + what
            getattr(self, w).value = value
        self.plot_fixed = False
        
         
    def on_w_change(self, change):
        
        name = change.owner.description
        value = change.new
        main_logger.info(f"change observed name: {name} value: {value}")
        
        if name == "offset_vary":
            main_logger.info(f"old: {self.peaks_object.params['SiOffsetc'].vary} new: {value}")
            self.peaks_object.params["SiOffsetc"].vary = value
        elif name == "jitter":
            setattr(self.peaks_object, name, value)
        else:
            setattr(self.peaks_object, name, value)
            self.peaks_object.init()  # This removes the link to from the parameters to the widgets :-(
            
        self.update_plot(name)
        
    def _create_plot_object(self, components=None, group_title="fit", 
                           invert_dq=False, invert_res=False, width=500, height=500, size=8):

        if self.invert_dq:
            y = - self.y
        else:
            y = self.y

        i = 1
        if self.invert_res:
            i = -1
        main_logger.info("-> creating plot object")
        raw = hv.Points((self.x, y), label="raw", group=group_title).opts(
            width=width, height=height, size=size, alpha=0.3,
            xlabel="Voltage",
            ylabel="dQ/dv"
        )
        if components is not None:
            main_logger.info("-> components are not None")
            prt = {}
            for key in components:
                if not key.endswith("Scale"):
                    prt[key] = hv.Curve((self.x, i * components[key]), group=group_title)
            return raw * hv.NdOverlay(prt)

            
        prt = {
            "init": hv.Curve((self.x, i * self.result.init_fit), group=group_title).opts(alpha=0.5, tools=['hover']),
            "best": hv.Curve((self.x, i * self.result.best_fit), group=group_title).opts(tools=['hover']),
        }

        parts = self.result.eval_components(x=self.x)
        if not parts:
            main_logger.info("-> no parts extracted")
        
        s = self.peaks_object.scale  # set this to 1 if you dont want to use the scale factor when plotting
        for key in parts:
            if not key.endswith("Scale"):
                main_logger.info(f"-> adding {key} to the plot (scaled)")
                prt[key] = hv.Curve((self.x, i * s * parts[key]), group=group_title).opts(tools=['hover'])

        return raw * hv.NdOverlay(prt)

        
    def update_plot(self, name):
        if self.plot_fixed:
            main_logger.info("sorry, plot is fixed")
            return
        
        if name in ["scale", "shift", "offset", "sigma_p1", "max_point", "initial", "crystalline", "new_fit"]:
            
            with self.plot_output:
                if name == "new_fit":
                    main_logger.info("-------new-fit-------")
                    plotwindow = self._create_plot_object()
                else:
                    component = self.peaks_object.peaks.eval(self.peaks_object.params, x=self.x)
                    components = {"Init": component}
                    plotwindow = self._create_plot_object(components=components)
                self.plot_output.clear_output(wait=True)
                display(plotwindow)
            
    def clear_log(self, change=None):
        self.log_output.clear_output()
        
    def experimental(self, change=None):
        with self.log_output:
            print(self.peaks_object)
        
    def set_min(self, what, value):
        w = "w_" + what
        getattr(self, w).min = value
        
    def set_max(self, what, value):
        w = "w_" + what
        getattr(self, w).max = value
        
    def set_value(self, what, value):
        main_logger.info(f"setting value ({value}) to widget (w_{what})")
        w = "w_" + what
        getattr(self, w).value = value
        
    def set_step(self, what, value):
        w = "w_" + what
        getattr(self, w).step = value
        
# ---------------------------------------------------------------------------
visited = {}
options = list(range(1, 10+1))

def reset_cycle(event):
    main_logger.info("-> reset cycle")
    _load_cycle(sel.value, reset=True)
    
def load_cycle(change):
    cycle = change.new
    _load_cycle(cycle, reset=False)

def _load_cycle(cycle, reset):
    
    if not reset and cycle in visited.keys():
        c = visited[cycle]
        
    if reset and cycle in visited.keys():
        old = visited[cycle]
        v = old.x
        dq = old.y
        
        silicon = Silicon(shift=-0.0, max_point=dq.max(), sigma_p1=0.06)
        c = SiliconPeaksFitWidget(silicon, v, dq, f"Cycle {cycle}")
        visited[cycle] = c
        
    if cycle not in visited.keys():
        cha, volt = my_data.get_ccap(cycle)
        v, dq = ica.dqdv(volt, cha)
        
        silicon = Silicon(shift=-0.0, max_point=dq.max(), sigma_p1=0.06)
        c = SiliconPeaksFitWidget(silicon, v, dq, f"Cycle {cycle}")
        visited[cycle] = c
        
    with fit_window:
        fit_window.clear_output(wait=True)
        display(c)
        
def _copy(cycle1, cycle2):
    main_logger.info(f"-> copy from {cycle1} to {cycle2}")
    old = visited[cycle1]
    old_peaks = copy.deepcopy(old.peaks_object)
    cha, volt = my_data.get_ccap(cycle2)
    v, dq = ica.dqdv(volt, cha)
    new = SiliconPeaksFitWidget(old_peaks, v, dq, f"Cycle {cycle2}")
    visited[cycle2] = new
    sel.value = cycle2
        
def copy_cycle_forward(event):
    cycle1 = sel.value
    cycle2 = cycle1 + 1
    if cycle2 in options:
        _copy(cycle1, cycle2)
    else:
        main_logger.info(f"-> {cycle2} does not exist")

def copy_cycle_backward(event):
    cycle1 = sel.value
    cycle2 = cycle1 - 1
    if cycle2 in options:
        _copy(cycle1, cycle2)
    else:
        main_logger.info(f"-> {cycle2} does not exist")

fit_window = widgets.Output()

description = widgets.Label("Select cycle")
sel = widgets.Select(
    options=options,
    value=options[0],
    rows=20,
    disabled=False,
    layout=widgets.Layout(width='70%', height='220px'),
)
sel.observe(load_cycle, 'value')
reset = widgets.Button(description="Reset!")

copy_next = widgets.Button(description="Copy >>")
copy_prev = widgets.Button(description="Copy <<")

reset.on_click(reset_cycle)
copy_next.on_click(copy_cycle_forward)
copy_prev.on_click(copy_cycle_backward)

header = widgets.VBox([description, sel, reset, copy_next, copy_prev])
out = widgets.HBox([header, fit_window])

handler = log_viewer()
main_logger_out = handler.output

main_logger = logging.getLogger(__name__)
main_logger.addHandler(handler)
main_logger.setLevel(20)   # log at info level.

display(main_logger_out)
display(out)
_load_cycle(sel.value, reset=False)

Saving results


In [ ]:
import pickle

In [ ]:
cc = visited[3]

In [ ]:
c3 = cc.result

In [ ]:
b3 = c3.best_values

In [ ]:
pickle_out = open("example.pickle", "wb")

In [ ]:
pickle.dump(b3, pickle_out)

In [ ]:
pickle_out.close()

Loading results


In [ ]:
pickle_in = open("example.pickle","rb")
b3new = pickle.load(pickle_in)

In [ ]:
b3new

In [ ]:
cc.peaks_object.set_param("SiOffsetc", value=99.9999999707220)

In [ ]:
for key in b3new:
    cc.peaks_object.set_param(key, value=b3new[key])

In [ ]:
jsonstr = cc.peaks_object.params.dumps()

In [ ]:
ccc = visited[4]

In [ ]:
s = ccc.peaks_object.params.loads(jsonstr)

In [ ]:
cc.peaks_object.params

In [ ]:
ccc.peaks_object.params

In [ ]:


In [ ]:

Set values by writing


In [ ]:


In [ ]:
cc.set_value('shift', -0.2)

In [ ]:
cc.w_shift.value = 0.001

In [ ]:
cc.peaks_object.params.pretty_print()

In [ ]:
p_si = Silicon()
p_si.params["Si02sigma"]

In [ ]:
prm = "Si02sigma"
step = 0.01
p_si.params[prm].min += step

In [ ]:
# method two implemented in CompositeEnsamble
p_t = CompositeEnsemble(Silicon(), Graphite())
p_t.set_param('Si02sigma', minimum=0.02, vary=False)
p_t.reset_peaks()
print(f"hint: {p_t.param_hints['Si02sigma']} val: {p_t.params['Si02sigma']}")

In [ ]:


In [ ]:

Collecting all the results


In [ ]:


In [ ]:
import pandas as pd
pd.options.display.max_columns = None

In [ ]:
cycles = sorted(visited.keys())
all_dfs = []
for cycle in cycles:
    params = visited[cycle].peaks_object.params
    labels = params.keys()
    columns = list(labels)
    df = pd.DataFrame(columns=columns)
    df.loc[cycle] = None
    for label in labels:
        df.loc[cycle, label] = params[label].value
    all_dfs.append(df)
n_df = pd.concat(all_dfs)
n_df.index.name = "cycle"

In [ ]:
n_df

In [ ]:
n_df.plot(y=["Si01center", "Si02center"])

In [ ]: