This notebook reads metrics from HDInsight Kafka (which in turn receives them from Telegraf) and writes them to Cosmos DB.

Note that the Cosmos DB Spark connector must be an uber JAR located in HDFS, as shown below; the one in the Maven repository doesn't include all of its dependencies.


In [ ]:
%%configure
{   
  "executorCores": 2, 
  "driverMemory" : "2G", 
  "jars": ["/path/to/azure-cosmosdb-spark_2.3.0_2.11-1.2.2-uber.jar"],
  "conf": {"spark.jars.packages": "org.apache.spark:spark-sql-kafka-0-10_2.11:2.3.0",
           "spark.jars.excludes": "org.scala-lang:scala-reflect,org.apache.spark:spark-tags_2.11"
          }
}

In [ ]:
val kafkaBrokers="host1:9092,host2:9092..."
val kafkaTopic="telegraf"

In [ ]:
import org.apache.spark.sql.types._
import org.apache.spark.sql.functions._
import spark.implicits._

In [ ]:
/*
{
"fields":{
    "usage_guest":0,
    "usage_guest_nice":0,
    "usage_idle":97.28643216079983,
    "usage_iowait":1.4070351758792998,
    "usage_irq":0,
    "usage_nice":0,
    "usage_softirq":0,
    "usage_steal":0,
    "usage_system":0.40201005025121833,
    "usage_user":0.9045226130652948},
"name":"cpu",
"tags":{
    "cpu":"cpu0",
    "host":"pliukafkawus2"},
"timestamp":1534985650
}
*/

// Declare fields and tags as StringType so from_json keeps the nested JSON objects
// as raw strings, which can then be queried with get_json_object below.
val payloadSchema = new StructType().
      add("fields", StringType).
      add("name", StringType).
      add("tags", StringType).
      add("timestamp", TimestampType)

val df = spark.
  readStream.
  format("kafka").
  option("kafka.bootstrap.servers", kafkaBrokers).
  option("subscribe", kafkaTopic).
  load

val payloaddf = df.
  select(from_json($"value".cast(StringType), payloadSchema).alias("payload")).
  select($"payload.timestamp".cast(StringType).alias("ts"), //throws error if timestamp is not cast to string
         get_json_object($"payload.fields", "$.usage_idle").alias("usage_idle"),
         get_json_object($"payload.fields", "$.usage_iowait").alias("usage_iowait"),
         get_json_object($"payload.fields", "$.usage_system").alias("usage_system"),
         get_json_object($"payload.fields", "$.usage_user").alias("usage_user"))

/*
val query = payloaddf.
  writeStream.
  format("console").
  start
*/
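
Optionally, the derived schema of the parsed stream can be inspected before wiring up the Cosmos DB sink (a quick sanity check, not required for the pipeline):

In [ ]:
// Print the columns extracted from the Telegraf payload
payloaddf.printSchema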

In [ ]:
import org.joda.time._
import org.joda.time.format._
import com.microsoft.azure.cosmosdb.spark.schema._
import com.microsoft.azure.cosmosdb.spark.streaming.CosmosDBSinkProvider
import com.microsoft.azure.cosmosdb.spark.config.Config

In [ ]:
val cosmosdbEndpoint = "https://{cosmosdb_account}.documents.azure.com:443/"
val cosmosdbMasterKey = "{cosmosdb_account_key}"
val cosmosdbDatabase = "metricdb"
val cosmosdbCollection = "metriccollection"

In [ ]:
val configMap = Map(
  "Endpoint" -> cosmosdbEndpoint,
  "Masterkey" -> cosmosdbMasterKey,
  "Database" -> cosmosdbDatabase,
  "Collection" -> cosmosdbCollection)

val query = payloaddf.
  writeStream.
  format(classOf[CosmosDBSinkProvider].getName).
  outputMode("append").
  options(configMap).
  option("checkpointLocation", "/path/to/cosmoscheckpoint").
  start
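
Once the sink has started, the query handle can be monitored from another cell using the standard Structured Streaming APIs, and stopped when no longer needed:

In [ ]:
// Check whether the query is still running and what the last micro-batch did
query.status
query.lastProgress

// Stop the streaming query when done
// query.stop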

In [ ]:
// For batch instead of streaming (not yet tested): read the topic with spark.read rather than readStream, then write once.
import org.apache.spark.sql.{Row, SaveMode, SparkSession}

val writeConfig = Config(configMap)
val batchDf = spark.read.
  format("kafka").
  option("kafka.bootstrap.servers", kafkaBrokers).
  option("subscribe", kafkaTopic).
  load
batchDf.write.mode(SaveMode.Overwrite).cosmosDB(writeConfig)
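
To spot-check that documents are landing in the collection, the same config map can be used to read back from Cosmos DB. This sketch assumes the DataFrameReader implicit from com.microsoft.azure.cosmosdb.spark.schema._ (mirroring the writer implicit used above) and is untested here:

In [ ]:
// Read the collection back and show a few documents to verify the writes
val readConfig = Config(configMap)
val verifyDf = spark.read.cosmosDB(readConfig)
verifyDf.show(10)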