In [ ]:
# Read the gzipped log file as an RDD of text lines and redistribute it
# across 30 partitions. (sc is presumably the notebook's SparkContext.)
logs = sc.textFile("hdfs://namenode/datasets/magichour/tbird.log.gz").repartition(30)
# Read the transform definition file as an RDD of text lines; each
# non-comment line is parsed by get_Transforms as "id,type,name,transform".
simpleTransformations = sc.textFile("hdfs://namenode/magichour/simpleTrans")
In [ ]:
#logs.count()
#simpleTransformations.count()
In [ ]:
from collections import namedtuple
import re
# One parsed transform definition: numeric id, type, replacement NAME, the
# raw transform pattern and its compiled regex.
TransformLine = namedtuple('TransformLine', 'id type NAME transform compiled')
# One log line: ts and msg come from the raw line; processed, dictionary and
# supportId start out as None and are filled in by later steps.
LogLine = namedtuple('LogLine', 'ts msg processed dictionary supportId')
def get_Transforms(line):
    '''
    parse one line of the transform definition file
    Args:
        line(string): "id,type,name,transform" text. Blank lines and lines
                      whose first non-blank character is # carry no
                      transform
    Returns:
        retval(TransformLine): the parsed transform, or an all-'COMMENT'
                               placeholder for blank and comment lines
    '''
    stripped = line.strip()
    # A blank line used to raise IndexError on [0]; return the same
    # placeholder as for comments so it is dropped by the id != 'COMMENT'
    # filter that runs after collect().
    if not stripped or stripped.startswith('#'):
        return TransformLine('COMMENT', 'COMMENT', 'COMMENT', 'COMMENT', 'COMMENT')
    # Split at most three times so commas inside the regex are preserved.
    fields = stripped.split(',', 3)  # id,type,name,transform
    return TransformLine(int(fields[0]), fields[1], fields[2], fields[3],
                         re.compile(fields[3]))
# Parse every definition line into a TransformLine (comment lines become
# 'COMMENT' placeholders) and cache the parsed RDD for reuse.
logTransforms = simpleTransformations.map(get_Transforms).cache()
#logTransformList = sc.broadcast(list(simpleTransformations))
In [ ]:
# logTransforms.take(5)
In [ ]:
# Bring the parsed transforms back to the driver, discard the 'COMMENT'
# placeholders and broadcast what is left.
a = logTransforms.collect()
lTrans = [l for l in a if l.id != 'COMMENT']
logTrans = sc.broadcast(lTrans)
In [ ]:
In [ ]:
# NOTE(review): tsLine and makeTransformedLine are not defined at module level
# in any visible cell (tsLine only exists as a local inside doPreProcess), so
# this cell presumably relies on definitions that were removed -- confirm.
potato = tsLine.map(makeTransformedLine)
In [ ]:
# Preview the first 100 records of potato.
potato.take(100)
In [ ]:
# Count the records in potato.
potato.count()
In [ ]:
from collections import namedtuple
import re
# One parsed entry of the transform definition file.
TransformLine = namedtuple(
    'TransformLine',
    ('id', 'type', 'NAME', 'transform', 'compiled'),
)
# One log line. rdd_LogLine fills ts and msg; lineRegexReplacement fills
# processed and dictionary.
LogLine = namedtuple(
    'LogLine',
    ('ts', 'msg', 'processed', 'dictionary', 'supportId'),
)
def rdd_TransformLine(line):
    '''
    process one line of the transform definition file into a TransformLine
    Args:
        line(string): line from the transform definition file, formatted
                      as "id,type,name,transform". Blank lines and lines
                      beginning with # carry no transform and will need
                      to be removed by the caller
    Returns:
        retval(TransformLine): namedTuple representation of the tasking,
                               or an all-'COMMENT' placeholder for blank
                               and comment lines
    '''
    stripped = line.strip()
    # A blank line used to raise IndexError on [0]; hand back the same
    # placeholder as for comments so callers filtering on
    # id != 'COMMENT' drop it too.
    if not stripped or stripped.startswith('#'):
        return TransformLine('COMMENT',
                             'COMMENT',
                             'COMMENT',
                             'COMMENT',
                             'COMMENT')
    # id,type,name,transform -- split at most three times so commas inside
    # the regex stay part of the transform
    fields = stripped.split(',', 3)
    return TransformLine(int(fields[0]), fields[1], fields[2], fields[3],
                         re.compile(fields[3]))
def rdd_LogLine(line):
    '''
    build a LogLine from one raw line of the log file
    Args:
        line(string): raw text of the logline
    Returns:
        retval(LogLine): ts and msg populated, the remaining fields None
    '''
    # Layout depends on the tbird log structure: splitting on single spaces,
    # the third field is parsed as the timestamp and everything after it is
    # kept as the message.
    fields = line.strip().split(' ', 3)
    timestamp = float(fields[2])
    message = fields[3]
    return LogLine(timestamp, message, None, None, None)
def lineRegexReplacement(line):
    '''
    apply the broadcast list of regex replacements to a line, making note
    of all the replacements performed in a dictionary(list)
    Args:
        line(LogLine): logline to work on
    Globals:
        logTrans(broadcast list of TransformLine): replacements to make,
            applied in list order
    Returns:
        retval(LogLine): logline with the processed, and dictionary portions
                         filled in; msg is returned stripped
    '''
    original = line.msg.strip()
    text = original
    replaceDict = dict()
    # logTrans is broadcast; it is only read here, so no global statement
    # is required
    for t in logTrans.value:
        if t.type == 'REPLACE':
            replaceList = t.compiled.findall(text)
            # no matches means sub() would leave text untouched, so the
            # second regex pass is only made when something was found
            if replaceList:
                replaceDict[t.NAME] = replaceList
                text = t.compiled.sub(t.NAME, text, 0)
        elif t.type == 'REPLACELIST':
            # call form prints the same text on Python 2 and also parses
            # on Python 3
            print('REPLACELIST not implemented yet')
    # collapse runs of whitespace; the joined result has no leading or
    # trailing blanks, so no further stripping is needed
    processed = ' '.join(text.split())
    return LogLine(line.ts, original, processed, replaceDict, None)
def readTransforms(transFile):
    '''
    load the transform definition file and parse every line
    Args:
        transFile(string): location of the transform definitions
    Returns:
        retval(RDD(TransformLine)): cached RDD with one entry per line of
                                    the file, comment placeholders included
    '''
    return sc.textFile(transFile).map(rdd_TransformLine).cache()
def doPreProcess(logFile, transFile, partitions):
    '''
    take a series of loglines and pre-process the lines
    replace ipaddresses, directories, urls, etc with constants
    keep a dictionary of the replacements done to the line
    Args:
        logFile(string): location of the log data in HDFS
        transFile(string): location of the replacement regex in HDFS
        partitions(int): number of partitions to apply to the logFile
    Globals:
        logTrans: rebound to a broadcast of the transforms read from
                  transFile, because lineRegexReplacement reads it
    Returns:
        retval(RDD(LogLines)): preprocessed log lines ready for next
                               stage of processing
    '''
    # lineRegexReplacement looks up the module-level logTrans. Without this
    # declaration the broadcast below was bound to a local name, so the
    # transforms loaded from transFile were never the ones applied.
    global logTrans

    # read the logs
    logs = sc.textFile(logFile).repartition(partitions)

    # read the transforms on the driver, removing comment placeholders
    transforms = readTransforms(transFile).collect()
    logTrans = sc.broadcast([t for t in transforms if t.id != 'COMMENT'])

    # parse each raw line, then apply the broadcast replacements to it
    return logs.map(rdd_LogLine).map(lineRegexReplacement)
In [ ]:
# Input locations passed to doPreProcess. The second assignment to logs
# overrides the first, so only tbird.log.10000.gz is actually used; comment
# it out to process the first file instead.
logs = 'hdfs://namenode/datasets/magichour/tbird.log.gz'
logs = 'hdfs://namenode/user/dgrossman/tbird.log.10000.gz'
# Location of the transform definition file.
trans = 'hdfs://namenode/magichour/simpleTrans'
In [ ]:
# Build the preprocessing pipeline over the selected log file, using a
# single partition.
out = doPreProcess(logs,trans,1)
In [ ]:
# Fetch the first 100 preprocessed LogLine records for inspection.
out.take(100)
In [ ]: