In [ ]:
# to find out which Python and environment I'm using
!which python

In [ ]:
# version of boto
!pip show boto

In [ ]:
# working with Python 2.7.x

from __future__ import print_function

In [ ]:
import boto
from boto.s3.connection import S3Connection
from boto.s3.key import Key

from itertools import islice

In [ ]:
# I don't like having to pass a boto bucket object into keys_in_bucket.  Let's rewrite it to take a bucket_name instead.

def keys_in_bucket(prefix, bucket_name="aws-publicdatasets", truncate_path=True, limit=None):
    """
    given a key prefix and a bucket name, yields the keys in the bucket with that prefix
    if truncate_path is True, remove the prefix from the yielded key names
    optional limit on the number of keys to return
    """
    
    conn = S3Connection(anon=True)
    bucket = conn.get_bucket(bucket_name)
    
    keys = islice(bucket.list(prefix=prefix,
                              delimiter="/"),
                  limit)
    for key in keys:
        name = key.name.encode("UTF-8")
        if truncate_path:
            name = name.replace(prefix, "", 1)
        yield name

In [ ]:
def S3Key(key_name, bucket_name="aws-publicdatasets"):
    """return the boto Key for key_name in bucket_name (None if the key doesn't exist)"""
    conn = S3Connection(anon=True)
    bucket = conn.get_bucket(bucket_name)
    return bucket.get_key(key_name)

In [ ]:
!s3cmd ls s3://aws-publicdatasets/common-crawl/crawl-data/ | grep CC-MAIN

August 2014 crawl

http://commoncrawl.org/august-2014-crawl-data-available/

  • all segments (CC-MAIN-2014-35/segment.paths.gz)
  • all WARC files (CC-MAIN-2014-35/warc.paths.gz)
  • all WAT files (CC-MAIN-2014-35/wat.paths.gz)
  • all WET files (CC-MAIN-2014-35/wet.paths.gz)

https://aws-publicdatasets.s3.amazonaws.com/common-crawl/crawl-data/CC-MAIN-2014-35/segment.path.gz

or

https://aws-publicdatasets.s3.amazonaws.com/common-crawl/crawl-data/CC-MAIN-2014-35/segment.paths.gz ?

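One way to settle the path-vs-paths question, using the S3Key helper above: boto's get_key returns None for a key that doesn't exist, so whichever of these is not None is the real filename.

In [ ]:
# whichever of these is not None is the filename that actually exists
(S3Key("common-crawl/crawl-data/CC-MAIN-2014-35/segment.path.gz"),
 S3Key("common-crawl/crawl-data/CC-MAIN-2014-35/segment.paths.gz"))
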

In [ ]:
def key_for_new_crawl(crawl_name, file_type='all', return_name=False, plural=True):
    
    suffix = "s" if plural else ""
    
    if file_type.upper() == 'WARC':
        file_name = "warc.path{suffix}.gz".format(suffix=suffix)
    elif file_type.upper() == 'WAT':
        file_name = "wat.path{suffix}.gz".format(suffix=suffix)
    elif file_type.upper() == 'WET':
        file_name = "wet.path{suffix}.gz".format(suffix=suffix)
    else:
        file_name = "segment.path{suffix}.gz".format(suffix=suffix)

    key_name = "common-crawl/crawl-data/{crawl_name}/{file_name}".format(crawl_name=crawl_name,
                                                                         file_name=file_name)
    
    # return the key name itself (e.g., to hand to gzip_from_key) or the boto Key object
    if return_name:
        return key_name
    else:
        return S3Key(key_name)

In [ ]:
list(keys_in_bucket("common-crawl/crawl-data/CC-MAIN-2014-35/"))

In [ ]:
k = S3Key("common-crawl/crawl-data/CC-MAIN-2014-35/segment.paths.gz")
k

In [ ]:
k.size

In [ ]:
k = key_for_new_crawl("CC-MAIN-2014-35", "all")
k

Do all of the Nutch-era crawls follow the same structure?


In [ ]:
[key for key in keys_in_bucket("common-crawl/crawl-data/") if key.startswith("CC-MAIN")]

In [ ]:
# convert year/week number to datetime

import datetime

def week_in_year(year, week):
    return datetime.datetime(year,1,1) + datetime.timedelta((week-1)*7)

week_in_year(2014,35)

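Since the crawl IDs look like CC-MAIN-&lt;year&gt;-&lt;week&gt;, a small (hypothetical) helper can tie a crawl name back to week_in_year:

In [ ]:
# hypothetical helper: parse year and week out of a crawl name like "CC-MAIN-2014-35"
def crawl_week(crawl_name):
    (_, _, year, week) = crawl_name.split("-")
    return week_in_year(int(year), int(week))

crawl_week("CC-MAIN-2014-35")
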
In [ ]:
import gzip
import StringIO

# the following functions fetch a gzipped path listing from S3 and split it into individual segment paths

def gzip_from_key(key_name, bucket_name="aws-publicdatasets"):
    
    k = S3Key(key_name, bucket_name)
    f = gzip.GzipFile(fileobj=StringIO.StringIO(k.get_contents_as_string()))
    return f

def segments_from_gzip(gz):
    s = gz.read()
    return filter(None, s.split("\n"))

In [ ]:
# let's parse segment.path(s).gz

valid_segments = segments_from_gzip(gzip_from_key(key_for_new_crawl("CC-MAIN-2014-35", "all", return_name=True)))
valid_segments[:5]

In [ ]:
# side issue: rewrite to deal with gzip on the fly?

from boto.s3.key import Key
from gzipstream import GzipStreamFile

def gzip_from_key_2(key_name, bucket_name='aws-publicdatasets'):

    k = S3Key(key_name, bucket_name)
    f = GzipStreamFile(k)
    
    return f

def segments_from_gzip_2(gz):
    BUF_SIZE = 1000
    s = ""
    buffer = gz.read(BUF_SIZE)
    while buffer:
        s += buffer
        buffer = gz.read(BUF_SIZE)

    return filter(None, s.split("\n"))

valid_segments_2 = segments_from_gzip_2(gzip_from_key_2("common-crawl/crawl-data/CC-MAIN-2014-15/segment.paths.gz"))
valid_segments_2[:5]

WAT: metadata files

all WAT files (CC-MAIN-2014-35/wat.paths.gz)


In [ ]:
wat_segments = segments_from_gzip(gzip_from_key(key_for_new_crawl("CC-MAIN-2014-35", "WAT", return_name=True)))
len(wat_segments)

In [ ]:
wat_segments[0]

http://commoncrawl.org/navigating-the-warc-file-format/

The WAT records carry their metadata as JSON payloads.

I forgot to check how big the file is before downloading it.


In [ ]:
k = S3Key(wat_segments[0])
k.size

In [ ]:
def get_key_sizes(keys, bucket_name="aws-publicdatasets"):
    try:
        conn = S3Connection(anon=True)
        bucket = conn.get_bucket(bucket_name)

        sizes = []
        for key in keys:
            try:
                k = bucket.get_key(key)
                sizes.append((key, k.size if hasattr(k, 'size') else 0))
            except Exception as e:
                sizes.append((key, e))
        return sizes
    except Exception as e:
        sizes = []
        for key in keys:
            sizes.append((key, e))
        return sizes

In [ ]:
get_key_sizes(wat_segments[0:20])

In [ ]:
# http://stackoverflow.com/questions/2348317/how-to-write-a-pager-for-python-iterators/2350904#2350904
def grouper(iterable, page_size):
    page = []
    for item in iterable:
        page.append(item)
        if len(page) == page_size:
            yield page
            page = []
    if len(page) > 0:
        yield page

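A quick sanity check on grouper, just to see the paging behavior:

In [ ]:
# 5 items in pages of 2 -> [[0, 1], [2, 3], [4]]
list(grouper(range(5), 2))
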
multiprocessing approach

When should you use processes vs. threads? I think you use threads when there's no danger of memory collisions -- that is, conflicting writes to shared state -- and this work is mostly I/O bound anyway.

http://sebastianraschka.com/Articles/2014_multiprocessing_intro.html#Multi-Threading-vs.-Multi-Processing:

If we submit "jobs" to different threads, those jobs can be pictured as "sub-tasks" of a single process and those threads will usually have access to the same memory areas (i.e., shared memory). This approach can easily lead to conflicts in case of improper synchronization, for example, if processes are writing to the same memory location at the same time.

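A minimal sketch (standard library only; not part of the crawl processing) of the kind of conflict described above: threads bumping a shared counter without a lock can lose updates, while serializing the update with a Lock keeps the count correct.

In [ ]:
# illustrative only: shared-memory conflict between threads
import threading

unguarded = [0]          # shared state, no synchronization
guarded = [0]            # shared state, protected by a lock
lock = threading.Lock()

def bump(n):
    for _ in xrange(n):
        unguarded[0] += 1          # read-modify-write; updates can be lost
        with lock:
            guarded[0] += 1        # the lock serializes the update

threads = [threading.Thread(target=bump, args=(100000,)) for _ in range(8)]
for t in threads:
    t.start()
for t in threads:
    t.join()

# guarded should be 800000; unguarded may come up short
(unguarded[0], guarded[0])
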

In [ ]:
wat_segments[0:2]

multiprocessing async approach


In [ ]:
# with pool.map

from multiprocessing.dummy import Pool as ThreadPool 
from functools import partial

get_key_sizes_for_bucket = partial(get_key_sizes, bucket_name="aws-publicdatasets")

PAGE_SIZE = 1
POOL_SIZE = 100
MAX_SEGMENTS = 100

pool = ThreadPool(POOL_SIZE) 
results = pool.map(get_key_sizes_for_bucket, grouper(wat_segments[0:MAX_SEGMENTS], PAGE_SIZE))
pool.close()
pool.join()

In [ ]:
results[:5]

In [ ]:
# Pool.imap_unordered

from multiprocessing.dummy import Pool as ThreadPool 
from functools import partial

get_key_sizes_for_bucket = partial(get_key_sizes, bucket_name="aws-publicdatasets")

PAGE_SIZE = 1
POOL_SIZE = 100
MAX_SEGMENTS = 10  # replace with None for all segments
CHUNK_SIZE = 10

pool = ThreadPool(POOL_SIZE) 
results_iter = pool.imap_unordered(get_key_sizes_for_bucket, 
                              grouper(wat_segments[0:MAX_SEGMENTS], PAGE_SIZE),
                              CHUNK_SIZE)

results = []
                             
for (i, page) in enumerate(islice(results_iter,None)):
    print ('\r>> Result %d' % i, end="")
    for result in page:
        results.append(result)

In [ ]:
results[:5]

In [ ]:
# Pool.imap_unordered with ProcessPool
# not necessarily that useful since this calculation is I/O bound

from multiprocessing import Pool as ProcessPool 
from functools import partial

get_key_sizes_for_bucket = partial(get_key_sizes, bucket_name="aws-publicdatasets")

PAGE_SIZE = 1
POOL_SIZE = 100
MAX_SEGMENTS = 10  # replace with None for all segments
CHUNK_SIZE = 10

pool = ProcessPool(POOL_SIZE) 
results_iter = pool.imap_unordered(get_key_sizes_for_bucket, 
                              grouper(wat_segments[0:MAX_SEGMENTS], PAGE_SIZE),
                              CHUNK_SIZE)

results = []
                             
for (i, page) in enumerate(islice(results_iter,None)):
    print ('\r>> Result %d' % i, end="")
    for result in page:
        results.append(result)

In [ ]:
from pandas import Series, DataFrame
df = DataFrame(results, columns=['key', 'size'])
df.head()

In [ ]:
df['size'].describe()

In [ ]:
%pylab --no-import-all inline

In [ ]:
import pylab as P
P.hist(df['size'])

In [ ]:
# total byte size of all the wat files
print (format(sum(df['size']),",d"))

In [ ]:
# save results
import csv
with open('CC-MAIN-2014-35.wat.csv', 'wb') as csvfile:
    wat_writer = csv.writer(csvfile, delimiter=',',
                            quotechar='|', quoting=csv.QUOTE_MINIMAL)
    wat_writer.writerow(['key', 'size'])
    for result in results:
        wat_writer.writerow(result)

In [ ]:
!head CC-MAIN-2014-35.wat.csv

Taking apart a WAT file


In [ ]:
# this won't work for the version of boto I'm currently using -- a bug to be resolved

# key = bucket.get_key(wat_segments[0])
# url = key.generate_url(expires_in=0, query_auth=False)
# url

In [ ]:
wat_segments[0]

In [ ]:
# looks like within any segment ID, we should expect warc, wat, and wet prefixes ("subdirectories")

!s3cmd ls s3://aws-publicdatasets/common-crawl/crawl-data/CC-MAIN-2014-35/segments/1408500800168.29/

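The same listing via boto, using the keys_in_bucket helper from above; with delimiter="/" it should come back as the warc/, wat/, and wet/ prefixes.

In [ ]:
# same listing as the s3cmd call above, but via boto
list(keys_in_bucket("common-crawl/crawl-data/CC-MAIN-2014-35/segments/1408500800168.29/"))
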
In [ ]:
#https://github.com/commoncrawl/gzipstream

from gzipstream import GzipStreamFile

In [ ]:
import boto
from itertools import islice
from boto.s3.key import Key
from gzipstream import GzipStreamFile
import warc
import json

def test_gzipstream():

  output = []
    
  # Let's use a random gzipped web archive (WARC) file from the 2014-15 Common Crawl dataset
  ## Connect to Amazon S3 using anonymous credentials
  conn = boto.connect_s3(anon=True)
  pds = conn.get_bucket('aws-publicdatasets')
  ## Start a connection to one of the WARC files
  k = Key(pds)
  k.key = 'common-crawl/crawl-data/CC-MAIN-2014-15/segments/1397609521512.15/warc/CC-MAIN-20140416005201-00000-ip-10-147-4-33.ec2.internal.warc.gz'

  # The warc library accepts file like objects, so let's use GzipStreamFile
  f = warc.WARCFile(fileobj=GzipStreamFile(k))
  for num, record in islice(enumerate(f),100):
    if record['WARC-Type'] == 'response':
      # Imagine we're interested in the URL, the length of content, and any Content-Type strings in there
      output.append((record['WARC-Target-URI'], record['Content-Length']))
      output.append( '\n'.join(x for x in record.payload.read().replace('\r', '').split('\n\n')[0].split('\n') if 'content-type:' in x.lower()))
      output.append( '=-=-' * 10)
 
  return output

In [ ]:
#test_gzipstream()

In [ ]:
def warc_records(key_name, limit=None):
    conn = boto.connect_s3(anon=True)
    bucket = conn.get_bucket('aws-publicdatasets')
    key = bucket.get_key(key_name)

    # The warc library accepts file like objects, so let's use GzipStreamFile
    f = warc.WARCFile(fileobj=GzipStreamFile(key))
    for record in islice(f, limit):
        yield record

In [ ]:
# let's compute some stats on the headers
from collections import Counter
c=Counter()
[c.update(record.header.keys()) for record in warc_records(wat_segments[0],1)]
c

In [ ]:
# warc-type

# looks like the first record is 'warcinfo'
# and the rest are 'metadata'

c=Counter()
[c.update([record['warc-type']]) for record in warc_records(wat_segments[0],100)]

c

In [ ]:
# content-type

c=Counter()
[c.update([record['content-type']]) for record in warc_records(wat_segments[0],100)]

c

In [ ]:
# what's in the records

In [ ]:
wrecords = warc_records(wat_segments[0])
wrecords

In [ ]:
record = wrecords.next()

In [ ]:
if record.header.get('content-type') == 'application/json':
    payload = json.loads(record.payload.read())
else:
    payload = record.payload.read()
    
payload

In [ ]:
record.header.get('content-type')

In [ ]:
record.url

In [ ]:
# http://warc.readthedocs.org/en/latest/#working-with-warc-header

record.header.items()

In [ ]:
#payload
dir(record.payload)

In [ ]:
record.header.get('content-type')

In [ ]:
type(payload)

In [ ]:
if isinstance(payload, dict):
    print (payload['Envelope']['WARC-Header-Metadata'].get('WARC-Target-URI'))

In [ ]:
record.header

In [ ]:
import urlparse
urlparse.urlparse(payload['Envelope']['WARC-Header-Metadata']['WARC-Target-URI']).netloc

In [ ]:
# let's just run through a segment with minimal processing (to test processing speed)

import urlparse

def walk_through_segment(segment_id, limit=None):
    
    for (i,record) in enumerate(warc_records(segment_id,limit)):
        print ("\r>> Record: %d" % i, end="")

In [ ]:
%time walk_through_segment(wat_segments[0],10)

In [ ]:
import urlparse

def netloc_count_for_segment(segment_id, limit=None):
    
    c = Counter()

    for (i, record) in enumerate(warc_records(segment_id,limit)):

        print ("\r>>Record: %d" % i, end="")

        if record.header.get('content-type') == 'application/json':
            s = record.payload.read()
            payload = json.loads(s)
            url = payload['Envelope']['WARC-Header-Metadata'].get('WARC-Target-URI')
            if url:
                netloc = urlparse.urlparse(url).netloc
                c.update([netloc])
            else:
                c.update([None])

    return c

In [ ]:
%time netloc_count_for_segment(wat_segments[0],100)

In [ ]:
# rewrite dropping Counter to use defaultdict
# http://evanmuehlhausen.com/simple-counters-in-python-with-benchmarks/ suggests Counter is slow

import urlparse
from collections import defaultdict

counter = defaultdict(int)

def netloc_count_for_segment_dd(segment_id, limit=None):

    c = defaultdict(int)
    
    for (i, record) in enumerate(warc_records(segment_id,limit)):

        print ("\r>>Record: %d" % i, end="")
        s = record.payload.read()
        if record.header.get('content-type') == 'application/json':
            payload = json.loads(s)
            url = payload['Envelope']['WARC-Header-Metadata'].get('WARC-Target-URI')
            if url:
                netloc = urlparse.urlparse(url).netloc
                c[netloc] += 1
            else:
                c[None] += 1

    return c

In [ ]:
%time netloc_count_for_segment_dd(wat_segments[0],100)

Tag Count


In [ ]:
# draw from https://github.com/Smerity/cc-mrjob/blob/7ab5a81ee698a2819ae1bc5295ac0de628f1ea6a/mrcc.py

In [ ]:
!s3cmd ls s3://aws-publicdatasets/common-crawl/crawl-data/CC-MAIN-2014-35/warc.path.gz

In [ ]:
# WARC files from the latest crawl
# s3://aws-publicdatasets/common-crawl/crawl-data/CC-MAIN-2014-35/

warc_segments = segments_from_gzip(gzip_from_key(key_for_new_crawl("CC-MAIN-2014-35", "WARC", return_name=True)))
len(warc_segments)

In [ ]:
warc_segments[0]

In [ ]:
walk_through_segment(warc_segments[0],100)

In [ ]:
# code adapted from https://github.com/Smerity/cc-mrjob/blob/7ab5a81ee698a2819ae1bc5295ac0de628f1ea6a/tag_counter.py

import re
from collections import Counter

# Optimization: compile the regular expression once so it's not done each time
# The regular expression looks for (1) a tag name using letters (assumes lowercased input) and numbers
# and (2) allows optional attributes after the tag name, up to the closing >

HTML_TAG_PATTERN = re.compile('<([a-z0-9]+)[^>]*>')

def get_tag_count(data, ctr=None):
  """Extract the names and total usage count of all the opening HTML tags in the document"""
  if ctr is None:
    ctr = Counter()
  # Convert the document to lower case as HTML tags are case insensitive
  ctr.update(HTML_TAG_PATTERN.findall(data.lower()))
  return ctr

# Let's check to make sure the tag counter works as expected
assert get_tag_count('<html><a href="..."></a><h1 /><br/><p><p></p></p>') == {'html': 1, 'a': 1, 'p': 2, 'h1': 1, 'br': 1}


def tag_counts_for_segment(segment_id, limit=None, print_record_num=False):
    
    ctr = Counter()
    
    try:
    
        for (i, record) in enumerate(warc_records(segment_id, limit)):

            if print_record_num:
                print ("\r>>Record: %d" % i, end="")

            if record.header.get('content-type') == 'application/http; msgtype=response':
                payload = record.payload.read()
                # The HTTP response starts with the headers (metadata); the body
                # follows after two CRLFs (a blank line)
                headers, body = payload.split('\r\n\r\n', 1)
                if 'Content-Type: text/html' in headers:
                    # We avoid creating a new Counter for each page as that's actually quite slow
                    get_tag_count(body, ctr)
  
        return (segment_id, ctr)
    
    except Exception as e:
        
        return (segment_id, e)

In [ ]:
# let's use multiprocessing to reproduce the cc-mrjob example, running over its list of segments

# https://github.com/commoncrawl/cc-mrjob/blob/1914a8923a6c79ffff28d269799a109d2805b04e/input/test-1.warc


import requests
url = "https://raw.githubusercontent.com/commoncrawl/cc-mrjob/1914a8923a6c79ffff28d269799a109d2805b04e/input/test-1.warc"
mrjobs_segs = filter(None, 
                     requests.get(url).content.split("\n"))
mrjobs_segs

In [ ]:
from multiprocessing.dummy import Pool as ThreadPool 
from multiprocessing import Pool as ProcessPool

from functools import partial


def calc_tag_counts_for_segments(segments, max_segments=None, max_record_per_segment=None, pool_size=4, chunk_size=1,
                                 print_result_count=False, threads=True):
    
    """
    segments: list of segment key names to process
    pool_size: number of workers in the pool
    max_segments: number of segments to calculate (None for no limit)
    max_record_per_segment: max number of records to calculate per segment (None for no limit)
    chunk_size: how to chunk jobs among pool workers
    print_result_count: whether to print the number of results available
    threads: use a thread pool if True, a process pool if False
    """
    
    tag_counts_for_segment_limit = partial(tag_counts_for_segment, limit=max_record_per_segment)

    if threads:
        pool = ThreadPool(pool_size) 
    else:
        pool = ProcessPool(pool_size)
        
    results_iter = pool.imap_unordered(tag_counts_for_segment_limit, 
                                  segments[0:max_segments],
                                  chunk_size)

    results = []
    exceptions = []
    net_counts = Counter()

    for (i, (seg_id, result)) in enumerate(results_iter):
        if print_result_count:
            print ('\r>> Result %d' % i, end="")
        results.append((seg_id, result))
        if isinstance(result, Counter):
            net_counts.update(result)
        elif isinstance(result, Exception):
            exceptions.append((seg_id, result))
    
    return (net_counts, results, exceptions)

In [ ]:
%time (tag_counts, results, exceptions) = calc_tag_counts_for_segments(mrjobs_segs, max_segments=None, \
                                                max_record_per_segment=100, \
                                                pool_size=8, \
                                                chunk_size=1, \
                                                print_result_count=True, \
                                                threads=True)

In [ ]:
# calculate vector for h1...h6
def htags(tag_counts):
    return [tag_counts.get('h{i}'.format(i=i),0) for i in range(1,7)]


def normalize(a):
    s = np.sum(a)
    if s > 0:
        return np.array(a).astype('float')/s
    else:
        return np.array(a)
    
    
k = normalize(htags(tag_counts))
k

In [ ]:
from itertools import izip

colors = ['b', 'g', 'r', 'c', 'm', 'y', 'k', 'w']
    
def plot_hdist(props, width=0.5):
    c_props = np.cumsum(props)
    for (prop, c_prop, color) in izip(props, c_props, colors):
        plt.bar(left=0, width=width, bottom=c_prop - prop, height=prop, color=color)
    plt.plot()
        
plot_hdist(normalize(htags(tag_counts)))

In [ ]:
exceptions

In [ ]:
from IPython.html import widgets
m = widgets.interact(plot_hdist, props=normalize(htags(tag_counts)))

Using IPython parallel

You can have multithreading within a given process, but to farm the work out to multiple processes (and, eventually, multiple machines), one route is IPython parallel -- at least, that's what I've been able to work out so far.


In [ ]:
import multiprocessing
multiprocessing.cpu_count()

In [ ]:
from IPython import parallel
rc = parallel.Client()
rc.block = True

rc.ids

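A minimal sketch of where this could go, assuming an IPython cluster is already running (e.g. started with ipcluster start) and that the helper functions and their imports (boto, warc, gzipstream, warc_records, get_tag_count, tag_counts_for_segment) have been defined on every engine, e.g. via %%px cells:

In [ ]:
# hedged sketch, not run here: farm the per-segment tag counting out to the engines
lview = rc.load_balanced_view()
parallel_results = lview.map(tag_counts_for_segment, mrjobs_segs[:4], ordered=False)

parallel_counts = Counter()
for (seg_id, result) in parallel_results:
    if isinstance(result, Counter):
        parallel_counts.update(result)

parallel_counts.most_common(10)
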
To be continued....

WARC files: the final frontier

Is it possible to work with pieces of the new WARC files without having to download the whole file?

In the 2012 crawl data there was an index, http://urlsearch.commoncrawl.org/, from which you could get an offset and length inside an arc.gz file. With S3 you don't need to download the whole file, just that chunk -- and the chunk itself unpacks as a gzip file (a nice property of gzip):

http://nbviewer.ipython.org/github/rdhyee/working-open-data/blob/postscript/notebooks/Day_21_CommonCrawl_Content.ipynb#Grabbing-pieces-of-the-.arc.gz-files

Is there a similar way of working with the 2014 crawl data, i.e., a way of reading off chunks of a WARC file without grabbing the entire file?
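
For reference, a hedged sketch of what the 2012-era trick looked like, assuming an (offset, length) pair obtained from the urlsearch index: ask S3 for just that byte range and gunzip the returned chunk on its own. Whether the 2014 WARC files have a comparable index is exactly the open question.

In [ ]:
# hedged sketch: fetch just a byte range of a crawl file and gunzip that chunk;
# the key name, offset, and length arguments are placeholders -- in the 2012 workflow
# they came from the urlsearch.commoncrawl.org index
import gzip
import StringIO

def fetch_gzipped_chunk(key_name, offset, length, bucket_name="aws-publicdatasets"):
    k = S3Key(key_name, bucket_name)
    # boto passes extra headers through, so an HTTP Range request pulls only the chunk
    byte_range = 'bytes={0}-{1}'.format(offset, offset + length - 1)
    raw = k.get_contents_as_string(headers={'Range': byte_range})
    # a chunk that starts on a gzip member boundary decompresses on its own
    return gzip.GzipFile(fileobj=StringIO.StringIO(raw)).read()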