In [1]:
import org.apache.spark.streaming.{Seconds, Minutes, StreamingContext}
import org.apache.kafka.common.serialization.{BytesDeserializer, StringDeserializer}
import org.apache.spark.streaming.kafka010.KafkaUtils
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.module.scala.DefaultScalaModule
import com.fasterxml.jackson.module.scala.experimental.ScalaObjectMapper
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}
import org.apache.spark.sql.types.StructType
import collection.JavaConverters.mapAsJavaMapConverter
In [19]:
// Micro-batch streaming context with a 1-second batch interval.
// `sc` is the notebook-provided SparkContext — TODO confirm it is in scope here.
val ssc = new StreamingContext(sc, Seconds(1))
// Retain the last minute of batch data so the interactive SQL cell below can
// still query the "locations" temp view between batches.
ssc.remember(Minutes(1))
In [20]:
// Kafka consumer configuration for the direct stream.
// FIX: the stream below is declared [String, String], so BOTH deserializers must
// yield String. The original configured BytesDeserializer for keys, which would
// put a Bytes object behind the String-typed key and fail with a
// ClassCastException as soon as record.key() is touched.
val consumerParams = Map[String, Object](
  "bootstrap.servers" -> "kafka:9092",
  "key.deserializer" -> classOf[StringDeserializer],
  "value.deserializer" -> classOf[StringDeserializer],
  "group.id" -> "spark-notebook",
  // Start from the earliest offset when the group has no committed position.
  "auto.offset.reset" -> "earliest",
  // Spark manages offsets itself; do not let the Kafka client auto-commit.
  "enable.auto.commit" -> (false: java.lang.Boolean)
)
val topics = Array("android")
// Direct (receiver-less) stream: one RDD partition per Kafka topic-partition.
val stream = KafkaUtils.createDirectStream[String, String](
  ssc,
  PreferConsistent,
  Subscribe[String, String](topics, consumerParams)
)
In [4]:
// Schema of the incoming accelerometer JSON: three float axes plus an epoch timestamp.
val schema = Seq(
  ("x", "float"),
  ("y", "float"),
  ("z", "float"),
  ("timestamp", "long")
).foldLeft(new StructType()) { case (acc, (fieldName, dataType)) => acc.add(fieldName, dataType) }
In [21]:
// Per micro-batch: parse the raw JSON values, aggregate them with SQL, and
// publish the single aggregate row back to Kafka on the "acceleration" topic.
stream.foreachRDD { rdd =>
// Parse each record's value as JSON under the explicit schema and expose the
// batch as a temp view (also what the interactive %%SQL cell queries).
spark.read.schema(schema).json(rdd.map(_.value())).createOrReplaceTempView("locations")
// One aggregate row per batch, serialized to a JSON string per partition.
spark.sql("select avg(x) as x, avg(y) as y, avg(z) as z, min(timestamp) as timestamp from locations").toJSON.foreachPartition {
partition =>
// Producer config is built INSIDE the partition closure on purpose: only
// serializable data ships to executors; a KafkaProducer itself is not
// serializable, so it is created (and closed) per partition per batch.
val producerParams = Map[String, Object](
ProducerConfig.BOOTSTRAP_SERVERS_CONFIG -> "kafka:9092",
ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG -> "org.apache.kafka.common.serialization.StringSerializer",
ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG -> "org.apache.kafka.common.serialization.StringSerializer"
)
val producer = new KafkaProducer[String, String](producerParams.asJava)
partition.foreach { s =>
// Skip "{}" rows — presumably the all-null aggregate from an empty batch,
// which toJSON renders as an empty object; confirm against Spark's toJSON
// null-field handling.
if (s != "{}")
producer.send(new ProducerRecord[String, String]("acceleration", s))
}
// Flush buffered sends and release the producer's network resources.
producer.close()
}
}
In [22]:
// Begin consuming; the foreachRDD pipeline above now runs once per batch interval.
ssc.start()
In [23]:
%%SQL
select * from locations
Out[23]:
In [18]:
// Shut down whichever streaming context is active (if any) while keeping the
// underlying SparkContext alive so the notebook session remains usable.
StreamingContext.getActive match {
  case Some(activeCtx) => activeCtx.stop(stopSparkContext = false)
  case None            => // nothing running — no-op
}