In [1]:
import pyspark.sql.types as types
import pyspark.sql.functions as sql
In [2]:
idb_df_version = "20161119"
In [3]:
idb_df = sqlContext.read.parquet("/guoda/data/idigbio-{0}-100k.parquet".format(idb_df_version))
idb_df.count()
Out[3]:
In [4]:
idb_df.printSchema()
In [5]:
notes = (idb_df
         .select(sql.concat_ws(" ",
                               idb_df["data.dwc:occurrenceRemarks"],
                               idb_df["data.dwc:eventRemarks"],
                               idb_df["data.dwc:fieldNotes"]
                               ).alias("note"),
                 idb_df["uuid"])
         .where(sql.column("note") != "")
         )
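A note on the concatenation: concat_ws skips null inputs rather than nulling the whole result, and it returns an empty string when every input is null, which is what the != "" filter above relies on to drop records that have no remarks at all. Below is a minimal standalone sketch of that behavior; the two-row DataFrame is invented for illustration and is not iDigBio data.
# Hypothetical illustration of concat_ws null handling; not part of the pipeline above.
demo = sqlContext.createDataFrame(
    [("collected on dry slope", "seen at dusk"), ("under oak canopy", None)],
    ["fieldNotes", "eventRemarks"])
demo.select(sql.concat_ws(" ", demo["fieldNotes"], demo["eventRemarks"])
            .alias("note")).show()
# Row 1 -> "collected on dry slope seen at dusk"; row 2 -> "under oak canopy"
# (the null is skipped). If every column were null the result would be "" and
# the != "" filter would drop that record.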
In [6]:
print(notes.head())
print(notes.count())
In [7]:
import nltk
from nltk.corpus import stopwords
stopwords_set = set(stopwords.words('english'))
t = nltk.tokenize.treebank.TreebankWordTokenizer()
def tokenize(s):
    '''
    Take a string and return a list of tokens split out from it
    with the nltk library.
    '''
    # word_tokenize uses PunktSentenceTokenizer first, then
    # treebank_word_tokenizer on the sentences, so it can produce
    # nested lists.
    #return nltk.tokenize.word_tokenize(s)
    # this is just the treebank tokenizer, with English stopwords removed
    return [word for word in t.tokenize(s) if word not in stopwords_set]
udf_tokenize = sql.udf(tokenize, types.ArrayType(types.StringType()))
print(tokenize('a-uuid-with-dashes Some isn\'t "99", nor is it good; only bad. 3kas9203h: CAT32423432'))
In [8]:
tokens = (notes
          .withColumn("tokens", udf_tokenize(notes["note"]))
          .select(notes["uuid"],
                  sql.explode(sql.column("tokens")).alias("token"))
          )
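For context, explode emits one output row per element of the token array, so a record whose note tokenizes to N tokens contributes N rows here, and a record whose token array ends up empty contributes none. A small standalone sketch of that behavior; the uuids and tokens are invented for illustration.
# Hypothetical illustration of explode; not part of the pipeline above.
demo = sqlContext.createDataFrame(
    [("uuid-1", ["dry", "slope"]), ("uuid-2", ["grassland"])],
    ["uuid", "tokens"])
demo.select(demo["uuid"], sql.explode(demo["tokens"]).alias("token")).show()
# uuid-1 yields two rows and uuid-2 yields one; a row with an empty array would yield no rows.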
In [9]:
print(tokens.head(5))
print(tokens.count())
In [10]:
idb_tf_index = (tokens
                .groupBy(tokens["uuid"], tokens["token"])
                .count()
                )
In [11]:
print(idb_tf_index.head(10))
print(idb_tf_index.count())
In [12]:
# Everything as a single chained expression:
test_df = (idb_df
           .select(sql.concat_ws(" ",
                                 idb_df["data.dwc:occurrenceRemarks"],
                                 idb_df["data.dwc:eventRemarks"],
                                 idb_df["data.dwc:fieldNotes"]
                                 ).alias("note"),
                   idb_df["uuid"])
           .where(sql.column("note") != "")
           .withColumn("tokens", udf_tokenize(sql.column("note")))
           .select(sql.column("uuid"),
                   sql.explode(sql.column("tokens")).alias("token"))
           .groupBy(sql.column("uuid"), sql.column("token"))
           .count()
           )
print(test_df.head())
print(test_df.count())
In [15]:
(test_df
 .write
 .mode("overwrite")
 .parquet("/guoda/data/idigbio-{}-tf-100k.parquet".format(idb_df_version))
 )
In [16]:
idb_tf_df = sqlContext.read.parquet("/guoda/data/idigbio-{}-tf-100k.parquet".format(idb_df_version))
idb_tf_df.count()
Out[16]:
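With the term-frequency index written out and reloaded, a natural next step is a sanity check. The aggregation below is a sketch of one such check and is not part of the original notebook: it sums the per-record counts to get overall corpus frequencies for each token.
# Assumed follow-up query, not from the original notebook: most common tokens overall.
(idb_tf_df
 .groupBy("token")
 .agg(sql.sum("count").alias("total"))
 .orderBy(sql.desc("total"))
 .show(20))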