In [1]:
from sys import path
path.append("../../")
import datacube
import datetime
import folium
import numpy as np
import pandas as pd
import utils.data_cube_utilities.dc_display_map as dm
import xarray as xr
from folium import plugins
from sklearn.externals import joblib
from sklearn.preprocessing import minmax_scale
from utils.data_cube_utilities.dc_frac import frac_coverage_classify
from utils.data_cube_utilities.dc_mosaic import ls8_unpack_qa
from utils.data_cube_utilities.dc_mosaic import create_median_mosaic
from utils.data_cube_utilities.dc_rgb import rgb
In [2]:
def load_dc_data():
    """Load one year (2016) of Landsat 8 surface reflectance over Uruguay.

    Queries the Data Cube for the `ls8_lasrc_uruguay` product, returning the
    optical bands plus `pixel_qa` for cloud/water masking.

    Returns:
        xarray.Dataset: Time-stacked scenes with red, green, blue, nir,
        swir1, swir2, and pixel_qa measurements.
    """
    dc = datacube.Datacube()
    return dc.load(
        platform='LANDSAT_8',
        product='ls8_lasrc_uruguay',
        latitude=(-34.44988376, -34.096445),
        longitude=(-56.29119062, -55.24653668),
        time=('2016-01-01', '2017-01-01'),
        measurements=['red', 'green', 'blue', 'nir', 'swir1', 'swir2', 'pixel_qa'],
    )
In [3]:
class TemporalCompositor:
    """Builds cloud-free median composites along the time dimension.

    Attributes:
        dataset (xarray.Dataset): Source scenes, including a `pixel_qa` band.
    """

    def __init__(self, dataset):
        """Store the dataset to composite.

        Args:
            dataset (xarray.Dataset): The dataset used in the compositing.
        """
        self.dataset = dataset

    def clean_mask_ls8(self):
        """Build a mask of usable Landsat 8 pixels for compositing.

        A pixel is kept when the QA band flags it as either clear land or
        open water.

        Returns:
            Boolean mask, True where pixels are clear or water.
        """
        qa = self.dataset.pixel_qa
        is_water = ls8_unpack_qa(qa, cover_type="water")
        is_clear = ls8_unpack_qa(qa, cover_type="clear")
        return np.logical_or(is_water, is_clear)

    def create_temporal_composite(self):
        """Collapse the time dimension into a median mosaic of clean pixels.

        Returns:
            The median temporal composite, with the `pixel_qa` band removed.
        """
        mask = self.clean_mask_ls8()
        mosaic = create_median_mosaic(self.dataset, clean_mask=mask)
        return mosaic.drop('pixel_qa')
In [4]:
def NDVI(dataset: xr.Dataset) -> xr.DataArray:
    """Normalized Difference Vegetation Index: (nir - red) / (nir + red).

    Args:
        dataset: Dataset providing `nir` and `red` bands.

    Returns:
        The NDVI DataArray, named "NDVI".
    """
    # BUG FIX: parenthesize the whole quotient before renaming. Previously
    # .rename("NDVI") bound only to the denominator (nir + red), so the
    # division result carried no name at all.
    return ((dataset.nir - dataset.red) / (dataset.nir + dataset.red)).rename("NDVI")
def NBR(dataset: xr.Dataset) -> xr.DataArray:
    """Normalized Burn Ratio: (nir - swir2) / (nir + swir2)."""
    nir = dataset.nir
    swir2 = dataset.swir2
    return ((nir - swir2) / (swir2 + nir)).rename("NBR")
def NDWI_2(dataset: xr.Dataset) -> xr.DataArray:
    """Normalized Difference Water Index: (green - nir) / (green + nir).

    Args:
        dataset: Dataset providing `green` and `nir` bands.

    Returns:
        The NDWI DataArray, named "NDWI_2".
    """
    # BUG FIX: parenthesize the whole quotient before renaming. Previously
    # .rename("NDWI_2") bound only to the denominator (green + nir), so the
    # division result carried no name at all.
    return ((dataset.green - dataset.nir) / (dataset.green + dataset.nir)).rename("NDWI_2")
def SCI(dataset: xr.Dataset) -> xr.DataArray:
    """Soil Composition Index: (swir1 - nir) / (swir1 + nir)."""
    swir1 = dataset.swir1
    nir = dataset.nir
    return ((swir1 - nir) / (swir1 + nir)).rename("SCI")
def PNDVI(dataset: xr.Dataset) -> xr.DataArray:
    """Pan NDVI: contrasts NIR against the sum of all visible bands."""
    visible = dataset.green + dataset.red + dataset.blue
    return ((dataset.nir - visible) / (dataset.nir + visible)).rename("PNDVI")
def CVI(dataset: xr.Dataset) -> xr.DataArray:
    """Chlorophyll Vegetation Index: nir * red / green^2."""
    # Operation order kept as nir * (red / green^2) to preserve exact
    # floating-point results.
    green_squared = dataset.green * dataset.green
    return (dataset.nir * (dataset.red / green_squared)).rename("CVI")
def CCCI(dataset: xr.Dataset) -> xr.DataArray:
    """Index labelled CCCI, computed here as (nir - red) / (nir + red).

    NOTE(review): this formula is numerically identical to NDVI. The true
    CCCI uses a red-edge band, which Landsat 8 does not provide — confirm
    the duplication is intentional before relying on this feature.
    """
    nir = dataset.nir
    red = dataset.red
    return ((nir - red) / (nir + red)).rename("CCCI")
def NBR2(dataset: xr.Dataset) -> xr.DataArray:
    """Normalized Burn Ratio 2: (swir1 - swir2) / (swir1 + swir2).

    Args:
        dataset: Dataset providing `swir1` and `swir2` bands.

    Returns:
        The NBR2 DataArray, named "NBR2".
    """
    # Consistency fix: name the result like every sibling index helper does;
    # previously this was the only index function returning an unnamed array.
    return ((dataset.swir1 - dataset.swir2) / (dataset.swir1 + dataset.swir2)).rename("NBR2")
def coefficient_of_variance(da: xr.DataArray):
    """Per-pixel coefficient of variation (std / mean) along the time axis."""
    temporal_std = da.std(dim="time")
    temporal_mean = da.mean(dim="time")
    return temporal_std / temporal_mean
def NDVI_coeff_var(ds, mask=None):
    """Coefficient of variation of NDVI through time, on masked pixels only.

    Args:
        ds: Time-stacked dataset with `nir` and `red` bands.
        mask: Boolean clean mask applied before computing the statistics.

    Returns:
        The per-pixel NDVI coefficient of variation.
    """
    masked_ndvi = NDVI(ds).where(mask)
    return coefficient_of_variance(masked_ndvi)
def fractional_cover_2d(dataset: xr.Dataset) -> xr.DataArray:
    """Fractional cover classification for an already-composited 2D dataset."""
    # The composite is already cleaned upstream, so every pixel is valid.
    all_valid = np.ones(dataset.red.values.shape).astype(bool)
    return frac_coverage_classify(dataset, clean_mask=all_valid)
