Source code for ilex.data

##===============================================##
##===============================================##
## Author: Tyson Dial                            ##
## Email: tdial@swin.edu.au                      ##
## Last Updated: 09/01/2024                      ##
##                                               ##
##                                               ##
##                                               ##
##                                               ##
## Data processing library to manipulate Stokes  ##
## High time resolution (HTR) data               ##
##===============================================##
##===============================================##

import numpy as np
from scipy.signal import correlate
from copy import deepcopy
from .globals import *


##===============================================##
##      Basic functions to manipulate data       ##
##===============================================##

##  function to average data    ##
def average(x: np.ndarray, axis: int = 0, N: int = 10, nan = False):
    """
    Average a 1D or 2D array along an axis by a factor N.

    Parameters
    ----------
    x: ndarray
        data to average over
    axis: int
        axis to average over
    N: int
        Averaging/downsampling factor
    nan : bool, optional
        If True, use np.nanmean to ignore NaN values in array 'x', by default False

    Returns
    -------
    x: ndarray
        Averaged data
    """
    # NaN-aware reduction if requested
    func = np.nanmean if nan else np.mean

    # nothing to do for a unit factor
    if N == 1:
        return x

    # either dynamic spectra or time series
    ndims = x.ndim
    if ndims == 1:
        # truncate to a whole number of bins, then bin and average
        usable = (x.size // N) * N
        return func(x[:usable].reshape(usable // N, N), axis = 1).flatten()
    if ndims == 2:
        if axis == 0:
            # frequency scrunching
            usable = (x.shape[0] // N) * N
            return func(x[:usable].T.reshape(x.shape[1], usable // N, N), axis = 2).T
        if axis in (1, -1):
            # time scrunching
            usable = (x.shape[1] // N) * N
            return func(x[:, :usable].reshape(x.shape[0], usable // N, N), axis = 2)
        print("axis must be 1[-1] or 0")
        return x
    print("ndims must equal 1 or 2..")
    return x
def scrunch(x: np.ndarray, axis: int = 0, weights = None, nan = False):
    """
    Scrunch (average) data along a given axis, weights may also be applied.

    Parameters
    ----------
    x: np.ndarray
        data to scrunch
    axis: int, optional
        axis to scrunch over, by default 0
    weights: array-like or float, optional
        weights to apply during scrunching, by default None (uniform)
    nan : bool, optional
        If True, using nanmean to ignore NaN values in array 'x', by default False

    Returns
    -------
    y: np.ndarray
        Weighted and scrunched data
    """
    # if nan is true, will use numpy function that ignores nans in array x
    if nan:
        func = np.nanmean
    else:
        func = np.mean

    # check weights; None means uniform weighting
    if weights is None:
        weights = 1.0

    if isinstance(weights, np.ndarray):
        if weights.size != x.shape[axis]:
            # bug fix: report the length of the scrunched axis, not the
            # total array size, so the message actually explains the mismatch
            print(f"weights {weights.size} != xdata {x.shape[axis]}")
            print("size of weights array mismatch, using uniform weighting instead")
            weights = 1.0
        else:
            # reshape weights so they broadcast along the scrunched axis
            if axis == 0:
                w_shape = (weights.size, 1)
            elif axis == 1 or axis == -1:
                w_shape = (1, weights.size)
            weights = weights.reshape(w_shape)

    # scrunch
    y = func(x * weights, axis = axis)
    return y
def downsample(x: np.ndarray, axis: int = -1, N: int = 1, nan = False, mode = "mean"):
    """
    Downsample in either frequency or time.

    Parameters
    ----------
    x: ndarray
        data to downsample
    axis: int
        axis to downsample over
    N: int
        downsampling factor
    nan : bool, optional
        If True, use NaN-ignoring reductions on array 'x', by default False
    mode : str
        Method of downsampling, ["mean", "sum"]

    Returns
    -------
    x: ndarray
        Downsampled data

    Raises
    ------
    ValueError
        if mode is not one of ["mean", "sum"]
    """
    # choose reduction; NaN-aware variant ignores NaNs in x
    if mode == "mean":
        func = np.nanmean if nan else np.mean
    elif mode == "sum":
        func = np.nansum if nan else np.sum
    else:
        # bug fix: the ValueError was previously constructed but never
        # raised, leaving 'func' undefined and causing a NameError below
        raise ValueError(f"Downsampling mode must be either ['mean', 'sum']")

    if N == 1:
        return x

    # either dynamic spectra or time series
    ndims = x.ndim
    if ndims == 1:
        N_new = int(x.size / N) * N
        return func(x[:N_new].reshape(int(N_new / N), N), axis = 1).flatten()
    elif ndims == 2:
        if axis == 0:
            # frequency scrunching
            N_new = int(x.shape[0] / N) * N
            return func(x[:N_new].T.reshape(x.shape[1], int(N_new / N), N), axis = 2).T
        elif axis == 1 or axis == -1:
            # time scrunching
            N_new = int(x.shape[1] / N) * N
            return func(x[:, :N_new].reshape(x.shape[0], int(N_new / N), N), axis = 2)
        else:
            print("axis must be 1[-1] or 0")
            return x
    else:
        print("ndims must equal 1 or 2..")
        return x
## function to index in phase ##
def pslice(x: np.ndarray, start: float, end: float, axis: int = 0):
    """
    Slice 1D or 2D data in phase, between 0.0-1.0 which represents the start
    and end of ndarray along given axis. If array is 1D, given axis is set to 0.

    Parameters
    ----------
    x : np.ndarray
        1D or 2D data array
    start : float
        starting point of slice
    end : float
        ending point of slice
    axis : int, optional
        axis to slice along, can be 0 or 1(-1)

    Returns
    -------
    y: np.ndarray
        sliced 1D or 2D data array (a copy), or None for an out-of-range
        1D phase window
    """
    ndims = x.ndim
    if ndims == 1:
        # time series: convert phase to sample indices
        lo, hi = int(start * x.size), int(end * x.size)
        if lo < 0 or hi > x.size:
            print("Phase slicing must be between [0,1]")
            return None
        return x[lo:hi].copy()
    if ndims == 2:
        if axis == 0:
            # frequency phase slicing
            lo, hi = int(start * x.shape[0]), int(end * x.shape[0])
            return x[lo:hi, :].copy()
        if axis in (1, -1):
            # time phase slicing
            lo, hi = int(start * x.shape[1]), int(end * x.shape[1])
            return x[:, lo:hi].copy()
        print("axis must be 1[-1] or 0")
        return x
    print("ndims must be either 1 or 2")
    return x
def rotate_data(A, B, angle):
    """
    Rotate data in 2D.

    Parameters
    ----------
    A : np.ndarray
        First array
    B : np.ndarray
        Second array
    angle : float
        angle [rad] to rotate A and B in 2D

    Returns
    -------
    X: np.ndarray
        First array rotated
    Y: np.ndarray
        Second array rotated
    """
    # standard 2D rotation matrix applied to the (A, B) pair
    cos_a = np.cos(angle)
    sin_a = np.sin(angle)
    X = A * cos_a - B * sin_a
    Y = A * sin_a + B * cos_a
    return X, Y
def f_weight(x, fW):
    """
    Apply frequency weights on a 2D array, it is assumed that the
    freq axis is axis = 0.

    Parameters
    ----------
    x : np.ndarray
        2D array, (f, t)
    fW : np.ndarray or float
        frequency weights

    Returns
    -------
    np.ndarray
        weighted 2D array (x unchanged when fW is None)
    """
    # no weights -> pass through untouched
    if fW is None:
        return x
    # scalar weight applies uniformly
    if np.isscalar(fW):
        return x * fW
    # per-channel weights broadcast down the frequency axis
    return x * fW[:, None]
def t_weight(x, tW):
    """
    Apply time weights on a 2D array, it is assumed that the
    time axis is axis = 1.

    Parameters
    ----------
    x : np.ndarray
        2D array, (f, t)
    tW : np.ndarray or float
        time weights

    Returns
    -------
    np.ndarray
        weighted 2D array (x unchanged when tW is None)
    """
    # no weights -> pass through untouched
    if tW is None:
        return x
    # scalar weight applies uniformly
    if np.isscalar(tW):
        return x * tW
    # per-sample weights broadcast along the time axis
    return x * tW[None, :]
def norm(x, method = "abs_max", nan = False):
    """
    Normalise data. NOTE: normalisation is done in place on float arrays,
    and the (modified) array is also returned.

    Parameters
    ----------
    x : np.ndarray
        data to normalise
    method : str, optional
        method of normalising, by default "abs_max" \n
        [abs_max] - Normalise using the maximum of the absolute values \n
        [max] - Normalise using maximum \n
        [unit] - normalise data between -1 and 1 - not implemented \n
        [None] - no normalisation
    nan : bool, optional
        If True, using nanmax to ignore NaN values in array 'x', by default False

    Returns
    -------
    np.ndarray
        normalised data
    """
    # if nan is true, will use numpy function that ignores nans in array x
    if nan:
        func = np.nanmax
    else:
        func = np.max

    # normalise using the maximum value
    if method == "max":
        x /= func(x)

    # normalise using the absolute maximum value
    elif method == "abs_max":
        # bug fix: take max(abs(x)), not abs(max(x)) — the old form ignored
        # large negative values (e.g. [-10, 5] normalised by 5, not 10)
        x /= func(np.abs(x))

    # normalise data to between -1 and 1 [-1, 1]
    elif method == "unit":
        print("Not implemented yet")

    elif method == "None":
        pass

    else:
        print("invalid method for normalisation")

    return x
##===============================================## ## Advanced functions for HTR data ## ##===============================================##
def fday_rot(Q, U, f, RM, f0 = None, pa0 = 0.0):
    """
    Apply Faraday rotation to 1D or 2D Stokes data.

    Parameters
    ----------
    Q : np.ndarray
        Stokes Q data
    U : np.ndarray
        Stokes U data
    f : np.ndarray
        frequency array [MHz] -- presumably MHz given the 1e12 scaling; TODO confirm
    RM : float
        Rotation Measure [rad/m2]
    f0 : float
        reference frequency [MHz]; if None or 0.0 the 1/f0**2 reference
        term is omitted
    pa0 : float, optional
        reference position angle [rad], by default 0.0

    Returns
    -------
    Qrot : np.ndarray
        de-faraday rotated Stokes Q
    Urot : np.ndarray
        de-faraday rotated Stokes U
    """
    # nothing to rotate if RM is zero/absent
    if RM == 0.0 or RM is None:
        return Q, U

    if pa0 is None:
        pa0 = 0.0

    # calculate faraday angle; 'c' comes from .globals (NOTE(review):
    # assumed to be the speed of light in m/s — confirm against globals)
    if (f0 is None) or (f0 == 0.0):
        fang = RM * c**2 / 1e12 * (1/f**2) + pa0
    else:
        fang = RM * c**2 / 1e12 * (1/f**2 - 1/f0**2) + pa0

    # dynamic spectra or spectra? broadcast the angle over the time axis
    if Q.ndim > 1:
        fang = fang[:, None]

    # de-rotate using faraday angle (factor 2: PA rotates at twice the
    # polarisation vector angle)
    return rotate_data(Q, U, -2*fang)
# def denoise(): # pass # create a function to zap channels
def zap_chan(f, zap_str):
    """
    Zap channels, assumes contiguous frequency array.

    Parameters
    ----------
    f : np.ndarray
        frequency array used for zapping
    zap_str : str
        string used for zapping channels, in format -> "850, 860, 870:900" \n
        each element seperated by a ',' is a seperate channel. If ':' is used,
        user can specify a range of values \n i.e. 870:900 -> from channel 870
        to 900 inclusive of both.

    Returns
    -------
    y : list
        zapped indicies in frequency
    """
    # empty/missing zap string -> nothing to zap
    if zap_str is None:
        return []
    if len(zap_str) == 0:
        return []

    # channel width, band edges and band orientation
    df = f[1] - f[0]
    f_min = np.min(f)
    f_max = np.max(f)
    if df < 0:
        # upperside band: index 0 is the highest frequency
        fi = f_max
        df_step = -1
    else:
        # lowerside band: index 0 is the lowest frequency
        fi = f_min
        df_step = 1
    df = abs(df)

    # split segments
    zap_segments = zap_str.split(',')

    seg_idx = []
    # for each segment, check for delimiter :, else float cast
    for i, zap_seg in enumerate(zap_segments):

        # if segment is a range of frequencies
        if ":" in zap_seg:
            zap_range = zap_seg.strip().split(':')
            # map frequency -> channel index; df_step flips the sign for
            # upperside bands so indices are always counted from f[0]
            zap_0 = round(df_step * (float(zap_range[0]) - fi)/df)
            zap_1 = round(df_step * (float(zap_range[1]) - fi)/df)

            # check if completely outside bounds
            if (zap_0 < 0 and zap_1 < 0) or (zap_0 > f.size -1 and zap_1 > f.size -1):
                print(f"zap range [{zap_range[0]}, {zap_range[1]}] MHz out of range of bandwidth [{f_min}, {f_max}] MHz")
                continue

            # clamp partially out-of-band ranges to the band edges
            crop_zap = False
            if zap_0 < 0:
                crop_zap = True
                zap_0 = 0
            elif zap_0 > f.size - 1:
                crop_zap = True
                zap_0 = f.size - 1
            if zap_1 < 0:
                crop_zap = True
                zap_1 = 0
            elif zap_1 > f.size - 1:
                crop_zap = True
                zap_1 = f.size - 1
            if crop_zap:
                print(f"zap range cropped from [{zap_range[0]}, {zap_range[1]}] MHz -> [{f[zap_0]}, {f[zap_1]}] MHz")

            # expand the (inclusive) range into indices; the final [::df_step]
            # re-orders an upperside-band range back to ascending index order
            seg_idx += list(range(zap_0,zap_1+(1*df_step),df_step))[::df_step]

        # if segment is just a single frequency
        else:
            _idx = round(df_step * (float(zap_seg.strip()) - fi)/df)
            if (_idx < 0) or (_idx > f.size - 1):
                print(f"zap channel {zap_seg.strip()} MHz out of bounds of bandwidth [{f_min}, {f_max}] MHz")
            else:
                seg_idx += [_idx]

    return seg_idx
def get_zapstr(chan, freq):
    """
    Create string of channels to zap based on given nan frequencies
    in stokes I dynamic spectrum.

    Parameters
    ----------
    chan : np.ndarray or array-like
        Stokes I freq array (NaN entries mark channels to zap)
    freq : np.ndarray or array-like
        Array of frequency values in [MHz]

    Returns
    -------
    zap_str: str
        string of frequencies to zap using zap_chan function
    """
    # could be improved later with a smarter algorithm, but not nessesary for ilex.
    # work in ascending-frequency order; flip descending inputs first
    if freq[0] > freq[1]:
        chan = chan[::-1]
        freq = freq[::-1]

    zap_str = ""
    # indices of NaN channels
    chan2zap = np.argwhere(np.isnan(chan)).flatten()

    i = 0
    while i < chan2zap.size:
        # count the length (j) of the run of consecutive NaN channels at i
        j = 0
        while i + j + 1 < chan2zap.size:
            if chan2zap[i + 1 + j] - chan2zap[i + j] == 1:
                j += 1
            else:
                break
        if j > 3:
            # long run -> emit as a "start:end" range
            zap_str += "," + str(freq[chan2zap[i]]) + ":" + str(freq[chan2zap[i + j]])
        else:
            # short run -> emit each channel individually
            for k in range(j+1):
                zap_str += f",{freq[chan2zap[i+k]]}"
        i += j + 1

    if zap_str != "":
        zap_str = zap_str[1:]   # remove leading ','

    # NOTE(review): this re-reversal only rebinds the local names, it has
    # no effect on the caller's arrays or on the returned string
    if freq[0] > freq[1]:
        chan = chan[::-1]
        freq = freq[::-1]

    return zap_str
def combine_zapchan(chan1, chan2):
    """
    Combine two zapchan strings together without duplication. If chan1
    is NoneType, return chan2.

    Parameters
    ----------
    chan1 : str
        String of current channels
    chan2 : str
        String of channels to add

    Returns
    -------
    zapstr: str
        combined String of channels to zap
    """
    # resolve NoneType inputs first
    if chan1 is None:
        return chan2 if chan2 is not None else ""
    if chan2 is None:
        return chan1
    if len(chan2) == 0:
        return chan1

    # append entries of chan2 that are not already present in chan1
    existing = chan1.split(',')
    zapstr = chan1
    for entry in chan2.split(','):
        if entry in existing:
            continue
        zapstr = entry if zapstr == "" else f"{zapstr},{entry}"
    return zapstr
##===============================================## ## Aditional Stokes params calcs ## ##===============================================##
def calc_PA(Q, U, Qerr = None, Uerr = None, rad2deg = False):
    """
    Calculate Position Angle (PA) and PA error.

    Parameters
    ----------
    Q : np.ndarray
        Stokes Q data
    U : np.ndarray
        Stokes U data
    Qerr : np.ndarray
        Stokes Q err data
    Uerr : np.ndarray
        Stokes U err data
    rad2deg: bool, optional
        If true, converts output to degrees, by default is False

    Returns
    -------
    PA : np.ndarray
        Position Angle (PA)
    PAerr : np.ndarray
        Position Angle err (None if either error array is missing)
    """
    # errors only computed when both Qerr and Uerr are supplied
    errflag = (Qerr is not None) and (Uerr is not None)
    PAerr = None

    # calculate PA
    PA = 0.5 * np.arctan2(U, Q)

    if errflag:
        # standard error propagation for 0.5*arctan2(U, Q)
        # bug fix: previously used Q**2 in place of U**2 in both numerator
        # and denominator; now matches the formula used in calc_PAdebiased
        PAerr = 0.5 * np.sqrt((Q**2*Uerr**2 + U**2*Qerr**2)/ (Q**2 + U**2)**2)

    # convert to degrees if requested
    if rad2deg:
        PA *= 180/np.pi
        if errflag:
            PAerr *= 180/np.pi

    return PA, PAerr
def calc_PAdebiased(stk, Ldebias_threshold = 2.0, rad2deg = False):
    """
    Calculate time-dependant Position Angle masked using stokes L debiased.

    Parameters
    ----------
    stk : Dict(np.ndarray)
        Dictionary of Stokes data \n
        [tQ] - Stokes Q dynamic spectra \n
        [tU] - Stokes U dynamic spectra \n
        [tQerr] - Stokes Q average rms over time \n
        [tUerr] - Stokes U average rms over time \n
        [tIerr] - Stokes I average rms over time
    Ldebias_threshold : float, optional
        sigma threshold for masking PA, by default 2.0
    rad2deg: bool, optional
        If true, converts output to degrees, by default is False

    Returns
    -------
    PA : np.ndarray
        Position Angle (PA), NaN where L falls below the threshold
    PAerr : np.ndarray
        Position Angle err, NaN where L falls below the threshold
    """
    # calculate PA and error (standard propagation for 0.5*arctan2(U, Q))
    PA = 0.5 * np.arctan2(stk['tU'], stk['tQ'])
    PAerr = 0.5 * np.sqrt((stk['tQ']**2*stk['tUerr']**2 + stk['tU']**2*stk['tQerr']**2)/ (stk['tQ']**2 + stk['tU']**2)**2)

    # calculate de-baised L and mask PA where L is below the sigma threshold
    L_debias,_ = calc_Ldebiased(stk['tQ'], stk['tU'], stk['tIerr'])
    PA_mask = L_debias < (Ldebias_threshold * stk['tIerr'])

    # mask PA
    PA[PA_mask] = np.nan
    PAerr[PA_mask] = np.nan

    # convert to degrees if requested
    if rad2deg:
        PA *= 180/np.pi
        PAerr *= 180/np.pi

    return PA, PAerr
def calc_L(Q, U, Qerr = None, Uerr = None):
    """
    Calculate linear polarisation L = sqrt(Q**2 + U**2).

    Parameters
    ----------
    Q : np.ndarray or float
        Stokes Q data
    U : np.ndarray or float
        Stokes U data
    Qerr : np.ndarray or float
        Stokes Q error
    Uerr : np.ndarray or float
        Stokes U error

    Returns
    -------
    L : np.ndarray
        L data (array samples with L == 0 are set to NaN when errors are computed)
    Lerr : np.ndarray
        L errors (None if either input error is missing)
    """
    # quadrature sum of Q and U
    L = np.sqrt(Q**2 + U**2)

    Lerr = None
    if (Qerr is None) or (Uerr is None):
        return L, Lerr

    # propagate the Q/U uncertainties through the quadrature sum
    Lerr = np.sqrt(Q**2*Qerr**2 + U**2*Uerr**2) / L

    # for array inputs, blank samples where L == 0 (error undefined there)
    if hasattr(Q, '__len__') and hasattr(U, '__len__'):
        zero = (L == 0.0)
        Lerr[zero] = np.nan
        L[zero] = np.nan

    return L, Lerr
def calc_Ldebiased(Q, U, Ierr, Qerr = None, Uerr = None):
    """
    Calculate De-biased Linear polarisation fraction
    (see Everett & Weisberg+2001).

    Parameters
    ----------
    Q : np.ndarray
        Stokes Q data
    U : np.ndarray
        Stokes U data
    Ierr : np.ndarray
        Stokes I err data
    Qerr : np.ndarray
        Stokes Q error
    Uerr : np.ndarray
        Stokes U error

    Returns
    -------
    L_debias : np.ndarray
        Stokes L debias (0 or NaN below the significance threshold)
    Lerr : np.ndarray
        L errors (None if either Qerr or Uerr is missing)
    """
    # measured (biased) linear polarisation
    L_meas = np.sqrt(Q**2 + U**2)
    # debias: subtract the noise contribution in quadrature
    L_debias = Ierr * np.sqrt((L_meas/Ierr)**2 - 1)

    # sqrt of a negative argument yields NaN -> zero those samples, and
    # zero anything below the 1.57-sigma significance cut (Everett &
    # Weisberg 2001 threshold, per the docstring reference)
    L_debias[np.isnan(L_debias)] = 0
    L_debias[L_meas/Ierr < 1.57] = 0

    Lerr = None
    if (Qerr is not None) and (Uerr is not None):
        # propagate Q/U uncertainties; samples where L_debias == 0 are
        # undefined and get masked to NaN below
        Lerr = np.sqrt(Q**2*Qerr**2 + U**2*Uerr**2) / L_debias
        Lmask = L_debias != 0.0
        Lerr[~Lmask] = np.nan
        L_debias[~Lmask] = np.nan

    return L_debias, Lerr
def calc_P(Q, U, V, Qerr = None, Uerr = None, Verr = None):
    """
    Calculate total polarisation P = sqrt(L**2 + V**2).

    Parameters
    ----------
    Q : np.ndarray or float
        Stokes Q data
    U : np.ndarray or float
        Stokes U data
    V : np.ndarray or float
        Stokes V data
    Qerr : np.ndarray or float
        Stokes Q error
    Uerr : np.ndarray or float
        Stokes U error
    Verr : np.ndarray or float
        Stokes V error

    Returns
    -------
    P : np.ndarray
        P data (array samples with P == 0 set to NaN when errors are computed)
    Perr : np.ndarray
        P errors (None unless both L and V errors are available)
    """
    # calculate L (Lerr is None unless both Qerr and Uerr are given)
    L, Lerr = calc_L(Q, U, Qerr, Uerr)

    # calculate P
    P = np.sqrt(L**2 + V**2)

    # calculate Perr via quadrature error propagation
    Perr = None
    if (Lerr is not None) and (Verr is not None):
        Perr = np.sqrt(L**2*Lerr**2 + V**2*Verr**2) / P

        # for array inputs, blank samples where P == 0 (error undefined)
        if hasattr(L, '__len__') and hasattr(V, '__len__'):
            Pmask = P != 0.0
            Perr[~Pmask] = np.nan
            P[~Pmask] = np.nan

    return P, Perr
def calc_Pdebiased(Q, U, V, Ierr, Qerr = None, Uerr = None, Verr = None):
    """
    Calculate De-biased Linear polarisation fraction then calculate
    total polarisation (see Everett & Weisberg+2001).

    Parameters
    ----------
    Q : np.ndarray
        Stokes Q data
    U : np.ndarray
        Stokes U data
    V : np.ndarray
        Stokes V data
    Ierr : np.ndarray
        Stokes I err data
    Qerr : np.ndarray
        Stokes Q error
    Uerr : np.ndarray
        Stokes U error
    Verr : np.ndarray
        Stokes V error

    Returns
    -------
    P_debias : np.ndarray
        P debias (samples with P == 0 set to NaN when errors are computed)
    Perr : np.ndarray
        P errors (None unless both L and V errors are available)
    """
    # calc L debais (Lerr is None unless both Qerr and Uerr are given)
    L_debias, Lerr = calc_Ldebiased(Q, U, Ierr, Qerr, Uerr)

    # calc P from debiased L and V
    P_debias = np.sqrt(L_debias**2 + V**2)

    # calc P err via quadrature error propagation
    Perr = None
    if (Lerr is not None) and (Verr is not None):
        Perr = np.sqrt(L_debias**2*Lerr**2 + V**2*Verr**2) / P_debias

        # blank samples where P == 0 (error undefined there)
        Pmask = P_debias != 0.0
        Perr[~Pmask] = np.nan
        P_debias[~Pmask] = np.nan

    return P_debias, Perr
def calc_ratio(I, X, Ierr = None, Xerr = None):
    """
    Calculate Stokes Ratio X/I.

    Parameters
    ----------
    I : np.ndarray
        Stokes I data
    X : np.ndarray
        Stokes [X] data, usually either Q, U or V
    Ierr : np.ndarray, optional
        Stokes I err data, by default None, if both Ierr and Xerr is given \n
        Stokes X/I err will also be calculated and returned
    Xerr : np.ndarray, optional
        Stokes [X] err data, by default None

    Returns
    -------
    XI : np.ndarray
        Stokes X/I data
    XIerr : np.ndarray, optional
        Stokes X/I err data, by default None
    """
    XIerr = None

    # stokes ratio
    XI = X/I

    # propagate errors only when both are supplied
    if (Ierr is not None) and (Xerr is not None):
        XIerr = np.sqrt((Xerr/I)**2 + (Ierr*X/I**2)**2)

    return XI, XIerr
def calc_stokes_abs_debias(X, Ierr):
    """
    Calculate the absolute Stokes profile, debiased as outlined in
    Karastergiou et al. 2003 and Posselt et al. 2022. Works best if the
    Stokes profile is zero-mean, i.e. baseline subtracted.

    Parameters
    ----------
    X : np.ndarray or array-like
        Stokes profile
    Ierr : np.ndarray or float
        Uncertainties in Stokes I profile

    Returns
    -------
    absX : np.ndarray
        debiased absolute Stokes profile (0 where below the bias level)
    """
    # expected bias of |X| under Gaussian noise: Ierr * sqrt(2/pi)
    bias = Ierr * (2/np.pi)**0.5

    absX = np.abs(X)
    keep = absX > bias
    # subtract the bias everywhere, then zero samples below it
    absX -= bias
    absX[~keep] = 0.0
    return absX
def calc_freqs(cfreq, bw = 336.0, df = 1.0, upper = True):
    """
    Calculate array of channel centre frequencies.

    Parameters
    ----------
    cfreq : float
        Central frequency [MHz]
    bw : float, optional
        Bandwidth [MHz], by default 336.0
    df : float, optional
        channel width [MHz], by default 1.0
    upper : bool, optional
        If true, freq band starts at top, by default True

    Returns
    -------
    freqs : np.ndarray
        Frequency array
    """
    nchan = int(bw / df)
    # channel centres span the band, offset half a channel from each edge
    freqs = np.linspace(cfreq - bw/2 + df/2, cfreq + bw/2 - df/2, nchan)
    # descending order when the band starts at the top
    return freqs[::-1] if upper else freqs
##===============================================## ## basic statistics functions ## ##===============================================## ## [ GET RESIDUALS OF SOME FUNCTION ] ##
def residuals(y,n=5):
    """
    Calculate residuals of a data array by subtracting a polynomial mean
    model. NaN values are ignored in the fit and left unchanged in the output.

    Parameters
    ----------
    y : np.ndarray
        data array
    n : int, optional
        max order for polynomial to fit to mean model of y, by default 5

    Returns
    -------
    y_out : np.ndarray
        residuals of y (NaN samples unchanged)
    y_fit : np.poly1d
        fitted polynomial mean model
    """
    xvals = np.arange(y.size)

    # fit the polynomial through the finite samples only
    valid = ~np.isnan(y)
    model = np.poly1d(np.polyfit(xvals[valid], y[valid], n))

    # subtract the model from the finite samples
    out = y.copy()
    out[valid] -= model(xvals[valid])
    return out, model
def _nanacf(x): """ autocorrelate with NaN values Parameters ---------- x : np.ndarray (1D) 1D vector to correlate """ kmax = 2*x.size - 1 # output vector of 1D correlation corrout = np.zeros(kmax) # set up arrays y = x.conj().T # First half for k in range(kmax // 2): # calculate acf, need to normalise in case NaNs are present corrout[k] = np.nanmean(x[0:k+1] * y[y.size - 1 - k:]) * (k + 1) # if there is a mid point corrout[k + 1] = np.nanmean(x * y) * (x.size) # second half corrout[k + 2:] = corrout[-k - 3::-1] return corrout ## [ AUTO-CORRELATION FUNCTION ] ##
def acf(x, outs = "unique"):
    """
    Calculate normalised Auto-Correlation function using 'fft' method of
    real-valued data. If NaN values are present, the acf function will use a
    direct summation approach that ignores any NaN values.

    Parameters
    ----------
    x : np.ndarray
        data array, 1D vector
    outs : str, optional
        describes output of acf function, by default "unique" \n
        [unique] - Take positive acf lags, exclude zero-lag peak
        [all] - Take full acf, including positive, negative and zero lags

    Returns
    -------
    np.ndarray
        Auto-Correlation of x data (None for an unknown 'outs' value)
    """
    # if nan values exist, use the direct NaN-aware summation method
    if np.any(np.isnan(x)):
        acorr = _nanacf(x)
    else:
        # correlate using FFT approach
        acorr = correlate(x, x, mode = "full", method = "fft")

    # bug fix: normalise by the zero-lag peak in BOTH branches; previously
    # the NaN branch returned an un-normalised ACF despite the docstring
    acorr /= acorr[acorr.size // 2]

    if outs == "unique":
        return acorr[x.size:]
    elif outs == "all":
        return acorr
    else:
        return None
## [ CROSS-CORRELATION FUNCTION ] ##
def ccf(x, y, outs = "unique"):
    """
    Calculate normalised Cross-Correlation function using 'fft' method of
    real-valued data. Does NOT support NaN values.

    Parameters
    ----------
    x : np.ndarray
        data array, 1D
    y : np.ndarray
        data array, 1D
    outs : str, optional
        describes output of ccf function, by default "unique" \n
        [unique] - Take positive acf lags, including zero-lag peak
        [all] - Take full acf, including positive, negative and zero lags

    Returns
    -------
    np.ndarray
        Cross-Correlation of x data (None for an unknown 'outs' value)
    """
    # normalisation: geometric mean of the two zero-lag powers
    scale = (np.sum(x**2) * np.sum(y**2))**0.5
    xcorr = correlate(x, y, mode = "full", method = "fft") / scale

    if outs == "unique":
        # zero lag and positive lags only
        return xcorr[x.size-1:]
    if outs == "all":
        return xcorr
    return None
def unwrap_pa(f, pa, rm, pa0):
    """
    Unwrap polarisation position angle (PA) with a given RM.

    Parameters
    ----------
    f : np.ndarray
        frequency array -- presumably MHz given the 1e12 scaling; TODO confirm
    pa : np.ndarray
        wrapped position angles [deg], one per channel in f
    rm : float
        rotation measure [rad/m2], must be non-zero
    pa0 : float
        reference position angle (NOTE(review): accepted but unused by the
        internal rmquad model -- confirm intent)

    Returns
    -------
    pa_unwrapped : np.ndarray
        unwrapped position angles
    fmodel : np.ndarray
        finely sampled model frequencies
    pamodel : np.ndarray
        un-wrapped model position angles evaluated at fmodel

    Raises
    ------
    ValueError
        if rm is 0
    """
    # bug fix: the ValueError was previously constructed but never raised,
    # leading to a ZeroDivisionError at rm/abs(rm) below
    if rm == 0.0:
        raise ValueError("RM is 0, cannot unwrap...")

    def rmquad(f, rm, pa0):
        # PA [deg] of a pure RM quadratic; 'c' comes from .globals
        angs = rm*c**2/1e12*(1/f**2)
        return 180/np.pi*angs

    # calculate number of points needed for PA model
    f1 = f[-1]
    rmconst1 = 180 * rm * c**2 / (np.pi * 1e12) - f1**2
    rmconst2 = 2 * f1 * 180 * rm * c**2 / (np.pi * 1e12) - 2*f1**3
    rmconst3 = f1**4
    rmsgn = rm/abs(rm)
    eps = (-rmconst2 + rmsgn * np.sqrt(rmconst2**2 + 4*rmconst1*rmconst3)) / (2 * rmconst1)
    df = abs(f[-1] - f[-2])
    bw = (f[0] - f[-1] + df)
    N = int(bw / abs(eps))

    # get finely-sampled model data
    fmodel = np.linspace(f[0] + df/2, f[-1] - df/2, N)
    pamodel = rmquad(fmodel, rm, pa0)
    pamodel_wrapped = pamodel % 180

    # find wrap points in the model (jumps > 10 deg between neighbouring
    # samples), then map them onto indices of the data frequency array
    wrap_points = np.where(np.abs(np.diff(pamodel_wrapped)) > 10)[0] + 1
    wrap_points = (wrap_points / (pamodel.size - 1) * (f.size - 1)).astype(int)

    # add multiples of +/-180 deg between successive wrap points
    pa_unwrapped = pa.copy()
    idx = 0
    i = 0
    for i in range(wrap_points.size):
        pa_unwrapped[idx:wrap_points[i]] += float(rmsgn * i * 180.0)
        idx = wrap_points[i]
    pa_unwrapped[idx:] += float(rmsgn * (i+1)*180)

    # re-reference the unwrapped PA to the model at the last channel
    pa_unwrapped -= (pa_unwrapped[-1] - rmquad(f, rm, pa0)[-1])

    return pa_unwrapped, fmodel, pamodel