Source code for ctapipe.io.hdf5tableio

"""Implementations of TableWriter and -Reader for HDF5 files"""
import enum
from pathlib import PurePath

import numpy as np
import tables
from astropy.time import Time
from astropy.units import Quantity

import ctapipe

from ..core import Container, Map
from .tableio import (
    EnumColumnTransform,
    FixedPointColumnTransform,
    QuantityColumnTransform,
    StringTransform,
    TableReader,
    TableWriter,
    TimeColumnTransform,
)

__all__ = ["HDF5TableWriter", "HDF5TableReader", "split_h5path"]

PYTABLES_TYPE_MAP = {
    "float": tables.Float64Col,
    "float64": tables.Float64Col,
    "float32": tables.Float32Col,
    "float16": tables.Float16Col,
    "int8": tables.Int8Col,
    "int16": tables.Int16Col,
    "int32": tables.Int32Col,
    "int64": tables.Int64Col,
    "int": tables.Int64Col,
    "uint8": tables.UInt8Col,
    "uint16": tables.UInt16Col,
    "uint32": tables.UInt32Col,
    "uint64": tables.UInt64Col,
    "bool": tables.BoolCol,  # python bool
    "bool_": tables.BoolCol,  # np.bool_
}


DEFAULT_FILTERS = tables.Filters(
    complevel=5,  # compression medium, tradeoff between speed and compression
    complib="blosc:zstd",  # use modern zstd algorithm
    fletcher32=True,  # add checksums to data chunks
)


[docs] def split_h5path(path): """ Split a path inside an hdf5 file into parent / child """ if not path.startswith("/"): raise ValueError("Path must start with /") head, _, tail = path.rstrip("/").rpartition("/") if head == "": head = "/" return head, tail
def get_hdf5_attr(attrs, name, default=None): if name in attrs: return attrs[name] return default def get_column_attrs(table): """ Read custom ctapipe column metadata from hdf5 table Parameters ---------- table : `tables.Table` The table for which to parse the column metadata attributes """ attrs = table.attrs column_attrs = {} for name, desc in table.coldescrs.items(): pos = desc._v_pos prefix = f"CTAFIELD_{pos}_" current = { "POS": pos, "DTYPE": desc.dtype, } for full_key in filter(lambda k: k.startswith(prefix), attrs._v_attrnamesuser): value = attrs[full_key] key = full_key[len(prefix) :] # convert numpy scalars to plain python objects if isinstance(value, np.str_ | np.bool_ | np.number): value = value.item() current[key] = value current["PREFIX"] = name.rpartition(current.get("NAME", name))[0].rstrip("_") column_attrs[name] = current return column_attrs def get_node_meta(node): """Return the metadata attached to a table object This does not include ctapipe-attached column descriptions and transform metadata, only any additional attributes defined. Parameters ---------- node : `tables.Node` The node for which to parse the metadata attributes """ def _ignore_column_descriptions(attr_name): return not attr_name.startswith("CTAFIELD_") meta = {} attrs = node._v_attrs for key in filter(_ignore_column_descriptions, attrs._v_attrnamesuser): value = attrs[key] # convert numpy scalars to plain python objects if isinstance(value, np.str_ | np.bool_ | np.number): value = value.item() meta[key] = value return meta def get_column_transforms(column_attrs): """ create any transforms needed to "undo" ones in the writer Parameters ---------- column_attrs : dict Column attrs, as returned by `get_column_attrs` """ transforms = {} for colname, col_attrs in column_attrs.items(): if unit := col_attrs.get("UNIT"): transform = QuantityColumnTransform(unit=unit) transforms[colname] = transform elif enum := col_attrs.get("ENUM"): transform = EnumColumnTransform(enum) transforms[colname] = transform elif scale := col_attrs.get("TIME_SCALE"): scale = scale.lower() time_format = col_attrs.get("TIME_FORMAT", "mjd") transform = TimeColumnTransform(scale=scale, format=time_format) transforms[colname] = transform elif scale := col_attrs.get("TRANSFORM_SCALE"): transform = FixedPointColumnTransform( scale=scale, offset=col_attrs.get("TRANSFORM_OFFSET", 0), source_dtype=col_attrs.get("TRANSFORM_DTYPE", "float32"), target_dtype=col_attrs["DTYPE"].base, ) transforms[colname] = transform elif col_attrs.get("TRANSFORM") == "string": maxlen = col_attrs["MAXLEN"] transform = StringTransform(maxlen) transforms[colname] = transform return transforms
[docs] class HDF5TableWriter(TableWriter): """ A very basic table writer that can take a container (or more than one) and write it to an HDF5 file. It does _not_ recursively write the container. This is intended as a building block to create a more complex I/O system. It works by creating a HDF5 Table description from the `~ctapipe.core.Field` definitions inside a container, where each item becomes a column in the table. The first time `HDF5TableWriter.write()` is called, the container(s) are registered and the table created in the output file. Each item in the container can also have an optional transform function that is called before writing to transform the value. For example, unit quantities always have their units removed, or converted to a common unit if specified in the `~ctapipe.core.Field`. Any metadata in the `~ctapipe.core.Container` (stored in ``Container.meta``) will be written to the table's header on the first call to write() Multiple tables may be written at once in a single file, as long as you change the table_name attribute to `write()` to specify which one to write to. Likewise multiple Containers can be merged into a single output table by passing a list of containers to `write()`. To append to existing files, pass the ``mode='a'`` option to the constructor. Parameters ---------- filename: str name of hdf5 output file group_name: str name of group into which to put all of the tables generated by this Writer (it will be placed under "/" in the file) add_prefix: bool if True, add the container prefix before each column name mode : str ('w', 'a') 'w' if you want to overwrite the file 'a' if you want to append data to the file root_uep : str root location of the ``group_name`` filters: pytables.Filters A set of filters (compression settings) to be used for all datasets created by this writer. kwargs: any other arguments that will be passed through to ``pytables.open_file``. """ def __init__( self, filename, group_name="", add_prefix=False, mode="w", root_uep="/", filters=DEFAULT_FILTERS, parent=None, config=None, **kwargs, ): super().__init__(add_prefix=add_prefix, parent=parent, config=config) self._schemas = {} self._tables = {} if mode not in ["a", "w", "r+"]: raise OSError(f"The mode '{mode}' is not supported for writing") kwargs.update(mode=mode, root_uep=root_uep, filters=filters) self.open(str(filename), **kwargs) self._group = "/" + group_name self.filters = filters self.log.debug("h5file: %s", self.h5file)
[docs] def open(self, filename, **kwargs): self.log.debug("kwargs for tables.open_file: %s", kwargs) self.h5file = tables.open_file(filename, **kwargs)
[docs] def close(self): self.h5file.close()
def _add_column_to_schema(self, table_name, schema, meta, field, name, value): typename = "" shape = 1 pos = len(schema.columns) if isinstance(value, Container): self.log.debug("Ignoring sub-container: %s/%s", table_name, name) return if isinstance(value, Map): self.log.debug("Ignoring map-field: %s/%s", table_name, name) return if self._is_column_excluded(table_name, name): self.log.debug("excluded column: %s/%s", table_name, name) return if name in schema.columns: self.log.warning("Found duplicated column %s, skipping", name) return # apply any user-defined transforms first value = self._apply_col_transform(table_name, name, value) # now set up automatic transforms to make values that cannot be # written in their default form into a form that is serializable if isinstance(value, enum.Enum): tr = EnumColumnTransform(enum=value.__class__) value = tr(value) self.add_column_transform(table_name, name, tr) if isinstance(value, Quantity): unit = field.unit or value.unit tr = QuantityColumnTransform(unit=unit) value = tr(value) self.add_column_transform(table_name, name, tr) if isinstance(value, np.ndarray): typename = value.dtype.name coltype = PYTABLES_TYPE_MAP[typename] shape = value.shape schema.columns[name] = coltype(shape=shape, pos=pos) elif isinstance(value, Time): # TODO: really should use MET, but need a func for that schema.columns[name] = tables.Float64Col(pos=pos) tr = TimeColumnTransform(scale="tai", format="mjd") self.add_column_transform(table_name, name, tr) elif type(value).__name__ in PYTABLES_TYPE_MAP: typename = type(value).__name__ coltype = PYTABLES_TYPE_MAP[typename] schema.columns[name] = coltype(pos=pos) elif isinstance(value, str): max_length = field.max_length or len(value.encode("utf-8")) tr = StringTransform(max_length) self.add_column_transform(table_name, name, tr) schema.columns[name] = tables.StringCol(itemsize=max_length, pos=pos) else: raise ValueError(f"Column {name} of type {type(value)} not writable") # add meta fields of transform transform = self._transforms[table_name].get(name) if transform is not None: if hasattr(transform, "get_meta"): meta.update(transform.get_meta(pos)) # add original field name name, without prefix meta[f"CTAFIELD_{pos}_NAME"] = field.name # add description to metadata meta[f"CTAFIELD_{pos}_DESC"] = field.description self.log.debug( f"Table {table_name}: " f"added col: {name} type: " f"{typename} shape: {shape} " f"with transform: {transform} " ) return True def _create_hdf5_table_schema(self, table_name, containers): """ Creates a pytables description class for the given containers and registers it in the Writer Parameters ---------- table_name: str name of table container: ctapipe.core.Container instance of an initialized container Returns ------- dictionary of extra metadata to add to the table's header """ class Schema(tables.IsDescription): pass meta = {} # any extra meta-data generated here (like units, etc) # set up any column transforms that were requested as regexps (i.e. # convert them to explicit transform in the _transforms dict if they # match) self._realize_regexp_transforms(table_name, containers) # create pytables schema description for the given container for container in containers: container.validate() # ensure the data are complete it = zip( container.items(add_prefix=self.add_prefix), container.fields.values() ) for (col_name, value), field in it: try: self._add_column_to_schema( table_name=table_name, schema=Schema, meta=meta, field=field, name=col_name, value=value, ) except ValueError: self.log.warning( f"Column {col_name}" f" with value {value!r} of type {type(value)} " f" of container {container.__class__.__name__} in" f" table {table_name} not writable, skipping" ) self._schemas[table_name] = Schema meta["CTAPIPE_VERSION"] = ctapipe.__version__ return meta def _setup_new_table(self, table_name, containers): """set up the table. This is called the first time `write()` is called on a new table""" self.log.debug("Initializing table '%s' in group '%s'", table_name, self._group) meta = self._create_hdf5_table_schema(table_name, containers) if table_name.startswith("/"): raise ValueError("Table name must not start with '/'") table_path = f"{self._group.rstrip('/')}/{table_name}" table_group, table_basename = split_h5path(table_path) for container in containers: meta.update(container.meta) # copy metadata from container if table_path not in self.h5file: table = self.h5file.create_table( where=table_group, name=table_basename, title="Storage of {}".format( ",".join(c.__class__.__name__ for c in containers) ), description=self._schemas[table_name], createparents=True, filters=self.filters, ) self.log.debug(f"CREATED TABLE: {table}") for key, val in meta.items(): table.attrs[key] = val else: table = self.h5file.get_node(table_path) self._tables[table_name] = table def _append_row(self, table_name, containers): """ append a row to an already initialized table. This is called automatically by `write()` """ table = self._tables[table_name] row = table.row for container in containers: selected_fields = filter( lambda kv: kv[0] in table.colnames, container.items(add_prefix=self.add_prefix), ) for colname, value in selected_fields: try: value = self._apply_col_transform(table_name, colname, value) row[colname] = value except Exception: self.log.error( f"Error writing col {colname} of " f"container {container.__class__.__name__}" ) raise row.append()
[docs] def write(self, table_name, containers): """ Write the contents of the given container or containers to a table. The first call to write will create a schema and initialize the table within the file. The shape of data within the container must not change between calls, since variable-length arrays are not supported. Parameters ---------- table_name: str name of table to write to containers: `ctapipe.core.Container` or `Iterable[ctapipe.core.Container]` container to write """ if isinstance(containers, Container): containers = (containers,) if table_name not in self._schemas: self._setup_new_table(table_name, containers) self._append_row(table_name, containers)
[docs] class HDF5TableReader(TableReader): """ Reader that reads a single row of an HDF5 table at once into a Container. Simply construct a `HDF5TableReader` with an input HDF5 file, and call the `read(path, container) <read>`_ method to get a generator that fills the given container with a new row of the table on each access. Columns in the table are automatically mapped to container fields by name, and if a field is missing in either, it is skipped during read, but a warning is emitted. Columns that were written by HDF5TableWriter and which had unit transforms applied, will have the units re-applied when reading (the unit used is stored in the header attributes). Note that this is only useful if you want to read all information *one event at a time* into a container, which is not very I/O efficient. For some other use cases, it may be much more efficient to access the table data directly, for example to read an entire column or table at once (which means not using the Container data structure). Todo: - add ability to synchronize reading of multiple tables on a key """ def __init__(self, filename, **kwargs): """ Parameters ---------- filename: str, pathlib.PurePath or tables.File instance name of hdf5 file or file handle kwargs: any other arguments that will be passed through to `pytables.file.open_file`. """ super().__init__() self._tables = {} self._col_mapping = {} self._prefixes = {} self._missing_fields = {} self._meta = {} kwargs.update(mode="r") if isinstance(filename, str) or isinstance(filename, PurePath): self.open(filename, **kwargs) elif isinstance(filename, tables.File): self._h5file = filename else: raise NotImplementedError( "filename needs to be either a string, pathlib.PurePath " "or tables.File" )
[docs] def open(self, filename, **kwargs): self._h5file = tables.open_file(filename, **kwargs)
[docs] def close(self): self._h5file.close()
def _setup_table(self, table_name, containers, prefixes, ignore_columns): table = self._h5file.get_node(table_name) self._tables[table_name] = table column_attrs = get_column_attrs(table) self._map_table_to_containers( table_name=table_name, column_attrs=column_attrs, containers=containers, prefixes=prefixes, ignore_columns=ignore_columns, ) # setup transforms transforms = get_column_transforms(column_attrs) for col_name, transform in transforms.items(): self.add_column_transform(table_name, col_name, transform) # store the meta self._meta[table_name] = get_node_meta(table) return table def _map_table_to_containers( self, table_name, column_attrs, containers, prefixes, ignore_columns, ): """identifies which columns in the table to read into the containers, by comparing their names including an optional prefix.""" self._missing_fields[table_name] = [] self._col_mapping[table_name] = [] self._prefixes[table_name] = [] inverse_mapping = {} for colname, col_attrs in column_attrs.items(): name = col_attrs.get("NAME") if name is not None: inverse_mapping[name] = colname cols_to_read = set() for container, prefix in zip(containers, prefixes): col_mapping = {} missing = [] for field_name in container.fields: if prefix is None: col_name = inverse_mapping.get(field_name) else: col_name = field_name if prefix == "" else f"{prefix}_{field_name}" if col_name in ignore_columns: continue elif col_name is None or col_name not in column_attrs: missing.append(field_name) self.log.warning( f"Table {table_name} is missing column {col_name} for field {field_name}" f" of container {container}. It will be skipped." ) else: if col_name in cols_to_read: raise OSError( "Mapping of column names to container fields is not unique" ", prefixes are required." f" Duplicated column: {col_name} / {field_name}" ) if prefix is None: prefix = column_attrs[col_name].get("PREFIX") col_mapping[field_name] = col_name cols_to_read.add(col_name) self._prefixes[table_name].append(prefix) self._missing_fields[table_name].append(missing) self._col_mapping[table_name].append(col_mapping) # check if the table has additional columns not present in any container for colname in column_attrs: if colname not in cols_to_read: self.log.debug( f"Table {table_name} contains column {colname} " "that does not map to any of the specified containers" )
[docs] def read(self, table_name, containers, prefixes=None, ignore_columns=None): """ Returns a generator that reads the next row from the table into the given container. The generator returns the same container. Note that no containers are copied, the data are overwritten inside. Parameters ---------- table_name: str name of table to read from containers : Iterable[ctapipe.core.Container] Container classes to fill prefix: bool, str or list Prefix that was added while writing the file. If None, the prefix in the file are used. This only works when the mapping from column name without prefix to container is unique. If True, the ``default_prefix`` attribute of the containers is used If False, no prefix is used. If a string is provided, it is used as prefix for all containers. If a list is provided, the length needs to match th number of containers. """ ignore_columns = set(ignore_columns) if ignore_columns is not None else set() return_iterable = True if isinstance(containers, Container): raise TypeError("Expected container *classes*, not *instances*") # check for a single container if isinstance(containers, type): containers = (containers,) return_iterable = False for container in containers: if isinstance(container, Container): raise TypeError("Expected container *classes*, not *instances*") if prefixes is False: prefixes = ["" for _ in containers] elif prefixes is True: prefixes = [container.default_prefix for container in containers] elif isinstance(prefixes, str): prefixes = [prefixes for _ in containers] elif prefixes is None: prefixes = [None] * len(containers) if len(prefixes) != len(containers): raise ValueError("Length of provided prefixes does not match containers") if table_name not in self._tables: tab = self._setup_table( table_name, containers=containers, prefixes=prefixes, ignore_columns=ignore_columns, ) else: tab = self._tables[table_name] prefixes = self._prefixes[table_name] missing = self._missing_fields[table_name] mappings = self._col_mapping[table_name] for row_index in range(len(tab)): # looping over table yields Row instances. # __getitem__ just gives plain numpy data row = tab[row_index] ret = [] for cls, prefix, mapping, missing_fields in zip( containers, prefixes, mappings, missing ): data = { field_name: self._apply_col_transform( table_name, col_name, row[col_name] ) for field_name, col_name in mapping.items() } # set missing fields to None for field_name in missing_fields: data[field_name] = None container = cls(**data, prefix=prefix) container.meta = self._meta[table_name] ret.append(container) if return_iterable: yield ret else: yield ret[0]