This notebook shows how to read a single- or multi-band GeoTiff with Spark. We use GeoTrellis to read the GeoTiff as an RDD, extract a band from it, filter out NaN values, and convert the result to an RDD of dense vectors. That RDD is then passed to the KMeans clustering algorithm from Spark MLlib for training. The KMeans model is then saved to HDFS.
Note: In this example the grid cells define the dimension of the matrix. Since only the year **1980** is loaded, the matrix has only one record. To understand how to load several GeoTiffs and transpose the matrix so that years become a dimension, the reader should check the [kmeans_multiGeoTiffs_matrixTranspose](kmeans_multiGeoTiffs_matrixTranspose.ipynb) notebook in the Scala examples. A minimal illustration of the transpose idea follows below.
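As a self-contained sketch of that idea (the arrays below are illustrative dummies, not data from this notebook): stacking one value array per year gives a years × cells matrix, and transposing it makes each grid cell a sample with years as features.

In [ ]:
// Illustrative only: two dummy "years" of three grid-cell values each.
val yearArrays = Array(Array(1.0, 2.0, 3.0), Array(4.0, 5.0, 6.0)) // years x cells
val cellsByYears = yearArrays.transpose                            // cells x years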
In [1]:
import geotrellis.raster.MultibandTile
import geotrellis.spark.io.hadoop._
import geotrellis.vector.ProjectedExtent
import org.apache.hadoop.fs.Path
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.mllib.clustering.KMeans
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.rdd.RDD
In [14]:
var band_NaN_RDD: RDD[Array[Double]] = sc.emptyRDD
val single_band = true
var filepath: String = ""
if (single_band) {
  // Single-band GeoTiff
  filepath = "hdfs:///user/hadoop/spring-index/LastFreeze/1980.tif"
} else {
  // Multi-band GeoTiff
  filepath = "hdfs:///user/hadoop/spring-index/BloomFinal/1980.tif"
}
if (single_band) {
  // Let's load a single-band GeoTiff and return an RDD with just the tiles.
  val bands_RDD = sc.hadoopGeoTiffRDD(filepath).values
  // Conversion to Array[Double] is necessary to then generate a dense vector.
  band_NaN_RDD = bands_RDD.map(m => m.toArrayDouble())
} else {
  // Let's load a multi-band GeoTiff and return an RDD with just the tiles.
  val bands_RDD = sc.hadoopMultibandGeoTiffRDD(filepath).values
  // Extract the 4th band.
  band_NaN_RDD = bands_RDD.map(m => m.band(3).toArrayDouble())
}
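As an optional sanity check (a sketch, assuming the single-band path above), the keys of `hadoopGeoTiffRDD` carry a `ProjectedExtent`, so the CRS, extent, and tile dimensions can be inspected before any processing:

In [ ]:
// Peek at the first record: the key is a ProjectedExtent, the value a tile.
val (projExtent, tile) = sc.hadoopGeoTiffRDD(filepath).first()
println("CRS: " + projExtent.crs)
println("Extent: " + projExtent.extent)
println("Tile dimensions: %d x %d".format(tile.cols, tile.rows))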
In [ ]:
// Print the length of each vector (including NaN cells)
band_NaN_RDD.collect().foreach(m => println("vector length with NaN is %d".format(m.length)))
// Filter out all NaNs from each vector
val band_RDD = band_NaN_RDD.map(m => m.filter(v => !v.isNaN))
// Print the length of each vector again to see how many NaNs were removed
band_RDD.collect().foreach(m => println("vector length without NaN is %d".format(m.length)))
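Note that `collect()` ships every full vector to the driver just to print a length. A lighter alternative (a sketch using the same RDDs) computes the counts on the executors and collects only the integers:

In [ ]:
// Count NaN cells per vector without collecting the vectors themselves.
band_NaN_RDD.map(m => m.count(_.isNaN)).collect()
  .foreach(c => println("number of NaN cells: %d".format(c)))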
In [15]:
// Create a Vector with NaN converted to 0s
//val band_vec = band_NaN_RDD.map(s => Vectors.dense(s.map(v => if (v.isNaN) 0 else v))).cache()
// Create a Vector without NaN values
val band_vec = band_RDD.map(s => Vectors.dense(s)).cache()
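A quick check (a sketch) that the cached RDD holds one dense vector per GeoTiff and reports its size:

In [ ]:
println("number of vectors: %d".format(band_vec.count()))
println("first vector size: %d".format(band_vec.first().size))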
In [16]:
val numClusters = 2
val numIterations = 20
val clusters = KMeans.train(band_vec, numClusters, numIterations)
// Evaluate clustering by computing Within Set Sum of Squared Errors
val WSSSE = clusters.computeCost(band_vec)
println("Within Set Sum of Squared Errors = " + WSSSE)
In [18]:
// Un-persist the cached RDD of vectors
band_vec.unpersist()
// Show the resulting cluster centers
println("Cluster Centers: ")
clusters.clusterCenters.foreach(println)
// Save the model to HDFS
if (single_band) {
  clusters.save(sc, "hdfs:///user/pheno/spring_index/LastFreeze/1980_kmeans_model")
} else {
  clusters.save(sc, "hdfs:///user/pheno/spring_index/BloomFinal/1980_kmeans_model")
}
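To verify the save, the model can be loaded back from HDFS and used to predict the cluster of a vector (a sketch; the single-band path is assumed, and `KMeansModel` needs an extra import since only `KMeans` was imported above):

In [ ]:
import org.apache.spark.mllib.clustering.KMeansModel
val sameModel = KMeansModel.load(sc, "hdfs:///user/pheno/spring_index/LastFreeze/1980_kmeans_model")
println("cluster of first vector: %d".format(sameModel.predict(band_vec.first())))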