In [ ]:
from collections import namedtuple
import re
# One parsed line of the transform definition file.
# id/type/NAME/transform come straight from the CSV-ish file; compiled is
# the re.compile() of the transform field. Comment lines are encoded with
# every field set to the string 'COMMENT' (filtered out in readTransforms).
TransformLine = namedtuple('TransformLine',
['id', 'type', 'NAME', 'transform', 'compiled'])
# One log record moving through pre-processing. Only ts and msg are set at
# read time; processed and dictionary are filled in by lineRegexReplacement;
# supportId is always None in the code visible here.
LogLine = namedtuple('LogLine', ['ts', 'msg',
'processed', 'dictionary', 'supportId'])
def rdd_TransformLine(line):
    '''
    process transformations into RDD format

    Args:
        line(string): line from the transform definition file.
                      lines beginning with # are considered comments
                      and will need to be removed

    Returns:
        retval(TransformLine): namedTuple representation of the tasking;
                               comment/blank lines yield an all-'COMMENT'
                               placeholder for later filtering
    '''
    stripped = line.strip()

    # Blank/whitespace-only lines previously raised IndexError on [0];
    # treat them like comments so readTransforms() filters them out.
    if not stripped or stripped[0] == '#':
        return TransformLine('COMMENT',
                             'COMMENT',
                             'COMMENT',
                             'COMMENT',
                             'COMMENT')

    # expected format: id,type,name,transform
    # (maxsplit=3 so the transform regex itself may contain commas)
    parts = stripped.split(',', 3)
    return TransformLine(int(parts[0]), parts[1], parts[2], parts[3],
                         re.compile(parts[3]))
def rdd_LogLine(line):
    '''
    turn one raw log line into a partially populated LogLine

    Args:
        line(string): string from the logline

    Returns:
        retval(LogLine): fills in the first two portions of the LogLine
                         namedtuple
    '''
    # depends on tbird log structure: the third space-separated field is
    # the timestamp, everything after it is the message
    fields = line.strip().split(' ', 3)
    timestamp = float(fields[2])
    message = fields[3]
    return LogLine(timestamp, message, None, None, None)
def lineRegexReplacement(line, logTrans):
    '''
    apply a list of regex replacements to a line, make note of
    all the replacements performed in a dictionary(list)

    Args:
        line(LogLine): logline to work on
        logTrans(broadcast(list(TransformLine))): replacements to make

    Returns:
        retval(LogLine): logline with the processed, and dictionary portions
                         filled in
    '''
    text = line.msg.strip()
    replaceDict = dict()

    for t in logTrans.value:
        if t.type == 'REPLACE':
            # record every match so the substitutions remain traceable
            matches = t.compiled.findall(text)
            if matches:
                replaceDict[t.NAME] = matches
            # replace all occurrences with the transform's symbolic name
            text = t.compiled.sub(t.NAME, text, 0)

        if t.type == 'REPLACELIST':
            # was a Python-2-only print statement; the function-call form
            # behaves identically on Python 2 and also runs on Python 3
            print('REPLACELIST not implemented yet')

    # collapse whitespace runs introduced by the substitutions
    processed = ' '.join(text.split())
    return LogLine(line.ts, line.msg.strip(),
                   processed.strip(), replaceDict, None)
def readTransforms(sc, transFile):
    '''
    returns a list of transforms for replacement processing

    Args:
        sc(sparkContext): spark context
        transFile(string): uri to the transform file in HDFS

    Returns:
        retval(list(TransformLine))
    '''
    # read and parse the transform definitions, then pull them to the driver
    parsed = sc.textFile(transFile).map(rdd_TransformLine).cache()
    # drop the placeholder entries produced for comment lines
    return [t for t in parsed.collect() if t.id != 'COMMENT']
def logPreProcess(sc, logTrans, rrdLogLine):
    '''
    take a series of loglines and pre-process the lines
    replace ipaddresses, directories, urls, etc with constants
    keep a dictionary of the replacements done to the line

    Args:
        sc(sparkContext): spark context
        logTrans(broadcast(list(TransformLine))): broadcast transform list
        rrdLogLine(RDD(LogLine)): raw loglines to process

    Returns:
        retval(RDD(LogLines)): preprocessed log lines ready for next
                               stage of processing
    '''
    # named closure captures the broadcast so it ships to the executors
    def applyTransforms(line):
        return lineRegexReplacement(line, logTrans)

    return rrdLogLine.map(applyTransforms)
def rdd_preProcess(sc, logTrans, rrdLogLine):
    '''
    make a rdd of preprocessed loglines

    Args:
        sc(sparkContext): sparkContext
        logTrans(string): location of the transFile in HDFS
        rrdLogLine(RDD(LogLine)): raw loglines to process

    Returns:
        retval(RDD(LogLines)): preprocessed log lines ready for next
                               stage of processing
    '''
    # ship the transform list to every executor as a broadcast variable
    broadcastTrans = sc.broadcast(readTransforms(sc, logTrans))
    return logPreProcess(sc, broadcastTrans, rrdLogLine)
In [ ]:
def procLogLine(line, logFile):
    '''
    handles the logfile specific parsing input lines into 2 parts
    ts: timestamp float
    msg: the rest of the message

    Args:
        line(string): text to process
        logFile(string): hint of URI used for input
                         should use for switching parsing
                         based off different directories

    Returns:
        retval(list[string,string]): [ts, msg]
    '''
    # tbird layout: skip the first two space-separated fields; field 3 is
    # the timestamp and whatever follows is the message
    fields = line.strip().split(' ', 3)
    return fields[2:]
def rdd_LogLine(line, logFile):
    '''
    process a log line into a RDD

    Args:
        line(string): string from the logline
        logFile(string): what URI the log lines came from,
                         eventually want to do different parsing
                         based on the base of the URI

    Returns:
        retval(LogLine): fills in the first two portions of the LogLine
                         namedtuple
    '''
    # delegate logfile-specific splitting; parts is [ts-string, msg]
    parts = procLogLine(line, logFile)
    return LogLine(float(parts[0]), parts[1], None, None, None)
def rdd_ReadLog(sc, logFile):
    '''
    read a log/directory into LogLine RDD format
    NOTE: only ts, and msg are populated

    Args:
        sc(sparkContext)
        logFile(string): URI to file toprocess

    Returns:
        retval(RDD(LogLines): RDD of logs read from the LogFile URI
    '''
    # logFile is also passed along as a parsing hint for rdd_LogLine
    def parseLine(line):
        return rdd_LogLine(line, logFile)

    return sc.textFile(logFile).map(parseLine)
In [ ]:
# HDFS URIs for this run: input log corpus, transform definitions, output
logs = 'hdfs://namenode/magichour/tbird2.log'
#logs = 'hdfs://namenode/user/dgrossman/tbird.log.10000.gz'
trans = 'hdfs://namenode/magichour/simpleTrans'
outDir = 'hdfs://namenode/magichour/2FebTry2'
In [ ]:
# driver: read the logs, apply the regex transforms, persist the result
# (sc is the SparkContext supplied by the notebook environment)
rddLogs = rdd_ReadLog(sc,logs)
outData = rdd_preProcess(sc,trans,rddLogs)
outData.saveAsTextFile(outDir)
In [ ]: