In [1]:
#Purpose: import from other notebook...
#Ref: http://jupyter-notebook.readthedocs.io/en/latest/examples/Notebook/Importing%20Notebooks.html
import io, os, sys, types
from IPython import get_ipython
from nbformat import read
from IPython.core.interactiveshell import InteractiveShell
#--------------------------------------------------------------------------------
def find_notebook(fullname, path=None):
"""find a notebook, given its fully qualified name and an optional path
This turns "foo.bar" into "foo/bar.ipynb"
and tries turning "Foo_Bar" into "Foo Bar" if Foo_Bar
does not exist.
"""
name = fullname.rsplit('.', 1)[-1]
if not path:
path = ['']
for d in path:
nb_path = os.path.join(d, name + ".ipynb")
if os.path.isfile(nb_path):
return nb_path
# let import Notebook_Name find "Notebook Name.ipynb"
nb_path = nb_path.replace("_", " ")
if os.path.isfile(nb_path):
return nb_path
#--------------------------------------------------------------------------------
class NotebookLoader(object):
"""Module Loader for Jupyter Notebooks"""
def __init__(self, path=None):
self.shell = InteractiveShell.instance()
self.path = path
def load_module(self, fullname):
"""import a notebook as a module"""
path = find_notebook(fullname, self.path)
print ("importing Jupyter notebook from %s" % path)
# load the notebook object
with io.open(path, 'r', encoding='utf-8') as f:
nb = read(f, 4)
# create the module and add it to sys.modules
# if name in sys.modules:
# return sys.modules[name]
mod = types.ModuleType(fullname)
mod.__file__ = path
mod.__loader__ = self
mod.__dict__['get_ipython'] = get_ipython
sys.modules[fullname] = mod
# extra work to ensure that magics that would affect the user_ns
# actually affect the notebook module's ns
save_user_ns = self.shell.user_ns
self.shell.user_ns = mod.__dict__
try:
for cell in nb.cells:
if cell.cell_type == 'code':
# transform the input to executable Python
code = self.shell.input_transformer_manager.transform_cell(cell.source)
# run the code in themodule
exec(code, mod.__dict__)
finally:
self.shell.user_ns = save_user_ns
return mod
#--------------------------------------------------------------------------------
class NotebookFinder(object):
"""Module finder that locates Jupyter Notebooks"""
def __init__(self):
self.loaders = {}
def find_module(self, fullname, path=None):
nb_path = find_notebook(fullname, path)
if not nb_path:
return
key = path
if path:
# lists aren't hashable
key = os.path.sep.join(path)
if key not in self.loaders:
self.loaders[key] = NotebookLoader(path)
return self.loaders[key]
#--------------------------------------------------------------------------------
sys.meta_path.append(NotebookFinder())
In [2]:
from collections import Counter, defaultdict
from cm_utils import split_data
import math, random, re, glob
#--------------------------------------------------------------------------------
def tokenize(message):
message = message.lower() # convert to lowercase
all_words = re.findall("[a-z0-9']+", message) # extract the words
return set(all_words) # remove duplicates
#--------------------------------------------------------------------------------
def count_words(training_set):
"""training set consists of pairs (message, is_spam)"""
counts = defaultdict(lambda: [0, 0])
for message, is_spam in training_set:
for word in tokenize(message):
counts[word][0 if is_spam else 1] += 1
return counts
#--------------------------------------------------------------------------------
def word_probabilities(counts, total_spams, total_non_spams, k=0.5):
"""turn the word_counts into a list of triplets
w, p(w | spam) and p(w | ~spam)"""
return [(w,
(spam + k) / (total_spams + 2 * k),
(non_spam + k) / (total_non_spams + 2 * k))
for w, (spam, non_spam) in iter(counts.items())] #counts.iteritems()] #python2x
#--------------------------------------------------------------------------------
def spam_probability(word_probs, message):
message_words = tokenize(message)
log_prob_if_spam = log_prob_if_not_spam = 0.0
for word, prob_if_spam, prob_if_not_spam in word_probs:
# for each word in the message,
# add the log probability of seeing it
if word in message_words:
log_prob_if_spam += math.log(prob_if_spam)
log_prob_if_not_spam += math.log(prob_if_not_spam)
# for each word that's not in the message
# add the log probability of _not_ seeing it
else:
log_prob_if_spam += math.log(1.0 - prob_if_spam)
log_prob_if_not_spam += math.log(1.0 - prob_if_not_spam)
prob_if_spam = math.exp(log_prob_if_spam)
prob_if_not_spam = math.exp(log_prob_if_not_spam)
return prob_if_spam / (prob_if_spam + prob_if_not_spam)
#--------------------------------------------------------------------------------
class NaiveBayesClassifier:
def __init__(self, k=0.5):
self.k = k
self.word_probs = []
#----------------------------------------------------------------------------
def train(self, training_set):
# count spam and non-spam messages
num_spams = len([is_spam
for message, is_spam in training_set
if is_spam])
num_non_spams = len(training_set) - num_spams
# run training data through our "pipeline"
word_counts = count_words(training_set)
self.word_probs = word_probabilities(word_counts,
num_spams,
num_non_spams,
self.k)
#----------------------------------------------------------------------------
def classify(self, message):
return spam_probability(self.word_probs, message)
#--------------------------------------------------------------------------------
def get_subject_data(path):
data = []
# regex for stripping out the leading "Subject:" and any spaces after it
subject_regex = re.compile(r"^Subject:\s+")
# glob.glob returns every filename that matches the wildcarded path
for fn in glob.glob(path):
is_spam = "ham" not in fn
with open(fn,'r') as file:
for line in file:
if line.startswith("Subject:"):
subject = subject_regex.sub("", line).strip()
data.append((subject, is_spam))
return data
#--------------------------------------------------------------------------------
def p_spam_given_word(word_prob):
word, prob_if_spam, prob_if_not_spam = word_prob
return prob_if_spam / (prob_if_spam + prob_if_not_spam)
#--------------------------------------------------------------------------------
def train_and_test_model(path):
data = get_subject_data(path)
random.seed(0) # just so you get the same answers as me
train_data, test_data = split_data(data, 0.75)
classifier = NaiveBayesClassifier()
classifier.train(train_data)
classified = [(subject, is_spam, classifier.classify(subject))
for subject, is_spam in test_data]
counts = Counter((is_spam, spam_probability > 0.5) # (actual, predicted)
for _, is_spam, spam_probability in classified)
print(counts)
classified.sort(key=lambda row: row[2])
#spammiest_hams = filter(lambda row: not row[1], classified)[-5:] #python2x
spammiest_hams = list(filter(lambda row: not row[1], classified))[-5:] #python3x, we should convert a iterable filter object to a list
hammiest_spams = list(filter(lambda row: row[1], classified))[:5]
print("spammiest_hams", spammiest_hams)
print("hammiest_spams", hammiest_spams)
words = sorted(classifier.word_probs, key=p_spam_given_word)
spammiest_words = words[-5:]
hammiest_words = words[:5]
print("spammiest_words", spammiest_words)
print("hammiest_words", hammiest_words)
In [3]:
if __name__ == "__main__":
train_and_test_model(r"c:\spam\*\*")