Plongeur

A topological data analysis library.

Core algorithm written in Scala, using Apache Spark.

Executed in a Jupyter notebook, using the Apache Toree kernel and declarative widgets.

Graphs rendered with Sigma/Linkurious, wrapped in a Polymer component.

Reactive machinery powered by Rx RxScala.

Maven dependencies


In [1]:
%AddDeps org.apache.spark spark-mllib_2.10 1.6.2 --repository file:/Users/tmo/.m2/repository
%AddDeps org.scalanlp breeze-natives_2.10 0.11.2
%AddDeps com.github.haifengl smile-core 1.2.0 --transitive
%AddDeps com.github.karlhigley spark-neighbors_2.10 0.3.6-FORK --repository file:/Users/tmo/.m2/repository
%AddDeps io.reactivex rxscala_2.10 0.26.1 --transitive --repository file:/Users/tmo/.m2/repository 
%AddDeps com.softwaremill.quicklens quicklens_2.10 1.4.4 --repository file:/Users/tmo/.m2/repository
%AddDeps org.tmoerman plongeur-spark_2.10 0.3.59 --repository file:/Users/tmo/.m2/repository


Marking org.apache.spark:spark-mllib_2.10:1.6.2 for download
Preparing to fetch from:
-> file:/var/folders/zz/zyxvpxvq6csfxvn_n0000000000000/T/toree_add_deps808320359358980455/
-> file:/Users/tmo/.m2/repository
-> https://repo1.maven.org/maven2
-> New file at /var/folders/zz/zyxvpxvq6csfxvn_n0000000000000/T/toree_add_deps808320359358980455/https/repo1.maven.org/maven2/org/apache/spark/spark-mllib_2.10/1.6.2/spark-mllib_2.10-1.6.2.jar
Marking org.scalanlp:breeze-natives_2.10:0.11.2 for download
Preparing to fetch from:
-> file:/var/folders/zz/zyxvpxvq6csfxvn_n0000000000000/T/toree_add_deps808320359358980455/
-> https://repo1.maven.org/maven2
-> New file at /var/folders/zz/zyxvpxvq6csfxvn_n0000000000000/T/toree_add_deps808320359358980455/https/repo1.maven.org/maven2/org/scalanlp/breeze-natives_2.10/0.11.2/breeze-natives_2.10-0.11.2.jar
Marking com.github.haifengl:smile-core:1.2.0 for download
Preparing to fetch from:
-> file:/var/folders/zz/zyxvpxvq6csfxvn_n0000000000000/T/toree_add_deps808320359358980455/
-> https://repo1.maven.org/maven2
-> New file at /var/folders/zz/zyxvpxvq6csfxvn_n0000000000000/T/toree_add_deps808320359358980455/https/repo1.maven.org/maven2/com/github/haifengl/smile-math/1.2.0/smile-math-1.2.0.jar
-> New file at /var/folders/zz/zyxvpxvq6csfxvn_n0000000000000/T/toree_add_deps808320359358980455/https/repo1.maven.org/maven2/com/github/haifengl/smile-graph/1.2.0/smile-graph-1.2.0.jar
-> New file at /var/folders/zz/zyxvpxvq6csfxvn_n0000000000000/T/toree_add_deps808320359358980455/https/repo1.maven.org/maven2/com/github/haifengl/smile-data/1.2.0/smile-data-1.2.0.jar
-> New file at /var/folders/zz/zyxvpxvq6csfxvn_n0000000000000/T/toree_add_deps808320359358980455/https/repo1.maven.org/maven2/com/github/haifengl/smile-core/1.2.0/smile-core-1.2.0.jar
Marking com.github.karlhigley:spark-neighbors_2.10:0.3.6-FORK for download
Preparing to fetch from:
-> file:/var/folders/zz/zyxvpxvq6csfxvn_n0000000000000/T/toree_add_deps808320359358980455/
-> file:/Users/tmo/.m2/repository
-> https://repo1.maven.org/maven2
-> New file at /Users/tmo/.m2/repository/com/github/karlhigley/spark-neighbors_2.10/0.3.6-FORK/spark-neighbors_2.10-0.3.6-FORK.jar
Marking io.reactivex:rxscala_2.10:0.26.1 for download
Preparing to fetch from:
-> file:/var/folders/zz/zyxvpxvq6csfxvn_n0000000000000/T/toree_add_deps808320359358980455/
-> file:/Users/tmo/.m2/repository
-> https://repo1.maven.org/maven2
-> New file at /Users/tmo/.m2/repository/io/reactivex/rxscala_2.10/0.26.1/rxscala_2.10-0.26.1.jar
-> New file at /Users/tmo/.m2/repository/io/reactivex/rxjava/1.1.1/rxjava-1.1.1.jar
Marking com.softwaremill.quicklens:quicklens_2.10:1.4.4 for download
Preparing to fetch from:
-> file:/var/folders/zz/zyxvpxvq6csfxvn_n0000000000000/T/toree_add_deps808320359358980455/
-> file:/Users/tmo/.m2/repository
-> https://repo1.maven.org/maven2
-> New file at /var/folders/zz/zyxvpxvq6csfxvn_n0000000000000/T/toree_add_deps808320359358980455/https/repo1.maven.org/maven2/com/softwaremill/quicklens/quicklens_2.10/1.4.4/quicklens_2.10-1.4.4.jar
Marking org.tmoerman:plongeur-spark_2.10:0.3.59 for download
Preparing to fetch from:
-> file:/var/folders/zz/zyxvpxvq6csfxvn_n0000000000000/T/toree_add_deps808320359358980455/
-> file:/Users/tmo/.m2/repository
-> https://repo1.maven.org/maven2
-> New file at /Users/tmo/.m2/repository/org/tmoerman/plongeur-spark_2.10/0.3.59/plongeur-spark_2.10-0.3.59.jar

In [2]:
%addjar http://localhost:8888/nbextensions/declarativewidgets/declarativewidgets.jar


Starting download from http://localhost:8888/nbextensions/declarativewidgets/declarativewidgets.jar
Finished download of declarativewidgets.jar

Import classes


In [3]:
import rx.lang.scala.{Observer, Subscription, Observable}
import rx.lang.scala.subjects.PublishSubject
import rx.lang.scala.subjects._

import org.apache.commons.lang.StringUtils.trim
import org.apache.spark.SparkContext
import org.apache.spark.mllib.linalg.Vectors.dense
import org.apache.spark.rdd.RDD
import org.joda.time.DateTime
import org.tmoerman.plongeur.tda.TDAMachine
import org.tmoerman.plongeur.tda.Distances._
import org.tmoerman.plongeur.tda.Model._
import org.tmoerman.plongeur.tda.Filters._
import org.tmoerman.plongeur.tda.cluster.Clustering._
import org.tmoerman.plongeur.tda.cluster.Scale
import org.tmoerman.plongeur.tda.Colour._
import org.tmoerman.plongeur.tda.Brewer
import org.tmoerman.plongeur.tda.LSH.LSHParams
import org.tmoerman.plongeur.tda.Model.{DataPoint, TDAContext, dp}
import org.tmoerman.plongeur.tda.knn.FastKNN.FastKNNParams
import org.tmoerman.plongeur.tda.knn.SampledKNN.SampledKNNParams
import org.tmoerman.plongeur.tda.knn.{FastKNN, SampledKNN, _}
import org.tmoerman.plongeur.util.RDDFunctions._
import org.tmoerman.plongeur.util.TimeUtils.time
import org.tmoerman.plongeur.tda.geometry.Laplacian._
import breeze.stats.distributions._
import org.apache.spark.mllib.linalg.SparseMatrix

In [4]:
import declarativewidgets._
initWidgets

import declarativewidgets.WidgetChannels.channel



In [5]:
import java.util.concurrent.atomic.AtomicReference

case class SubRef(val ref: AtomicReference[Option[Subscription]] = new AtomicReference[Option[Subscription]](None)) extends Serializable {

    def update(sub: Subscription): Unit = ref.getAndSet(Option(sub)).foreach(old => old.unsubscribe())

    def reset(): Unit = update(null)

}

Import polymer elements

These cells triggers Bower installations of the specified web components.

If it doesn't work, check whether Bower has sufficient permissions to install in the jupyter /nbextensions folder.


In [6]:
%%html
<link rel='import' href='urth_components/paper-slider/paper-slider.html' 
        is='urth-core-import' package='PolymerElements/paper-slider'>
<link rel='import' href='urth_components/paper-button/paper-button.html' 
        is='urth-core-import' package='PolymerElements/paper-button'>
<link rel='import' href='urth_components/plongeur-graph/plongeur-graph.html' 
        is='urth-core-import' package='tmoerman/plongeur-graph'>
<link rel='import' href='urth_components/urth-viz-scatter/urth-viz-scatter.html' is='urth-core-import'>


Out[6]:

Reactive TDA Machine

Keep references to Rx subscriptions apart.


In [7]:
val in$_subRef = SubRef()

Instantiate a PublishSubject. This stream of TDAParams instances represents the input of a TDAMachine. The PublishSubject listens to changes and sets these to the channel "ch_TDA_1" under the "params" key.

TODO: unsubscribe previous on re-evaluation


In [8]:
val in$ = PublishSubject[TDAParams]

in$_subRef.update(in$.subscribe(p => channel("ch_TDA_1").set("params", p.toString)))

Create an initial TDAParams instance. In the same cell, we submit the instance to the PublishSubject.


In [9]:
import org.tmoerman.plongeur.tda.cluster.Scale._

val tdaParams =
      TDAParams(
        lens = TDALens(Filter(PrincipalComponent(0), 20, 0.6)),
        clusteringParams = ClusteringParams(),
        scaleSelection = biggestGap())

in$.onNext(tdaParams)

Inititalize rdd

In this example, we are using a synthetic torus-shaped 2D data set.


In [10]:
import org.apache.spark.rdd.RDD
import org.apache.commons.lang.StringUtils.trim
import org.apache.spark.mllib.linalg.Vectors.dense

def readData(file: String) = 
    sc.
        textFile(file).
        map(_.split(",").map(trim)).
        zipWithIndex.
        map{ case (Array(x, y, z), idx) => dp(idx, dense(x.toDouble, y.toDouble, z.toDouble))}

In [11]:
val data_path = "/Users/tmo/Work/batiskav/projects/plongeur/scala/plongeur-spark/src/test/resources/data/"

val horse = data_path + "horse.csv"

val rdd = readData(horse).cache

val ctx = TDAContext(sc, rdd)

In [12]:
rdd.count


Out[12]:
8431

Turn a TDAResult into a data structure.


In [13]:
val r = scala.util.Random

def format(result: TDAResult) = Map(
    "nodes" -> result.clusters.map(c =>
      Map(
        "id"     -> c.id.toString,        
        "size"   -> c.dataPoints.size,
        "color"  -> c.colours.headOption.getOrElse("#000000"),
        "x"      -> r.nextInt(100),
        "y"      -> r.nextInt(100))),
        "edges" -> result.edges.map(e => {
            val (from, to) = e.toArray match {case Array(f, t) => (f, t)}

      Map(
        "id"     -> s"$from--$to",
        "source" -> from.toString,
        "target" -> to.toString)}))

Run the machine, obtaining an Observable of TDAResult instances


In [14]:
val out$: Observable[TDAResult] = TDAMachine.run(ctx, in$)

In [15]:
val out$_subRef = SubRef()

In [16]:
out$_subRef.update(
    out$.subscribe(
        onNext = (r) => channel("ch_TDA_1").set("result", format(r)),
        onError = (e) => println("Error in TDA machine: ", e)))

Reactive inputs

First, we set up a stream of updates to BASE TDAParams instance.


In [17]:
val pipe$_subRef = SubRef()

val nrBins$    = PublishSubject[Int]
val overlap$   = PublishSubject[Percentage]
val scaleBins$ = PublishSubject[Int]
val collapse$  = PublishSubject[Boolean]

In [18]:
channel("ch_TDA_1").watch("nrBins",  (_: Any, v: Int) => nrBins$.onNext(v))
channel("ch_TDA_1").watch("overlap", (_: Any, v: Int) => overlap$.onNext(BigDecimal(v) / 100))
channel("ch_TDA_1").watch("scaleBins", (_: Any, v: Int) => scaleBins$.onNext(v))
channel("ch_TDA_1").watch("collapse", (_: Any, v: Boolean) => collapse$.onNext(v))

In [19]:
import TDAParams._

val ecc = Filter(Eccentricity(Right(INFINITY)), 69, 0.69)

val pc0 = Filter(PrincipalComponent(0), 20, 0.3)
val pc1 = Filter(PrincipalComponent(1), 20, 0.3)

val avgFilterValue = AverageFilterValue(Brewer.palettes("PuOr")(9), ecc)

val BASE = 
    TDAParams(
        lens = TDALens(ecc),
        clusteringParams = ClusteringParams(),
        scaleSelection = biggestGap(),
        collapseDuplicateClusters = false,
        colouring = avgFilterValue)

val params$ =
    List(
        nrBins$.map(v => setFilterNrBins(0, v)),
        overlap$.map(v => setFilterOverlap(0, v)),
        scaleBins$.map(v => setHistogramScaleSelectionNrBins(v)),
        collapse$.map(v => (params: TDAParams) => params.copy(collapseDuplicateClusters = v))).
    reduce(_ merge _).
    scan(BASE)((params, fn) => fn(params))

pipe$_subRef.update(params$.subscribe(in$))

channel("ch_TDA_1").set("nrBins", BASE.lens.filters(0).nrBins)
channel("ch_TDA_1").set("overlap", (BASE.lens.filters(0).overlap * 100).toInt)
channel("ch_TDA_1").set("scaleBins", 50)
channel("ch_TDA_1").set("collapse", BASE.collapseDuplicateClusters)


Out[19]:
Name: Compile Error
Message: <console>:121: error: not found: value setHistogramScaleSelectionNrBins
               scaleBins$.map(v => setHistogramScaleSelectionNrBins(v)),
                                   ^
StackTrace: 

We create two slider widgets that provide the inputs for the nrBins$ and overlap$ Observables.


In [20]:
%%html
<template is='urth-core-bind' channel='ch_TDA_1'>  
    <table style="border-style: hidden;">
        <tr style="border-style: hidden;">
            <th style="border-style: hidden;">nr of cover bins</th>
            <td style="border-style: hidden;">
                <paper-slider min="0" max="100" step="1" value="{{nrBins}}" style="width: 500px;"></paper-slider>
            </td>
            <td style="border-style: hidden;">[[nrBins]]</td>
        </tr>
        <tr style="border-style: hidden;">
            <th style="border-style: hidden;">overlap</th>
            <td style="border-style: hidden;">
                <paper-slider min="0" max="75" step="1" value="{{overlap}}" style="width: 500px;"></paper-slider>
            </td>
            <td style="border-style: hidden;">[[overlap]]%</td>
        </tr>
        <tr style="border-style: hidden;">
            <th style="border-style: hidden;">nr of scale bins</th>
            <td style="border-style: hidden;">
                <paper-slider min="5" max="150" step="1" value="{{scaleBins}}" style="width: 500px;"></paper-slider>
            </td>
            <td style="border-style: hidden;">[[scaleBins]]</td>
        </tr>
        <tr style="border-style: hidden;">
            <th style="border-style: hidden;">collapse duplicates</th>
            <td style="border-style: hidden;">
                <paper-toggle-button checked="{{collapse}}"/>
            </td>            
        </tr>
    </table>        
</template>


Out[20]:

In [21]:
%%html
<template is='urth-core-bind' channel='ch_TDA_1'>    
    <plongeur-graph height="500" data="{{result}}"></plongeur-graph>
</template>


Out[21]:

In [22]:
%%html
<template is='urth-core-bind' channel='data'>    
    <urth-viz-scatter
        datarows='[[raw]]'
        primary='0'
        secondary='1'        
        />
</template>


Out[22]:

In [23]:
val rawData = rdd.
    map(dp => {
        val x = dp.features(2)
        val y = dp.features(1)        
        List(x, y)}).collect.toList

rawData.take(3)


Out[23]:
List(List(-0.379364, 0.59686), List(-0.378438, 0.602476), List(-0.383195, 0.590327))

In [24]:
channel("data").set("raw", rawData)

In [ ]:


In [ ]: