In [ ]:
    
import time
def timeit():
    '''
    return a human-readable timestamp of the current local time
    Returns:
        retval(string): formatted like ' 3:05PM UTC on Jan 07, 2016'
    '''
    # NOTE: removed a dead time.ctime() call whose result was discarded
    return time.strftime('%l:%M%p %Z on %b %d, %Y')
    
In [ ]:
    
# record wall-clock time at the start of the notebook run
timeit()
    
In [ ]:
    
from collections import namedtuple
from collections import defaultdict
import re
import json
# LogLine: one log record as it flows through the pipeline stages
#   ts: timestamp (float, from the second whitespace field of the raw line)
#   msg: raw message text after the timestamp
#   processed: msg after the regex transforms have been applied
#   dictionary: replacements performed on msg (transform NAME -> matches)
#   template/templateId/templateDict: filled in during template matching
LogLine = namedtuple('LogLine', ['ts', 'msg',
                                 'processed', 'dictionary', 
                                 'template','templateId','templateDict'])
# TemplateLine: a compiled template regex plus its skip-word patterns
TemplateLine = namedtuple('TemplateLine',['id','template','skipWords'])
# TransformLine: one parsed row of the transform definition file
# (id, type, NAME, raw transform string, compiled regex)
TransformLine = namedtuple('TransformLine',
                           ['id', 'type', 'NAME', 'transform', 'compiled'])
    
In [ ]:
    
# load support data; use 'with' so the file handles are closed
# (the previous bare open() calls leaked the descriptors)
with open('freq_word_dict.json') as f:
    frequentWords = json.load(f)
with open('templates.json') as f:
    goodTemplates = json.load(f)
    
readLog_RDD.py
In [ ]:
    
def procLogLine(line, logFile):
    '''
    handles the logfile specific parsing of input lines into 2 parts
    ts: timestamp float
    msg: the rest of the message
    Args:
        line(string): text to process
        logFile(string): hint of URI used for input
                         should use for switching parsing
                         based off different directories
    Returns:
        retval(list[string,string]): [ts, msg]
    '''
    # strip() already removes whitespace from both ends, so the old
    # .strip().rstrip() chain was redundant.
    # drop the first two space-separated fields, keep [ts, rest-of-line]
    return line.strip().split(' ', 3)[2:]
def rdd_LogLine(line, logFile):
    '''
    build a partially-populated LogLine from one raw text line
    Args:
        line(string): string from the logline
        logFile(string): what URI the log lines came from,
                         eventually want to do different parsing
                         based on the base of the URI
    Returns:
        retval(LogLine): only ts and msg are filled in; the remaining
                         five fields are left as None for later stages
    '''
    parts = procLogLine(line, logFile)
    # ts becomes a float; everything downstream stays unset for now
    return LogLine(float(parts[0]), parts[1],
                   None, None, None, None, None)
def rdd_ReadLog(sc, logFile):
    '''
    read a log/directory into LogLine RDD format
    NOTE: only ts and msg are populated
    Args:
        sc(sparkContext)
        logFile(string): URI to file to process
    Returns:
        retval(RDD(LogLine)): RDD of logs read from the logFile URI
    '''
    # one LogLine per input text line; logFile is forwarded as a
    # parsing hint
    return sc.textFile(logFile).map(
        lambda line: rdd_LogLine(line, logFile))
    
preProcess_RDD.py
In [ ]:
    
def rdd_TransformLine(line):
    '''
    process transformations into RDD format
    Args:
        line(string): line from the transform defintion file.
                      expected format: id,type,name,transform
                      lines beginning with # are considered comments
                      and are mapped to a COMMENT sentinel tuple that
                      readTransforms() filters out
    Returns:
        retval(TransformLine): namedTuple representation of the tasking
    '''
    stripped = line.strip()
    # blank lines and '#' comments both become the COMMENT sentinel;
    # the original raised IndexError on empty/whitespace-only lines
    # because it indexed line.lstrip()[0] unconditionally
    if not stripped or stripped[0] == '#':
        return TransformLine('COMMENT',
                             'COMMENT',
                             'COMMENT',
                             'COMMENT',
                             'COMMENT')
    # id,type,name,transform -- the transform itself may contain commas,
    # so split at most 3 times
    l = stripped.split(',', 3)
    return TransformLine(int(l[0]),
                         l[1],
                         l[2],
                         l[3],
                         re.compile(l[3]))
def lineRegexReplacement(line, logTrans):
    '''
    apply every configured regex transform to a single log line and
    record each replacement that fired
    Args:
        line(LogLine): logline to work on
        logTrans(broadcast(list(TransformLine))): replacements to make
    Returns:
        retval(LogLine): logline with the processed and dictionary
                         portions filled in
    '''
    replacements = dict()
    text = line.msg.strip()
    for transform in logTrans.value:
        if transform.type == 'REPLACE':
            hits = transform.compiled.findall(text)
            if hits:
                # remember what this transform matched before it is
                # substituted away
                replacements[transform.NAME] = hits
            text = transform.compiled.sub(transform.NAME, text, 0)
        if transform.type == 'REPLACELIST':
            print('REPLACELIST not implemented yet')
    # collapse any whitespace runs left behind by the substitutions
    processed = ' '.join(text.split())
    return LogLine(line.ts, line.msg.strip(), processed.strip(),
                   replacements, None, None, None)
