Plongeur

A topological data analysis library.

Core algorithm written in Scala, using Apache Spark.

Executed in a Jupyter notebook, using the Apache Toree kernel and declarative widgets.

Graphs rendered with Sigma/Linkurious, wrapped in a Polymer component.

Reactive machinery powered by RxScala (Reactive Extensions for Scala).


In [1]:
sc.getConf.toDebugString


Out[1]:
spark.app.id=local-1479586939541
spark.app.name=IBM Spark Kernel
spark.driver.extraJavaOptions=-Xmx10g -Xms10g
spark.driver.host=192.168.0.125
spark.driver.memory=10g
spark.driver.port=54327
spark.executor.id=driver
spark.externalBlockStore.folderName=spark-c4348cae-db2e-45f3-a851-f5d087fd7a99
spark.jars=file:/usr/local/share/jupyter/kernels/apache_toree_scala/lib/toree-assembly-0.1.0.dev8-incubating-SNAPSHOT.jar
spark.master=local[*]
spark.repl.class.uri=http://192.168.0.125:54291
spark.submit.deployMode=client

Case study: single-cell mRNA expression (Zeisel dataset)


In [2]:
%AddDeps org.scalanlp breeze-natives_2.10 0.11.2
%AddDeps org.apache.spark spark-mllib_2.10 1.6.2 --repository file:/Users/tmo/.m2/repository
%AddDeps com.github.haifengl smile-core 1.2.0 --transitive
%AddDeps com.github.karlhigley spark-neighbors_2.10 0.3.6-FORK --repository file:/Users/tmo/.m2/repository
%AddDeps io.reactivex rxscala_2.10 0.26.1 --transitive --repository file:/Users/tmo/.m2/repository 
%AddDeps com.softwaremill.quicklens quicklens_2.10 1.4.4 --repository file:/Users/tmo/.m2/repository
%AddDeps org.tmoerman plongeur-spark_2.10 0.3.60 --repository file:/Users/tmo/.m2/repository


Marking org.scalanlp:breeze-natives_2.10:0.11.2 for download
Preparing to fetch from:
-> file:/var/folders/zz/zyxvpxvq6csfxvn_n0000000000000/T/toree_add_deps7950602644399553294/
-> https://repo1.maven.org/maven2
-> New file at /var/folders/zz/zyxvpxvq6csfxvn_n0000000000000/T/toree_add_deps7950602644399553294/https/repo1.maven.org/maven2/org/scalanlp/breeze-natives_2.10/0.11.2/breeze-natives_2.10-0.11.2.jar
Marking org.apache.spark:spark-mllib_2.10:1.6.2 for download
Preparing to fetch from:
-> file:/var/folders/zz/zyxvpxvq6csfxvn_n0000000000000/T/toree_add_deps7950602644399553294/
-> file:/Users/tmo/.m2/repository
-> https://repo1.maven.org/maven2
-> New file at /var/folders/zz/zyxvpxvq6csfxvn_n0000000000000/T/toree_add_deps7950602644399553294/https/repo1.maven.org/maven2/org/apache/spark/spark-mllib_2.10/1.6.2/spark-mllib_2.10-1.6.2.jar
Marking com.github.haifengl:smile-core:1.2.0 for download
Preparing to fetch from:
-> file:/var/folders/zz/zyxvpxvq6csfxvn_n0000000000000/T/toree_add_deps7950602644399553294/
-> https://repo1.maven.org/maven2
-> New file at /var/folders/zz/zyxvpxvq6csfxvn_n0000000000000/T/toree_add_deps7950602644399553294/https/repo1.maven.org/maven2/com/github/haifengl/smile-graph/1.2.0/smile-graph-1.2.0.jar
-> New file at /var/folders/zz/zyxvpxvq6csfxvn_n0000000000000/T/toree_add_deps7950602644399553294/https/repo1.maven.org/maven2/com/github/haifengl/smile-data/1.2.0/smile-data-1.2.0.jar
-> New file at /var/folders/zz/zyxvpxvq6csfxvn_n0000000000000/T/toree_add_deps7950602644399553294/https/repo1.maven.org/maven2/com/github/haifengl/smile-math/1.2.0/smile-math-1.2.0.jar
-> New file at /var/folders/zz/zyxvpxvq6csfxvn_n0000000000000/T/toree_add_deps7950602644399553294/https/repo1.maven.org/maven2/com/github/haifengl/smile-core/1.2.0/smile-core-1.2.0.jar
Marking com.github.karlhigley:spark-neighbors_2.10:0.3.6-FORK for download
Preparing to fetch from:
-> file:/var/folders/zz/zyxvpxvq6csfxvn_n0000000000000/T/toree_add_deps7950602644399553294/
-> file:/Users/tmo/.m2/repository
-> https://repo1.maven.org/maven2
-> New file at /Users/tmo/.m2/repository/com/github/karlhigley/spark-neighbors_2.10/0.3.6-FORK/spark-neighbors_2.10-0.3.6-FORK.jar
Marking io.reactivex:rxscala_2.10:0.26.1 for download
Preparing to fetch from:
-> file:/var/folders/zz/zyxvpxvq6csfxvn_n0000000000000/T/toree_add_deps7950602644399553294/
-> file:/Users/tmo/.m2/repository
-> https://repo1.maven.org/maven2
-> New file at /Users/tmo/.m2/repository/io/reactivex/rxscala_2.10/0.26.1/rxscala_2.10-0.26.1.jar
-> New file at /Users/tmo/.m2/repository/io/reactivex/rxjava/1.1.1/rxjava-1.1.1.jar
Marking com.softwaremill.quicklens:quicklens_2.10:1.4.4 for download
Preparing to fetch from:
-> file:/var/folders/zz/zyxvpxvq6csfxvn_n0000000000000/T/toree_add_deps7950602644399553294/
-> file:/Users/tmo/.m2/repository
-> https://repo1.maven.org/maven2
-> New file at /var/folders/zz/zyxvpxvq6csfxvn_n0000000000000/T/toree_add_deps7950602644399553294/https/repo1.maven.org/maven2/com/softwaremill/quicklens/quicklens_2.10/1.4.4/quicklens_2.10-1.4.4.jar
Marking org.tmoerman:plongeur-spark_2.10:0.3.60 for download
Preparing to fetch from:
-> file:/var/folders/zz/zyxvpxvq6csfxvn_n0000000000000/T/toree_add_deps7950602644399553294/
-> file:/Users/tmo/.m2/repository
-> https://repo1.maven.org/maven2
-> New file at /Users/tmo/.m2/repository/org/tmoerman/plongeur-spark_2.10/0.3.60/plongeur-spark_2.10-0.3.60.jar

In [3]:
%addjar http://localhost:8888/nbextensions/declarativewidgets/declarativewidgets.jar


Starting download from http://localhost:8888/nbextensions/declarativewidgets/declarativewidgets.jar
Finished download of declarativewidgets.jar

Imports


In [4]:
import rx.lang.scala.{Observer, Subscription, Observable}
import rx.lang.scala.subjects.PublishSubject
import rx.lang.scala.subjects._

import org.apache.commons.lang.StringUtils.trim
import org.apache.spark.SparkContext
import org.apache.spark.mllib.linalg.Vectors.dense
import org.apache.spark.rdd.RDD
import org.joda.time.DateTime
import org.tmoerman.plongeur.tda.TDAMachine
import org.tmoerman.plongeur.tda.Distances._
import org.tmoerman.plongeur.tda.Model._
import org.tmoerman.plongeur.tda.Filters._
import org.tmoerman.plongeur.tda.cluster.Clustering._
import org.tmoerman.plongeur.tda.cluster.Scale._
import org.tmoerman.plongeur.tda.Colour._
import org.tmoerman.plongeur.tda.Brewer
import org.tmoerman.plongeur.tda.LSH.LSHParams
import org.tmoerman.plongeur.tda.Model.{DataPoint, TDAContext, dp}
import org.tmoerman.plongeur.tda.knn.FastKNN.FastKNNParams
import org.tmoerman.plongeur.tda.knn.SampledKNN.SampledKNNParams
import org.tmoerman.plongeur.tda.knn.{FastKNN, SampledKNN, _}
import org.tmoerman.plongeur.util.RDDFunctions._
import org.tmoerman.plongeur.util.TimeUtils.time
import org.tmoerman.plongeur.tda.geometry.Laplacian._
import breeze.stats.distributions._
import org.apache.spark.mllib.linalg.SparseMatrix

In [5]:
import declarativewidgets._
initWidgets

import declarativewidgets.WidgetChannels.channel



In [7]:
%%html
<link rel='import' href='urth_components/paper-slider/paper-slider.html' 
        is='urth-core-import' package='PolymerElements/paper-slider'>
<link rel='import' href='urth_components/paper-button/paper-button.html' 
        is='urth-core-import' package='PolymerElements/paper-button'>
<link rel='import' href='urth_components/plongeur-graph/plongeur-graph.html' 
        is='urth-core-import' package='tmoerman/plongeur-graph'>
<link rel='import' href='urth_components/urth-viz-scatter/urth-viz-scatter.html' is='urth-core-import'>


Out[7]:

In [7]:
import org.tmoerman.plongeur.ui.Controls._
kernel.magics.html(controlsCSS)


Out[7]:

Parse the Zeisel dataset


In [8]:
import breeze.linalg.SparseVector
import breeze.linalg.SparseVector.zeros
import org.apache.commons.lang.StringUtils.trim
import org.apache.spark.mllib.linalg.BreezeConversions._

type E = (Index, Either[(Int, Double), (String, Any)])

val N_OFFSET = 2
val D_OFFSET = 11

def toMeta(columns: List[(String, Int)],
           limit: Option[Int],
           f: (String => Any) = identity) = (columns: @unchecked) match {

  case _ :: (label, _) :: values =>
    limit.map(values.take).getOrElse(values).map{ case (value, idx) => (idx - N_OFFSET, Right((label, f(value)))) }
}

def toFeatures(lineIndex: Int,
               columns: List[(String, Int)],
               limit: Option[Int]) = (columns: @unchecked) match {

  case _ :: _ :: features => limit.map(features.take).getOrElse(features).flatMap{ case (feature, idx) => {
    val value = feature.toDouble

    if (value > 0) (idx - N_OFFSET, Left((lineIndex - D_OFFSET, value))) :: Nil else Nil
  }}
}

def parseLine(lineIdx: Int, columns: List[(String, Int)], limit: Option[Int]): Seq[E] = lineIdx match {
  case 0 => toMeta(columns, limit)              // tissue
  case 1 => toMeta(columns, limit, _.toInt)     // group
  case 2 => toMeta(columns, limit, _.toInt)     // total mRNA mol
  case 3 => toMeta(columns, limit, _.toInt)     // well
  case 4 => toMeta(columns, limit, _.toInt)     // sex
  case 5 => toMeta(columns, limit, _.toInt)     // age
  case 6 => toMeta(columns, limit, _.toDouble)  // diameter
  case 7 => Nil // toMeta(columns, limit)       // cell ID
  case 8 => toMeta(columns, limit)              // level 1 class
  case 9 => toMeta(columns, limit)              // level 2 class
  case 10 => Nil                                // empty line
  case _ => toFeatures(lineIdx, columns, limit) // feature
}

def parseZeisel(sc: SparkContext, file: String, limit: Option[Int] = None) = {

  lazy val N = sc.textFile(file).map(line => line.split("\t").length).first - N_OFFSET

  val D = sc.textFile(file).count.toInt - D_OFFSET

  type ACC = (SparseVector[Double], Map[String, Any])

  val INIT: ACC = (zeros[Double](D), Map.empty)

  sc
    .textFile(file)
    .zipWithIndex
    .flatMap{ case (line, lineIdx) =>
      val columns = line.split("\t").map(trim).zipWithIndex.toList
      parseLine(lineIdx.toInt, columns, limit) }
    .aggregateByKey(INIT)(
      { case ((sparse, meta), e) => e match {
          case Left((idx, v)) => sparse.update(idx, v); (sparse, meta)
          case Right((key, v)) => (sparse, meta + (key -> v))
      }},
      { case ((sparse1, meta1), (sparse2, meta2)) => (sparse1 + sparse2, meta1 ++ meta2) })
    .map { case (idx, (sparse, meta)) => DataPoint(idx, sparse.toMLLib, Some(meta)) }
}

In [9]:
val wd = "/Users/tmo/Work/ghb2016/data/zeisel/"

val exp_mRNA = wd + "expression_mRNA_17-Aug-2014.txt"

//val rdd = parseZeisel(sc, exp_mRNA, limit = Some(1500)).cache
val rdd = parseZeisel(sc, exp_mRNA).cache

In [10]:
rdd.count


Out[10]:
3005

In [11]:
val ctx = TDAContext(sc, rdd)

TDA Machine


In [20]:
val in$ = PublishSubject[TDAParams]

In [21]:
val r = scala.util.Random

def format(result: TDAResult) = Map(
    "nodes" -> result.clusters.map(c =>
      Map(
        "id"     -> c.id.toString, //"label"  -> c.label,
        "size"   -> c.dataPoints.size,
        "color"  -> c.colours.headOption.getOrElse("#000000"),
        "x"      -> r.nextInt(100),
        "y"      -> r.nextInt(100))),
    "edges" -> result.edges.map(e => {
      val (from, to) = e.toArray match {case Array(f, t) => (f, t)}

      Map(        
        "id"     -> s"$from-$to",
        "source" -> from.toString,
        "target" -> to.toString)}))

In [22]:
val out$: Observable[TDAResult] = TDAMachine.run(ctx, in$)

In [23]:
out$.subscribe(
    onNext = (r) => channel("ch_TDA_1").set("result", format(r)),
    onError = (e) => println("Error in TDA machine: ", e))


Out[23]:
rx.lang.scala.Subscription$$anon$2@490d3e6c

UI


In [44]:
import TDAParams._

val dist = TanimotoDistance

val lap0 = Filter(LaplacianEigenVector(0, distance = TanimotoDistance), 30, 0.3)
val lap1 = Filter(LaplacianEigenVector(1, distance = TanimotoDistance), 30, 0.3)
val den = Filter(Density(sigma=1.0, distance = dist), 45, 0.30)
val ecc = Filter(Eccentricity(Right(INFINITY), distance = dist), 30, 0.30)

val pc0  = Filter(PrincipalComponent(0), 45, 0.3)
val pc1  = Filter(PrincipalComponent(1), 30, 0.3)
val mean = Filter(FeatureMean, 30, 0.3)
val vari = Filter(FeatureVariance, 30, 0.3)

val age  = Filter(Meta("age"), 30, 0.3)
val diam = Filter(Meta("diameter"), 30, 0.3)
val mRNA = Filter(Meta("total mRNA mol"), 30, 0.3)

val bySex    = ClusterMaxFrequency(Brewer.palettes("Set1")(3), (d: DataPoint) => d.meta.get("sex"))
val byTissue = ClusterMaxFrequency(Brewer.palettes("Set1")(4), (d: DataPoint) => d.meta.get("tissue"))
val byLevel1 = ClusterMaxFrequency(Brewer.palettes("Set1")(7), (d: DataPoint) => d.meta.get("level1class"))
val byGroup  = ClusterMaxFrequency(Brewer.palettes("Set3")(9), (d: DataPoint) => d.meta.get("group #"))

val clSize = ClusterSize(Brewer.palettes("RdYlBu")(9).reverse)

val avgEcc = AverageFilterValue(Brewer.palettes("Blues")(9), ecc)
val avgDen = AverageFilterValue(Brewer.palettes("Reds")(9), den)
val avgAge = AverageFilterValue(Brewer.palettes("RdYlBu")(9), age)
val avgDia = AverageFilterValue(Brewer.palettes("RdYlBu")(9), diam)
val avgRNA = AverageFilterValue(Brewer.palettes("RdYlBu")(9), mRNA)

val groupPct = ClusterPercentage(Brewer.palettes("RdYlBu")(9), (d: DataPoint) => d.meta.get("group #") == 2)

val cortexPct = ClusterPercentage(Brewer.palettes("RdYlBu")(9), (d: DataPoint) => d.meta.get("tissue") == "sscortex")
val hippoPct = ClusterPercentage(Brewer.palettes("RdYlBu")(9), (d: DataPoint) => d.meta.get("tissue") == "ca1hippocampus")

def L1Class(clazz: String) = ClusterPercentage(Brewer.palettes("RdYlBu")(9), (d: DataPoint) => d.meta.get("level1class") == clazz)
val L1Oligo = L1Class("oligodendrocytes")
def L2Oligos(nrs: Int*) = ClusterPercentage(Brewer.palettes("RdYlBu")(9), (d: DataPoint) => nrs.exists(i => d.meta.get("level2class") == s"Oligo$i"))

In [53]:
val BASE =
  TDAParams(
    lens = TDALens(ecc, mRNA),
    clusteringParams = ClusteringParams(distance = dist),
    scaleSelection = firstGap(5),
    collapseDuplicateClusters = true,
    colouring = avgRNA) // L1Class("<class>"))

in$.onNext(BASE)

val (sub, html) = BASE.makeControls(channel("ch_TDA_1"), in$)

kernel.magics.html(html)

In [50]:
%%html
<template is='urth-core-bind' channel='ch_TDA_1'>    
    <plongeur-graph height="600" data="{{result}}"></plongeur-graph>
</template>


Out[50]:

In [19]:
in$.onCompleted()

In [ ]:


In [ ]: