Source code for ctapipe.tools.process

"""
Generate DL1 (a or b) output files in HDF5 format from {R0,R1,DL0} inputs.
"""
# pylint: disable=W0201
import sys

from tqdm.auto import tqdm

from ..calib import CameraCalibrator, GainSelector
from ..core import QualityQuery, Tool
from ..core.traits import Bool, classes_with_traits, flag
from ..image import ImageCleaner, ImageModifier, ImageProcessor
from ..image.extractor import ImageExtractor
from ..image.muon import MuonProcessor
from ..instrument import SoftwareTrigger
from ..io import (
    DataLevel,
    DataWriter,
    EventSource,
    metadata,
    write_table,
)
from ..io.datawriter import DATA_MODEL_VERSION
from ..reco import Reconstructor, ShowerProcessor
from ..utils import EventTypeFilter

COMPATIBLE_DATALEVELS = [
    DataLevel.R1,
    DataLevel.DL0,
    DataLevel.DL1_IMAGES,
    DataLevel.DL1_PARAMETERS,
]

__all__ = ["ProcessorTool"]


[docs] class ProcessorTool(Tool): """ Process data from lower-data levels up to DL1 and DL2, including image extraction and optionally image parameterization as well as muon analysis and shower reconstruction. Note that the muon analysis and shower reconstruction both depend on parametrized images and therefore compute image parameters even if DataWriter.write_dl1_parameters=False in case these are not already present in the input file. """ name = "ctapipe-process" description = ( __doc__ + f" This currently uses data model version {DATA_MODEL_VERSION}" ) examples = """ To process data with all default values: > ctapipe-process --input events.simtel.gz --output events.dl1.h5 --progress Or use an external configuration file, where you can specify all options: > ctapipe-process --config stage1_config.json --progress The config file should be in JSON or python format (see traitlets docs). For an example, see ctapipe/examples/stage1_config.json in the main code repo. """ progress_bar = Bool( help="show progress bar during processing", default_value=False ).tag(config=True) force_recompute_dl1 = Bool( help="Enforce dl1 recomputation even if already present in the input file", default_value=False, ).tag(config=True) force_recompute_dl2 = Bool( help="Enforce dl2 recomputation even if already present in the input file", default_value=False, ).tag(config=True) aliases = { ("i", "input"): "EventSource.input_url", ("o", "output"): "DataWriter.output_path", ("t", "allowed-tels"): "EventSource.allowed_tels", ("m", "max-events"): "EventSource.max_events", "reconstructor": "ShowerProcessor.reconstructor_types", "image-cleaner-type": "ImageProcessor.image_cleaner_type", } flags = { "overwrite": ( {"DataWriter": {"overwrite": True}}, "Overwrite output file if it exists", ), **flag( "progress", "ProcessorTool.progress_bar", "show a progress bar during event processing", "don't show a progress bar during event processing", ), **flag( "recompute-dl1", "ProcessorTool.force_recompute_dl1", "Enforce DL1 recomputation even if already present in the input file", "Only compute DL1 if there are no DL1b parameters in the file", ), **flag( "recompute-dl2", "ProcessorTool.force_recompute_dl2", "Enforce DL2 recomputation even if already present in the input file", "Only compute DL2 if there is no shower reconstruction in the file", ), **flag( "write-images", "DataWriter.write_dl1_images", "store DL1/Event/Telescope images in output", "don't store DL1/Event/Telescope images in output", ), **flag( "write-parameters", "DataWriter.write_dl1_parameters", "store DL1/Event/Telescope parameters in output", "don't store DL1/Event/Telescope parameters in output", ), **flag( "write-showers", "DataWriter.write_dl2", "store DL2/Event parameters in output", "don't DL2/Event parameters in output", ), **flag( "write-index-tables", "DataWriter.write_index_tables", "generate PyTables index tables for the parameter and image datasets", ), **flag( "write-muon-parameters", "DataWriter.write_muon_parameters", "store DL1/Event/Telescope muon parameters in output", "don't store DL1/Event/Telescope muon parameters in output", ), "camera-frame": ( {"ImageProcessor": {"use_telescope_frame": False}}, "Use camera frame for image parameters instead of telescope frame", ), } classes = ( [ CameraCalibrator, DataWriter, ImageProcessor, MuonProcessor, ShowerProcessor, metadata.Instrument, metadata.Contact, SoftwareTrigger, ] + classes_with_traits(EventSource) + classes_with_traits(ImageCleaner) + classes_with_traits(ImageExtractor) + classes_with_traits(GainSelector) + classes_with_traits(QualityQuery) + classes_with_traits(ImageModifier) + classes_with_traits(EventTypeFilter) + classes_with_traits(Reconstructor) )
[docs] def setup(self): # setup components: self.event_source = self.enter_context(EventSource(parent=self)) if not self.event_source.has_any_datalevel(COMPATIBLE_DATALEVELS): self.log.critical( "%s needs the EventSource to provide at least one of these datalevels: %s" ", %s provides only %s", self.name, COMPATIBLE_DATALEVELS, self.event_source, self.event_source.datalevels, ) sys.exit(1) subarray = self.event_source.subarray self.software_trigger = SoftwareTrigger(parent=self, subarray=subarray) self.calibrate = CameraCalibrator(parent=self, subarray=subarray) self.process_images = ImageProcessor(subarray=subarray, parent=self) self.process_shower = ShowerProcessor( subarray=subarray, atmosphere_profile=self.event_source.atmosphere_density_profile, parent=self, ) self.write = self.enter_context( DataWriter(event_source=self.event_source, parent=self) ) self.process_muons = None if self.should_compute_muon_parameters: self.process_muons = MuonProcessor(subarray=subarray, parent=self) self.event_type_filter = EventTypeFilter(parent=self)
@property def should_compute_dl2(self): """returns true if we should compute DL2 info""" if self.force_recompute_dl2: return True return self.write.write_dl2 @property def should_compute_dl1(self): """returns true if we should compute DL1 info""" if self.force_recompute_dl1: return True if DataLevel.DL1_PARAMETERS in self.event_source.datalevels: return False return ( self.write.write_dl1_parameters or self.should_compute_dl2 or self.should_compute_muon_parameters ) @property def should_calibrate(self): """returns true if data should be calibrated""" if self.force_recompute_dl1: return True if ( self.write.write_dl1_images and DataLevel.DL1_IMAGES not in self.event_source.datalevels ): return True if self.should_compute_dl1: return DataLevel.DL1_IMAGES not in self.event_source.datalevels return False @property def should_compute_muon_parameters(self): """returns true if we should compute muon parameters info""" if self.write.write_muon_parameters: return True return False def _write_processing_statistics(self): """write out the event selection stats, etc.""" # NOTE: don't remove this, not part of DataWriter if self.should_compute_dl1: image_stats = self.process_images.check_image.to_table(functions=True) write_table( image_stats, self.write.output_path, path="/dl1/service/image_statistics", append=True, ) if self.should_compute_dl2: reconstructors = self.process_shower.reconstructors reconstructor_names = self.process_shower.reconstructor_types for reconstructor_name, reconstructor in zip( reconstructor_names, reconstructors ): write_table( reconstructor.quality_query.to_table(functions=True), self.write.output_path, f"/dl2/service/tel_event_statistics/{reconstructor_name}", append=True, )
[docs] def start(self): """ Process events """ self.log.info("applying calibration: %s", self.should_calibrate) self.log.info("(re)compute DL1: %s", self.should_compute_dl1) self.log.info("(re)compute DL2: %s", self.should_compute_dl2) self.log.info( "compute muon parameters: %s", self.should_compute_muon_parameters ) self.event_source.subarray.info(printer=self.log.info) for event in tqdm( self.event_source, desc=self.event_source.__class__.__name__, total=self.event_source.max_events, unit="ev", disable=not self.progress_bar, ): self.log.debug("Processing event_id=%s", event.index.event_id) if not self.event_type_filter(event): continue if not self.software_trigger(event): self.log.debug( "Skipping event %i due to software trigger", event.index.event_id ) continue if self.should_calibrate: self.calibrate(event) if self.should_compute_dl1: self.process_images(event) if self.should_compute_muon_parameters: self.process_muons(event) if self.should_compute_dl2: self.process_shower(event) self.write(event)
[docs] def finish(self): """ Last steps after processing events. """ shower_dists = self.event_source.simulated_shower_distributions self.write.write_simulated_shower_distributions(shower_dists) self._write_processing_statistics()
def main(): """run the tool""" tool = ProcessorTool() tool.run() if __name__ == "__main__": main()