In [ ]:
import json
import re
from collections import OrderedDict
In [ ]:
# Define helper functions
def sanitize_key(key):
    # Replace characters that are illegal in SQL/Parquet column names
    return re.sub('[,;/]', '_', key)

def parseLine2(line):
    # Parse one auditd log line of space-delimited key=value pairs into an
    # OrderedDict. Single-quoted values (e.g. msg='...') can span several
    # segments and are parsed recursively as nested key=value messages.
    message = OrderedDict()
    firstTag = None
    continuing = None
    for seg in line.split(' '):
        if continuing:
            # Keep accumulating segments until one closes the quoted value
            continuing.append(seg)
            if seg[-1:] == "'":
                # Strip the surrounding quotes and recursively parse the body
                message[sanitize_key(firstTag)] = parseLine2(' '.join(continuing)[1:-1])
                continuing = None
        else:
            # Use find() rather than split('=') since values may themselves
            # contain '=' characters
            ind = seg.find('=')
            if ind != -1:
                left = seg[:ind]
                right = seg[ind+1:]
                if left == 'msg' and right.startswith('audit'):
                    # msg=audit(1440000000.123:456): extract the epoch timestamp
                    message['timestamp'] = float(re.search(r'audit\(([0-9]+\.[0-9]+)', seg).group(1))
                elif right.startswith("'"):
                    # Start of a quoted value that continues in later segments
                    firstTag = left
                    continuing = [right]
                else:
                    message[sanitize_key(left)] = right
            else:
                # Bare token with no '=': record the key with a null value
                message[sanitize_key(seg)] = None
    return message
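In [ ]:
# Sanity-check parseLine2 on a synthetic auditd line (hypothetical sample
# data, not drawn from the real log file)
sample = ("type=USER_ACCT msg=audit(1440000000.123:456): pid=1234 uid=0 "
          "msg='op=PAM:accounting acct=root exe=/usr/sbin/crond res=success'")
print json.dumps(parseLine2(sample))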
In [ ]:
# Load the raw audit logs from HDFS and repartition (gzip files are not
# splittable, so the file arrives as a single partition)
logs = sc.textFile("hdfs:///user/ytesfaye/lab41_logs_small.log.gz").repartition(10)
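In [ ]:
# Peek at one raw line to confirm the key=value auditd format before parsing
print logs.first()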
In [ ]:
# Parse each line, then round-trip through JSON so Spark SQL can infer a schema
log_map = logs.map(parseLine2).map(lambda x: json.dumps(x))
log_map_df = sqlCtx.jsonRDD(log_map)
# Uncomment to persist the parsed logs as Parquet for faster future loads:
#log_map_df.saveAsParquetFile('hdfs:///user/ytesfaye/lab41_logs_small_parquet')
In [ ]:
# If the parsed logs were already saved as Parquet, read them back directly
# instead of re-parsing the text file
log_map_df = sqlCtx.read.parquet('hdfs:///user/ytesfaye/lab41_logs_small_parquet').repartition(4)
log_map_df.registerTempTable('logs2')
In [ ]:
# View the schema that was generated for our parquet data
log_map_df.printSchema()
In [ ]:
# See what UIDs exist in logs
for row in sqlCtx.sql("select distinct(uid) from logs2").collect():
    print row
In [ ]:
# See what user accounts exist in logs
for acct in sqlCtx.sql('select distinct(msg.acct) from logs2 where msg.acct is not null').collect():
    print type(acct.acct), acct.acct
In [ ]:
# auditd hex-encodes account names containing special characters; decode one by hand
import binascii
acct_name = '28696E76616C6964207573657229'
print binascii.unhexlify(acct_name)
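In [ ]:
# A sketch of decoding every hex-encoded acct value. The heuristic (an
# even-length string of hex digits) is an assumption and could mis-fire on a
# real account name that happens to look like hex, e.g. 'cafe'.
def decode_acct(acct):
    if acct and re.match('^(?:[0-9A-Fa-f]{2})+$', acct):
        return binascii.unhexlify(acct)
    return acct

for acct in sqlCtx.sql('select distinct(msg.acct) from logs2 where msg.acct is not null').collect():
    print decode_acct(acct.acct)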
In [ ]:
# Build an integer code for each distinct value of the type field
lookup_code = {}
for counter, row in enumerate(sqlCtx.sql('select distinct(type) from logs2').collect()):
    lookup_code[row.type] = counter
print lookup_code
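In [ ]:
# Illustrative use of lookup_code: map each event's type string to its integer
# code (a sketch of how the codes might feed a numeric feature pipeline)
type_codes = log_map_df.map(lambda row: lookup_code[row.type])
print type_codes.take(10)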