Start with:
$ IPYTHON=1 IPYTHON_OPTS="notebook" $SPARK_HOME/bin/pyspark
In [1]:
sc
Out[1]:
In [2]:
from pyspark import SparkContext
logFile = "README.md" # Should be some file on your system
#sc = SparkContext("local", "Simple App1")
logData = sc.textFile(logFile).cache()
numAs = logData.filter(lambda s: 'a' in s).count()
numBs = logData.filter(lambda s: 'b' in s).count()
print "Lines with a: %i, lines with b: %i" % (numAs, numBs)
In [1]:
from pyspark.mllib.clustering import LDA, LDAModel
from pyspark.mllib.linalg import Vectors
from scipy.sparse import csr_matrix
from pyspark.mllib.linalg import Matrix
from pyspark.mllib.linalg import Matrices
In [4]:
def demo():
    # Load and parse the data
    data = sc.textFile("/home/carlosm/Projects/Sherlock/spark-1.5.1/data/mllib/sample_lda_data.txt")
    parsedData = data.map(lambda line: Vectors.dense([float(x) for x in line.strip().split(' ')]))
    # Index documents with unique IDs
    corpus = parsedData.zipWithIndex().map(lambda x: [x[1], x[0]]).cache()
    # Cluster the documents into three topics using LDA
    ldaModel = LDA.train(corpus, k=3)
    # Output topics. Each is a distribution over words (matching word count vectors)
    print("Learned topics (as distributions over vocab of " + str(ldaModel.vocabSize()) + " words):")
    topics = ldaModel.topicsMatrix()
    for topic in range(3):
        print("Topic " + str(topic) + ":")
        for word in range(0, ldaModel.vocabSize()):
            print(" " + str(topics[word][topic]))
demo()
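Added helper (sketch) for reading the raw weights: it ranks term indices per topic from the matrix that demo() prints. Since ldaModel is local to demo(), it would have to be called from inside that function, e.g. topTermIndices(ldaModel.topicsMatrix(), ldaModel.vocabSize(), 3).
import numpy as np
def topTermIndices(topics, vocabSize, nTopics, top=5):
    # rank term indices by weight within each topic column
    for t in range(nTopics):
        col = np.array([topics[w][t] for w in range(vocabSize)])
        print("Topic %d: top term indices: %s" % (t, list(col.argsort()[::-1][:top])))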
In [2]:
%pylab inline
In [3]:
import pandas as pd
from wordcloud import WordCloud
import gensim
In [4]:
import pickle
data_vraag = pickle.load(open('../data/preprocessedData.pkl', 'rb'))
In [5]:
data_ppl = data_vraag[data_vraag['individu of groep']=='mijzelf']
data_org = data_vraag[data_vraag['individu of groep']!='mijzelf']
In [6]:
vraagTokens = data_vraag['SentToks'].tolist()
In [7]:
dic = gensim.corpora.Dictionary(vraagTokens)
corpus = [dic.doc2bow(text) for text in vraagTokens]
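A quick sanity check on the gensim structures (added sketch): dic maps token ids to tokens and corpus holds (token_id, count) pairs per document.
print("Vocabulary size: %d" % len(dic))
print("First document as bag-of-words: %s" % corpus[0][:5])
print("Token id 0 maps to %r" % dic[0])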
In [8]:
def lineToSparse(line):
    v = [float(x) for x in line.strip().split(' ')]
    v = { idx: v[idx] for idx, val in zip(np.nonzero(v)[0], v) }
    v = Vectors.sparse(11, v)
    return v
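Hypothetical usage of lineToSparse on a line in the sample_lda_data.txt format (11 space-separated counts; the values below are made up):
lineToSparse("1 2 6 0 2 3 1 1 0 0 3")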
In [9]:
def toSparseVector(corpusLine, nFeatures):
    v = { idx: val for idx, val in corpusLine }
    return Vectors.sparse(nFeatures, v)

# nSamples = len(corpus)
nFeatures = len(dic)
corpusParallel = sc.parallelize(corpus)
# corpusParallel = sc.parallelize(corpus[:10])
corpusMapped = corpusParallel.map(lambda doc: toSparseVector(doc, nFeatures))
# Index documents with unique IDs
corpusIndexed = corpusMapped.zipWithIndex().map(lambda x: [x[1], x[0]]).cache()
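An added sketch to confirm the [id, SparseVector] layout that LDA.train expects:
idx, vec = corpusIndexed.first()
print("Document id: %d, active terms: %d of %d" % (idx, len(vec.indices), nFeatures))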
In [10]:
nTopics = 10
ldaModel = LDA.train(corpusIndexed, k=nTopics)
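Added check (sketch), assuming topicsMatrix() returns a plain numpy array as indexed earlier: the learned matrix should be nFeatures x nTopics (terms as rows, topics as columns), matching the transpose applied further down.
print(ldaModel.topicsMatrix().shape)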
In [16]:
doc0 = corpusIndexed.first()[1].toArray()
In [11]:
# Repeat the pipeline on the first 10 documents only, for experimenting with the model API
cP10 = sc.parallelize(corpus[:10])
cM10 = cP10.map(lambda doc: toSparseVector(doc, nFeatures))
# Index documents with unique IDs
cI10 = cM10.zipWithIndex().map(lambda x: [x[1], x[0]]).cache()
doc10 = cI10
In [12]:
# Ask the JVM model for a local LDA model; 'toLocal' is not exposed in the 1.5 Python API,
# so it is invoked through the generic call() wrapper (experimental)
localLDA = ldaModel.call('toLocal')
In [78]:
# Experimental workaround: bind a dummy _get_object_id method onto doc10 so py4j can
# resolve it when the object is handed to the JVM call below
def hack(self):
    return 'x'
doc10._get_object_id = hack.__get__(doc10, type(doc10))
doc10._get_object_id()
Out[78]:
In [45]:
localLDA.topicDistribution(doc10)
In [41]:
doc10._id = doc10.id()
In [14]:
from sklearn.decomposition import LatentDirichletAllocation
In [18]:
# Embed the Spark-fitted topic matrix into a scikit-learn LDA object
lda = LatentDirichletAllocation(n_topics=nTopics, max_iter=1,
                                learning_method='online', learning_offset=50.
                                #, random_state=0
                                )
doc0 = corpusIndexed.first()[1].toArray()
# Fit on a single (1 x nFeatures) document only to initialise sklearn's internal state,
# then overwrite the topic-word matrix with the one learned by Spark
lda.fit(doc0.reshape(1, -1))
lda.components_ = ldaModel.topicsMatrix().T
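A minimal consistency check (added): after overwriting components_, its shape should be (nTopics, nFeatures), so that transform() on the 1 x nFeatures documents built below lines up.
print(lda.components_.shape)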
In [19]:
def getDocumentTopics(docTokens, lda):
    wcTuples = dic.doc2bow(docTokens)
    data = []
    row = []
    col = []
    for w, c in wcTuples:
        col.append(0)
        row.append(w)
        data.append(c)
    nSamples = 1
    nFeatures = len(dic)
    # Build a 1 x nFeatures sparse count matrix: `col` holds the row index (always 0,
    # a single document) and `row` holds the word ids used as column indices
    oneDoc = csr_matrix((data, (col, row)), shape=(nSamples, nFeatures))
    docWeights = lda.transform(oneDoc)[0]
    docWeights /= docWeights.sum()
    return docWeights
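Added usage sketch: topic weights for the first question in the data set.
print(getDocumentTopics(vraagTokens[0], lda))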
In [20]:
def inRange(age, targetAge, delta):
    return (targetAge - delta) <= age and age <= (targetAge + delta)

def getPplCirca(targetAge, delta):
    return data_ppl[data_ppl['Leeftijd'].apply(lambda age: inRange(age, targetAge, delta))]
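Added usage sketch (the age value is chosen arbitrarily): the number of respondents within 5 years of age 30.
print(len(getPplCirca(30, 5)))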
In [21]:
topicsByAge = np.zeros((data_ppl['Leeftijd'].max() + 1, nTopics))
deltaAge = 5
for age in arange(data_ppl['Leeftijd'].max() + 1):
    dataGroup = getPplCirca(age, deltaAge)
    groupTokens = dataGroup['SentToks'].tolist()
    for qTokens in groupTokens:
        topicWeights = getDocumentTopics(qTokens, lda)
        for topic, weight in enumerate(topicWeights):
            topicsByAge[age, topic] += weight / len(groupTokens)
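Added check (sketch): each row of topicsByAge averages per-document topic distributions, so rows for ages with at least one respondent should sum to roughly 1 (rows without respondents stay 0).
print(topicsByAge.sum(axis=1))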
In [22]:
topicWords = []
topicWeightedWords = []
for topic_idx, topic in enumerate(lda.components_):
    weightedWordIdx = topic.argsort()[::-1]
    wordsInTopic = [dic[i] for i in weightedWordIdx[:10]]
    weights = topic / topic.sum()
    topicWeights = [(weights[i], dic[i]) for i in weightedWordIdx[:10]]
    print "Topic #%d:" % topic_idx
    print " ".join(wordsInTopic)
    topicWords.append(wordsInTopic)
    topicWeightedWords.append(topicWeights)
In [24]:
figure(figsize=(16, 40))
for idx, topic in enumerate(topicWeightedWords):
    wc = WordCloud(background_color="white")
    img = wc.generate_from_frequencies([(word, weight) for weight, word in topic])
    subplot(nTopics, 2, 2*idx + 1)
    imshow(img)
    axis('off')
    subplot(nTopics, 2, 2*idx + 2)
    plot(topicsByAge[:, idx])
    axis([10, 100, 0, 1.0])
    title('Topic #%2d' % idx)
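Note (added): newer releases of the wordcloud package expect a word-to-frequency dict rather than a list of tuples; if generate_from_frequencies rejects the list above, the following form should work:
img = wc.generate_from_frequencies(dict((word, weight) for weight, word in topic))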