def readTransforms(sc, transFile):
    '''
    returns a list of transforms for replacement processing
    Args:
        sc(sparkContext): spark context
        transFile(string): uri to the transform file in HDFS
    Returns:
        retval(list(TransformLine)): parsed transforms with the
                                     COMMENT sentinel rows removed
    '''
    # parse every line of the transform file into TransformLine tuples
    logTransforms = sc.textFile(transFile).map(rdd_TransformLine).cache()
    # pull the parsed rows to the driver and drop the comment sentinels
    return [t for t in logTransforms.collect() if t.id != 'COMMENT']
def logPreProcess(sc, logTrans, rrdLogLine):
    '''
    take a series of loglines and pre-process the lines
    replace ipaddresses, directories, urls, etc with constants
    keep a dictionary of the replacements done to the line
    Args:
        sc(sparkContext): spark context (unused here, kept for symmetry
                          with the other pipeline functions)
        logTrans(broadcast(list(TransformLine))): broadcast transforms
        rrdLogLine(RDD(LogLine)): loglines to process
    Returns:
        retval(RDD(LogLine)): preprocessed log lines ready for next
                              stage of processing
    '''
    # the closure captures the broadcast variable so it ships with the
    # task to every executor
    def applyTransforms(line):
        return lineRegexReplacement(line, logTrans)
    return rrdLogLine.map(applyTransforms)
def rdd_preProcess(sc, logTrans, rrdLogLine):
    '''
    make a rdd of preprocessed loglines
    Args:
        sc(sparkContext): sparkContext
        logTrans(string): location of the transFile in HDFS
        rrdLogLine(RDD(LogLine)): loglines to process
    Returns:
        retval(RDD(LogLine)): preprocessed log lines ready for next
                              stage of processing
    '''
    # read the transform definitions, then broadcast them so every
    # executor gets a copy
    broadcastTrans = sc.broadcast(readTransforms(sc, logTrans))
    return logPreProcess(sc, broadcastTrans, rrdLogLine)
    
rddTemplate
In [ ]:
    
def badWay(r1, r2):
    '''
    comparator ordering regex strings longest-first (a crude proxy for
    "most specific first").
    correct way would be language containment via DFAs:
    for each pair of regexes r and s, build DFAs M(r) and M(s),
    compute the cross-product machine M(r x s) accepting L(r) - L(s),
    and BFS/DFS its transition table; if no accepting state is
    reachable, L(s) is a subset of L(r) and s can be eliminated
    (and symmetrically for r).
    Args:
        r1(string), r2(string): regex strings being compared
    Returns:
        retval(int): positive when r2 is longer, negative when r1 is
                     longer, zero on equal length
    '''
    return len(r2) - len(r1)
def rankMatches(m):
    '''
    order candidate template regexes so that longer (more specific)
    patterns are tried before shorter ones
    Args:
        m(list(string)): template regex strings
    Returns:
        retval(list(string)): new list sorted by descending length
                              (stable for equal lengths)
    '''
    # sorted(..., cmp=badWay) is Python-2-only syntax; key=len with
    # reverse=True yields the identical stable longest-first ordering
    # (badWay compared by len(r2) - len(r1))
    return sorted(m, key=len, reverse=True)
def getWordSkipNames(s):
    '''
    find the skip word patterns
    Args:
        s(_sre.SRE_Pattern): compiled regex to match a logline
    Returns:
        retval(list(string)): list of the skip patterns found in s
    '''
    # matches the literal text '((?:\ {0,1}\S+){m,n})' inside the
    # compiled pattern and captures the single-digit bounds m and n
    pattern = r'\(\(\?\:\\\ \{0,1\}\\S\+\)\{(\d)\,(\d)\}\)'
    # NOTE: re.finditer always returns an iterator (always truthy), so
    # the original 'if matchObj:' guard was a no-op and is removed
    retVal = list()
    for m in re.finditer(pattern, s.pattern, re.M | re.I):
        low, high = (int(v) for v in m.groups())
        # rebuild the canonical (unescaped) skip-word pattern
        retVal.append(r'((?:\ {0,1}\S+){%i,%i})' % (low, high))
    return retVal
def readTemplates(sc, templateFile):
    '''
    returns a list of compiled templates for matching processing
    Args:
        sc(sparkContext): spark context
        templateFile(string): uri to the template file in HDFS
    Returns:
        retval(list(TemplateLine)): compiled templates, ranked
                                    longest-first
    '''
    # pull the raw template strings down to the driver
    rawTemplates = sc.textFile(templateFile).collect()
    # escape each template literally, then restore the skip-word
    # notation that the escaping mangled
    matches = [unescapeSkips(re.escape(t.strip())) for t in rawTemplates]
    matches = rankMatches(matches)
    templateLines = list()
    for index, m in enumerate(matches):
        # anchor with '$' so a template must match through end of line
        templateLines.append(TemplateLine(index,
                                          re.compile(m + '$'),
                                          getWordSkipNames(re.compile(m))))
    return templateLines
