In [ ]:
#! /usr/bin/env python
import string
import avro.schema
import tarfile
import os
import re
import argparse
import dateutil.parser as parser
from avro.datafile import DataFileWriter
from avro.io import DatumWriter
from email.utils import parseaddr
_HEADER_RE = re.compile(r'(\S+):\s*(.*$)')
_DATE_SUFFIX_RE = re.compile(r'(.+)\s*$')
def strip_extract_newsgroup_header(text):
before, _blankline, after = text.partition('\n\n')
header_text = before.splitlines(True)
header = {}
# Only extract the Date, Newsgroups, Lines, Subject, and Distribution headers
for line in header_text:
m = _HEADER_RE.match(line)
if not m:
continue
key = m.group(1)
if key and key == "Date":
header[key] = parser.parse(re.sub(_DATE_SUFFIX_RE, "", m.group(2))).isoformat()
elif key and key == "Newsgroups":
header[key] = m.group(2).split(",")
elif key and key == "Lines":
header[key] = int(m.group(2))
elif key and key in ["Subject", "Distribution"]:
header[key] = filter(lambda x: x in string.printable, m.group(2))
elif key and key == "From":
#Filter out any non-printable characters from string data, and parse the sender address
header[key] = parseaddr(filter(lambda x: x in string.printable, m.group(2)))[1]
if ("Distribution" not in header):
header["Distribution"] = "NA"
if ("Subject" not in header):
header["Subject"] = "NA"
if ("Lines" not in header):
header["Lines"] = -1
return (header, after)
_QUOTE_RE = re.compile(r'(writes in|writes:|wrote:|says:|said:'
r'|^In article|^Quoted from|^\||^>)')
def strip_newsgroup_quoting(text):
good_lines = [line for line in text.split('\n')
if not _QUOTE_RE.search(line)]
return '\n'.join(good_lines)
def strip_newsgroup_footer(text):
lines = text.strip().split('\n')
for line_num in range(len(lines) - 1, -1, -1):
line = lines[line_num]
if line.strip().strip('-') == '':
break
if line_num > 0:
return '\n'.join(lines[:line_num])
else:
return text
argparser = argparse.ArgumentParser(description = 'Convert and merge raw newsgroup data into a single Avro datafile.')
argparser.add_argument('-topic', dest = 'topic', help = 'the newsgroup topic to process', default = "")
args = argparser.parse_args()
#Each newsgroup article (message) will become a record in the output Avro data file,
#containing the header fields shown below, as well as an
#unstructured Content field containing the body text
_AVRO_SCHEMA = u"""{"namespace": "hortonworks.newsgroup",
"type": "record",
"name": "Message",
"fields": [
{"name": "articleId", "type": "int"},
{"name": "date", "type": "string"},
{"name": "from", "type": "string"},
{"name": "newsgroups", "type": "string"},
{"name": "subject", "type": ["string", "null"]},
{"name": "lines", "type": ["int", "null"]},
{"name": "distribution", "type": ["string", "null"]},
{"name": "content", "type": ["string", "null"]}
]
}"""
_AVRO_FILE_NAME = "newsgroups.avro"
schema = avro.schema.parse(_AVRO_SCHEMA)
writer = DataFileWriter(open(_AVRO_FILE_NAME, "w"), DatumWriter(), schema)
tar = tarfile.open(u"newsgroups.tgz", 'r:gz')
print r'.*' + re.escape(args.topic) + '/(\d+)$'
_FILENAME_RE = re.compile(r'.*' + re.escape(args.topic) + '/(\d+)$')
#Walk through the newgroup files, parsing each file and writing into the target Avro datafile
for tar_info in tar:
# We only want to process newsgroup articles, which have numeric file names. Ignore anything else
if not _FILENAME_RE.match(tar_info.name):
continue
file = tar.extractfile(tar_info)
print "Processing article %s" % (tar_info.name)
#For the content, we want to strip out header, footer,
#quoted content from previous messages,
#and any non-printable characters.
#We will retain a subset of header fields in the final output
data = file.read()
(header, data) = strip_extract_newsgroup_header(data)
data = strip_newsgroup_quoting(data)
data = strip_newsgroup_footer(data)
#Filter out non-printable characters from the content
data = filter(lambda x: x in string.printable, data)
#Write header and content fields to the Avro data file for the current message
writer.append({"articleId": int(_FILENAME_RE.match(tar_info.name).group(1)), "date": header["Date"],
"newsgroups": ",".join(header["Newsgroups"]), "from": header["From"],
"subject": header["Subject"], "lines": header["Lines"],
"distribution": header["Distribution"], "content": data})
file.close()
writer.close()
print "Successfully created %s" % _AVRO_FILE_NAME