In [1]:
from collections import defaultdict
import subprocess
In [2]:
# part 1
In [3]:
subprocess.call('python h1-p/count_freqs.py h1-p/gene.train > gene.counts', shell=True)
Out[3]:
0
In [4]:
def read_counts(counts_file):
    """
    Read counts_file and return counts as
        emissions[word][tag] = count
        xgrams[1-GRAM|2-GRAM|3-GRAM][xgram_str] = count
    """
    emission_counts = defaultdict(lambda: defaultdict(int))
    xgram_counts = defaultdict(lambda: defaultdict(int))
    for line in counts_file:
        parts = line.rstrip().split(' ')
        count = int(parts[0])
        count_type = parts[1]
        if count_type == 'WORDTAG':
            # e.g. "8 WORDTAG I-GENE U5"
            emission_counts[parts[3]][parts[2]] = count
        else:
            # e.g. "749 3-GRAM * * I-GENE"
            xgram_counts[count_type][' '.join(parts[2:])] = count
    return emission_counts, xgram_counts
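A quick sanity check of the two line formats, reusing the sample lines from the comments above (a minimal sketch; io.StringIO stands in for a real counts file):

import io

_sample = io.StringIO('8 WORDTAG I-GENE U5\n749 3-GRAM * * I-GENE\n')
_em, _xg = read_counts(_sample)
assert _em['U5']['I-GENE'] == 8
assert _xg['3-GRAM']['* * I-GENE'] == 749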
In [5]:
import re

REPLACEMENT_RULES = [('_RARE_', '.*')]

def replace_infrequent_words(emissions, in_file, out_file, replace_rules,
                             threshold=5):
    """Copy in_file to out_file, replacing each word seen fewer than
    threshold times with the class of the first matching rule."""
    for in_line in in_file:
        in_line = in_line.rstrip()
        if not in_line:
            out_file.write('\n')
            continue
        word = in_line.split(' ', 2)[0]
        out_line = in_line
        total_count = sum(emissions[word].values())
        if total_count < threshold:
            for new_word, regex in replace_rules:
                if re.search(regex, word):
                    out_line = out_line.replace(word, new_word, 1)
                    break
        out_file.write(out_line + '\n')
In [6]:
with open('gene.counts', 'rt') as counts_file:
    emission_counts, xgram_counts = read_counts(counts_file)
In [7]:
with open('h1-p/gene.train', 'rt') as in_file, \
     open('gene_infreq.train', 'wt') as out_file:
    replace_infrequent_words(emission_counts, in_file, out_file, REPLACEMENT_RULES)
In [8]:
subprocess.call('python h1-p/count_freqs.py gene_infreq.train > gene_infreq.counts', shell=True)
Out[8]:
0
In [9]:
with open('gene_infreq.counts', 'rt') as counts_file:
    emission_counts, xgram_counts = read_counts(counts_file)
In [10]:
def calc_emission_probs(emission_counts, xgram_counts):
    emission_probs = defaultdict(lambda: defaultdict(float))
    for word, tag_dict in emission_counts.items():
        for tag, count in tag_dict.items():
            # MLE: count(tag -> word) / count(tag)
            emission_probs[word][tag] = float(count) / xgram_counts['1-GRAM'][tag]
    return emission_probs
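The estimate computed here is the maximum-likelihood emission probability e(x | v) = Count(v -> x) / Count(v), where Count(v) is the tag's 1-GRAM count. For a fixed tag v, summing e(x | v) over all words x in the vocabulary should therefore give 1.0, which is what the commented checks in the next cell verify.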
In [11]:
emission_probs = calc_emission_probs(emission_counts, xgram_counts)
# check emission probs: each per-tag sum should be ~1.0
# sum(emission_probs[w]['I-GENE'] for w in emission_probs)
# sum(emission_probs[w]['O'] for w in emission_probs)
In [12]:
def unigram_tagger(word, emission_probs):
    # unseen words were mapped to _RARE_ during training
    word = word if word in emission_probs else '_RARE_'
    probs = emission_probs[word]
    return max(probs, key=probs.get)
In [13]:
def apply_unigram_tagger(in_file_path, out_file_path):
    with open(in_file_path, 'rt') as in_file, \
         open(out_file_path, 'wt') as out_file:
        for in_line in in_file:
            word = in_line.rstrip()
            out_line = ''
            if len(word) > 0:
                tag = unigram_tagger(word, emission_probs)
                out_line = "{0} {1}".format(word, tag)
            out_file.write(out_line + '\n')

apply_unigram_tagger('h1-p/gene.dev', 'gene_dev.p1.out')
apply_unigram_tagger('h1-p/gene.test', 'gene_test.p1.out')
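If the assignment's scorer is on hand (the h1-p bundle that provides count_freqs.py also ships an eval_gene_tagger.py; the exact script and key-file paths here are assumptions), the dev output can be scored against the gold tags:

subprocess.call('python h1-p/eval_gene_tagger.py h1-p/gene.key gene_dev.p1.out', shell=True)  # assumed paths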
In [14]:
# part 2
In [15]:
def calc_transition_probs(xgram_counts):
    trigrams_dict = xgram_counts['3-GRAM']
    bigrams_dict = xgram_counts['2-GRAM']
    q_probs = defaultdict(float)
    for trigram, count in trigrams_dict.items():
        tags = trigram.split(' ')
        bigram = ' '.join(tags[:2])
        # MLE: count(w u v) / count(w u)
        q_probs[trigram] = float(count) / bigrams_dict[bigram]
    return q_probs

transition_probs = calc_transition_probs(xgram_counts)
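This is the trigram MLE q(v | w, u) = Count(w u v) / Count(w u). As a quick check, for each bigram context the successor probabilities should sum to ~1.0 (a minimal sketch; contexts ending in STOP have no successor trigrams and so never appear here):

ctx_sums = defaultdict(float)
for trigram, q in transition_probs.items():
    ctx_sums[trigram.rsplit(' ', 1)[0]] += q
# every value in ctx_sums should be ~1.0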
In [16]:
TAGS = ('O', 'I-GENE')

def viterbi_tagger(sentence, transition_probs, emission_probs):
    pi = defaultdict(lambda: defaultdict(float))
    pi[0]['* *'] = 1.0
    bp = defaultdict(dict)
    Sk_1 = Sk_2 = ('*',)
    Sk = TAGS
    n = len(sentence)
    # states order: w, u, v
    for k, x in enumerate(sentence, 1):
        # NOTE: this rare-word mapping should be moved outside of the tagger
        x = x if x in emission_probs else '_RARE_'
        for v in Sk:
            for u in Sk_1:
                possible_pi = {}
                for w in Sk_2:
                    prev_state = ' '.join((w, u))
                    trans_state = ' '.join((w, u, v))
                    q = transition_probs[trans_state]
                    e = emission_probs[x][v]
                    prev_pi = pi[k-1][prev_state]
                    possible_pi[w] = prev_pi * q * e
                # pick the most probable w for this (u, v)
                max_w = max(possible_pi, key=possible_pi.get)
                new_state = ' '.join((u, v))
                pi[k][new_state] = possible_pi[max_w]
                bp[k][new_state] = max_w
        # update state sets
        Sk_2 = Sk_1
        Sk_1 = Sk
    # pick the most probable last state
    possible_pi = {}
    for v in Sk:
        for u in Sk_1:
            last_state = ' '.join((u, v))
            q = transition_probs[last_state + ' STOP']
            possible_pi[last_state] = pi[n][last_state] * q
    last_state = max(possible_pi, key=possible_pi.get).split(' ', 1)
    # move backwards along the most probable path
    predicted_tags = {}
    predicted_tags[n] = last_state[1]
    predicted_tags[n-1] = last_state[0]
    for k in range(n-2, 0, -1):
        next_state = ' '.join((predicted_tags[k+1], predicted_tags[k+2]))
        predicted_tags[k] = bp[k+2][next_state]
    return predicted_tags
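Multiplying probabilities along the path can underflow for long sentences; a common variant (not used here) runs the same recursion in log space, e.g.:

import math

def _log(p):
    # log(0) -> -inf keeps impossible paths impossible
    return math.log(p) if p > 0.0 else float('-inf')

# then replace `prev_pi * q * e` with `prev_pi + _log(q) + _log(e)`
# and initialize pi[0]['* *'] = 0.0 instead of 1.0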
In [17]:
def sentence_iterator(in_file):
    current_sentence = []
    for line in in_file:
        line = line.rstrip()
        if line == '':
            yield current_sentence
            current_sentence = []
        else:
            current_sentence.append(line)
    if current_sentence:  # the file did not end with a blank line
        yield current_sentence
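A minimal check of the iterator (sketch using io.StringIO; the sample words are illustrative):

import io

list(sentence_iterator(io.StringIO('a\nb\n\nc\n')))
# -> [['a', 'b'], ['c']]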
In [18]:
def apply_viterbi_tagger(in_org_file_path, in_rep_file_path, out_file_path):
    with open(in_org_file_path, 'rt') as in_org_file, \
         open(in_rep_file_path, 'rt') as in_rep_file, \
         open(out_file_path, 'wt') as out_file:
        # tag the rare-replaced sentences, but emit the original words
        org_sentence_gen = sentence_iterator(in_org_file)
        for sentence in sentence_iterator(in_rep_file):
            tags = viterbi_tagger(sentence, transition_probs, emission_probs)
            org_sentence = next(org_sentence_gen)
            for k, word in enumerate(org_sentence):
                line = "{0} {1}\n".format(word, tags[k + 1])
                out_file.write(line)
            out_file.write('\n')
In [19]:
with open('h1-p/gene.test', 'rt') as in_file, \
     open('gene.p3.test', 'wt') as out_file:
    replace_infrequent_words(emission_counts, in_file, out_file,
                             REPLACEMENT_RULES)
In [20]:
apply_viterbi_tagger('h1-p/gene.test', 'gene.p3.test', 'gene_test.p2.out')
In [21]:
# part 3
In [22]:
with open('gene.counts', 'rt') as counts_file:
    emission_counts, xgram_counts = read_counts(counts_file)
In [23]:
# Numeric: the word is rare and contains at least one numeric character.
# All Capitals: the word is rare and consists entirely of capitalized letters.
# Last Capital: the word is rare, not all capitals, and ends with a capital letter.
# Rare: the word is rare and does not fit in the other classes.
# The first matching rule wins.
REPLACEMENT_RULES = (
    ('_RARE_NUMERIC_', r'\d+'),
    ('_RARE_ALL_CAP_', r'^[A-Z0-9]+$'),
    ('_RARE_END_CAP_', r'.+[A-Z]$'),
    ('_RARE_', r'.*')
)
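A quick illustration of how the first-match-wins rules classify a few made-up rare words:

for w in ('2A', 'DNA', 'mRNA', 'xyz'):
    print(w, '->', next(name for name, rx in REPLACEMENT_RULES if re.search(rx, w)))
# 2A -> _RARE_NUMERIC_, DNA -> _RARE_ALL_CAP_, mRNA -> _RARE_END_CAP_, xyz -> _RARE_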
In [24]:
with open('h1-p/gene.train', 'rt') as in_file, \
     open('gene_infreq.p3.train', 'wt') as out_file:
    replace_infrequent_words(emission_counts, in_file, out_file,
                             REPLACEMENT_RULES)
In [25]:
with open('h1-p/gene.test', 'rt') as in_file, \
     open('gene.p3.test', 'wt') as out_file:
    replace_infrequent_words(emission_counts, in_file, out_file,
                             REPLACEMENT_RULES)
In [26]:
subprocess.call('python h1-p/count_freqs.py gene_infreq.p3.train > gene_infreq.p3.counts', shell=True)
Out[26]:
0
In [27]:
with open('gene_infreq.p3.counts', 'rt') as counts_file:
    emission_counts, xgram_counts = read_counts(counts_file)
transition_probs = calc_transition_probs(xgram_counts)
emission_probs = calc_emission_probs(emission_counts, xgram_counts)
In [28]:
apply_viterbi_tagger('h1-p/gene.test', 'gene.p3.test', 'gene_test.p3.out')