In [ ]:
%%configure
{
    "executorCores": 2,
    "driverMemory": "2G",
    "jars": ["/path/to/azure-cosmosdb-spark_2.3.0_2.11-1.2.2-uber.jar"],
    "conf": {
        "spark.jars.packages": "org.apache.spark:spark-sql-kafka-0-10_2.11:2.3.0",
        "spark.jars.excludes": "org.scala-lang:scala-reflect,org.apache.spark:spark-tags_2.11"
    }
}
In [ ]:
val kafkaBrokers = "host1:9092,host2:9092..."
val kafkaTopic = "telegraf"
In [ ]:
import org.apache.spark.sql.types._
import org.apache.spark.sql.functions._
import spark.implicits._
In [ ]:
/* Sample Telegraf CPU metric payload (one Kafka message value):
{
"fields":{
"usage_guest":0,
"usage_guest_nice":0,
"usage_idle":97.28643216079983,
"usage_iowait":1.4070351758792998,
"usage_irq":0,
"usage_nice":0,
"usage_softirq":0,
"usage_steal":0,
"usage_system":0.40201005025121833,
"usage_user":0.9045226130652948},
"name":"cpu",
"tags":{
"cpu":"cpu0",
"host":"pliukafkawus2"},
"timestamp":1534985650
}
*/
// the nested "fields" and "tags" objects are read as raw JSON strings;
// individual metric values are pulled out below with get_json_object
val payloadSchema = new StructType().
add("fields", StringType).
add("name", StringType).
add("tags", StringType).
add("timestamp", TimestampType)
val df = spark.
readStream.
format("kafka").
option("kafka.bootstrap.servers", kafkaBrokers).
option("subscribe", kafkaTopic).
load
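// the Kafka source exposes columns key, value, topic, partition, offset, timestamp, timestampType;
// only `value` (the raw message bytes) is used below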
val payloaddf = df.
select(from_json($"value".cast(StringType), payloadSchema).alias("payload")).
select($"payload.timestamp".cast(StringType).alias("ts"), //throws error if timestamp is not cast to string
get_json_object($"payload.fields", "$.usage_idle").alias("usage_idle"),
get_json_object($"payload.fields", "$.usage_iowait").alias("usage_iowait"),
get_json_object($"payload.fields", "$.usage_system").alias("usage_system"),
get_json_object($"payload.fields", "$.usage_user").alias("usage_user"))
/*
val query = payloaddf.
writeStream.
format("console").
start
*/
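In [ ]:
// Untested alternative sketch: if a real timestamp column is wanted instead of the string "ts",
// the epoch-seconds field can be read as LongType and cast to TimestampType.
// payloadSchemaAlt / payloaddfAlt are illustrative names, not part of the original notebook.
val payloadSchemaAlt = new StructType().
add("fields", StringType).
add("name", StringType).
add("tags", StringType).
add("timestamp", LongType)
val payloaddfAlt = df.
select(from_json($"value".cast(StringType), payloadSchemaAlt).alias("payload")).
select($"payload.timestamp".cast(TimestampType).alias("ts"),
get_json_object($"payload.fields", "$.usage_idle").cast(DoubleType).alias("usage_idle"))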
In [ ]:
import org.joda.time._
import org.joda.time.format._
import com.microsoft.azure.cosmosdb.spark.schema._
import com.microsoft.azure.cosmosdb.spark.streaming.CosmosDBSinkProvider
import com.microsoft.azure.cosmosdb.spark.config.Config
In [ ]:
val cosmosdbEndpoint = "https://{cosmosdb_account}.documents.azure.com:443/"
val cosmosdbMasterKey = "{cosmosdb_account_key}"
val cosmosdbDatabase = "metricdb"
val cosmosdbCollection = "metriccollection"
In [ ]:
val configMap = Map(
"Endpoint" -> cosmosdbEndpoint,
"Masterkey" -> cosmosdbMasterKey,
"Database" -> cosmosdbDatabase,
"Collection" -> cosmosdbCollection)
val query = payloaddf.
writeStream.
format(classOf[CosmosDBSinkProvider].getName).
outputMode("append").
options(configMap).
option("checkpointLocation", "/path/to/cosmoscheckpoint").
start
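In [ ]:
// Optional: quick checks on the running query from the notebook
// (standard Structured Streaming StreamingQuery API; not part of the original flow)
query.status        // whether the query is waiting for data or processing a batch
query.lastProgress  // metrics for the most recent micro-batch
// query.stop()     // uncomment to stop the stream when done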
In [ ]:
// For a batch write instead of streaming (not yet tested).
// Note: `df` above is a streaming DataFrame (readStream), so .write cannot be called on it;
// a batch DataFrame built with spark.read would be needed here (see the sketch in the next cell).
import org.apache.spark.sql.{Row, SaveMode, SparkSession}
val writeConfig = Config(configMap)
df.write.mode(SaveMode.Overwrite).cosmosDB(writeConfig)
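In [ ]:
// Untested sketch of the full batch path: read the topic with spark.read (batch) instead of readStream,
// reuse the same parsing, then write to Cosmos DB with the connector's batch writer.
// batchDf / batchPayload are illustrative names, not part of the original notebook.
val batchDf = spark.
read.
format("kafka").
option("kafka.bootstrap.servers", kafkaBrokers).
option("subscribe", kafkaTopic).
option("startingOffsets", "earliest").
load
val batchPayload = batchDf.
select(from_json($"value".cast(StringType), payloadSchema).alias("payload")).
select($"payload.timestamp".cast(StringType).alias("ts"),
get_json_object($"payload.fields", "$.usage_idle").alias("usage_idle"),
get_json_object($"payload.fields", "$.usage_user").alias("usage_user"))
batchPayload.write.mode(SaveMode.Overwrite).cosmosDB(Config(configMap))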