In [1]:
import pandas as pd
import numpy as np
import difflib
import sys
import os
from IPython.parallel import Client, require
from IPython.display import clear_output
In [2]:
@require('pandas', 'numpy', 'difflib')
def task(args):
    min_row, max_row, bucket, dbname = args
    # the @require decorator imports these modules on the engine, so
    # bind the usual short aliases locally
    pd = pandas
    np = numpy
    # open the database and pull out the bucket data
    store = pd.HDFStore(dbname, mode='r')
    df = store[bucket]
    # allocate a new array for the similarities
    similarities = np.zeros((max_row - min_row, len(df)))
    # print out progress
    print("{} ({}-{}/{})".format(bucket, min_row, max_row, len(df)))
    for idx, i in enumerate(range(min_row, max_row)):
        row1 = df.ix[i]
        name1 = "{}, {}".format(row1['family'], row1['given'])
        # go through each of the other names and calculate similarity
        for j in range(i, len(df)):
            if i == j:
                similarities[idx, j] = 1.0
                continue
            row2 = df.ix[j]
            name2 = "{}, {}".format(row2['family'], row2['given'])
            ratio = difflib.SequenceMatcher(None, name1, name2).ratio()
            similarities[idx, j] = ratio
    # close the database
    store.close()
    # we need to return the index and bucket label too so we
    # know how to aggregate the data when it's all done
    return min_row, max_row, bucket, similarities
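As a quick sanity check of the metric used above, difflib.SequenceMatcher(...).ratio() returns a score between 0 (completely different) and 1 (identical). The names below are made-up examples, not rows from the database.
In [ ]:
# illustrate the similarity metric on a close match and a poor match
print(difflib.SequenceMatcher(None, "Smith, John", "Smith, Jon").ratio())
print(difflib.SequenceMatcher(None, "Smith, John", "Doe, Jane").ratio())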
In [3]:
# fire up the IPython parallel client
rc = Client()
lview = rc.load_balanced_view()
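Before submitting work it can help to confirm that some engines are actually registered (this assumes a cluster was already started, e.g. with ipcluster start).
In [ ]:
# sanity check: how many parallel engines are available to do work?
print("{} engines registered".format(len(rc.ids)))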
In [4]:
# allocate a list for our task objects, and a dictionary to hold
# the similarity matrix for each bucket
tasks = []
similarities = {}

# open the database and iterate through all the buckets
dbname = os.path.abspath('citations.h5')
store = pd.HDFStore(dbname, mode='r')
bucket_group = store.get_node('buckets')

# walk through all the nodes in the database
for group in bucket_group._f_walknodes('Group'):
    # skip groups that don't have a leaf (data) attached to them
    if len(group._v_leaves) == 0:
        continue
    # get the full path in the database to the data
    bucket = group._v_pathname
    # print progress
    clear_output()
    print("Starting tasks for '{}'".format(bucket))
    sys.stdout.flush()
    # get the number of author names in this bucket
    n = len(store[bucket])
    # split the rows into chunks of roughly ten names each
    num_chunks = int(np.ceil(n / 10.0))
    chunks = np.array_split(np.arange(n), num_chunks)
    # allocate an array for the similarities
    similarities[bucket] = np.empty((n, n))
    # start the parallel tasks -- one for each chunk of rows
    for chunk in chunks:
        tasks.append(lview.apply(task, [chunk[0], chunk[-1] + 1, bucket, dbname]))

# close the database
store.close()
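The apply calls above return immediately, so all of the chunks are now queued. As an optional progress check, AsyncResult.ready() reports whether an individual task has finished without blocking.
In [ ]:
# optional: count how many of the submitted tasks have finished so far
num_done = sum(1 for t in tasks if t.ready())
print("{} / {} tasks complete".format(num_done, len(tasks)))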
In [ ]:
# loop through all the tasks and get their data
# when they finish
while len(tasks) > 0:
    # get the next task (async result) in the queue -- use a name
    # other than `task` so we don't shadow the task function
    ar = tasks[0]
    # wait until it's done
    ar.wait()
    # print progress
    clear_output()
    ar.display_outputs()
    sys.stdout.flush()
    # get its data and save it into the similarity matrix
    min_row, max_row, bucket, sim = ar.get()
    similarities[bucket][min_row:max_row] = sim
    # remove the task from the list
    tasks.pop(0)
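Note that task() only fills in columns j >= i of each row, so every matrix in similarities is upper triangular (zeros below the diagonal). If a fully symmetric matrix is wanted later, one possible way to mirror it is sketched below; this is an optional addition, not part of the workflow above.
In [ ]:
# optional: mirror the upper triangle into the lower triangle so each
# similarity matrix is symmetric (the diagonal is already 1.0)
for key in similarities:
    sim = similarities[key]
    similarities[key] = np.triu(sim) + np.triu(sim, k=1).T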
In [ ]:
# make sure the output directory exists
if not os.path.exists("similarities"):
    os.makedirs("similarities")

# save all the similarity matrices to disk
for key in similarities:
    letter = key.split("/")[-1]
    np.save("similarities/{}.npy".format(letter), similarities[key])