A topological data analysis (TDA) library.
The core algorithm is written in Scala, on top of Apache Spark.
It runs in a Jupyter notebook, using the Apache Toree kernel and declarative widgets.
Graphs are rendered with Sigma.js/Linkurious, wrapped in a Polymer component.
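Condensed from the cells below, the whole notebook amounts to one reactive loop (all names are defined in later cells; params stands for any TDAParams value):

// parameter stream -> TDA machine (Spark) -> result stream -> widget channel -> graph
val in$  = PublishSubject[TDAParams]()
val out$ = TDAMachine.run(ctx, in$)                               // recomputes the TDA network per update
out$.subscribe(r => channel("ch_TDA_1").set("result", format(r))) // push each result to the browser
in$.onNext(params)                                                // every update yields a fresh graph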
In [1]:
sc.getConf.toDebugString
Out[1]:
In [2]:
%AddDeps org.scalanlp breeze-natives_2.10 0.11.2
%AddDeps org.apache.spark spark-mllib_2.10 1.6.2 --repository file:/Users/tmo/.m2/repository
%AddDeps com.github.haifengl smile-core 1.2.0 --transitive
%AddDeps com.github.karlhigley spark-neighbors_2.10 0.3.6-FORK --repository file:/Users/tmo/.m2/repository
%AddDeps io.reactivex rxscala_2.10 0.26.1 --transitive --repository file:/Users/tmo/.m2/repository
%AddDeps com.softwaremill.quicklens quicklens_2.10 1.4.4 --repository file:/Users/tmo/.m2/repository
%AddDeps org.tmoerman plongeur-spark_2.10 0.3.60 --repository file:/Users/tmo/.m2/repository
In [3]:
%addjar http://localhost:8888/nbextensions/declarativewidgets/declarativewidgets.jar
In [4]:
import rx.lang.scala.{Observer, Subscription, Observable}
import rx.lang.scala.subjects._
import org.apache.commons.lang.StringUtils.trim
import org.apache.spark.SparkContext
import org.apache.spark.mllib.linalg.Vectors.dense
import org.apache.spark.rdd.RDD
import org.joda.time.DateTime
import org.tmoerman.plongeur.tda.TDAMachine
import org.tmoerman.plongeur.tda.Distances._
import org.tmoerman.plongeur.tda.Model._
import org.tmoerman.plongeur.tda.Filters._
import org.tmoerman.plongeur.tda.cluster.Clustering._
import org.tmoerman.plongeur.tda.cluster.Scale._
import org.tmoerman.plongeur.tda.Colour._
import org.tmoerman.plongeur.tda.Brewer
import org.tmoerman.plongeur.tda.LSH.LSHParams
import org.tmoerman.plongeur.tda.Model.{DataPoint, TDAContext, dp}
import org.tmoerman.plongeur.tda.knn.FastKNN.FastKNNParams
import org.tmoerman.plongeur.tda.knn.SampledKNN.SampledKNNParams
import org.tmoerman.plongeur.tda.knn.{FastKNN, SampledKNN, _}
import org.tmoerman.plongeur.util.RDDFunctions._
import org.tmoerman.plongeur.util.TimeUtils.time
import org.tmoerman.plongeur.tda.geometry.Laplacian._
import breeze.stats.distributions._
import org.apache.spark.mllib.linalg.SparseMatrix
In [5]:
import declarativewidgets._
initWidgets
import declarativewidgets.WidgetChannels.channel
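initWidgets bootstraps the declarative widgets machinery on the kernel side; channel(name) then returns a named kernel-to-browser channel. A value set on a channel (channel("ch_TDA_1").set("result", ...) below) becomes visible to any urth-core-bind template bound to the same channel name.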
In [7]:
%%html
<link rel='import' href='urth_components/paper-slider/paper-slider.html'
is='urth-core-import' package='PolymerElements/paper-slider'>
<link rel='import' href='urth_components/paper-button/paper-button.html'
is='urth-core-import' package='PolymerElements/paper-button'>
<link rel='import' href='urth_components/plongeur-graph/plongeur-graph.html'
is='urth-core-import' package='tmoerman/plongeur-graph'>
<link rel='import' href='urth_components/urth-viz-scatter/urth-viz-scatter.html' is='urth-core-import'>
Out[7]:
In [7]:
import org.tmoerman.plongeur.ui.Controls._
kernel.magics.html(controlsCSS)
Out[7]:
In [8]:
import breeze.linalg.SparseVector
import breeze.linalg.SparseVector.zeros
import org.apache.commons.lang.StringUtils.trim
import org.apache.spark.mllib.linalg.BreezeConversions._

// An entry is keyed by cell (column) index; the payload is either a feature
// (gene index -> expression value) or a metadata attribute (label -> value).
type E = (Index, Either[(Int, Double), (String, Any)])

val N_OFFSET = 2  // nr of leading (row label) columns before the per-cell columns
val D_OFFSET = 11 // nr of leading lines: 10 metadata rows + 1 empty row

// Turns a metadata row into (cellIdx, Right(label -> value)) entries.
def toMeta(columns: List[(String, Int)],
           limit: Option[Int],
           f: (String => Any) = identity) = (columns: @unchecked) match {
  case _ :: (label, _) :: values =>
    limit.map(values.take).getOrElse(values)
         .map{ case (value, idx) => (idx - N_OFFSET, Right((label, f(value)))) }
}

// Turns a feature row into (cellIdx, Left(geneIdx -> value)) entries, dropping zero counts.
def toFeatures(lineIndex: Int,
               columns: List[(String, Int)],
               limit: Option[Int]) = (columns: @unchecked) match {
  case _ :: _ :: features =>
    limit.map(features.take).getOrElse(features).flatMap{ case (feature, idx) =>
      val value = feature.toDouble
      if (value > 0) (idx - N_OFFSET, Left((lineIndex - D_OFFSET, value))) :: Nil else Nil
    }
}

def parseLine(lineIdx: Int, columns: List[(String, Int)], limit: Option[Int]): Seq[E] = lineIdx match {
  case 0  => toMeta(columns, limit)             // tissue
  case 1  => toMeta(columns, limit, _.toInt)    // group
  case 2  => toMeta(columns, limit, _.toInt)    // total mRNA mol
  case 3  => toMeta(columns, limit, _.toInt)    // well
  case 4  => toMeta(columns, limit, _.toInt)    // sex
  case 5  => toMeta(columns, limit, _.toInt)    // age
  case 6  => toMeta(columns, limit, _.toDouble) // diameter
  case 7  => Nil // toMeta(columns, limit)      // cell ID (skipped)
  case 8  => toMeta(columns, limit)             // level 1 class
  case 9  => toMeta(columns, limit)             // level 2 class
  case 10 => Nil                                // empty line
  case _  => toFeatures(lineIdx, columns, limit) // feature (gene) rows
}

// Parses the Zeisel expression matrix into an RDD[DataPoint]: one sparse
// expression vector of dimension D (nr of genes) plus a metadata map per cell.
def parseZeisel(sc: SparkContext, file: String, limit: Option[Int] = None) = {
  lazy val N = sc.textFile(file).map(line => line.split("\t").length).first - N_OFFSET // nr of cells (unused below)
  val D = sc.textFile(file).count.toInt - D_OFFSET // nr of genes

  type ACC = (SparseVector[Double], Map[String, Any])
  val INIT: ACC = (zeros[Double](D), Map.empty)

  sc
    .textFile(file)
    .zipWithIndex
    .flatMap{ case (line, lineIdx) =>
      val columns = line.split("\t").map(trim).zipWithIndex.toList
      parseLine(lineIdx.toInt, columns, limit) }
    .aggregateByKey(INIT)(
      { case ((sparse, meta), e) => e match {
          case Left((idx, v))  => sparse.update(idx, v); (sparse, meta)
          case Right((key, v)) => (sparse, meta + (key -> v))
        }},
      { case ((sparse1, meta1), (sparse2, meta2)) => (sparse1 + sparse2, meta1 ++ meta2) })
    .map { case (idx, (sparse, meta)) => DataPoint(idx, sparse.toMLLib, Some(meta)) }
}
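As a quick sanity check, a hypothetical metadata row (column values invented here) illustrates the entry format produced by parseLine:

In [ ]:
// Hypothetical input, purely to illustrate the (cellIdx, Right(label -> value)) shape;
// assumes the cell above has been executed.
val row = List("", "tissue", "sscortex", "ca1hippocampus").zipWithIndex
parseLine(0, row, limit = None)
// -> List((0, Right(("tissue", "sscortex"))), (1, Right(("tissue", "ca1hippocampus"))))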
In [9]:
// Zeisel et al. single-cell RNA-seq data (somatosensory cortex & hippocampus CA1)
val wd = "/Users/tmo/Work/ghb2016/data/zeisel/"
val exp_mRNA = wd + "expression_mRNA_17-Aug-2014.txt"
//val rdd = parseZeisel(sc, exp_mRNA, limit = Some(1500)).cache
val rdd = parseZeisel(sc, exp_mRNA).cache
In [10]:
rdd.count
Out[10]:
In [11]:
val ctx = TDAContext(sc, rdd)
In [20]:
val in$ = PublishSubject[TDAParams]()
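Every TDAParams value pushed onto in$ (via onNext, below) triggers a run of the TDA machine; completing the subject (final cell) shuts the pipeline down.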
In [21]:
val r = scala.util.Random

// Converts a TDAResult into the nodes/edges map consumed by the
// <plongeur-graph> element. Nodes get random initial x/y coordinates
// so the client-side renderer has a starting layout.
def format(result: TDAResult) = Map(
  "nodes" -> result.clusters.map(c =>
    Map(
      "id"    -> c.id.toString, //"label" -> c.label,
      "size"  -> c.dataPoints.size,
      "color" -> c.colours.headOption.getOrElse("#000000"),
      "x"     -> r.nextInt(100),
      "y"     -> r.nextInt(100))),
  "edges" -> result.edges.map(e => {
    val (from, to) = e.toArray match { case Array(f, t) => (f, t) }
    Map(
      "id"     -> s"$from-$to",
      "source" -> from.toString,
      "target" -> to.toString)}))
In [22]:
val out$: Observable[TDAResult] = TDAMachine.run(ctx, in$)
In [23]:
out$.subscribe(
  onNext  = (r) => channel("ch_TDA_1").set("result", format(r)),
  onError = (e) => println(s"Error in TDA machine: $e"))
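subscribe returns an rx Subscription; keeping a reference to it would allow detaching the graph listener later without completing in$.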
Out[23]:
In [44]:
import TDAParams._

val dist = TanimotoDistance

// Filters (lens components): filter function, nr of cover intervals, overlap fraction
val lap0 = Filter(LaplacianEigenVector(0, distance = dist), 30, 0.3)
val lap1 = Filter(LaplacianEigenVector(1, distance = dist), 30, 0.3)
val den  = Filter(Density(sigma = 1.0, distance = dist), 45, 0.3)
val ecc  = Filter(Eccentricity(Right(INFINITY), distance = dist), 30, 0.3)
val pc0  = Filter(PrincipalComponent(0), 45, 0.3)
val pc1  = Filter(PrincipalComponent(1), 30, 0.3)
val mean = Filter(FeatureMean, 30, 0.3)
val vari = Filter(FeatureVariance, 30, 0.3)
val age  = Filter(Meta("age"), 30, 0.3)
val diam = Filter(Meta("diameter"), 30, 0.3)
val mRNA = Filter(Meta("total mRNA mol"), 30, 0.3)

// Colourings: map each cluster to a colour from a ColorBrewer palette
val bySex    = ClusterMaxFrequency(Brewer.palettes("Set1")(3), (d: DataPoint) => d.meta.get("sex"))
val byTissue = ClusterMaxFrequency(Brewer.palettes("Set1")(4), (d: DataPoint) => d.meta.get("tissue"))
val byLevel1 = ClusterMaxFrequency(Brewer.palettes("Set1")(7), (d: DataPoint) => d.meta.get("level1class"))
val byGroup  = ClusterMaxFrequency(Brewer.palettes("Set3")(9), (d: DataPoint) => d.meta.get("group #"))
val clSize = ClusterSize(Brewer.palettes("RdYlBu")(9).reverse)
val avgEcc = AverageFilterValue(Brewer.palettes("Blues")(9), ecc)
val avgDen = AverageFilterValue(Brewer.palettes("Reds")(9), den)
val avgAge = AverageFilterValue(Brewer.palettes("RdYlBu")(9), age)
val avgDia = AverageFilterValue(Brewer.palettes("RdYlBu")(9), diam)
val avgRNA = AverageFilterValue(Brewer.palettes("RdYlBu")(9), mRNA)
val groupPct  = ClusterPercentage(Brewer.palettes("RdYlBu")(9), (d: DataPoint) => d.meta.get("group #") == 2)
val cortexPct = ClusterPercentage(Brewer.palettes("RdYlBu")(9), (d: DataPoint) => d.meta.get("tissue") == "sscortex")
val hippoPct  = ClusterPercentage(Brewer.palettes("RdYlBu")(9), (d: DataPoint) => d.meta.get("tissue") == "ca1hippocampus")
def L1Class(clazz: String) = ClusterPercentage(Brewer.palettes("RdYlBu")(9), (d: DataPoint) => d.meta.get("level1class") == clazz)
val L1Oligo = L1Class("oligodendrocytes")
def L2Oligos(nrs: Int*) = ClusterPercentage(Brewer.palettes("RdYlBu")(9), (d: DataPoint) => nrs.exists(i => d.meta.get("level2class") == s"Oligo$i"))
In [53]:
val BASE =
  TDAParams(
    lens = TDALens(ecc, mRNA),                            // 2D lens: eccentricity x total mRNA
    clusteringParams = ClusteringParams(distance = dist),
    scaleSelection = firstGap(5),
    collapseDuplicateClusters = true,
    colouring = avgRNA) // L1Class("<class>"))

// Push the initial parameters and render slider controls bound to in$.
in$.onNext(BASE)
val (sub, html) = BASE.makeControls(channel("ch_TDA_1"), in$)
kernel.magics.html(html)
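Since quicklens is on the classpath (cell 2), variants of BASE can be pushed without rebuilding the whole parameter object. A hypothetical tweak, assuming TDAParams is a plain case class:

In [ ]:
// Hypothetical parameter updates via quicklens lenses (assumes TDAParams is a case class):
import com.softwaremill.quicklens._

in$.onNext(BASE.modify(_.colouring).setTo(byLevel1))      // recolour by level-1 class
in$.onNext(BASE.modify(_.lens).setTo(TDALens(pc0, pc1)))  // switch to a 2D PCA lens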
In [50]:
%%html
<template is='urth-core-bind' channel='ch_TDA_1'>
<plongeur-graph height="600" data="{{result}}"></plongeur-graph>
</template>
Out[50]:
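The urth-core-bind template above listens on channel ch_TDA_1: every result value set by the subscriber in cell 23 flows straight into the data property of <plongeur-graph>, which re-renders the network.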
In [19]:
in$.onCompleted() // completing the parameter stream terminates the TDA machine