diff --git a/README.md b/README.md index 07636e6..107cc9f 100644 --- a/README.md +++ b/README.md @@ -1,20 +1,167 @@ # Config Manager -## Description -This package was created to run my applications. -The ConfigManager class implements the entry point for the program and provides the actual application configuration. It also simplifies logging setup. +## Описание +Пакет предназначен для запуска приложений. +Класс ConfigManager реализует точку входа программы и предоставляет актуальную конфигурацию приложения, а также упрощает настройку логирования. -## Logging (v2) -Logging is configured from the config file only if it contains a **`log`** section in [dictConfig](https://docs.python.org/3/library/logging.config.html#logging.config.dictConfig) format. If there is no `log` section, the manager logs a warning and the default Python level (WARNING) remains, so INFO/DEBUG messages may not appear. +## ConfigManager v2: устройство и взаимосвязи -**How to verify that logging config is applied:** -- Ensure your config file path is correct and the file is loaded on startup (no error in logs about reading config). -- Ensure the config has a `log` key with `version: 1`, `handlers`, and `loggers` (see `tests/config.yaml` for an example). -- After startup you should see an INFO message: `"Logging configuration applied"` (from `config_manager.v1.log_manager`). If you do not see it, either the `log` section is missing (you will see a warning) or the root/package log level is above INFO. +**ConfigManager v2** — точка входа приложения. Он наследует внутреннюю логику от **\_RuntimeController** (циклы воркера и обновления конфига, запуск/остановка каналов управления). -## Installation +**Ядро (core):** +- **ConfigLoader** — читает конфиг из файла (YAML/JSON), считает хеш и отдаёт конфиг только при изменении; при ошибке парсинга возвращает последний валидный конфиг. +- **WorkerLoop** — в отдельном потоке циклически вызывает ваш метод `execute()` с паузой между вызовами; реагирует на событие остановки и колбэки успеха/ошибки. +- **LogManager** (v1) — применяет секцию `log` из конфига к логированию (dictConfig). +- **HealthAggregator** — собирает состояние: жизненный цикл (idle/starting/running/…), время последнего успешного `execute()` и таймаут здоровья; формирует единый ответ для health (ok/unhealthy). +- **ControlChannelBridge** — один мост для всех каналов: обработчики on_start/on_stop/on_status (сброс/установка halt, текст статуса). + +**Каналы управления (control):** +- **ControlChannel** — абстрактный контракт: `start(on_start, on_stop, on_status)`, `stop()`. +- **HttpControlChannel** — HTTP API (`/health`, `/actions/start`, `/actions/stop`, `/actions/status`); использует **UvicornServerRunner**; для `/health` вызывает **HealthAggregator.collect()**, для действий — переданные обработчики из **ControlChannelBridge**. +- **TelegramControlChannel** — реализация через long polling Telegram; команды `/start`, `/stop`, `/status` вызывают переданные обработчики. + +**Поток работы:** при `start()` менеджер собирает список каналов (при `management_settings.enabled` — **HttpControlChannel**, плюс опционально **control_channel** / **control_channels**), поднимает все каналы с одним **ControlChannelBridge**, затем запускает два цикла: **WorkerLoop** и периодическое обновление конфига через **ConfigLoader**. Остановка по halt (через любой канал) завершает оба цикла; в конце останавливаются все каналы. + +## Диаграмма классов (v1 и v2) + +```mermaid +classDiagram + direction TB + + class ConfigManager { + +str path + +Any config + +float update_interval + +float work_interval + -Event _halt + -Task _task + +start() async + +stop() async + +execute()* + -_worker_loop() async + -_periodic_update_loop() async + -_update_config() async + } + + class ConfigManagerV2 { + +str path + +Any config + +float update_interval + +float work_interval + -ConfigLoader _loader + -LifecycleState _state + +start() async + +stop() async + +execute()* + +get_health_status() HealthPayload + -_run() async + -_worker_loop() async + -_periodic_update_loop() async + } + + class _RuntimeController { + <<внутренний>> + -_on_execute_success() + -_on_execute_error(exc) + -_worker_loop() async + -_periodic_update_loop() async + -_start_control_channels() async + -_stop_control_channels() async + -_run() async + } + + class ConfigLoader { + +str path + +Any config + +Any last_valid_config + +load_if_changed() async + +parse_config(data) Any + -_read_file_sync() str + -read_file_async() async + } + + class WorkerLoop { + -Callable execute + -Callable get_interval + -Event halt_event + +run() async + } + + class LogManager { + +apply_config(config) None + } + + class ControlChannel { + <<абстрактный>> + +start(on_start, on_stop, on_status) async* + +stop() async* + } + + class TelegramControlChannel { + -str _token + -int _chat_id + +start(on_start, on_stop, on_status) async + +stop() async + -_poll_loop() async + } + + class HttpControlChannel { + -UvicornServerRunner _runner + -Callable _health_provider + +start(on_start, on_stop, on_status) async + +stop() async + +int port + } + + class HealthAggregator { + -Callable get_state + -Callable get_app_health + +collect() async HealthPayload + } + + class ControlChannelBridge { + -Event _halt + -Callable _get_state + -Callable _get_status + +on_start() async str + +on_stop() async str + +on_status() async str + } + + class UvicornServerRunner { + -Server _server + -Task _serve_task + +start(app) async + +stop() async + +int port + } + + ConfigManager --> LogManager : использует + ConfigManagerV2 --|> _RuntimeController : наследует + ConfigManagerV2 --> ConfigLoader : использует + ConfigManagerV2 --> LogManager : использует + ConfigManagerV2 --> HealthAggregator : использует + ConfigManagerV2 --> ControlChannelBridge : использует + ConfigManagerV2 ..> ControlChannel : список каналов + _RuntimeController ..> WorkerLoop : создаёт в _worker_loop + TelegramControlChannel --|> ControlChannel : реализует + HttpControlChannel --|> ControlChannel : реализует + HttpControlChannel --> UvicornServerRunner : использует + HttpControlChannel ..> HealthAggregator : health_provider + ControlChannelBridge ..> ControlChannel : on_start, on_stop, on_status +``` + +## Логирование (v2) +Логирование настраивается из конфигурационного файла только если в нём есть секция **`log`** в формате [dictConfig](https://docs.python.org/3/library/logging.config.html#logging.config.dictConfig). Если секции `log` нет, менеджер пишет предупреждение в лог, а уровень Python по умолчанию (WARNING) сохраняется — сообщения INFO/DEBUG могут не отображаться. + +**Как проверить, что конфигурация логирования применилась:** +- Убедитесь, что путь к файлу конфига верный и файл загружается при старте (в логах нет ошибки чтения конфига). +- Убедитесь, что в конфиге есть ключ `log` с `version: 1`, `handlers` и `loggers` (пример — `tests/config.yaml`). +- После старта в логе должно появиться сообщение уровня INFO: `"Logging configuration applied"` (из `config_manager.v1.log_manager`). Если его нет, либо секция `log` отсутствует (будет предупреждение), либо уровень root/пакета выше INFO. + +## Установка ``pip install git+https://git.lesha.spb.ru/alex/config_manager.git`` -## Contacts +## Контакты - **e-mail**: lesha.spb@gmail.com - **telegram**: https://t.me/lesha_spb diff --git a/src/config_manager/v2/control/__init__.py b/src/config_manager/v2/control/__init__.py index 12b40e1..5ef4e0c 100644 --- a/src/config_manager/v2/control/__init__.py +++ b/src/config_manager/v2/control/__init__.py @@ -1,7 +1,8 @@ -"""Каналы внешнего управления: абстракция и реализация (например, Telegram). +"""Каналы внешнего управления: абстракция и реализация (HTTP, Telegram). -Позволяет запускать, останавливать и запрашивать статус менеджера через ботов и другие интерфейсы.""" +Позволяет запускать, останавливать и запрашивать статус менеджера через HTTP API и ботов.""" from .base import ControlChannel +from .http_channel import HttpControlChannel from .telegram import TelegramControlChannel -__all__ = ["ControlChannel", "TelegramControlChannel"] +__all__ = ["ControlChannel", "HttpControlChannel", "TelegramControlChannel"] diff --git a/src/config_manager/v2/management/management_server.py b/src/config_manager/v2/control/http_channel.py similarity index 69% rename from src/config_manager/v2/management/management_server.py rename to src/config_manager/v2/control/http_channel.py index c547d7c..c0a3b85 100644 --- a/src/config_manager/v2/management/management_server.py +++ b/src/config_manager/v2/control/http_channel.py @@ -1,21 +1,25 @@ -"""Management HTTP API with /health, /actions/start and /actions/stop.""" +"""HTTP-канал управления: /health, /actions/start, /actions/stop (реализация ControlChannel).""" from __future__ import annotations import asyncio import json import logging +import time from collections.abc import Awaitable, Callable from typing import Any, Optional from fastapi import FastAPI +from fastapi import Request from fastapi.responses import JSONResponse from uvicorn import Config, Server from ..types import HealthPayload +from .base import ControlChannel, StartHandler, StatusHandler, StopHandler PATH_HEALTH = "/health" PATH_ACTION_START = "/actions/start" PATH_ACTION_STOP = "/actions/stop" +PATH_ACTION_STATUS = "/actions/status" logger = logging.getLogger(__name__) @@ -29,7 +33,7 @@ class UvicornServerRunner: self._server: Optional[Server] = None self._serve_task: Optional[asyncio.Task[None]] = None self._bound_port: Optional[int] = None - logger.warning( + logger.debug( "UvicornServerRunner.__init__ result: host=%s port=%s timeout=%s", self._host, self._port, @@ -83,21 +87,21 @@ class UvicornServerRunner: self._server = None self._serve_task = None self._bound_port = None - logger.warning("UvicornServerRunner._cleanup_start_failure result: state reset") + logger.debug("UvicornServerRunner._cleanup_start_failure result: state reset") async def start(self, app: FastAPI) -> None: if self._serve_task is not None and not self._serve_task.done(): - logger.warning("UvicornServerRunner.start result: already running") + logger.debug("UvicornServerRunner.start result: already running") return if self._serve_task is not None and self._serve_task.done(): self._serve_task = None try: config = Config(app=app, host=self._host, port=self._port, log_level="warning") self._server = Server(config) - self._serve_task = asyncio.create_task(self._server.serve(), name="management-server-serve") + self._serve_task = asyncio.create_task(self._server.serve(), name="http-control-channel-serve") await self._wait_until_started() self._bound_port = self._resolve_bound_port() - logger.warning( + logger.debug( "UvicornServerRunner.start result: running host=%s requested_port=%s bound_port=%s", self._host, self._port, @@ -110,7 +114,7 @@ class UvicornServerRunner: async def stop(self) -> None: if self._server is None or self._serve_task is None: - logger.warning("UvicornServerRunner.stop result: already stopped") + logger.debug("UvicornServerRunner.stop result: already stopped") return self._server.should_exit = True try: @@ -122,17 +126,17 @@ class UvicornServerRunner: self._server = None self._serve_task = None self._bound_port = None - logger.warning("UvicornServerRunner.stop result: stopped") + logger.debug("UvicornServerRunner.stop result: stopped") @property def port(self) -> int: result = self._bound_port if self._bound_port is not None else self._port - logger.warning("UvicornServerRunner.port result: %s", result) + logger.debug("UvicornServerRunner.port result: %s", result) return result -class ManagementServer: - """Management API endpoints and callback adapters.""" +class HttpControlChannel(ControlChannel): + """HTTP API как канал управления: /health, /actions/start, /actions/stop, /actions/status.""" def __init__( self, @@ -140,17 +144,18 @@ class ManagementServer: port: int, timeout: int, health_provider: Callable[[], Awaitable[HealthPayload]], - on_start: Optional[Callable[[], Awaitable[str]]] = None, - on_stop: Optional[Callable[[], Awaitable[str]]] = None, ): + self._host = host + self._port = port self._timeout = timeout self._health_provider = health_provider - self._on_start = on_start - self._on_stop = on_stop + self._on_start: Optional[StartHandler] = None + self._on_stop: Optional[StopHandler] = None + self._on_status: Optional[StatusHandler] = None self._runner = UvicornServerRunner(host=host, port=port, timeout=timeout) - self._app = self._create_app() - logger.warning( - "ManagementServer.__init__ result: host=%s port=%s timeout=%s", + self._app: Optional[FastAPI] = None + logger.debug( + "HttpControlChannel.__init__ result: host=%s port=%s timeout=%s", host, port, timeout, @@ -159,6 +164,20 @@ class ManagementServer: def _create_app(self) -> FastAPI: app = FastAPI(title="Config Manager Management API") + @app.middleware("http") + async def log_api_call(request: Request, call_next): # type: ignore[no-untyped-def] + started = time.monotonic() + response = await call_next(request) + duration_ms = int((time.monotonic() - started) * 1000) + logger.info( + "API call: method=%s path=%s status=%s duration_ms=%s", + request.method, + request.url.path, + response.status_code, + duration_ms, + ) + return response + @app.get(PATH_HEALTH) async def health() -> JSONResponse: return await self._health_response() @@ -173,11 +192,17 @@ class ManagementServer: async def action_stop() -> JSONResponse: return await self._action_response("stop", self._on_stop) - logger.warning( - "ManagementServer._create_app result: routes=%s,%s,%s", + @app.get(PATH_ACTION_STATUS) + @app.post(PATH_ACTION_STATUS) + async def action_status() -> JSONResponse: + return await self._action_response("status", self._on_status) + + logger.debug( + "HttpControlChannel._create_app result: routes=%s,%s,%s,%s", PATH_HEALTH, PATH_ACTION_START, PATH_ACTION_STOP, + PATH_ACTION_STATUS, ) return app @@ -185,14 +210,14 @@ class ManagementServer: try: payload = await asyncio.wait_for(self._health_provider(), timeout=self._timeout) status_code = 200 if payload.get("status", "unhealthy") == "ok" else 503 - logger.warning( - "ManagementServer._health_response result: status_code=%s payload=%s", + logger.debug( + "HttpControlChannel._health_response result: status_code=%s payload=%s", status_code, payload, ) return JSONResponse(content=payload, status_code=status_code) except Exception as exc: # noqa: BLE001 - logger.exception("ManagementServer._health_response error") + logger.exception("HttpControlChannel._health_response error") return JSONResponse(content={"status": "unhealthy", "detail": str(exc)}, status_code=503) async def _action_response( @@ -201,8 +226,8 @@ class ManagementServer: callback: Optional[Callable[[], Awaitable[str]]], ) -> JSONResponse: if callback is None: - logger.warning( - "ManagementServer._action_response result: action=%s status_code=404 detail=handler not configured", + logger.debug( + "HttpControlChannel._action_response result: action=%s status_code=404 detail=handler not configured", action, ) return JSONResponse( @@ -213,52 +238,53 @@ class ManagementServer: detail = await callback() if not detail: detail = f"{action} action accepted" - logger.warning( - "ManagementServer._action_response result: action=%s status_code=200 detail=%s", + logger.debug( + "HttpControlChannel._action_response result: action=%s status_code=200 detail=%s", action, detail, ) return JSONResponse(content={"status": "ok", "detail": detail}, status_code=200) except Exception as exc: # noqa: BLE001 - logger.exception("ManagementServer._action_response error: action=%s", action) + logger.exception("HttpControlChannel._action_response error: action=%s", action) return JSONResponse(content={"status": "error", "detail": str(exc)}, status_code=500) - def _build_health_response(self) -> Awaitable[tuple[int, HealthPayload]]: - async def _run() -> tuple[int, HealthPayload]: - response = await self._health_response() - body: Any = response.body - if isinstance(body, bytes): - body = json.loads(body.decode("utf-8")) - logger.warning( - "ManagementServer._build_health_response result: status_code=%s payload=%s", - response.status_code, - body, - ) - return response.status_code, body - - return _run() - - async def start(self) -> None: + async def start( + self, + on_start: StartHandler, + on_stop: StopHandler, + on_status: StatusHandler, + ) -> None: + self._on_start = on_start + self._on_stop = on_stop + self._on_status = on_status + self._app = self._create_app() try: await self._runner.start(self._app) - logger.warning("ManagementServer.start result: started") + logger.debug("HttpControlChannel.start result: started port=%s", self._runner.port) except Exception: # noqa: BLE001 - logger.exception("ManagementServer.start error") + logger.exception("HttpControlChannel.start error") raise async def stop(self) -> None: try: await self._runner.stop() - logger.warning("ManagementServer.stop result: stopped") + logger.debug("HttpControlChannel.stop result: stopped") except BaseException: # noqa: BLE001 - logger.exception("ManagementServer.stop error") + logger.exception("HttpControlChannel.stop error") raise @property def port(self) -> int: - result = self._runner.port - logger.warning("ManagementServer.port result: %s", result) - return result + return self._runner.port + def _build_health_response(self) -> Awaitable[tuple[int, Any]]: + """Для тестов: вернуть (status_code, payload) без запуска сервера.""" -HealthServer = ManagementServer + async def _run() -> tuple[int, Any]: + response = await self._health_response() + body: Any = response.body + if isinstance(body, bytes): + body = json.loads(body.decode("utf-8")) + return response.status_code, body + + return _run() diff --git a/src/config_manager/v2/core/__init__.py b/src/config_manager/v2/core/__init__.py index c44ab6a..e9d07d2 100644 --- a/src/config_manager/v2/core/__init__.py +++ b/src/config_manager/v2/core/__init__.py @@ -2,7 +2,9 @@ Содержит ConfigManagerV2, загрузчик конфигурации и планировщик повторяющегося выполнения execute().""" from .config_loader import ConfigLoader +from .control_bridge import ControlChannelBridge +from .health_aggregator import HealthAggregator from .manager import ConfigManagerV2 from .scheduler import WorkerLoop -__all__ = ["ConfigLoader", "ConfigManagerV2", "WorkerLoop"] +__all__ = ["ConfigLoader", "ConfigManagerV2", "ControlChannelBridge", "HealthAggregator", "WorkerLoop"] diff --git a/src/config_manager/v2/core/control_bridge.py b/src/config_manager/v2/core/control_bridge.py new file mode 100644 index 0000000..8950758 --- /dev/null +++ b/src/config_manager/v2/core/control_bridge.py @@ -0,0 +1,55 @@ +"""Адаптер между каналами управления и жизненным циклом менеджера. + +Предоставляет обработчики start/stop/status для ControlChannel (halt, state, status text).""" +from __future__ import annotations + +import asyncio +import logging +from collections.abc import Awaitable, Callable + +from ..types import LifecycleState + +logger = logging.getLogger(__name__) + + +class ControlChannelBridge: + """Предоставляет halt и status как обработчики start/stop/status для ControlChannel (например Telegram).""" + + def __init__( + self, + halt: asyncio.Event, + get_state: Callable[[], LifecycleState], + get_status: Callable[[], Awaitable[str]], + ): + self._halt = halt + self._get_state = get_state + self._get_status = get_status + logger.debug("ControlChannelBridge.__init__ result: callbacks configured") + + async def on_start(self) -> str: + """Обработать внешний start: сбросить halt; идемпотентно при уже running.""" + if self._get_state() == LifecycleState.RUNNING: + result = "already running" + logger.debug("ControlChannelBridge.on_start result: %s", result) + return result + self._halt.clear() + result = "start signal accepted" + logger.debug("ControlChannelBridge.on_start result: %s", result) + return result + + async def on_stop(self) -> str: + """Обработать внешний stop: установить halt.""" + self._halt.set() + result = "stop signal accepted" + logger.debug("ControlChannelBridge.on_stop result: %s", result) + return result + + async def on_status(self) -> str: + """Вернуть текущий текст статуса.""" + try: + result = await self._get_status() + except Exception: # noqa: BLE001 + logger.exception("ControlChannelBridge.on_status error") + raise + logger.debug("ControlChannelBridge.on_status result: %s", result) + return result diff --git a/src/config_manager/v2/management/health_aggregator.py b/src/config_manager/v2/core/health_aggregator.py similarity index 85% rename from src/config_manager/v2/management/health_aggregator.py rename to src/config_manager/v2/core/health_aggregator.py index 1bc6503..c17e035 100644 --- a/src/config_manager/v2/management/health_aggregator.py +++ b/src/config_manager/v2/core/health_aggregator.py @@ -28,7 +28,7 @@ class HealthAggregator: self._get_last_success_timestamp = get_last_success_timestamp self._health_timeout = health_timeout self._get_app_health = get_app_health - logger.warning("HealthAggregator.__init__ result: health_timeout=%s", self._health_timeout) + logger.debug("HealthAggregator.__init__ result: health_timeout=%s", self._health_timeout) async def collect(self) -> HealthPayload: """Вернуть ok, если был успешный execute() за последние health_timeout сек; иначе unhealthy. Всегда добавляем state.""" @@ -38,7 +38,7 @@ class HealthAggregator: # Только при state=RUNNING возможен status=ok; при остановке (STOPPING/STOPPED) сразу unhealthy. if state != LifecycleState.RUNNING: result = {"status": "unhealthy", "detail": f"state={state_value}", "state": state_value} - logger.warning("HealthAggregator.collect result: %s", result) + logger.debug("HealthAggregator.collect result: %s", result) return result last_success = self._get_last_success_timestamp() @@ -47,21 +47,21 @@ class HealthAggregator: if last_success is None: detail = self._get_last_error() or "no successful run yet" result = {"status": "unhealthy", "detail": detail, "state": state_value} - logger.warning("HealthAggregator.collect result: %s", result) + logger.debug("HealthAggregator.collect result: %s", result) return result if (now - last_success) > self._health_timeout: detail = self._get_last_error() or f"no successful run within {self._health_timeout}s" result = {"status": "unhealthy", "detail": detail, "state": state_value} - logger.warning("HealthAggregator.collect result: %s", result) + logger.debug("HealthAggregator.collect result: %s", result) return result result = self._get_app_health() status = result.get("status", "unhealthy") if status != "ok": unhealthy = {"status": "unhealthy", "detail": result.get("detail", "app reported non-ok"), "state": state_value} - logger.warning("HealthAggregator.collect result: %s", unhealthy) + logger.debug("HealthAggregator.collect result: %s", unhealthy) return unhealthy healthy = {**result, "state": state_value} - logger.warning("HealthAggregator.collect result: %s", healthy) + logger.debug("HealthAggregator.collect result: %s", healthy) return healthy diff --git a/src/config_manager/v2/core/manager.py b/src/config_manager/v2/core/manager.py index 212a349..05c400f 100644 --- a/src/config_manager/v2/core/manager.py +++ b/src/config_manager/v2/core/manager.py @@ -5,18 +5,15 @@ import asyncio import logging import os import time -from typing import Any, Optional +from typing import Any, Iterable, Optional from ...v1.log_manager import LogManager from ..control.base import ControlChannel -from ..management import ( - ControlChannelBridge, - HealthAggregator, - ManagementApiBridge, - ManagementServer, -) +from ..control.http_channel import HttpControlChannel from ..types import HealthPayload, LifecycleState, ManagementServerSettings from .config_loader import ConfigLoader +from .control_bridge import ControlChannelBridge +from .health_aggregator import HealthAggregator from .scheduler import WorkerLoop logger = logging.getLogger(__name__) @@ -48,7 +45,7 @@ class _RuntimeController: def _on_execute_success(self) -> None: self._last_success_timestamp = time.monotonic() self._last_execute_error = None - self.logger.warning( + self.logger.debug( "ConfigManagerV2._on_execute_success result: last_success_timestamp=%s", self._last_success_timestamp, ) @@ -56,7 +53,7 @@ class _RuntimeController: def _on_execute_error(self, exc: Exception) -> None: self._last_execute_error = str(exc) self.logger.exception("ConfigManagerV2._on_execute_error") - self.logger.warning( + self.logger.debug( "ConfigManagerV2._on_execute_error result: last_execute_error=%s", self._last_execute_error, ) @@ -75,7 +72,7 @@ class _RuntimeController: ) try: await worker.run() - self.logger.warning("ConfigManagerV2._worker_loop result: completed") + self.logger.debug("ConfigManagerV2._worker_loop result: completed") except Exception: # noqa: BLE001 self.logger.exception("ConfigManagerV2._worker_loop error") raise @@ -105,55 +102,35 @@ class _RuntimeController: detail = health.get("detail") if detail: status_text = f"state={self._state.value}; health={health['status']}; detail={detail}" - self.logger.warning("ConfigManagerV2._status_text result: %s", status_text) + self.logger.debug("ConfigManagerV2._status_text result: %s", status_text) return status_text status_text = f"state={self._state.value}; health={health['status']}" - self.logger.warning("ConfigManagerV2._status_text result: %s", status_text) + self.logger.debug("ConfigManagerV2._status_text result: %s", status_text) return status_text - async def _start_control_channel(self) -> None: - if self._control_channel is None: - self.logger.warning("ConfigManagerV2._start_control_channel result: no control channel") - return - try: - await self._control_channel.start( - self._control_bridge.on_start, - self._control_bridge.on_stop, - self._control_bridge.on_status, - ) - self.logger.warning("ConfigManagerV2._start_control_channel result: started") - except Exception: # noqa: BLE001 - self.logger.exception("ConfigManagerV2._start_control_channel error") + async def _start_control_channels(self) -> None: + for channel in self._control_channels: + try: + await channel.start( + self._control_bridge.on_start, + self._control_bridge.on_stop, + self._control_bridge.on_status, + ) + self.logger.debug("ConfigManagerV2._start_control_channels result: started channel=%s", type(channel).__name__) + except Exception: # noqa: BLE001 + self.logger.exception("ConfigManagerV2._start_control_channels error channel=%s", type(channel).__name__) - async def _stop_control_channel(self) -> None: - if self._control_channel is None: - self.logger.warning("ConfigManagerV2._stop_control_channel result: no control channel") - return - try: - await self._control_channel.stop() - self.logger.warning("ConfigManagerV2._stop_control_channel result: stopped") - except Exception: # noqa: BLE001 - self.logger.exception("ConfigManagerV2._stop_control_channel error") - - async def _start_management_server(self) -> None: - if self._management_server is None: - self.logger.warning("ConfigManagerV2._start_management_server result: disabled") - return - try: - await self._management_server.start() - self.logger.warning( - "ConfigManagerV2._start_management_server result: started port=%s", - self._management_server.port, - ) - except Exception: # noqa: BLE001 - self.logger.exception("ConfigManagerV2._start_management_server error") - self.logger.warning( - "ConfigManagerV2._start_management_server result: failed worker will continue", - ) + async def _stop_control_channels(self) -> None: + for channel in self._control_channels: + try: + await channel.stop() + self.logger.debug("ConfigManagerV2._stop_control_channels result: stopped channel=%s", type(channel).__name__) + except Exception: # noqa: BLE001 + self.logger.exception("ConfigManagerV2._stop_control_channels error channel=%s", type(channel).__name__) def _on_runtime_task_done(self, task: asyncio.Task) -> None: if task.cancelled(): - self.logger.warning("ConfigManagerV2._on_runtime_task_done result: cancelled") + self.logger.debug("ConfigManagerV2._on_runtime_task_done result: cancelled") return try: exc = task.exception() @@ -161,51 +138,50 @@ class _RuntimeController: self.logger.exception("ConfigManagerV2._on_runtime_task_done error while reading task exception") return if exc is None: - self.logger.warning("ConfigManagerV2._on_runtime_task_done result: completed") + self.logger.debug("ConfigManagerV2._on_runtime_task_done result: completed") return self.logger.error( "ConfigManagerV2 background task failed", exc_info=(type(exc), exc, exc.__traceback__), ) - self.logger.warning("ConfigManagerV2._on_runtime_task_done result: failed") + self.logger.debug("ConfigManagerV2._on_runtime_task_done result: failed") async def _run(self) -> None: self._state = LifecycleState.STARTING - self.logger.warning("ConfigManagerV2._run result: state=%s", self._state.value) + self.logger.debug("ConfigManagerV2._run result: state=%s", self._state.value) self._halt.clear() await self._update_config() - await self._start_management_server() - await self._start_control_channel() + await self._start_control_channels() self._state = LifecycleState.RUNNING - self.logger.warning("ConfigManagerV2._run result: state=%s", self._state.value) + self.logger.debug("ConfigManagerV2._run result: state=%s", self._state.value) tasks = [ asyncio.create_task(self._worker_loop(), name="v2-worker-loop"), asyncio.create_task(self._periodic_update_loop(), name="v2-config-loop"), ] try: await asyncio.gather(*tasks) - self.logger.warning("ConfigManagerV2._run result: background loops completed") + self.logger.debug("ConfigManagerV2._run result: background loops completed") except asyncio.CancelledError: - self.logger.warning("ConfigManagerV2._run result: cancelled") + self.logger.debug("ConfigManagerV2._run result: cancelled") raise except Exception: # noqa: BLE001 self.logger.exception("ConfigManagerV2._run error") raise finally: self._state = LifecycleState.STOPPING - self.logger.warning("ConfigManagerV2._run result: state=%s", self._state.value) + self.logger.debug("ConfigManagerV2._run result: state=%s", self._state.value) self._halt.set() for task in tasks: task.cancel() await asyncio.gather(*tasks, return_exceptions=True) + await self._stop_control_channels() self._state = LifecycleState.STOPPED self._task = None - self.logger.warning( - "ConfigManagerV2._run result: state=%s api_and_control_available=%s", + self.logger.debug( + "ConfigManagerV2._run result: state=%s", self._state.value, - True, ) @@ -221,6 +197,7 @@ class ConfigManagerV2(_RuntimeController): log_manager: Optional[LogManager] = None, management_settings: Optional[ManagementServerSettings] = None, control_channel: Optional[ControlChannel] = None, + control_channels: Optional[Iterable[ControlChannel]] = None, ): self.logger = logging.getLogger(__name__) self.path = path @@ -230,7 +207,6 @@ class ConfigManagerV2(_RuntimeController): self._loader = ConfigLoader(path) self._log_manager = log_manager or LogManager() - self._control_channel = control_channel self._halt = asyncio.Event() self._task: Optional[asyncio.Task] = None self._loop: Optional[asyncio.AbstractEventLoop] = None @@ -239,7 +215,6 @@ class ConfigManagerV2(_RuntimeController): self._last_success_timestamp: Optional[float] = None settings = management_settings or ManagementServerSettings(enabled=True) - self._management_settings = settings self._health_timeout = settings.health_timeout self._health_aggregator = HealthAggregator( get_state=lambda: self._state, @@ -248,28 +223,32 @@ class ConfigManagerV2(_RuntimeController): health_timeout=self._health_timeout, get_app_health=self.get_health_status, ) - self._api_bridge = ManagementApiBridge(start_fn=self.start, stop_fn=self.stop) self._control_bridge = ControlChannelBridge( halt=self._halt, get_state=lambda: self._state, get_status=self._status_text, ) - self._management_server: Optional[ManagementServer] = None + channels: list[ControlChannel] = [] if settings.enabled: - self._management_server = ManagementServer( - host=settings.host, - port=settings.port, - timeout=settings.timeout, - health_provider=self._health_aggregator.collect, - on_start=self._api_bridge.on_start, - on_stop=self._api_bridge.on_stop, + channels.append( + HttpControlChannel( + host=settings.host, + port=settings.port, + timeout=settings.timeout, + health_provider=self._health_aggregator.collect, + ) ) - self.logger.warning( - "ConfigManagerV2.__init__ result: path=%s update_interval=%s work_interval=%s management_enabled=%s", + if control_channels is not None: + channels.extend(control_channels) + if control_channel is not None: + channels.append(control_channel) + self._control_channels = channels + self.logger.debug( + "ConfigManagerV2.__init__ result: path=%s update_interval=%s work_interval=%s control_channels=%s", self.path, self.update_interval, self.work_interval, - self._management_server is not None, + len(self._control_channels), ) def _apply_config(self, new_config: Any) -> None: @@ -280,7 +259,7 @@ class ConfigManagerV2(_RuntimeController): except Exception: # noqa: BLE001 self.logger.exception("ConfigManagerV2._apply_config error while applying logging config") raise - self.logger.warning( + self.logger.debug( "ConfigManagerV2._apply_config result: config_type=%s is_dict=%s", type(new_config).__name__, isinstance(new_config, dict), @@ -290,21 +269,21 @@ class ConfigManagerV2(_RuntimeController): try: changed, new_config = await self._loader.load_if_changed() if not changed: - self.logger.warning("ConfigManagerV2._update_config result: no changes") + self.logger.debug("ConfigManagerV2._update_config result: no changes") return self._apply_config(new_config) - self.logger.warning("ConfigManagerV2._update_config result: config updated") + self.logger.debug("ConfigManagerV2._update_config result: config updated") except Exception as exc: # noqa: BLE001 self.logger.exception("ConfigManagerV2._update_config error") if self._loader.last_valid_config is None: - self.logger.warning( + self.logger.debug( "ConfigManagerV2._update_config result: no fallback config available detail=%s", str(exc), ) return try: self._apply_config(self._loader.last_valid_config) - self.logger.warning( + self.logger.debug( "ConfigManagerV2._update_config result: fallback to last valid config applied", ) except Exception: # noqa: BLE001 @@ -318,7 +297,7 @@ class ConfigManagerV2(_RuntimeController): async def start(self) -> None: if self._task is not None and not self._task.done(): - self.logger.warning("ConfigManagerV2.start result: already running") + self.logger.debug("ConfigManagerV2.start result: already running") return try: self._loop = asyncio.get_running_loop() @@ -327,22 +306,22 @@ class ConfigManagerV2(_RuntimeController): raise self._task = asyncio.create_task(self._run(), name="config-manager-v2") self._task.add_done_callback(self._on_runtime_task_done) - self.logger.warning("ConfigManagerV2.start result: background task started") + self.logger.debug("ConfigManagerV2.start result: background task started") async def stop(self) -> None: if self._task is None: - self.logger.warning("ConfigManagerV2.stop result: not running") + self.logger.debug("ConfigManagerV2.stop result: not running") return self._halt.set() if asyncio.current_task() is self._task: - self.logger.warning("ConfigManagerV2.stop result: stop requested from runtime task") + self.logger.debug("ConfigManagerV2.stop result: stop requested from runtime task") return try: await self._task except asyncio.CancelledError: - self.logger.warning("ConfigManagerV2.stop result: runtime task cancelled") + self.logger.debug("ConfigManagerV2.stop result: runtime task cancelled") except Exception: # noqa: BLE001 self.logger.exception("ConfigManagerV2.stop error while awaiting runtime task") raise finally: - self.logger.warning("ConfigManagerV2.stop result: completed") + self.logger.debug("ConfigManagerV2.stop result: completed") diff --git a/src/config_manager/v2/management/__init__.py b/src/config_manager/v2/management/__init__.py index 8af990a..af724ac 100644 --- a/src/config_manager/v2/management/__init__.py +++ b/src/config_manager/v2/management/__init__.py @@ -1,14 +1,13 @@ -"""Management API: HTTP-сервер для /health и /actions/start|stop, адаптеры и сбор здоровья. +"""Re-exports для обратной совместимости: HealthAggregator, ControlChannelBridge, ManagementServer/HealthServer (HttpControlChannel).""" +from ..control import HttpControlChannel +from ..core import ControlChannelBridge, HealthAggregator -Объединяет сервер, мосты к жизненному циклу и агрегатор здоровья для единого контракта API.""" -from .bridges import ControlChannelBridge, ManagementApiBridge -from .health_aggregator import HealthAggregator -from .management_server import HealthServer, ManagementServer +ManagementServer = HttpControlChannel +HealthServer = HttpControlChannel __all__ = [ "ControlChannelBridge", "HealthAggregator", "HealthServer", - "ManagementApiBridge", "ManagementServer", ] diff --git a/src/config_manager/v2/management/bridges.py b/src/config_manager/v2/management/bridges.py deleted file mode 100644 index a21c5a7..0000000 --- a/src/config_manager/v2/management/bridges.py +++ /dev/null @@ -1,90 +0,0 @@ -"""Адаптеры, связывающие жизненный цикл менеджера с HTTP API и каналами управления. - -ManagementApiBridge отдаёт start/stop в HTTP; ControlChannelBridge — start/stop/status в Telegram и др.""" -from __future__ import annotations - -import asyncio -import logging -from collections.abc import Awaitable, Callable - -from ..types import LifecycleState - -logger = logging.getLogger(__name__) - - -class ManagementApiBridge: - """Предоставляет start/stop жизненного цикла как async-колбэки для ManagementServer (/actions/start, /actions/stop).""" - - def __init__( - self, - start_fn: Callable[[], Awaitable[None]], - stop_fn: Callable[[], Awaitable[None]], - ): - self._start_fn = start_fn - self._stop_fn = stop_fn - logger.warning("ManagementApiBridge.__init__ result: callbacks configured") - - async def on_start(self) -> str: - """Выполнить start и вернуть сообщение для HTTP-ответа.""" - try: - await self._start_fn() - except Exception: # noqa: BLE001 - logger.exception("ManagementApiBridge.on_start error") - raise - result = "start completed" - logger.warning("ManagementApiBridge.on_start result: %s", result) - return result - - async def on_stop(self) -> str: - """Выполнить stop и вернуть сообщение для HTTP-ответа.""" - try: - await self._stop_fn() - except Exception: # noqa: BLE001 - logger.exception("ManagementApiBridge.on_stop error") - raise - result = "stop completed" - logger.warning("ManagementApiBridge.on_stop result: %s", result) - return result - - -class ControlChannelBridge: - """Предоставляет halt и status как обработчики start/stop/status для ControlChannel (например Telegram).""" - - def __init__( - self, - halt: asyncio.Event, - get_state: Callable[[], LifecycleState], - get_status: Callable[[], Awaitable[str]], - ): - self._halt = halt - self._get_state = get_state - self._get_status = get_status - logger.warning("ControlChannelBridge.__init__ result: callbacks configured") - - async def on_start(self) -> str: - """Обработать внешний start: сбросить halt; идемпотентно при уже running.""" - if self._get_state() == LifecycleState.RUNNING: - result = "already running" - logger.warning("ControlChannelBridge.on_start result: %s", result) - return result - self._halt.clear() - result = "start signal accepted" - logger.warning("ControlChannelBridge.on_start result: %s", result) - return result - - async def on_stop(self) -> str: - """Обработать внешний stop: установить halt.""" - self._halt.set() - result = "stop signal accepted" - logger.warning("ControlChannelBridge.on_stop result: %s", result) - return result - - async def on_status(self) -> str: - """Вернуть текущий текст статуса.""" - try: - result = await self._get_status() - except Exception: # noqa: BLE001 - logger.exception("ControlChannelBridge.on_status error") - raise - logger.warning("ControlChannelBridge.on_status result: %s", result) - return result diff --git a/tests/v2/test_health_endpoint.py b/tests/v2/test_health_endpoint.py index ce034e9..05e1344 100644 --- a/tests/v2/test_health_endpoint.py +++ b/tests/v2/test_health_endpoint.py @@ -54,6 +54,9 @@ def test_action_routes_call_callbacks(): events.append("stop") return "stop accepted" + async def on_status() -> str: + return "ok" + async def request(port: int, path: str) -> tuple[int, dict[str, str]]: reader, writer = await asyncio.open_connection("127.0.0.1", port) writer.write( @@ -75,10 +78,8 @@ def test_action_routes_call_callbacks(): port=0, timeout=0.2, health_provider=provider, - on_start=on_start, - on_stop=on_stop, ) - await server.start() + await server.start(on_start, on_stop, on_status) try: port = server.port assert port > 0