Plongeur

A topological data analysis library.

Core algorithm written in Scala, using Apache Spark.

Executed in a Jupyter notebook, using the Apache Toree kernel and Jupyter Declarative Widgets.

Graphs rendered with Sigma/Linkurious, wrapped in a Polymer component.

Reactive machinery powered by RxScala.

Maven dependencies


In [1]:
%AddDeps org.apache.spark spark-mllib_2.10 1.6.2
%AddDeps com.github.haifengl smile-core 1.1.0 --transitive
%AddDeps io.reactivex rxscala_2.10 0.26.1 --transitive
%AddDeps com.softwaremill.quicklens quicklens_2.10 1.4.4
%AddDeps com.chuusai shapeless_2.10 2.3.0 --repository https://oss.sonatype.org/content/repositories/releases/
%AddDeps org.tmoerman plongeur-spark_2.10 0.3.21 --repository file:/Users/tmo/.m2/repository


Marking org.apache.spark:spark-mllib_2.10:1.6.2 for download
Preparing to fetch from:
-> file:/var/folders/zz/zyxvpxvq6csfxvn_n0000000000000/T/toree_add_deps4602500660601867771/
-> https://repo1.maven.org/maven2
-> New file at /var/folders/zz/zyxvpxvq6csfxvn_n0000000000000/T/toree_add_deps4602500660601867771/https/repo1.maven.org/maven2/org/apache/spark/spark-mllib_2.10/1.6.2/spark-mllib_2.10-1.6.2.jar
Marking com.github.haifengl:smile-core:1.1.0 for download
Preparing to fetch from:
-> file:/var/folders/zz/zyxvpxvq6csfxvn_n0000000000000/T/toree_add_deps4602500660601867771/
-> https://repo1.maven.org/maven2
-> New file at /var/folders/zz/zyxvpxvq6csfxvn_n0000000000000/T/toree_add_deps4602500660601867771/https/repo1.maven.org/maven2/com/github/haifengl/smile-graph/1.1.0/smile-graph-1.1.0.jar
-> New file at /var/folders/zz/zyxvpxvq6csfxvn_n0000000000000/T/toree_add_deps4602500660601867771/https/repo1.maven.org/maven2/com/github/haifengl/smile-data/1.1.0/smile-data-1.1.0.jar
-> New file at /var/folders/zz/zyxvpxvq6csfxvn_n0000000000000/T/toree_add_deps4602500660601867771/https/repo1.maven.org/maven2/com/github/haifengl/smile-math/1.1.0/smile-math-1.1.0.jar
-> New file at /var/folders/zz/zyxvpxvq6csfxvn_n0000000000000/T/toree_add_deps4602500660601867771/https/repo1.maven.org/maven2/com/github/haifengl/smile-core/1.1.0/smile-core-1.1.0.jar
Marking io.reactivex:rxscala_2.10:0.26.1 for download
Preparing to fetch from:
-> file:/var/folders/zz/zyxvpxvq6csfxvn_n0000000000000/T/toree_add_deps4602500660601867771/
-> https://repo1.maven.org/maven2
-> New file at /var/folders/zz/zyxvpxvq6csfxvn_n0000000000000/T/toree_add_deps4602500660601867771/https/repo1.maven.org/maven2/io/reactivex/rxscala_2.10/0.26.1/rxscala_2.10-0.26.1.jar
-> New file at /var/folders/zz/zyxvpxvq6csfxvn_n0000000000000/T/toree_add_deps4602500660601867771/https/repo1.maven.org/maven2/io/reactivex/rxjava/1.1.1/rxjava-1.1.1.jar
Marking com.softwaremill.quicklens:quicklens_2.10:1.4.4 for download
Preparing to fetch from:
-> file:/var/folders/zz/zyxvpxvq6csfxvn_n0000000000000/T/toree_add_deps4602500660601867771/
-> https://repo1.maven.org/maven2
-> New file at /var/folders/zz/zyxvpxvq6csfxvn_n0000000000000/T/toree_add_deps4602500660601867771/https/repo1.maven.org/maven2/com/softwaremill/quicklens/quicklens_2.10/1.4.4/quicklens_2.10-1.4.4.jar
Marking com.chuusai:shapeless_2.10:2.3.0 for download
Preparing to fetch from:
-> file:/var/folders/zz/zyxvpxvq6csfxvn_n0000000000000/T/toree_add_deps4602500660601867771/
-> https://oss.sonatype.org/content/repositories/releases/
-> https://repo1.maven.org/maven2
-> New file at /var/folders/zz/zyxvpxvq6csfxvn_n0000000000000/T/toree_add_deps4602500660601867771/https/oss.sonatype.org/content/repositories/releases/com/chuusai/shapeless_2.10/2.3.0/shapeless_2.10-2.3.0.jar
Marking org.tmoerman:plongeur-spark_2.10:0.3.21 for download
Preparing to fetch from:
-> file:/var/folders/zz/zyxvpxvq6csfxvn_n0000000000000/T/toree_add_deps4602500660601867771/
-> file:/Users/tmo/.m2/repository
-> https://repo1.maven.org/maven2
-> New file at /Users/tmo/.m2/repository/org/tmoerman/plongeur-spark_2.10/0.3.21/plongeur-spark_2.10-0.3.21.jar

In [2]:
%addjar http://localhost:8888/nbextensions/declarativewidgets/declarativewidgets.jar


Starting download from http://localhost:8888/nbextensions/declarativewidgets/declarativewidgets.jar
Finished download of declarativewidgets.jar

Import classes


In [3]:
import rx.lang.scala.{Observer, Subscription, Observable}
import rx.lang.scala.subjects.PublishSubject
import rx.lang.scala.subjects._

import shapeless.HNil

import org.tmoerman.plongeur.tda._
import org.tmoerman.plongeur.tda.Model._
import org.tmoerman.plongeur.tda.cluster.Clustering._
import org.tmoerman.plongeur.tda.cluster.Scale._

import declarativewidgets._
initWidgets

import declarativewidgets.WidgetChannels.channel


Import polymer elements

This cell triggers Bower installations of the specified web components.

If it doesn't work, check whether Bower has sufficient permissions to install into the Jupyter /nbextensions folder.


In [4]:
%%html
<link rel='import' href='urth_components/paper-slider/paper-slider.html' 
        is='urth-core-import' package='PolymerElements/paper-slider'>
<link rel='import' href='urth_components/paper-button/paper-button.html' 
        is='urth-core-import' package='PolymerElements/paper-button'>
<link rel='import' href='urth_components/plongeur-graph/plongeur-graph.html' 
        is='urth-core-import' package='tmoerman/plongeur-graph'>
<link rel='import' href='urth_components/urth-viz-scatter/urth-viz-scatter.html' is='urth-core-import'>


Out[4]:

Reactive TDA Machine

Instantiate a PublishSubject. This stream of TDAParams instances represents the input of a TDAMachine. We subscribe to it so that every TDAParams value it emits is written, as a string, to the "params" key of the channel "ch_TDA_1".

TODO: unsubscribe the previous subscription when this cell is re-evaluated.


In [5]:
val in$ = PublishSubject[TDAParams]

val in$_sub = in$.subscribe(p => channel("ch_TDA_1").set("params", p.toString))
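A PublishSubject only delivers values emitted after an observer has subscribed, which is why in$ is subscribed before anything is pushed into it. The sketch below is a hypothetical, hand-rolled stand-in (MiniPublishSubject is not part of RxScala) that models just this behaviour with the Scala standard library.

```scala
import scala.collection.mutable.ListBuffer

// Hypothetical, simplified model of a PublishSubject (not RxScala's class):
// onNext forwards a value to the observers subscribed at that moment only;
// earlier values are never replayed to late subscribers.
final class MiniPublishSubject[T] {
  private val observers = ListBuffer.empty[T => Unit]

  def subscribe(observer: T => Unit): Unit = { observers += observer; () }

  def onNext(value: T): Unit = observers.foreach(observer => observer(value))
}

object MiniPublishSubjectDemo {
  // Returns the values seen by an observer that subscribes after the first onNext.
  def run(): List[String] = {
    val in$  = new MiniPublishSubject[String]
    val seen = ListBuffer.empty[String]

    in$.onNext("params-0")         // no observers yet: this value is dropped
    in$.subscribe(p => seen += p)  // plays the role of the channel(...).set(...) callback
    in$.onNext("params-1")
    in$.onNext("params-2")

    seen.toList
  }
}
```

Calling MiniPublishSubjectDemo.run() yields only the values pushed after the subscription.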

Create an initial TDAParams instance. In the same cell, we submit the instance to the PublishSubject.


In [6]:
val tdaParams =
      TDAParams(
        lens = TDALens(
          Filter("feature" :: 0 :: HNil, 5, 0.25),
          Filter("feature" :: 1 :: HNil, 5, 0.25)),
        clusteringParams = ClusteringParams(),
        scaleSelection = histogram(10))

in$.onNext(tdaParams)
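Each Filter above takes a number of bins and an overlap fraction (here 5 and 0.25). One common Mapper-style convention turns these into a cover of the filter range by equal-length, overlapping intervals. The sketch below illustrates that convention only; Plongeur's own binning formula is not shown in this notebook and may differ in detail, and Cover and Interval are hypothetical names.

```scala
// Hypothetical sketch of one common convention for (nrBins, overlap):
// nrBins equal-length intervals, consecutive ones sharing `overlap` of their length,
// together spanning exactly [min, max]. Not necessarily Plongeur's formula.
object Cover {
  final case class Interval(lo: Double, hi: Double)

  def intervals(min: Double, max: Double, nrBins: Int, overlap: Double): Seq[Interval] = {
    require(nrBins >= 1 && overlap >= 0 && overlap < 1)
    // Solve len + (nrBins - 1) * len * (1 - overlap) == max - min for len.
    val len    = (max - min) / (1 + (nrBins - 1) * (1 - overlap))
    val stride = len * (1 - overlap)
    (0 until nrBins).map { i =>
      val lo = min + i * stride
      Interval(lo, lo + len)
    }
  }
}
```

For a range [0, 10] with 5 bins and 0.25 overlap, each interval is 2.5 long and consecutive intervals overlap by 0.625.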

Specify alternative parameters.


In [28]:
val tdaParams_ALT =
      TDAParams(
        lens = TDALens(
          Filter("PCA" :: 0 :: HNil, 10, 0.25)),
        clusteringParams = ClusteringParams(),
        scaleSelection = histogram(10))

in$.onNext(tdaParams_ALT)

For the sake of illustration, we create an HTML snippet that binds to the "ch_TDA_1" channel and displays the value of its "params" key.


In [8]:
%%html
<template is='urth-core-bind' channel='ch_TDA_1'>  
    <div style='background: #FFB; padding: 10px;'>
        <span style='font-family: "Courier"'>[[params]]</span>
    </div>
</template>


Out[8]:

Notice that when we evaluate the TDAParams instantiation cells, the output of the yellow box changes.

Initialize the RDD

In this example, we use a synthetic, ring-shaped 2D data set (circle.1k.csv), stored as one comma-separated x,y pair per line.


In [9]:
import org.apache.spark.rdd.RDD
import org.apache.commons.lang.StringUtils.trim
import org.apache.spark.mllib.linalg.Vectors.dense

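// Read a CSV file of "x,y" lines into an RDD of indexed 2D data points.
def readCircle(file: String) = 
    sc.
        textFile(file).
        map(_.split(",").map(trim)).  // split each line and trim the fields
        zipWithIndex.                 // attach a unique Long index to each line
        map{ case (Array(x, y), idx) => dp(idx, dense(x.toDouble, y.toDouble))}  // build a data point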
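The parsing steps in readCircle do not depend on Spark. The sketch below applies the same split, trim, index and convert steps to an ordinary Seq of lines; Point is a hypothetical stand-in for the data point built by Plongeur's dp(...).

```scala
// Spark-free sketch of readCircle's parsing. Point is a hypothetical stand-in
// for Plongeur's data point type.
object CircleParsing {
  final case class Point(index: Long, x: Double, y: Double)

  def parse(lines: Seq[String]): Seq[Point] =
    lines
      .map(_.split(",").map(_.trim))  // split each line and trim the fields
      .zipWithIndex                   // attach a zero-based index to each line
      .collect { case (Array(x, y), idx) => Point(idx.toLong, x.toDouble, y.toDouble) }
}
```

One design difference: collect silently skips lines that do not have exactly two fields, whereas the map { case ... } in readCircle would throw a MatchError on such a line when the RDD is evaluated.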

In [10]:
val data_path = "/Users/tmo/Work/batiskav/projects/plongeur/scala/plongeur-spark/src/test/resources/data/"

val circle_1k_path = data_path + "circle.1k.csv"

val rdd = readCircle(circle_1k_path).cache

In [11]:
val ctx = TDAContext(sc, rdd)

Run the machine, obtaining an Observable of (TDAParams, TDAResult) pairs.


In [12]:
val out$: Observable[(TDAParams, TDAResult)] = TDAMachine.run(ctx, in$)

Let's take a look at the raw data.


In [13]:
val rawData = rdd.
    map(dp => {
        val x = dp.features(0)
        val y = dp.features(1)        
        List(x, y)}).collect.toList

rawData.take(3)


Out[13]:
List(List(-8.75435473500198, 2.96452013439886), List(-4.31371323296644, 7.35714500394287), List(-10.0269614176788, 5.28996510154708))

When the List[List[Double]] is set on a channel, DeclarativeWidgets serializes it to JSON, which the scatter plot below binds to as its data rows.


In [14]:
channel("data").set("raw", rawData)
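DeclarativeWidgets performs its own serialization, which is not shown here. To illustrate the resulting shape, the hand-rolled encoder below (RowsAsJson is a hypothetical helper) renders a List[List[Double]] as a JSON array of [x, y] arrays; the primary='0' and secondary='1' attributes of the scatter plot presumably select those two columns.

```scala
// Illustrative only: shows the JSON shape of a List[List[Double]].
// DeclarativeWidgets uses its own serializer, not this helper.
object RowsAsJson {
  def toJson(rows: List[List[Double]]): String =
    rows.map(_.mkString("[", ",", "]")).mkString("[", ",", "]")
}
```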

In [15]:
%%html
<template is='urth-core-bind' channel='data'>    
    <urth-viz-scatter
        datarows='[[raw]]'
        primary='0'
        secondary='1'        
        />
</template>


Out[15]:

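Convert a TDAResult into a nested map of nodes and edges that the plongeur-graph component can render. Cluster ids become node ids, cluster sizes become node sizes, and node positions are random.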


In [16]:
val r = scala.util.Random

def format(result: TDAResult) = Map(
    "nodes" -> result.clusters.map(c =>
      Map(
        "id"     -> c.id.toString,
        "label"  -> c.id.toString,
        "size"   -> c.dataPoints.size,
        "x"      -> r.nextInt(100),
        "y"      -> r.nextInt(100))),
    "edges" -> result.edges.map(e => {
      val (from, to) = e.toArray match {case Array(f, t) => (f, t)}

      Map(
        "id"     -> s"$from--$to",
        "source" -> from.toString,
        "target" -> to.toString)}))

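The nodes/edges map appears to follow the graph layout used by Sigma.js: nodes with id, label, size, x and y; edges with id, source and target. The sketch below reproduces format against hypothetical Cluster and Result case classes (stand-ins for Plongeur's TDAResult, whose definition is not shown), with edges modelled as two-element sets of cluster ids, as the Array(f, t) match implies. It takes the Random as a parameter so that a fixed seed gives a reproducible layout.

```scala
import scala.util.Random

// Hypothetical stand-ins for Plongeur's result types; only the fields used by format.
object GraphFormat {
  final case class Cluster(id: Int, dataPoints: Seq[Int])
  final case class Result(clusters: Seq[Cluster], edges: Seq[Set[Int]])

  def format(result: Result, rng: Random): Map[String, Seq[Map[String, Any]]] = Map(
    "nodes" -> result.clusters.map { c =>
      Map(
        "id"    -> c.id.toString,
        "label" -> c.id.toString,
        "size"  -> c.dataPoints.size,
        "x"     -> rng.nextInt(100),  // random layout, as in the notebook
        "y"     -> rng.nextInt(100))
    },
    "edges" -> result.edges.map { e =>
      // each edge connects exactly two cluster ids
      val (from, to) = e.toList match { case List(f, t) => (f, t) }
      Map(
        "id"     -> s"$from--$to",
        "source" -> from.toString,
        "target" -> to.toString)
    })
}
```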
In [17]:
val out_sub = out$.subscribe(
    onNext = (t) => t match {case (_, result) => channel("ch_TDA_1").set("result", format(result))},
    onError = (e) => println(s"Error in TDA machine: $e"))

In [18]:
//out_sub.unsubscribe()



Reactive inputs

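First, we set up a stream of updates to a BASE TDAParams instance. Each slider event is turned into a function TDAParams => TDAParams, the two update streams are merged, and scan applies the functions to BASE one at a time, emitting each resulting TDAParams.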


In [19]:
import TDAParams._

val BASE = 
    TDAParams(
        lens = TDALens(
          Filter("feature" :: 0 :: HNil, 20, 0.6)),
        clusteringParams = ClusteringParams(),
        scaleSelection = histogram(10))

val nrBins$ = PublishSubject[Int]

val overlap$ = PublishSubject[Percentage]

val params$ =
    List(
        nrBins$.map(v => setFilterNrBins(0, v)),
        overlap$.map(v => setFilterOverlap(0, v))).
    reduce(_ merge _).
    scan(BASE)((params, fn) => fn(params))
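The same fold can be sketched with an ordinary collection, where scanLeft plays the role of Rx's scan. Params, setNrBins and setOverlap below are hypothetical stand-ins for TDAParams, setFilterNrBins and setFilterOverlap, and the Seq of updates stands in for the merged nrBins$/overlap$ events.

```scala
// Hypothetical stand-ins: Params for TDAParams, setNrBins/setOverlap for
// setFilterNrBins/setFilterOverlap (here acting on a single filter).
object ParamsScan {
  final case class Params(nrBins: Int, overlap: BigDecimal)

  // Each input event becomes an update function Params => Params.
  def setNrBins(n: Int): Params => Params = _.copy(nrBins = n)
  def setOverlap(o: BigDecimal): Params => Params = _.copy(overlap = o)

  // Fold the updates over the base value, keeping every intermediate state.
  def states(base: Params, updates: Seq[Params => Params]): Seq[Params] =
    updates.scanLeft(base)((params, fn) => fn(params))
}
```

Like scanLeft, RxJava's scan with an initial value emits the seed as its first item, so BASE itself is emitted as soon as params$ is subscribed.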

We now have an Observable of TDAParams instances, named params$. We pipe it into the in$ PublishSubject defined earlier: since a Subject is also an Observer, params$.subscribe(in$) forwards every value. Conceptually, this is like welding pipes.


In [20]:
val params_to_in_sub = params$.subscribe(in$)

Now we set up channel watchers that capture input events and push them onto the nrBins$ and overlap$ Subjects.


In [21]:
channel("ch_TDA_1").set("nrBins", 10)
channel("ch_TDA_1").set("overlap", 60)

channel("ch_TDA_1").watch("nrBins",  (_: Any, v: Int) => nrBins$.onNext(v))
channel("ch_TDA_1").watch("overlap", (_: Any, v: Int) => overlap$.onNext(BigDecimal(v) / 100))

Set the initial channel values from the BASE parameters.


In [22]:
channel("ch_TDA_1").set("nrBins", BASE.lens.filters(0).nrBins)
channel("ch_TDA_1").set("overlap", (BASE.lens.filters(0).overlap * 100).toInt)
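The overlap slider works in whole percent, while the filter overlap is a fraction, so the cells above convert in both directions. This sketch isolates the two conversions as plain functions; it assumes the overlap is a BigDecimal (the watch callback builds it with BigDecimal(v) / 100, but the Percentage type itself is not shown), and OverlapPercent is a hypothetical name.

```scala
// Assumption: the overlap fraction is a BigDecimal, as in the watch callback.
object OverlapPercent {
  // Slider value (whole percent) to filter overlap fraction.
  def toFraction(percent: Int): BigDecimal = BigDecimal(percent) / 100

  // Filter overlap fraction back to a slider value; toInt truncates.
  def toPercent(fraction: BigDecimal): Int = (fraction * 100).toInt
}
```

Because toInt truncates, a fraction such as 0.605 is shown on the slider as 60.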

We create two slider widgets that provide the inputs for the nrBins$ and overlap$ Observables.


In [23]:
%%html
<template is='urth-core-bind' channel='ch_TDA_1'>  
    <table style="border-style: hidden;">
        <tr style="border-style: hidden;">
            <th style="border-style: hidden;">nr of bins</th>
            <td style="border-style: hidden;">
                <paper-slider min="10" max="100" step="1" value="{{nrBins}}"></paper-slider>
            </td>
            <td style="border-style: hidden;">[[nrBins]]</td>
        </tr>
        <tr style="border-style: hidden;">
            <th style="border-style: hidden;">overlap</th>
            <td style="border-style: hidden;">
                <paper-slider min="0" max="75" step="1" value="{{overlap}}"></paper-slider>
            </td>
            <td style="border-style: hidden;">[[overlap]]%</td>
        </tr>
    </table>        
</template>


Out[23]:

In [24]:
%%html
<template is='urth-core-bind' channel='ch_TDA_1'>    
    <plongeur-graph data="{{result}}"></plongeur-graph>
</template>


Out[24]:

If everything went according to plan, the graph above displays a topological summary of the input data point cloud.

