This software is provided under license "as is", without warranty of any kind including, but not limited to, fitness for a particular purpose. The user assumes the entire risk as to the use and performance of the software. In no event shall the copyright holder be held liable for any claim, damages or other liability arising from the use of the software.
In [295]:
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
from shapely import geometry, ops
from scipy import optimize
from skimage import transform
from IPython import display
%matplotlib inline
In [296]:
def anderson(origin=(0, 0), beta=0.0, a=1.0, f=1.0, g=0.0, h=1.0, t=1.0):
"""
Anderson et al, "Modelling the spread of grass fires", 1981.
Inputs:
a: burining rate
t: time (minues after the start of a fire)
Outputs:
(geometry) linestring object
"""
x0, y0 = origin
theta = np.linspace(0, 2*np.pi, 100)
# Equation (4)
X = a*t*(f*np.cos(theta) + g)
Y = a*t*h*np.sin(theta)
rbeta = np.deg2rad(beta)
# Equation (6)
x = x0 + X * np.cos(rbeta) - Y * np.sin(rbeta)
y = y0 + X * np.sin(rbeta) + Y * np.cos(rbeta)
return geometry.linestring.LineString([(_x, _y) for _x, _y in zip(x, y)])
def fire_boundary(origin=(0,0), beta=0.0, n=3, max_t=100., a=1.0, duration=12):
"""Geometric fire boundary"""
t = np.linspace(1., max_t, n)
delta = (duration * 3600) / n
valid_time = [pd.Timestamp("2009-02-07 00:50:00+00:00") + pd.Timedelta((_n + 1) * delta, 's') for _n in range(n)]
geom = [anderson(origin=origin, beta=beta, a=a, f=1.0, g=1.0, h=0.7, t=_t) for _t in t]
return pd.DataFrame({'valid_time':valid_time, 'geometry':geom})
def fcst_fire(beta, n, duration, a=1):
return fire_boundary(beta=beta, n=n, duration=duration, a=a, max_t=1000., origin=(100, 100))
def obs_fire(n, duration):
return fire_boundary(beta=0, n=n, duration=duration, max_t=1000., origin=(100, 100))
In [297]:
def polygonize(dataset):
"""Transform the dataset into a polygon set"""
geom = dataset['geometry'].apply(lambda x: geometry.Polygon(x).buffer(0))
return ops.cascaded_union(geom)
In [298]:
def coord(bounds, spacing=80.):
"""
Form a grid of specific bounds and spacing.
"""
(minx, miny, maxx, maxy) = bounds
x = np.arange(minx, maxx, spacing)
y = np.arange(miny, maxy, spacing)
X, Y = np.meshgrid(x, y)
return np.r_[[X.flatten(), Y.flatten()]].T, X.shape
def intersection(coords, shape, polygon):
"""
Return the intersection as a mask.
"""
mask = np.array([polygon.contains(geometry.Point(x, y)) for x, y in coords])
return mask.reshape(shape)
In [299]:
fcst, obs = fcst_fire(50, 10, 100, a=0.8), obs_fire(10, 100)
obs_poly, fcst_poly = polygonize(obs), polygonize(fcst)
In [300]:
coords, shape = coord(obs_poly.buffer(5000).bounds, spacing=100.)
In [301]:
obs_grid = intersection(coords, shape, obs_poly).astype('float')
fcst_grid = intersection(coords, shape, fcst_poly).astype('float')
In [302]:
fig, (ax1, ax2) = plt.subplots(1, 2, figsize=(10, 10))
ax1.matshow(obs_grid)
ax2.matshow(fcst_grid)
Out[302]:
Extending the CRA to fires requires some modifications, translation is removed and rotation and stretch transforms are used instead. The translations are pinned to a known ignition point (in grid space).
In [303]:
ignition_point = (50, 55)
In [304]:
def rot_warp(img, theta, ignition_point):
return transform.rotate(img, theta, center=ignition_point)
def rot_callback(params):
ax = plt.gca().matshow(rot_warp(fcst_grid, params[0], ignition_point) - obs_grid, cmap='jet', vmin=0, vmax=1)
plt.gca().plot(ignition_point[0], ignition_point[1], 'or')
plt.grid('on')
display.clear_output(wait=True)
display.display(plt.gcf())
def cost(params, fcst, obs, ignition_point):
return ((rot_warp(fcst, params[0], ignition_point).flatten() - obs.flatten())**2).sum()
In [305]:
theta = 0
res = optimize.minimize(cost,
theta,
args=(fcst_grid.astype('float'), obs_grid.astype('float'), ignition_point),
callback=rot_callback,
options={'disp': True})
theta = res.x
theta
Out[305]:
In [321]:
fig, ax = plt.subplots(figsize=(10, 10))
im = ax.matshow(rot_warp(fcst_grid, theta, ignition_point) - obs_grid, cmap='jet', vmin=-1, vmax=1)
plt.gca().plot(ignition_point[0], ignition_point[1], 'or')
plt.grid('on')
plt.colorbar(im, shrink=0.75)
Out[321]:
By applying the rotation to the forecast you can see that all the error not accounted for by simply rotating the field, there is an additional component that could be corrected for by stretching the forecast.
In [307]:
from skimage import measure
def estimate_angle(img):
"""
Estimate angle (from the x-axis) using a segmentation of the fire output.
** If there are multiple objects we return the median of estimated angle of each object.
"""
label_img = measure.label(img)
thetas = [x.orientation for x in measure.regionprops(label_img)]
return np.rad2deg(np.median(thetas))
def stretch_warp(img, alpha, beta, ignition_point):
"""
Stretch along along the lateral (alpha) and longitudial (beta) directions.
"""
center = np.asarray(p0)
angle = estimate_angle(img)
tform1 = transform.SimilarityTransform(translation=center)
tform2 = transform.SimilarityTransform(rotation=np.deg2rad(angle))
tform3 = transform.AffineTransform(scale=(1. - alpha, 1. - beta))
tform4 = transform.SimilarityTransform(rotation=np.deg2rad(-angle))
tform5 = transform.SimilarityTransform(translation=-center)
tform = tform5 + tform4 + tform3 + tform2 + tform1
return transform.warp(img, tform)
In [308]:
# from ipywidgets import interact
# fig = plt.figure(figsize=(6, 6))
# axe = fig.add_subplot(111)
# img = axe.matshow(fcst_grid,cmap='jet')
# def cbfn(theta, alpha, beta):
# data = rot_warp(stretch_warp(fcst_grid, alpha, beta, ignition_point), theta, ignition_point)
# img.set_data(data)
# img.autoscale()
# display.display(fig)
# interact(cbfn, theta=(0, 360, 5), alpha=(0., 1., 0.01), beta=(0., 1., 0.01));
In [407]:
def rot_warp(img, theta, ignition_point):
return transform.rotate(img, theta, center=ignition_point)
def warp(img, alpha, beta, theta, ignition_point):
return rot_warp(stretch_warp(img, alpha, beta, ignition_point), theta, ignition_point)
def alpha_callback(param):
ax = plt.gca().matshow(warp(fcst_grid, param[0], 0., theta, ignition_point) - obs_grid, cmap='jet', vmin=0, vmax=1)
plt.gca().plot(ignition_point[0], ignition_point[1], 'or')
plt.grid('on')
display.clear_output(wait=True)
display.display(plt.gcf())
def beta_callback(param):
ax = plt.gca().matshow(warp(fcst_grid, alpha, param[0], theta, ignition_point) - obs_grid, cmap='jet', vmin=0, vmax=1)
plt.gca().plot(ignition_point[0], ignition_point[1], 'or')
plt.grid('on')
display.clear_output(wait=True)
display.display(plt.gcf())
def cost_lateral(param, fcst, obs, ignition_point, theta):
return ((warp(fcst, param, 0., theta, ignition_point).flatten() - obs.flatten())**2).sum()
def cost_longitudial(param, fcst, obs, ignition_point, alpha, theta):
return ((warp(fcst, alpha, param, theta, ignition_point).flatten() - obs.flatten())**2).sum()
In [408]:
alpha = 0.
res = optimize.minimize(cost_lateral,
alpha,
args=(fcst_grid.astype('float'), obs_grid.astype('float'), ignition_point, theta),
callback=alpha_callback,
options={'disp': True})
alpha = res.x
In [409]:
beta = 0.
res = optimize.minimize(cost_longitudial,
beta,
args=(fcst_grid.astype('float'), obs_grid.astype('float'), ignition_point, alpha, theta),
callback=beta_callback,
options={'disp': True})
beta = res.x
In [410]:
fig, ax = plt.subplots(figsize=(10, 10))
im = ax.matshow(warp(fcst_grid, alpha, beta, theta, ignition_point) - obs_grid, cmap='jet', vmin=-1, vmax=1)
plt.gca().plot(ignition_point[0], ignition_point[1], 'or')
plt.grid('on')
plt.colorbar(im, shrink=0.75)
Out[410]:
In [411]:
fig, (ax1, ax2, ax3, ax4) = plt.subplots(1, 4, figsize=(14, 8))
im = ax1.matshow(fcst_grid - obs_grid, cmap='jet', vmin=-1, vmax=1)
im = ax2.matshow(rot_warp(fcst_grid, theta, ignition_point) - obs_grid, cmap='jet', vmin=-1, vmax=1)
im = ax3.matshow(warp(fcst_grid, alpha, 0., theta, ignition_point) - obs_grid, cmap='jet', vmin=-1, vmax=1)
im = ax4.matshow(warp(fcst_grid, alpha, beta, theta, ignition_point) - obs_grid, cmap='jet', vmin=-1, vmax=1)
In [415]:
total_error = ((fcst_grid.flatten() - obs_grid.flatten())**2).sum()
total_error
Out[415]:
In [416]:
rotaion_error = ((rot_warp(fcst_grid, theta, ignition_point).flatten() - obs_grid.flatten())**2).sum()
rotaion_error
Out[416]:
In [417]:
stretch_error_lat = ((warp(fcst_grid, 0., beta, theta, ignition_point).flatten() - obs_grid.flatten())**2).sum()
stretch_error_lat
Out[417]:
In [418]:
stretch_error_latlong = ((warp(fcst_grid, alpha, beta, theta, ignition_point).flatten() - obs_grid.flatten())**2).sum()
stretch_error_latlong
Out[418]:
In [420]:
error_components = np.abs(np.diff((total_error, shape_error, stretch_error_lat, stretch_error_latlong, 0))) / total_error
error_components
Out[420]:
In [421]:
dframe = pd.DataFrame(error_components).T
dframe.columns = ['rotation_error', 'stretch_error_lateral', 'stretch_error_longitudial', 'remaining_error']
dframe
Out[421]:
In [422]:
dframe.plot(kind='bar')
Out[422]:
In [ ]: