The goal of this notebook is to retrieve the relevant images from a set of ads with assigned clusters.

Input

The input is specified by DATA_FILE, which is a JSON lines file containing CDR ad documents that each contain a doc_id and a cluster_id.

Output

There are many resultant CSV files using for intermediary mapping, but the import end result is CP1_clusters_ads_images.csv with records following the format of cluster_id,ad_id,sha1 along with the CP1_imageset directory which is SHA1 content addressed.

Requirements

pip
  • requests
  • certifi
  • elasticsearch
  • elasticsearch-dsl
system
  • GNU Parallel
  • File utility
Other
  • ../scripts/es_config.json must be configured to point to the correct Elasticsearch Index

In [ ]:
import json
import os

from collections import defaultdict

In [ ]:
DATA_FILE = ''

In [ ]:
!md5sum $DATA_FILE

Obtaining relevant data

Extract CDR IDs from DATA_FILE


In [ ]:
cdr_ids = set()

with open(DATA_FILE, 'r') as infile:
    for line in infile:
        ad = json.loads(line.strip())
        cdr_ids.add(ad['doc_id'])
        
with open('../data/CP1_cdr_ids.txt', 'w') as outfile:
    for cdr_id in cdr_ids:
        outfile.write('%s\n' % cdr_id)

This is responsible for retrieving the documents from Elasticsearch whose parents are in cdr_ids (i.e. the images which belong to these ads).

A few notes about parallel:

  • The commands run and their exit status are stored in the file specified as --joblog
    • This means the failing jobs could be retrieved using something like
      awk '$7 != 0' ../data/misc/get_es_child_documents.log
  • Depending on the system, -j and --max-args may need to be adjusted

In [ ]:
!time parallel -j6 --joblog ../data/misc/get_es_child_documents.log \
                   --arg-file ../data/CP1_cdr_ids.txt \
                   --retries 3 \
                   --timeout 20 \
                   --max-args 150 \
                   python ../scripts/get_es_child_documents.py | sort --unique > ../data/CP1_image_documents.json

Of particular interest from the documents are the values in the obj_stored_url field, write these to a separate file for downloading.


In [ ]:
image_urls = set()

with open('../data/CP1_image_documents.json', 'r') as infile:
    for line in infile:
        image_doc = json.loads(line)
        
        if image_doc['obj_stored_url']:
            image_urls.add(image_doc['obj_stored_url'])
        
# Construct file for parallel downloading
with open('../data/CP1_image_urls.txt', 'w') as outfile:
    for url in image_urls:
        outfile.write('%s\n' % url)

The same notes for parallel above follow here.

This will retrieve the URLs and store them as SHA1 addressed filenames and print the mapping between URLs and SHA1 hashes.


In [ ]:
!time parallel -j50 \
               --joblog ../data/misc/dl_images.log \
               --arg-file ../data/CP1_image_urls.txt \
               --retries 3 \
               --timeout 20 \
               python ../scripts/download_url_as_sha.py | sort --unique > ../data/CP1_url_sha.txt

This is responsible for deleting invalid files which are defined as files with a size of 0, or files which don't contain "image data" as per the file command.


In [ ]:
import subprocess

for (dirpath, _, filenames) in os.walk('../data/CP1_imageset'):
    for filename in filenames:
        path = os.path.join(dirpath, filename)
        exists = os.path.exists(path)
        if exists and os.stat(path).st_size == 0:
            os.unlink(path)
        elif exists:
            if 'image data' not in subprocess.check_output(['file', path]):
                os.unlink(path)

Gather the rows that are still valid, so CP1_url_sha.txt can be adjusted to reflect that.


In [ ]:
valid_rows = []

with open('../data/CP1_url_sha.txt', 'r') as infile:
    reader = csv.reader(infile, delimiter=' ')

    for row in reader:
        url, sha = row
        
        if os.path.isfile(os.path.join('../data/CP1_imageset', sha[:3], sha)):
            valid_rows.append(row)

In [ ]:
with open('../data/CP1_url_sha.txt', 'w') as outfile:
    writer = csv.writer(outfile, lineterminator='\n', delimiter=' ')
    
    for row in valid_rows:
        writer.writerow(row)

Formatting data

The rest of the notebook is about writing ../data/CP1_clusters_ads_images.csv given the data we have already obtained.


In [ ]:
# CP1_image_documents.json gives ad_id and image doc_id which can be mapped to sha via image_doc_id_sha
clusters_ads = set()

# cluster_id,ad_id are given from the official DATA_FILE
with open(DATA_FILE, 'r') as infile:
    for line in infile:
        ad = json.loads(line.strip())
        clusters_ads.add((ad['cluster_id'], ad['doc_id']))

In [ ]:
print '%d ads across %d clusters.' % (len(set([x[1] for x in clusters_ads])),
                                      len(set([x[0] for x in clusters_ads])))

In [ ]:
# Retrieve the sha1 through the CP1_url_shas.txt through the obj_stored_url in CP1_image_documents.json
# Going from image to ad will always yield an ad
with open('../data/CP1_image_documents.json', 'r') as infile:
    ad_id_image_urls = defaultdict(set)
    
    for line in infile:
        image_doc = json.loads(line)
        
        if not isinstance(image_doc['obj_parent'], list):
            image_doc['obj_parent'] = [image_doc['obj_parent']]
                
        for ad_id in image_doc['obj_parent']:
            ad_id_image_urls[ad_id].add(image_doc['obj_stored_url'])

In [ ]:
print '%d image URLs exist across %d ads.' % (sum([len(x) for x in ad_id_image_urls.values()]), len(ad_id_image_urls))

In [ ]:
# Go from obj_stored_url to sha1
with open('../data/CP1_url_sha.txt', 'r') as infile:
    url_sha = {}
    
    for line in infile:
        url, sha = line.strip().split()
        url_sha[url] = sha

In [ ]:
print '%d unique URLs/images.' % len(url_sha)

In [ ]:
import csv

with open('../data/CP1_clusters_ads_images.csv', 'w') as outfile:
    writer = csv.writer(outfile)
    
    # Headers
    writer.writerow(['cluster_id', 'ad_id', 'sha1'])
    
    for (cluster_id, ad_id) in clusters_ads:
        # Finding a url should always work
        # Finding the sha from a url may not, if the URL failed to be retrieved (404, whatever)
        image_urls_from_ad = ad_id_image_urls[ad_id] 
        image_shas_from_ad = set([url_sha[x] for x in image_urls_from_ad if x in url_sha])
        
        for sha1 in image_shas_from_ad:
            writer.writerow((cluster_id, ad_id, sha1))