In [ ]:
%matplotlib inline
import numpy as np  
import matplotlib.pyplot as plt
import warnings; warnings.simplefilter('ignore')

Detecting Forest Change using Landsat 7 imagery

In this notebook, forest change is detected by comparing two contiguous acquisitions of Landsat 7 imagery. The comparison between a before and an after image offers insight into changes in vegetation and inundation over varied topography, giving an indication of deforestation. A broader history spanning one year into the past is also considered.

A series of very simple rules is applied to each pixel in the imagery to classify it as forest change or no change, as sketched below.
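In lieu of the original diagram, the sketch below shows how the rules combine. It is illustrative only: the function and argument names are hypothetical, but each argument mirrors a boolean mask built later in this notebook.


In [ ]:
def classify_pixel(devegetated, high_enough, never_inundated, not_recurring):
    # Hypothetical sketch: a pixel is a deforestation candidate only when
    # Rules 1-4 all hold; Rule 5 (minimum group size) is applied afterwards
    # over the whole scene rather than per pixel.
    return devegetated and high_enough and never_inundated and not_recurring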

Rule 1: vegetation decreases significantly

NDVI is an index that correlates highly with the presence of vegetation. Its formulation is simple:

$$pixel_{NDVI} = \frac{pixel_{NIR} - pixel_{red}}{pixel_{NIR} + pixel_{red}} $$
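As a quick worked example with illustrative values: healthy vegetation reflects strongly in NIR and weakly in red, so a forested pixel might give

$$pixel_{NDVI} = \frac{0.40 - 0.05}{0.40 + 0.05} \approx 0.78 $$

while bare or cleared ground, with little NIR/red contrast, yields an NDVI near 0.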

The implementation, with the help of xarray data structures, is equally simple.


In [ ]:
def ndvi(dataset):  
    return ((dataset.nir - dataset.red)/(dataset.nir + dataset.red)).rename("NDVI")

In this step, the comparison of before and after NDVI values can indicate a loss of vegetation and is used as the first criterion for detecting deforestation.

The comparison relies on differencing between acquisitions.


In [ ]:
def delta(before, after):
    return after - before

def vegetation_change(before, after):
    vegetation_before = ndvi(before)
    vegetation_after  = ndvi(after)
    
    return delta(vegetation_before,
                 vegetation_after)

Through a process of visual survey, a threshold is determined for NDVI decrease. In our example the threshold is -0.18: pixels whose NDVI difference (after minus before) is smaller than -0.18 are retained, while pixels above -0.18 are not considered possible points of deforestation.


In [ ]:
def meets_devegetation_criteria(before, after, threshold = -0.18):
    # The comparison below is applied to all pixels at once.
    # It returns a grid of True and False values to be used for filtering our dataset.
    return (vegetation_change(before, after) < threshold).rename("devegetation_mask").astype(bool)


Rule 2: vegetation decrease is not the result of agricultural activity

Agricultural cultivation often occurs on flat and low land. This assumption is used to deal with the possibility of agricultural activity interfering with the detection of deforestation/devegetation.

Thus, pixels with a DEM value smaller than 30 m are discarded.


In [ ]:
## ASTER

def meets_minimum_height_criteria(aster_ds, threshold = 30):
    return (aster_ds.num > threshold).astype(bool).rename("height_mask")

Rule 3: vegetation decrease is not in an area that is known for inundation

Most forests are tall, with canopies obscuring the view of the forest floor. Visually speaking, flooding should have very little effect on NDVI values, since flooding happens below the canopy.

The presence of water at some point in the year might instead indicate that the detected vegetation is not forest, but an agricultural area or a pond covered by floating vegetation such as water fern.

MNDWI is a calculated index that correlates highly with the presence of water. Like NDVI, its formulation is simple.


$$MNDWI = \frac{pixel_{green} - pixel_{swir1}}{pixel_{green} + pixel_{swir1}} $$
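As a worked example with illustrative values: open water reflects more green light than SWIR, so a water pixel might give

$$MNDWI = \frac{0.10 - 0.02}{0.10 + 0.02} \approx 0.67 $$

whereas vegetation and soil reflect strongly in SWIR, pushing MNDWI below 0. This asymmetry motivates the threshold of 0 used later.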

So is the implementation in code.


In [ ]:
def mndwi(dataset):
    #In this code the following arithmetic is applied to all pixels in a dataset.
    return ((dataset.green - dataset.swir1)/(dataset.green + dataset.swir1)).rename("MNDWI")


In this step, the entire MNDWI history (the time component) is analyzed and the maximum MNDWI is taken for each pixel. If, at any point in time, the MNDWI surpasses some threshold, the assumption is made that the area is not a forest.


In [ ]:
def max_mndwi(dataset):
    #In this code the max is taken across the time component for each pixel.
    _max = mndwi(dataset).max(dim = ['time'])
    return _max.rename("max_MNDWI")

A threshold is empirically evaluated to reduce the rate of false positives caused by inundated areas. In this case that threshold is 0: pixels with max_MNDWI smaller than 0 are retained; those greater are discarded.


In [ ]:
def meets_inundation_criteria(dataset, threshold = 0.0):
    return (max_mndwi(dataset) < threshold).rename("inundation_mask")

Rule 4: vegetation decrease isn't a recurring or normal phenomenon

Recurring land-cover change, such as seasonal cultivation, produces large NDVI swings that repeat throughout the year. To filter these out, the magnitude of NDVI change between every pair of consecutive acquisitions in the year-long history is summed per pixel; pixels whose cumulative change exceeds a threshold are treated as recurring change rather than deforestation.


In [ ]:
import xarray as xr

def create_ndvi_matrix(ds):
    ndvi_matrix = ndvi(ds)
    return ndvi_matrix.where(ds.cloud_mask)

def delta_matrix(ds):
    return np.diff(ds, axis = 0)

def vegetation_sum_threshold(ds, threshold = 10):
    ndvi_delta_matrix = delta_matrix(create_ndvi_matrix(ds))        # (time-1, lat, lon) matrix
    ndvi_change_magnitude = np.absolute(ndvi_delta_matrix)          # (time-1, lat, lon) matrix
    cumulative_ndvi_change_magnitude = np.nansum(ndvi_change_magnitude, axis = 0) # (lat, lon) matrix
    ndvi_change_repeat_mask = cumulative_ndvi_change_magnitude < threshold        # (lat, lon) boolean matrix
    return xr.DataArray(ndvi_change_repeat_mask,
                        dims = ["latitude", "longitude"],
                        coords = {"latitude": ds.latitude, "longitude": ds.longitude},
                        attrs = {"threshold": threshold},
                        name = "repeat_devegetation_mask")
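To make the recurrence test concrete, here is a toy check with made-up NDVI time series (the values are illustrative, not from this notebook's data):


In [ ]:
# A stable forest pixel accumulates little |delta NDVI| over the year, while
# a pixel with recurring change (e.g. crop cycles) accumulates a lot.
stable   = np.array([0.8, 0.8, 0.7, 0.8])
seasonal = np.array([0.8, 0.2, 0.8, 0.2])

print(np.nansum(np.abs(np.diff(stable))))    # ~0.2 -> below threshold, retained
print(np.nansum(np.abs(np.diff(seasonal))))  # ~1.8 -> above threshold, discarded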

Rule 5: vegetation decrease happens in large areas

The assumption is made that deforestation happens on a sufficiently large scale. Another assumption is that deforestation detected in a pixel is typically spatially correlated with deforestation in adjacent pixels.

Following this reasoning, the final filter retains only groups of flagged pixels larger than 5 pixels per group: a sufficiently large cluster is kept as a deforestation candidate, while small or isolated groups are rejected.




The implementation relies on adequately grouping pixels in an efficient manner. The use of skimage's measure module yields segmented arrays of pixel groups.



In [ ]:
from skimage import measure
import numpy as np

def group_pixels(array, connectivity = 1):
    # Label connected components; label 0 is the background, so collect the
    # (row, col) coordinates of each labeled group starting from label 1.
    arr = measure.label(array, connectivity = connectivity)
    return [np.dstack(np.where(arr == y))[0] for y in range(1, np.amax(arr) + 1)]

In [ ]:
def numpy_group_mask(boolean_np_array, min_size = 5):
    all_groups = group_pixels(boolean_np_array.astype(int))
    # Keep groups larger than min_size; the (group != 0).all() test also
    # discards groups that touch the top row or left column of the image.
    candidate_groups = filter(lambda group:
                                  (len(group) > min_size) & (group != 0).all(),
                              all_groups)
    candidate_pixels = (pixel for group in candidate_groups for pixel in group)
    mask = np.zeros(boolean_np_array.shape)
    for x, y in candidate_pixels:
        mask[x, y] = 1
    return mask.astype(bool)
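A quick illustrative check of the grouping filter on a toy array (hypothetical values, chosen only for demonstration):


In [ ]:
toy = np.zeros((8, 8), dtype = bool)
toy[2:5, 2:4] = True   # a 6-pixel block: large enough to survive min_size = 5
toy[6, 6]     = True   # an isolated pixel: filtered out

numpy_group_mask(toy, min_size = 5)  # True only over the 6-pixel block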



In [ ]:
def boolean_xarray_segmentation_filter(da, min_size = 5):
    mask_np = numpy_group_mask(da.values, min_size = min_size)
    return xr.DataArray(mask_np,
                        dims = da.dims,
                        coords = da.coords,
                        attrs = {"group_size": min_size},
                        name = "filtered_chunks_mask")

Run on a small area

The area


In [ ]:
# Zanzibar, Tanzania
latitude = (-6.2238, -6.1267)
longitude = (39.2298, 39.2909)
date_range = ('2000-01-01', '2001-12-31')

A quick visualization


In [ ]:
## The code below renders a map that can be used to orient yourself with the region.
from utils.data_cube_utilities.dc_display_map import display_map
display_map(latitude = latitude, longitude = longitude)

Loading Data for the Area

The following lines of code load ASTER DEM data and Landsat 7 data through the Data Cube's Python API.


In [ ]:
import datacube
dc = datacube.Datacube()

Define the loads

Landsat 7


In [ ]:
ls7_extents = dict(latitude     = latitude,
                   longitude    = longitude,
                   platform     = "LANDSAT_7",
                   product      = "ls7_usgs_sr_scene",
                   time         = date_range,
                   measurements = ['red','green','blue','nir','swir1', 'swir2', 'pixel_qa'] )

ASTER GDEM V2


In [ ]:
aster_extents = dict(latitude = latitude,
                     longitude = longitude,
                     platform = "TERRA",
                     product = "terra_aster_gdm")

Load Products


In [ ]:
landsat_dataset = dc.load(**ls7_extents)
landsat_dataset

In [ ]:
aster_dataset = dc.load(**aster_extents)
aster_dataset

Pick a time


In [ ]:
import time  

def reformat_n64(t):
    return time.strftime("%Y-%m-%d", time.gmtime(t.astype(int)/1000000000))

acq_dates = list(map(reformat_n64, landsat_dataset.time.values))
print("Choose the target date")
print(acq_dates[1:]) # There must be at least one acquisition before the target date.

In [ ]:
target_date  = '2001-01-12'
target_index = acq_dates.index(target_date)

In [ ]:
from datetime import datetime, timedelta

year_before_target_date = datetime.strptime(target_date, '%Y-%m-%d') - timedelta(days = 365)
year_of_landsat_dataset = landsat_dataset.sel(time = slice(year_before_target_date, target_date))

In [ ]:
year_of_landsat_dataset

A quick RGB visualization function


In [ ]:
from utils.data_cube_utilities.dc_displayutil import display_at_time

Generate a boolean mask to filter clouds


In [ ]:
from functools import reduce  
import numpy as np  
import xarray as xr

def ls7_qa_mask(dataset, keys):
    land_cover_encoding = dict( fill     =  [1],
                                clear    =  [66,  130],
                                water    =  [68,  132],
                                shadow   =  [72,  136],
                                snow     =  [80,  112, 144, 176],
                                cloud    =  [96,  112, 160, 176, 224],
                                low_conf =  [66,  68,  72,  80,  96,  112],
                                med_conf =  [130, 132, 136, 144, 160, 176],
                                high_conf=  [224]
                              )
    def merge_lists(a, b):
        return a.union(set(land_cover_encoding[b]))

    relevant_encodings = reduce(merge_lists, keys, set())

    return xr.DataArray( np.isin(dataset.pixel_qa, list(relevant_encodings)),
                         coords = dataset.pixel_qa.coords,
                         dims = dataset.pixel_qa.dims,
                         name = "cloud_mask",
                         attrs = dataset.attrs)



In [ ]:
is_clear_mask = ls7_qa_mask(year_of_landsat_dataset, ['clear', 'water'])

Display contents


In [ ]:
is_clear_mask

Add the Landsat data and cloud mask to a common xarray Dataset


In [ ]:
product_dataset = xr.merge([year_of_landsat_dataset, is_clear_mask])

Unfiltered image at selected time


In [ ]:
from utils.data_cube_utilities.dc_rgb import rgb 
rgb(landsat_dataset, at_index = target_index, width = 15)

Retained Pixels after filtering (in red)


In [ ]:
rgb(product_dataset, at_index = -1, paint_on_mask = [((product_dataset.cloud_mask.values), (255,0,0))], width = 15)

Filtered Imagery


In [ ]:
rgb(product_dataset, at_index = -1, paint_on_mask = [(np.invert(product_dataset.cloud_mask.values), (0,0,0))], width = 15)

STEP 1: NDVI Decrease


In [ ]:
after  = product_dataset.isel(time = -1)
before = product_dataset.isel(time = -2)

In [ ]:
devegetation_mask = meets_devegetation_criteria(before, after, threshold = -0.18)

In [ ]:
product_dataset = xr.merge([product_dataset, devegetation_mask])

In [ ]:
def nan_to_bool(nd_array):
    nd = nd_array.astype(float)
    nd[np.isnan(nd)] = 0
    
    return nd.astype(bool)

NDVI change mask (in red)


In [ ]:
rgb(product_dataset, at_index = -1,
    paint_on_mask = [(nan_to_bool(product_dataset.where(devegetation_mask).red.values), (255,0,0))], width = 15)

NDVI change + previous mask


In [ ]:
rgb(product_dataset, at_index = -1,
    paint_on_mask = [(nan_to_bool(product_dataset.where(devegetation_mask).red.values), (255,0,0)),
                     (np.invert(nan_to_bool(product_dataset.cloud_mask.values)), (0,0,0))],
    width = 15)

Remaining pixels after filtering


In [ ]:
rgb(product_dataset,
    at_index = -1,
    paint_on_mask = [(np.invert(nan_to_bool(product_dataset.where(devegetation_mask).red.values)), (0,0,0)),
                     (np.invert(nan_to_bool(product_dataset.cloud_mask.values)), (0,0,0))], width = 15)

STEP 2: Filter DEM

Elevation in the study area might have a unique distribution of heights. Therefore, it makes sense to look at a histogram of all heights and determine an appropriate threshold specific to this area.


In [ ]:
aster_dataset.num.plot.hist(figsize = (15,5), bins = 16)


The lack of validation data with regard to height and agriculture means an arbitrary threshold is picked for filtering. A threshold of 5 m was chosen.


In [ ]:
height_mask = meets_minimum_height_criteria(aster_dataset.isel(time = 0), threshold = 5)

In [ ]:
product_dataset = xr.merge([product_dataset, height_mask])

Valid Height Mask


In [ ]:
rgb(product_dataset,
    at_index = -1,
    paint_on_mask = [((nan_to_bool(product_dataset.where(height_mask).red.values)), (255,0,0))], width = 15)

Height Mask + Previous Filters


In [ ]:
rgb(product_dataset,
    at_index = -1,
    paint_on_mask = [((nan_to_bool(product_dataset.where(height_mask).red.values)), (255,0,0)),
                     (np.invert(nan_to_bool(product_dataset.where(devegetation_mask).red.values)), (0,0,0)),
                     (np.invert(nan_to_bool(product_dataset.cloud_mask.values)), (0,0,0))], width = 15)

Remaining


In [ ]:
rgb(product_dataset,
    at_index = -1,
    paint_on_mask = [(np.invert(nan_to_bool(product_dataset.where(height_mask).red.values)), (0,0,0)),
                     (np.invert(nan_to_bool(product_dataset.where(devegetation_mask).red.values)), (0,0,0)),
                     (np.invert(nan_to_bool(product_dataset.cloud_mask.values)), (0,0,0))], width = 15)

STEP 3: Filter by inundation criteria


In [ ]:
cloudless_dataset = product_dataset.where(product_dataset.cloud_mask)
inundation_mask = meets_inundation_criteria(cloudless_dataset)

In [ ]:
product_dataset = xr.merge([product_dataset, inundation_mask])

Inundation Mask


In [ ]:
rgb(product_dataset,
    at_index = -1,
    paint_on_mask = [((nan_to_bool(product_dataset.where(inundation_mask).red.values)), (255,0,0))], width = 15)

Inundation Mask + Previous Masks


In [ ]:
rgb(product_dataset,
    at_index = -1,
    paint_on_mask = [((nan_to_bool(product_dataset.where(inundation_mask).red.values)), (255,0,0)),
                     (np.invert(nan_to_bool(product_dataset.where(height_mask).red.values)), (0,0,0)),
                     (np.invert(nan_to_bool(product_dataset.where(devegetation_mask).red.values)), (0,0,0)),
                     (np.invert(nan_to_bool(product_dataset.cloud_mask.values)), (0,0,0))], width = 15)

Remaining


In [ ]:
rgb(product_dataset,
    at_index = -1,
    paint_on_mask = [(np.invert(nan_to_bool(product_dataset.where(inundation_mask).red.values)), (0,0,0)),
                     (np.invert(nan_to_bool(product_dataset.where(height_mask).red.values)), (0,0,0)),
                     (np.invert(nan_to_bool(product_dataset.where(devegetation_mask).red.values)), (0,0,0)),
                     (np.invert(nan_to_bool(product_dataset.cloud_mask.values)), (0,0,0))], width = 15)

STEP 4: Recurring NDVI change


In [ ]:
non_recurring_ndvi_change = vegetation_sum_threshold(product_dataset, threshold = 1)

In [ ]:
product_dataset = xr.merge([product_dataset, non_recurring_ndvi_change])

Non-Repeat NDVI Mask


In [ ]:
rgb(product_dataset,
    at_index = -1,
    paint_on_mask = [((nan_to_bool(product_dataset.where(non_recurring_ndvi_change).red.values)), (255,0,0))], width = 15)

Non-Repeat NDVI Mask + Previous Masks


In [ ]:
rgb(product_dataset,
    at_index = -1,
    paint_on_mask = [
                    ((nan_to_bool(product_dataset.where(non_recurring_ndvi_change).red.values)),(255,0,0)),
                     (np.invert(nan_to_bool(product_dataset.where(inundation_mask          ).red.values)),(0,0,0)),
                     (np.invert(nan_to_bool(product_dataset.where(height_mask).red.values)), (0,0,0)),
                     (np.invert(nan_to_bool(product_dataset.where(devegetation_mask).red.values)), (0,0,0)),
                     (np.invert(nan_to_bool(product_dataset.cloud_mask.values)), (0,0,0))], width = 15)

Remaining


In [ ]:
rgb(product_dataset,
    at_index = -1,
    paint_on_mask = [(np.invert(nan_to_bool(product_dataset.where(non_recurring_ndvi_change).red.values)),(0,0,0)),
                     (np.invert(nan_to_bool(product_dataset.where(inundation_mask).red.values)), (0,0,0)),
                     (np.invert(nan_to_bool(product_dataset.where(height_mask).red.values)), (0,0,0)),
                     (np.invert(nan_to_bool(product_dataset.where(devegetation_mask).red.values)), (0,0,0)),
                     (np.invert(nan_to_bool(product_dataset.cloud_mask.values)), (0,0,0))], width = 15)

STEP 5: Filter in favor of sufficiently large pixel groups


In [ ]:
remaining_pixel_mask = (product_dataset.isel(time = -1).cloud_mask
                        & product_dataset.devegetation_mask
                        & product_dataset.height_mask
                        & product_dataset.inundation_mask
                        & product_dataset.repeat_devegetation_mask)

In [ ]:
deforestation_mask = boolean_xarray_segmentation_filter(remaining_pixel_mask).rename("deforestation_mask").drop("time")

In [ ]:
product_dataset = xr.merge([product_dataset, deforestation_mask])

Deforestation Mask


In [ ]:
rgb(product_dataset,
    at_index = -1,
    paint_on_mask = [((nan_to_bool(product_dataset.where(deforestation_mask).red.values)), (255,0,0))])

Deforestation Mask in the context of previous filter results


In [ ]:
rgb(product_dataset,
    at_index = -1,
    paint_on_mask = [
                     ((nan_to_bool(product_dataset.where(deforestation_mask).red.values)),(255,0,0)),
                     (np.invert(nan_to_bool(product_dataset.where(non_recurring_ndvi_change).red.values)),(0,0,0)),
                     (np.invert(nan_to_bool(product_dataset.where(inundation_mask          ).red.values)),(0,0,0)),
                     (np.invert(nan_to_bool(product_dataset.where(height_mask).red.values)), (0,0,0)),
                     (np.invert(nan_to_bool(product_dataset.where(devegetation_mask).red.values)), (0,0,0)),
                     (np.invert(nan_to_bool(product_dataset.cloud_mask.values)), (0,0,0))])

Deforestation After Filtering


In [ ]:
rgb(product_dataset,
    at_index = -1,
    paint_on_mask = [
                     (np.invert(nan_to_bool(product_dataset.where(deforestation_mask).red.values)),(0,0,0)),
                     (np.invert(nan_to_bool(product_dataset.where(non_recurring_ndvi_change).red.values)),(0,0,0)),
                     (np.invert(nan_to_bool(product_dataset.where(inundation_mask          ).red.values)),(0,0,0)),
                     (np.invert(nan_to_bool(product_dataset.where(height_mask).red.values)), (0,0,0)),
                     (np.invert(nan_to_bool(product_dataset.where(devegetation_mask).red.values)), (0,0,0)),
                     (np.invert(nan_to_bool(product_dataset.cloud_mask.values)), (0,0,0))])

In [ ]:
display_map(latitude = latitude, longitude = longitude)
