Local Spark Cluster
Import dependencies:
In [1]:
import operator
import pyspark
from pyspark.sql import SparkSession
Initialize a Spark session:
In [2]:
def init_spark():
    session = SparkSession.builder.appName("HelloSparkApp").getOrCreate()
    cntx = session.sparkContext
    return session, cntx

session, cntx = init_spark()
session
Out[2]:
Run a quick sanity check by squaring the numbers 0..10 on the cluster:
In [3]:
# Distribute the numbers 0..10 and square each element in parallel
nums = cntx.parallelize(range(0, 11))
print(nums.map(lambda x: x * x).collect())
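Transformations such as map are lazy; nothing executes until an action like collect or reduce runs. As an illustrative aside (not part of the original pipeline), the same RDD can be filtered and aggregated in one chain:
# Keep only the even squares, then sum them; reduce() triggers execution
even_square_sum = nums.map(lambda x: x * x).filter(lambda x: x % 2 == 0).reduce(operator.add)
print(even_square_sum)  # 0 + 4 + 16 + 36 + 64 + 100 = 220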
Get an RDD containing the lines of the AI-Workshop README:
In [4]:
lines = cntx.textFile('../../AI-Workshop/README.md')
lines.take(10)
Out[4]:
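As a quick size check, count() is an action that returns the total number of lines in the file:
# Total number of lines in the README
print(lines.count())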
Get the number of partitions:
In [5]:
numPartitions = lines.getNumPartitions()
print('Number of partitions storing the dataset: {}'.format(numPartitions))
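The partition count caps the parallelism of later stages, and a small file often lands in just one or two partitions. A minimal sketch of redistributing the data (the target of 4 partitions is an arbitrary example):
# Shuffle the lines into 4 partitions to allow more parallel tasks
repartitioned = lines.repartition(4)
print(repartitioned.getNumPartitions())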
In [6]:
# Split each line into words and assign a frequency of 1 to each word
words = lines.flatMap(lambda line: line.split(" "))
# Filter out empty strings, Markdown tokens and common Russian stop words
stop_words = ['', '*', '##', 'и', 'с', 'в', 'по']
filtered_words = words.filter(lambda x: x.lower() not in stop_words)
# Pair every word with an initial count of 1
word_tuples = filtered_words.map(lambda word: (word, 1))
# Sum the counts per word
counts = word_tuples.reduceByKey(operator.add)  # same as .reduceByKey(lambda a, b: a + b)
# Sort by frequency, most frequent words first
sorted_counts = counts.sortBy(lambda x: x[1], ascending=False)
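For a vocabulary this small, the same tally could be produced in one step with countByValue(), which ships all counts back to the driver as a plain dict; the reduceByKey pipeline above is the variant that scales. A minimal sketch:
# One-step alternative: materializes every count on the driver
freqs = filtered_words.countByValue()
print(sorted(freqs.items(), key=lambda kv: kv[1], reverse=True)[:5])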
In [7]:
# Get an iterator over the counts and print each word with its frequency
for word, count in sorted_counts.toLocalIterator():
    print('{} --> {}'.format(word, count))
In [8]:
# Import the pyspark.sql Row class
from pyspark.sql import Row
# Create a DataFrame from Rows
word_counts_rows = sorted_counts.map(lambda p: Row(word=p[0], count=int(p[1])))
word_counts_table = session.createDataFrame(word_counts_rows)
# Register a temporary view so the data can be queried with SQL
word_counts_table.createOrReplaceTempView("word_count")
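To confirm what createDataFrame inferred before querying, printSchema() lists the columns and their types:
# Inspect the inferred schema (word: string, count: long)
word_counts_table.printSchema()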
Get data from the word_count view:
In [9]:
word_counts_dt = session.sql('SELECT word, count FROM word_count WHERE length(word) > 2 and count > 1')
word_counts_dt.toPandas()
Out[9]:
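The same filter can be expressed with the DataFrame API instead of SQL; a minimal equivalent sketch:
from pyspark.sql.functions import col, length
# Equivalent to the SQL above: words longer than 2 characters seen more than once
word_counts_table.filter((length(col('word')) > 2) & (col('count') > 1)).toPandas()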
In [10]:
from wordcloud import WordCloud, STOPWORDS, ImageColorGenerator
import matplotlib.pyplot as plt
%matplotlib inline
Read the word statistics from the Spark table and draw a word cloud:
In [11]:
# Convert the Spark result into a plain {word: count} dict
words_stats = word_counts_dt.toPandas()
words_stats = words_stats.set_index('word').to_dict()['count']
# Fit word cloud
wordcloud = WordCloud().fit_words(words_stats)
# Display the generated image:
plt.imshow(wordcloud, interpolation='bilinear')
plt.axis("off")
plt.show()
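To keep the image alongside the notebook, the cloud can also be written straight to an image file (the file name below is an arbitrary example):
# Save the rendered cloud; to_file() infers the format from the extension
wordcloud.to_file('word_cloud.png')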