Source code for ctapipe.image.modifications

from collections import defaultdict

import numpy as np
from numba import njit
from traitlets import default

from ..containers import EventType
from ..core import TelescopeComponent
from ..core.env import CTAPIPE_DISABLE_NUMBA_CACHE
from ..core.traits import (
    Bool,
    BoolTelescopeParameter,
    FloatTelescopeParameter,
    Int,
    Path,
)
from ..instrument import SubarrayDescription
from ..io import EventSource
from ..utils import EventTypeFilter

__all__ = [
    "ImageModifier",
]


def _add_noise(image, noise_level, rng=None, correct_bias=True):
    """
    Create a new image with added poissonian noise
    """
    if not rng:
        rng = np.random.default_rng()

    noisy_image = image.copy()
    noise = rng.poisson(noise_level, size=image.shape)
    noisy_image += noise

    if correct_bias:
        noisy_image -= noise_level

    return noisy_image


@njit(cache=not CTAPIPE_DISABLE_NUMBA_CACHE)
def _smear_psf_randomly(
    image, fraction, indices, indptr, smear_probabilities, seed=None
):
    """
    Create a new image with values smeared across the
    neighbor pixels specified by `indices` and `indptr`.
    These are what defines the sparse neighbor matrix
    and are available as attributes from the neighbor matrix.
    The amount of charge that is distributed away from a given
    pixel is drawn from a poissonian distribution.
    The distribution of this charge among the neighboring
    pixels follows a multinomial.
    Pixels at the camera edge lose charge this way.
    No geometry is available in this function due to the numba optimization,
    so the indices, indptr and smear_probabilities have to match
    to get sensible results.

    Parameters:
    -----------
    image: ndarray
        1d array of the pixel charge values
    fraction: float
        fraction of charge that will be distributed among neighbors (modulo poissonian)
    indices: ndarray[int]
        CSR format index array of the neighbor matrix
    indptr: ndarray[int]
        CSR format index pointer array of the neighbor matrix
    smear_probabilities: ndarray[float]
        shape: (n_neighbors, )
        A priori distribution of the charge amongst neighbors.
        In most cases probably of the form np.full(n_neighbors, 1/n_neighbors)
    seed: int
        Random seed for the numpy rng.
        Because this is numba optimized, a rng instance can not be used here

    Returns:
    --------
    new_image: ndarray
        1d array with smeared values
    """
    new_image = image.copy()
    if seed is not None:
        np.random.seed(seed)

    for pixel in range(len(image)):
        if image[pixel] <= 0:
            continue

        to_smear = np.random.poisson(image[pixel] * fraction)
        if to_smear == 0:
            continue

        # remove light from current pixel
        new_image[pixel] -= to_smear

        # add light to neighbor pixels
        neighbors = indices[indptr[pixel] : indptr[pixel + 1]]
        n_neighbors = len(neighbors)

        # we always distribute the charge as if the maximum number
        # of neighbors of a geometry is present, so that charge
        # on the edges of the camera is lost
        neighbor_charges = np.random.multinomial(to_smear, smear_probabilities)

        for n in range(n_neighbors):
            neighbor = neighbors[n]
            new_image[neighbor] += neighbor_charges[n]

    return new_image


[docs] class NoiseEventTypeFilter(EventTypeFilter): """ Event filter to select noise events for MC tuning By default it selects SKY_PEDESTAL events """ @default("allowed_types") def allowed_types_default(self): return {EventType.SKY_PEDESTAL}
@njit(cache=not CTAPIPE_DISABLE_NUMBA_CACHE) def build_wf_noise_pixelwise( waveforms, n_noise_realizations, nsb_level, rng, sample_pixels_independently ): """ Combine "elemental noise waveforms" into total noise waveforms by combining a given number of them, chosen randomly Parameters ---------- waveforms: array (nevents, ngains, npixels, nsamples), the elemental noise waveforms n_noise_realizations: int the number of total noise waveforms we want to generate nsb_level: int the number of elemental noise waveforms we combine to produce each total noise waveform rng: random number generator sample_pixels_independently: bool if True, each pixel will use a different random combination of the elemental noise events to build the noise to be added to its waveform. if False, the waveform for each pixel in any given noise realization comes from the same random combination of the input elemental noise events. Returns ------- array (n_noise, ngains, npixels, nsamples), total noise waveforms """ n_events, n_gains, n_pixels, n_samples = waveforms.shape noise = np.zeros( (n_noise_realizations, n_gains, n_pixels, n_samples), dtype=np.float32 ) for i in range(n_noise_realizations): if sample_pixels_independently: for pixel in range(n_pixels): chosen = rng.permutation(n_events)[:nsb_level] # The line above is slower (especially for n_events much # larger than nsb_level) than rng.choice(n_events, nsb_level) # Unfortunately rng.choice does not currently work with numba. for event in chosen: noise[i, :, pixel] += waveforms[event, :, pixel] else: chosen = rng.permutation(n_events)[:nsb_level] for event in chosen: noise[i] += waveforms[event] return noise
[docs] class WaveformModifier(TelescopeComponent): """ Component to add NSB noise to R1 waveforms. This component in principle to be applied on MC shower simulations, to make them closer to real data in terms of noise level. There are two possibilities: 1. The "showers MC" file has dark-NSB settings and electronic noise ( waveform baseline fluctuations), and the input NSB file is a dedicated sim_telarray file, which must be produced with the same telescope array configuration (and other simulation settings) as the showers MC to which the noise is to be added, but containing only NSB noise (electronic fluctuations of the baseline should be switched off) 2. The showers MC file is produced with no noise (baseline fluctuations) of any kind (electronic or NSB), just the Cherenkov signal (with the appropriate single-p.e.-response fluctuations), whereas the nsb file is a real data DL0 file from which only the interleaved pedestals are used (all gain channels must be present for all pixels). In that case, nsb_level must be =1 (to match the MC to the data) and sample_pixels_independently=False (we do not want e.g. to duplicate stars in the FoV). In case (1), the number of available noise events per telescope in the NSB file must be at least twice the number of waveforms ("nsb_level") from that file that we want to add up. If the NSB file is produced with a level of 25% of dark NSB, and we want to simulate 10x dark NSB, then nsb_level=40 (=10/0.25) and the file must contain at least 80 events. This is to guarantee that the different noise waveforms are not too correlated """ nsb_file = Path( default_value=None, help="Path to a dedicated NSB-only file (e.g. from sim_telarray)", ).tag(config=True) nsb_level = Int( default_value=1, help=( "Number of random instances of the NSB waveforms from the " "NSB file to be added up to the waveform" ), ).tag(config=True) n_noise_realizations = Int( default_value=100, help=( "Number of different realizations of the total NSB waveform to " "be created per pixel" ), ).tag(config=True) sample_pixels_independently = Bool( default_value=True, help=( "If True, each pixel uses a different " "random combination of the input noise " "events" "If False, all pixels use the same random " "combination of noise events. That is, noise " "events will be combined as full cameras" ), ).tag(config=True) rng_seed = Int(default_value=1, help="Seed for the random number generator").tag( config=True ) def __init__( self, subarray: SubarrayDescription, config=None, parent=None, **kwargs, ): """ Parameters ---------- subarray: SubarrayDescription Description of the subarray. Provides information about the camera which are useful in calibration. Also required for configuring the TelescopeParameter traitlets. config: traitlets.loader.Config Configuration specified by config file or cmdline arguments. Used to set traitlet values. This is mutually exclusive with passing a ``parent``. parent: ctapipe.core.Component or ctapipe.core.Tool Parent of this component in the configuration hierarchy, this is mutually exclusive with passing ``config`` """ super().__init__(subarray=subarray, config=config, parent=parent, **kwargs) self.rng = np.random.default_rng(self.rng_seed) self.event_type_filter = NoiseEventTypeFilter(parent=self) self.total_noise = dict() # One key per tel_id, and each of them is an array of shape # [n_noise_realizations, ngains, npixels, nsamples] # Read in the waveforms in the NSB-only file. Store in a dictionary # with one key per telescope, containing an array [n_events, n_gains, # n_pixels, n_samples] nsb_database = self.read_nsb_database() # Check if noise statistics is sufficient: stats_ok = self.check_noise_statistics(nsb_database) if not stats_ok: raise ValueError("Please use an input nsb_file with more events!") # Now shift the waveforms so that they have mean=0 and do not introduce # any bias (just fluctuations) self.zero_baseline(nsb_database) # Add up waveforms selected at random to obtain different # realizations of the total noise that will be added for tel_id in nsb_database: self.total_noise[tel_id] = build_wf_noise_pixelwise( nsb_database[tel_id], self.n_noise_realizations, self.nsb_level, self.rng, self.sample_pixels_independently, )
[docs] def read_nsb_database(self): """ Reads in R1 noise waveforms from an input file self.nsb_file Returns ------- nsb_database : dict Dictionary with one key per telescope, containing an array [n_events, n_gains, n_pixels, n_samples] (noise waveforms) """ nsb_database = defaultdict(list) with EventSource( input_url=self.nsb_file, skip_calibration_events=False ) as source: for event in source: if not self.event_type_filter(event): continue for tel_id, r1 in event.r1.tel.items(): nsb_database[tel_id].append(r1.waveform) nsb_database = { tel_id: np.stack(waveforms) for tel_id, waveforms in nsb_database.items() } return nsb_database
[docs] def check_noise_statistics(self, nsb_database): """ Check that we have enough NSB-only events for all telescopes. We require that the number of NSB events for any telescope is at least two times the number of waveforms (=nsb_level) that we will add up. This is to avoid excessive correlation among the waveforms. Parameters ---------- nsb_database: dict Dictionary with one key per telescope, containing an array [n_events, n_gains, n_pixels, n_samples] (noise waveforms) Returns ------- stats_ok : bool True if statistics of noise events is considered sufficient """ stats_ok = True for tel_id in nsb_database: nevents = nsb_database[tel_id].shape[0] if nevents >= 2 * self.nsb_level: continue self.log.error( f"Not enough NSB events available for tel_" f"id {tel_id}. " f"For nsb_level = {self.nsb_level}, at least " f"{2 * self.nsb_level} events are needed ({nevents} " f"were found)." ) stats_ok = False return stats_ok
[docs] def zero_baseline(self, nsb_database): """ For each telescope and gain we average the waveform values for all pixels, and subtract those averages from the waveforms. In this way we make sure we do not introduce any net average charge, but only increase the fluctuations. Parameters ---------- nsb_database: dict Dictionary with one key per telescope, containing an array [n_events, n_gains, n_pixels, n_samples] (noise waveforms) Returns ------- nsb_database: dict Dictionary, same as above but after baseline zeroing """ for tel_id in nsb_database: for channel in range(nsb_database[tel_id].shape[1]): mean = np.mean(nsb_database[tel_id][:, channel, :, :]) nsb_database[tel_id][:, channel, :, :] -= mean
[docs] def __call__(self, event): """ Parameters ---------- event: container A `~ctapipe.containers.ArrayEventContainer` event container """ for tel_id, r1 in event.r1.tel.items(): waveform = r1.waveform # Note: MC waveforms passed to this function should contain data in all # pixels (not DVR'ed - obviously DVR depends on noise level, it does not # make sense to add the noise after DVR was applied) noise = self.total_noise[tel_id][ self.rng.integers(self.n_noise_realizations) ] if r1.selected_gain_channel is not None: noise = noise[r1.selected_gain_channel, np.arange(waveform.shape[1])] r1.waveform = waveform + noise
[docs] class ImageModifier(TelescopeComponent): """ Component to tune simulated background to overserved NSB values. A differentiation between bright and dim pixels is taking place because this happens at DL1a level and in general the integration window would change for peak-searching extraction algorithms with different background levels introducing a bias to the charge in dim pixels. The performed steps include: - Smearing of the image (simulating a worse PSF) - Adding poissonian noise (different for bright and dim pixels) - Adding a bias to dim pixel charges (see above) """ psf_smear_factor = FloatTelescopeParameter( default_value=0.0, help="Fraction of light to move to each neighbor" ).tag(config=True) noise_transition_charge = FloatTelescopeParameter( default_value=0.0, help="separation between dim and bright pixels" ).tag(config=True) noise_bias_dim_pixels = FloatTelescopeParameter( default_value=0.0, help="extra bias to add in dim pixels" ).tag(config=True) noise_level_dim_pixels = FloatTelescopeParameter( default_value=0.0, help="expected extra noise in dim pixels" ).tag(config=True) noise_level_bright_pixels = FloatTelescopeParameter( default_value=0.0, help="expected extra noise in bright pixels" ).tag(config=True) noise_correct_bias = BoolTelescopeParameter( default_value=True, help="If True subtract the expected noise from the image." ).tag(config=True) rng_seed = Int(default_value=1337, help="Seed for the random number generator").tag( config=True ) def __init__( self, subarray: SubarrayDescription, config=None, parent=None, **kwargs ): """ Parameters ---------- subarray: SubarrayDescription Description of the subarray. Provides information about the camera which are useful in calibration. Also required for configuring the TelescopeParameter traitlets. config: traitlets.loader.Config Configuration specified by config file or cmdline arguments. Used to set traitlet values. This is mutually exclusive with passing a ``parent``. parent: ctapipe.core.Component or ctapipe.core.Tool Parent of this component in the configuration hierarchy, this is mutually exclusive with passing ``config`` """ super().__init__(subarray=subarray, config=config, parent=parent, **kwargs) self.rng = np.random.default_rng(self.rng_seed)
[docs] def __call__(self, tel_id, image, rng=None): dtype = image.dtype if self.psf_smear_factor.tel[tel_id] > 0: geom = self.subarray.tel[tel_id].camera.geometry image = _smear_psf_randomly( image=image, fraction=self.psf_smear_factor.tel[tel_id], indices=geom.neighbor_matrix_sparse.indices, indptr=geom.neighbor_matrix_sparse.indptr, smear_probabilities=np.full(geom.max_neighbors, 1 / geom.max_neighbors), seed=self.rng.integers(0, np.iinfo(np.int64).max), ) if ( self.noise_level_dim_pixels.tel[tel_id] > 0 or self.noise_level_bright_pixels.tel[tel_id] > 0 ): bright_pixels = image > self.noise_transition_charge.tel[tel_id] noise = np.where( bright_pixels, self.noise_level_bright_pixels.tel[tel_id], self.noise_level_dim_pixels.tel[tel_id], ) image = _add_noise( image, noise, rng=self.rng, correct_bias=self.noise_correct_bias.tel[tel_id], ) image[~bright_pixels] += self.noise_bias_dim_pixels.tel[tel_id] return image.astype(dtype, copy=False)