import numpy as np
import matplotlib.pyplot as plt
import matplotlib.cm as cm
import matplotlib.gridspec as gd
from astroquery.mast import Observations
from mpl_toolkits.axes_grid1 import make_axes_locatable
from astropy.io import fits
from astropy.table import Table
from astropy.stats import mad_std
from scipy.interpolate import interp1d
from scipy.optimize import root, fsolve
import matplotlib
import multiprocessing
import concurrent.futures
from pathlib import Path
from glob import glob
from tqdm import tqdm
#from .plotstyles import *
import pickle
import juliet
from .utils import *
import warnings
import os
try:
from photutils.aperture import CircularAnnulus, CircularAperture, ApertureStats
from photutils.aperture import aperture_photometry as aphot
except:
print('Warning: photutils is not installed! Will not be able to use photutils based photometry.')
try:
from dace_query.cheops import Cheops
except:
print('It looks like DACE API is not installed! Will not be able to download CHEOPS data products.')
[docs]
class CHEOPSData(object):
"""
Manipulate CHEOPS time-series data from simple reading to extraction.
This class provides tools to read, process, and extract photometry from
CHEOPS (CHaracterising ExOPlanet Satellite) observations. It supports
reading ``PIPE``-produced light curves, downloading data reduction pipeline
(DRP) products, handling sub-array data, and performing aperture photometry
on subarrays.
Parameters
----------
object_name : str, optional
Name or identifier of the target object. Used for querying the CHEOPS
database when downloading data products. Default is ``None``.
pipe_filename : str, optional
Path to a pipeline-produced FITS file for reading with :meth:`pipe_data`.
Default is ``None``.
Attributes
----------
pipe_fname : str or None
Stored pipeline FITS filename.
object_name : str or None
Stored target object name.
drp_data : dict
Dictionary containing DRP light curve data populated by
:meth:`get_drp_lightcurves` with keys for each instrument
(e.g., 'TG000001'), each containing 'TIME', 'FLUX', 'FLUX_ERR',
'ROLL', 'BG', 'CONTAM', 'SMEAR', 'XC', 'YC'.
time_sub, flux_sub, flux_err_sub, badpix_sub : dict
Dictionaries containing sub-array data populated by
:meth:`get_subarrays`.
subarray_data : dict, optional
Dictionary that may be populated with processed subarray data.
Examples
--------
Reading ``PIPE`` data:
>>> cheops = CHEOPSData(pipe_filename='path/to/file.fits')
>>> data = cheops.pipe_data(bgmin=300)
Downloading DRP light curves:
>>> cheops = CHEOPSData(object_name='WASP-189b')
>>> drp_data = cheops.get_drp_lightcurves(pout='/path/to/output')
Notes
-----
Requires the ``dace_query`` library to download CHEOPS data products from
the DACE (Data and Analysis Center for Exoplanets) database.
"""
def __init__(self, object_name=None, pipe_filename=None):
self.pipe_fname = pipe_filename
self.object_name = object_name
[docs]
def pipe_data(self, bgmin=300, not_flg_arr=None):
"""Extract and process light curve data from a ``PIPE`` FITS file.
This method reads a CHEOPS pipeline-produced FITS file (typically
from ``self.pipe_fname``) and extracts time series photometry along
with auxiliary data such as centroids, background, and temperature.
Data points are filtered by quality flags and background levels.
The flux is normalized by its median value.
Parameters
----------
bgmin : float, optional
Minimum background level threshold in counts. Data points with
background below this value are excluded. Default is ``300``.
not_flg_arr : array_like of int, optional
Array of FLAG values to *include* in the output (i.e., non-zero
flags are normally rejected, but values in this array are kept).
If ``None``, only FLAG==0 data are retained. Default is ``None``.
Returns
-------
data : dict
Dictionary containing extracted and processed data with keys:
- 'TIME' : ndarray
Barycentric Julian Date time stamps.
- 'FLUX' : ndarray
Normalized flux (normalized by median).
- 'FLUX_ERR' : ndarray
Flux uncertainties (normalized by median).
- 'ROLL' : ndarray
Roll angle in degrees.
- 'XC', 'YC' : ndarray
Centroid positions in pixels.
- 'BG' : ndarray
Background level in counts.
- 'TF2' : ndarray
thermFront2 temperature in Kelvin.
- 'U1' ... 'U16' : ndarray, optional
Principal components from PCA.
Notes
-----
Quality flags (FLAG column):
- 0 : Good data
- >0 : Data with issues (normally rejected)
The method performs two masking steps:
1. Removes flagged data and optionally includes specified flags
2. Removes high-background points (BG < bgmin)
Examples
--------
>>> cheops = CHEOPSData(pipe_filename='file.fits')
>>> data = cheops.pipe_data(bgmin=300)
>>> print(data['TIME'], data['FLUX'])
"""
hdul = fits.open(self.pipe_fname)
tab = Table.read(hdul[1])
# Masking datasets
flg = np.asarray(tab['FLAG']) # Flagged data
msk = np.where(flg==0)[0] # Creating a mask to remove flg>0 values
if not_flg_arr is not None:
for i in range(len(not_flg_arr)):
msk_f1 = np.where(flg==not_flg_arr[i])[0]
msk = np.hstack((msk, msk_f1))
# Gathering dataset
Us_n = np.array([])
Us = []
for i in tab.colnames:
if i[0] == 'U':
Us_n = np.hstack((Us_n, i))
for j in range(len(Us_n)):
usn = np.asarray(tab[Us_n[j]])[msk]
Us.append(usn)
tim, flx, flxe = np.asarray(tab['BJD_TIME'])[msk], np.asarray(tab['FLUX'])[msk], np.asarray(tab['FLUXERR'])[msk]
roll, xc, yc, bg = np.asarray(tab['ROLL'])[msk], np.asarray(tab['XC'])[msk], np.asarray(tab['YC'])[msk], np.asarray(tab['BG'])[msk]
tf2 = np.asarray(tab['thermFront_2'])[msk]
# Masking those points with high background values
msk1 = np.where(bg<bgmin)[0]
tim, flx, flxe, roll, xc, yc, bg, tf2 = tim[msk1], flx[msk1], flxe[msk1], roll[msk1], xc[msk1], yc[msk1], bg[msk1], tf2[msk1]
Us1 = []
for i in range(len(Us_n)):
us1 = Us[i][msk1]
Us1.append(us1)
# Normalising flux
flx, flxe = flx/np.median(flx), flxe/np.median(flx)
# Finally, sorting the data according to time
idx_sort = np.argsort(tim)
data = {}
data['TIME'], data['FLUX'], data['FLUX_ERR'] = tim[idx_sort], flx[idx_sort], flxe[idx_sort]
data['ROLL'], data['XC'], data['YC'], data['BG'] = roll[idx_sort], xc[idx_sort], yc[idx_sort], bg[idx_sort]
data['TF2'] = tf2[idx_sort]
for i in range(len(Us_n)):
data[Us_n[i]] = Us1[i][idx_sort]
return data
[docs]
def get_drp_lightcurves(self, pout=os.getcwd(), load=False, save=False, aperture='default', visit_nos=None, filekey=None, bkg_clip=20):
"""Download or load Data Reduction Pipeline light curves from CHEOPS.
This method queries the CHEOPS DACE database for light curve products
(DRP level) for the target specified in ``self.object_name``. It can
download a specific visit (by ``visit_nos`` or ``filekey``) or all
available visits. Downloaded data are filtered to remove flagged points,
high-background measurements, and cosmic rays (via EVENT/STATUS flags).
Flux is normalized by its median value per instrument.
Parameters
----------
pout : str, optional
Output directory for downloaded/saved products. Created if it does
not exist. Default is the current working directory.
load : bool, optional
If ``True``, skip downloading and instead load pre-downloaded files
from ``pout``. Useful for re-reading data. Default is ``False``.
save : bool, optional
If ``True``, retain downloaded FITS files in ``pout`` after
extraction. If ``False`` (default), files are deleted after reading.
aperture : str, optional
Aperture type to download ('default', 'optimal', 'rsup', and 'rinf).
Default is ``'default'``.
visit_nos : int or None, optional
Specific visit number to download (e.g., 701, 101, etc.). If provided,
only that visit's data are retrieved. Overridden by ``filekey``.
Default is ``None``.
filekey : str or None, optional
Specific file key (e.g., 'CH_PR100009_TG000001_...') for precise
product selection. Takes precedence over ``visit_nos``.
Default is ``None``.
bkg_clip : float, optional
Sigma threshold for background clipping. Points with background
above ``median(BG) + bkg_clip * MAD(BG)`` are removed.
Default is ``20``.
Returns
-------
drp_data : dict
Nested dictionary with structure::
{instrument_name: {
'TIME': ndarray,
'FLUX': ndarray,
'FLUX_ERR': ndarray,
'ROLL': ndarray,
'BG': ndarray,
'CONTAM': ndarray, # Contamination from nearby sources
'SMEAR': ndarray, # Smearing correction
'XC', 'YC': ndarray # Centroids
}}
Also stored in ``self.drp_data``.
Notes
-----
Data filtering steps:
1. Remove non-finite flux values
2. Remove points with EVENT != 0 or STATUS != 0
3. Remove high-background points (sigma clipping)
The method creates a tar archive internally when downloading
multiple files, which is extracted to ``pout`` then deleted.
Examples
--------
Download all DRP light curves for a target:
>>> cheops = CHEOPSData(object_name='WASP-189b')
>>> drp_data = cheops.get_drp_lightcurves(pout='./cheops_data')
Download a specific visit:
>>> drp_data = cheops.get_drp_lightcurves(visit_nos=1, pout='./data')
Load previously downloaded data:
>>> drp_data = cheops.get_drp_lightcurves(load=True, pout='./data')
"""
# When either filekey or visit_nos is provided, we can pin-point one particular visit
# In that case, we can download particular light curve for particular visit
# Otherwise, we will download DEFAULT aperture light curves for all visits
# If pout doesn't exist then make one
if not Path( pout ).exists():
os.mkdir( pout )
# Querying the CHEOPS database
if ( filekey == None ) and ( visit_nos != None ):
## If file-key is not provided, but visit number is provided, we can find filekey from that
values = Cheops.browse_products(filters={'target_name':{'equal': [self.object_name]}}, file_type='lightcurves')
for i in range( len( values['file'] ) ):
visit = 'TG' + f"{int(visit_nos):06}"
if values['file'][i].split('_')[1] == visit:
filekey = 'CH_' + values['file'][i].split('/')[1]
if ( filekey == None ) and ( visit_nos == None ):
filters = { 'target_name': { 'equal' : [self.object_name] } }
else:
filters = { 'target_name': { 'equal' : [self.object_name] },\
'file_key': { 'equal' : [filekey] } }
files = Cheops.browse_products(filters=filters, file_type='lightcurves', aperture=aperture)
file_list = files['file']
if not load:
Cheops.download(filters=filters, file_type='lightcurves', aperture=aperture, output_directory=pout)
if len(file_list) != 1:
os.system('tar -xzvf ' + pout + '/cheops_download.tar.gz -C ' + pout)
self.drp_data = {}
for ins in range(len(file_list)):
## Finding the instrument
instrument = file_list[ins].split('/')[1].split('_')[1]
self.drp_data[instrument] = {}
## Now loading the data
if ( len(file_list) == 1 ) or load:
hdul = fits.open( pout + '/' + file_list[ins].split('/')[-1] )
else:
hdul = fits.open( pout + '/' + '/'.join(file_list[ins].split('/')[1:]) )
table = Table.read( hdul[1] )
## Masking non-finite data points
mask = np.isfinite( table['FLUX'] )
times, flux, flux_err = table['BJD_TIME'][mask], table['FLUX'][mask], table['FLUXERR'][mask]
roll, bkg, contamination = table['ROLL_ANGLE'][mask], table['BACKGROUND'][mask], table['CONTA_LC'][mask]
smear, cenx, ceny = table['SMEARING_LC'][mask], table['CENTROID_X'][mask], table['CENTROID_Y'][mask]
event, status = table['EVENT'][mask], table['STATUS'][mask]
## Removing data points with EVENTS and STATUS
msk3 = np.ones( len(times), dtype=bool )
msk3[ event != 0. ] = False
msk3[ status != 0. ] = False
times, flux, flux_err = times[msk3], flux[msk3], flux_err[msk3]
roll, bkg, contamination = roll[msk3], bkg[msk3], contamination[msk3]
smear, cenx, ceny = smear[msk3], cenx[msk3], ceny[msk3]
## Removing high background points
msk_bkg = np.ones( len(times), dtype=bool )
msk_bkg[ bkg > ( np.nanmedian(bkg) + ( bkg_clip * mad_std(bkg) ) ) ] = False
times, flux, flux_err = times[msk_bkg], flux[msk_bkg], flux_err[msk_bkg]
roll, bkg, contamination = roll[msk_bkg], bkg[msk_bkg], contamination[msk_bkg]
smear, cenx, ceny = smear[msk_bkg], cenx[msk_bkg], ceny[msk_bkg]
# Saving the data in a dictionary
self.drp_data[instrument]['TIME'], self.drp_data[instrument]['FLUX'], self.drp_data[instrument]['FLUX_ERR'] = times, flux / np.nanmedian(flux), flux_err / np.nanmedian(flux)
self.drp_data[instrument]['ROLL'], self.drp_data[instrument]['BG'], self.drp_data[instrument]['CONTAM'] = roll, bkg, contamination
self.drp_data[instrument]['SMEAR'], self.drp_data[instrument]['XC'], self.drp_data[instrument]['YC'] = smear, cenx, ceny
if ( not save ) and ( not load ):
if len(file_list) == 1:
os.system('rm -rf ' + pout + '/' + file_list[ins].split('/')[-1])
else:
os.system('rm -rf ' + pout + '/' + file_list[0].split('/')[1])
if save:
if len(file_list) != 1:
os.system('mv ' + pout + '/' + '/'.join(file_list[ins].split('/')[1:]) + ' ' + pout)
os.system('rm -rf ' + pout + '/' + file_list[ins].split('/')[1] )
if not save:
os.system('rm -rf ' + pout + '/cheops_download.tar.gz')
return self.drp_data
[docs]
def get_subarrays(self, pout=os.getcwd(), download_all=False, load=False, visit_nos=None, filekey=None):
"""Download or load CHEOPS sub-array image time-series data.
This method queries and downloads sub-array FITS files from CHEOPS
data products, which contain the flux measurements as 2D image cubes
for each frame in the time series. These are useful for custom
photometry and pixel-level analysis.
Sub-array data are stored in ``self.time_sub``, ``self.flux_sub``,
``self.flux_err_sub``, and ``self.badpix_sub``.
Parameters
----------
pout : str, optional
Output directory for downloaded/saved products. Created if needed.
Default is the current working directory.
download_all : bool, optional
If ``True``, download all available file types (not just
sub-arrays). If ``False``, filter for sub-array products only.
Default is ``False``.
load : bool, optional
If ``True``, skip downloading and load pre-downloaded files from
``pout``. Default is ``False``.
visit_nos : int or None, optional
Specific visit number to download. If provided with ``filekey=None``,
only that visit's sub-arrays are retrieved. Default is ``None``.
filekey : str or None, optional
Specific file key for precise product selection. Takes precedence
over ``visit_nos``. Default is ``None``.
Returns
-------
None
Results are stored in instance dictionaries:
- ``self.time_sub[instrument]`` : ndarray
Time stamps (shape: N_frames).
- ``self.flux_sub[instrument]`` : ndarray
Flux images (shape: N_frames, Ny, Nx).
- ``self.flux_err_sub[instrument]`` : ndarray
Flux error (shape: N_frames, Ny, Nx)
- ``self.badpix_sub[instrument]`` : ndarray
Bad-pixel mask (shape: Ny, Nx) where 1 = good, 0 = bad.
Notes
-----
The method attempts to load a bad-pixel map from the CHEOPS data
products. If not found, a default all-good mask is created.
Sub-array file names are identified by the key 'COR_SubArray'
in the filename.
Examples
--------
Download all sub-arrays for a target:
>>> cheops = CHEOPSData(object_name='WASP-189b')
>>> cheops.get_subarrays(pout='./cheops_data')
Load previously downloaded sub-array data:
>>> cheops.get_subarrays(load=True, pout='./data')
"""
filetype = 'all' if download_all else 'sub'
# If pout doesn't exist then make one
if not Path( pout ).exists():
os.mkdir( pout )
# Querying the CHEOPS database
if ( filekey == None ) and ( visit_nos != None ):
## If file-key is not provided, but visit number is provided, we can find filekey from that
values = Cheops.browse_products(filters={'target_name':{'equal': [self.object_name]}}, file_type=filetype)
for i in range( len( values['file'] ) ):
visit = 'TG' + f"{int(visit_nos):06}"
if values['file'][i].split('_')[1] == visit:
filekey = 'CH_' + values['file'][i].split('/')[1]
if ( filekey == None ) and ( visit_nos == None ):
filters = { 'target_name': { 'equal' : [self.object_name] } }
else:
filters = { 'target_name': { 'equal' : [self.object_name] },\
'file_key': { 'equal' : [filekey] } }
files = Cheops.browse_products(filters=filters, file_type=filetype)
file_list = []
for i in range(len(files['file'])):
keywd = 'COR_SubArray'
if '_'.join(files['file'][i].split('_')[-3:-1]) == keywd:
file_list.append( files['file'][i] )
if not load:
Cheops.download(filters=filters, file_type=filetype, output_directory=pout)
os.system('tar -xzvf ' + pout + '/cheops_download.tar.gz -C ' + pout)
self.time_sub, self.flux_sub, self.flux_err_sub, self.badpix_sub = {}, {}, {}, {}
for ins in range(len(file_list)):
## Finding the instrument
instrument = file_list[ins].split('/')[1].split('_')[1]
hdul = fits.open( pout + '/' + '/'.join(file_list[ins].split('/')[1:]) )
table = Table.read( hdul[2] )
# Reading the flux
self.time_sub[instrument] = np.asarray( table['BJD_TIME'] )
self.flux_sub[instrument] = hdul[1].data
self.flux_err_sub[instrument] = np.sqrt( hdul[1].data + table['RON'][:,None,None]**2 )
try:
## If we have bad-pixel file, we can use it for bad-pixel map
hdul_badpix = fits.open( pout + '/' + file_list[ins].split('/')[1] + '/' + '_'.join(file_list[ins].split('/')[2].split('_')[0:4]) + '_PIP_COR_PixelFlagMapSubArray_V0300.fits')
## 2D badpixel map
badpix2d = np.ones(hdul_badpix[1].data.shape)
badpix2d[hdul_badpix[1].data != 0.] = 0.
## 3D badpixel map
badpix = np.ones( hdul[1].data.shape )
badpix = badpix * badpix2d[None, :, :]
except:
## Except, just assume that everything is good
badpix = np.ones( hdul[1].data.shape )
self.badpix_sub[instrument] = badpix
def ApPhoto(self, visit_nos, aprad=None, sky_rad1=None, sky_rad2=None, brightpix=False, nos_brightest=12, nos_faintest=None, minmax=None):
instrument = 'TG' + f"{int(visit_nos):06}"
return ApPhoto(times=self.time_sub[instrument], frames=self.flux_sub[instrument], errors=self.flux_err_sub[instrument], badpix=self.badpix_sub[instrument],\
aprad=aprad, sky_rad1=sky_rad1, sky_rad2=sky_rad2, brightpix=brightpix, nos_brightest=nos_brightest, nos_faintest=nos_faintest, minmax=minmax)
[docs]
class julietPlots(object):
"""Plotting helper for results produced by a `juliet` analysis.
This class loads a `juliet` input/output folder, computes models from the
fitted posterior and provides a collection of convenience plotting and
data-processing methods for light-curve visualisation. Typical usage is
to instantiate the class with the `input_folder` used for a juliet fit
and then call plotting helpers such as :meth:`full_model_lc`,
:meth:`plot_gp` or :meth:`detrend_data`.
Attributes
----------
input_folder : str, optional
Path to the juliet input/output folder provided at construction.
The user need to provide either input_folder or dataset and res
dataset : juliet.load object, optioanl
The object returned by ``juliet.load`` for the given input folder.
res : juliet.fit, optional
The result of calling ``dataset.fit(...)``.
N : int
Number of posterior samples drawn when evaluating sampled models.
**kwargs : dict, optional
Any additional keywords provided to juliet.fit object.
Main methods
------------
full_model_lc(instruments=None, save=False, nrandom=50, quantile_models=True)
Plot observed light curves with the full fitted model and residuals.
phase_folded_lc(phmin=0.8, instruments=None, highres=False, ...)
Plot phase folder light curve with best-fitted planetary model for one or more instruments.
plot_gp(instruments=None, highres=False, one_plot=False, ...)
Plot GP components and binned residuals for one or more instruments.
plot_fake_allan_deviation(instruments=None, binmax=10, method='pipe', timeunit=None)
Plot "allan deviation" plot, which is noise as a function of binning, for one or more instruments
plot_corner(planet_only=False, save=True)
Plot corner plots for fitted posterior samples, can use planet-only or all parameters.
"""
def __init__(self, input_folder=None, dataset=None, res=None, N=1000, **kwargs):
# First order of business: loading the juliet folder
# We can optinally provide datasets and res directly
if ( dataset is None ) and ( input_folder is not None ):
self.dataset = juliet.load(input_folder=input_folder)
self.res = self.dataset.fit(**kwargs)
else:
self.dataset = dataset
self.res = res
# Saving those kwargs
self.fit_kwargs = kwargs
# Location of the input folder
self.input_folder = input_folder
# Number of samples
self.nsamps = N
# Computing all models, for all instruments
self.models_all_ins = {}
self.all_mods_ins = {}
# Sorting indices for all instruments (sometimes GP regressors are not time -- in that case, times
# are sorted according to GP regressors, which will mess-up the time array -- we need to sort it again)
self.idx_time_sort = {}
for i in range(len(self.dataset.inames_lc)):
# Evaluating the models
## This will return 5 elements: model samples, median model, upper_CI, lower_CI, and components
self.models_all_ins[self.dataset.inames_lc[i]] =\
self.res.lc.evaluate(self.dataset.inames_lc[i], nsamples=self.nsamps, return_err=True,\
return_components=True, return_samples=True)
# Saving all models
self.all_mods_ins[self.dataset.inames_lc[i]] = self.res.lc.model[self.dataset.inames_lc[i]]
# Sorting array
self.idx_time_sort[self.dataset.inames_lc[i]] = np.argsort( self.dataset.times_lc[self.dataset.inames_lc[i]] )
[docs]
def highres_planet_models_workaround(self, instrument, times_highres, GP_reg_highres, lin_reg_highres):
"""
The usual way of generating high time-resolution models
is not working with juliet when light_travel_delay=True for
occultations and phase curve modelling. This is a workaround for this."""
# Creating dictionaries to save high-res times, fluxes, and errors
tim_highres, fl_highres, fle_highres = {}, {}, {}
## Actual high-res time array
tim_highres[instrument] = times_highres
## For flux and flux error highres array, we will simply use array with ones
fl_highres[instrument], fle_highres[instrument] = np.ones(len(times_highres)), np.ones(len(times_highres))
# GP and linear regressors would be arrays, so creating dictionaries to save them
if GP_reg_highres is not None:
gp_pars = {}
gp_pars[instrument] = GP_reg_highres
else:
gp_pars = None
if lin_reg_highres is not None:
lin_pars = {}
lin_pars[instrument] = lin_reg_highres
else:
lin_pars = None
## Again, running juliet.load and juliet.fit
data_highres = juliet.load(priors=self.input_folder + '/priors.dat', t_lc=tim_highres, y_lc=fl_highres, yerr_lc=fle_highres,\
GP_regressors_lc=gp_pars, linear_regressors_lc=lin_pars, out_folder=self.input_folder)
res_highres = data_highres.fit(**self.fit_kwargs)
post_samps_new = res_highres.posteriors['posterior_samples'].copy()
for i in post_samps_new.keys():
if ( i[0:5] == 'mflux' ) or ( i[0:9] == 'mdilution' ) or ( i[0:5] == 'theta' ) or ( i[0:2] == 'GP' ):
post_samps_new[i] = np.zeros(len(post_samps_new[i]))
planet_only_models_highres = \
res_highres.lc.evaluate(instrument, nsamples=self.nsamps, return_err=True, parameter_values=post_samps_new,\
return_components=True, return_samples=True, evaluate_transit=True)
return planet_only_models_highres
[docs]
def full_model_lc(self, instruments=None, save=False, nrandom=50, quantile_models=True):
"""Plot the full fitted light-curve model for one or more instruments.
This method creates a figure per instrument showing the observed
light curve with the fitted (full) model overplotted and a
residuals panel beneath.
Parameters
----------
instruments : list or None
List of instrument names to plot. If ``None``, all instruments
in the current juliet dataset plotted.
save : bool, optional
If ``True``, each figure is saved to
``<input_folder>/full_model_<instrument>.png``. Default ``False``.
nrandom : int, optional
Number of random posterior-sample models to draw when
``quantile_models`` is ``False``. Default is 50.
quantile_models : bool, optional
If ``True``, plot the 68%% credible interval as a filled band;
if ``False``, plot ``nrandom`` random posterior models instead.
Returns
-------
all_fig : list
List of matplotlib Figure objects created (one per instrument).
all_axs1 : list
List of top-panel Axes (data + model) for each figure.
all_axs2 : list
List of bottom-panel Axes (residuals) for each figure.
Notes
-----
The routine expects that ``self.models_all_ins`` was populated by
a prior call (see the class constructor) and that the dataset
arrays (times, data, errors) are available in ``self.dataset``.
"""
# Computing the model for all instruments
if instruments is None:
# If instruments is None, then all instruments are selected
instruments = self.dataset.inames_lc
all_fig, all_axs1, all_axs2 = [], [], []
for i in range(len(instruments)):
# Data
tim_ins = self.dataset.times_lc[instruments[i]][self.idx_time_sort[instruments[i]]]
fl_ins = self.dataset.data_lc[instruments[i]][self.idx_time_sort[instruments[i]]]
fle_ins = self.dataset.errors_lc[instruments[i]][self.idx_time_sort[instruments[i]]]
## Median model
full_model = self.models_all_ins[instruments[i]][1][self.idx_time_sort[instruments[i]]]
## Quantile models
up_68CI = self.models_all_ins[instruments[i]][2][self.idx_time_sort[instruments[i]]]
lo_68CI = self.models_all_ins[instruments[i]][3][self.idx_time_sort[instruments[i]]]
## Random models
sample_models = self.models_all_ins[instruments[i]][0][:, self.idx_time_sort[instruments[i]]]
idx = np.random.choice( np.arange(sample_models.shape[0]), size=nrandom )
random_models = sample_models[idx, :]
# And figure
fig = plt.figure()
gs = gd.GridSpec(2,1, height_ratios=[2,1])
# Top panel
ax1 = plt.subplot(gs[0])
ax1.errorbar(tim_ins, fl_ins, yerr=fle_ins, fmt='.', elinewidth=0.5, markersize=1.5, color='dodgerblue')#, alpha=0.3)
ax1.plot(tim_ins, full_model, c='navy', lw=2, zorder=50)
if quantile_models:
ax1.fill_between(x=tim_ins, y1=lo_68CI, y2=up_68CI, color='orangered', alpha=0.5, zorder=25)
else:
for rand in range(nrandom):
ax1.plot(tim_ins, random_models[rand,:], color='orangered', alpha=0.5, lw=1., zorder=25)
ax1.set_ylabel('Relative Flux')
ax1.set_title('Full fitted model for instrument ' + str(instruments[i]))
ax1.set_xlim(np.min(tim_ins), np.max(tim_ins))
ax1.xaxis.set_major_formatter(plt.NullFormatter())
# Bottom panel
ax2 = plt.subplot(gs[1])
ax2.errorbar(tim_ins, (fl_ins-full_model)*1e6, yerr=fle_ins*1e6, fmt='.', elinewidth=0.5, markersize=1.5, color='dodgerblue')#, alpha=0.3)
ax2.axhline(y=0.0, c='black', ls='--', zorder=100)
ax2.set_ylabel('Residuals [ppm]')
ax2.set_xlabel('Time [BJD]')
ax2.set_xlim(np.min(tim_ins), np.max(tim_ins))
all_fig.append(fig)
all_axs1.append(ax1)
all_axs2.append(ax2)
if save:
plt.savefig(self.input_folder + '/full_model_' + instruments[i] + '.png', dpi=250)
return all_fig, all_axs1, all_axs2
[docs]
def detrend_data(self, phmin, instruments, planet='p1'):
"""This function will generate detrended data from the raw data. It will also generate phases."""
# First, create detrended dataset and models
## For detrended dataset
self.phase, self.detrended_data, self.detrended_errs = {}, {}, {}
self.ph_minimum, self.ph_maximum = 1e10, 1e-10
## For normalising the data, we will compute F1 and F2,
## We can use the same F1 and F2 to normalise the model as well
## So, let's save them!
self.F1, self.F2 = {}, {}
for i in range(len(instruments)):
# -----------------------------------------
# Our first task is to compute
# period and t0:
# -----------------------------------------
try:
per = np.nanmedian(self.res.posteriors['posterior_samples']['P_' + planet])
except:
per = self.all_mods_ins[instruments[i]]['params'].per
try:
t0 = np.nanmedian(self.res.posteriors['posterior_samples']['t0_' + planet])
except:
t0 = self.all_mods_ins[instruments[i]]['params'].t0
# -----------------------------------------
# -----------------------------------------
# Next we would like to compute GP
# and linear models, so that we can
# detrend the data
# -----------------------------------------
# GP model
if self.dataset.GP_lc_arguments != None:
if instruments[i] in self.dataset.GP_lc_arguments.keys():
gp_model = self.all_mods_ins[instruments[i]]['GP']
else:
gp_model = 0.
else:
gp_model = 0.
if instruments[i] in self.dataset.lm_lc_arguments.keys():
linear_model = self.models_all_ins[instruments[i]][-1]['lm']
else:
linear_model = 0.
# -----------------------------------------
# -----------------------------------------
# Finally, we want to compute F1 and F2
# So we can normalise the detrended data
# The full model is defined as (courtesy of the original juliet code),
# y = [ T * ( D / (1 + D*M) ) ] + [ (1 - D) / (1 + D*M)]
# => y = T * F1 + F2
# => T = (y - F2) / F1
# -----------------------------------------
mflx = self.res.posteriors['posterior_samples']['mflux_' + instruments[i]]
try:
mdil = self.res.posteriors['posterior_samples']['mdilution_' + instruments[i]]
except:
mdil = np.ones( len(mflx) )
F1 = np.nanmedian( mdil / ( 1 + (mdil * mflx) ) )
F2 = np.nanmedian( (1 - mdil) / (1 + (mdil * mflx) ) )
## Saving F1 and F2
self.F1[instruments[i]], self.F2[instruments[i]] = F1, F2
sigw = np.nanmedian(self.res.posteriors['posterior_samples']['sigma_w_' + instruments[i]] * 1e-6)
## Here we create detrended data by subtracting GP (if any) and linear (if any) models from the raw data
self.detrended_data[instruments[i]] = ( self.dataset.data_lc[instruments[i]] - gp_model - linear_model - F2) / F1
self.detrended_errs[instruments[i]] = np.sqrt( self.dataset.errors_lc[instruments[i]]**2 + sigw**2 )
phases_instrument = juliet.utils.get_phases(self.dataset.times_lc[instruments[i]], P=per, t0=t0, phmin=phmin)
self.phase[instruments[i]] = phases_instrument
# Computing the minimum and maximum of the orbital phases
if np.max(phases_instrument) > self.ph_maximum:
self.ph_maximum = np.max(phases_instrument)
if np.min(phases_instrument) < self.ph_minimum:
self.ph_minimum = np.min(phases_instrument)
def detrend_model(self, instruments, phmin, highres, planet='p1'):
# If we want to create high-resolution model (highres=True), then we will use the same times for all instruments
# If not, then we will use different times for different instruments
self.times_model, self.phases_model = {}, {}
self.planet_only_models = {}
# -----------------------------------------
# Our first task is to compute
# period and t0:
# -----------------------------------------
try:
per = np.nanmedian(self.res.posteriors['posterior_samples']['P_' + planet])
except:
per = self.all_mods_ins[instruments[0]]['params'].per
try:
t0 = np.nanmedian(self.res.posteriors['posterior_samples']['t0_' + planet])
except:
t0 = self.all_mods_ins[instruments[0]]['params'].t0
# We also have to create a new posterior array since when we compute detrended model,
# we only want uncertainties from planetary parameters to be propagated to the model uncertainties
# So, we will make posteriors of mflux, theta, and GP to zero, such that any uncertainties
# in them would not propagate to model uncertainties.
post_samps_new = self.res.posteriors['posterior_samples'].copy()
for i in post_samps_new.keys():
if ( i[0:5] == 'mflux' ) or ( i[0:9] == 'mdilution' ) or ( i[0:5] == 'theta' ) or ( i[0:2] == 'GP' ):
post_samps_new[i] = np.zeros(len(post_samps_new[i]))
if highres:
## First selecting the times
## This this is a high time resolution scenario so we will create a dummy time array
dummy_phs_highres = np.linspace(self.ph_minimum, self.ph_maximum, 10000)
dummy_time_highres = t0 + (dummy_phs_highres * per)
# Now loop over all instruments to create high time-resolution planet-only models
for ins in range(len(instruments)):
## First we have to check if there are GP or linear model fitted to the data
## Because, we have to create high-resolution GP and linear regressors as well.
## We will do this by interpolating the original GP and linear regressors
### High resolution GP regressors
### (we have to remember that everything is sorted according to GP regressors)
### (while dummy time is sorted according to times)
### (we will perform interpolation to arrays sorted according to time)
if self.dataset.GP_lc_arguments != None:
if instruments[ins] in self.dataset.GP_lc_arguments.keys():
#! Try/except because it is possible that not all instruments have GP model
try:
inter_gp = interp1d(x=self.dataset.times_lc[instruments[ins]][self.idx_time_sort[instruments[ins]]],\
y=self.dataset.GP_lc_arguments[instruments[ins]][:,0][self.idx_time_sort[instruments[ins]]],\
kind='cubic')
gp_reg_highres = inter_gp(x=dummy_time_highres)
#### Now, we can sort time, phase, gp regressors according to GP regressors
idx_gp = np.argsort( gp_reg_highres )
dummy_time_highres, dummy_phs_highres = dummy_time_highres[idx_gp], dummy_phs_highres[idx_gp]
gp_reg_highres = gp_reg_highres[idx_gp]
except:
gp_reg_highres = None
else:
gp_reg_highres = None
else:
gp_reg_highres = None
### High resolution linear regressors
if instruments[ins] in self.dataset.lm_lc_arguments.keys():
#! Again, try/except because not all instruments might have a linear model
## (Here, we don't need to sort according to time -- because the dummy_times should have now
## sorted according to GP regressors, if applicable)
try:
inter_lin = interp1d(x=self.dataset.times_lc[instruments[ins]],\
y=self.dataset.lm_lc_arguments[instruments[ins]],\
kind='cubic', axis=0)
lin_reg_highres = inter_lin(x=dummy_time_highres)
except:
lin_reg_highres = None
else:
lin_reg_highres = None
try:
# This high-res time models will not work for occultations/phase curves, when light_travel_delay=True
self.planet_only_models[instruments[ins]] = \
self.res.lc.evaluate(instruments[ins], nsamples=self.nsamps, return_err=True, parameter_values=post_samps_new,\
return_components=True, return_samples=True, evaluate_transit=True,\
t=dummy_time_highres, GPregressors=gp_reg_highres, LMregressors=lin_reg_highres)
except:
# In case when it doesn't work, I have invented a workaround
self.planet_only_models[instruments[ins]] = \
self.highres_planet_models_workaround(instrument=instruments[ins], times_highres=dummy_time_highres,\
GP_reg_highres=gp_reg_highres, lin_reg_highres=lin_reg_highres)
## Saving times and phases for model
self.times_model[instruments[ins]] = dummy_time_highres
self.phases_model[instruments[ins]] = dummy_phs_highres
else:
for ins in range(len(instruments)):
# This should be fairly trivial since we don't need to compute the models at higher time resolution
self.planet_only_models[instruments[ins]] = \
self.res.lc.evaluate(instruments[ins], nsamples=self.nsamps, return_err=True, parameter_values=post_samps_new,\
return_components=True, return_samples=True, evaluate_transit=True)
## Saving times and phases for model
self.times_model[instruments[ins]] = self.dataset.times_lc[instruments[ins]]
self.phases_model[instruments[ins]] = juliet.utils.get_phases(t=self.dataset.times_lc[instruments[ins]], P=per, t0=t0, phmin=phmin)
[docs]
def phase_folded_lc(self, phmin=0.8, instruments=None, planet='p1', highres=False, nrandom=50, quantile_models=True, one_plot=None, figsize=(16/1.5, 9/1.5), pycheops_binning=False, nos_bin_tra=20, nos_bin_pc=30):
"""Plot phase-folded light curves and models for the fitted dataset.
This method computes detrended data and planetary models by
calling ``detrend_data`` and ``detrend_model`` internally, detects
whether the fit contains transits, eclipses or phase-curves, and
then produces matplotlib figures showing the data, model, and
residuals. It can produce either one figure per instrument or a single
combined figure with all instruments plotted together.
Parameters
----------
phmin : float, optional
Minimum phase value used for generating orbital phase (default
``0.8``).
instruments : list or None, optional
List of instrument names to plot. If ``None``, all instruments
in the juliet dataset (``self.dataset.inames_lc``) are used. One plot
is produced if all instruments are provided; otherwise, one plot
per instrument is created.
planet : str, optional
Planet identifier used in the juliet fit (e.g., 'p1', 'p2', etc.) to select
the appropriate planet model. Default is 'p1'.
highres : bool, optional
If ``True``, compute and plot high time-resolution planet-only
models. Default ``False``.
nrandom : int, optional
Number of random posterior-sample models to draw when
``quantile_models`` is ``False``. Default ``50``.
quantile_models : bool, optional
If ``True``, display the 68%% credible interval as a filled
band; if ``False``, overplot ``nrandom`` random posterior
samples. Default ``True``.
one_plot : bool or None, optional
If ``True``, produce a single combined plot for all instruments
(regardless of the number of instruments provided). If
``False``, produce one plot per instrument (unless all
instruments are provided). If ``None``, the behaviour is as
just described. Default ``None``.
figsize : tuple, optional
Figure size passed to matplotlib when creating figures. Default
is ``(16/1.5, 9/1.5)``.
pycheops_binning : bool, optional
If ``True``, use binning as produced by ``pycheops`` the
binned datapoints. If ``False``, it will use default ``juliet``
binning. Default ``False``.
nos_bin_tra : int, optional
Number of total binned data points in transit plot. Default is 20.
nos_bin_pc : int, optional
Number of total binned data points in phase curve plot. Default is 30.
Returns
-------
If a single combined plot is produced the function returns the
combined ``fig`` and the primary axes used. If multiple figures are
generated, the function returns lists: ``figs_all, axs1_all,
axs2_all, axs3_all, axs4_all`` corresponding to created figures and
their axes panels.
"""
# -------------------------------------------
# Do we want one plot?
# -------------------------------------------
# If instruments == None, then we will make one common plot for _all_ instruments
# Also, if all instruments are provided, this will make one common plot
# Otherwise, one plot per instrument
# However, sometimes we want to force one plot, so we have the option one_plot
if one_plot is None:
one_plot = True
if instruments != None:
if len(instruments) != len(self.dataset.inames_lc):
one_plot = False
else:
instruments = self.dataset.inames_lc
# -------------------------------------------
# Compute t14
# -------------------------------------------
# Let's compute the transit/occultation duration for the planet (that can help setting the xlimits of the plots)
## We need several other parameters for that first
### Orbital period
try:
per = np.nanmedian(self.res.posteriors['posterior_samples']['P_' + planet])
except:
per = self.all_mods_ins[instruments[0]]['params'].per
### Rp/R*
try:
rprs = np.nanmedian(self.res.posteriors['posterior_samples']['p_' + planet])
except:
rprs = self.all_mods_ins[instruments[0]]['params'].rp
### a/R*
if 'rho' in self.res.model_parameters:
### This would mean that we have fitted for rho. We can convert rho to a/*
try:
rho = np.nanmedian(self.res.posteriors['posterior_samples']['rho'])
ar = rho_to_ar(rho, per)
except:
ar = self.all_mods_ins[instruments[0]]['params'].a
else:
### That means that we directly fit for a/R*
try:
ar = np.nanmedian(self.res.posteriors['posterior_samples']['a_' + planet])
except:
ar = self.all_mods_ins[instruments[0]]['params'].a
### b
try:
b = np.nanmedian(self.res.posteriors['posterior_samples']['b_' + planet])
except:
inc = self.all_mods_ins[instruments[0]]['params'].inc
b = inc_to_b(inc=inc, ar=ar, ecc=self.all_mods_ins[instruments[0]]['params'].ecc,\
omega=self.all_mods_ins[instruments[0]]['params'].w)
t14_dur = t14(per=per, ar=ar, rprs=rprs, b=b, ecc=self.all_mods_ins[instruments[0]]['params'].ecc,\
omega=self.all_mods_ins[instruments[0]]['params'].w)
t14_phs = t14_dur / per
# -------------------------------------------
# Computing detrended data and model
# -------------------------------------------
self.detrend_data(phmin=phmin, instruments=instruments, planet=planet)
self.detrend_model(instruments=instruments, phmin=phmin, highres=highres, planet=planet)
# -------------------------------------------
# Type of fitting
# -------------------------------------------
# We can find the type of fitting just by looking at the phase range covered
phasecurve, transit, eclipse = False, False, False
for ins in range(len(instruments)):
if self.dataset.lc_options[instruments[ins]]['TranEclFit']:
phasecurve = True
elif self.dataset.lc_options[instruments[ins]]['EclipseFit']:
eclipse = True
else:
transit = True
## For phase curve fitting, eclipse fitting is by default true, so, we need to manually turn it off
if phasecurve and eclipse:
eclipse = False
## Sometimes, even for just eclipse fitting we might have turned full transit+eclipse fitting, so double check
## if there is indeed phase curve fitting in the data
if phasecurve:
diff_phs = self.ph_maximum - self.ph_minimum
if diff_phs<0.3:
phasecurve = False
eclipse = True
# -------------------------------------------
# Starting figure code:
# one plot: do plt.figure() here
# Depending on phase curve/transit/occ
# We need either 4 panels, or 2 panels
# -------------------------------------------
if one_plot:
if not phasecurve:
fig = plt.figure(figsize=figsize)
gs = gd.GridSpec(2,1, height_ratios=[2,1])
ax1 = plt.subplot(gs[0]) # Top panel
ax2 = plt.subplot(gs[1]) # Bottom panel
ax3, ax4 = None, None
else:
fig = plt.figure(figsize=figsize)
gs = gd.GridSpec(2, 2, height_ratios=[2,1], width_ratios=[1,3])#, wspace=0.1)
ax1 = plt.subplot(gs[0,0]) # Upper-left (for transit data)
ax2 = plt.subplot(gs[1,0]) # Bottom-left (for transit residuals)
ax3 = plt.subplot(gs[0,1]) # Upper-right (for phase curve)
ax4 = plt.subplot(gs[1,1]) # Bottom-right (for phase curve residuals)
# If there is one plot, then we also need to
# collect all data to compute binned data
bin_phs, bin_fl, bin_fle, bin_res = np.array([]), np.array([]), np.array([]), np.array([])
# Also, in case we don't have highres model, we want to save all models before plotting
all_models_for_plotting = np.array([])
all_u68_mods_plotting, all_l68_mods_plotting = np.array([]), np.array([])
# ------------------------------------------------------------------------
# Creating a list of all fig, axes to return at the end of the function
# if one_plot==False
if not one_plot:
figs_all, axs1_all, axs2_all, axs3_all, axs4_all = [], [], [], [], []
for i in range(len(instruments)):
# -------------------------------------------
# Starting figure code:
# If not one plot: do plt.figure()
# inside the loop
# Depending on phase curve/transit/occ
# We need either 4 panels, or 2 panels
# -------------------------------------------
if not one_plot:
if not phasecurve:
fig = plt.figure(figsize=figsize)
gs = gd.GridSpec(2,1, height_ratios=[2,1])
ax1 = plt.subplot(gs[0])
ax2 = plt.subplot(gs[1])
ax3, ax4 = None, None
else:
fig = plt.figure(figsize=figsize)
gs = gd.GridSpec(2, 2, height_ratios=[2,1], width_ratios=[1,3])#, wspace=0.1)
ax1 = plt.subplot(gs[0,0]) # Upper-left (for transit data)
ax2 = plt.subplot(gs[1,0]) # Bottom-left (for transit residuals)
ax3 = plt.subplot(gs[0,1]) # Upper-right (for phase curve)
ax4 = plt.subplot(gs[1,1]) # Bottom-right (for phase curve residuals)
# -------------------------------------------
# Collecting the required data
# -------------------------------------------
# Detrended data
## Sorting according to phase
idx_phs = np.argsort(self.phase[instruments[i]])
phs = self.phase[instruments[i]][idx_phs]
detrend_data = self.detrended_data[instruments[i]][idx_phs]
detrend_errs = self.detrended_errs[instruments[i]][idx_phs]
# Detrended model
## Sorting according to phase
idx_phs_mod = np.argsort( self.phases_model[instruments[i]] )
phs_model = self.phases_model[instruments[i]][idx_phs_mod]
detrend_model = self.planet_only_models[instruments[i]][1][idx_phs_mod]
## Quantile models
up_68CI = self.planet_only_models[instruments[i]][2][idx_phs_mod]
lo_68CI = self.planet_only_models[instruments[i]][3][idx_phs_mod]
## Random models
sample_models = self.planet_only_models[instruments[i]][0][:, idx_phs_mod]
idx = np.random.choice( np.arange(sample_models.shape[0]), size=nrandom )
random_models = sample_models[idx, :]
# Residuals
residuals = ( self.dataset.data_lc[instruments[i]] - self.models_all_ins[instruments[i]][1] )[idx_phs]
# If we have phase curve fitting, we will use ppm units for the y-axis
if phasecurve or eclipse:
detrend_data, detrend_errs = (detrend_data-1.)*1e6, detrend_errs*1e6
detrend_model = (detrend_model-1.)*1e6
random_models = (random_models-1.)*1e6
lo_68CI, up_68CI = (lo_68CI-1.)*1e6, (up_68CI-1.)*1e6
if one_plot:
# -------------------------------------------
# Saving the data for binning
# outside the loop
# -------------------------------------------
bin_phs = np.hstack(( bin_phs, phs ))
bin_fl, bin_fle = np.hstack(( bin_fl, detrend_data )), np.hstack(( bin_fle, detrend_errs ))
bin_res = np.hstack(( bin_res, residuals ))
all_models_for_plotting = np.hstack(( all_models_for_plotting, detrend_model ))
all_l68_mods_plotting = np.hstack(( all_l68_mods_plotting, lo_68CI ))
all_u68_mods_plotting = np.hstack(( all_u68_mods_plotting, up_68CI ))
# ----------------------------------------------------------
# The actual plotting code starts from here
# ----------------------------------------------------------
# -------------------------------------------
# Top left: transit data
# -------------------------------------------
if phasecurve:
ppt = 1e-3
else:
ppt = 1.
ax1.errorbar(phs, detrend_data*ppt, fmt='.', elinewidth=0.5, markersize=1.5, alpha=0.5, color='dodgerblue', zorder=1)
if (not one_plot) or highres:
## if we are making one-plot, we can plot the detrended model out of loop
ax1.plot(phs_model, detrend_model*ppt, color='navy', lw=2.5, zorder=50)
if quantile_models:
if (not one_plot) or highres:
## Again, if we are making one plot, we can plot the detrended quantile models out of loop
ax1.fill_between(phs_model, y1=lo_68CI*ppt, y2=up_68CI*ppt, color='orangered', alpha=0.15, zorder=25)
else:
for rand in range(nrandom):
ax1.plot(phs_model, random_models[rand,:]*ppt, color='orangered', alpha=0.5, lw=1., zorder=25)
# Limits
## Maximum of the transit model would always be 1. (or, 0 in case we have phase curve model, and we decided to use ppm)
if transit or phasecurve:
ax1.set_xlim(-0.7*t14_phs, 0.7*t14_phs)
else:
ax1.set_xlim(0.5-0.7*t14_phs, 0.5+0.7*t14_phs)
if transit:
ax1.set_ylim([np.min(detrend_model)-0.2*np.ptp(detrend_model), 1+0.2*np.ptp(detrend_model)])
elif phasecurve:
ax1.set_ylim([np.min(detrend_model*ppt)-0.2*np.ptp(detrend_model*ppt), 0.+0.2*np.ptp(detrend_model*ppt)])
else:
ax1.set_ylim([1-0.3*np.ptp(detrend_model), np.max(detrend_model)+0.3*np.ptp(detrend_model)])
# -------------------------------------------
# Bottom left: transit residuals
# -------------------------------------------
ax2.errorbar(phs, residuals*ppt*1e6, fmt='.', elinewidth=0.5, markersize=1.5, alpha=0.5, color='dodgerblue', zorder=1)
ax2.axhline(y=0., ls='--', color='k', zorder=50)
# Limits (x-limit same as the top axis; for y-lim, we will set 3-sigma from median)
if transit or phasecurve:
ax2.set_xlim(-0.7*t14_phs, 0.7*t14_phs)
else:
ax2.set_xlim(0.5-0.7*t14_phs, 0.5+0.7*t14_phs)
# -------------------------------------------
# Binned data: For TRANSIT
# -------------------------------------------
## We do binned data here: ONLY if we don't have one plot
if not one_plot:
if transit or phasecurve:
idx_transits = ( phs > -0.8*t14_phs ) & ( phs < 0.8*t14_phs )
else:
idx_transits = ( phs > 0.5-0.8*t14_phs ) & ( phs < 0.5+0.8*t14_phs )
if not pycheops_binning:
nbin = int( np.sum(idx_transits) / nos_bin_tra )
bin_phs_tra, bin_fl_tra, bin_fle_tra = juliet.utils.bin_data(x=phs[idx_transits], y=detrend_data[idx_transits]*ppt, n_bin=nbin, yerr=detrend_errs[idx_transits]*ppt)
if phasecurve or eclipse:
_, bin_res_tra, bin_reserr_tra = juliet.utils.bin_data(x=phs[idx_transits], y=residuals[idx_transits]*ppt*1e6, n_bin=nbin, yerr=detrend_errs[idx_transits]*ppt)
else:
# When transit only, we haven't multiplied the errorbars with 1e6 yet; so we need to do it again
_, bin_res_tra, bin_reserr_tra = juliet.utils.bin_data(x=phs[idx_transits], y=residuals[idx_transits]*1e6, n_bin=nbin, yerr=detrend_errs[idx_transits]*1e6)
else:
binwid = np.ptp( phs[idx_transits] ) / nos_bin_tra
bin_phs_tra, bin_fl_tra, bin_fle_tra, _ = lcbin(time=phs[idx_transits], flux=detrend_data[idx_transits]*ppt, binwidth=binwid)
if phasecurve or eclipse:
_, bin_res_tra, bin_reserr_tra, _ = lcbin(time=phs[idx_transits], flux=residuals[idx_transits]*ppt*1e6, binwidth=binwid)
else:
# When transit only, we haven't multiplied the errorbars with 1e6 yet; so we need to do it again
_, bin_res_tra, bin_reserr_tra, _ = lcbin(time=phs[idx_transits], flux=residuals[idx_transits]*1e6, binwidth=binwid)
# Plotting them
ax1.errorbar(bin_phs_tra, bin_fl_tra, yerr=bin_fle_tra, fmt='o', c='navy', elinewidth=2, capthick=2, capsize=3, mfc='white', zorder=100)
ax2.errorbar(bin_phs_tra, bin_res_tra, yerr=bin_reserr_tra, fmt='o', c='navy', elinewidth=2, capthick=2, capsize=3, mfc='white', zorder=100)
# Setting y-lim on residuals based on binned residuals
ax2.set_ylim(np.nanmedian(bin_res_tra)-5*pipe_mad(bin_res_tra), np.nanmedian(bin_res_tra)+5*pipe_mad(bin_res_tra))
# -------------------------------------------
# If phase curve modelling,
# then we also need right panel
# -------------------------------------------
if phasecurve:
# -------------------------------------------
# Top right: phase curve
# -------------------------------------------
ax3.errorbar(phs, detrend_data, fmt='.', elinewidth=0.5, markersize=1.5, alpha=0.5, color='dodgerblue', zorder=1)
if (not one_plot) or highres:
## If we are making one plot, then we can take this outside of the loop
ax3.plot(phs_model, detrend_model, color='navy', lw=2.5, zorder=50)
ax3.axhline(0.0, color='k', ls='--', lw=1.5, zorder=45)
if quantile_models:
if (not one_plot) or highres:
## If we are making one plot, we can take this outside of the loop
ax3.fill_between(phs_model, y1=lo_68CI, y2=up_68CI, color='orangered', alpha=0.15, zorder=25)
else:
for rand in range(nrandom):
ax3.plot(phs_model, random_models[rand,:], color='orangered', alpha=0.5, lw=1., zorder=25)
# Limits
ax3.set_xlim(phmin-1., phmin)
## For y-lim; we have baseline of 0, we can leave half occultation depth/phase amplitude below and above
ylen = np.ptp( detrend_model[np.abs(phs_model) > 0.1] )
ax3.set_ylim([-0.4*ylen, 1.5*ylen])
# -------------------------------------------
# Bottom right: phase curve residuals
# -------------------------------------------
ax4.errorbar(phs, residuals*1e6, fmt='.', elinewidth=0.5, markersize=1.5, alpha=0.5, color='dodgerblue', zorder=1)
ax4.axhline(y=0., ls='--', color='k', zorder=10)
# Limits
ax4.set_xlim(phmin-1., phmin)
# -------------------------------------------
# Binned data: For PHASE CURVE
# -------------------------------------------
## We do binned data here: ONLY if we don't have one plot
if not one_plot:
if not pycheops_binning:
bin_phs_pc, bin_fl_pc, bin_fle_pc = juliet.utils.bin_data(x=phs, y=detrend_data, n_bin=int( len(phs)/nos_bin_pc ), yerr=detrend_errs)
_, bin_res_pc, bin_reserr_pc = juliet.utils.bin_data(x=phs, y=residuals*1e6, n_bin=int( len(phs)/nos_bin_pc ), yerr=detrend_errs)
else:
bin_phs_pc, bin_fl_pc, bin_fle_pc, _ = lcbin(time=phs, flux=detrend_data, binwidth=1/nos_bin_pc)
_, bin_res_pc, bin_reserr_pc, _ = lcbin(time=phs, flux=residuals*1e6, binwidth=1/nos_bin_pc)
# Plotting them
ax3.errorbar(bin_phs_pc, bin_fl_pc, yerr=bin_fle_pc, fmt='o', c='navy', elinewidth=2, capthick=2, capsize=3, mfc='white', zorder=100)
ax4.errorbar(bin_phs_pc, bin_res_pc, yerr=bin_reserr_pc, fmt='o', c='navy', elinewidth=2, capthick=2, capsize=3, mfc='white', zorder=100)
ax4.set_ylim(np.nanmedian(bin_res_pc)-5*pipe_mad(bin_res_pc), np.nanmedian(bin_res_pc)+5*pipe_mad(bin_res_pc))
if not one_plot:
# ---------------------------------------------------
# Figure labels etc.
# ---------------------------------------------------
# ------------------------
#. Transits
# ------------------------
## First, removing the x-axis labels of top panels
ax1.xaxis.set_major_formatter(plt.NullFormatter())
# x-and y-labels
if not phasecurve:
if not eclipse:
ax1.set_ylabel('Normalised flux')
else:
ax1.set_ylabel('Normalised flux [ppm]')
ax2.set_ylabel('Residuals [ppm]')
else:
ax1.set_ylabel('Normalised flux [ppt]')
ax2.set_ylabel('Residuals [ppt]')
ax2.set_xlabel('Orbital phase')
# ------------------------
#. Transits
# ------------------------
if phasecurve:
## Removing x-axis labels of top panels
ax3.xaxis.set_major_formatter(plt.NullFormatter())
## Now, we will switch the y-axis labels to right side for phase curve panels
ax3.yaxis.tick_right()
ax3.tick_params(labelright=True)
ax3.yaxis.set_label_position('right')
ax4.yaxis.tick_right()
ax4.tick_params(labelright=True)
ax4.yaxis.set_label_position('right')
## x-and y-labels
ax3.set_ylabel('Normalised flux [ppm]', rotation=270, labelpad=25)
ax4.set_xlabel('Orbital phase')
ax4.set_ylabel('Residuals [ppm]', rotation=270, labelpad=25)
# Saving figs and axes if not one_plot
figs_all.append(fig)
axs1_all.append(ax1)
axs2_all.append(ax2)
axs3_all.append(ax3)
axs4_all.append(ax4)
# -------------------------------------------------------------
# Now, we plot the binned data when
# we have one plot
# -------------------------------------------------------------
if one_plot:# and (not highres):
## This ppt will convert transit data to PPT, when phase curve is plotted
if phasecurve:
ppt = 1e-3
else:
ppt = 1.
# -------------------------------------------
# Binned data: For Transits
# -------------------------------------------
idx_phs_bin = np.argsort(bin_phs)
bin_phs, bin_fl, bin_fle, bin_res = bin_phs[idx_phs_bin], bin_fl[idx_phs_bin], bin_fle[idx_phs_bin], bin_res[idx_phs_bin]
# Now, we want to decide nbin (number of data points to bin)
# We would like to have different binning for transit and phase curve -- so first selecting nbin for transit/occultation
if transit or phasecurve:
idx_transits = ( bin_phs > -0.8*t14_phs ) & ( bin_phs < 0.8*t14_phs )
else:
idx_transits = ( bin_phs > 0.5-0.8*t14_phs ) & ( bin_phs < 0.5+0.8*t14_phs )
if not pycheops_binning:
nbin = int( np.sum(idx_transits) / nos_bin_tra )
# Performing binning
bin_phs_tra, bin_fl_tra, bin_fle_tra = juliet.utils.bin_data(x=bin_phs[idx_transits], y=bin_fl[idx_transits]*ppt, n_bin=nbin, yerr=bin_fle[idx_transits]*ppt)
if phasecurve or eclipse:
_, bin_res_tra, bin_reserr_tra = juliet.utils.bin_data(x=bin_phs[idx_transits], y=bin_res[idx_transits]*ppt*1e6, n_bin=nbin, yerr=bin_fle[idx_transits]*ppt)
else:
# When transit only, we haven't multiplied the errorbars with 1e6 yet; so we need to do it again
_, bin_res_tra, bin_reserr_tra = juliet.utils.bin_data(x=bin_phs[idx_transits], y=bin_res[idx_transits]*1e6, n_bin=nbin, yerr=bin_fle[idx_transits]*1e6)
else:
binwid = np.ptp( bin_phs[idx_transits] ) / nos_bin_tra
# Performing binning
bin_phs_tra, bin_fl_tra, bin_fle_tra, _ = lcbin(time=bin_phs[idx_transits], flux=bin_fl[idx_transits]*ppt, binwidth=binwid)
if phasecurve or eclipse:
_, bin_res_tra, bin_reserr_tra, _ = lcbin(time=bin_phs[idx_transits], flux=bin_res[idx_transits]*ppt*1e6, binwidth=binwid)
else:
# When transit only, we haven't multiplied the errorbars with 1e6 yet; so we need to do it again
_, bin_res_tra, bin_reserr_tra, _ = lcbin(time=bin_phs[idx_transits], flux=bin_res[idx_transits]*1e6, binwidth=binwid)
# Plotting them
ax1.errorbar(bin_phs_tra, bin_fl_tra, yerr=bin_fle_tra, fmt='o', c='navy', elinewidth=2, capthick=2, capsize=3, mfc='white', zorder=100)
ax2.errorbar(bin_phs_tra, bin_res_tra, yerr=bin_reserr_tra, fmt='o', c='navy', elinewidth=2, capthick=2, capsize=3, mfc='white', zorder=100)
# Setting y-lim on residuals based on binned residuals
ax2.set_ylim(np.nanmedian(bin_res_tra)-5*pipe_mad(bin_res_tra), np.nanmedian(bin_res_tra)+5*pipe_mad(bin_res_tra))
# -------------------------------------------
# Binned data: For PHASE CURVE
# -------------------------------------------
if phasecurve:
if not pycheops_binning:
bin_phs_pc, bin_fl_pc, bin_fle_pc = juliet.utils.bin_data(x=bin_phs, y=bin_fl, n_bin=int( len(bin_phs)/nos_bin_pc ), yerr=bin_fle)
_, bin_res_pc, bin_reserr_pc = juliet.utils.bin_data(x=bin_phs, y=bin_res*1e6, n_bin=int( len(bin_phs)/nos_bin_pc ), yerr=bin_fle)
else:
bin_phs_pc, bin_fl_pc, bin_fle_pc, _ = lcbin(time=bin_phs, flux=bin_fl, binwidth=1/nos_bin_pc)
_, bin_res_pc, bin_reserr_pc, _ = lcbin(time=bin_phs, flux=bin_res*1e6, binwidth=1/nos_bin_pc)
# Plotting them
ax3.errorbar(bin_phs_pc, bin_fl_pc, yerr=bin_fle_pc, fmt='o', c='navy', elinewidth=2, capthick=2, capsize=3, mfc='white', zorder=100)
ax4.errorbar(bin_phs_pc, bin_res_pc, yerr=bin_reserr_pc, fmt='o', c='navy', elinewidth=2, capthick=2, capsize=3, mfc='white', zorder=100)
ax4.set_ylim(np.nanmedian(bin_res_pc)-5*pipe_mad(bin_res_pc), np.nanmedian(bin_res_pc)+5*pipe_mad(bin_res_pc))
# -----------------------------------------------------------------------
#
# Now, we want to plot the planetary model
# if highres = False
#
# -----------------------------------------------------------------------
if not highres:
# First, collecting all models
all_models_for_plotting = all_models_for_plotting[idx_phs_bin]
all_l68_mods_plotting, all_u68_mods_plotting = all_l68_mods_plotting[idx_phs_bin], all_u68_mods_plotting[idx_phs_bin]
# ---------------------------------------------------
# For transit/occultation
# ---------------------------------------------------
## ----------- We can now plot the models (detrended and quantile models)
ax1.plot(bin_phs, all_models_for_plotting*ppt, color='navy', lw=2.5, zorder=50)
if quantile_models:
ax1.fill_between(bin_phs, y1=all_l68_mods_plotting*ppt, y2=all_u68_mods_plotting*ppt, color='orangered', alpha=0.5, zorder=25)
# ---------------------------------------------------
# For phase curve
# ---------------------------------------------------
if phasecurve:
## ----------- We can now plot the models (detrended and quantile models)
ax3.plot(bin_phs, all_models_for_plotting, color='navy', lw=2.5, zorder=50)
if quantile_models:
ax3.fill_between(bin_phs, y1=all_l68_mods_plotting, y2=all_u68_mods_plotting, color='orangered', alpha=0.5, zorder=25)
if one_plot:
# ---------------------------------------------------
# Figure labels etc.
# ---------------------------------------------------
# ------------------------
#. Transits
# ------------------------
## First, removing the x-axis labels of top panels
ax1.xaxis.set_major_formatter(plt.NullFormatter())
# x-and y-labels
if not phasecurve:
if not eclipse:
ax1.set_ylabel('Normalised flux')
else:
ax1.set_ylabel('Normalised flux [ppm]')
ax2.set_ylabel('Residuals [ppm]')
else:
ax1.set_ylabel('Normalised flux [ppt]')
ax2.set_ylabel('Residuals [ppt]')
ax2.set_xlabel('Orbital phase')
# ------------------------
#. Phase curves
# ------------------------
if phasecurve:
## Removing x-axis labels of top panels
ax3.xaxis.set_major_formatter(plt.NullFormatter())
## Now, we will switch the y-axis labels to right side for phase curve panels
ax3.yaxis.tick_right()
ax3.tick_params(labelright=True)
ax3.yaxis.set_label_position('right')
ax4.yaxis.tick_right()
ax4.tick_params(labelright=True)
ax4.yaxis.set_label_position('right')
## x-and y-labels
ax3.set_ylabel('Normalised flux [ppm]', rotation=270, labelpad=25)
ax4.set_xlabel('Orbital phase')
ax4.set_ylabel('Residuals [ppm]', rotation=270, labelpad=25)
## Depending on one_plot, we return the appropriate objects
if not one_plot:
return figs_all, axs1_all, axs2_all, axs3_all, axs4_all
else:
return fig, ax1, ax2, ax3, ax4
[docs]
def plot_fake_allan_deviation(self, instruments=None, binmax=10, method='pipe', timeunit=None):
"""Compute and plot per-instrument noise-vs-bin-size curves.
This is a thin wrapper around ``utils.fake_allan_deviation`` that
computes residuals for each instrument (data minus the full model)
and returns the figures, axes and numeric results for further use.
Parameters
----------
instruments : list or None
List of instrument names to include. If ``None``, all
instruments in the juliet dataset (``self.dataset.inames_lc``)
are processed.
binmax : int, optional
Passed to ``utils.fake_allan_deviation`` to control the maximum
number of bins (default ``10``).
method : {'pipe','std','rms','astropy'}, optional
Method used to estimate the scatter of the binned residuals:
- 'pipe' : use ``pipe_mad`` (median absolute differences estimator)
- 'std' : use ``np.nanstd``
- 'rms' : use the root-mean-square (``rms`` helper)
- 'astropy' : use ``astropy.stats.mad_std`` (robust MAD-based std)
timeunit : str or None, optional
If provided, forwarded to ``utils.fake_allan_deviation`` to
force the secondary x-axis time unit; otherwise the helper
chooses an appropriate unit automatically.
Returns
-------
figs_all : list
List of matplotlib Figure objects (one per instrument).
axs_all : list
List of matplotlib Axes objects corresponding to the figures.
binsize_all : list
List of binsize arrays returned for each instrument.
noise_all : list
List of noise arrays (ppm) computed for each instrument.
white_noise_all : list
List of white-noise expectation arrays (ppm) for each
instrument.
"""
# Let's first gather the names of all instruments (we will always do one plot per instrument)
if instruments != None:
if len(instruments) != len(self.dataset.inames_lc):
one_plot = False
else:
instruments = self.dataset.inames_lc
## All stuff we can save
figs_all, axs_all = [], []
binsize_all, noise_all, white_noise_all = [], [], []
for ins in range(len(instruments)):
# Getting the data
tim_ins = self.dataset.times_lc[instruments[ins]][self.idx_time_sort[instruments[ins]]]
fl_ins = self.dataset.data_lc[instruments[ins]][self.idx_time_sort[instruments[ins]]]
## Median model
full_model = self.models_all_ins[instruments[ins]][1][self.idx_time_sort[instruments[ins]]]
residuals = fl_ins - full_model
# And, now, let's plot the Allan deviation
fig, ax, binsize, noise, white_noise = fake_allan_deviation(times=tim_ins, residuals=residuals, binmax=binmax, method=method, timeunit=timeunit, plot=True)
figs_all.append(fig)
axs_all.append(ax)
binsize_all.append(binsize)
noise_all.append(noise)
white_noise_all.append(white_noise)
return figs_all, axs_all, binsize_all, noise_all, white_noise_all
[docs]
def plot_corner(self, planet_only=False, param_list=None, save=True):
"""Create a corner plot of selected posterior parameters.
Parameters
----------
planet_only : bool, optional
If ``True``, include only planetary parameters in the corner
plot. If ``False`` (default), include instrumental and noise
parameters as well when available.
param_list : list of str or None, optional
If provided, a list of parameter names to include in the corner
plot. If ``None`` (default), all parameters are included (subject
to the ``planet_only`` filter).
save : bool, optional
If ``True`` (default), save the generated figure to
``<input_folder>/corner_plot.png``.
Returns
-------
fig : matplotlib.figure.Figure
The figure produced by ``utils.corner_plot`` containing the
marginal and pairwise plots for the selected parameters.
"""
# Access the posteriors
post_samps = self.res.posteriors['posterior_samples']
# Arrays for posteriors and labels
posteriors, labels, titles = [], [], []
for i in post_samps.keys():
if param_list is not None and i not in param_list:
continue
par = i.split('_')
if ( 'p1' in par ) or ( 'p2' in par ) or ( 'p3' in par ) or ( 'p4' in par ) or ( 'q1' in par ) or ( 'q2' in par ) or ( 'rho' in par ):
## Adding posteriors and labels ( this parameters are added no matter planet_only is True/False)
if i[0:3] == 'P_p':
## Period parameter: the uncertainty is generally very small, so we can scale it to make it plottable
per_exponent_part = np.ceil( np.log10( np.nanstd( post_samps[i] ) ) )
per_median = np.nanmedian( post_samps[i] )
per = ( post_samps[i] - per_median ) / ( 10**per_exponent_part )
## Saving the posteriors
posteriors.append( per )
labels.append( i + ' [(P $-$ ' + str(np.around(per_median,5)) + ') / $10^{' + str(int(per_exponent_part)) + '}$ d]' )
titles.append( par[0] )
elif i[0:2] == 't0':
## Same for t0, we don't need to write out the whole thing
t02 = np.floor( np.nanmedian(post_samps[i]) )
posteriors.append( post_samps[i] - t02 )
labels.append( i + ' [t0 $-$ ' + str(int(t02)) + ' d]' )
titles.append( par[0] )
elif (i[0:3] == 'p_p') or (i[0:4] == 'p1_p') or (i[0:4] == 'p2_p'):
## For Rp/R*, if there are more than 1 instruments, we will label them as 'instrument_name et al.'
posteriors.append( post_samps[i] )
titles.append( par[0] )
if len(i.split('_')) > 3:
labels.append('_'.join(i.split('_')[0:3]) + '_et al.')
else:
labels.append(i)
elif (i[0:2] == 'q1') or (i[0:2] == 'q2'):
## Same for limb-darkening coefficients, if there are more than 1 instruments, we will label them as 'instrument_name et al.'
posteriors.append( post_samps[i] )
titles.append( par[0] )
if len(i.split('_')) > 2:
labels.append('_'.join(i.split('_')[0:2]) + '_et al.')
else:
labels.append(i)
elif ( i[0:2] == 'fp' ) or ( i[0:4] == 'C1_p' ) or ( i[0:2] == 'C2' ) or ( i[0:2] == 'D1' ) or ( i[0:2] == 'D2' ):
## For flux parameters, we will label them as 'instrument_name et al.'
posteriors.append( post_samps[i] * 1e6 )
titles.append( par[0] )
if len(i.split('_')) > 3:
labels.append('_'.join(i.split('_')[0:3]) + '_et al. [ppm]')
else:
labels.append(i + ' [ppm]')
## If there are 'sesinomega_p1' and 'secosomega_p1' parameters, we will convert them to 'e_p1' and 'omega_p1'
elif ( i[0:10] == 'secosomega' ):
pl_num = i.split('_')[-1]
sesinomega = post_samps['sesinomega_' + pl_num]
secosomega = post_samps['secosomega_' + pl_num]
e_p = sesinomega**2 + secosomega**2
omega_p = np.arctan2( sesinomega, secosomega ) * 180. / np.pi
## First, add sesinomega
posteriors.append( post_samps[i] )
labels.append( i )
titles.append( par[0] )
## And now, e and w
posteriors.append( e_p )
labels.append( 'e_' + pl_num )
titles.append( 'e' )
posteriors.append( omega_p )
labels.append( r'$\omega$_' + pl_num + ' [deg]' )
titles.append( r'$\omega$' )
else:
## Else
posteriors.append( post_samps[i] )
if len(i.split('_')) > 3:
labels.append('_'.join(i.split('_')[0:3]) + '_et al.')
else:
labels.append(i)
titles.append( par[0] )
else:
## Rest of the parameters are only added if planet_only is False
if not planet_only:
if ( i[0:7] != 'unnamed' ) and ( i[0:7] != 'loglike' ):
posteriors.append( post_samps[i] )
if len(i.split('_')) > 3:
labels.append('_'.join(i.split('_')[0:3]) + '_et al.')
else:
labels.append(i)
titles.append( par[0] )
# ------------------------------------------
# Now, we can make the corner plot
# ------------------------------------------
## Reducing fontsize specially for this plot
from matplotlib import rcParams
rcParams['font.size'] = 10.
rcParams['axes.labelsize'] = 'medium'
rcParams['xtick.labelsize'] = 'medium'
rcParams['ytick.labelsize'] = 'medium'
rcParams['axes.titlesize'] = 'large'
fig = corner_plot(samples=posteriors, labels=labels, titles=titles)
if save:
plt.savefig(self.input_folder + '/corner_plot.png', dpi=250)
return fig
[docs]
def plot_gp(self, instruments=None, highres=False, one_plot=False, figsize=(16/1.5, 9/1.5), pycheops_binning=False, xlabel='GP regressor', robust_errors=False, plot_errorbars=False, parameter_vector=None, nos_bin=20):
"""Plot the Gaussian Process component(s) from the fitted `juliet` model.
This routine computes and plots the GP model (and binned data)
for one or more instruments present in the current `juliet` dataset.
Parameters
----------
instruments : list or None
List of instrument names to plot. If ``None``, all instruments for
which GP regressors exist in ``self.dataset.GP_lc_arguments`` are
plotted.
highres : bool, optional
If ``True``, compute a high-resolution GP prediction (calls
``compute_gp_model``). Default is ``False``.
one_plot : bool, optional
If ``True``, combine all instruments into a single figure and
return a single ``(fig, axs)``. If ``False``, return lists of
figures and axes (one per instrument). Default ``False``.
figsize : tuple, optional
Figure size passed to ``matplotlib``. Default ``(16/1.5, 9/1.5)``.
pycheops_binning : bool, optional
Use ``pycheops`` produced binning when set to ``True``.
Default ``False``.
xlabel : str, optional
X-axis label for the GP regressor. Default ``'GP regressor'``.
robust_errors : bool, optional
If ``True``, compute GP uncertainties using
``compute_gp_model`` instead of using ``juliet`` precomputed errors.
plot_errorbars : bool, optional
If ``True``, plot error bars on the GP data points. Default ``False``.
parameter_vector : array-like or None, optional
If provided, passed to the GP parameter setter before computing
predictions (useful for evaluating GP at different parameter
values).
nos_bin : int, optional
Approximate number of bins to use when computing binned data per
instrument. Default ``20``.
Returns
-------
(fig, axs) or (fig_all, axs_all)
If ``one_plot`` is ``True``, returns a single tuple ``(fig, axs)``
for the combined figure. Otherwise returns ``(fig_all, axs_all)``
where each is a list containing one Figure/Axis per instrument.
"""
# -------------------------------------------
# List of all instruments
# -------------------------------------------
## If instruments=None, we will generate one plot per instruments for all instruments for which GP fitting is done
if instruments != None:
## Let's check if the provided instruments indeed have GP fitting
for ins in range(len(instruments)):
if instruments[ins] not in self.dataset.GP_lc_arguments.keys():
raise ValueError('GP fitting is not found for instrument ' + instruments[ins])
else:
instruments = [i for i in self.dataset.GP_lc_arguments.keys()]
# -------------------------------------------
# Computing detrended data and model
# -------------------------------------------
self.detrend_data(phmin=0.8, instruments=instruments)
self.detrend_model(instruments=instruments, phmin=0.8, highres=False)
# -------------------------------------------
# Let's now compute the residuals
# needed to plot the GP model
# The total juliet model is:
# y = T*F1 + F2 + GP + LM
# => GP = y - T*F1 - F2 - LM
# -------------------------------------------
gp_model_regressor, self.gp_data = {}, {}
gp_med_model, gp_up68, gp_lo68 = {}, {}, {}
for i in range(len(instruments)):
# We note here that every array should be sorted according to GP regressors (e.g., time or roll angle)
# We will keep it that way -- everything sorted according GP regressors
## -- We also need to compute linear model, if there is any
if instruments[i] in self.dataset.lm_lc_arguments.keys():
linear_model = self.models_all_ins[instruments[i]][-1]['lm']
else:
linear_model = 0.
# -------------------------------------------
# GP data: i.e., data fitted to GP model
# -------------------------------------------
self.gp_data[instruments[i]] = self.dataset.data_lc[instruments[i]] - ( self.F1[instruments[i]] * self.planet_only_models[instruments[i]][1] ) -\
self.F2[instruments[i]] - linear_model
# -------------------------------------------
# And GP model -- we need to compute them
# -------------------------------------------
if not highres:
gp_model_regressor[instruments[i]] = np.copy( self.dataset.GP_lc_arguments[instruments[i]][:, 0] )
gp_med_model[instruments[i]] = self.all_mods_ins[instruments[i]]['GP']
if not robust_errors:
gp_up68[instruments[i]], gp_lo68[instruments[i]] = self.all_mods_ins[instruments[i]]['GP_uerror'], self.all_mods_ins[instruments[i]]['GP_lerror']
else:
## In case of robust errors, we need to compute errors from ``compute_gp_model`` function
gp_quantiles = self.compute_gp_model(instrument=instruments[i], highres=False, parameter_vector=parameter_vector)
gp_up68[instruments[i]], gp_lo68[instruments[i]] = gp_quantiles[1,:], gp_quantiles[2,:]
else:
# High-resolution GP regressor per instrument
predict_time = np.linspace( np.min(self.dataset.GP_lc_arguments[instruments[i]][:, 0]),\
np.max(self.dataset.GP_lc_arguments[instruments[i]][:, 0]), 10000 )
# ------ For highres models, we always need to compute models from ``compute_gp_model`` function
gp_quantiles = self.compute_gp_model(instrument=instruments[i], highres=True, pred_time=predict_time, parameter_vector=parameter_vector)
gp_model_regressor[instruments[i]] = predict_time
gp_med_model[instruments[i]] = gp_quantiles[0,:]
gp_up68[instruments[i]], gp_lo68[instruments[i]] = gp_quantiles[1,:], gp_quantiles[2,:]
# Now, I think, we have everything -- so, let's plot!
# -------------------------------------------
# Starting figure code:
# one plot: do plt.subplots() here
# -------------------------------------------
if one_plot:
fig, axs = plt.subplots(figsize=figsize)
# If there is one plot, then we also need to collect all data to compute binned data
bin_gp_reg, bin_resids, bin_errors = np.array([]), np.array([]), np.array([])
# Also, in case we don't have highres model, we want to save all models before plotting
all_regressors_for_plotting, all_models_for_plotting = np.array([]), np.array([])
all_u68_mods_plotting, all_l68_mods_plotting = np.array([]), np.array([])
else:
# We need a list to save all fig and axs objects if we are planning more than one plot
fig_all, axs_all = [], []
for i in range(len(instruments)):
# -------------------------------------------
# If not one plot, we need to create
# fig and axs object inside for loop
# -------------------------------------------
if not one_plot:
fig, axs = plt.subplots(figsize=figsize)
# Plotting the "raw" data
errorbars = self.detrended_errs[instruments[i]]*1e6 if plot_errorbars else None
alp = 0.3 if plot_errorbars else 0.8
axs.errorbar(self.dataset.GP_lc_arguments[instruments[i]][:,0], self.gp_data[instruments[i]]*1e6, yerr=errorbars, fmt='.', elinewidth=0.3, markersize=1.5, alpha=alp, color='dodgerblue', zorder=1)
# ------------------------------------------
# Now, we can plot the bin data and
# quantile models here, if there is
# no one plot, else, just save everything
# to one big array, to plot outside
# the for loop. We can also do
# the label thing here.
# ------------------------------------------
if not one_plot:
# First bin-data
if not pycheops_binning:
nbin = int( len( self.gp_data[instruments[i]] ) / nos_bin )
bin_tim, bin_fl, bin_fle = juliet.utils.bin_data(x=self.dataset.GP_lc_arguments[instruments[i]][:,0], y=self.gp_data[instruments[i]]*1e6, yerr=self.detrended_errs[instruments[i]]*1e6, n_bin=nbin)
else:
binwid = np.ptp( self.dataset.GP_lc_arguments[instruments[i]][:,0] ) / nos_bin
bin_tim, bin_fl, bin_fle, _ = lcbin(time=self.dataset.GP_lc_arguments[instruments[i]][:,0], flux=self.gp_data[instruments[i]]*1e6, binwidth=binwid)
axs.errorbar(bin_tim, bin_fl, yerr=bin_fle, fmt='o', c='navy', elinewidth=2, capthick=2, capsize=3, mfc='white', zorder=100)
# And now models
## If we don't need highres model, we can simply plot what we have
## Else, we need to create highres model first
axs.plot(gp_model_regressor[instruments[i]], gp_med_model[instruments[i]]*1e6, color='navy', lw=1.5, zorder=50)
# If we are making one plot per instrument (i.e., one_plot=False), we can simply plot quantile models right here
axs.fill_between(gp_model_regressor[instruments[i]], y1=gp_lo68[instruments[i]]*1e6, y2=gp_up68[instruments[i]]*1e6, color='orangered', alpha=0.5, zorder=25)
# Labels, limits
axs.set_xlabel(xlabel)
axs.set_ylabel('Relative flux [ppm]')
axs.set_xlim([ np.min(self.dataset.GP_lc_arguments[instruments[i]][:,0]), np.max(self.dataset.GP_lc_arguments[instruments[i]][:,0]) ])
axs.set_ylim([ np.nanmedian(bin_fl)-10*pipe_mad(bin_fl), np.nanmedian(bin_fl)+10*pipe_mad(bin_fl) ])
# Saving fig, axs to the list
fig_all.append(fig)
axs_all.append(axs)
else:
# Saving data to the big list
bin_gp_reg, bin_resids = np.hstack( (bin_gp_reg, self.dataset.GP_lc_arguments[instruments[i]][:,0]) ), np.hstack( (bin_resids, self.gp_data[instruments[i]]*1e6) )
bin_errors = np.hstack( (bin_errors, self.detrended_errs[instruments[i]]*1e6) )
# Saving models to the list
all_regressors_for_plotting = np.hstack( (all_regressors_for_plotting, gp_model_regressor[instruments[i]]) )
all_models_for_plotting = np.hstack( (all_models_for_plotting, gp_med_model[instruments[i]]*1e6) )
## Let's first save the quantile models
all_l68_mods_plotting = np.hstack( (all_l68_mods_plotting, gp_lo68[instruments[i]]*1e6) )
all_u68_mods_plotting = np.hstack( (all_u68_mods_plotting, gp_up68[instruments[i]]*1e6) )
# ------------------------------------------
# If one plot, we can now plot the
# bin data, quantile models, labels
# etc. now, outside of the for loop
# ------------------------------------------
if one_plot:
# First, bin data
if not pycheops_binning:
nbin = int( len( bin_gp_reg ) / nos_bin / len(instruments) )
bin_tim, bin_fl, bin_fle = juliet.utils.bin_data(x=bin_gp_reg, y=bin_resids, yerr=bin_errors, n_bin=nbin)
else:
binwid = np.ptp( bin_gp_reg ) / nos_bin / len(instruments)
bin_tim, bin_fl, bin_fle, _ = lcbin(time=bin_gp_reg, flux=bin_resids, binwidth=binwid)
axs.errorbar(bin_tim, bin_fl, yerr=bin_fle, fmt='o', c='navy', elinewidth=2, capthick=2, capsize=3, mfc='white', zorder=100)
if highres:
# ------------------------------------------
# One plot _and_ highres requires a
# special treatment
# ------------------------------------------
## -- Okay, we need one plot, in high resolution, i.e., we need to call a function to do this.
idx_gp_reg_sort = np.argsort(bin_gp_reg)
pred_time = np.linspace(np.min(bin_gp_reg[idx_gp_reg_sort]), np.max(bin_gp_reg[idx_gp_reg_sort]), 10000)
gp_quantiles = self.compute_gp_model(instrument=None, highres=True, times=bin_gp_reg[idx_gp_reg_sort], pred_time=pred_time,\
resids=bin_resids[idx_gp_reg_sort]/1e6, errors=bin_errors[idx_gp_reg_sort]/1e6,\
parameter_vector=parameter_vector)
axs.plot(pred_time, gp_quantiles[0,:]*1e6, color='navy', lw=1.5, zorder=50)
axs.fill_between(pred_time, y1=gp_quantiles[2,:]*1e6, y2=gp_quantiles[1,:]*1e6, color='orangered', alpha=0.5, zorder=25)
else:
## First, sorting according to regressors, and then plotting the full model
idx_reg_sort = np.argsort(all_regressors_for_plotting)
axs.plot(all_regressors_for_plotting[idx_reg_sort], all_models_for_plotting[idx_reg_sort], color='navy', lw=1.5, zorder=50)
# We can plot qunatile models here, if highres=False
axs.fill_between(all_regressors_for_plotting[idx_reg_sort], y1=all_l68_mods_plotting[idx_reg_sort], y2=all_u68_mods_plotting[idx_reg_sort], color='orangered', alpha=0.5, zorder=25)
# Labels, limits
axs.set_xlabel(xlabel)
axs.set_ylabel('Relative flux [ppm]')
axs.set_xlim([ np.min(bin_gp_reg), np.max(bin_gp_reg) ])
axs.set_ylim([ np.nanmedian(bin_fl)-10*pipe_mad(bin_fl), np.nanmedian(bin_fl)+10*pipe_mad(bin_fl) ])
if one_plot:
return fig, axs
else:
return fig_all, axs_all
[docs]
def compute_gp_model(self, instrument=None, highres=None, times=None, pred_time=None, resids=None, errors=None, parameter_vector=None):
"""This function computes the GP models. It accesses the GP object from the juliet files, and then do GP.compute and GP.predict."""
# If we compute GP model this way, then the errors will always be robust because we will be using gp.predict
# If time is None: that means that we need model for an instrument (not one model for all instruments)
#
# times is not None, means that we need one model for more than one instruments
# This model is always highres
# This also assumes that the GP model is the same for all instruments
if times is None:
# When time is None, means that we haven't provided any data, so we need to extract it from self.
## Extracting times (GP regressors) from self.dataset object
times = self.dataset.GP_lc_arguments[instrument][:,0]
## Similarly, extracting GP data and errors from self
resids, errors = self.gp_data[instrument], self.detrended_errs[instrument]
if highres:
# Model for one instrument, but still highres model
pred_time = np.linspace(np.min(times), np.max(times), 10000)
else:
# Not highres model
pred_time = np.copy(times)
else:
# We don't need any data in this case, since everything is provided,
# but we still need a name of an instrument to access GP object
instrument = [i for i in self.dataset.GP_lc_arguments.keys()][0]
if parameter_vector is not None:
self.dataset.lc_options[instrument]['noise_model'].GP.set_parameter_vector(parameter_vector)
# GP.compute
self.dataset.lc_options[instrument]['noise_model'].GP.compute(t=times, yerr=errors)
# And GP prediction
gp_mean, gp_var = self.dataset.lc_options[instrument]['noise_model'].GP.predict(y=resids, t=pred_time, return_var=True)
gp_std = np.sqrt(gp_var)
gp_quantiles = np.zeros( (3, len(gp_mean)) )
gp_quantiles[0, :] = gp_mean
gp_quantiles[1, :], gp_quantiles[2, :] = gp_mean + gp_std, gp_mean - gp_std
return gp_quantiles
[docs]
class ApPhoto(object):
"""Helper for aperture photometry on image time-series.
This class encapsulates common operations required for aperture
photometry on a data cube of frames (shape ``(N_frames, Ny, Nx)``),
including centroid estimation, aperture and sky-mask construction,
simple aperture flux extraction, and pixel-level decorrelation
(PLD) basis construction.
Parameters
----------
times : array_like
1D array of time stamps corresponding to each frame.
frames : ndarray
3D data cube with shape ``(N_frames, Ny, Nx)`` containing image
pixel values for each frame.
errors : ndarray
Per-pixel uncertainties with the same shape as ``frames``.
badpix : ndarray
2D bad-pixel mask for a single frame (shape ``(Ny, Nx)``).
A value of zero indicates a bad/ignored pixel; non-zero
indicates a usable pixel.
aprad : float or None, optional
Aperture radius in pixels for circular aperture photometry.
Used by :meth:`aperture_mask` and :meth:`simple_aperture_photometry`.
Ignored when ``brightpix`` is True. Default is ``None``.
sky_rad1 : float or None, optional
Inner radius (pixels) of the sky annulus. Used by :meth:`sky_mask`
and :meth:`simple_aperture_photometry` for background estimation.
If omitted and ``brightpix`` is False, an empty (zero) mask is returned.
Default is ``None``.
sky_rad2 : float or None, optional
Outer radius (pixels) of the sky annulus. Used by :meth:`sky_mask`
and :meth:`simple_aperture_photometry` for background estimation.
Default is ``None``.
brightpix : bool, optional
If ``True``, select ``nos_brightest`` brightest pixels to form an aperture
instead of circular aperture. Used by :meth:`aperture_mask` and :meth:`sky_mask`.
Default ``False``.
nos_brightest : int, optional
Number of brightest pixels to include in aperture when ``brightpix=True``.
Default ``12``.
nos_faintest : int, optional
Number of faintest pixels to include in background aperture when it is not None and ``brightpix=True``.
When None, all pixels that are not in aperture are included. Default is ``None``.
minmax : dict or None, optional
Optional spatial bounds for brightest pixel selection with keys
'rmin', 'rmax', 'cmin', 'cmax'. Default is ``None``.
Attributes
----------
times, frames, errors, badpix : as above
aprad, sky_rad1, sky_rad2, brightpix, nos_brightest, minmax : as above
cen_r, cen_c : ndarray
Per-frame row/column centroid positions (populated by
:meth:`find_center`).
ap_bool_mask_pld : ndarray
2D aperture boolean mask used by PLD routines.
Psum, Phat, V, eigenvalues, PCA : ndarray
Intermediate PLD quantities (pixel-sum, normalized fractions,
PCA eigenvectors/values and projected time-series) populated by
:meth:`pixel_level_decorrelation`.
Main methods
------------
identify_crays(clip, niters)
Flag cosmic-ray affected pixels and update ``badpix``.
replace_nan(max_iter)
Iteratively fill NaNs in ``frames`` from neighboring pixels.
find_center(rmin,rmax,cmin,cmax)
Compute center-of-flux centroids per frame.
simple_aperture_photometry(method, plot, ...)
Extract aperture photometry using manual, photutils, or median methods.
pixel_level_decorrelation(removeNan)
Build normalized pixel level light curves and compute PCA basis for PLD.
Notes
-----
The class stores both raw inputs and intermediate results used by
subsequent calls; many methods modify the instance in-place and
also return results for convenience.
"""
def __init__(self, times, frames, errors, badpix, aprad=None, sky_rad1=None, sky_rad2=None, brightpix=False, nos_brightest=12, nos_faintest=None, minmax=None):
# The data
self.times = times
self.frames = frames
self.errors = errors
self.badpix = badpix
# The aperture information
self.aprad = aprad
self.sky_rad1, self.sky_rad2 = sky_rad1, sky_rad2
self.brightpix = brightpix
self.nos_brightest = nos_brightest
self.nos_faintest = nos_faintest
self.minmax = minmax
if ( self.aprad == None ) and ( not self.brightpix ):
raise Exception('You need to define an aperture by providing either aperture radius (aprad)\nOr setting brightpix=True, which will form the aperture from bightest pixels.')
[docs]
def identify_crays(self, clip=5, niters=5):
"""Identify cosmic-ray affected pixels and update the bad-pixel map
by comparing each data frame with the median frame.
Parameters
----------
clip : float, optional
Sigma threshold used to flag cosmic-ray candidates (default
``5``).
niters : int, optional
Number of iterative passes to update ``self.badpix`` (default
``5``).
Returns
-------
badpix : ndarray
Updated 2D bad-pixel mask (shape ``(Ny, Nx)``) where flagged
pixels are set to zero and good pixels are one.
"""
# Masking bad pixels as NaN
for _ in range(niters):
# Flagging bad data as Nan
frame_new = np.copy(self.frames)
frame_new[self.badpix == 0.] = np.nan
# Median frame
with warnings.catch_warnings():
warnings.simplefilter('ignore', RuntimeWarning)
median_frame = np.nanmedian(frame_new, axis=0) # 2D frame
# Creating residuals
resids = frame_new - median_frame[None,:,:]
# Median and std of residuals (both are 2D frames)
med_resid, std_resid = np.nanmedian(resids, axis=0), np.nanstd(resids, axis=0)
limit = med_resid + (clip*std_resid)
mask_cr1 = np.abs(resids) < limit[None,:,:]
self.badpix = mask_cr1*self.badpix
return self.badpix
[docs]
def replace_nan(self, max_iter = 50):
"""Fill NaN entries in the data array by the mean of neighbouring pixels.
On each iteration the array is rolled along each axis to build a
collection of nearest-neighbour shifts; the mean of these shifted
arrays is used to replace NaNs. Iteration stops when there are no
remaining NaNs or when ``max_iter`` is reached.
Parameters
----------
max_iter : int, optional
Maximum number of fill iterations to perform (default ``50``).
Returns
-------
frames : ndarray
The data array with NaNs replaced (also modified
in-place and returned).
"""
shape = np.append([2*self.frames.ndim], self.frames.shape)
interp_cube = np.zeros(shape)
axis = tuple(range(self.frames.ndim))
shift0 = np.zeros(self.frames.ndim, int)
shift0[0] = 1
shift = [] # Shift list will be [(-1, 0), (1, 0), (0, -1), (0, 1)]
for n in range(self.frames.ndim):
shift.append(tuple(np.roll(-shift0, n)))
shift.append(tuple(np.roll(shift0, n)))
for _ in range(max_iter):
for n in range(2*self.frames.ndim):
interp_cube[n] = np.roll(self.frames, shift[n], axis = axis) # interp_cube would be (4, data.shape[0], data.shape[1]) sized array
with warnings.catch_warnings(): # with shifted position in each element (so that we can take its mean)
warnings.simplefilter('ignore', RuntimeWarning)
mean_data = np.nanmean(interp_cube, axis=0)
self.frames[np.isnan(self.frames)] = mean_data[np.isnan(self.frames)]
if np.sum(np.isnan(self.frames)) == 0:
break
return self.frames
[docs]
def find_center(self, rmin=None, rmax=None, cmin=None, cmax=None, plot=False):
"""Compute center-of-flux centroids for each frame in the 3D data cube.
The centroid for each frame is computed as the center of flux
(i.e. sum(x*I)/sum(I)) over either the full image or an optional
subimage defined by the row/column limits ``rmin,rmax,cmin,cmax``.
Results are stored on the instance as ``self.cen_r`` and
``self.cen_c`` and also returned.
Parameters
----------
rmin, rmax, cmin, cmax : int or None, optional
Optional integer bounds (rows and columns) defining a subimage
to use for the centroid calculation. If omitted, the full
frame is used.
plot : bool, optional
If ``True``, produce a plot of centroids as a function of
time (default ``False``).
Returns
-------
cen_r, cen_c : ndarray
1D arrays (length ``N_frames``) containing the row and column
centroid positions (in pixel coordinates) for each frame.
fig : matplotlib.figure.Figure or None
Figure of centroids vs time, if ``plot=True``, otherwise ``None``.
axs : ndarray or None
Axes of the figure if produced, otherwise ``None``.
"""
self.cen_r, self.cen_c = np.zeros(self.frames.shape[0]), np.zeros(self.frames.shape[0])
for i in range(len(self.cen_r)):
# Row is the first index, column is the second index
# First find the subimage if min & max row, cols are provided
if (rmin != None)&(rmax != None)&(cmin == None)&(cmax == None):
subimg = self.frames[i, rmin:rmax, :]
elif (rmin == None)&(rmax == None)&(cmin != None)&(cmax != None):
subimg = self.frames[i, :, cmin:cmax]
elif (rmin != None)&(rmax != None)&(cmin != None)&(cmax != None):
subimg = self.frames[i, rmin:rmax, cmin:cmax]
else:
subimg = np.copy(self.frames[i,:,:])
# Row and column indices
row_idx, col_idx = np.arange(subimg.shape[0]), np.arange(subimg.shape[1])
# And now the center of *subimage*
cen_r_sub = np.nansum(row_idx * np.nansum(subimg, axis=1)) / np.nansum(subimg.flatten())
cen_c_sub = np.nansum(col_idx * np.nansum(subimg, axis=0)) / np.nansum(subimg.flatten())
# This was the center of subimage, let's transform that to image coordinates
if (rmin != None)&(cmin == None):
self.cen_r[i], self.cen_c[i] = cen_r_sub + rmin, cen_c_sub
elif (rmin == None)&(cmin != None):
self.cen_r[i], self.cen_c[i] = cen_r_sub, cen_c_sub + cmin
elif (rmin != None)&(cmin != None):
self.cen_r[i], self.cen_c[i] = cen_r_sub + rmin, cen_c_sub + cmin
else:
self.cen_r[i], self.cen_c[i] = cen_r_sub, cen_c_sub
if plot:
fig, axs = plt.subplots(ncols=1, nrows=2, figsize=(15,7), sharex=True)
axs[0].plot(self.times - self.times[0], self.cen_r, 'k-', lw=1.)
axs[0].set_ylabel('Row')
axs[1].plot(self.times - self.times[0], self.cen_c, 'k-', lw=1.)
axs[1].set_xlabel('Time (BJD)')
axs[1].set_ylabel('Column')
axs[1].set_xlim([ 0., np.max( self.times - self.times[0] ) ])
else:
fig, axs = None, None
return self.cen_r, self.cen_c, fig, axs
[docs]
def aperture_mask(self, plot=False):
"""Build a binary aperture mask (and masked frame/error arrays).
By default a circular aperture is created on the median image using
the median centroid position computed from ``self.cen_r`` and
``self.cen_c``. Alternatively, when ``self.brightpix=True``, the mask is
formed by selecting the ``self.nos_brightest`` brightest pixels from the
median image (optionally limited by ``self.minmax`` bounds).
Parameters
----------
plot : bool, optional
If ``True``, produce a plot of median image with aperture
plotted on the top (default ``False``).
Returns
-------
aper_fl_mask : ndarray
``self.frames`` multiplied by the 2D aperture boolean mask
(shape ``(N_frames, Ny, Nx)``).
aper_err_mask : ndarray
``self.errors`` multiplied by the 2D aperture boolean mask.
ap_bool_msk : ndarray
2D binary (0/1) mask describing the aperture on a single frame
(shape ``(Ny, Nx)``).
fig : matplotlib.figure.Figure or None
Figure of median image and aperture mask, if ``plot=True``, otherwise ``None``.
im : matplotlib.image.AxesImage or None
Image object from the median image plot if ``plot=True``, otherwise ``None``.
axs : ndarray or None
Axes of the figure if produced, otherwise ``None``.
Notes
-----
Uses instance attributes ``self.aprad``, ``self.brightpix``,
``self.nos_brightest``, and ``self.minmax`` to control aperture
construction.
"""
# Median image
med_img = np.nanmedian(self.frames, axis=0)
# Now, creating mask
ap_bool_msk = np.zeros(med_img.shape)
if not self.brightpix:
# Let's first find a distance array which will contain the distance of each pixels from the center
idx_arr_r, idx_arr_c = np.meshgrid(np.arange(med_img.shape[0]), np.arange(med_img.shape[1]))
idx_arr_r, idx_arr_c = np.transpose(idx_arr_r), np.transpose(idx_arr_c)
# Both of above array would be of the dimension of the image array, and we can get row and column index by doing,
# idx_arr_r[row, col] = row index and idx_arr_c[row, col] = col index
# Distance array would give distace of each pixel from the center
distance = np.sqrt(((idx_arr_r - np.nanmedian(self.cen_r))**2) + ((idx_arr_c - np.nanmedian(self.cen_c))**2))
ap_bool_msk[distance < self.aprad] = 1.
else:
# Sorting all pixels values; so that we can select first N bright pixels in the aperture
med_img_flat = med_img.flatten()
med_img_sorted = np.sort(med_img_flat)
# minmax != None
if self.minmax is not None:
rmin, rmax = self.minmax['rmin'], self.minmax['rmax']
cmin, cmax = self.minmax['cmin'], self.minmax['cmax']
else:
rmin, rmax = 0, 10000
cmin, cmax = 0, 10000
tot_num = True
ap_pix_nos, i = 0, 0
while tot_num:
if np.isnan(med_img_sorted[-1-i]):
i = i + 1
continue
else:
ind = np.where(med_img == med_img_sorted[-1-i])
# To prevent aperture being far from the center
if (ind[0][0] > rmax) or (ind[0][0] < rmin) or (ind[1][0] < cmin) or (ind[1][0] > cmax):
i = i + 1
continue
else:
ap_bool_msk[ind[0][0], ind[1][0]] = 1.
ap_pix_nos = ap_pix_nos + 1
if ap_pix_nos>self.nos_brightest:
tot_num = False
i = i + 1
# And, aperture mask
aper_fl_mask, aper_err_mask = self.frames * ap_bool_msk[None, :, :], self.errors * ap_bool_msk[None, :, :]
if plot:
# Figure code
fig, axs = plt.subplots(1, 2, figsize=(10,5), sharex=True, sharey=True)
im = axs[0].imshow(med_img, interpolation='none')
axs[0].imshow(ap_bool_msk, alpha=0.5)
axs[1].imshow(ap_bool_msk, interpolation='none')
axs[0].set_title('Median image + Aperture mask')
axs[1].set_title('Aperture mask')
else:
fig, im, axs = None, None, None
return aper_fl_mask, aper_err_mask, ap_bool_msk, fig, im, axs
[docs]
def sky_mask(self, plot=False):
"""Create a sky-annulus mask (and masked frame/error arrays).
When ``self.brightpix`` is False this builds a circular annulus defined by
``self.sky_rad1`` (inner radius) and ``self.sky_rad2`` (outer radius) around
the median centroid position. When ``self.brightpix`` is True the function instead
selects the ``self.nos_brightest`` brightest pixels (optionally limited
by ``self.minmax``) to define the aperture, and rest of the pixels as background.
Parameters
----------
plot : bool, optional
If ``True``, produce a plot of median image with sky mask
plotted on the top (default ``False``).
Returns
-------
sky_fl_mask : ndarray
``self.frames`` multiplied by the 2D sky boolean mask
(shape ``(N_frames, Ny, Nx)``).
sky_err_mask : ndarray
``self.errors`` multiplied by the 2D sky boolean mask.
sky_bool_msk : ndarray
2D binary (0/1) mask describing the sky annulus on a single
frame (shape ``(Ny, Nx)``).
fig : matplotlib.figure.Figure or None
Figure of median image and sky mask, if ``plot=True``, otherwise ``None``.
im : matplotlib.image.AxesImage or None
Image object from the median image plot if ``plot=True``, otherwise ``None``.
axs : ndarray or None
Axes of the figure if produced, otherwise ``None``.
Notes
-----
Uses instance attributes ``self.sky_rad1``, ``self.sky_rad2``,
``self.brightpix``, ``self.nos_brightest``, and ``self.minmax`` to
control sky mask construction.
"""
# Median image
med_img = np.nanmedian(self.frames, axis=0)
# Now, creating mask
sky_bool_msk = np.zeros(med_img.shape)
if ( self.sky_rad1 == None ) and ( self.sky_rad2 == None ) and ( not self.brightpix ):
pass
else:
if ( not self.brightpix ) or ( ( self.sky_rad1 != None ) and ( self.sky_rad2 != None ) ):
# Let's first find a distance array which will contain the distance of each pixels from the center
idx_arr_r, idx_arr_c = np.meshgrid(np.arange(med_img.shape[0]), np.arange(med_img.shape[1]))
idx_arr_r, idx_arr_c = np.transpose(idx_arr_r), np.transpose(idx_arr_c)
# Both of above array would be of the dimension of the image array, i.e, we can get the row and column index by doing.
# idx_arr_r[row, col] = row index and idx_arr_c[row, col] = col index
# Distance array would give distace of each pixel from the center
distance = np.sqrt(((idx_arr_r - np.nanmedian(self.cen_r))**2) + ((idx_arr_c - np.nanmedian(self.cen_c))**2))
sky_bool_msk[(distance > self.sky_rad1)&(distance < self.sky_rad2)] = 1.
else:
# Sorting all pixels values; so that we can select first N bright pixels in the aperture
med_img_flat = med_img.flatten()
med_img_sorted = np.sort(med_img_flat)
# minmax != None
if self.minmax is not None:
rmin, rmax = self.minmax['rmin'], self.minmax['rmax']
cmin, cmax = self.minmax['cmin'], self.minmax['cmax']
else:
rmin, rmax = 0, 10000
cmin, cmax = 0, 10000
if self.nos_faintest == None:
sky_bool_msk = np.ones(med_img.shape)
tot_num = True
ap_pix_nos, i = 0, 0
while tot_num:
if np.isnan(med_img_sorted[-1-i]):
i = i + 1
continue
else:
ind = np.where(med_img == med_img_sorted[-1-i])
# To prevent aperture being far from the center
if (ind[0][0] > rmax) or (ind[0][0] < rmin) or (ind[1][0] < cmin) or (ind[1][0] > cmax):
i = i + 1
continue
else:
sky_bool_msk[ind[0][0], ind[1][0]] = 0.
ap_pix_nos = ap_pix_nos + 1
if ap_pix_nos>self.nos_brightest:
tot_num = False
i = i + 1
else:
sky_bool_msk = np.zeros(med_img.shape)
tot_num = True
ap_pix_nos, i = 0, 0
while tot_num:
if np.isnan(med_img_sorted[i]):
i = i + 1
continue
else:
ind = np.where(med_img == med_img_sorted[i])
# To prevent aperture being far from the center
if (ind[0][0] < rmax) and (ind[0][0] > rmin) and (ind[1][0] > cmin) and (ind[1][0] < cmax):
i = i + 1
continue
else:
sky_bool_msk[ind[0][0], ind[1][0]] = 1.
ap_pix_nos = ap_pix_nos + 1
if ap_pix_nos>self.nos_faintest:
tot_num = False
i = i + 1
# And, sky mask
sky_fl_mask, sky_err_mask = self.frames * sky_bool_msk[None, :, :], self.errors * sky_bool_msk[None, :, :]
if plot:
# Figure code
fig, axs = plt.subplots(1, 2, figsize=(10,5), sharex=True, sharey=True)
im = axs[0].imshow(med_img, interpolation='none')
axs[0].imshow(sky_bool_msk, alpha=0.5)
axs[1].imshow(sky_bool_msk, interpolation='none')
axs[0].set_title('Median image + Bkg mask')
axs[1].set_title('Bkg mask')
else:
fig, im, axs = None, None, None
return sky_fl_mask, sky_err_mask, sky_bool_msk, fig, im, axs
[docs]
def simple_aperture_photometry(self, method='manual', plot=False, bkg_corr='mean', robust_err=False, **kwargs):
"""Compute aperture photometry for each frame in the data cube.
The function supports three methods:
- ``manual``: sums pixels inside a binary aperture mask and subtracts
the estimated sky background per pixel computed from a sky annulus.
- ``photutils``: uses ``photutils`` aperture classes to compute
aperture sums and background statistics per frame (requires
``photutils`` to be installed). Additional keyword arguments are
forwarded to the photutils helpers.
- ``median``: returns the per-frame median of the aperture pixels
(useful as a robust estimator or for quick diagnostics).
Parameters
----------
method : {'manual','photutils','median'}, optional
Photometry method to use. Default is ``'manual'``.
plot : bool, optional
If ``True``, produce a plot of aperture photometry, flux vs time (default ``False``).
bkg_corr : str, optional
The method to compute sky background flux, can be either mean or median. Default is ``mean``.
robust_err : bool, optional
Computes robust errors on aperture photometry by including scatter in sky background flux.
Default is ``False``.
**kwargs : dict
Additional keyword arguments forwarded to ``photutils``
routines when ``method='photutils'`` (for example, custom
aperture or statistic options).
Returns
-------
ap_flux : ndarray
1D array (length N_frames) with the background-subtracted
aperture flux for each frame (same units as ``frames``).
ap_flux_err : ndarray
1D array (length N_frames) with the estimated uncertainty on
the aperture flux for each frame.
tot_sky_bkg : ndarray
1D array (length N_frames) with the total sky background
contribution (summed over aperture pixels) subtracted from
each frame. For methods that do not estimate a sky background
this will be zeros.
fig : matplotlib.figure.Figure or None
Figure of aperture photometry vs time, if ``plot=True``, otherwise ``None``.
axs : ndarray or None
Axes of the figure if produced, otherwise ``None``.
Notes
-----
- The function expects centroids to be available as
``self.cen_r`` and ``self.cen_c`` (one per frame) before being
called.
- Uses instance attributes ``self.aprad``, ``self.sky_rad1``,
``self.sky_rad2``, ``self.brightpix``, ``self.nos_brightest``,
and ``self.minmax`` to control aperture and background extraction.
- When using ``photutils``, this routine constructs
``CircularAperture``/``CircularAnnulus`` objects per frame and
uses ``ApertureStats`` and ``aperture_photometry``; errors are
estimated combining aperture sum errors and annulus statistics
following standard propagation.
"""
if method == 'manual':
# Let's first obtain sky background flux per pixel (If sky radii are not None)
sky_fl_mask, sky_err_mask, sky_bool_mask, _, _, _ = self.sky_mask()
# Now, the aperture flux
aper_fl_mask, aper_err_mask, ap_bool_mask, _, _, _ = self.aperture_mask()
## Expression of Sky flux per pixel and sky error per pixel
if np.nansum(sky_bool_mask) != 0:
# --------------------- First, calculate the mean sky flux per pix
if bkg_corr == 'mean':
sky_flx_per_pix = np.nansum(sky_fl_mask, axis=(1,2)) / np.nansum(sky_bool_mask)
elif bkg_corr == 'median':
# Replacing 0s with Nan, so that they don't interfere with median computation
sky_fl_mask12 = np.copy( sky_fl_mask )
sky_fl_mask12[ sky_fl_mask12 == 0. ] = np.nan
sky_flx_per_pix = np.nanmedian( sky_fl_mask12, axis=(1,2) )
else:
raise Exception('The background can be computed either using mean or median.')
# --------------------- And, now, errors on the mean sky flux per pix
sky_flx_err_per_pix = np.sqrt( np.nansum( sky_err_mask**2, axis=(1,2) ) ) / np.nansum(sky_bool_mask)
if robust_err:
# Replacing 0s with Nan, so that they don't interfere with std computation
sky_fl_mask12 = np.copy( sky_fl_mask )
sky_fl_mask12[ sky_fl_mask12 == 0. ] = np.nan
sky_flx_err_per_pix = np.sqrt( sky_flx_err_per_pix**2 + ( np.nanstd( sky_fl_mask12, axis=(1,2) ) )**2 )
else:
sky_flx_per_pix, sky_flx_err_per_pix = 0., 0.
tot_sky_bkg = np.nansum(ap_bool_mask)*sky_flx_per_pix
ap_flux = np.nansum(aper_fl_mask, axis=(1,2)) - tot_sky_bkg
ap_flux_err = np.sqrt( np.nansum( aper_err_mask**2, axis=(1,2) ) + ( (np.nansum(ap_bool_mask) * sky_flx_err_per_pix)**2 ) )
elif method == 'photutils':
ap_flux, ap_flux_err, tot_sky_bkg = np.zeros( self.frames.shape[0] ), np.zeros( self.frames.shape[0] ), np.zeros( self.frames.shape[0] )
for t in range(len(ap_flux)):
# First let's perform the background subtraction
if (self.sky_rad1 == None) and (self.sky_rad2 == None):
bkg_mean, bkg_std = 0., 0.
else:
sky_aper = CircularAnnulus((int(self.cen_c[t]), int(self.cen_r[t])), r_in=self.sky_rad1, r_out=self.sky_rad2)
sky_aperstats = ApertureStats(self.frames[t,:,:], sky_aper, **kwargs)
if bkg_corr == 'mean':
bkg_mean, bkg_std = sky_aperstats.mean, sky_aperstats.std # Mean sky background per pixel
elif bkg_corr == 'median':
bkg_mean, bkg_std = sky_aperstats.median, sky_aperstats.std # Mean sky background per pixel
else:
raise Exception('The background can be computed either using mean or median.')
# Now computing the aperture flux
circ_aper = CircularAperture((self.cen_c[t], self.cen_r[t]), r=self.aprad)
ap_phot = aphot(data=self.frames[t,:,:], apertures=circ_aper, error=self.errors[t,:,:], **kwargs)
total_sky_bkg = bkg_mean * circ_aper.area_overlap(self.frames[t,:,:], **kwargs)
phot_bkgsub = ap_phot['aperture_sum'] - total_sky_bkg # Background subtraction
## Error estimation in background subtracted photometry
phot_bkgsub_err = np.sqrt( (ap_phot['aperture_sum_err']**2) + ( (circ_aper.area_overlap(self.frames[t,:,:], **kwargs) * bkg_std)**2 ) )
## Results
ap_flux[t], ap_flux_err[t] = phot_bkgsub[0], phot_bkgsub_err[0]
tot_sky_bkg[t] = total_sky_bkg
elif method == 'median':
aper_fl_mask, aper_err_mask, ap_bool_mask, _, _, _ = self.aperture_mask()
tot_sky_bkg = np.zeros(aper_fl_mask.shape[0])
aper_fl_mask[aper_fl_mask == 0.] = np.nan
ap_flux, ap_flux_err = np.nanmedian(aper_fl_mask, axis=(1,2)), np.sqrt( np.nanmedian( aper_fl_mask, axis=(1,2) ) )
else:
raise Exception('Please enter correct method...\nMethod can either be manual, photutils, or median.')
if plot:
fig, axs = plt.subplots()
axs.errorbar(self.times - self.times[0], ap_flux/np.nanmedian(ap_flux), yerr=ap_flux_err/np.nanmedian(ap_flux), fmt='.', color='dodgerblue')
axs.set_xlim( [0., np.max(self.times - self.times[0]) ] )
axs.set_xlabel( 'Time [BJD - {:.2f}]'.format(self.times[0]) )
axs.set_ylabel('Relative flux')
else:
fig, axs = None, None
return ap_flux, ap_flux_err, tot_sky_bkg, fig, axs
[docs]
def growth_function(self, rmin, rmax, noise='pipe', plot=False, **kwargs):
"""Compute the growth curve (flux vs aperture radius) and noise.
Parameters
----------
rmin : int
Minimum aperture radius (inclusive) in pixels.
rmax : int
Maximum aperture radius (exclusive) in pixels.
noise : {'pipe','std','rms','astropy'}, optional
Method used to estimate scatter for each aperture radius.
Default is 'pipe' (uses ``utils.pipe_mad``).
plot : bool, optional
If ``True``, produce a plot of the growth function (default ``False``).
**kwargs : dict
Additional keyword arguments forwarded to
``simple_aperture_photometry`` (for example sky annulus
parameters).
Returns
-------
ap_flux_radii : ndarray
Median aperture flux for each radius in the tested range.
noise_radii : ndarray
Estimated noise metric (in ppm) for each radius.
fig : matplotlib.figure.Figure or None
Figure of the growth function, if ``plot=True``, otherwise ``None``.
axs : ndarray or None
Axes of the figure if produced, otherwise ``None``.
"""
radii = np.arange(rmin, rmax, 1)
ap_flux_radii, noise_radii = np.zeros(len(radii)), np.zeros(len(radii))
# Choosing the method to compute noise
if noise == 'pipe':
noise_func = pipe_mad
elif noise == 'std':
noise_func = np.nanstd
elif noise == 'rms':
noise_func = rms
elif noise == 'astropy':
noise_func = lambda x: mad_std(x, ignore_nan=True)
else:
raise ValueError("Method should be one of 'pipe', 'std', 'rms', or 'astropy'.")
for r in tqdm(range(len(radii))):
if not self.brightpix:
self.aprad = radii[r]
else:
self.nos_brightest = radii[r]
ap_fl, _, _, _, _ = self.simple_aperture_photometry(**kwargs)
ap_flux_radii[r] = np.nanmedian( ap_fl )
noise_radii[r] = noise_func( ap_fl / np.nanmedian( ap_fl ) ) * 1e6
if plot:
fig, axs1 = plt.subplots()
color1 = 'orangered'
axs1.plot(radii, ap_flux_radii, color=color1)
axs1.set_xlabel('Radius')
axs1.set_ylabel('Growth function', color=color1)
axs1.tick_params(axis='y', which='both', color=color1, labelcolor=color1)
color2 = 'royalblue'
axs2 = axs1.twinx()
axs2.plot(radii, noise_radii, color=color2)
axs2.axvline(radii[np.argmin(noise_radii)], color='dimgrey', ls='--', lw=1.)
axs2.set_ylabel('Median Absolute Deviation [ppm]', color=color2, rotation=270, labelpad=25)
axs2.tick_params(axis='y', which='both', color=color2, labelcolor=color2)
axs2.spines['right'].set_color(color2)
axs2.spines['left'].set_color(color1)
axs1.set_xlim([ np.min(radii), np.max(radii) ])
fig.tight_layout()
else:
fig, axs1, axs2 = None, None, None
return ap_flux_radii, noise_radii, fig, [axs1, axs2]
[docs]
def pixel_level_decorrelation(self, removeNan=False):
"""Prepare pixel-level decorrelation (PLD) basis functions.
The method builds the normalized pixel-level light curves (Phat) inside
the chosen aperture and performs a PCA (principal componene analysis)
on them to produce a set of orthogonal basis vectors (``self.PCA``) with
singular values and eigenvectors stored on the instance.
Parameters
----------
removeNan : bool, optional
If ``True``, remove frames that contain NaNs in the basis
before performing PCA.
Returns
-------
V : ndarray
Matrix of eigenvectors from the PCA (shape: n_components, n_pixels).
eigenvalues : ndarray
Eigenvalues corresponding to each PCA component.
PCA : ndarray
Projected PCA time-series (shape: n_components, n_frames).
"""
_, _, self.ap_bool_mask_pld, _, _, _ = self.aperture_mask()
sky_fl, _, sky_bool, _, _, _ = self.sky_mask()
if np.nansum(sky_bool) == 0.:
sky_bkg_per_pix = 0.
else:
sky_bkg_per_pix = np.nansum(sky_fl, axis=(1,2)) / np.nansum(sky_bool)
self.idxr, self.idxc = np.where(self.ap_bool_mask_pld == 1)
pixel_fluxes = np.zeros( ( self.frames.shape[0], int(np.sum(self.ap_bool_mask_pld)) ) )
for r in range(len(self.idxr)):
pixel_fluxes[:,r] = self.frames[:, int(self.idxr[r]), int(self.idxc[r])] - sky_bkg_per_pix
# Calculating Phat
self.Psum = np.nansum( pixel_fluxes, axis=1 )
self.Phat = pixel_fluxes / self.Psum[:, None]
if removeNan:
id0, _ = np.where(np.isnan(self.Phat))
self.idx_nan = np.ones(self.Phat.shape[0], dtype=bool)
self.idx_nan[id0] = False
self.Phat = self.Phat[self.idx_nan, :]
self.times = self.times[self.idx_nan]
self.Psum = self.Psum[self.idx_nan]
self.idx_nan = np.ones(self.Phat.shape[0], dtype=bool)
id0 = np.where( self.Psum == 0. )
self.idx_nan[id0] = False
self.Phat, self.Psum = self.Phat[self.idx_nan, :], self.Psum[self.idx_nan]
self.times = self.times[self.idx_nan]
else:
self.idx_nan = np.ones(self.Phat.shape[0], dtype=bool)
self.V, self.eigenvalues, self.PCA = classic_PCA(self.Phat.T)
return self.V, self.eigenvalues, self.PCA
[docs]
def plot_correlation_matrices(self):
"""Plot correlation matrices before and after PCA on pixel-level light curves.
Returns
-------
fig : matplotlib.figure.Figure
Figure containing the two correlation matrix subplots.
axs : ndarray
Array of two Axes objects for the before/after PCA plots.
im_list : list
List with the two AxesImage objects (useful for colorbars).
"""
# Calculating PCA correlation matrix
CorrelationMatrix = np.abs(np.corrcoef(self.Phat.T))
PCACorrelationMatrix = np.abs(np.corrcoef(self.PCA))
from matplotlib import rcParams
rcParams['xtick.direction'] = 'out'
rcParams['ytick.direction'] = 'out'
# Actual figure code
fig, axs = plt.subplots(1, 2, figsize=(10,5), sharex=True, sharey=True)
im1 = axs[0].imshow(CorrelationMatrix, cmap='magma', vmin=0, vmax=1)
im2 = axs[1].imshow(PCACorrelationMatrix, cmap='magma', vmin=0, vmax=1)
axs[0].set_xlabel('Element $i$')
axs[1].set_xlabel('Element $i$')
axs[0].set_ylabel('Element $j$')
axs[0].set_title('Before PCA')
axs[1].set_title('After PCA')
fig.subplots_adjust(right=0.8)
cbar_ax = fig.add_axes([0.85, 0.1, 0.015, 0.75])
fig.colorbar(im2, cax=cbar_ax)
rcParams['xtick.direction'] = 'in'
rcParams['ytick.direction'] = 'in'
return fig, axs, [im1, im2]
[docs]
def plot_eigenvectors(self):
"""Visualize the first 10 PCA eigenvectors as spatial maps.
Returns
-------
fig : matplotlib.figure.Figure
Figure containing the eigenvector images.
axs : ndarray
Array of Axes for each eigenvector subplot.
"""
comps = 0
fig, axs = plt.subplots(nrows=2, ncols=5, figsize=(15, 5), sharex=True, sharey=True)
for r in range(axs.shape[0]):
for c in range(axs.shape[1]):
eg_vec = np.zeros(self.ap_bool_mask_pld.shape)
for k in range(len(self.idxr)):
eg_vec[self.idxr[k], self.idxc[k]] = self.V[comps, k]
im = axs[r,c].imshow(eg_vec, interpolation='none', cmap='plasma')
im.set_clim([-0.22, 0.22])
axs[r,c].set_xlim([np.min(self.idxc)-3, np.max(self.idxc)+3])
axs[r,c].set_ylim([np.min(self.idxr)-3, np.max(self.idxr)+3])
axs[r,c].text(np.max(self.idxc)+1.5, np.max(self.idxr)+2, comps+1, fontweight='bold')#, backgroundcolor='white')
comps = comps + 1
fig.suptitle('First 10 eigenvectors from PCA analysis', fontsize=16)
return fig, axs
[docs]
def plot_pcs(self, nmax=10):
"""Plot the PC time-series (principal components) up to `nmax`.
Parameters
----------
nmax : int, optional
Number of principal components to plot (default 10).
Returns
-------
fig : matplotlib.figure.Figure
Figure containing the stacked PC plots.
axs : ndarray
Array of Axes objects, one per plotted PC.
"""
fig, axs = plt.subplots(nrows=int(nmax), ncols=1, figsize=(15,10), sharex=True)
for i in range(int(nmax)):
axs[i].plot(self.times - self.times[0], self.PCA[i,:], 'k-', lw=1.)
axs[i].set_ylabel('PC ' + str(i+1))
med, std = np.nanmedian(self.PCA[i,:]), mad_std(self.PCA[i,:])
axs[i].set_ylim([med-4*std, med+4*std])
plt.xlabel('Time (BJD - {:.4f})'.format(self.times[0]))
return fig, axs
[docs]
def pld_correction(self, pc_max=5, extra_vectors=None, plot=False):
"""Apply PLD correction using the first `pc_max` principal components.
Parameters
----------
pc_max : int, optional
Number of PCA components to include in the PLD fit (default 5).
extra_vectors : ndarray or None, optional
Additional vectors to include in the PLD fit (shape: either 1D array
of length n_frames or 2D array of shape (n_extra, n_frames)).
If provided, these will be appended to the PCA basis before fitting.
Default is ``None`` (no extra vectors).
plot : bool, optional
If ``True``, produce a diagnostic plot comparing the raw and
PLD-predicted flux (default ``False``).
Returns
-------
Psum : ndarray
Sum of pixel fluxes (SAP flux) before correction.
prediction : ndarray
PLD model prediction for the flux (same length as ``Psum``).
fig : matplotlib.figure.Figure or None
Diagnostic figure if ``plot=True``, otherwise ``None``.
axs : ndarray or None
Axes of the diagnostic figure if produced, otherwise ``None``.
"""
if extra_vectors is not None:
# Ensure extra_vectors has the expected dimensionality (2D: n_extra, n_frames)
extra_vectors = np.asarray(extra_vectors)
if extra_vectors.ndim == 1:
# Interpret a 1D array as a single extra vector
extra_vectors = extra_vectors.reshape(1, -1)
elif extra_vectors.ndim != 2:
raise ValueError("extra_vectors must be a 1D or 2D array-like object.")
if extra_vectors.shape[1] != self.PCA.shape[1]:
raise ValueError("extra_vectors should have the same number of columns (frames) as PCA.")
X = np.vstack([ np.ones(len(self.Psum)), self.PCA[0:pc_max,:], extra_vectors ])
else:
X = np.vstack([ np.ones(len(self.Psum)), self.PCA[0:pc_max,:] ])
# Fit:
result = np.linalg.lstsq(X.T, self.Psum, rcond=None)
coeffs = result[0]
prediction = np.dot(coeffs, X)
print('>>> --- MAD of uncorrected light curve is: {:.2f} ppm'.format(pipe_mad(self.Psum/np.median(self.Psum))*1e6))
print('>>> --- MAD of PLD corrected light curve is: {:.2f} ppm'.format(pipe_mad(self.Psum/prediction)*1e6))
if plot:
fig, axs = plt.subplots(2, 1, figsize=(15/1.5,10/1.5), sharex=True, sharey=True)
axs[0].errorbar(self.times-self.times[0], self.Psum/np.median(self.Psum), fmt='.', c='orangered', label='SAP Flux')
axs[0].plot(self.times-self.times[0], prediction/np.median(self.Psum), c='darkgreen', label='PLD prediction',alpha=0.7, zorder=10)
axs[0].set_ylabel('Relative Flux')
axs[0].legend(loc='best')
axs[0].grid()
axs[1].errorbar(self.times-self.times[0], self.Psum/prediction, fmt='.', c='cornflowerblue')
axs[1].set_xlabel('Time [BJD - {:.2f}]'.format(self.times[0]))
axs[1].set_ylabel('Relative Flux')
axs[1].grid()
axs[1].set_xlim([ 0., np.max(self.times-self.times[0]) ])
else:
fig, axs = None, None
return self.Psum, prediction, fig, axs
[docs]
def noise_with_pcs(self, pc_max, noise='pipe', plot=False):
"""Estimate noise (MAD or other) as a function of included PCs.
Parameters
----------
pc_max : int
Maximum number of principal components to test (the method will
evaluate values from 1 to ``pc_max-1``).
noise : {'pipe','std','rms','astropy'}, optional
Noise estimator to use (default 'pipe' which uses
``utils.pipe_mad``).
plot : bool, optional
If ``True``, display a plot of noise metric vs number of PCs.
Returns
-------
pcs_to_include : ndarray
Array of numbers of PCs tested (1..pc_max-1).
mad_with_pcs : ndarray
Noise metric computed for each tested number of PCs.
fig : matplotlib.figure.Figure or None
Figure object if ``plot=True``, else ``None``.
axs : matplotlib.axes.Axes or None
Axis object for the plot if produced, else ``None``.
"""
# Choosing the method to compute noise
if noise == 'pipe':
noise_func = pipe_mad
elif noise == 'std':
noise_func = np.nanstd
elif noise == 'rms':
noise_func = rms
elif noise == 'astropy':
noise_func = lambda x: mad_std(x, ignore_nan=True)
else:
raise ValueError("Method should be one of 'pipe', 'std', 'rms', or 'astropy'.")
pcs_to_include = np.arange(1, pc_max)
mad_with_pcs = np.zeros(len(pcs_to_include))
for p in tqdm(range(len(pcs_to_include))):
# First compute the prediction
x_pc = np.vstack(( np.ones(len(self.Psum)), self.PCA[0:int(pcs_to_include[p]),:] ))
res_pc = np.linalg.lstsq(x_pc.T, self.Psum, rcond=None)
co_pc = res_pc[0]
pred_pc = np.dot(co_pc, x_pc)
# Computing the MAD
mad_with_pcs[p] = noise_func( self.Psum / pred_pc ) * 1e6
# Number of PCs with minimum noise
n_pc_of_min_scat = pcs_to_include[np.argmin(mad_with_pcs)]
if plot:
fig, axs = plt.subplots(figsize=(16/1.5, 9/1.5))
axs.plot(pcs_to_include, mad_with_pcs, c='orangered')
axs.axvline(n_pc_of_min_scat, color='dimgrey', ls='--', lw=1.)
axs.set_xlabel('Number of PCs included')
axs.set_ylabel('Median Absolute Deviation [ppm]')
axs.set_xlim([ np.min(pcs_to_include), np.max(pcs_to_include) ])
plt.grid()
else:
fig, axs = None, None
return pcs_to_include, mad_with_pcs, fig, axs
[docs]
class TESSData(object):
"""Helper to query and fetch TESS products from MAST and juliet.
This lightweight utility class provides two convenience methods:
``get_lightcurves`` which delegates to ``juliet.utils.get_all_TESS_data``
(when available) to gather per-instrument light curves, and
``get_tpfs`` which queries MAST for Target Pixel Files (TPFs),
downloads the products and populates simple dictionaries on the
instance with the extracted time and pixel-cube data.
Parameters
----------
object_name : str
Target identifier accepted by MAST (for example a TIC or common
object name).
Attributes
----------
object_name : str
The provided target name.
tim_lc, fl_lc, fle_lc : dict or None
Time/flux/flux-error collections populated by
``get_lightcurves`` when that method is called.
out_dict_lc : object or None
Optional output dictionary returned by the juliet helper.
tim_tpf, fl_tpf, fle_tpf, badpix, quality : dict
Dictionaries populated by :meth:`get_tpfs` containing per-sector
time arrays, pixel flux cubes, pixel flux error cubes, bad-pixel
masks and quality arrays respectively.
"""
def __init__(self, object_name):
self.object_name = object_name
[docs]
def get_lightcurves(self, pdc=True, save=False, pout=os.getcwd(), **kwargs):
"""Collect TESS light curves for ``self.object_name``.
This method attempts to use ``juliet.utils.get_all_TESS_data`` to
retrieve light curves. The juliet helper may return either a
simple tuple ``(tim, fl, fle)`` or an (out_dict, tim, fl, fle)
tuple; both cases are handled and stored on the instance.
Parameters
----------
pdc : bool, optional
If True request PDC-corrected fluxes when available. Default
is ``True``.
save : bool, optional
If True the method will write ASCII files named ``LC_<object>_<inst>.dat``
to ``pout``. Default ``False``.
pout : str, optional
Output directory used when ``save=True`` (defaults to the
current working directory).
**kwargs : dict
Forwarded to ``juliet.utils.get_all_TESS_data``.
Returns
-------
tim_lc, fl_lc, fle_lc, out_dict_lc : tuple
Stored time/flux/fluxerr containers and an optional output
dictionary (``out_dict_lc`` may be ``None``).
"""
try:
self.tim_lc, self.fl_lc, self.fle_lc = juliet.utils.get_all_TESS_data(object_name=self.object_name, get_PDC=pdc, **kwargs)
self.out_dict_lc = None
except:
self.out_dict_lc, self.tim_lc, self.fl_lc, self.fle_lc = juliet.utils.get_all_TESS_data(object_name=self.object_name, get_PDC=pdc, **kwargs)
if save:
name = 'PDC' if pdc else 'SAP'
for ins in self.tim_lc.keys():
fname = open( pout + '/LC_' + self.object_name + '_' + ins + '_' + name + '.dat', 'w' )
for t in range( len(self.tim_lc[ins]) ):
fname.write( str( self.tim_lc[ins][t] ) + '\t' + str( self.fl_lc[ins][t] ) + '\t' + str( self.fle_lc[ins][t] ) + '\n' )
fname.close()
return self.tim_lc, self.fl_lc, self.fle_lc, self.out_dict_lc
[docs]
def get_tpfs(self, pout=os.getcwd(), load=False, save=False, savefits=False):
"""Download or load TESS Target Pixel Files (TPFs) for the target.
If ``load`` is False the method queries MAST for TESS time-series
products for ``self.object_name``, filters suitable products
(TPFs), downloads them, reads the FITS tables and populates the
instance dictionaries ``tim_tpf``, ``fl_tpf``, ``fle_tpf``,
``quality`` and ``badpix``. If ``save`` is True a small pickled
dictionary per sector is written to ``pout``. ``load=True`` simply
reads these previously saved dictionaries.
Parameters
----------
pout : str, optional
Output directory used when ``save=True``. Default is the
current working directory.
load : bool, optional
If True, attempt to load previously saved TPF pickles instead
of querying/downloading from MAST. (Loading logic is not
implemented in this helper; calling with ``load=True`` will
bypass the MAST query branch.)
save : bool, optional
If True save extracted per-sector dictionaries as
``TPF_<object>_<sector>.pkl`` in ``pout``. Default ``False``.
savefits : bool, optional
If True, save the downloaded fits files in pout folder.
Default is False.
Returns
-------
tim_tpf, fl_tpf, fle_tpf, badpix, quality : tuple
The dictionaries populated on the instance containing the
per-sector time arrays, pixel flux cubes, flux error cubes,
quality arrays and a placeholder bad-pixel mask.
"""
if not load:
try:
obt = Observations.query_object(self.object_name, radius=0.01)
except:
raise Exception('The name of the object does not seem to be correct.\nPlease try again...')
b = np.array([])
for j in range(len(obt['intentType'])):
if obt['obs_collection'][j] == 'TESS' and obt['dataproduct_type'][j] == 'timeseries':
b = np.hstack( (b, j) )
if len(b) == 0:
raise Exception('No TESS timeseries data available for this target (strange, right?!!).\nTry another target...')
sectors, pi_name, obsids, exptime, new_b = np.array([]), np.array([]), np.array([]), np.array([]), np.array([])
for i in range(len(b)):
data1 = obt['dataURL'][int(b[i])]
if data1[-9:] == 's_lc.fits':
fls = data1.split('-')
for j in range(len(fls)):
if len(fls[j]) == 5:
sec = fls[j]
tic = fls[j+1]
sectors = np.hstack((sectors, sec))
new_b = np.hstack((new_b, b[i]))
obsids = np.hstack((obsids, obt['obsid'][int(b[i])]))
pi_name = np.hstack((pi_name, obt['proposal_pi'][int(b[i])]))
exptime = np.hstack((exptime, obt['t_exptime'][int(b[i])]))
print('Data products found over ' + str( len(sectors) ) + ' sectors.')
print('Downloading them...')
disp_tic, disp_sec = [], []
# Dictionaries for saving the data in self
self.tim_tpf, self.fl_tpf, self.fle_tpf, self.badpix, self.quality = {}, {}, {}, {}, {}
for i in range(len(sectors)):
dpr = Observations.get_product_list(obt[int(new_b[i])])
cij = 0
for j in range(len(dpr['obsID'])):
if dpr['description'][j] == 'Target pixel files':
cij = j
tab = Observations.download_products(dpr[cij])
lpt = tab['Local Path'][0][1:]
# Reading fits
hdul = fits.open(os.getcwd() + lpt)
hdr = hdul[0].header
## Finding TIC-id and sector
ticid = int(hdr['TICID'])
ticid = f"{ticid:010}"
disp_tic.append(ticid)
sec_tess = 'TESS' + str( hdr['SECTOR'] )
disp_sec.append(sec_tess)
dta = Table.read(hdul[1])
# Creating a mask array to mask NaN
idx = np.ones( dta['FLUX'].shape[0], dtype=bool )
for t in range( dta['FLUX'].shape[0] ):
nan = np.where( np.isnan( dta['FLUX'][t,:,:] ) )
if len( nan[0] ) == len( dta['FLUX'][t,:,:].flatten() ):
idx[t] = False
self.tim_tpf[sec_tess] = dta['TIME'][idx] + hdul[1].header['BJDREFI']
self.fl_tpf[sec_tess], self.fle_tpf[sec_tess] = dta['FLUX'][idx,:,:], dta['FLUX_ERR'][idx,:,:]
self.quality[sec_tess] = dta['QUALITY'][idx]
self.badpix[sec_tess] = np.ones( dta['FLUX'][idx,:,:].shape, dtype=bool )
if save:
data_sector = {}
data_sector['TIME'] = dta['TIME'][idx] + hdul[1].header['BJDREFI']
data_sector['FLUX'], data_sector['FLUX_ERR'] = dta['FLUX'][idx,:,:], dta['FLUX_ERR'][idx,:,:]
data_sector['QUALITY'] = dta['QUALITY'][idx]
data_sector['BADPIX'] = np.ones( dta['FLUX'][idx,:,:].shape, dtype=bool )
## And saving them
pickle.dump( data_sector, open(pout + '/TPF_' + self.object_name + '_' + sec_tess + '.pkl','wb') )
if savefits:
if not Path(pout + '/TPF_' + self.object_name + '_fits').exists():
os.mkdir(pout + '/TPF_' + self.object_name + '_fits')
os.system('mv ' + os.getcwd() + lpt + ' ' + pout + '/TPF_' + self.object_name + '_fits/')
# Delete the folder in the end
os.system('rm -rf mastDownload')
else:
fnames = glob(pout + '/TPF_' + self.object_name + '_TESS*.pkl')
# Dictionaries for saving the data in self
self.tim_tpf, self.fl_tpf, self.fle_tpf, self.badpix, self.quality = {}, {}, {}, {}, {}
for i in range(len(fnames)):
data_sector = pickle.load( open( fnames[i], 'rb' ) )
## Sector name
sec_name = fnames[i].split('/')[-1].split('.')[0].split('_')[-1]
## Loading the data
self.tim_tpf[sec_name] = data_sector['TIME']
self.fl_tpf[sec_name], self.fle_tpf[sec_name] = data_sector['FLUX'], data_sector['FLUX_ERR']
self.quality[sec_name] = data_sector['QUALITY']
self.badpix[sec_name] = data_sector['BADPIX']
def ApPhoto(self, sector, aprad=None, sky_rad1=None, sky_rad2=None, brightpix=False, nos_brightest=12, nos_faintest=None, minmax=None):
return ApPhoto(times=self.tim_tpf[sector], frames=self.fl_tpf[sector], errors=self.fle_tpf[sector], badpix=self.badpix[sector],\
aprad=aprad, sky_rad1=sky_rad1, sky_rad2=sky_rad2, brightpix=brightpix, nos_brightest=nos_brightest, nos_faintest=nos_faintest, minmax=minmax)
[docs]
class KeplerData(object):
def __init__(self, object_name):
self.object_name = object_name
[docs]
def get_lightcurves(self, pdc=True, long_cadence=True, verbose=True, save=False, pout=os.getcwd()):
"""
Collect Kepler/K2 light curves for ``self.object_name``.
This method uses ``astroquery`` to retrieve light curves.
The function returns a simple tuple ``(tim, fl, fle)``.
Parameters
----------
pdc : bool, optional
If True request PDC-corrected fluxes when available. Default
is ``True``.
long_cadence : bool, optional
If True the long cadence data will be downloaded. Default if False.
verbose : bool
Boolean on whether to print updates. Default is True
save : bool, optional
If True the method will write ASCII files named ``LC_<object>_<inst>.dat``
to ``pout``. Default ``False``.
pout : str, optional
Output directory used when ``save=True`` (defaults to the
current working directory).
"""
if ('K2' in self.object_name) and (not long_cadence):
raise Exception('No Short Cadence data available for K2 objects.')
try:
obt = Observations.query_object(self.object_name, radius=0.001)
except:
raise Exception('The name of the object does not seem to be correct.\nPlease try again...')
# b contains indices of the timeseries observations from TESS
b = np.array([])
for j in range(len(obt['intentType'])):
if (obt['obs_collection'][j] == 'Kepler' or obt['obs_collection'][j] == 'K2') and obt['dataproduct_type'][j] == 'timeseries':
b = np.hstack((b,j))
if len(b) == 0:
raise Exception('No Kepler/K2 timeseries data available for this target.\nTry another target...')
# To extract obs-id from the observation table
pi_name, obsids, exptime, scad, lcad = np.array([]), np.array([]), np.array([]), np.array([]), np.array([])
for i in range(len(b)):
data1 = obt['dataURL'][int(b[i])]
if 'lc' in data1.split('_'):
lcad = np.hstack((lcad, b[i]))
if 'sc' in data1.split('_'):
scad = np.hstack((scad, b[i]))
if 'llc.fits' in data1.split('_'):
lcad = np.hstack((lcad, b[i]))
pi_nm = obt['proposal_pi'][int(b[i])]
if type(pi_nm) != str:
pi_nm = 'K2 Team'
pi_name = np.hstack((pi_name, pi_nm))
if long_cadence:
dwn_b = lcad
keywd = 'Lightcurve Long Cadence'
else:
dwn_b = scad
keywd = 'Lightcurve Short Cadence'
for i in range(len(dwn_b)):
obsids = np.hstack((obsids, obt['obsid'][int(dwn_b[i])]))
exptime = np.hstack((exptime, obt['t_exptime'][int(dwn_b[i])]))
disp_kic, disp_sec = [], []
# Directory to save the data
self.tim_lc, self.fl_lc, self.fle_lc = {}, {}, {}
for i in range(len(dwn_b)):
dpr = Observations.get_product_list(obt[int(dwn_b[i])])
cij = []
for j in range(len(dpr['obsID'])):
if keywd in dpr['description'][j]:
cij.append(j)
if verbose:
print('Data products found over ' + str(len(cij)) + ' quarters/cycles.')
print('Downloading them...')
for j in range(len(cij)):
sector = f"{i:02}" + f"{j:02}" + '-' + dpr['description'][cij[j]].split('- ')[1]
# Downloading the data
tab = Observations.download_products(dpr[cij[j]])
lpt = tab['Local Path'][0][1:]
# Reading fits
hdul = fits.open(os.getcwd() + lpt)
hdr = hdul[0].header
kicid = int(hdr['KEPLERID'])
kicid = f"{kicid:010}"
dta = Table.read(hdul[1])
# Available data products
try:
if pdc:
fl = np.asarray(dta['PDCSAP_FLUX'])
fle = np.asarray(dta['PDCSAP_FLUX_ERR'])
else:
fl = np.asarray(dta['SAP_FLUX'])
fle = np.asarray(dta['SAP_FLUX_ERR'])
except:
continue
mask = np.isfinite(fl) # Creating Mask to remove Nans
bjd1 = np.asarray(dta['TIME'])[mask] + hdul[1].header['BJDREFI']
fl, fle = fl[mask], fle[mask] # Flux and Error in flux without Nans
self.tim_lc[sector], self.fl_lc[sector], self.fle_lc[sector] = bjd1, fl / np.nanmedian(fl), fle / np.nanmedian(fl)
disp_kic.append(kicid)
disp_sec.append(sector)
if verbose:
print('----------------------------------------------------------------------------------------')
print('Name\t\tKIC-id\t\tSector')
print('----------------------------------------------------------------------------------------')
for i in range(len(disp_kic)):
print(self.object_name + '\t\t' + disp_kic[i] + '\t\t' + disp_sec[i])
if save:
name = 'PDC' if pdc else 'SAP'
for ins in self.tim_lc.keys():
fname = open( pout + '/LC_' + self.object_name + '_' + ins + '_' + name + '.dat', 'w' )
for t in range( len(self.tim_lc[ins]) ):
fname.write( str( self.tim_lc[ins][t] ) + '\t' + str( self.fl_lc[ins][t] ) + '\t' + str( self.fle_lc[ins][t] ) + '\n' )
fname.close()
# Deleting the data
os.system('rm -rf mastDownload')
return self.tim_lc, self.fl_lc, self.fle_lc
[docs]
def get_tpfs(self, long_cadence=True, load=False, verbose=True, save=False, pout=os.getcwd(), savefits=False):
"""
Collect Kepler/K2 target pixel files for ``self.object_name``.
This method uses ``astroquery`` to retrieve target pixel files.
Parameters
----------
load : bool, optional
If True request PDC-corrected fluxes when available. Default
is ``True``.
long_cadence : bool, optional
If True the long cadence data will be downloaded. Default if False.
verbose : bool
Boolean on whether to print updates. Default is True
save : bool, optional
If True the method will write ASCII files named ``LC_<object>_<inst>.dat``
to ``pout``. Default ``False``.
pout : str, optional
Output directory used when ``save=True`` (defaults to the
current working directory).
"""
if not load:
if ('K2' in self.object_name) and (not long_cadence):
raise Exception('No Short Cadence data available for K2 objects.')
try:
obt = Observations.query_object(self.object_name, radius=0.001)
except:
raise Exception('The name of the object does not seem to be correct.\nPlease try again...')
# b contains indices of the timeseries observations from TESS
b = np.array([])
for j in range(len(obt['intentType'])):
if (obt['obs_collection'][j] == 'Kepler' or obt['obs_collection'][j] == 'K2') and obt['dataproduct_type'][j] == 'timeseries':
b = np.hstack((b,j))
if len(b) == 0:
raise Exception('No Kepler/K2 timeseries data available for this target.\nTry another target...')
# To extract obs-id from the observation table
pi_name, obsids, exptime, scad, lcad = np.array([]), np.array([]), np.array([]), np.array([]), np.array([])
for i in range(len(b)):
data1 = obt['dataURL'][int(b[i])]
if 'lc' in data1.split('_'):
lcad = np.hstack((lcad, b[i]))
if 'sc' in data1.split('_'):
scad = np.hstack((scad, b[i]))
if 'llc.fits' in data1.split('_'):
lcad = np.hstack((lcad, b[i]))
if 'slc.fits' in data1.split('_'):
scad = np.hstack((scad, b[i]))
pi_nm = obt['proposal_pi'][int(b[i])]
if type(pi_nm) != str:
pi_nm = 'K2 Team'
pi_name = np.hstack((pi_name, pi_nm))
if long_cadence:
dwn_b = lcad
keywd = 'Target Pixel Long Cadence'
else:
dwn_b = scad
keywd = 'Target Pixel Short Cadence'
for i in range(len(dwn_b)):
obsids = np.hstack((obsids, obt['obsid'][int(dwn_b[i])]))
exptime = np.hstack((exptime, obt['t_exptime'][int(dwn_b[i])]))
disp_kic, disp_sec = [], []
# Dictionaries to save the data
self.tim_tpf, self.fl_tpf, self.fle_tpf, self.badpix, self.quality = {}, {}, {}, {}, {}
for i in range(len(dwn_b)):
dpr = Observations.get_product_list(obt[int(dwn_b[i])])
cij = []
for j in range(len(dpr['obsID'])):
if keywd in dpr['description'][j]:
cij.append(j)
if verbose:
print('Data products found over ' + str(len(cij)) + ' quarters/cycles.')
print('Downloading them...')
for j in range(len(cij)):
sector = f"{i:02}" + f"{j:02}" + '-' + dpr['description'][cij[j]].split('- ')[1]
# Downloading the data
tab = Observations.download_products(dpr[cij[j]])
lpt = tab['Local Path'][0][1:]
# Unzipping the file
os.system('gunzip --keep ' + os.getcwd() + lpt)
# Reading fits
hdul = fits.open(os.getcwd() + lpt[:-3])
hdr = hdul[0].header
kicid = int(hdr['KEPLERID'])
kicid = f"{kicid:010}"
dta = Table.read(hdul[1])
# Creating a mask array to mask NaN
idx = np.ones( dta['FLUX'].shape[0], dtype=bool )
for t in range( dta['FLUX'].shape[0] ):
nan = np.where( np.isnan( dta['FLUX'][t,:,:] ) )
if len( nan[0] ) == len( dta['FLUX'][t,:,:].flatten() ):
idx[t] = False
self.tim_tpf[sector] = dta['TIME'][idx] + hdul[1].header['BJDREFI']
self.fl_tpf[sector], self.fle_tpf[sector] = dta['FLUX'][idx,:,:], dta['FLUX_ERR'][idx,:,:]
self.quality[sector] = dta['QUALITY'][idx]
self.badpix[sector] = np.ones( dta['FLUX'][idx,:,:].shape, dtype=bool )
if save:
data_sector = {}
data_sector['TIME'] = dta['TIME'][idx] + hdul[1].header['BJDREFI']
data_sector['FLUX'], data_sector['FLUX_ERR'] = dta['FLUX'][idx,:,:], dta['FLUX_ERR'][idx,:,:]
data_sector['QUALITY'] = dta['QUALITY'][idx]
data_sector['BADPIX'] = np.ones( dta['FLUX'][idx,:,:].shape, dtype=bool )
## And saving them
pickle.dump( data_sector, open(pout + '/TPF_' + self.object_name + '_Kep' + sector + '.pkl','wb') )
disp_kic.append(kicid)
disp_sec.append(sector)
if savefits:
if not Path(pout + '/TPF_' + self.object_name + '_fits').exists():
os.mkdir(pout + '/TPF_' + self.object_name + '_fits')
os.system('mv ' + os.getcwd() + lpt + ' ' + pout + '/TPF_' + self.object_name + '_fits')
os.system('mv ' + os.getcwd() + lpt[:-3] + ' ' + pout + '/TPF_' + self.object_name + '_fits')
# Delete the folder in the end
os.system('rm -rf mastDownload')
if verbose:
print('----------------------------------------------------------------------------------------')
print('Name\t\tKIC-id\t\tSector')
print('----------------------------------------------------------------------------------------')
for i in range(len(disp_kic)):
print(self.object_name + '\t\t' + disp_kic[i] + '\t\t' + disp_sec[i])
else:
fnames = glob(pout + '/TPF_' + self.object_name + '_Kep*.pkl')
# Dictionaries for saving the data in self
self.tim_tpf, self.fl_tpf, self.fle_tpf, self.badpix, self.quality = {}, {}, {}, {}, {}
for i in range(len(fnames)):
data_sector = pickle.load( open( fnames[i], 'rb' ) )
## Sector name
sec_name = fnames[i].split('/')[-1].split('.')[0].split('_')[-1]
## Loading the data
self.tim_tpf[sec_name] = data_sector['TIME']
self.fl_tpf[sec_name], self.fle_tpf[sec_name] = data_sector['FLUX'], data_sector['FLUX_ERR']
self.quality[sec_name] = data_sector['QUALITY']
self.badpix[sec_name] = data_sector['BADPIX']
def ApPhoto(self, sector, aprad=None, sky_rad1=None, sky_rad2=None, brightpix=False, nos_brightest=12, nos_faintest=None, minmax=None):
return ApPhoto(times=self.tim_tpf[sector], frames=self.fl_tpf[sector], errors=self.fle_tpf[sector], badpix=self.badpix[sector],\
aprad=aprad, sky_rad1=sky_rad1, sky_rad2=sky_rad2, brightpix=brightpix, nos_brightest=nos_brightest, nos_faintest=nos_faintest, minmax=minmax)
[docs]
class SpectroscopicLC(object):
"""Analyze and fit multi-wavelength time-series spectroscopic data.
This class provides tools for handling spectroscopic light curves where
flux is measured as a function of both time and wavelength. It supports
binning spectral data into channels, fitting individual wavelength bins
using ``juliet``, parallel processing of multiple channels, and
visualization of spectral transit/eclipse signatures and systematics.
Parameters
----------
times : ndarray
1D array of time stamps (in BJD or similar) with shape (N_frames,).
lc : ndarray
2D array of flux measurements with shape (N_frames, N_wavelengths).
Each row is a time frame; each column is a wavelength bin.
lc_errs : ndarray
2D array of flux uncertainties with the same shape as ``lc``.
wavelengths : ndarray
1D array of wavelength values (in microns or other units) corresponding
to each column of ``lc``. Shape is (N_wavelengths,).
gp_pars : dict or None, optional
Dictionary mapping instrument names to arrays of Gaussian Process (GP)
regressor values (e.g., time, roll angle). Used when fitting with GP
systematics. Default is ``None``.
lin_pars : dict or None, optional
Dictionary mapping instrument names to 2D arrays of linear regressor
values (shape: N_frames x N_regressors). Used for linear detrending.
Default is ``None``.
priors : callable or None, optional
A callable that accepts a channel name (string) and returns a juliet-format priors
for that channel. Required for fitting
via :meth:`analyse_lc_parallel`. Default is ``None``.
pout : str, optional
Output directory for saving results, intermediate data, and pickled
light curves. Default is the current working directory.
Attributes
----------
times : ndarray
Input time array.
lc : ndarray
Input 2D light-curve array.
lc_errs : ndarray
Input 2D error array.
wavelengths : ndarray
Input wavelength array.
gp_pars, lin_pars : dict or None
Input systematics parameters.
priors : callable or None
Input priors function.
pout : str
Output directory.
col_st, col_end : ndarray
Start and end column indices for each binned channel (populated by
:meth:`col_spec`).
spectral_lcs_dict : dict
Dictionary containing binned spectral light curves and metadata,
populated by :meth:`generating_lightcurves`. Keys include 'lc', 'err',
'wave', and 'wave_bin'.
wav, wav_bin : ndarray
Wavelength array and half-widths for each bin (populated by
:meth:`plot_parameter_spectrum`).
pars_med, pars_up, pars_lo : ndarray
Median and upper/lower credible interval bounds on fitted parameters
as a function of wavelength (populated by :meth:`plot_parameter_spectrum`).
qua_white : tuple
Quantiles (median, upper, lower) of the white-light parameter
(populated by :meth:`plot_parameter_spectrum`).
Main methods
------------
generating_lightcurves(ch_nos=None)
Extract and optionally bin spectral light curves into channels.
bin_lightcurve(unbinned_lc, unbinned_lc_err)
Inverse-variance weighted binning of light curves along wavelength direction.
white_light_lc()
Compute the white-light (all-wavelength) light curve.
analyse_lc_parallel(nthreads, ch_nos=None, juliet_fit_kwargs={})
End-to-end parallel analysis: bin, organize, and fit all channels.
plot_parameter_spectrum(parameter, bins=None, post_size=10000, ...)
Plot fitted parameter values as a function of wavelength.
plot2Ddata(cmap='plasma')
2D heatmap of spectral light curves vs time.
plot2D_data_model_resids(cmap='plasma', detrend=False, ...)
Side-by-side 2D heatmaps of data, model, and residuals.
joint_fake_allan_deviation(binmax=10, method='pipe', timeunit=None)
Combined noise vs binning plot across all channels.
Notes
-----
- All fitting is performed using the ``juliet`` package (Espinoza et al. 2019).
- Parallel processing uses Python's ``multiprocessing.Pool``.
- Results are saved as pickled dictionaries and ASCII text files to ``pout``.
- Methods check for existing output files and skip re-computation if files
are already present (useful for resuming interrupted analyses).
Examples
--------
Create and analyze a spectroscopic dataset:
>>> import numpy as np
>>> times = np.linspace(2459000, 2459001, 100)
>>> wavelengths = np.linspace(1.1, 1.7, 50)
>>> lc = np.random.normal(1.0, 0.01, (100, 50))
>>> lc_errs = np.full_like(lc, 0.005)
>>> def get_priors(ch_name):
... return juliet.utils.priors()
>>> spec = SpectroscopicLC(times, lc, lc_errs, wavelengths,
... priors=get_priors, pout='./results')
>>> spec.analyse_lc_parallel(nthreads=4, ch_nos=10)
>>> fig, ax = spec.plot_parameter_spectrum('p_p1', bins=10, plot_white=True)
"""
def __init__(self, times, lc, lc_errs, wavelengths, gp_pars=None, lin_pars=None, priors=None, pout=os.getcwd()):
# Dimensions of lc should be (nframes, ncols)
self.times = times
self.lc = lc
self.lc_errs = lc_errs
self.wavelengths = wavelengths
self.gp_pars = gp_pars
self.lin_pars = lin_pars
self.priors = priors
self.pout = pout
[docs]
def col_spec(self, ch_nos):
"""Given a number of total number of channels, this function gives an array containing
start and end column for each channel"""
if ch_nos != 1:
col_in_1_ch = round(self.lc.shape[1]/ch_nos)
self.col_st = np.arange(0, self.lc.shape[1]-col_in_1_ch, col_in_1_ch, dtype=int)
self.col_end = np.arange(0+col_in_1_ch, self.lc.shape[1], col_in_1_ch, dtype=int)
else:
self.col_st, self.col_end = np.array([0]), np.array([self.lc.shape[1]])
if self.col_end[-1] != self.lc.shape[1]:
self.col_st = np.hstack((self.col_st, self.col_end[-1]))
self.col_end = np.hstack((self.col_end, self.lc.shape[1]))
[docs]
def generating_lightcurves(self, ch_nos=None):
"""Extract and optionally bin spectral light curves into channels.
This method either bins the original light curve into ``ch_nos`` channels
(using inverse-variance weighting) or uses the original wavelength resolution.
Results are saved as a pickled dictionary and stored in
``self.spectral_lcs_dict``.
If a pickle file for the requested binning already exists, it is loaded
instead of recomputing.
Parameters
----------
ch_nos : int or None, optional
Number of channels to bin the light curve into. If ``None``, use
native wavelength resolution (no binning). Default is ``None``.
Returns
-------
None
Results are stored in ``self.spectral_lcs_dict`` with keys:
- 'lc' : ndarray
Binned light curves (shape: N_frames x N_channels).
- 'err' : ndarray
Binned flux errors (same shape as 'lc').
- 'wave' : ndarray
Central wavelength of each bin.
- 'wave_bin' : ndarray
Half-width (or full width) of each wavelength bin.
Notes
-----
The output is pickled to
``pout/spectroscopic_lc_ch_<ch_nos>.pkl`` for future loading.
"""
if ch_nos == None:
fname = self.pout + '/spectroscopic_lc_ch_' + str(self.lc.shape[1]) + '.pkl'
else:
fname = self.pout + '/spectroscopic_lc_ch_' + str(ch_nos) + '.pkl'
if not Path(fname).exists():
# Columns
if ch_nos != None:
self.col_spec(ch_nos)
# Creating spectral lc array
spec_lc, spec_err_lc = np.zeros( ( self.lc.shape[0], len(self.col_st) ) ), np.zeros( ( self.lc.shape[0], len(self.col_st) ) )
wavs, wav_bin_size = np.zeros( len(self.col_st) ), np.zeros( len(self.col_st) )
for i in range(len(self.col_st)):
spec_lc[:,i], spec_err_lc[:,i] = self.bin_lightcurve( self.lc[:, self.col_st[i]:self.col_end[i]], \
self.lc_errs[:, self.col_st[i]:self.col_end[i]] )
if self.col_end[i] != self.lc.shape[1]:
wavs[i] = ( self.wavelengths[self.col_st[i]] + self.wavelengths[self.col_end[i]] )/2
wav_bin_size[i] = np.abs( self.wavelengths[self.col_st[i]] - self.wavelengths[self.col_end[i]] )
else:
wavs[i] = ( self.wavelengths[self.col_st[i]] + self.wavelengths[self.col_end[i]-1] )/2
wav_bin_size[i] = np.abs( self.wavelengths[self.col_st[i]] - self.wavelengths[self.col_end[i]-1] )
else:
spec_lc, spec_err_lc = self.lc, self.lc_errs
wavs, wav_bin_size = self.wavelengths, np.append(np.diff(self.wavelengths), np.diff(self.wavelengths)[-1])
# Save the light curves
self.spectral_lcs_dict = {}
self.spectral_lcs_dict['lc'], self.spectral_lcs_dict['err'] = spec_lc, spec_err_lc
self.spectral_lcs_dict['wave'], self.spectral_lcs_dict['wave_bin'] = wavs, wav_bin_size
if ch_nos == None:
pickle.dump(self.spectral_lcs_dict, open(self.pout + '/spectroscopic_lc_ch_' + str(self.lc.shape[1]) + '.pkl','wb'))
else:
pickle.dump(self.spectral_lcs_dict, open(self.pout + '/spectroscopic_lc_ch_' + str(ch_nos) + '.pkl','wb'))
else:
print('>>>> --- The spectroscopic lightcurves already exists...')
print(' Loading them...')
self.spectral_lcs_dict = pickle.load( open(fname, 'rb') )
[docs]
def bin_lightcurve(self, unbinned_lc, unbinned_lc_err):
"""Perform inverse-variance weighted binning of a light curve.
Each output bin is the weighted average of input values, where weights
are the inverse squares of the input uncertainties. Output uncertainties
are computed by propagating input uncertainties through the weighted
average formula.
Parameters
----------
unbinned_lc : ndarray
2D array of unbinned flux values (shape: N_frames x N_wavelengths).
unbinned_lc_err : ndarray
2D array of flux uncertainties (same shape as ``unbinned_lc``).
Returns
-------
bin_lc : ndarray
1D array of binned flux values (shape: N_frames,).
bin_lc_err : ndarray
1D array of binned flux uncertainties (same shape as ``bin_lc``).
"""
# shape of both arrays are: (nframes, ncols)
weights = 1 / unbinned_lc_err**2
bin_lc = np.nansum(unbinned_lc * weights, axis=1) / np.nansum(weights, axis=1)
bin_lc_err = ( 1 / np.nansum(weights, axis=1) ) * np.sqrt( np.nansum( (weights**2) * (unbinned_lc_err**2), axis=1 ) )
return bin_lc, bin_lc_err
[docs]
def white_light_lc(self):
"""Compute the white-light (all-wavelength) light curve.
Bins the entire 2D light curve (all wavelengths combined) into a single
1D time-series using inverse-variance weighting.
Returns
-------
white_lc : ndarray
1D array of white-light flux values.
white_lc_err : ndarray
1D array of white-light flux uncertainties.
"""
return self.bin_lightcurve(unbinned_lc=self.lc, unbinned_lc_err=self.lc_errs)
# MultiProcessing helper function (does the actual fitting)
[docs]
def fit_lc(self, lightcurves, ch_name):
"""Inteernal helper function for multiprocessing.
This function fits lightcurve for one spectroscopic channel"""
print('---------------------------------')
print('Working on Channel: ' + ch_name)
print('')
# Output folder
pout = self.pout + '/' + ch_name
f15 = Path(pout + '/_dynesty_DNS_posteriors.pkl')
f16 = Path(pout + '/model_resids.dat')
f17 = Path(pout + '/posteriors.dat')
if f15.exists() and f16.exists() and f17.exists():
print('>>>> --- The result files already exists...')
print(' Continuing to the next channel...')
res = np.zeros(10)
else:
# Extracting the data
tim9, fl9, fle9 = lightcurves['times'], lightcurves['lc'], lightcurves['err']
# Removing Nan values
tim7, fl7, fle7 = tim9[~np.isnan(fl9)], fl9[~np.isnan(fl9)], fle9[~np.isnan(fl9)]
# Outlier removal
#msk2 = utl.outlier_removal(tim7, fl7, fle7, clip=10, msk1=False)
#tim7, fl7, fle7 = tim7[msk2], fl7[msk2], fle7[msk2]
# Normalizing the lightcurve
tim7, fl7, fle7 = tim7, fl7/np.median(fl7), fle7/np.median(fl7)
# Making data such that juliet can understand
tim, fl, fle = {}, {}, {}
tim[ch_name], fl[ch_name], fle[ch_name] = tim7, fl7, fle7
if type( lightcurves['lins'] ) == type( None ):
lin_pars = None
else:
lin_pars = {}
lin_pars[ch_name] = lightcurves['lins']
if type( lightcurves['gp'] ) == type( None ):
gp_pars = None
else:
gp_pars = {}
gp_pars[ch_name] = lightcurves['gp']
# Fitting
dataset = juliet.load(priors=self.priors(ch_name), t_lc=tim, y_lc=fl, yerr_lc=fle, linear_regressors_lc=lin_pars, GP_regressors_lc=gp_pars, out_folder=pout)
res = dataset.fit(sampler = 'dynamic_dynesty', **self.juliet_fit_kwargs)#, nthreads=8)
# Some plots
model = res.lc.evaluate(ch_name)
residuals = fl[ch_name]-model
data12 = np.vstack((model, residuals))
np.savetxt(pout + '/model_resids.dat', np.transpose(data12))
print('>>>> --- Done!!')
return res
# Function that does the multiprocessing
def multi_fit_lcs(self, lightcurves, nthreads=4):
input_data = [(lightcurves[lc], lc) for lc in lightcurves]
with multiprocessing.Pool(nthreads) as p:
result_list = p.starmap(self.fit_lc, input_data)
return np.array(result_list)
[docs]
def analyse_lc_parallel(self, nthreads, ch_nos=None, juliet_fit_kwargs={}):
"""End-to-end parallel analysis: bin, organize, and fit lightcurves from all channels.
This high-level wrapper orchestrates the full analysis pipeline:
1. Bins the spectral light curve into channels via :meth:`generating_lightcurves`
2. Organizes the data into a format suitable for fitting
3. Calls :meth:`multi_fit_lcs` to fit all channels in parallel
Parameters
----------
nthreads : int
Number of parallel threads to use for fitting.
ch_nos : int or None, optional
Number of wavelength channels to bin into. If ``None``, use native
wavelength resolution. Default is ``None``.
juliet_fit_kwargs : dict, optional
Keyword arguments to forward to ``juliet.dataset.fit(...)``,
such as ``sampler='dynamic_dynesty'``, ``nthreads=8``, etc.
Default is an empty dictionary.
Returns
-------
None
Results are written to ``pout`` and its subdirectories.
Notes
-----
Requires that ``self.priors`` is a callable that returns a priors
for each channel name.
"""
# Setting juliet_fit_kwargs
self.juliet_fit_kwargs = juliet_fit_kwargs
# Generating the data
## Bin the data, if necessary
self.generating_lightcurves(ch_nos=ch_nos)
# Storing all lightcurves in a big dictionary
all_lightcurve_data = {}
for i in range( self.spectral_lcs_dict['lc'].shape[1] ):
# Storing all lightcurves in a big dictionary
all_lightcurve_data['CH' + str(i)] = {}
## And now storing the actual lightcurve data
all_lightcurve_data['CH' + str(i)]['times'] = self.times
all_lightcurve_data['CH' + str(i)]['lc'] = self.spectral_lcs_dict['lc'][:,i]
all_lightcurve_data['CH' + str(i)]['err'] = self.spectral_lcs_dict['err'][:,i]
## Storing linear and GP parameters, if provided (None, otherwise)
all_lightcurve_data['CH' + str(i)]['lins'] = self.lin_pars
all_lightcurve_data['CH' + str(i)]['gp'] = self.gp_pars
# Now, analysing them
_ = self.multi_fit_lcs(all_lightcurve_data, nthreads=nthreads)
[docs]
def plot_parameter_spectrum(self, parameter, bins=None, post_size=10000, bin_method='mean', ppm=False, plot_white=False):
"""Plot a fitted parameter as a function of wavelength (a spectrum).
Loads posterior samples for a parameter from all fitted channels,
optionally re-bins them in the wavelength direction, and plots
median with credible intervals as a function of wavelength.
Parameters
----------
parameter : str
Name of the parameter to plot (e.g., 'p_p1', 'fp_p1', 'C1_p1').
Should match the parameter naming convention used by juliet.
bins : int or None, optional
Number of wavelength bins to use for the output spectrum. If ``None``
or equal to the native number of channels, no re-binning is performed.
Default is ``None``.
post_size : int, optional
Number of posterior samples to draw from each channel. Default is 10000.
bin_method : {'mean', 'median', 'weighted_average'}, optional
Method for combining posteriors across wavelength bins.
Default is ``'mean'``.
ppm : bool, optional
If ``True``, scale parameter values to ppm (multiply by 1e6).
Useful for small parameters like occultation depths. Default is ``False``.
plot_white : bool, optional
If ``True``, overplot the white-light (all-wavelength) parameter
value as a horizontal band with credible interval. Default is ``False``.
Returns
-------
fig : matplotlib.figure.Figure
The created figure.
axs : matplotlib.axes.Axes
The plot axes.
Notes
-----
Posterior samples are loaded from files named like
``pout/CH<i>/*_posteriors.pkl``.
"""
## First, let's put all posteriros in a big 2D array
all_posteriors = []
for i in range( self.spectral_lcs_dict['lc'].shape[1] ):
fname = glob( self.pout + '/CH' + str(i) + '/*_posteriors.pkl' )[0]
post = pickle.load( open(fname, 'rb') )
all_posteriors.append( np.random.choice( post['posterior_samples'][parameter + '_CH' + str(i)], size=post_size ) )
posteriors = np.transpose( np.vstack( all_posteriors ) )
## Calculating the white light parameter value
if bin_method == 'mean':
white_post = np.mean( posteriors, axis=1 )
elif bin_method == 'median':
white_post = np.median( posteriors, axis=1 )
elif bin_method == 'weighted_average':
white_post = np.average( posteriors, axis=1, weights=1/np.std(posteriors, axis=0)**2 )
else:
raise ValueError('>>> --- bin_method not recognized. Please use mean, median or weighted_average.')
### Quantiles
self.qua_white = juliet.utils.get_quantiles( white_post )
if ( bins == self.spectral_lcs_dict['lc'].shape[1] ) or ( bins == None ):
## Wavelengths
self.wav, self.wav_bin = self.spectral_lcs_dict['wave'], self.spectral_lcs_dict['wave_bin']/2
## If bin size is the same as the column size, then we don't perform any binning
self.pars_med, self.pars_up, self.pars_lo = np.zeros( self.spectral_lcs_dict['lc'].shape[1] ), np.zeros( self.spectral_lcs_dict['lc'].shape[1] ), np.zeros( self.spectral_lcs_dict['lc'].shape[1] )
for i in range( self.spectral_lcs_dict['lc'].shape[1] ):
## And, appending parameter values
qua = juliet.utils.get_quantiles( posteriors[:,i] )
self.pars_med[i], self.pars_up[i], self.pars_lo[i] = qua[0], qua[1] - qua[0], qua[0] - qua[2]
else:
## Wavelengths
native_wav, native_wav_bin = self.spectral_lcs_dict['wave'], self.spectral_lcs_dict['wave_bin']
# Column location, for low resolution eclipse spectrum
col_in_1_ch = round( len(native_wav) / bins )
col_st = np.arange(0, len(native_wav)-col_in_1_ch, col_in_1_ch, dtype=int)
col_end = np.arange(0+col_in_1_ch, len(native_wav), col_in_1_ch, dtype=int)
if col_end[-1] != len(native_wav):
col_st = np.hstack(( col_st, col_end[-1] ))
col_end = np.hstack(( col_end, len(native_wav) ))
# For binning the spectrum
self.pars_med, self.pars_up, self.pars_lo = np.zeros( len(col_st) ), np.zeros( len(col_st) ), np.zeros( len(col_st) )
self.wav, self.wav_bin = np.zeros( len(col_st) ), np.zeros( len(col_end) )
# And performing actual binning
for i in range(len(col_st)):
## Calculating the median of the posteriors in the bin
if bin_method == 'mean':
fp12 = np.mean( posteriors[:,col_st[i]:col_end[i]], axis=1 )
elif bin_method == 'median':
fp12 = np.median( posteriors[:,col_st[i]:col_end[i]], axis=1 )
elif bin_method == 'weighted_average':
fp12 = np.average( posteriors[:,col_st[i]:col_end[i]], axis=1, weights=1/np.std( posteriors[:,col_st[i]:col_end[i]], axis=0 )**2 )
else:
raise ValueError('>>> --- bin_method not recognized. Please use mean, median or weighted_average.')
qua_fp12 = juliet.utils.get_quantiles(fp12)
self.pars_med[i], self.pars_up[i], self.pars_lo[i] = qua_fp12[0], qua_fp12[1]-qua_fp12[0], qua_fp12[0]-qua_fp12[2]
# For wavelength bins
if col_end[i] != len(native_wav):
self.wav[i] = ( native_wav[col_st[i]] + native_wav[col_end[i]] ) / 2
self.wav_bin[i] = np.abs( native_wav[col_st[i]] - native_wav[col_end[i]] ) / 2
else:
self.wav[i] = ( native_wav[col_st[i]] + native_wav[col_end[i]-1] ) / 2
self.wav_bin[i] = np.abs( native_wav[col_st[i]] - native_wav[col_end[i]-1] ) / 2
# ---------------------------------------
#. Now, plotting the spectrum
# ---------------------------------------
if ppm:
self.pars_med *= 1e6
self.pars_up *= 1e6
self.pars_lo *= 1e6
self.qua_white = ( self.qua_white[0]*1e6, self.qua_white[1]*1e6, self.qua_white[2]*1e6 )
fig, axs = plt.subplots()
axs.errorbar(self.wav, self.pars_med, yerr=[self.pars_lo, self.pars_up], xerr=self.wav_bin, fmt='o', c='orangered', mfc='white', elinewidth=1.5, capthick=1.5, capsize=3.)
if plot_white:
wav12 = np.linspace( min(self.wav)-max(self.wav_bin)*2, max(self.wav)+max(self.wav_bin)*2, 1000 )
axs.axhline(self.qua_white[0], color='cornflowerblue', ls='-', zorder=10)
axs.fill_between(wav12, self.qua_white[2], self.qua_white[1], color='cornflowerblue', alpha=0.3)
axs.set_xlim( min(self.wav)-max(self.wav_bin)*1.5, max(self.wav)+max(self.wav_bin)*1.5 )
axs.set_xlabel('Wavelength')
axs.set_ylabel(parameter)
axs.set_title(f'Spectral {parameter} spectrum')
return fig, axs
[docs]
def plot2Ddata(self, cmap='plasma'):
"""Create a 2D heatmap of spectral light curves (flux vs time) vs wavelength.
Parameters
----------
cmap : str, optional
Matplotlib colormap name. Default is ``'plasma'``.
Returns
-------
fig : matplotlib.figure.Figure
The created figure.
ax1 : matplotlib.axes.Axes
The plot axes.
im1 : matplotlib.image.AxesImage
The image object.
cbar : matplotlib.colorbar.Colorbar
The colorbar object.
"""
# Adjusting the times to time since beginning in hours
tim = ( self.times - self.times[0] ) * 24
## Preparing the data
norm_lcs = np.copy( self.lc )
fig, ax1 = plt.subplots( figsize=(7, 7) )
# Data
im1 = ax1.imshow(norm_lcs.T, interpolation='none', cmap=cmap, aspect = 'auto', extent=[tim[0], tim[-1], self.wavelengths[-1], self.wavelengths[0]])
im1.set_clim( [ np.nanmedian(norm_lcs) - 5*mad_std(norm_lcs), np.nanmedian(norm_lcs) + 5*mad_std(norm_lcs) ] )
plt.ylabel(r'Wavelength [$\mu$m]')
plt.xlabel('Time from beginning [h]')
# Colorbar:
divider = make_axes_locatable(ax1)
cax = divider.append_axes("right", size="5%", pad=0.05)
cbar = fig.colorbar(im1, shrink = 0.08, cax=cax)
cbar.ax.get_yaxis().labelpad = 20
cbar.ax.set_ylabel('Relative flux', rotation=270)#, fontsize = 15)
return fig, ax1, im1, cbar
[docs]
def plot2D_data_model_resids(self, cmap='plasma', detrend=False, binwidth=None, ppm=False, **kwargs):
"""Create side-by-side 2D heatmaps of data, model, and residuals.
Loads the fitted models and residuals from all channels and creates
three synchronized heatmaps showing the full spectro-temporal evolution
of the data, fit quality, and systematics.
Parameters
----------
cmap : str, optional
Matplotlib colormap name. Default is ``'plasma'``.
detrend : bool, optional
If ``True``, load and plot detrended data (subtracting GP and
linear systematics). Requires additional ``juliet`` processing
and may be slow. If ``False``, plot raw data. Default is ``False``.
binwidth : float or None, optional
Time binwidth (in days) for additional temporal binning of the
data/model/residuals. If ``None``, no binning is performed.
Default is ``None``.
ppm : bool, optional
If ``True``, express data and model in ppm relative to unity.
Residuals are always in ppm. Default is ``False``.
**kwargs : dict
Forwarded to the ``julietPlots`` constructor when ``detrend=True``
(e.g., ``N=1000`` for the number of posterior samples).
Returns
-------
fig : matplotlib.figure.Figure
The created figure with three subplots.
axs : ndarray
Array of three Axes objects (data, model, residuals).
im_list : list
List of three AxesImage objects (useful for colorbars).
"""
# binwidth = None means that you should not perform any binning (in time)
# We don't perform any binning in wavelength direction -- please do that before fitting the light curves
# Detrend = True detrends the data and plot detrended model instead.
data = np.zeros( ( len(self.spectral_lcs_dict['wave']), len(self.times) ) )
models = np.zeros( data.shape )
resids = np.zeros( data.shape )
## Loading the wavelengths
waves = self.spectral_lcs_dict['wave']
min_mod, max_mod = 1e10, -1e10
for c in range( self.spectral_lcs_dict['lc'].shape[1] ):
if not detrend:
## First, we need data
tim, fl = np.loadtxt( self.pout + '/CH' + str(c) + '/lc.dat', usecols=(0,1), unpack=True )
## And the model and residuals
model, resid = np.loadtxt( self.pout + '/CH' + str(c) + '/model_resids.dat', usecols=(0,1), unpack=True )
else:
## Well, if detrend = True, then we need to detrend the data (that can take a while)
data = julietPlots(input_folder=self.pout + '/CH' + str(c), **kwargs)
## We now need to detrend the data and the calculate the detrended model
data.detrend_data(phmin=0.8, instruments='CH' + str(c))
data.detrend_model(phmin=0.8, instruments='CH' + str(c), highres=False)
# Time data
## Array that sort any array according to time
idx_time = np.argsort( data.dataset.times_lc['CH' + str(c)] )
tim = data.dataset.times_lc['CH' + str(c)][idx_time]
resid = ( data.dataset.data_lc['CH' + str(c)] - data.models_all_ins['CH' + str(c)][1] )[idx_time]
## Detrended dataset
fl = data.detrended_data['CH' + str(c)][idx_time]
## Detrended model
model = data.planet_only_models['CH' + str(c)][1][idx_time]
if ppm:
fl, model = ( fl - 1. ) * 1e6, ( model - 1. ) * 1e6
## Residuals are always in ppm
resid = resid * 1e6
if binwidth is not None:
_, fl, _, _ = lcbin(time=tim, flux=fl, binwidth=binwidth)
_, model, _, _ = lcbin(time=tim, flux=model, binwidth=binwidth)
_, resid, _, _ = lcbin(time=tim, flux=resid, binwidth=binwidth)
# And putting them in a big array
data[c, :], models[c, :], resids[c, :] = fl, model, resid
## Calculating the minimum and maximum values of the model
if np.nanmin(model) < min_mod:
min_mod = np.nanmin(model)
if np.nanmax(model) > max_mod:
max_mod = np.nanmax(model)
# And, finally, the plotting
# Convering time to hours
tim = ( tim - tim[0] ) * 24
# Colorbars
cmin1, cmax1 = min_mod-0.005*min_mod, max_mod+0.005*max_mod
norm1 = matplotlib.colors.Normalize(vmin=cmin1, vmax=cmax1, clip=True)
mapper1 = cm.ScalarMappable(norm=norm1, cmap=cmap)
mapper1.set_array([])
cmin2, cmax2 = np.nanmedian(resids)-3*mad_std(resids), np.nanmedian(resids)+3*mad_std(resids)
norm2 = matplotlib.colors.Normalize(vmin=cmin2, vmax=cmax2, clip=True)
mapper2 = cm.ScalarMappable(norm=norm2, cmap=cmap)
mapper2.set_array([])
# And plotting it
fig, axs = plt.subplots(nrows=1, ncols=3, figsize=(15,5), sharex=True, sharey=True)
## First box: plotting the data
im1 = axs[0].imshow(data, aspect='auto', cmap=cmap, interpolation='none', extent=[tim[0], tim[-1], waves[-1], waves[0]])
im1.set_clim([cmin1, cmax1])
## Second box: plotting the model
im2 = axs[1].imshow(models, aspect='auto', cmap=cmap, interpolation='none', extent=[tim[0], tim[-1], waves[-1], waves[0]])
im2.set_clim([cmin1, cmax1])
## First colorbar
cbar1 = plt.colorbar(mapper1, ax=np.array([axs[0],axs[1]]))#, location='top')
if ppm:
cbar1.set_label('Relative flux [ppm]', rotation=270, labelpad=20)
else:
cbar1.set_label('Relative flux', rotation=270, labelpad=20)
cbar1.set_ticks(ticks=np.linspace(cmin1, cmax1, 7))
cbar1.set_ticklabels(ticklabels=[ "{:.3f}".format( i ) for i in np.linspace(cmin1, cmax1, 7) ] )
im3 = axs[2].imshow(resids, aspect='auto', cmap=cmap, interpolation='none', extent=[tim[0], tim[-1], waves[-1], waves[0]])
im3.set_clim([cmin2, cmax2])
cbar2 = plt.colorbar(mapper2, ax=axs[2])#, location='top')
cbar2.set_label('Residuals [ppm]', rotation=270, labelpad=20)
cbar2.set_ticks( ticks=np.linspace(cmin2, cmax2, 7) )
cbar2.set_ticklabels(ticklabels=[ "{:d}".format( int(i) ) for i in np.linspace(cmin2, cmax2, 7) ] )
fig.supxlabel('Time since beginning [hr]', fontsize=15)
axs[0].set_ylabel(r'Wavelength [$\mu$m]')
if detrend:
axs[0].set_title('Detrended data')
else:
axs[0].set_title('Data')
axs[1].set_title('Model')
axs[2].set_title('Residuals')
return fig, axs, [im1, im2, im3]
[docs]
def joint_fake_allan_deviation(self, binmax=10, method='pipe', timeunit=None):
"""Plot combined noise-vs-binning curves across all spectroscopic channels.
Computes the "Allan deviation" (actually the MAD-based noise as a
function of binning) for each channel's residuals and overlays all
curves on a single plot. Also shows the white-noise expectation curve.
Parameters
----------
binmax : int, optional
Maximum number of bins to test. Default is 10.
method : {'pipe', 'std', 'rms', 'astropy'}, optional
Noise estimator to use (passed to :func:`utils.fake_allan_deviation`).
Default is ``'pipe'`` (MAD-based).
timeunit : {'d', 'hr', 'min'} or None, optional
Time unit for the secondary x-axis. If ``None``, chosen automatically
based on data span. Default is ``None``.
Returns
-------
fig : matplotlib.figure.Figure
The created figure.
axs : matplotlib.axes.Axes
The plot axes (with primary x-axis for bin size and secondary
x-axis for time period).
"""
# Creating a figure outside of the loop
fig, axs = plt.subplots()
## Looping over all channels
for c in tqdm(range( self.spectral_lcs_dict['lc'].shape[1] )):
times = np.loadtxt( self.pout + '/CH' + str(c) + '/lc.dat', usecols=0, unpack=True )
resids = np.loadtxt( self.pout + '/CH' + str(c) + '/model_resids.dat', usecols=1, unpack=True )
_, _, binsize, noise, white_noise_expec = fake_allan_deviation(times=times, residuals=resids, binmax=binmax, method=method, timeunit=timeunit, plot=False)
## And plotting it over the main plot
## First plotting the computed noise
if c == 0:
label1, label2 = 'Computed noise', 'White-noise expectation'
else:
label1, label2 = None, None
axs.plot(binsize, noise, color='dodgerblue', lw=0.7, label=label1, zorder=10)
## Now plotting the white-noise expectation
axs.plot(binsize, white_noise_expec, color='orangered', lw=1., label=label2, zorder=20)
# Converting binsize to time units
time_binsize = binsize * np.nanmedian( np.diff(times) )
# First, let's estimate the time units we need from times array:
if timeunit is None:
if np.ptp(time_binsize) >= 1.:
## If times duration is greater than 2 hours, we use days
time_unit_multiplication_factor = 1. # Units are already in days
time_unit_label = 'Time period [d]'
elif ( np.ptp(time_binsize) > 5/24 ) and ( np.ptp(time_binsize) < 1. ):
## If times duration is greater than 2 hours, but less than 2 days, we use hours
time_unit_multiplication_factor = 24. # Converting days to hours
time_unit_label = 'Time period [hr]'
else:
## If times duration is less than 2 hours, we use minutes
time_unit_multiplication_factor = 24 * 60
time_unit_label = 'Time period [min]'
else:
if timeunit == 'd':
time_unit_multiplication_factor = 1.
time_unit_label = 'Time period [d]'
elif timeunit == 'hr':
time_unit_multiplication_factor = 24.
time_unit_label = 'Time period [hr]'
elif timeunit == 'min':
time_unit_multiplication_factor = 24 * 60
time_unit_label = 'Time period [min]'
else:
raise ValueError("timeunit should be one of 'd', 'hr', or 'min'.")
# The following two functions will convert binsize to time (in minutes) and vice versa
def bin2time(binsize):
return binsize * np.nanmedian(np.diff(times)) * time_unit_multiplication_factor
def time2bin(bintime):
return bintime / ( np.nanmedian(np.diff(times)) * time_unit_multiplication_factor )
## Adding secondary x-axis for time
secax = axs.secondary_xaxis('top', functions=(bin2time, time2bin))
secax.set_xlabel(time_unit_label, labelpad=10)
axs.set_xlim([ np.min(binsize), np.max(binsize) ])
axs.set_xscale('log')
axs.set_yscale('log')
axs.set_xlabel('Bin size [Number of points]')
axs.set_ylabel('Noise estimate [ppm]')
axs.legend()
return fig, axs
[docs]
class InvertCowanAgolPC(object):
"""Invert phase curve observations to physical properties using Cowan & Agol (2008) model.
This class implements tools to analyze phase-curve observations, computing thermal properties
(temperature maps, day/night temperatures, Bond albedo, heat re-distribution efficiencies) and
orbital properties (hotspot offset, phase offset) using the parameterized phase-curve
model of Cowan & Agol (2008).
The class leverages the model's Fourier expansion coefficients (A0, A1, B1, A2, B2)
derived from eclipse depths (E) and phase-curve coefficients (C1, D1, C2, D2) to
construct 2D thermal maps and compute various physical properties across posterior
samples for uncertainty quantification.
Parameters
----------
E : ndarray
Array of eclipse depths (F_p/F_*) from posterior samples.
C1 : ndarray
Phase-curve cosine coefficient (first harmonic) from posterior samples.
D1 : ndarray
Phase-curve sine coefficient (first harmonic) from posterior samples.
C2 : ndarray
Phase-curve cosine coefficient (second harmonic) from posterior samples.
D2 : ndarray
Phase-curve sine coefficient (second harmonic) from posterior samples.
rprs : float or ndarray
Planet-to-star radius ratio. If scalar, same value used for all samples;
if array, sampled from for each posterior sample.
bandpass : dict, optional
Dictionary containing instrument bandpass data with keys 'WAVE' and 'RESPONSE'.
Required for computing brightness temperatures. Values must be Astropy Quantities.
Default is ``None``.
method : str, optional
Optimization method for fitting temperatures to flux (passed to
``scipy.optimize.minimize``). Default is ``'Nelder-Mead'``.
teff_star : float (astropy.units.Quantity) or None, optional
Stellar effective temperature in Kelvin. Required for computing Bond albedo,
heat redistribution efficiency, and average day/night temperatures. If not
provided, a warning is printed. Default is ``None``.
stellar_spec : dict or None, optional
Dictionary containing stellar spectrum with keys 'WAVE' and 'FLUX'. Used
to interpolate stellar flux at bandpass wavelengths. If provided,
overrides ``teff_star``-based blackbody spectrum. Values must be Astropy
Quantities. Default is ``None``.
nthreads : int, optional
Number of CPU threads to use for parallel processing of temperature maps
and multiprocessing operations. Default is the total number of available
CPU cores (``multiprocessing.cpu_count()``).
ntheta : int, optional
Number of grid points across latitude (from -π/2 to π/2). Default is ``49``.
nphi : int, optional
Number of grid points across longitude (from -π to π). Default is ``100``.
pout : str, optional
Output directory for saving intermediate results (temperature maps,
brightness temperatures, etc.). Default is the current working directory.
Attributes
----------
E, C1, D1, C2, D2 : ndarray
Stored input parameters (eclipse depths and phase-curve coefficients).
rprs : float or ndarray
Stored planet-to-star radius ratio.
bandpass, method, teff_star, stellar_spec : as above
Stored input parameters for spectral calculations.
nthreads, ntheta, nphi, pout : as above
Stored configuration parameters.
A0, A1, B1, A2, B2 : ndarray
Fourier coefficients derived from input parameters according to Cowan & Agol (2008)
model equations.
wav_star, fl_star : Astropy Quantity
Stellar wavelength array and flux spectrum (blackbody or interpolated).
wav_instrument, response_instrument, trans_fun : Astropy Quantity or ndarray
Instrument bandpass wavelengths, response curve, and normalized transmission
function evaluated on stellar wavelengths.
phi_ang, theta_ang : ndarray
1D arrays of longitude and latitude grid points.
temp_map : ndarray
3D array of temperature maps (shape: N_samples, N_phi, N_theta) computed
when :meth:`temperature_map_distribution` is called.
Main methods
------------
TdayTnight()
Compute dayside and nightside brightness temperatures.
calculate_0D_Ab_eps(ar)
Compute 0D Bond albedo and heat redistribution efficiency from brightness temperatures.
phase_offsets(method='root')
Compute hotspot offset and phase offset from phase curve.
temperature_map_distribution(nsamples=2000)
Compute 2D temperature map distribution across posterior samples.
median_temperature_map(plot=False, cmap='plasma')
Compute and optionally plot median temperature map.
equatorial_temp_map(plot=False, nsamples=2000)
Compute and optionally plot temperature along equator.
forward_phase_curve_model(plot=False)
Compute forward-model phase curve from 2D flux distribution.
albedo_eps_from_temp_map(a_by_Rst, uniform_nightside_temp=True, nsamples=2000)
Compute Bond albedo and heat redistribution from temperature map distribution.
albedo_eps_from_fpfs_map(a_by_Rst)
Compute Bond albedo and heat redistribution from flux map distribution.
albedo_from_kempton23(a_by_Rst, teq)
Compute Bond albedo using Kempton et al. (2023) method.
Notes
-----
- All temperature computations use the bandpass-weighted brightness temperature
assuming blackbody emission.
- The class uses multiprocessing for computationally intensive operations
(e.g., temperature map fitting).
- Intermediate results are cached as .npy files in ``pout`` to avoid
re-computation.
- The Cowan & Agol (2008) model assumes a parameterized intensity map I(φ, θ)
with latitudinal and longitudinal dependencies.
References
----------
- Cowan, N. B., & Agol, E. 2008. Inverting Phase Functions to Map Exoplanets.
The Astrophysical Journal Letters, 678(2), L129.
- Kempton, E. M. R., et al. 2023. A reflective, metal-rich atmosphere for
GJ 1214b from its JWST phase curve, Nature, 620(7972), 67
- Keating, D., Cowan, N. B., & Dang, L. 2019. Uniformly hot nightside
temperatures on short-period gas giants. Nature Astronomy, 3, 1092.
Examples
--------
Analyze eclipse depths and phase curve to extract thermal properties:
>>> # Load posterior samples
>>> E = posterior_samples['eclipse_depth'] # Array of eclipse depths
>>> C1 = posterior_samples['phase_c1']
>>> D1 = posterior_samples['phase_d1']
>>> C2 = posterior_samples['phase_c2']
>>> D2 = posterior_samples['phase_d2']
>>> rprs = posterior_samples['rprs']
>>> # Define bandpass and stellar spectrum
>>> bandpass = load_bandpass('TESS') # Must have 'WAVE' and 'RESPONSE' keys
>>> teff_star = 5800 * u.K # Kelvin
>>> # Create the inversion object
>>> invert = InvertCowanAgolPC(E=E, C1=C1, D1=D1, C2=C2, D2=D2, rprs=rprs,
... bandpass=bandpass, teff_star=teff_star)
>>> # Compute temperature maps
>>> temp_maps = invert.temperature_map_distribution(nsamples=1000)
>>> # Compute albedos and heat redistribution
>>> a_by_rst = 10.0 # Semi-major axis in units of stellar radius
>>> A_B, eps_kelp, T_day, T_night, eps_keating = invert.albedo_eps_from_temp_map(a_by_rst)
"""
def __init__(self, E, C1, D1, C2, D2, rprs, bandpass=None, method='Nelder-Mead', teff_star=None, stellar_spec=None,\
nthreads=multiprocessing.cpu_count(), ntheta=49, nphi=100, pout=os.getcwd()):
## Theta is along the latitude, phi is along the longitude
# Initializing the class with the parameters of the Cowan & Agol (2008) phase curve model and output folder.
self.E = E
self.C1, self.D1, self.C2, self.D2 = C1, D1, C2, D2
self.rprs = rprs
self.bandpass = bandpass
self.method = method
self.teff_star = teff_star
self.stellar_spec = stellar_spec
self.nthreads = nthreads
self.ntheta, self.nphi = ntheta, nphi
self.pout = pout
## Calculating the A coefficents of the Cowan & Agol (2008) phase curve model from the input parameters
self.A0 = (self.E - self.C1 - self.C2) / 2
self.A1 = 2 * self.C1 / np.pi
self.B1 = -2 * self.D1 / np.pi
self.A2 = 3 * self.C2 / 2
self.B2 = -3 * self.D2 / 2
# prepare stellar spectrum
if (self.stellar_spec is not None) and (self.teff_star is None):
self.wav_star = self.stellar_spec['WAVE'].to(u.micron)
self.fl_star = self.stellar_spec['FLUX'].to(u.J / u.s / u.m**2 / u.micron)
elif (self.teff_star is not None) and (self.stellar_spec is None):
self.wav_star = bandpass['WAVE'].to(u.micron)
self.fl_star = planck_func(self.wav_star, self.teff_star)
else:
print('>>> --- It looks like you have not provided either stellar_spec or teff_star.\n Please provide one of them to prepare the stellar spectrum.\n Otherwise we will not be able to calculate the brightness temperatures.')
if self.teff_star is None:
print('>>> --- It looks like you have not provided the stellar effective temperature.\n Please provide it if you want to calculate the Bond albedo and heat re-distribution efficiency, as well as the average dayside and nightside temperatures.')
# Prepare the bandpass
if self.bandpass is not None:
self.wav_instrument = self.bandpass['WAVE'].to(u.micron)
self.response_instrument = self.bandpass['RESPONSE']
# build transmission evaluated on stellar wavelengths
spln2 = interp1d(x=self.wav_instrument.value, y=self.response_instrument,
bounds_error=False, fill_value=0.0)
self.trans_fun = spln2(self.wav_star.value)
# guard against division by zero if response all zeros
if np.max(self.trans_fun) > 0:
self.trans_fun = self.trans_fun / np.max(self.trans_fun)
else:
print('>>> --- It looks like you have not provided the instrument bandpass.\n Please provide one if you want to calculate the brightness temperatures.')
# Latitude and longitude grids for the temperature map
self.phi_ang = np.linspace(-np.pi, np.pi, self.nphi)
self.theta_ang = np.linspace(-np.pi/2, np.pi/2, self.ntheta)
[docs]
def TdayTnight(self):
"""Compute 0D dayside and nightside brightness temperatures.
Calculates integrated brightness temperatures for the dayside and
nightside using the eclipse depths and phase-curve parameters. Results are
cached to disk to avoid re-computation. Uses the
`BrightnessTemperatureCalculator` helper class for computation.
The dayside brightness temperature is derived directly from the eclipse depth,
while the nightside is estimated from E - 2*C1.
Returns
-------
bt_day : ndarray
Dayside brightness temperatures (in Kelvin) for each posterior sample.
Shape: (N_samples,).
bt_night : ndarray
Nightside brightness temperatures (in Kelvin) for each posterior sample.
Shape: (N_samples,).
Notes
-----
- Results are cached to ``{pout}/Brightness_temp_day.npy`` and
``{pout}/Brightness_temp_night.npy``.
- Requires that ``self.bandpass``, ``self.teff_star`` (or
``self.stellar_spec``), are properly set during initialization.
- This function should be called before :meth:`calculate_0D_Ab_eps`.
Examples
--------
>>> invert = InvertCowanAgolPC(E=E, C1=C1, D1=D1, C2=C2, D2=D2,
... rprs=0.1, bandpass=bp, teff_star=5800 * u.K)
>>> bt_day, bt_night = invert.TdayTnight()
>>> print(f"Dayside brightness temperature: {np.median(bt_day)} K")
"""
# This function calculates the brightness temperatures of the dayside
fp_day = np.copy( self.E )
fp_night = self.E - ( 2 * self.C1 )
# ---------- Dayside brightness temperature ----------
# If the file already exists, we don't need to calculate it again
fname = self.pout + '/Brightness_temp_day.npy'
if os.path.isfile(fname):
print('>>>> --- The dayside brightness temperature file already exists...')
print(' Loading it...')
bt_day = np.load(fname)
else:
# And now we will use the BrightnessTemperatureCalculator class to calculate the brightness temperatures
btc_day = BrightnessTemperatureCalculator( fp=fp_day, rprs=self.rprs, bandpass=self.bandpass, nthreads=self.nthreads,\
method=self.method, teff_star=self.teff_star, stellar_spec=self.stellar_spec, pout=self.pout )
bt_day = btc_day.compute()
## Renaming the saved file to make it clear that it is the dayside brightness temperature
os.rename( self.pout + '/Brightness_temp.npy', self.pout + '/Brightness_temp_day.npy' )
# ---------- Nightside brightness temperature ----------
# If the file already exists, we don't need to calculate it again
fname = self.pout + '/Brightness_temp_night.npy'
if os.path.isfile(fname):
print('>>>> --- The nightside brightness temperature file already exists...')
print(' Loading it...')
bt_night = np.load(fname)
else:
# And now we will use the BrightnessTemperatureCalculator class to calculate the brightness temperatures
btc_night = BrightnessTemperatureCalculator( fp=fp_night, rprs=self.rprs, bandpass=self.bandpass, nthreads=self.nthreads,\
method=self.method, teff_star=self.teff_star, stellar_spec=self.stellar_spec, pout=self.pout )
bt_night = btc_night.compute()
## Renaming the saved file to make it clear that it is the nightside brightness temperature
os.rename( self.pout + '/Brightness_temp.npy', self.pout + '/Brightness_temp_night.npy' )
return bt_day, bt_night
[docs]
def calculate_0D_Ab_eps(self, ar):
"""Compute Bond albedo and heat redistribution efficiency from 0D brightness temperatures.
Calculates the 0D (calculated from spatially averaged temperatures) Bond albedo
and heat redistribution efficiency using the dayside and nightside brightness
temperatures computed from :meth:`TdayTnight`. This implementation uses the
relationship between the planet's temperature, stellar radiation, and thermal emission
to infer physical properties.
Parameters
----------
ar : float, or ndarray
Semi-major axis in units of stellar radius (a/R_*).
Returns
-------
A_B : ndarray
Bond albedo for each posterior sample (unitless).
Shape: (N_samples,). Values typically range from 0 to 1.
eps : ndarray
Heat redistribution efficiency for each posterior sample (unitless).
Shape: (N_samples,). Values typically range from 0 to 1.
Notes
-----
- Requires :meth:`TdayTnight` to be called first (or this function will call it).
- Assumes the 0D approximation where the planet is treated as having
uniform dayside and nightside temperatures.
Examples
--------
>>> bt_day, bt_night = invert.TdayTnight()
>>> A_Bond, eps = invert.calculate_0D_Ab_eps(ar=10.0)
>>> print(f"Bond albedo: {np.median(A_Bond):.3f}")
>>> print(f"Heat redistribution: {np.median(eps):.3f}")
"""
# This function calculates the 0D albedo and recirculation efficiency from the brightness temperatures
T_day, T_night = self.TdayTnight()
T0 = self.teff_star.value / np.sqrt(ar)
Td_by_Tn_4 = ( T_day / T_night ) ** 4
Tn_by_T0_4 = ( T_night / T0 ) ** 4
eps = 8 / ( (3 * Td_by_Tn_4) + 5 )
A_B = 1 - ( 0.5 * Tn_by_T0_4 * ( 3*Td_by_Tn_4 + 5 ))
return A_B, eps
def _find_hotspot_off(self, a1, b1, a2, b2, method):
"""Find the hotspot offset from phase-curve coefficients.
Helper method that solves for the longitude (phase angle) where the
brightness temperature reaches its maximum by finding where
dFp/dφ = 0. The hotspot offset quantifies how far the longitude of
maximum temperature has shifted from the substellar point (φ = 0).
Parameters
----------
a1 : float
First-harmonic cosine coefficient (A1).
b1 : float
First-harmonic sine coefficient (B1).
a2 : float
Second-harmonic cosine coefficient (A2).
b2 : float
Second-harmonic sine coefficient (B2).
method : str
Root-finding method: ``'root'`` uses ``scipy.optimize.root``
or ``'fsolve'`` uses ``scipy.optimize.fsolve``.
Returns
-------
off_loc : float
Hotspot offset in degrees. The location of the brightness maximum
relative to the substellar point.
Notes
-----
Finds the solution to:
$$ \\frac{dF_p}{d\\varphi} = -a_1 \\sin(\\varphi) + b_1 \\cos(\\varphi) - 2a_2 \\sin(2\\varphi) + 2b_2 \\cos(2\\varphi) = 0 $$
Examples
--------
>>> off = invert._find_hotspot_off(a1, b1, a2, b2, method='root')
"""
# Helper function to calculate the hotspot offset
# First writing a function solving which will give the location of the hotspot offset
# So, what I did was to compute Fp/F* as a function of phi, and solve for dFp/dphi = 0
def dfpdphi(phi, a1, b1, a2, b2):
dfphi = ( -a1 * np.sin(phi) ) + (b1 * np.cos(phi) ) + (-2 * a2 * np.sin(2*phi) ) + (2 * b2 * np.cos(2*phi) )
return dfphi
# Now solving it, based on the method
if method == 'root':
soln = root(fun=dfpdphi, x0=0., args=(a1, b1, a2, b2))
off_loc = np.rad2deg(soln.x[0])
elif method == 'fsolve':
soln = fsolve(func=dfpdphi, x0=0, args=(a1, b1, a2, b2))
off_loc = np.rad2deg(soln[0])
return off_loc
def _find_phase_off(self, c1, d1, c2, d2, method):
"""Find the phase offset from phase-curve coefficients.
Helper method that solves for the phase angle where the observed flux
reaches its maximum by finding where dFp/dωt = 0. The phase offset
quantifies how far the peak of the phase curve has shifted from the
secondary eclipse (ωt = π).
Parameters
----------
c1 : float
First-harmonic cosine coefficient (C1) from phase-curve fitting.
d1 : float
First-harmonic sine coefficient (D1) from phase-curve fitting.
c2 : float
Second-harmonic cosine coefficient (C2) from phase-curve fitting.
d2 : float
Second-harmonic sine coefficient (D2) from phase-curve fitting.
method : str
Root-finding method: ``'root'`` uses ``scipy.optimize.root``
or ``'fsolve'`` uses ``scipy.optimize.fsolve``.
Returns
-------
off_phs_deg : float
Phase offset in degrees relative to ωt = π (secondary eclipse).
Negative offsets indicate the peak occurs before eclipse;
positive offsets indicate it occurs after eclipse.
Notes
-----
Finds the solution to:
$$ \\frac{dF_p}{d\\omega t} = -c_1 \\sin(\\omega t) + d_1 \\cos(\\omega t) - 2c_2 \\sin(2\\omega t) + 2d_2 \\cos(2\\omega t) = 0 $$
Examples
--------
>>> phase_off = invert._find_phase_off(c1, d1, c2, d2, method='root')
>>> print(f"Phase offset: {phase_off} degrees")
"""
# Another helper function to calculate the phase offset
# First writing a function solving which will give the location of the hotspot offset
# So, what I did was to compute Fp/F* as a function of phi, and solve for dFp/dphi = 0
def dfpdwt(wt, c1, d1, c2, d2):
dfphi = ( -c1 * np.sin(wt) ) + (d1 * np.cos(wt) ) + ( -2 * c2 * np.sin(2*wt) ) + ( 2 * d2 * np.cos(2*wt) )
return dfphi
# Now solving it, based on the method
if method == 'root':
soln = root(fun=dfpdwt, x0=0., args=(c1, d1, c2, d2))
off_loc_wt = soln.x[0]
elif method == 'fsolve':
soln = fsolve(func=dfpdwt, x0=0, args=(c1, d1, c2, d2))
off_loc_wt = soln[0]
phs = ( ( off_loc_wt - np.pi ) / (2 * np.pi) ) % 1
off_phs_deg = np.rad2deg( 2 * np.pi * (phs - 0.5) )
return off_phs_deg
[docs]
def phase_offsets(self, method='root'):
"""Compute hotspot offset and phase offset from phase-curve parameters.
Calculates two complementary measures of the offset of thermal features
from expected locations:
1. **Hotspot offset**: The longitude (in degrees) where the brightness
temperature maximum occurs, relative to the substellar point.
2. **Phase offset**: The orbital phase (in degrees relative to secondary
eclipse) where the observed phase-curve flux peaks.
These quantities help constrain heat transport and atmospheric dynamics
by revealing where thermal features are located relative to the
tidally-locked dayside.
Parameters
----------
method : str, optional
Root-finding method to use for identifying extrema. Options are
``'root'`` (default, uses ``scipy.optimize.root``) or ``'fsolve'``
(uses ``scipy.optimize.fsolve``). Default is ``'root'``.
Returns
-------
hotspot_off : ndarray
Hotspot offset in degrees for each posterior sample.
Shape: (N_samples,). Must be computed in the range [-180, 180] degrees.
phase_off : ndarray
Phase offset in degrees for each posterior sample.
Shape: (N_samples,). Positive values indicate the phase-curve
maximum occurs before the secondary eclipse; negative values
indicate it occurs after.
Notes
-----
- Computation uses :meth:`_find_hotspot_off` and :meth:`_find_phase_off`
for each posterior sample.
- This method uses progress bars (tqdm) to show computation progress.
- Results are not cached; re-computing will recalculate offsets from scratch.
Examples
--------
>>> hotspot_off, phase_off = invert.phase_offsets(method='root')
>>> print(f"Median hotspot offset: {np.median(hotspot_off):.1f} degrees")
>>> print(f"Median phase offset: {np.median(phase_off):.1f} degrees")
"""
## This function calculates the phase offset: 1) phase offset of the observed phase curve, and
## 2) shift of the maximum of the temperature map from the substellar point (the "hotspot offset")
# ---------- Calculating the hotspot offset and phase offset ---------
hotspot_off = np.zeros( len(self.E) )
for i in tqdm(range(len(hotspot_off))):
hotspot_off[i] = self._find_hotspot_off( self.A1[i], self.B1[i], self.A2[i], self.B2[i], method=method )
phase_off = np.zeros( len(self.E) )
for i in tqdm(range(len(phase_off))):
phase_off[i] = self._find_phase_off( self.C1[i], self.D1[i], self.C2[i], self.D2[i], method=method )
return hotspot_off, phase_off
def _calculate_2d_temp_map(self, fpfs_2d, rprs_ratio):
"""Compute 2D temperature map from 2D flux distribution.
Helper method that converts a 2D array of bandpass-normalized planet-to-star
flux ratios (F_p/F_*) into a 2D brightness temperature map by fitting a
blackbody spectrum to each grid point independently.
Parameters
----------
fpfs_2d : ndarray
2D array of F_p/F_* values with shape (N_phi, N_theta) representing
the flux distribution across longitude and latitude.
rprs_ratio : float
Planet-to-star radius ratio.
Returns
-------
Tmap : ndarray
2D array of brightness temperatures (in Kelvin) with same shape as
``fpfs_2d``. Grid points where ``fpfs_2d <= 0`` have temperature set to 0.
Notes
-----
- For each grid point with positive flux, fits a blackbody spectrum
that best reproduces the observed bandpass-weighted flux.
- Uses ``scipy.optimize.minimize`` with the method from ``self.method``
to find the temperature that minimizes chi-squared difference between
observed and modeled flux.
- Temperatures are computed independently per grid point.
- Used internally by :meth:`temperature_map_distribution` and
:meth:`median_temperature_map`.
Examples
--------
>>> Tmap = invert._calculate_2d_temp_map(fpfs_2d, rprs_ratio=0.1)
>>> print(f"Max temperature: {np.nanmax(Tmap)} K")
"""
# This is helper function to calculate the 2D temperature map for a given 2D fp/f* map and rprs ratio
## Temperature map
Tmap = np.zeros( (self.nphi, self.ntheta) )
for i in range(self.ntheta):
for j in range(self.nphi):
if fpfs_2d[j,i] > 0:
fp_pl = fpfs_2d[j,i] * simpson(y=self.fl_star.value*self.trans_fun, x=self.wav_star.value) / (rprs_ratio**2)
def func_to_minimize_new(x):
planet_bb = planck_func(self.wav_star, x*u.K)
planet_den = simpson(y=planet_bb.value*self.trans_fun, x=self.wav_star.value)
chi2 = (fp_pl - planet_den)**2
return chi2
soln = minimize(fun=func_to_minimize_new, x0=1000., method=self.method)
Tmap[j,i] = soln.x
else:
Tmap[j,i] = np.array([0.])
return Tmap
[docs]
def temperature_map_distribution(self, nsamples=2000):
"""Compute 2D temperature map distribution across posterior samples.
Builds a distribution of 2D temperature maps (shape: N_samples, N_phi, N_theta)
by converting the posterior distribution of flux maps into temperature
maps via brightness temperature fitting. Results are cached and can be
re-used by other methods.
This method randomly samples from the posterior distribution if more
posterior samples exist than requested, improving computational efficiency
while maintaining statistical representation.
Parameters
----------
nsamples : int, optional
Number of posterior samples to process (randomly selected from all
available samples). Default is ``2000``.
Returns
-------
temp_map : ndarray
3D array of brightness temperature maps with shape
(nsamples, N_phi, N_theta). Also stored as ``self.temp_map``.
Notes
-----
- Uses multiprocessing with ``self.nthreads`` processes for parallel
computation of temperature maps.
- Results are cached to ``{pout}/Temperature_map.npy`` and reloaded
on subsequent calls.
- Uses progress bars (tqdm) to show computation progress.
- This is a computationally intensive operation and typically requires
several minutes to hours for large grids.
Examples
--------
>>> temp_maps = invert.temperature_map_distribution(nsamples=1000)
>>> print(temp_maps.shape)
(1000, 100, 49)
>>> # Get median map
>>> temp_median = np.median(temp_maps, axis=0)
"""
# This function calculates the distribution of the temperature maps
# Now, let's calculate the fp/f* map distribution
fpfs_map = np.zeros( (len(self.E), self.nphi, self.ntheta) )
for integration in range( len(self.E) ):
J_phi = self.A0[integration] + ( self.A1[integration] * np.cos(self.phi_ang) ) + ( self.B1[integration] * np.sin(self.phi_ang) )+\
( self.A2[integration] * np.cos(2*self.phi_ang) ) + ( self.B2[integration] * np.sin(2*self.phi_ang) )
for th in range(self.ntheta):
fpfs_map[integration, :, th] = np.sin(self.theta_ang[th] + np.pi/2) * J_phi * 0.75
# Selecting NSample samples from all samples
fpfs_map_pos_samples = fpfs_map[np.random.choice(np.arange(fpfs_map.shape[0]), size=nsamples, replace=False), :, :]
# Computing brightness temperature for the posterior of eclipse depth
temp_map_path = Path(self.pout + '/Temperature_map.npy')
if temp_map_path.exists():
print('>>> --- The temperature map distribution file already exists...')
print(' Loading it...')
self.temp_map = np.load(self.pout + '/Temperature_map.npy')
else:
if np.isscalar(self.rprs):
rprs_arr = np.full(fpfs_map_pos_samples.shape, float(self.rprs))
else:
rprs_arr = np.random.choice( self.rprs, size=nsamples, replace=False )
# prepare inputs for multiprocessing
inputs = [(fpfs_map_pos_samples[i,:,:], rprs_arr[i]) for i in range( fpfs_map_pos_samples.shape[0] )]
# use Pool with bound method; user's environment set_start_method('fork') so this is fine
with multiprocessing.Pool(self.nthreads) as p:
result_list = p.starmap(self._calculate_2d_temp_map, tqdm(inputs, total=nsamples), chunksize=self.nthreads)
self.temp_map = np.array(result_list)
# Saving the data
np.save(self.pout + '/Temperature_map.npy', self.temp_map)
return self.temp_map
def _compute_temp_along_longitude(self, fpfs, rprs_ratio):
"""Compute 1D temperature profile along longitude (i.e., for a given latitude such as equator).
Helper method that converts a 1D array of bandpass-normalized planet-to-star
flux ratios along any longitude into a 1D brightness temperature
profile by fitting blackbody spectra.
Parameters
----------
fpfs : ndarray
1D array of F_p/F_* values with shape (N_phi,) representing the
flux distribution across longitude at a given latitude.
rprs_ratio : float
Planet-to-star radius ratio for this sample.
Returns
-------
Tmap : ndarray
1D array of brightness temperatures (in Kelvin) with shape (N_phi,).
Grid points where ``fpfs <= 0`` have temperature set to 0.
Notes
-----
- Used internally by :meth:`equatorial_temp_map`.
- Similar to :meth:`_calculate_2d_temp_map` but computes only along
latitude θ = π/2 (equator).
Examples
--------
>>> Tmap_equator = invert._compute_temp_along_longitude(fpfs_1d, 0.1)
"""
## This is a helper function to compute the temperature along the longitude for a given fp/f* map and rprs ratio
# Temperature map
Tmap = np.zeros(self.nphi)
for j in range(self.nphi):
if fpfs[j] > 0.:
fp_pl = fpfs[j] * simpson(y=self.fl_star.value*self.trans_fun, x=self.wav_star.value) / (rprs_ratio**2)
def func_to_minimize_new(x):
planet_bb = planck_func(self.wav_star, x*u.K)
planet_den = simpson(y=planet_bb.value*self.trans_fun, x=self.wav_star.value)
chi2 = (fp_pl - planet_den)**2
return chi2
soln = minimize(fun=func_to_minimize_new, x0=1000., method=self.method)
Tmap[j] = soln.x
else:
Tmap[j] = np.array([0.])
return Tmap
[docs]
def equatorial_temp_map(self, plot=False, nsamples=2000):
"""Compute and optionally plot temperature profile along the equator.
Calculates a distribution of 1D temperature profiles along the equator
(θ = 0) across posterior samples. Useful for understanding longitudinal
variations in brightness temperature at the substellar latitude.
Parameters
----------
plot : bool, optional
If ``True``, generate a plot showing the median equatorial temperature
profile with individual posterior samples overplotted. Default is ``False``.
nsamples : int, optional
Number of posterior samples to process. Default is ``2000``.
Returns
-------
temp_map_equatorial : ndarray
2D array of brightness temperatures along the equator with shape
(nsamples, N_phi). Also saved to
``{pout}/Equatorial_Temperature_map.npy``.
fig : matplotlib.figure.Figure or None
Figure object if ``plot=True``, otherwise ``None``.
axs : matplotlib.axes.Axes or None
Axes object if ``plot=True``, otherwise ``None``.
Notes
-----
- Uses multiprocessing to parallelize temperature calculations.
- Results are cached and reloaded on subsequent calls.
- Posterior samples are randomly selected if the total exceeds ``nsamples``.
- When ``plot=True``, the plot shows 100 random posterior samples as light
curves and the median as a heavy line.
Examples
--------
>>> Tmap_equator = invert.equatorial_temp_map(plot=True, nsamples=1000)
>>> print(Tmap_equator.shape)
(1000, 100)
"""
# This function calculates the temperature map along the equator (theta=0) and plots it if plot=True
# Simply load the file if it already exists
temp_map_path = Path(self.pout + '/Equatorial_Temperature_map.npy')
if temp_map_path.exists():
print('>>> --- The equatorial temperature map file already exists...')
print(' Loading it...')
temp_map_equatorial = np.load(temp_map_path)
else:
# Now, let's calculate the fp/f* map distribution along the equator (theta=0)
fpfs_equatorial = np.zeros( (len(self.E), self.nphi) )
for integration in range( len(self.E) ):
J_phi = self.A0[integration] + ( self.A1[integration] * np.cos(self.phi_ang) ) + ( self.B1[integration] * np.sin(self.phi_ang) )+\
( self.A2[integration] * np.cos(2*self.phi_ang) ) + ( self.B2[integration] * np.sin(2*self.phi_ang) )
fpfs_equatorial[integration, :] = np.sin(np.pi/2) * J_phi * 0.75
# Selecting NSample samples from all samples
fpfs_equatorial_pos_samples = fpfs_equatorial[np.random.choice(np.arange(fpfs_equatorial.shape[0]), size=nsamples, replace=False), :]
# Computing brightness temperature for the median fpfs map along the equator
if np.isscalar(self.rprs):
rprs_arr = np.full(fpfs_equatorial_pos_samples.shape, float(self.rprs))
else:
rprs_arr = np.random.choice( self.rprs, size=nsamples, replace=False )
# prepare inputs for multiprocessing
inputs = [(fpfs_equatorial_pos_samples[i,:], rprs_arr[i]) for i in range( fpfs_equatorial_pos_samples.shape[0] )]
# use Pool with bound method; user's environment set_start_method('fork') so this is fine
with multiprocessing.Pool(self.nthreads) as p:
result_list = p.starmap(self._compute_temp_along_longitude, tqdm(inputs, total=nsamples), chunksize=self.nthreads)
temp_map_equatorial = np.array(result_list)
np.save(self.pout + '/Equatorial_Temperature_map.npy', temp_map_equatorial)
if plot:
fig, axs = plt.subplots()
axs.plot(np.rad2deg(self.phi_ang), np.nanmedian(temp_map_equatorial, axis=0), color='navy', lw=2., zorder=100)
for i in range(100):
axs.plot(np.rad2deg(self.phi_ang), temp_map_equatorial[np.random.randint(0,temp_map_equatorial.shape[0]), :], alpha=0.5, color='orangered', lw=1., zorder=10)
axs.set_xlim([-180., 180.])
axs.set_xlabel('Longitude [deg]')
axs.set_ylabel('Temperature [K]')
return temp_map_equatorial, fig, axs
else:
return temp_map_equatorial
def _phase_curve_forward_model(self, I_phi_theta, phi, theta, xi):
"""Compute phase-curve signal from a 2D flux distribution.
Helper method that integrates a 2D flux distribution I(φ, θ) over the
visible hemisphere as a function of orbital phase to predict the
observed phase curve. Uses integration over visible longitude and latitude.
Parameters
----------
I_phi_theta : ndarray
2D array of intensity map I(φ, θ) with shape (N_phi, N_theta).
phi : ndarray
1D array of longitude values (in radians) from -π to π.
theta : ndarray
1D array of latitude values (in radians) from -π/2 to π/2.
xi : ndarray
1D array of orbital phase angles (in radians). ξ = 0 at occultation,
ξ = ±π at transits.
Returns
-------
f_xi : ndarray
Phase curve signal (F_p/F_* equivalent) for each orbital phase.
Shape: (len(xi),).
Notes
-----
- Integrates only over the visible hemisphere for each phase angle.
- Uses meridian longitude bounds that change with orbital phase:
only pixels with -ξ - π/2 < φ < -ξ + π/2 are integrated.
- Uses 2D trapezoidal integration from ``scipy.integrate.trapz2d``.
- Used internally by :meth:`forward_phase_curve_model`.
Examples
--------
>>> xi = np.linspace(-np.pi, np.pi, 100)
>>> fpcs = invert._phase_curve_forward_model(I_2d, phi_ang, theta_ang, xi)
"""
# Given the 2D flux distribution map, I(phi, theta), we will compute phase curve signal
# as a function of xi (which is 0 at occultation and +/- pi at transits)
# 2D angles
theta2d, phi2d = np.meshgrid(theta, phi)
f_xi = np.zeros(len(xi))
for i in range(len(xi)):
# lower and upper limits on phi integration
l1_phi, l2_phi = -xi[i] - np.pi/2, -xi[i] + np.pi/2
## Both l1_phi and l2_phi must be within (-np.pi, np.pi)
if l1_phi < -np.pi:
l1_phi = l1_phi + (2 * np.pi)
if l2_phi > np.pi:
l2_phi = l2_phi - (2 * np.pi)
idx_phi = np.zeros(len(phi), dtype=bool)
if l2_phi > l1_phi:
idx_phi[ (phi>l1_phi) & (phi<l2_phi) ] = True
else:
idx_phi[ (phi>l1_phi) | (phi<l2_phi) ] = True
# And the integration
f_xi[i] = trapz2d( I_phi_theta[idx_phi,:,None] * np.cos(phi2d[idx_phi,:,None] + xi[i]) * np.sin(theta2d[idx_phi,:,None]) * np.sin(theta2d[idx_phi,:,None]), phi[idx_phi], theta)[0]
return f_xi
[docs]
def forward_phase_curve_model(self, plot=False):
"""Compute forward-model phase curve from 2D flux distribution.
Converts the median 2D flux map into a forward-modeled phase curve for
comparison with the fitted phase-curve parameterization. Provides a
self-consistency check between the temperature-map representation and
the fitted phase-curve model.
Parameters
----------
plot : bool, optional
If ``True``, generate a plot overlaying the forward model on the
fitted parameterized phase curve. Default is ``False``.
Returns
-------
FpFs_forward_model : ndarray
Forward-model phase curve values (F_p/F_*) sampled at 1000 orbital
phase angles from -π to π. Also returned when ``plot=True``.
fig : matplotlib.figure.Figure or None
Figure object if ``plot=True``, otherwise ``None``.
axs : matplotlib.axes.Axes or None
Axes object if ``plot=True``, otherwise ``None``.
Notes
-----
- Uses the median of the flux map distribution across all posterior samples.
- Fitted phase curve is computed from posterior median values of C1, D1, C2, D2.
- Plots both curves for visual comparison of model consistency.
Examples
--------
>>> FpFs_fwd = invert.forward_phase_curve_model(plot=True)
>>> print(f"Max forward model flux: {np.max(FpFs_fwd):.4f}")
"""
# Computing the Fp/F* (i.e., I(phi, theta) ) map
I_map = np.zeros( (len(self.E), self.nphi, self.ntheta) )
for integration in range( len(self.E) ):
fpfs_ph = self.A0[integration] + (self.A1[integration] * np.cos(self.phi_ang)) + (self.B1[integration] * np.sin(self.phi_ang))+\
(self.A2[integration] * np.cos(2*self.phi_ang)) + (self.B2[integration] * np.sin(2*self.phi_ang))
for th in range(self.ntheta):
I_map[integration, :, th] = np.sin(self.theta_ang[th] + np.pi/2) * fpfs_ph * 0.75
# Fitted phase curve model
xi = 2 * np.pi * (np.linspace(0., 1., 1000) - 0.5)
FpFs_fitted = np.nanmedian(self.E) + ( np.nanmedian(self.C1) * (np.cos( xi ) - 1.) ) + ( np.nanmedian(self.D1) * np.sin( xi ) ) +\
( np.nanmedian(self.C2) * (np.cos( 2*xi) - 1.) ) + ( np.nanmedian(self.D2) * np.sin( 2*xi ) )
FpFs_forward_model = self._phase_curve_forward_model(I_phi_theta=np.nanmedian(I_map, axis=0), phi=self.phi_ang, theta=self.theta_ang + np.pi/2, xi=xi)
if plot:
fig, axs = plt.subplots()
axs.plot(xi, FpFs_forward_model, color='navy', lw=1.5, label='Forward model', zorder=100)
axs.plot(xi, FpFs_fitted, color='orangered', lw=1.5, label='Fitted model', zorder=100)
axs.set_xlabel(r'$\xi$ [rad]')
axs.set_ylabel(r'$F_p/F_*$')
axs.legend()
return FpFs_forward_model, fig, axs
else:
return FpFs_forward_model
def _albedo_eps_tdaynight(self, temp_map, a_by_Rst):
"""Compute Bond albedo and heat redistribution from a 2D temperature map.
Helper method that computes Bond albedo, dayside/nightside brightness
temperatures, and heat redistribution efficiency (using two different methods)
from a single 2D temperature map.
Parameters
----------
temp_map : ndarray
2D array of brightness temperatures with shape (N_phi, N_theta).
a_by_Rst : float
Semi-major axis in units of stellar radius (a/R_*).
Returns
-------
A_Bond : float
Bond albedo computed from total planetary flux vs stellar flux.
eps_kelp : float
Heat redistribution efficiency using the Kelp method (nightside/dayside
flux ratio).
Tday : float
Dayside brightness temperature (integrated over dayside).
Tnight : float
Nightside brightness temperature (integrated over nightside).
eps_keating : float
Heat redistribution efficiency using Keating et al. (2019) method
(nightside flux / total flux).
Notes
-----
- Uses Stefan-Boltzmann law to relate temperature to thermal radiation.
- Dayside defined as |φ| < π/2; nightside defined as |φ| > π/2.
- Uses 2D trapezoidal integration.
- Original kelp-based code adapted for this implementation.
References
----------
- Keating, D., Cowan, N. B., & Dang, L. 2019. Uniformly hot nightside
temperatures on short-period gas giants. Nature Astronomy, 3, 1092.
- Original kelp package: https://github.com/bmorris3/kelp/blob/219a922849634d9e982cd7bd05910596dea2ef6e/kelp/core.py#L431
"""
# This function takes temperature map and theta (latitude) and phi (longitude)
# angles to compute the Bond albedo and heat re-distribution efficiency.
# Much of the code is copied from kelp package: https://github.com/bmorris3/kelp/blob/219a922849634d9e982cd7bd05910596dea2ef6e/kelp/core.py#L431
# First creating meshgrid of theta and phi
theta2d, phi2d = np.meshgrid(self.theta_ang + np.pi/2, self.phi_ang)
# ---------------------------
# To compute Bond albedo
# ---------------------------
# Now computing the planetary flux from temperature map
# Kelp added an addition condition here: to integrate only over positive values of phi
# I think that is because kelp has phi run from -2*pi to 2*pi, i.e., they have "two dayside" and "two nightside"
# To compensate for this, they only integrate over positive values of phi
# However, I chose phi from -pi to pi -- so, we don't need this extra condition
fl_pl_total = trapz2d( temp_map[...,None]**4 * np.sin(theta2d[...,None]), self.phi_ang, self.theta_ang + np.pi/2 )[0]
# Computing the flux from star
fl_star = np.pi * (self.teff_star.value ** 4)
# Computing the Bond albedo
A_Bond = 1 - ( (a_by_Rst**2) * fl_pl_total / fl_star)
# -----------------------------------
# To compute epsilon (kelp method)
# -----------------------------------
#
# kelp computes epsilon by taking ratio of nightside flux to the dayside flux
mask_day = np.abs(phi2d) < np.pi/2
mask_night = np.abs(phi2d) > np.pi/2
fl_day = np.sum( (temp_map * mask_day)**4 ) / np.sum(mask_day)
fl_night = np.sum( (temp_map * mask_night)**4 ) / np.sum(mask_night)
eps_kelp = fl_night / fl_day
# -----------------------------------------
# To compute T_day, T_night and epsilon
# (Keating, Cowan & Dang 2019)
# -----------------------------------------
# This method is slightly different -- they compute heat redistribution
# efficiency as ratio of heat irradiated by nightside vs heat irradiated
# by the whole planet
#
# Further, they compute dayside and nightside temperature by integrating temperatures
# on the dayside and nightside, respectively.
# Indices for the dayside and the nightside
idx_day = np.abs(self.phi_ang) < np.pi/2
idx_night = np.abs(self.phi_ang) > np.pi/2
#Tday = ( (0.5 / np.pi) * trapz2d( temp_map[...,None]**4 * np.sin(theta2d[...,None] ) * (np.abs(phi2d[...,None]) < np.pi/2 ), phi, theta )[0] ) ** 0.25
#Tnight = ( (0.5 / np.pi) * trapz2d( temp_map[...,None]**4 * np.sin(theta2d[...,None] ) * (np.abs(phi2d[...,None]) > np.pi/2 ), phi, theta )[0] ) ** 0.25
Tday = ( (0.5 / np.pi) * trapz2d( temp_map[idx_day, :, None]**4 * np.sin(theta2d[idx_day, :, None] ), self.phi_ang[idx_day], self.theta_ang + np.pi/2 )[0] ) ** 0.25
Tnight = ( (0.5 / np.pi) * trapz2d( temp_map[idx_night, :, None]**4 * np.sin(theta2d[idx_night, :, None] ), self.phi_ang[idx_night], self.theta_ang + np.pi/2 )[0] ) ** 0.25
#fl_pl_night = trapz2d( temp_map[...,None]**4 * np.sin(theta2d[...,None] ) * (np.abs(phi2d[...,None]) > np.pi/2 ), phi, theta )[0]
fl_pl_night = trapz2d( temp_map[idx_night, :, None]**4 * np.sin(theta2d[idx_night, :, None] ), self.phi_ang[idx_night], self.theta_ang + np.pi/2 )[0]
eps_keating = fl_pl_night / fl_pl_total
return A_Bond, eps_kelp, Tday, Tnight, eps_keating
def _albedo_eps_fpfs(self, fpfs_map, a_by_Rst):
"""Compute Bond albedo and heat redistribution from a 2D flux map.
Helper method that computes Bond albedo and heat redistribution efficiency
(using two different methods) directly from a 2D F_p/F_* map, without
requiring intermediate temperature conversion.
Parameters
----------
fpfs_map : ndarray
2D array of F_p/F_* values with shape (N_phi, N_theta).
a_by_Rst : float
Semi-major axis in units of stellar radius (a/R_*).
Returns
-------
A_Bond_fpfs : float
Bond albedo computed as 1 - (a/R*)^2 * integrated(F_p/F_*) / π.
eps_kelp : float
Heat redistribution efficiency (Kelp method): nightside/dayside
flux ratio.
eps_keating : float
Heat redistribution efficiency (Keating et al. 2019 method):
nightside flux / total flux.
Notes
-----
- Similar approach to :meth:`_albedo_eps_tdaynight` but operates on
flux map rather than temperature map.
- Dayside defined as |φ| < π/2; nightside defined as |φ| > π/2.
- Uses 2D trapezoidal integration.
- Original kelp-based code adapted for this implementation.
References
----------
- Keating, D., Cowan, N. B., & Dang, L. 2019. Uniformly hot nightside
temperatures on short-period gas giants. Nature Astronomy, 3, 1092.
- Original kelp package: https://github.com/bmorris3/kelp/blob/219a922849634d9e982cd7bd05910596dea2ef6e/kelp/core.py#L431
"""
# This function takes occultation depth map and theta (latitude) and phi (longitude)
# angles to compute the Bond albedo, heat re-distribution efficiency, and average dayside and nightside temperaturse.
# Much of the code is copied from kelp package: https://github.com/bmorris3/kelp/blob/219a922849634d9e982cd7bd05910596dea2ef6e/kelp/core.py#L431
# First creating meshgrid of theta and phi
theta2d, phi2d = np.meshgrid(self.theta_ang + np.pi/2, self.phi_ang)
# -----------------------------------
# To compute Bond albedo: method 1
# -----------------------------------
# We can directly compute the ratio of planetary to stellar flux since we already have Fp/F*
# We can simply integrate over Fp/F*
fl_fs_total = trapz2d( fpfs_map[...,None] * np.sin(theta2d[...,None]), self.phi_ang, self.theta_ang + np.pi/2 )[0]
# Computing the Bond albedo
A_Bond_fpfs = 1 - ( (a_by_Rst**2) * fl_fs_total / np.pi )
# -----------------------------------
# To compute epsilon (kelp method)
# -----------------------------------
#
# kelp computes epsilon by taking ratio of nightside flux to the dayside flux
mask_day = np.abs(phi2d) < np.pi/2
mask_night = np.abs(phi2d) > np.pi/2
fl_day = np.sum( (fpfs_map * mask_day) ) / np.sum(mask_day)
fl_night = np.sum( (fpfs_map * mask_night) ) / np.sum(mask_night)
eps_kelp = fl_night / fl_day
# -----------------------------------------
# epsilon (Keating, Cowan & Dang 2019)
# -----------------------------------------
# This method is slightly different -- they compute heat redistribution
# efficiency as ratio of heat irradiated by nightside vs heat irradiated
# by the whole planet
idx_night = np.abs(self.phi_ang) > np.pi/2
#fl_pl_night = trapz2d( fpfs_map[...,None] * np.sin(theta2d[...,None] ) * (np.abs(phi2d[...,None]) > np.pi/2 ), phi, theta )[0]
fl_pl_night = trapz2d( fpfs_map[idx_night, :, None] * np.sin(theta2d[idx_night, :, None] ), self.phi_ang[idx_night], self.theta_ang + np.pi/2 )[0]
eps_keating = fl_pl_night / fl_fs_total
return A_Bond_fpfs, eps_kelp, eps_keating
def _albedo_kempton23(self, fpfs_phs, rst, a_by_Rst, teq, rprs):
"""Compute Bond albedo using Kempton et al. (2023) method.
Helper method that calculates Bond albedo from eclipse depth using the
energy-balance method of Kempton et al. (2023), accounting for the
fraction of planetary flux emitted in the bandpass and an anisotropy
factor.
Parameters
----------
fpfs_phs : float
Average measured F_p/F_* in the bandpass (from eclipse or phase curve).
rst : float
Stellar radius (in solar radii or consistent units).
a_by_Rst : float
Semi-major axis in units of stellar radius.
teq : float
Equilibrium temperature of the planet (in Kelvin).
rprs : float
Planet-to-star radius ratio.
Returns
-------
A_Bond_fpfs : float
Bond albedo computed from energy balance accounting for bandpass-dependent
flux emission and anisotropy.
Notes
-----
- Assumes the measured F_p/F_* represents the hemisphere-averaged thermal emission.
- Uses an anisotropy factor of 1.08 (typical for tidally-locked planets).
- The method accounts for the fact that only a fraction of the planet's
total blackbody radiation falls within the instrument bandpass.
- Original code adapted from Kempton et al. (2023) implementation.
References
----------
- Kempton, E. M. R., et al. 2023. A reflective, metal-rich atmosphere for
GJ 1214b from its JWST phase curve, Nature, 620(7972), 67
"""
# This function takes occultation depth to compute the Bond albedo.
# Much of the code is copied from: https://zenodo.org/records/7703086#.ZAZk1dLMJhE
# -----------------------------------
# To compute Bond albedo: method 1
# -----------------------------------
# Where the transmission function is non-zero
idx1 = self.trans_fun > 0.
# Power received
fl_star = planck_func(lam=self.wav_star, temp=self.teff_star)
stellar_flux = np.trapz( y=fl_star, x=self.wav_star )
power_received = np.pi * ( (rprs * rst)**2 ) * np.pi * ( 1 / a_by_Rst**2 ) * stellar_flux
# Power radiated
Fstar = np.trapz(y=fl_star[idx1], x=self.wav_star[idx1]) * np.pi * (rst ** 2)
fp = np.mean( fpfs_phs * Fstar )
power_radiated = 4 * np.pi * fp
# Fraction of planetary flux emitted in TESS bandpass
fl_pl = planck_func(lam=self.wav_star, temp=teq)
fraction = np.trapz(y=fl_pl[idx1], x=self.wav_star[idx1]) / np.trapz(y=fl_pl, x=self.wav_star)
anisotropy = 1.08
# Computing the Bond albedo
A_Bond_fpfs = 1 - ( power_radiated / power_received / fraction / anisotropy )
return A_Bond_fpfs
[docs]
def albedo_eps_from_temp_map(self, a_by_Rst, uniform_nightside_temp=False, nsamples=2000):
"""Compute Bond albedo and heat redistribution from temperature map distribution.
Calculates Bond albedo (A_Bond), heat redistribution efficiency (ε) using
two methods (Kelp and Keating et al. 2019), and dayside/nightside brightness
temperatures from the distribution of 2D temperature maps.
Parameters
----------
a_by_Rst : float or ndarray
Semi-major axis in units of stellar radius (a/R_*). If scalar,
same value used for all samples; if array, sampled for each.
uniform_nightside_temp : bool, optional
If ``True``, replace zero nightside temperatures (from zero grid
points) with the median of measured nightside temperatures for
self-consistency. Default is ``False``.
nsamples : int, optional
Number of posterior samples to process when computing temperature maps.
Default is ``2000``.
Returns
-------
A_Bond_all : ndarray
Bond albedo for each posterior sample. Shape: (N_samples,).
eps_kelp_all : ndarray
Heat redistribution efficiency (Kelp method) for each sample.
Shape: (N_samples,). Computed as nightside/dayside flux ratio.
Tday_all : ndarray
Dayside brightness temperature for each sample. Shape: (N_samples,).
Tnight_all : ndarray
Nightside brightness temperature for each sample. Shape: (N_samples,).
eps_keating_all : ndarray
Heat redistribution efficiency (Keating et al. 2019 method).
Shape: (N_samples,). Computed as nightside flux / total flux.
Notes
-----
- Calls :meth:`temperature_map_distribution` to compute temperature maps if
not already cached.
- Prints posterior median and 68% credible intervals for all output parameters.
- Uses :meth:`_albedo_eps_tdaynight` for computation per sample.=
References
----------
- Keating, D., Cowan, N. B., & Dang, L. 2019. Uniformly hot nightside
temperatures on short-period gas giants. Nature Astronomy, 3, 1092.
Examples
--------
>>> a_by_rst = 10.0
>>> A_B, eps_k, T_d, T_n, eps_keat = invert.albedo_eps_from_temp_map(a_by_rst)
"""
# This function calculates the Bond albedo and heat re-distribution efficiency from the temperature map distribution.
# The heat re-distribution efficiency is calculated using both the kelp method and the Keating, Cowan & Dang (2019) method.
# First, we need to calculate the temperature map itself
self.temperature_map_distribution(nsamples=nsamples)
# Replacing the zero night side temperature with uniform nightside temperature
if uniform_nightside_temp:
## Angles
theta2d, phi2d = np.meshgrid(self.theta_ang, self.phi_ang)
## Mask for nightside
idx_night = np.abs(phi2d) > np.pi/2
## Replacing all zeros on night-side with
## Mean of some temperatures
self.avg_night_temp_all = np.zeros( self.temp_map.shape[0] )
for inte in tqdm(range(self.temp_map.shape[0])):
### Computing the average nightside temperatue
temp_night = self.temp_map[inte,:,:]*idx_night
temp_night[temp_night == 0.] = np.nan
avg_night_temp = np.nanmedian(temp_night.flatten())
self.avg_night_temp_all[inte] = avg_night_temp
### Replacing zeros with average nightside temperature
self.temp_map[inte, :, :][ self.temp_map[inte, :, :] == 0. ] = avg_night_temp
if np.isscalar(a_by_Rst):
a_by_Rst_val = a_by_Rst * np.ones(self.temp_map.shape[0])
else:
a_by_Rst_val = np.random.choice(a_by_Rst, size=self.temp_map.shape[0], replace=False)
# Now, we will compute the Bond albedo and heat re-distribution efficiency for each temperature map in the distribution
A_Bond_all = np.zeros( self.temp_map.shape[0] )
eps_kelp_all = np.zeros( self.temp_map.shape[0] )
Tday_all = np.zeros( self.temp_map.shape[0] )
Tnight_all = np.zeros( self.temp_map.shape[0] )
eps_keating_all = np.zeros( self.temp_map.shape[0] )
for i in tqdm(range(self.temp_map.shape[0])):
A_Bond_all[i], eps_kelp_all[i], Tday_all[i], Tnight_all[i], eps_keating_all[i] = \
self._albedo_eps_tdaynight(temp_map=self.temp_map[i,:,:], a_by_Rst=a_by_Rst_val[i] )
# Let's quickly print the median and 68 percentile confidence intervals for each of these parameters
print('>>> --- Bond albedo: {:.3f} (+{:.3f}/-{:.3f})'.format( np.median( A_Bond_all ), np.percentile( A_Bond_all, 84 ) - np.median( A_Bond_all ), np.median( A_Bond_all ) - np.percentile( A_Bond_all, 16 ) ) )
print('>>> --- Heat re-distribution efficiency (Kelp method): {:.3f} (+{:.3f}/-{:.3f})'.format( np.median( eps_kelp_all ), np.percentile( eps_kelp_all, 84 ) - np.median( eps_kelp_all ), np.median( eps_kelp_all ) - np.percentile( eps_kelp_all, 16 ) ) )
print('>>> --- Dayside temperature: {:.3f} K (+{:.3f}/-{:.3f})'.format( np.median( Tday_all ), np.percentile( Tday_all, 84 ) - np.median( Tday_all ), np.median( Tday_all ) - np.percentile( Tday_all, 16 ) ) )
print('>>> --- Nightside temperature: {:.3f} K (+{:.3f}/-{:.3f})'.format( np.median( Tnight_all ), np.percentile( Tnight_all, 84 ) - np.median( Tnight_all ), np.median( Tnight_all ) - np.percentile( Tnight_all, 16 ) ) )
print('>>> --- Heat re-distribution efficiency (Keating, Cowan & Dang 2019 method): {:.3f} (+{:.3f}/-{:.3f})'.format( np.median( eps_keating_all ), np.percentile( eps_keating_all, 84 ) - np.median( eps_keating_all ), np.median( eps_keating_all ) - np.percentile( eps_keating_all, 16 ) ) )
return A_Bond_all, eps_kelp_all, Tday_all, Tnight_all, eps_keating_all
[docs]
def albedo_eps_from_fpfs_map(self, a_by_Rst):
"""Compute Bond albedo and heat redistribution from flux map distribution.
Calculates Bond albedo and heat redistribution efficiency directly from
the 2D distribution of F_p/F_* maps (without intermediate temperature
conversion). Provides an alternative approach to :meth:`albedo_eps_from_temp_map`
that does not require brightness temperature estimation.
Parameters
----------
a_by_Rst : float or ndarray
Semi-major axis in units of stellar radius (a/R_*). If scalar,
same value used for all samples; if array, sampled for each.
Returns
-------
A_Bond_all : ndarray
Bond albedo for each posterior sample. Shape: (N_samples,).
eps_kelp_all : ndarray
Heat redistribution efficiency (Kelp method). Shape: (N_samples,).
eps_keating_all : ndarray
Heat redistribution efficiency (Keating et al. 2019 method).
Shape: (N_samples,).
Notes
-----
- Does not require calls to temperature-mapping routines.
- Computes F_p/F_* maps directly from Fourier coefficients.
- Prints posterior median and 68% credible intervals for all outputs.
- Uses :meth:`_albedo_eps_fpfs` for computation per sample.
Examples
--------
>>> a_by_rst = 10.0
>>> A_B, eps_k, eps_keat = invert.albedo_eps_from_fpfs_map(a_by_rst)
"""
# This function calculates the Bond albedo and heat re-distribution efficiency from the fp/f* map distribution.
# The heat re-distribution efficiency is calculated using both the kelp method and the Keating, Cowan & Dang (2019) method.
# Now, let's calculate the fp/f* map distribution
fpfs_map = np.zeros( (len(self.E), self.nphi, self.ntheta) )
for integration in range( len(self.E) ):
J_phi = self.A0[integration] + ( self.A1[integration] * np.cos(self.phi_ang) ) + ( self.B1[integration] * np.sin(self.phi_ang) )+\
( self.A2[integration] * np.cos(2*self.phi_ang) ) + ( self.B2[integration] * np.sin(2*self.phi_ang) )
for th in range(self.ntheta):
fpfs_map[integration, :, th] = np.sin(self.theta_ang[th] + np.pi/2) * J_phi * 0.75
if np.isscalar(a_by_Rst):
a_by_Rst_val = a_by_Rst * np.ones(fpfs_map.shape[0])
else:
a_by_Rst_val = np.random.choice(a_by_Rst, size=fpfs_map.shape[0], replace=False)
# Now, we will compute the Bond albedo and heat re-distribution efficiency for each fp/f* map in the distribution
A_Bond_all = np.zeros( fpfs_map.shape[0] )
eps_kelp_all = np.zeros( fpfs_map.shape[0] )
eps_keating_all = np.zeros( fpfs_map.shape[0] )
for i in tqdm(range(fpfs_map.shape[0])):
A_Bond_all[i], eps_kelp_all[i], eps_keating_all[i] = \
self._albedo_eps_fpfs( fpfs_map=fpfs_map[i,:,:], a_by_Rst=a_by_Rst_val[i] )
# Let's quickly print the median and 68 percentile confidence intervals for each of these parameters
print('>>> --- Bond albedo: {:.3f} (+{:.3f}/-{:.3f})'.format( np.median( A_Bond_all ), np.percentile( A_Bond_all, 84 ) - np.median( A_Bond_all ), np.median( A_Bond_all ) - np.percentile( A_Bond_all, 16 ) ) )
print('>>> --- Heat re-distribution efficiency (Kelp method): {:.3f} (+{:.3f}/-{:.3f})'.format( np.median( eps_kelp_all ), np.percentile( eps_kelp_all, 84 ) - np.median( eps_kelp_all ), np.median( eps_kelp_all ) - np.percentile( eps_kelp_all, 16 ) ) )
print('>>> --- Heat re-distribution efficiency (Keating, Cowan & Dang 2019 method): {:.3f} (+{:.3f}/-{:.3f})'.format( np.median( eps_keating_all ), np.percentile( eps_keating_all, 84 ) - np.median( eps_keating_all ), np.median( eps_keating_all ) - np.percentile( eps_keating_all, 16 ) ) )
return A_Bond_all, eps_kelp_all, eps_keating_all
[docs]
def albedo_from_kempton23(self, a_by_Rst, rst, teq):
"""Compute Bond albedo using Kempton et al. (2023) method across posteriors.
Calculates Bond albedo for each posterior sample using the energy-balance
method of Kempton et al. (2023). This approach directly relates observed
eclipse depths to Bond albedo accounting for bandpass-dependent radiation.
Parameters
----------
a_by_Rst : float or ndarray
Semi-major axis in units of stellar radius (a/R_*). If scalar,
same value used for all samples; if array, sampled for each.
rst : float or ndarray (must be astropy.units.Quantity)
Stellar radius in units of solar radius (R_*). If scalar,
same value used for all samples; if array, sampled for each.
teq : float or ndarray (must be astropy.units.Quantity)
Equilibrium temperature of the planet in Kelvin. If scalar,
same value used for all samples; if array, sampled for each.
Returns
-------
A_Bond_all : ndarray
Bond albedo for each posterior sample. Shape: (N_samples,).
Printed to console as median ± 68% credible interval.
Notes
-----
- Uses :meth:`_albedo_kempton23` for computation per sample.
- Requires ``rst`` (stellar radius) to be provided as an argument.
- Prints posterior median and 68% credible intervals.
References
----------
- Kempton, E. M. R., et al. 2023. A reflective, metal-rich atmosphere for
GJ 1214b from its JWST phase curve, Nature, 620(7972), 67
Examples
--------
>>> a_by_rst = 10.0
>>> rst = 1.0 * u.R_sun # Solar radius
>>> teq = 1500 * u.K # Kelvin
>>> A_Bond = invert.albedo_from_kempton23(a_by_rst, rst, teq)
>>> print(f"Bond albedo: {np.median(A_Bond):.3f}")
"""
# This function calculates the Bond albedo using the method from Kempton et al. (2023)
# First, we need to calculate the occultation depth map itself
fpfs_map = np.zeros( (len(self.E), self.nphi) )
for integration in range( len(self.E) ):
J_phi = self.A0[integration] + (self.A1[integration] * np.cos(self.phi_ang)) + (self.B1[integration] * np.sin(self.phi_ang))+\
(self.A2[integration] * np.cos(2*self.phi_ang)) + (self.B2[integration] * np.sin(2*self.phi_ang))
fpfs_map[integration, :] = np.sin(np.pi/2) * J_phi * 0.75
A_Bond_all = np.zeros( fpfs_map.shape[0] )
if np.isscalar(a_by_Rst):
a_by_Rst_val = a_by_Rst * np.ones(fpfs_map.shape[0])
else:
a_by_Rst_val = np.random.choice(a_by_Rst, size=fpfs_map.shape[0], replace=False)
if np.isscalar(teq) or np.isscalar(teq.value):
teq_val = teq * np.ones(fpfs_map.shape[0])
else:
teq_val = np.random.choice(teq, size=fpfs_map.shape[0], replace=False) * u.K
if np.isscalar(self.rprs):
rprs_arr = np.full(fpfs_map.shape, float(self.rprs))
else:
rprs_arr = np.random.choice(self.rprs, size=fpfs_map.shape[0], replace=False)
if np.isscalar(rst) or np.isscalar(rst.value):
rst = rst * np.ones(fpfs_map.shape[0])
else:
rst = np.random.choice(rst, size=fpfs_map.shape[0], replace=False) * u.R_sun
for i in tqdm(range(fpfs_map.shape[0])):
A_Bond_all[i] = self._albedo_kempton23(fpfs_phs=fpfs_map[i,:], rst=rst[i], a_by_Rst=a_by_Rst_val[i], teq=teq_val[i], rprs=rprs_arr[i])
print('>>> --- Bond albedo: {:.3f} (+{:.3f}/-{:.3f})'.format( np.median( A_Bond_all ), np.percentile( A_Bond_all, 84 ) - np.median( A_Bond_all ), np.median( A_Bond_all ) - np.percentile( A_Bond_all, 16 ) ) )
return A_Bond_all
# ---------------------------------------------------------------------------
# Module-level worker for SelectLinDetrend.select_optimal_parameters
# ---------------------------------------------------------------------------
# It lives at module scope so that concurrent.futures.ProcessPoolExecutor can
# pickle it without ever needing to serialise the SelectLinDetrend instance
# (which may hold un-picklable attributes such as a closure for 'priors').
def _lindetrend_worker(worker_args):
(instruments, tim_s, fl_s, fle_s, gp_pars_s,
lin_pars, cached_priors, out_dir,
juliet_fit_kwargs, noise_method) = worker_args
noise_mapping = {'astropy': mad_std, 'std': np.std, 'pipe': pipe_mad}
noise_func = noise_mapping[noise_method]
# Build full prior lists from cached base priors + linear regressor priors
par_tot, dist_tot, hyper_tot = [], [], []
for ins in instruments:
par_ins, dist_ins, hyper_ins = cached_priors[ins]
par_tot += list(par_ins) + ['mdilution_' + ins, 'mflux_' + ins, 'sigma_w_' + ins]
dist_tot += list(dist_ins) + ['fixed', 'normal', 'loguniform']
hyper_tot += list(hyper_ins) + [1.0, [0., 0.1], [0.1, 1e4]]
if lin_pars is not None and lin_pars.get(ins) is not None:
for pc in range(lin_pars[ins].shape[1]):
par_tot.append('theta' + str(pc) + '_' + ins)
dist_tot.append('uniform')
hyper_tot.append([-1, 1])
priors = juliet.utils.generate_priors(par_tot, dist_tot, hyper_tot)
data = juliet.load(
priors=priors,
t_lc=tim_s, y_lc=fl_s, yerr_lc=fle_s,
linear_regressors_lc=lin_pars,
GP_regressors_lc=gp_pars_s,
out_folder=out_dir,
)
res = data.fit(sampler='dynamic_dynesty', **juliet_fit_kwargs)
mads_all = []
for ins in instruments:
model_lc = res.lc.evaluate(instrument=ins)
residuals = fl_s[ins] - model_lc
mads_all.append(noise_func(residuals) * 1e6)
return res.posteriors['lnZ'], mads_all
[docs]
class SelectLinDetrend(object):
"""Select optimal linear detrending vectors using model selection with Bayesian evidence.
This class provides tools to systematically identify the best combination of
linear detrending regressors (e.g., time polynomials, roll angle modulation,
background variations) for multi-instrument light-curve fitting. It employs
model selection based on log-evidence improvements, starting from a base model
with no linear regressors and iteratively adding vectors that maximize evidence
gain. A regressor is accepted only if it improves log-evidence by at least
2 units; the search terminates when no remaining regressor meets this threshold.
The class interfaces with the ``juliet`` package for Bayesian model fitting and supports
parallel processing of model comparisons through concurrent fitting. It automatically
handles multi-instrument datasets where different instruments may have different
numbers and types of regressors available.
Linear regressors can be passed directly or be generated from roll angle values via
Fourier series expansion. All data (time, flux, errors, regressors) are expected
as per-instrument dictionaries, enabling straightforward multi-instrument analysis.
Parameters
----------
time : dict
Time stamps for each instrument. Keys are instrument names; values are
1-D numpy arrays of time points (in BJD or similar). All arrays should
have the same length as corresponding ``flux`` and ``flux_err`` arrays.
flux : dict
Flux (or normalized flux) measurements for each instrument. Keys are
instrument names; values are 1-D numpy arrays of flux measurements.
flux_err : dict
Flux uncertainties for each instrument. Keys are instrument names;
values are 1-D numpy arrays of flux uncertainties (same length as flux).
priors : callable
A function mapping an instrument name (keyword argument ``instrument``)
to a tuple of three lists:
- param_names : list of str
Names of juliet-compatible model parameters (e.g., 't0_p1', 'p_p1', 'P_p1').
- distributions : list of str
juliet-compatible distribution names (e.g., 'normal', 'uniform', 'loguniform').
- hyperparams : list
Hyperparameter lists for each distribution. For example, a 'normal'
distribution takes [mean, std], and a 'uniform' takes [lower, upper].
The function is called once per instrument during initialization and
results are cached internally.
linear_regressors : dict
Linear regressors to consider for detrending. Keys are instrument names;
values are dicts where each key is a regressor name (str) and value is a
1-D numpy array (same length as time for that instrument). Examples include
'time', 'time^2', 'background', 'systematics', etc.
If ``roll_degree`` and ``roll`` are both provided, roll-angle Fourier
series regressors ('ROLL_SIN_1', 'ROLL_COS_1', etc.) are automatically
added to ``linear_regressors`` during initialization.
gp_regressors : dict or None, optional
Gaussian Process (GP) regressors to use for systematics fitting. Keys are
instrument names; values are 1-D numpy arrays of regressor values
(e.g., roll angle, time). If provided, these are sorted
and passed to juliet for GP handling. Default is ``None``.
roll_degree : int or None, optional
Maximum degree of roll-angle Fourier series to include as linear regressors.
If provided (and ``roll`` is also provided), Fourier series regressors
up to degree ``roll_degree`` are automatically generated and added to
``linear_regressors``. Must be paired with ``roll`` parameter.
Default is ``None``.
roll : dict or None, optional
Roll angle values (in degrees) for each instrument. Keys are instrument
names; values are 1-D numpy arrays of roll angles. Required only if
``roll_degree`` is specified. Default is ``None``.
noise_method : str, optional
Method to compute noise/scatter in residuals. Options are:
- 'astropy' : Uses median absolute deviation (astropy.stats.mad_std).
- 'std' : Uses standard deviation (numpy.std).
- 'pipe' : Uses the PIPE photometric uncertainty (pipe_mad function).
This method is applied to residuals after each model fit to estimate
per-instrument noise levels. Default is ``'astropy'``.
juliet_fit_kwargs : dict, optional
Additional keyword arguments to pass to ``juliet.fit()`` during model fits.
For example, ``{'sampler': 'dynamic_dynesty', 'nlive': 500}``.
Default is an empty dict ``{}``.
nthreads : int, optional
Total number of CPU threads available for fitting. Shared among parallel
workers if ``n_parallel > 1`` is used in :meth:`select_optimal_parameters`.
Default is the total number of available CPU cores.
pout : str, optional
Output directory for saving juliet fit results. Each model fit creates
a subdirectory named after the included regressors (e.g., 'linsel_base',
'linsel_time', 'linsel_time_background'). Default is the current working
directory.
Attributes
----------
time, flux, flux_err : dict
Stored input light-curve data (per-instrument).
instruments : list of str
Extracted list of instrument names from the keys of ``time`` dict.
priors : callable
Stored prior function.
linear_regressors : dict
Stored linear regressors (or regressors with added roll terms if
``roll_degree`` was provided).
GP_regressors : dict or None
Stored GP regressors.
roll_degree, roll : int or None, dict or None
Stored roll parameters.
noise_method : str
Stored noise method name.
noise_func : callable
Cached noise function (e.g., mad_std, np.std, rms, or pipe_mad).
juliet_fit_kwargs : dict
Stored juliet fitting kwargs.
nthreads : int
Stored thread count.
pout : str
Stored output directory.
_sort_idx, _tim_s, _fl_s, _fle_s : dict
Per-instrument sort indices and sorted data arrays (cached by :meth:`_precompute`).
_gp_pars_s, _gp_pars_s_juliet : dict or None
Cached GP regressor arrays (sorted, or None if not applicable).
_cached_priors : dict
Per-instrument prior tuples cached from the priors callable.
Main Methods
----------
select_optimal_parameters(n_parallel=1)
Execute forward selection to identify optimal linear detrending vectors
based on log-evidence maximization.
Notes
-----
- All fitting is performed using the ``juliet`` Bayesian fitting package.
- Log-evidence improvements are computed via ``juliet.fit()`` and returned
as ``posteriors['lnZ']``.
- The model selection algorithm is greedy: it selects the regressor with
the maximum evidence gain at each step, then moves to the next round.
- A regressor is accepted only if it improves log-evidence by >= delta_lnZ_threshold
(default 2.0). This threshold reflects a Bayes factor of ~exp(2) ≈ 7.4, corresponding to
"strong" evidence in Bayesian hypothesis testing.
- When ``n_parallel > 1`` in :meth:`select_optimal_parameters`, concurrent
fitting is performed using ``concurrent.futures.ProcessPoolExecutor``. To
keep total thread usage under control, the per-worker nthreads budget is
automatically adjusted as ``nthreads // n_parallel``.
- Roll-angle Fourier series regressors (if generated) follow the naming
convention 'ROLL_SIN_k' and 'ROLL_COS_k' for degree k = 1, 2, ..., roll_degree.
- Regressors that do not exist for a particular instrument (e.g., a regressor
available only for one camera in a multi-camera setup) are automatically
skipped for that instrument during model construction.
- All per-instrument data (time, flux, errors, regressors) are length-checked
during juliet loading but not explicitly in __init__. Ensure consistency
before instantiation.
Examples
--------
Basic forward selection with two instruments:
>>> import numpy as np
>>> time = {
... 'camera_1': np.linspace(2459000, 2459001, 200),
... 'camera_2': np.linspace(2459000, 2459001, 150),
... }
>>> flux = {
... 'camera_1': 1.0 + 0.01 * np.random.randn(200),
... 'camera_2': 1.0 + 0.005 * np.random.randn(150),
... }
>>> flux_err = {
... 'camera_1': np.full(200, 0.005),
... 'camera_2': np.full(150, 0.003),
... }
>>> linear_regressors = {
... 'camera_1': {
... 'time': time['camera_1'],
... 'time^2': time['camera_1']**2,
... },
... 'camera_2': {
... 'time': time['camera_2'],
... 'time^2': time['camera_2']**2,
... },
... }
>>> def priors_func(instrument):
... return (['t0_p1', 'p_p1', 'P_p1'],
... ['normal', 'normal', 'uniform'],
... [[2459000.5, 0.1], [3.5, 0.01], [0.01, 0.1]])
>>>
>>> selector = SelectLinDetrend(
... time=time,
... flux=flux,
... flux_err=flux_err,
... priors=priors_func,
... linear_regressors=linear_regressors,
... pout='/path/to/results'
... )
>>> selected_regs, lnZ_hist, scatter_hist = selector.select_optimal_parameters(n_parallel=2)
With roll-angle detrending:
>>> roll_angles = {
... 'camera_1': np.random.uniform(0, 360, 200),
... 'camera_2': np.random.uniform(0, 360, 150),
... }
>>> selector = SelectLinDetrend(
... time=time,
... flux=flux,
... flux_err=flux_err,
... priors=priors_func,
... linear_regressors=linear_regressors,
... roll_degree=3,
... roll=roll_angles,
... pout='/path/to/results'
... )
>>> # Now linear_regressors includes 'ROLL_SIN_1', 'ROLL_COS_1', ..., 'ROLL_SIN_3', 'ROLL_COS_3'
>>> selected_regs, lnZ_hist, scatter_hist = selector.select_optimal_parameters()
It is possible to use scatter in the residuals (instead of log-evidence) for model selection by setting selection_method='scatter' in select_optimal_parameters. In this case, the regressor that minimizes the scatter (as computed by noise_method) is selected at each step.
>>> # Using scatter-based selection instead of log-evidence
>>> selected_regs, lnZ_hist, scatter_hist = selector.select_optimal_parameters(selection_method='scatter')
"""
def __init__(self, time, flux, flux_err, priors, linear_regressors, gp_regressors=None, roll_degree=None, roll=None, noise_method='astropy', juliet_fit_kwargs={}, nthreads=multiprocessing.cpu_count(), pout=os.getcwd() ):
# Time, flux, and flux_err are the light curve data. All of them are dict with keys of instrument name.
self.time = time
self.flux = flux
self.flux_err = flux_err
# Extracting an instrument list
self.instruments = list( self.time.keys() )
# Priors; it is a function with only keyword is the name of the instrument.
# It should return a tuple of lists, where the first list is the list of parameter names, second is the list of distributions, and the third is the hyperparameters for the distributions.
self.priors = priors
# linear_regressors is a dict where the keys are the names of the instruments, and values are again dict where keys
# now are the name of the linear regressors (e.g., 'time', 'time^2', 'background', etc.) and values are numpy arrays of
# the same length as time, flux, and flux_err. These arrays represent the values of the linear regressors at each time point.
self.linear_regressors = linear_regressors
# GP_regressors is a dict where the keys are the names of the instruments
self.GP_regressors = gp_regressors
# The roll_degree parameter indicates the maximum degree of the roll angle Fourier series to include in the linear model.
self.roll_degree = roll_degree
# If we are going to include the roll angle as a linear regressor, then we need to have the roll angle values for each time point.
# It should be a dict where the keys are the names of the instruments and values are numpy arrays of the same length as
# time, flux, and flux_err. These arrays represent the roll angle values at each time point.
self.roll = roll
if self.roll_degree is not None and self.roll is not None:
# This means that we will be including the roll angle as a linear regressor, so we need to create the regressors for the roll angle modulation and include them in self.linear_regressors.
# Let's adjust the self.linear_regressors to include the roll angle regressors if GP_roll is False
self._roll_model_sine()
elif ( self.roll_degree is None) or (self.roll is None):
# This means that the user wants to include Roll angles as linear regressors but they forgot to provide either the roll_degree or the roll angle values. We will raise an error in this case.
if (self.roll_degree is not None) and (self.roll is None):
raise ValueError("roll_degree is provided but roll angle values are not provided. Please provide the roll angle values or set roll_degree to None.")
elif (self.roll_degree is None) and (self.roll is not None):
raise ValueError("roll angle values are provided but roll_degree is not provided. Please provide the roll_degree or set roll to None.")
else:
# Both roll_degree and roll are None, which means that we will be including the roll angle as a Gaussian process regressor, so we don't need to include it as a linear regressor.
# We assume that the use have alreadey included the roll angle as a GP regressor in self.GP_regressors, so we don't need to do anything here.
# Just a sanity check to make sure that the roll angle is included in the GP regressors.
if self.GP_regressors is None:
print("GP_regressors must be provided if roll angle modulation is desired to be fitted with a GP.")
# Noise method to use for calculating the scatter in residuals
noise_mapping = {'astropy': mad_std, 'std': np.std, 'pipe': pipe_mad, 'rms': rms}
if noise_method not in noise_mapping:
raise ValueError("Invalid noise method. Please choose from 'astropy', 'std', 'pipe', or 'rms'.")
self.noise_method = noise_method # stored as string for worker pickling
self.noise_func = noise_mapping[noise_method]
# Finally, we will save any extra juliet fit kwargs to be used in the juliet fitting later.
# This argument will be provided to juliet.fit() function when we fit the model to the data.
self.juliet_fit_kwargs = juliet_fit_kwargs
# Cores to use for juliet fitting
self.nthreads = nthreads
# Output directory to save the juliet fit results
self.pout = pout
# Pre-compute sort indices, sorted data arrays, and cached prior values
# so that _fit_model and parallel workers never redo this work.
self._precompute()
def _precompute(self):
# Cache per-instrument sort indices, pre-sorted data, GP regressors, and
# prior data so this work is done exactly once rather than once per fit.
self._sort_idx = {}
self._tim_s = {}
self._fl_s = {}
self._fle_s = {}
self._gp_pars_s = {}
for ins in self.instruments:
gp_reg = self.GP_regressors[ins] if self.GP_regressors is not None else None
if gp_reg is not None:
idx = np.argsort(gp_reg)
self._gp_pars_s[ins] = gp_reg[idx]
else:
idx = np.arange(len(self.time[ins]))
self._gp_pars_s[ins] = None
self._sort_idx[ins] = idx
self._tim_s[ins] = self.time[ins][idx]
self._fl_s[ins] = self.flux[ins][idx]
self._fle_s[ins] = self.flux_err[ins][idx]
# Collapse gp_pars: None if all instruments lack GP regressors,
# otherwise keep only the instruments that have them.
if all(v is None for v in self._gp_pars_s.values()):
self._gp_pars_s_juliet = None
else:
self._gp_pars_s_juliet = {k: v for k, v in self._gp_pars_s.items() if v is not None}
# Cache prior data per instrument (lists/tuples of plain data → picklable)
self._cached_priors = {}
for ins in self.instruments:
self._cached_priors[ins] = self.priors(instrument=ins)
def _roll_model_sine(self):
# This helper function creates the regressors for roll angle modulation for maximum degree of roll_degree.
# Eventually the regressors are saved in self.linear_regressors with keys 'ROLL_SIN_1', 'ROLL_COS_1', 'ROLL_SIN_2', 'ROLL_COS_2', ..., 'ROLL_SIN_roll_degree', 'ROLL_COS_roll_degree'.
for ins in range( len(self.instruments) ):
for i in range(self.roll_degree):
self.linear_regressors[self.instruments[ins]][f'ROLL_SIN_{i+1}'] = np.sin( (i+1) * np.radians( self.roll[self.instruments[ins]] ) )
self.linear_regressors[self.instruments[ins]][f'ROLL_COS_{i+1}'] = np.cos( (i+1) * np.radians( self.roll[self.instruments[ins]] ) )
def _build_lin_pars(self, lin_reg):
"""Apply cached sort indices to lin_reg and stack into juliet matrix form.
Parameters
----------
lin_reg : dict
{instrument: {regressor_name: array}} or {instrument: None}
Returns
-------
lin_pars : dict or None
{instrument: 2-D ndarray (n_points x n_regressors)} ready for juliet,
or None if every instrument has no regressors.
"""
lin_pars = {}
for ins in self.instruments:
if lin_reg[ins] is not None:
# Sort by GP regressor order; use sorted() for deterministic column order
arrs = [lin_reg[ins][k][self._sort_idx[ins]] for k in sorted(lin_reg[ins].keys())]
lin_pars[ins] = np.transpose(np.vstack(arrs))
else:
lin_pars[ins] = None
if all(v is None for v in lin_pars.values()):
return None
return lin_pars
def _fit_model(self, lin_reg, out_dir, nthreads=None):
"""Fit the total model using juliet.
Uses pre-sorted data and cached prior values from _precompute() so no
redundant sorting or prior evaluation occurs per call.
Parameters
----------
lin_reg : dict
{instrument: {regressor_name: array}} or {instrument: None}
out_dir : str
Output folder for juliet results.
nthreads : int, optional
Thread count passed to juliet. Defaults to self.nthreads.
Returns
-------
lnZ : float
mads_all : list of float
"""
if nthreads is None:
nthreads = self.nthreads
lin_pars = self._build_lin_pars(lin_reg)
# Build prior lists from cached per-instrument base priors
par_tot, dist_tot, hyper_tot = [], [], []
for ins in self.instruments:
par_ins, dist_ins, hyper_ins = self._cached_priors[ins]
par_tot += list(par_ins) + ['mdilution_' + ins, 'mflux_' + ins, 'sigma_w_' + ins]
dist_tot += list(dist_ins) + ['fixed', 'normal', 'loguniform']
hyper_tot += list(hyper_ins) + [1.0, [0., 0.1], [0.1, 1e4]]
if lin_pars is not None and lin_pars.get(ins) is not None:
for pc in range(lin_pars[ins].shape[1]):
par_tot.append('theta' + str(pc) + '_' + ins)
dist_tot.append('uniform')
hyper_tot.append([-1, 1])
priors = juliet.utils.generate_priors(par_tot, dist_tot, hyper_tot)
data = juliet.load(
priors=priors,
t_lc=self._tim_s, y_lc=self._fl_s, yerr_lc=self._fle_s,
linear_regressors_lc=lin_pars,
GP_regressors_lc=self._gp_pars_s_juliet,
out_folder=out_dir,
)
res = data.fit(sampler='dynamic_dynesty', nthreads=nthreads, **self.juliet_fit_kwargs)
mads_all = []
for ins in self.instruments:
model_lc = res.lc.evaluate(instrument=ins)
residuals = self._fl_s[ins] - model_lc
mads_all.append(self.noise_func(residuals) * 1e6)
return res.posteriors['lnZ'], mads_all
[docs]
def select_optimal_parameters(self, n_parallel=1, delta_lnZ_threshold=2.0,
selection_method='lnZ', scatter_threshold=5.0):
"""
Select optimal linear detrending vectors using forward selection based on
Bayesian evidence or light-curve scatter.
The algorithm starts with a base model that has no linear regressors and
iteratively adds the vector that yields the greatest improvement according
to the chosen selection metric. The search stops when no remaining vector
clears the acceptance threshold.
Unique regressor names are collected across all instruments. When a regressor
is selected it is included for every instrument that carries it; instruments
that do not have that regressor are left unchanged.
Parameters
----------
n_parallel : int, optional
Number of model fits to run concurrently within each selection round.
Default is 1 (sequential).
When n_parallel > 1 the fits in the inner loop are dispatched to a
ProcessPoolExecutor for true CPU parallelism (no GIL contention).
The juliet nthreads budget is automatically divided by n_parallel so
that the total thread count across all workers stays within self.nthreads.
The module-level _lindetrend_worker function is used as the callable so
that no part of this instance (including the potentially un-picklable
priors callable) needs to be serialised.
delta_lnZ_threshold : float, optional
Minimum improvement in log-evidence required to accept a new regressor
when selection_method='lnZ'. A candidate is added only if its lnZ
exceeds the current base model's lnZ by at least this value. Default
is 2.0, which corresponds to "strong evidence" on the Jeffreys scale.
Ignored when selection_method='scatter'.
selection_method : {'lnZ', 'scatter'}, optional
Metric used to rank and accept candidate regressors.
* ``'lnZ'`` (default) — select the regressor that maximises the
Bayesian log-evidence improvement. Works for any number of
instruments.
* ``'scatter'`` — select the regressor that minimises the MAD
scatter of the residuals for the single instrument. If more than
one instrument is provided this option is not supported and the
method automatically falls back to ``'lnZ'`` with a warning.
scatter_threshold : float, optional
Minimum reduction in scatter (in ppm) required to accept a new
regressor when selection_method='scatter'. A candidate is accepted
only if its residual scatter is at least this many ppm lower than
the current base model's scatter. Default is 5.0 ppm. Ignored when
selection_method='lnZ'.
Returns
-------
selected_regressors : dict
Dict with instrument names as keys. Each value is either a dict of
{regressor_name: array} for the selected regressors, or None if no
regressor was selected for that instrument.
lnZ_history : list of float
Log-evidence at each accepted step (index 0 is the base model).
scatter_history : list
Scatter (MAD) values at each accepted step (index 0 is the base model).
"""
# Validate selection_method; fall back to 'lnZ' for multi-instrument scatter
if selection_method not in ('lnZ', 'scatter'):
raise ValueError(
f"selection_method must be 'lnZ' or 'scatter', got {selection_method!r}"
)
if selection_method == 'scatter' and len(self.instruments) > 1:
import warnings
warnings.warn(
"selection_method='scatter' is not supported for multiple instruments; "
"falling back to selection_method='lnZ'.",
UserWarning, stacklevel=2,
)
selection_method = 'lnZ'
# Collect all unique regressor names across all instruments
all_regressor_names = set()
for ins in self.instruments:
all_regressor_names.update(self.linear_regressors[ins].keys())
remaining_names = list(all_regressor_names)
# --- Step 1: base model (no linear regressors) ---
lin_reg_none = {ins: None for ins in self.instruments}
base_out_dir = os.path.join(self.pout, 'linsel_base')
print("Fitting base model (no linear regressors)...")
base_lnZ, base_scatter = self._fit_model(lin_reg_none, base_out_dir)
if selection_method == 'scatter':
current_scatter_val = base_scatter[0]
print(f" Base model lnZ = {base_lnZ:.2f}, scatter = {current_scatter_val:.2f} ppm")
else:
print(f" Base model lnZ = {base_lnZ:.2f}")
selected_names = []
current_lnZ = base_lnZ
lnZ_history = [base_lnZ]
scatter_history = [base_scatter]
# Stores every round's results for the final per-round summary table.
all_rounds = []
# Per-worker thread budget so total threads ≤ self.nthreads
nthreads_per_worker = max(1, self.nthreads // max(1, n_parallel))
# --- Step 2: forward selection ---
while remaining_names:
# Build candidates: (name, lin_pars, out_dir)
# lin_pars is already sorted and stacked — ready to pickle / pass to juliet.
candidates = []
for name in remaining_names:
candidate_names = selected_names + [name]
lin_reg = {
ins: {
reg: self.linear_regressors[ins][reg]
for reg in candidate_names
if reg in self.linear_regressors[ins]
} or None
for ins in self.instruments
}
# Resolve empty dicts to None
lin_reg = {ins: (v if v else None) for ins, v in lin_reg.items()}
lin_pars = self._build_lin_pars(lin_reg)
out_dir = os.path.join(
self.pout,
'linsel_' + '_'.join(sorted(candidate_names))
)
candidates.append((name, lin_pars, out_dir))
# Run fits — parallel (ProcessPoolExecutor) or sequential
results = {} # name -> (lnZ, scatter)
if n_parallel > 1:
# juliet_fit_kwargs forwarded to each worker with per-worker nthreads
worker_fit_kwargs = dict(self.juliet_fit_kwargs)
worker_fit_kwargs['nthreads'] = nthreads_per_worker
worker_args_list = [
(
self.instruments,
self._tim_s, self._fl_s, self._fle_s,
self._gp_pars_s_juliet,
lin_pars,
self._cached_priors,
out_dir,
worker_fit_kwargs,
self.noise_method,
)
for _, lin_pars, out_dir in candidates
]
with concurrent.futures.ProcessPoolExecutor(max_workers=n_parallel) as executor:
future_to_name = {
executor.submit(_lindetrend_worker, args): name
for (name, _, _), args in zip(candidates, worker_args_list)
}
for future in concurrent.futures.as_completed(future_to_name):
name = future_to_name[future]
lnZ, scatter = future.result()
results[name] = (lnZ, scatter)
if selection_method == 'scatter':
print(f" Tested '{name}': lnZ = {lnZ:.2f}, scatter = {scatter[0]:.2f} ppm "
f"(current base: {current_scatter_val:.2f} ppm)")
else:
print(f" Tested '{name}': lnZ = {lnZ:.2f} (current best: {current_lnZ:.2f})")
else:
for name, _, out_dir in candidates:
candidate_names = selected_names + [name]
lin_reg = {
ins: {
reg: self.linear_regressors[ins][reg]
for reg in candidate_names
if reg in self.linear_regressors[ins]
} or None
for ins in self.instruments
}
lin_reg = {ins: (v if v else None) for ins, v in lin_reg.items()}
print(f" Testing '{name}': {candidate_names}")
lnZ, scatter = self._fit_model(lin_reg, out_dir, nthreads=nthreads_per_worker)
results[name] = (lnZ, scatter)
if selection_method == 'scatter':
print(f" lnZ = {lnZ:.2f}, scatter = {scatter[0]:.2f} ppm "
f"(current base: {current_scatter_val:.2f} ppm)")
else:
print(f" lnZ = {lnZ:.2f} (current best: {current_lnZ:.2f})")
# Record this round for the summary table. 'selected' is filled in below.
base_scatter_val = current_scatter_val if selection_method == 'scatter' else None
all_rounds.append({
'round_num': len(all_rounds) + 1,
'base_set': list(selected_names),
'base_lnZ': current_lnZ,
'base_scatter_val': base_scatter_val,
'selection_method': selection_method,
'candidates': {
name: {
'lnZ': lnZ,
'scatter': scatter,
'delta_lnZ': lnZ - current_lnZ,
'delta_scatter': (base_scatter_val - scatter[0])
if selection_method == 'scatter' else None,
}
for name, (lnZ, scatter) in results.items()
},
'selected': None,
})
# Find best candidate and accept if threshold is met
if selection_method == 'scatter':
best_scatter_val = np.inf
best_name = None
best_lnZ = None
best_scatter = None
for name, (lnZ, scatter) in results.items():
if scatter[0] < best_scatter_val:
best_scatter_val = scatter[0]
best_name = name
best_lnZ = lnZ
best_scatter = scatter
delta_scatter = current_scatter_val - best_scatter_val
if delta_scatter >= scatter_threshold:
print(f" Accepted '{best_name}' (scatter reduction = {delta_scatter:.2f} ppm)")
all_rounds[-1]['selected'] = best_name
selected_names.append(best_name)
remaining_names.remove(best_name)
current_scatter_val = best_scatter_val
current_lnZ = best_lnZ
lnZ_history.append(best_lnZ)
scatter_history.append(best_scatter)
else:
print(
f" No regressor reduced scatter by >= {scatter_threshold} ppm "
f"(best reduction = {delta_scatter:.2f} ppm). Stopping."
)
break
else:
# lnZ-based selection
best_lnZ = -np.inf
best_name = None
best_scatter = None
for name, (lnZ, scatter) in results.items():
if lnZ > best_lnZ:
best_lnZ = lnZ
best_name = name
best_scatter = scatter
if best_lnZ - current_lnZ >= delta_lnZ_threshold:
print(f" Accepted '{best_name}' (delta lnZ = {best_lnZ - current_lnZ:.2f})")
all_rounds[-1]['selected'] = best_name
selected_names.append(best_name)
remaining_names.remove(best_name)
current_lnZ = best_lnZ
lnZ_history.append(best_lnZ)
scatter_history.append(best_scatter)
else:
print(
f" No regressor improved lnZ by >= {delta_lnZ_threshold} "
f"(best delta = {best_lnZ - current_lnZ:.2f}). Stopping."
)
break
# ── Final summary table ──────────────────────────────────────────────────
col_name_w = max(len(n) for n in list(all_regressor_names) + ['[base model]']) + 4
col_lnz_w = 10
col_dlnz_w = 11
col_scat_w = 12
col_stat_w = 14
delta_col_label = 'delta scat' if selection_method == 'scatter' else 'delta lnZ'
header_parts = [f"{'Regressor':<{col_name_w}}", f"{'lnZ':>{col_lnz_w}}",
f"{delta_col_label:>{col_dlnz_w}}"]
header_parts += [f"{(ins + ' (ppm)'):>{col_scat_w}}" for ins in self.instruments]
header_parts += [f"{'Status':<{col_stat_w}}"]
header = ' | '.join(header_parts)
sep = '-+-'.join(['-' * col_name_w, '-' * col_lnz_w, '-' * col_dlnz_w]
+ ['-' * col_scat_w] * len(self.instruments)
+ ['-' * col_stat_w])
total_w = len(header)
title = f'Linear Detrending Selection Summary [method: {selection_method}]'
def _fmt_row(marker, name, lnZ, delta_str, scatter_list, status):
label = f"{marker}{name}"
row = [f"{label:<{col_name_w}}", f"{lnZ:>{col_lnz_w}.2f}",
f"{delta_str:>{col_dlnz_w}}"]
row += [f"{sc:>{col_scat_w}.2f}" for sc in scatter_list]
row += [f"{status:<{col_stat_w}}"]
return ' | '.join(row)
# Build all lines first so they can be printed and saved in one pass
lines = []
lines.append('\n' + '=' * total_w)
lines.append(title.center(total_w))
lines.append('=' * total_w)
lines.append(header)
lines.append(sep)
# Base model row
lines.append(_fmt_row('', '[base model]', base_lnZ, '---', base_scatter, 'base model'))
# One section per round
for rd in all_rounds:
base_set_str = '[' + ', '.join(rd['base_set']) + ']' if rd['base_set'] else 'none'
lines.append(sep)
if selection_method == 'scatter':
lines.append(
f" Round {rd['round_num']} "
f"(base set: {base_set_str}, base scatter = {rd['base_scatter_val']:.2f} ppm)"
)
else:
lines.append(
f" Round {rd['round_num']} "
f"(base set: {base_set_str}, base lnZ = {rd['base_lnZ']:.2f})"
)
lines.append(sep)
# Selected candidate first; remainder sorted ascending scatter or descending lnZ
if selection_method == 'scatter':
ordered = sorted(
rd['candidates'].keys(),
key=lambda n: (n != rd['selected'], rd['candidates'][n]['scatter'][0])
)
else:
ordered = sorted(
rd['candidates'].keys(),
key=lambda n: (n != rd['selected'], -rd['candidates'][n]['lnZ'])
)
for name in ordered:
r = rd['candidates'][name]
if selection_method == 'scatter':
dscat = r['delta_scatter']
delta_str = f"+{dscat:.2f}" if dscat >= 0 else f"{dscat:.2f}"
else:
dlnZ = r['delta_lnZ']
delta_str = f"+{dlnZ:.2f}" if dlnZ >= 0 else f"{dlnZ:.2f}"
if name == rd['selected']:
marker, status = '>> ', 'SELECTED'
else:
marker, status = ' ', 'not selected'
lines.append(_fmt_row(marker, name, r['lnZ'], delta_str, r['scatter'], status))
lines.append('=' * total_w)
# Print to stdout
for line in lines:
print(line)
# Save to file
summary_path = os.path.join(self.pout, 'linsel_summary.txt')
with open(summary_path, 'w') as f:
f.write('\n'.join(lines) + '\n')
print(f" Summary saved to {summary_path}")
# Build the final selected-regressors dict
selected_regressors = {}
for ins in self.instruments:
reg_dict = {
name: self.linear_regressors[ins][name]
for name in selected_names
if name in self.linear_regressors[ins]
}
selected_regressors[ins] = reg_dict if reg_dict else None
return selected_regressors, lnZ_history, scatter_history