Source code for leads.dt.device
from abc import abstractmethod as _abstractmethod, ABCMeta as _ABCMeta
from threading import Thread as _Thread
from typing import Any as _Any, override as _override, overload as _overload
from leads.os import _thread_flags
[docs]
class Device(object):
@_overload
def __init__(self, *args, **kwargs) -> None: # real signature unknown
raise NotImplementedError
def __init__(self, *pins: int | str) -> None:
self._tag: str = ""
self._tag_locked: bool = False
self._parent_tags: tuple[str, ...] = ()
self._pins: tuple[int | str, ...] = pins
[docs]
@_override
def __str__(self) -> str:
return f"{len(self._parent_tags)}.{self._tag}"
[docs]
def level(self) -> int:
"""
Get the level of the device in the device tree.
:return: the number of parents
"""
return len(self._parent_tags)
[docs]
def tag(self, tag: str | None = None) -> str | None:
"""
Set or get the tag of the device. The tag will not be set if it is locked.
:param tag: the tag or None if getter mode
:return: the tag or None if setter mode
"""
if tag is None:
return self._tag
if not self._tag_locked:
self._tag = tag
[docs]
def lock_tag(self) -> None:
"""
Lock the tag of the device.
"""
self._tag_locked = True
[docs]
def initialize(self, *parent_tags: str) -> None:
self._parent_tags = parent_tags
[docs]
def read(self) -> _Any:
raise NotImplementedError
[docs]
def write(self, payload: _Any) -> None:
raise NotImplementedError
[docs]
def update(self, data: _Any) -> None:
raise NotImplementedError
[docs]
def close(self) -> None:
...
[docs]
class ShadowDevice(Device, metaclass=_ABCMeta):
    """
    A device whose `loop()` is called repeatedly on a background daemon thread.

    The thread is created and started by `initialize()`. It keeps calling `loop()` for as long as
    `_thread_flags.active` is truthy.
    """

    def __init__(self, *pins: int | str) -> None:
        """
        :param pins: the pins of the device, forwarded to `Device`
        """
        super().__init__(*pins)
        # Stays None until `initialize()` has created the thread.
        self._shadow_thread: _Thread | None = None

    @_abstractmethod
    def loop(self) -> None:
        """
        Perform one iteration of the background work. Subclasses must override this.
        :raise NotImplementedError: if the base implementation is called
        """
        raise NotImplementedError

    def run(self) -> None:
        """
        Call `loop()` again and again until `_thread_flags.active` becomes falsy. The flag is checked before every
        iteration.
        """
        while True:
            if not _thread_flags.active:
                break
            self.loop()

    @_override
    def initialize(self, *parent_tags: str) -> None:
        """
        Record the parent tags, then create and start the daemon thread that executes `run()`.
        :param parent_tags: the tags of the parents, forwarded to `Device.initialize()`
        """
        super().initialize(*parent_tags)
        worker = _Thread(name=f"{id(self)} shadow", target=self.run, daemon=True)
        # Publish the thread on the instance before it starts running.
        self._shadow_thread = worker
        worker.start()