Kmeans over a set of GeoTiffs

This notebook loads a set of GeoTiffs into an RDD of Tiles, with each Tile being a band in the GeoTiff. Each GeoTiff file contains the SpringIndex or LastFreeze values for one year over the entire USA.

Kmeans takes the years as dimensions, so the matrix must have cells as rows and years as columns. Since the GeoTiffs are loaded with one year per row, the matrix needs to be transposed before clustering on all years. The notebook has two flavors of matrix transpose: locally on the Spark driver or distributed over the Spark workers. Once transposed, the matrix is converted to an RDD of dense vectors to be used by the Kmeans algorithm from Spark MLlib. The end result is a grid where each cell has a cluster ID, which is then saved into a SingleBand GeoTiff. By saving the result into a GeoTiff, the reader can plot it using a Python notebook such as the one defined in the Python examples.
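
As a minimal sketch of the idea (hypothetical values, plain Scala collections rather than the RDDs used below): each loaded row holds one year over all cells, the transpose turns that into one row per cell over all years, and Kmeans then clusters the cells.

val years: Array[Array[Double]] = Array(
  Array(10.0, 20.0, 30.0),  // year 1, three cells
  Array(11.0, 21.0, 31.0)   // year 2, same three cells
)
val cells: Array[Array[Double]] = years.transpose  // one row per cell, one column per year
// each row would then be wrapped in Vectors.dense(...) and fed to KMeans.train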

Dependencies


In [1]:
import sys.process._
import geotrellis.proj4.CRS
import geotrellis.raster.{CellType, ArrayTile, DoubleArrayTile, Tile}
import geotrellis.raster.io.geotiff._
import geotrellis.raster.io.geotiff.writer.GeoTiffWriter
import geotrellis.raster.io.geotiff.{GeoTiff, SinglebandGeoTiff}
import geotrellis.spark.io.hadoop._
import geotrellis.vector.{Extent, ProjectedExtent}
import org.apache.spark.mllib.clustering.{KMeans, KMeansModel}
import org.apache.spark.mllib.linalg.distributed.{CoordinateMatrix, MatrixEntry, RowMatrix}
import org.apache.spark.mllib.linalg.{Vector, Vectors}
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

//Spire is a numeric library for Scala which is intended to be generic, fast, and precise.
import spire.syntax.cfor._

Load multiple GeoTiffs into an RDD


In [2]:
val single_band = false
val local_mode = false

var num_cols_rows :(Int, Int) = (0, 0)
var cellT :CellType = CellType.fromName("uint8raw")
var grids_RDD :RDD[Array[Double]] = sc.emptyRDD

val pattern: String = "tif"
var filepath: String = ""

if (single_band) {
    //Single band GeoTiff
    filepath = "hdfs:///user/hadoop/spring-index/LastFreeze/"
} else {
    //Multi band GeoTiff
    filepath = "hdfs:///user/hadoop/spring-index/BloomFinal/"
}

if (single_band) {
    //Let's load Singleband GeoTiffs and return an RDD with just the tiles.
    val tiles_RDD = sc.hadoopGeoTiffRDD(filepath, pattern).values
    
    //Retrieve the number of cols and rows of the Tile's grid
    val tiles_withIndex = tiles_RDD.zipWithIndex().map{case (e,v) => (v,e)}
    val tile0 = (tiles_withIndex.filter(m => m._1==0).values.collect())(0)
    num_cols_rows = (tile0.cols,tile0.rows)
    cellT = tile0.cellType
    
    grids_RDD = tiles_RDD.map(m => m.toArrayDouble())
} else {
    //Let's load Multiband GeoTiffs and return an RDD with just the tiles.
    val tiles_RDD = sc.hadoopMultibandGeoTiffRDD(filepath, pattern).values
    
    //Retrieve the number of cols and rows of the Tile's grid
    val tiles_withIndex = tiles_RDD.zipWithIndex().map{case (e,v) => (v,e)}
    val tile0 = (tiles_withIndex.filter(m => m._1==0).values.collect())(0)
    num_cols_rows = (tile0.cols,tile0.rows)
    cellT = tile0.cellType
    
    //Let's read the average of the Spring-Index which is stored in the 4th band
    grids_RDD = tiles_RDD.map(m => m.band(3).toArrayDouble())
}


Waiting for a Spark session to start...
[Stage 1:=======================================================> (35 + 1) / 36]
single_band = false
local_mode = false
num_cols_rows = (7808,3892)
cellT = float64raw
grids_RDD = MapPartitionsRDD[8] at map at <console>:91
pattern = tif
filepath = hdfs:///user/hadoop/spring-index/BloomFinal/
Out[2]:
hdfs:///user/hadoop/spring-index/BloomFinal/

Read metadata and create indexes


In [3]:
//Global variables
var projected_extent = new ProjectedExtent(new Extent(0,0,0,0), CRS.fromName("EPSG:3857"))
var grid0: RDD[(Long, Double)] = sc.emptyRDD
var grid0_index: RDD[Long] = sc.emptyRDD
var grids_noNaN_RDD: RDD[Array[Double]] = sc.emptyRDD

//Retrieve the ProjectedExtent which contains metadata such as CRS and bounding box
val projected_extents_withIndex = sc.hadoopGeoTiffRDD(filepath, pattern).keys.zipWithIndex().map{case (e,v) => (v,e)}
projected_extent = (projected_extents_withIndex.filter(m => m._1 == 0).values.collect())(0)

//Get Index for each Cell
val grids_withIndex = grids_RDD.zipWithIndex().map { case (e, v) => (v, e) }
grid0_index = grids_withIndex.filter(m => m._1 == 0).values.flatMap(m => m).zipWithIndex.filter(m => !m._1.isNaN).map { case (v, i) => (i) }

//Get the Tile's grid
grid0 = grids_withIndex.filter(m => m._1 == 0).values.flatMap( m => m).zipWithIndex.map{case (v,i) => (i,v)}

//Let's filter out the NaN values
grids_noNaN_RDD = grids_RDD.map(m => m.filter(!_.isNaN))


[Stage 6:=======================================================> (34 + 1) / 35]
projected_extent = ProjectedExtent(Extent(-126.30312894720473, 14.29219617034159, -56.162671563152486, 49.25462702827337),geotrellis.proj4.CRS$$anon$3@41d0d1b7)
grid0 = MapPartitionsRDD[31] at map at <console>:68
grid0_index = MapPartitionsRDD[26] at map at <console>:65
grids_noNaN_RDD = MapPartitionsRDD[32] at map at <console>:71
projected_extents_withIndex = MapPartitionsRDD[16] at map at <console>:60
projected_extent = ProjectedExtent(Extent(-126.30312894720473, 14.29219617034159, -56.162671563152486, 49.25462702827337),geotrellis.pro...
Out[3]:
ProjectedExtent(Extent(-126.30312894720473, 14.29219617034159, -56.162671563152486, 49.25462702827337),geotrellis.proj4.CRS$$anon$3@41d0d1b7)

Matrix transpose

We need to do a matrix transpose to have clusters per cell and not per year. With each GeoTiff representing a single year, the loaded data looks like this:

bands_RDD.map(s => Vectors.dense(s)).cache()

//The vectors are rows and therefore the matrix will look like this:
[
Vectors.dense(0.0, 1.0, 2.0),
Vectors.dense(3.0, 4.0, 5.0),
Vectors.dense(6.0, 7.0, 8.0),
Vectors.dense(9.0, 0.0, 1.0)
]
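
After the transpose, cells become rows and years become columns; continuing the small example above (a sketch, not output from the notebook), the transposed matrix would look like this:

[
Vectors.dense(0.0, 3.0, 6.0, 9.0),
Vectors.dense(1.0, 4.0, 7.0, 0.0),
Vectors.dense(2.0, 5.0, 8.0, 1.0)
]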

The information was gathered from a blog post on how to convert a matrix to an RDD of vectors and a Stack Overflow post on how to transpose an RDD in Spark.


In [29]:
var grids_vec: RDD[Vector] = sc.emptyRDD

//A) For small memory footprint RDDs we can simply bring them to the driver node and transpose them there
if (local_mode) {
    //First transpose and then parallelize otherwise you get:
    //error: polymorphic expression cannot be instantiated to expected type;
    val grids_noNaN_RDD_T = grids_noNaN_RDD.collect().transpose
    
    //Convert to a RDD
    val transposed = sc.parallelize(grids_noNaN_RDD_T)
    
    //Create a RDD of dense vectors and cache it
    grids_vec = transposed.map(m => Vectors.dense(m)).cache()

//B) For large memory footprint RDDs we need to run in distributed mode
} else {
    val mat :RowMatrix = new RowMatrix(grids_noNaN_RDD.map(m => Vectors.dense(m)))

    // Split the matrix into one MatrixEntry per element.
    val byColumnAndRow = mat.rows.zipWithIndex.map {
        case (row, rowIndex) => row.toArray.zipWithIndex.map {
            case (number, columnIndex) => new MatrixEntry(rowIndex, columnIndex, number)
        }   
    }.flatMap(x => x)
    
    val matt: CoordinateMatrix = new CoordinateMatrix(byColumnAndRow)
    val matt_T = matt.transpose()
    grids_vec = matt_T.toRowMatrix().rows
}


[Stage 43:====================================================>   (34 + 2) / 36]
grids_vec = MapPartitionsRDD[83] at map at IndexedRowMatrix.scala:90
Out[29]:
MapPartitionsRDD[83] at map at IndexedRowMatrix.scala:90

Kmeans training


In [29]:
val numClusters = 3
val numIterations = 5
val clusters = {
    KMeans.train(grids_vec, numClusters, numIterations)
}

// Evaluate clustering by computing Within Set Sum of Squared Errors
val WSSSE = clusters.computeCost(grids_vec)
println("Within Set Sum of Squared Errors = " + WSSSE)

//Un-persist it to save memory
grids_vec.unpersist()


Within Set Sum of Squared Errors = 2.6870621488867938E11                        
numClusters = 3
numIterations = 5
clusters = org.apache.spark.mllib.clustering.KMeansModel@54b35a67
WSSSE = 2.6870621488867938E11
Out[29]:
MapPartitionsRDD[216] at map at IndexedRowMatrix.scala:90

Cluster model's result management


In [30]:
//Let's save the model to HDFS. If the file already exists, the save will abort and report an error.
if (single_band) {
    clusters.save(sc, "hdfs:///user/pheno/spring_index/LastFreeze/all_kmeans_model")
} else {
    clusters.save(sc, "hdfs:///user/pheno/spring_index/BloomFinal/all_kmeans_model")
}


Cluster Centers: 
[Stage 147:=====================================================> (31 + 1) / 32]

Re-load the KmeansModel

In case the notebook was left running in the background, the next task helps the user re-load the Kmeans model.


In [13]:
var clusters :KMeansModel = new KMeansModel(Array.empty :Array[Vector])

if (single_band) {
    clusters = KMeansModel.load(sc, "hdfs:///user/pheno/spring_index/LastFreeze/all_kmeans_model")
} else {
    clusters = KMeansModel.load(sc, "hdfs:///user/pheno/spring_index/BloomFinal/all_kmeans_model")
}


[Stage 9:=====================================================>   (30 + 2) / 32]
clusters = org.apache.spark.mllib.clustering.KMeansModel@7a09538b
Out[13]:
org.apache.spark.mllib.clustering.KMeansModel@7a09538b

Run Kmeans clustering

Run Kmeans and obtain the cluster ID for each cell.


In [31]:
//Cache it so kmeans is more efficient
grids_vec.cache()

val kmeans_res = clusters.predict(grids_vec)

//Un-persist it to save memory
grids_vec.unpersist()


kmeans_res = MapPartitionsRDD[84] at map at KMeansModel.scala:69
Out[31]:
MapPartitionsRDD[83] at map at IndexedRowMatrix.scala:90

Show the cluster ID for the first 50 cells


In [17]:
val kmeans_res_out = kmeans_res.take(50)
kmeans_res_out.foreach(println)

println(kmeans_res_out.size)


0
kmeans_res_out = Array()
Out[17]:
[]

Assign cluster ID to each grid cell

To assign a cluster ID to each grid cell it is necessary to know the indices of the grid cells the results belong to. The process is not straightforward because the Array[Double] used to create each dense Vector does not contain the NaN values; therefore there is no direct mapping between the indices in the Tile's grid and the ones in kmeans_res (the Kmeans result).
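
A minimal sketch of the mismatch, using a hypothetical 6-cell grid:

//A hypothetical grid of 6 cells, two of them NaN
val grid = Array(10.0, Double.NaN, 12.0, 13.0, Double.NaN, 15.0)

//Indices of the non-NaN cells (the role played by grid0_index): 0, 2, 3, 5
val validIdx = grid.zipWithIndex.filter(!_._1.isNaN).map(_._2)

//Kmeans only sees the 4 non-NaN cells, so kmeans_res is indexed 0..3;
//position k in kmeans_res corresponds to grid cell validIdx(k)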

To join the two RDDs, the approach was obtained from a Stack Overflow post on how to perform basic joins of two RDD tables in Spark using Python.

Merge two RDDs, one containing the cluster IDs and the other the indices of the Tile's grid cells.
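
As a toy illustration of the zipWithIndex-and-join pattern used in the next cell (the values are hypothetical):

val a = sc.parallelize(Seq(2, 0, 1))     //toy cluster IDs, one per non-NaN cell
val b = sc.parallelize(Seq(0L, 2L, 3L))  //toy grid-cell indices of those same cells
val joined = a.zipWithIndex().map{ case (v,i) => (i,v)}
  .join(b.zipWithIndex().map{ case (v,i) => (i,v)})   //the key is the position in each RDD
  .map{ case (_, (clusterId, cellIdx)) => (clusterId, cellIdx)}
//joined contains (2,0), (0,2) and (1,3): each cluster ID paired with its grid-cell index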


In [32]:
//The zip operator would be the most appropriate operator for this operation.
//However, it requires the RDDs to have the same number of partitions and the same number of records per partition.
//val cluster_cell_pos = res.zip(band0_index)

//Since we can't use Zip, we index each RDD and then we join them.
val cluster_cell_pos = ((kmeans_res.zipWithIndex().map{ case (v,i) => (i,v)}).join(grid0_index.zipWithIndex().map{ case (v,i) => (i,v)})).map{ case (k,(v,i)) => (v,i)}


[Stage 48:======================================================> (34 + 1) / 35]
cluster_cell_pos = MapPartitionsRDD[92] at map at <console>:62
Out[32]:
MapPartitionsRDD[92] at map at <console>:62

Associate a Cluster_ID to its respective Grid_cell


In [50]:
val grid_clusters = grid0.leftOuterJoin(cluster_cell_pos.map{ case (c,i) => (i.toLong, c)})

//Convert all None to NaN
val grid_clusters_res = grid_clusters.sortByKey(true).map{case (k, (v, c)) => if (c == None) (k, Double.NaN) else (k, c.get.toDouble)}//.collect().foreach(println)


[Stage 130:===================================================>   (30 + 2) / 32]
grid_clusters = MapPartitionsRDD[115] at leftOuterJoin at <console>:63
grid_clusters_res = MapPartitionsRDD[119] at map at <console>:66
Out[50]:
MapPartitionsRDD[119] at map at <console>:66

Store the results in a SingleBand GeoTiff

The grid with the cluster IDs is stored in a SingleBand GeoTiff.


In [54]:
val cluster_cells :Array[Double] = grid_clusters_res.values.collect()

//Define a Tile
val cluster_cellsD = DoubleArrayTile(cluster_cells, num_cols_rows._1, num_cols_rows._2)

val cluster_tile = geotrellis.raster.DoubleArrayTile.empty(cellT, num_cols_rows._1, num_cols_rows._2)

cfor(0)(_ < num_cols_rows._1, _ + 1) { col =>
    cfor(0)(_ < num_cols_rows._2, _ + 1) { row =>
        val v = cluster_cellsD.get(col, row)
        cluster_tile.setDouble(col, row, v)
    }
}

//Create GeoTiff
val geoTif = new SinglebandGeoTiff(cluster_tile, projected_extent.extent, projected_extent.crs, Tags.empty, GeoTiffOptions.DEFAULT)


[Stage 161:=====================================================> (31 + 1) / 32]
cluster_cells = Array(NaN, NaN, NaN, NaN, NaN, NaN, NaN, NaN, NaN, NaN, NaN, NaN, NaN, NaN, NaN, NaN, NaN, NaN, NaN, NaN, NaN, 0.0, 2.0, NaN, NaN, NaN, NaN, NaN, NaN, NaN, NaN, 2.0, 2.0, 2.0, 2.0, 2.0, 2.0, 2.0, 0.0, 2.0, 2.0, NaN, NaN, NaN, NaN, NaN, NaN, NaN, NaN, NaN, NaN, NaN, NaN, 2.0, NaN, 0.0, 0.0, 2.0, 0.0, NaN, 2.0, 0.0, 2.0, 0.0, NaN, 2.0, 2.0, 0.0, 0.0, 1.0, 0.0, 2.0, 1.0, 2.0, 0.0, 2.0, 2.0, 0.0, 2.0, 0.0, 2.0, 2.0, 2.0, 2.0, 2.0, 2.0, 2.0, 2.0, 1.0, 2.0, 0.0, 2.0, 1.0, 2.0, 0.0, 2.0, NaN, 2.0, 2.0, 0.0, 1.0, NaN, 0.0, 2.0, 2.0, 2.0, 2.0, 2.0, 0.0, 2.0, 0.0, 2.0, 0.0, 2.0, 2.0, 2.0, 2.0, 1.0, 2.0, 2.0, 2.0, 1.0, 0.0, 2.0, 0.0, 2.0, 2.0, 2.0, NaN, 0.0, 0.0, 2.0, 2.0, 2.0, 0.0, 0.0, 2.0, 2.0, 0.0, 2.0, 0.0, 2.0, 0.0, 0.0, 2.0, 2.0, 2.0, 2.0, 2.0, 2.0, 0.0, 0.0, ...

Save a GeoTiff to /tmp


In [55]:
var output:String = ""
val tmp_output = "/tmp/clusters.tif"
GeoTiffWriter.write(geoTif, tmp_output)

if (single_band) {
    output = "/user/pheno/spring-index/LastFreeze/clusters.tif"
} else {
    output = "/user/pheno/spring-index/BloomFinal/clusters.tif"
}


output = /user/pheno/spring-index/BloomFinal/clusters.tif
tmp_output = /tmp/clusters.tif
Out[55]:
/tmp/clusters.tif

Upload a GeoTiff to HDFS


In [57]:
val cmd = "hadoop dfs -copyFromLocal -f " + tmp_output + " " + output
Process(cmd)!


DEPRECATED: Use of this script to execute hdfs command is deprecated.
Instead use the hdfs command for it.

cmd = hadoop dfs -copyFromLocal -f /tmp/clusters.tif /user/pheno/spring-index/BloomFinal/clusters.tif
warning: there was one feature warning; re-run with -feature for details
Out[57]:
0

In [ ]: