diff --git a/src/config_manager/v2/core/manager.py b/src/config_manager/v2/core/manager.py index f2b3400..350ab94 100644 --- a/src/config_manager/v2/core/manager.py +++ b/src/config_manager/v2/core/manager.py @@ -5,6 +5,7 @@ from __future__ import annotations import asyncio import logging +import time from typing import Any, Optional from ...v1.log_manager import LogManager @@ -29,7 +30,6 @@ class ConfigManagerV2: path: str, log_manager: Optional[LogManager] = None, management_settings: Optional[ManagementServerSettings] = None, - health_settings: Optional[ManagementServerSettings] = None, control_channel: Optional[ControlChannel] = None, ): """Инициализация подсистем менеджера и состояния рантайма.""" @@ -48,14 +48,15 @@ class ConfigManagerV2: self._state = LifecycleState.IDLE self._last_execute_error: Optional[str] = None + self._last_success_timestamp: Optional[float] = None - if management_settings is not None and health_settings is not None: - raise ValueError("Use either management_settings or health_settings, not both") - - self._management_settings = management_settings or health_settings or ManagementServerSettings(enabled=True) + self._management_settings = management_settings or ManagementServerSettings(enabled=True) + self._health_timeout = self._management_settings.health_timeout self._health_aggregator = HealthAggregator( get_state=lambda: self._state, get_last_error=lambda: self._last_execute_error, + get_last_success_timestamp=lambda: self._last_success_timestamp, + health_timeout=self._health_timeout, get_app_health=self.get_health_status, ) self._api_bridge = ManagementApiBridge(start_fn=self.start, stop_fn=self.stop) @@ -104,11 +105,20 @@ class ConfigManagerV2: """Переопределить в подклассе для реализации одной единицы блокирующей работы.""" def get_health_status(self) -> HealthPayload: - """Вернуть payload здоровья приложения для /health.""" + """Вернуть payload здоровья приложения для /health. + + Варианты ответа по статусу: + - ``{"status": "ok"}`` — сервис в норме; GET /health → 200. + - ``{"status": "degraded", "detail": "..."}`` — работает с ограничениями; GET /health → 503. + - ``{"status": "unhealthy", "detail": "..."}`` — неработоспособен; GET /health → 503. + + Поле ``detail`` опционально; для ``ok`` обычно не задаётся. + Переопределить в подклассе для своей логики здоровья.""" return {"status": "ok"} def _on_execute_success(self) -> None: - """Сбросить маркер последней ошибки выполнения после успешного запуска.""" + """Обновить время последнего успешного execute() и сбросить маркер ошибки.""" + self._last_success_timestamp = time.monotonic() self._last_execute_error = None def _on_execute_error(self, exc: Exception) -> None: @@ -188,14 +198,13 @@ class ConfigManagerV2: for task in tasks: task.cancel() await asyncio.gather(*tasks, return_exceptions=True) - await self._stop_control_channel() - if self._management_server is not None: - await self._management_server.stop() + # Management-сервер и control channel не останавливаем: API и канал управления остаются доступными. self._state = LifecycleState.STOPPED - self.logger.info("ConfigManagerV2 stopped") + self._task = None + self.logger.info("ConfigManagerV2 stopped (API and control channel remain available)") async def start(self) -> None: - """Запустить жизненный цикл менеджера из активного asyncio-контекста.""" + """Запустить циклы execute и конфига в фоне; возвращает управление сразу (ответ на /actions/start приходит без ожидания).""" if self._task is not None and not self._task.done(): self.logger.warning("ConfigManagerV2 is already running") return @@ -207,10 +216,6 @@ class ConfigManagerV2: raise self._task = asyncio.create_task(self._run(), name="config-manager-v2") - try: - await self._task - finally: - self._task = None async def stop(self) -> None: """Запросить плавную остановку и дождаться завершения менеджера.""" diff --git a/src/config_manager/v2/management/health_aggregator.py b/src/config_manager/v2/management/health_aggregator.py index 1b34846..6bfbb1b 100644 --- a/src/config_manager/v2/management/health_aggregator.py +++ b/src/config_manager/v2/management/health_aggregator.py @@ -1,38 +1,53 @@ -"""Собирает состояние жизненного цикла и здоровья приложения в один ответ для /health. +"""Собирает состояние жизненного цикла и здоровья в один ответ для /health. -Учитывает состояние (running/stopping), последнюю ошибку execute и результат get_health_status().""" +Здоровье = был успешный execute() за последние health_timeout секунд; иначе unhealthy с деталью (ошибка или таймаут).""" from __future__ import annotations +import time from collections.abc import Callable from ..types import HealthPayload, LifecycleState class HealthAggregator: - """Формирует ответ здоровья из текущего состояния, последней ошибки и здоровья приложения.""" + """Формирует ответ здоровья по времени последнего успешного execute() и таймауту.""" def __init__( self, get_state: Callable[[], LifecycleState], get_last_error: Callable[[], str | None], + get_last_success_timestamp: Callable[[], float | None], + health_timeout: float, get_app_health: Callable[[], HealthPayload], ): self._get_state = get_state self._get_last_error = get_last_error + self._get_last_success_timestamp = get_last_success_timestamp + self._health_timeout = health_timeout self._get_app_health = get_app_health async def collect(self) -> HealthPayload: - """Вернуть агрегированное здоровье: unhealthy при не running или ошибке, иначе здоровье приложения.""" + """Вернуть ok, если был успешный execute() за последние health_timeout сек; иначе unhealthy. Всегда добавляем state.""" state = self._get_state() - if state not in {LifecycleState.RUNNING, LifecycleState.STOPPING}: - return {"status": "unhealthy", "detail": f"state={state.value}"} + state_value = state.value - last_error = self._get_last_error() - if last_error is not None: - return {"status": "unhealthy", "detail": last_error} + # Только при state=RUNNING возможен status=ok; при остановке (STOPPING/STOPPED) сразу unhealthy. + if state != LifecycleState.RUNNING: + return {"status": "unhealthy", "detail": f"state={state_value}", "state": state_value} + + last_success = self._get_last_success_timestamp() + now = time.monotonic() + + if last_success is None: + detail = self._get_last_error() or "no successful run yet" + return {"status": "unhealthy", "detail": detail, "state": state_value} + + if (now - last_success) > self._health_timeout: + detail = self._get_last_error() or f"no successful run within {self._health_timeout}s" + return {"status": "unhealthy", "detail": detail, "state": state_value} result = self._get_app_health() status = result.get("status", "unhealthy") - if status not in {"ok", "degraded", "unhealthy"}: - return {"status": "unhealthy", "detail": "invalid health status"} - return result + if status != "ok": + return {"status": "unhealthy", "detail": result.get("detail", "app reported non-ok"), "state": state_value} + return {**result, "state": state_value} diff --git a/src/config_manager/v2/types.py b/src/config_manager/v2/types.py index 531ffc3..7e528c0 100644 --- a/src/config_manager/v2/types.py +++ b/src/config_manager/v2/types.py @@ -14,6 +14,8 @@ HealthState = Literal["ok", "degraded", "unhealthy"] class HealthPayload(TypedDict, total=False): status: HealthState detail: str + state: str + """Текущее состояние жизненного цикла (idle/starting/running/stopping/stopped).""" class LifecycleState(str, Enum): @@ -26,10 +28,14 @@ class LifecycleState(str, Enum): @dataclass class ManagementServerSettings: + """Настройки management HTTP-сервера и healthcheck (один объект на оба).""" enabled: bool = False host: str = "0.0.0.0" port: int = 8000 timeout: float = 3.0 + """Таймаут запроса health (секунды).""" + health_timeout: float = 30.0 + """Секунды без успешного execute(), после которых health = unhealthy.""" # Backward-compatible alias. diff --git a/tests/test_app.py b/tests/test_app.py index a2eee25..8dce81b 100644 --- a/tests/test_app.py +++ b/tests/test_app.py @@ -1,34 +1,54 @@ -#import os -#os.chdir(os.path.dirname(__file__)) +# import os +# os.chdir(os.path.dirname(__file__)) + +import asyncio +import logging +from pathlib import Path from config_manager import ConfigManager -import logging -import asyncio -from typing import Optional - - +from config_manager.v1.log_manager import LogManager +from config_manager.v2 import ManagementServerSettings logger = logging.getLogger() +# Таймаут health: без успешного execute() дольше этого времени — unhealthy. +HEALTH_TIMEOUT = 3.0 + + class MyApp(ConfigManager): def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) self.iter = 0 def execute(self) -> None: - logger.info(f"current iteration {self.iter}") + """Успешный прогон сбрасывает таймер health (обновляет время последнего успеха).""" + logger.info("current iteration %s", self.iter) self.iter += 1 -async def main(): - app = MyApp("config.yaml") - logger.info("App started") - await app.start() - logger.info("App finished") +async def main() -> None: + log_manager = LogManager() + # Один объект: и HTTP management-сервер (enabled, port), и health (health_timeout). + management_settings = ManagementServerSettings( + enabled=True, + port=8000, + health_timeout=HEALTH_TIMEOUT, + ) + config_path = Path(__file__).parent / "config.yaml" + app = MyApp( + str(config_path), + log_manager=log_manager, + management_settings=management_settings, + ) + logger.info("App starting (health_timeout=%s)", HEALTH_TIMEOUT) + # Менеджер запускаем в фоне (start() не возвращает управление до stop). + asyncio.create_task(app.start()) + logger.info("App running; Ctrl+C to stop") while True: await asyncio.sleep(1) + if __name__ == "__main__": + logging.basicConfig(level=logging.INFO) asyncio.run(main()) - \ No newline at end of file