"""module containing optimization related functions and classes"""
import operator
from abc import abstractmethod
from collections.abc import Sequence
import astropy.units as u
import numpy as np
from astropy.io import fits
from astropy.table import QTable, Table
from pyirf.cut_optimization import optimize_gh_cut
from pyirf.cuts import calculate_percentile_cut, evaluate_binned_cut
from ..core import Component, QualityQuery
from ..core.traits import AstroQuantity, Float, Integer, Path
from .binning import DefaultRecoEnergyBins, ResultValidRange
from .preprocessing import EventQualityQuery
__all__ = [
"CutOptimizerBase",
"GhPercentileCutCalculator",
"OptimizationResult",
"PercentileCuts",
"PointSourceSensitivityOptimizer",
"ThetaPercentileCutCalculator",
]
[docs]
class OptimizationResult:
"""Result of an optimization of G/H and theta cuts or only G/H cuts."""
def __init__(
self,
valid_energy_min: u.Quantity,
valid_energy_max: u.Quantity,
valid_offset_min: u.Quantity,
valid_offset_max: u.Quantity,
gh_cuts: QTable,
clf_prefix: str,
spatial_selection_table: QTable | None = None,
quality_query: QualityQuery | Sequence | None = None,
) -> None:
if quality_query:
if isinstance(quality_query, QualityQuery):
if len(quality_query.quality_criteria) == 0:
quality_query.quality_criteria = [
(" ", " ")
] # Ensures table serialises properly
self.quality_query = quality_query
elif isinstance(quality_query, list):
self.quality_query = QualityQuery(quality_criteria=quality_query)
else:
self.quality_query = QualityQuery(quality_criteria=list(quality_query))
else:
self.quality_query = QualityQuery(quality_criteria=[(" ", " ")])
self.valid_energy = ResultValidRange(min=valid_energy_min, max=valid_energy_max)
self.valid_offset = ResultValidRange(min=valid_offset_min, max=valid_offset_max)
self.gh_cuts = gh_cuts
self.clf_prefix = clf_prefix
self.spatial_selection_table = spatial_selection_table
def __repr__(self):
if self.spatial_selection_table is not None:
return (
f"<OptimizationResult with {len(self.gh_cuts)} G/H bins "
f"and {len(self.spatial_selection_table)} theta bins valid "
f"between {self.valid_offset.min} to {self.valid_offset.max} "
f"and {self.valid_energy.min} to {self.valid_energy.max} "
f"with {len(self.quality_query.quality_criteria)} quality criteria>"
)
else:
return (
f"<OptimizationResult with {len(self.gh_cuts)} G/H bins valid "
f"between {self.valid_offset.min} to {self.valid_offset.max} "
f"and {self.valid_energy.min} to {self.valid_energy.max} "
f"with {len(self.quality_query.quality_criteria)} quality criteria>"
)
[docs]
def write(self, output_name: Path | str, overwrite: bool = False) -> None:
"""Write an ``OptimizationResult`` to a file in FITS format."""
cut_expr_tab = Table(
rows=self.quality_query.quality_criteria,
names=["name", "cut_expr"],
dtype=[np.str_, np.str_],
)
cut_expr_tab.meta["EXTNAME"] = "QUALITY_CUTS_EXPR"
self.gh_cuts.meta["EXTNAME"] = "GH_CUTS"
self.gh_cuts.meta["CLFNAME"] = self.clf_prefix
energy_lim_tab = QTable(
rows=[[self.valid_energy.min, self.valid_energy.max]],
names=["energy_min", "energy_max"],
)
energy_lim_tab.meta["EXTNAME"] = "VALID_ENERGY"
offset_lim_tab = QTable(
rows=[[self.valid_offset.min, self.valid_offset.max]],
names=["offset_min", "offset_max"],
)
offset_lim_tab.meta["EXTNAME"] = "VALID_OFFSET"
results = [cut_expr_tab, self.gh_cuts, energy_lim_tab, offset_lim_tab]
if self.spatial_selection_table is not None:
self.spatial_selection_table.meta["EXTNAME"] = "RAD_MAX"
results.append(self.spatial_selection_table)
# Overwrite if needed and allowed
results[0].write(output_name, format="fits", overwrite=overwrite)
for table in results[1:]:
table.write(output_name, format="fits", append=True)
[docs]
@classmethod
def read(cls, file_name):
"""Read an ``OptimizationResult`` from a file in FITS format."""
with fits.open(file_name) as hdul:
cut_expr_tab = Table.read(hdul[1])
cut_expr_lst = [(name, expr) for name, expr in cut_expr_tab.iterrows()]
if (" ", " ") in cut_expr_lst:
cut_expr_lst.remove((" ", " "))
quality_query = QualityQuery(quality_criteria=cut_expr_lst)
gh_cuts = QTable.read(hdul[2])
valid_energy = QTable.read(hdul[3])
valid_offset = QTable.read(hdul[4])
spatial_selection_table = QTable.read(hdul[5]) if len(hdul) > 5 else None
return cls(
quality_query=quality_query,
valid_energy_min=valid_energy["energy_min"],
valid_energy_max=valid_energy["energy_max"],
valid_offset_min=valid_offset["offset_min"],
valid_offset_max=valid_offset["offset_max"],
gh_cuts=gh_cuts,
clf_prefix=gh_cuts.meta["CLFNAME"],
spatial_selection_table=spatial_selection_table,
)
[docs]
class CutOptimizerBase(DefaultRecoEnergyBins):
"""Base class for cut optimization algorithms."""
needs_background = False
def __init__(self, config=None, parent=None, **kwargs):
super().__init__(config=config, parent=parent, **kwargs)
def _check_events(self, events: dict[str, QTable]):
if "signal" not in events.keys():
raise ValueError(
"Calculating G/H and/or spatial selection cuts requires 'signal' "
f"events, but none are given. Provided events: {events.keys()}"
)
if self.needs_background and "background" not in events.keys():
raise ValueError(
"Optimizing G/H cuts for maximum point-source sensitivity "
"requires 'background' events, but none were given. "
f"Provided events: {events.keys()}"
)
[docs]
@abstractmethod
def __call__(
self,
events: dict[str, QTable],
quality_query: EventQualityQuery,
clf_prefix: str,
) -> OptimizationResult:
"""
Optimize G/H (and optionally spatial selection) cuts
and fill them in an ``OptimizationResult``.
Parameters
----------
events: dict[str, astropy.table.QTable]
Dictionary containing tables of events used for calculating cuts.
This has to include "signal" events and can include "background" events.
quality_query: ctapipe.irf.EventPreprocessor
``ctapipe.core.QualityQuery`` subclass containing preselection
criteria for events.
clf_prefix: str
Prefix of the output from the G/H classifier for which the
cut will be optimized.
"""
[docs]
class GhPercentileCutCalculator(Component):
"""Computes a percentile cut on gammaness."""
min_counts = Integer(
default_value=10,
help="Minimum number of events in a bin to attempt to find a cut value",
).tag(config=True)
smoothing = Float(
default_value=None,
allow_none=True,
help="When given, the width (in units of bins) of gaussian smoothing applied",
).tag(config=True)
target_percentile = Integer(
default_value=68,
help="Percent of events in each energy bin to keep after the G/H cut",
).tag(config=True)
[docs]
def __call__(self, gammaness, reco_energy, reco_energy_bins):
if self.smoothing and self.smoothing < 0:
self.smoothing = None
return calculate_percentile_cut(
gammaness,
reco_energy,
reco_energy_bins,
smoothing=self.smoothing,
percentile=100 - self.target_percentile,
fill_value=gammaness.max(),
min_events=self.min_counts,
)
[docs]
class ThetaPercentileCutCalculator(Component):
"""Computes a percentile cut on theta."""
theta_min_angle = AstroQuantity(
default_value=u.Quantity(-1, u.deg),
physical_type=u.physical.angle,
help="Smallest angular cut value allowed (-1 means no cut)",
).tag(config=True)
theta_max_angle = AstroQuantity(
default_value=u.Quantity(0.32, u.deg),
physical_type=u.physical.angle,
help="Largest angular cut value allowed",
).tag(config=True)
min_counts = Integer(
default_value=10,
help="Minimum number of events in a bin to attempt to find a cut value",
).tag(config=True)
theta_fill_value = AstroQuantity(
default_value=u.Quantity(0.32, u.deg),
physical_type=u.physical.angle,
help="Angular cut value used for bins with too few events",
).tag(config=True)
smoothing = Float(
default_value=None,
allow_none=True,
help="When given, the width (in units of bins) of gaussian smoothing applied",
).tag(config=True)
target_percentile = Integer(
default_value=68,
help="Percent of events in each energy bin to keep after the theta cut",
).tag(config=True)
[docs]
def __call__(self, theta, reco_energy, reco_energy_bins):
if self.theta_min_angle < 0 * u.deg:
theta_min_angle = None
else:
theta_min_angle = self.theta_min_angle
if self.theta_max_angle < 0 * u.deg:
theta_max_angle = None
else:
theta_max_angle = self.theta_max_angle
if self.smoothing and self.smoothing < 0:
self.smoothing = None
return calculate_percentile_cut(
theta,
reco_energy,
reco_energy_bins,
min_value=theta_min_angle,
max_value=theta_max_angle,
smoothing=self.smoothing,
percentile=self.target_percentile,
fill_value=self.theta_fill_value,
min_events=self.min_counts,
)
[docs]
class PercentileCuts(CutOptimizerBase):
"""
Calculates G/H separation cut based on the percentile of signal events
to keep in each bin.
Optionally also calculates a percentile cut on theta based on the signal
events surviving this G/H cut.
"""
classes = [GhPercentileCutCalculator, ThetaPercentileCutCalculator]
def __init__(self, config=None, parent=None, **kwargs):
super().__init__(config=config, parent=parent, **kwargs)
self.gh_cut_calculator = GhPercentileCutCalculator(parent=self)
self.theta_cut_calculator = ThetaPercentileCutCalculator(parent=self)
[docs]
def __call__(
self,
events: dict[str, QTable],
quality_query: EventQualityQuery,
clf_prefix: str,
) -> OptimizationResult:
self._check_events(events)
gh_cuts = self.gh_cut_calculator(
events["signal"]["gh_score"],
events["signal"]["reco_energy"],
self.reco_energy_bins,
)
gh_mask = evaluate_binned_cut(
events["signal"]["gh_score"],
events["signal"]["reco_energy"],
gh_cuts,
op=operator.ge,
)
spatial_selection_table = self.theta_cut_calculator(
events["signal"]["theta"][gh_mask],
events["signal"]["reco_energy"][gh_mask],
self.reco_energy_bins,
)
result = OptimizationResult(
quality_query=quality_query,
gh_cuts=gh_cuts,
clf_prefix=clf_prefix,
valid_energy_min=self.reco_energy_min,
valid_energy_max=self.reco_energy_max,
# A single set of cuts is calculated for the whole fov atm
valid_offset_min=0 * u.deg,
valid_offset_max=np.inf * u.deg,
spatial_selection_table=spatial_selection_table,
)
return result
[docs]
class PointSourceSensitivityOptimizer(CutOptimizerBase):
"""
Optimizes a G/H cut for maximum point source sensitivity and
calculates a percentile cut on theta.
"""
needs_background = True
classes = [ThetaPercentileCutCalculator]
initial_gh_cut_efficency = Float(
default_value=0.4, help="Start value of gamma efficiency before optimization"
).tag(config=True)
max_gh_cut_efficiency = Float(
default_value=0.8, help="Maximum gamma efficiency requested"
).tag(config=True)
gh_cut_efficiency_step = Float(
default_value=0.1,
help="Stepsize used for scanning after optimal gammaness cut",
).tag(config=True)
alpha = Float(
default_value=0.2,
help="Size ratio of on region / off region.",
).tag(config=True)
min_background_fov_offset = AstroQuantity(
help=(
"Minimum distance from the fov center for background events "
"to be taken into account"
),
default_value=u.Quantity(0, u.deg),
physical_type=u.physical.angle,
).tag(config=True)
max_background_fov_offset = AstroQuantity(
help=(
"Maximum distance from the fov center for background events "
"to be taken into account"
),
default_value=u.Quantity(5, u.deg),
physical_type=u.physical.angle,
).tag(config=True)
def __init__(self, config=None, parent=None, **kwargs):
super().__init__(config=config, parent=parent, **kwargs)
self.theta_cut_calculator = ThetaPercentileCutCalculator(parent=self)
[docs]
def __call__(
self,
events: dict[str, QTable],
quality_query: EventQualityQuery,
clf_prefix: str,
) -> OptimizationResult:
self._check_events(events)
initial_gh_cuts = calculate_percentile_cut(
events["signal"]["gh_score"],
events["signal"]["reco_energy"],
bins=self.reco_energy_bins,
fill_value=0.0,
percentile=100 * (1 - self.initial_gh_cut_efficency),
min_events=10,
smoothing=1,
)
initial_gh_mask = evaluate_binned_cut(
events["signal"]["gh_score"],
events["signal"]["reco_energy"],
initial_gh_cuts,
op=operator.gt,
)
spatial_selection_table = self.theta_cut_calculator(
events["signal"]["theta"][initial_gh_mask],
events["signal"]["reco_energy"][initial_gh_mask],
self.reco_energy_bins,
)
self.log.info("Optimizing G/H separation cut for best sensitivity")
gh_cut_efficiencies = np.arange(
self.gh_cut_efficiency_step,
self.max_gh_cut_efficiency + self.gh_cut_efficiency_step / 2,
self.gh_cut_efficiency_step,
)
opt_sensitivity, gh_cuts = optimize_gh_cut(
events["signal"],
events["background"],
reco_energy_bins=self.reco_energy_bins,
gh_cut_efficiencies=gh_cut_efficiencies,
op=operator.ge,
theta_cuts=spatial_selection_table,
alpha=self.alpha,
fov_offset_max=self.max_background_fov_offset,
fov_offset_min=self.min_background_fov_offset,
)
valid_energy = self._get_valid_energy_range(opt_sensitivity)
# Re-calculate theta cut with optimized g/h cut
gh_mask = evaluate_binned_cut(
events["signal"]["gh_score"],
events["signal"]["reco_energy"],
gh_cuts,
operator.ge,
)
events["signal"]["selected_gh"] = gh_mask
spatial_selection_table_opt = self.theta_cut_calculator(
events["signal"][gh_mask]["theta"],
events["signal"][gh_mask]["reco_energy"],
self.reco_energy_bins,
)
result = OptimizationResult(
quality_query=quality_query,
gh_cuts=gh_cuts,
clf_prefix=clf_prefix,
valid_energy_min=valid_energy[0],
valid_energy_max=valid_energy[1],
# A single set of cuts is calculated for the whole fov atm
valid_offset_min=self.min_background_fov_offset,
valid_offset_max=self.max_background_fov_offset,
spatial_selection_table=spatial_selection_table_opt,
)
return result
def _get_valid_energy_range(self, opt_sensitivity):
keep_mask = np.isfinite(opt_sensitivity["significance"])
count = np.arange(start=0, stop=len(keep_mask), step=1)
if all(np.diff(count[keep_mask]) == 1):
return [
opt_sensitivity["reco_energy_low"][keep_mask][0],
opt_sensitivity["reco_energy_high"][keep_mask][-1],
]
else:
raise ValueError("Optimal significance curve has internal NaN bins")