In [1]:
from pysparkling import Context

Word Count


In [2]:
# Word count over the README: punctuation is blanked out, each line is split
# on single spaces, and the occurrences of every token are summed.
readme_lines = Context().textFile('../README.rst')

# Replace every non-alphanumeric character with a space so that only
# letters and digits remain inside tokens.
alnum_lines = readme_lines.map(
    lambda text: ''.join(c if c.isalnum() else ' ' for c in text)
)

# Splitting on a single space keeps the empty strings found between
# consecutive spaces, so '' shows up as a token in the result.
tokens = alnum_lines.flatMap(lambda text: text.split(' '))

counts = tokens.map(lambda token: (token, 1)).reduceByKey(
    lambda left, right: left + right
)
print(counts.collect())


[(u'', 683), (u'1', 1), (u'100k', 1), (u'20Chat', 1), (u'3', 3), (u'4', 1), (u'A', 3), (u'ACCESS', 2), (u'API', 6), (u'AWS', 3), (u'All', 2), (u'Assume', 1), (u'Context', 11), (u'Count', 1), (u'Examples', 1), (u'Features', 1), (u'File', 4), (u'For', 1), (u'Given', 1), (u'HISTORY', 2), (u'Hadoop', 1), (u'Handles', 1), (u'ID', 1), (u'If', 1), (u'Install', 1), (u'Instantiating', 1), (u'It', 1), (u'JVM', 2), (u'Join', 2), (u'KEY', 2), (u'Now', 1), (u'Parallelization', 1), (u'Plain', 1), (u'Pool', 3), (u'PySpark', 1), (u'Python', 1), (u'RDD', 6), (u'README', 1), (u'Resolves', 1), (u'S3', 1), (u'SECRET', 1), (u'Some', 1), (u'Spark', 2), (u'SparkContext', 1), (u'Specify', 1), (u'Supports', 1), (u'The', 5), (u'These', 1), (u'They', 1), (u'This', 4), (u'ThreadPoolExecutor', 1), (u'URI', 1), (u'Use', 2), (u'Word', 1), (u'You', 2), (u'a', 17), (u'access', 1), (u'advanced', 1), (u'all', 1), (u'also', 1), (u'alt', 1), (u'an', 2), (u'and', 20), (u'any', 2), (u'api', 3), (u'are', 2), (u'arguments', 1), (u'assumes', 1), (u'at', 2), (u'auth', 1), (u'b', 2), (u'badge', 5), (u'badges', 1), (u'bash', 1), (u'be', 3), (u'below', 1), (u'blob', 2), (u'block', 3), (u'boto', 1), (u'bucket', 1), (u'but', 1), (u'by', 3), (u'bz2', 2), (u'c', 1), (u'call', 1), (u'campaign', 1), (u'can', 5), (u'case', 1), (u'ch', 3), (u'change', 1), (u'chat', 1), (u'check', 1), (u'classifier', 2), (u'cloudpickle', 2), (u'code', 3), (u'collect', 1), (u'com', 4), (u'comma', 1), (u'common', 1), (u'compressed', 1), (u'compression', 1), (u'concurrent', 1), (u'content', 1), (u'converts', 1), (u'counts', 3), (u'custom', 1), (u'data', 7), (u'datasets', 1), (u'decompression', 1), (u'default', 2), (u'demo', 2), (u'demoed', 1), (u'dependencies', 2), (u'dependency', 2), (u'describes', 1), (u'deserialized', 1), (u'deserializer', 2), (u'directly', 1), (u'dm', 1), (u'doc', 3), (u'docs', 2), (u'document', 1), (u'documents', 1), (u'does', 2), (u'done', 1), (u'drop', 1), (u'dump', 1), (u'dumps', 1), (u'either', 1), (u'else', 1), 
(u'endpoint', 1), (u'environment', 1), (u'example', 1), (u'examples', 1), (u'existance', 1), (u'exists', 1), (u'expense', 1), (u'fast', 1), (u'faster', 1), (u'features', 3), (u'file', 6), (u'fileio', 2), (u'filename', 3), (u'files', 4), (u'flatMap', 1), (u'focus', 1), (u'for', 15), (u'form', 1), (u'from', 2), (u'full', 1), (u'func', 1), (u'functionality', 1), (u'fury', 1), (u'futures', 1), (u'generally', 1), (u'github', 3), (u'githubusercontent', 1), (u'gitter', 3), (u'gz', 2), (u'handle', 1), (u'have', 5), (u'having', 1), (u'hdfs', 4), (u'html', 3), (u'http', 7), (u'https', 11), (u'if', 1), (u'im', 3), (u'image', 4), (u'img', 1), (u'implementation', 2), (u'import', 1), (u'in', 11), (u'initialization', 1), (u'input', 1), (u'install', 2), (u'instantiation', 1), (u'instead', 1), (u'interface', 1), (u'io', 5), (u'ipynb', 2), (u'is', 12), (u'isalnum', 1), (u'iterable', 1), (u'join', 1), (u'just', 1), (u'lambda', 4), (u'learn', 1), (u'lightweight', 2), (u'like', 2), (u'limitations', 1), (u'line', 4), (u'list', 3), (u'load', 1), (u'loads', 1), (u'local', 2), (u'locations', 1), (u'log', 1), (u'logo', 2), (u'long', 1), (u'looks', 1), (u'map', 3), (u'master', 3), (u'medium', 1), (u'method', 1), (u'methods', 6), (u'module', 2), (u'more', 1), (u'most', 1), (u'multiple', 2), (u'multiprocessing', 4), (u'my', 1), (u'name', 1), (u'native', 1), (u'need', 1), (u'never', 1), (u'no', 1), (u'normalized', 1), (u'not', 2), (u'objective', 1), (u'objects', 1), (u'of', 8), (u'on', 3), (u'only', 1), (u'optional', 1), (u'or', 4), (u'org', 2), (u'other', 1), (u'overhead', 1), (u'pairs', 1), (u'parallel', 3), (u'parallelize', 1), (u'parallelizing', 1), (u'path', 2), (u'paths', 1), (u'per', 1), (u'perfect', 1), (u'pickle', 3), (u'pip', 2), (u'pipeline', 2), (u'png', 1), (u'pr', 1), (u'pre', 1), (u'preferably', 1), (u'preprocessing', 1), (u'primary', 1), (u'print', 1), (u'prints', 1), (u'process', 1), (u'processed', 1), (u'processes', 1), (u'processing', 2), (u'provided', 1), (u'py', 1), 
(u'pypi', 5), (u'pysparkling', 23), (u'python', 4), (u'raw', 1), (u'read', 1), (u'reading', 2), (u'reduceByKey', 1), (u'remove', 1), (u'replacement', 1), (u'requests', 1), (u'resilience', 1), (u'return', 1), (u'rst', 3), (u's', 2), (u's3', 4), (u'same', 1), (u'saveAsTextFile', 1), (u'scheme', 1), (u'scikit', 1), (u'separated', 1), (u'serializable', 1), (u'serialize', 1), (u'serialized', 1), (u'serializer', 3), (u'session', 1), (u'setup', 2), (u'shields', 1), (u'should', 1), (u'single', 1), (u'small', 2), (u'some', 3), (u'source', 1), (u'specify', 2), (u'split', 1), (u'starts', 1), (u'submodule', 1), (u'svenkreiss', 6), (u'svg', 3), (u'target', 4), (u'task', 1), (u'textFile', 3), (u'textfile', 1), (u'that', 3), (u'the', 17), (u'their', 1), (u'them', 1), (u'then', 1), (u'there', 1), (u'this', 4), (u'thread', 1), (u'to', 9), (u'train', 1), (u'trained', 1), (u'transparently', 1), (u'trivial', 3), (u'two', 1), (u'txt', 2), (u'use', 5), (u'used', 2), (u'using', 2), (u'usual', 1), (u'utm', 4), (u'v0', 3), (u'variables', 1), (u'via', 1), (u'w100', 1), (u'want', 2), (u'what', 1), (u'which', 3), (u'wildcards', 1), (u'with', 7), (u'word', 2), (u'words', 1), (u'write', 1), (u'writing', 2), (u'you', 6), (u'your', 1)]

