Filtro de spam mediante inferencia probabilística

Entrenamiento:

  • Separar un texto en palabras diferentes.
  • Devolver el texto del título y cuerpo del mensaje al iterar sobre un mbox.
  • Devolver el número de correos de un mbox junto a un diccionario de palabras con el número de correos en los que aparece.
  • Devolver dos diccionarios, uno para spambox y otro para hambox, y el número de correos que contiene cada mbox.

Clasificación:

  • Devolver una lista de máximo n palabras con mejor clasificación individual
  • Devolver la probabilidad P(y|x1, ..., xn) con suavizado aditivo de Laplace.
  • Devolver una lista de 0 o 1 por cada correo nuevo si es ham o spam, respectivamente.

Incorporación:

  • Escribir los correos nuevos en un mbox del conjunto de entrenamiento.
  • Devolver el nuevo número de correos del conjunto de entrenamiento y un nuevo diccionario actualizado
  • Agrupar las 2 tareas anteriores en un procedimiento.

Ejemplo de uso:

  • Ejemplo de uso de los 3 procedimientos anteriores

Entrenamiento

Utilidad


In [ ]:
from re import split
import mailbox


# Recibe un string con el subject y body del un solo correo
# Devuelve un set con las palabras de ese string
# separadas según delimiter, un regex
def separate_in_words(text, delimiter):
    words = set()
    for w in split(delimiter, text):
        words.add(w)
    if '' in words:
        words.remove('')
    return words


# Recibe un string con la ruta de un .mbox
# Devuelve generator de un set de palabras por correo recorrido
def words_per_mail(mbox, delimiter):  
    for mail in mailbox.mbox(mbox):
        text = mail['subject'] + ' ' + mail.get_payload()
        words = separate_in_words(text, delimiter)
        yield words


# Recibe un string con la ruta de un .mbox
# Devuelve dict con key = <una palabra> y value = <nº de correos en los que aparece>,
# y el número de correos del mbox
def training_dict(mbox, delimiter):
    dictionary = {}
    count = 0
    for words in words_per_mail(mbox, delimiter):
        for w in words:
            dictionary[w] = dictionary[w] + 1 if w in dictionary else 1
        count += 1
    return dictionary, count

Procedimiento


In [ ]:
# Devuelve dict de la forma {'word' : Sw }, el número de correos Spam,
# otro dict de la forma {'word' : Hw } y el número de correos Ham
def training(spambox, hambox, delimiter = '\W+'):
    
    spam_dict, s = training_dict(spambox, delimiter)    
    ham_dict, h = training_dict(hambox, delimiter)
    
    return spam_dict, s, ham_dict, h

Clasificación

Utilidad


In [ ]:
# Devuelve lista de máximo n palabras con mejor clasificación individual
def best_N_words(words, spam_dict, ham_dict, n):
    
    bests = []  # De la forma: [(Pw1, w1), (Pw2, w2), (Pw3, w3), ...]
    for word in words:        
        sw = spam_dict[word] if word in spam_dict else 0  # Ocurrencias en Spam
        hw = ham_dict[word]  if word in ham_dict  else 0  # Ocurrencias en Ham
        
        # Bayes de P(y|x) ∈ [0,1] con 0.5 si es palabra neutra
        # P(y|x) más próximo a 1 o 0 es la mejor clasificación individual
        prob = 0.5 if sw == 0 and hw == 0 else sw/(sw+hw)
        
        p  = abs(prob - 0.5)  # Ahora 0.5 es la mejor clasificación individual, con p ∈ [0,0.5]
        
        if p != 0:
            bests.append(( p, word ))  # Excluimos las palabras neutras de p = 0
            
    # Ordenamos de forma decreciente por probabilidad y devolvemos las n primeras palabras
    return [word for probability, word in sorted(bests, reverse=True)][:n]


# Naive-Bayes de P(y|x1,...,xn) que suaviza creando entrada ficticia
# en el conjunto de entrenamiento por cada palabra que no se encuentra en este
def naive_bayes(bests, spam_dict, s, ham_dict, h, debug):
    
    debug = print if debug else lambda *a: None  # Igual a print(), o a función que no hace nada
    
    s_extra = h_extra = 0  # Cuenta las veces que suavizamos
    pi_sw   = pi_hw   = 1  # ∏Sw y ∏Hw para Naive Bayes
    
    for word in bests:        
        sw = spam_dict[word] if word in spam_dict else 0  # Ocurrencias en Spam
        hw = ham_dict[word]  if word in ham_dict  else 0  # Ocurrencias en Ham
        
        # Suavizamos al cambiar 0 por 1 y contamos las veces que lo hacemos
        if sw==0:
            s_extra += 1
            sw = 1
        if hw==0:
            h_extra += 1
            hw = 1
        
        pi_sw *= sw
        pi_hw *= hw
        
        debug('Sw:',sw, '— Hw:', hw, '—', word)
    
    s += s_extra*len(bests)  # Agregamos tantos 1 como veces hemos suavizado
    h += h_extra*len(bests)
    
    probability = pi_sw / (pi_sw + pi_hw * (s/h) ** (len(bests)-1))  # Naive Bayes
    
    debug('S:', s, '— H:', h)
    debug('∏Sw:', pi_sw, '— ∏Hw:', pi_hw)        
    debug('P(y|x1...xn):',probability, '=> Spam' if probability > 0.9 else '=> Ham', '\n')
    
    return probability

Procedimiento


In [ ]:
# Función que recibe un mbox de correos nuevos
# Devuelve una lista de 0 o 1 por cada correo nuevo si es ham o spam
def clasification(newsbox, spam_dict, s, ham_dict, h, delimiter = '\W+', n = 15, debug = False):
    
    clas = []
    
    for words in words_per_mail(newsbox, delimiter):
        
        bests = best_N_words(words, spam_dict, ham_dict, n)
        
        probability  = naive_bayes(bests, spam_dict, s, ham_dict, h, debug = debug)
        
        is_spam      = 1 if probability > 0.9 else 0
        
        clas.append(is_spam)
        
    return clas

Incorporación

Utilidad


In [ ]:
# Escribe los correos nuevos del mbox ubicado en la ruta 'new_mbox'
# en el mbox ubicado en la ruta 'mbox'
def append_new_to_mbox(new_mbox, mbox):
    with open(mbox, 'a') as writable:
        with open(new_mbox) as readable:
            writable.write(readable.read())   

            
# devuelve size actualizado y un nuevo dictionary actualizado a partir del set de palabras
# de cada correo del mbox ubicado en la ruta 'new_mbox'
def update_dict_and_size(new_mbox, dictionary, size, delimiter):
    updated_dict = dictionary.copy()
    for words in words_per_mail(new_mbox, delimiter):
        for w in words:
            updated_dict[w] = updated_dict[w] + 1 if w in updated_dict else 1
        size += 1
    return updated_dict, size

Procedimiento


In [ ]:
# Escribe los correos del mbox ubicado en la ruta 'new_mbox' en el mbox ubicado en la ruta 'mbox'
# Actualiza los dictionarios y tamaño de este mbox
# Tras su uso, no requiere volver a ejecutar training()
def incorporation(new_mbox, mbox, dictionary, size, delimiter = '\W+'):
    append_new_to_mbox(new_mbox, mbox)
    return update_dict_and_size(new_mbox, dictionary, size, delimiter)

Ejemplo de uso de los 3 procedimientos

Variables

  • Rutas de archivos y parametros opcionales de training(), clasification() e incorporation()

In [ ]:
hambox = './ham.mbox'
spambox = './spam.mbox'
newsbox = './news.mbox'
new_hambox  = './new-ham.mbox'
new_spambox = './new-spam.mbox'

# Criterio de separación alternativo
# delimiter = '[.@_#/]?[\s]+|[,;:()<>*~][\s]*'

# Número de palabras alternativo para hacer Näive-Bayes
# n = 19

Uso de training()

  • Parámetro delimiter es opcional, por defecto es '\W+', cualquier secuencia de caracteres no alfanúmericos.

In [ ]:
# Entrenamiento
spam_dict, s, ham_dict, h = training(spambox, hambox)

# Cantidad actual de correos spam y ham para el entrenamiento
print('[Tras el entrenamiento]')
print('S:', s, '— H:', h)
print('#Sw:', len(spam_dict), '— #Hw:', len(ham_dict))

Uso de clasification()

  • Parámetro delimiter es opcional, por defecto es '\W+', cualquier secuencia de caracteres no alfanúmericos.
  • Parámetro n es opcional, por defecto es 15.
  • Parámetro debug es opcional, por defecto False. Si es True imprime información sobre el cálculo de naive bayes.
  • Requiere que training() haya sido ejecutado antes.

In [ ]:
# Clasificación
res = clasification(newsbox, spam_dict, s, ham_dict, h, debug = False)

print(res)

Uso de incorporation()

  • Parámetro delimiter es opcional, por defecto es '\W+', cualquier secuencia de caracteres no alfanúmericos.
  • Escribe los nuevos correos en los archivos spambox y hambox
  • Actualiza los dictionarios y tamaños de ambos mbox

In [ ]:
# Cantidad actual de correos spam y ham para el entrenamiento
print('[Antes de incorporar]')
print('S:', s, '— H:', h)
print('#Sw:', len(spam_dict), '— #Hw:', len(ham_dict))

# Incorporación a spam.box
spam_dict, s = incorporation(new_spambox, spambox, spam_dict, s)

# Incorporación a ham.box
ham_dict, h = incorporation(new_hambox, hambox, ham_dict, h)

# Cantidad nueva de correos spam y ham para el entrenamiento
print('\n[Después de incorporar]')
print('S:', s, '— H:', h)
print('#Sw:', len(spam_dict), '— #Hw:', len(ham_dict))