In [1]:
// Load Spark into the notebook kernel's classpath. All three modules are
// pinned to the same version (2.0.2) so they stay mutually compatible.
// `%%` picks the artifact built for the kernel's Scala binary version.
classpath.add(
"org.apache.spark" %% "spark-core" % "2.0.2",
"org.apache.spark" %% "spark-sql" % "2.0.2",
"org.apache.spark" %% "spark-mllib" % "2.0.2"
);
In [2]:
import org.apache.spark.sql.{SparkSession, DataFrame, Dataset}
In [3]:
// Local Spark session: "local[*]" runs Spark in-process with one worker
// thread per available core; getOrCreate reuses an active session if present.
val spark: SparkSession = SparkSession
  .builder()
  .master("local[*]")
  .getOrCreate()
In [4]:
import spark.implicits._
In [20]:
// Tiny Dataset[Double]s holding 1.0 up to n.0 (n = 1..5), used to compare
// median results on odd (ds1, ds3, ds5) and even (ds2, ds4) row counts.
// The single column of each is named "value".
val ds1 = spark.createDataset(1 to 1).map(_.toDouble)
val ds2 = spark.createDataset(1 to 2).map(_.toDouble)
val ds3 = spark.createDataset(1 to 3).map(_.toDouble)
val ds4 = spark.createDataset(1 to 4).map(_.toDouble)
val ds5 = spark.createDataset(1 to 5).map(_.toDouble)
In [14]:
// approxQuantile with relativeError = 0.1 on ds5 (1.0..5.0); one probability
// is requested, so the returned array has exactly one element.
val median = ds5.stat
  .approxQuantile("value", Array(0.5), relativeError = 0.1)
  .head
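This result is surprising. My understanding is that `relativeError = 0` should make `approxQuantile` compute the exact median, so this needs further investigation. Note also that `approxQuantile` returns a value taken from the data itself and never averages the two middle values. For an even number of rows, it therefore cannot return the conventional median.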
In [15]:
// Same query on ds5 with relativeError = 0.0, the setting documented to
// compute the exact quantile.
val median = ds5.stat
  .approxQuantile("value", Array(0.5), relativeError = 0.0)
  .head
In [21]:
// approxQuantile with relativeError = 0.1 on ds4 (1.0..4.0), an even row count.
val median = ds4.stat
  .approxQuantile("value", Array(0.5), relativeError = 0.1)
  .head
In [22]:
// Same query on ds4 with relativeError = 0.0 (exact quantile).
val median = ds4.stat
  .approxQuantile("value", Array(0.5), relativeError = 0.0)
  .head
In [16]:
// approxQuantile with relativeError = 0.1 on ds1, which holds the single value 1.0.
val median = ds1.stat
  .approxQuantile("value", Array(0.5), relativeError = 0.1)
  .head
In [17]:
// Same query on ds1 with relativeError = 0.0 (exact quantile).
val median = ds1.stat
  .approxQuantile("value", Array(0.5), relativeError = 0.0)
  .head
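The exact-median function below is not efficient, because it sorts the entire dataset. However, it returns the correct result, including averaging the two middle values when the row count is even.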
In [28]:
import org.apache.spark.sql.Dataset
In [42]:
/** Computes the exact median of `column` by fully sorting the dataset.
  *
  * Unlike `approxQuantile`, an even number of rows yields the mean of the
  * two middle values. The full sort (a shuffle) makes this much more
  * expensive than the approximate version on large data.
  *
  * NOTE(review): a null in `column` would presumably fail when decoded as
  * Double; confirm inputs are non-null.
  *
  * @param ds     dataset containing the column to summarise
  * @param column column to take the median of; defaults to "value", the
  *               column name of a plain `Dataset[Double]`
  * @return the median of `column`
  * @throws IllegalArgumentException if `ds` has no rows
  */
def median(ds: Dataset[Double], column: String = "value"): Double = {
  val count = ds.count()
  // Without this guard an empty input indexes position -1 and fails with an
  // opaque ArrayIndexOutOfBoundsException.
  require(count > 0, "Cannot compute the median of an empty dataset")

  // Zero-based positions of the middle element(s) in sorted order.
  val upper = count / 2
  val isEven = count % 2 == 0
  val wanted: Set[Long] = if (isEven) Set(upper - 1, upper) else Set(upper)

  // Pair each sorted value with its position, then fetch every needed
  // position in a single job. The previous version ran one filter/collect
  // per position, recomputing the sort each time.
  val middle: Map[Long, Double] = ds
    .orderBy(column)
    .select(column)
    .as[Double]
    .rdd
    .zipWithIndex()
    .filter { case (_, idx) => wanted.contains(idx) }
    .map(_.swap)
    .collect()
    .toMap

  if (isEven) (middle(upper - 1) + middle(upper)) / 2
  else middle(upper)
}
In [38]:
median(ds5, column = "value") // 1.0..5.0, odd count: expected 3.0
In [39]:
median(ds4, column = "value") // 1.0..4.0, even count: expected (2.0 + 3.0) / 2 = 2.5
In [40]:
median(ds1, column = "value") // single row 1.0: expected 1.0
In [41]:
median(ds2, column = "value") // 1.0 and 2.0, even count: expected 1.5