In previous versions of Spark, most Machine Learning functionality was provided through RDDs (Resilient Distributed Datasets). However, to improve performance and the communicability of results, the Spark developers ported the ML functionality to work almost exclusively with DataFrames. Future releases of Spark will no longer update the RDD-based ML APIs.
In this modern Spark ML approach, there are Estimators and Transformers. Estimators have parameters that must be fit to the data. After fitting, Estimators return Transformers. Transformers can be applied to dataframes, taking one (or several) columns as input and creating one (or several) columns as output.
A Pipeline combines several Transformers with a final Estimator. The Pipeline, therefore, can be fit to the data because the final step of the process (the Estimator) is fit to the data. The result of the fitting is a pipelined Transformer that takes an input dataframe through all the stages of the Pipeline.
There is a third type of functionality, feature selectors, which choose a subset of features.
For example, for analyzing text, a typical pipelined Estimator chains a tokenizer, a feature extractor such as a count vectorizer, and a model such as logistic regression. After fitting, this Pipeline becomes a Transformer that maps raw text to predictions.
Importantly, transformers can be saved and exchanged with other data scientists, improving reproducibility.
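For example, a fitted pipeline can be written to disk and loaded back later. Here is a minimal sketch, assuming Spark 2.x (where ML persistence is available from Python) and a hypothetical fitted pipeline and output path:
# minimal sketch: persist a fitted pipeline and load it back (assumes Spark 2.x ML persistence)
from pyspark.ml import PipelineModel

# `fitted_pipeline` stands for the result of some Pipeline(...).fit(df); the path is hypothetical
fitted_pipeline.save('my_pipeline_model')
reloaded_pipeline = PipelineModel.load('my_pipeline_model')
# reloaded_pipeline.transform(df) reproduces the original transformation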
In [1]:
import findspark
In [2]:
findspark.init()
In [3]:
import pyspark
import numpy as np
In [4]:
conf = pyspark.SparkConf().\
setAppName('sentiment-analysis').\
setMaster('local[*]')
In [5]:
from pyspark.sql import SQLContext, HiveContext
sc = pyspark.SparkContext(conf=conf)
sqlContext = HiveContext(sc)
In [6]:
# dataframe functions
from pyspark.sql import functions as fn
A DataFrame is a relatively new addition to Spark that stores a distributed dataset of structured columns. It is very similar to an R dataframe or an RDBMS table. Each column holds values of a single type. A DataFrame can be constructed from a variety of sources, such as a database, CSV files, JSON files, or Parquet files (columnar storage). The preferred format for storing dataframes is Parquet due to its speed and compression ratio.
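As a quick sketch (the dataframe and path here are hypothetical placeholders), writing and reading Parquet looks like this:
# hypothetical sketch: persist a dataframe as Parquet and read it back
some_df.write.parquet('some_data.parquet')              # `some_df` is any dataframe; path is hypothetical
same_df = sqlContext.read.parquet('some_data.parquet')  # read it back with the sqlContext defined above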
We can create a dataframe from an RDD using the sqlContext.
In [7]:
# Create RDDs
documents_rdd = sc.parallelize([
[1, 'cats are cute', 0],
[2, 'dogs are playfull', 0],
[3, 'lions are big', 1],
[4, 'cars are fast', 1]])
users_rdd = sc.parallelize([
[0, 'Alice', 20],
[1, 'Bob', 23],
[2, 'Charles', 32]])
From the previous RDDs, we can call the toDF method and specify the column names:
In [8]:
documents_df = documents_rdd.toDF(['doc_id', 'text', 'user_id'])
users_df = users_rdd.toDF(['user_id', 'name', 'age'])
Spark will automatically try to guess the column types. We can take a look at those types:
In [9]:
documents_df.printSchema()
In [10]:
users_df.printSchema()
Similar to SQL, we can apply a function to a column or several columns.
In [11]:
from pyspark.sql import functions as fn
In [12]:
# compute the average age of users
user_age_df = users_df.select(fn.avg('age'))
user_age_df
Out[12]:
As you can see, the function is not evaluated until an action (e.g., take, show, collect) is called.
In [13]:
user_age_df.show()
We can combine (e.g., join) two dataframes à la SQL.
In [14]:
users_df.join(documents_df, on='user_id').show()
We can also do outer joins
In [15]:
users_df.join(documents_df, on='user_id', how='left').show()
We can apply group functions
In [16]:
users_df.join(documents_df, 'user_id', how='left').\
groupby('user_id', 'name').\
agg(fn.count('text')).\
show()
We can change the name of computed columns:
In [17]:
users_df.join(documents_df, 'user_id', how='left').\
groupby('user_id', 'name').\
agg(fn.count('text').alias('n_pets')).\
show()
Add columns:
In [18]:
users_df.withColumn('name_length', fn.length('name')).show()
There are many, many more functions; see the pyspark.sql.functions documentation.
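For instance, a small sketch combining a few of them on the users_df defined above:
# a few more column functions applied to users_df
users_df.select(
    fn.upper('name').alias('name_upper'),                                     # uppercase a string column
    (fn.col('age') + 1).alias('age_next_year'),                               # column arithmetic
    fn.when(fn.col('age') > 25, 'older').otherwise('younger').alias('group')  # conditional column
).show()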
In [19]:
from pyspark.ml.feature import Tokenizer
Almost all transformers and estimators require you to specify the input column of the dataframe and the output column that will be added to the dataframe.
In [20]:
# the tokenizer object
tokenizer = Tokenizer().setInputCol('text').setOutputCol('words')
We can now transform the dataframe
In [21]:
tokenizer.transform(documents_df).show()
CountVectorizer counts how many times each word appears in a list of words and produces a vector with those counts. This is very useful for text analysis.
In [22]:
from pyspark.ml.feature import CountVectorizer
A CountVectorizer is different from a Tokenizer because it needs to learn how many different tokens there are in the input column. With that number, it will output vectors with consistent dimensions. Therefore, CountVectorizer is an Estimator that, when fitted, returns a Transformer.
In [23]:
count_vectorizer_estimator = CountVectorizer().setInputCol('words').setOutputCol('features')
Now we need to use the words column generated by the tokenizer transformer:
In [24]:
count_vectorizer_transformer = count_vectorizer_estimator.fit(tokenizer.transform(documents_df))
which results in:
In [25]:
count_vectorizer_transformer.transform(tokenizer.transform(documents_df)).show(truncate=False)
The column features is a sparse vector representation. For example, for the first document, we have three features present: 0, 3, and 5. By looking at the vocabulary learned by count_vectorizer_transformer, we can know which words those feature indices refer to:
In [26]:
# list of words in the vocabulary
count_vectorizer_transformer.vocabulary
Out[26]:
In [27]:
np.array(count_vectorizer_transformer.vocabulary)[[0, 3, 5]]
Out[27]:
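To make the sparse representation concrete, here is a small sketch of building such a vector by hand (assuming Spark 2.x, where the vector classes live in pyspark.ml.linalg; in older versions they are in pyspark.mllib.linalg):
# a sparse vector of size 10 with value 1.0 at indices 0, 3, and 5
from pyspark.ml.linalg import Vectors

sv = Vectors.sparse(10, [0, 3, 5], [1.0, 1.0, 1.0])
print(sv)            # printed as (10,[0,3,5],[1.0,1.0,1.0])
print(sv.toArray())  # the equivalent dense array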
Sometimes, we have long preprocessing steps that take raw data and transform it through several stages. As explained before, these complex transformations can be captured by Pipelines.
Pipelines are always estimators, even when they contain several transformers. After a pipeline is fit to the data, it becomes a transformer.
We will now define a pipeline that takes the raw text column and produces the features column explained previously.
In [28]:
from pyspark.ml import Pipeline
In [29]:
pipeline_cv_estimator = Pipeline(stages=[tokenizer, count_vectorizer_estimator])
In [30]:
pipeline_cv_transformer = pipeline_cv_estimator.fit(documents_df)
In [31]:
pipeline_cv_transformer.transform(documents_df).show()
In more complex scenarios, you can even chain Pipeline transformers. We will see this case in the actual use case below.
For a more detailed explanation of Pipelines, Estimators, and Transformers, see the Spark ML Pipelines documentation.
In [32]:
!wget https://github.com/daniel-acuna/python_data_science_intro/blob/master/data/imdb_reviews_preprocessed.parquet.zip?raw=true -O imdb_reviews_preprocessed.parquet.zip && unzip imdb_reviews_preprocessed.parquet.zip && rm imdb_reviews_preprocessed.parquet.zip
In [33]:
!wget https://github.com/daniel-acuna/python_data_science_intro/blob/master/data/sentiments.parquet.zip?raw=true -O sentiments.parquet.zip && unzip sentiments.parquet.zip && rm sentiments.parquet.zip
In [34]:
!wget https://github.com/daniel-acuna/python_data_science_intro/blob/master/data/tweets.parquet.zip?raw=true -O tweets.parquet.zip && unzip tweets.parquet.zip && rm tweets.parquet.zip
In [35]:
sentiments_df = sqlContext.read.parquet('sentiments.parquet')
In [36]:
sentiments_df.printSchema()
The schema is very simple: for each word, we have whether it is positive (+1) or negative (-1)
In [37]:
# a sample of positive words
sentiments_df.where(fn.col('sentiment') == 1).show(5)
In [38]:
# a sample of negative words
sentiments_df.where(fn.col('sentiment') == -1).show(5)
Let's see how many of each category we have:
In [39]:
sentiments_df.groupBy('sentiment').agg(fn.count('*')).show()
We have almost twice as many negative words as positive ones!
One simple approach to sentiment analysis is to simply count the number of positive and negative words in a text and then compute the average sentiment. Assuming that positive words are +1 and negative words are -1, we can classify a text as positive if the average sentiment is greater than zero and negative otherwise.
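As a toy illustration of this rule in plain Python (the dictionary and sentence are made up):
# toy example: average the word sentiments and threshold at zero
word_sentiment = {'cute': 1, 'great': 1, 'boring': -1, 'bad': -1}   # hypothetical dictionary
words = 'the movie was cute but the ending was bad and boring'.split()
scores = [word_sentiment[w] for w in words if w in word_sentiment]  # [1, -1, -1]
avg_sentiment = sum(scores) / float(len(scores))                    # -0.33...
prediction = 1.0 if avg_sentiment > 0 else 0.0                      # classified as negative
print(avg_sentiment, prediction)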
To test our approach, we will use a sample of IMDB reviews that were tagged as positive and negative.
Let's load them:
In [40]:
imdb_reviews_df = sqlContext.read.parquet('imdb_reviews_preprocessed.parquet')
Let's take a look at a positive review
In [41]:
imdb_reviews_df.where(fn.col('score') == 1).first()
Out[41]:
And a negative one
In [42]:
imdb_reviews_df.where(fn.col('score') == 0).first()
Out[42]:
The first problem that we encounter is that the reviews are in plain text. We need to split the text into words and then match them against sentiments_df. To do so, we will use a transformation that takes raw text and outputs a list of words.
In [43]:
from pyspark.ml.feature import RegexTokenizer
RegexTokenizer extracts a sequence of matches from the input text. Regular expressions are a powerful tool to extract strings with certain characteristics.
In [44]:
tokenizer = RegexTokenizer().setGaps(False)\
.setPattern("\\p{L}+")\
.setInputCol("review")\
.setOutputCol("words")
The pattern \p{L}+ matches one or more Unicode letters, so each maximal run of letters becomes a token (accented letters count as letters, so "acuña" is extracted as a single word). setGaps(False) tells the tokenizer that the pattern describes the tokens themselves rather than the gaps between them. You have to set the input column from the incoming dataframe (in our case the review column) and the new column that will be added (e.g., words).
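To get a feel for what the pattern extracts, here is an approximation in plain Python (the standard re module does not support \p{L}, so the character class [^\W\d_] is used as a stand-in for "any Unicode letter"; the sentence is made up):
# approximate \p{L}+ with the standard library: runs of Unicode letters
import re

text = "This movie -- Acuña's 2nd favorite! -- was great."
tokens = re.findall(r'[^\W\d_]+', text.lower(), flags=re.UNICODE)
print(tokens)  # ['this', 'movie', 'acuña', 's', 'nd', 'favorite', 'was', 'great']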
We are ready to transform the input dataframe imdb_reviews_df with the tokenizer:
In [45]:
review_words_df = tokenizer.transform(imdb_reviews_df)
print(review_words_df)
Applying the transformation doesn't actually do anything until you apply an action. But as you can see, a new column words of type array<string> was added by the transformation. We can see how it looks:
In [46]:
review_words_df.show(5)
Now, we want to match every word from sentiments_df against the array words shown before. One way of doing this is to explode the column words to create a row for each element in that list. Then, we join that result with sentiments_df to continue further.
In [47]:
review_words_df.select('id', fn.explode('words').alias('word')).show(5)
Now, if we join that with sentiments_df, we can see whether there are positive and negative words in each review:
In [48]:
review_word_sentiment_df = review_words_df.\
select('id', fn.explode('words').alias('word')).\
join(sentiments_df, 'word')
review_word_sentiment_df.show(5)
Now we can simply average the sentiment per review id and, say, pick positive when the average is above 0, and negative otherwise.
In [49]:
simple_sentiment_prediction_df = review_word_sentiment_df.\
groupBy('id').\
agg(fn.avg('sentiment').alias('avg_sentiment')).\
withColumn('predicted', fn.when(fn.col('avg_sentiment') > 0, 1.0).otherwise(0.))
simple_sentiment_prediction_df.show(5)
Now, let's compute the accuracy of our predictions:
In [50]:
imdb_reviews_df.\
join(simple_sentiment_prediction_df, 'id').\
select(fn.expr('float(score = predicted)').alias('correct')).\
select(fn.avg('correct')).\
show()
Not bad with such a simple approach! But can we do better than this?
There are a couple of problems with the previous approach: the sentiment dictionary is fixed in advance, and every word contributes the same amount (+1 or -1) regardless of how strongly it actually signals sentiment.
Instead, we could use the data to estimate how much each word contributes to the final sentiment of a review. Given that we are trying to predict negative and positive reviews, we can use logistic regression for such binary prediction.
One typical approach is to count how many times a word appears in the text and then perform a reweighting so that words that are very common are "counted" less.
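Concretely, this is the TF-IDF weighting: the raw count of a term in a document is multiplied by a factor that shrinks as the term appears in more documents. Spark's IDF estimator uses a smoothed version of that factor (per the MLlib documentation):
\begin{equation} \mathrm{tfidf}(t, d) = \mathrm{tf}(t, d) \cdot \mathrm{idf}(t), \qquad \mathrm{idf}(t) = \log \frac{N + 1}{\mathrm{df}(t) + 1} \end{equation}
where $N$ is the number of documents and $\mathrm{df}(t)$ is the number of documents that contain the term $t$.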
In Spark, we can achieve this by using several transformers:
Raw text => Tokens => Remove stop words => Term Frequency => Reweighting by Inverse Document frequency
To perform this sequence, we will create a Pipeline to consistently represent the steps from raw text to TF-IDF.
First, we need to create a pipeline that goes from raw text to term frequencies. This part must be fit to the data because we don't know the number of distinct tokens in advance and therefore need to estimate that quantity from the data.
In [51]:
# we obtain the stop words from a website
import requests
stop_words = requests.get('http://ir.dcs.gla.ac.uk/resources/linguistic_utils/stop_words').text.split()
stop_words[0:10]
Out[51]:
In [52]:
from pyspark.ml.feature import StopWordsRemover
sw_filter = StopWordsRemover()\
.setStopWords(stop_words)\
.setCaseSensitive(False)\
.setInputCol("words")\
.setOutputCol("filtered")
Finally, for this initial Pipeline, we define a CountVectorizer estimator.
In [53]:
from pyspark.ml.feature import CountVectorizer
# remove words that appear in fewer than 5 documents (minDF=5)
cv = CountVectorizer(minTF=1., minDF=5., vocabSize=2**17)\
.setInputCol("filtered")\
.setOutputCol("tf")
In [54]:
# we now create a pipelined transformer
cv_pipeline = Pipeline(stages=[tokenizer, sw_filter, cv]).fit(imdb_reviews_df)
In [55]:
# now we can make the transformation between the raw text and the counts
cv_pipeline.transform(imdb_reviews_df).show(5)
The term frequency vector is represented with a sparse vector. We have 26,384 terms.
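The learned vocabulary is stored in the fitted CountVectorizer model, which is the last stage of cv_pipeline. A quick sketch to inspect it:
# the last stage of the fitted pipeline is the CountVectorizerModel
cv_model = cv_pipeline.stages[-1]
print(len(cv_model.vocabulary))   # number of terms kept by the vectorizer
print(cv_model.vocabulary[:10])   # the first few terms of the vocabulary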
Finally, we build another pipeline that takes the output of the previous pipeline and down-weights terms that are very common across documents.
In [56]:
from pyspark.ml.feature import IDF
idf = IDF().\
setInputCol('tf').\
setOutputCol('tfidf')
In [57]:
idf_pipeline = Pipeline(stages=[cv_pipeline, idf]).fit(imdb_reviews_df)
In [58]:
idf_pipeline.transform(imdb_reviews_df).show(5)
Therefore, the idf_pipeline takes the raw text from the dataframe imdb_reviews_df and creates a feature vector called tfidf!
In [59]:
tfidf_df = idf_pipeline.transform(imdb_reviews_df)
First, let's split the data into training, validation, and testing.
In [60]:
training_df, validation_df, testing_df = imdb_reviews_df.randomSplit([0.6, 0.3, 0.1], seed=0)
In [61]:
[training_df.count(), validation_df.count(), testing_df.count()]
Out[61]:
One immediately apparent problem is that the number of features in the dataset is far larger than the number of training examples. This can lead to serious overfitting.
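We can make this concrete by comparing the vocabulary size against the number of training examples (a quick sketch reusing the pipelines fitted above):
# number of tfidf features vs. number of training examples
n_features = len(idf_pipeline.stages[0].stages[-1].vocabulary)  # CountVectorizerModel inside cv_pipeline
n_examples = training_df.count()
print(n_features, n_examples)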
Let's look at this more closely. Let's apply a simple prediction model known as logistic regression.
Logistic regression will take the tfidf features and predict whether the review is positive (score == 1) or negative (score == 0).
In [62]:
from pyspark.ml.classification import LogisticRegression
In [63]:
lr = LogisticRegression().\
setLabelCol('score').\
setFeaturesCol('tfidf').\
setRegParam(0.0).\
setMaxIter(100).\
setElasticNetParam(0.)
Let's create a pipeline transformer by chaining the idf_pipeline with the logistic regression step (lr):
In [64]:
lr_pipeline = Pipeline(stages=[idf_pipeline, lr]).fit(training_df)
Let's estimate the accuracy:
In [65]:
lr_pipeline.transform(validation_df).\
select(fn.expr('float(prediction = score)').alias('correct')).\
select(fn.avg('correct')).show()
The performance is much better than before.
The problem, however, is that we are overfitting because we have many features compared to the number of training examples.
For example, if we look at the weights of the features, there is a lot of noise:
In [66]:
import pandas as pd
vocabulary = idf_pipeline.stages[0].stages[-1].vocabulary
weights = lr_pipeline.stages[-1].coefficients.toArray()
coeffs_df = pd.DataFrame({'word': vocabulary, 'weight': weights})
The most negative words are:
In [67]:
coeffs_df.sort_values('weight').head(5)
Out[67]:
And the most positive:
In [68]:
coeffs_df.sort_values('weight', ascending=False).head(5)
Out[68]:
But none of them make sense. What is happening? We are overfitting the data: the words that don't make sense are just capturing noise in the reviews.
For example, the word helming appears in only one review:
In [69]:
idf_pipeline.transform(training_df).\
select('id', fn.explode('words').alias('word')).\
where(fn.col('word') == 'helming').\
join(training_df, 'id').\
first()
Out[69]:
One way to prevent overfitting during training is to modify the loss function and penalize weight values that are too large.
There are two major regularization techniques: one based on penalizing the squared value of the weights (called L2 or ridge regularization) and another based on penalizing the absolute value of the weights (called L1 or lasso regularization).
The unregularized logistic regression loss function is:
\begin{equation} L_\theta(p(X),Y) = - \sum_i \left( Y_i \log p_\theta(X_i) + (1-Y_i)\log(1-p_\theta(X_i)) \right) \end{equation}
where $p_\theta(\cdot)$ is the sigmoid function:
\begin{equation} p_\theta(X) = \frac{1}{1+\exp(-(\theta_0 + \sum_{j>0} x_j \theta_j))} \end{equation}
If we modify the loss function $L_\theta$ slightly,
\begin{equation} L_\theta^{\lambda}(p(X),Y) = - \sum_i \left( Y_i \log p_\theta(X_i) + (1-Y_i)\log(1-p_\theta(X_i)) \right) + \lambda \sum_{j>0} \theta_j^2 \end{equation}
we obtain what is known as L2 regularization.
Notice how we increase the loss function by $\lambda$ times the sum of squared weights. In practice, this means that the algorithm will think twice before increasing the importance of a feature. This loss function will prevent the algorithm from fitting certain data points, such as outliers or noise, unless the decrease in the data loss warrants it. Also, notice that the penalization doesn't apply to the bias parameter $\theta_0$.
You can see more clearly the effect of such cost function when $\lambda$ goes to infinity: the features will not be used for predicting and only the bias term will matter! This prevents the algorithm from learning altogether, forcing it to underfit!
One problem with L2 regularization is that it shrinks all weights toward zero uniformly but rarely makes any of them exactly zero. In a sense, all features will still matter, just less than with the unregularized loss function. This is not what we want: in sentiment analysis, we want to select certain features because we expect that only some words affect the sentiment.
A different modification of the original loss function can achieve this. This regularization is known as L1 or lasso regularization and penalizes the absolute value of the weights:
\begin{equation} L_\theta^{\lambda}(p(X),Y) = - \sum_i \left( Y_i \log p_\theta(X_i) + (1-Y_i)\log(1-p_\theta(X_i)) \right) + \lambda \sum_{j>0} \left| \theta_j \right| \end{equation}
The practical effect of L1 regularization is that the difference between a feature having no importance and having some small importance is much larger than with L2 regularization. Therefore, optimizing the L1 loss function usually drives some features to exactly zero weight.
One problem with L1 regularization is that it will never select more features than the number of examples. This is because it can always fit the training data perfectly once the number of selected features equals the number of examples. In our sentiment analysis, this is the case (there are more words than examples).
One way of remedying this is to use a combination of both L1 and L2, known as elastic net regularization. For this type of regularization, we have to pick a parameter ($\alpha$) that balances L1 against L2 regularization: if $\alpha=0$ we use pure L2, and if $\alpha=1$ we use pure L1. For example, $\alpha=0.5$ means half L1 and half L2.
\begin{equation} L_\theta^{\lambda,\alpha}(p(X),Y) = - \sum_i \left( Y_i \log p_\theta(X_i) + (1-Y_i)\log(1-p_\theta(X_i)) \right) + \lambda \left[(1-\alpha) \sum_{j>0} \theta_j^2 + \alpha \sum_{j>0} \left| \theta_j \right| \right] \end{equation}
Unfortunately, elastic net regularization comes with two additional parameters, $\lambda$ and $\alpha$, and we must either select them a priori or use a validation set to choose the best combination.
In [70]:
lambda_par = 0.02
alpha_par = 0.3
en_lr = LogisticRegression().\
setLabelCol('score').\
setFeaturesCol('tfidf').\
setRegParam(lambda_par).\
setMaxIter(100).\
setElasticNetParam(alpha_par)
And we define a new Pipeline
In [71]:
en_lr_pipeline = Pipeline(stages=[idf_pipeline, en_lr]).fit(training_df)
Let's look at the performance
In [72]:
en_lr_pipeline.transform(validation_df).select(fn.avg(fn.expr('float(prediction = score)'))).show()
We improve performance slightly, but what's more important is that we improve our understanding of the word sentiments. Let's take a look at the weights:
In [73]:
en_weights = en_lr_pipeline.stages[-1].coefficients.toArray()
en_coeffs_df = pd.DataFrame({'word': vocabulary, 'weight': en_weights})
The most negative words all make sense ("worst" is actually more negative than "worse")!
In [74]:
en_coeffs_df.sort_values('weight').head(15)
Out[74]:
Same thing with positive words
In [75]:
en_coeffs_df.sort_values('weight', ascending=False).head(15)
Out[75]:
Are there words with literally zero importance for predicting sentiment? Yes, and in fact most of them!
In [76]:
en_coeffs_df.query('weight == 0.0').shape
Out[76]:
In fact, more than 95% of features are not needed to achieve a better performance than all previous models!
In [77]:
en_coeffs_df.query('weight == 0.0').shape[0]/en_coeffs_df.shape[0]
Out[77]:
Let's look at these neutral words
In [78]:
en_coeffs_df.query('weight == 0.0').head(15)
Out[78]:
But did we choose the right $\lambda$ and $\alpha$ parameters? We should run an experiment where we try different combinations of them. Fortunately, Spark lets us do this by using a parameter grid, which generates combinations of parameters to try.
In [79]:
from pyspark.ml.tuning import ParamGridBuilder
We need to build a new estimator pipeline
In [80]:
en_lr_estimator = Pipeline(stages=[idf_pipeline, en_lr])
In [81]:
grid = ParamGridBuilder().\
addGrid(en_lr.regParam, [0., 0.01, 0.02]).\
addGrid(en_lr.elasticNetParam, [0., 0.2, 0.4]).\
build()
This is the list of parameters that we will try:
In [82]:
grid
Out[82]:
In [83]:
all_models = []
for j in range(len(grid)):
    print("Fitting model {}".format(j+1))
    model = en_lr_estimator.fit(training_df, grid[j])
    all_models.append(model)
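As an aside, Spark can automate this kind of search. A sketch with CrossValidator, which runs k-fold cross-validation over the same grid, would look roughly like this; note that BinaryClassificationEvaluator scores models by area under the ROC curve rather than accuracy, so the selected model may differ slightly from the one chosen below:
# alternative sketch: let Spark search the grid with k-fold cross-validation
from pyspark.ml.tuning import CrossValidator
from pyspark.ml.evaluation import BinaryClassificationEvaluator

evaluator = BinaryClassificationEvaluator(labelCol='score')  # default metric: area under ROC
cv_search = CrossValidator(estimator=en_lr_estimator,
                           estimatorParamMaps=grid,
                           evaluator=evaluator,
                           numFolds=3)
cv_search_model = cv_search.fit(training_df)
# cv_search_model.bestModel is the pipeline refit with the best parameter combination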
In [84]:
# estimate the accuracy of each of them:
accuracies = [m.\
transform(validation_df).\
select(fn.avg(fn.expr('float(score = prediction)')).alias('accuracy')).\
first().\
accuracy for m in all_models]
In [85]:
import numpy as np
In [86]:
best_model_idx = np.argmax(accuracies)
So the best model we found has the following parameters
In [87]:
grid[best_model_idx]
Out[87]:
In [88]:
best_model = all_models[best_model_idx]
In [89]:
accuracies[best_model_idx]
Out[89]:
Now we can use this model to predict sentiments on Twitter
In [90]:
tweets_df = sqlContext.read.parquet('tweets.parquet')
tweets_df.show(5, truncate=False)
We have 1K tweets from each candidate
In [91]:
tweets_df.groupby('handle').agg(fn.count('*')).show()
We can now predict the sentiment of the tweets using our best model. We need to rename the text column to review so that it matches what our previous pipeline expects:
In [92]:
best_model.transform(tweets_df.withColumnRenamed('text', 'review')).select('review', 'prediction').show()
Now, let's summarize our results in a graph!
In [93]:
%matplotlib inline
In [94]:
import seaborn
In [95]:
sentiment_pd = best_model.\
transform(tweets_df.withColumnRenamed('text', 'review')).\
groupby('handle').\
agg(fn.avg('prediction').alias('prediction'),
(2*fn.stddev('prediction')/fn.sqrt(fn.count('*'))).alias('err')).\
toPandas()
In [96]:
sentiment_pd.head()
Out[96]:
In [97]:
sentiment_pd.plot(x='handle', y='prediction', xerr='err', kind='barh');
But let's examine some "negative" tweets by Trump
In [98]:
best_model.\
transform(tweets_df.withColumnRenamed('text', 'review')).\
where(fn.col('handle') == '@realDonaldTrump').\
where(fn.col('prediction') == 0).\
select('review').\
take(5)
Out[98]:
And Clinton
In [99]:
best_model.\
transform(tweets_df.withColumnRenamed('text', 'review')).\
where(fn.col('handle') == '@HillaryClinton').\
where(fn.col('prediction') == 0).\
select('review').\
take(5)
Out[99]:
As you can see, there is lots of room for improvement.
1) Using the reviews dataframe (imdb_reviews_df), compute the average review length for positive and for negative reviews. Hint: use the Spark SQL function length. In particular, since we imported the functions under the name fn (using from pyspark.sql import functions as fn), use fn.length with the name of the column.
2) Using sentiments_df, find the IMDB reviews with the largest number of negative words. Hint: you need to tokenize the review field in imdb_reviews_df and then join with sentiments_df. Finally, perform a selection and summary query.
3) Using the best model fitted (best_model), estimate the generalization error on the testing set (testing_df).
4) One way of analyzing what is wrong with a model is to examine the cases where it fails the hardest. In our case, we could do this by looking at cases in which logistic regression predicts a positive sentiment with high probability when in fact the actual sentiment is negative.
To obtain the probability of positive sentiment, however, we must extract it from the prediction with a custom function (a user-defined function, or UDF):
In [100]:
from pyspark.sql import types
def probability_positive(probability_column):
    return float(probability_column[1])
func_probability_positive = fn.udf(probability_positive, types.DoubleType())
prediction_probability_df = best_model.transform(validation_df).\
withColumn('probability_positive', func_probability_positive('probability')).\
select('id', 'review', 'score', 'probability_positive')
prediction_probability_df.show()
Analyze the worst predictions (the ones that are very wrong) and suggest some ways of improving the model. Hint: do a query that returns the highest probability_positive values for cases where score is 0, and vice versa.
5) Using the best model (best_model), predict the sentiment of the following sentences (a starting sketch follows the list below).
a) "Make America great again"
b) "Cats are not always the best companion"
c) "This sentence is not a sentence"