In [2]:
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
import geotrellis.spark.io.hadoop._
import org.apache.hadoop.io._
import sys.process._

In [3]:
print("test")


test

Configuration


In [17]:
var _dir_path_hdfs = "hdfs:///user/hadoop/"
var file_name = "TSF_process.x64"
var file_name_dir :String = _dir_path_hdfs + file_name
var file_name_dir_hdfs = new org.apache.hadoop.fs.Path(file_name_dir)

var file_name_settings = "200_200_px_USA_cluster.set"
var file_name_dir_settings :String = _dir_path_hdfs + file_name_settings
var file_name_settings_hdfs = new org.apache.hadoop.fs.Path(file_name_dir_settings)

var file_name_list_files = "listfile.txt"
var file_name_dir_list_files :String = _dir_path_hdfs + "Img_only/" + file_name_list_files
var file_name_list_hdfs = new org.apache.hadoop.fs.Path(file_name_dir_list_files)

var conf = sc.hadoopConfiguration
var fs = org.apache.hadoop.fs.FileSystem.get(conf)

if (fs.exists(file_name_dir_hdfs) && fs.exists(file_name_settings_hdfs)) {
    println("The file " + file_name_dir + " is found! :))")
    println("The file " + file_name_settings_hdfs + " is found! :))")
} else {
    println("Put the files into /user/hadoop/ on the mycluster0 HDFS")
}

// Connect to Spark
var appName = "phenology_timesat"
var masterURL = "spark://mycluster0.mydomain:7077"

// A context is created only if one does not already exist
val conf_spark = new SparkConf().setAppName(appName).setMaster(masterURL)
val sc_spark = SparkContext.getOrCreate(conf_spark)


// Test
// val info = List(("viktor", 24), ("joe", 30), ("jack", 30))
// val infoRDD = sc_spark.parallelize(info)
// infoRDD.collect().foreach(println)

def update_list_path_location_on_worker(): Unit = {

    sc_spark.addFile(file_name_dir_list_files)

    val sparkListPath: String = org.apache.spark.SparkFiles.get(file_name_list_files)
    println(sparkListPath)

    // For each job Spark creates a new set of temporary files, so dynamically rewrite
    // the list_of_files path for the worker in the settings file (line 6).
    val settings: RDD[String] = sc_spark.textFile(file_name_dir_settings)

    val updatedSettings = settings.zipWithIndex().map { case (line, idx) =>
        if (idx == 5) sparkListPath + " %Data" else line
    }

    // Remove a previous "_new" output directory, if any, before writing the updated settings
    val newSettingsPath = new org.apache.hadoop.fs.Path(file_name_dir_settings + "_new")
    if (fs.exists(newSettingsPath)) {
        fs.delete(newSettingsPath, true)
    }

    updatedSettings.coalesce(1).saveAsTextFile(file_name_dir_settings + "_new")
}

update_list_path_location_on_worker()



// METHODOLOGY:
// The basic idea:
// Put the executable into HDFS and register it with addFile on the driver,
// which also copies it to the workers.
// On each worker, resolve the local copy with SparkFiles.get and run it via Process.
// To control the input partitioning:
// load the list of data files as an RDD and use mapPartitionsWithIndex to save each
// partition's file list for a separate TSF_process run (see the sketch below).
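// --- Sketch only (assumptions: the helper name runTsfOnPartitions and the numSlices
// --- parameter are illustrative, not part of the original notebook). Each partition
// --- writes its slice of listfile.txt to the worker's local disk and shells out to the
// --- executable shipped via addFile. A full version would also rewrite line 6 of the
// --- settings file to point at that local list before calling TSF_process.
def runTsfOnPartitions(numSlices: Int): RDD[Int] = {
    val imageList: RDD[String] = sc_spark.textFile(file_name_dir_list_files, numSlices)
    imageList.mapPartitionsWithIndex { (idx, files) =>
        // save this partition's file list on the worker
        val localList = java.nio.file.Files.createTempFile(s"listfile_part_$idx", ".txt")
        java.nio.file.Files.write(localList, files.mkString("\n").getBytes("UTF-8"))
        // resolve the local copies shipped by addFile and run the external process
        val exe      = org.apache.spark.SparkFiles.get(file_name)
        val settings = org.apache.spark.SparkFiles.get("part-00000")
        Iterator(Seq(exe, settings, "1").!)
    }
}
// runTsfOnPartitions(4).collect()  // would trigger one TSF_process run per partition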



sc_spark.addFile(file_name_dir)
sc_spark.addFile(file_name_dir_settings+ "_new/part-00000") //ugly

val sparkScriptPath: String =  org.apache.spark.SparkFiles.get(file_name)
val sparkSettingsPath: String =  org.apache.spark.SparkFiles.get("part-00000")

// Execute the external system call
val exitCode = Seq(sparkScriptPath,sparkSettingsPath, "1").!
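// Sketch only (assumption: runWithLogging is an illustrative helper, not part of the
// original notebook). Same external call, but with stdout/stderr captured through
// ProcessLogger so the TSF_process banner does not interleave with the notebook output.
def runWithLogging(exe: String, settings: String, nProc: Int): (Int, String, String) = {
    val out = new StringBuilder
    val err = new StringBuilder
    val code = Seq(exe, settings, nProc.toString) ! ProcessLogger(out append _, err append _)
    (code, out.toString, err.toString)
}
// e.g. val (code, out, err) = runWithLogging(sparkScriptPath, sparkSettingsPath, 1)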

// TODO: How can each worker access the .img files on HDFS?
// Transport each file to the worker with addFile, or pull them from HDFS on the worker itself? (sketch below)
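// --- Sketch only (assumptions: fetchImagesLocally is an illustrative helper meant to be
// --- called inside mapPartitions; a fresh Configuration is created on the worker because
// --- sc.hadoopConfiguration is not serializable, and the workers are assumed to have the
// --- Hadoop config on their classpath). Each worker copies the .img files it needs from
// --- HDFS to its local disk with the standard Hadoop FileSystem API.
def fetchImagesLocally(hdfsImagePaths: Iterator[String]): Iterator[String] = {
    val workerConf = new org.apache.hadoop.conf.Configuration()
    val hdfs = org.apache.hadoop.fs.FileSystem.get(new java.net.URI(_dir_path_hdfs), workerConf)
    val localDir = java.nio.file.Files.createTempDirectory("timesat_img")
    hdfsImagePaths.map { p =>
        val src = new org.apache.hadoop.fs.Path(p.trim)
        val dst = new org.apache.hadoop.fs.Path(localDir.resolve(src.getName).toString)
        hdfs.copyToLocalFile(false, src, dst)  // copy the image into the worker's temp dir
        dst.toString                           // local path that TSF_process can open
    }
}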


The file hdfs:///user/hadoop/TSF_process.x64 is found! :))
The file hdfs:/user/hadoop/200_200_px_USA_cluster.set is found! :))
/data/local/spark/tmp/spark-a2a6ed92-c91a-42e5-ab1f-584f005b9b01/userFiles-09308da0-35dc-4164-bd0a-b37d0d424897/listfile.txt
 ------------------------------------------------------------------------
   TSF_process
   Program for processing time-series data from images or ASCII files
   Arguments: settings_file no_of_processors

   TIMESAT version 3.3
   Copyright Per Jonsson and Lars Eklundh
   per.jonsson@mah.se, lars.eklundh@nateko.lu.se
   Feb. 2017
 ------------------------------------------------------------------------
  Error opening image file
 hdfs:///user/hadoop/Img_only/g2_BIOPAR_NDVI_201406010000_NOAM_PROBAV_V2.1_NDVI.img
_dir_path_hdfs = hdfs:///user/hadoop/
file_name = TSF_process.x64
file_name_dir = hdfs:///user/hadoop/TSF_process.x64
file_name_dir_hdfs = hdfs:/user/hadoop/TSF_process.x64
file_name_settings = 200_200_px_USA_cluster.set
file_name_dir_settings = hdfs:///user/hadoop/200_200_px_USA_cluster.set
file_name_settings_hdfs = hdfs:/user/hadoop/200_200_px_USA_cluster.set
file_name_list_files = listfile.txt
file_name_dir_list_files = hdfs:///user/hadoop/Img_only/listfile.txt
file_name_list_hdfs = hdfs:/user/hadoop/Img_only/listfile.txt
conf = Configuration: core-default.xml, core-site.xml, mapred-default.xml, mapred-site.xml,...
Out[17]:
Configuration: core-default.xml, core-site.xml, mapred-default.xml, mapred-site.xml, yarn-default.xml, yarn-site.xml, hdfs-default.xml, hdfs-site.xml, file:/usr/lib/spark-2.1.1-bin-without-hadoop/conf/hive-site.xml
