Tuning Spark to make sure multiple cores are used.
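None of the cells below set the master explicitly, so `sc` is assumed to come from the notebook kernel. A minimal sketch of creating a context that uses every local core (hypothetical setup; skip if `sc` already exists):
In [ ]:
# Hypothetical setup: run Spark locally with one worker thread per core.
from pyspark import SparkConf, SparkContext
conf = SparkConf().setMaster("local[*]").setAppName("idigbio-field-notes")
sc = SparkContext.getOrCreate(conf)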
In [1]:
from __future__ import print_function
import os
from pyspark import SQLContext
from pyspark.sql.functions import length
In [2]:
# Load the iDigBio Parquet dump (the commented-out line points at what
# appears to be a smaller 100k-record extract, useful for quick tests)
sqlContext = SQLContext(sc)
idbdf = sqlContext.read.parquet("../data/idigbio/occurrence.txt.parquet")
#idbdf = sqlContext.read.parquet("../data/idigbio-100k/occurrence.txt.parquet")
In [3]:
print(idbdf.count())
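Whether this count actually spreads across cores depends on how many partitions the Parquet scan produced. A quick way to check (a sketch; the numbers will vary with the cluster and file layout):
In [ ]:
# Default task parallelism and the partition count of the loaded DataFrame;
# if the partition count is 1, repartition() can spread work across cores.
print(sc.defaultParallelism)
print(idbdf.rdd.getNumPartitions())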
In [27]:
# This takes < 10 seconds
notes = idbdf.filter(idbdf['`http://rs.tdwg.org/dwc/terms/fieldNotes`'] != "")
print(notes.count())
lengths = notes.select(notes['`http://rs.tdwg.org/dwc/terms/country`'].alias('country'),
                       length(notes['`http://rs.tdwg.org/dwc/terms/fieldNotes`']).alias("len"))
lengths.groupby('country').sum('len').collect()[0]
Out[27]:
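Since `collect()` pulls every group back to the driver just to look at the first row, the same aggregation can be inspected more cheaply with `agg` and `show` (a sketch reusing the `lengths` DataFrame from above):
In [ ]:
from pyspark.sql import functions as F
# Sum fieldNotes lengths per country; show() only brings the first rows
# to the driver instead of the full grouped result.
lengths.groupby('country').agg(F.sum('len').alias('total_len')).show(10)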
In [3]:
# This takes longer
# JSON-decode the http://rs.tdwg.org/dwc/terms/dynamicProperties field
dynamics = idbdf.filter(idbdf['`http://rs.tdwg.org/dwc/terms/dynamicProperties`'] != "")
print(dynamics.count())
In [4]:
import json
def decode_json(s):
    try:
        return json.loads(s)
    except (TypeError, ValueError):
        # Malformed JSON (or a non-string): return "" so these rows can be filtered out
        return ""
#print(decode_json('{"name":"asdf","top":true}'))
from pyspark.sql.functions import udf
udf_decode_json = udf(decode_json)  # returnType defaults to StringType
decoded = dynamics.withColumn('decoded_properties',
                              udf_decode_json(dynamics['`http://rs.tdwg.org/dwc/terms/dynamicProperties`']))
#decoded.cache()
good_decodes = decoded.filter(decoded['decoded_properties'] != "")
good_decodes.count()
Out[4]:
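On Spark 2.1 or later, the decoding can stay in the JVM with `from_json`, which avoids shipping every row through a Python UDF and is usually the bigger win for multi-core throughput. A sketch, assuming the dynamicProperties values are flat string-to-string JSON objects:
In [ ]:
from pyspark.sql.functions import from_json
from pyspark.sql.types import MapType, StringType
# Parse each value as a map<string,string>; unparseable rows become null
# instead of "".
decoded_jvm = dynamics.withColumn(
    'decoded_properties',
    from_json(dynamics['`http://rs.tdwg.org/dwc/terms/dynamicProperties`'],
              MapType(StringType(), StringType())))
decoded_jvm.filter(decoded_jvm['decoded_properties'].isNotNull()).count()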