"""
Definition of the `CameraCalibrator` class, providing all steps needed to apply
calibration and image extraction, as well as supporting algorithms.
"""
import warnings
import astropy.units as u
import numpy as np
from numba import float32, float64, guvectorize, int64
from ctapipe.containers import DL1CameraContainer
from ctapipe.core import TelescopeComponent
from ctapipe.core.traits import (
BoolTelescopeParameter,
ComponentName,
TelescopeParameter,
)
from ctapipe.image.extractor import ImageExtractor
from ctapipe.image.invalid_pixels import InvalidPixelHandler
from ctapipe.image.reducer import DataVolumeReducer
__all__ = ["CameraCalibrator"]
def _get_invalid_pixels(n_pixels, pixel_status, selected_gain_channel):
broken_pixels = np.zeros(n_pixels, dtype=bool)
index = np.arange(n_pixels)
masks = (
pixel_status.hardware_failing_pixels,
pixel_status.pedestal_failing_pixels,
pixel_status.flatfield_failing_pixels,
)
for mask in masks:
if mask is not None:
broken_pixels |= mask[selected_gain_channel, index]
return broken_pixels
[docs]class CameraCalibrator(TelescopeComponent):
"""
Calibrator to handle the full camera calibration chain, in order to fill
the DL1 data level in the event container.
Attributes
----------
data_volume_reducer_type: str
The name of the DataVolumeReducer subclass to be used
for data volume reduction
image_extractor_type: str
The name of the ImageExtractor subclass to be used for image extraction
"""
data_volume_reducer_type = ComponentName(
DataVolumeReducer, default_value="NullDataVolumeReducer"
).tag(config=True)
image_extractor_type = TelescopeParameter(
trait=ComponentName(ImageExtractor, default_value="NeighborPeakWindowSum"),
default_value="NeighborPeakWindowSum",
help="Name of the ImageExtractor subclass to be used.",
).tag(config=True)
invalid_pixel_handler_type = ComponentName(
InvalidPixelHandler,
default_value="NeighborAverage",
help="Name of the InvalidPixelHandler to use",
allow_none=True,
).tag(config=True)
apply_waveform_time_shift = BoolTelescopeParameter(
default_value=False,
help=(
"Apply waveform time shift corrections."
" The minimal integer shift to synchronize waveforms is applied"
" before peak extraction if this option is True"
),
).tag(config=True)
apply_peak_time_shift = BoolTelescopeParameter(
default_value=True,
help=(
"Apply peak time shift corrections."
" Apply the remaining absolute and fractional time shift corrections"
" to the peak time after pulse extraction."
" If `apply_waveform_time_shift` is False, this will apply the full time shift"
),
).tag(config=True)
def __init__(
self,
subarray,
config=None,
parent=None,
image_extractor=None,
data_volume_reducer=None,
**kwargs,
):
"""
Parameters
----------
subarray: ctapipe.instrument.SubarrayDescription
Description of the subarray. Provides information about the
camera which are useful in calibration. Also required for
configuring the TelescopeParameter traitlets.
config: traitlets.loader.Config
Configuration specified by config file or cmdline arguments.
Used to set traitlet values.
This is mutually exclusive with passing a ``parent``.
parent: ctapipe.core.Component or ctapipe.core.Tool
Parent of this component in the configuration hierarchy,
this is mutually exclusive with passing ``config``
data_volume_reducer: ctapipe.image.reducer.DataVolumeReducer
The DataVolumeReducer to use.
This is used to override the options from the config system
and to enable passing a preconfigured reducer.
image_extractor: ctapipe.image.extractor.ImageExtractor
The ImageExtractor to use. If None, the default via the
configuration system will be constructed.
"""
super().__init__(subarray=subarray, config=config, parent=parent, **kwargs)
self.subarray = subarray
self._r1_empty_warn = False
self._dl0_empty_warn = False
self.image_extractors = {}
if image_extractor is None:
for (_, _, name) in self.image_extractor_type:
self.image_extractors[name] = ImageExtractor.from_name(
name, subarray=self.subarray, parent=self
)
else:
name = image_extractor.__class__.__name__
self.image_extractor_type = [("type", "*", name)]
self.image_extractors[name] = image_extractor
if data_volume_reducer is None:
self.data_volume_reducer = DataVolumeReducer.from_name(
self.data_volume_reducer_type, subarray=self.subarray, parent=self
)
else:
self.data_volume_reducer = data_volume_reducer
self.invalid_pixel_handler = None
if self.invalid_pixel_handler_type is not None:
self.invalid_pixel_handler = InvalidPixelHandler.from_name(
self.invalid_pixel_handler_type,
subarray=self.subarray,
parent=self,
)
def _check_r1_empty(self, waveforms):
if waveforms is None:
if not self._r1_empty_warn:
warnings.warn(
"Encountered an event with no R1 data. "
"DL0 is unchanged in this circumstance."
)
self._r1_empty_warn = True
return True
else:
return False
def _check_dl0_empty(self, waveforms):
if waveforms is None:
if not self._dl0_empty_warn:
warnings.warn(
"Encountered an event with no DL0 data. "
"DL1 is unchanged in this circumstance."
)
self._dl0_empty_warn = True
return True
else:
return False
def _calibrate_dl0(self, event, tel_id):
waveforms = event.r1.tel[tel_id].waveform
selected_gain_channel = event.r1.tel[tel_id].selected_gain_channel
if self._check_r1_empty(waveforms):
return
reduced_waveforms_mask = self.data_volume_reducer(
waveforms, tel_id=tel_id, selected_gain_channel=selected_gain_channel
)
waveforms_copy = waveforms.copy()
waveforms_copy[~reduced_waveforms_mask] = 0
event.dl0.tel[tel_id].waveform = waveforms_copy
event.dl0.tel[tel_id].selected_gain_channel = selected_gain_channel
def _calibrate_dl1(self, event, tel_id):
waveforms = event.dl0.tel[tel_id].waveform
if self._check_dl0_empty(waveforms):
return
n_pixels, n_samples = waveforms.shape
selected_gain_channel = event.dl0.tel[tel_id].selected_gain_channel
broken_pixels = _get_invalid_pixels(
n_pixels,
event.mon.tel[tel_id].pixel_status,
selected_gain_channel,
)
dl1_calib = event.calibration.tel[tel_id].dl1
time_shift = event.calibration.tel[tel_id].dl1.time_shift
readout = self.subarray.tel[tel_id].camera.readout
# subtract any remaining pedestal before extraction
if dl1_calib.pedestal_offset is not None:
# this copies intentionally, we don't want to modify the dl0 data
# waveforms have shape (n_pixel, n_samples), pedestals (n_pixels, )
waveforms = waveforms - dl1_calib.pedestal_offset[:, np.newaxis]
if n_samples == 1:
# To handle ASTRI and dst
# TODO: Improved handling of ASTRI and dst
# - dst with custom EventSource?
# - Read into dl1 container directly?
# - Don't do anything if dl1 container already filled
# - Update on SST review decision
dl1 = DL1CameraContainer(
image=waveforms[..., 0].astype(np.float32),
peak_time=np.zeros(n_pixels, dtype=np.float32),
is_valid=True,
)
else:
# shift waveforms if time_shift calibration is available
if time_shift is not None:
if self.apply_waveform_time_shift.tel[tel_id]:
sampling_rate = readout.sampling_rate.to_value(u.GHz)
time_shift_samples = time_shift * sampling_rate
waveforms, remaining_shift = shift_waveforms(
waveforms, time_shift_samples
)
remaining_shift /= sampling_rate
else:
remaining_shift = time_shift
extractor = self.image_extractors[self.image_extractor_type.tel[tel_id]]
dl1 = extractor(
waveforms,
tel_id=tel_id,
selected_gain_channel=selected_gain_channel,
broken_pixels=broken_pixels,
)
# correct non-integer remainder of the shift if given
if self.apply_peak_time_shift.tel[tel_id] and time_shift is not None:
dl1.peak_time -= remaining_shift
# Calibrate extracted charge
dl1.image *= dl1_calib.relative_factor / dl1_calib.absolute_factor
# handle invalid pixels
if self.invalid_pixel_handler is not None:
dl1.image, dl1.peak_time = self.invalid_pixel_handler(
tel_id,
dl1.image,
dl1.peak_time,
broken_pixels,
)
# store the results in the event structure
event.dl1.tel[tel_id] = dl1
[docs] def __call__(self, event):
"""
Perform the full camera calibration from R1 to DL1. Any calibration
relating to data levels before the data level the file is read into
will be skipped.
Parameters
----------
event : container
A `~ctapipe.containers.ArrayEventContainer` event container
"""
# TODO: How to handle different calibrations depending on tel_id?
tel = event.r1.tel or event.dl0.tel or event.dl1.tel
for tel_id in tel.keys():
self._calibrate_dl0(event, tel_id)
self._calibrate_dl1(event, tel_id)
def shift_waveforms(waveforms, time_shift_samples):
"""
Shift the waveforms by the mean integer shift to mediate
time differences between pixels.
The remaining shift (mean + fractional part) is returned
to be applied later to the extracted peak time.
Parameters
----------
waveforms: ndarray of shape (n_pixels, n_samples)
The waveforms to shift
time_shift_samples: ndarray of shape (n_pixels, )
The shift to apply in units of samples.
Waveforms are shifted to the left by the smallest integer
that minimizes inter-pixel differences.
Returns
-------
shifted_waveforms: ndarray of shape (n_pixels, n_samples)
The shifted waveforms
remaining_shift: ndarray of shape (n_pixels, )
The remaining shift after applying the integer shift to the waveforms.
"""
mean_shift = time_shift_samples.mean()
integer_shift = np.round(time_shift_samples - mean_shift).astype("int16")
remaining_shift = time_shift_samples - integer_shift
shifted_waveforms = _shift_waveforms_by_integer(waveforms, integer_shift)
return shifted_waveforms, remaining_shift
@guvectorize(
[(float64[:], int64, float64[:]), (float32[:], int64, float32[:])],
"(s),()->(s)",
nopython=True,
cache=True,
)
def _shift_waveforms_by_integer(waveforms, integer_shift, shifted_waveforms):
n_samples = waveforms.size
for new_sample_idx in range(n_samples):
# repeat first value if out ouf bounds to the left
# repeat last value if out ouf bounds to the right
sample_idx = min(max(new_sample_idx + integer_shift, 0), n_samples - 1)
shifted_waveforms[new_sample_idx] = waveforms[sample_idx]