In [ ]:
# from IPython.display import HTML
# HTML('''<script>
# code_show=true;
# function code_toggle() {
# if (code_show){
# $('div.input').hide();
# } else {
# $('div.input').show();
# }
# code_show = !code_show
# }
# $( document ).ready(code_toggle);
# </script>
# <form action="javascript:code_toggle()"><input type="submit" value="Toggle ON/OFF raw code cells."></form>''')
Here we analyse some photometric measurements of CS82' images. The entire dataset is composed by 177 catalogs containing Sextractor's output for each tile of the CFHT survey over Stripe82.
For each tile/catalog we are going to:
Table of Contents
In [ ]:
tilename = 'S82m0m_y.V2.7A.swarp.cut.deV.fit'
In [ ]:
import astropy
import pandas
import bokeh
import numpy
from bokeh.io import output_notebook
output_notebook()
%matplotlib inline
import seaborn
seaborn.set()
from matplotlib import pyplot as plt
The contents of the (tile) FITS file follows:
In [ ]:
from astropy.io import fits
hdul = fits.open(tilename)
hdul.info()
The Data Unit of our interest is the number 2
.
In [ ]:
#TODO: use booq
from astropy.table import Table
cat = Table.read(tilename,format='fits',hdu=2)
cat.colnames
In [ ]:
flgs = pandas.Series(cat['FLAGS']).value_counts().to_frame(name='COUNTS').sort_index()
flgs.index.name = 'FLAGS'
print flgs
del flgs
In [ ]:
_ = seaborn.countplot(cat['FLAGS'])
In [ ]:
flgs = pandas.Series(cat['FLAGS_WEIGHT']).value_counts().to_frame(name='COUNTS').sort_index()
flgs.index.name = 'FLAGS_WEIGHT'
print flgs
del flgs
In [ ]:
_ = seaborn.countplot(cat['FLAGS_WEIGHT'])
In [ ]:
RA_col = 'ALPHA_J2000'
DEC_col = 'DELTA_J2000'
In [ ]:
from matplotlib import pyplot as plt
fig, ax = plt.subplots()
fig.set_size_inches(8,8)
_ = ax.scatter(x=cat[RA_col], y=cat[DEC_col], marker='.')
_ = ax.set_xlabel('RA (deg)')
_ = ax.set_ylabel('Dec (deg)')
_title = 'Footprint: {}'.format(tilename)
_ = ax.set_title(_title)
In [ ]:
# from bokeh.plotting import figure
# p = figure()
# p.circle(x=cat[RA_col], y=cat[DEC_col])
# p.xaxis.axis_label = 'RA'
# p.yaxis.axis_label = 'Dec'
# from bokeh.io import show
# show(p)
In [ ]:
mag_col = 'MAG_PSF'
err_col = 'MAGERR_PSF'
In [ ]:
cat2 = cat[mag_col,err_col]
mags = cat2.to_pandas()
nil_indxs = mags[mag_col]==99
mags.loc[nil_indxs,mag_col] = None
nil_indxs = mags[err_col]==99
mags.loc[nil_indxs,err_col] = None
In [ ]:
def scatter_mags(mag_col,err_col,mags):
from bokeh.plotting import figure
p = figure()
p.circle(x=mags[err_col], y=mags[mag_col])
p.xaxis.axis_label = err_col
p.yaxis.axis_label = mag_col
_title = '{}: measurement vs error'.format(mag_col)
p.title.text = _title
return p
from bokeh.io import show
show(scatter_mags(mag_col,err_col,mags))
Looks like we have a logarithmic relation following this data: $$ y = \alpha + \beta * log(\gamma + x) $$
In other words, as the photometric measurement goes to fainter objects the error goes exponencially high.
We can explore this relation to better define our sample based on such measurement errors.
In [ ]:
mags_sort_byErr = mags.sort_values(by=err_col)
mags_sort_byErr.dropna(inplace=True)
In [ ]:
def func(t, a, b):
import numpy as np
return a + b * np.log(t)
def scatter_mags_fit(mag_col,err_col,mags_sort_byErr):
p = scatter_mags(mag_col,err_col,mags_sort_byErr)
def fit_log_curve(x,y):
import numpy as np
from scipy.optimize import curve_fit
popt,pcov = curve_fit(func, x, y)
x = np.linspace(x.min(),x.max(),100)
y = func(x,*popt)
return x,y,popt
x,y,popt = fit_log_curve(mags_sort_byErr[err_col], mags_sort_byErr[mag_col])
p.line(x=x, y=y, line_color='red', legend='{:.3f} + {:.3f} * log(x)'.format(*popt), line_width=2)
p.legend.location = 'bottom_right'
return p
show(scatter_mags_fit(mag_col,err_col,mags_sort_byErr))
If we take the simplest relation between signal-to-noise ratio (SNR) and Magerr: $$ SNR = \frac{1}{Mag{err}} $$
We can now define a cut on the limiting magnitude based on this definition of signal-to-noise ratio.
For example, for this tile, if I want a SNR of 5, corresponding to $Mag_{err} = 0.2$, I should go no further than $Mag \approx 24.4$.
In [ ]:
from scipy.optimize import curve_fit
popt,pcov = curve_fit(func, mags_sort_byErr[err_col], mags_sort_byErr[mag_col])
MagErr_SNR = 0.2
Mag_SNR = func(MagErr_SNR,*popt)
print 'Magnitude limit: {:.3f}, corresponding to an error of {}'.format(Mag_SNR,MagErr_SNR)
In [ ]:
def scatter_mags_fit_annot(mag_col,err_col,mags_sort_byErr, MagErr_SNR, MagErr_SNR_2):
p = scatter_mags_fit(mag_col,err_col,mags_sort_byErr)
Mag_SNR = func(MagErr_SNR,*popt)
from bokeh.models import Span
# line_mag_snr = Span(dimension='width', location=Mag_SNR, line_color='red', line_dash='dashed', line_width=1)
line_magerr_snr = Span(dimension='height', location=MagErr_SNR, line_color='red', line_dash='dashed', line_width=1)
# p.add_layout(line_mag_snr)
p.add_layout(line_magerr_snr)
from bokeh.models import Label
_lbl = ' {:.2f}, {:.2f} '.format(MagErr_SNR,Mag_SNR)
label_mag_snr = Label(x=MagErr_SNR+0.01, y=Mag_SNR-1, text=_lbl, text_color='red',
border_line_color='black', border_line_alpha=1.0,
background_fill_color='white', background_fill_alpha=1.0)
p.add_layout(label_mag_snr)
# Second line
line_magerr_snr_2 = Span(dimension='height', location=MagErr_SNR_2, line_color='red', line_dash='dashed', line_width=1)
p.add_layout(line_magerr_snr_2)
Mag_SNR_2 = func(MagErr_SNR_2,*popt)
_lbl = ' {:.2f}, {:.2f} '.format(MagErr_SNR_2,Mag_SNR_2)
label_mag_snr_2 = Label(x=MagErr_SNR_2+0.01, y=Mag_SNR_2-1, text=_lbl, text_color='red',
border_line_color='black', border_line_alpha=1.0,
background_fill_color='white', background_fill_alpha=1.0)
p.add_layout(label_mag_snr_2)
return p
MagErr_SNR_2 = 0.5
show(scatter_mags_fit_annot(mag_col,err_col,mags_sort_byErr, MagErr_SNR, MagErr_SNR_2))
The goal now is to define the limiting magnitude in a different way, we will consider the variation of magnitude-error across the same-valued magnitude entries. The approach computes mean and standard-deviation of the magnitude-error in a set of magnitude-binned measurements.
In [ ]:
# nbins = int(round(np.power(len(mags_sort_byErr),1/3.)))
nbins = 20
bin_categories,bins = pandas.qcut(mags_sort_byErr[mag_col], nbins, retbins=True, precision=2)
mags_sort_byErr['bins'] = bin_categories
del nbins,bin_categories
In [ ]:
mags_sort_byErr.groupby('bins').describe()
In [ ]:
%matplotlib inline
import seaborn as sns
sns.set()
from matplotlib import pyplot as plt
In [ ]:
ax1 = mags_sort_byErr.boxplot(by='bins', column=err_col, rot=90, sym='*')
ax1.axhline(MagErr_SNR, color='red', ls='--')
ax1.axhline(MagErr_SNR_2, color='red', ls='--')
ax2 = mags_sort_byErr.boxplot(by='bins', column=mag_col, rot=90, whis='range')
In [ ]:
groups = []
for n,g in mags_sort_byErr.groupby('bins'):
print n,g.describe()
q = g[err_col]
dq = q.quantile(0.75)-q.quantile(0.25)
mq = q.quantile(0.5)
if mq-1.5*dq < MagErr_SNR and mq+1.5*dq > MagErr_SNR:
groups.append(g)
print len(groups)
In [ ]:
print sum(g[mag_col].mean() for g in groups)/len(groups)
In [ ]:
def func(t, a, b):
import numpy as np
return a*t + b
def fit_linear_curve(x,y):
from scipy.optimize import curve_fit
popt,pcov = curve_fit(func, x, y)
return popt
import numpy as np
x = np.array([ g[err_col] for g in groups ]).flatten()
y = np.array([ g[mag_col] for g in groups ]).flatten()
try:
popt = fit_linear_curve(x,y)
y = func(MagErr_SNR,*popt)
except:
y = None
print '{:.3f}: {}'.format(MagErr_SNR,y)
In [ ]:
def mag_limit_boxplot(MagErr_SNR, mags_sort_byErr, mag_col):
def func(t, a, b):
import numpy as np
return a*t + b
def fit_linear_curve(x,y):
from scipy.optimize import curve_fit
popt,pcov = curve_fit(func, x, y)
return popt
groups = []
for n,g in mags_sort_byErr.groupby('bins'):
q = g[mag_col]
dq = q.quantile(0.75)-q.quantile(0.25)
mq = q.quantile(0.5)
if mq-1.5*dq < MagErr_SNR and mq+1.5*dq > MagErr_SNR:
groups.append(g)
import numpy as np
x = np.array([ g[err_col] for g in groups ]).flatten()
y = np.array([ g[mag_col] for g in groups ]).flatten()
popt = fit_linear_curve(x,y)
y = func(MagErr_SNR,*popt)
return y
try:
mag_limit = mag_limit_boxplot(MagErr_SNR_2, mags_sort_byErr, err_col)
except:
mag_limit = None
print '{:.3f}: {}'.format(MagErr_SNR_2,mag_limit)
In [ ]:
def func_log(t, a, b):
import numpy as np
return a + b * np.log(t)
def scatter_mags_fit_annot_2(mag_col,err_col,mags_sort_byErr, MagErr_SNR, MagErr_SNR_2):
def scatter_mags_fit(mag_col,err_col,mags_sort_byErr):
def scatter_mags(mag_col,err_col,mags):
from matplotlib import pyplot as plt
plt.scatter(mags[err_col], mags[mag_col], marker='.', alpha=0.5)
plt.xlabel(err_col)
plt.ylabel(mag_col)
_title = '{}: measurement vs error'.format(mag_col)
plt.title(_title)
return plt
plt = scatter_mags(mag_col,err_col,mags_sort_byErr)
plt.hold(True)
def fit_log_curve(x,y):
import numpy as np
from scipy.optimize import curve_fit
popt,pcov = curve_fit(func_log, x, y)
x = np.linspace(x.min(),x.max(),100)
y = func_log(x,*popt)
return x,y,popt
x,y,popt = fit_log_curve(mags_sort_byErr[err_col], mags_sort_byErr[mag_col])
plt.plot(x, y, linestyle='solid', color='red', lw=1)#, legend='{:.3f} + {:.3f} * log(x)'.format(*popt))#, line_width=2)
# p.legend.location = 'bottom_right'
return plt,popt
plt,popt = scatter_mags_fit(mag_col,err_col,mags_sort_byErr)
Mag_SNR = func_log(MagErr_SNR,*popt)
print Mag_SNR, MagErr_SNR
plt.axvline(MagErr_SNR, color='red', linestyle='dotted')
from bokeh.models import Label
_lbl = ' {:.2f}, {:.2f} '.format(MagErr_SNR,Mag_SNR)
plt.text(MagErr_SNR+0.01, Mag_SNR-5, _lbl, bbox=dict(facecolor='white', alpha=0.9))
# Second line
plt.axvline(MagErr_SNR_2, color='red', linestyle='dotted')
Mag_SNR_2 = func_log(MagErr_SNR_2,*popt)
_lbl = ' {:.2f}, {:.2f} '.format(MagErr_SNR_2,Mag_SNR_2)
plt.text(MagErr_SNR_2+0.01, Mag_SNR_2-5, _lbl, bbox=dict(facecolor='white', alpha=0.9))
return plt
MagErr_SNR_2 = 0.5
plt = scatter_mags_fit_annot_2(mag_col,err_col,mags_sort_byErr, MagErr_SNR, MagErr_SNR_2)
In [ ]:
def func(t, a, b):
import numpy as np
return a + b * np.log(t)
def scatter_mags_fit(mag_col,err_col,mags_sort_byErr):
def scatter_mags(mag_col,err_col,mags):
from bokeh.plotting import figure
p = figure()# tools='pan,box_zoom,wheel_zoom,crosshair,hover,resize,reset' )
p.circle(x=mags[err_col], y=mags[mag_col], fill_color='#3A5785', fill_alpha=0.1, line_color=None)
p.xaxis.axis_label = err_col
p.yaxis.axis_label = mag_col
_title = '{}: measurement vs error'.format(mag_col)
p.title.text = _title
return p
p = scatter_mags(mag_col,err_col,mags_sort_byErr)
def fit_log_curve(x,y):
import numpy as np
from scipy.optimize import curve_fit
popt,pcov = curve_fit(func, x, y)
print 'Cov:\n',pcov
x = np.linspace(x.min(),x.max(),1000)
y = func(x,*popt)
return x,y,popt
x,y,popt = fit_log_curve(mags_sort_byErr[err_col], mags_sort_byErr[mag_col])
ln = p.line(x=x, y=y, line_color='red', legend='{:.3f} + {:.3f} * log(x)'.format(*popt), line_width=2)
from bokeh.models import HoverTool
hover = HoverTool(tooltips=[('mag','@y'),('err','@x')], renderers=[ln], mode='vline')
p.add_tools(hover)
p.legend.location = 'bottom_right'
return p,popt
from bokeh.models import CrosshairTool,HoverTool
p,popt = scatter_mags_fit(mag_col,err_col,mags_sort_byErr)
cross = CrosshairTool(dimensions = 'height')
p.add_tools(cross)
# p.select(HoverTool).mode = 'vline'
# p.select(HoverTool).tooltips = [('Mag_lim = ','func($x)')]
show(p)
Let us now assemble all steps above in a pipeline, to be run for all and each image in the CS82 archive.
For each image, the pipeline should:
FLAGS
>0ALPHA_J2000
,DELTA_J2000
)FLAGS
histogram{MAG,MAGERR}_{AUTO,MODEL,PSF,APER,SPHEROID,HYBRID,BEST,PETRO,ISO,ISOCOR}
mag_type | SNR | MAG_ERROR | MAG_LIMIT_CURVEFIT | MAG_LIMIT_BOXPLOT |
---|---|---|---|---|
AUTO | 5 | 0.2 | 24.44 | 24.59 |
AUTO | 2 | 0.5 | 25.69 | None |
BEST | 5 | 0.2 | 24.47 | 24.6 |
BEST | 2 | 0.5 | 25.71 | None |
PSF | 5 | 0.2 | 24.95 | 24.82* |
PSF | 2 | 0.5 | 25.99 | None |
... |
This table should be named ${tilename}_magLimits.csv
In [1]:
class Plots:
'''
Manage data source and plot creation
'''
@staticmethod
def scatter(x, y, vlines=None, fit_func=None, title=None, savefig=False):
import pandas
df = pandas.DataFrame({x.name:x, y.name:y})
def scatter_mags(x,y,df):
from matplotlib import pyplot as plt
plt.scatter(df[x.name], df[y.name], marker='.', alpha=0.1, lw=None)
plt.xlabel(x.name)
plt.ylabel(y.name)
plt.title(title)
return plt
plt = scatter_mags(x,y,df)
plt.hold(True)
def fit_curve(x,y,fit_func):
import numpy as np
from scipy.optimize import curve_fit
popt,pcov = curve_fit(fit_func, x, y)
x = np.linspace(x.min(),x.max(),100)
y = fit_func(x,*popt)
return x,y,popt
if fit_func != None:
x_fit, y_fit, popt = fit_curve( df[x.name], df[y.name], fit_func )
plt.plot( x_fit, y_fit, linestyle='solid', color='red', lw=1 )
vlines_intersec = None
if vlines:
vlines_intersec = []
for x_line in vlines:
y_val = fit_func(x_line,*popt)
vlines_intersec.append(y_val)
plt.axvline(x_line, color='red', linestyle='dotted')
from bokeh.models import Label
_lbl = ' {:.2f}, {:.2f} '.format(x_line,y_val)
plt.text(x_line+0.01, y_val-5, _lbl, bbox=dict(facecolor='white', alpha=0.9))
del df
plt.hold(False)
return fig,vlines_intersec
@staticmethod
def boxplot(column, by, hlines, nbins=20, fit_func=None, savefig=False):
'''
Boxplot distributions of 'column' splitted by column 'by' in 'nbins'
'hlines' is a list of values within 'column' limits, where 'column'
should be annotated.
'''
import pandas
df = pandas.DataFrame({column.name:column, by.name:by})
import pandas
bin_categories,bins = pandas.qcut(df[by.name], nbins, retbins=True, precision=2)
bincol = '_'+by.name+'_'
df[bincol] = bin_categories
from matplotlib import pyplot as plt
plt.hold(True)
# fig,ax = plt.subplots()
ax = df.boxplot(column.name,by=bincol, rot=90, sym='*')#,ax=ax)
# ---
def fit_linear_curve(x,y,func):
from scipy.optimize import curve_fit
popt,pcov = curve_fit(func, x, y)
return popt
# ---
hlines_intersec = None
if hlines:
hlines_intersec = []
for hline in hlines:
ax.axhline(hline, color='red', ls='dotted')
def _within_quartil(q):
dq = q.quantile(0.75)-q.quantile(0.25)
mq = q.quantile(0.5)
return mq-1.5*dq < hline and mq+1.5*dq > hline
filt_df = df.groupby(bincol).filter( lambda df,col=column:_within_quartil(df[col.name]) )
y_val = None
h_lbl = ' {} : {} '.format(hline,y_val)
if len(filt_df) > 0:
x = filt_df[column.name]
y = filt_df[by.name]
if fit_func is None:
y_val = y.mean()
else:
popt = fit_linear_curve(x,y,fit_func)
y_val = fit_func(hline,*popt)
h_lbl = ' {:.2f} : {:.2f} '.format(hline,y_val)
ax.text(1, hline+hline*0.1, h_lbl, bbox=dict(facecolor='white', alpha=0.9))
hlines_intersec.append(y_val)
del df
plt.hold(False)
fig = ax.figure
return fig,hlines_intersec
@staticmethod
def footprint(x, y, title, savefig=False):
from matplotlib import pyplot as plt
fig, ax = plt.subplots()
fig.set_size_inches(8,8)
_ = ax.scatter(x=x, y=y, marker='.')
_ = ax.set_xlabel('RA (deg)')
_ = ax.set_ylabel('Dec (deg)')
_title = 'Footprint: {}'.format(tilename)
_ = ax.set_title(_title)
# fig,_ = Plots.scatter(x, y, title=title)
# fig.set_size_inches(8,8)
return fig
@staticmethod
def histogram(column,savefig=False):
import seaborn
ax = seaborn.countplot(column)
fig = ax.figure
return fig
class CS82(object):
'''
Holds methods to extract and produce plots to CS82 image
'''
def __init__(self,tilename):
self._catalog = self.open_file(tilename)
self._tilename = tilename
self._columns_radec = self.read_position_columns()
self._columns_mag = self.read_mag_columns()
self._columns_flags = self.read_flags_columns()
self.init_plots()
self._table = None
self._magtype = None
def add_magLims(self,dict_mag):
import pandas
for idx,cols in dict_mag.iteritems():
_vs = cols.values()[0]
df_new = pandas.DataFrame(cols,index=[idx]*len(_vs))
if self._table is None:
self._table = df_new
else:
self._table = pandas.concat([self._table,df_new])
@property
def magLims(self):
return self._table
@property
def filename(self):
return self._tilename
@property
def ra(self):
return self._catalog['ALPHA_J2000']
@property
def dec(self):
return self._catalog['DELTA_J2000']
@property
def flags(self):
return self._catalog['FLAGS']
@property
def mag(self):
assert self._magtype
col = 'MAG_'+self._magtype
assert col in self._catalog.columns
return self._catalog[col]
@property
def magerr(self):
assert self._magtype
col = 'MAGERR_'+self._magtype
assert col in self._catalog.columns
return self._catalog[col]
def set_magtype(self,mtype):
self._magtype = mtype
@staticmethod
def open_file(tilename):
from astropy.table import Table
cat = Table.read(tilename,format='fits',hdu=2)
return cat
def init_plots(self):
_columns = self._columns_flags[:]
_columns.extend(self._columns_radec)
_columns.extend([ 'MAG_{}'.format(c) for c in self._columns_mag ])
_columns.extend([ 'MAGERR_{}'.format(c) for c in self._columns_mag ])
cat = self._catalog[_columns]
df = cat.to_pandas()
for col in _columns:
nil_indxs = df[col]==99
df.loc[nil_indxs,col] = None
df.dropna(inplace=True)
self._catalog = df
def read_position_columns(self):
return ['ALPHA_J2000','DELTA_J2000']
def read_mag_columns(self):
colnames = filter(lambda s:'MAG_' in s, self._catalog.colnames)
colnames = filter(lambda c:self._catalog[c].ndim == 1, colnames)
mag_types = [ s[4:] for s in colnames ]
return mag_types
def read_flags_columns(self):
return ['FLAGS']
In [6]:
tiles_list = ['S82m0m_y.V2.7A.swarp.cut.deV.fit']
mags_list = ['AUTO','BEST','MODEL','PSF']
def _savefig_filename_(tilename,plot_type):
assert isinstance(plot_type,(str,unicode))
from os.path import basename,splitext
rootname = splitext(basename(tilename))[0]
filename = '{}_{}.png'.format(rootname,plot_type)
return filename
def _csv_filename_(tilename):
from os.path import basename,splitext
rootname = splitext(basename(tilename))[0]
filename = '{}_{}.csv'.format(rootname,'magLims')
return filename
for tilename in tiles_list:
print "Running MAGS-LIMIT pipeline for tile: {}".format(tilename)
tile = CS82(tilename)
from matplotlib import pyplot as plt
plt.cla()
plt.clf()
for mag in mags_list:
print "--> processing {} magnitude measurements;".format(mag)
tile.set_magtype(mag)
error_lims = [0.2,0.5]
def func_linear(t, a, b):
return a*t + b
fig,mag_lims_bx = Plots.boxplot(tile.magerr, by=tile.mag, hlines=error_lims,
fit_func=func_linear, nbins=20, savefig=True)
fig.savefig(_savefig_filename_(tile.filename,'boxplot_'+mag))
plt.cla()
plt.clf()
def func_log(t, a, b):
from numpy import log
return a + b * log(t)
title = '{}: measurement vs error'.format(mag)
fig,mag_lims_sc = Plots.scatter(x=tile.magerr, y=tile.mag, vlines=error_lims,
fit_func=func_log, title=title, savefig=True)
fig.savefig(_savefig_filename_(tile.filename,'scatter_'+mag))
tile.add_magLims({ mag:{'Error':error_lims , 'MagErr_Fit':mag_lims_sc , 'MagErr_Box':mag_lims_bx} })
plt.cla()
plt.clf()
print "--> done with magnitude plots and depth estimation;"
fig = Plots.histogram(tile.flags, savefig=True)
fig.savefig(_savefig_filename_(tile.filename,'flags'))
title = 'Footprint: {}'.format(tile.filename)
fig = Plots.footprint(tile.ra,tile.dec,title=title,savefig=True)
fig.savefig(_savefig_filename_(tile.filename,'footprint'))
tile.magLims.to_csv(_csv_filename_(tile.filename))
print "Done with this tile."
In [ ]: