A very quick example of pulling out the unique values of one field (along with how many rows carry each value) and writing them as a CSV to a local file.


In [11]:
from pyspark.sql.functions import col

In [24]:
# Full data set. For faster testing, smaller data sets are available by inserting
# "-100k" or "-1M" just before ".parquet" (e.g. "...T171757-100k.parquet").
idb_df = (
    sqlContext.read
    .format("parquet")
    .load("/guoda/data/idigbio-20190612T171757.parquet")
)

In [13]:
# Number of rows in the loaded DataFrame; shown as the cell's output
idb_df.count()


Out[13]:
115173471

In [14]:
# Print the column tree; the `dwc:`-prefixed fields are nested under the `data` struct
idb_df.printSchema()


root
 |-- barcodevalue: string (nullable = true)
 |-- basisofrecord: string (nullable = true)
 |-- bed: string (nullable = true)
 |-- canonicalname: string (nullable = true)
 |-- catalognumber: string (nullable = true)
 |-- class: string (nullable = true)
 |-- collectioncode: string (nullable = true)
 |-- collectionid: string (nullable = true)
 |-- collectionname: string (nullable = true)
 |-- collector: string (nullable = true)
 |-- commonname: string (nullable = true)
 |-- continent: string (nullable = true)
 |-- coordinateuncertainty: float (nullable = true)
 |-- country: string (nullable = true)
 |-- countrycode: string (nullable = true)
 |-- county: string (nullable = true)
 |-- data: struct (nullable = true)
 |    |-- coreid: string (nullable = true)
 |    |-- dc:rights: string (nullable = true)
 |    |-- dcterms:accessRights: string (nullable = true)
 |    |-- dcterms:bibliographicCitation: string (nullable = true)
 |    |-- dcterms:language: string (nullable = true)
 |    |-- dcterms:license: string (nullable = true)
 |    |-- dcterms:modified: string (nullable = true)
 |    |-- dcterms:references: string (nullable = true)
 |    |-- dcterms:rights: string (nullable = true)
 |    |-- dcterms:rightsHolder: string (nullable = true)
 |    |-- dcterms:source: string (nullable = true)
 |    |-- dcterms:type: string (nullable = true)
 |    |-- dwc:VerbatimEventDate: string (nullable = true)
 |    |-- dwc:acceptedNameUsage: string (nullable = true)
 |    |-- dwc:acceptedNameUsageID: string (nullable = true)
 |    |-- dwc:accessRights: string (nullable = true)
 |    |-- dwc:associatedMedia: string (nullable = true)
 |    |-- dwc:associatedOccurrences: string (nullable = true)
 |    |-- dwc:associatedOrganisms: string (nullable = true)
 |    |-- dwc:associatedReferences: string (nullable = true)
 |    |-- dwc:associatedSequences: string (nullable = true)
 |    |-- dwc:associatedTaxa: string (nullable = true)
 |    |-- dwc:basisOfRecord: string (nullable = true)
 |    |-- dwc:bed: string (nullable = true)
 |    |-- dwc:behavior: string (nullable = true)
 |    |-- dwc:catalogNumber: string (nullable = true)
 |    |-- dwc:class: string (nullable = true)
 |    |-- dwc:classs: string (nullable = true)
 |    |-- dwc:collectionCode: string (nullable = true)
 |    |-- dwc:collectionID: string (nullable = true)
 |    |-- dwc:continent: string (nullable = true)
 |    |-- dwc:coordinatePrecision: string (nullable = true)
 |    |-- dwc:coordinateUncertaintyInMeters: string (nullable = true)
 |    |-- dwc:country: string (nullable = true)
 |    |-- dwc:countryCode: string (nullable = true)
 |    |-- dwc:county: string (nullable = true)
 |    |-- dwc:dataGeneralizations: string (nullable = true)
 |    |-- dwc:datasetID: string (nullable = true)
 |    |-- dwc:datasetName: string (nullable = true)
 |    |-- dwc:dateIdentified: string (nullable = true)
 |    |-- dwc:day: string (nullable = true)
 |    |-- dwc:decimalLatitude: string (nullable = true)
 |    |-- dwc:decimalLongitude: string (nullable = true)
 |    |-- dwc:disposition: string (nullable = true)
 |    |-- dwc:dynamicProperties: string (nullable = true)
 |    |-- dwc:earliestAgeOrLowestStage: string (nullable = true)
 |    |-- dwc:earliestEonOrLowestEonothem: string (nullable = true)
 |    |-- dwc:earliestEpochOrLowestSeries: string (nullable = true)
 |    |-- dwc:earliestEraOrLowestErathem: string (nullable = true)
 |    |-- dwc:earliestPeriodOrLowestSystem: string (nullable = true)
 |    |-- dwc:endDayOfYear: string (nullable = true)
 |    |-- dwc:establishmentMeans: string (nullable = true)
 |    |-- dwc:eventDate: string (nullable = true)
 |    |-- dwc:eventID: string (nullable = true)
 |    |-- dwc:eventRemarks: string (nullable = true)
 |    |-- dwc:eventTime: string (nullable = true)
 |    |-- dwc:family: string (nullable = true)
 |    |-- dwc:fieldNotes: string (nullable = true)
 |    |-- dwc:fieldNumber: string (nullable = true)
 |    |-- dwc:footprintSRS: string (nullable = true)
 |    |-- dwc:footprintSpatialFit: string (nullable = true)
 |    |-- dwc:footprintWKT: string (nullable = true)
 |    |-- dwc:formation: string (nullable = true)
 |    |-- dwc:genus: string (nullable = true)
 |    |-- dwc:geodeticDatum: string (nullable = true)
 |    |-- dwc:geologicalContextID: string (nullable = true)
 |    |-- dwc:georeferenceProtocol: string (nullable = true)
 |    |-- dwc:georeferenceRemarks: string (nullable = true)
 |    |-- dwc:georeferenceSources: string (nullable = true)
 |    |-- dwc:georeferenceVerificationStatus: string (nullable = true)
 |    |-- dwc:georeferencedBy: string (nullable = true)
 |    |-- dwc:georeferencedDate: string (nullable = true)
 |    |-- dwc:group: string (nullable = true)
 |    |-- dwc:habitat: string (nullable = true)
 |    |-- dwc:higherClassification: string (nullable = true)
 |    |-- dwc:higherGeography: string (nullable = true)
 |    |-- dwc:higherGeographyID: string (nullable = true)
 |    |-- dwc:highestBiostratigraphicZone: string (nullable = true)
 |    |-- dwc:identificationID: string (nullable = true)
 |    |-- dwc:identificationQualifier: string (nullable = true)
 |    |-- dwc:identificationReferences: string (nullable = true)
 |    |-- dwc:identificationRemarks: string (nullable = true)
 |    |-- dwc:identificationVerificationStatus: string (nullable = true)
 |    |-- dwc:identifiedBy: string (nullable = true)
 |    |-- dwc:individualCount: string (nullable = true)
 |    |-- dwc:informationWithheld: string (nullable = true)
 |    |-- dwc:infraspecificEpithet: string (nullable = true)
 |    |-- dwc:institutionCode: string (nullable = true)
 |    |-- dwc:institutionID: string (nullable = true)
 |    |-- dwc:island: string (nullable = true)
 |    |-- dwc:islandGroup: string (nullable = true)
 |    |-- dwc:kingdom: string (nullable = true)
 |    |-- dwc:language: string (nullable = true)
 |    |-- dwc:latestAgeOrHighestStage: string (nullable = true)
 |    |-- dwc:latestEonOrHighestEonothem: string (nullable = true)
 |    |-- dwc:latestEpochOrHighestSeries: string (nullable = true)
 |    |-- dwc:latestEraOrHighestErathem: string (nullable = true)
 |    |-- dwc:latestPeriodOrHighestSystem: string (nullable = true)
 |    |-- dwc:lifeStage: string (nullable = true)
 |    |-- dwc:lithostratigraphicTerms: string (nullable = true)
 |    |-- dwc:locality: string (nullable = true)
 |    |-- dwc:locationAccordingTo: string (nullable = true)
 |    |-- dwc:locationID: string (nullable = true)
 |    |-- dwc:locationRemarks: string (nullable = true)
 |    |-- dwc:lowestBiostratigraphicZone: string (nullable = true)
 |    |-- dwc:materialSampleID: string (nullable = true)
 |    |-- dwc:maximumDepthInMeters: string (nullable = true)
 |    |-- dwc:maximumElevationInMeters: string (nullable = true)
 |    |-- dwc:member: string (nullable = true)
 |    |-- dwc:minimumDepthInMeters: string (nullable = true)
 |    |-- dwc:minimumElevationInMeters: string (nullable = true)
 |    |-- dwc:modified: string (nullable = true)
 |    |-- dwc:month: string (nullable = true)
 |    |-- dwc:municipality: string (nullable = true)
 |    |-- dwc:nameAccordingTo: string (nullable = true)
 |    |-- dwc:namePublishedIn: string (nullable = true)
 |    |-- dwc:namePublishedInID: string (nullable = true)
 |    |-- dwc:namePublishedInYear: string (nullable = true)
 |    |-- dwc:nomenclaturalCode: string (nullable = true)
 |    |-- dwc:nomenclaturalStatus: string (nullable = true)
 |    |-- dwc:occurrenceDetails: string (nullable = true)
 |    |-- dwc:occurrenceID: string (nullable = true)
 |    |-- dwc:occurrenceRemarks: string (nullable = true)
 |    |-- dwc:occurrenceStatus: string (nullable = true)
 |    |-- dwc:order: string (nullable = true)
 |    |-- dwc:organismID: string (nullable = true)
 |    |-- dwc:organismName: string (nullable = true)
 |    |-- dwc:organismQuantity: string (nullable = true)
 |    |-- dwc:organismQuantityType: string (nullable = true)
 |    |-- dwc:originalNameUsage: string (nullable = true)
 |    |-- dwc:originalNameUsageID: string (nullable = true)
 |    |-- dwc:otherCatalogNumbers: string (nullable = true)
 |    |-- dwc:ownerInstitutionCode: string (nullable = true)
 |    |-- dwc:parentNameUsage: string (nullable = true)
 |    |-- dwc:phylum: string (nullable = true)
 |    |-- dwc:pointRadiusSpatialFit: string (nullable = true)
 |    |-- dwc:preparations: string (nullable = true)
 |    |-- dwc:previousIdentifications: string (nullable = true)
 |    |-- dwc:recordNumber: string (nullable = true)
 |    |-- dwc:recordedBy: string (nullable = true)
 |    |-- dwc:reproductiveCondition: string (nullable = true)
 |    |-- dwc:rights: string (nullable = true)
 |    |-- dwc:rightsHolder: string (nullable = true)
 |    |-- dwc:sampleSizeValue: string (nullable = true)
 |    |-- dwc:samplingEffort: string (nullable = true)
 |    |-- dwc:samplingProtocol: string (nullable = true)
 |    |-- dwc:scientificName: string (nullable = true)
 |    |-- dwc:scientificNameAuthorship: string (nullable = true)
 |    |-- dwc:scientificNameID: string (nullable = true)
 |    |-- dwc:sex: string (nullable = true)
 |    |-- dwc:specificEpithet: string (nullable = true)
 |    |-- dwc:startDayOfYear: string (nullable = true)
 |    |-- dwc:stateProvince: string (nullable = true)
 |    |-- dwc:subgenus: string (nullable = true)
 |    |-- dwc:taxonID: string (nullable = true)
 |    |-- dwc:taxonRank: string (nullable = true)
 |    |-- dwc:taxonRemarks: string (nullable = true)
 |    |-- dwc:taxonomicStatus: string (nullable = true)
 |    |-- dwc:typeStatus: string (nullable = true)
 |    |-- dwc:verbatimCoordinateSystem: string (nullable = true)
 |    |-- dwc:verbatimCoordinates: string (nullable = true)
 |    |-- dwc:verbatimDepth: string (nullable = true)
 |    |-- dwc:verbatimElevation: string (nullable = true)
 |    |-- dwc:verbatimEventDate: string (nullable = true)
 |    |-- dwc:verbatimLatitude: string (nullable = true)
 |    |-- dwc:verbatimLocality: string (nullable = true)
 |    |-- dwc:verbatimLongitude: string (nullable = true)
 |    |-- dwc:verbatimSRS: string (nullable = true)
 |    |-- dwc:verbatimTaxonRank: string (nullable = true)
 |    |-- dwc:vernacularName: string (nullable = true)
 |    |-- dwc:waterBody: string (nullable = true)
 |    |-- dwc:year: string (nullable = true)
 |    |-- id: string (nullable = true)
 |    |-- idigbio:recordId: string (nullable = true)
 |    |-- symbiota:recordEnteredBy: string (nullable = true)
 |    |-- symbiota:verbatimScientificName: string (nullable = true)
 |-- datasetid: string (nullable = true)
 |-- datecollected: timestamp (nullable = true)
 |-- datemodified: timestamp (nullable = true)
 |-- dqs: float (nullable = true)
 |-- earliestageorloweststage: string (nullable = true)
 |-- earliesteonorlowesteonothem: string (nullable = true)
 |-- earliestepochorlowestseries: string (nullable = true)
 |-- earliesteraorlowesterathem: string (nullable = true)
 |-- earliestperiodorlowestsystem: string (nullable = true)
 |-- etag: string (nullable = true)
 |-- eventdate: string (nullable = true)
 |-- family: string (nullable = true)
 |-- fieldnumber: string (nullable = true)
 |-- formation: string (nullable = true)
 |-- genus: string (nullable = true)
 |-- geologicalcontextid: string (nullable = true)
 |-- geopoint: struct (nullable = true)
 |    |-- lat: double (nullable = true)
 |    |-- lon: double (nullable = true)
 |-- group: string (nullable = true)
 |-- hasImage: boolean (nullable = true)
 |-- hasMedia: boolean (nullable = true)
 |-- highertaxon: string (nullable = true)
 |-- highestbiostratigraphiczone: string (nullable = true)
 |-- individualcount: float (nullable = true)
 |-- infraspecificepithet: string (nullable = true)
 |-- institutioncode: string (nullable = true)
 |-- institutionid: string (nullable = true)
 |-- institutionname: string (nullable = true)
 |-- kingdom: string (nullable = true)
 |-- latestageorhigheststage: string (nullable = true)
 |-- latesteonorhighesteonothem: string (nullable = true)
 |-- latestepochorhighestseries: string (nullable = true)
 |-- latesteraorhighesterathem: string (nullable = true)
 |-- latestperiodorhighestsystem: string (nullable = true)
 |-- lithostratigraphicterms: string (nullable = true)
 |-- locality: string (nullable = true)
 |-- lowestbiostratigraphiczone: string (nullable = true)
 |-- maxdepth: float (nullable = true)
 |-- maxelevation: float (nullable = true)
 |-- member: string (nullable = true)
 |-- mindepth: float (nullable = true)
 |-- minelevation: float (nullable = true)
 |-- municipality: string (nullable = true)
 |-- occurrenceid: string (nullable = true)
 |-- order: string (nullable = true)
 |-- phylum: string (nullable = true)
 |-- recordnumber: string (nullable = true)
 |-- recordset: string (nullable = true)
 |-- scientificname: string (nullable = true)
 |-- specificepithet: string (nullable = true)
 |-- startdayofyear: integer (nullable = true)
 |-- stateprovince: string (nullable = true)
 |-- taxonid: string (nullable = true)
 |-- taxonomicstatus: string (nullable = true)
 |-- taxonrank: string (nullable = true)
 |-- typestatus: string (nullable = true)
 |-- uuid: string (nullable = true)
 |-- verbatimeventdate: string (nullable = true)
 |-- verbatimlocality: string (nullable = true)
 |-- version: integer (nullable = true)
 |-- waterbody: string (nullable = true)


In [25]:
# One output row per distinct value of the nested habitat field,
# together with the number of rows that carry that value
habitat_field = col("data.dwc:habitat")
distinct_habitats = idb_df.groupBy(habitat_field).count()

In [23]:
# Preview of the first 10 groups, for testing only. Comment this line out for a
# full run, as it causes extra gathering work in addition to the writing of the
# CSV below.
distinct_habitats.show(10, truncate=False)


+--------------------------------------------------------+-----+
|dwc:habitat                                             |count|
+--------------------------------------------------------+-----+
|disturbed thickets and fields near watercourses to ocean|1    |
|epiphyte on fallen tree                                 |1    |
|dry oamy soil                                           |1    |
|epiphyte in monte                                       |1    |
|monte                                                   |4    |
|Bosque primario, suelo de arena blanca                  |1    |
|disturbed forest & trailside                            |1    |
|Floodplain.; Compact, dry soil.                         |1    |
|In clay soil on top of sharp ridge                      |1    |
|Beitemark. Blokk/jord. På jord på blokker i beitemark   |1    |
+--------------------------------------------------------+-----+
only showing top 10 rows


In [26]:
# Going through pandas (toPandas) is inefficient (15 minutes) but simpler than
# working with HDFS; the result is written as a CSV to a local file.
habitats_pdf = distinct_habitats.toPandas()
habitats_pdf.to_csv("distinct_habitats.csv", index=False, header=True)