In [1]:
%pylab inline

from __future__ import absolute_import
from __future__ import division
from __future__ import print_function

try:
    from future.builtins import (bytes, str, open, super, range,
                                 zip, round, input, int, pow, object)
except:
    pass

# ---- Standard Libraries not included in pylab
import collections
import glob
import json
import random
import string
import time
from StringIO import StringIO

# ---- Scientific Libraries, the standard Python science stack
import numpy as np
import pandas as pd
import scipy as sp
import matplotlib as mpl
import matplotlib.pyplot as plt

# ---- PySpark Libraries
sc

# ---- Extra Libraries for additional functionality


Populating the interactive namespace from numpy and matplotlib
Out[1]:
<pyspark.context.SparkContext at 0x7f09ff2200d0>

In [2]:
def merge_dicts(*dict_args):
    """ Given any number of dictionaries, merges them into a comprehensive 
    dictionary.  For example:
    
    >>> from collections import OrderedDict
    >>> x = {'a': 1, 'b': 2}
    >>> y = {'b': 3, 'c': 4}
    >>> z = OrderedDict(sorted(merge_dicts(x, y).items(), key=lambda i: i[0]))
    >>> z
    OrderedDict([(u'a', 1), (u'b', 5), (u'c', 4)])
    
    """
    result = {}
    
    for dictionary in dict_args:
        for key, value in dictionary.items():
            result[key] = result.get(key, 0) + value
        
    return result

In [1]:
# How to profile a function in IPython

# import cProfile
# cProfile.runctx('my_function()', globals(), locals())

In [2]:
# -------1---------2---------3---------4---------5---------6---------7----

In [2]:
def get_strdict(the_str):
    """ Given a string returns a dictionary of the count of each letter in 
    said string.  For example: "cat" would return {'c': 1, 'a': 1, 't', 1}
    
    This function uses Spark, sc and RDDs to compute.
    
    >>> from collections import OrderedDict
    >>> c = get_strdict("cat")
    >>> OrderedDict(sorted(c.items(), key=lambda i: i[0]))
    OrderedDict([(u'a', 1), (u'c', 1), (u't', 1)])
        
    """
    result = {}
    if len(the_str) < 10**2: # too small for an rdd
        for char in the_str:
            result[char] = result.get(char, 0) + 1
    
    else:
        if len(the_str) > 10**5:
            divisor = int(len(the_str) / 10**5)
        else:
            divisor = 1

        str_1 = []
        for i in range(divisor):
            start = int(len(the_str) / divisor) * i
            end = int(len(the_str) / divisor) * (i + 1)

            str_rdd = sc.parallelize(the_str[start:end])
            str_1.append(str_rdd)

        for rdd in str_1:
            cnt_str = rdd.map(lambda w: (w, 1))\
                         .reduceByKey(lambda a, b: a + b)
            result = merge_dicts(result, dict(cnt_str.collect()))
    
    return result

In [4]:
import doctest
doctest.testmod()


Out[4]:
TestResults(failed=0, attempted=8)

In [3]:
power = 6
attempts = 2

str_1 = string.digits * (10**power)
str_2 = str_1[::-1]

print("{:>10}{:>8}{:>13}{:>11}"\
      .format("Attempt", "Result", "Size", "Run Time"))
print("   " + "=" * 39)

size = len(str_1)
for i in range(attempts):
    start = time.time()
    char_cnt1 = get_strdict(str_1)
    char_cnt2 = get_strdict(str_2)
    result = (char_cnt1 == char_cnt2)
    print("{:>9}{:>9}{:>13}{:>11}"\
          .format("{:02}".format(i + 1), 
                  "True" if result else "False",
                  "{:,}".format(size),
                  (str(round(time.time() - start, 2))) + "s"))


   Attempt  Result         Size   Run Time
   =======================================
---------------------------------------------------------------------------
NameError                                 Traceback (most recent call last)
<ipython-input-3-061a67f9ed9b> in <module>()
     11 for i in range(attempts):
     12     start = time.time()
---> 13     char_cnt1 = get_strdict(str_1)
     14     char_cnt2 = get_strdict(str_2)
     15     result = (char_cnt1 == char_cnt2)

<ipython-input-2-3ce90bb3fe0a> in get_strdict(the_str)
     32         for rdd in str_1:
     33             cnt_str = rdd.map(lambda w: (w, 1))                         .reduceByKey(lambda a, b: a + b)
---> 34             result = merge_dicts(result, dict(cnt_str.collect()))
     35 
     36     return result

NameError: global name 'merge_dicts' is not defined

In [4]:
def is_anagram(str_1, str_2):
    str_1 = "".join(str_1.split(" "))
    str_2 = "".join(str_2.split(" "))
    size_1 = len(str_1)
    size_2 = len(str_2)
    
    if size_1 != size_2:
        return False, -1
    
    # str1 = sc.parallelize(str_1)
    cnt_str1 = str1.map(lambda w: (w, 1))\
                   .reduceByKey(lambda a, b: a + b)
    
    # str2 = sc.parallelize(str_2)
    cnt_str2 = str2.map(lambda w: (w, 1))\
                   .reduceByKey(lambda a, b: a + b)
    
    return cnt_str1.collect() == cnt_str2.collect(), size_1

In [5]:
power = 5.2
len(string.digits * int(10**power))


Out[5]:
1584890

In [5]:
import pyspark
from pyspark import SparkConf
from pyspark import SparkContext

# methods = list(dir(sc))
# for method in methods:
#     if not method.startswith("_") :
#         print(method),

sc.appName = "PySpark from IPython"
sc

rdd = sc.parallelize(
# words = []
# with open("../full-text", "r") as f:
#     for line in f:
#         words.append(line)

# from bs4 import BeautifulSoup as bs
# words = bs("".join(words))
# words = " ".join((words.get_text()).split("\n"))
# words = words * 120
# words = words.split()
# print(len(words))

# words = sc.parallelize(words)
# print(words.count())
# word_map = words.map(lambda w: (w, 1))
# word_count = word_map.reduceByKey(lambda a, b: a + b)
# counts_collect = word_count.collect()
# print(len(counts_collect))
# counts = sorted(counts_collect, key=lambda tup: tup[1], reverse=True)
# print(counts[:10])


Out[5]:
<pyspark.context.SparkContext at 0x7f09ff2200d0>

In [17]:
def is_anagram(str_1, str_2):
    """ A pure python implementation of is_anagram. """
    str_1 = "".join(str_1.split(" "))
    str_2 = "".join(str_2.split(" "))
    size_1 = len(str_1)
    size_2 = len(str_2)
    
    if size_1 != size_2:
        return False, -1
    
    letter_count_1 = {}
    for letter in str_1:
        letter_count_1[letter] = letter_count_1.get(letter, 0) + 1
    
    letter_count_2 = {}
    for letter in str_2:
        letter_count_2[letter] = letter_count_2.get(letter, 0) + 1
            
    return (letter_count_1 == letter_count_2), size_1


power = 6
attempts = 1
str_1 = string.ascii_letters * int(10**power)
str_2 = str_1[::-1]
print("{:>10}{:>8}{:>18}{:>11}"\
      .format("Attempt", "Result", "Size", "Run Time"))
print("   " + "=" * 44)

for i in range(attempts):
    start = time.time()
    result, size = is_anagram(str_1, str_2)
    print("{:>9}{:>9}{:>18}{:>11}"\
          .format("{:02}".format(i + 1), 
                  "True" if result else "False",
                  "{:,}".format(size),
                  (str(round(time.time() - start, 2))) + "s"))


---------------------------------------------------------------------------
NameError                                 Traceback (most recent call last)
<ipython-input-17-d5dcf59a747f> in <module>()
     22 power = 6
     23 attempts = 1
---> 24 str_1 = string.ascii_letters * int(10**power)
     25 str_2 = str_1[::-1]
     26 print("{:>10}{:>8}{:>18}{:>11}"      .format("Attempt", "Result", "Size", "Run Time"))

NameError: name 'string' is not defined

In [46]:
import multiprocessing as mp
import numpy as np
import random
import sys
import time

power = 8

def worker():
    return time.sleep(2)

start = time.time()
jobs = []
for i in range(30):
    p = mp.Process(target=worker)
    jobs.append(p)
    p.start()
print(time.time() - start)

start = time.time()
for i in range(30):
    worker()
print(time.time() - start)
    

# for p in range(3, power + 1):
#     start = time.time()

#     n = [random.randint(1, 26) for i in xrange(10**p)]
    
# #     n = []
# #     i = 0
# #     while i < 10**p:
# #         i += 1
# #         n.append(random.randint(1, 26))

#     print("{:,}     {:0.2f}".format(len(n), time.time() - start))
#     print(n[:10])
#     print(sys.getsizeof(n))
#     print("=" * 40)
    
#     del(n)


0.804926156998
60.0430719852