This notebook shows how to multiply two matrices and compute the SVD of their product. Each matrix is created out of a set of GeoTiffs, one per year in a series of years. Both matrices must cover ranges of years of the same length.
For demonstration we use data from a model (spring-index) and from a satellite (AVHRR).
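In outline, with $M$ the model matrix (grid cells $\times$ years, after the transpose applied further down) and $S$ the satellite matrix (years $\times$ grid cells), the notebook computes

$$A = M S, \qquad A \approx U \Sigma V^T,$$

keeping $k$ = `n_components` singular values. The number of years is the shared inner dimension, which is why both ranges of years must have the same length.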
In [82]:
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}
import geotrellis.proj4.CRS
import geotrellis.raster.io.geotiff.writer.GeoTiffWriter
import geotrellis.raster.io.geotiff.{SinglebandGeoTiff, _}
import geotrellis.raster.{CellType, DoubleArrayTile, MultibandTile, Tile, UByteCellType}
import geotrellis.spark.io.hadoop._
import geotrellis.vector.{Extent, ProjectedExtent}
import org.apache.hadoop.io.SequenceFile.Writer
import org.apache.hadoop.io.{SequenceFile, _}
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.mllib.linalg._
import org.apache.spark.mllib.linalg.distributed._
import org.apache.spark.mllib.stat.{MultivariateStatisticalSummary, Statistics}
import org.apache.spark.rdd.RDD
import org.apache.spark.storage.StorageLevel
import org.apache.spark.{SparkConf, SparkContext}
import spire.syntax.cfor.cfor
import scala.sys.process.Process
Here the user can define the mode of operation: **offline** (re-use RDDs and matrices previously saved to HDFS) or **online** (build them from the GeoTiffs).
It is also possible to define which directory of GeoTiffs is to be used and on which band to run the computation; the available options are shown (commented out) in the cell below.
Note that if the required files do not exist in HDFS, **offline mode** is not possible and it will be reset to **online mode**.
In [83]:
var model_rdd_offline_mode = true
var model_matrix_offline_mode = true
var satellite_rdd_offline_mode = true
var satellite_matrix_offline_mode = true
//Using spring-index model
//var model_path = "hdfs:///user/hadoop/avhrr/"
//var model_dir = "SOST"
var model_path = "hdfs:///user/hadoop/spring-index/"
var model_dir = "BloomGridmet"
var model_band_num = 3 //First band is 0
//Using AVHRR Satellite data
var satellite_path = "hdfs:///user/hadoop/avhrr/"
var satellite_dir = "SOST4Km"
//var satellite_path = "hdfs:///user/hadoop/spring-index/"
//var satellite_dir = "LeafGridmet"
var sat_band_num = 0 //First band is 0
var n_components :Long = 10
var out_path = "hdfs:///user/emma/svd/spark/" + model_dir + satellite_dir + "" + n_components + "/"
//Years between (inclusive) 1989 - 2014
val satellite_timeseries = (1989, 2014)
var satellite_first_year = 1989
var satellite_last_year = 2014
//Years between (inclusive) 1980 - 2016
val model_timeseries = (1980, 2016)
var model_first_year = 1989
var model_last_year = 2014
//Mask
val modToBeMasked = true
val satToBeMasked = true
val mod_mask_path = "hdfs:///user/hadoop/usa_mask_gridmet.tif"
//val mod_mask_path = "hdfs:///user/hadoop/usa_state_masks/california_4km.tif"
val sat_mask_path = "hdfs:///user/hadoop/usa_mask_gridmet.tif"
//val sat_mask_path = "hdfs:///user/hadoop/usa_state_masks/california_4km.tif"
//Matrix Mode: 0 Normal, 1 SC, 2 SR
val matrix_mode = 0
val save_rdds = false
val save_matrix = false
val save_svd_matrices = false
Out[83]:
DON'T MODIFY ANY PIECE OF CODE FROM HERE ON!!!
In [84]:
//Check offline modes
var conf = sc.hadoopConfiguration
var fs = org.apache.hadoop.fs.FileSystem.get(conf)
//Paths to store data structures for Offline runs
var mod_mask_str = ""
var sat_mask_str = ""
if (modToBeMasked)
mod_mask_str = "_mask"
if (satToBeMasked)
sat_mask_str = "_mask"
val matrix_mode_str = matrix_mode match {
case 1 => "_Sc_Mc"
case 2 => "_Sr_Mr"
case _ => ""
}
var model_grid0_path = out_path + model_dir + "_grid0"
var model_grid0_index_path = out_path + model_dir + "_grid0_index"
var satellite_grid0_path = out_path + satellite_dir + "_grid0"
var satellite_grid0_index_path = out_path + satellite_dir + "_grid0_index"
var model_grid_path = out_path + model_dir + "_grid"
var satellite_grid_path = out_path + satellite_dir + "_grid"
var model_matrix_path = out_path + model_dir + "_matrix"
var satellite_matrix_path = out_path + satellite_dir + "_matrix"
var metadata_path = out_path + model_dir + "_metadata"
Out[84]:
In [85]:
var sc_path = out_path + model_dir + "_sc"
var mc_path = out_path + model_dir + "_mc"
val model_rdd_offline_exists = fs.exists(new org.apache.hadoop.fs.Path(model_grid_path))
val model_matrix_offline_exists = fs.exists(new org.apache.hadoop.fs.Path(model_matrix_path))
val satellite_rdd_offline_exists = fs.exists(new org.apache.hadoop.fs.Path(satellite_grid_path))
val satellite_matrix_offline_exists = fs.exists(new org.apache.hadoop.fs.Path(satellite_matrix_path))
if (model_rdd_offline_mode != model_rdd_offline_exists) {
println("\"Load GeoTiffs\" offline mode is not set properly, i.e., either it was set to false and the required file does not exist or vice-versa. We will reset it to " + model_rdd_offline_exists.toString())
model_rdd_offline_mode = model_rdd_offline_exists
}
if (model_matrix_offline_mode != model_matrix_offline_exists) {
println("\"Matrix\" offline mode is not set properly, i.e., either it was set to false and the required file does not exist or vice-versa. We will reset it to " + model_matrix_offline_exists.toString())
model_matrix_offline_mode = model_matrix_offline_exists
}
var model_skip_rdd = false
if (model_matrix_offline_exists) {
println("Since we have a matrix, the load of the grids RDD will be skipped!!!")
model_skip_rdd = true
}
if (satellite_rdd_offline_mode != satellite_rdd_offline_exists) {
println("\"Load GeoTiffs\" offline mode is not set properly, i.e., either it was set to false and the required file does not exist or vice-versa. We will reset it to " + satellite_rdd_offline_exists.toString())
satellite_rdd_offline_mode = satellite_rdd_offline_exists
}
if (satellite_matrix_offline_mode != satellite_matrix_offline_exists) {
println("\"Matrix\" offline mode is not set properly, i.e., either it was set to false and the required file does not exist or vice-versa. We will reset it to " + satellite_matrix_offline_exists.toString())
satellite_matrix_offline_mode = satellite_matrix_offline_exists
}
var satellite_skip_rdd = false
if (satellite_matrix_offline_exists) {
println("Since we have a matrix, the load of the grids RDD will be skipped!!!")
satellite_skip_rdd = true
}
//Years
val model_years = model_timeseries._1 to model_timeseries._2
val satellite_years = satellite_timeseries._1 to satellite_timeseries._2
if (!satellite_years.contains(satellite_first_year) || !(satellite_years.contains(satellite_last_year))) {
println("Invalid range of years for " + satellite_dir + ". I should be between " + satellite_first_year + " and " + satellite_last_year)
System.exit(0)
}
if (!model_years.contains(model_first_year) || !(model_years.contains(model_last_year))) {
println("Invalid range of years for " + model_dir + ". I should be between " + model_first_year + " and " + model_last_year)
System.exit(0)
}
if ((satellite_last_year - satellite_first_year) != (model_last_year - model_first_year)) {
println("The range of years for each data set should be of the same length.");
System.exit(0)
}
var model_years_range = (model_years.indexOf(model_first_year), model_years.indexOf(model_last_year))
var satellite_years_range = (satellite_years.indexOf(satellite_first_year), satellite_years.indexOf(satellite_last_year))
//Global variables
var projected_extent = new ProjectedExtent(new Extent(0,0,0,0), CRS.fromName("EPSG:3857"))
var model_grid0: RDD[(Long, Double)] = sc.emptyRDD
var model_grid0_index: RDD[Long] = sc.emptyRDD
var mod_grids_RDD: RDD[(Int, Array[Double])] = sc.emptyRDD
var model_grids_RDD: RDD[(Int, Array[Double])] = sc.emptyRDD
var model_grids: RDD[(Int,Array[Double])] = sc.emptyRDD
val rows = sc.parallelize(Array[Double]()).map(m => Vectors.dense(m))
var model_summary: MultivariateStatisticalSummary = new RowMatrix(rows).computeColumnSummaryStatistics()
var model_std :Array[Double] = new Array[Double](0)
var satellite_grid0: RDD[(Long, Double)] = sc.emptyRDD
var satellite_grid0_index: RDD[Long] = sc.emptyRDD
var sat_grids_RDD: RDD[(Int, Array[Double])] = sc.emptyRDD
var satellite_grids_RDD: RDD[(Int, Array[Double])] = sc.emptyRDD
var satellite_grids: RDD[(Int,Array[Double])] = sc.emptyRDD
var satellite_summary: MultivariateStatisticalSummary = new RowMatrix(rows).computeColumnSummaryStatistics()
var satellite_std :Array[Double] = new Array[Double](0)
var num_cols_rows :(Int, Int) = (0, 0)
var cellT :CellType = UByteCellType
var mod_mask_tile0 :Tile = new SinglebandGeoTiff(geotrellis.raster.ArrayTile.empty(cellT, num_cols_rows._1, num_cols_rows._2), projected_extent.extent, projected_extent.crs, Tags.empty, GeoTiffOptions.DEFAULT).tile
var sat_mask_tile0 :Tile = new SinglebandGeoTiff(geotrellis.raster.ArrayTile.empty(cellT, num_cols_rows._1, num_cols_rows._2), projected_extent.extent, projected_extent.crs, Tags.empty, GeoTiffOptions.DEFAULT).tile
var satellite_cells_size :Long = 0
var model_cells_size :Long = 0
var t0 : Long = 0
var t1 : Long = 0
print(t0)
Out[85]:
In [86]:
def serialize(value: Any): Array[Byte] = {
val out_stream: ByteArrayOutputStream = new ByteArrayOutputStream()
val obj_out_stream = new ObjectOutputStream(out_stream)
obj_out_stream.writeObject(value)
obj_out_stream.close
out_stream.toByteArray
}
def deserialize(bytes: Array[Byte]): Any = {
val obj_in_stream = new ObjectInputStream(new ByteArrayInputStream(bytes))
val value = obj_in_stream.readObject
obj_in_stream.close
value
}
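These helpers round-trip any java.io.Serializable object through a byte array. A quick illustrative use (not part of the pipeline), round-tripping the ProjectedExtent defined above:
val pe_bytes: Array[Byte] = serialize(projected_extent)
val pe_back: ProjectedExtent = deserialize(pe_bytes).asInstanceOf[ProjectedExtent]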
In [87]:
t0 = System.nanoTime()
//Load Mask
if (modToBeMasked) {
val mask_tiles_RDD = sc.hadoopGeoTiffRDD(mod_mask_path).values
val mask_tiles_withIndex = mask_tiles_RDD.zipWithIndex().map { case (e, v) => (v, e) }
mod_mask_tile0 = (mask_tiles_withIndex.filter(m => m._1 == 0).values.collect()) (0)
}
if (satToBeMasked) {
val mask_tiles_RDD = sc.hadoopGeoTiffRDD(sat_mask_path).values
val mask_tiles_withIndex = mask_tiles_RDD.zipWithIndex().map { case (e, v) => (v, e) }
sat_mask_tile0 = (mask_tiles_withIndex.filter(m => m._1 == 0).values.collect()) (0)
}
//Local variables
val pattern: String = "*.tif"
val satellite_filepath: String = satellite_path + satellite_dir
val model_filepath: String = model_path + "/" + model_dir
t1 = System.nanoTime()
println("Elapsed time: " + (t1 - t0) + "ns")
Out[87]:
In [88]:
def hadoopGeoTiffRDD(satellite_filepath :String, pattern :String): RDD[(Int, (ProjectedExtent, Tile))] = {
val listFiles = sc.binaryFiles(satellite_filepath + "/" + pattern).sortBy(_._1).keys.collect()
var prevRDD :RDD[(Int, (ProjectedExtent, Tile))] = sc.emptyRDD
cfor(0)(_ < listFiles.length, _ + 1) { k =>
val filePath :String = listFiles(k)
val kB = sc.broadcast(k)
val currRDD = sc.hadoopGeoTiffRDD(filePath).map(m => (kB.value, m))
if (prevRDD.isEmpty()){
prevRDD = currRDD
} else {
prevRDD = currRDD.union(prevRDD)
}
//kB.destroy()
}
prevRDD.sortBy(_._1)
}
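As an illustrative usage (the notebook calls this function further down with model_filepath/satellite_filepath and pattern), loading all single-band model GeoTiffs keyed by their position in the sorted file listing would look like:
val model_tiles: RDD[(Int, (ProjectedExtent, Tile))] = hadoopGeoTiffRDD(model_path + model_dir, "*.tif")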
In [89]:
def hadoopMultibandGeoTiffRDD(satellite_filepath :String, pattern :String): RDD[(Int, (ProjectedExtent, MultibandTile))] = {
val listFiles = sc.binaryFiles(satellite_filepath + "/" + pattern).sortBy(_._1).keys.collect()
var prevRDD :RDD[(Int,(ProjectedExtent, MultibandTile))] = sc.emptyRDD
cfor(0)(_ < listFiles.length, _ + 1) { k =>
val filePath :String = listFiles(k)
val kB = sc.broadcast(k)
val currRDD = sc.hadoopMultibandGeoTiffRDD(filePath).map(m => (kB.value,m))
if (prevRDD.isEmpty()){
prevRDD = currRDD
} else {
prevRDD = currRDD.union(prevRDD)
}
//kB.destroy()
}
prevRDD.sortBy(_._1)
}
In [90]:
t0 = System.nanoTime()
if (satellite_rdd_offline_mode) {
satellite_grids_RDD = sc.objectFile(satellite_grid_path)
satellite_grid0 = sc.objectFile(satellite_grid0_path)
satellite_grid0_index = sc.objectFile(satellite_grid0_index_path)
} else {
//Let's load the satellite's single-band GeoTiffs and return an RDD with just the tiles.
if (sat_band_num == 0) {
val satellite_geos_RDD = hadoopGeoTiffRDD(satellite_filepath, pattern)
satellite_geos_RDD.cache()
val satellite_tiles_RDD = satellite_geos_RDD.map{ case (i, (p,t)) => (i,t)}
if (satToBeMasked) {
val mask_tile_broad: Broadcast[Tile] = sc.broadcast(sat_mask_tile0)
sat_grids_RDD = satellite_tiles_RDD.map{ case (i,t) => (i, t.localInverseMask(mask_tile_broad.value, 1, -2000).toArrayDouble().map(t => if (t == -1000) Double.NaN else t))}
} else {
sat_grids_RDD = satellite_tiles_RDD.map{ case (i,t) => (i,t.toArrayDouble())}
}
} else {
val satellite_geos_RDD = hadoopMultibandGeoTiffRDD(satellite_filepath, pattern)
satellite_geos_RDD.cache()
val satellite_tiles_RDD = satellite_geos_RDD.map{ case (i, (p,t)) => (i,t)}
val band_numB: Broadcast[Int] = sc.broadcast(sat_band_num)
if (satToBeMasked) {
val mask_tile_broad: Broadcast[Tile] = sc.broadcast(sat_mask_tile0)
sat_grids_RDD = satellite_tiles_RDD.map{ case (i,t) => (i, t.band(band_numB.value).localInverseMask(mask_tile_broad.value, 1, -2000).toArrayDouble().map(t => if (t == -1000) Double.NaN else t))}
} else {
sat_grids_RDD = satellite_tiles_RDD.map{ case (i,t) => (i, t.band(band_numB.value).toArrayDouble())}
}
}
//Get Index for each Cell
val grids_withIndex = sat_grids_RDD//.zipWithIndex().map { case (e, v) => (v, e) }
if (satToBeMasked) {
satellite_grid0_index = grids_withIndex.filter(m => m._1 == 0).values.flatMap(m => m).zipWithIndex.filter(m => m._1 != -2000.0).map { case (v, i) => (i) }
} else {
satellite_grid0_index = grids_withIndex.filter(m => m._1 == 0).values.flatMap(m => m).zipWithIndex.map { case (v, i) => (i) }
}
//Get the Tile's grid
satellite_grid0 = grids_withIndex.filter(m => m._1 == 0).values.flatMap(m => m).zipWithIndex.map { case (v, i) => (i, v) }
//Lets filter out NaN
if (satToBeMasked) {
satellite_grids_RDD = sat_grids_RDD.map{ case (i,m) => (i,m.filter(m => m != -2000.0))}
} else {
satellite_grids_RDD = sat_grids_RDD
}
//Store in HDFS
if (save_rdds) {
satellite_grids_RDD.saveAsObjectFile(satellite_grid_path)
satellite_grid0.saveAsObjectFile(satellite_grid0_path)
satellite_grid0_index.saveAsObjectFile(satellite_grid0_index_path)
}
}
val satellite_grids_withIndex = satellite_grids_RDD//.zipWithIndex().map { case (e, v) => (v, e) }
//Filter out the range of years:
val sat_year_diff = satellite_first_year-satellite_timeseries._1
val sat_year_diffB = sc.broadcast(sat_year_diff)
satellite_grids = satellite_grids_withIndex.filterByRange(satellite_years_range._1, satellite_years_range._2).map{ case(i,a) => (i-(sat_year_diffB.value),a)}
satellite_grids.persist(StorageLevel.DISK_ONLY)
//Collect Stats:
satellite_summary = Statistics.colStats(satellite_grids.map{ case (i,m) => Vectors.dense(m)})
//satellite_std = satellite_summary.variance.toArray.map(m => scala.math.sqrt(m))
satellite_cells_size = satellite_grid0_index.count().toInt
println("Number of cells is: " + satellite_cells_size)
t1 = System.nanoTime()
println("Elapsed time: " + (t1 - t0) + "ns")
Out[90]:
In [91]:
t0 = System.nanoTime()
if (model_rdd_offline_mode) {
model_grids_RDD = sc.objectFile(model_grid_path)
model_grid0 = sc.objectFile(model_grid0_path)
model_grid0_index = sc.objectFile(model_grid0_index_path)
val metadata = sc.sequenceFile(metadata_path, classOf[IntWritable], classOf[BytesWritable]).map(_._2.copyBytes()).collect()
projected_extent = deserialize(metadata(0)).asInstanceOf[ProjectedExtent]
num_cols_rows = (deserialize(metadata(1)).asInstanceOf[Int], deserialize(metadata(2)).asInstanceOf[Int])
cellT = deserialize(metadata(3)).asInstanceOf[CellType]
} else {
if (model_band_num == 0) {
val model_geos_RDD = hadoopGeoTiffRDD(model_filepath, pattern)
model_geos_RDD.cache()
val model_tiles_RDD = model_geos_RDD.map{case (i,(p,t)) => (i,t)}//.values
//Retrieve the number of cols and rows of the Tile's grid
val tiles_withIndex = model_tiles_RDD//.zipWithIndex().map { case (v, i) => (i, v) }
val tile0 = (tiles_withIndex.filter(m => m._1 == 0).values.collect()) (0)
num_cols_rows = (tile0.cols, tile0.rows)
cellT = tile0.cellType
//Retrieve the ProjectExtent which contains metadata such as CRS and bounding box
val projected_extents_withIndex = model_geos_RDD.map{ case (i,(p,t)) => (i,p)}//.keys.zipWithIndex().map { case (e, v) => (v, e) }
projected_extent = (projected_extents_withIndex.filter(m => m._1 == 0).values.collect()) (0)
if (modToBeMasked) {
val mask_tile_broad: Broadcast[Tile] = sc.broadcast(mod_mask_tile0)
mod_grids_RDD = model_tiles_RDD.map{case (i,m) => (i, m.localInverseMask(mask_tile_broad.value, 1, -2000).toArrayDouble().map(m => if (m == -1000) Double.NaN else m))}
} else {
mod_grids_RDD = model_tiles_RDD.map{case (i,m) => (i,m.toArrayDouble())}
}
} else {
val model_geos_RDD = hadoopMultibandGeoTiffRDD(model_filepath, pattern)
model_geos_RDD.cache()
val model_tiles_RDD = model_geos_RDD.map{case (i,(p,t)) => (i,t)}//.values
//Retrieve the number of cols and rows of the Tile's grid
val tiles_withIndex = model_tiles_RDD//.zipWithIndex().map { case (v, i) => (i, v) }
val tile0 = (tiles_withIndex.filter(m => m._1 == 0).values.collect()) (0)
num_cols_rows = (tile0.cols, tile0.rows)
cellT = tile0.cellType
//Retrieve the ProjectExtent which contains metadata such as CRS and bounding box
val projected_extents_withIndex = model_geos_RDD.map{ case (i,(p,t)) => (i,p)}//.keys.zipWithIndex().map { case (e, v) => (v, e) }
projected_extent = (projected_extents_withIndex.filter(m => m._1 == 0).values.collect()) (0)
val band_numB: Broadcast[Int] = sc.broadcast(model_band_num)
if (modToBeMasked) {
val mask_tile_broad: Broadcast[Tile] = sc.broadcast(mod_mask_tile0)
mod_grids_RDD = model_tiles_RDD.map{case (i,m) => (i,m.band(band_numB.value).localInverseMask(mask_tile_broad.value, 1, -2000).toArrayDouble().map(m => if (m == -1000) Double.NaN else m))}
} else {
mod_grids_RDD = model_tiles_RDD.map{case (i,m) => (i,m.band(band_numB.value).toArrayDouble())}
}
}
//Get Index for each Cell
val grids_withIndex = mod_grids_RDD//.zipWithIndex().map { case (e, v) => (v, e) }
if (modToBeMasked) {
model_grid0_index = grids_withIndex.filter(m => m._1 == 0).values.flatMap(m => m).zipWithIndex.filter(m => m._1 != -2000.0).map { case (v, i) => (i) }
} else {
model_grid0_index = grids_withIndex.filter(m => m._1 == 0).values.flatMap(m => m).zipWithIndex.map { case (v, i) => (i) }
}
//Get the Tile's grid
model_grid0 = grids_withIndex.filter(m => m._1 == 0).values.flatMap( m => m).zipWithIndex.map{case (v,i) => (i,v)}
//Lets filter out NaN
if (modToBeMasked) {
model_grids_RDD = mod_grids_RDD.map{case (i,m) => (i, m.filter(m => m != -2000.0))}
} else {
model_grids_RDD = mod_grids_RDD
}
//Store data in HDFS
if (save_rdds) {
model_grids_RDD.saveAsObjectFile(model_grid_path)
model_grid0.saveAsObjectFile(model_grid0_path)
model_grid0_index.saveAsObjectFile(model_grid0_index_path)
val writer: SequenceFile.Writer = SequenceFile.createWriter(conf,
Writer.file(metadata_path),
Writer.keyClass(classOf[IntWritable]),
Writer.valueClass(classOf[BytesWritable])
)
writer.append(new IntWritable(1), new BytesWritable(serialize(projected_extent)))
writer.append(new IntWritable(2), new BytesWritable(serialize(num_cols_rows._1)))
writer.append(new IntWritable(3), new BytesWritable(serialize(num_cols_rows._2)))
writer.append(new IntWritable(4), new BytesWritable(serialize(cellT)))
writer.hflush()
writer.close()
}
}
val model_grids_withIndex = model_grids_RDD//.zipWithIndex().map { case (e, v) => (v, e) }
//Filter out the range of years:
val mod_year_diff = model_first_year-model_timeseries._1
val mod_year_diffB = sc.broadcast(mod_year_diff)
model_grids = model_grids_withIndex.filterByRange(model_years_range._1, model_years_range._2).map{ case(i,a) => (i-(mod_year_diffB.value),a)}//.values
model_grids.persist(StorageLevel.DISK_ONLY)
//Collect Stats:
model_summary = Statistics.colStats(model_grids.map{ case (i,m) => Vectors.dense(m)})
//model_std = model_summary.variance.toArray.map(m => scala.math.sqrt(m))
var model_tile0_index: RDD[Double] = model_grids_withIndex.filter(m => m._1 == 0).values.flatMap(m => m)
model_cells_size = model_tile0_index.count().toInt
t1 = System.nanoTime()
println("Elapsed time: " + (t1 - t0) + "ns")
Out[91]:
In [92]:
//Satellite
val satellite_cells_sizeB = sc.broadcast(satellite_cells_size)
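//Build one sparse IndexedRow per year: NaN cells are dropped and the surviving values keep their original cell positions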
val satellite_indRowMat :IndexedRowMatrix = new IndexedRowMatrix(satellite_grids.map{ case (i, m) => (i,m.zipWithIndex)}.map{ case (i,m) => (i,m.filter(!_._1.isNaN))}.map{ case (i,m) => new IndexedRow(i.toLong, Vectors.sparse(satellite_cells_sizeB.value.toInt, m.map(v => v._2), m.map(v => v._1)))})
Out[92]:
In [93]:
//SC
val sc_exists = fs.exists(new org.apache.hadoop.fs.Path(sc_path))
var Sc :BlockMatrix = null
if (matrix_mode == 1) {
if (sc_exists) {
val rdd_indexed_rows :RDD[IndexedRow]= sc.objectFile(sc_path)
Sc = new IndexedRowMatrix(rdd_indexed_rows).toBlockMatrix()
} else {
val satellite_M_1_Gc = sc.parallelize(Array[Vector](satellite_summary.mean)).map(m => Vectors.dense(m.toArray))
val satellite_M_1_Gc_RowM: RowMatrix = new RowMatrix(satellite_M_1_Gc)
val sat_M_1_Gc_byColumnAndRow = satellite_M_1_Gc_RowM.rows.zipWithIndex.map {
case (row, rowIndex) => row.toArray.zipWithIndex.map {
case (number, columnIndex) => new MatrixEntry(rowIndex, columnIndex, number)
}
}.flatMap(x => x)
val satellite_M_1_Gc_blockMatrix = new CoordinateMatrix(sat_M_1_Gc_byColumnAndRow).toBlockMatrix()
val sat_matrix_Nt_1 = Array.fill[Double](satellite_grids.count().toInt)(1.0)
//satellite_grids.unpersist(false)
val satellite_M_Nt_1 = sc.parallelize(sat_matrix_Nt_1).map(m => Vectors.dense(m))
val satellite_M_Nt_1_RowM: RowMatrix = new RowMatrix(satellite_M_Nt_1)
val sat_M_Nt_1_byColumnAndRow = satellite_M_Nt_1_RowM.rows.zipWithIndex.map {
case (row, rowIndex) => row.toArray.zipWithIndex.map {
case (number, columnIndex) => new MatrixEntry(rowIndex, columnIndex, number)
}
}.flatMap(x => x)
val satellite_M_Nt_1_blockMatrix = new CoordinateMatrix(sat_M_Nt_1_byColumnAndRow).toBlockMatrix()
val satellite_M_Nt_Gc_blockMatrix = satellite_M_Nt_1_blockMatrix.multiply(satellite_M_1_Gc_blockMatrix)
//Sc = satellite_blockMatrix.subtract(satellite_M_Nt_Gc_blockMatrix)
val joined_mat :RDD[ (Long, (Array[Double], Array[Double]))] = satellite_indRowMat.rows.map(m => (m.index, m.vector.toArray)).join(satellite_M_Nt_Gc_blockMatrix.toCoordinateMatrix().toIndexedRowMatrix().rows.map(m => (m.index, m.vector.toArray)))
Sc = (new CoordinateMatrix(joined_mat.map {case (row_index, (a,b)) => a.zip(b).map(m => m._1-m._2).zipWithIndex.map{ case (v,col_index) => new MatrixEntry(row_index, col_index,v)}}.flatMap(m => m))).toBlockMatrix()
//save to disk
Sc.toIndexedRowMatrix().rows.saveAsObjectFile(sc_path)
}
}
Out[93]:
In [94]:
//Model
val model_cells_sizeB = sc.broadcast(model_cells_size)
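//Same construction as for the satellite, but transposed afterwards so that rows are grid cells and columns are years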
val model_indRowMat :IndexedRowMatrix = new IndexedRowMatrix(model_grids.map{ case (i, m) => (i,m.zipWithIndex)}.map{ case (i,m) => (i,m.filter(!_._1.isNaN))}.map{ case (i,m) => new IndexedRow(i.toLong, Vectors.sparse(model_cells_sizeB.value.toInt, m.map(v => v._2), m.map(v => v._1)))}).toCoordinateMatrix().transpose().toIndexedRowMatrix()
Out[94]:
In [95]:
//MC
val mc_exists = fs.exists(new org.apache.hadoop.fs.Path(mc_path))
var Mc :BlockMatrix = null
if (matrix_mode == 1) {
if (mc_exists) {
val rdd_indexed_rows :RDD[IndexedRow]= sc.objectFile(mc_path)
Mc = new IndexedRowMatrix(rdd_indexed_rows).toBlockMatrix()
} else {
val model_M_1_Gc = sc.parallelize(Array[Vector](model_summary.mean)).map(m => Vectors.dense(m.toArray))
val model_M_1_Gc_RowM: RowMatrix = new RowMatrix(model_M_1_Gc)
val mod_M_1_Gc_byColumnAndRow = model_M_1_Gc_RowM.rows.zipWithIndex.map {
case (row, rowIndex) => row.toArray.zipWithIndex.map {
case (number, columnIndex) => new MatrixEntry(rowIndex, columnIndex, number)
}
}.flatMap(x => x)
val model_M_1_Gc_blockMatrix = new CoordinateMatrix(mod_M_1_Gc_byColumnAndRow).toBlockMatrix()
val model_matrix_Nt_1 = Array.fill[Double](model_grids.count().toInt)(1.0)
//model_grids.unpersist(false)
val model_M_Nt_1 = sc.parallelize(model_matrix_Nt_1).map(m => Vectors.dense(m))
val model_M_Nt_1_RowM: RowMatrix = new RowMatrix(model_M_Nt_1)
val mod_M_Nt_1_byColumnAndRow = model_M_Nt_1_RowM.rows.zipWithIndex.map {
case (row, rowIndex) => row.toArray.zipWithIndex.map {
case (number, columnIndex) => new MatrixEntry(rowIndex, columnIndex, number)
}
}.flatMap(x => x)
val model_M_Nt_1_blockMatrix = new CoordinateMatrix(mod_M_Nt_1_byColumnAndRow).toBlockMatrix()
val model_M_Nt_Gc_blockMatrix = model_M_Nt_1_blockMatrix.multiply(model_M_1_Gc_blockMatrix)
val model_M_Gc_Nt_blockMatrix = model_M_Nt_Gc_blockMatrix.transpose
//Mc = model_blockMatrix.subtract(model_M_Gc_Nt_blockMatrix)
val joined_mat :RDD[ (Long, (Array[Double], Array[Double]))] = model_indRowMat.rows.map( m => (m.index, m.vector.toArray)).join(model_M_Gc_Nt_blockMatrix.toIndexedRowMatrix().rows.map(m => (m.index, m.vector.toArray)))
Mc = (new CoordinateMatrix(joined_mat.map {case (row_index, (a,b)) => a.zip(b).map(m => m._1-m._2).zipWithIndex.map{ case (v,col_index) => new MatrixEntry(row_index, col_index,v)}}.flatMap(m => m))).toBlockMatrix()
//save to disk
Mc.toIndexedRowMatrix().rows.saveAsObjectFile(mc_path)
}
}
Out[95]:
In [96]:
//SR
var Sr :BlockMatrix = null
In [97]:
//MR
var Mr :BlockMatrix = null
In [98]:
//Matrix Multiplication
var matrix_mul :IndexedRowMatrix = null
//Normal Matrix
if (matrix_mode == 0) {
//model_blockMatrix.persist(StorageLevel.DISK_ONLY)
//satellite_blockMatrix.persist(StorageLevel.DISK_ONLY)
//matrix_mul = model_blockMatrix.toIndexedRowMatrix().multiply(satellite_blockMatrix.toLocalMatrix())
matrix_mul = model_indRowMat.multiply(satellite_indRowMat.toBlockMatrix().toLocalMatrix())
//n_components = List(model_indRowMat.numRows(), model_indRowMat.numCols(), satellite_indRowMat.numRows(), satellite_indRowMat.numCols()).min
}
//SC Matrix
if (matrix_mode == 1) {
Mc.persist(StorageLevel.DISK_ONLY)
Sc.persist(StorageLevel.DISK_ONLY)
matrix_mul = Mc.toIndexedRowMatrix().multiply(Sc.toLocalMatrix())
//n_components = List(Mc.numRows(), Mc.numCols(), Sc.numRows(), Sc.numCols()).min
}
//SR Matrix
if (matrix_mode == 2) {
Mr.persist(StorageLevel.DISK_ONLY)
Sr.persist(StorageLevel.DISK_ONLY)
matrix_mul = Mr.toIndexedRowMatrix().multiply(Sr.toLocalMatrix())
//n_components = List(Mr.numRows(), Mr.numCols(), Sr.numRows(), Sr.numCols()).min
}
Out[98]:
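Note that multiply takes a local matrix: the right-hand operand is collected to the driver, so it must fit in driver memory. A minimal sanity check one could add for the normal mode (illustrative, not part of the original pipeline):
require(model_indRowMat.numCols() == satellite_indRowMat.numRows(),
  "the inner dimension (number of years) of the two matrices must agree")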
In [99]:
//Paths to save GeoTiffs
var u_geotiff_hdfs_paths :Array[String] = Array.fill[String](n_components.toInt)("")
var u_geotiff_tmp_paths :Array[String] = Array.fill[String](n_components.toInt)("")
var v_geotiff_hdfs_paths :Array[String] = Array.fill[String](n_components.toInt)("")
var v_geotiff_tmp_paths :Array[String] = Array.fill[String](n_components.toInt)("")
//Create dirs in HDFS
var cmd = "hadoop dfs -mkdir -p " + out_path + "u_tiffs/"
Process(cmd)!
cmd = "hadoop dfs -mkdir -p " + out_path + "v_tiffs/"
Process(cmd)!
cfor(0)(_ < n_components, _ + 1) { k =>
u_geotiff_hdfs_paths(k) = out_path + "u_tiffs/svd_u_" + k + "_" + n_components + matrix_mode_str + ".tif"
u_geotiff_tmp_paths(k) = "/tmp/svd_u_" + k + "_" + n_components + matrix_mode_str + ".tif"
if (fs.exists(new org.apache.hadoop.fs.Path(u_geotiff_hdfs_paths(k)))) {
println("There is already a GeoTiff with the path: " + u_geotiff_hdfs_paths(k) + ". Please make either a copy or move it to another location, otherwise, it will be over-written.")
}
v_geotiff_hdfs_paths(k) = out_path + "v_tiffs/svd_v_" + k + "_" + n_components + matrix_mode_str + ".tif"
v_geotiff_tmp_paths(k) = "/tmp/svd_v_" + k + "_" + n_components + matrix_mode_str + ".tif"
if (fs.exists(new org.apache.hadoop.fs.Path(v_geotiff_hdfs_paths(k)))) {
println("There is already a GeoTiff with the path: " + v_geotiff_hdfs_paths(k) + ". Please make either a copy or move it to another location, otherwise, it will be over-written.")
}
}
Out[99]:
In [ ]:
val svd: SingularValueDecomposition[IndexedRowMatrix, Matrix] = matrix_mul.computeSVD(n_components.toInt, computeU = true)
In [ ]:
val U: IndexedRowMatrix = svd.U // The U factor is an IndexedRowMatrix.
val s: Vector = svd.s // The singular values are stored in a local dense vector.
val V: Matrix = svd.V // The V factor is a local dense matrix.
val S = Matrices.diag(s)
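For reference, these shapes follow from computeSVD: with $A$ the $G_m \times G_s$ product matrix and $k$ = n_components,

$$A \approx U \Sigma V^T, \qquad U \in \mathbb{R}^{G_m \times k},\; \Sigma \in \mathbb{R}^{k \times k},\; V \in \mathbb{R}^{G_s \times k}.$$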
In [ ]:
val U_matrix_path = out_path + "U_matrix"
val U_RDD = U.rows.sortBy(_.index).map(_.vector.toArray)
U_RDD.cache()
if (save_svd_matrices)
U_RDD.saveAsObjectFile(U_matrix_path)
In [ ]:
val V_path = out_path + "V_matrix"
val V_RDD = sc.parallelize(V.rowIter.toVector)
if (save_svd_matrices)
V_RDD.saveAsObjectFile(V_path)
In [ ]:
val S_path = out_path + "S_matrix"
val S_RDD = sc.parallelize(S.rowIter.toVector)
if (save_svd_matrices)
S_RDD.saveAsObjectFile(S_path)
In [ ]:
val u_path = out_path + "U" + matrix_mode_str +".csv"
if (fs.exists(new org.apache.hadoop.fs.Path(u_path))) {
cmd = "hadoop dfs -rm -r " + u_path
Process(cmd)!
}
U_RDD.map(m => m.mkString(",")).saveAsTextFile(u_path)
In [ ]:
val v_path = out_path + "V" + matrix_mode_str +".csv"
if (fs.exists(new org.apache.hadoop.fs.Path(v_path))) {
cmd = "hadoop dfs -rm -r " + v_path
Process(cmd)!
}
sc.parallelize(V.rowIter.toVector.map(m => m.toArray.mkString(",")),4).saveAsTextFile(v_path)
In [ ]:
val s_path = out_path + "S" + matrix_mode_str +".csv"
if (fs.exists(new org.apache.hadoop.fs.Path(s_path))) {
cmd = "hadoop dfs -rm -r " + s_path
Process(cmd)!
}
sc.parallelize(S.rowIter.toVector.map(m => m.toArray.mkString(",")),1).saveAsTextFile(s_path)
In [ ]:
t0 = System.nanoTime()
val mod_grid0_index_I = model_grid0_index.zipWithIndex().map{ case (v,i) => (i,v)}
mod_grid0_index_I.cache()
model_grid0.cache()
cfor(0)(_ < n_components, _ + 1) { k =>
//Merge two RDDs, one containing the clusters_ID indices and the other one the indices of a Tile's grid cells
val kB = sc.broadcast(k)
//val U_k_RDD = U.rows.map(_.toArray.zipWithIndex.filter(_._2 == kB.value).sortBy(_._2).map{ case (v,i) => v}).flatMap(m => m)
val U_k_RDD = U_RDD.map(_(kB.value))
val cluster_cell_pos = ((U_k_RDD.zipWithIndex().map{ case (v,i) => (i,v)}).join(mod_grid0_index_I)).map{ case (k,(v,i)) => (v,i)}
cluster_cell_pos.cache()
//Associate a Cluster_IDs to respective Grid_cell
val grid_clusters :RDD[ (Long, (Double, Option[Double]))] = model_grid0.leftOuterJoin(cluster_cell_pos.map{ case (c,i) => (i, c)})
//Convert all None to NaN
val grid_clusters_res = grid_clusters.sortByKey(true).map{ case (k, (v, c)) => (k, c.getOrElse(Double.NaN)) }
//Define a Tile
val cluster_cells :Array[Double] = grid_clusters_res.values.collect()
val cluster_cellsD = DoubleArrayTile(cluster_cells, num_cols_rows._1, num_cols_rows._2)
val geoTif = new SinglebandGeoTiff(cluster_cellsD, projected_extent.extent, projected_extent.crs, Tags.empty, GeoTiffOptions(compression.DeflateCompression))
cluster_cell_pos.unpersist()
//Save to /tmp/
GeoTiffWriter.write(geoTif, u_geotiff_tmp_paths(k))
//Upload to HDFS
var cmd = "hadoop dfs -copyFromLocal -f " + u_geotiff_tmp_paths(k) + " " + u_geotiff_hdfs_paths(k)
Process(cmd)!
//Remove from /tmp/
cmd = "rm -fr " + u_geotiff_tmp_paths(k)
Process(cmd)!
}
U_RDD.unpersist()
model_grid0.unpersist()
mod_grid0_index_I.unpersist()
t1 = System.nanoTime()
println("Elapsed time: " + (t1 - t0) + "ns")
In [ ]:
t0 = System.nanoTime()
val sat_grid0_index_I = satellite_grid0_index.zipWithIndex().map{ case (v,i) => (i,v)}
sat_grid0_index_I.cache()
val iter = V.colIter
var k :Int = 0
while (iter.hasNext) {
//Merge two RDDs, one containing the clusters_ID indices and the other one the indices of a Tile's grid cells
val V_k_RDD = sc.parallelize(iter.next().toArray.zipWithIndex.map{ case (v,i) => (i.toLong,v)})
val cluster_cell_pos = (V_k_RDD.join(sat_grid0_index_I)).map{ case (k,(v,i)) => (v,i)}
//Associate a Cluster_IDs to respective Grid_cell
val grid_clusters :RDD[ (Long, (Double, Option[Double]))] = satellite_grid0.leftOuterJoin(cluster_cell_pos.map{ case (c,i) => (i, c)})
//Convert all None to NaN
val grid_clusters_res = grid_clusters.sortByKey(true).map{ case (k, (v, c)) => (k, c.getOrElse(Double.NaN)) }
//Define a Tile
val cluster_cells :Array[Double] = grid_clusters_res.values.collect()
val cluster_cellsD = DoubleArrayTile(cluster_cells, num_cols_rows._1, num_cols_rows._2)
val geoTif = new SinglebandGeoTiff(cluster_cellsD, projected_extent.extent, projected_extent.crs, Tags.empty, GeoTiffOptions(compression.DeflateCompression))
//Save to /tmp/
GeoTiffWriter.write(geoTif, v_geotiff_tmp_paths(k))
//Upload to HDFS
var cmd = "hadoop dfs -copyFromLocal -f " + v_geotiff_tmp_paths(k) + " " + v_geotiff_hdfs_paths(k)
Process(cmd)!
//Remove from /tmp/
cmd = "rm -fr " + v_geotiff_tmp_paths(k)
Process(cmd)!
k += 1
}
t1 = System.nanoTime()
println("Elapsed time: " + (t1 - t0) + "ns")
In [ ]: