In [ ]:
from collections import namedtuple
import re

# LogLine: record type for one parsed log entry.
#   ts         - timestamp (stored as float by rdd_LogLine below)
#   msg        - remainder of the log message after the timestamp
#   processed  - unset here (None); presumably filled by a later stage — TODO confirm
#   dictionary - unset here (None); semantics not visible in this file — TODO confirm
#   supportId  - unset here (None); semantics not visible in this file — TODO confirm
LogLine = namedtuple('LogLine', ['ts', 'msg',
                                 'processed', 'dictionary', 'supportId'])

In [ ]:
def procLogLine(line, logFile):
    '''
    handles the logfile-specific parsing of input lines into 2 parts
    ts: timestamp float
    msg: the rest of the message

    Args:
        line(string): text to process
        logFile(string): hint of URI used for input
                         should use for switching parsing
                         based off different directories

    Returns:
        retval(list[string,string]): [ts, msg]
    '''
    # strip() already removes BOTH leading and trailing whitespace, so the
    # original strip().rstrip() chain was redundant.
    # split(' ', 3) yields at most 4 fields; fields 0-1 are discarded,
    # field 2 is the timestamp, and the rest of the line (field 3) is
    # kept intact as the message.
    return line.strip().split(' ', 3)[2:]

def rdd_LogLine(line, logFile):
    '''
    process a log line into a RDD

    Args:
        line(string): string from the logline
        logFile(string): what URI the log lines came from,
                         eventually want to do different parsing
                         based on the base of the URI

    Returns:
        retval(LogLine): fills in the first two portions of the LogLine
                         namedtuple; the remaining fields stay None
    '''
    parts = procLogLine(line, logFile)
    return LogLine(ts=float(parts[0]),
                   msg=parts[1],
                   processed=None,
                   dictionary=None,
                   supportId=None)

def rdd_ReadLog(sc, logFile):
    '''
    read a log/directory into LogLine RDD format
    NOTE: only ts and msg are populated

    Args:
        sc(sparkContext)
        logFile(string): URI of the file to process

    Returns:
        retval(RDD(LogLine)): RDD of logs read from the logFile URI
    '''
    rawLines = sc.textFile(logFile)
    # close over logFile so every mapped line carries its source URI
    return rawLines.map(lambda line: rdd_LogLine(line, logFile))

In [ ]:
logs = 'hdfs://namenode/user/dgrossman/tbird.log.10000.gz'

In [ ]:
# Smoke-test the pipeline: build the RDD and pull the first 10 records.
# NOTE(review): `sc` is not defined in this file — presumably the
# SparkContext provided by the notebook/driver environment; confirm.
test = rdd_ReadLog(sc,logs)
test.take(10)