In [ ]:
# Load the raw log lines from HDFS as an RDD of strings.
# `sc` is not defined in the visible cells; it is expected to be supplied by
# the kernel.
# Alternate input file, kept for reference:
#tbird_logs = sc.textFile("hdfs:///user/ytesfaye/tbird.log.out.logCluster.processed.gz")#.repartition(10)
tbird_logs = (
    sc.textFile("hdfs:///user/ytesfaye/tbird.log.preProc.gz")
      .repartition(10)  # redistribute the lines over 10 partitions
)

In [ ]:
from collections import namedtuple
# One parsed log record: a float timestamp and the remaining message text.
LogLine = namedtuple('LogLine', ['ts', 'msg'])
def get_tbird_data(line):
    """Parse one raw log line into a ``LogLine``.

    The text before the first space is converted to a float timestamp; the
    text after it is kept verbatim as the message.

    Args:
        line: One line of text from the log file.

    Returns:
        ``LogLine(ts=<float>, msg=<str>)``. ``msg`` is the empty string when
        the line contains no space (timestamp only).

    Raises:
        ValueError: If the leading field cannot be converted to a float
            (this includes an empty line).
    """
    # partition() always yields three parts, so a timestamp-only line gives an
    # empty message instead of the IndexError that split(' ', 1)[1] raises.
    ts_text, _sep, msg = line.partition(' ')
    return LogLine(ts=float(ts_text), msg=msg)
# Parse every raw line into a LogLine and cache the resulting RDD, which is
# read again by several later cells.
tbird_loglines = (
    tbird_logs
    .map(get_tbird_data)
    .cache()
)

In [ ]:
# Show the first five parsed LogLine records.
tbird_loglines.take(5)

In [ ]:
# Find the words that appear in more than `support` log lines.
from collections import defaultdict
# Threshold used both for word frequency and, later, for pattern group size.
support = 200
def parse_words(line):
    """Emit one ``(word, 1)`` pair per distinct word of a log message.

    Duplicate words within the same message are collapsed first, so summing
    the 1s per word counts the lines a word appears in, not its occurrences.

    Args:
        line: Object with a ``msg`` string attribute (e.g. a ``LogLine``).

    Returns:
        A list of ``(word, 1)`` tuples, one per distinct whitespace-separated
        word; empty when the message has no words.
    """
    return [(token, 1) for token in set(line.msg.split())]

# Count, per word, the number of log lines containing it, keep the words whose
# count exceeds `support`, and pull the result back to the driver as a dict.
frequentWordDict = (
    tbird_loglines
    .flatMap(parse_words)
    .reduceByKey(lambda left, right: left + right)
    .filter(lambda word_count: word_count[1] > support)
    .collectAsMap()
)
# Only the vocabulary itself is broadcast; the counts stay on the driver.
frequentWords = sc.broadcast(set(frequentWordDict))

In [ ]:
def extract_patterns(line):
    """Reduce a log line to its frequent words plus the gaps between them.

    Walks the whitespace-separated words of ``line.msg`` in order. Words found
    in the broadcast ``frequentWords`` set are kept; each run of other words
    directly in front of a kept word is replaced by its length.

    Args:
        line: Object with a ``msg`` string attribute.

    Returns:
        A ``(key, pattern)`` pair. ``key`` is the tuple of frequent words in
        order of appearance. ``pattern`` is a list holding the same words,
        with an int inserted before a word whenever that many non-frequent
        words preceded it. Non-frequent words after the last frequent word
        are counted but never recorded in ``pattern``.
    """
    kept_words = []
    template = []
    gap = 0
    for token in line.msg.split():
        if token not in frequentWords.value:
            gap += 1
            continue
        # A frequent word: flush the pending gap length (if any) before it.
        if gap != 0:
            template.append(gap)
            gap = 0
        kept_words.append(token)
        template.append(token)
    return (tuple(kept_words), template)

In [ ]:
def parse_list(pattern):
    r"""Merge all gap patterns that share a frequent-word key into one template.

    Args:
        pattern: A ``(freqWordPattern, patterns)`` pair. ``freqWordPattern``
            is the ordered tuple of frequent words. ``patterns`` is an
            iterable of sequences containing those words in order, each
            optionally preceded by an int giving the number of skipped words
            (the shape returned by ``extract_patterns``).

    Returns:
        A list of strings: the frequent words in order, with the regex
        fragment ``(?: \S+){min,max}`` inserted at every position where at
        least one pattern skipped words. ``min``/``max`` are the smallest and
        largest skip counts seen there (0 when some pattern skipped nothing).
        Positions where no pattern skipped anything produce no entry.

    NOTE(review): the fragment itself consumes the space in front of each
    skipped word, so joining the returned tokens with ' ' gives a readable
    display string rather than a regex that matches single-spaced lines.
    """
    # Non-capturing group matching one " word"; previously emitted as
    # '(:? S+)', which is an optional colon followed by literal 'S' characters.
    skip_fragment = r'(?: \S+){%d,%d}'

    freqWordPattern, patterns = pattern
    unique_patterns = set(tuple(candidate) for candidate in patterns)

    # Layout: [gap, word, gap, word, ..., gap]. Even slots collect the skip
    # counts observed at that position, odd slots hold the frequent words.
    slots = [set()]
    for word in freqWordPattern:
        slots.append(word)
        slots.append(set())

    for candidate in unique_patterns:
        slot = 0
        after_skip = False
        for item in candidate:
            if isinstance(item, int):
                # Skip count: record it in the current gap slot, then move
                # onto the word slot that follows.
                slots[slot].add(item)
                slot += 1
                after_skip = True
            elif after_skip:
                # TODO: Add check here that the word matches slots[slot].
                # Word following a recorded skip: step over the word slot.
                slot += 1
                after_skip = False
            else:
                # Word with nothing skipped before it: record a zero-length
                # gap, then step over both the gap slot and the word slot.
                slots[slot].add(0)
                slot += 2

    final_pattern = []
    for entry in slots:
        if not isinstance(entry, set):
            final_pattern.append(entry)
        elif entry and entry != set([0]):
            # Emit a wildcard unless the gap was never seen or always empty.
            final_pattern.append(skip_fragment % (min(entry), max(entry)))
    return final_pattern

In [ ]:
# Key every line by its frequent-word sequence, keep the keys backed by more
# than `support` lines, and collapse each surviving group into one template.
t = (
    tbird_loglines
    .map(extract_patterns)
    .groupByKey()
    .filter(lambda group: len(group[1]) > support)
    .map(parse_list)
    .collect()
)

In [ ]:
# Print every template in sorted order, one per line, tokens joined by spaces.
for l in sorted(t):
    print(' '.join(l))

In [ ]:
# Number of templates that passed the `support` filter.
len(t)

In [ ]:
import re
# `t` is collected after .map(parse_list), so t[0] is already a list of
# template tokens. Passing it through parse_list a second time fails when
# parse_list tries to unpack a (key, patterns) pair, so the tokens are joined
# directly.
print(re.escape(' '.join(t[0])))

In [ ]:
# Show the first 60 characters of every template. The entries of `t` are
# already parse_list output, so they are joined directly instead of being
# run through parse_list again.
for i in t:
    print(' '.join(i)[:60])

In [ ]:
# NOTE(review): `t` is collected after .map(parse_list), so t[1][1] appears to
# be a single template token here rather than a group of patterns; this cell
# looks like a leftover from a version of `t` holding (key, patterns) pairs.
# Confirm it is still wanted.
r = list(t[1][1])

In [ ]:
# NOTE(review): with `t` holding parse_list output, t[i][0] is the first token
# of template i and t[i][0][0] its first character, so the comparison with
# 'USER' presumably never succeeds. Looks written for (key, patterns) pairs --
# confirm.
for i in range(len(t)):
    if t[i][0][0] == 'USER':
        print t[i][0]

In [ ]:
# NOTE(review): indexes `t` three levels deep; with `t` holding lists of
# template tokens this prints the first character of the first token of
# template 3. Presumably meant for (key, patterns) pairs -- confirm.
print t[3][0][0]

In [ ]:
# NOTE(review): with `t` holding lists of template tokens, t[0][1] is one
# token and this yields its characters. Presumably meant to list the patterns
# of the first group -- confirm.
list(t[0][1])

In [ ]: