In [ ]:
import os
# The Cassandra host and the Kafka / Cassandra connector packages have to be
# configured before the SparkContext is created, hence the environment variable.
os.environ['PYSPARK_SUBMIT_ARGS'] = \
    '--conf spark.cassandra.connection.host=cassandra ' \
    '--packages org.apache.spark:spark-streaming-kafka-0-8_2.11:2.0.2,' \
    'com.datastax.spark:spark-cassandra-connector_2.11:2.0.2 pyspark-shell'
In [ ]:
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils
from pyspark.sql import SQLContext
from pyspark.sql import functions as F
from pyspark.sql.types import *
from pyspark.mllib.recommendation import ALS, MatrixFactorizationModel, Rating
In [ ]:
sc = SparkContext(appName="BigDataRiver")
sc.setLogLevel("WARN")
sc.setCheckpointDir('checkpoint/')  # truncates the long lineage chains ALS builds up
ssc = StreamingContext(sc, 60)      # 60-second micro-batches
sql = SQLContext(sc)
In [ ]:
# direct (receiver-less) stream from the 'bdr' topic
kafkaStream = KafkaUtils.createDirectStream(ssc, ['bdr'], {"metadata.broker.list": 'kafka:9092'})
In [ ]:
# keep only the message value; the Kafka key is not used
parsed = kafkaStream.map(lambda v: v[1])
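In [ ]:
# For reference: the pipeline assumes JSON messages of the shape below on the
# 'bdr' topic (field names inferred from processStream further down; the actual
# producer may differ). A minimal test-producer sketch, assuming the
# kafka-python package is available:
#
# from kafka import KafkaProducer
# import json
# producer = KafkaProducer(bootstrap_servers='kafka:9092',
#                          value_serializer=lambda m: json.dumps(m).encode('utf-8'))
# producer.send('bdr', {"user_id": 1, "product": 42, "is_purchase": 1})  # purchase
# producer.send('bdr', {"user_id": 1, "product": 43, "is_purchase": 0})  # click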
In [ ]:
# Split the is_purchase flag into two counter columns: a purchase (1) increments
# purchased_count, a click (0) increments clicked_count.
separateClicksSchema = StructType([
    StructField("purchased_count", LongType(), False),
    StructField("clicked_count", LongType(), False)
])

def separateClicks(is_purchase):
    return (is_purchase, 1 - is_purchase)

separateClicks_udf = F.udf(separateClicks, separateClicksSchema)
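In [ ]:
# Quick sanity check of the UDF on a toy DataFrame (hypothetical rows, not part
# of the pipeline): is_purchase=1 maps to (1, 0), is_purchase=0 maps to (0, 1).
sample = sql.createDataFrame([(1, 100, 1), (1, 101, 0)], ["user_id", "product", "is_purchase"])
sample.withColumn('c', separateClicks_udf(sample['is_purchase'])).\
    select("user_id", "product", "c.purchased_count", "c.clicked_count").show()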
In [ ]:
def buildCFModel(train):
    # Fold the two counters into a single implicit-feedback score:
    # a purchase is weighted three times as strongly as a click.
    def interestToRating(purchasedCount, clickedCount):
        return (purchasedCount * 3.0) + clickedCount
    ratings = train.rdd.\
        map(lambda r: Rating(r.user_id, r.product, interestToRating(r.purchased_count, r.clicked_count)))
    rank = 10
    numIterations = 20
    lambdaFactor = 0.01
    alpha = 0.01
    seed = 42
    # The fourth positional argument of trainImplicit is the regularization
    # parameter lambda_, so alpha must be passed by keyword.
    return ALS.trainImplicit(ratings, rank, numIterations, lambda_=lambdaFactor, alpha=alpha, seed=seed)
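In [ ]:
# Example usage (commented out; `interests` is a hypothetical DataFrame with the
# same columns the stream handler below passes in). Besides the batch
# recommendations used here, a trained MatrixFactorizationModel also supports
# single-user lookups:
#
# model = buildCFModel(interests.select("user_id", "product", "purchased_count", "clicked_count"))
# model.recommendProducts(1, 5)   # top 5 Rating objects for user 1
# model.predict(1, 42)            # predicted score for user 1 and product 42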
In [ ]:
def recommendTopProducts(dfModel):
    numberOfRecommendationsRequired = 5
    # recommendProductsForUsers yields (user, array of Rating) pairs;
    # keep only the product ids, already sorted by predicted rating.
    rdd = dfModel.recommendProductsForUsers(numberOfRecommendationsRequired)
    recommendations = rdd.map(lambda ur: (ur[0], [r.product for r in ur[1]]))
    topRecommendationsSchema = StructType([
        StructField("user_id", IntegerType(), False),
        StructField("recommended_products", ArrayType(IntegerType()), False)
    ])
    return sql.createDataFrame(recommendations, topRecommendationsSchema)
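In [ ]:
# Illustrative shape of the frame returned by recommendTopProducts (values are
# made up):
#
# +-------+--------------------+
# |user_id|recommended_products|
# +-------+--------------------+
# |      1|   [42, 7, 13, 2, 9]|
# +-------+--------------------+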
In [ ]:
def processStream(rdd):
    df = sql.read.json(rdd)
    if len(df.columns):
        # store the updated click/purchase counters in Cassandra
        df.withColumn('c', separateClicks_udf(df['is_purchase'])).\
            select("user_id", "product", "c.purchased_count", "c.clicked_count").\
            write.format("org.apache.spark.sql.cassandra").mode('append').\
            options(table="users_interests", keyspace="bdr").save()
        # read the full interaction history back from Cassandra
        usersInterests = sql.read.format("org.apache.spark.sql.cassandra").\
            options(table="users_interests", keyspace="bdr").load().cache()
        # retrain ALS on everything seen so far and persist the top 5
        # recommendations per user
        dfModel = buildCFModel(usersInterests.select("user_id", "product", "clicked_count", "purchased_count"))
        top5 = recommendTopProducts(dfModel)
        top5.show()
        top5.write.format("org.apache.spark.sql.cassandra").mode('append').\
            options(table="cf", keyspace="bdr").save()
        print("Saved")
    else:
        print("Empty")
In [ ]:
parsed.foreachRDD(processStream)
In [ ]:
ssc.start()
ssc.awaitTermination()  # blocks until the streaming job is stopped externally