Source code for krithika.utils

import numpy as np
import matplotlib.pyplot as plt
from astropy.stats import mad_std
from scipy.optimize import minimize
from scipy.integrate import simpson
from scipy.signal import medfilt
from scipy.interpolate import interp1d
from astropy.timeseries import LombScargle
import astropy.constants as con
from tqdm import tqdm
import astropy.units as u
from pathlib import Path
import multiprocessing
import corner
import os

[docs] def t14(per, ar, rprs, b, ecc=0, omega=90, transit=True): """ To compute transit/eclipse duration from Period, a/R*, Rp/R*, b, eccentricity, and omega Parameters: ----------- per : float, or numpy.ndarray Orbital period (in days) of the planet aR : float, or numpy.ndarray Scaled semi-major axis, a/R* rprs : float, or numpy.ndarray Planet-to-star radius ratio, Rp/R* bb : float, or numpy.ndarray Impact parameter ecc: float, or numpy.ndarray Eccentricity of the orbit omega: float, or numpy.ndarray Argument of periastron (deg) transit: bool Whether to compute transit or occultation duration ----------- return ----------- t14 : float, or numpy.ndarray Transit/occultation duration, in days """ inc_rad = np.radians( b_to_inc(b=b, ar=ar, ecc=ecc, omega=omega) ) omega = np.radians(omega) # Computing inclination from impact parameter term1 = np.sqrt( (1 + rprs)**2 - b**2 ) t14 = ( per / np.pi ) * np.arcsin( term1 / ar / np.sin( inc_rad ) ) # Factor accounting for eccentric orbit if transit: fac = np.sqrt( 1 - ecc**2 ) / ( 1 + (ecc*np.sin(omega)) ) else: fac = np.sqrt( 1 - ecc**2 ) / ( 1 - (ecc*np.sin(omega)) ) return t14 * fac
[docs] def b_to_inc(b, ar, ecc=0., omega=90., transit=True): """Computing inclination from impact parameter""" # First converting all angles to radians omega = np.radians(omega) if transit: cosi = b * ( 1 + ( ecc * np.sin(omega) ) ) / ar / (1 - ecc**2) else: cosi = b * ( 1 - ( ecc * np.sin(omega) ) ) / ar / (1 - ecc**2) return np.rad2deg( np.arccos( cosi ) )
[docs] def inc_to_b(inc, ar, ecc=0., omega=90., transit=True): """To compute impact parameter from inclination""" # First convert all angles to radians inc, omega = np.radians(inc), np.radians(omega) if transit: bb = ar * np.cos(inc) * (1 - ecc**2) / ( 1 + ( ecc * np.sin(omega) ) ) else: bb = ar * np.cos(inc) * (1 - ecc**2) / ( 1 - ( ecc * np.sin(omega) ) ) return bb
[docs] def rho_to_ar(rho, per): """To compute a/R* from rho""" G = con.G.value a = ((rho * G * ((per * 24. * 3600.)**2)) / (3. * np.pi))**(1. / 3.) return a
[docs] def pipe_mad(x, axis=0): return np.nanmedian(np.abs(np.diff(x, axis=axis)), axis=axis)
def lcbin(time, flux, binwidth=0.06859, nmin=4, time0=None, robust=False, tmid=False): """ ------ A function from pycheops ------- Calculate average flux and error in time bins of equal width. The default bin width is equivalent to one CHEOPS orbit in units of days. To avoid binning data on either side of the gaps in the light curve due to the CHEOPS orbit, the algorithm searches for the largest gap in the data shorter than binwidth and places the bin edges so that they fall at the centre of this gap. This behaviour can be avoided by setting a value for the parameter time0. The time values for the output bins can be either the average time value of the input points or, if tmid is True, the centre of the time bin. If robust is True, the output bin values are the median of the flux values of the bin and the standard error is estimated from their mean absolute deviation. Otherwise, the mean and standard deviation are used. The output values are as follows. * t_bin - average time of binned data points or centre of time bin. * f_bin - mean or median of the input flux values. * e_bin - standard error of flux points in the bin. * n_bin - number of flux points in the bin. :param time: time :param flux: flux (or other quantity to be time-binned) :param binwidth: bin width in the same units as time :param nmin: minimum number of points for output bins :param time0: time value at the lower edge of one bin :param robust: use median and robust estimate of standard deviation :param tmid: return centre of time bins instead of mean time value :returns: t_bin, f_bin, e_bin, n_bin """ if time0 is None: tgap = (time[1:]+time[:-1])/2 gap = time[1:]-time[:-1] j = gap < binwidth gap = gap[j] tgap = tgap[j] time0 = tgap[np.argmax(gap)] time0 = time0 - binwidth*np.ceil((time0-min(time))/binwidth) n = int(1+np.ceil(np.ptp(time)/binwidth)) r = (time0,time0+n*binwidth) n_in_bin,bin_edges = np.histogram(time,bins=n,range=r) bin_indices = np.digitize(time,bin_edges) t_bin = np.zeros(n) f_bin = np.zeros(n) e_bin = np.zeros(n) n_bin = np.zeros(n, dtype=int) for i,n in enumerate(n_in_bin): if n >= nmin: j = bin_indices == i+1 n_bin[i] = n if tmid: t_bin[i] = (bin_edges[i]+bin_edges[i+1])/2 else: t_bin[i] = np.nanmean(time[j]) if robust: f_bin[i] = np.nanmedian(flux[j]) e_bin[i] = 1.25*np.nanmean(abs(flux[j] - f_bin[i]))/np.sqrt(n) else: f_bin[i] = np.nanmean(flux[j]) e_bin[i] = np.std(flux[j])/np.sqrt(n-1) j = (n_bin >= nmin) return t_bin[j], f_bin[j], e_bin[j], n_bin[j] def clip_outliers(flux, clip=5, width=11, verbose=True): """ Another function from pycheops. Courtesy of P. Maxted. Remove outliers from the light curve. Data more than clip*mad from a smoothed version of the light curve are removed where mad is the mean absolute deviation from the median-smoothed light curve. :param clip: tolerance on clipping :param width: width of window for median-smoothing filter :returns: time, flux, flux_err """ # medfilt pads the array to be filtered with zeros, so edge behaviour # is better if we filter flux-1 rather than flux. d = abs(medfilt(flux-1, width)+1-flux) mad = d.mean() ok = d < clip*mad if verbose: print('\nRejected {} points more than {:0.1f} x MAD = {:0.0f} ' 'ppm from the median'.format(sum(~ok),clip,1e6*mad*clip)) return ok
[docs] def rms(x): return np.sqrt( np.nanmean( (x - np.nanmean(x))**2 ) )
[docs] def fake_allan_deviation(times, residuals, binmax=10, method='pipe', timeunit=None, plot=True): """Estimate and optionally plot a noise-vs-bin-size curve (a proxy for Allan deviation). The function computes the scatter of binned residuals for a sequence of integer bin sizes and returns both the computed noise (in ppm) and a white-noise expectation for comparison. When ``plot=True`` a log-log figure is produced with a secondary x-axis that maps bin size to a time-unit (days/hours/minutes) according to the total span of ``times``. Parameters ---------- times : array-like Time stamps of the observations (assumed in days). residuals : array-like Residual fluxes (same length as ``times``). binmax : int, optional Parameter controlling the maximum number of bins. The function will evaluate binsizes from 1..int(len(times)/binmax). Default 10. method : {'pipe', 'std', 'rms', 'astropy'}, optional Method used to estimate the scatter of the binned residuals: - 'pipe' : use ``pipe_mad`` (median absolute differences estimator) - 'std' : use ``np.nanstd`` - 'rms' : use the root-mean-square (``rms`` helper) - 'astropy' : use ``astropy.stats.mad_std`` (robust MAD-based std) timeunit : {'d', 'hr', 'min'}, optional If provided, forces the time unit used in the secondary x-axis to days ('d'), hours ('hr'), or minutes ('min'). By default the function selects the most appropriate unit based on the total span of ``times``. plot : bool, optional If ``True`` (default), create and display a matplotlib figure with the computed noise curve and the white-noise expectation. Returns ------- fig : matplotlib.figure.Figure or None The created figure if ``plot=True``, otherwise ``None``. axs : matplotlib.axes.Axes or None The axes object for the figure if ``plot=True``, otherwise ``None``. binsize : ndarray Integer array of evaluated bin sizes. noise_ppm : ndarray Computed scatter of binned residuals converted to parts-per-million (ppm), corresponding to ``binsize``. white_noise_expec : ndarray White-noise expectation (ppm) computed as ``noise_func(residuals) / sqrt(binsize)`` (with a small-sample correction applied). Values for ``binsize==0`` will be ``inf`` or undefined. """ # Computing bin-size binsize = np.arange(1, int(len(times)/binmax), 1) # Array to save noise noise = np.zeros( len(binsize) ) # Number of data points in each bin nbin = np.zeros( len(binsize) ) # Choosing the method to compute noise if method == 'pipe': noise_func = pipe_mad elif method == 'std': noise_func = np.nanstd elif method == 'rms': noise_func = rms elif method == 'astropy': noise_func = lambda x: mad_std(x, ignore_nan=True) else: raise ValueError("Method should be one of 'pipe', 'std', 'rms', or 'astropy'.") # Computing binning for each binsize, and then computing the stddev of the binned residuals for i in range(len(binsize)): if binsize[i] != 1: _, binned_flux, _, _ = lcbin(time=times, flux=residuals, nmin=1, binwidth=binsize[i] * np.nanmedian(np.diff(times)) ) else: binned_flux= np.copy( residuals ) noise[i] = noise_func(binned_flux) nbin[i] = int( np.floor( len(residuals) / binsize[i] ) ) # Converting binsize to time units time_binsize = binsize * np.nanmedian( np.diff(times) ) # First, let's estimate the time units we need from times array: if timeunit is None: if np.ptp(time_binsize) >= 1.: ## If times duration is greater than 2 hours, we use days time_unit_multiplication_factor = 1. # Units are already in days time_unit_label = 'Time period [d]' elif ( np.ptp(time_binsize) > 5/24 ) and ( np.ptp(time_binsize) < 1. ): ## If times duration is greater than 2 hours, but less than 2 days, we use hours time_unit_multiplication_factor = 24. # Converting days to hours time_unit_label = 'Time period [hr]' else: ## If times duration is less than 2 hours, we use minutes time_unit_multiplication_factor = 24 * 60 time_unit_label = 'Time period [min]' else: if timeunit == 'd': time_unit_multiplication_factor = 1. time_unit_label = 'Time period [d]' elif timeunit == 'hr': time_unit_multiplication_factor = 24. time_unit_label = 'Time period [hr]' elif timeunit == 'min': time_unit_multiplication_factor = 24 * 60 time_unit_label = 'Time period [min]' else: raise ValueError("timeunit should be one of 'd', 'hr', or 'min'.") # The following two functions will convert binsize to time (in minutes) and vice versa def bin2time(binsize): return binsize * np.nanmedian(np.diff(times)) * time_unit_multiplication_factor def time2bin(bintime): return bintime / ( np.nanmedian(np.diff(times)) * time_unit_multiplication_factor ) # Computing the white-noise expectation white_noise_expec = noise_func(residuals)/np.sqrt(binsize) * 1e6 * np.sqrt(nbin / (nbin - 1.)) if plot: # ---------------------------------- # And generating the plot # ---------------------------------- fig, axs = plt.subplots() ## First plotting the computed noise axs.plot(binsize, noise * 1e6, color='dodgerblue', lw=1., label='Computed noise', zorder=10) ## Now plotting the white-noise expectation axs.plot(binsize, white_noise_expec, color='orangered', label='White-noise expectation') ## Adding secondary x-axis for time secax = axs.secondary_xaxis('top', functions=(bin2time, time2bin)) secax.set_xlabel(time_unit_label, labelpad=10) axs.set_xscale('log') axs.set_yscale('log') axs.set_xlabel('Bin size [Number of points]') axs.set_ylabel('Noise estimate [ppm]') axs.legend() else: fig, axs = None, None return fig, axs, binsize, noise*1e6, white_noise_expec
[docs] def corner_plot(samples, labels, **kwargs): """Generate a corner plot from MCMC samples. Parameters ---------- samples : ndarray 2D array of shape (n_samples, n_parameters) containing the MCMC samples. labels : list of str List of parameter names for labeling the axes. figsize : tuple, optional Size of the figure to create. Default is (9, 9). Returns ------- fig : matplotlib.figure.Figure The generated corner plot figure. """ samples = np.transpose( np.vstack( samples ) ) fig = corner.corner(samples, labels=labels, show_titles=True, title_fmt=".2f", quantiles=[0.16,0.5,0.84], **kwargs) return fig
[docs] def make_psd(times, flux, nos_freq_points=100000, plot=True, plot_max_freq=True, timeunit=None): """Compute a Lomb-Scargle power spectral density (PSD) for a light curve. The routine converts ``times`` (assumed in days) to seconds and the input ``flux`` to parts-per-million (ppm), computes a dense frequency grid, evaluates a Lomb-Scargle PSD on that grid and (optionally) produces a log-log plot with a secondary axis showing period in an appropriate time unit. Parameters ---------- times : array-like Time stamps of the observations (in days). flux : array-like Relative flux measurements (unitless, e.g. normalised to 1.0). The function converts these to ppm internally. nos_freq_points : int, optional Number of frequency points to evaluate the PSD on. Default is 100,000. plot : bool, optional If ``True`` (default) create and return a matplotlib figure and axes containing the PSD plot. If ``False``, no plot is created and ``fig`` and ``axs`` in the return tuple will be ``None``. plot_max_freq : bool, optional If ``True`` (default) mark the frequency with maximum power on the plot with a vertical dashed line. timeunit : {'d','hr','min'} or None, optional Force the secondary x-axis unit for the period conversion. If ``None`` (default) the function selects days/hours/minutes automatically based on the frequency grid span. Returns ------- freq_grid : astropy.units.Quantity (Hz) Frequency grid used to evaluate the PSD. psd1 : astropy.units.Quantity Lomb-Scargle power spectral density evaluated on ``freq_grid`` (units: ppm^2 Hz^-1). fig : matplotlib.figure.Figure or None Figure with the PSD plot when ``plot=True``, otherwise ``None``. axs : matplotlib.axes.Axes or None Axes for the PSD plot when ``plot=True``, otherwise ``None``. per_max_pow : astropy.units.Quantity Period corresponding to the maximum power in the PSD (converted to days). Notes ----- The function uses ``astropy.timeseries.LombScargle`` for the PSD and multiplies the input flux by 1e6 to report power in ppm^2 Hz^-1. """ # Computing time in units of days to seconds times = times * 24. * 60. * 60. * u.s # Converting flux to ppm fl1 = 1e6 * flux * u.dimensionless_unscaled ## Frequency grid min_freq, max_freq = 1 / np.ptp(times), 1 / np.nanmedian(np.diff(times)) * 0.5 freq_grid = np.linspace(min_freq, max_freq, nos_freq_points) ## And the Lomb-Scargle periodogram psd1 = LombScargle(times, fl1, normalization='psd').power(frequency=freq_grid) ## Computing the period corresponding to the highest power idx_max_pow = np.argmax(psd1.value) freq_max_pow = freq_grid[idx_max_pow] per_max_pow = ( 1/freq_max_pow ).to(u.day) # First, let's estimate the time units we need from times array: time_grid = (1 / freq_grid ).to(u.d).value if timeunit is None: if np.ptp(time_grid) >= 1.: ## If times duration is greater than 2 hours, we use days time_unit = u.d time_unit_label = 'Time [d]' elif ( np.ptp(time_grid) > 5/24 ) and ( np.ptp(time_grid) < 1. ): ## If times duration is greater than 2 hours, but less than 2 days, we use hours time_unit = u.hr # Converting days to hours time_unit_label = 'Time [hr]' else: ## If times duration is less than 2 hours, we use minutes time_unit = u.min time_unit_label = 'Time [min]' else: if timeunit == 'd': time_unit = u.d time_unit_label = 'Time [d]' elif timeunit == 'hr': time_unit = u.hr time_unit_label = 'Time [hr]' elif timeunit == 'min': time_unit = u.min time_unit_label = 'Time [min]' else: raise ValueError("timeunit should be one of 'd', 'hr', or 'min'.") if plot: # Making the plot: fig, axs = plt.subplots() ## Un-binned data axs.plot(freq_grid, psd1, alpha=1., color='orangered', lw=1., zorder=10) if plot_max_freq: axs.axvline(freq_max_pow.value, ls='--', c='cornflowerblue', lw=1., zorder=5) # Define properties to define upper axis as well: def freq2tim(x): x = np.where(x != 0, x, np.nan) * u.Hz return (1/x).to(time_unit).value def tim2freq(x): x = np.where(x != 0, x, np.nan) * u.s return (1/x).to(u.Hz).value ax2 = axs.secondary_xaxis("top", functions=(freq2tim, tim2freq)) ax2.tick_params(axis='both', which='major') ax2.set_xlabel(time_unit_label, labelpad=10) axs.set_xscale('log') axs.set_yscale('log') axs.set_xlabel(r'Frequency [Hz]') axs.set_ylabel(r'Power [ppm$^2$ Hz$^{-1}$]') axs.set_xlim([np.min(freq_grid.value), np.max(freq_grid.value)]) else: fig, axs = None, None return freq_grid, psd1, fig, axs, per_max_pow
def psd_fitting_kernel(times, flux, kernel, init_values, bounds=None, nos_freq_points=100000, plot=True, timeunit=None, **kwargs): """Fit a kernel PSD to the Lomb-Scargle periodogram of a light curve. The function computes the Lomb-Scargle PSD of the input light curve and fits the provided kernel PSD to it using ``scipy.optimize.minimize``. The kernel should be a function that takes frequency and kernel parameters as input and returns the corresponding PSD value. The fitting is done by minimizing the squared difference between the Lomb-Scargle PSD and the kernel PSD evaluated on the same frequency grid. Parameters ---------- times : array-like Time stamps of the observations (in days). flux : array-like Relative flux measurements (unitless, e.g. normalised to 1.0). kernel : callable A celerite.terms term that takes parameters as input and returns kernel instance. init_values : list or array-like Initial guess for the kernel parameters to be fitted. bounds : list of tuples, optional Bounds for the kernel parameters in the form [(min1, max1), (min2, max2), ...]. If ``None`` (default), no bounds are applied. nos_freq_points : int, optional Number of frequency points to evaluate the PSD on. Default is 100,000. plot : bool, optional If ``True`` (default), create and return a matplotlib figure and axes containing the PSD plot with the Lomb-Scargle PSD, the fitted kernel PSD, and the initial guess kernel PSD for comparison. If ``False``, no plot is created and ``fig`` and ``axs`` in the return tuple will be ``None``. timeunit : {'d','hr','min'} or None, optional Force the secondary x-axis unit for the period conversion. If ``None`` (default) the function selects days/hours/minutes automatically based on the frequency grid span. **kwargs Additional keyword arguments to pass to ``scipy.optimize.minimize`` for the optimization process (e.g. method, options, etc.). Returns ------- best_fit_params : ndarray Best-fit kernel parameters found by the optimization. kernel_psd_fit : ndarray Kernel PSD evaluated on the frequency grid using the best-fit parameters. freq_grid : astropy.units.Quantity (Hz) Frequency grid used to evaluate the PSD. psd_ls : astropy.units.Quantity Lomb-Scargle power spectral density evaluated on ``freq_grid`` (units: ppm^2 Hz^-1). fig : matplotlib.figure.Figure or None Figure with the PSD plot when ``plot=True``, otherwise ``None``. axs : matplotlib.axes.Axes or None Axes for the PSD plot when ``plot=True``, otherwise ``None``. """ # First, let's compute the Lomb-Scargle PSD using the previously defined function freq_grid, psd_ls, fig, axs, _ = make_psd(times=times, flux=flux * 1e-6, nos_freq_points=nos_freq_points, plot=plot, timeunit=timeunit) freq_grid = freq_grid.to(u.d**-1) psd_ls = psd_ls / len(times) # Now we define the objective function for optimization (squared difference between PSDs) def objective(params): kernel_instance = kernel(*params) kernel_psd = kernel_instance.get_psd(2*np.pi*freq_grid.value) / 2 / np.pi return np.sum( (psd_ls.value - kernel_psd)**2 ) # Performing the optimization to find best-fit parameters result = minimize(objective, x0=init_values, bounds=bounds, **kwargs) best_fit_params = result.x # Evaluating the kernel PSD with the best-fit parameters kernel_instance = kernel(*best_fit_params) kernel_psd_fit = kernel_instance.get_psd(2*np.pi*freq_grid.value) / 2 / np.pi if plot: # Plotting the fitted kernel PSD on top of the Lomb-Scargle PSD axs.plot(freq_grid.to(u.s**-1), kernel_psd_fit * len(times), color='navy', lw=1., label='Fitted Kernel PSD', zorder=20) # Also plotting the kernel PSD with initial guess parameters for comparison kernel_instance_init = kernel(*init_values) kernel_psd_init = kernel_instance_init.get_psd(2*np.pi*freq_grid.value) / 2 / np.pi axs.plot(freq_grid.to(u.s**-1), kernel_psd_init * len(times), color='gray', lw=1., ls='--', label='Initial Guess Kernel PSD') axs.legend() return best_fit_params, kernel_psd_fit, freq_grid, psd_ls, fig, axs def standarize_data(input_data): """ Standarize the dataset. The function originally written by N. Espinoza for TESS data analysis. """ output_data = np.copy(input_data) averages = np.nanmedian(input_data,axis=1) for i in range(len(averages)): sigma = mad_std(output_data[i,:]) output_data[i,:] = output_data[i,:] - averages[i] output_data[i,:] = output_data[i,:]/sigma return output_data def classic_PCA(Input_Data, standarize = True): """ classic_PCA function Description This function performs the classic Principal Component Analysis on a given dataset. The function originally written by N. Espinoza for TESS data analysis. """ if standarize: Data = standarize_data(Input_Data) else: Data = np.copy(Input_Data) eigenvectors_cols,eigenvalues,eigenvectors_rows = np.linalg.svd(np.cov(Data)) idx = eigenvalues.argsort() eigenvalues = eigenvalues[idx[::-1]] eigenvectors_cols = eigenvectors_cols[:,idx[::-1]] eigenvectors_rows = eigenvectors_rows[idx[::-1],:] # Return: V matrix, eigenvalues and the principal components. return eigenvectors_rows,eigenvalues,np.dot(eigenvectors_rows,Data)
[docs] def generate_times_with_gaps(times, efficiency): """This function takes the times array with a regular cadence and generate another times array with data gaps in it with a given efficiency to mimick CHEOPS observations. Efficiency here is the fraction of the actual time spent on the star. Parameters ---------- times : numpy.ndarray Regularly spaced time array efficiency : float Efficiency of the observations in per cent. Returns ------- tim : numpy.ndarray Time array with gaps in it. roll : numpy.ndarray The corresponding roll angle array. """ # Generating gappy times ## Computing total number of orbits in the data cheops_orbit_time_day = 98.77 / (60 * 24) orbit_nos = np.ptp(times) / cheops_orbit_time_day ## Generating roll numbers roll = np.linspace(0, orbit_nos*360, len(times)) + np.random.randint(0,360) roll = roll % 360 idx_rollsort = np.argsort(roll) roll_rollsort = roll[idx_rollsort] tim_rollsort = times[idx_rollsort] ## Selecting the range of roll-angles to discard st_roll = np.random.choice(roll_rollsort) # Starting roll number of discarded roll angles loc_st_roll = np.where(roll_rollsort == st_roll)[0][0] nos_discarded_pts = int((1 - (0.01 * efficiency)) * len(times)) # Total number of discarded points idx_discarded = np.ones(len(times), dtype=bool) if int(loc_st_roll+nos_discarded_pts) > len(times): # Roll over the starting roll numbers idx_discarded[loc_st_roll:] = False dis_pt = len(times) - loc_st_roll idx_discarded[0:nos_discarded_pts-dis_pt] = False else: idx_discarded[loc_st_roll:loc_st_roll + nos_discarded_pts] = False # Let's actually discard the points now tim = tim_rollsort[idx_discarded] roll = roll_rollsort[idx_discarded] # And sort them idx_timsort = np.argsort(tim) tim, roll = tim[idx_timsort], roll[idx_timsort] return tim, roll
def planck_func(lam, temp): """ Given the wavelength and temperature this function will compute the specific intensity using the Planck's law """ coeff1 = 2 * con.h * con.c * con.c / lam**5 expo = np.exp( con.h * con.c / lam / con.k_B / temp ) - 1 planck = (coeff1/expo).to(u.W / u.m**2 / u.micron) return planck
[docs] class BrightnessTemperatureCalculator: """Compute brightness temperature(s) from eclipse depths and a bandpass. Parameters ---------- fp : float or array-like Eclipse depths (planet/star flux ratio). If array-like, temperature distribution will be computed (multiprocessing is used in this case). rprs : float or array-like Planet-to-star radius ratio (Rp/R*). If scalar and fp is array-like, the same rprs is applied to all elements. bandpass : dict Bandpass dict with keys 'WAVE' (astropy Quantity) and 'RESPONSE' (array). nthreads : int, optional Number of worker processes for multiprocessing. Default is cpu count. method : str, optional Minimizer method passed to scipy.optimize.minimize. Default 'Nelder-Mead'. teff_star : float or None Stellar effective temperature in K (used if stellar_spec not provided). stellar_spec : dict or None Stellar spectrum dict with keys 'WAVE' and 'FLUX' (astropy Quantities). pout : str, optional Output directory used to cache results. Default current working dir. Methods ------- compute() Run the calculation and return temperature(s) in Kelvin (float or 1D ndarray). """ def __init__(self, fp, rprs, bandpass, nthreads=multiprocessing.cpu_count(), method='Nelder-Mead', teff_star=None, stellar_spec=None, pout=os.getcwd()): self.fp = fp self.rprs = rprs self.bandpass = bandpass self.nthreads = nthreads self.method = method self.teff_star = teff_star self.stellar_spec = stellar_spec self.pout = pout # prepare stellar spectrum if (self.stellar_spec is not None) and (self.teff_star is None): self.wav_star = self.stellar_spec['WAVE'].to(u.micron) self.fl_star = self.stellar_spec['FLUX'].to(u.J / u.s / u.m**2 / u.micron) elif (self.teff_star is not None) and (self.stellar_spec is None): self.wav_star = self.bandpass['WAVE'].to(u.micron) self.fl_star = planck_func(self.wav_star, self.teff_star) else: raise ValueError('Provide either stellar_spec or teff_star.') # bandpass self.wav_instrument = self.bandpass['WAVE'].to(u.micron) self.response_instrument = self.bandpass['RESPONSE'] # build transmission evaluated on stellar wavelengths spln2 = interp1d(x=self.wav_instrument.value, y=self.response_instrument, bounds_error=False, fill_value=0.0) trans_fun = spln2(self.wav_star.value) # guard against division by zero if response all zeros if np.max(trans_fun) > 0: trans_fun = trans_fun / np.max(trans_fun) self.trans_fun = trans_fun def _solve_single(self, ecl_dep, rprs_ratio): """Compute brightness temperature for a single eclipse depth.""" # scaled fp for integration comparison if ecl_dep > 0: fp_pl = ecl_dep * simpson(y=self.fl_star.value * self.trans_fun, x=self.wav_star.value) / (rprs_ratio**2) def func_to_minimize_new(x): planet_bb = planck_func(self.wav_star, x * u.K) planet_den = simpson(y=planet_bb.value * self.trans_fun, x=self.wav_star.value) chi2 = (fp_pl - planet_den)**2 return chi2 soln = minimize(fun=func_to_minimize_new, x0=1000.0, method=self.method) temp_pl = float(np.atleast_1d(soln.x)[0]) else: temp_pl = 0. # return scalar Kelvin return temp_pl
[docs] def compute(self): """Compute and return brightness temperature(s) in Kelvin. Returns ------- temp_pl : float or ndarray Brightness temperature (K) for scalar input or 1D ndarray for array input. """ fp = self.fp rprs = self.rprs # scalar path if isinstance(fp, (float, np.floating)) or (np.asarray(fp).size == 1): return self._solve_single(float(fp), float(rprs) if np.isscalar(rprs) else float(np.asarray(rprs).item())) # array path fp_arr = np.asarray(fp, dtype=float) if np.isscalar(rprs): rprs_arr = np.full(fp_arr.shape, float(rprs)) else: rprs_arr = np.asarray(rprs, dtype=float) cache_path = Path(self.pout) / 'Brightness_temp.npy' if cache_path.exists(): print('>>> --- Loading...') temp_pl = np.load(str(cache_path)) return temp_pl # prepare inputs for multiprocessing inputs = [(float(fp_arr[i]), float(rprs_arr[i])) for i in range(len(fp_arr))] # use Pool with bound method; user's environment set_start_method('fork') so this is fine with multiprocessing.Pool(self.nthreads) as p: result_list = p.starmap(self._solve_single, tqdm(inputs, total=len(fp)), chunksize=max(1, self.nthreads)) temp_pl = np.array(result_list, dtype=float) # save cache np.save(str(cache_path), temp_pl) return temp_pl
def trapz2d(z, x, y): """ Helper function to perform 2D trapezoidal integration. Integrates a regularly spaced 2D grid using the composite trapezium rule. Source: https://github.com/tiagopereira/python_tips/blob/master/code/trapz2d.py My sourse: I have copied from `kelp`: https://github.com/bmorris3/kelp/blob/219a922849634d9e982cd7bd05910596dea2ef6e/kelp/core.py#L15C1-L44C48 Parameters ---------- z : `~numpy.ndarray` 2D array x : `~numpy.ndarray` grid values for x (1D array) y : `~numpy.ndarray` grid values for y (1D array) Returns ------- t : `~numpy.ndarray` Trapezoidal approximation to the integral under z """ m = z.shape[0] - 1 n = z.shape[1] - 1 dx = x[1] - x[0] dy = y[1] - y[0] s1 = z[0, 0, :] + z[m, 0, :] + z[0, n, :] + z[m, n, :] s2 = (np.sum(z[1:m, 0, :], axis=0) + np.sum(z[1:m, n, :], axis=0) + np.sum(z[0, 1:n, :], axis=0) + np.sum(z[m, 1:n, :], axis=0)) s3 = np.sum(np.sum(z[1:m, 1:n, :], axis=0), axis=0) return dx * dy * (s1 + 2 * s2 + 4 * s3) / 4 def autocorrelation_function(chain): """Compute the autocorrelation function for a given chain and lags. Parameters ---------- chain : ndarray Array containing the chain values. Returns ------- rhos : ndarray Autocorrelation values for each lag. """ mu, variance = np.mean(chain), np.var(chain) N = len(chain) rhos = np.zeros( len(chain) ) for i in range(N): X0 = chain[:N-i] - mu Xk = chain[i:] - mu rhoi = np.sum(X0*Xk)/variance rhos[i] = rhoi/(N-i) return rhos