In [ ]:
import time
def timeit():
    '''return the current wall-clock time as a formatted string'''
    return time.strftime('%l:%M%p %Z on %b %d, %Y')
In [ ]:
timeit()
In [ ]:
from collections import namedtuple
from collections import defaultdict
import re
import json
LogLine = namedtuple('LogLine', ['ts', 'msg',
                                 'processed', 'dictionary',
                                 'template', 'templateId', 'templateDict'])

TemplateLine = namedtuple('TemplateLine', ['id', 'template', 'skipWords'])

TransformLine = namedtuple('TransformLine',
                           ['id', 'type', 'NAME', 'transform', 'compiled'])
In [ ]:
with open('freq_word_dict.json') as f:
    frequentWords = json.load(f)
with open('templates.json') as f:
    goodTemplates = json.load(f)
readLog_RDD.py
In [ ]:
def procLogLine(line, logFile):
    '''
    handle the logfile-specific parsing of an input line into 2 parts
        ts: timestamp
        msg: the rest of the message

    Args:
        line(string): text to process
        logFile(string): hint of URI used for input;
            should be used for switching parsing
            based off different directories

    Returns:
        retval(list[string, string]): [ts, msg]
    '''
    # drop the first two space-delimited fields; the third is the
    # timestamp, and everything after it is the message
    return line.strip().split(' ', 3)[2:]
def rdd_LogLine(line, logFile):
    '''
    process a raw log line into a LogLine namedtuple

    Args:
        line(string): string from the logline
        logFile(string): what URI the log lines came from;
            eventually want to do different parsing
            based on the base of the URI

    Returns:
        retval(LogLine): fills in the first two portions of the LogLine
            namedtuple
    '''
    ts, msg = procLogLine(line, logFile)
    return LogLine(float(ts), msg, None, None, None, None, None)
def rdd_ReadLog(sc, logFile):
    '''
    read a log file/directory into LogLine RDD format
    NOTE: only ts and msg are populated

    Args:
        sc(sparkContext): spark context
        logFile(string): URI of the file to process

    Returns:
        retval(RDD(LogLine)): RDD of logs read from the logFile URI
    '''
    sparkLogFile = sc.textFile(logFile)
    return sparkLogFile.map(lambda line: rdd_LogLine(line, logFile))
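
A quick local sanity check of the parsing above, runnable without Spark. The sample line is hypothetical: procLogLine assumes the first two space-delimited fields can be discarded, the third is an epoch timestamp, and the remainder is the message.

In [ ]:
sample = '- node77 1131566461 CE sym 2, at 0x0b85eee0, mask 0x20'
# expect ['1131566461', 'CE sym 2, at 0x0b85eee0, mask 0x20']
print procLogLine(sample, 'file:///tmp/sample.log')
# expect LogLine(ts=1131566461.0, msg='CE sym 2, ...', the rest None)
print rdd_LogLine(sample, 'file:///tmp/sample.log')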
preProcess_RDD.py
In [ ]:
def rdd_TransformLine(line):
    '''
    process a transformation definition line into TransformLine format

    Args:
        line(string): line from the transform definition file;
            lines beginning with # are considered comments
            and will need to be removed

    Returns:
        retval(TransformLine): namedtuple representation of the tasking
    '''
    if not line.lstrip().startswith('#'):
        # id,type,name,transform
        l = line.strip().split(',', 3)
        return TransformLine(int(l[0]),
                             l[1],
                             l[2],
                             l[3],
                             re.compile(l[3]))
    else:
        return TransformLine('COMMENT',
                             'COMMENT',
                             'COMMENT',
                             'COMMENT',
                             'COMMENT')
def lineRegexReplacement(line, logTrans):
    '''
    apply a list of regex replacements to a line, making note of
    all the replacements performed in a dictionary(list)

    Args:
        line(LogLine): logline to work on
        logTrans(broadcast(list(TransformLine))): replacements to make

    Returns:
        retval(LogLine): logline with the processed and dictionary portions
            filled in
    '''
    text = line.msg.strip()
    replaceDict = dict()

    for t in logTrans.value:
        if t.type == 'REPLACE':
            replaceList = t.compiled.findall(text)
            if replaceList:
                replaceDict[t.NAME] = replaceList
            text = t.compiled.sub(t.NAME, text, 0)

        if t.type == 'REPLACELIST':
            print 'REPLACELIST not implemented yet'

    # collapse runs of whitespace down to single spaces
    processed = ' '.join(text.split())
    retVal = LogLine(line.ts, line.msg.strip(),
                     processed, replaceDict, None, None, None)
    return retVal
def readTransforms(sc, transFile):
    '''
    return a list of transforms for replacement processing

    Args:
        sc(sparkContext): spark context
        transFile(string): URI of the transform file in HDFS

    Returns:
        retval(list(TransformLine))
    '''
    # map the transFile
    simpleTransformations = sc.textFile(transFile)

    # parse the transform lines
    logTransforms = simpleTransformations.map(rdd_TransformLine).cache()

    # pull the transforms back to the driver, dropping comment lines
    trans = logTransforms.collect()
    lTrans = list()
    for t in trans:
        if t.id != 'COMMENT':
            lTrans.append(t)
    return lTrans
def logPreProcess(sc, logTrans, rddLogLine):
    '''
    take a series of loglines and pre-process the lines
    replace ipaddresses, directories, urls, etc with constants
    keep a dictionary of the replacements done to the line

    Args:
        sc(sparkContext): spark context
        logTrans(broadcast(list(TransformLine))): transforms to apply
        rddLogLine(RDD(LogLine)): log lines to process

    Returns:
        retval(RDD(LogLine)): preprocessed log lines ready for the next
            stage of processing
    '''
    # the lambda closure makes sure that the broadcast gets to the function
    return rddLogLine.map(lambda line: lineRegexReplacement(line, logTrans))
def rdd_preProcess(sc, logTrans, rddLogLine):
    '''
    make an RDD of preprocessed loglines

    Args:
        sc(sparkContext): spark context
        logTrans(string): location of the transFile in HDFS
        rddLogLine(RDD(LogLine)): log lines to process

    Returns:
        retval(RDD(LogLine)): preprocessed log lines ready for the next
            stage of processing
    '''
    lTrans = readTransforms(sc, logTrans)
    logTrans = sc.broadcast(lTrans)
    return logPreProcess(sc, logTrans, rddLogLine)
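
A local sketch of one transform pass, with Spark's broadcast variable faked by a small stand-in object exposing a .value attribute. The IPADDR transform below is an assumed example; the real simpleTrans file may define different replacements.

In [ ]:
class FakeBroadcast(object):
    '''stand-in for a pyspark broadcast variable'''
    def __init__(self, value):
        self.value = value

t = rdd_TransformLine(r'1,REPLACE,IPADDR,(?:\d{1,3}\.){3}\d{1,3}')
line = LogLine(1131566461.0, 'connection from 10.0.0.5 dropped',
               None, None, None, None, None)
# expect processed='connection from IPADDR dropped'
#    and dictionary={'IPADDR': ['10.0.0.5']}
print lineRegexReplacement(line, FakeBroadcast([t]))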
rddTemplate
In [ ]:
def badWay(r1, r2):
    '''
    comparator used as a stand-in heuristic: sort regexes longest first
    so that more specific templates are tried before more general ones

    the correct way:
    For each pair of regexes r and s for languages L(r) and L(s)
    Find the corresponding Deterministic Finite Automata M(r) and M(s) [1]
    Compute the cross-product machine M(r x s) and assign accepting states
    so that it computes L(r) - L(s)
    Use a DFS or BFS of the M(r x s) transition table to see if any
    accepting state can be reached from the start state
    If no, you can eliminate s because L(s) is a subset of L(r).
    Reassign accepting states so that M(r x s) computes L(s) - L(r)
    Repeat the steps above to see if it's possible to eliminate r
    '''
    return len(r2) - len(r1)


def rankMatches(m):
    retval = sorted(m, cmp=badWay)
    return retval
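
# example: rankMatches(['a (\S+)', 'a (\S+) b (\S+)']) returns
# ['a (\S+) b (\S+)', 'a (\S+)']: the longer, more specific template sorts first
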
def getWordSkipNames(s):
    '''
    find the skip word patterns

    Args:
        s(_sre.SRE_Pattern): compiled regex to match a logline

    Returns:
        retval(list(string)): list of the skip patterns found in s
    '''
    pattern = r'\(\(\?\:\\\ \{0,1\}\\S\+\)\{(\d)\,(\d)\}\)'
    matchObj = re.finditer(pattern, s.pattern, re.M | re.I)

    retVal = list()
    for m in matchObj:
        vals = m.groups()
        fpattern = r'((?:\ {0,1}\S+){%i,%i})' % (int(vals[0]), int(vals[1]))
        retVal.append(fpattern)
    return retVal
def readTemplates(sc, templateFile):
    '''
    return a list of templates for match processing

    Args:
        sc(sparkContext): spark context
        templateFile(string): URI of the template file in HDFS

    Returns:
        retval(list(TemplateLine))
    '''
    # read the templateFile and pull it back to the driver
    templates = sc.textFile(templateFile)
    templateList = templates.collect()

    # escape each template, then restore its skip-word markers as regex
    matches = list()
    for t in templateList:
        stripped = r'' + t.strip()
        escaped = re.escape(stripped)
        replaced = unescapeSkips(escaped)
        matches.append(replaced)

    matches = rankMatches(matches)

    templateLines = list()
    for index, m in enumerate(matches):
        # match end of line too
        t = TemplateLine(index,
                         re.compile(m + '$'),
                         getWordSkipNames(re.compile(m)))
        templateLines.append(t)

    return templateLines
def unescapeSkips(s):
    '''
    find an escaped version of the skip{m,n} words and
    replace it with the unescaped version

    Args:
        s(string): string to search

    Returns:
        retval(string): string with replacements made
    '''
    pattern = r'\\\(\\\:\\\?\\\ S\\\+\\\)\\\{(\d)\\\,(\d)\\\}'
    matchObj = re.finditer(pattern, s, re.M | re.I)

    b = s
    for m in matchObj:
        newString = r'((?:\ {0,1}\S+){%i,%i})' % (int(m.groups()[0]),
                                                  int(m.groups()[1]))
        # the r prefix is very important
        newFound = r'\\\(\\:\\\?\\ S\\\+\\\)\\\{%i\\,%i\\\}' % (int(m.groups()[0]),
                                                                int(m.groups()[1]))
        b = re.sub(newFound, newString, b)
    return b
def rdd_MatchLine(line, templates):
    '''
    assign a log line to a templateId or -1 if no match
    keep track of any skip word replacements, return additional
    information in the LogLine namedtuple

    Args:
        line(LogLine): logline being classified
        templates(broadcast(list(TemplateLine))): templates to attempt
            to match against

    Returns:
        retval(LogLine): LogLine with the final 3 fields filled in
            template - actual template used for match
            templateId - number of the template
            templateDict - dictionary of skip word replacements
    '''
    # initialized before the loop so the no-match return below
    # always sees an (empty) dictionary
    templateDict = defaultdict(list)

    for templateLine in templates.value:
        skipFound = templateLine.template.search(line.processed)
        if skipFound:
            for i in range(len(templateLine.skipWords)):
                templateDict[templateLine.skipWords[i]].append(skipFound.groups()[i])
            return LogLine(line.ts,
                           line.msg,
                           line.processed,
                           line.dictionary,
                           templateLine.template.pattern,
                           templateLine.id,
                           templateDict)

    # could not find a template match
    return LogLine(line.ts,
                   line.msg,
                   line.processed,
                   line.dictionary,
                   None,
                   -1,
                   templateDict)
def matchTemplates(sc, templateFile, rddLogLine):
    '''
    match an RDD of preprocessed loglines against the templates

    Args:
        sc(sparkContext): spark context
        templateFile(string): URI of the template file in HDFS
        rddLogLine(RDD(LogLine)): preprocessed log lines

    Returns:
        retval(RDD(LogLine)): loglines with the template fields filled in
    '''
    templates = readTemplates(sc, templateFile)
    templateBroadcast = sc.broadcast(templates)
    return rddLogLine.map(lambda line: rdd_MatchLine(line, templateBroadcast))
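
A local round trip through the template machinery above. The template line is hypothetical; it assumes the template file marks skip words as '(:? S+){m,n}', which re.escape mangles and unescapeSkips then rewrites into a real capture group (this relies on Python 2's re.escape, which escapes spaces).

In [ ]:
escaped = re.escape(r'connect from (:? S+){1,3} port 22')
pattern = unescapeSkips(escaped)
# expect the skip marker rewritten to ((?:\ {0,1}\S+){1,3})
print pattern
# expect ['((?:\ {0,1}\S+){1,3})']
print getWordSkipNames(re.compile(pattern))
# expect a match with the skip group capturing 'host01'
print re.compile(pattern + '$').search('connect from host01 port 22').groups()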
constants
In [ ]:
timeit()
In [ ]:
logs = 'hdfs://namenode/magichour/tbird500k'
#logs = 'hdfs://namenode/user/dgrossman/tbird.log.10000.gz'
trans = 'hdfs://namenode/magichour/simpleTrans'
templates = 'hdfs://namenode/magichour/templates'
In [ ]:
rddLogs = rdd_ReadLog(sc, logs)
procData = rdd_preProcess(sc, trans, rddLogs)
matched = matchTemplates(sc, templates, procData)
In [ ]:
matched.count()
In [ ]:
timeit()
In [ ]:
def bigStuff(line):
    '''keep lines where any skip word captured more than one token'''
    inside = line.templateDict
    for i in inside.itervalues():
        if len(i) > 1:
            return True
    return False

xx = matched.filter(bigStuff)
In [ ]:
xxx = xx.take(1000)
for x in xxx:
    if x.templateDict is not None and len(x.templateDict) >= 1:
        print x
In [ ]:
matched.saveAsPickleFile('hdfs://namenode/magichour/matchedTemplates')