In [ ]:
import time

def timeit():
    '''
    return the current local time as a formatted string,
    e.g. " 1:05PM EST on Jan 02, 2016"

    NOTE: despite the name this does not benchmark anything; it is used
    below to bracket long notebook runs. The original body also called
    time.ctime() and discarded the result -- that dead call is removed.

    Returns:
        retval(string): formatted local time (%l is a glibc extension)
    '''
    return time.strftime('%l:%M%p %Z on %b %d, %Y')

In [ ]:
timeit()

In [ ]:
from collections import namedtuple
from collections import defaultdict
import re
import json

# LogLine carries a log record through every stage of the pipeline;
# later stages fill in fields that earlier stages leave as None:
#   ts/msg: parsed input, processed/dictionary: after regex replacement,
#   template/templateId/templateDict: after template matching
LogLine = namedtuple('LogLine', ['ts', 'msg',
                                 'processed', 'dictionary', 
                                 'template','templateId','templateDict'])

# one known template: id = rank, template = compiled regex,
# skipWords = skip-word patterns extracted from the template
TemplateLine = namedtuple('TemplateLine',['id','template','skipWords'])


# one replacement rule from the transform file:
# id/type/NAME/transform are the raw CSV fields; compiled is the
# pre-compiled regex built from the transform column
TransformLine = namedtuple('TransformLine',
                           ['id', 'type', 'NAME', 'transform', 'compiled'])

In [ ]:
# lookup tables loaded from the current working directory
# NOTE(review): neither variable is referenced in the visible cells
# below -- presumably used interactively or by later revisions; confirm
frequentWords = json.load(open('freq_word_dict.json'))
goodTemplates = json.load(open('templates.json'))

The following cells contain the code for `readLog_RDD.py` (log ingestion).


In [ ]:
def procLogLine(line, logFile):
    '''
    handles the logfile specific parsing of input lines into 2 parts
    ts: timestamp float
    msg: the rest of the message

    Args:
        line(string): text to process
        logFile(string): hint of URI used for input
                         should use for switching parsing
                         based off different directories

    Returns:
        retval(list[string,string]): [ts, msg]
    '''
    # strip() already trims both ends, so the original
    # strip().rstrip() chain was redundant.
    # The first two space-separated fields are discarded; field 3 is
    # the timestamp and the remainder of the line is the message.
    return line.strip().split(' ', 3)[2:]


def rdd_LogLine(line, logFile):
    '''
    turn one raw log line into a LogLine record

    Args:
        line(string): string from the logline
        logFile(string): what URI the log lines came from,
                         eventually want to do different parsing
                         based on the base of the URI

    Returns:
        retval(LogLine): only ts and msg are populated; the remaining
                         fields are None until later pipeline stages
    '''
    parts = procLogLine(line, logFile)
    return LogLine(ts=float(parts[0]),
                   msg=parts[1],
                   processed=None,
                   dictionary=None,
                   template=None,
                   templateId=None,
                   templateDict=None)


def rdd_ReadLog(sc, logFile):
    '''
    read a log file/directory into an RDD of LogLine records
    NOTE: only the ts and msg fields are populated

    Args:
        sc(sparkContext): spark context
        logFile(string): URI of the file to process

    Returns:
        retval(RDD(LogLine)): RDD of logs read from the logFile URI
    '''
    rawLines = sc.textFile(logFile)
    return rawLines.map(lambda l: rdd_LogLine(l, logFile))

The following cells contain the code for `preProcess_RDD.py` (regex-based preprocessing).


In [ ]:
def rdd_TransformLine(line):
    '''
    process transformations into RDD format

    Args:
        line(string): line from the transform definition file.
                      lines beginning with # are considered comments
                      and will need to be removed
    Returns:
        retval(TransformLine): namedTuple representation of the tasking;
                               comments and blank lines map to a
                               'COMMENT' sentinel that readTransforms
                               filters out
    '''
    stripped = line.strip()

    # guard against blank/whitespace-only lines: the original indexed
    # line.lstrip()[0] directly and raised IndexError on empty input
    if stripped and not stripped.startswith('#'):
        # fields: id,type,name,transform (transform may contain commas)
        l = stripped.split(',', 3)
        return TransformLine(int(l[0]),
                             l[1],
                             l[2],
                             l[3],
                             re.compile(l[3]))

    return TransformLine('COMMENT',
                         'COMMENT',
                         'COMMENT',
                         'COMMENT',
                         'COMMENT')

