In [ ]:
# Load Data
# Read the log file from HDFS as an RDD of text lines and redistribute it over
# 10 partitions (presumably because a gzip input arrives as a single partition
# -- confirm). `sc` is not defined in this file; it is assumed to be the
# SparkContext provided by the notebook environment.
# Alternative input file, kept for reference:
#tbird_logs = sc.textFile("hdfs:///user/ytesfaye/tbird.log.out.logCluster.processed.gz")#.repartition(10)
tbird_logs = sc.textFile("hdfs:///user/ytesfaye/tbird.log.preProc.gz").repartition(10)
In [ ]:
from collections import namedtuple
# One parsed log record: a numeric timestamp and the remaining message text.
LogLine = namedtuple('LogLine', ['ts', 'msg'])


def get_tbird_data(line):
    """Split a raw log line into its leading timestamp and its message.

    The line is cut at the first space only, so the message keeps any
    further spaces exactly as they appear in the input.

    Args:
        line: Text of the form ``"<timestamp> <message>"``.

    Returns:
        LogLine: ``ts`` is the first field converted to ``float``; ``msg`` is
        everything after the first space, or ``''`` when the line holds a
        timestamp and nothing else.

    Raises:
        ValueError: If the leading field cannot be converted to ``float``
            (this includes a completely empty line).
    """
    # partition() always returns three parts, so a line without any space
    # yields an empty message instead of failing on a missing second field.
    ts_text, _, msg = line.partition(' ')
    return LogLine(ts=float(ts_text), msg=msg)
# Parse every raw line into a LogLine and cache the resulting RDD: it is
# traversed twice below (once to count words, once to extract patterns).
tbird_loglines = tbird_logs.map(get_tbird_data).cache()
In [ ]:
# Peek at the first five parsed LogLine records.
tbird_loglines.take(5)
In [ ]:
# Get Words that occur more than support times
from collections import defaultdict
# Threshold used by the filters in this notebook: a word must appear on more
# than `support` lines, and a word sequence must be shared by more than
# `support` lines, to be kept.
support = 200


def parse_words(line):
    """Emit one ``(word, 1)`` pair per distinct word of ``line.msg``.

    The message is split on whitespace and de-duplicated first, so a word
    repeated within a single line still contributes only one pair.

    Args:
        line: Object with a ``msg`` string attribute (e.g. a LogLine).

    Returns:
        list: ``(word, 1)`` tuples, in the iteration order of the set of
        distinct words.
    """
    distinct_words = set(line.msg.split())
    return [(token, 1) for token in distinct_words]
# Count, for every word, the number of log lines that contain it (parse_words
# emits each word at most once per line), keep the words found on more than
# `support` lines and collect them to the driver as a {word: count} dict.
# The filter indexes the (word, count) pair instead of unpacking it in the
# lambda signature, because tuple parameters are Python-2-only syntax.
frequentWordDict = (
    tbird_loglines.flatMap(parse_words)
    .reduceByKey(lambda x, y: x + y)
    .filter(lambda word_count: word_count[1] > support)
    .collectAsMap()
)
# Only the set of frequent words is broadcast; the counts stay on the driver.
frequentWords = sc.broadcast(set(frequentWordDict))
In [ ]:
def extract_patterns(line):
    """Reduce a log line to its frequent words plus the gaps between them.

    Every whitespace-separated word of ``line.msg`` is looked up in the
    broadcast set ``frequentWords``. Frequent words are kept; each run of
    non-frequent words that precedes a frequent word is replaced by its
    length.

    Args:
        line: Object with a ``msg`` string attribute (e.g. a LogLine).

    Returns:
        tuple: ``(key, pattern)`` where ``key`` is a tuple of the frequent
        words in order of appearance and ``pattern`` is a list holding the
        same words interleaved with ``int`` gap lengths. Non-frequent words
        after the last frequent word are not recorded.
    """
    gap = 0
    kept_words = []
    gapped_pattern = []
    for token in line.msg.split():
        if token not in frequentWords.value:
            gap += 1
            continue
        # Flush the pending gap (if any) before recording the word itself.
        if gap != 0:
            gapped_pattern.append(gap)
            gap = 0
        kept_words.append(token)
        gapped_pattern.append(token)
    return (tuple(kept_words), gapped_pattern)
In [ ]:
def parse_list(pattern):
    """Merge all gap patterns that share one frequent-word sequence.

    Args:
        pattern: A ``(freqWordPattern, patterns)`` pair. ``freqWordPattern``
            is the sequence of frequent words; ``patterns`` is an iterable of
            sequences that contain those same words interleaved with ``int``
            gap lengths (the shape returned by ``extract_patterns``).

    Returns:
        list: The frequent words in order. Before a word, a regex fragment
        ``(?: \\S+){min,max}`` is inserted when at least one pattern has a
        non-zero gap at that position; ``min``/``max`` are the smallest and
        largest gap seen there (0 when some pattern has no gap). Positions
        where no pattern has a gap produce nothing.
    """
    freqWordPattern, patterns = pattern
    # Individual patterns may be lists (unhashable); dedupe them as tuples.
    unique_patterns = {tuple(p) for p in patterns}

    # Slots alternate between a set of observed gap lengths and a word:
    # [gaps, word1, gaps, word2, ..., gaps]
    aggregate_pattern = [set()]
    for word in freqWordPattern:
        aggregate_pattern.append(word)
        aggregate_pattern.append(set())

    for line_pattern in unique_patterns:
        output_loc = 0
        after_gap = False
        for item in line_pattern:
            if isinstance(item, int):
                # Explicit gap: record its length in the current gap slot.
                aggregate_pattern[output_loc].add(item)
                output_loc += 1
                after_gap = True
            else:
                # TODO: Add check here that the word matches the word stored
                # in aggregate_pattern at the corresponding slot.
                if after_gap:
                    # The gap slot was just filled; step over the word slot.
                    output_loc += 1
                else:
                    # No gap before this word: record a zero-length gap, then
                    # step over both the gap slot and the word slot.
                    aggregate_pattern[output_loc].add(0)
                    output_loc += 2
                after_gap = False

    final_pattern = []
    for slot in aggregate_pattern:
        if not isinstance(slot, set):
            final_pattern.append(slot)
        elif slot and slot != {0}:
            # Non-capturing group of "space + non-space run", repeated between
            # min and max times. The earlier spelling '(:? S+)' was a
            # capturing group matching an optional ':' followed by a space
            # and literal 'S' characters, which cannot match skipped words.
            final_pattern.append(r'(?: \S+){%d,%d}' % (min(slot), max(slot)))
    return final_pattern
In [ ]:
# Group the per-line gap patterns by their frequent-word sequence, keep the
# sequences shared by more than `support` lines, merge each group with
# parse_list and collect the merged patterns to the driver.
# The filter indexes the (key, values) pair instead of unpacking it in the
# lambda signature, because tuple parameters are Python-2-only syntax.
t = (
    tbird_loglines.map(extract_patterns)
    .groupByKey()
    .filter(lambda group: len(group[1]) > support)
    .map(parse_list)
    .collect()
)
In [ ]:
# Print every merged pattern on its own line, in sorted order. The
# parenthesised single-argument print behaves the same under the Python 2
# print statement and the Python 3 print function.
for l in sorted(t):
    print(' '.join(l))
In [ ]:
# Number of entries collected into t.
len(t)
In [ ]:
import re
# Show the first pattern joined into one string with regex metacharacters
# escaped.
# NOTE(review): the cell that builds t already maps every group through
# parse_list, so t[0] is a merged pattern here and not a (key, patterns)
# pair. This second parse_list call presumably dates from before that step
# was added -- confirm and drop it if so.
print(re.escape(' '.join(parse_list(t[0]))))
In [ ]:
# Print the first 60 characters of every pattern.
# NOTE(review): the cell that builds t already maps every group through
# parse_list, so each element is a merged pattern and not a (key, patterns)
# pair. This second parse_list call presumably dates from before that step
# was added -- confirm and drop it if so.
for i in t:
    print(' '.join(parse_list(i))[:60])
In [ ]:
# NOTE(review): indexing t[1][1] and materialising it as a list suggests each
# element of t was expected to be a (key, iterable-of-patterns) pair. The cell
# that builds t maps every group through parse_list, which returns a flat
# list, so this is presumably a leftover from before that step -- confirm.
r = list(t[1][1])
In [ ]:
# Print the first component of every entry whose first component starts with
# the item 'USER'.
# NOTE(review): comparing t[i][0][0] with a whole word assumes t[i][0] is a
# sequence of words, i.e. that t holds (key, patterns) pairs. The cell that
# builds t maps every group through parse_list, so t[i][0] is presumably a
# single string now and this test can no longer match -- confirm.
for i in range(len(t)):
    if t[i][0][0] == 'USER':
        print(t[i][0])
In [ ]:
# NOTE(review): presumably meant to show the first word of the fourth entry's
# key, which assumes t holds (key, patterns) pairs; with t built through
# parse_list this indexes into a single string instead -- confirm.
print(t[3][0][0])
In [ ]:
# NOTE(review): listing t[0][1] suggests it was expected to be the iterable of
# patterns for the first group. The cell that builds t maps every group
# through parse_list, so t[0][1] is presumably one string now -- confirm.
list(t[0][1])
In [ ]: