In [38]:
from __future__ import print_function

from pyspark.sql import SQLContext
import pyspark.sql.functions as sql
import pyspark.sql.types as types

In [39]:
sqlContext = SQLContext(sc)

Loading Data

The datasets below are on local disk in some scratch directories; the final datasets will be in HDFS.
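Once the datasets are in HDFS, only the paths should need to change. A minimal sketch of what that load might look like (the HDFS paths below are placeholders, not the real locations):

idigbio_full = sqlContext.read.parquet("hdfs:///placeholder/idigbio/occurrence.parquet")  # placeholder path
bhl_full = sqlContext.read.parquet("hdfs:///placeholder/bhl/ocrtext.parquet")  # placeholder path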


In [40]:
idigbio_full = sqlContext.read.parquet("../2016spr/data/idigbio/occurrence.txt.parquet")
bhl_full = sqlContext.read.parquet("../guoda-datasets/BHL/data/bhl-20160516.parquet")

In [41]:
# Replace the full datasets with small subsets for faster development.
# Sampling is an alternative way to build the subsets:
#bhl = bhl_full.sample(fraction=0.01, withReplacement=False)
#idigbio = idigbio_full.sample(fraction=0.001, withReplacement=False)

bhl = sqlContext.createDataFrame(bhl_full.head(100))
idigbio = sqlContext.createDataFrame(idigbio_full.head(10000))
bhl.cache()
idigbio.cache()
print("defined")


defined

Building a list of IDs

From the iDigBio data, make a list of unique identifiers to try to find in BHL. While there are lots of identifier fields, we'll start with a single one, the Darwin Core occurrenceID used in the query below, on the guess that it is the most likely to appear in text. Later, other fields such as catalogNumber could be examined.


In [42]:
idigbio.registerTempTable("idbtable")
# Demonstrate using SQL syntax for working with Spark dataframes
ids = sqlContext.sql("""
                       SELECT
                        `http://rs.tdwg.org/dwc/terms/occurrenceID` as id
                       FROM idbtable WHERE 
                        `http://rs.tdwg.org/dwc/terms/occurrenceID` != ''
                       GROUP BY
                        `http://rs.tdwg.org/dwc/terms/occurrenceID`
                    """)

In [43]:
# This triggers running the group by and takes a few seconds
print(ids.count())


9872

Search for those IDs

With the list of IDs in hand, go find them in the BHL OCR text. The final data structure should be a list of IDs with an added column containing the list of item IDs where each iDigBio ID is found, and perhaps a column with the count of item IDs in that list, which is probably what we'll end up sorting on; a sketch of that aggregation appears after the join below.

The naive approach would be to take each document and check it for each id in turn. That would be about:


In [44]:
# (number of documents) x (number of identifiers) comparisons,
# divided by 1M comparisons per second and 3600 seconds per hour
print(8000000*180000/1000000/60/60)


400

hours of work, assuming we could perform one million searches per second. Instead, let's make a list of the unique words in BHL along with the items those words come from. Then we can join that list to the list of identifiers and use aggregate functions to get our answer.

Tokenizing BHL into words

To do that, first we need to tokenize each document, and for that we need a tokenizer. We'll just split on whitespace to keep things simple.

This builds a list of word and item ID combinations.


In [45]:
# Simple whitespace tokenizer; return an empty list for missing OCR text
def unicode_split(s):
    if s is not None:
        return s.split()
    else:
        return []
unicode_split_udf = sql.udf(unicode_split, types.ArrayType(types.StringType()))

# One row per (word, itemid) combination across all OCR text
words = bhl.select(sql.explode(unicode_split_udf(bhl.ocrtext)).alias('word'), bhl.itemid)
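An alternative to the hand-rolled UDF would be Spark ML's Tokenizer. A sketch follows; note that Tokenizer lowercases its input (which would change exact-match behavior) and needs nulls filled first, so it is not a drop-in replacement:

from pyspark.ml.feature import Tokenizer

tokenizer = Tokenizer(inputCol="ocrtext", outputCol="tokens")
tokenized = tokenizer.transform(bhl.na.fill({"ocrtext": ""}))  # fill nulls before tokenizing
words_alt = tokenized.select(sql.explode(tokenized.tokens).alias("word"), tokenized.itemid)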

In [46]:
print(words.head(20))


[Row(word=u'Historic,', itemid=152051), Row(word=u'archived', itemid=152051), Row(word=u'document', itemid=152051), Row(word=u'Do', itemid=152051), Row(word=u'not', itemid=152051), Row(word=u'assume', itemid=152051), Row(word=u'content', itemid=152051), Row(word=u'reflects', itemid=152051), Row(word=u'current', itemid=152051), Row(word=u'scientific', itemid=152051), Row(word=u'knowledge,', itemid=152051), Row(word=u'policies,', itemid=152051), Row(word=u'or', itemid=152051), Row(word=u'practices.', itemid=152051), Row(word=u'THE', itemid=152051), Row(word=u'BEST', itemid=152051), Row(word=u'OTTHE', itemid=152051), Row(word=u'OLD,', itemid=152051), Row(word=u'NEW', itemid=152051), Row(word=u'AND', itemid=152051)]

In [47]:
print(words.count())


2196538

Summarize words

words right now contains every word occurrence in every item, but we really only need the unique words per item since we're looking for specific terms. We can run a distinct-style aggregation on this list.


In [48]:
# could also use df.distinct() if we didn't want the counts
words_summary = words.select(words.word, words.itemid).groupBy(words.word, words.itemid).count()
print(words_summary.head())


Row(word=u'Mail', itemid=152051, count=1)
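As the comment in the cell above notes, if the per-item counts were not needed, the same deduplication could be done in one line (a sketch, not run here):

words_distinct = words.distinct()  # unique (word, itemid) pairs, no counts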

In [49]:
print(words_summary.count())


521905

Now we have a list of unique words in each item and a list of ids we're looking for. Let's join them and see if we have any exact matches.


In [50]:
joined = ids.join(words_summary, ids.id == words_summary.word, 'inner')
print(joined.head(20))


[Row(id=u'339', word=u'339', itemid=152065, count=1), Row(id=u'52390', word=u'52390', itemid=152112, count=1), Row(id=u'52390', word=u'52390', itemid=152131, count=1), Row(id=u'52390', word=u'52390', itemid=152135, count=1), Row(id=u'10363', word=u'10363', itemid=152112, count=1), Row(id=u'11920', word=u'11920', itemid=152135, count=1)]

In [51]:
print(joined.count())


6
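To turn this join into the structure described earlier, one row per identifier with the list of BHL items it appears in and a count of those items to sort on, something like the following sketch should work (assuming collect_set is available in pyspark.sql.functions, i.e. Spark 1.6+):

matches = (joined
           .groupBy(joined.id)
           .agg(sql.collect_set(joined.itemid).alias("itemids"),      # items containing this id
                sql.countDistinct(joined.itemid).alias("n_items"))    # how many items, for ranking
           .orderBy(sql.desc("n_items")))
print(matches.head(10))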

Conclusions

This works, but the output is suspicious. The few occurrenceIDs matched in this subset of items all appear to be simple numbers, which we can't trust to be actual references to occurrence IDs.

But the method works. Let's repackage this as an efficient job and see if we get better results at scale. Remember we only looked at 100 BHL texts and about 10,000 occurrenceIDs.


In [ ]: