This notebook loads a set of GeoTiffs into an RDD of Tiles, with each Tile being a band of the GeoTiff. Each GeoTiff file contains the Spring Index or Last Freeze values for one year over the entire USA.
Kmeans takes the years as dimensions, so the input matrix must have cells as rows and years as columns. Since the data is loaded with one row per year, the matrix needs to be transposed before clustering on all years. The notebook has two flavors of matrix transpose: locally on the Spark driver, or distributed across the Spark workers. Once transposed, the matrix is converted into an RDD of dense vectors to be used by the Kmeans algorithm from Spark MLlib. The end result is a grid where each cell has a cluster ID, which is then saved into a single-band GeoTiff. By saving the result as a GeoTiff, the reader can plot it with a Python notebook such as the one defined in the Python examples.
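As a minimal sketch of the transpose step (toy values, not part of the pipeline below): with 2 years and 3 cells, the loaded matrix has one row per year, while Kmeans needs one row per cell.
//Hypothetical toy data: 2 years x 3 cells (one row per year, one column per cell)
val byYear = Array(
  Array(10.0, 20.0, 30.0), //year 1
  Array(11.0, 21.0, 31.0)) //year 2
//Transpose so that each row is a cell with its values across the years
val byCell = byYear.transpose
//byCell: Array(Array(10.0, 11.0), Array(20.0, 21.0), Array(30.0, 31.0))
//Each row is then wrapped in Vectors.dense(...) and fed to KMeans.train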
In [1]:
import sys.process._
import geotrellis.proj4.CRS
import geotrellis.raster.{CellType, ArrayTile, DoubleArrayTile, Tile}
import geotrellis.raster.io.geotiff._
import geotrellis.raster.io.geotiff.writer.GeoTiffWriter
import geotrellis.raster.io.geotiff.{GeoTiff, SinglebandGeoTiff}
import geotrellis.spark.io.hadoop._
import geotrellis.vector.{Extent, ProjectedExtent}
import org.apache.spark.mllib.clustering.{KMeans, KMeansModel}
import org.apache.spark.mllib.linalg.distributed.{CoordinateMatrix, MatrixEntry, RowMatrix}
import org.apache.spark.mllib.linalg.{Vector, Vectors}
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
//Spire is a numeric library for Scala which is intended to be generic, fast, and precise.
import spire.syntax.cfor._
In [2]:
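//Control flags: single- vs multi-band GeoTiffs, and local vs distributed matrix transpose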
val single_band = false
val local_mode = false
var num_cols_rows :(Int, Int) = (0, 0)
var cellT :CellType = CellType.fromName("uint8raw")
var grids_RDD :RDD[Array[Double]] = sc.emptyRDD
val pattern: String = "tif"
var filepath: String = ""
if (single_band) {
//Single band GeoTiff
filepath = "hdfs:///user/hadoop/spring-index/LastFreeze/"
} else {
//Multi band GeoTiff
filepath = "hdfs:///user/hadoop/spring-index/BloomFinal/"
}
if (single_band) {
//Let's load the single-band GeoTiffs and return an RDD with just the tiles.
val tiles_RDD = sc.hadoopGeoTiffRDD(filepath, pattern).values
//Retrieve the number of cols and rows of the Tile's grid
val tiles_withIndex = tiles_RDD.zipWithIndex().map{case (e,v) => (v,e)}
val tile0 = (tiles_withIndex.filter(m => m._1==0).values.collect())(0)
num_cols_rows = (tile0.cols,tile0.rows)
cellT = tile0.cellType
grids_RDD = tiles_RDD.map(m => m.toArrayDouble())
} else {
//Let's load the multi-band GeoTiffs and return an RDD with just the tiles.
val tiles_RDD = sc.hadoopMultibandGeoTiffRDD(filepath, pattern).values
//Retrieve the number of cols and rows of the Tile's grid
val tiles_withIndex = tiles_RDD.zipWithIndex().map{case (e,v) => (v,e)}
val tile0 = (tiles_withIndex.filter(m => m._1==0).values.collect())(0)
num_cols_rows = (tile0.cols,tile0.rows)
cellT = tile0.cellType
//Let's read the average of the Spring Index, which is stored in the 4th band
grids_RDD = tiles_RDD.map(m => m.band(3).toArrayDouble())
}
Out[2]:
In [3]:
//Global variables
var projected_extent = new ProjectedExtent(new Extent(0,0,0,0), CRS.fromName("EPSG:3857"))
var grid0: RDD[(Long, Double)] = sc.emptyRDD
var grid0_index: RDD[Long] = sc.emptyRDD
var grids_noNaN_RDD: RDD[Array[Double]] = sc.emptyRDD
//Retrieve the ProjectedExtent which contains metadata such as the CRS and the bounding box
val projected_extents_withIndex = sc.hadoopGeoTiffRDD(filepath, pattern).keys.zipWithIndex().map{case (e,v) => (v,e)}
projected_extent = (projected_extents_withIndex.filter(m => m._1 == 0).values.collect())(0)
//Get Index for each Cell
val grids_withIndex = grids_RDD.zipWithIndex().map { case (e, v) => (v, e) }
grid0_index = grids_withIndex.filter(m => m._1 == 0).values.flatMap(m => m).zipWithIndex.filter(m => !m._1.isNaN).map { case (v, i) => (i) }
//Get the Tile's grid
grid0 = grids_withIndex.filter(m => m._1 == 0).values.flatMap( m => m).zipWithIndex.map{case (v,i) => (i,v)}
//Let's filter out the NaN values
grids_noNaN_RDD = grids_RDD.map(m => m.filter(!_.isNaN))
Out[3]:
We need to do a matrix transpose to have clusters per cell and not per year. With each GeoTiff representing a single year, the loaded data looks like this:
grids_noNaN_RDD.map(s => Vectors.dense(s)).cache()
//The vectors are rows and therefore the matrix will look like this:
[
Vectors.dense(0.0, 1.0, 2.0),
Vectors.dense(3.0, 4.0, 5.0),
Vectors.dense(6.0, 7.0, 8.0),
Vectors.dense(9.0, 0.0, 1.0)
]
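After the transpose, each vector holds one cell's values across all years, which is what Kmeans should cluster:
[
Vectors.dense(0.0, 3.0, 6.0, 9.0),
Vectors.dense(1.0, 4.0, 7.0, 0.0),
Vectors.dense(2.0, 5.0, 8.0, 1.0)
]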
The information was gathered from a blog post on how to convert a matrix to an RDD of vectors and a Stack Overflow post on how to transpose an RDD in Spark.
In [29]:
var grids_vec: RDD[Vector] = sc.emptyRDD
//A) For small memory footprint RDDs we can simply bring it to the Driver node and transpose it
if (local_mode) {
//First transpose and then parallelize otherwise you get:
//error: polymorphic expression cannot be instantiated to expected type;
val grids_noNaN_RDD_T = grids_noNaN_RDD.collect().transpose
//Convert to a RDD
val transposed = sc.parallelize(grids_noNaN_RDD_T)
//Create a RDD of dense vectors and cache it
grids_vec = transposed.map(m => Vectors.dense(m)).cache()
//B) For large memory footprint RDDs we need to run the transpose in distributed mode
} else {
val mat :RowMatrix = new RowMatrix(grids_noNaN_RDD.map(m => Vectors.dense(m)))
// Split the matrix into one MatrixEntry per cell value.
val byColumnAndRow = mat.rows.zipWithIndex.map {
case (row, rowIndex) => row.toArray.zipWithIndex.map {
case (number, columnIndex) => new MatrixEntry(rowIndex, columnIndex, number)
}
}.flatMap(x => x)
val matt: CoordinateMatrix = new CoordinateMatrix(byColumnAndRow)
val matt_T = matt.transpose()
grids_vec = matt_T.toRowMatrix().rows
}
Out[29]:
In [29]:
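//Train a k-means model with 3 clusters over 5 iterations on the cell vectors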
val numClusters = 3
val numIterations = 5
val clusters = {
KMeans.train(grids_vec, numClusters, numIterations)
}
// Evaluate clustering by computing Within Set Sum of Squared Errors
val WSSSE = clusters.computeCost(grids_vec)
println("Within Set Sum of Squared Errors = " + WSSSE)
//Un-persist it to save memory
grids_vec.unpersist()
Out[29]:
In [30]:
//Let's save the model into HDFS. If the file already exists, the save aborts and reports an error.
if (single_band) {
clusters.save(sc, "hdfs:///user/pheno/spring_index/LastFreeze/all_kmeans_model")
} else {
clusters.save(sc, "hdfs:///user/pheno/spring_index/BloomFinal/all_kmeans_model")
}
In [13]:
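//Alternatively, load a previously saved k-means model from HDFS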
var clusters :KMeansModel = new KMeansModel(Array.empty :Array[Vector])
if (single_band) {
clusters = KMeansModel.load(sc, "hdfs:///user/pheno/spring_index/LastFreeze/all_kmeans_model")
} else {
clusters = KMeansModel.load(sc, "hdfs:///user/pheno/spring_index/BloomFinal/all_kmeans_model")
}
Out[13]:
In [31]:
//Cache it so kmeans is more efficient
grids_vec.cache()
val kmeans_res = clusters.predict(grids_vec)
//Un-persist it to save memory
grids_vec.unpersist()
Out[31]:
In [17]:
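//Inspect the first 50 cluster assignments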
val kmeans_res_out = kmeans_res.take(50)
kmeans_res_out.foreach(println)
println(kmeans_res_out.size)
Out[17]:
To assign the cluster ID to each grid cell it is necessary to get the indices of the grid cells they belong to. The process is not straightforward because the ArrayDouble used to create each dense Vector does not contain the NaN values, therefore there is no direct mapping between the indices in the Tile's grid and the ones in kmeans_res (the kmeans result).
To join the two RDDs, the approach was taken from a Stack Overflow post on how to perform basic joins of two RDD tables in Spark using Python.
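The pattern used below boils down to keying both RDDs by position with zipWithIndex and joining on that key; a minimal sketch with made-up values:
//Hypothetical values: cluster IDs (like kmeans_res) and grid indices of the non-NaN cells (like grid0_index)
val ids = sc.parallelize(Seq(2, 0, 1))
val cells = sc.parallelize(Seq(5L, 7L, 9L))
//Key both RDDs by their position, join on it, and keep (grid index, cluster ID)
val joined = ids.zipWithIndex().map(_.swap).join(cells.zipWithIndex().map(_.swap)).map{ case (_, (id, cell)) => (cell, id)}
//joined contains (5,2), (7,0) and (9,1)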
In [32]:
//The zip operator would be the most appropriate operator for this operation.
//However, it requires the RDDs to have the same number of partitions and each partition to have the same number of records.
//val cluster_cell_pos = res.zip(band0_index)
//Since we can't use Zip, we index each RDD and then we join them.
val cluster_cell_pos = ((kmeans_res.zipWithIndex().map{ case (v,i) => (i,v)}).join(grid0_index.zipWithIndex().map{ case (v,i) => (i,v)})).map{ case (k,(v,i)) => (v,i)}
Out[32]:
In [50]:
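//Join every grid cell with its cluster ID; cells without one (the NaN cells) get None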
val grid_clusters = grid0.leftOuterJoin(cluster_cell_pos.map{ case (c,i) => (i.toLong, c)})
//Convert all None to NaN
val grid_clusters_res = grid_clusters.sortByKey(true).map{case (k, (v, c)) => if (c == None) (k, Double.NaN) else (k, c.get.toDouble)}//.collect().foreach(println)
Out[50]:
In [54]:
val cluster_cells :Array[Double] = grid_clusters_res.values.collect()
//Define a Tile
val cluster_cellsD = DoubleArrayTile(cluster_cells, num_cols_rows._1, num_cols_rows._2)
val cluster_tile = geotrellis.raster.DoubleArrayTile.empty(cellT, num_cols_rows._1, num_cols_rows._2)
cfor(0)(_ < num_cols_rows._1, _ + 1) { col =>
cfor(0)(_ < num_cols_rows._2, _ + 1) { row =>
val v = cluster_cellsD.get(col, row)
cluster_tile.setDouble(col, row, v)
}
}
//Create GeoTiff
val geoTif = new SinglebandGeoTiff(cluster_tile, projected_extent.extent, projected_extent.crs, Tags.empty, GeoTiffOptions.DEFAULT)
In [55]:
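//Write the GeoTiff to a local temporary file and choose the HDFS destination path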
var output:String = ""
val tmp_output = "/tmp/clusters.tif"
GeoTiffWriter.write(geoTif, tmp_output)
if (single_band) {
output = "/user/pheno/spring-index/LastFreeze/clusters.tif"
} else {
output = "/user/pheno/spring-index/BloomFinal/clusters.tif"
}
Out[55]:
In [57]:
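//Copy the GeoTiff from the local filesystem into HDFS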
val cmd = "hadoop dfs -copyFromLocal -f " + tmp_output + " " + output
Process(cmd)!
Out[57]:
In [ ]: