Source code for ctapipe.instrument.subarray

"""
Description of Arrays or Subarrays of telescopes
"""
import warnings
from contextlib import ExitStack
from copy import copy
from itertools import groupby
from typing import Dict, Iterable, Tuple, Union

import numpy as np
import tables
from astropy import units as u
from astropy.coordinates import SkyCoord
from astropy.coordinates.earth import EarthLocation
from astropy.table import QTable, Table, join
from astropy.utils import lazyproperty

from .. import __version__ as CTAPIPE_VERSION
from ..coordinates import CameraFrame, GroundFrame
from .camera import CameraDescription, CameraGeometry, CameraReadout
from .optics import FocalLengthKind, OpticsDescription
from .telescope import TelescopeDescription

__all__ = ["SubarrayDescription"]


[docs]class UnknownTelescopeID(KeyError): """Raised when an unknown telescope id is encountered"""
def _group_consecutives(sequence): """ Turn consequtive lists into ranges (used in SubarrayDescription.info()) from https://codereview.stackexchange.com/questions/214820/codewars-range-extraction """ sequence = sorted(sequence) for _, g in groupby(enumerate(sequence), lambda i_x: i_x[0] - i_x[1]): r = [x for _, x in g] if len(r) > 2: yield f"{r[0]}-{r[-1]}" else: yield from map(str, r) def _range_extraction(sequence): return ",".join(_group_consecutives(sequence))
[docs]class SubarrayDescription: """ Collects the `~ctapipe.instrument.TelescopeDescription` of all telescopes along with their positions on the ground. Attributes ---------- name: str name of subarray tel_coords: astropy.coordinates.SkyCoord coordinates of all telescopes tels: dict of TelescopeDescription for each telescope in the subarray """ CURRENT_TAB_VERSION = "2.0" COMPATIBLE_VERSIONS = {"2.0"} def __init__( self, name, tel_positions=None, tel_descriptions=None, reference_location=None, ): """ Initialize a new SubarrayDescription Parameters ---------- name : str name of this subarray tel_positions : Dict[int, np.ndarray] dict of x,y,z telescope positions on the ground by tel_id. These are converted internally to a coordinate in the `~ctapipe.coordinates.GroundFrame` tel_descriptions : Dict[TelescopeDescription] dict of TelescopeDescriptions by tel_id reference_location : `astropy.coordinates.EarthLocation` EarthLocation of the array reference position, (0, 0, 0) of the coordinate system used for `tel_positions`. """ self.name = name self.positions = tel_positions or dict() self.tels: Dict[int, TelescopeDescription] = tel_descriptions or dict() self.reference_location = reference_location if self.positions.keys() != self.tels.keys(): raise ValueError("Telescope ids in positions and descriptions do not match") def __str__(self): return self.name def __repr__(self): return "{}(name='{}', n_tels={})".format( self.__class__.__name__, self.name, self.n_tels ) @property def tel(self) -> Dict[int, TelescopeDescription]: """Dictionary mapping tel_ids to TelescopeDescriptions""" return self.tels @property def n_tels(self): """number of telescopes in this subarray""" return len(self.tels) def __len__(self): return len(self.tels)
[docs] def info(self, printer=print): """ print descriptive info about subarray """ printer(f"Subarray : {self.name}") printer(f"Num Tels : {self.n_tels}") printer(f"Footprint: {self.footprint:.2f}") printer("") # print the per-telescope-type informatino: n_tels = {} tel_ids = {} for tel_type in self.telescope_types: ids = self.get_tel_ids_for_type(tel_type) tel_ids[str(tel_type)] = _range_extraction(ids) n_tels[str(tel_type)] = len(ids) out_table = Table( { "Type": list(n_tels.keys()), "Count": list(n_tels.values()), "Tel IDs": list(tel_ids.values()), } ) out_table["Tel IDs"].format = "<s" for line in str(out_table).split("\n"): printer(line)
@lazyproperty def tel_coords(self): """returns telescope positions as astropy.coordinates.SkyCoord""" pos_x = [p[0].to_value(u.m) for p in self.positions.values()] pos_y = [p[1].to_value(u.m) for p in self.positions.values()] pos_z = [p[2].to_value(u.m) for p in self.positions.values()] return SkyCoord(x=pos_x, y=pos_y, z=pos_z, unit=u.m, frame=GroundFrame()) @lazyproperty def tel_ids(self): """telescope IDs as an array""" return np.array(list(self.tel.keys())) @lazyproperty def tel_indices(self): """returns dict mapping tel_id to tel_index, useful for unpacking lists based on tel_ids into fixed-length arrays""" return {tel_id: ii for ii, tel_id in enumerate(self.tels.keys())} @lazyproperty def tel_index_array(self): """ returns an expanded array that maps tel_id to tel_index. I.e. for a given telescope, this array maps the tel_id to a flat index starting at 0 for the first telescope. ``tel_index = tel_id_to_index_array[tel_id]`` If the tel_ids are not contiguous, gaps will be filled in by -1. For a more compact representation use the `tel_indices` """ idx = np.zeros(np.max(self.tel_ids) + 1, dtype=int) - 1 # start with -1 for key, val in self.tel_indices.items(): idx[key] = val return idx
[docs] def tel_ids_to_indices(self, tel_ids): """maps a telescope id (or array of them) to flat indices Parameters ---------- tel_ids : int or List[int] array of tel IDs Returns ------- np.array: array of corresponding tel indices """ tel_ids = np.array(tel_ids, dtype=int, copy=False).ravel() return self.tel_index_array[tel_ids]
[docs] def tel_ids_to_mask(self, tel_ids): """Convert a list of telescope ids to a boolean mask of length ``n_tels`` where the **index** of the telescope is set to ``True`` for each tel_id in tel_ids Parameters ---------- tel_ids : int or List[int] array of tel IDs Returns ------- np.array[dtype=bool]: Boolean array of length ``n_tels`` with indices of the telescopes in ``tel_ids`` set to True. """ mask = np.zeros(self.n_tels, dtype=bool) indices = self.tel_ids_to_indices(tel_ids) mask[indices] = True return mask
[docs] def tel_mask_to_tel_ids(self, tel_mask): """ Convert a boolean mask of selected telescopes to a list of tel_ids. Parameters ---------- tel_mask: array-like Boolean array of length ``n_tels`` with indices of the telescopes in ``tel_ids`` set to True. Returns ------- np.array: Array of selected tel_ids """ return self.tel_ids[tel_mask]
@property def footprint(self): """area of smallest circle containing array on ground""" pos_x = self.tel_coords.x pos_y = self.tel_coords.y return (np.hypot(pos_x, pos_y).max() ** 2 * np.pi).to("km^2")
[docs] def to_table(self, kind="subarray"): """ export SubarrayDescription information as an `astropy.table.Table` Parameters ---------- kind: str which table to generate (subarray or optics) """ if kind == "joined": table = self.to_table() optics = self.to_table(kind="optics") optics["optics_index"] = np.arange(len(optics)) table = join( table, optics, keys="optics_index", # conflicts for TAB_VER, TAB_TYPE, not needed here, ignore metadata_conflicts="silent", ) table.remove_columns(["optics_index", "camera_index"]) table.add_index("tel_id") return table meta = { "ORIGIN": "ctapipe.instrument.SubarrayDescription", "SUBARRAY": self.name, "SOFT_VER": CTAPIPE_VERSION, "TAB_TYPE": kind, } if kind == "subarray": if self.reference_location is not None: itrs = self.reference_location.itrs meta["OBSGEO-X"] = itrs.x.to_value(u.m) meta["OBSGEO-Y"] = itrs.y.to_value(u.m) meta["OBSGEO-Z"] = itrs.z.to_value(u.m) unique_optics = self.optics_types ids = list(self.tels.keys()) descs = [str(t) for t in self.tels.values()] tel_names = [t.name for t in self.tels.values()] tel_types = [t.optics.size_type.value for t in self.tels.values()] cam_names = [t.camera.name for t in self.tels.values()] optics_names = [t.optics.name for t in self.tels.values()] optics_index = [unique_optics.index(t.optics) for t in self.tels.values()] camera_index = [ self.camera_types.index(t.camera) for t in self.tels.values() ] tel_coords = self.tel_coords tab = Table( dict( tel_id=np.array(ids, dtype=np.short), name=tel_names, type=tel_types, pos_x=tel_coords.x, pos_y=tel_coords.y, pos_z=tel_coords.z, camera_name=cam_names, optics_name=optics_names, camera_index=camera_index, optics_index=optics_index, tel_description=descs, ) ) tab.meta["TAB_VER"] = self.CURRENT_TAB_VERSION elif kind == "optics": unique_optics = self.optics_types mirror_area = u.Quantity( [o.mirror_area.to_value(u.m**2) for o in unique_optics], u.m**2, ) focal_length = u.Quantity( [o.equivalent_focal_length.to_value(u.m) for o in unique_optics], u.m, ) effective_focal_length = u.Quantity( [o.effective_focal_length.to_value(u.m) for o in unique_optics], u.m, ) tab = Table( { "optics_name": [o.name for o in unique_optics], "size_type": [o.size_type.value for o in unique_optics], "reflector_shape": [o.reflector_shape.value for o in unique_optics], "mirror_area": mirror_area, "n_mirrors": [o.n_mirrors for o in unique_optics], "n_mirror_tiles": [o.n_mirror_tiles for o in unique_optics], "equivalent_focal_length": focal_length, "effective_focal_length": effective_focal_length, } ) tab.meta["TAB_VER"] = OpticsDescription.CURRENT_TAB_VERSION else: raise ValueError(f"Table type '{kind}' not known") tab.meta.update(meta) return tab
[docs] def select_subarray(self, tel_ids, name=None): """ return a new SubarrayDescription that is a sub-array of this one Parameters ---------- tel_ids: list(int) list of telescope IDs to include in the new subarray name: str name of new sub-selection Returns ------- SubarrayDescription """ unknown_tel_ids = set(tel_ids).difference(self.tel.keys()) if len(unknown_tel_ids) > 0: known = _range_extraction(self.tel.keys()) raise UnknownTelescopeID(f"{unknown_tel_ids}, known telescopes: {known}") tel_positions = {tid: self.positions[tid] for tid in tel_ids} tel_descriptions = {tid: self.tel[tid] for tid in tel_ids} if not name: name = self.name + "_" + _range_extraction(tel_ids) newsub = SubarrayDescription( name, tel_positions=tel_positions, tel_descriptions=tel_descriptions ) return newsub
[docs] def peek(self): """ Draw a quick matplotlib plot of the array """ from matplotlib import pyplot as plt from ctapipe.coordinates.ground_frames import EastingNorthingFrame from ctapipe.visualization import ArrayDisplay plt.figure(figsize=(8, 8)) ad = ArrayDisplay(subarray=self, frame=EastingNorthingFrame(), tel_scale=0.75) ad.add_labels() plt.title(self.name) plt.tight_layout()
@lazyproperty def telescope_types(self) -> Tuple[TelescopeDescription]: """ Tuple of unique telescope types in the array The entries are ordered by telescope name to be deterministic but the order should not be relied on. """ unique_types = set(self.tel.values()) return tuple(sorted(unique_types, key=lambda tel: tel.name)) @lazyproperty def camera_types(self) -> Tuple[CameraDescription]: """ Tuple of unique camera types in the array The entries are ordered by camera name to be deterministic but the order should not be relied on. """ unique_cameras = {tel.camera for tel in self.tel.values()} return tuple(sorted(unique_cameras, key=lambda camera: camera.name)) @lazyproperty def optics_types(self) -> Tuple[OpticsDescription]: """ Tuple of unique optics types in the array The entries are ordered by optics name to be deterministic but the order should not be relied on. """ unique_optics = {tel.optics for tel in self.tel.values()} return tuple(sorted(unique_optics, key=lambda optics: optics.name))
[docs] def get_tel_ids_for_type(self, tel_type) -> Tuple[int]: """ return tuple of tel_ids that have the given tel_type Parameters ---------- tel_type: str or TelescopeDescription telescope type string (e.g. 'MST_MST_NectarCam') """ if isinstance(tel_type, TelescopeDescription): if tel_type not in self.telescope_types: raise ValueError(f"{tel_type} not in subarray: {self.telescope_types}") return tuple( tel_id for tel_id, descr in self.tels.items() if descr == tel_type ) else: valid = {str(tel) for tel in self.telescope_types} if tel_type not in valid: raise ValueError(f"{tel_type} not in subarray: {valid}") return tuple( tel_id for tel_id, descr in self.tels.items() if str(descr) == tel_type )
[docs] def get_tel_indices_for_type(self, tel_type): """ return tuple of telescope indices that have the given tel_type Parameters ---------- tel_type: str or TelescopeDescription telescope type string (e.g. 'MST_MST_NectarCam') """ return self.tel_ids_to_indices(self.get_tel_ids_for_type(tel_type))
[docs] def multiplicity(self, tel_mask, tel_type=None): """ Compute multiplicity from a telescope mask Parameters ---------- tel_mask : np.ndarray Boolean array with last dimension of size n_telescopes tel_type : None, str or TelescopeDescription If not given, compute multiplicity from all telescopes. If given, the multiplicity of the given telescope type will be computed. Returns ------- multiplicity : int or np.ndarray Number of true values for given telescope mask and telescope type """ if tel_type is None: return np.count_nonzero(tel_mask, axis=-1) return np.count_nonzero( tel_mask[..., self.get_tel_indices_for_type(tel_type)], axis=-1 )
[docs] def get_tel_ids( self, telescopes: Iterable[Union[int, str, TelescopeDescription]] ) -> Tuple[int]: """ Convert a list of telescope ids and telescope descriptions to a list of unique telescope ids. Parameters ---------- telescopes: List[Union[int, str, TelescopeDescription]] List of Telescope IDs and descriptions. Supported inputs for telescope descriptions are instances of `~ctapipe.instrument.TelescopeDescription` as well as their string representation. Returns ------- tel_ids: List[int] List of unique telescope ids matching ``telescopes`` """ ids = set() # support single telescope element if isinstance(telescopes, (int, str, TelescopeDescription)): telescopes = (telescopes,) for telescope in telescopes: if isinstance(telescope, (int, np.integer)): if telescope not in self.tel: raise ValueError( f"Telescope with tel_id={telescope} not in subarray." ) ids.add(telescope) else: ids.update(self.get_tel_ids_for_type(telescope)) return tuple(sorted(ids))
def __eq__(self, other): if not isinstance(other, self.__class__): return False if self.name != other.name: return False if self.tels.keys() != other.tels.keys(): return False if self.positions.keys() != other.positions.keys(): return False if self.reference_location != other.reference_location: return False for tel_id in self.tels.keys(): if self.tels[tel_id] != other.tels[tel_id]: return False for tel_id in self.tels.keys(): if np.any(self.positions[tel_id] != other.positions[tel_id]): return False return True
[docs] def to_hdf(self, h5file, overwrite=False, mode="a"): """write the SubarrayDescription Parameters ---------- h5file : str, bytes, path or tables.File Path or already opened tables.File with write permission overwrite : False If the output path already contains a subarray, by default an error will be raised. Set ``overwrite=True`` to overwrite an existing subarray. This does not affect other content of the file. Use ``mode="w"`` to completely overwrite the output path. mode : str If h5file is not an already opened file, the output file will be opened with the given mode. Must be a mode that enables writing. """ # here to prevent circular import from ..io import write_table with ExitStack() as stack: if not isinstance(h5file, tables.File): h5file = stack.enter_context(tables.open_file(h5file, mode=mode)) if "/configuration/instrument/subarray" in h5file.root and not overwrite: raise IOError( "File already contains a SubarrayDescription and overwrite=False" ) subarray_table = self.to_table(kind="subarray") subarray_table.meta["name"] = self.name if self.reference_location is not None: # change FITS convention to something better fitting HDF for direction in ("X", "Y", "Z"): fits_key = f"OBSGEO-{direction}" hdf_key = "reference_itrs_" + direction.lower() subarray_table.meta[hdf_key] = subarray_table.meta.pop(fits_key) write_table( subarray_table, h5file, path="/configuration/instrument/subarray/layout", overwrite=overwrite, ) write_table( self.to_table(kind="optics"), h5file, path="/configuration/instrument/telescope/optics", overwrite=overwrite, ) for i, camera in enumerate(self.camera_types): write_table( camera.geometry.to_table(), h5file, path=f"/configuration/instrument/telescope/camera/geometry_{i}", overwrite=overwrite, ) write_table( camera.readout.to_table(), h5file, path=f"/configuration/instrument/telescope/camera/readout_{i}", overwrite=overwrite, )
[docs] @classmethod def from_hdf(cls, path, focal_length_choice=FocalLengthKind.EFFECTIVE): # here to prevent circular import from ..io import read_table if isinstance(focal_length_choice, str): focal_length_choice = FocalLengthKind[focal_length_choice.upper()] layout = read_table( path, "/configuration/instrument/subarray/layout", table_cls=QTable ) version = layout.meta.get("TAB_VER") if version not in cls.COMPATIBLE_VERSIONS: raise IOError(f"Unsupported version of subarray table: {version}") cameras = {} # backwards compatibility for older tables, index is the name if "camera_index" not in layout.colnames: layout["camera_index"] = layout["camera_type"] for idx in set(layout["camera_index"]): geometry = CameraGeometry.from_table( read_table( path, f"/configuration/instrument/telescope/camera/geometry_{idx}" ) ) readout = CameraReadout.from_table( read_table( path, f"/configuration/instrument/telescope/camera/readout_{idx}" ) ) cameras[idx] = CameraDescription( name=geometry.name, readout=readout, geometry=geometry ) optics_table = read_table( path, "/configuration/instrument/telescope/optics", table_cls=QTable ) optics_version = optics_table.meta.get("TAB_VER") if optics_version not in OpticsDescription.COMPATIBLE_VERSIONS: raise IOError(f"Unsupported version of optics table: {optics_version}") # for backwards compatibility # if optics_index not in table, guess via telescope_description string # might not result in correct array when there are duplicated telescope_description # strings if "optics_index" not in layout.colnames: descriptions = list(optics_table["description"]) layout["optics_index"] = [ descriptions.index(tel) for tel in layout["tel_description"] ] if len(descriptions) != len(set(descriptions)): warnings.warn( "Array contains different telescopes with the same description string" " representation, optics descriptions will be incorrect for some of the telescopes." " Reprocessing the data with ctapipe >= 0.12 will fix this problem." ) optic_descriptions = [ OpticsDescription( name=row["optics_name"], size_type=row["size_type"], reflector_shape=row["reflector_shape"], n_mirrors=row["n_mirrors"], equivalent_focal_length=row["equivalent_focal_length"], effective_focal_length=row["effective_focal_length"], mirror_area=row["mirror_area"], n_mirror_tiles=row["n_mirror_tiles"], ) for row in optics_table ] # give correct frame for the camera to each telescope telescope_descriptions = {} for row in layout: # copy to support different telescopes with same camera geom camera = copy(cameras[row["camera_index"]]) optics = optic_descriptions[row["optics_index"]] if focal_length_choice is FocalLengthKind.EFFECTIVE: focal_length = optics.effective_focal_length if np.isnan(focal_length.value): raise RuntimeError( "`focal_length_choice` was set to 'EFFECTIVE', but the" " effective focal length was not present in the file. " " Set `focal_length_choice='EQUIVALENT'` or make sure" " input files contain the effective focal length" ) elif focal_length_choice is FocalLengthKind.EQUIVALENT: focal_length = optics.equivalent_focal_length else: raise ValueError(f"Invalid focal length choice: {focal_length_choice}") camera.geometry.frame = CameraFrame(focal_length=focal_length) telescope_descriptions[row["tel_id"]] = TelescopeDescription( name=row["name"], optics=optics, camera=camera ) positions = np.column_stack([layout[f"pos_{c}"] for c in "xyz"]) reference_location = None name = layout.meta.get("SUBARRAY", "Unknown") if "reference_itrs_x" in layout.meta: reference_location = EarthLocation( x=layout.meta["reference_itrs_x"] * u.m, y=layout.meta["reference_itrs_y"] * u.m, z=layout.meta["reference_itrs_z"] * u.m, ) return cls( name=name, tel_positions={ tel_id: pos for tel_id, pos in zip(layout["tel_id"], positions) }, tel_descriptions=telescope_descriptions, reference_location=reference_location, )
[docs] @staticmethod def read(path, **kwargs): """Read subarray from path This uses the `~ctapipe.io.EventSource` mechanism, so it should be able to read a subarray from any file supported by ctapipe or an installed io plugin. kwargs are passed to the event source """ # here to prevent circular import from ..io import EventSource with EventSource(path, **kwargs) as s: return s.subarray