SVD between a Model and Satellite data

This notebook shows how to multiply two matrices and compute their SVD. Each matrix is built out of a set of GeoTiffs, one per year in a series of years, and both matrices should have the same dimensions.

For demonstration we will use data from a model (spring-index) and from a satellite (AVHRR).
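At its core the notebook builds one distributed matrix and one local matrix, multiplies them, and runs a truncated SVD on the product with Spark MLlib. The following is a minimal sketch of that computation with toy values, assuming only a running SparkContext sc; the real matrices are assembled from the GeoTiffs in the cells below.

import org.apache.spark.mllib.linalg.{Matrices, Matrix, SingularValueDecomposition, Vectors}
import org.apache.spark.mllib.linalg.distributed.{IndexedRow, IndexedRowMatrix}

// Distributed "model" matrix (2 x 2) and local "satellite" matrix (2 x 2) with toy values.
val A = new IndexedRowMatrix(sc.parallelize(Seq(
  IndexedRow(0L, Vectors.dense(1.0, 2.0)),
  IndexedRow(1L, Vectors.dense(3.0, 4.0)))))
val B: Matrix = Matrices.dense(2, 2, Array(1.0, 0.0, 0.0, 1.0))

// Multiply the distributed matrix by the local one, then take the truncated SVD of the product.
val product: IndexedRowMatrix = A.multiply(B)
val svd: SingularValueDecomposition[IndexedRowMatrix, Matrix] =
  product.computeSVD(2, computeU = true)   // U is distributed; s and V are local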

Dependencies


In [82]:
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}

import geotrellis.proj4.CRS
import geotrellis.raster.io.geotiff.writer.GeoTiffWriter
import geotrellis.raster.io.geotiff.{SinglebandGeoTiff, _}
import geotrellis.raster.{CellType, DoubleArrayTile, MultibandTile, Tile, UByteCellType}
import geotrellis.spark.io.hadoop._
import geotrellis.vector.{Extent, ProjectedExtent}
import org.apache.hadoop.io.SequenceFile.Writer
import org.apache.hadoop.io.{SequenceFile, _}
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.mllib.linalg._
import org.apache.spark.mllib.linalg.distributed._
import org.apache.spark.mllib.stat.{MultivariateStatisticalSummary, Statistics}
import org.apache.spark.rdd.RDD
import org.apache.spark.storage.StorageLevel
import org.apache.spark.{SparkConf, SparkContext}
import spire.syntax.cfor.cfor

import scala.sys.process.Process

Mode of operation

Here the user can define the mode of operation.

  • rdd_offline_mode: if false, the notebook creates all data from scratch and stores projected_extent and num_cols_rows into HDFS. Otherwise, these data structures are read from HDFS.

It is also possible to define which directory of GeoTiffs is to be used and which band to work with; a short band-selection sketch follows this list and the note below. The options are

  • all, which uses the multi-band (8 bands) GeoTiffs
  • or one of the single-band options (band indices 0-7, in this order):
    1. Onset_Greenness_Increase
    2. Onset_Greenness_Maximum
    3. Onset_Greenness_Decrease
    4. Onset_Greenness_Minimum
    5. NBAR_EVI_Onset_Greenness_Minimum
    6. NBAR_EVI_Onset_Greenness_Maximum
    7. NBAR_EVI_Area
    8. Dynamics_QC

Note that when using a range, **offline mode** is not possible and it will be reset to **online mode**.
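For reference, picking one band out of a multi-band GeoTiff boils down to MultibandTile.band in GeoTrellis. A minimal, self-contained sketch with a toy 2x2 two-band tile (the real data has 8 bands, listed above):

import geotrellis.raster.{ArrayMultibandTile, DoubleArrayTile, Tile}

// Toy two-band tile; band indices are 0-based, as with sat_band_num and model_band_num below.
val b0 = DoubleArrayTile(Array(1.0, 2.0, 3.0, 4.0), 2, 2)
val b1 = DoubleArrayTile(Array(5.0, 6.0, 7.0, 8.0), 2, 2)
val multi = ArrayMultibandTile(b0, b1)

val band_num = 1
val single: Tile = multi.band(band_num)             // select one band
val values: Array[Double] = single.toArrayDouble()  // flatten the band to a Double array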

Mode of Operation setup


In [83]:
var model_rdd_offline_mode = true
var model_matrix_offline_mode = true
var satellite_rdd_offline_mode = true
var satellite_matrix_offline_mode = true

//Using spring-index model
//var model_path = "hdfs:///user/hadoop/avhrr/"
//var model_dir = "SOST"
var model_path = "hdfs:///user/hadoop/spring-index/"
var model_dir = "BloomGridmet"
var model_band_num = 3  //First band is 0

//Using AVHRR Satellite data
var satellite_path = "hdfs:///user/hadoop/avhrr/"
var satellite_dir = "SOST4Km"
//var satellite_path = "hdfs:///user/hadoop/spring-index/"
//var satellite_dir = "LeafGridmet"
var sat_band_num = 0  //First band is 0

var n_components :Long = 10
var out_path = "hdfs:///user/emma/svd/spark/" + model_dir + satellite_dir + "" + n_components + "/"

//Years between (inclusive) 1989 - 2014
val satellite_timeseries = (1989, 2014)
var satellite_first_year = 1989
var satellite_last_year = 2014

//Years between (inclusive) 1980 - 2016
val model_timeseries = (1980, 2016)
var model_first_year = 1989
var model_last_year = 2014

//Mask
val modToBeMasked = true
val satToBeMasked = true
val mod_mask_path = "hdfs:///user/hadoop/usa_mask_gridmet.tif"
//val mod_mask_path = "hdfs:///user/hadoop/usa_state_masks/california_4km.tif"
val sat_mask_path = "hdfs:///user/hadoop/usa_mask_gridmet.tif"
//val sat_mask_path = "hdfs:///user/hadoop/usa_state_masks/california_4km.tif"

//Matrix Mode: 0 Normal, 1 SC, 2 SR
val matrix_mode = 0

val save_rdds = false
val save_matrix = false
val save_svd_matrices = false


model_rdd_offline_mode = true
model_matrix_offline_mode = true
satellite_rdd_offline_mode = true
satellite_matrix_offline_mode = true
model_path = hdfs:///user/hadoop/spring-index/
model_dir = BloomGridmet
model_band_num = 3
satellite_path = hdfs:///user/hadoop/avhrr/
satellite_dir = SOST4Km
sat_band_num = 0
n_components = 10
out_path = hdfs:///user/emma/svd/spark/BloomGridmetSOST4Km10/
satellite_timeseries = (1989,2014)
satellite_first_year = 1989
satellite_last_year = 2014
model_timeseries = (1980,2016)
model_first_year = 1989
model_last_year = 2014
modToBeMasked = true
satToBeMasked = true
mod_mask_path = hdfs:///user/hadoop...
Out[83]:
hdfs:///user/hadoop/usa_mask_gridmet.tif

DON'T MODIFY ANY PIECE OF CODE FROM HERE ON!!!

Mode of operation validation


In [84]:
//Check offline modes
var conf = sc.hadoopConfiguration
var fs = org.apache.hadoop.fs.FileSystem.get(conf)

//Paths to store data structures for Offline runs
var mod_mask_str = ""
var sat_mask_str = ""
if (modToBeMasked)
  mod_mask_str = "_mask"
if (satToBeMasked)
  sat_mask_str = "_mask"

val matrix_mode_str = matrix_mode match {
  case 1 => "_Sc_Mc"
  case 2 => "_Sr_Mr"
  case _ => ""
}

var model_grid0_path = out_path + model_dir + "_grid0"
var model_grid0_index_path = out_path + model_dir + "_grid0_index"
var satellite_grid0_path = out_path + satellite_dir + "_grid0"
var satellite_grid0_index_path = out_path + satellite_dir + "_grid0_index"

var model_grid_path = out_path + model_dir + "_grid"
var satellite_grid_path = out_path + satellite_dir + "_grid"
var model_matrix_path = out_path + model_dir + "_matrix"
var satellite_matrix_path = out_path + satellite_dir + "_matrix"
var metadata_path = out_path + model_dir + "_metadata"


conf = Configuration: core-default.xml, core-site.xml, mapred-default.xml, mapred-site.xml, yarn-default.xml, yarn-site.xml, hdfs-default.xml, hdfs-site.xml, file:/usr/lib/spark-2.1.1-bin-without-hadoop/conf/hive-site.xml
fs = DFS[DFSClient[clientName=DFSClient_NONMAPREDUCE_-439298823_41, ugi=pheno (auth:SIMPLE)]]
mod_mask_str = _mask
sat_mask_str = _mask
model_grid0_path = hdfs:///user/emma/svd/spark/BloomGridmetSOST4Km10/BloomGridmet_grid0
model_grid0_index_path = hdfs:///user/emma/svd/spark/BloomGridmetSOST4Km10/BloomGridmet_grid0_index
satellite_grid0_path = hdfs:///user/emma/svd/spark/BloomGridmetSOST4Km10/SOST4Km_grid0
matrix_mode_str: String = ""
satellite_grid0_index_pa...
Out[84]:
hdfs:///user/emma/svd/spark/BloomGridmetSOST4Km10/SOST4Km_grid0

In [85]:
var sc_path = out_path + model_dir + "_sc"
var mc_path = out_path + model_dir + "_mc"

val model_rdd_offline_exists = fs.exists(new org.apache.hadoop.fs.Path(model_grid_path))
val model_matrix_offline_exists = fs.exists(new org.apache.hadoop.fs.Path(model_matrix_path))
val satellite_rdd_offline_exists = fs.exists(new org.apache.hadoop.fs.Path(satellite_grid_path))
val satellite_matrix_offline_exists = fs.exists(new org.apache.hadoop.fs.Path(satellite_matrix_path))

if (model_rdd_offline_mode != model_rdd_offline_exists) {
  println("\"Load GeoTiffs\" offline mode is not set properly, i.e., either it was set to false and the required file does not exist or vice-versa. We will reset it to " + model_rdd_offline_exists.toString())
  model_rdd_offline_mode = model_rdd_offline_exists
}

if (model_matrix_offline_mode != model_matrix_offline_exists) {
  println("\"Matrix\" offline mode is not set properly, i.e., either it was set to false and the required file does not exist or vice-versa. We will reset it to " + model_matrix_offline_exists.toString())
  model_matrix_offline_mode = model_matrix_offline_exists
}

var model_skip_rdd = false
if (model_matrix_offline_exists) {
    println("Since we have a matrix, the load of the grids RDD will be skipped!!!")
    model_skip_rdd = true
}

if (satellite_rdd_offline_mode != satellite_rdd_offline_exists) {
  println("\"Load GeoTiffs\" offline mode is not set properly, i.e., either it was set to false and the required file does not exist or vice-versa. We will reset it to " + satellite_rdd_offline_exists.toString())
  satellite_rdd_offline_mode = satellite_rdd_offline_exists
}

if (satellite_matrix_offline_mode != satellite_matrix_offline_exists) {
  println("\"Matrix\" offline mode is not set properly, i.e., either it was set to false and the required file does not exist or vice-versa. We will reset it to " + satellite_matrix_offline_exists.toString())
  satellite_matrix_offline_mode = satellite_matrix_offline_exists
}

var satellite_skip_rdd = false
if (satellite_matrix_offline_exists) {
    println("Since we have a matrix, the load of the grids RDD will be skipped!!!")
    satellite_skip_rdd = true
}

//Years
val model_years = model_timeseries._1 to model_timeseries._2
val satellite_years = satellite_timeseries._1 to satellite_timeseries._2

if (!satellite_years.contains(satellite_first_year) || !(satellite_years.contains(satellite_last_year))) {
  println("Invalid range of years for " + satellite_dir + ". It should be between " + satellite_timeseries._1 + " and " + satellite_timeseries._2)
  System.exit(0)
}

if (!model_years.contains(model_first_year) || !(model_years.contains(model_last_year))) {
  println("Invalid range of years for " + model_dir + ". It should be between " + model_timeseries._1 + " and " + model_timeseries._2)
  System.exit(0)
}

if ((satellite_last_year - satellite_first_year) != (model_last_year - model_first_year)) {
  println("The range of years for each data set should be of the same length.")
  System.exit(0)
}

var model_years_range = (model_years.indexOf(model_first_year), model_years.indexOf(model_last_year))
var satellite_years_range = (satellite_years.indexOf(satellite_first_year), satellite_years.indexOf(satellite_last_year))

//Global variables
var projected_extent = new ProjectedExtent(new Extent(0,0,0,0), CRS.fromName("EPSG:3857"))
var model_grid0: RDD[(Long, Double)] = sc.emptyRDD
var model_grid0_index: RDD[Long] = sc.emptyRDD
var mod_grids_RDD: RDD[(Int, Array[Double])] = sc.emptyRDD
var model_grids_RDD: RDD[(Int, Array[Double])] = sc.emptyRDD
var model_grids: RDD[(Int,Array[Double])] = sc.emptyRDD
val rows = sc.parallelize(Array[Double]()).map(m => Vectors.dense(m))
var model_summary: MultivariateStatisticalSummary = new RowMatrix(rows).computeColumnSummaryStatistics()
var model_std :Array[Double] = new Array[Double](0)

var satellite_grid0: RDD[(Long, Double)] = sc.emptyRDD
var satellite_grid0_index: RDD[Long] = sc.emptyRDD
var sat_grids_RDD: RDD[(Int, Array[Double])] = sc.emptyRDD
var satellite_grids_RDD: RDD[(Int, Array[Double])] = sc.emptyRDD
var satellite_grids: RDD[(Int,Array[Double])] = sc.emptyRDD
var satellite_summary: MultivariateStatisticalSummary = new RowMatrix(rows).computeColumnSummaryStatistics()
var satellite_std :Array[Double] = new Array[Double](0)

var num_cols_rows :(Int, Int) = (0, 0)
var cellT :CellType = UByteCellType
var mod_mask_tile0 :Tile = new SinglebandGeoTiff(geotrellis.raster.ArrayTile.empty(cellT, num_cols_rows._1, num_cols_rows._2), projected_extent.extent, projected_extent.crs, Tags.empty, GeoTiffOptions.DEFAULT).tile
var sat_mask_tile0 :Tile = new SinglebandGeoTiff(geotrellis.raster.ArrayTile.empty(cellT, num_cols_rows._1, num_cols_rows._2), projected_extent.extent, projected_extent.crs, Tags.empty, GeoTiffOptions.DEFAULT).tile
var satellite_cells_size :Long = 0
var model_cells_size :Long = 0
var t0 : Long = 0
var t1 : Long = 0
print(t0)


"Load GeoTiffs" offline mode is not set properly, i.e., either it was set to false and the required file does not exist or vice-versa. We will reset it to false
"Matrix" offline mode is not set properly, i.e., either it was set to false and the required file does not exist or vice-versa. We will reset it to false
"Load GeoTiffs" offline mode is not set properly, i.e., either it was set to false and the required file does not exist or vice-versa. We will reset it to false
"Matrix" offline mode is not set properly, i.e., either it was set to false and the required file does not exist or vice-versa. We will reset it to false
0                                                                               
sc_path = hdfs:///user/emma/svd/spark/BloomGridmetSOST4Km10/BloomGridmet_sc
mc_path = hdfs:///user/emma/svd/spark/BloomGridmetSOST4Km10/BloomGridmet_mc
model_rdd_offline_exists = false
model_matrix_offline_exists = false
satellite_rdd_offline_exists = false
satellite_matrix_offline_exists = false
model_skip_rdd = false
satellite_skip_rdd = false
model_years = Range(1980, 1981, 1982, 1983, 1984, 1985, 1986, 1987, 1988, 1989, 1990, 1991, 1992, 1993, 1994, 1995, 1996, 1997, 1998, 1999, 2000, 2001, 2002, 2003, 2004, 2005, 2006, 2007, 2008, 2009, 2010, 2011, 2012, 2013, 2014, 2015, 2016)
satellite_years = Range(1989, 1990, 1991, 1992, 1...
Out[85]:
Range(1989, 1990, 1991, 1992, 1993, 1994, 1995, 1996, 1997, 1998, 1999, 2000, 2001, 2002, 2003, 2004, 2005, 2006, 2007, 2008, 2009, 2010, 2011, 2012, 2013, 2014)

Functions to (de)serialize any structure into Array[Byte]


In [86]:
def serialize(value: Any): Array[Byte] = {
    val out_stream: ByteArrayOutputStream = new ByteArrayOutputStream()
    val obj_out_stream = new ObjectOutputStream(out_stream)
    obj_out_stream.writeObject(value)
    obj_out_stream.close
    out_stream.toByteArray
}

def deserialize(bytes: Array[Byte]): Any = {
    val obj_in_stream = new ObjectInputStream(new ByteArrayInputStream(bytes))
    val value = obj_in_stream.readObject
    obj_in_stream.close
    value
}


serialize: (value: Any)Array[Byte]
deserialize: (bytes: Array[Byte])Any
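As a quick usage example, a roundtrip with a ProjectedExtent like the one written to the metadata SequenceFile further below (placeholder extent and CRS; imports come from the Dependencies cell above):

val pe = ProjectedExtent(new Extent(0, 0, 1, 1), CRS.fromName("EPSG:3857"))
val bytes: Array[Byte] = serialize(pe)
val back = deserialize(bytes).asInstanceOf[ProjectedExtent]
assert(back.extent == pe.extent)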

Load GeoTiffs

Using GeoTrellis, all GeoTiffs in a directory are loaded into an RDD. From that RDD we extract the grid of the first file (used later to write the SVD components back as GeoTiffs), build an index to populate such a grid, and filter out all NaN values. The loading pattern is sketched below, before the full cells.
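A minimal sketch of that loading pattern, assuming a running SparkContext sc and a hypothetical HDFS directory of single-band GeoTiffs:

import geotrellis.raster.Tile
import geotrellis.spark.io.hadoop._
import geotrellis.vector.ProjectedExtent
import org.apache.spark.rdd.RDD

// Hypothetical path; the real paths are built from model_path/model_dir and satellite_path/satellite_dir.
val path = "hdfs:///some/dir/of/geotiffs"
val tiles: RDD[(ProjectedExtent, Tile)] = sc.hadoopGeoTiffRDD(path)

// One Double array per GeoTiff, with NaN cells dropped.
val grids: RDD[Array[Double]] = tiles.values.map(_.toArrayDouble().filter(!_.isNaN))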


In [87]:
t0 = System.nanoTime()

//Load Mask
if (modToBeMasked) {
  val mask_tiles_RDD = sc.hadoopGeoTiffRDD(mod_mask_path).values
  val mask_tiles_withIndex = mask_tiles_RDD.zipWithIndex().map { case (e, v) => (v, e) }
  mod_mask_tile0 = (mask_tiles_withIndex.filter(m => m._1 == 0).filter(m => !m._1.isNaN).values.collect()) (0)
}

if (satToBeMasked) {
  val mask_tiles_RDD = sc.hadoopGeoTiffRDD(sat_mask_path).values
  val mask_tiles_withIndex = mask_tiles_RDD.zipWithIndex().map { case (e, v) => (v, e) }
  sat_mask_tile0 = (mask_tiles_withIndex.filter(m => m._1 == 0).filter(m => !m._1.isNaN).values.collect()) (0)
}

//Local variables
val pattern: String = "*.tif"
val satellite_filepath: String = satellite_path + satellite_dir
val model_filepath: String = model_path + "/" + model_dir

t1 = System.nanoTime()
println("Elapsed time: " + (t1 - t0) + "ns")


Elapsed time: 972591304ns
t0 = 258689933695244
pattern = *.tif
satellite_filepath = hdfs:///user/hadoop/avhrr/SOST4Km
model_filepath = hdfs:///user/hadoop/spring-index//BloomGridmet
t1 = 258690906286548
Out[87]:
258690906286548

In [88]:
def hadoopGeoTiffRDD(satellite_filepath :String, pattern :String): RDD[(Int, (ProjectedExtent, Tile))] = {
  val listFiles = sc.binaryFiles(satellite_filepath + "/" + pattern).sortBy(_._1).keys.collect()
  var prevRDD :RDD[(Int, (ProjectedExtent, Tile))] = sc.emptyRDD

  cfor(0)(_ < listFiles.length, _ + 1) { k =>
    val filePath :String = listFiles(k)
    val kB = sc.broadcast(k)
    val currRDD = sc.hadoopGeoTiffRDD(filePath).map(m => (kB.value, m))
    if (prevRDD.isEmpty()){
      prevRDD = currRDD
    } else {
      prevRDD = currRDD.union(prevRDD)
    }
    //kB.destroy()
  }
  prevRDD.sortBy(_._1)
}


hadoopGeoTiffRDD: (satellite_filepath: String, pattern: String)org.apache.spark.rdd.RDD[(Int, (geotrellis.vector.ProjectedExtent, geotrellis.raster.Tile))]

In [89]:
def hadoopMultibandGeoTiffRDD(satellite_filepath :String, pattern :String): RDD[(Int, (ProjectedExtent, MultibandTile))] = {
  val listFiles = sc.binaryFiles(satellite_filepath + "/" + pattern).sortBy(_._1).keys.collect()
  var prevRDD :RDD[(Int,(ProjectedExtent, MultibandTile))] = sc.emptyRDD

  cfor(0)(_ < listFiles.length, _ + 1) { k =>
    val filePath :String = listFiles(k)
    val kB = sc.broadcast(k)
    val currRDD = sc.hadoopMultibandGeoTiffRDD(filePath).map(m => (kB.value,m))
    if (prevRDD.isEmpty()){
      prevRDD = currRDD
    } else {
      prevRDD = currRDD.union(prevRDD)
    }
    //kB.destroy()
  }
  prevRDD.sortBy(_._1)
}


hadoopMultibandGeoTiffRDD: (satellite_filepath: String, pattern: String)org.apache.spark.rdd.RDD[(Int, (geotrellis.vector.ProjectedExtent, geotrellis.raster.MultibandTile))]

Satellite data


In [90]:
t0 = System.nanoTime()
if (satellite_rdd_offline_mode) {
  satellite_grids_RDD = sc.objectFile(satellite_grid_path)
  satellite_grid0 = sc.objectFile(satellite_grid0_path)
  satellite_grid0_index = sc.objectFile(satellite_grid0_index_path)
} else {
  //Let's load the satellite single-band GeoTiffs and return an RDD with just the tiles.
  if (sat_band_num == 0) {
    val satellite_geos_RDD = hadoopGeoTiffRDD(satellite_filepath, pattern)
    satellite_geos_RDD.cache()
    val satellite_tiles_RDD = satellite_geos_RDD.map{ case (i, (p,t)) => (i,t)}

    if (satToBeMasked) {
      val mask_tile_broad: Broadcast[Tile] = sc.broadcast(sat_mask_tile0)
      sat_grids_RDD = satellite_tiles_RDD.map{ case (i,t) => (i, t.localInverseMask(mask_tile_broad.value, 1, -2000).toArrayDouble().map(t => if (t == -1000) Double.NaN else t))}
    } else {
      sat_grids_RDD = satellite_tiles_RDD.map{ case (i,t) => (i,t.toArrayDouble())}
    }
  } else {
    val satellite_geos_RDD = hadoopMultibandGeoTiffRDD(satellite_filepath, pattern)
    satellite_geos_RDD.cache()
    val satellite_tiles_RDD = satellite_geos_RDD.map{ case (i, (p,t)) => (i,t)}

    val band_numB: Broadcast[Int] = sc.broadcast(sat_band_num)
    if (satToBeMasked) {
      val mask_tile_broad: Broadcast[Tile] = sc.broadcast(sat_mask_tile0)
      sat_grids_RDD = satellite_tiles_RDD.map{ case (i,t) => (i, t.band(band_numB.value).localInverseMask(mask_tile_broad.value, 1, -2000).toArrayDouble().map(t => if (t == -1000) Double.NaN else t))}
    } else {
      sat_grids_RDD = satellite_tiles_RDD.map{ case (i,t) => (i, t.band(band_numB.value).toArrayDouble())}
    }
  }

  //Get Index for each Cell
  val grids_withIndex = sat_grids_RDD//.zipWithIndex().map { case (e, v) => (v, e) }
  if (satToBeMasked) {
    satellite_grid0_index = grids_withIndex.filter(m => m._1 == 0).values.flatMap(m => m).zipWithIndex.filter(m => m._1 != -2000.0).map { case (v, i) => (i) }
  } else {
    satellite_grid0_index = grids_withIndex.filter(m => m._1 == 0).values.flatMap(m => m).zipWithIndex.map { case (v, i) => (i) }
  }

  //Get the Tile's grid
  satellite_grid0 = grids_withIndex.filter(m => m._1 == 0).values.flatMap(m => m).zipWithIndex.map { case (v, i) => (i, v) }

  //Lets filter out NaN
  if (satToBeMasked) {
    satellite_grids_RDD = sat_grids_RDD.map{ case (i,m) => (i,m.filter(m => m != -2000.0))}
  } else {
    satellite_grids_RDD = sat_grids_RDD
  }

  //Store in HDFS
  if (save_rdds) {
    satellite_grids_RDD.saveAsObjectFile(satellite_grid_path)
    satellite_grid0.saveAsObjectFile(satellite_grid0_path)
    satellite_grid0_index.saveAsObjectFile(satellite_grid0_index_path)
  }
}
val satellite_grids_withIndex = satellite_grids_RDD//.zipWithIndex().map { case (e, v) => (v, e) }

//Filter out the range of years:
val sat_year_diff = satellite_first_year-satellite_timeseries._1
val sat_year_diffB = sc.broadcast(sat_year_diff)
satellite_grids = satellite_grids_withIndex.filterByRange(satellite_years_range._1, satellite_years_range._2).map{ case(i,a) => (i-(sat_year_diffB.value),a)}
satellite_grids.persist(StorageLevel.DISK_ONLY)

//Collect Stats:
satellite_summary = Statistics.colStats(satellite_grids.map{ case (i,m) => Vectors.dense(m)})
//satellite_std = satellite_summary.variance.toArray.map(m => scala.math.sqrt(m))

satellite_cells_size = satellite_grid0_index.count().toInt
println("Number of cells is: " + satellite_cells_size)
t1 = System.nanoTime()
println("Elapsed time: " + (t1 - t0) + "ns")


Number of cells is: 483850
Elapsed time: 16137347729ns
t0 = 258695986822562
satellite_grids_withIndex = MapPartitionsRDD[2358] at map at <console>:236
sat_year_diff = 0
sat_year_diffB = Broadcast(1415)
satellite_grids = MapPartitionsRDD[2360] at map at <console>:253
satellite_summary = org.apache.spark.mllib.stat.MultivariateOnlineSummarizer@7cb8aa66
satellite_cells_size = 483850
t1 = 258712124170291
Out[90]:
258712124170291

Model data


In [91]:
t0 = System.nanoTime()

if (model_rdd_offline_mode) {
  model_grids_RDD = sc.objectFile(model_grid_path)
  model_grid0 = sc.objectFile(model_grid0_path)
  model_grid0_index = sc.objectFile(model_grid0_index_path)
  val metadata = sc.sequenceFile(metadata_path, classOf[IntWritable], classOf[BytesWritable]).map(_._2.copyBytes()).collect()
  projected_extent = deserialize(metadata(0)).asInstanceOf[ProjectedExtent]
  num_cols_rows = (deserialize(metadata(1)).asInstanceOf[Int], deserialize(metadata(2)).asInstanceOf[Int])
  cellT = deserialize(metadata(3)).asInstanceOf[CellType]
} else {
  if (model_band_num == 0) {
    val model_geos_RDD = hadoopGeoTiffRDD(model_filepath, pattern)
    model_geos_RDD.cache()
    val model_tiles_RDD = model_geos_RDD.map{case (i,(p,t)) => (i,t)}//.values

    //Retrieve the number of cols and rows of the Tile's grid
    val tiles_withIndex = model_tiles_RDD//.zipWithIndex().map { case (v, i) => (i, v) }
    val tile0 = (tiles_withIndex.filter(m => m._1 == 0).values.collect()) (0)

    num_cols_rows = (tile0.cols, tile0.rows)
    cellT = tile0.cellType

    //Retrieve the ProjectedExtent which contains metadata such as CRS and bounding box
    val projected_extents_withIndex = model_geos_RDD.map{ case (i,(p,t)) => (i,p)}//.keys.zipWithIndex().map { case (e, v) => (v, e) }
    projected_extent = (projected_extents_withIndex.filter(m => m._1 == 0).values.collect()) (0)

    if (modToBeMasked) {
      val mask_tile_broad: Broadcast[Tile] = sc.broadcast(mod_mask_tile0)
      mod_grids_RDD = model_tiles_RDD.map{case (i,m) => (i, m.localInverseMask(mask_tile_broad.value, 1, -2000).toArrayDouble().map(m => if (m == -1000) Double.NaN else m))}
    } else {
      mod_grids_RDD = model_tiles_RDD.map{case (i,m) => (i,m.toArrayDouble())}
    }
  } else {
    val model_geos_RDD = hadoopMultibandGeoTiffRDD(model_filepath, pattern)
    model_geos_RDD.cache()
    val model_tiles_RDD = model_geos_RDD.map{case (i,(p,t)) => (i,t)}//.values

    //Retrieve the number of cols and rows of the Tile's grid
    val tiles_withIndex = model_tiles_RDD//.zipWithIndex().map { case (v, i) => (i, v) }
    val tile0 = (tiles_withIndex.filter(m => m._1 == 0).values.collect()) (0)

    num_cols_rows = (tile0.cols, tile0.rows)
    cellT = tile0.cellType

    //Retrieve the ProjectedExtent which contains metadata such as CRS and bounding box
    val projected_extents_withIndex = model_geos_RDD.map{ case (i,(p,t)) => (i,p)}//.keys.zipWithIndex().map { case (e, v) => (v, e) }
    projected_extent = (projected_extents_withIndex.filter(m => m._1 == 0).values.collect()) (0)

    val band_numB: Broadcast[Int] = sc.broadcast(model_band_num)
    if (modToBeMasked) {
      val mask_tile_broad: Broadcast[Tile] = sc.broadcast(mod_mask_tile0)
      mod_grids_RDD = model_tiles_RDD.map{case (i,m) => (i,m.band(band_numB.value).localInverseMask(mask_tile_broad.value, 1, -2000).toArrayDouble().map(m => if (m == -1000) Double.NaN else m))}
    } else {
      mod_grids_RDD = model_tiles_RDD.map{case (i,m) => (i,m.band(band_numB.value).toArrayDouble())}
    }
  }

  //Get Index for each Cell
  val grids_withIndex = mod_grids_RDD//.zipWithIndex().map { case (e, v) => (v, e) }
  if (modToBeMasked) {
    model_grid0_index = grids_withIndex.filter(m => m._1 == 0).values.flatMap(m => m).zipWithIndex.filter(m => m._1 != -2000.0).map { case (v, i) => (i) }
  } else {
    model_grid0_index = grids_withIndex.filter(m => m._1 == 0).values.flatMap(m => m).zipWithIndex.map { case (v, i) => (i) }
  }

  //Get the Tile's grid
  model_grid0 = grids_withIndex.filter(m => m._1 == 0).values.flatMap( m => m).zipWithIndex.map{case (v,i) => (i,v)}

  //Lets filter out NaN
  if (modToBeMasked) {
    model_grids_RDD = mod_grids_RDD.map{case (i,m) => (i, m.filter(m => m != -2000.0))}
  } else {
    model_grids_RDD = mod_grids_RDD
  }
  //Store data in HDFS
  if (save_rdds) {
      model_grids_RDD.saveAsObjectFile(model_grid_path)
      model_grid0.saveAsObjectFile(model_grid0_path)
      model_grid0_index.saveAsObjectFile(model_grid0_index_path)

      val writer: SequenceFile.Writer = SequenceFile.createWriter(conf,
        Writer.file(metadata_path),
        Writer.keyClass(classOf[IntWritable]),
        Writer.valueClass(classOf[BytesWritable])
      )

      writer.append(new IntWritable(1), new BytesWritable(serialize(projected_extent)))
      writer.append(new IntWritable(2), new BytesWritable(serialize(num_cols_rows._1)))
      writer.append(new IntWritable(3), new BytesWritable(serialize(num_cols_rows._2)))
      writer.append(new IntWritable(4), new BytesWritable(serialize(cellT)))
      writer.hflush()
      writer.close()
  }
}

val model_grids_withIndex = model_grids_RDD//.zipWithIndex().map { case (e, v) => (v, e) }

//Filter out the range of years:
val mod_year_diff = model_first_year-model_timeseries._1
val mod_year_diffB = sc.broadcast(mod_year_diff)
model_grids = model_grids_withIndex.filterByRange(model_years_range._1, model_years_range._2).map{ case(i,a) => (i-(mod_year_diffB.value),a)}//.values
model_grids.persist(StorageLevel.DISK_ONLY)

//Collect Stats:
model_summary = Statistics.colStats(model_grids.map{ case (i,m) => Vectors.dense(m)})
//model_std = model_summary.variance.toArray.map(m => scala.math.sqrt(m))

var model_tile0_index: RDD[Double] = model_grids_withIndex.filter(m => m._1 == 0).values.flatMap(m => m)
model_cells_size = model_tile0_index.count().toInt

t1 = System.nanoTime()
println("Elapsed time: " + (t1 - t0) + "ns")


Elapsed time: 468185405592ns                                                    
t0 = 258718538708937
model_grids_withIndex = MapPartitionsRDD[2544] at map at <console>:270
mod_year_diff = 9
mod_year_diffB = Broadcast(1541)
model_grids = MapPartitionsRDD[2546] at map at <console>:300
model_summary = org.apache.spark.mllib.stat.MultivariateOnlineSummarizer@62c4ca6b
model_tile0_index = MapPartitionsRDD[2554] at flatMap at <console>:307
model_cells_size = 483850
t1 = 259186724114529
Out[91]:
259186724114529

Matrices

Satellite


In [92]:
//Satellite
val satellite_cells_sizeB = sc.broadcast(satellite_cells_size)
val satellite_indRowMat :IndexedRowMatrix = new IndexedRowMatrix(satellite_grids.map{ case (i, m) => (i,m.zipWithIndex)}.map{ case (i,m) => (i,m.filter(!_._1.isNaN))}.map{ case (i,m) =>  new IndexedRow(i.toLong, Vectors.sparse(satellite_cells_sizeB.value.toInt, m.map(v => v._2), m.map(v => v._1)))})


satellite_cells_sizeB = Broadcast(1545)
satellite_indRowMat = org.apache.spark.mllib.linalg.distributed.IndexedRowMatrix@57ceacdf
Out[92]:
org.apache.spark.mllib.linalg.distributed.IndexedRowMatrix@57ceacdf

SC


In [93]:
//SC
val sc_exists = fs.exists(new org.apache.hadoop.fs.Path(sc_path))
var Sc :BlockMatrix = null
if (matrix_mode == 1) {
    if (sc_exists) {
      val rdd_indexed_rows :RDD[IndexedRow]= sc.objectFile(sc_path)
      Sc = new IndexedRowMatrix(rdd_indexed_rows).toBlockMatrix()
    } else {
      val satellite_M_1_Gc = sc.parallelize(Array[Vector](satellite_summary.mean)).map(m => Vectors.dense(m.toArray))
      val satellite_M_1_Gc_RowM: RowMatrix = new RowMatrix(satellite_M_1_Gc)
      val sat_M_1_Gc_byColumnAndRow = satellite_M_1_Gc_RowM.rows.zipWithIndex.map {
        case (row, rowIndex) => row.toArray.zipWithIndex.map {
          case (number, columnIndex) => new MatrixEntry(rowIndex, columnIndex, number)
        }
      }.flatMap(x => x)
      val satellite_M_1_Gc_blockMatrix = new CoordinateMatrix(sat_M_1_Gc_byColumnAndRow).toBlockMatrix()

      val sat_matrix_Nt_1 = new Array[Double](satellite_grids.count().toInt)
      //satellite_grids.unpersist(false)
      for (i <- 0 until sat_matrix_Nt_1.length)
        sat_matrix_Nt_1(i) = 1
      val satellite_M_Nt_1 = sc.parallelize(sat_matrix_Nt_1).map(m => Vectors.dense(m))
      val satellite_M_Nt_1_RowM: RowMatrix = new RowMatrix(satellite_M_Nt_1)
      val sat_M_Nt_1_byColumnAndRow = satellite_M_Nt_1_RowM.rows.zipWithIndex.map {
        case (row, rowIndex) => row.toArray.zipWithIndex.map {
          case (number, columnIndex) => new MatrixEntry(rowIndex, columnIndex, number)
        }
      }.flatMap(x => x)
      val satellite_M_Nt_1_blockMatrix = new CoordinateMatrix(sat_M_Nt_1_byColumnAndRow).toBlockMatrix()
      val satellite_M_Nt_Gc_blockMatrix = satellite_M_Nt_1_blockMatrix.multiply(satellite_M_1_Gc_blockMatrix)

      //Sc = satellite_blockMatrix.subtract(satellite_M_Nt_Gc_blockMatrix)
      val joined_mat :RDD[ (Long, (Array[Double], Array[Double]))] = satellite_indRowMat.rows.map(m => (m.index, m.vector.toArray)).join(satellite_M_Nt_Gc_blockMatrix.toCoordinateMatrix().toIndexedRowMatrix().rows.map(m => (m.index, m.vector.toArray)))
      Sc = (new CoordinateMatrix(joined_mat.map {case (row_index, (a,b)) => a.zip(b).map(m => m._1-m._2).zipWithIndex.map{ case (v,col_index) => new MatrixEntry(row_index, col_index,v)}}.flatMap(m => m))).toBlockMatrix() 

      //save to disk
      Sc.toIndexedRowMatrix().rows.saveAsObjectFile(sc_path)
    }
}


sc_exists = false
Sc: org.apache.spark.mllib.linalg.distributed.BlockMatrix = null
Out[93]:
false

Model


In [94]:
//Model
val model_cells_sizeB = sc.broadcast(model_cells_size)
val model_indRowMat :IndexedRowMatrix = new IndexedRowMatrix(model_grids.map{ case (i, m) => (i,m.zipWithIndex)}.map{ case (i,m) => (i,m.filter(!_._1.isNaN))}.map{ case (i,m) =>  new IndexedRow(i.toLong, Vectors.sparse(model_cells_sizeB.value.toInt, m.map(v => v._2), m.map(v => v._1)))}).toCoordinateMatrix().transpose().toIndexedRowMatrix()


model_cells_sizeB = Broadcast(1546)
model_indRowMat = org.apache.spark.mllib.linalg.distributed.IndexedRowMatrix@74e50c8e
Out[94]:
org.apache.spark.mllib.linalg.distributed.IndexedRowMatrix@74e50c8e

Mc


In [95]:
//MC
val mc_exists = fs.exists(new org.apache.hadoop.fs.Path(mc_path))
var Mc :BlockMatrix = null
if (matrix_mode == 1) {
    if (mc_exists) {
      val rdd_indexed_rows :RDD[IndexedRow]= sc.objectFile(mc_path)
      Mc = new IndexedRowMatrix(rdd_indexed_rows).toBlockMatrix()
    } else {
      val model_M_1_Gc = sc.parallelize(Array[Vector](model_summary.mean)).map(m => Vectors.dense(m.toArray))
      val model_M_1_Gc_RowM: RowMatrix = new RowMatrix(model_M_1_Gc)
      val mod_M_1_Gc_byColumnAndRow = model_M_1_Gc_RowM.rows.zipWithIndex.map {
        case (row, rowIndex) => row.toArray.zipWithIndex.map {
          case (number, columnIndex) => new MatrixEntry(rowIndex, columnIndex, number)
        }
      }.flatMap(x => x)
      val model_M_1_Gc_blockMatrix = new CoordinateMatrix(mod_M_1_Gc_byColumnAndRow).toBlockMatrix()

      val model_matrix_Nt_1 = new Array[Double](model_grids.count().toInt)
      //model_grids.unpersist(false)

      for (i <- 0 until model_matrix_Nt_1.length)
        model_matrix_Nt_1(i) = 1
      val model_M_Nt_1 = sc.parallelize(model_matrix_Nt_1).map(m => Vectors.dense(m))
      val model_M_Nt_1_RowM: RowMatrix = new RowMatrix(model_M_Nt_1)
      val mod_M_Nt_1_byColumnAndRow = model_M_Nt_1_RowM.rows.zipWithIndex.map {
        case (row, rowIndex) => row.toArray.zipWithIndex.map {
          case (number, columnIndex) => new MatrixEntry(rowIndex, columnIndex, number)
        }
      }.flatMap(x => x)
      val model_M_Nt_1_blockMatrix = new CoordinateMatrix(mod_M_Nt_1_byColumnAndRow).toBlockMatrix()
      val model_M_Nt_Gc_blockMatrix = model_M_Nt_1_blockMatrix.multiply(model_M_1_Gc_blockMatrix)
      val model_M_Gc_Nt_blockMatrix = model_M_Nt_Gc_blockMatrix.transpose

      //Mc = model_blockMatrix.subtract(model_M_Gc_Nt_blockMatrix)
      val joined_mat :RDD[ (Long, (Array[Double], Array[Double]))] = model_indRowMat.rows.map( m => (m.index, m.vector.toArray)).join(model_M_Gc_Nt_blockMatrix.toIndexedRowMatrix().rows.map(m => (m.index, m.vector.toArray)))
      Mc = (new CoordinateMatrix(joined_mat.map {case (row_index, (a,b)) => a.zip(b).map(m => m._1-m._2).zipWithIndex.map{ case (v,col_index) => new MatrixEntry(row_index, col_index,v)}}.flatMap(m => m))).toBlockMatrix()

      //save to disk
      Mc.toIndexedRowMatrix().rows.saveAsObjectFile(mc_path)
    }
}


mc_exists = false
Mc: org.apache.spark.mllib.linalg.distributed.BlockMatrix = null
Out[95]:
false

Sr


In [96]:
//SR
var Sr :BlockMatrix = null


Sr: org.apache.spark.mllib.linalg.distributed.BlockMatrix = null

Mr


In [97]:
//MR
var Mr :BlockMatrix = null


Mr: org.apache.spark.mllib.linalg.distributed.BlockMatrix = null

Matrix Multiplication


In [98]:
//Matrix Multiplication
var matrix_mul :IndexedRowMatrix = null

//Normal Matrix
if (matrix_mode == 0) {
  //model_blockMatrix.persist(StorageLevel.DISK_ONLY)
  //satellite_blockMatrix.persist(StorageLevel.DISK_ONLY)
  //matrix_mul = model_blockMatrix.toIndexedRowMatrix().multiply(satellite_blockMatrix.toLocalMatrix())
  matrix_mul = model_indRowMat.multiply(satellite_indRowMat.toBlockMatrix().toLocalMatrix())
  //n_components = List(model_indRowMat.numRows(), model_indRowMat.numCols(), satellite_indRowMat.numRows(), satellite_indRowMat.numCols()).min
}

//SC Matrix
if (matrix_mode == 1) {
  Mc.persist(StorageLevel.DISK_ONLY)
  Sc.persist(StorageLevel.DISK_ONLY)
  matrix_mul = Mc.toIndexedRowMatrix().multiply(Sc.toLocalMatrix())
  //n_components = List(Mc.numRows(), Mc.numCols(), Sc.numRows(), Sc.numCols()).min
}

//SR Matrix
if (matrix_mode == 2) {
  Mr.persist(StorageLevel.DISK_ONLY)
  Sr.persist(StorageLevel.DISK_ONLY)
  matrix_mul = Mr.toIndexedRowMatrix().multiply(Sr.toLocalMatrix())
  //n_components = List(Mr.numRows(), Mr.numCols(), Sr.numRows(), Sr.numCols()).min
}


matrix_mul = org.apache.spark.mllib.linalg.distributed.IndexedRowMatrix@53650bc0
Out[98]:
org.apache.spark.mllib.linalg.distributed.IndexedRowMatrix@53650bc0

Paths to save GeoTiffs


In [99]:
//Paths to save GeoTiffs
var u_geotiff_hdfs_paths :Array[String] = Array.fill[String](n_components.toInt)("")
var u_geotiff_tmp_paths :Array[String] = Array.fill[String](n_components.toInt)("")
var v_geotiff_hdfs_paths :Array[String] = Array.fill[String](n_components.toInt)("")
var v_geotiff_tmp_paths :Array[String] = Array.fill[String](n_components.toInt)("")

//Create dirs in HDFS
var cmd = "hadoop dfs -mkdir -p " + out_path + "u_tiffs/"
Process(cmd)!

cmd = "hadoop dfs -mkdir -p " + out_path + "v_tiffs/"
Process(cmd)!

cfor(0)(_ < n_components, _ + 1) { k =>
  u_geotiff_hdfs_paths(k) =  out_path + "u_tiffs/svd_u_" + k + "_" + n_components + matrix_mode_str + ".tif"
  u_geotiff_tmp_paths(k) = "/tmp/svd_u_" + k + "_" + n_components + matrix_mode_str + ".tif"
  if (fs.exists(new org.apache.hadoop.fs.Path(u_geotiff_hdfs_paths(k)))) {
    println("There is already a GeoTiff with the path: " + u_geotiff_hdfs_paths(k) + ". Please make either a copy or move it to another location, otherwise, it will be over-written.")
  }
    
  v_geotiff_hdfs_paths(k) =  out_path + "v_tiffs/svd_v_" + k + "_" + n_components + matrix_mode_str + ".tif"
  v_geotiff_tmp_paths(k) = "/tmp/svd_v_" + k + "_" + n_components + matrix_mode_str + ".tif"
  if (fs.exists(new org.apache.hadoop.fs.Path(v_geotiff_hdfs_paths(k)))) {
    println("There is already a GeoTiff with the path: " + v_geotiff_hdfs_paths(k) + ". Please make either a copy or move it to another location, otherwise, it will be over-written.")
  }
}


DEPRECATED: Use of this script to execute hdfs command is deprecated.
Instead use the hdfs command for it.

DEPRECATED: Use of this script to execute hdfs command is deprecated.
Instead use the hdfs command for it.

u_geotiff_hdfs_paths = Array(hdfs:///user/emma/svd/spark/BloomGridmetSOST4Km10/u_tiffs/svd_u_0_10.tif, hdfs:///user/emma/svd/spark/BloomGridmetSOST4Km10/u_tiffs/svd_u_1_10.tif, hdfs:///user/emma/svd/spark/BloomGridmetSOST4Km10/u_tiffs/svd_u_2_10.tif, hdfs:///user/emma/svd/spark/BloomGridmetSOST4Km10/u_tiffs/svd_u_3_10.tif, hdfs:///user/emma/svd/spark/BloomGridmetSOST4Km10/u_tiffs/svd_u_4_10.tif, hdfs:///user/emma/svd/spark/BloomGridmetSOST4Km10/u_tiffs/svd_u_5_10.tif, hdfs:///user/emma/svd/spark/BloomGridmetSOST4Km10/u_tiffs/svd_u_6_10.tif, hdfs:///user/emma/svd/spark/BloomGridmetSOST4Km10/u_tiffs/svd_u_7_10.tif, hdfs:///user/emma/svd/spark/BloomGridmetSOST4Km10/u_tiffs/svd_u_8_10.tif, hdfs:///user/emma/svd/spark/BloomGridmetSOST4Km10/u_tiffs/svd_u_9_10.tif)
warning: there were two feature warnings; re-run with -feature for details
u_geotiff_tmp...
Out[99]:
[hdfs:///user/emma/svd/spark/BloomGridmetSOST4Km10/u_tiffs/svd_u_0_10.tif, hdfs:///user/emma/svd/spark/BloomGridmetSOST4Km10/u_tiffs/svd_u_1_10.tif, hdfs:///user/emma/svd/spark/BloomGridmetSOST4Km10/u_tiffs/svd_u_2_10.tif, hdfs:///user/emma/svd/spark/BloomGridmetSOST4Km10/u_tiffs/svd_u_3_10.tif, hdfs:///user/emma/svd/spark/BloomGridmetSOST4Km10/u_tiffs/svd_u_4_10.tif, hdfs:///user/emma/svd/spark/BloomGridmetSOST4Km10/u_tiffs/svd_u_5_10.tif, hdfs:///user/emma/svd/spark/BloomGridmetSOST4Km10/u_tiffs/svd_u_6_10.tif, hdfs:///user/emma/svd/spark/BloomGridmetSOST4Km10/u_tiffs/svd_u_7_10.tif, hdfs:///user/emma/svd/spark/BloomGridmetSOST4Km10/u_tiffs/svd_u_8_10.tif, hdfs:///user/emma/svd/spark/BloomGridmetSOST4Km10/u_tiffs/svd_u_9_10.tif]

SVD


In [ ]:
val svd: SingularValueDecomposition[IndexedRowMatrix, Matrix] = matrix_mul.computeSVD(n_components.toInt, computeU = true)



In [ ]:
val U: IndexedRowMatrix = svd.U // The U factor is an IndexedRowMatrix (distributed).
val s: Vector = svd.s // The singular values are stored in a local dense vector.
val V: Matrix = svd.V // The V factor is a local dense matrix.
val S = Matrices.diag(s)
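As a sanity check, the rank-k approximation U·S·Vᵀ can be rebuilt and compared against matrix_mul; a sketch using only the values defined above (note that this materializes dense rows as wide as the satellite grid, so it is only practical for small inputs):

// Scale the left singular vectors by the singular values, then multiply by V transposed.
val US: IndexedRowMatrix = U.multiply(S)
val approx: IndexedRowMatrix = US.multiply(V.transpose)   // rank-k approximation of matrix_mul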

Save Matrices

U


In [ ]:
val U_matrix_path = out_path + "U_matrix"
val U_RDD = U.rows.sortBy(_.index).map(_.vector.toArray)
U_RDD.cache()
if (save_svd_matrices)
    U_RDD.saveAsObjectFile(U_matrix_path)

V


In [ ]:
val V_path = out_path + "V_matrix"
val V_RDD = sc.parallelize(V.rowIter.toVector)
if (save_svd_matrices)
    V_RDD.saveAsObjectFile(V_path)

S


In [ ]:
val S_path = out_path + "S_matrix"
val S_RDD = sc.parallelize(S.rowIter.toVector)
if (save_svd_matrices)
    S_RDD.saveAsObjectFile(S_path)

Save results

U


In [ ]:
val u_path = out_path + "U" + matrix_mode_str +".csv"
if (fs.exists(new org.apache.hadoop.fs.Path(u_path))) {
  cmd = "hadoop dfs -rm -r " + u_path
  Process(cmd)!
}
U_RDD.map(m => m.mkString(",")).saveAsTextFile(u_path)

V


In [ ]:
val v_path = out_path + "V" + matrix_mode_str +".csv"
if (fs.exists(new org.apache.hadoop.fs.Path(v_path))) {
  cmd = "hadoop dfs -rm -r " + v_path
  Process(cmd)!
}
sc.parallelize(V.rowIter.toVector.map(m => m.toArray.mkString(",")),4).saveAsTextFile(v_path)

S


In [ ]:
val s_path = out_path + "S" + matrix_mode_str +".csv"
if (fs.exists(new org.apache.hadoop.fs.Path(s_path))) {
  cmd = "hadoop dfs -rm -r " + s_path
  Process(cmd)!
}
sc.parallelize(S.rowIter.toVector.map(m => m.toArray.mkString(",")),1).saveAsTextFile(s_path)

Save GeoTiffs

Create GeoTiffs for U (dimension is the number of model grid cells x n_components)
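The cell below joins each column of U back onto the model grid and writes one GeoTiff per component. The writing step itself boils down to the following pattern (toy tile, extent and CRS, hypothetical output path):

import geotrellis.proj4.CRS
import geotrellis.raster.DoubleArrayTile
import geotrellis.raster.io.geotiff.{GeoTiffOptions, SinglebandGeoTiff, Tags, compression}
import geotrellis.raster.io.geotiff.writer.GeoTiffWriter
import geotrellis.vector.Extent

// Toy 2x2 tile; in the real cell the array holds one SVD component mapped back onto the grid.
val tile = DoubleArrayTile(Array(1.0, 2.0, 3.0, 4.0), 2, 2)
val geoTif = new SinglebandGeoTiff(tile, new Extent(0, 0, 1, 1), CRS.fromName("EPSG:3857"),
  Tags.empty, GeoTiffOptions(compression.DeflateCompression))
GeoTiffWriter.write(geoTif, "/tmp/svd_u_example.tif")  // then copied into HDFS with hadoop dfs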


In [ ]:
t0 = System.nanoTime()
val mod_grid0_index_I = model_grid0_index.zipWithIndex().map{ case (v,i) => (i,v)}
mod_grid0_index_I.cache()
model_grid0.cache()
cfor(0)(_ < n_components, _ + 1) { k =>
  //Merge two RDDs: one with the k-th column of U and one with the indices of the Tile's grid cells
  val kB = sc.broadcast(k)
  //val U_k_RDD = U.rows.map(_.toArray.zipWithIndex.filter(_._2 == kB.value).sortBy(_._2).map{ case (v,i) => v}).flatMap(m => m)
  val U_k_RDD = U_RDD.map(_(kB.value))
  val cluster_cell_pos = ((U_k_RDD.zipWithIndex().map{ case (v,i) => (i,v)}).join(mod_grid0_index_I)).map{ case (k,(v,i)) => (v,i)}
  cluster_cell_pos.cache()
    
  //Associate each component value with its grid cell
  val grid_clusters :RDD[ (Long, (Double, Option[Double]))] = model_grid0.leftOuterJoin(cluster_cell_pos.map{ case (c,i) => (i, c)})

  //Convert all None to NaN
  val grid_clusters_res = grid_clusters.sortByKey(true).map{case (k, (v, c)) => if (c == None) (k, Double.NaN) else (k, c.get)}

  //Define a Tile
  val cluster_cells :Array[Double] = grid_clusters_res.values.collect()
  val cluster_cellsD = DoubleArrayTile(cluster_cells, num_cols_rows._1, num_cols_rows._2)
  val geoTif = new SinglebandGeoTiff(cluster_cellsD, projected_extent.extent, projected_extent.crs, Tags.empty, GeoTiffOptions(compression.DeflateCompression))
  cluster_cell_pos.unpersist()
    
  //Save to /tmp/
  GeoTiffWriter.write(geoTif, u_geotiff_tmp_paths(k))

  //Upload to HDFS
  var cmd = "hadoop dfs -copyFromLocal -f " + u_geotiff_tmp_paths(k) + " " + u_geotiff_hdfs_paths(k)
  Process(cmd)!

  //Remove from /tmp/
  cmd = "rm -fr " + u_geotiff_tmp_paths(k)
  Process(cmd)!
}
U_RDD.unpersist()
model_grid0.unpersist()
mod_grid0_index_I.unpersist()
t1 = System.nanoTime()
println("Elapsed time: " + (t1 - t0) + "ns")

Create GeoTiffs for V (dimension is the number of satellite grid cells x n_components)


In [ ]:
t0 = System.nanoTime()
val sat_grid0_index_I = satellite_grid0_index.zipWithIndex().map{ case (v,i) => (i,v)}
sat_grid0_index_I.cache()
val iter = V.colIter
var k :Int = 0
while (iter.hasNext) {
  //Merge two RDDs: one with the k-th column of V and one with the indices of the Tile's grid cells
  val V_k_RDD = sc.parallelize(iter.next().toArray.zipWithIndex.map{ case (v,i) => (i.toLong,v)})
  val cluster_cell_pos = (V_k_RDD.join(sat_grid0_index_I)).map{ case (k,(v,i)) => (v,i)}

  //Associate each component value with its grid cell
  val grid_clusters :RDD[ (Long, (Double, Option[Double]))] = satellite_grid0.leftOuterJoin(cluster_cell_pos.map{ case (c,i) => (i, c)})

  //Convert all None to NaN
  val grid_clusters_res = grid_clusters.sortByKey(true).map{case (k, (v, c)) => if (c == None) (k, Double.NaN) else (k, c.get)}

  //Define a Tile
  val cluster_cells :Array[Double] = grid_clusters_res.values.collect()
  val cluster_cellsD = DoubleArrayTile(cluster_cells, num_cols_rows._1, num_cols_rows._2)
  val geoTif = new SinglebandGeoTiff(cluster_cellsD, projected_extent.extent, projected_extent.crs, Tags.empty, GeoTiffOptions(compression.DeflateCompression))

  //Save to /tmp/
  GeoTiffWriter.write(geoTif, v_geotiff_tmp_paths(k))

  //Upload to HDFS
  var cmd = "hadoop dfs -copyFromLocal -f " + v_geotiff_tmp_paths(k) + " " + v_geotiff_hdfs_paths(k)
  Process(cmd)!

  //Remove from /tmp/
  cmd = "rm -fr " + v_geotiff_tmp_paths(k)
  Process(cmd)!
    
  k += 1
}
t1 = System.nanoTime()
println("Elapsed time: " + (t1 - t0) + "ns")

In [ ]: