PySpark Docs

Make sure you are reading the docs for the correct version!

The version installed in the VM is Spark 1.5.1

You will need to use various transformations and actions for RDDs, as well as several classes in the MLlib package.

1) Parse data file

We will be using a partially cleaned subset of the Sentiment140 dataset of tweets.

1a) Load file


In [ ]:
linesRDD = sc.textFile('tweets.tsv')

1b) How many instances are in the file?


In [ ]:
<FILL IN>

1c) View a few lines to see the format


In [ ]:
<FILL IN>

1d) Split the input lines into two fields: sentiment class and text

  • 1.0 => positive sentiment
  • 0.0 => negative sentiment

In [ ]:
rawTweetsRDD = linesRDD.map(lambda x: str(x).split('\t', 1)) # str() converts the unicode line to a plain ASCII str
rawTweetsRDD.take(5)

1e) What is the class distribution?

Notice that each element in the RDD is a tuple of (class, text).

You will want to count instances based on the first element, the class value.


In [ ]:
rawTweetsRDD.countByKey()

2) Clean data

2a) Function for cleaning tweets

To accurately model each input tweet, we need to remove unnecessary information and clean the text.


In [ ]:
import string
import re

def clean_tweet(tweet):
    """ Clean tweet by removing URLs, mentions, punctuation, and converting to lowercase
    Args: 
        tweet (str): a single tweet
    Returns:
        str: lowercase tweet with URLs, mentions, and punctuation removed
    """
    if len(tweet) > 0:
        # remove links (www.* and http(s)://*)
        tweet = re.sub(r'((www\.[^\s]+)|(https?://[^\s]+))', '', tweet)
        # remove mentions (i.e. @user)
        tweet = re.sub(r'@[^\s]+', '', tweet)
        # replace punctuation and special characters with spaces
        specials = '\'"?,.!@#$%^&*():;_~`-+=/'
        trans = string.maketrans(specials, ' ' * len(specials))
        tweet = tweet.translate(trans)
        # reduce runs of whitespace to a single space
        tweet = re.sub(r'[\s]+', ' ', tweet)
        # remove leading/trailing spaces and convert to lowercase
        return tweet.strip().lower()
    return None

# grab the first tweet's text just to test the function
sampleTweet = rawTweetsRDD.take(1)[0][1]
print sampleTweet
print clean_tweet(sampleTweet)
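
As an optional sanity check, you can also run clean_tweet() on a made-up tweet (the string below is illustrative, not from the dataset); the URL, mention, and punctuation should all be stripped.


In [ ]:
# illustrative made-up tweet, not from the dataset
print clean_tweet('Check this out http://example.com @someone!!! So GOOD')
# expected output: something like 'check this out so good'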

2b) Clean all tweets

Apply the clean_tweet() function to all tweets via the RDD map() function.

Remember each instance in the RDD is a tuple of (label, tweet).


In [ ]:
# clean all tweets
cleanTweetsRDD = rawTweetsRDD.map(lambda (label,tweet): (float(label), clean_tweet(tweet)))

2c) Verify clean tweets

Verify that the cleanTweetsRDD is the same size as the rawTweetsRDD and print a sample of the clean tweets.


In [ ]:
<FILL IN>

In [ ]:
<FILL IN>

3) Tokenize data

Here we remove stopwords (frequently used words that carry little meaning) and keep only words with 2 or more characters.

3a) Load stopwords

Load the stopwords.txt file into an RDD and use the collect() method to convert the RDD to a Python list.


In [ ]:
stopwordsList = sc.textFile('stopwords.txt').collect()
print stopwordsList[:5]

3b) Tokenize function

The tokenize() function defaults to removing stopwords and only including words with 2 or more characters. These parameters may be passed when calling the function to provide non-default behavior.


In [ ]:
stopwords = set(stopwordsList)

def tokenize(text, minLength=2, remove_stopwords=True):
    """ Tokenize a tweet, optionally removing stopwords and words shorter than minLength
    Args: 
        text (str): a single cleaned tweet
        minLength (int): minimum word length to keep (default 2)
        remove_stopwords (bool): whether to remove stopwords (default True)
    Returns:
        list: the unique tokens in the tweet
    """
    # use a set so each word appears at most once
    words = set([t for t in text.split(' ') if len(t) >= minLength])
    if remove_stopwords:
        return list(words - stopwords)
    else:
        return list(words)

sampleTweet = cleanTweetsRDD.take(1)[0][1]
print sampleTweet
print tokenize(sampleTweet)
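
To see the effect of the optional parameters described in 3b, you can call tokenize() with non-default values; this is just an illustrative check on the same sample tweet.


In [ ]:
# illustrative: keep stopwords, only dropping single-character tokens
print tokenize(sampleTweet, remove_stopwords=False)
# illustrative: require at least 4 characters per token
print tokenize(sampleTweet, minLength=4)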

3c) Tokenize all tweets

Apply the tokenize() function to all tweets in the cleanTweetsRDD.

Remember each instance in the RDD is a tuple of (label, tweet).

The resulting tokenizedRDD instances should be tuples of (label, list).


In [ ]:
tokenizedRDD = cleanTweetsRDD.map(lambda (label,tweet): (label, tokenize(tweet)))
tokenizedRDD.take(2)

4) Create vocabulary

To build a word vector-based model, we need to establish a vocabulary of the most frequently used words to use as features for each instance.

4a) Wait! We need to split the data!

We can't build the vocabulary from our test set; that would be cheating!

See the randomSplit() function in the docs.


In [ ]:
trainRDD, testRDD = tokenizedRDD.randomSplit([0.8, 0.2])
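
Note: if you want the 80/20 split to be reproducible across runs, randomSplit() also accepts an optional seed (the value below is arbitrary).


In [ ]:
# optional: pass a seed so the split is reproducible
trainRDD, testRDD = tokenizedRDD.randomSplit([0.8, 0.2], seed=42)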

4b) What is the size and class distribution of each dataset?


In [ ]:
print 'Training length: ', trainRDD.<FILL IN>
print 'Training class distribution: ', (trainRDD.<FILL IN>)

In [ ]:
print 'Test length: ', testRDD.<FILL IN>
print 'Test class distribution: ', (testRDD.<FILL IN>)

4c) Build vocabulary

Count the number of times each word occurs across all documents. We cache the RDD because we will be using it frequently.


In [ ]:
from operator import add
vocabulary = (trainRDD
              .flatMap(lambda (label,words): words)
              .map(lambda word: (word,1))
              .reduceByKey(add)
              .cache())
vocabulary.take(5)

4d) How many unique words are in the training data?


In [ ]:
vocabulary.<FILL IN>

4e) What are the top 10 most frequently used words?


In [ ]:
vocabulary.<FILL IN>

4f) Create inverted index for efficient lookup

To create a word vector for each instance, we need to assign a feature ID to each word in the vocabulary.

We specify the number of features (words) to use for the model. This selects the top n most frequently used words.


In [ ]:
num_features = 5000
features = (vocabulary
            .sortBy(lambda (word,freq): -1*freq)
            .map(lambda (word,freq): word)
            .zipWithIndex()
            .take(num_features))

In [ ]:
features[:10]

Convert to Python dict for fast lookup


In [ ]:
featureDict = dict(features)
# convert to a string and print the first few chars, otherwise it would take a lot of screen space
str(featureDict)[:100]
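
Each vocabulary word now maps to its feature index (its rank by frequency). As a quick illustrative check (the word chosen here may or may not be in your vocabulary), dict.get() returns the index or None.


In [ ]:
# look up the feature index assigned to a word (None if it is not in the top-n vocabulary)
print featureDict.get('good')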

5) Create word vector from text

5a) Vectorize function

Since each tweet contains only a small number of words, we store each instance as a SparseVector.

We use a broadcast variable for communicating the feature list to the worker nodes.


In [ ]:
from pyspark.mllib.linalg import SparseVector

def vectorize(tokens, num_features):
    """ Convert a tokenized tweet into a sparse binary word vector.
    Args: 
        tokens (list): list of tokens from a single tweet
        num_features (int): length of the feature vector
    Returns:
        SparseVector: sparse vector representation of the instance
    """
    feat = features_broadcast.value
    values = []
    for t in tokens:
        # only keep words that exist in the vocabulary
        if t in feat:
            values.append((feat[t], 1.0))
    return SparseVector(num_features, values)

# broadcast variable
features_broadcast = sc.broadcast(featureDict)

sampleTweet = trainRDD.take(1)[0][1]
print sampleTweet
vectorize(sampleTweet, len(featureDict))
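
The result above is a SparseVector, which stores only the (index, value) pairs for the words that are present. As a tiny standalone illustration of the same representation (with made-up indices):


In [ ]:
# a length-10 binary vector with 1.0 at indices 2 and 7 (illustrative values only)
SparseVector(10, [(2, 1.0), (7, 1.0)])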

5b) Vectorize training and testing data

We convert each instance to a LabeledPoint, which MLlib uses for storing instances with class labels.


In [ ]:
from pyspark.mllib.regression import LabeledPoint

num_features = len(featureDict)
trainVectorizedRDD = trainRDD.map(lambda (label,words): LabeledPoint(label, vectorize(words, num_features)))
testVectorizedRDD = testRDD.map(lambda (label,words): LabeledPoint(label, vectorize(words, num_features)))

In [ ]:
# preview the training RDD
trainVectorizedRDD.take(5)

In [ ]:
# preview the testing RDD
testVectorizedRDD.take(5)

6) Build our model and predict!

6a) Build Logistic Regression model

For this model, we will use the logistic regression with SGD implementation (LogisticRegressionWithSGD) from MLlib.


In [ ]:
from pyspark.mllib.classification import LogisticRegressionWithSGD

model = LogisticRegressionWithSGD.train(trainVectorizedRDD)
# clearThreshold() makes the model output raw scores (probabilities) instead of class labels
model.clearThreshold()
sampleInstance = testRDD.take(1)[0]
sampleInstanceVectorized = testVectorizedRDD.take(1)[0]

# sample instance and prediction
print 'Sample instance (label, tokens):', sampleInstance
print 'Sample instance:', sampleInstanceVectorized
print 'Prediction:', model.predict(sampleInstanceVectorized.features)
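
If you would rather get a hard 0.0/1.0 class label back instead of a probability, the threshold can be restored; a minimal sketch, assuming the default 0.5 cutoff is what you want.


In [ ]:
# optional: restore a decision threshold so predict() returns class labels again
model.setThreshold(0.5)
print 'Class prediction:', model.predict(sampleInstanceVectorized.features)
# switch back to raw scores for the AUC evaluation below
model.clearThreshold()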

6b) Evaluate the model on our test data

BinaryClassificationMetrics computes the area under the ROC curve (AUC) and the area under the precision/recall curve from an RDD of (prediction, original label) pairs.


In [ ]:
from pyspark.mllib.evaluation import BinaryClassificationMetrics

predictionAndLabel = testVectorizedRDD.map(lambda p : (float(model.predict(p.features)), p.label))
auc = BinaryClassificationMetrics(predictionAndLabel).areaUnderROC
print 'AUC:', auc
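
BinaryClassificationMetrics also exposes the area under the precision/recall curve, which you can report the same way.


In [ ]:
metrics = BinaryClassificationMetrics(predictionAndLabel)
print 'Area under PR:', metrics.areaUnderPR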