Scala Example for DoubleClick Data Transfer

This example shows how to use the Generic Connector with Apache Spark to process files stored in a DoubleClick Data Transfer bucket.

Load dependencies for Apache Spark 2.x


In [ ]:
%dep

z.load("alvsanand:spark-generic-connector:0.2.0-spark_2x-s_2.11")

Import dependencies


In [ ]:
import org.apache.spark.streaming.sgc._
import es.alvsanand.sgc.google.dcm_data_transfer._
import es.alvsanand.sgc.google.dcm_data_transfer.DataTransferFileTypes.DataTransferFileType

Create the SgcConnectorParameters with the desired parameters

  • Simple configuration:

In [ ]:
val parameters = DataTransferParameters("CREDENTIALS_ZIP_URL", "DATA_TRANSFER_BUCKET_NAME")

  • Filtering file types:

In [ ]:
val parameters = DataTransferParameters("CREDENTIALS_ZIP_URL", "DATA_TRANSFER_BUCKET_NAME",
    Seq[DataTransferFileType](DataTransferFileTypes.ACTIVITY, DataTransferFileTypes.IMPRESSION))
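
To illustrate what filtering by file type amounts to, here is a minimal sketch that selects file names by the type token they contain. It does not use the connector: `FileKind`, `FileTypeFilterSketch`, the sample file names, and the naming scheme `dcm_account<ID>_<type>_...` are all assumptions made for this example, not the library's own API or behaviour.

```scala
// Hypothetical stand-in for the connector's file types (not the library's enum).
object FileKind extends Enumeration {
  val Activity, Click, Impression = Value
}

object FileTypeFilterSketch {
  // Assumed naming scheme: dcm_account<ID>_<type>_<rest of the name>
  private val NamePattern = """dcm_account\d+_(activity|click|impression)_.*""".r

  // Derive the file kind from the file name, if the name follows the assumed scheme.
  def kindOf(fileName: String): Option[FileKind.Value] = fileName match {
    case NamePattern("activity")   => Some(FileKind.Activity)
    case NamePattern("click")      => Some(FileKind.Click)
    case NamePattern("impression") => Some(FileKind.Impression)
    case _                         => None
  }

  // Keep only the files whose kind is in the wanted set.
  def keep(files: Seq[String], wanted: Set[FileKind.Value]): Seq[String] =
    files.filter(name => kindOf(name).exists(wanted.contains))

  // Made-up file names, for illustration only.
  val sampleFiles: Seq[String] = Seq(
    "dcm_account1234_activity_20170101_20170102_050000_1.csv.gz",
    "dcm_account1234_click_2017010100_20170101_050000_2.csv.gz",
    "dcm_account1234_impression_2017010100_20170101_050000_3.csv.gz",
    "README.txt"
  )

  def main(args: Array[String]): Unit =
    keep(sampleFiles, Set(FileKind.Activity, FileKind.Impression)).foreach(println)
}
```

With the sample names above, only the activity and impression files are kept, which mirrors the intent of passing `DataTransferFileTypes.ACTIVITY` and `DataTransferFileTypes.IMPRESSION` in the parameters.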

Create the RDD by passing the SgcConnectorFactory and the parameters


In [ ]:
val rdd = sc.createSgcRDD(DataTransferSgcConnectorFactory, parameters)

Use the RDD as desired


In [ ]:
// Inspect the slot assigned to each partition
rdd.partitions.map(_.asInstanceOf[SgcRDDPartition[DataTransferSlot]].slot)
// Print the first 10 elements of the RDD
rdd.take(10).foreach(println)
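
As a possible next step, the sketch below shows one way to turn text lines into fields and count the values of one column. It runs on plain Scala collections so it can be tried without a cluster. It assumes that each element is one comma-delimited line of text; `RecordSketch`, its methods, and the sample lines are hypothetical and are not part of the connector.

```scala
import java.util.regex.Pattern

object RecordSketch {
  // Split one delimited line into fields; the -1 limit keeps trailing empty columns.
  def parseLine(line: String, delimiter: Char = ','): Array[String] =
    line.split(Pattern.quote(delimiter.toString), -1)

  // Count how often each value of the given column occurs.
  // Lines with too few columns are skipped.
  def countByColumn(lines: Seq[String], column: Int): Map[String, Int] =
    lines
      .map(parseLine(_))
      .filter(_.length > column)
      .groupBy(_(column))
      .map { case (key, rows) => key -> rows.size }

  // Made-up lines, for illustration only.
  val sampleLines: Seq[String] = Seq(
    "2017-01-01,impression,1001",
    "2017-01-01,click,1001",
    "2017-01-02,impression,1002",
    "malformed"
  )

  def main(args: Array[String]): Unit =
    countByColumn(sampleLines, 1).foreach { case (k, n) => println(s"$k: $n") }
}
```

If the elements of `rdd` are indeed text lines, the same idea could be applied to it with standard Spark operations, for example `rdd.map(RecordSketch.parseLine(_)).filter(_.length > 1).map(f => (f(1), 1)).reduceByKey(_ + _)`.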