Source code for ctapipe.io.hdf5monitoringsource

"""
Handles reading of monitoring files
"""

import logging
import warnings
from contextlib import ExitStack

import astropy
import astropy.units as u
import numpy as np
import tables
from astropy.coordinates import AltAz, SkyCoord
from astropy.table import Row
from astropy.utils.decorators import lazyproperty

from ..containers import (
    ArrayEventContainer,
    CameraCalibrationContainer,
    CameraMonitoringContainer,
    PixelStatisticsContainer,
    StatisticsContainer,
    TelescopePointingContainer,
)
from ..core import Provenance
from ..core.traits import List, Path
from ..exceptions import InputMissing
from ..instrument import SubarrayDescription
from .astropy_helpers import read_table
from .hdf5dataformat import (
    DL0_TEL_POINTING_GROUP,
    DL1_CAMERA_COEFFICIENTS_GROUP,
    DL1_PIXEL_STATISTICS_GROUP,
    DL1_TEL_CALIBRATION_GROUP,
)
from .metadata import read_reference_metadata
from .monitoringsource import MonitoringSource
from .monitoringtypes import TELESCOPE_SPECIFIC_MONITORING, MonitoringType

__all__ = ["HDF5MonitoringSource", "get_hdf5_monitoring_types"]

logger = logging.getLogger(__name__)


