Full waveform inversion with Devito and Dask (distributed computing)

Introduction

In this tutorial, we show how Devito and scipy.optimize.minimize are used together with Dask to perform full waveform inversion (FWI) on distributed-memory parallel computers.

This tutorial uses the same synthetic datasets and model setup as the previous two tutorials, so refer back to them if you get lost in the parts of the code specific to Devito or scipy.optimize.

What is Dask?

Dask is a flexible parallel computing library for analytic computing.

Dask is composed of two components:

  • Dynamic task scheduling optimized for computation...
  • “Big Data” collections like parallel arrays, dataframes, and lists that extend common interfaces like NumPy, Pandas, or Python iterators to larger-than-memory or distributed environments. These parallel collections run on top of the dynamic task schedulers.

Dask emphasizes the following virtues:

  • Familiar: Provides parallelized NumPy array and Pandas DataFrame objects
  • Flexible: Provides a task scheduling interface for more custom workloads and integration with other projects.
  • Native: Enables distributed computing in Pure Python with access to the PyData stack.
  • Fast: Operates with low overhead, low latency, and minimal serialization necessary for fast numerical algorithms
  • Scales up: Runs resiliently on clusters with 1000s of cores
  • Scales down: Trivial to set up and run on a laptop in a single process
  • Responsive: Designed with interactive computing in mind, it provides rapid feedback and diagnostics to aid humans

We are going to use Dask here to parallelise the computation of the functional and gradient, as this accounts for the vast bulk of the computational expense of FWI and is trivially parallel over data shots.
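
To make the pattern concrete before we apply it to FWI, here is a minimal sketch of the map/reduce idiom we will use below: a local distributed.Client maps a toy function (square, standing in for the per-shot computation) over a worklist, and the partial results are summed on the cluster.

from distributed import Client

def square(i):
    return i*i

client = Client()                             # start a local cluster of workers
futures = client.map(square, range(8))        # map the worklist onto the workers
total = client.submit(sum, futures).result()  # parallel sum reduction
print(total)                                  # 140
client.close()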

Setting up (synthetic) data

We are going to set up the same synthetic test case as for the previous tutorial (refer back for details). The code below is slightly re-engineered to make it suitable for use with scipy.optimize.minimize.


In [1]:
#NBVAL_IGNORE_OUTPUT
import numpy as np

from examples.seismic import Model, demo_model

# Define the grid parameters
def get_grid():
    shape = (101, 101)    # Number of grid points (nx, nz)
    spacing = (10., 10.)  # Grid spacing in m. The domain size is now 1km by 1km
    origin = (0., 0.)     # Need origin to define relative source and receiver locations

    return shape, spacing, origin

# Define the test phantom; in this case we are using a simple circle
# so we can easily see what is going on.
def get_true_model():
    shape, spacing, origin = get_grid()
    return demo_model('circle-isotropic', vp=3.0, vp_background=2.5, 
                      origin=origin, shape=shape, spacing=spacing, nbpml=40)

# The initial guess for the subsurface model.
def get_initial_model():
    shape, spacing, origin = get_grid()

    return demo_model('circle-isotropic', vp=2.5, vp_background=2.5, 
                      origin=origin, shape=shape, spacing=spacing, nbpml=40)


from examples.seismic.acoustic import AcousticWaveSolver
from examples.seismic import RickerSource, Receiver

# This is used by the worker to get the current model. 
def get_current_model(param):
    """ Returns the current model.
    """
    model = get_initial_model()
    model.m.data[:] = np.reshape(np.load(param['model']), model.m.data.shape)
    return model

# Inversion crime alert! Here the worker is creating the 'observed' data
# using the real model. For a real case the worker would be reading
# seismic data from disk.
def get_data(param):
    """ Returns source and receiver data for a single shot labeled 'shot_id'.
    """
    true_model = get_true_model()
    dt = true_model.critical_dt  # Time step from model grid spacing

    # Set up source data and geometry.
    nt = int(1 + (param['tn']-param['t0']) / dt)  # Discrete time axis length

    src = RickerSource(name='src', grid=true_model.grid, f0=param['f0'],
                       time=np.linspace(param['t0'], param['tn'], nt))
    src.coordinates.data[0, :] = [30, param['shot_id']*1000./(param['nshots']-1)]

    # Set up receiver data and geometry.
    nreceivers = 101  # Number of receiver locations per shot
    rec = Receiver(name='rec', grid=true_model.grid, npoint=nreceivers, ntime=nt)
    rec.coordinates.data[:, 1] = np.linspace(0, true_model.domain_size[0], num=nreceivers)
    rec.coordinates.data[:, 0] = 980. # 20m from the right end

    # Set up solver - using true_model so that we have the same dt;
    # otherwise we would need to resample the time series data
    # (e.g. with pandas).
    solver = AcousticWaveSolver(true_model, src, rec, space_order=4)

    # Generate synthetic receiver data from true model
    true_d, _, _ = solver.forward(src=src, m=true_model.m)

    return src, true_d, nt, solver

Dask specifics

Previously we defined a function to calculate the individual contribution to the functional and gradient for each shot, which was then used in a loop over all shots. However, when using distributed frameworks such as Dask we instead think in terms of creating a worklist which gets mapped onto the worker pool. The sum reduction is also performed in parallel. For now, however, we assume that scipy.optimize.minimize itself runs on the master process; this is a reasonable simplification because the computational cost of calculating (f, g) far exceeds all other compute costs.
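
To make the idea of a worklist concrete, the sketch below (with illustrative parameter values only) shows the kind of per-shot dictionary we will map to the workers; the real worklist is built inside fwi_gradient further down.

# Illustration only: one dictionary per shot, each carrying the common
# parameters plus its own shot_id.
param = {'t0': 0., 'tn': 1000., 'f0': 0.010, 'nshots': 4, 'model': 'model_0.npy'}
work = [dict(param, shot_id=i) for i in range(param['nshots'])]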

Because we want to be able to use standard reduction operators such as sum on (f, g), we first define it as a type so that we can define the __add__ (and __radd__) methods.


In [2]:
# Define a type to store the functional and gradient.
class fg_pair:
    def __init__(self, f, g):
        self.f = f
        self.g = g
    
    def __add__(self, other):
        f = self.f + other.f
        g = self.g + other.g
        
        return fg_pair(f, g)
    
    def __radd__(self, other):
        if other == 0:
            return self
        else:
            return self.__add__(other)
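
As a quick check of why __radd__ is also needed: Python's built-in sum starts from 0, so the first reduction step is 0 + fg_pair(...), which dispatches to __radd__. A minimal sketch:

import numpy as np

# 0 + fg_pair(...) falls back to fg_pair.__radd__; subsequent additions use __add__.
total = sum([fg_pair(1.0, np.ones(3)), fg_pair(2.0, 2*np.ones(3))])
print(total.f)  # 3.0
print(total.g)  # [3. 3. 3.]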

Next we define the functional and gradient operator for a single shot of data. This is the work that is going to be performed by the worker on a unit of data.


In [3]:
import numpy as np

from devito import Function, clear_cache

# Create FWI gradient kernel for a single shot
def fwi_gradient_i(param):
    # Need to clear the worker's cache.
    clear_cache()

    # Load the current model and the shot data for this worker.
    # Note that, unlike the serial example, the model is not passed
    # in as an argument. Broadcasting large datasets is considered a
    # programming anti-pattern, and at the time of writing it only
    # worked reliably with Dask master. Therefore, the model is
    # communicated via a file.
    model0 = get_current_model(param)
    src, rec, nt, solver = get_data(param)
    
    # Create symbols to hold the gradient and the misfit between
    # the 'measured' and simulated data.
    grad = Function(name="grad", grid=model0.grid)
    residual = Receiver(name='rec', grid=model0.grid, ntime=nt, coordinates=rec.coordinates.data)
    
    # Compute simulated data and full forward wavefield u0
    d, u0, _ = solver.forward(src=src, m=model0.m, save=True)
        
    # Compute the data misfit (residual) and objective function  
    residual.data[:] = d.data[:] - rec.data[:]
    f = .5*np.linalg.norm(residual.data.flatten())**2
    
    # Compute gradient using the adjoint-state method. Note, this
    # backpropagates the data misfit through the model.
    solver.gradient(rec=residual, u=u0, m=model0.m, grad=grad)
    
    # Copying here to avoid a (probably overzealous) destructor deleting
    # the gradient before Dask has had a chance to communicate it.
    g = np.array(grad.data[:])
    
    # return the objective functional and gradient.
    return fg_pair(f, g)

Define the global functional-gradient operator. This does the following:

  • Creates the Dask cluster.
  • Maps the worklist (shots) to the workers so that the individual contributions to (f, g) are computed.
  • Sums the individual contributions to (f, g) and returns the result.

In [4]:
import numpy as np
from distributed import Client

# Dumps the model to disk; workers will pick this up when they need it.
def dump_model(param, model):
    np.save(param['model'], model.astype(np.float32))

def fwi_gradient(model, param):
    # Dump a copy of the current model for the workers
    # to pick up when they are ready.
    param['model'] = "model_0.npy"
    dump_model(param, model)

    # Define work list
    work = [dict(param) for i in range(param['nshots'])]
    for i in range(param['nshots']):
        work[i]['shot_id'] = i
        
    # Start dask cluster
    client = Client()
    
    # Distribute worklist to workers.
    fgi = client.map(fwi_gradient_i, work)
    
    # Perform reduction.
    fg = client.submit(sum, fgi).result()
    
    # Shutdown cluster
    client.close()
    
    # L-BFGS in scipy expects a flat array in 64-bit floats.
    return fg.f, fg.g.flatten().astype(np.float64)

FWI with L-BFGS-B

Equipped with a function to calculate the functional and gradient, we are finally ready to define the optimization function.


In [5]:
from scipy import optimize

# Define bounding box constraints on the solution.
def apply_box_constraint(m):
    # The model parameter m is the squared slowness, m = 1/vp**2, so the
    # maximum 'realistic' velocity of 3.5 km/s gives the lower clip bound
    # and the minimum 'realistic' velocity of 2 km/s gives the upper one.
    return np.clip(m, 1/3.5**2, 1/2**2)

# Many optimization methods in scipy.optimize.minimize accept a callback
# function that can operate on the solution after every iteration. Here
# we use this to apply box constraints and to monitor the true relative
# solution error.
relative_error = []
def fwi_callbacks(x):
    # Apply boundary constraint
    x[:] = apply_box_constraint(x)
    
    # Calculate true relative error
    true_x = get_true_model().m.data.flatten()
    relative_error.append(np.linalg.norm((x-true_x)/true_x))

def fwi(model, param, ftol=0.1, maxiter=20):
    result = optimize.minimize(fwi_gradient,
                               model.m.data.flatten().astype(np.float64),
                               args=(param, ), method='L-BFGS-B', jac=True,
                               callback=fwi_callbacks,
                               options={'ftol':ftol,
                                        'maxiter':maxiter,
                                        'disp':True})

    return result
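
As an aside, here is a minimal sketch of the jac=True convention used above: with jac=True, scipy.optimize.minimize expects the objective callable to return an (f, g) tuple, which is exactly what fwi_gradient does. The toy quadratic below is illustrative only.

from scipy import optimize
import numpy as np

# Toy objective returning (functional, gradient), mirroring fwi_gradient's interface.
def quadratic(x):
    return float(np.dot(x, x)), 2.0*x

res = optimize.minimize(quadratic, np.ones(5), method='L-BFGS-B', jac=True)
print(res.x)  # converges towards the minimiser at the origin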

We now apply our FWI function and have a look at the result.


In [6]:
#NBVAL_SKIP

# Change to the WARNING log level to reduce log output
# as compared to the default DEBUG
from devito import configuration
configuration['log_level'] = 'WARNING'

# Set up inversion parameters. This time we are using
# more shots because we are running in parallel.
param = {'t0': 0.,
         'tn': 1000.,           # Simulation lasts 1 second (1000 ms)
         'f0': 0.010,           # Source peak frequency is 10Hz (0.010 kHz)
         'nshots': 16}          # Number of shots to create gradient from

model0 = get_initial_model()

# Baby steps
result = fwi(model0, param)

# Print out results of optimizer.
print(result)


      fun: 59.130213695617584
 hess_inv: <32761x32761 LbfgsInvHessProduct with dtype=float64>
      jac: array([ 0.,  0.,  0., ...,  0.,  0.,  0.])
  message: b'CONVERGENCE: REL_REDUCTION_OF_F_<=_FACTR*EPSMCH'
     nfev: 15
      nit: 14
   status: 0
  success: True
        x: array([ 0.16,  0.16,  0.16, ...,  0.16,  0.16,  0.16])

In [7]:
#NBVAL_SKIP

# Show what the update does to the model
from examples.seismic import plot_image, plot_velocity

model0.m.data[:] = result.x.astype(np.float32).reshape(model0.m.data.shape)
model0.vp = np.sqrt(1. / model0.m.data[40:-40, 40:-40])
plot_velocity(model0)



In [8]:
#NBVAL_SKIP

# Plot percentage error
plot_image(100*np.abs(model0.vp-get_true_model().vp.data)/get_true_model().vp.data, vmax=15, cmap="hot")



In [9]:
#NBVAL_SKIP
import matplotlib.pyplot as plt

# Plot the decrease of the true relative error
plt.figure()
plt.loglog(relative_error)
plt.xlabel('Iteration number')
plt.ylabel('True relative error')
plt.title('Convergence')
plt.show()


This notebook is part of the tutorial "Optimised Symbolic Finite Difference Computation with Devito" presented at the Intel® HPC Developer Conference 2017.