"""
Provenance-related functionality
TODO: have this register whenever ctapipe is loaded
"""
import json
import logging
import os
import platform
import sys
import uuid
from contextlib import contextmanager
from importlib import import_module
from os.path import abspath
import pkg_resources
import psutil
from astropy.time import Time
from pkg_resources import get_distribution
import ctapipe
from collections import UserList
from pathlib import Path
from .support import Singleton
log = logging.getLogger(__name__)

__all__ = ["Provenance"]

# Names of the environment variables whose values are captured (via
# _get_env_vars) in the "environment" section of the system provenance.
_interesting_env_vars = [
    # conda environment description
    "CONDA_DEFAULT_ENV",
    "CONDA_PREFIX",
    "CONDA_PYTHON_EXE",
    "CONDA_EXE",
    "CONDA_PROMPT_MODIFIER",
    "CONDA_SHLVL",
    # executable and shared-library search paths
    "PATH",
    "LD_LIBRARY_PATH",
    "DYLD_LIBRARY_PATH",
    # user session
    "USER",
    "HOME",
    "SHELL",
]
def get_module_version(name):
    """
    Return the version string of an importable module.

    Parameters
    ----------
    name: str
        importable module name

    Returns
    -------
    str
        the module's ``__version__`` attribute if it has one; otherwise the
        version of the installed distribution with the same name; the
        string ``"unknown"`` if that lookup fails too; and
        ``"not installed"`` if the module cannot be imported at all.
    """
    try:
        module = import_module(name)
        return module.__version__
    except AttributeError:
        # module has no __version__: fall back to the distribution metadata
        try:
            return get_distribution(name).version
        except Exception:
            # best effort only (e.g. no distribution registered under this
            # name); never let a version lookup swallow KeyboardInterrupt
            return "unknown"
    except ImportError:
        return "not installed"
class Provenance(metaclass=Singleton):
    """
    Manage the provenance info for a stack of *activities*.

    Use `start_activity` to start an activity. Any calls to `add_input_file`
    or `add_output_file` will register files within that activity. Finish
    the current activity with `finish_activity`, or wrap a block of code in
    the `activity` context manager to do both.

    Nested activities are allowed, and handled as a stack. The final output
    is not hierarchical, but a flat list of activities (however hierarchical
    activities could easily be implemented if necessary).
    """

    def __init__(self):
        self._activities = []  # stack of active activities; top is last
        self._finished_activities = []  # in order of completion

    def start_activity(self, activity_name=sys.executable):
        """
        Push a new activity onto the stack and start recording it.

        Parameters
        ----------
        activity_name: str
            name of the activity (defaults to ``sys.executable``)
        """
        activity = _ActivityProvenance(activity_name)
        activity.start()
        self._activities.append(activity)
        log.debug("started activity: %s", activity_name)

    def add_input_file(self, filename, role=None):
        """
        Register an input to the current activity.

        Parameters
        ----------
        filename: str
            name or url of file
        role: str
            role this input file satisfies (optional)
        """
        # NOTE(review): abspath() is applied unconditionally, so a URL would
        # be turned into a local path — confirm callers only pass file names.
        activity = self.current_activity
        activity.register_input(abspath(filename), role=role)
        log.debug(
            "added input entity '%s' to activity: '%s'", filename, activity.name
        )

    def add_output_file(self, filename, role=None):
        """
        Register an output to the current activity.

        Parameters
        ----------
        filename: str
            name or url of file
        role: str
            role this output file satisfies (optional)
        """
        # NOTE(review): abspath() is applied unconditionally, so a URL would
        # be turned into a local path — confirm callers only pass file names.
        activity = self.current_activity
        activity.register_output(abspath(filename), role=role)
        log.debug(
            "added output entity '%s' to activity: '%s'", filename, activity.name
        )

    def add_config(self, config):
        """
        Add configuration parameters to the current activity.

        Parameters
        ----------
        config: dict
            configuration parameters (replaces any previously added config)
        """
        self.current_activity.register_config(config)

    def finish_activity(self, status="completed", activity_name=None):
        """
        End the current (top-of-stack) activity.

        Parameters
        ----------
        status: str
            final status recorded for the activity
        activity_name: str or None
            if given, must equal the name of the current activity

        Raises
        ------
        IndexError
            if no activity is active
        ValueError
            if ``activity_name`` does not match the current activity; the
            current activity is left on the stack in that case
        """
        if not self._activities:
            raise IndexError("No active activity to finish")
        # validate before popping so a mismatch does not lose the activity
        activity = self._activities[-1]
        if activity_name is not None and activity_name != activity.name:
            raise ValueError(
                "Tried to end activity '{}', but '{}' is current "
                "activity".format(activity_name, activity.name)
            )
        self._activities.pop()
        activity.finish(status)
        self._finished_activities.append(activity)
        log.debug("finished activity: %s", activity.name)

    @contextmanager
    def activity(self, name):
        """
        Context manager that runs the enclosed code as activity ``name``.

        The activity is finished with status ``"completed"`` on normal exit,
        or with status ``"error"`` if the body raises; the exception is then
        re-raised.
        """
        self.start_activity(name)
        try:
            yield
        except BaseException:
            # do not leave a dangling activity on the stack
            self.finish_activity(status="error", activity_name=name)
            raise
        self.finish_activity(activity_name=name)

    @property
    def current_activity(self):
        """The activity on top of the stack; starts a default one if none is active."""
        if not self._activities:
            log.debug("No activity has been started... starting a default one")
            self.start_activity()
        return self._activities[-1]  # current activity is at the top of stack

    @property
    def finished_activities(self):
        """List of finished activities, in order of completion."""
        return self._finished_activities

    @property
    def provenance(self):
        """Return the provenance dicts for the full list of finished activities."""
        return [x.provenance for x in self._finished_activities]

    def as_json(self, **kwargs):
        """
        Return all finished provenance as a JSON string.

        Keyword arguments are passed to `json.dumps`, e.g. ``indent=4``.
        """

        def set_default(obj):
            """Convert objects that JSON cannot encode natively."""
            if isinstance(obj, (set, frozenset, UserList)):
                return list(obj)
            if isinstance(obj, Path):
                return str(obj)
            # any other unsupported type falls through and is encoded as null
            return None

        return json.dumps(self.provenance, default=set_default, **kwargs)

    @property
    def active_activity_names(self):
        """Names of the activities currently on the stack (bottom first)."""
        return [x.name for x in self._activities]

    @property
    def finished_activity_names(self):
        """Names of the finished activities, in order of completion."""
        return [x.name for x in self._finished_activities]

    def clear(self):
        """Remove all tracked activities, active and finished."""
        self._activities = []
        self._finished_activities = []
class _ActivityProvenance:
    """
    Low-level helper class to collect provenance information for a given
    *activity*. Users should use `Provenance` as a top-level API,
    not this class directly.
    """

    def __init__(self, activity_name=sys.executable):
        self._prov = {
            "activity_name": activity_name,
            "activity_uuid": str(uuid.uuid4()),
            "start": {},
            "stop": {},
            "system": {},
            "input": [],
            "output": [],
        }
        self.name = activity_name

    def start(self):
        """
        Begin recording provenance for this activity.

        Sets up the system and startup provenance data. Generally should be
        called at the start of a program.
        """
        self._prov["start"].update(_sample_cpu_and_memory())
        self._prov["system"].update(_get_system_provenance())

    def register_input(self, url, role=None):
        """
        Add a file to the list of inputs.

        The value is stored exactly as given (no URL scheme is added); a
        value without a URL scheme is meant to be read as a local file.

        Parameters
        ----------
        url: str
            filename or url of input file
        role: str
            role name that this input satisfies
        """
        self._prov["input"].append(dict(url=url, role=role))

    def register_output(self, url, role=None):
        """
        Add a file to the list of outputs.

        The value is stored exactly as given (no URL scheme is added); a
        value without a URL scheme is meant to be read as a local file.

        Parameters
        ----------
        url: str
            filename or url of output file
        role: str
            role name that this output satisfies
        """
        self._prov["output"].append(dict(url=url, role=role))

    def register_config(self, config):
        """Store a dictionary of configuration parameters, replacing any previous one."""
        self._prov["config"] = config

    def finish(self, status="completed"):
        """
        Record final provenance information, normally called at shutdown.

        Parameters
        ----------
        status: str
            final status of the activity, stored under ``"status"``
        """
        self._prov["stop"].update(_sample_cpu_and_memory())
        # record the wall-clock duration of this activity, in minutes
        t_start = Time(self._prov["start"]["time_utc"], format="isot")
        t_stop = Time(self._prov["stop"]["time_utc"], format="isot")
        self._prov["status"] = status
        self._prov["duration_min"] = (t_stop - t_start).to("min").value

    @property
    def output(self):
        """List of registered output records (``url``/``role`` dicts)."""
        return self._prov.get("output", None)

    @property
    def input(self):
        """List of registered input records (``url``/``role`` dicts)."""
        return self._prov.get("input", None)

    def sample_cpu_and_memory(self):
        """
        Record a snapshot of current CPU and memory information.
        """
        self._prov.setdefault("samples", []).append(_sample_cpu_and_memory())

    @property
    def provenance(self):
        """The raw provenance dict collected for this activity."""
        return self._prov
def _get_python_packages():
    """
    Describe every distribution in the active ``pkg_resources`` working set.

    Returns a list of dicts with keys ``name``, ``version`` and ``path``,
    ordered by project name.
    """

    def _describe(dist):
        # one record per installed distribution
        return dict(
            name=dist.project_name,
            version=dist.version,
            path=dist.module_path,
        )

    ordered = sorted(pkg_resources.working_set, key=lambda dist: dist.project_name)
    return [_describe(dist) for dist in ordered]
def _get_system_provenance():
    """
    Collect provenance for everything that stays fixed during the runtime.

    Returns
    -------
    dict
        nested dict (not a JSON string) with the versions of ctapipe and
        related packages, the Python executable, platform details, Python
        interpreter info and installed packages, selected environment
        variables, the command-line arguments and the UTC time of the call
    """
    bits, linkage = platform.architecture()
    return dict(
        ctapipe_version=ctapipe.__version__,
        ctapipe_resources_version=get_module_version("ctapipe_resources"),
        eventio_version=get_module_version("eventio"),
        ctapipe_svc_path=os.getenv("CTAPIPE_SVC_PATH"),
        executable=sys.executable,
        platform=dict(
            architecture_bits=bits,
            architecture_linkage=linkage,
            machine=platform.machine(),
            processor=platform.processor(),
            node=platform.node(),
            version=platform.version(),
            system=platform.system(),
            release=platform.release(),
            libcver=platform.libc_ver(),
            num_cpus=psutil.cpu_count(),
            boot_time=Time(psutil.boot_time(), format="unix").isot,
        ),
        python=dict(
            version_string=sys.version,
            version=platform.python_version_tuple(),
            compiler=platform.python_compiler(),
            implementation=platform.python_implementation(),
            packages=_get_python_packages(),
        ),
        environment=_get_env_vars(),
        arguments=sys.argv,
        start_time_utc=Time.now().isot,
    )
def _get_env_vars(names=None):
    """
    Snapshot selected environment variables.

    Parameters
    ----------
    names: iterable of str, optional
        variable names to record; defaults to the module-level
        ``_interesting_env_vars`` list

    Returns
    -------
    dict
        mapping of each name to its current value, or ``None`` if unset
    """
    if names is None:
        names = _interesting_env_vars
    return {name: os.environ.get(name) for name in names}
def _sample_cpu_and_memory():
    """
    Return a snapshot of the current moment for start/stop/sample records.

    Despite the name, only the wall-clock time is recorded at present, as
    ``time_utc`` (an ISOT-formatted string in UTC); no CPU or memory
    figures are sampled.
    """
    return dict(time_utc=Time.now().utc.isot)