In [1]:
import json
from pyspark.sql.functions import abs, avg, col, explode, length, hash, udf, when, date_format
from pyspark.sql.types import *
In [2]:
idb_df = sqlContext.read.parquet("/guoda/data/idigbio-20190612T171757.parquet")
In [3]:
# Keep only records whose dynamicProperties holds more than an empty JSON
# object ("{}" is two characters).
idb_dyn = (idb_df
    .filter(length(col("data.dwc:dynamicProperties")) > 2)
    .withColumn("props_str", col("data.dwc:dynamicProperties"))
)
In [4]:
idb_dyn.count()
Out[4]:
In [5]:
idb_dyn.select(col("props_str")).show(10, truncate=False)
We need a UDF that returns a MapType of string to string so that explode can expand it into key/value rows.
In [6]:
def json_to_map(s):
    """Convert a string containing JSON into a dictionary.
    Skip flattening of nested values for now."""
    try:
        d = json.loads(s)
        # json.loads can return non-dict values (lists, scalars); treat
        # those the same as unparsable strings.
        return d if isinstance(d, dict) else {}
    except (TypeError, ValueError):
        return {}
json_to_map_udf = udf(json_to_map, MapType(StringType(), StringType()))
print(json_to_map('{ "solr_long_lat": "-5.87403,30.49728", "related_record_types": "PreservedSpecimen|PreservedSpecimen", "related_record_links": "YPM-IP-530950|YPM-IP-530951" }'))
In [7]:
idb_map = (idb_dyn
    .withColumn("props_map", json_to_map_udf(col("props_str")))
)
idb_map.select(col("props_map")).show(10, truncate=False)
In [18]:
# Explode the map into one (key, value) row per property, keeping the
# record identifiers alongside each pair.
idb_triples = (idb_map
    .select(col("uuid"),
            col("recordset"),
            col("institutioncode"),
            explode(col("props_map")).alias("key", "value"))
)
idb_triples.cache()
Out[18]:
In [19]:
idb_triples.count()
Out[19]:
In [20]:
idb_triples.show(20, truncate=False)
In [11]:
(idb_triples
    .groupBy(col("key"))
    .count()
    .sort(col("count"), ascending=False)
    .limit(1000)
).toPandas()
Out[11]:
In [21]:
(idb_triples
    .groupBy(col("institutioncode"))
    .count()
    .sort(col("count"), ascending=False)
    .limit(1000)
).toPandas()
Out[21]:
In [12]:
(idb_triples
    .filter(col("key") == "NSF_TCN")
    .count()
)
Out[12]:
In [22]:
(idb_triples
    .filter(col("key") == "NSF_TCN")
    .groupBy(col("institutioncode"), col("value"))
    .count()
    .sort(col("count"), ascending=False)
    .limit(1000)
).toPandas()
Out[22]:
Now let's write this out, then go back and join to the main DF for some summaries.
In [ ]:
#(idb_triples
#    .write
#    .parquet("/tmp/idigbio-20171014T023306-json-triples.parquet")
#)
How much more information might we be able to find in records that are not JSON-parsable?
In [ ]:
(idb_triples
    .select(length(col("key")).alias("len_key"))
    .agg(avg(col("len_key")))
    .show()
)
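As a rough first cut at that question, we can also measure how long the dynamicProperties strings are for records that did not yield any triples. This is a sketch, assuming an empty props_map marks a parse failure (strings that were literally "{}" were already filtered out above):
In [ ]:
from pyspark.sql.functions import size

# Average length of the raw dynamicProperties text in records whose
# JSON parse produced an empty map (assumed here to mean a parse failure).
(idb_map
    .filter(size(col("props_map")) == 0)
    .select(avg(length(col("props_str"))))
    .show()
)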
In [ ]:
#joined = idb_dyn.join(idb_triples, idb_dyn["uuid"] == idb_triples["uuid"], "inner")
In [16]:
#joined.show(3, truncate=False)
In [ ]:
#joined.count()
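As a sketch of one such summary, using the hypothetical names triple_counts and n_props: count the triples extracted per record, then join those counts back to the main DF by uuid, which avoids the duplicated uuid column the expression-form join above would carry along.
In [ ]:
# Hypothetical sketch: per-record triple counts attached to the full records.
triple_counts = (idb_triples
    .groupBy("uuid")
    .count()
    .withColumnRenamed("count", "n_props")
)
joined_counts = idb_dyn.join(triple_counts, "uuid", "inner")
joined_counts.select("uuid", "n_props").show(3, truncate=False)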
Who provides the most additional information in valid JSON?
What is the ratio of JSON to non-JSON text? How much more do we have to work on?
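One way to sketch the ratio question, again assuming an empty props_map means the text did not parse as JSON:
In [ ]:
from pyspark.sql.functions import size

# Fraction of records (among those with non-trivial dynamicProperties)
# whose text parsed into at least one key/value pair.
parsed = idb_map.filter(size(col("props_map")) > 0).count()
total = idb_map.count()
print("JSON-parsable fraction: {:.3f}".format(parsed / float(total)))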