In [1]:
import pyspark
import json
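# Run Spark locally on all available cores. textFile() can read the
# gzipped dump directly, but a single .gz file is not splittable, so it
# is decompressed by a single task.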
sc = pyspark.SparkContext('local[*]')
wikiFile = sc.textFile('wikidata-20170306-all.json.gz')
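# The Wikidata dump is one large JSON array with one entity per line:
# entity lines end with a trailing comma (except the last one), and the
# enclosing '[' / ']' lines carry no entity, hence the filter and
# rstrip below.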
subset = wikiFile \
    .filter(lambda x: '{' in x) \
    .map(lambda x: json.loads(x.rstrip(','))) \
    .filter(lambda x: 'de' in x.get('labels', {}) and 'de' in x.get('descriptions', {})) \
    .map(lambda x: {
        'label': x['labels']['de']['value'],
        'description': x['descriptions']['de']['value']
    }) \
    .take(10)
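# take(10) returns a plain Python list on the driver, so the extracted
# records can be inspected directly:
print(subset[0])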
from pyspark.mllib.feature import HashingTF, IDF
from pyspark.mllib.regression import LabeledPoint
# HashingTF expects each document as a sequence of terms; passing the raw
# strings would hash individual characters, so split into tokens first
# (a plain whitespace split here).
documents = sc.parallelize(subset) \
    .map(lambda x: x['description'].split())
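# HashingTF implements the hashing trick: each term is hashed into a
# fixed feature space (2**20 buckets by default), so no vocabulary has
# to be built up front, at the cost of occasional hash collisions.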
hashingTF = HashingTF()
tf = hashingTF.transform(documents)
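# Cache the term frequencies: IDF().fit makes one pass over the data and
# idf.transform another, so caching avoids re-hashing the documents.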
tf.cache()
idf = IDF().fit(tf)
tfidf = idf.transform(tf)
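# Optional variant (an assumption, not part of the original run): terms
# appearing in fewer than minDocFreq documents get an IDF of zero, which
# cuts noise in small corpora. The *_filtered names are illustrative.
idf_filtered = IDF(minDocFreq=2).fit(tf)
tfidf_filtered = idf_filtered.transform(tf)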
# zip() pairs elements positionally; that is safe here because labels and
# tfidf both derive from the same parallelized list, so partition counts
# and ordering line up.
labels = sc.parallelize(subset).map(lambda doc: doc['label'])
training = labels.zip(tfidf)
print('finished')
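# The zipped RDD is lazy; first() below is the action that actually
# materializes a (label, tf-idf vector) pair.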
print(training.first())
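# A minimal sketch of a possible next step (an assumption, not in the
# original notebook): MLlib classifiers expect numeric labels, so map
# each distinct string label to an index before wrapping the pairs in
# LabeledPoint. label_index and training_points are illustrative names.
label_index = {l: float(i) for i, l in enumerate(sorted({d['label'] for d in subset}))}
training_points = training.map(lambda lv: LabeledPoint(label_index[lv[0]], lv[1]))
print(training_points.first())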
sc.stop()