Source code for ctapipe.image.cleaning

"""
Image Cleaning Algorithms (identification of noisy pixels)


All algorithms return a boolean mask that is True for pixels surviving
the cleaning.

To get a zero-suppressed image and pixel
list, use ``image[mask], geom.pix_id[mask]``, or to keep the same
image size and just set unclean pixels to 0 or similar, use
``image[~mask] = 0``

"""

__all__ = [
    "tailcuts_clean",
    "dilate",
    "mars_cleaning_1st_pass",
    "fact_image_cleaning",
    "apply_time_delta_cleaning",
    "apply_time_average_cleaning",
    "time_constrained_clean",
    "ImageCleaner",
    "TailcutsImageCleaner",
]

from abc import abstractmethod

import numpy as np

from ..core import TelescopeComponent
from ..core.traits import (
    BoolTelescopeParameter,
    FloatTelescopeParameter,
    IntTelescopeParameter,
)
from .morphology import brightest_island, number_of_islands


[docs]def tailcuts_clean( geom, image, picture_thresh=7, boundary_thresh=5, keep_isolated_pixels=False, min_number_picture_neighbors=0, ): """Clean an image by selection pixels that pass a two-threshold tail-cuts procedure. The picture and boundary thresholds are defined with respect to the pedestal dispersion. All pixels that have a signal higher than the picture threshold will be retained, along with all those above the boundary threshold that are neighbors of a picture pixel. To include extra neighbor rows of pixels beyond what are accepted, use the `ctapipe.image.dilate` function. Parameters ---------- geom: `ctapipe.instrument.CameraGeometry` Camera geometry information image: array pixel values picture_thresh: float or array threshold above which all pixels are retained boundary_thresh: float or array threshold above which pixels are retained if they have a neighbor already above the picture_thresh keep_isolated_pixels: bool If True, pixels above the picture threshold will be included always, if not they are only included if a neighbor is in the picture or boundary min_number_picture_neighbors: int A picture pixel survives cleaning only if it has at least this number of picture neighbors. This has no effect in case keep_isolated_pixels is True Returns ------- A boolean mask of *clean* pixels. """ pixels_above_picture = image >= picture_thresh if keep_isolated_pixels or min_number_picture_neighbors == 0: pixels_in_picture = pixels_above_picture else: # Require at least min_number_picture_neighbors. Otherwise, the pixel # is not selected number_of_neighbors_above_picture = geom.neighbor_matrix_sparse.dot( pixels_above_picture.view(np.byte) ) pixels_in_picture = pixels_above_picture & ( number_of_neighbors_above_picture >= min_number_picture_neighbors ) # by broadcasting together pixels_in_picture (1d) with the neighbor # matrix (2d), we find all pixels that are above the boundary threshold # AND have any neighbor that is in the picture pixels_above_boundary = image >= boundary_thresh pixels_with_picture_neighbors = geom.neighbor_matrix_sparse.dot(pixels_in_picture) if keep_isolated_pixels: return ( pixels_above_boundary & pixels_with_picture_neighbors ) | pixels_in_picture else: pixels_with_boundary_neighbors = geom.neighbor_matrix_sparse.dot( pixels_above_boundary ) return (pixels_above_boundary & pixels_with_picture_neighbors) | ( pixels_in_picture & pixels_with_boundary_neighbors )
[docs]def mars_cleaning_1st_pass( geom, image, picture_thresh=7, boundary_thresh=5, keep_isolated_pixels=False, min_number_picture_neighbors=0, ): """ Clean an image by selection pixels that pass a three-threshold tail-cuts procedure. All thresholds are defined with respect to the pedestal dispersion. All pixels that have a signal higher than the core (picture) threshold will be retained, along with all those above the boundary threshold that are neighbors of a core pixel AND all those above the boundary threshold that are neighbors of a neighbor of a core pixel. Parameters ---------- geom: `ctapipe.instrument.CameraGeometry` Camera geometry information image: array pixel values picture_thresh: float threshold above which all pixels are retained boundary_thresh: float threshold above which pixels are retained if they have a neighbor already above the picture_thresh; it is then reapplied iteratively to the neighbor of the neighbor keep_isolated_pixels: bool If True, pixels above the picture threshold will be included always, if not they are only included if a neighbor is in the picture or boundary min_number_picture_neighbors: int A picture pixel survives cleaning only if it has at least this number of picture neighbors. This has no effect in case keep_isolated_pixels is True Returns ------- A boolean mask of *clean* pixels. """ pixels_from_tailcuts_clean = tailcuts_clean( geom, image, picture_thresh, boundary_thresh, keep_isolated_pixels, min_number_picture_neighbors, ) # this selects any core pixel and any of its first neighbors # At this point we don't know yet which ones should be kept. # In principle, the pixel thresholds should be hierarchical from core to # boundaries (this should be true for every type of particle triggering # the image), so we can just check which pixels have more than # boundary_thresh photo-electrons in the same image, but starting from # the mask we got from 'tailcuts_clean'. pixels_above_2nd_boundary = image >= boundary_thresh # and now it's the same as the last part of 'tailcuts_clean', but without # the core pixels, i.e. we start from the neighbors of the core pixels. pixels_with_previous_neighbors = geom.neighbor_matrix_sparse.dot( pixels_from_tailcuts_clean ) if keep_isolated_pixels: return ( pixels_above_2nd_boundary & pixels_with_previous_neighbors ) | pixels_from_tailcuts_clean else: pixels_with_2ndboundary_neighbors = geom.neighbor_matrix_sparse.dot( pixels_above_2nd_boundary ) return (pixels_above_2nd_boundary & pixels_with_previous_neighbors) | ( pixels_from_tailcuts_clean & pixels_with_2ndboundary_neighbors )
[docs]def dilate(geom, mask): """ Add one row of neighbors to the True values of a pixel mask and return the new mask. This can be used to include extra rows of pixels in a mask that was pre-computed, e.g. via `tailcuts_clean`. Parameters ---------- geom: `~ctapipe.instrument.CameraGeometry` Camera geometry information mask: ndarray input mask (array of booleans) to be dilated """ return mask | geom.neighbor_matrix_sparse.dot(mask)
[docs]def apply_time_delta_cleaning( geom, mask, arrival_times, min_number_neighbors, time_limit ): """ Identify all pixels from selection that have less than N neighbors that arrived within a given timeframe. Parameters ---------- geom: `ctapipe.instrument.CameraGeometry` Camera geometry information mask: array, boolean boolean mask of *clean* pixels before time_delta_cleaning arrival_times: array pixel timing information min_number_neighbors: int Threshold to determine if a pixel survives cleaning steps. These steps include checks of neighbor arrival time and value time_limit: int or float arrival time limit for neighboring pixels Returns ------- A boolean mask of *clean* pixels. """ pixels_to_keep = mask.copy() time_diffs = np.abs(arrival_times[mask, None] - arrival_times) # neighboring pixels arriving in the time limit and previously selected valid_neighbors = (time_diffs < time_limit) & geom.neighbor_matrix[mask] & mask enough_neighbors = np.count_nonzero(valid_neighbors, axis=1) >= min_number_neighbors pixels_to_keep[pixels_to_keep] &= enough_neighbors return pixels_to_keep
[docs]def apply_time_average_cleaning( geom, mask, image, arrival_times, picture_thresh, time_limit ): """ Extract all pixels that arrived within a given timeframe with respect to the time average of the pixels on the main island. In order to avoid removing signal pixels of large impact-parameter events, the time limit for bright pixels is doubled. Parameters ---------- geom: `ctapipe.instrument.CameraGeometry` Camera geometry information mask: array, boolean boolean mask of *clean* pixels before time_delta_cleaning image: array pixel values arrival_times: array pixel timing information picture_thresh: float threshold above which time limit is extended twice its value time_limit: int or float arrival time limit w.r.t. the average time of the core pixels Returns ------- A boolean mask of *clean* pixels. """ mask = mask.copy() if np.count_nonzero(mask) > 0: # use main island (maximum charge) for time average calculation n_islands, island_labels = number_of_islands(geom, mask) mask_main = brightest_island(n_islands, island_labels, image) time_ave = np.average(arrival_times[mask_main], weights=image[mask_main] ** 2) time_diffs = np.abs(arrival_times[mask] - time_ave) time_limit_pixwise = np.where( image < (2 * picture_thresh), time_limit, time_limit * 2 )[mask] mask[mask] &= time_diffs < time_limit_pixwise return mask
[docs]def fact_image_cleaning( geom, image, arrival_times, picture_threshold=4, boundary_threshold=2, min_number_neighbors=2, time_limit=5, ): """Clean an image by selection pixels that pass the fact cleaning procedure. Cleaning contains the following steps: 1: Find pixels containing more photons than a threshold t1 2: Remove pixels with less than N neighbors 3: Add neighbors of the remaining pixels that are above a lower threshold t2 4: Remove pixels with less than N neighbors arriving within a given timeframe 5: Remove pixels with less than N neighbors 6: Remove pixels with less than N neighbors arriving within a given timeframe Parameters ---------- geom: `ctapipe.instrument.CameraGeometry` Camera geometry information image: array pixel values arrival_times: array pixel timing information picture_threshold: float or array threshold above which all pixels are retained boundary_threshold: float or array threshold above which pixels are retained if they have a neighbor already above the picture_thresh min_number_neighbors: int Threshold to determine if a pixel survives cleaning steps. These steps include checks of neighbor arrival time and value time_limit: int or float arrival time limit for neighboring pixels Returns ------- A boolean mask of *clean* pixels. References ---------- See [temme2016]_ and for implementation [factcleaning]_ """ # Step 1 pixels_to_keep = image >= picture_threshold # Step 2 number_of_neighbors_above_picture = geom.neighbor_matrix_sparse.dot( (pixels_to_keep).view(np.byte) ) pixels_to_keep = pixels_to_keep & ( number_of_neighbors_above_picture >= min_number_neighbors ) # Step 3 pixels_above_boundary = image >= boundary_threshold pixels_to_keep = dilate(geom, pixels_to_keep) & pixels_above_boundary # nothing else to do if min_number_neighbors <= 0 if min_number_neighbors <= 0: return pixels_to_keep # Step 4 pixels_to_keep = apply_time_delta_cleaning( geom, pixels_to_keep, arrival_times, min_number_neighbors, time_limit ) # Step 5 number_of_neighbors = geom.neighbor_matrix_sparse.dot( (pixels_to_keep).view(np.byte) ) pixels_to_keep = pixels_to_keep & (number_of_neighbors >= min_number_neighbors) # Step 6 pixels_to_keep = apply_time_delta_cleaning( geom, pixels_to_keep, arrival_times, min_number_neighbors, time_limit ) return pixels_to_keep
[docs]def time_constrained_clean( geom, image, arrival_times, picture_thresh=7, boundary_thresh=5, time_limit_core=4.5, time_limit_boundary=1.5, min_number_picture_neighbors=1, ): """ time constrained cleaning by MAGIC Cleaning contains the following steps: - Find core pixels (containing more photons than a picture threshold) - Remove pixels with less than N neighbors - Keep core pixels whose arrival times are within a time limit of the average time - Find boundary pixels (containing more photons than a boundary threshold) - Remove pixels with less than N neighbors arriving within a given timeframe Parameters ---------- geom: `ctapipe.instrument.CameraGeometry` Camera geometry information image: array pixel values arrival_times: array pixel timing information picture_threshold: float or array threshold above which all pixels are retained boundary_threshold: float or array threshold above which pixels are retained if they have a neighbor already above the picture_thresh time_limit_core: int or float arrival time limit of core pixels w.r.t the average time time_limit_boundary: int or float arrival time limit of boundary pixels w.r.t neighboring core pixels min_number_neighbors: int Threshold to determine if a pixel survives cleaning steps. These steps include checks of neighbor arrival time and value Returns ------- A boolean mask of *clean* pixels. """ # find core pixels that pass a picture threshold pixels_above_picture = image >= picture_thresh # require at least min_number_picture_neighbors number_of_neighbors_above_picture = geom.neighbor_matrix_sparse.dot( pixels_above_picture.view(np.byte) ) pixels_in_picture = pixels_above_picture & ( number_of_neighbors_above_picture >= min_number_picture_neighbors ) # keep core pixels whose arrival times are within a certain time limit of the average mask_core = apply_time_average_cleaning( geom, pixels_in_picture, image, arrival_times, picture_thresh, time_limit_core ) # find boundary pixels that pass a boundary threshold pixels_above_boundary = image >= boundary_thresh pixels_with_picture_neighbors = geom.neighbor_matrix_sparse.dot(mask_core) mask_boundary = (pixels_above_boundary & pixels_with_picture_neighbors) & np.invert( mask_core ) # keep boundary pixels whose arrival times are within a certain time limit of the neighboring core pixels mask_boundary = mask_boundary.copy() time_diffs = np.abs(arrival_times[mask_boundary, None] - arrival_times) valid_neighbors = ( (time_diffs < time_limit_boundary) & geom.neighbor_matrix[mask_boundary] & mask_core ) enough_neighbors = ( np.count_nonzero(valid_neighbors, axis=1) >= min_number_picture_neighbors ) mask_boundary[mask_boundary] &= enough_neighbors return mask_core | mask_boundary
[docs]class ImageCleaner(TelescopeComponent): """ Abstract class for all configurable Image Cleaning algorithms. Use ``ImageCleaner.from_name()`` to construct an instance of a particular algorithm """
[docs] @abstractmethod def __call__( self, tel_id: int, image: np.ndarray, arrival_times: np.ndarray = None ) -> np.ndarray: """ Identify pixels with signal, and reject those with pure noise. Parameters ---------- tel_id: int which telescope id in the subarray is being used (determines which cut is used) image : np.ndarray image pixel data corresponding to the camera geometry arrival_times: np.ndarray image of arrival time (not used in this method) Returns ------- np.ndarray boolean mask of pixels passing cleaning """ pass
[docs]class TailcutsImageCleaner(ImageCleaner): """ Clean images using the standard picture/boundary technique. See `ctapipe.image.tailcuts_clean` """ picture_threshold_pe = FloatTelescopeParameter( default_value=10.0, help="top-level threshold in photoelectrons" ).tag(config=True) boundary_threshold_pe = FloatTelescopeParameter( default_value=5.0, help="second-level threshold in photoelectrons" ).tag(config=True) min_picture_neighbors = IntTelescopeParameter( default_value=2, help="Minimum number of neighbors above threshold to consider" ).tag(config=True) keep_isolated_pixels = BoolTelescopeParameter( default_value=False, help="If False, pixels with less neighbors than ``min_picture_neighbors`` are" "removed.", ).tag(config=True)
[docs] def __call__( self, tel_id: int, image: np.ndarray, arrival_times=None ) -> np.ndarray: """ Apply standard picture-boundary cleaning. See `ImageCleaner.__call__()` """ return tailcuts_clean( self.subarray.tel[tel_id].camera.geometry, image, picture_thresh=self.picture_threshold_pe.tel[tel_id], boundary_thresh=self.boundary_threshold_pe.tel[tel_id], min_number_picture_neighbors=self.min_picture_neighbors.tel[tel_id], keep_isolated_pixels=self.keep_isolated_pixels.tel[tel_id], )
class MARSImageCleaner(TailcutsImageCleaner): """ 1st-pass MARS-like Image cleaner (See `ctapipe.image.mars_cleaning_1st_pass`) """ def __call__( self, tel_id: int, image: np.ndarray, arrival_times=None ) -> np.ndarray: """ Apply MARS-style image cleaning. See `ImageCleaner.__call__()` """ return mars_cleaning_1st_pass( self.subarray.tel[tel_id].camera.geometry, image, picture_thresh=self.picture_threshold_pe.tel[tel_id], boundary_thresh=self.boundary_threshold_pe.tel[tel_id], min_number_picture_neighbors=self.min_picture_neighbors.tel[tel_id], keep_isolated_pixels=False, ) class FACTImageCleaner(TailcutsImageCleaner): """ Clean images using the FACT technique. See `ctapipe.image.fact_image_cleaning` for algorithm details """ time_limit_ns = FloatTelescopeParameter( default_value=5.0, help="arrival time limit for neighboring " "pixels, in ns" ).tag(config=True) def __call__( self, tel_id: int, image: np.ndarray, arrival_times=None ) -> np.ndarray: """Apply FACT-style image cleaning. see ImageCleaner.__call__()""" return fact_image_cleaning( geom=self.subarray.tel[tel_id].camera.geometry, image=image, arrival_times=arrival_times, picture_threshold=self.picture_threshold_pe.tel[tel_id], boundary_threshold=self.boundary_threshold_pe.tel[tel_id], min_number_neighbors=self.min_picture_neighbors.tel[tel_id], time_limit=self.time_limit_ns.tel[tel_id], ) class TimeConstrainedImageCleaner(TailcutsImageCleaner): """ MAGIC-like Image cleaner with timing information (See `ctapipe.image.time_constrained_clean`) """ time_limit_core_ns = FloatTelescopeParameter( default_value=4.5, help="arrival time limit for neighboring " "core pixels, in ns", ).tag(config=True) time_limit_boundary_ns = FloatTelescopeParameter( default_value=1.5, help="arrival time limit for neighboring " "boundary pixels, in ns", ).tag(config=True) def __call__( self, tel_id: int, image: np.ndarray, arrival_times=None ) -> np.ndarray: """ Apply MAGIC-like image cleaning with timing information. See `ImageCleaner.__call__()` """ return time_constrained_clean( self.subarray.tel[tel_id].camera.geometry, image, arrival_times=arrival_times, picture_thresh=self.picture_threshold_pe.tel[tel_id], boundary_thresh=self.boundary_threshold_pe.tel[tel_id], min_number_picture_neighbors=self.min_picture_neighbors.tel[tel_id], time_limit_core=self.time_limit_core_ns.tel[tel_id], time_limit_boundary=self.time_limit_boundary_ns.tel[tel_id], )