In [1]:
from pyspark import SparkConf, SparkContext
from collections import OrderedDict
In [2]:
partitions = 48
parcsv = sc.textFile("/lustre/janus_scratch/dami9546/lustre_timeseries.csv", partitions)
parcsv.take(5)
Out[2]:
In [3]:
filtered = parcsv.filter(lambda line: len(line.split(';')) == 6)  # keep only well-formed records with all 6 fields
In [4]:
def cast(line):
    # Parse a raw record into a (timestamp, host, metric, value) tuple.
    fields = line.split(';')
    try:
        val = float(fields[2])
    except ValueError:
        val = 0.0
    return (int(fields[5]), fields[0], fields[1], val)
In [5]:
parsed = filtered.map(cast)
Metrics aren't reported continuously, nor are the monitoring systems flawless. We need to assemble a unique set (dictionary) of metrics for the pivot, and the metrics must be kept in a consistent order so the time series analysis isn't distorted.
PySpark's ".distinct()" transformation gives us the unique metric names; we chain a ".collect()" to bring the RDD's values back to the driver as a Python list.
In [ ]:
columns = parsed.map(lambda x: x[2]).distinct().collect()
basedict = dict((metric, 0.0) for metric in columns)
Now we create an ordered dictionary to preserve the metric (and consequently, column) ordering. If we did not create this OrderedDict, the keys' ordering could differ from row to row, scrambling the feature columns and rendering downstream ML techniques useless.
The object is broadcast to all executors so it can be used in a mapped function later.
In [6]:
ordered = sc.broadcast(OrderedDict(sorted(basedict.items(), key=lambda y: y[0])))
The two functions below are adapted from user patricksurry's answer to this Stack Overflow question: http://stackoverflow.com/questions/30260015/reshaping-pivoting-data-in-spark-rdd-and-or-spark-dataframes. Beware, patricksurry's answer is predominantly serial!
In [ ]:
def combine(u1, u2):
    # Merge two partial row dictionaries that share the same (timestamp, host) key.
    u1.update(u2)
    return u1

def sequential(u, v):
    # Fold a parsed row (timestamp, host, metric, value) into the running dictionary.
    if not u:
        u = {}
    u[v[2]] = v[3]
    return u
We need to perform an aggregation by key. aggregateByKey takes a zero value and two functions: a sequential (within-partition) op and a combination (cross-partition) op. The sequential op folds each row's (metric, value) pair into a dictionary, and the combine op merges the dictionaries that share an identical (timestamp, host) key.
In [ ]:
aggregated = parsed.keyBy(lambda row: (row[0], row[1])).aggregateByKey(
    None, sequential, combine)
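To see how the two ops compose, here is a purely local, Spark-free illustration. The rows, metric names, and values below are invented for the example; only the sequential and combine functions come from the cell above.
In [ ]:
# Local illustration only -- hypothetical rows sharing one (timestamp, host) key.
rows = [(1428000000, 'node01', 'metric_a', 12.0),
        (1428000000, 'node01', 'metric_b', 3.0)]

partial = None
for row in rows:
    partial = sequential(partial, row)   # builds {'metric_a': 12.0, 'metric_b': 3.0}

# A partial dictionary built on another partition for the same key:
other = {'metric_c': 7.0}
combine(partial, other)                  # all three metrics end up in one dictionary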
Now we need to impose the structure of our OrderedDict on each aggregated (key, value) pair. We create a new function that copies our canonical dictionary (ordered keys, all values 0.0) and updates it with the dictionaries created in the aggregateByKey step.
In [ ]:
def mergedicts(new):
    # Copy the broadcast canonical dictionary and overlay this key's aggregated values.
    tmp = ordered.value.copy()
    tmp.update(new[1])
    return new[0], tmp
In [8]:
pivoted = aggregated.map(mergedicts)
Let's take a look at the results.
In [56]:
final_ordered = pivoted.takeOrdered(10, key=lambda x: x[0])
In [57]:
final_ordered[0][0]
Out[57]:
To sort the entire RDD, we use a sortByKey.
In [9]:
final_sorted = pivoted.sortByKey(keyfunc=lambda k: k[0])
In [10]:
final_dict = final_sorted.map(lambda row: list(row[1].values()))  # one ordered list of metric values per key
Writing the lists to disk takes quite a long time; the write is neither optimized nor done in parallel. Improving it is left as an exercise for the reader!
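One possible direction for that exercise (a sketch only, not code from the original analysis; the output path is hypothetical) is to format each ordered value list into a CSV line on the executors and let Spark write the partitions in parallel with saveAsTextFile:
In [ ]:
# Sketch: format each ordered list of metric values as a CSV line and write
# one part-file per partition in parallel. The output path is hypothetical.
csv_lines = final_dict.map(lambda vals: ','.join(str(v) for v in vals))
csv_lines.saveAsTextFile("/lustre/janus_scratch/dami9546/lustre_pivoted_csv")
Note that saveAsTextFile produces a directory of part-files and does not emit a header row; the sorted metric names from the broadcast OrderedDict would have to be written out separately.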