Source code for ctapipe.io.dl1eventsource

import astropy.units as u
from astropy.utils.decorators import lazyproperty
import logging
import numpy as np
import tables

from ..core import Container, Field
from ..instrument import SubarrayDescription
from ..containers import (
    ConcentrationContainer,
    ArrayEventContainer,
    DL1CameraContainer,
    EventIndexContainer,
    CameraHillasParametersContainer,
    HillasParametersContainer,
    IntensityStatisticsContainer,
    LeakageContainer,
    MorphologyContainer,
    SimulationConfigContainer,
    SimulatedShowerContainer,
    SimulatedEventContainer,
    PeakTimeStatisticsContainer,
    CameraTimingParametersContainer,
    TimingParametersContainer,
    TriggerContainer,
    ImageParametersContainer,
)
from .eventsource import EventSource
from .hdf5tableio import HDF5TableReader
from .datalevels import DataLevel
from ..utils import IndexFinder


__all__ = ["DL1EventSource"]


logger = logging.getLogger(__name__)


COMPATIBLE_DL1_VERSIONS = [
    "v1.0.0",
    "v1.0.1",
    "v1.0.2",
    "v1.0.3",
    "v1.1.0",
    "v1.2.0",
    "v2.0.0",
    "v2.1.0",
]


[docs]class DL1EventSource(EventSource): """ Event source for files in the ctapipe DL1 format. For general information about the concept of event sources, take a look at the parent class ctapipe.io.EventSource. To use this event source, create an instance of this class specifying the file to be read. Looping over the EventSource yields events from the _generate_events method. An event equals an ArrayEventContainer instance. See ctapipe.containers.ArrayEventContainer for details. Attributes ---------- input_url: str Path to the input event file. file: tables.File File object obs_ids: list Observation ids of te recorded runs. For unmerged files, this should only contain a single number. subarray: ctapipe.instrument.SubarrayDescription The subarray configuration of the recorded run. datalevels: Tuple One or both of ctapipe.io.datalevels.DataLevel.DL1_IMAGES and ctapipe.io.datalevels.DataLevel.DL1_PARAMETERS depending on the information present in the file. is_simulation: Boolean Whether the events are simulated or observed. simulation_configs: Dict Mapping of obs_id to ctapipe.containers.SimulationConfigContainer if the file contains simulated events. has_simulated_dl1: Boolean Whether the file contains simulated camera images and/or image parameters evaluated on these. """ def __init__(self, input_url=None, config=None, parent=None, **kwargs): """ EventSource for dl1 files in the standard DL1 data format Parameters: ----------- input_url : str Path of the file to load config : traitlets.loader.Config Configuration specified by config file or cmdline arguments. Used to set traitlet values. Set to None if no configuration to pass. parent: Parent from which the config is used. Mutually exclusive with config kwargs """ super().__init__(input_url=input_url, config=config, parent=parent, **kwargs) self.file_ = tables.open_file(self.input_url) self._full_subarray_info = SubarrayDescription.from_hdf(self.input_url) if self.allowed_tels: self._subarray_info = self._full_subarray_info.select_subarray( self.allowed_tels ) else: self._subarray_info = self._full_subarray_info self._simulation_configs = self._parse_simulation_configs() self.datamodel_version = self.file_.root._v_attrs[ "CTA PRODUCT DATA MODEL VERSION" ] params = "parameters" in self.file_.root.dl1.event.telescope images = "images" in self.file_.root.dl1.event.telescope if params and images: self._datalevels = (DataLevel.DL1_IMAGES, DataLevel.DL1_PARAMETERS) elif params: self._datalevels = (DataLevel.DL1_PARAMETERS,) elif images: self._datalevels = (DataLevel.DL1_IMAGES,) def __exit__(self, exc_type, exc_val, exc_tb): self.close()
[docs] def close(self): self.file_.close()
[docs] @staticmethod def is_compatible(file_path): with open(file_path, "rb") as f: magic_number = f.read(8) if magic_number != b"\x89HDF\r\n\x1a\n": return False with tables.open_file(file_path) as f: metadata = f.root._v_attrs if "CTA PRODUCT DATA LEVEL" not in metadata._v_attrnames: return False if "DL1" not in metadata["CTA PRODUCT DATA LEVEL"]: return False if "CTA PRODUCT DATA MODEL VERSION" not in metadata._v_attrnames: return False version = metadata["CTA PRODUCT DATA MODEL VERSION"] if version not in COMPATIBLE_DL1_VERSIONS: logger.error( f"File is DL1 file but has unsupported version {version}" f", supported versions are {COMPATIBLE_DL1_VERSIONS}" ) return False return True
@property def is_simulation(self): """ True for files with a simulation group at the root of the file. """ return "simulation" in self.file_.root @property def has_simulated_dl1(self): """ True for files with telescope-wise event information in the simulation group """ if self.is_simulation: if "telescope" in self.file_.root.simulation.event: return True return False @property def subarray(self): return self._subarray_info @property def datalevels(self): return self._datalevels @lazyproperty def obs_ids(self): return list(np.unique(self.file_.root.dl1.event.subarray.trigger.col("obs_id"))) @property def simulation_config(self): """ Returns the simulation config. In case of a merged file, this will be a list of simulation configs. """ if len(self._simulation_configs) == 1: return next(iter(self._simulation_configs.values())) return self._simulation_configs def _generator(self): yield from self._generate_events() def __len__(self): return len(self.file_.root.dl1.event.subarray.trigger) def _parse_simulation_configs(self): """ Construct a dict of SimulationConfigContainers from the self.file_.root.configuration.simulation.run. These are used to match the correct header to each event """ # Just returning next(reader) would work as long as there are no merged files # The reader ignores obs_id making the setup somewhat tricky # This is ugly but supports multiple headers so each event can have # the correct mcheader assigned by matching the obs_id # Alternatively this becomes a flat list # and the obs_id matching part needs to be done in _generate_events() class ObsIdContainer(Container): container_prefix = "" obs_id = Field(-1) simulation_configs = {} if "simulation" in self.file_.root.configuration: reader = HDF5TableReader(self.file_).read( "/configuration/simulation/run", containers=[SimulationConfigContainer(), ObsIdContainer()], ) for (config, index) in reader: simulation_configs[index.obs_id] = config return simulation_configs def _generate_events(self): """ Yield ArrayEventContainer to iterate through events. """ data = ArrayEventContainer() # Maybe take some other metadata, but there are still some 'unknown' # written out by the stage1 tool data.meta["origin"] = self.file_.root._v_attrs["CTA PROCESS TYPE"] data.meta["input_url"] = self.input_url data.meta["max_events"] = self.max_events self.reader = HDF5TableReader(self.file_) if DataLevel.DL1_IMAGES in self.datalevels: image_readers = { tel.name: self.reader.read( f"/dl1/event/telescope/images/{tel.name}", DL1CameraContainer(), ignore_columns={"parameters"}, ) for tel in self.file_.root.dl1.event.telescope.images } if self.has_simulated_dl1: simulated_image_iterators = { tel.name: self.file_.root.simulation.event.telescope.images[ tel.name ].iterrows() for tel in self.file_.root.simulation.event.telescope.images } if DataLevel.DL1_PARAMETERS in self.datalevels: param_readers = { tel.name: self.reader.read( f"/dl1/event/telescope/parameters/{tel.name}", containers=[ ( HillasParametersContainer() if (self.datamodel_version >= "v2.1.0") else CameraHillasParametersContainer(prefix="hillas") ), ( TimingParametersContainer() if (self.datamodel_version >= "v2.1.0") else CameraTimingParametersContainer(prefix="timing") ), LeakageContainer(), ConcentrationContainer(), MorphologyContainer(), IntensityStatisticsContainer(), PeakTimeStatisticsContainer(), ], prefixes=True, ) for tel in self.file_.root.dl1.event.telescope.parameters } if self.has_simulated_dl1: simulated_param_readers = { tel.name: self.reader.read( f"/simulation/event/telescope/parameters/{tel.name}", containers=[ ( HillasParametersContainer() if (self.datamodel_version >= "v2.1.0") else CameraHillasParametersContainer(prefix="hillas") ), LeakageContainer(), ConcentrationContainer(), MorphologyContainer(), IntensityStatisticsContainer(), ], prefixes=True, ) for tel in self.file_.root.dl1.event.telescope.parameters } if self.is_simulation: # simulated shower wide information mc_shower_reader = HDF5TableReader(self.file_).read( "/simulation/event/subarray/shower", SimulatedShowerContainer(), prefixes="true", ) data.simulation = SimulatedEventContainer() # Setup iterators for the array events events = HDF5TableReader(self.file_).read( "/dl1/event/subarray/trigger", [TriggerContainer(), EventIndexContainer()], ignore_columns={"tel"}, ) array_pointing_finder = IndexFinder( self.file_.root.dl1.monitoring.subarray.pointing.col("time") ) tel_pointing_finder = { tel.name: IndexFinder(tel.col("time")) for tel in self.file_.root.dl1.monitoring.telescope.pointing } for counter, (trigger, index) in enumerate(events): data.dl1.tel.clear() if self.is_simulation: data.simulation.tel.clear() data.pointing.tel.clear() data.trigger.tel.clear() data.count = counter data.trigger = trigger data.index = index data.trigger.tels_with_trigger = self._full_subarray_info.tel_mask_to_tel_ids( data.trigger.tels_with_trigger ) if self.allowed_tels: data.trigger.tels_with_trigger = np.intersect1d( data.trigger.tels_with_trigger, np.array(list(self.allowed_tels)) ) # Maybe there is a simpler way to do this # Beware: tels_with_trigger contains all triggered telescopes whereas # the telescope trigger table contains only the subset of # allowed_tels given during the creation of the dl1 file for i in self.file_.root.dl1.event.telescope.trigger.where( f"(obs_id=={data.index.obs_id}) & (event_id=={data.index.event_id})" ): if self.allowed_tels and i["tel_id"] not in self.allowed_tels: continue if self.datamodel_version == "v1.0.0": data.trigger.tel[i["tel_id"]].time = i["telescopetrigger_time"] else: data.trigger.tel[i["tel_id"]].time = i["time"] self._fill_array_pointing(data, array_pointing_finder) self._fill_telescope_pointing(data, tel_pointing_finder) if self.is_simulation: data.simulation.shower = next(mc_shower_reader) for tel in data.trigger.tel.keys(): key = f"tel_{tel:03d}" if self.allowed_tels and tel not in self.allowed_tels: continue if self.has_simulated_dl1: simulated = data.simulation.tel[tel] if DataLevel.DL1_IMAGES in self.datalevels: if key not in image_readers: logger.debug( f"Triggered telescope {tel} is missing " "from the image table." ) continue data.dl1.tel[tel] = next(image_readers[key]) if self.has_simulated_dl1: if key not in simulated_image_iterators: logger.warning( f"Triggered telescope {tel} is missing " "from the simulated image table, but was present at the " "reconstructed image table." ) continue simulated_image_row = next( simulated_image_iterators[f"tel_{tel:03d}"] ) simulated.true_image = simulated_image_row["true_image"] if DataLevel.DL1_PARAMETERS in self.datalevels: if f"tel_{tel:03d}" not in param_readers.keys(): logger.debug( f"Triggered telescope {tel} is missing " "from the parameters table." ) continue # Is there a smarter way to unpack this? # Best would probbaly be if we could directly read # into the ImageParametersContainer params = next(param_readers[f"tel_{tel:03d}"]) data.dl1.tel[tel].parameters = ImageParametersContainer( hillas=params[0], timing=params[1], leakage=params[2], concentration=params[3], morphology=params[4], intensity_statistics=params[5], peak_time_statistics=params[6], ) if self.has_simulated_dl1: if f"tel_{tel:03d}" not in param_readers.keys(): logger.debug( f"Triggered telescope {tel} is missing " "from the simulated parameters table, but was " "present at the reconstructed parameters table." ) continue simulated_params = next( simulated_param_readers[f"tel_{tel:03d}"] ) simulated.true_parameters = ImageParametersContainer( hillas=simulated_params[0], leakage=simulated_params[1], concentration=simulated_params[2], morphology=simulated_params[3], intensity_statistics=simulated_params[4], ) yield data def _fill_array_pointing(self, data, array_pointing_finder): """ Fill the array pointing information of a given event """ # Only unique pointings are stored, so reader.read() wont work as easily # Thats why we match the pointings based on trigger time closest_time_index = array_pointing_finder.closest(data.trigger.time.mjd) array_pointing = self.file_.root.dl1.monitoring.subarray.pointing data.pointing.array_azimuth = u.Quantity( array_pointing[closest_time_index]["array_azimuth"], array_pointing.attrs["array_azimuth_UNIT"], ) data.pointing.array_altitude = u.Quantity( array_pointing[closest_time_index]["array_altitude"], array_pointing.attrs["array_altitude_UNIT"], ) data.pointing.array_ra = u.Quantity( array_pointing[closest_time_index]["array_ra"], array_pointing.attrs["array_ra_UNIT"], ) data.pointing.array_dec = u.Quantity( array_pointing[closest_time_index]["array_dec"], array_pointing.attrs["array_dec_UNIT"], ) def _fill_telescope_pointing(self, data, tel_pointing_finder): """ Fill the telescope pointing information of a given event """ # Same comments as to _fill_array_pointing apply for tel in data.trigger.tel.keys(): if self.allowed_tels and tel not in self.allowed_tels: continue tel_pointing_table = self.file_.root.dl1.monitoring.telescope.pointing[ f"tel_{tel:03d}" ] closest_time_index = tel_pointing_finder[f"tel_{tel:03d}"].closest( data.trigger.tel[tel].time ) pointing_telescope = tel_pointing_table data.pointing.tel[tel].azimuth = u.Quantity( pointing_telescope[closest_time_index]["azimuth"], pointing_telescope.attrs["azimuth_UNIT"], ) data.pointing.tel[tel].altitude = u.Quantity( pointing_telescope[closest_time_index]["altitude"], pointing_telescope.attrs["altitude_UNIT"], )