In [8]:
from pyspark.mllib.clustering import LDA, LDAModel
from pyspark.mllib.linalg import Vectors
from pyspark import SparkContext
# sc = SparkContext()  # unnecessary in a notebook session, where sc is already defined
# Load and parse the data: each line is a space-separated word-count vector for one document
data = sc.textFile("../../spark/data/mllib/sample_lda_data.txt")
parsedData = data.map(lambda line: Vectors.dense([float(x) for x in line.strip().split(' ')]))
# Index each document with a unique ID; LDA.train expects an RDD of [doc id, vector] pairs
corpus = parsedData.zipWithIndex().map(lambda x: [x[1], x[0]]).cache()
# Cluster the documents into three topics using LDA
ldaModel = LDA.train(corpus, k=3)
# Output topics. Each is a distribution over words (matching word count vectors)
print("Learned topics (as distributions over vocab of " + str(ldaModel.vocabSize()) + " words):")
topics = ldaModel.topicsMatrix()
for topic in range(3):
    print("Topic " + str(topic) + ":")
    for word in range(0, ldaModel.vocabSize()):
        print(" " + str(topics[word][topic]))
# Save and load model
# ldaModel.save(sc, "myModelPath")
# sameModel = LDAModel.load(sc, "myModelPath")
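Printing every weight for every word gets unwieldy as the vocabulary grows. As a more digestible alternative, describeTopics returns, for each topic, the term indices sorted by descending weight together with their weights. A minimal sketch, assuming the ldaModel trained above (the choice of five terms per topic is illustrative):
In [ ]:
# Sketch: summarize each topic by its top-weighted terms.
# describeTopics(n) returns one (term indices, term weights) pair per topic,
# with indices sorted by descending weight.
for i, (terms, weights) in enumerate(ldaModel.describeTopics(maxTermsPerTopic=5)):
    print("Topic " + str(i) + ":")
    for term, weight in zip(terms, weights):
        print("  term %d -> %.4f" % (term, weight))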
In [14]:
topics
Out[14]:
In [9]:
data.take(3)
Out[9]:
In [10]:
parsedData.take(3)
Out[10]:
In [11]:
corpus.take(3)
Out[11]:
In [12]:
ldaModel
Out[12]:
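LDA.train exposes a few knobs beyond k: maxIterations controls how long the optimizer runs, seed makes runs reproducible, and optimizer selects between batch EM ("em", the default) and the online variational optimizer ("online"). A minimal sketch on the same corpus; the parameter values here are illustrative, not tuned:
In [ ]:
# Sketch: retrain on the same corpus with explicit hyperparameters (values are illustrative).
tunedModel = LDA.train(
    corpus,
    k=3,                # number of topics
    maxIterations=50,   # default is 20
    seed=42,            # fix the seed so repeated runs give the same topics
    optimizer="em",     # "em" (default) or "online"
)
print(tunedModel.vocabSize())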