In [6]:
import sys

# Process each line of input read from stdin
for line in sys.stdin:
    # Get the words in each line
    words = line.split()
    # Generate the count for each word
    for word in words:
        # Write the key-value pair to stdout to be processed by
        # the reducer. The key is anything before the first tab
        # character and the value is anything after the first tab
        # character.
        print('{0}\t{1}'.format(word, 1))
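For a hypothetical input line such as "jack be nimble jack be quick" (not taken from the notebook's data), the mapper writes one tab-separated key-value pair per word to stdout:

jack	1
be	1
nimble	1
jack	1
be	1
quick	1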
In [10]:
import sys

curr_word = None
curr_count = 0

# Process each key-value pair from the mapper
for line in sys.stdin:
    # Get the key and value from the current line
    word, count = line.split('\t')
    # Convert the count to an int
    count = int(count)
    # If the current word is the same as the previous word,
    # increment its count; otherwise print the previous word's
    # count to stdout
    if word == curr_word:
        curr_count += count
    else:
        # Write the previous word and its number of occurrences
        # as a key-value pair to stdout
        if curr_word:
            print('{0}\t{1}'.format(curr_word, curr_count))
        curr_word = word
        curr_count = count

# Output the count for the last word
if curr_word == word:
    print('{0}\t{1}'.format(curr_word, curr_count))
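Hadoop Streaming sorts the mapper's output by key before handing it to the reducer, which is why the reducer only needs to compare each word with the previous one. The pair can be exercised without a cluster; the sketch below is a local simulation under that assumption, using a small hypothetical sample in place of stdin and a sorted() call in place of the shuffle-and-sort phase.

# Local simulation of the streaming word count (hypothetical sample data)
sample_lines = ['jack be nimble', 'jack be quick']

# Map phase: emit (word, 1) pairs, then sort by key, as Hadoop's
# shuffle-and-sort phase would between the two scripts
pairs = sorted((word, 1) for line in sample_lines for word in line.split())

# Reduce phase: the same running-total logic as the reducer above
curr_word, curr_count = None, 0
for word, count in pairs:
    if word == curr_word:
        curr_count += count
    else:
        if curr_word:
            print('{0}\t{1}'.format(curr_word, curr_count))
        curr_word, curr_count = word, count
if curr_word:
    print('{0}\t{1}'.format(curr_word, curr_count))

Saved as mapper.py and reducer.py, the same check can be run from a shell with: cat input.txt | python mapper.py | sort | python reducer.py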
In [8]:
from pyspark import SparkContext
import re
import sys

def main():
    # Ensure a search term was supplied at the command line
    if len(sys.argv) != 2:
        sys.stderr.write('Usage: {} <search_term>\n'.format(sys.argv[0]))
        sys.exit()

    # Create the SparkContext
    sc = SparkContext(appName='SparkWordCount')
    # Broadcast the requested term
    requested_movie = sc.broadcast(sys.argv[1])
    # Load the input file
    source_file = sc.textFile('/user/hduser/input/movies')
    # Get the movie title from the second field
    titles = source_file.map(lambda line: line.split('|')[1])
    # Create a map of the normalized title to the raw title
    normalized_title = titles.map(lambda title: (re.sub(r'\s*\(\d{4}\)', '', title).lower(), title))
    # Find all movies matching the requested_movie
    matches = normalized_title.filter(lambda x: requested_movie.value in x[0])
    # Collect all the matching titles
    matching_titles = matches.map(lambda x: x[1]).distinct().collect()
    # Display the result
    print('{} matching titles found:'.format(len(matching_titles)))
    for title in matching_titles:
        print(title)
    sc.stop()

if __name__ == '__main__':
    main()
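The normalization step strips a trailing release year such as " (1995)" and lowercases the title, so a lowercase search term can match titles stored in the "Title (Year)" format used by MovieLens-style data. A quick standalone check of that substitution, using hypothetical sample titles:

import re

# Hypothetical sample titles in the 'Title (Year)' format
for title in ['Toy Story (1995)', 'GoldenEye (1995)']:
    normalized = re.sub(r'\s*\(\d{4}\)', '', title).lower()
    print(normalized)   # 'toy story', then 'goldeneye'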
In [9]:
from pyspark import SparkContext, SparkConf
conf = SparkConf().setAppName(appName).setMaster(master)
sc = SparkContext(conf=conf)
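In the cell above, appName and master are placeholders rather than defined variables: the application name shown in the cluster UI and the cluster URL (or a local-mode string). A minimal concrete sketch with assumed values, running locally on all available cores under an arbitrary name:

from pyspark import SparkContext, SparkConf

# Assumed values: local mode on all cores, application name chosen for illustration
conf = SparkConf().setAppName('ExampleApp').setMaster('local[*]')
sc = SparkContext(conf=conf)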
In [10]:
distFile = sc.textFile("terminal.txt")
In [11]:
lines = sc.textFile("terminal.txt")
lineLengths = lines.map(lambda s: len(s))
totalLength = lineLengths.reduce(lambda a, b: a + b)
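Transformations such as map are lazy, so lineLengths is not materialized until the reduce action runs. If lineLengths were also needed by later actions, adding lineLengths.persist() before the first action would keep it in memory after it is first computed; a sketch along those lines:

lines = sc.textFile("terminal.txt")
lineLengths = lines.map(lambda s: len(s))
lineLengths.persist()   # cache the line lengths after the first computation
totalLength = lineLengths.reduce(lambda a, b: a + b)
maxLength = lineLengths.reduce(lambda a, b: a if a > b else b)   # reuses the cached RDD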
In [13]:
import sys
from pyspark import SparkContext, SparkConf

if __name__ == "__main__":
    # create Spark context with Spark configuration
    conf = SparkConf().setAppName("Spark Count")
    sc = SparkContext(conf=conf)
    # get threshold
    threshold = int(sys.argv[2])
    # read in text file and split each document into words
    tokenized = sc.textFile(sys.argv[1]).flatMap(lambda line: line.split(" "))
    # count the occurrence of each word
    wordCounts = tokenized.map(lambda word: (word, 1)).reduceByKey(lambda v1, v2: v1 + v2)
    # filter out words with fewer than threshold occurrences
    filtered = wordCounts.filter(lambda pair: pair[1] >= threshold)
    # count the characters in the remaining words
    charCounts = filtered.flatMap(lambda pair: pair[0]).map(lambda c: (c, 1)).reduceByKey(lambda v1, v2: v1 + v2)
    list = charCounts.collect()
    print(repr(list)[1:-1])
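This script expects the input path as its first command-line argument and the threshold as its second (for example when launched with spark-submit). Reusing the SparkContext created above, the same pipeline can also be sanity-checked in the notebook with a small in-memory sample; the data and threshold below are hypothetical:

# Hypothetical in-memory sample standing in for the sys.argv[1] input file
sample = ["spark makes counting easy", "counting words with spark"]
threshold = 2
tokenized = sc.parallelize(sample).flatMap(lambda line: line.split(" "))
wordCounts = tokenized.map(lambda word: (word, 1)).reduceByKey(lambda v1, v2: v1 + v2)
filtered = wordCounts.filter(lambda pair: pair[1] >= threshold)
charCounts = filtered.flatMap(lambda pair: pair[0]).map(lambda c: (c, 1)).reduceByKey(lambda v1, v2: v1 + v2)
print(charCounts.collect())   # character counts for the words that met the threshold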