In [1]:
%pylab inline
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
try:
from future.builtins import (bytes, str, open, super, range,
zip, round, input, int, pow, object)
except:
pass
# ---- Standard Libraries not included in pylab
import collections
import glob
import json
import random
import string
import time
from StringIO import StringIO
# ---- Scientific Libraries, the standard Python science stack
import numpy as np
import pandas as pd
import scipy as sp
import matplotlib as mpl
import matplotlib.pyplot as plt
# ---- PySpark Libraries
sc
# ---- Extra Libraries for additional functionality
Out[1]:
In [2]:
def merge_dicts(*dict_args):
    """ Merge any number of dictionaries, summing the values of shared keys.

    A key that appears in only one input keeps its value; a key that appears
    in several inputs maps to the sum of its values (each value is added to a
    running total that starts at 0, so values are expected to be numeric).
    The input dictionaries are not modified. For example:
    >>> x = {'a': 1, 'b': 2}
    >>> y = {'b': 3, 'c': 4}
    >>> sorted(merge_dicts(x, y).items())
    [('a', 1), ('b', 5), ('c', 4)]
    >>> merge_dicts()
    {}
    """
    result = {}
    for dictionary in dict_args:
        for key, value in dictionary.items():
            # Accumulate instead of overwrite so duplicate keys are summed.
            result[key] = result.get(key, 0) + value
    return result
In [1]:
# How to profile a function in IPython
# import cProfile
# cProfile.runctx('my_function()', globals(), locals())
In [2]:
# -------1---------2---------3---------4---------5---------6---------7----
In [2]:
def get_strdict(the_str):
    """ Given a string, returns a dictionary of the count of each character
    in said string. For example: "cat" would return {'c': 1, 'a': 1, 't': 1}

    Strings shorter than 100 characters are counted with a plain Python loop.
    Longer strings are counted with Spark: the string is cut into
    len(the_str) // 10**5 slices (at least one), each slice is turned into an
    RDD through the global SparkContext ``sc``, and the per-slice counts are
    combined with merge_dicts.
    >>> sorted(get_strdict("cat").items())
    [('a', 1), ('c', 1), ('t', 1)]
    >>> get_strdict("")
    {}
    """
    result = {}
    length = len(the_str)
    if length < 10**2:  # too small for an rdd
        for char in the_str:
            result[char] = result.get(char, 0) + 1
        return result

    # One slice per 10**5 characters, and never fewer than one slice.
    divisor = max(1, length // 10**5)
    chunk = length // divisor
    for i in range(divisor):
        start = chunk * i
        # The last slice runs to the end of the string so the remainder of
        # length / divisor is counted instead of being silently dropped.
        end = length if i == divisor - 1 else start + chunk
        cnt_str = sc.parallelize(the_str[start:end])\
                    .map(lambda w: (w, 1))\
                    .reduceByKey(lambda a, b: a + b)
        result = merge_dicts(result, dict(cnt_str.collect()))
    return result
In [4]:
# Run the doctests found in the docstrings of the current module.
import doctest
doctest.testmod()
Out[4]:
In [3]:
# Benchmark get_strdict: count the characters of a long digit string and of
# its reverse, check that both counts agree, and print one row per attempt.
power = 6
attempts = 2
str_1 = string.digits * (10**power)
str_2 = str_1[::-1]
header_fmt = "{:>10}{:>8}{:>13}{:>11}"
row_fmt = "{:>9}{:>9}{:>13}{:>11}"
print(header_fmt.format("Attempt", "Result", "Size", "Run Time"))
print(" " + "=" * 39)
size = len(str_1)
for i in range(attempts):
    start = time.time()
    char_cnt1 = get_strdict(str_1)
    char_cnt2 = get_strdict(str_2)
    result = (char_cnt1 == char_cnt2)
    attempt_label = "{:02}".format(i + 1)
    result_label = "True" if result else "False"
    size_label = "{:,}".format(size)
    print(row_fmt.format(attempt_label,
                         result_label,
                         size_label,
                         (str(round(time.time() - start, 2))) + "s"))
In [4]:
def is_anagram(str_1, str_2):
    """ A Spark implementation of is_anagram.

    Spaces are removed from both strings before they are compared. Returns a
    tuple (result, size): result is True when both strings contain the same
    characters with the same counts, and size is the length of the strings
    once spaces are removed. Strings of different lengths return (False, -1)
    before any Spark work is done.

    Uses the global SparkContext ``sc``.
    NOTE: a later cell defines a pure-Python function with this same name,
    which replaces this one once that cell has been run.
    """
    str_1 = "".join(str_1.split(" "))
    str_2 = "".join(str_2.split(" "))
    size_1 = len(str_1)
    size_2 = len(str_2)
    if size_1 != size_2:
        return False, -1
    # These two parallelize calls were commented out, which left str1 and
    # str2 undefined and made every equal-length call raise NameError.
    str1 = sc.parallelize(str_1)
    cnt_str1 = str1.map(lambda w: (w, 1))\
                   .reduceByKey(lambda a, b: a + b)
    str2 = sc.parallelize(str_2)
    cnt_str2 = str2.map(lambda w: (w, 1))\
                   .reduceByKey(lambda a, b: a + b)
    # Compare as dictionaries so the answer does not depend on the order in
    # which collect() happens to return the (character, count) pairs.
    return dict(cnt_str1.collect()) == dict(cnt_str2.collect()), size_1
In [5]:
# Length of the digit string repeated int(10**power) times.
power = 5.2
len(string.digits * int(10**power))
Out[5]:
In [5]:
import pyspark
from pyspark import SparkConf
from pyspark import SparkContext
# methods = list(dir(sc))
# for method in methods:
# if not method.startswith("_") :
# print(method),
sc.appName = "PySpark from IPython"
sc
# FIXME: the statement below was left unfinished -- it has no argument and no
# closing parenthesis, which makes the whole cell a SyntaxError. It is disabled
# until the intended input for the RDD is supplied.
# rdd = sc.parallelize(
# words = []
# with open("../full-text", "r") as f:
# for line in f:
# words.append(line)
# from bs4 import BeautifulSoup as bs
# words = bs("".join(words))
# words = " ".join((words.get_text()).split("\n"))
# words = words * 120
# words = words.split()
# print(len(words))
# words = sc.parallelize(words)
# print(words.count())
# word_map = words.map(lambda w: (w, 1))
# word_count = word_map.reduceByKey(lambda a, b: a + b)
# counts_collect = word_count.collect()
# print(len(counts_collect))
# counts = sorted(counts_collect, key=lambda tup: tup[1], reverse=True)
# print(counts[:10])
Out[5]:
In [17]:
def is_anagram(str_1, str_2):
    """ A pure python implementation of is_anagram.

    Space characters are ignored. Returns a tuple (result, size): result is
    True when both strings hold the same characters with the same counts and
    size is the number of non-space characters. When the two strings differ
    in length after the spaces are removed, returns (False, -1).
    """
    def tally(text):
        # Map each character of text to the number of times it occurs.
        counts = {}
        for symbol in text:
            counts[symbol] = counts.get(symbol, 0) + 1
        return counts

    squeezed_1 = str_1.replace(" ", "")
    squeezed_2 = str_2.replace(" ", "")
    if len(squeezed_1) != len(squeezed_2):
        return False, -1
    return tally(squeezed_1) == tally(squeezed_2), len(squeezed_1)
# Benchmark is_anagram on a long run of ASCII letters and on its reverse,
# printing one row per attempt.
power = 6
attempts = 1
str_1 = string.ascii_letters * int(10**power)
str_2 = str_1[::-1]
header_fmt = "{:>10}{:>8}{:>18}{:>11}"
row_fmt = "{:>9}{:>9}{:>18}{:>11}"
print(header_fmt.format("Attempt", "Result", "Size", "Run Time"))
print(" " + "=" * 44)
for i in range(attempts):
    start = time.time()
    result, size = is_anagram(str_1, str_2)
    attempt_label = "{:02}".format(i + 1)
    result_label = "True" if result else "False"
    size_label = "{:,}".format(size)
    print(row_fmt.format(attempt_label,
                         result_label,
                         size_label,
                         (str(round(time.time() - start, 2))) + "s"))
In [46]:
import multiprocessing as mp
import numpy as np
import random
import sys
import time
power = 8
def worker():
    """ Sleep for two seconds and return None; a stand-in for real work. """
    outcome = time.sleep(2)
    return outcome
# Time 30 two-second workers run as separate processes, then the same 30
# workers run one after another in this process.
start = time.time()
jobs = []
for i in range(30):
    p = mp.Process(target=worker)
    jobs.append(p)
    p.start()
# Wait for every child before reading the clock. Without the joins the
# elapsed time only covers starting the processes, and the children are
# never reaped.
for p in jobs:
    p.join()
print(time.time() - start)
start = time.time()
for i in range(30):
    worker()
print(time.time() - start)
# for p in range(3, power + 1):
# start = time.time()
# n = [random.randint(1, 26) for i in xrange(10**p)]
# # n = []
# # i = 0
# # while i < 10**p:
# # i += 1
# # n.append(random.randint(1, 26))
# print("{:,} {:0.2f}".format(len(n), time.time() - start))
# print(n[:10])
# print(sys.getsizeof(n))
# print("=" * 40)
# del(n)