Create CSV file

This notebook shows how to load an RDD of triples stored as an object file and save it again as a CSV file.
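
Judging from the path name "wssse" used below and the values the notebook loads, each triple appears to be a k-means result of the form (k, iterations, WSSSE); that interpretation is an assumption, not something the data itself states. Each triple becomes one line of the CSV, as this small illustration shows (values taken from the Load Data output below):

// A single triple and the CSV line it turns into.
val example: (Int, Int, Double) = (2, 35, 1.3269493028574286E11)
println(Array(example._1, example._2, example._3).mkString(","))
// prints: 2,35,1.3269493028574286E11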

Dependencies


In [1]:
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
import geotrellis.spark.io.hadoop._

import org.apache.hadoop.io._

Configuration


In [2]:
var offline_dir_path = "hdfs:///user/pheno/spring-index/"
var geoTiff_dir = "BloomFinal"
var wssse_path: String = offline_dir_path + geoTiff_dir + "/wssse"
var wssse_csv_path: String = offline_dir_path + geoTiff_dir + "/wssse.csv"

var conf = sc.hadoopConfiguration
var fs = org.apache.hadoop.fs.FileSystem.get(conf)

// saveAsTextFile refuses to overwrite an existing path, so remove any previous result first.
if (fs.exists(new org.apache.hadoop.fs.Path(wssse_csv_path))) {
    println("The file " + wssse_csv_path + " already exists; deleting it.")
    try {
        fs.delete(new org.apache.hadoop.fs.Path(wssse_csv_path), true)
    } catch {
        case _: Throwable => // ignore; a failed delete will surface when saving
    }
}


Waiting for a Spark session to start...
offline_dir_path = hdfs:///user/pheno/spring-index/
geoTiff_dir = BloomFinal
wssse_path = hdfs:///user/pheno/spring-index/BloomFinal/wssse
wssse_csv_path = hdfs:///user/pheno/spring-index/BloomFinal/wssse.csv
conf = Configuration: core-default.xml, core-site.xml, mapred-default.xml, mapred-site.xml, yarn-default.xml, yarn-site.xml, hdfs-default.xml, hdfs-site.xml, file:/usr/lib/spark-2.1.1-bin-without-hadoop/conf/hive-site.xml
fs = DFS[DFSClient[clientName=DFSClient_NONMAPREDUCE_1047414169_36, ugi=pheno (auth:SIMPLE)]]
Out[2]:
DFS[DFSClient[clientName=DFSClient_NONMAPREDUCE_1047414169_36, ugi=pheno (auth:SIMPLE)]]

Load Data


In [3]:
var wssse_data: RDD[(Int, Int, Double)] = sc.emptyRDD

// Load the previously saved triples from disk, if present.
if (fs.exists(new org.apache.hadoop.fs.Path(wssse_path))) {
    wssse_data = sc.objectFile(wssse_path)
    println(wssse_data.collect().toList)
}


List((12,35,4.3297733110180664E10), (2,35,1.3269493028574286E11), (3,35,7.612110282892206E10), (4,35,5.192563731031121E10), (5,35,3.86547975867681E10), (6,35,3.1070608358128193E10), (7,35,2.657061805793561E10), (8,35,2.307391282797956E10), (9,35,2.0876037759553226E10), (10,35,1.9153577999634872E10), (11,35,1.8028667788894394E10), (12,35,1.7020487136558575E10), (13,35,1.671272347748397E10), (14,35,1.5865979019165756E10), (15,35,1.5225546057630272E10))
wssse_data = MapPartitionsRDD[2] at objectFile at <console>:49
Out[3]:
MapPartitionsRDD[2] at objectFile at <console>:49
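
For reference, an object file like the one loaded above is typically produced with RDD.saveAsObjectFile. A minimal sketch, assuming the same (Int, Int, Double) triples; the sample values here are illustrative:

// Build a small RDD of triples and serialize it as an object file,
// which sc.objectFile can read back (as done in the Load Data cell).
// Like saveAsTextFile, this fails if the target path already exists.
val results: RDD[(Int, Int, Double)] = sc.parallelize(Seq(
    (2, 35, 1.3269493028574286E11),
    (3, 35, 7.612110282892206E10)
))
results.saveAsObjectFile(wssse_path)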

Save Data


In [4]:
// Merge everything into a single partition so the output is one part file,
// sort by the first field, and format each triple as a comma-separated line.
val wssse = wssse_data.repartition(1).sortBy(_._1).map { case (a, b, c) => Array(a, b, c).mkString(",") }
wssse.saveAsTextFile(wssse_csv_path)


wssse = MapPartitionsRDD[10] at map at <console>:43
Out[4]:
MapPartitionsRDD[10] at map at <console>:43
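
Note that saveAsTextFile writes wssse.csv as a directory holding part files (a single part-00000 here, thanks to repartition(1)), not a flat file. A quick sanity check is to read the result back; a sketch, reusing the paths defined above:

// Read every part file under the output directory and parse the lines
// back into (Int, Int, Double) triples.
val csv_lines = sc.textFile(wssse_csv_path)
val parsed: RDD[(Int, Int, Double)] = csv_lines.map(_.split(",")).map {
    case Array(k, i, w) => (k.toInt, i.toInt, w.toDouble)
}
parsed.take(5).foreach(println)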
