In [ ]:
import dask
In [ ]:
import eada
In [1]:
def search(waveband, keywords=None, service='conesearch'):
    '''
    Search every 'service' in 'waveband', optionally matching 'keywords'
    '''
    registry = 'US'  # currently unused; regsearch queries the default registry
    from pyvo import regsearch
    # Query the registry for every service of the given type in this waveband
    try:
        records = regsearch(waveband=waveband,
                            keywords=keywords,
                            servicetype=service)
    except Exception:
        print('RegSearch failed')
        return None
    num_records = len(records)
    print("Found {} catalogues.".format(num_records))
    if not num_records:
        return None
    return records
In [2]:
from eada.vo.servsearch import select_catalog

def record_metadata(rec):
    # Resolve the registry record into a catalogue and return {shortname: summary}
    try:
        cat = select_catalog(rec)
        if cat:
            k = cat.shortname()
            return {k: cat.summary()}
    except Exception:
        pass
    return None
In parallel, using Dask (by default the number of worker processes equals the number of CPU cores)
In [3]:
def get_records_metadata_dask(recs, nprocs=None):
    import dask.bag as db
    from dask.diagnostics import ProgressBar
    print('Processing {} records..'.format(len(recs)))
    # Build a bag from the records and map the metadata extraction over it
    b = db.from_sequence(recs).map(record_metadata)
    with ProgressBar():
        cats = b.compute(num_workers=nprocs)
    return cats
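To verify the note above about the default worker count (a quick sanity check, not part of the original analysis), the number of CPU cores that Dask falls back to when num_workers is omitted can be printed:
In [ ]:
import os
# Dask's multiprocessing scheduler defaults its worker count to the CPU core count
print('CPU cores available:', os.cpu_count())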
In [ ]:
wb = 'radio'
In [ ]:
records = search(wb)
In [ ]:
N = 100
if N:
    recs = [records[i] for i in range(N)]
In [ ]:
print('Default number of processes (number of cpu cores)')
%time cats_d = get_records_metadata_dask(recs)
In [ ]:
import json
s = json.dumps(cats_d,indent=2)
print(s)
In [ ]:
def get_records_metadata_loop(recs):
    # Serial baseline: process records one by one, for timing comparison
    cats = []
    for rec in recs:
        cats.append(record_metadata(rec))
    return cats
%time cats_l = get_records_metadata_loop(recs)
In [ ]:
import json
s = json.dumps(cats_l,indent=2)
print(s)
In [ ]:
N = 400
if N:
    recs = [records[i] for i in range(N)]
In [ ]:
N_procs_steps = 6
N_procs = [2**i for i in range(1, N_procs_steps)]
N_procs.reverse()
for np in N_procs:
    print('Number of processes: {}'.format(np))
    %time cats_dd = get_records_metadata_dask(recs, np)
    print('---')
In [ ]:
NPROCS = 8  # twice the number of CPU cores
%time cats = get_records_metadata_dask(records, NPROCS)
In [ ]:
print(len(cats))
In [ ]:
cats_dirty = cats.copy()
cats = [c for c in cats if c is not None]
print(len(cats))
In [ ]:
dcats = {k:v for c in cats for k,v in c.items()}
len(dcats)
Notice that when we move the catalogues from the original list ('cats') into a dictionary ('dcats') we lose some items, which means that many of them (~200) share the same name/key.
This should be fixed!
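As a quick diagnostic (a sketch, not part of the original analysis; it assumes the 'cats' list of single-key dictionaries built above), the colliding short names can be counted, and one way to keep every entry is to suffix duplicated keys:
In [ ]:
from collections import Counter

# Count how many catalogue entries share each short name (dictionary key)
name_counts = Counter(k for c in cats for k in c.keys())
duplicates = {name: n for name, n in name_counts.items() if n > 1}
print('{} short names are used by more than one catalogue'.format(len(duplicates)))

# One possible fix: disambiguate colliding keys with a numeric suffix
# so no entry is silently overwritten when building the dictionary
dcats_unique = {}
for c in cats:
    for k, v in c.items():
        key, i = k, 1
        while key in dcats_unique:
            key = '{}_{}'.format(k, i)
            i += 1
        dcats_unique[key] = v
print(len(dcats_unique))  # should now equal len(cats)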
In [ ]:
for k,v in dcats.items():
    ucds = v['ucds']
    if any('polarization' in u for u in ucds):
        print("{}\n\t{}\n".format(k,ucds))
In [ ]:
wb = 'optical'
orecords = search(wb)
In [ ]:
NPROCS = 8  # twice the number of CPU cores
%time ocats = get_records_metadata_dask(orecords, NPROCS)
In [ ]:
ocats = [c for c in ocats if c is not None]
print(len(ocats))
docats = {k:v for c in ocats for k,v in c.items()}
len(docats)
In [ ]:
polarization = {}
for k,v in docats.items():
    ucds = v['ucds']
    if any('phys.polarization' in u for u in ucds):
        polarization[k] = v
In [ ]:
for k,v in polarization.items():
    print("{}\n\t{}\n".format(k,v['ucds']))
In [ ]:
len(polarization)
In [ ]:
import json
with open('vo_optical_polarization_scs_catalogs.json','w') as fp:
    json.dump(polarization, fp, indent=4)
In [4]:
wbs = ['radio','millimeter','infrared','optical','uv','x-ray','gamma-ray']
In [5]:
def search_catalogs(wb, nrecs=None):
    # import logging
    # logging.getLogger("astropy").setLevel(0)
    # logging.getLogger("pyvo").setLevel(0)
    records = search(wb)
    if records is None or not len(records):
        return None
    if nrecs and nrecs > 0:
        nrecs = min(nrecs, len(records))
        records = [records[i] for i in range(nrecs)]
    cats = get_records_metadata_dask(records, NPROCS)
    cats = list(filter(lambda c: c is not None, cats))
    return cats
In [6]:
def write(catalogs, wb):
    import json
    fn = 'vo_{}_scs_catalogs.json'.format(wb)
    with open(fn, 'w') as fp:
        json.dump(catalogs, fp, indent=4)
In [7]:
NPROCS = 8  # twice the number of CPU cores
resources = {}
for wb in wbs:
    print('\n> Waveband:', wb)
    cats = search_catalogs(wb)
    if cats is None:
        print('No catalogs found.')
        continue
    print('Number of catalogs:', len(cats))
    write(cats, wb)
    resources[wb] = cats
In [ ]: