Acceleration Example

Imports


In [1]:
import org.apache.spark.streaming.{Seconds, Minutes, StreamingContext}
import org.apache.kafka.common.serialization.{BytesDeserializer, StringDeserializer}
import org.apache.spark.streaming.kafka010.KafkaUtils
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.module.scala.DefaultScalaModule
import com.fasterxml.jackson.module.scala.experimental.ScalaObjectMapper
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}
import org.apache.spark.sql.types.StructType
import collection.JavaConverters.mapAsJavaMapConverter

Create Streaming Context


In [19]:
val ssc = new StreamingContext(sc, Seconds(1))
ssc.remember(Minutes(1))

Setup Kafka input stream


In [20]:
val consumerParams = Map[String, Object](
  "bootstrap.servers" -> "kafka:9092",
  "key.deserializer" -> classOf[BytesDeserializer],
  "value.deserializer" -> classOf[StringDeserializer],
  "group.id" -> "spark-notebook",
  "auto.offset.reset" -> "earliest",
  "enable.auto.commit" -> (false: java.lang.Boolean)
)

val topics = Array("android")
val stream = KafkaUtils.createDirectStream[String, String](
  ssc,
  PreferConsistent,
  Subscribe[String, String](topics, consumerParams)
)

Expected Input Schema


In [4]:
val schema = new StructType().add("x", "float").add("y", "float").add("z", "float").add("timestamp", "long")

Stream Processing


In [21]:
stream.foreachRDD { rdd =>
  spark.read.schema(schema).json(rdd.map(_.value())).createOrReplaceTempView("locations")
  spark.sql("select avg(x) as x, avg(y) as y, avg(z) as z, min(timestamp) as timestamp from locations").toJSON.foreachPartition {
    partition =>

      val producerParams = Map[String, Object](
        ProducerConfig.BOOTSTRAP_SERVERS_CONFIG -> "kafka:9092",
        ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG -> "org.apache.kafka.common.serialization.StringSerializer",
        ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG -> "org.apache.kafka.common.serialization.StringSerializer"
      )
 
      val producer = new KafkaProducer[String, String](producerParams.asJava)
      
      partition.foreach { s =>
        if (s != "{}")
            producer.send(new ProducerRecord[String, String]("acceleration", s))
      }
      
      producer.close()
  }
}

Start stream


In [22]:
ssc.start()

Lets see what we really read


In [23]:
%%SQL
select * from locations


Out[23]:
+------------+-----------+---------+-------------+
|           x|          y|        z|    timestamp|
+------------+-----------+---------+-------------+
|  -0.0309906|-0.20075989| 9.753952|1480415975382|
| -0.06072998| -0.1966095| 9.986816|1480415975446|
|-0.028884888|-0.18727112| 10.02356|1480415975510|
|-0.042907715|-0.19052124|9.9123535|1480415975575|
|-0.024627686|-0.20962524| 9.925446|1480415975639|
|-0.021408081|-0.20065308| 9.945801|1480415975703|
|-0.026153564| -0.1612091| 9.930786|1480415975767|
|-0.062347412|-0.17887878| 9.888138|1480415975832|
| -0.04194641|-0.18881226| 9.864151|1480415975896|
|-0.072143555|-0.20272827|  9.95253|1480415975964|
+------------+-----------+---------+-------------+
only showing top 10 rows

Stop stream


In [18]:
StreamingContext.getActive.foreach { _.stop(stopSparkContext = false) }

Verify the contents in Kafka using the console consumer

The following command line tools can help print the contents to the console.

./bin/kafka-console-consumer.sh --topic acceleration --bootstrap-server localhost:9092