In [ ]:
import os
os.environ['PYSPARK_SUBMIT_ARGS'] = \
  '--conf spark.cassandra.connection.host=cassandra --packages org.apache.spark:spark-streaming-kafka-0-8_2.11:2.0.2,com.datastax.spark:spark-cassandra-connector_2.11:2.0.2 pyspark-shell'

In [ ]:
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils
from pyspark.sql import SQLContext
from pyspark.sql import functions as F
from pyspark.sql.types import *
from pyspark.mllib.recommendation import ALS, MatrixFactorizationModel, Rating

In [ ]:
sc = SparkContext(appName="BigDataRiver")
sc.setLogLevel("WARN")
sc.setCheckpointDir('checkpoint/')
ssc = StreamingContext(sc, 60)
sql = SQLContext(sc)

In [ ]:
kafkaStream = KafkaUtils.createDirectStream(ssc, ['bdr'], {"metadata.broker.list": 'kafka:9092'})

In [ ]:
parsed = kafkaStream.map(lambda v: v[1])

In [ ]:
#split is_purchase column into two
separateClicksSchema = StructType([   
    StructField("purchased_count", LongType(), False),
    StructField("clicked_count", LongType(), False)
])

def separateClicks(is_purchase):
  return (is_purchase, 1-is_purchase)

separateClicks_udf = F.udf(separateClicks, separateClicksSchema)

In [ ]:
def buildCFModel(train):
    def isProductToRating(productCount, clickCount):
        return (productCount * 3.0) + clickCount
    
    ratings = train.rdd.\
        map(lambda r: Rating(r.user_id, r.product, isProductToRating(r.purchased_count, r.clicked_count)))
    rank = 10
    numIterations = 20
    lambdaFactor = 0.01
    alpha = 0.01
    seed = 42
    return ALS.trainImplicit(ratings, rank, numIterations, alpha, seed=seed)

In [ ]:
def recommendTopProducts(dfModel):
        numberOfRecommendationsRequired = 5
        rdd = dfModel.recommendProductsForUsers(numberOfRecommendationsRequired)
        recommendations = rdd.map(lambda (user,ratings): (user, map(lambda r: r.product, ratings)))
        topRecommendationsSchema = StructType([
            StructField("user_id", IntegerType(), False),
            StructField("recommended_products", ArrayType(IntegerType()), False)
        ])
        return sql.createDataFrame(recommendations, topRecommendationsSchema)

In [ ]:
def processStream(rdd):
    df = sql.read.json(rdd)
    if(len(df.columns)):
        #store updated counters in C*
        df.withColumn('c', separateClicks_udf(df['is_purchase'])).\
            select("user_id","product","c.purchased_count","c.clicked_count").\
            write.format("org.apache.spark.sql.cassandra").mode('append').\
            options(table="users_interests", keyspace="bdr").save()
            
        #read all data from C*
        usersInterests = sql.read.format("org.apache.spark.sql.cassandra").\
            options(table="users_interests", keyspace="bdr").load().cache()

        dfModel = buildCFModel(usersInterests.select("user_id","product","clicked_count","purchased_count"))
        top5 = recommendTopProducts(dfModel)
        top5.show()
        top5.write.format("org.apache.spark.sql.cassandra").mode('append').options(table="cf", keyspace="bdr").save()
            
        print "Saved"
    else:
        print "Empty"

In [ ]:
parsed.foreachRDD(lambda rdd: processStream(rdd))

In [ ]:
ssc.start()
ssc.awaitTermination()

In [ ]: