In [4]:
%matplotlib inline
from __future__ import print_function
import os
from pyspark.sql import SQLContext, Row
import pyspark.sql.functions as sql
#from pyspark.sql.functions import udf, length
import matplotlib.pyplot as plt
import numpy
import math
import seaborn as sns

import nltk
import pyspark.ml.feature as feature


:0: FutureWarning: IPython widgets are experimental and may change in the future.

In [1]:
# Load Processed Parquet
sqlContext = SQLContext(sc)
notes = sqlContext.read.parquet("../data/idigbio_notes.parquet")
#idbdf = sqlContext.read.parquet("../data/idigbio-100k/occurrence.txt.parquet")
total_records = notes.count()
print(total_records)


3232459

In [2]:
# Small sample of the df
notes_sm = notes.sample(withReplacement=False, fraction=0.1)
notes_sm.cache()
print(notes_sm.count())


322952

In [5]:
# How much text is this?
print("Total text in MB")
print(notes_sm.select(sql.sum(notes_sm['document_len'])).collect()[0][0] / 1024.0 ** 2)


Total text in MB
19.6

Standard phases (a combined sketch follows this list):

  1. tokenize
  2. remove stop words
  3. stem
  4. frequency hist
  5. wordcloud
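
A sketch of how the first two phases chain together with pyspark.ml (not run here; the Pipeline wrapper and the column names are assumptions, and the remaining phases are sketched beside the relevant cells below):

In [ ]:
# Sketch: tokenize and remove stop words in one ml Pipeline
from pyspark.ml import Pipeline

tokenizer = feature.Tokenizer(inputCol='document', outputCol='tokens')
remover = feature.StopWordsRemover(inputCol='tokens', outputCol='tokens_stopped')
prepped = Pipeline(stages=[tokenizer, remover]).fit(notes_sm).transform(notes_sm)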

In [6]:
# Tokenizer adds a column holding the list of tokens for each document; it does not explode tokens into separate rows
tokenizer = feature.Tokenizer()
#print(tokenizer.params)
tokenizer.setParams(inputCol='document', outputCol='tokens')
notes_tokens = tokenizer.transform(notes_sm)
notes_tokens.head()


Out[6]:
Row(uuid=u'57fc3a0d-6459-4bea-af3b-cf5b1bc0895b', occurrenceID=u'urn:lsid:biosci.ohio-state.edu:osuc_occurrences:OSAL__0004567', catalogNumber=u'OSAL 0004567', county=u'Hocking', institutionCode=u'Ohio State University Acarology Laboratory, Columbus, OH (OSAL)', country=u'United States', countryCode=u'', stateProvince=u'Ohio', family=u'', recordedBy=u'Gerdeman, B. S. (Beverly Swaim)', order=u'', specificEpithet=u'', genus=u'', sex=u'undetermined', scientificName=u'Narceolaps annularis', year=u'', month=u'', fieldNotes=u"[ USA: Ohio, Hocking Co. | Little Rocky Hollow| Gerdeman BS | 29 V 1998 | ex narceus | (Diplopoda: Spirobolidae) | BSG98-0529-4 AL5523 ] [ Iphiopsididae | Narceolaelaps annularis | Kethley 1978 | DN | det.: Gerdeman 1998 | Lactophenol | Hoyer's | OSAL0004567 ]", occurrenceRemarks=u'', eventRemarks=u'', document=u"  [ USA: Ohio, Hocking Co. | Little Rocky Hollow| Gerdeman BS | 29 V 1998 | ex narceus | (Diplopoda: Spirobolidae) | BSG98-0529-4 AL5523 ] [ Iphiopsididae | Narceolaelaps annularis | Kethley 1978 | DN | det.: Gerdeman 1998 | Lactophenol | Hoyer's | OSAL0004567 ]", document_len=262, fieldNotes_len=260, eventRemarks_len=0, occurrenceRemarks_len=0, tokens=[u'', u'', u'[', u'usa:', u'ohio,', u'hocking', u'co.', u'|', u'little', u'rocky', u'hollow|', u'gerdeman', u'bs', u'|', u'29', u'v', u'1998', u'|', u'ex', u'narceus', u'|', u'(diplopoda:', u'spirobolidae)', u'|', u'bsg98-0529-4', u'al5523', u']', u'[', u'iphiopsididae', u'|', u'narceolaelaps', u'annularis', u'|', u'kethley', u'1978', u'|', u'dn', u'|', u'det.:', u'gerdeman', u'1998', u'|', u'lactophenol', u'|', u"hoyer's", u'|', u'osal0004567', u']'])

In [7]:
# Flatten to an RDD of (token, 1) pairs
tokens = notes_tokens.flatMap(lambda x: x['tokens']).map(lambda x: (x, 1))
print(tokens.take(10))

# The DataFrame-based cells below need this conversion first
#tokens = tokens.map(lambda pair: Row(token=pair[0])).toDF()
#print(tokens.head(10))


[(u'', 1), (u'', 1), (u'[', 1), (u'usa:', 1), (u'ohio,', 1), (u'hocking', 1), (u'co.', 1), (u'|', 1), (u'little', 1), (u'rocky', 1)]
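
Phase 3 (stemming) never shows up in the cells below even though nltk is imported at the top; a sketch over the (token, 1) pairs, assuming NLTK's SnowballStemmer is installed on the workers:

In [ ]:
# Stem each token, keeping the (token, 1) pair shape from the cell above
from nltk.stem.snowball import SnowballStemmer
stemmer = SnowballStemmer('english')
stemmed = tokens.map(lambda pair: (stemmer.stem(pair[0]), pair[1]))
print(stemmed.take(10))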

In [8]:
# pyspark.ml.feature.StopWordsRemover
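
The placeholder above names the class for phase 2; building on the remover from the pipeline sketch under the phase list, a few of the label-separator tokens that dominate the sample row ('|', '[', ']') could be added to the default stop list (column names are assumptions):

In [ ]:
# Extend the default English stop words with the separator tokens seen in the field notes
remover.setStopWords(remover.getStopWords() + [u'|', u'[', u']'])
notes_stopped = remover.transform(notes_tokens)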

In [8]:
from wordcloud import WordCloud
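
A sketch of phase 5, assuming a word-to-count mapping can be collected (the counts RDD built further down, once it evaluates); note that older wordcloud releases expect a list of (word, count) pairs for generate_from_frequencies, newer ones a dict:

In [ ]:
# Build a wordcloud from the most frequent tokens (assumes counts holds (token, n) pairs)
top_counts = counts.takeOrdered(200, key=lambda kv: -kv[1])
wc = WordCloud(width=800, height=400).generate_from_frequencies(dict(top_counts))
plt.figure(figsize=(12, 6))
plt.imshow(wc)
plt.axis('off')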

In [11]:
#token_freq = tokens.groupBy('token').count()
#.sort(sql.col("count").sql.desc())
#print(token_freq.head(4))
#(group_by_dataframe
#    .count()
#    .filter("`count` >= 10")
#    .sort(col("count").desc()))
# The None/NA handling bug keeps breaking these DataFrame operations.
#token_freq.show(5)

# Note: registerTempTable needs a DataFrame, so the Row/toDF() conversion above must be applied first
tokens.registerTempTable("tokens")
tokens_freq = sqlContext.sql("""
SELECT token, count(token) as c
FROM tokens
WHERE length(token) > 0 AND token IS NOT NULL
GROUP BY token
ORDER BY c DESC
LIMIT 100""")
#print(token_freq.show(4)) # Still null pointer exception!
#pd_tokens = tokens_freq.toPandas() # And happens outright here
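
Phase 4, sketched on the assumption that the null-document problem (see the traceback and the note after it below) is fixed so that tokens_freq can actually be collected; the seaborn/pandas plotting here is an assumption, not something the notebook ran:

In [ ]:
# Plot the top-token frequencies once tokens_freq can be evaluated
pd_tokens = tokens_freq.toPandas()
plt.figure(figsize=(8, 12))
sns.barplot(x='c', y='token', data=pd_tokens)
plt.xlabel('count')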

In [9]:
# Back to using the RDD version of tokens
from operator import add
counts = tokens.reduceByKey(add)
print(counts.take(3))


---------------------------------------------------------------------------
Py4JJavaError                             Traceback (most recent call last)
<ipython-input-9-f7b4486fb994> in <module>()
      2 from operator import add
      3 counts = tokens.reduceByKey(add)
----> 4 print(counts.take(3))

/opt/spark/python/pyspark/rdd.py in take(self, num)
   1295 
   1296             p = range(partsScanned, min(partsScanned + numPartsToTry, totalParts))
-> 1297             res = self.context.runJob(self, takeUpToNumLeft, p)
   1298 
   1299             items += res

/opt/spark/python/pyspark/context.py in runJob(self, rdd, partitionFunc, partitions, allowLocal)
    937         # SparkContext#runJob.
    938         mappedRDD = rdd.mapPartitions(partitionFunc)
--> 939         port = self._jvm.PythonRDD.runJob(self._jsc.sc(), mappedRDD._jrdd, partitions)
    940         return list(_load_from_socket(port, mappedRDD._jrdd_deserializer))
    941 

/opt/spark/python/lib/py4j-0.9.1-src.zip/py4j/java_gateway.py in __call__(self, *args)
    833         answer = self.gateway_client.send_command(command)
    834         return_value = get_return_value(
--> 835             answer, self.gateway_client, self.target_id, self.name)
    836 
    837         for temp_arg in temp_args:

/opt/spark/python/pyspark/sql/utils.py in deco(*a, **kw)
     43     def deco(*a, **kw):
     44         try:
---> 45             return f(*a, **kw)
     46         except py4j.protocol.Py4JJavaError as e:
     47             s = e.java_exception.toString()

/opt/spark/python/lib/py4j-0.9.1-src.zip/py4j/protocol.py in get_return_value(answer, gateway_client, target_id, name)
    308                 raise Py4JJavaError(
    309                     "An error occurred while calling {0}{1}{2}.\n".
--> 310                     format(target_id, ".", name), value)
    311             else:
    312                 raise Py4JError(

Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.runJob.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 11 in stage 9.0 failed 1 times, most recent failure: Lost task 11.0 in stage 9.0 (TID 1283, localhost): java.lang.NullPointerException
	at org.apache.spark.ml.feature.Tokenizer$$anonfun$createTransformFunc$1.apply(Tokenizer.scala:39)
	at org.apache.spark.ml.feature.Tokenizer$$anonfun$createTransformFunc$1.apply(Tokenizer.scala:39)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.apply(Unknown Source)
	at org.apache.spark.sql.execution.Project$$anonfun$1$$anonfun$apply$1.apply(basicOperators.scala:51)
	at org.apache.spark.sql.execution.Project$$anonfun$1$$anonfun$apply$1.apply(basicOperators.scala:49)
	at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
	at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
	at org.apache.spark.api.python.SerDeUtil$AutoBatchedPickler.next(SerDeUtil.scala:119)
	at org.apache.spark.api.python.SerDeUtil$AutoBatchedPickler.next(SerDeUtil.scala:110)
	at scala.collection.Iterator$class.foreach(Iterator.scala:727)
	at org.apache.spark.api.python.SerDeUtil$AutoBatchedPickler.foreach(SerDeUtil.scala:110)
	at org.apache.spark.api.python.PythonRDD$.writeIteratorToStream(PythonRDD.scala:452)
	at org.apache.spark.api.python.PythonRunner$WriterThread$$anonfun$run$3.apply(PythonRDD.scala:280)
	at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1765)
	at org.apache.spark.api.python.PythonRunner$WriterThread.run(PythonRDD.scala:239)

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1431)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1419)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1418)
	at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1418)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:799)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:799)
	at scala.Option.foreach(Option.scala:236)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:799)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1640)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1599)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1588)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:620)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:1832)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:1845)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:1858)
	at org.apache.spark.api.python.PythonRDD$.runJob(PythonRDD.scala:393)
	at org.apache.spark.api.python.PythonRDD.runJob(PythonRDD.scala)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:231)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:381)
	at py4j.Gateway.invoke(Gateway.java:259)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:133)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:209)
	at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.NullPointerException
	at org.apache.spark.ml.feature.Tokenizer$$anonfun$createTransformFunc$1.apply(Tokenizer.scala:39)
	at org.apache.spark.ml.feature.Tokenizer$$anonfun$createTransformFunc$1.apply(Tokenizer.scala:39)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.apply(Unknown Source)
	at org.apache.spark.sql.execution.Project$$anonfun$1$$anonfun$apply$1.apply(basicOperators.scala:51)
	at org.apache.spark.sql.execution.Project$$anonfun$1$$anonfun$apply$1.apply(basicOperators.scala:49)
	at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
	at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
	at org.apache.spark.api.python.SerDeUtil$AutoBatchedPickler.next(SerDeUtil.scala:119)
	at org.apache.spark.api.python.SerDeUtil$AutoBatchedPickler.next(SerDeUtil.scala:110)
	at scala.collection.Iterator$class.foreach(Iterator.scala:727)
	at org.apache.spark.api.python.SerDeUtil$AutoBatchedPickler.foreach(SerDeUtil.scala:110)
	at org.apache.spark.api.python.PythonRDD$.writeIteratorToStream(PythonRDD.scala:452)
	at org.apache.spark.api.python.PythonRunner$WriterThread$$anonfun$run$3.apply(PythonRDD.scala:280)
	at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1765)
	at org.apache.spark.api.python.PythonRunner$WriterThread.run(PythonRDD.scala:239)
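
The NullPointerException is thrown inside Tokenizer's transform function (Tokenizer.scala:39), which fails on rows where the input column is null; the tokenized data is lazy, so the failure only surfaces here when reduceByKey forces a full pass over the sample. A sketch of a guard, reusing the names from the earlier cells:

In [ ]:
# Drop (or fill) null documents before tokenizing, then rebuild the token pairs
notes_nonnull = notes_sm.where(sql.col('document').isNotNull())
#notes_nonnull = notes_sm.na.fill({'document': ''})  # alternative: keep the rows, empty text
notes_tokens = tokenizer.transform(notes_nonnull)
tokens = notes_tokens.flatMap(lambda row: row['tokens']).map(lambda w: (w, 1))
counts = tokens.reduceByKey(add)
print(counts.take(3))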

In [ ]: