In [1]:
import json

from pyspark.sql.functions import avg, col, explode, length, udf
from pyspark.sql.types import MapType, StringType

In [2]:
idb_df = sqlContext.read.parquet("/guoda/data/idigbio-20190612T171757.parquet")

In [3]:
idb_dyn = (idb_df
           # "{}" is two characters, so this keeps only non-empty JSON strings
           .filter(length(col("data.dwc:dynamicProperties")) > 2)
           .withColumn("props_str", col("data.dwc:dynamicProperties"))
           )

In [4]:
idb_dyn.count()


Out[4]:
10374395

In [5]:
idb_dyn.select(col("props_str")).show(10, truncate=False)


+---------------------------------------------+
|props_str                                    |
+---------------------------------------------+
|{"life stage":"adult"}                       |
|{"sex":"Female"}                             |
|{"measurements":"ecotone area" }             |
|{ "solr_long_lat": "-71.303074,44.061185" }  |
|{ "solr_long_lat": "-71.319924,44.041766" }  |
|{ "solr_long_lat": "-71.304611,44.059277" }  |
|{ "solr_long_lat": "-71.3000108,44.0602311" }|
|{ "solr_long_lat": "-71.297795,44.062155" }  |
|{"measurements":"SV 71, Tail 175" }          |
|{ "solr_long_lat": "-109.74341,34.91267" }   |
+---------------------------------------------+
only showing top 10 rows

We need a UDF that returns a MapType of string to string, which explode() can then expand into key/value rows.


In [6]:
def json_to_map(s):
    """Convert a string containing JSON into a dictionary.
    Skip flattening of nested structures for now."""
    try:
        return json.loads(s)
    except (ValueError, TypeError):
        # unparsable or non-string input: return an empty map
        return {}

json_to_map_udf = udf(json_to_map, MapType(StringType(), StringType()))

print(json_to_map('{ "solr_long_lat": "-5.87403,30.49728", "related_record_types": "PreservedSpecimen|PreservedSpecimen", "related_record_links": "YPM-IP-530950|YPM-IP-530951" }'))


{'solr_long_lat': '-5.87403,30.49728', 'related_record_types': 'PreservedSpecimen|PreservedSpecimen', 'related_record_links': 'YPM-IP-530950|YPM-IP-530951'}
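
As an aside, on Spark 2.2+ the built-in from_json can parse straight to a MapType and avoid the Python UDF overhead. A sketch (not used below; note that from_json yields null rather than an empty map on unparsable input, so a later explode would silently drop those rows):

from pyspark.sql.functions import from_json
from pyspark.sql.types import MapType, StringType

idb_map_native = idb_dyn.withColumn(
    "props_map",
    from_json(col("props_str"), MapType(StringType(), StringType()))
)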

In [7]:
idb_map = (idb_dyn
 .withColumn("props_map", json_to_map_udf(col("props_str")))
)
idb_map.select(col("props_map")).show(10, truncate=False)


+--------------------------------------------+
|props_map                                   |
+--------------------------------------------+
|Map(life stage -> adult)                    |
|Map(sex -> Female)                          |
|Map(measurements -> ecotone area)           |
|Map(solr_long_lat -> -71.303074,44.061185)  |
|Map(solr_long_lat -> -71.319924,44.041766)  |
|Map(solr_long_lat -> -71.304611,44.059277)  |
|Map(solr_long_lat -> -71.3000108,44.0602311)|
|Map(solr_long_lat -> -71.297795,44.062155)  |
|Map(measurements -> SV 71, Tail 175)        |
|Map(solr_long_lat -> -109.74341,34.91267)   |
+--------------------------------------------+
only showing top 10 rows


In [18]:
idb_triples = (idb_map
 .select(col("uuid"), 
         col("recordset"),
         col("institutioncode"),
         explode(col("props_map")).alias("key", "value"))
)
idb_triples.cache()


Out[18]:
DataFrame[uuid: string, recordset: string, institutioncode: string, key: string, value: string]

In [19]:
idb_triples.count()


Out[19]:
31170327
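
That is roughly three key/value pairs per JSON-bearing record (31,170,327 / 10,374,395 ≈ 3.0).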

In [20]:
idb_triples.show(20, truncate=False)


+------------------------------------+------------------------------------+---------------+--------------------+-------------------------+
|uuid                                |recordset                           |institutioncode|key                 |value                    |
+------------------------------------+------------------------------------+---------------+--------------------+-------------------------+
|44c2a8fa-5a06-44de-83e8-d29146031cfe|271a9ce9-c6d3-4b63-a722-cb0adc48863f|mcz            |life stage          |adult                    |
|eb6c2256-73ae-4c52-95c2-cfb3292ed994|271a9ce9-c6d3-4b63-a722-cb0adc48863f|mcz            |sex                 |Female                   |
|abeb01a0-c691-4262-924c-667d74187021|637d0f2f-a0b4-4f33-a1ad-bd0ab18b620d|uaz            |measurements        |ecotone area             |
|626b8a8e-8109-421b-b09d-8725c4e6cd79|76015dea-c909-4e6d-a8e1-3bf35763571e|ypm            |solr_long_lat       |-71.303074,44.061185     |
|951adfcd-c4e4-4173-ab08-e3a54f516167|76015dea-c909-4e6d-a8e1-3bf35763571e|ypm            |solr_long_lat       |-71.319924,44.041766     |
|2c07f947-9c30-4f79-b849-56cc86443b2e|76015dea-c909-4e6d-a8e1-3bf35763571e|ypm            |solr_long_lat       |-71.304611,44.059277     |
|b1e9c3dc-2552-44ee-989d-8ed84e92c34b|76015dea-c909-4e6d-a8e1-3bf35763571e|ypm            |solr_long_lat       |-71.3000108,44.0602311   |
|c4f81e35-f665-4ab4-a310-d2e2d2dc9ae8|76015dea-c909-4e6d-a8e1-3bf35763571e|ypm            |solr_long_lat       |-71.297795,44.062155     |
|1d387871-2d55-42a4-bebe-cde0fb2800ae|637d0f2f-a0b4-4f33-a1ad-bd0ab18b620d|uaz            |measurements        |SV 71, Tail 175          |
|46c4adc7-171b-4813-a9b6-40182fc1e6d9|7ae4d15d-62e2-459b-842a-446f921b9d3f|ypm            |solr_long_lat       |-109.74341,34.91267      |
|1e16711f-c30f-4480-845c-79a154b1239d|7ae4d15d-62e2-459b-842a-446f921b9d3f|ypm            |solr_long_lat       |-109.74341,34.91267      |
|0a24b5f8-a80b-4d09-a487-02cb88c82966|7ae4d15d-62e2-459b-842a-446f921b9d3f|ypm            |solr_long_lat       |-109.74341,34.91267      |
|1efe00ae-64b5-4670-88d3-50a630cbc905|7ae4d15d-62e2-459b-842a-446f921b9d3f|ypm            |solr_long_lat       |-109.74341,34.91267      |
|3465b4cf-4824-4d93-a903-c9c414ed92f3|637d0f2f-a0b4-4f33-a1ad-bd0ab18b620d|uaz            |measurements        |Wt 23.2 g, SVL 130mm     |
|085f2ffa-1def-4243-a833-bd2447437112|637d0f2f-a0b4-4f33-a1ad-bd0ab18b620d|uaz            |measurements        |Mesquite grass           |
|72ee232f-c4d0-40c4-b8dc-fe6784d10bc5|637d0f2f-a0b4-4f33-a1ad-bd0ab18b620d|uaz            |measurements        |dited in CTM&5 C acc.Exp.|
|25106b40-bafd-464c-a367-2c534915da56|137ed4cd-5172-45a5-acdb-8e1de9a64e32|ypm            |related_record_links|YPM-IP-586385            |
|25106b40-bafd-464c-a367-2c534915da56|137ed4cd-5172-45a5-acdb-8e1de9a64e32|ypm            |related_record_types|PreservedSpecimen        |
|25106b40-bafd-464c-a367-2c534915da56|137ed4cd-5172-45a5-acdb-8e1de9a64e32|ypm            |mm_repository_id    |458813                   |
|25106b40-bafd-464c-a367-2c534915da56|137ed4cd-5172-45a5-acdb-8e1de9a64e32|ypm            |solr_long_lat       |-107.978086,39.729294    |
+------------------------------------+------------------------------------+---------------+--------------------+-------------------------+
only showing top 20 rows


In [11]:
(idb_triples
 .groupBy(col("key"))
 .count()
 .sort(col("count"), ascending=False)
 .limit(1000)
 ).toPandas()


Out[11]:
key count
0 recordtype 2946917
1 centroid 2946917
2 created 2946917
3 gbifid 2463714
4 determinations 2382962
5 subdepartment 2323969
6 gbifissue 1267532
7 registrationcode 1039765
8 donorname 905538
9 collectionkind 799900
10 solr_long_lat 794785
11 preservative 686046
12 kindofobject 663367
13 kindofcollection 533086
14 cultivated 481478
15 sex 433463
16 labellocality 373332
17 relationshipofresource 368639
18 relatedresourceid 368639
19 project 330669
20 chronostratigraphy 322012
21 vessel 315190
22 parttype 305961
23 weight 281773
24 age class 257038
25 mm_repository_id 249150
26 incubation 207787
27 lithostratigraphy 199759
28 gonads 172031
29 water depth 170991
... ... ...
177 photo 45
178 preparationtype 37
179 culmenLengthInMM 36
180 rightTarsusLengthInMM 36
181 disk length 31
182 carapace length 23
183 temperature experiment 22
184 abnormality 20
185 breeding remarks 13
186 crown-rump length 12
187 weightInoz 8
188 Date Collected 7
189 Site Number 5
190 nestshape 5
191 maxerror 4
192 nestsite 4
193 colors 3
194 weightInlbs 2
195 hindfootLengthInm 1
196 totalLengthInm 1
197 weightInlbs skinned 1
198 tailLengthInm 1
199 tailLengthInmmr 1
200 weightInlbs (skinned) 1
201 totalLengthInmmr 1
202 hindfootLengthInmmr 1
203 earLengthInm 1
204 earLengthInmmr 1
205 weightIn lbs 1
206 mutation 1

207 rows × 2 columns
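
The long tail is dominated by one-off variants of measurement field names. For the handful of common keys, a pivot can turn the triples back into columns. A sketch (a hypothetical follow-up, not run here; first() is an arbitrary but simple choice if a record repeats a key):

from pyspark.sql.functions import first

top_keys = ["sex", "weight", "solr_long_lat"]
wide = (idb_triples
        .filter(col("key").isin(top_keys))
        .groupBy("uuid")
        .pivot("key", top_keys)
        .agg(first(col("value"))))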


In [21]:
(idb_triples
 .groupBy(col("institutioncode"))
 .count()
 .sort(col("count"), ascending=False)
 .limit(1000)
 ).toPandas()


Out[21]:
institutioncode count
0 nhmuk 25313796
1 ypm 1271259
2 mcz 1193852
3 lacm 569006
4 sio 560147
5 wfvz 404292
6 omnh 288517
7 uwbm 239439
8 ummz 236954
9 ku 207037
10 ttu 189813
11 crcm 178677
12 uwfc 127494
13 uaz 113835
14 usf 64481
15 tcwc 41201
16 but 40343
17 uconn 34038
18 os 32399
19 csuc 21728
20 usac 13358
21 hsu 9202
22 nmmnh 6000
23 nbmb 4924
24 uafmc 3280
25 bsns 3080
26 ucla 1655
27 unr 470
28 csu 38
29 uf 12

In [12]:
(idb_triples
 .filter(col("key") == "NSF_TCN")
 .count()
)


Out[12]:
77024

In [22]:
(idb_triples
 .filter(col("key") == "NSF_TCN")
 .groupBy(col("institutioncode"), col("value"))
 .count()
 .sort(col("count"), ascending=False)
 .limit(1000)
 ).toPandas()


Out[22]:
institutioncode value count
0 ypm WIS 61477
1 ypm FIC 15500
2 ypm FIC WIS 47

Now let's write this out, then go back and join it to the main DF for some summaries.


In [ ]:
#(idb_triples
# .write
# .parquet("/tmp/idigbio-20171014T023306-json-triples.parquet")
#)

How much more information might we be able to find in records that are not JSON-parsable?
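
One way to count the unparsable records, sketched here (json_to_map returns an empty map on parse failure, so size() == 0 flags them; assumes idb_map from above):

from pyspark.sql.functions import size

n_unparsable = idb_map.filter(size(col("props_map")) == 0).count()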


In [ ]:
(idb_triples
 .select(length(col("key")).alias("len_key"))
 .agg(avg(col("len_key")))  # average key length across all triples
 .show()
)

In [ ]:
#joined = idb_dyn.join(idb_triples, idb_dyn["uuid"] == idb_triples["uuid"], "inner")

In [16]:
#joined.show(3, truncate=False)


---------------------------------------------------------------------------
KeyboardInterrupt                         Traceback (most recent call last)
<ipython-input-16-4ed20ba2f946> in <module>()
----> 1 joined.show(3, truncate=False)

/opt/spark/latest/python/pyspark/sql/dataframe.py in show(self, n, truncate)
    318             print(self._jdf.showString(n, 20))
    319         else:
--> 320             print(self._jdf.showString(n, int(truncate)))
    321 
    322     def __repr__(self):

/opt/spark/latest/python/lib/py4j-latest-src.zip/py4j/java_gateway.py in __call__(self, *args)
   1129             proto.END_COMMAND_PART
   1130 
-> 1131         answer = self.gateway_client.send_command(command)
   1132         return_value = get_return_value(
   1133             answer, self.gateway_client, self.target_id, self.name)

/opt/spark/latest/python/lib/py4j-latest-src.zip/py4j/java_gateway.py in send_command(self, command, retry, binary)
    881         connection = self._get_connection()
    882         try:
--> 883             response = connection.send_command(command)
    884             if binary:
    885                 return response, self._create_connection_guard(connection)

/opt/spark/latest/python/lib/py4j-latest-src.zip/py4j/java_gateway.py in send_command(self, command)
   1026 
   1027         try:
-> 1028             answer = smart_decode(self.stream.readline()[:-1])
   1029             logger.debug("Answer received: {0}".format(answer))
   1030             if answer.startswith(proto.RETURN_MESSAGE):

/usr/lib/python3.5/socket.py in readinto(self, b)
    573         while True:
    574             try:
--> 575                 return self._sock.recv_into(b)
    576             except timeout:
    577                 self._timeout_occurred = True

KeyboardInterrupt: 
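
(The show() above was interrupted by hand; the join appears too expensive to preview interactively before the triples are written out, hence it is left commented out.)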

In [ ]:
#joined.count()

Who provides the most additional information in valid JSON?

What is the ratio of JSON to non-JSON text? How much more do we have to work on?
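
As a starting point for the first question, a sketch (hypothetical, not run here) normalizing triple counts by record counts per institution:

from pyspark.sql.functions import count, countDistinct

(idb_triples
 .groupBy("institutioncode")
 .agg(count("*").alias("n_triples"),
      countDistinct("uuid").alias("n_records"))
 .withColumn("triples_per_record", col("n_triples") / col("n_records"))
 .sort(col("triples_per_record"), ascending=False)
 .show())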