In [1]:
// Read the "linkage" path as text lines through sc
// (presumably the SparkContext supplied by the notebook — confirm).
val rawblocks = sc.textFile("linkage")
// Display the first element as a quick look at the data.
rawblocks.first
Out[1]:
In [2]:
// Keep up to the first 10 elements as a small sample to experiment on.
val head = rawblocks.take(10)
head.length
Out[2]:
In [3]:
// Print each sampled line on its own row.
head.foreach(println)
In [6]:
// Reports whether a line contains the text "id_1", which this notebook
// uses to recognise the header line of the data.
def isHeader(line: String) = line.indexOf("id_1") >= 0
In [7]:
/** Tells whether `line` is a header line.
  *
  * A line counts as a header when it contains the column name "id_1".
  *
  * @param line one raw line of the input
  * @return true if the marker text occurs anywhere in the line
  */
def isHeader(line: String): Boolean = {
  val headerMarker = "id_1"
  line.contains(headerMarker)
}
In [8]:
// Print only the sampled lines that isHeader accepts.
head.filter(isHeader).foreach(println)
In [9]:
// Count the sampled lines that are NOT headers, using filterNot.
head.filterNot(isHeader).length
Out[9]:
In [10]:
// Same count, negating the predicate inside an explicit anonymous function.
head.filter(x => !isHeader(x)).length
Out[10]:
In [11]:
// Same count again, using the underscore placeholder instead of a named parameter.
head.filter(!isHeader(_)).length
Out[11]:
Let's apply this to all of our data now and store it as noheader.
In [12]:
// Drop every line that isHeader accepts from the full dataset.
val noheader = rawblocks.filter(!isHeader(_))
In [13]:
// Display the first remaining line.
noheader.first
Out[13]:
In [14]:
// Keep just the header line(s) found in the 10-line sample.
val header = head.filter(isHeader(_))
In [17]:
header
Out[17]:
In [19]:
// Use the sampled line at index 5 as a worked example and split it on commas.
val line = head(5)
val pieces = line.split(',')
pieces
Out[19]:
In [20]:
// Fields 0 and 1 are parsed as integer ids; field 11 is parsed as a boolean.
val id1 = pieces(0).toInt
val id2 = pieces(1).toInt
val matched = pieces(11).toBoolean
In [21]:
// Fields 2 through 10 (the slice end index 11 is exclusive) are kept as raw score strings.
val rawscores = pieces.slice(2,11)
// Plain String.toDouble cannot parse a "?" field, which is why the
// toDouble helper is introduced further down in this notebook.
rawscores.map(s => s.toDouble)
Out[21]:
Oops, there's a '?' we need to handle
In [23]:
// Converts one raw score field to a Double.
// The "?" placeholder becomes Double.NaN; any other text is parsed with
// String.toDouble, which throws if the text is not a number.
def toDouble(s: String) = s match {
  case "?"   => Double.NaN
  case other => other.toDouble
}
// Convert the raw score strings with the "?"-aware toDouble helper and display the result.
val scores = rawscores.map(toDouble)
scores
Out[23]:
Now combine all the parsing of a line into one function
In [24]:
// Parses one comma-separated line into a 4-tuple:
//   (first id, second id, score array, match flag).
// Field 0 and field 1 are read as Ints, fields 2 through 10 are converted
// with toDouble, and field 11 is read as a Boolean.
def parse(line: String) = {
  val fields = line.split(',')
  (
    fields(0).toInt,
    fields(1).toInt,
    fields.slice(2, 11).map(raw => toDouble(raw)),
    fields(11).toBoolean
  )
}
// Try parse on the example line and display the resulting tuple.
val tup = parse(line)
tup
Out[24]:
We can access tuple elements by position, using ._1 for the first element (a 1-based accessor) or .productElement(0) for the same element (a 0-based index).
In [25]:
// First element of tup via the numbered accessor.
tup._1
Out[25]:
In [26]:
// First element of tup via positional lookup at index 0.
tup.productElement(0)
Out[26]:
productArity will get the size of a tuple
In [27]:
// Number of elements held by tup.
tup.productArity
Out[27]:
In [28]:
/** One parsed record of the linkage data.
  *
  * @param id1     first id, read from field 0 of a line
  * @param id2     second id, read from field 1 of a line
  * @param scores  score values, read from fields 2 through 10
  * @param matched boolean flag, read from field 11
  */
case class MatchData(
  id1: Int,
  id2: Int,
  scores: Array[Double],
  matched: Boolean
)
In [29]:
// Parses one comma-separated line into a MatchData record.
// Field 0 and field 1 become the two ids, fields 2 through 10 are converted
// with toDouble into the score array, and field 11 becomes the matched flag.
def parse(line: String) = {
  val fields = line.split(',')
  MatchData(
    id1 = fields(0).toInt,
    id2 = fields(1).toInt,
    scores = fields.slice(2, 11).map(raw => toDouble(raw)),
    matched = fields(11).toBoolean
  )
}
// Re-parse the example line; parse now yields a MatchData, so the
// following cells read its fields by name.
val tup = parse(line)
tup
Out[29]:
In [30]:
tup.matched
Out[30]:
In [31]:
tup.id1
Out[31]:
In [32]:
tup.scores
Out[32]:
In [33]:
// Parse every non-header line of the 10-line sample.
val mds = head.filter(x => !isHeader(x)).map(x => parse(x))
mds
Out[33]:
In [35]:
// Apply the same parsing to the full header-free dataset and display the first record.
val parsed = noheader.map(line => parse(line))
parsed.first
Out[35]:
To avoid re-running the parsing functions on the data with each action on parsed, we can cache the data in its parsed form on the cluster.
In [36]:
// Ask for parsed to be cached so later operations can reuse the parsed records.
parsed.cache()
Out[36]:
In [38]:
// Group the parsed sample by its matched flag and print the size of each group.
val grouped = mds.groupBy(md => md.matched)
grouped.mapValues(x => x.size).foreach(println)
In [ ]:
// Tally over the full parsed dataset: how often each matched value occurs.
val matchCounts = parsed.map(md => md.matched).countByValue()
matchCounts is a Scala Map, which does not have methods for sorting its contents by key or value. Therefore, I will convert the Map into a Scala Seq, which does provide support for sorting.
In [ ]:
// Turn the counts into a Seq of (value, count) pairs so they can be sorted.
val matchCountsSeq = matchCounts.toSeq