"""
Algorithms to compute aggregated time-series statistics and histograms from columns of an astropy table.
These classes take as input an events table containing any event-wise quantities
(e.g., images, scalars, vectors), divide it into time chunks, which may optionally
overlap, and compute various aggregated statistics and histograms for each chunk.
The statistics include the mean, median, and standard deviation. The result
is a monitoring table with columns describing the start and stop time of the chunk and the
aggregated statistic values or histograms.
The aggregation is always performed along axis=0 (the event dimension), making
these classes suitable for any N-dimensional event-wise data.
"""
__all__ = [
"BaseChunking",
"SizeChunking",
"TimeChunking",
"BaseAggregator",
"HistogramAggregator",
"StatisticsAggregator",
"PlainAggregator",
"SigmaClippingAggregator",
]
from abc import ABCMeta, abstractmethod
from collections import defaultdict
from collections.abc import Generator
import astropy.units as u
import hist
import numpy as np
from astropy.stats import sigma_clip
from astropy.table import Row, Table
from hist import Hist
from traitlets import TraitError
from ..containers import ChunkHistogramContainer, ChunkStatisticsContainer
from ..core import Component
from ..core.traits import (
AstroQuantity,
Bool,
ComponentName,
Dict,
Enum,
Int,
List,
Unicode,
)
[docs]
class BaseChunking(Component, metaclass=ABCMeta):
"""
Abstract base class for chunking strategies.
Chunking components divide tables into overlapping or non-overlapping chunks
for processing by aggregators.
"""
allow_undersized_tables = Bool(
default_value=False,
help=(
"If True, allow processing tables smaller than chunk size/duration by yielding "
"the entire table as a single chunk. If False, raise an error."
),
).tag(config=True)
last_chunk_policy = Enum(
values=["overlap", "truncate", "skip"],
default_value="overlap",
help=(
"Policy for handling the last chunk when data doesn't divide evenly. "
"'overlap': Create overlapping chunk with full size (default behavior). "
"'truncate': Yield remaining data as smaller chunk. "
"'skip': Skip the last partial chunk."
),
).tag(config=True)
[docs]
def __call__(self, table) -> Generator[Table, None, None]:
"""
Generate chunks from the input table.
Parameters
----------
table : astropy.table.Table
Input table with 'time' column.
Yields
------
astropy.table.Table
Chunks of the input table. Each chunk is a view/reference to the
original table data, meaning modifications to chunk data will affect
the original table.
"""
# Basic validation that all chunking strategies need
if "time" not in table.colnames and (
"time_start" not in table.colnames or "time_end" not in table.colnames
):
raise ValueError(
"Table must have a 'time' column or both 'time_start' and 'time_end' columns for chunking."
)
yield from self._generate_chunks(table)
@abstractmethod
def _generate_chunks(self, table) -> Generator[Table, None, None]:
"""Generate chunks from table. Implemented by subclasses."""
pass
[docs]
class SizeChunking(BaseChunking):
"""Divides tables into chunks based on number of rows."""
chunk_size = Int(
default_value=None,
allow_none=True,
help="Number of rows per chunk. If None, use entire table as one chunk.",
).tag(config=True)
chunk_shift = Int(
default_value=None,
allow_none=True,
help=(
"Number of rows to shift between consecutive chunks. "
"If None, chunks do not overlap."
),
).tag(config=True)
def _generate_final_chunk(self, table, last_chunk_start):
"""Generate the final chunk according to last_chunk_policy."""
remaining_rows = len(table) - last_chunk_start
# Only add final chunk if there are remaining rows
if remaining_rows == 0:
return None
if self.last_chunk_policy == "overlap":
# Ensure last chunk has full size by potentially overlapping
return table[-self.chunk_size :]
elif self.last_chunk_policy == "truncate":
# Yield remaining rows as smaller chunk
return table[last_chunk_start:]
elif self.last_chunk_policy == "skip":
# Skip the last partial chunk
return None
return None
def _generate_chunks(self, table) -> Generator[Table, None, None]:
"""Generate row-count based chunks."""
# Handle case where chunk_size is None (entire table)
if self.chunk_size is None:
yield table
return
# Check table size vs chunk_size
if len(table) < self.chunk_size:
if self.allow_undersized_tables:
yield table # Yield entire table as single chunk
return
else:
raise ValueError(
f"Table length ({len(table)}) is less than chunk_size ({self.chunk_size}). "
f"Set allow_undersized_tables=True to process as single chunk."
)
# Calculate step size
step = self.chunk_shift if self.chunk_shift is not None else self.chunk_size
# Calculate all main chunk start indices (not extending beyond table)
n_chunks = (len(table) - self.chunk_size) // step + 1
main_chunk_indices = np.arange(n_chunks) * step
# Filter indices that would create valid chunks
main_chunk_indices = main_chunk_indices[
main_chunk_indices + self.chunk_size <= len(table)
]
last_chunk_start = main_chunk_indices[-1] + step
# Generate chunks for each main chunk start index
for start_idx in main_chunk_indices:
end_idx = start_idx + self.chunk_size
yield table[start_idx:end_idx]
# Handle final chunk according to policy
final_chunk = self._generate_final_chunk(table, last_chunk_start)
if final_chunk is not None:
yield final_chunk
[docs]
class TimeChunking(BaseChunking):
"""Divides tables into chunks based on time intervals."""
chunk_duration = AstroQuantity(
physical_type=u.s,
default_value=0 * u.s,
help="Duration of each time chunk.",
).tag(config=True)
chunk_shift = AstroQuantity(
physical_type=u.s,
default_value=0 * u.s,
allow_none=True,
help=("Time shift between consecutive chunks. If 0, chunks do not overlap."),
).tag(config=True)
def _validate_inputs(self, total_duration):
"""Validate chunk_duration and table duration."""
chunk_duration = self.chunk_duration.to_value(u.s)
# Check if chunk_duration is properly set
if chunk_duration <= 0:
raise ValueError("chunk_duration must be greater than zero.")
# Check if total duration is sufficient for chunking
if total_duration < chunk_duration:
if self.allow_undersized_tables:
return True # Signal to yield entire table as single chunk
raise ValueError(
f"Total duration ({total_duration * u.s}) is less than chunk_duration ({self.chunk_duration}). "
f"Set allow_undersized_tables=True to process as single chunk."
)
return False # Normal processing
def _generate_final_chunk(self, table, last_chunk_idx):
"""Generate the final chunk according to last_chunk_policy."""
if last_chunk_idx == len(table):
return None
if self.last_chunk_policy == "overlap":
# Ensure last chunk has full duration by potentially overlapping
last_chunk_idx = np.searchsorted(
table["time"], table["time"][-1] - self.chunk_duration, side="left"
)
return table[last_chunk_idx:]
elif self.last_chunk_policy == "truncate":
# Yield remaining time as smaller chunk
return table[last_chunk_idx:]
elif self.last_chunk_policy == "skip":
# Skip the last partial chunk
return None
def _generate_chunks(self, table) -> Generator[Table, None, None]:
"""Generate time-based chunks."""
# Handle case where chunk_duration is None (entire table)
if self.chunk_duration is None:
yield table
return
times = table["time"]
relative_times = (times - times[0]).to_value(u.s)
# Validate inputs and handle undersized tables
use_entire_table = self._validate_inputs(relative_times[-1])
if use_entire_table:
yield table # Yield entire table as single chunk
return
# Calculate time step
time_step = (
self.chunk_shift if self.chunk_shift > 0 * u.s else self.chunk_duration
).to_value(u.s)
chunk_start_time = np.arange(
0,
relative_times[-1] - self.chunk_duration.to_value(u.s),
time_step,
)
chunk_end_time = chunk_start_time + self.chunk_duration.to_value(u.s)
chunk_start_idx = np.searchsorted(relative_times, chunk_start_time)
chunk_end_idx = np.searchsorted(relative_times, chunk_end_time, side="right")
# Generate chunks for each main chunk start time
for i, j in zip(chunk_start_idx, chunk_end_idx):
chunk = table[i:j]
if chunk is not None:
yield chunk
# Handle final chunk according to policy
last_chunk_idx = np.searchsorted(
relative_times, chunk_start_time[-1] + time_step
)
final_chunk = self._generate_final_chunk(table, last_chunk_idx)
if final_chunk is not None:
yield final_chunk
[docs]
class BaseAggregator(Component, metaclass=ABCMeta):
"""
Base class for aggregators that compute statistics and histograms over chunks of data.
Aggregators use a chunking strategy to divide input tables and compute
aggregated statistics and histograms for each chunk.
"""
chunking_type = ComponentName(
BaseChunking,
default_value="SizeChunking",
help="The chunking strategy to use for dividing data into chunks.",
).tag(config=True)
def __init__(self, config=None, parent=None, **kwargs):
"""
Parameters
----------
config : traitlets.loader.Config
Configuration specified by config file or cmdline arguments
parent : ctapipe.core.Component or ctapipe.core.Tool
Parent of this component in the configuration hierarchy
"""
super().__init__(config=config, parent=parent, **kwargs)
# Create the chunking component using ComponentName
self.chunking = BaseChunking.from_name(self.chunking_type, parent=self)
[docs]
def __call__(
self,
table,
masked_elements_of_sample=None,
col_name="image",
) -> Table:
r"""
Divide table into chunks and compute aggregated statistic values or histograms.
Parameters
----------
table : astropy.table.Table
table with event-wise data of shape (n_events, \*data_dimensions),
event IDs and timestamps of shape (n_events, )
masked_elements_of_sample : ndarray, optional
boolean array of masked elements of shape (\*data_dimensions)
that are not available for processing
col_name : string
column name in the table containing the event-wise data to aggregate
Returns
-------
astropy.table.Table
table containing the start and end values as timestamps and event IDs
as well as the aggregated statistic values or histograms for each chunk
"""
# Get chunks using the chunking strategy
chunks = self.chunking(table)
# Initialize result storage
results = defaultdict(list)
# Process each chunk
for chunk in chunks:
# Add time metadata
if "time_start" in chunk.colnames and "time_end" in chunk.colnames:
results["time_start"].append(chunk["time_start"][0])
results["time_end"].append(chunk["time_end"][-1])
else:
results["time_start"].append(chunk["time"][0])
results["time_end"].append(chunk["time"][-1])
# Add event_id metadata if column exists
if "event_id" in chunk.colnames:
results["event_id_start"].append(chunk["event_id"][0])
results["event_id_end"].append(chunk["event_id"][-1])
# Compute aggregator-specific statistics
self._add_result_columns(
chunk[col_name].data,
masked_elements_of_sample,
results,
)
# Deal with metadata if present in results
metadata = {}
if "meta" in results:
metadata["meta"] = results.pop("meta")
# Create and return table
result_table = Table(results)
if "meta" in metadata:
result_table.meta = metadata["meta"]
# Preserve units if present
if hasattr(table[col_name], "unit") and table[col_name].unit is not None:
self._set_result_units(result_table, table[col_name].unit)
return result_table
@abstractmethod
def _add_result_columns(
self,
data,
masked_elements_of_sample,
results_dict,
):
r"""
Compute statistics and histograms. Add columns to results dictionary.
Parameters
----------
data : array-like
Data for this chunk (already extracted from table)
masked_elements_of_sample : ndarray, optional
Boolean mask of shape (\*data_dimensions) for elements to exclude
results_dict : dict
Dictionary to which statistic or histogram columns should be added.
"""
pass
@abstractmethod
def _set_result_units(self, table, unit):
"""Set units for result columns that should inherit from input data."""
pass
[docs]
class HistogramAggregator(BaseAggregator):
"""
Compute aggregated histograms from a chunk of event-wise data using Hist.
Works with any N-dimensional event-wise data by aggregating along axis=0 (event dimension).
"""
axis_definition = Dict(
allow_none=False,
help=(
"Dictionary that contains ``class_name`` and the corresponding kwargs "
"to construct a ``hist.axis.<class_name>(**kwargs)`` instance. "
"E.g. ``{'class_name': 'Regular', 'bins': 40, 'start': 20.0, 'stop': 80.0, 'name': 'value'}``."
),
).tag(config=True)
axis_names = List(
default_value=["channel", "pixel"],
trait=Unicode(),
allow_none=True,
help="List of axis names for the histogram. E.g. ['channel', 'pixel']. If None, default names will be used.",
).tag(config=True)
def __init__(self, config=None, parent=None, **kwargs):
"""
Parameters
----------
config : traitlets.loader.Config
Configuration specified by config file or cmdline arguments
parent : ctapipe.core.Component or ctapipe.core.Tool
Parent of this component in the configuration hierarchy
"""
super().__init__(config=config, parent=parent, **kwargs)
self.axis_kwargs = self.axis_definition.copy()
if "class_name" not in self.axis_kwargs.keys():
raise TraitError(
"The ``axis_definition`` trait is missing required key 'class_name'."
)
self.axis_class_name = self.axis_kwargs.pop("class_name")
self.hist_axis = getattr(hist.axis, self.axis_class_name)(**self.axis_kwargs)
def _add_result_columns(
self,
data,
masked_elements_of_sample,
results_dict,
):
histograms = self._compute_histograms(data, masked_elements_of_sample)
results_dict["n_events"].append(histograms.n_events)
# store full histogram (including flow bins) in the `histogram` column
results_dict["histogram"].append(histograms.histogram)
if "meta" not in results_dict and histograms.meta:
results_dict["meta"] = histograms.meta
def _set_result_units(self, table, unit):
"""
Set units for histogram columns that inherit from the input data.
For HistogramAggregator, the histogram columns should have the same units as the input data.
"""
for col in ("bin_edges", "bin_centers"):
table.meta[col].unit = unit
def _compute_histograms(
self, data, masked_elements_of_sample
) -> ChunkHistogramContainer:
r"""Compute histograms for a chunk of data.
Parameters
----------
data : ndarray
Event-wise data of shape (n_events, \*data_dimensions)
masked_elements_of_sample : ndarray, optional
Boolean mask of shape (\*data_dimensions) for elements to exclude
Returns
-------
ChunkHistogramContainer
Container with computed histograms for the chunk
"""
# Build the histograms over the event dimension (axis=0) for each element of the data dimensions
spatial_shape = data.shape[1:]
# Determine axis names for the histogram axes.
axis_names = (
[f"axis_{i + 1}" for i in range(len(spatial_shape))]
if self.axis_names is None
else self.axis_names
)
if len(axis_names) != len(spatial_shape):
raise ValueError(
f"Number of axis names '{len(axis_names)}' does not match spatial dimensions '{len(spatial_shape)}'."
)
# Broadcast mask to full shape
if masked_elements_of_sample is not None:
mask = np.broadcast_to(masked_elements_of_sample, data.shape)
else:
mask = np.zeros_like(data, dtype=bool)
# Mask also NaN values
invalid = np.isnan(data)
mask = mask | invalid
# Build one histogram per spatial element and combine them into a stack.
underflow = bool(self.axis_kwargs.get("underflow", False))
overflow = bool(self.axis_kwargs.get("overflow", False))
flow = underflow or overflow
n_flow_bins = int(underflow) + int(overflow)
stacked_histograms = hist.stack.Stack.from_iter(
self._make_histogram(
data[(slice(None),) + index], mask[(slice(None),) + index]
)
for index in np.ndindex(spatial_shape)
)
# Extract histogram counts and reshape to original data dimensions (with bin dimension first)
n_bins = stacked_histograms[0].axes[0].size + n_flow_bins
# Build counts including only the configured flow bins.
# `n_flow_bins` is 0, 1, or 2 depending on whether underflow and/or
# overflow are enabled in the axis configuration.
hist_counts = np.stack(
[np.asarray(h.view(flow=flow)) for h in stacked_histograms],
axis=-1,
)
hist_counts = hist_counts.reshape((n_bins,) + spatial_shape)
# Count valid entries per element (excludes masked and invalid values)
n_events_valid = np.sum(~mask, axis=0)
# Build and return the ChunkHistogramContainer
return ChunkHistogramContainer(
n_events=n_events_valid,
histogram=hist_counts,
meta={
"bin_edges": stacked_histograms[0].axes[0].edges,
"bin_centers": stacked_histograms[0].axes[0].centers,
"axis_class_name": self.axis_class_name,
"axis_kwargs": dict(self.axis_kwargs),
"axis_names": axis_names,
},
)
[docs]
@staticmethod
def hist_from_tablerow(
row: Row,
):
"""Build a ``hist.Hist`` from an Astropy table row produced by the
``HistogramAggregator``.
This is a thin wrapper that constructs a ``ChunkHistogramContainer``
from the provided ``row`` (reading ``n_events``, ``histogram`` and
``row.meta``) and delegates the actual reconstruction to
:meth:`hist_from_container`.
Note
----
``HistogramAggregator.__call__`` produces an ``astropy.table.Table``
with one row per chunk. To rebuild the histogram for chunk ``i`` you
can do::
result_table = aggregator(event_table)
hist = HistogramAggregator.hist_from_tablerow(result_table[i])
This helper handles extracting the stored ndarray and metadata from
the row and returns a complete ``hist.Hist`` object.
Parameters
----------
row : astropy.table.Row
Table row created by ``HistogramAggregator.__call__``. Expected to
contain a ``histogram`` ndarray with shape ``(n_bins, *data_dims)``,
number of valid events in ``n_events``, and metadata in ``row.meta``
describing the original axis.
Returns
-------
hist.Hist
Reconstructed histogram object with axes and counts populated.
"""
# Remove 'outlier_mask_detector_X' column names from the row
# to avoid issues when creating the ChunkHistogramContainer
row_dict = {
col: row[col]
for col in row.colnames
if not col.startswith("outlier_mask_detector_")
}
hist_container = ChunkHistogramContainer(**row_dict)
hist_container.meta = row.meta
return HistogramAggregator.hist_from_container(hist_container)
[docs]
@staticmethod
def hist_from_container(
hist_container: ChunkHistogramContainer,
):
"""Construct a ``hist.Hist`` from a ``ChunkHistogramContainer``.
Parameters
----------
hist_container : ChunkHistogramContainer
Stored histogram container.
"""
# Extract axis information from container metadata
axis_kwargs_meta = hist_container.meta.get("axis_kwargs", {}).copy()
axis_class = getattr(hist.axis, hist_container.meta["axis_class_name"])
axis_names = hist_container.meta["axis_names"]
# Reconstruct the original axis class so metadata round-trips exactly,
# including Integer and transformed/circular Regular axes.
axes = [axis_class(**axis_kwargs_meta)]
# Non-value axes must match histogram.shape[1:]
for name, n_bins in zip(axis_names, hist_container.histogram.shape[1:]):
axes.append(hist.axis.IntCategory(categories=np.arange(n_bins), name=name))
# Create a Hist object with the reconstructed axes and fill it with the stored histogram counts.
h = Hist(*axes)
h[...] = hist_container.histogram[...]
return h
def _make_histogram(self, values, mask):
valid_values = values[~mask]
hist_object = Hist(self.hist_axis, storage=hist.storage.Int64())
if len(valid_values) > 0:
hist_object.fill(**{self.hist_axis.name: valid_values})
return hist_object
[docs]
class StatisticsAggregator(BaseAggregator):
"""
Base component to handle the computation of aggregated statistic values from a table
containing any event-wise quantities (e.g., images, scalars, vectors, or other arrays).
Aggregation is performed along axis=0 (the event dimension) for any N-dimensional data.
"""
def _add_result_columns(
self,
data,
masked_elements_of_sample,
results_dict,
axis_names=None,
):
stats = self._compute_stats(data, masked_elements_of_sample)
results_dict["n_events"].append(stats.n_events)
results_dict["mean"].append(stats.mean)
results_dict["median"].append(stats.median)
results_dict["std"].append(stats.std)
def _set_result_units(self, table, unit):
"""
Set units for statistics columns that inherit from the input data.
For StatisticsAggregator, the mean, median, std, and histogram columns
should have the same units as the input data.
"""
for col in ("mean", "median", "std"):
table[col].unit = unit
@abstractmethod
def _compute_stats(
self, data, masked_elements_of_sample
) -> ChunkStatisticsContainer:
r"""Compute aggregated statistics for a chunk of data.
Parameters
----------
data : ndarray
Event-wise data of shape (n_events, \*data_dimensions)
masked_elements_of_sample : ndarray, optional
Boolean mask of shape (\*data_dimensions) for elements to exclude
Returns
-------
ChunkStatisticsContainer
Container with computed statistics for the chunk
"""
pass
[docs]
class PlainAggregator(StatisticsAggregator):
"""
Compute aggregated statistic values from a chunk of event-wise data using numpy functions.
Works with any N-dimensional event-wise data by aggregating along axis=0 (event dimension).
"""
def _compute_stats(
self, data, masked_elements_of_sample
) -> ChunkStatisticsContainer:
# Mask excluded elements and NaN/inf values
masked_data = np.ma.array(data, mask=masked_elements_of_sample)
masked_data = np.ma.masked_invalid(masked_data)
# Compute the mean, median, and std over the event dimension (axis=0)
element_mean = np.ma.mean(masked_data, axis=0)
element_median = np.ma.median(masked_data, axis=0)
element_std = np.ma.std(masked_data, axis=0)
# For 1D data, these operations return scalars (not MaskedArrays)
# Convert to array and fill masked values with NaN
element_mean = np.ma.filled(np.ma.asarray(element_mean), np.nan)
element_median = np.ma.filled(np.ma.asarray(element_median), np.nan)
element_std = np.ma.filled(np.ma.asarray(element_std), np.nan)
# Count non-masked events per element (excludes both masked and NaN/inf values)
n_events_per_element = np.count_nonzero(~masked_data.mask, axis=0)
return ChunkStatisticsContainer(
n_events=n_events_per_element,
mean=element_mean,
median=element_median,
std=element_std,
)
[docs]
class SigmaClippingAggregator(StatisticsAggregator):
"""
Compute aggregated statistic values from a chunk of event-wise data using astropy's sigma clipping functions.
Works with any N-dimensional event-wise data by aggregating along axis=0 (event dimension)
while removing outliers using sigma clipping.
"""
max_sigma = Int(
default_value=4,
help="Maximal value for the sigma clipping outlier removal",
).tag(config=True)
iterations = Int(
default_value=5,
help="Number of iterations for the sigma clipping outlier removal",
).tag(config=True)
def _compute_stats(
self, data, masked_elements_of_sample
) -> ChunkStatisticsContainer:
# Mask excluded elements and NaN/inf values
masked_data = np.ma.array(data, mask=masked_elements_of_sample)
masked_data = np.ma.masked_invalid(masked_data)
# Use sigma_clip to get the clipped data, then compute stats from it
# Clipping is performed along axis=0 (event dimension)
filtered_data = sigma_clip(
masked_data,
sigma=self.max_sigma,
maxiters=self.iterations,
cenfunc="mean",
axis=0,
)
# Count the number of events remaining after sigma clipping per element
# (excludes both masked, NaN/inf, and sigma-clipped values)
n_events_after_clipping = np.count_nonzero(~filtered_data.mask, axis=0)
# Compute statistics from the filtered data along the event dimension
element_mean = np.ma.mean(filtered_data, axis=0)
element_median = np.ma.median(filtered_data, axis=0)
element_std = np.ma.std(filtered_data, axis=0)
# For 1D data, these operations return scalars (not MaskedArrays)
# Convert to array and fill masked values with NaN
element_mean = np.ma.filled(np.ma.asarray(element_mean), np.nan)
element_median = np.ma.filled(np.ma.asarray(element_median), np.nan)
element_std = np.ma.filled(np.ma.asarray(element_std), np.nan)
return ChunkStatisticsContainer(
n_events=n_events_after_clipping,
mean=element_mean,
median=element_median,
std=element_std,
)