In [2]:
import org.apache.spark.mllib.fpm.FPGrowth
import org.apache.spark.rdd.RDD
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql._
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions._ 
import org.apache.spark.mllib.fpm.AssociationRules
import org.apache.spark.mllib.fpm.FPGrowth.FreqItemset

//Load the input file and split every line on tabs, giving one array of items per line
val sqlContext = spark.sqlContext
val data = sc.textFile("titanic.txt")
val transactions: RDD[Array[String]] = data.map(_.trim.split('\t'))

//Row types used to turn the mined rules/itemsets into DataFrames

/** One association rule, with both sides already rendered as strings.
  *
  * @param consequent string form of the rule's consequent
  * @param antecedent string form of the rule's antecedent
  * @param confidence confidence value of the rule
  */
case class Rule(
  consequent: String,
  antecedent: String,
  confidence: Double
)

/** One frequent itemset.
  *
  * @param name string form of the itemset
  * @param freq frequency reported for the itemset
  */
case class Item(
  name: String,
  freq: Long
)

//Mine the frequent itemsets (minimum support 0.005) and expose them as a DataFrame
val fpg = new FPGrowth().setMinSupport(0.005)
val model = fpg.run(transactions)
// Each itemset is rendered as "[item1,item2,...]" so it can be stored in a single string column
val items = model.freqItemsets.map { itemset =>
  Item(itemset.items.mkString("[", ",", "]"), itemset.freq)
}
val itemsDF = sqlContext.createDataFrame(items)

//Calculate the association rules
val minConfidence = 0.2
// Rule declares its fields as (consequent, antecedent, confidence), so the
// arguments are passed by name. Passing them positionally in
// (antecedent, consequent) order stores each side of the rule under the
// other side's column name.
val rules = model.generateAssociationRules(minConfidence).map { p =>
  Rule(
    consequent = p.consequent.mkString("[", ",", "]"),
    antecedent = p.antecedent.mkString("[", ",", "]"),
    confidence = p.confidence
  )
}
val rulesDF = sqlContext.createDataFrame(rules)

//Keep only the rules whose consequent column is '[Yes]' or '[No]' (the survived outcome)
val filteredRules = rulesDF.where("consequent in ('[Yes]','[No]')")

//Restrict the item set to names that occur as an antecedent or consequent of a kept rule
val c = filteredRules.select("consequent").distinct.rdd.map(_.get(0)).collect()
val a = filteredRules.select("antecedent").distinct.rdd.map(_.get(0)).collect()
val combined = a ++ c
val filteredItems = itemsDF.filter(itemsDF("name").isin(combined: _*))


[Stage 7:==========================================>            (154 + 4) / 200]
sqlContext = org.apache.spark.sql.SQLContext@6248ac29
data = titanic.txt MapPartitionsRDD[1] at textFile at <console>:30
transactions = MapPartitionsRDD[2] at map at <console>:31
defined class Rule
defined class Item
fpg = org.apache.spark.mllib.fpm.FPG...
Out[2]:
org.apache.spark.mllib.fpm.FPGrowth@57f04d0f

In [5]:
%%brunel data('filteredRules') 
         edge key(antecedent, consequent) opacity(confidence) tooltip(#all)  + 
         network data('filteredItems') key(name) label(name) tooltip(freq)  size(freq) style('* {font-size: 7pt}') 
          ::
width=400, height=400


                'topojson' : '//cdnjs.cloudflare.com/ajax/libs/topo...
Out[5]:

In [ ]: