In this notebook we will explore the utilities for Topic Modelling available in MLlib.
In [1]:
%matplotlib inline
import matplotlib.pyplot as plt
import pylab
# Required imports
from wikitools import wiki
from wikitools import category
# import nltk
import nltk
from nltk.tokenize import word_tokenize
from nltk.stem import WordNetLemmatizer
from nltk.corpus import stopwords
from test_helper import Test
import collections
from pyspark.mllib.clustering import LDA, LDAModel
from pyspark.mllib.linalg import Vectors
# import gensim
# import numpy as np
# import lda
# import lda.datasets
In this notebook we will explore some tools for text processing and analysis, as well as two topic modeling algorithms available from Python toolboxes.
To do so, we will explore and analyze collections of Wikipedia articles from a given category, using wikitools, which makes it easy to capture content from wikimedia sites.
(As a side note, there are many other text collections available for testing topic modelling algorithms. In particular, the NLTK library includes many examples, which you can explore using the nltk.download()
tool.
import nltk
nltk.download()
For instance, you can take the Gutenberg dataset:
Mycorpus = nltk.corpus.gutenberg
text_name = Mycorpus.fileids()[0]
raw = Mycorpus.raw(text_name)
Words = Mycorpus.words(text_name)
Also, tools like Gensim or scikit-learn include text databases to work with.)
In order to use Wikipedia data, we will select a single category of articles:
In [ ]:
site = wiki.Wiki("https://en.wikipedia.org/w/api.php")
# Select a category with a reasonable number of articles (>100)
cat = "Economics"
# cat = "Pseudoscience"
print cat
You can try any other category. Take into account that the behavior of topic modelling algorithms may depend on the number of documents available for the analysis, so select a category with at least 100 articles. You can browse the wikipedia category tree, for instance, at https://en.wikipedia.org/wiki/Category:Contents.
We start by downloading the text collection.
In [ ]:
# Loading category data. This may take a while
print "Loading category data. This may take a while..."
cat_data = category.Category(site, cat)
corpus_titles = []
corpus_text = []
for n, page in enumerate(cat_data.getAllMembersGen()):
    print "\r Loading article {0}".format(n + 1),
    corpus_titles.append(page.title)
    corpus_text.append(page.getWikiText())
n_art = len(corpus_titles)
print "\nLoaded " + str(n_art) + " articles from category " + cat
Now, we have stored the whole text collection in two lists:
- corpus_titles, which contains the titles of the selected articles
- corpus_text, which contains the text content of the selected wikipedia articles
You can browse the content of the wikipedia articles to get some intuition about the kind of documents that will be processed.
In [ ]:
# n = 5
# print corpus_titles[n]
# print corpus_text[n]
Now, we will load the text collection into an RDD.
In [ ]:
corpusRDD = sc.parallelize(corpus_text, 4)
print "\nRDD created with {0} elements".format(corpusRDD.count())
In [ ]:
Test.assertTrue(corpusRDD.count() >= 100,
"Your corpus_tokens has less than 100 articles. Consider using a larger dataset")
Topic modelling algorithms process vectorized data. In order to apply them, we need to transform the raw text input data into a vector representation. To do so, we will remove irrelevant information from the text data and preserve as much relevant information as possible to capture the semantic content in the document collection.
Thus, we will proceed with the following steps:
1. Tokenization
2. Homogenization (removing non-alphanumeric tokens and converting to lowercase)
3. Cleaning (stopword removal)
4. Stemming / lemmatization
5. Vectorization
The first three steps are independent for each document, so they can be parallelized.
For the first steps, we will use some of the powerful methods available from the Natural Language Toolkit. In order to use the word_tokenize
method from nltk, you might need to get the appropriate resources using nltk.download()
. You must select option "d) Download", and identifier "punkt".
In [ ]:
# You can comment this if the package is already available.
# Select option "d) Download", and identifier "punkt"
# nltk.download()
Also, we need to load a list of English stopwords. Now select the identifier "stopwords".
In [ ]:
# You can comment this if the package is already available.
# Select option "d) Download", and identifier "stopwords"
# nltk.download()
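If you prefer a non-interactive download, the identifiers can also be fetched directly by name (this assumes your machine has network access):
In [ ]:
# Non-interactive alternative to the menu-driven nltk.download().
# You can comment these out if the packages are already available.
nltk.download('punkt')
nltk.download('stopwords')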
You can check the stopword list. This is a standard Python list of strings, which we could modify by removing words or adding new ones if required.
In [ ]:
stopwords_en = stopwords.words('english')
print "The stopword list contains {0} elements: ".format(len(stopwords_en))
print stopwords_en
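For instance, here is a minimal sketch of how the list could be extended (the extra tokens below are purely illustrative; they are common in raw wikitext markup and carry little semantic content):
In [ ]:
# Illustrative only: build an extended stopword list in a separate variable,
# so the standard list used in the rest of the notebook is not modified.
stopwords_wiki = stopwords.words('english') + ['ref', 'http', 'also']
print "The extended stopword list contains {0} elements".format(len(stopwords_wiki))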
Task: Create a method getTokenList with two inputs, a document (string) and a stopword list, that completes the first three steps of the corpus processing, as follows:
- Decode the document to utf-8 and transform the string into a list of tokens, using word_tokenize from nltk.tokenize.
- Remove non-alphanumeric tokens and normalize them to lowercase.
- Remove the stopwords.
Return the result of cleaning (a list of tokens).
In [ ]:
def getTokenList(doc, stopwords_en):
    # scode: tokens = <FILL IN> # Tokenize docs
    tokens = word_tokenize(doc.decode('utf-8'))
    # scode: tokens = <FILL IN> # Remove non-alphanumeric tokens and normalize to lowercase
    tokens = [t.lower() for t in tokens if t.isalnum()]
    # scode: tokens = <FILL IN> # Remove stopwords
    tokens = [t for t in tokens if t not in stopwords_en]
    return tokens
In [ ]:
Test.assertEquals(getTokenList('The rain in spain stays mainly in the plane', stopwords_en),
[u'rain', u'spain', u'stays', u'mainly', u'plane'],
'getTokenList does not return the expected results')
Task: Apply getTokenList to all documents in the corpus and save the result in a new RDD, corpus_tokensRDD.
In [ ]:
# scode: corpus_tokensRDD = <FILL IN>
corpus_tokensRDD = (corpusRDD
                    .map(lambda x: getTokenList(x, stopwords_en))
                    .cache())
# print "\n Let's check tokens after cleaning:"
print corpus_tokensRDD.take(1)[0][0:30]
In [ ]:
Test.assertEquals(corpus_tokensRDD.count(), n_art,
"The number of documents in the original set does not correspond to the size of corpus_tokensRDD")
Test.assertTrue(all([c==c.lower() for c in corpus_tokensRDD.take(1)[0]]), 'Capital letters have not been removed')
Test.assertTrue(all([c.isalnum() for c in corpus_tokensRDD.take(1)[0]]),
'Non alphanumeric characters have not been removed')
Test.assertTrue(len([c for c in corpus_tokensRDD.take(1)[0] if c in stopwords_en])==0,
'Stopwords have not been removed')
Task: Apply stemming to all documents in corpus_tokensRDD and save the result in a new RDD, corpus_stemRDD.
In [ ]:
# Select stemmer.
stemmer = nltk.stem.SnowballStemmer('english')
# scode: corpus_stemRDD = <FILL IN>
corpus_stemRDD = corpus_tokensRDD.map(lambda x: [stemmer.stem(token) for token in x])
print "\nLet's check the first tokens from document 0 after stemming:"
print corpus_stemRDD.take(1)[0][0:30]
In [ ]:
Test.assertTrue((len([c for c in corpus_stemRDD.take(1)[0] if c!=stemmer.stem(c)])
< 0.1*len(corpus_stemRDD.take(1)[0])),
'It seems that stemming has not been applied properly')
Alternatively, we can apply lemmatization. For English texts, we can use the lemmatizer from NLTK, which is based on WordNet. If you have not used WordNet before, you will likely need to download it from nltk.
In [ ]:
# You can comment this if the package is already available.
# Select option "d) Download", and identifier "wordnet"
# nltk.download()
Task: Lemmatize all documents in corpus_tokensRDD using the .lemmatize() method from the WordNetLemmatizer object created in the first line, and save the result in a new RDD, corpus_lemmatRDD.
In [ ]:
wnl = WordNetLemmatizer()
# scode: corpus_lemmatRDD = <FILL IN>
corpus_lemmatRDD = (corpus_tokensRDD
                    .map(lambda x: [wnl.lemmatize(token) for token in x]))
print "\nLet's check the first tokens from document 0 after lemmatization:"
print corpus_lemmatRDD.take(1)[0][0:30]
One of the advantages of the lemmatizer is that its output is still a true word, which is preferable when presenting text processing results.
However, without using contextual information, lemmatize() does not remove grammatical differences. This is the reason why "is" or "are" are preserved and not replaced by the infinitive "be".
As an alternative, we can apply .lemmatize(word, pos), where 'pos' is a string code specifying the part-of-speech (POS), i.e., the grammatical role of the word in its sentence. For instance, you can check the difference between wnl.lemmatize('is') and wnl.lemmatize('is', pos='v').
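A quick check of this behaviour (the expected outputs are shown as comments):
In [ ]:
print wnl.lemmatize('is')            # 'is'  -> unchanged, treated as a noun by default
print wnl.lemmatize('is', pos='v')   # 'be'  -> the verb POS recovers the infinitive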
Up to this point, we have transformed the raw text collection of articles into a list of articles, where each article is a collection of the word roots that are most relevant for semantic analysis. Now, we need to convert these data (a list of token lists) into a numerical representation (a list of vectors, or a matrix).
As a first step, we compute the word count for every document in the corpus.
Task: Compute a new RDD from corpus_stemRDD where each element is a list of tuples related to a document. The key of each tuple is a token, and its value is the number of occurrences of that token in the document. To do so, you can use the Counter class from collections.
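In case you have not used Counter before, here is a quick illustration in plain Python (outside Spark):
In [ ]:
# Counter builds a dict-like object mapping each token to its count.
print collections.Counter(['economy', 'market', 'economy'])
# Counter({'economy': 2, 'market': 1})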
In [ ]:
# corpus_wcRDD = <FILL IN>
corpus_wcRDD = (corpus_stemRDD
                .map(collections.Counter)
                .map(lambda x: [(t, x[t]) for t in x]))
print corpus_wcRDD.take(1)[0][0:30]
In [ ]:
Test.assertTrue(corpus_wcRDD.count() == n_art, 'corpus_wcRDD does not contain the expected number of articles')
Test.assertTrue(corpus_wcRDD.flatMap(lambda x: x).map(lambda x: x[1]).sum()== corpus_stemRDD.map(len).sum(),
'The total token count in the output RDD is not consistent with the total number of input tokens')
At this point, we have got a representation of documents as lists of tuples (token, word_count) in corpus_wcRDD. From this RDD, we can compute a dictionary containing all tokens in the corpus as keys, and their respective number of occurrences as values.
Task: Using corpus_wcRDD, compute a new RDD of (key, value) pairs, where keys are the tokens in the whole corpus and values are the total number of occurrences of each token in the corpus.
In [ ]:
# scode: wcRDD = < FILL IN >
wcRDD = (corpus_wcRDD
         .flatMap(lambda x: x)
         .reduceByKey(lambda x, y: x + y))
print wcRDD.take(30)
Task: Take all tuples in wcRDD, in decreasing order of their token counts, into variable TD, and compute two lists:
- token_count: a list of token counts, in decreasing order.
- D: a list of tokens, in the same order.
In [ ]:
# Token Dictionary:
n_tokens = wcRDD.count()
# scode: TD = wcRDD.<FILL IN>
TD = wcRDD.takeOrdered(n_tokens, lambda x: -x[1])
# scode: D = <FIll IN> # Extract tokens from TD
D = map(lambda x: x[0], TD)
# scode: token_count = <FILL IN> # Extract token counts from TD
token_count = map(lambda x: x[1], TD)
# ALTERNATIVELY:
TD_RDD = wcRDD.sortBy(lambda x: -x[1])
D_RDD = TD_RDD.map(lambda x: x[0])
token_countRDD = TD_RDD.map(lambda x: x[1])
print TD
We can visualize the token distribution using D and token_count, for the most frequent terms.
In [ ]:
# SORTED TOKEN FREQUENCIES (II):
# plt.rcdefaults()
# Example data
n_bins = 25
y_pos = range(n_bins-1, -1, -1)
hot_tokens = D[0:n_bins]
z = [float(t)/n_art for t in token_count[0:n_bins]]
plt.barh(y_pos, z, align='center', alpha=0.4)
plt.yticks(y_pos, hot_tokens)
plt.xlabel('Average number of occurrences per article')
plt.title('Token distribution')
plt.show()
In order to apply the LDA algorithm, we need to represent the input documents in the format required by MLlib. More specifically, the input data should be an RDD where each element is a tuple (doc_id, vector), where doc_id is an integer document identifier and vector can be a sparse or dense vector from class Vectors. We will use sparse vectors, which are better suited to large vocabularies.
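As a quick illustration of the sparse format (toy sizes, unrelated to our corpus), Vectors.sparse takes the vector length and a list of (index, value) pairs:
In [ ]:
# A 5-dimensional sparse vector with nonzero entries at positions 1 and 3.
print Vectors.sparse(5, [(1, 2.0), (3, 1.0)])
# (5,[1,3],[2.0,1.0])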
To compute the sparse vectors, we must first transform the lists of tuples (token, value) in corpus_wcRDD into lists of (token_id, value) pairs, thus replacing each token by a numerical identifier.
We will proceed in two steps:
- Compute an inverse dictionary, invD, transforming tokens into numerical token ids.
- Compute a new RDD from corpus_wcRDD, replacing each token by its token_id.
Task: complete the two steps outlined above.
In [ ]:
# INVERTED INDEX: EXAMPLE:
# D = ['token1', 'token2', 'token3', 'token4']
# D[1] = 'token2'
# invD = {'token1': 0, 'token2': 1, 'token3': 2, 'token4': 3}
# invD['token2'] = 1
# Compute inverse dictionary
# scode: invD = <FILL IN>
invD = dict(zip(D, xrange(n_tokens)))
### ALTERNATIVELY:
# invD_RDD = D_RDD.zipWithIndex() ### Tuples (token, index)
# Compute RDD replacing tokens by token_ids
# scode: corpus_sparseRDD = <FILL IN>
corpus_sparseRDD = corpus_wcRDD.map(lambda x: [(invD[t[0]], t[1]) for t in x])
# Convert the list of tuples into a Vectors.sparse object.
corpus_sparseRDD = corpus_sparseRDD.map(lambda x: Vectors.sparse(n_tokens, x))
The only remaining step consists of adding an identifier to each document of the corpus.
Task: Apply method zipWithIndex
to corpus_sparseRDD
in order to add consecutive integer identifiers to all documents in the corpus.
In [ ]:
corpus4lda = corpus_sparseRDD.zipWithIndex().map(lambda x: [x[1], x[0]]).cache()
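A quick sanity check of the resulting structure (the printed indices and values will depend on your corpus):
In [ ]:
# Each element of corpus4lda is a [doc_id, sparse_vector] pair.
doc_id, vec = corpus4lda.take(1)[0]
print doc_id                      # 0 for the first document
print vec.size == n_tokens        # True: one component per token in the dictionary
print vec.indices[0:5], vec.values[0:5]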
That's all. We can now call the LDA algorithm.
Task: Train an LDA model with 3 topics and the corpus obtained in corpus4lda
. Check the LDA documentation to find the appropriate command.
In [ ]:
print "Training LDA: this might take a while..."
# scode: ldaModel = LDA.<FILL IN>
ldaModel = LDA.train(corpus4lda, k=3)
The whole topics matrix can be computed using the .topicsMatrix()
method.
In [ ]:
# Output topics. Each is a distribution over words (matching word count vectors)
print("Learned topics (as distributions over vocab of " + str(ldaModel.vocabSize()) + " words):")
topics = ldaModel.topicsMatrix()
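The returned matrix has one row per vocabulary token and one column per topic, so we can inspect, for instance, the heaviest tokens of topic 0. This is a small sketch; it assumes topics behaves as a standard numpy array of shape (vocabSize, k):
In [ ]:
import numpy as np
# Column j of the topics matrix holds the (unnormalized) token weights of topic j.
top_idx = np.argsort(-np.asarray(topics)[:, 0])[0:10]
print [D[n] for n in top_idx]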
Alternatively, we can use the .describeTopics method, which returns the most relevant terms for each topic and is more useful for a graphical plot.
Task: Represent the 25 most relevant terms for each topic using bar plots.
In [ ]:
n_bins = 25
# Example data
y_pos = range(n_bins-1, -1, -1)
pylab.rcParams['figure.figsize'] = 16, 8 # Set figure size
for i in range(3):
    topic = ldaModel.describeTopics(maxTermsPerTopic=n_bins)[i]
    tokens = [D[n] for n in topic[0]]
    weights = topic[1]
    plt.subplot(1, 3, i+1)
    plt.barh(y_pos, weights, align='center', alpha=0.4)
    plt.yticks(y_pos, tokens)
    plt.xlabel('Term weight')
    plt.title('Topic {0}'.format(i))
Exercise: Explore the influence of the topicConcentration
parameter. Show in barplots the most relevant tokens for each topic for large values of this parameter.
In [ ]:
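# A possible starting point (the parameter values here are just illustrative):
# retrain the model with a large topicConcentration and reuse the plotting
# code above to compare the resulting topics.
ldaModel_beta = LDA.train(corpus4lda, k=3, topicConcentration=10.0)
for i in range(3):
    topic = ldaModel_beta.describeTopics(maxTermsPerTopic=n_bins)[i]
    print "Topic {0}:".format(i), [D[n] for n in topic[0][0:10]]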
Unfortunately, we cannot capture the document distributions over topics in the current version of pySpark MLlib (1.6).
The following code, taken from Stackoverflow, can be used to compute the SVD.
In [ ]:
from pyspark.mllib.common import callMLlibFunc, JavaModelWrapper
from pyspark.mllib.linalg.distributed import RowMatrix
class SVD(JavaModelWrapper):
    """Wrapper around the SVD scala case class"""

    @property
    def U(self):
        """Returns a RowMatrix whose columns are the left singular vectors of the SVD if computeU was set to be True."""
        u = self.call("U")
        if u is not None:
            return RowMatrix(u)

    @property
    def s(self):
        """Returns a DenseVector with singular values in descending order."""
        return self.call("s")

    @property
    def V(self):
        """Returns a DenseMatrix whose columns are the right singular vectors of the SVD."""
        return self.call("V")
In [ ]:
def computeSVD(row_matrix, k, computeU=False, rCond=1e-9):
    """
    Computes the singular value decomposition of the RowMatrix.
    The given row matrix A of dimension (m X n) is decomposed into U * s * V'T where
    * s: DenseVector consisting of square root of the eigenvalues (singular values) in descending order.
    * U: (m X k) (left singular vectors) is a RowMatrix whose columns are the eigenvectors of (A X A')
    * v: (n X k) (right singular vectors) is a Matrix whose columns are the eigenvectors of (A' X A)
    :param k: number of singular values to keep. We might return less than k if there are numerically zero singular values.
    :param computeU: Whether or not to compute U. If set to be True, then U is computed by A * V * sigma^-1
    :param rCond: the reciprocal condition number. All singular values smaller than rCond * sigma(0) are treated as zero, where sigma(0) is the largest singular value.
    :returns: SVD object
    """
    java_model = row_matrix._java_matrix_wrapper.call("computeSVD", int(k), computeU, float(rCond))
    return SVD(java_model)
In [ ]:
from pyspark.ml.feature import *
from pyspark.mllib.linalg import Vectors
data = [(Vectors.dense([0.0, 1.0, 0.0, 7.0, 0.0]),), (Vectors.dense([2.0, 0.0, 3.0, 4.0, 5.0]),), (Vectors.dense([4.0, 0.0, 0.0, 6.0, 7.0]),)]
df = sqlContext.createDataFrame(data,["features"])
pca_extracted = PCA(k=2, inputCol="features", outputCol="pca_features")
model = pca_extracted.fit(df)
features = model.transform(df)  # this creates a DataFrame with the regular features and pca_features
# We can now extract the pca_features to prepare our RowMatrix.
pca_features = features.select("pca_features").rdd.map(lambda row : row[0])
mat = RowMatrix(pca_features)
# Once the RowMatrix is ready we can compute our Singular Value Decomposition
svd = computeSVD(mat,2,True)
print svd.s
# DenseVector([9.491, 4.6253])
print svd.U.rows.collect()
# [DenseVector([0.1129, -0.909]), DenseVector([0.463, 0.4055]), DenseVector([0.8792, -0.0968])]
print svd.V
# DenseMatrix(2, 2, [-0.8025, -0.5967, -0.5967, 0.8025], 0)
Task: Adapt the code above to compute the LSI topic model of corpus4lda
.
In [ ]:
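# A possible sketch (one of several valid approaches; the choice of 3 topics
# is just illustrative). LSI amounts to an SVD of the document-term matrix,
# so we build a RowMatrix from the sparse word-count vectors in corpus4lda
# and reuse the computeSVD wrapper defined above.
n_topics = 3
doc_term_mat = RowMatrix(corpus4lda.map(lambda x: x[1]))   # drop doc_id, keep the count vector
svd = computeSVD(doc_term_mat, n_topics, computeU=False)
V = svd.V.toArray()                                        # (n_tokens x n_topics) token weights
for i in range(n_topics):
    weights = [abs(V[n, i]) for n in range(n_tokens)]
    top_idx = sorted(range(n_tokens), key=lambda n: -weights[n])[0:10]
    print "LSI topic {0}:".format(i), [D[n] for n in top_idx]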