In [ ]:
from pyspark import SparkConf, SparkContext
import re
This dataset is a debug dump from a Lustre filesystem. Such dumps are typically triggered by code bugs (LBUG), heavy load, hardware problems, or misbehaving user application I/O.
Let's analyze some of the log structure to determine what may have caused this debug dump.
In [ ]:
sc
In [ ]:
partitions = 18
parlog = sc.textFile("/lustre/janus_scratch/dami9546/lustre_debug.out", partitions)
Let's take a look at the first five lines of the debug log. Each line is colon-delimited, and its fields (numbered from 0) roughly correspond to the following information:
0-1 describe subsystem ID
2
3 timestamp
4-6 PIDs
7 relevant code module
8 code line
9 function and message
In [ ]:
parlog.take(5)
Now let's split each line of the RDD into lowercase "words".
Lambda functions are ubiquitous in Spark, I presume due to the functional programming underpinnings of Scala. Here the lambda is applied to every line of the RDD, and the partitions are processed in parallel.
In [ ]:
words = parlog.map(lambda line: re.split(r'\W+', line.lower().strip()))
Notice that this map returns immediately: no action has been taken yet. The transformation has only been recorded in the DAG (Directed Acyclic Graph), and it runs when an action asks for results. I like to think of this as analogous to a page fault, but applied to a DAG.
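The deferred behavior can be imitated in plain Python with a generator. This is only an analogy under my own assumptions, not how Spark is implemented: the `lines` list and the `tokenize` helper are hypothetical stand-ins for the log and the lambda.

```python
# Plain-Python analogy (not Spark): a generator records work without doing it.
calls = []  # every line that tokenize() has actually processed


def tokenize(line):
    """Hypothetical stand-in for the lambda passed to map()."""
    calls.append(line)
    return line.lower().split()


lines = ["Alpha Beta", "Gamma Delta"]  # hypothetical stand-in for the log

# Building the generator is like calling map(): nothing is tokenized yet.
lazy_words = (tokenize(line) for line in lines)
calls_before = len(calls)

# Pulling one result is like calling an action such as take(1).
first = next(lazy_words)
calls_after = len(calls)

print(calls_before, calls_after, first)
```

Only the work needed to produce the requested result runs, which is roughly why `take(2)` on a large file can come back quickly.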
In [ ]:
words.take(2)
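To see what the split pattern does to a single line, here is a minimal sketch on a made-up string (the `sample` text is hypothetical and not taken from the log). It shows that `\W+` treats every run of non-word characters as a separator, that underscores count as word characters, and that punctuation at the end of a line leaves an empty string behind.

```python
import re

sample = "Mod_A: open(file) failed!"  # hypothetical text, not from the log

# Same expression as the lambda body: lowercase, strip whitespace, split.
tokens = re.split(r'\W+', sample.lower().strip())

# strip() only removes whitespace, so the trailing '!' yields an empty token.
non_empty = [t for t in tokens if t]

print(tokens)
print(non_empty)
```

Empty tokens like this are harmless for the membership filter, and a length filter removes them.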
My experience with Lustre affords me (some) insight into this: I know the system has been susceptible to MDS overloading due to applications creating tons of small files or issuing lots of MDS RPCs. I want to look for all lines that contain mfd changes.
Let's apply a filter to this RDD to create a new RDD that only contains lines with mfd changes.
In [ ]:
# Test each token separately: `'mfd' and 'change' in x` would only check for 'change'.
mfds = words.filter(lambda x: 'mfd' in x and 'change' in x)
Did it work?
In [ ]:
mfds.take(2)
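A caution about predicates that combine several membership tests: in Python each term needs its own `in`, because `and` joins two complete expressions. The sketch below uses a hypothetical token list that contains 'change' but not 'mfd' to show the difference.

```python
tokens = ["ldlm", "lock", "change"]  # hypothetical token list without 'mfd'

# 'mfd' is a non-empty string and therefore truthy, so this expression
# reduces to ('change' in tokens) and never looks for 'mfd'.
loose = 'mfd' and 'change' in tokens

# Testing each term separately requires both tokens to be present.
strict = 'mfd' in tokens and 'change' in tokens

# Equivalent form that scales to more search terms.
strict_all = all(term in tokens for term in ('mfd', 'change'))

print(loose, strict, strict_all)
```

Note that `in` on a list matches whole tokens only, so a token such as 'mfd_change' would not satisfy either test.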
Now we issue an action on the RDD, which triggers the transformations that have so far only been recorded in the DAG. In this case we count the number of lines in the mfds RDD.
In [ ]:
mfds.count()
And as a percent of the overall file?
In [ ]:
'{0:0.2f}%'.format((mfds.count()/float(parlog.count()))*100)
Now let's look at the effect of flatMap: it behaves like map, but instead of producing one list per line, it flattens the per-line lists so that the resulting RDD contains the individual words.
In [ ]:
flatwords = parlog.flatMap(lambda line: re.split(r'\W+', line.lower().strip()))
Now keep only the "words" longer than 2 characters.
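The difference between the two shapes can be sketched in plain Python, with a list comprehension standing in for map and `itertools.chain.from_iterable` standing in for the flattening step. The two input lines are hypothetical, and this is an illustration of the idea rather than Spark code.

```python
import re
from itertools import chain

lines = ["a b", "c d e"]  # hypothetical input lines


def tokenize(line):
    """Same expression as the lambda body used with map and flatMap."""
    return re.split(r'\W+', line.lower().strip())


# map-like: one list of tokens per input line (a list of lists).
mapped = [tokenize(line) for line in lines]

# flatMap-like: the per-line lists are concatenated into one flat sequence.
flat = list(chain.from_iterable(mapped))

print(mapped)
print(flat)
```

The flat form is what makes per-word operations such as counting straightforward, since each element is now a single word.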
In [ ]:
longwords = flatwords.filter(lambda x: len(x) > 2 )
In [ ]:
longwords.take(10)
To sort words by number of occurrences, we first map each word to a tuple of itself and 1. We will then perform a reduction on these tuples to get counts.
In [ ]:
longwords = longwords.map(lambda word: (word, 1))
We use reduceByKey: this operation merges the values of identical keys using the function we supply. The key is the first element of each tuple. Since that is the word, this is the behavior we want.
Note that the function passed to reduceByKey must be associative and commutative, because partial results are combined within and across partitions in no guaranteed order.
In [ ]:
longcount = longwords.reduceByKey(lambda a, b: a + b)
In [ ]:
longcount.take(10)
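As a rough single-machine sketch of what a per-key reduction does, the helper below groups values by key and folds them with the supplied function. `reduce_by_key` is my own hypothetical helper, not Spark's implementation, and the pairs are made up. The second half imitates two partitions being reduced separately and then merged, which is why the function should give the same answer regardless of grouping.

```python
from collections import defaultdict
from functools import reduce

pairs = [("lock", 1), ("mds", 1), ("lock", 1), ("lock", 1)]  # hypothetical


def reduce_by_key(func, pairs):
    """Group values by key, then fold each group with func (a sketch only)."""
    grouped = defaultdict(list)
    for key, value in pairs:
        grouped[key].append(value)
    return {key: reduce(func, values) for key, values in grouped.items()}


def add(a, b):
    return a + b


counts = reduce_by_key(add, pairs)

# Imitate two partitions: reduce each half, then reduce the partial results.
left = reduce_by_key(add, pairs[:2])
right = reduce_by_key(add, pairs[2:])
merged = reduce_by_key(add, list(left.items()) + list(right.items()))

print(counts)
print(merged)
```

Addition satisfies both properties, so the merged partial counts match the counts computed in one pass.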
We swap the order of the tuple's contents so that the count becomes the key, which lets us sort by count rather than by word. The argument False passed to sortByKey sets ascending to False, so the sort is descending.
In [ ]:
longwords = longcount.map(lambda x: (x[1], x[0])).sortByKey(False)
In [ ]:
longwords.take(20)
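The swap-then-sort pattern can be sketched in plain Python on a few made-up counts (the `counts` list is hypothetical). The last line shows an alternative that sorts on the count directly through a key function, without swapping.

```python
counts = [("lock", 3), ("mds", 1), ("rpc", 2)]  # hypothetical (word, count) pairs

# Swap to (count, word) so that the count becomes the sort key.
swapped = [(n, word) for (word, n) in counts]

# Descending sort on the count, like sortByKey(False).
by_count = sorted(swapped, key=lambda pair: pair[0], reverse=True)

# Alternative: keep (word, count) and sort on the second element.
top_two = sorted(counts, key=lambda pair: pair[1], reverse=True)[:2]

print(by_count)
print(top_two)
```

PySpark's RDD API also provides `sortBy` and `takeOrdered`, both of which accept a key function, so they may be worth considering when the swap feels awkward.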