From d888ae7acb639de0f79828ed93c0a2b73a82359d Mon Sep 17 00:00:00 2001 From: zosimovaa Date: Sat, 21 Feb 2026 00:20:07 +0300 Subject: [PATCH] =?UTF-8?q?=D0=9F=D0=B5=D1=80=D0=B2=D0=B0=D1=8F=20=D0=B8?= =?UTF-8?q?=D1=82=D0=B5=D1=80=D0=B0=D1=86=D0=B8=D1=8F=20=D1=80=D0=B5=D1=84?= =?UTF-8?q?=D0=B0=D0=BA=D1=82=D0=BE=D1=80=D0=B8=D0=BD=D0=B3=D0=B0?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- pyproject.toml | 2 + src/config_manager/__init__.py | 2 +- src/config_manager/v2/__init__.py | 9 +- src/config_manager/v2/config_loader.py | 52 ---- src/config_manager/v2/control/__init__.py | 3 + src/config_manager/v2/control/base.py | 7 +- src/config_manager/v2/control/telegram.py | 17 +- src/config_manager/v2/core/__init__.py | 8 + src/config_manager/v2/core/config_loader.py | 70 +++++ src/config_manager/v2/core/manager.py | 228 ++++++++++++++++ src/config_manager/v2/{ => core}/scheduler.py | 7 +- src/config_manager/v2/health.py | 80 ------ src/config_manager/v2/management/__init__.py | 14 + src/config_manager/v2/management/bridges.py | 61 +++++ .../v2/management/health_aggregator.py | 38 +++ .../v2/management/management_server.py | 145 ++++++++++ src/config_manager/v2/manager.py | 256 ------------------ src/config_manager/v2/types.py | 10 +- tests/v2/test_health_endpoint.py | 69 ++++- 19 files changed, 668 insertions(+), 410 deletions(-) delete mode 100644 src/config_manager/v2/config_loader.py create mode 100644 src/config_manager/v2/core/__init__.py create mode 100644 src/config_manager/v2/core/config_loader.py create mode 100644 src/config_manager/v2/core/manager.py rename src/config_manager/v2/{ => core}/scheduler.py (67%) delete mode 100644 src/config_manager/v2/health.py create mode 100644 src/config_manager/v2/management/__init__.py create mode 100644 src/config_manager/v2/management/bridges.py create mode 100644 src/config_manager/v2/management/health_aggregator.py create mode 100644 src/config_manager/v2/management/management_server.py delete mode 100644 src/config_manager/v2/manager.py diff --git a/pyproject.toml b/pyproject.toml index b4fcadb..44dbdd4 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -13,6 +13,8 @@ readme = "README.md" requires-python = ">=3.8" dependencies = [ "PyYAML>=6.0", + "fastapi>=0.100.0", + "uvicorn[standard]>=0.22.0", ] [project.urls] diff --git a/src/config_manager/__init__.py b/src/config_manager/__init__.py index 99a4596..6c06ed3 100644 --- a/src/config_manager/__init__.py +++ b/src/config_manager/__init__.py @@ -1,3 +1,3 @@ -from .v2.manager import ConfigManagerV2 as ConfigManager +from .v2 import ConfigManagerV2 as ConfigManager from .v1.cfg_manager import ConfigManager as LegacyConfigManager from .v1.log_manager import LogManager \ No newline at end of file diff --git a/src/config_manager/v2/__init__.py b/src/config_manager/v2/__init__.py index add8ff7..3f1bcc7 100644 --- a/src/config_manager/v2/__init__.py +++ b/src/config_manager/v2/__init__.py @@ -1,4 +1,7 @@ -from .manager import ConfigManagerV2 -from .types import HealthServerSettings +"""Публичный API V2: точка входа в менеджер конфигурации и настройки management-сервера. -__all__ = ["ConfigManagerV2", "HealthServerSettings"] +Экспортирует ConfigManagerV2 и типы настроек для использования приложениями.""" +from .core import ConfigManagerV2 +from .types import HealthServerSettings, ManagementServerSettings + +__all__ = ["ConfigManagerV2", "ManagementServerSettings", "HealthServerSettings"] diff --git a/src/config_manager/v2/config_loader.py b/src/config_manager/v2/config_loader.py deleted file mode 100644 index 2800b0b..0000000 --- a/src/config_manager/v2/config_loader.py +++ /dev/null @@ -1,52 +0,0 @@ -from __future__ import annotations - -import asyncio -import hashlib -import json -import os -from typing import Any, Optional - -import yaml - - -class ConfigLoader: - def __init__(self, path: str): - """Initialize loader state for a specific config file path.""" - self.path = path - self.config: Any = None - self.last_valid_config: Any = None - self._last_seen_hash: Optional[str] = None - - def _read_file_sync(self) -> str: - """Read raw config text from disk synchronously.""" - with open(self.path, "r", encoding="utf-8") as fh: - return fh.read() - - async def read_file_async(self) -> str: - """Read raw config text from disk in a worker thread.""" - return await asyncio.to_thread(self._read_file_sync) - - def parse_config(self, data: str) -> Any: - """Parse config text as YAML or JSON based on file extension.""" - extension = os.path.splitext(self.path)[1].lower() - if extension in (".yaml", ".yml"): - return yaml.safe_load(data) - return json.loads(data) - - @staticmethod - def _calculate_hash(data: str) -> str: - """Calculate a stable content hash for change detection.""" - return hashlib.sha256(data.encode("utf-8")).hexdigest() - - async def load_if_changed(self) -> tuple[bool, Any]: - """Load and parse config only when file content changed.""" - raw_data = await self.read_file_async() - current_hash = self._calculate_hash(raw_data) - if current_hash == self._last_seen_hash: - return False, self.config - - self._last_seen_hash = current_hash - parsed = self.parse_config(raw_data) - self.config = parsed - self.last_valid_config = parsed - return True, parsed diff --git a/src/config_manager/v2/control/__init__.py b/src/config_manager/v2/control/__init__.py index 084e95f..12b40e1 100644 --- a/src/config_manager/v2/control/__init__.py +++ b/src/config_manager/v2/control/__init__.py @@ -1,3 +1,6 @@ +"""Каналы внешнего управления: абстракция и реализация (например, Telegram). + +Позволяет запускать, останавливать и запрашивать статус менеджера через ботов и другие интерфейсы.""" from .base import ControlChannel from .telegram import TelegramControlChannel diff --git a/src/config_manager/v2/control/base.py b/src/config_manager/v2/control/base.py index 37b4f87..e5cc6d2 100644 --- a/src/config_manager/v2/control/base.py +++ b/src/config_manager/v2/control/base.py @@ -1,3 +1,6 @@ +"""Базовый абстрактный канал управления и типы обработчиков команд. + +Определяет контракт: старт/стоп канала и привязка обработчиков start/stop/status.""" from __future__ import annotations from abc import ABC, abstractmethod @@ -12,10 +15,10 @@ StatusHandler = Callable[[], Awaitable[str]] class ControlChannel(ABC): @abstractmethod async def start(self, on_start: StartHandler, on_stop: StopHandler, on_status: StatusHandler) -> None: - """Start channel and bind command handlers.""" + """Запустить канал и привязать обработчики команд.""" raise NotImplementedError @abstractmethod async def stop(self) -> None: - """Stop channel and release its resources.""" + """Остановить канал и освободить его ресурсы.""" raise NotImplementedError diff --git a/src/config_manager/v2/control/telegram.py b/src/config_manager/v2/control/telegram.py index a47c0d5..e88522c 100644 --- a/src/config_manager/v2/control/telegram.py +++ b/src/config_manager/v2/control/telegram.py @@ -1,3 +1,6 @@ +"""Реализация канала управления через Telegram Bot API (long polling). + +Принимает команды /start, /stop, /status в указанном чате и вызывает привязанные обработчики.""" from __future__ import annotations import asyncio @@ -18,7 +21,7 @@ class TelegramControlChannel(ControlChannel): poll_interval: float = 2.0, logger: Optional[logging.Logger] = None, ): - """Initialize Telegram polling channel with bot and chat settings.""" + """Инициализация канала опроса Telegram с настройками бота и чата.""" self._token = token self._chat_id = chat_id self._poll_interval = poll_interval @@ -31,7 +34,7 @@ class TelegramControlChannel(ControlChannel): self._logger = logger or logging.getLogger(__name__) async def start(self, on_start: StartHandler, on_stop: StopHandler, on_status: StatusHandler) -> None: - """Start polling Telegram updates and register command callbacks.""" + """Запустить опрос обновлений Telegram и зарегистрировать колбэки команд.""" if self._task is not None and not self._task.done(): return self._on_start = on_start @@ -41,7 +44,7 @@ class TelegramControlChannel(ControlChannel): self._task = asyncio.create_task(self._poll_loop()) async def stop(self) -> None: - """Stop polling loop and wait until task termination.""" + """Остановить цикл опроса и дождаться завершения задачи.""" self._stop_event.set() if self._task is not None: self._task.cancel() @@ -52,7 +55,7 @@ class TelegramControlChannel(ControlChannel): self._task = None async def _poll_loop(self) -> None: - """Continuously fetch updates and dispatch supported commands.""" + """Непрерывно получать обновления и вызывать поддерживаемые команды.""" while not self._stop_event.is_set(): try: updates = await asyncio.to_thread(self._fetch_updates) @@ -67,7 +70,7 @@ class TelegramControlChannel(ControlChannel): continue def _fetch_updates(self) -> list[dict]: - """Pull new Telegram updates using the latest offset.""" + """Запросить новые обновления Telegram с учётом последнего offset.""" params = {"timeout": 0} if self._offset is not None: params["offset"] = self._offset @@ -82,7 +85,7 @@ class TelegramControlChannel(ControlChannel): return result async def _process_update(self, update: dict) -> None: - """Handle one Telegram update and execute mapped command.""" + """Обработать одно обновление Telegram и выполнить соответствующую команду.""" message = update.get("message") or {} text = (message.get("text") or "").strip().lower() chat = message.get("chat") or {} @@ -103,7 +106,7 @@ class TelegramControlChannel(ControlChannel): await asyncio.to_thread(self._send_message, reply) def _send_message(self, text: str) -> None: - """Send plain-text reply to the configured Telegram chat.""" + """Отправить текстовый ответ в настроенный чат Telegram.""" encoded = urllib.parse.urlencode({"chat_id": self._chat_id, "text": text}) url = f"https://api.telegram.org/bot{self._token}/sendMessage" req = urllib.request.Request(url, data=encoded.encode("utf-8"), method="POST") diff --git a/src/config_manager/v2/core/__init__.py b/src/config_manager/v2/core/__init__.py new file mode 100644 index 0000000..c44ab6a --- /dev/null +++ b/src/config_manager/v2/core/__init__.py @@ -0,0 +1,8 @@ +"""Ядро V2: жизненный цикл менеджера, загрузка конфига и цикл воркера. + +Содержит ConfigManagerV2, загрузчик конфигурации и планировщик повторяющегося выполнения execute().""" +from .config_loader import ConfigLoader +from .manager import ConfigManagerV2 +from .scheduler import WorkerLoop + +__all__ = ["ConfigLoader", "ConfigManagerV2", "WorkerLoop"] diff --git a/src/config_manager/v2/core/config_loader.py b/src/config_manager/v2/core/config_loader.py new file mode 100644 index 0000000..4c96391 --- /dev/null +++ b/src/config_manager/v2/core/config_loader.py @@ -0,0 +1,70 @@ +"""Загрузчик конфигурации из файла (YAML/JSON) с обнаружением изменений по хешу. + +Читает файл синхронно и асинхронно, парсит по расширению и возвращает последний валидный конфиг при ошибках.""" +from __future__ import annotations + +import asyncio +import hashlib +import json +import os +from typing import Any, Optional + +import yaml + + +class ConfigLoader: + def __init__(self, path: str): + """Инициализация состояния загрузчика для указанного пути к файлу конфига.""" + self.path = path + self.config: Any = None + self.last_valid_config: Any = None + self._last_seen_hash: Optional[str] = None + + def _read_file_sync(self) -> str: + """Синхронно прочитать сырой текст конфига с диска.""" + with open(self.path, "r", encoding="utf-8") as fh: + return fh.read() + + async def read_file_async(self) -> str: + """Прочитать сырой текст конфига с диска в рабочем потоке.""" + return await asyncio.to_thread(self._read_file_sync) + + def parse_config(self, data: str) -> Any: + """Распарсить текст конфига как YAML или JSON по расширению файла.""" + extension = os.path.splitext(self.path)[1].lower() + if extension in (".yaml", ".yml"): + return yaml.safe_load(data) + return json.loads(data) + + @staticmethod + def _calculate_hash(data: str) -> str: + """Вычислить устойчивый хеш содержимого для обнаружения изменений.""" + return hashlib.sha256(data.encode("utf-8")).hexdigest() + + async def load_if_changed(self) -> tuple[bool, Any]: + """Загрузить и распарсить конфиг только при изменении содержимого файла.""" + raw_data = await self.read_file_async() + current_hash = self._calculate_hash(raw_data) + if current_hash == self._last_seen_hash: + return False, self.config + + self._last_seen_hash = current_hash + parsed = self.parse_config(raw_data) + self.config = parsed + self.last_valid_config = parsed + return True, parsed + + +def extract_scheduler_intervals( + config: Any, + default_update: float, + default_work: float, +) -> tuple[float, float]: + """Извлекает update_interval и work_interval из конфига (dict). Возвращает значения по умолчанию при ошибке.""" + if not isinstance(config, dict): + return default_update, default_work + upd = config.get("update_interval") + wrk = config.get("work_interval") + u = float(upd) if isinstance(upd, (int, float)) and upd > 0 else default_update + w = float(wrk) if isinstance(wrk, (int, float)) and wrk > 0 else default_work + return u, w diff --git a/src/config_manager/v2/core/manager.py b/src/config_manager/v2/core/manager.py new file mode 100644 index 0000000..f2b3400 --- /dev/null +++ b/src/config_manager/v2/core/manager.py @@ -0,0 +1,228 @@ +"""Главный класс менеджера V2: оркестрация жизненного цикла, конфига, API и каналов управления. + +Запускает воркер и периодическое обновление конфига, поднимает management-сервер и control-канал при наличии настроек.""" +from __future__ import annotations + +import asyncio +import logging +from typing import Any, Optional + +from ...v1.log_manager import LogManager +from ..control.base import ControlChannel +from ..management import ( + ControlChannelBridge, + HealthAggregator, + ManagementApiBridge, + ManagementServer, +) +from ..types import HealthPayload, LifecycleState, ManagementServerSettings +from .config_loader import ConfigLoader, extract_scheduler_intervals +from .scheduler import WorkerLoop + + +class ConfigManagerV2: + DEFAULT_UPDATE_INTERVAL = 5.0 + DEFAULT_WORK_INTERVAL = 2.0 + + def __init__( + self, + path: str, + log_manager: Optional[LogManager] = None, + management_settings: Optional[ManagementServerSettings] = None, + health_settings: Optional[ManagementServerSettings] = None, + control_channel: Optional[ControlChannel] = None, + ): + """Инициализация подсистем менеджера и состояния рантайма.""" + self.path = path + self.config: Any = None + self.update_interval = self.DEFAULT_UPDATE_INTERVAL + self.work_interval = self.DEFAULT_WORK_INTERVAL + + self._loader = ConfigLoader(path) + self._log_manager = log_manager or LogManager() + self._control_channel = control_channel + + self._halt = asyncio.Event() + self._task: Optional[asyncio.Task] = None + self._loop: Optional[asyncio.AbstractEventLoop] = None + + self._state = LifecycleState.IDLE + self._last_execute_error: Optional[str] = None + + if management_settings is not None and health_settings is not None: + raise ValueError("Use either management_settings or health_settings, not both") + + self._management_settings = management_settings or health_settings or ManagementServerSettings(enabled=True) + self._health_aggregator = HealthAggregator( + get_state=lambda: self._state, + get_last_error=lambda: self._last_execute_error, + get_app_health=self.get_health_status, + ) + self._api_bridge = ManagementApiBridge(start_fn=self.start, stop_fn=self.stop) + self._control_bridge = ControlChannelBridge( + halt=self._halt, + get_state=lambda: self._state, + get_status=self._status_text, + ) + self._management_server: Optional[ManagementServer] = None + if self._management_settings.enabled: + self._management_server = ManagementServer( + host=self._management_settings.host, + port=self._management_settings.port, + timeout=self._management_settings.timeout, + health_provider=self._health_aggregator.collect, + on_start=self._api_bridge.on_start, + on_stop=self._api_bridge.on_stop, + ) + + self.logger = logging.getLogger(__name__) + + def _apply_config(self, new_config: Any) -> None: + """Применить загруженный конфиг: интервалы и log_manager. Вызывается после load_if_changed.""" + self.config = new_config + self.update_interval, self.work_interval = extract_scheduler_intervals( + new_config, + self.DEFAULT_UPDATE_INTERVAL, + self.DEFAULT_WORK_INTERVAL, + ) + if isinstance(new_config, dict): + self._log_manager.apply_config(new_config) + + async def _update_config(self) -> None: + """Перезагрузить конфиг при изменении файла и применить к состоянию и log_manager.""" + try: + changed, new_config = await self._loader.load_if_changed() + if not changed: + return + self._apply_config(new_config) + except Exception as exc: # noqa: BLE001 + self.logger.error("Error reading/parsing config file: %s", exc) + if self._loader.last_valid_config is not None: + self._apply_config(self._loader.last_valid_config) + + def execute(self) -> None: + """Переопределить в подклассе для реализации одной единицы блокирующей работы.""" + + def get_health_status(self) -> HealthPayload: + """Вернуть payload здоровья приложения для /health.""" + return {"status": "ok"} + + def _on_execute_success(self) -> None: + """Сбросить маркер последней ошибки выполнения после успешного запуска.""" + self._last_execute_error = None + + def _on_execute_error(self, exc: Exception) -> None: + """Сохранить и залогировать детали ошибки выполнения для отчёта здоровья.""" + self._last_execute_error = str(exc) + self.logger.error("Execution error: %s", exc) + + async def _worker_loop(self) -> None: + """Вызывать execute() циклически до запроса остановки.""" + worker = WorkerLoop( + execute=self.execute, + get_interval=lambda: self.work_interval, + halt_event=self._halt, + on_error=self._on_execute_error, + on_success=self._on_execute_success, + ) + await worker.run() + + async def _periodic_update_loop(self) -> None: + """Периодически проверять файл конфига на обновления до остановки.""" + while not self._halt.is_set(): + await self._update_config() + try: + await asyncio.wait_for(self._halt.wait(), timeout=max(self.update_interval, 0.05)) + except asyncio.TimeoutError: + continue + + async def _status_text(self) -> str: + """Сформировать читаемый статус рантайма для каналов управления.""" + health = await self._health_aggregator.collect() + detail = health.get("detail") + if detail: + return f"state={self._state.value}; health={health['status']}; detail={detail}" + return f"state={self._state.value}; health={health['status']}" + + async def _start_control_channel(self) -> None: + """Запустить настроенный канал управления с привязанными обработчиками команд.""" + if self._control_channel is None: + return + await self._control_channel.start( + self._control_bridge.on_start, + self._control_bridge.on_stop, + self._control_bridge.on_status, + ) + + async def _stop_control_channel(self) -> None: + """Остановить настроенный канал управления, если он активен.""" + if self._control_channel is None: + return + await self._control_channel.stop() + + async def _run(self) -> None: + """Запустить жизненный цикл менеджера и координировать фоновые задачи.""" + self._state = LifecycleState.STARTING + self._halt.clear() + await self._update_config() + + if self._management_server is not None: + await self._management_server.start() + await self._start_control_channel() + + self._state = LifecycleState.RUNNING + self.logger.info("ConfigManagerV2 started") + + tasks = [ + asyncio.create_task(self._worker_loop(), name="v2-worker-loop"), + asyncio.create_task(self._periodic_update_loop(), name="v2-config-loop"), + ] + + try: + await asyncio.gather(*tasks) + except asyncio.CancelledError: + raise + finally: + self._state = LifecycleState.STOPPING + self._halt.set() + for task in tasks: + task.cancel() + await asyncio.gather(*tasks, return_exceptions=True) + await self._stop_control_channel() + if self._management_server is not None: + await self._management_server.stop() + self._state = LifecycleState.STOPPED + self.logger.info("ConfigManagerV2 stopped") + + async def start(self) -> None: + """Запустить жизненный цикл менеджера из активного asyncio-контекста.""" + if self._task is not None and not self._task.done(): + self.logger.warning("ConfigManagerV2 is already running") + return + + try: + self._loop = asyncio.get_running_loop() + except RuntimeError: + self.logger.error("start() must be called from within an async context") + raise + + self._task = asyncio.create_task(self._run(), name="config-manager-v2") + try: + await self._task + finally: + self._task = None + + async def stop(self) -> None: + """Запросить плавную остановку и дождаться завершения менеджера.""" + if self._task is None: + self.logger.warning("ConfigManagerV2 is not running") + return + + self._halt.set() + if asyncio.current_task() is self._task: + return + + try: + await self._task + except asyncio.CancelledError: + pass diff --git a/src/config_manager/v2/scheduler.py b/src/config_manager/v2/core/scheduler.py similarity index 67% rename from src/config_manager/v2/scheduler.py rename to src/config_manager/v2/core/scheduler.py index 21d0390..8ac1342 100644 --- a/src/config_manager/v2/scheduler.py +++ b/src/config_manager/v2/core/scheduler.py @@ -1,3 +1,6 @@ +"""Цикл воркера: повторяющийся вызов блокирующего execute() в потоке с паузой между итерациями. + +Поддерживает halt-событие для остановки, колбэки on_success/on_error для учёта ошибок и здоровья.""" from __future__ import annotations import asyncio @@ -14,7 +17,7 @@ class WorkerLoop: on_error: Optional[Callable[[Exception], None]] = None, on_success: Optional[Callable[[], None]] = None, ): - """Store callbacks and synchronization primitives for worker execution.""" + """Сохранить колбэки и примитивы синхронизации для выполнения воркера.""" self._execute = execute self._get_interval = get_interval self._halt_event = halt_event @@ -22,7 +25,7 @@ class WorkerLoop: self._on_success = on_success async def run(self) -> None: - """Run execute repeatedly until halt is requested.""" + """Вызывать execute циклически до запроса остановки.""" while not self._halt_event.is_set(): try: await asyncio.to_thread(self._execute) diff --git a/src/config_manager/v2/health.py b/src/config_manager/v2/health.py deleted file mode 100644 index bb58241..0000000 --- a/src/config_manager/v2/health.py +++ /dev/null @@ -1,80 +0,0 @@ -from __future__ import annotations - -import asyncio -import json -from collections.abc import Awaitable, Callable -from typing import Optional - -from .types import HealthPayload - - -class HealthServer: - def __init__( - self, - host: str, - port: int, - path: str, - timeout: float, - health_provider: Callable[[], Awaitable[HealthPayload]], - ): - """Configure lightweight HTTP health server parameters and callback.""" - self._host = host - self._port = port - self._path = path - self._timeout = timeout - self._health_provider = health_provider - self._server: Optional[asyncio.base_events.Server] = None - - async def start(self) -> None: - """Start listening for healthcheck requests if not running.""" - if self._server is not None: - return - self._server = await asyncio.start_server(self._handle_connection, self._host, self._port) - - async def stop(self) -> None: - """Stop the health server and release the listening socket.""" - if self._server is None: - return - self._server.close() - await self._server.wait_closed() - self._server = None - - async def _handle_connection(self, reader: asyncio.StreamReader, writer: asyncio.StreamWriter) -> None: - """Process one HTTP connection and return JSON health payload.""" - status_code = 404 - payload: HealthPayload = {"status": "unhealthy", "detail": "not found"} - - try: - request_line = await reader.readline() - parts = request_line.decode("utf-8", errors="ignore").strip().split(" ") - if len(parts) >= 2: - method, path = parts[0], parts[1] - if method == "GET" and path == self._path: - status_code, payload = await self._build_health_response() - except Exception: # noqa: BLE001 - status_code = 500 - payload = {"status": "unhealthy", "detail": "internal error"} - - body = json.dumps(payload).encode("utf-8") - phrase = "OK" if status_code == 200 else "SERVICE UNAVAILABLE" if status_code == 503 else "NOT FOUND" - response = ( - f"HTTP/1.1 {status_code} {phrase}\r\n" - "Content-Type: application/json\r\n" - f"Content-Length: {len(body)}\r\n" - "Connection: close\r\n" - "\r\n" - ).encode("utf-8") + body - - writer.write(response) - await writer.drain() - writer.close() - await writer.wait_closed() - - async def _build_health_response(self) -> tuple[int, HealthPayload]: - """Build HTTP status code and body from application health callback.""" - try: - payload = await asyncio.wait_for(self._health_provider(), timeout=self._timeout) - status = payload.get("status", "unhealthy") - return (200, payload) if status == "ok" else (503, payload) - except Exception as exc: # noqa: BLE001 - return 503, {"status": "unhealthy", "detail": str(exc)} diff --git a/src/config_manager/v2/management/__init__.py b/src/config_manager/v2/management/__init__.py new file mode 100644 index 0000000..8af990a --- /dev/null +++ b/src/config_manager/v2/management/__init__.py @@ -0,0 +1,14 @@ +"""Management API: HTTP-сервер для /health и /actions/start|stop, адаптеры и сбор здоровья. + +Объединяет сервер, мосты к жизненному циклу и агрегатор здоровья для единого контракта API.""" +from .bridges import ControlChannelBridge, ManagementApiBridge +from .health_aggregator import HealthAggregator +from .management_server import HealthServer, ManagementServer + +__all__ = [ + "ControlChannelBridge", + "HealthAggregator", + "HealthServer", + "ManagementApiBridge", + "ManagementServer", +] diff --git a/src/config_manager/v2/management/bridges.py b/src/config_manager/v2/management/bridges.py new file mode 100644 index 0000000..f4c8b73 --- /dev/null +++ b/src/config_manager/v2/management/bridges.py @@ -0,0 +1,61 @@ +"""Адаптеры, связывающие жизненный цикл менеджера с HTTP API и каналами управления. + +ManagementApiBridge отдаёт start/stop в HTTP; ControlChannelBridge — start/stop/status в Telegram и др.""" +from __future__ import annotations + +import asyncio +from collections.abc import Awaitable, Callable + +from ..types import LifecycleState + + +class ManagementApiBridge: + """Предоставляет start/stop жизненного цикла как async-колбэки для ManagementServer (/actions/start, /actions/stop).""" + + def __init__( + self, + start_fn: Callable[[], Awaitable[None]], + stop_fn: Callable[[], Awaitable[None]], + ): + self._start_fn = start_fn + self._stop_fn = stop_fn + + async def on_start(self) -> str: + """Выполнить start и вернуть сообщение для HTTP-ответа.""" + await self._start_fn() + return "start completed" + + async def on_stop(self) -> str: + """Выполнить stop и вернуть сообщение для HTTP-ответа.""" + await self._stop_fn() + return "stop completed" + + +class ControlChannelBridge: + """Предоставляет halt и status как обработчики start/stop/status для ControlChannel (например Telegram).""" + + def __init__( + self, + halt: asyncio.Event, + get_state: Callable[[], LifecycleState], + get_status: Callable[[], Awaitable[str]], + ): + self._halt = halt + self._get_state = get_state + self._get_status = get_status + + async def on_start(self) -> str: + """Обработать внешний start: сбросить halt; идемпотентно при уже running.""" + if self._get_state() == LifecycleState.RUNNING: + return "already running" + self._halt.clear() + return "start signal accepted" + + async def on_stop(self) -> str: + """Обработать внешний stop: установить halt.""" + self._halt.set() + return "stop signal accepted" + + async def on_status(self) -> str: + """Вернуть текущий текст статуса.""" + return await self._get_status() diff --git a/src/config_manager/v2/management/health_aggregator.py b/src/config_manager/v2/management/health_aggregator.py new file mode 100644 index 0000000..1b34846 --- /dev/null +++ b/src/config_manager/v2/management/health_aggregator.py @@ -0,0 +1,38 @@ +"""Собирает состояние жизненного цикла и здоровья приложения в один ответ для /health. + +Учитывает состояние (running/stopping), последнюю ошибку execute и результат get_health_status().""" +from __future__ import annotations + +from collections.abc import Callable + +from ..types import HealthPayload, LifecycleState + + +class HealthAggregator: + """Формирует ответ здоровья из текущего состояния, последней ошибки и здоровья приложения.""" + + def __init__( + self, + get_state: Callable[[], LifecycleState], + get_last_error: Callable[[], str | None], + get_app_health: Callable[[], HealthPayload], + ): + self._get_state = get_state + self._get_last_error = get_last_error + self._get_app_health = get_app_health + + async def collect(self) -> HealthPayload: + """Вернуть агрегированное здоровье: unhealthy при не running или ошибке, иначе здоровье приложения.""" + state = self._get_state() + if state not in {LifecycleState.RUNNING, LifecycleState.STOPPING}: + return {"status": "unhealthy", "detail": f"state={state.value}"} + + last_error = self._get_last_error() + if last_error is not None: + return {"status": "unhealthy", "detail": last_error} + + result = self._get_app_health() + status = result.get("status", "unhealthy") + if status not in {"ok", "degraded", "unhealthy"}: + return {"status": "unhealthy", "detail": "invalid health status"} + return result diff --git a/src/config_manager/v2/management/management_server.py b/src/config_manager/v2/management/management_server.py new file mode 100644 index 0000000..bd27c8e --- /dev/null +++ b/src/config_manager/v2/management/management_server.py @@ -0,0 +1,145 @@ +"""Management HTTP API на FastAPI: эндпоинты /health, /actions/start, /actions/stop. + +Единообразное описание маршрутов через декораторы FastAPI.""" +from __future__ import annotations + +import asyncio +import json +from collections.abc import Awaitable, Callable +from typing import Any, Optional + +from fastapi import FastAPI +from fastapi.responses import JSONResponse +from uvicorn import Config, Server + +from ..types import HealthPayload + +# Захардкоженные эндпоинты management API. +PATH_HEALTH = "/health" +PATH_ACTION_START = "/actions/start" +PATH_ACTION_STOP = "/actions/stop" + + +class ManagementServer: + def __init__( + self, + host: str, + port: int, + timeout: float, + health_provider: Callable[[], Awaitable[HealthPayload]], + on_start: Optional[Callable[[], Awaitable[str]]] = None, + on_stop: Optional[Callable[[], Awaitable[str]]] = None, + ): + """Настройка параметров и колбэков лёгкого HTTP management-сервера.""" + self._host = host + self._port = port + self._timeout = timeout + self._health_provider = health_provider + self._on_start = on_start + self._on_stop = on_stop + self._uvicorn_server: Optional[Server] = None + self._serve_task: Optional[asyncio.Task[None]] = None + self._bound_port: Optional[int] = None + self._app = self._create_app() + + def _create_app(self) -> FastAPI: + app = FastAPI(title="Config Manager Management API") + + @app.get(PATH_HEALTH) + async def health() -> JSONResponse: + return await self._health_response() + + @app.get(PATH_ACTION_START) + @app.post(PATH_ACTION_START) + async def action_start() -> JSONResponse: + return await self._action_response("start", self._on_start) + + @app.get(PATH_ACTION_STOP) + @app.post(PATH_ACTION_STOP) + async def action_stop() -> JSONResponse: + return await self._action_response("stop", self._on_stop) + + return app + + async def _health_response(self) -> JSONResponse: + """Сформировать HTTP-ответ из колбэка здоровья приложения.""" + try: + payload = await asyncio.wait_for(self._health_provider(), timeout=self._timeout) + status = payload.get("status", "unhealthy") + status_code = 200 if status == "ok" else 503 + return JSONResponse(content=payload, status_code=status_code) + except Exception as exc: # noqa: BLE001 + return JSONResponse( + content={"status": "unhealthy", "detail": str(exc)}, + status_code=503, + ) + + async def _action_response( + self, + action: str, + callback: Optional[Callable[[], Awaitable[str]]], + ) -> JSONResponse: + """Сформировать HTTP-ответ для колбэка действия start/stop.""" + if callback is None: + return JSONResponse( + content={"status": "error", "detail": f"{action} handler is not configured"}, + status_code=404, + ) + try: + detail = await callback() + if not detail: + detail = f"{action} action accepted" + return JSONResponse(content={"status": "ok", "detail": detail}, status_code=200) + except Exception as exc: # noqa: BLE001 + return JSONResponse( + content={"status": "error", "detail": str(exc)}, + status_code=500, + ) + + def _build_health_response(self) -> Awaitable[tuple[int, HealthPayload]]: + """Для тестов: вернуть (status_code, payload) как раньше.""" + async def _run() -> tuple[int, HealthPayload]: + response = await self._health_response() + body: Any = response.body + if isinstance(body, bytes): + body = json.loads(body.decode("utf-8")) + return response.status_code, body + return _run() + + async def start(self) -> None: + """Начать приём запросов к API здоровья и действий, если ещё не запущен.""" + if self._serve_task is not None: + return + config = Config( + app=self._app, + host=self._host, + port=self._port, + log_level="warning", + ) + self._uvicorn_server = Server(config) + self._serve_task = asyncio.create_task(self._uvicorn_server.serve()) + await asyncio.sleep(0.05) + if self._uvicorn_server.servers: + sock = self._uvicorn_server.servers[0].sockets[0] + self._bound_port = sock.getsockname()[1] + else: + self._bound_port = self._port + + async def stop(self) -> None: + """Остановить management-сервер и освободить сокет.""" + if self._uvicorn_server is None or self._serve_task is None: + return + self._uvicorn_server.should_exit = True + await self._serve_task + self._uvicorn_server = None + self._serve_task = None + self._bound_port = None + + @property + def port(self) -> int: + """Порт, на котором слушает сервер (после start); при port=0 — фактически выданный ОС.""" + return self._bound_port if self._bound_port is not None else self._port + + +# Backward-compatible alias. +HealthServer = ManagementServer diff --git a/src/config_manager/v2/manager.py b/src/config_manager/v2/manager.py deleted file mode 100644 index dbdb4a3..0000000 --- a/src/config_manager/v2/manager.py +++ /dev/null @@ -1,256 +0,0 @@ -from __future__ import annotations - -import asyncio -import logging -from collections.abc import Awaitable -from typing import Any, Optional - -from ..v1.log_manager import LogManager -from .config_loader import ConfigLoader -from .control.base import ControlChannel -from .health import HealthServer -from .scheduler import WorkerLoop -from .types import HealthPayload, HealthServerSettings, LifecycleState - - -class ConfigManagerV2: - DEFAULT_UPDATE_INTERVAL = 5.0 - DEFAULT_WORK_INTERVAL = 2.0 - - def __init__( - self, - path: str, - log_manager: Optional[LogManager] = None, - health_settings: Optional[HealthServerSettings] = None, - control_channel: Optional[ControlChannel] = None, - ): - """Initialize manager subsystems and runtime state.""" - self.path = path - self.config: Any = None - self.update_interval = self.DEFAULT_UPDATE_INTERVAL - self.work_interval = self.DEFAULT_WORK_INTERVAL - - self._loader = ConfigLoader(path) - self._log_manager = log_manager or LogManager() - self._control_channel = control_channel - - self._halt = asyncio.Event() - self._task: Optional[asyncio.Task] = None - self._loop: Optional[asyncio.AbstractEventLoop] = None - - self._state = LifecycleState.IDLE - self._last_execute_error: Optional[str] = None - - self._health_settings = health_settings or HealthServerSettings(enabled=False) - self._health_server: Optional[HealthServer] = None - if self._health_settings.enabled: - self._health_server = HealthServer( - host=self._health_settings.host, - port=self._health_settings.port, - path=self._health_settings.path, - timeout=self._health_settings.timeout, - health_provider=self._collect_health, - ) - - self.logger = logging.getLogger(__name__) - - def _read_file_sync(self) -> str: - """Read config file synchronously via the shared loader.""" - return self._loader._read_file_sync() - - async def _read_file_async(self) -> str: - """Read config file asynchronously via a worker thread.""" - return await self._loader.read_file_async() - - def _parse_config(self, data: str) -> Any: - """Parse raw config text to a Python object.""" - return self._loader.parse_config(data) - - def _update_intervals_from_config(self) -> None: - """Refresh work and update intervals from current config.""" - if not isinstance(self.config, dict): - return - - upd = self.config.get("update_interval") - wrk = self.config.get("work_interval") - - if isinstance(upd, (int, float)) and upd > 0: - self.update_interval = float(upd) - else: - self.update_interval = self.DEFAULT_UPDATE_INTERVAL - - if isinstance(wrk, (int, float)) and wrk > 0: - self.work_interval = float(wrk) - else: - self.work_interval = self.DEFAULT_WORK_INTERVAL - - async def _update_config(self) -> None: - """Reload config and apply new settings when content changes.""" - try: - changed, new_config = await self._loader.load_if_changed() - if not changed: - return - - self.config = new_config - self._update_intervals_from_config() - if isinstance(new_config, dict): - self._log_manager.apply_config(new_config) - except Exception as exc: # noqa: BLE001 - # Keep current config untouched and continue with last valid settings. - self.logger.error("Error reading/parsing config file: %s", exc) - if self._loader.last_valid_config is not None: - self.config = self._loader.last_valid_config - self._update_intervals_from_config() - - def execute(self) -> None: - """Override in subclass to implement one unit of blocking work.""" - - def get_health_status(self) -> HealthPayload: - """Return application-specific health payload for /health.""" - return {"status": "ok"} - - async def _collect_health(self) -> HealthPayload: - """Aggregate lifecycle and app status into one health result.""" - if self._state not in {LifecycleState.RUNNING, LifecycleState.STOPPING}: - return {"status": "unhealthy", "detail": f"state={self._state.value}"} - - if self._last_execute_error is not None: - return {"status": "unhealthy", "detail": self._last_execute_error} - - result = self.get_health_status() - status = result.get("status", "unhealthy") - if status not in {"ok", "degraded", "unhealthy"}: - return {"status": "unhealthy", "detail": "invalid health status"} - return result - - def _on_execute_success(self) -> None: - """Clear the last execution error marker after successful run.""" - self._last_execute_error = None - - def _on_execute_error(self, exc: Exception) -> None: - """Store and log execution failure details for health reporting.""" - self._last_execute_error = str(exc) - self.logger.error("Execution error: %s", exc) - - async def _worker_loop(self) -> None: - """Run execute() repeatedly until shutdown is requested.""" - worker = WorkerLoop( - execute=self.execute, - get_interval=lambda: self.work_interval, - halt_event=self._halt, - on_error=self._on_execute_error, - on_success=self._on_execute_success, - ) - await worker.run() - - async def _periodic_update_loop(self) -> None: - """Periodically check config file for updates until shutdown.""" - while not self._halt.is_set(): - await self._update_config() - try: - await asyncio.wait_for(self._halt.wait(), timeout=max(self.update_interval, 0.05)) - except asyncio.TimeoutError: - continue - - async def _status_text(self) -> str: - """Build human-readable runtime status for control channels.""" - health = await self._collect_health() - detail = health.get("detail") - if detail: - return f"state={self._state.value}; health={health['status']}; detail={detail}" - return f"state={self._state.value}; health={health['status']}" - - async def _control_start(self) -> str: - """Handle external start command for control channels.""" - if self._state == LifecycleState.RUNNING: - return "already running" - self._halt.clear() - return "start signal accepted" - - async def _control_stop(self) -> str: - """Handle external stop command for control channels.""" - self._halt.set() - return "stop signal accepted" - - async def _control_status(self) -> str: - """Handle external status command for control channels.""" - return await self._status_text() - - async def _start_control_channel(self) -> None: - """Start configured control channel with bound command handlers.""" - if self._control_channel is None: - return - await self._control_channel.start(self._control_start, self._control_stop, self._control_status) - - async def _stop_control_channel(self) -> None: - """Stop configured control channel if it is active.""" - if self._control_channel is None: - return - await self._control_channel.stop() - - async def _run(self) -> None: - """Run manager lifecycle and coordinate all background tasks.""" - self._state = LifecycleState.STARTING - self._halt.clear() - await self._update_config() - - if self._health_server is not None: - await self._health_server.start() - await self._start_control_channel() - - self._state = LifecycleState.RUNNING - self.logger.info("ConfigManagerV2 started") - - tasks = [ - asyncio.create_task(self._worker_loop(), name="v2-worker-loop"), - asyncio.create_task(self._periodic_update_loop(), name="v2-config-loop"), - ] - - try: - await asyncio.gather(*tasks) - except asyncio.CancelledError: - raise - finally: - self._state = LifecycleState.STOPPING - self._halt.set() - for task in tasks: - task.cancel() - await asyncio.gather(*tasks, return_exceptions=True) - await self._stop_control_channel() - if self._health_server is not None: - await self._health_server.stop() - self._state = LifecycleState.STOPPED - self.logger.info("ConfigManagerV2 stopped") - - async def start(self) -> None: - """Start manager lifecycle from an active asyncio context.""" - if self._task is not None and not self._task.done(): - self.logger.warning("ConfigManagerV2 is already running") - return - - try: - self._loop = asyncio.get_running_loop() - except RuntimeError: - self.logger.error("start() must be called from within an async context") - raise - - self._task = asyncio.create_task(self._run(), name="config-manager-v2") - try: - await self._task - finally: - self._task = None - - async def stop(self) -> None: - """Request graceful shutdown and wait for manager completion.""" - if self._task is None: - self.logger.warning("ConfigManagerV2 is not running") - return - - self._halt.set() - if asyncio.current_task() is self._task: - return - - try: - await self._task - except asyncio.CancelledError: - pass diff --git a/src/config_manager/v2/types.py b/src/config_manager/v2/types.py index 9cb2f0e..531ffc3 100644 --- a/src/config_manager/v2/types.py +++ b/src/config_manager/v2/types.py @@ -1,3 +1,6 @@ +"""Общие типы V2: состояние здоровья, жизненного цикла и настройки management-сервера. + +Используются в core, management и control для единообразных контрактов.""" from __future__ import annotations from dataclasses import dataclass @@ -22,9 +25,12 @@ class LifecycleState(str, Enum): @dataclass -class HealthServerSettings: +class ManagementServerSettings: enabled: bool = False host: str = "0.0.0.0" port: int = 8000 - path: str = "/health" timeout: float = 3.0 + + +# Backward-compatible alias. +HealthServerSettings = ManagementServerSettings diff --git a/tests/v2/test_health_endpoint.py b/tests/v2/test_health_endpoint.py index 0a3679e..ce034e9 100644 --- a/tests/v2/test_health_endpoint.py +++ b/tests/v2/test_health_endpoint.py @@ -1,6 +1,7 @@ import asyncio +import json -from config_manager.v2.health import HealthServer +from config_manager.v2.management import ManagementServer def test_health_mapping_ok_to_200(): @@ -8,10 +9,9 @@ def test_health_mapping_ok_to_200(): return {"status": "ok"} async def scenario() -> None: - server = HealthServer( + server = ManagementServer( host="127.0.0.1", port=8000, - path="/health", timeout=0.2, health_provider=provider, ) @@ -27,10 +27,9 @@ def test_health_mapping_unhealthy_to_503(): return {"status": "unhealthy", "detail": "worker failed"} async def scenario() -> None: - server = HealthServer( + server = ManagementServer( host="127.0.0.1", port=8000, - path="/health", timeout=0.2, health_provider=provider, ) @@ -39,3 +38,63 @@ def test_health_mapping_unhealthy_to_503(): assert payload["status"] == "unhealthy" asyncio.run(scenario()) + + +def test_action_routes_call_callbacks(): + events: list[str] = [] + + async def provider(): + return {"status": "ok"} + + async def on_start() -> str: + events.append("start") + return "start accepted" + + async def on_stop() -> str: + events.append("stop") + return "stop accepted" + + async def request(port: int, path: str) -> tuple[int, dict[str, str]]: + reader, writer = await asyncio.open_connection("127.0.0.1", port) + writer.write( + f"GET {path} HTTP/1.1\r\nHost: 127.0.0.1\r\nConnection: close\r\n\r\n".encode("utf-8") + ) + await writer.drain() + raw = await reader.read() + writer.close() + await writer.wait_closed() + + header, body = raw.split(b"\r\n\r\n", maxsplit=1) + status_code = int(header.split(b" ")[1]) + payload = json.loads(body.decode("utf-8")) + return status_code, payload + + async def scenario() -> None: + server = ManagementServer( + host="127.0.0.1", + port=0, + timeout=0.2, + health_provider=provider, + on_start=on_start, + on_stop=on_stop, + ) + await server.start() + try: + port = server.port + assert port > 0 + + start_code, start_payload = await request(port, "/actions/start") + stop_code, stop_payload = await request(port, "/actions/stop") + finally: + await server.stop() + + assert start_code == 200 + assert start_payload["status"] == "ok" + assert start_payload["detail"] == "start accepted" + + assert stop_code == 200 + assert stop_payload["status"] == "ok" + assert stop_payload["detail"] == "stop accepted" + assert events == ["start", "stop"] + + asyncio.run(scenario())