Source code for ctapipe.io.simteleventsource

import warnings
from gzip import GzipFile
from io import BufferedReader
from pathlib import Path

import numpy as np
from astropy import units as u
from astropy.coordinates import Angle
from astropy.time import Time
from eventio.file_types import is_eventio
from eventio.simtel.simtelfile import SimTelFile

from ..calib.camera.gainselection import GainSelector
from ..containers import (
    ArrayEventContainer,
    EventType,
    SimulatedEventContainer,
    SimulationConfigContainer,
    SimulatedCameraContainer,
    SimulatedShowerContainer,
    TelescopePointingContainer,
    TelescopeTriggerContainer,
)
from ..coordinates import CameraFrame
from ..core.traits import (
    Bool,
    Float,
    CaselessStrEnum,
    create_class_enum_trait,
    Undefined,
)
from ..instrument import (
    CameraDescription,
    CameraGeometry,
    CameraReadout,
    OpticsDescription,
    SubarrayDescription,
    TelescopeDescription,
)
from ..instrument.camera import UnknownPixelShapeWarning
from ..instrument.guess import unknown_telescope, guess_telescope
from .datalevels import DataLevel
from .eventsource import EventSource

X_MAX_UNIT = u.g / (u.cm ** 2)


__all__ = ["SimTelEventSource"]

# Mapping of SimTelArray Calibration trigger types to EventType:
# from simtelarray: type Dark (0), pedestal (1), in-lid LED (2) or laser/LED (3+) data.
SIMTEL_TO_CTA_EVENT_TYPE = {
    0: EventType.DARK_PEDESTAL,
    1: EventType.SKY_PEDESTAL,
    2: EventType.SINGLE_PE,
    3: EventType.FLATFIELD,
    -1: EventType.OTHER_CALIBRATION,
}


NANOSECONDS_PER_DAY = (1 * u.day).to_value(u.ns)


def parse_simtel_time(simtel_time):
    """Convert a unix time second / nanosecond tuple into astropy.time.Time"""
    return Time(
        simtel_time[0], simtel_time[1] * 1e-9, format="unix", scale="utc"  # ns to s
    )


def build_camera(cam_settings, pixel_settings, telescope, frame):
    pixel_shape = cam_settings["pixel_shape"][0]
    try:
        pix_type, pix_rotation = CameraGeometry.simtel_shape_to_type(pixel_shape)
    except ValueError:
        warnings.warn(
            f"Unkown pixel_shape {pixel_shape} for camera_type {telescope.camera_name}",
            UnknownPixelShapeWarning,
        )
        pix_type = "hexagon"
        pix_rotation = "0d"

    geometry = CameraGeometry(
        telescope.camera_name,
        pix_id=np.arange(cam_settings["n_pixels"]),
        pix_x=u.Quantity(cam_settings["pixel_x"], u.m),
        pix_y=u.Quantity(cam_settings["pixel_y"], u.m),
        pix_area=u.Quantity(cam_settings["pixel_area"], u.m ** 2),
        pix_type=pix_type,
        pix_rotation=pix_rotation,
        cam_rotation=-Angle(cam_settings["cam_rot"], u.rad),
        apply_derotation=True,
        frame=frame,
    )
    readout = CameraReadout(
        telescope.camera_name,
        sampling_rate=u.Quantity(1 / pixel_settings["time_slice"], u.GHz),
        reference_pulse_shape=pixel_settings["ref_shape"].astype("float64", copy=False),
        reference_pulse_sample_width=u.Quantity(
            pixel_settings["ref_step"], u.ns, dtype="float64"
        ),
    )

    return CameraDescription(
        camera_name=telescope.camera_name, geometry=geometry, readout=readout
    )


