Source code for ilex.frb

##===============================================##
##===============================================##
## Author: Tyson Dial
## Email: tdial@swin.edu.au
## Last Updated: 25/09/2023 
##
##
## 
## 
## Library of basic functions for analysing FRBs.
##
##===============================================##
##===============================================##
# imports
import numpy as np
import matplotlib
import matplotlib.pyplot as plt
import math
import os, sys
from copy import deepcopy
import inspect
from .pyfit import fit, _posterior
import yaml
from .frbutils import set_dynspec_plot_properties, save_frb_to_param_file
# import time 

## import utils ##
from .utils import (load_data, save_data, dict_get,
                    dict_init, dict_isall,
                    merge_dicts, dict_null, get_stk_from_datalist, load_param_file, 
                    set_plotstyle, fix_ds_freq_lims)

from .data import *

## import FRB stats ##
from .fitting import (fit_RMquad, fit_RMsynth, RM_QUfit, lorentz,
                     make_scatt_pulse_profile_func)

## import FRB params ##
from .par import FRB_params, FRB_metaparams

# ## import FRB htr functions ##
# from .htr import make_stokes

## import globals ##
from .globals import _G, c

## import plot functions ##
from .plot import (plot_RM, plot_PA, plot_stokes,      
                  plot_poincare_track, create_poincare_sphere, plot_data, _PLOT, plot_dynspec)

## import processing functions ##
from .logging import log, get_verbose, set_verbose, log_title
from .master_proc import master_proc_data
from .widths import *

# interactive module
from .interactive import ZapInteractive


    

##===============================================##
##                  FRB class                    ##
##===============================================##

[docs] class FRB: """ FRB class for Processing of ASKAP FRB data Parameters ---------- name: str Name of FRB RA: str Right acension position DEC: str Declination position MJD: float Modified Julian date in days DM: float Dispersion Measure bw: float Bandwidth cfreq: float Central Frequency t_crop: list Crop start and end phase in Time f_crop: list Crop start and end phase in Frequency tN: int Factor for averaging in Time fN: int Factor for averaging in Frequency t_lim: list Limits for FRB in Time f_lim: list Limits for FRB in Frequency t_lim_base: list Base limits of FRB in time (not including t_ref) f_lim_base: list Base limits of FRB in freq t_ref: float Reference zero-point in time RM: float Rotation Measure f0: float Reference Frequency pa0: float Position angle at reference frequency f0 zapchan: str string used for zapping channels, in format -> "850, 860, 870:900" \n each element seperated by a ',' is a seperate channel. If ':' is used, user can specify a range of values \n i.e. 870:900 -> from channel 870 to 900 inclusive of both. verbose: bool Enable verbose logging norm: str Type of normalisation \n [max] - normalise using maximum \n [absmax] - normalise using absolute maximum \n [None] - Skip normalisation terr_crop: list bounds for off-pulse region in time [min, max] [ms], default is None yaml_file: str parameter yaml file of FRB to load in, default is None Attributes ---------- par: FRB_params parameters for FRB this_par: FRB_params Current instance of 'par' prev_par: FRB_params Last instance of 'par' metapar: FRB_metaparams hold meta-parameters for FRB this_metapar: FRB_metaparams Current instance of 'metapar' prev_metapar: FRB_metaparams Last instance of 'metapar' ds: Dict Dictionary of loaded stokes dynamic spectra pol: Dict Dictionary of loaded Polarisation time series _t: Dict Dictionary of cropped stokes time series _f: Dict Dictionary of cropped stokes spectra _ds: Dict Dictionary of cropped stokes dynamic spectra _freq: np.ndarray Cropped Frequency array [MHz] _time: np.ndarray Cropped time array [ms] verbose: bool Enable logging savefig: bool Save all created figures to files pcol: str Color of text for logging empty: bool Variable used to initialise FRB instance and data loading plot_type: str type of plot \n [scatter] - scatter plot with error bars \n [lines] - line plot with error patches show_plots: bool If true, shows plots save_plots: bool If true, saves plots to file residuals: bool if true, a residual panel will appear when plotting a fit using pyfit, default is True plotPosterior: bool if true, will save posterior corner plot when fitting using bayesian method, default is True apply_tw: bool if true, apply time dependant weights when scrunching in time, i.e making spectra, default is True apply_fw: bool if true, apply freq dependant weights when scrunching in freq, i.e. making time profiles, default is True fitted_params: dict dictionary of fitted values, i.e. RM """ ## [ INITIALISE FRB ] ## def __init__(self, yaml_file = None, name: str = _G.p['name'], RA: str = _G.p['RA'], DEC: str = _G.p['DEC'], MJD: float = _G.p['MJD'], DM: float = _G.p['DM'], bw: int = _G.p['bw'], cfreq: float = _G.p['cfreq'], t_crop = None, f_crop = None, tN: int = 1, fN: int = 1, t_lim_base = _G.p['t_lim_base'], f_lim_base = _G.p['f_lim_base'], RM: float = _G.p['RM'], f0: float = _G.p['f0'], pa0: float = _G.p['pa0'], verbose: bool = _G.hp['verbose'], norm = _G.mp['norm'], dt: float = _G.p['dt'], df: float = _G.p['df'], zapchan: str = _G.mp['zapchan'], terr_crop = None, t_ref = _G.p['t_ref'] ): """ Create FRB instance """ self._yaml_file = yaml_file self.par = FRB_params(name = name, RA = RA, DEC = DEC, MJD = MJD, DM = DM, bw = bw, cfreq = cfreq, t_lim_base = t_lim_base, f_lim_base = f_lim_base, RM = RM, f0 = f0, pa0 = pa0, dt = dt, df = df, t_ref = t_ref) self.this_par = self.par.copy() self.prev_par = FRB_params(EMPTY = True) self.metapar = FRB_metaparams(t_crop = t_crop, f_crop = f_crop, terr_crop = terr_crop, tN = tN, fN = fN, norm = norm, zapchan = zapchan) if t_crop is None: self.metapar.t_crop = ["min", "max"] # crop of time axis if f_crop is None: self.metapar.f_crop = ["min", "max"] # crop of frequency axis self.this_metapar = self.metapar.copy() self.prev_metapar = FRB_metaparams(EMPTY = True) ## Create data containers self.ds = {} # container for Dynamic spectra self.pol = {} # container for polarisation time series data (X, Y) for S in "IQUV": self.ds[S] = None ## data instance containers self._t = {} # container to store time series data self._f = {} # container to store frequency spectra data self._ds = {} # container to store dynamic spectra data self._freq = {} # container to store baseband frequency data self._time = {} # container to store time samples # initilise data containers for S in "IQUVLP": self._t[S] = None self._t[f"{S}err"] = None self._f[S] = None self._f[f"{S}err"] = None for S in "IQUV": self._ds[S]= None self.empty = True # used to initialise FRB instance and data loading self.verbose = verbose # TODO: implement # set verbose set_verbose(self.verbose) self.pcol = 'cyan' # color for verbose printing self.plot_type = "scatter" # type of errorbar plot self.residuals = False # plot residuals when plotting fits self.plotPosterior = True # plot posterior corner plot when plotting fits self.save_plots = False self.show_plots = True self.crop_units = "physical" self.zap = False # if True, will treat arrays as zapped # weightings self.apply_tW = True # apply time weights self.apply_fW = True # apply freq weights self._isinstance = False # if data instance is valid self.fitted_params = {} # plotting stuff self.dynspec_cmap = "viridis" # quick load yaml file if yaml_file is not None: self.load_data(yaml_file = yaml_file) @property def dynspec_cmap(self): return self._dynspec_cmap # Setters @dynspec_cmap.setter def dynspec_cmap(self, cmap): """ Change cmap of dynamic spectra Parameters ---------- cmap : str color map """ self._dynspec_cmap = cmap set_dynspec_plot_properties(cmap = cmap) ##===============================================## ## retrive data funtions ## ##===============================================## ## [ LOAD IN DATA ] ##
[docs] def load_data(self, dsI: str = None, dsQ: str = None, dsU: str = None, dsV: str = None, yaml_file: str = None, mmap = True, _init = False): """ Load Stokes HTR data Parameters ---------- dsI: str Filename of stokes I dynamic spectra dsQ: str Filename of stokes Q dynamic spectra dsU: str Filename of stokes U dynamic spectra dsV: str Filename of stokes V dynamic spectra yaml_file: str parameter yaml file for FRB, default is None mmap: bool Enable memory mapping for loading _init: bool For initial Data loading """ self._yaml_file = yaml_file log_title("Loading in Stokes dynamic spectra. Assuming the data being loaded are .npy files", col = "lblue") if yaml_file is not None: log("Loading from yaml file", lpf_col = self.pcol) # load pars yaml_pars = load_param_file(yaml_file) # extract pars pars = merge_dicts(yaml_pars['par'], yaml_pars['metapar'], yaml_pars['hyperpar']) self.set(**pars) # set weights if given self.par.set_weights(xtype = "t", **yaml_pars['weights']['time']) self.par.set_weights(xtype = "f", **yaml_pars['weights']['freq']) # set loaded files dsI, dsQ = yaml_pars['data']['dsI'], yaml_pars['data']['dsQ'] dsU, dsV = yaml_pars['data']['dsU'], yaml_pars['data']['dsV'] # check if plotstyle file is given set_plotstyle(yaml_pars['plots']['plotstyle_file']) if yaml_pars['plots']['plotstyle_file'] is None: log("Setting plotting style: Default") else: log(f"setting plotting style: {yaml_pars['plots']['plotstyle_file']}") def init_par_from_load(x): """ Initialise a number of parameters from loaded file """ self.par.nchan = x.shape[0] # assumed that dyn spec is [freq,time] self.par.nsamp = x.shape[1] self.par.t_lim_base = [0.0, self.par.dt * self.par.nsamp] ## dict. of files that will be loaded in data_files = {"dsI": dsI, "dsQ": dsQ, "dsU": dsU, "dsV": dsV} for dkey in data_files.keys(): if data_files[dkey] is not None: data_files[dkey] = os.path.abspath(data_files[dkey]) self._data_files = deepcopy(data_files) old_chans = None # loop through files load_zapchan = "" for key in data_files.keys(): file = data_files[key] init_key = None if file is not None: # load all dynamic spectra self.ds[key[-1]] = load_data(file, mmap) log(f"Loading stokes {key[-1]} Dynspec from: {file} with shape {self.ds[key[-1]].shape}", lpf_col=self.pcol) if init_key is None: init_par_from_load(self.ds[key[-1]]) init_key = key # check if any channels are nan's i.e. flagged chans = self.ds[key[-1]][:,0] if np.any(np.isnan(chans)): self.zap = True log("Finding zapped channels...") load_zapchan = get_zapstr(chans, self.par.get_freqs()) if old_chans is not None: if not np.all(old_chans == chans): log("Channels being zapped are different for each Stokes Dynamic spectra!!", stype = "warn") old_chans = chans.copy() self.metapar.zapchan = combine_zapchan(self.metapar.zapchan, load_zapchan)
## [ SAVING FUNCTION - SAVE CROP OF DATA ] ##
[docs] def save_data(self, data_list = None, name = None, save_yaml = False, yaml_file = None, stk_debias = False, stk_ratio = False, **kwargs): """ Save current instance data Parameters ---------- data_list : List(str), optional List of data to save, by default None name : str, optional Common Pre-fix for saved data, by default None, if None the name parameter of the FRB class will be used. stk_debias: bool Debias Stokes data before saving stk_ratio: bool Save stokes ratios """ log_title("Saving Stokes data, the data is saved as .npy files. ", col = "lblue") if save_yaml: log("Saving fitted parameters to yaml file...", lpf_col = "green") save_frb_to_param_file(self, yaml_file) if data_list is None: log("No data specified for saving...", stype = "warn") return print("Saving the following data products:") for data in data_list: print(f"[{data}]") # get data pdat = self.get_data(data_list, stk_debias = stk_debias, stk_ratio = stk_ratio, get = True) if not self._isdata(): return if name is None: frbname = str(self.par.name) if frbname is None: frbname = "FRBXXXXXX" name = os.path.join(os.getcwd(), frbname) # save data for data in pdat.keys(): np.save(name + f"_{data}.npy", pdat[data]) return
[docs] def set(self, **kwargs): """ Set FRB parameters, see class parameters """ # update pars par = self._from_kwargs_get_par(**kwargs) self.par.set_par(**par) # update metapars metapar = self._from_kwargs_get_metapar(**kwargs) self.metapar.set_metapar(**metapar) # update hyperpars self._update_hyperpar(**kwargs)
[docs] def get_freqs(self): """ Get Frequencies """ if self.empty: return self.par.get_freqs() else: return self.this_par.get_freqs()
# implement FRB_params struct ## [ GET DATA ] ##
[docs] def get_data(self, data_list = "dsI", get = False, ignore_nans = False, stk_debias = False, stk_ratio = False, stk_sigma = None, **kwargs): """ Make new instance of loaded data. This will take a crop of the loaded mmap-ed stokes data, pass it through the back-end processing function and save the data in memory in the ._ds, _t, _f, _time and _freq class instance attributes. Parameters ---------- data_list : List(str) or str, optional List of data products to load in, by default "dsI" get : bool, optional Return new crops of data, by default False and will only save data instances to class container attributes ignore_nans : bool, optional If true, if nans exist in data, they will be removed before saving the data instance stk_debias, bool, optional If true, tL/fL and tP/fP will be debiased stk_ratio, bool, optional If true, calculate X/I for t and f products stk_sigma : float, optional Mask X/I data by ratio_rms_threshold * rms Returns ------- data: Dict, optional Dictionary of processed data crops, by default None if get = False """ log_title("Retrieving Processed Data products. Any currently loaded crops of data will be overwritten. ", col = "lblue") # update par and metapar if nessesary self._load_new_params(**kwargs) # process data_list as str if type(data_list) == str: if data_list == "all": data_list = _G.hkeys else: data_list = [data_list] log(f"Retrieving the following data: {data_list}", lpf_col = self.pcol) # get all data products needed data_products = self._init_proc(data_list, stk_debias = stk_debias, stk_ratio = stk_ratio) ## first check if there is data to use if not self._isvalid(data_products): log("Loaded data not avaliable or incorrect DS shapes", stype = "err", lpf_col = self.pcol) self._isinstance = False return ## make new instances self._make_instance(data_list = data_list, ignore_nans = ignore_nans, stk_debias = stk_debias, stk_ratio = stk_ratio, stk_sigma = stk_sigma) ## set new instance param self._save_new_params() self._isinstance = True # check if get is true if get: # return instance return self._get_instance(data_list, ignore_nans) #return data return
def _get_instance(self, data_list = None, ignore_nans = False): """ retrieve data products Parameters ---------- data_list : List(str), optional crop types to return, by default None Returns ------- data: Dict Dictionary of data crops """ # initialise new data list new_data = {} # ingore nans? -> make mask for this process f_mask = np.ones(self._freq.size, dtype = bool) if ignore_nans: # find first data that isn't None while True: for key in self._ds.keys(): if self._ds[key] is not None: f_mask[np.isnan(self._ds[key][:,0])] = False break for key in self._f.keys(): if self._f[key] is not None: f_mask[np.isnan(self._f[key])] = False break # if no freq data, just pass through break # flags err_flag = self._iserr() for data in data_list: stk = data[-1] # dynamic spectra if "ds" in data: new_data[data] = self._ds[stk][f_mask,:].copy() # time series elif "t" in data: new_data[data] = self._t[stk].copy() new_data[f"{data}err"] = self._t[f"{stk}err"] # frequency spectra elif "f" in data: new_data[data] = self._f[stk][f_mask].copy() new_data[f"{data}err"] = self._f[f"{stk}err"] # also add freqs if self._freq is not None: new_data['freq'] = self._freq[f_mask].copy() else: log("Couldn't get freq array, something went wrong", stype = "warn") # also add times if self._time is not None: new_data['time'] = self._time.copy() else: log("Couldn't get time array, something went wrong", stype = "warn") return new_data def _make_instance(self, data_list = None, ignore_nans = False, stk_debias = False, stk_ratio = False, stk_sigma = None): """ Make New data crops for current instance Parameters ---------- data_list : List(str), optional List of crop products to make, by default None """ # assuming all prior checks on data were successful # purge everything for S in"IQUV": self._ds[S] = None self._ds[f"{S}err"] = None self._t[S] = None self._t[f"{S}err"] = None self._f[S] = None self._f[f"{S}err"] = None self._freq = None self._time = None for S in "LP": self._t[S] = None self._t[f"{S}err"] = None self._f[S] = None self._f[f"{S}err"] = None # get frequencies freqs = self.par.get_freqs() # set up parameter dictionary full_par = merge_dicts(self.this_metapar.metapar2dict(), self.this_par.par2dict()) # get tW and fW temp_w_par = self.par.copy() temp_w_par.update_from_crop(t_crop = full_par['t_crop'], f_crop = full_par['f_crop']) if self.apply_tW: log("Retrieving Time Weights") log("=======================") full_par['tW'] = self.par.tW.get_weights(x = temp_w_par.get_times()) log(temp_w_par.tW, lpf = False) if self.apply_fW: log("Retrieving Freq Weights") log("=======================") full_par['fW'] = self.par.fW.get_weights(x = temp_w_par.get_freqs()) log(temp_w_par.fW, lpf = False) # pass through to backend processing script _ds, _t, _f, self._freq, _flags = master_proc_data(self.ds, freqs, data_list, full_par, stk_debias, stk_ratio, stk_sigma) # process flags self.zap = _flags['zap_flag'] # ingore nans? -> make mask for this process f_mask = np.ones(self._freq.size, dtype = bool) if ignore_nans and self.zap: # find first data that isn't None while True: for key in _ds.keys(): if _ds[key] is not None: f_mask[np.isnan(_ds[key][:,0])] = False break for key in _f.keys(): if _f[key] is not None: f_mask[np.isnan(_f[key])] = False break # if no freq data, just pass through break log("Saving new data products to latest instance", lpf_col = self.pcol) aval_key_for_time = None _timesize = 0 # dynspecs ds_list = _ds.keys() for key in ds_list: if _ds[key] is not None: if "err" not in key: aval_key_for_time = key _timesize = _ds[key].size self._ds[key] = _ds[key][f_mask,:].copy() _ds[key] = None # time series t_list = _t.keys() for key in t_list: if _t[key] is not None: if "err" not in key: aval_key_for_time = key _timesize = _t[key].size self._t[key] = _t[key].copy() _t[key] = None # freq spectra f_list = _f.keys() for key in f_list: if _f[key] is not None: self._f[key] = _f[key][f_mask].copy() _f[key] = None # proc freq array, nan self._freq = self._freq[f_mask] if aval_key_for_time is not None: self._time = self.this_par.get_times() return def _clear_instance(self, data_list = None): """ Remove specified data products of crops Parameters ---------- data_list : List(str), optional List of data products to clear, by default None """ # flags err_flag = self._iserr() if data_list is None: data_list = _G.hkeys[:-2] # remove freqs self._freq = None log(f"Clearing data: {data_list}") for data in data_list: stk = data[-1] # dynamic spectra if "ds" in data: self._ds[stk] = None # time series elif "t" in data: self._t[stk] = None if err_flag: self._t[f"{stk}err"] = None # spectra elif "f" in data: self._f[stk] = None if err_flag: self._f[f"{stk}err"] = None if "freq" in data_list: # remove freqs self._freq = None if "time" in data_list: # remove time samples self._time = None return def _init_proc(self, data_list, stk_debias = False, stk_ratio = False): """ Check if all requested data products and their dependencies are being requested. Parameters ---------- data_list: List(str) List of requested cropped data products """ # get stokes data to load in stk = get_stk_from_datalist(data_list) # check if Q or U is there, and if RM is non-zero, if so # load both Q and U if (("Q" in stk) != ("U" in stk)) and (self.this_par.RM is not None): # add missing stokes to stk list if "Q" in stk: log("Added Stokes U to process for RM correction", lpf = False) stk += "U" else: log("Added Stokes Q to process for RM correction", lpf = False) stk += "Q" # if norm == "I", then we want to normalise all data using "I", this # must add it to the list. if self.this_metapar.norm == "maxI": log("Added stokes I for normalisation purposes", lpf = False) stk += "I" # if requesting L and/or P if ("tL" in data_list) or ("fL" in data_list): for s in "QU": if s not in stk: log(f"Added stokes {s} to process for retrieving L polarisation", lpf = False) stk += s if ("tP" in data_list) or ("fP" in data_list): for s in "QUV": if s not in stk: log(f"Added stokes {s} to process for retrieving P polarisation", lpf = False) stk += s # if debiasing L and/or P add_stokes_I = False for s in ["tL", "fL", "tP", "fP"]: if s in data_list: if stk_debias: add_stokes_I = True if add_stokes_I: log("Added stokes I to process for debiasing L and/or P polarisations", lpf = False) if "I" not in stk: stk += "I" # if calculating ratios if stk_ratio: log("Added stokes I to process for calculating stokes ratios", lpf = False) if "I" not in stk: stk += "I" return stk ##===============================================## ## validate par functions ## ##===============================================## def _update_par(self, **kwargs): """ Info: Return a copy of FRB_params class with updated parameters Args: **kwargs """ # extract pars par = self._from_kwargs_get_par(**kwargs) # create copy of par self.this_par = self.par.copy() # update copy self.this_par.set_par(**par) # update from crop metapar = self._from_kwargs_get_metapar(**kwargs) self.this_par.update_from_crop(metapar['t_crop'], metapar['f_crop'], metapar['tN'], metapar['fN']) def _update_metapar(self, **kwargs): """ Info: Return a copy of FRB_metaparams class with updated parameters Args: **kwargs """ # extract metapars metapar = self._from_kwargs_get_metapar(**kwargs) # create copy of par self.this_metapar = self.metapar.copy() # update self.this_metapar.set_metapar(**metapar) def _update_hyperpar(self, **kwargs): """ Info: Return updated hyper params Args: **kwargs """ hyperpar = {} for key in _G.hp.keys(): if key in kwargs.keys(): setattr(self, key, kwargs[key]) # set verbose set_verbose(self.verbose) def _load_new_params(self, **kwargs): """ Update parameters with keywords for current instance """ # make copy of kwargs that change, that way changes that are made do # not propagate kwargs_keys = kwargs.keys() # check if any are already made static static_keys = [] for key, item in _G.sp.items(): if key in kwargs_keys: if item not in kwargs_keys: static_keys += [key] else: kwargs[key] = deepcopy(kwargs[item]) kwargs = {_G.sp[k] if k in static_keys else k: v for k, v in kwargs.items()} # add copies of items back in, these will be processed through without touching the # original ones for static_key in static_keys: kwargs[static_key] = deepcopy(kwargs[_G.sp[static_key]]) # copy over current hyperparams to kwargs metapar = self.metapar.metapar2dict() kw = kwargs.keys() for key in metapar.keys(): if key not in kw: kwargs[key] = metapar[key] # make sure metaparameters are updated first self._proc_kwargs(**kwargs) # update hyper parameters self._update_hyperpar(**kwargs) # update self.this_metapar self._update_metapar(**kwargs) # update self.this_par self._update_par(**kwargs) def _save_new_params(self): """ save Current instance of FRB_params and FRB_metaparams """ # update self.prev_par self.prev_par = self.this_par.copy() # update self.prev_metapar self.prev_metapar = self.this_metapar.copy() def _from_kwargs_get_par(self, **kwargs): """ Info: Get all parameters from kwargs dictionary, missing parameters will be taken from [self.par] Args: **kwargs """ par = {} base_par = self.par.par2dict() for key in _G.p.keys(): # check if key part of par list if key in kwargs.keys(): par[key] = kwargs[key] else: par[key] = base_par[key] return par def _from_kwargs_get_metapar(self, **kwargs): """ Info: Get all Meta parameters from kwargs dictionary, missing meta parameters will be taken from [self.metapar] Args: **kwargs """ meta_par = {} base_metapar = self.metapar.metapar2dict() for key in _G.mp.keys(): # check if key part of meta par list if key in kwargs.keys(): meta_par[key] = kwargs[key] else: meta_par[key] = base_metapar[key] return meta_par def _proc_kwargs(self, **kwargs): """ Process Kwargs """ keys = kwargs.keys() # if self.crop_units not in ["physical", "phase"]: # log("Units for cropping must be one of: ['physical', 'phase'] ", stype = "err") # return if self.crop_units != "physical": self.crop_units = "physical" log("Only 'physical' crop units, i.e. [ms and MHz] are supported now...", stype = "warn") def check_crop_for_str(crop, domain): """ Check for crop "min" and "max" specifiers""" if domain == "t": _vars = self.par.t_lim elif domain == "f": _vars = self.par.f_lim else: log("Something went wrong converting crops, no domain chosen.", stype = "err") phase_vars = [0.0, 1.0] if self.crop_units == "physical": phase_vars = [_vars[0], _vars[1]] for i, spe in zip([0, -1], ["min", "max"]): if isinstance(crop[i], str): if crop[i] == spe: # check if other crop comp is phase or ms if isinstance(crop[i+1], float) or isinstance(crop[i+1], int): if crop[i+1] > 1.0: # convert to min/max crop[i] = _vars[i] else: crop[i] = phase_vars[i] elif isinstance(crop[i+1], str): crop[i] = phase_vars[i] else: log(f"Typing of crop isn't right. {crop[i+1]}", stype = "err") else: log("Incorrect placement of crop specifiers, must be ['min', 'max'] if being used.", stype = "err") elif isinstance(crop[i], float) or isinstance(crop[i], int): pass else: log(f"Typing of crop isn't right. {crop[i]}", stype = "err") return crop[0], crop[1] # check if t_crop has been given in units of ms if "t_crop" in keys: kwargs['t_crop'][0], kwargs['t_crop'][1] = check_crop_for_str(kwargs['t_crop'], "t") if self.crop_units == "physical": prev_t = kwargs['t_crop'].copy() new_t,_ = self.par.lim2phase(t_lim = prev_t, snap = True) kwargs['t_crop'][0], kwargs['t_crop'][1] = new_t[0], new_t[1] if kwargs['t_crop'][0] < 0.0: kwargs['t_crop'][0] = 0.0 if kwargs['t_crop'][1] > 1.0: kwargs['t_crop'][1] = 1.0 log(f"Converting Time crop {prev_t} ms -> {kwargs['t_crop']} phase units", lpf = False) elif self.crop_units == "phase": # check if within phase units bad_crop_flag = False prev_t = kwargs['t_crop'].copy() if (kwargs['t_crop'][0] < 0.0) or (kwargs['t_crop'][0] > 1.0): bad_crop_flag = True kwargs['t_crop'][0] = 0.0 if (kwargs['t_crop'][1] < 0.0) or (kwargs['t_crop'][1] > 1.0): bad_crop_flag = True kwargs['t_crop'][1] = 1.0 if bad_crop_flag: log(f"Phase crop in time was out-of-bounds of [0.0, 1.0], setting: [{prev_t[0]}, {prev_t[1]}] -> [{kwargs['t_crop'][0]},{kwargs['t_crop'][1]}]") # check if t_crop has been given in units of ms if "f_crop" in keys: kwargs['f_crop'][0], kwargs['f_crop'][1] = check_crop_for_str(kwargs['f_crop'], "f") if kwargs['f_crop'][0] > 1.0 or kwargs['f_crop'][1] > 1.0: prev_f = kwargs['f_crop'].copy() _, new_f = self.par.lim2phase(f_lim = prev_f, snap = True) kwargs['f_crop'][0], kwargs['f_crop'][1] = new_f[0], new_f[1] if kwargs['f_crop'][0] < 0.0: kwargs['f_crop'][0] = 0.0 if kwargs['f_crop'][1] > 1.0: kwargs['f_crop'][1] = 1.0 log(f"Converting Freq crop {prev_f} MHz -> {kwargs['f_crop']} phase units", lpf = False) # check if terr_crop has been given in units of ms if "terr_crop" in keys: if kwargs['terr_crop'] is not None: kwargs["terr_crop"][0], kwargs["terr_crop"][1] = check_crop_for_str(kwargs["terr_crop"], "t") if self.crop_units == "physical": prev_t = kwargs['terr_crop'].copy() new_t,_ = self.par.lim2phase(t_lim = prev_t, snap = True) kwargs['terr_crop'][0], kwargs['terr_crop'][1] = new_t[0], new_t[1] if kwargs['terr_crop'][0] < 0.0: kwargs['terr_crop'][0] = 0.0 if kwargs['terr_crop'][1] > 1.0: kwargs['terr_crop'][1] = 1.0 log(f"Converting err Time crop {prev_t} ms -> {kwargs['terr_crop']} phase units", lpf = False) elif self.crop_units == "phase": # check if within phase units bad_crop_flag = False prev_t = kwargs['terr_crop'].copy() if (kwargs['terr_crop'][0] < 0.0) or (kwargs['terr_crop'][0] > 1.0): bad_crop_flag = True kwargs['terr_crop'][0] = 0.0 if (kwargs['terr_crop'][1] < 0.0) or (kwargs['terr_crop'][1] > 1.0): bad_crop_flag = True kwargs['terr_crop'][1] = 1.0 if bad_crop_flag: log(f"Phase error crop in time was out-of-bounds of [0.0, 1.0], setting: [{prev_t[0]}, {prev_t[1]}] -> [{kwargs['terr_crop'][0]},{kwargs['terr_crop'][1]}]") ## [ CHECK IF DATA PROUCTS ARE VALID ] ## def _isvalid(self, data_products: list = None): """ Check if data products are valid, are they loaded? Do their shapes match Parameters ---------- data_products : list(str), optional Data products to check against, by default None Returns ------- bool 0 if failed, 1 if passed """ data_shape = [] print(data_products) for key in data_products: # check if none if self.ds[key] is None: log(f"Missing data for [{key}]") return 0 data_shape.append(list(self.ds[key].shape)) # check if shape of all data matches if not all(x==data_shape[0] for x in data_shape): log("Data shape mismatch between loaded Dynamic spectra") return 0 return 1 def _iserr(self): """ Check if off-pulse region crop parameters, i.e. terr_crop has been given, if so the off-pulse rms will be calculated. """ return self.this_metapar.terr_crop is not None def _isdata(self): return self._isinstance ##===============================================## ## Diagnostic functions ## ##===============================================## def __str__(self): """ Info: Print info about FRB class """ #create string outlining parameters outstr = "" outstr += "="*80 + "\n" outstr += " "*32 + " Crop Parameters " + ""*32 + "\n" outstr += "="*80 + "\n\n" outstr += "PARAMETERS:".ljust(25) + "SAVED:".ljust(25) + "INST:".ljust(25) + "\n" outstr += "="*80 + "\n\n" for _,key in enumerate(_G.mp.keys()): val = getattr(self.metapar, key) val2 = getattr(self.prev_metapar, key) outstr += f"{key}:".ljust(25) + f"{val}".ljust(25) + f"{val2}".ljust(25) + "\n" outstr += "\n" outstr += "="*80 + "\n" outstr += " "*32 + " Data Parameters " + " "*32 + "\n" outstr += "="*80 + "\n\n" outstr += "PARAMETERS:".ljust(25) + "SAVED:".ljust(25) + "INST:".ljust(25) + "\n" outstr += "="*80 + "\n\n" for _,key in enumerate(_G.p.keys()): val = getattr(self.par, key) val2 = getattr(self.prev_par, key) outstr += f"{key}:".ljust(25) + f"{val}".ljust(25) + f"{val2}".ljust(25) + "\n" # outline loaded data outstr += "\n" outstr += "="*80 + "\n" outstr += " "*30 + " DATA products " + " "*30 + "\n" outstr += "="*80 + "\n\n" outstr += "TYPE:".ljust(25) + "SHAPE:".ljust(25) + "\n" outstr += "="*80 + "\n\n" for S in "IQUV": if self.ds[S] is not None: outstr += f"{S}:".ljust(25) + f"{list(self.ds[S].shape)}".ljust(25) + "\n" #now print data instance outstr += "\n" outstr += "="*80 + "\n" outstr += " "*30 + " DATA instance " + " "*30 + "\n" outstr += "="*80 + "\n\n" outstr += "TYPE:".ljust(25) + "SHAPE/VAL:".ljust(25) + "\n" outstr += "="*80 + "\n\n" ds_str = "" t_str = "\n" f_str = "\n" for S in "IQUV": if self._ds[S] is not None: ds_str += f"ds{S}:".ljust(25) + f"{list(self._ds[S].shape)}".ljust(25) + "\n" for S in "IQUVLP": if self._t[S] is not None: t_str += f"t{S}:".ljust(25) + f"{list(self._t[S].shape)}".ljust(25) + "\n" if self._t[f"{S}err"] is not None: Serr = f"{S}err" t_str += f"t{S}err:".ljust(25) + f"{self._t[Serr]}".ljust(25) + "\n" if self._f[S] is not None: f_str += f"f{S}:".ljust(25) + f"{list(self._f[S].shape)}".ljust(25) + "\n" if self._f[f"{S}err"] is not None: Serr = f"{S}err" f_str += f"f{S}err:".ljust(25) + f"{list(self._f[Serr].shape)}".ljust(25) + "\n" outstr += ds_str + t_str + f_str if self._freq is not None and len(self._freq) > 0: outstr += f"freqs:".ljust(25) + f"top:{self._freq[0]}, bottom:{self._freq[-1]}" + "\n\n" def _print_fitted_params(pstr, pars): for i, key in enumerate(pars.keys()): pstr += f"{key}:".ljust(25) + f"{pars[key].val}".ljust(25) + f"+{pars[key].p}".ljust(20) + f"-{pars[key].m}".ljust(20) + "\n" return pstr # print fitted params if len(self.fitted_params) > 0: outstr += "\n" outstr += "="*80 + "\n" outstr += " "*31 + " Fitted Parameters " + " "*31 + "\n" outstr += "="*80 + "\n\n" outstr += "PARAMETERS:".ljust(25) + "VALUES:".ljust(25) + "+ ERR:".ljust(20) + "- ERR:".ljust(20) + "\n" outstr += "="*80 + "\n\n" keys = self.fitted_params.keys() if "RM" in keys: outstr += "#"*20 + "\n" outstr += " Fitted RM \n" outstr += "#"*20 + "\n" outstr = _print_fitted_params(outstr, self.fitted_params['RM']) if "tscatt" in keys: outstr += "#"*20 + "\n" outstr += " Fitted tscatt \n" outstr += "#"*20 + "\n" outstr = _print_fitted_params(outstr, self.fitted_params['tscatt']) if "scintband" in keys: outstr += "#"*20 + "\n" outstr += " Fitted scintband \n" outstr += "#"*20 + "\n" outstr = _print_fitted_params(outstr, self.fitted_params['scintband']) return outstr ##===============================================## ## Further FRB processing ## ##===============================================## ## [ FIND FRB PEAK AND TAKE REGION AROUND IT ] ##
[docs] def find_frb(self, method = "sigma", mode = "median", sigma: int = 5, rms_guard: float = 10, rms_width: float = 50, rms_offset: float = 60, yfrac: float = 0.95, buffer: float = None, padding: float = None, dt_from_peak_sigma: float = None, **kwargs): """ This function uses a number of method of finding the bounds of a burst. 1. Find FRB bounds using a sigma threshold [method = "sigma"] find_optimal_sigma_width(sigma, rms_guard, rms_width, rms_offset) 2. Find FRB width and centroid using a fractional fluence threshold method [method = "fluence"] find_optimal_fluence_width(yfrac) Note, the centroid of the burst is the point along the burst that splits the fluence 50/50 on either side. Parameters ---------- method: str method to use for finding burst bounds ["sigma", "fluence"] mode: str type of algorithm to use when finding optimal fluence width (method = "fluence")\n [median] -> find burst width by estimating centroid of burst and fluence threshold on either side \n [min] -> find minimum burst width that captures the desired fluence threshold (moving window algorithm) sigma: int S/N threshold rms_guard: float gap between estiamted pulse region and off-pulse region for rms and baseband estimation, in (ms) rms_width: float width of off-pulse region on either side of pulse region in (ms) rms_offset: float rough offset from peak on initial S/N threshold in (ms) yfrac: float fraction of total fluence on either side of FRB effective centroid to take as FRB bounds buffer: float initial width of data in [ms] centered at the peak that will be used to estimate FRB bounds pading: float Add additional padding to measured bounds, as a fraction of the width of the burst dt_from_peak_sigma: float Determine maximum time resolution (dt) to achieve a peak S/N of dt_from_peak_sigma **kwargs: FRB parameters + FRB meta-parameters Returns ------- t_crop: list New Phase start and end limits for found frb burst t_ref: float Zero point, either the peak or centroid, depending on the method used """ log_title(f"Looking for FRB burst.", col = "lblue") ms2phase = lambda x : x / (tI.size * self.this_par.dt) ##====================## ## check if data valid## ##====================## # if 't_crop' not in kwargs.keys(): kwargs['t_crop'] = ["min", "max"] # kwargs['f_crop'] = ["min", "max"] kwargs['terr_crop'] = None tN = None if dt_from_peak_sigma is not None: kwargs['tN'] = 1 # get full data self.get_data("tI", **kwargs) if not self._isdata(): return None tI = self._t['I'] kwargs['tN'] = find_optimal_sigma_dt(tI, sigma = dt_from_peak_sigma, rms_offset = ms2phase(rms_offset), rms_width = ms2phase(rms_width)) tN = kwargs['tN'] # get data self.get_data("tI", **kwargs) if not self._isdata(): return None # make smaller buffer of data if buffer is not None: log(f"Searching Buffer [{buffer}] ms around peak of burst", lpf_col = self.pcol) buffer = int(buffer / self.this_par.dt) peak = np.argmax(self._t['I']) tI = self._t['I'][peak - buffer//2 : peak + buffer//2] buffer_ref = peak - buffer//2 else: log(f"Searching full time series", lpf_col = self.pcol) tI = self._t['I'] buffer_ref = 0 # now choose method of finding burst bounds log(f"Searching for Width using: {method} method", lpf_col=self.pcol) if method == "sigma": ms2phase = lambda x : x / (tI.size * self.this_par.dt) ref_ind, lw, rw = find_optimal_sigma_width(tI = tI, sigma = sigma, rms_guard = ms2phase(rms_guard), rms_width = ms2phase(rms_width), rms_offset = ms2phase(rms_offset)) log("Setting zero point reference [t_ref] to PEAK of burst", lpf_col = self.pcol) elif method == "fluence": ref_ind, lw, rw = find_optimal_fluence_width(tI = tI, yfrac = yfrac, mode = mode) log("Setting zero point reference [t_ref] to EFFECTIVE CENTROID of burst", lpf_col = self.pcol) else: log(f"Undefined method [{method}].. Aborting!", lpf_col = self.pcol, stype = "err") return (None)*2 # Calculate new t_crop and t_ref relative to full time series dataset t_ref = (buffer_ref + ref_ind) * self.this_par.dt t_crop = [-lw * self.this_par.dt, rw * self.this_par.dt] # add padding width = t_crop[1] - t_crop[0] padded_width = width if padding is not None: t_crop[0] -= padding * width t_crop[1] += padding * width padded_width += 2 * padding * width else: padding = 0 # if units are physical if self.crop_units == "phase": self.par.set_par(t_ref = t_ref) t_crop,_ = self.par.lim2phase(t_lim = t_crop) self.metapar.set_metapar(t_crop = t_crop) self.metapar.set_metapar(terr_crop = [-rms_offset - rms_width, -rms_offset]) if dt_from_peak_sigma is not None: self.metapar.set_metapar(tN = kwargs['tN']) self.par.set_par(t_ref = t_ref) print("New t_crop: [{:.4f}, {:.4f}]".format(t_crop[0],t_crop[1])) print(f"Setting terr_crop: [{-rms_offset - rms_width:.4f}, {-rms_offset:.4f}]") print(f"New time series 0-point: [{t_ref:.4f}]") if dt_from_peak_sigma is not None: print(f"time resolution for peak S/N [{dt_from_peak_sigma:.4f}]: {self.this_par.dt:.4f} ms (tN = {kwargs['tN']})") log(f"Width of burst without padding: {width:.4f} ms", lpf_col = self.pcol) log(f"Width LHS, RHS of zero point (without padding): {-lw * self.this_par.dt:.4f}, {rw * self.this_par.dt:.4f} ms", lpf_col = self.pcol) log(f"Width of burst with padding: {padded_width:.4f} ms", lpf_col = self.pcol) log(f"Width LHS, RHS of zero point (with padding): {-lw * self.this_par.dt - padding * width:.4f}, {rw * self.this_par.dt + padding * width:.4f} ms", lpf_col = self.pcol) # clear dsI self._clear_instance(data_list = ["dsI"]) return t_crop, t_ref, tN
##===============================================## ## Plotting Methods ## ##===============================================##
[docs] def plot_data(self, data = "dsI", ax = None, stk_debias = False, stk_ratio = False, stk_sigma = None, filename: str = None, **kwargs): """ General Plotting function, choose to plot either dynamic spectrum or time series data for all stokes parameters Parameters ---------- data : str, optional type of data to plot, by default "dsI" ax : axes, optional Axes object to plot data into stk_debias : bool, optional If True, Any L or P data plotted will be debiased stk_ratio : bool, optional If True, any t or f data will be converted to X/I and plotted stk_sigma, optional Mask Stokes ratios by ratio_rms_threshold * rms filename : str, optional filename to save figure to, by default None Returns ------- fig : figure Return Figure Instance """ log_title(f"plotting [{data}] product.", col = "lblue") # get data pdat = self.get_data(data_list = data, get = True, stk_debias = stk_debias, stk_ratio = stk_ratio, stk_sigma = stk_sigma, **kwargs) if not self._isdata(): return None # plot fig = plot_data(pdat, data, ax = ax, plot_type = self.plot_type) if self.save_plots: if filename is None: filename = f"{self.par.name}_{data}.png" plt.savefig(filename) if self.show_plots: plt.show() self._save_new_params() return fig
[docs] def zap_channels(self, chans: str = None, zapzeros: bool = False, zapzerosmargin: float = 1e-5, resetzap: bool = False, interactive = False, **kwargs): """ Zap channels (uses Stokes I freq spectrum) Parameters ---------- chans : str, optional Channels to zap, by default None zapzeros : bool, optional zap channels at or close to zero, by default False zapzerosmargin : float margin close to zero at which to zap channels, ratio of max channel flux resetzap : bool, optional reset channels zapped, by default False interactive : bool, optional Enable interactive zapping mode **kwargs : dict Standard frb parameters that can be overidden (zapchan cannot be overidden) """ if "zapchan" in kwargs.keys(): del kwargs['zapchan'] # init pars self._load_new_params(**kwargs) zapchan = self.metapar.zapchan if zapchan is None: zapchan = "" if resetzap: data = self.get_data('fI', get = True, zapchan = "", **kwargs) zapchan = get_zapstr(data['fI'], self.par.get_freqs()) if chans is not None: zapchan += ("," + chans) if zapzeros: data = self.get_data('fI', get = True, zapchan = zapchan, **kwargs) data['fI'][np.isnan(data['fI'])] = np.max(data['fI'][~np.isnan(data['fI'])]) data['fI'][np.abs(data['fI']/np.max(np.abs(data['fI']))) < zapzerosmargin] = np.nan zapchan += ("," + get_zapstr(data['fI'], self.par.get_freqs())) # make sure there are no commas in illegal places test_zapchan = zapchan.strip() if len(test_zapchan) > 0: if test_zapchan[0] == ",": test_zapchan = test_zapchan[1:] if len(test_zapchan) > 0: if test_zapchan[-1] == ",": test_zapchan = test_zapchan[:-1] if interactive: data = self.get_data('dsI', zapchan = test_zapchan, **kwargs, get = True) data['dsI'][np.isnan(data['dsI'][:,0])] = 0.0 test_zapchan = ZapInteractive(data['dsI'], data['freq'], data['time'], zapchan = test_zapchan) # save self.metapar.zapchan = test_zapchan return
[docs] def plot_stokes(self, ax = None, stk_debias = False, stk_sigma = 2.0, stk_type = "f", stk2plot = "IQUV", stk_ratio = False, filename: str = None, **kwargs): """ Plot Stokes data, by default stokes I, Q, U and V data is plotted Parameters ---------- ax: _axes_ matplotlib.pyplot.axes object to plot to, default is None stk_debias : bool, optional Plot stokes L and/or P debias, by default False stk_sigma : float, optional sigma threshold for error masking, data that is I < sigma * Ierr, mask it out or else weird overflow behavior might be present when calculating stokes ratios, by default 2.0 stk_type : str, optional Type of stokes data to plot, "f" for Stokes Frequency data or "t" for time data, by default "f" stk2plot : str, optional string of stokes to plot, for example if "QV", only stokes Q and V are plotted, by default "IQUV" stk_ratio : bool, optional if true, plot stokes ratios S/I filename : str, optional name of file to save figure image, by default None **kwargs : Dict FRB parameter keywords Returns ------- fig : figure Return figure instance """ log_title(f"plotting stokes [{stk_type}] data", col = "lblue") # get data data_list = [f"{stk_type}I", f"{stk_type}Q", f"{stk_type}U", f"{stk_type}V"] data = self.get_data(data_list = data_list, get = True, **kwargs) if not self._isdata(): return None err_flag = self._iserr() # check if off-pulse region given if not err_flag: log("Off-pulse crop required for plotting Ldebias", lpf_col = self.pcol, stype = "warn") stk_debias = False # data container for plotting pstk = {} if not stk_type in "ft": log("stk_type can only be t or f", lpf_col = self.pcol, stype = "err") # plot fig = plot_stokes(data, Ldebias = stk_debias, stk_type = stk_type, sigma = stk_sigma, stk2plot = stk2plot, stk_ratio = stk_ratio, plot_type = self.plot_type, ax = ax) if self.save_plots: if filename is None: filename = f"{self.par.name}_stk_{stk_type}.png" plt.savefig(filename) if self.show_plots: plt.show() self._save_new_params() return fig
[docs] def plot_crop(self, stk = "I", filename = None, **kwargs): """ Plot current crop of of data along with off-pulse crop if given Parameters ---------- stk : str, optional Stokes data to plot, by default "I" filename : str, optional name of file to save figure image, by default None """ log_title("Plotting current crop parameters for visual inspection.", col = "lblue") # initialise self._load_new_params(**kwargs) # get crop in time and frequency, these will be used to draw the bounds of the crops in a larger # dynamic spectrum tcrop = self.this_metapar.t_crop.copy() fcrop = self.this_metapar.f_crop.copy() terrcrop = self.this_metapar.terr_crop.copy() # combine crops, these will be crops of the full dynamic spectrum with bound markers fcrop_ds = [0.0, 1.0] # by default take full bandwidth tpad = 50 / (self.par.t_lim[-1] - self.par.t_lim[0]) # by default we will pad time by 100ms # check of off-pulse region has been given err_flag = True if terrcrop is None: # this essentially ignores the off-pulse region when plotting err_flag = False terrcrop = tcrop.copy() tcrop_ds = [0.0, 1.0] tcrop_ds[0] = min(tcrop[0], terrcrop[0]) - tpad tcrop_ds[1] = max(tcrop[1], terrcrop[1]) + tpad # cut crop to between [0.0, 1.0] if tcrop_ds[0] < 0.0: tcrop_ds[0] = 0.0 if tcrop_ds[1] > 1.0: tcrop_ds[1] = 1.0 if self.crop_units == "physical": tcrop_ds, fcrop_ds = self.par.phase2lim(t_crop = tcrop_ds, f_crop = fcrop_ds) # get data kwargs['t_crop'] = [*tcrop_ds] kwargs['f_crop'] = [*fcrop_ds] self.get_data([f"ds{stk}"], **kwargs) # plot dynamic spectra fig = plt.figure(figsize = (12,12)) plot_dynspec(self._ds[stk], aspect = 'auto', extent = [*self.this_par.t_lim, *self.this_par.f_lim]) plt.xlabel("Time [ms]") plt.ylabel("Freq [MHz]") tcrop, fcrop = self.par.phase2lim(t_crop = tcrop, f_crop = fcrop) terrcrop, _ = self.par.phase2lim(t_crop = terrcrop) # plot on-pulse time region plt.plot([tcrop[0]]*2, self.this_par.f_lim, color = 'r', linestyle = "--", label = "On-pulse time crop") plt.plot([tcrop[1]]*2, self.this_par.f_lim, color = 'r', linestyle = "--") # plot freq region plt.plot(self.this_par.t_lim, [fcrop[0]]*2, color = "orange", linestyle = "--", label = "freq crop") plt.plot(self.this_par.t_lim, [fcrop[1]]*2, color = "orange", linestyle = "--") # plot off-pulse time region if err_flag: plt.plot([terrcrop[0]]*2, self.this_par.f_lim, color = 'm', linestyle = "--", label = "Off-pulse time crop") plt.plot([terrcrop[1]]*2, self.this_par.f_lim, color = 'm', linestyle = "--") plt.legend() # title for crop info titstr = f"t crop: [{tcrop[0]:.1f}, {tcrop[1]:.1f}] [ms]\n" titstr += f"f crop: [{fcrop[0]:.1f}, {fcrop[1]:.1f}] [MHz]" if err_flag: titstr += f"\nterr crop: [{terrcrop[0]:.1f}, {terrcrop[1]:.1f}] [ms]" plt.title(titstr) if self.save_plots: if filename is None: filename = f"{self.par.name}_crop.png" plt.savefig(filename) if self.show_plots: plt.show()
## [ PLOT LORENTZ OF CROP ] ##
[docs] def fit_scintband(self, method = "bayesian",priors: dict = None, statics: dict = None, fit_params: dict = None, redo = False, filename: str = None, n: int = None, **kwargs): """ Fit for, Find and plot Scintillation bandwidth in FRB Parameters ---------- method : str method for fitting \n [bayesian] - Use Bilby bayesian Statistics \n [least squares] - Use Scipy.Curve_fit least squares priors : dict, optional Priors for sampling, by default None statics : dict, optional priors to keep constant, by default None fit_params : dict, optional extra arguments for Bilby.run_sampler function, by default None redo : bool, optional if True, will redo fitting in the case that results are cached, this is mainly for BILBY fitting, default is False filename : str, optional Save figure to file, by default None n : float, optional Polynomial order, by default None Returns ------- p: pyfit.fit pyfit class structure """ log_title(f"Fitting for Scintillation bandwidth using [{method}] method.", col = "lblue") ##====================## ## get par ## ##====================## # initilise dicts priors, statics, fit_params = dict_init(priors, statics, fit_params) # init pars self._load_new_params(**kwargs) ##====================## ## do fitting ## ##====================## # get data crop and spectrum self.get_data("fI", **kwargs) if not self._isdata(): return None # in the case channel zapping has been performed, first calc residuals of non.nan values # then convert nans to zeros for acf. if self.zap: sumfunc = np.nansum else: sumfunc = np.sum # caculate acf of residuals if n is not None: y, yfit = residuals(self._f['I'], n = n) yrms = sumfunc(y**2) else: y = self._f['I'] yrms = None y = acf(y) # in case zapping is involved mask = np.isnan(y) # lags x = np.linspace(self.this_par.df, self.this_par.bw - self.this_par.df, y.size) # create instance of fitting yerr = None p = fit(x = x[~mask], y = y[~mask], yerr = None, func = lorentz, prior = priors, static = statics, fit_keywords = fit_params, method = method, residuals = self.residuals, plotPosterior = self.plotPosterior) # fit p.fit(redo = redo) # calculate modulation index # see (Macquart. j. P. et al, 2019) - [The spectral Properties of the bright FRB population] m = p.posterior['a'].val**0.5 # using error propogation and quick calculus to obtain error temp_err = (abs(p.posterior['a'].p) + abs(p.posterior['a'].m))/2 err = 0.5*temp_err/p.posterior['a'].val p.set_posterior('m', m, err, err) # set to fitted params self.fitted_params['scintband'] = p.get_posteriors() if self.verbose: p.stats() print(f"RMS in poly-n = {n} fitting (sum in square of residuals):") print(yrms) print(p) ##===================## ## do plotting ## ##===================## if self.save_plots or self.show_plots: if n is not None: plt.figure(figsize = (10,10)) plt.plot(self._freq, self._f['I'], 'k', label = "STOKES I spectra") plt.plot(self._freq, yfit(np.arange(self._f['I'].size)), 'r--', label = "STOKES I fit") plt.xlabel("Freq [MHz]") plt.ylabel("Flux (arb.)") plt.title(f"polyfit, n = {n}") plt.legend() if self.save_plots: if filename is None: filename = f"{self.par.name}_fit_scintband_broad_poly_model.png" else: filename += "_broad_poly_model.png" plt.savefig(filename) fig = p.plot(xlabel = "Freq [MHz]", ylabel = "Norm acf", show = False) if self.save_plots: if filename is None: filename = f"{self.par.name}_fit_scintband.png" else: filename += ".png" plt.savefig(filename) if self.show_plots: plt.show() # update instance par self._save_new_params() return p
## [ FIT SCATTERING TIMESCALE ] ##
[docs] def fit_tscatt(self, method = "bayesian", npulse = 1, priors: dict = None, statics: dict = None, fit_params: dict = None, redo = False, filename: str = None, **kwargs): """ Fit a series of gaussian's convolved with a one-sided exponential scattering tai using BILB Parameters ---------- method : str method for fitting \n [bayesian] - Use Bilby bayesian Statistics \n [least squares] - Use Scipy.Curve_fit least squares npulse : int, optional Number of gaussian to fit, by default 1 priors : dict, optional Priors for sampling, by default None statics : dict, optional Priors to keep constant during fitting, by default None fit_params : dict, optional Keyword parameters for Bilby.run_sampler, by default None redo : bool, optional if True, will redo fitting in the case that results are cached, this is mainly for BILBY fitting, default is False filename : str, optional filename to save final plot to, by default None Returns ------- p: pyfit.fit pyfit class structure """ log_title(f"Fitting for Scattering Time and overall burst profile using [{method}] method.", col = "lblue") ##====================## ## check if data valid## ##====================## # initilaise dicts priors, statics, fit_params = dict_init(priors, statics, fit_params) # init par self._load_new_params(**kwargs) ##====================## ## proc data to fit ## ##====================## # get data y = self.get_data("tI", get = True, **kwargs)["tI"] if not self._isdata(): return None # create time profile x = np.linspace(*self.this_par.t_lim,y.size) err = None if self._iserr(): err = self._t['Ierr']*np.ones(y.size) ##==================## ## Do Fitting ## ##==================## # create instance of fitting # the implemented convolution algorithm requires that we snap to integer samples # check if priors given or not p = fit(x = x, y = y, yerr = err, func = make_scatt_pulse_profile_func(npulse), prior = priors, static = statics, fit_keywords = fit_params, method = method, residuals = self.residuals, plotPosterior = self.plotPosterior) # make sure we don't sample 0 for select priors if method == "least squares": for key in p.keys: if key == "sigma": continue if key not in priors.keys(): print("KEYKEY") if ("sig" in key) or ("tau" in key) or (key[0] == "a"): p.bounds[key] = [0.0001, math.inf] # fit p.fit(redo = redo) # set to fitted params self.fitted_params['tscatt'] = p.get_posteriors() self.fitted_params['tscatt']['npulse'] = npulse # print best fit parameters if self.verbose: p.stats() print(p) # plot if self.show_plots or self.save_plots: p.plot(xlabel = "Time [ms]", ylabel = "Flux Density (arb.)", show = False) if self.save_plots: if filename is None: filename = f"{self.par.name}_fit_tscatt.png" plt.savefig(filename) if self.show_plots: plt.show() # save instance parameters self._save_new_params() return p
[docs] def plot_periodgram(self, plot_log = False, filename = None, **kwargs): """ Plot ACF of time series plot_log: bool Also plot log y-axis """ log_title(f"plotting periodgram [ACF]", col = "lblue") dat = self.get_data("tI", get = True, **kwargs) # get acf tIacf = acf(dat['tI']) # get time lag, but only from first non-zero time lag sample dt = dat['time'][1] - dat['time'][0] tlags = np.linspace(dt, dt * tIacf.size, tIacf.size) cols = 1 if plot_log: cols = 2 fig, ax = plt.subplots(1, cols, figsize = (6 * cols, 6)) if plot_log: ax = ax.flatten() else: ax = [ax] # plot linear periodgram ax[0].plot(tlags, tIacf, 'k', linewidth = 1.5) ax[0].set_xlabel("Time lag [ms]", fontsize = 16) ax[0].set_ylabel("ACF power (arb.)", fontsize = 16) if plot_log: ax[1].plot(tlags, tIacf, 'k', linewidth = 1.5) ax[1].set_xlabel("Time lag [ms]", fontsize = 16) ax[1].set_yscale('log') fig.subplots_adjust(hspace = 0, wspace = 0) fig.tight_layout() if self.save_plots: if filename is None: filename = f"{self.par.name}_periodgram.png" else: filename += "_periodgram.png" plt.savefig(filename) if self.show_plots: plt.show() self._save_new_params() return fig
[docs] def plot_poincare(self, stk_type = "f", stk_sigma = 2.0, plot_data = True, plot_model = False, n = 5, normalise = True, plot_1D_stokes = False, filename = None, **kwargs): """ Plot Stokes data on a Poincare Sphere. Parameters ---------- filename : str, optional filename to save figure to, by default None stk_type : str, optional types of stokes data to plot, by default "f" \n [f] - Plot as a function of frequency \n [t] - Plot as a function of time stk_sigma : float, optional Error threshold used for masking stokes data in the case that stokes/I is being calculated \n this avoids deviding by potentially small numbers and getting weird results,by default 2.0 plot_data : bool, optional Plot Data on Poincare sphere, by default True plot_model : bool, optional Plot Polynomial fitted data on Poincare sphere, by default False normalise : bool, optional Plot data on surface of Poincare sphere (this will require normalising stokes data), by default True n : int, optional Maximum order of Polynomial fit, by default 5 plot_1D_stokes: bool, optional if True, plot 1D stokes line plots seperately in another figure **kwargs : Dict FRB parameter keywords Returns ------- fig : figure Return figure instance """ log_title(f"Plotting stokes [{stk_type}] data onto poincare sphere.", col = "lblue") self._load_new_params(**kwargs) if stk_type not in "tf": print("Stokes type must be either time (t) or frequency (f)") if not self._iserr(): log("Must specify off-pulse crop region", stype = "err", lpf_col = self.pcol) return # get data data_list = [] for S in "IQUV": data_list += [f"{stk_type}{S}"] self.get_data(data_list = data_list, **kwargs) if not self._isdata(): return None # what type of data, time or freq if stk_type == "t": pdat = self._t cbar_lims = self.this_par.t_lim cbar_label = "Time [ms]" else: pdat = self._f cbar_lims = [self._freq[0], self._freq[-1]] cbar_label = "Frequency [MHz]" # plot poincare sphere fig, ax = create_poincare_sphere(cbar_lims = cbar_lims, cbar_label = cbar_label) stk_i, stk_m = plot_poincare_track(pdat, ax, sigma = stk_sigma, plot_data = plot_data, plot_model = plot_model, normalise = normalise, n = n) if self.save_plots: if filename is None: filename = f"{self.par.name}_poincare.png" else: filename += "_poincare.png" plt.savefig(filename) # also plot stokes params if plot_1D_stokes: if stk_type == "t": x = self._time else: x = self._freq x_m = np.linspace(*cbar_lims, stk_m['Q'].size) fig2, ax = plt.subplots(1, 1, figsize = (10,10)) for S in "QUV": ax.plot(x, stk_i[S], label = S) ax.plot(x_m, stk_m[S], '--r') ax.set(xlabel = cbar_label, ylabel = "arb. ") ax.set_title("1D stokes plot") ax.legend() if self.save_plots: if filename is None: filename = f"{self.par.name}_poincare_spectra_fit.png" else: filename += "_poincare_spectra_fit.png" plt.savefig(filename) if self.show_plots: plt.show() self._save_new_params() return fig, fig2
[docs] def fit_RM(self, method = "RMquad", sigma: float = None, rm_prior: list = [-1000, 1000], pa0_prior: list = [-3.1415926/2, 3.1415926/2], unwrap: bool = False, fit_params: dict = None, filename: str = None, **kwargs): """ Fit Spectra for Rotation Measure Parameters ---------- method : str, optional Method to perform Rotation Measure fitting, by default "RMquad" \n [RMquad] - Fit for the Rotation Measure using the standard quadratic method \n [RMsynth] - Use the RM-tools RM-Synthesis method \n [QUfit] - Fit log-likelihood model of Stokes Q and U parameters (see bannister et al 2019 - supplementary) sigma : float, optional Apply masking based on S/N threshold given, used in [RMquad, RMsynth, QUfit] rm_prior : list priors for rotation measure, used in [QUfit], by default [-1000, 1000] pa0_prior : list priors for PA0, used in [QUfit], by default [-pi/2, pi/2] (Shouldn't need to change) fit_params : dict, optional keyword parameters for fitting method, by default None \n [RMquad] - Scipy.optimise.curve_fit keyword params \n [RMsynth] - RMtools_1D.run_synth keyword params \n [QUfit] - bilby.run_sampler keyword params filename : str, optional filename to save figure to, by default None Returns ------- p : pyfit.fit pyfit class fitting structure """ log_title(f"Fitting for RM using [{method}] method.", col = "lblue") fit_params = dict_init(fit_params) self._load_new_params(**kwargs) # check which data products are needed if method in ["RMsynth", "RMquad", "QUfit"]: data_list = ["fI", "fQ", "fU"] else: log("Invalid method for estimating RM", stype = "err", lpf_col = self.pcol) return None if self.this_metapar.terr_crop is None: log("Must specify 'terr_crop' for rms crop if you want to use RMsynth or RMquad", stype = "err", lpf_col = self.pcol) return (None, ) * 5 ## get data ## self.get_data(data_list, ignore_nans = True, **kwargs) if not self._isdata(): return None ## mask data based on S/N threshold given if sigma is not None: mask = self._f["I"] > self._f['Ierr'] * sigma else: mask = np.ones(self._f['I'].size, dtype = bool) ## run fitting for RM ## if method == "RMquad": # run quadrature method if self.this_par.f0 is None: log("f0 not given, using middle of band", stype = "warn", lpf_col = self.pcol) self.this_par.f0 = self.this_par.cfreq f0 = self.this_par.f0 # run fitting rm, rm_err, pa0, pa0_err = fit_RMquad(self._f['Q'][mask], self._f['U'][mask], self._f['Qerr'][mask], self._f['Uerr'][mask], self._freq[mask], f0, **fit_params) elif method == "RMsynth": # run RM synthesis method pa0_err = 0.0 # do this for now, TODO I, Q, U = self._f['I'], self._f['Q'], self._f['U'] Ierr, Qerr, Uerr = self._f['Ierr'], self._f['Qerr'], self._f['Uerr'] rm, rm_err, f0, pa0 = fit_RMsynth(I[mask], Q[mask], U[mask], Ierr[mask], Qerr[mask], Uerr[mask], self._freq[mask], **fit_params) elif method == "QUfit": # TODO: make reference frequency same as FDF? # run log-likelihood estimating for Q and U paramters f0 = self.par.cfreq Q, U = self._f['Q'], self._f['U'] Ierr, Qerr, Uerr = self._f['Ierr'], self._f['Qerr'], self._f['Uerr'] rm, rm_err, pa0, pa0_err = RM_QUfit(Q = Q[mask], U = U[mask], Ierr = Ierr[mask], Qerr = Qerr[mask], Uerr = Uerr[mask], f = self._freq[mask], rm_priors = rm_prior, pa0_priors = pa0_prior, **fit_params) # function for plotting diagnostics # if method == "QUfit": # def rmquad(f, rm, pa0): # angs = pa0 + rm*c**2/(f*1e6)**2 # return 90/np.pi*np.arctan2(np.sin(2*angs), np.cos(2*angs)) # else: def rmquad(f, rm, pa0): angs = pa0 + rm*c**2/1e12*(1/f**2 - 1/f0**2) return 90/np.pi*np.arctan2(np.sin(2*angs), np.cos(2*angs)) def rmquad_unwrapped(f, rm, pa0): angs = pa0 + rm*c**2/1e12*(1/f**2) return 180/np.pi * angs # put into pyfit structure PA, PA_err = calc_PA(self._f['Q'][mask], self._f['U'][mask], self._f['Qerr'][mask], self._f['Uerr'][mask]) print(self._freq[mask].size, PA.size) p = fit(x = self._freq[mask], y = 180/np.pi*PA, yerr = 180/np.pi*PA_err, func = rmquad, residuals = self.residuals) p.set_posterior('rm', rm, rm_err, rm_err) p.set_posterior('pa0', pa0, pa0_err, pa0_err) p.set_posterior('f0', f0, 0.0, 0.0) # set values to fitted_params self.fitted_params['RM'] = p.get_posteriors() self.fitted_params['RM']['f0'] = _posterior(f0, 0, 0) p._is_fit = True p._is_stats = True p._get_stats() print(p) # plot if self.save_plots or self.show_plots: pa_ylim = [-90, 90] if unwrap: y_wrap = p.y.copy() func_wrap = p.func y_unwrap, _, _ = unwrap_pa(p.x, p.y, rm, pa0) p.set(y = y_unwrap, func = rmquad_unwrapped) y_height = np.max(y_unwrap) - np.min(y_unwrap) pa_ylim = [np.min(y_unwrap) - 0.15*y_height, np.max(y_unwrap) + 0.15*y_height] fig = p.plot(xlabel = "Frequency [MHz]", ylabel = "PA [deg]", ylim = pa_ylim, show = False) if unwrap: p.set(y = y_wrap, func = func_wrap) if self.residuals: fig.axes[1].set(ylim = [-360, 360]) if self.save_plots: if filename is None: filename = f"{self.par.name}_RM_fit.png" plt.savefig(filename) if self.show_plots: plt.show() self._save_new_params() return p
[docs] def plot_PA(self, Ldebias_threshold = 2.0, stk2plot = "ILV", flipPA = False, stk_ratio = False, fit_params: dict = None, filename: str = None, save_files = False, **kwargs): """ Plot Figure with PA profile, Stokes Time series data, and Stokes I dyspec. If RM is not specified, will be fitted first. Parameters ---------- Ldebias_threshold : float, optional Sigma threshold for PA masking, by default 2.0 stk2plot : str, optional string of stokes to plot, for example if "QV", only stokes Q and V are plotted, \n by default "IQUV", choice between "IQUVLP" flipPA : bool, optional Plot PA between [0, 180] degrees instead of [-90, 90], by default False stk_ratio: bool, optional Plot Stokes ratios in time series ax, by default False fit_params : dict, optional keyword parameters for fitting method, by default None \n [RMquad] - Scipy.optimise.curve_fit keyword params \n [RMsynth] - RMtools_1D.run_synth keyword params filename : str, optional filename of figure to save to, by default None save_files : Bool, optional if true, will save 1D .npy file with PA and .npy file with PAerrs, by default False Returns ------- fig : figure Return figure instance """ log_title("Plotting PA mosaic image with PA profile, Stokes profile and Dynamic spectra.", col = "lblue") # initialise parameters fit_params = dict_init(fit_params) self._load_new_params(**kwargs) if self.this_metapar.terr_crop is None: log("Need to specify 'terr_crop' for rms estimation", stype = "err", lpf_col = self.pcol) return None if self.this_par.RM is None: log("RM not specified, either provide an RM or fit for one using .fit_RM()", lpf_col = self.pcol, stype = "err") return None ## get data data_list = ["dsI", "dsQ", "dsU", "tI", "tQ", "tU", "fQ", "fU", "tV"] self.get_data(data_list, **kwargs) if not self._isdata(): return None ## calculate PA stk_data = {"tQ":self._t["Q"], "tU":self._t["U"], "tQerr":self._t["Qerr"], "tUerr":self._t["Uerr"], "tIerr":self._t["Ierr"]} PA, PA_err = calc_PAdebiased(stk_data, Ldebias_threshold = Ldebias_threshold) # create figure fig, AX = plt.subplot_mosaic("P;S;D", figsize = (12, 10), gridspec_kw={"height_ratios": [1, 2, 2]}, sharex=True) _x = np.linspace(*self.this_par.t_lim, PA.size) ## plot PA plot_PA(_x, PA, PA_err, ax = AX['P'], flipPA = flipPA) ## plot Spectra pdat = {'time':_x} for S in "IQUV": pdat[f"t{S}"] = self._t[S] pdat[f"t{S}err"] = self._t[f"{S}err"] plot_stokes(pdat, ax = AX['S'], stk_type = "t", stk2plot = stk2plot, Ldebias = True, plot_type = self.plot_type, stk_ratio = stk_ratio) ## plot dynamic spectra ds_freq_lims = fix_ds_freq_lims(self.this_par.f_lim, self.this_par.df) plot_dynspec(self._ds['I'], ax = AX['D'], aspect = 'auto', extent = [*self.this_par.t_lim,*ds_freq_lims]) AX['D'].set_ylabel("Frequency [MHz]", fontsize = 12) AX['D'].set_xlabel("Time [ms]", fontsize = 12) # adjust figure fig.tight_layout() fig.subplots_adjust(hspace = 0) AX['P'].get_xaxis().set_visible(False) AX['S'].get_xaxis().set_visible(False) if self.save_plots: if filename is None: filename = f"{self.par.name}_PA_mosaic.png" plt.savefig(filename) if save_files: print(f"Saving PA data to {self.par.name}_PA.npy...") np.save(f"{self.par.name}_PA.npy", PA) print(f"Saving PA err data to {self.par.name}_PAerr.npy...") np.save(f"{self.par.name}_PAerr.npy", PA_err) if self.show_plots: plt.show() self._save_new_params() return fig
[docs] def calc_polfracs(self, stk_debias = False, peak_sigma = 3.0, peak_average_factor = 1, **kwargs): """ Calculate polarisation fractions using a number of different methods. Parameters ---------- debias : bool, optional Debiases Stokes L, P and abs(V), abs(Q) and abs(U), by default False. peak_sigma : float, optional Provide a threshold in terms of I/Ierr that will be used to mask the data before estimating the peak fraction of each stokes parameter. This will be nessesary to filter out noisy data. peak_average_factor, float averaging (downsampling) factor to apply to X(t) stokes profiles to help estimate their peaks """ log_title("Calculating Polarisation fractions", col = "lblue") # We wont over complicate this, only calculate polarisation fractions if # all stokes dynamic spectra are loaded loaded_stk = "" for s in "IQUV": if self.ds[s] is None: log(f"Stokes {s} dynamic spectrum is missing, make sure all Stokes dynspecs are loaded in!", stype = "err") return # proc KWARGS self._load_new_params(**kwargs) # get data S = self.get_data(["tI", "tQ", "tU", "tV", "tL", "tP"], get = True, stk_debias = stk_debias, **kwargs) nsamp = S['tI'].size # check if error was given, else turn off debias err = True if self.this_metapar.terr_crop is None: stk_debias = False err = False log("No off-pulse crop given to calculate dibased L, P and/or |U/Q/V|, specify [terr_crop]...", stype = "warn") log("No peak fractions we be calculatedm specify [terr_crop]...", stype = "warn") # calculated integrated stokes intS = {} for s in "IQUVLP": intS[s] = np.nansum(S[f't{s}']) if err: if s in "LP": intS[f'{s}err'] = np.nansum(S[f't{s}err']**2)**0.5 else: intS[f'{s}err'] = nsamp**0.5 * S[f't{s}err'] else: intS[f"{s}err"] = None # calculate integrated absolute Stokes Q, U and V for s in "QUV": if err: intS[f"abs{s}"] = np.nansum(calc_stokes_abs_debias(S[f't{s}'], S['tIerr'])) else: intS[f"abs{s}"] = np.nansum(np.abs(S[f't{s}'])) # calculate Stokes fractions fracS = {} polname = ["I", "q", "u", "v", "l", "p"] for i, s in enumerate("IQUVLP"): fracS[polname[i]], fracS[f'{polname[i]}err'] = calc_ratio(intS['I'], intS[s], intS['Ierr'], intS[f'{s}err']) polname = ["|q|", "|u|", "|v|"] for i, s in enumerate("QUV"): # absolute values fracS[polname[i]], fracS[f'{polname[i]}err'] = calc_ratio(intS['I'], intS[f'abs{s}'], intS['Ierr'], intS[f'{s}err']) # get peak Q, U, V, L and P if err: # average Stokes I for masking peaks = {} peaks_pos = {} # find mask based on sigma value mask = (average(S['tI'], N = peak_average_factor, nan = True) /average(S['tIerr'], N = peak_average_factor, nan = True)) < peak_sigma stk_frac = {} S['time'] = average(S['time'], N = peak_average_factor) print(S['tI'].size) print(S['tQ'].size) for s in "QUVLP": # get Stokes ratio stk_frac[s.lower()], stk_frac[f'{s.lower()}err'] = calc_ratio(S['tI'], S[f't{s}'], S['tIerr'], S[f't{s}err']) # average stk_frac[s.lower()] = average(stk_frac[s.lower()], N = peak_average_factor, nan = True) stk_frac[f'{s.lower()}err'] = average(stk_frac[f'{s.lower()}err'], N = peak_average_factor, nan = True) # mask stk_frac[s.lower()][mask] = np.nan stk_frac[f'{s.lower()}err'][mask] = np.nan # find peak peak_ind = np.nanargmax(np.abs(stk_frac[s.lower()])) peaks[s.lower()] = stk_frac[s.lower()][peak_ind] peaks[f"{s.lower()}err"] = stk_frac[f'{s.lower()}err'][peak_ind] peaks_pos[s.lower()] = S['time'][peak_ind] # diagnostic plots fig, ax = plt.subplots(1, 1, figsize = (12,8)) for s in "quvlp": _PLOT(S['time'], stk_frac[s], stk_frac[f'{s}err'], ax = ax, plot_type = self.plot_type, color = _G.stk_colors[s.upper()]) ylim = ax.get_ylim() for s in "quvlp": # plot marker ax.plot([peaks_pos[s]]*2, ylim, color = _G.stk_colors[s.upper()], linestyle = "--", label = f'${s}_{{peak}}$ at t = {peaks_pos[s]:.2f} ms') ax.set(ylim = ylim, xlabel = "Time [ms]", ylabel = "Stokes X/I fraction") ax.legend() # save figure if self.save_plots: filename = f"{self.par.name}_peak_polfracs.png" plt.savefig(filename) # Now we can print everything out debias_flag = "FALSE\n" if stk_debias: debias_flag = "TRUE\n" print("\nStokes fractions:") print("="*50) print(f"debiased = ", debias_flag) def _print_err(val): if err: return f"{val:.4f}" else: return "None" print("======= CONTINUUM-ADDED Stokes fractions =======\n") print("These fractions are calculated by first") print("integrating over the debiased polarisation profile") print("="*50, "\n") print("Legend:") print("l = sum(L(t))/sum(I(t))\n") print("|q|".ljust(15), f"{fracS['|q|']:.4f} +/- {_print_err(fracS['|q|err'])}") print("|u|".ljust(15), f"{fracS['|u|']:.4f} +/- {_print_err(fracS['|u|err'])}") print("|v|".ljust(15), f"{fracS['|v|']:.4f} +/- {_print_err(fracS['|v|err'])}") print("l".ljust(15), f"{fracS['l']:.4f} +/- {_print_err(fracS['lerr'])}") print("p".ljust(15), f"{fracS['p']:.4f} +/- {_print_err(fracS['perr'])}\n") print("Integrated (Signed) Stokes Paramters") print("q".ljust(15), f"{fracS['q']:.4f}".ljust(7) + f" +/- {_print_err(fracS['qerr'])}") print("u".ljust(15), f"{fracS['u']:.4f}".ljust(7) + f" +/- {_print_err(fracS['uerr'])}") print("v".ljust(15), f"{fracS['v']:.4f}".ljust(7) + f" +/- {_print_err(fracS['verr'])}\n") print("==== Vector-addded Stokes L and P fractions ====") print("="*50, "\n") print("Legend:") print("l* = sqrt(q^2 + u^2)") print("|l|* = sqrt(|q|^2 + |u|^2)") print("p* = sqrt(q^2 + u^2 + v^2)") print("|p|* = sqrt(|q|^2 + |u|^2 + |v|^2)\n") # calculate l* and p* fracS['l*'], fracS['l*err'] = calc_L(fracS['q'], fracS['u'], fracS['qerr'], fracS['uerr']) fracS['p*'], fracS['p*err'] = calc_P(fracS['q'], fracS['u'], fracS['v'], fracS['qerr'], fracS['uerr'], fracS['verr']) print("l*".ljust(15), f"{fracS['l*']:.4f} +/- {_print_err(fracS['l*err'])}") print("p*".ljust(15), f"{fracS['p*']:.4f} +/- {_print_err(fracS['p*err'])}") # calculate |l|* and |p|* fracS['|l|*'], fracS['|l|*err'] = calc_L(fracS['|q|'], fracS['|u|'], fracS['|q|err'], fracS['|u|err']) fracS['|p|*'], fracS['|p|*err'] = calc_P(fracS['|q|'], fracS['|u|'], fracS['|v|'], fracS['|q|err'], fracS['|u|err'], fracS['|v|err']) print("|l|*".ljust(15), f"{fracS['|l|*']:.4f} +/- {_print_err(fracS['|l|*err'])}") print("|p|*".ljust(15), f"{fracS['|p|*']:.4f} +/- {_print_err(fracS['|p|*err'])}\n") print("= Total polarisation fraction calculated L and V =") print("="*50, "\n") print("Legend:") print("p^ = sqrt(l^2 + v^2)") print("|p|^ = sqrt(l^2 + |v|^2)\n") fracS['p^'], fracS['p^err'] = calc_L(fracS['l'], fracS['v'], fracS['lerr'], fracS['verr']) fracS['|p|^'], fracS['|p|^err'] = calc_L(fracS['l'], fracS['|v|'], fracS['lerr'], fracS['|v|err']) print("p^".ljust(15), f"{fracS['p^']:.4f} +/- {_print_err(fracS['p^err'])}") print("|p|^".ljust(15), f"{fracS['|p|^']:.4f} +/- {_print_err(fracS['|p|^err'])}\n") if err: print(f"= Peak absolute polarisation fraction at dt = [{self.this_par.dt * 1000:.0f}] us =") print(f"="*50, "\n") for s in "quvlp": print(f"{s}_peak".ljust(15), f"{peaks[s]:.4f} +/- {peaks[f'{s}err']:.4f}".ljust(25), f"at time t = {peaks_pos[s]:.2f} ms") if self.save_plots: print(f"\nPrinting out diagnostic plot of stokes polarisation fractions as a function of time [{filename}]\n") if self.show_plots and err: plt.show() return fracS