Source code for leads.context
from abc import ABCMeta as _ABCMeta, abstractmethod as _abstractmethod
from collections import deque as _deque
from time import time as _time
from typing import TypeVar as _TypeVar, Generic as _Generic
from leads.constant import ESCMode
from leads.data import DataContainer
T = _TypeVar("T", bound=DataContainer)
[docs]
def _check_data_type(data: T, superclass: type[DataContainer] = DataContainer) -> None:
if not isinstance(data, superclass):
raise TypeError(f"New data must inherit from `{superclass}`")
[docs]
class Context(_Generic[T], metaclass=_ABCMeta):
def __init__(self, initial_data: T | None, data_seq_size: int, num_laps_timed: int) -> None:
"""
:param initial_data: initial data
:param data_seq_size: buffer size of history data
:param num_laps_timed: number of timed laps retained
"""
if initial_data:
_check_data_type(initial_data)
else:
initial_data = DataContainer()
self._initial_data_type: type[DataContainer] = type(initial_data)
if data_seq_size < 1:
raise ValueError("`data_seq_size` must be greater or equal to 1")
self._data_seq: _deque[DataContainer] = _deque((initial_data,), maxlen=data_seq_size)
self._speed_seq: _deque[float] = _deque(maxlen=data_seq_size)
self._lap_time_seq: _deque[int] = _deque((int(_time() * 1000),), maxlen=num_laps_timed + 1)
self._esc_mode: ESCMode = ESCMode.STANDARD
self._brake_indicator: bool = False
self._left_indicator: bool = False
self._right_indicator: bool = False
self._hazard: bool = False
[docs]
def data(self) -> T:
"""
:return: a copy of the current data container
"""
return self._data_seq[-1]
[docs]
def push(self, data: T) -> None:
"""
Push new data into the sequence.
:param data: the new data
"""
_check_data_type(data, self._initial_data_type)
self._data_seq.append(data)
self._speed_seq.append(data.speed)
[docs]
def esc_mode(self, esc_mode: ESCMode | None = None) -> ESCMode | None:
"""
Set or get the ESC mode.
:param esc_mode: the ESC mode or None if getter mode
:return: the ESC mode or None if setter mode
"""
if esc_mode is None:
return self._esc_mode
self._esc_mode = esc_mode
[docs]
@_abstractmethod
def update(self) -> None:
raise NotImplementedError
[docs]
@_abstractmethod
def intervene(self, *args, **kwargs) -> None: # real signature unknown
raise NotImplementedError
[docs]
@_abstractmethod
def suspend(self, *args, **kwargs) -> None: # real signature unknown
raise NotImplementedError
[docs]
def time_lap(self) -> None:
self._lap_time_seq.append(int(_time() * 1000))
[docs]
def lap_times(self) -> list[int]:
return [self._lap_time_seq[i] - self._lap_time_seq[i - 1] for i in range(1, len(self._lap_time_seq))]
[docs]
def speed_trend(self) -> float:
return (self._speed_seq[-1] - self._speed_seq[0]) / len(self._speed_seq) if len(self._speed_seq) > 1 else 0
[docs]
def brake_indicator(self, brake_indicator: bool | None = None) -> bool | None:
if brake_indicator is None:
return self._brake_indicator
self._brake_indicator = brake_indicator
[docs]
def left_indicator(self, left_indicator: bool | None = None, override: bool = False) -> bool | None:
if not override:
if self._hazard:
return True
if left_indicator:
self.right_indicator(False, True)
if left_indicator is None:
return self._left_indicator
self._left_indicator = left_indicator
[docs]
def right_indicator(self, right_indicator: bool | None = None, override: bool = False) -> bool | None:
if not override:
if self._hazard:
return True
if right_indicator:
self.left_indicator(False, True)
if right_indicator is None:
return self._right_indicator
self._right_indicator = right_indicator
[docs]
def hazard(self, hazard: bool | None = None) -> bool | None:
"""
Set or get the hazard light status.
:param hazard: True: hazard light on; False: hazard light off; None: getter mode
:return: the hazard light status or None if setter mode
"""
if hazard is None:
return self._hazard
self.left_indicator(False, True)
self.right_indicator(False, True)
self.left_indicator(hazard, True)
self.right_indicator(hazard, True)
self._hazard = hazard