In [ ]:
print('Hello ex04')
In [ ]:
import os
import glob
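If the datasets directory is missing, the corpus can be fetched from the Apache SpamAssassin public corpus. A hedged sketch (the 20030228 snapshot filenames are the commonly used ones; adjust the URLs if they have moved):
In [ ]:
import tarfile
import urllib.request

CORPUS_ROOT = 'http://spamassassin.apache.org/old/publiccorpus/'
for archive in ('20030228_easy_ham.tar.bz2', '20030228_spam.tar.bz2'):
    # The tarballs extract to easy_ham/ and spam/ respectively
    target_dir = os.path.join('datasets', archive.split('_', 1)[1].split('.')[0])
    if not os.path.isdir(target_dir):
        os.makedirs('datasets', exist_ok=True)
        archive_path = os.path.join('datasets', archive)
        urllib.request.urlretrieve(CORPUS_ROOT + archive, archive_path)
        with tarfile.open(archive_path) as tf:
            tf.extractall(path='datasets')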
In [ ]:
HAM_DIR = os.path.join('datasets', 'easy_ham')
SPAM_DIR = os.path.join('datasets', 'spam')
# Filenames shorter than ~20 chars (e.g. the 'cmds' index file) are not messages, so skip them
ham_files = [name for name in sorted(os.listdir(HAM_DIR)) if len(name) > 20]
spam_files = [name for name in sorted(os.listdir(SPAM_DIR)) if len(name) > 20]
In [ ]:
len(ham_files), ham_files[0], ham_files[-1]
In [ ]:
len(spam_files), spam_files[0], spam_files[-1]
Use the email module to parse the raw messages
In [ ]:
import email
import email.parser  # BytesParser is used below; 'import email' alone does not guarantee it is loaded
import email.policy
In [ ]:
SPM_PATH = './datasets'
In [ ]:
def load_email(is_spam, filename, spam_path=SPM_PATH):
    directory = 'spam' if is_spam else 'easy_ham'
    with open(os.path.join(spam_path, directory, filename), 'rb') as f:
        return email.parser.BytesParser(policy=email.policy.default).parse(f)
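As a quick sanity check, BytesParser with the default policy returns an EmailMessage; a minimal sketch on a made-up in-memory message:
In [ ]:
raw = b'Subject: test\nFrom: someone@example.com\n\nHello body\n'
msg = email.parser.BytesParser(policy=email.policy.default).parsebytes(raw)
msg['Subject'], msg.get_content().strip()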
In [ ]:
ham_email = [load_email(False, name) for name in ham_files]
spam_email = [load_email(True, name) for name in spam_files]
In [ ]:
# print(ham_email[13].get_content().strip())
print(ham_email[13].get_payload()[1].get_content_type())
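get_payload() returns the list of subparts for a multipart message; with the default policy the same parts can also be iterated with iter_parts():
In [ ]:
for part in ham_email[13].iter_parts():
    print(part.get_content_type())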
In [ ]:
print(spam_email[6].get_content().strip())
In [ ]:
def get_email_structure(email):
    if isinstance(email, str):
        return email
    payload = email.get_payload()
    if isinstance(payload, list):
        return f'multipart({", ".join([get_email_structure(sub_email) for sub_email in payload])})'
    else:
        return email.get_content_type()
In [ ]:
# Only the last expression in a cell is displayed, so print the first one explicitly
print(get_email_structure(ham_email[2]))
ham_structures = list(map(get_email_structure, ham_email))
ham_structures.index('multipart(text/plain, application/pgp-signature)')
In [ ]:
import pandas as pd
In [ ]:
ham_df = pd.DataFrame({'type': ham_structures})
ham_df['type'].value_counts()
In [ ]:
spam_structures = list(map(get_email_structure, spam_email))
spam_df = pd.DataFrame({'type': spam_structures})
spam_df['type'].value_counts()
In [ ]:
for header, value in spam_email[0].items():
    print(f'{header} : {value}')
In [ ]:
spam_email[0]['Subject']
In [ ]:
import numpy as np
from sklearn.model_selection import train_test_split
In [ ]:
# dtype=object keeps each EmailMessage intact instead of letting NumPy try to iterate into it
X = np.array(ham_email + spam_email, dtype=object)
In [ ]:
y = np.concatenate([np.zeros(len(ham_email)), np.ones(len(spam_email))])
In [ ]:
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)
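Spam is the minority class here, so an unlucky split could skew the class ratio; stratifying keeps the ham/spam proportion identical in both sets (stratify is a standard train_test_split parameter):
In [ ]:
# Optional variant: preserve the ham/spam ratio across train and test
X_train, X_test, y_train, y_test = train_test_split(
    X, y, test_size=0.2, random_state=42, stratify=y)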
In [ ]:
import re
from html import unescape
In [ ]:
def html_to_plain_text(html):
    text = re.sub(r'<head.*?>.*?</head>', '', html, flags=re.M | re.S | re.I)
    text = re.sub(r'<a\s.*?>', ' HYPERLINK ', text, flags=re.M | re.S | re.I)
    text = re.sub(r'<.*?>', '', text, flags=re.M | re.S)
    text = re.sub(r'(\s*\n)+', '\n', text, flags=re.M | re.S)
    return unescape(text)
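A quick check of html_to_plain_text on a made-up HTML snippet: the head block disappears, the anchor becomes HYPERLINK, remaining tags are stripped, and entities are unescaped:
In [ ]:
sample = '<head><title>t</title></head><body><a href="http://x">click</a> <p>Hi &amp; bye</p></body>'
print(html_to_plain_text(sample))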
Find the spam emails with text/html content
In [ ]:
html_spam_emails = [email for email in X_train[y_train == 1] if get_email_structure(email) == 'text/html']
sample_html_spam = html_spam_emails[7]
sample_html_spam.get_content().strip()[:1000]
In [ ]:
print(html_to_plain_text(sample_html_spam.get_content())[:1000])
Return the email's content as plain text
In [ ]:
def email_to_text(email):
    html = None
    for part in email.walk():
        ctype = part.get_content_type()
        if ctype not in ("text/plain", "text/html"):
            continue
        try:
            content = part.get_content()
        except Exception:  # in case of encoding issues
            content = str(part.get_payload())
        if ctype == "text/plain":
            return content
        else:
            html = content
    if html:
        return html_to_plain_text(html)
In [ ]:
def email_to_text_2(email):
    ret = []
    for part in email.walk():
        ctype = part.get_content_type()
        try:
            content = part.get_content()
        except Exception:  # in case of encoding issues
            content = str(part.get_payload())
        ret.append((ctype, type(content), content[:200]))
    return ret
In [ ]:
def get_num_of_parts(email):
    return len(list(email.walk()))

def count_plain_html_part(email):
    return sum(part.get_content_type() in ("text/plain", "text/html") for part in email.walk())
In [ ]:
email_to_text_2(spam_email[466])
In [ ]:
[(index, get_num_of_parts(email)) for index, email in enumerate(spam_email) if get_num_of_parts(email) > 1][:5]
In [ ]:
[(index, count_plain_html_part(email)) for index, email in enumerate(X_train) if count_plain_html_part(email) == 0]
In [ ]:
index = 1047
print(email_to_text(X_train[index]), '...', y_train[index])
We found an email at index 1047 of X_train for which email_to_text returns nothing. It is spam/00467.5b733c506b7165424a0d4a298e67970f; as the following cells show, it does have content, but none of its parts is text/plain or text/html, so email_to_text skips them all.
In [ ]:
y_train[1047]
In [ ]:
get_email_structure(X_train[1047])
In [ ]:
for part in X_train[1047].walk():
    print(part.get_content_type())
    print(html_to_plain_text(str(part.get_payload()))[:200])
In [ ]:
print(email_to_text(sample_html_spam)[:1000], '...')
In [ ]:
import nltk
In [ ]:
stemmer = nltk.PorterStemmer()
In [ ]:
for word in ("Computations", "Computation", "Computing", "Computed", "Compute", "Compulsive"):
    print(f'{word} => {stemmer.stem(word)}')
In [ ]:
import urlextract
In [ ]:
url_extractor = urlextract.URLExtract()
In [ ]:
print(url_extractor.find_urls("Will it detect github.com and https://youtu.be/7Pq-S557XQU?t=3m32s"))
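urlextract is a third-party package; if it is not installed, the transformer below can still run because it checks url_extractor against None. A hedged robustness variant, with a crude regex stand-in for illustration only (far less accurate than urlextract):
In [ ]:
try:
    import urlextract
    url_extractor = urlextract.URLExtract()
except ImportError:
    url_extractor = None  # the transformer below skips URL replacement in this case

# Crude fallback pattern, for illustration only
URL_RE = re.compile(r'https?://\S+|www\.\S+')
URL_RE.sub(' URL ', 'see https://example.com and www.test.org')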
In [ ]:
from sklearn.base import BaseEstimator, TransformerMixin
In [ ]:
from collections import Counter
In [ ]:
class EmailToWordCounterTransformer(BaseEstimator, TransformerMixin):
    def __init__(self, strip_headers=True, lower_case=True, remove_punctuation=True,
                 replace_urls=True, replace_numbers=True, stemming=True):
        self.strip_headers = strip_headers
        self.lower_case = lower_case
        self.remove_punctuation = remove_punctuation
        self.replace_urls = replace_urls
        self.replace_numbers = replace_numbers
        self.stemming = stemming

    def fit(self, X, y=None):
        return self

    def transform(self, X, y=None):
        X_transformed = []
        for email in X:
            text = email_to_text(email) or ''
            if self.lower_case:
                text = text.lower()
            if self.replace_urls and url_extractor is not None:
                # Replace longest URLs first so shorter URLs embedded in them are not clobbered
                urls = sorted(url_extractor.find_urls(text, only_unique=True),
                              key=lambda url: len(url), reverse=True)
                for url in urls:
                    text = text.replace(url, ' URL ')
            if self.replace_numbers:
                # Digits with an optional decimal part and an optional signed exponent
                text = re.sub(r'\d+(?:\.\d*)?(?:[eE][+-]?\d+)?', 'NUMBER', text)
            if self.remove_punctuation:
                text = re.sub(r'\W+', ' ', text, flags=re.M)
            word_counts = Counter(text.split())
            if self.stemming and stemmer is not None:
                stemmed_word_counts = Counter()
                for word, count in word_counts.items():
                    stemmed_word_counts[stemmer.stem(word)] += count
                word_counts = stemmed_word_counts
            X_transformed.append(word_counts)
        return np.array(X_transformed)
In [ ]:
X_few = X_train[:3]
X_few_wordcounts = EmailToWordCounterTransformer().fit_transform(X_few)
X_few_wordcounts
In [ ]:
from scipy.sparse import csr_matrix
In [ ]:
class WordCounterToVectorTransformer(BaseEstimator, TransformerMixin):
    def __init__(self, vocabulary_size=1000):
        self.vocabulary_size = vocabulary_size

    def fit(self, X, y=None):
        total_count = Counter()
        for word_count in X:
            for word, count in word_count.items():
                total_count[word] += min(count, 10)
        most_common = total_count.most_common()[:self.vocabulary_size]
        self.most_common_ = most_common
        self.vocabulary_ = {word: index + 1 for index, (word, count) in enumerate(most_common)}
        return self

    def transform(self, X, y=None):
        rows, cols, data = [], [], []
        for row, word_count in enumerate(X):
            for word, count in word_count.items():
                rows.append(row)
                # Any word not in 'vocabulary_' goes to column 0.
                # When several entries share the same (row, column), csr_matrix sums them;
                # see the demo in the next cell.
                cols.append(self.vocabulary_.get(word, 0))
                data.append(count)
        return csr_matrix((data, (rows, cols)), shape=(len(X), self.vocabulary_size + 1))
In [ ]:
# Duplicate (row, col) pairs are summed: (0, 0) gets 3 + 2 = 5, so the result is [[5, 1]]
rows = [0, 0, 0]
cols = [0, 0, 1]
data = [3, 2, 1]
m = csr_matrix((data, (rows, cols)), shape=(1, 2))
m.toarray()
In [ ]:
vocab_transformer = WordCounterToVectorTransformer(vocabulary_size=10)
X_few_vectors = vocab_transformer.fit_transform(X_few_wordcounts)
X_few_vectors
In [ ]:
print(vocab_transformer.most_common_)
In [ ]:
X_few_vectors.toarray()
In [ ]:
vocab_transformer.vocabulary_
In [ ]:
X_few_wordcounts[1].most_common()[:10]
In [ ]:
from sklearn.pipeline import Pipeline
In [ ]:
preprocess_pipeline = Pipeline([
    ('email_to_wordcount', EmailToWordCounterTransformer()),
    ('wordcount_to_vector', WordCounterToVectorTransformer()),
])
In [ ]:
X_train_transformed = preprocess_pipeline.fit_transform(X_train)
In [ ]:
X_train_transformed.shape  # sparse matrices expose .shape directly; no need to densify
In [ ]:
from sklearn.linear_model import LogisticRegression
from sklearn.model_selection import cross_val_score
In [ ]:
log_clf = LogisticRegression(solver='lbfgs', random_state=42, max_iter=1000)
In [ ]:
score = cross_val_score(log_clf, X_train_transformed, y_train, cv=3, verbose=3)
In [ ]:
score.mean()
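Accuracy alone can be misleading when classes are imbalanced; out-of-fold precision and recall on the training set give a better picture (a sketch using cross_val_predict):
In [ ]:
from sklearn.model_selection import cross_val_predict
from sklearn.metrics import precision_score, recall_score

y_train_cv_pred = cross_val_predict(log_clf, X_train_transformed, y_train, cv=3)
precision_score(y_train, y_train_cv_pred), recall_score(y_train, y_train_cv_pred)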
In [ ]:
from sklearn.metrics import precision_score, recall_score, accuracy_score
In [ ]:
# Use transform, not fit_transform: the vocabulary must come from the training set only
X_test_transformed = preprocess_pipeline.transform(X_test)
In [ ]:
log_clf = LogisticRegression(solver='lbfgs', random_state=42, max_iter=1000)
In [ ]:
log_clf.fit(X_train_transformed, y_train)
In [ ]:
y_pred = log_clf.predict(X_test_transformed)
In [ ]:
y_test.shape
In [ ]:
accuracy_score(y_test, y_pred)  # convention: y_true first, y_pred second
In [ ]:
# y_true goes first; swapping the arguments exchanges precision and recall.
# Print both, since a cell only displays its last expression.
print(f'Precision: {precision_score(y_test, y_pred):.4f}')
print(f'Recall: {recall_score(y_test, y_pred):.4f}')
In [ ]:
from sklearn.metrics import precision_score, recall_score
X_test_transformed = preprocess_pipeline.transform(X_test)
log_clf = LogisticRegression(solver="lbfgs", random_state=42, max_iter=1000)
log_clf.fit(X_train_transformed, y_train)
y_pred = log_clf.predict(X_test_transformed)
print("Precision: {:.2f}%".format(100 * precision_score(y_test, y_pred)))
print("Recall: {:.2f}%".format(100 * recall_score(y_test, y_pred)))
In [ ]:
y_train_pred = log_clf.predict(X_train_transformed)
accuracy_score(y_train, y_train_pred)
In [ ]:
y_test_pred = log_clf.predict(X_test_transformed)
accuracy_score(y_test, y_test_pred)
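Once the pipeline and classifier are trained, they can be persisted for reuse; a hedged sketch with joblib (the filename here is arbitrary):
In [ ]:
import joblib

joblib.dump((preprocess_pipeline, log_clf), 'spam_classifier.joblib')
# Later: preprocess_pipeline, log_clf = joblib.load('spam_classifier.joblib')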