In [ ]:
%dep
// Zeppelin dependency magic: pulls the spark-generic-connector artifact
// (Spark 2.x / Scala 2.11 build) from the repository before the Spark
// interpreter starts, so the sgc imports below resolve.
z.load("alvsanand:spark-generic-connector:0.2.0-spark_2x-s_2.11")
In [ ]:
import es.alvsanand.sgc.ftp.{FTPCredentials, FTPSlot}
import es.alvsanand.sgc.ftp.normal.{FTPSgcConnectorFactory, FTPParameters}
import java.text.SimpleDateFormat
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.sgc._
In [ ]:
// Connection settings for the FTP connector: host, port, directory to scan,
// and the login credentials. HOST / PORT / DIRECTORY / USER / PASSWORD are
// placeholders to be replaced by the reader.
// Fixed: the original said `SFTPParameters`, which is not the type imported
// above (FTPParameters, from es.alvsanand.sgc.ftp.normal), and was missing
// the final closing parenthesis.
val parameters = FTPParameters("HOST", PORT, "DIRECTORY", FTPCredentials("USER", Option("PASSWORD")))
In [ ]:
// Connection settings for the FTP connector (placeholders as above).
// Fixed: `SFTPParameters` -> `FTPParameters` to match the import on the
// import cell, and added the missing closing parenthesis.
val parameters = FTPParameters("HOST", PORT, "DIRECTORY", FTPCredentials("USER", Option("PASSWORD")))
// No range: the connector will pick up every slot/file it finds.
// Annotated as Option[SgcRange] so the binding has the type the streaming
// cell below expects, rather than None.type.
val range: Option[SgcRange] = None
In [ ]:
// Build a range that restricts the connector to slots newer than a cutoff
// date (here 2016-01-01 00:00:00, parsed in the JVM default time zone —
// NOTE(review): confirm whether the connector expects UTC).
val dt: SimpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
val date = dt.parse("2016-01-01 00:00:00")
val range = Option(SgcRange(date))
In [ ]:
// Build a range from a cutoff date plus an explicit list of files already
// processed, so the connector skips them on restart.
// Fixed: the original line was missing the closing parenthesis of Option(...).
val dt: SimpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
val date = dt.parse("2016-01-01 00:00:00")
val range = Option(SgcRange(date, Seq("/files/example_20161201_1.txt", "/files/example_20161201_2.txt")))
In [ ]:
// Recover the StreamingContext from the checkpoint directory if a previous
// run left one there; otherwise build a fresh context with the factory
// closure below.
val batchTime = Seconds(60)
val checkpointTime = Seconds(5 * 60)
val checkPointDirectory = "/tmp/CHECK_POINT_DIRECTORY"
val ssc = StreamingContext.getOrCreate(checkPointDirectory, () => {
  // Named `newSsc` to avoid shadowing the outer `ssc` binding.
  val newSsc = new StreamingContext(sc, batchTime)
  newSsc.checkpoint(checkPointDirectory)

  // DStream fed by the sgc FTP connector, checkpointed every 5 minutes.
  val ds = newSsc.createSgcInputDStream(FTPSgcConnectorFactory, parameters, range)
  ds.checkpoint(checkpointTime)

  // Persist each micro-batch to HDFS (path is a placeholder).
  ds.foreachRDD { rdd =>
    rdd.saveAsTextFile("hdfs://...")
  }

  newSsc
})
In [ ]:
// Kick off the streaming computation. start() returns immediately; use
// ssc.awaitTermination() if the caller should block until the job stops.
ssc.start()