diff --git a/README.md b/README.md index 01db3b9..32fccd8 100644 --- a/README.md +++ b/README.md @@ -1,11 +1,12 @@ # Config Manager ## Описание -Пакет предназначен для запуска приложений. -Класс ConfigManager реализует точку входа программы и предоставляет актуальную конфигурацию приложения, а также упрощает настройку логирования. +Пакет предназначен для запуска приложений с периодическим выполнением логики, перезагрузкой конфига и управлением по HTTP API. -## ConfigManager v2: устройство и взаимосвязи +**Контракт:** приложение наследует **ConfigManagerV2**, переопределяет **execute()** (периодическая работа). Управление (старт/стоп, health) — через каналы, которые создаются снаружи и передаются в конструктор в **control_channels** (в т.ч. HttpControlChannel для API). -**ConfigManager v2** — точка входа приложения. Он наследует внутреннюю логику от **\_RuntimeController** (циклы воркера и обновления конфига, запуск/остановка каналов управления). +## ConfigManager: устройство и взаимосвязи + +**ConfigManager** (класс ConfigManagerV2) — точка входа приложения. Он наследует внутреннюю логику от **\_RuntimeController** (циклы воркера и обновления конфига, запуск/остановка каналов управления). **Ядро (core):** - **ConfigLoader** — читает конфиг из файла (YAML/JSON), считает хеш и отдаёт конфиг только при изменении; при ошибке парсинга возвращает последний валидный конфиг. @@ -19,29 +20,72 @@ - **HttpControlChannel** — HTTP API (`/health`, `/actions/start`, `/actions/stop`, `/actions/status`); использует **UvicornServerRunner**; для `/health` вызывает **HealthAggregator.collect()**, для действий — переданные обработчики из **ControlChannelBridge**. - **TelegramControlChannel** — реализация через long polling Telegram; команды `/start`, `/stop`, `/status` вызывают переданные обработчики. -**Поток работы:** при `start()` менеджер собирает список каналов: при `management.enabled: true` в **config.yaml** (секция `management`) добавляется **HttpControlChannel**, плюс опционально **control_channel** / **control_channels** в конструкторе. Все каналы поднимаются с одним **ControlChannelBridge**, затем запускаются два цикла: **WorkerLoop** и периодическое обновление конфига через **ConfigLoader**. Остановка по halt (через любой канал) завершает оба цикла; в конце останавливаются все каналы. Настройки HTTP-канала (host, port, timeout, health_timeout) задаются в config.yaml в секции `management`. +**Поток работы:** при `start()` менеджер поднимает каналы из **control_channels** (заданные снаружи), затем запускает два цикла: **WorkerLoop** и периодическое обновление конфига через **ConfigLoader**. Управление по API: `/health`, `/actions/start`, `/actions/stop` — если в control_channels передан **HttpControlChannel**. Остановка по halt завершает оба цикла; в конце останавливаются все каналы. -## Диаграмма классов (v1 и v2) +## Запуск приложения с ConfigManagerV2 и HttpControlChannel + +1. **Наследуйте ConfigManagerV2** и реализуйте метод `execute()` (в нём — ваша периодическая работа). При необходимости переопределите `get_health_status()` для кастомного ответа `/health`. + +2. **Создайте каналы снаружи и передайте в конструктор.** Для HTTP API создайте **HttpControlChannel**; для health нужен колбэк менеджера — передайте **control_channels** как фабрику (lambda, получающую менеджер): + ```python + from config_manager.v2.control import HttpControlChannel + + app = MyApp( + str(path_to_config), + control_channels=lambda m: [ + HttpControlChannel( + host="0.0.0.0", + port=8000, + timeout=3, + health_provider=m.get_health_provider(), + ) + ], + ) + ``` + Либо передайте готовый список каналов: `control_channels=[channel1, channel2]`. + +3. **Запустите из async-контекста:** `await app.start()` или `asyncio.create_task(app.start())` для фона. Остановка: `await app.stop()` или запрос `/actions/stop` по HTTP. + +**Минимальный пример с HTTP API:** + +```python +import asyncio +import logging +from pathlib import Path + +from config_manager import ConfigManager +from config_manager.v2.control import HttpControlChannel + +class MyApp(ConfigManager): + def execute(self) -> None: + pass # ваша периодическая работа + +async def main() -> None: + app = MyApp( + str(Path(__file__).parent / "config.yaml"), + control_channels=lambda m: [ + HttpControlChannel( + host="0.0.0.0", port=8000, timeout=3, + health_provider=m.get_health_provider(), + ) + ], + ) + asyncio.create_task(app.start()) + await asyncio.sleep(3600) + +if __name__ == "__main__": + logging.basicConfig(level=logging.INFO) + asyncio.run(main()) +``` + +Готовый пример: `tests/test_app.py`. + +## Диаграмма классов ```mermaid classDiagram direction TB - class ConfigManager { - +str path - +Any config - +float update_interval - +float work_interval - -Event _halt - -Task _task - +start() async - +stop() async - +execute()* - -_worker_loop() async - -_periodic_update_loop() async - -_update_config() async - } - class ConfigManagerV2 { +str path +Any config @@ -135,7 +179,6 @@ classDiagram +int port } - ConfigManager --> LogManager : использует ConfigManagerV2 --|> _RuntimeController : наследует ConfigManagerV2 --> ConfigLoader : использует ConfigManagerV2 --> LogManager : использует @@ -150,7 +193,7 @@ classDiagram ControlChannelBridge ..> ControlChannel : on_start, on_stop, on_status ``` -## Логирование (v2) +## Логирование Логирование настраивается из конфигурационного файла только если в нём есть секция **`log`** в формате [dictConfig](https://docs.python.org/3/library/logging.config.html#logging.config.dictConfig). Если секции `log` нет, менеджер пишет предупреждение в лог, а уровень Python по умолчанию (WARNING) сохраняется — сообщения INFO/DEBUG могут не отображаться. **Как проверить, что конфигурация логирования применилась:** diff --git a/src/config_manager/__init__.py b/src/config_manager/__init__.py index 143ebe5..46c4f50 100644 --- a/src/config_manager/__init__.py +++ b/src/config_manager/__init__.py @@ -1,3 +1,2 @@ from .v2 import ConfigManagerV2 as ConfigManager -from .v1.cfg_manager import ConfigManager as LegacyConfigManager from .v2.core.log_manager import LogManager \ No newline at end of file diff --git a/src/config_manager/v1/cfg_manager.py b/src/config_manager/v1/cfg_manager.py deleted file mode 100644 index f680b56..0000000 --- a/src/config_manager/v1/cfg_manager.py +++ /dev/null @@ -1,141 +0,0 @@ -import logging -import logging.config -import asyncio -import json -import yaml -import os -from typing import Any, Optional - -from ..v2.core.log_manager import LogManager - -class ConfigManager: - DEFAULT_UPDATE_INTERVAL = 5 - DEFAULT_WORK_INTERVAL = 2 - - def __init__(self, path: str, log_manager: Optional[LogManager] = None): - self.path = path - self.config: Any = None - self._last_hash = None - self.update_interval = self.DEFAULT_UPDATE_INTERVAL - self.work_interval = self.DEFAULT_WORK_INTERVAL - self._halt = asyncio.Event() - self._task: Optional[asyncio.Task] = None - self._loop: Optional[asyncio.AbstractEventLoop] = None - - self._log_manager = log_manager or LogManager() - self.logger = logging.getLogger(__name__) - - def _read_file_sync(self) -> str: - with open(self.path, "r", encoding="utf-8") as f: - return f.read() - - async def _read_file_async(self) -> str: - return await asyncio.to_thread(self._read_file_sync) - - def _parse_config(self, data) -> Any: - extension = os.path.splitext(self.path)[1].lower() - if extension in (".yaml", ".yml"): - return yaml.safe_load(data) - else: - return json.loads(data) - - def _update_intervals_from_config(self) -> None: - if not self.config: - return - upd = self.config.get("update_interval") - wrk = self.config.get("work_interval") - - if isinstance(upd, (int, float)) and upd > 0: - self.update_interval = float(upd) - self.logger.info(f"Update interval set to {self.update_interval} seconds") - else: - self.update_interval = self.DEFAULT_UPDATE_INTERVAL - - if isinstance(wrk, (int, float)) and wrk > 0: - self.work_interval = float(wrk) - self.logger.info(f"Work interval set to {self.work_interval} seconds") - else: - self.work_interval = self.DEFAULT_WORK_INTERVAL - - async def _update_config(self) -> None: - try: - data = await self._read_file_async() - current_hash = hash(data) - if current_hash != self._last_hash: - new_config = self._parse_config(data) - self.config = new_config - self._last_hash = current_hash - - self._log_manager.apply_config(new_config) - self._update_intervals_from_config() - - except Exception as e: - self.logger.error(f"Error reading/parsing config file: {e}") - - def execute(self) -> None: - """ - Метод для переопределения в подклассах. - Здесь может быть блокирующая работа. - Запускается в отдельном потоке. - """ - pass - - async def _worker_loop(self) -> None: - self.logger.warning("Worker loop started") - try: - while not self._halt.is_set(): - await asyncio.to_thread(self.execute) - await asyncio.sleep(self.work_interval) - finally: - self.logger.warning("Worker loop stopped") - - async def _periodic_update_loop(self) -> None: - while not self._halt.is_set(): - await self._update_config() - await asyncio.sleep(self.update_interval) - - async def _run(self) -> None: - """Внутренняя корутина, запускающая все циклы""" - self._halt.clear() - self.logger.info("ConfigManager started") - try: - await asyncio.gather( - self._worker_loop(), - self._periodic_update_loop() - ) - except asyncio.CancelledError: - self.logger.info("ConfigManager tasks cancelled") - finally: - self.logger.info("ConfigManager stopped") - - async def start(self) -> None: - if self._task is not None and not self._task.done(): - self.logger.warning("ConfigManager is already running") - return - - try: - self._loop = asyncio.get_running_loop() - except RuntimeError: - self.logger.error("start() must be called from within an async context") - raise - - self.logger.info("ConfigManager starting and awaiting _run()") - await self._run() - - - async def stop(self) -> None: - """Останавливает менеджер конфигурации и ожидает завершения""" - if self._task is None: - self.logger.warning("ConfigManager is not running") - return - - self.logger.info("ConfigManager stopping...") - self._halt.set() - - try: - await self._task - except asyncio.CancelledError: - pass - - self._task = None - self.logger.info("ConfigManager stopped successfully") diff --git a/src/config_manager/v1/log_manager.py b/src/config_manager/v1/log_manager.py deleted file mode 100644 index 0405069..0000000 --- a/src/config_manager/v1/log_manager.py +++ /dev/null @@ -1,4 +0,0 @@ -"""Обратная совместимость: LogManager перенесён в v2.core.log_manager.""" -from ..v2.core.log_manager import LogManager - -__all__ = ["LogManager"] diff --git a/src/config_manager/v2/__init__.py b/src/config_manager/v2/__init__.py index 50b4043..2eb2bed 100644 --- a/src/config_manager/v2/__init__.py +++ b/src/config_manager/v2/__init__.py @@ -1,7 +1,6 @@ -"""Публичный API V2: точка входа в менеджер конфигурации и настройки HTTP-канала из config.yaml. +"""Публичный API: точка входа в менеджер конфигурации. -Экспортирует ConfigManagerV2 и типы настроек для использования приложениями.""" +Контракт: наследование от ConfigManagerV2, переопределение execute(), управление через API (config.yaml, секция management).""" from .core import ConfigManagerV2 -from .core.types import HealthServerSettings, ManagementServerSettings -__all__ = ["ConfigManagerV2", "ManagementServerSettings", "HealthServerSettings"] +__all__ = ["ConfigManagerV2"] diff --git a/src/config_manager/v2/control/http_channel.py b/src/config_manager/v2/control/http_channel.py index acd0091..9dacd80 100644 --- a/src/config_manager/v2/control/http_channel.py +++ b/src/config_manager/v2/control/http_channel.py @@ -235,7 +235,7 @@ class HttpControlChannel(ControlChannel): status_code=404, ) try: - detail = await callback() + detail = await asyncio.wait_for(callback(), timeout=float(self._timeout)) if not detail: detail = f"{action} action accepted" logger.debug( @@ -244,6 +244,12 @@ class HttpControlChannel(ControlChannel): detail, ) return JSONResponse(content={"status": "ok", "detail": detail}, status_code=200) + except asyncio.TimeoutError: + logger.warning("HttpControlChannel._action_response timeout: action=%s timeout=%s", action, self._timeout) + return JSONResponse( + content={"status": "error", "detail": f"{action} handler did not respond within {self._timeout}s"}, + status_code=504, + ) except Exception as exc: # noqa: BLE001 logger.exception("HttpControlChannel._action_response error: action=%s", action) return JSONResponse(content={"status": "error", "detail": str(exc)}, status_code=500) diff --git a/src/config_manager/v2/core/control_bridge.py b/src/config_manager/v2/core/control_bridge.py index 800750a..dabb55a 100644 --- a/src/config_manager/v2/core/control_bridge.py +++ b/src/config_manager/v2/core/control_bridge.py @@ -3,7 +3,6 @@ Предоставляет обработчики start/stop/status для ControlChannel (halt, state, status text).""" from __future__ import annotations -import asyncio import logging from collections.abc import Awaitable, Callable @@ -17,30 +16,30 @@ class ControlChannelBridge: def __init__( self, - halt: asyncio.Event, get_state: Callable[[], LifecycleState], get_status: Callable[[], Awaitable[str]], + start_runtime: Callable[[], Awaitable[str]], + stop_runtime: Callable[[], Awaitable[str]], ): - self._halt = halt self._get_state = get_state self._get_status = get_status + self._start_runtime = start_runtime + self._stop_runtime = stop_runtime logger.debug("ControlChannelBridge.__init__ result: callbacks configured") async def on_start(self) -> str: - """Обработать внешний start: сбросить halt; идемпотентно при уже running.""" + """Обработать внешний start через lifecycle-метод менеджера.""" if self._get_state() == LifecycleState.RUNNING: result = "already running" logger.debug("ControlChannelBridge.on_start result: %s", result) return result - self._halt.clear() - result = "start signal accepted" + result = await self._start_runtime() logger.debug("ControlChannelBridge.on_start result: %s", result) return result async def on_stop(self) -> str: - """Обработать внешний stop: установить halt.""" - self._halt.set() - result = "stop signal accepted" + """Обработать внешний stop через lifecycle-метод менеджера.""" + result = await self._stop_runtime() logger.debug("ControlChannelBridge.on_stop result: %s", result) return result diff --git a/src/config_manager/v2/core/health_aggregator.py b/src/config_manager/v2/core/health_aggregator.py index 4c861d7..937f61b 100644 --- a/src/config_manager/v2/core/health_aggregator.py +++ b/src/config_manager/v2/core/health_aggregator.py @@ -58,6 +58,10 @@ class HealthAggregator: result = self._get_app_health() status = result.get("status", "unhealthy") + if status == "degraded": + degraded = {"status": "degraded", "detail": result.get("detail", "app degraded"), "state": state_value} + logger.debug("HealthAggregator.collect result: %s", degraded) + return degraded if status != "ok": unhealthy = {"status": "unhealthy", "detail": result.get("detail", "app reported non-ok"), "state": state_value} logger.debug("HealthAggregator.collect result: %s", unhealthy) diff --git a/src/config_manager/v2/core/manager.py b/src/config_manager/v2/core/manager.py index 602e263..2b0cf4e 100644 --- a/src/config_manager/v2/core/manager.py +++ b/src/config_manager/v2/core/manager.py @@ -5,16 +5,16 @@ import asyncio import logging import os import time -from typing import Any, Iterable, Optional +from collections.abc import Callable, Iterable +from typing import Any, Optional, Union -from .log_manager import LogManager from ..control.base import ControlChannel -from ..control.http_channel import HttpControlChannel -from .types import HealthPayload, LifecycleState, ManagementServerSettings, management_settings_from_config from .config_loader import ConfigLoader from .control_bridge import ControlChannelBridge from .health_aggregator import HealthAggregator +from .log_manager import LogManager from .scheduler import WorkerLoop +from .types import HealthPayload, LifecycleState logger = logging.getLogger(__name__) @@ -39,9 +39,27 @@ def _read_env_interval(name: str, default_value: float) -> float: return float(default_value) +def _read_env_optional_float(name: str, default_value: Optional[float]) -> Optional[float]: + """Read optional non-negative float from env (0 or missing => default_value).""" + raw_value = os.environ.get(name) + if raw_value is None: + return default_value + try: + parsed = float(raw_value) + if parsed < 0: + logger.warning("ConfigManagerV2 %s must be >= 0, got %s; using default %s", name, parsed, default_value) + return default_value + return parsed if parsed > 0 else default_value + except Exception: # noqa: BLE001 + logger.exception("ConfigManagerV2 %s parse error: raw_value=%s fallback=%s", name, raw_value, default_value) + return default_value + + class _RuntimeController: """Runtime loops and lifecycle supervision.""" + CONTROL_CHANNEL_TIMEOUT = 5.0 + def _on_execute_success(self) -> None: self._last_success_timestamp = time.monotonic() self._last_execute_error = None @@ -52,10 +70,23 @@ class _RuntimeController: def _on_execute_error(self, exc: Exception) -> None: self._last_execute_error = str(exc) - self.logger.exception("ConfigManagerV2._on_execute_error") - self.logger.debug( - "ConfigManagerV2._on_execute_error result: last_execute_error=%s", + self.logger.error( + "ConfigManagerV2._on_execute_error: %s", self._last_execute_error, + exc_info=(type(exc), exc, exc.__traceback__), + ) + + def _on_worker_degraded_change(self, degraded: bool) -> None: + self._worker_degraded = degraded + self.logger.warning("ConfigManagerV2._on_worker_degraded_change result: degraded=%s", degraded) + + def _on_worker_metrics_change(self, inflight_count: int, timed_out_count: int) -> None: + self._worker_inflight_count = inflight_count + self._worker_timed_out_inflight_count = timed_out_count + self.logger.debug( + "ConfigManagerV2._on_worker_metrics_change result: inflight=%s timed_out_inflight=%s", + inflight_count, + timed_out_count, ) async def _worker_loop(self) -> None: @@ -69,13 +100,13 @@ class _RuntimeController: halt_event=self._halt, on_error=self._on_execute_error, on_success=self._on_execute_success, + execute_timeout=self._execute_timeout, + on_degraded_change=self._on_worker_degraded_change, + on_metrics_change=self._on_worker_metrics_change, ) try: await worker.run() self.logger.debug("ConfigManagerV2._worker_loop result: completed") - except Exception: # noqa: BLE001 - self.logger.exception("ConfigManagerV2._worker_loop error") - raise finally: self.logger.warning("ConfigManagerV2._worker_loop result: stopped") @@ -91,44 +122,119 @@ class _RuntimeController: await asyncio.wait_for(self._halt.wait(), timeout=max(self.update_interval, 0.05)) except asyncio.TimeoutError: continue - except Exception: # noqa: BLE001 - self.logger.exception("ConfigManagerV2._periodic_update_loop error") - raise finally: self.logger.warning("ConfigManagerV2._periodic_update_loop result: stopped") async def _status_text(self) -> str: health = await self._health_aggregator.collect() detail = health.get("detail") + worker_tail = ( + f"worker_inflight={self._worker_inflight_count}; " + f"worker_timed_out_inflight={self._worker_timed_out_inflight_count}" + ) if detail: - status_text = f"state={self._state.value}; health={health['status']}; detail={detail}" + status_text = f"state={self._state.value}; health={health['status']}; detail={detail}; {worker_tail}" self.logger.debug("ConfigManagerV2._status_text result: %s", status_text) return status_text - status_text = f"state={self._state.value}; health={health['status']}" + status_text = f"state={self._state.value}; health={health['status']}; {worker_tail}" self.logger.debug("ConfigManagerV2._status_text result: %s", status_text) return status_text async def _start_control_channels(self) -> None: for channel in self._control_channels: try: - await channel.start( - self._control_bridge.on_start, - self._control_bridge.on_stop, - self._control_bridge.on_status, + await asyncio.wait_for( + channel.start( + self._control_bridge.on_start, + self._control_bridge.on_stop, + self._control_bridge.on_status, + ), + timeout=self.CONTROL_CHANNEL_TIMEOUT, ) self.logger.debug("ConfigManagerV2._start_control_channels result: started channel=%s", type(channel).__name__) + except asyncio.TimeoutError: + self.logger.error( + "ConfigManagerV2._start_control_channels timeout channel=%s timeout=%ss", + type(channel).__name__, + self.CONTROL_CHANNEL_TIMEOUT, + ) except Exception: # noqa: BLE001 self.logger.exception("ConfigManagerV2._start_control_channels error channel=%s", type(channel).__name__) async def _stop_control_channels(self) -> None: for channel in self._control_channels: try: - await channel.stop() + await asyncio.wait_for(channel.stop(), timeout=self.CONTROL_CHANNEL_TIMEOUT) self.logger.debug("ConfigManagerV2._stop_control_channels result: stopped channel=%s", type(channel).__name__) + except asyncio.TimeoutError: + self.logger.error( + "ConfigManagerV2._stop_control_channels timeout channel=%s timeout=%ss", + type(channel).__name__, + self.CONTROL_CHANNEL_TIMEOUT, + ) except Exception: # noqa: BLE001 self.logger.exception("ConfigManagerV2._stop_control_channels error channel=%s", type(channel).__name__) - def _on_runtime_task_done(self, task: asyncio.Task) -> None: + async def _run_runtime_loops(self) -> None: + self._state = LifecycleState.RUNNING + self.logger.debug("ConfigManagerV2._run_runtime_loops result: state=%s", self._state.value) + tasks = [ + asyncio.create_task(self._worker_loop(), name="v2-worker-loop"), + asyncio.create_task(self._periodic_update_loop(), name="v2-config-loop"), + ] + try: + await asyncio.gather(*tasks) + self.logger.debug("ConfigManagerV2._run_runtime_loops result: loops completed") + except asyncio.CancelledError: + self.logger.debug("ConfigManagerV2._run_runtime_loops result: cancelled") + raise + except Exception: # noqa: BLE001 + self.logger.exception("ConfigManagerV2._run_runtime_loops error") + raise + finally: + for task in tasks: + task.cancel() + await asyncio.gather(*tasks, return_exceptions=True) + self._state = LifecycleState.STOPPED + self.logger.debug("ConfigManagerV2._run_runtime_loops result: state=%s", self._state.value) + + async def _start_runtime(self) -> str: + if self._shutdown.is_set(): + result = "manager is shutting down" + self.logger.debug("ConfigManagerV2._start_runtime result: %s", result) + return result + + async with self._runtime_lock: + if self._runtime_task is not None and not self._runtime_task.done(): + result = "already running" + self.logger.debug("ConfigManagerV2._start_runtime result: %s", result) + return result + self._halt.clear() + self._state = LifecycleState.STARTING + self._runtime_task = asyncio.create_task(self._run_runtime_loops(), name="config-manager-v2-runtime") + self._runtime_task.add_done_callback(self._on_runtime_task_done) + + result = "start signal accepted" + self.logger.debug("ConfigManagerV2._start_runtime result: %s", result) + return result + + async def _stop_runtime(self) -> str: + self._halt.set() + async with self._runtime_lock: + runtime_task = self._runtime_task + if runtime_task is None: + result = "already stopped" + self.logger.debug("ConfigManagerV2._stop_runtime result: %s", result) + return result + if runtime_task.done(): + result = "already stopped" + self.logger.debug("ConfigManagerV2._stop_runtime result: %s", result) + return result + result = "stop signal accepted" + self.logger.debug("ConfigManagerV2._stop_runtime result: %s", result) + return result + + def _on_runtime_task_done(self, task: asyncio.Task[None]) -> None: if task.cancelled(): self.logger.debug("ConfigManagerV2._on_runtime_task_done result: cancelled") return @@ -141,82 +247,83 @@ class _RuntimeController: self.logger.debug("ConfigManagerV2._on_runtime_task_done result: completed") return self.logger.error( - "ConfigManagerV2 background task failed", + "ConfigManagerV2 runtime task failed", exc_info=(type(exc), exc, exc.__traceback__), ) - self.logger.debug("ConfigManagerV2._on_runtime_task_done result: failed") async def _run(self) -> None: + self._shutdown.clear() + self._halt.clear() self._state = LifecycleState.STARTING self.logger.debug("ConfigManagerV2._run result: state=%s", self._state.value) - self._halt.clear() await self._update_config() - await self._start_control_channels() + await self._start_runtime() - self._state = LifecycleState.RUNNING - self.logger.debug("ConfigManagerV2._run result: state=%s", self._state.value) - tasks = [ - asyncio.create_task(self._worker_loop(), name="v2-worker-loop"), - asyncio.create_task(self._periodic_update_loop(), name="v2-config-loop"), - ] try: - await asyncio.gather(*tasks) - self.logger.debug("ConfigManagerV2._run result: background loops completed") - except asyncio.CancelledError: - self.logger.debug("ConfigManagerV2._run result: cancelled") - raise - except Exception: # noqa: BLE001 - self.logger.exception("ConfigManagerV2._run error") - raise + await self._shutdown.wait() finally: self._state = LifecycleState.STOPPING self.logger.debug("ConfigManagerV2._run result: state=%s", self._state.value) self._halt.set() - for task in tasks: - task.cancel() - await asyncio.gather(*tasks, return_exceptions=True) + + async with self._runtime_lock: + runtime_task = self._runtime_task + if runtime_task is not None: + try: + await runtime_task + except asyncio.CancelledError: + self.logger.debug("ConfigManagerV2._run result: runtime task cancelled") + except Exception: # noqa: BLE001 + self.logger.exception("ConfigManagerV2._run error while awaiting runtime task") + await self._stop_control_channels() + self._runtime_task = None self._state = LifecycleState.STOPPED self._task = None - self.logger.debug( - "ConfigManagerV2._run result: state=%s", - self._state.value, - ) + self.logger.debug("ConfigManagerV2._run result: state=%s", self._state.value) class ConfigManagerV2(_RuntimeController): - """Public manager API.""" + """Public manager API. Каналы управления задаются снаружи через control_channels.""" DEFAULT_UPDATE_INTERVAL = 5 DEFAULT_WORK_INTERVAL = 2 + DEFAULT_HEALTH_TIMEOUT = 30 + DEFAULT_EXECUTE_TIMEOUT = 600.0 def __init__( self, path: str, - log_manager: Optional[LogManager] = None, - control_channel: Optional[ControlChannel] = None, - control_channels: Optional[Iterable[ControlChannel]] = None, + control_channels: Optional[ + Union[Iterable[ControlChannel], Callable[["ConfigManagerV2"], Iterable[ControlChannel]]] + ] = None, ): self.logger = logging.getLogger(__name__) self.path = path self.config: Any = None self.update_interval = _read_env_interval("UPDATE_INTERVAL", float(self.DEFAULT_UPDATE_INTERVAL)) self.work_interval = _read_env_interval("WORK_INTERVAL", float(self.DEFAULT_WORK_INTERVAL)) + self._execute_timeout = _read_env_optional_float("EXECUTE_TIMEOUT", float(self.DEFAULT_EXECUTE_TIMEOUT)) self._loader = ConfigLoader(path) - self._log_manager = log_manager or LogManager() + self._log_manager = LogManager() self._halt = asyncio.Event() - self._task: Optional[asyncio.Task] = None + self._shutdown = asyncio.Event() + self._task: Optional[asyncio.Task[None]] = None + self._runtime_task: Optional[asyncio.Task[None]] = None self._loop: Optional[asyncio.AbstractEventLoop] = None + self._runtime_lock = asyncio.Lock() self._state = LifecycleState.IDLE self._last_execute_error: Optional[str] = None self._last_success_timestamp: Optional[float] = None + self._worker_degraded = False + self._worker_inflight_count = 0 + self._worker_timed_out_inflight_count = 0 initial_config = self._loader.load_sync() self.config = initial_config - settings = management_settings_from_config(initial_config if isinstance(initial_config, dict) else {}) - self._health_timeout = settings.health_timeout + self._health_timeout = self.DEFAULT_HEALTH_TIMEOUT self._health_aggregator = HealthAggregator( get_state=lambda: self._state, get_last_error=lambda: self._last_execute_error, @@ -225,30 +332,23 @@ class ConfigManagerV2(_RuntimeController): get_app_health=self.get_health_status, ) self._control_bridge = ControlChannelBridge( - halt=self._halt, get_state=lambda: self._state, get_status=self._status_text, + start_runtime=self._start_runtime, + stop_runtime=self._stop_runtime, ) - channels: list[ControlChannel] = [] - if settings.enabled: - channels.append( - HttpControlChannel( - host=settings.host, - port=settings.port, - timeout=settings.timeout, - health_provider=self._health_aggregator.collect, - ) - ) - if control_channels is not None: - channels.extend(control_channels) - if control_channel is not None: - channels.append(control_channel) - self._control_channels = channels + if control_channels is None: + self._control_channels = [] + elif callable(control_channels): + self._control_channels = list(control_channels(self)) + else: + self._control_channels = list(control_channels) self.logger.debug( - "ConfigManagerV2.__init__ result: path=%s update_interval=%s work_interval=%s control_channels=%s", + "ConfigManagerV2.__init__ result: path=%s update_interval=%s work_interval=%s execute_timeout=%s control_channels=%s", self.path, self.update_interval, self.work_interval, + self._execute_timeout, len(self._control_channels), ) @@ -294,10 +394,17 @@ class ConfigManagerV2(_RuntimeController): """Override in subclasses.""" def get_health_status(self) -> HealthPayload: + if self._worker_degraded: + return {"status": "degraded", "detail": "worker has timed-out in-flight execute()"} return {"status": "ok"} + def get_health_provider(self) -> Callable[[], Any]: + """Вернуть колбэк для health (для передачи в HttpControlChannel при создании канала снаружи).""" + return self._health_aggregator.collect + async def start(self) -> None: if self._task is not None and not self._task.done(): + await self._start_runtime() self.logger.debug("ConfigManagerV2.start result: already running") return try: @@ -306,23 +413,20 @@ class ConfigManagerV2(_RuntimeController): self.logger.exception("ConfigManagerV2.start error: must be called from within async context") raise self._task = asyncio.create_task(self._run(), name="config-manager-v2") - self._task.add_done_callback(self._on_runtime_task_done) self.logger.debug("ConfigManagerV2.start result: background task started") async def stop(self) -> None: if self._task is None: self.logger.debug("ConfigManagerV2.stop result: not running") return + self._shutdown.set() self._halt.set() if asyncio.current_task() is self._task: - self.logger.debug("ConfigManagerV2.stop result: stop requested from runtime task") + self.logger.debug("ConfigManagerV2.stop result: stop requested from supervisor task") return try: await self._task except asyncio.CancelledError: - self.logger.debug("ConfigManagerV2.stop result: runtime task cancelled") - except Exception: # noqa: BLE001 - self.logger.exception("ConfigManagerV2.stop error while awaiting runtime task") - raise + self.logger.debug("ConfigManagerV2.stop result: supervisor task cancelled") finally: self.logger.debug("ConfigManagerV2.stop result: completed") diff --git a/src/config_manager/v2/core/scheduler.py b/src/config_manager/v2/core/scheduler.py index 436cdde..6480c4b 100644 --- a/src/config_manager/v2/core/scheduler.py +++ b/src/config_manager/v2/core/scheduler.py @@ -1,16 +1,29 @@ -"""Цикл воркера: повторяющийся вызов блокирующего execute() в потоке с паузой между итерациями. +"""Цикл воркера: повторяющийся вызов блокирующего execute() в потоке. -Поддерживает halt-событие для остановки, колбэки on_success/on_error для учёта ошибок и здоровья.""" +Базовый режим: один in-flight execute(). +Режим деградации: если активный execute() превысил timeout, запускается второй worker-thread, +чтобы работа продолжалась. Одновременно допускается не более двух in-flight задач.""" from __future__ import annotations import asyncio import logging +import time from collections.abc import Callable +from dataclasses import dataclass +from itertools import count from typing import Optional logger = logging.getLogger(__name__) +@dataclass +class _InFlightExecute: + id: int + task: asyncio.Task[None] + started_at: float + timeout_reported: bool = False + + class WorkerLoop: def __init__( self, @@ -19,36 +32,133 @@ class WorkerLoop: halt_event: asyncio.Event, on_error: Optional[Callable[[Exception], None]] = None, on_success: Optional[Callable[[], None]] = None, + execute_timeout: Optional[float] = None, + on_degraded_change: Optional[Callable[[bool], None]] = None, + on_metrics_change: Optional[Callable[[int, int], None]] = None, ): - """Сохранить колбэки и примитивы синхронизации для выполнения воркера.""" self._execute = execute self._get_interval = get_interval self._halt_event = halt_event self._on_error = on_error self._on_success = on_success - logger.warning( - "WorkerLoop.__init__ result: execute=%s", - getattr(execute, "__name__", execute.__class__.__name__), + self._execute_timeout = execute_timeout + self._on_degraded_change = on_degraded_change + self._on_metrics_change = on_metrics_change + self._inflight: list[_InFlightExecute] = [] + self._id_seq = count(1) + self._degraded = False + self._last_metrics: Optional[tuple[int, int]] = None + logger.debug( + "WorkerLoop.__init__ result: execute=%s execute_timeout=%s", + getattr(execute, "__name__", type(execute).__name__), + execute_timeout, ) - async def run(self) -> None: - """Вызывать execute циклически до запроса остановки.""" - logger.warning("WorkerLoop.run result: started") - while not self._halt_event.is_set(): + def _notify_error(self, exc: Exception) -> None: + if self._on_error is not None: + self._on_error(exc) + + def _set_degraded(self, value: bool) -> None: + if self._degraded == value: + return + self._degraded = value + if self._on_degraded_change is not None: + self._on_degraded_change(value) + logger.warning("WorkerLoop.run degraded state changed: degraded=%s", value) + + def _start_execute(self) -> None: + execute_id = next(self._id_seq) + execution = _InFlightExecute( + id=execute_id, + task=asyncio.create_task(asyncio.to_thread(self._execute), name=f"worker-loop-execute-{execute_id}"), + started_at=time.monotonic(), + ) + self._inflight.append(execution) + logger.debug("WorkerLoop.run result: execute started id=%s inflight=%s", execution.id, len(self._inflight)) + + def _collect_finished(self) -> None: + pending: list[_InFlightExecute] = [] + for execution in self._inflight: + task = execution.task + if not task.done(): + pending.append(execution) + continue try: - await asyncio.to_thread(self._execute) + task.result() if self._on_success is not None: self._on_success() - logger.warning("WorkerLoop.run result: execute completed") + logger.debug("WorkerLoop.run result: execute completed id=%s", execution.id) except Exception as exc: # noqa: BLE001 - logger.exception("WorkerLoop.run error during execute") - if self._on_error is not None: - self._on_error(exc) - logger.warning("WorkerLoop.run result: execute failed") + self._notify_error(exc) + logger.exception("WorkerLoop.run error during execute id=%s", execution.id) + self._inflight = pending + + def _mark_timeouts(self) -> None: + if self._execute_timeout is None or self._execute_timeout <= 0: + return + now = time.monotonic() + for execution in self._inflight: + if execution.timeout_reported: + continue + if execution.task.done(): + continue + elapsed = now - execution.started_at + if elapsed < self._execute_timeout: + continue + execution.timeout_reported = True + self._notify_error(TimeoutError(f"execute() did not finish within {self._execute_timeout}s")) + logger.warning( + "WorkerLoop.run execute timeout: id=%s elapsed=%.3fs timeout=%ss", + execution.id, + elapsed, + self._execute_timeout, + ) + + def _has_timed_out_inflight(self) -> bool: + return any(item.timeout_reported and not item.task.done() for item in self._inflight) + + def _ensure_capacity(self) -> None: + active_count = len(self._inflight) + if active_count == 0: + self._start_execute() + return + if active_count == 1 and self._has_timed_out_inflight(): + self._start_execute() + return + + def _emit_metrics(self) -> None: + if self._on_metrics_change is None: + return + inflight_count = len(self._inflight) + timed_out_count = sum(1 for item in self._inflight if item.timeout_reported and not item.task.done()) + metrics = (inflight_count, timed_out_count) + if self._last_metrics == metrics: + return + self._last_metrics = metrics + self._on_metrics_change(inflight_count, timed_out_count) + + async def run(self) -> None: + logger.debug("WorkerLoop.run result: started") + while not self._halt_event.is_set(): + self._collect_finished() + self._mark_timeouts() + self._set_degraded(self._has_timed_out_inflight()) + self._ensure_capacity() + self._emit_metrics() timeout = max(self._get_interval(), 0.01) try: await asyncio.wait_for(self._halt_event.wait(), timeout=timeout) except asyncio.TimeoutError: continue - logger.warning("WorkerLoop.run result: stopped") + + if self._inflight: + if self._has_timed_out_inflight(): + logger.warning("WorkerLoop.run stop: timed-out execute still running; exiting without waiting") + else: + await asyncio.gather(*(item.task for item in self._inflight), return_exceptions=True) + self._collect_finished() + + self._set_degraded(False) + self._emit_metrics() + logger.debug("WorkerLoop.run result: stopped") diff --git a/src/config_manager/v2/core/types.py b/src/config_manager/v2/core/types.py index 7a9f040..6f93c2b 100644 --- a/src/config_manager/v2/core/types.py +++ b/src/config_manager/v2/core/types.py @@ -1,11 +1,10 @@ -"""Типы core: состояние здоровья, жизненного цикла и настройки HTTP-канала из config.yaml. +"""Типы core: состояние здоровья и жизненного цикла. Используются в core и control для единообразных контрактов.""" from __future__ import annotations -from dataclasses import dataclass from enum import Enum -from typing import Any, Literal, TypedDict +from typing import Literal, TypedDict HealthState = Literal["ok", "degraded", "unhealthy"] @@ -24,39 +23,3 @@ class LifecycleState(str, Enum): RUNNING = "running" STOPPING = "stopping" STOPPED = "stopped" - - -@dataclass -class ManagementServerSettings: - """Настройки HTTP-канала управления и healthcheck (читаются из config.yaml, секция management).""" - enabled: bool = False - host: str = "0.0.0.0" - port: int = 8000 - timeout: int = 3 - """Таймаут запроса health (секунды).""" - health_timeout: int = 30 - """Секунды без успешного execute(), после которых health = unhealthy.""" - - -HealthServerSettings = ManagementServerSettings - - -def management_settings_from_config(config: Any) -> ManagementServerSettings: - """Извлечь настройки HTTP-канала из конфига (секция `management`).""" - if not isinstance(config, dict): - return ManagementServerSettings(enabled=False) - m = config.get("management") - if not isinstance(m, dict): - return ManagementServerSettings(enabled=False) - enabled = bool(m.get("enabled", False)) - host = str(m.get("host", "0.0.0.0")) - port = int(m.get("port", 8000)) if isinstance(m.get("port"), (int, float)) else 8000 - timeout = int(m.get("timeout", 3)) if isinstance(m.get("timeout"), (int, float)) else 3 - health_timeout = int(m.get("health_timeout", 30)) if isinstance(m.get("health_timeout"), (int, float)) else 30 - return ManagementServerSettings( - enabled=enabled, - host=host, - port=port, - timeout=timeout, - health_timeout=health_timeout, - ) diff --git a/tests/config.yaml b/tests/config.yaml index 38c54c9..7f38f67 100644 --- a/tests/config.yaml +++ b/tests/config.yaml @@ -2,12 +2,12 @@ runtime: 5 # === HTTP-канал управления (ConfigManagerV2): /health, /actions/start, /actions/stop === -# management: -# enabled: true -# host: "0.0.0.0" -# port: 8000 -# timeout: 3 -# health_timeout: 30 +management: + enabled: true + host: "0.0.0.0" + port: 8000 + timeout: 3 + health_timeout: 30 # === Логирование === log: diff --git a/tests/test_app.py b/tests/test_app.py index b26a952..6b85252 100644 --- a/tests/test_app.py +++ b/tests/test_app.py @@ -6,14 +6,10 @@ import logging from pathlib import Path from config_manager import ConfigManager -from config_manager.v2.core import LogManager -from config_manager.v2 import ManagementServerSettings +from config_manager.v2.control import HttpControlChannel logger = logging.getLogger() -# Таймаут health: без успешного execute() дольше этого времени — unhealthy. -HEALTH_TIMEOUT = 3.0 - class MyApp(ConfigManager): def __init__(self, *args, **kwargs): @@ -31,21 +27,20 @@ class MyApp(ConfigManager): async def main() -> None: - log_manager = LogManager() - # Один объект: и HTTP management-сервер (enabled, port), и health (health_timeout). - management_settings = ManagementServerSettings( - enabled=True, - port=8000, - health_timeout=HEALTH_TIMEOUT, - ) config_path = Path(__file__).parent / "config.yaml" print(config_path) app = MyApp( str(config_path), - log_manager=log_manager, - management_settings=management_settings, + control_channels=lambda m: [ + HttpControlChannel( + host="0.0.0.0", + port=8000, + timeout=3, + health_provider=m.get_health_provider(), + ) + ], ) - logger.info("App starting (health_timeout=%s)", HEALTH_TIMEOUT) + logger.info("App starting") # Менеджер запускаем в фоне (start() не возвращает управление до stop). asyncio.create_task(app.start()) diff --git a/tests/v2/test_control_channel.py b/tests/v2/test_control_channel.py index 462631f..aeeb184 100644 --- a/tests/v2/test_control_channel.py +++ b/tests/v2/test_control_channel.py @@ -36,7 +36,7 @@ def test_control_channel_can_stop_manager(tmp_path): cfg.write_text("log: {}\nmanagement: { enabled: false }\n", encoding="utf-8") channel = DummyControlChannel() - app = ControlledApp(str(cfg), control_channel=channel) + app = ControlledApp(str(cfg), control_channels=[channel]) runner = asyncio.create_task(app.start()) await asyncio.sleep(0.12) @@ -47,11 +47,14 @@ def test_control_channel_can_stop_manager(tmp_path): status_text = await channel.on_status() assert "state=running" in status_text + assert "worker_inflight=" in status_text + assert "worker_timed_out_inflight=" in status_text stop_text = await channel.on_stop() assert "stop signal accepted" in stop_text await runner - # Менеджер при остановке не вызывает control_channel.stop() (канал остаётся доступным) + await app.stop() + assert channel.stopped is True asyncio.run(scenario()) diff --git a/tests/v2/test_runtime_resilience.py b/tests/v2/test_runtime_resilience.py new file mode 100644 index 0000000..dfa9f56 --- /dev/null +++ b/tests/v2/test_runtime_resilience.py @@ -0,0 +1,153 @@ +import asyncio +import threading +import time + +from config_manager.v2 import ConfigManagerV2 +from config_manager.v2.control.base import ControlChannel, StartHandler, StatusHandler, StopHandler + + +class DummyControlChannel(ControlChannel): + def __init__(self): + self.on_start: StartHandler | None = None + self.on_stop: StopHandler | None = None + self.on_status: StatusHandler | None = None + self.started = False + self.stopped = False + + async def start(self, on_start: StartHandler, on_stop: StopHandler, on_status: StatusHandler) -> None: + self.on_start = on_start + self.on_stop = on_stop + self.on_status = on_status + self.started = True + + async def stop(self) -> None: + self.stopped = True + + +class RestartableApp(ConfigManagerV2): + DEFAULT_UPDATE_INTERVAL = 0.05 + DEFAULT_WORK_INTERVAL = 0.05 + + def __init__(self, *args, **kwargs): + super().__init__(*args, **kwargs) + self.calls = 0 + + def execute(self) -> None: + self.calls += 1 + + +class TimeoutAwareApp(ConfigManagerV2): + DEFAULT_UPDATE_INTERVAL = 0.05 + DEFAULT_WORK_INTERVAL = 0.02 + + def __init__(self, *args, **kwargs): + super().__init__(*args, **kwargs) + self.calls = 0 + self.active = 0 + self.max_active = 0 + self._lock = threading.Lock() + + def execute(self) -> None: + with self._lock: + self.calls += 1 + self.active += 1 + self.max_active = max(self.max_active, self.active) + try: + time.sleep(0.2) + finally: + with self._lock: + self.active -= 1 + + +class NormalSingleThreadApp(ConfigManagerV2): + DEFAULT_UPDATE_INTERVAL = 0.05 + DEFAULT_WORK_INTERVAL = 0.02 + + def __init__(self, *args, **kwargs): + super().__init__(*args, **kwargs) + self.calls = 0 + self.active = 0 + self.max_active = 0 + self._lock = threading.Lock() + + def execute(self) -> None: + with self._lock: + self.calls += 1 + self.active += 1 + self.max_active = max(self.max_active, self.active) + try: + time.sleep(0.03) + finally: + with self._lock: + self.active -= 1 + + +def test_control_channel_stop_and_start_resumes_execute(tmp_path): + async def scenario() -> None: + cfg = tmp_path / "config.yaml" + cfg.write_text("log: {}\nmanagement: { enabled: false }\n", encoding="utf-8") + + channel = DummyControlChannel() + app = RestartableApp(str(cfg), control_channels=[channel]) + await app.start() + await asyncio.sleep(0.2) + before_stop = app.calls + assert before_stop > 0 + + assert channel.on_stop is not None + assert channel.on_start is not None + stop_text = await channel.on_stop() + assert "stop signal accepted" in stop_text + + await asyncio.sleep(0.2) + after_stop = app.calls + assert after_stop == before_stop + + start_text = await channel.on_start() + assert "start signal accepted" in start_text + await asyncio.sleep(0.2) + assert app.calls > after_stop + + await app.stop() + assert channel.stopped is True + + asyncio.run(scenario()) + + +def test_normal_mode_uses_single_inflight_execute(tmp_path): + async def scenario() -> None: + cfg = tmp_path / "config.yaml" + cfg.write_text("log: {}\nmanagement: { enabled: false }\n", encoding="utf-8") + + app = NormalSingleThreadApp(str(cfg)) + await app.start() + await asyncio.sleep(0.25) + health = await app.get_health_provider()() + await app.stop() + + assert app.calls >= 2 + assert app.max_active == 1 + assert health["status"] == "ok" + + asyncio.run(scenario()) + + +def test_execute_timeout_does_not_start_parallel_runs(tmp_path, monkeypatch): + async def scenario() -> None: + cfg = tmp_path / "config.yaml" + cfg.write_text("log: {}\nmanagement: { enabled: false }\n", encoding="utf-8") + + monkeypatch.setenv("EXECUTE_TIMEOUT", "0.05") + app = TimeoutAwareApp(str(cfg)) + await app.start() + await asyncio.sleep(0.35) + degraded_health = await app.get_health_provider()() + await app.stop() + + assert app.calls >= 1 + assert app._last_execute_error is not None + assert "did not finish within" in app._last_execute_error + assert app.max_active == 2 + assert degraded_health["status"] == "degraded" + + asyncio.run(scenario())