diff --git a/src/config_manager/v2/core/config_loader.py b/src/config_manager/v2/core/config_loader.py index 4c96391..feb1fc0 100644 --- a/src/config_manager/v2/core/config_loader.py +++ b/src/config_manager/v2/core/config_loader.py @@ -6,11 +6,14 @@ from __future__ import annotations import asyncio import hashlib import json +import logging import os from typing import Any, Optional import yaml +logger = logging.getLogger(__name__) + class ConfigLoader: def __init__(self, path: str): @@ -19,39 +22,59 @@ class ConfigLoader: self.config: Any = None self.last_valid_config: Any = None self._last_seen_hash: Optional[str] = None + logger.warning("ConfigLoader.__init__ result: path=%s", self.path) def _read_file_sync(self) -> str: """Синхронно прочитать сырой текст конфига с диска.""" with open(self.path, "r", encoding="utf-8") as fh: - return fh.read() + data = fh.read() + logger.warning("ConfigLoader._read_file_sync result: bytes=%s", len(data)) + return data async def read_file_async(self) -> str: """Прочитать сырой текст конфига с диска в рабочем потоке.""" - return await asyncio.to_thread(self._read_file_sync) + result = await asyncio.to_thread(self._read_file_sync) + logger.warning("ConfigLoader.read_file_async result: bytes=%s", len(result)) + return result def parse_config(self, data: str) -> Any: """Распарсить текст конфига как YAML или JSON по расширению файла.""" extension = os.path.splitext(self.path)[1].lower() - if extension in (".yaml", ".yml"): - return yaml.safe_load(data) - return json.loads(data) + try: + if extension in (".yaml", ".yml"): + result = yaml.safe_load(data) + else: + result = json.loads(data) + except Exception: # noqa: BLE001 + logger.exception("ConfigLoader.parse_config error: extension=%s", extension) + raise + logger.warning( + "ConfigLoader.parse_config result: extension=%s type=%s", + extension, + type(result).__name__, + ) + return result @staticmethod def _calculate_hash(data: str) -> str: """Вычислить устойчивый хеш содержимого для обнаружения изменений.""" - return hashlib.sha256(data.encode("utf-8")).hexdigest() + result = hashlib.sha256(data.encode("utf-8")).hexdigest() + logger.warning("ConfigLoader._calculate_hash result: hash=%s", result) + return result async def load_if_changed(self) -> tuple[bool, Any]: """Загрузить и распарсить конфиг только при изменении содержимого файла.""" raw_data = await self.read_file_async() current_hash = self._calculate_hash(raw_data) if current_hash == self._last_seen_hash: + logger.warning("ConfigLoader.load_if_changed result: changed=False") return False, self.config self._last_seen_hash = current_hash parsed = self.parse_config(raw_data) self.config = parsed self.last_valid_config = parsed + logger.warning("ConfigLoader.load_if_changed result: changed=True") return True, parsed diff --git a/src/config_manager/v2/core/manager.py b/src/config_manager/v2/core/manager.py index a7ebe34..212a349 100644 --- a/src/config_manager/v2/core/manager.py +++ b/src/config_manager/v2/core/manager.py @@ -1,14 +1,11 @@ -"""Главный класс менеджера V2: оркестрация жизненного цикла, конфига, API и каналов управления. - -Запускает воркер и периодическое обновление конфига, поднимает management-сервер и control-канал при наличии настроек.""" +"""Config manager v2: runtime orchestration and configuration updates.""" from __future__ import annotations -import os import asyncio import logging +import os import time from typing import Any, Optional -import logging from ...v1.log_manager import LogManager from ..control.base import ControlChannel @@ -24,7 +21,197 @@ from .scheduler import WorkerLoop logger = logging.getLogger(__name__) -class ConfigManagerV2: + +def _read_env_interval(name: str, default_value: float) -> float: + """Read positive float interval from env.""" + raw_value = os.environ.get(name) + if raw_value is None: + return float(default_value) + try: + parsed = float(raw_value) + if parsed <= 0: + raise ValueError(f"{name} must be greater than zero") + return parsed + except Exception: # noqa: BLE001 + logger.exception( + "ConfigManagerV2 interval parse error: env=%s raw_value=%s fallback=%s", + name, + raw_value, + default_value, + ) + return float(default_value) + + +class _RuntimeController: + """Runtime loops and lifecycle supervision.""" + + def _on_execute_success(self) -> None: + self._last_success_timestamp = time.monotonic() + self._last_execute_error = None + self.logger.warning( + "ConfigManagerV2._on_execute_success result: last_success_timestamp=%s", + self._last_success_timestamp, + ) + + def _on_execute_error(self, exc: Exception) -> None: + self._last_execute_error = str(exc) + self.logger.exception("ConfigManagerV2._on_execute_error") + self.logger.warning( + "ConfigManagerV2._on_execute_error result: last_execute_error=%s", + self._last_execute_error, + ) + + async def _worker_loop(self) -> None: + self.logger.warning( + "ConfigManagerV2._worker_loop result: started work_interval=%s", + self.work_interval, + ) + worker = WorkerLoop( + execute=self.execute, + get_interval=lambda: self.work_interval, + halt_event=self._halt, + on_error=self._on_execute_error, + on_success=self._on_execute_success, + ) + try: + await worker.run() + self.logger.warning("ConfigManagerV2._worker_loop result: completed") + except Exception: # noqa: BLE001 + self.logger.exception("ConfigManagerV2._worker_loop error") + raise + finally: + self.logger.warning("ConfigManagerV2._worker_loop result: stopped") + + async def _periodic_update_loop(self) -> None: + self.logger.warning( + "ConfigManagerV2._periodic_update_loop result: started update_interval=%s", + self.update_interval, + ) + try: + while not self._halt.is_set(): + await self._update_config() + try: + await asyncio.wait_for(self._halt.wait(), timeout=max(self.update_interval, 0.05)) + except asyncio.TimeoutError: + continue + except Exception: # noqa: BLE001 + self.logger.exception("ConfigManagerV2._periodic_update_loop error") + raise + finally: + self.logger.warning("ConfigManagerV2._periodic_update_loop result: stopped") + + async def _status_text(self) -> str: + health = await self._health_aggregator.collect() + detail = health.get("detail") + if detail: + status_text = f"state={self._state.value}; health={health['status']}; detail={detail}" + self.logger.warning("ConfigManagerV2._status_text result: %s", status_text) + return status_text + status_text = f"state={self._state.value}; health={health['status']}" + self.logger.warning("ConfigManagerV2._status_text result: %s", status_text) + return status_text + + async def _start_control_channel(self) -> None: + if self._control_channel is None: + self.logger.warning("ConfigManagerV2._start_control_channel result: no control channel") + return + try: + await self._control_channel.start( + self._control_bridge.on_start, + self._control_bridge.on_stop, + self._control_bridge.on_status, + ) + self.logger.warning("ConfigManagerV2._start_control_channel result: started") + except Exception: # noqa: BLE001 + self.logger.exception("ConfigManagerV2._start_control_channel error") + + async def _stop_control_channel(self) -> None: + if self._control_channel is None: + self.logger.warning("ConfigManagerV2._stop_control_channel result: no control channel") + return + try: + await self._control_channel.stop() + self.logger.warning("ConfigManagerV2._stop_control_channel result: stopped") + except Exception: # noqa: BLE001 + self.logger.exception("ConfigManagerV2._stop_control_channel error") + + async def _start_management_server(self) -> None: + if self._management_server is None: + self.logger.warning("ConfigManagerV2._start_management_server result: disabled") + return + try: + await self._management_server.start() + self.logger.warning( + "ConfigManagerV2._start_management_server result: started port=%s", + self._management_server.port, + ) + except Exception: # noqa: BLE001 + self.logger.exception("ConfigManagerV2._start_management_server error") + self.logger.warning( + "ConfigManagerV2._start_management_server result: failed worker will continue", + ) + + def _on_runtime_task_done(self, task: asyncio.Task) -> None: + if task.cancelled(): + self.logger.warning("ConfigManagerV2._on_runtime_task_done result: cancelled") + return + try: + exc = task.exception() + except Exception: # noqa: BLE001 + self.logger.exception("ConfigManagerV2._on_runtime_task_done error while reading task exception") + return + if exc is None: + self.logger.warning("ConfigManagerV2._on_runtime_task_done result: completed") + return + self.logger.error( + "ConfigManagerV2 background task failed", + exc_info=(type(exc), exc, exc.__traceback__), + ) + self.logger.warning("ConfigManagerV2._on_runtime_task_done result: failed") + + async def _run(self) -> None: + self._state = LifecycleState.STARTING + self.logger.warning("ConfigManagerV2._run result: state=%s", self._state.value) + self._halt.clear() + await self._update_config() + + await self._start_management_server() + await self._start_control_channel() + + self._state = LifecycleState.RUNNING + self.logger.warning("ConfigManagerV2._run result: state=%s", self._state.value) + tasks = [ + asyncio.create_task(self._worker_loop(), name="v2-worker-loop"), + asyncio.create_task(self._periodic_update_loop(), name="v2-config-loop"), + ] + try: + await asyncio.gather(*tasks) + self.logger.warning("ConfigManagerV2._run result: background loops completed") + except asyncio.CancelledError: + self.logger.warning("ConfigManagerV2._run result: cancelled") + raise + except Exception: # noqa: BLE001 + self.logger.exception("ConfigManagerV2._run error") + raise + finally: + self._state = LifecycleState.STOPPING + self.logger.warning("ConfigManagerV2._run result: state=%s", self._state.value) + self._halt.set() + for task in tasks: + task.cancel() + await asyncio.gather(*tasks, return_exceptions=True) + self._state = LifecycleState.STOPPED + self._task = None + self.logger.warning( + "ConfigManagerV2._run result: state=%s api_and_control_available=%s", + self._state.value, + True, + ) + + +class ConfigManagerV2(_RuntimeController): + """Public manager API.""" + DEFAULT_UPDATE_INTERVAL = 5 DEFAULT_WORK_INTERVAL = 2 @@ -35,29 +222,25 @@ class ConfigManagerV2: management_settings: Optional[ManagementServerSettings] = None, control_channel: Optional[ControlChannel] = None, ): - """Инициализация подсистем менеджера и состояния рантайма.""" + self.logger = logging.getLogger(__name__) self.path = path self.config: Any = None - # Интервалы опроса (минуты): только здесь, в конфиг не пишем - self.update_interval = int(os.environ.get("UPDATE_INTERVAL", self.DEFAULT_UPDATE_INTERVAL)) - self.work_interval = int(os.environ.get("WORK_INTERVAL", self.DEFAULT_WORK_INTERVAL)) - print(f"self.update_interval {self.update_interval}") - print(f"self.work_interval {self.work_interval}") + self.update_interval = _read_env_interval("UPDATE_INTERVAL", float(self.DEFAULT_UPDATE_INTERVAL)) + self.work_interval = _read_env_interval("WORK_INTERVAL", float(self.DEFAULT_WORK_INTERVAL)) self._loader = ConfigLoader(path) self._log_manager = log_manager or LogManager() self._control_channel = control_channel - self._halt = asyncio.Event() self._task: Optional[asyncio.Task] = None self._loop: Optional[asyncio.AbstractEventLoop] = None - self._state = LifecycleState.IDLE self._last_execute_error: Optional[str] = None self._last_success_timestamp: Optional[float] = None - self._management_settings = management_settings or ManagementServerSettings(enabled=True) - self._health_timeout = self._management_settings.health_timeout + settings = management_settings or ManagementServerSettings(enabled=True) + self._management_settings = settings + self._health_timeout = settings.health_timeout self._health_aggregator = HealthAggregator( get_state=lambda: self._state, get_last_error=lambda: self._last_execute_error, @@ -72,168 +255,94 @@ class ConfigManagerV2: get_status=self._status_text, ) self._management_server: Optional[ManagementServer] = None - if self._management_settings.enabled: + if settings.enabled: self._management_server = ManagementServer( - host=self._management_settings.host, - port=self._management_settings.port, - timeout=self._management_settings.timeout, + host=settings.host, + port=settings.port, + timeout=settings.timeout, health_provider=self._health_aggregator.collect, on_start=self._api_bridge.on_start, on_stop=self._api_bridge.on_stop, ) - - self.logger = logging.getLogger(__name__) + self.logger.warning( + "ConfigManagerV2.__init__ result: path=%s update_interval=%s work_interval=%s management_enabled=%s", + self.path, + self.update_interval, + self.work_interval, + self._management_server is not None, + ) def _apply_config(self, new_config: Any) -> None: - """Применить загруженный конфиг: log_manager. Интервалы (update_interval, work_interval) задаются только в классе/наследнике.""" self.config = new_config if isinstance(new_config, dict): - self._log_manager.apply_config(new_config) + try: + self._log_manager.apply_config(new_config) + except Exception: # noqa: BLE001 + self.logger.exception("ConfigManagerV2._apply_config error while applying logging config") + raise + self.logger.warning( + "ConfigManagerV2._apply_config result: config_type=%s is_dict=%s", + type(new_config).__name__, + isinstance(new_config, dict), + ) async def _update_config(self) -> None: - """Перезагрузить конфиг при изменении файла и применить к состоянию и log_manager.""" try: changed, new_config = await self._loader.load_if_changed() if not changed: + self.logger.warning("ConfigManagerV2._update_config result: no changes") return self._apply_config(new_config) + self.logger.warning("ConfigManagerV2._update_config result: config updated") except Exception as exc: # noqa: BLE001 - self.logger.error("Error reading/parsing config file: %s", exc) - if self._loader.last_valid_config is not None: + self.logger.exception("ConfigManagerV2._update_config error") + if self._loader.last_valid_config is None: + self.logger.warning( + "ConfigManagerV2._update_config result: no fallback config available detail=%s", + str(exc), + ) + return + try: self._apply_config(self._loader.last_valid_config) + self.logger.warning( + "ConfigManagerV2._update_config result: fallback to last valid config applied", + ) + except Exception: # noqa: BLE001 + self.logger.exception("ConfigManagerV2._update_config fallback error") def execute(self) -> None: - """Переопределить в подклассе для реализации одной единицы блокирующей работы.""" + """Override in subclasses.""" def get_health_status(self) -> HealthPayload: - """Вернуть payload здоровья приложения для /health. - - Варианты ответа по статусу: - - ``{"status": "ok"}`` — сервис в норме; GET /health → 200. - - ``{"status": "degraded", "detail": "..."}`` — работает с ограничениями; GET /health → 503. - - ``{"status": "unhealthy", "detail": "..."}`` — неработоспособен; GET /health → 503. - - Поле ``detail`` опционально; для ``ok`` обычно не задаётся. - Переопределить в подклассе для своей логики здоровья.""" return {"status": "ok"} - def _on_execute_success(self) -> None: - """Обновить время последнего успешного execute() и сбросить маркер ошибки.""" - self._last_success_timestamp = time.monotonic() - self._last_execute_error = None - - def _on_execute_error(self, exc: Exception) -> None: - """Сохранить и залогировать детали ошибки выполнения для отчёта здоровья.""" - self._last_execute_error = str(exc) - self.logger.error("Execution error: %s", exc) - - async def _worker_loop(self) -> None: - """Вызывать execute() циклически до запроса остановки.""" - logger.warning("Worker loop started") - logger.debug(f"Запускаем _worker_loop с интервалом {self.work_interval}") - worker = WorkerLoop( - execute=self.execute, - get_interval=lambda: self.work_interval, - halt_event=self._halt, - on_error=self._on_execute_error, - on_success=self._on_execute_success, - ) - try: - await worker.run() - finally: - logger.warning("Worker loop stopped") - - async def _periodic_update_loop(self) -> None: - """Периодически проверять файл конфига на обновления до остановки.""" - while not self._halt.is_set(): - await self._update_config() - try: - await asyncio.wait_for(self._halt.wait(), timeout=max(self.update_interval, 0.05)) - except asyncio.TimeoutError: - continue - - async def _status_text(self) -> str: - """Сформировать читаемый статус рантайма для каналов управления.""" - health = await self._health_aggregator.collect() - detail = health.get("detail") - if detail: - return f"state={self._state.value}; health={health['status']}; detail={detail}" - return f"state={self._state.value}; health={health['status']}" - - async def _start_control_channel(self) -> None: - """Запустить настроенный канал управления с привязанными обработчиками команд.""" - if self._control_channel is None: - return - await self._control_channel.start( - self._control_bridge.on_start, - self._control_bridge.on_stop, - self._control_bridge.on_status, - ) - - async def _stop_control_channel(self) -> None: - """Остановить настроенный канал управления, если он активен.""" - if self._control_channel is None: - return - await self._control_channel.stop() - - async def _run(self) -> None: - """Запустить жизненный цикл менеджера и координировать фоновые задачи.""" - self._state = LifecycleState.STARTING - self._halt.clear() - await self._update_config() - - if self._management_server is not None: - await self._management_server.start() - await self._start_control_channel() - - self._state = LifecycleState.RUNNING - self.logger.info("ConfigManagerV2 started") - - tasks = [ - asyncio.create_task(self._worker_loop(), name="v2-worker-loop"), - asyncio.create_task(self._periodic_update_loop(), name="v2-config-loop"), - ] - - try: - await asyncio.gather(*tasks) - except asyncio.CancelledError: - raise - finally: - self._state = LifecycleState.STOPPING - self._halt.set() - for task in tasks: - task.cancel() - await asyncio.gather(*tasks, return_exceptions=True) - # Management-сервер и control channel не останавливаем: API и канал управления остаются доступными. - self._state = LifecycleState.STOPPED - self._task = None - self.logger.info("ConfigManagerV2 stopped (API and control channel remain available)") - async def start(self) -> None: - """Запустить циклы execute и конфига в фоне; возвращает управление сразу (ответ на /actions/start приходит без ожидания).""" if self._task is not None and not self._task.done(): - self.logger.warning("ConfigManagerV2 is already running") + self.logger.warning("ConfigManagerV2.start result: already running") return - try: self._loop = asyncio.get_running_loop() except RuntimeError: - self.logger.error("start() must be called from within an async context") + self.logger.exception("ConfigManagerV2.start error: must be called from within async context") raise - self._task = asyncio.create_task(self._run(), name="config-manager-v2") + self._task.add_done_callback(self._on_runtime_task_done) + self.logger.warning("ConfigManagerV2.start result: background task started") async def stop(self) -> None: - """Запросить плавную остановку и дождаться завершения менеджера.""" if self._task is None: - self.logger.warning("ConfigManagerV2 is not running") + self.logger.warning("ConfigManagerV2.stop result: not running") return - self._halt.set() if asyncio.current_task() is self._task: + self.logger.warning("ConfigManagerV2.stop result: stop requested from runtime task") return - try: await self._task except asyncio.CancelledError: - pass + self.logger.warning("ConfigManagerV2.stop result: runtime task cancelled") + except Exception: # noqa: BLE001 + self.logger.exception("ConfigManagerV2.stop error while awaiting runtime task") + raise + finally: + self.logger.warning("ConfigManagerV2.stop result: completed") diff --git a/src/config_manager/v2/core/scheduler.py b/src/config_manager/v2/core/scheduler.py index 50a684b..436cdde 100644 --- a/src/config_manager/v2/core/scheduler.py +++ b/src/config_manager/v2/core/scheduler.py @@ -4,9 +4,12 @@ from __future__ import annotations import asyncio +import logging from collections.abc import Callable from typing import Optional +logger = logging.getLogger(__name__) + class WorkerLoop: def __init__( @@ -23,20 +26,29 @@ class WorkerLoop: self._halt_event = halt_event self._on_error = on_error self._on_success = on_success + logger.warning( + "WorkerLoop.__init__ result: execute=%s", + getattr(execute, "__name__", execute.__class__.__name__), + ) async def run(self) -> None: """Вызывать execute циклически до запроса остановки.""" + logger.warning("WorkerLoop.run result: started") while not self._halt_event.is_set(): try: await asyncio.to_thread(self._execute) if self._on_success is not None: self._on_success() + logger.warning("WorkerLoop.run result: execute completed") except Exception as exc: # noqa: BLE001 + logger.exception("WorkerLoop.run error during execute") if self._on_error is not None: self._on_error(exc) + logger.warning("WorkerLoop.run result: execute failed") timeout = max(self._get_interval(), 0.01) try: await asyncio.wait_for(self._halt_event.wait(), timeout=timeout) except asyncio.TimeoutError: continue + logger.warning("WorkerLoop.run result: stopped") diff --git a/src/config_manager/v2/management/bridges.py b/src/config_manager/v2/management/bridges.py index f4c8b73..a21c5a7 100644 --- a/src/config_manager/v2/management/bridges.py +++ b/src/config_manager/v2/management/bridges.py @@ -4,10 +4,13 @@ ManagementApiBridge отдаёт start/stop в HTTP; ControlChannelBridge — st from __future__ import annotations import asyncio +import logging from collections.abc import Awaitable, Callable from ..types import LifecycleState +logger = logging.getLogger(__name__) + class ManagementApiBridge: """Предоставляет start/stop жизненного цикла как async-колбэки для ManagementServer (/actions/start, /actions/stop).""" @@ -19,16 +22,29 @@ class ManagementApiBridge: ): self._start_fn = start_fn self._stop_fn = stop_fn + logger.warning("ManagementApiBridge.__init__ result: callbacks configured") async def on_start(self) -> str: """Выполнить start и вернуть сообщение для HTTP-ответа.""" - await self._start_fn() - return "start completed" + try: + await self._start_fn() + except Exception: # noqa: BLE001 + logger.exception("ManagementApiBridge.on_start error") + raise + result = "start completed" + logger.warning("ManagementApiBridge.on_start result: %s", result) + return result async def on_stop(self) -> str: """Выполнить stop и вернуть сообщение для HTTP-ответа.""" - await self._stop_fn() - return "stop completed" + try: + await self._stop_fn() + except Exception: # noqa: BLE001 + logger.exception("ManagementApiBridge.on_stop error") + raise + result = "stop completed" + logger.warning("ManagementApiBridge.on_stop result: %s", result) + return result class ControlChannelBridge: @@ -43,19 +59,32 @@ class ControlChannelBridge: self._halt = halt self._get_state = get_state self._get_status = get_status + logger.warning("ControlChannelBridge.__init__ result: callbacks configured") async def on_start(self) -> str: """Обработать внешний start: сбросить halt; идемпотентно при уже running.""" if self._get_state() == LifecycleState.RUNNING: - return "already running" + result = "already running" + logger.warning("ControlChannelBridge.on_start result: %s", result) + return result self._halt.clear() - return "start signal accepted" + result = "start signal accepted" + logger.warning("ControlChannelBridge.on_start result: %s", result) + return result async def on_stop(self) -> str: """Обработать внешний stop: установить halt.""" self._halt.set() - return "stop signal accepted" + result = "stop signal accepted" + logger.warning("ControlChannelBridge.on_stop result: %s", result) + return result async def on_status(self) -> str: """Вернуть текущий текст статуса.""" - return await self._get_status() + try: + result = await self._get_status() + except Exception: # noqa: BLE001 + logger.exception("ControlChannelBridge.on_status error") + raise + logger.warning("ControlChannelBridge.on_status result: %s", result) + return result diff --git a/src/config_manager/v2/management/health_aggregator.py b/src/config_manager/v2/management/health_aggregator.py index 731da17..1bc6503 100644 --- a/src/config_manager/v2/management/health_aggregator.py +++ b/src/config_manager/v2/management/health_aggregator.py @@ -3,11 +3,14 @@ Здоровье = был успешный execute() за последние health_timeout секунд; иначе unhealthy с деталью (ошибка или таймаут).""" from __future__ import annotations +import logging import time from collections.abc import Callable from ..types import HealthPayload, LifecycleState +logger = logging.getLogger(__name__) + class HealthAggregator: """Формирует ответ здоровья по времени последнего успешного execute() и таймауту.""" @@ -25,6 +28,7 @@ class HealthAggregator: self._get_last_success_timestamp = get_last_success_timestamp self._health_timeout = health_timeout self._get_app_health = get_app_health + logger.warning("HealthAggregator.__init__ result: health_timeout=%s", self._health_timeout) async def collect(self) -> HealthPayload: """Вернуть ok, если был успешный execute() за последние health_timeout сек; иначе unhealthy. Всегда добавляем state.""" @@ -33,21 +37,31 @@ class HealthAggregator: # Только при state=RUNNING возможен status=ok; при остановке (STOPPING/STOPPED) сразу unhealthy. if state != LifecycleState.RUNNING: - return {"status": "unhealthy", "detail": f"state={state_value}", "state": state_value} + result = {"status": "unhealthy", "detail": f"state={state_value}", "state": state_value} + logger.warning("HealthAggregator.collect result: %s", result) + return result last_success = self._get_last_success_timestamp() now = time.monotonic() if last_success is None: detail = self._get_last_error() or "no successful run yet" - return {"status": "unhealthy", "detail": detail, "state": state_value} + result = {"status": "unhealthy", "detail": detail, "state": state_value} + logger.warning("HealthAggregator.collect result: %s", result) + return result if (now - last_success) > self._health_timeout: detail = self._get_last_error() or f"no successful run within {self._health_timeout}s" - return {"status": "unhealthy", "detail": detail, "state": state_value} + result = {"status": "unhealthy", "detail": detail, "state": state_value} + logger.warning("HealthAggregator.collect result: %s", result) + return result result = self._get_app_health() status = result.get("status", "unhealthy") if status != "ok": - return {"status": "unhealthy", "detail": result.get("detail", "app reported non-ok"), "state": state_value} - return {**result, "state": state_value} + unhealthy = {"status": "unhealthy", "detail": result.get("detail", "app reported non-ok"), "state": state_value} + logger.warning("HealthAggregator.collect result: %s", unhealthy) + return unhealthy + healthy = {**result, "state": state_value} + logger.warning("HealthAggregator.collect result: %s", healthy) + return healthy diff --git a/src/config_manager/v2/management/management_server.py b/src/config_manager/v2/management/management_server.py index aa8c8d7..c547d7c 100644 --- a/src/config_manager/v2/management/management_server.py +++ b/src/config_manager/v2/management/management_server.py @@ -1,10 +1,9 @@ -"""Management HTTP API на FastAPI: эндпоинты /health, /actions/start, /actions/stop. - -Единообразное описание маршрутов через декораторы FastAPI.""" +"""Management HTTP API with /health, /actions/start and /actions/stop.""" from __future__ import annotations import asyncio import json +import logging from collections.abc import Awaitable, Callable from typing import Any, Optional @@ -14,13 +13,127 @@ from uvicorn import Config, Server from ..types import HealthPayload -# Захардкоженные эндпоинты management API. PATH_HEALTH = "/health" PATH_ACTION_START = "/actions/start" PATH_ACTION_STOP = "/actions/stop" +logger = logging.getLogger(__name__) + + +class UvicornServerRunner: + """Lifecycle wrapper around uvicorn Server.""" + + def __init__(self, host: str, port: int, timeout: int): + self._host = host + self._port = port + self._timeout = timeout + self._server: Optional[Server] = None + self._serve_task: Optional[asyncio.Task[None]] = None + self._bound_port: Optional[int] = None + logger.warning( + "UvicornServerRunner.__init__ result: host=%s port=%s timeout=%s", + self._host, + self._port, + self._timeout, + ) + + async def _raise_if_start_task_failed(self) -> None: + if self._serve_task is None or not self._serve_task.done(): + return + try: + await self._serve_task + except SystemExit as exc: + raise RuntimeError(f"Management server exited during startup with code {exc.code}") from exc + except Exception as exc: # noqa: BLE001 + raise RuntimeError("Management server failed during startup") from exc + raise RuntimeError("Management server stopped unexpectedly during startup") + + async def _wait_until_started(self) -> None: + if self._server is None: + raise RuntimeError("Management server is not initialized") + loop = asyncio.get_running_loop() + deadline = loop.time() + max(float(self._timeout), 1.0) + while not self._server.started: + await self._raise_if_start_task_failed() + if loop.time() >= deadline: + raise TimeoutError("Management server startup timed out") + await asyncio.sleep(0.05) + + def _resolve_bound_port(self) -> int: + if self._server is None: + return self._port + servers = getattr(self._server, "servers", None) + if not servers: + return self._port + sockets = getattr(servers[0], "sockets", None) + if not sockets: + return self._port + sockname = sockets[0].getsockname() + if isinstance(sockname, tuple) and len(sockname) > 1: + return int(sockname[1]) + return self._port + + async def _cleanup_start_failure(self) -> None: + if self._server is not None: + self._server.should_exit = True + if self._serve_task is not None: + try: + await self._serve_task + except BaseException: # noqa: BLE001 + logger.exception("UvicornServerRunner._cleanup_start_failure error") + self._server = None + self._serve_task = None + self._bound_port = None + logger.warning("UvicornServerRunner._cleanup_start_failure result: state reset") + + async def start(self, app: FastAPI) -> None: + if self._serve_task is not None and not self._serve_task.done(): + logger.warning("UvicornServerRunner.start result: already running") + return + if self._serve_task is not None and self._serve_task.done(): + self._serve_task = None + try: + config = Config(app=app, host=self._host, port=self._port, log_level="warning") + self._server = Server(config) + self._serve_task = asyncio.create_task(self._server.serve(), name="management-server-serve") + await self._wait_until_started() + self._bound_port = self._resolve_bound_port() + logger.warning( + "UvicornServerRunner.start result: running host=%s requested_port=%s bound_port=%s", + self._host, + self._port, + self._bound_port, + ) + except Exception: + logger.exception("UvicornServerRunner.start error") + await self._cleanup_start_failure() + raise + + async def stop(self) -> None: + if self._server is None or self._serve_task is None: + logger.warning("UvicornServerRunner.stop result: already stopped") + return + self._server.should_exit = True + try: + await self._serve_task + except BaseException: # noqa: BLE001 + logger.exception("UvicornServerRunner.stop error") + raise + finally: + self._server = None + self._serve_task = None + self._bound_port = None + logger.warning("UvicornServerRunner.stop result: stopped") + + @property + def port(self) -> int: + result = self._bound_port if self._bound_port is not None else self._port + logger.warning("UvicornServerRunner.port result: %s", result) + return result class ManagementServer: + """Management API endpoints and callback adapters.""" + def __init__( self, host: str, @@ -30,17 +143,18 @@ class ManagementServer: on_start: Optional[Callable[[], Awaitable[str]]] = None, on_stop: Optional[Callable[[], Awaitable[str]]] = None, ): - """Настройка параметров и колбэков лёгкого HTTP management-сервера.""" - self._host = host - self._port = port self._timeout = timeout self._health_provider = health_provider self._on_start = on_start self._on_stop = on_stop - self._uvicorn_server: Optional[Server] = None - self._serve_task: Optional[asyncio.Task[None]] = None - self._bound_port: Optional[int] = None + self._runner = UvicornServerRunner(host=host, port=port, timeout=timeout) self._app = self._create_app() + logger.warning( + "ManagementServer.__init__ result: host=%s port=%s timeout=%s", + host, + port, + timeout, + ) def _create_app(self) -> FastAPI: app = FastAPI(title="Config Manager Management API") @@ -59,28 +173,38 @@ class ManagementServer: async def action_stop() -> JSONResponse: return await self._action_response("stop", self._on_stop) + logger.warning( + "ManagementServer._create_app result: routes=%s,%s,%s", + PATH_HEALTH, + PATH_ACTION_START, + PATH_ACTION_STOP, + ) return app async def _health_response(self) -> JSONResponse: - """Сформировать HTTP-ответ из колбэка здоровья приложения.""" try: payload = await asyncio.wait_for(self._health_provider(), timeout=self._timeout) - status = payload.get("status", "unhealthy") - status_code = 200 if status == "ok" else 503 + status_code = 200 if payload.get("status", "unhealthy") == "ok" else 503 + logger.warning( + "ManagementServer._health_response result: status_code=%s payload=%s", + status_code, + payload, + ) return JSONResponse(content=payload, status_code=status_code) except Exception as exc: # noqa: BLE001 - return JSONResponse( - content={"status": "unhealthy", "detail": str(exc)}, - status_code=503, - ) + logger.exception("ManagementServer._health_response error") + return JSONResponse(content={"status": "unhealthy", "detail": str(exc)}, status_code=503) async def _action_response( self, action: str, callback: Optional[Callable[[], Awaitable[str]]], ) -> JSONResponse: - """Сформировать HTTP-ответ для колбэка действия start/stop.""" if callback is None: + logger.warning( + "ManagementServer._action_response result: action=%s status_code=404 detail=handler not configured", + action, + ) return JSONResponse( content={"status": "error", "detail": f"{action} handler is not configured"}, status_code=404, @@ -89,57 +213,52 @@ class ManagementServer: detail = await callback() if not detail: detail = f"{action} action accepted" + logger.warning( + "ManagementServer._action_response result: action=%s status_code=200 detail=%s", + action, + detail, + ) return JSONResponse(content={"status": "ok", "detail": detail}, status_code=200) except Exception as exc: # noqa: BLE001 - return JSONResponse( - content={"status": "error", "detail": str(exc)}, - status_code=500, - ) + logger.exception("ManagementServer._action_response error: action=%s", action) + return JSONResponse(content={"status": "error", "detail": str(exc)}, status_code=500) def _build_health_response(self) -> Awaitable[tuple[int, HealthPayload]]: - """Для тестов: вернуть (status_code, payload) как раньше.""" async def _run() -> tuple[int, HealthPayload]: response = await self._health_response() body: Any = response.body if isinstance(body, bytes): body = json.loads(body.decode("utf-8")) + logger.warning( + "ManagementServer._build_health_response result: status_code=%s payload=%s", + response.status_code, + body, + ) return response.status_code, body + return _run() async def start(self) -> None: - """Начать приём запросов к API здоровья и действий, если ещё не запущен.""" - if self._serve_task is not None: - return - config = Config( - app=self._app, - host=self._host, - port=self._port, - log_level="warning", - ) - self._uvicorn_server = Server(config) - self._serve_task = asyncio.create_task(self._uvicorn_server.serve()) - await asyncio.sleep(0.05) - if self._uvicorn_server.servers: - sock = self._uvicorn_server.servers[0].sockets[0] - self._bound_port = sock.getsockname()[1] - else: - self._bound_port = self._port + try: + await self._runner.start(self._app) + logger.warning("ManagementServer.start result: started") + except Exception: # noqa: BLE001 + logger.exception("ManagementServer.start error") + raise async def stop(self) -> None: - """Остановить management-сервер и освободить сокет.""" - if self._uvicorn_server is None or self._serve_task is None: - return - self._uvicorn_server.should_exit = True - await self._serve_task - self._uvicorn_server = None - self._serve_task = None - self._bound_port = None + try: + await self._runner.stop() + logger.warning("ManagementServer.stop result: stopped") + except BaseException: # noqa: BLE001 + logger.exception("ManagementServer.stop error") + raise @property def port(self) -> int: - """Порт, на котором слушает сервер (после start); при port=0 — фактически выданный ОС.""" - return self._bound_port if self._bound_port is not None else self._port + result = self._runner.port + logger.warning("ManagementServer.port result: %s", result) + return result -# Backward-compatible alias. HealthServer = ManagementServer