MapCombine expects three functions:
- Initialization
- Map
- Reduce
The reduce is also used as the combine.
At init, set up jobs for Charles Dickens' books. Return the jobs list and an empty base case.
For production jobs, this list would be read from metadata or generated in a less verbose way (sketched after the cell).
In [1]:
def my_init(args, params, frame):
    from copy import deepcopy
    ans = {"words": {}}
    base = deepcopy(ans)
    jobs = []
    # Each job is ((position, lines to read), params, args, a copy of the per-book answer).
    ans["fname"] = "/tmp/Dickens/TaleOfTwoCities.txt"
    jobs.append(((0, 16271), params, args, deepcopy(ans)))
    ans["fname"] = "/tmp/Dickens/ChristmasCarol.txt"
    jobs.append(((0, 4236), params, args, deepcopy(ans)))
    ans["fname"] = "/tmp/Dickens/HardTimes.txt"
    jobs.append(((0, 12036), params, args, deepcopy(ans)))
    ans["fname"] = "/tmp/Dickens/GreatExpectations.txt"
    jobs.append(((0, 20415), params, args, deepcopy(ans)))
    ans["fname"] = "/tmp/Dickens/DavidCopperfield.txt"
    jobs.append(((0, 38588), params, args, deepcopy(ans)))
    ans["fname"] = "/tmp/Dickens/BleakHouse.txt"
    jobs.append(((0, 40234), params, args, deepcopy(ans)))
    ans["fname"] = "/tmp/Dickens/PickwickPapers.txt"
    jobs.append(((0, 36613), params, args, deepcopy(ans)))
    ans["fname"] = "/tmp/Dickens/OliverTwist.txt"
    jobs.append(((0, 19202), params, args, deepcopy(ans)))
    return jobs, base
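For reference, here is a minimal sketch of that less verbose version. The books and line counts are taken from the cell above; my_init_compact is just an illustrative name with the same (jobs, base) contract.
def my_init_compact(args, params, frame):
    # Same books and line counts as above, kept in one table.
    books = [
        ("/tmp/Dickens/TaleOfTwoCities.txt", 16271),
        ("/tmp/Dickens/ChristmasCarol.txt", 4236),
        ("/tmp/Dickens/HardTimes.txt", 12036),
        ("/tmp/Dickens/GreatExpectations.txt", 20415),
        ("/tmp/Dickens/DavidCopperfield.txt", 38588),
        ("/tmp/Dickens/BleakHouse.txt", 40234),
        ("/tmp/Dickens/PickwickPapers.txt", 36613),
        ("/tmp/Dickens/OliverTwist.txt", 19202),
    ]
    base = {"words": {}}
    jobs = [((0, nlines), params, args, {"words": {}, "fname": fname})
            for fname, nlines in books]
    return jobs, base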
Map increments a counter for each word. The first time it is called, it opens a remote file with the glopen context manager. The last time it is called, it closes the context, which deletes the local copy of the file.
The funny __enter__() business is because glopen is a context manager.
In [2]:
import glopen

def my_map(pos, nelm_to_read, params, ans, last):
    # First call for this job: open the remote file via the glopen context manager.
    if "input_file" not in ans:
        ans["glopen"] = glopen.glopen(ans["fname"], "r", endpoint="maxhutch#alpha-admin")
        ans["input_file"] = ans["glopen"].__enter__()
    # Count every word in the next nelm_to_read lines.
    for i in range(nelm_to_read):
        line = ans["input_file"].readline()
        for tok in line.split():
            word = tok.strip('.,;:?!_/\\--"`')
            if word in ans["words"]:
                ans["words"][word] += 1
            else:
                ans["words"][word] = 1
    # Last call for this job: exit the context, which deletes the local copy.
    if last:
        ans["glopen"].__exit__(None, None, None)
        del ans["glopen"]
        del ans["input_file"]
    return ans
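The same open/close bookkeeping can also be written with contextlib.ExitStack, which drives the context manager without spelling out __enter__()/__exit__(). This is only a sketch: open_remote and close_remote are illustrative helpers, and the glopen call is copied from the cell above.
import contextlib
import glopen

def open_remote(ans):
    # Enter the glopen context through an ExitStack and remember the stack.
    if "input_file" not in ans:
        stack = contextlib.ExitStack()
        ans["input_file"] = stack.enter_context(
            glopen.glopen(ans["fname"], "r", endpoint="maxhutch#alpha-admin"))
        ans["stack"] = stack

def close_remote(ans):
    # Closing the stack exits the glopen context and deletes the local copy.
    ans.pop("stack").close()
    del ans["input_file"]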
Add the counts across the map outputs.
In [6]:
def my_reduce(whole, part):
    # Merge the per-book counts into the running total (in place).
    for word in part["words"]:
        if word in whole["words"]:
            whole["words"][word] += part["words"][word]
        else:
            whole["words"][word] = part["words"][word]
    return
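The same merge can be expressed with collections.Counter, whose update() adds counts key by key. This my_reduce_counter is only a sketch for comparison; the notebook passes my_reduce above to the framework.
from collections import Counter

def my_reduce_counter(whole, part):
    counts = Counter(whole["words"])
    counts.update(part["words"])  # adds the per-book counts, key by key
    whole["words"] = dict(counts)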
Outside of an IPython Notebook, these would be set on the command line with argparse.
In [4]:
class Foo: pass  # stand-in for an argparse namespace
args = Foo()
args.MR_init = my_init
args.map = my_map
args.reduce = my_reduce
args.thread = 2
args.verbose = False
args.block = 1024
args.post = None
params = {}
jobs = [(args, params, 0),]
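A rough sketch of that argparse version: the flag names below are illustrative, not defined by mapcombine, and parse_args([]) stands in for reading a real command line.
import argparse

parser = argparse.ArgumentParser(description="Count words in Dickens with MapCombine")
parser.add_argument("--thread", type=int, default=2)
parser.add_argument("--block", type=int, default=1024)
parser.add_argument("--verbose", action="store_true")
cli_args = parser.parse_args([])  # in a script: parser.parse_args()
cli_args.MR_init = my_init
cli_args.map = my_map
cli_args.reduce = my_reduce
cli_args.post = None
cli_jobs = [(cli_args, {}, 0)]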
Here's the actual work. We can use any Python map (a thread-pool variant is sketched after the cell):
- map builtin
- IPython.Parallel.map
- multiprocessing.Pool.map
- multiprocessing.dummy.Pool.map
- dask.bag.map
In [5]:
from mapcombine import outer_process

stuff = map(outer_process, jobs)
for i, res in enumerate(stuff):
    print("Charles Dickens wrote 'linen' {:d} times, but 'Rotherhithe' only {:d} times.".format(
        res["words"]['linen'], res["words"]['Rotherhithe']))