Source code for ctapipe.instrument.subarray

"""
Description of Arrays or Subarrays of telescopes
"""
from typing import Dict, List, Union
from pathlib import Path
import warnings

import numpy as np
from astropy import units as u
from astropy.coordinates import SkyCoord
from astropy.table import QTable, Table
from astropy.utils import lazyproperty
import tables
from copy import copy
from itertools import groupby

import ctapipe

from ..coordinates import GroundFrame, CameraFrame
from .telescope import TelescopeDescription
from .camera import CameraDescription, CameraReadout, CameraGeometry
from .optics import OpticsDescription


__all__ = ["SubarrayDescription"]


def _group_consecutives(sequence):
    """
    Turn consequtive lists into ranges (used in SubarrayDescription.info())

    from https://codereview.stackexchange.com/questions/214820/codewars-range-extraction
    """
    for _, g in groupby(enumerate(sequence), lambda i_x: i_x[0] - i_x[1]):
        r = [x for _, x in g]
        if len(r) > 2:
            yield f"{r[0]}-{r[-1]}"
        else:
            yield from map(str, r)


def _range_extraction(sequence):
    return ",".join(_group_consecutives(sequence))


[docs]class SubarrayDescription: """ Collects the `~ctapipe.instrument.TelescopeDescription` of all telescopes along with their positions on the ground. Parameters ---------- name: str name of this subarray tel_positions: Dict[Array] dict of x,y,z telescope positions on the ground by tel_id. These are converted internally to a coordinate in the `~ctapipe.coordinates.GroundFrame` tel_descriptions: Dict[TelescopeDescription] dict of TelescopeDescriptions by tel_id Attributes ---------- name: str name of subarray tel_coords: astropy.coordinates.SkyCoord coordinates of all telescopes tels: dict of TelescopeDescription for each telescope in the subarray tel_ids: np.ndarray array of tel_ids tel_indices: dict dict mapping tel_id to index in array attributes """ def __init__(self, name, tel_positions=None, tel_descriptions=None): self.name = name self.positions = tel_positions or dict() self.tels: Dict[int, TelescopeDescription] = tel_descriptions or dict() if self.positions.keys() != self.tels.keys(): raise ValueError("Telescope ids in positions and descriptions do not match") def __str__(self): return self.name def __repr__(self): return "{}(name='{}', num_tels={})".format( self.__class__.__name__, self.name, self.num_tels ) @property def tel(self): """ for backward compatibility""" return self.tels @property def num_tels(self): """number of telescopes in this subarray""" return len(self.tels) def __len__(self): return len(self.tels)
[docs] def info(self, printer=print): """ print descriptive info about subarray """ printer(f"Subarray : {self.name}") printer(f"Num Tels : {self.num_tels}") printer(f"Footprint: {self.footprint:.2f}") printer("") # print the per-telescope-type informatino: n_tels = {} tel_ids = {} for tel_type in self.telescope_types: ids = self.get_tel_ids_for_type(tel_type) tel_ids[str(tel_type)] = _range_extraction(ids) n_tels[str(tel_type)] = len(ids) out_table = Table( { "Type": list(n_tels.keys()), "Count": list(n_tels.values()), "Tel IDs": list(tel_ids.values()), } ) out_table["Tel IDs"].format = "<s" for line in str(out_table).split("\n"): printer(line)
@lazyproperty def tel_coords(self): """ returns telescope positions as astropy.coordinates.SkyCoord""" pos_x = np.array([p[0].to("m").value for p in self.positions.values()]) * u.m pos_y = np.array([p[1].to("m").value for p in self.positions.values()]) * u.m pos_z = np.array([p[2].to("m").value for p in self.positions.values()]) * u.m return SkyCoord(x=pos_x, y=pos_y, z=pos_z, frame=GroundFrame()) @lazyproperty def tel_ids(self): """ telescope IDs as an array""" return np.array(list(self.tel.keys())) @lazyproperty def tel_indices(self): """returns dict mapping tel_id to tel_index, useful for unpacking lists based on tel_ids into fixed-length arrays""" return {tel_id: ii for ii, tel_id in enumerate(self.tels.keys())} @lazyproperty def tel_index_array(self): """ returns an expanded array that maps tel_id to tel_index. I.e. for a given telescope, this array maps the tel_id to a flat index starting at 0 for the first telescope. ``tel_index = tel_id_to_index_array[tel_id]`` If the tel_ids are not contiguous, gaps will be filled in by -1. For a more compact representation use the `tel_indices` """ idx = np.zeros(np.max(self.tel_ids) + 1, dtype=int) - 1 # start with -1 for key, val in self.tel_indices.items(): idx[key] = val return idx
[docs] def tel_ids_to_indices(self, tel_ids): """maps a telescope id (or array of them) to flat indices Parameters ---------- tel_ids : int or List[int] array of tel IDs Returns ------- np.array: array of corresponding tel indices """ tel_ids = np.array(tel_ids, dtype=int, copy=False).ravel() return self.tel_index_array[tel_ids]
[docs] def tel_ids_to_mask(self, tel_ids): """Convert a list of telescope ids to a boolean mask of length ``num_tels`` where the **index** of the telescope is set to ``True`` for each tel_id in tel_ids Parameters ---------- tel_ids : int or List[int] array of tel IDs Returns ------- np.array[dtype=bool]: Boolean array of length ``num_tels`` with indices of the telescopes in ``tel_ids`` set to True. """ mask = np.zeros(self.num_tels, dtype=bool) indices = self.tel_ids_to_indices(tel_ids) mask[indices] = True return mask
[docs] def tel_mask_to_tel_ids(self, tel_mask): """ Convert a boolean mask of selected telescopes to a list of tel_ids. Parameters ---------- tel_mask: array-like Boolean array of length ``num_tels`` with indices of the telescopes in ``tel_ids`` set to True. Returns ------- np.array: Array of selected tel_ids """ return self.tel_ids[tel_mask]
@property def footprint(self): """area of smallest circle containing array on ground""" pos_x = self.tel_coords.x pos_y = self.tel_coords.y return (np.hypot(pos_x, pos_y).max() ** 2 * np.pi).to("km^2")
[docs] def to_table(self, kind="subarray"): """ export SubarrayDescription information as an `astropy.table.Table` Parameters ---------- kind: str which table to generate (subarray or optics) """ meta = { "ORIGIN": "ctapipe.instrument.SubarrayDescription", "SUBARRAY": self.name, "SOFT_VER": ctapipe.__version__, "TAB_TYPE": kind, } if kind == "subarray": unique_types = self.telescope_types ids = list(self.tels.keys()) descs = [str(t) for t in self.tels.values()] tel_names = [t.name for t in self.tels.values()] tel_types = [t.type for t in self.tels.values()] cam_types = [t.camera.camera_name for t in self.tels.values()] optics_index = [unique_types.index(t) for t in self.tels.values()] camera_index = [ self.camera_types.index(t.camera) for t in self.tels.values() ] tel_coords = self.tel_coords tab = Table( dict( tel_id=np.array(ids, dtype=np.short), pos_x=tel_coords.x, pos_y=tel_coords.y, pos_z=tel_coords.z, name=tel_names, type=tel_types, camera_type=cam_types, camera_index=camera_index, optics_index=optics_index, tel_description=descs, ) ) tab.meta["TAB_VER"] = "1.0" elif kind == "optics": unique_types = self.telescope_types mirror_area = u.Quantity( [t.optics.mirror_area.to_value(u.m ** 2) for t in unique_types], u.m ** 2, ) focal_length = u.Quantity( [t.optics.equivalent_focal_length.to_value(u.m) for t in unique_types], u.m, ) cols = { "description": [str(t) for t in unique_types], "name": [t.name for t in unique_types], "type": [t.type for t in unique_types], "mirror_area": mirror_area, "num_mirrors": [t.optics.num_mirrors for t in unique_types], "num_mirror_tiles": [t.optics.num_mirror_tiles for t in unique_types], "equivalent_focal_length": focal_length, } tab = Table(cols) tab.meta["TAB_VER"] = "2.0" else: raise ValueError(f"Table type '{kind}' not known") tab.meta.update(meta) return tab
[docs] def select_subarray(self, tel_ids, name=None): """ return a new SubarrayDescription that is a sub-array of this one Parameters ---------- tel_ids: list(int) list of telescope IDs to include in the new subarray name: str name of new sub-selection Returns ------- SubarrayDescription """ tel_positions = {tid: self.positions[tid] for tid in tel_ids} tel_descriptions = {tid: self.tel[tid] for tid in tel_ids} if not name: tel_ids = sorted(tel_ids) name = self.name + "_" + _range_extraction(tel_ids) newsub = SubarrayDescription( name, tel_positions=tel_positions, tel_descriptions=tel_descriptions ) return newsub
[docs] def peek(self): """ Draw a quick matplotlib plot of the array """ from matplotlib import pyplot as plt from astropy.visualization import quantity_support types = set(self.tels.values()) tab = self.to_table() plt.figure(figsize=(8, 8)) with quantity_support(): for tel_type in types: tels = tab[tab["tel_description"] == str(tel_type)]["tel_id"] sub = self.select_subarray(tels, name=tel_type) tel_coords = sub.tel_coords radius = np.array( [ np.sqrt(tel.optics.mirror_area / np.pi).value for tel in sub.tels.values() ] ) plt.scatter( tel_coords.x, tel_coords.y, s=radius * 8, alpha=0.5, label=tel_type ) plt.legend(loc="best") plt.title(self.name) plt.tight_layout()
@lazyproperty def telescope_types(self) -> List[TelescopeDescription]: """ list of telescope types in the array""" return list({tel for tel in self.tel.values()}) @lazyproperty def camera_types(self) -> List[CameraDescription]: """ list of camera types in the array """ return list({tel.camera for tel in self.tel.values()}) @lazyproperty def optics_types(self) -> List[OpticsDescription]: """ list of optics types in the array """ return list({tel.optics for tel in self.tel.values()})
[docs] def get_tel_ids_for_type(self, tel_type): """ return list of tel_ids that have the given tel_type Parameters ---------- tel_type: str or TelescopeDescription telescope type string (e.g. 'MST:NectarCam') """ if isinstance(tel_type, TelescopeDescription): tel_str = str(tel_type) else: tel_str = tel_type return [id for id, descr in self.tels.items() if str(descr) == tel_str]
[docs] def get_tel_ids( self, telescopes: List[Union[int, str, TelescopeDescription]] ) -> List[int]: """ Convert a list of telescope ids and telescope descriptions to a list of unique telescope ids. Parameters ---------- telescopes: List[Union[int, str, TelescopeDescription]] List of Telescope IDs and descriptions. Supported inputs for telescope descriptions are instances of `~ctapipe.instrument.TelescopeDescription` as well as their string representation. Returns ------- tel_ids: List[int] List of unique telescope ids matching ``telescopes`` """ ids = set() valid_tel_types = {str(tel_type) for tel_type in self.telescope_types} for telescope in telescopes: if isinstance(telescope, (int, np.integer)): ids.add(telescope) if isinstance(telescope, str) and telescope not in valid_tel_types: raise ValueError("Invalid telescope type input.") ids.update(self.get_tel_ids_for_type(telescope)) return sorted(ids)
def __eq__(self, other): if not isinstance(other, self.__class__): return False if self.name != other.name: return False if self.tels.keys() != other.tels.keys(): return False if self.positions.keys() != other.positions.keys(): return False for tel_id in self.tels.keys(): if self.tels[tel_id] != other.tels[tel_id]: return False for tel_id in self.tels.keys(): if np.any(self.positions[tel_id] != other.positions[tel_id]): return False return True
[docs] def to_hdf(self, output_path, overwrite=False): """write the SubarrayDescription Parameters ---------- subarray : ctapipe.instrument.SubarrayDescription subarray description """ serialize_meta = True output_path = Path(output_path) if output_path.suffix not in (".h5", ".hdf", ".hdf5"): raise ValueError( f"This function can only write to hdf files, got {output_path.suffix}" ) self.to_table().write( output_path, path="/configuration/instrument/subarray/layout", serialize_meta=serialize_meta, append=True, overwrite=overwrite, ) self.to_table(kind="optics").write( output_path, path="/configuration/instrument/telescope/optics", append=True, serialize_meta=serialize_meta, overwrite=overwrite, ) for i, camera in enumerate(self.camera_types): camera.geometry.to_table().write( output_path, path=f"/configuration/instrument/telescope/camera/geometry_{i}", append=True, serialize_meta=serialize_meta, overwrite=overwrite, ) camera.readout.to_table().write( output_path, path=f"/configuration/instrument/telescope/camera/readout_{i}", append=True, serialize_meta=serialize_meta, overwrite=overwrite, ) with tables.open_file(output_path, mode="r+") as f: f.root.configuration.instrument.subarray._v_attrs.name = self.name
[docs] @classmethod def from_hdf(cls, path): layout = QTable.read(path, path="/configuration/instrument/subarray/layout") cameras = {} # backwards compatibility for older tables, index is the name if "camera_index" not in layout.colnames: layout["camera_index"] = layout["camera_type"] for idx in set(layout["camera_index"]): geometry = CameraGeometry.from_table( Table.read( path, path=f"/configuration/instrument/telescope/camera/geometry_{idx}", ) ) readout = CameraReadout.from_table( Table.read( path, path=f"/configuration/instrument/telescope/camera/readout_{idx}", ) ) cameras[idx] = CameraDescription( camera_name=geometry.camera_name, readout=readout, geometry=geometry ) optics_table = QTable.read( path, path="/configuration/instrument/telescope/optics" ) # for backwards compatibility # if optics_index not in table, guess via telescope_description string # might not result in correct array when there are duplicated telescope_description # strings if "optics_index" not in layout.colnames: descriptions = list(optics_table["description"]) layout["optics_index"] = [ descriptions.index(tel) for tel in layout["tel_description"] ] if len(descriptions) != len(set(descriptions)): warnings.warn( "Array contains different telescopes with the same description string" " representation, optics descriptions will be incorrect for some of the telescopes." " Reprocessing the data with ctapipe >= 0.12 will fix this problem." ) optic_descriptions = [ OpticsDescription( row["name"], num_mirrors=row["num_mirrors"], equivalent_focal_length=row["equivalent_focal_length"], mirror_area=row["mirror_area"], num_mirror_tiles=row["num_mirror_tiles"], ) for row in optics_table ] # give correct frame for the camera to each telescope telescope_descriptions = {} for row in layout: # copy to support different telescopes with same camera geom camera = copy(cameras[row["camera_index"]]) optics = optic_descriptions[row["optics_index"]] focal_length = optics.equivalent_focal_length camera.geometry.frame = CameraFrame(focal_length=focal_length) telescope_descriptions[row["tel_id"]] = TelescopeDescription( name=row["name"], tel_type=row["type"], optics=optics, camera=camera ) positions = np.column_stack([layout[f"pos_{c}"] for c in "xyz"]) with tables.open_file(path, mode="r") as f: attrs = f.root.configuration.instrument.subarray._v_attrs if "name" in attrs: name = str(attrs.name) else: name = "Unknown" return cls( name=name, tel_positions={ tel_id: pos for tel_id, pos in zip(layout["tel_id"], positions) }, tel_descriptions=telescope_descriptions, )