Kmeans over a set of GeoTiffs

This notebook loads a set of GeoTiffs into an RDD of Tiles, with each Tile being a band of the GeoTiff. Each GeoTiff file contains MODIS MCD12Q2 v005 data for one year over the entire USA. The data ranges from 2001 to 2014.

Kmeans uses the years as dimensions, i.e., the matrix must have cells as rows and years as columns. Since the GeoTiffs are loaded one year at a time, the matrix is first built with years as rows and therefore needs to be transposed. The notebook has two flavors of matrix transpose, locally on the Spark driver or distributed over the Spark workers. Once transposed, the matrix is converted into an RDD of dense vectors to be used by the Kmeans algorithm from Spark-MLlib. The end result is a grid where each cell holds a cluster ID, which is then saved into a SingleBand GeoTiff. By saving the result as a GeoTiff, the reader can plot it with a Python notebook such as the one provided in the Python examples.

In this notebook the reader only needs to modify the variables in **Mode of Operation Setup**.

Dependencies


In [1]:
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}

import geotrellis.proj4.CRS
import geotrellis.raster.io.geotiff.writer.GeoTiffWriter
import geotrellis.raster.io.geotiff.{SinglebandGeoTiff, _}
import geotrellis.raster.{CellType, DoubleArrayTile, MultibandTile, Tile, UByteCellType}
import geotrellis.spark.io.hadoop._
import geotrellis.vector.{Extent, ProjectedExtent}
import org.apache.hadoop.io.SequenceFile.Writer
import org.apache.hadoop.io.{SequenceFile, _}
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.mllib.clustering.{KMeans, KMeansModel}
import org.apache.spark.mllib.linalg.distributed._
import org.apache.spark.mllib.linalg.{Vector, Vectors}
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

import scala.sys.process._

//Spire is a numeric library for Scala which is intended to be generic, fast, and precise.
import spire.syntax.cfor._

Mode of operation

Here the user can define the mode of operation.

  • rdd_offline_mode: If false, the notebook creates all data from scratch and stores projected_extent and num_cols_rows into HDFS. Otherwise, these data structures are read from HDFS.
  • matrix_offline_mode: If false, the notebook creates the matrix, transposes it, and saves it to HDFS. Otherwise, it is read from HDFS.
  • kmeans_offline_mode: If false, the notebook trains kmeans, runs it, and stores the kmeans model into HDFS. Otherwise, the model is read from HDFS.

It is also possible to define which directory of GeoTiffs is to be used and on which band to run Kmeans. The options are:

  • all, which uses the multi-band (8 bands) GeoTiffs
  • or one of the single-band ones:
    1. Onset_Greenness_Increase
    2. Onset_Greenness_Maximum
    3. Onset_Greenness_Decrease
    4. Onset_Greenness_Minimum
    5. NBAR_EVI_Onset_Greenness_Minimum
    6. NBAR_EVI_Onset_Greenness_Maximum
    7. NBAR_EVI_Area
    8. Dynamics_QC

For kmeans the user can define the number of iterations and the number of clusters as an inclusive range. The range is defined using minClusters, maxClusters, and stepClusters. These variables set up a loop that starts at minClusters and stops at maxClusters (inclusive), incrementing by stepClusters each time (see the sketch below). Note that when using a range, **kmeans offline mode** is not possible and it will be reset to **online mode**.
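
For illustration only, the loop below shows how these three variables drive the iteration over cluster counts; the values 10, 30 and 10 are assumed for the example and are not the notebook's defaults.

import spire.syntax.cfor._

val (minClusters, maxClusters, stepClusters) = (10, 30, 10)
cfor(minClusters)(_ <= maxClusters, _ + stepClusters) { numClusters =>
  //Each pass would train one Kmeans model with numClusters clusters
  println("numClusters = " + numClusters)  //prints 10, 20 and 30
}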

Mode of Operation setup


In [2]:
var rdd_offline_mode = true
var matrix_offline_mode = true
var kmeans_offline_mode = true

//GeoTiffs to be read from "hdfs:///user/hadoop/spring-index/"
var dir_path = "hdfs:///user/hadoop/spring-index/"
var offline_dir_path = "hdfs:///user/emma/spring-index/"
var geoTiff_dir = "LeafFinal"

//SOST has 1 band - 1st is 0
//Spring-index has 4 bands - 4th is 3
var band_num = 3

//SOST years between (inclusive) 1989 - 2014
//Spring-index years between (inclusive) 1980 - 2015
val satellite_timeseries = (1980, 2015)
var satellite_first_year = 1989
var satellite_last_year = 2014

//Mask
val toBeMasked = true
val mask_path = "hdfs:///user/hadoop/usa_mask.tif"

//Kmeans number of iterations and clusters
var numIterations = 75
var minClusters = 70
var maxClusters = 70
var stepClusters = 10
var save_rdds = false
var save_matrix = false
var save_kmeans_model = false


rdd_offline_mode = true
matrix_offline_mode = true
kmeans_offline_mode = true
dir_path = hdfs:///user/hadoop/spring-index/
offline_dir_path = hdfs:///user/emma/spring-index/
geoTiff_dir = LeafFinal
band_num = 3
satellite_timeseries = (1980,2015)
satellite_first_year = 1989
satellite_last_year = 2014
toBeMasked = true
mask_path = hdfs:///user/hadoop/usa_mask.tif
numIterations = 75
minClusters = 70
maxClusters = 70
stepClusters = 10
save_rdds = false
save_matrix = false
save_kmeans_model = false
Out[2]:
false

DON'T MODIFY ANY PIECE OF CODE FROM HERE ON!!!

Mode of operation validation


In [3]:
if (minClusters > maxClusters) {
    maxClusters = minClusters
    stepClusters = 1
}
if (stepClusters < 1) {
    stepClusters = 1
}

val pattern: String = "*.tif"

//Paths to store data structures for Offline runs
var mask_str = ""
if (toBeMasked)
    mask_str = "_mask"
var grid0_path = offline_dir_path + geoTiff_dir + "CentroidS/grid0" + "_"+ band_num + mask_str
var grid0_index_path = offline_dir_path + geoTiff_dir + "CentroidS/grid0_index" + "_"+ band_num + mask_str
var grids_noNaN_path = offline_dir_path + geoTiff_dir + "CentroidS/grids_noNaN" + "_"+ band_num + mask_str
var metadata_path = offline_dir_path + geoTiff_dir + "CentroidS/metadata" + "_"+ band_num + mask_str
var grids_matrix_path = offline_dir_path + geoTiff_dir + "CentroidS/grids_matrix" + "_"+ band_num + mask_str
var grids_matrix_index_path = offline_dir_path + geoTiff_dir + "CentroidS/grids_matrix_index" + "_" + band_num + mask_str

//Check offline modes
var conf = sc.hadoopConfiguration
var fs = org.apache.hadoop.fs.FileSystem.get(conf)

val rdd_offline_exists = fs.exists(new org.apache.hadoop.fs.Path(grid0_path))
val matrix_offline_exists = fs.exists(new org.apache.hadoop.fs.Path(grids_matrix_path))
                                      
if (rdd_offline_mode != rdd_offline_exists) {
    println("\"Load GeoTiffs\" offline mode is not set properly, i.e., either it was set to false and the required file does not exist or vice-versa. We will reset it to " + rdd_offline_exists.toString())
    rdd_offline_mode = rdd_offline_exists
} 
if (matrix_offline_mode != matrix_offline_exists) {
    println("\"Matrix\" offline mode is not set properly, i.e., either it was set to false and the required file does not exist or vice-versa. We will reset it to " + matrix_offline_exists.toString())
    matrix_offline_mode = matrix_offline_exists
}

if (!fs.exists(new org.apache.hadoop.fs.Path(mask_path))) {
    println("The mask path: " + mask_path + " is invalid!!!")
}

//Years
val satellite_years = satellite_timeseries._1 to satellite_timeseries._2

if (!satellite_years.contains(satellite_first_year) || !(satellite_years.contains(satellite_last_year))) {
  println("Invalid range of years for " + geoTiff_dir + ". I should be between " + satellite_first_year + " and " + satellite_last_year)
  System.exit(0)
}

var satellite_years_range = (satellite_years.indexOf(satellite_first_year), satellite_years.indexOf(satellite_last_year))

var num_kmeans :Int  = 1
if (minClusters != maxClusters) {
    num_kmeans = ((maxClusters - minClusters) / stepClusters) + 1
}
println(num_kmeans)
var kmeans_model_paths :Array[String] = Array.fill[String](num_kmeans)("")
var wssse_path :String = offline_dir_path + geoTiff_dir + "CentroidS/" + numIterations + "_wssse"
var geotiff_hdfs_paths :Array[String] = Array.fill[String](num_kmeans)("")
var geotiff_tmp_paths :Array[String] = Array.fill[String](num_kmeans)("")
var numClusters_id = 0

if (num_kmeans > 1) {
    numClusters_id = 0
    cfor(minClusters)(_ <= maxClusters, _ + stepClusters) { numClusters =>
        kmeans_model_paths(numClusters_id) = offline_dir_path + geoTiff_dir + "CentroidS/kmeans_model_" + band_num + "_" + numClusters + "_" + numIterations
        
        //Check if the file exists
        val kmeans_exist = fs.exists(new org.apache.hadoop.fs.Path(kmeans_model_paths(numClusters_id)))
        if (kmeans_exist && !kmeans_offline_mode) {
            println("The kmeans model path " + kmeans_model_paths(numClusters_id) + " exists, please remove it.")
        } else if (!kmeans_exist && kmeans_offline_mode) {
            kmeans_offline_mode = false
        }
        
        geotiff_hdfs_paths(numClusters_id) = offline_dir_path + geoTiff_dir + "CentroidS/clusters_" + band_num + "_" + numClusters + "_" + numIterations + ".tif"
        geotiff_tmp_paths(numClusters_id) = "/tmp/clusters_" + band_num + "_" + geoTiff_dir + "_" + numClusters + "_" + numIterations + ".tif"
        if (fs.exists(new org.apache.hadoop.fs.Path(geotiff_hdfs_paths(numClusters_id)))) {
            println("There is already a GeoTiff with the path: " + geotiff_hdfs_paths(numClusters_id) + ". Please make either a copy or move it to another location, otherwise, it will be over-written.")
        }
        numClusters_id += 1
    }
    kmeans_offline_mode = false
} else { 
    kmeans_model_paths(0) = offline_dir_path + geoTiff_dir + "CentroidS/kmeans_model_" + band_num + "_" + minClusters + "_" + numIterations
    val kmeans_offline_exists = fs.exists(new org.apache.hadoop.fs.Path(kmeans_model_paths(0)))
    if (kmeans_offline_mode != kmeans_offline_exists) {
        println("\"Kmeans\" offline mode is not set properly, i.e., either it was set to false and the required file does not exist or vice-versa. We will reset it to " + kmeans_offline_exists.toString())
        kmeans_offline_mode = kmeans_offline_exists
    }
    geotiff_hdfs_paths(0) = offline_dir_path + geoTiff_dir + "CentroidS/clusters_" + band_num + "_" + minClusters + "_" + numIterations + ".tif"
    geotiff_tmp_paths(0) = "/tmp/clusters_" + band_num + "_" + geoTiff_dir + "_" + minClusters + "_" + numIterations + ".tif"
    if (fs.exists(new org.apache.hadoop.fs.Path(geotiff_hdfs_paths(0)))) {
        println("There is already a GeoTiff with the path: " + geotiff_hdfs_paths(0) + ". Please make either a copy or move it to another location, otherwise, it will be over-written.")
    }
}


Waiting for a Spark session to start...
"Load GeoTiffs" offline mode is not set properly, i.e., either it was set to false and the required file does not exist or vice-versa. We will reset it to false
"Matrix" offline mode is not set properly, i.e., either it was set to false and the required file does not exist or vice-versa. We will reset it to false
1
"Kmeans" offline mode is not set properly, i.e., either it was set to false and the required file does not exist or vice-versa. We will reset it to false
pattern = *.tif
mask_str = _mask
grid0_path = hdfs:///user/emma/spring-index/LeafFinalCentroidS/grid0_3_mask
grid0_index_path = hdfs:///user/emma/spring-index/LeafFinalCentroidS/grid0_index_3_mask
grids_noNaN_path = hdfs:///user/emma/spring-index/LeafFinalCentroidS/grids_noNaN_3_mask
metadata_path = hdfs:///user/emma/spring-index/LeafFinalCentroidS/metadata_3_mask
grids_matrix_path = hdfs:///user/emma/spring-index/LeafFinalCentroidS/grids_matrix_3_mask
grids_matrix_index_path = hdfs:///user/emma/spring-index/LeafFinalCentroidS/grids_matrix_index_3_mask
conf = Configuration: core-default.xml, core-site.xml, mapred-default.xml, mapred-site.xml, yarn-default.xml, yarn-site.xml, hdfs-defaul...
Out[3]:
Configuration: core-default.xml, core-site.xml, mapred-default.xml, mapred-site.xml, yarn-default.xml, yarn-site.xml, hdfs-default.xml, hdfs-site.xml, file:/usr/lib/spark-2.1.1-bin-without-hadoop/conf/hive-site.xml

Functions to (de)serialize any structure into Array[Byte]


In [4]:
def serialize(value: Any): Array[Byte] = {
    val out_stream: ByteArrayOutputStream = new ByteArrayOutputStream()
    val obj_out_stream = new ObjectOutputStream(out_stream)
    obj_out_stream.writeObject(value)
    obj_out_stream.close
    out_stream.toByteArray
}

def deserialize(bytes: Array[Byte]): Any = {
    val obj_in_stream = new ObjectInputStream(new ByteArrayInputStream(bytes))
    val value = obj_in_stream.readObject
    obj_in_stream.close
    value
}


serialize: (value: Any)Array[Byte]
deserialize: (bytes: Array[Byte])Any
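
For illustration only, a value can be round-tripped through the two helpers as follows (the Int value 42 is just an example):

val bytes: Array[Byte] = serialize(42)
val recovered: Int = deserialize(bytes).asInstanceOf[Int]
assert(recovered == 42)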

Load GeoTiffs

Using GeoTrellis, all GeoTiffs of a directory are loaded into an RDD. From that RDD we extract a grid from the first file to later store the Kmeans cluster IDs, build an index used to populate that grid, and filter out all NaN values.


In [5]:
//Read all single-band GeoTiffs matching pattern and tag each (ProjectedExtent, Tile) with its file index (files sorted by path)
def hadoopGeoTiffRDD(satellite_filepath :String, pattern :String): RDD[(Int, (ProjectedExtent, Tile))] = {
  val listFiles = sc.binaryFiles(satellite_filepath + "/" + pattern).sortBy(_._1).keys.collect()
  var prevRDD :RDD[(Int,(ProjectedExtent, Tile))] = sc.emptyRDD

  cfor(0)(_ < listFiles.length, _ + 1) { k =>
    val filePath :String = listFiles(k)
    val kB = sc.broadcast(k)
    val currRDD = sc.hadoopGeoTiffRDD(filePath).map(m => (kB.value, m))
    prevRDD = currRDD.union(prevRDD)
    //kB.destroy()
  }
  prevRDD.sortBy(_._1)
}


hadoopGeoTiffRDD: (satellite_filepath: String, pattern: String)org.apache.spark.rdd.RDD[(Int, (geotrellis.vector.ProjectedExtent, geotrellis.raster.Tile))]

In [6]:
//Read all multi-band GeoTiffs matching pattern and tag each (ProjectedExtent, MultibandTile) with its file index (files sorted by path)
def hadoopMultibandGeoTiffRDD(satellite_filepath :String, pattern :String): RDD[(Int, (ProjectedExtent, MultibandTile))] = {
  val listFiles = sc.binaryFiles(satellite_filepath + "/" + pattern).sortBy(_._1).keys.collect()
  var prevRDD :RDD[(Int,(ProjectedExtent, MultibandTile))] = sc.emptyRDD

  cfor(0)(_ < listFiles.length, _ + 1) { k =>
    val filePath :String = listFiles(k)
    val kB = sc.broadcast(k)
    val currRDD = sc.hadoopMultibandGeoTiffRDD(filePath).map(m => (kB.value,m))
    prevRDD = currRDD.union(prevRDD)
    //kB.destroy()
  }
  prevRDD.sortBy(_._1)
}


hadoopMultibandGeoTiffRDD: (satellite_filepath: String, pattern: String)org.apache.spark.rdd.RDD[(Int, (geotrellis.vector.ProjectedExtent, geotrellis.raster.MultibandTile))]

In [14]:
var t0 = System.nanoTime()
//Global variables
var projected_extent = new ProjectedExtent(new Extent(0,0,0,0), CRS.fromName("EPSG:3857"))
var grid0: RDD[(Long, Double)] = sc.emptyRDD
var grid0_index: RDD[Long] = sc.emptyRDD
var grids_noNaN_RDD: RDD[(Int, Array[Double])] = sc.emptyRDD
var num_cols_rows :(Int, Int) = (0, 0)
var cellT :CellType = UByteCellType
var grids_RDD :RDD[(Int, Array[Double])] = sc.emptyRDD
var mask_tile0 :Tile = new SinglebandGeoTiff(geotrellis.raster.ArrayTile.empty(cellT, num_cols_rows._1, num_cols_rows._2), projected_extent.extent, projected_extent.crs, Tags.empty, GeoTiffOptions.DEFAULT).tile
var grid_cells_size :Long = 0

//Load Mask
if (toBeMasked) {
  val mask_tiles_RDD = sc.hadoopGeoTiffRDD(mask_path).values
  val mask_tiles_withIndex = mask_tiles_RDD.zipWithIndex().map{case (e,v) => (v,e)}
  mask_tile0 = (mask_tiles_withIndex.filter(m => m._1==0).values.collect())(0)
}

//Local variables
val filepath: String = dir_path + geoTiff_dir

if (rdd_offline_mode) {
  grids_noNaN_RDD = sc.objectFile(grids_noNaN_path)
  grid0 = sc.objectFile(grid0_path)
  grid0_index = sc.objectFile(grid0_index_path)

  val metadata = sc.sequenceFile(metadata_path, classOf[IntWritable], classOf[BytesWritable]).map(_._2.copyBytes()).collect()
  projected_extent = deserialize(metadata(0)).asInstanceOf[ProjectedExtent]
  num_cols_rows = (deserialize(metadata(1)).asInstanceOf[Int], deserialize(metadata(2)).asInstanceOf[Int])
  cellT = deserialize(metadata(3)).asInstanceOf[CellType]
} else {
  if (band_num == 0) {
    //Let's load Singleband GeoTiffs and return an RDD with just the tiles.
    var geos_RDD = hadoopGeoTiffRDD(filepath, pattern)
    geos_RDD.cache()
    var tiles_RDD = geos_RDD.map{ case (i,(p,t)) => (i,t)}

    //Retrieve the number of cols and rows of the Tile's grid
    val tiles_withIndex = tiles_RDD//.zipWithIndex().map{case (e,v) => (v,e)}
    val tile0 = (tiles_withIndex.filter(m => m._1==0).values.collect())(0)
    num_cols_rows = (tile0.cols,tile0.rows)
    cellT = tile0.cellType

    //Retrieve the ProjectedExtent which contains metadata such as CRS and bounding box
    val projected_extents_withIndex = geos_RDD.map{ case (i,(p,t)) => (i,p)}//.keys.zipWithIndex().map { case (e, v) => (v, e) }
    projected_extent = (projected_extents_withIndex.filter(m => m._1 == 0).values.collect()) (0)

    if (toBeMasked) {
      val mask_tile_broad :Broadcast[Tile] = sc.broadcast(mask_tile0)
      grids_RDD = tiles_RDD.map{ case (i,m) => (i,m.localInverseMask(mask_tile_broad.value, 1, -1000).toArrayDouble())}
    } else {
      grids_RDD = tiles_RDD.map{ case (i,m) => (i, m.toArrayDouble())}
    }
  } else {
    //Let's load Multiband GeoTiffs and return an RDD with just the tiles.
    val geos_RDD = hadoopMultibandGeoTiffRDD(filepath, pattern)
    geos_RDD.cache()
    var tiles_RDD = geos_RDD.map{ case (i,(p,t)) => (i,t)}

    //Retrieve the number of cols and rows of the Tile's grid
    val tiles_withIndex = tiles_RDD//.zipWithIndex().map{case (e,v) => (v,e)}
    val tile0 = (tiles_withIndex.filter(m => m._1==0).values.collect())(0)
    num_cols_rows = (tile0.cols,tile0.rows)
    cellT = tile0.cellType

    //Retrieve the ProjectedExtent which contains metadata such as CRS and bounding box
    val projected_extents_withIndex = geos_RDD.map{ case (i,(p,t)) => (i,p)}//.keys.zipWithIndex().map { case (e, v) => (v, e) }
    projected_extent = (projected_extents_withIndex.filter(m => m._1 == 0).values.collect()) (0)

    //Let's read the average of the Spring-Index, which is stored in the 4th band
    val band_numB :Broadcast[Int] = sc.broadcast(band_num)
    if (toBeMasked) {
      val mask_tile_broad :Broadcast[Tile] = sc.broadcast(mask_tile0)
      grids_RDD = tiles_RDD.map{ case (i,m) => (i,m.band(band_numB.value).localInverseMask(mask_tile_broad.value, 1, -1000).toArrayDouble())}
    } else {
      grids_RDD = tiles_RDD.map{ case (i,m) => (i, m.band(band_numB.value).toArrayDouble())}
    }
  }

  //Get Index for each Cell
  val grids_withIndex = grids_RDD//.zipWithIndex().map { case (e, v) => (v, e) }
  if (toBeMasked) {
    grid0_index = grids_withIndex.filter(m => m._1 == 0).values.flatMap(m => m).zipWithIndex.filter(m => m._1 != -1000.0).map { case (v, i) => (i) }
  } else {
    grid0_index = grids_withIndex.filter(m => m._1 == 0).values.flatMap(m => m).zipWithIndex.map { case (v, i) => (i) }

  }
  //Get the Tile's grid
  grid0 = grids_withIndex.filter(m => m._1 == 0).values.flatMap( m => m).zipWithIndex.map{case (v,i) => (i,v)}

  //Let's filter out NaN values
  if (toBeMasked) {
    grids_noNaN_RDD = grids_RDD.map{case (i,m) => (i,m.filter(m => m != -1000.0))}
  } else {
    grids_noNaN_RDD = grids_RDD
  }
  //Store data in HDFS
  if (save_rdds) {
    grid0.saveAsObjectFile(grid0_path)
    grid0_index.saveAsObjectFile(grid0_index_path)
    grids_noNaN_RDD.saveAsObjectFile(grids_noNaN_path)
  }

  val grids_noNaN_RDD_withIndex = grids_noNaN_RDD//.zipWithIndex().map { case (e, v) => (v, e) }
  val sat_year_diff = satellite_first_year-satellite_timeseries._1
  val sat_year_diffB = sc.broadcast(sat_year_diff)
  grids_noNaN_RDD = grids_noNaN_RDD_withIndex.filterByRange(satellite_years_range._1, satellite_years_range._2).map{ case(i,a) => (i-(sat_year_diffB.value),a)}

  if (save_rdds) {
    val writer: SequenceFile.Writer = SequenceFile.createWriter(conf,
      Writer.file(metadata_path),
      Writer.keyClass(classOf[IntWritable]),
      Writer.valueClass(classOf[BytesWritable])
    )

    writer.append(new IntWritable(1), new BytesWritable(serialize(projected_extent)))
    writer.append(new IntWritable(2), new BytesWritable(serialize(num_cols_rows._1)))
    writer.append(new IntWritable(3), new BytesWritable(serialize(num_cols_rows._2)))
    writer.append(new IntWritable(4), new BytesWritable(serialize(cellT)))
    writer.hflush()
    writer.close()
  }
}
grid_cells_size = grid0_index.count().toInt
var t1 = System.nanoTime()
println("Elapsed time: " + (t1 - t0) + "ns")


Elapsed time: 413825417121ns
t0 = 138782016301563
projected_extent = ProjectedExtent(Extent(-126.30312894720473, 14.29219617034159, -56.162671563152486, 49.25462702827337),geotrellis.proj4.CRS$$anon$3@41d0d1b7)
grid0 = MapPartitionsRDD[185] at map at <console>:172
grid0_index = MapPartitionsRDD[180] at map at <console>:166
grids_noNaN_RDD = MapPartitionsRDD[188] at map at <console>:190
num_cols_rows = (7808,3892)
cellT = float64raw
grids_RDD = MapPartitionsRDD[174] at map at <console>:157
mask_tile0 = FloatRawArrayTile([F@6db0fe00,7808,3892)
gri...
Out[14]:
FloatRawArrayTile([F@6db0fe00,7808,3892)

Matrix

We need to do a matrix transpose to obtain clusters per cell and not per year. With each GeoTiff representing a single year, the loaded data looks like this:

bands_RDD.map(s => Vectors.dense(s)).cache()

//The vectors are rows and therefore the matrix will look like this:
[
Vectors.dense(0.0, 1.0, 2.0),
Vectors.dense(3.0, 4.0, 5.0),
Vectors.dense(6.0, 7.0, 8.0),
Vectors.dense(9.0, 0.0, 1.0)
]

To achieve that we convert the RDD[Vector] into a distributed matrix, a CoordinateMatrix, which has a transpose method. A toy sketch of this transpose is shown below.
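
The following sketch is illustrative only (it assumes an existing SparkContext sc and uses the hypothetical names toyRows and toyMatrixT); it transposes the 4x3 toy matrix above so that its columns become rows:

import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.linalg.distributed.{IndexedRow, IndexedRowMatrix}

val toyRows = sc.parallelize(Seq(
  IndexedRow(0L, Vectors.dense(0.0, 1.0, 2.0)),
  IndexedRow(1L, Vectors.dense(3.0, 4.0, 5.0)),
  IndexedRow(2L, Vectors.dense(6.0, 7.0, 8.0)),
  IndexedRow(3L, Vectors.dense(9.0, 0.0, 1.0))
))

//A 4x3 matrix (one row per year) becomes a 3x4 matrix (one row per cell)
val toyMatrixT = new IndexedRowMatrix(toyRows).toCoordinateMatrix().transpose().toIndexedRowMatrix()
toyMatrixT.rows.sortBy(_.index).map(_.vector).collect().foreach(println)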


In [15]:
t0 = System.nanoTime()
//Global variables
var grids_matrix: RDD[Vector] = sc.emptyRDD
var grids_matrix_index :RDD[(Long, Long)] = sc.emptyRDD
val grid_cells_sizeB = sc.broadcast(grid_cells_size)

if (matrix_offline_mode) {
  grids_matrix = sc.objectFile(grids_matrix_path)
  grids_matrix_index = sc.objectFile(grids_matrix_index_path)
} else {
  val mat :IndexedRowMatrix = new IndexedRowMatrix(grids_noNaN_RDD.map{ case (i, m) => (i,m.zipWithIndex)}.map{ case (i,m) => (i,m.filter(!_._1.isNaN))}.map{ case (i,m) =>  new IndexedRow(i.toLong, Vectors.sparse(grid_cells_sizeB.value.toInt, m.map(v => v._2), m.map(v => v._1)))})
  val mat_T = mat.toCoordinateMatrix().transpose().toIndexedRowMatrix().rows.sortBy(_.index)
  grids_matrix = mat_T.map(_.vector)
  grids_matrix_index = mat_T.map(_.index).zipWithIndex().map{ case (v,i) => (i,v)}

  if (save_matrix) {
    grids_matrix.saveAsObjectFile(grids_matrix_path)
    grids_matrix_index.saveAsObjectFile(grids_matrix_index_path)
  }
}
t1 = System.nanoTime()
println("Elapsed time: " + (t1 - t0) + "ns")


Elapsed time: 211999884926ns
t0 = 139204709039895
grids_matrix = MapPartitionsRDD[205] at map at <console>:83
grids_matrix_index = MapPartitionsRDD[208] at map at <console>:84
grid_cells_sizeB = Broadcast(88)
t1 = 139416708924821
Out[15]:
139416708924821

Kmeans

We use Kmeans from Spark-MLlib. The user should only modify the variables in the Kmeans setup.

Kmeans Training


In [16]:
t0 = System.nanoTime()
//Global variables
var kmeans_models :Array[KMeansModel] = new Array[KMeansModel](num_kmeans)
var wssse_data :List[(Int, Int, Double)] = List.empty
var numClusters_id = 0

if (kmeans_offline_mode) {
    numClusters_id = 0
    cfor(minClusters)(_ <= maxClusters, _ + stepClusters) { numClusters =>
        if (!fs.exists(new org.apache.hadoop.fs.Path(kmeans_model_paths(numClusters_id)))) {
            println("One of the files does not exist, we will abort!!!")
            System.exit(0)
        } else {
            kmeans_models(numClusters_id) = KMeansModel.load(sc, kmeans_model_paths(numClusters_id))
        }
        numClusters_id += 1
    }
    val wssse_data_RDD :RDD[(Int, Int, Double)]  = sc.objectFile(wssse_path)
    wssse_data  = wssse_data_RDD.collect().toList
} else {
    numClusters_id = 0
    if (fs.exists(new org.apache.hadoop.fs.Path(wssse_path))) {
        val wssse_data_RDD :RDD[(Int, Int, Double)]  = sc.objectFile(wssse_path)
        wssse_data  = wssse_data_RDD.collect().toList
    }
    grids_matrix.cache()
    cfor(minClusters)(_ <= maxClusters, _ + stepClusters) { numClusters =>
        println(numClusters)
        kmeans_models(numClusters_id) = {
            KMeans.train(grids_matrix, numClusters, numIterations)
        }

        // Evaluate clustering by computing Within Set Sum of Squared Errors
        val WSSSE = kmeans_models(numClusters_id).computeCost(grids_matrix)
        println("Within Set Sum of Squared Errors = " + WSSSE)
                
        wssse_data = wssse_data :+ (numClusters, numIterations, WSSSE)
        
        //Save kmeans model
        if (save_kmeans_model) {
            if (!fs.exists(new org.apache.hadoop.fs.Path(kmeans_model_paths(numClusters_id)))) {
                kmeans_models(numClusters_id).save(sc, kmeans_model_paths(numClusters_id))
            }
        }
        numClusters_id += 1
        
        if (fs.exists(new org.apache.hadoop.fs.Path(wssse_path))) {
            println("We will delete the wssse file")
            try { fs.delete(new org.apache.hadoop.fs.Path(wssse_path), true) } catch { case _ : Throwable => { } }
        }
    
        println("Lets create it with the new data")
        sc.parallelize(wssse_data, 1).saveAsObjectFile(wssse_path)
    }

    //Un-persist it to save memory
    grids_matrix.unpersist()
    

}
t1 = System.nanoTime()
println("Elapsed time: " + (t1 - t0) + "ns")


70
Within Set Sum of Squared Errors = 4.7614399725666275E9
Let's create it with the new data
Elapsed time: 1355942363855ns
t0 = 139420252893441
kmeans_models = Array(org.apache.spark.mllib.clustering.KMeansModel@51153afc)
wssse_data = List((70,75,4.7614399725666275E9))
numClusters_id = 1
t1 = 140776195257296
Out[16]:
140776195257296

Inspect WSSSE


In [17]:
t0 = System.nanoTime()

//from disk
if (fs.exists(new org.apache.hadoop.fs.Path(wssse_path))) {
    var wssse_data_tmp :RDD[(Int, Int, Double)] = sc.objectFile(wssse_path)//.collect()//.toList
    println(wssse_data_tmp.collect().toList)    
}
t1 = System.nanoTime()
println("Elapsed time: " + (t1 - t0) + "ns")


List((70,75,4.7614399725666275E9))
Elapsed time: 330925035ns
t0 = 140780254649693
t1 = 140780585574728
Out[17]:
140780585574728

Run Kmeans clustering

Run the trained Kmeans model(s) to obtain the cluster ID for each cell.


In [18]:
t0 = System.nanoTime()
//Cache it so kmeans is more efficient
grids_matrix.cache()
var kmeans_res: Array[RDD[Int]] = Array.fill(num_kmeans)(sc.emptyRDD)
var kmeans_centroids: Array[Array[Double]] = Array.fill(num_kmeans)(Array.emptyDoubleArray)
numClusters_id = 0
cfor(minClusters)(_ <= maxClusters, _ + stepClusters) { numClusters =>
  kmeans_res(numClusters_id) = kmeans_models(numClusters_id).predict(grids_matrix)
  kmeans_centroids(numClusters_id) = kmeans_models(numClusters_id).clusterCenters.map(m => m(0))
  numClusters_id += 1
}


t0 = 140781709202785
kmeans_res = Array(MapPartitionsRDD[383] at map at KMeansModel.scala:69)
kmeans_centroids = Array(Array(71.11392941974889, 111.57386817189229, 81.65391493862376, 54.987484123378735, 103.6049992662525, 85.10577544922435, 17.28197722396435, 11.509277585353786, 30.522914943142986, 20.67316706884108, 104.16783725639078, 105.6449527366811, 128.66094518669857, 114.36864697717381, 100.91983143343735, 30.036185944363105, 112.4413995218386, 95.58824947447593, 86.5361769711713, 68.59102071421877, 24.62858004052402, 89.20943485586068, 75.1981274539414, 78.47561126204002, 59.37507118000632, 39.69136661333121, 111.162352782371, 71.6681698962615, 122.80037532455292, 121.93213862112094, 27.6830257143854, 94.48090884...
Out[18]:
[[71.11392941974889, 111.57386817189229, 81.65391493862376, 54.987484123378735, 103.6049992662525, 85.10577544922435, 17.28197722396435, 11.509277585353786, 30.522914943142986, 20.67316706884108, 104.16783725639078, 105.6449527366811, 128.66094518669857, 114.36864697717381, 100.91983143343735, 30.036185944363105, 112.4413995218386, 95.58824947447593, 86.5361769711713, 68.59102071421877, 24.62858004052402, 89.20943485586068, 75.1981274539414, 78.47561126204002, 59.37507118000632, 39.69136661333121, 111.162352782371, 71.6681698962615, 122.80037532455292, 121.93213862112094, 27.6830257143854, 94.48090884279476, 84.65716885628163, 138.89429944163095, 103.86543230099242, 30.83636006488569, 114.80767384691224, 95.56624505517384, 100.3732211900289, 54.90443031322262, 24.876139771743727, 65.62512671873439, 64.74232052812354, 106.08614046045976, 100.12724048388118, 43.92133611153571, 107.55623209559185, 71.05731585177092, 70.62395522930446, 90.65690859912092, 109.63046294497035, 75.16709853935349, 41.20797623571933, 117.76564989084902, 124.04659458866143, 52.71649225372698, 79.0821302602451, 50.73969019460823, 35.14974280655529, 95.64584416699063, 29.053548163230566, 153.35135456273764, 82.35861720706124, 72.5543758355453, 92.4842136199513, 81.40834435008627, 24.91249350509738, 89.79850771565995, 29.997419448037665, 41.34826269262612]]

Sanity test

It can be skipped; it only shows the cluster IDs of the first 150 cells whose cluster is neither 0 nor 1.


In [19]:
t0 = System.nanoTime()
val kmeans_res_out = kmeans_res(0).filter(_!= 0).filter(_!=1).take(150)
kmeans_res_out.foreach(print)

println(kmeans_res_out.size)
t1 = System.nanoTime()
println("Elapsed time: " + (t1 - t0) + "ns")


292929292929292929292929292929292929292929292929292929292929292929292929292929292929292929292929292929292929292929292929292929292929292929292929292929292929292929292929292929292929292929292929292929292929292929292929292929292929292929292929292929292929292929292929292929292929292929292929292929292929150
Elapsed time: 382863476ns
t0 = 140783004404483
kmeans_res_out = Array(29, 29, 29, 29, 29, 29, 29, 29, 29, 29, 29, 29, 29, 29, 29, 29, 29, 29, 29, 29, 29, 29, 29, 29, 29, 29, 29, 29, 29, 29, 29, 29, 29, 29, 29, 29, 29, 29, 29, 29, 29, 29, 29, 29, 29, 29, 29, 29, 29, 29, 29, 29, 29, 29, 29, 29, 29, 29, 29, 29, 29, 29, 29, 29, 29, 29, 29, 29, 29, 29, 29, 29, 29, 29, 29, 29, 29, 29, 29, 29, 29, 29, 29, 29, 29, 29, 29, 29, 29, 29, 29, 29, 29, 29, 29, 29, 29, 29, 29, 29, 29, 29, 29, 29, 29, 29, 29, 29, 29, 29, 29, 29, 29, 29, 29, 29, 29, 29, 29, 29, 29, 29, 29, 29, 29, 29, 29, 29, 29, 29, 29, 29, 29, 29, 29, 29, 29, 29, 29, 29, 29, 29, 29, 29, 29, 29, 29, 29, 29, 29)
t1 = 140783387267959
Out[19]:
140783387267959

Build GeoTiff with Kmeans cluster_IDs

The Grid with the cluster IDs is stored in a SingleBand GeoTiff and uploaded to HDFS.

Assign a cluster ID to each grid cell and save the grid as a SingleBand GeoTiff

To assign the cluster ID to each grid cell it is necessary to know the index of the grid cell it belongs to. The process is not straightforward because the Array[Double] used for the creation of each dense Vector does not contain the NaN values, therefore there is no direct mapping between the indices in the Tile's grid and the ones in kmeans_res (the kmeans result).

To join the two RDDs, the approach was adapted from a Stack Overflow post on how to perform basic joins of two RDD tables in Spark using Python. A toy sketch of the join is shown below.
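
This sketch uses toy data only (the values and the names clustersByFilteredPos, gridIndexByFilteredPos and clustersByGridIndex are hypothetical, and an existing SparkContext sc is assumed). It mirrors the join performed below: cluster IDs come out in the order of the filtered (non-masked) cells, and an index RDD maps each filtered position back to its original grid cell.

//Cluster IDs in filtered-cell order (toy values)
val clustersByFilteredPos = sc.parallelize(Seq(5, 5, 9))
  .zipWithIndex().map{ case (c, pos) => (pos, c) }   //(filteredPos, clusterID)

//Original grid index of each non-masked cell (toy values)
val gridIndexByFilteredPos = sc.parallelize(Seq(2L, 4L, 7L))
  .zipWithIndex().map{ case (g, pos) => (pos, g) }   //(filteredPos, gridIndex)

//Join on the filtered position to obtain (gridIndex, clusterID)
val clustersByGridIndex = clustersByFilteredPos.join(gridIndexByFilteredPos)
  .map{ case (_, (c, g)) => (g, c) }
clustersByGridIndex.collect().foreach(println)       //(2,5), (4,5), (7,9); order may vary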


In [21]:
t0 = System.nanoTime()
numClusters_id = 0
val grid0_index_I = grid0_index.zipWithIndex().map{ case (v,i) => (i,v)}
grid0_index_I.cache()
grid0.cache()
grids_matrix_index.cache()

cfor(minClusters)(_ <= maxClusters, _ + stepClusters) { numClusters =>
  val kmeans_out = (kmeans_res(numClusters_id).zipWithIndex().map{ case (v,i) => (i,v)}).join(grids_matrix_index).map{ case (z,(k,i)) => (i,k)}
  val cluster_cell_pos = (kmeans_out.join(grid0_index_I).map{ case (k,(v,i)) => (v,i)})
    
  //Associate a Cluster_IDs to respective Grid_cell
  val grid_clusters = grid0.map{ case (i, v) => if (v == 0.0) (i, Double.NaN) else (i,v)}.leftOuterJoin(cluster_cell_pos.map{ case (c,i) => (i.toLong, c)})

  //Convert all None to NaN
  val grid_clusters_res = grid_clusters.sortByKey(true).map{case (k, (v, c)) => if (c == None) (k, Int.MaxValue) else (k, c.get)}

  //Define a Tile
  val cluster_cellsID :Array[Int] = grid_clusters_res.values.collect()
  var cluster_cells :Array[Double] = Array.fill(cluster_cellsID.length)(Double.NaN)
  cfor(0)(_ < cluster_cellsID.size, _ + 1) { cellID =>
    if (cluster_cellsID(cellID) != Int.MaxValue) {
      cluster_cells(cellID) = kmeans_centroids(numClusters_id)(cluster_cellsID(cellID))
    }
  }
  val cluster_cellsD = DoubleArrayTile(cluster_cells, num_cols_rows._1, num_cols_rows._2)
  val geoTif = new SinglebandGeoTiff(cluster_cellsD, projected_extent.extent, projected_extent.crs, Tags.empty, GeoTiffOptions(compression.DeflateCompression))

  //Save to /tmp/
  GeoTiffWriter.write(geoTif, geotiff_tmp_paths(numClusters_id))

  //Upload to HDFS
  var cmd = "hadoop dfs -copyFromLocal -f " + geotiff_tmp_paths(numClusters_id) + " " + geotiff_hdfs_paths(numClusters_id)
  println(cmd)
  Process(cmd)!

  //Remove from /tmp/
  cmd = "rm -fr " + geotiff_tmp_paths(numClusters_id)
  println(cmd)
  Process(cmd)!

  numClusters_id += 1
}
t1 = System.nanoTime()
println("Elapsed time: " + (t1 - t0) + "ns")


hadoop dfs -copyFromLocal -f /tmp/clusters_3_LeafFinal_70_75.tif hdfs:///user/emma/spring-index/LeafFinalCentroidS/clusters_3_70_75.tif
DEPRECATED: Use of this script to execute hdfs command is deprecated.
Instead use the hdfs command for it.

rm -fr /tmp/clusters_3_LeafFinal_70_75.tif
Elapsed time: 405029681667ns
t0 = 141306439862214
numClusters_id = 1
grid0_index_I = MapPartitionsRDD[409] at map at <console>:80
t1 = 141711469543881
lastException: Throwable = null
warning: there were two feature warnings; re-run with -feature for details
Out[21]:
141711469543881

Visualize results

Plot WSSSE


In [ ]: