Первая итерация рефакторинга

This commit is contained in:
2026-02-21 00:20:07 +03:00
parent 1d71ce406f
commit d888ae7acb
19 changed files with 668 additions and 410 deletions

View File

@@ -13,6 +13,8 @@ readme = "README.md"
requires-python = ">=3.8" requires-python = ">=3.8"
dependencies = [ dependencies = [
"PyYAML>=6.0", "PyYAML>=6.0",
"fastapi>=0.100.0",
"uvicorn[standard]>=0.22.0",
] ]
[project.urls] [project.urls]

View File

@@ -1,3 +1,3 @@
from .v2.manager import ConfigManagerV2 as ConfigManager from .v2 import ConfigManagerV2 as ConfigManager
from .v1.cfg_manager import ConfigManager as LegacyConfigManager from .v1.cfg_manager import ConfigManager as LegacyConfigManager
from .v1.log_manager import LogManager from .v1.log_manager import LogManager

View File

@@ -1,4 +1,7 @@
from .manager import ConfigManagerV2 """Публичный API V2: точка входа в менеджер конфигурации и настройки management-сервера.
from .types import HealthServerSettings
__all__ = ["ConfigManagerV2", "HealthServerSettings"] Экспортирует ConfigManagerV2 и типы настроек для использования приложениями."""
from .core import ConfigManagerV2
from .types import HealthServerSettings, ManagementServerSettings
__all__ = ["ConfigManagerV2", "ManagementServerSettings", "HealthServerSettings"]

View File

@@ -1,52 +0,0 @@
from __future__ import annotations
import asyncio
import hashlib
import json
import os
from typing import Any, Optional
import yaml
class ConfigLoader:
def __init__(self, path: str):
"""Initialize loader state for a specific config file path."""
self.path = path
self.config: Any = None
self.last_valid_config: Any = None
self._last_seen_hash: Optional[str] = None
def _read_file_sync(self) -> str:
"""Read raw config text from disk synchronously."""
with open(self.path, "r", encoding="utf-8") as fh:
return fh.read()
async def read_file_async(self) -> str:
"""Read raw config text from disk in a worker thread."""
return await asyncio.to_thread(self._read_file_sync)
def parse_config(self, data: str) -> Any:
"""Parse config text as YAML or JSON based on file extension."""
extension = os.path.splitext(self.path)[1].lower()
if extension in (".yaml", ".yml"):
return yaml.safe_load(data)
return json.loads(data)
@staticmethod
def _calculate_hash(data: str) -> str:
"""Calculate a stable content hash for change detection."""
return hashlib.sha256(data.encode("utf-8")).hexdigest()
async def load_if_changed(self) -> tuple[bool, Any]:
"""Load and parse config only when file content changed."""
raw_data = await self.read_file_async()
current_hash = self._calculate_hash(raw_data)
if current_hash == self._last_seen_hash:
return False, self.config
self._last_seen_hash = current_hash
parsed = self.parse_config(raw_data)
self.config = parsed
self.last_valid_config = parsed
return True, parsed

View File

@@ -1,3 +1,6 @@
"""Каналы внешнего управления: абстракция и реализация (например, Telegram).
Позволяет запускать, останавливать и запрашивать статус менеджера через ботов и другие интерфейсы."""
from .base import ControlChannel from .base import ControlChannel
from .telegram import TelegramControlChannel from .telegram import TelegramControlChannel

View File

@@ -1,3 +1,6 @@
"""Базовый абстрактный канал управления и типы обработчиков команд.
Определяет контракт: старт/стоп канала и привязка обработчиков start/stop/status."""
from __future__ import annotations from __future__ import annotations
from abc import ABC, abstractmethod from abc import ABC, abstractmethod
@@ -12,10 +15,10 @@ StatusHandler = Callable[[], Awaitable[str]]
class ControlChannel(ABC): class ControlChannel(ABC):
@abstractmethod @abstractmethod
async def start(self, on_start: StartHandler, on_stop: StopHandler, on_status: StatusHandler) -> None: async def start(self, on_start: StartHandler, on_stop: StopHandler, on_status: StatusHandler) -> None:
"""Start channel and bind command handlers.""" """Запустить канал и привязать обработчики команд."""
raise NotImplementedError raise NotImplementedError
@abstractmethod @abstractmethod
async def stop(self) -> None: async def stop(self) -> None:
"""Stop channel and release its resources.""" """Остановить канал и освободить его ресурсы."""
raise NotImplementedError raise NotImplementedError

View File

@@ -1,3 +1,6 @@
"""Реализация канала управления через Telegram Bot API (long polling).
Принимает команды /start, /stop, /status в указанном чате и вызывает привязанные обработчики."""
from __future__ import annotations from __future__ import annotations
import asyncio import asyncio
@@ -18,7 +21,7 @@ class TelegramControlChannel(ControlChannel):
poll_interval: float = 2.0, poll_interval: float = 2.0,
logger: Optional[logging.Logger] = None, logger: Optional[logging.Logger] = None,
): ):
"""Initialize Telegram polling channel with bot and chat settings.""" """Инициализация канала опроса Telegram с настройками бота и чата."""
self._token = token self._token = token
self._chat_id = chat_id self._chat_id = chat_id
self._poll_interval = poll_interval self._poll_interval = poll_interval
@@ -31,7 +34,7 @@ class TelegramControlChannel(ControlChannel):
self._logger = logger or logging.getLogger(__name__) self._logger = logger or logging.getLogger(__name__)
async def start(self, on_start: StartHandler, on_stop: StopHandler, on_status: StatusHandler) -> None: async def start(self, on_start: StartHandler, on_stop: StopHandler, on_status: StatusHandler) -> None:
"""Start polling Telegram updates and register command callbacks.""" """Запустить опрос обновлений Telegram и зарегистрировать колбэки команд."""
if self._task is not None and not self._task.done(): if self._task is not None and not self._task.done():
return return
self._on_start = on_start self._on_start = on_start
@@ -41,7 +44,7 @@ class TelegramControlChannel(ControlChannel):
self._task = asyncio.create_task(self._poll_loop()) self._task = asyncio.create_task(self._poll_loop())
async def stop(self) -> None: async def stop(self) -> None:
"""Stop polling loop and wait until task termination.""" """Остановить цикл опроса и дождаться завершения задачи."""
self._stop_event.set() self._stop_event.set()
if self._task is not None: if self._task is not None:
self._task.cancel() self._task.cancel()
@@ -52,7 +55,7 @@ class TelegramControlChannel(ControlChannel):
self._task = None self._task = None
async def _poll_loop(self) -> None: async def _poll_loop(self) -> None:
"""Continuously fetch updates and dispatch supported commands.""" """Непрерывно получать обновления и вызывать поддерживаемые команды."""
while not self._stop_event.is_set(): while not self._stop_event.is_set():
try: try:
updates = await asyncio.to_thread(self._fetch_updates) updates = await asyncio.to_thread(self._fetch_updates)
@@ -67,7 +70,7 @@ class TelegramControlChannel(ControlChannel):
continue continue
def _fetch_updates(self) -> list[dict]: def _fetch_updates(self) -> list[dict]:
"""Pull new Telegram updates using the latest offset.""" """Запросить новые обновления Telegram с учётом последнего offset."""
params = {"timeout": 0} params = {"timeout": 0}
if self._offset is not None: if self._offset is not None:
params["offset"] = self._offset params["offset"] = self._offset
@@ -82,7 +85,7 @@ class TelegramControlChannel(ControlChannel):
return result return result
async def _process_update(self, update: dict) -> None: async def _process_update(self, update: dict) -> None:
"""Handle one Telegram update and execute mapped command.""" """Обработать одно обновление Telegram и выполнить соответствующую команду."""
message = update.get("message") or {} message = update.get("message") or {}
text = (message.get("text") or "").strip().lower() text = (message.get("text") or "").strip().lower()
chat = message.get("chat") or {} chat = message.get("chat") or {}
@@ -103,7 +106,7 @@ class TelegramControlChannel(ControlChannel):
await asyncio.to_thread(self._send_message, reply) await asyncio.to_thread(self._send_message, reply)
def _send_message(self, text: str) -> None: def _send_message(self, text: str) -> None:
"""Send plain-text reply to the configured Telegram chat.""" """Отправить текстовый ответ в настроенный чат Telegram."""
encoded = urllib.parse.urlencode({"chat_id": self._chat_id, "text": text}) encoded = urllib.parse.urlencode({"chat_id": self._chat_id, "text": text})
url = f"https://api.telegram.org/bot{self._token}/sendMessage" url = f"https://api.telegram.org/bot{self._token}/sendMessage"
req = urllib.request.Request(url, data=encoded.encode("utf-8"), method="POST") req = urllib.request.Request(url, data=encoded.encode("utf-8"), method="POST")

View File

@@ -0,0 +1,8 @@
"""Ядро V2: жизненный цикл менеджера, загрузка конфига и цикл воркера.
Содержит ConfigManagerV2, загрузчик конфигурации и планировщик повторяющегося выполнения execute()."""
from .config_loader import ConfigLoader
from .manager import ConfigManagerV2
from .scheduler import WorkerLoop
__all__ = ["ConfigLoader", "ConfigManagerV2", "WorkerLoop"]

View File

@@ -0,0 +1,70 @@
"""Загрузчик конфигурации из файла (YAML/JSON) с обнаружением изменений по хешу.
Читает файл синхронно и асинхронно, парсит по расширению и возвращает последний валидный конфиг при ошибках."""
from __future__ import annotations
import asyncio
import hashlib
import json
import os
from typing import Any, Optional
import yaml
class ConfigLoader:
def __init__(self, path: str):
"""Инициализация состояния загрузчика для указанного пути к файлу конфига."""
self.path = path
self.config: Any = None
self.last_valid_config: Any = None
self._last_seen_hash: Optional[str] = None
def _read_file_sync(self) -> str:
"""Синхронно прочитать сырой текст конфига с диска."""
with open(self.path, "r", encoding="utf-8") as fh:
return fh.read()
async def read_file_async(self) -> str:
"""Прочитать сырой текст конфига с диска в рабочем потоке."""
return await asyncio.to_thread(self._read_file_sync)
def parse_config(self, data: str) -> Any:
"""Распарсить текст конфига как YAML или JSON по расширению файла."""
extension = os.path.splitext(self.path)[1].lower()
if extension in (".yaml", ".yml"):
return yaml.safe_load(data)
return json.loads(data)
@staticmethod
def _calculate_hash(data: str) -> str:
"""Вычислить устойчивый хеш содержимого для обнаружения изменений."""
return hashlib.sha256(data.encode("utf-8")).hexdigest()
async def load_if_changed(self) -> tuple[bool, Any]:
"""Загрузить и распарсить конфиг только при изменении содержимого файла."""
raw_data = await self.read_file_async()
current_hash = self._calculate_hash(raw_data)
if current_hash == self._last_seen_hash:
return False, self.config
self._last_seen_hash = current_hash
parsed = self.parse_config(raw_data)
self.config = parsed
self.last_valid_config = parsed
return True, parsed
def extract_scheduler_intervals(
config: Any,
default_update: float,
default_work: float,
) -> tuple[float, float]:
"""Извлекает update_interval и work_interval из конфига (dict). Возвращает значения по умолчанию при ошибке."""
if not isinstance(config, dict):
return default_update, default_work
upd = config.get("update_interval")
wrk = config.get("work_interval")
u = float(upd) if isinstance(upd, (int, float)) and upd > 0 else default_update
w = float(wrk) if isinstance(wrk, (int, float)) and wrk > 0 else default_work
return u, w

View File

@@ -0,0 +1,228 @@
"""Главный класс менеджера V2: оркестрация жизненного цикла, конфига, API и каналов управления.
Запускает воркер и периодическое обновление конфига, поднимает management-сервер и control-канал при наличии настроек."""
from __future__ import annotations
import asyncio
import logging
from typing import Any, Optional
from ...v1.log_manager import LogManager
from ..control.base import ControlChannel
from ..management import (
ControlChannelBridge,
HealthAggregator,
ManagementApiBridge,
ManagementServer,
)
from ..types import HealthPayload, LifecycleState, ManagementServerSettings
from .config_loader import ConfigLoader, extract_scheduler_intervals
from .scheduler import WorkerLoop
class ConfigManagerV2:
DEFAULT_UPDATE_INTERVAL = 5.0
DEFAULT_WORK_INTERVAL = 2.0
def __init__(
self,
path: str,
log_manager: Optional[LogManager] = None,
management_settings: Optional[ManagementServerSettings] = None,
health_settings: Optional[ManagementServerSettings] = None,
control_channel: Optional[ControlChannel] = None,
):
"""Инициализация подсистем менеджера и состояния рантайма."""
self.path = path
self.config: Any = None
self.update_interval = self.DEFAULT_UPDATE_INTERVAL
self.work_interval = self.DEFAULT_WORK_INTERVAL
self._loader = ConfigLoader(path)
self._log_manager = log_manager or LogManager()
self._control_channel = control_channel
self._halt = asyncio.Event()
self._task: Optional[asyncio.Task] = None
self._loop: Optional[asyncio.AbstractEventLoop] = None
self._state = LifecycleState.IDLE
self._last_execute_error: Optional[str] = None
if management_settings is not None and health_settings is not None:
raise ValueError("Use either management_settings or health_settings, not both")
self._management_settings = management_settings or health_settings or ManagementServerSettings(enabled=True)
self._health_aggregator = HealthAggregator(
get_state=lambda: self._state,
get_last_error=lambda: self._last_execute_error,
get_app_health=self.get_health_status,
)
self._api_bridge = ManagementApiBridge(start_fn=self.start, stop_fn=self.stop)
self._control_bridge = ControlChannelBridge(
halt=self._halt,
get_state=lambda: self._state,
get_status=self._status_text,
)
self._management_server: Optional[ManagementServer] = None
if self._management_settings.enabled:
self._management_server = ManagementServer(
host=self._management_settings.host,
port=self._management_settings.port,
timeout=self._management_settings.timeout,
health_provider=self._health_aggregator.collect,
on_start=self._api_bridge.on_start,
on_stop=self._api_bridge.on_stop,
)
self.logger = logging.getLogger(__name__)
def _apply_config(self, new_config: Any) -> None:
"""Применить загруженный конфиг: интервалы и log_manager. Вызывается после load_if_changed."""
self.config = new_config
self.update_interval, self.work_interval = extract_scheduler_intervals(
new_config,
self.DEFAULT_UPDATE_INTERVAL,
self.DEFAULT_WORK_INTERVAL,
)
if isinstance(new_config, dict):
self._log_manager.apply_config(new_config)
async def _update_config(self) -> None:
"""Перезагрузить конфиг при изменении файла и применить к состоянию и log_manager."""
try:
changed, new_config = await self._loader.load_if_changed()
if not changed:
return
self._apply_config(new_config)
except Exception as exc: # noqa: BLE001
self.logger.error("Error reading/parsing config file: %s", exc)
if self._loader.last_valid_config is not None:
self._apply_config(self._loader.last_valid_config)
def execute(self) -> None:
"""Переопределить в подклассе для реализации одной единицы блокирующей работы."""
def get_health_status(self) -> HealthPayload:
"""Вернуть payload здоровья приложения для /health."""
return {"status": "ok"}
def _on_execute_success(self) -> None:
"""Сбросить маркер последней ошибки выполнения после успешного запуска."""
self._last_execute_error = None
def _on_execute_error(self, exc: Exception) -> None:
"""Сохранить и залогировать детали ошибки выполнения для отчёта здоровья."""
self._last_execute_error = str(exc)
self.logger.error("Execution error: %s", exc)
async def _worker_loop(self) -> None:
"""Вызывать execute() циклически до запроса остановки."""
worker = WorkerLoop(
execute=self.execute,
get_interval=lambda: self.work_interval,
halt_event=self._halt,
on_error=self._on_execute_error,
on_success=self._on_execute_success,
)
await worker.run()
async def _periodic_update_loop(self) -> None:
"""Периодически проверять файл конфига на обновления до остановки."""
while not self._halt.is_set():
await self._update_config()
try:
await asyncio.wait_for(self._halt.wait(), timeout=max(self.update_interval, 0.05))
except asyncio.TimeoutError:
continue
async def _status_text(self) -> str:
"""Сформировать читаемый статус рантайма для каналов управления."""
health = await self._health_aggregator.collect()
detail = health.get("detail")
if detail:
return f"state={self._state.value}; health={health['status']}; detail={detail}"
return f"state={self._state.value}; health={health['status']}"
async def _start_control_channel(self) -> None:
"""Запустить настроенный канал управления с привязанными обработчиками команд."""
if self._control_channel is None:
return
await self._control_channel.start(
self._control_bridge.on_start,
self._control_bridge.on_stop,
self._control_bridge.on_status,
)
async def _stop_control_channel(self) -> None:
"""Остановить настроенный канал управления, если он активен."""
if self._control_channel is None:
return
await self._control_channel.stop()
async def _run(self) -> None:
"""Запустить жизненный цикл менеджера и координировать фоновые задачи."""
self._state = LifecycleState.STARTING
self._halt.clear()
await self._update_config()
if self._management_server is not None:
await self._management_server.start()
await self._start_control_channel()
self._state = LifecycleState.RUNNING
self.logger.info("ConfigManagerV2 started")
tasks = [
asyncio.create_task(self._worker_loop(), name="v2-worker-loop"),
asyncio.create_task(self._periodic_update_loop(), name="v2-config-loop"),
]
try:
await asyncio.gather(*tasks)
except asyncio.CancelledError:
raise
finally:
self._state = LifecycleState.STOPPING
self._halt.set()
for task in tasks:
task.cancel()
await asyncio.gather(*tasks, return_exceptions=True)
await self._stop_control_channel()
if self._management_server is not None:
await self._management_server.stop()
self._state = LifecycleState.STOPPED
self.logger.info("ConfigManagerV2 stopped")
async def start(self) -> None:
"""Запустить жизненный цикл менеджера из активного asyncio-контекста."""
if self._task is not None and not self._task.done():
self.logger.warning("ConfigManagerV2 is already running")
return
try:
self._loop = asyncio.get_running_loop()
except RuntimeError:
self.logger.error("start() must be called from within an async context")
raise
self._task = asyncio.create_task(self._run(), name="config-manager-v2")
try:
await self._task
finally:
self._task = None
async def stop(self) -> None:
"""Запросить плавную остановку и дождаться завершения менеджера."""
if self._task is None:
self.logger.warning("ConfigManagerV2 is not running")
return
self._halt.set()
if asyncio.current_task() is self._task:
return
try:
await self._task
except asyncio.CancelledError:
pass

View File

@@ -1,3 +1,6 @@
"""Цикл воркера: повторяющийся вызов блокирующего execute() в потоке с паузой между итерациями.
Поддерживает halt-событие для остановки, колбэки on_success/on_error для учёта ошибок и здоровья."""
from __future__ import annotations from __future__ import annotations
import asyncio import asyncio
@@ -14,7 +17,7 @@ class WorkerLoop:
on_error: Optional[Callable[[Exception], None]] = None, on_error: Optional[Callable[[Exception], None]] = None,
on_success: Optional[Callable[[], None]] = None, on_success: Optional[Callable[[], None]] = None,
): ):
"""Store callbacks and synchronization primitives for worker execution.""" """Сохранить колбэки и примитивы синхронизации для выполнения воркера."""
self._execute = execute self._execute = execute
self._get_interval = get_interval self._get_interval = get_interval
self._halt_event = halt_event self._halt_event = halt_event
@@ -22,7 +25,7 @@ class WorkerLoop:
self._on_success = on_success self._on_success = on_success
async def run(self) -> None: async def run(self) -> None:
"""Run execute repeatedly until halt is requested.""" """Вызывать execute циклически до запроса остановки."""
while not self._halt_event.is_set(): while not self._halt_event.is_set():
try: try:
await asyncio.to_thread(self._execute) await asyncio.to_thread(self._execute)

View File

