Source code for ctapipe.monitoring.interpolation

from abc import ABCMeta, abstractmethod
from typing import Any

import astropy.units as u
import numpy as np
import tables
from astropy.table import Table
from astropy.time import Time
from scipy.interpolate import interp1d

from ctapipe.core import Component, traits

__all__ = [
    "MonitoringInterpolator",
    "LinearInterpolator",
    "PointingInterpolator",
    "ChunkInterpolator",
]


[docs] class MonitoringInterpolator(Component, metaclass=ABCMeta): """ MonitoringInterpolator parent class. Parameters ---------- h5file : None | tables.File An open hdf5 file with read access. """ def __init__(self, h5file: None | tables.File = None, **kwargs: Any) -> None: super().__init__(**kwargs) if h5file is not None and not isinstance(h5file, tables.File): raise TypeError("h5file must be a tables.File") self.h5file = h5file
[docs] @abstractmethod def __call__(self, tel_id: int, time: Time): """ Interpolates monitoring data for a given timestamp Parameters ---------- tel_id : int Telescope id. time : astropy.time.Time Time for which to interpolate the monitoring data. """ pass
[docs] @abstractmethod def add_table(self, tel_id: int, input_table: Table) -> None: """ Add a table to this interpolator. This method reads input tables and creates instances of the needed interpolators. The first index of _interpolators needs to be tel_id, the second needs to be the name of the parameter that is to be interpolated. Parameters ---------- tel_id : int Telescope id. input_table : astropy.table.Table Table of pointing values, expected columns are always ``time`` as ``Time`` column and other columns for the data that is to be interpolated. """ pass
def _check_tables(self, input_table: Table) -> None: missing = self.required_columns - set(input_table.colnames) if len(missing) > 0: raise ValueError(f"Table is missing required column(s): {missing}") for col in self.expected_units: unit = input_table[col].unit if unit is None: if self.expected_units[col] is not None: raise ValueError( f"{col} must have units compatible with '{self.expected_units[col].name}'" ) elif not self.expected_units[col].is_equivalent(unit): if self.expected_units[col] is None: raise ValueError(f"{col} must have units compatible with 'None'") else: raise ValueError( f"{col} must have units compatible with '{self.expected_units[col].name}'" ) def _read_parameter_table(self, tel_id: int) -> None: # prevent circular import between io and monitoring from ..io import read_table input_table = read_table( self.h5file, f"{self.telescope_data_group}/tel_{tel_id:03d}", ) self.add_table(tel_id, input_table)
[docs] class LinearInterpolator(MonitoringInterpolator): """ LinearInterpolator parent class. Parameters ---------- h5file : None | tables.File An open hdf5 file with read access. """ bounds_error = traits.Bool( default_value=True, help="If true, raises an exception when trying to extrapolate out of the given table", ).tag(config=True) extrapolate = traits.Bool( help="If bounds_error is False, this flag will specify whether values outside" "the available values are filled with nan (False) or extrapolated (True).", default_value=False, ).tag(config=True) def __init__(self, h5file: None | tables.File = None, **kwargs: Any) -> None: super().__init__(h5file, **kwargs) self._interpolators = {} self.interp_options: dict[str, Any] = dict(assume_sorted=True, copy=False) if self.bounds_error: self.interp_options["bounds_error"] = True elif self.extrapolate: self.interp_options["bounds_error"] = False self.interp_options["fill_value"] = "extrapolate" else: self.interp_options["bounds_error"] = False self.interp_options["fill_value"] = np.nan def _check_interpolators(self, tel_id: int) -> None: if tel_id not in self._interpolators: if self.h5file is not None: self._read_parameter_table(tel_id) # might need to be removed else: raise KeyError(f"No table available for tel_id {tel_id}")
[docs] class PointingInterpolator(LinearInterpolator): """ Interpolator for pointing and pointing correction data. """ telescope_data_group = "/dl0/monitoring/telescope/pointing" required_columns = frozenset(["time", "azimuth", "altitude"]) expected_units = {"azimuth": u.rad, "altitude": u.rad}
[docs] def __call__(self, tel_id: int, time: Time) -> tuple[u.Quantity, u.Quantity]: """ Interpolate alt/az for given time and tel_id. Parameters ---------- tel_id : int Telescope id. time : astropy.time.Time Time for which to interpolate the pointing. Returns ------- altitude : astropy.units.Quantity[deg] Interpolated altitude angle. azimuth : astropy.units.Quantity[deg] Interpolated azimuth angle. """ self._check_interpolators(tel_id) mjd = time.tai.mjd az = u.Quantity(self._interpolators[tel_id]["az"](mjd), u.rad, copy=False) alt = u.Quantity(self._interpolators[tel_id]["alt"](mjd), u.rad, copy=False) return alt, az
[docs] def add_table(self, tel_id: int, input_table: Table) -> None: """ Add a table to this interpolator. Parameters ---------- tel_id : int Telescope id. input_table : astropy.table.Table Table of pointing values, expected columns are ``time`` as ``Time`` column, ``azimuth`` and ``altitude`` as quantity columns for pointing and pointing correction data. """ self._check_tables(input_table) if not isinstance(input_table["time"], Time): raise TypeError("'time' column of pointing table must be astropy.time.Time") input_table = input_table.copy() input_table.sort("time") az = input_table["azimuth"].quantity.to_value(u.rad) # prepare azimuth for interpolation by "unwrapping": i.e. turning # [359, 1] into [359, 361]. This assumes that if we get values like # [359, 1] the telescope moved 2 degrees through 0, not 358 degrees # the other way around. This should be true for all telescopes given # the sampling speed of pointing values and their maximum movement speed. # No telescope can turn more than 180° in 2 seconds. az = np.unwrap(az) alt = input_table["altitude"].quantity.to_value(u.rad) mjd = input_table["time"].tai.mjd self._interpolators[tel_id] = {} self._interpolators[tel_id]["az"] = interp1d( mjd, az, kind="linear", **self.interp_options ) self._interpolators[tel_id]["alt"] = interp1d( mjd, alt, kind="linear", **self.interp_options )
[docs] class ChunkInterpolator(MonitoringInterpolator): """ Simple interpolator for overlapping chunks of data. """ def __init__(self, h5file: None | tables.File = None, **kwargs: Any) -> None: super().__init__(**kwargs) self.start_time = {} self.end_time = {} self.values = {} self.columns = list(self.required_columns) # these will be the data columns self.columns.remove("start_time") self.columns.remove("end_time")
[docs] def __call__(self, tel_id: int, time: Time) -> float | dict[str, float]: """ Interpolate overlapping chunks of data for a given time, tel_id, and column(s). Parameters ---------- tel_id : int Telescope id. time : astropy.time.Time Time for which to interpolate the data. Returns ------- interpolated : float or dict Interpolated data for the specified column(s). """ if tel_id not in self.values: self._read_parameter_table(tel_id) result = {} mjd = time.to_value("mjd") for column in self.columns: result[column] = self._interpolate_chunk(tel_id, column, mjd) if len(result) == 1: return result[self.columns[0]] return result
[docs] def add_table(self, tel_id: int, input_table: Table) -> None: """ Add a table to this interpolator for specific columns. Parameters ---------- tel_id : int Telescope id. input_table : astropy.table.Table Table of values to be interpolated, expected columns are ``start_time`` as ``validity start Time`` column, ``end_time`` as ``validity end Time`` and the specified columns for the data of the chunks. """ self._check_tables(input_table) input_table = input_table.copy() input_table.sort("start_time") self.values[tel_id] = {} self.start_time[tel_id] = input_table["start_time"].to_value("mjd") self.end_time[tel_id] = input_table["end_time"].to_value("mjd") for column in self.columns: self.values[tel_id][column] = input_table[column]
def _interpolate_chunk(self, tel_id, column, mjd: float) -> float: """ Interpolates overlapping chunks of data preferring earlier chunks if valid Parameters ---------- tel_id : int tel_id for which data is to be interpolated mjd : float Time for which to interpolate the data. """ start_time = self.start_time[tel_id] end_time = self.end_time[tel_id] values = self.values[tel_id][column] # Find the index of the closest preceding start time preceding_index = np.searchsorted(start_time, mjd, side="right") - 1 if preceding_index < 0: return np.nan value = np.nan # Check if the time is within the valid range of the chunk if start_time[preceding_index] <= mjd <= end_time[preceding_index]: value = values[preceding_index] # If an element in the closest preceding chunk has nan, check the next closest chunk for i in range(preceding_index - 1, -1, -1): if start_time[i] <= mjd <= end_time[i]: if value is np.nan: value = values[i] else: value = np.where(np.isnan(value), values[i], value) return value
[docs] class FlatFieldInterpolator(ChunkInterpolator): required_columns = frozenset(["start_time", "end_time", "relative_gain"]) expected_units = {"relative_gain": None} telescope_data_group = "/dl1/monitoring/telescope/flatfield"
[docs] class PedestalInterpolator(ChunkInterpolator): required_columns = frozenset(["start_time", "end_time", "pedestal"]) expected_units = {"pedestal": None} telescope_data_group = "/dl1/monitoring/telescope/pedestal"