"""
Description of Arrays or Subarrays of telescopes
"""
import warnings
from collections import defaultdict
from collections.abc import Iterable
from contextlib import ExitStack
from copy import copy
from itertools import groupby
import numpy as np
import tables
from astropy import units as u
from astropy.coordinates import SkyCoord
from astropy.coordinates.earth import EarthLocation
from astropy.table import QTable, Table, join
from astropy.utils import lazyproperty
from .. import __version__ as CTAPIPE_VERSION
from ..compat import COPY_IF_NEEDED
from ..coordinates import CameraFrame, GroundFrame
from ..exceptions import (
IncompatibleDataModelVersion,
UnknownSubarray,
UnknownTelescopeID,
)
from ..utils.datasets import get_structured_dataset, get_table_dataset
from .camera import CameraDescription, CameraGeometry, CameraReadout
from .optics import FocalLengthKind, OpticsDescription
from .telescope import TelescopeDescription
__all__ = ["SubarrayDescription"]
def _group_consecutives(sequence):
"""
Turn consecutive lists into ranges (used in SubarrayDescription.info())
from https://codereview.stackexchange.com/questions/214820/codewars-range-extraction
"""
sequence = sorted(sequence)
for _, g in groupby(enumerate(sequence), lambda i_x: int(i_x[0]) - int(i_x[1])):
r = [x for _, x in g]
if len(r) > 2:
yield f"{r[0]}-{r[-1]}"
else:
yield from map(str, r)
def _range_extraction(sequence):
return ",".join(_group_consecutives(sequence))
[docs]
class SubarrayDescription:
"""
Collects the `~ctapipe.instrument.TelescopeDescription` of all telescopes
along with their positions on the ground.
Attributes
----------
name: str
name of subarray
tel_coords: ctapipe.coordinates.GroundFrame
coordinates of all telescopes
tels:
dict of TelescopeDescription for each telescope in the subarray
"""
#: Current version number of the format written by `SubarrayDescription.to_hdf`
CURRENT_TAB_VERSION = "2.0"
#: Version numbers supported by `SubarrayDescription.from_hdf`
COMPATIBLE_VERSIONS = {"2.0"}
#: Current version of the service data format expected by `SubarrayDescription.from_service_data`
CURRENT_SERVICE_DATA_VERSION = "1.0"
#: Service data versions supported by `SubarrayDescription.from_service_data`
COMPATIBLE_SERVICE_DATA_VERSIONS = {"1.0"}
#: Current version of the subarray identifiers ("ctao.common.identifiers.subarrays") expected by `SubarrayDescription.from_service_data`
CURRENT_SUBARRAY_IDENTIFIERS_VERSION = "2.0"
#: Subarray identifiers versions supported by `SubarrayDescription.from_service_data`
COMPATIBLE_SUBARRAY_IDENTIFIERS_VERSIONS = {"2.0"}
#: Current version of the array element identifiers ("ctao.common.identifiers.array_elements") expected by `SubarrayDescription.from_service_data`
CURRENT_ARRAY_ELEMENTS_IDENTIFIERS_VERSION = "2.0"
#: Array element identifiers versions supported by `SubarrayDescription.from_service_data`
COMPATIBLE_ARRAY_ELEMENTS_IDENTIFIERS_VERSIONS = {"2.0"}
def __init__(
self,
name,
tel_positions,
tel_descriptions,
reference_location,
):
"""
Initialize a new SubarrayDescription
Parameters
----------
name : str
name of this subarray
tel_positions : dict[int, np.ndarray]
dict of x,y,z telescope positions on the ground by tel_id. These are
converted internally to a coordinate in the `~ctapipe.coordinates.GroundFrame`
tel_descriptions : dict[TelescopeDescription]
dict of TelescopeDescriptions by tel_id
reference_location : `astropy.coordinates.EarthLocation`
EarthLocation of the array reference position, (0, 0, 0) of the
coordinate system used for `tel_positions`.
"""
self.name = name
self.positions = tel_positions or dict()
self.tels: dict[int, TelescopeDescription] = tel_descriptions or dict()
self.reference_location = reference_location
if self.positions.keys() != self.tels.keys():
raise ValueError("Telescope ids in positions and descriptions do not match")
def __str__(self):
return self.name
def __repr__(self):
return "{}(name='{}', n_tels={})".format(
self.__class__.__name__, self.name, self.n_tels
)
@property
def tel(self) -> dict[int, TelescopeDescription]:
"""Dictionary mapping tel_ids to TelescopeDescriptions"""
return self.tels
@property
def n_tels(self):
"""number of telescopes in this subarray"""
return len(self.tels)
def __len__(self):
return len(self.tels)
[docs]
def info(self, printer=print):
"""
print descriptive info about subarray
"""
printer(f"Subarray : {self.name}")
printer(f"Num Tels : {self.n_tels}")
printer(f"Footprint: {self.footprint:.2f}")
printer(f"Height : {self.reference_location.geodetic.height:.2f}")
printer(
f"Lon/Lat : {self.reference_location.geodetic.lon}, "
f"{self.reference_location.geodetic.lat} "
)
printer("")
# print the per-telescope-type informatino:
tel_ids = defaultdict(list)
for tel_type in self.telescope_types:
ids = self.get_tel_ids_for_type(tel_type)
tel_ids[str(tel_type)].extend(ids)
n_tels = {tel_type: len(ids) for tel_type, ids in tel_ids.items()}
id_ranges = [_range_extraction(ids) for ids in tel_ids.values()]
out_table = Table(
{
"Type": list(n_tels.keys()),
"Count": list(n_tels.values()),
"Tel IDs": id_ranges,
}
)
out_table["Tel IDs"].format = "<s"
for line in str(out_table).split("\n"):
printer(line)
@lazyproperty
def tel_coords(self):
"""Telescope positions in `~ctapipe.coordinates.GroundFrame`"""
pos_x = [p[0].to_value(u.m) for p in self.positions.values()]
pos_y = [p[1].to_value(u.m) for p in self.positions.values()]
pos_z = [p[2].to_value(u.m) for p in self.positions.values()]
frame = GroundFrame(reference_location=self.reference_location)
return SkyCoord(x=pos_x, y=pos_y, z=pos_z, unit=u.m, frame=frame)
@lazyproperty
def tel_earth_locations(self):
"""
Telescope positions as `~astropy.coordinates.EarthLocation` objects.
Returns
-------
dict[int, EarthLocation]
Dictionary mapping telescope IDs to their EarthLocation.
This is cached to avoid expensive repeated conversions.
"""
return {
tel_id: coord.to_earth_location()
for tel_id, coord in zip(self.tel_ids, self.tel_coords)
}
@lazyproperty
def tel_ids(self):
"""Array of telescope ids in order of telescope indices"""
return np.array(list(self.tel.keys()))
@lazyproperty
def tel_indices(self):
"""dictionary mapping telescope ids to telescope index"""
return {tel_id: ii for ii, tel_id in enumerate(self.tels.keys())}
@lazyproperty
def tel_index_array(self):
"""
Array of telescope indices that can be indexed by telescope id
I.e. for a given telescope, this array maps the tel_id to a flat index
starting at 0 for the first telescope.
``tel_index = subarray.tel_index_array[tel_id]``
If the tel_ids are not contiguous, gaps will be filled in by int minval.
For a more compact representation use the `tel_indices`
"""
invalid = np.iinfo(int).min
idx = np.full(np.max(self.tel_ids) + 1, invalid, dtype=int)
for key, val in self.tel_indices.items():
idx[key] = val
return idx
[docs]
def tel_ids_to_indices(self, tel_ids):
"""maps a telescope id (or array of them) to flat indices
Parameters
----------
tel_ids : int or List[int]
array of tel IDs
Returns
-------
np.array:
array of corresponding tel indices
"""
tel_ids = np.array(tel_ids, dtype=int, copy=COPY_IF_NEEDED).ravel()
return self.tel_index_array[tel_ids]
[docs]
def tel_ids_to_mask(self, tel_ids):
"""Convert a list of telescope ids to a boolean mask
of length ``n_tels`` where the **index** of the telescope
is set to ``True`` for each tel_id in tel_ids
Parameters
----------
tel_ids : int or List[int]
array of tel IDs
Returns
-------
np.array[dtype=bool]:
Boolean array of length ``n_tels`` with indices of the
telescopes in ``tel_ids`` set to True.
"""
mask = np.zeros(self.n_tels, dtype=bool)
indices = self.tel_ids_to_indices(tel_ids)
mask[indices] = True
return mask
[docs]
def tel_mask_to_tel_ids(self, tel_mask):
"""
Convert a boolean mask of selected telescopes to a list of tel_ids.
Parameters
----------
tel_mask: array-like
Boolean array of length ``n_tels`` with indices of the
telescopes in ``tel_ids`` set to True.
Returns
-------
np.array:
Array of selected tel_ids
"""
return self.tel_ids[tel_mask]
@property
def footprint(self):
"""area of smallest circle containing array on ground"""
pos_x = self.tel_coords.x
pos_y = self.tel_coords.y
return (np.hypot(pos_x, pos_y).max() ** 2 * np.pi).to("km^2")
[docs]
def to_table(self, kind="subarray", meta_convention="hdf"):
"""
export SubarrayDescription information as an `astropy.table.Table`
Parameters
----------
kind: str
which table to generate (subarray or optics)
"""
if meta_convention not in {"hdf", "fits"}:
raise ValueError(
f"meta_convention must be 'hdf' or 'fits', not {meta_convention}"
)
if kind == "joined":
table = self.to_table()
optics = self.to_table(kind="optics")
optics["optics_index"] = np.arange(len(optics))
table = join(
table,
optics,
keys="optics_index",
# conflicts for TAB_VER, TAB_TYPE, not needed here, ignore
metadata_conflicts="silent",
)
table.remove_columns(["optics_index", "camera_index"])
table.add_index("tel_id")
return table
meta = {
"ORIGIN": "ctapipe.instrument.SubarrayDescription",
"SUBARRAY": self.name,
"SOFT_VER": CTAPIPE_VERSION,
"TAB_TYPE": kind,
}
if kind == "subarray":
if self.reference_location is not None:
itrs = self.reference_location.itrs
if meta_convention == "hdf":
meta["reference_itrs_x"] = itrs.x.to_value(u.m)
meta["reference_itrs_y"] = itrs.y.to_value(u.m)
meta["reference_itrs_z"] = itrs.z.to_value(u.m)
else:
meta["OBSGEO-X"] = itrs.x.to_value(u.m)
meta["OBSGEO-Y"] = itrs.y.to_value(u.m)
meta["OBSGEO-Z"] = itrs.z.to_value(u.m)
unique_optics = self.optics_types
ids = list(self.tels.keys())
descs = [str(t) for t in self.tels.values()]
tel_names = [t.name for t in self.tels.values()]
tel_types = [t.optics.size_type.value for t in self.tels.values()]
cam_names = [t.camera.name for t in self.tels.values()]
optics_names = [t.optics.name for t in self.tels.values()]
optics_index = [unique_optics.index(t.optics) for t in self.tels.values()]
camera_index = [
self.camera_types.index(t.camera) for t in self.tels.values()
]
tel_coords = self.tel_coords
tab = Table(
dict(
tel_id=np.array(ids, dtype=np.uint16),
name=tel_names,
type=tel_types,
pos_x=tel_coords.x,
pos_y=tel_coords.y,
pos_z=tel_coords.z,
camera_name=cam_names,
optics_name=optics_names,
camera_index=camera_index,
optics_index=optics_index,
tel_description=descs,
)
)
tab.meta["TAB_VER"] = self.CURRENT_TAB_VERSION
elif kind == "optics":
unique_optics = self.optics_types
mirror_area = u.Quantity(
[o.mirror_area.to_value(u.m**2) for o in unique_optics],
u.m**2,
)
focal_length = u.Quantity(
[o.equivalent_focal_length.to_value(u.m) for o in unique_optics],
u.m,
)
effective_focal_length = u.Quantity(
[o.effective_focal_length.to_value(u.m) for o in unique_optics],
u.m,
)
tab = Table(
{
"optics_name": [o.name for o in unique_optics],
"size_type": [o.size_type.value for o in unique_optics],
"reflector_shape": [o.reflector_shape.value for o in unique_optics],
"mirror_area": mirror_area,
"n_mirrors": [o.n_mirrors for o in unique_optics],
"n_mirror_tiles": [o.n_mirror_tiles for o in unique_optics],
"equivalent_focal_length": focal_length,
"effective_focal_length": effective_focal_length,
}
)
tab.meta["TAB_VER"] = OpticsDescription.CURRENT_TAB_VERSION
else:
raise ValueError(f"Table type '{kind}' not known")
tab.meta.update(meta)
return tab
[docs]
def select_subarray(self, tel_ids, name=None) -> "SubarrayDescription":
"""
return a new SubarrayDescription that is a sub-array of this one
Parameters
----------
tel_ids: list(int)
list of telescope IDs to include in the new subarray
name: str
name of new sub-selection
Returns
-------
SubarrayDescription
"""
unknown_tel_ids = set(tel_ids).difference(self.tel.keys())
if len(unknown_tel_ids) > 0:
known = _range_extraction(self.tel.keys())
raise UnknownTelescopeID(f"{unknown_tel_ids}, known telescopes: {known}")
tel_positions = {tid: self.positions[tid] for tid in tel_ids}
tel_descriptions = {tid: self.tel[tid] for tid in tel_ids}
if not name:
name = self.name + "_" + _range_extraction(tel_ids)
newsub = SubarrayDescription(
name,
tel_positions=tel_positions,
tel_descriptions=tel_descriptions,
reference_location=self.reference_location,
)
return newsub
[docs]
def peek(self, ax=None):
"""
Draw a quick matplotlib plot of the array
Parameters
----------
ax : matplotlib.axes.Axes or None
If given, the subarray will be plotted into this ax.
"""
from matplotlib import pyplot as plt
from ctapipe.coordinates.ground_frames import EastingNorthingFrame
from ctapipe.visualization import ArrayDisplay
if ax is None:
fig = plt.figure(figsize=(8, 8))
ax = fig.add_subplot(1, 1, 1)
else:
fig = ax.figure
ad = ArrayDisplay(
subarray=self, frame=EastingNorthingFrame(), tel_scale=0.75, axes=ax
)
ad.add_labels()
ax.set_title(self.name)
# This avoids a warning if users have e.g. "figure.constrained_layout"
# enabled through their matplotlib config. Only call tight layout here
# if the layout engine has not been set already
if fig.get_layout_engine() is None:
fig.tight_layout()
return ad
@lazyproperty
def telescope_types(self) -> tuple[TelescopeDescription]:
"""
Tuple of unique telescope types in the array
The entries are ordered by telescope name to be deterministic
but the order should not be relied on.
"""
unique_types = set(self.tel.values())
return tuple(sorted(unique_types, key=lambda tel: tel.name))
@lazyproperty
def camera_types(self) -> tuple[CameraDescription]:
"""
Tuple of unique camera types in the array
The entries are ordered by camera name to be deterministic
but the order should not be relied on.
"""
unique_cameras = {tel.camera for tel in self.tel.values()}
return tuple(sorted(unique_cameras, key=lambda camera: camera.name))
@lazyproperty
def optics_types(self) -> tuple[OpticsDescription]:
"""
Tuple of unique optics types in the array
The entries are ordered by optics name to be deterministic
but the order should not be relied on.
"""
unique_optics = {tel.optics for tel in self.tel.values()}
return tuple(sorted(unique_optics, key=lambda optics: optics.name))
[docs]
def get_tel_ids_for_type(self, tel_type) -> tuple[int]:
"""
return tuple of tel_ids that have the given tel_type
Parameters
----------
tel_type: str or TelescopeDescription
telescope type string (e.g. 'MST_MST_NectarCam')
"""
if isinstance(tel_type, TelescopeDescription):
if tel_type not in self.telescope_types:
raise ValueError(f"{tel_type} not in subarray: {self.telescope_types}")
return tuple(
tel_id for tel_id, descr in self.tels.items() if descr == tel_type
)
else:
valid = {str(tel) for tel in self.telescope_types}
if tel_type not in valid:
raise ValueError(f"{tel_type} not in subarray: {valid}")
return tuple(
tel_id for tel_id, descr in self.tels.items() if str(descr) == tel_type
)
[docs]
def get_tel_indices_for_type(self, tel_type):
"""
return tuple of telescope indices that have the given tel_type
Parameters
----------
tel_type: str or TelescopeDescription
telescope type string (e.g. 'MST_MST_NectarCam')
"""
return self.tel_ids_to_indices(self.get_tel_ids_for_type(tel_type))
[docs]
def multiplicity(self, tel_mask, tel_type=None):
"""
Compute multiplicity from a telescope mask
Parameters
----------
tel_mask : np.ndarray
Boolean array with last dimension of size n_telescopes
tel_type : None, str or TelescopeDescription
If not given, compute multiplicity from all telescopes.
If given, the multiplicity of the given telescope type will
be computed.
Returns
-------
multiplicity : int or np.ndarray
Number of true values for given telescope mask and telescope type
"""
if tel_type is None:
return np.count_nonzero(tel_mask, axis=-1)
return np.count_nonzero(
tel_mask[..., self.get_tel_indices_for_type(tel_type)], axis=-1
)
[docs]
def get_tel_ids(
self, telescopes: Iterable[int | str | TelescopeDescription]
) -> tuple[int]:
"""
Convert a list of telescope ids and telescope descriptions to
a list of unique telescope ids.
Parameters
----------
telescopes: List[Union[int, str, TelescopeDescription]]
List of Telescope IDs and descriptions.
Supported inputs for telescope descriptions are instances of
`~ctapipe.instrument.TelescopeDescription` as well as their
string representation.
Returns
-------
tel_ids: List[int]
List of unique telescope ids matching ``telescopes``
"""
ids = set()
# support single telescope element
if isinstance(telescopes, int | str | TelescopeDescription):
telescopes = (telescopes,)
for telescope in telescopes:
if isinstance(telescope, int | np.integer):
if telescope not in self.tel:
raise ValueError(
f"Telescope with tel_id={telescope} not in subarray."
)
ids.add(telescope)
else:
ids.update(self.get_tel_ids_for_type(telescope))
return tuple(sorted(ids))
def __eq__(self, other):
if not isinstance(other, self.__class__):
return False
if self.name != other.name:
return False
if self.tels.keys() != other.tels.keys():
return False
if self.positions.keys() != other.positions.keys():
return False
if self.reference_location != other.reference_location:
return False
for tel_id in self.tels.keys():
if self.tels[tel_id] != other.tels[tel_id]:
return False
for tel_id in self.tels.keys():
if np.any(self.positions[tel_id] != other.positions[tel_id]):
return False
return True
[docs]
def to_hdf(self, h5file, overwrite=False, mode="a"):
"""write the SubarrayDescription
Parameters
----------
h5file : str, bytes, path or tables.File
Path or already opened tables.File with write permission
overwrite : False
If the output path already contains a subarray, by default
an error will be raised. Set ``overwrite=True`` to overwrite an
existing subarray. This does not affect other content of the file.
Use ``mode="w"`` to completely overwrite the output path.
mode : str
If h5file is not an already opened file, the output file will
be opened with the given mode. Must be a mode that enables writing.
"""
# here to prevent circular import
from ..io import write_table
with ExitStack() as stack:
if not isinstance(h5file, tables.File):
h5file = stack.enter_context(tables.open_file(h5file, mode=mode))
if "/configuration/instrument/subarray" in h5file.root and not overwrite:
raise OSError(
"File already contains a SubarrayDescription and overwrite=False"
)
subarray_table = self.to_table(kind="subarray", meta_convention="hdf")
subarray_table.meta["name"] = self.name
write_table(
subarray_table,
h5file,
path="/configuration/instrument/subarray/layout",
overwrite=overwrite,
)
write_table(
self.to_table(kind="optics"),
h5file,
path="/configuration/instrument/telescope/optics",
overwrite=overwrite,
)
for i, camera in enumerate(self.camera_types):
write_table(
camera.geometry.to_table(),
h5file,
path=f"/configuration/instrument/telescope/camera/geometry_{i}",
overwrite=overwrite,
)
write_table(
camera.readout.to_table(),
h5file,
path=f"/configuration/instrument/telescope/camera/readout_{i}",
overwrite=overwrite,
)
[docs]
@classmethod
def from_hdf(cls, path, focal_length_choice=FocalLengthKind.EFFECTIVE):
# here to prevent circular import
from ..io import read_table
if isinstance(focal_length_choice, str):
focal_length_choice = FocalLengthKind[focal_length_choice.upper()]
layout = read_table(
path, "/configuration/instrument/subarray/layout", table_cls=QTable
)
version = layout.meta.get("TAB_VER")
if version not in cls.COMPATIBLE_VERSIONS:
raise IncompatibleDataModelVersion(
f"Unsupported HDF subarray table version: {version}. "
f"Compatible versions: {cls.COMPATIBLE_VERSIONS}"
)
cameras = {}
# backwards compatibility for older tables, index is the name
if "camera_index" not in layout.colnames:
layout["camera_index"] = layout["camera_type"]
for idx in set(layout["camera_index"]):
geometry = CameraGeometry.from_table(
read_table(
path, f"/configuration/instrument/telescope/camera/geometry_{idx}"
)
)
readout = CameraReadout.from_table(
read_table(
path, f"/configuration/instrument/telescope/camera/readout_{idx}"
)
)
cameras[idx] = CameraDescription(
name=geometry.name, readout=readout, geometry=geometry
)
optics_table = read_table(
path, "/configuration/instrument/telescope/optics", table_cls=QTable
)
optics_version = optics_table.meta.get("TAB_VER")
if optics_version not in OpticsDescription.COMPATIBLE_VERSIONS:
raise IncompatibleDataModelVersion(
f"Unsupported HDF optics table version: {optics_version}. "
f"Compatible versions: {OpticsDescription.COMPATIBLE_VERSIONS}"
)
# for backwards compatibility
# if optics_index not in table, guess via telescope_description string
# might not result in correct array when there are duplicated telescope_description
# strings
if "optics_index" not in layout.colnames:
descriptions = list(optics_table["description"])
layout["optics_index"] = [
descriptions.index(tel) for tel in layout["tel_description"]
]
if len(descriptions) != len(set(descriptions)):
warnings.warn(
"Array contains different telescopes with the same description string"
" representation, optics descriptions will be incorrect for some of the telescopes."
" Reprocessing the data with ctapipe >= 0.12 will fix this problem."
)
optic_descriptions = [
OpticsDescription(
name=str(row["optics_name"]),
size_type=row["size_type"],
reflector_shape=row["reflector_shape"],
n_mirrors=row["n_mirrors"],
equivalent_focal_length=row["equivalent_focal_length"],
effective_focal_length=row["effective_focal_length"],
mirror_area=row["mirror_area"],
n_mirror_tiles=row["n_mirror_tiles"],
)
for row in optics_table
]
# give correct frame for the camera to each telescope
telescope_descriptions = {}
for row in layout:
# copy to support different telescopes with same camera geom
camera = copy(cameras[row["camera_index"]])
optics = optic_descriptions[row["optics_index"]]
if focal_length_choice is FocalLengthKind.EFFECTIVE:
focal_length = optics.effective_focal_length
if np.isnan(focal_length.value):
raise RuntimeError(
"`focal_length_choice` was set to 'EFFECTIVE', but the"
" effective focal length was not present in the file. "
" Set `focal_length_choice='EQUIVALENT'` or make sure"
" input files contain the effective focal length"
)
elif focal_length_choice is FocalLengthKind.EQUIVALENT:
focal_length = optics.equivalent_focal_length
else:
raise ValueError(f"Invalid focal length choice: {focal_length_choice}")
camera.geometry.frame = CameraFrame(focal_length=focal_length)
telescope_descriptions[row["tel_id"]] = TelescopeDescription(
name=str(row["name"]), optics=optics, camera=camera
)
positions = np.column_stack([layout[f"pos_{c}"] for c in "xyz"])
name = layout.meta.get("SUBARRAY", "Unknown")
if "reference_itrs_x" in layout.meta:
reference_location = EarthLocation(
x=layout.meta["reference_itrs_x"] * u.m,
y=layout.meta["reference_itrs_y"] * u.m,
z=layout.meta["reference_itrs_z"] * u.m,
)
else:
# "null island" dummy location for backwards compatibility with old files
reference_location = EarthLocation(
lon=0 * u.deg,
lat=0 * u.deg,
height=0 * u.m,
)
return cls(
name=str(name),
tel_positions={
tel_id: pos for tel_id, pos in zip(layout["tel_id"], positions)
},
tel_descriptions=telescope_descriptions,
reference_location=reference_location,
)
[docs]
@staticmethod
def read(path, **kwargs):
"""Read subarray from path
This uses the `~ctapipe.io.EventSource` mechanism, so it should be
able to read a subarray from any file supported by ctapipe or an
installed io plugin.
kwargs are passed to the event source
"""
# here to prevent circular import
from ..io import EventSource
with EventSource(path, **kwargs) as s:
return s.subarray
[docs]
@staticmethod
def check_matching_subarrays(subarray_list: list) -> bool:
"""Check if subarrays are matching
Currently only checks if the tel_ids are the same in all subarrays.
More checks can be added if needed.
"""
return all(
set(subarray.tel_ids) == set(subarray_list[0].tel_ids)
for subarray in subarray_list
)
@staticmethod
def _resolve_telescope_type(ae_id):
"""Resolve telescope type by following symlink from ae_id directory.
Parameters
----------
ae_id : int
Array element ID
Returns
-------
str
Telescope type name (e.g., 'LSTN', 'MSTN')
"""
import os
from pathlib import Path
searchpath = os.getenv("CTAPIPE_SVC_PATH")
if not searchpath:
raise FileNotFoundError("CTAPIPE_SVC_PATH not set")
ae_id_str = f"{ae_id:03d}"
for search_dir in searchpath.split(os.pathsep):
candidate = Path(search_dir) / "instrument/array-elements" / ae_id_str
if candidate.exists():
return candidate.resolve().name
raise FileNotFoundError(f"instrument/array-elements/{ae_id_str} not found")
@staticmethod
def _load_telescope_description(ae_id, tel_name, tel_type):
"""Load telescope description from service data files.
Tries to load files with ae_id prefix first (e.g., 001.optics),
then falls back to tel_type prefix (e.g., LSTN.optics).
Parameters
----------
ae_id : int
Array element ID
tel_name : str
Telescope name
tel_type : str
Telescope type (e.g., 'LSTN', 'MSTN')
Returns
-------
TelescopeDescription
"""
ae_id_str = f"{ae_id:03d}"
# Helper function to try ae_id file first, then fall back to tel_type
def load_with_fallback(file_type, role):
try:
return get_table_dataset(f"{ae_id_str}.{file_type}", role=role)
except FileNotFoundError:
return get_table_dataset(f"{tel_type}.{file_type}", role=role)
# Load optics
optics_table = QTable(load_with_fallback("optics", "dl0.sub.svc.optics"))
optics_meta = optics_table.meta
optics_version = optics_meta.get("TAB_VER")
if optics_version not in OpticsDescription.COMPATIBLE_VERSIONS:
raise IncompatibleDataModelVersion(
f"Incompatible optics table version: {optics_version}. "
f"Compatible versions: {OpticsDescription.COMPATIBLE_VERSIONS}"
)
optics = OpticsDescription(
name=str(optics_meta["optics_name"]),
size_type=optics_meta["size_type"],
reflector_shape=optics_meta["reflector_shape"],
n_mirrors=optics_meta["n_mirrors"],
equivalent_focal_length=u.Quantity(optics_meta["equivalent_focal_length"]),
effective_focal_length=u.Quantity(optics_meta["effective_focal_length"]),
mirror_area=u.Quantity(optics_meta["mirror_area"]),
n_mirror_tiles=optics_meta["n_mirror_tiles"],
)
# Use effective focal length for camera frame, fall back to equivalent
focal_length = optics.effective_focal_length
if np.isnan(focal_length.value):
focal_length = optics.equivalent_focal_length
# Load camera geometry and readout
geom_table = load_with_fallback("camgeom", "dl0.sub.svc.camera")
readout_table = load_with_fallback("camreadout", "dl0.sub.svc.camera")
geometry = CameraGeometry.from_table(geom_table)
readout = CameraReadout.from_table(readout_table)
camera = CameraDescription(
name=geometry.name, geometry=geometry, readout=readout
)
camera.geometry.frame = CameraFrame(focal_length=focal_length)
return TelescopeDescription(
name=tel_name,
optics=optics,
camera=camera,
)
@staticmethod
def _load_telescope_positions(subarray_info, positions_table):
"""Load telescope positions for a subarray.
Parameters
----------
subarray_info : dict
Subarray information from subarray-ids.json
positions_table : QTable
Table with telescope positions
Returns
-------
dict
Dictionary mapping ae_id to position Quantity
"""
tel_positions = {}
for ae_id in subarray_info["array_element_ids"]:
row = positions_table[positions_table["ae_id"] == ae_id]
if len(row) == 0:
warnings.warn(f"Array element {ae_id} not found in positions table")
continue
if len(row) > 1:
warnings.warn(
f"Multiple entries for array element {ae_id}, using first"
)
row = row[0]
if np.isnan(row["x"]) or np.isnan(row["y"]) or np.isnan(row["z"]):
warnings.warn(f"Array element {ae_id} has NaN position, skipping")
continue
tel_positions[ae_id] = u.Quantity([row["x"], row["y"], row["z"]], u.m)
return tel_positions
@staticmethod
def _version_check(reference_meta, compatible_versions):
"""Check if the subarray description is compatible with the current version."""
version = reference_meta.product.data_model_version
if version not in compatible_versions:
raise IncompatibleDataModelVersion(
f"Incompatible {reference_meta.product.data_model_name} version : {version}. "
f"compatible versions: {compatible_versions}"
)
[docs]
@classmethod
def from_service_data(cls, subarray_id):
"""
Create a SubarrayDescription from CTAO service data files.
This method loads subarray definitions from the service data.
Expected directory structure::
instrument/
├── instrument.meta.json
├── array-element-ids.json
├── array-elements/
│ ├── 001 -> LSTN
│ ├── 002 -> LSTN
│ ├── 003 -> LSTN
│ ├── 004 -> LSTN
│ └── LSTN/
│ ├── LSTN.camgeom.fits.gz
│ ├── LSTN.camreadout.fits.gz
│ └── LSTN.optics.ecsv
├── positions/
│ ├── CTAO-North_ArrayElementPositions.ecsv
│ └── CTAO-South_ArrayElementPositions.ecsv
└── subarray-ids.json
Files:
- instrument.meta.json: metadata about the service data version for backwards compatibility
- array-element-ids.json: mapping of telescope IDs to names
- subarray-ids.json: subarray definitions
- positions/{site}_ArrayElementPositions.ecsv: ECSV files with telescope positions for each site
- array-elements/``{ae_id:03d}``/: Instrument description files for each array element. Symlinks may be used to de-duplicate descriptions.
Each directory should contain optics.ecsv, camgeom.fits.gz, and camreadout.fits.gz files.
Files can be named either ``{ae_id:03d}.*`` (e.g., 001.optics.ecsv) for telescope-specific configurations
or ``{type}.*`` (e.g., LSTN.optics.ecsv) for shared configurations across multiple telescopes
Parameters
----------
subarray_id : int
The subarray ID to load
Returns
-------
SubarrayDescription
New SubarrayDescription instance
"""
# Check service data version
from ..io.metadata import Reference
reference_meta = Reference.from_json(
get_structured_dataset("instrument.meta", role="dl0.sub.svc.meta")
)
cls._version_check(reference_meta, cls.COMPATIBLE_SERVICE_DATA_VERSIONS)
# Load array element IDs
ae_data = get_structured_dataset(
"array-element-ids", role="dl0.sub.svc.array_elements"
)
cls._version_check(
Reference.from_json(ae_data.get("metadata", ae_data)),
cls.COMPATIBLE_ARRAY_ELEMENTS_IDENTIFIERS_VERSIONS,
)
ae_id_to_name = {ae["id"]: ae["name"] for ae in ae_data["array_elements"]}
# Load subarray definition
subarray_data = get_structured_dataset(
"subarray-ids", role="dl0.sub.svc.subarray"
)
cls._version_check(
Reference.from_json(subarray_data.get("metadata", subarray_data)),
cls.COMPATIBLE_SUBARRAY_IDENTIFIERS_VERSIONS,
)
subarray_info = None
for sa in subarray_data["subarrays"]:
if sa["id"] == subarray_id:
subarray_info = sa
break
if subarray_info is None:
raise UnknownSubarray(f"Subarray ID {subarray_id} not found")
# Load positions
site = subarray_info["site"]
filename = f"{site}_ArrayElementPositions"
positions_table = QTable(
get_table_dataset(filename, role="dl0.sub.svc.arraylayout")
)
# Extract reference location from metadata
reference_location = EarthLocation(
x=u.Quantity(positions_table.meta["reference_x"]),
y=u.Quantity(positions_table.meta["reference_y"]),
z=u.Quantity(positions_table.meta["reference_z"]),
)
# Build telescope positions
tel_positions = cls._load_telescope_positions(subarray_info, positions_table)
# Build telescope descriptions
tel_descriptions = {}
for ae_id in tel_positions.keys():
tel_name = ae_id_to_name.get(ae_id, f"Unknown-{ae_id}")
try:
tel_type = cls._resolve_telescope_type(ae_id)
tel_descriptions[ae_id] = cls._load_telescope_description(
ae_id, tel_name, tel_type
)
except (FileNotFoundError, ValueError, KeyError) as e:
raise type(e)(
f"Could not load telescope description for element {ae_id} ({tel_name}): {e}"
) from e
return cls(
name=subarray_info["name"],
tel_positions=tel_positions,
tel_descriptions=tel_descriptions,
reference_location=reference_location,
)