In [1]:
# Initialize Spark in Python
from pyspark import SparkContext
sc = SparkContext("local", "Work with MLlib")
In [16]:
from pyspark.mllib.regression import LabeledPoint
from pyspark.mllib.feature import HashingTF
from pyspark.mllib.classification import LogisticRegressionWithLBFGS
In [4]:
spam = sc.textFile("spam.txt")
normal = sc.textFile("normal.txt")
In [18]:
# Create a HashingTF instance to map email text to vectors of 100 features.
tf = HashingTF(numFeatures=100)
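As a quick sanity check (this cell is not in the original notebook), transforming a small made-up word list shows the 100-dimensional SparseVector that HashingTF produces; a repeated word simply increments its hashed slot (barring a hash collision).
In [ ]:
# Hash a tiny word list into a 100-dimensional term-frequency vector.
# The repeated word "cheap" should bump its hashed index to a count of 2.
print(tf.transform(["get", "cheap", "stuff", "cheap"]))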
In [22]:
# Each email is split into words, and each word is mapped to one feature.
spamFeatures = spam.map(lambda email: tf.transform(email.split(" ")))
hamFeatures = normal.map(lambda email: tf.transform(email.split(" ")))
In [23]:
# Create LabeledPoint datasets for positive (spam) and negative (ham) examples.
positiveExamples = spamFeatures.map(lambda features: LabeledPoint(1, features))
negativeExamples = hamFeatures.map(lambda features: LabeledPoint(0, features))
training_data = positiveExamples.union(negativeExamples)
training_data.cache() # Cache data since Logistic Regression is an iterative algorithm.
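An optional check, not part of the original notebook: since textFile yields one record per line, the union should hold one LabeledPoint per line of spam.txt and normal.txt combined.
In [ ]:
# Count the combined labeled examples (this also materializes the cached RDD).
print(training_data.count())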
In [25]:
model = LogisticRegressionWithLBFGS.train(training_data)
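A rough training-accuracy check, sketched here rather than taken from the original notebook, follows the usual MLlib pattern: predict a label for each training point and compare it with the true label.
In [ ]:
# Score every training example with the fitted model and measure agreement
# with the true labels. (Training accuracy is optimistic; this is only a smoke test.)
labelsAndPreds = training_data.map(lambda p: (p.label, model.predict(p.features)))
accuracy = labelsAndPreds.filter(lambda lp: lp[0] == lp[1]).count() / float(training_data.count())
print("Training accuracy: %g" % accuracy)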
In [26]:
# Test on a positive example (spam) and a negative one (ham).
# First apply the same HashingTF feature transformation used on the training data.
posTestExample = tf.transform("O M G GET cheap stuff by sending money to ...".split(" "))
negTestExample = tf.transform("Hi Dad, I started studying Spark the other ...".split(" "))
In [27]:
# Now use the learned model to predict spam/ham for new emails.
print "Prediction for positive test example: %g" % model.predict(posTestExample)
print "Prediction for negative test example: %g" % model.predict(negTestExample)
In [28]:
sc.stop()