Create CSV file

This notebook shows how to load an array of triples stored as an objectFile and save it again as a CSV file.
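For context, a minimal sketch of how such an objectFile of (Int, Int, Double) triples could be produced in the first place (the RDD contents and the example path below are purely illustrative, not the pipeline that actually wrote the data):

// hypothetical example: persist an RDD of (Int, Int, Double) triples as an objectFile
val triples = sc.parallelize(Seq((10, 75, 3.5e11), (20, 75, 3.1e11), (30, 75, 2.9e11)))
triples.saveAsObjectFile("hdfs:///tmp/wssse_example")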

Dependencies


In [1]:
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
import geotrellis.spark.io.hadoop._

import org.apache.hadoop.io._
import org.apache.hadoop.io.{IOUtils, SequenceFile}

Configuration


In [2]:
var offline_dir_path = "hdfs:///user/pheno/avhrr/"
//var offline_dir_path = "hdfs:///user/pheno/spring-index/"
var geoTiff_dir = "SOST"
//var geoTiff_dir = "LeafFinal"
var wssse_path :String = offline_dir_path + geoTiff_dir + "/75_wssse"
var wssse_csv_path :String = offline_dir_path + geoTiff_dir + "/75_wssse.csv"

var conf = sc.hadoopConfiguration
var fs = org.apache.hadoop.fs.FileSystem.get(conf)

// saveAsTextFile fails if the output path already exists, so remove any previous run's output
if (fs.exists(new org.apache.hadoop.fs.Path(wssse_csv_path))) {
    println("The file " + wssse_csv_path + " already exists, it will be deleted.")
    try { fs.delete(new org.apache.hadoop.fs.Path(wssse_csv_path), true) } catch { case _ : Throwable => { } }
}


Waiting for a Spark session to start...
offline_dir_path = hdfs:///user/pheno/avhrr/
geoTiff_dir = SOST
wssse_path = hdfs:///user/pheno/avhrr/SOST/75_wssse
wssse_csv_path = hdfs:///user/pheno/avhrr/SOST/75_wssse.csv
conf = Configuration: core-default.xml, core-site.xml, mapred-default.xml, mapred-site.xml, yarn-default.xml, yarn-site.xml, hdfs-default.xml, hdfs-site.xml, file:/usr/lib/spark-2.1.1-bin-without-hadoop/conf/hive-site.xml
fs = DFS[DFSClient[clientName=DFSClient_NONMAPREDUCE_475836789_36, ugi=pheno (auth:SIMPLE)]]
Out[2]:
()
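If the same check-and-delete pattern is needed for more than one output path, it could be factored into a small helper; a minimal sketch (the name deleteIfExists is only illustrative, not part of the original notebook):

// hypothetical helper: delete an HDFS path if it already exists
def deleteIfExists(fs: org.apache.hadoop.fs.FileSystem, path: String): Unit = {
  val p = new org.apache.hadoop.fs.Path(path)
  if (fs.exists(p)) {
    println("The file " + path + " already exists, it will be deleted.")
    fs.delete(p, true)
  }
}

deleteIfExists(fs, wssse_csv_path)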

Load Data


In [3]:
var wssse_data :RDD[(Int, Int, Double)] = sc.emptyRDD

// load the previously saved RDD of (Int, Int, Double) triples from disk
if (fs.exists(new org.apache.hadoop.fs.Path(wssse_path))) {
    wssse_data = sc.objectFile(wssse_path)
    println(wssse_data.collect().toList)
}


List((10,75,3.543594088100406E11), (20,75,3.1164671707882837E11), (30,75,2.891401201066724E11), (40,75,2.7659957276233606E11), (50,75,2.6393510342638516E11), (60,75,2.5526775470250888E11), (70,75,2.474527242908672E11), (80,75,2.4158928746856482E11), (90,75,2.3546354225689584E11), (100,75,2.3163178150354764E11), (110,75,2.2620214478763037E11), (120,75,2.231121099726902E11), (130,75,2.1968520531267715E11), (140,75,2.1637970796204584E11), (150,75,2.1311835641170624E11), (160,75,2.100999719988994E11), (170,75,2.0805760370421555E11), (180,75,2.0606506045153506E11), (190,75,2.034304514480542E11), (200,75,2.0235881533249454E11), (210,75,1.9951537144941003E11), (220,75,1.9824697995193225E11), (230,75,1.9573530305860495E11), (240,75,1.94558876227357E11), (250,75,1.925478193220847E11), (260,75,1.908033219040186E11), (270,75,1.8954723146873877E11), (280,75,1.8836500863517624E11), (290,75,1.8699108636352213E11), (300,75,1.858526017795722E11), (310,75,1.8423297708310217E11), (320,75,1.8317663056064175E11), (330,75,1.823668408364441E11), (340,75,1.8115073571288522E11), (350,75,1.8020940315103592E11), (360,75,1.7874518307706036E11), (370,75,1.7821354947981213E11), (380,75,1.770531429434101E11), (390,75,1.7625400513016193E11), (400,75,1.7508245702737253E11), (410,75,1.74471484549489E11), (420,75,1.7334350000927664E11), (430,75,1.7247372967251416E11), (440,75,1.718031362254259E11), (450,75,1.7092756856140845E11), (460,75,1.703876445560928E11), (470,75,1.6950616282129953E11), (480,75,1.6886305610852612E11), (490,75,1.684841457440525E11), (500,75,1.6772424497378024E11))
wssse_data = MapPartitionsRDD[2] at objectFile at <console>:49
Out[3]:
MapPartitionsRDD[2] at objectFile at <console>:49
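The element type of objectFile is inferred above from the declared type of wssse_data; it can also be given explicitly, which makes the intent clearer when reading the cell in isolation. A small sketch (preview is just an illustrative name):

// explicitly typed read plus a quick look at the first few triples
val preview: RDD[(Int, Int, Double)] = sc.objectFile[(Int, Int, Double)](wssse_path)
preview.sortBy(_._1).take(5).foreach(println)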

Save Data


In [4]:
// repartition to a single partition so the output is one part file, sort by the first element,
// and format each triple as a comma-separated line
val wssse = wssse_data.repartition(1).sortBy(_._1).map{case (a,b,c) => Array(a,b,c).mkString(",")}
wssse.saveAsTextFile(wssse_csv_path)


wssse = MapPartitionsRDD[10] at map at <console>:43
Out[4]:
MapPartitionsRDD[10] at map at <console>:43
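Note that saveAsTextFile writes a directory of part files (a single part-00000 here because of repartition(1)), not one plain .csv file. If a single file is required, one option on Hadoop 2.x is FileUtil.copyMerge (removed in Hadoop 3); a minimal sketch, with a hypothetical merged output path:

import org.apache.hadoop.fs.{FileUtil, Path}

// merge the part files of the CSV output directory into one plain file (illustrative path)
val merged_csv_path = offline_dir_path + geoTiff_dir + "/75_wssse_merged.csv"
FileUtil.copyMerge(fs, new Path(wssse_csv_path), fs, new Path(merged_csv_path), false, conf, null)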
