Apache Spark development basics

Check that Spark is running


In [1]:
sc.version


Out[1]:
2.1.0
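
sc is the SparkContext that the shell (or notebook kernel) creates at startup. A couple of its other fields are handy for a quick sanity check, for example:

{
sc.master  // the cluster manager URL, e.g. "local[*]" on a single machine
sc.appName // the application name registered with the cluster manager
}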

Word Count

The word count program is considered the "hello world" of Big Data analytics: given a text file, count how many times each word occurs in it. An example follows.

Input file:

foo bar bar 
baz bar
foo baz bar

Result:

(foo,2)
(bar,4)
(baz,2)

Task 1: given an input file data/la_divina_commedia.txt, count how many times each word occurs in it.


In [19]:
{
sc.textFile("data/la_divina_commedia.txt")
  .flatMap(_.split(" ")) // transformation
  .map((_,1)) // transformation
  .reduceByKey(_+_) // transformation
  .saveAsTextFile("data/commedia_counts.txt") // action (triggers the computation); writes a directory of part files
}
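
Instead of writing the result to disk, the counts can be brought back to the driver with an action. The following sketch (assuming the same input file) prints the ten most frequent words:

{
sc.textFile("data/la_divina_commedia.txt")
  .flatMap(_.split(" ")) // transformation
  .map((_,1)) // transformation
  .reduceByKey(_+_) // transformation
  .sortBy { case (_, count) => -count } // transformation: decreasing count
  .take(10) // action: ship only the top 10 pairs to the driver
  .foreach(println)
}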

Monte Carlo $\pi$ estimation

Large dataset analysis is the main use case of Spark. However, Spark can also be used for compute-intensive tasks. Monte Carlo $\pi$ estimation is a good example problem.

Monte Carlo method

Idea: when a circle is inscribed in a square and darts are thrown at the square uniformly at random, the ratio $\frac{A_{circle}}{A_{square}}$ is roughly equal to the fraction of darts that fall in the circle.

Algorithm

  1. Throw $N$ uniformly distributed darts in the square
  2. Count how many darts fall in the circle
  3. $\pi$ is roughly $4\frac{count}{N}$, since
\begin{equation*} \frac{count}{N} \simeq \frac{A_{circle}}{A_{square}} = \frac{\pi r^2}{(2r)^2} = \frac{\pi r^2}{4r^2} = \frac{\pi}{4} \end{equation*}

In [18]:
{
val n = 10000000
val count = sc.parallelize(1 to n)
  .map { _ =>
      val x = math.random
      val y = math.random
      if(x*x + y*y < 1) 1 else 0
  }.reduce(_+_)
val pi = 4.0 * count / n
println(pi)
}


3.141292
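
Note that math.random is not seeded, so every run produces a slightly different estimate. A possible variant (a sketch, not part of the original notebook) seeds one generator per partition via mapPartitionsWithIndex, making runs reproducible:

{
val n = 10000000
val count = sc.parallelize(1 to n)
  .mapPartitionsWithIndex { (idx, iter) =>
      val rng = new scala.util.Random(idx) // one seeded generator per partition
      iter.map { _ =>
          val x = rng.nextDouble
          val y = rng.nextDouble
          if(x*x + y*y < 1) 1 else 0
      }
  }.reduce(_+_)
println(4.0 * count / n)
}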

K-Nearest Neighbour Classifier

When Spark was first implemented, the motivation for a new framework was the lack of dataset caching in MapReduce (and Hadoop), which penalizes applications that need to access a hot dataset iteratively. Building a K-Nearest Neighbour (KNN) classifier is a nice example of this class of problems.
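
As a minimal illustration of the point (a sketch with arbitrary numbers, not from the original notebook): an RDD that is reused across iterations is rebuilt from its lineage at every action unless it is cached.

{
val data = sc.parallelize(1 to 1000000).map(_.toDouble).cache // kept in memory after the first action
for (i <- 1 to 10) {
    val s = data.sum // after the first iteration, this reads the cached copy instead of re-running parallelize + map
    println(s"iteration $i, sum $s")
}
}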

Supervised classification: given a dataset of labelled examples $(x_i, l_i)$, $i=1 \dots N$, where $x_i$ is a vector in a fixed-dimensional space and $l_i$ is a class label, we aim to build a model that classifies new unlabelled examples $(x_i, ?)$, $i=N+1 \dots M$.

KNN, idea: for each new example $(x_i, ?)$, $i=N+1 \dots M$, compute the (Euclidean) distance from each known example $(x_j, l_j)$, $j=1 \dots N$, and label $(x_i, ?)$ with the most frequent class among its K nearest neighbours.

1-NN classifier in Spark


In [17]:
{

// Generate some examples 
val examples = sc.parallelize(1 to 300).map { _ =>
    val xi = Array(math.random, math.random)
    val label = if(xi(1) > xi(0)) {
        "yellow"
    } else {
        "purple"
    }
    (xi,label)
}

// Hold out 20% of the examples for testing
val split = examples.randomSplit(Array(0.8,0.2))
val dataset = split(0).cache // cache in memory
val testset = split(1).collect // assume the test set is small enough to fit in the driver

// Make some prediction with 1NN
val predictions = testset.map { case(x,_) => // unseen
    dataset.map { case(z, label) => // known label
        val d0 = z(0) - x(0)
        val d1 = z(1) - x(1)
        val dist = math.sqrt(d0*d0 + d1*d1)
        (dist,label)
    }
    .sortBy { case (dist, _) => dist } // increasing distance
    .first
    ._2 // return the label
}

// Evaluate our predictions
val correct = testset.zip(predictions).count { 
    case((_,label),prediction) =>
        label == prediction
}
val correctFrac = correct.toDouble / testset.length
println(s"fraction of correct predictions $correctFrac")

}


fraction of correct predictions 0.9295774647887324
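
The code above is the special case K = 1. A general K (a sketch; k = 5 is an arbitrary choice, not from the original notebook) replaces the sort with takeOrdered and takes a majority vote among the K nearest labels:

{
val k = 5
val predictionsKnn = testset.map { case(x,_) =>
    dataset.map { case(z, label) =>
        val d0 = z(0) - x(0)
        val d1 = z(1) - x(1)
        (math.sqrt(d0*d0 + d1*d1), label)
    }
    .takeOrdered(k)(Ordering.by { case (dist, _) => dist }) // K nearest neighbours
    .groupBy { case (_, label) => label } // group the K labels
    .maxBy { case (_, votes) => votes.length } // majority vote
    ._1 // return the winning label
}
}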

Questions

Please open an issue here: https://goo.gl/dOy089.