In [ ]:
import binascii
import json
import re
from collections import OrderedDict

In [ ]:
# Define Helper functions
def sanitize_key(key):
    """Return ``key`` with every ',', ';' and '/' replaced by an underscore."""
    unsafe_chars = re.compile('[,;/]')
    cleaned = unsafe_chars.sub('_', key)
    return cleaned


def parseLine2(line):
    """Parse one space-separated log line into an ordered key/value mapping.

    The line is split on single spaces and each token is handled as follows:

    * ``msg=audit(<digits>.<digits>...`` stores the number as a float under
      ``'timestamp'``; nothing is stored under ``'msg'`` for this token.
    * ``key='...`` opens a single-quoted value that runs up to the first
      token ending in ``'`` (which may be the opening token itself). The
      text between the quotes is parsed recursively and stored under
      ``key`` as a nested mapping.
    * ``key=value`` stores ``value`` as a string (split on the first ``=``).
    * A non-empty token without ``=`` is stored as a key mapped to ``None``.

    Keys other than ``'timestamp'`` are passed through ``sanitize_key``.
    Empty tokens produced by consecutive spaces are ignored outside quoted
    values, so they never create an empty-string key.

    Args:
        line: the raw log line (a string).

    Returns:
        An ``OrderedDict`` of the parsed fields in insertion order.

    Raises:
        AttributeError: if a ``msg=audit...`` token does not contain
            ``audit(<digits>.<digits>``.
    """
    message = OrderedDict()
    quoted_key = None
    quoted_parts = None  # tokens of the quoted value currently being collected
    for seg in line.split(' '):
        if quoted_parts is not None:
            # Inside a quoted value: keep every token (even empty ones, so the
            # original spacing survives the join) until one ends with a quote.
            quoted_parts.append(seg)
            if seg.endswith("'"):
                inner = ' '.join(quoted_parts)[1:-1]
                message[sanitize_key(quoted_key)] = parseLine2(inner)
                quoted_parts = None
            continue

        if not seg:
            # Consecutive spaces (or an empty line) yield empty tokens.
            continue

        ind = seg.find('=')
        if ind == -1:
            message[sanitize_key(seg)] = None
            continue

        left = seg[:ind]
        right = seg[ind + 1:]
        if left == 'msg' and right.startswith('audit'):
            # Raw string with an escaped dot so only a literal '.' separates
            # the two digit groups.
            message['timestamp'] = float(re.search(r'audit\(([0-9]+\.[0-9]+)', seg).group(1))
        elif right.startswith("'"):
            if len(right) > 1 and right.endswith("'"):
                # The quoted value opens and closes within this one token;
                # close it here instead of swallowing the following fields.
                message[sanitize_key(left)] = parseLine2(right[1:-1])
            else:
                quoted_key = left
                quoted_parts = [right]
        else:
            message[sanitize_key(left)] = right

    # An opening quote that is never closed leaves its tokens unrecorded.
    return message

In [ ]:
# Read the raw log file from HDFS line by line and spread it over 10 partitions
logs = (
    sc.textFile("hdfs:///user/ytesfaye/lab41_logs_small.log.gz")
    .repartition(10)
)

In [ ]:
# Parse every raw line with parseLine2, serialise each parsed record to a JSON
# string, then hand the JSON strings to sqlCtx.jsonRDD to build log_map_df.
log_map = (
    logs
    .map(parseLine2)
    .map(json.dumps)
)
log_map_df = sqlCtx.jsonRDD(log_map)
# Uncomment to write the parsed records out as Parquet:
#log_map_df.saveAsParquetFile('hdfs:///user/ytesfaye/lab41_logs_small_parquet')

Use Parquet Representation


In [ ]:
# Load the Parquet copy of the parsed logs (this rebinds log_map_df), spread it
# over 4 partitions, and register it for SQL queries under the name 'logs2'.
log_map_df = (
    sqlCtx.read
    .parquet('hdfs:///user/ytesfaye/lab41_logs_small_parquet')
    .repartition(4)
)
log_map_df.registerTempTable('logs2')

In [ ]:
# View the schema that was generated for our parquet data
# (log_map_df is the DataFrame read from Parquet and registered as 'logs2')
log_map_df.printSchema()

In [ ]:
# See what UIDs exist in logs
for row in sqlCtx.sql("select distinct(uid) from logs2").collect():
    print row

In [ ]:
# See what user accounts exist in logs
for acct in sqlCtx.sql('select distinct(msg.acct) from logs2 where msg.acct is not null').collect():
    print type(acct.acct), acct.acct

In [ ]:
# Some user account names are in hex, decode
import binascii
acct_name = '28696E76616C6964207573657229'

print binascii.unhexlify(acct_name)

In [ ]:
# Look at values in the type field
lookup_code = {}
counter = 0
for row in sqlCtx.sql('select distinct(type) from logs2').collect():
    lookup_code[row.type] = counter
    counter += 1
print lookup_code