This notebook receives data from Kafka on the topic 'test' and stores it in the 'sent_received' table of the Cassandra keyspace 'test_time' (created by cassandra_init.script in startup_script.sh):
-- Keyspace holding the test data: SimpleStrategy replication with a
-- replication factor of 1.
CREATE KEYSPACE test_time WITH replication = {'class': 'SimpleStrategy', 'replication_factor' : 1};
-- One row per message: both timestamps are stored as text, and time_sent
-- is the primary key.
CREATE TABLE test_time.sent_received(
time_sent TEXT,
time_received TEXT,
PRIMARY KEY (time_sent)
);
A message that gives the current time is received every second.
In [ ]:
import os
import time

# Arguments handed to spark-submit by PySpark: the Spark UI port, the extra
# packages to load (Kafka 0.8 streaming integration and the Cassandra
# connector, both Scala 2.11 builds) and the mandatory "pyspark-shell" target.
os.environ["PYSPARK_SUBMIT_ARGS"] = " ".join([
    "--conf spark.ui.port=4040",
    "--packages",
    ",".join([
        "org.apache.spark:spark-streaming-kafka-0-8_2.11:2.0.0",
        "com.datastax.spark:spark-cassandra-connector_2.11:2.0.0-M3",
    ]),
    "pyspark-shell",
])
In [ ]:
from pyspark import SparkConf, SparkContext
from pyspark.sql import Row, SQLContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils

# Spark configuration: application name, a local master with two threads,
# and the address of the Cassandra node used by the connector.
conf = (
    SparkConf()
    .setAppName("Streaming test")
    .setMaster("local[2]")
    .set("spark.cassandra.connection.host", "127.0.0.1")
)

# Entry points shared by the rest of the notebook.
sc = SparkContext(conf=conf)
sqlContext = SQLContext(sc)
In [ ]:
def saveToCassandra(rows):
    """Append the contents of an RDD to the Cassandra table.

    ``rows`` is an RDD of ``Row`` objects; it is converted to a DataFrame
    with the notebook-level ``sqlContext`` and appended to the table
    ``sent_received`` of the keyspace ``test_time``. Empty RDDs are skipped.
    """
    # Nothing to write for an empty batch.
    if rows.isEmpty():
        return

    frame = sqlContext.createDataFrame(rows)
    writer = frame.write.format("org.apache.spark.sql.cassandra").mode('append')
    writer.options(table="sent_received", keyspace="test_time").save()
In [ ]:
# Streaming context on top of the existing SparkContext, with a batch
# duration of 5.
ssc = StreamingContext(sc, batchDuration=5)

# Receiver-based Kafka stream on the topic 'test', reached through the
# ZooKeeper quorum at 127.0.0.1:2181.
kvs = KafkaUtils.createStream(
    ssc,
    zkQuorum="127.0.0.1:2181",
    groupId="spark-streaming-consumer",
    topics={'test': 1},
)

# Keep only the second element of each received pair.
data = kvs.map(lambda pair: pair[1])

# Pair every message with the local time at which this map function runs.
rows = data.map(
    lambda sent: Row(
        time_sent=sent,
        time_received=time.strftime("%Y-%m-%d %H:%M:%S"),
    )
)

# Two outputs per batch: store the rows in Cassandra and print them.
rows.foreachRDD(saveToCassandra)
rows.pprint()
In [ ]:
# Start the streaming computation that was set up on ssc.
ssc.start()
In [ ]:
# Stop the streaming context, requesting a graceful shutdown
# (stopGraceFully=True), but keep the underlying SparkContext alive
# (stopSparkContext=False) so that sc and sqlContext remain usable.
ssc.stop(stopSparkContext=False,stopGraceFully=True)
In [ ]:
# Load the Cassandra table test_time.sent_received as a DataFrame and
# display its first rows.
data = (
    sqlContext.read
    .format("org.apache.spark.sql.cassandra")
    .options(keyspace="test_time", table="sent_received")
    .load()
)
data.show()
In [ ]:
# Expose the DataFrame to Spark SQL under the name "sent_received".
# createOrReplaceTempView is used instead of registerTempTable, which is
# deprecated as of Spark 2.0 (the version of the packages this notebook loads).
data.createOrReplaceTempView("sent_received")
data.printSchema()

# Query the view through Spark SQL and display the result.
data = sqlContext.sql("select * from sent_received")
data.show()
In [ ]: