Set up a simple queue of SBs to run a real-time ARL pipeline, outputting QA to a TM queue.


In [ ]:
import queue

import numpy

from matplotlib import pylab
# %matplotlib inline

pylab.rcParams['figure.figsize'] = (8.0, 8.0)
pylab.rcParams['image.cmap'] = 'rainbow'

import matplotlib.pyplot as plt

from astropy.coordinates import SkyCoord
from astropy import units as u

from data_models.polarisation import PolarisationFrame

from wrappers.serial.skycomponent.operations import create_skycomponent
from wrappers.serial.simulation.testing_support import create_blockvisibility_iterator
from wrappers.serial.simulation.configurations import create_named_configuration
from wrappers.serial.calibration.operations import qa_gaintable
from wrappers.serial.calibration.rcal import rcal

Define the data to be generated


In [ ]:
lowcore = create_named_configuration('LOWBD2-CORE')
times = numpy.linspace(-3.0, +3.0, 7) * numpy.pi / 12.0
frequency = numpy.linspace(1.0e8, 1.50e8, 3)
channel_bandwidth = numpy.array([5e7, 5e7, 5e7])

# Define the component and give it some polarisation and spectral behaviour
f = numpy.array([100.0, 20.0, -10.0, 1.0])
flux = numpy.array([f, 0.8 * f, 0.6 * f])
phasecentre = SkyCoord(ra=+15.0 * u.deg, dec=-35.0 * u.deg, frame='icrs', equinox='J2000')
compdirection = SkyCoord(ra=17.0 * u.deg, dec=-36.5 * u.deg, frame='icrs', equinox='J2000')
comp = create_skycomponent(flux=flux, frequency=frequency, direction=compdirection)

In [ ]:
def plotgain(gt, title=''):
    plt.clf()
    plt.plot(numpy.real(gt.gain[...,0,0]).flat, numpy.imag(gt.gain[...,0,0]).flat, '.')
    plt.plot(numpy.real(gt.gain[...,1,1]).flat, numpy.imag(gt.gain[...,1,1]).flat, '.')
    plt.title(title)
    plt.xlabel('Real part of gain')
    plt.ylabel('Imaginary part of gain')
    plt.show()

Create two queues, an input and output. Call them CSP (in) and TM (out).


In [ ]:
csp_queue = queue.Queue()
tm_queue = queue.Queue()

Now populate the CSP queue with 3 "Measurements".


In [ ]:
num_tasks_to_queue = 3
for i in range(num_tasks_to_queue):
    sb_id = i+100000
    data = [sb_id, create_blockvisibility_iterator(lowcore, times=times,
                                             frequency=frequency,
                                             channel_bandwidth=channel_bandwidth, phasecentre=phasecentre,
                                             weight=1, polarisation_frame=PolarisationFrame('linear'),
                                             integration_time=1.0, number_integrations=1,
                                             components=comp, phase_error=0.1, amplitude_error=0.01)]
    csp_queue.put(data)

Can view the queue contents, if desired. Essentially an SDP Jobs List.


In [ ]:
for elem in list(csp_queue.queue):
    print(elem)

Can iterate over the queue, to process those jobs (non-prioritised).


In [ ]:
for ingest in iter(csp_queue.get, None):
    sb_id = ingest[0]
    print("Processing SB_ID:", sb_id, "(", csp_queue.qsize(), "items left in CSP queue )")
    rcal_pipeline = rcal(vis=ingest[1], components=comp, phase_only=False)

    print("Starting pipeline")
    for igt, gt in enumerate(rcal_pipeline):
        plotgain(gt, title="Chunk %d, time %s,  residual %.3g (Jy)" % (igt, numpy.unique(gt.time),
                                                                   numpy.average(gt.residual)))
        tm_queue.put( [sb_id, qa_gaintable(gt)] )

    print("Ingest and RCAL pipelines are empty, stopping")

print("CSP Queue is empty, stopping")
csp_queue.join()

In [ ]: