In [ ]:
import yaml
import pyspark
from operator import add

conf = pyspark.SparkConf()
conf.setMaster("mesos://controller:5050")
conf.set("spark.mesos.executor.docker.image","registry:5000/bdkd:spark_mesos_v11")
conf.set("spark.mesos.executor.home","/opt/spark-1.4.0-bin-hadoop2.4")
conf.set("spark.executorEnv.MESOS_NATIVE_JAVA_LIBRARY","/usr/local/lib/libmesos.so")
sc = pyspark.SparkContext(conf=conf)

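A quick way to confirm the Mesos-backed context is healthy before touching S3 is to run a trivial job; this smoke-test cell is an addition, not part of the original workflow:

In [ ]:
# Hypothetical smoke test: a tiny job that should launch executors on the Mesos
# cluster and print 4950 if the SparkContext above is working.
print sc.parallelize(range(100)).sum()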
In [ ]:
datastore_conf = yaml.safe_load(open('/root/.bdkd_datastore.conf'))
s3_host = datastore_conf.get('hosts', {}).get('s3-sydney', {})
access_key = s3_host.get('access_key')
secret_key = s3_host.get('secret_key')

ds_repo = 'bdkd-sirca-public'
ds_dataset = 'cities'

results_file = 'data/results_word_count.csv'

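The S3 credentials come straight from the datastore config, so it is worth failing early if the s3-sydney entry is missing; this check is an added suggestion rather than part of the original notebook:

In [ ]:
# Guard against a missing or incomplete s3-sydney host entry in the config file.
assert access_key and secret_key, 'No s3-sydney credentials found in /root/.bdkd_datastore.conf'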
In [ ]:
# Parallel processing: count word occurrences across all text files in the dataset.
def word_count():
    s3_files = 's3n://{0}:{1}@{2}/files/{3}/*.txt'.format(access_key, secret_key, ds_repo, ds_dataset)
    files_rdd = sc.textFile(s3_files)
    # Split lines into words, count each word, then sort by count (descending).
    counts_rdd = files_rdd.flatMap(lambda x: x.split(' ')).map(lambda x: (x, 1)).reduceByKey(add).\
                           map(lambda x: (x[1], x[0])).sortByKey(False)
    results = counts_rdd.collect()
    with open(results_file, 'w') as fw:
        for n, row in enumerate(results):
            if n >= 10:  # write only the top ten rows
                break
            line = '{0},{1},{2}\n'.format(n, row[0], row[1])
            fw.write(line)

    return results_file

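Because only the top rows end up in the CSV, an alternative worth noting (a sketch using the same dataset variables, not part of the original notebook) is to let Spark trim the result on the executors with takeOrdered instead of sorting and collecting every word onto the driver:

In [ ]:
# Hypothetical variant: return only the n most frequent words without
# collecting the full sorted RDD.
def top_words(n=10):
    s3_files = 's3n://{0}:{1}@{2}/files/{3}/*.txt'.format(access_key, secret_key, ds_repo, ds_dataset)
    counts = sc.textFile(s3_files).flatMap(lambda x: x.split(' ')).map(lambda x: (x, 1)).reduceByKey(add)
    return counts.takeOrdered(n, key=lambda x: -x[1])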
In [ ]:
results = word_count()
print 'Results file:', results

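To inspect the output without leaving the notebook, the CSV can simply be read back (an added convenience cell):

In [ ]:
# Print the written rows: index, count, word.
print open(results).read()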
In [ ]:
sc.stop()

In [ ]: