Kmeans over a set of GeoTiffs

This notebook loads a set of GeoTiffs into an RDD of Tiles, each Tile being one band of a GeoTiff. Each GeoTiff file contains the SpringIndex or LastFreeze values for one year over the entire USA.

Kmeans takes the years as dimensions. Hence, the matrix should have cells as rows and years as columns, and since the data is loaded one year at a time, the matrix needs to be transposed before clustering over all years. The notebook has two flavors of matrix transpose: locally on the Spark driver or distributed over the Spark workers. Once transposed, the matrix is converted into an RDD of dense vectors to be used by the Kmeans algorithm from Spark-MLlib. The end result is a grid where each cell has a cluster ID, which is then saved into a SingleBand GeoTiff. By saving the result into a GeoTiff, the reader can plot it using a Python notebook such as the one defined in the Python examples.
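
As a toy sketch (made-up values, not the notebook's data), this is the kind of input Kmeans from Spark-MLlib eventually consumes: an RDD of vectors, one per cell, with one dimension per year:

import org.apache.spark.mllib.clustering.KMeans
import org.apache.spark.mllib.linalg.Vectors

//Each vector is one cell's time series (here 3 hypothetical years)
val cells = sc.parallelize(Seq(
  Vectors.dense(0.0, 1.0, 2.0),
  Vectors.dense(3.0, 4.0, 5.0),
  Vectors.dense(6.0, 7.0, 8.0),
  Vectors.dense(9.0, 0.0, 1.0)
))
val model = KMeans.train(cells, 2, 10)   //2 clusters, 10 iterations
val clusterIDs = model.predict(cells)    //one cluster ID per cell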

In this notebook the reader only needs to modify the variables in **Mode of Operation Setup**.

Dependencies


In [1]:
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}

import geotrellis.proj4.CRS
import geotrellis.raster.io.geotiff.writer.GeoTiffWriter
import geotrellis.raster.io.geotiff.{SinglebandGeoTiff, _}
import geotrellis.raster.{CellType, DoubleArrayTile, MultibandTile, Tile, UByteCellType}
import geotrellis.spark.io.hadoop._
import geotrellis.vector.{Extent, ProjectedExtent}
import org.apache.hadoop.io.SequenceFile.Writer
import org.apache.hadoop.io.{SequenceFile, _}
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.mllib.clustering.{KMeans, KMeansModel}
import org.apache.spark.mllib.linalg._
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

import scala.sys.process._

//Spire is a numeric library for Scala which is intended to be generic, fast, and precise.
import spire.syntax.cfor._

Mode of operation

Here the user can define the mode of operation.

  • rdd_offline_mode: if false, the notebook creates all data from scratch and stores grid0, grid0_index, projected_extent and num_cols_rows (from grid0) into HDFS. Otherwise, these data structures are read from HDFS.
  • matrix_offline_mode: if false, the notebook creates the matrix, transposes it and saves it to HDFS. Otherwise, it is read from HDFS.
  • kmeans_offline_mode: if false, the notebook trains kmeans, runs it and stores the kmeans model into HDFS. Otherwise, the model is read from HDFS. A minimal sketch of this offline/online pattern follows the list.
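
The same offline/online logic applies to all three flags. A minimal, self-contained sketch of the pattern (hypothetical path and toy data, not used by the notebook):

import org.apache.spark.rdd.RDD

val example_path = "hdfs:///user/pheno/example_rdd"
val example_fs = org.apache.hadoop.fs.FileSystem.get(sc.hadoopConfiguration)
var example_rdd: RDD[(Long, Double)] = sc.emptyRDD

if (example_fs.exists(new org.apache.hadoop.fs.Path(example_path))) {
  //Offline mode: a previous run already saved the structure, just load it
  example_rdd = sc.objectFile(example_path)
} else {
  //Online mode: build it from scratch and save it so the next run can go offline
  example_rdd = sc.parallelize(Seq((0L, 1.0), (1L, 2.0)))
  example_rdd.saveAsObjectFile(example_path)
}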

It is also possible to define which directory of GeoTiffs is to be used and on which band to run Kmeans. The options are:

  • BloomFinal or LeafFinal, which are multi-band (4 bands)
  • DamageIndex and LastFreeze, which are single-band; if band_num is set any higher, it will be reset to 0

For kmeans the user can define the number of iterations and the number of clusters as an inclusive range. Such a range is defined using minClusters, maxClusters, and stepClusters. These variables set a loop that starts at minClusters and stops at maxClusters (inclusive), advancing stepClusters at a time, as sketched below. Note that when using a range, **kmeans offline mode** is not possible and it will be reset to **online mode**.
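
A sketch of how such a range is walked (hypothetical values; the cells below use the same cfor construct from Spire):

import spire.syntax.cfor._

val (minC, maxC, stepC) = (60, 80, 10)
val numKmeans = ((maxC - minC) / stepC) + 1   //3 runs: 60, 70 and 80 clusters
println(numKmeans + " kmeans runs")
cfor(minC)(_ <= maxC, _ + stepC) { numClusters =>
  println("Kmeans would be trained with " + numClusters + " clusters")
}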

Mode of Operation setup


In [2]:
var inpB_rdd_offline_mode = true
var inpA_rdd_offline_mode = true
var matrix_offline_mode = true
var kmeans_offline_mode = true

//Using spring-index inpA
var inpA_path = "hdfs:///user/hadoop/spring-index/"
var inpA_dir = "LeafFinal"

//Using spring-index inpB
var inpB_path = "hdfs:///user/hadoop/spring-index/"
var inpB_dir = "BloomFinal"

var out_path = "hdfs:///user/pheno/kmeans_" + inpA_dir + "_" + inpB_dir + "CentroidS/"
var band_num = 3

//Satellite years between (inclusive) 1989 - 2014
//Model years between (inclusive) 1980 - 2015

val timeseries = (1980, 2015)
var first_year = 1980
var last_year = 2015

//Mask
val toBeMasked = true
val mask_path = "hdfs:///user/hadoop/usa_mask.tif"

//Kmeans number of iterations and clusters
var numIterations = 75
var minClusters = 70
var maxClusters = 70
var stepClusters = 1
var save_kmeans_inpB = false
val save_rdds = false
val save_matrix = false


inpB_rdd_offline_mode = true
inpA_rdd_offline_mode = true
matrix_offline_mode = true
kmeans_offline_mode = true
inpA_path = hdfs:///user/hadoop/spring-index/
inpA_dir = LeafFinal
inpB_path = hdfs:///user/hadoop/spring-index/
inpB_dir = BloomFinal
out_path = hdfs:///user/pheno/kmeans_LeafFinal_BloomFinalCentroidS/
band_num = 3
timeseries = (1980,2015)
first_year = 1980
last_year = 2015
toBeMasked = true
mask_path = hdfs:///user/hadoop/usa_mask.tif
numIterations = 75
minClusters = 70
maxClusters = 70
stepClusters = 1
save_kmeans_inpB = false
save_rdds = false
save_matrix = false
Out[2]:
false

DON'T MODIFY ANY PIECE OF CODE FROM HERE ON!!!

Mode of operation validation


In [3]:
var conf = sc.hadoopConfiguration
var fs = org.apache.hadoop.fs.FileSystem.get(conf)

//Paths to store data structures for Offline runs
var mask_str = ""
if (toBeMasked)
  mask_str = "_mask"

var inpB_grid_path = out_path + inpB_dir + "_grid"
var inpA_grid_path = out_path + inpA_dir + "_grid"
var grid0_path = out_path + inpB_dir + "_grid0"
var grid0_index_path = out_path + inpB_dir + "_grid0_index"
var matrix_path = out_path + inpB_dir + "_matrix" + "_" + first_year + "_" + last_year
var metadata_path = out_path + inpB_dir + "_metadata"

val inpB_rdd_offline_exists = fs.exists(new org.apache.hadoop.fs.Path(inpB_grid_path))
val inpA_rdd_offline_exists = fs.exists(new org.apache.hadoop.fs.Path(inpA_grid_path))
val matrix_offline_exists = fs.exists(new org.apache.hadoop.fs.Path(matrix_path))

if (minClusters > maxClusters) {
  maxClusters = minClusters
  stepClusters = 1
}
if (stepClusters < 1) {
  stepClusters = 1
}

if (inpB_rdd_offline_mode != inpB_rdd_offline_exists) {
  println("\"Load GeoTiffs\" offline mode is not set properly, i.e., either it was set to false and the required file does not exist or vice-versa. We will reset it to " + inpB_rdd_offline_exists.toString())
  inpB_rdd_offline_mode = inpB_rdd_offline_exists
}

if (inpA_rdd_offline_mode != inpA_rdd_offline_exists) {
  println("\"Load GeoTiffs\" offline mode is not set properly, i.e., either it was set to false and the required file does not exist or vice-versa. We will reset it to " + inpA_rdd_offline_exists.toString())
  inpA_rdd_offline_mode = inpA_rdd_offline_exists
}

if (matrix_offline_mode != matrix_offline_exists) {
  println("\"Matrix\" offline mode is not set properly, i.e., either it was set to false and the required file does not exist or vice-versa. We will reset it to " + matrix_offline_exists.toString())
  matrix_offline_mode = matrix_offline_exists
}

//Years
val years = timeseries._1 to timeseries._2

if (!years.contains(first_year) || !(years.contains(last_year))) {
  println("Invalid range of years for " + inpB_dir + ". I should be between " + first_year + " and " + last_year)
  System.exit(0)
}

var years_range = (years.indexOf(first_year), years.indexOf(last_year))

var num_kmeans: Int = 1
if (minClusters != maxClusters) {
  num_kmeans = ((maxClusters - minClusters) / stepClusters) + 1
}
println(num_kmeans)
var kmeans_inpB_paths: Array[String] = Array.fill[String](num_kmeans)("")
var wssse_path: String = out_path + "/" + numIterations + "_wssse" + "_" + first_year + "_" + last_year
var geotiff_hdfs_paths: Array[String] = Array.fill[String](num_kmeans)("")
var geotiff_tmp_paths: Array[String] = Array.fill[String](num_kmeans)("")
var numClusters_id = 0

if (num_kmeans > 1) {
  numClusters_id = 0
  cfor(minClusters)(_ <= maxClusters, _ + stepClusters) { numClusters =>
    kmeans_inpB_paths(numClusters_id) = out_path + "/kmeans_inpB_" + band_num + "_" + numClusters + "_" + numIterations + "_" + first_year + "_" + last_year

    //Check if the file exists
    val kmeans_exist = fs.exists(new org.apache.hadoop.fs.Path(kmeans_inpB_paths(numClusters_id)))
    if (kmeans_exist && !kmeans_offline_mode) {
      println("The kmeans inpB path " + kmeans_inpB_paths(numClusters_id) + " exists, please remove it.")
    } else if (!kmeans_exist && kmeans_offline_mode) {
      kmeans_offline_mode = false
    }

    geotiff_hdfs_paths(numClusters_id) = out_path + "/clusters_" + band_num + "_" + numClusters + "_" + numIterations + "_" + first_year + "_" + last_year
    geotiff_tmp_paths(numClusters_id) = "/tmp/clusters_" + band_num + "_" + numClusters + "_" + numIterations
    numClusters_id += 1
  }
  kmeans_offline_mode = false
} else {
  kmeans_inpB_paths(0) = out_path + "/kmeans_inpB_" + band_num + "_" + minClusters + "_" + numIterations + "_" + first_year + "_" + last_year
  val kmeans_offline_exists = fs.exists(new org.apache.hadoop.fs.Path(kmeans_inpB_paths(0)))
  if (kmeans_offline_mode != kmeans_offline_exists) {
    println("\"Kmeans\" offline mode is not set properly, i.e., either it was set to false and the required file does not exist or vice-versa. We will reset it to " + kmeans_offline_exists.toString())
    kmeans_offline_mode = kmeans_offline_exists
  }
  geotiff_hdfs_paths(0) = out_path + "/clusters_" + band_num + "_" + minClusters + "_" + numIterations + "_" + first_year + "_" + last_year
  geotiff_tmp_paths(0) = "/tmp/clusters_" + band_num + "_" + minClusters + "_" + numIterations
}

//Global variables
var inpA_grids_RDD: RDD[(Long,Array[Double])] = sc.emptyRDD
var inpA_grids: RDD[(Long,Array[Double])] = sc.emptyRDD
var inpB_grids_RDD: RDD[(Long, Array[Double])] = sc.emptyRDD
var inpB_grids: RDD[(Long, Array[Double])] = sc.emptyRDD
var projected_extent = new ProjectedExtent(new Extent(0, 0, 0, 0), CRS.fromName("EPSG:3857"))
var grid0: RDD[(Long, Double)] = sc.emptyRDD
var grid0_index: RDD[Long] = sc.emptyRDD
var num_cols_rows: (Int, Int) = (0, 0)
var cellT: CellType = UByteCellType
var mask_tile0: Tile = new SinglebandGeoTiff(geotrellis.raster.ArrayTile.empty(cellT, num_cols_rows._1, num_cols_rows._2), projected_extent.extent, projected_extent.crs, Tags.empty, GeoTiffOptions.DEFAULT).tile
//We are comparing 2 data sets only
var cells_size: Long = 2
var t0: Long = 0
var t1: Long = 0


Waiting for a Spark session to start...
"Load GeoTiffs" offline mode is not set properly, i.e., either it was set to false and the required file does not exist or vice-versa. We will reset it to false
"Load GeoTiffs" offline mode is not set properly, i.e., either it was set to false and the required file does not exist or vice-versa. We will reset it to false
"Matrix" offline mode is not set properly, i.e., either it was set to false and the required file does not exist or vice-versa. We will reset it to false
1
"Kmeans" offline mode is not set properly, i.e., either it was set to false and the required file does not exist or vice-versa. We will reset it to false
conf = Configuration: core-default.xml, core-site.xml, mapred-default.xml, mapred-site.xml, yarn-default.xml, yarn-site.xml, hdfs-default.xml, hdfs-site.xml, file:/usr/lib/spark-2.1.1-bin-without-hadoop/conf/hive-site.xml
fs = DFS[DFSClient[clientName=DFSClient_NONMAPREDUCE_-1993764880_41, ugi=pheno (auth:SIMPLE)]]
mask_str = _mask
inpB_grid_path = hdfs:///user/pheno/kmeans_LeafFinal_BloomFinalCentroidS/BloomFinal_grid
inpA_grid_path = hdfs:///user/pheno/kmeans_LeafFinal_BloomFinalCentroidS/LeafFinal_grid
grid0_path = hdfs:///user/pheno/kmeans_LeafFinal_BloomFinalCentroidS/BloomFinal_grid0
grid0_index_path = hdfs:///user/pheno/kmeans_LeafFinal_BloomFinalCentroidS/BloomFinal_gri...
Out[3]:
hdfs:///user/pheno/kmeans_LeafFinal_BloomFinalCentroidS/BloomFinal_grid0_index

Functions to (de)serialize any structure into Array[Byte]


In [4]:
def serialize(value: Any): Array[Byte] = {
    val out_stream: ByteArrayOutputStream = new ByteArrayOutputStream()
    val obj_out_stream = new ObjectOutputStream(out_stream)
    obj_out_stream.writeObject(value)
    obj_out_stream.close
    out_stream.toByteArray
}

def deserialize(bytes: Array[Byte]): Any = {
    val obj_in_stream = new ObjectInputStream(new ByteArrayInputStream(bytes))
    val value = obj_in_stream.readObject
    obj_in_stream.close
    value
}


serialize: (value: Any)Array[Byte]
deserialize: (bytes: Array[Byte])Any
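
A quick round-trip check (toy value, not part of the pipeline); below, these helpers pack the ProjectedExtent, the grid dimensions and the CellType into a SequenceFile:

val example_bytes = serialize((1, 2.5))
val example_value = deserialize(example_bytes).asInstanceOf[(Int, Double)]
println(example_value)   //(1,2.5)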

Load GeoTiffs

Using GeoTrellis, all GeoTiffs of a directory are loaded into an RDD. From that RDD we extract a grid from the first file to later store the Kmeans cluster IDs, build an index to populate such a grid, and filter out all NaN values.


In [5]:
def hadoopGeoTiffRDD(inpA_filepath :String, pattern :String): RDD[(Int, (ProjectedExtent, Tile))] = {
  val listFiles = sc.binaryFiles(inpA_filepath + "/" + pattern).sortBy(_._1).keys.collect()
  var prevRDD :RDD[(Int,(ProjectedExtent, Tile))] = sc.emptyRDD

  cfor(0)(_ < listFiles.length, _ + 1) { k =>
    val filePath :String = listFiles(k)
    val kB = sc.broadcast(k)
    val currRDD = sc.hadoopGeoTiffRDD(filePath).map(m => (kB.value, m))
    prevRDD = currRDD.union(prevRDD)
    //kB.destroy()
  }
  prevRDD.sortBy(_._1)
}


hadoopGeoTiffRDD: (inpA_filepath: String, pattern: String)org.apache.spark.rdd.RDD[(Int, (geotrellis.vector.ProjectedExtent, geotrellis.raster.Tile))]

In [6]:
def hadoopMultibandGeoTiffRDD(inpA_filepath :String, pattern :String): RDD[(Int, (ProjectedExtent, MultibandTile))] = {
  val listFiles = sc.binaryFiles(inpA_filepath + "/" + pattern).sortBy(_._1).keys.collect()
  var prevRDD :RDD[(Int,(ProjectedExtent, MultibandTile))] = sc.emptyRDD

  cfor(0)(_ < listFiles.length, _ + 1) { k =>
    val filePath :String = listFiles(k)
    val kB = sc.broadcast(k)
    val currRDD = sc.hadoopMultibandGeoTiffRDD(filePath).map(m => (kB.value,m))
    prevRDD = currRDD.union(prevRDD)
    //kB.destroy()
  }
  prevRDD.sortBy(_._1)
}


hadoopMultibandGeoTiffRDD: (inpA_filepath: String, pattern: String)org.apache.spark.rdd.RDD[(Int, (geotrellis.vector.ProjectedExtent, geotrellis.raster.MultibandTile))]

GeoTiffs A


In [7]:
t0 = System.nanoTime()
//Load Mask
if (toBeMasked) {
  val mask_tiles_RDD = sc.hadoopGeoTiffRDD(mask_path).values
  val mask_tiles_withIndex = mask_tiles_RDD.zipWithIndex().map { case (e, v) => (v, e) }
  mask_tile0 = (mask_tiles_withIndex.filter(m => m._1 == 0).filter(m => !m._1.isNaN).values.collect()) (0)
}

//Local variables
val pattern: String = "*.tif"
val inpA_filepath: String = inpA_path + inpA_dir
val inpB_filepath: String = inpB_path + "/" + inpB_dir

t1 = System.nanoTime()
println("Elapsed time: " + (t1 - t0) + "ns")

t0 = System.nanoTime()
if (inpA_rdd_offline_mode) {
  inpA_grids_RDD = sc.objectFile(inpA_grid_path)
} else {
  val inpA_geos_RDD = hadoopMultibandGeoTiffRDD(inpA_filepath, pattern)
  val inpA_tiles_RDD = inpA_geos_RDD.map{ case (i,(p,t)) => (i,t)}

  val band_numB :Broadcast[Int] = sc.broadcast(band_num)
  if (toBeMasked) {
    val mask_tile_broad: Broadcast[Tile] = sc.broadcast(mask_tile0)
    inpA_grids_RDD = inpA_tiles_RDD.map{case (i,m) => (i,m.band(band_numB.value).localInverseMask(mask_tile_broad.value, 1, -1000).toArrayDouble().filter(_ != -1000).filter(!_.isNaN))}
  } else {
    inpA_grids_RDD = inpA_tiles_RDD.map{case (i,m) => (i,m.band(band_numB.value).toArrayDouble().filter(!_.isNaN))}
  }

  //Store in HDFS
  if (save_rdds) {
    inpA_grids_RDD.saveAsObjectFile(inpA_grid_path)
  }
}
val inpA_grids_withIndex = inpA_grids_RDD//.zipWithIndex().map { case (e, v) => (v, e) }

//Filter out the range of years:
val inpA_year_diff = first_year-timeseries._1
val inputA_year_diffB = sc.broadcast(inpA_year_diff)
inpA_grids = inpA_grids_withIndex.filterByRange(years_range._1, years_range._2).map{ case(i,a) => (i-(inputA_year_diffB.value),a)}//.values

var inpA_grid0_index: RDD[Double] = inpA_grids_withIndex.filter(m => m._1 == 0).values.flatMap(m => m)

t1 = System.nanoTime()
println("Elapsed time: " + (t1 - t0) + "ns")


Elapsed time: 5945968109ns
[Stage 4:=======================================================> (35 + 1) / 36]Elapsed time: 99000596542ns
t0 = 225008938048
pattern = *.tif
inpA_filepath = hdfs:///user/hadoop/spring-index/LeafFinal
inpB_filepath = hdfs:///user/hadoop/spring-index//BloomFinal
t1 = 324009534590
t0 = 225008938048
inpA_grids_withIndex = MapPartitionsRDD[172] at map at <console>:109
inpA_year_diff = 0
inputA_year_diffB = Broadcast(81)
inpA_grids = MapPartitionsRDD[174] at map at <console>:124
inpA_grid0_index = MapPartitionsRDD[177] at flatMap at <console>:126
t1 = 324009534590
Out[7]:
324009534590

GeoTiffs B


In [ ]:
t0 = System.nanoTime()
if (inpB_rdd_offline_mode) {
  inpB_grids_RDD = sc.objectFile(inpB_grid_path)
  grid0 = sc.objectFile(grid0_path)
  grid0_index = sc.objectFile(grid0_index_path)

  val metadata = sc.sequenceFile(metadata_path, classOf[IntWritable], classOf[BytesWritable]).map(_._2.copyBytes()).collect()
  projected_extent = deserialize(metadata(0)).asInstanceOf[ProjectedExtent]
  num_cols_rows = (deserialize(metadata(1)).asInstanceOf[Int], deserialize(metadata(2)).asInstanceOf[Int])
  cellT = deserialize(metadata(3)).asInstanceOf[CellType]
} else {
  val inpB_geos_RDD = hadoopMultibandGeoTiffRDD(inpB_filepath, pattern)
  val inpB_tiles_RDD = inpB_geos_RDD.map{case (i,(p,t)) => (i,t)}

  //Retrieve the number of cols and rows of the Tile's grid
  val tiles_withIndex = inpB_tiles_RDD//.zipWithIndex().map { case (v, i) => (i, v) }
  val tile0 = (tiles_withIndex.filter(m => m._1 == 0).values.collect()) (0)

  num_cols_rows = (tile0.cols, tile0.rows)
  cellT = tile0.cellType

  //Retrieve the ProjectExtent which contains metadata such as CRS and bounding box
  val projected_extents_withIndex = inpB_geos_RDD.map{case (i,(p,t)) => (i,p)}
  projected_extent = (projected_extents_withIndex.filter(m => m._1 == 0).values.collect()) (0)

  val band_numB: Broadcast[Int] = sc.broadcast(band_num)
  if (toBeMasked) {
    val mask_tile_broad: Broadcast[Tile] = sc.broadcast(mask_tile0)
    inpB_grids_RDD = inpB_tiles_RDD.map{ case (i, m) => (i,m.band(band_numB.value).localInverseMask(mask_tile_broad.value, 1, -1000).toArrayDouble())}
  } else {
    inpB_grids_RDD = inpB_tiles_RDD.map{ case (i, m) => (i, m.band(band_numB.value).toArrayDouble())}
  }

  //Get Index for each Cell
  val grids_withIndex = inpB_grids_RDD//.zipWithIndex().map { case (e, v) => (v, e) }
  if (toBeMasked) {
    grid0_index = grids_withIndex.filter(m => m._1 == 0).values.flatMap(m => m).zipWithIndex.filter(m => m._1 != -1000.0).filter(m => !m._1.isNaN).map { case (v, i) => (i) }
  } else {
    grid0_index = grids_withIndex.filter(m => m._1 == 0).values.flatMap(m => m).zipWithIndex.filter(m => !m._1.isNaN).map { case (v, i) => (i) }
  }

  //Get the Tile's grid
  grid0 = grids_withIndex.filter(m => m._1 == 0).values.flatMap(m => m).zipWithIndex.map { case (v, i) => (i, v) }

  //Lets filter out NaN
  if (toBeMasked) {
    inpB_grids_RDD = inpB_grids_RDD.map{ case (i,m) => (i,m.filter(m => m != -1000.0).filter(m => !m.isNaN))}
  } else {
    inpB_grids_RDD = inpB_grids_RDD.map{ case (i,m) => (i, m.filter(!_.isNaN))}
  }

  //Store data in HDFS
  if (save_rdds) {
    grid0.saveAsObjectFile(grid0_path)
    grid0_index.saveAsObjectFile(grid0_index_path)
    inpB_grids_RDD.saveAsObjectFile(inpB_grid_path)

    val writer: SequenceFile.Writer = SequenceFile.createWriter(conf,
      Writer.file(metadata_path),
      Writer.keyClass(classOf[IntWritable]),
      Writer.valueClass(classOf[BytesWritable])
    )

    writer.append(new IntWritable(1), new BytesWritable(serialize(projected_extent)))
    writer.append(new IntWritable(2), new BytesWritable(serialize(num_cols_rows._1)))
    writer.append(new IntWritable(3), new BytesWritable(serialize(num_cols_rows._2)))
    writer.append(new IntWritable(4), new BytesWritable(serialize(cellT)))
    writer.hflush()
    writer.close()
  }
}
val inpB_grids_withIndex = inpB_grids_RDD//.zipWithIndex().map { case (e, v) => (v, e) }
//Filter out the range of years:
//inpB_grids = inpB_grids_withIndex.filterByRange(years_range._1, years_range._2).values

//Filter out the range of years:
val inpB_year_diff = first_year-timeseries._1
val inputB_year_diffB = sc.broadcast(inpB_year_diff)
inpB_grids = inpB_grids_withIndex.filterByRange(years_range._1, years_range._2).map{ case(i,a) => (i-(inputB_year_diffB.value),a)}//.values

t1 = System.nanoTime()
println("Elapsed time: " + (t1 - t0) + "ns")


[Stage 14:====================================>                  (23 + 12) / 35]

Clean NaNs


In [9]:
grid0.cache()
val nans_index = grid0.filter(_._2 != -1000).map(_._2).zipWithIndex().filter(_._1.isNaN).map(_._2).collect()


nans_index = Array(242648, 326473, 847244, 1114195, 1118901, 1184534, 1189204, 1240663, 1245371, 1249929, 1254634, 1287489, 1292163, 1418144, 1422619, 1793723, 2518223, 2568624, 2573737, 2754204, 2760228, 3567256, 3706949, 3994794, 5068165, 5275589, 5420986, 5757421, 5757559, 5789255, 5826273, 5962422, 6225192, 6291976, 6461180, 6486665, 6664822, 7068242, 7481374, 7520963, 7702861, 8289913, 8321072, 8576748, 9721352, 9773380, 9900942, 9904479, 9971947, 9977332, 10070483, 10113710, 10115621)
Out[9]:
[242648, 326473, 847244, 1114195, 1118901, 1184534, 1189204, 1240663, 1245371, 1249929, 1254634, 1287489, 1292163, 1418144, 1422619, 1793723, 2518223, 2568624, 2573737, 2754204, 2760228, 3567256, 3706949, 3994794, 5068165, 5275589, 5420986, 5757421, 5757559, 5789255, 5826273, 5962422, 6225192, 6291976, 6461180, 6486665, 6664822, 7068242, 7481374, 7520963, 7702861, 8289913, 8321072, 8576748, 9721352, 9773380, 9900942, 9904479, 9971947, 9977332, 10070483, 10113710, 10115621]

In [10]:
val nans_indexB = sc.broadcast(nans_index)
val inpA_grids_noNaNs = inpA_grids.map{ case (i,v) => (i,v.zipWithIndex.filter{ case (v,i) => !nans_indexB.value.contains(i)}.map{ case (v,i) => v})}


nans_indexB = Broadcast(169)
inpA_grids_noNaNs = MapPartitionsRDD[362] at map at <console>:61
Out[10]:
MapPartitionsRDD[362] at map at <console>:61

In [11]:
inpA_grids_noNaNs.cache()
inpB_grids.cache()


Out[11]:
MapPartitionsRDD[356] at map at <console>:171

In [12]:
inpA_grids_noNaNs.map(_._2.size).take(10)


[Stage 26:===================================================>     (9 + 1) / 10]
Out[12]:
[10115578, 10115578, 10115578, 10115578, 10115578, 10115578, 10115578, 10115578, 10115578, 10115578]

In [13]:
inpB_grids.map(_._2.size).take(10)


[Stage 32:===================================================>     (9 + 1) / 10]
Out[13]:
[10115578, 10115578, 10115578, 10115578, 10115578, 10115578, 10115578, 10115578, 10115578, 10115578]

Matrix

We need to do a matrix transpose to have clusters per cell and not per year. With each GeoTiff representing a single year, the loaded data looks like this:

bands_RDD.map(s => Vectors.dense(s)).cache()

//The vectors are rows and therefore the matrix will look like this:
[
Vectors.dense(0.0, 1.0, 2.0),
Vectors.dense(3.0, 4.0, 5.0),
Vectors.dense(6.0, 7.0, 8.0),
Vectors.dense(9.0, 0.0, 1.0)
]

To achieve that we convert the RDD[Vector] into a distributed matrix, a CoordinateMatrix, which has a transpose method.
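
A minimal sketch of that transpose (toy 2x3 matrix, i.e., 2 years and 3 cells; not the notebook's actual data):

import org.apache.spark.mllib.linalg.distributed.{CoordinateMatrix, MatrixEntry}

//Years as rows, cells as columns
val entries = sc.parallelize(Seq(
  MatrixEntry(0, 0, 0.0), MatrixEntry(0, 1, 1.0), MatrixEntry(0, 2, 2.0),
  MatrixEntry(1, 0, 3.0), MatrixEntry(1, 1, 4.0), MatrixEntry(1, 2, 5.0)
))
val transposed = new CoordinateMatrix(entries).transpose()
//One row per cell, with the years as its values
val vectors_per_cell = transposed.toIndexedRowMatrix().rows.sortBy(_.index).map(_.vector)
vectors_per_cell.collect().foreach(println)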


In [14]:
t0 = System.nanoTime()
var grids_matrix: RDD[Vector] = sc.emptyRDD

if (matrix_offline_mode) {
  grids_matrix = sc.objectFile(matrix_path)
} else {
  val inp_grids :RDD[Array[Double]] = inpA_grids_noNaNs.flatMap{ case (i,m) => m}.zipWithIndex().map{ case (v,i) => (i,v)}.join(inpB_grids.flatMap{case (i,m) => m}.zipWithIndex().map{case (v,i) => (i,v)}).sortByKey(true).map{case (i, (a1,a2)) => Array(a1, a2)}
  val cells_sizeB = sc.broadcast(cells_size)
  grids_matrix = inp_grids.map(m => m.zipWithIndex).map(m => m.filter(!_._1.isNaN)).map(m => Vectors.sparse(cells_sizeB.value.toInt, m.map(v => v._2), m.map(v => v._1)))
  if (save_matrix)
    grids_matrix.saveAsObjectFile(matrix_path)
}
t1 = System.nanoTime()
println("Elapsed time: " + (t1 - t0) + "ns")


[Stage 41:====================================================>(998 + 2) / 1000]Elapsed time: 246780750325ns
t0 = 1243394151950
grids_matrix = MapPartitionsRDD[381] at map at <console>:84
t1 = 1490174902275
Out[14]:
1490174902275

Kmeans

We use Kmeans from Spark-MLlib. The user should only modify the variables in the Kmeans setup.

Kmeans Training


In [15]:
t0 = System.nanoTime()
//Global variables
var kmeans_inpBs :Array[KMeansModel] = new Array[KMeansModel](num_kmeans)
var wssse_data :List[(Int, Int, Double)] = List.empty

if (kmeans_offline_mode) {
  numClusters_id = 0
  cfor(minClusters)(_ <= maxClusters, _ + stepClusters) { numClusters =>
    if (!fs.exists(new org.apache.hadoop.fs.Path(kmeans_inpB_paths(numClusters_id)))) {
      println("One of the files does not exist, we will abort!!!")
      System.exit(0)
    } else {
      kmeans_inpBs(numClusters_id) = KMeansModel.load(sc, kmeans_inpB_paths(numClusters_id))
    }
    numClusters_id += 1
  }
  val wssse_data_RDD :RDD[(Int, Int, Double)]  = sc.objectFile(wssse_path)
  wssse_data  = wssse_data_RDD.collect().toList
} else {
  numClusters_id = 0
  if (fs.exists(new org.apache.hadoop.fs.Path(wssse_path))) {
    val wssse_data_RDD :RDD[(Int, Int, Double)]  = sc.objectFile(wssse_path)
    wssse_data  = wssse_data_RDD.collect().toList
  }
  grids_matrix.cache()
  cfor(minClusters)(_ <= maxClusters, _ + stepClusters) { numClusters =>
    println(numClusters)
    kmeans_inpBs(numClusters_id) = {
      KMeans.train(grids_matrix, numClusters, numIterations)
    }

    // Evaluate clustering by computing Within Set Sum of Squared Errors
    val WSSSE = kmeans_inpBs(numClusters_id).computeCost(grids_matrix)
    println("Within Set Sum of Squared Errors = " + WSSSE)

    wssse_data = wssse_data :+ (numClusters, numIterations, WSSSE)

    //Save kmeans inpB
    if (save_kmeans_inpB) {
      if (!fs.exists(new org.apache.hadoop.fs.Path(kmeans_inpB_paths(numClusters_id)))) {
        kmeans_inpBs(numClusters_id).save(sc, kmeans_inpB_paths(numClusters_id))
      }
    }
    numClusters_id += 1

    if (fs.exists(new org.apache.hadoop.fs.Path(wssse_path))) {
      println("We will delete the wssse file")
      try { fs.delete(new org.apache.hadoop.fs.Path(wssse_path), true) } catch { case _ : Throwable => { } }
    }

    println("Lets create it with the new data")
    sc.parallelize(wssse_data, 1).saveAsObjectFile(wssse_path)
  }

  //Un-persist it to save memory
  grids_matrix.unpersist()

}
t1 = System.nanoTime()
println("Elapsed time: " + (t1 - t0) + "ns")


70
[Stage 196:======================>                            (446 + 36) / 1000]
IOPub message rate exceeded.
The notebook server will temporarily stop sending output
to the client in order to avoid crashing it.
To change this limit, set the config variable
`--NotebookApp.iopub_msg_rate_limit`.

Current values:
NotebookApp.iopub_msg_rate_limit=1000.0 (msgs/sec)
NotebookApp.rate_limit_window=3.0 (secs)

Inspect WSSSE


In [16]:
t0 = System.nanoTime()
//current
println(wssse_data)

//from disk
if (fs.exists(new org.apache.hadoop.fs.Path(wssse_path))) {
    var wssse_data_tmp :RDD[(Int, Int, Double)] = sc.objectFile(wssse_path)//.collect()//.toList
    println(wssse_data_tmp.collect().toList)    
}
t1 = System.nanoTime()
println("Elapsed time: " + (t1 - t0) + "ns")


List((70,75,1.1830068926156204E10), (70,75,4.389376460230786E9), (70,75,4.322761920589285E9))
List((70,75,1.1830068926156204E10), (70,75,4.389376460230786E9), (70,75,4.322761920589285E9))
Elapsed time: 339944692ns
t0 = 2850659070268
t1 = 2850999014960
Out[16]:
2850999014960

Run Kmeans clustering

Run Kmeans and obtain the cluster ID for each cell.


In [17]:
t0 = System.nanoTime()
//Cache it so kmeans is more efficient
grids_matrix.cache()

var kmeans_res: Array[RDD[Int]] = Array.fill(num_kmeans)(sc.emptyRDD)
var kmeans_centroids: Array[Array[Double]] = Array.fill(num_kmeans)(Array.emptyDoubleArray)
numClusters_id = 0
cfor(minClusters)(_ <= maxClusters, _ + stepClusters) { numClusters =>
  kmeans_res(numClusters_id) = kmeans_inpBs(numClusters_id).predict(grids_matrix)
  kmeans_centroids(numClusters_id) = kmeans_inpBs(numClusters_id).clusterCenters.map(m => m(0))
  numClusters_id += 1
}

//Un-persist it to save memory
grids_matrix.unpersist()
t1 = System.nanoTime()
println("Elapsed time: " + (t1 - t0) + "ns")


Elapsed time: 18214952ns
t0 = 2851762797749
kmeans_res = Array(MapPartitionsRDD[500] at map at KMeansModel.scala:69)
kmeans_centroids = Array(Array(77.36996501035624, 113.86224284536698, 29.14822017823281, 58.88144214075923, 127.31475184306379, 73.7902887998774, 27.694854015588, 97.88331630733785, 163.84294909234643, 85.70331359597664, 59.38546821324081, 98.46785873697408, 125.34158345451264, 43.105558751694595, 105.19824307370467, 88.27626946884818, 38.10438264866804, 109.20439379120842, 22.925695657038077, 89.93075861574022, 45.36165801968137, 134.64169329291997, 101.10596698942915, 42.17344495521638, 51.51197288988513, 82.44579819408993, 41.41722489308865, 11.593694439882615, 90.23425214679074, 119.74913990196242, 49.28364114388902, 106.415089...
Out[17]:
[[77.36996501035624, 113.86224284536698, 29.14822017823281, 58.88144214075923, 127.31475184306379, 73.7902887998774, 27.694854015588, 97.88331630733785, 163.84294909234643, 85.70331359597664, 59.38546821324081, 98.46785873697408, 125.34158345451264, 43.105558751694595, 105.19824307370467, 88.27626946884818, 38.10438264866804, 109.20439379120842, 22.925695657038077, 89.93075861574022, 45.36165801968137, 134.64169329291997, 101.10596698942915, 42.17344495521638, 51.51197288988513, 82.44579819408993, 41.41722489308865, 11.593694439882615, 90.23425214679074, 119.74913990196242, 49.28364114388902, 106.4150894880739, 69.08875899707522, 65.5322666629869, 20.801335604988445, 122.12251924820116, 150.8931386538521, 35.162737212193974, 33.495738565224066, 79.1452903122647, 99.64309078730716, 94.60963509606124, 58.78908935020606, 31.2721832796152, 34.205171815004405, 83.3818877772058, 25.354978926560534, 141.05605868678992, 107.66057720494781, 92.9862123871554, 55.003562301755466, 92.08404871090701, 72.15003598060555, 75.14313338953728, 113.64143410756314, 86.55925554062225, 67.20055291245723, 56.09986816995934, 104.23898598757873, 128.87311144063622, 77.3738358485732, 109.50738549551704, 140.76437343350037, 113.69723232640013, 17.287059343531155, 52.84242933574233, 44.55007036613624, 119.61939127968711, 73.57883232883323, 65.21707607788784]]

Sanity test

This step can be skipped; it only shows the cluster IDs for the first 150 cells.


In [18]:
t0 = System.nanoTime()
val kmeans_res_out = kmeans_res(0).take(150)
kmeans_res_out.foreach(print)

println(kmeans_res_out.size)
t1 = System.nanoTime()
println("Elapsed time: " + (t1 - t0) + "ns")


636363636363636363636363636363636363636363636363636363636363636363636363636363636363636363636363636363636363636363636363636363636363636363636363636363636363636363636363636363636363636363636363636363636363636363636363636363636363636363636363636363636363636363636363636363636363636363636363636363636363150
Elapsed time: 681840761ns
t0 = 2852623579683
kmeans_res_out = Array(63, 63, 63, 63, 63, 63, 63, 63, 63, 63, 63, 63, 63, 63, 63, 63, 63, 63, 63, 63, 63, 63, 63, 63, 63, 63, 63, 63, 63, 63, 63, 63, 63, 63, 63, 63, 63, 63, 63, 63, 63, 63, 63, 63, 63, 63, 63, 63, 63, 63, 63, 63, 63, 63, 63, 63, 63, 63, 63, 63, 63, 63, 63, 63, 63, 63, 63, 63, 63, 63, 63, 63, 63, 63, 63, 63, 63, 63, 63, 63, 63, 63, 63, 63, 63, 63, 63, 63, 63, 63, 63, 63, 63, 63, 63, 63, 63, 63, 63, 63, 63, 63, 63, 63, 63, 63, 63, 63, 63, 63, 63, 63, 63, 63, 63, 63, 63, 63, 63, 63, 63, 63, 63, 63, 63, 63, 63, 63, 63, 63, 63, 63, 63, 63, 63, 63, 63, 63, 63, 63, 63, 63, 63, 63, 63, 63, 63, 63, 63, 63)
t1 = 2853305420444
Out[18]:
2853305420444

Build GeoTiff with Kmeans cluster_IDs

The Grid with the cluster IDs is stored in a SingleBand GeoTiff and uploaded to HDFS.

Assign cluster ID to each grid cell and save the grid as SingleBand GeoTiff

To assign the cluster ID to each grid cell it is necessary to get the indices of the grid cells it belongs to. The process is not straightforward because the Array[Double] used for the creation of each dense Vector does not contain the NaN values, therefore there is no direct mapping between the indices in the Tile's grid and the ones in kmeans_res (the kmeans result).

To join the two RDDs, the approach was taken from a Stack Overflow post on how to perform basic joins of two RDD tables in Spark using Python.
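
A toy sketch of that join (hypothetical values): the positions in the kmeans result only cover the non-NaN cells, so they are mapped back to the Tile's grid through grid0_index:

//(position in kmeans_res, cluster ID)
val cluster_IDs = sc.parallelize(Seq(2, 0, 1)).zipWithIndex().map{ case (c, i) => (i, c)}
//(position in kmeans_res, index in the Tile's grid), e.g. grid cells 1 and 2 were NaN
val grid_index = sc.parallelize(Seq(0L, 3L, 4L)).zipWithIndex().map{ case (g, i) => (i, g)}
//(index in the Tile's grid, cluster ID)
val cluster_per_cell = cluster_IDs.join(grid_index).map{ case (_, (c, g)) => (g, c)}
cluster_per_cell.collect().foreach(println)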


In [19]:
inpA_grids_noNaNs.unpersist()
inpB_grids.unpersist()


Out[19]:
MapPartitionsRDD[356] at map at <console>:171

In [20]:
//CREATE GeoTiffs
t0 = System.nanoTime()
numClusters_id = 0
grid0_index.cache()
grid0.cache()
val grid0_index_I = grid0_index.zipWithIndex().map{ case (v,i) => (i,v)}
grid0_index_I.cache()
grid0_index.unpersist()
kmeans_res(0).cache()
var num_cells = kmeans_res(0).count().toInt
kmeans_res(0).unpersist()
var cells_per_year = num_cells / ((last_year-first_year)+1)
println(cells_per_year)
println(num_cells)
val cells_per_yearB = sc.broadcast(cells_per_year)

cfor(minClusters)(_ <= maxClusters, _ + stepClusters) { numClusters =>
  //Merge two RDDs, one containing the clusters_ID indices and the other one the indices of a Tile's grid cells
  var year :Int = 0
  val kmeans_res_zip = kmeans_res(numClusters_id).zipWithIndex()
  kmeans_res_zip.cache()
  val start = cells_per_year * year
  cfor(start) (_ < num_cells, _ + cells_per_year) { cellID =>
    println("Saving GeoTiff for numClustersID: " + numClusters_id + " year: " + years(year))
    val cellIDB = sc.broadcast(cellID)
    val kmeans_res_sing = kmeans_res(numClusters_id)
    val cluster_cell_pos = ((kmeans_res_zip.map{ case (v,i) => (i,v)}.filterByRange(cellIDB.value, (cellIDB.value+cells_per_yearB.value-1)).map{case (i,v) => (i-cellIDB.value, v)}.join(grid0_index_I)).map{ case (k,(v,i)) => (v,i)})

    //Associate a Cluster_IDs to respective Grid_cell
    val grid_clusters :RDD[ (Long, (Double, Option[Int]))] = grid0.leftOuterJoin(cluster_cell_pos.map{ case (c,i) => (i.toLong, c)})

    //Convert all None to NaN
    val grid_clusters_res = grid_clusters.sortByKey(true).map{case (k, (v, c)) => if (c == None) (k, Int.MaxValue) else (k, c.get)}

    //Define a Tile
    val cluster_cellsID :Array[Int] = grid_clusters_res.values.collect()
    var cluster_cells :Array[Double] = Array.fill(cluster_cellsID.length)(Double.NaN)
    cfor(0)(_ < cluster_cellsID.length, _ + 1) { cell =>
      if (cluster_cellsID(cell) != Int.MaxValue) {
        cluster_cells(cell) = kmeans_centroids(numClusters_id)(cluster_cellsID(cell))
      }
    }
    val cluster_cellsD = DoubleArrayTile(cluster_cells, num_cols_rows._1, num_cols_rows._2)
    val geoTif = new SinglebandGeoTiff(cluster_cellsD, projected_extent.extent, projected_extent.crs, Tags.empty, GeoTiffOptions(compression.DeflateCompression))

    //Save to /tmp/
    GeoTiffWriter.write(geoTif, geotiff_tmp_paths(numClusters_id) + "_" + years(year) + ".tif")

    //Upload to HDFS
    var cmd = "hadoop dfs -copyFromLocal -f " + geotiff_tmp_paths(numClusters_id) + "_" + years(year) + ".tif" + " " + geotiff_hdfs_paths(numClusters_id) + "_" + years(year) + ".tif"
    Process(cmd)!

    //Remove from /tmp/
    cmd = "rm -fr " + geotiff_tmp_paths(numClusters_id) + "_" + years(year) + ".tif"
    Process(cmd)!

    cellIDB.destroy()
    year += 1
  }
  kmeans_res_zip.unpersist()
  numClusters_id += 1
}
cells_per_yearB.destroy()
grid0_index_I.unpersist()
grid0.unpersist()
t1 = System.nanoTime()
println("Elapsed time: " + (t1 - t0) + "ns")


[Stage 429:===================================================>(998 + 2) / 1000]10115578
364160808
[Stage 435:====================================================>(998 + 1) / 999]Saving GeoTiff for numClustersID: 0 year: 2006
[Stage 456:=================================================> (977 + 23) / 1000]]DEPRECATED: Use of this script to execute hdfs command is deprecated.
Instead use the hdfs command for it.

Saving GeoTiff for numClustersID: 0 year: 2007
[Stage 477:===================================================>(998 + 2) / 1000]]===============>   (941 + 38) / 1000]DEPRECATED: Use of this script to execute hdfs command is deprecated.
Instead use the hdfs command for it.

Saving GeoTiff for numClustersID: 0 year: 2008
[Stage 498:===================================================>(998 + 2) / 1000]]DEPRECATED: Use of this script to execute hdfs command is deprecated.
Instead use the hdfs command for it.

Saving GeoTiff for numClustersID: 0 year: 2009
[Stage 519:===================================================>(997 + 3) / 1000]]DEPRECATED: Use of this script to execute hdfs command is deprecated.
Instead use the hdfs command for it.

Saving GeoTiff for numClustersID: 0 year: 2010
DEPRECATED: Use of this script to execute hdfs command is deprecated.           ]
Instead use the hdfs command for it.

Saving GeoTiff for numClustersID: 0 year: 2011
[Stage 561:===================================================>(999 + 1) / 1000]]DEPRECATED: Use of this script to execute hdfs command is deprecated.
Instead use the hdfs command for it.

Saving GeoTiff for numClustersID: 0 year: 2012
[Stage 582:===================================================>(993 + 7) / 1000]]
IOPub message rate exceeded.
The notebook server will temporarily stop sending output
to the client in order to avoid crashing it.
To change this limit, set the config variable
`--NotebookApp.iopub_msg_rate_limit`.

Current values:
NotebookApp.iopub_msg_rate_limit=1000.0 (msgs/sec)
NotebookApp.rate_limit_window=3.0 (secs)

Visualize results

Plot WSSSE