Plongeur: a topological data analysis (TDA) library.
The core algorithms are written in Scala, on top of Apache Spark.
The notebook runs on the Apache Toree kernel, using Jupyter declarative widgets.
Graphs are rendered with Sigma/Linkurious, wrapped in a Polymer component.
Below, the Iris data set is loaded as an RDD of data points, pairwise distances are turned into Gaussian affinities, and a graph-Laplacian (spectral) embedding is computed and plotted next to the raw features.
In [1]:
%AddDeps org.apache.spark spark-mllib_2.10 1.6.2 --repository file:/Users/tmo/.m2/repository
%AddDeps org.scalanlp breeze_2.10 0.11.2 --transitive
%AddDeps org.scalanlp breeze-natives_2.10 0.11.2
%AddDeps org.scalanlp breeze-macros_2.10 0.11.2
%AddDeps com.github.karlhigley spark-neighbors_2.10 0.3.6-FORK --repository file:/Users/tmo/.m2/repository
%AddDeps io.reactivex rxscala_2.10 0.26.1 --transitive --repository file:/Users/tmo/.m2/repository
%AddDeps com.softwaremill.quicklens quicklens_2.10 1.4.4 --repository file:/Users/tmo/.m2/repository
%AddDeps org.tmoerman plongeur-spark_2.10 0.3.48 --repository file:/Users/tmo/.m2/repository
In [2]:
%addjar http://localhost:8888/nbextensions/declarativewidgets/declarativewidgets.jar
In [3]:
import org.apache.commons.lang.StringUtils.trim
import org.apache.spark.SparkContext
import org.apache.spark.mllib.linalg.Vectors.dense
import org.apache.spark.rdd.RDD
import org.joda.time.DateTime
import org.tmoerman.plongeur.tda.Distances._
import org.tmoerman.plongeur.tda.Model._
import org.tmoerman.plongeur.tda.LSH.LSHParams
import org.tmoerman.plongeur.tda.Model.{DataPoint, TDAContext, dp}
import org.tmoerman.plongeur.tda.knn.FastKNNParams
import org.tmoerman.plongeur.tda.knn.SampledKNN.SampledKNNParams
import org.tmoerman.plongeur.tda.knn.{FastKNN, SampledKNN, _}
import org.tmoerman.plongeur.util.RDDFunctions._
import org.tmoerman.plongeur.util.TimeUtils.time
import org.tmoerman.plongeur.tda.geometry.Laplacian._
import breeze.stats.distributions._
import org.apache.spark.mllib.linalg.SparseMatrix
In [4]:
import declarativewidgets._
initWidgets
import declarativewidgets.WidgetChannels.channel
In [5]:
def read(file: String)(implicit sc: SparkContext): RDD[DataPoint] = {
  // Parse the Iris CSV: four numeric feature columns and a category label per row.
  val irisParsed =
    sc
      .textFile(file)
      .map(_.split(",").map(trim))
      .parseCsv{ case Array(a, b, c, d, e) => (a.toDouble, b.toDouble, c.toDouble, d.toDouble, e) }

  // parseCsv returns a pair whose second component is the RDD of parsed rows;
  // turn each row into a DataPoint with a dense feature vector and the category as meta data.
  val irisDataPointsRDD =
    irisParsed
      ._2
      .zipWithIndex
      .map{ case ((a, b, c, d, cat), idx) =>
        DataPoint(idx.toInt, dense(Array(a, b, c, d)), Some(Map("cat" -> cat))) }

  irisDataPointsRDD
}
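The parser above assumes rows of four numeric feature columns followed by the species label. A hypothetical input line (illustrative only, not copied from the bundled iris.csv) and how the pattern match sees it:

    // Hypothetical example line -- illustrative only:
    val line = "5.1,3.5,1.4,0.2,setosa"
    val Array(a, b, c, d, e) = line.split(",").map(trim)
    // a = "5.1", b = "3.5", c = "1.4", d = "0.2", e = "setosa"; the numeric fields are converted with .toDouble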
In [6]:
val wd = "../src/test/resources/data/"
val irisFile = wd + "iris.csv"
In [7]:
val irisRDD = read(irisFile)(sc)
In [8]:
irisRDD.take(3).mkString("\n")
Out[8]:
In [9]:
type Triplet = (Index, Index, Distance)

// Assemble an N x N sparse matrix from (row, column, value) triplets.
def toSparseMatrix(N: Int, triplets: Iterable[Triplet]) = SparseMatrix.fromCOO(N, N, triplets)

// Turn pairwise distances into Gaussian affinities with bandwidth sigma.
def toAffinities(distancesRDD: RDD[Triplet], sigma: Double): RDD[Triplet] =
  distancesRDD
    .map{ case (p, q, d) => (p, q, gaussianSimilarity(d, sigma)) }
    .cache

// All pairwise distances, excluding self-pairs, via a cartesian product
// (quadratic in N, which is fine for a data set as small as Iris).
def distances(rdd: RDD[DataPoint])(implicit distance: DistanceFunction): RDD[Triplet] =
  (rdd cartesian rdd)
    .filter{ case (p, q) => p.index != q.index }
    .map{ case (p, q) => (p.index, q.index, distance(p, q)) }
    .cache
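For reference, gaussianSimilarity comes from the Laplacian module imported above and is expected to implement the usual heat-kernel affinity. A minimal stand-in sketch, assuming that formula (not the library's actual code):

    // Assumed definition: w(p, q) = exp(-d(p, q)^2 / (2 * sigma^2))
    def gaussianSimilaritySketch(d: Double, sigma: Double): Double =
      math.exp(-(d * d) / (2.0 * sigma * sigma))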
In [10]:
val dist = LpNormDistance(0.5)
val SIGMA = 1.0
val ctx = TDAContext(sc, irisRDD)
val distancesRDD = distances(irisRDD)(dist)
val affinitiesRDD = toAffinities(distancesRDD, SIGMA)
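To make the pipeline concrete: the affinity of a pair of points is just their pairwise distance pushed through the Gaussian kernel. A small illustrative check (the indices 0 and 1 are arbitrary):

    // Illustrative only: look up two data points and compute their affinity by hand.
    val p = irisRDD.filter(_.index == 0).first
    val q = irisRDD.filter(_.index == 1).first
    val d = dist(p, q)                    // L^0.5 "distance" (not a true metric for p < 1)
    val w = gaussianSimilarity(d, SIGMA)  // the value that will end up in the sparse affinity matrix built next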
In [11]:
val A = toSparseMatrix(ctx.N, affinitiesRDD.collect)
val E = 3
val params = new LaplacianParams(nrEigenVectors = E)
val laplacian = fromAffinities(ctx, A, params)
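A quick sanity check, assuming (as the plotting cell further down suggests) that laplacian.rows yields one E-dimensional vector per data point:

    // Illustrative only: the embedding should have one row per data point, with E coordinates each.
    laplacian.rows.take(3).foreach(println)
    assert(laplacian.rows.count == ctx.N)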
In [12]:
%%html
<link rel='import' href='urth_components/urth-viz-scatter/urth-viz-scatter.html' is='urth-core-import'>
Out[12]:
In [13]:
%%html
<template is='urth-core-bind' channel='data'>
  <urth-viz-scatter
    datarows='[[raw]]'
    columns='["x", "y", "cat"]'
    palette='["green", "red"]'
    primary='0'
    secondary='1'
    staggerLabels='true'
    multi-select='true'></urth-viz-scatter>
</template>
Out[13]:
In [14]:
// Project every data point onto its first two features and encode the species as a colour index.
val rawData = irisRDD
  .map{ dp =>
    val x = dp.features(0)
    val y = dp.features(1)
    val c = dp.meta.get("cat") match {
      case "versicolor" => 0
      case "virginica"  => 1
      case "setosa"     => 2
    }
    List(x, y, c)
  }
  .collect
  .toList

// Push the data to the front-end template bound to the 'data' channel.
channel("data").set("raw", rawData)
In [15]:
%%html
<template is='urth-core-bind' channel='data'>
  <urth-viz-scatter
    datarows='[[lap]]'
    columns='["x", "y"]'
    primary='0'
    secondary='1'
    staggerLabels='true'
    multi-select='true'></urth-viz-scatter>
</template>
Out[15]:
In [19]:
// Plot the spectral embedding: use eigenvector components 1 and 2 as (x, y) coordinates
// (component 0 typically corresponds to the trivial constant eigenvector).
val laplacianData =
  laplacian.rows.map(v => List(v(1), v(2))).collect.toList

channel("data").set("lap", laplacianData)