Plongeur: a topological data analysis (TDA) library.
The core algorithm is written in Scala, using Apache Spark.
The notebook runs on the Apache Toree kernel, with Jupyter declarative widgets.
Graphs are rendered with Sigma.js (Linkurious fork), wrapped in a Polymer component.
In [5]:
%AddDeps org.apache.spark spark-mllib_2.10 1.6.2 --repository file:/Users/tmo/.m2/repository
%AddDeps com.github.haifengl smile-core 1.1.0 --transitive --repository file:/Users/tmo/.m2/repository
%AddDeps io.reactivex rxscala_2.10 0.26.1 --transitive --repository file:/Users/tmo/.m2/repository
%AddDeps com.softwaremill.quicklens quicklens_2.10 1.4.4 --repository file:/Users/tmo/.m2/repository
%AddDeps com.chuusai shapeless_2.10 2.3.0 --repository https://oss.sonatype.org/content/repositories/releases/ --repository file:/Users/tmo/.m2/repository
%AddDeps org.tmoerman plongeur-spark_2.10 0.3.21 --repository file:/Users/tmo/.m2/repository
In [6]:
%AddJar http://localhost:8888/nbextensions/declarativewidgets/declarativewidgets.jar
In [7]:
import rx.lang.scala.{Observer, Subscription, Observable}
import rx.lang.scala.subjects._
import shapeless.HNil
import org.tmoerman.plongeur.tda._
import org.tmoerman.plongeur.tda.Model._
import org.tmoerman.plongeur.tda.cluster.Clustering._
import org.tmoerman.plongeur.tda.cluster.Scale._
import declarativewidgets._
initWidgets
import declarativewidgets.WidgetChannels.channel
In [8]:
import java.util.concurrent.atomic.AtomicReference
// Holds a reference to an Rx Subscription; swapping in a new subscription
// unsubscribes the previous one, so re-evaluating a cell does not leak.
case class SubRef(ref: AtomicReference[Option[Subscription]] = new AtomicReference[Option[Subscription]](None)) extends Serializable {

  def update(sub: Subscription): Unit = ref.getAndSet(Option(sub)).foreach(_.unsubscribe())

  def reset(): Unit = ref.getAndSet(None).foreach(_.unsubscribe())
}
In [9]:
%%html
<link rel='import' href='urth_components/paper-slider/paper-slider.html'
      is='urth-core-import' package='PolymerElements/paper-slider'>
<link rel='import' href='urth_components/paper-button/paper-button.html'
      is='urth-core-import' package='PolymerElements/paper-button'>
<link rel='import' href='urth_components/plongeur-graph/plongeur-graph.html'
      is='urth-core-import' package='tmoerman/plongeur-graph'>
<link rel='import' href='urth_components/urth-viz-scatter/urth-viz-scatter.html' is='urth-core-import'>
Out[9]:
We keep references to Rx subscriptions in SubRef holders, apart from the streams themselves, so that re-evaluating a cell first unsubscribes the previous subscription.
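A quick illustration of the intended semantics (the interval streams and demoRef below are illustrative only, not part of the pipeline):

import scala.concurrent.duration._

val demoRef = SubRef()
demoRef.update(Observable.interval(1.second).subscribe(i => println(s"tick $i")))
demoRef.update(Observable.interval(1.second).subscribe(i => println(s"tock $i"))) // the "tick" stream is now unsubscribed
demoRef.reset() // unsubscribes the "tock" stream as well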
In [10]:
val in$_subRef = SubRef()
Instantiate a PublishSubject. This stream of TDAParams instances is the input to a TDAMachine. We subscribe to the subject, publishing every new value to the channel "ch_TDA_1" under the "params" key. The SubRef above takes care of unsubscribing the previous subscription when the cell is re-evaluated.
In [11]:
val in$ = PublishSubject[TDAParams]
in$_subRef.update(in$.subscribe(p => channel("ch_TDA_1").set("params", p.toString)))
Create an initial TDAParams instance and, in the same cell, submit it to the PublishSubject.
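Such a cell might look like the following sketch (mirroring the BASE parameters defined further down; initialParams and its values are illustrative):

val initialParams =
  TDAParams(
    lens = TDALens(
      Filter("PCA" :: 0 :: HNil, 10, 0.5)),
    clusteringParams = ClusteringParams(),
    scaleSelection = histogram(10))

in$.onNext(initialParams)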
For the sake of illustration, we then add an HTML snippet that listens to changes on the "ch_TDA_1" channel and displays the value of the "params" key.
In [12]:
%%html
<template is='urth-core-bind' channel='ch_TDA_1'>
  <div style='background: #FFB; padding: 10px;'>
    <span style='font-family: "Courier"'>[[params]]</span>
  </div>
</template>
Out[12]:
Notice that whenever we evaluate a TDAParams instantiation cell, the contents of the yellow box above change.
In [13]:
import org.apache.spark.rdd.RDD
import org.apache.commons.lang.StringUtils.trim
import org.apache.spark.mllib.linalg.Vectors.dense
// Read a CSV of 2D points into an RDD of indexed data points.
def readCircle(file: String) =
  sc.
    textFile(file).
    map(_.split(",").map(trim)).
    zipWithIndex.
    map { case (Array(x, y), idx) => dp(idx, dense(x.toDouble, y.toDouble)) }
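The input CSV is expected to contain one x,y coordinate pair per line, e.g. (illustrative values, not the actual file contents):

0.998, 0.063
0.921, -0.389
-0.707, 0.707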
In [14]:
val data_path = "/Users/tmo/Work/batiskav/projects/plongeur/scala/plongeur-spark/src/test/resources/data/"
val circle_1k_path = data_path + "circle.1k.csv"
val rdd = readCircle(circle_1k_path).cache
val ctx = TDAContext(sc, rdd)
Turn a TDAResult into the nested Map structure that the plongeur-graph widget expects: a collection of nodes and a collection of edges (an example of the resulting shape follows the function).
In [15]:
val r = scala.util.Random

def format(result: TDAResult) = Map(
  "nodes" -> result.clusters.map(c =>
    Map(
      "id"    -> c.id.toString,
      "label" -> c.id.toString,
      "size"  -> c.dataPoints.size,
      "x"     -> r.nextInt(100),   // random initial layout
      "y"     -> r.nextInt(100))), // coordinates for the graph widget
  "edges" -> result.edges.map(e => {
    val (from, to) = e.toArray match { case Array(f, t) => (f, t) }
    Map(
      "id"     -> s"$from--$to",
      "source" -> from.toString,
      "target" -> to.toString) }))
Run the machine, obtaining an Observable of (TDAParams, TDAResult) pairs.
In [16]:
val out$: Observable[(TDAParams, TDAResult)] = TDAMachine.run(ctx, in$)
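Before wiring the output to the widget channel, a temporary tap can confirm that results are flowing (a sketch reusing the SubRef pattern above; remove it once satisfied):

val debug$_subRef = SubRef()
debug$_subRef.update(out$.subscribe(t => println(s"received result for params: ${t._1}")))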
In [17]:
val out$_subRef = SubRef()
In [18]:
out$_subRef.update(
  out$.subscribe(
    onNext  = { case (p, r) => channel("ch_TDA_1").set("result", format(r)) },
    onError = (e) => println(s"Error in TDA machine: $e")))
In [19]:
val pipe$_subRef = SubRef()
val nrBins$ = PublishSubject[Int]
val overlap$ = PublishSubject[Percentage]
In [20]:
channel("ch_TDA_1").watch("nrBins", (_: Any, v: Int) => nrBins$.onNext(v))
channel("ch_TDA_1").watch("overlap", (_: Any, v: Int) => overlap$.onNext(BigDecimal(v) / 100))
In [21]:
import TDAParams._

val BASE =
  TDAParams(
    lens = TDALens(
      Filter("PCA" :: 0 :: HNil, 10, 0.5)),
    clusteringParams = ClusteringParams(),
    scaleSelection = histogram(10))

// Map each slider stream to a TDAParams update function, merge the
// function streams, and fold them over BASE with scan.
val params$ =
  List(
    nrBins$.map(v => setFilterNrBins(0, v)),
    overlap$.map(v => setFilterOverlap(0, v))).
    reduce(_ merge _).
    scan(BASE)((params, fn) => fn(params))

pipe$_subRef.update(params$.subscribe(in$))

// Seed the channel so the sliders start at the BASE values.
channel("ch_TDA_1").set("nrBins", BASE.lens.filters(0).nrBins)
channel("ch_TDA_1").set("overlap", (BASE.lens.filters(0).overlap * 100).toInt)
We create two slider widgets that provide the inputs for the nrBins$ and overlap$ Observables.
In [22]:
%%html
<template is='urth-core-bind' channel='ch_TDA_1'>
  <table style="border-style: hidden;">
    <tr style="border-style: hidden;">
      <th style="border-style: hidden;">nr of bins</th>
      <td style="border-style: hidden;">
        <paper-slider min="0" max="100" step="1" value="{{nrBins}}"></paper-slider>
      </td>
      <td style="border-style: hidden;">[[nrBins]]</td>
    </tr>
    <tr style="border-style: hidden;">
      <th style="border-style: hidden;">overlap</th>
      <td style="border-style: hidden;">
        <paper-slider min="0" max="75" step="1" value="{{overlap}}"></paper-slider>
      </td>
      <td style="border-style: hidden;">[[overlap]]%</td>
    </tr>
  </table>
</template>
Out[22]:
In [23]:
%%html
<template is='urth-core-bind' channel='ch_TDA_1'>
  <plongeur-graph data="{{result}}"></plongeur-graph>
</template>
Out[23]:
In [24]:
val tdaParams =
  TDAParams(
    lens = TDALens(
      Filter("PCA" :: 0 :: HNil, 10, 0.25),
      Filter("PCA" :: 1 :: HNil, 10, 0.25)),
    clusteringParams = ClusteringParams(),
    scaleSelection = histogram(10))

in$.onNext(tdaParams)
In [25]:
ctx.dim
Out[25]:
In [35]:
import org.apache.spark.mllib.feature.{PCA, PCAModel}

val points = ctx.dataPoints
val dims   = ctx.dim

// Fit a PCA model on the feature vectors, project every point, and
// collect the coordinates to the driver for the scatter plot widget.
val pcaModel  = new PCA(dims).fit(points.map(_.features))
val pcaCoords = points.collect.map(dp => pcaModel.transform(dp.features).toArray.toList).toList
In [36]:
channel("data").set("pca", pcaCoords)
In [37]:
%%html
<template is='urth-core-bind' channel='data'>
  <urth-viz-scatter
    datarows='[[pca]]'
    primary='0'
    secondary='1'>
  </urth-viz-scatter>
</template>
Out[37]: