import enum
import warnings
from contextlib import nullcontext
from enum import Enum, auto, unique
from gzip import GzipFile
from io import BufferedReader
from pathlib import Path
from typing import Dict, Optional, Union
import numpy as np
from astropy import units as u
from astropy.coordinates import Angle, EarthLocation
from astropy.table import Table
from astropy.time import Time
from eventio.file_types import is_eventio
from eventio.simtel.simtelfile import SimTelFile
from ..atmosphere import (
AtmosphereDensityProfile,
FiveLayerAtmosphereDensityProfile,
TableAtmosphereDensityProfile,
)
from ..calib.camera.gainselection import GainSelector
from ..containers import (
ArrayEventContainer,
CoordinateFrameType,
EventIndexContainer,
EventType,
ObservationBlockContainer,
ObservationBlockState,
ObservingMode,
PixelStatusContainer,
PointingContainer,
PointingMode,
R0CameraContainer,
R1CameraContainer,
SchedulingBlockContainer,
SchedulingBlockType,
SimulatedCameraContainer,
SimulatedEventContainer,
SimulatedShowerContainer,
SimulationConfigContainer,
TelescopeImpactParameterContainer,
TelescopePointingContainer,
TelescopeTriggerContainer,
TriggerContainer,
)
from ..coordinates import CameraFrame, shower_impact_distance
from ..core import Map
from ..core.provenance import Provenance
from ..core.traits import Bool, ComponentName, Float, Undefined, UseEnum
from ..instrument import (
CameraDescription,
CameraGeometry,
CameraReadout,
FocalLengthKind,
OpticsDescription,
ReflectorShape,
SubarrayDescription,
TelescopeDescription,
)
from ..instrument.camera import UnknownPixelShapeWarning
from ..instrument.guess import (
GuessingResult,
guess_telescope,
type_from_mirror_area,
unknown_telescope,
)
from .datalevels import DataLevel
from .eventsource import EventSource
__all__ = [
"SimTelEventSource",
"read_atmosphere_profile_from_simtel",
"AtmosphereProfileKind",
"MirrorClass",
]
# Mapping of SimTelArray Calibration trigger types to EventType:
# from simtelarray: type Dark (0), pedestal (1), in-lid LED (2) or laser/LED (3+) data.
SIMTEL_TO_CTA_EVENT_TYPE = {
0: EventType.DARK_PEDESTAL,
1: EventType.SKY_PEDESTAL,
2: EventType.SINGLE_PE,
3: EventType.FLATFIELD,
-1: EventType.OTHER_CALIBRATION,
}
_half_pi = 0.5 * np.pi
_half_pi_maxval = (1 + 1e-6) * _half_pi
def _clip_altitude_if_close(altitude):
"""
Round absolute values slightly larger than pi/2 in float64 to pi/2
These can come from simtel_array because float32(pi/2) > float64(pi/2)
and simtel using float32.
Astropy complains about these values, so we fix them here.
"""
if altitude > _half_pi and altitude < _half_pi_maxval:
return _half_pi
if altitude < -_half_pi and altitude > -_half_pi_maxval:
return -_half_pi
return altitude
[docs]@enum.unique
class MirrorClass(enum.Enum):
"""Enum for the sim_telarray MIRROR_CLASS values"""
SINGLE_SEGMENTED_MIRROR = 0
SINGLE_SOLID_PARABOLIC_MIRROR = 1
DUAL_MIRROR = 2
X_MAX_UNIT = u.g / (u.cm**2)
NANOSECONDS_PER_DAY = (1 * u.day).to_value(u.ns)
def parse_simtel_time(simtel_time):
"""Convert a unix time second / nanosecond tuple into astropy.time.Time"""
return Time(
simtel_time[0], simtel_time[1] * 1e-9, format="unix", scale="utc" # ns to s
)
def _location_from_meta(global_meta):
"""Extract reference location of subarray from metadata"""
lat = global_meta.get(b"*LATITUDE")
lon = global_meta.get(b"*LONGITUDE")
height = global_meta.get(b"ALTITUDE")
if lat is None or lon is None or height is None:
return None
return EarthLocation(
lon=float(lon) * u.deg,
lat=float(lat) * u.deg,
height=float(height) * u.m,
)
def build_camera(simtel_telescope, telescope, frame):
"""Create CameraDescription from eventio data structures"""
camera_settings = simtel_telescope["camera_settings"]
pixel_settings = simtel_telescope["pixel_settings"]
camera_organization = simtel_telescope["camera_organization"]
pixel_shape = camera_settings["pixel_shape"][0]
try:
pix_type, pix_rotation = CameraGeometry.simtel_shape_to_type(pixel_shape)
except ValueError:
warnings.warn(
f"Unkown pixel_shape {pixel_shape} for camera_type {telescope.camera_name}",
UnknownPixelShapeWarning,
)
pix_type = "hexagon"
pix_rotation = "0d"
geometry = CameraGeometry(
telescope.camera_name,
pix_id=np.arange(camera_settings["n_pixels"]),
pix_x=u.Quantity(camera_settings["pixel_x"], u.m),
pix_y=u.Quantity(camera_settings["pixel_y"], u.m),
pix_area=u.Quantity(camera_settings["pixel_area"], u.m**2),
pix_type=pix_type,
pix_rotation=pix_rotation,
cam_rotation=-Angle(camera_settings["cam_rot"], u.rad),
apply_derotation=True,
frame=frame,
)
readout = CameraReadout(
telescope.camera_name,
sampling_rate=u.Quantity(1 / pixel_settings["time_slice"], u.GHz),
reference_pulse_shape=pixel_settings["ref_shape"].astype("float64", copy=False),
reference_pulse_sample_width=u.Quantity(
pixel_settings["ref_step"], u.ns, dtype="float64"
),
n_channels=camera_organization["n_gains"],
n_pixels=camera_organization["n_pixels"],
n_samples=pixel_settings["sum_bins"],
)
return CameraDescription(
name=telescope.camera_name,
geometry=geometry,
readout=readout,
)
def _telescope_from_meta(telescope_meta, mirror_area):
optics_name = telescope_meta.get(b"OPTICS_CONFIG_NAME")
camera_name = telescope_meta.get(b"CAMERA_CONFIG_NAME")
mirror_class = telescope_meta.get(b"MIRROR_CLASS")
if optics_name is None or camera_name is None or mirror_class is None:
return None
telescope_type = type_from_mirror_area(mirror_area)
mirror_class = MirrorClass(int(mirror_class))
reflector_shape = ReflectorShape.UNKNOWN
if mirror_class is MirrorClass.DUAL_MIRROR:
reflector_shape = ReflectorShape.SCHWARZSCHILD_COUDER
elif mirror_class is MirrorClass.SINGLE_SEGMENTED_MIRROR:
if int(telescope_meta.get(b"PARABOLIC_DISH", 0)) == 1:
reflector_shape = ReflectorShape.PARABOLIC
else:
reflector_shape = ReflectorShape.DAVIES_COTTON
shape_length = float(telescope_meta.get(b"DISH_SHAPE_Length", 0))
focal_length = float(telescope_meta.get(b"FOCAL_Length", 0))
if shape_length != focal_length:
reflector_shape = ReflectorShape.HYBRID
n_mirrors = 2 if mirror_class is MirrorClass.DUAL_MIRROR else 1
return GuessingResult(
type=telescope_type,
reflector_shape=reflector_shape,
name=optics_name.decode("utf-8"),
camera_name=camera_name.decode("utf-8"),
n_mirrors=n_mirrors,
)
def apply_simtel_r1_calibration(
r0_waveforms, pedestal, dc_to_pe, gain_selector, calib_scale=1.0, calib_shift=0.0
):
"""
Perform the R1 calibration for R0 simtel waveforms. This includes:
- Gain selection
- Pedestal subtraction
- Conversion of samples into units proportional to photoelectrons
(If the full signal in the waveform was integrated, then the resulting
value would be in photoelectrons.)
(Also applies flat-fielding)
Parameters
----------
r0_waveforms : ndarray
Raw ADC waveforms from a simtel file. All gain channels available.
Shape: (n_channels, n_pixels, n_samples)
pedestal : ndarray
Pedestal stored in the simtel file for each gain channel
Shape: (n_channels, n_pixels)
dc_to_pe : ndarray
Conversion factor between R0 waveform samples and ~p.e., stored in the
simtel file for each gain channel
Shape: (n_channels, n_pixels)
gain_selector : ctapipe.calib.camera.gainselection.GainSelector
calib_scale : float
Extra global scale factor for calibration.
Conversion factor to transform the integrated charges
(in ADC counts) into number of photoelectrons on top of dc_to_pe.
Defaults to no scaling.
calib_shift: float
Shift the resulting R1 output in p.e. for simulating miscalibration.
Defaults to no shift.
Returns
-------
r1_waveforms : ndarray
Calibrated waveforms
Shape: (n_pixels, n_samples)
selected_gain_channel : ndarray
The gain channel selected for each pixel
Shape: (n_pixels)
"""
n_channels, n_pixels, n_samples = r0_waveforms.shape
ped = pedestal[..., np.newaxis]
DC_to_PHE = dc_to_pe[..., np.newaxis]
gain = DC_to_PHE * calib_scale
r1_waveforms = (r0_waveforms - ped) * gain + calib_shift
if n_channels == 1:
selected_gain_channel = np.zeros(n_pixels, dtype=np.int8)
r1_waveforms = r1_waveforms[0]
else:
selected_gain_channel = gain_selector(r0_waveforms)
r1_waveforms = r1_waveforms[selected_gain_channel, np.arange(n_pixels)]
return r1_waveforms, selected_gain_channel
[docs]@unique
class AtmosphereProfileKind(Enum):
"""
choice for which kind of atmosphere density profile to load if more than
one is present"""
#: Don't load a profile
NONE = auto()
#: Use a TableAtmosphereDensityProfile
TABLE = auto()
#: Use a FiveLayerAtmosphereDensityProfile
FIVELAYER = auto()
#: Try TABLE first, and if it doesn't exist, use FIVELAYER
AUTO = auto()
[docs]def read_atmosphere_profile_from_simtel(
simtelfile: Union[str, Path, SimTelFile], kind=AtmosphereProfileKind.AUTO
) -> Optional[TableAtmosphereDensityProfile]:
"""Read an atmosphere profile from a SimTelArray file as an astropy Table
Parameters
----------
simtelfile: str | SimTelFile
filename of a SimTelArray file containing an atmosphere profile
kind: AtmosphereProfileKind | str
which type of model to load. In AUTO mode, table is tried first,
and if it doesn't exist, fivelayer is used.
Returns
-------
Optional[TableAtmosphereDensityProfile]:
Profile read from a table, with interpolation
"""
if not isinstance(kind, AtmosphereProfileKind):
raise TypeError(
"read_atmosphere_profile_from_simtel: kind should be a AtmosphereProfileKind"
)
profiles = []
if kind == AtmosphereProfileKind.NONE:
return None
if isinstance(simtelfile, (str, Path)):
context_manager = SimTelFile(simtelfile)
Provenance().add_input_file(
filename=simtelfile, role="ctapipe.atmosphere.AtmosphereDensityProfile"
)
else:
context_manager = nullcontext(simtelfile)
with context_manager as simtel:
if (
not hasattr(simtel, "atmospheric_profiles")
or len(simtel.atmospheric_profiles) == 0
):
return []
for atmo in simtel.atmospheric_profiles:
metadata = dict(
observation_level=atmo["obslevel"] * u.cm,
atmosphere_id=atmo["id"],
atmosphere_name=atmo["name"].decode("utf-8"),
atmosphere_height=atmo["htoa"] * u.cm,
)
if "altitude_km" in atmo and kind in {
AtmosphereProfileKind.TABLE,
AtmosphereProfileKind.AUTO,
}:
table = Table(
dict(
height=atmo["altitude_km"] * u.km,
density=atmo["rho"] * u.g / u.cm**3,
column_density=atmo["thickness"] * u.g / u.cm**2,
),
meta=metadata,
)
profiles.append(TableAtmosphereDensityProfile(table=table))
elif (
kind == AtmosphereProfileKind.FIVELAYER
and "five_layer_atmosphere" in atmo
):
profiles.append(
FiveLayerAtmosphereDensityProfile.from_array(
atmo["five_layer_atmosphere"], metadata=metadata
)
)
else:
raise ValueError(f"Couldn't load requested profile '{kind}'")
if profiles:
return profiles[0]
return None
[docs]class SimTelEventSource(EventSource):
"""
Read events from a SimTelArray data file (in EventIO format).
ctapipe makes use of the sim_telarray metadata system to fill some
information not directly accessible from the data inside the files itself.
Make sure you set this parameters in the simulation configuration to fully
make use of ctapipe. In future, ctapipe might require these metadata parameters.
This includes:
* Reference Point of the telescope coordinates. Make sure to include the
user-defined parameters ``LONGITUDE`` and ``LATITUDE`` with the geodetic
coordinates of the array reference point. Also make sure the ``ALTITUDE``
config parameter is included in the global metadata.
* Names of the optical structures and the cameras are read from
``OPTICS_CONFIG_NAME`` and ``CAMERA_CONFIG_NAME``, make sure to include
these in the telescope meta.
* The ``MIRROR_CLASS`` should also be included in the telescope meta
to correctly setup coordinate transforms.
If these parameters are not included in the input data, ctapipe will
fallback guesses these based on avaible data and the list of known telescopes
for `ctapipe.instrument.guess_telescope`.
"""
skip_calibration_events = Bool(True, help="Skip calibration events").tag(
config=True
)
back_seekable = Bool(
False,
help=(
"Require the event source to be backwards seekable."
" This will reduce in slower read speed for gzipped files"
" and is not possible for zstd compressed files"
),
).tag(config=True)
focal_length_choice = UseEnum(
FocalLengthKind,
default_value=FocalLengthKind.EFFECTIVE,
help=(
"If both nominal and effective focal lengths are available in the"
" SimTelArray file, which one to use for the `~ctapipe.coordinates.CameraFrame`"
" attached to the `~ctapipe.instrument.CameraGeometry` instances in"
" the `~ctapipe.instrument.SubarrayDescription`, which will be used in"
" CameraFrame to TelescopeFrame coordinate transforms. "
" The 'nominal' focal length is the one used during "
" the simulation, the 'effective' focal length is computed using specialized "
" ray-tracing from a point light source"
),
).tag(config=True)
gain_selector_type = ComponentName(
GainSelector, default_value="ThresholdGainSelector"
).tag(config=True)
calib_scale = Float(
default_value=1.0,
help=(
"Factor to transform ADC counts into number of photoelectrons."
" Corrects the DC_to_PHE factor."
),
).tag(config=True)
calib_shift = Float(
default_value=0.0,
help=(
"Factor to shift the R1 photoelectron samples. "
"Can be used to simulate mis-calibration."
),
).tag(config=True)
atmosphere_profile_choice = UseEnum(
AtmosphereProfileKind,
default_value=AtmosphereProfileKind.AUTO,
help=(
"Which type of atmosphere density profile to load "
"from the file, in case more than one exists. If set "
"to AUTO, TABLE will be attempted first and if missing, "
"FIVELAYER will be loaded."
),
).tag(config=True)
def __init__(self, input_url=Undefined, config=None, parent=None, **kwargs):
"""
EventSource for simtelarray files using the pyeventio library.
Parameters
----------
config : traitlets.loader.Config
Configuration specified by config file or cmdline arguments.
Used to set traitlet values.
Set to None if no configuration to pass.
tool : ctapipe.core.Tool
Tool executable that is calling this component.
Passes the correct logger to the component.
Set to None if no Tool to pass.
gain_selector : ctapipe.calib.camera.gainselection.GainSelector
The GainSelector to use. If None, then ThresholdGainSelector will be used.
kwargs
"""
super().__init__(input_url=input_url, config=config, parent=parent, **kwargs)
self.file_ = SimTelFile(
self.input_url.expanduser(),
allowed_telescopes=self.allowed_tels,
skip_calibration=self.skip_calibration_events,
zcat=not self.back_seekable,
)
if self.back_seekable and self.is_stream:
raise IOError("back seekable was required but not possible for inputfile")
self._subarray_info = self.prepare_subarray_info(
self.file_.telescope_descriptions, self.file_.header
)
(
self._scheduling_blocks,
self._observation_blocks,
) = self._fill_scheduling_and_observation_blocks()
self._simulation_config = self._parse_simulation_header()
self.start_pos = self.file_.tell()
self.gain_selector = GainSelector.from_name(
self.gain_selector_type, parent=self
)
self._atmosphere_density_profile = read_atmosphere_profile_from_simtel(
self.file_, kind=self.atmosphere_profile_choice
)
self.log.debug(f"Using gain selector {self.gain_selector}")
def __exit__(self, exc_type, exc_val, exc_tb):
self.close()
[docs] def close(self):
self.file_.close()
@property
def is_simulation(self):
return True
@property
def datalevels(self):
return (DataLevel.R0, DataLevel.R1)
@property
def simulation_config(self) -> Dict[int, SimulationConfigContainer]:
return self._simulation_config
@property
def atmosphere_density_profile(self) -> AtmosphereDensityProfile:
return self._atmosphere_density_profile
@property
def observation_blocks(self) -> Dict[int, ObservationBlockContainer]:
"""
Obtain the ObservationConfigurations from the EventSource, indexed by obs_id
"""
return self._observation_blocks
@property
def scheduling_blocks(self) -> Dict[int, SchedulingBlockContainer]:
"""
Obtain the ObservationConfigurations from the EventSource, indexed by obs_id
"""
return self._scheduling_blocks
@property
def is_stream(self):
return not isinstance(self.file_._filehandle, (BufferedReader, GzipFile))
[docs] def prepare_subarray_info(self, telescope_descriptions, header):
"""
Constructs a SubarrayDescription object from the
``telescope_descriptions`` given by ``SimTelFile``
Parameters
----------
telescope_descriptions: dict
telescope descriptions as given by ``SimTelFile.telescope_descriptions``
header: dict
header as returned by ``SimTelFile.header``
Returns
-------
SubarrayDescription :
instrumental information
"""
tel_descriptions = {} # tel_id : TelescopeDescription
tel_positions = {} # tel_id : TelescopeDescription
self.telescope_indices_original = {}
for tel_id, telescope_description in telescope_descriptions.items():
cam_settings = telescope_description["camera_settings"]
n_pixels = cam_settings["n_pixels"]
mirror_area = u.Quantity(cam_settings["mirror_area"], u.m**2)
equivalent_focal_length = u.Quantity(cam_settings["focal_length"], u.m)
effective_focal_length = u.Quantity(
cam_settings.get("effective_focal_length", np.nan), u.m
)
telescope = _telescope_from_meta(
self.file_.telescope_meta.get(tel_id, {}),
mirror_area,
)
if telescope is None:
try:
telescope = guess_telescope(
n_pixels,
equivalent_focal_length,
cam_settings["n_mirrors"],
)
except ValueError:
telescope = unknown_telescope(mirror_area, n_pixels)
# TODO: switch to warning or even an exception once
# we start relying on this.
self.log.info(
"Could not determine telescope from sim_telarray metadata,"
" guessing using builtin lookup-table: %d: %s",
tel_id,
telescope,
)
optics = OpticsDescription(
name=telescope.name,
size_type=telescope.type,
reflector_shape=telescope.reflector_shape,
n_mirrors=telescope.n_mirrors,
equivalent_focal_length=equivalent_focal_length,
effective_focal_length=effective_focal_length,
mirror_area=mirror_area,
n_mirror_tiles=cam_settings["n_mirrors"],
)
if self.focal_length_choice is FocalLengthKind.EFFECTIVE:
if np.isnan(effective_focal_length):
raise RuntimeError(
"`SimTelEventSource.focal_length_choice` was set to 'EFFECTIVE'"
", but the effective focal length was not present in the file."
" Set `focal_length_choice='EQUIVALENT'` or make sure"
" input files contain the effective focal length"
)
focal_length = effective_focal_length
elif self.focal_length_choice is FocalLengthKind.EQUIVALENT:
focal_length = equivalent_focal_length
else:
raise ValueError(
f"Invalid focal length choice: {self.focal_length_choice}"
)
camera = build_camera(
telescope_description,
telescope,
frame=CameraFrame(focal_length=focal_length),
)
tel_descriptions[tel_id] = TelescopeDescription(
name=telescope.name,
optics=optics,
camera=camera,
)
tel_idx = np.where(header["tel_id"] == tel_id)[0][0]
self.telescope_indices_original[tel_id] = tel_idx
tel_positions[tel_id] = header["tel_pos"][tel_idx] * u.m
name = self.file_.global_meta.get(b"ARRAY_CONFIG_NAME", b"MonteCarloArray")
subarray = SubarrayDescription(
name=name.decode(),
tel_positions=tel_positions,
tel_descriptions=tel_descriptions,
reference_location=_location_from_meta(self.file_.global_meta),
)
self.n_telescopes_original = len(subarray)
if self.allowed_tels:
subarray = subarray.select_subarray(self.allowed_tels)
return subarray
[docs] @staticmethod
def is_compatible(file_path):
path = Path(file_path).expanduser()
if not path.is_file():
return False
return is_eventio(path)
@property
def subarray(self):
return self._subarray_info
def _generator(self):
if self.file_.tell() > self.start_pos:
self.file_._next_header_pos = 0
warnings.warn("Backseeking to start of file.")
try:
yield from self._generate_events()
except EOFError:
msg = 'EOFError reading from "{input_url}". Might be truncated'.format(
input_url=self.input_url
)
self.log.warning(msg)
warnings.warn(msg)
def _generate_events(self):
# for events without event_id, we use negative event_ids
pseudo_event_id = 0
for counter, array_event in enumerate(self.file_):
event_id = array_event.get("event_id", 0)
if event_id == 0:
pseudo_event_id -= 1
event_id = pseudo_event_id
obs_id = self.file_.header["run"]
trigger = self._fill_trigger_info(array_event)
if trigger.event_type == EventType.SUBARRAY:
shower = self._fill_simulated_event_information(array_event)
else:
shower = None
data = ArrayEventContainer(
simulation=SimulatedEventContainer(shower=shower),
pointing=self._fill_array_pointing(),
index=EventIndexContainer(obs_id=obs_id, event_id=event_id),
count=counter,
trigger=trigger,
)
data.meta["origin"] = "hessio"
data.meta["input_url"] = self.input_url
data.meta["max_events"] = self.max_events
telescope_events = array_event["telescope_events"]
tracking_positions = array_event["tracking_positions"]
photoelectron_sums = array_event.get("photoelectron_sums")
if photoelectron_sums is not None:
true_image_sums = photoelectron_sums.get(
"n_pe", np.full(self.n_telescopes_original, np.nan)
)
else:
true_image_sums = np.full(self.n_telescopes_original, np.nan)
if data.simulation.shower is not None:
# compute impact distances of the shower to the telescopes
impact_distances = shower_impact_distance(
shower_geom=data.simulation.shower, subarray=self.subarray
)
else:
impact_distances = np.full(len(self.subarray), np.nan) * u.m
for tel_id, telescope_event in telescope_events.items():
adc_samples = telescope_event.get("adc_samples")
if adc_samples is None:
adc_samples = telescope_event["adc_sums"][:, :, np.newaxis]
n_gains, n_pixels, n_samples = adc_samples.shape
true_image = (
array_event.get("photoelectrons", {})
.get(tel_id - 1, {})
.get("photoelectrons", None)
)
if data.simulation is not None:
if data.simulation.shower is not None:
impact_container = TelescopeImpactParameterContainer(
distance=impact_distances[
self.subarray.tel_index_array[tel_id]
],
distance_uncert=0 * u.m,
prefix="true_impact",
)
else:
impact_container = TelescopeImpactParameterContainer(
prefix="true_impact",
)
data.simulation.tel[tel_id] = SimulatedCameraContainer(
true_image_sum=true_image_sums[
self.telescope_indices_original[tel_id]
],
true_image=true_image,
impact=impact_container,
)
data.pointing.tel[tel_id] = self._fill_event_pointing(
tracking_positions[tel_id]
)
data.r0.tel[tel_id] = R0CameraContainer(waveform=adc_samples)
cam_mon = array_event["camera_monitorings"][tel_id]
pedestal = cam_mon["pedestal"] / cam_mon["n_ped_slices"]
dc_to_pe = array_event["laser_calibrations"][tel_id]["calib"]
# fill dc_to_pe and pedestal_per_sample info into monitoring
# container
mon = data.mon.tel[tel_id]
mon.calibration.dc_to_pe = dc_to_pe
mon.calibration.pedestal_per_sample = pedestal
mon.pixel_status = self._get_pixels_status(tel_id)
r1_waveform, selected_gain_channel = apply_simtel_r1_calibration(
adc_samples,
pedestal,
dc_to_pe,
self.gain_selector,
self.calib_scale,
self.calib_shift,
)
data.r1.tel[tel_id] = R1CameraContainer(
waveform=r1_waveform,
selected_gain_channel=selected_gain_channel,
)
# get time_shift from laser calibration
time_calib = array_event["laser_calibrations"][tel_id]["tm_calib"]
pix_index = np.arange(n_pixels)
dl1_calib = data.calibration.tel[tel_id].dl1
dl1_calib.time_shift = time_calib[selected_gain_channel, pix_index]
yield data
def _get_pixels_status(self, tel_id):
tel = self.file_.telescope_descriptions[tel_id]
n_pixels = tel["camera_organization"]["n_pixels"]
n_gains = tel["camera_organization"]["n_gains"]
disabled_ids = tel["disabled_pixels"]["HV_disabled"]
disabled_pixels = np.zeros((n_gains, n_pixels), dtype=bool)
disabled_pixels[:, disabled_ids] = True
return PixelStatusContainer(
hardware_failing_pixels=disabled_pixels,
pedestal_failing_pixels=disabled_pixels.copy(),
flatfield_failing_pixels=disabled_pixels.copy(),
)
@staticmethod
def _fill_event_pointing(tracking_position):
azimuth_raw = tracking_position["azimuth_raw"]
altitude_raw = tracking_position["altitude_raw"]
azimuth_cor = tracking_position.get("azimuth_cor", np.nan)
altitude_cor = tracking_position.get("altitude_cor", np.nan)
# take pointing corrected position if available
if np.isnan(azimuth_cor):
azimuth = u.Quantity(azimuth_raw, u.rad, copy=False)
else:
azimuth = u.Quantity(azimuth_cor, u.rad, copy=False)
# take pointing corrected position if available
if np.isnan(altitude_cor):
altitude = u.Quantity(
_clip_altitude_if_close(altitude_raw), u.rad, copy=False
)
else:
altitude = u.Quantity(
_clip_altitude_if_close(altitude_cor), u.rad, copy=False
)
return TelescopePointingContainer(azimuth=azimuth, altitude=altitude)
def _fill_trigger_info(self, array_event):
trigger = array_event["trigger_information"]
if array_event["type"] == "data":
event_type = EventType.SUBARRAY
elif array_event["type"] == "calibration":
# if using eventio >= 1.1.1, we can use the calibration_type
event_type = SIMTEL_TO_CTA_EVENT_TYPE.get(
array_event.get("calibration_type", -1), EventType.OTHER_CALIBRATION
)
else:
event_type = EventType.UNKNOWN
if self.allowed_tels:
tels_with_trigger = np.intersect1d(
trigger["triggered_telescopes"],
self.subarray.tel_ids,
assume_unique=True,
)
else:
tels_with_trigger = trigger["triggered_telescopes"]
central_time = parse_simtel_time(trigger["gps_time"])
tel = Map(TelescopeTriggerContainer)
for tel_id, time in zip(
trigger["triggered_telescopes"], trigger["trigger_times"]
):
if self.allowed_tels and tel_id not in self.allowed_tels:
continue
# telescope time is relative to central trigger in ns
time = Time(
central_time.jd1,
central_time.jd2 + time / NANOSECONDS_PER_DAY,
scale=central_time.scale,
format="jd",
)
# triggered pixel info
n_trigger_pixels = -1
trigger_pixels = None
tel_event = array_event["telescope_events"].get(tel_id)
if tel_event:
# code 0 = trigger pixels
pixel_list = tel_event["pixel_lists"].get(0)
if pixel_list:
n_trigger_pixels = pixel_list["pixels"]
trigger_pixels = pixel_list["pixel_list"]
tel[tel_id] = TelescopeTriggerContainer(
time=time,
n_trigger_pixels=n_trigger_pixels,
trigger_pixels=trigger_pixels,
)
return TriggerContainer(
event_type=event_type,
time=central_time,
tels_with_trigger=tels_with_trigger,
tel=tel,
)
def _fill_array_pointing(self):
if self.file_.header["tracking_mode"] == 0:
az, alt = self.file_.header["direction"]
return PointingContainer(
array_altitude=u.Quantity(alt, u.rad),
array_azimuth=u.Quantity(az, u.rad),
)
else:
ra, dec = self.file_.header["direction"]
return PointingContainer(
array_ra=u.Quantity(ra, u.rad),
array_dec=u.Quantity(dec, u.rad),
)
def _parse_simulation_header(self):
"""
Parse the simulation infos and return a dict with
observation ids mapped to SimulationConfigContainers.
As merged simtel files are not supported at this
point in time, this dictionary will always have
length 1.
"""
assert len(self.obs_ids) == 1
obs_id = self.obs_ids[0]
# With only one run, we can take the first entry:
mc_run_head = self.file_.mc_run_headers[-1]
simulation_config = SimulationConfigContainer(
corsika_version=mc_run_head["shower_prog_vers"],
simtel_version=mc_run_head["detector_prog_vers"],
energy_range_min=mc_run_head["E_range"][0] * u.TeV,
energy_range_max=mc_run_head["E_range"][1] * u.TeV,
prod_site_B_total=mc_run_head["B_total"] * u.uT,
prod_site_B_declination=Angle(mc_run_head["B_declination"], u.rad),
prod_site_B_inclination=Angle(mc_run_head["B_inclination"], u.rad),
prod_site_alt=mc_run_head["obsheight"] * u.m,
spectral_index=mc_run_head["spectral_index"],
shower_prog_start=mc_run_head["shower_prog_start"],
shower_prog_id=mc_run_head["shower_prog_id"],
detector_prog_start=mc_run_head["detector_prog_start"],
detector_prog_id=mc_run_head["detector_prog_id"],
n_showers=mc_run_head["n_showers"],
shower_reuse=mc_run_head["n_use"],
max_alt=_clip_altitude_if_close(mc_run_head["alt_range"][1]) * u.rad,
min_alt=_clip_altitude_if_close(mc_run_head["alt_range"][0]) * u.rad,
max_az=mc_run_head["az_range"][1] * u.rad,
min_az=mc_run_head["az_range"][0] * u.rad,
diffuse=mc_run_head["diffuse"],
max_viewcone_radius=mc_run_head["viewcone"][1] * u.deg,
min_viewcone_radius=mc_run_head["viewcone"][0] * u.deg,
max_scatter_range=mc_run_head["core_range"][1] * u.m,
min_scatter_range=mc_run_head["core_range"][0] * u.m,
core_pos_mode=mc_run_head["core_pos_mode"],
injection_height=mc_run_head["injection_height"] * u.m,
atmosphere=mc_run_head["atmosphere"],
corsika_iact_options=mc_run_head["corsika_iact_options"],
corsika_low_E_model=mc_run_head["corsika_low_E_model"],
corsika_high_E_model=mc_run_head["corsika_high_E_model"],
corsika_bunchsize=mc_run_head["corsika_bunchsize"],
corsika_wlen_min=mc_run_head["corsika_wlen_min"] * u.nm,
corsika_wlen_max=mc_run_head["corsika_wlen_max"] * u.nm,
corsika_low_E_detail=mc_run_head["corsika_low_E_detail"],
corsika_high_E_detail=mc_run_head["corsika_high_E_detail"],
)
return {obs_id: simulation_config}
def _fill_scheduling_and_observation_blocks(self):
"""fill scheduling and observation blocks must be run after the
simulation config is filled
"""
az, alt = self.file_.header["direction"]
obs_id = self.file_.header["run"]
sb_dict = {
obs_id: SchedulingBlockContainer(
sb_id=np.uint64(obs_id), # simulations have no SBs, use OB id
sb_type=SchedulingBlockType.OBSERVATION,
producer_id="simulation",
observing_mode=ObservingMode.UNKNOWN,
pointing_mode=PointingMode.DRIFT,
)
}
ob_dict = {
obs_id: ObservationBlockContainer(
obs_id=np.uint64(obs_id),
sb_id=np.uint64(obs_id), # see comment above
producer_id="simulation",
state=ObservationBlockState.COMPLETED_SUCCEDED,
subarray_pointing_lat=alt * u.rad,
subarray_pointing_lon=az * u.rad,
subarray_pointing_frame=CoordinateFrameType.ALTAZ,
actual_start_time=Time(self.file_.header["time"], format="unix"),
scheduled_start_time=Time(self.file_.header["time"], format="unix"),
)
}
return sb_dict, ob_dict
@staticmethod
def _fill_simulated_event_information(array_event):
mc_event = array_event["mc_event"]
mc_shower = array_event["mc_shower"]
if mc_shower is None:
return
alt = _clip_altitude_if_close(mc_shower["altitude"])
return SimulatedShowerContainer(
energy=u.Quantity(mc_shower["energy"], u.TeV),
alt=Angle(alt, u.rad),
az=Angle(mc_shower["azimuth"], u.rad),
core_x=u.Quantity(mc_event["xcore"], u.m),
core_y=u.Quantity(mc_event["ycore"], u.m),
h_first_int=u.Quantity(mc_shower["h_first_int"], u.m),
x_max=u.Quantity(mc_shower["xmax"], X_MAX_UNIT),
shower_primary_id=mc_shower["primary_id"],
)