Plongeur is a topological data analysis library.
The core algorithm is written in Scala, using Apache Spark.
It is executed in a Jupyter notebook, using the Apache Toree kernel and declarative widgets.
Graphs are rendered with Sigma/Linkurious, wrapped in a Polymer component.
In [1]:
%AddDeps org.apache.spark spark-mllib_2.10 1.6.2 --repository file:/Users/tmo/.m2/repository
%AddDeps org.scalanlp breeze_2.10 0.11.2 --transitive
%AddDeps org.scalanlp breeze-natives_2.10 0.11.2
%AddDeps org.scalanlp breeze-macros_2.10 0.11.2
%AddDeps com.github.haifengl smile-core 1.2.0 --transitive
%AddDeps com.github.karlhigley spark-neighbors_2.10 0.3.6-FORK --repository file:/Users/tmo/.m2/repository
%AddDeps io.reactivex rxscala_2.10 0.26.1 --transitive --repository file:/Users/tmo/.m2/repository
%AddDeps com.softwaremill.quicklens quicklens_2.10 1.4.4 --repository file:/Users/tmo/.m2/repository
%AddDeps org.tmoerman plongeur-spark_2.10 0.3.51 --repository file:/Users/tmo/.m2/repository
In [2]:
%addjar http://localhost:8888/nbextensions/declarativewidgets/declarativewidgets.jar
In [3]:
import rx.lang.scala.{Observer, Subscription, Observable}
import rx.lang.scala.subjects.PublishSubject
import rx.lang.scala.subjects._
import org.apache.commons.lang.StringUtils.trim
import org.apache.spark.SparkContext
import org.apache.spark.mllib.linalg.Vectors.dense
import org.apache.spark.rdd.RDD
import org.joda.time.DateTime
import org.tmoerman.plongeur.tda.TDAMachine
import org.tmoerman.plongeur.tda.Distances._
import org.tmoerman.plongeur.tda.Model._
import org.tmoerman.plongeur.tda.Filters._
import org.tmoerman.plongeur.tda.cluster.Clustering._
import org.tmoerman.plongeur.tda.cluster.Scale._
import org.tmoerman.plongeur.tda.Colour._
import org.tmoerman.plongeur.tda.Brewer
import org.tmoerman.plongeur.tda.LSH.LSHParams
import org.tmoerman.plongeur.tda.Model.{DataPoint, TDAContext, dp}
import org.tmoerman.plongeur.tda.knn.FastKNN.FastKNNParams
import org.tmoerman.plongeur.tda.knn.SampledKNN.SampledKNNParams
import org.tmoerman.plongeur.tda.knn.{FastKNN, SampledKNN, _}
import org.tmoerman.plongeur.util.RDDFunctions._
import org.tmoerman.plongeur.util.TimeUtils.time
import org.tmoerman.plongeur.tda.geometry.Laplacian._
import breeze.stats.distributions._
import org.apache.spark.mllib.linalg.SparseMatrix
In [4]:
import declarativewidgets._
initWidgets
import declarativewidgets.WidgetChannels.channel
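Channels are the bridge between the Scala kernel and the Polymer templates further down: any value set on a channel key becomes available to urth-core-bind templates bound to that channel. A minimal sketch (the channel and key names here are hypothetical, chosen only for illustration):

// Hypothetical channel/key; a template bound to channel 'demo' could render this value with [[greeting]].
channel("demo").set("greeting", "hello from Toree")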
In [5]:
import java.util.concurrent.atomic.AtomicReference
// Holds at most one live Rx Subscription; installing a new one unsubscribes the previous one,
// so re-evaluating a cell does not leave orphaned subscriptions behind.
case class SubRef(ref: AtomicReference[Option[Subscription]] = new AtomicReference[Option[Subscription]](None)) extends Serializable {
  def update(sub: Subscription): Unit = ref.getAndSet(Option(sub)).foreach(old => old.unsubscribe())
  def reset(): Unit = update(null)
}
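As a quick sketch of the intended behaviour (the names and the interval stream below are hypothetical, not part of the notebook): each update replaces and unsubscribes the previous subscription, and reset clears it.

import scala.concurrent.duration._

val demoRef = SubRef()
demoRef.update(Observable.interval(1.second).subscribe(i => println(s"tick $i")))
demoRef.update(Observable.interval(1.second).subscribe(i => println(s"tock $i"))) // the "tick" subscription is unsubscribed
demoRef.reset()                                                                   // no live subscription remains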
In [6]:
%%html
<link rel='import' href='urth_components/paper-slider/paper-slider.html'
is='urth-core-import' package='PolymerElements/paper-slider'>
<link rel='import' href='urth_components/paper-button/paper-button.html'
is='urth-core-import' package='PolymerElements/paper-button'>
<link rel='import' href='urth_components/plongeur-graph/plongeur-graph.html'
is='urth-core-import' package='tmoerman/plongeur-graph'>
<link rel='import' href='urth_components/urth-viz-scatter/urth-viz-scatter.html' is='urth-core-import'>
Out[6]:
Keep a reference to each Rx subscription in its own SubRef, so that re-evaluating a cell unsubscribes the previous subscription before installing the new one.
In [7]:
val in$_subRef = SubRef()
Instantiate a PublishSubject. This stream of TDAParams instances is the input of a TDAMachine. We subscribe to it and publish every emitted instance to the "ch_TDA_1" channel under the "params" key.
TODO: unsubscribe previous on re-evaluation
In [8]:
val in$ = PublishSubject[TDAParams]
in$_subRef.update(in$.subscribe(p => channel("ch_TDA_1").set("params", p.toString)))
Create an initial TDAParams instance and, in the same cell, submit it to the PublishSubject (see the cell further down).
For the sake of illustration, we first create an HTML snippet that listens to changes on the "ch_TDA_1" channel and displays the value of the "params" key.
In [9]:
%%html
<template is='urth-core-bind' channel='ch_TDA_1'>
<div style='background: #FFB; padding: 10px;'>
<span style='font-family: "Courier"'>[[params]]</span>
</div>
</template>
Out[9]:
Notice that whenever we evaluate a cell that pushes a TDAParams instance onto the PublishSubject, the contents of the yellow box above change accordingly.
In [10]:
import org.apache.spark.rdd.RDD
import org.apache.commons.lang.StringUtils.trim
import org.apache.spark.mllib.linalg.Vectors.dense
// Read a CSV file in which the first column is a category label and the
// remaining columns are numeric features; wrap every row in a DataPoint.
def readMixture(file: String): RDD[DataPoint] = {
  sc
    .textFile(file)
    .zipWithIndex
    .map{ case (line, idx) =>
      val columns  = line.split(",").map(trim)
      val category = columns.head
      val features = columns.tail.map(_.toDouble)

      dp(idx, dense(features), Map("cat" -> category)) }
}
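To make the mapping concrete (the row below is hypothetical; the actual contents of the mixture file are not shown here), a line such as "a, 1.52, -0.33" at index 0 becomes:

// Hypothetical example of the DataPoint produced for that row:
dp(0L, dense(Array(1.52, -0.33)), Map("cat" -> "a"))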
In [11]:
val data_path = "/Users/tmo/Work/batiskav/projects/plongeur/scala/plongeur-spark/src/test/resources/data/"
val mixture_path = data_path + "mixture.1000.2.csv"
val rdd = readMixture(mixture_path).cache
val ctx = TDAContext(sc, rdd)
In [12]:
rdd.count
Out[12]:
Turn a TDAResult into the nodes/edges data structure that the plongeur-graph component below can render.
In [13]:
val r = scala.util.Random

// Convert a TDAResult into the nodes/edges map that is set on the channel
// and bound to the plongeur-graph element. Node positions are initialised randomly.
def format(result: TDAResult) = Map(
  "nodes" -> result.clusters.map(c =>
    Map(
      "id"    -> c.id.toString,
      "size"  -> c.dataPoints.size,
      "color" -> c.colours.headOption.getOrElse("#000000"),
      "x"     -> r.nextInt(100),
      "y"     -> r.nextInt(100))),
  "edges" -> result.edges.map(e => {
    val (from, to) = e.toArray match { case Array(f, t) => (f, t) }
    Map(
      "id"     -> s"$from--$to",
      "source" -> from.toString,
      "target" -> to.toString)}))
Run the machine, obtaining an Observable of TDAResult instances.
In [14]:
val out$: Observable[TDAResult] = TDAMachine.run(ctx, in$)
In [15]:
val out$_subRef = SubRef()
In [16]:
out$_subRef.update(
  out$.subscribe(
    onNext  = (r) => channel("ch_TDA_1").set("result", format(r)),
    onError = (e) => println(s"Error in TDA machine: $e")))
In [17]:
val pipe$_subRef = SubRef()
In [18]:
import org.tmoerman.plongeur.ui.Controls._
kernel.magics.html(controlsCSS)
Out[18]:
In [40]:
import TDAParams._

// A density filter and filters over the first two features.
val den = Filter(Density(sigma = 1.0), 30, 0.30)
val pc0 = Filter(Feature(0), 20, 0.3)
val pc1 = Filter(Feature(1), 20, 0.3)

// Colourings: by the dominant "cat" category per cluster, and by average density filter value.
val selector = (d: DataPoint) => d.meta.get("cat")
val maxFq = ClusterMaxFrequency(Array("#F00", "#00F", "#999"), selector)
val avgFilterValue = AverageFilterValue(Brewer.palettes("PuOr")(9), den)

// Baseline parameters: a 2-D lens over the two feature filters, no colouring yet.
val BASE =
  TDAParams(
    lens = TDALens(pc0, pc1),
    clusteringParams = ClusteringParams(),
    scaleSelection = firstGap(5),
    collapseDuplicateClusters = false,
    colouring = Nop())

in$.onNext(BASE)
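The two colourings defined above (maxFq and avgFilterValue) are not used by BASE. As a sketch (assuming TDAParams is a case class, so copy is available; otherwise rebuild the instance with the new colouring), either one can be activated by pushing an updated instance onto in$, which feeds new parameters to the TDA machine:

// Sketch: colour clusters by their dominant "cat" value ...
in$.onNext(BASE.copy(colouring = maxFq))
// ... or by their average density filter value.
in$.onNext(BASE.copy(colouring = avgFilterValue))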
In [34]:
%%html
<template is='urth-core-bind' channel='ch_TDA_1'>
<plongeur-graph height="600" data="{{result}}"></plongeur-graph>
</template>
Out[34]:
In [36]:
val (sub, html) = BASE.makeControls(channel("ch_TDA_1"), in$)
pipe$_subRef.update(sub)
kernel.magics.html(html)
Out[36]:
In [27]:
val rawData = rdd.map(dp => dp.features.toArray.toList).collect.toList
rawData.take(1)
Out[27]:
In [28]:
channel("data").set("raw", rawData)
In [29]:
%%html
<template is='urth-core-bind' channel='data'>
<urth-viz-scatter
    datarows='[[raw]]'
    primary='0'
    secondary='1'></urth-viz-scatter>
</template>
Out[29]:
In [ ]: