In [1]:
from pyspark import SparkContext
from pyspark.sql import SparkSession

# start a local SparkContext and a SparkSession
sc = SparkContext(master='local')
spark = SparkSession.builder \
    .appName("Python Spark SQL basic example") \
    .config("spark.some.config.option", "some-value") \
    .getOrCreate()
Many of the examples in this article are borrowed from Bird et al. (2009); here I try to reproduce the examples from the book with Spark wherever possible.
Refer to the book for more details: Bird, Steven, Ewan Klein, and Edward Loper. Natural Language Processing with Python: Analyzing Text with the Natural Language Toolkit. O'Reilly Media, Inc., 2009.
Create a DataFrame in which each row holds a tokenized text (a list of words).
In [3]:
import pandas as pd
pdf = pd.DataFrame({
    'texts': [['I', 'like', 'playing', 'basketball'],
              ['I', 'like', 'coding'],
              ['I', 'like', 'machine', 'learning', 'very', 'much']]
})
df = spark.createDataFrame(pdf)
df.show(truncate=False)
In [4]:
from pyspark.ml.feature import NGram
from pyspark.ml import Pipeline
# one NGram transformer for each n (bigrams, trigrams and 4-grams)
ngrams = [NGram(n=n, inputCol='texts', outputCol=str(n) + '-grams') for n in [2, 3, 4]]
# build pipeline model
pipeline = Pipeline(stages=ngrams)
# transform data
texts_ngrams = pipeline.fit(df).transform(df)
In [5]:
# display result
texts_ngrams.select('2-grams').show(truncate=False)
texts_ngrams.select('3-grams').show(truncate=False)
texts_ngrams.select('4-grams').show(truncate=False)
In [6]:
from nltk.corpus import gutenberg
gutenberg_fileids = gutenberg.fileids()
gutenberg_fileids
Out[6]:
In [7]:
gutenberg.abspath(gutenberg_fileids[0])
Out[7]:
In [8]:
gutenberg.raw(gutenberg_fileids[0])[:200]
Out[8]:
In [9]:
gutenberg.words()
Out[9]:
In [10]:
len(gutenberg.words())
Out[10]:
In [11]:
gutenberg.sents(gutenberg_fileids[0])
Out[11]:
In [12]:
len(gutenberg.sents(gutenberg_fileids[0]))
Out[12]:
In [13]:
from nltk.corpus import PlaintextCorpusReader
# build a custom corpus from all files under ./data
corpus_data = PlaintextCorpusReader('./data', '.*')
In [14]:
data_fileids = corpus_data.fileids()
data_fileids
Out[14]:
In [15]:
corpus_data.raw('twitter.txt')
Out[15]:
In [16]:
corpus_data.words(fileids='twitter.txt')
Out[16]:
In [17]:
len(corpus_data.words(fileids='twitter.txt'))
Out[17]:
In [18]:
corpus_data.sents(fileids='twitter.txt')
Out[18]:
In [19]:
len(corpus_data.sents(fileids='twitter.txt'))
Out[19]:
In [20]:
from nltk.corpus import wordnet
# collect the names of all synsets for the word 'car'
pdf = pd.DataFrame({
    'car_synsets': [synset.name() for synset in wordnet.synsets('car')]
})
df = spark.createDataFrame(pdf)
df.show()
In [41]:
from pyspark.sql.functions import udf
from pyspark.sql.types import *
from nltk.corpus import wordnet
def lemma_names_from_synset(x):
    synset = wordnet.synset(x)
    return synset.lemma_names()
lemma_names_from_synset('car.n.02')
# synset_lemmas_udf = udf(lemma_names_from_synset, ArrayType(StringType()))
Out[41]:
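The commented-out line above hints at wrapping this function in a Spark UDF. As a minimal sketch (assuming the df of car synsets built earlier is still in scope), the UDF could be applied to the car_synsets column like this:

# sketch: register the lookup function as a UDF returning an array of strings
synset_lemmas_udf = udf(lemma_names_from_synset, ArrayType(StringType()))
# add a column holding the lemma names of each synset
df.withColumn('lemma_names', synset_lemmas_udf(df['car_synsets'])).show(truncate=False)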