Source code for ska_ost_senscalc.mid_utilities

"""
Module holding functions useful to the MidCalculator
"""
import logging
import os
from enum import Enum
from math import sqrt

import astropy.units as u
import numpy as np
from astropy.constants import c, k_B
from astropy.table import Table
from astropy.units import Quantity
from scipy.interpolate import interp1d

from ska_ost_senscalc.subarray import (
    LOWArrayConfiguration,
    MIDArrayConfiguration,
    SubarrayStorage,
)
from ska_ost_senscalc.utilities import (
    MEERKAT_DISH_POINTING_ERROR,
    SKA_DISH_POINTING_ERROR,
    STATIC_DATA_PATH,
    Atmosphere,
    Celestial,
    DishType,
    Telescope,
    TelParams,
    Utilities,
)

logger = logging.getLogger("senscalc")

# pylint: disable=invalid-name

celestial = Celestial()


[docs]def read_table(file_name, columns=None): """ function to read in the lookup tables """ path_to_table = STATIC_DATA_PATH / "lookups" / file_name return Table.read(path_to_table, include_names=columns)
mid_weighting_table = read_table("beam_weighting_mid.ecsv") mid_reference_frequency = mid_weighting_table.meta["reference_frequency"] low_weighting_table = read_table("beam_weighting_low.ecsv") low_reference_frequency = low_weighting_table.meta["reference_frequency"] weighting_tables = { Telescope.MID: mid_weighting_table, Telescope.LOW: low_weighting_table, } ref_frequencies = { Telescope.MID: mid_reference_frequency, Telescope.LOW: low_reference_frequency, } confusion_noise_table = read_table("confusion_noise.ecsv") # original look up table in linear space, converting the table to log space for # the calculation for k in confusion_noise_table.keys(): confusion_noise_table[k] = np.log10(confusion_noise_table[k]) # look up table has the subarrays stored according to their filename stem. creating a new column that holds the # subarray label data mid_sub_storage = SubarrayStorage(Telescope.MID) file_mapping = mid_sub_storage.filename_label_mapping() mid_weighting_table.add_column( [ file_mapping[os.path.splitext(file_stem)[0]] for file_stem in mid_weighting_table["subarray"] ], name="subarray_label", ) low_sub_storage = SubarrayStorage(Telescope.LOW) file_mapping = low_sub_storage.filename_label_mapping() low_weighting_table.add_column( [ file_mapping[os.path.splitext(file_stem)[0]] for file_stem in low_weighting_table["subarray"] ], name="subarray_label", )
[docs]class Weighting(Enum): """ Enumeration for different weighting """ NATURAL = "natural" ROBUST = "robust" UNIFORM = "uniform"
[docs]class CalculatorMode(Enum): """ Enumeration for calculator modes """ LINE = "line" CONTINUUM = "continuum" PULSAR = "pulsar"
[docs]class Limit(Enum): """ Enumeration for different types of limit """ UPPER = "upper limit" LOWER = "lower limit" VALUE = "value"
[docs]class BeamSize(object): """Class for returning a BeamSize object. :param beam_maj: the beam major axis in degrees :type beam_maj: astropy.units.Quantity or number :param beam_min: the beam minor axis in degrees :type beam_min: astropy.units.Quantity or number :param beam_pa: the beam position angle in degrees :type beam_pa: astropy.units.Quantity or number :return: BeamSize object :rtype: object """ def __init__(self, beam_maj, beam_min, beam_pa): """ BeamSize class constructor """ self.beam_maj = Utilities.to_astropy(beam_maj, u.deg) self.beam_min = Utilities.to_astropy(beam_min, u.deg) self.beam_pa = Utilities.to_astropy(beam_pa, u.deg) def __repr__(self): return f"<BeamSize({self.beam_maj}, {self.beam_min}, {self.beam_pa})>"
[docs]def eta_bandpass(): """Efficiency factor for due to the departure of the bandpass from an ideal, rectangular shape. For now this is a placeholder. :return: the efficiency, eta :rtype: float """ eta = 1.0 logger.debug(f"eta_bandpass() -> {eta}") return eta
[docs]def eta_coherence(obs_freq): """Efficiency factor for the sensitivity degradation due to the incoherence on a baseline. :param obs_freq: the observing frequency :type obs_freq: astropy.units.Quantity :return: the efficiency, eta :rtype: float """ eta = np.exp(np.log(0.98) * obs_freq.to_value(u.GHz) ** 2 / 15.4**2) logger.debug(f"eta_coherence({obs_freq}) -> {eta}") return eta
[docs]def eta_correlation(): """Efficiency factor due to imperfection in the correlation algorithm, e.g. truncation error. :return: the efficiency, eta :rtype: float """ eta = 0.98 logger.debug(f"eta_correlation() -> {eta}") return eta
[docs]def eta_digitisation(obs_band): """Efficiency factor due to losses from quantisation during signal digitisation. This process is independent of the telescope and environment, but only depends on the 'effective number of bits' (ENOB) of the system, which depends in turn on digitiser quality and clock jitter, and on band flatness. :param obs_band: the observing band :type obs_band: str :return: the efficiency, eta :rtype: float """ if obs_band == "Band 1": eta = 0.999 elif obs_band == "Band 2": eta = 0.999 elif obs_band == "Band 3": eta = 0.998 elif obs_band == "Band 4": eta = 0.98 elif obs_band == "Band 5a": eta = 0.955 elif obs_band == "Band 5b": eta = 0.955 else: raise RuntimeError("bad obs_band: %s" % obs_band) logger.debug(f"eta_digitisation() -> {eta}") return eta
[docs]def eta_dish(obs_freq, dish_type): """Efficiency factor due to losses for specified dish type. :param obs_freq: the observing frequency :type obs_freq: astropy.units.Quantity :param dish_type: the type of dish :type dish_type: DishType :return: the efficiency, eta :rtype: float """ eta = TelParams.calculate_dish_efficiency(obs_freq, dish_type) logger.debug(f"eta_dish({obs_freq}, {dish_type}) -> {eta}") return eta
[docs]def eta_point(obs_freq, dish_type): """Efficiency factor at the observing frequency due to the dish pointing error. :param obs_freq: the observing frequency :type obs_freq: astropy.units.Quantity :param dish_type: the type of dish :type dish_type: DishType :return: the efficiency, eta :rtype: float """ if dish_type is DishType.SKA1: pointing_error = SKA_DISH_POINTING_ERROR elif dish_type is DishType.MeerKAT: pointing_error = MEERKAT_DISH_POINTING_ERROR else: raise RuntimeError("bad dish_type: %s" % dish_type) eta = 1.0 / ( 1.0 + ( 8 * np.log(2) * (pointing_error / TelParams.dish_fwhm(obs_freq, dish_type)) ** 2 ) ) eta = eta.to_value(u.dimensionless_unscaled) logger.debug(f"eta_point({obs_freq}, {dish_type}) -> {eta}") return eta
# Not currently used.
[docs]def eta_rfi(): """Efficiency factor due to Radio Frequency Interference (RFI) :return: the efficiency, eta :rtype: float """ eta = 1.00 logger.debug(f"eta_rfi() -> {eta}") return eta
[docs]def eta_system( eta_point, eta_coherence, eta_digitisation, eta_correlation, eta_bandpass ): """System efficiency for SKA interferometer :param eta_point: efficiency loss due to pointing errors :type eta_point: float :param eta_coherence: efficiency due to loss of coherence :type eta_coherence: float :param eta_digitisation: efficiency loss due to errors in the digitisation process :type eta_digitisation: float :param eta_correlation: efficiency loss due to errors in the correlation process :type eta_correlation: float :param eta_bandpass: efficiency loss due to the bandpass not being rectangular :type eta_bandpass: float :return: the system efficiency :rtype: float """ eta_system = ( eta_point * eta_coherence * eta_digitisation * eta_correlation * eta_bandpass ) logger.debug( f"eta_system({eta_point}, {eta_coherence}, {eta_digitisation}," f" {eta_correlation}, {eta_bandpass}) -> {eta_system}" ) return eta_system
[docs]def Tgal(target, obs_freq, dish_type, alpha): """Brightness temperature of Galactic background in target direction at observing frequency. :param target: target direction :type target: astropy.SkyCoord :param obs_freq: the observing frequency :type obs_freq: astropy.units.Quantity :param dish_type: the type of dish :type dish_type: DishType :param alpha: spectral index of emission :type alpha: float :return: the brightness temperature of the Galactic background :rtype: astropy.units.Quantity """ result = celestial.calculate_Tgal(target, obs_freq, dish_type, alpha) logger.debug(f"Tgal({target}, {obs_freq}, {dish_type}, {alpha}) -> {result}") return result
[docs]def Trcv(obs_freq, obs_band, dish_type): """Receiver temperature for specified freq, band and dish. :param obs_freq: the observing frequency :type obs_freq: astropy.units.Quantity :param obs_band: the observing band :type obs_band: str :param dish_type: the type of dish :type dish_type: DishType :return: the receiver temperature :rtype: astropy.units.Quantity """ result = TelParams.calculate_Trcv(obs_freq, obs_band, dish_type) logger.debug(f"Trcv({obs_freq}, {obs_band}, {dish_type}) -> {result}") return result
[docs]def Tsky(Tgal, obs_freq, elevation, weather): """Brightness temperature of sky in target direction. :param Tgal: brightness temperature of Galactic background :type Tgal: astropy.units.Quantity :param obs_freq: the observing frequency :type obs_freq: astropy.units.Quantity :param elevation: the observing elevation :type elevation: astropy.units.Quantity :param weather: the atmosphere PWV :type weather: float :return: the brightness temperature of the sky :rtype: astropy.units.Quantity """ # Tatm, Tcmb cannot be overriden by the user # Tatm at zenith Tatm_zen = Atmosphere.calculate_Tatm(weather, obs_freq) # Tcmb and Tgal are attenuated by the atmosphere, # Tatm also varies with zenith angle tau_zen = Atmosphere.get_tauz_atm(weather, obs_freq) zenith = 90.0 * u.deg - elevation tau = tau_zen / np.cos(zenith) # approximating Tatm ~ Tphys * (1 - exp(-tau)) Tphys = Tatm_zen / (1 - np.exp(-tau_zen)) result = Tphys * (1 - np.exp(-tau)) + (Tgal + celestial.Tcmb) * np.exp(-tau) logger.debug(f"Tsky({Tgal}, {obs_freq}, {elevation}, {weather}) -> {result}") return result
[docs]def Tspl(dish_type): """Spillover temperature for specified dish type. :param dish_type: the type of dish :type dish_type: DishType :return: the spillover temperature :rtype: astropy.units.Quantity """ result = TelParams.calculate_Tspl(dish_type) logger.debug(f"Tspl() -> {result}") return result
[docs]def Tsys_dish(Trcv, Tspl, Tsky, obs_freq): """System temperature. :param Trcv: the receiver temperature :type Trcv: astropy.units.Quantity :param Tspl: the spillover temperature :type Tspl: astropy.units.Quantity :param Tsky: the sky temperature :type Tsky: astropy.units.Quantity :param obs_freq: the observing frequency :type obs_freq: astropy.units.Quantity :return: the dish system temperature :rtype: astropy.units.Quantity """ # Add the temperatures to get the system temperature on the ground Tsys_ground = Trcv + Tspl + Tsky # Apply high-frequency correction to Rayleigh-Jeans temperature result = Utilities.Tx(obs_freq, Tsys_ground) result = result.to(u.K) logger.debug(f"Tsys_dish({Trcv}, {Tspl}, {Tsky}, {obs_freq}) -> {result}") return result
[docs]class Beam: """Beam class :param frequency: the observing frequency (Hz) :type frequency: astropy.units.Quantity or number :param zoom_frequencies: the frequencies of the line zooms (Hz) :type zoom_frequencies: iterable astropy.units.Quantity or list :param array_configuration: the subarray configuration :type array_configuration: ArrayConfiguration :param weighting: the type of uv-weighting used (robust or uniform) :type weighting: Weighting :param robustness: the Briggs robustness parameter :type robustness: float :param dec: the target declination (deg) :type dec: astropy.units.Quantity or number :param taper: the Gaussian taper to apply (arcsec) :type taper: astropy.units.Quantity or number :param calculator_mode: the calculator mode being used :type calculator_mode: CalculatorMode :return: Beam object :rtype: object """ def __init__( self, *, frequency=None, zoom_frequencies=None, array_configuration, weighting, robustness=None, dec, taper=0.0, calculator_mode, telescope=Telescope.MID, ): self.weighting_table = weighting_tables[telescope] self.reference_frequency = ref_frequencies[telescope] if frequency is not None: self.frequency = Utilities.to_astropy(frequency, u.Hz) if zoom_frequencies is not None: self.zoom_frequencies = Utilities.to_astropy(zoom_frequencies, u.Hz) self.dec = Utilities.to_astropy(dec, u.deg) self.weighting = weighting if weighting != Weighting.ROBUST and robustness: logger.warning( f"Robust parameter defined ({robustness}) but will be" f" ignored for given weighting ({weighting.name})" ) self.robustness = robustness self.array_configuration = array_configuration if array_configuration is None: raise RuntimeError("Parameter 'array_configuration' is required") elif ( array_configuration not in MIDArrayConfiguration and array_configuration not in LOWArrayConfiguration ): raise RuntimeError( f"Invalid array configuration: {array_configuration} for telescope {telescope}" ) else: self.array_configuration = array_configuration if isinstance(taper, u.quantity.Quantity): taper_arcsec = taper.to(u.arcsec) else: taper_arcsec = taper * u.arcsec self.taper = taper_arcsec self.calculator_mode = calculator_mode self.telescope = telescope
[docs] def beam_size(self): """ method to return list of beam sizes. """ weighted = self.__filter_table() self._validate_declination(weighted) interp_bmaj = interp1d( weighted["declination"], weighted["cleanbeam_bmaj"], fill_value="extrapolate", ) interp_bmin = interp1d( weighted["declination"], weighted["cleanbeam_bmin"], fill_value="extrapolate", ) interp_bpa = interp1d( weighted["declination"], weighted["cleanbeam_bpa"], fill_value="extrapolate", ) beam_maj = interp_bmaj(self.dec) * u.deg beam_min = interp_bmin(self.dec) * u.deg beam_pa = interp_bpa(self.dec) obj = [] if self.calculator_mode is CalculatorMode.LINE: frequencies = self.zoom_frequencies else: frequencies = [self.frequency] for freq in frequencies: obj.append( BeamSize( beam_maj=beam_maj * self.reference_frequency / freq.to(u.GHz), beam_min=beam_min * self.reference_frequency / freq.to(u.GHz), beam_pa=beam_pa, ) ) return obj
[docs] def weighting_factor(self): """ method to calculate the beam weighting factor of the observation """ # As of PI#15 we are no longer interpolating over frequency, elevation or integration time; # only declination. This has considerably simplified this method. If interpolation over more # than one parameter is required see commit 3640f432 if self.weighting is Weighting.NATURAL and self.taper == 0.0 * u.arcsec: return 1.0 else: weighted = self.__filter_table() self._validate_declination(weighted) natural = self.__filter_table( use_weighting=Weighting.NATURAL, use_taper=0.0 * u.arcsec ) interp_weighted = interp1d( weighted["declination"], weighted["pss_casa"], fill_value="extrapolate", ) interp_natural = interp1d( natural["declination"], natural["pss_casa"], fill_value="extrapolate", ) return interp_weighted(self.dec) / interp_natural(self.dec)
def _validate_declination(self, weighted): # As of PI19, the weighting tables do not have values for every # allowed declination. It was decided to extrapolate the table # values within reasonable limits: to 1 degree of the maximum # calculated declination. extrapolation_tolerance = 1.0 dec_min = weighted["declination"].min() dec_max = weighted["declination"].max() + extrapolation_tolerance # Raise RuntimeError if declination value is outside the allowed # extrapolation limits if self.dec.value < dec_min or self.dec.value > dec_max: raise ValueError( "Cannot extrapolate beyond declination limits of " f"{dec_min} <= dec < {dec_max}" )
[docs] def surface_brightness_conversion_factor(self) -> Quantity: """ method to calculate the factor the sensitivity returned by calculator must be multiplied by to obtain the surface brightness sensitivity in units of K/Jy. """ if self.calculator_mode is CalculatorMode.LINE: frequency = self.zoom_frequencies else: frequency = self.frequency wavelength = c / frequency.to(u.s**-1) factor = ( wavelength**2 / (2.0 * k_B * self.__area()) * (1 * u.sr) ) # fudge factor to remove the /sr return factor.to(u.K / u.Jy)
[docs] def confusion_noise(self): """ method to calculate the confusion noise of beam """ confusion_noise = [] lim = [] if self.calculator_mode == CalculatorMode.LINE: frequency = self.zoom_frequencies.value else: frequency = [self.frequency.value] log_frequency = np.log10(frequency) beam_geomean = [] for beam in self.beam_size(): beam_geomean.append(sqrt(beam.beam_min.value * beam.beam_maj.value)) log_beam_geomean = np.log10(beam_geomean) sigma_min = confusion_noise_table["sigma"].min(axis=0) sigma_max = confusion_noise_table["sigma"].max(axis=0) for n, log_f in enumerate(log_frequency): sigma_m = interp1d( confusion_noise_table["beam"], confusion_noise_table["sigma"], bounds_error=False, axis=0, fill_value=(sigma_min, sigma_max), )(log_beam_geomean[n]) sigma = interp1d( confusion_noise_table["frequency"][0], sigma_m, fill_value="extrapolate", )(log_f) confusion_noise.append(10**sigma) if log_beam_geomean[n] < confusion_noise_table["beam"].min(): lim.append(Limit.UPPER.value) elif log_beam_geomean[n] > confusion_noise_table["beam"].max(): lim.append(Limit.LOWER.value) else: lim.append(Limit.VALUE.value) return confusion_noise * u.Jy, lim
def __filter_table(self, use_weighting=None, use_taper=None): """ private method to filter the lookup table """ if use_weighting is None: use_weighting = self.weighting if use_taper is None: use_taper = self.taper elif isinstance(use_taper, u.quantity.Quantity): use_taper = use_taper.to(u.arcsec) else: use_taper = use_taper * u.arcsec weighted = self.weighting_table[ ( ( self.weighting_table["subarray_label"] == self.array_configuration.value ) & (self.weighting_table["weighting"] == use_weighting.value) & (self.weighting_table["spec_mode"] == self.calculator_mode.value) & (self.weighting_table["taper_arcsec"] == use_taper.value) ) ] # Only take robustness into account if weighting is `robust` if use_weighting == Weighting.ROBUST: weighted = weighted[(weighted["robustness"] == self.robustness)] return weighted def __area(self): """ private method to calculate the beam area """ beam = self.beam_size() area = [] for b in beam: theta = np.sqrt(b.beam_maj * b.beam_min).to(u.rad) area_sr = (np.pi * theta**2 / (4 * np.log(2))).to(u.sr).value area.append(area_sr) return area * u.sr