Distributed Confusion Matrix


In [11]:
import numpy as np

# Toy label pairs over classes 0..2; the variable name suggests each tuple
# is (predicted, true) -- TODO confirm.
label_pairs = [
    (0, 0), (1, 0), (2, 2), (1, 1), (1, 2), (2, 2),
    (0, 2), (1, 0), (2, 1), (1, 1), (0, 0), (0, 0),
]
predicted_true = Context().parallelize(label_pairs)

def cm_per_pair(p_t, n_classes=3):
    """Build the confusion-matrix contribution of a single label pair.

    Args:
        p_t: Two-element sequence of integer class labels. The first label
            selects the row and the second the column (presumably
            ``(predicted, true)``, judging by the name -- confirm with the
            caller).
        n_classes: Number of classes; the result has shape
            ``(n_classes, n_classes)``. Defaults to 3, the size used by
            this demo, so existing single-argument calls are unaffected.

    Returns:
        A float ``numpy.ndarray`` of zeros with a single 1 at
        ``[p_t[0], p_t[1]]``.

    Raises:
        IndexError: If a label is out of bounds for ``n_classes``.
    """
    cm = np.zeros((n_classes, n_classes))
    # One-hot encode the pair so that summing these matrices yields counts.
    cm[p_t[0], p_t[1]] = 1
    return cm

def add_pair_to_cm(running_cm, p_t):
    """Return ``running_cm`` plus the one-hot matrix for the pair ``p_t``."""
    return running_cm + cm_per_pair(p_t)


# seqOp folds label pairs into a matrix; combOp adds two such matrices
# element-wise. The bare expression on the last line is the cell's output.
confusion_matrix = predicted_true.aggregate(
    zeroValue=np.zeros((3, 3)),
    seqOp=add_pair_to_cm,
    combOp=np.add,
)
confusion_matrix


Out[11]:
array([[ 3.,  0.,  1.],
       [ 2.,  2.,  1.],
       [ 0.,  1.,  2.]])

Accessing Public Datasets on AWS S3

You need valid AWS account credentials in your environment, set as AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY, to run the next demos.

Common Crawl

More info on the dataset is in this blog post.


In [3]:
# Read the warc and wat path listings of the CC-MAIN-2015-11 Common Crawl.
# Both locations are passed to textFile as one comma-separated string.
crawl_path_listings = (
    's3n://aws-publicdatasets/common-crawl/crawl-data/CC-MAIN-2015-11/warc.paths.*,'
    's3n://aws-publicdatasets/common-crawl/crawl-data/CC-MAIN-2015-11/wat.paths.gz'
)
paths_rdd = Context().textFile(crawl_path_listings)

sampled_paths = paths_rdd.takeSample(5)
print(sampled_paths)


[u'common-crawl/crawl-data/CC-MAIN-2015-11/segments/1424936462555.21/warc/CC-MAIN-20150226074102-00158-ip-10-28-5-156.ec2.internal.warc.gz', u'common-crawl/crawl-data/CC-MAIN-2015-11/segments/1424936462982.10/warc/CC-MAIN-20150226074102-00095-ip-10-28-5-156.ec2.internal.warc.gz', u'common-crawl/crawl-data/CC-MAIN-2015-11/segments/1424936463181.39/warc/CC-MAIN-20150226074103-00261-ip-10-28-5-156.ec2.internal.warc.gz', u'common-crawl/crawl-data/CC-MAIN-2015-11/segments/1424936463956.95/wat/CC-MAIN-20150226074103-00277-ip-10-28-5-156.ec2.internal.warc.wat.gz', u'common-crawl/crawl-data/CC-MAIN-2015-11/segments/1424936465069.3/wat/CC-MAIN-20150226074105-00332-ip-10-28-5-156.ec2.internal.warc.wat.gz']

Human Microbiome Project


In [4]:
# Read every file under the by_subject prefix of the Human Microbiome
# Project demo bucket and print one sampled line.
by_subject_uri = 's3n://human-microbiome-project/DEMO/HM16STR/46333/by_subject/*'
by_subject_rdd = Context().textFile(by_subject_uri)

sampled_lines = by_subject_rdd.takeSample(1)
print(sampled_lines)


[u'ACCAAGGCTTTGACGGGTAGCCGGCCTGAGAGGGTGACCGGCCACATTGGGACTGAGATA']

In [ ]: