In [4]:
%matplotlib inline
from __future__ import print_function
import os
from pyspark.sql import SQLContext, Row
import pyspark.sql.functions as sql
import matplotlib.pyplot as plt
import numpy
import math
import seaborn as sns
import nltk
import pyspark.ml.feature as feature
In [1]:
# Load Processed Parquet
sqlContext = SQLContext(sc)  # sc is the SparkContext provided by the notebook kernel
notes = sqlContext.read.parquet("../data/idigbio_notes.parquet")
#idbdf = sqlContext.read.parquet("../data/idigbio-100k/occurrence.txt.parquet")
total_records = notes.count()
print(total_records)
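Before sampling, it is worth confirming the columns the later cells rely on (`document` and `document_len`); a quick schema check:
In [ ]:
# Inspect the parquet schema
notes.printSchema()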
In [2]:
# Work on a 10% sample of the DataFrame to keep iteration fast
notes_sm = notes.sample(withReplacement=False, fraction=0.1)
notes_sm.cache()
print(notes_sm.count())
In [5]:
# How much text is this? (note: ** for exponent; ^ is XOR in Python)
print("Total text in MB")
print(notes_sm.select(sql.sum(notes_sm['document_len'])).collect()[0][0] / 1024**2)
Standard text-processing phases: tokenize, remove stop words, and count token frequencies.
In [6]:
# Tokenizer adds a new column holding the list of tokens for each
# document; it does not produce one row per token.
tokenizer = feature.Tokenizer(inputCol='document', outputCol='tokens')
notes_tokens = tokenizer.transform(notes_sm)
notes_tokens.head()
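Note that Tokenizer only lowercases and splits on whitespace, so punctuation stays attached to words. A sketch of the stricter RegexTokenizer alternative (the pattern here is an assumption, not part of the original pipeline):
In [ ]:
# Hypothetical alternative: split on runs of non-word characters
regex_tok = feature.RegexTokenizer(inputCol='document', outputCol='tokens',
                                   pattern='\\W+')
notes_tokens_re = regex_tok.transform(notes_sm)
notes_tokens_re.head()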
In [7]:
# Flatten to an RDD of (token, 1) pairs for counting
tokens = notes_tokens.rdd.flatMap(lambda x: x['tokens']).map(lambda x: (x, 1))
print(tokens.take(10))
# The Spark SQL cell below converts these pairs to a DataFrame first
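As an aside, the flattening can stay entirely in the DataFrame API via explode, which sidesteps the RDD round-trip (a sketch, assuming Spark >= 1.4 where explode is available):
In [ ]:
# Hypothetical alternative: one row per token, no RDD conversion
token_rows = notes_tokens.select(sql.explode(notes_tokens['tokens']).alias('token'))
token_rows.show(5)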
In [8]:
# Stop word removal would go here; see the StopWordsRemover sketch below
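The stop word phase was left as a stub above; a minimal sketch with pyspark.ml.feature.StopWordsRemover (available from Spark 1.6; the output column name is an assumption):
In [ ]:
# Drop common English stop words from the token lists (sketch)
remover = feature.StopWordsRemover(inputCol='tokens', outputCol='tokens_clean')
notes_clean = remover.transform(notes_tokens)
notes_clean.head()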
In [8]:
from wordcloud import WordCloud
In [11]:
# Convert the (token, 1) pairs to a DataFrame so Spark SQL can aggregate.
# Null and empty tokens caused NullPointerExceptions downstream (in show()
# and toPandas()), so the query filters them out.
tokens_df = tokens.map(lambda kv: Row(token=kv[0])).toDF()
tokens_df.registerTempTable("tokens")
tokens_freq = sqlContext.sql("""
SELECT token, count(token) AS c
FROM tokens
WHERE token IS NOT NULL AND length(token) > 0
GROUP BY token
ORDER BY c DESC
LIMIT 100""")
tokens_freq.show(5)
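With the nulls filtered out, the frequencies convert cleanly to pandas for plotting; a sketch using the seaborn and matplotlib imports from the first cell:
In [ ]:
# Bar chart of the most frequent tokens (sketch)
pd_tokens = tokens_freq.toPandas()
sns.barplot(x='c', y='token', data=pd_tokens.head(20))
plt.show()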
In [9]:
# Back to the RDD version of tokens: reduceByKey gives the same counts
from operator import add
counts = tokens.reduceByKey(add)
print(counts.take(3))
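The WordCloud import above is never used; a minimal sketch tying it to the counts (generate_from_frequencies takes a token -> count dict in recent wordcloud releases; older versions expected a list of tuples):
In [ ]:
# Render a word cloud from the token counts (sketch)
freqs = dict(counts.top(200, key=lambda kv: kv[1]))
wc = WordCloud(width=800, height=400).generate_from_frequencies(freqs)
plt.figure(figsize=(12, 6))
plt.imshow(wc, interpolation='bilinear')
plt.axis('off')
plt.show()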
In [ ]: