Source code for leads.ltm

from json import loads as _loads, dumps as _dumps
from os import chmod as _chmod, access as _access, R_OK as _R_OK, W_OK as _W_OK
from os.path import abspath as _abspath, exists as _exists

from leads.logger import L
from leads.types import SupportedConfigValue as _SupportedConfigValue

_PATH: str = f"{_abspath(__file__)[:-6]}_ltm/core"

_ltm: dict[str, _SupportedConfigValue] = {}


[docs] def _acquire_permission() -> bool: if _access(_PATH, _R_OK) and _access(_PATH, _W_OK): return True L.debug(f"Attempting to acquire permission for {_PATH}") try: _chmod(_PATH, 0o666) return True except Exception as e: L.debug(f"Failed to acquire permission: {repr(e)}") L.debug(f"For Linux users, try executing `sudo chmod 666 {_PATH}` manually or run LEADS as the root user") return False
[docs] def _load_ltm() -> None: if not _permission_ok: return global _ltm try: if not _exists(_PATH): with open(_PATH, "w") as f: f.write("{}") return with open(_PATH) as f: ltm_content = f.read() if not (ltm_content.startswith("{") and ltm_content.endswith("}")): ltm_content = "{}" _ltm = _loads(ltm_content) except Exception as e: L.warn(f"Attempted but failed to load LTM: {repr(e)}")
[docs] def _sync_ltm() -> None: if not _permission_ok: return try: with open(_PATH, "w") as f: f.write(_dumps(_ltm)) except Exception as e: L.warn(f"Attempted but failed to sync LTM: {repr(e)}")
[docs] def ltm_get(key: str) -> _SupportedConfigValue: return _ltm[key]
[docs] def ltm_set(key: str, value: _SupportedConfigValue) -> None: _ltm[key] = value _sync_ltm()
_permission_ok: bool = _acquire_permission() L.debug(f"LTM permission {"OK" if _permission_ok else "NOT OK"}: {_PATH}") _load_ltm()