Рефакторинг канала управления

This commit is contained in:
2026-02-26 20:26:22 +03:00
parent aee8f7460f
commit 7b5d6d2156
10 changed files with 380 additions and 260 deletions

169
README.md
View File

@@ -1,20 +1,167 @@
# Config Manager # Config Manager
## Description ## Описание
This package was created to run my applications. Пакет предназначен для запуска приложений.
The ConfigManager class implements the entry point for the program and provides the actual application configuration. It also simplifies logging setup. Класс ConfigManager реализует точку входа программы и предоставляет актуальную конфигурацию приложения, а также упрощает настройку логирования.
## Logging (v2) ## ConfigManager v2: устройство и взаимосвязи
Logging is configured from the config file only if it contains a **`log`** section in [dictConfig](https://docs.python.org/3/library/logging.config.html#logging.config.dictConfig) format. If there is no `log` section, the manager logs a warning and the default Python level (WARNING) remains, so INFO/DEBUG messages may not appear.
**How to verify that logging config is applied:** **ConfigManager v2** — точка входа приложения. Он наследует внутреннюю логику от **\_RuntimeController** (циклы воркера и обновления конфига, запуск/остановка каналов управления).
- Ensure your config file path is correct and the file is loaded on startup (no error in logs about reading config).
- Ensure the config has a `log` key with `version: 1`, `handlers`, and `loggers` (see `tests/config.yaml` for an example).
- After startup you should see an INFO message: `"Logging configuration applied"` (from `config_manager.v1.log_manager`). If you do not see it, either the `log` section is missing (you will see a warning) or the root/package log level is above INFO.
## Installation **Ядро (core):**
- **ConfigLoader** — читает конфиг из файла (YAML/JSON), считает хеш и отдаёт конфиг только при изменении; при ошибке парсинга возвращает последний валидный конфиг.
- **WorkerLoop** — в отдельном потоке циклически вызывает ваш метод `execute()` с паузой между вызовами; реагирует на событие остановки и колбэки успеха/ошибки.
- **LogManager** (v1) — применяет секцию `log` из конфига к логированию (dictConfig).
- **HealthAggregator** — собирает состояние: жизненный цикл (idle/starting/running/…), время последнего успешного `execute()` и таймаут здоровья; формирует единый ответ для health (ok/unhealthy).
- **ControlChannelBridge** — один мост для всех каналов: обработчики on_start/on_stop/on_status (сброс/установка halt, текст статуса).
**Каналы управления (control):**
- **ControlChannel** — абстрактный контракт: `start(on_start, on_stop, on_status)`, `stop()`.
- **HttpControlChannel** — HTTP API (`/health`, `/actions/start`, `/actions/stop`, `/actions/status`); использует **UvicornServerRunner**; для `/health` вызывает **HealthAggregator.collect()**, для действий — переданные обработчики из **ControlChannelBridge**.
- **TelegramControlChannel** — реализация через long polling Telegram; команды `/start`, `/stop`, `/status` вызывают переданные обработчики.
**Поток работы:** при `start()` менеджер собирает список каналов (при `management_settings.enabled`**HttpControlChannel**, плюс опционально **control_channel** / **control_channels**), поднимает все каналы с одним **ControlChannelBridge**, затем запускает два цикла: **WorkerLoop** и периодическое обновление конфига через **ConfigLoader**. Остановка по halt (через любой канал) завершает оба цикла; в конце останавливаются все каналы.
## Диаграмма классов (v1 и v2)
```mermaid
classDiagram
direction TB
class ConfigManager {
+str path
+Any config
+float update_interval
+float work_interval
-Event _halt
-Task _task
+start() async
+stop() async
+execute()*
-_worker_loop() async
-_periodic_update_loop() async
-_update_config() async
}
class ConfigManagerV2 {
+str path
+Any config
+float update_interval
+float work_interval
-ConfigLoader _loader
-LifecycleState _state
+start() async
+stop() async
+execute()*
+get_health_status() HealthPayload
-_run() async
-_worker_loop() async
-_periodic_update_loop() async
}
class _RuntimeController {
<<внутренний>>
-_on_execute_success()
-_on_execute_error(exc)
-_worker_loop() async
-_periodic_update_loop() async
-_start_control_channels() async
-_stop_control_channels() async
-_run() async
}
class ConfigLoader {
+str path
+Any config
+Any last_valid_config
+load_if_changed() async
+parse_config(data) Any
-_read_file_sync() str
-read_file_async() async
}
class WorkerLoop {
-Callable execute
-Callable get_interval
-Event halt_event
+run() async
}
class LogManager {
+apply_config(config) None
}
class ControlChannel {
<<абстрактный>>
+start(on_start, on_stop, on_status) async*
+stop() async*
}
class TelegramControlChannel {
-str _token
-int _chat_id
+start(on_start, on_stop, on_status) async
+stop() async
-_poll_loop() async
}
class HttpControlChannel {
-UvicornServerRunner _runner
-Callable _health_provider
+start(on_start, on_stop, on_status) async
+stop() async
+int port
}
class HealthAggregator {
-Callable get_state
-Callable get_app_health
+collect() async HealthPayload
}
class ControlChannelBridge {
-Event _halt
-Callable _get_state
-Callable _get_status
+on_start() async str
+on_stop() async str
+on_status() async str
}
class UvicornServerRunner {
-Server _server
-Task _serve_task
+start(app) async
+stop() async
+int port
}
ConfigManager --> LogManager : использует
ConfigManagerV2 --|> _RuntimeController : наследует
ConfigManagerV2 --> ConfigLoader : использует
ConfigManagerV2 --> LogManager : использует
ConfigManagerV2 --> HealthAggregator : использует
ConfigManagerV2 --> ControlChannelBridge : использует
ConfigManagerV2 ..> ControlChannel : список каналов
_RuntimeController ..> WorkerLoop : создаёт в _worker_loop
TelegramControlChannel --|> ControlChannel : реализует
HttpControlChannel --|> ControlChannel : реализует
HttpControlChannel --> UvicornServerRunner : использует
HttpControlChannel ..> HealthAggregator : health_provider
ControlChannelBridge ..> ControlChannel : on_start, on_stop, on_status
```
## Логирование (v2)
Логирование настраивается из конфигурационного файла только если в нём есть секция **`log`** в формате [dictConfig](https://docs.python.org/3/library/logging.config.html#logging.config.dictConfig). Если секции `log` нет, менеджер пишет предупреждение в лог, а уровень Python по умолчанию (WARNING) сохраняется — сообщения INFO/DEBUG могут не отображаться.
**Как проверить, что конфигурация логирования применилась:**
- Убедитесь, что путь к файлу конфига верный и файл загружается при старте (в логах нет ошибки чтения конфига).
- Убедитесь, что в конфиге есть ключ `log` с `version: 1`, `handlers` и `loggers` (пример — `tests/config.yaml`).
- После старта в логе должно появиться сообщение уровня INFO: `"Logging configuration applied"` (из `config_manager.v1.log_manager`). Если его нет, либо секция `log` отсутствует (будет предупреждение), либо уровень root/пакета выше INFO.
## Установка
``pip install git+https://git.lesha.spb.ru/alex/config_manager.git`` ``pip install git+https://git.lesha.spb.ru/alex/config_manager.git``
## Contacts ## Контакты
- **e-mail**: lesha.spb@gmail.com - **e-mail**: lesha.spb@gmail.com
- **telegram**: https://t.me/lesha_spb - **telegram**: https://t.me/lesha_spb

View File

@@ -1,7 +1,8 @@
"""Каналы внешнего управления: абстракция и реализация (например, Telegram). """Каналы внешнего управления: абстракция и реализация (HTTP, Telegram).
Позволяет запускать, останавливать и запрашивать статус менеджера через ботов и другие интерфейсы.""" Позволяет запускать, останавливать и запрашивать статус менеджера через HTTP API и ботов."""
from .base import ControlChannel from .base import ControlChannel
from .http_channel import HttpControlChannel
from .telegram import TelegramControlChannel from .telegram import TelegramControlChannel
__all__ = ["ControlChannel", "TelegramControlChannel"] __all__ = ["ControlChannel", "HttpControlChannel", "TelegramControlChannel"]

View File

@@ -1,21 +1,25 @@
"""Management HTTP API with /health, /actions/start and /actions/stop.""" """HTTP-канал управления: /health, /actions/start, /actions/stop (реализация ControlChannel)."""
from __future__ import annotations from __future__ import annotations
import asyncio import asyncio
import json import json
import logging import logging
import time
from collections.abc import Awaitable, Callable from collections.abc import Awaitable, Callable
from typing import Any, Optional from typing import Any, Optional
from fastapi import FastAPI from fastapi import FastAPI
from fastapi import Request
from fastapi.responses import JSONResponse from fastapi.responses import JSONResponse
from uvicorn import Config, Server from uvicorn import Config, Server
from ..types import HealthPayload from ..types import HealthPayload
from .base import ControlChannel, StartHandler, StatusHandler, StopHandler
PATH_HEALTH = "/health" PATH_HEALTH = "/health"
PATH_ACTION_START = "/actions/start" PATH_ACTION_START = "/actions/start"
PATH_ACTION_STOP = "/actions/stop" PATH_ACTION_STOP = "/actions/stop"
PATH_ACTION_STATUS = "/actions/status"
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
@@ -29,7 +33,7 @@ class UvicornServerRunner:
self._server: Optional[Server] = None self._server: Optional[Server] = None
self._serve_task: Optional[asyncio.Task[None]] = None self._serve_task: Optional[asyncio.Task[None]] = None
self._bound_port: Optional[int] = None self._bound_port: Optional[int] = None
logger.warning( logger.debug(
"UvicornServerRunner.__init__ result: host=%s port=%s timeout=%s", "UvicornServerRunner.__init__ result: host=%s port=%s timeout=%s",
self._host, self._host,
self._port, self._port,
@@ -83,21 +87,21 @@ class UvicornServerRunner:
self._server = None self._server = None
self._serve_task = None self._serve_task = None
self._bound_port = None self._bound_port = None
logger.warning("UvicornServerRunner._cleanup_start_failure result: state reset") logger.debug("UvicornServerRunner._cleanup_start_failure result: state reset")
async def start(self, app: FastAPI) -> None: async def start(self, app: FastAPI) -> None:
if self._serve_task is not None and not self._serve_task.done(): if self._serve_task is not None and not self._serve_task.done():
logger.warning("UvicornServerRunner.start result: already running") logger.debug("UvicornServerRunner.start result: already running")
return return
if self._serve_task is not None and self._serve_task.done(): if self._serve_task is not None and self._serve_task.done():
self._serve_task = None self._serve_task = None
try: try:
config = Config(app=app, host=self._host, port=self._port, log_level="warning") config = Config(app=app, host=self._host, port=self._port, log_level="warning")
self._server = Server(config) self._server = Server(config)
self._serve_task = asyncio.create_task(self._server.serve(), name="management-server-serve") self._serve_task = asyncio.create_task(self._server.serve(), name="http-control-channel-serve")
await self._wait_until_started() await self._wait_until_started()
self._bound_port = self._resolve_bound_port() self._bound_port = self._resolve_bound_port()
logger.warning( logger.debug(
"UvicornServerRunner.start result: running host=%s requested_port=%s bound_port=%s", "UvicornServerRunner.start result: running host=%s requested_port=%s bound_port=%s",
self._host, self._host,
self._port, self._port,
@@ -110,7 +114,7 @@ class UvicornServerRunner:
async def stop(self) -> None: async def stop(self) -> None:
if self._server is None or self._serve_task is None: if self._server is None or self._serve_task is None:
logger.warning("UvicornServerRunner.stop result: already stopped") logger.debug("UvicornServerRunner.stop result: already stopped")
return return
self._server.should_exit = True self._server.should_exit = True
try: try:
@@ -122,17 +126,17 @@ class UvicornServerRunner:
self._server = None self._server = None
self._serve_task = None self._serve_task = None
self._bound_port = None self._bound_port = None
logger.warning("UvicornServerRunner.stop result: stopped") logger.debug("UvicornServerRunner.stop result: stopped")
@property @property
def port(self) -> int: def port(self) -> int:
result = self._bound_port if self._bound_port is not None else self._port result = self._bound_port if self._bound_port is not None else self._port
logger.warning("UvicornServerRunner.port result: %s", result) logger.debug("UvicornServerRunner.port result: %s", result)
return result return result
class ManagementServer: class HttpControlChannel(ControlChannel):
"""Management API endpoints and callback adapters.""" """HTTP API как канал управления: /health, /actions/start, /actions/stop, /actions/status."""
def __init__( def __init__(
self, self,
@@ -140,17 +144,18 @@ class ManagementServer:
port: int, port: int,
timeout: int, timeout: int,
health_provider: Callable[[], Awaitable[HealthPayload]], health_provider: Callable[[], Awaitable[HealthPayload]],
on_start: Optional[Callable[[], Awaitable[str]]] = None,
on_stop: Optional[Callable[[], Awaitable[str]]] = None,
): ):
self._host = host
self._port = port
self._timeout = timeout self._timeout = timeout
self._health_provider = health_provider self._health_provider = health_provider
self._on_start = on_start self._on_start: Optional[StartHandler] = None
self._on_stop = on_stop self._on_stop: Optional[StopHandler] = None
self._on_status: Optional[StatusHandler] = None
self._runner = UvicornServerRunner(host=host, port=port, timeout=timeout) self._runner = UvicornServerRunner(host=host, port=port, timeout=timeout)
self._app = self._create_app() self._app: Optional[FastAPI] = None
logger.warning( logger.debug(
"ManagementServer.__init__ result: host=%s port=%s timeout=%s", "HttpControlChannel.__init__ result: host=%s port=%s timeout=%s",
host, host,
port, port,
timeout, timeout,
@@ -159,6 +164,20 @@ class ManagementServer:
def _create_app(self) -> FastAPI: def _create_app(self) -> FastAPI:
app = FastAPI(title="Config Manager Management API") app = FastAPI(title="Config Manager Management API")
@app.middleware("http")
async def log_api_call(request: Request, call_next): # type: ignore[no-untyped-def]
started = time.monotonic()
response = await call_next(request)
duration_ms = int((time.monotonic() - started) * 1000)
logger.info(
"API call: method=%s path=%s status=%s duration_ms=%s",
request.method,
request.url.path,
response.status_code,
duration_ms,
)
return response
@app.get(PATH_HEALTH) @app.get(PATH_HEALTH)
async def health() -> JSONResponse: async def health() -> JSONResponse:
return await self._health_response() return await self._health_response()
@@ -173,11 +192,17 @@ class ManagementServer:
async def action_stop() -> JSONResponse: async def action_stop() -> JSONResponse:
return await self._action_response("stop", self._on_stop) return await self._action_response("stop", self._on_stop)
logger.warning( @app.get(PATH_ACTION_STATUS)
"ManagementServer._create_app result: routes=%s,%s,%s", @app.post(PATH_ACTION_STATUS)
async def action_status() -> JSONResponse:
return await self._action_response("status", self._on_status)
logger.debug(
"HttpControlChannel._create_app result: routes=%s,%s,%s,%s",
PATH_HEALTH, PATH_HEALTH,
PATH_ACTION_START, PATH_ACTION_START,
PATH_ACTION_STOP, PATH_ACTION_STOP,
PATH_ACTION_STATUS,
) )
return app return app
@@ -185,14 +210,14 @@ class ManagementServer:
try: try:
payload = await asyncio.wait_for(self._health_provider(), timeout=self._timeout) payload = await asyncio.wait_for(self._health_provider(), timeout=self._timeout)
status_code = 200 if payload.get("status", "unhealthy") == "ok" else 503 status_code = 200 if payload.get("status", "unhealthy") == "ok" else 503
logger.warning( logger.debug(
"ManagementServer._health_response result: status_code=%s payload=%s", "HttpControlChannel._health_response result: status_code=%s payload=%s",
status_code, status_code,
payload, payload,
) )
return JSONResponse(content=payload, status_code=status_code) return JSONResponse(content=payload, status_code=status_code)
except Exception as exc: # noqa: BLE001 except Exception as exc: # noqa: BLE001
logger.exception("ManagementServer._health_response error") logger.exception("HttpControlChannel._health_response error")
return JSONResponse(content={"status": "unhealthy", "detail": str(exc)}, status_code=503) return JSONResponse(content={"status": "unhealthy", "detail": str(exc)}, status_code=503)
async def _action_response( async def _action_response(
@@ -201,8 +226,8 @@ class ManagementServer:
callback: Optional[Callable[[], Awaitable[str]]], callback: Optional[Callable[[], Awaitable[str]]],
) -> JSONResponse: ) -> JSONResponse:
if callback is None: if callback is None:
logger.warning( logger.debug(
"ManagementServer._action_response result: action=%s status_code=404 detail=handler not configured", "HttpControlChannel._action_response result: action=%s status_code=404 detail=handler not configured",
action, action,
) )
return JSONResponse( return JSONResponse(
@@ -213,52 +238,53 @@ class ManagementServer:
detail = await callback() detail = await callback()
if not detail: if not detail:
detail = f"{action} action accepted" detail = f"{action} action accepted"
logger.warning( logger.debug(
"ManagementServer._action_response result: action=%s status_code=200 detail=%s", "HttpControlChannel._action_response result: action=%s status_code=200 detail=%s",
action, action,
detail, detail,
) )
return JSONResponse(content={"status": "ok", "detail": detail}, status_code=200) return JSONResponse(content={"status": "ok", "detail": detail}, status_code=200)
except Exception as exc: # noqa: BLE001 except Exception as exc: # noqa: BLE001
logger.exception("ManagementServer._action_response error: action=%s", action) logger.exception("HttpControlChannel._action_response error: action=%s", action)
return JSONResponse(content={"status": "error", "detail": str(exc)}, status_code=500) return JSONResponse(content={"status": "error", "detail": str(exc)}, status_code=500)
def _build_health_response(self) -> Awaitable[tuple[int, HealthPayload]]: async def start(
async def _run() -> tuple[int, HealthPayload]: self,
response = await self._health_response() on_start: StartHandler,
body: Any = response.body on_stop: StopHandler,
if isinstance(body, bytes): on_status: StatusHandler,
body = json.loads(body.decode("utf-8")) ) -> None:
logger.warning( self._on_start = on_start
"ManagementServer._build_health_response result: status_code=%s payload=%s", self._on_stop = on_stop
response.status_code, self._on_status = on_status
body, self._app = self._create_app()
)
return response.status_code, body
return _run()
async def start(self) -> None:
try: try:
await self._runner.start(self._app) await self._runner.start(self._app)
logger.warning("ManagementServer.start result: started") logger.debug("HttpControlChannel.start result: started port=%s", self._runner.port)
except Exception: # noqa: BLE001 except Exception: # noqa: BLE001
logger.exception("ManagementServer.start error") logger.exception("HttpControlChannel.start error")
raise raise
async def stop(self) -> None: async def stop(self) -> None:
try: try:
await self._runner.stop() await self._runner.stop()
logger.warning("ManagementServer.stop result: stopped") logger.debug("HttpControlChannel.stop result: stopped")
except BaseException: # noqa: BLE001 except BaseException: # noqa: BLE001
logger.exception("ManagementServer.stop error") logger.exception("HttpControlChannel.stop error")
raise raise
@property @property
def port(self) -> int: def port(self) -> int:
result = self._runner.port return self._runner.port
logger.warning("ManagementServer.port result: %s", result)
return result
def _build_health_response(self) -> Awaitable[tuple[int, Any]]:
"""Для тестов: вернуть (status_code, payload) без запуска сервера."""
HealthServer = ManagementServer async def _run() -> tuple[int, Any]:
response = await self._health_response()
body: Any = response.body
if isinstance(body, bytes):
body = json.loads(body.decode("utf-8"))
return response.status_code, body
return _run()

View File

@@ -2,7 +2,9 @@
Содержит ConfigManagerV2, загрузчик конфигурации и планировщик повторяющегося выполнения execute().""" Содержит ConfigManagerV2, загрузчик конфигурации и планировщик повторяющегося выполнения execute()."""
from .config_loader import ConfigLoader from .config_loader import ConfigLoader
from .control_bridge import ControlChannelBridge
from .health_aggregator import HealthAggregator
from .manager import ConfigManagerV2 from .manager import ConfigManagerV2
from .scheduler import WorkerLoop from .scheduler import WorkerLoop
__all__ = ["ConfigLoader", "ConfigManagerV2", "WorkerLoop"] __all__ = ["ConfigLoader", "ConfigManagerV2", "ControlChannelBridge", "HealthAggregator", "WorkerLoop"]

View File

@@ -0,0 +1,55 @@
"""Адаптер между каналами управления и жизненным циклом менеджера.
Предоставляет обработчики start/stop/status для ControlChannel (halt, state, status text)."""
from __future__ import annotations
import asyncio
import logging
from collections.abc import Awaitable, Callable
from ..types import LifecycleState
logger = logging.getLogger(__name__)
class ControlChannelBridge:
"""Предоставляет halt и status как обработчики start/stop/status для ControlChannel (например Telegram)."""
def __init__(
self,
halt: asyncio.Event,
get_state: Callable[[], LifecycleState],
get_status: Callable[[], Awaitable[str]],
):
self._halt = halt
self._get_state = get_state
self._get_status = get_status
logger.debug("ControlChannelBridge.__init__ result: callbacks configured")
async def on_start(self) -> str:
"""Обработать внешний start: сбросить halt; идемпотентно при уже running."""
if self._get_state() == LifecycleState.RUNNING:
result = "already running"
logger.debug("ControlChannelBridge.on_start result: %s", result)
return result
self._halt.clear()
result = "start signal accepted"
logger.debug("ControlChannelBridge.on_start result: %s", result)
return result
async def on_stop(self) -> str:
"""Обработать внешний stop: установить halt."""
self._halt.set()
result = "stop signal accepted"
logger.debug("ControlChannelBridge.on_stop result: %s", result)
return result
async def on_status(self) -> str:
"""Вернуть текущий текст статуса."""
try:
result = await self._get_status()
except Exception: # noqa: BLE001
logger.exception("ControlChannelBridge.on_status error")
raise
logger.debug("ControlChannelBridge.on_status result: %s", result)
return result

View File

@@ -28,7 +28,7 @@ class HealthAggregator:
self._get_last_success_timestamp = get_last_success_timestamp self._get_last_success_timestamp = get_last_success_timestamp
self._health_timeout = health_timeout self._health_timeout = health_timeout
self._get_app_health = get_app_health self._get_app_health = get_app_health
logger.warning("HealthAggregator.__init__ result: health_timeout=%s", self._health_timeout) logger.debug("HealthAggregator.__init__ result: health_timeout=%s", self._health_timeout)
async def collect(self) -> HealthPayload: async def collect(self) -> HealthPayload:
"""Вернуть ok, если был успешный execute() за последние health_timeout сек; иначе unhealthy. Всегда добавляем state.""" """Вернуть ok, если был успешный execute() за последние health_timeout сек; иначе unhealthy. Всегда добавляем state."""
@@ -38,7 +38,7 @@ class HealthAggregator:
# Только при state=RUNNING возможен status=ok; при остановке (STOPPING/STOPPED) сразу unhealthy. # Только при state=RUNNING возможен status=ok; при остановке (STOPPING/STOPPED) сразу unhealthy.
if state != LifecycleState.RUNNING: if state != LifecycleState.RUNNING:
result = {"status": "unhealthy", "detail": f"state={state_value}", "state": state_value} result = {"status": "unhealthy", "detail": f"state={state_value}", "state": state_value}
logger.warning("HealthAggregator.collect result: %s", result) logger.debug("HealthAggregator.collect result: %s", result)
return result return result
last_success = self._get_last_success_timestamp() last_success = self._get_last_success_timestamp()
@@ -47,21 +47,21 @@ class HealthAggregator:
if last_success is None: if last_success is None:
detail = self._get_last_error() or "no successful run yet" detail = self._get_last_error() or "no successful run yet"
result = {"status": "unhealthy", "detail": detail, "state": state_value} result = {"status": "unhealthy", "detail": detail, "state": state_value}
logger.warning("HealthAggregator.collect result: %s", result) logger.debug("HealthAggregator.collect result: %s", result)
return result return result
if (now - last_success) > self._health_timeout: if (now - last_success) > self._health_timeout:
detail = self._get_last_error() or f"no successful run within {self._health_timeout}s" detail = self._get_last_error() or f"no successful run within {self._health_timeout}s"
result = {"status": "unhealthy", "detail": detail, "state": state_value} result = {"status": "unhealthy", "detail": detail, "state": state_value}
logger.warning("HealthAggregator.collect result: %s", result) logger.debug("HealthAggregator.collect result: %s", result)
return result return result
result = self._get_app_health() result = self._get_app_health()
status = result.get("status", "unhealthy") status = result.get("status", "unhealthy")
if status != "ok": if status != "ok":
unhealthy = {"status": "unhealthy", "detail": result.get("detail", "app reported non-ok"), "state": state_value} unhealthy = {"status": "unhealthy", "detail": result.get("detail", "app reported non-ok"), "state": state_value}
logger.warning("HealthAggregator.collect result: %s", unhealthy) logger.debug("HealthAggregator.collect result: %s", unhealthy)
return unhealthy return unhealthy
healthy = {**result, "state": state_value} healthy = {**result, "state": state_value}
logger.warning("HealthAggregator.collect result: %s", healthy) logger.debug("HealthAggregator.collect result: %s", healthy)
return healthy return healthy

View File

@@ -5,18 +5,15 @@ import asyncio
import logging import logging
import os import os
import time import time
from typing import Any, Optional from typing import Any, Iterable, Optional
from ...v1.log_manager import LogManager from ...v1.log_manager import LogManager
from ..control.base import ControlChannel from ..control.base import ControlChannel
from ..management import ( from ..control.http_channel import HttpControlChannel
ControlChannelBridge,
HealthAggregator,
ManagementApiBridge,
ManagementServer,
)
from ..types import HealthPayload, LifecycleState, ManagementServerSettings from ..types import HealthPayload, LifecycleState, ManagementServerSettings
from .config_loader import ConfigLoader from .config_loader import ConfigLoader
from .control_bridge import ControlChannelBridge
from .health_aggregator import HealthAggregator
from .scheduler import WorkerLoop from .scheduler import WorkerLoop
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
@@ -48,7 +45,7 @@ class _RuntimeController:
def _on_execute_success(self) -> None: def _on_execute_success(self) -> None:
self._last_success_timestamp = time.monotonic() self._last_success_timestamp = time.monotonic()
self._last_execute_error = None self._last_execute_error = None
self.logger.warning( self.logger.debug(
"ConfigManagerV2._on_execute_success result: last_success_timestamp=%s", "ConfigManagerV2._on_execute_success result: last_success_timestamp=%s",
self._last_success_timestamp, self._last_success_timestamp,
) )
@@ -56,7 +53,7 @@ class _RuntimeController:
def _on_execute_error(self, exc: Exception) -> None: def _on_execute_error(self, exc: Exception) -> None:
self._last_execute_error = str(exc) self._last_execute_error = str(exc)
self.logger.exception("ConfigManagerV2._on_execute_error") self.logger.exception("ConfigManagerV2._on_execute_error")
self.logger.warning( self.logger.debug(
"ConfigManagerV2._on_execute_error result: last_execute_error=%s", "ConfigManagerV2._on_execute_error result: last_execute_error=%s",
self._last_execute_error, self._last_execute_error,
) )
@@ -75,7 +72,7 @@ class _RuntimeController:
) )
try: try:
await worker.run() await worker.run()
self.logger.warning("ConfigManagerV2._worker_loop result: completed") self.logger.debug("ConfigManagerV2._worker_loop result: completed")
except Exception: # noqa: BLE001 except Exception: # noqa: BLE001
self.logger.exception("ConfigManagerV2._worker_loop error") self.logger.exception("ConfigManagerV2._worker_loop error")
raise raise
@@ -105,55 +102,35 @@ class _RuntimeController:
detail = health.get("detail") detail = health.get("detail")
if detail: if detail:
status_text = f"state={self._state.value}; health={health['status']}; detail={detail}" status_text = f"state={self._state.value}; health={health['status']}; detail={detail}"
self.logger.warning("ConfigManagerV2._status_text result: %s", status_text) self.logger.debug("ConfigManagerV2._status_text result: %s", status_text)
return status_text return status_text
status_text = f"state={self._state.value}; health={health['status']}" status_text = f"state={self._state.value}; health={health['status']}"
self.logger.warning("ConfigManagerV2._status_text result: %s", status_text) self.logger.debug("ConfigManagerV2._status_text result: %s", status_text)
return status_text return status_text
async def _start_control_channel(self) -> None: async def _start_control_channels(self) -> None:
if self._control_channel is None: for channel in self._control_channels:
self.logger.warning("ConfigManagerV2._start_control_channel result: no control channel") try:
return await channel.start(
try: self._control_bridge.on_start,
await self._control_channel.start( self._control_bridge.on_stop,
self._control_bridge.on_start, self._control_bridge.on_status,
self._control_bridge.on_stop, )
self._control_bridge.on_status, self.logger.debug("ConfigManagerV2._start_control_channels result: started channel=%s", type(channel).__name__)
) except Exception: # noqa: BLE001
self.logger.warning("ConfigManagerV2._start_control_channel result: started") self.logger.exception("ConfigManagerV2._start_control_channels error channel=%s", type(channel).__name__)
except Exception: # noqa: BLE001
self.logger.exception("ConfigManagerV2._start_control_channel error")
async def _stop_control_channel(self) -> None: async def _stop_control_channels(self) -> None:
if self._control_channel is None: for channel in self._control_channels:
self.logger.warning("ConfigManagerV2._stop_control_channel result: no control channel") try:
return await channel.stop()
try: self.logger.debug("ConfigManagerV2._stop_control_channels result: stopped channel=%s", type(channel).__name__)
await self._control_channel.stop() except Exception: # noqa: BLE001
self.logger.warning("ConfigManagerV2._stop_control_channel result: stopped") self.logger.exception("ConfigManagerV2._stop_control_channels error channel=%s", type(channel).__name__)
except Exception: # noqa: BLE001
self.logger.exception("ConfigManagerV2._stop_control_channel error")
async def _start_management_server(self) -> None:
if self._management_server is None:
self.logger.warning("ConfigManagerV2._start_management_server result: disabled")
return
try:
await self._management_server.start()
self.logger.warning(
"ConfigManagerV2._start_management_server result: started port=%s",
self._management_server.port,
)
except Exception: # noqa: BLE001
self.logger.exception("ConfigManagerV2._start_management_server error")
self.logger.warning(
"ConfigManagerV2._start_management_server result: failed worker will continue",
)
def _on_runtime_task_done(self, task: asyncio.Task) -> None: def _on_runtime_task_done(self, task: asyncio.Task) -> None:
if task.cancelled(): if task.cancelled():
self.logger.warning("ConfigManagerV2._on_runtime_task_done result: cancelled") self.logger.debug("ConfigManagerV2._on_runtime_task_done result: cancelled")
return return
try: try:
exc = task.exception() exc = task.exception()
@@ -161,51 +138,50 @@ class _RuntimeController:
self.logger.exception("ConfigManagerV2._on_runtime_task_done error while reading task exception") self.logger.exception("ConfigManagerV2._on_runtime_task_done error while reading task exception")
return return
if exc is None: if exc is None:
self.logger.warning("ConfigManagerV2._on_runtime_task_done result: completed") self.logger.debug("ConfigManagerV2._on_runtime_task_done result: completed")
return return
self.logger.error( self.logger.error(
"ConfigManagerV2 background task failed", "ConfigManagerV2 background task failed",
exc_info=(type(exc), exc, exc.__traceback__), exc_info=(type(exc), exc, exc.__traceback__),
) )
self.logger.warning("ConfigManagerV2._on_runtime_task_done result: failed") self.logger.debug("ConfigManagerV2._on_runtime_task_done result: failed")
async def _run(self) -> None: async def _run(self) -> None:
self._state = LifecycleState.STARTING self._state = LifecycleState.STARTING
self.logger.warning("ConfigManagerV2._run result: state=%s", self._state.value) self.logger.debug("ConfigManagerV2._run result: state=%s", self._state.value)
self._halt.clear() self._halt.clear()
await self._update_config() await self._update_config()
await self._start_management_server() await self._start_control_channels()
await self._start_control_channel()
self._state = LifecycleState.RUNNING self._state = LifecycleState.RUNNING
self.logger.warning("ConfigManagerV2._run result: state=%s", self._state.value) self.logger.debug("ConfigManagerV2._run result: state=%s", self._state.value)
tasks = [ tasks = [
asyncio.create_task(self._worker_loop(), name="v2-worker-loop"), asyncio.create_task(self._worker_loop(), name="v2-worker-loop"),
asyncio.create_task(self._periodic_update_loop(), name="v2-config-loop"), asyncio.create_task(self._periodic_update_loop(), name="v2-config-loop"),
] ]
try: try:
await asyncio.gather(*tasks) await asyncio.gather(*tasks)
self.logger.warning("ConfigManagerV2._run result: background loops completed") self.logger.debug("ConfigManagerV2._run result: background loops completed")
except asyncio.CancelledError: except asyncio.CancelledError:
self.logger.warning("ConfigManagerV2._run result: cancelled") self.logger.debug("ConfigManagerV2._run result: cancelled")
raise raise
except Exception: # noqa: BLE001 except Exception: # noqa: BLE001
self.logger.exception("ConfigManagerV2._run error") self.logger.exception("ConfigManagerV2._run error")
raise raise
finally: finally:
self._state = LifecycleState.STOPPING self._state = LifecycleState.STOPPING
self.logger.warning("ConfigManagerV2._run result: state=%s", self._state.value) self.logger.debug("ConfigManagerV2._run result: state=%s", self._state.value)
self._halt.set() self._halt.set()
for task in tasks: for task in tasks:
task.cancel() task.cancel()
await asyncio.gather(*tasks, return_exceptions=True) await asyncio.gather(*tasks, return_exceptions=True)
await self._stop_control_channels()
self._state = LifecycleState.STOPPED self._state = LifecycleState.STOPPED
self._task = None self._task = None
self.logger.warning( self.logger.debug(
"ConfigManagerV2._run result: state=%s api_and_control_available=%s", "ConfigManagerV2._run result: state=%s",
self._state.value, self._state.value,
True,
) )
@@ -221,6 +197,7 @@ class ConfigManagerV2(_RuntimeController):
log_manager: Optional[LogManager] = None, log_manager: Optional[LogManager] = None,
management_settings: Optional[ManagementServerSettings] = None, management_settings: Optional[ManagementServerSettings] = None,
control_channel: Optional[ControlChannel] = None, control_channel: Optional[ControlChannel] = None,
control_channels: Optional[Iterable[ControlChannel]] = None,
): ):
self.logger = logging.getLogger(__name__) self.logger = logging.getLogger(__name__)
self.path = path self.path = path
@@ -230,7 +207,6 @@ class ConfigManagerV2(_RuntimeController):
self._loader = ConfigLoader(path) self._loader = ConfigLoader(path)
self._log_manager = log_manager or LogManager() self._log_manager = log_manager or LogManager()
self._control_channel = control_channel
self._halt = asyncio.Event() self._halt = asyncio.Event()
self._task: Optional[asyncio.Task] = None self._task: Optional[asyncio.Task] = None
self._loop: Optional[asyncio.AbstractEventLoop] = None self._loop: Optional[asyncio.AbstractEventLoop] = None
@@ -239,7 +215,6 @@ class ConfigManagerV2(_RuntimeController):
self._last_success_timestamp: Optional[float] = None self._last_success_timestamp: Optional[float] = None
settings = management_settings or ManagementServerSettings(enabled=True) settings = management_settings or ManagementServerSettings(enabled=True)
self._management_settings = settings
self._health_timeout = settings.health_timeout self._health_timeout = settings.health_timeout
self._health_aggregator = HealthAggregator( self._health_aggregator = HealthAggregator(
get_state=lambda: self._state, get_state=lambda: self._state,
@@ -248,28 +223,32 @@ class ConfigManagerV2(_RuntimeController):
health_timeout=self._health_timeout, health_timeout=self._health_timeout,
get_app_health=self.get_health_status, get_app_health=self.get_health_status,
) )
self._api_bridge = ManagementApiBridge(start_fn=self.start, stop_fn=self.stop)
self._control_bridge = ControlChannelBridge( self._control_bridge = ControlChannelBridge(
halt=self._halt, halt=self._halt,
get_state=lambda: self._state, get_state=lambda: self._state,
get_status=self._status_text, get_status=self._status_text,
) )
self._management_server: Optional[ManagementServer] = None channels: list[ControlChannel] = []
if settings.enabled: if settings.enabled:
self._management_server = ManagementServer( channels.append(
host=settings.host, HttpControlChannel(
port=settings.port, host=settings.host,
timeout=settings.timeout, port=settings.port,
health_provider=self._health_aggregator.collect, timeout=settings.timeout,
on_start=self._api_bridge.on_start, health_provider=self._health_aggregator.collect,
on_stop=self._api_bridge.on_stop, )
) )
self.logger.warning( if control_channels is not None:
"ConfigManagerV2.__init__ result: path=%s update_interval=%s work_interval=%s management_enabled=%s", channels.extend(control_channels)
if control_channel is not None:
channels.append(control_channel)
self._control_channels = channels
self.logger.debug(
"ConfigManagerV2.__init__ result: path=%s update_interval=%s work_interval=%s control_channels=%s",
self.path, self.path,
self.update_interval, self.update_interval,
self.work_interval, self.work_interval,
self._management_server is not None, len(self._control_channels),
) )
def _apply_config(self, new_config: Any) -> None: def _apply_config(self, new_config: Any) -> None:
@@ -280,7 +259,7 @@ class ConfigManagerV2(_RuntimeController):
except Exception: # noqa: BLE001 except Exception: # noqa: BLE001
self.logger.exception("ConfigManagerV2._apply_config error while applying logging config") self.logger.exception("ConfigManagerV2._apply_config error while applying logging config")
raise raise
self.logger.warning( self.logger.debug(
"ConfigManagerV2._apply_config result: config_type=%s is_dict=%s", "ConfigManagerV2._apply_config result: config_type=%s is_dict=%s",
type(new_config).__name__, type(new_config).__name__,
isinstance(new_config, dict), isinstance(new_config, dict),
@@ -290,21 +269,21 @@ class ConfigManagerV2(_RuntimeController):
try: try:
changed, new_config = await self._loader.load_if_changed() changed, new_config = await self._loader.load_if_changed()
if not changed: if not changed:
self.logger.warning("ConfigManagerV2._update_config result: no changes") self.logger.debug("ConfigManagerV2._update_config result: no changes")
return return
self._apply_config(new_config) self._apply_config(new_config)
self.logger.warning("ConfigManagerV2._update_config result: config updated") self.logger.debug("ConfigManagerV2._update_config result: config updated")
except Exception as exc: # noqa: BLE001 except Exception as exc: # noqa: BLE001
self.logger.exception("ConfigManagerV2._update_config error") self.logger.exception("ConfigManagerV2._update_config error")
if self._loader.last_valid_config is None: if self._loader.last_valid_config is None:
self.logger.warning( self.logger.debug(
"ConfigManagerV2._update_config result: no fallback config available detail=%s", "ConfigManagerV2._update_config result: no fallback config available detail=%s",
str(exc), str(exc),
) )
return return
try: try:
self._apply_config(self._loader.last_valid_config) self._apply_config(self._loader.last_valid_config)
self.logger.warning( self.logger.debug(
"ConfigManagerV2._update_config result: fallback to last valid config applied", "ConfigManagerV2._update_config result: fallback to last valid config applied",
) )
except Exception: # noqa: BLE001 except Exception: # noqa: BLE001
@@ -318,7 +297,7 @@ class ConfigManagerV2(_RuntimeController):
async def start(self) -> None: async def start(self) -> None:
if self._task is not None and not self._task.done(): if self._task is not None and not self._task.done():
self.logger.warning("ConfigManagerV2.start result: already running") self.logger.debug("ConfigManagerV2.start result: already running")
return return
try: try:
self._loop = asyncio.get_running_loop() self._loop = asyncio.get_running_loop()
@@ -327,22 +306,22 @@ class ConfigManagerV2(_RuntimeController):
raise raise
self._task = asyncio.create_task(self._run(), name="config-manager-v2") self._task = asyncio.create_task(self._run(), name="config-manager-v2")
self._task.add_done_callback(self._on_runtime_task_done) self._task.add_done_callback(self._on_runtime_task_done)
self.logger.warning("ConfigManagerV2.start result: background task started") self.logger.debug("ConfigManagerV2.start result: background task started")
async def stop(self) -> None: async def stop(self) -> None:
if self._task is None: if self._task is None:
self.logger.warning("ConfigManagerV2.stop result: not running") self.logger.debug("ConfigManagerV2.stop result: not running")
return return
self._halt.set() self._halt.set()
if asyncio.current_task() is self._task: if asyncio.current_task() is self._task:
self.logger.warning("ConfigManagerV2.stop result: stop requested from runtime task") self.logger.debug("ConfigManagerV2.stop result: stop requested from runtime task")
return return
try: try:
await self._task await self._task
except asyncio.CancelledError: except asyncio.CancelledError:
self.logger.warning("ConfigManagerV2.stop result: runtime task cancelled") self.logger.debug("ConfigManagerV2.stop result: runtime task cancelled")
except Exception: # noqa: BLE001 except Exception: # noqa: BLE001
self.logger.exception("ConfigManagerV2.stop error while awaiting runtime task") self.logger.exception("ConfigManagerV2.stop error while awaiting runtime task")
raise raise
finally: finally:
self.logger.warning("ConfigManagerV2.stop result: completed") self.logger.debug("ConfigManagerV2.stop result: completed")

View File

@@ -1,14 +1,13 @@
"""Management API: HTTP-сервер для /health и /actions/start|stop, адаптеры и сбор здоровья. """Re-exports для обратной совместимости: HealthAggregator, ControlChannelBridge, ManagementServer/HealthServer (HttpControlChannel)."""
from ..control import HttpControlChannel
from ..core import ControlChannelBridge, HealthAggregator
Объединяет сервер, мосты к жизненному циклу и агрегатор здоровья для единого контракта API.""" ManagementServer = HttpControlChannel
from .bridges import ControlChannelBridge, ManagementApiBridge HealthServer = HttpControlChannel
from .health_aggregator import HealthAggregator
from .management_server import HealthServer, ManagementServer
__all__ = [ __all__ = [
"ControlChannelBridge", "ControlChannelBridge",
"HealthAggregator", "HealthAggregator",
"HealthServer", "HealthServer",
"ManagementApiBridge",
"ManagementServer", "ManagementServer",
] ]

View File

@@ -1,90 +0,0 @@
"""Адаптеры, связывающие жизненный цикл менеджера с HTTP API и каналами управления.
ManagementApiBridge отдаёт start/stop в HTTP; ControlChannelBridge — start/stop/status в Telegram и др."""
from __future__ import annotations
import asyncio
import logging
from collections.abc import Awaitable, Callable
from ..types import LifecycleState
logger = logging.getLogger(__name__)
class ManagementApiBridge:
"""Предоставляет start/stop жизненного цикла как async-колбэки для ManagementServer (/actions/start, /actions/stop)."""
def __init__(
self,
start_fn: Callable[[], Awaitable[None]],
stop_fn: Callable[[], Awaitable[None]],
):
self._start_fn = start_fn
self._stop_fn = stop_fn
logger.warning("ManagementApiBridge.__init__ result: callbacks configured")
async def on_start(self) -> str:
"""Выполнить start и вернуть сообщение для HTTP-ответа."""
try:
await self._start_fn()
except Exception: # noqa: BLE001
logger.exception("ManagementApiBridge.on_start error")
raise
result = "start completed"
logger.warning("ManagementApiBridge.on_start result: %s", result)
return result
async def on_stop(self) -> str:
"""Выполнить stop и вернуть сообщение для HTTP-ответа."""
try:
await self._stop_fn()
except Exception: # noqa: BLE001
logger.exception("ManagementApiBridge.on_stop error")
raise
result = "stop completed"
logger.warning("ManagementApiBridge.on_stop result: %s", result)
return result
class ControlChannelBridge:
"""Предоставляет halt и status как обработчики start/stop/status для ControlChannel (например Telegram)."""
def __init__(
self,
halt: asyncio.Event,
get_state: Callable[[], LifecycleState],
get_status: Callable[[], Awaitable[str]],
):
self._halt = halt
self._get_state = get_state
self._get_status = get_status
logger.warning("ControlChannelBridge.__init__ result: callbacks configured")
async def on_start(self) -> str:
"""Обработать внешний start: сбросить halt; идемпотентно при уже running."""
if self._get_state() == LifecycleState.RUNNING:
result = "already running"
logger.warning("ControlChannelBridge.on_start result: %s", result)
return result
self._halt.clear()
result = "start signal accepted"
logger.warning("ControlChannelBridge.on_start result: %s", result)
return result
async def on_stop(self) -> str:
"""Обработать внешний stop: установить halt."""
self._halt.set()
result = "stop signal accepted"
logger.warning("ControlChannelBridge.on_stop result: %s", result)
return result
async def on_status(self) -> str:
"""Вернуть текущий текст статуса."""
try:
result = await self._get_status()
except Exception: # noqa: BLE001
logger.exception("ControlChannelBridge.on_status error")
raise
logger.warning("ControlChannelBridge.on_status result: %s", result)
return result

View File

@@ -54,6 +54,9 @@ def test_action_routes_call_callbacks():
events.append("stop") events.append("stop")
return "stop accepted" return "stop accepted"
async def on_status() -> str:
return "ok"
async def request(port: int, path: str) -> tuple[int, dict[str, str]]: async def request(port: int, path: str) -> tuple[int, dict[str, str]]:
reader, writer = await asyncio.open_connection("127.0.0.1", port) reader, writer = await asyncio.open_connection("127.0.0.1", port)
writer.write( writer.write(
@@ -75,10 +78,8 @@ def test_action_routes_call_callbacks():
port=0, port=0,
timeout=0.2, timeout=0.2,
health_provider=provider, health_provider=provider,
on_start=on_start,
on_stop=on_stop,
) )
await server.start() await server.start(on_start, on_stop, on_status)
try: try:
port = server.port port = server.port
assert port > 0 assert port > 0