In [ ]:
#logs = 'hdfs://namenode/datasets/magichour/tbird.log.gz'
logs = 'hdfs://namenode/magichour/tbird/'
trans = 'hdfs://namenode/magichour/simpleTrans'
In [ ]:
sc.addPyFile('magichour/preProcess/preProcess_SPARK.py')
sc.addPyFile('magichour/LogCluster/LogCluster.py')
In [ ]:
from preProcess_SPARK import rdd_preProcess
from LogCluster import log_cluster
In [ ]:
pre_processed_lines = rdd_preProcess(sc, trans, logs, 200)
In [ ]:
templates = log_cluster(sc, pre_processed_lines, 2000)
In [ ]:
[' '.join(template) for template in templates]
In [ ]:
#logs = 'hdfs://namenode/datasets/magichour/tbird.log.gz'
logs = 'hdfs://namenode/user/dgrossman/tbird.log.10000.gz'
trans = 'hdfs://namenode/magichour/simpleTrans'
In [ ]:
l = sc.textFile(logs)
In [ ]:
l.saveAsTextFile('hdfs://namenode/magichour/tbird/')  # re-save the gzipped log as splittable text parts (dir used below)
In [ ]:
logs = 'hdfs://namenode/magichour/tbird/'
In [ ]:
sc.addPyFile('magichour/magichour/lib/LogCluster/LogCluster.py')
sc.addPyFile('magichour/magichour/api/local/preprocess/preProcess_SPARK.py')
sc.addPyFile('magichour/magichour/api/local/preprocess/readLog_RDD.py')
In [ ]:
from preProcess_SPARK import rdd_preProcess
from readLog_RDD import rdd_ReadLog
from LogCluster import log_cluster
In [ ]:
rddLogs = rdd_ReadLog(sc, logs).repartition(1000)
In [ ]:
preprocessed_logs = rdd_preProcess(sc, trans, rddLogs)
In [ ]:
frequent_words, templates = log_cluster(sc, preprocessed_logs, 1000)
In [ ]:
[' '.join(template) for template in templates]
In [ ]:
len(templates)
In [ ]:
rddLogs.count()
In [ ]:
lines = rddLogs.take(1000)
In [ ]:
lines[:10]
In [ ]:
processed = preprocessed_logs.take(1000)
In [ ]:
[p.processed for p in processed[:10]]
In [ ]:
preprocessed_logs.cache()
In [ ]:
from LogCluster import *
"""
Run log cluster
Args:
log_lines(rdd of LogLine): Input log messages as LogLine objects
support(int): Threshold # of occurrences before a pattern can be included
Returns:
list[list[str]]: Returns a list of pattern strings (where the pattern is a list of strings) for the log lines
"""
frequent_word_dict = preprocessed_logs.flatMap(parse_words)\
.reduceByKey(lambda x,y: x+y)\
.filter(lambda (key,count): count > 1000)\
.collectAsMap()
frequent_words = sc.broadcast(set(frequent_word_dict.keys()))
#return log_lines.map(lambda x: extract_patterns(x, frequent_words))\
# .groupByKey()\
# .filter(lambda (freq_word_pattern, pattern): len(pattern) > support)\
# .map(collapse_patterns)\
# .collect()
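In [ ]:
# A minimal local sketch (plain Python, no Spark) of the support-threshold idea
# above: count words across log lines and keep only those seen more than
# `support` times. The toy lines and support value here are made up for
# illustration.
from collections import Counter
toy_lines = ['connection from 10.0.0.1 closed',
             'connection from 10.0.0.2 closed',
             'connection from 10.0.0.3 refused']
support = 1
word_counts = Counter(word for line in toy_lines for word in line.split())
frequent = set(w for w, c in word_counts.items() if c > support)
print sorted(frequent)  # ['closed', 'connection', 'from']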
In [ ]:
!ls
In [ ]:
import json
with open('freq_word_dict.json', 'w') as f:
    json.dump(frequent_word_dict, f)
In [ ]:
with open('templates.json', 'w') as f:
    json.dump(templates, f)
In [ ]:
c = preprocessed_logs.filter(lambda x: 'SWEEP' in x.processed and 'ib_sm' in x.processed).collect()
In [ ]:
d = preprocessed_logs.filter(lambda x: 'SWEEP' in x.processed) \
                     .map(lambda x: extract_patterns(x, frequent_words)) \
                     .groupByKey() \
                     .filter(lambda (freq_word_pattern, pattern): len(pattern) > 1000) \
                     .map(collapse_patterns) \
                     .collect()
In [ ]:
d
In [ ]:
c[1].processed
In [ ]:
templates2 = preprocessed_logs.map(lambda x: extract_patterns(x, frequent_words)) \
                              .reduceByKey(lambda l1, l2: agg(l1, l2)) \
                              .filter(lambda (key, val): isinstance(val[0], int) and val[0] > 1000) \
                              .collect()
#.filter(lambda (freq_word_pattern, pattern): len(pattern) > 1000)\
#.map(collapse_patterns)\
#.collect()
In [ ]:
len(templates2)
In [ ]:
templates2[-50:]
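In [ ]:
# A rough sketch of rendering one merged pattern from templates2 as a readable
# template string: words pass through, and each set of observed skip counts
# becomes a LogCluster-style wildcard *{min,max}. The sample pattern below is
# made up for illustration.
def render_pattern(pattern):
    parts = []
    for token in pattern:
        if isinstance(token, set):
            parts.append('*{%d,%d}' % (min(token), max(token)))
        else:
            parts.append(token)
    return ' '.join(parts)

sample = [set([1, 3]), 'connection', 'from', set([1]), 'closed']
print render_pattern(sample)  # *{1,3} connection from *{1,1} closed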
In [ ]:
def get_next_pair(input_list):
    '''
    Iterate over a pattern list, yielding (skip_count_set, word) pairs.
    A pattern list interleaves skip counts (ints or sets of ints) with words.
    '''
    count = None
    i = 0
    while i < len(input_list):
        if isinstance(input_list[i], int):
            count = set([input_list[i]])
        elif isinstance(input_list[i], set):
            count = input_list[i]
        else:  # str
            yield (count, input_list[i])
            count = None
        i += 1
    yield (count, None)


def combine_sets(s1, s2):
    '''Union two skip-count sets, treating None as the empty set.'''
    if s1 is None and s2 is None:
        return None
    if s1 is None:
        return s2
    elif s2 is None:
        return s1
    else:
        return s1.union(s2)


def agg(l1, l2):
    '''
    Merge two patterns that share the same frequent-word sequence, unioning
    their skip-count sets and summing their occurrence counts.
    Accepts either a raw pattern list or a (count, pattern) tuple.
    '''
    if isinstance(l1, list):
        l1_count = 1
    elif isinstance(l1, tuple):
        l1_count = l1[0]
        l1 = l1[1]
    else:
        raise TypeError('Expected list or tuple, found: %s' % type(l1))

    if isinstance(l2, list):
        l2_count = 1
    else:
        l2_count = l2[0]
        l2 = l2[1]

    if not isinstance(l1_count, int) or not isinstance(l2_count, int):
        raise ValueError('Incorrect format: %s|%s' % (l1_count, l2_count))

    output = []
    for (i1, i2) in zip(get_next_pair(l1), get_next_pair(l2)):
        combined_count = combine_sets(i1[0], i2[0])
        if combined_count is not None:
            output.append(combined_count)
        if i1[1] != i2[1]:
            raise ValueError('These should match, instead: %s %s' % (l1, l2))
        if i1[1] is not None:
            output.append(i1[1])
    return (l1_count + l2_count, output)
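In [ ]:
# A small made-up check of agg: merging two patterns that share the same
# frequent words but skip different numbers of rare words between them should
# union the skip-count sets and sum the occurrence counts.
p1 = [1, 'connection', 2, 'closed']   # one rare word before, two between
p2 = [1, 'connection', 3, 'closed']   # same words, different gap
print agg(p1, p2)  # (2, [set([1]), 'connection', set([2, 3]), 'closed'])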
In [ ]:
isinstance(tuple([1, 2]), list)  # sanity check: tuples are not instances of list
In [ ]:
!ls
In [ ]:
%load_ext Cython
In [ ]:
%%cython
# quick check that the Cython extension compiles and runs
cdef int a = 0
for i in range(10):
    a += i
print a
In [ ]:
!hadoop fs -ls /magichour/matchedTemplates
In [ ]: