Source code for krithika.krithika

import numpy as np
import matplotlib.pyplot as plt
import matplotlib.cm as cm
import matplotlib.gridspec as gd
from astroquery.mast import Observations
from mpl_toolkits.axes_grid1 import make_axes_locatable
from astropy.io import fits
from astropy.table import Table
from astropy.stats import mad_std
from scipy.interpolate import interp1d
from scipy.optimize import root, fsolve
import matplotlib
import multiprocessing
import concurrent.futures
from pathlib import Path
from glob import glob
from tqdm import tqdm
#from .plotstyles import *
import pickle
import juliet
from .utils import *
import warnings
import os

try:
    from photutils.aperture import CircularAnnulus, CircularAperture, ApertureStats
    from photutils.aperture import aperture_photometry as aphot
except:
    print('Warning: photutils is not installed! Will not be able to use photutils based photometry.')

try:
    from dace_query.cheops import Cheops
except:
    print('It looks like DACE API is not installed! Will not be able to download CHEOPS data products.')


[docs] class CHEOPSData(object): """ Manipulate CHEOPS time-series data from simple reading to extraction. This class provides tools to read, process, and extract photometry from CHEOPS (CHaracterising ExOPlanet Satellite) observations. It supports reading ``PIPE``-produced light curves, downloading data reduction pipeline (DRP) products, handling sub-array data, and performing aperture photometry on subarrays. Parameters ---------- object_name : str, optional Name or identifier of the target object. Used for querying the CHEOPS database when downloading data products. Default is ``None``. pipe_filename : str, optional Path to a pipeline-produced FITS file for reading with :meth:`pipe_data`. Default is ``None``. Attributes ---------- pipe_fname : str or None Stored pipeline FITS filename. object_name : str or None Stored target object name. drp_data : dict Dictionary containing DRP light curve data populated by :meth:`get_drp_lightcurves` with keys for each instrument (e.g., 'TG000001'), each containing 'TIME', 'FLUX', 'FLUX_ERR', 'ROLL', 'BG', 'CONTAM', 'SMEAR', 'XC', 'YC'. time_sub, flux_sub, flux_err_sub, badpix_sub : dict Dictionaries containing sub-array data populated by :meth:`get_subarrays`. subarray_data : dict, optional Dictionary that may be populated with processed subarray data. Examples -------- Reading ``PIPE`` data: >>> cheops = CHEOPSData(pipe_filename='path/to/file.fits') >>> data = cheops.pipe_data(bgmin=300) Downloading DRP light curves: >>> cheops = CHEOPSData(object_name='WASP-189b') >>> drp_data = cheops.get_drp_lightcurves(pout='/path/to/output') Notes ----- Requires the ``dace_query`` library to download CHEOPS data products from the DACE (Data and Analysis Center for Exoplanets) database. """ def __init__(self, object_name=None, pipe_filename=None): self.pipe_fname = pipe_filename self.object_name = object_name
[docs] def pipe_data(self, bgmin=300, not_flg_arr=None): """Extract and process light curve data from a ``PIPE`` FITS file. This method reads a CHEOPS pipeline-produced FITS file (typically from ``self.pipe_fname``) and extracts time series photometry along with auxiliary data such as centroids, background, and temperature. Data points are filtered by quality flags and background levels. The flux is normalized by its median value. Parameters ---------- bgmin : float, optional Minimum background level threshold in counts. Data points with background below this value are excluded. Default is ``300``. not_flg_arr : array_like of int, optional Array of FLAG values to *include* in the output (i.e., non-zero flags are normally rejected, but values in this array are kept). If ``None``, only FLAG==0 data are retained. Default is ``None``. Returns ------- data : dict Dictionary containing extracted and processed data with keys: - 'TIME' : ndarray Barycentric Julian Date time stamps. - 'FLUX' : ndarray Normalized flux (normalized by median). - 'FLUX_ERR' : ndarray Flux uncertainties (normalized by median). - 'ROLL' : ndarray Roll angle in degrees. - 'XC', 'YC' : ndarray Centroid positions in pixels. - 'BG' : ndarray Background level in counts. - 'TF2' : ndarray thermFront2 temperature in Kelvin. - 'U1' ... 'U16' : ndarray, optional Principal components from PCA. Notes ----- Quality flags (FLAG column): - 0 : Good data - >0 : Data with issues (normally rejected) The method performs two masking steps: 1. Removes flagged data and optionally includes specified flags 2. Removes high-background points (BG < bgmin) Examples -------- >>> cheops = CHEOPSData(pipe_filename='file.fits') >>> data = cheops.pipe_data(bgmin=300) >>> print(data['TIME'], data['FLUX']) """ hdul = fits.open(self.pipe_fname) tab = Table.read(hdul[1]) # Masking datasets flg = np.asarray(tab['FLAG']) # Flagged data msk = np.where(flg==0)[0] # Creating a mask to remove flg>0 values if not_flg_arr is not None: for i in range(len(not_flg_arr)): msk_f1 = np.where(flg==not_flg_arr[i])[0] msk = np.hstack((msk, msk_f1)) # Gathering dataset Us_n = np.array([]) Us = [] for i in tab.colnames: if i[0] == 'U': Us_n = np.hstack((Us_n, i)) for j in range(len(Us_n)): usn = np.asarray(tab[Us_n[j]])[msk] Us.append(usn) tim, flx, flxe = np.asarray(tab['BJD_TIME'])[msk], np.asarray(tab['FLUX'])[msk], np.asarray(tab['FLUXERR'])[msk] roll, xc, yc, bg = np.asarray(tab['ROLL'])[msk], np.asarray(tab['XC'])[msk], np.asarray(tab['YC'])[msk], np.asarray(tab['BG'])[msk] tf2 = np.asarray(tab['thermFront_2'])[msk] # Masking those points with high background values msk1 = np.where(bg<bgmin)[0] tim, flx, flxe, roll, xc, yc, bg, tf2 = tim[msk1], flx[msk1], flxe[msk1], roll[msk1], xc[msk1], yc[msk1], bg[msk1], tf2[msk1] Us1 = [] for i in range(len(Us_n)): us1 = Us[i][msk1] Us1.append(us1) # Normalising flux flx, flxe = flx/np.median(flx), flxe/np.median(flx) # Finally, sorting the data according to time idx_sort = np.argsort(tim) data = {} data['TIME'], data['FLUX'], data['FLUX_ERR'] = tim[idx_sort], flx[idx_sort], flxe[idx_sort] data['ROLL'], data['XC'], data['YC'], data['BG'] = roll[idx_sort], xc[idx_sort], yc[idx_sort], bg[idx_sort] data['TF2'] = tf2[idx_sort] for i in range(len(Us_n)): data[Us_n[i]] = Us1[i][idx_sort] return data
[docs] def get_drp_lightcurves(self, pout=os.getcwd(), load=False, save=False, aperture='default', visit_nos=None, filekey=None, bkg_clip=20): """Download or load Data Reduction Pipeline light curves from CHEOPS. This method queries the CHEOPS DACE database for light curve products (DRP level) for the target specified in ``self.object_name``. It can download a specific visit (by ``visit_nos`` or ``filekey``) or all available visits. Downloaded data are filtered to remove flagged points, high-background measurements, and cosmic rays (via EVENT/STATUS flags). Flux is normalized by its median value per instrument. Parameters ---------- pout : str, optional Output directory for downloaded/saved products. Created if it does not exist. Default is the current working directory. load : bool, optional If ``True``, skip downloading and instead load pre-downloaded files from ``pout``. Useful for re-reading data. Default is ``False``. save : bool, optional If ``True``, retain downloaded FITS files in ``pout`` after extraction. If ``False`` (default), files are deleted after reading. aperture : str, optional Aperture type to download ('default', 'optimal', 'rsup', and 'rinf). Default is ``'default'``. visit_nos : int or None, optional Specific visit number to download (e.g., 701, 101, etc.). If provided, only that visit's data are retrieved. Overridden by ``filekey``. Default is ``None``. filekey : str or None, optional Specific file key (e.g., 'CH_PR100009_TG000001_...') for precise product selection. Takes precedence over ``visit_nos``. Default is ``None``. bkg_clip : float, optional Sigma threshold for background clipping. Points with background above ``median(BG) + bkg_clip * MAD(BG)`` are removed. Default is ``20``. Returns ------- drp_data : dict Nested dictionary with structure:: {instrument_name: { 'TIME': ndarray, 'FLUX': ndarray, 'FLUX_ERR': ndarray, 'ROLL': ndarray, 'BG': ndarray, 'CONTAM': ndarray, # Contamination from nearby sources 'SMEAR': ndarray, # Smearing correction 'XC', 'YC': ndarray # Centroids }} Also stored in ``self.drp_data``. Notes ----- Data filtering steps: 1. Remove non-finite flux values 2. Remove points with EVENT != 0 or STATUS != 0 3. Remove high-background points (sigma clipping) The method creates a tar archive internally when downloading multiple files, which is extracted to ``pout`` then deleted. Examples -------- Download all DRP light curves for a target: >>> cheops = CHEOPSData(object_name='WASP-189b') >>> drp_data = cheops.get_drp_lightcurves(pout='./cheops_data') Download a specific visit: >>> drp_data = cheops.get_drp_lightcurves(visit_nos=1, pout='./data') Load previously downloaded data: >>> drp_data = cheops.get_drp_lightcurves(load=True, pout='./data') """ # When either filekey or visit_nos is provided, we can pin-point one particular visit # In that case, we can download particular light curve for particular visit # Otherwise, we will download DEFAULT aperture light curves for all visits # If pout doesn't exist then make one if not Path( pout ).exists(): os.mkdir( pout ) # Querying the CHEOPS database if ( filekey == None ) and ( visit_nos != None ): ## If file-key is not provided, but visit number is provided, we can find filekey from that values = Cheops.browse_products(filters={'target_name':{'equal': [self.object_name]}}, file_type='lightcurves') for i in range( len( values['file'] ) ): visit = 'TG' + f"{int(visit_nos):06}" if values['file'][i].split('_')[1] == visit: filekey = 'CH_' + values['file'][i].split('/')[1] if ( filekey == None ) and ( visit_nos == None ): filters = { 'target_name': { 'equal' : [self.object_name] } } else: filters = { 'target_name': { 'equal' : [self.object_name] },\ 'file_key': { 'equal' : [filekey] } } files = Cheops.browse_products(filters=filters, file_type='lightcurves', aperture=aperture) file_list = files['file'] if not load: Cheops.download(filters=filters, file_type='lightcurves', aperture=aperture, output_directory=pout) if len(file_list) != 1: os.system('tar -xzvf ' + pout + '/cheops_download.tar.gz -C ' + pout) self.drp_data = {} for ins in range(len(file_list)): ## Finding the instrument instrument = file_list[ins].split('/')[1].split('_')[1] self.drp_data[instrument] = {} ## Now loading the data if ( len(file_list) == 1 ) or load: hdul = fits.open( pout + '/' + file_list[ins].split('/')[-1] ) else: hdul = fits.open( pout + '/' + '/'.join(file_list[ins].split('/')[1:]) ) table = Table.read( hdul[1] ) ## Masking non-finite data points mask = np.isfinite( table['FLUX'] ) times, flux, flux_err = table['BJD_TIME'][mask], table['FLUX'][mask], table['FLUXERR'][mask] roll, bkg, contamination = table['ROLL_ANGLE'][mask], table['BACKGROUND'][mask], table['CONTA_LC'][mask] smear, cenx, ceny = table['SMEARING_LC'][mask], table['CENTROID_X'][mask], table['CENTROID_Y'][mask] event, status = table['EVENT'][mask], table['STATUS'][mask] ## Removing data points with EVENTS and STATUS msk3 = np.ones( len(times), dtype=bool ) msk3[ event != 0. ] = False msk3[ status != 0. ] = False times, flux, flux_err = times[msk3], flux[msk3], flux_err[msk3] roll, bkg, contamination = roll[msk3], bkg[msk3], contamination[msk3] smear, cenx, ceny = smear[msk3], cenx[msk3], ceny[msk3] ## Removing high background points msk_bkg = np.ones( len(times), dtype=bool ) msk_bkg[ bkg > ( np.nanmedian(bkg) + ( bkg_clip * mad_std(bkg) ) ) ] = False times, flux, flux_err = times[msk_bkg], flux[msk_bkg], flux_err[msk_bkg] roll, bkg, contamination = roll[msk_bkg], bkg[msk_bkg], contamination[msk_bkg] smear, cenx, ceny = smear[msk_bkg], cenx[msk_bkg], ceny[msk_bkg] # Saving the data in a dictionary self.drp_data[instrument]['TIME'], self.drp_data[instrument]['FLUX'], self.drp_data[instrument]['FLUX_ERR'] = times, flux / np.nanmedian(flux), flux_err / np.nanmedian(flux) self.drp_data[instrument]['ROLL'], self.drp_data[instrument]['BG'], self.drp_data[instrument]['CONTAM'] = roll, bkg, contamination self.drp_data[instrument]['SMEAR'], self.drp_data[instrument]['XC'], self.drp_data[instrument]['YC'] = smear, cenx, ceny if ( not save ) and ( not load ): if len(file_list) == 1: os.system('rm -rf ' + pout + '/' + file_list[ins].split('/')[-1]) else: os.system('rm -rf ' + pout + '/' + file_list[0].split('/')[1]) if save: if len(file_list) != 1: os.system('mv ' + pout + '/' + '/'.join(file_list[ins].split('/')[1:]) + ' ' + pout) os.system('rm -rf ' + pout + '/' + file_list[ins].split('/')[1] ) if not save: os.system('rm -rf ' + pout + '/cheops_download.tar.gz') return self.drp_data
[docs] def get_subarrays(self, pout=os.getcwd(), download_all=False, load=False, visit_nos=None, filekey=None): """Download or load CHEOPS sub-array image time-series data. This method queries and downloads sub-array FITS files from CHEOPS data products, which contain the flux measurements as 2D image cubes for each frame in the time series. These are useful for custom photometry and pixel-level analysis. Sub-array data are stored in ``self.time_sub``, ``self.flux_sub``, ``self.flux_err_sub``, and ``self.badpix_sub``. Parameters ---------- pout : str, optional Output directory for downloaded/saved products. Created if needed. Default is the current working directory. download_all : bool, optional If ``True``, download all available file types (not just sub-arrays). If ``False``, filter for sub-array products only. Default is ``False``. load : bool, optional If ``True``, skip downloading and load pre-downloaded files from ``pout``. Default is ``False``. visit_nos : int or None, optional Specific visit number to download. If provided with ``filekey=None``, only that visit's sub-arrays are retrieved. Default is ``None``. filekey : str or None, optional Specific file key for precise product selection. Takes precedence over ``visit_nos``. Default is ``None``. Returns ------- None Results are stored in instance dictionaries: - ``self.time_sub[instrument]`` : ndarray Time stamps (shape: N_frames). - ``self.flux_sub[instrument]`` : ndarray Flux images (shape: N_frames, Ny, Nx). - ``self.flux_err_sub[instrument]`` : ndarray Flux error (shape: N_frames, Ny, Nx) - ``self.badpix_sub[instrument]`` : ndarray Bad-pixel mask (shape: Ny, Nx) where 1 = good, 0 = bad. Notes ----- The method attempts to load a bad-pixel map from the CHEOPS data products. If not found, a default all-good mask is created. Sub-array file names are identified by the key 'COR_SubArray' in the filename. Examples -------- Download all sub-arrays for a target: >>> cheops = CHEOPSData(object_name='WASP-189b') >>> cheops.get_subarrays(pout='./cheops_data') Load previously downloaded sub-array data: >>> cheops.get_subarrays(load=True, pout='./data') """ filetype = 'all' if download_all else 'sub' # If pout doesn't exist then make one if not Path( pout ).exists(): os.mkdir( pout ) # Querying the CHEOPS database if ( filekey == None ) and ( visit_nos != None ): ## If file-key is not provided, but visit number is provided, we can find filekey from that values = Cheops.browse_products(filters={'target_name':{'equal': [self.object_name]}}, file_type=filetype) for i in range( len( values['file'] ) ): visit = 'TG' + f"{int(visit_nos):06}" if values['file'][i].split('_')[1] == visit: filekey = 'CH_' + values['file'][i].split('/')[1] if ( filekey == None ) and ( visit_nos == None ): filters = { 'target_name': { 'equal' : [self.object_name] } } else: filters = { 'target_name': { 'equal' : [self.object_name] },\ 'file_key': { 'equal' : [filekey] } } files = Cheops.browse_products(filters=filters, file_type=filetype) file_list = [] for i in range(len(files['file'])): keywd = 'COR_SubArray' if '_'.join(files['file'][i].split('_')[-3:-1]) == keywd: file_list.append( files['file'][i] ) if not load: Cheops.download(filters=filters, file_type=filetype, output_directory=pout) os.system('tar -xzvf ' + pout + '/cheops_download.tar.gz -C ' + pout) self.time_sub, self.flux_sub, self.flux_err_sub, self.badpix_sub = {}, {}, {}, {} for ins in range(len(file_list)): ## Finding the instrument instrument = file_list[ins].split('/')[1].split('_')[1] hdul = fits.open( pout + '/' + '/'.join(file_list[ins].split('/')[1:]) ) table = Table.read( hdul[2] ) # Reading the flux self.time_sub[instrument] = np.asarray( table['BJD_TIME'] ) self.flux_sub[instrument] = hdul[1].data self.flux_err_sub[instrument] = np.sqrt( hdul[1].data + table['RON'][:,None,None]**2 ) try: ## If we have bad-pixel file, we can use it for bad-pixel map hdul_badpix = fits.open( pout + '/' + file_list[ins].split('/')[1] + '/' + '_'.join(file_list[ins].split('/')[2].split('_')[0:4]) + '_PIP_COR_PixelFlagMapSubArray_V0300.fits') ## 2D badpixel map badpix2d = np.ones(hdul_badpix[1].data.shape) badpix2d[hdul_badpix[1].data != 0.] = 0. ## 3D badpixel map badpix = np.ones( hdul[1].data.shape ) badpix = badpix * badpix2d[None, :, :] except: ## Except, just assume that everything is good badpix = np.ones( hdul[1].data.shape ) self.badpix_sub[instrument] = badpix
def ApPhoto(self, visit_nos, aprad=None, sky_rad1=None, sky_rad2=None, brightpix=False, nos_brightest=12, nos_faintest=None, minmax=None): instrument = 'TG' + f"{int(visit_nos):06}" return ApPhoto(times=self.time_sub[instrument], frames=self.flux_sub[instrument], errors=self.flux_err_sub[instrument], badpix=self.badpix_sub[instrument],\ aprad=aprad, sky_rad1=sky_rad1, sky_rad2=sky_rad2, brightpix=brightpix, nos_brightest=nos_brightest, nos_faintest=nos_faintest, minmax=minmax)
[docs] class julietPlots(object): """Plotting helper for results produced by a `juliet` analysis. This class loads a `juliet` input/output folder, computes models from the fitted posterior and provides a collection of convenience plotting and data-processing methods for light-curve visualisation. Typical usage is to instantiate the class with the `input_folder` used for a juliet fit and then call plotting helpers such as :meth:`full_model_lc`, :meth:`plot_gp` or :meth:`detrend_data`. Attributes ---------- input_folder : str, optional Path to the juliet input/output folder provided at construction. The user need to provide either input_folder or dataset and res dataset : juliet.load object, optioanl The object returned by ``juliet.load`` for the given input folder. res : juliet.fit, optional The result of calling ``dataset.fit(...)``. N : int Number of posterior samples drawn when evaluating sampled models. **kwargs : dict, optional Any additional keywords provided to juliet.fit object. Main methods ------------ full_model_lc(instruments=None, save=False, nrandom=50, quantile_models=True) Plot observed light curves with the full fitted model and residuals. phase_folded_lc(phmin=0.8, instruments=None, highres=False, ...) Plot phase folder light curve with best-fitted planetary model for one or more instruments. plot_gp(instruments=None, highres=False, one_plot=False, ...) Plot GP components and binned residuals for one or more instruments. plot_fake_allan_deviation(instruments=None, binmax=10, method='pipe', timeunit=None) Plot "allan deviation" plot, which is noise as a function of binning, for one or more instruments plot_corner(planet_only=False, save=True) Plot corner plots for fitted posterior samples, can use planet-only or all parameters. """ def __init__(self, input_folder=None, dataset=None, res=None, N=1000, **kwargs): # First order of business: loading the juliet folder # We can optinally provide datasets and res directly if ( dataset is None ) and ( input_folder is not None ): self.dataset = juliet.load(input_folder=input_folder) self.res = self.dataset.fit(**kwargs) else: self.dataset = dataset self.res = res # Saving those kwargs self.fit_kwargs = kwargs # Location of the input folder self.input_folder = input_folder # Number of samples self.nsamps = N # Computing all models, for all instruments self.models_all_ins = {} self.all_mods_ins = {} # Sorting indices for all instruments (sometimes GP regressors are not time -- in that case, times # are sorted according to GP regressors, which will mess-up the time array -- we need to sort it again) self.idx_time_sort = {} for i in range(len(self.dataset.inames_lc)): # Evaluating the models ## This will return 5 elements: model samples, median model, upper_CI, lower_CI, and components self.models_all_ins[self.dataset.inames_lc[i]] =\ self.res.lc.evaluate(self.dataset.inames_lc[i], nsamples=self.nsamps, return_err=True,\ return_components=True, return_samples=True) # Saving all models self.all_mods_ins[self.dataset.inames_lc[i]] = self.res.lc.model[self.dataset.inames_lc[i]] # Sorting array self.idx_time_sort[self.dataset.inames_lc[i]] = np.argsort( self.dataset.times_lc[self.dataset.inames_lc[i]] )
[docs] def highres_planet_models_workaround(self, instrument, times_highres, GP_reg_highres, lin_reg_highres): """ The usual way of generating high time-resolution models is not working with juliet when light_travel_delay=True for occultations and phase curve modelling. This is a workaround for this.""" # Creating dictionaries to save high-res times, fluxes, and errors tim_highres, fl_highres, fle_highres = {}, {}, {} ## Actual high-res time array tim_highres[instrument] = times_highres ## For flux and flux error highres array, we will simply use array with ones fl_highres[instrument], fle_highres[instrument] = np.ones(len(times_highres)), np.ones(len(times_highres)) # GP and linear regressors would be arrays, so creating dictionaries to save them if GP_reg_highres is not None: gp_pars = {} gp_pars[instrument] = GP_reg_highres else: gp_pars = None if lin_reg_highres is not None: lin_pars = {} lin_pars[instrument] = lin_reg_highres else: lin_pars = None ## Again, running juliet.load and juliet.fit data_highres = juliet.load(priors=self.input_folder + '/priors.dat', t_lc=tim_highres, y_lc=fl_highres, yerr_lc=fle_highres,\ GP_regressors_lc=gp_pars, linear_regressors_lc=lin_pars, out_folder=self.input_folder) res_highres = data_highres.fit(**self.fit_kwargs) post_samps_new = res_highres.posteriors['posterior_samples'].copy() for i in post_samps_new.keys(): if ( i[0:5] == 'mflux' ) or ( i[0:9] == 'mdilution' ) or ( i[0:5] == 'theta' ) or ( i[0:2] == 'GP' ): post_samps_new[i] = np.zeros(len(post_samps_new[i])) planet_only_models_highres = \ res_highres.lc.evaluate(instrument, nsamples=self.nsamps, return_err=True, parameter_values=post_samps_new,\ return_components=True, return_samples=True, evaluate_transit=True) return planet_only_models_highres
[docs] def full_model_lc(self, instruments=None, save=False, nrandom=50, quantile_models=True): """Plot the full fitted light-curve model for one or more instruments. This method creates a figure per instrument showing the observed light curve with the fitted (full) model overplotted and a residuals panel beneath. Parameters ---------- instruments : list or None List of instrument names to plot. If ``None``, all instruments in the current juliet dataset plotted. save : bool, optional If ``True``, each figure is saved to ``<input_folder>/full_model_<instrument>.png``. Default ``False``. nrandom : int, optional Number of random posterior-sample models to draw when ``quantile_models`` is ``False``. Default is 50. quantile_models : bool, optional If ``True``, plot the 68%% credible interval as a filled band; if ``False``, plot ``nrandom`` random posterior models instead. Returns ------- all_fig : list List of matplotlib Figure objects created (one per instrument). all_axs1 : list List of top-panel Axes (data + model) for each figure. all_axs2 : list List of bottom-panel Axes (residuals) for each figure. Notes ----- The routine expects that ``self.models_all_ins`` was populated by a prior call (see the class constructor) and that the dataset arrays (times, data, errors) are available in ``self.dataset``. """ # Computing the model for all instruments if instruments is None: # If instruments is None, then all instruments are selected instruments = self.dataset.inames_lc all_fig, all_axs1, all_axs2 = [], [], [] for i in range(len(instruments)): # Data tim_ins = self.dataset.times_lc[instruments[i]][self.idx_time_sort[instruments[i]]] fl_ins = self.dataset.data_lc[instruments[i]][self.idx_time_sort[instruments[i]]] fle_ins = self.dataset.errors_lc[instruments[i]][self.idx_time_sort[instruments[i]]] ## Median model full_model = self.models_all_ins[instruments[i]][1][self.idx_time_sort[instruments[i]]] ## Quantile models up_68CI = self.models_all_ins[instruments[i]][2][self.idx_time_sort[instruments[i]]] lo_68CI = self.models_all_ins[instruments[i]][3][self.idx_time_sort[instruments[i]]] ## Random models sample_models = self.models_all_ins[instruments[i]][0][:, self.idx_time_sort[instruments[i]]] idx = np.random.choice( np.arange(sample_models.shape[0]), size=nrandom ) random_models = sample_models[idx, :] # And figure fig = plt.figure() gs = gd.GridSpec(2,1, height_ratios=[2,1]) # Top panel ax1 = plt.subplot(gs[0]) ax1.errorbar(tim_ins, fl_ins, yerr=fle_ins, fmt='.', elinewidth=0.5, markersize=1.5, color='dodgerblue')#, alpha=0.3) ax1.plot(tim_ins, full_model, c='navy', lw=2, zorder=50) if quantile_models: ax1.fill_between(x=tim_ins, y1=lo_68CI, y2=up_68CI, color='orangered', alpha=0.5, zorder=25) else: for rand in range(nrandom): ax1.plot(tim_ins, random_models[rand,:], color='orangered', alpha=0.5, lw=1., zorder=25) ax1.set_ylabel('Relative Flux') ax1.set_title('Full fitted model for instrument ' + str(instruments[i])) ax1.set_xlim(np.min(tim_ins), np.max(tim_ins)) ax1.xaxis.set_major_formatter(plt.NullFormatter()) # Bottom panel ax2 = plt.subplot(gs[1]) ax2.errorbar(tim_ins, (fl_ins-full_model)*1e6, yerr=fle_ins*1e6, fmt='.', elinewidth=0.5, markersize=1.5, color='dodgerblue')#, alpha=0.3) ax2.axhline(y=0.0, c='black', ls='--', zorder=100) ax2.set_ylabel('Residuals [ppm]') ax2.set_xlabel('Time [BJD]') ax2.set_xlim(np.min(tim_ins), np.max(tim_ins)) all_fig.append(fig) all_axs1.append(ax1) all_axs2.append(ax2) if save: plt.savefig(self.input_folder + '/full_model_' + instruments[i] + '.png', dpi=250) return all_fig, all_axs1, all_axs2
[docs] def detrend_data(self, phmin, instruments, planet='p1'): """This function will generate detrended data from the raw data. It will also generate phases.""" # First, create detrended dataset and models ## For detrended dataset self.phase, self.detrended_data, self.detrended_errs = {}, {}, {} self.ph_minimum, self.ph_maximum = 1e10, 1e-10 ## For normalising the data, we will compute F1 and F2, ## We can use the same F1 and F2 to normalise the model as well ## So, let's save them! self.F1, self.F2 = {}, {} for i in range(len(instruments)): # ----------------------------------------- # Our first task is to compute # period and t0: # ----------------------------------------- try: per = np.nanmedian(self.res.posteriors['posterior_samples']['P_' + planet]) except: per = self.all_mods_ins[instruments[i]]['params'].per try: t0 = np.nanmedian(self.res.posteriors['posterior_samples']['t0_' + planet]) except: t0 = self.all_mods_ins[instruments[i]]['params'].t0 # ----------------------------------------- # ----------------------------------------- # Next we would like to compute GP # and linear models, so that we can # detrend the data # ----------------------------------------- # GP model if self.dataset.GP_lc_arguments != None: if instruments[i] in self.dataset.GP_lc_arguments.keys(): gp_model = self.all_mods_ins[instruments[i]]['GP'] else: gp_model = 0. else: gp_model = 0. if instruments[i] in self.dataset.lm_lc_arguments.keys(): linear_model = self.models_all_ins[instruments[i]][-1]['lm'] else: linear_model = 0. # ----------------------------------------- # ----------------------------------------- # Finally, we want to compute F1 and F2 # So we can normalise the detrended data # The full model is defined as (courtesy of the original juliet code), # y = [ T * ( D / (1 + D*M) ) ] + [ (1 - D) / (1 + D*M)] # => y = T * F1 + F2 # => T = (y - F2) / F1 # ----------------------------------------- mflx = self.res.posteriors['posterior_samples']['mflux_' + instruments[i]] try: mdil = self.res.posteriors['posterior_samples']['mdilution_' + instruments[i]] except: mdil = np.ones( len(mflx) ) F1 = np.nanmedian( mdil / ( 1 + (mdil * mflx) ) ) F2 = np.nanmedian( (1 - mdil) / (1 + (mdil * mflx) ) ) ## Saving F1 and F2 self.F1[instruments[i]], self.F2[instruments[i]] = F1, F2 sigw = np.nanmedian(self.res.posteriors['posterior_samples']['sigma_w_' + instruments[i]] * 1e-6) ## Here we create detrended data by subtracting GP (if any) and linear (if any) models from the raw data self.detrended_data[instruments[i]] = ( self.dataset.data_lc[instruments[i]] - gp_model - linear_model - F2) / F1 self.detrended_errs[instruments[i]] = np.sqrt( self.dataset.errors_lc[instruments[i]]**2 + sigw**2 ) phases_instrument = juliet.utils.get_phases(self.dataset.times_lc[instruments[i]], P=per, t0=t0, phmin=phmin) self.phase[instruments[i]] = phases_instrument # Computing the minimum and maximum of the orbital phases if np.max(phases_instrument) > self.ph_maximum: self.ph_maximum = np.max(phases_instrument) if np.min(phases_instrument) < self.ph_minimum: self.ph_minimum = np.min(phases_instrument)
def detrend_model(self, instruments, phmin, highres, planet='p1'): # If we want to create high-resolution model (highres=True), then we will use the same times for all instruments # If not, then we will use different times for different instruments self.times_model, self.phases_model = {}, {} self.planet_only_models = {} # ----------------------------------------- # Our first task is to compute # period and t0: # ----------------------------------------- try: per = np.nanmedian(self.res.posteriors['posterior_samples']['P_' + planet]) except: per = self.all_mods_ins[instruments[0]]['params'].per try: t0 = np.nanmedian(self.res.posteriors['posterior_samples']['t0_' + planet]) except: t0 = self.all_mods_ins[instruments[0]]['params'].t0 # We also have to create a new posterior array since when we compute detrended model, # we only want uncertainties from planetary parameters to be propagated to the model uncertainties # So, we will make posteriors of mflux, theta, and GP to zero, such that any uncertainties # in them would not propagate to model uncertainties. post_samps_new = self.res.posteriors['posterior_samples'].copy() for i in post_samps_new.keys(): if ( i[0:5] == 'mflux' ) or ( i[0:9] == 'mdilution' ) or ( i[0:5] == 'theta' ) or ( i[0:2] == 'GP' ): post_samps_new[i] = np.zeros(len(post_samps_new[i])) if highres: ## First selecting the times ## This this is a high time resolution scenario so we will create a dummy time array dummy_phs_highres = np.linspace(self.ph_minimum, self.ph_maximum, 10000) dummy_time_highres = t0 + (dummy_phs_highres * per) # Now loop over all instruments to create high time-resolution planet-only models for ins in range(len(instruments)): ## First we have to check if there are GP or linear model fitted to the data ## Because, we have to create high-resolution GP and linear regressors as well. ## We will do this by interpolating the original GP and linear regressors ### High resolution GP regressors ### (we have to remember that everything is sorted according to GP regressors) ### (while dummy time is sorted according to times) ### (we will perform interpolation to arrays sorted according to time) if self.dataset.GP_lc_arguments != None: if instruments[ins] in self.dataset.GP_lc_arguments.keys(): #! Try/except because it is possible that not all instruments have GP model try: inter_gp = interp1d(x=self.dataset.times_lc[instruments[ins]][self.idx_time_sort[instruments[ins]]],\ y=self.dataset.GP_lc_arguments[instruments[ins]][:,0][self.idx_time_sort[instruments[ins]]],\ kind='cubic') gp_reg_highres = inter_gp(x=dummy_time_highres) #### Now, we can sort time, phase, gp regressors according to GP regressors idx_gp = np.argsort( gp_reg_highres ) dummy_time_highres, dummy_phs_highres = dummy_time_highres[idx_gp], dummy_phs_highres[idx_gp] gp_reg_highres = gp_reg_highres[idx_gp] except: gp_reg_highres = None else: gp_reg_highres = None else: gp_reg_highres = None ### High resolution linear regressors if instruments[ins] in self.dataset.lm_lc_arguments.keys(): #! Again, try/except because not all instruments might have a linear model ## (Here, we don't need to sort according to time -- because the dummy_times should have now ## sorted according to GP regressors, if applicable) try: inter_lin = interp1d(x=self.dataset.times_lc[instruments[ins]],\ y=self.dataset.lm_lc_arguments[instruments[ins]],\ kind='cubic', axis=0) lin_reg_highres = inter_lin(x=dummy_time_highres) except: lin_reg_highres = None else: lin_reg_highres = None try: # This high-res time models will not work for occultations/phase curves, when light_travel_delay=True self.planet_only_models[instruments[ins]] = \ self.res.lc.evaluate(instruments[ins], nsamples=self.nsamps, return_err=True, parameter_values=post_samps_new,\ return_components=True, return_samples=True, evaluate_transit=True,\ t=dummy_time_highres, GPregressors=gp_reg_highres, LMregressors=lin_reg_highres) except: # In case when it doesn't work, I have invented a workaround self.planet_only_models[instruments[ins]] = \ self.highres_planet_models_workaround(instrument=instruments[ins], times_highres=dummy_time_highres,\ GP_reg_highres=gp_reg_highres, lin_reg_highres=lin_reg_highres) ## Saving times and phases for model self.times_model[instruments[ins]] = dummy_time_highres self.phases_model[instruments[ins]] = dummy_phs_highres else: for ins in range(len(instruments)): # This should be fairly trivial since we don't need to compute the models at higher time resolution self.planet_only_models[instruments[ins]] = \ self.res.lc.evaluate(instruments[ins], nsamples=self.nsamps, return_err=True, parameter_values=post_samps_new,\ return_components=True, return_samples=True, evaluate_transit=True) ## Saving times and phases for model self.times_model[instruments[ins]] = self.dataset.times_lc[instruments[ins]] self.phases_model[instruments[ins]] = juliet.utils.get_phases(t=self.dataset.times_lc[instruments[ins]], P=per, t0=t0, phmin=phmin)
[docs] def phase_folded_lc(self, phmin=0.8, instruments=None, planet='p1', highres=False, nrandom=50, quantile_models=True, one_plot=None, figsize=(16/1.5, 9/1.5), pycheops_binning=False, nos_bin_tra=20, nos_bin_pc=30): """Plot phase-folded light curves and models for the fitted dataset. This method computes detrended data and planetary models by calling ``detrend_data`` and ``detrend_model`` internally, detects whether the fit contains transits, eclipses or phase-curves, and then produces matplotlib figures showing the data, model, and residuals. It can produce either one figure per instrument or a single combined figure with all instruments plotted together. Parameters ---------- phmin : float, optional Minimum phase value used for generating orbital phase (default ``0.8``). instruments : list or None, optional List of instrument names to plot. If ``None``, all instruments in the juliet dataset (``self.dataset.inames_lc``) are used. One plot is produced if all instruments are provided; otherwise, one plot per instrument is created. planet : str, optional Planet identifier used in the juliet fit (e.g., 'p1', 'p2', etc.) to select the appropriate planet model. Default is 'p1'. highres : bool, optional If ``True``, compute and plot high time-resolution planet-only models. Default ``False``. nrandom : int, optional Number of random posterior-sample models to draw when ``quantile_models`` is ``False``. Default ``50``. quantile_models : bool, optional If ``True``, display the 68%% credible interval as a filled band; if ``False``, overplot ``nrandom`` random posterior samples. Default ``True``. one_plot : bool or None, optional If ``True``, produce a single combined plot for all instruments (regardless of the number of instruments provided). If ``False``, produce one plot per instrument (unless all instruments are provided). If ``None``, the behaviour is as just described. Default ``None``. figsize : tuple, optional Figure size passed to matplotlib when creating figures. Default is ``(16/1.5, 9/1.5)``. pycheops_binning : bool, optional If ``True``, use binning as produced by ``pycheops`` the binned datapoints. If ``False``, it will use default ``juliet`` binning. Default ``False``. nos_bin_tra : int, optional Number of total binned data points in transit plot. Default is 20. nos_bin_pc : int, optional Number of total binned data points in phase curve plot. Default is 30. Returns ------- If a single combined plot is produced the function returns the combined ``fig`` and the primary axes used. If multiple figures are generated, the function returns lists: ``figs_all, axs1_all, axs2_all, axs3_all, axs4_all`` corresponding to created figures and their axes panels. """ # ------------------------------------------- # Do we want one plot? # ------------------------------------------- # If instruments == None, then we will make one common plot for _all_ instruments # Also, if all instruments are provided, this will make one common plot # Otherwise, one plot per instrument # However, sometimes we want to force one plot, so we have the option one_plot if one_plot is None: one_plot = True if instruments != None: if len(instruments) != len(self.dataset.inames_lc): one_plot = False else: instruments = self.dataset.inames_lc # ------------------------------------------- # Compute t14 # ------------------------------------------- # Let's compute the transit/occultation duration for the planet (that can help setting the xlimits of the plots) ## We need several other parameters for that first ### Orbital period try: per = np.nanmedian(self.res.posteriors['posterior_samples']['P_' + planet]) except: per = self.all_mods_ins[instruments[0]]['params'].per ### Rp/R* try: rprs = np.nanmedian(self.res.posteriors['posterior_samples']['p_' + planet]) except: rprs = self.all_mods_ins[instruments[0]]['params'].rp ### a/R* if 'rho' in self.res.model_parameters: ### This would mean that we have fitted for rho. We can convert rho to a/* try: rho = np.nanmedian(self.res.posteriors['posterior_samples']['rho']) ar = rho_to_ar(rho, per) except: ar = self.all_mods_ins[instruments[0]]['params'].a else: ### That means that we directly fit for a/R* try: ar = np.nanmedian(self.res.posteriors['posterior_samples']['a_' + planet]) except: ar = self.all_mods_ins[instruments[0]]['params'].a ### b try: b = np.nanmedian(self.res.posteriors['posterior_samples']['b_' + planet]) except: inc = self.all_mods_ins[instruments[0]]['params'].inc b = inc_to_b(inc=inc, ar=ar, ecc=self.all_mods_ins[instruments[0]]['params'].ecc,\ omega=self.all_mods_ins[instruments[0]]['params'].w) t14_dur = t14(per=per, ar=ar, rprs=rprs, b=b, ecc=self.all_mods_ins[instruments[0]]['params'].ecc,\ omega=self.all_mods_ins[instruments[0]]['params'].w) t14_phs = t14_dur / per # ------------------------------------------- # Computing detrended data and model # ------------------------------------------- self.detrend_data(phmin=phmin, instruments=instruments, planet=planet) self.detrend_model(instruments=instruments, phmin=phmin, highres=highres, planet=planet) # ------------------------------------------- # Type of fitting # ------------------------------------------- # We can find the type of fitting just by looking at the phase range covered phasecurve, transit, eclipse = False, False, False for ins in range(len(instruments)): if self.dataset.lc_options[instruments[ins]]['TranEclFit']: phasecurve = True elif self.dataset.lc_options[instruments[ins]]['EclipseFit']: eclipse = True else: transit = True ## For phase curve fitting, eclipse fitting is by default true, so, we need to manually turn it off if phasecurve and eclipse: eclipse = False ## Sometimes, even for just eclipse fitting we might have turned full transit+eclipse fitting, so double check ## if there is indeed phase curve fitting in the data if phasecurve: diff_phs = self.ph_maximum - self.ph_minimum if diff_phs<0.3: phasecurve = False eclipse = True # ------------------------------------------- # Starting figure code: # one plot: do plt.figure() here # Depending on phase curve/transit/occ # We need either 4 panels, or 2 panels # ------------------------------------------- if one_plot: if not phasecurve: fig = plt.figure(figsize=figsize) gs = gd.GridSpec(2,1, height_ratios=[2,1]) ax1 = plt.subplot(gs[0]) # Top panel ax2 = plt.subplot(gs[1]) # Bottom panel ax3, ax4 = None, None else: fig = plt.figure(figsize=figsize) gs = gd.GridSpec(2, 2, height_ratios=[2,1], width_ratios=[1,3])#, wspace=0.1) ax1 = plt.subplot(gs[0,0]) # Upper-left (for transit data) ax2 = plt.subplot(gs[1,0]) # Bottom-left (for transit residuals) ax3 = plt.subplot(gs[0,1]) # Upper-right (for phase curve) ax4 = plt.subplot(gs[1,1]) # Bottom-right (for phase curve residuals) # If there is one plot, then we also need to # collect all data to compute binned data bin_phs, bin_fl, bin_fle, bin_res = np.array([]), np.array([]), np.array([]), np.array([]) # Also, in case we don't have highres model, we want to save all models before plotting all_models_for_plotting = np.array([]) all_u68_mods_plotting, all_l68_mods_plotting = np.array([]), np.array([]) # ------------------------------------------------------------------------ # Creating a list of all fig, axes to return at the end of the function # if one_plot==False if not one_plot: figs_all, axs1_all, axs2_all, axs3_all, axs4_all = [], [], [], [], [] for i in range(len(instruments)): # ------------------------------------------- # Starting figure code: # If not one plot: do plt.figure() # inside the loop # Depending on phase curve/transit/occ # We need either 4 panels, or 2 panels # ------------------------------------------- if not one_plot: if not phasecurve: fig = plt.figure(figsize=figsize) gs = gd.GridSpec(2,1, height_ratios=[2,1]) ax1 = plt.subplot(gs[0]) ax2 = plt.subplot(gs[1]) ax3, ax4 = None, None else: fig = plt.figure(figsize=figsize) gs = gd.GridSpec(2, 2, height_ratios=[2,1], width_ratios=[1,3])#, wspace=0.1) ax1 = plt.subplot(gs[0,0]) # Upper-left (for transit data) ax2 = plt.subplot(gs[1,0]) # Bottom-left (for transit residuals) ax3 = plt.subplot(gs[0,1]) # Upper-right (for phase curve) ax4 = plt.subplot(gs[1,1]) # Bottom-right (for phase curve residuals) # ------------------------------------------- # Collecting the required data # ------------------------------------------- # Detrended data ## Sorting according to phase idx_phs = np.argsort(self.phase[instruments[i]]) phs = self.phase[instruments[i]][idx_phs] detrend_data = self.detrended_data[instruments[i]][idx_phs] detrend_errs = self.detrended_errs[instruments[i]][idx_phs] # Detrended model ## Sorting according to phase idx_phs_mod = np.argsort( self.phases_model[instruments[i]] ) phs_model = self.phases_model[instruments[i]][idx_phs_mod] detrend_model = self.planet_only_models[instruments[i]][1][idx_phs_mod] ## Quantile models up_68CI = self.planet_only_models[instruments[i]][2][idx_phs_mod] lo_68CI = self.planet_only_models[instruments[i]][3][idx_phs_mod] ## Random models sample_models = self.planet_only_models[instruments[i]][0][:, idx_phs_mod] idx = np.random.choice( np.arange(sample_models.shape[0]), size=nrandom ) random_models = sample_models[idx, :] # Residuals residuals = ( self.dataset.data_lc[instruments[i]] - self.models_all_ins[instruments[i]][1] )[idx_phs] # If we have phase curve fitting, we will use ppm units for the y-axis if phasecurve or eclipse: detrend_data, detrend_errs = (detrend_data-1.)*1e6, detrend_errs*1e6 detrend_model = (detrend_model-1.)*1e6 random_models = (random_models-1.)*1e6 lo_68CI, up_68CI = (lo_68CI-1.)*1e6, (up_68CI-1.)*1e6 if one_plot: # ------------------------------------------- # Saving the data for binning # outside the loop # ------------------------------------------- bin_phs = np.hstack(( bin_phs, phs )) bin_fl, bin_fle = np.hstack(( bin_fl, detrend_data )), np.hstack(( bin_fle, detrend_errs )) bin_res = np.hstack(( bin_res, residuals )) all_models_for_plotting = np.hstack(( all_models_for_plotting, detrend_model )) all_l68_mods_plotting = np.hstack(( all_l68_mods_plotting, lo_68CI )) all_u68_mods_plotting = np.hstack(( all_u68_mods_plotting, up_68CI )) # ---------------------------------------------------------- # The actual plotting code starts from here # ---------------------------------------------------------- # ------------------------------------------- # Top left: transit data # ------------------------------------------- if phasecurve: ppt = 1e-3 else: ppt = 1. ax1.errorbar(phs, detrend_data*ppt, fmt='.', elinewidth=0.5, markersize=1.5, alpha=0.5, color='dodgerblue', zorder=1) if (not one_plot) or highres: ## if we are making one-plot, we can plot the detrended model out of loop ax1.plot(phs_model, detrend_model*ppt, color='navy', lw=2.5, zorder=50) if quantile_models: if (not one_plot) or highres: ## Again, if we are making one plot, we can plot the detrended quantile models out of loop ax1.fill_between(phs_model, y1=lo_68CI*ppt, y2=up_68CI*ppt, color='orangered', alpha=0.15, zorder=25) else: for rand in range(nrandom): ax1.plot(phs_model, random_models[rand,:]*ppt, color='orangered', alpha=0.5, lw=1., zorder=25) # Limits ## Maximum of the transit model would always be 1. (or, 0 in case we have phase curve model, and we decided to use ppm) if transit or phasecurve: ax1.set_xlim(-0.7*t14_phs, 0.7*t14_phs) else: ax1.set_xlim(0.5-0.7*t14_phs, 0.5+0.7*t14_phs) if transit: ax1.set_ylim([np.min(detrend_model)-0.2*np.ptp(detrend_model), 1+0.2*np.ptp(detrend_model)]) elif phasecurve: ax1.set_ylim([np.min(detrend_model*ppt)-0.2*np.ptp(detrend_model*ppt), 0.+0.2*np.ptp(detrend_model*ppt)]) else: ax1.set_ylim([1-0.3*np.ptp(detrend_model), np.max(detrend_model)+0.3*np.ptp(detrend_model)]) # ------------------------------------------- # Bottom left: transit residuals # ------------------------------------------- ax2.errorbar(phs, residuals*ppt*1e6, fmt='.', elinewidth=0.5, markersize=1.5, alpha=0.5, color='dodgerblue', zorder=1) ax2.axhline(y=0., ls='--', color='k', zorder=50) # Limits (x-limit same as the top axis; for y-lim, we will set 3-sigma from median) if transit or phasecurve: ax2.set_xlim(-0.7*t14_phs, 0.7*t14_phs) else: ax2.set_xlim(0.5-0.7*t14_phs, 0.5+0.7*t14_phs) # ------------------------------------------- # Binned data: For TRANSIT # ------------------------------------------- ## We do binned data here: ONLY if we don't have one plot if not one_plot: if transit or phasecurve: idx_transits = ( phs > -0.8*t14_phs ) & ( phs < 0.8*t14_phs ) else: idx_transits = ( phs > 0.5-0.8*t14_phs ) & ( phs < 0.5+0.8*t14_phs ) if not pycheops_binning: nbin = int( np.sum(idx_transits) / nos_bin_tra ) bin_phs_tra, bin_fl_tra, bin_fle_tra = juliet.utils.bin_data(x=phs[idx_transits], y=detrend_data[idx_transits]*ppt, n_bin=nbin, yerr=detrend_errs[idx_transits]*ppt) if phasecurve or eclipse: _, bin_res_tra, bin_reserr_tra = juliet.utils.bin_data(x=phs[idx_transits], y=residuals[idx_transits]*ppt*1e6, n_bin=nbin, yerr=detrend_errs[idx_transits]*ppt) else: # When transit only, we haven't multiplied the errorbars with 1e6 yet; so we need to do it again _, bin_res_tra, bin_reserr_tra = juliet.utils.bin_data(x=phs[idx_transits], y=residuals[idx_transits]*1e6, n_bin=nbin, yerr=detrend_errs[idx_transits]*1e6) else: binwid = np.ptp( phs[idx_transits] ) / nos_bin_tra bin_phs_tra, bin_fl_tra, bin_fle_tra, _ = lcbin(time=phs[idx_transits], flux=detrend_data[idx_transits]*ppt, binwidth=binwid) if phasecurve or eclipse: _, bin_res_tra, bin_reserr_tra, _ = lcbin(time=phs[idx_transits], flux=residuals[idx_transits]*ppt*1e6, binwidth=binwid) else: # When transit only, we haven't multiplied the errorbars with 1e6 yet; so we need to do it again _, bin_res_tra, bin_reserr_tra, _ = lcbin(time=phs[idx_transits], flux=residuals[idx_transits]*1e6, binwidth=binwid) # Plotting them ax1.errorbar(bin_phs_tra, bin_fl_tra, yerr=bin_fle_tra, fmt='o', c='navy', elinewidth=2, capthick=2, capsize=3, mfc='white', zorder=100) ax2.errorbar(bin_phs_tra, bin_res_tra, yerr=bin_reserr_tra, fmt='o', c='navy', elinewidth=2, capthick=2, capsize=3, mfc='white', zorder=100) # Setting y-lim on residuals based on binned residuals ax2.set_ylim(np.nanmedian(bin_res_tra)-5*pipe_mad(bin_res_tra), np.nanmedian(bin_res_tra)+5*pipe_mad(bin_res_tra)) # ------------------------------------------- # If phase curve modelling, # then we also need right panel # ------------------------------------------- if phasecurve: # ------------------------------------------- # Top right: phase curve # ------------------------------------------- ax3.errorbar(phs, detrend_data, fmt='.', elinewidth=0.5, markersize=1.5, alpha=0.5, color='dodgerblue', zorder=1) if (not one_plot) or highres: ## If we are making one plot, then we can take this outside of the loop ax3.plot(phs_model, detrend_model, color='navy', lw=2.5, zorder=50) ax3.axhline(0.0, color='k', ls='--', lw=1.5, zorder=45) if quantile_models: if (not one_plot) or highres: ## If we are making one plot, we can take this outside of the loop ax3.fill_between(phs_model, y1=lo_68CI, y2=up_68CI, color='orangered', alpha=0.15, zorder=25) else: for rand in range(nrandom): ax3.plot(phs_model, random_models[rand,:], color='orangered', alpha=0.5, lw=1., zorder=25) # Limits ax3.set_xlim(phmin-1., phmin) ## For y-lim; we have baseline of 0, we can leave half occultation depth/phase amplitude below and above ylen = np.ptp( detrend_model[np.abs(phs_model) > 0.1] ) ax3.set_ylim([-0.4*ylen, 1.5*ylen]) # ------------------------------------------- # Bottom right: phase curve residuals # ------------------------------------------- ax4.errorbar(phs, residuals*1e6, fmt='.', elinewidth=0.5, markersize=1.5, alpha=0.5, color='dodgerblue', zorder=1) ax4.axhline(y=0., ls='--', color='k', zorder=10) # Limits ax4.set_xlim(phmin-1., phmin) # ------------------------------------------- # Binned data: For PHASE CURVE # ------------------------------------------- ## We do binned data here: ONLY if we don't have one plot if not one_plot: if not pycheops_binning: bin_phs_pc, bin_fl_pc, bin_fle_pc = juliet.utils.bin_data(x=phs, y=detrend_data, n_bin=int( len(phs)/nos_bin_pc ), yerr=detrend_errs) _, bin_res_pc, bin_reserr_pc = juliet.utils.bin_data(x=phs, y=residuals*1e6, n_bin=int( len(phs)/nos_bin_pc ), yerr=detrend_errs) else: bin_phs_pc, bin_fl_pc, bin_fle_pc, _ = lcbin(time=phs, flux=detrend_data, binwidth=1/nos_bin_pc) _, bin_res_pc, bin_reserr_pc, _ = lcbin(time=phs, flux=residuals*1e6, binwidth=1/nos_bin_pc) # Plotting them ax3.errorbar(bin_phs_pc, bin_fl_pc, yerr=bin_fle_pc, fmt='o', c='navy', elinewidth=2, capthick=2, capsize=3, mfc='white', zorder=100) ax4.errorbar(bin_phs_pc, bin_res_pc, yerr=bin_reserr_pc, fmt='o', c='navy', elinewidth=2, capthick=2, capsize=3, mfc='white', zorder=100) ax4.set_ylim(np.nanmedian(bin_res_pc)-5*pipe_mad(bin_res_pc), np.nanmedian(bin_res_pc)+5*pipe_mad(bin_res_pc)) if not one_plot: # --------------------------------------------------- # Figure labels etc. # --------------------------------------------------- # ------------------------ #. Transits # ------------------------ ## First, removing the x-axis labels of top panels ax1.xaxis.set_major_formatter(plt.NullFormatter()) # x-and y-labels if not phasecurve: if not eclipse: ax1.set_ylabel('Normalised flux') else: ax1.set_ylabel('Normalised flux [ppm]') ax2.set_ylabel('Residuals [ppm]') else: ax1.set_ylabel('Normalised flux [ppt]') ax2.set_ylabel('Residuals [ppt]') ax2.set_xlabel('Orbital phase') # ------------------------ #. Transits # ------------------------ if phasecurve: ## Removing x-axis labels of top panels ax3.xaxis.set_major_formatter(plt.NullFormatter()) ## Now, we will switch the y-axis labels to right side for phase curve panels ax3.yaxis.tick_right() ax3.tick_params(labelright=True) ax3.yaxis.set_label_position('right') ax4.yaxis.tick_right() ax4.tick_params(labelright=True) ax4.yaxis.set_label_position('right') ## x-and y-labels ax3.set_ylabel('Normalised flux [ppm]', rotation=270, labelpad=25) ax4.set_xlabel('Orbital phase') ax4.set_ylabel('Residuals [ppm]', rotation=270, labelpad=25) # Saving figs and axes if not one_plot figs_all.append(fig) axs1_all.append(ax1) axs2_all.append(ax2) axs3_all.append(ax3) axs4_all.append(ax4) # ------------------------------------------------------------- # Now, we plot the binned data when # we have one plot # ------------------------------------------------------------- if one_plot:# and (not highres): ## This ppt will convert transit data to PPT, when phase curve is plotted if phasecurve: ppt = 1e-3 else: ppt = 1. # ------------------------------------------- # Binned data: For Transits # ------------------------------------------- idx_phs_bin = np.argsort(bin_phs) bin_phs, bin_fl, bin_fle, bin_res = bin_phs[idx_phs_bin], bin_fl[idx_phs_bin], bin_fle[idx_phs_bin], bin_res[idx_phs_bin] # Now, we want to decide nbin (number of data points to bin) # We would like to have different binning for transit and phase curve -- so first selecting nbin for transit/occultation if transit or phasecurve: idx_transits = ( bin_phs > -0.8*t14_phs ) & ( bin_phs < 0.8*t14_phs ) else: idx_transits = ( bin_phs > 0.5-0.8*t14_phs ) & ( bin_phs < 0.5+0.8*t14_phs ) if not pycheops_binning: nbin = int( np.sum(idx_transits) / nos_bin_tra ) # Performing binning bin_phs_tra, bin_fl_tra, bin_fle_tra = juliet.utils.bin_data(x=bin_phs[idx_transits], y=bin_fl[idx_transits]*ppt, n_bin=nbin, yerr=bin_fle[idx_transits]*ppt) if phasecurve or eclipse: _, bin_res_tra, bin_reserr_tra = juliet.utils.bin_data(x=bin_phs[idx_transits], y=bin_res[idx_transits]*ppt*1e6, n_bin=nbin, yerr=bin_fle[idx_transits]*ppt) else: # When transit only, we haven't multiplied the errorbars with 1e6 yet; so we need to do it again _, bin_res_tra, bin_reserr_tra = juliet.utils.bin_data(x=bin_phs[idx_transits], y=bin_res[idx_transits]*1e6, n_bin=nbin, yerr=bin_fle[idx_transits]*1e6) else: binwid = np.ptp( bin_phs[idx_transits] ) / nos_bin_tra # Performing binning bin_phs_tra, bin_fl_tra, bin_fle_tra, _ = lcbin(time=bin_phs[idx_transits], flux=bin_fl[idx_transits]*ppt, binwidth=binwid) if phasecurve or eclipse: _, bin_res_tra, bin_reserr_tra, _ = lcbin(time=bin_phs[idx_transits], flux=bin_res[idx_transits]*ppt*1e6, binwidth=binwid) else: # When transit only, we haven't multiplied the errorbars with 1e6 yet; so we need to do it again _, bin_res_tra, bin_reserr_tra, _ = lcbin(time=bin_phs[idx_transits], flux=bin_res[idx_transits]*1e6, binwidth=binwid) # Plotting them ax1.errorbar(bin_phs_tra, bin_fl_tra, yerr=bin_fle_tra, fmt='o', c='navy', elinewidth=2, capthick=2, capsize=3, mfc='white', zorder=100) ax2.errorbar(bin_phs_tra, bin_res_tra, yerr=bin_reserr_tra, fmt='o', c='navy', elinewidth=2, capthick=2, capsize=3, mfc='white', zorder=100) # Setting y-lim on residuals based on binned residuals ax2.set_ylim(np.nanmedian(bin_res_tra)-5*pipe_mad(bin_res_tra), np.nanmedian(bin_res_tra)+5*pipe_mad(bin_res_tra)) # ------------------------------------------- # Binned data: For PHASE CURVE # ------------------------------------------- if phasecurve: if not pycheops_binning: bin_phs_pc, bin_fl_pc, bin_fle_pc = juliet.utils.bin_data(x=bin_phs, y=bin_fl, n_bin=int( len(bin_phs)/nos_bin_pc ), yerr=bin_fle) _, bin_res_pc, bin_reserr_pc = juliet.utils.bin_data(x=bin_phs, y=bin_res*1e6, n_bin=int( len(bin_phs)/nos_bin_pc ), yerr=bin_fle) else: bin_phs_pc, bin_fl_pc, bin_fle_pc, _ = lcbin(time=bin_phs, flux=bin_fl, binwidth=1/nos_bin_pc) _, bin_res_pc, bin_reserr_pc, _ = lcbin(time=bin_phs, flux=bin_res*1e6, binwidth=1/nos_bin_pc) # Plotting them ax3.errorbar(bin_phs_pc, bin_fl_pc, yerr=bin_fle_pc, fmt='o', c='navy', elinewidth=2, capthick=2, capsize=3, mfc='white', zorder=100) ax4.errorbar(bin_phs_pc, bin_res_pc, yerr=bin_reserr_pc, fmt='o', c='navy', elinewidth=2, capthick=2, capsize=3, mfc='white', zorder=100) ax4.set_ylim(np.nanmedian(bin_res_pc)-5*pipe_mad(bin_res_pc), np.nanmedian(bin_res_pc)+5*pipe_mad(bin_res_pc)) # ----------------------------------------------------------------------- # # Now, we want to plot the planetary model # if highres = False # # ----------------------------------------------------------------------- if not highres: # First, collecting all models all_models_for_plotting = all_models_for_plotting[idx_phs_bin] all_l68_mods_plotting, all_u68_mods_plotting = all_l68_mods_plotting[idx_phs_bin], all_u68_mods_plotting[idx_phs_bin] # --------------------------------------------------- # For transit/occultation # --------------------------------------------------- ## ----------- We can now plot the models (detrended and quantile models) ax1.plot(bin_phs, all_models_for_plotting*ppt, color='navy', lw=2.5, zorder=50) if quantile_models: ax1.fill_between(bin_phs, y1=all_l68_mods_plotting*ppt, y2=all_u68_mods_plotting*ppt, color='orangered', alpha=0.5, zorder=25) # --------------------------------------------------- # For phase curve # --------------------------------------------------- if phasecurve: ## ----------- We can now plot the models (detrended and quantile models) ax3.plot(bin_phs, all_models_for_plotting, color='navy', lw=2.5, zorder=50) if quantile_models: ax3.fill_between(bin_phs, y1=all_l68_mods_plotting, y2=all_u68_mods_plotting, color='orangered', alpha=0.5, zorder=25) if one_plot: # --------------------------------------------------- # Figure labels etc. # --------------------------------------------------- # ------------------------ #. Transits # ------------------------ ## First, removing the x-axis labels of top panels ax1.xaxis.set_major_formatter(plt.NullFormatter()) # x-and y-labels if not phasecurve: if not eclipse: ax1.set_ylabel('Normalised flux') else: ax1.set_ylabel('Normalised flux [ppm]') ax2.set_ylabel('Residuals [ppm]') else: ax1.set_ylabel('Normalised flux [ppt]') ax2.set_ylabel('Residuals [ppt]') ax2.set_xlabel('Orbital phase') # ------------------------ #. Phase curves # ------------------------ if phasecurve: ## Removing x-axis labels of top panels ax3.xaxis.set_major_formatter(plt.NullFormatter()) ## Now, we will switch the y-axis labels to right side for phase curve panels ax3.yaxis.tick_right() ax3.tick_params(labelright=True) ax3.yaxis.set_label_position('right') ax4.yaxis.tick_right() ax4.tick_params(labelright=True) ax4.yaxis.set_label_position('right') ## x-and y-labels ax3.set_ylabel('Normalised flux [ppm]', rotation=270, labelpad=25) ax4.set_xlabel('Orbital phase') ax4.set_ylabel('Residuals [ppm]', rotation=270, labelpad=25) ## Depending on one_plot, we return the appropriate objects if not one_plot: return figs_all, axs1_all, axs2_all, axs3_all, axs4_all else: return fig, ax1, ax2, ax3, ax4
[docs] def plot_fake_allan_deviation(self, instruments=None, binmax=10, method='pipe', timeunit=None): """Compute and plot per-instrument noise-vs-bin-size curves. This is a thin wrapper around ``utils.fake_allan_deviation`` that computes residuals for each instrument (data minus the full model) and returns the figures, axes and numeric results for further use. Parameters ---------- instruments : list or None List of instrument names to include. If ``None``, all instruments in the juliet dataset (``self.dataset.inames_lc``) are processed. binmax : int, optional Passed to ``utils.fake_allan_deviation`` to control the maximum number of bins (default ``10``). method : {'pipe','std','rms','astropy'}, optional Method used to estimate the scatter of the binned residuals: - 'pipe' : use ``pipe_mad`` (median absolute differences estimator) - 'std' : use ``np.nanstd`` - 'rms' : use the root-mean-square (``rms`` helper) - 'astropy' : use ``astropy.stats.mad_std`` (robust MAD-based std) timeunit : str or None, optional If provided, forwarded to ``utils.fake_allan_deviation`` to force the secondary x-axis time unit; otherwise the helper chooses an appropriate unit automatically. Returns ------- figs_all : list List of matplotlib Figure objects (one per instrument). axs_all : list List of matplotlib Axes objects corresponding to the figures. binsize_all : list List of binsize arrays returned for each instrument. noise_all : list List of noise arrays (ppm) computed for each instrument. white_noise_all : list List of white-noise expectation arrays (ppm) for each instrument. """ # Let's first gather the names of all instruments (we will always do one plot per instrument) if instruments != None: if len(instruments) != len(self.dataset.inames_lc): one_plot = False else: instruments = self.dataset.inames_lc ## All stuff we can save figs_all, axs_all = [], [] binsize_all, noise_all, white_noise_all = [], [], [] for ins in range(len(instruments)): # Getting the data tim_ins = self.dataset.times_lc[instruments[ins]][self.idx_time_sort[instruments[ins]]] fl_ins = self.dataset.data_lc[instruments[ins]][self.idx_time_sort[instruments[ins]]] ## Median model full_model = self.models_all_ins[instruments[ins]][1][self.idx_time_sort[instruments[ins]]] residuals = fl_ins - full_model # And, now, let's plot the Allan deviation fig, ax, binsize, noise, white_noise = fake_allan_deviation(times=tim_ins, residuals=residuals, binmax=binmax, method=method, timeunit=timeunit, plot=True) figs_all.append(fig) axs_all.append(ax) binsize_all.append(binsize) noise_all.append(noise) white_noise_all.append(white_noise) return figs_all, axs_all, binsize_all, noise_all, white_noise_all
[docs] def plot_corner(self, planet_only=False, param_list=None, save=True): """Create a corner plot of selected posterior parameters. Parameters ---------- planet_only : bool, optional If ``True``, include only planetary parameters in the corner plot. If ``False`` (default), include instrumental and noise parameters as well when available. param_list : list of str or None, optional If provided, a list of parameter names to include in the corner plot. If ``None`` (default), all parameters are included (subject to the ``planet_only`` filter). save : bool, optional If ``True`` (default), save the generated figure to ``<input_folder>/corner_plot.png``. Returns ------- fig : matplotlib.figure.Figure The figure produced by ``utils.corner_plot`` containing the marginal and pairwise plots for the selected parameters. """ # Access the posteriors post_samps = self.res.posteriors['posterior_samples'] # Arrays for posteriors and labels posteriors, labels, titles = [], [], [] for i in post_samps.keys(): if param_list is not None and i not in param_list: continue par = i.split('_') if ( 'p1' in par ) or ( 'p2' in par ) or ( 'p3' in par ) or ( 'p4' in par ) or ( 'q1' in par ) or ( 'q2' in par ) or ( 'rho' in par ): ## Adding posteriors and labels ( this parameters are added no matter planet_only is True/False) if i[0:3] == 'P_p': ## Period parameter: the uncertainty is generally very small, so we can scale it to make it plottable per_exponent_part = np.ceil( np.log10( np.nanstd( post_samps[i] ) ) ) per_median = np.nanmedian( post_samps[i] ) per = ( post_samps[i] - per_median ) / ( 10**per_exponent_part ) ## Saving the posteriors posteriors.append( per ) labels.append( i + ' [(P $-$ ' + str(np.around(per_median,5)) + ') / $10^{' + str(int(per_exponent_part)) + '}$ d]' ) titles.append( par[0] ) elif i[0:2] == 't0': ## Same for t0, we don't need to write out the whole thing t02 = np.floor( np.nanmedian(post_samps[i]) ) posteriors.append( post_samps[i] - t02 ) labels.append( i + ' [t0 $-$ ' + str(int(t02)) + ' d]' ) titles.append( par[0] ) elif (i[0:3] == 'p_p') or (i[0:4] == 'p1_p') or (i[0:4] == 'p2_p'): ## For Rp/R*, if there are more than 1 instruments, we will label them as 'instrument_name et al.' posteriors.append( post_samps[i] ) titles.append( par[0] ) if len(i.split('_')) > 3: labels.append('_'.join(i.split('_')[0:3]) + '_et al.') else: labels.append(i) elif (i[0:2] == 'q1') or (i[0:2] == 'q2'): ## Same for limb-darkening coefficients, if there are more than 1 instruments, we will label them as 'instrument_name et al.' posteriors.append( post_samps[i] ) titles.append( par[0] ) if len(i.split('_')) > 2: labels.append('_'.join(i.split('_')[0:2]) + '_et al.') else: labels.append(i) elif ( i[0:2] == 'fp' ) or ( i[0:4] == 'C1_p' ) or ( i[0:2] == 'C2' ) or ( i[0:2] == 'D1' ) or ( i[0:2] == 'D2' ): ## For flux parameters, we will label them as 'instrument_name et al.' posteriors.append( post_samps[i] * 1e6 ) titles.append( par[0] ) if len(i.split('_')) > 3: labels.append('_'.join(i.split('_')[0:3]) + '_et al. [ppm]') else: labels.append(i + ' [ppm]') ## If there are 'sesinomega_p1' and 'secosomega_p1' parameters, we will convert them to 'e_p1' and 'omega_p1' elif ( i[0:10] == 'secosomega' ): pl_num = i.split('_')[-1] sesinomega = post_samps['sesinomega_' + pl_num] secosomega = post_samps['secosomega_' + pl_num] e_p = sesinomega**2 + secosomega**2 omega_p = np.arctan2( sesinomega, secosomega ) * 180. / np.pi ## First, add sesinomega posteriors.append( post_samps[i] ) labels.append( i ) titles.append( par[0] ) ## And now, e and w posteriors.append( e_p ) labels.append( 'e_' + pl_num ) titles.append( 'e' ) posteriors.append( omega_p ) labels.append( r'$\omega$_' + pl_num + ' [deg]' ) titles.append( r'$\omega$' ) else: ## Else posteriors.append( post_samps[i] ) if len(i.split('_')) > 3: labels.append('_'.join(i.split('_')[0:3]) + '_et al.') else: labels.append(i) titles.append( par[0] ) else: ## Rest of the parameters are only added if planet_only is False if not planet_only: if ( i[0:7] != 'unnamed' ) and ( i[0:7] != 'loglike' ): posteriors.append( post_samps[i] ) if len(i.split('_')) > 3: labels.append('_'.join(i.split('_')[0:3]) + '_et al.') else: labels.append(i) titles.append( par[0] ) # ------------------------------------------ # Now, we can make the corner plot # ------------------------------------------ ## Reducing fontsize specially for this plot from matplotlib import rcParams rcParams['font.size'] = 10. rcParams['axes.labelsize'] = 'medium' rcParams['xtick.labelsize'] = 'medium' rcParams['ytick.labelsize'] = 'medium' rcParams['axes.titlesize'] = 'large' fig = corner_plot(samples=posteriors, labels=labels, titles=titles) if save: plt.savefig(self.input_folder + '/corner_plot.png', dpi=250) return fig
[docs] def plot_gp(self, instruments=None, highres=False, one_plot=False, figsize=(16/1.5, 9/1.5), pycheops_binning=False, xlabel='GP regressor', robust_errors=False, plot_errorbars=False, parameter_vector=None, nos_bin=20): """Plot the Gaussian Process component(s) from the fitted `juliet` model. This routine computes and plots the GP model (and binned data) for one or more instruments present in the current `juliet` dataset. Parameters ---------- instruments : list or None List of instrument names to plot. If ``None``, all instruments for which GP regressors exist in ``self.dataset.GP_lc_arguments`` are plotted. highres : bool, optional If ``True``, compute a high-resolution GP prediction (calls ``compute_gp_model``). Default is ``False``. one_plot : bool, optional If ``True``, combine all instruments into a single figure and return a single ``(fig, axs)``. If ``False``, return lists of figures and axes (one per instrument). Default ``False``. figsize : tuple, optional Figure size passed to ``matplotlib``. Default ``(16/1.5, 9/1.5)``. pycheops_binning : bool, optional Use ``pycheops`` produced binning when set to ``True``. Default ``False``. xlabel : str, optional X-axis label for the GP regressor. Default ``'GP regressor'``. robust_errors : bool, optional If ``True``, compute GP uncertainties using ``compute_gp_model`` instead of using ``juliet`` precomputed errors. plot_errorbars : bool, optional If ``True``, plot error bars on the GP data points. Default ``False``. parameter_vector : array-like or None, optional If provided, passed to the GP parameter setter before computing predictions (useful for evaluating GP at different parameter values). nos_bin : int, optional Approximate number of bins to use when computing binned data per instrument. Default ``20``. Returns ------- (fig, axs) or (fig_all, axs_all) If ``one_plot`` is ``True``, returns a single tuple ``(fig, axs)`` for the combined figure. Otherwise returns ``(fig_all, axs_all)`` where each is a list containing one Figure/Axis per instrument. """ # ------------------------------------------- # List of all instruments # ------------------------------------------- ## If instruments=None, we will generate one plot per instruments for all instruments for which GP fitting is done if instruments != None: ## Let's check if the provided instruments indeed have GP fitting for ins in range(len(instruments)): if instruments[ins] not in self.dataset.GP_lc_arguments.keys(): raise ValueError('GP fitting is not found for instrument ' + instruments[ins]) else: instruments = [i for i in self.dataset.GP_lc_arguments.keys()] # ------------------------------------------- # Computing detrended data and model # ------------------------------------------- self.detrend_data(phmin=0.8, instruments=instruments) self.detrend_model(instruments=instruments, phmin=0.8, highres=False) # ------------------------------------------- # Let's now compute the residuals # needed to plot the GP model # The total juliet model is: # y = T*F1 + F2 + GP + LM # => GP = y - T*F1 - F2 - LM # ------------------------------------------- gp_model_regressor, self.gp_data = {}, {} gp_med_model, gp_up68, gp_lo68 = {}, {}, {} for i in range(len(instruments)): # We note here that every array should be sorted according to GP regressors (e.g., time or roll angle) # We will keep it that way -- everything sorted according GP regressors ## -- We also need to compute linear model, if there is any if instruments[i] in self.dataset.lm_lc_arguments.keys(): linear_model = self.models_all_ins[instruments[i]][-1]['lm'] else: linear_model = 0. # ------------------------------------------- # GP data: i.e., data fitted to GP model # ------------------------------------------- self.gp_data[instruments[i]] = self.dataset.data_lc[instruments[i]] - ( self.F1[instruments[i]] * self.planet_only_models[instruments[i]][1] ) -\ self.F2[instruments[i]] - linear_model # ------------------------------------------- # And GP model -- we need to compute them # ------------------------------------------- if not highres: gp_model_regressor[instruments[i]] = np.copy( self.dataset.GP_lc_arguments[instruments[i]][:, 0] ) gp_med_model[instruments[i]] = self.all_mods_ins[instruments[i]]['GP'] if not robust_errors: gp_up68[instruments[i]], gp_lo68[instruments[i]] = self.all_mods_ins[instruments[i]]['GP_uerror'], self.all_mods_ins[instruments[i]]['GP_lerror'] else: ## In case of robust errors, we need to compute errors from ``compute_gp_model`` function gp_quantiles = self.compute_gp_model(instrument=instruments[i], highres=False, parameter_vector=parameter_vector) gp_up68[instruments[i]], gp_lo68[instruments[i]] = gp_quantiles[1,:], gp_quantiles[2,:] else: # High-resolution GP regressor per instrument predict_time = np.linspace( np.min(self.dataset.GP_lc_arguments[instruments[i]][:, 0]),\ np.max(self.dataset.GP_lc_arguments[instruments[i]][:, 0]), 10000 ) # ------ For highres models, we always need to compute models from ``compute_gp_model`` function gp_quantiles = self.compute_gp_model(instrument=instruments[i], highres=True, pred_time=predict_time, parameter_vector=parameter_vector) gp_model_regressor[instruments[i]] = predict_time gp_med_model[instruments[i]] = gp_quantiles[0,:] gp_up68[instruments[i]], gp_lo68[instruments[i]] = gp_quantiles[1,:], gp_quantiles[2,:] # Now, I think, we have everything -- so, let's plot! # ------------------------------------------- # Starting figure code: # one plot: do plt.subplots() here # ------------------------------------------- if one_plot: fig, axs = plt.subplots(figsize=figsize) # If there is one plot, then we also need to collect all data to compute binned data bin_gp_reg, bin_resids, bin_errors = np.array([]), np.array([]), np.array([]) # Also, in case we don't have highres model, we want to save all models before plotting all_regressors_for_plotting, all_models_for_plotting = np.array([]), np.array([]) all_u68_mods_plotting, all_l68_mods_plotting = np.array([]), np.array([]) else: # We need a list to save all fig and axs objects if we are planning more than one plot fig_all, axs_all = [], [] for i in range(len(instruments)): # ------------------------------------------- # If not one plot, we need to create # fig and axs object inside for loop # ------------------------------------------- if not one_plot: fig, axs = plt.subplots(figsize=figsize) # Plotting the "raw" data errorbars = self.detrended_errs[instruments[i]]*1e6 if plot_errorbars else None alp = 0.3 if plot_errorbars else 0.8 axs.errorbar(self.dataset.GP_lc_arguments[instruments[i]][:,0], self.gp_data[instruments[i]]*1e6, yerr=errorbars, fmt='.', elinewidth=0.3, markersize=1.5, alpha=alp, color='dodgerblue', zorder=1) # ------------------------------------------ # Now, we can plot the bin data and # quantile models here, if there is # no one plot, else, just save everything # to one big array, to plot outside # the for loop. We can also do # the label thing here. # ------------------------------------------ if not one_plot: # First bin-data if not pycheops_binning: nbin = int( len( self.gp_data[instruments[i]] ) / nos_bin ) bin_tim, bin_fl, bin_fle = juliet.utils.bin_data(x=self.dataset.GP_lc_arguments[instruments[i]][:,0], y=self.gp_data[instruments[i]]*1e6, yerr=self.detrended_errs[instruments[i]]*1e6, n_bin=nbin) else: binwid = np.ptp( self.dataset.GP_lc_arguments[instruments[i]][:,0] ) / nos_bin bin_tim, bin_fl, bin_fle, _ = lcbin(time=self.dataset.GP_lc_arguments[instruments[i]][:,0], flux=self.gp_data[instruments[i]]*1e6, binwidth=binwid) axs.errorbar(bin_tim, bin_fl, yerr=bin_fle, fmt='o', c='navy', elinewidth=2, capthick=2, capsize=3, mfc='white', zorder=100) # And now models ## If we don't need highres model, we can simply plot what we have ## Else, we need to create highres model first axs.plot(gp_model_regressor[instruments[i]], gp_med_model[instruments[i]]*1e6, color='navy', lw=1.5, zorder=50) # If we are making one plot per instrument (i.e., one_plot=False), we can simply plot quantile models right here axs.fill_between(gp_model_regressor[instruments[i]], y1=gp_lo68[instruments[i]]*1e6, y2=gp_up68[instruments[i]]*1e6, color='orangered', alpha=0.5, zorder=25) # Labels, limits axs.set_xlabel(xlabel) axs.set_ylabel('Relative flux [ppm]') axs.set_xlim([ np.min(self.dataset.GP_lc_arguments[instruments[i]][:,0]), np.max(self.dataset.GP_lc_arguments[instruments[i]][:,0]) ]) axs.set_ylim([ np.nanmedian(bin_fl)-10*pipe_mad(bin_fl), np.nanmedian(bin_fl)+10*pipe_mad(bin_fl) ]) # Saving fig, axs to the list fig_all.append(fig) axs_all.append(axs) else: # Saving data to the big list bin_gp_reg, bin_resids = np.hstack( (bin_gp_reg, self.dataset.GP_lc_arguments[instruments[i]][:,0]) ), np.hstack( (bin_resids, self.gp_data[instruments[i]]*1e6) ) bin_errors = np.hstack( (bin_errors, self.detrended_errs[instruments[i]]*1e6) ) # Saving models to the list all_regressors_for_plotting = np.hstack( (all_regressors_for_plotting, gp_model_regressor[instruments[i]]) ) all_models_for_plotting = np.hstack( (all_models_for_plotting, gp_med_model[instruments[i]]*1e6) ) ## Let's first save the quantile models all_l68_mods_plotting = np.hstack( (all_l68_mods_plotting, gp_lo68[instruments[i]]*1e6) ) all_u68_mods_plotting = np.hstack( (all_u68_mods_plotting, gp_up68[instruments[i]]*1e6) ) # ------------------------------------------ # If one plot, we can now plot the # bin data, quantile models, labels # etc. now, outside of the for loop # ------------------------------------------ if one_plot: # First, bin data if not pycheops_binning: nbin = int( len( bin_gp_reg ) / nos_bin / len(instruments) ) bin_tim, bin_fl, bin_fle = juliet.utils.bin_data(x=bin_gp_reg, y=bin_resids, yerr=bin_errors, n_bin=nbin) else: binwid = np.ptp( bin_gp_reg ) / nos_bin / len(instruments) bin_tim, bin_fl, bin_fle, _ = lcbin(time=bin_gp_reg, flux=bin_resids, binwidth=binwid) axs.errorbar(bin_tim, bin_fl, yerr=bin_fle, fmt='o', c='navy', elinewidth=2, capthick=2, capsize=3, mfc='white', zorder=100) if highres: # ------------------------------------------ # One plot _and_ highres requires a # special treatment # ------------------------------------------ ## -- Okay, we need one plot, in high resolution, i.e., we need to call a function to do this. idx_gp_reg_sort = np.argsort(bin_gp_reg) pred_time = np.linspace(np.min(bin_gp_reg[idx_gp_reg_sort]), np.max(bin_gp_reg[idx_gp_reg_sort]), 10000) gp_quantiles = self.compute_gp_model(instrument=None, highres=True, times=bin_gp_reg[idx_gp_reg_sort], pred_time=pred_time,\ resids=bin_resids[idx_gp_reg_sort]/1e6, errors=bin_errors[idx_gp_reg_sort]/1e6,\ parameter_vector=parameter_vector) axs.plot(pred_time, gp_quantiles[0,:]*1e6, color='navy', lw=1.5, zorder=50) axs.fill_between(pred_time, y1=gp_quantiles[2,:]*1e6, y2=gp_quantiles[1,:]*1e6, color='orangered', alpha=0.5, zorder=25) else: ## First, sorting according to regressors, and then plotting the full model idx_reg_sort = np.argsort(all_regressors_for_plotting) axs.plot(all_regressors_for_plotting[idx_reg_sort], all_models_for_plotting[idx_reg_sort], color='navy', lw=1.5, zorder=50) # We can plot qunatile models here, if highres=False axs.fill_between(all_regressors_for_plotting[idx_reg_sort], y1=all_l68_mods_plotting[idx_reg_sort], y2=all_u68_mods_plotting[idx_reg_sort], color='orangered', alpha=0.5, zorder=25) # Labels, limits axs.set_xlabel(xlabel) axs.set_ylabel('Relative flux [ppm]') axs.set_xlim([ np.min(bin_gp_reg), np.max(bin_gp_reg) ]) axs.set_ylim([ np.nanmedian(bin_fl)-10*pipe_mad(bin_fl), np.nanmedian(bin_fl)+10*pipe_mad(bin_fl) ]) if one_plot: return fig, axs else: return fig_all, axs_all
[docs] def compute_gp_model(self, instrument=None, highres=None, times=None, pred_time=None, resids=None, errors=None, parameter_vector=None): """This function computes the GP models. It accesses the GP object from the juliet files, and then do GP.compute and GP.predict.""" # If we compute GP model this way, then the errors will always be robust because we will be using gp.predict # If time is None: that means that we need model for an instrument (not one model for all instruments) # # times is not None, means that we need one model for more than one instruments # This model is always highres # This also assumes that the GP model is the same for all instruments if times is None: # When time is None, means that we haven't provided any data, so we need to extract it from self. ## Extracting times (GP regressors) from self.dataset object times = self.dataset.GP_lc_arguments[instrument][:,0] ## Similarly, extracting GP data and errors from self resids, errors = self.gp_data[instrument], self.detrended_errs[instrument] if highres: # Model for one instrument, but still highres model pred_time = np.linspace(np.min(times), np.max(times), 10000) else: # Not highres model pred_time = np.copy(times) else: # We don't need any data in this case, since everything is provided, # but we still need a name of an instrument to access GP object instrument = [i for i in self.dataset.GP_lc_arguments.keys()][0] if parameter_vector is not None: self.dataset.lc_options[instrument]['noise_model'].GP.set_parameter_vector(parameter_vector) # GP.compute self.dataset.lc_options[instrument]['noise_model'].GP.compute(t=times, yerr=errors) # And GP prediction gp_mean, gp_var = self.dataset.lc_options[instrument]['noise_model'].GP.predict(y=resids, t=pred_time, return_var=True) gp_std = np.sqrt(gp_var) gp_quantiles = np.zeros( (3, len(gp_mean)) ) gp_quantiles[0, :] = gp_mean gp_quantiles[1, :], gp_quantiles[2, :] = gp_mean + gp_std, gp_mean - gp_std return gp_quantiles
[docs] class ApPhoto(object): """Helper for aperture photometry on image time-series. This class encapsulates common operations required for aperture photometry on a data cube of frames (shape ``(N_frames, Ny, Nx)``), including centroid estimation, aperture and sky-mask construction, simple aperture flux extraction, and pixel-level decorrelation (PLD) basis construction. Parameters ---------- times : array_like 1D array of time stamps corresponding to each frame. frames : ndarray 3D data cube with shape ``(N_frames, Ny, Nx)`` containing image pixel values for each frame. errors : ndarray Per-pixel uncertainties with the same shape as ``frames``. badpix : ndarray 2D bad-pixel mask for a single frame (shape ``(Ny, Nx)``). A value of zero indicates a bad/ignored pixel; non-zero indicates a usable pixel. aprad : float or None, optional Aperture radius in pixels for circular aperture photometry. Used by :meth:`aperture_mask` and :meth:`simple_aperture_photometry`. Ignored when ``brightpix`` is True. Default is ``None``. sky_rad1 : float or None, optional Inner radius (pixels) of the sky annulus. Used by :meth:`sky_mask` and :meth:`simple_aperture_photometry` for background estimation. If omitted and ``brightpix`` is False, an empty (zero) mask is returned. Default is ``None``. sky_rad2 : float or None, optional Outer radius (pixels) of the sky annulus. Used by :meth:`sky_mask` and :meth:`simple_aperture_photometry` for background estimation. Default is ``None``. brightpix : bool, optional If ``True``, select ``nos_brightest`` brightest pixels to form an aperture instead of circular aperture. Used by :meth:`aperture_mask` and :meth:`sky_mask`. Default ``False``. nos_brightest : int, optional Number of brightest pixels to include in aperture when ``brightpix=True``. Default ``12``. nos_faintest : int, optional Number of faintest pixels to include in background aperture when it is not None and ``brightpix=True``. When None, all pixels that are not in aperture are included. Default is ``None``. minmax : dict or None, optional Optional spatial bounds for brightest pixel selection with keys 'rmin', 'rmax', 'cmin', 'cmax'. Default is ``None``. Attributes ---------- times, frames, errors, badpix : as above aprad, sky_rad1, sky_rad2, brightpix, nos_brightest, minmax : as above cen_r, cen_c : ndarray Per-frame row/column centroid positions (populated by :meth:`find_center`). ap_bool_mask_pld : ndarray 2D aperture boolean mask used by PLD routines. Psum, Phat, V, eigenvalues, PCA : ndarray Intermediate PLD quantities (pixel-sum, normalized fractions, PCA eigenvectors/values and projected time-series) populated by :meth:`pixel_level_decorrelation`. Main methods ------------ identify_crays(clip, niters) Flag cosmic-ray affected pixels and update ``badpix``. replace_nan(max_iter) Iteratively fill NaNs in ``frames`` from neighboring pixels. find_center(rmin,rmax,cmin,cmax) Compute center-of-flux centroids per frame. simple_aperture_photometry(method, plot, ...) Extract aperture photometry using manual, photutils, or median methods. pixel_level_decorrelation(removeNan) Build normalized pixel level light curves and compute PCA basis for PLD. Notes ----- The class stores both raw inputs and intermediate results used by subsequent calls; many methods modify the instance in-place and also return results for convenience. """ def __init__(self, times, frames, errors, badpix, aprad=None, sky_rad1=None, sky_rad2=None, brightpix=False, nos_brightest=12, nos_faintest=None, minmax=None): # The data self.times = times self.frames = frames self.errors = errors self.badpix = badpix # The aperture information self.aprad = aprad self.sky_rad1, self.sky_rad2 = sky_rad1, sky_rad2 self.brightpix = brightpix self.nos_brightest = nos_brightest self.nos_faintest = nos_faintest self.minmax = minmax if ( self.aprad == None ) and ( not self.brightpix ): raise Exception('You need to define an aperture by providing either aperture radius (aprad)\nOr setting brightpix=True, which will form the aperture from bightest pixels.')
[docs] def identify_crays(self, clip=5, niters=5): """Identify cosmic-ray affected pixels and update the bad-pixel map by comparing each data frame with the median frame. Parameters ---------- clip : float, optional Sigma threshold used to flag cosmic-ray candidates (default ``5``). niters : int, optional Number of iterative passes to update ``self.badpix`` (default ``5``). Returns ------- badpix : ndarray Updated 2D bad-pixel mask (shape ``(Ny, Nx)``) where flagged pixels are set to zero and good pixels are one. """ # Masking bad pixels as NaN for _ in range(niters): # Flagging bad data as Nan frame_new = np.copy(self.frames) frame_new[self.badpix == 0.] = np.nan # Median frame with warnings.catch_warnings(): warnings.simplefilter('ignore', RuntimeWarning) median_frame = np.nanmedian(frame_new, axis=0) # 2D frame # Creating residuals resids = frame_new - median_frame[None,:,:] # Median and std of residuals (both are 2D frames) med_resid, std_resid = np.nanmedian(resids, axis=0), np.nanstd(resids, axis=0) limit = med_resid + (clip*std_resid) mask_cr1 = np.abs(resids) < limit[None,:,:] self.badpix = mask_cr1*self.badpix return self.badpix
[docs] def replace_nan(self, max_iter = 50): """Fill NaN entries in the data array by the mean of neighbouring pixels. On each iteration the array is rolled along each axis to build a collection of nearest-neighbour shifts; the mean of these shifted arrays is used to replace NaNs. Iteration stops when there are no remaining NaNs or when ``max_iter`` is reached. Parameters ---------- max_iter : int, optional Maximum number of fill iterations to perform (default ``50``). Returns ------- frames : ndarray The data array with NaNs replaced (also modified in-place and returned). """ shape = np.append([2*self.frames.ndim], self.frames.shape) interp_cube = np.zeros(shape) axis = tuple(range(self.frames.ndim)) shift0 = np.zeros(self.frames.ndim, int) shift0[0] = 1 shift = [] # Shift list will be [(-1, 0), (1, 0), (0, -1), (0, 1)] for n in range(self.frames.ndim): shift.append(tuple(np.roll(-shift0, n))) shift.append(tuple(np.roll(shift0, n))) for _ in range(max_iter): for n in range(2*self.frames.ndim): interp_cube[n] = np.roll(self.frames, shift[n], axis = axis) # interp_cube would be (4, data.shape[0], data.shape[1]) sized array with warnings.catch_warnings(): # with shifted position in each element (so that we can take its mean) warnings.simplefilter('ignore', RuntimeWarning) mean_data = np.nanmean(interp_cube, axis=0) self.frames[np.isnan(self.frames)] = mean_data[np.isnan(self.frames)] if np.sum(np.isnan(self.frames)) == 0: break return self.frames
[docs] def plot_median_image(self): """This function plots the median image frame Returns ------- fig : matplotlib.figure.Figure or None Figure of median image. im : matplotlib.image.AxesImage or None Image object from the median image plot. axs : ndarray or None Axes of the figure. """ fig, axs = plt.subplots( figsize=(5,5) ) im = axs.imshow( np.nanmedian(self.frames, axis=0), interpolation='none' ) axs.set_title('Median image') return fig, im, axs
[docs] def find_center(self, rmin=None, rmax=None, cmin=None, cmax=None, plot=False): """Compute center-of-flux centroids for each frame in the 3D data cube. The centroid for each frame is computed as the center of flux (i.e. sum(x*I)/sum(I)) over either the full image or an optional subimage defined by the row/column limits ``rmin,rmax,cmin,cmax``. Results are stored on the instance as ``self.cen_r`` and ``self.cen_c`` and also returned. Parameters ---------- rmin, rmax, cmin, cmax : int or None, optional Optional integer bounds (rows and columns) defining a subimage to use for the centroid calculation. If omitted, the full frame is used. plot : bool, optional If ``True``, produce a plot of centroids as a function of time (default ``False``). Returns ------- cen_r, cen_c : ndarray 1D arrays (length ``N_frames``) containing the row and column centroid positions (in pixel coordinates) for each frame. fig : matplotlib.figure.Figure or None Figure of centroids vs time, if ``plot=True``, otherwise ``None``. axs : ndarray or None Axes of the figure if produced, otherwise ``None``. """ self.cen_r, self.cen_c = np.zeros(self.frames.shape[0]), np.zeros(self.frames.shape[0]) for i in range(len(self.cen_r)): # Row is the first index, column is the second index # First find the subimage if min & max row, cols are provided if (rmin != None)&(rmax != None)&(cmin == None)&(cmax == None): subimg = self.frames[i, rmin:rmax, :] elif (rmin == None)&(rmax == None)&(cmin != None)&(cmax != None): subimg = self.frames[i, :, cmin:cmax] elif (rmin != None)&(rmax != None)&(cmin != None)&(cmax != None): subimg = self.frames[i, rmin:rmax, cmin:cmax] else: subimg = np.copy(self.frames[i,:,:]) # Row and column indices row_idx, col_idx = np.arange(subimg.shape[0]), np.arange(subimg.shape[1]) # And now the center of *subimage* cen_r_sub = np.nansum(row_idx * np.nansum(subimg, axis=1)) / np.nansum(subimg.flatten()) cen_c_sub = np.nansum(col_idx * np.nansum(subimg, axis=0)) / np.nansum(subimg.flatten()) # This was the center of subimage, let's transform that to image coordinates if (rmin != None)&(cmin == None): self.cen_r[i], self.cen_c[i] = cen_r_sub + rmin, cen_c_sub elif (rmin == None)&(cmin != None): self.cen_r[i], self.cen_c[i] = cen_r_sub, cen_c_sub + cmin elif (rmin != None)&(cmin != None): self.cen_r[i], self.cen_c[i] = cen_r_sub + rmin, cen_c_sub + cmin else: self.cen_r[i], self.cen_c[i] = cen_r_sub, cen_c_sub if plot: fig, axs = plt.subplots(ncols=1, nrows=2, figsize=(15,7), sharex=True) axs[0].plot(self.times - self.times[0], self.cen_r, 'k-', lw=1.) axs[0].set_ylabel('Row') axs[1].plot(self.times - self.times[0], self.cen_c, 'k-', lw=1.) axs[1].set_xlabel('Time (BJD)') axs[1].set_ylabel('Column') axs[1].set_xlim([ 0., np.max( self.times - self.times[0] ) ]) else: fig, axs = None, None return self.cen_r, self.cen_c, fig, axs
[docs] def aperture_mask(self, plot=False): """Build a binary aperture mask (and masked frame/error arrays). By default a circular aperture is created on the median image using the median centroid position computed from ``self.cen_r`` and ``self.cen_c``. Alternatively, when ``self.brightpix=True``, the mask is formed by selecting the ``self.nos_brightest`` brightest pixels from the median image (optionally limited by ``self.minmax`` bounds). Parameters ---------- plot : bool, optional If ``True``, produce a plot of median image with aperture plotted on the top (default ``False``). Returns ------- aper_fl_mask : ndarray ``self.frames`` multiplied by the 2D aperture boolean mask (shape ``(N_frames, Ny, Nx)``). aper_err_mask : ndarray ``self.errors`` multiplied by the 2D aperture boolean mask. ap_bool_msk : ndarray 2D binary (0/1) mask describing the aperture on a single frame (shape ``(Ny, Nx)``). fig : matplotlib.figure.Figure or None Figure of median image and aperture mask, if ``plot=True``, otherwise ``None``. im : matplotlib.image.AxesImage or None Image object from the median image plot if ``plot=True``, otherwise ``None``. axs : ndarray or None Axes of the figure if produced, otherwise ``None``. Notes ----- Uses instance attributes ``self.aprad``, ``self.brightpix``, ``self.nos_brightest``, and ``self.minmax`` to control aperture construction. """ # Median image med_img = np.nanmedian(self.frames, axis=0) # Now, creating mask ap_bool_msk = np.zeros(med_img.shape) if not self.brightpix: # Let's first find a distance array which will contain the distance of each pixels from the center idx_arr_r, idx_arr_c = np.meshgrid(np.arange(med_img.shape[0]), np.arange(med_img.shape[1])) idx_arr_r, idx_arr_c = np.transpose(idx_arr_r), np.transpose(idx_arr_c) # Both of above array would be of the dimension of the image array, and we can get row and column index by doing, # idx_arr_r[row, col] = row index and idx_arr_c[row, col] = col index # Distance array would give distace of each pixel from the center distance = np.sqrt(((idx_arr_r - np.nanmedian(self.cen_r))**2) + ((idx_arr_c - np.nanmedian(self.cen_c))**2)) ap_bool_msk[distance < self.aprad] = 1. else: # Sorting all pixels values; so that we can select first N bright pixels in the aperture med_img_flat = med_img.flatten() med_img_sorted = np.sort(med_img_flat) # minmax != None if self.minmax is not None: rmin, rmax = self.minmax['rmin'], self.minmax['rmax'] cmin, cmax = self.minmax['cmin'], self.minmax['cmax'] else: rmin, rmax = 0, 10000 cmin, cmax = 0, 10000 tot_num = True ap_pix_nos, i = 0, 0 while tot_num: if np.isnan(med_img_sorted[-1-i]): i = i + 1 continue else: ind = np.where(med_img == med_img_sorted[-1-i]) # To prevent aperture being far from the center if (ind[0][0] > rmax) or (ind[0][0] < rmin) or (ind[1][0] < cmin) or (ind[1][0] > cmax): i = i + 1 continue else: ap_bool_msk[ind[0][0], ind[1][0]] = 1. ap_pix_nos = ap_pix_nos + 1 if ap_pix_nos>self.nos_brightest: tot_num = False i = i + 1 # And, aperture mask aper_fl_mask, aper_err_mask = self.frames * ap_bool_msk[None, :, :], self.errors * ap_bool_msk[None, :, :] if plot: # Figure code fig, axs = plt.subplots(1, 2, figsize=(10,5), sharex=True, sharey=True) im = axs[0].imshow(med_img, interpolation='none') axs[0].imshow(ap_bool_msk, alpha=0.5) axs[1].imshow(ap_bool_msk, interpolation='none') axs[0].set_title('Median image + Aperture mask') axs[1].set_title('Aperture mask') else: fig, im, axs = None, None, None return aper_fl_mask, aper_err_mask, ap_bool_msk, fig, im, axs
[docs] def sky_mask(self, plot=False): """Create a sky-annulus mask (and masked frame/error arrays). When ``self.brightpix`` is False this builds a circular annulus defined by ``self.sky_rad1`` (inner radius) and ``self.sky_rad2`` (outer radius) around the median centroid position. When ``self.brightpix`` is True the function instead selects the ``self.nos_brightest`` brightest pixels (optionally limited by ``self.minmax``) to define the aperture, and rest of the pixels as background. Parameters ---------- plot : bool, optional If ``True``, produce a plot of median image with sky mask plotted on the top (default ``False``). Returns ------- sky_fl_mask : ndarray ``self.frames`` multiplied by the 2D sky boolean mask (shape ``(N_frames, Ny, Nx)``). sky_err_mask : ndarray ``self.errors`` multiplied by the 2D sky boolean mask. sky_bool_msk : ndarray 2D binary (0/1) mask describing the sky annulus on a single frame (shape ``(Ny, Nx)``). fig : matplotlib.figure.Figure or None Figure of median image and sky mask, if ``plot=True``, otherwise ``None``. im : matplotlib.image.AxesImage or None Image object from the median image plot if ``plot=True``, otherwise ``None``. axs : ndarray or None Axes of the figure if produced, otherwise ``None``. Notes ----- Uses instance attributes ``self.sky_rad1``, ``self.sky_rad2``, ``self.brightpix``, ``self.nos_brightest``, and ``self.minmax`` to control sky mask construction. """ # Median image med_img = np.nanmedian(self.frames, axis=0) # Now, creating mask sky_bool_msk = np.zeros(med_img.shape) if ( self.sky_rad1 == None ) and ( self.sky_rad2 == None ) and ( not self.brightpix ): pass else: if ( not self.brightpix ) or ( ( self.sky_rad1 != None ) and ( self.sky_rad2 != None ) ): # Let's first find a distance array which will contain the distance of each pixels from the center idx_arr_r, idx_arr_c = np.meshgrid(np.arange(med_img.shape[0]), np.arange(med_img.shape[1])) idx_arr_r, idx_arr_c = np.transpose(idx_arr_r), np.transpose(idx_arr_c) # Both of above array would be of the dimension of the image array, i.e, we can get the row and column index by doing. # idx_arr_r[row, col] = row index and idx_arr_c[row, col] = col index # Distance array would give distace of each pixel from the center distance = np.sqrt(((idx_arr_r - np.nanmedian(self.cen_r))**2) + ((idx_arr_c - np.nanmedian(self.cen_c))**2)) sky_bool_msk[(distance > self.sky_rad1)&(distance < self.sky_rad2)] = 1. else: # Sorting all pixels values; so that we can select first N bright pixels in the aperture med_img_flat = med_img.flatten() med_img_sorted = np.sort(med_img_flat) # minmax != None if self.minmax is not None: rmin, rmax = self.minmax['rmin'], self.minmax['rmax'] cmin, cmax = self.minmax['cmin'], self.minmax['cmax'] else: rmin, rmax = 0, 10000 cmin, cmax = 0, 10000 if self.nos_faintest == None: sky_bool_msk = np.ones(med_img.shape) tot_num = True ap_pix_nos, i = 0, 0 while tot_num: if np.isnan(med_img_sorted[-1-i]): i = i + 1 continue else: ind = np.where(med_img == med_img_sorted[-1-i]) # To prevent aperture being far from the center if (ind[0][0] > rmax) or (ind[0][0] < rmin) or (ind[1][0] < cmin) or (ind[1][0] > cmax): i = i + 1 continue else: sky_bool_msk[ind[0][0], ind[1][0]] = 0. ap_pix_nos = ap_pix_nos + 1 if ap_pix_nos>self.nos_brightest: tot_num = False i = i + 1 else: sky_bool_msk = np.zeros(med_img.shape) tot_num = True ap_pix_nos, i = 0, 0 while tot_num: if np.isnan(med_img_sorted[i]): i = i + 1 continue else: ind = np.where(med_img == med_img_sorted[i]) # To prevent aperture being far from the center if (ind[0][0] < rmax) and (ind[0][0] > rmin) and (ind[1][0] > cmin) and (ind[1][0] < cmax): i = i + 1 continue else: sky_bool_msk[ind[0][0], ind[1][0]] = 1. ap_pix_nos = ap_pix_nos + 1 if ap_pix_nos>self.nos_faintest: tot_num = False i = i + 1 # And, sky mask sky_fl_mask, sky_err_mask = self.frames * sky_bool_msk[None, :, :], self.errors * sky_bool_msk[None, :, :] if plot: # Figure code fig, axs = plt.subplots(1, 2, figsize=(10,5), sharex=True, sharey=True) im = axs[0].imshow(med_img, interpolation='none') axs[0].imshow(sky_bool_msk, alpha=0.5) axs[1].imshow(sky_bool_msk, interpolation='none') axs[0].set_title('Median image + Bkg mask') axs[1].set_title('Bkg mask') else: fig, im, axs = None, None, None return sky_fl_mask, sky_err_mask, sky_bool_msk, fig, im, axs
[docs] def simple_aperture_photometry(self, method='manual', plot=False, bkg_corr='mean', robust_err=False, **kwargs): """Compute aperture photometry for each frame in the data cube. The function supports three methods: - ``manual``: sums pixels inside a binary aperture mask and subtracts the estimated sky background per pixel computed from a sky annulus. - ``photutils``: uses ``photutils`` aperture classes to compute aperture sums and background statistics per frame (requires ``photutils`` to be installed). Additional keyword arguments are forwarded to the photutils helpers. - ``median``: returns the per-frame median of the aperture pixels (useful as a robust estimator or for quick diagnostics). Parameters ---------- method : {'manual','photutils','median'}, optional Photometry method to use. Default is ``'manual'``. plot : bool, optional If ``True``, produce a plot of aperture photometry, flux vs time (default ``False``). bkg_corr : str, optional The method to compute sky background flux, can be either mean or median. Default is ``mean``. robust_err : bool, optional Computes robust errors on aperture photometry by including scatter in sky background flux. Default is ``False``. **kwargs : dict Additional keyword arguments forwarded to ``photutils`` routines when ``method='photutils'`` (for example, custom aperture or statistic options). Returns ------- ap_flux : ndarray 1D array (length N_frames) with the background-subtracted aperture flux for each frame (same units as ``frames``). ap_flux_err : ndarray 1D array (length N_frames) with the estimated uncertainty on the aperture flux for each frame. tot_sky_bkg : ndarray 1D array (length N_frames) with the total sky background contribution (summed over aperture pixels) subtracted from each frame. For methods that do not estimate a sky background this will be zeros. fig : matplotlib.figure.Figure or None Figure of aperture photometry vs time, if ``plot=True``, otherwise ``None``. axs : ndarray or None Axes of the figure if produced, otherwise ``None``. Notes ----- - The function expects centroids to be available as ``self.cen_r`` and ``self.cen_c`` (one per frame) before being called. - Uses instance attributes ``self.aprad``, ``self.sky_rad1``, ``self.sky_rad2``, ``self.brightpix``, ``self.nos_brightest``, and ``self.minmax`` to control aperture and background extraction. - When using ``photutils``, this routine constructs ``CircularAperture``/``CircularAnnulus`` objects per frame and uses ``ApertureStats`` and ``aperture_photometry``; errors are estimated combining aperture sum errors and annulus statistics following standard propagation. """ if method == 'manual': # Let's first obtain sky background flux per pixel (If sky radii are not None) sky_fl_mask, sky_err_mask, sky_bool_mask, _, _, _ = self.sky_mask() # Now, the aperture flux aper_fl_mask, aper_err_mask, ap_bool_mask, _, _, _ = self.aperture_mask() ## Expression of Sky flux per pixel and sky error per pixel if np.nansum(sky_bool_mask) != 0: # --------------------- First, calculate the mean sky flux per pix if bkg_corr == 'mean': sky_flx_per_pix = np.nansum(sky_fl_mask, axis=(1,2)) / np.nansum(sky_bool_mask) elif bkg_corr == 'median': # Replacing 0s with Nan, so that they don't interfere with median computation sky_fl_mask12 = np.copy( sky_fl_mask ) sky_fl_mask12[ sky_fl_mask12 == 0. ] = np.nan sky_flx_per_pix = np.nanmedian( sky_fl_mask12, axis=(1,2) ) else: raise Exception('The background can be computed either using mean or median.') # --------------------- And, now, errors on the mean sky flux per pix sky_flx_err_per_pix = np.sqrt( np.nansum( sky_err_mask**2, axis=(1,2) ) ) / np.nansum(sky_bool_mask) if robust_err: # Replacing 0s with Nan, so that they don't interfere with std computation sky_fl_mask12 = np.copy( sky_fl_mask ) sky_fl_mask12[ sky_fl_mask12 == 0. ] = np.nan sky_flx_err_per_pix = np.sqrt( sky_flx_err_per_pix**2 + ( np.nanstd( sky_fl_mask12, axis=(1,2) ) )**2 ) else: sky_flx_per_pix, sky_flx_err_per_pix = 0., 0. tot_sky_bkg = np.nansum(ap_bool_mask)*sky_flx_per_pix ap_flux = np.nansum(aper_fl_mask, axis=(1,2)) - tot_sky_bkg ap_flux_err = np.sqrt( np.nansum( aper_err_mask**2, axis=(1,2) ) + ( (np.nansum(ap_bool_mask) * sky_flx_err_per_pix)**2 ) ) elif method == 'photutils': ap_flux, ap_flux_err, tot_sky_bkg = np.zeros( self.frames.shape[0] ), np.zeros( self.frames.shape[0] ), np.zeros( self.frames.shape[0] ) for t in range(len(ap_flux)): # First let's perform the background subtraction if (self.sky_rad1 == None) and (self.sky_rad2 == None): bkg_mean, bkg_std = 0., 0. else: sky_aper = CircularAnnulus((int(self.cen_c[t]), int(self.cen_r[t])), r_in=self.sky_rad1, r_out=self.sky_rad2) sky_aperstats = ApertureStats(self.frames[t,:,:], sky_aper, **kwargs) if bkg_corr == 'mean': bkg_mean, bkg_std = sky_aperstats.mean, sky_aperstats.std # Mean sky background per pixel elif bkg_corr == 'median': bkg_mean, bkg_std = sky_aperstats.median, sky_aperstats.std # Mean sky background per pixel else: raise Exception('The background can be computed either using mean or median.') # Now computing the aperture flux circ_aper = CircularAperture((self.cen_c[t], self.cen_r[t]), r=self.aprad) ap_phot = aphot(data=self.frames[t,:,:], apertures=circ_aper, error=self.errors[t,:,:], **kwargs) total_sky_bkg = bkg_mean * circ_aper.area_overlap(self.frames[t,:,:], **kwargs) phot_bkgsub = ap_phot['aperture_sum'] - total_sky_bkg # Background subtraction ## Error estimation in background subtracted photometry phot_bkgsub_err = np.sqrt( (ap_phot['aperture_sum_err']**2) + ( (circ_aper.area_overlap(self.frames[t,:,:], **kwargs) * bkg_std)**2 ) ) ## Results ap_flux[t], ap_flux_err[t] = phot_bkgsub[0], phot_bkgsub_err[0] tot_sky_bkg[t] = total_sky_bkg elif method == 'median': aper_fl_mask, aper_err_mask, ap_bool_mask, _, _, _ = self.aperture_mask() tot_sky_bkg = np.zeros(aper_fl_mask.shape[0]) aper_fl_mask[aper_fl_mask == 0.] = np.nan ap_flux, ap_flux_err = np.nanmedian(aper_fl_mask, axis=(1,2)), np.sqrt( np.nanmedian( aper_fl_mask, axis=(1,2) ) ) else: raise Exception('Please enter correct method...\nMethod can either be manual, photutils, or median.') if plot: fig, axs = plt.subplots() axs.errorbar(self.times - self.times[0], ap_flux/np.nanmedian(ap_flux), yerr=ap_flux_err/np.nanmedian(ap_flux), fmt='.', color='dodgerblue') axs.set_xlim( [0., np.max(self.times - self.times[0]) ] ) axs.set_xlabel( 'Time [BJD - {:.2f}]'.format(self.times[0]) ) axs.set_ylabel('Relative flux') else: fig, axs = None, None return ap_flux, ap_flux_err, tot_sky_bkg, fig, axs
[docs] def growth_function(self, rmin, rmax, noise='pipe', plot=False, **kwargs): """Compute the growth curve (flux vs aperture radius) and noise. Parameters ---------- rmin : int Minimum aperture radius (inclusive) in pixels. rmax : int Maximum aperture radius (exclusive) in pixels. noise : {'pipe','std','rms','astropy'}, optional Method used to estimate scatter for each aperture radius. Default is 'pipe' (uses ``utils.pipe_mad``). plot : bool, optional If ``True``, produce a plot of the growth function (default ``False``). **kwargs : dict Additional keyword arguments forwarded to ``simple_aperture_photometry`` (for example sky annulus parameters). Returns ------- ap_flux_radii : ndarray Median aperture flux for each radius in the tested range. noise_radii : ndarray Estimated noise metric (in ppm) for each radius. fig : matplotlib.figure.Figure or None Figure of the growth function, if ``plot=True``, otherwise ``None``. axs : ndarray or None Axes of the figure if produced, otherwise ``None``. """ radii = np.arange(rmin, rmax, 1) ap_flux_radii, noise_radii = np.zeros(len(radii)), np.zeros(len(radii)) # Choosing the method to compute noise if noise == 'pipe': noise_func = pipe_mad elif noise == 'std': noise_func = np.nanstd elif noise == 'rms': noise_func = rms elif noise == 'astropy': noise_func = lambda x: mad_std(x, ignore_nan=True) else: raise ValueError("Method should be one of 'pipe', 'std', 'rms', or 'astropy'.") for r in tqdm(range(len(radii))): if not self.brightpix: self.aprad = radii[r] else: self.nos_brightest = radii[r] ap_fl, _, _, _, _ = self.simple_aperture_photometry(**kwargs) ap_flux_radii[r] = np.nanmedian( ap_fl ) noise_radii[r] = noise_func( ap_fl / np.nanmedian( ap_fl ) ) * 1e6 if plot: fig, axs1 = plt.subplots() color1 = 'orangered' axs1.plot(radii, ap_flux_radii, color=color1) axs1.set_xlabel('Radius') axs1.set_ylabel('Growth function', color=color1) axs1.tick_params(axis='y', which='both', color=color1, labelcolor=color1) color2 = 'royalblue' axs2 = axs1.twinx() axs2.plot(radii, noise_radii, color=color2) axs2.axvline(radii[np.argmin(noise_radii)], color='dimgrey', ls='--', lw=1.) axs2.set_ylabel('Median Absolute Deviation [ppm]', color=color2, rotation=270, labelpad=25) axs2.tick_params(axis='y', which='both', color=color2, labelcolor=color2) axs2.spines['right'].set_color(color2) axs2.spines['left'].set_color(color1) axs1.set_xlim([ np.min(radii), np.max(radii) ]) fig.tight_layout() else: fig, axs1, axs2 = None, None, None return ap_flux_radii, noise_radii, fig, [axs1, axs2]
[docs] def pixel_level_decorrelation(self, removeNan=False): """Prepare pixel-level decorrelation (PLD) basis functions. The method builds the normalized pixel-level light curves (Phat) inside the chosen aperture and performs a PCA (principal componene analysis) on them to produce a set of orthogonal basis vectors (``self.PCA``) with singular values and eigenvectors stored on the instance. Parameters ---------- removeNan : bool, optional If ``True``, remove frames that contain NaNs in the basis before performing PCA. Returns ------- V : ndarray Matrix of eigenvectors from the PCA (shape: n_components, n_pixels). eigenvalues : ndarray Eigenvalues corresponding to each PCA component. PCA : ndarray Projected PCA time-series (shape: n_components, n_frames). """ _, _, self.ap_bool_mask_pld, _, _, _ = self.aperture_mask() sky_fl, _, sky_bool, _, _, _ = self.sky_mask() if np.nansum(sky_bool) == 0.: sky_bkg_per_pix = 0. else: sky_bkg_per_pix = np.nansum(sky_fl, axis=(1,2)) / np.nansum(sky_bool) self.idxr, self.idxc = np.where(self.ap_bool_mask_pld == 1) pixel_fluxes = np.zeros( ( self.frames.shape[0], int(np.sum(self.ap_bool_mask_pld)) ) ) for r in range(len(self.idxr)): pixel_fluxes[:,r] = self.frames[:, int(self.idxr[r]), int(self.idxc[r])] - sky_bkg_per_pix # Calculating Phat self.Psum = np.nansum( pixel_fluxes, axis=1 ) self.Phat = pixel_fluxes / self.Psum[:, None] if removeNan: id0, _ = np.where(np.isnan(self.Phat)) self.idx_nan = np.ones(self.Phat.shape[0], dtype=bool) self.idx_nan[id0] = False self.Phat = self.Phat[self.idx_nan, :] self.times = self.times[self.idx_nan] self.Psum = self.Psum[self.idx_nan] self.idx_nan = np.ones(self.Phat.shape[0], dtype=bool) id0 = np.where( self.Psum == 0. ) self.idx_nan[id0] = False self.Phat, self.Psum = self.Phat[self.idx_nan, :], self.Psum[self.idx_nan] self.times = self.times[self.idx_nan] else: self.idx_nan = np.ones(self.Phat.shape[0], dtype=bool) self.V, self.eigenvalues, self.PCA = classic_PCA(self.Phat.T) return self.V, self.eigenvalues, self.PCA
[docs] def plot_correlation_matrices(self): """Plot correlation matrices before and after PCA on pixel-level light curves. Returns ------- fig : matplotlib.figure.Figure Figure containing the two correlation matrix subplots. axs : ndarray Array of two Axes objects for the before/after PCA plots. im_list : list List with the two AxesImage objects (useful for colorbars). """ # Calculating PCA correlation matrix CorrelationMatrix = np.abs(np.corrcoef(self.Phat.T)) PCACorrelationMatrix = np.abs(np.corrcoef(self.PCA)) from matplotlib import rcParams rcParams['xtick.direction'] = 'out' rcParams['ytick.direction'] = 'out' # Actual figure code fig, axs = plt.subplots(1, 2, figsize=(10,5), sharex=True, sharey=True) im1 = axs[0].imshow(CorrelationMatrix, cmap='magma', vmin=0, vmax=1) im2 = axs[1].imshow(PCACorrelationMatrix, cmap='magma', vmin=0, vmax=1) axs[0].set_xlabel('Element $i$') axs[1].set_xlabel('Element $i$') axs[0].set_ylabel('Element $j$') axs[0].set_title('Before PCA') axs[1].set_title('After PCA') fig.subplots_adjust(right=0.8) cbar_ax = fig.add_axes([0.85, 0.1, 0.015, 0.75]) fig.colorbar(im2, cax=cbar_ax) rcParams['xtick.direction'] = 'in' rcParams['ytick.direction'] = 'in' return fig, axs, [im1, im2]
[docs] def plot_eigenvectors(self): """Visualize the first 10 PCA eigenvectors as spatial maps. Returns ------- fig : matplotlib.figure.Figure Figure containing the eigenvector images. axs : ndarray Array of Axes for each eigenvector subplot. """ comps = 0 fig, axs = plt.subplots(nrows=2, ncols=5, figsize=(15, 5), sharex=True, sharey=True) for r in range(axs.shape[0]): for c in range(axs.shape[1]): eg_vec = np.zeros(self.ap_bool_mask_pld.shape) for k in range(len(self.idxr)): eg_vec[self.idxr[k], self.idxc[k]] = self.V[comps, k] im = axs[r,c].imshow(eg_vec, interpolation='none', cmap='plasma') im.set_clim([-0.22, 0.22]) axs[r,c].set_xlim([np.min(self.idxc)-3, np.max(self.idxc)+3]) axs[r,c].set_ylim([np.min(self.idxr)-3, np.max(self.idxr)+3]) axs[r,c].text(np.max(self.idxc)+1.5, np.max(self.idxr)+2, comps+1, fontweight='bold')#, backgroundcolor='white') comps = comps + 1 fig.suptitle('First 10 eigenvectors from PCA analysis', fontsize=16) return fig, axs
[docs] def plot_pcs(self, nmax=10): """Plot the PC time-series (principal components) up to `nmax`. Parameters ---------- nmax : int, optional Number of principal components to plot (default 10). Returns ------- fig : matplotlib.figure.Figure Figure containing the stacked PC plots. axs : ndarray Array of Axes objects, one per plotted PC. """ fig, axs = plt.subplots(nrows=int(nmax), ncols=1, figsize=(15,10), sharex=True) for i in range(int(nmax)): axs[i].plot(self.times - self.times[0], self.PCA[i,:], 'k-', lw=1.) axs[i].set_ylabel('PC ' + str(i+1)) med, std = np.nanmedian(self.PCA[i,:]), mad_std(self.PCA[i,:]) axs[i].set_ylim([med-4*std, med+4*std]) plt.xlabel('Time (BJD - {:.4f})'.format(self.times[0])) return fig, axs
[docs] def pld_correction(self, pc_max=5, extra_vectors=None, plot=False): """Apply PLD correction using the first `pc_max` principal components. Parameters ---------- pc_max : int, optional Number of PCA components to include in the PLD fit (default 5). extra_vectors : ndarray or None, optional Additional vectors to include in the PLD fit (shape: either 1D array of length n_frames or 2D array of shape (n_extra, n_frames)). If provided, these will be appended to the PCA basis before fitting. Default is ``None`` (no extra vectors). plot : bool, optional If ``True``, produce a diagnostic plot comparing the raw and PLD-predicted flux (default ``False``). Returns ------- Psum : ndarray Sum of pixel fluxes (SAP flux) before correction. prediction : ndarray PLD model prediction for the flux (same length as ``Psum``). fig : matplotlib.figure.Figure or None Diagnostic figure if ``plot=True``, otherwise ``None``. axs : ndarray or None Axes of the diagnostic figure if produced, otherwise ``None``. """ if extra_vectors is not None: # Ensure extra_vectors has the expected dimensionality (2D: n_extra, n_frames) extra_vectors = np.asarray(extra_vectors) if extra_vectors.ndim == 1: # Interpret a 1D array as a single extra vector extra_vectors = extra_vectors.reshape(1, -1) elif extra_vectors.ndim != 2: raise ValueError("extra_vectors must be a 1D or 2D array-like object.") if extra_vectors.shape[1] != self.PCA.shape[1]: raise ValueError("extra_vectors should have the same number of columns (frames) as PCA.") X = np.vstack([ np.ones(len(self.Psum)), self.PCA[0:pc_max,:], extra_vectors ]) else: X = np.vstack([ np.ones(len(self.Psum)), self.PCA[0:pc_max,:] ]) # Fit: result = np.linalg.lstsq(X.T, self.Psum, rcond=None) coeffs = result[0] prediction = np.dot(coeffs, X) print('>>> --- MAD of uncorrected light curve is: {:.2f} ppm'.format(pipe_mad(self.Psum/np.median(self.Psum))*1e6)) print('>>> --- MAD of PLD corrected light curve is: {:.2f} ppm'.format(pipe_mad(self.Psum/prediction)*1e6)) if plot: fig, axs = plt.subplots(2, 1, figsize=(15/1.5,10/1.5), sharex=True, sharey=True) axs[0].errorbar(self.times-self.times[0], self.Psum/np.median(self.Psum), fmt='.', c='orangered', label='SAP Flux') axs[0].plot(self.times-self.times[0], prediction/np.median(self.Psum), c='darkgreen', label='PLD prediction',alpha=0.7, zorder=10) axs[0].set_ylabel('Relative Flux') axs[0].legend(loc='best') axs[0].grid() axs[1].errorbar(self.times-self.times[0], self.Psum/prediction, fmt='.', c='cornflowerblue') axs[1].set_xlabel('Time [BJD - {:.2f}]'.format(self.times[0])) axs[1].set_ylabel('Relative Flux') axs[1].grid() axs[1].set_xlim([ 0., np.max(self.times-self.times[0]) ]) else: fig, axs = None, None return self.Psum, prediction, fig, axs
[docs] def noise_with_pcs(self, pc_max, noise='pipe', plot=False): """Estimate noise (MAD or other) as a function of included PCs. Parameters ---------- pc_max : int Maximum number of principal components to test (the method will evaluate values from 1 to ``pc_max-1``). noise : {'pipe','std','rms','astropy'}, optional Noise estimator to use (default 'pipe' which uses ``utils.pipe_mad``). plot : bool, optional If ``True``, display a plot of noise metric vs number of PCs. Returns ------- pcs_to_include : ndarray Array of numbers of PCs tested (1..pc_max-1). mad_with_pcs : ndarray Noise metric computed for each tested number of PCs. fig : matplotlib.figure.Figure or None Figure object if ``plot=True``, else ``None``. axs : matplotlib.axes.Axes or None Axis object for the plot if produced, else ``None``. """ # Choosing the method to compute noise if noise == 'pipe': noise_func = pipe_mad elif noise == 'std': noise_func = np.nanstd elif noise == 'rms': noise_func = rms elif noise == 'astropy': noise_func = lambda x: mad_std(x, ignore_nan=True) else: raise ValueError("Method should be one of 'pipe', 'std', 'rms', or 'astropy'.") pcs_to_include = np.arange(1, pc_max) mad_with_pcs = np.zeros(len(pcs_to_include)) for p in tqdm(range(len(pcs_to_include))): # First compute the prediction x_pc = np.vstack(( np.ones(len(self.Psum)), self.PCA[0:int(pcs_to_include[p]),:] )) res_pc = np.linalg.lstsq(x_pc.T, self.Psum, rcond=None) co_pc = res_pc[0] pred_pc = np.dot(co_pc, x_pc) # Computing the MAD mad_with_pcs[p] = noise_func( self.Psum / pred_pc ) * 1e6 # Number of PCs with minimum noise n_pc_of_min_scat = pcs_to_include[np.argmin(mad_with_pcs)] if plot: fig, axs = plt.subplots(figsize=(16/1.5, 9/1.5)) axs.plot(pcs_to_include, mad_with_pcs, c='orangered') axs.axvline(n_pc_of_min_scat, color='dimgrey', ls='--', lw=1.) axs.set_xlabel('Number of PCs included') axs.set_ylabel('Median Absolute Deviation [ppm]') axs.set_xlim([ np.min(pcs_to_include), np.max(pcs_to_include) ]) plt.grid() else: fig, axs = None, None return pcs_to_include, mad_with_pcs, fig, axs
[docs] class TESSData(object): """Helper to query and fetch TESS products from MAST and juliet. This lightweight utility class provides two convenience methods: ``get_lightcurves`` which delegates to ``juliet.utils.get_all_TESS_data`` (when available) to gather per-instrument light curves, and ``get_tpfs`` which queries MAST for Target Pixel Files (TPFs), downloads the products and populates simple dictionaries on the instance with the extracted time and pixel-cube data. Parameters ---------- object_name : str Target identifier accepted by MAST (for example a TIC or common object name). Attributes ---------- object_name : str The provided target name. tim_lc, fl_lc, fle_lc : dict or None Time/flux/flux-error collections populated by ``get_lightcurves`` when that method is called. out_dict_lc : object or None Optional output dictionary returned by the juliet helper. tim_tpf, fl_tpf, fle_tpf, badpix, quality : dict Dictionaries populated by :meth:`get_tpfs` containing per-sector time arrays, pixel flux cubes, pixel flux error cubes, bad-pixel masks and quality arrays respectively. """ def __init__(self, object_name): self.object_name = object_name
[docs] def get_lightcurves(self, pdc=True, save=False, pout=os.getcwd(), **kwargs): """Collect TESS light curves for ``self.object_name``. This method attempts to use ``juliet.utils.get_all_TESS_data`` to retrieve light curves. The juliet helper may return either a simple tuple ``(tim, fl, fle)`` or an (out_dict, tim, fl, fle) tuple; both cases are handled and stored on the instance. Parameters ---------- pdc : bool, optional If True request PDC-corrected fluxes when available. Default is ``True``. save : bool, optional If True the method will write ASCII files named ``LC_<object>_<inst>.dat`` to ``pout``. Default ``False``. pout : str, optional Output directory used when ``save=True`` (defaults to the current working directory). **kwargs : dict Forwarded to ``juliet.utils.get_all_TESS_data``. Returns ------- tim_lc, fl_lc, fle_lc, out_dict_lc : tuple Stored time/flux/fluxerr containers and an optional output dictionary (``out_dict_lc`` may be ``None``). """ try: self.tim_lc, self.fl_lc, self.fle_lc = juliet.utils.get_all_TESS_data(object_name=self.object_name, get_PDC=pdc, **kwargs) self.out_dict_lc = None except: self.out_dict_lc, self.tim_lc, self.fl_lc, self.fle_lc = juliet.utils.get_all_TESS_data(object_name=self.object_name, get_PDC=pdc, **kwargs) if save: name = 'PDC' if pdc else 'SAP' for ins in self.tim_lc.keys(): fname = open( pout + '/LC_' + self.object_name + '_' + ins + '_' + name + '.dat', 'w' ) for t in range( len(self.tim_lc[ins]) ): fname.write( str( self.tim_lc[ins][t] ) + '\t' + str( self.fl_lc[ins][t] ) + '\t' + str( self.fle_lc[ins][t] ) + '\n' ) fname.close() return self.tim_lc, self.fl_lc, self.fle_lc, self.out_dict_lc
[docs] def get_tpfs(self, pout=os.getcwd(), load=False, save=False, savefits=False): """Download or load TESS Target Pixel Files (TPFs) for the target. If ``load`` is False the method queries MAST for TESS time-series products for ``self.object_name``, filters suitable products (TPFs), downloads them, reads the FITS tables and populates the instance dictionaries ``tim_tpf``, ``fl_tpf``, ``fle_tpf``, ``quality`` and ``badpix``. If ``save`` is True a small pickled dictionary per sector is written to ``pout``. ``load=True`` simply reads these previously saved dictionaries. Parameters ---------- pout : str, optional Output directory used when ``save=True``. Default is the current working directory. load : bool, optional If True, attempt to load previously saved TPF pickles instead of querying/downloading from MAST. (Loading logic is not implemented in this helper; calling with ``load=True`` will bypass the MAST query branch.) save : bool, optional If True save extracted per-sector dictionaries as ``TPF_<object>_<sector>.pkl`` in ``pout``. Default ``False``. savefits : bool, optional If True, save the downloaded fits files in pout folder. Default is False. Returns ------- tim_tpf, fl_tpf, fle_tpf, badpix, quality : tuple The dictionaries populated on the instance containing the per-sector time arrays, pixel flux cubes, flux error cubes, quality arrays and a placeholder bad-pixel mask. """ if not load: try: obt = Observations.query_object(self.object_name, radius=0.01) except: raise Exception('The name of the object does not seem to be correct.\nPlease try again...') b = np.array([]) for j in range(len(obt['intentType'])): if obt['obs_collection'][j] == 'TESS' and obt['dataproduct_type'][j] == 'timeseries': b = np.hstack( (b, j) ) if len(b) == 0: raise Exception('No TESS timeseries data available for this target (strange, right?!!).\nTry another target...') sectors, pi_name, obsids, exptime, new_b = np.array([]), np.array([]), np.array([]), np.array([]), np.array([]) for i in range(len(b)): data1 = obt['dataURL'][int(b[i])] if data1[-9:] == 's_lc.fits': fls = data1.split('-') for j in range(len(fls)): if len(fls[j]) == 5: sec = fls[j] tic = fls[j+1] sectors = np.hstack((sectors, sec)) new_b = np.hstack((new_b, b[i])) obsids = np.hstack((obsids, obt['obsid'][int(b[i])])) pi_name = np.hstack((pi_name, obt['proposal_pi'][int(b[i])])) exptime = np.hstack((exptime, obt['t_exptime'][int(b[i])])) print('Data products found over ' + str( len(sectors) ) + ' sectors.') print('Downloading them...') disp_tic, disp_sec = [], [] # Dictionaries for saving the data in self self.tim_tpf, self.fl_tpf, self.fle_tpf, self.badpix, self.quality = {}, {}, {}, {}, {} for i in range(len(sectors)): dpr = Observations.get_product_list(obt[int(new_b[i])]) cij = 0 for j in range(len(dpr['obsID'])): if dpr['description'][j] == 'Target pixel files': cij = j tab = Observations.download_products(dpr[cij]) lpt = tab['Local Path'][0][1:] # Reading fits hdul = fits.open(os.getcwd() + lpt) hdr = hdul[0].header ## Finding TIC-id and sector ticid = int(hdr['TICID']) ticid = f"{ticid:010}" disp_tic.append(ticid) sec_tess = 'TESS' + str( hdr['SECTOR'] ) disp_sec.append(sec_tess) dta = Table.read(hdul[1]) # Creating a mask array to mask NaN idx = np.ones( dta['FLUX'].shape[0], dtype=bool ) for t in range( dta['FLUX'].shape[0] ): nan = np.where( np.isnan( dta['FLUX'][t,:,:] ) ) if len( nan[0] ) == len( dta['FLUX'][t,:,:].flatten() ): idx[t] = False self.tim_tpf[sec_tess] = dta['TIME'][idx] + hdul[1].header['BJDREFI'] self.fl_tpf[sec_tess], self.fle_tpf[sec_tess] = dta['FLUX'][idx,:,:], dta['FLUX_ERR'][idx,:,:] self.quality[sec_tess] = dta['QUALITY'][idx] self.badpix[sec_tess] = np.ones( dta['FLUX'][idx,:,:].shape, dtype=bool ) if save: data_sector = {} data_sector['TIME'] = dta['TIME'][idx] + hdul[1].header['BJDREFI'] data_sector['FLUX'], data_sector['FLUX_ERR'] = dta['FLUX'][idx,:,:], dta['FLUX_ERR'][idx,:,:] data_sector['QUALITY'] = dta['QUALITY'][idx] data_sector['BADPIX'] = np.ones( dta['FLUX'][idx,:,:].shape, dtype=bool ) ## And saving them pickle.dump( data_sector, open(pout + '/TPF_' + self.object_name + '_' + sec_tess + '.pkl','wb') ) if savefits: if not Path(pout + '/TPF_' + self.object_name + '_fits').exists(): os.mkdir(pout + '/TPF_' + self.object_name + '_fits') os.system('mv ' + os.getcwd() + lpt + ' ' + pout + '/TPF_' + self.object_name + '_fits/') # Delete the folder in the end os.system('rm -rf mastDownload') else: fnames = glob(pout + '/TPF_' + self.object_name + '_TESS*.pkl') # Dictionaries for saving the data in self self.tim_tpf, self.fl_tpf, self.fle_tpf, self.badpix, self.quality = {}, {}, {}, {}, {} for i in range(len(fnames)): data_sector = pickle.load( open( fnames[i], 'rb' ) ) ## Sector name sec_name = fnames[i].split('/')[-1].split('.')[0].split('_')[-1] ## Loading the data self.tim_tpf[sec_name] = data_sector['TIME'] self.fl_tpf[sec_name], self.fle_tpf[sec_name] = data_sector['FLUX'], data_sector['FLUX_ERR'] self.quality[sec_name] = data_sector['QUALITY'] self.badpix[sec_name] = data_sector['BADPIX']
def ApPhoto(self, sector, aprad=None, sky_rad1=None, sky_rad2=None, brightpix=False, nos_brightest=12, nos_faintest=None, minmax=None): return ApPhoto(times=self.tim_tpf[sector], frames=self.fl_tpf[sector], errors=self.fle_tpf[sector], badpix=self.badpix[sector],\ aprad=aprad, sky_rad1=sky_rad1, sky_rad2=sky_rad2, brightpix=brightpix, nos_brightest=nos_brightest, nos_faintest=nos_faintest, minmax=minmax)
[docs] class KeplerData(object): def __init__(self, object_name): self.object_name = object_name
[docs] def get_lightcurves(self, pdc=True, long_cadence=True, verbose=True, save=False, pout=os.getcwd()): """ Collect Kepler/K2 light curves for ``self.object_name``. This method uses ``astroquery`` to retrieve light curves. The function returns a simple tuple ``(tim, fl, fle)``. Parameters ---------- pdc : bool, optional If True request PDC-corrected fluxes when available. Default is ``True``. long_cadence : bool, optional If True the long cadence data will be downloaded. Default if False. verbose : bool Boolean on whether to print updates. Default is True save : bool, optional If True the method will write ASCII files named ``LC_<object>_<inst>.dat`` to ``pout``. Default ``False``. pout : str, optional Output directory used when ``save=True`` (defaults to the current working directory). """ if ('K2' in self.object_name) and (not long_cadence): raise Exception('No Short Cadence data available for K2 objects.') try: obt = Observations.query_object(self.object_name, radius=0.001) except: raise Exception('The name of the object does not seem to be correct.\nPlease try again...') # b contains indices of the timeseries observations from TESS b = np.array([]) for j in range(len(obt['intentType'])): if (obt['obs_collection'][j] == 'Kepler' or obt['obs_collection'][j] == 'K2') and obt['dataproduct_type'][j] == 'timeseries': b = np.hstack((b,j)) if len(b) == 0: raise Exception('No Kepler/K2 timeseries data available for this target.\nTry another target...') # To extract obs-id from the observation table pi_name, obsids, exptime, scad, lcad = np.array([]), np.array([]), np.array([]), np.array([]), np.array([]) for i in range(len(b)): data1 = obt['dataURL'][int(b[i])] if 'lc' in data1.split('_'): lcad = np.hstack((lcad, b[i])) if 'sc' in data1.split('_'): scad = np.hstack((scad, b[i])) if 'llc.fits' in data1.split('_'): lcad = np.hstack((lcad, b[i])) pi_nm = obt['proposal_pi'][int(b[i])] if type(pi_nm) != str: pi_nm = 'K2 Team' pi_name = np.hstack((pi_name, pi_nm)) if long_cadence: dwn_b = lcad keywd = 'Lightcurve Long Cadence' else: dwn_b = scad keywd = 'Lightcurve Short Cadence' for i in range(len(dwn_b)): obsids = np.hstack((obsids, obt['obsid'][int(dwn_b[i])])) exptime = np.hstack((exptime, obt['t_exptime'][int(dwn_b[i])])) disp_kic, disp_sec = [], [] # Directory to save the data self.tim_lc, self.fl_lc, self.fle_lc = {}, {}, {} for i in range(len(dwn_b)): dpr = Observations.get_product_list(obt[int(dwn_b[i])]) cij = [] for j in range(len(dpr['obsID'])): if keywd in dpr['description'][j]: cij.append(j) if verbose: print('Data products found over ' + str(len(cij)) + ' quarters/cycles.') print('Downloading them...') for j in range(len(cij)): sector = f"{i:02}" + f"{j:02}" + '-' + dpr['description'][cij[j]].split('- ')[1] # Downloading the data tab = Observations.download_products(dpr[cij[j]]) lpt = tab['Local Path'][0][1:] # Reading fits hdul = fits.open(os.getcwd() + lpt) hdr = hdul[0].header kicid = int(hdr['KEPLERID']) kicid = f"{kicid:010}" dta = Table.read(hdul[1]) # Available data products try: if pdc: fl = np.asarray(dta['PDCSAP_FLUX']) fle = np.asarray(dta['PDCSAP_FLUX_ERR']) else: fl = np.asarray(dta['SAP_FLUX']) fle = np.asarray(dta['SAP_FLUX_ERR']) except: continue mask = np.isfinite(fl) # Creating Mask to remove Nans bjd1 = np.asarray(dta['TIME'])[mask] + hdul[1].header['BJDREFI'] fl, fle = fl[mask], fle[mask] # Flux and Error in flux without Nans self.tim_lc[sector], self.fl_lc[sector], self.fle_lc[sector] = bjd1, fl / np.nanmedian(fl), fle / np.nanmedian(fl) disp_kic.append(kicid) disp_sec.append(sector) if verbose: print('----------------------------------------------------------------------------------------') print('Name\t\tKIC-id\t\tSector') print('----------------------------------------------------------------------------------------') for i in range(len(disp_kic)): print(self.object_name + '\t\t' + disp_kic[i] + '\t\t' + disp_sec[i]) if save: name = 'PDC' if pdc else 'SAP' for ins in self.tim_lc.keys(): fname = open( pout + '/LC_' + self.object_name + '_' + ins + '_' + name + '.dat', 'w' ) for t in range( len(self.tim_lc[ins]) ): fname.write( str( self.tim_lc[ins][t] ) + '\t' + str( self.fl_lc[ins][t] ) + '\t' + str( self.fle_lc[ins][t] ) + '\n' ) fname.close() # Deleting the data os.system('rm -rf mastDownload') return self.tim_lc, self.fl_lc, self.fle_lc
[docs] def get_tpfs(self, long_cadence=True, load=False, verbose=True, save=False, pout=os.getcwd(), savefits=False): """ Collect Kepler/K2 target pixel files for ``self.object_name``. This method uses ``astroquery`` to retrieve target pixel files. Parameters ---------- load : bool, optional If True request PDC-corrected fluxes when available. Default is ``True``. long_cadence : bool, optional If True the long cadence data will be downloaded. Default if False. verbose : bool Boolean on whether to print updates. Default is True save : bool, optional If True the method will write ASCII files named ``LC_<object>_<inst>.dat`` to ``pout``. Default ``False``. pout : str, optional Output directory used when ``save=True`` (defaults to the current working directory). """ if not load: if ('K2' in self.object_name) and (not long_cadence): raise Exception('No Short Cadence data available for K2 objects.') try: obt = Observations.query_object(self.object_name, radius=0.001) except: raise Exception('The name of the object does not seem to be correct.\nPlease try again...') # b contains indices of the timeseries observations from TESS b = np.array([]) for j in range(len(obt['intentType'])): if (obt['obs_collection'][j] == 'Kepler' or obt['obs_collection'][j] == 'K2') and obt['dataproduct_type'][j] == 'timeseries': b = np.hstack((b,j)) if len(b) == 0: raise Exception('No Kepler/K2 timeseries data available for this target.\nTry another target...') # To extract obs-id from the observation table pi_name, obsids, exptime, scad, lcad = np.array([]), np.array([]), np.array([]), np.array([]), np.array([]) for i in range(len(b)): data1 = obt['dataURL'][int(b[i])] if 'lc' in data1.split('_'): lcad = np.hstack((lcad, b[i])) if 'sc' in data1.split('_'): scad = np.hstack((scad, b[i])) if 'llc.fits' in data1.split('_'): lcad = np.hstack((lcad, b[i])) if 'slc.fits' in data1.split('_'): scad = np.hstack((scad, b[i])) pi_nm = obt['proposal_pi'][int(b[i])] if type(pi_nm) != str: pi_nm = 'K2 Team' pi_name = np.hstack((pi_name, pi_nm)) if long_cadence: dwn_b = lcad keywd = 'Target Pixel Long Cadence' else: dwn_b = scad keywd = 'Target Pixel Short Cadence' for i in range(len(dwn_b)): obsids = np.hstack((obsids, obt['obsid'][int(dwn_b[i])])) exptime = np.hstack((exptime, obt['t_exptime'][int(dwn_b[i])])) disp_kic, disp_sec = [], [] # Dictionaries to save the data self.tim_tpf, self.fl_tpf, self.fle_tpf, self.badpix, self.quality = {}, {}, {}, {}, {} for i in range(len(dwn_b)): dpr = Observations.get_product_list(obt[int(dwn_b[i])]) cij = [] for j in range(len(dpr['obsID'])): if keywd in dpr['description'][j]: cij.append(j) if verbose: print('Data products found over ' + str(len(cij)) + ' quarters/cycles.') print('Downloading them...') for j in range(len(cij)): sector = f"{i:02}" + f"{j:02}" + '-' + dpr['description'][cij[j]].split('- ')[1] # Downloading the data tab = Observations.download_products(dpr[cij[j]]) lpt = tab['Local Path'][0][1:] # Unzipping the file os.system('gunzip --keep ' + os.getcwd() + lpt) # Reading fits hdul = fits.open(os.getcwd() + lpt[:-3]) hdr = hdul[0].header kicid = int(hdr['KEPLERID']) kicid = f"{kicid:010}" dta = Table.read(hdul[1]) # Creating a mask array to mask NaN idx = np.ones( dta['FLUX'].shape[0], dtype=bool ) for t in range( dta['FLUX'].shape[0] ): nan = np.where( np.isnan( dta['FLUX'][t,:,:] ) ) if len( nan[0] ) == len( dta['FLUX'][t,:,:].flatten() ): idx[t] = False self.tim_tpf[sector] = dta['TIME'][idx] + hdul[1].header['BJDREFI'] self.fl_tpf[sector], self.fle_tpf[sector] = dta['FLUX'][idx,:,:], dta['FLUX_ERR'][idx,:,:] self.quality[sector] = dta['QUALITY'][idx] self.badpix[sector] = np.ones( dta['FLUX'][idx,:,:].shape, dtype=bool ) if save: data_sector = {} data_sector['TIME'] = dta['TIME'][idx] + hdul[1].header['BJDREFI'] data_sector['FLUX'], data_sector['FLUX_ERR'] = dta['FLUX'][idx,:,:], dta['FLUX_ERR'][idx,:,:] data_sector['QUALITY'] = dta['QUALITY'][idx] data_sector['BADPIX'] = np.ones( dta['FLUX'][idx,:,:].shape, dtype=bool ) ## And saving them pickle.dump( data_sector, open(pout + '/TPF_' + self.object_name + '_Kep' + sector + '.pkl','wb') ) disp_kic.append(kicid) disp_sec.append(sector) if savefits: if not Path(pout + '/TPF_' + self.object_name + '_fits').exists(): os.mkdir(pout + '/TPF_' + self.object_name + '_fits') os.system('mv ' + os.getcwd() + lpt + ' ' + pout + '/TPF_' + self.object_name + '_fits') os.system('mv ' + os.getcwd() + lpt[:-3] + ' ' + pout + '/TPF_' + self.object_name + '_fits') # Delete the folder in the end os.system('rm -rf mastDownload') if verbose: print('----------------------------------------------------------------------------------------') print('Name\t\tKIC-id\t\tSector') print('----------------------------------------------------------------------------------------') for i in range(len(disp_kic)): print(self.object_name + '\t\t' + disp_kic[i] + '\t\t' + disp_sec[i]) else: fnames = glob(pout + '/TPF_' + self.object_name + '_Kep*.pkl') # Dictionaries for saving the data in self self.tim_tpf, self.fl_tpf, self.fle_tpf, self.badpix, self.quality = {}, {}, {}, {}, {} for i in range(len(fnames)): data_sector = pickle.load( open( fnames[i], 'rb' ) ) ## Sector name sec_name = fnames[i].split('/')[-1].split('.')[0].split('_')[-1] ## Loading the data self.tim_tpf[sec_name] = data_sector['TIME'] self.fl_tpf[sec_name], self.fle_tpf[sec_name] = data_sector['FLUX'], data_sector['FLUX_ERR'] self.quality[sec_name] = data_sector['QUALITY'] self.badpix[sec_name] = data_sector['BADPIX']
def ApPhoto(self, sector, aprad=None, sky_rad1=None, sky_rad2=None, brightpix=False, nos_brightest=12, nos_faintest=None, minmax=None): return ApPhoto(times=self.tim_tpf[sector], frames=self.fl_tpf[sector], errors=self.fle_tpf[sector], badpix=self.badpix[sector],\ aprad=aprad, sky_rad1=sky_rad1, sky_rad2=sky_rad2, brightpix=brightpix, nos_brightest=nos_brightest, nos_faintest=nos_faintest, minmax=minmax)
[docs] class SpectroscopicLC(object): """Analyze and fit multi-wavelength time-series spectroscopic data. This class provides tools for handling spectroscopic light curves where flux is measured as a function of both time and wavelength. It supports binning spectral data into channels, fitting individual wavelength bins using ``juliet``, parallel processing of multiple channels, and visualization of spectral transit/eclipse signatures and systematics. Parameters ---------- times : ndarray 1D array of time stamps (in BJD or similar) with shape (N_frames,). lc : ndarray 2D array of flux measurements with shape (N_frames, N_wavelengths). Each row is a time frame; each column is a wavelength bin. lc_errs : ndarray 2D array of flux uncertainties with the same shape as ``lc``. wavelengths : ndarray 1D array of wavelength values (in microns or other units) corresponding to each column of ``lc``. Shape is (N_wavelengths,). gp_pars : dict or None, optional Dictionary mapping instrument names to arrays of Gaussian Process (GP) regressor values (e.g., time, roll angle). Used when fitting with GP systematics. Default is ``None``. lin_pars : dict or None, optional Dictionary mapping instrument names to 2D arrays of linear regressor values (shape: N_frames x N_regressors). Used for linear detrending. Default is ``None``. priors : callable or None, optional A callable that accepts a channel name (string) and returns a juliet-format priors for that channel. Required for fitting via :meth:`analyse_lc_parallel`. Default is ``None``. pout : str, optional Output directory for saving results, intermediate data, and pickled light curves. Default is the current working directory. Attributes ---------- times : ndarray Input time array. lc : ndarray Input 2D light-curve array. lc_errs : ndarray Input 2D error array. wavelengths : ndarray Input wavelength array. gp_pars, lin_pars : dict or None Input systematics parameters. priors : callable or None Input priors function. pout : str Output directory. col_st, col_end : ndarray Start and end column indices for each binned channel (populated by :meth:`col_spec`). spectral_lcs_dict : dict Dictionary containing binned spectral light curves and metadata, populated by :meth:`generating_lightcurves`. Keys include 'lc', 'err', 'wave', and 'wave_bin'. wav, wav_bin : ndarray Wavelength array and half-widths for each bin (populated by :meth:`plot_parameter_spectrum`). pars_med, pars_up, pars_lo : ndarray Median and upper/lower credible interval bounds on fitted parameters as a function of wavelength (populated by :meth:`plot_parameter_spectrum`). qua_white : tuple Quantiles (median, upper, lower) of the white-light parameter (populated by :meth:`plot_parameter_spectrum`). Main methods ------------ generating_lightcurves(ch_nos=None) Extract and optionally bin spectral light curves into channels. bin_lightcurve(unbinned_lc, unbinned_lc_err) Inverse-variance weighted binning of light curves along wavelength direction. white_light_lc() Compute the white-light (all-wavelength) light curve. analyse_lc_parallel(nthreads, ch_nos=None, juliet_fit_kwargs={}) End-to-end parallel analysis: bin, organize, and fit all channels. plot_parameter_spectrum(parameter, bins=None, post_size=10000, ...) Plot fitted parameter values as a function of wavelength. plot2Ddata(cmap='plasma') 2D heatmap of spectral light curves vs time. plot2D_data_model_resids(cmap='plasma', detrend=False, ...) Side-by-side 2D heatmaps of data, model, and residuals. joint_fake_allan_deviation(binmax=10, method='pipe', timeunit=None) Combined noise vs binning plot across all channels. Notes ----- - All fitting is performed using the ``juliet`` package (Espinoza et al. 2019). - Parallel processing uses Python's ``multiprocessing.Pool``. - Results are saved as pickled dictionaries and ASCII text files to ``pout``. - Methods check for existing output files and skip re-computation if files are already present (useful for resuming interrupted analyses). Examples -------- Create and analyze a spectroscopic dataset: >>> import numpy as np >>> times = np.linspace(2459000, 2459001, 100) >>> wavelengths = np.linspace(1.1, 1.7, 50) >>> lc = np.random.normal(1.0, 0.01, (100, 50)) >>> lc_errs = np.full_like(lc, 0.005) >>> def get_priors(ch_name): ... return juliet.utils.priors() >>> spec = SpectroscopicLC(times, lc, lc_errs, wavelengths, ... priors=get_priors, pout='./results') >>> spec.analyse_lc_parallel(nthreads=4, ch_nos=10) >>> fig, ax = spec.plot_parameter_spectrum('p_p1', bins=10, plot_white=True) """ def __init__(self, times, lc, lc_errs, wavelengths, gp_pars=None, lin_pars=None, priors=None, pout=os.getcwd()): # Dimensions of lc should be (nframes, ncols) self.times = times self.lc = lc self.lc_errs = lc_errs self.wavelengths = wavelengths self.gp_pars = gp_pars self.lin_pars = lin_pars self.priors = priors self.pout = pout
[docs] def col_spec(self, ch_nos): """Given a number of total number of channels, this function gives an array containing start and end column for each channel""" if ch_nos != 1: col_in_1_ch = round(self.lc.shape[1]/ch_nos) self.col_st = np.arange(0, self.lc.shape[1]-col_in_1_ch, col_in_1_ch, dtype=int) self.col_end = np.arange(0+col_in_1_ch, self.lc.shape[1], col_in_1_ch, dtype=int) else: self.col_st, self.col_end = np.array([0]), np.array([self.lc.shape[1]]) if self.col_end[-1] != self.lc.shape[1]: self.col_st = np.hstack((self.col_st, self.col_end[-1])) self.col_end = np.hstack((self.col_end, self.lc.shape[1]))
[docs] def generating_lightcurves(self, ch_nos=None): """Extract and optionally bin spectral light curves into channels. This method either bins the original light curve into ``ch_nos`` channels (using inverse-variance weighting) or uses the original wavelength resolution. Results are saved as a pickled dictionary and stored in ``self.spectral_lcs_dict``. If a pickle file for the requested binning already exists, it is loaded instead of recomputing. Parameters ---------- ch_nos : int or None, optional Number of channels to bin the light curve into. If ``None``, use native wavelength resolution (no binning). Default is ``None``. Returns ------- None Results are stored in ``self.spectral_lcs_dict`` with keys: - 'lc' : ndarray Binned light curves (shape: N_frames x N_channels). - 'err' : ndarray Binned flux errors (same shape as 'lc'). - 'wave' : ndarray Central wavelength of each bin. - 'wave_bin' : ndarray Half-width (or full width) of each wavelength bin. Notes ----- The output is pickled to ``pout/spectroscopic_lc_ch_<ch_nos>.pkl`` for future loading. """ if ch_nos == None: fname = self.pout + '/spectroscopic_lc_ch_' + str(self.lc.shape[1]) + '.pkl' else: fname = self.pout + '/spectroscopic_lc_ch_' + str(ch_nos) + '.pkl' if not Path(fname).exists(): # Columns if ch_nos != None: self.col_spec(ch_nos) # Creating spectral lc array spec_lc, spec_err_lc = np.zeros( ( self.lc.shape[0], len(self.col_st) ) ), np.zeros( ( self.lc.shape[0], len(self.col_st) ) ) wavs, wav_bin_size = np.zeros( len(self.col_st) ), np.zeros( len(self.col_st) ) for i in range(len(self.col_st)): spec_lc[:,i], spec_err_lc[:,i] = self.bin_lightcurve( self.lc[:, self.col_st[i]:self.col_end[i]], \ self.lc_errs[:, self.col_st[i]:self.col_end[i]] ) if self.col_end[i] != self.lc.shape[1]: wavs[i] = ( self.wavelengths[self.col_st[i]] + self.wavelengths[self.col_end[i]] )/2 wav_bin_size[i] = np.abs( self.wavelengths[self.col_st[i]] - self.wavelengths[self.col_end[i]] ) else: wavs[i] = ( self.wavelengths[self.col_st[i]] + self.wavelengths[self.col_end[i]-1] )/2 wav_bin_size[i] = np.abs( self.wavelengths[self.col_st[i]] - self.wavelengths[self.col_end[i]-1] ) else: spec_lc, spec_err_lc = self.lc, self.lc_errs wavs, wav_bin_size = self.wavelengths, np.append(np.diff(self.wavelengths), np.diff(self.wavelengths)[-1]) # Save the light curves self.spectral_lcs_dict = {} self.spectral_lcs_dict['lc'], self.spectral_lcs_dict['err'] = spec_lc, spec_err_lc self.spectral_lcs_dict['wave'], self.spectral_lcs_dict['wave_bin'] = wavs, wav_bin_size if ch_nos == None: pickle.dump(self.spectral_lcs_dict, open(self.pout + '/spectroscopic_lc_ch_' + str(self.lc.shape[1]) + '.pkl','wb')) else: pickle.dump(self.spectral_lcs_dict, open(self.pout + '/spectroscopic_lc_ch_' + str(ch_nos) + '.pkl','wb')) else: print('>>>> --- The spectroscopic lightcurves already exists...') print(' Loading them...') self.spectral_lcs_dict = pickle.load( open(fname, 'rb') )
[docs] def bin_lightcurve(self, unbinned_lc, unbinned_lc_err): """Perform inverse-variance weighted binning of a light curve. Each output bin is the weighted average of input values, where weights are the inverse squares of the input uncertainties. Output uncertainties are computed by propagating input uncertainties through the weighted average formula. Parameters ---------- unbinned_lc : ndarray 2D array of unbinned flux values (shape: N_frames x N_wavelengths). unbinned_lc_err : ndarray 2D array of flux uncertainties (same shape as ``unbinned_lc``). Returns ------- bin_lc : ndarray 1D array of binned flux values (shape: N_frames,). bin_lc_err : ndarray 1D array of binned flux uncertainties (same shape as ``bin_lc``). """ # shape of both arrays are: (nframes, ncols) weights = 1 / unbinned_lc_err**2 bin_lc = np.nansum(unbinned_lc * weights, axis=1) / np.nansum(weights, axis=1) bin_lc_err = ( 1 / np.nansum(weights, axis=1) ) * np.sqrt( np.nansum( (weights**2) * (unbinned_lc_err**2), axis=1 ) ) return bin_lc, bin_lc_err
[docs] def white_light_lc(self): """Compute the white-light (all-wavelength) light curve. Bins the entire 2D light curve (all wavelengths combined) into a single 1D time-series using inverse-variance weighting. Returns ------- white_lc : ndarray 1D array of white-light flux values. white_lc_err : ndarray 1D array of white-light flux uncertainties. """ return self.bin_lightcurve(unbinned_lc=self.lc, unbinned_lc_err=self.lc_errs)
# MultiProcessing helper function (does the actual fitting)
[docs] def fit_lc(self, lightcurves, ch_name): """Inteernal helper function for multiprocessing. This function fits lightcurve for one spectroscopic channel""" print('---------------------------------') print('Working on Channel: ' + ch_name) print('') # Output folder pout = self.pout + '/' + ch_name f15 = Path(pout + '/_dynesty_DNS_posteriors.pkl') f16 = Path(pout + '/model_resids.dat') f17 = Path(pout + '/posteriors.dat') if f15.exists() and f16.exists() and f17.exists(): print('>>>> --- The result files already exists...') print(' Continuing to the next channel...') res = np.zeros(10) else: # Extracting the data tim9, fl9, fle9 = lightcurves['times'], lightcurves['lc'], lightcurves['err'] # Removing Nan values tim7, fl7, fle7 = tim9[~np.isnan(fl9)], fl9[~np.isnan(fl9)], fle9[~np.isnan(fl9)] # Outlier removal #msk2 = utl.outlier_removal(tim7, fl7, fle7, clip=10, msk1=False) #tim7, fl7, fle7 = tim7[msk2], fl7[msk2], fle7[msk2] # Normalizing the lightcurve tim7, fl7, fle7 = tim7, fl7/np.median(fl7), fle7/np.median(fl7) # Making data such that juliet can understand tim, fl, fle = {}, {}, {} tim[ch_name], fl[ch_name], fle[ch_name] = tim7, fl7, fle7 if type( lightcurves['lins'] ) == type( None ): lin_pars = None else: lin_pars = {} lin_pars[ch_name] = lightcurves['lins'] if type( lightcurves['gp'] ) == type( None ): gp_pars = None else: gp_pars = {} gp_pars[ch_name] = lightcurves['gp'] # Fitting dataset = juliet.load(priors=self.priors(ch_name), t_lc=tim, y_lc=fl, yerr_lc=fle, linear_regressors_lc=lin_pars, GP_regressors_lc=gp_pars, out_folder=pout) res = dataset.fit(sampler = 'dynamic_dynesty', **self.juliet_fit_kwargs)#, nthreads=8) # Some plots model = res.lc.evaluate(ch_name) residuals = fl[ch_name]-model data12 = np.vstack((model, residuals)) np.savetxt(pout + '/model_resids.dat', np.transpose(data12)) print('>>>> --- Done!!') return res
# Function that does the multiprocessing def multi_fit_lcs(self, lightcurves, nthreads=4): input_data = [(lightcurves[lc], lc) for lc in lightcurves] with multiprocessing.Pool(nthreads) as p: result_list = p.starmap(self.fit_lc, input_data) return np.array(result_list)
[docs] def analyse_lc_parallel(self, nthreads, ch_nos=None, juliet_fit_kwargs={}): """End-to-end parallel analysis: bin, organize, and fit lightcurves from all channels. This high-level wrapper orchestrates the full analysis pipeline: 1. Bins the spectral light curve into channels via :meth:`generating_lightcurves` 2. Organizes the data into a format suitable for fitting 3. Calls :meth:`multi_fit_lcs` to fit all channels in parallel Parameters ---------- nthreads : int Number of parallel threads to use for fitting. ch_nos : int or None, optional Number of wavelength channels to bin into. If ``None``, use native wavelength resolution. Default is ``None``. juliet_fit_kwargs : dict, optional Keyword arguments to forward to ``juliet.dataset.fit(...)``, such as ``sampler='dynamic_dynesty'``, ``nthreads=8``, etc. Default is an empty dictionary. Returns ------- None Results are written to ``pout`` and its subdirectories. Notes ----- Requires that ``self.priors`` is a callable that returns a priors for each channel name. """ # Setting juliet_fit_kwargs self.juliet_fit_kwargs = juliet_fit_kwargs # Generating the data ## Bin the data, if necessary self.generating_lightcurves(ch_nos=ch_nos) # Storing all lightcurves in a big dictionary all_lightcurve_data = {} for i in range( self.spectral_lcs_dict['lc'].shape[1] ): # Storing all lightcurves in a big dictionary all_lightcurve_data['CH' + str(i)] = {} ## And now storing the actual lightcurve data all_lightcurve_data['CH' + str(i)]['times'] = self.times all_lightcurve_data['CH' + str(i)]['lc'] = self.spectral_lcs_dict['lc'][:,i] all_lightcurve_data['CH' + str(i)]['err'] = self.spectral_lcs_dict['err'][:,i] ## Storing linear and GP parameters, if provided (None, otherwise) all_lightcurve_data['CH' + str(i)]['lins'] = self.lin_pars all_lightcurve_data['CH' + str(i)]['gp'] = self.gp_pars # Now, analysing them _ = self.multi_fit_lcs(all_lightcurve_data, nthreads=nthreads)
[docs] def plot_parameter_spectrum(self, parameter, bins=None, post_size=10000, bin_method='mean', ppm=False, plot_white=False): """Plot a fitted parameter as a function of wavelength (a spectrum). Loads posterior samples for a parameter from all fitted channels, optionally re-bins them in the wavelength direction, and plots median with credible intervals as a function of wavelength. Parameters ---------- parameter : str Name of the parameter to plot (e.g., 'p_p1', 'fp_p1', 'C1_p1'). Should match the parameter naming convention used by juliet. bins : int or None, optional Number of wavelength bins to use for the output spectrum. If ``None`` or equal to the native number of channels, no re-binning is performed. Default is ``None``. post_size : int, optional Number of posterior samples to draw from each channel. Default is 10000. bin_method : {'mean', 'median', 'weighted_average'}, optional Method for combining posteriors across wavelength bins. Default is ``'mean'``. ppm : bool, optional If ``True``, scale parameter values to ppm (multiply by 1e6). Useful for small parameters like occultation depths. Default is ``False``. plot_white : bool, optional If ``True``, overplot the white-light (all-wavelength) parameter value as a horizontal band with credible interval. Default is ``False``. Returns ------- fig : matplotlib.figure.Figure The created figure. axs : matplotlib.axes.Axes The plot axes. Notes ----- Posterior samples are loaded from files named like ``pout/CH<i>/*_posteriors.pkl``. """ ## First, let's put all posteriros in a big 2D array all_posteriors = [] for i in range( self.spectral_lcs_dict['lc'].shape[1] ): fname = glob( self.pout + '/CH' + str(i) + '/*_posteriors.pkl' )[0] post = pickle.load( open(fname, 'rb') ) all_posteriors.append( np.random.choice( post['posterior_samples'][parameter + '_CH' + str(i)], size=post_size ) ) posteriors = np.transpose( np.vstack( all_posteriors ) ) ## Calculating the white light parameter value if bin_method == 'mean': white_post = np.mean( posteriors, axis=1 ) elif bin_method == 'median': white_post = np.median( posteriors, axis=1 ) elif bin_method == 'weighted_average': white_post = np.average( posteriors, axis=1, weights=1/np.std(posteriors, axis=0)**2 ) else: raise ValueError('>>> --- bin_method not recognized. Please use mean, median or weighted_average.') ### Quantiles self.qua_white = juliet.utils.get_quantiles( white_post ) if ( bins == self.spectral_lcs_dict['lc'].shape[1] ) or ( bins == None ): ## Wavelengths self.wav, self.wav_bin = self.spectral_lcs_dict['wave'], self.spectral_lcs_dict['wave_bin']/2 ## If bin size is the same as the column size, then we don't perform any binning self.pars_med, self.pars_up, self.pars_lo = np.zeros( self.spectral_lcs_dict['lc'].shape[1] ), np.zeros( self.spectral_lcs_dict['lc'].shape[1] ), np.zeros( self.spectral_lcs_dict['lc'].shape[1] ) for i in range( self.spectral_lcs_dict['lc'].shape[1] ): ## And, appending parameter values qua = juliet.utils.get_quantiles( posteriors[:,i] ) self.pars_med[i], self.pars_up[i], self.pars_lo[i] = qua[0], qua[1] - qua[0], qua[0] - qua[2] else: ## Wavelengths native_wav, native_wav_bin = self.spectral_lcs_dict['wave'], self.spectral_lcs_dict['wave_bin'] # Column location, for low resolution eclipse spectrum col_in_1_ch = round( len(native_wav) / bins ) col_st = np.arange(0, len(native_wav)-col_in_1_ch, col_in_1_ch, dtype=int) col_end = np.arange(0+col_in_1_ch, len(native_wav), col_in_1_ch, dtype=int) if col_end[-1] != len(native_wav): col_st = np.hstack(( col_st, col_end[-1] )) col_end = np.hstack(( col_end, len(native_wav) )) # For binning the spectrum self.pars_med, self.pars_up, self.pars_lo = np.zeros( len(col_st) ), np.zeros( len(col_st) ), np.zeros( len(col_st) ) self.wav, self.wav_bin = np.zeros( len(col_st) ), np.zeros( len(col_end) ) # And performing actual binning for i in range(len(col_st)): ## Calculating the median of the posteriors in the bin if bin_method == 'mean': fp12 = np.mean( posteriors[:,col_st[i]:col_end[i]], axis=1 ) elif bin_method == 'median': fp12 = np.median( posteriors[:,col_st[i]:col_end[i]], axis=1 ) elif bin_method == 'weighted_average': fp12 = np.average( posteriors[:,col_st[i]:col_end[i]], axis=1, weights=1/np.std( posteriors[:,col_st[i]:col_end[i]], axis=0 )**2 ) else: raise ValueError('>>> --- bin_method not recognized. Please use mean, median or weighted_average.') qua_fp12 = juliet.utils.get_quantiles(fp12) self.pars_med[i], self.pars_up[i], self.pars_lo[i] = qua_fp12[0], qua_fp12[1]-qua_fp12[0], qua_fp12[0]-qua_fp12[2] # For wavelength bins if col_end[i] != len(native_wav): self.wav[i] = ( native_wav[col_st[i]] + native_wav[col_end[i]] ) / 2 self.wav_bin[i] = np.abs( native_wav[col_st[i]] - native_wav[col_end[i]] ) / 2 else: self.wav[i] = ( native_wav[col_st[i]] + native_wav[col_end[i]-1] ) / 2 self.wav_bin[i] = np.abs( native_wav[col_st[i]] - native_wav[col_end[i]-1] ) / 2 # --------------------------------------- #. Now, plotting the spectrum # --------------------------------------- if ppm: self.pars_med *= 1e6 self.pars_up *= 1e6 self.pars_lo *= 1e6 self.qua_white = ( self.qua_white[0]*1e6, self.qua_white[1]*1e6, self.qua_white[2]*1e6 ) fig, axs = plt.subplots() axs.errorbar(self.wav, self.pars_med, yerr=[self.pars_lo, self.pars_up], xerr=self.wav_bin, fmt='o', c='orangered', mfc='white', elinewidth=1.5, capthick=1.5, capsize=3.) if plot_white: wav12 = np.linspace( min(self.wav)-max(self.wav_bin)*2, max(self.wav)+max(self.wav_bin)*2, 1000 ) axs.axhline(self.qua_white[0], color='cornflowerblue', ls='-', zorder=10) axs.fill_between(wav12, self.qua_white[2], self.qua_white[1], color='cornflowerblue', alpha=0.3) axs.set_xlim( min(self.wav)-max(self.wav_bin)*1.5, max(self.wav)+max(self.wav_bin)*1.5 ) axs.set_xlabel('Wavelength') axs.set_ylabel(parameter) axs.set_title(f'Spectral {parameter} spectrum') return fig, axs
[docs] def plot2Ddata(self, cmap='plasma'): """Create a 2D heatmap of spectral light curves (flux vs time) vs wavelength. Parameters ---------- cmap : str, optional Matplotlib colormap name. Default is ``'plasma'``. Returns ------- fig : matplotlib.figure.Figure The created figure. ax1 : matplotlib.axes.Axes The plot axes. im1 : matplotlib.image.AxesImage The image object. cbar : matplotlib.colorbar.Colorbar The colorbar object. """ # Adjusting the times to time since beginning in hours tim = ( self.times - self.times[0] ) * 24 ## Preparing the data norm_lcs = np.copy( self.lc ) fig, ax1 = plt.subplots( figsize=(7, 7) ) # Data im1 = ax1.imshow(norm_lcs.T, interpolation='none', cmap=cmap, aspect = 'auto', extent=[tim[0], tim[-1], self.wavelengths[-1], self.wavelengths[0]]) im1.set_clim( [ np.nanmedian(norm_lcs) - 5*mad_std(norm_lcs), np.nanmedian(norm_lcs) + 5*mad_std(norm_lcs) ] ) plt.ylabel(r'Wavelength [$\mu$m]') plt.xlabel('Time from beginning [h]') # Colorbar: divider = make_axes_locatable(ax1) cax = divider.append_axes("right", size="5%", pad=0.05) cbar = fig.colorbar(im1, shrink = 0.08, cax=cax) cbar.ax.get_yaxis().labelpad = 20 cbar.ax.set_ylabel('Relative flux', rotation=270)#, fontsize = 15) return fig, ax1, im1, cbar
[docs] def plot2D_data_model_resids(self, cmap='plasma', detrend=False, binwidth=None, ppm=False, **kwargs): """Create side-by-side 2D heatmaps of data, model, and residuals. Loads the fitted models and residuals from all channels and creates three synchronized heatmaps showing the full spectro-temporal evolution of the data, fit quality, and systematics. Parameters ---------- cmap : str, optional Matplotlib colormap name. Default is ``'plasma'``. detrend : bool, optional If ``True``, load and plot detrended data (subtracting GP and linear systematics). Requires additional ``juliet`` processing and may be slow. If ``False``, plot raw data. Default is ``False``. binwidth : float or None, optional Time binwidth (in days) for additional temporal binning of the data/model/residuals. If ``None``, no binning is performed. Default is ``None``. ppm : bool, optional If ``True``, express data and model in ppm relative to unity. Residuals are always in ppm. Default is ``False``. **kwargs : dict Forwarded to the ``julietPlots`` constructor when ``detrend=True`` (e.g., ``N=1000`` for the number of posterior samples). Returns ------- fig : matplotlib.figure.Figure The created figure with three subplots. axs : ndarray Array of three Axes objects (data, model, residuals). im_list : list List of three AxesImage objects (useful for colorbars). """ # binwidth = None means that you should not perform any binning (in time) # We don't perform any binning in wavelength direction -- please do that before fitting the light curves # Detrend = True detrends the data and plot detrended model instead. data = np.zeros( ( len(self.spectral_lcs_dict['wave']), len(self.times) ) ) models = np.zeros( data.shape ) resids = np.zeros( data.shape ) ## Loading the wavelengths waves = self.spectral_lcs_dict['wave'] min_mod, max_mod = 1e10, -1e10 for c in range( self.spectral_lcs_dict['lc'].shape[1] ): if not detrend: ## First, we need data tim, fl = np.loadtxt( self.pout + '/CH' + str(c) + '/lc.dat', usecols=(0,1), unpack=True ) ## And the model and residuals model, resid = np.loadtxt( self.pout + '/CH' + str(c) + '/model_resids.dat', usecols=(0,1), unpack=True ) else: ## Well, if detrend = True, then we need to detrend the data (that can take a while) data = julietPlots(input_folder=self.pout + '/CH' + str(c), **kwargs) ## We now need to detrend the data and the calculate the detrended model data.detrend_data(phmin=0.8, instruments='CH' + str(c)) data.detrend_model(phmin=0.8, instruments='CH' + str(c), highres=False) # Time data ## Array that sort any array according to time idx_time = np.argsort( data.dataset.times_lc['CH' + str(c)] ) tim = data.dataset.times_lc['CH' + str(c)][idx_time] resid = ( data.dataset.data_lc['CH' + str(c)] - data.models_all_ins['CH' + str(c)][1] )[idx_time] ## Detrended dataset fl = data.detrended_data['CH' + str(c)][idx_time] ## Detrended model model = data.planet_only_models['CH' + str(c)][1][idx_time] if ppm: fl, model = ( fl - 1. ) * 1e6, ( model - 1. ) * 1e6 ## Residuals are always in ppm resid = resid * 1e6 if binwidth is not None: _, fl, _, _ = lcbin(time=tim, flux=fl, binwidth=binwidth) _, model, _, _ = lcbin(time=tim, flux=model, binwidth=binwidth) _, resid, _, _ = lcbin(time=tim, flux=resid, binwidth=binwidth) # And putting them in a big array data[c, :], models[c, :], resids[c, :] = fl, model, resid ## Calculating the minimum and maximum values of the model if np.nanmin(model) < min_mod: min_mod = np.nanmin(model) if np.nanmax(model) > max_mod: max_mod = np.nanmax(model) # And, finally, the plotting # Convering time to hours tim = ( tim - tim[0] ) * 24 # Colorbars cmin1, cmax1 = min_mod-0.005*min_mod, max_mod+0.005*max_mod norm1 = matplotlib.colors.Normalize(vmin=cmin1, vmax=cmax1, clip=True) mapper1 = cm.ScalarMappable(norm=norm1, cmap=cmap) mapper1.set_array([]) cmin2, cmax2 = np.nanmedian(resids)-3*mad_std(resids), np.nanmedian(resids)+3*mad_std(resids) norm2 = matplotlib.colors.Normalize(vmin=cmin2, vmax=cmax2, clip=True) mapper2 = cm.ScalarMappable(norm=norm2, cmap=cmap) mapper2.set_array([]) # And plotting it fig, axs = plt.subplots(nrows=1, ncols=3, figsize=(15,5), sharex=True, sharey=True) ## First box: plotting the data im1 = axs[0].imshow(data, aspect='auto', cmap=cmap, interpolation='none', extent=[tim[0], tim[-1], waves[-1], waves[0]]) im1.set_clim([cmin1, cmax1]) ## Second box: plotting the model im2 = axs[1].imshow(models, aspect='auto', cmap=cmap, interpolation='none', extent=[tim[0], tim[-1], waves[-1], waves[0]]) im2.set_clim([cmin1, cmax1]) ## First colorbar cbar1 = plt.colorbar(mapper1, ax=np.array([axs[0],axs[1]]))#, location='top') if ppm: cbar1.set_label('Relative flux [ppm]', rotation=270, labelpad=20) else: cbar1.set_label('Relative flux', rotation=270, labelpad=20) cbar1.set_ticks(ticks=np.linspace(cmin1, cmax1, 7)) cbar1.set_ticklabels(ticklabels=[ "{:.3f}".format( i ) for i in np.linspace(cmin1, cmax1, 7) ] ) im3 = axs[2].imshow(resids, aspect='auto', cmap=cmap, interpolation='none', extent=[tim[0], tim[-1], waves[-1], waves[0]]) im3.set_clim([cmin2, cmax2]) cbar2 = plt.colorbar(mapper2, ax=axs[2])#, location='top') cbar2.set_label('Residuals [ppm]', rotation=270, labelpad=20) cbar2.set_ticks( ticks=np.linspace(cmin2, cmax2, 7) ) cbar2.set_ticklabels(ticklabels=[ "{:d}".format( int(i) ) for i in np.linspace(cmin2, cmax2, 7) ] ) fig.supxlabel('Time since beginning [hr]', fontsize=15) axs[0].set_ylabel(r'Wavelength [$\mu$m]') if detrend: axs[0].set_title('Detrended data') else: axs[0].set_title('Data') axs[1].set_title('Model') axs[2].set_title('Residuals') return fig, axs, [im1, im2, im3]
[docs] def joint_fake_allan_deviation(self, binmax=10, method='pipe', timeunit=None): """Plot combined noise-vs-binning curves across all spectroscopic channels. Computes the "Allan deviation" (actually the MAD-based noise as a function of binning) for each channel's residuals and overlays all curves on a single plot. Also shows the white-noise expectation curve. Parameters ---------- binmax : int, optional Maximum number of bins to test. Default is 10. method : {'pipe', 'std', 'rms', 'astropy'}, optional Noise estimator to use (passed to :func:`utils.fake_allan_deviation`). Default is ``'pipe'`` (MAD-based). timeunit : {'d', 'hr', 'min'} or None, optional Time unit for the secondary x-axis. If ``None``, chosen automatically based on data span. Default is ``None``. Returns ------- fig : matplotlib.figure.Figure The created figure. axs : matplotlib.axes.Axes The plot axes (with primary x-axis for bin size and secondary x-axis for time period). """ # Creating a figure outside of the loop fig, axs = plt.subplots() ## Looping over all channels for c in tqdm(range( self.spectral_lcs_dict['lc'].shape[1] )): times = np.loadtxt( self.pout + '/CH' + str(c) + '/lc.dat', usecols=0, unpack=True ) resids = np.loadtxt( self.pout + '/CH' + str(c) + '/model_resids.dat', usecols=1, unpack=True ) _, _, binsize, noise, white_noise_expec = fake_allan_deviation(times=times, residuals=resids, binmax=binmax, method=method, timeunit=timeunit, plot=False) ## And plotting it over the main plot ## First plotting the computed noise if c == 0: label1, label2 = 'Computed noise', 'White-noise expectation' else: label1, label2 = None, None axs.plot(binsize, noise, color='dodgerblue', lw=0.7, label=label1, zorder=10) ## Now plotting the white-noise expectation axs.plot(binsize, white_noise_expec, color='orangered', lw=1., label=label2, zorder=20) # Converting binsize to time units time_binsize = binsize * np.nanmedian( np.diff(times) ) # First, let's estimate the time units we need from times array: if timeunit is None: if np.ptp(time_binsize) >= 1.: ## If times duration is greater than 2 hours, we use days time_unit_multiplication_factor = 1. # Units are already in days time_unit_label = 'Time period [d]' elif ( np.ptp(time_binsize) > 5/24 ) and ( np.ptp(time_binsize) < 1. ): ## If times duration is greater than 2 hours, but less than 2 days, we use hours time_unit_multiplication_factor = 24. # Converting days to hours time_unit_label = 'Time period [hr]' else: ## If times duration is less than 2 hours, we use minutes time_unit_multiplication_factor = 24 * 60 time_unit_label = 'Time period [min]' else: if timeunit == 'd': time_unit_multiplication_factor = 1. time_unit_label = 'Time period [d]' elif timeunit == 'hr': time_unit_multiplication_factor = 24. time_unit_label = 'Time period [hr]' elif timeunit == 'min': time_unit_multiplication_factor = 24 * 60 time_unit_label = 'Time period [min]' else: raise ValueError("timeunit should be one of 'd', 'hr', or 'min'.") # The following two functions will convert binsize to time (in minutes) and vice versa def bin2time(binsize): return binsize * np.nanmedian(np.diff(times)) * time_unit_multiplication_factor def time2bin(bintime): return bintime / ( np.nanmedian(np.diff(times)) * time_unit_multiplication_factor ) ## Adding secondary x-axis for time secax = axs.secondary_xaxis('top', functions=(bin2time, time2bin)) secax.set_xlabel(time_unit_label, labelpad=10) axs.set_xlim([ np.min(binsize), np.max(binsize) ]) axs.set_xscale('log') axs.set_yscale('log') axs.set_xlabel('Bin size [Number of points]') axs.set_ylabel('Noise estimate [ppm]') axs.legend() return fig, axs
[docs] class InvertCowanAgolPC(object): """Invert phase curve observations to physical properties using Cowan & Agol (2008) model. This class implements tools to analyze phase-curve observations, computing thermal properties (temperature maps, day/night temperatures, Bond albedo, heat re-distribution efficiencies) and orbital properties (hotspot offset, phase offset) using the parameterized phase-curve model of Cowan & Agol (2008). The class leverages the model's Fourier expansion coefficients (A0, A1, B1, A2, B2) derived from eclipse depths (E) and phase-curve coefficients (C1, D1, C2, D2) to construct 2D thermal maps and compute various physical properties across posterior samples for uncertainty quantification. Parameters ---------- E : ndarray Array of eclipse depths (F_p/F_*) from posterior samples. C1 : ndarray Phase-curve cosine coefficient (first harmonic) from posterior samples. D1 : ndarray Phase-curve sine coefficient (first harmonic) from posterior samples. C2 : ndarray Phase-curve cosine coefficient (second harmonic) from posterior samples. D2 : ndarray Phase-curve sine coefficient (second harmonic) from posterior samples. rprs : float or ndarray Planet-to-star radius ratio. If scalar, same value used for all samples; if array, sampled from for each posterior sample. bandpass : dict, optional Dictionary containing instrument bandpass data with keys 'WAVE' and 'RESPONSE'. Required for computing brightness temperatures. Values must be Astropy Quantities. Default is ``None``. method : str, optional Optimization method for fitting temperatures to flux (passed to ``scipy.optimize.minimize``). Default is ``'Nelder-Mead'``. teff_star : float (astropy.units.Quantity) or None, optional Stellar effective temperature in Kelvin. Required for computing Bond albedo, heat redistribution efficiency, and average day/night temperatures. If not provided, a warning is printed. Default is ``None``. stellar_spec : dict or None, optional Dictionary containing stellar spectrum with keys 'WAVE' and 'FLUX'. Used to interpolate stellar flux at bandpass wavelengths. If provided, overrides ``teff_star``-based blackbody spectrum. Values must be Astropy Quantities. Default is ``None``. nthreads : int, optional Number of CPU threads to use for parallel processing of temperature maps and multiprocessing operations. Default is the total number of available CPU cores (``multiprocessing.cpu_count()``). ntheta : int, optional Number of grid points across latitude (from -π/2 to π/2). Default is ``49``. nphi : int, optional Number of grid points across longitude (from -π to π). Default is ``100``. pout : str, optional Output directory for saving intermediate results (temperature maps, brightness temperatures, etc.). Default is the current working directory. Attributes ---------- E, C1, D1, C2, D2 : ndarray Stored input parameters (eclipse depths and phase-curve coefficients). rprs : float or ndarray Stored planet-to-star radius ratio. bandpass, method, teff_star, stellar_spec : as above Stored input parameters for spectral calculations. nthreads, ntheta, nphi, pout : as above Stored configuration parameters. A0, A1, B1, A2, B2 : ndarray Fourier coefficients derived from input parameters according to Cowan & Agol (2008) model equations. wav_star, fl_star : Astropy Quantity Stellar wavelength array and flux spectrum (blackbody or interpolated). wav_instrument, response_instrument, trans_fun : Astropy Quantity or ndarray Instrument bandpass wavelengths, response curve, and normalized transmission function evaluated on stellar wavelengths. phi_ang, theta_ang : ndarray 1D arrays of longitude and latitude grid points. temp_map : ndarray 3D array of temperature maps (shape: N_samples, N_phi, N_theta) computed when :meth:`temperature_map_distribution` is called. Main methods ------------ TdayTnight() Compute dayside and nightside brightness temperatures. calculate_0D_Ab_eps(ar) Compute 0D Bond albedo and heat redistribution efficiency from brightness temperatures. phase_offsets(method='root') Compute hotspot offset and phase offset from phase curve. temperature_map_distribution(nsamples=2000) Compute 2D temperature map distribution across posterior samples. median_temperature_map(plot=False, cmap='plasma') Compute and optionally plot median temperature map. equatorial_temp_map(plot=False, nsamples=2000) Compute and optionally plot temperature along equator. forward_phase_curve_model(plot=False) Compute forward-model phase curve from 2D flux distribution. albedo_eps_from_temp_map(a_by_Rst, uniform_nightside_temp=True, nsamples=2000) Compute Bond albedo and heat redistribution from temperature map distribution. albedo_eps_from_fpfs_map(a_by_Rst) Compute Bond albedo and heat redistribution from flux map distribution. albedo_from_kempton23(a_by_Rst, teq) Compute Bond albedo using Kempton et al. (2023) method. Notes ----- - All temperature computations use the bandpass-weighted brightness temperature assuming blackbody emission. - The class uses multiprocessing for computationally intensive operations (e.g., temperature map fitting). - Intermediate results are cached as .npy files in ``pout`` to avoid re-computation. - The Cowan & Agol (2008) model assumes a parameterized intensity map I(φ, θ) with latitudinal and longitudinal dependencies. References ---------- - Cowan, N. B., & Agol, E. 2008. Inverting Phase Functions to Map Exoplanets. The Astrophysical Journal Letters, 678(2), L129. - Kempton, E. M. R., et al. 2023. A reflective, metal-rich atmosphere for GJ 1214b from its JWST phase curve, Nature, 620(7972), 67 - Keating, D., Cowan, N. B., & Dang, L. 2019. Uniformly hot nightside temperatures on short-period gas giants. Nature Astronomy, 3, 1092. Examples -------- Analyze eclipse depths and phase curve to extract thermal properties: >>> # Load posterior samples >>> E = posterior_samples['eclipse_depth'] # Array of eclipse depths >>> C1 = posterior_samples['phase_c1'] >>> D1 = posterior_samples['phase_d1'] >>> C2 = posterior_samples['phase_c2'] >>> D2 = posterior_samples['phase_d2'] >>> rprs = posterior_samples['rprs'] >>> # Define bandpass and stellar spectrum >>> bandpass = load_bandpass('TESS') # Must have 'WAVE' and 'RESPONSE' keys >>> teff_star = 5800 * u.K # Kelvin >>> # Create the inversion object >>> invert = InvertCowanAgolPC(E=E, C1=C1, D1=D1, C2=C2, D2=D2, rprs=rprs, ... bandpass=bandpass, teff_star=teff_star) >>> # Compute temperature maps >>> temp_maps = invert.temperature_map_distribution(nsamples=1000) >>> # Compute albedos and heat redistribution >>> a_by_rst = 10.0 # Semi-major axis in units of stellar radius >>> A_B, eps_kelp, T_day, T_night, eps_keating = invert.albedo_eps_from_temp_map(a_by_rst) """ def __init__(self, E, C1, D1, C2, D2, rprs, bandpass=None, method='Nelder-Mead', teff_star=None, stellar_spec=None,\ nthreads=multiprocessing.cpu_count(), ntheta=49, nphi=100, pout=os.getcwd()): ## Theta is along the latitude, phi is along the longitude # Initializing the class with the parameters of the Cowan & Agol (2008) phase curve model and output folder. self.E = E self.C1, self.D1, self.C2, self.D2 = C1, D1, C2, D2 self.rprs = rprs self.bandpass = bandpass self.method = method self.teff_star = teff_star self.stellar_spec = stellar_spec self.nthreads = nthreads self.ntheta, self.nphi = ntheta, nphi self.pout = pout ## Calculating the A coefficents of the Cowan & Agol (2008) phase curve model from the input parameters self.A0 = (self.E - self.C1 - self.C2) / 2 self.A1 = 2 * self.C1 / np.pi self.B1 = -2 * self.D1 / np.pi self.A2 = 3 * self.C2 / 2 self.B2 = -3 * self.D2 / 2 # prepare stellar spectrum if (self.stellar_spec is not None) and (self.teff_star is None): self.wav_star = self.stellar_spec['WAVE'].to(u.micron) self.fl_star = self.stellar_spec['FLUX'].to(u.J / u.s / u.m**2 / u.micron) elif (self.teff_star is not None) and (self.stellar_spec is None): self.wav_star = bandpass['WAVE'].to(u.micron) self.fl_star = planck_func(self.wav_star, self.teff_star) else: print('>>> --- It looks like you have not provided either stellar_spec or teff_star.\n Please provide one of them to prepare the stellar spectrum.\n Otherwise we will not be able to calculate the brightness temperatures.') if self.teff_star is None: print('>>> --- It looks like you have not provided the stellar effective temperature.\n Please provide it if you want to calculate the Bond albedo and heat re-distribution efficiency, as well as the average dayside and nightside temperatures.') # Prepare the bandpass if self.bandpass is not None: self.wav_instrument = self.bandpass['WAVE'].to(u.micron) self.response_instrument = self.bandpass['RESPONSE'] # build transmission evaluated on stellar wavelengths spln2 = interp1d(x=self.wav_instrument.value, y=self.response_instrument, bounds_error=False, fill_value=0.0) self.trans_fun = spln2(self.wav_star.value) # guard against division by zero if response all zeros if np.max(self.trans_fun) > 0: self.trans_fun = self.trans_fun / np.max(self.trans_fun) else: print('>>> --- It looks like you have not provided the instrument bandpass.\n Please provide one if you want to calculate the brightness temperatures.') # Latitude and longitude grids for the temperature map self.phi_ang = np.linspace(-np.pi, np.pi, self.nphi) self.theta_ang = np.linspace(-np.pi/2, np.pi/2, self.ntheta)
[docs] def TdayTnight(self): """Compute 0D dayside and nightside brightness temperatures. Calculates integrated brightness temperatures for the dayside and nightside using the eclipse depths and phase-curve parameters. Results are cached to disk to avoid re-computation. Uses the `BrightnessTemperatureCalculator` helper class for computation. The dayside brightness temperature is derived directly from the eclipse depth, while the nightside is estimated from E - 2*C1. Returns ------- bt_day : ndarray Dayside brightness temperatures (in Kelvin) for each posterior sample. Shape: (N_samples,). bt_night : ndarray Nightside brightness temperatures (in Kelvin) for each posterior sample. Shape: (N_samples,). Notes ----- - Results are cached to ``{pout}/Brightness_temp_day.npy`` and ``{pout}/Brightness_temp_night.npy``. - Requires that ``self.bandpass``, ``self.teff_star`` (or ``self.stellar_spec``), are properly set during initialization. - This function should be called before :meth:`calculate_0D_Ab_eps`. Examples -------- >>> invert = InvertCowanAgolPC(E=E, C1=C1, D1=D1, C2=C2, D2=D2, ... rprs=0.1, bandpass=bp, teff_star=5800 * u.K) >>> bt_day, bt_night = invert.TdayTnight() >>> print(f"Dayside brightness temperature: {np.median(bt_day)} K") """ # This function calculates the brightness temperatures of the dayside fp_day = np.copy( self.E ) fp_night = self.E - ( 2 * self.C1 ) # ---------- Dayside brightness temperature ---------- # If the file already exists, we don't need to calculate it again fname = self.pout + '/Brightness_temp_day.npy' if os.path.isfile(fname): print('>>>> --- The dayside brightness temperature file already exists...') print(' Loading it...') bt_day = np.load(fname) else: # And now we will use the BrightnessTemperatureCalculator class to calculate the brightness temperatures btc_day = BrightnessTemperatureCalculator( fp=fp_day, rprs=self.rprs, bandpass=self.bandpass, nthreads=self.nthreads,\ method=self.method, teff_star=self.teff_star, stellar_spec=self.stellar_spec, pout=self.pout ) bt_day = btc_day.compute() ## Renaming the saved file to make it clear that it is the dayside brightness temperature os.rename( self.pout + '/Brightness_temp.npy', self.pout + '/Brightness_temp_day.npy' ) # ---------- Nightside brightness temperature ---------- # If the file already exists, we don't need to calculate it again fname = self.pout + '/Brightness_temp_night.npy' if os.path.isfile(fname): print('>>>> --- The nightside brightness temperature file already exists...') print(' Loading it...') bt_night = np.load(fname) else: # And now we will use the BrightnessTemperatureCalculator class to calculate the brightness temperatures btc_night = BrightnessTemperatureCalculator( fp=fp_night, rprs=self.rprs, bandpass=self.bandpass, nthreads=self.nthreads,\ method=self.method, teff_star=self.teff_star, stellar_spec=self.stellar_spec, pout=self.pout ) bt_night = btc_night.compute() ## Renaming the saved file to make it clear that it is the nightside brightness temperature os.rename( self.pout + '/Brightness_temp.npy', self.pout + '/Brightness_temp_night.npy' ) return bt_day, bt_night
[docs] def calculate_0D_Ab_eps(self, ar): """Compute Bond albedo and heat redistribution efficiency from 0D brightness temperatures. Calculates the 0D (calculated from spatially averaged temperatures) Bond albedo and heat redistribution efficiency using the dayside and nightside brightness temperatures computed from :meth:`TdayTnight`. This implementation uses the relationship between the planet's temperature, stellar radiation, and thermal emission to infer physical properties. Parameters ---------- ar : float, or ndarray Semi-major axis in units of stellar radius (a/R_*). Returns ------- A_B : ndarray Bond albedo for each posterior sample (unitless). Shape: (N_samples,). Values typically range from 0 to 1. eps : ndarray Heat redistribution efficiency for each posterior sample (unitless). Shape: (N_samples,). Values typically range from 0 to 1. Notes ----- - Requires :meth:`TdayTnight` to be called first (or this function will call it). - Assumes the 0D approximation where the planet is treated as having uniform dayside and nightside temperatures. Examples -------- >>> bt_day, bt_night = invert.TdayTnight() >>> A_Bond, eps = invert.calculate_0D_Ab_eps(ar=10.0) >>> print(f"Bond albedo: {np.median(A_Bond):.3f}") >>> print(f"Heat redistribution: {np.median(eps):.3f}") """ # This function calculates the 0D albedo and recirculation efficiency from the brightness temperatures T_day, T_night = self.TdayTnight() T0 = self.teff_star.value / np.sqrt(ar) Td_by_Tn_4 = ( T_day / T_night ) ** 4 Tn_by_T0_4 = ( T_night / T0 ) ** 4 eps = 8 / ( (3 * Td_by_Tn_4) + 5 ) A_B = 1 - ( 0.5 * Tn_by_T0_4 * ( 3*Td_by_Tn_4 + 5 )) return A_B, eps
def _find_hotspot_off(self, a1, b1, a2, b2, method): """Find the hotspot offset from phase-curve coefficients. Helper method that solves for the longitude (phase angle) where the brightness temperature reaches its maximum by finding where dFp/dφ = 0. The hotspot offset quantifies how far the longitude of maximum temperature has shifted from the substellar point (φ = 0). Parameters ---------- a1 : float First-harmonic cosine coefficient (A1). b1 : float First-harmonic sine coefficient (B1). a2 : float Second-harmonic cosine coefficient (A2). b2 : float Second-harmonic sine coefficient (B2). method : str Root-finding method: ``'root'`` uses ``scipy.optimize.root`` or ``'fsolve'`` uses ``scipy.optimize.fsolve``. Returns ------- off_loc : float Hotspot offset in degrees. The location of the brightness maximum relative to the substellar point. Notes ----- Finds the solution to: $$ \\frac{dF_p}{d\\varphi} = -a_1 \\sin(\\varphi) + b_1 \\cos(\\varphi) - 2a_2 \\sin(2\\varphi) + 2b_2 \\cos(2\\varphi) = 0 $$ Examples -------- >>> off = invert._find_hotspot_off(a1, b1, a2, b2, method='root') """ # Helper function to calculate the hotspot offset # First writing a function solving which will give the location of the hotspot offset # So, what I did was to compute Fp/F* as a function of phi, and solve for dFp/dphi = 0 def dfpdphi(phi, a1, b1, a2, b2): dfphi = ( -a1 * np.sin(phi) ) + (b1 * np.cos(phi) ) + (-2 * a2 * np.sin(2*phi) ) + (2 * b2 * np.cos(2*phi) ) return dfphi # Now solving it, based on the method if method == 'root': soln = root(fun=dfpdphi, x0=0., args=(a1, b1, a2, b2)) off_loc = np.rad2deg(soln.x[0]) elif method == 'fsolve': soln = fsolve(func=dfpdphi, x0=0, args=(a1, b1, a2, b2)) off_loc = np.rad2deg(soln[0]) return off_loc def _find_phase_off(self, c1, d1, c2, d2, method): """Find the phase offset from phase-curve coefficients. Helper method that solves for the phase angle where the observed flux reaches its maximum by finding where dFp/dωt = 0. The phase offset quantifies how far the peak of the phase curve has shifted from the secondary eclipse (ωt = π). Parameters ---------- c1 : float First-harmonic cosine coefficient (C1) from phase-curve fitting. d1 : float First-harmonic sine coefficient (D1) from phase-curve fitting. c2 : float Second-harmonic cosine coefficient (C2) from phase-curve fitting. d2 : float Second-harmonic sine coefficient (D2) from phase-curve fitting. method : str Root-finding method: ``'root'`` uses ``scipy.optimize.root`` or ``'fsolve'`` uses ``scipy.optimize.fsolve``. Returns ------- off_phs_deg : float Phase offset in degrees relative to ωt = π (secondary eclipse). Negative offsets indicate the peak occurs before eclipse; positive offsets indicate it occurs after eclipse. Notes ----- Finds the solution to: $$ \\frac{dF_p}{d\\omega t} = -c_1 \\sin(\\omega t) + d_1 \\cos(\\omega t) - 2c_2 \\sin(2\\omega t) + 2d_2 \\cos(2\\omega t) = 0 $$ Examples -------- >>> phase_off = invert._find_phase_off(c1, d1, c2, d2, method='root') >>> print(f"Phase offset: {phase_off} degrees") """ # Another helper function to calculate the phase offset # First writing a function solving which will give the location of the hotspot offset # So, what I did was to compute Fp/F* as a function of phi, and solve for dFp/dphi = 0 def dfpdwt(wt, c1, d1, c2, d2): dfphi = ( -c1 * np.sin(wt) ) + (d1 * np.cos(wt) ) + ( -2 * c2 * np.sin(2*wt) ) + ( 2 * d2 * np.cos(2*wt) ) return dfphi # Now solving it, based on the method if method == 'root': soln = root(fun=dfpdwt, x0=0., args=(c1, d1, c2, d2)) off_loc_wt = soln.x[0] elif method == 'fsolve': soln = fsolve(func=dfpdwt, x0=0, args=(c1, d1, c2, d2)) off_loc_wt = soln[0] phs = ( ( off_loc_wt - np.pi ) / (2 * np.pi) ) % 1 off_phs_deg = np.rad2deg( 2 * np.pi * (phs - 0.5) ) return off_phs_deg
[docs] def phase_offsets(self, method='root'): """Compute hotspot offset and phase offset from phase-curve parameters. Calculates two complementary measures of the offset of thermal features from expected locations: 1. **Hotspot offset**: The longitude (in degrees) where the brightness temperature maximum occurs, relative to the substellar point. 2. **Phase offset**: The orbital phase (in degrees relative to secondary eclipse) where the observed phase-curve flux peaks. These quantities help constrain heat transport and atmospheric dynamics by revealing where thermal features are located relative to the tidally-locked dayside. Parameters ---------- method : str, optional Root-finding method to use for identifying extrema. Options are ``'root'`` (default, uses ``scipy.optimize.root``) or ``'fsolve'`` (uses ``scipy.optimize.fsolve``). Default is ``'root'``. Returns ------- hotspot_off : ndarray Hotspot offset in degrees for each posterior sample. Shape: (N_samples,). Must be computed in the range [-180, 180] degrees. phase_off : ndarray Phase offset in degrees for each posterior sample. Shape: (N_samples,). Positive values indicate the phase-curve maximum occurs before the secondary eclipse; negative values indicate it occurs after. Notes ----- - Computation uses :meth:`_find_hotspot_off` and :meth:`_find_phase_off` for each posterior sample. - This method uses progress bars (tqdm) to show computation progress. - Results are not cached; re-computing will recalculate offsets from scratch. Examples -------- >>> hotspot_off, phase_off = invert.phase_offsets(method='root') >>> print(f"Median hotspot offset: {np.median(hotspot_off):.1f} degrees") >>> print(f"Median phase offset: {np.median(phase_off):.1f} degrees") """ ## This function calculates the phase offset: 1) phase offset of the observed phase curve, and ## 2) shift of the maximum of the temperature map from the substellar point (the "hotspot offset") # ---------- Calculating the hotspot offset and phase offset --------- hotspot_off = np.zeros( len(self.E) ) for i in tqdm(range(len(hotspot_off))): hotspot_off[i] = self._find_hotspot_off( self.A1[i], self.B1[i], self.A2[i], self.B2[i], method=method ) phase_off = np.zeros( len(self.E) ) for i in tqdm(range(len(phase_off))): phase_off[i] = self._find_phase_off( self.C1[i], self.D1[i], self.C2[i], self.D2[i], method=method ) return hotspot_off, phase_off
def _calculate_2d_temp_map(self, fpfs_2d, rprs_ratio): """Compute 2D temperature map from 2D flux distribution. Helper method that converts a 2D array of bandpass-normalized planet-to-star flux ratios (F_p/F_*) into a 2D brightness temperature map by fitting a blackbody spectrum to each grid point independently. Parameters ---------- fpfs_2d : ndarray 2D array of F_p/F_* values with shape (N_phi, N_theta) representing the flux distribution across longitude and latitude. rprs_ratio : float Planet-to-star radius ratio. Returns ------- Tmap : ndarray 2D array of brightness temperatures (in Kelvin) with same shape as ``fpfs_2d``. Grid points where ``fpfs_2d <= 0`` have temperature set to 0. Notes ----- - For each grid point with positive flux, fits a blackbody spectrum that best reproduces the observed bandpass-weighted flux. - Uses ``scipy.optimize.minimize`` with the method from ``self.method`` to find the temperature that minimizes chi-squared difference between observed and modeled flux. - Temperatures are computed independently per grid point. - Used internally by :meth:`temperature_map_distribution` and :meth:`median_temperature_map`. Examples -------- >>> Tmap = invert._calculate_2d_temp_map(fpfs_2d, rprs_ratio=0.1) >>> print(f"Max temperature: {np.nanmax(Tmap)} K") """ # This is helper function to calculate the 2D temperature map for a given 2D fp/f* map and rprs ratio ## Temperature map Tmap = np.zeros( (self.nphi, self.ntheta) ) for i in range(self.ntheta): for j in range(self.nphi): if fpfs_2d[j,i] > 0: fp_pl = fpfs_2d[j,i] * simpson(y=self.fl_star.value*self.trans_fun, x=self.wav_star.value) / (rprs_ratio**2) def func_to_minimize_new(x): planet_bb = planck_func(self.wav_star, x*u.K) planet_den = simpson(y=planet_bb.value*self.trans_fun, x=self.wav_star.value) chi2 = (fp_pl - planet_den)**2 return chi2 soln = minimize(fun=func_to_minimize_new, x0=1000., method=self.method) Tmap[j,i] = soln.x else: Tmap[j,i] = np.array([0.]) return Tmap
[docs] def temperature_map_distribution(self, nsamples=2000): """Compute 2D temperature map distribution across posterior samples. Builds a distribution of 2D temperature maps (shape: N_samples, N_phi, N_theta) by converting the posterior distribution of flux maps into temperature maps via brightness temperature fitting. Results are cached and can be re-used by other methods. This method randomly samples from the posterior distribution if more posterior samples exist than requested, improving computational efficiency while maintaining statistical representation. Parameters ---------- nsamples : int, optional Number of posterior samples to process (randomly selected from all available samples). Default is ``2000``. Returns ------- temp_map : ndarray 3D array of brightness temperature maps with shape (nsamples, N_phi, N_theta). Also stored as ``self.temp_map``. Notes ----- - Uses multiprocessing with ``self.nthreads`` processes for parallel computation of temperature maps. - Results are cached to ``{pout}/Temperature_map.npy`` and reloaded on subsequent calls. - Uses progress bars (tqdm) to show computation progress. - This is a computationally intensive operation and typically requires several minutes to hours for large grids. Examples -------- >>> temp_maps = invert.temperature_map_distribution(nsamples=1000) >>> print(temp_maps.shape) (1000, 100, 49) >>> # Get median map >>> temp_median = np.median(temp_maps, axis=0) """ # This function calculates the distribution of the temperature maps # Now, let's calculate the fp/f* map distribution fpfs_map = np.zeros( (len(self.E), self.nphi, self.ntheta) ) for integration in range( len(self.E) ): J_phi = self.A0[integration] + ( self.A1[integration] * np.cos(self.phi_ang) ) + ( self.B1[integration] * np.sin(self.phi_ang) )+\ ( self.A2[integration] * np.cos(2*self.phi_ang) ) + ( self.B2[integration] * np.sin(2*self.phi_ang) ) for th in range(self.ntheta): fpfs_map[integration, :, th] = np.sin(self.theta_ang[th] + np.pi/2) * J_phi * 0.75 # Selecting NSample samples from all samples fpfs_map_pos_samples = fpfs_map[np.random.choice(np.arange(fpfs_map.shape[0]), size=nsamples, replace=False), :, :] # Computing brightness temperature for the posterior of eclipse depth temp_map_path = Path(self.pout + '/Temperature_map.npy') if temp_map_path.exists(): print('>>> --- The temperature map distribution file already exists...') print(' Loading it...') self.temp_map = np.load(self.pout + '/Temperature_map.npy') else: if np.isscalar(self.rprs): rprs_arr = np.full(fpfs_map_pos_samples.shape, float(self.rprs)) else: rprs_arr = np.random.choice( self.rprs, size=nsamples, replace=False ) # prepare inputs for multiprocessing inputs = [(fpfs_map_pos_samples[i,:,:], rprs_arr[i]) for i in range( fpfs_map_pos_samples.shape[0] )] # use Pool with bound method; user's environment set_start_method('fork') so this is fine with multiprocessing.Pool(self.nthreads) as p: result_list = p.starmap(self._calculate_2d_temp_map, tqdm(inputs, total=nsamples), chunksize=self.nthreads) self.temp_map = np.array(result_list) # Saving the data np.save(self.pout + '/Temperature_map.npy', self.temp_map) return self.temp_map
[docs] def median_temperature_map(self, plot=False, cmap='plasma'): """Compute and optionally plot the median temperature map across posteriors. Computes the median of the flux distribution across all posterior samples, then converts it to a single 2D temperature map. This provides a robust point estimate of the planet's thermal structure. Parameters ---------- plot : bool, optional If ``True``, generate a Mollweide projection plot of the median temperature map. Default is ``False``. cmap : str, optional Colormap name (passed to matplotlib) for the plot. Default is ``'plasma'``. Returns ------- temp_map_median : ndarray 2D array of median brightness temperatures with shape (N_phi, N_theta). Also saved to ``{pout}/Median_Temperature_map.npy``. fig : matplotlib.figure.Figure or None Figure object if ``plot=True``, otherwise ``None``. ax : matplotlib.axes.Axes or None Axes object (Mollweide projection) if ``plot=True``, otherwise ``None``. cax : matplotlib.collections.QuadMesh or None Mesh container from pcolormesh if ``plot=True``, otherwise ``None``. Notes ----- - Results are cached to ``{pout}/Median_Temperature_map.npy`` and reloaded on subsequent calls. - If ``plot=True``, the temperature color scale is set to median ± 5σ (using MAD-based standard deviation) to highlight structure. - Uses a Mollweide projection to show the full planetary surface. Examples -------- >>> Tmap_median = invert.median_temperature_map(plot=True, cmap='viridis') >>> print(f"Max dayside temperature: {np.nanmax(Tmap_median)} K") """ # Simply load the file if it already exists temp_map_path = Path(self.pout + '/Median_Temperature_map.npy') if temp_map_path.exists(): print('>>> --- The median temperature map file already exists...') print(' Loading it...') temp_map_median = np.load(temp_map_path) else: # Now, let's calculate the fp/f* map distribution fpfs_map = np.zeros( (len(self.E), self.nphi, self.ntheta) ) for integration in range( len(self.E) ): J_phi = self.A0[integration] + ( self.A1[integration] * np.cos(self.phi_ang) ) + ( self.B1[integration] * np.sin(self.phi_ang) )+\ ( self.A2[integration] * np.cos(2*self.phi_ang) ) + ( self.B2[integration] * np.sin(2*self.phi_ang) ) for th in range(self.ntheta): fpfs_map[integration, :, th] = np.sin(self.theta_ang[th] + np.pi/2) * J_phi * 0.75 # Median fpfs_map fpfs_map_median = np.nanmedian(fpfs_map, axis=0) # Computing brightness temperature for the median fpfs map if np.isscalar(self.rprs): rprs_val = self.rprs else: rprs_val = np.nanmedian(self.rprs) temp_map_median = self._calculate_2d_temp_map(fpfs_map_median, rprs_val) np.save(self.pout + '/Median_Temperature_map.npy', temp_map_median) if plot: theta2d, phi2d = np.meshgrid(self.theta_ang, self.phi_ang) # Plot the temperature map fig = plt.figure() ax = fig.add_subplot(111, projection='mollweide') cax = ax.pcolormesh(phi2d, theta2d, temp_map_median, cmap=cmap) cax.set_clim([ np.nanmedian(temp_map_median) - 5*mad_std(temp_map_median), np.nanmedian(temp_map_median) + 5*mad_std(temp_map_median) ]) plt.colorbar(cax, label='T [K]') plt.tight_layout() return temp_map_median, fig, ax, cax else: return temp_map_median
def _compute_temp_along_longitude(self, fpfs, rprs_ratio): """Compute 1D temperature profile along longitude (i.e., for a given latitude such as equator). Helper method that converts a 1D array of bandpass-normalized planet-to-star flux ratios along any longitude into a 1D brightness temperature profile by fitting blackbody spectra. Parameters ---------- fpfs : ndarray 1D array of F_p/F_* values with shape (N_phi,) representing the flux distribution across longitude at a given latitude. rprs_ratio : float Planet-to-star radius ratio for this sample. Returns ------- Tmap : ndarray 1D array of brightness temperatures (in Kelvin) with shape (N_phi,). Grid points where ``fpfs <= 0`` have temperature set to 0. Notes ----- - Used internally by :meth:`equatorial_temp_map`. - Similar to :meth:`_calculate_2d_temp_map` but computes only along latitude θ = π/2 (equator). Examples -------- >>> Tmap_equator = invert._compute_temp_along_longitude(fpfs_1d, 0.1) """ ## This is a helper function to compute the temperature along the longitude for a given fp/f* map and rprs ratio # Temperature map Tmap = np.zeros(self.nphi) for j in range(self.nphi): if fpfs[j] > 0.: fp_pl = fpfs[j] * simpson(y=self.fl_star.value*self.trans_fun, x=self.wav_star.value) / (rprs_ratio**2) def func_to_minimize_new(x): planet_bb = planck_func(self.wav_star, x*u.K) planet_den = simpson(y=planet_bb.value*self.trans_fun, x=self.wav_star.value) chi2 = (fp_pl - planet_den)**2 return chi2 soln = minimize(fun=func_to_minimize_new, x0=1000., method=self.method) Tmap[j] = soln.x else: Tmap[j] = np.array([0.]) return Tmap
[docs] def equatorial_temp_map(self, plot=False, nsamples=2000): """Compute and optionally plot temperature profile along the equator. Calculates a distribution of 1D temperature profiles along the equator (θ = 0) across posterior samples. Useful for understanding longitudinal variations in brightness temperature at the substellar latitude. Parameters ---------- plot : bool, optional If ``True``, generate a plot showing the median equatorial temperature profile with individual posterior samples overplotted. Default is ``False``. nsamples : int, optional Number of posterior samples to process. Default is ``2000``. Returns ------- temp_map_equatorial : ndarray 2D array of brightness temperatures along the equator with shape (nsamples, N_phi). Also saved to ``{pout}/Equatorial_Temperature_map.npy``. fig : matplotlib.figure.Figure or None Figure object if ``plot=True``, otherwise ``None``. axs : matplotlib.axes.Axes or None Axes object if ``plot=True``, otherwise ``None``. Notes ----- - Uses multiprocessing to parallelize temperature calculations. - Results are cached and reloaded on subsequent calls. - Posterior samples are randomly selected if the total exceeds ``nsamples``. - When ``plot=True``, the plot shows 100 random posterior samples as light curves and the median as a heavy line. Examples -------- >>> Tmap_equator = invert.equatorial_temp_map(plot=True, nsamples=1000) >>> print(Tmap_equator.shape) (1000, 100) """ # This function calculates the temperature map along the equator (theta=0) and plots it if plot=True # Simply load the file if it already exists temp_map_path = Path(self.pout + '/Equatorial_Temperature_map.npy') if temp_map_path.exists(): print('>>> --- The equatorial temperature map file already exists...') print(' Loading it...') temp_map_equatorial = np.load(temp_map_path) else: # Now, let's calculate the fp/f* map distribution along the equator (theta=0) fpfs_equatorial = np.zeros( (len(self.E), self.nphi) ) for integration in range( len(self.E) ): J_phi = self.A0[integration] + ( self.A1[integration] * np.cos(self.phi_ang) ) + ( self.B1[integration] * np.sin(self.phi_ang) )+\ ( self.A2[integration] * np.cos(2*self.phi_ang) ) + ( self.B2[integration] * np.sin(2*self.phi_ang) ) fpfs_equatorial[integration, :] = np.sin(np.pi/2) * J_phi * 0.75 # Selecting NSample samples from all samples fpfs_equatorial_pos_samples = fpfs_equatorial[np.random.choice(np.arange(fpfs_equatorial.shape[0]), size=nsamples, replace=False), :] # Computing brightness temperature for the median fpfs map along the equator if np.isscalar(self.rprs): rprs_arr = np.full(fpfs_equatorial_pos_samples.shape, float(self.rprs)) else: rprs_arr = np.random.choice( self.rprs, size=nsamples, replace=False ) # prepare inputs for multiprocessing inputs = [(fpfs_equatorial_pos_samples[i,:], rprs_arr[i]) for i in range( fpfs_equatorial_pos_samples.shape[0] )] # use Pool with bound method; user's environment set_start_method('fork') so this is fine with multiprocessing.Pool(self.nthreads) as p: result_list = p.starmap(self._compute_temp_along_longitude, tqdm(inputs, total=nsamples), chunksize=self.nthreads) temp_map_equatorial = np.array(result_list) np.save(self.pout + '/Equatorial_Temperature_map.npy', temp_map_equatorial) if plot: fig, axs = plt.subplots() axs.plot(np.rad2deg(self.phi_ang), np.nanmedian(temp_map_equatorial, axis=0), color='navy', lw=2., zorder=100) for i in range(100): axs.plot(np.rad2deg(self.phi_ang), temp_map_equatorial[np.random.randint(0,temp_map_equatorial.shape[0]), :], alpha=0.5, color='orangered', lw=1., zorder=10) axs.set_xlim([-180., 180.]) axs.set_xlabel('Longitude [deg]') axs.set_ylabel('Temperature [K]') return temp_map_equatorial, fig, axs else: return temp_map_equatorial
def _phase_curve_forward_model(self, I_phi_theta, phi, theta, xi): """Compute phase-curve signal from a 2D flux distribution. Helper method that integrates a 2D flux distribution I(φ, θ) over the visible hemisphere as a function of orbital phase to predict the observed phase curve. Uses integration over visible longitude and latitude. Parameters ---------- I_phi_theta : ndarray 2D array of intensity map I(φ, θ) with shape (N_phi, N_theta). phi : ndarray 1D array of longitude values (in radians) from -π to π. theta : ndarray 1D array of latitude values (in radians) from -π/2 to π/2. xi : ndarray 1D array of orbital phase angles (in radians). ξ = 0 at occultation, ξ = ±π at transits. Returns ------- f_xi : ndarray Phase curve signal (F_p/F_* equivalent) for each orbital phase. Shape: (len(xi),). Notes ----- - Integrates only over the visible hemisphere for each phase angle. - Uses meridian longitude bounds that change with orbital phase: only pixels with -ξ - π/2 < φ < -ξ + π/2 are integrated. - Uses 2D trapezoidal integration from ``scipy.integrate.trapz2d``. - Used internally by :meth:`forward_phase_curve_model`. Examples -------- >>> xi = np.linspace(-np.pi, np.pi, 100) >>> fpcs = invert._phase_curve_forward_model(I_2d, phi_ang, theta_ang, xi) """ # Given the 2D flux distribution map, I(phi, theta), we will compute phase curve signal # as a function of xi (which is 0 at occultation and +/- pi at transits) # 2D angles theta2d, phi2d = np.meshgrid(theta, phi) f_xi = np.zeros(len(xi)) for i in range(len(xi)): # lower and upper limits on phi integration l1_phi, l2_phi = -xi[i] - np.pi/2, -xi[i] + np.pi/2 ## Both l1_phi and l2_phi must be within (-np.pi, np.pi) if l1_phi < -np.pi: l1_phi = l1_phi + (2 * np.pi) if l2_phi > np.pi: l2_phi = l2_phi - (2 * np.pi) idx_phi = np.zeros(len(phi), dtype=bool) if l2_phi > l1_phi: idx_phi[ (phi>l1_phi) & (phi<l2_phi) ] = True else: idx_phi[ (phi>l1_phi) | (phi<l2_phi) ] = True # And the integration f_xi[i] = trapz2d( I_phi_theta[idx_phi,:,None] * np.cos(phi2d[idx_phi,:,None] + xi[i]) * np.sin(theta2d[idx_phi,:,None]) * np.sin(theta2d[idx_phi,:,None]), phi[idx_phi], theta)[0] return f_xi
[docs] def forward_phase_curve_model(self, plot=False): """Compute forward-model phase curve from 2D flux distribution. Converts the median 2D flux map into a forward-modeled phase curve for comparison with the fitted phase-curve parameterization. Provides a self-consistency check between the temperature-map representation and the fitted phase-curve model. Parameters ---------- plot : bool, optional If ``True``, generate a plot overlaying the forward model on the fitted parameterized phase curve. Default is ``False``. Returns ------- FpFs_forward_model : ndarray Forward-model phase curve values (F_p/F_*) sampled at 1000 orbital phase angles from -π to π. Also returned when ``plot=True``. fig : matplotlib.figure.Figure or None Figure object if ``plot=True``, otherwise ``None``. axs : matplotlib.axes.Axes or None Axes object if ``plot=True``, otherwise ``None``. Notes ----- - Uses the median of the flux map distribution across all posterior samples. - Fitted phase curve is computed from posterior median values of C1, D1, C2, D2. - Plots both curves for visual comparison of model consistency. Examples -------- >>> FpFs_fwd = invert.forward_phase_curve_model(plot=True) >>> print(f"Max forward model flux: {np.max(FpFs_fwd):.4f}") """ # Computing the Fp/F* (i.e., I(phi, theta) ) map I_map = np.zeros( (len(self.E), self.nphi, self.ntheta) ) for integration in range( len(self.E) ): fpfs_ph = self.A0[integration] + (self.A1[integration] * np.cos(self.phi_ang)) + (self.B1[integration] * np.sin(self.phi_ang))+\ (self.A2[integration] * np.cos(2*self.phi_ang)) + (self.B2[integration] * np.sin(2*self.phi_ang)) for th in range(self.ntheta): I_map[integration, :, th] = np.sin(self.theta_ang[th] + np.pi/2) * fpfs_ph * 0.75 # Fitted phase curve model xi = 2 * np.pi * (np.linspace(0., 1., 1000) - 0.5) FpFs_fitted = np.nanmedian(self.E) + ( np.nanmedian(self.C1) * (np.cos( xi ) - 1.) ) + ( np.nanmedian(self.D1) * np.sin( xi ) ) +\ ( np.nanmedian(self.C2) * (np.cos( 2*xi) - 1.) ) + ( np.nanmedian(self.D2) * np.sin( 2*xi ) ) FpFs_forward_model = self._phase_curve_forward_model(I_phi_theta=np.nanmedian(I_map, axis=0), phi=self.phi_ang, theta=self.theta_ang + np.pi/2, xi=xi) if plot: fig, axs = plt.subplots() axs.plot(xi, FpFs_forward_model, color='navy', lw=1.5, label='Forward model', zorder=100) axs.plot(xi, FpFs_fitted, color='orangered', lw=1.5, label='Fitted model', zorder=100) axs.set_xlabel(r'$\xi$ [rad]') axs.set_ylabel(r'$F_p/F_*$') axs.legend() return FpFs_forward_model, fig, axs else: return FpFs_forward_model
def _albedo_eps_tdaynight(self, temp_map, a_by_Rst): """Compute Bond albedo and heat redistribution from a 2D temperature map. Helper method that computes Bond albedo, dayside/nightside brightness temperatures, and heat redistribution efficiency (using two different methods) from a single 2D temperature map. Parameters ---------- temp_map : ndarray 2D array of brightness temperatures with shape (N_phi, N_theta). a_by_Rst : float Semi-major axis in units of stellar radius (a/R_*). Returns ------- A_Bond : float Bond albedo computed from total planetary flux vs stellar flux. eps_kelp : float Heat redistribution efficiency using the Kelp method (nightside/dayside flux ratio). Tday : float Dayside brightness temperature (integrated over dayside). Tnight : float Nightside brightness temperature (integrated over nightside). eps_keating : float Heat redistribution efficiency using Keating et al. (2019) method (nightside flux / total flux). Notes ----- - Uses Stefan-Boltzmann law to relate temperature to thermal radiation. - Dayside defined as |φ| < π/2; nightside defined as |φ| > π/2. - Uses 2D trapezoidal integration. - Original kelp-based code adapted for this implementation. References ---------- - Keating, D., Cowan, N. B., & Dang, L. 2019. Uniformly hot nightside temperatures on short-period gas giants. Nature Astronomy, 3, 1092. - Original kelp package: https://github.com/bmorris3/kelp/blob/219a922849634d9e982cd7bd05910596dea2ef6e/kelp/core.py#L431 """ # This function takes temperature map and theta (latitude) and phi (longitude) # angles to compute the Bond albedo and heat re-distribution efficiency. # Much of the code is copied from kelp package: https://github.com/bmorris3/kelp/blob/219a922849634d9e982cd7bd05910596dea2ef6e/kelp/core.py#L431 # First creating meshgrid of theta and phi theta2d, phi2d = np.meshgrid(self.theta_ang + np.pi/2, self.phi_ang) # --------------------------- # To compute Bond albedo # --------------------------- # Now computing the planetary flux from temperature map # Kelp added an addition condition here: to integrate only over positive values of phi # I think that is because kelp has phi run from -2*pi to 2*pi, i.e., they have "two dayside" and "two nightside" # To compensate for this, they only integrate over positive values of phi # However, I chose phi from -pi to pi -- so, we don't need this extra condition fl_pl_total = trapz2d( temp_map[...,None]**4 * np.sin(theta2d[...,None]), self.phi_ang, self.theta_ang + np.pi/2 )[0] # Computing the flux from star fl_star = np.pi * (self.teff_star.value ** 4) # Computing the Bond albedo A_Bond = 1 - ( (a_by_Rst**2) * fl_pl_total / fl_star) # ----------------------------------- # To compute epsilon (kelp method) # ----------------------------------- # # kelp computes epsilon by taking ratio of nightside flux to the dayside flux mask_day = np.abs(phi2d) < np.pi/2 mask_night = np.abs(phi2d) > np.pi/2 fl_day = np.sum( (temp_map * mask_day)**4 ) / np.sum(mask_day) fl_night = np.sum( (temp_map * mask_night)**4 ) / np.sum(mask_night) eps_kelp = fl_night / fl_day # ----------------------------------------- # To compute T_day, T_night and epsilon # (Keating, Cowan & Dang 2019) # ----------------------------------------- # This method is slightly different -- they compute heat redistribution # efficiency as ratio of heat irradiated by nightside vs heat irradiated # by the whole planet # # Further, they compute dayside and nightside temperature by integrating temperatures # on the dayside and nightside, respectively. # Indices for the dayside and the nightside idx_day = np.abs(self.phi_ang) < np.pi/2 idx_night = np.abs(self.phi_ang) > np.pi/2 #Tday = ( (0.5 / np.pi) * trapz2d( temp_map[...,None]**4 * np.sin(theta2d[...,None] ) * (np.abs(phi2d[...,None]) < np.pi/2 ), phi, theta )[0] ) ** 0.25 #Tnight = ( (0.5 / np.pi) * trapz2d( temp_map[...,None]**4 * np.sin(theta2d[...,None] ) * (np.abs(phi2d[...,None]) > np.pi/2 ), phi, theta )[0] ) ** 0.25 Tday = ( (0.5 / np.pi) * trapz2d( temp_map[idx_day, :, None]**4 * np.sin(theta2d[idx_day, :, None] ), self.phi_ang[idx_day], self.theta_ang + np.pi/2 )[0] ) ** 0.25 Tnight = ( (0.5 / np.pi) * trapz2d( temp_map[idx_night, :, None]**4 * np.sin(theta2d[idx_night, :, None] ), self.phi_ang[idx_night], self.theta_ang + np.pi/2 )[0] ) ** 0.25 #fl_pl_night = trapz2d( temp_map[...,None]**4 * np.sin(theta2d[...,None] ) * (np.abs(phi2d[...,None]) > np.pi/2 ), phi, theta )[0] fl_pl_night = trapz2d( temp_map[idx_night, :, None]**4 * np.sin(theta2d[idx_night, :, None] ), self.phi_ang[idx_night], self.theta_ang + np.pi/2 )[0] eps_keating = fl_pl_night / fl_pl_total return A_Bond, eps_kelp, Tday, Tnight, eps_keating def _albedo_eps_fpfs(self, fpfs_map, a_by_Rst): """Compute Bond albedo and heat redistribution from a 2D flux map. Helper method that computes Bond albedo and heat redistribution efficiency (using two different methods) directly from a 2D F_p/F_* map, without requiring intermediate temperature conversion. Parameters ---------- fpfs_map : ndarray 2D array of F_p/F_* values with shape (N_phi, N_theta). a_by_Rst : float Semi-major axis in units of stellar radius (a/R_*). Returns ------- A_Bond_fpfs : float Bond albedo computed as 1 - (a/R*)^2 * integrated(F_p/F_*) / π. eps_kelp : float Heat redistribution efficiency (Kelp method): nightside/dayside flux ratio. eps_keating : float Heat redistribution efficiency (Keating et al. 2019 method): nightside flux / total flux. Notes ----- - Similar approach to :meth:`_albedo_eps_tdaynight` but operates on flux map rather than temperature map. - Dayside defined as |φ| < π/2; nightside defined as |φ| > π/2. - Uses 2D trapezoidal integration. - Original kelp-based code adapted for this implementation. References ---------- - Keating, D., Cowan, N. B., & Dang, L. 2019. Uniformly hot nightside temperatures on short-period gas giants. Nature Astronomy, 3, 1092. - Original kelp package: https://github.com/bmorris3/kelp/blob/219a922849634d9e982cd7bd05910596dea2ef6e/kelp/core.py#L431 """ # This function takes occultation depth map and theta (latitude) and phi (longitude) # angles to compute the Bond albedo, heat re-distribution efficiency, and average dayside and nightside temperaturse. # Much of the code is copied from kelp package: https://github.com/bmorris3/kelp/blob/219a922849634d9e982cd7bd05910596dea2ef6e/kelp/core.py#L431 # First creating meshgrid of theta and phi theta2d, phi2d = np.meshgrid(self.theta_ang + np.pi/2, self.phi_ang) # ----------------------------------- # To compute Bond albedo: method 1 # ----------------------------------- # We can directly compute the ratio of planetary to stellar flux since we already have Fp/F* # We can simply integrate over Fp/F* fl_fs_total = trapz2d( fpfs_map[...,None] * np.sin(theta2d[...,None]), self.phi_ang, self.theta_ang + np.pi/2 )[0] # Computing the Bond albedo A_Bond_fpfs = 1 - ( (a_by_Rst**2) * fl_fs_total / np.pi ) # ----------------------------------- # To compute epsilon (kelp method) # ----------------------------------- # # kelp computes epsilon by taking ratio of nightside flux to the dayside flux mask_day = np.abs(phi2d) < np.pi/2 mask_night = np.abs(phi2d) > np.pi/2 fl_day = np.sum( (fpfs_map * mask_day) ) / np.sum(mask_day) fl_night = np.sum( (fpfs_map * mask_night) ) / np.sum(mask_night) eps_kelp = fl_night / fl_day # ----------------------------------------- # epsilon (Keating, Cowan & Dang 2019) # ----------------------------------------- # This method is slightly different -- they compute heat redistribution # efficiency as ratio of heat irradiated by nightside vs heat irradiated # by the whole planet idx_night = np.abs(self.phi_ang) > np.pi/2 #fl_pl_night = trapz2d( fpfs_map[...,None] * np.sin(theta2d[...,None] ) * (np.abs(phi2d[...,None]) > np.pi/2 ), phi, theta )[0] fl_pl_night = trapz2d( fpfs_map[idx_night, :, None] * np.sin(theta2d[idx_night, :, None] ), self.phi_ang[idx_night], self.theta_ang + np.pi/2 )[0] eps_keating = fl_pl_night / fl_fs_total return A_Bond_fpfs, eps_kelp, eps_keating def _albedo_kempton23(self, fpfs_phs, rst, a_by_Rst, teq, rprs): """Compute Bond albedo using Kempton et al. (2023) method. Helper method that calculates Bond albedo from eclipse depth using the energy-balance method of Kempton et al. (2023), accounting for the fraction of planetary flux emitted in the bandpass and an anisotropy factor. Parameters ---------- fpfs_phs : float Average measured F_p/F_* in the bandpass (from eclipse or phase curve). rst : float Stellar radius (in solar radii or consistent units). a_by_Rst : float Semi-major axis in units of stellar radius. teq : float Equilibrium temperature of the planet (in Kelvin). rprs : float Planet-to-star radius ratio. Returns ------- A_Bond_fpfs : float Bond albedo computed from energy balance accounting for bandpass-dependent flux emission and anisotropy. Notes ----- - Assumes the measured F_p/F_* represents the hemisphere-averaged thermal emission. - Uses an anisotropy factor of 1.08 (typical for tidally-locked planets). - The method accounts for the fact that only a fraction of the planet's total blackbody radiation falls within the instrument bandpass. - Original code adapted from Kempton et al. (2023) implementation. References ---------- - Kempton, E. M. R., et al. 2023. A reflective, metal-rich atmosphere for GJ 1214b from its JWST phase curve, Nature, 620(7972), 67 """ # This function takes occultation depth to compute the Bond albedo. # Much of the code is copied from: https://zenodo.org/records/7703086#.ZAZk1dLMJhE # ----------------------------------- # To compute Bond albedo: method 1 # ----------------------------------- # Where the transmission function is non-zero idx1 = self.trans_fun > 0. # Power received fl_star = planck_func(lam=self.wav_star, temp=self.teff_star) stellar_flux = np.trapz( y=fl_star, x=self.wav_star ) power_received = np.pi * ( (rprs * rst)**2 ) * np.pi * ( 1 / a_by_Rst**2 ) * stellar_flux # Power radiated Fstar = np.trapz(y=fl_star[idx1], x=self.wav_star[idx1]) * np.pi * (rst ** 2) fp = np.mean( fpfs_phs * Fstar ) power_radiated = 4 * np.pi * fp # Fraction of planetary flux emitted in TESS bandpass fl_pl = planck_func(lam=self.wav_star, temp=teq) fraction = np.trapz(y=fl_pl[idx1], x=self.wav_star[idx1]) / np.trapz(y=fl_pl, x=self.wav_star) anisotropy = 1.08 # Computing the Bond albedo A_Bond_fpfs = 1 - ( power_radiated / power_received / fraction / anisotropy ) return A_Bond_fpfs
[docs] def albedo_eps_from_temp_map(self, a_by_Rst, uniform_nightside_temp=False, nsamples=2000): """Compute Bond albedo and heat redistribution from temperature map distribution. Calculates Bond albedo (A_Bond), heat redistribution efficiency (ε) using two methods (Kelp and Keating et al. 2019), and dayside/nightside brightness temperatures from the distribution of 2D temperature maps. Parameters ---------- a_by_Rst : float or ndarray Semi-major axis in units of stellar radius (a/R_*). If scalar, same value used for all samples; if array, sampled for each. uniform_nightside_temp : bool, optional If ``True``, replace zero nightside temperatures (from zero grid points) with the median of measured nightside temperatures for self-consistency. Default is ``False``. nsamples : int, optional Number of posterior samples to process when computing temperature maps. Default is ``2000``. Returns ------- A_Bond_all : ndarray Bond albedo for each posterior sample. Shape: (N_samples,). eps_kelp_all : ndarray Heat redistribution efficiency (Kelp method) for each sample. Shape: (N_samples,). Computed as nightside/dayside flux ratio. Tday_all : ndarray Dayside brightness temperature for each sample. Shape: (N_samples,). Tnight_all : ndarray Nightside brightness temperature for each sample. Shape: (N_samples,). eps_keating_all : ndarray Heat redistribution efficiency (Keating et al. 2019 method). Shape: (N_samples,). Computed as nightside flux / total flux. Notes ----- - Calls :meth:`temperature_map_distribution` to compute temperature maps if not already cached. - Prints posterior median and 68% credible intervals for all output parameters. - Uses :meth:`_albedo_eps_tdaynight` for computation per sample.= References ---------- - Keating, D., Cowan, N. B., & Dang, L. 2019. Uniformly hot nightside temperatures on short-period gas giants. Nature Astronomy, 3, 1092. Examples -------- >>> a_by_rst = 10.0 >>> A_B, eps_k, T_d, T_n, eps_keat = invert.albedo_eps_from_temp_map(a_by_rst) """ # This function calculates the Bond albedo and heat re-distribution efficiency from the temperature map distribution. # The heat re-distribution efficiency is calculated using both the kelp method and the Keating, Cowan & Dang (2019) method. # First, we need to calculate the temperature map itself self.temperature_map_distribution(nsamples=nsamples) # Replacing the zero night side temperature with uniform nightside temperature if uniform_nightside_temp: ## Angles theta2d, phi2d = np.meshgrid(self.theta_ang, self.phi_ang) ## Mask for nightside idx_night = np.abs(phi2d) > np.pi/2 ## Replacing all zeros on night-side with ## Mean of some temperatures self.avg_night_temp_all = np.zeros( self.temp_map.shape[0] ) for inte in tqdm(range(self.temp_map.shape[0])): ### Computing the average nightside temperatue temp_night = self.temp_map[inte,:,:]*idx_night temp_night[temp_night == 0.] = np.nan avg_night_temp = np.nanmedian(temp_night.flatten()) self.avg_night_temp_all[inte] = avg_night_temp ### Replacing zeros with average nightside temperature self.temp_map[inte, :, :][ self.temp_map[inte, :, :] == 0. ] = avg_night_temp if np.isscalar(a_by_Rst): a_by_Rst_val = a_by_Rst * np.ones(self.temp_map.shape[0]) else: a_by_Rst_val = np.random.choice(a_by_Rst, size=self.temp_map.shape[0], replace=False) # Now, we will compute the Bond albedo and heat re-distribution efficiency for each temperature map in the distribution A_Bond_all = np.zeros( self.temp_map.shape[0] ) eps_kelp_all = np.zeros( self.temp_map.shape[0] ) Tday_all = np.zeros( self.temp_map.shape[0] ) Tnight_all = np.zeros( self.temp_map.shape[0] ) eps_keating_all = np.zeros( self.temp_map.shape[0] ) for i in tqdm(range(self.temp_map.shape[0])): A_Bond_all[i], eps_kelp_all[i], Tday_all[i], Tnight_all[i], eps_keating_all[i] = \ self._albedo_eps_tdaynight(temp_map=self.temp_map[i,:,:], a_by_Rst=a_by_Rst_val[i] ) # Let's quickly print the median and 68 percentile confidence intervals for each of these parameters print('>>> --- Bond albedo: {:.3f} (+{:.3f}/-{:.3f})'.format( np.median( A_Bond_all ), np.percentile( A_Bond_all, 84 ) - np.median( A_Bond_all ), np.median( A_Bond_all ) - np.percentile( A_Bond_all, 16 ) ) ) print('>>> --- Heat re-distribution efficiency (Kelp method): {:.3f} (+{:.3f}/-{:.3f})'.format( np.median( eps_kelp_all ), np.percentile( eps_kelp_all, 84 ) - np.median( eps_kelp_all ), np.median( eps_kelp_all ) - np.percentile( eps_kelp_all, 16 ) ) ) print('>>> --- Dayside temperature: {:.3f} K (+{:.3f}/-{:.3f})'.format( np.median( Tday_all ), np.percentile( Tday_all, 84 ) - np.median( Tday_all ), np.median( Tday_all ) - np.percentile( Tday_all, 16 ) ) ) print('>>> --- Nightside temperature: {:.3f} K (+{:.3f}/-{:.3f})'.format( np.median( Tnight_all ), np.percentile( Tnight_all, 84 ) - np.median( Tnight_all ), np.median( Tnight_all ) - np.percentile( Tnight_all, 16 ) ) ) print('>>> --- Heat re-distribution efficiency (Keating, Cowan & Dang 2019 method): {:.3f} (+{:.3f}/-{:.3f})'.format( np.median( eps_keating_all ), np.percentile( eps_keating_all, 84 ) - np.median( eps_keating_all ), np.median( eps_keating_all ) - np.percentile( eps_keating_all, 16 ) ) ) return A_Bond_all, eps_kelp_all, Tday_all, Tnight_all, eps_keating_all
[docs] def albedo_eps_from_fpfs_map(self, a_by_Rst): """Compute Bond albedo and heat redistribution from flux map distribution. Calculates Bond albedo and heat redistribution efficiency directly from the 2D distribution of F_p/F_* maps (without intermediate temperature conversion). Provides an alternative approach to :meth:`albedo_eps_from_temp_map` that does not require brightness temperature estimation. Parameters ---------- a_by_Rst : float or ndarray Semi-major axis in units of stellar radius (a/R_*). If scalar, same value used for all samples; if array, sampled for each. Returns ------- A_Bond_all : ndarray Bond albedo for each posterior sample. Shape: (N_samples,). eps_kelp_all : ndarray Heat redistribution efficiency (Kelp method). Shape: (N_samples,). eps_keating_all : ndarray Heat redistribution efficiency (Keating et al. 2019 method). Shape: (N_samples,). Notes ----- - Does not require calls to temperature-mapping routines. - Computes F_p/F_* maps directly from Fourier coefficients. - Prints posterior median and 68% credible intervals for all outputs. - Uses :meth:`_albedo_eps_fpfs` for computation per sample. Examples -------- >>> a_by_rst = 10.0 >>> A_B, eps_k, eps_keat = invert.albedo_eps_from_fpfs_map(a_by_rst) """ # This function calculates the Bond albedo and heat re-distribution efficiency from the fp/f* map distribution. # The heat re-distribution efficiency is calculated using both the kelp method and the Keating, Cowan & Dang (2019) method. # Now, let's calculate the fp/f* map distribution fpfs_map = np.zeros( (len(self.E), self.nphi, self.ntheta) ) for integration in range( len(self.E) ): J_phi = self.A0[integration] + ( self.A1[integration] * np.cos(self.phi_ang) ) + ( self.B1[integration] * np.sin(self.phi_ang) )+\ ( self.A2[integration] * np.cos(2*self.phi_ang) ) + ( self.B2[integration] * np.sin(2*self.phi_ang) ) for th in range(self.ntheta): fpfs_map[integration, :, th] = np.sin(self.theta_ang[th] + np.pi/2) * J_phi * 0.75 if np.isscalar(a_by_Rst): a_by_Rst_val = a_by_Rst * np.ones(fpfs_map.shape[0]) else: a_by_Rst_val = np.random.choice(a_by_Rst, size=fpfs_map.shape[0], replace=False) # Now, we will compute the Bond albedo and heat re-distribution efficiency for each fp/f* map in the distribution A_Bond_all = np.zeros( fpfs_map.shape[0] ) eps_kelp_all = np.zeros( fpfs_map.shape[0] ) eps_keating_all = np.zeros( fpfs_map.shape[0] ) for i in tqdm(range(fpfs_map.shape[0])): A_Bond_all[i], eps_kelp_all[i], eps_keating_all[i] = \ self._albedo_eps_fpfs( fpfs_map=fpfs_map[i,:,:], a_by_Rst=a_by_Rst_val[i] ) # Let's quickly print the median and 68 percentile confidence intervals for each of these parameters print('>>> --- Bond albedo: {:.3f} (+{:.3f}/-{:.3f})'.format( np.median( A_Bond_all ), np.percentile( A_Bond_all, 84 ) - np.median( A_Bond_all ), np.median( A_Bond_all ) - np.percentile( A_Bond_all, 16 ) ) ) print('>>> --- Heat re-distribution efficiency (Kelp method): {:.3f} (+{:.3f}/-{:.3f})'.format( np.median( eps_kelp_all ), np.percentile( eps_kelp_all, 84 ) - np.median( eps_kelp_all ), np.median( eps_kelp_all ) - np.percentile( eps_kelp_all, 16 ) ) ) print('>>> --- Heat re-distribution efficiency (Keating, Cowan & Dang 2019 method): {:.3f} (+{:.3f}/-{:.3f})'.format( np.median( eps_keating_all ), np.percentile( eps_keating_all, 84 ) - np.median( eps_keating_all ), np.median( eps_keating_all ) - np.percentile( eps_keating_all, 16 ) ) ) return A_Bond_all, eps_kelp_all, eps_keating_all
[docs] def albedo_from_kempton23(self, a_by_Rst, rst, teq): """Compute Bond albedo using Kempton et al. (2023) method across posteriors. Calculates Bond albedo for each posterior sample using the energy-balance method of Kempton et al. (2023). This approach directly relates observed eclipse depths to Bond albedo accounting for bandpass-dependent radiation. Parameters ---------- a_by_Rst : float or ndarray Semi-major axis in units of stellar radius (a/R_*). If scalar, same value used for all samples; if array, sampled for each. rst : float or ndarray (must be astropy.units.Quantity) Stellar radius in units of solar radius (R_*). If scalar, same value used for all samples; if array, sampled for each. teq : float or ndarray (must be astropy.units.Quantity) Equilibrium temperature of the planet in Kelvin. If scalar, same value used for all samples; if array, sampled for each. Returns ------- A_Bond_all : ndarray Bond albedo for each posterior sample. Shape: (N_samples,). Printed to console as median ± 68% credible interval. Notes ----- - Uses :meth:`_albedo_kempton23` for computation per sample. - Requires ``rst`` (stellar radius) to be provided as an argument. - Prints posterior median and 68% credible intervals. References ---------- - Kempton, E. M. R., et al. 2023. A reflective, metal-rich atmosphere for GJ 1214b from its JWST phase curve, Nature, 620(7972), 67 Examples -------- >>> a_by_rst = 10.0 >>> rst = 1.0 * u.R_sun # Solar radius >>> teq = 1500 * u.K # Kelvin >>> A_Bond = invert.albedo_from_kempton23(a_by_rst, rst, teq) >>> print(f"Bond albedo: {np.median(A_Bond):.3f}") """ # This function calculates the Bond albedo using the method from Kempton et al. (2023) # First, we need to calculate the occultation depth map itself fpfs_map = np.zeros( (len(self.E), self.nphi) ) for integration in range( len(self.E) ): J_phi = self.A0[integration] + (self.A1[integration] * np.cos(self.phi_ang)) + (self.B1[integration] * np.sin(self.phi_ang))+\ (self.A2[integration] * np.cos(2*self.phi_ang)) + (self.B2[integration] * np.sin(2*self.phi_ang)) fpfs_map[integration, :] = np.sin(np.pi/2) * J_phi * 0.75 A_Bond_all = np.zeros( fpfs_map.shape[0] ) if np.isscalar(a_by_Rst): a_by_Rst_val = a_by_Rst * np.ones(fpfs_map.shape[0]) else: a_by_Rst_val = np.random.choice(a_by_Rst, size=fpfs_map.shape[0], replace=False) if np.isscalar(teq) or np.isscalar(teq.value): teq_val = teq * np.ones(fpfs_map.shape[0]) else: teq_val = np.random.choice(teq, size=fpfs_map.shape[0], replace=False) * u.K if np.isscalar(self.rprs): rprs_arr = np.full(fpfs_map.shape, float(self.rprs)) else: rprs_arr = np.random.choice(self.rprs, size=fpfs_map.shape[0], replace=False) if np.isscalar(rst) or np.isscalar(rst.value): rst = rst * np.ones(fpfs_map.shape[0]) else: rst = np.random.choice(rst, size=fpfs_map.shape[0], replace=False) * u.R_sun for i in tqdm(range(fpfs_map.shape[0])): A_Bond_all[i] = self._albedo_kempton23(fpfs_phs=fpfs_map[i,:], rst=rst[i], a_by_Rst=a_by_Rst_val[i], teq=teq_val[i], rprs=rprs_arr[i]) print('>>> --- Bond albedo: {:.3f} (+{:.3f}/-{:.3f})'.format( np.median( A_Bond_all ), np.percentile( A_Bond_all, 84 ) - np.median( A_Bond_all ), np.median( A_Bond_all ) - np.percentile( A_Bond_all, 16 ) ) ) return A_Bond_all
# --------------------------------------------------------------------------- # Module-level worker for SelectLinDetrend.select_optimal_parameters # --------------------------------------------------------------------------- # It lives at module scope so that concurrent.futures.ProcessPoolExecutor can # pickle it without ever needing to serialise the SelectLinDetrend instance # (which may hold un-picklable attributes such as a closure for 'priors'). def _lindetrend_worker(worker_args): (instruments, tim_s, fl_s, fle_s, gp_pars_s, lin_pars, cached_priors, out_dir, juliet_fit_kwargs, noise_method) = worker_args noise_mapping = {'astropy': mad_std, 'std': np.std, 'pipe': pipe_mad} noise_func = noise_mapping[noise_method] # Build full prior lists from cached base priors + linear regressor priors par_tot, dist_tot, hyper_tot = [], [], [] for ins in instruments: par_ins, dist_ins, hyper_ins = cached_priors[ins] par_tot += list(par_ins) + ['mdilution_' + ins, 'mflux_' + ins, 'sigma_w_' + ins] dist_tot += list(dist_ins) + ['fixed', 'normal', 'loguniform'] hyper_tot += list(hyper_ins) + [1.0, [0., 0.1], [0.1, 1e4]] if lin_pars is not None and lin_pars.get(ins) is not None: for pc in range(lin_pars[ins].shape[1]): par_tot.append('theta' + str(pc) + '_' + ins) dist_tot.append('uniform') hyper_tot.append([-1, 1]) priors = juliet.utils.generate_priors(par_tot, dist_tot, hyper_tot) data = juliet.load( priors=priors, t_lc=tim_s, y_lc=fl_s, yerr_lc=fle_s, linear_regressors_lc=lin_pars, GP_regressors_lc=gp_pars_s, out_folder=out_dir, ) res = data.fit(sampler='dynamic_dynesty', **juliet_fit_kwargs) mads_all = [] for ins in instruments: model_lc = res.lc.evaluate(instrument=ins) residuals = fl_s[ins] - model_lc mads_all.append(noise_func(residuals) * 1e6) return res.posteriors['lnZ'], mads_all
[docs] class SelectLinDetrend(object): """Select optimal linear detrending vectors using model selection with Bayesian evidence. This class provides tools to systematically identify the best combination of linear detrending regressors (e.g., time polynomials, roll angle modulation, background variations) for multi-instrument light-curve fitting. It employs model selection based on log-evidence improvements, starting from a base model with no linear regressors and iteratively adding vectors that maximize evidence gain. A regressor is accepted only if it improves log-evidence by at least 2 units; the search terminates when no remaining regressor meets this threshold. The class interfaces with the ``juliet`` package for Bayesian model fitting and supports parallel processing of model comparisons through concurrent fitting. It automatically handles multi-instrument datasets where different instruments may have different numbers and types of regressors available. Linear regressors can be passed directly or be generated from roll angle values via Fourier series expansion. All data (time, flux, errors, regressors) are expected as per-instrument dictionaries, enabling straightforward multi-instrument analysis. Parameters ---------- time : dict Time stamps for each instrument. Keys are instrument names; values are 1-D numpy arrays of time points (in BJD or similar). All arrays should have the same length as corresponding ``flux`` and ``flux_err`` arrays. flux : dict Flux (or normalized flux) measurements for each instrument. Keys are instrument names; values are 1-D numpy arrays of flux measurements. flux_err : dict Flux uncertainties for each instrument. Keys are instrument names; values are 1-D numpy arrays of flux uncertainties (same length as flux). priors : callable A function mapping an instrument name (keyword argument ``instrument``) to a tuple of three lists: - param_names : list of str Names of juliet-compatible model parameters (e.g., 't0_p1', 'p_p1', 'P_p1'). - distributions : list of str juliet-compatible distribution names (e.g., 'normal', 'uniform', 'loguniform'). - hyperparams : list Hyperparameter lists for each distribution. For example, a 'normal' distribution takes [mean, std], and a 'uniform' takes [lower, upper]. The function is called once per instrument during initialization and results are cached internally. linear_regressors : dict Linear regressors to consider for detrending. Keys are instrument names; values are dicts where each key is a regressor name (str) and value is a 1-D numpy array (same length as time for that instrument). Examples include 'time', 'time^2', 'background', 'systematics', etc. If ``roll_degree`` and ``roll`` are both provided, roll-angle Fourier series regressors ('ROLL_SIN_1', 'ROLL_COS_1', etc.) are automatically added to ``linear_regressors`` during initialization. gp_regressors : dict or None, optional Gaussian Process (GP) regressors to use for systematics fitting. Keys are instrument names; values are 1-D numpy arrays of regressor values (e.g., roll angle, time). If provided, these are sorted and passed to juliet for GP handling. Default is ``None``. roll_degree : int or None, optional Maximum degree of roll-angle Fourier series to include as linear regressors. If provided (and ``roll`` is also provided), Fourier series regressors up to degree ``roll_degree`` are automatically generated and added to ``linear_regressors``. Must be paired with ``roll`` parameter. Default is ``None``. roll : dict or None, optional Roll angle values (in degrees) for each instrument. Keys are instrument names; values are 1-D numpy arrays of roll angles. Required only if ``roll_degree`` is specified. Default is ``None``. noise_method : str, optional Method to compute noise/scatter in residuals. Options are: - 'astropy' : Uses median absolute deviation (astropy.stats.mad_std). - 'std' : Uses standard deviation (numpy.std). - 'pipe' : Uses the PIPE photometric uncertainty (pipe_mad function). This method is applied to residuals after each model fit to estimate per-instrument noise levels. Default is ``'astropy'``. juliet_fit_kwargs : dict, optional Additional keyword arguments to pass to ``juliet.fit()`` during model fits. For example, ``{'sampler': 'dynamic_dynesty', 'nlive': 500}``. Default is an empty dict ``{}``. nthreads : int, optional Total number of CPU threads available for fitting. Shared among parallel workers if ``n_parallel > 1`` is used in :meth:`select_optimal_parameters`. Default is the total number of available CPU cores. pout : str, optional Output directory for saving juliet fit results. Each model fit creates a subdirectory named after the included regressors (e.g., 'linsel_base', 'linsel_time', 'linsel_time_background'). Default is the current working directory. Attributes ---------- time, flux, flux_err : dict Stored input light-curve data (per-instrument). instruments : list of str Extracted list of instrument names from the keys of ``time`` dict. priors : callable Stored prior function. linear_regressors : dict Stored linear regressors (or regressors with added roll terms if ``roll_degree`` was provided). GP_regressors : dict or None Stored GP regressors. roll_degree, roll : int or None, dict or None Stored roll parameters. noise_method : str Stored noise method name. noise_func : callable Cached noise function (e.g., mad_std, np.std, rms, or pipe_mad). juliet_fit_kwargs : dict Stored juliet fitting kwargs. nthreads : int Stored thread count. pout : str Stored output directory. _sort_idx, _tim_s, _fl_s, _fle_s : dict Per-instrument sort indices and sorted data arrays (cached by :meth:`_precompute`). _gp_pars_s, _gp_pars_s_juliet : dict or None Cached GP regressor arrays (sorted, or None if not applicable). _cached_priors : dict Per-instrument prior tuples cached from the priors callable. Main Methods ---------- select_optimal_parameters(n_parallel=1) Execute forward selection to identify optimal linear detrending vectors based on log-evidence maximization. Notes ----- - All fitting is performed using the ``juliet`` Bayesian fitting package. - Log-evidence improvements are computed via ``juliet.fit()`` and returned as ``posteriors['lnZ']``. - The model selection algorithm is greedy: it selects the regressor with the maximum evidence gain at each step, then moves to the next round. - A regressor is accepted only if it improves log-evidence by >= delta_lnZ_threshold (default 2.0). This threshold reflects a Bayes factor of ~exp(2) ≈ 7.4, corresponding to "strong" evidence in Bayesian hypothesis testing. - When ``n_parallel > 1`` in :meth:`select_optimal_parameters`, concurrent fitting is performed using ``concurrent.futures.ProcessPoolExecutor``. To keep total thread usage under control, the per-worker nthreads budget is automatically adjusted as ``nthreads // n_parallel``. - Roll-angle Fourier series regressors (if generated) follow the naming convention 'ROLL_SIN_k' and 'ROLL_COS_k' for degree k = 1, 2, ..., roll_degree. - Regressors that do not exist for a particular instrument (e.g., a regressor available only for one camera in a multi-camera setup) are automatically skipped for that instrument during model construction. - All per-instrument data (time, flux, errors, regressors) are length-checked during juliet loading but not explicitly in __init__. Ensure consistency before instantiation. Examples -------- Basic forward selection with two instruments: >>> import numpy as np >>> time = { ... 'camera_1': np.linspace(2459000, 2459001, 200), ... 'camera_2': np.linspace(2459000, 2459001, 150), ... } >>> flux = { ... 'camera_1': 1.0 + 0.01 * np.random.randn(200), ... 'camera_2': 1.0 + 0.005 * np.random.randn(150), ... } >>> flux_err = { ... 'camera_1': np.full(200, 0.005), ... 'camera_2': np.full(150, 0.003), ... } >>> linear_regressors = { ... 'camera_1': { ... 'time': time['camera_1'], ... 'time^2': time['camera_1']**2, ... }, ... 'camera_2': { ... 'time': time['camera_2'], ... 'time^2': time['camera_2']**2, ... }, ... } >>> def priors_func(instrument): ... return (['t0_p1', 'p_p1', 'P_p1'], ... ['normal', 'normal', 'uniform'], ... [[2459000.5, 0.1], [3.5, 0.01], [0.01, 0.1]]) >>> >>> selector = SelectLinDetrend( ... time=time, ... flux=flux, ... flux_err=flux_err, ... priors=priors_func, ... linear_regressors=linear_regressors, ... pout='/path/to/results' ... ) >>> selected_regs, lnZ_hist, scatter_hist = selector.select_optimal_parameters(n_parallel=2) With roll-angle detrending: >>> roll_angles = { ... 'camera_1': np.random.uniform(0, 360, 200), ... 'camera_2': np.random.uniform(0, 360, 150), ... } >>> selector = SelectLinDetrend( ... time=time, ... flux=flux, ... flux_err=flux_err, ... priors=priors_func, ... linear_regressors=linear_regressors, ... roll_degree=3, ... roll=roll_angles, ... pout='/path/to/results' ... ) >>> # Now linear_regressors includes 'ROLL_SIN_1', 'ROLL_COS_1', ..., 'ROLL_SIN_3', 'ROLL_COS_3' >>> selected_regs, lnZ_hist, scatter_hist = selector.select_optimal_parameters() It is possible to use scatter in the residuals (instead of log-evidence) for model selection by setting selection_method='scatter' in select_optimal_parameters. In this case, the regressor that minimizes the scatter (as computed by noise_method) is selected at each step. >>> # Using scatter-based selection instead of log-evidence >>> selected_regs, lnZ_hist, scatter_hist = selector.select_optimal_parameters(selection_method='scatter') """ def __init__(self, time, flux, flux_err, priors, linear_regressors, gp_regressors=None, roll_degree=None, roll=None, noise_method='astropy', juliet_fit_kwargs={}, nthreads=multiprocessing.cpu_count(), pout=os.getcwd() ): # Time, flux, and flux_err are the light curve data. All of them are dict with keys of instrument name. self.time = time self.flux = flux self.flux_err = flux_err # Extracting an instrument list self.instruments = list( self.time.keys() ) # Priors; it is a function with only keyword is the name of the instrument. # It should return a tuple of lists, where the first list is the list of parameter names, second is the list of distributions, and the third is the hyperparameters for the distributions. self.priors = priors # linear_regressors is a dict where the keys are the names of the instruments, and values are again dict where keys # now are the name of the linear regressors (e.g., 'time', 'time^2', 'background', etc.) and values are numpy arrays of # the same length as time, flux, and flux_err. These arrays represent the values of the linear regressors at each time point. self.linear_regressors = linear_regressors # GP_regressors is a dict where the keys are the names of the instruments self.GP_regressors = gp_regressors # The roll_degree parameter indicates the maximum degree of the roll angle Fourier series to include in the linear model. self.roll_degree = roll_degree # If we are going to include the roll angle as a linear regressor, then we need to have the roll angle values for each time point. # It should be a dict where the keys are the names of the instruments and values are numpy arrays of the same length as # time, flux, and flux_err. These arrays represent the roll angle values at each time point. self.roll = roll if self.roll_degree is not None and self.roll is not None: # This means that we will be including the roll angle as a linear regressor, so we need to create the regressors for the roll angle modulation and include them in self.linear_regressors. # Let's adjust the self.linear_regressors to include the roll angle regressors if GP_roll is False self._roll_model_sine() elif ( self.roll_degree is None) or (self.roll is None): # This means that the user wants to include Roll angles as linear regressors but they forgot to provide either the roll_degree or the roll angle values. We will raise an error in this case. if (self.roll_degree is not None) and (self.roll is None): raise ValueError("roll_degree is provided but roll angle values are not provided. Please provide the roll angle values or set roll_degree to None.") elif (self.roll_degree is None) and (self.roll is not None): raise ValueError("roll angle values are provided but roll_degree is not provided. Please provide the roll_degree or set roll to None.") else: # Both roll_degree and roll are None, which means that we will be including the roll angle as a Gaussian process regressor, so we don't need to include it as a linear regressor. # We assume that the use have alreadey included the roll angle as a GP regressor in self.GP_regressors, so we don't need to do anything here. # Just a sanity check to make sure that the roll angle is included in the GP regressors. if self.GP_regressors is None: print("GP_regressors must be provided if roll angle modulation is desired to be fitted with a GP.") # Noise method to use for calculating the scatter in residuals noise_mapping = {'astropy': mad_std, 'std': np.std, 'pipe': pipe_mad, 'rms': rms} if noise_method not in noise_mapping: raise ValueError("Invalid noise method. Please choose from 'astropy', 'std', 'pipe', or 'rms'.") self.noise_method = noise_method # stored as string for worker pickling self.noise_func = noise_mapping[noise_method] # Finally, we will save any extra juliet fit kwargs to be used in the juliet fitting later. # This argument will be provided to juliet.fit() function when we fit the model to the data. self.juliet_fit_kwargs = juliet_fit_kwargs # Cores to use for juliet fitting self.nthreads = nthreads # Output directory to save the juliet fit results self.pout = pout # Pre-compute sort indices, sorted data arrays, and cached prior values # so that _fit_model and parallel workers never redo this work. self._precompute() def _precompute(self): # Cache per-instrument sort indices, pre-sorted data, GP regressors, and # prior data so this work is done exactly once rather than once per fit. self._sort_idx = {} self._tim_s = {} self._fl_s = {} self._fle_s = {} self._gp_pars_s = {} for ins in self.instruments: gp_reg = self.GP_regressors[ins] if self.GP_regressors is not None else None if gp_reg is not None: idx = np.argsort(gp_reg) self._gp_pars_s[ins] = gp_reg[idx] else: idx = np.arange(len(self.time[ins])) self._gp_pars_s[ins] = None self._sort_idx[ins] = idx self._tim_s[ins] = self.time[ins][idx] self._fl_s[ins] = self.flux[ins][idx] self._fle_s[ins] = self.flux_err[ins][idx] # Collapse gp_pars: None if all instruments lack GP regressors, # otherwise keep only the instruments that have them. if all(v is None for v in self._gp_pars_s.values()): self._gp_pars_s_juliet = None else: self._gp_pars_s_juliet = {k: v for k, v in self._gp_pars_s.items() if v is not None} # Cache prior data per instrument (lists/tuples of plain data → picklable) self._cached_priors = {} for ins in self.instruments: self._cached_priors[ins] = self.priors(instrument=ins) def _roll_model_sine(self): # This helper function creates the regressors for roll angle modulation for maximum degree of roll_degree. # Eventually the regressors are saved in self.linear_regressors with keys 'ROLL_SIN_1', 'ROLL_COS_1', 'ROLL_SIN_2', 'ROLL_COS_2', ..., 'ROLL_SIN_roll_degree', 'ROLL_COS_roll_degree'. for ins in range( len(self.instruments) ): for i in range(self.roll_degree): self.linear_regressors[self.instruments[ins]][f'ROLL_SIN_{i+1}'] = np.sin( (i+1) * np.radians( self.roll[self.instruments[ins]] ) ) self.linear_regressors[self.instruments[ins]][f'ROLL_COS_{i+1}'] = np.cos( (i+1) * np.radians( self.roll[self.instruments[ins]] ) ) def _build_lin_pars(self, lin_reg): """Apply cached sort indices to lin_reg and stack into juliet matrix form. Parameters ---------- lin_reg : dict {instrument: {regressor_name: array}} or {instrument: None} Returns ------- lin_pars : dict or None {instrument: 2-D ndarray (n_points x n_regressors)} ready for juliet, or None if every instrument has no regressors. """ lin_pars = {} for ins in self.instruments: if lin_reg[ins] is not None: # Sort by GP regressor order; use sorted() for deterministic column order arrs = [lin_reg[ins][k][self._sort_idx[ins]] for k in sorted(lin_reg[ins].keys())] lin_pars[ins] = np.transpose(np.vstack(arrs)) else: lin_pars[ins] = None if all(v is None for v in lin_pars.values()): return None return lin_pars def _fit_model(self, lin_reg, out_dir, nthreads=None): """Fit the total model using juliet. Uses pre-sorted data and cached prior values from _precompute() so no redundant sorting or prior evaluation occurs per call. Parameters ---------- lin_reg : dict {instrument: {regressor_name: array}} or {instrument: None} out_dir : str Output folder for juliet results. nthreads : int, optional Thread count passed to juliet. Defaults to self.nthreads. Returns ------- lnZ : float mads_all : list of float """ if nthreads is None: nthreads = self.nthreads lin_pars = self._build_lin_pars(lin_reg) # Build prior lists from cached per-instrument base priors par_tot, dist_tot, hyper_tot = [], [], [] for ins in self.instruments: par_ins, dist_ins, hyper_ins = self._cached_priors[ins] par_tot += list(par_ins) + ['mdilution_' + ins, 'mflux_' + ins, 'sigma_w_' + ins] dist_tot += list(dist_ins) + ['fixed', 'normal', 'loguniform'] hyper_tot += list(hyper_ins) + [1.0, [0., 0.1], [0.1, 1e4]] if lin_pars is not None and lin_pars.get(ins) is not None: for pc in range(lin_pars[ins].shape[1]): par_tot.append('theta' + str(pc) + '_' + ins) dist_tot.append('uniform') hyper_tot.append([-1, 1]) priors = juliet.utils.generate_priors(par_tot, dist_tot, hyper_tot) data = juliet.load( priors=priors, t_lc=self._tim_s, y_lc=self._fl_s, yerr_lc=self._fle_s, linear_regressors_lc=lin_pars, GP_regressors_lc=self._gp_pars_s_juliet, out_folder=out_dir, ) res = data.fit(sampler='dynamic_dynesty', nthreads=nthreads, **self.juliet_fit_kwargs) mads_all = [] for ins in self.instruments: model_lc = res.lc.evaluate(instrument=ins) residuals = self._fl_s[ins] - model_lc mads_all.append(self.noise_func(residuals) * 1e6) return res.posteriors['lnZ'], mads_all
[docs] def select_optimal_parameters(self, n_parallel=1, delta_lnZ_threshold=2.0, selection_method='lnZ', scatter_threshold=5.0): """ Select optimal linear detrending vectors using forward selection based on Bayesian evidence or light-curve scatter. The algorithm starts with a base model that has no linear regressors and iteratively adds the vector that yields the greatest improvement according to the chosen selection metric. The search stops when no remaining vector clears the acceptance threshold. Unique regressor names are collected across all instruments. When a regressor is selected it is included for every instrument that carries it; instruments that do not have that regressor are left unchanged. Parameters ---------- n_parallel : int, optional Number of model fits to run concurrently within each selection round. Default is 1 (sequential). When n_parallel > 1 the fits in the inner loop are dispatched to a ProcessPoolExecutor for true CPU parallelism (no GIL contention). The juliet nthreads budget is automatically divided by n_parallel so that the total thread count across all workers stays within self.nthreads. The module-level _lindetrend_worker function is used as the callable so that no part of this instance (including the potentially un-picklable priors callable) needs to be serialised. delta_lnZ_threshold : float, optional Minimum improvement in log-evidence required to accept a new regressor when selection_method='lnZ'. A candidate is added only if its lnZ exceeds the current base model's lnZ by at least this value. Default is 2.0, which corresponds to "strong evidence" on the Jeffreys scale. Ignored when selection_method='scatter'. selection_method : {'lnZ', 'scatter'}, optional Metric used to rank and accept candidate regressors. * ``'lnZ'`` (default) — select the regressor that maximises the Bayesian log-evidence improvement. Works for any number of instruments. * ``'scatter'`` — select the regressor that minimises the MAD scatter of the residuals for the single instrument. If more than one instrument is provided this option is not supported and the method automatically falls back to ``'lnZ'`` with a warning. scatter_threshold : float, optional Minimum reduction in scatter (in ppm) required to accept a new regressor when selection_method='scatter'. A candidate is accepted only if its residual scatter is at least this many ppm lower than the current base model's scatter. Default is 5.0 ppm. Ignored when selection_method='lnZ'. Returns ------- selected_regressors : dict Dict with instrument names as keys. Each value is either a dict of {regressor_name: array} for the selected regressors, or None if no regressor was selected for that instrument. lnZ_history : list of float Log-evidence at each accepted step (index 0 is the base model). scatter_history : list Scatter (MAD) values at each accepted step (index 0 is the base model). """ # Validate selection_method; fall back to 'lnZ' for multi-instrument scatter if selection_method not in ('lnZ', 'scatter'): raise ValueError( f"selection_method must be 'lnZ' or 'scatter', got {selection_method!r}" ) if selection_method == 'scatter' and len(self.instruments) > 1: import warnings warnings.warn( "selection_method='scatter' is not supported for multiple instruments; " "falling back to selection_method='lnZ'.", UserWarning, stacklevel=2, ) selection_method = 'lnZ' # Collect all unique regressor names across all instruments all_regressor_names = set() for ins in self.instruments: all_regressor_names.update(self.linear_regressors[ins].keys()) remaining_names = list(all_regressor_names) # --- Step 1: base model (no linear regressors) --- lin_reg_none = {ins: None for ins in self.instruments} base_out_dir = os.path.join(self.pout, 'linsel_base') print("Fitting base model (no linear regressors)...") base_lnZ, base_scatter = self._fit_model(lin_reg_none, base_out_dir) if selection_method == 'scatter': current_scatter_val = base_scatter[0] print(f" Base model lnZ = {base_lnZ:.2f}, scatter = {current_scatter_val:.2f} ppm") else: print(f" Base model lnZ = {base_lnZ:.2f}") selected_names = [] current_lnZ = base_lnZ lnZ_history = [base_lnZ] scatter_history = [base_scatter] # Stores every round's results for the final per-round summary table. all_rounds = [] # Per-worker thread budget so total threads ≤ self.nthreads nthreads_per_worker = max(1, self.nthreads // max(1, n_parallel)) # --- Step 2: forward selection --- while remaining_names: # Build candidates: (name, lin_pars, out_dir) # lin_pars is already sorted and stacked — ready to pickle / pass to juliet. candidates = [] for name in remaining_names: candidate_names = selected_names + [name] lin_reg = { ins: { reg: self.linear_regressors[ins][reg] for reg in candidate_names if reg in self.linear_regressors[ins] } or None for ins in self.instruments } # Resolve empty dicts to None lin_reg = {ins: (v if v else None) for ins, v in lin_reg.items()} lin_pars = self._build_lin_pars(lin_reg) out_dir = os.path.join( self.pout, 'linsel_' + '_'.join(sorted(candidate_names)) ) candidates.append((name, lin_pars, out_dir)) # Run fits — parallel (ProcessPoolExecutor) or sequential results = {} # name -> (lnZ, scatter) if n_parallel > 1: # juliet_fit_kwargs forwarded to each worker with per-worker nthreads worker_fit_kwargs = dict(self.juliet_fit_kwargs) worker_fit_kwargs['nthreads'] = nthreads_per_worker worker_args_list = [ ( self.instruments, self._tim_s, self._fl_s, self._fle_s, self._gp_pars_s_juliet, lin_pars, self._cached_priors, out_dir, worker_fit_kwargs, self.noise_method, ) for _, lin_pars, out_dir in candidates ] with concurrent.futures.ProcessPoolExecutor(max_workers=n_parallel) as executor: future_to_name = { executor.submit(_lindetrend_worker, args): name for (name, _, _), args in zip(candidates, worker_args_list) } for future in concurrent.futures.as_completed(future_to_name): name = future_to_name[future] lnZ, scatter = future.result() results[name] = (lnZ, scatter) if selection_method == 'scatter': print(f" Tested '{name}': lnZ = {lnZ:.2f}, scatter = {scatter[0]:.2f} ppm " f"(current base: {current_scatter_val:.2f} ppm)") else: print(f" Tested '{name}': lnZ = {lnZ:.2f} (current best: {current_lnZ:.2f})") else: for name, _, out_dir in candidates: candidate_names = selected_names + [name] lin_reg = { ins: { reg: self.linear_regressors[ins][reg] for reg in candidate_names if reg in self.linear_regressors[ins] } or None for ins in self.instruments } lin_reg = {ins: (v if v else None) for ins, v in lin_reg.items()} print(f" Testing '{name}': {candidate_names}") lnZ, scatter = self._fit_model(lin_reg, out_dir, nthreads=nthreads_per_worker) results[name] = (lnZ, scatter) if selection_method == 'scatter': print(f" lnZ = {lnZ:.2f}, scatter = {scatter[0]:.2f} ppm " f"(current base: {current_scatter_val:.2f} ppm)") else: print(f" lnZ = {lnZ:.2f} (current best: {current_lnZ:.2f})") # Record this round for the summary table. 'selected' is filled in below. base_scatter_val = current_scatter_val if selection_method == 'scatter' else None all_rounds.append({ 'round_num': len(all_rounds) + 1, 'base_set': list(selected_names), 'base_lnZ': current_lnZ, 'base_scatter_val': base_scatter_val, 'selection_method': selection_method, 'candidates': { name: { 'lnZ': lnZ, 'scatter': scatter, 'delta_lnZ': lnZ - current_lnZ, 'delta_scatter': (base_scatter_val - scatter[0]) if selection_method == 'scatter' else None, } for name, (lnZ, scatter) in results.items() }, 'selected': None, }) # Find best candidate and accept if threshold is met if selection_method == 'scatter': best_scatter_val = np.inf best_name = None best_lnZ = None best_scatter = None for name, (lnZ, scatter) in results.items(): if scatter[0] < best_scatter_val: best_scatter_val = scatter[0] best_name = name best_lnZ = lnZ best_scatter = scatter delta_scatter = current_scatter_val - best_scatter_val if delta_scatter >= scatter_threshold: print(f" Accepted '{best_name}' (scatter reduction = {delta_scatter:.2f} ppm)") all_rounds[-1]['selected'] = best_name selected_names.append(best_name) remaining_names.remove(best_name) current_scatter_val = best_scatter_val current_lnZ = best_lnZ lnZ_history.append(best_lnZ) scatter_history.append(best_scatter) else: print( f" No regressor reduced scatter by >= {scatter_threshold} ppm " f"(best reduction = {delta_scatter:.2f} ppm). Stopping." ) break else: # lnZ-based selection best_lnZ = -np.inf best_name = None best_scatter = None for name, (lnZ, scatter) in results.items(): if lnZ > best_lnZ: best_lnZ = lnZ best_name = name best_scatter = scatter if best_lnZ - current_lnZ >= delta_lnZ_threshold: print(f" Accepted '{best_name}' (delta lnZ = {best_lnZ - current_lnZ:.2f})") all_rounds[-1]['selected'] = best_name selected_names.append(best_name) remaining_names.remove(best_name) current_lnZ = best_lnZ lnZ_history.append(best_lnZ) scatter_history.append(best_scatter) else: print( f" No regressor improved lnZ by >= {delta_lnZ_threshold} " f"(best delta = {best_lnZ - current_lnZ:.2f}). Stopping." ) break # ── Final summary table ────────────────────────────────────────────────── col_name_w = max(len(n) for n in list(all_regressor_names) + ['[base model]']) + 4 col_lnz_w = 10 col_dlnz_w = 11 col_scat_w = 12 col_stat_w = 14 delta_col_label = 'delta scat' if selection_method == 'scatter' else 'delta lnZ' header_parts = [f"{'Regressor':<{col_name_w}}", f"{'lnZ':>{col_lnz_w}}", f"{delta_col_label:>{col_dlnz_w}}"] header_parts += [f"{(ins + ' (ppm)'):>{col_scat_w}}" for ins in self.instruments] header_parts += [f"{'Status':<{col_stat_w}}"] header = ' | '.join(header_parts) sep = '-+-'.join(['-' * col_name_w, '-' * col_lnz_w, '-' * col_dlnz_w] + ['-' * col_scat_w] * len(self.instruments) + ['-' * col_stat_w]) total_w = len(header) title = f'Linear Detrending Selection Summary [method: {selection_method}]' def _fmt_row(marker, name, lnZ, delta_str, scatter_list, status): label = f"{marker}{name}" row = [f"{label:<{col_name_w}}", f"{lnZ:>{col_lnz_w}.2f}", f"{delta_str:>{col_dlnz_w}}"] row += [f"{sc:>{col_scat_w}.2f}" for sc in scatter_list] row += [f"{status:<{col_stat_w}}"] return ' | '.join(row) # Build all lines first so they can be printed and saved in one pass lines = [] lines.append('\n' + '=' * total_w) lines.append(title.center(total_w)) lines.append('=' * total_w) lines.append(header) lines.append(sep) # Base model row lines.append(_fmt_row('', '[base model]', base_lnZ, '---', base_scatter, 'base model')) # One section per round for rd in all_rounds: base_set_str = '[' + ', '.join(rd['base_set']) + ']' if rd['base_set'] else 'none' lines.append(sep) if selection_method == 'scatter': lines.append( f" Round {rd['round_num']} " f"(base set: {base_set_str}, base scatter = {rd['base_scatter_val']:.2f} ppm)" ) else: lines.append( f" Round {rd['round_num']} " f"(base set: {base_set_str}, base lnZ = {rd['base_lnZ']:.2f})" ) lines.append(sep) # Selected candidate first; remainder sorted ascending scatter or descending lnZ if selection_method == 'scatter': ordered = sorted( rd['candidates'].keys(), key=lambda n: (n != rd['selected'], rd['candidates'][n]['scatter'][0]) ) else: ordered = sorted( rd['candidates'].keys(), key=lambda n: (n != rd['selected'], -rd['candidates'][n]['lnZ']) ) for name in ordered: r = rd['candidates'][name] if selection_method == 'scatter': dscat = r['delta_scatter'] delta_str = f"+{dscat:.2f}" if dscat >= 0 else f"{dscat:.2f}" else: dlnZ = r['delta_lnZ'] delta_str = f"+{dlnZ:.2f}" if dlnZ >= 0 else f"{dlnZ:.2f}" if name == rd['selected']: marker, status = '>> ', 'SELECTED' else: marker, status = ' ', 'not selected' lines.append(_fmt_row(marker, name, r['lnZ'], delta_str, r['scatter'], status)) lines.append('=' * total_w) # Print to stdout for line in lines: print(line) # Save to file summary_path = os.path.join(self.pout, 'linsel_summary.txt') with open(summary_path, 'w') as f: f.write('\n'.join(lines) + '\n') print(f" Summary saved to {summary_path}") # Build the final selected-regressors dict selected_regressors = {} for ins in self.instruments: reg_dict = { name: self.linear_regressors[ins][name] for name in selected_names if name in self.linear_regressors[ins] } selected_regressors[ins] = reg_dict if reg_dict else None return selected_regressors, lnZ_history, scatter_history