Scala Example for FTP server

This is an example of how to use the Spark Generic Connector with Apache Spark Streaming to process files stored in a directory on an FTP server.

Load the connector package for Apache Spark 2.x (Scala 2.11 build) with Zeppelin's %dep interpreter


In [ ]:
%dep

z.load("alvsanand:spark-generic-connector:0.2.0-spark_2x-s_2.11")

Import dependencies


In [ ]:
import es.alvsanand.sgc.ftp.{FTPCredentials, FTPSlot}
import es.alvsanand.sgc.ftp.normal.{FTPSgcConnectorFactory, FTPParameters}
import java.text.SimpleDateFormat
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.sgc._

Create the SgcConnectorParameters for FTP (FTPParameters) with the server host, port, directory and credentials


In [ ]:
// Replace HOST, PORT (an Int), DIRECTORY, USER and PASSWORD with your server's values.
val parameters = FTPParameters("HOST", PORT, "DIRECTORY", FTPCredentials("USER", Option("PASSWORD")))
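HOST, PORT, DIRECTORY, USER and PASSWORD above are placeholders. One way to keep them, especially the password, out of the notebook is to read them from the environment. The following is a minimal plain-Scala sketch with no connector dependency; the `FtpSettings` helper and the variable names `FTP_HOST`, `FTP_PORT`, `FTP_DIRECTORY`, `FTP_USER` and `FTP_PASSWORD` are hypothetical, and falling back to port 21 (the standard FTP control port) is an assumption about your server.

```scala
import scala.util.Try

// Hypothetical helper, not part of the connector: collects the values that
// FTPParameters and FTPCredentials expect, read from an environment map.
object FtpSettings {
  final case class Settings(
      host: String,
      port: Int,
      directory: String,
      user: String,
      password: Option[String]) // same Option[String] shape as FTPCredentials' password

  // Assumption: use the standard FTP control port when FTP_PORT is not set.
  val DefaultFtpPort = 21

  // Missing port -> default; present but invalid port -> None.
  private def parsePort(raw: Option[String]): Option[Int] = raw match {
    case None    => Some(DefaultFtpPort)
    case Some(p) => Try(p.trim.toInt).toOption.filter(n => n > 0 && n <= 65535)
  }

  // Returns None if a required variable is missing or the port is invalid.
  // The password is optional and simply copied as an Option.
  def fromEnv(env: Map[String, String]): Option[Settings] =
    for {
      host      <- env.get("FTP_HOST")
      directory <- env.get("FTP_DIRECTORY")
      user      <- env.get("FTP_USER")
      port      <- parsePort(env.get("FTP_PORT"))
    } yield Settings(host, port, directory, user, env.get("FTP_PASSWORD"))
}
```

Assuming the settings resolve, they could be passed to the same constructors used in the cell above, for example `FtpSettings.fromEnv(sys.env).map(s => FTPParameters(s.host, s.port, s.directory, FTPCredentials(s.user, s.password)))`.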

Create an SgcRange to select which files are processed. Use one of the following options:

  • Process from the beginning:

In [ ]:
val range = None

  • Process since a specific date:

In [ ]:
val dt: SimpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
val date = dt.parse("2016-01-01 00:00:00")

val range = Option(SgcRange(date))

  • Process since a specific date, skipping some files:

In [ ]:
val dt: SimpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
val date = dt.parse("2016-01-01 00:00:00")

val range = Option(SgcRange(date, Seq("/files/example_20161201_1.txt", "/files/example_20161201_2.txt")))
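The options above differ only in what they pass to SgcRange: nothing, a start date, or a start date plus file names to skip. The plain-Scala sketch below illustrates that selection logic with hypothetical `RemoteFile` and `FileRange` types; it is not the connector's code, and treating the start date as inclusive is an assumption. It also parses the start date with java.time in an explicit time zone (UTC here, again an assumption), because `SimpleDateFormat.parse` interprets the string in the JVM's default time zone, so the same text can mean different instants on differently configured drivers.

```scala
import java.time.LocalDateTime
import java.time.ZoneOffset
import java.time.format.DateTimeFormatter
import java.util.Date

object RangeSketch {
  // Hypothetical stand-ins for a listed remote file and a range filter.
  final case class RemoteFile(name: String, date: Date)
  final case class FileRange(start: Date, skip: Seq[String] = Seq.empty)

  private val Format = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")

  // Same pattern as the notebook cells, but the time zone is pinned to UTC
  // instead of depending on the JVM default as SimpleDateFormat does.
  def parseUtc(text: String): Date =
    Date.from(LocalDateTime.parse(text, Format).toInstant(ZoneOffset.UTC))

  // None keeps every file ("process from the beginning").
  // Otherwise keep files dated at or after start (assumed inclusive)
  // whose names are not in the skip list.
  def select(files: Seq[RemoteFile], range: Option[FileRange]): Seq[RemoteFile] =
    range match {
      case None => files
      case Some(FileRange(start, skip)) =>
        files.filter(f => !f.date.before(start) && !skip.contains(f.name))
    }
}
```

Assuming SgcRange accepts the `java.util.Date` that `SimpleDateFormat.parse` returns, as in the cells above, a UTC-pinned start date could be used the same way: `val range = Option(SgcRange(RangeSketch.parseUtc("2016-01-01 00:00:00")))`.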

Create the StreamingContext and the input DStream, passing the SgcConnectorFactory, the parameters and the range to createSgcInputDStream


In [ ]:
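val batchTime = Seconds(60)
val checkpointTime = Seconds(5 * 60) // must be a multiple of batchTime

val checkPointDirectory = "/tmp/CHECK_POINT_DIRECTORY"

// Recover the context from the checkpoint directory if it exists,
// otherwise build a new one with the function below.
val ssc = StreamingContext.getOrCreate(checkPointDirectory, () => {
  val ssc = new StreamingContext(sc, batchTime)

  val ds = ssc.createSgcInputDStream(FTPSgcConnectorFactory, parameters, range)

  ds.checkpoint(checkpointTime)

  ssc.checkpoint(checkPointDirectory)

  // saveAsTextFile fails if the output directory already exists,
  // so each batch writes to its own sub-directory named after the batch time.
  ds.foreachRDD { (rdd, time) =>
    rdd.saveAsTextFile(s"hdfs://.../${time.milliseconds}")
  }

  ssc
})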
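The cell above uses a 60-second batch interval and checkpoints the DStream every 5 × 60 seconds, i.e. once every 5 batches. When the context starts, Spark checks that a DStream's checkpoint interval is at least its slide interval and a multiple of it, and the Spark Streaming programming guide suggests 5 to 10 sliding intervals as a starting point. The plain-Scala sketch below (no Spark dependency; `CheckpointIntervals` is a hypothetical helper) mirrors those two checks so a pair of intervals can be validated before starting the stream.

```scala
// Hypothetical helper mirroring Spark's start-up checks on a DStream's
// checkpoint interval. Intervals are given in seconds.
object CheckpointIntervals {
  // Right(n): a checkpoint happens every n batches.
  // Left(reason): the combination would be rejected.
  def batchesPerCheckpoint(batchSeconds: Long, checkpointSeconds: Long): Either[String, Long] =
    if (batchSeconds <= 0) Left("batch interval must be positive")
    else if (checkpointSeconds < batchSeconds)
      Left("checkpoint interval is shorter than the batch interval")
    else if (checkpointSeconds % batchSeconds != 0)
      Left("checkpoint interval is not a multiple of the batch interval")
    else Right(checkpointSeconds / batchSeconds)
}
```

With the notebook's values, `CheckpointIntervals.batchesPerCheckpoint(60, 5 * 60)` gives `Right(5)`, the low end of the guide's suggested range; a longer interval means fewer checkpoint writes but more recomputation after a failure.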

Start the StreamingContext


In [ ]:
ssc.start()