Spark and Python

Spark is another framework that runs on top of HDFS/Hadoop.

It provides an API for many languages, including Python.

In this notebook I will give some examples based on an NGINX-like access log.


In [1]:
from pyspark import SparkContext

sc = SparkContext('local', 'ipynb Example')

In [2]:
import re

file = sc.textFile('hdfs://localhost:8020/user/root/GCD-Week-6-access_log.txt')

In [3]:
# A regex for matching an NGINX access log line.
# The only drawback of this approach is that it does not match every line (unmatched lines show up as None later).
reg = re.compile(r'(?P<ipaddress>\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}) - - \[(?P<dateandtime>\d{2}\/[a-zA-Z]{3}\/\d{4}:\d{2}:\d{2}:\d{2} (\+|\-)\d{4})\] ((\"(?P<method>[A-Z]+) )(?P<url>.+) (HTTP\/1\.1\")) (?P<statuscode>\d{3}) (?P<bytessent>\d+)')

In [4]:
file.top(1)


Out[4]:
['10.99.99.58 - - [17/Jul/2011:10:51:29 -0700] "GET /favicon.ico HTTP/1.1" 200 1406']
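
As a quick sanity check (this snippet is not part of the original run), the regex can be applied to that sample line locally to inspect the named groups:

# Illustrative check of the regex against the sample line shown above
sample = '10.99.99.58 - - [17/Jul/2011:10:51:29 -0700] "GET /favicon.ico HTTP/1.1" 200 1406'
match = reg.match(sample)
if match:
    print(match.groupdict())
    # Expected (roughly): {'ipaddress': '10.99.99.58',
    #                      'dateandtime': '17/Jul/2011:10:51:29 -0700',
    #                      'method': 'GET', 'url': '/favicon.ico',
    #                      'statuscode': '200', 'bytessent': '1406'}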

Count the number of requests per path or address


In [5]:
# First parse the line
# Then get the url component if the line was successfully parsed
# Then map it to a (url, 1) pair, e.g. ('/', 1)
# Then reduce by key to count the occurrences
counts = file.map(lambda line: reg.match(line))\
    .map(lambda group: group.group('url') if group else None)\
    .map(lambda url: (url, 1))\
    .reduceByKey(lambda a, b: a + b)

actual_counts = dict(counts.collect())
# For example:
# '/assets/js/the-associates.js'
actual_counts['/assets/js/the-associates.js']


Out[5]:
217
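
As an aside (not in the original notebook), a single path's count can also be fetched without collecting the whole RDD into a dict, using the lookup action:

# Sketch: lookup returns the list of values stored under a key
counts.lookup('/assets/js/the-associates.js')  # -> [217]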

In [6]:
# This is very similar for the source IP address:
counts_ip = file.map(lambda line: reg.match(line))\
    .map(lambda group: group.group('ipaddress') if group else None)\
    .map(lambda ip: (ip, 1))\
    .reduceByKey(lambda a, b: a + b)

actual_counts_ip = dict(counts_ip.collect())

# For example, 10.99.99.186
actual_counts_ip['10.99.99.186']


Out[6]:
4
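
A small sketch (not part of the original run) of how the busiest client IPs could be listed with takeOrdered, which avoids sorting and collecting the full RDD:

# Sketch: the five most active client IP addresses from counts_ip above
counts_ip.takeOrdered(5, key=lambda pair: -pair[1])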

Ordering


In [7]:
# Ordering is also quite easy to do in Spark. For instance, the most commonly requested files:
counts = file.map(lambda line: reg.match(line))\
    .map(lambda group: group.group('url') if group else None)\
    .map(lambda url: (url, 1))\
    .reduceByKey(lambda a, b: a + b)\
    .sortBy(lambda pair: -pair[1])
# This sorts descending by the second item of each pair, where pair is ('path', count)

actual_counts = counts.collect()

# The first 20 items:
for path, count in actual_counts[:20]:
    print("%s: %d" % (path, count))
    
# Here you can see the None problem: lines the regex fails to match all end up under the None key.
# To fix it, a better regex or another parsing method is needed (see the sketch after the output below).


None: 636479
/assets/css/combined.css: 103709
/assets/js/javascript_combined.js: 99176
/: 92372
/assets/img/home-logo.png: 84132
/assets/css/printstyles.css: 83268
/images/filmpics/0000/3695/Pelican_Blood_2D_Pack.jpg: 83105
/favicon.ico: 63228
/robots.txt: 38732
/images/filmpics/0000/3139/SBX476_Vanquisher_2d.jpg: 35633
/assets/img/search-button.gif: 31834
/images/filmmediablock/290/Harpoon_2d.JPG: 30793
/assets/img/play_icon.png: 30241
/images/filmpics/0000/1421/RagingPhoenix_2DSleeve.jpeg: 27549
/assets/img/x.gif: 25081
/release-schedule/: 24620
/search/: 22744
/assets/img/release-schedule-logo.png: 20518
/release-schedule: 17740
/assets/img/banner/ten-years-banner-white.jpg: 15877
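
A minimal sketch (not from the original notebook) of one way to avoid the None entry: drop the lines the regex does not match before counting.

# Keep only the lines that the regex actually matches
matched = file.map(lambda line: reg.match(line))\
    .filter(lambda match: match is not None)

counts_clean = matched.map(lambda match: (match.group('url'), 1))\
    .reduceByKey(lambda a, b: a + b)\
    .sortBy(lambda pair: -pair[1])

# take(20) returns the first 20 pairs of the already sorted RDD
for path, count in counts_clean.take(20):
    print("%s: %d" % (path, count))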

MapReduce wordcount in Spark


In [8]:
gutenberg_file = sc.textFile('hdfs://localhost:8020/user/root/gutenberg_total.txt')

In [9]:
import string
import sys
sys.path.insert(0, '.')
sys.path.insert(0, './Portfolio')
from MapReduce_code import allStopWords as stopwords

punc = str.maketrans('', '', string.punctuation)


def rem_punctuation(inp):
    return inp.translate(punc)
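
A quick illustrative check (not part of the original run) of what rem_punctuation does:

print(rem_punctuation("it's a test, isn't it?"))  # -> its a test isnt it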

In [10]:
word_count = gutenberg_file.flatMap(lambda line: rem_punctuation(line).lower().split(' '))\
    .filter(lambda word: len(word) > 1)\
    .filter(lambda word: word not in stopwords)\
    .map(lambda word: (word, 1))\
    .reduceByKey(lambda a, b: a + b)\
    .sortBy(lambda pair: -pair[1])

actual_word_count = word_count.collect()

for word, count in actual_word_count[:10]:
    print("%s:\t%d" % (word, count))


one:	49502
said:	49272
now:	31991
will:	28301
us:	28247
time:	26309
like:	26102
project:	25751
can:	24689
back:	24330
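
Since word_count is reused below (it is also written to HDFS), persisting it before the first action avoids recomputing the whole flatMap/filter/reduceByKey chain. A small sketch of what could be added before the collect above:

# Sketch: cache the counted RDD so collect() and the later saveAsTextFile()
# do not each trigger a full recomputation
word_count.cache()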

In [11]:
# Store the result as CSV lines in HDFS
def convert_to_csv(data):
    return ','.join([str(d) for d in data])


csv_lines = word_count.map(convert_to_csv)
csv_lines.saveAsTextFile('hdfs://localhost:8020/user/root/wordcount.csv')

Result:

[root@quickstart /]# hadoop fs -ls wordcount.csv
Found 2 items
-rw-r--r--   3 rick supergroup          0 2017-10-24 09:12 wordcount.csv/_SUCCESS
-rw-r--r--   3 rick supergroup    2544698 2017-10-24 09:12 wordcount.csv/part-00000
[root@quickstart /]# hadoop fs -ls wordcount.csv/part-00000
-rw-r--r--   3 rick supergroup    2544698 2017-10-24 09:12 wordcount.csv/part-00000
[root@quickstart /]# hadoop fs -tail wordcount.csv/part-00000
hlessly,1
lemonp,1
aptp,1
braceletp,1
idref9chapter,1
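
saveAsTextFile writes a directory with one part file per partition (hence the part-00000 above). If a single part file is wanted, the RDD can be coalesced to one partition first; a sketch with a hypothetical output path:

# Sketch: coalesce to a single partition so the output directory contains one part file
csv_lines.coalesce(1).saveAsTextFile('hdfs://localhost:8020/user/root/wordcount_single.csv')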

Note to teacher

  • sc.textFile already returns an RDD object
  • map and flatMap are transformations
  • filter, reduceByKey and sortBy are also transformations; collect, take, top and saveAsTextFile are the actions that actually trigger the computation (a small sketch follows below)
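
To illustrate the transformation/action distinction, a small self-contained sketch (not tied to the log data above): nothing is computed until the action runs.

rdd = sc.parallelize(range(10))              # create an RDD
doubled = rdd.map(lambda x: x * 2)           # transformation: lazily recorded, nothing runs yet
total = doubled.reduce(lambda a, b: a + b)   # action: the job actually executes here
print(total)                                 # 90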
