In [1]:
import pandas as pd
import numpy as np
import difflib
import sys
import os
from IPython.parallel import Client, require
from IPython.display import clear_output

In [2]:
@require('pandas', 'numpy', 'difflib')
def task(args):
    # unpack the arguments: the (half-open) range of rows to process,
    # the bucket's path inside the HDF5 file, and the path to the file
    min_row, max_row, bucket, dbname = args

    # the modules named in @require are made available on the engine
    # under their full names, so alias them to the usual abbreviations
    pd = pandas
    np = numpy

    # open the database and pull out the bucket data
    store = pd.HDFStore(dbname, mode='r')
    df = store[bucket]

    # allocate a new array for the similarities
    similarities = np.zeros((max_row - min_row, len(df)))

    # print out progress
    print("{} ({}-{}/{})".format(bucket, min_row, max_row, len(df)))

    for idx, i in enumerate(range(min_row, max_row)):
        row1 = df.iloc[i]
        name1 = "{}, {}".format(row1['family'], row1['given'])

        # go through each of the remaining names (j >= i, so we only
        # fill the upper triangle) and calculate similarity
        for j in range(i, len(df)):
            if i == j:
                # a name is always identical to itself
                similarities[idx, j] = 1.0
                continue

            row2 = df.iloc[j]
            name2 = "{}, {}".format(row2['family'], row2['given'])
            ratio = difflib.SequenceMatcher(None, name1, name2).ratio()
            similarities[idx, j] = ratio

    # close the database
    store.close()

    # we need to return the index and bucket label too so we
    # know how to aggregate the data when it's all done
    return min_row, max_row, bucket, similarities

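As a quick sanity check of the similarity metric (the names here are made up
for illustration), SequenceMatcher.ratio() returns a value between 0 and 1,
where 1 means the two strings are identical:

import difflib

# ratio() is 2*M/T, where M is the number of matching characters and
# T is the combined length of both strings
difflib.SequenceMatcher(None, "Smith, John", "Smith, Jon").ratio()   # ~0.95
difflib.SequenceMatcher(None, "Smith, John", "Jones, Mary").ratio()  # much lower
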
In [3]:
# fire up the IPython parallel client
rc = Client()
lview = rc.load_balanced_view()

In [4]:
# allocate a list for our task objects, and a dictionary to hold 
# the similarity matrix for each bucket
tasks = []
similarities = {}

# open the database and iterate through all the buckets
dbname = os.path.abspath('citations.h5')
store = pd.HDFStore(dbname, mode='r')
bucket_group = store.get_node('buckets')

# walk through all the nodes in the database
for group in bucket_group._f_walknodes('Group'):
    # skip groups that don't have a leaf (data) attached to them
    if len(group._v_leaves) == 0:
        continue

    # get the full path in the database to the data
    bucket = group._v_pathname

    # print progress
    clear_output()
    print("Starting tasks for '{}'".format(bucket))
    sys.stdout.flush()

    # get the number of author names in this bucket
    n = len(store[bucket])
    num_chunks = int(np.ceil(n / 10.0))  # roughly 10 names per task
    chunks = np.array_split(np.arange(n), num_chunks)

    # allocate an array for the similarities
    similarities[bucket] = np.empty((n, n))

    # start the parallel tasks -- one for each chunk of ~10 names
    for chunk in chunks:
        tasks.append(lview.apply(task, [chunk[0], chunk[-1] + 1, bucket, dbname]))

# close the database
store.close()


Starting tasks for '/buckets/_z'

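To see how the row indices get split into tasks, here is the same chunking
logic applied to a small, hypothetical bucket size:

n = 25  # hypothetical bucket size
num_chunks = int(np.ceil(n / 10.0))
chunks = np.array_split(np.arange(n), num_chunks)
# each task covers the half-open row range [chunk[0], chunk[-1] + 1)
print([(c[0], c[-1] + 1) for c in chunks])
# [(0, 9), (9, 17), (17, 25)]
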
In [ ]:
# loop through all the tasks and get their data
# when they finish
while len(tasks) > 0:
    # get the next task in the queue (use a different name than `task`
    # so we don't shadow the function defined above)
    result = tasks[0]
    # wait until it's done
    result.wait()

    # print progress
    clear_output()
    result.display_outputs()
    sys.stdout.flush()

    # get its data and save it into the similarity matrix
    min_row, max_row, bucket, sim = result.get()
    similarities[bucket][min_row:max_row] = sim

    # remove the task from the list
    tasks.pop(0)


/buckets/_c (70-80/4827)

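Note that each task only fills columns j >= i of its rows, so the assembled
matrices are upper triangular, with 1.0 on the diagonal and zeros below it.
If a fully symmetric matrix is needed later, one way to mirror it (a sketch,
not part of the original pipeline) is:

# mirror the upper triangle into the lower triangle; subtracting the
# diagonal once keeps the self-similarities from being doubled
sim = similarities['/buckets/_c']
full = sim + sim.T - np.diag(np.diag(sim))
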
In [ ]:
# save all the similarity matrices to disk
if not os.path.exists("similarities"):
    os.makedirs("similarities")
for key in similarities:
    letter = key.split("/")[-1]
    np.save("similarities/{}.npy".format(letter), similarities[key])
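
The saved matrices can then be reloaded by bucket letter, for example:

# load the similarity matrix for the '_c' bucket back from disk
sim_c = np.load("similarities/_c.npy")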