In [ ]:
from __future__ import print_function
In [ ]:
import boto
from boto.s3.connection import S3Connection
from itertools import islice
conn = S3Connection(anon=True)
bucket = conn.get_bucket('aws-publicdatasets')
In [ ]:
# a function to help with looking through S3 buckets
from itertools import islice

def keys_in_bucket(bucket, prefix, truncate_path=True, limit=None):
    """
    given an S3 boto bucket, yield the keys in the bucket with the given prefix;
    if truncate_path is True, remove the prefix from the yielded names;
    optional limit on the number of keys to return
    """
    keys = islice(bucket.list(prefix=prefix,
                              delimiter="/"),
                  limit)
    for key in keys:
        name = key.name.encode("UTF-8")
        if truncate_path:
            name = name.replace(prefix, "", 1)
        yield name
The top-level prefix for the Common Crawl data is s3://aws-publicdatasets/common-crawl/
In [ ]:
!s3cmd ls s3://aws-publicdatasets/common-crawl/
In [ ]:
# using boto to pull up the same listing
list(keys_in_bucket(bucket, "common-crawl/"))
Three eras of CC crawls
Within s3://aws-publicdatasets/common-crawl/, I think of 3 phases: the pre-2012 crawls (crawl-001/ and crawl-002/), the 2012 crawl (parse-output/), and the Nutch-era crawls from 2013 onward (crawl-data/CC-MAIN-*).
Let's look these up.
In [ ]:
# pre - 2012
!s3cmd ls s3://aws-publicdatasets/common-crawl/crawl-001/
In [ ]:
# pre - 2012
!s3cmd ls s3://aws-publicdatasets/common-crawl/crawl-002/
In [ ]:
# 2012: https://commoncrawl.atlassian.net/wiki/display/CRWL/About+the+Data+Set
!s3cmd ls s3://aws-publicdatasets/common-crawl/parse-output/
In [ ]:
!s3cmd ls s3://aws-publicdatasets/common-crawl/crawl-data/ | grep CC-MAIN
In [ ]:
# for the 2012 data that I was working with in 2013:
# it's not a good idea to treat the contents of "common-crawl/parse-output/segment/" as valid segments;
# instead, depend on s3://aws-publicdatasets/common-crawl/parse-output/valid_segments.txt
# commented out so I won't run it
# for (i, key) in enumerate(islice(keys_in_bucket(bucket, "common-crawl/parse-output/segment/"),
#                                  10)):
#     print (i, key)
#!s3cmd ls s3://aws-publicdatasets/common-crawl/parse-output/
In [ ]:
!s3cmd ls s3://aws-publicdatasets/common-crawl/parse-output/valid_segments.txt
In [ ]:
k = bucket.get_key("common-crawl/parse-output/valid_segments.txt")
s = k.get_contents_as_string()
valid_segments = filter(None, s.split("\n"))
print (len(valid_segments), valid_segments[0])
http://commoncrawl.org/april-2014-crawl-data-available/
https://aws-publicdatasets.s3.amazonaws.com/common-crawl/crawl-data/CC-MAIN-2014-15/segment.paths.gz
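Since the bucket is public, segment.paths.gz can also be fetched over plain HTTPS. Here's a small sketch (it uses the requests library, which shows up later in this notebook) that downloads the file and splits it into segment paths:
In [ ]:
# sketch: fetch segment.paths.gz over HTTPS instead of boto
import gzip
import StringIO
import requests

url = "https://aws-publicdatasets.s3.amazonaws.com/common-crawl/crawl-data/CC-MAIN-2014-15/segment.paths.gz"
r = requests.get(url)
# decompress the gzipped body in memory and split into one segment path per line
segment_paths = filter(None, gzip.GzipFile(fileobj=StringIO.StringIO(r.content)).read().split("\n"))
segment_paths[:5]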
In [ ]:
list(keys_in_bucket(bucket, "common-crawl/crawl-data/CC-MAIN-2014-15/"))
Do all of the Nutch-era crawls follow the same structure?
In [ ]:
[key for key in keys_in_bucket(bucket, "common-crawl/crawl-data/") if key.startswith("CC-MAIN")]
In [ ]:
# convert year/week number to datetime
import datetime

def week_in_year(year, week):
    return datetime.datetime(year, 1, 1) + datetime.timedelta((week - 1) * 7)

week_in_year(2014, 15)
In [ ]:
import sys
sys.maxint
In [ ]:
import gzip
import StringIO

# helpers: download a gzipped key from S3 and split its contents into lines
def gzip_from_key(bucket, key_name):
    k = bucket.get_key(key_name)
    f = gzip.GzipFile(fileobj=StringIO.StringIO(k.get_contents_as_string()))
    return f

def segments_from_gzip(gz):
    s = gz.read()
    return filter(None, s.split("\n"))
In [ ]:
# let's parse segment.paths.gz
valid_segments = segments_from_gzip(gzip_from_key(bucket,
                                                  "common-crawl/crawl-data/CC-MAIN-2014-15/segment.paths.gz"))
valid_segments[:5]
In [ ]:
# rewrite to deal with gzip on the fly?
# NOTE: my first attempt here didn't work; a likely cause is that I was passing the boto Bucket
# object where conn.get_bucket() expects a bucket *name* -- building the Key from the Bucket
# directly (as below) sidesteps that
from boto.s3.key import Key
from gzipstream import GzipStreamFile

def gzip_from_key_2(bucket, key_name):
    k = Key(bucket)
    k.key = key_name
    f = GzipStreamFile(k)
    return f

def segments_from_gzip_2(gz):
    BUF_SIZE = 1000
    s = ""
    buffer = gz.read(BUF_SIZE)
    while buffer:
        s += buffer
        buffer = gz.read(BUF_SIZE)
    return filter(None, s.split("\n"))

valid_segments_2 = segments_from_gzip_2(gzip_from_key_2(bucket,
                                                        "common-crawl/crawl-data/CC-MAIN-2014-15/segment.paths.gz"))
valid_segments_2[:5]
All the WAT files are listed in CC-MAIN-2014-15/wat.paths.gz
In [ ]:
wat_segments = segments_from_gzip(gzip_from_key(bucket,
                                                "common-crawl/crawl-data/CC-MAIN-2014-15/wat.paths.gz"))
len(wat_segments)
In [ ]:
wat_segments[0]
http://commoncrawl.org/navigating-the-warc-file-format/
The WAT files hold JSON metadata for each record. I forgot to check how big the file is before downloading it, so let's look at the key's size first.
In [ ]:
k = bucket.get_key(wat_segments[0])
k.size
In [ ]:
def get_key_sizes(bucket, keys):
    sizes = []
    for key in keys:
        k = bucket.get_key(key)
        sizes.append((key, k.size if hasattr(k, 'size') else 0))
    return sizes
In [ ]:
get_key_sizes(bucket, wat_segments[0:20])
In [ ]:
# http://stackoverflow.com/questions/2348317/how-to-write-a-pager-for-python-iterators/2350904#2350904
def grouper(iterable, page_size):
    page = []
    for item in iterable:
        page.append(item)
        if len(page) == page_size:
            yield page
            page = []
    if len(page) > 0:
        yield page
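A quick sanity check of the pager (a tiny example added here):
In [ ]:
# sanity check: paging an iterable into groups of 2
list(grouper(range(5), 2))
# expect [[0, 1], [2, 3], [4]]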
When to use processes vs. threads? I think you use threads when there's no danger of memory collisions.
If we submit "jobs" to different threads, those jobs can be pictured as "sub-tasks" of a single process, and those threads will usually have access to the same memory areas (i.e., shared memory). This approach can easily lead to conflicts in the case of improper synchronization, for example, if threads are writing to the same memory location at the same time.
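As a tiny illustration (an aside added here, separate from the crawl code): two threads bumping a shared counter need exactly the kind of synchronization described above; guarding the increment with a threading.Lock keeps the result predictable.
In [ ]:
# aside: threads share memory, so writes to shared state need synchronization
import threading

shared = {'n': 0}
lock = threading.Lock()

def bump(times):
    for _ in xrange(times):
        # without the lock, the read-modify-write below could interleave between threads
        with lock:
            shared['n'] += 1

threads = [threading.Thread(target=bump, args=(100000,)) for _ in range(2)]
for t in threads:
    t.start()
for t in threads:
    t.join()
print (shared['n'])  # 200000, since the increments are serialized by the lock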
In [ ]:
wat_segments[0:2]
In [ ]:
# with pool.map
from multiprocessing.dummy import Pool as ThreadPool
from functools import partial
get_key_sizes_for_bucket = partial(get_key_sizes, bucket)
PAGE_SIZE = 1
POOL_SIZE = 100
MAX_SEGMENTS = 100
pool = ThreadPool(POOL_SIZE)
results = pool.map(get_key_sizes_for_bucket, grouper(wat_segments[0:MAX_SEGMENTS], PAGE_SIZE))
pool.close()
pool.join()
In [ ]:
# Pool.imap_unordered
from multiprocessing.dummy import Pool as ThreadPool
from functools import partial

get_key_sizes_for_bucket = partial(get_key_sizes, bucket)

PAGE_SIZE = 1
POOL_SIZE = 100
MAX_SEGMENTS = 500  # replace with None for all segments
CHUNK_SIZE = 10

pool = ThreadPool(POOL_SIZE)
results_iter = pool.imap_unordered(get_key_sizes_for_bucket,
                                   grouper(wat_segments[0:MAX_SEGMENTS], PAGE_SIZE),
                                   CHUNK_SIZE)

results = []
for (i, page) in enumerate(islice(results_iter, None)):
    print ('\r>> Result %d' % i, end="")
    for result in page:
        results.append(result)
In [ ]:
results
In [ ]:
from pandas import Series, DataFrame
df = DataFrame(results, columns=['key', 'size'])
df.head()
In [ ]:
df['size'].describe()
In [ ]:
%pylab --no-import-all inline
In [ ]:
import pylab as P
P.hist(df['size'])
In [ ]:
# total byte size of all the wat files
print (format(sum(df['size']),",d"))
In [ ]:
# save results
import csv

with open('CC-MAIN-2014-15.wat.csv', 'wb') as csvfile:
    wat_writer = csv.writer(csvfile, delimiter=',',
                            quotechar='|', quoting=csv.QUOTE_MINIMAL)
    wat_writer.writerow(['key', 'size'])
    for result in results:
        wat_writer.writerow(result)
In [ ]:
!head CC-MAIN-2014-15.wat.csv
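As a quick round-trip check (an extra step I'm adding), the saved CSV can be read back into pandas:
In [ ]:
# read the saved results back in to confirm the round trip
import pandas as pd
df_check = pd.read_csv('CC-MAIN-2014-15.wat.csv')
df_check.head()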
In [ ]:
# this won't work for the version of boto I'm currently using -- a bug to be resolved
key = bucket.get_key(wat_segments[0])
url = key.generate_url(expires_in=0, query_auth=False)
url
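As a workaround (my own sketch, not a boto call), the bucket is public, so the HTTPS URL can simply be built by hand, following the same pattern as the segment.paths.gz URL above:
In [ ]:
# workaround: build the public HTTPS URL by hand for a key in the public bucket
def public_url(key_name, bucket_name='aws-publicdatasets'):
    return "https://{0}.s3.amazonaws.com/{1}".format(bucket_name, key_name)

public_url(wat_segments[0])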
In [ ]:
wat_segments[0]
In [ ]:
# looks like within any segment ID, we should expect warc/, wat/, and wet/ prefixes
!s3cmd ls s3://aws-publicdatasets/common-crawl/crawl-data/CC-MAIN-2014-15/segments/1397609521512.15/
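The same listing with boto, reusing the keys_in_bucket helper from earlier (with delimiter="/", the warc/, wat/, and wet/ sub-prefixes should come back as names):
In [ ]:
# same listing via boto, reusing keys_in_bucket
list(keys_in_bucket(bucket, "common-crawl/crawl-data/CC-MAIN-2014-15/segments/1397609521512.15/"))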
In [ ]:
from gzipstream import GzipStreamFile
In [ ]:
import boto
from itertools import islice
from boto.s3.key import Key
from gzipstream import GzipStreamFile
import warc
import json

def test_gzipstream():
    output = []
    # Let's use a random gzipped web archive (WARC) file from the 2014-15 Common Crawl dataset
    ## Connect to Amazon S3 using anonymous credentials
    conn = boto.connect_s3(anon=True)
    pds = conn.get_bucket('aws-publicdatasets')
    ## Start a connection to one of the WARC files
    k = Key(pds)
    k.key = 'common-crawl/crawl-data/CC-MAIN-2014-15/segments/1397609521512.15/warc/CC-MAIN-20140416005201-00000-ip-10-147-4-33.ec2.internal.warc.gz'
    # The warc library accepts file-like objects, so let's use GzipStreamFile
    f = warc.WARCFile(fileobj=GzipStreamFile(k))
    for num, record in islice(enumerate(f), 100):
        if record['WARC-Type'] == 'response':
            # Imagine we're interested in the URL, the length of content, and any Content-Type strings in there
            output.append((record['WARC-Target-URI'], record['Content-Length']))
            output.append('\n'.join(x for x in record.payload.read().replace('\r', '').split('\n\n')[0].split('\n')
                                    if 'content-type:' in x.lower()))
            output.append('=-=-' * 10)
    return output
In [ ]:
test_gzipstream()
In [ ]:
def warc_records(key_name, limit=None):
    conn = boto.connect_s3(anon=True)
    bucket = conn.get_bucket('aws-publicdatasets')
    key = bucket.get_key(key_name)
    # The warc library accepts file-like objects, so let's use GzipStreamFile
    f = warc.WARCFile(fileobj=GzipStreamFile(key))
    for record in islice(f, limit):
        yield record
In [ ]:
# let's compute some stats on the headers
from collections import Counter
c=Counter()
[c.update(record.header.keys()) for record in warc_records(wat_segments[0],1)]
c
In [ ]:
# warc-type
# looks like the first record is 'warcinfo'
# and the rest are 'metadata'
c=Counter()
[c.update([record['warc-type']]) for record in warc_records(wat_segments[0],100)]
c
In [ ]:
# content-type
c=Counter()
[c.update([record['content-type']]) for record in warc_records(wat_segments[0],100)]
c
In [ ]:
# what's inside a single WAT record?
In [ ]:
wrecords = warc_records(wat_segments[0])
In [ ]:
record = wrecords.next()
In [ ]:
# http://warc.readthedocs.org/en/latest/#working-with-warc-header
record.header.items()
In [ ]:
#payload
dir(record.payload)
In [ ]:
record.header.get('content-type')
In [ ]:
# payload
s = record.payload.read()
len(s)
In [ ]:
if record.header.get('content-type') == 'application/json':
    payload = json.loads(s)
else:
    payload = s
payload
In [ ]:
payload['Envelope']['WARC-Header-Metadata']['WARC-Target-URI']
In [ ]:
import urlparse
urlparse.urlparse(payload['Envelope']['WARC-Header-Metadata']['WARC-Target-URI']).netloc
In [ ]:
# let's just run through a segment with minimal processing (to test processing speed)
def walk_through_segment(segment_id, limit=None):
    for (i, record) in enumerate(warc_records(segment_id, limit)):
        print ("\r>> Record: %d" % i, end="")
In [ ]:
%time walk_through_segment(wat_segments[0],100)
In [ ]:
import urlparse

def netloc_count_for_segment(segment_id, limit=None):
    c = Counter()
    for (i, record) in enumerate(warc_records(segment_id, limit)):
        print ("\r>> Record: %d" % i, end="")
        s = record.payload.read()
        if record.header.get('content-type') == 'application/json':
            payload = json.loads(s)
            url = payload['Envelope']['WARC-Header-Metadata'].get('WARC-Target-URI')
            if url:
                netloc = urlparse.urlparse(url).netloc
                c.update([netloc])
            else:
                c.update([None])
    return c
In [ ]:
%time netloc_count_for_segment(wat_segments[0],1000)
In [ ]:
# rewrite dropping Counter to use defaultdict
# http://evanmuehlhausen.com/simple-counters-in-python-with-benchmarks/ suggests Counter is slow
import urlparse
from collections import defaultdict

def netloc_count_for_segment_dd(segment_id, limit=None):
    c = defaultdict(int)
    for (i, record) in enumerate(warc_records(segment_id, limit)):
        print ("\r>> Record: %d" % i, end="")
        s = record.payload.read()
        if record.header.get('content-type') == 'application/json':
            payload = json.loads(s)
            url = payload['Envelope']['WARC-Header-Metadata'].get('WARC-Target-URI')
            if url:
                netloc = urlparse.urlparse(url).netloc
                c[netloc] += 1
            else:
                c[None] += 1
    return c
In [ ]:
%time netloc_count_for_segment_dd(wat_segments[0],1000)
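To see how much of the difference is the counting itself rather than S3 I/O, here's a synthetic micro-benchmark (an aside I'm adding; the domain strings are made up):
In [ ]:
# synthetic micro-benchmark: counting with Counter vs. defaultdict (no S3 involved)
import random
from collections import Counter, defaultdict

items = [random.choice(["a.com", "b.org", "c.net"]) for _ in xrange(100000)]

def count_with_counter(items):
    c = Counter()
    for item in items:
        c.update([item])
    return c

def count_with_defaultdict(items):
    c = defaultdict(int)
    for item in items:
        c[item] += 1
    return c

%time _ = count_with_counter(items)
%time _ = count_with_defaultdict(items)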
In [ ]:
# drawn from https://github.com/Smerity/cc-mrjob/blob/7ab5a81ee698a2819ae1bc5295ac0de628f1ea6a/mrcc.py
In [ ]:
!s3cmd ls s3://aws-publicdatasets/common-crawl/crawl-data/CC-MAIN-2014-23/warc.paths.gz
In [ ]:
# WARC files from a more recent crawl: CC-MAIN-2014-23
# (the latest at the time of writing is s3://aws-publicdatasets/common-crawl/crawl-data/CC-MAIN-2014-35/)
warc_segments = segments_from_gzip(gzip_from_key(bucket,
                                                 "common-crawl/crawl-data/CC-MAIN-2014-23/warc.paths.gz"))
len(warc_segments)
In [ ]:
warc_segments[0]
In [ ]:
walk_through_segment(warc_segments[0],100)
In [ ]:
# code adapted from https://github.com/Smerity/cc-mrjob/blob/7ab5a81ee698a2819ae1bc5295ac0de628f1ea6a/tag_counter.py
import re
from collections import Counter

# Optimization: compile the regular expression once so it's not done each time
# The regular expression looks for (1) a tag name using letters (assumes lowercased input) and numbers
# and (2) optionally allows a space and then extra parameters, eventually ended by a closing >
HTML_TAG_PATTERN = re.compile('<([a-z0-9]+)[^>]*>')

def get_tag_count(data, ctr=None):
    """Extract the names and total usage count of all the opening HTML tags in the document"""
    if ctr is None:
        ctr = Counter()
    # Convert the document to lower case as HTML tags are case insensitive
    ctr.update(HTML_TAG_PATTERN.findall(data.lower()))
    return ctr

# Let's check to make sure the tag counter works as expected
assert get_tag_count('<html><a href="..."></a><h1 /><br/><p><p></p></p>') == {'html': 1, 'a': 1, 'p': 2, 'h1': 1, 'br': 1}

def tag_counts_for_segment(segment_id, limit=None):
    ctr = Counter()
    for (i, record) in enumerate(warc_records(segment_id, limit)):
        print ("\r>> Record: %d" % i, end="")
        if record.header.get('content-type') == 'application/http; msgtype=response':
            payload = record.payload.read()
            # The HTTP response is defined by a specification: the first part is headers (metadata)
            # and then, following two CRLFs (newlines), comes the data for the response
            headers, body = payload.split('\r\n\r\n', 1)
            if 'Content-Type: text/html' in headers:
                # We avoid creating a new Counter for each page as that's actually quite slow
                get_tag_count(body, ctr)
    return ctr
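Before fanning out across threads, a quick single-WARC smoke test (my own usage sketch, with small limits):
In [ ]:
# smoke test: tag counts for the first 100 records of one WARC file
%time single_counts = tag_counts_for_segment(warc_segments[0], 100)
single_counts.most_common(10)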
In [ ]:
# let's use multiprocessing to run over the same list of segments as the mrjob example
# https://raw.githubusercontent.com/commoncrawl/cc-mrjob/master/input/test-10.txt
import requests

mrjobs_segs = filter(None,
                     requests.get("https://raw.githubusercontent.com/commoncrawl/cc-mrjob/master/input/test-10.txt").content.split("\n"))
mrjobs_segs
In [ ]:
from multiprocessing.dummy import Pool as ThreadPool
from functools import partial

def calc_tag_counts_for_segments(segments, max_segments=None, max_record_per_segment=None, pool_size=4, chunk_size=1,
                                 print_result_count=False):
    """
    segments: list of segment (WARC file) keys to process
    max_segments: number of segments to calculate (None for no limit)
    max_record_per_segment: max number of records to calculate per segment (None for no limit)
    pool_size: number of workers in the pool
    chunk_size: how to chunk jobs among pool workers
    print_result_count: whether to print the number of results available
    """
    tag_counts_for_segment_limit = partial(tag_counts_for_segment, limit=max_record_per_segment)
    pool = ThreadPool(pool_size)
    results_iter = pool.imap_unordered(tag_counts_for_segment_limit,
                                       segments[0:max_segments],
                                       chunk_size)
    results = []
    net_counts = Counter()
    for (i, result) in enumerate(islice(results_iter, None)):
        if print_result_count:
            print ('\r>> Result %d' % i, end="")
        results.append(result)
        net_counts.update(result)
    return (net_counts, results)
In [ ]:
%time tag_counts = calc_tag_counts_for_segments(mrjobs_segs, max_segments=None, max_record_per_segment=100, pool_size=4, chunk_size=1, print_result_count=True)[0]
You can get multithreading within a single process with multiprocessing.dummy, but to spread the work across multiple processes it looks like I need to go with IPython parallel -- at least, that's what I've been able to work out so far.
In [ ]:
import multiprocessing
multiprocessing.cpu_count()
In [ ]:
from IPython import parallel
rc = parallel.Client()
rc.block = True
rc.ids
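Here is a sketch of where this could go (untested: it assumes an ipcluster has been started so rc.ids is non-empty, and that boto, warc, and gzipstream are installed on every engine; tag_counts_on_engine is a self-contained variant of tag_counts_for_segment defined just for this sketch):
In [ ]:
# sketch (untested): fan WARC-file processing out to IPython parallel engines
def tag_counts_on_engine(key_name, limit=100):
    # self-contained: all imports happen on the engine
    import boto, warc, re
    from gzipstream import GzipStreamFile
    from collections import Counter
    from itertools import islice
    conn = boto.connect_s3(anon=True)
    bucket = conn.get_bucket('aws-publicdatasets')
    key = bucket.get_key(key_name)
    f = warc.WARCFile(fileobj=GzipStreamFile(key))
    tag_pattern = re.compile('<([a-z0-9]+)[^>]*>')
    ctr = Counter()
    for record in islice(f, limit):
        if record.header.get('content-type') == 'application/http; msgtype=response':
            payload = record.payload.read()
            headers, body = payload.split('\r\n\r\n', 1)
            if 'Content-Type: text/html' in headers:
                ctr.update(tag_pattern.findall(body.lower()))
    return ctr

lview = rc.load_balanced_view()
# map_sync blocks until all engines are done and returns the per-file Counters
engine_counts = lview.map_sync(tag_counts_on_engine, mrjobs_segs[:4])
sum(engine_counts, Counter()).most_common(10)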
To be continued....
Is it possible to work with pieces of the new WARC files without having to download the whole file?
In the 2012 crawl data, there was an index at http://urlsearch.commoncrawl.org/ from which you can get a reference to an offset and length inside an arc.gz file. With S3, you then don't need to download the whole file, just that byte range, and the chunk is itself unpackable as a gzip file. (This works because the archives are written as a series of independently gzipped records, so a complete member decompresses on its own.)
Is there a similar way of working with the 2014 crawl data? That is, a way of reading off chunks of a WARC file without grabbing the entire file?
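For the 2012 data, the pattern looks roughly like this (a sketch: archive_key, offset, and length are placeholders that would come from the urlsearch index; the Range request itself is plain HTTP):
In [ ]:
# sketch: fetch one gzipped record from a crawl archive via an HTTP Range request
# archive_key, offset, and length are placeholders -- they would come from an index
# such as urlsearch.commoncrawl.org for the 2012 data
import gzip
import StringIO
import requests

def fetch_record(archive_key, offset, length,
                 base_url="https://aws-publicdatasets.s3.amazonaws.com/"):
    byte_range = "bytes={0}-{1}".format(offset, offset + length - 1)
    r = requests.get(base_url + archive_key, headers={"Range": byte_range})
    # the returned chunk is a complete gzip member, so it decompresses on its own
    return gzip.GzipFile(fileobj=StringIO.StringIO(r.content)).read()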