In [2]:
import org.apache.spark.mllib.fpm.FPGrowth
import org.apache.spark.rdd.RDD
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql._
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions._
import org.apache.spark.mllib.fpm.AssociationRules
import org.apache.spark.mllib.fpm.FPGrowth.FreqItemset
// Read the data
val sqlContext = spark.sqlContext
val data = sc.textFile("titanic.txt")
val transactions: RDD[Array[String]] = data.map(s => s.trim.split('\t'))
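// Quick sanity check on the parsed transactions (a sketch -- assumes titanic.txt
// holds one passenger per line as tab-separated attribute values; the exact
// columns are an assumption and not shown here)
transactions.take(3).foreach(t => println(t.mkString(", ")))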
// Case classes to hold the rules and items.
// Field order is antecedent, consequent so it matches the mapping below.
case class Rule(antecedent: String, consequent: String, confidence: Double)
case class Item(name: String, freq: Long)
// Compute the frequent itemsets and load the results into a DataFrame
val fpg = new FPGrowth().setMinSupport(0.005)
val model = fpg.run(transactions)
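// Optional check: how many itemsets cleared the 0.5% minimum support
println(s"Frequent itemsets found: ${model.freqItemsets.count()}")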
val items = model.freqItemsets.map(p => Item(p.items.mkString("[", ",", "]"), p.freq))
val itemsDF = sqlContext.createDataFrame(items)
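// Optional: peek at the most frequent itemsets before any filtering
itemsDF.orderBy(desc("freq")).show(10, false)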
// Calculate the association rules
val minConfidence = 0.2
val rules = model.generateAssociationRules(minConfidence).map(p =>
  Rule(p.antecedent.mkString("[", ",", "]"), p.consequent.mkString("[", ",", "]"), p.confidence))
val rulesDF = sqlContext.createDataFrame(rules)
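// Optional: inspect the strongest rules first
rulesDF.orderBy(desc("confidence")).show(10, false)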
// Keep only the rules whose consequent is the survival outcome (Yes/No)
val filteredRules = rulesDF.filter("consequent in ('[Yes]','[No]')")
// Filter the itemset to contain only the filtered antecedents and consequents
val c = filteredRules.select("consequent").distinct.rdd.map(r => r(0)).collect()
val a = filteredRules.select("antecedent").distinct.rdd.map(r => r(0)).collect()
val combined = a ++ c
val filteredItems = itemsDF.filter(itemsDF.col("name").isin(combined: _*))
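// Optional: confirm the node and edge counts feeding the network plot below
println(s"Nodes: ${filteredItems.count()}, edges: ${filteredRules.count()}")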
Out[2]:
In [5]:
%%brunel data('filteredRules')
edge key(antecedent, consequent) opacity(confidence) tooltip(#all) +
network data('filteredItems') key(name) label(name) tooltip(freq) size(freq) style('* {font-size: 7pt}')
::
width=400, height=400
Out[5]: