Source code for ctapipe.reco.impact

#!/usr/bin/env python3
"""
Implementation of the ImPACT reconstruction algorithm
"""
import copy
from string import Template

import numpy as np
import numpy.ma as ma
from astropy import units as u
from astropy.coordinates import AltAz, SkyCoord
from scipy.stats import norm

from ctapipe.core import traits
from ctapipe.exceptions import OptionalDependencyMissing

from ..compat import COPY_IF_NEEDED
from ..containers import ReconstructedEnergyContainer, ReconstructedGeometryContainer
from ..coordinates import (
    CameraFrame,
    GroundFrame,
    NominalFrame,
    TiltedGroundFrame,
    project_to_ground,
)
from ..core import Provenance
from ..fitting import lts_linear_regression
from ..image.cleaning import dilate
from ..image.pixel_likelihood import (
    mean_poisson_likelihood_gaussian,
    neg_log_likelihood_approx,
)
from ..utils.template_network_interpolator import (
    DummyTemplateInterpolator,
    DummyTimeInterpolator,
    TemplateNetworkInterpolator,
    TimeGradientInterpolator,
)
from .impact_utilities import (
    EmptyImages,
    create_seed,
    guess_shower_depth,
    rotate_translate,
)
from .reconstructor import (
    HillasGeometryReconstructor,
    InvalidWidthException,
    ReconstructionProperty,
    TooFewTelescopesException,
)

try:
    from iminuit import Minuit
except ModuleNotFoundError:
    Minuit = None

PROV = Provenance()

INVALID_GEOMETRY = ReconstructedGeometryContainer(
    telescopes=[],
    prefix="ImPACTReconstructor",
)

INVALID_ENERGY = ReconstructedEnergyContainer(
    prefix="ImPACTReconstructor",
    telescopes=[],
)

# These are settings for the iminuit minimizer
MINUIT_ERRORDEF = 0.5  # 0.5 for a log-likelihood cost function for correct errors
MINUIT_STRATEGY = 1  # Default minimization strategy, 2 is careful, 0 is fast
MINUIT_TOLERANCE_FACTOR = 1000  # Tolerance for convergence according to EDM criterion
MIGRAD_ITERATE = 1  # Do not call migrad again if convergence was not reached
__all__ = ["ImPACTReconstructor"]


[docs] class ImPACTReconstructor(HillasGeometryReconstructor): """This class is an implementation if the impact_reco Monte Carlo Template based image fitting method from parsons14. This method uses a comparison of the predicted image from a library of image templates to perform a maximum likelihood fit for the shower axis, energy and height of maximum. Besides the image information, there is also the option to use the time gradient of the pixels across the image as additional information in the fit. This requires an additional set of templates Because this application is computationally intensive the usual advice to use astropy units for all quantities is ignored (as these slow down some computations), instead units within the class are fixed: - Angular units in radians - Distance units in metres - Energy units in TeV Parameters ---------- subarray : ctapipe.instrument.SubarrayDescription The telescope subarray to use for reconstruction atmosphere_profile : ctapipe.atmosphere.AtmosphereDensityProfile Density vs. altitude profile of the local atmosphere dummy_reconstructor : bool, optional Option to use a set of dummy templates. This can be used for testing the algorithm, but for any actual reconstruction should be set to its default False References ---------- .. [parsons14] Parsons & Hinton, Astroparticle Physics 56 (2014), pp. 26-34 """ use_time_gradient = traits.Bool( default_value=False, help="Use time gradient in ImPACT reconstruction. Requires an extra set of time gradient templates", ).tag(config=True) root_dir = traits.Unicode( default_value=".", help="Directory containing ImPACT tables" ).tag(config=True) # For likelihood calculation we need the with of the # pedestal distribution for each pixel # currently this is not available from the calibration, # so for now lets hard code it in a dict ped_table = { "LSTCam": 1.4, "NectarCam": 1.3, "FlashCam": 1.3, "SST-Camera": 0.5, "CHEC": 0.5, "ASTRICam": 0.5, "dummy": 0.01, "UNKNOWN-960PX": 1.0, } spe = 0.6 # Also hard code single p.e. distribution width property = ReconstructionProperty.ENERGY | ReconstructionProperty.GEOMETRY def __init__( self, subarray, atmosphere_profile, dummy_reconstructor=False, **kwargs ): if Minuit is None: raise OptionalDependencyMissing("iminuit") from None if atmosphere_profile is None: raise TypeError( "Argument 'atmosphere_profile' can not be 'None' for ImPACTReconstructor" ) super().__init__(subarray, atmosphere_profile, **kwargs) # First we create a dictionary of image template interpolators # for each telescope type # self.priors = prior # String templates for loading ImPACT templates self.amplitude_template = Template("${base}/${camera}.template.gz") self.time_template = Template("${base}/${camera}_time.template.gz") # Next we need the position, area and amplitude from each pixel in the event # making this a class member makes passing them around much easier self.pixel_x, self.pixel_y = None, None self.image, self.time = None, None self.tel_types, self.tel_id = None, None # We also need telescope positions self.tel_pos_x, self.tel_pos_y = None, None # And the peak of the images self.peak_x, self.peak_y, self.peak_amp = None, None, None self.hillas_parameters, self.ped = None, None self.prediction = dict() self.time_prediction = dict() self.array_direction = None self.nominal_frame = None self.dummy_reconstructor = dummy_reconstructor
[docs] def __call__(self, event): """ Perform the full shower geometry reconstruction on the input event. Parameters ---------- event : container `ctapipe.containers.ArrayEventContainer` """ try: hillas_dict = self._create_hillas_dict(event) except (TooFewTelescopesException, InvalidWidthException): event.dl2.stereo.geometry[self.__class__.__name__] = INVALID_GEOMETRY event.dl2.stereo.energy[self.__class__.__name__] = INVALID_ENERGY self._store_impact_parameter(event) return # Due to tracking the pointing of the array will never be a constant array_pointing = SkyCoord( az=event.pointing.array_azimuth, alt=event.pointing.array_altitude, frame=AltAz(), ) # And the pointing direction of the telescopes may not be the same telescope_pointings = self._get_telescope_pointings(event) # Finally get the telescope images and and the selection masks mask_dict, image_dict, time_dict = {}, {}, {} for tel_id in hillas_dict.keys(): image = event.dl1.tel[tel_id].image image_dict[tel_id] = image time_dict[tel_id] = event.dl1.tel[tel_id].peak_time mask = event.dl1.tel[tel_id].image_mask # Dilate the images around the original cleaning to help the fit for _ in range(3): mask = dilate(self.subarray.tel[tel_id].camera.geometry, mask) mask_dict[tel_id] = mask # Next, we look for geometry and energy seeds from previously applied reconstructors. # Both need to be present at elast once for ImPACT to run. reco_geom_pred = event.dl2.stereo.geometry valid_geometry_seed = False for geom_pred in reco_geom_pred.values(): if geom_pred.is_valid: valid_geometry_seed = True break reco_energy_pred = event.dl2.stereo.energy valid_energy_seed = False for E_pred in reco_energy_pred.values(): if E_pred.is_valid: valid_energy_seed = True break if valid_geometry_seed is False or valid_energy_seed is False: event.dl2.stereo.geometry[self.__class__.__name__] = INVALID_GEOMETRY event.dl2.stereo.energy[self.__class__.__name__] = INVALID_ENERGY self._store_impact_parameter(event) return shower_result, energy_result = self.predict( hillas_dict=hillas_dict, subarray=self.subarray, shower_seed=reco_geom_pred, energy_seed=reco_energy_pred, array_pointing=array_pointing, telescope_pointings=telescope_pointings, image_dict=image_dict, mask_dict=mask_dict, time_dict=time_dict, ) event.dl2.stereo.geometry[self.__class__.__name__] = shower_result event.dl2.stereo.energy[self.__class__.__name__] = energy_result self._store_impact_parameter(event)
[docs] def initialise_templates(self, tel_type): """Check if templates for a given telescope type has been initialised and if not do it and add to the dictionary Parameters ---------- tel_type: dictionary Dictionary of telescope types in event Returns ------- boolean: Confirm initialisation """ for t in tel_type: if tel_type[t] in self.prediction.keys() or tel_type[t] == "DUMMY": continue if self.dummy_reconstructor: self.prediction[tel_type[t]] = DummyTemplateInterpolator() else: filename = self.amplitude_template.substitute( base=self.root_dir, camera=tel_type[t] ) self.prediction[tel_type[t]] = TemplateNetworkInterpolator( filename, bounds=((-5, 1), (-1.5, 1.5)) ) PROV.add_input_file( filename, role="ImPACT Template file for " + tel_type[t] ) if self.use_time_gradient: if self.dummy_reconstructor: self.time_prediction[tel_type[t]] = DummyTimeInterpolator() else: filename = self.time_template.substitute( base=self.root_dir, camera=tel_type[t] ) self.time_prediction[tel_type[t]] = TimeGradientInterpolator( filename ) PROV.add_input_file( filename, role="ImPACT Time Template file for " + tel_type[t] ) return True
[docs] def get_hillas_mean(self): """This is a simple function to find the peak position of each image in an event which will be used later in the Xmax calculation. Peak is found by taking the average position of the n hottest pixels in the image. """ peak_x = np.zeros([len(self.pixel_x)]) # Create blank arrays for peaks # rather than a dict (faster) peak_y = np.zeros(peak_x.shape) peak_amp = np.zeros(peak_x.shape) # Loop over all tels to take weighted average of pixel # positions This loop could maybe be replaced by an array # operation by a numpy wizard # Maybe a vectorize? tel_num = 0 for hillas in self.hillas_parameters: peak_x[tel_num] = hillas.fov_lon.to(u.rad).value # Fill up array peak_y[tel_num] = hillas.fov_lat.to(u.rad).value peak_amp[tel_num] = hillas.intensity tel_num += 1 self.peak_x = peak_x # * unit # Add to class member self.peak_y = peak_y # * unit self.peak_amp = peak_amp
# This function would be useful elsewhere so probably be implemented in a # more general form
[docs] def get_shower_max(self, source_x, source_y, core_x, core_y, zen): """Function to calculate the depth of shower maximum geometrically under the assumption that the shower maximum lies at the brightest point of the camera image. Parameters ---------- source_x: float Event source position in nominal frame source_y: float Event source position in nominal frame core_x: float Event core position in telescope tilted frame core_y: float Event core position in telescope tilted frame zen: float Zenith angle of event Returns ------- float: Depth of maximum of air shower """ # Calculate displacement of image centroid from source position (in # rad) disp = np.sqrt((self.peak_x - source_x) ** 2 + (self.peak_y - source_y) ** 2) # Calculate impact parameter of the shower impact = np.sqrt( (self.tel_pos_x - core_x) ** 2 + (self.tel_pos_y - core_y) ** 2 ) # Distance above telescope is ratio of these two (small angle) height_tilted = ( impact / disp ) # This is in tilted frame along the telescope axis weight = np.power(self.peak_amp, 0.0) # weight average by sqrt amplitude # sqrt may not be the best option... # Take weighted mean of estimates, converted to height above ground mean_height_above_ground = np.sum( height_tilted * np.cos(zen) * weight ) / np.sum(weight) # Add on the height of the detector above sea level mean_height_asl = ( mean_height_above_ground + self.subarray.reference_location.height.to_value(u.m) ) if mean_height_asl > 100000 or np.isnan(mean_height_asl): mean_height_asl = 100000 # Lookup this height in the depth tables, the convert Hmax to Xmax slant_depth = self.atmosphere_profile.slant_depth_from_height( mean_height_asl * u.m, zen * u.rad ) return slant_depth.to_value(u.g / (u.cm * u.cm))
[docs] def image_prediction( self, tel_type, zenith, azimuth, energy, impact, x_max, pix_x, pix_y ): """Creates predicted image for the specified pixels, interpolated from the template library. Parameters ---------- tel_type: string Telescope type specifier energy: float Event energy (TeV) impact: float Impact diance of shower (metres) x_max: float Depth of shower maximum (num bins from expectation) pix_x: ndarray X coordinate of pixels pix_y: ndarray Y coordinate of pixels Returns ------- ndarray: predicted amplitude for all pixels """ return self.prediction[tel_type]( zenith, azimuth, energy, impact, x_max, pix_x, pix_y )
[docs] def predict_time(self, tel_type, zenith, azimuth, energy, impact, x_max): """Creates predicted image for the specified pixels, interpolated from the template library. Parameters ---------- tel_type: string Telescope type specifier energy: float Event energy (TeV) impact: float Impact diance of shower (metres) x_max: float Depth of shower maximum (num bins from expectation) Returns ------- ndarray: predicted amplitude for all pixels """ time = self.time_prediction[tel_type](zenith, azimuth, energy, impact, x_max) return time.T[0], time.T[1]
[docs] def get_likelihood( self, source_x, source_y, core_x, core_y, energy, x_max_scale, goodness_of_fit=False, ): """Get the likelihood that the image predicted at the given test position matches the camera image. Parameters ---------- source_x: float Source position of shower in the nominal system (in deg) source_y: float Source position of shower in the nominal system (in deg) core_x: float Core position of shower in tilted telescope system (in m) core_y: float Core position of shower in tilted telescope system (in m) energy: float Shower energy (in TeV) x_max_scale: float Scaling factor applied to geometrically calculated Xmax goodness_of_fit: boolean Determines whether expected likelihood should be subtracted from result Returns ------- float: Likelihood the model represents the camera image at this position """ if np.isnan(source_x) or np.isnan(source_y): return 1e8 # First we add units back onto everything. Currently not # handled very well, maybe in future we could just put # everything in the correct units when loading in the class # and ignore them from then on zenith = self.zenith azimuth = self.azimuth # Geometrically calculate the depth of maximum given this test position x_max_guess = self.get_shower_max(source_x, source_y, core_x, core_y, zenith) x_max_guess *= x_max_scale # Calculate expected Xmax given this energy x_max_exp = guess_shower_depth(energy) # / np.cos(20*u.deg) # Convert to binning of Xmax x_max_diff = x_max_guess - x_max_exp # Check for range if x_max_diff > 200: x_max_diff = 200 if x_max_diff < -150: x_max_diff = -150 # Calculate impact distance for all telescopes impact = np.sqrt( (self.tel_pos_x - core_x) ** 2 + (self.tel_pos_y - core_y) ** 2 ) # And the expected rotation angle phi = np.arctan2(self.tel_pos_y - core_y, self.tel_pos_x - core_x) # Rotate and translate all pixels such that they match the # template orientation # numba does not support masked arrays, work on underlying array and add mask back pix_x_rot, pix_y_rot = rotate_translate( self.pixel_y.data, self.pixel_x.data, source_y, source_x, -phi ) pix_x_rot = np.ma.array(pix_x_rot, mask=self.pixel_x.mask, copy=COPY_IF_NEEDED) pix_y_rot = np.ma.array(pix_y_rot, mask=self.pixel_y.mask, copy=COPY_IF_NEEDED) # In the interpolator class we can gain speed advantages by using masked arrays # so we need to make sure here everything is masked prediction = ma.zeros(self.image.shape) prediction.mask = ma.getmask(self.image) time_gradients, time_gradients_uncertainty = ( np.zeros(self.image.shape[0]), np.zeros(self.image.shape[0]), ) # Loop over all telescope types and get prediction for tel_type in np.unique(self.tel_types).tolist(): type_mask = self.tel_types == tel_type prediction[type_mask] = self.image_prediction( tel_type, np.rad2deg(zenith), azimuth, energy * np.ones_like(impact[type_mask]), impact[type_mask], x_max_diff * np.ones_like(impact[type_mask]), np.rad2deg(pix_x_rot[type_mask]), np.rad2deg(pix_y_rot[type_mask]), ) if self.use_time_gradient: tg, tgu = self.predict_time( tel_type, np.rad2deg(zenith), azimuth, energy * np.ones_like(impact[type_mask]), impact[type_mask], x_max_diff * np.ones_like(impact[type_mask]), ) time_gradients[type_mask] = tg time_gradients_uncertainty[type_mask] = tgu if self.use_time_gradient: time_gradients_uncertainty[time_gradients_uncertainty == 0] = 1e-6 chi2 = 0 for telescope_index, (image, time) in enumerate(zip(self.image, self.time)): time_mask = np.logical_and( np.invert(ma.getmask(image)), time > 0, ) time_mask = np.logical_and(time_mask, np.isfinite(time)) time_mask = np.logical_and(time_mask, image > 5) if ( np.sum(time_mask) > 3 and time_gradients_uncertainty[telescope_index] > 0 ): time_slope = lts_linear_regression( x=np.rad2deg(pix_x_rot[telescope_index][time_mask]), y=time[time_mask], samples=3, )[0][0] time_like = -1 * norm.logpdf( time_slope, loc=time_gradients[telescope_index], scale=time_gradients_uncertainty[telescope_index], ) chi2 += time_like # Likelihood function will break if we find a NaN or a 0 prediction[np.isnan(prediction)] = 1e-8 prediction[prediction < 1e-8] = 1e-8 # prediction *= self.scale_factor[:, np.newaxis] # Get likelihood that the prediction matched the camera image mask = ma.getmask(self.image) like = neg_log_likelihood_approx(self.image, prediction, self.spe, self.ped) like[mask] = 0 if goodness_of_fit: like_expectation_gaus = mean_poisson_likelihood_gaussian( prediction, self.spe, self.ped ) like_expectation_gaus[mask] = 0 mask_shower = np.invert(mask) goodness = np.sum(2 * like - like_expectation_gaus, axis=-1) / np.sqrt( 2 * (np.sum(mask_shower, axis=-1) - 6) ) return goodness like = np.sum(like) final_sum = like if self.use_time_gradient: final_sum += chi2 return final_sum
[docs] def set_event_properties( self, hillas_dict, image_dict, time_dict, mask_dict, subarray, array_pointing, telescope_pointing, ): """The setter class is used to set the event properties within this class before minimisation can take place. This simply copies a bunch of useful properties to class members, so that we can use them later without passing all this information around. Parameters ---------- hillas_dict: dict dictionary with telescope IDs as key and HillasParametersContainer instances as values image_dict: dict Amplitude of pixels in camera images time_dict: dict Time information per each pixel in camera images mask_dict: dict Event image masks subarray: dict Type of telescope array_pointing: SkyCoord[AltAz] Array pointing direction in the AltAz Frame telescope_pointing: SkyCoord[AltAz] Telescope pointing directions in the AltAz Frame Returns ------- None """ # First store these parameters in the class so we can use them # in minimisation For most values this is simply copying self.image = image_dict self.tel_pos_x = np.zeros(len(hillas_dict)) self.tel_pos_y = np.zeros(len(hillas_dict)) # self.scale_factor = np.zeros(len(hillas_dict)) self.ped = np.zeros(len(hillas_dict)) self.tel_types, self.tel_id = list(), list() max_pix_x = 0 px, py, pa, pt = list(), list(), list(), list() self.hillas_parameters = list() # Get telescope positions in tilted frame tilted_frame = TiltedGroundFrame(pointing_direction=array_pointing) ground_positions = subarray.tel_coords grd_coord = SkyCoord( x=ground_positions.x, y=ground_positions.y, z=ground_positions.z, frame=GroundFrame(), ) self.array_direction = array_pointing self.zenith = (np.pi / 2) - self.array_direction.alt.to(u.rad).value self.azimuth = self.array_direction.az.to(u.deg).value self.nominal_frame = NominalFrame(origin=self.array_direction) tilt_coord = grd_coord.transform_to(tilted_frame) type_tel = {} indices = subarray.tel_ids_to_indices(list(hillas_dict.keys())) # So here we must loop over the telescopes for tel_id, i in zip(hillas_dict, range(len(hillas_dict))): geometry = subarray.tel[tel_id].camera.geometry type = subarray.tel[tel_id].camera.name type_tel[tel_id] = type mask = mask_dict[tel_id] focal_length = subarray.tel[tel_id].optics.effective_focal_length camera_frame = CameraFrame( telescope_pointing=telescope_pointing[tel_id], focal_length=focal_length, ) camera_coords = SkyCoord( x=geometry.pix_x[mask], y=geometry.pix_y[mask], frame=camera_frame ) nominal_coords = camera_coords.transform_to(self.nominal_frame) px.append(nominal_coords.fov_lon.to(u.rad).value) if len(px[i]) > max_pix_x: max_pix_x = len(px[i]) py.append(nominal_coords.fov_lat.to(u.rad).value) pa.append(image_dict[tel_id][mask]) pt.append(time_dict[tel_id][mask]) self.ped[i] = self.ped_table[type] self.tel_types.append(type) self.tel_id.append(tel_id) self.tel_pos_x[i] = tilt_coord[indices[i]].x.to(u.m).value self.tel_pos_y[i] = tilt_coord[indices[i]].y.to(u.m).value self.hillas_parameters.append(hillas_dict[tel_id]) # self.scale_factor[i] = 1#self.template_scale[tel_id] # Most interesting stuff is now copied to the class, but to remove our requirement # for loops we must copy the pixel positions to an array with the length of the # largest image # First allocate everything shape = (len(hillas_dict), max_pix_x) self.pixel_x, self.pixel_y = ma.zeros(shape), ma.zeros(shape) self.image, self.time, self.ped, self.spe = ( ma.zeros(shape), ma.zeros(shape), ma.zeros(shape), ma.zeros(shape), ) self.tel_types = np.array(self.tel_types) # Copy everything into our masked arrays for i in range(len(hillas_dict)): array_len = len(px[i]) self.pixel_x[i][:array_len] = px[i] self.pixel_y[i][:array_len] = py[i] self.image[i][:array_len] = pa[i] self.time[i][:array_len] = pt[i] self.ped[i][:] = self.ped_table[self.tel_types[i]] self.spe[i][:] = 0.5 # Set the image mask mask = self.image == 0.0 self.pixel_x[mask], self.pixel_y[mask] = ma.masked, ma.masked self.image[mask] = ma.masked self.time[mask] = ma.masked # Finally run some functions to get ready for the event self.get_hillas_mean() self.initialise_templates(type_tel)
[docs] def predict( self, hillas_dict, subarray, array_pointing, shower_seed, energy_seed, telescope_pointings=None, image_dict=None, mask_dict=None, time_dict=None, ): """Predict method for the ImPACT reconstructor. Used to calculate the reconstructed ImPACT shower geometry and energy. Parameters ---------- shower_seed: ReconstructedShowerContainer Seed shower geometry to be used in the fit energy_seed: ReconstructedEnergyContainer Seed energy to be used in fit Returns ------- ReconstructedShowerContainer, ReconstructedEnergyContainer: """ if image_dict is None: raise EmptyImages("Images not passed to ImPACT reconstructor") self.set_event_properties( copy.deepcopy(hillas_dict), image_dict, time_dict, mask_dict, subarray, array_pointing, telescope_pointings, ) self.reset_interpolator() like_min = 1e9 fit_params = None for cleaning in shower_seed: # Copy all of our seed parameters out of the shower objects # We need to convert the shower direction to the nominal system horizon_seed = SkyCoord( az=shower_seed[cleaning].az, alt=shower_seed[cleaning].alt, frame=AltAz(), ) nominal_seed = horizon_seed.transform_to(self.nominal_frame) source_x = nominal_seed.fov_lon.to_value(u.rad) source_y = nominal_seed.fov_lat.to_value(u.rad) # And the core position to the tilted ground frame ground = SkyCoord( x=shower_seed[cleaning].core_x, y=shower_seed[cleaning].core_y, z=0 * u.m, frame=GroundFrame, ) tilted = ground.transform_to( TiltedGroundFrame(pointing_direction=self.array_direction) ) tilt_x = tilted.x.to(u.m).value tilt_y = tilted.y.to(u.m).value zenith = 90 * u.deg - self.array_direction.alt for energy_reco in energy_seed: energy = energy_seed[energy_reco].energy.value seed, step, limits = create_seed( source_x, source_y, tilt_x, tilt_y, energy ) # Perform maximum likelihood fit fit_params_min, errors, like = self.minimise( params=seed, step=step, limits=limits, ) # fit_params_min, like = self.energy_guess(seed) if like < like_min: fit_params = fit_params_min like_min = like if fit_params is None: return INVALID_GEOMETRY, INVALID_ENERGY # Now do full minimisation seed = create_seed( fit_params[0], fit_params[1], fit_params[2], fit_params[3], fit_params[4] ) fit_params, errors, like = self.minimise( params=seed[0], step=seed[1], limits=seed[2], ) # Create a container class for reconstructed shower # Convert the best fits direction and core to Horizon and ground systems and # copy to the shower container nominal = SkyCoord( fov_lon=fit_params[0] * u.rad, fov_lat=fit_params[1] * u.rad, frame=self.nominal_frame, ) horizon = nominal.transform_to(AltAz()) shower_result = ReconstructedGeometryContainer() shower_result.prefix = self.__class__.__name__ # Transform everything back to a useful system shower_result.alt, shower_result.az = horizon.alt, horizon.az tilted = SkyCoord( x=fit_params[2] * u.m, y=fit_params[3] * u.m, z=0 * u.m, frame=TiltedGroundFrame(pointing_direction=self.array_direction), ) ground = project_to_ground(tilted) shower_result.core_x = ground.x shower_result.core_y = ground.y shower_result.core_tilted_x = tilted.x shower_result.core_tilted_y = tilted.y shower_result.core_tilted_uncert_x = errors[2] * u.m shower_result.core_tilted_uncert_y = errors[3] * u.m shower_result.is_valid = True # Currently no errors not available to copy NaN shower_result.alt_uncert = source_x * u.rad shower_result.az_uncert = source_y * u.rad # shower_result.core_uncert = np.nan # Copy reconstructed Xmax slant_depth = ( fit_params[5] * self.get_shower_max( fit_params[0], fit_params[1], fit_params[2], fit_params[3], zenith.to(u.rad).value, ) * u.g / (u.cm * u.cm) ) # h_max really is the vertical altitude of the maximum above sea level shower_result.h_max = self.atmosphere_profile.height_from_slant_depth( slant_depth ) shower_result.h_max_uncert = errors[5] * shower_result.h_max goodness_of_fit = self.get_likelihood( fit_params[0], fit_params[1], fit_params[2], fit_params[3], fit_params[4], fit_params[5], True, ) shower_result.goodness_of_fit = np.sum(goodness_of_fit) shower_result.telescopes = self.tel_id shower_result.is_valid = True # Create a container class for reconstructed energy energy_result = ReconstructedEnergyContainer( prefix=self.__class__.__name__, energy=fit_params[4] * u.TeV, energy_uncert=errors[4] * u.TeV, telescopes=self.tel_id, is_valid=True, goodness_of_fit=np.sum(goodness_of_fit), ) return shower_result, energy_result
[docs] def minimise( self, params, step, limits, ): """ Parameters ---------- params: ndarray Seed parameters for fit step: ndarray Initial step size in the fit limits: ndarray Fit bounds Returns ------- tuple: best fit parameters and errors """ limits = np.asarray(limits) energy = params[4] xmax_scale = 1 # Now do the minimisation proper minimizer = Minuit( self.get_likelihood, source_x=params[0], source_y=params[1], core_x=params[2], core_y=params[3], energy=energy, x_max_scale=xmax_scale, goodness_of_fit=False, ) # This time leave everything free minimizer.fixed = [False, False, False, False, False, False, True] minimizer.errors = step minimizer.limits = limits minimizer.errordef = MINUIT_ERRORDEF # Tighter fit tolerances minimizer.tol *= MINUIT_TOLERANCE_FACTOR minimizer.strategy = MINUIT_STRATEGY # Fit and output parameters and errors _ = minimizer.migrad(iterate=MIGRAD_ITERATE) fit_params = minimizer.values errors = minimizer.errors return ( ( fit_params["source_x"], fit_params["source_y"], fit_params["core_x"], fit_params["core_y"], fit_params["energy"], fit_params["x_max_scale"], ), ( errors["source_x"], errors["source_y"], errors["core_x"], errors["core_x"], errors["energy"], errors["x_max_scale"], ), minimizer.fval, )
[docs] def reset_interpolator(self): """ This function is needed in order to reset some variables in the interpolator at each new event. Without this reset, a new event starts with information from the previous event. """ for key in self.prediction: self.prediction[key].reset() for key in self.time_prediction: self.time_prediction[key].reset()