def lineRegexReplacement(line, logTrans):
    '''
    apply a list of regex replacements to a line, make note of
    all the replacements performed in a dictionary(list)

    Args:
        line(LogLine): logline to work on
        logTrans(broadcast(list(TransformLine))): replacement rules,
            shipped to the workers as a spark broadcast variable

    Returns:
        retval(LogLine): logline with the processed, and dictionary portions
                         filled in
    '''

    text = line.msg.strip()
    replaceDict = dict()

    # rules are applied in file order; each substitution sees the text
    # already rewritten by earlier rules, so ordering matters
    for t in logTrans.value:
        if t.type == 'REPLACE':
            # remember what was replaced before substituting it away
            replaceList = t.compiled.findall(text)
            if replaceList:
                replaceDict[t.NAME] = replaceList
            # count=0 replaces every occurrence
            text = t.compiled.sub(t.NAME, text, 0)

        if t.type == 'REPLACELIST':
            print 'REPLACELIST not implemented yet'

    # collapse runs of whitespace to single spaces
    processed = ' '.join(text.split())
    retVal = LogLine(line.ts, line.msg.lstrip().rstrip(),
                     processed.lstrip().rstrip(), replaceDict, None, None, None)

    return retVal


def readTransforms(sc, transFile):
    '''
    returns a list of transforms for replacement processing

    Args:
        sc(sparkContext): spark context
        transFile(string): uri to the transform file in HDFS

    Returns:
        retval(list(TransformLine)): parsed rules, comment entries removed
    '''
    # parse each line of the transform file into a TransformLine
    logTransforms = sc.textFile(transFile).map(rdd_TransformLine).cache()

    # pull to the driver and drop the COMMENT sentinel entries
    return [t for t in logTransforms.collect() if t.id != 'COMMENT']


def logPreProcess(sc, logTrans, rrdLogLine):
    '''
    take a series of loglines and pre-process the lines
    replace ipaddresses, directories, urls, etc with constants
    keep a dictionary of the replacements done to the line

    Args:
        sc(sparkContext): spark context
        logTrans(broadcast(list(TransformLine))): broadcast transforms
        rrdLogLine(RDD(LogLine)): loglines to process

    Returns:
        retval(RDD(LogLines)): preprocessed log lines ready for next
                               stage of processing
    '''
    def applyTransforms(line):
        # closure keeps the broadcast variable in scope on the workers
        return lineRegexReplacement(line, logTrans)

    return rrdLogLine.map(applyTransforms)


def rdd_preProcess(sc, logTrans, rrdLogLine):
    '''
    make a rdd of preprocessed loglines

    Args:
            sc(sparkContext): sparkContext
            logTrans(string): location of the transFile in HDFS
            rrdLogLine(RDD(LogLine)): loglines to process

    Returns:
            retval(RDD(LogLines)): preprocessed log lines ready for next
                                   stage of processing
    '''
    # read the rules once on the driver, then broadcast to the workers
    # (distinct name so the transFile URI parameter is not rebound)
    transList = readTransforms(sc, logTrans)
    broadcastTrans = sc.broadcast(transList)
    return logPreProcess(sc, broadcastTrans, rrdLogLine)

The following cells contain the template-matching code (`rddTemplate.py`).


In [ ]:
def badWay(r1, r2):
    '''
    comparison function ordering regex strings longest-first, a cheap
    proxy for "most specific template first".

    correct way:

    For each pair of regexes r and s for languages L(r) and L(s)
    Find the corresponding Deterministic Finite Automata M(r) and M(s)   [1]
    Compute the cross-product machine M(r x s) and assign accepting states
    so that it computes L(r) - L(s)
    Use a DFS or BFS of the the M(r x s) transition table to see if any
    accepting state can be reached from the start state
    If no, you can eliminate s because L(s) is a subset of L(r).
    Reassign accepting states so that M(r x s) computes L(s) - L(r)
    Repeat the steps above to see if it's possible to eliminate r
    '''
    # negative when r1 is longer, so r1 sorts ahead of r2
    difference = len(r2) - len(r1)
    return difference


def rankMatches(m):
    '''
    order template strings longest-first so the most specific template
    is tried before shorter, more general ones.

    Args:
        m(list(string)): template regex strings

    Returns:
        retval(list(string)): sorted copy, longest first
    '''
    # sorted(..., cmp=badWay) is Python-2-only; key=len with
    # reverse=True produces the identical stable descending-length
    # order and also works on Python 3
    retval = sorted(m, key=len, reverse=True)
    return retval


def getWordSkipNames(s):
    '''
    find the skip word patterns

    Args:
        s(_sre.SRE_Pattern): compiled regex to match a logline

    Returns:
        retval(list(string)): list of the skip patterns found in s
    '''
    # \d+ (was \d) so skip counts of 10 or more are recognised
    pattern = r'\(\(\?\:\\\ \{0,1\}\\S\+\)\{(\d+)\,(\d+)\}\)'

    retVal = list()

    # re.finditer never returns None, so the original truthiness check
    # on the iterator was dead code
    for m in re.finditer(pattern, s.pattern, re.M | re.I):
        low, high = m.groups()
        retVal.append(r'((?:\ {0,1}\S+){%i,%i})' % (int(low), int(high)))

    return retVal