@@ -1,80 +0,0 @@
from __future__ import annotations
import asyncio
import json
from collections.abc import Awaitable, Callable
from typing import Optional
from .types import HealthPayload
class HealthServer:
def __init__(
self,
host: str,
port: int,
path: str,
timeout: float,
health_provider: Callable[[], Awaitable[HealthPayload]],
):
"""Configure lightweight HTTP health server parameters and callback."""
self._host = host
self._port = port
self._path = path
self._timeout = timeout
self._health_provider = health_provider
self._server: Optional[asyncio.base_events.Server] = None
async def start(self) -> None:
"""Start listening for healthcheck requests if not running."""
if self._server is not None:
return
self._server = await asyncio.start_server(self._handle_connection, self._host, self._port)
async def stop(self) -> None:
"""Stop the health server and release the listening socket."""
if self._server is None:
return
self._server.close()
await self._server.wait_closed()
self._server = None
async def _handle_connection(self, reader: asyncio.StreamReader, writer: asyncio.StreamWriter) -> None:
"""Process one HTTP connection and return JSON health payload."""
status_code = 404
payload: HealthPayload = {"status": "unhealthy", "detail": "not found"}
try:
request_line = await reader.readline()
parts = request_line.decode("utf-8", errors="ignore").strip().split(" ")
if len(parts) >= 2:
method, path = parts[0], parts[1]
if method == "GET" and path == self._path:
status_code, payload = await self._build_health_response()
except Exception: # noqa: BLE001
status_code = 500
payload = {"status": "unhealthy", "detail": "internal error"}
body = json.dumps(payload).encode("utf-8")
phrase = "OK" if status_code == 200 else "SERVICE UNAVAILABLE" if status_code == 503 else "NOT FOUND"
response = (
f"HTTP/1.1 {status_code} {phrase}\r\n"
"Content-Type: application/json\r\n"
f"Content-Length: {len(body)}\r\n"
"Connection: close\r\n"
"\r\n"
).encode("utf-8") + body
writer.write(response)
await writer.drain()
writer.close()
await writer.wait_closed()
async def _build_health_response(self) -> tuple[int, HealthPayload]:
"""Build HTTP status code and body from application health callback."""
try:
payload = await asyncio.wait_for(self._health_provider(), timeout=self._timeout)
status = payload.get("status", "unhealthy")
return (200, payload) if status == "ok" else (503, payload)
except Exception as exc: # noqa: BLE001
return 503, {"status": "unhealthy", "detail": str(exc)}

View File

@@ -0,0 +1,14 @@
"""Management API: HTTP-сервер для /health и /actions/start|stop, адаптеры и сбор здоровья.
Объединяет сервер, мосты к жизненному циклу и агрегатор здоровья для единого контракта API."""
from .bridges import ControlChannelBridge, ManagementApiBridge
from .health_aggregator import HealthAggregator
from .management_server import HealthServer, ManagementServer
__all__ = [
"ControlChannelBridge",
"HealthAggregator",
"HealthServer",
"ManagementApiBridge",
"ManagementServer",
]

View File

@@ -0,0 +1,61 @@
"""Адаптеры, связывающие жизненный цикл менеджера с HTTP API и каналами управления.
ManagementApiBridge отдаёт start/stop в HTTP; ControlChannelBridge — start/stop/status в Telegram и др."""
from __future__ import annotations
import asyncio
from collections.abc import Awaitable, Callable
from ..types import LifecycleState
class ManagementApiBridge:
"""Предоставляет start/stop жизненного цикла как async-колбэки для ManagementServer (/actions/start, /actions/stop)."""
def __init__(
self,
start_fn: Callable[[], Awaitable[None]],
stop_fn: Callable[[], Awaitable[None]],
):
self._start_fn = start_fn
self._stop_fn = stop_fn
async def on_start(self) -> str:
"""Выполнить start и вернуть сообщение для HTTP-ответа."""
await self._start_fn()
return "start completed"
async def on_stop(self) -> str:
"""Выполнить stop и вернуть сообщение для HTTP-ответа."""
await self._stop_fn()
return "stop completed"
class ControlChannelBridge:
"""Предоставляет halt и status как обработчики start/stop/status для ControlChannel (например Telegram)."""
def __init__(
self,
halt: asyncio.Event,
get_state: Callable[[], LifecycleState],
get_status: Callable[[], Awaitable[str]],
):
self._halt = halt
self._get_state = get_state
self._get_status = get_status
async def on_start(self) -> str:
"""Обработать внешний start: сбросить halt; идемпотентно при уже running."""
if self._get_state() == LifecycleState.RUNNING:
return "already running"
self._halt.clear()
return "start signal accepted"
async def on_stop(self) -> str:
"""Обработать внешний stop: установить halt."""
self._halt.set()
return "stop signal accepted"
async def on_status(self) -> str:
"""Вернуть текущий текст статуса."""
return await self._get_status()

View File

@@ -0,0 +1,38 @@
"""Собирает состояние жизненного цикла и здоровья приложения в один ответ для /health.
Учитывает состояние (running/stopping), последнюю ошибку execute и результат get_health_status()."""
from __future__ import annotations
from collections.abc import Callable
from ..types import HealthPayload, LifecycleState
class HealthAggregator:
"""Формирует ответ здоровья из текущего состояния, последней ошибки и здоровья приложения."""
def __init__(
self,
get_state: Callable[[], LifecycleState],
get_last_error: Callable[[], str | None],
get_app_health: Callable[[], HealthPayload],
):
self._get_state = get_state
self._get_last_error = get_last_error
self._get_app_health = get_app_health
async def collect(self) -> HealthPayload:
"""Вернуть агрегированное здоровье: unhealthy при не running или ошибке, иначе здоровье приложения."""
state = self._get_state()
if state not in {LifecycleState.RUNNING, LifecycleState.STOPPING}:
return {"status": "unhealthy", "detail": f"state={state.value}"}
last_error = self._get_last_error()
if last_error is not None:
return {"status": "unhealthy", "detail": last_error}
result = self._get_app_health()
status = result.get("status", "unhealthy")
if status not in {"ok", "degraded", "unhealthy"}:
return {"status": "unhealthy", "detail": "invalid health status"}
return result

View File

@@ -0,0 +1,145 @@
"""Management HTTP API на FastAPI: эндпоинты /health, /actions/start, /actions/stop.
Единообразное описание маршрутов через декораторы FastAPI."""
from __future__ import annotations
import asyncio
import json
from collections.abc import Awaitable, Callable
from typing import Any, Optional
from fastapi import FastAPI
from fastapi.responses import JSONResponse
from uvicorn import Config, Server
from ..types import HealthPayload
# Захардкоженные эндпоинты management API.
PATH_HEALTH = "/health"
PATH_ACTION_START = "/actions/start"
PATH_ACTION_STOP = "/actions/stop"
class ManagementServer:
def __init__(
self,
host: str,
port: int,
timeout: float,
health_provider: Callable[[], Awaitable[HealthPayload]],
on_start: Optional[Callable[[], Awaitable[str]]] = None,
on_stop: Optional[Callable[[], Awaitable[str]]] = None,
):
"""Настройка параметров и колбэков лёгкого HTTP management-сервера."""
self._host = host
self._port = port
self._timeout = timeout
self._health_provider = health_provider
self._on_start = on_start
self._on_stop = on_stop
self._uvicorn_server: Optional[Server] = None
self._serve_task: Optional[asyncio.Task[None]] = None
self._bound_port: Optional[int] = None
self._app = self._create_app()
def _create_app(self) -> FastAPI:
app = FastAPI(title="Config Manager Management API")
@app.get(PATH_HEALTH)
async def health() -> JSONResponse:
return await self._health_response()
@app.get(PATH_ACTION_START)
@app.post(PATH_ACTION_START)
async def action_start() -> JSONResponse:
return await self._action_response("start", self._on_start)
@app.get(PATH_ACTION_STOP)
@app.post(PATH_ACTION_STOP)
async def action_stop() -> JSONResponse:
return await self._action_response("stop", self._on_stop)
return app
async def _health_response(self) -> JSONResponse:
"""Сформировать HTTP-ответ из колбэка здоровья приложения."""
try:
payload = await asyncio.wait_for(self._health_provider(), timeout=self._timeout)
status = payload.get("status", "unhealthy")
status_code = 200 if status == "ok" else 503
return JSONResponse(content=payload, status_code=status_code)
except Exception as exc: # noqa: BLE001
return JSONResponse(
content={"status": "unhealthy", "detail": str(exc)},
status_code=503,
)
async def _action_response(
self,
action: str,
callback: Optional[Callable[[], Awaitable[str]]],
) -> JSONResponse:
"""Сформировать HTTP-ответ для колбэка действия start/stop."""
if callback is None:
return JSONResponse(
content={"status": "error", "detail": f"{action} handler is not configured"},
status_code=404,
)
try:
detail = await callback()
if not detail:
detail = f"{action} action accepted"
return JSONResponse(content={"status": "ok", "detail": detail}, status_code=200)
except Exception as exc: # noqa: BLE001
return JSONResponse(
content={"status": "error", "detail": str(exc)},
status_code=500,
)
def _build_health_response(self) -> Awaitable[tuple[int, HealthPayload]]:
"""Для тестов: вернуть (status_code, payload) как раньше."""
async def _run() -> tuple[int, HealthPayload]:
response = await self._health_response()
body: Any = response.body
if isinstance(body, bytes):
body = json.loads(body.decode("utf-8"))
return response.status_code, body
return _run()
async def start(self) -> None:
"""Начать приём запросов к API здоровья и действий, если ещё не запущен."""
if self._serve_task is not None:
return
config = Config(
app=self._app,
host=self._host,
port=self._port,
log_level="warning",
)
self._uvicorn_server = Server(config)
self._serve_task = asyncio.create_task(self._uvicorn_server.serve())
await asyncio.sleep(0.05)
if self._uvicorn_server.servers:
sock = self._uvicorn_server.servers[0].sockets[0]
self._bound_port = sock.getsockname()[1]
else:
self._bound_port = self._port
async def stop(self) -> None:
"""Остановить management-сервер и освободить сокет."""
if self._uvicorn_server is None or self._serve_task is None:
return
self._uvicorn_server.should_exit = True
await self._serve_task
self._uvicorn_server = None
self._serve_task = None
self._bound_port = None
@property
def port(self) -> int:
"""Порт, на котором слушает сервер (после start); при port=0 — фактически выданный ОС."""
return self._bound_port if self._bound_port is not None else self._port
# Backward-compatible alias.
HealthServer = ManagementServer

View File

@@ -1,256 +0,0 @@
from __future__ import annotations
import asyncio
import logging
from collections.abc import Awaitable
from typing import Any, Optional
from ..v1.log_manager import LogManager
from .config_loader import ConfigLoader
from .control.base import ControlChannel
from .health import HealthServer
from .scheduler import WorkerLoop
from .types import HealthPayload, HealthServerSettings, LifecycleState
class ConfigManagerV2:
DEFAULT_UPDATE_INTERVAL = 5.0
DEFAULT_WORK_INTERVAL = 2.0
def __init__(
self,
path: str,
log_manager: Optional[LogManager] = None,
health_settings: Optional[HealthServerSettings] = None,
control_channel: Optional[ControlChannel] = None,
):
"""Initialize manager subsystems and runtime state."""
self.path = path
self.config: Any = None
self.update_interval = self.DEFAULT_UPDATE_INTERVAL
self.work_interval = self.DEFAULT_WORK_INTERVAL
self._loader = ConfigLoader(path)
self._log_manager = log_manager or LogManager()
self._control_channel = control_channel
self._halt = asyncio.Event()
self._task: Optional[asyncio.Task] = None
self._loop: Optional[asyncio.AbstractEventLoop] = None
self._state = LifecycleState.IDLE
self._last_execute_error: Optional[str] = None
self._health_settings = health_settings or HealthServerSettings(enabled=False)
self._health_server: Optional[HealthServer] = None
if self._health_settings.enabled:
self._health_server = HealthServer(
host=self._health_settings.host,
port=self._health_settings.port,
path=self._health_settings.path,
timeout=self._health_settings.timeout,
health_provider=self._collect_health,
)
self.logger = logging.getLogger(__name__)
def _read_file_sync(self) -> str:
"""Read config file synchronously via the shared loader."""
return self._loader._read_file_sync()
async def _read_file_async(self) -> str:
"""Read config file asynchronously via a worker thread."""
return await self._loader.read_file_async()
def _parse_config(self, data: str) -> Any:
"""Parse raw config text to a Python object."""
return self._loader.parse_config(data)
def _update_intervals_from_config(self) -> None:
"""Refresh work and update intervals from current config."""
if not isinstance(self.config, dict):
return
upd = self.config.get("update_interval")
wrk = self.config.get("work_interval")
if isinstance(upd, (int, float)) and upd > 0:
self.update_interval = float(upd)
else:
self.update_interval = self.DEFAULT_UPDATE_INTERVAL
if isinstance(wrk, (int, float)) and wrk > 0:
self.work_interval = float(wrk)
else:
self.work_interval = self.DEFAULT_WORK_INTERVAL
async def _update_config(self) -> None:
"""Reload config and apply new settings when content changes."""
try:
changed, new_config = await self._loader.load_if_changed()
if not changed:
return
self.config = new_config
self._update_intervals_from_config()
if isinstance(new_config, dict):
self._log_manager.apply_config(new_config)
except Exception as exc: # noqa: BLE001
# Keep current config untouched and continue with last valid settings.
self.logger.error("Error reading/parsing config file: %s", exc)
if self._loader.last_valid_config is not None:
self.config = self._loader.last_valid_config
self._update_intervals_from_config()
def execute(self) -> None:
"""Override in subclass to implement one unit of blocking work."""
def get_health_status(self) -> HealthPayload:
"""Return application-specific health payload for /health."""
return {"status": "ok"}
async def _collect_health(self) -> HealthPayload:
"""Aggregate lifecycle and app status into one health result."""
if self._state not in {LifecycleState.RUNNING, LifecycleState.STOPPING}:
return {"status": "unhealthy", "detail": f"state={self._state.value}"}
if self._last_execute_error is not None:
return {"status": "unhealthy", "detail": self._last_execute_error}
result = self.get_health_status()
status = result.get("status", "unhealthy")
if status not in {"ok", "degraded", "unhealthy"}:
return {"status": "unhealthy", "detail": "invalid health status"}
return result
def _on_execute_success(self) -> None:
"""Clear the last execution error marker after successful run."""
self._last_execute_error = None
def _on_execute_error(self, exc: Exception) -> None:
"""Store and log execution failure details for health reporting."""
self._last_execute_error = str(exc)
self.logger.error("Execution error: %s", exc)
async def _worker_loop(self) -> None:
"""Run execute() repeatedly until shutdown is requested."""
worker = WorkerLoop(
execute=self.execute,
get_interval=lambda: self.work_interval,
halt_event=self._halt,
on_error=self._on_execute_error,
on_success=self._on_execute_success,
)
await worker.run()
async def _periodic_update_loop(self) -> None:
"""Periodically check config file for updates until shutdown."""
while not self._halt.is_set():
await self._update_config()
try:
await asyncio.wait_for(self._halt.wait(), timeout=max(self.update_interval, 0.05))
except asyncio.TimeoutError:
continue
async def _status_text(self) -> str:
"""Build human-readable runtime status for control channels."""
health = await self._collect_health()
detail = health.get("detail")
if detail:
return f"state={self._state.value}; health={health['status']}; detail={detail}"
return f"state={self._state.value}; health={health['status']}"
async def _control_start(self) -> str:
"""Handle external start command for control channels."""
if self._state == LifecycleState.RUNNING:
return "already running"
self._halt.clear()
return "start signal accepted"
async def _control_stop(self) -> str:
"""Handle external stop command for control channels."""
self._halt.set()
return "stop signal accepted"
async def _control_status(self) -> str:
"""Handle external status command for control channels."""
return await self._status_text()
async def _start_control_channel(self) -> None:
"""Start configured control channel with bound command handlers."""
if self._control_channel is None:
return
await self._control_channel.start(self._control_start, self._control_stop, self._control_status)
async def _stop_control_channel(self) -> None:
"""Stop configured control channel if it is active."""
if self._control_channel is None:
return
await self._control_channel.stop()
async def _run(self) -> None:
"""Run manager lifecycle and coordinate all background tasks."""
self._state = LifecycleState.STARTING
self._halt.clear()
await self._update_config()
if self._health_server is not None:
await self._health_server.start()
await self._start_control_channel()
self._state = LifecycleState.RUNNING
self.logger.info("ConfigManagerV2 started")
tasks = [
asyncio.create_task(self._worker_loop(), name="v2-worker-loop"),
asyncio.create_task(self._periodic_update_loop(), name="v2-config-loop"),
]
try:
await asyncio.gather(*tasks)
except asyncio.CancelledError:
raise
finally:
self._state = LifecycleState.STOPPING
self._halt.set()
for task in tasks:
task.cancel()
await asyncio.gather(*tasks, return_exceptions=True)
await self._stop_control_channel()
if self._health_server is not None:
await self._health_server.stop()
self._state = LifecycleState.STOPPED
self.logger.info("ConfigManagerV2 stopped")
async def start(self) -> None:
"""Start manager lifecycle from an active asyncio context."""
if self._task is not None and not self._task.done():
self.logger.warning("ConfigManagerV2 is already running")
return
try:
self._loop = asyncio.get_running_loop()
except RuntimeError:
self.logger.error("start() must be called from within an async context")
raise
self._task = asyncio.create_task(self._run(), name="config-manager-v2")
try:
await self._task
finally:
self._task = None
async def stop(self) -> None:
"""Request graceful shutdown and wait for manager completion."""
if self._task is None:
self.logger.warning("ConfigManagerV2 is not running")
return
self._halt.set()
if asyncio.current_task() is self._task:
return
try:
await self._task
except asyncio.CancelledError:
pass

View File

@@ -1,3 +1,6 @@
"""Общие типы V2: состояние здоровья, жизненного цикла и настройки management-сервера.
Используются в core, management и control для единообразных контрактов."""
from __future__ import annotations from __future__ import annotations
from dataclasses import dataclass from dataclasses import dataclass
@@ -22,9 +25,12 @@ class LifecycleState(str, Enum):
@dataclass @dataclass
class HealthServerSettings: class ManagementServerSettings:
enabled: bool = False enabled: bool = False
host: str = "0.0.0.0" host: str = "0.0.0.0"
port: int = 8000 port: int = 8000
path: str = "/health"
timeout: float = 3.0 timeout: float = 3.0
# Backward-compatible alias.
HealthServerSettings = ManagementServerSettings

View File

@@ -1,6 +1,7 @@
import asyncio import asyncio
import json
from config_manager.v2.health import HealthServer from config_manager.v2.management import ManagementServer
def test_health_mapping_ok_to_200(): def test_health_mapping_ok_to_200():
@@ -8,10 +9,9 @@ def test_health_mapping_ok_to_200():
return {"status": "ok"} return {"status": "ok"}
async def scenario() -> None: async def scenario() -> None:
server = HealthServer( server = ManagementServer(
host="127.0.0.1", host="127.0.0.1",
port=8000, port=8000,
path="/health",
timeout=0.2, timeout=0.2,
health_provider=provider, health_provider=provider,
) )
@@ -27,10 +27,9 @@ def test_health_mapping_unhealthy_to_503():
return {"status": "unhealthy", "detail": "worker failed"} return {"status": "unhealthy", "detail": "worker failed"}
async def scenario() -> None: async def scenario() -> None:
server = HealthServer( server = ManagementServer(
host="127.0.0.1", host="127.0.0.1",
port=8000, port=8000,
path="/health",
timeout=0.2, timeout=0.2,
health_provider=provider, health_provider=provider,
) )
@@ -39,3 +38,63 @@ def test_health_mapping_unhealthy_to_503():
assert payload["status"] == "unhealthy" assert payload["status"] == "unhealthy"
asyncio.run(scenario()) asyncio.run(scenario())
def test_action_routes_call_callbacks():
events: list[str] = []
async def provider():
return {"status": "ok"}
async def on_start() -> str:
events.append("start")
return "start accepted"
async def on_stop() -> str:
events.append("stop")
return "stop accepted"
async def request(port: int, path: str) -> tuple[int, dict[str, str]]:
reader, writer = await asyncio.open_connection("127.0.0.1", port)
writer.write(
f"GET {path} HTTP/1.1\r\nHost: 127.0.0.1\r\nConnection: close\r\n\r\n".encode("utf-8")
)
await writer.drain()
raw = await reader.read()
writer.close()
await writer.wait_closed()
header, body = raw.split(b"\r\n\r\n", maxsplit=1)
status_code = int(header.split(b" ")[1])
payload = json.loads(body.decode("utf-8"))
return status_code, payload
async def scenario() -> None:
server = ManagementServer(
host="127.0.0.1",
port=0,
timeout=0.2,
health_provider=provider,
on_start=on_start,
on_stop=on_stop,
)
await server.start()
try:
port = server.port
assert port > 0
start_code, start_payload = await request(port, "/actions/start")
stop_code, stop_payload = await request(port, "/actions/stop")
finally:
await server.stop()
assert start_code == 200
assert start_payload["status"] == "ok"
assert start_payload["detail"] == "start accepted"
assert stop_code == 200
assert stop_payload["status"] == "ok"
assert stop_payload["detail"] == "stop accepted"
assert events == ["start", "stop"]
asyncio.run(scenario())