Tuning Spark to make sure multiple cores are used.
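There is no single switch for this; in a notebook the context already exists as sc, so core settings usually go in spark-defaults.conf or on the pyspark / spark-submit command line. A minimal sketch of the equivalent programmatic configuration (the values here are illustrative, not the settings used for the runs below):

from pyspark import SparkConf, SparkContext

conf = (SparkConf()
        .setMaster("local[*]")                    # use every available local core
        .set("spark.executor.cores", "4")         # cores per executor on a cluster
        .set("spark.default.parallelism", "16"))  # default partition count for RDD operations
sc = SparkContext(conf=conf)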


In [1]:
from __future__ import print_function
import os
from pyspark.sql import SQLContext
from pyspark.sql.functions import length
#from pyspark.sql.GroupedData import sum

In [2]:
# Load iDigBio Parquet
sqlContext = SQLContext(sc)
idbdf = sqlContext.read.parquet("../data/idigbio/occurrence.txt.parquet")
#idbdf = sqlContext.read.parquet("../data/idigbio-100k/occurrence.txt.parquet")

In [3]:
print(idbdf.count())


15457224
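A quick way to confirm how much parallelism is actually available (a hypothetical check, not part of the original run; RDD.getNumPartitions() needs PySpark 1.6+):

print(sc.defaultParallelism)         # slots Spark will schedule by default
print(idbdf.rdd.getNumPartitions())  # partitions backing the DataFrame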

In [27]:
# Filter to records with a non-empty fieldNotes value and sum the note lengths by country.
# The Darwin Core column names are URIs, so they are backtick-quoted to keep the dots
# from being read as nested-field access. This takes < 10 seconds.
notes = idbdf.filter(idbdf['`http://rs.tdwg.org/dwc/terms/fieldNotes`'] != "")
print(notes.count())
lengths = notes.select(notes['`http://rs.tdwg.org/dwc/terms/country`'].alias('country'), 
                       length(notes['`http://rs.tdwg.org/dwc/terms/fieldNotes`']).alias("len"))
lengths.groupby('country').sum('len').collect()[0]


593399
Out[27]:
Row(country=u'Cyprus', sum(len)=1304)
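collect()[0] only grabs whichever grouped row happens to come back first. To see which countries have the most field-note text, the same aggregate can be ordered explicitly; a small variation, not output from the original run:

totals = lengths.groupby('country').sum('len').withColumnRenamed('sum(len)', 'total_len')
totals.orderBy(totals['total_len'].desc()).show(5)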

In [3]:
# This takes longer than the fieldNotes query above.

# Goal: JSON-decode http://rs.tdwg.org/dwc/terms/dynamicProperties.
# First, keep only records where that column is non-empty.
dynamics = idbdf.filter(idbdf['`http://rs.tdwg.org/dwc/terms/dynamicProperties`'] != "")
print(dynamics.count())


1430710
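Since dynamics is reused by the UDF cell below, persisting it is one way to avoid re-reading and re-filtering the Parquet source for every action (an optional step, not run here):

dynamics.cache()
dynamics.count()   # the first action materializes the cache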

In [4]:
import json
def decode_json(s):
    """Return the parsed JSON value, or "" if the string does not parse."""
    try:
        return json.loads(s)
    except (ValueError, TypeError):
        return ""

#print(decode_json('{"name":"asdf","top":true}'))

from pyspark.sql.functions import udf
# With no returnType given, udf() defaults to StringType.
udf_decode_json = udf(decode_json)
decoded = dynamics.withColumn('decoded_properties', 
                    udf_decode_json(dynamics['`http://rs.tdwg.org/dwc/terms/dynamicProperties`']))
#decoded.cache()

good_decodes = decoded.filter(decoded['decoded_properties'] != "")
good_decodes.count()


Out[4]:
8
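Only 8 of the ~1.4 million non-empty dynamicProperties rows survive the filter. One possible factor is that udf() defaults to a StringType result while decode_json returns a dict, so most decoded values may not survive the conversion back to a string column. A hedged alternative sketch that only flags whether the value parses, instead of returning the parsed object (the names here are illustrative):

from pyspark.sql.functions import udf
from pyspark.sql.types import BooleanType

def is_valid_json(s):
    try:
        json.loads(s)
        return True
    except (ValueError, TypeError):
        return False

udf_is_valid_json = udf(is_valid_json, BooleanType())
valid = dynamics.filter(
    udf_is_valid_json(dynamics['`http://rs.tdwg.org/dwc/terms/dynamicProperties`']))
# valid.count() reports how many rows hold parseable JSON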