In [ ]:
print('Hello ex04')

3.4 Build a spam classifier (a more challenging exercise)

3.4.1 Download examples of spam and ham from Apache SpamAssassin’s public datasets.

Downloaded the 20021010 dataset.

Unzip the datasets and familiarize yourself with the data format.
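
A minimal sketch of how one might fetch and extract the archives (the file names 20021010_easy_ham.tar.bz2 and 20021010_spam.tar.bz2 under the SpamAssassin old/publiccorpus mirror are assumptions; adjust if the mirror layout differs):


In [ ]:
import os
import tarfile
import urllib.request

CORPUS_ROOT = 'https://spamassassin.apache.org/old/publiccorpus/'  # assumed mirror location
ARCHIVES = ['20021010_easy_ham.tar.bz2', '20021010_spam.tar.bz2']  # assumed file names

os.makedirs('datasets', exist_ok=True)
for archive in ARCHIVES:
    path = os.path.join('datasets', archive)
    if not os.path.isfile(path):
        urllib.request.urlretrieve(CORPUS_ROOT + archive, path)  # download once
    with tarfile.open(path) as tar:
        tar.extractall(path='datasets')  # yields datasets/easy_ham and datasets/spam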


In [ ]:
import os
import glob

In [ ]:
HAM_DIR  = os.path.join('datasets', 'easy_ham')
SPAM_DIR = os.path.join('datasets', 'spam')
# Message files are named like '00001.<hash>'; the length filter skips
# short helper files such as 'cmds' that ship with the corpus.
ham_files  = [name for name in sorted(os.listdir(HAM_DIR)) if len(name) > 20]
spam_files = [name for name in sorted(os.listdir(SPAM_DIR)) if len(name) > 20]

In [ ]:
len(ham_files), ham_files[0], ham_files[-1]

In [ ]:
len(spam_files), spam_files[0], spam_files[-1]

Use Python's email module to parse the raw messages.


In [ ]:
import email
import email.parser  # imported explicitly so email.parser.BytesParser is available
import email.policy

In [ ]:
SPM_PATH = './datasets'

In [ ]:
def load_email(is_spam, filename, spam_path=SPM_PATH):
    directory = 'spam' if is_spam else 'easy_ham'
    with open(os.path.join(spam_path, directory, filename), 'rb') as f:
        return email.parser.BytesParser(policy=email.policy.default).parse(f)

In [ ]:
ham_email = [load_email(False, name) for name in ham_files]
spam_email = [load_email(True, name) for name in spam_files]

In [ ]:
# ham_email[13] is multipart, so inspect the content type of its second part
# print(ham_email[13].get_content().strip())
print(ham_email[13].get_payload()[1].get_content_type())

In [ ]:
print(spam_email[6].get_content().strip())

In [ ]:
def get_email_structure(email):
    # Recursively describe the MIME structure, e.g. 'multipart(text/plain, text/html)'
    if isinstance(email, str):
        return email
    payload = email.get_payload()
    if isinstance(payload, list):
        return f'multipart({", ".join([get_email_structure(sub_email) for sub_email in payload])})'
    else:
        return email.get_content_type()

In [ ]:
get_email_structure(ham_email[2])
ham_structures = list(map(get_email_structure, ham_email))
ham_structures.index('multipart(text/plain, application/pgp-signature)')

In [ ]:
import pandas as pd

In [ ]:
ham_df = pd.DataFrame({'type': ham_structures})
ham_df['type'].value_counts()

In [ ]:
spam_structures = list(map(get_email_structure, spam_email))
spam_df = pd.DataFrame({'type': spam_structures})
spam_df['type'].value_counts()

In [ ]:
for header, value in spam_email[0].items():
    print(f'{header} : {value}')

In [ ]:
spam_email[0]['Subject']

Train/test split


In [ ]:
import numpy as np
from sklearn.model_selection import train_test_split

In [ ]:
X = np.array(ham_email + spam_email, dtype=object)  # object dtype: elements are EmailMessage objects

In [ ]:
y = np.concatenate([np.zeros(len(ham_email)), np.ones(len(spam_email))])

In [ ]:
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)

Preprocessing: HTML to plain text


In [ ]:
import re
from html import unescape

In [ ]:
def html_to_plain_text(html):
    text = re.sub(r'<head.*?>.*?</head>', '', html, flags=re.M | re.S | re.I)  # drop the <head> section
    text = re.sub(r'<a\s.*?>', ' HYPERLINK ', text, flags=re.M | re.S | re.I)  # replace links with a token
    text = re.sub(r'<.*?>', '', text, flags=re.M | re.S)                       # strip all remaining tags
    text = re.sub(r'(\s*\n)+', '\n', text, flags=re.M | re.S)                  # collapse runs of blank lines
    return unescape(text)                                                      # decode entities like &amp;
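
A quick sanity check on a made-up snippet, just to watch each substitution fire:


In [ ]:
toy_html = '<head><title>t</title></head><body>Win <a href="http://x.example">now</a> &amp; save\n\n\nToday only!</body>'
print(html_to_plain_text(toy_html))  # two lines: 'Win  HYPERLINK now & save' and 'Today only!'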

Find spam emails whose content type is text/html


In [ ]:
html_spam_emails = [email for email in X_train[y_train == 1] if get_email_structure(email) == 'text/html']
sample_html_spam = html_spam_emails[7]
sample_html_spam.get_content().strip()[:1000]

In [ ]:
print(html_to_plain_text(sample_html_spam.get_content())[:1000])

Return an email's content as plain text, preferring text/plain over HTML


In [ ]:
def email_to_text(email):
    html = None
    for part in email.walk():
        ctype = part.get_content_type()
        if ctype not in ("text/plain", "text/html"):
            continue
        try:
            content = part.get_content()
        except Exception:  # in case of encoding issues
            content = str(part.get_payload())
        if ctype == "text/plain":
            return content  # prefer plain text when available
        else:
            html = content  # keep HTML as a fallback
    if html:
        return html_to_plain_text(html)

In [ ]:
def email_to_text_2(email):
    # Debugging helper: (content type, Python type, first 200 chars) for every part
    ret = []
    for part in email.walk():
        ctype = part.get_content_type()
        try:
            content = part.get_content()
        except Exception:  # in case of encoding issues (or multipart containers)
            content = str(part.get_payload())
        ret.append((ctype, type(content), content[:200]))
    return ret

In [ ]:
def get_num_of_parts(email):
    return len(list(email.walk()))

def count_plain_html_part(email):
    # How many parts email_to_text can actually extract text from
    return sum(part.get_content_type() in ("text/plain", "text/html") for part in email.walk())

In [ ]:
email_to_text_2(spam_email[466])

In [ ]:
[(index, get_num_of_parts(email)) for index, email in enumerate(spam_email) if get_num_of_parts(email) > 1][:5]

In [ ]:
[(index, count_plain_html_part(email)) for index, email in enumerate(X_train) if count_plain_html_part(email) == 0]

In [ ]:
index = 1047
print(email_to_text(X_train[index]), '...', y_train[index])

We found an email at index 1047 for which email_to_text returns nothing. It is spam/00467.5b733c506b7165424a0d4a298e67970f; as the following cells show, it does have content, just not in a text/plain or text/html part.


In [ ]:
y_train[1047]

In [ ]:
get_email_structure(X_train[1047])

In [ ]:
for part in X_train[1047].walk():
    print(part.get_content_type())
    print(html_to_plain_text(str(part.get_payload()))[:200])
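
Given that, here is a sketch of a possible fallback (a hypothetical email_to_text_with_fallback, not used in the pipeline below) that also salvages emails whose leaf parts carry other content types:


In [ ]:
def email_to_text_with_fallback(email):
    # Hypothetical extension of email_to_text: if no text/plain or text/html
    # part is found, run html_to_plain_text over the raw payload of every
    # leaf part, mirroring the manual inspection above.
    text = email_to_text(email)
    if text:
        return text
    chunks = []
    for part in email.walk():
        if not part.is_multipart():
            chunks.append(html_to_plain_text(str(part.get_payload())))
    return '\n'.join(chunks)

print(email_to_text_with_fallback(X_train[1047])[:200])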

In [ ]:
print(email_to_text(sample_html_spam)[:1000], '...')

Throw in stemming


In [ ]:
import nltk

In [ ]:
stemmer = nltk.PorterStemmer()

In [ ]:
for word in ("Computations", "Computation", "Computing", "Computed", "Compute", "Compulsive"):
    print(f'{word} => {stemmer.stem(word)}')

In [ ]:
import urlextract

In [ ]:
url_extractor = urlextract.URLExtract()

In [ ]:
print(url_extractor.find_urls("Will it detect github.com and https://youtu.be/7Pq-S557XQU?t=3m32s"))

Transformer to convert emails to word counts


In [ ]:
from sklearn.base import BaseEstimator, TransformerMixin

In [ ]:
from collections import Counter

In [ ]:
class EmailToWordCounterTransformer(BaseEstimator, TransformerMixin):
    def __init__(self, strip_headers=True, lower_case=True, remove_punctuation=True,
                 replace_urls=True, replace_numbers=True, stemming=True):
        self.strip_headers = strip_headers
        self.lower_case = lower_case
        self.remove_punctuation = remove_punctuation
        self.replace_urls = replace_urls
        self.replace_numbers = replace_numbers
        self.stemming = stemming
    def fit(self, X, y=None):
        return self
    def transform(self, X, y=None):
        X_transformed = []
        for email in X:
            text = email_to_text(email) or ''
            if self.lower_case:
                text = text.lower()
            if self.replace_urls and url_extractor is not None:
                urls = sorted(url_extractor.find_urls(text, only_unique=True), key=lambda url: len(url), reverse=True)
                for url in urls:
                    text = text.replace(url, ' URL ')
            if self.replace_numbers:
                text = re.sub(r'\d+(?:\.\d*)?(?:[eE][+-]?\d+)?', 'NUMBER', text)  # matches 42, 3.14, 1e10, 1.5e-3
            if self.remove_punctuation:
                text = re.sub(r'\W+', ' ', text, flags=re.M)
            word_counts = Counter(text.split())
            if self.stemming and stemmer is not None:
                stemmed_word_counts = Counter()
                for word, count in word_counts.items():
                    stemmed_word = stemmer.stem(word)
                    stemmed_word_counts[stemmed_word] += count
                word_counts = stemmed_word_counts
            X_transformed.append(word_counts)
        return np.array(X_transformed)

In [ ]:
X_few = X_train[:3]
X_few_wordcounts = EmailToWordCounterTransformer().fit_transform(X_few)
X_few_wordcounts

In [ ]:
from scipy.sparse import csr_matrix

In [ ]:
class WordCounterToVectorTransformer(BaseEstimator, TransformerMixin):
    def __init__(self, vocabulary_size=1000):
        self.vocabulary_size = vocabulary_size
    def fit(self, X, y=None):
        total_count = Counter()
        for word_count in X:
            for word, count in word_count.items():
                total_count[word] += min(count, 10)
        most_common = total_count.most_common()[:self.vocabulary_size]
        self.most_common_ = most_common
        self.vocabulary_ = {word: index+1 for index, (word, count) in enumerate(most_common)}
        return self
    def transform(self, X, y=None):
        rows, cols, data = [], [], []
        for row, word_count in enumerate(X):
            for word, count in word_count.items():
                rows.append(row)
                # Words missing from vocabulary_ map to column 0, which therefore
                # accumulates all out-of-vocabulary counts. When several entries
                # share the same (row, column), csr_matrix sums their values;
                # see the demonstration in the next cell.
                cols.append(self.vocabulary_.get(word, 0))
                data.append(count)
        return csr_matrix((data, (rows, cols)), shape=(len(X), self.vocabulary_size+1))

In [ ]:
rows = [0, 0, 0]
cols = [0, 0, 1]
data = [3, 2, 1]
m = csr_matrix((data, (rows, cols)), shape=(1, 2))
m.toarray()
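
The result is [[5, 1]]: the two entries sharing coordinate (0, 0) are summed (3 + 2 = 5), confirming that csr_matrix accumulates duplicate coordinates, which is why all out-of-vocabulary words pile up in column 0.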

In [ ]:
vocab_transformer = WordCounterToVectorTransformer(vocabulary_size=10)
X_few_vectors = vocab_transformer.fit_transform(X_few_wordcounts)
X_few_vectors

In [ ]:
print(vocab_transformer.most_common_)

In [ ]:
X_few_vectors.toarray()

In [ ]:
vocab_transformer.vocabulary_

In [ ]:
X_few_wordcounts[1].most_common()[:10]

Create a pipeline


In [ ]:
from sklearn.pipeline import Pipeline

In [ ]:
preprocess_pipeline = Pipeline([
    ('email_to_wordcount', EmailToWordCounterTransformer()),
    ('wordcount_to_vector', WordCounterToVectorTransformer()),
])

In [ ]:
X_train_transformed = preprocess_pipeline.fit_transform(X_train)

In [ ]:
X_train_transformed.toarray().shape

Apply logistic regression


In [ ]:
from sklearn.linear_model import LogisticRegression
from sklearn.model_selection import cross_val_score

In [ ]:
log_clf = LogisticRegression(solver='lbfgs', max_iter=1000, random_state=42)  # raise max_iter so lbfgs converges

In [ ]:
score = cross_val_score(log_clf, X_train_transformed, y_train, cv=3, verbose=3)

In [ ]:
score.mean()
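
Accuracy alone is a weak signal here: the corpus contains several times more ham than spam, so a classifier biased toward ham already scores well. Hence the precision and recall evaluation below.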

Precision and recall on the test set


In [ ]:
from sklearn.metrics import precision_score, recall_score, accuracy_score

In [ ]:
X_test_transformed = preprocess_pipeline.transform(X_test)  # transform, not fit_transform: reuse the vocabulary fitted on the training set

In [ ]:
log_clf = LogisticRegression(solver='lbfgs', max_iter=1000, random_state=42)

In [ ]:
log_clf.fit(X_train_transformed, y_train)

In [ ]:
y_pred = log_clf.predict(X_test_transformed)

In [ ]:
y_test.shape

In [ ]:
accuracy_score(y_test, y_pred)

In [ ]:
# sklearn metric functions take y_true first, then y_pred
print(precision_score(y_test, y_pred))
print(recall_score(y_test, y_pred))

In [ ]:
# The same evaluation, consolidated into one cell
from sklearn.metrics import precision_score, recall_score

X_test_transformed = preprocess_pipeline.transform(X_test)

log_clf = LogisticRegression(solver="lbfgs", random_state=42, max_iter=1000)
log_clf.fit(X_train_transformed, y_train)

y_pred = log_clf.predict(X_test_transformed)

print("Precision: {:.2f}%".format(100 * precision_score(y_test, y_pred)))
print("Recall: {:.2f}%".format(100 * recall_score(y_test, y_pred)))

In [ ]:
y_train_pred = log_clf.predict(X_train_transformed)
accuracy_score(y_train, y_train_pred)

In [ ]:
y_test_pred = log_clf.predict(X_test_transformed)
accuracy_score(y_test, y_test_pred)
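
Comparing training accuracy with test accuracy is a quick overfitting check: a training score far above the test score would suggest the vocabulary and classifier have memorized the training set.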