Source code for leads.sft

from typing import Callable as _Callable

from leads.dt import Device
from leads.event import SuspensionEvent, SuspensionExitEvent
from leads.logger import L
from leads.registry import require_context


def mark_device(device: Device, system: str, *related: str, append: bool = True) -> None:
    """Record on ``device`` which systems it belongs to.

    The system names are stored as a list in the device's ``__sft_marker__``
    attribute. Duplicate names are dropped and the first-seen order is kept,
    so the stored list is the same on every run.

    :param device: the device to mark
    :param system: the name of a system the device belongs to
    :param related: further system names to record alongside ``system``
    :param append: when ``True`` and the device already carries a marker, the
        new names are added after the existing ones; otherwise the marker is
        replaced by the new names
    """
    systems = [system, *related]
    if append and hasattr(device, "__sft_marker__"):
        systems = getattr(device, "__sft_marker__") + systems
    # dict.fromkeys de-duplicates while preserving insertion order; going
    # through a set would leave the order dependent on string hashing.
    setattr(device, "__sft_marker__", list(dict.fromkeys(systems)))
def read_device_marker(device: Device) -> list[str] | None:
    """Return the value of the device's ``__sft_marker__`` attribute.

    :param device: the device to inspect
    :return: the stored marker, or ``None`` when the device has no
        ``__sft_marker__`` attribute
    """
    return getattr(device, "__sft_marker__", None)
class SystemFailureTracer:
    """Keep failure counters per device tag and per system.

    ``fail`` and ``recover`` update the counters, invoke the ``on_*``
    callbacks and pass suspension events to the context returned by
    ``require_context()``.
    """

    def __init__(self) -> None:
        super().__init__()
        # All callbacks start out as no-ops; callers may reassign them.
        self.on_fail: _Callable[[SuspensionEvent], None] = lambda _: None
        self.on_recover: _Callable[[SuspensionExitEvent], None] = lambda _: None
        self.on_device_fail: _Callable[[Device, str | Exception], None] = lambda _, __: None
        self.on_device_recover: _Callable[[Device], None] = lambda _: None
        # Outstanding failure counts, keyed by system name and by device tag.
        self._system_failures: dict[str, int] = {}
        self._device_failures: dict[str, int] = {}

    def system_ok(self, system: str) -> bool:
        """Return ``True`` when no failure is currently counted for ``system``."""
        return self._system_failures.get(system, 0) < 1

    def device_ok(self, tag: str) -> bool:
        """Return ``True`` when no failure is currently counted for device ``tag``."""
        return self._device_failures.get(tag, 0) < 1

    def fail(self, device: Device, error: str | Exception) -> None:
        """Register a failure of ``device`` and suspend every system it is marked for.

        :param device: the failing device
        :param error: the error; an exception is converted with ``repr``
        :raises RuntimeWarning: if the device has no (or an empty) marker
        """
        if isinstance(error, Exception):
            error = repr(error)
        systems = read_device_marker(device)
        if not systems:
            raise RuntimeWarning(f"No system marked for device {device}")
        tag = device.tag()
        self._device_failures[tag] = self._device_failures.get(tag, 0) + 1
        self.on_device_fail(device, error)
        L.error(f"{device} error: {error}")
        for system in systems:
            self._system_failures[system] = self._system_failures.get(system, 0) + 1
            # The context is looked up anew for every system.
            context = require_context()
            event = SuspensionEvent(context, system, error)
            self.on_fail(event)
            context.suspend(event)

    def recover(self, device: Device) -> None:
        """Register a recovery of ``device`` and release systems whose count reaches zero.

        :param device: the recovered device
        :raises RuntimeWarning: if the device has no (or an empty) marker
        """
        systems = read_device_marker(device)
        if not systems:
            raise RuntimeWarning(f"System not marked for device {device}")
        tag = device.tag()
        if tag in self._device_failures:
            remaining = self._device_failures[tag] - 1
            if remaining < 1:
                del self._device_failures[tag]
            else:
                self._device_failures[tag] = remaining
        # NOTE(review): the callback and log run even when the device had no
        # recorded failure, mirroring fail() - confirm this is intended.
        self.on_device_recover(device)
        L.debug(f"{device} recovered")
        for system in systems:
            # NOTE(review): system counters are decremented regardless of
            # whether this device had a recorded failure - confirm.
            if system not in self._system_failures:
                continue
            remaining = self._system_failures[system] - 1
            if remaining > 0:
                # Other failures are still outstanding for this system.
                self._system_failures[system] = remaining
                continue
            del self._system_failures[system]
            context = require_context()
            event = SuspensionExitEvent(context, system, "Recovered")
            self.on_recover(event)
            # The exit event is handed to context.suspend, as in the original.
            context.suspend(event)
# Module-level SystemFailureTracer instance, created when this module is imported.
SFT: SystemFailureTracer = SystemFailureTracer()