Рефакторинг и добавил апишку для управления

This commit is contained in:
2026-02-21 15:14:34 +03:00
parent d888ae7acb
commit 2d6179d366
4 changed files with 88 additions and 42 deletions

View File

@@ -5,6 +5,7 @@ from __future__ import annotations
import asyncio
import logging
import time
from typing import Any, Optional
from ...v1.log_manager import LogManager
@@ -29,7 +30,6 @@ class ConfigManagerV2:
path: str,
log_manager: Optional[LogManager] = None,
management_settings: Optional[ManagementServerSettings] = None,
health_settings: Optional[ManagementServerSettings] = None,
control_channel: Optional[ControlChannel] = None,
):
"""Инициализация подсистем менеджера и состояния рантайма."""
@@ -48,14 +48,15 @@ class ConfigManagerV2:
self._state = LifecycleState.IDLE
self._last_execute_error: Optional[str] = None
self._last_success_timestamp: Optional[float] = None
if management_settings is not None and health_settings is not None:
raise ValueError("Use either management_settings or health_settings, not both")
self._management_settings = management_settings or health_settings or ManagementServerSettings(enabled=True)
self._management_settings = management_settings or ManagementServerSettings(enabled=True)
self._health_timeout = self._management_settings.health_timeout
self._health_aggregator = HealthAggregator(
get_state=lambda: self._state,
get_last_error=lambda: self._last_execute_error,
get_last_success_timestamp=lambda: self._last_success_timestamp,
health_timeout=self._health_timeout,
get_app_health=self.get_health_status,
)
self._api_bridge = ManagementApiBridge(start_fn=self.start, stop_fn=self.stop)
@@ -104,11 +105,20 @@ class ConfigManagerV2:
"""Переопределить в подклассе для реализации одной единицы блокирующей работы."""
def get_health_status(self) -> HealthPayload:
"""Вернуть payload здоровья приложения для /health."""
"""Вернуть payload здоровья приложения для /health.
Варианты ответа по статусу:
- ``{"status": "ok"}`` — сервис в норме; GET /health → 200.
- ``{"status": "degraded", "detail": "..."}`` — работает с ограничениями; GET /health → 503.
- ``{"status": "unhealthy", "detail": "..."}`` — неработоспособен; GET /health → 503.
Поле ``detail`` опционально; для ``ok`` обычно не задаётся.
Переопределить в подклассе для своей логики здоровья."""
return {"status": "ok"}
def _on_execute_success(self) -> None:
"""Сбросить маркер последней ошибки выполнения после успешного запуска."""
"""Обновить время последнего успешного execute() и сбросить маркер ошибки."""
self._last_success_timestamp = time.monotonic()
self._last_execute_error = None
def _on_execute_error(self, exc: Exception) -> None:
@@ -188,14 +198,13 @@ class ConfigManagerV2:
for task in tasks:
task.cancel()
await asyncio.gather(*tasks, return_exceptions=True)
await self._stop_control_channel()
if self._management_server is not None:
await self._management_server.stop()
# Management-сервер и control channel не останавливаем: API и канал управления остаются доступными.
self._state = LifecycleState.STOPPED
self.logger.info("ConfigManagerV2 stopped")
self._task = None
self.logger.info("ConfigManagerV2 stopped (API and control channel remain available)")
async def start(self) -> None:
"""Запустить жизненный цикл менеджера из активного asyncio-контекста."""
"""Запустить циклы execute и конфига в фоне; возвращает управление сразу (ответ на /actions/start приходит без ожидания)."""
if self._task is not None and not self._task.done():
self.logger.warning("ConfigManagerV2 is already running")
return
@@ -207,10 +216,6 @@ class ConfigManagerV2:
raise
self._task = asyncio.create_task(self._run(), name="config-manager-v2")
try:
await self._task
finally:
self._task = None
async def stop(self) -> None:
"""Запросить плавную остановку и дождаться завершения менеджера."""

View File

@@ -1,38 +1,53 @@
"""Собирает состояние жизненного цикла и здоровья приложения в один ответ для /health.
"""Собирает состояние жизненного цикла и здоровья в один ответ для /health.
Учитывает состояние (running/stopping), последнюю ошибку execute и результат get_health_status()."""
Здоровье = был успешный execute() за последние health_timeout секунд; иначе unhealthy с деталью (ошибка или таймаут)."""
from __future__ import annotations
import time
from collections.abc import Callable
from ..types import HealthPayload, LifecycleState
class HealthAggregator:
"""Формирует ответ здоровья из текущего состояния, последней ошибки и здоровья приложения."""
"""Формирует ответ здоровья по времени последнего успешного execute() и таймауту."""
def __init__(
self,
get_state: Callable[[], LifecycleState],
get_last_error: Callable[[], str | None],
get_last_success_timestamp: Callable[[], float | None],
health_timeout: float,
get_app_health: Callable[[], HealthPayload],
):
self._get_state = get_state
self._get_last_error = get_last_error
self._get_last_success_timestamp = get_last_success_timestamp
self._health_timeout = health_timeout
self._get_app_health = get_app_health
async def collect(self) -> HealthPayload:
"""Вернуть агрегированное здоровье: unhealthy при не running или ошибке, иначе здоровье приложения."""
"""Вернуть ok, если был успешный execute() за последние health_timeout сек; иначе unhealthy. Всегда добавляем state."""
state = self._get_state()
if state not in {LifecycleState.RUNNING, LifecycleState.STOPPING}:
return {"status": "unhealthy", "detail": f"state={state.value}"}
state_value = state.value
last_error = self._get_last_error()
if last_error is not None:
return {"status": "unhealthy", "detail": last_error}
# Только при state=RUNNING возможен status=ok; при остановке (STOPPING/STOPPED) сразу unhealthy.
if state != LifecycleState.RUNNING:
return {"status": "unhealthy", "detail": f"state={state_value}", "state": state_value}
last_success = self._get_last_success_timestamp()
now = time.monotonic()
if last_success is None:
detail = self._get_last_error() or "no successful run yet"
return {"status": "unhealthy", "detail": detail, "state": state_value}
if (now - last_success) > self._health_timeout:
detail = self._get_last_error() or f"no successful run within {self._health_timeout}s"
return {"status": "unhealthy", "detail": detail, "state": state_value}
result = self._get_app_health()
status = result.get("status", "unhealthy")
if status not in {"ok", "degraded", "unhealthy"}:
return {"status": "unhealthy", "detail": "invalid health status"}
return result
if status != "ok":
return {"status": "unhealthy", "detail": result.get("detail", "app reported non-ok"), "state": state_value}
return {**result, "state": state_value}

View File

@@ -14,6 +14,8 @@ HealthState = Literal["ok", "degraded", "unhealthy"]
class HealthPayload(TypedDict, total=False):
status: HealthState
detail: str
state: str
"""Текущее состояние жизненного цикла (idle/starting/running/stopping/stopped)."""
class LifecycleState(str, Enum):
@@ -26,10 +28,14 @@ class LifecycleState(str, Enum):
@dataclass
class ManagementServerSettings:
"""Настройки management HTTP-сервера и healthcheck (один объект на оба)."""
enabled: bool = False
host: str = "0.0.0.0"
port: int = 8000
timeout: float = 3.0
"""Таймаут запроса health (секунды)."""
health_timeout: float = 30.0
"""Секунды без успешного execute(), после которых health = unhealthy."""
# Backward-compatible alias.

View File

@@ -1,34 +1,54 @@
# import os
# os.chdir(os.path.dirname(__file__))
from config_manager import ConfigManager
import logging
import asyncio
from typing import Optional
import logging
from pathlib import Path
from config_manager import ConfigManager
from config_manager.v1.log_manager import LogManager
from config_manager.v2 import ManagementServerSettings
logger = logging.getLogger()
# Таймаут health: без успешного execute() дольше этого времени — unhealthy.
HEALTH_TIMEOUT = 3.0
class MyApp(ConfigManager):
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.iter = 0
def execute(self) -> None:
logger.info(f"current iteration {self.iter}")
"""Успешный прогон сбрасывает таймер health (обновляет время последнего успеха)."""
logger.info("current iteration %s", self.iter)
self.iter += 1
async def main():
app = MyApp("config.yaml")
logger.info("App started")
await app.start()
logger.info("App finished")
async def main() -> None:
log_manager = LogManager()
# Один объект: и HTTP management-сервер (enabled, port), и health (health_timeout).
management_settings = ManagementServerSettings(
enabled=True,
port=8000,
health_timeout=HEALTH_TIMEOUT,
)
config_path = Path(__file__).parent / "config.yaml"
app = MyApp(
str(config_path),
log_manager=log_manager,
management_settings=management_settings,
)
logger.info("App starting (health_timeout=%s)", HEALTH_TIMEOUT)
# Менеджер запускаем в фоне (start() не возвращает управление до stop).
asyncio.create_task(app.start())
logger.info("App running; Ctrl+C to stop")
while True:
await asyncio.sleep(1)
if __name__ == "__main__":
asyncio.run(main())
if __name__ == "__main__":
logging.basicConfig(level=logging.INFO)
asyncio.run(main())