Plongeur

A topological data analysis library.

Core algorithm written in Scala, using Apache Spark.

Executed in a Jupyter notebook, using the Apache Toree kernel and declarative widgets.

Graphs rendered with Sigma/Linkurious, wrapped in a Polymer component.

Reactive machinery powered by RxScala.

Maven dependencies


In [1]:
%AddDeps org.apache.spark spark-mllib_2.10 1.6.2 --repository file:/Users/tmo/.m2/repository
%AddDeps org.scalanlp breeze_2.10 0.11.2 --transitive
%AddDeps org.scalanlp breeze-natives_2.10 0.11.2
%AddDeps org.scalanlp breeze-macros_2.10 0.11.2
%AddDeps com.github.karlhigley spark-neighbors_2.10 0.3.6-FORK --repository file:/Users/tmo/.m2/repository
%AddDeps io.reactivex rxscala_2.10 0.26.1 --transitive --repository file:/Users/tmo/.m2/repository 
%AddDeps com.softwaremill.quicklens quicklens_2.10 1.4.4 --repository file:/Users/tmo/.m2/repository
%AddDeps org.tmoerman plongeur-spark_2.10 0.3.48 --repository file:/Users/tmo/.m2/repository



In [2]:
%addjar http://localhost:8888/nbextensions/declarativewidgets/declarativewidgets.jar



Import classes


In [3]:
import org.apache.commons.lang.StringUtils.trim
import org.apache.spark.SparkContext
import org.apache.spark.mllib.linalg.Vectors.dense
import org.apache.spark.rdd.RDD
import org.joda.time.DateTime
import org.tmoerman.plongeur.tda.Distances._
import org.tmoerman.plongeur.tda.Model._
import org.tmoerman.plongeur.tda.LSH.LSHParams
import org.tmoerman.plongeur.tda.knn.SampledKNN.SampledKNNParams
import org.tmoerman.plongeur.tda.knn._
import org.tmoerman.plongeur.util.RDDFunctions._
import org.tmoerman.plongeur.util.TimeUtils.time
import org.tmoerman.plongeur.tda.geometry.Laplacian._
import breeze.stats.distributions._
import org.apache.spark.mllib.linalg.SparseMatrix

In [4]:
import declarativewidgets._
initWidgets

import declarativewidgets.WidgetChannels.channel


Read the Iris data set


In [5]:
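def read(file: String)(implicit sc: SparkContext): RDD[DataPoint] = {

  // Split each line on commas, trim the fields, then parse the four numeric
  // features and the species label with plongeur's parseCsv.
  val irisParsed =
    sc
      .textFile(file)
      .map(_.split(",").map(trim))
      .parseCsv{ case Array(a, b, c, d, e) => (a.toDouble, b.toDouble, c.toDouble, d.toDouble, e) }

  // Take the parsed tuples (second component of parseCsv's result), number
  // them from 0 and wrap each one in a DataPoint, storing the species label
  // under the "cat" metadata key.
  val irisDataPointsRDD =
    irisParsed
      ._2
      .zipWithIndex
      .map{ case ((a, b, c, d, cat), idx) => DataPoint(idx.toInt, dense(Array(a, b, c, d)), Some(Map("cat" -> cat))) }

  irisDataPointsRDD
}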
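Without a Spark session at hand, the parsing logic of `read` can be sketched with plain Scala collections. This is a minimal illustration, not plongeur's code: `Point`, `IrisParsing`, `parseRow` and `toPoints` are hypothetical names. The notebook relies on plongeur's `parseCsv`, whose handling of malformed rows is not shown here; the sketch simply drops any row it cannot parse, such as a header line.

```scala
// Hypothetical stand-in for plongeur's DataPoint: an index, a feature
// vector and optional metadata. Not the library's actual class.
case class Point(index: Int, features: Vector[Double], meta: Option[Map[String, String]])

object IrisParsing {

  // Parses one CSV row such as "5.1,3.5,1.4,0.2,setosa".
  // Rows without exactly five fields, or whose first four fields are not
  // numbers, yield None instead of throwing.
  def parseRow(line: String): Option[(Vector[Double], String)] =
    line.split(",").map(_.trim) match {
      case Array(a, b, c, d, cat) =>
        try Some((Vector(a.toDouble, b.toDouble, c.toDouble, d.toDouble), cat))
        catch { case _: NumberFormatException => None }
      case _ => None
    }

  // Keeps the well-formed rows and numbers them consecutively from 0,
  // mirroring the zipWithIndex step of the notebook's read function.
  def toPoints(lines: Seq[String]): Seq[Point] =
    lines
      .flatMap(line => parseRow(line))
      .zipWithIndex
      .map { case ((features, cat), idx) => Point(idx, features, Some(Map("cat" -> cat))) }
}
```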

In [6]:
val wd = "../src/test/resources/data/"

val irisFile = wd + "iris.csv"

In [7]:
val irisRDD = read(irisFile)(sc)

In [8]:
irisRDD.take(3).mkString("\n")


Out[8]:
DataPoint(0,[5.1,3.5,1.4,0.2],Some(Map(cat -> setosa)))
DataPoint(1,[4.9,3.0,1.4,0.2],Some(Map(cat -> setosa)))
DataPoint(2,[4.7,3.2,1.3,0.2],Some(Map(cat -> setosa)))

Compute the exact pairwise distance and affinity matrices


In [9]:
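// A (row index, column index, value) entry of a matrix in COO format.
type Triplet = (Index, Index, Distance)

// Assemble an N x N MLlib SparseMatrix from COO triplets.
def toSparseMatrix(N: Int, triplets: Iterable[Triplet]) = SparseMatrix.fromCOO(N, N, triplets)

// Turn every distance into a similarity with the Gaussian kernel gaussianSimilarity.
def toAffinities(distancesRDD: RDD[Triplet], sigma: Double): RDD[Triplet] =
  distancesRDD
    .map{ case (p, q, d) => (p, q, gaussianSimilarity(d, sigma)) }
    .cache

// Exact all-pairs distances: the cartesian product of the RDD with itself,
// minus the diagonal (p == q), i.e. N * (N - 1) triplets.
def distances(rdd: RDD[DataPoint])(implicit distance: DistanceFunction): RDD[Triplet] =
  (rdd cartesian rdd)
    .filter{ case (p, q) => p.index != q.index }
    .map{ case (p, q) => (p.index, q.index, distance(p, q)) }
    .cache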
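The distance and affinity helpers above can be mirrored on a local collection to make their output concrete. In this sketch `PairwiseAffinities` and its methods are hypothetical; Euclidean distance stands in for the notebook's `LpNormDistance`, and the kernel exp(-d^2 / (2 sigma^2)) is an assumed, common form of Gaussian similarity that plongeur's `gaussianSimilarity` may normalise differently. For N points the all-pairs step yields N(N - 1) triplets, which is 22,350 for a 150-row Iris file.

```scala
object PairwiseAffinities {

  // (row index, column index, value), as in the notebook's Triplet type.
  type Triplet = (Int, Int, Double)

  // Euclidean distance, used only to keep the sketch self-contained.
  def euclidean(x: Vector[Double], y: Vector[Double]): Double =
    math.sqrt(x.zip(y).map { case (a, b) => (a - b) * (a - b) }.sum)

  // All ordered pairs (p, q) with p != q, like `rdd cartesian rdd`
  // followed by the index filter: N * (N - 1) triplets for N points.
  def distances(points: IndexedSeq[Vector[Double]],
                dist: (Vector[Double], Vector[Double]) => Double): Seq[Triplet] =
    for {
      p <- points.indices
      q <- points.indices
      if p != q
    } yield (p, q, dist(points(p), points(q)))

  // Assumed Gaussian kernel exp(-d^2 / (2 * sigma^2)): 1 at d = 0,
  // decaying towards 0 as d grows.
  def gaussian(d: Double, sigma: Double): Double =
    math.exp(-(d * d) / (2 * sigma * sigma))

  // Replace each distance by its Gaussian affinity, keeping the indices.
  def affinities(triplets: Seq[Triplet], sigma: Double): Seq[Triplet] =
    triplets.map { case (p, q, d) => (p, q, gaussian(d, sigma)) }
}
```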

In [10]:
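// LpNormDistance(0.5): the Lp distance with p = 0.5. For p < 1 it does not
// satisfy the triangle inequality, so it is a dissimilarity, not a metric.
val dist = LpNormDistance(0.5)

// Bandwidth of the Gaussian kernel that turns distances into affinities.
val SIGMA = 1.0

val ctx = TDAContext(sc, irisRDD)

val distancesRDD = distances(irisRDD)(dist)

val affinitiesRDD = toAffinities(distancesRDD, SIGMA)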
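`LpNormDistance(0.5)` presumably computes the usual Lp distance (sum over i of |x_i - y_i|^p)^(1/p) with p = 0.5. Assuming that definition, the sketch below (the `LpDistance` object is hypothetical) shows why values for p < 1 are dissimilarities rather than metric distances: the points (0, 0) and (1, 1) end up at distance 4, while the detour through (1, 0) costs only 1 + 1 = 2, violating the triangle inequality.

```scala
object LpDistance {

  // Minkowski-style distance (sum_i |x_i - y_i|^p)^(1/p).
  // A metric for p >= 1; for 0 < p < 1 the triangle inequality can fail.
  def lp(p: Double)(x: Vector[Double], y: Vector[Double]): Double =
    math.pow(x.zip(y).map { case (a, b) => math.pow(math.abs(a - b), p) }.sum, 1.0 / p)
}
```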

Compute the Laplacian


In [11]:
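// Collect the affinity triplets to the driver and build the N x N sparse
// affinity matrix (fine for the small Iris data set, not for large inputs).
val A = toSparseMatrix(ctx.N, affinitiesRDD.collect)

// Number of Laplacian eigenvectors to compute.
val E = 3

val params = new LaplacianParams(nrEigenVectors = E)

val laplacian = fromAffinities(ctx, A, params)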
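The notebook does not show how `fromAffinities` builds its Laplacian (normalised or not) or in which order it returns eigenvectors. As an assumption-labelled illustration, the sketch below builds the unnormalised Laplacian L = D - A from the same kind of COO triplets that are passed to `SparseMatrix.fromCOO`, using dense arrays and hypothetical helper names (`GraphLaplacian`, `denseFromCOO`, `unnormalised`, `multiply`). It checks the property that usually motivates skipping the first eigenvector in a spectral embedding: L maps the constant vector, and the indicator vector of any connected component, to zero.

```scala
object GraphLaplacian {

  // (row, column, value) entry in coordinate (COO) format.
  type Triplet = (Int, Int, Double)

  // Dense n x n matrix from COO triplets; absent entries are 0 and
  // duplicate (row, column) entries are summed.
  def denseFromCOO(n: Int, triplets: Iterable[Triplet]): Array[Array[Double]] = {
    val a = Array.fill(n, n)(0.0)
    triplets.foreach { case (i, j, v) => a(i)(j) += v }
    a
  }

  // Unnormalised graph Laplacian L = D - A, where D is the diagonal matrix
  // of row sums (vertex degrees) of the affinity matrix A.
  def unnormalised(a: Array[Array[Double]]): Array[Array[Double]] = {
    val n = a.length
    val degree = a.map(_.sum)
    Array.tabulate(n, n)((i, j) => (if (i == j) degree(i) else 0.0) - a(i)(j))
  }

  // Matrix-vector product m * v.
  def multiply(m: Array[Array[Double]], v: Array[Double]): Array[Double] =
    m.map(row => row.zip(v).map { case (x, y) => x * y }.sum)
}
```

With two disconnected pairs of points, for example, both the all-ones vector and the indicator of the first pair lie in the kernel of L, so the zero eigenvalue appears twice.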

Output


In [12]:
%%html
<link rel='import' href='urth_components/urth-viz-scatter/urth-viz-scatter.html' is='urth-core-import'>


Out[12]:

Plotting the first two features of the Iris data set


In [13]:
%%html
<template is='urth-core-bind' channel='data'>
    <urth-viz-scatter
        datarows='[[raw]]'
        columns='["x", "y", "cat"]'
        palette='["green", "red"]'
        primary='0'
        secondary='1'
        staggerLabels='true'
        multi-select='true'></urth-viz-scatter>
</template>


Out[13]:

In [14]:
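// One [x, y, category] row per data point: the first two features plus an
// integer code for the species label in the "cat" metadata entry.
val rawData = irisRDD.
    map(dp => {
        val x = dp.features(0)
        val y = dp.features(1)
        // dp.meta is an Option: .get unwraps it, then ("cat") looks up the label.
        val c = dp.meta.get("cat") match {
          case "versicolor" => 0
          case "virginica" => 1
          case "setosa" => 2
        }
        List(x, y, c)}).collect.toList

// Publish the rows on the "data" channel, where the scatter plot binds to "raw".
channel("data").set("raw", rawData)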
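In the cell above, `dp.meta.get("cat")` unwraps the metadata `Option` with `.get` and then looks up `"cat"`, so a point without metadata (or without a `"cat"` key) throws `NoSuchElementException`, and an unexpected label throws a `MatchError`. A more defensive lookup is sketched below. `CategoryCodes` is a hypothetical helper, and the metadata is assumed to be `Option[Map[String, String]]`, which may be narrower than plongeur's actual type.

```scala
object CategoryCodes {

  // Integer codes matching the notebook's match expression.
  val codes: Map[String, Int] = Map("versicolor" -> 0, "virginica" -> 1, "setosa" -> 2)

  // Reads the "cat" entry from optional metadata. Returns None when the
  // metadata is missing, has no "cat" key, or holds an unknown label,
  // instead of throwing like Option.get or a non-exhaustive match.
  def code(meta: Option[Map[String, String]]): Option[Int] =
    meta.flatMap(_.get("cat")).flatMap(label => codes.get(label))
}
```

With such a helper, points with an unknown or missing label can be dropped (for example with `flatMap` over the resulting `Option`) instead of failing the whole Spark job.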

Plotting the second and third eigenvectors of the graph Laplacian (columns 1 and 2; column 0 is skipped)


In [15]:
%%html
<template is='urth-core-bind' channel='data'>
    <urth-viz-scatter
        datarows='[[lap]]'
        columns='["x", "y"]'
        primary='0'
        secondary='1'
        staggerLabels='true'
        multi-select='true'></urth-viz-scatter>
</template>


Out[15]:

In [19]:
// Keep entries 1 and 2 of each row of laplacian.rows (entry 0 is not plotted).
val laplacianData =
  laplacian.rows.map(v => List(v(1), v(2))).collect.toList

channel("data").set("lap", laplacianData)
