In [ ]:
import yaml
import pyspark
from operator import add

# Connect to the Mesos cluster, running the Spark executors from a Docker image.
conf = pyspark.SparkConf()
conf.setMaster("mesos://controller:5050")
conf.set("spark.mesos.executor.docker.image", "registry:5000/bdkd:spark_mesos_v11")
conf.set("spark.mesos.executor.home", "/opt/spark-1.4.0-bin-hadoop2.4")
conf.set("spark.executorEnv.MESOS_NATIVE_JAVA_LIBRARY", "/usr/local/lib/libmesos.so")
sc = pyspark.SparkContext(conf=conf)
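In [ ]:
# Optional sanity check (not in the original notebook): run a trivial job to confirm
# that Mesos can launch executors with the Docker image configured above.
# The numbers used here are arbitrary.
print sc.parallelize(range(1000)).map(lambda x: x * 2).sum()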
In [ ]:
# Read the S3 credentials for the 's3-sydney' host from the datastore configuration.
datastore_conf = yaml.safe_load(open('/root/.bdkd_datastore.conf').read())
access_key = datastore_conf.get('hosts', {}).get('s3-sydney', {}).get('access_key')
secret_key = datastore_conf.get('hosts', {}).get('s3-sydney', {}).get('secret_key')

# Repository and dataset to read from, and the local file the results are written to.
ds_repo = 'bdkd-sirca-public'
ds_dataset = 'cities'
results_file = 'data/results_word_count.csv'
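In [ ]:
# Optional check (not in the original notebook): fail early here if the S3 credentials
# were not found in the config, rather than deep inside the Spark job.
assert access_key and secret_key, 'Missing s3-sydney credentials in /root/.bdkd_datastore.conf'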
In [ ]:
# Parallel word count across all text files in the dataset, run on the Spark cluster.
def word_count():
    s3_files = 's3n://{0}:{1}@{2}/files/{3}/*.txt'.format(access_key, secret_key, ds_repo, ds_dataset)
    files_rdd = sc.textFile(s3_files)
    # Split lines into words, count occurrences, then sort by count in descending order.
    counts = files_rdd.flatMap(lambda x: x.split(' ')).map(lambda x: (x, 1)).reduceByKey(add).\
        map(lambda x: (x[1], x[0])).sortByKey(False)
    results = counts.collect()
    with open(results_file, 'wb') as fw:
        for n, row in enumerate(results):
            if n >= 10:  # Write only the top 10 words
                break
            line = '{0},{1},{2}\n'.format(n, row[0], row[1])
            fw.write(line)
    return results_file
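In [ ]:
# Alternative sketch (not in the original notebook): if only the top N words are needed,
# RDD.takeOrdered() avoids collecting the full word count onto the driver.
# The function name and the default value of n below are illustrative only.
def word_count_top_n(n=10):
    s3_files = 's3n://{0}:{1}@{2}/files/{3}/*.txt'.format(access_key, secret_key, ds_repo, ds_dataset)
    counts = sc.textFile(s3_files).flatMap(lambda x: x.split(' ')).map(lambda x: (x, 1)).reduceByKey(add)
    # takeOrdered returns the n smallest elements by key; negate the count for descending order.
    return counts.takeOrdered(n, key=lambda x: -x[1])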
In [ ]:
results = word_count()
print 'Results file:', results
In [ ]:
sc.stop()
In [ ]: