In [223]:
from pyspark import SparkConf, SparkContext
from collections import OrderedDict
from functools import partial
import re

from numpy import array
from math import sqrt
import numpy as np

In [224]:
# Number of partitions for the RDD read; tune to cluster core count.
partitions = 64
# `sc` is the SparkContext injected by the PySpark notebook kernel.
# NOTE(review): hard-coded absolute HDFS path — confirm it still exists before re-running.
parlog = sc.textFile("/user/milroy/lustre_debug.out", partitions)

In [225]:
# Peek at the first 10 raw lines to inspect the Lustre debug-log format.
parlog.take(10)


Out[225]:
[u'00010000:00080000:2.1F:1433384402.983324:0:0:0:(ldlm_lib.c:2008:target_recovery_expired()) scratch-MDT0000: recovery timed out; 2 clients are still in recovery after 300s (136 clients connected)',
 u'00000100:00080000:22.0:1433439189.202419:0:28364:0:(service.c:789:ptlrpc_update_export_timer()) updating export e0b948f3-2c66-de79-82f6-858c54bcf73f at 1433439189 exp ffff8805feae1000',
 u'00000100:00080000:22.0:1433439189.205134:0:28364:0:(service.c:789:ptlrpc_update_export_timer()) updating export 1915dbbb-3d37-10c8-c161-d533b76bcbcb at 1433439189 exp ffff8805cf6dec00',
 u'00000100:00080000:22.0:1433439189.208987:0:28364:0:(service.c:789:ptlrpc_update_export_timer()) updating export 37147439-d85e-5002-0501-0315cbd3a063 at 1433439189 exp ffff8805cf5f7800',
 u'00000100:00080000:22.0:1433439189.213463:0:28364:0:(service.c:789:ptlrpc_update_export_timer()) updating export ed7c79f9-b42b-ce30-ba52-38106df6aaed at 1433439189 exp ffff8805b6a0dc00',
 u'00000100:00080000:22.0:1433439189.215960:0:28364:0:(service.c:789:ptlrpc_update_export_timer()) updating export 352aa199-a505-6fb6-96f6-bb641b17474f at 1433439189 exp ffff8802f1373800',
 u'00000100:00080000:22.0:1433439189.232961:0:28364:0:(service.c:789:ptlrpc_update_export_timer()) updating export 9bd403bb-4d2e-7d02-1c22-2c809edefa3d at 1433439189 exp ffff8802f2b85000',
 u'00000100:00080000:22.0:1433439189.303157:0:28364:0:(service.c:789:ptlrpc_update_export_timer()) updating export 2d28e2ea-bedc-ad07-b49a-6f20c20f841b at 1433439189 exp ffff8805b4d34000',
 u'00000100:00080000:22.0:1433439189.306565:0:28364:0:(service.c:789:ptlrpc_update_export_timer()) updating export 7207a313-9b1d-d5ee-2100-a3ca8c509e95 at 1433439189 exp ffff8805bb752800',
 u'00000100:00080000:22.0:1433439189.321356:0:28364:0:(service.c:789:ptlrpc_update_export_timer()) updating export 63d06ebf-337a-7077-3d09-00074c38d802 at 1433439189 exp ffff8805c7210c00']

In [326]:
# Keep only the export-update lines: UUID-bearing lines contain more than
# three '-' characters and the literal word 'updating' (see Out[225]).
# Each surviving line is lowercased, stripped, and tokenized on runs of
# non-word characters.
# Fix: the pattern must be a raw string — '\W' is an invalid string escape
# and raises a DeprecationWarning (an error in future Python versions).
words = parlog.filter(lambda line: line.count('-') > 3).filter(
    lambda line: 'updating' in line).map(lambda line: re.split(r'\W+', line.lower().strip()))

In [ ]:
# Sanity-check the tokenized rows.
words.take(3)

In [329]:
# For each tokenized line, emit [pid, export_address, uuid_prefix] for every
# token that looks like a kernel address (prefix 'ffff').
# Token positions (from the sample line in Out[225], split on \W+):
#   line[7]      -> process id field (28364 in the sample, matches Out[330])
#   line[15:19]  -> the first FOUR of the five hyphen-split client-UUID
#                   segments, joined into one hex string.
# NOTE(review): the fifth UUID segment (line[19]) is dropped — Out[330] shows
# a 20-hex-char prefix, not the full UUID. Confirm this truncation is intended.
exports = words.flatMap(lambda line: [[int(line[7]), str(x), str(''.join(line[15:19]))] 
                                      for x in line if x.startswith('ffff')])

In [330]:
# Inspect one extracted [pid, export_address, uuid_prefix] record.
exports.take(1)


Out[330]:
[[28364, 'ffff8805feae1000', 'e0b948f32c66de7982f6']]

In [331]:
# Convert the hex-string columns (export address, uuid prefix) to integers;
# the leading pid column is already an int.
to_int = exports.map(lambda rec: [rec[0]] + [int(hex_str, 16) for hex_str in rec[1:]])

In [332]:
# Verify the hex-to-int conversion on a small sample.
to_int.take(10)


Out[332]:
[[28364, 18446612158061875200L, 1061227996410447197209334L],
 [28364, 18446612157269142528L, 118462377013510696124769L],
 [28364, 18446612157268195328L, 260107466402186330768641L],
 [28364, 18446612156853050368L, 1121497041959493926173266L],
 [28364, 18446612144951080960L, 251071831362941308278518L],
 [28364, 18446612144976318464L, 735877783481886252866594L],
 [28364, 18446612156822798336L, 213260712583334543733914L],
 [28364, 18446612156934072320L, 538490657162060706488576L],
 [28364, 18446612157129886720L, 471359184725092152589577L],
 [28364, 18446612156607994880L, 734911401062701910371150L]]

In [333]:
# Turn each [pid, address, uuid] triple into a float64 vector for MLlib.
# NOTE: the address/uuid values exceed 2**53, so float64 conversion loses
# precision (visible in Out[342]); fine for rank-based (Spearman) statistics.
to_vector = to_int.map(lambda triple: np.asarray(triple, dtype=float))

In [342]:
# Check the resulting float vectors.
to_vector.take(2)


Out[342]:
[array([  2.83640000e+04,   1.84466122e+19,   1.06122800e+24]),
 array([  2.83640000e+04,   1.84466122e+19,   1.18462377e+23])]

In [344]:
from pyspark.mllib.stat import Statistics

In [347]:
pearsonCorr = Statistics.corr(to_vector, method="spearman")

In [348]:
pearsonCorr


Out[348]:
array([[  1.00000000e+00,  -2.93146845e-03,  -2.03278004e-04],
       [ -2.93146845e-03,   1.00000000e+00,  -2.72207813e-02],
       [ -2.03278004e-04,  -2.72207813e-02,   1.00000000e+00]])