Source code for ctapipe.tools.calculate_pixel_stats

"""
Perform statistics calculation from pixel-wise image data
"""

import pathlib

import numpy as np
from astropy.table import vstack

from ctapipe.containers import EventType
from ctapipe.core import Tool
from ctapipe.core.tool import ToolConfigurationError
from ctapipe.core.traits import (
    CInt,
    Path,
    Set,
    Unicode,
    classes_with_traits,
)
from ctapipe.exceptions import InputMissing
from ctapipe.io import HDF5Merger, write_table
from ctapipe.io.hdf5dataformat import (
    DL1_COLUMN_NAMES,
    DL1_PIXEL_HISTOGRAMS_GROUP,
    DL1_PIXEL_STATISTICS_GROUP,
)
from ctapipe.io.tableloader import TableLoader
from ctapipe.monitoring.calculator import PixelStatisticsCalculator

__all__ = ["PixelStatisticsCalculatorTool"]


[docs] class PixelStatisticsCalculatorTool(Tool): """ Perform statistics calculation for pixel-wise image data """ name = "ctapipe-calculate-pixel-statistics" description = "Perform statistics calculation for pixel-wise image data" examples = """ To calculate statistics of pixel-wise image data files: > ctapipe-calculate-pixel-statistics --TableLoader.input_url input.dl1.h5 --output_path /path/monitoring.h5 --overwrite """ allowed_tels = Set( trait=CInt(), default_value=None, allow_none=True, help=( "List of allowed tel_ids, others will be ignored. " "If None, all telescopes in the input stream will be included." ), ).tag(config=True) input_column_name = Unicode( default_value="image", allow_none=False, help="Column name of the pixel-wise image data to calculate statistics", ).tag(config=True) output_path = Path( help="Output filename", default_value=pathlib.Path("monitoring.h5") ).tag(config=True) aliases = { ("i", "input_url"): "TableLoader.input_url", ("o", "output_path"): "PixelStatisticsCalculatorTool.output_path", } flags = { "overwrite": ( {"HDF5Merger": {"overwrite": True}}, "Overwrite existing files", ), "append": ( {"HDF5Merger": {"append": True}}, "Append to existing files", ), } classes = [ TableLoader, ] + classes_with_traits(PixelStatisticsCalculator)
[docs] def setup(self): if self.output_path is None: self.log.critical( "Setting output_path is required (via -o, --output or a config file)." ) self.exit(1) # Read the input data with the 'TableLoader' try: self.input_data = self.enter_context( TableLoader( parent=self, dl1_images=True, # Ensure that dl1 images are read ) ) except InputMissing: self.log.critical( "Specifying TableLoader.input_url is required (via -i, --input or a config file)." ) self.exit(1) # Copy selected tables from the input file to the output file self.log.info( "Copying selected data and metadata to output destination using the HDF5Merger component." ) # Disable the copy of waveforms and images in the HDF5Merger self.log.debug( "Overwriting the default configuration of the HDF5Merger " "component to disable the copy of waveforms and images." ) with HDF5Merger( parent=self, output_path=self.output_path, simulation=False, r0_waveforms=False, r1_waveforms=False, dl1_images=False, processing_statistics=False, merge_strategy="events-single-ob", ) as merger: merger(self.input_data.input_url) # Select a new subarray if the allowed_tels configuration is used self.subarray = ( self.input_data.subarray if self.allowed_tels is None else self.input_data.subarray.select_subarray(self.allowed_tels) ) # Initialization of the statistics calculator self.stats_calculator = PixelStatisticsCalculator( parent=self, subarray=self.subarray )
[docs] def start(self): """Iterate over all telescopes, process their statistics, and save the results.""" # Track the last used table group and name for the summary log statement output_table_name = "" table_group = "" for tel_id in self.subarray.tel_ids: # 1. Load and validate dl1_table = self.input_data.read_telescope_events(telescopes=[tel_id]) if not self._is_valid_table(dl1_table, tel_id): continue # 2. Reshape and calculate stats self._reshape_dl1_dimensions(dl1_table) aggregated_stats = self._process_telescope_stats(dl1_table, tel_id) # 3. Determine output paths and write out results event_type_name = EventType(dl1_table["event_type"][0]).name.lower() output_table_name = f"{event_type_name}_{self.input_column_name}" table_group = ( DL1_PIXEL_HISTOGRAMS_GROUP if "histogram" in aggregated_stats.colnames else DL1_PIXEL_STATISTICS_GROUP ) write_table( aggregated_stats, self.output_path, f"{table_group}/{output_table_name}/tel_{tel_id:03d}", overwrite=self.overwrite, ) if output_table_name and table_group: self.log.info( "DL1 monitoring data was stored in '%s' under '%s'", self.output_path, f"{table_group}/{output_table_name}", )
def _is_valid_table(self, table, tel_id): # Check if the dl1 table is empty and skip the telescope if so if len(table) == 0: self.log.warning( "No dl1 images found for telescope 'tel_id=%d'. Skipping.", tel_id, ) return False # Check if the chunk size does not exceed the table length of the input data if self.stats_calculator.stats_aggregators[ self.stats_calculator.stats_aggregator_type.tel[tel_id] ].chunking.chunk_size > len(table): raise ToolConfigurationError( f"Change --SizeChunking.chunk_size to decrease the chunk size " f"of the aggregation to a maximum of '{len(table)}' (table length of the " f"input data for telescope 'tel_id={tel_id}')." ) # Check if the input column name is in the table if self.input_column_name not in table.colnames: raise ToolConfigurationError( f"Column '{self.input_column_name}' not found " f"in the input data for telescope 'tel_id={tel_id}'." ) return True def _reshape_dl1_dimensions(self, dl1_table): """Check if the dl1 data is gain selected and add an extra dimension.""" for col in DL1_COLUMN_NAMES: if col in dl1_table.colnames and dl1_table[col].ndim == 2: dl1_table[col] = dl1_table[col][:, np.newaxis] def _process_telescope_stats(self, dl1_table, tel_id): """Perform first and (if necessary) second pass statistics calculation.""" # First pass stats = self.stats_calculator.first_pass( table=dl1_table, tel_id=tel_id, col_name=self.input_column_name, ) # Check if 'chunk_shift' is configured for overlapping chunks agg_type = self.stats_calculator.stats_aggregator_type.tel[tel_id] aggregator = self.stats_calculator.stats_aggregators[agg_type] has_chunk_shift = (getattr(aggregator.chunking, "chunk_shift") or 0) > 0 if not has_chunk_shift: return stats # Guard clause: Skip second pass if no faulty chunks exist if not np.any(~stats["is_valid"].data): self.log.info( "No faulty chunks found for telescope 'tel_id=%d'. Skipping second pass.", tel_id, ) return stats # Second pass execution and combining results stats_secondpass = self.stats_calculator.second_pass( table=dl1_table, valid_chunks=stats["is_valid"].data, tel_id=tel_id, col_name=self.input_column_name, ) combined_stats = vstack([stats, stats_secondpass]) combined_stats.sort(["time_start"]) return combined_stats
[docs] def finish(self): self.log.info("Tool is shutting down")
def main(): # Run the tool tool = PixelStatisticsCalculatorTool() tool.run() if __name__ == "main": main()