Start with:
$ IPYTHON=1 IPYTHON_OPTS="notebook" $SPARK_HOME/bin/pyspark
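Note: the IPYTHON and IPYTHON_OPTS variables were removed in Spark 2.0; on newer releases an equivalent launch is roughly

$ PYSPARK_DRIVER_PYTHON=jupyter PYSPARK_DRIVER_PYTHON_OPTS="notebook" $SPARK_HOME/bin/pyspark

Everything below assumes Spark 1.5.1.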
In [1]:
    
sc  # the SparkContext created automatically by the pyspark shell
    
    Out[1]:
In [2]:
    
from pyspark import SparkContext
logFile = "README.md"  # Should be some file on your system
#sc = SparkContext("local", "Simple App1")  # not needed: the pyspark shell already provides sc
logData = sc.textFile(logFile).cache()
numAs = logData.filter(lambda s: 'a' in s).count()
numBs = logData.filter(lambda s: 'b' in s).count()
print "Lines with a: %i, lines with b: %i" % (numAs, numBs)
    
    
In [1]:
    
from pyspark.mllib.clustering import LDA, LDAModel
from pyspark.mllib.linalg import Vectors, Matrix, Matrices
from scipy.sparse import csr_matrix
    
In [4]:
    
def demo():
    # Load and parse the data
    data = sc.textFile("/home/carlosm/Projects/Sherlock/spark-1.5.1/data/mllib/sample_lda_data.txt")
    parsedData = data.map(lambda line: Vectors.dense([float(x) for x in line.strip().split(' ')]))
    # Index documents with unique IDs
    corpus = parsedData.zipWithIndex().map(lambda x: [x[1], x[0]]).cache()
    # Cluster the documents into three topics using LDA
    ldaModel = LDA.train(corpus, k=3)
    # Output topics. Each is a distribution over words (matching word count vectors)
    print("Learned topics (as distributions over vocab of " + str(ldaModel.vocabSize()) + " words):")
    topics = ldaModel.topicsMatrix()
    for topic in range(3):
        print("Topic " + str(topic) + ":")
        for word in range(0, ldaModel.vocabSize()):
            print(" " + str(topics[word][topic]))
demo()
    
    
In [2]:
    
# %pylab brings numpy and matplotlib into the namespace (np, arange, figure, plot, ...)
%pylab inline
    
    
In [3]:
    
import pandas as pd
from wordcloud import WordCloud
import gensim
    
In [4]:
    
import pickle
data_vraag = pickle.load(open('../data/preprocessedData.pkl', 'rb'))
    
In [5]:
    
# Split respondents into individuals ('mijzelf' = 'myself') and organisations
data_ppl = data_vraag[data_vraag['individu of groep']=='mijzelf']
data_org = data_vraag[data_vraag['individu of groep']!='mijzelf']
    
In [6]:
    
vraagTokens = data_vraag['SentToks'].tolist()  # one list of tokens per question
    
In [7]:
    
dic = gensim.corpora.Dictionary(vraagTokens)
corpus = [dic.doc2bow(text) for text in vraagTokens]
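Each corpus entry is a bag of words: a list of (word_id, count) tuples. A quick look at the objects just built (a small sketch) before they are converted to MLlib sparse vectors below:

print len(dic), 'unique tokens in the dictionary'
print corpus[0][:5]  # first few (word_id, count) pairs of the first question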
    
In [8]:
    
def lineToSparse(line):
    # Parse a whitespace-separated line of counts into an 11-dimensional sparse vector
    # (11 matches the sample_lda_data.txt format used above)
    v = [float(x) for x in line.strip().split(' ')]
    nonzero = {idx: v[idx] for idx in np.nonzero(v)[0]}
    return Vectors.sparse(11, nonzero)
    
In [9]:
    
def toSparseVector(corpusLine, nFeatures):
    v = { idx: val for idx,val in corpusLine }
    return Vectors.sparse(nFeatures, v)
# nSamples = len(corpus)
nFeatures = len(dic)
corpusParallel = sc.parallelize(corpus)
# corpusParallel = sc.parallelize(corpus[:10])
corpusMapped = corpusParallel.map(lambda doc: toSparseVector(doc, nFeatures))
# Index documents with unique IDs
corpusIndexed = corpusMapped.zipWithIndex().map(lambda x: [x[1], x[0]]).cache()
    
In [10]:
    
nTopics = 10
ldaModel = LDA.train(corpusIndexed, k=nTopics)
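As a quick check (a small sketch; it assumes topicsMatrix() comes back as a NumPy array, as in the demo cell above), the learned topic matrix is vocabSize x nTopics, i.e. the transpose of the layout sklearn expects further down:

topicsMat = ldaModel.topicsMatrix()
print ldaModel.vocabSize(), topicsMat.shape  # expected: nFeatures and (nFeatures, nTopics)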
    
In [16]:
    
doc0 = corpusIndexed.first()[1].toArray()  # dense term-count vector of the first document
    
In [11]:
    
cP10 = sc.parallelize(corpus[:10])
cM10 = cP10.map(lambda doc: toSparseVector(doc, nFeatures))
# Index documents with unique IDs
cI10 = cM10.zipWithIndex().map(lambda x: [x[1], x[0]]).cache()
doc10 = cI10  # note: an RDD of the first 10 indexed documents, not a single document
    
In [12]:
    
localLDA = ldaModel.call('toLocal')  # obtain the Scala LocalLDAModel through the py4j wrapper (exploratory)
    
In [78]:
    
def hack(self):
    return 'x'
# Exploratory: monkey-patch a _get_object_id method onto the RDD (bound to doc10 itself)
doc10._get_object_id = hack.__get__(doc10, type(doc10))
doc10._get_object_id()
    
    Out[78]:
In [45]:
    
localLDA.topicDistribution(doc10)  # exploratory: try to call the Scala model's topicDistribution directly
    
    
In [41]:
    
doc10._id = doc10.id()  # exploratory: expose the RDD id as an attribute
    
In [14]:
    
from sklearn.decomposition import LatentDirichletAllocation
    
In [18]:
    
# Embed the Spark-fitted topic matrix into an sklearn LDA object
lda = LatentDirichletAllocation(n_topics=nTopics, max_iter=1,
                                learning_method='online', learning_offset=50.
                                #,random_state=0
                               )
doc0 = corpusIndexed.first()[1].toArray()
lda.fit(doc0.reshape(1, -1))  # fit on a single document only to initialise the model's internals
lda.components_ = ldaModel.topicsMatrix().T  # overwrite the topics with Spark's (nTopics x nFeatures)
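A consistency check may help here (a sketch; it assumes the vocabulary behind corpusIndexed is the same dic used for doc2bow, so nFeatures matches the model's vocabSize):

assert lda.components_.shape == (nTopics, nFeatures)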
    
In [19]:
    
def getDocumentTopics(docTokens, lda):
    # Build a 1 x nFeatures sparse count vector for a single document
    wcTuples = dic.doc2bow(docTokens)
    data = []
    rows = []
    cols = []
    for wordId, count in wcTuples:
        rows.append(0)       # single document, so everything goes into row 0
        cols.append(wordId)
        data.append(count)
    nSamples = 1
    nFeatures = len(dic)
    oneDoc = csr_matrix((data, (rows, cols)), shape=(nSamples, nFeatures))
    # Normalise the document-topic weights returned by transform
    docWeights = lda.transform(oneDoc)[0]
    docWeights /= docWeights.sum()
    return docWeights
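For example (a usage sketch against the data loaded above), the dominant topic and the full weight vector of the first question:

weights0 = getDocumentTopics(vraagTokens[0], lda)
print weights0.argmax(), weights0.round(2)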
    
In [20]:
    
def inRange(age, targetAge, delta):
    return (targetAge - delta) <= age <= (targetAge + delta)
def getPplCirca(targetAge, delta):
    # All individual respondents whose age ('Leeftijd') is within delta years of targetAge
    return data_ppl[data_ppl['Leeftijd'].apply(lambda age: inRange(age, targetAge, delta))]
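For instance (a hypothetical check), the number of individual respondents within 5 years of age 30:

print len(getPplCirca(30, 5))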
    
In [21]:
    
# Average topic weight per age, smoothed over a window of +/- deltaAge years
topicsByAge = np.zeros((data_ppl['Leeftijd'].max()+1, nTopics))
deltaAge = 5
for age in arange(data_ppl['Leeftijd'].max()+1):
    dataGroup = getPplCirca(age, deltaAge)
    groupTokens = dataGroup['SentToks'].tolist()

    for qTokens in groupTokens:
        topicWeights = getDocumentTopics(qTokens, lda)
        for topic, weight in enumerate(topicWeights):
            topicsByAge[age, topic] += weight / len(groupTokens)
    
In [22]:
    
topicWords = []
topicWeightedWords = []
for topic_idx, topic in enumerate(lda.components_):
    weightedWordIdx = topic.argsort()[::-1]
    wordsInTopic = [dic[i] for i in weightedWordIdx[:10]]
    weights = topic / topic.sum()
    topicWeights = [ (weights[i],dic[i]) for i in weightedWordIdx[:10]]
    
    print "Topic #%d:" % topic_idx
    print " ".join(wordsInTopic)
    topicWords.append(wordsInTopic)
    topicWeightedWords.append(topicWeights)
    
    
In [24]:
    
figure(figsize=(16,40))
for idx,topic in enumerate(topicWeightedWords):
    wc = WordCloud(background_color="white")
    img = wc.generate_from_frequencies([ (word, weight) for weight,word in topic ])
    subplot(nTopics,2,2*idx+1)
    imshow(img)
    axis('off')
    
    subplot(nTopics,2,2*idx+2)
    plot(topicsByAge[:,idx])
    axis([10, 100, 0, 1.0])
    title('Topic #%2d'%(idx))
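Note: newer wordcloud releases expect a dict rather than a list of (word, weight) tuples, so if the cell above fails on generate_from_frequencies, a variant along these lines should work:

freqs = {word: weight for weight, word in topic}
img = wc.generate_from_frequencies(freqs)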