In [5]:
class Classifier:
    """Wraps a pre-trained random forest and applies it to feature tables.

    Attributes:
        rf (RandomForestClassifier): The deserialized classifier.
    """

    def __init__(self, model_location='./classifiers/models/random_forest.model'):
        """Load the pickled model from disk.

        NOTE(review): `sklearn.externals.joblib` (imported at the top of this
        notebook) is deprecated in modern scikit-learn; the standalone
        `joblib` package is its replacement.

        Args:
            model_location (string): Path to the exported classifier binary.
        """
        self.rf = joblib.load(model_location)

    def classify(self, features):
        """Predict a label for every row of the feature table.

        The feature matrix is split into 100 chunks so that predictions run
        on manageable batches.

        Args:
            features (pandas.DataFrame): The set of features to classify.

        Returns:
            The same table, with a 'label' column appended in place.
        """
        chunks = np.array_split(features.values, 100)
        predictions = np.concatenate([self.rf.predict(chunk) for chunk in chunks])
        label_frame = pd.DataFrame(predictions, columns=['label'])
        features['label'] = label_frame.values
        return features

    def build_features(self, dataset):
        """Build the full feature table used for classification.

        Args:
            dataset (xarray.Dataset): Raw time-stacked scene data.

        Returns:
            pandas.DataFrame: Composite bands plus derived spectral indices,
            one row per pixel.
        """
        compositor = TemporalCompositor(dataset)
        composite = compositor.create_temporal_composite()
        features = xr.Dataset().merge(composite)
        clean = compositor.clean_mask_ls8()
        feature_funcs = (NDVI,
                         NDVI_coeff_var,
                         PNDVI,
                         NBR,
                         NBR2,
                         NDWI_2,
                         SCI,
                         CVI,
                         CCCI,
                         fractional_cover_2d)
        for func in feature_funcs:
            name = func.__name__
            if name == 'NDVI_coeff_var':
                # Needs the full time series plus the clean mask, not the composite.
                features[name] = func(dataset, mask=clean)
            elif name == 'fractional_cover_2d':
                # Returns a Dataset of its own; merge instead of assigning.
                features = features.merge(fractional_cover_2d(composite))
            else:
                features[name] = func(composite)
        # NaNs appear where the time series was fully masked; zero them out.
        coeff_vals = features.NDVI_coeff_var.values
        coeff_vals[np.isnan(coeff_vals)] = 0
        return features.to_dataframe()
In [6]:
class FeaturesDisplay:
    """A FeaturesDisplay object for presenting classification results.
    Attributes:
        dataset (xarray.Dataset): The features DataFrame represented as an Xarray Dataset.
    """
    def __init__(self, features: pd.DataFrame):
        """Initializes the FeaturesDisplay object.
        Args:
            features (pandas.DataFrame): A classified features DataFrame with a label key.
        """
        # Rebuild the spatially indexed Dataset from the flat feature table.
        self.dataset = xr.Dataset.from_dataframe(features)
    def images(self):
        """Generates multiple images of the features based on classification results."""
        landuse = ('Forest',
                   'Misc',
                   'Naturalgrassland',
                   'Prairie',
                   'Summercrops'
                   )
        dataset = self.dataset
        for i in range(len(landuse)):
            # Pixels not matching the current label become NaN and render dark.
            tmp = dataset.where(dataset.label == landuse[i])
            print("%s:" % landuse[i])
            # False-color composite (swir1/nir/red) for this land-use class.
            rgb(tmp, bands= ["swir1","nir","red"], width= 20)
    def _get_canvas(self, key: str) -> np.ndarray:
        """Builds an RGBA canvas that is white wherever `label == key`.
        Args:
            key (str): The classification label to paint.
        Returns:
            A (lat, lon, 4) array: white with alpha 179 (~70% opaque) on
            matching pixels, fully transparent elsewhere.
        """
        canvas = np.zeros((len(self.dataset.latitude.values),
                           len(self.dataset.longitude.values),
                           4))
        paint_here = self.dataset.label == key
        canvas[paint_here] = np.array([255, 255, 255, 179])
        canvas[~paint_here] = np.array([0, 0, 0, 0])
        return canvas
    def map_overlay(self, key, color=None):
        """Maps classifications using Folium.
        Args:
            key (string): The classification to map from the following: (Forest, Misc, Naturalgrassland, Prairie, Summercrops)
            color: Set to False to disable colorized overlay.
        """
        dataset = self.dataset
        # Keep only pixels carrying the requested label; the rest become NaN.
        tmp = dataset.where(dataset.label == key)
        latitudes = (min(dataset.latitude.values), max(dataset.latitude.values))
        longitudes = (min(dataset.longitude.values), max(dataset.longitude.values))
        zoom_level = dm._degree_to_zoom_level(latitudes[0], latitudes[1])
        # I don't know why, but this makes Folium work for these labels
        # NOTE(review): empirically chosen brightness divisor — confirm why only
        # these two labels need rescaling before relying on this behavior.
        if(key == 'Summercrops' or key == 'Naturalgrassland'):
            mult = 255/3
        else:
            mult = 1
        print("%s:" % key)
        if(color == None):
            # False-color overlay built from the masked nir/red/green bands.
            r = tmp.nir.values
            g = tmp.red.values
            b = tmp.green.values
            # Masked-out pixels are NaN; zero them so scaling and alpha work.
            r[np.isnan(r)] = 0
            g[np.isnan(g)] = 0
            b[np.isnan(b)] = 0
            # Scale each band into the displayable 0-255 range.
            minmax_scale(r, feature_range=(0,255), copy=False)
            minmax_scale(g, feature_range=(0,255), copy=False)
            minmax_scale(b, feature_range=(0,255), copy=False)
            rgb_stack = np.dstack((r,g,b))
            # Half-opaque where data exists, fully transparent where r == 0.
            a = np.ones(r.shape) * 128
            a[np.where(r == 0)] = 0
            rgb_uint8 = (rgb_stack / mult).astype(np.uint8)
            rgb_uint8 = np.dstack((rgb_uint8, a))
        else:
            # Flat white mask overlay marking only matching pixels.
            rgb_uint8 = self._get_canvas(key).astype(np.uint8)
        # NOTE(review): the tiles URL starts with a stray space and uses http —
        # verify Folium tolerates this before reusing elsewhere.
        m = folium.Map(location=[np.average(latitudes),
                                 np.average(longitudes)],
                       zoom_start=zoom_level+1,
                       tiles=" http://mt1.google.com/vt/lyrs=y&z={z}&x={x}&y={y}",
                       attr="Google")
        # Image rows run north-to-south while latitude ascends, hence the flip.
        m.add_child(plugins.ImageOverlay(np.flipud(rgb_uint8), \
            bounds =[[min(latitudes), min(longitudes)], [max(latitudes), max(longitudes)]]))
        folium.LayerControl().add_to(m)
        return m
In [7]:
# Query the Data Cube for the 2016 Landsat 8 stack (long-running I/O step).
dataset = load_dc_data()
In [8]:
# Load the pre-trained random forest from disk.
classifier = Classifier()
# Composite the scenes and derive the spectral-index feature table (expensive).
features = classifier.build_features(dataset)
features.head(10)
In [ ]:
# Append predicted land-use labels to the feature table.
features = classifier.classify(features)
features.head(10)
In [ ]:
# Render one false-color image per land-use class.
display = FeaturesDisplay(features)
display.images()
In [ ]:
# One Folium map per class; color=False draws a plain white mask overlay.
display.map_overlay('Forest', color=False)
In [ ]:
display.map_overlay('Misc', color=False)
In [ ]:
display.map_overlay('Naturalgrassland', color=False)
In [ ]:
display.map_overlay('Prairie', color=False)
In [ ]:
display.map_overlay('Summercrops', color=False)
To show how this proof-of-concept code may be used, we've created an example Python module. The module determines whether an input feature set represents forest or non-forest. It can easily be modified to do the same for the other classification labels.
In [ ]:
# This imports the actual classifier.
from classifiers.forest_classifier import ForestClassifier
"""
Load in out dataset. This is just an example. You can use a different
dataset as long as it contains the appropriate features and they are
in the order that the classifier needs them.
"""
dataset = load_dc_data()
"""
Generate a clean mask using the method in this same notebook.
This is just an example. You can supply your own mask as long
as it is of boolean type or boolean-like (1 or 0).
"""
mask = TemporalCompositor(dataset).clean_mask_ls8()
# Running the actual classifier with our example dataset and mask.
forest_classifier = ForestClassifier('./classifiers/models/random_forest.model')
forest = forest_classifier.classify(dataset, mask)
forest
In [ ]: