In [1]:
val rawblocks = sc.textFile("linkage")
rawblocks.first


Out[1]:
"id_1","id_2","cmp_fname_c1","cmp_fname_c2","cmp_lname_c1","cmp_lname_c2","cmp_sex","cmp_bd","cmp_bm","cmp_by","cmp_plz","is_match"

In [2]:
val head = rawblocks.take(10)
head.length


Out[2]:
10

In [3]:
head.foreach(println)


"id_1","id_2","cmp_fname_c1","cmp_fname_c2","cmp_lname_c1","cmp_lname_c2","cmp_sex","cmp_bd","cmp_bm","cmp_by","cmp_plz","is_match"
37671,78628,1,?,1,?,1,1,1,1,1,TRUE
57064,74984,1,1,1,?,1,1,1,1,1,TRUE
3149,8328,1,?,1,?,1,1,1,1,1,TRUE
33771,38173,1,?,1,?,1,1,1,1,1,TRUE
17029,37657,1,?,1,?,1,1,1,1,1,TRUE
94492,94494,1,?,1,?,1,1,1,1,1,TRUE
2318,14303,1,?,1,?,1,1,1,1,1,TRUE
2481,10949,1,?,1,?,1,1,1,1,1,TRUE
25247,25711,1,?,1,?,1,1,1,1,1,TRUE

In [6]:
def isHeader(line: String) = line.contains("id_1")

In [7]:
def isHeader(line: String): Boolean = {
    line.contains("id_1")
}

In [8]:
head.filter(isHeader).foreach(println)


"id_1","id_2","cmp_fname_c1","cmp_fname_c2","cmp_lname_c1","cmp_lname_c2","cmp_sex","cmp_bd","cmp_bm","cmp_by","cmp_plz","is_match"

In [9]:
head.filterNot(isHeader).length


Out[9]:
9

In [10]:
head.filter(x => !isHeader(x)).length


Out[10]:
9

In [11]:
head.filter(!isHeader(_)).length


Out[11]:
9

Let's now apply this filter to all of our data and store the result as noheader.


In [12]:
val noheader = rawblocks.filter(!isHeader(_))

In [13]:
noheader.first


Out[13]:
37671,78628,1,?,1,?,1,1,1,1,1,TRUE

In [14]:
val header = head.filter(isHeader(_))

In [17]:
header


Out[17]:
Array("id_1","id_2","cmp_fname_c1","cmp_fname_c2","cmp_lname_c1","cmp_lname_c2","cmp_sex","cmp_bd","cmp_bm","cmp_by","cmp_plz","is_match")

In [19]:
val line = head(5)
val pieces = line.split(',')
pieces


Out[19]:
Array(17029, 37657, 1, ?, 1, ?, 1, 1, 1, 1, 1, TRUE)

In [20]:
val id1 = pieces(0).toInt
val id2 = pieces(1).toInt
val matched = pieces(11).toBoolean

In [21]:
val rawscores = pieces.slice(2,11)
rawscores.map(s => s.toDouble)


Out[21]:
Name: java.lang.NumberFormatException
Message: For input string: "?"
StackTrace: sun.misc.FloatingDecimal.readJavaFormatString(FloatingDecimal.java:1250)
java.lang.Double.parseDouble(Double.java:540)
scala.collection.immutable.StringLike$class.toDouble(StringLike.scala:232)
scala.collection.immutable.StringOps.toDouble(StringOps.scala:31)
...

Oops, the '?' entries mark missing values, which toDouble can't parse, so we need a conversion function that handles them.


In [23]:
def toDouble(s: String) = {
    if ("?".equals(s)) Double.NaN else s.toDouble
}
val scores = rawscores.map(toDouble)
scores


Out[23]:
Array(1.0, NaN, 1.0, NaN, 1.0, 1.0, 1.0, 1.0, 1.0)

Now let's combine all of the line-parsing logic into a single function.


In [24]:
def parse(line: String) = {
    val pieces = line.split(',')
    val id1 = pieces(0).toInt
    val id2 = pieces(1).toInt
    // Fields 2 through 10 hold the nine comparison scores; '?' becomes NaN.
    val scores = pieces.slice(2,11).map(toDouble)
    val matched = pieces(11).toBoolean
    (id1,id2,scores,matched)
}
val tup = parse(line)
tup


Out[24]:
(17029,37657,Array(1.0, NaN, 1.0, NaN, 1.0, 1.0, 1.0, 1.0, 1.0),true)

We can select a tuple element by position: ._1 returns the first element, and .productElement(0) does the same (note that the _N accessors are 1-based, while productElement is 0-based).


In [25]:
tup._1


Out[25]:
17029

In [26]:
tup.productElement(0)


Out[26]:
17029

productArity returns the number of elements in a tuple.


In [27]:
tup.productArity


Out[27]:
4
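
Relatedly, the Product API also provides productIterator, which walks every element of the tuple in order. A quick sketch, not part of the original session:


In [ ]:
// productIterator yields each tuple element as Any, in positional order.
tup.productIterator.foreach(println)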

In [28]:
case class MatchData(id1: Int, id2: Int, scores: Array[Double], matched: Boolean)

In [29]:
def parse(line: String) = {
    val pieces = line.split(',')
    val id1 = pieces(0).toInt
    val id2 = pieces(1).toInt
    val scores = pieces.slice(2,11).map(toDouble)
    val matched = pieces(11).toBoolean
    MatchData(id1,id2,scores,matched)
}
val tup = parse(line)
tup


Out[29]:
MatchData(17029,37657,[D@135d559c,true)

In [30]:
tup.matched


Out[30]:
true

In [31]:
tup.id1


Out[31]:
17029

In [32]:
tup.scores


Out[32]:
Array(1.0, NaN, 1.0, NaN, 1.0, 1.0, 1.0, 1.0, 1.0)

In [33]:
val mds = head.filter(x => !isHeader(x)).map(x => parse(x))
mds


Out[33]:
Array(MatchData(37671,78628,[D@770def59,true), MatchData(57064,74984,[D@33dcf91f,true), MatchData(3149,8328,[D@70677cd2,true), MatchData(33771,38173,[D@1a33f8a5,true), MatchData(17029,37657,[D@4a092613,true), MatchData(94492,94494,[D@1ea2cf5e,true), MatchData(2318,14303,[D@52dc3408,true), MatchData(2481,10949,[D@76dc23b7,true), MatchData(25247,25711,[D@6ead0057,true))

In [35]:
val parsed = noheader.map(line => parse(line))
parsed.first


Out[35]:
MatchData(37671,78628,[D@5c2644d1,true)

To avoid reparsing the raw data every time we invoke an action on parsed, we can cache the parsed records in memory across the cluster.


In [36]:
parsed.cache()


Out[36]:
MapPartitionsRDD[4] at map at <console>:42
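
As an aside, cache() is shorthand for persist(StorageLevel.MEMORY_ONLY). If the parsed records were too large to fit in memory, we could instead have picked a storage level that spills partitions to local disk rather than recomputing them; this is an illustrative sketch, not executed here:


In [ ]:
import org.apache.spark.storage.StorageLevel
// Keep as many partitions in memory as will fit; spill the rest to disk.
parsed.persist(StorageLevel.MEMORY_AND_DISK)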

In [38]:
val grouped = mds.groupBy(md => md.matched)
grouped.mapValues(x => x.size).foreach(println)


(true,9)

In [ ]:
val matchCounts = parsed.map(md => md.matched).countByValue()

countByValue runs the count in a distributed fashion and returns the results to the driver as a Map. Scala's Map class does not have methods for sorting its contents by key or value, so we convert the Map to a Scala Seq, which does support sorting.


In [ ]:
val matchCountsSeq = matchCounts.toSeq
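

matchCountsSeq is a sequence of (Boolean, Long) pairs, so we can sort it by either field. A sketch of the sorting this enables, assuming the standard Seq API:


In [ ]:
// Sort by count in descending order to put the most common value first.
matchCountsSeq.sortBy(_._2).reverse.foreach(println)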