def readTemplates(sc, templateFile):
    '''
    returns a list of regex for replacement processing

    Args:
        sc(sparkContext): spark context
        templateFile(string): uri to the transform file in HDFS

    Returns:
        retval(list(TemplateLine)): compiled templates, longest first
    '''
    # pull the raw template strings to the driver
    rawTemplates = sc.textFile(templateFile).collect()

    # escape each template literally, then restore the skip-word
    # constructs, and rank longest-first
    matches = rankMatches([unescapeSkips(re.escape(t.strip()))
                           for t in rawTemplates])

    templateLines = list()
    for index, m in enumerate(matches):
        # '$' anchors the template at end of line too
        templateLines.append(TemplateLine(index,
                                          re.compile(m + '$'),
                                          getWordSkipNames(re.compile(m))))

    return templateLines


def unescapeSkips(s):
    '''
    find an escaped version of skip{m,n} words
    replace with unescaped version

    Args:
        s(string): string to search

    Returns:
        retval(string): string with replacement
    '''
    # \d+ (was \d) so skip counts of 10 or more are handled
    pattern = r'\\\(\\\:\\\?\\\ S\\\+\\\)\\\{(\d+)\\\,(\d+)\\\}'

    b = s

    # re.finditer never returns None; iterate directly
    for m in re.finditer(pattern, s, re.M | re.I):
        low = int(m.groups()[0])
        high = int(m.groups()[1])

        newString = r'((?:\ {0,1}\S+){%i,%i})' % (low, high)

        # the r prefix is very important
        newFound = r'\\\(\\:\\\?\\ S\\\+\\\)\\\{%i\\,%i\\\}' % (low, high)

        # use a callable replacement so newString is inserted literally:
        # as a template string its \S is rejected as a bad escape on
        # Python >= 3.7 (Python 2 happened to pass it through unchanged)
        b = re.sub(newFound, lambda _match: newString, b)

    return b


def rdd_MatchLine(line, templates):
    '''
    assign a log line to a templateId or -1 if no match
    keep track of any skip word replacements, return additional
    information in the LogLine named tuple

    Args:
        line(LogLine): logline being classified
        templates(broadcast(list(TemplateLine))): templates to attempt
                                                  to match, broadcast
                                                  variable
    Returns:
        retval(LogLine): LogLine with the final 3 fields filled in
                         template - actual template used for match
                         templateId - number of the template
                         templateDict- dictionary of skip word replacements
    '''

    for templateLine in templates.value:
        skipFound = templateLine.template.search(line.processed)

        if skipFound:
            # map each skip-word pattern to the text it captured
            templateDict = defaultdict(list)
            for name, captured in zip(templateLine.skipWords,
                                      skipFound.groups()):
                templateDict[name].append(captured)

            return LogLine(line.ts,
                           line.msg,
                           line.processed,
                           line.dictionary,
                           templateLine.template.pattern,
                           templateLine.id,
                           templateDict)

    # no template matched: the original referenced the loop-local
    # templateDict here, which raised NameError when templates.value
    # was empty -- always return a fresh empty dict instead
    return LogLine(line.ts,
                   line.msg,
                   line.processed,
                   line.dictionary,
                   None,
                   -1,
                   defaultdict(list))






def matchTemplates(sc, templateFile, rddLogLine):
    '''
    classify each preprocessed logline against the known templates

    Args:
        sc(sparkContext): spark context
        templateFile(string): uri to the template file in HDFS
        rddLogLine(RDD(LogLine)): preprocessed loglines

    Returns:
        retval(RDD(LogLine)): loglines with the template fields filled in
    '''

    templates = readTemplates(sc, templateFile)
    # broadcast so each executor receives the template list once
    templateBroadcast = sc.broadcast(templates)
    return rddLogLine.map(lambda line: rdd_MatchLine(line, templateBroadcast))

The remaining cells define the HDFS path constants and run the pipeline.


In [ ]:
timeit()

In [ ]:
# HDFS locations for the input logs, transform rules, and templates
logs = 'hdfs://namenode/magichour/tbird500k'
#logs = 'hdfs://namenode/user/dgrossman/tbird.log.10000.gz'
trans = 'hdfs://namenode/magichour/simpleTrans'
templates = 'hdfs://namenode/magichour/templates'

In [ ]:
# build the pipeline: read -> preprocess -> template match
# (lazy; nothing executes until an action such as count() below)
rddLogs = rdd_ReadLog(sc,logs)
procData = rdd_preProcess(sc,trans,rddLogs)
matched = matchTemplates(sc,templates,procData)

In [ ]:
matched.count()

In [ ]:
timeit()

In [ ]:
def bigStuff(line):
    '''
    predicate: True when any skip word in the line's templateDict
    captured more than one replacement value.

    Args:
        line(LogLine): matched logline to inspect

    Returns:
        retval(bool)
    '''
    # .values() instead of the Python-2-only .itervalues(), so this
    # runs under both Python 2 and 3; any() short-circuits like the
    # original early return did
    return any(len(v) > 1 for v in line.templateDict.values())

xx =  matched.filter(bigStuff)

In [ ]:
# pull a sample to the driver and show lines with skip-word captures
xxx = xx.take(1000)

for x in xxx:
    if x.templateDict is not None and len(x.templateDict) >= 1:
        print x

In [ ]:
matched.saveAsPickleFile('hdfs://namenode/magichour/matchedTemplates')