kafkaReceiveDataPy

This notebook receives data from Kafka on the topic 'test' and stores it in the 'sent_received' table of the Cassandra keyspace 'test_time' (created by cassandra_init.script in startup_script.sh):

CREATE KEYSPACE test_time WITH replication = {'class': 'SimpleStrategy', 'replication_factor' : 1};

CREATE TABLE test_time.sent_received (
    time_sent TEXT,
    time_received TEXT,
    PRIMARY KEY (time_sent)
);

A message that gives the current time is received every second.

Add dependencies


In [ ]:
import os
os.environ['PYSPARK_SUBMIT_ARGS'] = '--conf spark.ui.port=4040 --packages org.apache.spark:spark-streaming-kafka-0-8_2.11:2.0.0,com.datastax.spark:spark-cassandra-connector_2.11:2.0.0-M3 pyspark-shell'
import time
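
The submit arguments in the cell above pack two Maven coordinates into one long string. As a minimal sketch, the same string could be assembled from a list, which makes it easier to change a version; `PACKAGES` and `build_submit_args` are hypothetical names introduced here for illustration and are not part of PySpark.

```python
import os

# Maven coordinates copied from the cell above; adjust versions to your Spark build.
PACKAGES = [
    "org.apache.spark:spark-streaming-kafka-0-8_2.11:2.0.0",
    "com.datastax.spark:spark-cassandra-connector_2.11:2.0.0-M3",
]

def build_submit_args(packages, ui_port=4040):
    """Hypothetical helper: build the PYSPARK_SUBMIT_ARGS string."""
    return "--conf spark.ui.port={} --packages {} pyspark-shell".format(
        ui_port, ",".join(packages)
    )

submit_args = build_submit_args(PACKAGES)
# Must be set before the SparkContext is created, otherwise it has no effect.
os.environ["PYSPARK_SUBMIT_ARGS"] = submit_args
```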

Load modules and start SparkContext

Note that the package dependencies are only loaded once the SparkContext is started. Two cores are used (local[2]) because the Kafka receiver occupies one of them, leaving the other for processing the data.


In [ ]:
from pyspark import SparkContext, SparkConf
from pyspark.sql import SQLContext, Row
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils

# local[2]: one core for the Kafka receiver, one for processing
conf = SparkConf() \
    .setAppName("Streaming test") \
    .setMaster("local[2]") \
    .set("spark.cassandra.connection.host", "127.0.0.1")
sc = SparkContext(conf=conf)
sqlContext = SQLContext(sc)

saveToCassandra function

Takes an RDD of rows and appends it to the Cassandra table. Empty RDDs are skipped.


In [ ]:
def saveToCassandra(rows):
    if not rows.isEmpty(): 
        sqlContext.createDataFrame(rows).write\
        .format("org.apache.spark.sql.cassandra")\
        .mode('append')\
        .options(table="sent_received", keyspace="test_time")\
        .save()
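
The emptiness check matters because the function is called once per batch interval, including intervals in which no message arrived. The guard can be traced without Spark in the sketch below, where a plain list stands in for the RDD and another list for the Cassandra table; `save_batch` and `table` are hypothetical stand-ins, not part of the notebook.

```python
def save_batch(rows, table):
    """Append rows to table unless the batch is empty; return the number written."""
    if not rows:  # mirrors `if not rows.isEmpty()` in saveToCassandra
        return 0
    table.extend(rows)
    return len(rows)

table = []  # hypothetical in-memory stand-in for test_time.sent_received
batches = [[], [{"time_sent": "t1", "time_received": "t2"}], []]
written = [save_batch(batch, table) for batch in batches]
```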

Create streaming task

  • Receive data from Kafka 'test' topic every five seconds
  • Get stream content, and add receiving time to each message
  • Save each RDD in the DStream to Cassandra, and print it to the screen

In [ ]:
ssc = StreamingContext(sc, 5)
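# Arguments: ZooKeeper quorum, consumer group id, {topic: number of consumer threads}
kvs = KafkaUtils.createStream(ssc, "127.0.0.1:2181", "spark-streaming-consumer", {'test': 1})
# Each record is a (key, value) pair; keep only the value
data = kvs.map(lambda x: x[1])
# Attach the receiving time to each message
rows = data.map(lambda x: Row(time_sent=x, time_received=time.strftime("%Y-%m-%d %H:%M:%S")))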
rows.foreachRDD(saveToCassandra)
rows.pprint()
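
The two map steps in the cell above can be followed on plain Python data. This is only a sketch: `to_row` is a hypothetical helper, a dict stands in for a Spark `Row`, and the sample records and their timestamp format are assumptions, since the notebook does not show what the producer sends.

```python
import time

def to_row(record, now=None):
    """Mimic both map steps for one (key, value) Kafka record."""
    value = record[1]  # first map: keep the message value
    if now is None:
        now = time.localtime()
    received = time.strftime("%Y-%m-%d %H:%M:%S", now)  # second map: add receive time
    return {"time_sent": value, "time_received": received}

# One hypothetical micro-batch; the keys are None because only values are used.
batch = [(None, "2017-01-01 12:00:00"), (None, "2017-01-01 12:00:01")]
# A fixed receive time keeps the example reproducible.
fixed = time.strptime("2017-01-01 12:00:05", "%Y-%m-%d %H:%M:%S")
sample_rows = [to_row(record, now=fixed) for record in batch]
```

In the streaming job the receive time is taken when each record is processed, so every row in a five-second batch gets a timestamp close to the end of that batch rather than the moment the message reached Kafka.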

Start streaming


In [ ]:
ssc.start()

Stop streaming


In [ ]:
ssc.stop(stopSparkContext=False, stopGraceFully=True)

Get Cassandra table content


In [ ]:
data = sqlContext.read \
    .format("org.apache.spark.sql.cassandra")\
    .options(table="sent_received", keyspace="test_time")\
    .load()
data.show()

Get Cassandra table content using SQL


In [ ]:
data.registerTempTable("sent_received")
data.printSchema()
data = sqlContext.sql("select * from sent_received")
data.show()
