We will be using a partially cleaned subset of the Sentiment140 dataset of tweets.
In [ ]:
linesRDD = sc.textFile('tweets.tsv')
In [ ]:
<FILL IN>
In [ ]:
<FILL IN>
In [ ]:
rawTweetsRDD = linesRDD.map(lambda x: str(x).split('\t', 1)) # str() converts to ASCII
rawTweetsRDD.take(5)
In [ ]:
rawTweetsRDD.countByKey()
In [ ]:
import string
import re
def clean_tweet(tweet):
    """ Clean a tweet by removing URLs, mentions, and punctuation, and converting to lowercase
    Args:
        tweet (str): a single tweet
    Returns:
        str: lowercase tweet with URLs, mentions, and punctuation removed
    """
    if len(tweet) > 0:
        # remove links
        tweet = re.sub(r'((www\.[^\s]+)|(https?://[^\s]+))', '', tweet)
        # remove mentions (i.e. @user)
        tweet = re.sub(r'@[^\s]+', '', tweet)
        # replace punctuation and other special characters with spaces
        specials = '\'"?,.!@#$%^&*():;_~`-+=/'
        trans = string.maketrans(specials, ' ' * len(specials))
        tweet = tweet.translate(trans)
        # reduce multiple spaces
        tweet = re.sub(r'[\s]+', ' ', tweet)
        # remove leading/trailing spaces and convert to lowercase
        return tweet.strip().lower()
    return None
# grab the first tweet's text just to test the function
sampleTweet = rawTweetsRDD.take(1)[0][1]
print sampleTweet
print clean_tweet(sampleTweet)
In [ ]:
# clean all tweets
cleanTweetsRDD = rawTweetsRDD.map(lambda (label,tweet): (float(label), clean_tweet(tweet)))
In [ ]:
<FILL IN>
In [ ]:
<FILL IN>
In [ ]:
stopwordsList = sc.textFile('stopwords.txt').collect()
print stopwordsList[:5]
In [ ]:
stopwords = set(stopwordsList)
def tokenize(text, minLength=2, remove_stopwords=True):
    """ Tokenize a tweet, optionally removing stopwords and words shorter than a minimum length
    Args:
        text (str): a single cleaned tweet
        minLength (int): minimum word length to keep
        remove_stopwords (bool): whether to filter out stopwords
    Returns:
        list: unique tokens of the tweet
    """
    words = set([t for t in text.split(' ') if len(t) >= minLength])
    if remove_stopwords:
        return list(words - stopwords)
    else:
        return list(words)
sampleTweet = cleanTweetsRDD.take(1)[0][1]
print sampleTweet
print tokenize(sampleTweet)
In [ ]:
tokenizedRDD = cleanTweetsRDD.map(lambda (label,tweet): (label, tokenize(tweet)))
tokenizedRDD.take(2)
We can't build the vocabulary from our test set; that would be cheating!
We first split the data into training and test sets using the randomSplit() function.
In [ ]:
trainRDD, testRDD = tokenizedRDD.randomSplit([0.8, 0.2])
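randomSplit() shuffles the data with a random seed, so the split can differ between runs. If you want a reproducible split, you can pass an explicit seed; a minimal sketch (the seed value is arbitrary):
In [ ]:
# reproducible 80/20 split; the seed value 42 is arbitrary
trainRDD, testRDD = tokenizedRDD.randomSplit([0.8, 0.2], seed=42)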
In [ ]:
print 'Training length: ', trainRDD.<FILL IN>
print 'Training class distribution: ', (trainRDD.<FILL IN>)
In [ ]:
print 'Test length: ', testRDD.<FILL IN>
print 'Test class distribution: ', (testRDD.<FILL IN>)
In [ ]:
from operator import add
vocabulary = (trainRDD
              .flatMap(lambda (label, words): words)
              .map(lambda word: (word, 1))
              .reduceByKey(add)
              .cache())
vocabulary.take(5)
In [ ]:
vocabulary.<FILL IN>
In [ ]:
vocabulary.<FILL IN>
In [ ]:
num_features = 5000
features = (vocabulary
            .sortBy(lambda (word, freq): -1 * freq)
            .map(lambda (word, freq): word)
            .zipWithIndex()
            .take(num_features))
In [ ]:
features[:10]
Convert to Python dict for fast lookup
In [ ]:
featureDict = dict(features)
# convert to a string and print the first few chars, otherwise it would take a lot of screen space
str(featureDict)[:100]
Since each tweet contains only a small number of words, we store each instance as a SparseVector.
We use a broadcast variable to communicate the feature dictionary to the worker nodes.
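To see what a SparseVector looks like before we build the real feature vectors, here is a small standalone sketch (the size and indices below are made up for illustration):
In [ ]:
from pyspark.mllib.linalg import SparseVector
# a length-5 vector with 1.0 at indices 1 and 3; all other entries are implicitly 0
toyVector = SparseVector(5, [(1, 1.0), (3, 1.0)])
print toyVector
print toyVector.toArray()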
In [ ]:
from pyspark.mllib.linalg import SparseVector
def vectorize(tokens, num_features):
    """ Convert a tokenized tweet into a sparse binary word vector.
    Args:
        tokens (list): tokens of a single tweet
        num_features (int): dimensionality of the feature space
    Returns:
        SparseVector: sparse vector representation of the instance
    """
    feat = features_broadcast.value
    values = []
    for t in tokens:
        # check if the word exists in the vocabulary
        if t in feat:
            values.append((feat[t], 1.0))
    return SparseVector(num_features, values)
# broadcast variable
features_broadcast = sc.broadcast(featureDict)
sampleTweet = trainRDD.take(1)[0][1]
print sampleTweet
vectorize(sampleTweet, len(featureDict))
We convert each instance to a LabeledPoint, which MLlib uses to store instances together with their class labels.
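A LabeledPoint is simply a label paired with a feature vector; a tiny toy example (the values are made up for illustration):
In [ ]:
from pyspark.mllib.regression import LabeledPoint
from pyspark.mllib.linalg import SparseVector
# label 1.0 with a toy 5-dimensional sparse feature vector
toyInstance = LabeledPoint(1.0, SparseVector(5, [(1, 1.0), (3, 1.0)]))
print toyInstance.label, toyInstance.features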
In [ ]:
from pyspark.mllib.regression import LabeledPoint
num_features = len(featureDict)
trainVectorizedRDD = trainRDD.map(lambda (label,words): LabeledPoint(label, vectorize(words, num_features)))
testVectorizedRDD = testRDD.map(lambda (label,words): LabeledPoint(label, vectorize(words, num_features)))
In [ ]:
# preview the training RDD
trainVectorizedRDD.take(5)
In [ ]:
# preview the testing RDD
testVectorizedRDD.take(5)
For this model, we will use MLlib's implementation of logistic regression trained with stochastic gradient descent (SGD).
In [ ]:
from pyspark.mllib.classification import LogisticRegressionWithSGD
model = LogisticRegressionWithSGD.train(trainVectorizedRDD)
# this makes the model output raw scores (probabilities) instead of class labels
model.clearThreshold()
sampleInstance = testRDD.take(1)[0]
sampleInstanceVectorized = testVectorizedRDD.take(1)[0]
# sample instance and prediction
print 'Sample instance text:', sampleInstance
print 'Sample instance:', sampleInstanceVectorized
print 'Prediction:', model.predict(sampleInstanceVectorized.features)
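train() also exposes optimization and regularization parameters (iterations, step, regParam, regType). A hedged sketch of what tuning them might look like; the values below are arbitrary, not recommendations:
In [ ]:
# retrain with explicit hyperparameters (arbitrary values, for illustration only)
tunedModel = LogisticRegressionWithSGD.train(trainVectorizedRDD,
                                             iterations=200,
                                             step=1.0,
                                             regParam=0.01,
                                             regType='l2')
tunedModel.clearThreshold()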
BinaryClassificationMetrics computes the area under the ROC curve (AUC) and the area under the precision/recall curve from an RDD of (score, label) pairs.
In [ ]:
from pyspark.mllib.evaluation import BinaryClassificationMetrics
predictionAndLabel = testVectorizedRDD.map(lambda p : (float(model.predict(p.features)), p.label))
auc = BinaryClassificationMetrics(predictionAndLabel).areaUnderROC
print 'AUC:', auc
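The same metrics object also exposes the area under the precision/recall curve mentioned above:
In [ ]:
metrics = BinaryClassificationMetrics(predictionAndLabel)
print 'Area under PR:', metrics.areaUnderPR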