We want to obtain the wavelength calibration for an arc spectrum, provided we already have a list (the 'master list') with the wavelengths of all the possible lines, and a rough idea of the wavelength range covered by that spectrum.
Here I am going to explain how this wavelength calibration can be carried out using pattern matching based on line triplets. A triplet is characterized by the relative location of its middle line with respect to the other two lines. Two triplets look similar when those relative locations are close; as soon as the middle line appears at a clearly different relative position, the two triplets can readily be discarded as a match.
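For instance (all line positions below are made up for illustration), two triplets can be tentatively compared through the relative location of their middle lines:

def middle_line_ratio(x1, x2, x3):
    # relative location of the middle line within the triplet
    return (x2 - x1)/(x3 - x1)

# made-up arc triplet (pixels) vs. made-up master triplet (Angstroms)
r_arc = middle_line_ratio(100.0, 160.0, 300.0)
r_master = middle_line_ratio(4100.0, 4160.5, 4301.0)
print(abs(r_arc - r_master) < 0.01)  # True: a candidate match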
The idea behind the method here described is the following:
The potential wavelengths of the arc lines must be available in a master list, which should cover the full wavelength range of the observed arc spectra.
Given a particular observed arc spectrum, the coordinates of the line peaks (in detector pixel units) can easily be determined. From these peak coordinates, line triplets built from consecutive lines are considered. The lines must be consecutive in order to guarantee that deviations from linearity are not important when computing the relative position of the central line of each triplet. This means that, if nlines_arc is the observed number of arc lines in the spectrum, nlines_arc-2 is the total number of line triplets considered, as the short sketch below illustrates.
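A minimal sketch of this enumeration (the peak positions are made up):

# made-up peak positions (pixels) of five detected arc lines
xpos_demo = [100.3, 250.7, 310.2, 480.9, 800.1]
# triplets from consecutive lines: len(xpos_demo) - 2 of them
triplets_demo = [(i, i + 1, i + 2) for i in range(len(xpos_demo) - 2)]
print(triplets_demo)  # [(0, 1, 2), (1, 2, 3), (2, 3, 4)]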
Since each triplet from the observed arc spectrum is characterized by the relative location of the central line, triplets built from lines in the master list, with a similar relative location of the central line, are potential matches. Note that in the case of the master list, the triplets are not restricted to consecutive lines, since this master list can contain faint lines that may be undetected in the observed spectrum. As expected, the potential number of triplets built from the master list can be very large. So, each triplet in the observed arc spectrum is associated with a set of candidate triplets from the master list.
Each triplet from the master list provides a candidate solution (CRVAL1, CDELT1) for the wavelength calibration. The method rests on the assumption that, after plotting all the candidate solutions in a CDELT1 vs. CRVAL1 diagram, the erroneous solutions will appear scattered, while the points should accumulate around the correct values.
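To make this idea concrete, here is a minimal sketch (with simulated candidate solutions; all names are hypothetical) of how an accumulation of points could be located in such a diagram with a 2D histogram. The notebook below uses a nearest-neighbour cost function instead, but the intuition is the same:

import numpy as np

# hypothetical candidate solutions: a uniform background of wrong
# solutions plus a concentration around the true values
rng = np.random.RandomState(0)
cdelt1_cand = np.concatenate([rng.uniform(0.0, 4.0, 500),
                              rng.normal(1.0, 0.01, 50)])
crval1_cand = np.concatenate([rng.uniform(3000.0, 7000.0, 500),
                              rng.normal(4000.0, 5.0, 50)])
# densest cell of a 2D histogram as a crude estimate of the mode
hist2d, xedges, yedges = np.histogram2d(cdelt1_cand, crval1_cand, bins=40)
imax, jmax = np.unravel_index(hist2d.argmax(), hist2d.shape)
print('mode: CDELT1 ~ %.2f, CRVAL1 ~ %.0f' %
      (0.5*(xedges[imax] + xedges[imax+1]),
       0.5*(yedges[jmax] + yedges[jmax+1])))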
For the code of this notebook to be self-contained, we start with the required initialization.
In [1]:
from __future__ import division
from __future__ import print_function
import sys
import numpy as np
from numpy.polynomial import polynomial
from scipy.stats import norm
import scipy.special
import matplotlib.pyplot as plt
%matplotlib inline
import itertools
ldebug = True # display intermediate information
lplot = True # display intermediate plots
lpause = False # pause execution waiting for the user
def pause(lpause):
    if lpause:
        try:
            raw_input('press <RETURN> to continue...')  # Python 2
        except NameError:  # raw_input was renamed to input in Python 3
            input('press <RETURN> to continue...')
For this notebook, we are going to create a synthetic master list covering a wavelength interval ranging from wv_ini_master to wv_end_master, containing a total of nlines_master lines. The resulting collection of master lines will be stored in a numpy array named wv_master. An auxiliary function, simulate_master_table, is defined to help in this step.
In [2]:
def simulate_master_table(my_seed, wv_ini_master, wv_end_master, nlines_master,
ldebug=False):
"""Generates a simulated master table of wavelengths.
The location of the lines follows a random uniform distribution
between `wv_ini_master` and `wv_end_master`.
Parameters
----------
my_seed : int
Seed to re-initialize random number generation.
wv_ini_master : float
Minimum wavelength in master table.
wv_end_master : float
Maximum wavelength in master table.
nlines_master : int
Total number of lines in master table.
ldebug : bool
If True intermediate results are displayed.
Returns
-------
wv_master : 1d numpy array, float
Array with wavelengths corresponding to the master table (Angstroms).
"""
if my_seed is not None:
np.random.seed(my_seed)
if wv_end_master < wv_ini_master:
raise ValueError('wv_ini_master=' + str(wv_ini_master) +
' must be <= wv_end_master=' + str(wv_end_master))
wv_master = np.random.uniform(low=wv_ini_master,
high=wv_end_master,
size=nlines_master)
wv_master.sort() # in-place sort
if ldebug:
print('>>> Master table:')
for val in zip(range(nlines_master), wv_master):
print(val)
pause(lpause)
return wv_master
The master list of wavelengths is then generated using this function.
In [3]:
# master list parameters
my_seed = 432
wv_ini_master = 3000
wv_end_master = 7000
nlines_master = 120
# generate line wavelengths, following a random uniform distribution
wv_master = simulate_master_table(my_seed, wv_ini_master, wv_end_master, nlines_master,
ldebug=ldebug)
Once this master list is generated, it is necessary to determine all the possible triplets that can be built from that list. In addition, the relative position of the central line of each triplet must also be computed. This is done by the following auxiliary function. Note that, given a particular master list, this function needs to be called only once. This is important when calibrating an image containing several spectra of the same arc (e.g. EMIR or MEGARA).
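As a quick sanity check, the expected number of triplets, C(n,3) with n = nlines_master, can be computed directly (using scipy.special.comb, as in the function below):

from scipy.special import comb
# number of different 3-line combinations from the 120 master lines
print(comb(nlines_master, 3, exact=True))  # 280840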
In [4]:
def gen_triplets_master(wv_master, ldebug=False, lplot=False):
"""Compute information associated to triplets in master table.
Determine all the possible triplets that can be generated from the
array `wv_master`. In addition, the relative position of the central
line of each triplet is also computed.
Parameters
----------
wv_master : 1d numpy array, float
Array with wavelengths corresponding to the master table (Angstroms).
ldebug : bool
If True intermediate results are displayed.
lplot : bool
If True intermediate plots are displayed.
Returns
-------
ntriplets_master : int
Number of triplets built from master table.
ratios_master_sorted : 1d numpy array, float
Array with values of the relative position of the central line of each
triplet, sorted in ascending order.
triplets_master_sorted_list : list of tuples
List with tuples of three numbers, corresponding to the three line
indices in the master table. The list is sorted to be in correspondence
with `ratios_master_sorted`.
"""
nlines_master = wv_master.size
#---
# Generate all the possible triplets with the numbers of the lines in the
# master table. Each triplet is defined as a tuple of three numbers
# corresponding to the three line indices in the master table. The
# collection of tuples is stored in an ordinary python list.
iter_comb_triplets = itertools.combinations(range(nlines_master), 3)
triplets_master_list = []
for val in iter_comb_triplets:
triplets_master_list.append(val)
# Verify that the number of triplets coincides with the expected value.
ntriplets_master = len(triplets_master_list)
if ntriplets_master == scipy.special.comb(nlines_master, 3, exact=True):
if ldebug:
print('>>> Total number of lines in master table:',
nlines_master)
print('>>> Number of triplets in master table...:',
ntriplets_master)
else:
raise ValueError('Invalid number of combinations')
# For each triplet, compute the relative position of the central line.
ratios_master = np.zeros(ntriplets_master)
for i_tupla in range(ntriplets_master):
i1, i2, i3 = triplets_master_list[i_tupla]
delta1 = wv_master[i2]-wv_master[i1]
delta2 = wv_master[i3]-wv_master[i1]
ratios_master[i_tupla] = delta1/delta2
# Compute the array of indices that index the above ratios in sorted order.
isort_ratios_master = np.argsort(ratios_master)
# Simultaneous sort of position ratios and triplets.
ratios_master_sorted = ratios_master[isort_ratios_master]
triplets_master_sorted_list = \
[triplets_master_list[i] for i in isort_ratios_master]
if lplot:
# Compute and plot histogram with position ratios
bins_in = np.linspace(0.0, 1.0, 41)
hist, bins_out = np.histogram(ratios_master, bins=bins_in)
#
fig = plt.figure()
ax = fig.add_subplot(111)
width_hist = 0.8*(bins_out[1]-bins_out[0])
center = (bins_out[:-1]+bins_out[1:])/2
ax.bar(center, hist, align='center', width=width_hist)
ax.set_xlabel('distance ratio in each triplet')
ax.set_ylabel('Number of triplets')
plt.show()
pause(lpause)
return ntriplets_master, ratios_master_sorted, triplets_master_sorted_list
In [5]:
ntriplets_master, ratios_master_sorted, triplets_master_sorted_list = \
gen_triplets_master(wv_master, ldebug=ldebug, lplot=lplot)
Now we are going to generate a simulated arc spectrum by assuming a particular wavelength range (defined by wv_ini_arc and wv_end_arc), the number of pixels (naxis1_arc), a probability for the lines in the master list to be detected in the arc (prob_line_master_in_arc), a minimum distance between arc lines (delta_xpos_min_arc), the maximum deviation from linearity of the calibration polynomial (delta_lambda, in Angstroms), an error in the location of the arc lines (error_xpos_arc, in pixels), a polynomial degree for the wavelength calibration (poly_degree), and a fraction of lines in the arc that are not going to be present in the master list (fraction_unknown_lines).
In [6]:
wv_ini_arc = 4000
wv_end_arc = 5000
naxis1_arc = 1024
prob_line_master_in_arc = 0.80
delta_xpos_min_arc = 4.0
delta_lambda = 5.0
error_xpos_arc = 0.3
poly_degree = 2
fraction_unknown_lines = 0.20
The actual generation of the arc lines is performed by the auxiliary function simulate_arc.
In [7]:
def simulate_arc(wv_ini_master,
wv_end_master,
wv_master,
wv_ini_arc,
wv_end_arc,
naxis1_arc,
prob_line_master_in_arc,
delta_xpos_min_arc,
delta_lambda,
error_xpos_arc,
poly_degree,
fraction_unknown_lines,
ldebug=False,
lplot=False,
lpause=False):
"""Generates simulated input for arc calibration.
Parameters
----------
wv_ini_master : float
Minimum wavelength in master table.
wv_end_master : float
Maximum wavelength in master table.
wv_master : 1d numpy array, float
Array with wavelengths corresponding to the master table (Angstroms).
wv_ini_arc : float
Minimum wavelength in arc spectrum.
wv_end_arc : float
Maximum wavelength in arc spectrum.
naxis1_arc : int
NAXIS1 of arc spectrum.
prob_line_master_in_arc : float
Probability that a master table line appears in the arc spectrum.
delta_xpos_min_arc : float
Minimum distance (pixels) between lines in arc spectrum.
delta_lambda : float
Maximum deviation (Angstroms) from linearity in arc calibration
polynomial.
error_xpos_arc : float
Standard deviation (pixels) employed to introduce noise in the arc
spectrum lines. The initial lines are shifted from their original
location following a Normal distribution with mean equal to zero and
sigma equal to this parameter.
poly_degree : int
Polynomial degree corresponding to the original wavelength calibration
function.
fraction_unknown_lines : float
Fraction of lines that on average will be unknown (i.e., lines that
appear in the arc spectrum that are not present in the master table).
ldebug : bool
If True intermediate results are displayed.
lplot : bool
If True intermediate plots are displayed.
Returns
-------
nlines_arc : int
Number of arc lines
xpos_arc : 1d numpy array, float
Location of arc lines (pixels).
crval1_arc : float
CRVAL1 for arc spectrum (linear approximation).
cdelt1_arc : float
CDELT1 for arc spectrum (linear approximation).
c0_arc, c1_arc, c2_arc : floats
Coefficients of the second order polynomial.
ipos_wv_arc : 1d numpy array, int
Number of line in master table corresponding to each arc line. Unknown
lines (i.e. those that are not present in the master table) are
assigned to -1.
coeff_original : 1d numpy array, float
Polynomial coefficients ordered from low to high, corresponding to the
fit to the arc lines before the inclusion of unknown lines.
"""
if (wv_ini_arc < wv_ini_master) or (wv_ini_arc > wv_end_master):
print('wv_ini_master:', wv_ini_master)
print('wv_end_master:', wv_end_master)
print('wv_ini_arc...:', wv_ini_arc)
raise ValueError('wavelength_ini_arc outside valid range')
if (wv_end_arc < wv_ini_master) or (wv_end_arc > wv_end_master):
print('wv_ini_master:', wv_ini_master)
print('wv_end_master:', wv_end_master)
print('wv_end_arc...:', wv_end_arc)
raise ValueError('wavelength_end_arc outside valid range')
if wv_end_arc < wv_ini_arc:
raise ValueError('wv_ini_arc=' + str(wv_ini_arc) +
' must be <= wv_end_arc=' + str(wv_end_arc))
#---
nlines_master = wv_master.size
crval1_arc = wv_ini_arc
cdelt1_arc = (wv_end_arc-wv_ini_arc)/float(naxis1_arc-1)
crpix1_arc = 1.0
if ldebug:
print('>>> CRVAL1, CDELT1, CRPIX1....:', crval1_arc, cdelt1_arc, crpix1_arc)
#---
# The arc lines constitute a subset of the master lines in the considered
# wavelength range.
i1_master = np.searchsorted(wv_master, wv_ini_arc)
i2_master = np.searchsorted(wv_master, wv_end_arc)
nlines_temp = i2_master-i1_master
nlines_arc_ini = int(round(nlines_temp*prob_line_master_in_arc))
ipos_wv_arc_ini = np.random.choice(range(i1_master,i2_master),
size = nlines_arc_ini,
replace = False)
ipos_wv_arc_ini.sort() # in-place sort
wv_arc_ini = wv_master[ipos_wv_arc_ini]
if ldebug:
print('>>> Number of master lines in arc region.:', nlines_temp)
print('>>> Initial number of arc lines..........:', nlines_arc_ini)
print('>>> Initial selection of master list lines for arc:')
print(ipos_wv_arc_ini)
# Remove too close lines.
ipos_wv_arc = np.copy(ipos_wv_arc_ini[0:1])
wv_arc = np.copy(wv_arc_ini[0:1])
i_last = 0
for i in range(1,nlines_arc_ini):
delta_xpos = (wv_arc_ini[i]-wv_arc_ini[i_last])/cdelt1_arc
if delta_xpos > delta_xpos_min_arc:
ipos_wv_arc = np.append(ipos_wv_arc, ipos_wv_arc_ini[i])
wv_arc = np.append(wv_arc, wv_arc_ini[i])
i_last = i
else:
if ldebug:
print('--> skipping line #',i,'. Too close to line #',i_last)
nlines_arc = len(wv_arc)
if ldebug:
print('>>> Intermediate number of arc lines.....:', nlines_arc)
print('>>> Intermediate selection of master list lines for arc:')
print(ipos_wv_arc)
# Generate pixel location of the arc lines.
if delta_lambda == 0.0:
# linear solution
xpos_arc = (wv_arc - crval1_arc)/cdelt1_arc + 1.0
c0_arc = wv_ini_arc
c1_arc = cdelt1_arc
c2_arc = 0.0
else:
# polynomial solution
c0_arc = wv_ini_arc
c1_arc = (wv_end_arc-wv_ini_arc-4*delta_lambda)/float(naxis1_arc-1)
c2_arc = 4*delta_lambda/float(naxis1_arc-1)**2
xpos_arc = (-c1_arc+np.sqrt(c1_arc**2-4*c2_arc*(c0_arc-wv_arc)))
xpos_arc /= 2*c2_arc
xpos_arc += 1 # convert from 0,...,(NAXIS1-1) to 1,...,NAXIS1
# Introduce noise in arc line positions.
if error_xpos_arc > 0:
xpos_arc += np.random.normal(loc=0.0,
scale=error_xpos_arc,
size=nlines_arc)
# Check that the order of the lines has not been modified.
xpos_arc_copy = np.copy(xpos_arc)
xpos_arc_copy.sort() # in-place sort
if sum(xpos_arc == xpos_arc_copy) != len(xpos_arc):
raise ValueError('FATAL ERROR: arc line switch after introducing noise')
if lplot:
fig = plt.figure()
ax = fig.add_subplot(111)
ax.set_xlim([1,naxis1_arc])
ax.set_ylim([wv_ini_arc,wv_end_arc])
ax.plot(xpos_arc, wv_arc, 'ro')
xp = np.array([1,naxis1_arc])
yp = np.array([wv_ini_arc,wv_end_arc])
ax.plot(xp,yp,'b-')
xp = np.arange(1,naxis1_arc+1)
yp = c0_arc+c1_arc*(xp-1)+c2_arc*(xp-1)**2
ax.plot(xp,yp,'g:')
ax.set_xlabel('pixel position in arc spectrum')
ax.set_ylabel('wavelength (Angstrom)')
plt.show()
pause(lpause)
# Unweighted polynomial fit.
coeff_original = polynomial.polyfit(xpos_arc, wv_arc, poly_degree)
poly_original = polynomial.Polynomial(coeff_original)
if lplot:
fig = plt.figure()
ax = fig.add_subplot(111)
ax.set_xlim([1,naxis1_arc])
if delta_lambda == 0.0:
if error_xpos_arc > 0:
ax.set_ylim([-4*error_xpos_arc*cdelt1_arc,
4*error_xpos_arc*cdelt1_arc])
else:
ax.set_ylim([-1.1,1.1])
else:
ax.set_ylim([-delta_lambda*1.5,delta_lambda*1.5])
yp = wv_arc - (crval1_arc+(xpos_arc-1)*cdelt1_arc)
ax.plot(xpos_arc, yp, 'ro')
xp = np.array([1,naxis1_arc])
yp = np.array([0,0])
ax.plot(xp,yp,'b-')
xp = np.arange(1,naxis1_arc+1)
yp = c0_arc+c1_arc*(xp-1)+c2_arc*(xp-1)**2
yp -= crval1_arc+cdelt1_arc*(xp-1)
ax.plot(xp,yp,'g:')
yp = poly_original(xp)
yp -= crval1_arc+cdelt1_arc*(xp-1)
ax.plot(xp,yp,'m-')
ax.set_xlabel('pixel position in arc spectrum')
ax.set_ylabel('residuals (Angstrom)')
plt.show()
pause(lpause)
#---
# Include unknown lines (lines that do not appear in the master table).
nunknown_lines = int(round(fraction_unknown_lines * float(nlines_arc)))
if ldebug:
print('>>> Number of unknown arc lines..........:', nunknown_lines)
for i in range(nunknown_lines):
iiter = 0
while True:
iiter += 1
if iiter > 1000:
raise ValueError('iiter > 1000 while adding unknown lines')
xpos_dum = np.random.uniform(low = 1.0,
high = float(naxis1_arc),
size = 1)
isort = np.searchsorted(xpos_arc, xpos_dum)
newlineok = False
if isort == 0:
dxpos1 = abs(xpos_arc[isort]-xpos_dum)
if dxpos1 > delta_xpos_min_arc:
newlineok = True
elif isort == nlines_arc:
dxpos2 = abs(xpos_arc[isort-1]-xpos_dum)
if dxpos2 > delta_xpos_min_arc:
newlineok = True
else:
dxpos1 = abs(xpos_arc[isort]-xpos_dum)
dxpos2 = abs(xpos_arc[isort-1]-xpos_dum)
if (dxpos1 > delta_xpos_min_arc) and \
(dxpos2 > delta_xpos_min_arc):
newlineok = True
if newlineok:
xpos_arc = np.insert(xpos_arc, isort, xpos_dum)
ipos_wv_arc = np.insert(ipos_wv_arc, isort, -1)
nlines_arc += 1
if ldebug:
print('--> adding unknown line at pixel:',xpos_dum)
break
if ldebug:
print('>>> Final number of arc lines............:', nlines_arc)
for val in zip(range(nlines_arc), ipos_wv_arc, xpos_arc):
print(val)
pause(lpause)
if lplot:
fig = plt.figure()
ax = fig.add_subplot(111)
ax.set_ylim([0.0,3.0])
ax.vlines(wv_master, 0.0, 1.0)
ax.vlines(wv_arc, 1.0, 2.0, colors='r', linestyle=':')
ax.vlines(wv_ini_arc, 0.0, 3.0, colors='m', linewidth=3.0)
ax.vlines(wv_end_arc, 0.0, 3.0, colors='m', linewidth=3.0)
ax.set_xlabel('wavelength')
axbis = ax.twiny()
axbis.vlines(xpos_arc, 2.0, 3.0, colors='g')
xmin_xpos_master = (wv_ini_master - crval1_arc)/cdelt1_arc + 1.0
xmax_xpos_master = (wv_end_master - crval1_arc)/cdelt1_arc + 1.0
axbis.set_xlim([xmin_xpos_master, xmax_xpos_master])
axbis.set_xlabel('pixel position in arc spectrum')
plt.show()
pause(lpause)
return nlines_arc, xpos_arc, crval1_arc, cdelt1_arc, \
c0_arc, c1_arc, c2_arc, \
ipos_wv_arc, coeff_original
With the help of this function, simulating the arc spectrum is immediate.
In [8]:
nlines_arc, xpos_arc, crval1_arc, cdelt1_arc, \
c0_arc, c1_arc, c2_arc, ipos_wv_arc, coeff_original = \
simulate_arc(wv_ini_master,
wv_end_master,
wv_master,
wv_ini_arc,
wv_end_arc,
naxis1_arc,
prob_line_master_in_arc,
delta_xpos_min_arc,
delta_lambda,
error_xpos_arc,
poly_degree,
fraction_unknown_lines,
ldebug=ldebug,
lplot=lplot)
The last plot represents all the lines of the master list (black vertical lines at the bottom) and the arc lines of the simulated spectrum (green vertical lines at the top). The intermediate red dotted lines mark the true wavelengths of the selected arc lines, connecting the master lines with the arc lines. However, since we are introducing a distortion in the wavelength scale, the green lines do not align perfectly with the master lines. In addition, some lines that do not appear in the master list have been artificially added to the arc spectrum. These two effects are used to check how robust the wavelength calibration procedure is in the presence of these problems.
At this point all the required information is already available to perform the wavelength calibration.
In [9]:
nlines_master = wv_master.size
nlines_arc = xpos_arc.size
if nlines_arc < 5:
raise ValueError('Insufficient arc lines=' + str(nlines_arc))
The following code should be incorporated into a function. We start by defining some parameters which are going to be useful later. The meaning of those parameters will be explained when they are used for the first time.
In [10]:
# parameters for wavelength calibration
wv_ini_search = wv_ini_master # minimum wavelength to be considered
wv_end_search = wv_end_master # maximum wavelength to be considered
times_sigma_r = 3.0 # times sigma of the triplet-ratio uncertainty used when searching for compatible triplets
frac_triplets_for_sum = 0.50 # fraction of layers to be used when computing the cost function for each initial solution
times_sigma_theil_sen = 10.0 # times sigma to remove deviant lines using the Theil-Sen method
poly_degree_wfit = 2 # degree for polynomial fit to wavelength calibration
times_sigma_polfilt = 10.0 # times sigma to reject deviant lines using a polynomial fit
times_sigma_inclusion = 5.0 # times sigma to incorporate new lines that remain unidentified after the main procedure
It is time now to generate all the triplets that can be built using consecutive arc lines. This number is equal to the total number of arc lines minus 2.
A given arc triplet will be defined by the location of three lines $x_1$, $x_2$ and $x_3$ (in pixel units). This triplet will be characterized by the relative location of the central line (ratio_arc), defined as $$r \equiv \frac{x_2-x_1}{x_3-x_1}.$$
This number always satisfies $r \in [0,1]$. In addition, we are assuming that those line locations have been measured with an uncertainty $\Delta x$ (in pixel units; the same for each $x_i$, with $i=1, 2, 3$) given by error_xpos_arc. An important expression is the relation between this $\Delta x$ and $\Delta r$. Using the law of propagation of errors, it is easy to show that $$\Delta r = \frac{\sqrt{2} \, \Delta x}{x_3-x_1} \sqrt{r(r-1)+1}.$$
For each arc triplet, characterized by its $r$ value ($r_{arc-triplet}$), compatible triplets from the master table are sought. By a compatible triplet we mean a master line triplet whose $r$ value, $r_{master-triplet}$, is close to $r_{arc-triplet}$ ($\pm$ a number of times the value of $\Delta r$; this number of times is given by times_sigma_r).
Each compatible triplet from the master table provides an estimate for CRVAL1 and CDELT1. By definition, $\lambda = \lambda_{ini} + (N_{pixel}-1) \Theta$, where $\lambda_{ini}$=CRVAL1 (the central wavelength of the first pixel, in Angstrom), $\Theta$=CDELT1 (the dispersion, in Angstrom/pixel), and $N_{pixel}$ is the pixel number. Since the triplet from the master table provides the wavelengths for three lines ($\lambda_i$, for $i=1,2,3$), the previous expression leads to $$\Theta = \displaystyle\frac{\lambda_3-\lambda_1}{x_3-x_1}$$ and $$\lambda_{ini} = \lambda_2 -(x_2-1) \Theta.$$
From here it is also possible to derive the uncertainties in these two parameters (which are correlated!): $$\Delta\Theta = \sqrt{2}\,\Theta\,\frac{\Delta x}{x_3-x_1}$$ and $$\Delta\lambda_{ini} = \Theta \,\Delta x \sqrt{1+2 \frac{(x_2-1)^2}{(x_3-x_1)^2}}.$$
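A minimal numeric sketch of these expressions (all input values below are made up):

# made-up matched triplet: pixel positions and master wavelengths
x1, x2, x3 = 100.0, 160.0, 300.0                    # pixels
lambda1, lambda2, lambda3 = 4100.0, 4160.5, 4301.0  # Angstroms
delta_x = 0.3                                       # pixel uncertainty
theta = (lambda3 - lambda1)/(x3 - x1)               # CDELT1 estimate
lambda_ini = lambda2 - (x2 - 1)*theta               # CRVAL1 estimate
error_theta = np.sqrt(2)*theta*delta_x/(x3 - x1)
error_lambda_ini = theta*delta_x*np.sqrt(1 + 2*(x2 - 1)**2/(x3 - x1)**2)
print(theta, lambda_ini, error_theta, error_lambda_ini)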
As an additional constraint, the only valid solutions are those for which the initial and final wavelengths of the arc fall within a predefined wavelength interval (defined by wv_ini_search and wv_end_search). Note that this interval must be generous enough to accommodate the real solution; otherwise the algorithm can fail to identify the triplets located just at the borders of the wavelength interval.
In [11]:
crval1_search = np.array([])
cdelt1_search = np.array([])
error_crval1_search = np.array([])
error_cdelt1_search = np.array([])
itriplet_search = np.array([])
clabel_search = []
ntriplets_arc = nlines_arc - 2
if ldebug:
print('>>> Total number of arc lines............:', nlines_arc)
print('>>> Total number of arc triplets.........:', ntriplets_arc)
# Loop in all the arc line triplets. Note that only triplets built from
# consecutive arc lines are considered.
for i in range(ntriplets_arc):
i1, i2, i3 = i, i+1, i+2
dist12 = xpos_arc[i2]-xpos_arc[i1]
dist13 = xpos_arc[i3]-xpos_arc[i1]
ratio_arc = dist12/dist13
pol_r = ratio_arc*(ratio_arc-1)+1
error_ratio_arc = np.sqrt(2)*error_xpos_arc/dist13*np.sqrt(pol_r)
ratio_arc_min = max(0.0, ratio_arc-times_sigma_r*error_ratio_arc)
ratio_arc_max = min(1.0, ratio_arc+times_sigma_r*error_ratio_arc)
# determine compatible triplets from the master list
j_loc_min = np.searchsorted(ratios_master_sorted, ratio_arc_min)-1
j_loc_max = np.searchsorted(ratios_master_sorted, ratio_arc_max)+1
if j_loc_min < 0:
j_loc_min = 0
if j_loc_max > ntriplets_master:
j_loc_max = ntriplets_master
if ldebug:
print(i, ratio_arc_min, ratio_arc, ratio_arc_max,
j_loc_min, j_loc_max)
# each triplet from the master list provides a potential solution
# for CRVAL1 and CDELT1
for j_loc in range(j_loc_min, j_loc_max):
j1, j2, j3 = triplets_master_sorted_list[j_loc]
# initial solutions for CDELT1, CRVAL1 and CRVALN
cdelt1_temp = (wv_master[j3]-wv_master[j1])/dist13
crval1_temp = wv_master[j2]-(xpos_arc[i2]-1)*cdelt1_temp
crvaln_temp = crval1_temp + float(naxis1_arc-1)*cdelt1_temp
# check that CRVAL1 and CRVALN are within the valid limits
if wv_ini_search <= crval1_temp <= wv_end_search:
if wv_ini_search <= crvaln_temp <= wv_end_search:
# Compute errors
error_crval1_temp = cdelt1_temp*error_xpos_arc* \
np.sqrt(1+2*((xpos_arc[i2]-1)**2)/(dist13**2))
error_cdelt1_temp = np.sqrt(2)*cdelt1_temp* \
error_xpos_arc/dist13
# Store values and errors
crval1_search = np.append(crval1_search, crval1_temp)
cdelt1_search = np.append(cdelt1_search, cdelt1_temp)
error_crval1_search = np.append(error_crval1_search,
error_crval1_temp)
error_cdelt1_search = np.append(error_cdelt1_search,
error_cdelt1_temp)
# Store additional information about the triplets
itriplet_search = np.append(itriplet_search, i)
clabel_search.append((j1, j2, j3))
# Maximum allowed value for CDELT1
cdelt1_max = (wv_end_search-wv_ini_search)/float(naxis1_arc-1)
if lplot:
# CDELT1 vs CRVAL1 diagram (original coordinates)
fig = plt.figure()
ax = fig.add_subplot(111)
ax.set_xlabel('cdelt1 (Angstroms/pixel)')
ax.set_ylabel('crval1 (Angstroms)')
ax.scatter(cdelt1_search, crval1_search, s=200, alpha=0.1)
xmin = 0.0
xmax = cdelt1_max
dx = xmax-xmin
xmin -= dx/20
xmax += dx/20
ax.set_xlim([xmin,xmax])
ymin = wv_ini_search
ymax = wv_end_search
dy = ymax-ymin
ymin -= dy/20
ymax += dy/20
ax.set_ylim([ymin,ymax])
xp_limits = np.array([0., cdelt1_max])
yp_limits = wv_end_search-float(naxis1_arc-1)*xp_limits
xp_limits = np.concatenate((xp_limits,[xp_limits[0],xp_limits[0]]))
yp_limits = np.concatenate((yp_limits,[yp_limits[1],yp_limits[0]]))
ax.plot(xp_limits, yp_limits, linestyle='-', color='red')
plt.show()
print('Number of points in last plot:', len(cdelt1_search))
pause(lpause)
The last plot shows the initial solutions in the CDELT1-CRVAL1 diagram. Note that there is a concentration of overlapping solutions around CDELT1=1.0 and CRVAL1=4000 Angstroms (the right solution). However, some other regions of this diagram also show a high concentration of points. The limits of the valid area in this diagram are displayed with red lines; outside that region no valid solutions can be found (the previous code has already discarded them).
Since we are going to measure distances in this diagram, it is useful to normalize the ranges in both axes.
In [12]:
# Normalize the values of CDELT1 and CRVAL1 to the interval [0,1] in each
# case.
cdelt1_search_norm = cdelt1_search/cdelt1_max
error_cdelt1_search_norm = error_cdelt1_search/cdelt1_max
#
crval1_search_norm = (crval1_search-wv_ini_search)
crval1_search_norm /= (wv_end_search-wv_ini_search)
error_crval1_search_norm = error_crval1_search/(wv_end_search-wv_ini_search)
# Intermediate plots
if lplot:
# CDELT1 vs CRVAL1 diagram (normalized coordinates)
fig = plt.figure()
ax = fig.add_subplot(111)
ax.set_xlabel('normalized cdelt1')
ax.set_ylabel('normalized crval1')
ax.scatter(cdelt1_search_norm, crval1_search_norm, s=200, alpha=0.1)
xmin = -0.05
xmax = 1.05
ymin = -0.05
ymax = 1.05
ax.set_xlim([xmin,xmax])
ax.set_ylim([ymin,ymax])
xp_limits = np.array([0.,1.,0.,0.])
yp_limits = np.array([1.,0.,0.,1.])
ax.plot(xp_limits, yp_limits, linestyle='-', color='red')
plt.show()
print('Number of points in last plot:', len(cdelt1_search))
pause(lpause)
This plot can be repeated using a different color for the solutions derived from each arc triplet. In addition, the arc triplet number is also overplotted.
In [13]:
if lplot:
# CDELT1 vs CRVAL1 diagram (normalized coordinates)
# with different color for each arc triplet and overplotting
# the arc triplet number
fig = plt.figure()
ax = fig.add_subplot(111)
ax.set_xlabel('normalized cdelt1')
ax.set_ylabel('normalized crval1')
ax.scatter(cdelt1_search_norm, crval1_search_norm, s=200, alpha=0.1,
c=itriplet_search)
for i in range(len(itriplet_search)):
ax.text(cdelt1_search_norm[i], crval1_search_norm[i],
str(int(itriplet_search[i])), fontsize=6)
ax.set_xlim([xmin,xmax])
ax.set_ylim([ymin,ymax])
ax.plot(xp_limits, yp_limits, linestyle='-', color='red')
plt.show()
pause(lpause)
The same plot, but overplotting the triplet labels (the indices of the three lines within the master list).
In [14]:
if lplot:
# CDELT1 vs CRVAL1 diagram (normalized coordinates)
# including triplet numbers
fig = plt.figure()
ax = fig.add_subplot(111)
ax.set_xlabel('normalized cdelt1')
ax.set_ylabel('normalized crval1')
ax.scatter(cdelt1_search_norm, crval1_search_norm, s=200, alpha=0.1,
c=itriplet_search)
for i in range(len(clabel_search)):
ax.text(cdelt1_search_norm[i], crval1_search_norm[i],
clabel_search[i], fontsize=6)
ax.set_xlim([xmin,xmax])
ax.set_ylim([ymin,ymax])
ax.plot(xp_limits, yp_limits, linestyle='-', color='red')
plt.show()
pause(lpause)
And the same plot, but showing only the error bars (remember that those error bars are correlated!).
In [15]:
if lplot:
# CDELT1 vs CRVAL1 diagram (normalized coordinates)
# with error bars (note that errors in this plot are highly
# correlated)
fig = plt.figure()
ax = fig.add_subplot(111)
ax.set_xlabel('normalized cdelt1')
ax.set_ylabel('normalized crval1')
ax.errorbar(cdelt1_search_norm, crval1_search_norm,
xerr=error_cdelt1_search_norm,
yerr=error_crval1_search_norm,
fmt='none')
ax.set_xlim([xmin,xmax])
ax.set_ylim([ymin,ymax])
ax.plot(xp_limits, yp_limits, linestyle='-', color='red')
plt.show()
pause(lpause)
The next step consists in segregating the different solutions by arc triplet. In this way we could produce plots like the previous ones, one for each arc triplet.
In [16]:
# ---
# Segregate the different solutions (normalized to [0,1]) by triplet. In
# this way the solutions are saved in different layers (a layer for each
# triplet). The solutions will be stored as python lists of numpy arrays.
ntriplets_layered_list = []
cdelt1_layered_list = []
error_cdelt1_layered_list = []
crval1_layered_list = []
error_crval1_layered_list = []
itriplet_layered_list = []
clabel_layered_list = []
for i in range(ntriplets_arc):
ldum = (itriplet_search == i)
ntriplets_layered_list.append(ldum.sum())
#
cdelt1_dum = cdelt1_search_norm[ldum]
cdelt1_layered_list.append(cdelt1_dum)
error_cdelt1_dum = error_cdelt1_search_norm[ldum]
error_cdelt1_layered_list.append(error_cdelt1_dum)
#
crval1_dum = crval1_search_norm[ldum]
crval1_layered_list.append(crval1_dum)
error_crval1_dum = error_crval1_search_norm[ldum]
error_crval1_layered_list.append(error_crval1_dum)
#
itriplet_dum = itriplet_search[ldum]
itriplet_layered_list.append(itriplet_dum)
#
clabel_dum = [k for (k, v) in zip(clabel_search, ldum) if v]
clabel_layered_list.append(clabel_dum)
if ldebug:
print('>>> List with no. of solutions/triplet...:',
ntriplets_layered_list)
print('>>> Total number of potential solutions..:',
sum(ntriplets_layered_list))
pause(lpause)
Once all the initial solutions have been segregated in different layers (one for each arc triplet), we can start computing the cost function for each solution.
For each solution, we find the nearest solution in each of the remaining ntriplets_arc-1 layers. After sorting all these distances, the cost function is the sum of a fraction of them (the fraction is given by frac_triplets_for_sum). After its computation, the cost function is normalized and segregated by arc triplet.
In [17]:
# ---
# Computation of the cost function.
#
# For each solution, corresponding to a particular triplet, find the
# nearest solution in each of the remaining ntriplets_arc-1 layers.
# Compute the distance (in normalized coordinates) to those closest
# solutions, and obtain the sum of distances considering only a fraction
# of them (after sorting them in ascending order).
ntriplets_for_sum = \
max(1, int(round(frac_triplets_for_sum*float(ntriplets_arc))))
funcost_search = np.zeros(len(itriplet_search))
for k in range(len(itriplet_search)):
itriplet_local = itriplet_search[k]
x0 = cdelt1_search_norm[k]
y0 = crval1_search_norm[k]
dist_to_layers = np.array([])
for i in range(ntriplets_arc):
if i != itriplet_local:
if ntriplets_layered_list[i] > 0:
x1 = cdelt1_layered_list[i]
y1 = crval1_layered_list[i]
dist2 = (x0-x1)**2 + (y0-y1)**2
dist_to_layers = np.append(dist_to_layers, dist2.min())
else:
dist_to_layers = np.append(dist_to_layers, np.inf)
dist_to_layers.sort() # in-place sort
funcost_search[k] = dist_to_layers[:ntriplets_for_sum].sum()
# Normalize the cost function
funcost_min = funcost_search.min()
if ldebug:
print('funcost_min:',funcost_min)
funcost_search /= funcost_min
# Segregate the cost function by arc triplet.
funcost_layered_list = []
for i in range(ntriplets_arc):
ldum = (itriplet_search == i)
funcost_dum = funcost_search[ldum]
funcost_layered_list.append(funcost_dum)
if ldebug:
for i in range(ntriplets_arc):
jdum = funcost_layered_list[i].argmin()
print('>>>',i,funcost_layered_list[i][jdum],
clabel_layered_list[i][jdum],
cdelt1_layered_list[i][jdum],
crval1_layered_list[i][jdum])
pause(lpause)
The tabulated data indicate, for each arc line triplet, the corresponding cost function, the indices of the three lines within the master list, and the solutions for CDELT1 and CRVAL1 (in normalized units). Since the arc line triplets are consecutive, the above table provides three identifications in the master list for each individual arc line (except for the first and last arc lines, for which only one identification is available, and the second and penultimate arc lines, for which two identifications are possible).
After the previous calculations, we can replot the CDELT1-CRVAL1 diagram, but now with symbol sizes proportional to the inverse of the cost function.
In [18]:
if lplot:
# CDELT1 vs CRVAL1 diagram (normalized coordinates)
# with symbol size proportional to the inverse of the cost function
fig = plt.figure()
ax = fig.add_subplot(111)
ax.set_xlabel('normalized cdelt1')
ax.set_ylabel('normalized crval1')
ax.scatter(cdelt1_search_norm, crval1_search_norm,
s=2000/funcost_search, c=itriplet_search, alpha=0.2)
ax.set_xlim([xmin,xmax])
ax.set_ylim([ymin,ymax])
ax.plot(xp_limits, yp_limits, linestyle='-', color='red')
plt.show()
print('Number of points in last plot:', len(cdelt1_search))
pause(lpause)
The largest symbols indicate the solutions with the highest probability of being the correct ones.
Next, we compile, for each arc line, the three possible identifications within the master list (for the first and last arc lines, only one identification is available; for the second and penultimate arc lines, two identifications are derived).
In [19]:
# We store the identifications of each line in a python list of lists
# named diagonal_ids (which grows as the different triplets are
# considered). A similar list of lists is also employed to store the
# corresponding cost functions.
for i in range(ntriplets_arc):
jdum = funcost_layered_list[i].argmin()
k1, k2, k3 = clabel_layered_list[i][jdum]
funcost_dum = funcost_layered_list[i][jdum]
if i == 0:
diagonal_ids = [[k1],[k2],[k3]]
diagonal_funcost = [[funcost_dum],[funcost_dum], [funcost_dum]]
else:
diagonal_ids[i].append(k1)
diagonal_ids[i+1].append(k2)
diagonal_ids.append([k3])
diagonal_funcost[i].append(funcost_dum)
diagonal_funcost[i+1].append(funcost_dum)
diagonal_funcost.append([funcost_dum])
if ldebug:
for i in range(nlines_arc):
print(i, diagonal_ids[i],diagonal_funcost[i])
pause(lpause)
The next step is the line identification. Several scenarios are considered:
Lines with three identifications:
Type A: the three identifications are identical. We keep the lowest value of the three cost functions.
Type B: two identifications are identical and one is different. We keep the line with two identifications and the lowest of the corresponding two cost functions.
Type C: the three identifications are different. We keep the one which is closest to a previously identified Type B line (it cannot be next to a Type A line). We take the corresponding cost function.
Lines with two identifications (second and penultimate lines): Type D: the line is accepted when both identifications coincide, keeping the lowest of the two cost functions.
Lines with one identification (first and last lines): Type E: the line is accepted when its two nearest arc lines have already been identified.
Note that after this procedure, some lines can remain unidentified.
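As a toy illustration of the Type A/B rules (the identifications and cost functions below are made up, and only the first pairing of the Type B check is shown):

# three identifications gathered for a single arc line
ids = [37, 37, 52]
funcosts = [1.3, 1.1, 2.0]
if ids[0] == ids[1] == ids[2]:
    result = ('A', ids[0], min(funcosts))
elif ids[0] == ids[1] and max(funcosts[0], funcosts[1]) < funcosts[2]:
    result = ('B', ids[0], min(funcosts[0], funcosts[1]))
else:
    result = (None, None, None)  # candidate for the Type C analysis
print(result)  # ('B', 37, 1.1)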
In [20]:
# The solutions are stored in a list of dictionaries. The dictionaries
# contain the following elements:
# - lineok: bool, indicates whether the line has been properly identified
# - type: 'A','B','C','D','E',...
# - id: index of the line in the master table
# - funcost: cost function associated with the line identification
# First, initialize solution.
solution = []
for i in range(nlines_arc):
solution.append({'lineok': False,
'type': None,
'id': None,
'funcost': None})
# Type A lines.
for i in range(2,nlines_arc-2):
j1,j2,j3 = diagonal_ids[i]
if j1 == j2 == j3:
solution[i]['lineok'] = True
solution[i]['type'] = 'A'
solution[i]['id'] = j1
solution[i]['funcost'] = min(diagonal_funcost[i])
if ldebug:
print('\n* Including Type A lines:')
for i in range(nlines_arc):
print(i, solution[i])
pause(lpause)
In [21]:
# Type B lines.
for i in range(2,nlines_arc-2):
if solution[i]['lineok'] == False:
j1,j2,j3 = diagonal_ids[i]
f1,f2,f3 = diagonal_funcost[i]
if j1 == j2:
if max(f1,f2) < f3:
solution[i]['lineok'] = True
solution[i]['type'] = 'B'
solution[i]['id'] = j1
solution[i]['funcost'] = min(f1,f2)
elif j1 == j3:
if max(f1,f3) < f2:
solution[i]['lineok'] = True
solution[i]['type'] = 'B'
solution[i]['id'] = j1
solution[i]['funcost'] = min(f1,f3)
elif j2 == j3:
if max(f2,f3) < f1:
solution[i]['lineok'] = True
solution[i]['type'] = 'B'
solution[i]['id'] = j2
solution[i]['funcost'] = min(f2,f3)
if ldebug:
print('\n* Including Type B lines:')
for i in range(nlines_arc):
print(i, solution[i])
pause(lpause)
In [22]:
# Type C lines.
for i in range(2,nlines_arc-2):
if solution[i]['lineok'] == False:
j1,j2,j3 = diagonal_ids[i]
f1,f2,f3 = diagonal_funcost[i]
if solution[i-1]['type'] == 'B':
if min(f2,f3) > f1:
solution[i]['lineok'] = True
solution[i]['type'] = 'C'
solution[i]['id'] = j1
solution[i]['funcost'] = f1
elif solution[i+1]['type'] == 'B':
if min(f1,f2) > f3:
solution[i]['lineok'] = True
solution[i]['type'] = 'C'
solution[i]['id'] = j3
solution[i]['funcost'] = f3
if ldebug:
print('\n* Including Type C lines:')
for i in range(nlines_arc):
print(i, solution[i])
pause(lpause)
In [23]:
# Type D lines.
for i in [1,nlines_arc-2]:
j1,j2 = diagonal_ids[i]
if j1 == j2:
f1,f2 = diagonal_funcost[i]
solution[i]['lineok'] = True
solution[i]['type'] = 'D'
solution[i]['id'] = j1
solution[i]['funcost'] = min(f1,f2)
if ldebug:
print('\n* Including Type D lines:')
for i in range(nlines_arc):
print(i, solution[i])
pause(lpause)
In [24]:
# Type E lines.
i = 0
if solution[i+1]['lineok'] and solution[i+2]['lineok']:
solution[i]['lineok'] = True
solution[i]['type'] = 'E'
solution[i]['id'] = diagonal_ids[i][0]
solution[i]['funcost'] = diagonal_funcost[i][0]
i = nlines_arc-1
if solution[i-2]['lineok'] and solution[i-1]['lineok']:
solution[i]['lineok'] = True
solution[i]['type'] = 'E'
solution[i]['id'] = diagonal_ids[i][0]
solution[i]['funcost'] = diagonal_funcost[i][0]
if ldebug:
print('\n* Including Type E lines:')
for i in range(nlines_arc):
print(i, solution[i])
pause(lpause)
Check that the solutions do not contain duplicated identifications. If duplicates are present (probably due to the influence of an unknown line that unfortunately falls too close to a real line in the master table), we keep the solution with the lowest cost function. The removed lines are labelled as type='R'. The procedure is repeated several times in case a line appears more than twice.
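An equivalent one-pass formulation of this duplicate resolution (with hypothetical identifications) keeps, for each master line id, the arc line with the lowest cost function:

# (arc line index, master id, cost function); id 37 appears twice
candidates = [(0, 37, 1.2), (1, 52, 0.8), (2, 37, 0.9)]
best = {}
for iline, line_id, funcost in candidates:
    if line_id not in best or funcost < best[line_id][1]:
        best[line_id] = (iline, funcost)
print(best)  # {37: (2, 0.9), 52: (1, 0.8)}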
In [25]:
lduplicated = True
nduplicated = 0
while lduplicated:
lduplicated = False
for i1 in range(nlines_arc):
if solution[i1]['lineok']:
j1 = solution[i1]['id']
for i2 in range(i1+1,nlines_arc):
if solution[i2]['lineok']:
j2 = solution[i2]['id']
if j1 == j2:
lduplicated = True
nduplicated += 1
f1 = solution[i1]['funcost']
f2 = solution[i2]['funcost']
if f1 < f2:
solution[i2]['lineok'] = False
solution[i2]['type'] = 'R'
else:
solution[i1]['lineok'] = False
solution[i1]['type'] = 'R'
if ldebug:
if nduplicated > 0:
print('\n* Removing Type R lines:')
for i in range(nlines_arc):
print(i, solution[i])
else:
print('\n* No duplicated Type R lines have been found')
pause(lpause)
Since we expect the wavelength calibration to be a smooth function with a small deviation from linearity, we can easily remove wrong lines, which will appear very far from a robust linear fit (computed using the Theil-Sen method). An important parameter in this step is times_sigma_theil_sen, the number of times sigma beyond which a point is considered too far from the linear fit.
For this procedure we define three auxiliary functions.
In [26]:
def select_data_for_fit(wv_master, xpos_arc, solution):
"""Select information from valid arc lines to facilitate posterior fits.
Parameters
----------
wv_master : 1d numpy array, float
Array with wavelengths corresponding to the master table (Angstroms).
xpos_arc : 1d numpy array, float
Location of arc lines (pixels).
solution : output from previous call to arccalibration
Returns
-------
nfit : int
Number of valid points for posterior fits.
ifit : list of int
List of indices corresponding to the arc lines whose coordinates are
going to be employed in the posterior fits.
xfit : 1d numpy array
X coordinate of points for posterior fits.
yfit : 1d numpy array
Y coordinate of points for posterior fits.
wfit : 1d numpy array
Cost function of points for posterior fits. The inverse of these values
can be employed for weighted fits.
"""
nlines_arc = len(solution)
if nlines_arc != xpos_arc.size:
raise ValueError('invalid nlines_arc=' + str(nlines_arc))
nfit = 0
ifit = []
xfit = np.array([])
yfit = np.array([])
wfit = np.array([])
for i in range(nlines_arc):
if solution[i]['lineok']:
ifit.append(i)
xfit = np.append(xfit, xpos_arc[i])
yfit = np.append(yfit, wv_master[solution[i]['id']])
wfit = np.append(wfit, solution[i]['funcost'])
nfit += 1
return nfit, ifit, xfit, yfit, wfit
In [27]:
def fit_theil_sen(x, y):
"""Compute a robust linear fit using the Theil-Sen method.
See http://en.wikipedia.org/wiki/Theil%E2%80%93Sen_estimator for details.
This function "pairs up sample points by the rank of their x-coordinates
(the point with the smallest coordinate being paired with the first point
above the median coordinate, etc.) and computes the median of the slopes of
the lines determined by these pairs of points".
Parameters
----------
x : 1d numpy array, float
X coordinate.
y : 1d numpy array, float
Y coordinate.
Returns
-------
intercept : float
Intercept of the linear fit.
slope : float
Slope of the linear fit.
"""
if x.ndim == y.ndim == 1:
n = x.size
if n == y.size:
if n < 5:
raise ValueError('n=' + str(n) +' is < 5')
result = [] # python list
if (n % 2) == 0:
iextra = 0
else:
iextra = 1
for i in range(n//2):
ii = i + n//2 + iextra
deltax = x[ii]-x[i]
deltay = y[ii]-y[i]
result.append(deltay/deltax)
slope = np.median(result)
result = y - slope*x # numpy array
intercept = np.median(result)
return intercept, slope
else:
raise ValueError('Invalid input sizes')
else:
raise ValueError('Invalid input dimensions')
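A quick check of fit_theil_sen on made-up data with one strong outlier:

x_demo = np.arange(10, dtype=float)
y_demo = 4000.0 + 1.5*x_demo
y_demo[7] += 100.0  # a single deviant point
print(fit_theil_sen(x_demo, y_demo))  # (4000.0, 1.5): outlier ignored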
In [28]:
def sigmaG(x):
"""Compute a robust estimator of the standard deviation.
See Eq. 3.36 (page 84) in Statistics, Data Mining, and Machine Learning
in Astronomy, by Ivezic, Connolly, VanderPlas & Gray.
Parameters
----------
x : 1d numpy array, float
Array of input values whose standard deviation is requested.
Returns
-------
sigmag : float
Robust estimator of the standard deviation.
"""
q25, q75 = np.percentile(x, [25.0, 75.0])
sigmag = 0.7413*(q75-q25)
return sigmag
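A quick check of sigmaG on Gaussian data (the estimator should recover the true sigma; the seed is arbitrary):

np.random.seed(123)
print(sigmaG(np.random.normal(loc=0.0, scale=2.0, size=100000)))  # ~2.0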
After the previous definitions of these functions, we can proceed with the fit and the removal of deviant points.
In [29]:
if ldebug:
print('\n>>> Theil-Sen filtering...')
nfit, ifit, xfit, yfit, wfit = \
select_data_for_fit(wv_master, xpos_arc, solution)
intercept, slope = fit_theil_sen(xfit, yfit)
rfit = abs(yfit - (intercept + slope*xfit))
if ldebug:
print('rfit:', rfit)
sigma_rfit = sigmaG(rfit)
if ldebug:
print('sigmaG:',sigma_rfit)
nremoved = 0
for i in range(nfit):
if rfit[i] > times_sigma_theil_sen*sigma_rfit:
solution[ifit[i]]['lineok'] = False
solution[ifit[i]]['type'] = 'T'
nremoved += 1
if ldebug:
if nremoved > 0:
print('\n* Removing Type T lines:')
for i in range(nlines_arc):
print(i, solution[i])
else:
print('\nNo Type T lines have been found and removed')
pause(lpause)
After the previous filtering, we continue by removing points that deviate from a polynomial fit. The filtered lines are labelled as type='P'.
In [30]:
if ldebug:
print('\n>>> Polynomial filtering...')
nfit, ifit, xfit, yfit, wfit = \
select_data_for_fit(wv_master, xpos_arc, solution)
coeff_fit = polynomial.polyfit(xfit, yfit, poly_degree, w=1/wfit)
poly = polynomial.Polynomial(coeff_fit)
rfit = abs(yfit - poly(xfit))
if ldebug:
print('rfit:',rfit)
sigma_rfit = sigmaG(rfit)
if ldebug:
print('sigmaG:',sigma_rfit)
nremoved = 0
for i in range(nfit):
if rfit[i] > times_sigma_polfilt*sigma_rfit:
solution[ifit[i]]['lineok'] = False
solution[ifit[i]]['type'] = 'P'
nremoved += 1
if ldebug:
if nremoved > 0:
print('\n* Removing Type P lines:')
for i in range(nlines_arc):
print(i, solution[i])
else:
print('\nNo type P lines have been found and removed')
pause(lpause)
After removing deviant lines, we can now try to incorporate identifications for those lines that remain unidentified after the previous procedure.
In [31]:
if ldebug:
print('\n>>> Polynomial prediction of unknown lines...')
nfit, ifit, xfit, yfit, wfit = \
select_data_for_fit(wv_master, xpos_arc, solution)
coeff_fit = polynomial.polyfit(xfit, yfit, poly_degree_wfit, w=1/wfit)
poly = polynomial.Polynomial(coeff_fit)
rfit = abs(yfit - poly(xfit))
if ldebug:
print('rfit:',rfit)
sigma_rfit = sigmaG(rfit)
if ldebug:
print('sigmaG:',sigma_rfit)
list_id_already_found = []
list_funcost_already_found = []
for i in range(nlines_arc):
if solution[i]['lineok']:
list_id_already_found.append(solution[i]['id'])
list_funcost_already_found.append(solution[i]['funcost'])
nnewlines = 0
for i in range(nlines_arc):
if not solution[i]['lineok']:
zfit = poly(xpos_arc[i]) # predicted wavelength
isort = np.searchsorted(wv_master, zfit)
if isort == 0:
ifound = 0
dlambda = wv_master[ifound]-zfit
elif isort == nlines_master:
ifound = isort - 1
dlambda = zfit - wv_master[ifound]
else:
dlambda1 = zfit-wv_master[isort-1]
dlambda2 = wv_master[isort]-zfit
if dlambda1 < dlambda2:
ifound = isort - 1
dlambda = dlambda1
else:
ifound = isort
dlambda = dlambda2
if ldebug:
print(i,zfit,ifound,dlambda)
if ifound not in list_id_already_found: # unused line
if dlambda < times_sigma_inclusion*sigma_rfit:
list_id_already_found.append(ifound)
solution[i]['lineok'] = True
solution[i]['type'] = 'I'
solution[i]['id'] = ifound
solution[i]['funcost'] = max(list_funcost_already_found)
nnewlines += 1
if ldebug:
if nnewlines > 0:
print('\n* Including Type I lines:')
for i in range(nlines_arc):
print(i, solution[i])
else:
print('\nNo Type I lines have been found and added')
pause(lpause)
Once the solution has been found, we can finally determine the coefficients of the final wavelength calibration fit. For this purpose, we use an auxiliary function.
In [32]:
def fit_solution(wv_master,
xpos_arc,
solution,
naxis1_arc,
poly_degree_wfit,
weighted,
ldebug=False,
lplot=False,
lpause=False):
"""Fit polynomial to arc calibration solution.
Parameters
----------
wv_master : 1d numpy array, float
Array with wavelengths corresponding to the master table (Angstroms).
xpos_arc : 1d numpy array, float
Location of arc lines (pixels).
solution : list of dictionaries
A list of size equal to the number of arc lines, which elements are
dictionaries containing all the relevant information concerning the
line identification.
naxis1_arc : int
NAXIS1 of arc spectrum.
poly_degree_wfit : int
Polynomial degree corresponding to the wavelength calibration function
to be fitted.
weighted : bool
Determines whether the polynomial fit is weighted or not, using as
weights the values of the cost function obtained in the line
identification.
ldebug : bool
If True intermediate results are displayed.
lplot : bool
If True intermediate plots are displayed.
Returns
-------
coeff : 1d numpy array, float
Coefficients of the polynomial fit.
crval1_approx : float
Approximate CRVAL1 value.
cdelt1_approx : float
Approximate CDELT1 value.
"""
nlines_arc = len(solution)
if nlines_arc != xpos_arc.size:
raise ValueError('Invalid nlines_arc=' + str(nlines_arc))
# Select information from valid lines.
nfit, ifit, xfit, yfit, wfit = \
select_data_for_fit(wv_master, xpos_arc, solution)
# Select list of filtered out and unidentified lines
list_R = []
list_T = []
list_P = []
list_unidentified = []
for i in range(nlines_arc):
if not solution[i]['lineok']:
if solution[i]['type'] is None:
list_unidentified.append(i)
elif solution[i]['type'] == 'R':
list_R.append(i)
elif solution[i]['type'] == 'T':
list_T.append(i)
elif solution[i]['type'] == 'P':
list_P.append(i)
else:
raise ValueError('Unexpected "type"')
# Obtain approximate linear fit using the robust Theil-Sen method.
intercept, slope = fit_theil_sen(xfit, yfit)
cdelt1_approx = slope
crval1_approx = intercept + cdelt1_approx
if ldebug:
print('>>> Approximate CRVAL1 :',crval1_approx)
print('>>> Approximate CDELT1 :',cdelt1_approx)
# Polynomial fit.
if weighted:
coeff = polynomial.polyfit(xfit, yfit, poly_degree_wfit, w=1/wfit)
else:
coeff = polynomial.polyfit(xfit, yfit, poly_degree_wfit)
if ldebug:
print('>>> Fitted coefficients:', coeff)
xpol = np.linspace(1,naxis1_arc,naxis1_arc)
poly = polynomial.Polynomial(coeff)
ypol = poly(xpol)-(crval1_approx+(xpol-1)*cdelt1_approx)
if lplot:
fig = plt.figure()
ax = fig.add_subplot(111)
# identified lines
xp = np.copy(xfit)
yp = yfit-(crval1_approx+(xp-1)*cdelt1_approx)
ax.set_xlim([1-0.05*naxis1_arc,naxis1_arc+0.05*naxis1_arc])
ax.set_xlabel('pixel position in arc spectrum')
ax.set_ylabel('residuals (Angstrom)')
ax.plot(xp, yp, 'go', label="identified lines")
for i in range(nfit):
ax.text(xp[i], yp[i], solution[ifit[i]]['type'], fontsize=15)
# polynomial fit
ax.plot(xpol,ypol,'c-', label="polynomial fit")
# unidentified lines
if len(list_unidentified) > 0:
ymin = np.concatenate((yp,ypol)).min()
ymax = np.concatenate((yp,ypol)).max()
for i in list_unidentified:
xp = np.array([xpos_arc[i], xpos_arc[i]])
yp = np.array([ymin, ymax])
if i == list_unidentified[0]:
ax.plot(xp,yp,'r--',label='unidentified lines')
else:
ax.plot(xp,yp,'r--')
# R, T and P lines
for val in zip(["R","T","P"],[list_R, list_T, list_P],['red','blue','magenta']):
list_X = val[1]
if len(list_X) > 0:
xp = np.array([])
yp = np.array([])
for i in list_X:
xp = np.append(xp, xpos_arc[i])
yp = np.append(yp, wv_master[solution[i]['id']])
yp -= crval1_approx+(xp-1)*cdelt1_approx
ax.plot(xp, yp, marker='x', markersize = 15, c=val[2],
linewidth=0, label='removed line ('+val[0]+')')
# shrink current axis by 20% and put a legend to the right
box = ax.get_position()
ax.set_position([box.x0, box.y0, box.width * 0.8, box.height])
ax.legend(loc='center left', bbox_to_anchor=(1, 0.5))
plt.show()
pause(lpause)
return coeff, crval1_approx, cdelt1_approx
In [33]:
coeff, crval1_approx, cdelt1_approx = \
fit_solution(wv_master,
xpos_arc,
solution,
naxis1_arc,
poly_degree_wfit = 2,
weighted = False,
ldebug=ldebug,
lplot=lplot,
lpause=lpause)
In [34]:
print('>>> original crval1, cdelt1...:', crval1_arc, cdelt1_arc)
print('>>> original c0, c1, c2.......:', c0_arc, c1_arc, c2_arc)
print('>>> approximate crval1, cdelt1:', crval1_approx, cdelt1_approx)
print('>>> fitted coefficients.......:', coeff)
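Finally, as an extra sketch (not part of the original computation), the fitted coefficients can be evaluated at every pixel to build the calibrated wavelength array:

# wavelength assigned by the final fit to each pixel 1,...,NAXIS1
poly_final = polynomial.Polynomial(coeff)
wavelength_final = poly_final(np.arange(1, naxis1_arc + 1))
print(wavelength_final[0], wavelength_final[-1])  # ~4000 and ~5000 Angstroms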