SparkPi

We now turn to doing some lightweight Spark stuff in Scala.

This is the same SparkPi program available in the Scala section of the Spark examples.

It computes an approximation to $\pi$ by estimating the area of the unit circle (which is $A = \pi r^2 = \pi$). To do so, the square $[-1, 1] \times [-1, 1]$ enclosing the circle is repeatedly sampled (by taking points from it uniformly at random) and the points falling within the unit circle are counted; the fraction of hits yields an approximation to the desired area. See the Wikipedia page for further explanation.
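
Concretely, if points $(x, y)$ are drawn uniformly from the square $[-1, 1] \times [-1, 1]$ (of area $4$), the probability of a point landing inside the unit circle is the ratio of the two areas:

$$P(x^2 + y^2 < 1) = \frac{\pi}{4} \qquad\Longrightarrow\qquad \pi \approx 4 \cdot \frac{\text{count}}{n}$$

which is exactly the estimator printed at the end of the program.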

As a complete program

We could execute the complete SparkPi Scala snippet. Let's take a look at the original source:

import scala.math.random
import org.apache.spark._

/** Computes an approximation to pi */
object SparkPi {
  def main(args: Array[String]) {
    val conf = new SparkConf().setAppName("Spark Pi")
    val spark = new SparkContext(conf)
    val slices = if (args.length > 0) args(0).toInt else 2
    val n = 100000 * slices
    val count = spark.parallelize(1 to n, slices).map { i =>
      val x = random * 2 - 1
      val y = random * 2 - 1
      if (x*x + y*y < 1) 1 else 0
    }.reduce(_ + _)
    println("Pi is roughly " + 4.0 * count / n)
    spark.stop()
  }
}

This can be run with a small modification: the Scala kernel used in this notebook (Toree) already provides a SparkContext ready for us to use, in the sc variable. So we do not need to create a new context by executing

val spark = new SparkContext(conf)

... and indeed we should not do so: that would try to create a second context, which will fail unless the kernel is running with the configuration property spark.driver.allowMultipleContexts set to true.
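
As a quick sanity check, we can inspect the context the kernel has already created. A minimal sketch (the exact values depend on how the kernel was configured):

// Inspect the SparkContext provided in `sc` (output depends on the kernel configuration)
println(sc.appName)             // application name registered by the kernel
println(sc.master)              // cluster manager, e.g. local[*]
println(sc.defaultParallelism)  // default number of partitions

Newer Spark versions also offer SparkContext.getOrCreate, which returns an existing context instead of failing.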

So let's use that already-available context, and redefine the program:


In [ ]:
import scala.math.random
import org.apache.spark._

/** Computes an approximation to pi */
object SparkPi {
  def main(args: Array[String]) {
    val conf = sc.getConf /* <--- we use the context the kernel has already created */
    conf.setAppName( "Spark Pi" )
    val slices = if (args.length > 0) args(0).toInt else 2
    val n = 100000 * slices
    val count = sc.parallelize(1 to n, slices).map { i =>
      val x = random * 2 - 1
      val y = random * 2 - 1
      if (x*x + y*y < 1) 1 else 0
    }.reduce(_ + _)
    println("Pi is roughly " + 4.0 * count / n)
  }
}

And now we execute the defined program by calling it with the required arguments:


In [ ]:
SparkPi.main( Array("2") )
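
We could also call it with a larger number of slices, which draws more samples and usually tightens the approximation. For example (the argument is just the slice count as a string):

SparkPi.main( Array("8") )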

As a notebook

Now we do the same, but in a more notebook-friendly form, splitting the program into cells to be executed sequentially:


In [ ]:
// Imports we need
import scala.math.random
import org.apache.spark._

In [ ]:
// Set our application name
val conf = sc.getConf
conf.setAppName( "Spark Pi" )

In [ ]:
// Define how many partitions (slices) we want to split the work into
val slices = 2
// We will sample 100K points per slice
val n = 100000 * slices

In [ ]:
// The core computation: count how many random points fall inside the unit circle
val count = sc.parallelize(1 to n, slices).map { i =>
      val x = random * 2 - 1
      val y = random * 2 - 1
      if (x*x + y*y < 1) 1 else 0
    }.reduce(_ + _)

In [ ]:
// The result
println("Pi is roughly " + 4.0 * count / n)

Python

Side note: the SPylon kernel can also execute Python/PySpark code.


In [ ]:
%%python
from operator import add
print(sc.parallelize(range(1, 100)).reduce(add))