Source code for leads.leads

from typing import TypeVar as _TypeVar, Any as _Any, override as _override, Literal as _Literal

from leads.context import Context
from leads.data import DataContainer
from leads.dt import has_device
from leads.event import EventListener, Event, DataPushedEvent, UpdateEvent, SuspensionEvent, InterventionEvent, \
    InterventionExitEvent, SuspensionExitEvent
from leads.plugin import Plugin
from leads.sft import SFT

T = _TypeVar("T", bound=DataContainer)


[docs] class _SuspensionException(Exception): def __init__(self, event: SuspensionEvent) -> None: super().__init__() self.event = event
[docs] class LEADS(Context[T]): def __init__(self, initial_data: T | None = None, data_seq_size: int = 100, num_laps_timed: int = 3) -> None: super().__init__(initial_data, data_seq_size, num_laps_timed) self._plugins: dict[str, Plugin] = {} self._event_listener: EventListener = EventListener()
[docs] def plugin(self, key: str, plugin: Plugin | None = None) -> Plugin | None: if plugin is None: return self._plugins[key] self._plugins[key] = plugin plugin.on_load(self)
[docs] def set_event_listener(self, event_listener: EventListener) -> None: event_listener.bind_chain(self._event_listener) self._event_listener = event_listener
[docs] @_override def suspend(self, event: SuspensionEvent) -> None: if isinstance(event, SuspensionExitEvent): self._event_listener.post_suspend(event) else: self._event_listener.pre_suspend(event)
[docs] def _acquire_data(self, name: str, key: str, mandatory: bool = True) -> _Any | None: try: return getattr(self.data(), name) except AttributeError: if mandatory: raise _SuspensionException(SuspensionEvent(self, key, f"No data for `{name}`"))
[docs] def _do_plugin_callback(self, method: _Literal["pre_push", "post_push", "pre_update", "post_update"]) -> None: for key, plugin in self._plugins.items(): if plugin.enabled(): try: for tag in plugin.required_devices(): if not has_device(tag) or not SFT.device_ok(tag): raise _SuspensionException(SuspensionEvent(self, key, f"Device {tag} not ok")) getattr(plugin, method)(self, {d: self._acquire_data(d, key) for d in plugin.required_data()}) except _SuspensionException as e: self.suspend(e.event)
[docs] @_override def push(self, data: T) -> None: self._event_listener.pre_push(DataPushedEvent(self, data)) self._do_plugin_callback("pre_push") super().push(data) self._do_plugin_callback("post_push") self._event_listener.post_push(DataPushedEvent(self, data))
[docs] @_override def intervene(self, event: InterventionEvent) -> None: if isinstance(event, InterventionExitEvent): self._event_listener.post_intervene(event) else: self._event_listener.pre_intervene(event)
[docs] @_override def update(self) -> None: self._do_plugin_callback("pre_update") self._event_listener.on_update(UpdateEvent(self)) self._do_plugin_callback("post_update")
[docs] @_override def brake_indicator(self, brake_indicator: bool | None = None) -> bool | None: initial_state = self._brake_indicator try: return super().brake_indicator(brake_indicator) finally: if self._brake_indicator != initial_state: self._event_listener.brake_indicator(Event("BRAKE_INDICATOR", self), brake_indicator)
[docs] @_override def left_indicator(self, left_indicator: bool | None = None, override: bool = False) -> bool | None: initial_state = self._left_indicator try: return super().left_indicator(left_indicator, override) finally: if self._left_indicator != initial_state: self._event_listener.left_indicator(Event("LEFT_INDICATOR", self), left_indicator)
[docs] @_override def right_indicator(self, right_indicator: bool | None = None, override: bool = False) -> bool | None: initial_state = self._right_indicator try: return super().right_indicator(right_indicator, override) finally: if self._right_indicator != initial_state: self._event_listener.right_indicator(Event("RIGHT_INDICATOR", self), right_indicator)
[docs] @_override def hazard(self, hazard: bool | None = None) -> bool | None: if (r := super().hazard(hazard)) is None: self._event_listener.hazard(Event("HAZARD", self), hazard) return r