Para realizar esta prueba es necesario tener actualizada la máquina virtual con la versión más reciente de MLlib.
Para la actualización, debe seguir los pasos que se indican a continuación:
Entrar en la vm como root:
vagrant ssh
sudo bash
Ir a /usr/local/bin
Descargar la última versión de spark desde la vm mediante
wget http://www-eu.apache.org/dist/spark/spark-1.6.1/spark-1.6.1-bin-hadoop2.6.tgz
Desempaquetar:
tar xvf spark-1.6.1-bin-hadoop2.6.tgz (y borrar el tgz)
Lo siguiente es un parche, pero suficiente para que funcione:
Guardar copia de spark-1.3: mv spark-1.3.1-bin-hadoop2.6/ spark-1.3.1-bin-hadoop2.6_old
Crear enlace a spark-1.6: ln -s spark-1.6.1-bin-hadoop2.6/ spark-1.3.1-bin-hadoop2.6
In [20]:
%matplotlib inline
import nltk
import time
import matplotlib.pyplot as plt
import pylab
# import nltk
from nltk.tokenize import word_tokenize
from nltk.stem import WordNetLemmatizer
from nltk.corpus import stopwords
#from test_helper import Test
import collections
from pyspark.mllib.clustering import LDA, LDAModel
from pyspark.mllib.linalg import Vectors
# import gensim
# import numpy as np
In [2]:
#nltk.download()
mycorpus = nltk.corpus.reuters
Para evitar problemas de sobrecarga de memoria, o de tiempo de procesado, puede reducir el tamaño el corpus, modificando el valor de la variable n_docs a continuación.
In [3]:
n_docs = 500000
filenames = mycorpus.fileids()
fn_train = [f for f in filenames if f[0:5]=='train']
corpus_text = [mycorpus.raw(f) for f in fn_train]
# Reduced dataset:
n_docs = min(n_docs, len(corpus_text))
corpus_text = [corpus_text[n] for n in range(n_docs)]
print 'Loaded {0} files'.format(len(corpus_text))
A continuación cargaremos los datos en un RDD
In [4]:
corpusRDD = sc.parallelize(corpus_text, 4)
print "\nRDD created with {0} elements".format(corpusRDD.count())
Prepare los datos para aplicar un algoritmo de modelado de tópicos en pyspark. Para ello, aplique los pasos siguientes:
Guarde el resultado en la variable corpus_tokensRDD
In [5]:
def getTokenList(doc, stopwords_en):
# scode: tokens = <FILL IN> # Tokenize docs
tokens = word_tokenize(doc.decode('utf-8'))
# scode: tokens = <FILL IN> # Remove non-alphanumeric tokens and normalize to lowercase
tokens = [t.lower() for t in tokens if t.isalnum()]
# scode: tokens = <FILL IN> # Remove stopwords
tokens = [t for t in tokens if t not in stopwords_en]
return tokens
stopwords_en = stopwords.words('english')
corpus_tokensRDD = (corpusRDD
.map(lambda x: getTokenList(x, stopwords_en))
.cache())
# print "\n Let's check tokens after cleaning:"
print corpus_tokensRDD.take(1)[0][0:30]
In [6]:
# Select stemmer.
stemmer = nltk.stem.SnowballStemmer('english')
# scode: corpus_stemRDD = <FILL IN>
corpus_stemRDD = corpus_tokensRDD.map(lambda x: [stemmer.stem(token) for token in x])
print "\nLet's check the first tokens from document 0 after stemming:"
print corpus_stemRDD.take(1)[0][0:30]
En este punto cada documento del corpus es una lista de tokens.
Calcule un nuevo RDD que contenga, para cada documento, una lista de tuplas. La clave (key) de cada lista será un token y su valor el número de repeticiones del mismo en el documento.
Imprima una muestra de 20 tuplas uno de los documentos del corpus.
In [60]:
# corpus_wcRDD = <FILL IN>
corpus_wcRDD = (corpus_stemRDD
.map(collections.Counter)
.map(lambda x: [(t, x[t]) for t in x]))
print corpus_wcRDD.take(1)[0][0:20]
Construya, a partir de corpus_wcRDD, un nuevo diccionario con todos los tokens del corpus. El resultado será un diccionario python de nombre wcDict, cuyas entradas serán los tokens y sus valores el número de repeticiones del token en todo el corpus.
wcDict = {token1: valor1, token2, valor2, ...}
Imprima el número de repeticiones del token interpret
In [63]:
# scode: wcRDD = < FILL IN >
wcRDD = (corpus_wcRDD
.flatMap(lambda x: x)
.reduceByKey(lambda x, y: x + y))
wcDict = dict(wcRDD.collect())
print wcDict['interpret']
In [9]:
print wcRDD.count()
In [55]:
print wcRDD.takeOrdered(5, lambda x: -x[1])
In [65]:
tokenmasf = 'said'
ndocs = corpus_stemRDD.filter(lambda x: tokenmasf in x).count()
print ndocs
In [11]:
corpus_wcRDD2 = corpus_wcRDD.map(lambda x: [tupla for tupla in x if tupla[0]
not in ['said', 'mln']])
# scode: wcRDD = < FILL IN >
wcRDD2 = (corpus_wcRDD2
.flatMap(lambda x: x)
.reduceByKey(lambda x, y: x + y)
.sortBy(lambda x: -x[1]))
print wcRDD2.takeOrdered(10, lambda x: -x[1])
Determine la lista de topicos de todo el corpus, y construya un dictionario inverso, cuyas entradas sean los números consecutivos de 0 al número total de tokens, y sus salidas cada uno de los tokens, es decir
invD = {0: token0, 1: token1, 2: token2, ...}
In [25]:
# Token Dictionary:
n_tokens = wcRDD2.count()
TD = wcRDD2.takeOrdered(n_tokens, lambda x: -x[1])
D = map(lambda x: x[0], TD)
token_count = map(lambda x: x[1], TD)
# Compute inverse dictionary
invD = dict(zip(D, xrange(n_tokens)))
In [26]:
# Compute RDD replacing tokens by token_ids
corpus_sparseRDD = corpus_wcRDD2.map(lambda x: [(invD[t[0]], t[1]) for t in x])
# Convert list of tuplas into Vectors.sparse object.
corpus_sparseRDD = corpus_sparseRDD.map(lambda x: Vectors.sparse(n_tokens, x))
corpus4lda = corpus_sparseRDD.zipWithIndex().map(lambda x: [x[1], x[0]]).cache()
Aplique el algoritmo LDA con 4 tópicos sobre el corpus obtenido en corpus4lda, para un valor de topicConcentration = 2.0 y docConcentration = 3.0. (Tenga en cuenta que estos parámetros de entrada deben de ser tipo float).
In [53]:
print "Training LDA: this might take a while..."
start = time.time()
n_topics = 4
ldaModel = LDA.train(corpus4lda, k=n_topics, topicConcentration=2.0, docConcentration=3.0)
print "Modelo LDA entrenado en: {0} segundos".format(time.time()-start)
In [45]:
n_topics = 4
ldatopics = ldaModel.describeTopics(maxTermsPerTopic=2)
ldatopicnames = map(lambda x: x[0], ldatopics)
print ldatopicnames
for i in range(n_topics):
print "Topic {0}: {1}, {2}".format(i, D[ldatopicnames[i][0]], D[ldatopicnames[i][1]])
In [59]:
# Output topics. Each is a distribution over words (matching word count vectors)
iBank = invD['bank']
topicMatrix = ldaModel.topicsMatrix()
print topicMatrix[iBank]
In [ ]:
In [ ]: