Source code for ctapipe.io.hdf5tableio

"""Implementations of TableWriter and -Reader for HDF5 files"""
import enum
from pathlib import PurePath
import re

import numpy as np
import tables
from astropy.time import Time
from astropy.units import Quantity, Unit

import ctapipe
from .tableio import (
    TableWriter,
    TableReader,
    FixedPointColumnTransform,
    TimeColumnTransform,
    EnumColumnTransform,
    QuantityColumnTransform,
)
from ..core import Container

__all__ = ["HDF5TableWriter", "HDF5TableReader"]

PYTABLES_TYPE_MAP = {
    "float": tables.Float64Col,
    "float64": tables.Float64Col,
    "float32": tables.Float32Col,
    "float16": tables.Float16Col,
    "int8": tables.Int8Col,
    "int16": tables.Int16Col,
    "int32": tables.Int32Col,
    "int64": tables.Int64Col,
    "int": tables.Int64Col,
    "uint8": tables.UInt8Col,
    "uint16": tables.UInt16Col,
    "uint32": tables.UInt32Col,
    "uint64": tables.UInt64Col,
    "bool": tables.BoolCol,
}


DEFAULT_FILTERS = tables.Filters(
    complevel=5,  # compression medium, tradeoff between speed and compression
    complib="blosc:zstd",  # use modern zstd algorithm
    fletcher32=True,  # add checksums to data chunks
)


def get_hdf5_attr(attrs, name, default=None):
    if name in attrs:
        return attrs[name]
    return default


[docs]class HDF5TableWriter(TableWriter): """ A very basic table writer that can take a container (or more than one) and write it to an HDF5 file. It does _not_ recursively write the container. This is intended as a building block to create a more complex I/O system. It works by creating a HDF5 Table description from the `~ctapipe.core.Field` definitions inside a container, where each item becomes a column in the table. The first time `HDF5TableWriter.write()` is called, the container(s) are registered and the table created in the output file. Each item in the container can also have an optional transform function that is called before writing to transform the value. For example, unit quantities always have their units removed, or converted to a common unit if specified in the `~ctapipe.core.Field`. Any metadata in the `~ctapipe.core.Container` (stored in ``Container.meta``) will be written to the table's header on the first call to write() Multiple tables may be written at once in a single file, as long as you change the table_name attribute to `write()` to specify which one to write to. Likewise multiple Containers can be merged into a single output table by passing a list of containers to `write()`. To append to existing files, pass the ``mode='a'`` option to the constructor. Parameters ---------- filename: str name of hdf5 output file group_name: str name of group into which to put all of the tables generated by this Writer (it will be placed under "/" in the file) add_prefix: bool if True, add the container prefix before each column name mode : str ('w', 'a') 'w' if you want to overwrite the file 'a' if you want to append data to the file root_uep : str root location of the ``group_name`` filters: pytables.Filters A set of filters (compression settings) to be used for all datasets created by this writer. kwargs: any other arguments that will be passed through to ``pytables.open_file``. """ def __init__( self, filename, group_name="", add_prefix=False, mode="w", root_uep="/", filters=DEFAULT_FILTERS, parent=None, config=None, **kwargs, ): super().__init__(add_prefix=add_prefix, parent=parent, config=config) self._schemas = {} self._tables = {} if mode not in ["a", "w", "r+"]: raise IOError(f"The mode '{mode}' is not supported for writing") kwargs.update(mode=mode, root_uep=root_uep, filters=filters) self.open(str(filename), **kwargs) self._group = "/" + group_name self.filters = filters self.log.debug("h5file: %s", self.h5file)
[docs] def open(self, filename, **kwargs): self.log.debug("kwargs for tables.open_file: %s", kwargs) self.h5file = tables.open_file(filename, **kwargs)
[docs] def close(self): self.h5file.close()
def _create_hdf5_table_schema(self, table_name, containers): """ Creates a pytables description class for the given containers and registers it in the Writer Parameters ---------- table_name: str name of table container: ctapipe.core.Container instance of an initalized container Returns ------- dictionary of extra metadata to add to the table's header """ class Schema(tables.IsDescription): pass meta = {} # any extra meta-data generated here (like units, etc) # set up any column tranforms that were requested as regexps (i.e. # convert them to explicit transform in the _transforms dict if they # match) self._realize_regexp_transforms(table_name, containers) # create pytables schema description for the given container pos = 0 for container in containers: container.validate() # ensure the data are complete for col_name, value in container.items(add_prefix=self.add_prefix): typename = "" shape = 1 if self._is_column_excluded(table_name, col_name): self.log.debug(f"excluded column: {table_name}/{col_name}") continue if col_name in Schema.columns: self.log.warning(f"Found duplicated column {col_name}, skipping") continue # apply any user-defined transforms first value = self._apply_col_transform(table_name, col_name, value) # now set up automatic transforms to make values that cannot be # written in their default form into a form that is serializable if isinstance(value, enum.Enum): tr = EnumColumnTransform(enum=value.__class__) value = tr(value) self.add_column_transform(table_name, col_name, tr) if isinstance(value, Quantity): if self.add_prefix and container.prefix: key = col_name.replace(container.prefix + "_", "") else: key = col_name unit = container.fields[key].unit or value.unit tr = QuantityColumnTransform(unit=unit) value = tr(value) self.add_column_transform(table_name, col_name, tr) if isinstance(value, np.ndarray): typename = value.dtype.name coltype = PYTABLES_TYPE_MAP[typename] shape = value.shape Schema.columns[col_name] = coltype(shape=shape, pos=pos) elif isinstance(value, Time): # TODO: really should use MET, but need a func for that Schema.columns[col_name] = tables.Float64Col(pos=pos) tr = TimeColumnTransform(scale="tai", format="mjd") self.add_column_transform(table_name, col_name, tr) elif type(value).__name__ in PYTABLES_TYPE_MAP: typename = type(value).__name__ coltype = PYTABLES_TYPE_MAP[typename] Schema.columns[col_name] = coltype(pos=pos) else: self.log.warning( f"Column {col_name} of " f"container {container.__class__.__name__} in " f"table {table_name} not writable, skipping" ) continue # add meta fields of transform transform = self._transforms[table_name].get(col_name) if transform is not None: if hasattr(transform, "get_meta"): meta.update(transform.get_meta(col_name)) pos += 1 # add desription to metadata if self.add_prefix: meta[f"{col_name}_DESC"] = container.fields[ re.sub(f"^{container.prefix}_", "", col_name) ].description else: meta[f"{col_name}_DESC"] = container.fields[col_name].description self.log.debug( f"Table {table_name}: " f"added col: {col_name} type: " f"{typename} shape: {shape} " f"with transform: {transform} " ) self._schemas[table_name] = Schema meta["CTAPIPE_VERSION"] = ctapipe.__version__ return meta def _setup_new_table(self, table_name, containers): """ set up the table. This is called the first time `write()` is called on a new table """ self.log.debug("Initializing table '%s' in group '%s'", table_name, self._group) meta = self._create_hdf5_table_schema(table_name, containers) if table_name.startswith("/"): raise ValueError("Table name must not start with '/'") table_path = PurePath(self._group) / PurePath(table_name) table_group = str(table_path.parent) table_basename = table_path.stem table_path = str(table_path) for container in containers: meta.update(container.meta) # copy metadata from container if table_path not in self.h5file: table = self.h5file.create_table( where=table_group, name=table_basename, title="Storage of {}".format( ",".join(c.__class__.__name__ for c in containers) ), description=self._schemas[table_name], createparents=True, filters=self.filters, ) self.log.debug(f"CREATED TABLE: {table}") for key, val in meta.items(): table.attrs[key] = val else: table = self.h5file.get_node(table_path) self._tables[table_name] = table def _append_row(self, table_name, containers): """ append a row to an already initialized table. This is called automatically by `write()` """ table = self._tables[table_name] row = table.row for container in containers: selected_fields = filter( lambda kv: kv[0] in table.colnames, container.items(add_prefix=self.add_prefix), ) for colname, value in selected_fields: try: value = self._apply_col_transform(table_name, colname, value) row[colname] = value except Exception: self.log.error( f"Error writing col {colname} of " f"container {container.__class__.__name__}" ) raise row.append()
[docs] def write(self, table_name, containers): """ Write the contents of the given container or containers to a table. The first call to write will create a schema and initialize the table within the file. The shape of data within the container must not change between calls, since variable-length arrays are not supported. Parameters ---------- table_name: str name of table to write to containers: `ctapipe.core.Container` or `Iterable[ctapipe.core.Container]` container to write """ if isinstance(containers, Container): containers = (containers,) if table_name not in self._schemas: self._setup_new_table(table_name, containers) self._append_row(table_name, containers)
[docs]class HDF5TableReader(TableReader): """ Reader that reads a single row of an HDF5 table at once into a Container. Simply construct a `HDF5TableReader` with an input HDF5 file, and call the `read(path, container) <read>`_ method to get a generator that fills the given container with a new row of the table on each access. Columns in the table are automatically mapped to container fields by name, and if a field is missing in either, it is skipped during read, but a warning is emitted. Columns that were written by HDF5TableWriter and which had unit transforms applied, will have the units re-applied when reading (the unit used is stored in the header attributes). Note that this is only useful if you want to read all information *one event at a time* into a container, which is not very I/O efficient. For some other use cases, it may be much more efficient to access the table data directly, for example to read an entire column or table at once (which means not using the Container data structure). Todo: - add ability to synchronize reading of multiple tables on a key """ def __init__(self, filename, **kwargs): """ Parameters ---------- filename: str, pathlib.PurePath or tables.File instance name of hdf5 file or file handle kwargs: any other arguments that will be passed through to `pytables.file.open_file`. """ super().__init__() self._tables = {} self._cols_to_read = {} self._missing_cols = {} kwargs.update(mode="r") if isinstance(filename, str) or isinstance(filename, PurePath): self.open(filename, **kwargs) elif isinstance(filename, tables.File): self._h5file = filename else: raise NotImplementedError( "filename needs to be either a string, pathlib.PurePath " "or tables.File" )
[docs] def open(self, filename, **kwargs): self._h5file = tables.open_file(filename, **kwargs)
[docs] def close(self): self._h5file.close()
def _setup_table(self, table_name, containers, prefixes, ignore_columns): tab = self._h5file.get_node(table_name) self._tables[table_name] = tab self._map_table_to_containers(table_name, containers, prefixes, ignore_columns) self._map_transforms_from_table_header(table_name) return tab def _map_transforms_from_table_header(self, table_name): """ create any transforms needed to "undo" ones in the writer """ tab = self._tables[table_name] attrs = tab.attrs._f_list() for attr in attrs: if attr.endswith("_UNIT"): colname = attr[:-5] tr = QuantityColumnTransform(unit=Unit(tab.attrs[attr])) self.add_column_transform(table_name, colname, tr) elif attr.endswith("_ENUM"): colname = attr[:-5] tr = EnumColumnTransform(tab.attrs[attr]) self.add_column_transform(table_name, colname, tr) elif attr.endswith("_TIME_SCALE"): colname, _, _ = attr.rpartition("_TIME_SCALE") scale = tab.attrs[attr] time_format = get_hdf5_attr(tab.attrs, colname + "_TIME_FORMAT", "mjd") transform = TimeColumnTransform(scale=scale, format=time_format) self.add_column_transform(table_name, colname, transform) elif attr.endswith("_TRANSFORM_SCALE"): colname, _, _ = attr.rpartition("_TRANSFORM_SCALE") tr = FixedPointColumnTransform( scale=tab.attrs[attr], offset=get_hdf5_attr(tab.attrs, colname + "_TRANSFORM_OFFSET", 0), source_dtype=get_hdf5_attr( tab.attrs, colname + "_TRANSFORM_DTYPE", "float32" ), target_dtype=tab.dtype[colname].base, ) self.add_column_transform(table_name, colname, tr) def _map_table_to_containers( self, table_name, containers, prefixes, ignore_columns ): """ identifies which columns in the table to read into the containers, by comparing their names including an optional prefix.""" tab = self._tables[table_name] self._cols_to_read[table_name] = [] self._missing_cols[table_name] = [] for container, prefix in zip(containers, prefixes): self._missing_cols[table_name].append([]) for colname in tab.colnames: if prefix and colname.startswith(prefix): colname_without_prefix = colname[len(prefix) + 1 :] else: colname_without_prefix = colname if colname_without_prefix in container.fields: self._cols_to_read[table_name].append(colname) else: self.log.debug( f"Table {table_name} has column {colname_without_prefix} that is not in " f"container {container.__class__.__name__}. It will be skipped." ) # also check that the container doesn't have fields that are not # in the table: for colname in container.fields: if colname in ignore_columns: continue if prefix: colname_with_prefix = f"{prefix}_{colname}" else: colname_with_prefix = colname if colname_with_prefix not in self._cols_to_read[table_name]: self._missing_cols[table_name][-1].append(colname) self.log.warning( f"Table {table_name} is missing column {colname_with_prefix} " f"that is in container {container.__class__.__name__}. " "It will be skipped." ) # copy all user-defined attributes back to Container.meta for key in tab.attrs._f_list(): container.meta[key] = tab.attrs[key] # check if the table has additional columns not present in any container for colname in tab.colnames: if colname not in self._cols_to_read[table_name]: self.log.debug( f"Table {table_name} contains column {colname} " "that does not map to any of the specified containers" )
[docs] def read(self, table_name, containers, prefixes=False, ignore_columns=None): """ Returns a generator that reads the next row from the table into the given container. The generator returns the same container. Note that no containers are copied, the data are overwritten inside. Parameters ---------- table_name: str name of table to read from container : ctapipe.core.Container Container instance to fill prefix: bool, str or list Prefix that was added while writing the file. If True, the container prefix is taken into consideration, when comparing column names and container fields. If False, no prefix is used. If a string is provided, it is used as prefix for all containers. If a list is provided, the length needs to match th number of containers. """ ignore_columns = set(ignore_columns) if ignore_columns is not None else set() return_iterable = True if isinstance(containers, Container): containers = (containers,) return_iterable = False if prefixes is False: prefixes = ["" for _ in containers] elif prefixes is True: prefixes = [container.prefix for container in containers] elif isinstance(prefixes, str): prefixes = [prefixes for _ in containers] assert len(prefixes) == len(containers) if table_name not in self._tables: tab = self._setup_table(table_name, containers, prefixes, ignore_columns) else: tab = self._tables[table_name] row_count = 0 while 1: try: row = tab[row_count] except IndexError: return # stop generator when done missing = self._missing_cols[table_name] for container, prefix, missing_cols in zip(containers, prefixes, missing): for fieldname in container.keys(): if prefix: colname = f"{prefix}_{fieldname}" else: colname = fieldname if colname not in self._cols_to_read[table_name]: continue container[fieldname] = self._apply_col_transform( table_name, colname, row[colname] ) # set missing fields to None for fieldname in missing_cols: container[fieldname] = None if return_iterable: yield containers else: yield containers[0] row_count += 1