The goal of this notebook is to retrieve the relevant images from a set of ads with assigned clusters.
The input is specified by DATA_FILE, which is a JSON lines file containing CDR ad documents that each contain a doc_id and a cluster_id.
There are several intermediate CSV files used for mapping along the way, but the important end results are CP1_clusters_ads_images.csv, with records in the format cluster_id,ad_id,sha1, and the CP1_imageset directory, which is SHA1 content-addressed.
../scripts/es_config.json must be configured to point to the correct Elasticsearch index.
In [ ]:
import csv
import json
import os
from collections import defaultdict
In [ ]:
DATA_FILE = ''
In [ ]:
!md5sum $DATA_FILE
Extract CDR IDs from DATA_FILE
In [ ]:
cdr_ids = set()
with open(DATA_FILE, 'r') as infile:
    for line in infile:
        ad = json.loads(line.strip())
        cdr_ids.add(ad['doc_id'])

with open('../data/CP1_cdr_ids.txt', 'w') as outfile:
    for cdr_id in cdr_ids:
        outfile.write('%s\n' % cdr_id)
This is responsible for retrieving the documents from Elasticsearch whose parents are in cdr_ids (i.e. the images which belong to these ads).
A few notes about parallel: failed jobs are retried up to three times (--retries 3) and killed if they exceed the timeout (--timeout 20). The --joblog file records one line per job with its exit value in the seventh column, so any failures can be listed afterwards with:

awk '$7 != 0' ../data/misc/get_es_child_documents.log
In [ ]:
!time parallel -j6 --joblog ../data/misc/get_es_child_documents.log \
--arg-file ../data/CP1_cdr_ids.txt \
--retries 3 \
--timeout 20 \
--max-args 150 \
python ../scripts/get_es_child_documents.py | sort --unique > ../data/CP1_image_documents.json
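The script ../scripts/get_es_child_documents.py itself is not reproduced in this notebook. A minimal sketch of what it is assumed to do is below: read the Elasticsearch host and index from es_config.json (the key names here are assumptions), take a batch of CDR IDs as command-line arguments, and print one child document per line so the output can be piped through sort --unique. The real script may instead use a has_parent query or scrolling.

# Hypothetical sketch of ../scripts/get_es_child_documents.py
import json
import sys

from elasticsearch import Elasticsearch

# Assumed es_config.json layout: {"host": ..., "index": ...}
with open('../scripts/es_config.json', 'r') as f:
    config = json.load(f)

es = Elasticsearch(config['host'])

# Each command-line argument is a parent (ad) CDR ID handed over by parallel.
query = {'query': {'terms': {'obj_parent': sys.argv[1:]}}}
results = es.search(index=config['index'], body=query, size=10000)

for hit in results['hits']['hits']:
    # One JSON document per line, deterministically serialised for sort --unique.
    print json.dumps(hit['_source'], sort_keys=True)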
Of particular interest from the documents are the values in the obj_stored_url field; write these to a separate file for downloading.
In [ ]:
image_urls = set()
with open('../data/CP1_image_documents.json', 'r') as infile:
    for line in infile:
        image_doc = json.loads(line)
        if image_doc['obj_stored_url']:
            image_urls.add(image_doc['obj_stored_url'])

# Construct file for parallel downloading
with open('../data/CP1_image_urls.txt', 'w') as outfile:
    for url in image_urls:
        outfile.write('%s\n' % url)
The same notes about parallel above apply here.
This retrieves the URLs, stores the downloads under SHA1-addressed filenames, and prints the mapping between URLs and SHA1 hashes.
In [ ]:
!time parallel -j50 \
--joblog ../data/misc/dl_images.log \
--arg-file ../data/CP1_image_urls.txt \
--retries 3 \
--timeout 20 \
python ../scripts/download_url_as_sha.py | sort --unique > ../data/CP1_url_sha.txt
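../scripts/download_url_as_sha.py is likewise not shown here. A minimal sketch under the assumptions the rest of the notebook makes: each URL passed as an argument is downloaded, its content is hashed with SHA1, the file is stored under ../data/CP1_imageset/<sha1[:3]>/<sha1>, and a space-delimited "url sha" line is printed (which becomes CP1_url_sha.txt). The use of requests is an assumption about the implementation, not a description of the real script.

# Hypothetical sketch of ../scripts/download_url_as_sha.py
import hashlib
import os
import sys

import requests

OUT_DIR = '../data/CP1_imageset'

for url in sys.argv[1:]:
    response = requests.get(url, timeout=10)
    response.raise_for_status()
    sha = hashlib.sha1(response.content).hexdigest()

    # Content-addressed layout matching the sha[:3]/sha lookups later in the notebook.
    subdir = os.path.join(OUT_DIR, sha[:3])
    if not os.path.isdir(subdir):
        os.makedirs(subdir)
    with open(os.path.join(subdir, sha), 'wb') as outfile:
        outfile.write(response.content)

    # Space-delimited mapping consumed as CP1_url_sha.txt.
    print '%s %s' % (url, sha)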
This deletes invalid files, defined as files with a size of 0 or files that the file command does not identify as "image data".
In [ ]:
import subprocess

for (dirpath, _, filenames) in os.walk('../data/CP1_imageset'):
    for filename in filenames:
        path = os.path.join(dirpath, filename)
        exists = os.path.exists(path)
        if exists and os.stat(path).st_size == 0:
            os.unlink(path)
        elif exists:
            if 'image data' not in subprocess.check_output(['file', path]):
                os.unlink(path)
Gather the rows that are still valid so that CP1_url_sha.txt can be rewritten to contain only entries whose files survived the cleanup.
In [ ]:
valid_rows = []
with open('../data/CP1_url_sha.txt', 'r') as infile:
    reader = csv.reader(infile, delimiter=' ')
    for row in reader:
        url, sha = row
        if os.path.isfile(os.path.join('../data/CP1_imageset', sha[:3], sha)):
            valid_rows.append(row)
In [ ]:
with open('../data/CP1_url_sha.txt', 'w') as outfile:
    writer = csv.writer(outfile, lineterminator='\n', delimiter=' ')
    for row in valid_rows:
        writer.writerow(row)
In [ ]:
# CP1_image_documents.json gives the ad_id (obj_parent) and obj_stored_url for each image,
# which can be mapped to a sha1 via CP1_url_sha.txt
clusters_ads = set()
# cluster_id,ad_id pairs come from the official DATA_FILE
with open(DATA_FILE, 'r') as infile:
    for line in infile:
        ad = json.loads(line.strip())
        clusters_ads.add((ad['cluster_id'], ad['doc_id']))
In [ ]:
print '%d ads across %d clusters.' % (len(set([x[1] for x in clusters_ads])),
                                      len(set([x[0] for x in clusters_ads])))
In [ ]:
# Map each ad_id (obj_parent) to its image URLs via obj_stored_url in CP1_image_documents.json;
# the URLs are resolved to sha1s via CP1_url_sha.txt in a later cell
# Going from image to ad will always yield an ad
with open('../data/CP1_image_documents.json', 'r') as infile:
    ad_id_image_urls = defaultdict(set)
    for line in infile:
        image_doc = json.loads(line)
        if not isinstance(image_doc['obj_parent'], list):
            image_doc['obj_parent'] = [image_doc['obj_parent']]
        for ad_id in image_doc['obj_parent']:
            ad_id_image_urls[ad_id].add(image_doc['obj_stored_url'])
In [ ]:
print '%d image URLs exist across %d ads.' % (sum([len(x) for x in ad_id_image_urls.values()]), len(ad_id_image_urls))
In [ ]:
# Go from obj_stored_url to sha1
with open('../data/CP1_url_sha.txt', 'r') as infile:
    url_sha = {}
    for line in infile:
        url, sha = line.strip().split()
        url_sha[url] = sha
In [ ]:
print '%d unique URLs/images.' % len(url_sha)
In [ ]:
import csv

with open('../data/CP1_clusters_ads_images.csv', 'w') as outfile:
    writer = csv.writer(outfile)
    # Headers
    writer.writerow(['cluster_id', 'ad_id', 'sha1'])
    for (cluster_id, ad_id) in clusters_ads:
        # Finding a URL should always work
        # Finding the sha from a URL may not, if the URL failed to be retrieved (404, whatever)
        image_urls_from_ad = ad_id_image_urls[ad_id]
        image_shas_from_ad = set([url_sha[x] for x in image_urls_from_ad if x in url_sha])
        for sha1 in image_shas_from_ad:
            writer.writerow((cluster_id, ad_id, sha1))
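As an optional sanity check (not part of the pipeline above), the distinct clusters, ads, and image hashes referenced by the final CSV can be counted with a few lines like these:

# Optional sanity check of CP1_clusters_ads_images.csv
import csv

clusters, ads, images = set(), set(), set()
with open('../data/CP1_clusters_ads_images.csv', 'r') as infile:
    reader = csv.reader(infile)
    next(reader)  # skip the cluster_id,ad_id,sha1 header
    for cluster_id, ad_id, sha1 in reader:
        clusters.add(cluster_id)
        ads.add(ad_id)
        images.add(sha1)

print '%d clusters, %d ads, %d unique images in CP1_clusters_ads_images.csv.' % (
    len(clusters), len(ads), len(images))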