Plongeur

A topological data analysis library.

Core algorithm written in Scala, using Apache Spark.

Executed in a Jupyter notebook, using the Apache Toree kernel and declarative widgets.

Graphs rendered with Sigma/Linkurious, wrapped in a Polymer component.

Reactive machinery powered by Rx RxScala.

Maven dependencies


In [2]:
%AddDeps org.apache.spark spark-mllib_2.10 1.6.2 --repository file:/Users/tmo/.m2/repository
%AddDeps org.scalanlp breeze_2.10 0.11.2 --transitive
%AddDeps org.scalanlp breeze-natives_2.10 0.11.2
%AddDeps org.scalanlp breeze-macros_2.10 0.11.2
%AddDeps com.github.haifengl smile-core 1.2.0 --transitive
%AddDeps com.github.karlhigley spark-neighbors_2.10 0.3.6-FORK --repository file:/Users/tmo/.m2/repository
%AddDeps io.reactivex rxscala_2.10 0.26.1 --transitive --repository file:/Users/tmo/.m2/repository 
%AddDeps com.softwaremill.quicklens quicklens_2.10 1.4.4 --repository file:/Users/tmo/.m2/repository
%AddDeps org.tmoerman plongeur-spark_2.10 0.3.51 --repository file:/Users/tmo/.m2/repository


Marking org.apache.spark:spark-mllib_2.10:1.6.2 for download
Preparing to fetch from:
-> file:/var/folders/zz/zyxvpxvq6csfxvn_n0000000000000/T/toree_add_deps5762558224610635465/
-> file:/Users/tmo/.m2/repository
-> https://repo1.maven.org/maven2
-> New file at /var/folders/zz/zyxvpxvq6csfxvn_n0000000000000/T/toree_add_deps5762558224610635465/https/repo1.maven.org/maven2/org/apache/spark/spark-mllib_2.10/1.6.2/spark-mllib_2.10-1.6.2.jar
Marking org.scalanlp:breeze_2.10:0.11.2 for download
Preparing to fetch from:
-> file:/var/folders/zz/zyxvpxvq6csfxvn_n0000000000000/T/toree_add_deps5762558224610635465/
-> https://repo1.maven.org/maven2
-> New file at /var/folders/zz/zyxvpxvq6csfxvn_n0000000000000/T/toree_add_deps5762558224610635465/https/repo1.maven.org/maven2/org/scalanlp/breeze-macros_2.10/0.11.2/breeze-macros_2.10-0.11.2.jar
-> New file at /var/folders/zz/zyxvpxvq6csfxvn_n0000000000000/T/toree_add_deps5762558224610635465/https/repo1.maven.org/maven2/junit/junit/4.8.2/junit-4.8.2.jar
-> New file at /var/folders/zz/zyxvpxvq6csfxvn_n0000000000000/T/toree_add_deps5762558224610635465/https/repo1.maven.org/maven2/com/github/rwl/jtransforms/2.4.0/jtransforms-2.4.0.jar
-> New file at /var/folders/zz/zyxvpxvq6csfxvn_n0000000000000/T/toree_add_deps5762558224610635465/https/repo1.maven.org/maven2/org/spire-math/spire-macros_2.10/0.7.4/spire-macros_2.10-0.7.4.jar
-> New file at /var/folders/zz/zyxvpxvq6csfxvn_n0000000000000/T/toree_add_deps5762558224610635465/https/repo1.maven.org/maven2/net/sf/opencsv/opencsv/2.3/opencsv-2.3.jar
-> New file at /var/folders/zz/zyxvpxvq6csfxvn_n0000000000000/T/toree_add_deps5762558224610635465/https/repo1.maven.org/maven2/com/github/fommil/netlib/core/1.1.2/core-1.1.2.jar
-> New file at /var/folders/zz/zyxvpxvq6csfxvn_n0000000000000/T/toree_add_deps5762558224610635465/https/repo1.maven.org/maven2/net/sourceforge/f2j/arpack_combined_all/0.1/arpack_combined_all-0.1.jar
-> New file at /var/folders/zz/zyxvpxvq6csfxvn_n0000000000000/T/toree_add_deps5762558224610635465/https/repo1.maven.org/maven2/org/apache/commons/commons-math3/3.2/commons-math3-3.2.jar
-> New file at /var/folders/zz/zyxvpxvq6csfxvn_n0000000000000/T/toree_add_deps5762558224610635465/https/repo1.maven.org/maven2/org/spire-math/spire_2.10/0.7.4/spire_2.10-0.7.4.jar
-> New file at /var/folders/zz/zyxvpxvq6csfxvn_n0000000000000/T/toree_add_deps5762558224610635465/https/repo1.maven.org/maven2/org/scalanlp/breeze_2.10/0.11.2/breeze_2.10-0.11.2.jar
Marking org.scalanlp:breeze-natives_2.10:0.11.2 for download
Preparing to fetch from:
-> file:/var/folders/zz/zyxvpxvq6csfxvn_n0000000000000/T/toree_add_deps5762558224610635465/
-> https://repo1.maven.org/maven2
-> New file at /var/folders/zz/zyxvpxvq6csfxvn_n0000000000000/T/toree_add_deps5762558224610635465/https/repo1.maven.org/maven2/org/scalanlp/breeze-natives_2.10/0.11.2/breeze-natives_2.10-0.11.2.jar
Marking org.scalanlp:breeze-macros_2.10:0.11.2 for download
Preparing to fetch from:
-> file:/var/folders/zz/zyxvpxvq6csfxvn_n0000000000000/T/toree_add_deps5762558224610635465/
-> https://repo1.maven.org/maven2
-> New file at /var/folders/zz/zyxvpxvq6csfxvn_n0000000000000/T/toree_add_deps5762558224610635465/https/repo1.maven.org/maven2/org/scalanlp/breeze-macros_2.10/0.11.2/breeze-macros_2.10-0.11.2.jar
Marking com.github.haifengl:smile-core:1.2.0 for download
Preparing to fetch from:
-> file:/var/folders/zz/zyxvpxvq6csfxvn_n0000000000000/T/toree_add_deps5762558224610635465/
-> https://repo1.maven.org/maven2
-> New file at /var/folders/zz/zyxvpxvq6csfxvn_n0000000000000/T/toree_add_deps5762558224610635465/https/repo1.maven.org/maven2/com/github/haifengl/smile-data/1.2.0/smile-data-1.2.0.jar
-> New file at /var/folders/zz/zyxvpxvq6csfxvn_n0000000000000/T/toree_add_deps5762558224610635465/https/repo1.maven.org/maven2/com/github/haifengl/smile-graph/1.2.0/smile-graph-1.2.0.jar
-> New file at /var/folders/zz/zyxvpxvq6csfxvn_n0000000000000/T/toree_add_deps5762558224610635465/https/repo1.maven.org/maven2/com/github/haifengl/smile-math/1.2.0/smile-math-1.2.0.jar
-> New file at /var/folders/zz/zyxvpxvq6csfxvn_n0000000000000/T/toree_add_deps5762558224610635465/https/repo1.maven.org/maven2/com/github/haifengl/smile-core/1.2.0/smile-core-1.2.0.jar
Marking com.github.karlhigley:spark-neighbors_2.10:0.3.6-FORK for download
Preparing to fetch from:
-> file:/var/folders/zz/zyxvpxvq6csfxvn_n0000000000000/T/toree_add_deps5762558224610635465/
-> file:/Users/tmo/.m2/repository
-> https://repo1.maven.org/maven2
-> New file at /Users/tmo/.m2/repository/com/github/karlhigley/spark-neighbors_2.10/0.3.6-FORK/spark-neighbors_2.10-0.3.6-FORK.jar
Marking io.reactivex:rxscala_2.10:0.26.1 for download
Preparing to fetch from:
-> file:/var/folders/zz/zyxvpxvq6csfxvn_n0000000000000/T/toree_add_deps5762558224610635465/
-> file:/Users/tmo/.m2/repository
-> https://repo1.maven.org/maven2
-> New file at /Users/tmo/.m2/repository/io/reactivex/rxscala_2.10/0.26.1/rxscala_2.10-0.26.1.jar
-> New file at /Users/tmo/.m2/repository/io/reactivex/rxjava/1.1.1/rxjava-1.1.1.jar
Marking com.softwaremill.quicklens:quicklens_2.10:1.4.4 for download
Preparing to fetch from:
-> file:/var/folders/zz/zyxvpxvq6csfxvn_n0000000000000/T/toree_add_deps5762558224610635465/
-> file:/Users/tmo/.m2/repository
-> https://repo1.maven.org/maven2
-> New file at /var/folders/zz/zyxvpxvq6csfxvn_n0000000000000/T/toree_add_deps5762558224610635465/https/repo1.maven.org/maven2/com/softwaremill/quicklens/quicklens_2.10/1.4.4/quicklens_2.10-1.4.4.jar
Marking org.tmoerman:plongeur-spark_2.10:0.3.51 for download
Preparing to fetch from:
-> file:/var/folders/zz/zyxvpxvq6csfxvn_n0000000000000/T/toree_add_deps5762558224610635465/
-> file:/Users/tmo/.m2/repository
-> https://repo1.maven.org/maven2
-> New file at /Users/tmo/.m2/repository/org/tmoerman/plongeur-spark_2.10/0.3.51/plongeur-spark_2.10-0.3.51.jar

In [3]:
%addjar http://localhost:8888/nbextensions/declarativewidgets/declarativewidgets.jar


Starting download from http://localhost:8888/nbextensions/declarativewidgets/declarativewidgets.jar
Finished download of declarativewidgets.jar

Import classes


In [4]:
import rx.lang.scala.{Observer, Subscription, Observable}
import rx.lang.scala.subjects.PublishSubject
import rx.lang.scala.subjects._

import org.apache.commons.lang.StringUtils.trim
import org.apache.spark.SparkContext
import org.apache.spark.mllib.linalg.Vectors.dense
import org.apache.spark.rdd.RDD
import org.joda.time.DateTime
import org.tmoerman.plongeur.tda.TDAMachine
import org.tmoerman.plongeur.tda.Distances._
import org.tmoerman.plongeur.tda.Model._
import org.tmoerman.plongeur.tda.Filters._
import org.tmoerman.plongeur.tda.cluster.Clustering._
import org.tmoerman.plongeur.tda.cluster.Scale._
import org.tmoerman.plongeur.tda.Colour._
import org.tmoerman.plongeur.tda.Brewer
import org.tmoerman.plongeur.tda.LSH.LSHParams
import org.tmoerman.plongeur.tda.Model.{DataPoint, TDAContext, dp}
import org.tmoerman.plongeur.tda.knn.FastKNN.FastKNNParams
import org.tmoerman.plongeur.tda.knn.SampledKNN.SampledKNNParams
import org.tmoerman.plongeur.tda.knn.{FastKNN, SampledKNN, _}
import org.tmoerman.plongeur.util.RDDFunctions._
import org.tmoerman.plongeur.util.TimeUtils.time
import org.tmoerman.plongeur.tda.geometry.Laplacian._
import breeze.stats.distributions._
import org.apache.spark.mllib.linalg.SparseMatrix

In [5]:
import declarativewidgets._
initWidgets

import declarativewidgets.WidgetChannels.channel



In [6]:
import java.util.concurrent.atomic.AtomicReference

case class SubRef(val ref: AtomicReference[Option[Subscription]] = new AtomicReference[Option[Subscription]](None)) extends Serializable {

    def update(sub: Subscription): Unit = ref.getAndSet(Option(sub)).foreach(old => old.unsubscribe())

    def reset(): Unit = update(null)

}

Import polymer elements

These cells triggers Bower installations of the specified web components.

If it doesn't work, check whether Bower has sufficient permissions to install in the jupyter /nbextensions folder.


In [7]:
%%html
<link rel='import' href='urth_components/paper-slider/paper-slider.html' 
        is='urth-core-import' package='PolymerElements/paper-slider'>
<link rel='import' href='urth_components/paper-button/paper-button.html' 
        is='urth-core-import' package='PolymerElements/paper-button'>
<link rel='import' href='urth_components/plongeur-graph/plongeur-graph.html' 
        is='urth-core-import' package='tmoerman/plongeur-graph'>
<link rel='import' href='urth_components/urth-viz-scatter/urth-viz-scatter.html' is='urth-core-import'>


Out[7]:

Reactive TDA Machine

Keep references to Rx subscriptions apart.


In [8]:
val in$_subRef = SubRef()

Instantiate a PublishSubject. This stream of TDAParams instances represents the input of a TDAMachine. The PublishSubject listens to changes and sets these to the channel "ch_TDA_1" under the "params" key.

TODO: unsubscribe previous on re-evaluation


In [9]:
val in$ = PublishSubject[TDAParams]

in$_subRef.update(in$.subscribe(p => channel("ch_TDA_1").set("params", p.toString)))

Create an initial TDAParams instance. In the same cell, we submit the instance to the PublishSubject.

For the sake of illustration, we create an html snippet that listens to changes on the "ch_TDA_1" channel and displays the value of the "params" key.


In [10]:
%%html
<template is='urth-core-bind' channel='ch_TDA_1'>  
    <div style='background: #FFB; padding: 10px;'>
        <span style='font-family: "Courier"'>[[params]]</span>
    </div>
</template>


Out[10]:

Notice that when we evaluate the TDAParams instantiation cells, the output of the yellow box changes.

Inititalize rdd

In this example, we are using a synthetic torus-shaped 2D data set.


In [11]:
import org.apache.spark.rdd.RDD
import org.apache.commons.lang.StringUtils.trim
import org.apache.spark.mllib.linalg.Vectors.dense

def readMixture(file: String): RDD[DataPoint] = {
  sc
    .textFile(file)
    .zipWithIndex
    .map{ case (line, idx) =>
      val columns = line.split(",").map(trim)
      val category = columns.head
      val features = columns.tail.map(_.toDouble)

      dp(idx, dense(features), Map("cat" -> category)) }
}

In [12]:
val data_path = "/Users/tmo/Work/batiskav/projects/plongeur/scala/plongeur-spark/src/test/resources/data/"

val mixture_path = data_path + "mixture.1000.4.csv"

val rdd = readMixture(mixture_path).cache

val ctx = TDAContext(sc, rdd)

In [13]:
rdd.count


Out[13]:
1250

Turn a TDAResult into a data structure.


In [14]:
val r = scala.util.Random

def format(result: TDAResult) = Map(
    "nodes" -> result.clusters.map(c =>
      Map(
        "id"     -> c.id.toString,
        "size"   -> c.dataPoints.size,
        "color"  -> c.colours.headOption.getOrElse("#000000"),
        "x"      -> r.nextInt(100),
        "y"      -> r.nextInt(100))),
    "edges" -> result.edges.map(e => {
      val (from, to) = e.toArray match {case Array(f, t) => (f, t)}

      Map(
        "id"     -> s"$from--$to",
        "source" -> from.toString,
        "target" -> to.toString)}))

Run the machine, obtaining an Observable of TDAResult instances


In [15]:
val out$: Observable[TDAResult] = TDAMachine.run(ctx, in$)

In [16]:
val out$_subRef = SubRef()

In [17]:
out$_subRef.update(
    out$.subscribe(
        onNext = (r) => channel("ch_TDA_1").set("result", format(r)),
        onError = (e) => println("Error in TDA machine: ", e)))

Reactive inputs

First, we set up a stream of updates to BASE TDAParams instance.


In [18]:
val pipe$_subRef = SubRef()

In [19]:
import org.tmoerman.plongeur.ui.Controls._
kernel.magics.html(controlsCSS)


Out[19]:

In [22]:
val dist = LpNormDistance(0.5)

In [39]:
import TDAParams._

val den = Filter(Density(sigma=1.0), 30, 0.30)
val ecc = Eccentricity(Right(INFINITY), distance = dist)

val pc0 = Filter(PrincipalComponent(0), 20, 0.3)
val pc1 = Filter(PrincipalComponent(1), 20, 0.3)

val selector = (d: DataPoint) => d.meta.get("cat")
val maxFq = ClusterMaxFrequency(Array("#F00", "#00F", "#999"), selector)
val avgFilterValue = AverageFilterValue(Brewer.palettes("PuOr")(9), den)

val BASE = 
    TDAParams(
        lens = TDALens(pc0, ecc),
        clusteringParams = ClusteringParams(distance = dist),
        scaleSelection = firstGap(5),
        collapseDuplicateClusters = false,
        colouring = maxFq)

in$.onNext(BASE)

In [20]:
%%html
<template is='urth-core-bind' channel='ch_TDA_1'>    
    <plongeur-graph height="600" data="{{result}}"></plongeur-graph>
</template>


Out[20]:

In [32]:
val (sub, html) = BASE.makeControls(channel("ch_TDA_1"), in$)
pipe$_subRef.update(sub)
kernel.magics.html(html)


Out[32]:

In [22]:
val rawData = rdd.
    map(dp => dp.features.toArray.toList).collect.toList

rawData.take(1)


Out[22]:
List(List(0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 1.31263523311422, 0.46194504144512, -1.27540921236158, -0.442788955630807, 1.31844132365137, 1.74303388097402, -1.87478975018266, -0.250519039963329, 1.65695482393358, -0.385716027544235, -0.588364158427196, 1.74622062132631))

In [23]:
channel("data").set("raw", rawData)

In [24]:
%%html
<template is='urth-core-bind' channel='data'>    
    <urth-viz-scatter
        datarows='[[raw]]'
        primary='0'
        secondary='1'        
        />
</template>


Out[24]:

In [ ]: