"""
Methods to interpolate broken pixels
"""
from abc import ABCMeta, abstractmethod
from typing import Tuple
import numpy as np
from ..core import TelescopeComponent
__all__ = [
"InvalidPixelHandler",
"NeighborAverage",
]
[docs]class InvalidPixelHandler(TelescopeComponent, metaclass=ABCMeta):
"""
An abtract base class for algorithms treating invalid pixel data in images
"""
[docs] @abstractmethod
def __call__(
self, tel_id, image, peak_time, pixel_mask
) -> Tuple[np.ndarray, np.ndarray]:
"""
Handle invalid (broken, high noise, ...) pixels.
E.g. replace by average over neighbors, see `NeighborAverage`).
Parameters
----------
tel_id : int
telescope id
image : np.ndarray
Array of pixel image values
peak_time : np.ndarray
Array of pixel peak_time values
pixel_mask : np.ndarray
Boolean mask of the pixels to be interpolated
Returns
-------
image : np.ndarray
Image with interpolated values
peak_time : np.ndarray
peak_time with interpolated values
"""
[docs]class NeighborAverage(InvalidPixelHandler):
[docs] def __call__(self, tel_id, image, peak_time, pixel_mask):
"""Interpolate pixels in dl1 images and peak_times
Pixels to be interpolated are replaced by the average value their neighbors.
Pixels where no valid neighbors are available are filled with zeros.
Parameters
----------
tel_id : int
telescope id
image : np.ndarray
Array of pixel image values
peak_time : np.ndarray
Array of pixel peak_time values
pixel_mask : np.ndarray
Boolean mask of the pixels to be interpolated
Returns
-------
image : np.ndarray
Image with interpolated values
peak_time : np.ndarray
peak_time with interpolated values
"""
geometry = self.subarray.tel[tel_id].camera.geometry
n_interpolated = np.count_nonzero(pixel_mask)
if n_interpolated == 0:
return image, peak_time
# exclude to-be-interpolated pixels from neighbors
neighbors = geometry.neighbor_matrix[pixel_mask] & ~pixel_mask
index, neighbor = np.nonzero(neighbors)
image_sum = np.zeros(n_interpolated, dtype=image.dtype)
count = np.zeros(n_interpolated, dtype=int)
peak_time_sum = np.zeros(n_interpolated, dtype=peak_time.dtype)
# calculate average of image and peak_time
np.add.at(count, index, 1)
np.add.at(image_sum, index, image[neighbor])
np.add.at(peak_time_sum, index, peak_time[neighbor])
valid = count > 0
np.divide(image_sum, count, out=image_sum, where=valid)
np.divide(peak_time_sum, count, out=peak_time_sum, where=valid)
peak_time = peak_time.copy()
peak_time[pixel_mask] = peak_time_sum
image = image.copy()
image[pixel_mask] = image_sum
return image, peak_time