[docs] def get_hdf5_monitoring_types( h5file: tables.File | str | Path, ) -> tuple[MonitoringType]: """Get the monitoring types present in the hdf5 file""" with ExitStack() as stack: if not isinstance(h5file, tables.File): h5file = stack.enter_context(tables.open_file(h5file)) try: calibration_group = h5file.get_node(DL1_TEL_CALIBRATION_GROUP) # Iterate over enum values of MonitoringType monitoring_types = [ monitoring_type for monitoring_type in [ MonitoringType.PIXEL_STATISTICS, MonitoringType.CAMERA_COEFFICIENTS, ] if monitoring_type.value in calibration_group ] # TODO: Simplify once backwards compatibility is not needed anymore # Check for telescope pointing if DL0_TEL_POINTING_GROUP in h5file.root: monitoring_types.append(MonitoringType.TELESCOPE_POINTINGS) except (KeyError, tables.NoSuchNodeError): # TODO: Simplify once backwards compatibility is not needed anymore # Check for telescope pointing if DL0_TEL_POINTING_GROUP in h5file.root: monitoring_types = [MonitoringType.TELESCOPE_POINTINGS] else: # Return empty tuple if calibration group doesn't exist warnings.warn( f"No monitoring types found in '{h5file.filename}'.", UserWarning ) monitoring_types = [] return tuple(monitoring_types)
[docs] class HDF5MonitoringSource(MonitoringSource): """ Class for reading HDF5 monitoring data as a `~ctapipe.io.MonitoringSource`. This class provides a common interface for accessing HDF5 monitoring data from different monitoring types. An event following the ArrayEventContainer is passed to the `~ctapipe.io.HDF5MonitoringSource.fill_monitoring_container()` method and the different monitoring types are filled into a MonitoringContainer instance. See `~ctapipe.containers.MonitoringContainer` for details. A basic example on how to use the `~ctapipe.io.HDF5MonitoringSource`: >>> from ctapipe.io import SimTelEventSource, HDF5MonitoringSource >>> from ctapipe.utils import get_dataset_path >>> tel_id = 1 >>> event_source = SimTelEventSource( ... input_url="dataset://gamma_prod6_preliminary.simtel.zst", ... allowed_tels={tel_id}, ... max_events=1, ... skip_r1_calibration=True, ... ) >>> file = get_dataset_path("calibpipe_camcalib_single_chunk_i0.1.0.dl1.h5") >>> monitoring_source = HDF5MonitoringSource( ... subarray=event_source.subarray, ... input_files=[file], ... ) >>> for event in event_source: ... # Fill the event data with the monitoring container ... monitoring_source.fill_monitoring_container(event) ... # Print the monitoring information for the camera calibration ... print(event.monitoring.tel[tel_id].camera.coefficients["time"]) ... print(event.monitoring.tel[tel_id].camera.coefficients["factor"]) ... print(event.monitoring.tel[tel_id].camera.coefficients["pedestal_offset"]) ... print(event.monitoring.tel[tel_id].camera.coefficients["time_shift"]) ... print(event.monitoring.tel[tel_id].camera.coefficients["outlier_mask"]) ... print(event.monitoring.tel[tel_id].camera.coefficients["is_valid"]) 40587.000000011576 [[0.01539444 0.01501589 0.0158232 ... 0.01514254 0.01504862 0.01497081] [0.25207437 0.24654945 0.25933876 ... 0.24859268 0.24722679 0.24587582]] [[399.5 398.66666667 399.5 ... 399.25 398.41666667 399. ] [400.08333333 400.41666667 399.91666667 ... 400.25 399.5 399.66666667]] [[ 0.01000023 0.1800003 -0.09000015 ... -0.12999916 0.1800003 0.07999992] [ 0.2800007 -0.27000046 0.11000061 ... 0.04000092 -0.19000053 -0.4699993 ]] [[False False False ... False False False] [False False False ... False False False]] True Attributes ---------- input_files: list of Paths Paths to the input monitoring files. pixel_statistics: dict Dictionary to hold pixel statistics tables camera_coefficients: dict Dictionary to hold camera coefficients telescope_pointings: dict Dictionary to hold telescope pointing information """ input_files = List( Path(exists=True, directory_ok=False), default_value=[], help="List of paths to the HDF5 input files containing monitoring data", ).tag(config=True) def __init__(self, subarray=None, config=None, parent=None, **kwargs): """ MonitoringSource for monitoring files in the standard HDF5 data format Parameters: ----------- subarray : SubarrayDescription or None Optional description of the subarray. If provided, the subarray description should match the one from the monitoring file(s). config : traitlets.loader.Config Configuration specified by config file or cmdline arguments. Used to set traitlet values. Set to None if no configuration to pass. parent: Parent from which the config is used. Mutually exclusive with config kwargs """ super().__init__( subarray=subarray, config=config, parent=parent, **kwargs, ) # Check if input_files list is empty if not self.input_files: raise InputMissing( "No input files provided. Please specify a list of input file(s) " "via configuration by `--HDF5MonitoringSource.input_files` " "or using as an argument <input_files> in the constructor." ) # Initialize attributes self._monitoring_types = set() self._is_simulation = None self._camera_coefficients = {} self._pixel_statistics = {} self._telescope_pointings = {} # Read and validate subarray descriptions self._read_and_validate_subarrays() # Process all monitoring files for file in self.input_files: self._process_single_file(file) def _read_and_validate_subarrays(self): """Read subarray descriptions from files and validate compatibility.""" # Loop over the input files to read the subarray description and check for compatibility # if a subarray is already provided either externally or via a previous monitoring file. subarrays = ([self.subarray] if self.subarray is not None else []) + [ SubarrayDescription.from_hdf(f) for f in self.input_files ] # Check if all subarray descriptions are compatible if not SubarrayDescription.check_matching_subarrays(subarrays): raise IOError("Incompatible subarray descriptions found in input files.") # Set the subarray description self.subarray = subarrays[0] def _process_single_file(self, file): """Process a single monitoring file.""" # Add the file to the provenance Provenance().add_input_file( str(file), role="Monitoring", reference_meta=read_reference_metadata(file), ) with tables.open_file(file) as open_file: # Validate simulation consistency # Determine if the file is from simulation. # First check for the presence of the simulation group. file_is_simulation = False if "simulation" in open_file.root: file_is_simulation = True else: # Check for metadata attribute if simulation group is not present if ( "CTA PRODUCT DATA CATEGORY" in open_file.root._v_attrs and open_file.root._v_attrs["CTA PRODUCT DATA CATEGORY"] == "Sim" ): file_is_simulation = True if self._is_simulation is None: self._is_simulation = file_is_simulation else: if self._is_simulation != file_is_simulation: raise IOError( f"HDF5MonitoringSource: Inconsistent simulation flags found in " f"file '{file}'. Previously processed files have " f"simulation flag set to {self._is_simulation}, while " f"current file has it set to {file_is_simulation}." ) # Get monitoring types from the file file_monitoring_types = get_hdf5_monitoring_types(open_file) # Check for overlapping monitoring types overlapping_types = set(file_monitoring_types) & self._monitoring_types if overlapping_types: overlapping_names = [mt.name for mt in overlapping_types] msg = ( f"File '{file}' contains monitoring types {overlapping_names} " f"that are already present in previously processed files. " f"This may indicate duplicate or overlapping monitoring data." ) self.log.warning(msg) warnings.warn(msg, UserWarning) # Update monitoring types self._monitoring_types.update(file_monitoring_types) # Process each monitoring type if MonitoringType.PIXEL_STATISTICS in file_monitoring_types: self._process_pixel_statistics(file) if MonitoringType.CAMERA_COEFFICIENTS in file_monitoring_types: self._process_camera_coefficients(file) if MonitoringType.TELESCOPE_POINTINGS in file_monitoring_types: self._process_telescope_pointings(file) def _process_pixel_statistics(self, file): """Process pixel statistics monitoring data.""" from ..monitoring import ( FlatfieldImageInterpolator, FlatfieldPeakTimeInterpolator, PedestalImageInterpolator, ) # Open the file to check for the existence of pixel statistics tables with tables.open_file(file, mode="r") as h5file: # Iterate over pixel statistics tables to check for their existence self.pixel_stats_dict = {} for group in h5file.walk_groups(DL1_PIXEL_STATISTICS_GROUP): # Skip the parent group itself if group._v_pathname == DL1_PIXEL_STATISTICS_GROUP: continue # Instantiate the appropriate interpolator based on the table name name = group._v_name if "pedestal_image" in name: self.pixel_stats_dict[name] = PedestalImageInterpolator() elif "flatfield_image" in name: self.pixel_stats_dict[name] = FlatfieldImageInterpolator() elif "flatfield_peak_time" in name: self.pixel_stats_dict[name] = FlatfieldPeakTimeInterpolator() # Process the tables and interpolate the data for tel_id in self.subarray.tel_ids: self._pixel_statistics[tel_id] = {} for name, interpolator in self.pixel_stats_dict.items(): # Read the tables from the monitoring file self._pixel_statistics[tel_id][name] = read_table( file, f"{DL1_PIXEL_STATISTICS_GROUP}/{name}/tel_{tel_id:03d}", ) # Set outliers to NaNs for col in ["mean", "median", "std"]: self._pixel_statistics[tel_id][name][col][ self._pixel_statistics[tel_id][name]["outlier_mask"].data ] = np.nan # Register the table with the interpolator interpolator.add_table(tel_id, self._pixel_statistics[tel_id][name]) def _process_camera_coefficients(self, file): """Process camera coefficients monitoring data.""" # Read the tables from the monitoring file for tel_id in self.subarray.tel_ids: self._camera_coefficients[tel_id] = read_table( file, f"{DL1_CAMERA_COEFFICIENTS_GROUP}/tel_{tel_id:03d}", ) # Convert time column to MJD self._camera_coefficients[tel_id]["time"] = self._camera_coefficients[ tel_id ]["time"].to_value("mjd") # Add index for the retrieval later on self._camera_coefficients[tel_id].add_index("time") def _process_telescope_pointings(self, file): """Process telescope pointing monitoring data.""" from ..monitoring import PointingInterpolator # Instantiate the pointing interpolator self._pointing_interpolator = PointingInterpolator() # Read the pointing data from the file for tel_id in self.subarray.tel_ids: self._telescope_pointings[tel_id] = read_table( file, f"{DL0_TEL_POINTING_GROUP}/tel_{tel_id:03d}", ) # Register the table with the pointing interpolator self._pointing_interpolator.add_table( tel_id, self._telescope_pointings[tel_id] ) @property def is_simulation(self): """ True for files with a simulation group at the root of the file. """ return self._is_simulation @lazyproperty def monitoring_types(self): return self._monitoring_types @lazyproperty def has_pixel_statistics(self): """ True for files that contain pixel statistics """ return MonitoringType.PIXEL_STATISTICS in self.monitoring_types @lazyproperty def has_camera_coefficients(self): """ True for files that contain camera calibration coefficients """ return MonitoringType.CAMERA_COEFFICIENTS in self.monitoring_types @lazyproperty def has_pointings(self): """ True for files that contain pointing information """ return MonitoringType.TELESCOPE_POINTINGS in self.monitoring_types @property def camera_coefficients(self): return self._camera_coefficients @property def pixel_statistics(self): return self._pixel_statistics @property def telescope_pointings(self): return self._telescope_pointings
[docs] def get_table( self, monitoring_type: MonitoringType, tel_id: int = None, **kwargs, ): if monitoring_type not in self.monitoring_types: raise KeyError( f"Monitoring type {monitoring_type} not available in this source. " f"Available types: {self.monitoring_types}" ) if monitoring_type in TELESCOPE_SPECIFIC_MONITORING and tel_id is None: raise TypeError( f"tel_id is required for {monitoring_type.name} monitoring type" ) if monitoring_type == MonitoringType.PIXEL_STATISTICS: subtype = kwargs.get("subtype") if subtype is None: raise KeyError( "subtype parameter is required for PIXEL_STATISTICS. " f"Available subtypes: {list(self.pixel_stats_dict.keys())}" ) if subtype not in self.pixel_stats_dict: raise KeyError( f"Unknown subtype '{subtype}' for PIXEL_STATISTICS. " f"Available subtypes: {list(self.pixel_stats_dict.keys())}" ) return self._pixel_statistics[tel_id][subtype] elif monitoring_type == MonitoringType.CAMERA_COEFFICIENTS: return self._camera_coefficients[tel_id] elif monitoring_type == MonitoringType.TELESCOPE_POINTINGS: return self._telescope_pointings[tel_id]
def _get_telescope_pointing_values( self, tel_id: int, time: astropy.time.Time, ): """ Get telescope pointing values for a given telescope and time. Parameters ---------- tel_id : int Telescope ID time : astropy.time.Time Target timestamp Returns ------- astropy.coordinates.SkyCoord Sky coordinate with altitude and azimuth in AltAz frame """ alt, az = self._pointing_interpolator(tel_id, time) # Get individual telescope location for proper AltAz frame location = self.subarray.tel_earth_locations[tel_id] # This is a naive SkyCoord frame that requires meteorological data # for proper transformations to the celestial coordinates. return SkyCoord( alt=alt, az=az, frame=AltAz(obstime=time, location=location), ) def _get_camera_coefficients_values( self, tel_id: int, time: astropy.time.Time, timestamp_tolerance: u.Quantity, ) -> dict: """ Get camera coefficients values for a given telescope and time. Parameters ---------- tel_id : int Telescope ID time : astropy.time.Time Target timestamp timestamp_tolerance : astropy.units.Quantity Time difference to consider two timestamps equal Returns ------- dict[str, astropy.units.Quantity | numpy.ndarray | bool] Dictionary with camera coefficient data where keys are column names (time, factor, pedestal_offset, time_shift, outlier_mask, is_valid) and values are Quantity objects with appropriate units or arrays """ # For simulation, use first entry if time is None if self.is_simulation and time is None: first_row = self._camera_coefficients[tel_id][0] return dict(zip(first_row.colnames, first_row)) return self._get_table_rows( self._camera_coefficients[tel_id], time, timestamp_tolerance ) def _get_pixel_statistics_values( self, tel_id: int, time: astropy.time.Time, subtype: str, timestamp_tolerance: u.Quantity, ) -> dict: """ Get pixel statistics values for a given telescope and time. Parameters ---------- tel_id : int Telescope ID time : astropy.time.Time Target timestamp subtype : str Subtype of pixel statistics (e.g., 'pedestal_image', 'flatfield_image') timestamp_tolerance : astropy.units.Quantity Time difference to consider two timestamps equal Returns ------- dict[str, astropy.units.Quantity | numpy.ndarray] Dictionary with pixel statistics data where keys are column names (mean, median, std) and values are Quantity objects or arrays. """ if subtype is None: raise KeyError( "subtype parameter is required for PIXEL_STATISTICS. " f"Available subtypes: {list(self.pixel_stats_dict.keys())}" ) if subtype not in self.pixel_stats_dict: raise KeyError( f"Unknown subtype '{subtype}' for PIXEL_STATISTICS. " f"Available subtypes: {list(self.pixel_stats_dict.keys())}" ) interpolator = self.pixel_stats_dict[subtype] # For simulation, use first entry if time is None if self.is_simulation and time is None: from astropy.time import Time time = Time( self._pixel_statistics[tel_id][subtype]["time_start"][0], format="mjd", ) return interpolator(tel_id, time, timestamp_tolerance)
[docs] def get_values( self, monitoring_type: MonitoringType, time: astropy.time.Time, tel_id: int | None = None, **kwargs, ): import astropy.units as u if monitoring_type not in self.monitoring_types: raise KeyError( f"Monitoring type {monitoring_type} not available in this source. " f"Available types: {self.monitoring_types}" ) if monitoring_type in TELESCOPE_SPECIFIC_MONITORING and tel_id is None: raise TypeError( f"tel_id is required for {monitoring_type.name} monitoring type" ) timestamp_tolerance = kwargs.get("timestamp_tolerance", 0.0 * u.s) if monitoring_type == MonitoringType.TELESCOPE_POINTINGS: return self._get_telescope_pointing_values(tel_id, time) elif monitoring_type == MonitoringType.CAMERA_COEFFICIENTS: return self._get_camera_coefficients_values( tel_id, time, timestamp_tolerance ) elif monitoring_type == MonitoringType.PIXEL_STATISTICS: subtype = kwargs.get("subtype") return self._get_pixel_statistics_values( tel_id, time, subtype, timestamp_tolerance )
[docs] def fill_monitoring_container(self, event: ArrayEventContainer): """ Fill the monitoring container for a given event. Parameters ---------- event : ArrayEventContainer The event to fill the monitoring container for. """ # Fill the monitoring container for the event for tel_id in self.subarray.tel_ids: time = None if self.is_simulation else event.trigger.time event.monitoring.tel[tel_id].camera = self.get_camera_monitoring_container( tel_id, time ) # Only overwrite the telescope pointings for observation data if self.has_pointings and not self.is_simulation: event.monitoring.tel[ tel_id ].pointing = self.get_telescope_pointing_container( tel_id, event.trigger.time )
[docs] def get_telescope_pointing_container( self, tel_id: int, time: astropy.time.Time ) -> TelescopePointingContainer: """ Get the telescope pointing container for a given telescope ID and time. Parameters ---------- tel_id : int The telescope ID to retrieve the monitoring data for. time : astropy.time.Time Target timestamp to find the telescope pointing data for. Returns ------- TelescopePointingContainer The telescope pointing container. """ skycoord = self.get_values( MonitoringType.TELESCOPE_POINTINGS, time=time, tel_id=tel_id ) return TelescopePointingContainer(altitude=skycoord.alt, azimuth=skycoord.az)
[docs] def get_camera_monitoring_container( self, tel_id: int, time: astropy.time.Time = None, timestamp_tolerance: u.Quantity = 0.0 * u.s, ) -> CameraMonitoringContainer: """ Retrieve the camera monitoring container with interpolated data. Parameters ---------- tel_id : int The telescope ID to retrieve the monitoring data for. time : astropy.time.Time or None Optional target timestamp(s) to find the camera monitoring data for. The target timestamp(s) are required to interpolate the monitoring data of observation. For monitoring data of simulation, the first entry of the monitoring data is typically used if no timestamp is provided. timestamp_tolerance : astropy.units.Quantity Time difference to consider two timestamps equal. Default is 0 seconds. Returns ------- CameraMonitoringContainer The camera monitoring container. """ if not self.is_simulation and time is None: raise ValueError( "Function argument 'time' must be provided for monitoring data from real observations." ) if self.is_simulation and time is not None: msg = ( "The function argument 'time' is provided, but the monitoring source is of simulated data. " "In simulations, we typically use the first entry of the monitoring data by not providing a timestamp. " "There is no proper time definition in simulated observing blocks. Besides, the simulation toolkit is not " "varying the observation conditions, e.g. raising pedestal noise level, within a given simulation run." ) self.log.warning(msg) warnings.warn(msg, UserWarning) cam_mon_container = CameraMonitoringContainer() if self.has_pixel_statistics: # Fill the the camera monitoring container with the pixel statistics pixel_stats_container = PixelStatisticsContainer() for name in self.pixel_stats_dict.keys(): stats_data = self.get_values( MonitoringType.PIXEL_STATISTICS, time=time, tel_id=tel_id, subtype=name, timestamp_tolerance=timestamp_tolerance, ) # Map any pedestal name to the container field name (unique for pedestal) container_name = "pedestal_image" if "pedestal_image" in name else name pixel_stats_container[container_name] = StatisticsContainer( mean=stats_data["mean"], median=stats_data["median"], std=stats_data["std"], ) cam_mon_container["pixel_statistics"] = pixel_stats_container if self.has_camera_coefficients: table_rows = self.get_values( MonitoringType.CAMERA_COEFFICIENTS, time=time, tel_id=tel_id, timestamp_tolerance=timestamp_tolerance, ) cam_mon_container["coefficients"] = CameraCalibrationContainer( time=table_rows["time"], pedestal_offset=table_rows["pedestal_offset"], factor=table_rows["factor"], time_shift=table_rows["time_shift"], outlier_mask=table_rows["outlier_mask"], is_valid=table_rows["is_valid"], ) return cam_mon_container
def _get_table_rows( self, table: astropy.table.Table, time: astropy.time.Time, timestamp_tolerance: u.Quantity = 0.0 * u.s, ) -> dict: """ Retrieve the rows of the table that corresponds to the target time. Parameters ---------- time : astropy.time.Time Target timestamp(s) to find the interval. table : astropy.table.Table Table containing ordered timestamp data. timestamp_tolerance : astropy.units.Quantity Time difference in seconds to consider two timestamps equal. Default is 0s. Returns ------- table_rows : dict[str, astropy.units.Quantity | numpy.ndarray | Any] Dictionary containing the column names of the original input table as keys and the corresponding data (with units preserved as Quantity objects) for the requested time(s) as values. """ mjd_times = np.atleast_1d(time.to_value("mjd")) table_times = table["time"] # Convert timestamp tolerance to MJD days tolerance_mjd = timestamp_tolerance.to_value("day") # Find the index of the closest preceding start time preceding_indices = np.searchsorted(table_times, mjd_times, side="right") - 1 time_idx = [] for mjd, preceding_index in zip(mjd_times, preceding_indices): # Check if the requested time is before the first chunk if preceding_index < 0: # If the time is before the first chunk and not within tolerance, break if (table_times[0] - tolerance_mjd) > mjd: raise ValueError( f"Out of bounds: Requested timestamp '{mjd} MJD' is before the " f"validity start '{table['time'][0]} MJD' (first entry in the table). " f"Please provide a timestamp within the validity range or increase " f"the 'timestamp_tolerance' (currently set to '{timestamp_tolerance}')." ) else: # Use the first chunk since it's within tolerance preceding_index = 0 # Check upper bounds when requested timestamp is after the last entry if preceding_index >= len(table) - 1: time_idx.append(table["time"][-1]) continue time_idx.append(table["time"][preceding_index]) # Get table row(s) and convert to dictionary table_rows = table.loc[time_idx] if len(time_idx) == 1: table_dict = ( {col: table_rows[col] for col in table_rows.colnames} if isinstance(table_rows, Row) else {col: table_rows[col][0] for col in table_rows.colnames} ) else: table_dict = {col: table_rows[col].data for col in table_rows.colnames} return table_dict