In [ ]:
# Raw log lines read from HDFS, redistributed across 30 partitions.
logs = sc.textFile("hdfs://namenode/datasets/magichour/tbird.log.gz").repartition(30)
# Unparsed transform definition lines; they are turned into TransformLine
# tuples by get_Transforms in a later cell.
simpleTransformations = sc.textFile("hdfs://namenode/magichour/simpleTrans")

In [ ]:
#logs.count()
#simpleTransformations.count()

In [ ]:
from collections import namedtuple
import re

# One parsed row of the transform definition file:
#   id, type, NAME, transform (regex source text), compiled (compiled regex).
TransformLine = namedtuple('TransformLine',['id','type','NAME','transform','compiled'])
# One log entry; rows start with only ts/msg filled, the other fields are
# populated by later processing steps.
LogLine = namedtuple('LogLine', ['ts','msg','processed','dictionary','supportId'])

def get_Transforms(line):
    '''
    Parse one line of the transform definition file.

    Args:
        line(string): "id,type,name,transform" definition line. Lines whose
                      first non-blank character is '#' are comments; blank
                      lines are treated the same way.

    Returns:
        retval(TransformLine): the parsed definition, or a placeholder with
                               every field set to 'COMMENT' for comment and
                               blank lines (callers filter on id).

    Raises:
        ValueError: the id field is not an integer
        IndexError: fewer than four comma-separated fields
        re.error: the transform is not a valid regular expression
    '''
    stripped = line.strip()

    # An empty string has no [0]; treating blank lines as comments keeps a
    # stray empty line in the definition file from aborting the whole job.
    if not stripped or stripped[0] == '#':
        return TransformLine('COMMENT','COMMENT','COMMENT','COMMENT','COMMENT')

    # maxsplit=3 so commas inside the regex itself are preserved
    l = stripped.split(',', 3)  #id,type,name,transform
    return TransformLine(int(l[0]),l[1],l[2],l[3],re.compile(l[3]))

# Parse every definition line into a TransformLine (comment lines become
# 'COMMENT' placeholders) and mark the resulting RDD for caching.
logTransforms = simpleTransformations.map(get_Transforms).cache()
#logTransformList = sc.broadcast(list(simpleTransformations))

In [ ]:
# logTransforms.take(5)

In [ ]:
# Pull the parsed definitions back to the driver, drop the 'COMMENT'
# placeholders, and ship the remaining transforms to the workers as a
# broadcast variable.
a = logTransforms.collect()
lTrans = [entry for entry in a if entry.id != 'COMMENT']

logTrans= sc.broadcast(lTrans)

In [ ]:

def makeTransformedLine(l):
    '''
    Apply the broadcast regex replacements to a log line and record every
    replacement that was performed.

    Args:
        l(LogLine): logline to work on

    Globals:
        logTrans(Broadcast(list(TransformLine))): replacements to apply,
                                                  in list order

    Returns:
        retval(LogLine): logline with the processed and dictionary
                         portions filled in
    '''
    msg = l.msg.strip()
    text = msg
    replaceDict = dict()

    # logTrans is broadcast
    for t in logTrans.value:
        if t.type == 'REPLACE':
            # remember what was matched before it is overwritten by NAME
            replaceList = t.compiled.findall(text)
            if replaceList:
                replaceDict[t.NAME] = replaceList
            text = t.compiled.sub(t.NAME, text, 0)
        elif t.type == 'REPLACELIST':
            # single-argument call form prints the same text on Python 2 and 3
            print('REPLACELIST not implemented yet')

    # collapse runs of whitespace; the join leaves no leading/trailing blanks
    processed = ' '.join(text.split())
    return LogLine(l.ts, msg, processed, replaceDict, None)

In [ ]:
# tsLine is not created by any earlier cell, so on a fresh kernel this cell
# failed with NameError. Build it here from the raw log RDD first.
def _toLogLine(line):
    '''Split a raw log line into a LogLine with only ts and msg filled in.'''
    # the third space-separated field is read as the timestamp, the rest of
    # the line as the message
    fields = line.strip().split(' ', 3)
    return LogLine(float(fields[2]), fields[3], None, None, None)

tsLine = logs.map(_toLogLine)
potato = tsLine.map(makeTransformedLine)

In [ ]:
# Inspect the first 100 transformed log lines.
potato.take(100)

In [ ]:
# Number of transformed log lines.
potato.count()

In [ ]:
from collections import namedtuple
import re

# One parsed row of the transform definition file:
#   id, type, NAME, transform (regex source text), compiled (compiled regex).
TransformLine = namedtuple('TransformLine',
                           ['id', 'type', 'NAME', 'transform', 'compiled'])

# One log entry; rows start with only ts/msg filled, the other fields are
# populated by later processing steps.
LogLine = namedtuple('LogLine', ['ts', 'msg',
                                 'processed', 'dictionary', 'supportId'])


def rdd_TransformLine(line):
    '''
    process transformations into RDD format

    Args:
        line(string): line from the transform definition file, in the form
                      "id,type,name,transform".
                      lines beginning with # are considered comments;
                      blank lines are treated the same way. Both come back
                      as placeholders that the caller needs to remove.
    Returns:
        retval(TransformLine): namedTuple representation of the tasking,
                               or a TransformLine with every field set to
                               'COMMENT' for comment/blank lines

    Raises:
        ValueError: the id field is not an integer
        IndexError: fewer than four comma-separated fields
        re.error: the transform is not a valid regular expression
    '''

    stripped = line.strip()

    # An empty string has no [0]; treating blank lines as comments keeps a
    # stray empty line in the definition file from aborting the whole job.
    if not stripped or stripped[0] == '#':
        return TransformLine('COMMENT',
                             'COMMENT',
                             'COMMENT',
                             'COMMENT',
                             'COMMENT')

    # id,type,name,transform
    # maxsplit=3 so commas inside the regex itself are preserved
    l = stripped.split(',', 3)
    return TransformLine(int(l[0]), l[1], l[2], l[3], re.compile(l[3]))


def rdd_LogLine(line):
    '''
    build a partially filled LogLine from one raw log line

    Args:
        line(string): raw text of a single log entry

    Returns:
        retval(LogLine): ts and msg are populated, the remaining
                         fields are left as None
    '''

    # layout is specific to the tbird log: the third space-separated field
    # is read as the timestamp, everything after it is the message
    fields = line.strip().split(' ', 3)
    timestamp = float(fields[2])
    message = fields[3]
    return LogLine(ts=timestamp, msg=message, processed=None,
                   dictionary=None, supportId=None)


def lineRegexReplacement(line):
    '''
    apply a list of regex replacements to a line, make note of
    all the replacements performed in a dictionary(list)

    Args:
        line(LogLine): logline to work on

    Globals:
        logTrans(Broadcast(list(TransformLine))): replacements to apply,
                                                  in list order

    Returns:
        retval(LogLine): logline with the processed, and dictionary portions
                         filled in
    '''
    # logTrans is only read here, so no `global` declaration is needed
    msg = line.msg.strip()
    text = msg
    replaceDict = dict()

    # logTrans is broadcast
    for t in logTrans.value:
        if t.type == 'REPLACE':
            # remember what was matched before it is overwritten by NAME
            replaceList = t.compiled.findall(text)
            if replaceList:
                replaceDict[t.NAME] = replaceList
            text = t.compiled.sub(t.NAME, text, 0)

        elif t.type == 'REPLACELIST':
            # single-argument call form prints the same text on Python 2 and 3
            print('REPLACELIST not implemented yet')

    # collapse runs of whitespace; the join leaves no leading/trailing blanks
    processed = ' '.join(text.split())
    retVal = LogLine(line.ts, msg, processed, replaceDict, None)

    return retVal


def readTransforms(transFile):
    '''
    load the transform definition file and parse each of its lines

    Args:
        transFile(string): location of the transform definitions

    Returns:
        retval(RDD(TransformLine)): cached RDD with one TransformLine per
                                    input line (comment lines are still
                                    present as 'COMMENT' placeholders)
    '''
    rawDefinitions = sc.textFile(transFile)
    parsedDefinitions = rawDefinitions.map(rdd_TransformLine)
    return parsedDefinitions.cache()

   

def doPreProcess(logFile, transFile, partitions):
    '''
    take a series of loglines and pre-process the lines
    replace ipaddresses, directories, urls, etc with constants
    keep a dictionary of the replacements done to the line

    Args:
        logFile(string): location of the log data in HDFS
        transFile(string): location of the replacement regex in HDFS
        partitions(int): number of partitions to apply to the logFile

    Globals:
        logTrans: rebound to a broadcast of the transforms read from
                  transFile; lineRegexReplacement reads it by that name

    Returns:
        retval(RDD(LogLines)): preprocessed log lines ready for next
                               stage of processing
    '''
    # lineRegexReplacement looks the broadcast up through the module-level
    # name logTrans. Without this declaration the assignment below only
    # creates a local, so the workers would use whatever logTrans an earlier
    # cell left behind (ignoring transFile) or fail with NameError.
    global logTrans

    # read the logs
    logs = sc.textFile(logFile).repartition(partitions)

    # read the transforms, removing comments
    logTransforms = readTransforms(transFile)
    lTrans = [t for t in logTransforms.collect() if t.id != 'COMMENT']

    logTrans = sc.broadcast(lTrans)

    tsLine = logs.map(rdd_LogLine)
    return tsLine.map(lineRegexReplacement)

In [ ]:
# Input locations passed to doPreProcess.
# NOTE: the second assignment to `logs` overrides the first, so only the
# tbird.log.10000.gz file is actually used (presumably a smaller sample of
# the full log -- confirm).
logs = 'hdfs://namenode/datasets/magichour/tbird.log.gz'
logs = 'hdfs://namenode/user/dgrossman/tbird.log.10000.gz'
# Transform (regex replacement) definition file.
trans = 'hdfs://namenode/magichour/simpleTrans'

In [ ]:
# Run the pre-processing pipeline on the selected log file with 1 partition.
out = doPreProcess(logs,trans,1)

In [ ]:
# Inspect the first 100 pre-processed log lines.
out.take(100)

In [ ]: