Correlation between Model and Satellite Data

This notebook shows how to calculate the correlation between two matrices. Each matrix is created out of a set of GeoTiffs, one per year in a series of years. Both matrices should have the same dimensions.

For demonstration we will use data from a model (spring-index) and from a satellite (AVHRR).
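
For each grid cell, the notebook computes the Pearson correlation between the model and satellite time series. A minimal sketch of that computation, with hypothetical values and using Breeze as the notebook does later:

import breeze.linalg.{DenseVector, sum}
import breeze.stats.meanAndVariance

// Hypothetical per-cell time series: model (a) and satellite (b) values per year.
val a = DenseVector(10.0, 12.0, 11.0, 14.0)
val b = DenseVector(95.0, 100.0, 98.0, 105.0)

val n = a.length
val aStats = meanAndVariance(a)
val bStats = meanAndVariance(b)

// Pearson correlation: mean of the products of the standardized values.
val corr = (1.0 / (n - 1.0)) *
  sum(((a - aStats.mean) / math.sqrt(aStats.variance)) :* ((b - bStats.mean) / math.sqrt(bStats.variance)))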

Dependencies


In [1]:
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}

import breeze.linalg.{DenseMatrix => BDM, _}
import breeze.stats._
import geotrellis.proj4.CRS
import geotrellis.raster.io.geotiff.writer.GeoTiffWriter
import geotrellis.raster.io.geotiff.{SinglebandGeoTiff, _}
import geotrellis.raster.{CellType, DoubleArrayTile, Tile, UByteCellType}
import geotrellis.spark.io.hadoop._
import geotrellis.vector.{Extent, ProjectedExtent}
import org.apache.hadoop.io.SequenceFile.Writer
import org.apache.hadoop.io.{SequenceFile, _}
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.mllib.linalg._
import org.apache.spark.mllib.linalg.distributed.{CoordinateMatrix, MatrixEntry, RowMatrix}
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext, mllib}

import scala.sys.process._


import breeze.linalg.{DenseMatrix=>BDM, _}

Mode of operation

Here the user can define the mode of operation.

  • rdd_offline_mode: if false, the notebook creates all data from scratch and stores projected_extent and num_cols_rows into HDFS; otherwise, these data structures are read back from HDFS, as shown in the sketch below.
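
A minimal sketch of that mechanism (with a hypothetical path; some_grids_RDD is a placeholder name):

// Hypothetical: in online mode the grids RDD is computed and stored once;
// in offline mode it is simply read back from HDFS.
val example_grid_path = "hdfs:///user/pheno/correlation/example_grid"   // hypothetical path
// online mode (first run):  some_grids_RDD.saveAsObjectFile(example_grid_path)
// offline mode (later runs):
val some_grids_RDD: org.apache.spark.rdd.RDD[Array[Double]] = sc.objectFile(example_grid_path)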

It is also possible to define which directory of GeoTiffs is to be used and which band to use (a band-selection sketch follows the note below). The options are:

  • all, which are multi-band (8 bands) GeoTiffs
  • or choose one of the single-band ones:
    1. Onset_Greenness_Increase
    2. Onset_Greenness_Maximum
    3. Onset_Greenness_Decrease
    4. Onset_Greenness_Minimum
    5. NBAR_EVI_Onset_Greenness_Minimum
    6. NBAR_EVI_Onset_Greenness_Maximum
    7. NBAR_EVI_Area
    8. Dynamics_QC

Note that when using a range, **offline mode** is not possible and it will be reset to **online mode**.
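
As a minimal sketch of band selection (assuming a GeoTrellis MultibandTile is already loaded; the band index is zero-based):

import geotrellis.raster.{MultibandTile, Tile}

// Hypothetical helper: extract one band (e.g. band_num = 3, as configured below)
// from a multi-band tile and read its cells as doubles.
def bandValues(tile: MultibandTile, bandNum: Int): Array[Double] =
  tile.band(bandNum).toArrayDouble()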

Mode of Operation setup


In [2]:
var model_rdd_offline_mode = true
var model_matrix_offline_mode = true
var satellite_rdd_offline_mode = true
var satellite_matrix_offline_mode = true

//Using spring-index model
var model_path = "hdfs:///user/hadoop/spring-index/"
var model_dir = "BloomFinal"

//Using AVHRR Satellite data
var satellite_path = "hdfs:///user/hadoop/avhrr/"
var satellite_dir = "SOST"

var out_path = "hdfs:///user/pheno/correlation/" + model_dir + satellite_dir + "/"
var band_num = 3

//Years between (inclusive) 1989 - 2014
var satellite_first_year = 1989
var satellite_last_year = 2014

//Years between (inclusive) 1980 - 2015
//You always need to give an extra year so we can adjust for negative values of AVHRR and MODIS (satellite)
var model_first_year = 1988
var model_last_year = 2014

//Mask
val toBeMasked = true
val mask_path = "hdfs:///user/hadoop/usa_mask.tif"

val save_rdds = false
val save_matrix = false


model_rdd_offline_mode = true
model_matrix_offline_mode = true
satellite_rdd_offline_mode = true
satellite_matrix_offline_mode = true
model_path = hdfs:///user/hadoop/spring-index/
model_dir = BloomFinal
satellite_path = hdfs:///user/hadoop/avhrr/
satellite_dir = SOST
out_path = hdfs:///user/pheno/correlation/BloomFinalSOST/
band_num = 3
satellite_first_year = 1989
satellite_last_year = 2014
model_first_year = 1988
model_last_year = 2014
toBeMasked = true
mask_path = hdfs:///user/hadoop/usa_mask.tif
save_rdds = false
save_matrix = false
Out[2]:
false

DON'T MODIFY ANY PIECE OF CODE FROM HERE ON!!!

Mode of operation validation


In [3]:
//Check offline modes
var conf = sc.hadoopConfiguration
var fs = org.apache.hadoop.fs.FileSystem.get(conf)

//Paths to store data structures for Offline runs
var mask_str = ""
if (toBeMasked)
  mask_str = "_mask"
var model_grid0_path = out_path + model_dir + "_model_grid0"
var model_grid0_index_path = out_path + model_dir + "_model_grid0_index"

var model_grid_path = out_path + model_dir + "_model_grid"
var satellite_grid_path = out_path + satellite_dir + "_model_grid"
var model_matrix_path = out_path + model_dir + "_model_matrix"
var satellite_matrix_path = out_path + satellite_dir + "_model_matrix"
var metadata_path = out_path + model_dir + "_metadata"

val model_rdd_offline_exists = fs.exists(new org.apache.hadoop.fs.Path(model_grid_path))
val model_matrix_offline_exists = fs.exists(new org.apache.hadoop.fs.Path(model_matrix_path))
val satellite_rdd_offline_exists = fs.exists(new org.apache.hadoop.fs.Path(satellite_grid_path))
val satellite_matrix_offline_exists = fs.exists(new org.apache.hadoop.fs.Path(satellite_matrix_path))

if (model_rdd_offline_mode != model_rdd_offline_exists) {
  println("\"Load GeoTiffs\" offline mode is not set properly, i.e., either it was set to false and the required file does not exist or vice-versa. We will reset it to " + model_rdd_offline_exists.toString())
  model_rdd_offline_mode = model_rdd_offline_exists
}

if (model_matrix_offline_mode != model_matrix_offline_exists) {
  println("\"Matrix\" offline mode is not set properly, i.e., either it was set to false and the required file does not exist or vice-versa. We will reset it to " + model_matrix_offline_exists.toString())
  model_matrix_offline_mode = model_matrix_offline_exists
}

var model_skip_rdd = false
if (model_matrix_offline_exists) {
    println("Since we have a matrix, the load of the grids RDD will be skipped!!!")
    model_skip_rdd = true
}

if (satellite_rdd_offline_mode != satellite_rdd_offline_exists) {
  println("\"Load GeoTiffs\" offline mode is not set properly, i.e., either it was set to false and the required file does not exist or vice-versa. We will reset it to " + satellite_rdd_offline_exists.toString())
  satellite_rdd_offline_mode = satellite_rdd_offline_exists
}

if (satellite_matrix_offline_mode != satellite_matrix_offline_exists) {
  println("\"Matrix\" offline mode is not set properly, i.e., either it was set to false and the required file does not exist or vice-versa. We will reset it to " + satellite_matrix_offline_exists.toString())
  satellite_matrix_offline_mode = satellite_matrix_offline_exists
}

var satellite_skip_rdd = false
if (satellite_matrix_offline_exists) {
    println("Since we have a matrix, the load of the grids RDD will be skipped!!!")
    satellite_skip_rdd = true
}

var corr_tif = out_path + satellite_dir + "_" + model_dir + ".tif"
var corr_tif_tmp = "/tmp/correlation_" + satellite_dir + "_" + model_dir + ".tif"

//Years
val satellite_years = 1989 to 2014
val model_years = 1980 to 2015

if (!satellite_years.contains(satellite_first_year) || !(satellite_years.contains(satellite_last_year))) {
  println("Invalid range of years for " + satellite_dir + ". It should be between " + satellite_years.head + " and " + satellite_years.last)
  System.exit(0)
}

if (!model_years.contains(model_first_year) || !(model_years.contains(model_last_year))) {
  println("Invalid range of years for " + model_dir + ". It should be between " + model_years.head + " and " + model_years.last)
  System.exit(0)
}

if ((satellite_last_year - satellite_first_year) > (model_last_year - model_first_year)) {
  println("The satellite range of years cannot be longer than the model range of years.");
  System.exit(0)
}

var model_years_range = (model_years.indexOf(model_first_year), model_years.indexOf(model_last_year))
var satellite_years_range = (satellite_years.indexOf(satellite_first_year), satellite_years.indexOf(satellite_last_year))

//Global variables
var projected_extent = new ProjectedExtent(new Extent(0,0,0,0), CRS.fromName("EPSG:3857"))
var model_grid0: RDD[(Long, Double)] = sc.emptyRDD
var model_grid0_index: RDD[Long] = sc.emptyRDD
var grids_RDD: RDD[Array[Double]] = sc.emptyRDD
var model_grids_RDD: RDD[Array[Double]] = sc.emptyRDD
var model_grids: RDD[Array[Double]] = sc.emptyRDD
var satellite_grids_RDD: RDD[Array[Double]] = sc.emptyRDD
var satellite_grids: RDD[Array[Double]] = sc.emptyRDD
var num_cols_rows :(Int, Int) = (0, 0)
var cellT :CellType = UByteCellType
var mask_tile0 :Tile = new SinglebandGeoTiff(geotrellis.raster.ArrayTile.empty(cellT, num_cols_rows._1, num_cols_rows._2), projected_extent.extent, projected_extent.crs, Tags.empty, GeoTiffOptions.DEFAULT).tile
var satellite_cells_size :Long = 0
var model_cells_size :Long = 0
var t0 : Long = 0
var t1 : Long = 0


Waiting for a Spark session to start...
"Load GeoTiffs" offline mode is not set properly, i.e., either it was set to false and the required file does not exist or vice-versa. We will reset it to false
"Matrix" offline mode is not set properly, i.e., either it was set to false and the required file does not exist or vice-versa. We will reset it to false
"Matrix" offline mode is not set properly, i.e., either it was set to false and the required file does not exist or vice-versa. We will reset it to false
conf = Configuration: core-default.xml, core-site.xml, mapred-default.xml, mapred-site.xml, yarn-default.xml, yarn-site.xml, hdfs-default.xml, hdfs-site.xml, file:/usr/lib/spark-2.1.1-bin-without-hadoop/conf/hive-site.xml
fs = DFS[DFSClient[clientName=DFSClient_NONMAPREDUCE_-1805994961_39, ugi=pheno (auth:SIMPLE)]]
mask_str = _mask
model_grid0_path = hdfs:///user/pheno/correlation/BloomFinalSOST/BloomFinal_model_grid0
model_grid0_index_path = hdfs:///user/pheno/correlation/BloomFinalSOST/BloomFinal_model_grid0_index
model_grid_path = hdfs:///user/pheno/correlation/BloomFinalSOST/BloomFinal_model_grid
satellite_grid_path = hdfs:///user/pheno/correlation/BloomFinalSOST/SOST_model...
Out[3]:
hdfs:///user/pheno/correlation/BloomFinalSOST/SOST_model_grid

Functions to (de)serialize any structure into Array[Byte]


In [4]:
def serialize(value: Any): Array[Byte] = {
    val out_stream: ByteArrayOutputStream = new ByteArrayOutputStream()
    val obj_out_stream = new ObjectOutputStream(out_stream)
    obj_out_stream.writeObject(value)
    obj_out_stream.close
    out_stream.toByteArray
}

def deserialize(bytes: Array[Byte]): Any = {
    val obj_in_stream = new ObjectInputStream(new ByteArrayInputStream(bytes))
    val value = obj_in_stream.readObject
    obj_in_stream.close
    value
}


serialize: (value: Any)Array[Byte]
deserialize: (bytes: Array[Byte])Any
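
For reference, a minimal round-trip sketch (with a hypothetical string value; the notebook applies the same pattern to the ProjectedExtent, the column/row counts and the CellType stored as metadata):

val bytes: Array[Byte] = serialize("EPSG:3857")                 // hypothetical value
val restored: String = deserialize(bytes).asInstanceOf[String]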

Load GeoTiffs

Using GeoTrellis, all GeoTiffs in a directory are loaded into an RDD. From that RDD we extract the grid of the first file (later used to store the per-cell results), build an index to populate such a grid, and filter out all NaN values.
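
As a minimal sketch of the masking step used below (assuming a data tile and a mask tile are already in scope), masked-out cells are first written with the sentinel value -1000 and then dropped:

// Hypothetical: cells where the mask is not 1 become -1000 and are then filtered out.
val values: Array[Double] =
  tile.localInverseMask(mask_tile, 1, -1000).toArrayDouble().filter(_ != -1000.0)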


In [5]:
var t0 = System.nanoTime()

//Load Mask
if (!(model_skip_rdd && satellite_skip_rdd)  && toBeMasked) {
  val mask_tiles_RDD = sc.hadoopGeoTiffRDD(mask_path).values
  val mask_tiles_withIndex = mask_tiles_RDD.zipWithIndex().map{case (e,v) => (v,e)}
  mask_tile0 = (mask_tiles_withIndex.filter(m => m._1==0).filter(m => !m._1.isNaN).values.collect())(0)
}

//Local variables
val pattern: String = "tif"
val satellite_filepath: String = satellite_path + satellite_dir
val model_filepath: String = model_path + "/" + model_dir

var t1 = System.nanoTime()
println("Elapsed time: " + (t1 - t0) + "ns")


Elapsed time: 12640094545ns
t0 = 450743224794952
pattern = tif
satellite_filepath = hdfs:///user/hadoop/avhrr/SOST
model_filepath = hdfs:///user/hadoop/spring-index//BloomFinal
t1 = 450755864889497
Out[5]:
450755864889497

Satellite data


In [6]:
t0 = System.nanoTime()
if (!satellite_skip_rdd) {
    if (satellite_rdd_offline_mode) {
      satellite_grids_RDD = sc.objectFile(satellite_grid_path)
    } else {
      //Let's load the satellite (AVHRR) single-band GeoTiffs and return an RDD with just the tiles.
      val satellite_geos_RDD = sc.hadoopGeoTiffRDD(satellite_filepath, pattern)
      val satellite_tiles_RDD = satellite_geos_RDD.values

      if (toBeMasked) {
        val mask_tile_broad :Broadcast[Tile] = sc.broadcast(mask_tile0)
        satellite_grids_RDD = satellite_tiles_RDD.map(m => m.localInverseMask(mask_tile_broad.value, 1, -1000).toArrayDouble().filter(_ != -1000))
      } else {
        satellite_grids_RDD = satellite_tiles_RDD.map(m => m.toArrayDouble())
      }

      //Store in HDFS
      if (save_rdds) {
          satellite_grids_RDD.saveAsObjectFile(satellite_grid_path)
      }
    }
    val satellite_grids_withIndex = satellite_grids_RDD.zipWithIndex().map { case (e, v) => (v, e) }

    //Filter out the range of years:
    satellite_grids = satellite_grids_withIndex.filterByRange(satellite_years_range._1, satellite_years_range._2).values

    var satellite_grid0_index: RDD[Double] = satellite_grids_withIndex.filter(m => m._1 == 0).values.flatMap(m => m)
    satellite_cells_size = satellite_grid0_index.count().toInt
    println("Number of cells is: " + satellite_cells_size)
}
t1 = System.nanoTime()
println("Elapsed time: " + (t1 - t0) + "ns")


Number of cells is: 10115631                                                    
Elapsed time: 13836309155ns
t0 = 450758125903566
t1 = 450771962212721
Out[6]:
450771962212721

Model data


In [7]:
t0 = System.nanoTime()
if (model_rdd_offline_mode) {
  model_grids_RDD = sc.objectFile(model_grid_path)
  model_grid0 = sc.objectFile(model_grid0_path)
  model_grid0_index = sc.objectFile(model_grid0_index_path)

  val metadata = sc.sequenceFile(metadata_path, classOf[IntWritable], classOf[BytesWritable]).map(_._2.copyBytes()).collect()
  projected_extent = deserialize(metadata(0)).asInstanceOf[ProjectedExtent]
  num_cols_rows = (deserialize(metadata(1)).asInstanceOf[Int], deserialize(metadata(2)).asInstanceOf[Int])
  cellT = deserialize(metadata(3)).asInstanceOf[CellType]  
} else {
  val model_geos_RDD = sc.hadoopMultibandGeoTiffRDD(model_filepath, pattern)
  val model_tiles_RDD = model_geos_RDD.values

  //Retrieve the number of cols and rows of the Tile's grid
  val tiles_withIndex = model_tiles_RDD.zipWithIndex().map{case (v,i) => (i,v)}
  val tile0 = (tiles_withIndex.filter(m => m._1==0).values.collect())(0)

  num_cols_rows = (tile0.cols, tile0.rows)
  cellT = tile0.cellType

  //Retrieve the ProjectExtent which contains metadata such as CRS and bounding box
  val projected_extents_withIndex = model_geos_RDD.keys.zipWithIndex().map{case (e,v) => (v,e)}
  projected_extent = (projected_extents_withIndex.filter(m => m._1 == 0).values.collect())(0)

  val band_numB :Broadcast[Int] = sc.broadcast(band_num)
  if (toBeMasked) {
    val mask_tile_broad :Broadcast[Tile] = sc.broadcast(mask_tile0)
    grids_RDD = model_tiles_RDD.map(m => m.band(band_numB.value).localInverseMask(mask_tile_broad.value, 1, -1000).toArrayDouble())
  } else {
    grids_RDD = model_tiles_RDD.map(m => m.band(band_numB.value).toArrayDouble())
  }

  //Get Index for each Cell
  val grids_withIndex = grids_RDD.zipWithIndex().map { case (e, v) => (v, e) }
  if (toBeMasked) {
    model_grid0_index = grids_withIndex.filter(m => m._1 == 0).values.flatMap(m => m).zipWithIndex.filter(m => m._1 != -1000.0).map { case (v, i) => (i) }
  } else {
    model_grid0_index = grids_withIndex.filter(m => m._1 == 0).values.flatMap(m => m).zipWithIndex.map { case (v, i) => (i) }
  }

  //Get the Tile's grid
  model_grid0 = grids_withIndex.filter(m => m._1 == 0).values.flatMap( m => m).zipWithIndex.map{case (v,i) => (i,v)}

  //Lets filter out NaN
  if (toBeMasked) {
    model_grids_RDD = grids_RDD.map(m => m.filter(m => m != -1000.0))
  } else {
    model_grids_RDD = grids_RDD
  }

  //Store data in HDFS
  if (save_rdds) {
      model_grid0.saveAsObjectFile(model_grid0_path)
      model_grid0_index.saveAsObjectFile(model_grid0_index_path)
      model_grids_RDD.saveAsObjectFile(model_grid_path)

      val writer: SequenceFile.Writer = SequenceFile.createWriter(conf,
        Writer.file(metadata_path),
        Writer.keyClass(classOf[IntWritable]),
        Writer.valueClass(classOf[BytesWritable])
      )

      writer.append(new IntWritable(1), new BytesWritable(serialize(projected_extent)))
      writer.append(new IntWritable(2), new BytesWritable(serialize(num_cols_rows._1)))
      writer.append(new IntWritable(3), new BytesWritable(serialize(num_cols_rows._2)))
      writer.append(new IntWritable(4), new BytesWritable(serialize(cellT)))
      writer.hflush()
      writer.close()
  }
}
val model_grids_withIndex = model_grids_RDD.zipWithIndex().map { case (e, v) => (v, e) }

//Filter out the range of years:
model_grids = model_grids_withIndex.filterByRange(model_years_range._1, model_years_range._2).values

var model_tile0_index: RDD[Double] = model_grids_withIndex.filter(m => m._1 == 0).values.flatMap(m => m)
model_cells_size = model_tile0_index.count().toInt
println("Number of cells is: " + model_cells_size)

t1 = System.nanoTime()
println("Elapsed time: " + (t1 - t0) + "ns")


Number of cells is: 10115631                                                    
Elapsed time: 1122352589690ns
t0 = 450773397458260
model_grids_withIndex = MapPartitionsRDD[52] at map at <console>:166
model_grids = MapPartitionsRDD[54] at values at <console>:169
model_tile0_index = MapPartitionsRDD[57] at flatMap at <console>:171
model_cells_size = 10115631
t1 = 451895750047950
Out[7]:
451895750047950

Matrix

We need to transpose the matrix so that each row holds the values of one cell across the years, and not the values of one year across all cells. With a GeoTiff representing a single year, the loaded data looks like this:

bands_RDD.map(s => Vectors.dense(s)).cache()

//The vectors are rows and therefore the matrix will look like this:
[
Vectors.dense(0.0, 1.0, 2.0),
Vectors.dense(3.0, 4.0, 5.0),
Vectors.dense(6.0, 7.0, 8.0),
Vectors.dense(9.0, 0.0, 1.0)
]

To achieve that we convert the RDD[Vector] into a distributed matrix, a CoordinateMatrix, which has a transpose method.
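
A minimal sketch of this transpose with a tiny hypothetical 2x3 matrix (the cells below do the same with sparse vectors over millions of cells):

import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.linalg.distributed.{CoordinateMatrix, MatrixEntry, RowMatrix}

// Hypothetical 2x3 row matrix: 2 years x 3 cells.
val rows = sc.parallelize(Seq(
  Vectors.dense(0.0, 1.0, 2.0),
  Vectors.dense(3.0, 4.0, 5.0)))
val mat = new RowMatrix(rows)

// Expand into (row, column, value) entries, transpose, and read the result back row by row.
val entries = mat.rows.zipWithIndex.flatMap { case (row, rowIndex) =>
  row.toArray.zipWithIndex.map { case (value, colIndex) =>
    MatrixEntry(rowIndex, colIndex, value) } }
val transposed = new CoordinateMatrix(entries).transpose()

// Now 3 rows (one per cell), each holding that cell's 2 yearly values.
val perCell = transposed.toIndexedRowMatrix().rows.sortBy(_.index).map(_.vector)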

Satellite data


In [8]:
t0 = System.nanoTime()
//Global variables
var satellite_matrix: RDD[mllib.linalg.Vector] = sc.emptyRDD
val satellite_cells_sizeB = sc.broadcast(satellite_cells_size)
if (satellite_matrix_offline_mode) {
  satellite_matrix = sc.objectFile(satellite_matrix_path)
} else {
  val mat :RowMatrix = new RowMatrix(satellite_grids_RDD.map(m => m.zipWithIndex).map(m => m.filter(!_._1.isNaN)).map(m => Vectors.sparse(satellite_cells_sizeB.value.toInt, m.map(v => v._2), m.map(v => v._1))))
  // Split the matrix into one number per line.
  val byColumnAndRow = mat.rows.zipWithIndex.map {
    case (row, rowIndex) => row.toArray.zipWithIndex.map {
      case (number, columnIndex) => new MatrixEntry(rowIndex, columnIndex, number)
    }
  }.flatMap(x => x)

  val matt: CoordinateMatrix = new CoordinateMatrix(byColumnAndRow)
  val matt_T = matt.transpose()

  satellite_matrix = matt_T.toIndexedRowMatrix().rows.sortBy(_.index).map(_.vector)
  if (save_matrix) {
      satellite_matrix.saveAsObjectFile(satellite_matrix_path)
  }
}

t1 = System.nanoTime()
println("Elapsed time: " + (t1 - t0) + "ns")


[Stage 15:====================================================>(995 + 5) / 1000]Elapsed time: 392356588007ns
t0 = 451898269441517
satellite_matrix = MapPartitionsRDD[75] at map at <console>:89
satellite_cells_sizeB = Broadcast(17)
t1 = 452290626029524
Out[8]:
452290626029524

Model Data


In [9]:
t0 = System.nanoTime()

//Global variables
var model_matrix: RDD[mllib.linalg.Vector] = sc.emptyRDD

val model_cells_sizeB = sc.broadcast(model_cells_size)
if (model_matrix_offline_mode) {
  model_matrix = sc.objectFile(model_matrix_path)
} else {
  val mat :RowMatrix = new RowMatrix(model_grids.map(m => m.zipWithIndex).map(m => m.filter(!_._1.isNaN)).map(m => Vectors.sparse(model_cells_sizeB.value.toInt, m.map(v => v._2), m.map(v => v._1))))
  // Split the matrix into one number per line.
  val byColumnAndRow = mat.rows.zipWithIndex.map {
    case (row, rowIndex) => row.toArray.zipWithIndex.map {
      case (number, columnIndex) => new MatrixEntry(rowIndex, columnIndex, number)
    }
  }.flatMap(x => x)

  val matt: CoordinateMatrix = new CoordinateMatrix(byColumnAndRow)
  val matt_T = matt.transpose()

  model_matrix = matt_T.toIndexedRowMatrix().rows.sortBy(_.index).map(_.vector)
  
  if (save_matrix) {
      model_matrix.saveAsObjectFile(model_matrix_path)
  }
}

t1 = System.nanoTime()
println("Elapsed time: " + (t1 - t0) + "ns")


[Stage 19:====================================================>(995 + 5) / 1000]Elapsed time: 428822240188ns
t0 = 452292916348290
model_matrix = MapPartitionsRDD[93] at map at <console>:91
model_cells_sizeB = Broadcast(22)
t1 = 452721738588478
Out[9]:
452721738588478

Validation


In [10]:
var model_matrix_rows = model_matrix.count()
var satellite_matrix_rows = satellite_matrix.count()

if (model_matrix_rows != satellite_matrix_rows) {
  println("For correlation it is necessary to have a matrix with same number of rows and columns!!!")
  println("Model matrix has " + model_matrix_rows + " rows while satellite matrix has " + satellite_matrix_rows + " rows!!!")
  //System.exit(0)
}


[Stage 25:==================================================> (964 + 36) / 1000]
model_matrix_rows = 10115631
satellite_matrix_rows = 10115631
Out[10]:
10115631

In [11]:
val satellite :RDD[(Long, Array[Double])] = satellite_matrix.zipWithIndex().map{ case (v, i) => (i,v.toArray)}
val model :RDD[(Long, Array[Double])] = model_matrix.zipWithIndex().map{case (v,i) => (i, v.toArray)}
val satellite_model = satellite.join(model)
val satellite_model_V = satellite_model.map{ case (a,(b, c)) => (a, (new breeze.linalg.DenseVector[Double](b), new breeze.linalg.DenseVector[Double](c)))}


[Stage 31:=====================================================>(998 + 1) / 999]
satellite = MapPartitionsRDD[95] at map at <console>:62
model = MapPartitionsRDD[97] at map at <console>:63
satellite_model = MapPartitionsRDD[100] at join at <console>:64
satellite_model_V = MapPartitionsRDD[101] at map at <console>:65
Out[11]:
MapPartitionsRDD[101] at map at <console>:65

In [12]:
val satellite_yearsB = sc.broadcast(satellite_years)
val satellite_model_corr = satellite_model_V.map{ case (i, (b, a)) => ( i, 
    {
        val isLeapYear = (year: Int) => (((year % 4) == 0) && !(
          ((year % 100) == 0) &&
            !((year % 400) == 0))
          )
        //a is model, b is satellite (the model has an extra year)
        var n = a.length
        val aC :Array[Double] = new Array[Double](n*2)
        val bC = new Array[Double](n*2)
        
        for (i <- 0 until (n*2)) {
          aC(i) = Double.NaN
          bC(i) = Double.NaN
        }
        
        var k = 0
        for (i <- 0 until (n)) {
          aC(k) = a(i)
          aC(k+1) = a(i)
          if (i > 0)
              bC(k) = b(i-1)
          k += 2
        }

        //Let's shift the negative days
        var neg_cnt = 0
        for (i <- 1 until (n)) {
          if (bC(i*2) < 0) {
            neg_cnt += 1
            if (isLeapYear(satellite_yearsB.value(i-1))) {
              bC((i * 2) - 1) = 367 - bC(i * 2)  
            } else {
              bC((i * 2) - 1) = 366 - bC(i * 2)  
            }      
            bC(i*2) = Double.NaN
          } //else { //American system starts counting at 0 days
            //  bC(i*2) = bC(i*2) + 1
          //}
        }

        for (i <- 0 until (n*2)) {
          if (bC(i).isNaN)
            aC(i) = Double.NaN
        }

        val aC_NoNAN = new breeze.linalg.DenseVector[Double](aC.filter(!_.isNaN))
        val bC_NoNAN = new breeze.linalg.DenseVector[Double](bC.filter(!_.isNaN))

        if (aC_NoNAN.length > bC_NoNAN.length) {
            assert(false)
        }
         
        //Correlation
        n = aC_NoNAN.length
        val ameanavar = meanAndVariance(aC_NoNAN)
        val amean = ameanavar.mean
        val avar = ameanavar.variance
        val bmeanbvar = meanAndVariance(bC_NoNAN)
        val bmean = bmeanbvar.mean
        val bvar = bmeanbvar.variance
        val astddev = math.sqrt(avar)
        val bstddev = math.sqrt(bvar)

        ((1.0 / (n - 1.0) * sum( ((aC_NoNAN - amean) / astddev) :* ((bC_NoNAN - bmean) / bstddev) )), n , neg_cnt)
})
}.sortByKey()


[Stage 38:====================================================>(998 + 2) / 1000]
satellite_yearsB = Broadcast(33)
satellite_model_corr = ShuffledRDD[105] at sortByKey at <console>:136
Out[12]:
ShuffledRDD[105] at sortByKey at <console>:136

Build GeoTiff with the correlation values

The grid with the correlation values is stored in a single-band GeoTiff and uploaded to HDFS.

Assign a correlation value to each grid cell and save the grid as a single-band GeoTiff

To assign a correlation value to each grid cell it is necessary to know the indices of the grid cells the values belong to. The process is not straightforward because the array of doubles used to create each dense vector does not contain the NaN values; therefore, there is no direct mapping between the indices in the Tile's grid and the ones in the correlation result.

To join the two RDDs, the approach was taken from a Stack Overflow post on how to perform basic joins of two RDD tables in Spark using Python.
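
A minimal sketch of that join pattern (with tiny hypothetical RDDs; the cells below apply it to model_grid0_index and the correlation results):

// Hypothetical: results are indexed 0..n-1 over the valid (non-masked) cells only,
// while validCellIndices records which position in the full grid each valid cell has.
val results = sc.parallelize(Seq((0L, 0.8), (1L, 0.3)))                  // (result_index, value)
val validCellIndices = sc.parallelize(Seq(2L, 5L))                       // grid positions of valid cells
  .zipWithIndex().map { case (gridIdx, resIdx) => (resIdx, gridIdx) }    // (result_index, grid_index)

// Join on result_index, then key each value by its position in the full grid.
val valuesOnGrid = results.join(validCellIndices)
  .map { case (_, (value, gridIdx)) => (gridIdx, value) }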

Correlation


In [13]:
val corr_res_b = satellite_model_corr.map{ case (a, (b,c,d)) => (a, b)}
//Merge two RDDs, one containing the clusters_ID indices and the other one the indices of a Tile's grid cells
var corr_cell_pos = corr_res_b.join(model_grid0_index.zipWithIndex().map{ case (v,i) => (i,v)}).map{ case (k,(v,i)) => (v,i)}

//Associate a Cluster_IDs to respective Grid_cell
var grid_corr = model_grid0.map{ case (i, v) => if (v == -1000) (i,Double.NaN) else (i,v)}.leftOuterJoin(corr_cell_pos.map{ case (c,i) => (i.toLong, c)})

//Convert all None to NaN
var grid_corr_res = grid_corr.sortByKey(true).map{case (k, (v, c)) => if (c == None) (k, Double.NaN) else (k, c.get)}

//Define a Tile
var corr_cells :Array[Double] = grid_corr_res.values.collect()
var corr_cellsD = DoubleArrayTile(corr_cells, num_cols_rows._1, num_cols_rows._2)

var geoTif = new SinglebandGeoTiff(corr_cellsD, projected_extent.extent, projected_extent.crs, Tags.empty, GeoTiffOptions(compression.DeflateCompression))

//Save to /tmp/
GeoTiffWriter.write(geoTif, corr_tif_tmp)

//Upload to HDFS

corr_tif = out_path + satellite_dir + "_" + model_dir + ".tif"
var cmd = "hadoop dfs -copyFromLocal -f " + corr_tif_tmp + " " + corr_tif
Process(cmd)!

//Remove from /tmp/
cmd = "rm -fr " + corr_tif_tmp
Process(cmd)!


[Stage 64:====================================================>(999 + 1) / 1000]DEPRECATED: Use of this script to execute hdfs command is deprecated.
Instead use the hdfs command for it.

corr_res_b = MapPartitionsRDD[106] at map at <console>:82
corr_cell_pos = MapPartitionsRDD[112] at map at <console>:84
grid_corr = MapPartitionsRDD[117] at leftOuterJoin at <console>:87
grid_corr_res = MapPartitionsRDD[121] at map at <console>:90
corr_cells = Array(NaN, NaN, NaN, NaN, NaN, NaN, NaN, NaN, NaN, NaN, NaN, NaN, NaN, NaN, NaN, NaN, NaN, NaN, NaN, NaN, NaN, NaN, NaN, NaN, NaN, NaN, NaN, NaN, NaN, NaN, NaN, NaN, NaN, NaN, NaN, NaN, NaN, NaN, NaN, NaN, NaN, NaN, NaN, NaN, NaN, NaN, NaN, NaN, NaN, NaN, NaN, NaN, NaN, NaN, NaN, NaN, NaN, NaN, NaN, NaN, NaN, NaN, NaN, NaN, NaN, NaN, ...
warning: there were two feature warnings; re-run with -feature for details

Array sizes


In [14]:
val corr_res_c = satellite_model_corr.map{ case (a, (b,c,d)) => (a, c)}
//Merge two RDDs, one containing the clusters_ID indices and the other one the indices of a Tile's grid cells
corr_cell_pos = corr_res_c.join(model_grid0_index.zipWithIndex().map{ case (v,i) => (i,v)}).map{ case (k,(v,i)) => (v,i)}

//Associate a Cluster_IDs to respective Grid_cell
grid_corr = model_grid0.map{ case (i, v) => if (v == -1000) (i,Double.NaN) else (i,v)}.leftOuterJoin(corr_cell_pos.map{ case (c,i) => (i.toLong, c)})

//Convert all None to NaN
grid_corr_res = grid_corr.sortByKey(true).map{case (k, (v, c)) => if (c == None) (k, Double.NaN) else (k, c.get)}

//Define a Tile
corr_cells = grid_corr_res.values.collect()
corr_cellsD = DoubleArrayTile(corr_cells, num_cols_rows._1, num_cols_rows._2)

geoTif = new SinglebandGeoTiff(corr_cellsD, projected_extent.extent, projected_extent.crs, Tags.empty, GeoTiffOptions(compression.DeflateCompression))

//Save to /tmp/
GeoTiffWriter.write(geoTif, corr_tif_tmp)

//Upload to HDFS
corr_tif = out_path + "len_" + satellite_dir + "_" + model_dir + ".tif"
cmd = "hadoop dfs -copyFromLocal -f " + corr_tif_tmp + " " + corr_tif
Process(cmd)!

//Remove from /tmp/
cmd = "rm -fr " + corr_tif_tmp
Process(cmd)!


[Stage 90:====================================================>(999 + 1) / 1000]DEPRECATED: Use of this script to execute hdfs command is deprecated.
Instead use the hdfs command for it.

corr_res_c = MapPartitionsRDD[123] at map at <console>:91
corr_cell_pos = MapPartitionsRDD[129] at map at <console>:93
grid_corr = MapPartitionsRDD[134] at leftOuterJoin at <console>:96
grid_corr_res = MapPartitionsRDD[138] at map at <console>:99
corr_cells = [D@113836f5
corr_cellsD = DoubleConstantNoDataArrayTile([D@113836f5,7808,3892)
geoTif = SinglebandGeoTiff(DoubleConstantNoDataArrayTile([D@113836f5,7808,3892),Extent(-126.30312894720473, 14.29219617034159, -56.162671563152486, 49.25462702827337),geotrellis...
warning: there were two feature warnings; re-run with -feature for details
Out[14]:
SinglebandGeoTiff(DoubleConstantNoDataArrayTile([D@113836f5,7808,3892),Extent(-126.30312894720473, 14.29219617034159, -56.162671563152486, 49.25462702827337),geotrellis.proj4.CRS$$anon$3@41d0d1b7,Tags(Map(),List()),GeoTiffOptions(geotrellis.raster.io.geotiff.Striped@6076e357,geotrellis.raster.io.geotiff.compression.DeflateCompression$@5fcf830,1,None))

Number of Negatives


In [15]:
val corr_res_d = satellite_model_corr.map{ case (a, (b,c,d)) => (a, d)}
//Merge two RDDs, one containing the clusters_ID indices and the other one the indices of a Tile's grid cells
corr_cell_pos = corr_res_d.join(model_grid0_index.zipWithIndex().map{ case (v,i) => (i,v)}).map{ case (k,(v,i)) => (v,i)}

//Associate a Cluster_IDs to respective Grid_cell
grid_corr = model_grid0.map{ case (i, v) => if (v == -1000) (i,Double.NaN) else (i,v)}.leftOuterJoin(corr_cell_pos.map{ case (c,i) => (i.toLong, c)})

//Convert all None to NaN
grid_corr_res = grid_corr.sortByKey(true).map{case (k, (v, c)) => if (c == None) (k, Double.NaN) else (k, c.get.toDouble)}

//Define a Tile
corr_cells = grid_corr_res.values.collect()
corr_cellsD = DoubleArrayTile(corr_cells, num_cols_rows._1, num_cols_rows._2)

geoTif = new SinglebandGeoTiff(corr_cellsD, projected_extent.extent, projected_extent.crs, Tags.empty, GeoTiffOptions(compression.DeflateCompression))

//Save to /tmp/
corr_tif = out_path + "neg_" + satellite_dir + "_" + model_dir + ".tif"
GeoTiffWriter.write(geoTif, corr_tif_tmp)

//Upload to HDFS
cmd = "hadoop dfs -copyFromLocal -f " + corr_tif_tmp + " " + corr_tif
Process(cmd)!

//Remove from /tmp/
cmd = "rm -fr " + corr_tif_tmp
Process(cmd)!


[Stage 116:===================================================>(999 + 1) / 1000]DEPRECATED: Use of this script to execute hdfs command is deprecated.
Instead use the hdfs command for it.

corr_res_d = MapPartitionsRDD[140] at map at <console>:91
corr_cell_pos = MapPartitionsRDD[146] at map at <console>:93
grid_corr = MapPartitionsRDD[151] at leftOuterJoin at <console>:96
grid_corr_res = MapPartitionsRDD[155] at map at <console>:99
corr_cells = [D@5bf6358f
corr_cellsD = DoubleConstantNoDataArrayTile([D@5bf6358f,7808,3892)
geoTif = SinglebandGeoTiff(DoubleConstantNoDataArrayTile([D@5bf6358f,7808,3892),Extent(-126.30312894720473, 14.29219617034159, -56.162671563152486, 49.25462702827337),geotrellis...
warning: there were two feature warnings; re-run with -feature for details
Out[15]:
SinglebandGeoTiff(DoubleConstantNoDataArrayTile([D@5bf6358f,7808,3892),Extent(-126.30312894720473, 14.29219617034159, -56.162671563152486, 49.25462702827337),geotrellis.proj4.CRS$$anon$3@41d0d1b7,Tags(Map(),List()),GeoTiffOptions(geotrellis.raster.io.geotiff.Striped@6076e357,geotrellis.raster.io.geotiff.compression.DeflateCompression$@5fcf830,1,None))