In [ ]:
# Standard library
from os.path import join
import sys
if '/Users/adrian/projects/longslit/' not in sys.path:
    sys.path.append('/Users/adrian/projects/longslit/')
# Third-party
from astropy.constants import c
import numpy as np
import matplotlib.pyplot as plt
import astropy.units as u
from astropy.io import fits
import astropy.modeling as mod
plt.style.use('apw-notebook')
%matplotlib inline
from scipy.interpolate import InterpolatedUnivariateSpline
from scipy.optimize import leastsq
import ccdproc
from ccdproc import CCDData, ImageFileCollection
from longslit.utils import gaussian_constant, gaussian_polynomial
In [ ]:
ccd_props = dict(gain=2.7*u.electron/u.adu, # from: http://mdm.kpno.noao.edu/index/MDM_CCDs.html
                 readnoise=7.9*u.electron)
In [ ]:
# ic = ImageFileCollection('/Users/adrian/projects/gaia-wide-binaries/data/mdm-spring-2017/n1/',
#                          filenames=['n1.00{:02d}.fit'.format(i) for i in range(1,23+1)])
ic = ImageFileCollection('/Users/adrian/projects/gaia-wide-binaries/data/mdm-spring-2017/n4/',
                         filenames=['n4.00{:02d}.fit'.format(i) for i in list(range(1,21))+[89]])
# ic = ImageFileCollection('/Users/adrian/projects/gaia-wide-binaries/data/mdm-spring-2017/n3/',
#                          filenames=['n3.0{:03d}.fit'.format(i) for i in list(range(1,21))+[122]])
In [ ]:
ic.summary['object']
In [ ]:
# get list of overscan-subtracted bias frames as 2D arrays
bias_list = []
for hdu, fname in ic.hdus(return_fname=True, imagetyp='BIAS'):
    ccd = CCDData.read(join(ic.location, fname), unit='adu')
    ccd = ccdproc.gain_correct(ccd, gain=ccd_props['gain'])
    ccd = ccdproc.subtract_overscan(ccd, overscan=ccd[:,300:])
    ccd = ccdproc.trim_image(ccd, fits_section="[1:300,:]")
    bias_list.append(ccd)
In [ ]:
# combine all bias frames into a master bias frame
master_bias = ccdproc.combine(bias_list, method='average', clip_extrema=True,
                              nlow=1, nhigh=1, error=True)
In [ ]:
plt.hist(master_bias.data.ravel(), bins='auto');
plt.yscale('log')
plt.xlabel('master bias pixel values')
In [ ]:
plt.figure(figsize=(15,5))
plt.imshow(master_bias.data.T, cmap='plasma')
In [ ]:
# create a list of flat frames
flat_list = []
for hdu, fname in ic.hdus(return_fname=True, imagetyp='FLAT'):
    ccd = CCDData.read(join(ic.location, fname), unit='adu')
    ccd = ccdproc.gain_correct(ccd, gain=ccd_props['gain'])
    ccd = ccdproc.ccd_process(ccd, oscan='[300:364,:]', trim="[1:300,:]",
                              master_bias=master_bias)
    flat_list.append(ccd)
In [ ]:
# combine into a single master flat - use 3*sigma sigma-clipping
master_flat = ccdproc.combine(flat_list, method='average', sigma_clip=True,
                              low_thresh=3, high_thresh=3)
In [ ]:
plt.hist(master_flat.data.ravel(), bins='auto');
plt.yscale('log')
plt.xlabel('master flat pixel values')
In [ ]:
plt.figure(figsize=(15,5))
plt.imshow(master_flat.data.T, cmap='plasma')
In [ ]:
im_list = []
for hdu, fname in ic.hdus(return_fname=True, imagetyp='OBJECT'):
    print(hdu.header['OBJECT'])

    ccd = CCDData.read(join(ic.location, fname), unit='adu')
    hdr = ccd.header

    # make a copy of the object
    nccd = ccd.copy()

    # apply the overscan correction
    poly_model = mod.models.Polynomial1D(2)
    nccd = ccdproc.subtract_overscan(nccd, fits_section='[300:364,:]',
                                     model=poly_model)

    # apply the trim correction
    nccd = ccdproc.trim_image(nccd, fits_section='[1:300,:]')

    # create the error frame
    nccd = ccdproc.create_deviation(nccd, gain=ccd_props['gain'],
                                    readnoise=ccd_props['readnoise'])

    # now gain correct
    nccd = ccdproc.gain_correct(nccd, gain=ccd_props['gain'])

    # correct for master bias frame
    # - this does something strange at the blue end, but we can live with it
    nccd = ccdproc.subtract_bias(nccd, master_bias)

    # correct for master flat frame
    nccd = ccdproc.flat_correct(nccd, master_flat)

    # cosmic ray cleaning - this updates the uncertainty array as well
    nccd = ccdproc.cosmicray_lacosmic(nccd, sigclip=8.)

    # new_fname = 'p'+fname
    # ccd.write(new_fname, overwrite=True)
    # im_list.append(new_fname)

    print(hdu.header['EXPTIME'])
    break
In [ ]:
fig,axes = plt.subplots(3,1,figsize=(18,10),sharex=True,sharey=True)
axes[0].imshow(nccd.data.T, cmap='plasma')
axes[1].imshow(nccd.uncertainty.array.T, cmap='plasma')
axes[2].imshow(nccd.mask.T, cmap='plasma')
In [ ]:
n_trace_nodes = 32
In [ ]:
def fit_gaussian(pix, flux, flux_ivar, n_coeff=1, p0=None, return_cov=False):
    if p0 is None:
        p0 = [flux.max(), pix[flux.argmax()], 1.] + [0.]*(n_coeff-1) + [flux.min()]

    def errfunc(p):
        return (gaussian_polynomial(pix, *p) - flux) * np.sqrt(flux_ivar)

    res = leastsq(errfunc, x0=p0, full_output=True)
    p_opt = res[0]
    cov_p = res[1]
    ier = res[-1]

    if ier > 4 or ier < 1:
        raise RuntimeError("Failed to fit Gaussian to spectrum trace row.")

    if return_cov:
        return p_opt, cov_p
    else:
        return p_opt
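A quick sanity check of `fit_gaussian` on a synthetic row (a sketch: the profile and noise values below are made up, and the exact amplitude convention depends on `gaussian_polynomial`, whose parameter ordering is assumed from the `p0` construction above):
In [ ]:
# synthetic row: Gaussian line on a constant background, plus noise (illustrative values)
rng = np.random.RandomState(42)
_pix = np.arange(300, dtype=float)
_true_flux = 500. * np.exp(-0.5*(_pix - 140.)**2 / 2.5**2) + 20.
_flux = rng.normal(_true_flux, 5.)
_ivar = np.full_like(_flux, 1/5.**2)

_p = fit_gaussian(_pix, _flux, _ivar)
print(_p)  # centroid and width should come out near 140 and 2.5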
In [ ]:
n_rows,n_cols = nccd.data.shape
row_idx = np.linspace(0, n_rows-1, n_trace_nodes).astype(int)
pix = np.arange(n_cols, dtype=float)

trace_amps = np.zeros_like(row_idx).astype(float)
trace_centroids = np.zeros_like(row_idx).astype(float)
trace_stddevs = np.zeros_like(row_idx).astype(float)
for i,j in enumerate(row_idx):
    flux = nccd.data[j]
    flux_err = nccd.uncertainty.array[j]

    flux_ivar = 1/flux_err**2.
    flux_ivar[~np.isfinite(flux_ivar)] = 0.

    p_opt = fit_gaussian(pix, flux, flux_ivar)
    trace_amps[i] = p_opt[0]
    trace_centroids[i] = p_opt[1]
    trace_stddevs[i] = p_opt[2]
In [ ]:
trace_func = InterpolatedUnivariateSpline(row_idx, trace_centroids)
trace_amp_func = InterpolatedUnivariateSpline(row_idx, trace_amps)
In [ ]:
trace_stddev = np.median(trace_stddevs)
In [ ]:
plt.hist(trace_stddevs, bins=np.linspace(0, 5., 16));
In [ ]:
plt.plot(row_idx, trace_centroids)
_grid = np.linspace(row_idx.min(), row_idx.max(), 1024)
plt.plot(_grid, trace_func(_grid), marker='', alpha=0.5)
plt.axvline(50)
plt.axvline(1650)
plt.xlabel('row index [pix]')
plt.ylabel('trace centroid [pix]')
In [ ]:
from scipy.special import wofz
from scipy.stats import scoreatpercentile
def voigt(x, amp, x_0, stddev, fwhm):
    # Voigt profile via the Faddeeva function: a Gaussian of width `stddev`
    # convolved with a Lorentzian of FWHM `fwhm`, scaled by `amp`
    _x = x - x_0
    z = (_x + 1j*fwhm/2.) / (np.sqrt(2.)*stddev)
    return amp * wofz(z).real / (np.sqrt(2.*np.pi)*stddev)
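As a quick check of the profile (a sketch; the grid and tolerance are arbitrary), `voigt` should reduce to a unit-area Gaussian scaled by `amp` when the Lorentzian FWHM goes to zero:
In [ ]:
_x = np.linspace(-5, 5, 101)
_gauss = np.exp(-0.5*_x**2) / np.sqrt(2*np.pi)  # unit-area Gaussian with stddev=1
print(np.allclose(voigt(_x, 1., 0., 1., 1e-10), _gauss, atol=1e-6))  # expect True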
In [ ]:
def psf_model(p, x):
    amp, x_0, std_G, fwhm_L, C = p
    return voigt(x, amp, x_0, std_G, fwhm_L) + C

def psf_chi(p, pix, flux, flux_ivar):
    return (psf_model(p, pix) - flux) * np.sqrt(flux_ivar)
In [ ]:
i = 800 # MAGIC NUMBER
flux = nccd.data[i]
flux_err = nccd.uncertainty.array[i]
pix = np.arange(len(flux))
flux_ivar = 1/flux_err**2.
flux_ivar[~np.isfinite(flux_ivar)] = 0.
In [ ]:
p0 = [flux.max(), pix[np.argmax(flux)], 1., 1., scoreatpercentile(flux[flux>0], 16.)]
fig,axes = plt.subplots(2, 1, figsize=(10,6), sharex=True, sharey=True)
# data on both panels
for i in range(2):
    axes[i].plot(pix, flux, marker='', drawstyle='steps-mid')
    axes[i].errorbar(pix, flux, flux_err, marker='', linestyle='none', ecolor='#777777', zorder=-10)
# plot initial guess
grid = np.linspace(pix.min(), pix.max(), 1024)
axes[0].plot(grid, psf_model(p0, grid), marker='', alpha=1., zorder=10)
axes[0].set_yscale('log')
axes[0].set_title("Initial")
# fit parameters
p_opt,ier = leastsq(psf_chi, x0=p0, args=(pix, flux, flux_ivar))
print(ier)
# plot fit parameters
axes[1].plot(grid, psf_model(p_opt, grid), marker='', alpha=1., zorder=10)
axes[1].set_title("Fit")
fig.tight_layout()
In [ ]:
psf_p = dict()
psf_p['std_G'] = p_opt[2]
psf_p['fwhm_L'] = p_opt[3]
In [ ]:
def row_model(p, psf_p, x):
    amp, x_0, C = p
    return voigt(x, amp, x_0, stddev=psf_p['std_G'], fwhm=psf_p['fwhm_L']) + C

def row_chi(p, pix, flux, flux_ivar, psf_p):
    return (row_model(p, psf_p, pix) - flux) * np.sqrt(flux_ivar)
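A minimal end-to-end check of `row_chi` with `leastsq` on a fake row (a sketch: the PSF widths, noise level, and true parameters below are illustrative, not the fitted `psf_p` from above):
In [ ]:
_psf_p = {'std_G': 1.5, 'fwhm_L': 2.0}  # illustrative PSF parameters
_pix = np.arange(300, dtype=float)
_true_p = [2000., 150., 30.]            # amp, center, constant sky level
_flux = np.random.normal(row_model(_true_p, _psf_p, _pix), 10.)
_ivar = np.full_like(_flux, 1/10.**2)

_p_fit,_ier = leastsq(row_chi, x0=[_flux.max(), _pix[np.argmax(_flux)], np.median(_flux)],
                      args=(_pix, _flux, _ivar, _psf_p))
print(_ier, _p_fit)  # center and sky level should land near 150 and 30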
In [ ]:
# This does PSF extraction
test = False  # set True to plot each row fit for debugging

trace_1d = np.zeros(n_rows).astype(float)
flux_1d = np.zeros(n_rows).astype(float)
flux_1d_err = np.zeros(n_rows).astype(float)
sky_flux_1d = np.zeros(n_rows).astype(float)
sky_flux_1d_err = np.zeros(n_rows).astype(float)

for i in range(nccd.data.shape[0]):
    flux = nccd.data[i]
    flux_err = nccd.uncertainty.array[i]

    flux_ivar = 1/flux_err**2.
    flux_ivar[~np.isfinite(flux_ivar)] = 0.

    p0 = [flux.max(), pix[np.argmax(flux)], scoreatpercentile(flux[flux>0], 16.)]
    p_opt,p_cov,*_,mesg,ier = leastsq(row_chi, x0=p0, full_output=True,
                                      args=(pix, flux, flux_ivar, psf_p))

    if ier < 1 or ier > 4 or p_cov is None:
        flux_1d[i] = np.nan
        sky_flux_1d[i] = np.nan
        print("Fit failed for {}".format(i))
        continue

    if test:
        # debug plot: this row's data with the best-fit row model over-plotted
        _grid = np.linspace(pix.min(), pix.max(), 1024)
        model_flux = row_model(p_opt, psf_p, _grid)

        # ----------------------------------
        plt.figure(figsize=(8,6))
        plt.plot(pix, flux, drawstyle='steps-mid', marker='')
        plt.errorbar(pix, flux, 1/np.sqrt(flux_ivar), marker='', linestyle='none', ecolor='#666666', alpha=0.75)
        plt.axhline(p_opt[2])  # fitted constant (sky) level
        plt.plot(_grid, model_flux, marker='')
        plt.yscale('log')

    flux_1d[i] = p_opt[0]
    trace_1d[i] = p_opt[1]
    sky_flux_1d[i] = p_opt[2]

    # TODO: ignores centroiding covariances...
    flux_1d_err[i] = np.sqrt(p_cov[0,0])
    sky_flux_1d_err[i] = np.sqrt(p_cov[2,2])
In [ ]:
# clean up the 1d spectra
flux_1d_ivar = 1/flux_1d_err**2
sky_flux_1d_ivar = 1/sky_flux_1d_err**2
pix_1d = np.arange(len(flux_1d))
mask_1d = (pix_1d < 50) | (pix_1d > 1600)
flux_1d[mask_1d] = 0.
flux_1d_ivar[mask_1d] = 0.
sky_flux_1d[mask_1d] = 0.
sky_flux_1d_ivar[mask_1d] = 0.
In [ ]:
fig,all_axes = plt.subplots(2, 2, figsize=(12,12), sharex='row')
axes = all_axes[0]
axes[0].plot(flux_1d, marker='', drawstyle='steps-mid')
axes[0].errorbar(np.arange(n_rows), flux_1d, 1/np.sqrt(flux_1d_ivar), linestyle='none',
marker='', ecolor='#666666', alpha=1., zorder=-10)
axes[0].set_ylim(1e3, np.nanmax(flux_1d))
axes[0].set_yscale('log')
axes[0].axvline(halpha_idx, zorder=-10, color='r', alpha=0.1)
axes[1].plot(sky_flux_1d, marker='', drawstyle='steps-mid')
axes[1].errorbar(np.arange(n_rows), sky_flux_1d, 1/np.sqrt(sky_flux_1d_ivar), linestyle='none',
marker='', ecolor='#666666', alpha=1., zorder=-10)
axes[1].set_ylim(1e-1, np.nanmax(sky_flux_1d))
axes[1].set_yscale('log')
axes[1].axvline(halpha_idx, zorder=-10, color='r', alpha=0.1)
# Zoom in around Halpha
axes = all_axes[1]
slc = slice(halpha_idx-32, halpha_idx+32+1)
axes[0].plot(np.arange(n_rows)[slc], flux_1d[slc], marker='', drawstyle='steps-mid')
axes[0].errorbar(np.arange(n_rows)[slc], flux_1d[slc], flux_1d_err[slc], linestyle='none',
marker='', ecolor='#666666', alpha=1., zorder=-10)
axes[1].plot(np.arange(n_rows)[slc], sky_flux_1d[slc], marker='', drawstyle='steps-mid')
axes[1].errorbar(np.arange(n_rows)[slc], sky_flux_1d[slc], sky_flux_1d_err[slc], linestyle='none',
marker='', ecolor='#666666', alpha=1., zorder=-10)
fig.tight_layout()
In [ ]:
plt.figure(figsize=(12,5))
plt.plot(sky_flux_1d, marker='', drawstyle='steps-mid')
plt.errorbar(np.arange(n_rows), sky_flux_1d, 1/np.sqrt(sky_flux_1d_ivar), linestyle='none',
marker='', ecolor='#666666', alpha=1., zorder=-10)
# plt.ylim(1e3, np.nanmax(flux_1d))
plt.yscale('log')
plt.xlim(1000, 1300)
# plt.ylim(1e4, 4e4)
In [ ]:
n_rows,n_cols = nccd.data.shape
pix = np.arange(n_cols, dtype=float)
In [ ]:
sqrt_2pi = np.sqrt(2*np.pi)
def gaussian1d(x, amp, mu, var):
    # Gaussian with total area `amp`, mean `mu`, and variance `var`
    return amp/(sqrt_2pi*np.sqrt(var)) * np.exp(-0.5 * (np.array(x) - mu)**2/var)
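Note that `amp` here is the integrated area rather than the peak value; a quick numerical check (a sketch):
In [ ]:
_x = np.linspace(-20, 20, 4001)
print(np.trapz(gaussian1d(_x, amp=7., mu=1., var=2.), _x))  # should be ~7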
In [ ]:
def model(pix, sky_p, p0, other_p, shifts=None):
    """
    parameters are:
        sky_p : constant level, log amplitude, log width of the sky Lorentzian
        p0 : mean0, log_amp0, log_var0 (central Gaussian)
        other_p : log_amp_m*, log_var_m* (at mean0 - N pix)
                  log_amp_p*, log_var_p* (at mean0 + N pix)
    """
    n_other = len(other_p)
    if n_other % 2 != 0:
        raise ValueError("Need even number of other_p")

    if shifts is None:
        shifts = np.arange(-n_other/2, n_other/2+1)
        shifts = np.delete(shifts, np.where(shifts == 0)[0])
    assert len(shifts) == len(other_p)

    # central model
    mean0, log_amp0, log_var0 = np.array(p0).astype(float)
    model_flux = gaussian1d(pix, np.exp(log_amp0), mean0, np.exp(log_var0))

    for shift,pars in zip(shifts, other_p):
        model_flux += gaussian1d(pix, np.exp(pars[0]), mean0 + shift, np.exp(pars[1]))

    # sky model: constant plus a Lorentzian centered on the trace
    # l1 = Lorentz1D(x_0=mean0, amplitude=np.exp(sky_p[1]), fwhm=np.sqrt(np.exp(sky_p[2])))
    # sky = sky_p[0] + l1(pix)
    g = np.exp(sky_p[2])
    sky = sky_p[0] + np.exp(sky_p[1]) * (np.pi*g * (1 + (pix-mean0)**2/g**2))**-1

    return model_flux + sky
In [ ]:
def unpack_p(p):
    sky = p[0:3]
    p0 = p[3:6]
    other_p = np.array(p[6:]).reshape(-1, 2)
    return sky, p0, other_p

def p_to_model(p, shifts):
    sky, p0, other_p = unpack_p(p)

    # crude parameter bounds: reject unreasonable amplitudes or variances
    for ln_amp in [p0[1]]+other_p[:,0].tolist():
        if ln_amp > 15:
            return lambda x: np.inf

    for ln_var in [p0[2]]+other_p[:,1].tolist():
        if ln_var < -1. or ln_var > 8:
            return lambda x: np.inf

    return lambda x: model(x, sky, p0, other_p, shifts)

def ln_likelihood(p, pix, flux, flux_ivar, shifts):
    # note: returns weighted residuals (for leastsq), not an actual log-likelihood
    _model = p_to_model(p, shifts)
    model_flux = _model(pix)

    if np.any(np.isinf(model_flux)):
        return np.inf

    return np.sqrt(flux_ivar) * (flux - model_flux)
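To make the parameter packing concrete, here is a small round-trip with made-up numbers (a sketch): three sky parameters, three for the central Gaussian, then one `(log_amp, log_var)` pair per shifted component.
In [ ]:
_shifts = [-4., 4.]
_p = [10., np.log(5.), np.log(3.),      # sky: constant, log Lorentzian amp, log width
      150., np.log(2000.), np.log(2.),  # central component: mean, log amp, log var
      np.log(50.), np.log(1.),          # component at mean - 4 pix
      np.log(50.), np.log(1.)]          # component at mean + 4 pix
_sky, _p0, _other = unpack_p(_p)
print(_sky, _p0, _other.shape)          # first 3, next 3, and a (2, 2) array

_grid = np.linspace(100, 200, 512)
print(p_to_model(_p, _shifts)(_grid).shape)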
In [ ]:
fig,ax = plt.subplots(1,1,figsize=(10,8))

for i in np.linspace(500, 1200, 16).astype(int):
    flux = nccd.data[i]
    pix = np.arange(len(flux))
    plt.plot(pix-trace_func(i), flux / trace_amp_func(i), marker='', drawstyle='steps-mid', alpha=0.25)

plt.xlim(-50, 50)
plt.yscale('log')
In [ ]:
flux = nccd.data[800]
plt.plot(pix, flux, marker='', drawstyle='steps-mid', alpha=0.25)
plt.yscale('log')
plt.xlim(flux.argmax()-25, flux.argmax()+25)
In [ ]:
# This does PSF extraction, slightly wrong

# hyper-parameters of fit
# shifts = [-6, -4, -2, 2, 4., 6.]
shifts = np.linspace(-10, 10, 4).tolist()
j1, j2 = 0, 300  # column range to fit (full trimmed width; also set in a later cell)

flux_1d = np.zeros(n_rows).astype(float)
flux_1d_err = np.zeros(n_rows).astype(float)
sky_flux_1d = np.zeros(n_rows).astype(float)
sky_flux_1d_err = np.zeros(n_rows).astype(float)

test = False
# test = True

# for i in range(n_rows):
# for i in [620, 700, 780, 860, 920, 1000]:
for i in range(620, 1000):
    # line_ctr0 = trace_func(i) # TODO: could use this to initialize?

    flux = nccd.data[i,j1:j2]
    flux_err = nccd.uncertainty.array[i,j1:j2]
    sub_pix = pix[j1:j2]

    flux_ivar = 1/flux_err**2.
    flux_ivar[~np.isfinite(flux_ivar)] = 0.

    # initial guess for least-squares
    sky = [np.min(flux), 1E-2, 2.]
    p0 = [sub_pix[np.argmax(flux)], np.log(flux.max()), np.log(1.)]
    other_p = [[np.log(flux.max()/100.), np.log(1.)]]*len(shifts)

    # two extra, broader components at the trace center
    _shifts = shifts + [0.,0.]
    other_p += [[np.log(flux.max()/100.), np.log(4.)]]*2

    # sanity check
    ll = ln_likelihood(sky + p0 + other_p, sub_pix, flux, flux_ivar, shifts=_shifts).sum()
    assert np.isfinite(ll)

    p_opt,p_cov,*_,mesg,ier = leastsq(ln_likelihood, x0=sky + p0 + np.ravel(other_p).tolist(), full_output=True,
                                      args=(sub_pix, flux, flux_ivar, _shifts))

    if ier < 1 or ier > 4:
        flux_1d[i] = np.nan
        sky_flux_1d[i] = np.nan
        print("Fit failed for {}".format(i))
        continue
        # raise ValueError("Fit failed for {}".format(i))

    sky_opt, p0_opt, other_p_opt = unpack_p(p_opt)
    amps = np.exp(other_p_opt[:,0])
    stds = np.exp(other_p_opt[:,1])

    if test:
        _grid = np.linspace(sub_pix.min(), sub_pix.max(), 1024)
        model_flux = p_to_model(p_opt, _shifts)(_grid)

        # ----------------------------------
        plt.figure(figsize=(8,6))
        plt.plot(sub_pix, flux, drawstyle='steps-mid', marker='')
        plt.errorbar(sub_pix, flux, 1/np.sqrt(flux_ivar), marker='', linestyle='none', ecolor='#666666', alpha=0.75)
        plt.axhline(sky_opt[0])
        plt.plot(_grid, model_flux, marker='')
        plt.yscale('log')

    flux_1d[i] = np.exp(p0_opt[1]) + amps.sum()
    sky_flux_1d[i] = sky_opt[0]
In [ ]:
fig,all_axes = plt.subplots(2, 2, figsize=(12,12), sharex='row')
axes = all_axes[0]
axes[0].plot(flux_1d, marker='', drawstyle='steps-mid')
# axes[0].errorbar(np.arange(n_rows), flux_1d, flux_1d_err, linestyle='none',
# marker='', ecolor='#666666', alpha=1., zorder=-10)
# axes[0].set_ylim(1e2, np.nanmax(flux_1d))
axes[0].set_ylim(np.nanmin(flux_1d[flux_1d!=0.]), np.nanmax(flux_1d))
# axes[0].set_yscale('log')
axes[0].axvline(halpha_idx, zorder=-10, color='r', alpha=0.1)
axes[1].plot(sky_flux_1d, marker='', drawstyle='steps-mid')
# axes[1].errorbar(np.arange(n_rows), sky_flux_1d, sky_flux_1d_err, linestyle='none',
# marker='', ecolor='#666666', alpha=1., zorder=-10)
axes[1].set_ylim(-5, np.nanmax(sky_flux_1d))
axes[1].axvline(halpha_idx, zorder=-10, color='r', alpha=0.1)
# Zoom in around Halpha
axes = all_axes[1]
slc = slice(halpha_idx-32, halpha_idx+32+1)
axes[0].plot(np.arange(n_rows)[slc], flux_1d[slc], marker='', drawstyle='steps-mid')
# axes[0].errorbar(np.arange(n_rows)[slc], flux_1d[slc], flux_1d_err[slc], linestyle='none',
# marker='', ecolor='#666666', alpha=1., zorder=-10)
axes[1].plot(np.arange(n_rows)[slc], sky_flux_1d[slc], marker='', drawstyle='steps-mid')
# axes[1].errorbar(np.arange(n_rows)[slc], sky_flux_1d[slc], sky_flux_1d_err[slc], linestyle='none',
# marker='', ecolor='#666666', alpha=1., zorder=-10)
fig.tight_layout()
For observed counts $C$ in the source aperture and observed counts $B$ in the background aperture, with $s$ the true source counts, $b$ the background density, $N_S$ pixels in the source aperture, and $N_B$ pixels in the background aperture:

$$
\hat{s} = C - B\,\frac{N_S}{N_B} \qquad
\hat{b} = \frac{B}{N_B - N_S}
$$

$$
\sigma^2_s = C + B\,\frac{N_S^2}{N_B^2} \qquad
\sigma^2_b = \frac{B}{N_B}
$$

https://arxiv.org/pdf/1410.2564.pdf
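The same estimators as a small helper function (a sketch; the example numbers are the by-eye ds9 estimates from the commented-out cell a couple of cells below):
In [ ]:
def aperture_estimates(C, B, N_S, N_B):
    # source and background estimates plus variances for observed counts C
    # (source aperture, N_S pixels) and B (background aperture, N_B pixels)
    s_hat = C - B * N_S / N_B
    b_hat = B / (N_B - N_S)
    s_var = C + B * (N_S / N_B)**2
    b_var = B / N_B
    return s_hat, b_hat, s_var, b_var

# by-eye ds9 estimate: ~11000 counts in 1 source pixel, ~800 counts in 2 background pixels
print(aperture_estimates(C=11000., B=800., N_S=1, N_B=2))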
In [ ]:
# aperture_width = 8 # pixels
aperture_width = int(np.ceil(3*trace_stddev))
sky_offset = 8 # pixels
sky_width = 16 # pixels
In [ ]:
# # estimated from ds9: counts in 1 pixel of spectrum, background counts in 2 pixels
# C = 11000.
# B = 800.
# N_S = 1
# N_B = 2
# s_hat = C - B*N_S/N_B
# b_hat = B / (N_B-N_S)
# s_var = C + B*(N_S/N_B)**2
# b_var = B / N_B
In [ ]:
# This does aperture extraction, but wrong!

flux_1d = np.zeros(n_rows).astype(float)
flux_1d_err = np.zeros(n_rows).astype(float)
sky_flux_1d = np.zeros(n_rows).astype(float)
sky_flux_1d_err = np.zeros(n_rows).astype(float)

source_mask = np.zeros_like(nccd.data).astype(bool)
sky_mask = np.zeros_like(nccd.data).astype(bool)

# # HACK: uniform aperture down the CCD
# _derp = trace_func(np.arange(n_rows))
# j1 = int(np.floor(_derp.min()))
# j2 = int(np.ceil(_derp.max()))

for i in range(n_rows):
    line_ctr = trace_func(i)

    # source aperture bool mask
    j1 = int(np.floor(line_ctr-aperture_width/2))
    j2 = int(np.ceil(line_ctr+aperture_width/2)+1)
    source_mask[i,j1:j2] = 1

    # sky aperture bool masks
    s1 = int(np.floor(j1 - sky_offset - sky_width))
    s2 = int(np.ceil(j1 - sky_offset + 1))
    sky_mask[i,s1:s2] = 1

    s1 = int(np.floor(j2 + sky_offset))
    s2 = int(np.ceil(j2 + sky_offset + sky_width + 1))
    sky_mask[i,s1:s2] = 1

    source_flux = nccd.data[i,source_mask[i]]
    source_flux_ivar = 1 / nccd.uncertainty.array[i,source_mask[i]]**2
    source_flux_ivar[~np.isfinite(source_flux_ivar)] = 0.

    sky_flux = nccd.data[i,sky_mask[i]]
    sky_flux_ivar = 1 / nccd.uncertainty.array[i,sky_mask[i]]**2
    sky_flux_ivar[~np.isfinite(sky_flux_ivar)] = 0.

    # inverse-variance-weighted mean counts per pixel in each aperture
    C = np.sum(source_flux_ivar*source_flux) / np.sum(source_flux_ivar)
    B = np.sum(sky_flux_ivar*sky_flux) / np.sum(sky_flux_ivar)
    N_S = source_mask[i].sum()
    N_B = sky_mask[i].sum()

    s_hat = C - B * N_S / N_B
    b_hat = B / (N_B - N_S)
    s_var = C + B * (N_S / N_B)**2
    b_var = B / N_B

    flux_1d[i] = s_hat
    flux_1d_err[i] = np.sqrt(s_var)
    sky_flux_1d[i] = b_hat
    sky_flux_1d_err[i] = np.sqrt(b_var)
In [ ]:
# approximate row index of Halpha in the extracted 1D spectrum
# (used by the plotting cells above)
halpha_idx = 686
TODO: discontinuities are from shifting aperture -- figure out a way to have a constant aperture?
In [ ]:
x = np.linspace(-10.2, 11.1, 21)
y = gaussian_polynomial(x, 100., 0.2, 1., 0.4, 15.) \
+ gaussian_polynomial(x, 10., 0.3, 2., 0.)
# y = gaussian_polynomial(x, 100., 0.2, 1., 0.) \
# + gaussian_polynomial(x, 100., 2., 2., 0.)
y_err = np.full_like(y, 1.)
y_ivar = 1/y_err**2
# y_ivar[8] = 0.
y = np.random.normal(y, y_err)
In [ ]:
plt.errorbar(x, y, y_err)
In [ ]:
g1 = mod.models.Gaussian1D(amplitude=y.max())
g2 = mod.models.Gaussian1D(amplitude=y.max()/10.)
g1.amplitude.bounds = (0, 1E10)
g2.amplitude.bounds = (0, 1E10)
# g3 = mod.models.Gaussian1D()
p1 = mod.models.Polynomial1D(3)
full_model = g1+g2+p1
In [ ]:
fitter = mod.fitting.LevMarLSQFitter()
fit_model = fitter(full_model, x, y, weights=1/y_err)
In [ ]:
plt.errorbar(x, y, y_err, linestyle='none', marker='o')
_grid = np.linspace(x.min(), x.max(), 256)
plt.plot(_grid, fit_model(_grid), marker='')
In [ ]:
fit_model
In [ ]:
fit_model.amplitude_0
In [ ]:
fit_model.amplitude_1
In [ ]:
aperture_width = 250
i = 1200
line_ctr = trace_func(i)
# j1 = int(np.floor(line_ctr-aperture_width/2))
# j2 = int(np.ceil(line_ctr+aperture_width/2)+1)
j1 = 0
j2 = 300
print(j1, j2)
In [ ]:
_grid = np.linspace(sub_pix.min(), sub_pix.max(), 1024)
model_flux = p_to_model(p_opt, _shifts)(_grid)
# ----------------------------------
plt.figure(figsize=(8,6))
plt.plot(sub_pix, flux, drawstyle='steps-mid', marker='')
plt.errorbar(sub_pix, flux, 1/np.sqrt(flux_ivar), marker='', linestyle='none', ecolor='#666666', alpha=0.75)
plt.axhline(sky_opt[0])
plt.plot(_grid, model_flux, marker='')
plt.yscale('log')
In [ ]:
from astropy.modeling.functional_models import Lorentz1D, Gaussian1D, Voigt1D, Const1D
from astropy.modeling.fitting import LevMarLSQFitter
In [ ]:
def _pars_to_model(p):
    mean = p[0]
    log_amp_V, log_fwhm_L = p[1:3]
    log_amp_G1, log_var1, log_amp_G2, log_var2 = p[3:-1]
    C = p[-1]

    # l = Voigt1D(x_0=mean, amplitude_L=np.exp(log_amp_V),
    #             fwhm_L=np.exp(log_fwhm_L),
    #             fwhm_G=np.exp(log_fwhm_G)) \
    #     + Gaussian1D(amplitude=np.exp(log_amp_G1), mean=mean, stddev=np.sqrt(np.exp(log_var1))) \
    #     + Gaussian1D(amplitude=np.exp(log_amp_G2), mean=mean, stddev=np.sqrt(np.exp(log_var2))) \
    #     + Const1D(amplitude=C)

    l = Lorentz1D(x_0=mean, amplitude=np.exp(log_amp_V), fwhm=np.exp(log_fwhm_L)) \
        + Gaussian1D(amplitude=np.exp(log_amp_G1), mean=mean, stddev=np.sqrt(np.exp(log_var1))) \
        + Gaussian1D(amplitude=np.exp(log_amp_G2), mean=mean, stddev=np.sqrt(np.exp(log_var2))) \
        + Const1D(amplitude=C)

    return l

def test(p, pix, flux, flux_ivar):
    # weighted residuals for leastsq (note: this shadows the boolean `test` flag used above)
    l = _pars_to_model(p)
    return (flux - l(pix)) * np.sqrt(flux_ivar)
In [ ]:
derp,ier = leastsq(test, x0=[np.argmax(flux),
np.log(np.max(flux)), 1.,
np.log(np.max(flux)/10.), 5.5,
np.log(np.max(flux)/25.), 7.5,
np.min(flux)],
args=(sub_pix, flux, flux_ivar))
In [ ]:
l_fit = _pars_to_model(derp)
In [ ]:
plt.figure(figsize=(8,6))
plt.plot(sub_pix, flux, drawstyle='steps-mid', marker='')
plt.errorbar(sub_pix, flux, 1/np.sqrt(flux_ivar), marker='', linestyle='none', ecolor='#666666', alpha=0.75)
plt.plot(_grid, l_fit(_grid), marker='')
plt.yscale('log')
In [ ]:
plt.figure(figsize=(8,6))
plt.plot(sub_pix, (flux-l_fit(sub_pix))/(1/np.sqrt(flux_ivar)), drawstyle='steps-mid', marker='')
# plt.errorbar(sub_pix, (flux-l_fit(sub_pix))/(, 1/np.sqrt(flux_ivar),
# marker='', linestyle='none', ecolor='#666666', alpha=0.75)