Source code for leads.dt.registry

from atexit import register as _register
from typing import Any as _Any, Callable as _Callable, Sequence as _Sequence

from leads.dt.controller import Controller
from leads.dt.device import Device
from leads.dt.predefined_tags import MAIN_CONTROLLER
from leads.logger import L

# Module-level registries, keyed by tag. They are filled by `register_controller` and
# `_register_device` and emptied by `release`.
_controllers: dict[str, Controller] = {}
_devices: dict[str, Device] = {}


def controller(tag: str,
               parent: str | None = None,
               args: tuple[_Any, ...] = (),
               kwargs: dict[str, _Any] | None = None) -> _Callable[[type[Controller]], None]:
    """
    Build a class decorator that instantiates the decorated controller class and registers the instance.

    :param tag: tag under which the new controller instance is registered
    :param parent: tag of an already registered controller to attach the instance to, or None for no parent
    :param args: positional arguments passed to the class constructor
    :param kwargs: keyword arguments passed to the class constructor (None means no keyword arguments)
    :return: the decorator; it returns None, so it does not hand the decorated class back
    :raises TypeError: (from the decorator) if the decorated class is not a subclass of `Controller`
    """
    ctor_kwargs = {} if kwargs is None else kwargs

    def _decorate(target: type[Controller]) -> None:
        if issubclass(target, Controller):
            # The instance is created at decoration time and handed straight to the registry.
            register_controller(tag, target(*args, **ctor_kwargs), parent)
            return
        raise TypeError("Controllers must inherit from `Controller`")

    return _decorate
def device(tag: str | _Sequence[str],
           parent: str | _Sequence[str],
           args: tuple[_Any, ...] | list[tuple[_Any, ...]] = (),
           kwargs: dict[str, _Any] | list[dict[str, _Any]] | None = None) -> _Callable[[type[Device]], None]:
    """
    Build a class decorator that instantiates the decorated device class once per tag and registers
    every instance.

    A single string `parent`, a single tuple `args` or a single dict `kwargs` is reused for every tag.
    Otherwise the i-th entry of each sequence belongs to the i-th tag.

    :param tag: one tag or a sequence of tags; one device instance is created per tag
    :param parent: tag of the owning controller, or one controller tag per device tag
    :param args: constructor positional arguments, or one tuple per device tag
    :param kwargs: constructor keyword arguments, or one dict per device tag (None means none)
    :return: the decorator; it returns None, so it does not hand the decorated class back
    :raises ValueError: if `parent`, `args` or `kwargs` is a sequence with fewer entries than `tag`
    :raises TypeError: (from the decorator) if the decorated class is not a subclass of `Device`
    :raises KeyError: (from the decorator) if a parent tag is not a registered controller
    """
    tags = [tag] if isinstance(tag, str) else list(tag)
    n = len(tags)
    parents = [parent] * n if isinstance(parent, str) else list(parent)
    arg_sets = [args] * n if isinstance(args, tuple) else list(args)
    if kwargs is None:
        kwarg_sets = [{} for _ in range(n)]
    elif isinstance(kwargs, dict):
        kwarg_sets = [kwargs] * n
    else:
        kwarg_sets = list(kwargs)

    # Fail before anything is instantiated: a short sequence used to surface as an IndexError
    # in the middle of the registration loop, leaving only some of the devices registered.
    for name, values in (("parent", parents), ("args", arg_sets), ("kwargs", kwarg_sets)):
        if len(values) < n:
            raise ValueError(f"`{name}` has {len(values)} item(s) but {n} tag(s) were given")

    def _decorate(target: type[Device]) -> None:
        if not issubclass(target, Device):
            raise TypeError("Devices must inherit from `Device`")
        # Every sequence holds at least `n` entries, so `zip` stops exactly after the last tag.
        for device_tag, parent_tag, device_args, device_kwargs in zip(tags, parents, arg_sets, kwarg_sets):
            _register_device(target, device_tag, _controllers[parent_tag], device_args, device_kwargs)

    return _decorate
def register_controller(tag: str, c: Controller, parent: str | None = None) -> None:
    """
    Add a controller instance to the module-level controller registry.

    :param tag: tag to register the controller under; must not already be in use
    :param c: the controller instance
    :param parent: tag of an already registered controller to attach `c` to; a falsy value means no parent
    :raises RuntimeError: if `tag` is already registered
    :raises KeyError: if `parent` is given but is not a registered controller tag
    """
    if tag in _controllers:
        raise RuntimeError(f"Cannot register: tag \"{tag}\" is already used")
    if parent:
        # Attach to the parent controller through its `device` method.
        _controllers[parent].device(tag, c)
    else:
        # No parent: assign the tag directly on the controller.
        c.tag(tag)
        # NOTE(review): the nesting of `lock_tag()` was reconstructed from a flattened listing;
        # confirm it belongs to this branch only and not to both branches.
        c.lock_tag()
    _controllers[tag] = c
def has_controller(tag: str) -> bool:
    """
    Tell whether a controller is registered under the given tag.

    :param tag: the controller tag to look for
    :return: True if the tag is present in the controller registry, False otherwise
    """
    return tag in _controllers
def get_controller(tag: str) -> Controller:
    """
    Look up a registered controller by its tag.

    :param tag: the controller tag
    :return: the controller registered under `tag`
    :raises KeyError: if no controller is registered under `tag`
    """
    return _controllers[tag]
def _register_device(prototype: type[Device],
                     tag: str,
                     parent: Controller,
                     args: tuple[_Any, ...],
                     kwargs: dict[str, _Any]) -> None:
    """
    Instantiate a device class, attach the instance to its parent controller and record it in the
    module-level device registry.

    :param prototype: the device class to instantiate
    :param tag: tag to register the instance under; must not already be in use
    :param parent: the controller the instance is attached to
    :param args: positional arguments passed to the constructor
    :param kwargs: keyword arguments passed to the constructor
    :raises RuntimeError: if `tag` is already registered as a device
    """
    # Same rule as `register_controller`: overwriting an entry would drop the earlier instance
    # from the registry, so `release` would never close it. Checked before constructing the
    # device so that a rejected registration creates nothing.
    if tag in _devices:
        raise RuntimeError(f"Cannot register: tag \"{tag}\" is already used")
    instance = prototype(*args, **kwargs)
    parent.device(tag, instance)
    _devices[tag] = instance
def has_device(tag: str) -> bool:
    """
    Tell whether a device is registered under the given tag.

    :param tag: the device tag to look for
    :return: True if the tag is present in the device registry, False otherwise
    """
    return tag in _devices
def get_device(tag: str) -> Device:
    """
    Look up a registered device by its tag.

    :param tag: the device tag
    :return: the device registered under `tag`
    :raises KeyError: if no device is registered under `tag`
    """
    return _devices[tag]
@_register
def release() -> None:
    """
    Close every registered device and then every registered controller, emptying both registries.

    The function is registered with `atexit`, so it also runs at interpreter exit. Closing is
    best-effort: an exception raised by one `close()` is logged at debug level and does not
    stop the remaining items from being closed.
    """
    # Devices are closed before the controllers.
    for d in _devices.values():
        try:
            d.close()
        except Exception as e:
            L.debug(f"Error when closing device {d}: {repr(e)}")
    _devices.clear()
    for c in _controllers.values():
        try:
            c.close()
        except Exception as e:
            L.debug(f"Error when closing controller {c}: {repr(e)}")
    _controllers.clear()
def initialize_main() -> None:
    """
    Initialize the controller registered under the `MAIN_CONTROLLER` tag.

    :raises KeyError: if no controller is registered under `MAIN_CONTROLLER`
    """
    main_controller = get_controller(MAIN_CONTROLLER)
    main_controller.initialize()