In [ ]:
fruits = ['apple', 'orange', 'banana', 'grape', 'watermelon', 'apple', 'orange', 'apple']
number_partitions = 4
dataRDD = sc.parallelize(fruits, number_partitions)
print type(dataRDD)
There are 8 elements in the RDD
These are the first two:
['apple', 'orange']
These are the first two, alphabetically ordered:
['apple', 'apple']
In [ ]:
N_data = dataRDD.<COMPLETAR>()
print "There are %d elements in the RDD\n" % N_data
print "These are the first two:"
print dataRDD.<COMPLETAR>(2)
print "\nThese are the first two, alphabetically ordered:"
print dataRDD.<COMPLETAR>(2)
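One way to fill in the blanks, consistent with the expected output above: count() for the number of elements, take(2) for the first two in partition order, and takeOrdered(2) for the first two in natural (alphabetical) order:

N_data = dataRDD.count()       # number of elements in the RDD
print dataRDD.take(2)          # first two elements, in partition order
print dataRDD.takeOrdered(2)   # first two elements, alphabetically ordered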
Testing the function:
apple fruit
These are all the elements in the RDD:
['apple fruit', 'orange fruit', 'banana fruit', 'grape fruit', 'watermelon fruit', 'apple fruit', 'orange fruit', 'apple fruit']
In [ ]:
def complete_word(word):
    return <COMPLETAR>
print "Testing the function:"
print complete_word('apple')
dataRDDprocessed = dataRDD.map(<COMPLETAR>)
print "\nThese are all the elements in the RDD:"
print dataRDDprocessed.<COMPLETAR>()
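A possible completion, matching the expected output shown above: the function appends the literal ' fruit', map() receives the function itself, and collect() brings the whole RDD back as a Python list:

def complete_word(word):
    # append the literal ' fruit' to the input word
    return word + ' fruit'

dataRDDprocessed = dataRDD.map(complete_word)  # apply the function to every element
print dataRDDprocessed.collect()               # retrieve all elements to the driver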
In [ ]:
dataRDDprocessed_lambda = dataRDD.map(lambda x: x + ' fruit')
print "Result with a lambda function:"
print dataRDDprocessed_lambda.<COMPLETAR>()
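The remaining blank is again collect(), which returns the whole RDD as a list:

print dataRDDprocessed_lambda.collect()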
In [ ]:
wordLengths = (dataRDDprocessed_lambda
               .map(<COMPLETAR>)
               .collect())
print wordLengths
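A possible completion: mapping each string to its length with the built-in len:

wordLengths = (dataRDDprocessed_lambda
               .map(len)        # 'apple fruit' -> 11, etc.
               .collect())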
<type 'str'>
apple orange banana grape watermelon apple orange apple
<type 'str'>
apple orange banana grape watermelon apple orange apple
In [ ]:
string1 = " ".join(<COMPLETAR>)
print type(string1)
print string1
string2 = dataRDD.reduce(lambda x, y: <COMPLETAR>)
print type(string2)
print string2
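One way to fill the blanks; both variants build the same space-separated string, the first by joining the collected list on the driver, the second by pairwise concatenation inside the cluster (concatenation is associative, which reduce requires):

string1 = " ".join(dataRDD.collect())               # join the collected list locally
string2 = dataRDD.reduce(lambda x, y: x + ' ' + y)  # concatenate pairwise with reduce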
In [ ]:
Nchars = sum(dataRDD.<COMPLETAR>)
print Nchars
Nchars = dataRDD.map(len).reduce(<COMPLETAR>)
print Nchars
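A possible completion: the first blank stands for the whole map-and-collect chain, so the lengths are summed locally; the second sums them distributed with reduce:

Nchars = dataRDD.map(len).collect()                    # then: sum(...) on the driver
Nchars = dataRDD.map(len).reduce(lambda x, y: x + y)   # or sum inside the cluster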
Pair RDDs store each element as a tuple (k, v), where k is the key and v is the value.
[('apple', 1), ('orange', 1), ('banana', 1), ('grape', 1), ('watermelon', 1), ('apple', 1), ('orange', 1), ('apple', 1)]
Grouped pairs as an iterable:
[('orange', <pyspark.resultiterable.ResultIterable object at ...>), ('watermelon', <pyspark.resultiterable.ResultIterable object at ...>), ('grape', <pyspark.resultiterable.ResultIterable object at ...>), ('apple', <pyspark.resultiterable.ResultIterable object at ...>), ('banana', <pyspark.resultiterable.ResultIterable object at ...>)]
Grouped pairs as a list:
[('orange', [1, 1]), ('watermelon', [1]), ('grape', [1]), ('apple', [1, 1, 1]), ('banana', [1])]
Grouped pairs + count:
[('orange', 2), ('watermelon', 1), ('grape', 1), ('apple', 3), ('banana', 1)]
In [ ]:
pairRDD = dataRDD.map(lambda x: (x, 1))
print pairRDD.collect()
In [ ]:
print "Result: (key, iterable):"
groupedRDD = pairRDD.groupByKey()
print groupedRDD.collect()
print " "
print "Result: (key, list of results):"
groupedRDDprocessed = groupedRDD.mapValues(list)
print groupedRDDprocessed.collect()
print " "
print "Result: (key, count):"
groupedRDDprocessed = groupedRDD.mapValues(len)
print groupedRDDprocessed.collect()
print " "
Result: (key, count):
[('apple', 1), ('orange', 1)]
[('orange', 2), ('watermelon', 1), ('grape', 1), ('apple', 3), ('banana', 1)]
In [ ]:
print "Result: (key, count):"
countRDD = pairRDD.groupByKey().map(<COMPLETAR>)
print countRDD.collect()
print " "
In [ ]:
print "Result: (key, count):"
countRDD = pairRDD.reduceByKey(<COMPLETAR>)
print countRDD.collect()
print " "
In [ ]:
counts = (dataRDD
          .<COMPLETAR>
          .<COMPLETAR>
          .<COMPLETAR>
          )
print counts
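One way to fill the three blanks, reproducing the (word, count) list in a single chain:

counts = (dataRDD
          .map(lambda x: (x, 1))            # word -> (word, 1)
          .reduceByKey(lambda x, y: x + y)  # add up the ones per word
          .collect()                        # bring the result to the driver
          )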
In [ ]:
N_unique_words = (dataRDD
                  .<COMPLETAR>
                  .<COMPLETAR>
                  .filter(<COMPLETAR>)
                  .count()
                  )
print N_unique_words
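A possible completion, assuming "unique" means words that appear exactly once (banana, grape and watermelon here, so the result would be 3):

N_unique_words = (dataRDD
                  .map(lambda x: (x, 1))
                  .reduceByKey(lambda x, y: x + y)
                  .filter(lambda kv: kv[1] == 1)   # keep words with count 1
                  .count()
                  )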
A text file can be loaded into an RDD using the SparkContext.textFile() method.
In [ ]:
textRDD = sc.textFile('data/shakespeare.txt', 8)
print "Number of lines of text = %d" % textRDD.count()
[(u'', 9493), (u' thou diest in thine unthankfulness, and thine ignorance makes', 1), (u" Which I shall send you written, be assur'd", 1), (u' I do beseech you, take it not amiss:', 1), (u' their mastiffs are of unmatchable courage.', 1), (u' With us in Venice, if it be denied,', 1), (u" Hot. I'll have it so. A little charge will do it.", 1), (u' By what yourself, too, late have spoke and done,', 1), (u" FIRST LORD. He's but a mad lord, and nought but humours sways him.", 1), (u' none will entertain it.', 1)]
In [ ]:
counts = (textRDD
          .map(lambda x: (x, 1))
          .<COMPLETAR>
          .take(10)
          )
print counts
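A possible completion: the blank is the reduction that adds up the ones, so identical lines are counted together (note the 9493 empty lines in the output above):

counts = (textRDD
          .map(lambda x: (x, 1))
          .reduceByKey(lambda x, y: x + y)
          .take(10)
          )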
In [ ]:
counts = (textRDD
          .flatMap(lambda x: x.split())
          .map(<COMPLETAR>)
          .<COMPLETAR>
          .take(10)
          )
print counts
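One way to fill the blanks: after flatMap splits each line into words, the usual pair-and-reduce pattern counts them:

counts = (textRDD
          .flatMap(lambda x: x.split())
          .map(lambda x: (x, 1))            # word -> (word, 1)
          .reduceByKey(lambda x, y: x + y)  # add up the counts per word
          .take(10)
          )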
In [ ]:
counts = (textRDD
          .flatMap(<COMPLETAR>)
          .map(<COMPLETAR>)
          .reduceByKey(<COMPLETAR>)
          .filter(<COMPLETAR>)
          .take(<COMPLETAR>)
          )
print counts
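A sketch of one possible completion. The filter condition and sample size are not specified in the original, so the threshold below (keeping words seen more than once) is only an assumption:

counts = (textRDD
          .flatMap(lambda x: x.split())
          .map(lambda x: (x, 1))
          .reduceByKey(lambda x, y: x + y)
          .filter(lambda kv: kv[1] > 1)   # assumption: drop words that occur once
          .take(10)                       # assumption: sample size
          )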
In [ ]:
counts = (textRDD
          .<COMPLETAR>
          .<COMPLETAR>
          .<COMPLETAR>
          .takeOrdered(5, key=lambda x: <COMPLETAR>)
          )
print counts
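One possible completion: the standard word-count chain again, with takeOrdered and a negated count as key so the most frequent words come first:

counts = (textRDD
          .flatMap(lambda x: x.split())
          .map(lambda x: (x, 1))
          .reduceByKey(lambda x, y: x + y)
          .takeOrdered(5, key=lambda x: -x[1])   # top 5 by descending count
          )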
[(u'the', 27267), (u'and', 25340), (u'i', 19540), (u'to', 18656), (u'of', 17301)]
In [ ]:
def clean_text(string):
    # for now, cleaning just lowercases the string
    string = string.lower()
    return string
In [ ]:
counts = (textRDD
          .flatMap(<COMPLETAR>)
          .map(<COMPLETAR>)
          .map(<COMPLETAR>)
          .reduceByKey(<COMPLETAR>)
          .takeOrdered(<COMPLETAR>)
          )
print counts
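A possible completion: applying clean_text before pairing lowercases every token, which would explain the lowercase words in the five pairs shown just before the clean_text definition:

counts = (textRDD
          .flatMap(lambda x: x.split())
          .map(clean_text)                       # lowercase each word
          .map(lambda x: (x, 1))
          .reduceByKey(lambda x, y: x + y)
          .takeOrdered(5, key=lambda x: -x[1])   # top 5 by descending count
          )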
The database has 40957 words that need cleaning, for example:
[(u"i'll", 1737), (u'you,', 1478), (u"'tis", 1367), (u'sir,', 1235), (u'me,', 1219), (u"th'", 1146), (u'o,', 1008), (u'lord,', 977), (u'come,', 875), (u'me.', 823), (u'you.', 813), (u'why,', 805), (u'now,', 785), (u'it.', 784), (u'him.', 755), (u'lord.', 702), (u'him,', 698), (u'ay,', 661), (u'well,', 647), (u'and,', 647)]
In [ ]:
countsRDD = (textRDD
             .flatMap(<COMPLETAR>)
             .map(<COMPLETAR>)
             .filter(lambda x: not x.isalpha())
             .map(<COMPLETAR>)
             .reduceByKey(<COMPLETAR>)
             )
countsRDD.cache()
print "The database has %d words that need cleaning, for example:\n" % countsRDD.count()
print countsRDD.takeOrdered(20, key=lambda x: -x[1])
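One way to fill the blanks: split, lowercase with clean_text, keep only the strings that are not purely alphabetic (they still carry punctuation), and count them:

countsRDD = (textRDD
             .flatMap(lambda x: x.split())
             .map(clean_text)
             .filter(lambda x: not x.isalpha())   # given in the exercise
             .map(lambda x: (x, 1))
             .reduceByKey(lambda x, y: x + y)
             )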
The database has 0 elements that need preprocessing, for example:
[]
In [ ]:
def new_clean_text(string):
    string = string.lower()
    list_of_chars = ['.', <COMPLETAR>]
    for c in <COMPLETAR>:
        string = string.replace(c, '')
    return string

countsRDD = (textRDD
             .flatMap(<COMPLETAR>)
             .map(new_clean_text)
             .filter(lambda x: not x.isnumeric())
             .filter(lambda x: len(x) > 0)
             .filter(lambda x: not x.isalnum())
             .map(<COMPLETAR>)
             .reduceByKey(<COMPLETAR>)
             )
countsRDD.cache()
Npreprocess = countsRDD.count()
print "The database has %d elements that need preprocessing, for example:" % Npreprocess
print countsRDD.takeOrdered(20, key=lambda x: -x[1])
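A sketch of one possible completion. The exact punctuation list is left open in the exercise; the characters below are only an illustrative guess based on the frequent offenders listed above (commas, periods, apostrophes, and so on), chosen so that the count of remaining non-alphanumeric strings drops to 0:

def new_clean_text(string):
    string = string.lower()
    # assumption: characters to strip; extend the list as needed
    list_of_chars = ['.', ',', ';', ':', '?', '!', "'", '"', '-', '(', ')', '[', ']']
    for c in list_of_chars:
        string = string.replace(c, '')
    return string

countsRDD = (textRDD
             .flatMap(lambda x: x.split())
             .map(new_clean_text)
             .filter(lambda x: not x.isnumeric())
             .filter(lambda x: len(x) > 0)
             .filter(lambda x: not x.isalnum())
             .map(lambda x: (x, 1))
             .reduceByKey(lambda x, y: x + y)
             )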
Processing the dataset to find the 20 most frequent strings:
[(u'the', 27361), (u'and', 26028), (u'i', 20681), (u'to', 19150), (u'of', 17463), (u'a', 14593), (u'you', 13615), (u'my', 12481), (u'in', 10956), (u'that', 10890), (u'is', 9134), (u'not', 8497), (u'with', 7771), (u'me', 7769), (u'it', 7678), (u'for', 7558), (u'be', 6857), (u'his', 6857), (u'your', 6655), (u'this', 6602)]
In [ ]:
print "Processing the dataset to find the 20 most frequent strings:\n"
countsRDDclean = (textRDD
                  .<COMPLETAR>
                  )
countsRDDclean.cache()
print countsRDDclean.takeOrdered(20, key=lambda x: -x[1])
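One possible completion, reading the single blank as the whole cleaning-and-counting chain (the same steps as above, minus the diagnostic filters):

countsRDDclean = (textRDD
                  .flatMap(lambda x: x.split())
                  .map(new_clean_text)
                  .filter(lambda x: len(x) > 0)     # drop words emptied by cleaning
                  .map(lambda x: (x, 1))
                  .reduceByKey(lambda x, y: x + y)
                  )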
In [ ]:
import csv
with open('data/english_stopwords.txt', 'rb') as csvfile:
    reader = csv.reader(csvfile)
    stopwords = []
    for row in reader:
        stopwords.append(row[0].replace("'", '').replace('\t', ''))
stopwords = [unicode(s, "utf-8") for s in stopwords]
print stopwords
These are the most frequent words:
all no lord king good now sir come or let enter love hath man one go upon like say know may make us yet must see tis give can take speak mine first th duke tell time exeunt much think never heart exit queen doth art great hear lady death
In [ ]:
countsRDDclean = (textRDD
                  .<COMPLETAR>
                  .filter(lambda x: <COMPLETAR> stopwords)
                  .<COMPLETAR>
                  )
countsRDDclean.cache()
pairs = countsRDDclean.takeOrdered(50, key=lambda x: -x[1])
#print pairs
words = ' '.join([x[0] for x in pairs])
print "These are the most frequent words:\n"
print words
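A possible completion: the middle blank is a membership test against the stopwords list (x not in stopwords), applied to the cleaned words; the first and last blanks again stand for the cleaning chain and the pair-and-reduce count:

countsRDDclean = (textRDD
                  .flatMap(lambda x: x.split())
                  .map(new_clean_text)
                  .filter(lambda x: len(x) > 0)
                  .filter(lambda x: x not in stopwords)   # drop English stopwords
                  .map(lambda x: (x, 1))
                  .reduceByKey(lambda x, y: x + y)
                  )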
In [ ]: