In [ ]:
from collections import namedtuple
import re

# one parsed row of the transform definition file; 'compiled' holds the
# compiled form of the 'transform' pattern
TransformLine = namedtuple(
    'TransformLine',
    'id type NAME transform compiled')

# one log line as it moves through pre-processing; the reader fills in ts and
# msg, the later stages fill in the remaining fields
LogLine = namedtuple(
    'LogLine',
    'ts msg processed dictionary supportId')


def rdd_TransformLine(line):
    '''
    parse one line of the transform definition file into a TransformLine

    Args:
        line(string): line from the transform definition file, in the form
                      id,type,name,transform
                      lines whose first non-blank character is # are
                      comments; blank lines are handled the same way

    Returns:
        retval(TransformLine): the parsed transform with its regex compiled,
                               or a TransformLine with every field set to
                               'COMMENT' for comment and blank lines

    Raises:
        IndexError: the line has fewer than four comma separated fields
        ValueError: the id field is not an integer
        re.error: the transform field is not a valid regular expression
    '''

    stripped = line.strip()

    # an empty or whitespace-only line has no first character to inspect,
    # so it is folded into the COMMENT placeholder instead of failing
    if not stripped or stripped.startswith('#'):
        return TransformLine(*(['COMMENT'] * 5))

    # id,type,name,transform -- split at most 3 times so commas inside the
    # regex stay part of the transform field
    fields = stripped.split(',', 3)
    return TransformLine(int(fields[0]),
                         fields[1],
                         fields[2],
                         fields[3],
                         re.compile(fields[3]))


def rdd_LogLine(line):
    '''
    build a LogLine from one raw log line

    NOTE: a two argument rdd_LogLine defined later in this file replaces
          this definition once that cell has been run

    Args:
        line(string): raw text of the log line

    Returns:
        retval(LogLine): ts and msg are filled in, the remaining fields
                         are None
    '''

    # depends on tbird log structure: after splitting on single spaces the
    # third field is used as the timestamp, everything after it is the message
    fields = line.strip().split(' ', 3)
    timestamp = float(fields[2])
    message = fields[3]
    return LogLine(timestamp, message, None, None, None)


def lineRegexReplacement(line, logTrans):
    '''
    apply the regex replacements to one log line and record every
    replacement that was made

    Args:
        line(LogLine): logline to work on, only ts and msg are read
        logTrans: object whose value attribute iterates over TransformLine
                  entries, applied in order (rdd_preProcess passes a
                  broadcast of the transform list)

    Returns:
        retval(LogLine): ts copied from the input, msg stripped of
                         surrounding whitespace, processed holding the
                         rewritten text with whitespace runs collapsed,
                         dictionary mapping each transform NAME to the
                         list of findall results it replaced,
                         supportId None
    '''

    text = line.msg.strip()
    replaceDict = dict()

    for t in logTrans.value:
        if t.type == 'REPLACE':
            # capture what is about to be replaced before rewriting the text
            replaceList = t.compiled.findall(text)
            if replaceList:
                replaceDict[t.NAME] = replaceList
            # no count argument: every match is replaced
            text = t.compiled.sub(t.NAME, text)
        elif t.type == 'REPLACELIST':
            # call form of print is valid on both python 2 and python 3
            print('REPLACELIST not implemented yet')

    # split/join collapses whitespace runs and trims both ends
    processed = ' '.join(text.split())

    return LogLine(line.ts, line.msg.strip(), processed, replaceDict, None)


def readTransforms(sc, transFile):
    '''
    load the transform definitions used for replacement processing

    Args:
        sc(sparkContext): spark context
        transFile(string): uri of the transform file, read with sc.textFile

    Returns:
        retval(list(TransformLine)): parsed transforms in file order, with
                                     the COMMENT placeholder entries removed
    '''

    # parse every line of the transform file on the cluster
    parsed = sc.textFile(transFile).map(rdd_TransformLine).cache()

    # pull the result back to the driver and drop the comment placeholders
    return [entry for entry in parsed.collect() if entry.id != 'COMMENT']


def logPreProcess(sc, logTrans, rrdLogLine):
    '''
    run lineRegexReplacement over every entry of rrdLogLine

    Args:
        sc(sparkContext): spark context, not used here
        logTrans: transforms handed to lineRegexReplacement, which reads
                  its value attribute (rdd_preProcess passes a broadcast
                  of the transform list)
        rrdLogLine(RDD(LogLine)): log lines to pre-process

    Returns:
        retval(RDD(LogLine)): result of mapping lineRegexReplacement over
                              rrdLogLine
    '''

    def _applyTransforms(entry):
        # logTrans is captured by the closure so that it travels with the
        # mapped function
        return lineRegexReplacement(entry, logTrans)

    return rrdLogLine.map(_applyTransforms)


def rdd_preProcess(sc, logTrans, rrdLogLine):
    '''
    make a rdd of preprocessed loglines

    Args:
        sc(sparkContext): sparkContext
        logTrans(string): uri of the transform file, handed to
                          readTransforms
        rrdLogLine(RDD(LogLine)): log lines to pre-process

    Returns:
        retval(RDD(LogLine)): whatever logPreProcess returns for these
                              log lines and the broadcast transform list
    '''

    transformList = readTransforms(sc, logTrans)

    # broadcast the list and pass the broadcast on to logPreProcess
    broadcastTrans = sc.broadcast(transformList)

    return logPreProcess(sc, broadcastTrans, rrdLogLine)

In [ ]:
def procLogLine(line,logFile):
    '''
    logfile specific parsing of one input line

    The line is stripped of surrounding whitespace and split on single
    spaces at most three times; the first two fields are discarded.

    Args:
        line(string): text to process
        logFile(string): hint of URI used for input, currently unused
                         should use for switching parsing
                         based off different directories

    Returns:
        retval(list[string]): the third field and the remainder of the
                              line, intended as [ts, msg]; shorter when
                              the line has fewer than four fields
    '''
    fields = line.strip().split(' ', 3)
    return fields[2:]

def rdd_LogLine(line,logFile):
    '''
    build a LogLine from one raw log line

    Args:
        line(string): raw text of the log line
        logFile(string): what URI the log lines came from, passed
                         through to procLogLine

    Returns:
        retval(LogLine): ts and msg are filled in, the remaining fields
                         are None
    '''
    parts = procLogLine(line, logFile)
    timestamp = float(parts[0])
    message = parts[1]
    return LogLine(timestamp, message, None, None, None)

def rdd_ReadLog(sc, logFile):
    '''
    read a log/directory into LogLine RDD format
    NOTE: only ts, and msg are populated

    Args:
        sc(sparkContext): spark context used to read the text
        logFile(string): URI to file to process

    Returns:
        retval(RDD(LogLine)): one LogLine per line read from the
                              logFile URI
    '''
    def _toLogLine(rawLine):
        # logFile is captured so the parser knows where the line came from
        return rdd_LogLine(rawLine, logFile)

    rawLines = sc.textFile(logFile)
    return rawLines.map(_toLogLine)

In [ ]:
# URI of the raw log data handed to rdd_ReadLog
logs = 'hdfs://namenode/magichour/tbird2.log'
# alternate input, currently disabled
#logs = 'hdfs://namenode/user/dgrossman/tbird.log.10000.gz'
# URI of the transform definition file handed to rdd_preProcess
trans = 'hdfs://namenode/magichour/simpleTrans'
# URI the pre-processed log lines are saved to
outDir = 'hdfs://namenode/magichour/2FebTry2'

In [ ]:
# NOTE(review): sc is not defined anywhere in this file; presumably the
# SparkContext supplied by the notebook kernel -- confirm
# read the raw log into an RDD of LogLine (only ts and msg populated)
rddLogs = rdd_ReadLog(sc,logs)
# apply the regex transforms from the transform file to every log line
outData = rdd_preProcess(sc,trans,rddLogs)
# write the pre-processed lines out as text under outDir
outData.saveAsTextFile(outDir)

In [ ]: