Example 2: A fast parallel pivot, or preparing for time series analysis


In [1]:
from pyspark import SparkConf, SparkContext
from collections import OrderedDict

In [2]:
partitions = 18
parcsv = sc.textFile("/lustre/janus_scratch/dami9546/lustre_timeseries.csv", partitions)
parcsv.take(5)


Out[2]:
[u'oss07;lustre.scratch.ost.obdfilter.OST0017.cache_access;0.00000;float;pages/s;1398382546',
 u'oss07;lustre.scratch.ost.obdfilter.OST0015.disconnect;0.00000;float;requests/s;1398382546',
 u'oss07;cpu_intr;0.0;float;%;1398382546',
 u'oss07;lustre.scratch.ost.obdfilter.hosttotal.cache_access;0.00000;float;pages/s;1398382546',
 u'oss07;lustre.scratch.ost.obdfilter.OST0025.connect;0.00000;float;requests/s;1398382546']

Each of these lines contains six semicolon-delimited columns: hostname, metric name, reported value, type, units, and Unix epoch time. Can we assume they all do? The example data is an excerpt of one day of Lustre data, but we have hundreds of full days, which may contain dropped writes and malformed lines. I'll apply a filter to keep only the lines with exactly six columns.

Sometimes it isn't evident that a filter is needed until a subsequent RDD action fails.


In [3]:
filtered = parcsv.filter(lambda line: len(line.split(';')) == 6)
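
As a quick sanity check (just a sketch; count() is an action and scans the whole file), we can also count how many lines the filter would drop:


In [ ]:
# Count the malformed lines, i.e. those without exactly six fields.
malformed = parcsv.filter(lambda line: len(line.split(';')) != 6)
malformed.count()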

As seen above, the lines are Unicode strings, but the upcoming transformations need the timestamp and value cast to appropriate types. We'll write a function that takes each line as an argument and returns a 4-tuple, organized to make indexing intuitive. Let's pick the following ordering: (timestamp, host, metric, value). We don't need the other columns, so they are discarded.

Since the values in the third column are currently Unicode strings, a try-except block attempts to cast them to floats. If the cast fails, we set the value to zero rather than NaN, since NaNs won't work with the forthcoming eigendecomposition.

An alternative to the try-except would be to filter out lines whose third column can't be cast to a float; a sketch of that alternative appears after the map step below. I haven't compared the performance of the two approaches.


In [4]:
def cast(line):
    # Split once and reuse the fields: (timestamp, host, metric, value).
    fields = line.split(';')
    try:
        val = float(fields[2])
    except ValueError:
        val = 0.0
    return (int(fields[5]), fields[0], fields[1], val)
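
As a quick check (a sketch), applying cast to the first sample line from Out[2] should return a tuple ordered as (timestamp, host, metric, value); the expected result in the comment is inferred from that line.


In [ ]:
sample = u'oss07;lustre.scratch.ost.obdfilter.OST0017.cache_access;0.00000;float;pages/s;1398382546'
cast(sample)
# expected: (1398382546, u'oss07',
#            u'lustre.scratch.ost.obdfilter.OST0017.cache_access', 0.0)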

In [5]:
parsed = filtered.map(cast)
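
For comparison, the filter-based alternative mentioned above might look something like the sketch below (untested; is_numeric is a helper introduced here, not part of the pipeline).


In [ ]:
# Untested sketch of the alternative: drop lines whose value field cannot be
# parsed as a float, so no try-except is needed in the cast step.
def is_numeric(s):
    try:
        float(s)
        return True
    except ValueError:
        return False

numeric_only = filtered.filter(lambda line: is_numeric(line.split(';')[2]))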

Metrics aren't reported continuously, nor are the monitoring systems flawless. We need to assemble the unique set of metrics to serve as the pivot's columns, and those columns must keep a consistent order so the covariance structure (for PCA) isn't distorted.

PySpark's .distinct() method accomplishes this; we also issue a .collect() to bring the RDD's values back to the driver as a list.


In [ ]:
columns = parsed.map(lambda x: x[2]).distinct().collect()
basedict = dict((metric, 0.0) for metric in columns)
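
A quick driver-side check (nothing more than a sketch) shows how many metric columns the pivoted rows will have:


In [ ]:
# Number of distinct metrics, i.e. the width of each pivoted row.
len(columns)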

Now we create an ordered dictionary to preserve the metric (and consequently, column) ordering. Without the OrderedDict, the key ordering could vary from row to row, which would render the eigendecomposition of the covariance matrix meaningless.

The object is broadcast to all executors to be used in a future mapped function.


In [6]:
ordered = sc.broadcast(OrderedDict(sorted(basedict.items(), key=lambda y: y[0])))
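
To confirm the ordering on the driver, we can peek at the first few entries of the broadcast dictionary (a quick sketch); every metric starts at the 0.0 default.


In [ ]:
# The broadcast value is also available on the driver; the keys come back in
# sorted order, each paired with the 0.0 placeholder.
list(ordered.value.items())[:3]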

The two functions below are adapted from user patricksurry's answer to this Stack Overflow question: http://stackoverflow.com/questions/30260015/reshaping-pivoting-data-in-spark-rdd-and-or-spark-dataframes. Beware, patricksurry's answer is predominantly serial!


In [ ]:
def combine(u1, u2):
    # Merge two partial dictionaries built for the same (timestamp, host) key.
    u1.update(u2)
    return u1

def sequential(u, v):
    # Fold one parsed row v = (timestamp, host, metric, value) into the
    # accumulator dictionary u, creating it on first use.
    if not u:
        u = {}
    u[v[2]] = v[3]
    return u

We need to perform an aggregation by key. aggregateByKey takes a zero value and two functions: a sequential op and a combine op. The sequential op folds each row's (metric, value) pair into a per-key dictionary, and the combine op merges the partial dictionaries built for the same (timestamp, host) key.


In [ ]:
aggregated = parsed.keyBy(lambda row: (row[0], row[1])).aggregateByKey(
    None, sequential, combine)
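
To see how the two functions cooperate, here is a toy, driver-side illustration (not a Spark job) of two rows that share the key (1398382546, 'oss07'); the load_one metric, its value, and the second partial dictionary are made up for the example.


In [ ]:
# Toy illustration: fold two rows with the same (timestamp, host) key into one
# dictionary, then merge with a partial dictionary from another partition.
row_a = (1398382546, u'oss07', u'cpu_intr', 0.0)
row_b = (1398382546, u'oss07', u'load_one', 1.5)     # hypothetical metric/value
acc = sequential(None, row_a)    # {u'cpu_intr': 0.0}
acc = sequential(acc, row_b)     # {u'cpu_intr': 0.0, u'load_one': 1.5}
combine(acc, {u'cpu_user': 2.0})                     # hypothetical partial dict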

Now we need to impose the structure of our OrderedDict on each aggregated (key, value) pair. We create a function that copies our canonical dictionary (of ordered keys and 0.0 default values) and updates the copy with the dictionaries created in the aggregateByKey step.


In [ ]:
def mergedicts(new):
    # new is a ((timestamp, host), {metric: value}) pair from aggregateByKey.
    tmp = ordered.value.copy()
    tmp.update(new[1])
    return new[0], tmp

In [8]:
pivoted = aggregated.map(mergedicts)

Let's take a look at the results.


In [56]:
final_ordered = pivoted.takeOrdered(10, key=lambda x: x[0])

In [57]:
final_ordered[0][0]


Out[57]:
(1398382545, u'mds01')

To sort the entire RDD, we use a sortByKey.


In [9]:
final_sorted = pivoted.sortByKey(keyfunc=lambda k: k[0])

In [10]:
final_dict = final_sorted.map(lambda row: row[1].values())

Writing the lists to disk takes quite a long time; the serialization of each row is not optimized. An exercise for the reader!


In [11]:
final_dict.saveAsTextFile("/lustre/janus_scratch/dami9546/pivoted.txt")
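
One direction worth trying, as a rough and untested sketch, is to serialize each row into a plain comma-separated string before saving (the pivoted_csv output path below is made up); the downstream parsing would then only need a split and a cast.


In [ ]:
# Untested sketch: write flat CSV lines instead of stringified Python lists.
as_csv = final_dict.map(lambda vals: ','.join(str(v) for v in vals))
as_csv.saveAsTextFile("/lustre/janus_scratch/dami9546/pivoted_csv")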

Now on to Scala Spark for time series PCA

Now exit the pyspark shell, and run spark-shell with the following options.


In [ ]:
spark-shell --master $MASTER --driver-memory 12g

In [ ]:
import org.apache.spark.mllib.linalg.Matrix
import org.apache.spark.mllib.linalg.distributed.RowMatrix
import org.apache.spark.mllib.linalg.{Vector, Vectors}

val datafilePattern = "/lustre/janus_scratch/dami9546/pivoted.txt"
val lustreData = sc.textFile(datafilePattern).cache()

// Each line is a stringified Python list such as "[0.0, 1.5, ...]": strip the
// surrounding brackets, split on commas, and parse each value as a Double.
val vecData = lustreData.map(line =>
  Vectors.dense(line.stripPrefix("[").stripSuffix("]")
    .split(",").map(_.trim.toDouble)))
val rmat: RowMatrix = new RowMatrix(vecData)
val pc: Matrix = rmat.computePrincipalComponents(15)