Source code for ctapipe.reco.hillas_reconstructor

"""
Line-intersection-based fitting for reconstruction of direction
and core position of a shower.
"""

from ctapipe.reco.reco_algorithms import (
    Reconstructor,
    InvalidWidthException,
    TooFewTelescopesException,
)
from ctapipe.containers import (
    ReconstructedGeometryContainer,
    CameraHillasParametersContainer,
)
from itertools import combinations

from ctapipe.coordinates import (
    CameraFrame,
    TelescopeFrame,
    GroundFrame,
    TiltedGroundFrame,
    project_to_ground,
    MissingFrameAttributeWarning,
)
from astropy.coordinates import (
    SkyCoord,
    AltAz,
    spherical_to_cartesian,
    cartesian_to_spherical,
)
import warnings

import numpy as np

from astropy import units as u

__all__ = ["HillasPlane", "HillasReconstructor"]


def angle(v1, v2):
    """ computes the angle between two vectors
        assuming cartesian coordinates

    Parameters
    ----------
    v1 : numpy array
    v2 : numpy array

    Returns
    -------
    the angle between v1 and v2 as a dimensioned astropy quantity
    """
    norm = np.linalg.norm(v1) * np.linalg.norm(v2)
    return np.arccos(np.clip(v1.dot(v2) / norm, -1.0, 1.0))


def normalise(vec):
    """ Sets the length of the vector to 1
        without changing its direction

    Parameters
    ----------
    vec : numpy array

    Returns
    -------
    numpy array with the same direction but length of 1
    """
    try:
        return vec / np.linalg.norm(vec)
    except ZeroDivisionError:
        return vec


def line_line_intersection_3d(uvw_vectors, origins):
    """
    Intersection of many lines in 3d.
    See https://en.wikipedia.org/wiki/Line%E2%80%93line_intersection
    """
    C = []
    S = []
    for n, pos in zip(uvw_vectors, origins):
        n = n.reshape((3, 1))
        norm_matrix = (n @ n.T) - np.eye(3)
        C.append(norm_matrix @ pos)
        S.append(norm_matrix)

    S = np.array(S).sum(axis=0)
    C = np.array(C).sum(axis=0)
    return np.linalg.inv(S) @ C


[docs]class HillasPlane: """ a tiny helper class to collect some parameters for each great great circle Stores some vectors a, b, and c These vectors are euclidean [x, y, z] where positive z values point towards the sky and x and y are parallel to the ground. """ def __init__(self, p1, p2, telescope_position, weight=1): r"""The constructor takes two coordinates in the horizontal frame (alt, az) which define a plane perpendicular to the camera. Parameters ----------- p1: astropy.coordinates.SkyCoord One of two direction vectors which define the plane. This coordinate has to be defined in the ctapipe.coordinates.AltAz p2: astropy.coordinates.SkyCoord One of two direction vectors which define the plane. This coordinate has to be defined in the ctapipe.coordinates.AltAz telescope_position: np.array(3) Position of the telescope on the ground weight : float, optional weight of this plane for later use during the reconstruction Notes ----- c: numpy.ndarray(3) :math:`\vec c = (\vec a \times \vec b) \times \vec a` :math:`\rightarrow` a and c form an orthogonal base for the great circle (only orthonormal if a and b are of unit-length) norm: numpy.ndarray(3) normal vector of the circle's plane, perpendicular to a, b and c """ self.pos = telescope_position # astropy's coordinates system rotates counter clockwise. Apparently we assume it to # be clockwise self.a = np.array(spherical_to_cartesian(1, p1.alt, -p1.az)).ravel() self.b = np.array(spherical_to_cartesian(1, p2.alt, -p2.az)).ravel() # a and c form an orthogonal basis for the great circle # not really necessary since the norm can be calculated # with a and b just as well self.c = np.cross(np.cross(self.a, self.b), self.a) # normal vector for the plane defined by the great circle self.norm = normalise(np.cross(self.a, self.c)) # some weight for this circle # (put e.g. uncertainty on the Hillas parameters # or number of PE in here) self.weight = weight
[docs]class HillasReconstructor(Reconstructor): """ class that reconstructs the direction of an atmospheric shower using a simple hillas parametrisation of the camera images it provides a direction estimate in two steps and an estimate for the shower's impact position on the ground. so far, it does neither provide an energy estimator nor an uncertainty on the reconstructed parameters """ def __init__(self, subarray, config=None, parent=None, **kwargs): super().__init__(config=config, parent=parent, **kwargs) self.subarray = subarray self._cam_radius_m = { cam: cam.geometry.guess_radius() for cam in subarray.camera_types } telframe = TelescopeFrame() self._cam_radius_deg = {} for cam, radius_m in self._cam_radius_m.items(): point_cam = SkyCoord(0 * u.m, radius_m, frame=cam.geometry.frame) point_tel = point_cam.transform_to(telframe) self._cam_radius_deg[cam] = point_tel.fov_lon
[docs] def __call__(self, event): """ Perform the full shower geometry reconstruction on the input event. Parameters ---------- event : container `ctapipe.containers.ArrayEventContainer` """ # Read only valid HillasContainers hillas_dict = { tel_id: dl1.parameters.hillas for tel_id, dl1 in event.dl1.tel.items() if np.isfinite(dl1.parameters.hillas.intensity) } # Due to tracking the pointing of the array will never be a constant array_pointing = SkyCoord( az=event.pointing.array_azimuth, alt=event.pointing.array_altitude, frame=AltAz(), ) telescope_pointings = { tel_id: SkyCoord( alt=event.pointing.tel[tel_id].altitude, az=event.pointing.tel[tel_id].azimuth, frame=AltAz(), ) for tel_id in event.dl1.tel.keys() } try: result = self._predict( event, hillas_dict, self.subarray, array_pointing, telescope_pointings ) except (TooFewTelescopesException, InvalidWidthException): result = ReconstructedGeometryContainer() event.dl2.stereo.geometry["HillasReconstructor"] = result
def _predict( self, event, hillas_dict, subarray, array_pointing, telescopes_pointings ): """ The function you want to call for the reconstruction of the event. It takes care of setting up the event and consecutively calls the functions for the direction and core position reconstruction. Shower parameters not reconstructed by this class are set to np.nan Parameters ----------- hillas_dict: dict dictionary with telescope IDs as key and HillasParametersContainer instances as values inst : ctapipe.io.InstrumentContainer instrumental description array_pointing: SkyCoord[AltAz] pointing direction of the array telescopes_pointings: dict[SkyCoord[AltAz]] dictionary of pointing direction per each telescope Raises ------ TooFewTelescopesException if len(hillas_dict) < 2 InvalidWidthException if any width is np.nan or 0 """ # filter warnings for missing obs time. this is needed because MC data has no obs time warnings.filterwarnings(action="ignore", category=MissingFrameAttributeWarning) # Here we perform some basic quality checks BEFORE applying reconstruction # This should be substituted by a DL1 QualityQuery specific to this # reconstructor # stereoscopy needs at least two telescopes if len(hillas_dict) < 2: raise TooFewTelescopesException( "need at least two telescopes, have {}".format(len(hillas_dict)) ) # check for np.nan or 0 width's as these screw up weights if any([np.isnan(hillas_dict[tel]["width"].value) for tel in hillas_dict]): raise InvalidWidthException( "A HillasContainer contains an ellipse of width==np.nan" ) if any([hillas_dict[tel]["width"].value == 0 for tel in hillas_dict]): raise InvalidWidthException( "A HillasContainer contains an ellipse of width==0" ) hillas_planes, psi_core_dict = self.initialize_hillas_planes( hillas_dict, subarray, telescopes_pointings, array_pointing ) # algebraic direction estimate direction, err_est_dir = self.estimate_direction(hillas_planes) # array pointing is needed to define the tilted frame core_pos = self.estimate_core_position( event, hillas_dict, array_pointing, psi_core_dict, hillas_planes ) # container class for reconstructed showers _, lat, lon = cartesian_to_spherical(*direction) # estimate max height of shower h_max = self.estimate_h_max(hillas_planes) # astropy's coordinates system rotates counter-clockwise. # Apparently we assume it to be clockwise. # that's why lon get's a sign result = ReconstructedGeometryContainer( alt=lat, az=-lon, core_x=core_pos[0], core_y=core_pos[1], tel_ids=[h for h in hillas_dict.keys()], average_intensity=np.mean([h.intensity for h in hillas_dict.values()]), is_valid=True, alt_uncert=err_est_dir, az_uncert=err_est_dir, h_max=h_max, ) return result
[docs] def initialize_hillas_planes( self, hillas_dict, subarray, telescopes_pointings, array_pointing ): """ Creates a dictionary of `.HillasPlane` from a dictionary of hillas parameters Parameters ---------- hillas_dict : dictionary dictionary of hillas moments subarray : ctapipe.instrument.SubarrayDescription subarray information telescopes_pointings: dictionary dictionary of pointing direction per each telescope array_pointing: SkyCoord[AltAz] pointing direction of the array Notes ----- The part of the algorithm taking into account divergent pointing mode and the correction to the psi angle is explained in [gasparetto]_ section 7.1.4. """ hillas_planes = {} # dictionary to store the telescope-wise image directions # to be projected on the ground and corrected in case of mispointing corrected_angle_dict = {} k = next(iter(telescopes_pointings)) horizon_frame = telescopes_pointings[k].frame for tel_id, moments in hillas_dict.items(): camera = self.subarray.tel[tel_id].camera pointing = SkyCoord( alt=telescopes_pointings[tel_id].alt, az=telescopes_pointings[tel_id].az, frame=horizon_frame, ) focal_length = subarray.tel[tel_id].optics.equivalent_focal_length if isinstance( moments, CameraHillasParametersContainer ): # Image parameters are in CameraFrame # we just need any point on the main shower axis a bit away from the cog p2_x = moments.x + 0.1 * self._cam_radius_m[camera] * np.cos( moments.psi ) p2_y = moments.y + 0.1 * self._cam_radius_m[camera] * np.sin( moments.psi ) camera_frame = CameraFrame( focal_length=focal_length, telescope_pointing=pointing ) cog_coord = SkyCoord(x=moments.x, y=moments.y, frame=camera_frame) p2_coord = SkyCoord(x=p2_x, y=p2_y, frame=camera_frame) else: # Image parameters are already in TelescopeFrame # we just need any point on the main shower axis a bit away from the cog p2_delta_alt = moments.fov_lat + 0.1 * self._cam_radius_deg[ camera ] * np.sin(moments.psi) p2_delta_az = moments.fov_lon + 0.1 * self._cam_radius_deg[ camera ] * np.cos(moments.psi) telescope_frame = TelescopeFrame(telescope_pointing=pointing) cog_coord = SkyCoord( fov_lon=moments.fov_lon, fov_lat=moments.fov_lat, frame=telescope_frame, ) p2_coord = SkyCoord( fov_lon=p2_delta_az, fov_lat=p2_delta_alt, frame=telescope_frame ) # Coordinates in the sky cog_coord = cog_coord.transform_to(horizon_frame) p2_coord = p2_coord.transform_to(horizon_frame) # re-project from sky to a "fake"-parallel-pointing telescope # then recalculate the psi angle in order to be able to project # it on the ground # This is done to bypass divergent pointing or mispointing camera_frame_parallel = CameraFrame( focal_length=focal_length, telescope_pointing=array_pointing ) cog_sky_to_parallel = cog_coord.transform_to(camera_frame_parallel) p2_sky_to_parallel = p2_coord.transform_to(camera_frame_parallel) angle_psi_corr = np.arctan2( cog_sky_to_parallel.y - p2_sky_to_parallel.y, cog_sky_to_parallel.x - p2_sky_to_parallel.x, ) corrected_angle_dict[tel_id] = angle_psi_corr circle = HillasPlane( p1=cog_coord, p2=p2_coord, telescope_position=subarray.positions[tel_id], weight=moments.intensity * (moments.length / moments.width), ) hillas_planes[tel_id] = circle return hillas_planes, corrected_angle_dict
[docs] def estimate_direction(self, hillas_planes): """calculates the origin of the gamma as the weighted average direction of the intersections of all hillas planes Returns ------- gamma : shape (3) numpy array direction of origin of the reconstructed shower as a 3D vector crossings : shape (n,3) list an error estimate """ crossings = [] for perm in combinations(hillas_planes.values(), 2): n1, n2 = perm[0].norm, perm[1].norm # cross product automatically weighs in the angle between # the two vectors: narrower angles have less impact, # perpendicular vectors have the most crossing = np.cross(n1, n2) # two great circles cross each other twice (one would be # the origin, the other one the direction of the gamma) it # doesn't matter which we pick but it should at least be # consistent: make sure to always take the "upper" solution if crossing[2] < 0: crossing *= -1 crossings.append(crossing * perm[0].weight * perm[1].weight) result = normalise(np.sum(crossings, axis=0)) off_angles = [angle(result, cross) for cross in crossings] * u.rad err_est_dir = np.average( off_angles, weights=[len(cross) for cross in crossings] ) return result, err_est_dir
[docs] def estimate_core_position( self, event, hillas_dict, array_pointing, corrected_angle_dict, hillas_planes ): """ Estimate the core position by intersection the major ellipse lines of each telescope. Parameters ----------- hillas_dict: dict[HillasContainer] dictionary of hillas moments array_pointing: SkyCoord[HorizonFrame] Pointing direction of the array Returns ----------- core_x: u.Quantity estimated x position of impact core_y: u.Quantity estimated y position of impact Notes ----- The part of the algorithm taking into account divergent pointing mode and the usage of a corrected psi angle is explained in [gasparetto]_ section 7.1.4. """ # Since psi has been recalculated in the fake CameraFrame # it doesn't need any further corrections because it is now independent # of both pointing and cleaning/parametrization frame. # This angle will be used to visualize the telescope-wise directions of # the shower core the ground. psi_core = corrected_angle_dict # Record these values for tel_id in hillas_dict.keys(): event.dl1.tel[tel_id].parameters.core.psi = psi_core[tel_id] # Transform them for numpy psi = u.Quantity(list(psi_core.values())) # Estimate the position of the shower's core # from the TiltedFram to the GroundFrame z = np.zeros(len(psi)) uvw_vectors = np.column_stack([np.cos(psi).value, np.sin(psi).value, z]) tilted_frame = TiltedGroundFrame(pointing_direction=array_pointing) ground_frame = GroundFrame() positions = [ ( SkyCoord(*plane.pos, frame=ground_frame) .transform_to(tilted_frame) .cartesian.xyz ) for plane in hillas_planes.values() ] core_position = line_line_intersection_3d(uvw_vectors, positions) core_pos_tilted = SkyCoord( x=core_position[0] * u.m, y=core_position[1] * u.m, frame=tilted_frame ) core_pos = project_to_ground(core_pos_tilted) return core_pos.x, core_pos.y
[docs] def estimate_h_max(self, hillas_planes): """ Estimate the max height by intersecting the lines of the cog directions of each telescope. Returns ----------- astropy.unit.Quantity the estimated max height """ uvw_vectors = np.array([plane.a for plane in hillas_planes.values()]) positions = [plane.pos for plane in hillas_planes.values()] # not sure if its better to return the length of the vector of the z component return np.linalg.norm(line_line_intersection_3d(uvw_vectors, positions)) * u.m