In [ ]:
%matplotlib inline
import numpy as np
import matplotlib.pyplot as plt
import warnings; warnings.simplefilter('ignore')
In this notebook, forest change is detected by comparing two contiguous acquisitions of Landsat 7 imagery. Comparing a before image and an after image offers insight into changes in vegetation and inundation over varied topography, giving an indication of deforestation. A broader history spanning one year into the past is also considered.
The diagram below shows a series of very simple rules applied to a given point in the imagery to classify it as forest changed or not changed.
The implementation, with the help of xarray data structures, is equally simple.
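The first of these rules relies on NDVI, a calculated index that correlates highly with the presence of vegetation. Its formulation is simple:
$$NDVI = \frac{pixel_{nir} - pixel_{red}}{pixel_{nir} + pixel_{red}} $$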
In [ ]:
def ndvi(dataset):
    return ((dataset.nir - dataset.red)/(dataset.nir + dataset.red)).rename("NDVI")
In this step, the comparison of before and after NDVI values can indicate a loss of vegetation; it is used as the first criterion for detecting deforestation.
The comparison relies on differencing between acquisitions.
In [ ]:
def delta(before, after):
    return after - before

def vegetation_change(before, after):
    vegetation_before = ndvi(before)
    vegetation_after = ndvi(after)
    return delta(vegetation_before, vegetation_after)
Through a process of visual survey, a threshold is determined for the NDVI decrease. In our example the threshold is 0.18: pixels whose NDVI difference is smaller than -0.18 are retained, while pixels whose difference is greater than -0.18 are not considered possible points of deforestation. For example, a pixel whose NDVI falls from 0.7 to 0.4 (a change of -0.3) is retained, while one falling from 0.7 to 0.6 (a change of -0.1) is discarded.
In [ ]:
def meets_devegetation_criteria(before, after, threshold = -0.18):
    # The following comparison is applied to all pixels.
    # It returns a grid of True and False values to be used for filtering our dataset.
    return (vegetation_change(before, after) < threshold).rename("devegetation_mask").astype(bool)
Agricultural cultivation often occurs on flat and low land. This assumption is used to deal with the possibility of agricultural activity interfering with the detection of deforestation/devegetation.
Thus, pixels with a DEM value smaller than 30 m are discarded.
In [ ]:
## ASTER
def meets_minimum_height_criteria(aster_ds, threshold = 30):
    # The `num` measurement holds the ASTER GDEM elevation values.
    return (aster_ds.num > threshold).astype(bool).rename("height_mask")
Most forests are tall, with canopies obscuring the view of the forest floor. Visually speaking, flooding should have very little effect on NDVI values, since flooding happens below the canopy.
However, the presence of water at some point in the year might indicate that the detected vegetation is not forest, but an agricultural area or a pond covered by water fern.
MNDWI is a calculated index that correlates highly with the existence of water. Like NDVI, its formulation is simple.
$$MNDWI = \frac{pixel_{green} - pixel_{swir1}}{pixel_{green} + pixel_{swir1}} $$
So is the implementation in code
In [ ]:
def mndwi(dataset):
    # The following arithmetic is applied to all pixels in a dataset.
    # Note the outer parentheses: the division must happen before the rename.
    return ((dataset.green - dataset.swir1)/(dataset.green + dataset.swir1)).rename("MNDWI")
In this step, the entire MNDWI history (along the time component) is analyzed and the maximum MNDWI is taken for each pixel. If, at any point in time, the MNDWI surpasses some threshold, the assumption is made that the area is not a forest.
In [ ]:
def max_mndwi(dataset):
    # The max is taken across all pixels along the time component.
    _max = mndwi(dataset).max(dim = ['time'])
    return _max.rename("max_MNDWI")
A threshold is empirically evaluated to reduce the rate of false positives caused by inundated areas. In this case that threshold is 0: pixels with a max MNDWI smaller than 0 are retained; the rest are discarded.
In [ ]:
def meets_inundation_criteria(dataset, threshold = 0.0):
    return (max_mndwi(dataset) < threshold).rename("inundation_mask")
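Deforestation should also show up as a single persistent drop in NDVI, rather than the repeated rises and falls that seasonal agriculture produces. The next cell sums the magnitude of NDVI change across all acquisitions in the year and keeps only pixels whose cumulative change stays below a threshold; large cumulative swings suggest recurring change rather than deforestation.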
In [ ]:
import xarray as xr

def create_ndvi_matrix(ds):
    ndvi_matrix = ndvi(ds)
    return ndvi_matrix.where(ds.cloud_mask)

def delta_matrix(ds):
    return np.diff(ds, axis = 0)

def vegetation_sum_threshold(ds, threshold = 10):
    ndvi_delta_matrix = delta_matrix(create_ndvi_matrix(ds))  # (time-1, lat, lon) matrix
    ndvi_change_magnitude = np.absolute(ndvi_delta_matrix)    # (time-1, lat, lon) matrix
    cumulative_ndvi_change_magnitude = np.nansum(ndvi_change_magnitude, axis = 0)  # (lat, lon) matrix
    ndvi_change_repeat_mask = cumulative_ndvi_change_magnitude < threshold         # (lat, lon) boolean matrix
    return xr.DataArray(ndvi_change_repeat_mask,
                        dims = ["latitude", "longitude"],
                        coords = {"latitude": ds.latitude, "longitude": ds.longitude},
                        attrs = {"threshold": threshold},
                        name = "repeat_devegetation_mask")
The assumption is made that deforestation happens on a sufficiently large scale. Another assumption is that detected deforestation in a pixel is typically spatially correlated with deforestation in adjacent pixels.
Following this reasoning, the final filter retains only groups of deforestation pixels larger than 5 pixels per group. The diagram below illustrates one candidate for deforestation that meets this grouping criterion, and one that is rejected by it.
The implementation relies on grouping pixels in an efficient manner. The use of skimage's measure module yields segmented arrays of pixel groups.
In [ ]:
from skimage import measure
import numpy as np

def group_pixels(array, connectivity = 1):
    # Label each connected group of nonzero pixels with a distinct integer,
    # then collect the (row, col) coordinates of every labelled group.
    arr = measure.label(array, connectivity=connectivity)
    return [np.dstack(np.where(arr == y))[0] for y in range(1, np.amax(arr) + 1)]
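As a minimal illustration of the labelling step (using a hypothetical toy array), measure.label assigns a distinct integer to each connected group of nonzero pixels; with connectivity = 1, pixels connect only through shared edges, not corners.
In [ ]:
import numpy as np
from skimage import measure

# A toy array purely for illustration.
toy = np.array([[1, 1, 0, 0],
                [1, 0, 0, 1],
                [0, 0, 0, 1],
                [0, 1, 0, 1]])

measure.label(toy, connectivity = 1)
# array([[1, 1, 0, 0],
#        [1, 0, 0, 2],
#        [0, 0, 0, 2],
#        [0, 3, 0, 2]])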
In [ ]:
def numpy_group_mask(boolean_np_array, min_size = 5):
    all_groups = group_pixels(boolean_np_array.astype(int))
    # Keep groups with more than `min_size` pixels; groups containing a zero
    # coordinate (i.e. touching the top or left edge) are also discarded.
    candidate_groups = filter(lambda group:
                              (len(group) > min_size) & (group != 0).all(),
                              all_groups)
    candidate_pixels = (pixel for group in candidate_groups for pixel in group)
    dynamic_array = np.zeros(boolean_np_array.shape)
    for x, y in candidate_pixels:
        dynamic_array[x][y] = 1
    return dynamic_array.astype(bool)
In [ ]:
def boolean_xarray_segmentation_filter(da, min_size = 5):
    mask_np = numpy_group_mask(da.values, min_size = min_size)
    return xr.DataArray(mask_np,
                        dims = da.dims,
                        coords = da.coords,
                        attrs = {"group_size": min_size},
                        name = "filtered_chunks_mask")
In [ ]:
# Zanzibar, Tanzania
latitude = (-6.2238, -6.1267)
longitude = (39.2298, 39.2909)
date_range = ('2000-01-01', '2001-12-31')
In [ ]:
## The code below renders a map that can be used to orient yourself with the region.
from utils.data_cube_utilities.dc_display_map import display_map
display_map(latitude = latitude, longitude = longitude)
In [ ]:
import datacube
dc = datacube.Datacube()
Landsat 7
In [ ]:
ls7_extents = dict(latitude = latitude,
longitude = longitude,
platform = "LANDSAT_7",
product = "ls7_usgs_sr_scene",
time = date_range,
measurements = ['red','green','blue','nir','swir1', 'swir2', 'pixel_qa'] )
ASTER GDEM V2
In [ ]:
aster_extents = dict(latitude = latitude,
longitude = longitude,
platform = "TERRA",
product = "terra_aster_gdm")
Load Products
In [ ]:
landsat_dataset = dc.load(**ls7_extents)
landsat_dataset
In [ ]:
aster_dataset = dc.load(**aster_extents)
aster_dataset
In [ ]:
import time

def reformat_n64(t):
    # Convert a numpy datetime64[ns] value to a 'YYYY-MM-DD' string.
    return time.strftime("%Y-%m-%d", time.gmtime(t.astype(int)/1000000000))

acq_dates = list(map(reformat_n64, landsat_dataset.time.values))
print("Choose the target date")
print(acq_dates[1:]) # There must be at least one acquisition before the target date.
In [ ]:
target_date = '2001-01-12'
target_index = acq_dates.index(target_date)
In [ ]:
from datetime import datetime , timedelta
year_before_target_date = datetime.strptime(target_date, '%Y-%m-%d') - timedelta(days = 365)
year_of_landsat_dataset = landsat_dataset.sel(time = slice(year_before_target_date, target_date))
In [ ]:
year_of_landsat_dataset
In [ ]:
from utils.data_cube_utilities.dc_displayutil import display_at_time
In [ ]:
from functools import reduce
import numpy as np
import xarray as xr
def ls7_qa_mask(dataset, keys):
    land_cover_encoding = dict( fill = [1],
                                clear = [66, 130],
                                water = [68, 132],
                                shadow = [72, 136],
                                snow = [80, 112, 144, 176],
                                cloud = [96, 112, 160, 176, 224],
                                low_conf = [66, 68, 72, 80, 96, 112],
                                med_conf = [130, 132, 136, 144, 160, 176],
                                high_conf= [224]
                              )
    def merge_lists(a, b):
        return a.union(set(land_cover_encoding[b]))
    relevant_encodings = reduce(merge_lists, keys, set())
    return xr.DataArray( np.isin(dataset.pixel_qa, list(relevant_encodings)),
                         coords = dataset.pixel_qa.coords,
                         dims = dataset.pixel_qa.dims,
                         name = "cloud_mask",
                         attrs = dataset.attrs)
In [ ]:
is_clear_mask = ls7_qa_mask(year_of_landsat_dataset, ['clear', 'water'])
Display contents
In [ ]:
is_clear_mask
In [ ]:
product_dataset = xr.merge([year_of_landsat_dataset, is_clear_mask])
In [ ]:
from utils.data_cube_utilities.dc_rgb import rgb
rgb(landsat_dataset, at_index = target_index, width = 15)
In [ ]:
rgb(product_dataset, at_index = -1, paint_on_mask = [((product_dataset.cloud_mask.values), (255,0,0))], width = 15)
In [ ]:
rgb(product_dataset, at_index = -1, paint_on_mask = [(np.invert(product_dataset.cloud_mask.values), (0,0,0))], width = 15)
In [ ]:
after = product_dataset.isel(time = -1)
before = product_dataset.isel(time = -2)
In [ ]:
devegetation_mask = meets_devegetation_criteria(before,after,threshold = -0.18)
In [ ]:
product_dataset = xr.merge([product_dataset, devegetation_mask])
In [ ]:
def nan_to_bool(nd_array):
    # NaN pixels (masked out) become False; nonzero values become True.
    nd = nd_array.astype(float)
    nd[np.isnan(nd)] = 0
    return nd.astype(bool)
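For instance (a hypothetical array), masked NaN pixels map to False while valid nonzero pixels map to True:
In [ ]:
nan_to_bool(np.array([[0.3, np.nan],
                      [np.nan, 0.7]]))
# array([[ True, False],
#        [False,  True]])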
In [ ]:
rgb(product_dataset, at_index = -1,
paint_on_mask = [(nan_to_bool(product_dataset.where(devegetation_mask).red.values), (255,0,0))], width = 15)
In [ ]:
rgb(product_dataset, at_index = -1,
paint_on_mask = [(nan_to_bool(product_dataset.where(devegetation_mask).red.values), (255,0,0)),
(np.invert(nan_to_bool(product_dataset.cloud_mask.values)), (0,0,0))],
width = 15)
In [ ]:
rgb(product_dataset,
at_index = -1,
paint_on_mask = [(np.invert(nan_to_bool(product_dataset.where(devegetation_mask).red.values)), (0,0,0)),
(np.invert(nan_to_bool(product_dataset.cloud_mask.values)), (0,0,0))], width = 15)
Elevation around Zanzibar might have a unique distribution of heights. Therefore, it makes sense to look at a histogram of all heights and determine an appropriate threshold specific to this area.
In [ ]:
aster_dataset.num.plot.hist(figsize = (15,5), bins = 16)
The lack of validation data with regard to height and agriculture means an arbitrary threshold is picked for filtering. Here a threshold of 5 m is chosen rather than the 30 m default.
In [ ]:
height_mask = meets_minimum_height_criteria(aster_dataset.isel(time = 0), threshold = 5)
In [ ]:
product_dataset = xr.merge([product_dataset, height_mask])
In [ ]:
rgb(product_dataset,
at_index = -1,
paint_on_mask = [((nan_to_bool(product_dataset.where(height_mask).red.values)), (255,0,0))], width = 15)
In [ ]:
rgb(product_dataset,
at_index = -1,
paint_on_mask = [((nan_to_bool(product_dataset.where(height_mask).red.values)), (255,0,0)),
(np.invert(nan_to_bool(product_dataset.where(devegetation_mask).red.values)), (0,0,0)),
(np.invert(nan_to_bool(product_dataset.cloud_mask.values)), (0,0,0))], width = 15)
In [ ]:
rgb(product_dataset,
at_index = -1,
paint_on_mask = [(np.invert(nan_to_bool(product_dataset.where(height_mask).red.values)), (0,0,0)),
(np.invert(nan_to_bool(product_dataset.where(devegetation_mask).red.values)), (0,0,0)),
(np.invert(nan_to_bool(product_dataset.cloud_mask.values)), (0,0,0))], width = 15)
In [ ]:
cloudless_dataset = product_dataset.where(product_dataset.cloud_mask)
inundation_mask = meets_inundation_criteria(cloudless_dataset)
In [ ]:
product_dataset = xr.merge([product_dataset, inundation_mask])
In [ ]:
rgb(product_dataset,
at_index = -1,
paint_on_mask = [((nan_to_bool(product_dataset.where(inundation_mask).red.values)), (255,0,0))], width = 15)
In [ ]:
rgb(product_dataset,
at_index = -1,
paint_on_mask = [((nan_to_bool(product_dataset.where(inundation_mask).red.values)), (255,0,0)),
(np.invert(nan_to_bool(product_dataset.where(height_mask).red.values)), (0,0,0)),
(np.invert(nan_to_bool(product_dataset.where(devegetation_mask).red.values)), (0,0,0)),
(np.invert(nan_to_bool(product_dataset.cloud_mask.values)), (0,0,0))], width = 15)
In [ ]:
rgb(product_dataset,
at_index = -1,
paint_on_mask = [(np.invert(nan_to_bool(product_dataset.where(inundation_mask).red.values)), (0,0,0)),
(np.invert(nan_to_bool(product_dataset.where(height_mask).red.values)), (0,0,0)),
(np.invert(nan_to_bool(product_dataset.where(devegetation_mask).red.values)), (0,0,0)),
(np.invert(nan_to_bool(product_dataset.cloud_mask.values)), (0,0,0))], width = 15)
In [ ]:
non_recurring_ndvi_change = vegetation_sum_threshold(product_dataset, threshold = 1)
In [ ]:
product_dataset = xr.merge([product_dataset, non_recurring_ndvi_change])
In [ ]:
rgb(product_dataset,
at_index = -1,
paint_on_mask = [((nan_to_bool(product_dataset.where(non_recurring_ndvi_change).red.values)), (255,0,0))], width = 15)
In [ ]:
rgb(product_dataset,
at_index = -1,
paint_on_mask = [
((nan_to_bool(product_dataset.where(non_recurring_ndvi_change).red.values)),(255,0,0)),
(np.invert(nan_to_bool(product_dataset.where(inundation_mask ).red.values)),(0,0,0)),
(np.invert(nan_to_bool(product_dataset.where(height_mask).red.values)), (0,0,0)),
(np.invert(nan_to_bool(product_dataset.where(devegetation_mask).red.values)), (0,0,0)),
(np.invert(nan_to_bool(product_dataset.cloud_mask.values)), (0,0,0))], width = 15)
In [ ]:
rgb(product_dataset,
at_index = -1,
paint_on_mask = [(np.invert(nan_to_bool(product_dataset.where(non_recurring_ndvi_change).red.values)),(0,0,0)),
(np.invert(nan_to_bool(product_dataset.where(inundation_mask).red.values)), (0,0,0)),
(np.invert(nan_to_bool(product_dataset.where(height_mask).red.values)), (0,0,0)),
(np.invert(nan_to_bool(product_dataset.where(devegetation_mask).red.values)), (0,0,0)),
(np.invert(nan_to_bool(product_dataset.cloud_mask.values)), (0,0,0))], width = 15)
In [ ]:
remaining_pixel_mask = (product_dataset.isel(time = -1).cloud_mask
                        & product_dataset.devegetation_mask
                        & product_dataset.height_mask
                        & product_dataset.inundation_mask
                        & product_dataset.repeat_devegetation_mask)
In [ ]:
deforestation_mask = boolean_xarray_segmentation_filter(remaining_pixel_mask).rename("deforestation_mask").drop("time")
In [ ]:
product_dataset = xr.merge([product_dataset, deforestation_mask])
In [ ]:
rgb(product_dataset,
at_index = -1,
paint_on_mask = [((nan_to_bool(product_dataset.where(deforestation_mask).red.values)), (255,0,0))])
In [ ]:
rgb(product_dataset,
at_index = -1,
paint_on_mask = [
((nan_to_bool(product_dataset.where(deforestation_mask).red.values)),(255,0,0)),
(np.invert(nan_to_bool(product_dataset.where(non_recurring_ndvi_change).red.values)),(0,0,0)),
(np.invert(nan_to_bool(product_dataset.where(inundation_mask ).red.values)),(0,0,0)),
(np.invert(nan_to_bool(product_dataset.where(height_mask).red.values)), (0,0,0)),
(np.invert(nan_to_bool(product_dataset.where(devegetation_mask).red.values)), (0,0,0)),
(np.invert(nan_to_bool(product_dataset.cloud_mask.values)), (0,0,0))])
In [ ]:
rgb(product_dataset,
at_index = -1,
paint_on_mask = [
(np.invert(nan_to_bool(product_dataset.where(deforestation_mask).red.values)),(0,0,0)),
(np.invert(nan_to_bool(product_dataset.where(non_recurring_ndvi_change).red.values)),(0,0,0)),
(np.invert(nan_to_bool(product_dataset.where(inundation_mask ).red.values)),(0,0,0)),
(np.invert(nan_to_bool(product_dataset.where(height_mask).red.values)), (0,0,0)),
(np.invert(nan_to_bool(product_dataset.where(devegetation_mask).red.values)), (0,0,0)),
(np.invert(nan_to_bool(product_dataset.cloud_mask.values)), (0,0,0))])
In [ ]:
display_map(latitude = latitude, longitude = longitude)