In [1]:
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
import geotrellis.spark.io.hadoop._
import org.apache.hadoop.io._
In [2]:
// Paths to the persisted WSSSE results and the CSV export derived from them.
val offline_dir_path = "hdfs:///user/pheno/spring-index/"
val geoTiff_dir = "BloomFinal"
val wssse_path: String = offline_dir_path + geoTiff_dir + "/wssse"
val wssse_csv_path: String = offline_dir_path + geoTiff_dir + "/wssse.csv"

// Delete any previous CSV output first: saveAsTextFile fails if the target path already exists.
val conf = sc.hadoopConfiguration
val fs = org.apache.hadoop.fs.FileSystem.get(conf)
if (fs.exists(new org.apache.hadoop.fs.Path(wssse_csv_path))) {
  println("The file " + wssse_csv_path + " already exists; deleting it.")
  try {
    fs.delete(new org.apache.hadoop.fs.Path(wssse_csv_path), true) // recursive delete
  } catch {
    case _: Throwable => // ignore delete failures; the write below will surface real errors
  }
}
Out[2]:
In [3]:
// Start from an empty RDD; if WSSSE results were persisted earlier, load them from HDFS.
var wssse_data: RDD[(Int, Int, Double)] = sc.emptyRDD
if (fs.exists(new org.apache.hadoop.fs.Path(wssse_path))) {
  wssse_data = sc.objectFile(wssse_path)
  println(wssse_data.collect().toList)
}
Out[3]:
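The object file loaded above pairs with Spark's saveAsObjectFile. As a minimal sketch of how wssse_path could have been produced in an earlier run (the tuple layout and the sample values below are assumptions, not taken from this notebook):

// Hypothetical earlier step: persist (Int, Int, Double) WSSSE tuples so that
// sc.objectFile can reload them later. The sample values are placeholders.
val sample: RDD[(Int, Int, Double)] = sc.parallelize(Seq((2, 75, 1.0e9), (3, 75, 8.5e8)))
sample.saveAsObjectFile(wssse_path)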
In [4]:
// Collapse to a single partition so the output is a single part file, sort by the
// first tuple field, and emit one comma-separated line per record.
val wssse = wssse_data.repartition(1).sortBy(_._1).map { case (a, b, c) => Array(a, b, c).mkString(",") }
wssse.saveAsTextFile(wssse_csv_path)
Out[4]:
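As a quick sanity check (not part of the original pipeline), the freshly written CSV directory can be read back with sc.textFile:

// Print the first few exported lines to verify the CSV content.
sc.textFile(wssse_csv_path).take(5).foreach(println)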