def apply_simtel_r1_calibration(
    r0_waveforms, pedestal, dc_to_pe, gain_selector, calib_scale=1.0, calib_shift=0.0
):
    """
    Perform the R1 calibration for R0 simtel waveforms. This includes:
        - Gain selection
        - Pedestal subtraction
        - Conversion of samples into units proportional to photoelectrons
          (If the full signal in the waveform was integrated, then the resulting
          value would be in photoelectrons.)
          (Also applies flat-fielding)

    Parameters
    ----------
    r0_waveforms : ndarray
        Raw ADC waveforms from a simtel file. All gain channels available.
        Shape: (n_channels, n_pixels, n_samples)
    pedestal : ndarray
        Pedestal stored in the simtel file for each gain channel
        Shape: (n_channels, n_pixels)
    dc_to_pe : ndarray
        Conversion factor between R0 waveform samples and ~p.e., stored in the
        simtel file for each gain channel
        Shape: (n_channels, n_pixels)
    gain_selector : ctapipe.calib.camera.gainselection.GainSelector
    calib_scale : float
        Extra global scale factor for calibration.
        Conversion factor to transform the integrated charges
        (in ADC counts) into number of photoelectrons on top of dc_to_pe.
        Defaults to no scaling.
    calib_shift: float
        Shift the resulting R1 output in p.e. for simulating miscalibration.
        Defaults to no shift.

    Returns
    -------
    r1_waveforms : ndarray
        Calibrated waveforms
        Shape: (n_pixels, n_samples)
    selected_gain_channel : ndarray
        The gain channel selected for each pixel
        Shape: (n_pixels)
    """
    n_channels, n_pixels, n_samples = r0_waveforms.shape
    ped = pedestal[..., np.newaxis]
    DC_to_PHE = dc_to_pe[..., np.newaxis]
    gain = DC_to_PHE * calib_scale
    r1_waveforms = (r0_waveforms - ped) * gain + calib_shift
    if n_channels == 1:
        selected_gain_channel = np.zeros(n_pixels, dtype=np.int8)
        r1_waveforms = r1_waveforms[0]
    else:
        selected_gain_channel = gain_selector(r0_waveforms)
        r1_waveforms = r1_waveforms[selected_gain_channel, np.arange(n_pixels)]
    return r1_waveforms, selected_gain_channel


[docs]class SimTelEventSource(EventSource): """ Read events from a SimTelArray data file (in EventIO format).""" skip_calibration_events = Bool(True, help="Skip calibration events").tag( config=True ) back_seekable = Bool( False, help=( "Require the event source to be backwards seekable." " This will reduce in slower read speed for gzipped files" " and is not possible for zstd compressed files" ), ).tag(config=True) focal_length_choice = CaselessStrEnum( ["nominal", "effective"], default_value="nominal", help=( "if both nominal and effective focal lengths are available in the " "SimTelArray file, which one to use when constructing the " "SubarrayDescription (which will be used in CameraFrame to TelescopeFrame " "coordinate transforms. The 'nominal' focal length is the one used during " "the simulation, the 'effective' focal length is computed using specialized " "ray-tracing from a point light source" ), ).tag(config=True) gain_selector_type = create_class_enum_trait( base_class=GainSelector, default_value="ThresholdGainSelector" ).tag(config=True) calib_scale = Float( default_value=1.0, help=( "Factor to transform ADC counts into number of photoelectrons." " Corrects the DC_to_PHE factor." ), ).tag(config=True) calib_shift = Float( default_value=0.0, help=( "Factor to shift the R1 photoelectron samples. " "Can be used to simulate mis-calibration." ), ).tag(config=True) def __init__(self, input_url=Undefined, config=None, parent=None, **kwargs): """ EventSource for simtelarray files using the pyeventio library. Parameters ---------- config : traitlets.loader.Config Configuration specified by config file or cmdline arguments. Used to set traitlet values. Set to None if no configuration to pass. tool : ctapipe.core.Tool Tool executable that is calling this component. Passes the correct logger to the component. Set to None if no Tool to pass. gain_selector : ctapipe.calib.camera.gainselection.GainSelector The GainSelector to use. If None, then ThresholdGainSelector will be used. kwargs """ super().__init__(input_url=input_url, config=config, parent=parent, **kwargs) self.file_ = SimTelFile( self.input_url.expanduser(), allowed_telescopes=self.allowed_tels, skip_calibration=self.skip_calibration_events, zcat=not self.back_seekable, ) if self.back_seekable and self.is_stream: raise IOError("back seekable was required but not possible for inputfile") self._subarray_info = self.prepare_subarray_info( self.file_.telescope_descriptions, self.file_.header ) self._simulation_config = self._parse_simulation_header() self.start_pos = self.file_.tell() self.gain_selector = GainSelector.from_name( self.gain_selector_type, parent=self ) self.log.debug(f"Using gain selector {self.gain_selector}") def __exit__(self, exc_type, exc_val, exc_tb): self.close()
[docs] def close(self): self.file_.close()
@property def is_simulation(self): return True @property def datalevels(self): return (DataLevel.R0, DataLevel.R1) @property def obs_ids(self): # ToDo: This does not support merged simtel files! return [self.file_.header["run"]] @property def simulation_config(self) -> SimulationConfigContainer: return self._simulation_config @property def is_stream(self): return not isinstance(self.file_._filehandle, (BufferedReader, GzipFile))
[docs] def prepare_subarray_info(self, telescope_descriptions, header): """ Constructs a SubarrayDescription object from the ``telescope_descriptions`` given by ``SimTelFile`` Parameters ---------- telescope_descriptions: dict telescope descriptions as given by ``SimTelFile.telescope_descriptions`` header: dict header as returned by ``SimTelFile.header`` Returns ------- SubarrayDescription : instrumental information """ tel_descriptions = {} # tel_id : TelescopeDescription tel_positions = {} # tel_id : TelescopeDescription for tel_id, telescope_description in telescope_descriptions.items(): cam_settings = telescope_description["camera_settings"] pixel_settings = telescope_description["pixel_settings"] n_pixels = cam_settings["n_pixels"] focal_length = u.Quantity(cam_settings["focal_length"], u.m) mirror_area = u.Quantity(cam_settings["mirror_area"], u.m ** 2) if self.focal_length_choice == "effective": try: focal_length = u.Quantity( cam_settings["effective_focal_length"], u.m ) except KeyError as err: raise RuntimeError( f"the SimTelEventSource option 'focal_length_choice' was set to " f"{self.focal_length_choice}, but the effective focal length " f"was not present in the file. ({err})" ) try: telescope = guess_telescope(n_pixels, focal_length) except ValueError: telescope = unknown_telescope(mirror_area, n_pixels) optics = OpticsDescription( name=telescope.name, num_mirrors=telescope.n_mirrors, equivalent_focal_length=focal_length, mirror_area=mirror_area, num_mirror_tiles=cam_settings["n_mirrors"], ) camera = build_camera( cam_settings, pixel_settings, telescope, frame=CameraFrame(focal_length=optics.equivalent_focal_length), ) tel_descriptions[tel_id] = TelescopeDescription( name=telescope.name, tel_type=telescope.type, optics=optics, camera=camera, ) tel_idx = np.where(header["tel_id"] == tel_id)[0][0] tel_positions[tel_id] = header["tel_pos"][tel_idx] * u.m subarray = SubarrayDescription( name="MonteCarloArray", tel_positions=tel_positions, tel_descriptions=tel_descriptions, ) if self.allowed_tels: subarray = subarray.select_subarray(self.allowed_tels) return subarray
[docs] @staticmethod def is_compatible(file_path): return is_eventio(Path(file_path).expanduser())
@property def subarray(self): return self._subarray_info def _generator(self): if self.file_.tell() > self.start_pos: self.file_._next_header_pos = 0 warnings.warn("Backseeking to start of file.") try: yield from self._generate_events() except EOFError: msg = 'EOFError reading from "{input_url}". Might be truncated'.format( input_url=self.input_url ) self.log.warning(msg) warnings.warn(msg) def _generate_events(self): data = ArrayEventContainer() data.simulation = SimulatedEventContainer() data.meta["origin"] = "hessio" data.meta["input_url"] = self.input_url data.meta["max_events"] = self.max_events self._fill_array_pointing(data) for counter, array_event in enumerate(self.file_): event_id = array_event.get("event_id", -1) obs_id = self.file_.header["run"] data.count = counter data.index.obs_id = obs_id data.index.event_id = event_id self._fill_trigger_info(data, array_event) if data.trigger.event_type == EventType.SUBARRAY: self._fill_simulated_event_information(data, array_event) # this should be done in a nicer way to not re-allocate the # data each time (right now it's just deleted and garbage # collected) data.r0.tel.clear() data.r1.tel.clear() data.dl0.tel.clear() data.dl1.tel.clear() data.pointing.tel.clear() data.simulation.tel.clear() telescope_events = array_event["telescope_events"] tracking_positions = array_event["tracking_positions"] for tel_id, telescope_event in telescope_events.items(): adc_samples = telescope_event.get("adc_samples") if adc_samples is None: adc_samples = telescope_event["adc_sums"][:, :, np.newaxis] n_gains, n_pixels, n_samples = adc_samples.shape true_image = ( array_event.get("photoelectrons", {}) .get(tel_id - 1, {}) .get("photoelectrons", None) ) data.simulation.tel[tel_id] = SimulatedCameraContainer( true_image=true_image ) data.pointing.tel[tel_id] = self._fill_event_pointing( tracking_positions[tel_id] ) r0 = data.r0.tel[tel_id] r1 = data.r1.tel[tel_id] r0.waveform = adc_samples cam_mon = array_event["camera_monitorings"][tel_id] pedestal = cam_mon["pedestal"] / cam_mon["n_ped_slices"] dc_to_pe = array_event["laser_calibrations"][tel_id]["calib"] # fill dc_to_pe and pedestal_per_sample info into monitoring # container mon = data.mon.tel[tel_id] mon.calibration.dc_to_pe = dc_to_pe mon.calibration.pedestal_per_sample = pedestal r1.waveform, r1.selected_gain_channel = apply_simtel_r1_calibration( adc_samples, pedestal, dc_to_pe, self.gain_selector, self.calib_scale, self.calib_shift, ) # get time_shift from laser calibration time_calib = array_event["laser_calibrations"][tel_id]["tm_calib"] pix_index = np.arange(n_pixels) dl1_calib = data.calibration.tel[tel_id].dl1 dl1_calib.time_shift = time_calib[r1.selected_gain_channel, pix_index] yield data @staticmethod def _fill_event_pointing(tracking_position): azimuth_raw = tracking_position["azimuth_raw"] altitude_raw = tracking_position["altitude_raw"] azimuth_cor = tracking_position.get("azimuth_cor", np.nan) altitude_cor = tracking_position.get("altitude_cor", np.nan) # take pointing corrected position if available if np.isnan(azimuth_cor): azimuth = u.Quantity(azimuth_raw, u.rad, copy=False) else: azimuth = u.Quantity(azimuth_cor, u.rad, copy=False) # take pointing corrected position if available if np.isnan(altitude_cor): altitude = u.Quantity(altitude_raw, u.rad, copy=False) else: altitude = u.Quantity(altitude_cor, u.rad, copy=False) return TelescopePointingContainer(azimuth=azimuth, altitude=altitude) def _fill_trigger_info(self, data, array_event): trigger = array_event["trigger_information"] if array_event["type"] == "data": data.trigger.event_type = EventType.SUBARRAY elif array_event["type"] == "calibration": # if using eventio >= 1.1.1, we can use the calibration_type data.trigger.event_type = SIMTEL_TO_CTA_EVENT_TYPE.get( array_event.get("calibration_type", -1), EventType.OTHER_CALIBRATION ) else: data.trigger.event_type = EventType.UNKNOWN data.trigger.tels_with_trigger = trigger["triggered_telescopes"] if self.allowed_tels: data.trigger.tels_with_trigger = np.intersect1d( data.trigger.tels_with_trigger, self.subarray.tel_ids, assume_unique=True, ) central_time = parse_simtel_time(trigger["gps_time"]) data.trigger.time = central_time for tel_id, time in zip( trigger["triggered_telescopes"], trigger["trigger_times"] ): if self.allowed_tels and tel_id not in self.allowed_tels: continue # telescope time is relative to central trigger in ns time = Time( central_time.jd1, central_time.jd2 + time / NANOSECONDS_PER_DAY, scale=central_time.scale, format="jd", ) # triggered pixel info n_trigger_pixels = -1 trigger_pixels = None tel_event = array_event["telescope_events"].get(tel_id) if tel_event: # code 0 = trigger pixels pixel_list = tel_event["pixel_lists"].get(0) if pixel_list: n_trigger_pixels = pixel_list["pixels"] trigger_pixels = pixel_list["pixel_list"] trigger = data.trigger.tel[tel_id] = TelescopeTriggerContainer( time=time, n_trigger_pixels=n_trigger_pixels, trigger_pixels=trigger_pixels, ) def _fill_array_pointing(self, data): if self.file_.header["tracking_mode"] == 0: az, alt = self.file_.header["direction"] data.pointing.array_altitude = u.Quantity(alt, u.rad) data.pointing.array_azimuth = u.Quantity(az, u.rad) else: ra, dec = self.file_.header["direction"] data.pointing.array_ra = u.Quantity(ra, u.rad) data.pointing.array_dec = u.Quantity(dec, u.rad) def _parse_simulation_header(self): mc_run_head = self.file_.mc_run_headers[-1] return SimulationConfigContainer( corsika_version=mc_run_head["shower_prog_vers"], simtel_version=mc_run_head["detector_prog_vers"], energy_range_min=mc_run_head["E_range"][0] * u.TeV, energy_range_max=mc_run_head["E_range"][1] * u.TeV, prod_site_B_total=mc_run_head["B_total"] * u.uT, prod_site_B_declination=Angle(mc_run_head["B_declination"], u.rad), prod_site_B_inclination=Angle(mc_run_head["B_inclination"], u.rad), prod_site_alt=mc_run_head["obsheight"] * u.m, spectral_index=mc_run_head["spectral_index"], shower_prog_start=mc_run_head["shower_prog_start"], shower_prog_id=mc_run_head["shower_prog_id"], detector_prog_start=mc_run_head["detector_prog_start"], detector_prog_id=mc_run_head["detector_prog_id"], num_showers=mc_run_head["n_showers"], shower_reuse=mc_run_head["n_use"], max_alt=mc_run_head["alt_range"][1] * u.rad, min_alt=mc_run_head["alt_range"][0] * u.rad, max_az=mc_run_head["az_range"][1] * u.rad, min_az=mc_run_head["az_range"][0] * u.rad, diffuse=mc_run_head["diffuse"], max_viewcone_radius=mc_run_head["viewcone"][1] * u.deg, min_viewcone_radius=mc_run_head["viewcone"][0] * u.deg, max_scatter_range=mc_run_head["core_range"][1] * u.m, min_scatter_range=mc_run_head["core_range"][0] * u.m, core_pos_mode=mc_run_head["core_pos_mode"], injection_height=mc_run_head["injection_height"] * u.m, atmosphere=mc_run_head["atmosphere"], corsika_iact_options=mc_run_head["corsika_iact_options"], corsika_low_E_model=mc_run_head["corsika_low_E_model"], corsika_high_E_model=mc_run_head["corsika_high_E_model"], corsika_bunchsize=mc_run_head["corsika_bunchsize"], corsika_wlen_min=mc_run_head["corsika_wlen_min"] * u.nm, corsika_wlen_max=mc_run_head["corsika_wlen_max"] * u.nm, corsika_low_E_detail=mc_run_head["corsika_low_E_detail"], corsika_high_E_detail=mc_run_head["corsika_high_E_detail"], ) @staticmethod def _fill_simulated_event_information(data, array_event): mc_event = array_event["mc_event"] mc_shower = array_event["mc_shower"] data.simulation.shower = SimulatedShowerContainer( energy=u.Quantity(mc_shower["energy"], u.TeV), alt=Angle(mc_shower["altitude"], u.rad), az=Angle(mc_shower["azimuth"], u.rad), core_x=u.Quantity(mc_event["xcore"], u.m), core_y=u.Quantity(mc_event["ycore"], u.m), h_first_int=u.Quantity(mc_shower["h_first_int"], u.m), x_max=u.Quantity(mc_shower["xmax"], X_MAX_UNIT), shower_primary_id=mc_shower["primary_id"], )