In [172]:
import $exclude.`org.slf4j:slf4j-log4j12`, $ivy.`org.slf4j:slf4j-nop:1.7.21` // for cleaner logs
//import $profile.`hadoop-2.6`
import $ivy.`org.apache.spark::spark-sql:2.1.0` // adjust spark version - spark >= 2.0
//import $ivy.`org.apache.hadoop:hadoop-aws:2.6.4`
import $ivy.`org.jupyter-scala::spark:0.4.2` // for JupyterSparkSession (SparkSession aware of the jupyter-scala kernel)

import org.apache.spark._
import org.apache.spark.sql._
import jupyter.spark.session._
import Resolvers._


Out[172]:
import $exclude.$                        , $ivy.$                            // for cleaner logs
//import $profile.`hadoop-2.6`

import $ivy.$                                   // adjust spark version - spark >= 2.0
//import $ivy.`org.apache.hadoop:hadoop-aws:2.6.4`

import $ivy.$                                // for JupyterSparkSession (SparkSession aware of the jupyter-scala kernel)


import org.apache.spark._

import org.apache.spark.sql._

import jupyter.spark.session._

import Resolvers._

In [173]:
import $ivy.`org.vegas-viz::vegas:0.3.9`
import vegas._
import vegas.data.External._


Out[173]:
import $ivy.$                           

import vegas._

import vegas.data.External._

In [174]:
interp.resolvers() = interp.resolvers() :+ Resolver.Http(
  "isarn project",
  "https://dl.bintray.com/isarn/maven/",
  MavenPattern,
  true
)
// resolvers += "Will's bintray" at "https://dl.bintray.com/willb/maven/"
interp.resolvers() = interp.resolvers() :+ Resolver.Http(
  "Will Benton",
  "https://dl.bintray.com/willb/maven/",
  MavenPattern,
  true
)

In [175]:
import $ivy.`org.isarnproject::isarn-sketches:0.0.3.rc1` // "org.isarnproject" %% "isarn-sketches" % "0.0.3.rc1"
import org.isarnproject.sketches._


Out[175]:
import $ivy.$                                            // "org.isarnproject" %% "isarn-sketches" % "0.0.3.rc1"

import org.isarnproject.sketches._

In [176]:
import $ivy.`com.redhat.et::silex:0.1.2` // "com.redhat.et" %% "silex" % "0.1.2"


Out[176]:
import $ivy.$                            // "com.redhat.et" %% "silex" % "0.1.2"

In [177]:
import org.apache.spark.rdd.RDD
import org.apache.spark.mllib.tree.RandomForest
import org.apache.spark.mllib.evaluation.BinaryClassificationMetrics
import org.apache.spark.mllib.classification.{LogisticRegressionModel, LogisticRegressionWithLBFGS}
import org.apache.spark.mllib.evaluation.MulticlassMetrics
import org.apache.spark.mllib.regression.LabeledPoint

import com.redhat.et.silex.util.vectors.implicits._
import com.redhat.et.silex.cluster.ClusteringTreeModel
import ClusteringTreeModel._


Out[177]:
import org.apache.spark.rdd.RDD

import org.apache.spark.mllib.tree.RandomForest

import org.apache.spark.mllib.evaluation.BinaryClassificationMetrics

import org.apache.spark.mllib.classification.{LogisticRegressionModel, LogisticRegressionWithLBFGS}

import org.apache.spark.mllib.evaluation.MulticlassMetrics

import org.apache.spark.mllib.regression.LabeledPoint


import com.redhat.et.silex.util.vectors.implicits._

import com.redhat.et.silex.cluster.ClusteringTreeModel

import ClusteringTreeModel._

In [178]:
val spark = JupyterSparkSession.builder() // important - call this rather than SparkSession.builder()
  .jupyter() // this method must be called straightaway after builder()
  .master("spark://frclust:7077")
  .appName("notebook")
  .getOrCreate()


Out[178]:
spark: SparkSession = org.apache.spark.sql.SparkSession@531a08df

In [179]:
scala.util.Random.setSeed(23571113)

In [180]:
def timing[T](block: => T) = {
    val t0 = System.currentTimeMillis
    val r = block
    val t = (System.currentTimeMillis - t0).toDouble / 1000.0
    println(s"time: $t")
    r
}


Out[180]:
defined function timing
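
A quick usage sketch (not part of the recorded run): the block argument is passed by name, so timing evaluates it once, prints the elapsed seconds, and returns the block's result.

val s = timing { (1L to 1000000L).sum }  // prints e.g. "time: 0.012"; s = 500000500000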

In [181]:
implicit class TDEnhance(td: TDigest) extends Serializable {
  def toXY: Vector[(Double, Double)] = {
    // quantiles from the 0.1th through the 99.9th percentile
    val quantiles = (0.001 +: (0.05 to 0.95 by 0.05) :+ 0.999).toVector
    quantiles.map(q => (td.cdfInverse(q), q))
  }
}


Out[181]:
defined class TDEnhance

In [182]:
val gaussianData = Vector.fill(1000000) { scala.util.Random.nextGaussian() }
val gaussianRDD = spark.sparkContext.parallelize(gaussianData, 10)


Out[182]:
gaussianData: Vector[Double] = Vector(
  -0.22810922233651607,
  0.43570354735682515,
  -0.7910474922935643,
  0.40177382348662527,
  0.29006757451660975,
  0.1993858180454829,
  -0.03775433638357763,
  0.5795731389992561,
  -1.1623629066548575,
  -0.955642175001452,
  -1.7259353951180345,
...
gaussianRDD: RDD[Double] = ParallelCollectionRDD[283913] at parallelize at cmd181.sc:2

In [183]:
val gaussianSketch = gaussianRDD.aggregate(TDigest.empty())(
    (td, x) => td + x,
    (td1, td2) => td1 ++ td2
)


Out[183]:
gaussianSketch: TDigest = TDigest(
  0.5,
  0,
  121,
  Map(
    -4.706082570034547 -> 1.0,
    -4.688055197866202 -> 1.0,
    -4.589157805061891 -> 1.0,
    -4.492289202659431 -> 1.0,
    -4.414671826601608 -> 2.0,
    -4.263151643910318 -> 3.3291125806359894,
    -4.229663029682633 -> 3.7375184983107355,
...
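
This aggregate works because t-digests form a monoid under the operations used here: TDigest.empty() is the identity, + absorbs a single sample, and ++ merges two sketches, so Spark can combine per-partition sketches in any order. Since the source distribution is known, a quick sanity check (not in the recorded run) is to compare sketched quantiles against the standard normal:

gaussianSketch.cdfInverse(0.5)   // should be close to 0.0
gaussianSketch.cdfInverse(0.99)  // should be close to 2.33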

In [184]:
Vegas("CDF")
    .withXY(gaussianSketch.toXY)
    .encodeX("x", Quant)
    .encodeY("y", Quant)
    .mark(Line)
    .configCell(width=600, height=300)
    .show

[Vegas output: interactive plot of the sketched Gaussian CDF; widget not captured in this transcript.]

In [185]:
def loadCSV(fname: String) = spark.read
  .format("com.databricks.spark.csv")
  .option("header", "true")
  .option("mode", "DROPMALFORMED")
  .load(fname)


Out[185]:
defined function loadCSV

In [186]:
val trainCSV = loadCSV("/data/tox21/tox21_dense_train.csv")
val testCSV = loadCSV("/data/tox21/tox21_dense_test.csv")
val trainLabelsCSV = loadCSV("/data/tox21/tox21_labels_train.csv")
val testLabelsCSV = loadCSV("/data/tox21/tox21_labels_test.csv")


Out[186]:
trainCSV: DataFrame = [_c0: string, AW: string ... 800 more fields]
testCSV: DataFrame = [_c0: string, AW: string ... 800 more fields]
trainLabelsCSV: DataFrame = [_c0: string, NR.AhR: string ... 11 more fields]
testLabelsCSV: DataFrame = [_c0: string, NR.AhR: string ... 11 more fields]

In [187]:
def tdSketchFV(fv: RDD[Vector[Double]]): Vector[TDigest] = {
    val tds = fv.aggregate(Array.empty[TDigest])(
        (tdv, xv) => {
            if (tdv.isEmpty) {
                Array.tabulate(xv.length) { j => TDigest.empty(maxDiscrete=50) + xv(j) }
            } else {
                for { j <- 0 until xv.length } { tdv(j) += xv(j) }
                tdv
            }
        },
        (tdv1, tdv2) => {
            if (tdv1.isEmpty) tdv2 else {
                for { j <- 0 until tdv1.length } { tdv1(j) ++= tdv2(j) }
                tdv1
            }
        }
    )
    tds.toVector
}


Out[187]:
defined function tdSketchFV
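
tdSketchFV makes a single pass over the data, maintaining one TDigest per feature column; maxDiscrete=50 keeps any column with at most 50 distinct values in exact discrete mode (see the sampling logic in synthesize below). A minimal usage sketch on hypothetical toy data:

val toy = spark.sparkContext.parallelize(Seq(Vector(1.0, 10.0), Vector(2.0, 20.0), Vector(3.0, 30.0)))
tdSketchFV(toy).length  // 2 -- one sketch per column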

In [188]:
def synthesize(tdVec: Vector[TDigest], n: Int, partitions: Int = 2, seed: Int = scala.util.Random.nextInt) = {
    implicit class AddSampling(td: TDigest) {
        import org.isarnproject.collections.mixmaps.nearest.Cover
        def cdfDiscreteInverse(q: Double): Double = {
            td.clusters.mCover(q * td.clusters.sum).map(n => (n.data.key, n.data.value)) match {
                case Cover(_, Some((x, _))) => x
                case Cover(Some((x, _)), None) => x
            }
        }

        def sample: Double = {
            val clust = td.clusters
            td.nclusters match {
                case 0 => 0.0
                case 1 => clust(clust.keyMin.get)
                case n if (n <= td.maxDiscrete) => cdfDiscreteInverse(scala.util.Random.nextDouble)
                case _ => td.cdfInverse(scala.util.Random.nextDouble)
            }
        }
    }
    val tdVecBC = spark.sparkContext.broadcast(tdVec)
    spark.sparkContext.parallelize(1 to n, partitions).map { r => 
        scala.util.Random.setSeed(r + (r * seed))
        tdVecBC.value.map(_.sample) 
    }
}


Out[188]:
defined function synthesize
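
synthesize draws each synthetic row by sampling every feature's sketch independently: a discrete sketch (nclusters <= maxDiscrete) returns a stored value weighted by its count via cdfDiscreteInverse, while a continuous one inverts the CDF at a uniform draw. Re-seeding from the row index makes the rows reproducible for a fixed seed. For example (hypothetical, not from the recorded run), sampling the Gaussian sketch built earlier:

synthesize(Vector(gaussianSketch), 3, 1).collect()  // three 1-element vectors, roughly standard normal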

In [189]:
val trainFV = trainCSV.rdd.map(_.toSeq.toVector.drop(1).map(_.asInstanceOf[String].toDouble))
val fvSketches = tdSketchFV(trainFV)


Out[189]:
trainFV: RDD[Vector[Double]] = MapPartitionsRDD[283946] at map at cmd188.sc:1
fvSketches: Vector[TDigest] = Vector(
  TDigest(
    0.5,
    50,
    71,
    Map(
      1.0 -> 9.0,
      1.333 -> 24.0,
      1.5 -> 45.0,
      1.600489455071117 -> 33.242848970251714,
      1.670598074443201 -> 5.180451805513876,
      1.697451499884758 -> 66.5766992242344,
...

In [190]:
val synthFV = synthesize(fvSketches, 60000, 4)


Out[190]:
synthFV: RDD[Vector[Double]] = MapPartitionsRDD[283948] at map at cmd187.sc:22

In [191]:
val trainLab = trainFV.map(_.toLabeledPoint(1.0))
val synthLab = synthFV.map(_.toLabeledPoint(0.0))
val trainFR = (trainLab ++ synthLab).cache()


Out[191]:
trainLab: RDD[LabeledPoint] = MapPartitionsRDD[283949] at map at cmd190.sc:1
synthLab: RDD[LabeledPoint] = MapPartitionsRDD[283950] at map at cmd190.sc:2
trainFR: RDD[LabeledPoint] = UnionRDD[283951] at $plus$plus at cmd190.sc:3

In [192]:
def frClassifier = {
    val rfFR = RandomForest.trainClassifier(
        trainFR,              // training data
        2,                    // number of classes
        Map.empty[Int, Int],  // category info
        100,                  // forest size
        "auto",               // feature subset strategy
        "gini",               // split quality measure
        10,                   // max depth
        20,                   // max bins
        235711)               // random seed
    val predictionAndLabels = trainLab.map { case LabeledPoint(label, features) =>
        val prediction = rfFR.predict(features)
        (prediction, label)
    }
    val metrics = new MulticlassMetrics(predictionAndLabels)
    (rfFR, metrics)
}


Out[192]:
defined function frClassifier

In [193]:
val (rfFR, metrics) = frClassifier
metrics.accuracy


Out[193]:
rfFR: mllib.tree.model.RandomForestModel = TreeEnsembleModel classifier with 100 trees

metrics: MulticlassMetrics = org.apache.spark.mllib.evaluation.MulticlassMetrics@72549e53
res192_1: Double = 1.0
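
Note that this accuracy is computed on trainLab alone, i.e. only the real rows, so it is really the recall of the "real" class: the forest classifies every real row as real. A threshold-free view over both classes could use the already-imported BinaryClassificationMetrics (a follow-up sketch, not part of the recorded run):

val pAndL = trainFR.map { case LabeledPoint(label, features) => (rfFR.predict(features), label) }
new BinaryClassificationMetrics(pAndL).areaUnderROC()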

In [194]:
val frRules = rfFR.trees.map(_.rules(trainCSV.columns.toVector.drop(1), Map.empty[Int, Int]))


Out[194]:
frRules: Array[Map[Double, Seq[Seq[Predicate]]]] = Array(
  Map(
    1.0 -> ArrayBuffer(
      List((MRVSA2 <= 0.0)),
      List((MRVSA2 > 0.0), (RDFM30 <= 0.0)),
      List((MRVSA2 > 0.0), (RDFM30 > 0.0), (VSAEstate2 <= 0.0))
    ),
    0.0 -> ArrayBuffer(List((MRVSA2 > 0.0), (RDFM30 > 0.0), (VSAEstate2 > 0.0)))
  ),
  Map(
    1.0 -> ArrayBuffer(
      List((RDFU25 <= 0.0)),
...

In [195]:
val featPerTree = for {
    rmap <- frRules
    feats = {
        val fraw = for {
            vrules <- rmap.values
            plist <- vrules
            pred <- plist
        } yield { pred.feature }
        fraw.toSet.toSeq
    }
    f <- feats
} yield { f }
val featCounts = featPerTree.foldLeft(Map.empty[String, Int])((m, f) => m + (f -> (1 + m.getOrElse(f, 0))))
val featHist = featCounts.toSeq.sortBy { case (_, n) => -n }.toVector
println(featHist.mkString("\n"))


(VSAEstate0,20)
(RNCS,20)
(MRVSA7,20)
(VSAEstate2,18)
(VSAEstate3,18)
(slogPVSA8,17)
(slogPVSA6,15)
(RDFM29,13)
(RDFM30,13)
(RDFV30,12)
(RDFU27,11)
(PEOEVSA4,11)
(slogPVSA3,10)
(slogPVSA7,9)
(RDFE30,9)
(PEOEVSA10,9)
(Chi4c,8)
(RDFV27,8)
(RDFV24,8)
(PEOEVSA11,7)
(PEOEVSA2,7)
(slogPVSA9,7)
(Chiv4c,7)
(PEOEVSA3,7)
(RDFU30,7)
(MRVSA3,7)
(RDFE28,7)
(MRVSA8,6)
(PEOEVSA12,6)
(Chi5ch,6)
(MRVSA2,6)
(RDFP29,6)
(RDFV29,6)
(RDFM27,6)
(RDFP30,6)
(RDFP24,5)
(RDFM25,5)
(RDFP25,5)
(slogPVSA0,5)
(EstateVSA6,5)
(RDFE29,5)
(RDFM24,5)
(MRVSA5,4)
(RDFV23,4)
(EstateVSA2,4)
(Chiv5ch,4)
(RDFE27,4)
(RDFV28,4)
(RDFV26,4)
(RDFU29,4)
(RDFU25,4)
(RDFP27,3)
(RDFM22,3)
(RDFU28,3)
(GATSm8,3)
(RDFU24,3)
(EstateVSA5,3)
(slogPVSA11,3)
(RDFE26,3)
(Tp,3)
(RDFE23,3)
(RDFU22,3)
(PEOEVSA8,3)
(RDFP28,3)
(RDFU21,3)
(RDFE25,3)
(L1p,2)
(RDFM26,2)
(rygr,2)
(RDFM28,2)
(RDFE22,2)
(RDFU23,2)
(RDFP14,2)
(PEOEVSA1,2)
(EstateVSA0,2)
(Tu,2)
(RDFV25,2)
(RDFP22,2)
(RDFV21,2)
(RDFP26,2)
(RDFU26,2)
(Vm,2)
(slogPVSA2,2)
(RDFM20,2)
(RDFE21,2)
(RDFU9,1)
(RDFM2,1)
(W3DH,1)
(GeDi,1)
(Chi3,1)
(L1e,1)
(nhyd,1)
(E1u,1)
(Chiv6ch,1)
(LogP,1)
(Ve,1)
(RDFE20,1)
(slogPVSA10,1)
(RDFM19,1)
(nhal,1)
(J,1)
(EstateVSA1,1)
(RDFC14,1)
(RDFP21,1)
(RDFV12,1)
(L1u,1)
(RDFC25,1)
(RDFC10,1)
(RDFC13,1)
(bcute5,1)
(RDFU2,1)
(bcutm5,1)
(RDFE19,1)
(bcutp13,1)
(MoRSEP14,1)
(GATSe8,1)
(Gravto,1)
(RDFU12,1)
(MSA,1)
(RDFE24,1)
(Vv,1)
(RDFU20,1)
(RDFE1,1)
(P1e,1)
(AWeight,1)
(RDFM23,1)
(Tv,1)
(RDFM12,1)
(Hato,1)
(P1m,1)
(bcutv13,1)
(MATSm6,1)
(bcutv7,1)
(PEOEVSA13,1)
(Chi7,1)
(WNSA3,1)
(EstateVSA4,1)
(bcutv16,1)
(L1m,1)
(bcutv4,1)
(Chi6ch,1)
Out[195]:
featPerTree: Array[String] = Array(
  "MRVSA2",
  "RDFM30",
  "VSAEstate2",
  "RDFU25",
  "RDFM30",
  "Chi5ch",
  "VSAEstate3",
  "RDFM29",
  "VSAEstate3",
  "RDFV29",
  "PEOEVSA4",
...
featCounts: Map[String, Int] = Map(
  "MRVSA8" -> 6,
  "PEOEVSA12" -> 6,
  "RDFU9" -> 1,
  "VSAEstate2" -> 18,
  "MRVSA5" -> 4,
  "Chi5ch" -> 6,
  "RDFM2" -> 1,
  "RDFM29" -> 13,
  "W3DH" -> 1,
  "RDFP27" -> 3,
  "GeDi" -> 1,
...
featHist: Vector[(String, Int)] = Vector(
  ("VSAEstate0", 20),
  ("RNCS", 20),
  ("MRVSA7", 20),
  ("VSAEstate2", 18),
  ("VSAEstate3", 18),
  ("slogPVSA8", 17),
  ("slogPVSA6", 15),
  ("RDFM29", 13),
  ("RDFM30", 13),
  ("RDFV30", 12),
  ("RDFU27", 11),
...

In [196]:
val featSelect = featHist.filter { case (_, n) => n > 1 }.map { case (f, _) => f }
featSelect.length
featHist.length


Out[196]:
featSelect: Vector[String] = Vector(
  "VSAEstate0",
  "RNCS",
  "MRVSA7",
  "VSAEstate2",
  "VSAEstate3",
  "slogPVSA8",
  "slogPVSA6",
  "RDFM29",
  "RDFM30",
  "RDFV30",
  "RDFU27",
...
res195_1: Int = 85
res195_2: Int = 141
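
So of the 141 features the forest split on at all, the 85 that appear in more than one of its 100 trees are kept as the reduced feature set, down from the original 801 dense columns.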

In [197]:
val trainCSVFR = trainCSV.select("_c0", featSelect: _*)
val testCSVFR = testCSV.select("_c0", featSelect: _*)


Out[197]:
trainCSVFR: DataFrame = [_c0: string, VSAEstate0: string ... 84 more fields]
testCSVFR: DataFrame = [_c0: string, VSAEstate0: string ... 84 more fields]

In [198]:
println(trainLabelsCSV.columns.mkString(", "))


_c0, NR.AhR, NR.AR, NR.AR.LBD, NR.Aromatase, NR.ER, NR.ER.LBD, NR.PPAR.gamma, SR.ARE, SR.ATAD5, SR.HSE, SR.MMP, SR.p53

In [199]:
def csvToPairs(csv: DataFrame) =
    csv.rdd.map(_.toSeq)
    .map(v => (v.head.asInstanceOf[String], v.tail.toVector.map(_.asInstanceOf[String].toDouble)))
def lpFR(tlab: String, labCSV: DataFrame, fvCSV: DataFrame) = {
    val labCol = labCSV.select("_c0", tlab)
    val labPairs = labCol.filter(labCol(tlab) =!= "NA")
        .rdd.map(_.toSeq).map { s => (s(0).asInstanceOf[String], s(1).asInstanceOf[String].toDouble) }
    val fvPairs = csvToPairs(fvCSV)
    val lp = labPairs.join(fvPairs).map { case (_, (lab, feats)) => LabeledPoint(lab, feats.toSpark) }
    lp
}


Out[199]:
defined function csvToPairs
defined function lpFR

In [200]:
case class TrainTestSets(
    trainAll: RDD[LabeledPoint], testAll: RDD[LabeledPoint],
    trainFR: RDD[LabeledPoint], testFR: RDD[LabeledPoint]
)
val trainTestSets = trainLabelsCSV.columns.tail.map { acol => (acol, s"`$acol`")}
    .foldLeft(Map.empty[String,TrainTestSets]) { case (mp, (acolRaw, acol)) =>
        mp + (acolRaw -> TrainTestSets(
            lpFR(acol, trainLabelsCSV, trainCSV).cache(), lpFR(acol, testLabelsCSV, testCSV).cache(),
            lpFR(acol, trainLabelsCSV, trainCSVFR).cache(), lpFR(acol, testLabelsCSV, testCSVFR).cache()
        ))
    }


Out[200]:
defined class TrainTestSets
trainTestSets: Map[String, wrapper.wrapper.TrainTestSets] = Map(
  "SR.MMP" -> TrainTestSets(
    MapPartitionsRDD[284503] at map at cmd198.sc:9,
    MapPartitionsRDD[284515] at map at cmd198.sc:9,
    MapPartitionsRDD[284527] at map at cmd198.sc:9,
    MapPartitionsRDD[284539] at map at cmd198.sc:9
  ),
  "NR.AhR" -> TrainTestSets(
    MapPartitionsRDD[284009] at map at cmd198.sc:9,
    MapPartitionsRDD[284025] at map at cmd198.sc:9,
    MapPartitionsRDD[284041] at map at cmd198.sc:9,
    MapPartitionsRDD[284059] at map at cmd198.sc:9
...

In [201]:
import org.apache.spark.mllib.classification.SVMWithSGD
import org.apache.spark.mllib.classification.NaiveBayes
import org.apache.spark.mllib.tree.GradientBoostedTrees
import org.apache.spark.mllib.tree.configuration.BoostingStrategy
def trainLRModel(train: RDD[LabeledPoint], test: RDD[LabeledPoint]) = {
    val lrModel = new LogisticRegressionWithLBFGS().run(train)
    val predictionAndLabels = test.map { case LabeledPoint(label, features) =>
        val prediction = lrModel.predict(features)
        (prediction, label)
    }
    val metrics = new MulticlassMetrics(predictionAndLabels)
    (lrModel, predictionAndLabels, metrics)
}
def trainSVMModel(train: RDD[LabeledPoint], test: RDD[LabeledPoint]) = {
    val model = new SVMWithSGD().run(train)
    val predictionAndLabels = test.map { case LabeledPoint(label, features) =>
        val prediction = model.predict(features)
        (prediction, label)
    }
    val metrics = new MulticlassMetrics(predictionAndLabels)
    (model, predictionAndLabels, metrics)
}
def trainGBModel(train: RDD[LabeledPoint], test: RDD[LabeledPoint]) = {
    val boostingStrategy = BoostingStrategy.defaultParams("Classification")
    boostingStrategy.numIterations = 10
    boostingStrategy.treeStrategy.numClasses = 2
    boostingStrategy.treeStrategy.maxDepth = 6
    boostingStrategy.treeStrategy.categoricalFeaturesInfo = Map[Int, Int]()
    val model = GradientBoostedTrees.train(train, boostingStrategy)
    val predictionAndLabels = test.map { case LabeledPoint(label, features) =>
        val prediction = model.predict(features)
        (prediction, label)
    }
    val metrics = new MulticlassMetrics(predictionAndLabels)
    (model, predictionAndLabels, metrics)
}


Out[201]:
import org.apache.spark.mllib.classification.SVMWithSGD

import org.apache.spark.mllib.classification.NaiveBayes

import org.apache.spark.mllib.tree.GradientBoostedTrees

import org.apache.spark.mllib.tree.configuration.BoostingStrategy

defined function trainLRModel
defined function trainSVMModel
defined function trainGBModel
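
The three trainers differ only in how the model is fit; the evaluation step is identical and could be factored out (a refactoring sketch, not used in the cells below):

import org.apache.spark.mllib.linalg.{Vector => SparkVector}
def evalModel(predict: SparkVector => Double, test: RDD[LabeledPoint]): MulticlassMetrics = {
    val predictionAndLabels = test.map { case LabeledPoint(label, features) => (predict(features), label) }
    new MulticlassMetrics(predictionAndLabels)
}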

In [202]:
def comparePlot(data: Array[Map[String, Any]]) = {
    val ffn = fvSketches.length
    val rfn = featSelect.length
    val expr = s""" datum.feats < $ffn ? (datum.accuracy >= datum.alt ? "Reduced ($rfn) >=" : "Reduced ($rfn) <") : "Full ($ffn)" """
    Vegas().
    withData(data).
    mark(Bar).
    addTransformCalculation("features", expr).
    encodeColumn("assay", Nominal, scale=Scale(padding=20.0),
                 axis=Axis(title="tox assay", orient=Orient.Bottom, axisWidth=1.0, labelAngle=45, labelAlign="bottom", offset = -5)).
    encodeY("accuracy", Quantitative, axis=Axis(title="model accuracy", grid=false)).
    encodeX("feats", Nominal, scale=Scale(bandSize = 15), hideAxis=true).
    encodeColor("features", Nominal, scale=Scale(rangeNominals=List("#e06118", "#99c69c", "#2841ff"))).
    configFacet(cell=CellConfig(strokeWidth = 0)).
    show
}


Out[202]:
defined function comparePlot

In [203]:
val allvsFR = trainLabelsCSV.columns.tail
    .flatMap { acol =>
        val TrainTestSets(trainAll, testAll, trainFR, testFR) = trainTestSets(acol)
        val (modelAll, predLabAll, metricsAll) = trainLRModel(trainAll, testAll)
        val (modelFR, predLabFR, metricsFR) = trainLRModel(trainFR, testFR)
        Seq(
            Map(
                "assay" -> acol,
                "feats" -> fvSketches.length,
                "accuracy" -> metricsAll.accuracy,
                "alt" -> metricsFR.accuracy
            ),
            Map(
                "assay" -> acol,
                "feats" -> featSelect.length,
                "accuracy" -> metricsFR.accuracy,
                "alt" -> metricsAll.accuracy
            )
        )
    }
comparePlot(allvsFR)


Out[203]:
allvsFR: Array[Map[String, Any]] = Array(
  Map(
    "assay" -> NR.AhR,
    "feats" -> 801,
    "accuracy" -> 0.8803278688524591,
    "alt" -> 0.8836065573770492
  ),
  Map(
    "assay" -> NR.AhR,
    "feats" -> 85,
    "accuracy" -> 0.8836065573770492,
    "alt" -> 0.8803278688524591
...

In [204]:
val allvsFRSVM = trainLabelsCSV.columns.tail
    .flatMap { acol =>
        val TrainTestSets(trainAll, testAll, trainFR, testFR) = trainTestSets(acol)
        val (modelAll, predLabAll, metricsAll) = trainSVMModel(trainAll, testAll)
        val (modelFR, predLabFR, metricsFR) = trainSVMModel(trainFR, testFR)
        Seq(
            Map(
                "assay" -> acol,
                "feats" -> fvSketches.length,
                "accuracy" -> metricsAll.accuracy,
                "alt" -> metricsFR.accuracy
            ),
            Map(
                "assay" -> acol,
                "feats" -> featSelect.length,
                "accuracy" -> metricsFR.accuracy,
                "alt" -> metricsAll.accuracy
            )
        )
    }
comparePlot(allvsFRSVM)


Out[204]:
allvsFRSVM: Array[Map[String, Any]] = Array(
  Map(
    "assay" -> NR.AhR,
    "feats" -> 801,
    "accuracy" -> 0.8803278688524591,
    "alt" -> 0.8803278688524591
  ),
  Map(
    "assay" -> NR.AhR,
    "feats" -> 85,
    "accuracy" -> 0.8803278688524591,
    "alt" -> 0.8803278688524591
...

In [205]:
val allvsFRGB = trainLabelsCSV.columns.tail
    .flatMap { acol =>
        val TrainTestSets(trainAll, testAll, trainFR, testFR) = trainTestSets(acol)
        val (modelAll, predLabAll, metricsAll) = trainGBModel(trainAll, testAll)
        val (modelFR, predLabFR, metricsFR) = trainGBModel(trainFR, testFR)
        Seq(
            Map(
                "assay" -> acol,
                "feats" -> fvSketches.length,
                "accuracy" -> metricsAll.accuracy,
                "alt" -> metricsFR.accuracy
            ),
            Map(
                "assay" -> acol,
                "feats" -> featSelect.length,
                "accuracy" -> metricsFR.accuracy,
                "alt" -> metricsAll.accuracy
            )
        )
    }
comparePlot(allvsFRGB)


Out[205]:
allvsFRGB: Array[Map[String, Any]] = Array(
  Map(
    "assay" -> NR.AhR,
    "feats" -> 801,
    "accuracy" -> 0.8754098360655738,
    "alt" -> 0.8868852459016393
  ),
  Map(
    "assay" -> NR.AhR,
    "feats" -> 85,
    "accuracy" -> 0.8868852459016393,
    "alt" -> 0.8754098360655738
...

In [206]:
import org.apache.spark.mllib.classification.SVMModel
import org.apache.spark.mllib.tree.model.GradientBoostedTreesModel


Out[206]:
import org.apache.spark.mllib.classification.SVMModel

import org.apache.spark.mllib.tree.model.GradientBoostedTreesModel

In [207]:
val modelsFullLR = timing {
    trainLabelsCSV.columns.tail
        .foldLeft(Map.empty[String,LogisticRegressionModel]) { case (mp, acol) => {
            val TrainTestSets(train, _, _, _) = trainTestSets(acol)
            mp + (acol -> new LogisticRegressionWithLBFGS().run(train))
        }}
}
val modelsRedLR = timing {
    trainLabelsCSV.columns.tail
        .foldLeft(Map.empty[String,LogisticRegressionModel]) { case (mp, acol) => {
            val TrainTestSets(_, _, train, _) = trainTestSets(acol)
            mp + (acol -> new LogisticRegressionWithLBFGS().run(train))
        }}
}


time: 72.212
time: 45.674
Out[207]:
modelsFullLR: Map[String, LogisticRegressionModel] = Map(
  "SR.MMP" -> org.apache.spark.mllib.classification.LogisticRegressionModel: intercept = 0.0, numFeatures = 801, numClasses = 2, threshold = 0.5,
  "NR.AhR" -> org.apache.spark.mllib.classification.LogisticRegressionModel: intercept = 0.0, numFeatures = 801, numClasses = 2, threshold = 0.5,
  "SR.HSE" -> org.apache.spark.mllib.classification.LogisticRegressionModel: intercept = 0.0, numFeatures = 801, numClasses = 2, threshold = 0.5,
  "NR.ER" -> org.apache.spark.mllib.classification.LogisticRegressionModel: intercept = 0.0, numFeatures = 801, numClasses = 2, threshold = 0.5,
  "NR.AR" -> org.apache.spark.mllib.classification.LogisticRegressionModel: intercept = 0.0, numFeatures = 801, numClasses = 2, threshold = 0.5,
  "NR.ER.LBD" -> org.apache.spark.mllib.classification.LogisticRegressionModel: ...
modelsRedLR: Map[String, LogisticRegressionModel] = Map(
  "SR.MMP" -> org.apache.spark.mllib.classification.LogisticRegressionModel: intercept = 0.0, numFeatures = 85, numClasses = 2, threshold = 0.5,
  "NR.AhR" -> org.apache.spark.mllib.classification.LogisticRegressionModel: intercept = 0.0, numFeatures = 85, numClasses = 2, threshold = 0.5,
  "SR.HSE" -> org.apache.spark.mllib.classification.LogisticRegressionModel: intercept = 0.0, numFeatures = 85, numClasses = 2, threshold = 0.5,
  "NR.ER" -> org.apache.spark.mllib.classification.LogisticRegressionModel: intercept = 0.0, numFeatures = 85, numClasses = 2, threshold = 0.5,
  "NR.AR" -> org.apache.spark.mllib.classification.LogisticRegressionModel: intercept = 0.0, numFeatures = 85, numClasses = 2, threshold = 0.5,
  "NR.ER.LBD" -> org.apache.spark.mllib.classification.LogisticRegressionModel: ...

In [208]:
val modelsFullSVM = timing {
    trainLabelsCSV.columns.tail
        .foldLeft(Map.empty[String,SVMModel]) { case (mp, acol) => {
            val TrainTestSets(train, _, _, _) = trainTestSets(acol)
            mp + (acol -> new SVMWithSGD().run(train))
        }}
}
val modelsRedSVM = timing {
    trainLabelsCSV.columns.tail
        .foldLeft(Map.empty[String,SVMModel]) { case (mp, acol) => {
            val TrainTestSets(_, _, train, _) = trainTestSets(acol)
            mp + (acol -> new SVMWithSGD().run(train))
        }}
}


time: 35.2
time: 34.08
Out[208]:
modelsFullSVM: Map[String, SVMModel] = Map(
  "SR.MMP" -> org.apache.spark.mllib.classification.SVMModel: intercept = 0.0, numFeatures = 801, numClasses = 2, threshold = 0.0,
  "NR.AhR" -> org.apache.spark.mllib.classification.SVMModel: intercept = 0.0, numFeatures = 801, numClasses = 2, threshold = 0.0,
  "SR.HSE" -> org.apache.spark.mllib.classification.SVMModel: intercept = 0.0, numFeatures = 801, numClasses = 2, threshold = 0.0,
  "NR.ER" -> org.apache.spark.mllib.classification.SVMModel: intercept = 0.0, numFeatures = 801, numClasses = 2, threshold = 0.0,
  "NR.AR" -> org.apache.spark.mllib.classification.SVMModel: intercept = 0.0, numFeatures = 801, numClasses = 2, threshold = 0.0,
  "NR.ER.LBD" -> org.apache.spark.mllib.classification.SVMModel: intercept = 0.0...
modelsRedSVM: Map[String, SVMModel] = Map(
  "SR.MMP" -> org.apache.spark.mllib.classification.SVMModel: intercept = 0.0, numFeatures = 85, numClasses = 2, threshold = 0.0,
  "NR.AhR" -> org.apache.spark.mllib.classification.SVMModel: intercept = 0.0, numFeatures = 85, numClasses = 2, threshold = 0.0,
  "SR.HSE" -> org.apache.spark.mllib.classification.SVMModel: intercept = 0.0, numFeatures = 85, numClasses = 2, threshold = 0.0,
  "NR.ER" -> org.apache.spark.mllib.classification.SVMModel: intercept = 0.0, numFeatures = 85, numClasses = 2, threshold = 0.0,
  "NR.AR" -> org.apache.spark.mllib.classification.SVMModel: intercept = 0.0, numFeatures = 85, numClasses = 2, threshold = 0.0,
  "NR.ER.LBD" -> org.apache.spark.mllib.classification.SVMModel: intercept = 0.0...

In [209]:
val modelsFullGB = timing {
    trainLabelsCSV.columns.tail
        .foldLeft(Map.empty[String,GradientBoostedTreesModel]) { case (mp, acol) => {
            val TrainTestSets(train, _, _, _) = trainTestSets(acol)
            val boostingStrategy = BoostingStrategy.defaultParams("Classification")
            boostingStrategy.numIterations = 10
            boostingStrategy.treeStrategy.numClasses = 2
            boostingStrategy.treeStrategy.maxDepth = 6
            boostingStrategy.treeStrategy.categoricalFeaturesInfo = Map[Int, Int]()
            val model = GradientBoostedTrees.train(train, boostingStrategy)
            mp + (acol -> model)
        }}
}
val modelsRedGB = timing {
    trainLabelsCSV.columns.tail
        .foldLeft(Map.empty[String,GradientBoostedTreesModel]) { case (mp, acol) => {
            val TrainTestSets(_, _, train, _) = trainTestSets(acol)
            val boostingStrategy = BoostingStrategy.defaultParams("Classification")
            boostingStrategy.numIterations = 10
            boostingStrategy.treeStrategy.numClasses = 2
            boostingStrategy.treeStrategy.maxDepth = 6
            boostingStrategy.treeStrategy.categoricalFeaturesInfo = Map[Int, Int]()
            val model = GradientBoostedTrees.train(train, boostingStrategy)
            mp + (acol -> model)
        }}
}


time: 250.883
time: 62.571
Out[209]:
modelsFullGB: Map[String, GradientBoostedTreesModel] = Map(
  "SR.MMP" -> TreeEnsembleModel classifier with 10 trees
,
  "NR.AhR" -> TreeEnsembleModel classifier with 10 trees
,
  "SR.HSE" -> TreeEnsembleModel classifier with 10 trees
,
  "NR.ER" -> TreeEnsembleModel classifier with 10 trees
,
  "NR.AR" -> TreeEnsembleModel classifier with 10 trees
,
  "NR.ER.LBD" -> TreeEnsembleModel classifier with 10 trees
...
modelsRedGB: Map[String, GradientBoostedTreesModel] = Map(
  "SR.MMP" -> TreeEnsembleModel classifier with 10 trees
,
  "NR.AhR" -> TreeEnsembleModel classifier with 10 trees
,
  "SR.HSE" -> TreeEnsembleModel classifier with 10 trees
,
  "NR.ER" -> TreeEnsembleModel classifier with 10 trees
,
  "NR.AR" -> TreeEnsembleModel classifier with 10 trees
,
  "NR.ER.LBD" -> TreeEnsembleModel classifier with 10 trees
...

In [210]:
val synthFull = synthesize(fvSketches, 40000, 1).map(_.toSpark).cache()


Out[210]:
synthFull: RDD[mllib.linalg.Vector] = MapPartitionsRDD[347126] at map at cmd209.sc:1

In [211]:
val trainRedFV = trainCSVFR.rdd.map(_.toSeq.toVector.drop(1).map(_.asInstanceOf[String].toDouble))
val sketchRed = tdSketchFV(trainRedFV)
val synthRed = synthesize(sketchRed, 40000, 1).map(_.toSpark).cache()


Out[211]:
trainRedFV: RDD[Vector[Double]] = MapPartitionsRDD[347127] at map at cmd210.sc:1
sketchRed: Vector[TDigest] = Vector(
  TDigest(0.5, 50, 1, Map(0.0 -> 12060.0)),
  TDigest(0.5, 50, 1, Map(0.0 -> 12060.0)),
  TDigest(0.5, 50, 1, Map(0.0 -> 12060.0)),
  TDigest(0.5, 50, 1, Map(0.0 -> 12060.0)),
  TDigest(0.5, 50, 1, Map(0.0 -> 12060.0)),
  TDigest(0.5, 50, 1, Map(0.0 -> 12060.0)),
  TDigest(
    0.5,
    50,
    52,
    Map(
...
synthRed: RDD[mllib.linalg.Vector] = MapPartitionsRDD[347130] at map at cmd210.sc:3

In [212]:
val evalFullLR = timing {
    trainLabelsCSV.columns.tail
        .foldLeft(Map.empty[String,RDD[Double]]) { case (mp, acol) => {
            val data = synthFull
            val model = modelsFullLR(acol)
            val pred = data.map { features => model.predict(features) }
            val force = pred.count  // force the lazy map so the timing covers scoring
            mp + (acol -> pred)
        }}
}
val evalRedLR = timing {
    trainLabelsCSV.columns.tail
        .foldLeft(Map.empty[String,RDD[Double]]) { case (mp, acol) => {
            val data = synthRed
            val model = modelsRedLR(acol)
            val pred = data.map { features => model.predict(features) }
            val force = pred.count
            mp + (acol -> pred)
        }}
}


time: 32.144
time: 3.877
Out[212]:
evalFullLR: Map[String, RDD[Double]] = Map(
  "SR.MMP" -> MapPartitionsRDD[347141] at map at cmd211.sc:6,
  "NR.AhR" -> MapPartitionsRDD[347131] at map at cmd211.sc:6,
  "SR.HSE" -> MapPartitionsRDD[347140] at map at cmd211.sc:6,
  "NR.ER" -> MapPartitionsRDD[347135] at map at cmd211.sc:6,
  "NR.AR" -> MapPartitionsRDD[347132] at map at cmd211.sc:6,
  "NR.ER.LBD" -> MapPartitionsRDD[347136] at map at cmd211.sc:6,
  "SR.ATAD5" -> MapPartitionsRDD[347139] at map at cmd211.sc:6,
  "NR.AR.LBD" -> MapPartitionsRDD[347133] at map at cmd211.sc:6,
  "NR.Aromatase" -> MapPartitionsRDD[347134] at map at cmd211.sc:6,
  "NR.PPAR.gamma" -> MapPartitionsRDD[347137] at map at cmd211.sc:6,
  "SR.ARE" -> MapPartitionsRDD[347138] at map at cmd211.sc:6,
...
evalRedLR: Map[String, RDD[Double]] = Map(
  "SR.MMP" -> MapPartitionsRDD[347153] at map at cmd211.sc:16,
  "NR.AhR" -> MapPartitionsRDD[347143] at map at cmd211.sc:16,
  "SR.HSE" -> MapPartitionsRDD[347152] at map at cmd211.sc:16,
  "NR.ER" -> MapPartitionsRDD[347147] at map at cmd211.sc:16,
  "NR.AR" -> MapPartitionsRDD[347144] at map at cmd211.sc:16,
  "NR.ER.LBD" -> MapPartitionsRDD[347148] at map at cmd211.sc:16,
  "SR.ATAD5" -> MapPartitionsRDD[347151] at map at cmd211.sc:16,
  "NR.AR.LBD" -> MapPartitionsRDD[347145] at map at cmd211.sc:16,
  "NR.Aromatase" -> MapPartitionsRDD[347146] at map at cmd211.sc:16,
  "NR.PPAR.gamma" -> MapPartitionsRDD[347149] at map at cmd211.sc:16,
  "SR.ARE" -> MapPartitionsRDD[347150] at map at cmd211.sc:16,
...

In [213]:
val evalFullSVM = timing {
    trainLabelsCSV.columns.tail
        .foldLeft(Map.empty[String,RDD[Double]]) { case (mp, acol) => {
            val data = synthFull
            val model = modelsFullSVM(acol)
            val pred = data.map { features => model.predict(features) }
            val force = pred.count
            mp + (acol -> pred)
        }}
}
val evalRedSVM = timing {
    trainLabelsCSV.columns.tail
        .foldLeft(Map.empty[String,RDD[Double]]) { case (mp, acol) => {
            val data = synthRed
            val model = modelsRedSVM(acol)
            val pred = data.map { features => model.predict(features) }
            val force = pred.count
            mp + (acol -> pred)
        }}
}


time: 0.586
time: 0.234
Out[213]:
evalFullSVM: Map[String, RDD[Double]] = Map(
  "SR.MMP" -> MapPartitionsRDD[347165] at map at cmd212.sc:6,
  "NR.AhR" -> MapPartitionsRDD[347155] at map at cmd212.sc:6,
  "SR.HSE" -> MapPartitionsRDD[347164] at map at cmd212.sc:6,
  "NR.ER" -> MapPartitionsRDD[347159] at map at cmd212.sc:6,
  "NR.AR" -> MapPartitionsRDD[347156] at map at cmd212.sc:6,
  "NR.ER.LBD" -> MapPartitionsRDD[347160] at map at cmd212.sc:6,
  "SR.ATAD5" -> MapPartitionsRDD[347163] at map at cmd212.sc:6,
  "NR.AR.LBD" -> MapPartitionsRDD[347157] at map at cmd212.sc:6,
  "NR.Aromatase" -> MapPartitionsRDD[347158] at map at cmd212.sc:6,
  "NR.PPAR.gamma" -> MapPartitionsRDD[347161] at map at cmd212.sc:6,
  "SR.ARE" -> MapPartitionsRDD[347162] at map at cmd212.sc:6,
...
evalRedSVM: Map[String, RDD[Double]] = Map(
  "SR.MMP" -> MapPartitionsRDD[347177] at map at cmd212.sc:16,
  "NR.AhR" -> MapPartitionsRDD[347167] at map at cmd212.sc:16,
  "SR.HSE" -> MapPartitionsRDD[347176] at map at cmd212.sc:16,
  "NR.ER" -> MapPartitionsRDD[347171] at map at cmd212.sc:16,
  "NR.AR" -> MapPartitionsRDD[347168] at map at cmd212.sc:16,
  "NR.ER.LBD" -> MapPartitionsRDD[347172] at map at cmd212.sc:16,
  "SR.ATAD5" -> MapPartitionsRDD[347175] at map at cmd212.sc:16,
  "NR.AR.LBD" -> MapPartitionsRDD[347169] at map at cmd212.sc:16,
  "NR.Aromatase" -> MapPartitionsRDD[347170] at map at cmd212.sc:16,
  "NR.PPAR.gamma" -> MapPartitionsRDD[347173] at map at cmd212.sc:16,
  "SR.ARE" -> MapPartitionsRDD[347174] at map at cmd212.sc:16,
...

In [214]:
val evalFullGB = timing {
    trainLabelsCSV.columns.tail
        .foldLeft(Map.empty[String,RDD[Double]]) { case (mp, acol) => {
            val data = synthFull
            val model = modelsFullGB(acol)
            val pred = data.map { features => model.predict(features) }
            val force = pred.count
            mp + (acol -> pred)
        }}
}
val evalRedGB = timing {
    trainLabelsCSV.columns.tail
        .foldLeft(Map.empty[String,RDD[Double]]) { case (mp, acol) => {
            val data = synthRed
            val model = modelsRedGB(acol)
            val pred = data.map { features => model.predict(features) }
            val force = pred.count
            mp + (acol -> pred)
        }}
}


time: 1.33
time: 0.885
Out[214]:
evalFullGB: Map[String, RDD[Double]] = Map(
  "SR.MMP" -> MapPartitionsRDD[347189] at map at cmd213.sc:6,
  "NR.AhR" -> MapPartitionsRDD[347179] at map at cmd213.sc:6,
  "SR.HSE" -> MapPartitionsRDD[347188] at map at cmd213.sc:6,
  "NR.ER" -> MapPartitionsRDD[347183] at map at cmd213.sc:6,
  "NR.AR" -> MapPartitionsRDD[347180] at map at cmd213.sc:6,
  "NR.ER.LBD" -> MapPartitionsRDD[347184] at map at cmd213.sc:6,
  "SR.ATAD5" -> MapPartitionsRDD[347187] at map at cmd213.sc:6,
  "NR.AR.LBD" -> MapPartitionsRDD[347181] at map at cmd213.sc:6,
  "NR.Aromatase" -> MapPartitionsRDD[347182] at map at cmd213.sc:6,
  "NR.PPAR.gamma" -> MapPartitionsRDD[347185] at map at cmd213.sc:6,
  "SR.ARE" -> MapPartitionsRDD[347186] at map at cmd213.sc:6,
...
evalRedGB: Map[String, RDD[Double]] = Map(
  "SR.MMP" -> MapPartitionsRDD[347201] at map at cmd213.sc:16,
  "NR.AhR" -> MapPartitionsRDD[347191] at map at cmd213.sc:16,
  "SR.HSE" -> MapPartitionsRDD[347200] at map at cmd213.sc:16,
  "NR.ER" -> MapPartitionsRDD[347195] at map at cmd213.sc:16,
  "NR.AR" -> MapPartitionsRDD[347192] at map at cmd213.sc:16,
  "NR.ER.LBD" -> MapPartitionsRDD[347196] at map at cmd213.sc:16,
  "SR.ATAD5" -> MapPartitionsRDD[347199] at map at cmd213.sc:16,
  "NR.AR.LBD" -> MapPartitionsRDD[347193] at map at cmd213.sc:16,
  "NR.Aromatase" -> MapPartitionsRDD[347194] at map at cmd213.sc:16,
  "NR.PPAR.gamma" -> MapPartitionsRDD[347197] at map at cmd213.sc:16,
  "SR.ARE" -> MapPartitionsRDD[347198] at map at cmd213.sc:16,
...
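
Across all three model families, the reduced (85-feature) variants train and score markedly faster than the full (801-feature) ones -- for example 62.6 s versus 250.9 s to train the gradient-boosted models, and 3.9 s versus 32.1 s to score logistic regression over 40,000 synthetic rows -- at broadly comparable accuracy in the plots above. As a final sanity check (hypothetical follow-up), the fraction of synthetic rows each reduced LR model flags positive:

evalRedLR.mapValues(_.mean()).foreach(println)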