"""
The service layer is responsible for turning validated inputs into the relevant calculation inputs,
calling any calculation functions and collating the results.
"""
from inspect import signature
from typing import Optional, TypedDict
import astropy.units as u
from astropy.coordinates import SkyCoord
from astropy.units import Quantity
from ska_ost_senscalc.common.service import get_subbands
from ska_ost_senscalc.common.spectropolarimetry import (
SpectropolarimetryInput,
SpectropolarimetryResults,
get_spectropolarimetry_results,
)
from ska_ost_senscalc.mid.calculator import Calculator
from ska_ost_senscalc.mid.validation import MID_CONTINUUM_CHANNEL_WIDTH_KHZ
from ska_ost_senscalc.subarray import SubarrayStorage
from ska_ost_senscalc.utilities import Telescope
subarray_storage = SubarrayStorage(Telescope.MID)
[docs]
class SingleSubbandResponse(TypedDict):
subband_freq_centre: Quantity
sensitivity: Optional[Quantity]
integration_time: Optional[Quantity]
[docs]
class ContinuumSensitivityResponse(TypedDict):
"""
Typed dictionary constrained to match the OpenAPI schema for the
response body of a single continuum sensitivity calculation.
"""
continuum_sensitivity: Optional[Quantity]
continuum_integration_time: Optional[Quantity]
spectral_sensitivity: Optional[Quantity]
spectral_integration_time: Optional[Quantity]
continuum_subband_sensitivities: Optional[list[SingleSubbandResponse]]
continuum_subband_integration_times: Optional[list[SingleSubbandResponse]]
spectropolarimetry_results: SpectropolarimetryResults
[docs]
class SingleZoomSensitivityResponse(TypedDict):
"""
Typed dictionary constrained to match the OpenAPI schema for the
response body of a single zoom sensitivity calculation.
"""
freq_centre: Quantity
spectral_sensitivity: Optional[Quantity]
spectral_integration_time: Optional[Quantity]
spectropolarimetry_results: SpectropolarimetryResults
# Get the keywords of the Calculator constructor
KWARGS_CALCULATOR = [
p.name
for p in signature(Calculator).parameters.values()
if p.kind == p.KEYWORD_ONLY
]
def _create_calculator(params) -> Calculator:
"""
Returns an instance of the calculator from the params.
We filter the params using the Calculator kwargs. This is legacy code,
eventually we will refactor the calculator to take an object for an input and we won't need this
"""
# Keep only the params in the list of constructor inputs
filtered_params = {k: v for k, v in params.items() if k in KWARGS_CALCULATOR}
return Calculator(**filtered_params)
[docs]
def get_continuum_calculate_response(params: dict) -> ContinuumSensitivityResponse:
"""
Extract the params from the request, convert them into the relevant calculator inputs,
perform the calculations and collect the results into the response body.
"""
# Parse the target
target = SkyCoord(
params["pointing_centre"],
frame="icrs",
unit=(u.hourangle, u.deg),
)
params["target"] = target
# Main results
result = {}
# Create the calculator for the main continuum result
continuum_calculator = _create_calculator(params)
# Create the calculator for the spectral results, which are returned in the continuum response body
spectral_params = params.copy()
# For the spectral calculation, the bandwidth used in the calculation should be the effective resolution,
# which is the intrinsic channel width multiplied by the spectral_averaging_factor
effective_resolution_hz = (
MID_CONTINUUM_CHANNEL_WIDTH_KHZ
* 1e3
* spectral_params.get("spectral_averaging_factor", 1)
)
spectral_params["bandwidth_hz"] = effective_resolution_hz
spectral_calculator = _create_calculator(spectral_params)
if params.get("integration_time_s"):
sensitivity = continuum_calculator.calculate_sensitivity(
params["integration_time_s"]
).to(u.Jy)
result.update({"continuum_sensitivity": sensitivity})
# Calculate spectral sensitivity using effective resolution for bandwidth
spectral_sensitivity = spectral_calculator.calculate_sensitivity(
params["integration_time_s"]
).to(u.Jy)
result.update({"spectral_sensitivity": spectral_sensitivity})
if params.get("sensitivity_jy"):
integration_time = continuum_calculator.calculate_integration_time(
params["sensitivity_jy"]
).to(u.s)
result.update({"continuum_integration_time": integration_time})
# Calculate spectral integration time using resolution for bandwidth
spectral_integration_time = spectral_calculator.calculate_integration_time(
params["sensitivity_jy"]
).to(u.s)
result.update({"spectral_integration_time": spectral_integration_time})
# Subbands - if subbands is 1 then we just return the main sensitivity calculation as the subband is the whole bandwidth
if params.get("n_subbands", 1) != 1:
subband_results = []
subband_freq_centres_hz, subband_bandwidth = get_subbands(
params["n_subbands"], params["freq_centre_hz"], params["bandwidth_hz"]
)
if integration_time_s := params.get("integration_time_s"):
for subband_freq_centre_hz in subband_freq_centres_hz:
# Create the calculator for each subband result
subband_params = params.copy()
subband_params["freq_centre_hz"] = subband_freq_centre_hz
subband_params["bandwidth_hz"] = subband_bandwidth
subband_calculator = _create_calculator(subband_params)
sensitivity = subband_calculator.calculate_sensitivity(
integration_time_s
).to(u.Jy)
subband_results.append(
{
"subband_freq_centre": u.Quantity(subband_freq_centre_hz, "Hz"),
"sensitivity": sensitivity,
}
)
result.update({"continuum_subband_sensitivities": subband_results})
if params.get("sensitivity_jy"):
for subband_freq_centre_hz, subband_sensitivity_jy in zip(
subband_freq_centres_hz, params.get("subband_sensitivities_jy")
):
# Create the calculator for each subband result
subband_params = params.copy()
subband_params["freq_centre_hz"] = subband_freq_centre_hz
subband_params["bandwidth_hz"] = subband_bandwidth
subband_calculator = _create_calculator(subband_params)
integration_time = subband_calculator.calculate_integration_time(
subband_sensitivity_jy
).to(u.s)
subband_results.append(
{
"subband_freq_centre": u.Quantity(subband_freq_centre_hz, "Hz"),
"integration_time": integration_time,
}
)
result.update({"continuum_subband_integration_times": subband_results})
spectropolarimetry_input = SpectropolarimetryInput(
bandwidth=u.Quantity(params["bandwidth_hz"], "Hz"),
frequency=u.Quantity(params["freq_centre_hz"], "Hz"),
effective_channel_width=u.Quantity(effective_resolution_hz, "Hz"),
)
result.update(
{
"spectropolarimetry_results": get_spectropolarimetry_results(
spectropolarimetry_input
)
}
)
return ContinuumSensitivityResponse(**result)
[docs]
def get_zoom_calculate_response(
params,
) -> list[SingleZoomSensitivityResponse]:
"""
Extract the params from the request, convert them into the relevant calculator inputs,
perform the calculations and collect the results into the response body.
"""
# Parse the target
target = SkyCoord(
params["pointing_centre"],
frame="icrs",
unit=(u.hourangle, u.deg),
)
params["target"] = target
zoom_results = []
if params.get("integration_time_s"):
for zoom_freq_centre_hz, zoom_spectral_resolution_hz, total_bandwidth_hz in zip(
params["freq_centres_hz"],
params["spectral_resolutions_hz"],
params["total_bandwidths_hz"],
):
zoom_params = params.copy()
zoom_params["freq_centre_hz"] = zoom_freq_centre_hz
effective_resolution_hz = zoom_spectral_resolution_hz * params.get(
"spectral_averaging_factor", 1
)
zoom_params["bandwidth_hz"] = effective_resolution_hz
zoom_calculator = _create_calculator(zoom_params)
sensitivity = zoom_calculator.calculate_sensitivity(
params["integration_time_s"]
).to(u.Jy)
spectropolarimetry_input = SpectropolarimetryInput(
bandwidth=Quantity(total_bandwidth_hz, "Hz"),
frequency=Quantity(zoom_freq_centre_hz, "Hz"),
effective_channel_width=Quantity(effective_resolution_hz, "Hz"),
)
spectropolarimetry_results = get_spectropolarimetry_results(
spectropolarimetry_input
)
zoom_results.append(
SingleZoomSensitivityResponse(
freq_centre=zoom_freq_centre_hz * u.Hz,
spectral_sensitivity=sensitivity,
spectropolarimetry_results=spectropolarimetry_results,
)
)
if params.get("sensitivities_jy"):
for (
zoom_freq_centre_hz,
zoom_spectral_resolution_hz,
zoom_sensitivity_jy,
total_bandwidth_hz,
) in zip(
params["freq_centres_hz"],
params["spectral_resolutions_hz"],
params["sensitivities_jy"],
params["total_bandwidths_hz"],
):
zoom_params = params.copy()
zoom_params["freq_centre_hz"] = zoom_freq_centre_hz
effective_resolution_hz = zoom_spectral_resolution_hz * params.get(
"spectral_averaging_factor", 1
)
zoom_params["bandwidth_hz"] = effective_resolution_hz
zoom_calculator = _create_calculator(zoom_params)
integration_time = zoom_calculator.calculate_integration_time(
zoom_sensitivity_jy
).to(u.s)
spectropolarimetry_input = SpectropolarimetryInput(
bandwidth=Quantity(total_bandwidth_hz, "Hz"),
frequency=Quantity(zoom_freq_centre_hz, "Hz"),
effective_channel_width=Quantity(effective_resolution_hz, "Hz"),
)
spectropolarimetry_results = get_spectropolarimetry_results(
spectropolarimetry_input
)
zoom_results.append(
SingleZoomSensitivityResponse(
freq_centre=zoom_freq_centre_hz * u.Hz,
spectral_integration_time=integration_time,
spectropolarimetry_results=spectropolarimetry_results,
)
)
return zoom_results
[docs]
def get_subarray_response():
"""
return the appropriate subarray objects
"""
return [
{
"name": subarray.name,
"label": subarray.label,
"n_ska": subarray.n_ska,
"n_meer": subarray.n_meer,
}
for subarray in subarray_storage.list()
]