In [1]:
import pyspark
import json

sc = pyspark.SparkContext('local[*]')

wikiFile = sc.textFile('wikidata-20170306-all.json.gz')

subset = wikiFile \
  .filter(lambda x: '{' in x) \
  .map(lambda x: json.loads(x[:-1])) \
  .map(lambda x: { \
    'label': x.get('labels').get('de').get('value'), \
    'description': x.get('descriptions').get('de').get('value') \
  }) \
  .take(10)


from pyspark.mllib.feature import HashingTF, IDF
from pyspark.mllib.regression import LabeledPoint

documents = sc.parallelize(subset) \
  .map(lambda x: x.get('description'))
    
hashingTF = HashingTF()
tf = hashingTF.transform(documents)

tf.cache()
idf = IDF().fit(tf)
tfidf = idf.transform(tf)

labels = sc.parallelize(subset).map(lambda doc: doc['label'])
training = labels.zip(tfidf).map(lambda x: (x[0], x[1]))

print('finished')
print(training.first())

sc.stop()


finished
('Schottland', SparseVector(1048576, {35920: 0.6369, 40200: 0.0, 173606: 0.3185, 211440: 0.0953, 238153: 0.2859, 263483: 1.0116, 265159: 0.6369, 296409: 0.0, 388504: 1.5769, 469732: 1.0116, 599511: 1.0116, 667632: 1.2993, 702216: 0.0, 702740: 0.7885, 734443: 0.0, 777769: 0.7885, 856921: 1.2993, 865556: 1.2993, 875351: 0.0, 891534: 0.8027, 897367: 0.0953, 957654: 1.7047, 968035: 0.0, 997188: 1.2993}))

In [ ]: