In [1]:
import pyspark.sql.types as types
import pyspark.sql.functions as sql

In [2]:
idb_df_version = "20161119"

In [3]:
idb_df = sqlContext.read.parquet("/guoda/data/idigbio-{0}-100k.parquet".format(idb_df_version))
idb_df.count()


Out[3]:
100000

In [4]:
idb_df.printSchema()


root
 |-- barcodevalue: string (nullable = true)
 |-- basisofrecord: string (nullable = true)
 |-- bed: string (nullable = true)
 |-- canonicalname: string (nullable = true)
 |-- catalognumber: string (nullable = true)
 |-- class: string (nullable = true)
 |-- collectioncode: string (nullable = true)
 |-- collectionid: string (nullable = true)
 |-- collectionname: string (nullable = true)
 |-- collector: string (nullable = true)
 |-- commonname: string (nullable = true)
 |-- continent: string (nullable = true)
 |-- coordinateuncertainty: float (nullable = true)
 |-- country: string (nullable = true)
 |-- countrycode: string (nullable = true)
 |-- county: string (nullable = true)
 |-- data: struct (nullable = true)
 |    |-- coreid: string (nullable = true)
 |    |-- dc:rights: string (nullable = true)
 |    |-- dcterms:accessRights: string (nullable = true)
 |    |-- dcterms:bibliographicCitation: string (nullable = true)
 |    |-- dcterms:language: string (nullable = true)
 |    |-- dcterms:license: string (nullable = true)
 |    |-- dcterms:modified: string (nullable = true)
 |    |-- dcterms:references: string (nullable = true)
 |    |-- dcterms:rights: string (nullable = true)
 |    |-- dcterms:rightsHolder: string (nullable = true)
 |    |-- dcterms:source: string (nullable = true)
 |    |-- dcterms:type: string (nullable = true)
 |    |-- dwc:VerbatimEventDate: string (nullable = true)
 |    |-- dwc:acceptedNameUsage: string (nullable = true)
 |    |-- dwc:accessRights: string (nullable = true)
 |    |-- dwc:associatedMedia: string (nullable = true)
 |    |-- dwc:associatedOccurrences: string (nullable = true)
 |    |-- dwc:associatedReferences: string (nullable = true)
 |    |-- dwc:associatedSequences: string (nullable = true)
 |    |-- dwc:associatedTaxa: string (nullable = true)
 |    |-- dwc:basisOfRecord: string (nullable = true)
 |    |-- dwc:bed: string (nullable = true)
 |    |-- dwc:behavior: string (nullable = true)
 |    |-- dwc:catalogNumber: string (nullable = true)
 |    |-- dwc:class: string (nullable = true)
 |    |-- dwc:collectionCode: string (nullable = true)
 |    |-- dwc:collectionID: string (nullable = true)
 |    |-- dwc:continent: string (nullable = true)
 |    |-- dwc:coordinatePrecision: string (nullable = true)
 |    |-- dwc:coordinateUncertaintyInMeters: string (nullable = true)
 |    |-- dwc:country: string (nullable = true)
 |    |-- dwc:countryCode: string (nullable = true)
 |    |-- dwc:county: string (nullable = true)
 |    |-- dwc:dataGeneralizations: string (nullable = true)
 |    |-- dwc:datasetID: string (nullable = true)
 |    |-- dwc:datasetName: string (nullable = true)
 |    |-- dwc:dateIdentified: string (nullable = true)
 |    |-- dwc:day: string (nullable = true)
 |    |-- dwc:decimalLatitude: string (nullable = true)
 |    |-- dwc:decimalLongitude: string (nullable = true)
 |    |-- dwc:disposition: string (nullable = true)
 |    |-- dwc:dynamicProperties: string (nullable = true)
 |    |-- dwc:earliestAgeOrLowestStage: string (nullable = true)
 |    |-- dwc:earliestEonOrLowestEonothem: string (nullable = true)
 |    |-- dwc:earliestEpochOrLowestSeries: string (nullable = true)
 |    |-- dwc:earliestEraOrLowestErathem: string (nullable = true)
 |    |-- dwc:earliestPeriodOrLowestSystem: string (nullable = true)
 |    |-- dwc:endDayOfYear: string (nullable = true)
 |    |-- dwc:establishmentMeans: string (nullable = true)
 |    |-- dwc:eventDate: string (nullable = true)
 |    |-- dwc:eventID: string (nullable = true)
 |    |-- dwc:eventRemarks: string (nullable = true)
 |    |-- dwc:eventTime: string (nullable = true)
 |    |-- dwc:family: string (nullable = true)
 |    |-- dwc:fieldNotes: string (nullable = true)
 |    |-- dwc:fieldNumber: string (nullable = true)
 |    |-- dwc:footprintWKT: string (nullable = true)
 |    |-- dwc:formation: string (nullable = true)
 |    |-- dwc:genus: string (nullable = true)
 |    |-- dwc:geodeticDatum: string (nullable = true)
 |    |-- dwc:geologicalContextID: string (nullable = true)
 |    |-- dwc:georeferenceProtocol: string (nullable = true)
 |    |-- dwc:georeferenceRemarks: string (nullable = true)
 |    |-- dwc:georeferenceSources: string (nullable = true)
 |    |-- dwc:georeferenceVerificationStatus: string (nullable = true)
 |    |-- dwc:georeferencedBy: string (nullable = true)
 |    |-- dwc:georeferencedDate: string (nullable = true)
 |    |-- dwc:group: string (nullable = true)
 |    |-- dwc:habitat: string (nullable = true)
 |    |-- dwc:higherClassification: string (nullable = true)
 |    |-- dwc:higherGeography: string (nullable = true)
 |    |-- dwc:higherGeographyID: string (nullable = true)
 |    |-- dwc:highestBiostratigraphicZone: string (nullable = true)
 |    |-- dwc:identificationID: string (nullable = true)
 |    |-- dwc:identificationQualifier: string (nullable = true)
 |    |-- dwc:identificationReferences: string (nullable = true)
 |    |-- dwc:identificationRemarks: string (nullable = true)
 |    |-- dwc:identificationVerificationStatus: string (nullable = true)
 |    |-- dwc:identifiedBy: string (nullable = true)
 |    |-- dwc:individualCount: string (nullable = true)
 |    |-- dwc:individualID: string (nullable = true)
 |    |-- dwc:informationWithheld: string (nullable = true)
 |    |-- dwc:infraspecificEpithet: string (nullable = true)
 |    |-- dwc:institutionCode: string (nullable = true)
 |    |-- dwc:institutionID: string (nullable = true)
 |    |-- dwc:island: string (nullable = true)
 |    |-- dwc:islandGroup: string (nullable = true)
 |    |-- dwc:kingdom: string (nullable = true)
 |    |-- dwc:latestAgeOrHighestStage: string (nullable = true)
 |    |-- dwc:latestEonOrHighestEonothem: string (nullable = true)
 |    |-- dwc:latestEpochOrHighestSeries: string (nullable = true)
 |    |-- dwc:latestEraOrHighestErathem: string (nullable = true)
 |    |-- dwc:latestPeriodOrHighestSystem: string (nullable = true)
 |    |-- dwc:lifeStage: string (nullable = true)
 |    |-- dwc:lithostratigraphicTerms: string (nullable = true)
 |    |-- dwc:locality: string (nullable = true)
 |    |-- dwc:locationAccordingTo: string (nullable = true)
 |    |-- dwc:locationID: string (nullable = true)
 |    |-- dwc:locationRemarks: string (nullable = true)
 |    |-- dwc:lowestBiostratigraphicZone: string (nullable = true)
 |    |-- dwc:maximumDepthInMeters: string (nullable = true)
 |    |-- dwc:maximumElevationInMeters: string (nullable = true)
 |    |-- dwc:member: string (nullable = true)
 |    |-- dwc:minimumDepthInMeters: string (nullable = true)
 |    |-- dwc:minimumElevationInMeters: string (nullable = true)
 |    |-- dwc:month: string (nullable = true)
 |    |-- dwc:municipality: string (nullable = true)
 |    |-- dwc:nameAccordingTo: string (nullable = true)
 |    |-- dwc:nameAccordingToID: string (nullable = true)
 |    |-- dwc:namePublishedIn: string (nullable = true)
 |    |-- dwc:namePublishedInID: string (nullable = true)
 |    |-- dwc:namePublishedInYear: string (nullable = true)
 |    |-- dwc:nomenclaturalCode: string (nullable = true)
 |    |-- dwc:nomenclaturalStatus: string (nullable = true)
 |    |-- dwc:occurrenceDetails: string (nullable = true)
 |    |-- dwc:occurrenceID: string (nullable = true)
 |    |-- dwc:occurrenceRemarks: string (nullable = true)
 |    |-- dwc:occurrenceStatus: string (nullable = true)
 |    |-- dwc:order: string (nullable = true)
 |    |-- dwc:organismID: string (nullable = true)
 |    |-- dwc:originalNameUsage: string (nullable = true)
 |    |-- dwc:otherCatalogNumbers: string (nullable = true)
 |    |-- dwc:ownerInstitutionCode: string (nullable = true)
 |    |-- dwc:parentNameUsage: string (nullable = true)
 |    |-- dwc:phylum: string (nullable = true)
 |    |-- dwc:preparations: string (nullable = true)
 |    |-- dwc:previousIdentifications: string (nullable = true)
 |    |-- dwc:recordNumber: string (nullable = true)
 |    |-- dwc:recordedBy: string (nullable = true)
 |    |-- dwc:reproductiveCondition: string (nullable = true)
 |    |-- dwc:rights: string (nullable = true)
 |    |-- dwc:rightsHolder: string (nullable = true)
 |    |-- dwc:samplingEffort: string (nullable = true)
 |    |-- dwc:samplingProtocol: string (nullable = true)
 |    |-- dwc:scientificName: string (nullable = true)
 |    |-- dwc:scientificNameAuthorship: string (nullable = true)
 |    |-- dwc:scientificNameID: string (nullable = true)
 |    |-- dwc:sex: string (nullable = true)
 |    |-- dwc:specificEpithet: string (nullable = true)
 |    |-- dwc:startDayOfYear: string (nullable = true)
 |    |-- dwc:stateProvince: string (nullable = true)
 |    |-- dwc:subgenus: string (nullable = true)
 |    |-- dwc:taxonID: string (nullable = true)
 |    |-- dwc:taxonRank: string (nullable = true)
 |    |-- dwc:taxonRemarks: string (nullable = true)
 |    |-- dwc:taxonomicStatus: string (nullable = true)
 |    |-- dwc:typeStatus: string (nullable = true)
 |    |-- dwc:verbatimCoordinateSystem: string (nullable = true)
 |    |-- dwc:verbatimCoordinates: string (nullable = true)
 |    |-- dwc:verbatimDepth: string (nullable = true)
 |    |-- dwc:verbatimElevation: string (nullable = true)
 |    |-- dwc:verbatimEventDate: string (nullable = true)
 |    |-- dwc:verbatimLatitude: string (nullable = true)
 |    |-- dwc:verbatimLocality: string (nullable = true)
 |    |-- dwc:verbatimLongitude: string (nullable = true)
 |    |-- dwc:verbatimSRS: string (nullable = true)
 |    |-- dwc:verbatimTaxonRank: string (nullable = true)
 |    |-- dwc:vernacularName: string (nullable = true)
 |    |-- dwc:waterBody: string (nullable = true)
 |    |-- dwc:year: string (nullable = true)
 |    |-- fcc:datePicked: string (nullable = true)
 |    |-- fcc:pickedBy: string (nullable = true)
 |    |-- id: string (nullable = true)
 |    |-- idigbio:preservative: string (nullable = true)
 |    |-- idigbio:recordId: string (nullable = true)
 |    |-- idigbio:subfamily: string (nullable = true)
 |    |-- idigbio:substrate: string (nullable = true)
 |    |-- idigbio:superfamily: string (nullable = true)
 |    |-- symbiota:recordEnteredBy: string (nullable = true)
 |    |-- symbiota:verbatimScientificName: string (nullable = true)
 |-- datecollected: timestamp (nullable = true)
 |-- datemodified: timestamp (nullable = true)
 |-- dqs: float (nullable = true)
 |-- earliestageorloweststage: string (nullable = true)
 |-- earliesteonorlowesteonothem: string (nullable = true)
 |-- earliestepochorlowestseries: string (nullable = true)
 |-- earliesteraorlowesterathem: string (nullable = true)
 |-- earliestperiodorlowestsystem: string (nullable = true)
 |-- etag: string (nullable = true)
 |-- eventdate: string (nullable = true)
 |-- family: string (nullable = true)
 |-- fieldnumber: string (nullable = true)
 |-- formation: string (nullable = true)
 |-- genus: string (nullable = true)
 |-- geologicalcontextid: string (nullable = true)
 |-- geopoint: struct (nullable = true)
 |    |-- lat: double (nullable = true)
 |    |-- lon: double (nullable = true)
 |-- group: string (nullable = true)
 |-- hasImage: boolean (nullable = true)
 |-- hasMedia: boolean (nullable = true)
 |-- highertaxon: string (nullable = true)
 |-- highestbiostratigraphiczone: string (nullable = true)
 |-- individualcount: float (nullable = true)
 |-- infraspecificepithet: string (nullable = true)
 |-- institutioncode: string (nullable = true)
 |-- institutionid: string (nullable = true)
 |-- institutionname: string (nullable = true)
 |-- kingdom: string (nullable = true)
 |-- latestageorhigheststage: string (nullable = true)
 |-- latesteonorhighesteonothem: string (nullable = true)
 |-- latestepochorhighestseries: string (nullable = true)
 |-- latesteraorhighesterathem: string (nullable = true)
 |-- latestperiodorhighestsystem: string (nullable = true)
 |-- lithostratigraphicterms: string (nullable = true)
 |-- locality: string (nullable = true)
 |-- lowestbiostratigraphiczone: string (nullable = true)
 |-- maxdepth: float (nullable = true)
 |-- maxelevation: float (nullable = true)
 |-- member: string (nullable = true)
 |-- mindepth: float (nullable = true)
 |-- minelevation: float (nullable = true)
 |-- municipality: string (nullable = true)
 |-- occurrenceid: string (nullable = true)
 |-- order: string (nullable = true)
 |-- phylum: string (nullable = true)
 |-- recordnumber: string (nullable = true)
 |-- recordset: string (nullable = true)
 |-- scientificname: string (nullable = true)
 |-- specificepithet: string (nullable = true)
 |-- startdayofyear: integer (nullable = true)
 |-- stateprovince: string (nullable = true)
 |-- taxonid: string (nullable = true)
 |-- taxonomicstatus: string (nullable = true)
 |-- taxonrank: string (nullable = true)
 |-- typestatus: string (nullable = true)
 |-- uuid: string (nullable = true)
 |-- verbatimeventdate: string (nullable = true)
 |-- verbatimlocality: string (nullable = true)
 |-- version: integer (nullable = true)
 |-- waterbody: string (nullable = true)


In [5]:
notes = (idb_df
         .select(sql.concat_ws(" ", idb_df["data.dwc:occurrenceRemarks"],
                        idb_df["data.dwc:eventRemarks"],
                        idb_df["data.dwc:fieldNotes"]
                        )
                        .alias("note"),
                 idb_df["uuid"]
                 )
         .where(sql.column("note") != "")
         )

In [6]:
print(notes.head())
print(notes.count())


Row(note='On sandy soil over rocks.', uuid='b73cb633-3eb9-43a1-a0c6-08b07805699a')
27667

In [7]:
import nltk
from nltk.corpus import stopwords

stopwords_set = set(stopwords.words('english'))
t = nltk.tokenize.treebank.TreebankWordTokenizer() 

def tokenize(s):
    '''
    Take a string and return a list of tokens split out from it
    with the nltk library
    '''

    # word_tokenize uses PunktSentenceTokenizer first, then
    # treebank_word_tokenizer on those so can get nested
    # lists.
    #return nltk.tokenize.word_tokenize(s)

    # this is just the treebank tokenizer
    return [word for word in t.tokenize(s) if word not in stopwords_set]

udf_tokenize = sql.udf(tokenize, types.ArrayType(types.StringType()))

print(tokenize('a-uuid-with-dashes Some isn\'t "99", nor is it good; only bad. 3kas9203h: CAT32423432'))


['a-uuid-with-dashes', 'Some', "n't", '``', '99', "''", ',', 'good', ';', 'bad.', '3kas9203h', ':', 'CAT32423432']

In [8]:
tokens = (notes
         .withColumn("tokens", udf_tokenize(notes["note"]))
         .select(notes["uuid"],
                 sql.explode(sql.column("tokens")).alias("token")
                 )
          )

In [9]:
print(tokens.head(5))
print(tokens.count())


[Row(uuid='b73cb633-3eb9-43a1-a0c6-08b07805699a', token='On'), Row(uuid='b73cb633-3eb9-43a1-a0c6-08b07805699a', token='sandy'), Row(uuid='b73cb633-3eb9-43a1-a0c6-08b07805699a', token='soil'), Row(uuid='b73cb633-3eb9-43a1-a0c6-08b07805699a', token='rocks'), Row(uuid='b73cb633-3eb9-43a1-a0c6-08b07805699a', token='.')]
263228

In [10]:
idb_tf_index = (tokens
               .groupBy(tokens["uuid"], tokens["token"])
               .count()
               )

In [11]:
print(idb_tf_index.head(10))
print(idb_tf_index.count())


[Row(uuid='b73d735c-43b4-4a45-85af-4a684ceb4e8c', token='marshy', count=1), Row(uuid='b73f5953-6c64-40fb-a056-f407f88605c5', token='Moist', count=1), Row(uuid='b74692e2-73f8-4b13-93e8-a5a5d3b7e67f', token='menziesii', count=1), Row(uuid='b748a569-e93e-403b-a59c-1538adee6c22', token='2', count=1), Row(uuid='b748a569-e93e-403b-a59c-1538adee6c22', token='5.5', count=1), Row(uuid='b74a0670-ff75-4dfc-aa04-97e7774936fa', token='anthers', count=1), Row(uuid='b74a8cee-d5a6-4c57-9634-928be0c74ee5', token='Tech', count=1), Row(uuid='b74b595d-efb3-4c92-9f3f-5ac6b8b47fd0', token=';', count=1), Row(uuid='b74dfbc8-af96-47b0-9c62-1d2fabbc1a96', token='girgensonhnii', count=1), Row(uuid='b74f4dec-ac3c-4bc5-bbe4-aabd3d34e440', token='along', count=1)]
232306

In [12]:
# Everything as one line:

test_df = (idb_df
    .select(sql.concat_ws(" ", idb_df["data.dwc:occurrenceRemarks"],
                          idb_df["data.dwc:eventRemarks"],
                          idb_df["data.dwc:fieldNotes"]
                         )
                        .alias("note"),
                        idb_df["uuid"]
            )
    .where(sql.column("note") != "")
    .withColumn("tokens", udf_tokenize(sql.column("note")))
    .select(sql.column("uuid"),
            sql.explode(sql.column("tokens")).alias("token")
           )
    .groupBy(sql.column("uuid"), sql.column("token"))
    .count()
)

print(test_df.head())
print(test_df.count())


Row(uuid='b73d735c-43b4-4a45-85af-4a684ceb4e8c', token='marshy', count=1)
232306

In [15]:
(test_df
    .write
    .mode("overwrite")
    .parquet("/guoda/data/idigbio-{}-tf-100k.parquet".format(idb_df_version))
)

In [16]:
idb_tf_df = sqlContext.read.parquet("/guoda/data/idigbio-{}-tf-100k.parquet".format(idb_df_version))
idb_tf_df.count()


Out[16]:
232306

In [ ]: