In [2]:
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
import geotrellis.spark.io.hadoop._
import org.apache.hadoop.io._
import scala.sys.process._
In [3]:
print("test")
In [17]:
// HDFS paths for the TSF_process executable, its settings file and the list of input images
val _dir_path_hdfs = "hdfs:///user/hadoop/"
val file_name = "TSF_process.x64"
val file_name_dir: String = _dir_path_hdfs + file_name
val file_name_dir_hdfs = new org.apache.hadoop.fs.Path(file_name_dir)
val file_name_settings = "200_200_px_USA_cluster.set"
val file_name_dir_settings: String = _dir_path_hdfs + file_name_settings
val file_name_settings_hdfs = new org.apache.hadoop.fs.Path(file_name_dir_settings)
val file_name_list_files = "listfile.txt"
val file_name_dir_list_files: String = _dir_path_hdfs + "Img_only/" + file_name_list_files
val file_name_list_hdfs = new org.apache.hadoop.fs.Path(file_name_dir_list_files)
// Check that the executable and the settings file are present on HDFS
val conf = sc.hadoopConfiguration
val fs = org.apache.hadoop.fs.FileSystem.get(conf)
if (fs.exists(file_name_dir_hdfs) && fs.exists(file_name_settings_hdfs)) {
  println("The file " + file_name_dir + " was found! :))")
  println("The file " + file_name_settings_hdfs + " was found! :))")
} else {
  println("Put the files in user/hadoop/mycluster0 on HDFS")
}
// Connect to Spark
val appName = "phenology_timesat"
val masterURL = "spark://mycluster0.mydomain:7077"
// A context is created only if one does not already exist
// (getOrCreate returns the kernel's existing context instead of failing with "only one SparkContext")
val conf_spark = new SparkConf().setAppName(appName).setMaster(masterURL)
val sc_spark = SparkContext.getOrCreate(conf_spark)
// Test
// val info = List(("viktor", 24), ("joe", 30), ("jack", 30))
// val infoRDD = sc_spark.parallelize(info)
// infoRDD.collect().foreach(println)
def update_list_path_location_on_worker(): Unit = {
  // Ship the image list to the executors; for each job Spark creates a new set of temporary files
  sc_spark.addFile(file_name_dir_list_files)
  val sparkListPath: String = org.apache.spark.SparkFiles.get(file_name_list_files)
  println(sparkListPath)
  // Dynamically change the list_of_files path for the worker in the settings file (its line number 6)
  val settings: RDD[String] = sc_spark.textFile(file_name_dir_settings)
  val filteredRdd = settings.zipWithIndex().map {
    case (line, idx) => if (idx == 5) sparkListPath + " %Data" else line
  }
  // Delete a previous "_new" output directory, if any, before writing the updated settings
  val newSettingsPath = new org.apache.hadoop.fs.Path(file_name_dir_settings + "_new")
  if (fs.exists(newSettingsPath)) {
    fs.delete(newSettingsPath, true)
  }
  filteredRdd.coalesce(1).saveAsTextFile(file_name_dir_settings + "_new")
}
update_list_path_location_on_worker()
// METHODOLOGY:
// The basic idea:
// put the executable into HDFS and use addFile to add it to the driver,
// which will also copy it to the workers.
// Execute it (use SparkFiles.get to obtain the path on the worker executor) on that partition using Process.
// To control the input partitioning:
// load the data-partitioning file as an RDD and use mapPartitionsWithIndex to save each partition
// (see the sketch below).
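// --- Sketch (not in the original notebook) of the mapPartitionsWithIndex idea described above.
// Each partition of the image list is saved to a local file on the worker, and the executable is
// invoked there via scala.sys.process. It assumes the addFile calls below have already shipped
// TSF_process.x64 and the rewritten settings file; the helper name, the temp-file layout and the
// third command-line argument (mirroring the "1" used further down) are assumptions for illustration.
def runExecutablePerPartition(imageList: RDD[String]): RDD[Int] = {
  imageList.mapPartitionsWithIndex { (idx, part) =>
    val exePath      = org.apache.spark.SparkFiles.get(file_name)     // executable shipped with addFile
    val settingsPath = org.apache.spark.SparkFiles.get("part-00000")  // rewritten settings shipped with addFile
    // "Save each partition": write this partition's lines to a local file on the worker
    val localList = java.io.File.createTempFile(s"img_list_part_${idx}_", ".txt")
    val writer = new java.io.PrintWriter(localList)
    part.foreach(line => writer.println(line))
    writer.close()
    // Run the external process for this chunk and return its exit code
    val exitCode = scala.sys.process.Process(Seq(exePath, settingsPath, (idx + 1).toString)).!
    Iterator(exitCode)
  }
}
// Hypothetical driver-side call:
// runExecutablePerPartition(sc_spark.textFile(file_name_dir_list_files, 4)).collect().foreach(println)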
// Ship the executable and the rewritten settings file to the workers
sc_spark.addFile(file_name_dir)
sc_spark.addFile(file_name_dir_settings + "_new/part-00000") // ugly, but saveAsTextFile always writes part-xxxxx files
val sparkScriptPath: String = org.apache.spark.SparkFiles.get(file_name)
val sparkSettingsPath: String = org.apache.spark.SparkFiles.get("part-00000")
// Execute the external system call (here it runs on the driver)
val exitCode = Seq(sparkScriptPath, sparkSettingsPath, "1").!
// TODO: How can each of the workers access the .img files on HDFS?
// Transport each file to the worker with .addFile? (one alternative is sketched below)
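// --- Sketch (not in the original notebook): one possible answer to the TODO above.
// Instead of shipping every image with addFile, the workers can read the .img files straight from
// HDFS with sc.binaryFiles and stage them on local scratch space before calling the executable.
// Assumptions: the .img files sit next to listfile.txt in Img_only/, and the local staging logic
// (temp files, returned paths) is for illustration only.
def stageImagesLocally(): RDD[String] = {
  sc_spark.binaryFiles(_dir_path_hdfs + "Img_only/*.img").map { case (hdfsPath, content) =>
    // Write the image bytes to a local temporary file on the worker
    val localFile = java.io.File.createTempFile("staged_img_", ".img")
    val out = new java.io.FileOutputStream(localFile)
    out.write(content.toArray())   // PortableDataStream -> Array[Byte]
    out.close()
    localFile.getAbsolutePath      // local path, e.g. to rebuild listfile.txt per worker
  }
}
// stageImagesLocally().collect().foreach(println)  // hypothetical check of the staged local paths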