Counting words

1.- Creating a simple RDD

We will create a simple RDD and apply some basic operations to it.


In [ ]:
fruits = ['apple', 'orange', 'banana', 'grape', 'watermelon', 'apple', 'orange', 'apple']
number_partitions = 4
dataRDD = sc.parallelize(fruits, number_partitions)
print type(dataRDD)

Exercise: Apply the corresponding operations:

- obtain the total number of elements in the RDD (count)

- print the first two elements in the RDD (take)

- print the first two alphabetically sorted elements in the RDD (takeOrdered)

The answer should be:


There are 8 elements in the RDD

These are the first two:
['apple', 'orange']

These are the first two, alphabetically ordered:
['apple', 'apple']

In [ ]:
N_data = dataRDD.<COMPLETAR>()
print "There are %d elements in the RDD\n" % N_data

print "These are the first two:"
print dataRDD.<COMPLETAR>(2)

print "\nThese are the first two, alphabetically ordered:"
print dataRDD.<COMPLETAR>(2)
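
For reference, one way to fill in the blanks above (a sketch; it assumes the dataRDD created earlier and uses the count, take and takeOrdered actions named in the exercise):

In [ ]:
# Possible completion (sketch)
N_data = dataRDD.count()                  # total number of elements
print "There are %d elements in the RDD\n" % N_data

print "These are the first two:"
print dataRDD.take(2)                     # first two elements, in partition order

print "\nThese are the first two, alphabetically ordered:"
print dataRDD.takeOrdered(2)              # first two elements after sorting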

2.- Simple transformations

Exercise: Define a function 'complete_word' that adds ' fruit' to the input string. Use this function to process all elements in the RDD using map. Print all of the elements in the resulting RDD using collect().

The answer should be:


Testing the function:
apple fruit

These are all the elements in the RDD:
['apple fruit', 'orange fruit', 'banana fruit', 'grape fruit', 'watermelon fruit', 'apple fruit', 'orange fruit', 'apple fruit']

In [ ]:
def complete_word(word):
    return <COMPLETAR>

print "Testing the function:"
print complete_word('apple')

dataRDDprocessed = dataRDD.map(<COMPLETAR>)

print "\nThese are all the elements in the RDD:"
print dataRDDprocessed.<COMPLETAR>()

We will now use a lambda function to do the same task.


In [ ]:
dataRDDprocessed_lambda = dataRDD.map(lambda x: x + ' fruit')

print "Result with a lambda function:"
print dataRDDprocessed_lambda.<COMPLETAR>()

Now let's count the number of characters in every processed word.

The answer should be:


[11, 12, 12, 11, 16, 11, 12, 11]

In [ ]:
wordLengths = (dataRDDprocessed_lambda
                    .map(<COMPLETAR>)
                    .collect())
print wordLengths
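
A possible completion (a sketch): every processed word is mapped to its length with len.

In [ ]:
# Possible completion (sketch): one length per processed word
wordLengths = (dataRDDprocessed_lambda
                    .map(len)
                    .collect())
print wordLengths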

Let's obtain a string with all the words in the original RDD using two different approaches.

Exercise: Complete the code and discuss the results:

The answer should be:


<type 'str'>
apple orange banana grape watermelon apple orange apple
<type 'str'>
apple orange banana grape watermelon apple orange apple

In [ ]:
string1 = " ".join(<COMPLETAR>)
print type(string1)
print string1

string2 = dataRDD.reduce(lambda x, y: <COMPLETAR>)
print type(string2)
print string2
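
A possible completion follows (a sketch): the first approach collects the words to the driver and joins them there, while the second concatenates them inside Spark with reduce.

In [ ]:
# Possible completion (sketch)
string1 = " ".join(dataRDD.collect())                 # join on the driver
print type(string1)
print string1

string2 = dataRDD.reduce(lambda x, y: x + ' ' + y)    # concatenate inside Spark
print type(string2)
print string2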

Exercise: Repeat the scheme above to obtain the total number of characters in the RDD:

The answer should be:


48
48

In [ ]:
Nchars = sum(dataRDD.<COMPLETAR>)
print Nchars

Nchars = dataRDD.map(len).reduce(<COMPLETAR>)
print Nchars
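
One way to complete the cell (a sketch): map every word to its length and add the lengths up, once on the driver and once with reduce.

In [ ]:
# Possible completion (sketch)
Nchars = sum(dataRDD.map(len).collect())              # sum on the driver
print Nchars

Nchars = dataRDD.map(len).reduce(lambda x, y: x + y)  # sum inside Spark
print Nchars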

3.- Creating a pair RDD and counting

Every element of a pair RDD is a tuple (k,v) where k is the key and v is the value.

Exercise: Transform the original RDD into a pair RDD, where the value is always 1.

The answer should be:


[('apple', 1), ('orange', 1), ('banana', 1), ('grape', 1), ('watermelon', 1), ('apple', 1), ('orange', 1), ('apple', 1)]

Grouped pairs as an iterable:
[('orange', <pyspark.resultiterable.ResultIterable object at 0x...>), ('watermelon', <pyspark.resultiterable.ResultIterable object at 0x...>), ('grape', <pyspark.resultiterable.ResultIterable object at 0x...>), ('apple', <pyspark.resultiterable.ResultIterable object at 0x...>), ('banana', <pyspark.resultiterable.ResultIterable object at 0x...>)]

Grouped pairs as a list:
[('orange', [1, 1]), ('watermelon', [1]), ('grape', [1]), ('apple', [1, 1, 1]), ('banana', [1])]

Grouped pairs + count:
[('orange', 2), ('watermelon', 1), ('grape', 1), ('apple', 3), ('banana', 1)]

In [ ]:
pairRDD = dataRDD.map(lambda x: (x, 1))
print pairRDD.collect()

In [ ]:
print "Result: (key, iterable):"
groupedRDD = pairRDD.groupByKey()
print groupedRDD.collect()
print " "

print "Result: (key, list of results):"
groupedRDDprocessed = groupedRDD.mapValues(list)
print groupedRDDprocessed.collect()
print " "

print "Result: (key, count):"
groupedRDDprocessed = groupedRDD.mapValues(len)
print groupedRDDprocessed.collect()
print " "

Exercise: Use groupByKey to count the frequencies of every word (caution: the groupByKey transformation can be very inefficient, since it needs to exchange data among the workers):

The answer should be:


Result: (key, count):
[('apple', 1), ('orange', 1)]
[('orange', 2), ('watermelon', 1), ('grape', 1), ('apple', 3), ('banana', 1)]

In [ ]:
print "Result: (key, count):"

countRDD = pairRDD.groupByKey().map(<COMPLETAR>)

print countRDD.collect()
print " "

Exercise: Repeat the counting using reduceByKey, a much more efficient approach, since it combines values locally at every worker before sharing the results.

The answer should be:


Result: (key, count):
[('orange', 2), ('watermelon', 1), ('grape', 1), ('apple', 3), ('banana', 1)]

In [ ]:
print "Result: (key, count):"
countRDD = pairRDD.reduceByKey(<COMPLETAR>)
print countRDD.collect()
print " "

Exercise: Combine map, reduceByKey and collect to obtain the counts per word:

The answer should be:


[('orange', 2), ('watermelon', 1), ('grape', 1), ('apple', 3), ('banana', 1)]

In [ ]:
counts = (dataRDD
              .<COMPLETAR>
              .<COMPLETAR>
              .<COMPLETAR>
         )
print counts
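
One way to write the whole chain (a sketch, starting again from the original dataRDD):

In [ ]:
# Possible completion (sketch)
counts = (dataRDD
              .map(lambda x: (x, 1))
              .reduceByKey(lambda x, y: x + y)
              .collect()
         )
print counts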

4.- Filtering an RDD

Exercise: Count the number of words that appear only once in the dataset.

The answer should be:


3

In [ ]:
N_unique_words = (dataRDD
                      .<COMPLETAR>
                      .<COMPLETAR>
                      .filter(<COMPLETAR>)
                      .count()
                 )
print N_unique_words
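
A possible completion (a sketch): after counting, a filter keeps only the pairs whose count equals 1.

In [ ]:
# Possible completion (sketch)
N_unique_words = (dataRDD
                      .map(lambda x: (x, 1))
                      .reduceByKey(lambda x, y: x + y)
                      .filter(lambda kv: kv[1] == 1)
                      .count()
                 )
print N_unique_words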

5.- Counting words in a file

We will use the Complete Works of William Shakespeare from Project Gutenberg. To convert a text file into an RDD, we use the SparkContext.textFile() method.


In [ ]:
textRDD = sc.textFile('data/shakespeare.txt', 8)
print "Number of lines of text = %d" % textRDD.count()

Exercise: Use the code written in the previous sections to obtain the counts for every word in the text. Print the first 10 results. Observe the result: is this what we want? What is going wrong?

The answer should be:


[(u'', 9493), (u'    thou diest in thine unthankfulness, and thine ignorance makes', 1), (u"    Which I shall send you written, be assur'd", 1), (u'    I do beseech you, take it not amiss:', 1), (u'    their mastiffs are of unmatchable courage.', 1), (u'    With us in Venice, if it be denied,', 1), (u"  Hot. I'll have it so. A little charge will do it.", 1), (u'     By what yourself, too, late have spoke and done,', 1), (u"  FIRST LORD. He's but a mad lord, and nought but humours sways him.", 1), (u'    none will entertain it.', 1)]

In [ ]:
counts = (textRDD
              .map(lambda x: (x, 1))
              .<COMPLETAR>
              .take(10)
         )
print counts
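
One possible completion (a sketch). Note that each key here is an entire line of text rather than a single word, which is why the result looks wrong.

In [ ]:
# Possible completion (sketch): counting whole lines, not words
counts = (textRDD
              .map(lambda x: (x, 1))
              .reduceByKey(lambda x, y: x + y)
              .take(10)
         )
print counts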

Exercise: Modify the code by introducing a flatMap operation and observe the result.

The answer should be:


[(u'fawn', 11), (u'bishops.', 2), (u'divinely', 1), (u'mustachio', 1), (u'four', 114), (u'reproach-', 1), (u'drollery.', 1), (u'conjuring', 1), (u'slew.', 1), (u'Calen', 1)]

In [ ]:
counts = (textRDD
              .flatMap(lambda x: x.split())
              .map(<COMPLETAR>)
              .<COMPLETAR>
              .take(10)
         )
print counts
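
A possible completion (a sketch): flatMap splits every line into words, so the counting is now done per word.

In [ ]:
# Possible completion (sketch)
counts = (textRDD
              .flatMap(lambda x: x.split())
              .map(lambda w: (w, 1))
              .reduceByKey(lambda x, y: x + y)
              .take(10)
         )
print counts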

Exercise: Modify the code to obtain 5 words that appear exactly 111 times in the text.

The answer should be:


[(u'think,', 111), (u'see,', 111), (u'gone.', 111), (u"King's", 111), (u'having', 111)]

In [ ]:
counts = (textRDD
              .flatMap(<COMPLETAR>)
              .map(<COMPLETAR>)
              .reduceByKey(<COMPLETAR>)
              .filter(<COMPLETAR>)
              .take(<COMPLETAR>)
         )
print counts
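
One way to complete it (a sketch): an extra filter keeps only the pairs whose count is exactly 111.

In [ ]:
# Possible completion (sketch)
counts = (textRDD
              .flatMap(lambda x: x.split())
              .map(lambda w: (w, 1))
              .reduceByKey(lambda x, y: x + y)
              .filter(lambda kv: kv[1] == 111)
              .take(5)
         )
print counts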

Exercise: Modify the code to obtain the 5 words that appear most often in the text.

The answer should be:


[(u'the', 23197), (u'I', 19540), (u'and', 18263), (u'to', 15592), (u'of', 15507)]

In [ ]:
counts = (textRDD
              .<COMPLETAR>
              .<COMPLETAR>
              .<COMPLETAR>
              .takeOrdered(5,key = lambda x: <COMPLETAR>)
         )
print counts
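
A possible completion (a sketch): takeOrdered with the negated count as key returns the most frequent words first.

In [ ]:
# Possible completion (sketch)
counts = (textRDD
              .flatMap(lambda x: x.split())
              .map(lambda w: (w, 1))
              .reduceByKey(lambda x, y: x + y)
              .takeOrdered(5, key=lambda x: -x[1])
         )
print counts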

6.- Cleaning the text

You may see in the results that some words appear in capital letters and that punctuation characters show up as well. We will add a cleaning function to the code to eliminate the unwanted characters. We provide a simple cleaning function that lowercases all the characters.

Exercise: Use it in the code and verify that the word "I" is printed as "i".

Note: Since we are modifying the strings, the counts will differ with respect to the previous values.

The answer should be:


[(u'the', 27267), (u'and', 25340), (u'i', 19540), (u'to', 18656), (u'of', 17301)]

In [ ]:
def clean_text(string):
    string = string.lower()
    return string

In [ ]:
counts = (textRDD
              .flatMap(<COMPLETAR>)
              .map(<COMPLETAR>)
              .map(<COMPLETAR>)
              .reduceByKey(<COMPLETAR>)
              .takeOrdered(<COMPLETAR>)
         )
print counts
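
One possible completion (a sketch): an extra map applies clean_text to every word before it is turned into a pair.

In [ ]:
# Possible completion (sketch)
counts = (textRDD
              .flatMap(lambda x: x.split())
              .map(clean_text)
              .map(lambda w: (w, 1))
              .reduceByKey(lambda x, y: x + y)
              .takeOrdered(5, key=lambda x: -x[1])
         )
print counts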

We will now search for non-alphabetic characters in the dataset. We can use the Python method 'isalpha' to decide whether or not a string is composed only of the characters a-z.
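
For instance, in a Python 2 session like the one used in this notebook:

In [ ]:
# isalpha is True only when every character is a letter
print 'apple'.isalpha()   # True
print "i'll".isalpha()    # False: contains an apostrophe
print 'you,'.isalpha()    # False: contains a comma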

Exercise: Use that function to print the 20 words with non-alphabetic characters that appear most often in the text, and print the total number of strings with non-alphabetic characters.

The answer should be:


The database has 40957 words that need cleaning, for example:

[(u"i'll", 1737), (u'you,', 1478), (u"'tis", 1367), (u'sir,', 1235), (u'me,', 1219), (u"th'", 1146), (u'o,', 1008), (u'lord,', 977), (u'come,', 875), (u'me.', 823), (u'you.', 813), (u'why,', 805), (u'now,', 785), (u'it.', 784), (u'him.', 755), (u'lord.', 702), (u'him,', 698), (u'ay,', 661), (u'well,', 647), (u'and,', 647)]

In [ ]:
countsRDD = (textRDD
              .flatMap(<COMPLETAR>)
              .map(<COMPLETAR>)
              .filter(lambda x: not x.isalpha())
              .map(<COMPLETAR>)
              .reduceByKey(<COMPLETAR>)
              )
countsRDD.cache()

print "The database has %d words that need cleaning, for example:\n" % countsRDD.count()
print countsRDD.takeOrdered(20,key = lambda x: -x[1])

You can now clearly observe all the punctuation symbols that have not been removed yet.

Exercise: Write a new_clean_text function such that all the unwanted symbols are removed. As a hint, we include the code for removing the symbol '.'

The answer should be:


The database has 0 elements that need preprocessing, for example:
[]

In [ ]:
def new_clean_text(string):
    string = string.lower()
    list_of_chars = ['.', <COMPLETAR>]
    for c in <COMPLETAR>:
        string = string.replace(c,'')
    return string    

countsRDD = (textRDD
              .flatMap(<COMPLETAR>)
              .map(new_clean_text)
              .filter(lambda x: not x.isnumeric())
              .filter(lambda x: len(x)>0)  
              .filter(lambda x: not x.isalnum())                     
              .map(<COMPLETAR>)
              .reduceByKey(<COMPLETAR>)
              )
countsRDD.cache()

Npreprocess = countsRDD.count()
print "The database has %d elements that need preprocessing, for example:" % Npreprocess
print countsRDD.takeOrdered(20,key = lambda x: -x[1])
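
One way to complete new_clean_text (a sketch: it removes the ASCII punctuation characters listed in string.punctuation; whether this leaves exactly 0 strings to preprocess depends on the characters actually present in the file):

In [ ]:
# Possible completion (sketch): strip ASCII punctuation characters
import string

def new_clean_text(word):
    word = word.lower()
    list_of_chars = list(string.punctuation)   # '.', ',', "'", '!', '?', ...
    for c in list_of_chars:
        word = word.replace(c, '')
    return word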

Exercise: Now that we have completely cleaned the words, try to find the 20 most frequent cleaned strings.

The answer should be:


Processing the dataset to find the 20 most frequent strings:

[(u'the', 27361), (u'and', 26028), (u'i', 20681), (u'to', 19150), (u'of', 17463), (u'a', 14593), (u'you', 13615), (u'my', 12481), (u'in', 10956), (u'that', 10890), (u'is', 9134), (u'not', 8497), (u'with', 7771), (u'me', 7769), (u'it', 7678), (u'for', 7558), (u'be', 6857), (u'his', 6857), (u'your', 6655), (u'this', 6602)]

In [ ]:
print "Processing the dataset to find the 20 most frequent strings:\n"
countsRDDclean = (textRDD
          .<COMPLETAR>
          )
countsRDDclean.cache()
print countsRDDclean.takeOrdered(20,key = lambda x: -x[1])
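
A possible completion of the full pipeline (a sketch: split, clean, drop empty strings, count, and take the top 20):

In [ ]:
# Possible completion (sketch)
countsRDDclean = (textRDD
          .flatMap(lambda x: x.split())
          .map(new_clean_text)
          .filter(lambda x: len(x) > 0)
          .map(lambda w: (w, 1))
          .reduceByKey(lambda x, y: x + y)
          )
countsRDDclean.cache()
print countsRDDclean.takeOrdered(20, key=lambda x: -x[1])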

7.- Removing stopwords

Many of the most frequent words obtained in the previous section are irrelevant for many tasks; they are known as stop words. We will use a stop list (a list of meaningless words) to filter out those terms.

Exercise: Observe the line used for converting the strings to unicode. This task could be implemented using a "for" loop, but we use what is called a "list comprehension" instead.


In [ ]:
import csv
with open('data/english_stopwords.txt', 'rb') as csvfile:
    
    reader = csv.reader(csvfile)
    stopwords = []
    
    for row in reader:
        stopwords.append(row[0].replace("'",'').replace('\t',''))
        
    stopwords = [unicode(s, "utf-8") for s in stopwords]
    
print stopwords

Exercise: Apply an extra filter that removes the stop words from the calculations. Print the 50 most frequent words (ONLY THE WORDS), separated by blank spaces. Are they informative about Shakespeare's works?

The answer should be:


These are the most frequent words:

all no lord king good now sir come or let enter love hath man one go upon like say know may make us yet must see tis give can take speak mine first th duke tell time exeunt much think never heart exit queen doth art great hear lady death

In [ ]:
countsRDDclean = (textRDD
          .<COMPLETAR>
          .filter(lambda x: <COMPLETAR> stopwords)
          .<COMPLETAR>
          )
countsRDDclean.cache()
pairs = countsRDDclean.takeOrdered(50,key = lambda x: -x[1])
#print pairs

words = ' '.join([x[0] for x in pairs])
print "These are the most frequent words:\n"
print words
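
One possible completion (a sketch): the previous pipeline plus a filter that drops every word found in the stopwords list read above.

In [ ]:
# Possible completion (sketch)
countsRDDclean = (textRDD
          .flatMap(lambda x: x.split())
          .map(new_clean_text)
          .filter(lambda x: len(x) > 0)
          .filter(lambda x: x not in stopwords)
          .map(lambda w: (w, 1))
          .reduceByKey(lambda x, y: x + y)
          )
countsRDDclean.cache()
pairs = countsRDDclean.takeOrdered(50, key=lambda x: -x[1])
print ' '.join([x[0] for x in pairs])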
