This notebook was prepared by Donne Martin. Source and license info is on GitHub.
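mrjob lets you write MapReduce jobs in Python 2.5+ and run them on several platforms. For example, you can test a job locally on your own machine or run it in the cloud on Amazon Elastic MapReduce (EMR), as shown below.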
From PyPI:
pip install mrjob
From source:
python setup.py install
See the Sample Config File section below for additional configuration details.
Sample mrjob code that processes log files on Amazon S3 based on the S3 logging format:
In [ ]:
%%file mr_s3_log_parser.py
import time
from mrjob.job import MRJob
from mrjob.protocol import RawValueProtocol, ReprProtocol
import re
class MrS3LogParser(MRJob):
    """Parses the logs from S3 based on the S3 logging format:
    http://docs.aws.amazon.com/AmazonS3/latest/dev/LogFormat.html

    Aggregates a user's daily requests by user agent and operation.
    Outputs date_time, requester, user_agent, operation, count as one
    DELIMITER-separated line per group. A line that cannot be parsed is
    counted under the key "Error while parsing line: <line>".
    """

    # Exactly one capturing group per field, in the same order as the
    # S3_LOG_* indexes below, so field index n is regex group n + 1.
    # Quoted fields are captured whole (quotes included) by a single group;
    # a nested capturing group here would shift every later field by one.
    LOGPATS = r'(\S+) (\S+) \[(.*?)\] (\S+) (\S+) ' \
              r'(\S+) (\S+) (\S+) ("[^"]*"|-) ' \
              r'(\S+) (\S+) (\S+) (\S+) (\S+) (\S+) ' \
              r'("[^"]*"|-) ("[^"]*"|-)'
    NUM_ENTRIES_PER_LINE = 17
    logpat = re.compile(LOGPATS)

    (S3_LOG_BUCKET_OWNER,
     S3_LOG_BUCKET,
     S3_LOG_DATE_TIME,
     S3_LOG_IP,
     S3_LOG_REQUESTER_ID,
     S3_LOG_REQUEST_ID,
     S3_LOG_OPERATION,
     S3_LOG_KEY,
     # Holds the whole quoted Request-URI field
     # (e.g. "HEAD /key HTTP/1.1"), not only the HTTP method.
     S3_LOG_HTTP_METHOD,
     S3_LOG_HTTP_STATUS,
     S3_LOG_S3_ERROR,
     S3_LOG_BYTES_SENT,
     S3_LOG_OBJECT_SIZE,
     S3_LOG_TOTAL_TIME,
     S3_LOG_TURN_AROUND_TIME,
     S3_LOG_REFERER,
     S3_LOG_USER_AGENT) = range(NUM_ENTRIES_PER_LINE)

    DELIMITER = '\t'

    # We use RawValueProtocol for input to be format agnostic
    # and avoid any type of parsing errors
    INPUT_PROTOCOL = RawValueProtocol

    # We use RawValueProtocol for output so we can output raw lines
    # instead of (k, v) pairs
    OUTPUT_PROTOCOL = RawValueProtocol

    # Encode the intermediate records using repr() instead of JSON, so the
    # record doesn't get Unicode-encoded
    INTERNAL_PROTOCOL = ReprProtocol

    def clean_date_time_zone(self, raw_date_time_zone):
        """Convert an S3 log timestamp into database-friendly strings.

        Converts an entry such as '22/Jul/2013:21:04:17 +0000' to the
        format 'YYYY-MM-DD HH:MM:SS', which is more suitable for loading
        into a database such as Redshift or RDS.

        Note: requires the chars "[ ]" to be stripped prior to input.
        Month names ('%b') are matched using the current locale.

        Returns a tuple (date, date_time, time_zone), for example
        ('2013-07-22', '2013-07-22 21:04:17', '+0000'). The time zone is
        returned as-is (it may be negative, e.g. '-0700'), or '' if absent.

        Raises ValueError if the date or the time of day is malformed.

        TODO: Needs to combine timezone with date as one field
        """
        # Split on the space between the timestamp and the UTC offset,
        # so offsets of either sign are handled.
        date_time_part, _, time_zone_parsed = raw_date_time_zone.partition(" ")
        # strptime validates both the date and the time of day and raises
        # ValueError on malformed input, which callers handle.
        parsed = time.strptime(date_time_part, "%d/%b/%Y:%H:%M:%S")
        converted_date = time.strftime("%Y-%m-%d", parsed)
        date_time = time.strftime("%Y-%m-%d %H:%M:%S", parsed)
        return converted_date, date_time, time_zone_parsed

    def mapper(self, _, line):
        """Emit ((date_time, requester, user_agent, operation), 1) per line.

        Lines that do not match the S3 log format, or whose timestamp
        cannot be parsed, are emitted as
        (("Error while parsing line: <line>",), 1) so they are counted
        rather than silently dropped.
        """
        line = line.strip()
        match = self.logpat.search(line)
        if match is None:
            yield ("Error while parsing line: %s" % line,), 1
            return

        try:
            date, _date_time, _time_zone = self.clean_date_time_zone(
                match.group(1 + self.S3_LOG_DATE_TIME))
        except ValueError:
            yield ("Error while parsing line: %s" % line,), 1
            return

        # Leave the following line of code if
        # you want to aggregate by date
        date_time = date + " 00:00:00"

        requester = match.group(1 + self.S3_LOG_REQUESTER_ID)
        operation = match.group(1 + self.S3_LOG_OPERATION)
        user_agent = match.group(1 + self.S3_LOG_USER_AGENT)
        # The regex captures quoted fields with their quotes; drop them so
        # the output holds the bare user agent (an unquoted "-" is kept).
        if len(user_agent) >= 2 and user_agent[0] == user_agent[-1] == '"':
            user_agent = user_agent[1:-1]

        yield (date_time, requester, user_agent, operation), 1

    def reducer(self, key, values):
        """Emit one DELIMITER-joined line: the key fields, then the count."""
        fields = list(key)
        fields.append(str(sum(values)))
        yield None, self.DELIMITER.join(fields)

    def steps(self):
        """Define the job as a single mapper/reducer step."""
        # MRStep is mrjob's step API; the older MRJob.mr() helper is
        # deprecated in newer mrjob releases.
        from mrjob.step import MRStep
        return [MRStep(mapper=self.mapper, reducer=self.reducer)]
# When executed as a script (e.g. `python mr_s3_log_parser.py -r emr ...`
# or `python mr_s3_log_parser.py input_data.txt`), hand control to mrjob's
# job entry point.
if __name__ == '__main__':
    MrS3LogParser.run()
Run an Amazon Elastic MapReduce (EMR) job on the given input (must be a flat file hierarchy), placing the results in the output (output directory must not exist):
In [ ]:
!python mr_s3_log_parser.py -r emr s3://bucket-source/ --output-dir=s3://bucket-dest/
Run a MapReduce job locally on the specified input file, sending the results to the specified output file:
In [ ]:
!python mr_s3_log_parser.py input_data.txt > output_data.txt
Accompanying unit test:
In [ ]:
%%file test_mr_s3_log_parser.py
from StringIO import StringIO
import unittest2 as unittest
from mr_s3_log_parser import MrS3LogParser
class MrTestsUtil:
    """Test helper that executes an MRJob in-process and yields its output."""

    def run_mr_sandbox(self, mr_job, stdin):
        """Run *mr_job* over *stdin* and yield each decoded output value.

        This is a generator: the job only starts once the caller begins
        iterating. The runner options (such as the runner type) come from
        the arguments *mr_job* was constructed with.
        """
        # Feed the job's input from the supplied file-like object.
        mr_job.sandbox(stdin=stdin)
        # The runner's context manager ensures job cleanup is performed
        # regardless of success or failure.
        with mr_job.make_runner() as job_runner:
            job_runner.run()
            output_lines = job_runner.stream_output()
            for raw_line in output_lines:
                _key, decoded_value = mr_job.parse_output_line(raw_line)
                yield decoded_value
class TestMrS3LogParser(unittest.TestCase):
    """Unit tests for MrS3LogParser, run in-process with the inline runner."""

    mr_job = None
    mr_tests_util = None

    # Truncated log line: it stops after the remote IP, so the S3 log
    # regex cannot match it and the job must report a parse error.
    RAW_LOG_LINE_INVALID = \
        '00000fe9688b6e57f75bd2b7f7c1610689e8f01000000' \
        '00000388225bcc00000 ' \
        's3-storage [22/Jul/2013:21:03:27 +0000] ' \
        '00.111.222.33 '

    RAW_LOG_LINE_VALID = \
        '00000fe9688b6e57f75bd2b7f7c1610689e8f01000000' \
        '00000388225bcc00000 ' \
        's3-storage [22/Jul/2013:21:03:27 +0000] ' \
        '00.111.222.33 ' \
        'arn:aws:sts::000005646931:federated-user/user 00000AB825500000 ' \
        'REST.HEAD.OBJECT user/file.pdf ' \
        '"HEAD /user/file.pdf?versionId=00000XMHZJp6DjM9x500000' \
        '00000SDZk ' \
        'HTTP/1.1" 200 - - 4000272 18 - "-" ' \
        '"Boto/2.5.1 (darwin) USER-AGENT/1.0.14.0" ' \
        '00000XMHZJp6DjM9x5JVEAMo8MG00000'

    DATE_TIME_ZONE_INVALID = "AB/Jul/2013:21:04:17 +0000"
    DATE_TIME_ZONE_VALID = "22/Jul/2013:21:04:17 +0000"
    DATE_VALID = "2013-07-22"
    DATE_TIME_VALID = "2013-07-22 21:04:17"
    TIME_ZONE_VALID = "+0000"

    def setUp(self):
        # '-r inline' runs the job in this process, so small jobs run
        # faster and stack traces are simpler; '--no-conf' keeps options
        # from a local mrjob.conf out of the test environment; '-' reads
        # input from standard in.
        self.mr_job = MrS3LogParser(['-r', 'inline', '--no-conf', '-'])
        self.mr_tests_util = MrTestsUtil()

    def _run_job(self, raw_input):
        """Run the job over *raw_input* and return all output values."""
        stdin = StringIO(raw_input)
        return list(self.mr_tests_util.run_mr_sandbox(self.mr_job, stdin))

    def test_invalid_log_lines(self):
        results = self._run_job(self.RAW_LOG_LINE_INVALID)
        # Without this check the loop below would pass vacuously if the
        # job produced no output at all.
        self.assertTrue(results)
        for result in results:
            self.assertTrue(result.startswith("Error"))

    def test_valid_log_lines(self):
        results = self._run_job(self.RAW_LOG_LINE_VALID)
        self.assertTrue(results)
        for result in results:
            self.assertNotIn("Error", result)

    def test_clean_date_time_zone(self):
        date, date_time, time_zone_parsed = \
            self.mr_job.clean_date_time_zone(self.DATE_TIME_ZONE_VALID)
        self.assertEqual(date, self.DATE_VALID)
        self.assertEqual(date_time, self.DATE_TIME_VALID)
        self.assertEqual(time_zone_parsed, self.TIME_ZONE_VALID)

        # The call must happen inside assertRaises so the exception is
        # caught and checked rather than failing the test.
        with self.assertRaises(ValueError):
            self.mr_job.clean_date_time_zone(self.DATE_TIME_ZONE_INVALID)
# Run the test cases when executed directly,
# e.g. `python test_mr_s3_log_parser.py -v`.
if __name__ == '__main__':
    unittest.main()
Run the mrjob test:
In [ ]:
!python test_mr_s3_log_parser.py -v
In [ ]:
# Sample mrjob configuration file with settings for the EMR runner.
runners:
  emr:
    # Placeholders: substitute your own AWS credentials and keep real keys
    # out of version control.
    aws_access_key_id: __ACCESS_KEY__
    aws_secret_access_key: __SECRET_ACCESS_KEY__
    aws_region: us-east-1
    # EC2 key pair name and the path to its local private key file.
    ec2_key_pair: EMR
    ec2_key_pair_file: ~/.ssh/EMR.pem
    ssh_tunnel_to_job_tracker: true
    # Cluster shape: EC2 instance types and number of instances.
    ec2_master_instance_type: m3.xlarge
    ec2_instance_type: m3.xlarge
    num_ec2_instances: 5
    # S3 locations for scratch (temporary) files and logs.
    s3_scratch_uri: s3://bucket/tmp/
    s3_log_uri: s3://bucket/tmp/logs/
    enable_emr_debugging: True
    # NOTE(review): several option names here (e.g. s3_scratch_uri,
    # num_ec2_instances) appear to come from an older mrjob release and may
    # have been renamed since -- confirm against the installed mrjob version.
    # Shell commands for cluster setup (presumably run on each node before
    # the job starts -- confirm against mrjob's bootstrap documentation).
    bootstrap:
    - sudo apt-get install -y python-pip
    - sudo pip install --upgrade simplejson