How to write a Landlab component

This ipython notebook walks you through the basic procedure for writing a Landlab component, using the example of a kinematic-wave flow model.

Overview

A Landlab component is implemented as a Python class. Although every Landlab component is unique in some respects, to be a component, a class must have at least the following standard ingredients:

(1) The class must inherit the base class Component.

(2) The class must include a set of standard variables defined in the header (i.e., before the __init__ method), which describe the data arrays that the component uses.

(3) The class must have an __init__ method defined, with a semi-standardized parameter list described below.

(4) The class must provide a function that does performs the component's "action", typically named run_one_step() and this function's parameter list must follow the convention described below.

Class definition and header

A Landlab component is a class that inherits from Component. The name of the class should be in CamelCase, and should make sense when used in the sentence: "A (component-name) is a...". The class definition should be followed by a docstring. The docstring should include a list of parameters for the __init__ method and succintly describe them.


In [ ]:
import numpy as np

from landlab import Component, FieldError


class KinwaveOverlandFlowModel(Component):
    """
    Calculate water flow over topography.
    
    Landlab component that implements a two-dimensional 
    kinematic wave model.
    
    You can put other information here... Anything you 
    think a user might need to know. We use numpy style
    docstrings written in restructured text. You can use
    math formatting. 
    
    Useful Links:
    - https://www.sphinx-doc.org/en/master/usage/restructuredtext/basics.html
    - https://sphinxcontrib-napoleon.readthedocs.io/en/latest/example_numpy.html

    Parameters
    ----------
    grid : ModelGrid
        A Landlab grid object.
    precip_rate : float, optional (defaults to 1 mm/hr)
        Precipitation rate, mm/hr
    precip_duration : float, optional (defaults to 1 hour)
        Duration of precipitation, hours
    infilt_rate : float, optional (defaults to 0)
        Maximum rate of infiltration, mm/hr
    roughnes : float, defaults to 0.01
        Manning roughness coefficient, s/m^1/3

    """

    def __init__():  # ignore this for now, we will add more stuff eventually.
        pass

Doc tests

The following docstring section 'Examples' should help the user understand what is the component's purpose and how it works. It is an example (or examples) of its use in a (more or less) simple case within the Landlab framework: a grid is created, the component is instantiated on this grid and run. Unlike in the example below, we strongly recommend commenting your example(s) to explain what is happening.

This is also the section that will be run during the integration tests of your component (once you have submitted a pull request to have your component merged into the Landlab release branch). All lines starting with >>> are run and should produce the results you provided: here, the test will fail if kw.vel_coeff does not return 100.0.


In [ ]:
"""
    Examples
    --------
    >>> from landlab import RasterModelGrid
    >>> rg = RasterModelGrid((4, 5), 10.0)
    >>> kw = KinwaveOverlandFlowModel(rg)
    >>> kw.vel_coef
    100.0
    >>> rg.at_node['surface_water__depth']
    array([ 0.,  0.,  0.,  0.,  0.,
            0.,  0.,  0.,  0.,  0.,
            0.,  0.,  0.,  0.,  0.,
            0.,  0.,  0.,  0.,  0.])
    """

Header information: _name

Every component should have a name, as a string. Normally this will be the same as the class name.


In [ ]:
_name = "KinwaveOverlandFlowModel"

Header information: _unit_agnostic

Components also indicate whether they are unit agnostic or not. Unit agnostic components require that component users are consistent with units within and across components used in a single application, but do not require that inputs conform to a specific set of units.

This component is not unit agnostic because it includes an assumption that time units will be in hours but assumes that the Manning coefficient will be provided with time units of seconds.


In [ ]:
_unit_agnostic = False

Header information: _info

All the metadata about the fields that a components requires and creates is described in a datastructured called Component._info.

Info is a dictionary with one key for each field. The value associated with that key is a dictionary that must contain all of the following elements (and no other elements).

  • "dtype": a python data type
  • "intent": a string indicating whether the field is an input ("in"), and output ("out"), or both ("inout")
  • "optional": a boolean indicating whether the field is an optional input or output
  • "units": a string indicating what units the field has (use "-")
  • "mapping": a string indicating the grid element (e.g., node, cell) on which the field is located
  • "doc": a string describing the field.

The code in the Component base class will check things like:

  • Can the component be created if all of the required inputs exist?
  • Is all this information present? Is something extra present?
  • Does the component create outputs of the correct dtype?

In [ ]:
_info = {
    "surface_water__depth": {
        "dtype": float,
        "intent": "out",
        "optional": False,
        "units": "m",
        "mapping": "node",
        "doc": "Depth of water on the surface",
    },
    "topographic__elevation": {
        "dtype": float,
        "intent": "in",
        "optional": False,
        "units": "m",
        "mapping": "node",
        "doc": "Land surface topographic elevation",
    },
    "topographic__gradient": {
        "dtype": float,
        "intent": "in",
        "optional": False,
        "units": "m/m",
        "mapping": "link",
        "doc": "Gradient of the ground surface",
    },
    "water__specific_discharge": {
        "dtype": float,
        "intent": "out",
        "optional": False,
        "units": "m2/s",
        "mapping": "link",
        "doc": "flow discharge component in the direction of the link",
    },
    "water__velocity": {
        "dtype": float,
        "intent": "out",
        "optional": False,
        "units": "m/s",
        "mapping": "link",
        "doc": "flow velocity component in the direction of the link",
    },
}

Class with complete header information


In [ ]:
import numpy as np

from landlab import Component, FieldError


class KinwaveOverlandFlowModel(Component):
    """
    Calculate water flow over topography.
    
    Landlab component that implements a two-dimensional 
    kinematic wave model.
    
    Construction:
    
        KinwaveOverlandFlowModel(grid, [stuff to be added later])
    
    Parameters
    ----------
    grid : ModelGrid
        Landlab ModelGrid object
    precip_rate : float, optional (defaults to 1 mm/hr)
        Precipitation rate, mm/hr
    precip_duration : float, optional (defaults to 1 hour)
        Duration of precipitation, hours
    infilt_rate : float, optional (defaults to 0)
        Maximum rate of infiltration, mm/hr
    roughness : float, defaults to 0.01
        Manning roughness coefficient, s/m^1/3
    """
    
    _name = "KinwaveOverlandFlowModel"

    _unit_agnostic = False
    
    _info = {
        "surface_water__depth": {
            "dtype": float,
            "intent": "out",
            "optional": False,
            "units": "m",
            "mapping": "node",
            "doc": "Depth of water on the surface",
        },
        "topographic__elevation": {
            "dtype": float,
            "intent": "in",
            "optional": False,
            "units": "m",
            "mapping": "node",
            "doc": "Land surface topographic elevation",
        },
        "topographic__gradient": {
            "dtype": float,
            "intent": "in",
            "optional": False,
            "units": "m/m",
            "mapping": "link",
            "doc": "Gradient of the ground surface",
        },
        "water__specific_discharge": {
            "dtype": float,
            "intent": "out",
            "optional": False,
            "units": "m2/s",
            "mapping": "link",
            "doc": "flow discharge component in the direction of the link",
        },
        "water__velocity": {
            "dtype": float,
            "intent": "out",
            "optional": False,
            "units": "m/s",
            "mapping": "link",
            "doc": "flow velocity component in the direction of the link",
        },
    }

    def __init__():  # ignore this for now, we will add more stuff eventually.
        pass

The initialization method (__init__)

Every Landlab component should have an __init__ method. The parameter signature should start with a ModelGrid object as the first parameter. Following this are component-specific parameters. In our example, the parameters for the kinematic wave model include: precipiation rate, precipitation duration, infiltration rate, and roughness coefficient (Manning's n).

The first thing the component __init__ should do is call the super method. This calls the __init__ of the component's base class.

Two things a component __init__ method common does are (1) store the component's parameters as class attributes, and (2) create the necessary fields. When creating grid fields, it is important to first check to see whether a field with the same name (and mapping) already exists. For example, a driver or another component might have already created topographic__elevation when our kinematic wave component is initialized.


In [ ]:
def __init__(
    self, grid, precip_rate=1.0, precip_duration=1.0, infilt_rate=0.0, roughness=0.01
):
    """Initialize the KinwaveOverlandFlowModel.

    Parameters
    ----------
    grid : ModelGrid
        Landlab ModelGrid object
    precip_rate : float, optional (defaults to 1 mm/hr)
        Precipitation rate, mm/hr
    precip_duration : float, optional (defaults to 1 hour)
        Duration of precipitation, hours
    infilt_rate : float, optional (defaults to 0)
        Maximum rate of infiltration, mm/hr
    roughness : float, defaults to 0.01
        Manning roughness coefficient, s/m^1/3
    """
    super().__init__(grid)

    # Store parameters and do unit conversion
    self._current_time = 0

    self._precip = precip_rate / 3600000.0  # convert to m/s
    self._precip_duration = precip_duration * 3600.0  # h->s
    self._infilt = infilt_rate / 3600000.0  # convert to m/s
    self._vel_coef = 1.0 / roughness  # do division now to save time

    # Create fields...
    #   Elevation
    self._elev = grid.at_node["topographic__elevation"]

    #   Slope
    self._slope = grid.at_link["topographic__gradient"]

    self.initialize_output_fields()
    self._depth = grid.at_node["surface_water__depth"]
    self._vel = grid.at_link["water__velocity"]
    self._disch = grid.at_link["water__specific_discharge"]

    # Calculate the ground-surface slope (assume it won't change)
    self._slope[self._grid.active_links] = self._grid.calc_grad_at_link(self._elev)[
        self._grid.active_links
    ]
    self._sqrt_slope = np.sqrt(self._slope)
    self._sign_slope = np.sign(self._slope)

The "go" method, run_one_step()

Every Landlab component will have a method that implements the component's action. The go method can have any name you like, but the preferred practice for time-advancing components is to use the standard name run_one_step(). Landlab assumes that if a component has a method with this name, it will (a) be the primary "go" method, and (b) will be fully standardized as described here.

The run_one_step method should take either zero or one argument. If there is an argument, it should be a duration to run, dt; i.e., a timestep length. If the component does not evolve as time passes, this argument may be missing (see, e.g., the FlowRouter, which returns a steady state flow pattern independent of time).

The first step in the algorithm in the example below is to calculate water depth at the links, where we will be calculating the water discharge. In this particular case, we'll use the depth at the upslope of the two nodes. The grid method to do this, map_value_at_max_node_to_link, is one of many mapping functions available.

We then calculate velocity using the Manning equation, and specific discharge by multiplying velocity by depth.

Mass balance for the cells around nodes is computed using the calc_flux_div_at_node grid method.


In [ ]:
def run_one_step(self, dt):
    """Calculate water flow for a time period `dt`.

    Default units for dt are *seconds*.
    """
    # Calculate water depth at links. This implements an "upwind" scheme
    # in which water depth at the links is the depth at the higher of the
    # two nodes.
    H_link = self._grid.map_value_at_max_node_to_link(
        "topographic__elevation", "surface_water__depth"
    )

    # Calculate velocity using the Manning equation.
    self._vel = (
        -self._sign_slope * self._vel_coef * H_link ** 0.66667 * self._sqrt_slope
    )

    # Calculate discharge
    self._disch = H_link * self._vel

    # Flux divergence
    dqda = self._grid.calc_flux_div_at_node(self._disch)

    # Rate of change of water depth
    if self._current_time < self._precip_duration:
        ppt = self._precip
    else:
        ppt = 0.0
    dHdt = ppt - self._infilt - dqda

    # Update water depth: simple forward Euler scheme
    self._depth[self._grid.core_nodes] += dHdt[self._grid.core_nodes] * dt

    # Very crude numerical hack: prevent negative water depth
    self._depth[np.where(self._depth < 0.0)[0]] = 0.0

    self._current_time += dt

Changes to boundary conditions

Sometimes, (though not in this example), it proves convenient to hard-code assumptions about boundary conditions into the __init__ method.

We can resolve this issue by creating an additional component method that updates these components that can be called if the boundary conditions change. Whether the boundary conditions have changed can be assessed with a grid method called bc_set_code. This is an int which will change if the boundary conditions change.


In [ ]:
def __init__(self):
    """Initialize the Component.
    ...
    """
    super().__init__(grid)

    # Store grid and parameters and do unit conversion
    self._bc_set_code = self._grid.bc_set_code
    # ...


def updated_boundary_conditions(self):
    """Call if boundary conditions are updated.
    """
    # do things necessary if BCs are updated.


def run_one_step(self, dt):
    """Calculate water flow for a time period `dt`.
    """
    if self._bc_set_code != self.grid.bc_set_code:
        self.updated_boundary_conditions()
        self._bc_set_code = self.grid.bc_set_code
    # Do rest of run one step
    # ...

The complete component


In [ ]:
import numpy as np

from landlab import Component


class KinwaveOverlandFlowModel(Component):
    """Calculate water flow over topography.

    Landlab component that implements a two-dimensional
    kinematic wave model. This is an extremely simple, unsophisticated
    model, originally built simply to demonstrate the component creation
    process. Limitations to the present version include: infiltration is
    handled very crudely, the called is responsible for picking a stable
    time step size (no adaptive time stepping is used in the `run_one_step`
    method), precipitation rate is constant for a given duration (then zero),
    and all parameters are uniform in space. Also, the terrain is assumed
    to be stable over time. Caveat emptor!

    Examples
    --------
    >>> from landlab import RasterModelGrid
    >>> rg = RasterModelGrid((4, 5), xy_spacing=10.0)
    >>> z = rg.add_zeros("topographic__elevation", at="node")
    >>> s = rg.add_zeros("topographic__gradient", at="link")
    >>> kw = KinwaveOverlandFlowModel(rg)
    >>> kw.vel_coef
    100.0
    >>> rg.at_node['surface_water__depth']
    array([ 0.,  0.,  0.,  0.,  0.,  0.,  0.,  0.,  0.,  0.,  0.,  0.,  0.,
            0.,  0.,  0.,  0.,  0.,  0.,  0.])
    """

    _name = "KinwaveOverlandFlowModel"

    _unit_agnostic = False

    _info = {
        "surface_water__depth": {
            "dtype": float,
            "intent": "out",
            "optional": False,
            "units": "m",
            "mapping": "node",
            "doc": "Depth of water on the surface",
        },
        "topographic__elevation": {
            "dtype": float,
            "intent": "in",
            "optional": False,
            "units": "m",
            "mapping": "node",
            "doc": "Land surface topographic elevation",
        },
        "topographic__gradient": {
            "dtype": float,
            "intent": "in",
            "optional": False,
            "units": "m/m",
            "mapping": "link",
            "doc": "Gradient of the ground surface",
        },
        "water__specific_discharge": {
            "dtype": float,
            "intent": "out",
            "optional": False,
            "units": "m2/s",
            "mapping": "link",
            "doc": "flow discharge component in the direction of the link",
        },
        "water__velocity": {
            "dtype": float,
            "intent": "out",
            "optional": False,
            "units": "m/s",
            "mapping": "link",
            "doc": "flow velocity component in the direction of the link",
        },
    }

    def __init__(
        self,
        grid,
        precip_rate=1.0,
        precip_duration=1.0,
        infilt_rate=0.0,
        roughness=0.01,
    ):
        """Initialize the KinwaveOverlandFlowModel.

        Parameters
        ----------
        grid : ModelGrid
            Landlab ModelGrid object
        precip_rate : float, optional (defaults to 1 mm/hr)
            Precipitation rate, mm/hr
        precip_duration : float, optional (defaults to 1 hour)
            Duration of precipitation, hours
        infilt_rate : float, optional (defaults to 0)
            Maximum rate of infiltration, mm/hr
        roughness : float, defaults to 0.01
            Manning roughness coefficient, s/m^1/3
        """
        super().__init__(grid)

        # Store parameters and do unit conversion
        self._current_time = 0

        self._precip = precip_rate / 3600000.0  # convert to m/s
        self._precip_duration = precip_duration * 3600.0  # h->s
        self._infilt = infilt_rate / 3600000.0  # convert to m/s
        self._vel_coef = 1.0 / roughness  # do division now to save time

        # Create fields...
        #   Elevation
        self._elev = grid.at_node["topographic__elevation"]

        #   Slope
        self._slope = grid.at_link["topographic__gradient"]

        self.initialize_output_fields()
        self._depth = grid.at_node["surface_water__depth"]
        self._vel = grid.at_link["water__velocity"]
        self._disch = grid.at_link["water__specific_discharge"]

        # Calculate the ground-surface slope (assume it won't change)
        self._slope[self._grid.active_links] = self._grid.calc_grad_at_link(self._elev)[
            self._grid.active_links
        ]
        self._sqrt_slope = np.sqrt(self._slope)
        self._sign_slope = np.sign(self._slope)

    @property
    def vel_coef(self):
        """Velocity coefficient.

        (1/roughness)
        """
        return self._vel_coef

    def run_one_step(self, dt):
        """Calculate water flow for a time period `dt`.

        Default units for dt are *seconds*.
        """
        # Calculate water depth at links. This implements an "upwind" scheme
        # in which water depth at the links is the depth at the higher of the
        # two nodes.
        H_link = self._grid.map_value_at_max_node_to_link(
            "topographic__elevation", "surface_water__depth"
        )

        # Calculate velocity using the Manning equation.
        self._vel = (
            -self._sign_slope * self._vel_coef * H_link ** 0.66667 * self._sqrt_slope
        )

        # Calculate discharge
        self._disch[:] = H_link * self._vel

        # Flux divergence
        dqda = self._grid.calc_flux_div_at_node(self._disch)

        # Rate of change of water depth
        if self._current_time < self._precip_duration:
            ppt = self._precip
        else:
            ppt = 0.0
        dHdt = ppt - self._infilt - dqda

        # Update water depth: simple forward Euler scheme
        self._depth[self._grid.core_nodes] += dHdt[self._grid.core_nodes] * dt

        # Very crude numerical hack: prevent negative water depth
        self._depth[np.where(self._depth < 0.0)[0]] = 0.0

        self._current_time += dt

In [ ]:
from landlab import RasterModelGrid

nr = 3
nc = 4
rg = RasterModelGrid((nr, nc), 10.0)
rg.add_empty("topographic__elevation", at="node")
rg.add_zeros("topographic__gradient", at="link")
rg.at_node["topographic__elevation"][:] = rg.x_of_node.copy()
kinflow = KinwaveOverlandFlowModel(rg, precip_rate=100.0, precip_duration=100.0)

for i in range(100):
    kinflow.run_one_step(1.0)
print("The discharge from node 6 to node 5 should be -0.000278 m2/s:")
print(rg.at_link["water__specific_discharge"][8])
print("The discharge from node 5 to node 4 should be -0.000556 m2/s:")
print(rg.at_link["water__specific_discharge"][7])

Next, we'll test the component on a larger grid and a larger domain.


In [ ]:
nr = 62
nc = 42
rg = RasterModelGrid((nr, nc), 10.0)
rg.add_empty("topographic__elevation", at="node")
rg.at_node["topographic__elevation"] = 0.01 * rg.y_of_node
rg.add_zeros("topographic__gradient", at="link")
kinflow = KinwaveOverlandFlowModel(rg, precip_rate=100.0, precip_duration=100.0)
for i in range(1800):
    kinflow.run_one_step(1.0)

Plot the topography:


In [ ]:
%matplotlib inline
from landlab.plot import imshow_grid

imshow_grid(rg, "topographic__elevation")

The steady solution should be as follows. The unit discharge at the bottom edge should equal the precipitation rate, 100 mm/hr, times the slope length.

The slope length is the distance from the bottom edge of the bottom-most row of cells, to the top edge of the top-most row of cells. The base row of nodes are at y = 0, and the cell edges start half a cell width up from that, so y = 5 m. The top of the upper-most row of cells is half a cell width below the top grid edge, which is 610 m, so the top of the cells is 605 m. Hence the interior (cell) portion of the grid is 600 m long.

Hence, discharge out the bottom should be 100 mm/hr x 600 m = 0.1 m/hr x 600 m = 60 m2/hr. Let's convert this to m2/s:


In [ ]:
q_out = 0.1 * 600 / 3600.0
q_out

The water depth should be just sufficient to carry this discharge with the given slope and roughness. We get this by inverting the Manning equation:

$$q = (1/n) H^{5/3} S^{1/2}$$$$H^{5/3} = n q S^{-1/2}$$$$H = (n q)^{3/5} S^{-3/10}$$

The slope gradient is 0.01 (because we set elevation to be 0.01 times the y coordinate). The discharge, as we've already established, is about 0.0167 m2/s, and the roughness is 0.01 (the default value). Therefore,


In [ ]:
n = 0.01
q = 0.0167
S = 0.01
H_out = (n * q) ** 0.6 * S ** -0.3
H_out

In [ ]:
imshow_grid(rg, "surface_water__depth", cmap="Blues")

This looks pretty good. Let's check the values:


In [ ]:
rg.at_node["surface_water__depth"][42:84]  # bottom row of core nodes

We see that the depth agrees with the analytical solution to within three decimal places: not bad. Ideally, we would build the above tests into the component as doctests or unit tests. We could also test the transient solutions: rising hydrograph, falling hydrograph. Finally, we haven't tested all the ingredients; for example, we haven't tested what happens when infiltration rate is greater than zero.

Nonetheless, the above example illustrates the basics of component-making. A great next step would be to create a unit test based on this example.

Click here for more Landlab tutorials