In [ ]:
from re import split
import mailbox
# Recibe un string con el subject y body del un solo correo
# Devuelve un set con las palabras de ese string
# separadas según delimiter, un regex
def separate_in_words(text, delimiter):
words = set()
for w in split(delimiter, text):
words.add(w)
if '' in words:
words.remove('')
return words
# Recibe un string con la ruta de un .mbox
# Devuelve generator de un set de palabras por correo recorrido
def words_per_mail(mbox, delimiter):
for mail in mailbox.mbox(mbox):
text = mail['subject'] + ' ' + mail.get_payload()
words = separate_in_words(text, delimiter)
yield words
# Recibe un string con la ruta de un .mbox
# Devuelve dict con key = <una palabra> y value = <nº de correos en los que aparece>,
# y el número de correos del mbox
def training_dict(mbox, delimiter):
dictionary = {}
count = 0
for words in words_per_mail(mbox, delimiter):
for w in words:
dictionary[w] = dictionary[w] + 1 if w in dictionary else 1
count += 1
return dictionary, count
In [ ]:
# Devuelve dict de la forma {'word' : Sw }, el número de correos Spam,
# otro dict de la forma {'word' : Hw } y el número de correos Ham
def training(spambox, hambox, delimiter = '\W+'):
spam_dict, s = training_dict(spambox, delimiter)
ham_dict, h = training_dict(hambox, delimiter)
return spam_dict, s, ham_dict, h
In [ ]:
# Devuelve lista de máximo n palabras con mejor clasificación individual
def best_N_words(words, spam_dict, ham_dict, n):
bests = [] # De la forma: [(Pw1, w1), (Pw2, w2), (Pw3, w3), ...]
for word in words:
sw = spam_dict[word] if word in spam_dict else 0 # Ocurrencias en Spam
hw = ham_dict[word] if word in ham_dict else 0 # Ocurrencias en Ham
# Bayes de P(y|x) ∈ [0,1] con 0.5 si es palabra neutra
# P(y|x) más próximo a 1 o 0 es la mejor clasificación individual
prob = 0.5 if sw == 0 and hw == 0 else sw/(sw+hw)
p = abs(prob - 0.5) # Ahora 0.5 es la mejor clasificación individual, con p ∈ [0,0.5]
if p != 0:
bests.append(( p, word )) # Excluimos las palabras neutras de p = 0
# Ordenamos de forma decreciente por probabilidad y devolvemos las n primeras palabras
return [word for probability, word in sorted(bests, reverse=True)][:n]
# Naive-Bayes de P(y|x1,...,xn) que suaviza creando entrada ficticia
# en el conjunto de entrenamiento por cada palabra que no se encuentra en este
def naive_bayes(bests, spam_dict, s, ham_dict, h, debug):
debug = print if debug else lambda *a: None # Igual a print(), o a función que no hace nada
s_extra = h_extra = 0 # Cuenta las veces que suavizamos
pi_sw = pi_hw = 1 # ∏Sw y ∏Hw para Naive Bayes
for word in bests:
sw = spam_dict[word] if word in spam_dict else 0 # Ocurrencias en Spam
hw = ham_dict[word] if word in ham_dict else 0 # Ocurrencias en Ham
# Suavizamos al cambiar 0 por 1 y contamos las veces que lo hacemos
if sw==0:
s_extra += 1
sw = 1
if hw==0:
h_extra += 1
hw = 1
pi_sw *= sw
pi_hw *= hw
debug('Sw:',sw, '— Hw:', hw, '—', word)
s += s_extra*len(bests) # Agregamos tantos 1 como veces hemos suavizado
h += h_extra*len(bests)
probability = pi_sw / (pi_sw + pi_hw * (s/h) ** (len(bests)-1)) # Naive Bayes
debug('S:', s, '— H:', h)
debug('∏Sw:', pi_sw, '— ∏Hw:', pi_hw)
debug('P(y|x1...xn):',probability, '=> Spam' if probability > 0.9 else '=> Ham', '\n')
return probability
In [ ]:
# Función que recibe un mbox de correos nuevos
# Devuelve una lista de 0 o 1 por cada correo nuevo si es ham o spam
def clasification(newsbox, spam_dict, s, ham_dict, h, delimiter = '\W+', n = 15, debug = False):
clas = []
for words in words_per_mail(newsbox, delimiter):
bests = best_N_words(words, spam_dict, ham_dict, n)
probability = naive_bayes(bests, spam_dict, s, ham_dict, h, debug = debug)
is_spam = 1 if probability > 0.9 else 0
clas.append(is_spam)
return clas
In [ ]:
# Escribe los correos nuevos del mbox ubicado en la ruta 'new_mbox'
# en el mbox ubicado en la ruta 'mbox'
def append_new_to_mbox(new_mbox, mbox):
with open(mbox, 'a') as writable:
with open(new_mbox) as readable:
writable.write(readable.read())
# devuelve size actualizado y un nuevo dictionary actualizado a partir del set de palabras
# de cada correo del mbox ubicado en la ruta 'new_mbox'
def update_dict_and_size(new_mbox, dictionary, size, delimiter):
updated_dict = dictionary.copy()
for words in words_per_mail(new_mbox, delimiter):
for w in words:
updated_dict[w] = updated_dict[w] + 1 if w in updated_dict else 1
size += 1
return updated_dict, size
In [ ]:
# Escribe los correos del mbox ubicado en la ruta 'new_mbox' en el mbox ubicado en la ruta 'mbox'
# Actualiza los dictionarios y tamaño de este mbox
# Tras su uso, no requiere volver a ejecutar training()
def incorporation(new_mbox, mbox, dictionary, size, delimiter = '\W+'):
append_new_to_mbox(new_mbox, mbox)
return update_dict_and_size(new_mbox, dictionary, size, delimiter)
In [ ]:
hambox = './ham.mbox'
spambox = './spam.mbox'
newsbox = './news.mbox'
new_hambox = './new-ham.mbox'
new_spambox = './new-spam.mbox'
# Criterio de separación alternativo
# delimiter = '[.@_#/]?[\s]+|[,;:()<>*~][\s]*'
# Número de palabras alternativo para hacer Näive-Bayes
# n = 19
In [ ]:
# Entrenamiento
spam_dict, s, ham_dict, h = training(spambox, hambox)
# Cantidad actual de correos spam y ham para el entrenamiento
print('[Tras el entrenamiento]')
print('S:', s, '— H:', h)
print('#Sw:', len(spam_dict), '— #Hw:', len(ham_dict))
In [ ]:
# Clasificación
res = clasification(newsbox, spam_dict, s, ham_dict, h, debug = False)
print(res)
In [ ]:
# Cantidad actual de correos spam y ham para el entrenamiento
print('[Antes de incorporar]')
print('S:', s, '— H:', h)
print('#Sw:', len(spam_dict), '— #Hw:', len(ham_dict))
# Incorporación a spam.box
spam_dict, s = incorporation(new_spambox, spambox, spam_dict, s)
# Incorporación a ham.box
ham_dict, h = incorporation(new_hambox, hambox, ham_dict, h)
# Cantidad nueva de correos spam y ham para el entrenamiento
print('\n[Después de incorporar]')
print('S:', s, '— H:', h)
print('#Sw:', len(spam_dict), '— #Hw:', len(ham_dict))