def unescapeSkips(s):
    '''
    find an escaped version of skip{m,n} words
    replace with unescaped version
    Args:
        s(string): string to search
    Returns:
        retval(string): string with replacement
    '''
    # matches the re.escape()d form of '(:? S+){m,n}', capturing the
    # single-digit bounds m and n
    pattern = r'\\\(\\\:\\\?\\\ S\\\+\\\)\\\{(\d)\\\,(\d)\\\}'
    # NOTE: re.finditer always returns an iterator (always truthy), so
    # the original 'if matchObj:' guard and the trailing unreachable
    # 'return s' were dead code
    out = s
    for m in re.finditer(pattern, s, re.M | re.I):
        low, high = int(m.group(1)), int(m.group(2))
        # the r prefix is very important: this is a regex fragment
        newString = r'((?:\ {0,1}\S+){%i,%i})' % (low, high)
        # m.group(0) is the exact escaped text that matched, so a plain
        # string replace is equivalent to (and safer than) rebuilding a
        # fresh regex from the captured digits and re.sub-ing it
        out = out.replace(m.group(0), newString)
    return out
def rdd_MatchLine(line, templates):
    '''
    assign a log line to a templateId or -1 if no match
    keep track of any skip word replacements, return additional
    information in the LogLine named tuple
    Args:
        line(LogLine): logline being classified
        templates(broadcast(list(TemplateLine))): templates to attempt
                                                  to match, broadcast
                                                  variable
    Returns:
        retval(LogLine): LogLine with the final 3 fields filled in
                         template - actual template used for match
                         templateId - number of the template
                         templateDict- dictionary of skip word replacements
    '''
    for templateLine in templates.value:
        skipFound = templateLine.template.search(line.processed)
        if skipFound:
            # map each skip-word pattern to the text it captured
            templateDict = defaultdict(list)
            for name, captured in zip(templateLine.skipWords,
                                      skipFound.groups()):
                templateDict[name].append(captured)
            return LogLine(line.ts,
                           line.msg,
                           line.processed,
                           line.dictionary,
                           templateLine.template.pattern,
                           templateLine.id,
                           templateDict)
    # could not find a template match
    # BUG FIX: the original referenced the templateDict left over from
    # the last loop iteration, which raised NameError when the template
    # list was empty; return a fresh empty dict instead
    return LogLine(line.ts,
                   line.msg,
                   line.processed,
                   line.dictionary,
                   None,
                   -1,
                   defaultdict(list))
def matchTemplates(sc, templateFile, rddLogLine):
    '''
    classify every logline in an RDD against the templates in
    templateFile
    Args:
        sc(sparkContext): spark context
        templateFile(string): uri to the template file in HDFS
        rddLogLine(RDD(LogLine)): preprocessed loglines
    Returns:
        retval(RDD(LogLine)): loglines with the template fields filled in
    '''
    # compile the templates once on the driver, then broadcast them so
    # every executor shares a single copy
    broadcastTemplates = sc.broadcast(readTemplates(sc, templateFile))
    return rddLogLine.map(
        lambda line: rdd_MatchLine(line, broadcastTemplates))
    
constants
In [ ]:
    
# record wall-clock time before kicking off the Spark jobs
timeit()
    
In [ ]:
    
# HDFS locations for the input logs, transform definitions, and templates
logs = 'hdfs://namenode/magichour/tbird500k'
# smaller sample dataset, useful for quick iterations
#logs = 'hdfs://namenode/user/dgrossman/tbird.log.10000.gz'
trans = 'hdfs://namenode/magichour/simpleTrans'
templates = 'hdfs://namenode/magichour/templates'
    
In [ ]:
    
# build the pipeline: read logs -> regex pre-process -> template match
# (all transformations are lazy; nothing executes until an action runs)
rddLogs = rdd_ReadLog(sc,logs)
procData = rdd_preProcess(sc,trans,rddLogs)
matched = matchTemplates(sc,templates,procData)
    
In [ ]:
    
# action: forces evaluation of the whole pipeline and reports line count
matched.count()
    
In [ ]:
    
# record wall-clock time after the count completes
timeit()
    
In [ ]:
    
def bigStuff(line):
    '''
    predicate: did any skip word in this logline capture more than one
    value?
    Args:
        line(LogLine): logline to inspect (templateDict must be a dict)
    Returns:
        retval(bool): True when some skip word captured 2+ values
    '''
    # .values() works on both Python 2 and 3; the original used the
    # Python-2-only .itervalues()
    return any(len(captured) > 1
               for captured in line.templateDict.values())
# keep only loglines where a skip word captured multiple values
xx =  matched.filter(bigStuff)
    
In [ ]:
    
# sample up to 1000 filtered lines and show those with skip-word captures
xxx = xx.take(1000)
for x in xxx:
    if x.templateDict is not None and len(x.templateDict) >= 1:
        print x
    
In [ ]:
    
# persist the classified loglines to HDFS for later analysis
matched.saveAsPickleFile('hdfs://namenode/magichour/matchedTemplates')