In [2]:
%%bash
cd ../tmp/
# Build the download URLs for the SpamAssassin corpus hosted on csmining.org.
export SPAM_ROOT="http://www.csmining.org/index.php"
export SPAM_PAGE="spam-assassin-datasets.html"
export SPAM_PAGE="$SPAM_PAGE?file=tl_files/Project_Datasets/SpamAssassin%20data"
export SPAM_SPAM_URL="$SPAM_PAGE/20050311_spam_2.tar.bz2"
export SPAM_HAM_URL="$SPAM_PAGE/20030228_easy_ham_2.tar.bz2"
# -nc skips the download if the file is already present.
wget -nc "$SPAM_ROOT/$SPAM_SPAM_URL"
wget -nc "$SPAM_ROOT/$SPAM_HAM_URL"
# wget percent-encodes the slashes in the query string when naming the local
# file, so rebuild that name by replacing every "/" with "%2F".
export SPAM_SPAM_NAME="${SPAM_SPAM_URL//\//%2F}"
export SPAM_HAM_NAME="${SPAM_HAM_URL//\//%2F}"
# Unpack into ../tmp/spam_2 and ../tmp/easy_ham_2.
tar xf "$SPAM_SPAM_NAME"
tar xf "$SPAM_HAM_NAME"
In [29]:
import os
import csv
In [2]:
def list_words(text):
    """Return the unique lowercased words longer than three characters."""
    words = []
    words_tmp = text.lower().split()
    for w in words_tmp:
        if w not in words and len(w) > 3:
            words.append(w)
    return words
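A quick illustration of what list_words() returns, on a made-up subject line (not part of the corpus): duplicates and words of three characters or fewer are dropped.

In [ ]:
list_words("Hello hello world of spam")
# -> ['hello', 'world', 'spam']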
In [21]:
dirs = [('spam', '../tmp/spam_2'), ('ham', '../tmp/easy_ham_2')]
for category, path in dirs:
    print('Reading %s ...' % category)
    files = os.listdir(path)
    with open("../tmp/subjects_%s.out" % category, "w") as out:
        for fname in files:
            file_path = os.path.join(path, fname)
            data = None
            # The corpus mixes encodings, so try a few until one works.
            for encoding in ['ascii', 'utf-8', 'latin_1']:
                try:
                    with open(file_path, encoding=encoding) as f:
                        data = f.readlines()
                    break
                except UnicodeDecodeError:
                    continue
            if data is None:
                print('[EE] %s load failed.' % fname)
                continue
            for line in data:
                if line.startswith("Subject:"):
                    # Drop commas so the subject stays a single CSV field.
                    line = line.replace(",", "")
                    out.write("{0},{1}\n".format(line[8:-1].strip(), category))
                    break
In [16]:
# For a more advanced term-document matrix, we can use the Python
# textmining package from:
# https://pypi.python.org/pypi/textmining/1
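As a minimal sketch of that package (assuming textmining 1.0 is installed; the two documents and the matrix.csv path are made up for illustration):

In [ ]:
import textmining

tdm = textmining.TermDocumentMatrix()
tdm.add_doc("Low cost conferencing for everyone")
tdm.add_doc("Meeting agenda for next week")
# One row per document, one column per word; cutoff=1 keeps every word
# that appears in at least one document.
tdm.write_csv("../tmp/matrix.csv", cutoff=1)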
In [17]:
def training(texts):
    """Count word and category frequencies from (text, category) pairs."""
    c_words = {}
    c_categories = {}
    c_texts = 0
    c_total_words = 0
    # Add the classes to the categories.
    for t in texts:
        c_texts = c_texts + 1
        if t[1] not in c_categories:
            c_categories[t[1]] = 1
        else:
            c_categories[t[1]] = c_categories[t[1]] + 1
    # Add the words with the list_words() function.
    for t in texts:
        words = list_words(t[0])
        for p in words:
            if p not in c_words:
                c_total_words = c_total_words + 1
                c_words[p] = {}
                for c in c_categories:
                    c_words[p][c] = 0
            c_words[p][t[1]] = c_words[p][t[1]] + 1
    return (c_words, c_categories, c_texts, c_total_words)
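A small sketch of the structures training() returns, on two made-up subject lines:

In [ ]:
sample = [("cheap meds online", "spam"), ("meeting notes attached", "ham")]
c_words, c_cats, n_texts, n_words = training(sample)
# c_cats == {'spam': 1, 'ham': 1}, n_texts == 2, n_words == 6
# c_words['cheap'] == {'spam': 1, 'ham': 0}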
In [18]:
def classifier(subject_line, c_words, c_categories, c_texts, c_tot_words):
    category = ""
    category_prob = 0
    for c in c_categories:
        # Category probability.
        prob_c = float(c_categories[c]) / float(c_texts)
        words = list_words(subject_line)
        prob_total_c = prob_c
        for p in words:
            # Word probability.
            if p in c_words:
                prob_p = float(c_words[p][c]) / float(c_tot_words)
                # Probability P(category|word).
                prob_cond = prob_p / prob_c
                # Probability P(word|category).
                prob = (prob_cond * prob_p) / prob_c
                prob_total_c = prob_total_c * prob
        # Keep the highest-scoring category.
        if category_prob < prob_total_c:
            category = c
            category_prob = prob_total_c
    return (category, category_prob)
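Continuing the toy example above: each word of the new subject multiplies the category's running score, and the highest-scoring category wins. Here every word was seen only in the spam counts, so the ham score collapses to zero:

In [ ]:
classifier("cheap online meds", c_words, c_cats, n_texts, n_words)
# -> ('spam', 0.0006858710562414266)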
In [27]:
# Save the first 100 subject lines of each category as training data.
# (dirs was defined above.)
with open('../tmp/training.csv', 'w') as f:
    for category, path in dirs:
        category_file_path = "../tmp/subjects_%s.out" % category
        with open(category_file_path, 'r') as f_cat:
            # Each line is already "subject,category", so copy it verbatim.
            f.writelines(f_cat.readlines()[:100])
In [30]:
with open('../tmp/training.csv') as f:
    # Each row is a (subject, category) pair, which is what training() expects.
    subjects = list(csv.reader(f, delimiter=','))
words, categories, texts, total_words = training(subjects)
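A quick sanity check (assuming both subjects_*.out files had at least 100 lines, this should report 200 texts split evenly between the two categories):

In [ ]:
print(texts, categories)
# e.g. 200 {'spam': 100, 'ham': 100}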
In [12]:
clase = classifier(
    "Low Cost Easy to Use Conferencing", words, categories, texts, total_words
)
print("Result: {0}".format(clase))
In [14]:
with open("test.csv") as f:
    correct = 0
    tests = csv.reader(f)
    for subject in tests:
        clase = classifier(subject[0], words, categories, texts, total_words)
        # classifier() returns (category, probability); compare the category.
        if clase[0] == subject[1].strip():
            correct += 1
print("Efficiency: {0} of 100".format(correct))