In [172]:
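// notebook setup: Ammonite-style imports pull in Spark 2.1 and the
// jupyter-scala Spark session helpers (slf4j-nop keeps the log output quiet)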
import $exclude.`org.slf4j:slf4j-log4j12`, $ivy.`org.slf4j:slf4j-nop:1.7.21` // for cleaner logs
//import $profile.`hadoop-2.6`
import $ivy.`org.apache.spark::spark-sql:2.1.0` // adjust spark version - spark >= 2.0
//import $ivy.`org.apache.hadoop:hadoop-aws:2.6.4`
import $ivy.`org.jupyter-scala::spark:0.4.2` // for JupyterSparkSession (SparkSession aware of the jupyter-scala kernel)
import org.apache.spark._
import org.apache.spark.sql._
import jupyter.spark.session._
import Resolvers._
Out[172]:
In [173]:
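// Vegas: a Vega-Lite-based plotting DSL for Scala, used for the charts below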
import $ivy.`org.vegas-viz::vegas:0.3.9`
import vegas._
import vegas.data.External._
Out[173]:
In [174]:
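// register the bintray resolvers that host isarn-sketches and silex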
interp.resolvers() = interp.resolvers() :+ Resolver.Http(
"isarn project",
"https://dl.bintray.com/isarn/maven/",
MavenPattern,
true
)
// resolvers += "Will's bintray" at "https://dl.bintray.com/willb/maven/"
interp.resolvers() = interp.resolvers() :+ Resolver.Http(
"Will Benton",
"https://dl.bintray.com/willb/maven/",
MavenPattern,
true
)
In [175]:
import $ivy.`org.isarnproject::isarn-sketches:0.0.3.rc1` // "org.isarnproject" %% "isarn-sketches" % "0.0.3.rc1"
import org.isarnproject.sketches._
Out[175]:
In [176]:
import $ivy.`com.redhat.et::silex:0.1.2` // "com.redhat.et" %% "silex" % "0.1.2"
Out[176]:
In [177]:
import org.apache.spark.rdd.RDD
import org.apache.spark.mllib.tree.RandomForest
import org.apache.spark.mllib.evaluation.BinaryClassificationMetrics
import org.apache.spark.mllib.classification.{LogisticRegressionModel, LogisticRegressionWithLBFGS}
import org.apache.spark.mllib.evaluation.MulticlassMetrics
import org.apache.spark.mllib.regression.LabeledPoint
import com.redhat.et.silex.util.vectors.implicits._
import com.redhat.et.silex.cluster.ClusteringTreeModel
import ClusteringTreeModel._
Out[177]:
In [178]:
val spark = JupyterSparkSession.builder() // important - call this rather than SparkSession.builder()
.jupyter() // this method must be called straightaway after builder()
.master("spark://frclust:7077")
.appName("notebook")
.getOrCreate()
Out[178]:
In [179]:
scala.util.Random.setSeed(23571113)
In [180]:
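// small helper: wall-clock timing of a block, in seconds; used below to
// compare training and scoring costs on the full vs. reduced feature sets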
def timing[T](block: => T) = {
val t0 = System.currentTimeMillis
val r = block
val t = (System.currentTimeMillis - t0).toDouble / 1000.0
println(s"time: $t")
r
}
Out[180]:
In [181]:
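// enrich TDigest with a toXY method: evaluate the inverse CDF at a fixed
// grid of quantiles, yielding (x, CDF(x)) pairs suitable for plotting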
implicit class TDEnhance(td: TDigest) extends Serializable {
def toXY: Vector[(Double, Double)] = {
val q = (0.001 +: (0.05 to 0.95 by 0.05) :+ 0.999).toVector
q.map(p => (td.cdfInverse(p), p))
}
}
Out[181]:
In [182]:
val gaussianData = Vector.fill(1000000) { scala.util.Random.nextGaussian() }
val gaussianRDD = spark.sparkContext.parallelize(gaussianData, 10)
Out[182]:
In [183]:
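// sketch the RDD with the usual aggregate pattern: `+` updates a sketch with
// one sample, `++` merges the per-partition sketches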
val gaussianSketch = gaussianRDD.aggregate(TDigest.empty())(
(td, x) => td + x,
(td1, td2) => td1 ++ td2
)
Out[183]:
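// sanity check (a sketch, not part of the original run): for N(0,1) data the
// digest median should sit near 0, e.g.
//   gaussianSketch.cdfInverse(0.5)  // expect a value close to 0.0
//   gaussianSketch.cdf(0.0)         // expect a value close to 0.5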
In [184]:
Vegas("CDF")
.withXY(gaussianSketch.toXY)
.encodeX("x", Quant)
.encodeY("y", Quant)
.mark(Line)
.configCell(width=600, height=300)
.show
In [185]:
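// CSV loader via the spark-csv data source; on Spark 2.x the built-in
// .format("csv") reader would also work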
def loadCSV(fname: String) = spark.read
.format("com.databricks.spark.csv")
.option("header", "true")
.option("mode", "DROPMALFORMED")
.load(fname)
Out[185]:
In [186]:
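// Tox21 dense feature matrices and assay labels, train and test splits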
val trainCSV = loadCSV("/data/tox21/tox21_dense_train.csv")
val testCSV = loadCSV("/data/tox21/tox21_dense_test.csv")
val trainLabelsCSV = loadCSV("/data/tox21/tox21_labels_train.csv")
val testLabelsCSV = loadCSV("/data/tox21/tox21_labels_test.csv")
Out[186]:
In [187]:
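// build one t-digest per feature column: the aggregate seeds an array of
// sketches on first use, then updates it in place; maxDiscrete=50 lets
// low-cardinality numeric columns be tracked as discrete values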
def tdSketchFV(fv: RDD[Vector[Double]]): Vector[TDigest] = {
val tds = fv.aggregate(Array.empty[TDigest])(
(tdv, xv) => {
if (tdv.isEmpty) {
Array.tabulate(xv.length) { j => TDigest.empty(maxDiscrete=50) + xv(j) }
} else {
for { j <- 0 until xv.length } { tdv(j) += xv(j) }
tdv
}
},
(tdv1, tdv2) => {
if (tdv1.isEmpty) tdv2 else {
for { j <- 0 until tdv1.length } { tdv1(j) ++= tdv2(j) }
tdv1
}
}
)
tds.toVector
}
Out[187]:
In [188]:
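// draw n synthetic feature vectors: each feature is sampled independently from
// its per-feature sketch (inverse-CDF sampling, with a discrete variant for
// sketches in discrete mode); per-record seeding keeps runs repeatable.
// Sampling features independently deliberately destroys any joint structure
// between columns; that is what the real-vs-synthetic classifier below exploits.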
def synthesize(tdVec: Vector[TDigest], n: Int, partitions: Int = 2, seed: Int = scala.util.Random.nextInt()) = {
implicit class AddSampling(td: TDigest) {
import org.isarnproject.collections.mixmaps.nearest.Cover
def cdfDiscreteInverse(q: Double): Double = {
td.clusters.mCover(q * td.clusters.sum).map(n => (n.data.key, n.data.value)) match {
case Cover(_, Some((x, _))) => x
case Cover(Some((x, _)), None) => x
}
}
def sample: Double = {
val clust = td.clusters
td.nclusters match {
case 0 => 0.0
case 1 => clust.keyMin.get // single cluster: the sample is its center (the key), not its mass
case n if (n <= td.maxDiscrete) => cdfDiscreteInverse(scala.util.Random.nextDouble)
case _ => td.cdfInverse(scala.util.Random.nextDouble)
}
}
}
val tdVecBC = spark.sparkContext.broadcast(tdVec)
spark.sparkContext.parallelize(1 to n, partitions).map { r =>
scala.util.Random.setSeed(r + (r * seed))
tdVecBC.value.map(_.sample)
}
}
Out[188]:
In [189]:
val trainFV = trainCSV.rdd.map(_.toSeq.toVector.drop(1).map(_.asInstanceOf[String].toDouble))
val fvSketches = tdSketchFV(trainFV)
Out[189]:
In [190]:
val synthFV = synthesize(fvSketches, 60000, 4)
Out[190]:
In [191]:
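// label real rows 1.0 and synthetic rows 0.0; a classifier that can separate
// them must be keying on joint feature structure that the synthesis destroyed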
val trainLab = trainFV.map(_.toLabeledPoint(1.0))
val synthLab = synthFV.map(_.toLabeledPoint(0.0))
val trainFR = (trainLab ++ synthLab).cache()
Out[191]:
In [192]:
def frClassifier = {
val rfFR = RandomForest.trainClassifier(
trainFR, // training data
2, // number of classes
Map.empty[Int, Int], // category info
100, // forest size
"auto", //
"gini", // split quality measure
10, // max depth
20, // max bins
235711) // random seed
val predictionAndLabels = trainLab.map { case LabeledPoint(label, features) =>
val prediction = rfFR.predict(features)
(prediction, label)
}
val metrics = new MulticlassMetrics(predictionAndLabels)
(rfFR, metrics)
}
Out[192]:
In [193]:
val (rfFR, metrics) = frClassifier
metrics.accuracy
Out[193]:
In [194]:
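// silex's ClusteringTreeModel enhancement extracts each tree's decision rules,
// resolving feature indices back to the CSV column names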
val frRules = rfFR.trees.map(_.rules(trainCSV.columns.toVector.drop(1), Map.empty[Int, Int]))
Out[194]:
In [195]:
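// for each tree, collect the set of features appearing in any of its rule
// predicates, then histogram how many trees use each feature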
val featPerTree = for {
rmap <- frRules
feats = {
val fraw = for {
vrules <- rmap.values
plist <- vrules
pred <- plist
} yield { pred.feature }
fraw.toSet.toSeq
}
f <- feats
} yield { f }
val featCounts = featPerTree.foldLeft(Map.empty[String, Int])((m, f) => m + (f -> (1 + m.getOrElse(f, 0))))
val featHist = featCounts.toSeq.sortBy { case (_, n) => -n }.toVector
println(featHist.mkString("\n"))
Out[195]:
In [196]:
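// keep only features that more than one tree found useful: this is the
// reduced ("FR") feature set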
val featSelect = featHist.filter { case (_, n) => n > 1 }.map { case (f, _) => f }
featSelect.length
featHist.length
Out[196]:
In [197]:
val trainCSVFR = trainCSV.select("_c0", featSelect :_*)
val testCSVFR = testCSV.select("_c0", featSelect :_*)
Out[197]:
In [198]:
println(trainLabelsCSV.columns.mkString(", "))
In [199]:
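// join assay labels to feature vectors on the compound id in column _c0,
// dropping compounds whose label is "NA" for the given assay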
def csvToPairs(csv: DataFrame) =
csv.rdd.map(_.toSeq)
.map(v => (v.head.asInstanceOf[String], v.tail.toVector.map(_.asInstanceOf[String].toDouble)))
def lpFR(tlab: String, labCSV: DataFrame, fvCSV: DataFrame) = {
val labCol = labCSV.select("_c0", tlab)
val labPairs = labCol.filter(labCol(tlab) =!= "NA")
.rdd.map(_.toSeq).map { s => (s(0).asInstanceOf[String], s(1).asInstanceOf[String].toDouble) }
val fvPairs = csvToPairs(fvCSV)
val lp = labPairs.join(fvPairs).map { case (_, (lab, feats)) => LabeledPoint(lab, feats.toSpark) }
lp
}
Out[199]:
In [200]:
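// build cached train/test LabeledPoint sets for every assay, on both the full
// and reduced feature sets; assay names are backtick-quoted so DataFrame
// column-name parsing tolerates special characters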
case class TrainTestSets(
trainAll: RDD[LabeledPoint], testAll: RDD[LabeledPoint],
trainFR: RDD[LabeledPoint], testFR: RDD[LabeledPoint]
)
val trainTestSets = trainLabelsCSV.columns.tail.map { acol => (acol, s"`$acol`") }
.foldLeft(Map.empty[String,TrainTestSets]) { case (mp, (acolRaw, acol)) =>
mp + (acolRaw -> TrainTestSets(
lpFR(acol, trainLabelsCSV, trainCSV).cache(), lpFR(acol, testLabelsCSV, testCSV).cache(),
lpFR(acol, trainLabelsCSV, trainCSVFR).cache(), lpFR(acol, testLabelsCSV, testCSVFR).cache()
))
}
Out[200]:
In [201]:
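// three model families trained and scored the same way: logistic regression
// (LBFGS), linear SVM (SGD), and gradient-boosted trees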
import org.apache.spark.mllib.classification.SVMWithSGD
import org.apache.spark.mllib.classification.NaiveBayes
import org.apache.spark.mllib.tree.GradientBoostedTrees
import org.apache.spark.mllib.tree.configuration.BoostingStrategy
def trainLRModel(train: RDD[LabeledPoint], test: RDD[LabeledPoint]) = {
val lrModel = new LogisticRegressionWithLBFGS().run(train)
val predictionAndLabels = test.map { case LabeledPoint(label, features) =>
val prediction = lrModel.predict(features)
(prediction, label)
}
val metrics = new MulticlassMetrics(predictionAndLabels)
(lrModel, predictionAndLabels, metrics)
}
def trainSVMModel(train: RDD[LabeledPoint], test: RDD[LabeledPoint]) = {
val model = new SVMWithSGD().run(train)
val predictionAndLabels = test.map { case LabeledPoint(label, features) =>
val prediction = model.predict(features)
(prediction, label)
}
val metrics = new MulticlassMetrics(predictionAndLabels)
(model, predictionAndLabels, metrics)
}
def trainGBModel(train: RDD[LabeledPoint], test: RDD[LabeledPoint]) = {
val boostingStrategy = BoostingStrategy.defaultParams("Classification")
boostingStrategy.numIterations = 10
boostingStrategy.treeStrategy.numClasses = 2
boostingStrategy.treeStrategy.maxDepth = 6
boostingStrategy.treeStrategy.categoricalFeaturesInfo = Map[Int, Int]()
val model = GradientBoostedTrees.train(train, boostingStrategy)
val predictionAndLabels = test.map { case LabeledPoint(label, features) =>
val prediction = model.predict(features)
(prediction, label)
}
val metrics = new MulticlassMetrics(predictionAndLabels)
(model, predictionAndLabels, metrics)
}
Out[201]:
In [202]:
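// grouped-bar comparison per assay: full-feature accuracy next to
// reduced-feature accuracy, colored by whether the reduced model matched or
// beat the full one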
def comparePlot(data: Array[Map[String, Any]]) = {
val ffn = fvSketches.length
val rfn = featSelect.length
val expr = s""" datum.feats < $ffn ? (datum.accuracy >= datum.alt ? "Reduced ($rfn) >=" : "Reduced ($rfn) <") : "Full ($ffn)" """
Vegas().
withData(data).
mark(Bar).
addTransformCalculation("features", expr).
encodeColumn("assay", Nominal, scale=Scale(padding=20.0),
axis=Axis(title="tox assay", orient=Orient.Bottom, axisWidth=1.0, labelAngle=45,labelAlign="bottom",offset= -5)).
encodeY("accuracy", Quantitative, axis=Axis(title="model accuracy", grid=false)).
encodeX("feats", Nominal, scale=Scale(bandSize = 15), hideAxis=true).
encodeColor("features", Nominal, scale=Scale(rangeNominals=List("#e06118", "#99c69c", "#2841ff"))).
configFacet(cell=CellConfig(strokeWidth = 0)).
show
}
Out[202]:
In [203]:
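// for each assay, train on the full and the reduced feature sets and record
// both accuracies (each row also carries the other model's accuracy as "alt"
// so the plot can color by the comparison); same pattern for SVM and GBT below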
val allvsFR = trainLabelsCSV.columns.tail
.flatMap { acol =>
val TrainTestSets(trainAll, testAll, trainFR, testFR) = trainTestSets(acol)
val (modelAll, predLabAll, metricsAll) = trainLRModel(trainAll, testAll)
val (modelFR, predLabFR, metricsFR) = trainLRModel(trainFR, testFR)
Seq(
Map(
"assay" -> acol,
"feats" -> fvSketches.length,
"accuracy" -> metricsAll.accuracy,
"alt" -> metricsFR.accuracy
),
Map(
"assay" -> acol,
"feats" -> featSelect.length,
"accuracy" -> metricsFR.accuracy,
"alt" -> metricsAll.accuracy
)
)
}
comparePlot(allvsFR)
Out[203]:
In [204]:
val allvsFRSVM = trainLabelsCSV.columns.tail
.flatMap { acol =>
val TrainTestSets(trainAll, testAll, trainFR, testFR) = trainTestSets(acol)
val (modelAll, predLabAll, metricsAll) = trainSVMModel(trainAll, testAll)
val (modelFR, predLabFR, metricsFR) = trainSVMModel(trainFR, testFR)
Seq(
Map(
"assay" -> acol,
"feats" -> fvSketches.length,
"accuracy" -> metricsAll.accuracy,
"alt" -> metricsFR.accuracy
),
Map(
"assay" -> acol,
"feats" -> featSelect.length,
"accuracy" -> metricsFR.accuracy,
"alt" -> metricsAll.accuracy
)
)
}
comparePlot(allvsFRSVM)
Out[204]:
In [205]:
val allvsFRGB = trainLabelsCSV.columns.tail
.flatMap { acol =>
val TrainTestSets(trainAll, testAll, trainFR, testFR) = trainTestSets(acol)
val (modelAll, predLabAll, metricsAll) = trainGBModel(trainAll, testAll)
val (modelFR, predLabFR, metricsFR) = trainGBModel(trainFR, testFR)
Seq(
Map(
"assay" -> acol,
"feats" -> fvSketches.length,
"accuracy" -> metricsAll.accuracy,
"alt" -> metricsFR.accuracy
),
Map(
"assay" -> acol,
"feats" -> featSelect.length,
"accuracy" -> metricsFR.accuracy,
"alt" -> metricsAll.accuracy
)
)
}
comparePlot(allvsFRGB)
Out[205]:
In [206]:
import org.apache.spark.mllib.classification.SVMModel
import org.apache.spark.mllib.tree.model.GradientBoostedTreesModel
Out[206]:
In [207]:
val modelsFullLR = timing {
trainLabelsCSV.columns.tail
.foldLeft(Map.empty[String,LogisticRegressionModel]) { case (mp, acol) => {
val TrainTestSets(train, _, _, _) = trainTestSets(acol)
mp + (acol -> new LogisticRegressionWithLBFGS().run(train))
}}
}
val modelsRedLR = timing {
trainLabelsCSV.columns.tail
.foldLeft(Map.empty[String,LogisticRegressionModel]) { case (mp, acol) => {
val TrainTestSets(_, _, train, _) = trainTestSets(acol)
mp + (acol -> new LogisticRegressionWithLBFGS().run(train))
}}
}
Out[207]:
In [208]:
val modelsFullSVM = timing {
trainLabelsCSV.columns.tail
.foldLeft(Map.empty[String,SVMModel]) { case (mp, acol) => {
val TrainTestSets(train, _, _, _) = trainTestSets(acol)
mp + (acol -> new SVMWithSGD().run(train))
}}
}
val modelsRedSVM = timing {
trainLabelsCSV.columns.tail
.foldLeft(Map.empty[String,SVMModel]) { case (mp, acol) => {
val TrainTestSets(_, _, train, _) = trainTestSets(acol)
mp + (acol -> new SVMWithSGD().run(train))
}}
}
Out[208]:
In [209]:
val modelsFullGB = timing {
trainLabelsCSV.columns.tail
.foldLeft(Map.empty[String,GradientBoostedTreesModel]) { case (mp, acol) => {
val TrainTestSets(train, _, _, _) = trainTestSets(acol)
val boostingStrategy = BoostingStrategy.defaultParams("Classification")
boostingStrategy.numIterations = 10
boostingStrategy.treeStrategy.numClasses = 2
boostingStrategy.treeStrategy.maxDepth = 6
boostingStrategy.treeStrategy.categoricalFeaturesInfo = Map[Int, Int]()
val model = GradientBoostedTrees.train(train, boostingStrategy)
mp + (acol -> model)
}}
}
val modelsRedGB = timing {
trainLabelsCSV.columns.tail
.foldLeft(Map.empty[String,GradientBoostedTreesModel]) { case (mp, acol) => {
val TrainTestSets(_, _, train, _) = trainTestSets(acol)
val boostingStrategy = BoostingStrategy.defaultParams("Classification")
boostingStrategy.numIterations = 10
boostingStrategy.treeStrategy.numClasses = 2
boostingStrategy.treeStrategy.maxDepth = 6
boostingStrategy.treeStrategy.categoricalFeaturesInfo = Map[Int, Int]()
val model = GradientBoostedTrees.train(train, boostingStrategy)
mp + (acol -> model)
}}
}
Out[209]:
In [210]:
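// synthetic scoring sets (40k rows each) for throughput benchmarks: one over
// the full feature space here, one over the reduced space in the next cell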
val synthFull = synthesize(fvSketches, 40000, 1).map(_.toSpark).cache()
Out[210]:
In [211]:
val trainRedFV = trainCSVFR.rdd.map(_.toSeq.toVector.drop(1).map(_.asInstanceOf[String].toDouble))
val sketchRed = tdSketchFV(trainRedFV)
val synthRed = synthesize(sketchRed, 40000, 1).map(_.toSpark).cache()
Out[211]:
In [212]:
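// score every per-assay model over the synthetic rows; pred.count forces the
// lazy RDD so that timing measures the actual prediction work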
val evalFullLR = timing {
trainLabelsCSV.columns.tail
.foldLeft(Map.empty[String,RDD[Double]]) { case (mp, acol) => {
val data = synthFull
val model = modelsFullLR(acol)
val pred = data.map { features => model.predict(features) }
val force = pred.count
mp + (acol -> pred)
}}
}
val evalRedLR = timing {
trainLabelsCSV.columns.tail
.foldLeft(Map.empty[String,RDD[Double]]) { case (mp, acol) => {
val data = synthRed
val model = modelsRedLR(acol)
val pred = data.map { features => model.predict(features) }
val force = pred.count
mp + (acol -> pred)
}}
}
Out[212]:
In [213]:
val evalFullSVM = timing {
trainLabelsCSV.columns.tail
.foldLeft(Map.empty[String,RDD[Double]]) { case (mp, acol) => {
val data = synthFull
val model = modelsFullSVM(acol)
val pred = data.map { features => model.predict(features) }
val force = pred.count
mp + (acol -> pred)
}}
}
val evalRedSVM = timing {
trainLabelsCSV.columns.tail
.foldLeft(Map.empty[String,RDD[Double]]) { case (mp, acol) => {
val data = synthRed
val model = modelsRedSVM(acol)
val pred = data.map { features => model.predict(features) }
val force = pred.count
mp + (acol -> pred)
}}
}
Out[213]:
In [214]:
val evalFullGB = timing {
trainLabelsCSV.columns.tail
.foldLeft(Map.empty[String,RDD[Double]]) { case (mp, acol) => {
val data = synthFull
val model = modelsFullGB(acol)
val pred = data.map { features => model.predict(features) }
val force = pred.count
mp + (acol -> pred)
}}
}
val evalRedGB = timing {
trainLabelsCSV.columns.tail
.foldLeft(Map.empty[String,RDD[Double]]) { case (mp, acol) => {
val data = synthRed
val model = modelsRedGB(acol)
val pred = data.map { features => model.predict(features) }
val force = pred.count
mp + (acol -> pred)
}}
}
Out[214]: