A topological data analysis library.
Core algorithm written in Scala, using Apache Spark.
Executed in a Jupyter notebook, using the Apache Toree kernel and declarative widgets.
Graphs rendered with Sigma/Linkurious, wrapped in a Polymer component.
In [1]:
%AddDeps org.apache.spark spark-mllib_2.10 1.6.2
%AddDeps com.github.haifengl smile-core 1.1.0 --transitive
%AddDeps io.reactivex rxscala_2.10 0.26.1 --transitive
%AddDeps com.softwaremill.quicklens quicklens_2.10 1.4.4
%AddDeps com.chuusai shapeless_2.10 2.3.0 --repository https://oss.sonatype.org/content/repositories/releases/
%AddDeps org.tmoerman plongeur-spark_2.10 0.3.21 --repository file:/Users/tmo/.m2/repository
In [2]:
%addjar http://localhost:8888/nbextensions/declarativewidgets/declarativewidgets.jar
In [3]:
import rx.lang.scala.{Observer, Subscription, Observable}
import rx.lang.scala.subjects.PublishSubject
import rx.lang.scala.subjects._
import shapeless.HNil
import org.tmoerman.plongeur.tda._
import org.tmoerman.plongeur.tda.Model._
import org.tmoerman.plongeur.tda.cluster.Clustering._
import org.tmoerman.plongeur.tda.cluster.Scale._
import declarativewidgets._
initWidgets
import declarativewidgets.WidgetChannels.channel
In [4]:
%%html
<link rel='import' href='urth_components/paper-slider/paper-slider.html'
is='urth-core-import' package='PolymerElements/paper-slider'>
<link rel='import' href='urth_components/paper-button/paper-button.html'
is='urth-core-import' package='PolymerElements/paper-button'>
<link rel='import' href='urth_components/plongeur-graph/plongeur-graph.html'
is='urth-core-import' package='tmoerman/plongeur-graph'>
<link rel='import' href='urth_components/urth-viz-scatter/urth-viz-scatter.html' is='urth-core-import'>
Out[4]:
Instantiate a PublishSubject. This stream of TDAParams instances is the input of a TDAMachine. We also subscribe to the subject so that every new TDAParams value is written to the channel "ch_TDA_1" under the "params" key.
TODO: unsubscribe previous on re-evaluation
In [5]:
val in$ = PublishSubject[TDAParams]
val in$_sub = in$.subscribe(p => channel("ch_TDA_1").set("params", p.toString))
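To make the publish semantics and the TODO about unsubscribing concrete, here is a minimal sketch in plain Scala. MiniSubject is a hypothetical stand-in written for this illustration, not the RxScala class: it only shows that values pushed before a subscription are not replayed, and that a cancelled subscription stops receiving values.

```scala
import scala.collection.mutable

// Hypothetical stand-in for a publish-style subject; NOT the RxScala PublishSubject.
final class MiniSubject[A] {
  private val observers = mutable.LinkedHashMap.empty[Int, A => Unit]
  private var nextId = 0

  // Registers an observer and returns a function that cancels the subscription.
  def subscribe(onNext: A => Unit): () => Unit = {
    val id = nextId
    nextId += 1
    observers(id) = onNext
    () => { observers.remove(id); () }
  }

  // Pushes a value to every observer that is subscribed at this moment.
  def onNext(value: A): Unit = observers.values.toList.foreach(_(value))
}

val subject = new MiniSubject[String]
val seen = mutable.ListBuffer.empty[String]

subject.onNext("early")                        // nobody is listening yet, so this value is dropped
val cancel = subject.subscribe(v => seen += v)
subject.onNext("first")                        // delivered to the observer
cancel()                                       // what a re-evaluated cell could do before subscribing again
subject.onNext("late")                         // not delivered: the subscription was cancelled
```

In the notebook, the equivalent of cancel() would be calling unsubscribe() on the previous in$_sub before the cell creates a new subscription.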
Create an initial TDAParams instance. In the same cell, we submit the instance to the PublishSubject.
In [6]:
val tdaParams =
  TDAParams(
    lens = TDALens(
      Filter("feature" :: 0 :: HNil, 5, 0.25),
      Filter("feature" :: 1 :: HNil, 5, 0.25)),
    clusteringParams = ClusteringParams(),
    scaleSelection = histogram(10))
in$.onNext(tdaParams)
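Each Filter above takes a number of bins (5) and an overlap (0.25). The sketch below shows one way such a pair can define a cover of overlapping intervals over the range of a filter function. The convention used here (equal base intervals, each widened so that neighbours share a fraction of the base width) is an assumption made for illustration and is not necessarily how plongeur computes its cover; the names cover and binsOf are hypothetical.

```scala
// Assumed convention, not necessarily plongeur's: split [min, max] into nrBins
// equal intervals, then widen each so that neighbours share `overlap` of the base width.
def cover(min: Double, max: Double, nrBins: Int, overlap: Double): Seq[(Double, Double)] = {
  val width = (max - min) / nrBins
  val pad = width * overlap / 2
  (0 until nrBins).map { i =>
    val lo = min + i * width
    (lo - pad, lo + width + pad)
  }
}

val intervals = cover(0.0, 10.0, nrBins = 5, overlap = 0.25)

// Indices of the intervals that contain a given filter value.
def binsOf(x: Double): Seq[Int] =
  intervals.zipWithIndex.collect { case ((lo, hi), i) if x >= lo && x <= hi => i }

val onBoundary = binsOf(2.0)  // falls in the region shared by intervals 0 and 1
val inMiddle = binsOf(3.0)    // falls in interval 1 only
```

Points that land in a shared region belong to two bins, which is what later allows clusters from neighbouring bins to be connected by an edge.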
Specify alternative parameters.
In [28]:
val tdaParams_ALT =
  TDAParams(
    lens = TDALens(
      Filter("PCA" :: 0 :: HNil, 10, 0.25)),
    clusteringParams = ClusteringParams(),
    scaleSelection = histogram(10))
in$.onNext(tdaParams_ALT)
For illustration, we create an HTML snippet that listens for changes on the "ch_TDA_1" channel and displays the value of the "params" key.
In [8]:
%%html
<template is='urth-core-bind' channel='ch_TDA_1'>
<div style='background: #FFB; padding: 10px;'>
<span style='font-family: "Courier"'>[[params]]</span>
</div>
</template>
Out[8]:
Notice that when we evaluate the TDAParams instantiation cells, the output of the yellow box changes.
In [9]:
import org.apache.spark.rdd.RDD
import org.apache.commons.lang.StringUtils.trim
import org.apache.spark.mllib.linalg.Vectors.dense
def readCircle(file: String) =
  sc.
    textFile(file).
    map(_.split(",").map(trim)).
    zipWithIndex.
    map{ case (Array(x, y), idx) => dp(idx, dense(x.toDouble, y.toDouble)) }
In [10]:
val data_path = "/Users/tmo/Work/batiskav/projects/plongeur/scala/plongeur-spark/src/test/resources/data/"
val circle_1k_path = data_path + "circle.1k.csv"
val rdd = readCircle(circle_1k_path).cache
In [11]:
val ctx = TDAContext(sc, rdd)
Run the machine, obtaining an Observable of (TDAParams, TDAResult) pairs.
In [12]:
val out$: Observable[(TDAParams, TDAResult)] = TDAMachine.run(ctx, in$)
Let's take a look at the raw data.
In [13]:
val rawData = rdd.
  map(dp => {
    val x = dp.features(0)
    val y = dp.features(1)
    List(x, y)}).collect.toList
rawData.take(3)
Out[13]:
DeclarativeWidgets transforms the List[List[Double]] into a JSON data structure.
In [14]:
channel("data").set("raw", rawData)
In [15]:
%%html
<template is='urth-core-bind' channel='data'>
<urth-viz-scatter
    datarows='[[raw]]'
    primary='0'
    secondary='1'>
</urth-viz-scatter>
</template>
Out[15]:
Convert a TDAResult into a map of nodes and edges for the graph component. Node positions are initialized randomly.
In [16]:
val r = scala.util.Random
def format(result: TDAResult) = Map(
  "nodes" -> result.clusters.map(c =>
    Map(
      "id"    -> c.id.toString,
      "label" -> c.id.toString,
      "size"  -> c.dataPoints.size,
      "x"     -> r.nextInt(100),
      "y"     -> r.nextInt(100))),
  "edges" -> result.edges.map(e => {
    val (from, to) = e.toArray match {case Array(f, t) => (f, t)}
    Map(
      "id"     -> s"$from--$to",
      "source" -> from.toString,
      "target" -> to.toString)}))
In [17]:
val out_sub = out$.subscribe(
  onNext = { case (_, result) => channel("ch_TDA_1").set("result", format(result)) },
  onError = (e) => println(s"Error in TDA machine: $e"))
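To see the shape of the nodes/edges map that the format function above pushes to the channel, here is a self-contained sketch on a toy result. MiniCluster, MiniResult and formatMini are hypothetical, simplified stand-ins (the real TDAResult is not reproduced here), and the random x/y positions are left out. Sorting the two endpoints of each edge is an addition of this sketch that makes the edge id independent of set ordering.

```scala
// Hypothetical minimal types for illustration; the real TDAResult differs.
case class MiniCluster(id: Int, dataPoints: Set[Int])
case class MiniResult(clusters: List[MiniCluster], edges: Set[Set[Int]])

def formatMini(result: MiniResult): Map[String, Seq[Map[String, Any]]] = Map(
  "nodes" -> result.clusters.map(c =>
    Map[String, Any](
      "id"    -> c.id.toString,
      "label" -> c.id.toString,
      "size"  -> c.dataPoints.size)),
  "edges" -> result.edges.toSeq.map { e =>
    // Each edge is a set of exactly two cluster ids; sort them for a stable edge id.
    val Seq(from, to) = e.toSeq.sorted
    Map[String, Any](
      "id"     -> s"$from--$to",
      "source" -> from.toString,
      "target" -> to.toString)
  })

// Two clusters that share data point 3, hence one edge between them.
val sample = MiniResult(
  clusters = List(MiniCluster(0, Set(1, 2, 3)), MiniCluster(1, Set(3, 4))),
  edges    = Set(Set(0, 1)))

val graph = formatMini(sample)
```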
In [18]:
//out_sub.unsubscribe()
Out[18]:
In [19]:
import TDAParams._
val BASE =
  TDAParams(
    lens = TDALens(
      Filter("feature" :: 0 :: HNil, 20, 0.6)),
    clusteringParams = ClusteringParams(),
    scaleSelection = histogram(10))
val nrBins$ = PublishSubject[Int]
val overlap$ = PublishSubject[Percentage]
val params$ =
  List(
    nrBins$.map(v => setFilterNrBins(0, v)),
    overlap$.map(v => setFilterOverlap(0, v))).
    reduce(_ merge _).
    scan(BASE)((params, fn) => fn(params))
We now have an Observable of TDAParams instances, named params$. We need to pipe it into the in$ subject we defined earlier. Conceptually, this is like welding pipes.
In [20]:
val params_to_in_sub = params$.subscribe(in$)
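The params$ stream is built by turning every input event into an update function, merging those functions into one stream, and folding them over BASE with scan. The same idea can be sketched with plain collections, where a list stands in for the merged stream and scanLeft plays the role of scan. Params, setNrBins and setOverlap are hypothetical, simplified stand-ins for TDAParams and its setters.

```scala
// Hypothetical, simplified stand-in for TDAParams with a single filter.
case class Params(nrBins: Int, overlap: BigDecimal)

// Each UI event becomes a function that updates exactly one field.
def setNrBins(v: Int): Params => Params = _.copy(nrBins = v)
def setOverlap(v: BigDecimal): Params => Params = _.copy(overlap = v)

val base = Params(nrBins = 20, overlap = BigDecimal("0.6"))

// The merged event stream, modelled as a list in arrival order.
val updates: List[Params => Params] =
  List(setNrBins(30), setOverlap(BigDecimal("0.25")), setNrBins(40))

// scanLeft yields the seed followed by every intermediate state.
val states = updates.scanLeft(base)((params, fn) => fn(params))
```

Because each event only describes a change, the sliders never need to know the full parameter set; the accumulated state lives in the scan.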
Now, we need to set up channel listeners that capture input events and fire signals on the nrBins$ and overlap$ Subjects.
In [21]:
channel("ch_TDA_1").set("nrBins", 10)
channel("ch_TDA_1").set("overlap", 60)
channel("ch_TDA_1").watch("nrBins", (_: Any, v: Int) => nrBins$.onNext(v))
channel("ch_TDA_1").watch("overlap", (_: Any, v: Int) => overlap$.onNext(BigDecimal(v) / 100))
Set initial channel values.
In [22]:
channel("ch_TDA_1").set("nrBins", BASE.lens.filters(0).nrBins)
channel("ch_TDA_1").set("overlap", (BASE.lens.filters(0).overlap * 100).toInt)
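The sliders work in whole percentages while the filter stores the overlap as a fraction, so the cells above convert in both directions. The sketch below isolates that conversion. It assumes that Percentage is a BigDecimal, as the watch handler suggests; toFraction and toPercent are hypothetical helper names.

```scala
// Assumption: the overlap is a BigDecimal fraction, as suggested by the watch handler.
def toFraction(percent: Int): BigDecimal = BigDecimal(percent) / 100
def toPercent(fraction: BigDecimal): Int = (fraction * 100).toInt

// Every slider position in 0..75 survives the round trip unchanged.
val roundTrip = (0 to 75).forall(p => toPercent(toFraction(p)) == p)

// With Double arithmetic the same conversion can lose a percent to truncation,
// because 0.29 * 100 is slightly below 29 in binary floating point.
val viaDouble = (0.29 * 100).toInt
```

Using a decimal type keeps the slider position and the stored parameter in agreement after each update.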
We create two slider widgets that provide the inputs for the nrBins$ and overlap$ Observables.
In [23]:
%%html
<template is='urth-core-bind' channel='ch_TDA_1'>
<table style="border-style: hidden;">
<tr style="border-style: hidden;">
<th style="border-style: hidden;">nr of bins</th>
<td style="border-style: hidden;">
<paper-slider min="10" max="100" step="1" value="{{nrBins}}"></paper-slider>
</td>
<td style="border-style: hidden;">[[nrBins]]</td>
</tr>
<tr style="border-style: hidden;">
<th style="border-style: hidden;">overlap</th>
<td style="border-style: hidden;">
<paper-slider min="0" max="75" step="1" value="{{overlap}}"></paper-slider>
</td>
<td style="border-style: hidden;">[[overlap]]%</td>
</tr>
</table>
</template>
Out[23]:
In [24]:
%%html
<template is='urth-core-bind' channel='ch_TDA_1'>
<plongeur-graph data="{{result}}"></plongeur-graph>
</template>
Out[24]:
If everything went according to plan, the graph above displays a topological summary of the input data point cloud.