In [1]:
from pyspark import SparkContext
sc = SparkContext('local', 'ipynb Example')
In [2]:
import re
# Load the web server access log from HDFS
file = sc.textFile('hdfs://localhost:8020/user/root/GCD-Week-6-access_log.txt')
In [3]:
# A regex for matching the nginx access log lines.
# One limitation of this approach is that the pattern does not match every line in the log.
reg = re.compile(r'(?P<ipaddress>\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}) - - \[(?P<dateandtime>\d{2}\/[a-zA-Z]{3}\/\d{4}:\d{2}:\d{2}:\d{2} (\+|\-)\d{4})\] ((\"(?P<method>[A-Z]+) )(?P<url>.+) (HTTP\/1\.1\")) (?P<statuscode>\d{3}) (?P<bytessent>\d+)')
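Before building the pipeline, it can be worth quantifying how many lines the pattern actually misses. A minimal sketch (not part of the original run), assuming the file RDD and reg defined above:
# Count the lines that the regex fails to match (sketch)
unmatched = file.filter(lambda line: reg.match(line) is None).count()
total = file.count()
print("%d of %d lines did not match" % (unmatched, total))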
In [4]:
# Peek at a sample line from the log (top() returns the lexicographically largest line)
file.top(1)
Out[4]:
In [5]:
# First parse the line with the regex
# Then take the url component if the line was successfully parsed (None otherwise)
# Then map each url to a count of one, e.g. ('/', 1)
# Then reduce by key to count the occurrences
counts = file.map(lambda line: reg.match(line))\
    .map(lambda group: group.group('url') if group else None)\
    .map(lambda url: (url, 1))\
    .reduceByKey(lambda a, b: a + b)
actual_counts = dict(counts.collect())
# For example:
# '/assets/js/the-associates.js'
actual_counts['/assets/js/the-associates.js']
Out[5]:
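If only a few specific paths are needed, collecting the full dictionary is not strictly necessary; an RDD lookup can fetch the values for a single key. A small sketch using the same counts RDD (not part of the original run):
# Fetch the count for a single path without collecting everything (sketch)
counts.lookup('/assets/js/the-associates.js')  # returns a one-element list holding the count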
In [6]:
# The same approach works for the source IP address:
counts_ip = file.map(lambda line: reg.match(line))\
    .map(lambda group: group.group('ipaddress') if group else None)\
    .map(lambda ip: (ip, 1))\
    .reduceByKey(lambda a, b: a + b)
actual_counts_ip = dict(counts_ip.collect())
# For example, 10.99.99.186
actual_counts_ip['10.99.99.186']
Out[6]:
In [7]:
# Ordering is also easy to do in Spark. For instance, to find the most commonly requested files:
counts = file.map(lambda line: reg.match(line))\
    .map(lambda group: group.group('url') if group else None)\
    .map(lambda url: (url, 1))\
    .reduceByKey(lambda a, b: a + b)\
    .sortBy(lambda pair: -pair[1])
# This sorts descending on the second pair item, where pair is ('path', count)
actual_counts = counts.collect()
# The first 20 items:
for path, count in actual_counts[:20]:
    print("%s: %d" % (path, count))
# Here you can see the None problem: every line the regex failed to match ends up under the single None key.
# To fix it, either improve the regex or use another parsing method.
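A low-effort way to keep those unparsed lines out of the counts, short of writing a better regex, is to filter out the non-matching lines before extracting the url. A sketch under the same assumptions as the cell above (not part of the original run):
# Same pipeline, but lines the regex cannot parse are dropped instead of becoming a None key (sketch)
counts_clean = file.map(lambda line: reg.match(line))\
    .filter(lambda match: match is not None)\
    .map(lambda match: (match.group('url'), 1))\
    .reduceByKey(lambda a, b: a + b)\
    .sortBy(lambda pair: -pair[1])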
In [8]:
# Load the combined Gutenberg text file from HDFS
gutenberg_file = sc.textFile('hdfs://localhost:8020/user/root/gutenberg_total.txt')
In [9]:
import string
import sys
# Make the stop word list from the MapReduce_code module importable
sys.path.insert(0, '.')
sys.path.insert(0, './Portfolio')
from MapReduce_code import allStopWords as stopwords
# Translation table that strips all ASCII punctuation characters
punc = str.maketrans('', '', string.punctuation)
def rem_punctuation(inp):
    return inp.translate(punc)
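As a quick local sanity check of the helper (plain Python, nothing Spark-specific; not part of the original run):
# Punctuation is stripped, everything else is left untouched (sketch)
rem_punctuation("Hello, world!")  # -> 'Hello world'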
In [10]:
word_count = gutenberg_file.flatMap(lambda line: rem_punctuation(line).lower().split(' '))\
    .filter(lambda word: len(word) > 1)\
    .filter(lambda word: word not in stopwords)\
    .map(lambda word: (word, 1))\
    .reduceByKey(lambda a, b: a + b)\
    .sortBy(lambda pair: -pair[1])
actual_word_count = word_count.collect()
for word, count in actual_word_count[:10]:
    print("%s:\t%d" % (word, count))
In [11]:
# Store the result as comma-separated lines in HDFS
def convert_to_csv(data):
    return ','.join([str(d) for d in data])
csv_lines = word_count.map(convert_to_csv)
csv_lines.saveAsTextFile('hdfs://localhost:8020/user/root/wordcount.csv')
Result:
[root@quickstart /]# hadoop fs -ls wordcount.csv
Found 2 items
-rw-r--r-- 3 rick supergroup 0 2017-10-24 09:12 wordcount.csv/_SUCCESS
-rw-r--r-- 3 rick supergroup 2544698 2017-10-24 09:12 wordcount.csv/part-00000
[root@quickstart /]# hadoop fs -ls wordcount.csv/part-00000
-rw-r--r-- 3 rick supergroup 2544698 2017-10-24 09:12 wordcount.csv/part-00000
[root@quickstart /]# hadoop fs -tail wordcount.csv/part-00000
hlessly,1
lemonp,1
aptp,1
braceletp,1
idref9chapter,1
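Because saveAsTextFile writes one part file per partition, the output above is a directory rather than a single CSV file. If one file is preferred, the RDD could be coalesced to a single partition before saving; a sketch with an illustrative output path (not part of the original run):
# Write all CSV lines into a single part file (sketch; the output path is hypothetical)
csv_lines.coalesce(1).saveAsTextFile('hdfs://localhost:8020/user/root/wordcount_single.csv')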
In [ ]: