In [38]:
from __future__ import print_function
from pyspark import SQLContext
import pyspark.sql.functions as sql
import pyspark.sql.types as types
In [39]:
sqlContext = SQLContext(sc)
In [40]:
idigbio_full = sqlContext.read.parquet("../2016spr/data/idigbio/occurrence.txt.parquet")
bhl_full = sqlContext.read.parquet("../guoda-datasets/BHL/data/bhl-20160516.parquet")
In [41]:
# Replace the full datasets with small subsets so later steps run quickly
#bhl = bhl_full.sample(fraction=0.01, withReplacement=False)
#idigbio = idigbio_full.sample(fraction=0.001, withReplacement=False)
bhl = sqlContext.createDataFrame(bhl_full.head(100))
idigbio = sqlContext.createDataFrame(idigbio_full.head(10000))
bhl.cache()
idigbio.cache()
print("defined")
From the iDigBio data, make a list of unique identifiers to try to find in BHL. There are many identifier fields; we'll guess one and start with just the Darwin Core occurrenceID field. Later, other fields such as catalogNumber could be examined.
In [42]:
idigbio.registerTempTable("idbtable")
# Demonstrate using SQL syntax for working with Spark dataframes
ids = sqlContext.sql("""
    SELECT
        `http://rs.tdwg.org/dwc/terms/occurrenceID` AS id
    FROM idbtable
    WHERE
        `http://rs.tdwg.org/dwc/terms/occurrenceID` != ''
    GROUP BY
        `http://rs.tdwg.org/dwc/terms/occurrenceID`
""")
In [43]:
# This triggers running the group by and takes a few seconds
print(ids.count())
With the list of ids, go find them in the BHL OCR text. The final data structure should be the list of IDs with an added column containing the list of item IDs where each iDigBio id is found, and perhaps a column with the count of item IDs in that list, which is probably what we'll end up sorting on.
The naive approach would be to take each document and check it for each id in turn. That would be about:
In [44]:
print(8000000*180000/1000000/60/60)
hours of work, assuming we could perform one million searches per second: 8,000,000 × 180,000 is about 1.44 trillion checks, or 1.44 million seconds (400 hours) at that rate. Instead, let's make a list of the unique words in BHL and which items those words come from. Then we can join that list to the list of ids and use aggregate functions to get our answer.
In [45]:
def unicode_split(s):
    # Split OCR text on any whitespace; treat null text as an empty word list.
    if s is not None:
        return s.split()
    else:
        return []

unicode_split_udf = sql.udf(unicode_split, types.ArrayType(types.StringType()))
words = bhl.select(sql.explode(unicode_split_udf(bhl.ocrtext)).alias('word'), bhl.itemid)
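If this Spark version has the built-in split function (pyspark.sql.functions.split, available since Spark 1.5), the tokenization could also be done without a Python UDF, which avoids a lot of serialization overhead. A sketch, not what was run above; token boundaries may differ slightly from Python's str.split():

# Alternative sketch: whitespace tokenization with the built-in split function.
# Rows with null ocrtext yield no words after the explode.
words_builtin = bhl.select(
    sql.explode(sql.split(bhl.ocrtext, r'\s+')).alias('word'),
    bhl.itemid)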
In [46]:
print(words.head(20))
In [47]:
print(words.count())
In [48]:
# could also use df.distinct() if we didn't want the counts
words_summary = words.select(words.word, words.itemid).groupBy(words.word, words.itemid).count()
print(words_summary.head())
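For comparison, the distinct-only version mentioned in the comment above would drop the per-pair counts:

# Distinct (word, itemid) pairs without counting repeats within an item.
word_item_pairs = words.select(words.word, words.itemid).distinct()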
In [49]:
print(words_summary.count())
In [50]:
joined = ids.join(words_summary, ids.id == words_summary.word, 'inner')
print(joined.head(20))
In [51]:
print(joined.count())
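To finish the structure described earlier (one row per id, the list of matching BHL item ids, and a count to sort on), the join could be aggregated as below. A sketch assuming this Spark version has collect_list (pyspark.sql.functions, Spark 1.6+):

# Sketch: collapse the joined rows into one row per iDigBio id.
matches = (joined
           .groupBy(joined.id)
           .agg(sql.collect_list(joined.itemid).alias('itemids'),
                sql.count(joined.itemid).alias('n_items'))
           .orderBy(sql.col('n_items').desc()))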
This works, but the output is suspicious. The few occurrenceIDs in this subset of items all appear to be simple numbers, which we can't trust to be actual references to occurrence IDs.
But the method works. Let's repackage this as an efficient job and see if we get better results at scale. Remember, we only looked at 100 BHL texts and 10,000 iDigBio records.
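One way the full-scale job might look, sketched here rather than run: the same steps applied to the full DataFrames, with the result written to parquet instead of collected. The output path is illustrative only, and collect_list again assumes Spark 1.6+.

# Sketch of a full-scale run; nothing below has been executed here.
full_ids = (idigbio_full
            .select(idigbio_full['http://rs.tdwg.org/dwc/terms/occurrenceID'].alias('id'))
            .where(sql.col('id') != '')
            .distinct())
full_words = bhl_full.select(
    sql.explode(unicode_split_udf(bhl_full.ocrtext)).alias('word'),
    bhl_full.itemid).distinct()
full_matches = (full_ids
                .join(full_words, full_ids.id == full_words.word, 'inner')
                .groupBy('id')
                .agg(sql.collect_list('itemid').alias('itemids'),
                     sql.count('itemid').alias('n_items')))
full_matches.write.parquet("occurrence_id_matches.parquet")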
In [ ]: