In [ ]:
import os
os.environ['PYSPARK_SUBMIT_ARGS'] = \
'--conf spark.cassandra.connection.host=cassandra --packages org.apache.spark:spark-streaming-kafka-0-8_2.11:2.0.2,com.datastax.spark:spark-cassandra-connector_2.11:2.0.2 pyspark-shell'
In [ ]:
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils
from pyspark.sql import SQLContext
from pyspark.sql import functions as F
from pyspark.sql.types import *
In [ ]:
sc = SparkContext(appName="BigDataRiver")
sc.setLogLevel("WARN")
sc.setCheckpointDir('checkpoint/')
ssc = StreamingContext(sc, 10)
sql = SQLContext(sc)
In [ ]:
kafkaStream = KafkaUtils.createDirectStream(ssc, ['bdr'], {"metadata.broker.list": 'kafka:9092'})
In [ ]:
parsed = kafkaStream.map(lambda v: v[1])
In [ ]:
def usersWhoBoughtXAlsoBought(df):
productDf = df.select('user_id', 'product')
otherProductDf = productDf.toDF('user_id', 'other_product')
matchedProductsDf = productDf.join(otherProductDf, otherProductDf['user_id'] == productDf['user_id'], 'inner').\
filter("`product` != `other_product`").select('product','other_product').\
groupby('product','other_product').count().toDF("product","other_product","count")
return matchedProductsDf
In [ ]:
def selectTopProducts(df):
df.registerTempTable("products")
topProductsDf = sql.sql("""
SELECT
*,
ROW_NUMBER() OVER(PARTITION BY product ORDER BY count DESC) rn
FROM products
""").where("rn <= 5").groupBy("product").agg(F.collect_list("other_product").alias("other_products"))
return topProductsDf
In [ ]:
def processStream(rdd):
df = sql.read.json(rdd)
if(len(df.columns)):
productDf = df.select('user_id', 'product').cache()
selectTopProducts(usersWhoBoughtXAlsoBought(productDf)).\
write.format("org.apache.spark.sql.cassandra").\
mode('append').options(table="top_other_products_stream", keyspace="bdr").save()
#store all user products for the batch process
productDf.write.format("org.apache.spark.sql.cassandra").\
mode('append').options(table="all_user_products", keyspace="bdr").save()
print "Done"
else:
print "Empty"
In [ ]:
parsed.foreachRDD(lambda rdd: processStream(rdd))
In [ ]:
ssc.start()
ssc.awaitTermination()
In [ ]: