Compare commits
4 Commits
feature/he
...
2d6179d366
| Author | SHA1 | Date | |
|---|---|---|---|
| 2d6179d366 | |||
| d888ae7acb | |||
| 1d71ce406f | |||
| 8da6df0b2a |
@@ -13,6 +13,8 @@ readme = "README.md"
|
|||||||
requires-python = ">=3.8"
|
requires-python = ">=3.8"
|
||||||
dependencies = [
|
dependencies = [
|
||||||
"PyYAML>=6.0",
|
"PyYAML>=6.0",
|
||||||
|
"fastapi>=0.100.0",
|
||||||
|
"uvicorn[standard]>=0.22.0",
|
||||||
]
|
]
|
||||||
|
|
||||||
[project.urls]
|
[project.urls]
|
||||||
|
|||||||
@@ -1,3 +1,3 @@
|
|||||||
from .v2.manager import ConfigManagerV2 as ConfigManager
|
from .v2 import ConfigManagerV2 as ConfigManager
|
||||||
from .v1.cfg_manager import ConfigManager as LegacyConfigManager
|
from .v1.cfg_manager import ConfigManager as LegacyConfigManager
|
||||||
from .v1.log_manager import LogManager
|
from .v1.log_manager import LogManager
|
||||||
@@ -1,4 +1,7 @@
|
|||||||
from .manager import ConfigManagerV2
|
"""Публичный API V2: точка входа в менеджер конфигурации и настройки management-сервера.
|
||||||
from .types import HealthServerSettings
|
|
||||||
|
|
||||||
__all__ = ["ConfigManagerV2", "HealthServerSettings"]
|
Экспортирует ConfigManagerV2 и типы настроек для использования приложениями."""
|
||||||
|
from .core import ConfigManagerV2
|
||||||
|
from .types import HealthServerSettings, ManagementServerSettings
|
||||||
|
|
||||||
|
__all__ = ["ConfigManagerV2", "ManagementServerSettings", "HealthServerSettings"]
|
||||||
|
|||||||
@@ -1,52 +0,0 @@
|
|||||||
from __future__ import annotations
|
|
||||||
|
|
||||||
import asyncio
|
|
||||||
import hashlib
|
|
||||||
import json
|
|
||||||
import os
|
|
||||||
from typing import Any, Optional
|
|
||||||
|
|
||||||
import yaml
|
|
||||||
|
|
||||||
|
|
||||||
class ConfigLoader:
|
|
||||||
def __init__(self, path: str):
|
|
||||||
"""Initialize loader state for a specific config file path."""
|
|
||||||
self.path = path
|
|
||||||
self.config: Any = None
|
|
||||||
self.last_valid_config: Any = None
|
|
||||||
self._last_seen_hash: Optional[str] = None
|
|
||||||
|
|
||||||
def _read_file_sync(self) -> str:
|
|
||||||
"""Read raw config text from disk synchronously."""
|
|
||||||
with open(self.path, "r", encoding="utf-8") as fh:
|
|
||||||
return fh.read()
|
|
||||||
|
|
||||||
async def read_file_async(self) -> str:
|
|
||||||
"""Read raw config text from disk in a worker thread."""
|
|
||||||
return await asyncio.to_thread(self._read_file_sync)
|
|
||||||
|
|
||||||
def parse_config(self, data: str) -> Any:
|
|
||||||
"""Parse config text as YAML or JSON based on file extension."""
|
|
||||||
extension = os.path.splitext(self.path)[1].lower()
|
|
||||||
if extension in (".yaml", ".yml"):
|
|
||||||
return yaml.safe_load(data)
|
|
||||||
return json.loads(data)
|
|
||||||
|
|
||||||
@staticmethod
|
|
||||||
def _calculate_hash(data: str) -> str:
|
|
||||||
"""Calculate a stable content hash for change detection."""
|
|
||||||
return hashlib.sha256(data.encode("utf-8")).hexdigest()
|
|
||||||
|
|
||||||
async def load_if_changed(self) -> tuple[bool, Any]:
|
|
||||||
"""Load and parse config only when file content changed."""
|
|
||||||
raw_data = await self.read_file_async()
|
|
||||||
current_hash = self._calculate_hash(raw_data)
|
|
||||||
if current_hash == self._last_seen_hash:
|
|
||||||
return False, self.config
|
|
||||||
|
|
||||||
self._last_seen_hash = current_hash
|
|
||||||
parsed = self.parse_config(raw_data)
|
|
||||||
self.config = parsed
|
|
||||||
self.last_valid_config = parsed
|
|
||||||
return True, parsed
|
|
||||||
@@ -1,3 +1,6 @@
|
|||||||
|
"""Каналы внешнего управления: абстракция и реализация (например, Telegram).
|
||||||
|
|
||||||
|
Позволяет запускать, останавливать и запрашивать статус менеджера через ботов и другие интерфейсы."""
|
||||||
from .base import ControlChannel
|
from .base import ControlChannel
|
||||||
from .telegram import TelegramControlChannel
|
from .telegram import TelegramControlChannel
|
||||||
|
|
||||||
|
|||||||
@@ -1,3 +1,6 @@
|
|||||||
|
"""Базовый абстрактный канал управления и типы обработчиков команд.
|
||||||
|
|
||||||
|
Определяет контракт: старт/стоп канала и привязка обработчиков start/stop/status."""
|
||||||
from __future__ import annotations
|
from __future__ import annotations
|
||||||
|
|
||||||
from abc import ABC, abstractmethod
|
from abc import ABC, abstractmethod
|
||||||
@@ -12,10 +15,10 @@ StatusHandler = Callable[[], Awaitable[str]]
|
|||||||
class ControlChannel(ABC):
|
class ControlChannel(ABC):
|
||||||
@abstractmethod
|
@abstractmethod
|
||||||
async def start(self, on_start: StartHandler, on_stop: StopHandler, on_status: StatusHandler) -> None:
|
async def start(self, on_start: StartHandler, on_stop: StopHandler, on_status: StatusHandler) -> None:
|
||||||
"""Start channel and bind command handlers."""
|
"""Запустить канал и привязать обработчики команд."""
|
||||||
raise NotImplementedError
|
raise NotImplementedError
|
||||||
|
|
||||||
@abstractmethod
|
@abstractmethod
|
||||||
async def stop(self) -> None:
|
async def stop(self) -> None:
|
||||||
"""Stop channel and release its resources."""
|
"""Остановить канал и освободить его ресурсы."""
|
||||||
raise NotImplementedError
|
raise NotImplementedError
|
||||||
|
|||||||
@@ -1,3 +1,6 @@
|
|||||||
|
"""Реализация канала управления через Telegram Bot API (long polling).
|
||||||
|
|
||||||
|
Принимает команды /start, /stop, /status в указанном чате и вызывает привязанные обработчики."""
|
||||||
from __future__ import annotations
|
from __future__ import annotations
|
||||||
|
|
||||||
import asyncio
|
import asyncio
|
||||||
@@ -18,7 +21,7 @@ class TelegramControlChannel(ControlChannel):
|
|||||||
poll_interval: float = 2.0,
|
poll_interval: float = 2.0,
|
||||||
logger: Optional[logging.Logger] = None,
|
logger: Optional[logging.Logger] = None,
|
||||||
):
|
):
|
||||||
"""Initialize Telegram polling channel with bot and chat settings."""
|
"""Инициализация канала опроса Telegram с настройками бота и чата."""
|
||||||
self._token = token
|
self._token = token
|
||||||
self._chat_id = chat_id
|
self._chat_id = chat_id
|
||||||
self._poll_interval = poll_interval
|
self._poll_interval = poll_interval
|
||||||
@@ -31,7 +34,7 @@ class TelegramControlChannel(ControlChannel):
|
|||||||
self._logger = logger or logging.getLogger(__name__)
|
self._logger = logger or logging.getLogger(__name__)
|
||||||
|
|
||||||
async def start(self, on_start: StartHandler, on_stop: StopHandler, on_status: StatusHandler) -> None:
|
async def start(self, on_start: StartHandler, on_stop: StopHandler, on_status: StatusHandler) -> None:
|
||||||
"""Start polling Telegram updates and register command callbacks."""
|
"""Запустить опрос обновлений Telegram и зарегистрировать колбэки команд."""
|
||||||
if self._task is not None and not self._task.done():
|
if self._task is not None and not self._task.done():
|
||||||
return
|
return
|
||||||
self._on_start = on_start
|
self._on_start = on_start
|
||||||
@@ -41,7 +44,7 @@ class TelegramControlChannel(ControlChannel):
|
|||||||
self._task = asyncio.create_task(self._poll_loop())
|
self._task = asyncio.create_task(self._poll_loop())
|
||||||
|
|
||||||
async def stop(self) -> None:
|
async def stop(self) -> None:
|
||||||
"""Stop polling loop and wait until task termination."""
|
"""Остановить цикл опроса и дождаться завершения задачи."""
|
||||||
self._stop_event.set()
|
self._stop_event.set()
|
||||||
if self._task is not None:
|
if self._task is not None:
|
||||||
self._task.cancel()
|
self._task.cancel()
|
||||||
@@ -52,7 +55,7 @@ class TelegramControlChannel(ControlChannel):
|
|||||||
self._task = None
|
self._task = None
|
||||||
|
|
||||||
async def _poll_loop(self) -> None:
|
async def _poll_loop(self) -> None:
|
||||||
"""Continuously fetch updates and dispatch supported commands."""
|
"""Непрерывно получать обновления и вызывать поддерживаемые команды."""
|
||||||
while not self._stop_event.is_set():
|
while not self._stop_event.is_set():
|
||||||
try:
|
try:
|
||||||
updates = await asyncio.to_thread(self._fetch_updates)
|
updates = await asyncio.to_thread(self._fetch_updates)
|
||||||
@@ -67,7 +70,7 @@ class TelegramControlChannel(ControlChannel):
|
|||||||
continue
|
continue
|
||||||
|
|
||||||
def _fetch_updates(self) -> list[dict]:
|
def _fetch_updates(self) -> list[dict]:
|
||||||
"""Pull new Telegram updates using the latest offset."""
|
"""Запросить новые обновления Telegram с учётом последнего offset."""
|
||||||
params = {"timeout": 0}
|
params = {"timeout": 0}
|
||||||
if self._offset is not None:
|
if self._offset is not None:
|
||||||
params["offset"] = self._offset
|
params["offset"] = self._offset
|
||||||
@@ -82,7 +85,7 @@ class TelegramControlChannel(ControlChannel):
|
|||||||
return result
|
return result
|
||||||
|
|
||||||
async def _process_update(self, update: dict) -> None:
|
async def _process_update(self, update: dict) -> None:
|
||||||
"""Handle one Telegram update and execute mapped command."""
|
"""Обработать одно обновление Telegram и выполнить соответствующую команду."""
|
||||||
message = update.get("message") or {}
|
message = update.get("message") or {}
|
||||||
text = (message.get("text") or "").strip().lower()
|
text = (message.get("text") or "").strip().lower()
|
||||||
chat = message.get("chat") or {}
|
chat = message.get("chat") or {}
|
||||||
@@ -103,7 +106,7 @@ class TelegramControlChannel(ControlChannel):
|
|||||||
await asyncio.to_thread(self._send_message, reply)
|
await asyncio.to_thread(self._send_message, reply)
|
||||||
|
|
||||||
def _send_message(self, text: str) -> None:
|
def _send_message(self, text: str) -> None:
|
||||||
"""Send plain-text reply to the configured Telegram chat."""
|
"""Отправить текстовый ответ в настроенный чат Telegram."""
|
||||||
encoded = urllib.parse.urlencode({"chat_id": self._chat_id, "text": text})
|
encoded = urllib.parse.urlencode({"chat_id": self._chat_id, "text": text})
|
||||||
url = f"https://api.telegram.org/bot{self._token}/sendMessage"
|
url = f"https://api.telegram.org/bot{self._token}/sendMessage"
|
||||||
req = urllib.request.Request(url, data=encoded.encode("utf-8"), method="POST")
|
req = urllib.request.Request(url, data=encoded.encode("utf-8"), method="POST")
|
||||||
|
|||||||
8
src/config_manager/v2/core/__init__.py
Normal file
8
src/config_manager/v2/core/__init__.py
Normal file
@@ -0,0 +1,8 @@
|
|||||||
|
"""Ядро V2: жизненный цикл менеджера, загрузка конфига и цикл воркера.
|
||||||
|
|
||||||
|
Содержит ConfigManagerV2, загрузчик конфигурации и планировщик повторяющегося выполнения execute()."""
|
||||||
|
from .config_loader import ConfigLoader
|
||||||
|
from .manager import ConfigManagerV2
|
||||||
|
from .scheduler import WorkerLoop
|
||||||
|
|
||||||
|
__all__ = ["ConfigLoader", "ConfigManagerV2", "WorkerLoop"]
|
||||||
70
src/config_manager/v2/core/config_loader.py
Normal file
70
src/config_manager/v2/core/config_loader.py
Normal file
@@ -0,0 +1,70 @@
|
|||||||
|
"""Загрузчик конфигурации из файла (YAML/JSON) с обнаружением изменений по хешу.
|
||||||
|
|
||||||
|
Читает файл синхронно и асинхронно, парсит по расширению и возвращает последний валидный конфиг при ошибках."""
|
||||||
|
from __future__ import annotations
|
||||||
|
|
||||||
|
import asyncio
|
||||||
|
import hashlib
|
||||||
|
import json
|
||||||
|
import os
|
||||||
|
from typing import Any, Optional
|
||||||
|
|
||||||
|
import yaml
|
||||||
|
|
||||||
|
|
||||||
|
class ConfigLoader:
|
||||||
|
def __init__(self, path: str):
|
||||||
|
"""Инициализация состояния загрузчика для указанного пути к файлу конфига."""
|
||||||
|
self.path = path
|
||||||
|
self.config: Any = None
|
||||||
|
self.last_valid_config: Any = None
|
||||||
|
self._last_seen_hash: Optional[str] = None
|
||||||
|
|
||||||
|
def _read_file_sync(self) -> str:
|
||||||
|
"""Синхронно прочитать сырой текст конфига с диска."""
|
||||||
|
with open(self.path, "r", encoding="utf-8") as fh:
|
||||||
|
return fh.read()
|
||||||
|
|
||||||
|
async def read_file_async(self) -> str:
|
||||||
|
"""Прочитать сырой текст конфига с диска в рабочем потоке."""
|
||||||
|
return await asyncio.to_thread(self._read_file_sync)
|
||||||
|
|
||||||
|
def parse_config(self, data: str) -> Any:
|
||||||
|
"""Распарсить текст конфига как YAML или JSON по расширению файла."""
|
||||||
|
extension = os.path.splitext(self.path)[1].lower()
|
||||||
|
if extension in (".yaml", ".yml"):
|
||||||
|
return yaml.safe_load(data)
|
||||||
|
return json.loads(data)
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
def _calculate_hash(data: str) -> str:
|
||||||
|
"""Вычислить устойчивый хеш содержимого для обнаружения изменений."""
|
||||||
|
return hashlib.sha256(data.encode("utf-8")).hexdigest()
|
||||||
|
|
||||||
|
async def load_if_changed(self) -> tuple[bool, Any]:
|
||||||
|
"""Загрузить и распарсить конфиг только при изменении содержимого файла."""
|
||||||
|
raw_data = await self.read_file_async()
|
||||||
|
current_hash = self._calculate_hash(raw_data)
|
||||||
|
if current_hash == self._last_seen_hash:
|
||||||
|
return False, self.config
|
||||||
|
|
||||||
|
self._last_seen_hash = current_hash
|
||||||
|
parsed = self.parse_config(raw_data)
|
||||||
|
self.config = parsed
|
||||||
|
self.last_valid_config = parsed
|
||||||
|
return True, parsed
|
||||||
|
|
||||||
|
|
||||||
|
def extract_scheduler_intervals(
|
||||||
|
config: Any,
|
||||||
|
default_update: float,
|
||||||
|
default_work: float,
|
||||||
|
) -> tuple[float, float]:
|
||||||
|
"""Извлекает update_interval и work_interval из конфига (dict). Возвращает значения по умолчанию при ошибке."""
|
||||||
|
if not isinstance(config, dict):
|
||||||
|
return default_update, default_work
|
||||||
|
upd = config.get("update_interval")
|
||||||
|
wrk = config.get("work_interval")
|
||||||
|
u = float(upd) if isinstance(upd, (int, float)) and upd > 0 else default_update
|
||||||
|
w = float(wrk) if isinstance(wrk, (int, float)) and wrk > 0 else default_work
|
||||||
|
return u, w
|
||||||
233
src/config_manager/v2/core/manager.py
Normal file
233
src/config_manager/v2/core/manager.py
Normal file
@@ -0,0 +1,233 @@
|
|||||||
|
"""Главный класс менеджера V2: оркестрация жизненного цикла, конфига, API и каналов управления.
|
||||||
|
|
||||||
|
Запускает воркер и периодическое обновление конфига, поднимает management-сервер и control-канал при наличии настроек."""
|
||||||
|
from __future__ import annotations
|
||||||
|
|
||||||
|
import asyncio
|
||||||
|
import logging
|
||||||
|
import time
|
||||||
|
from typing import Any, Optional
|
||||||
|
|
||||||
|
from ...v1.log_manager import LogManager
|
||||||
|
from ..control.base import ControlChannel
|
||||||
|
from ..management import (
|
||||||
|
ControlChannelBridge,
|
||||||
|
HealthAggregator,
|
||||||
|
ManagementApiBridge,
|
||||||
|
ManagementServer,
|
||||||
|
)
|
||||||
|
from ..types import HealthPayload, LifecycleState, ManagementServerSettings
|
||||||
|
from .config_loader import ConfigLoader, extract_scheduler_intervals
|
||||||
|
from .scheduler import WorkerLoop
|
||||||
|
|
||||||
|
|
||||||
|
class ConfigManagerV2:
|
||||||
|
DEFAULT_UPDATE_INTERVAL = 5.0
|
||||||
|
DEFAULT_WORK_INTERVAL = 2.0
|
||||||
|
|
||||||
|
def __init__(
|
||||||
|
self,
|
||||||
|
path: str,
|
||||||
|
log_manager: Optional[LogManager] = None,
|
||||||
|
management_settings: Optional[ManagementServerSettings] = None,
|
||||||
|
control_channel: Optional[ControlChannel] = None,
|
||||||
|
):
|
||||||
|
"""Инициализация подсистем менеджера и состояния рантайма."""
|
||||||
|
self.path = path
|
||||||
|
self.config: Any = None
|
||||||
|
self.update_interval = self.DEFAULT_UPDATE_INTERVAL
|
||||||
|
self.work_interval = self.DEFAULT_WORK_INTERVAL
|
||||||
|
|
||||||
|
self._loader = ConfigLoader(path)
|
||||||
|
self._log_manager = log_manager or LogManager()
|
||||||
|
self._control_channel = control_channel
|
||||||
|
|
||||||
|
self._halt = asyncio.Event()
|
||||||
|
self._task: Optional[asyncio.Task] = None
|
||||||
|
self._loop: Optional[asyncio.AbstractEventLoop] = None
|
||||||
|
|
||||||
|
self._state = LifecycleState.IDLE
|
||||||
|
self._last_execute_error: Optional[str] = None
|
||||||
|
self._last_success_timestamp: Optional[float] = None
|
||||||
|
|
||||||
|
self._management_settings = management_settings or ManagementServerSettings(enabled=True)
|
||||||
|
self._health_timeout = self._management_settings.health_timeout
|
||||||
|
self._health_aggregator = HealthAggregator(
|
||||||
|
get_state=lambda: self._state,
|
||||||
|
get_last_error=lambda: self._last_execute_error,
|
||||||
|
get_last_success_timestamp=lambda: self._last_success_timestamp,
|
||||||
|
health_timeout=self._health_timeout,
|
||||||
|
get_app_health=self.get_health_status,
|
||||||
|
)
|
||||||
|
self._api_bridge = ManagementApiBridge(start_fn=self.start, stop_fn=self.stop)
|
||||||
|
self._control_bridge = ControlChannelBridge(
|
||||||
|
halt=self._halt,
|
||||||
|
get_state=lambda: self._state,
|
||||||
|
get_status=self._status_text,
|
||||||
|
)
|
||||||
|
self._management_server: Optional[ManagementServer] = None
|
||||||
|
if self._management_settings.enabled:
|
||||||
|
self._management_server = ManagementServer(
|
||||||
|
host=self._management_settings.host,
|
||||||
|
port=self._management_settings.port,
|
||||||
|
timeout=self._management_settings.timeout,
|
||||||
|
health_provider=self._health_aggregator.collect,
|
||||||
|
on_start=self._api_bridge.on_start,
|
||||||
|
on_stop=self._api_bridge.on_stop,
|
||||||
|
)
|
||||||
|
|
||||||
|
self.logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
def _apply_config(self, new_config: Any) -> None:
|
||||||
|
"""Применить загруженный конфиг: интервалы и log_manager. Вызывается после load_if_changed."""
|
||||||
|
self.config = new_config
|
||||||
|
self.update_interval, self.work_interval = extract_scheduler_intervals(
|
||||||
|
new_config,
|
||||||
|
self.DEFAULT_UPDATE_INTERVAL,
|
||||||
|
self.DEFAULT_WORK_INTERVAL,
|
||||||
|
)
|
||||||
|
if isinstance(new_config, dict):
|
||||||
|
self._log_manager.apply_config(new_config)
|
||||||
|
|
||||||
|
async def _update_config(self) -> None:
|
||||||
|
"""Перезагрузить конфиг при изменении файла и применить к состоянию и log_manager."""
|
||||||
|
try:
|
||||||
|
changed, new_config = await self._loader.load_if_changed()
|
||||||
|
if not changed:
|
||||||
|
return
|
||||||
|
self._apply_config(new_config)
|
||||||
|
except Exception as exc: # noqa: BLE001
|
||||||
|
self.logger.error("Error reading/parsing config file: %s", exc)
|
||||||
|
if self._loader.last_valid_config is not None:
|
||||||
|
self._apply_config(self._loader.last_valid_config)
|
||||||
|
|
||||||
|
def execute(self) -> None:
|
||||||
|
"""Переопределить в подклассе для реализации одной единицы блокирующей работы."""
|
||||||
|
|
||||||
|
def get_health_status(self) -> HealthPayload:
|
||||||
|
"""Вернуть payload здоровья приложения для /health.
|
||||||
|
|
||||||
|
Варианты ответа по статусу:
|
||||||
|
- ``{"status": "ok"}`` — сервис в норме; GET /health → 200.
|
||||||
|
- ``{"status": "degraded", "detail": "..."}`` — работает с ограничениями; GET /health → 503.
|
||||||
|
- ``{"status": "unhealthy", "detail": "..."}`` — неработоспособен; GET /health → 503.
|
||||||
|
|
||||||
|
Поле ``detail`` опционально; для ``ok`` обычно не задаётся.
|
||||||
|
Переопределить в подклассе для своей логики здоровья."""
|
||||||
|
return {"status": "ok"}
|
||||||
|
|
||||||
|
def _on_execute_success(self) -> None:
|
||||||
|
"""Обновить время последнего успешного execute() и сбросить маркер ошибки."""
|
||||||
|
self._last_success_timestamp = time.monotonic()
|
||||||
|
self._last_execute_error = None
|
||||||
|
|
||||||
|
def _on_execute_error(self, exc: Exception) -> None:
|
||||||
|
"""Сохранить и залогировать детали ошибки выполнения для отчёта здоровья."""
|
||||||
|
self._last_execute_error = str(exc)
|
||||||
|
self.logger.error("Execution error: %s", exc)
|
||||||
|
|
||||||
|
async def _worker_loop(self) -> None:
|
||||||
|
"""Вызывать execute() циклически до запроса остановки."""
|
||||||
|
worker = WorkerLoop(
|
||||||
|
execute=self.execute,
|
||||||
|
get_interval=lambda: self.work_interval,
|
||||||
|
halt_event=self._halt,
|
||||||
|
on_error=self._on_execute_error,
|
||||||
|
on_success=self._on_execute_success,
|
||||||
|
)
|
||||||
|
await worker.run()
|
||||||
|
|
||||||
|
async def _periodic_update_loop(self) -> None:
|
||||||
|
"""Периодически проверять файл конфига на обновления до остановки."""
|
||||||
|
while not self._halt.is_set():
|
||||||
|
await self._update_config()
|
||||||
|
try:
|
||||||
|
await asyncio.wait_for(self._halt.wait(), timeout=max(self.update_interval, 0.05))
|
||||||
|
except asyncio.TimeoutError:
|
||||||
|
continue
|
||||||
|
|
||||||
|
async def _status_text(self) -> str:
|
||||||
|
"""Сформировать читаемый статус рантайма для каналов управления."""
|
||||||
|
health = await self._health_aggregator.collect()
|
||||||
|
detail = health.get("detail")
|
||||||
|
if detail:
|
||||||
|
return f"state={self._state.value}; health={health['status']}; detail={detail}"
|
||||||
|
return f"state={self._state.value}; health={health['status']}"
|
||||||
|
|
||||||
|
async def _start_control_channel(self) -> None:
|
||||||
|
"""Запустить настроенный канал управления с привязанными обработчиками команд."""
|
||||||
|
if self._control_channel is None:
|
||||||
|
return
|
||||||
|
await self._control_channel.start(
|
||||||
|
self._control_bridge.on_start,
|
||||||
|
self._control_bridge.on_stop,
|
||||||
|
self._control_bridge.on_status,
|
||||||
|
)
|
||||||
|
|
||||||
|
async def _stop_control_channel(self) -> None:
|
||||||
|
"""Остановить настроенный канал управления, если он активен."""
|
||||||
|
if self._control_channel is None:
|
||||||
|
return
|
||||||
|
await self._control_channel.stop()
|
||||||
|
|
||||||
|
async def _run(self) -> None:
|
||||||
|
"""Запустить жизненный цикл менеджера и координировать фоновые задачи."""
|
||||||
|
self._state = LifecycleState.STARTING
|
||||||
|
self._halt.clear()
|
||||||
|
await self._update_config()
|
||||||
|
|
||||||
|
if self._management_server is not None:
|
||||||
|
await self._management_server.start()
|
||||||
|
await self._start_control_channel()
|
||||||
|
|
||||||
|
self._state = LifecycleState.RUNNING
|
||||||
|
self.logger.info("ConfigManagerV2 started")
|
||||||
|
|
||||||
|
tasks = [
|
||||||
|
asyncio.create_task(self._worker_loop(), name="v2-worker-loop"),
|
||||||
|
asyncio.create_task(self._periodic_update_loop(), name="v2-config-loop"),
|
||||||
|
]
|
||||||
|
|
||||||
|
try:
|
||||||
|
await asyncio.gather(*tasks)
|
||||||
|
except asyncio.CancelledError:
|
||||||
|
raise
|
||||||
|
finally:
|
||||||
|
self._state = LifecycleState.STOPPING
|
||||||
|
self._halt.set()
|
||||||
|
for task in tasks:
|
||||||
|
task.cancel()
|
||||||
|
await asyncio.gather(*tasks, return_exceptions=True)
|
||||||
|
# Management-сервер и control channel не останавливаем: API и канал управления остаются доступными.
|
||||||
|
self._state = LifecycleState.STOPPED
|
||||||
|
self._task = None
|
||||||
|
self.logger.info("ConfigManagerV2 stopped (API and control channel remain available)")
|
||||||
|
|
||||||
|
async def start(self) -> None:
|
||||||
|
"""Запустить циклы execute и конфига в фоне; возвращает управление сразу (ответ на /actions/start приходит без ожидания)."""
|
||||||
|
if self._task is not None and not self._task.done():
|
||||||
|
self.logger.warning("ConfigManagerV2 is already running")
|
||||||
|
return
|
||||||
|
|
||||||
|
try:
|
||||||
|
self._loop = asyncio.get_running_loop()
|
||||||
|
except RuntimeError:
|
||||||
|
self.logger.error("start() must be called from within an async context")
|
||||||
|
raise
|
||||||
|
|
||||||
|
self._task = asyncio.create_task(self._run(), name="config-manager-v2")
|
||||||
|
|
||||||
|
async def stop(self) -> None:
|
||||||
|
"""Запросить плавную остановку и дождаться завершения менеджера."""
|
||||||
|
if self._task is None:
|
||||||
|
self.logger.warning("ConfigManagerV2 is not running")
|
||||||
|
return
|
||||||
|
|
||||||
|
self._halt.set()
|
||||||
|
if asyncio.current_task() is self._task:
|
||||||
|
return
|
||||||
|
|
||||||
|
try:
|
||||||
|
await self._task
|
||||||
|
except asyncio.CancelledError:
|
||||||
|
pass
|
||||||
@@ -1,3 +1,6 @@
|
|||||||
|
"""Цикл воркера: повторяющийся вызов блокирующего execute() в потоке с паузой между итерациями.
|
||||||
|
|
||||||
|
Поддерживает halt-событие для остановки, колбэки on_success/on_error для учёта ошибок и здоровья."""
|
||||||
from __future__ import annotations
|
from __future__ import annotations
|
||||||
|
|
||||||
import asyncio
|
import asyncio
|
||||||
@@ -14,7 +17,7 @@ class WorkerLoop:
|
|||||||
on_error: Optional[Callable[[Exception], None]] = None,
|
on_error: Optional[Callable[[Exception], None]] = None,
|
||||||
on_success: Optional[Callable[[], None]] = None,
|
on_success: Optional[Callable[[], None]] = None,
|
||||||
):
|
):
|
||||||
"""Store callbacks and synchronization primitives for worker execution."""
|
"""Сохранить колбэки и примитивы синхронизации для выполнения воркера."""
|
||||||
self._execute = execute
|
self._execute = execute
|
||||||
self._get_interval = get_interval
|
self._get_interval = get_interval
|
||||||
self._halt_event = halt_event
|
self._halt_event = halt_event
|
||||||
@@ -22,7 +25,7 @@ class WorkerLoop:
|
|||||||
self._on_success = on_success
|
self._on_success = on_success
|
||||||
|
|
||||||
async def run(self) -> None:
|
async def run(self) -> None:
|
||||||
"""Run execute repeatedly until halt is requested."""
|
"""Вызывать execute циклически до запроса остановки."""
|
||||||
while not self._halt_event.is_set():
|
while not self._halt_event.is_set():
|
||||||
try:
|
try:
|
||||||
await asyncio.to_thread(self._execute)
|
await asyncio.to_thread(self._execute)
|
||||||
@@ -1,80 +0,0 @@
|
|||||||
from __future__ import annotations
|
|
||||||
|
|
||||||
import asyncio
|
|
||||||
import json
|
|
||||||
from collections.abc import Awaitable, Callable
|
|
||||||
from typing import Optional
|
|
||||||
|
|
||||||
from .types import HealthPayload
|
|
||||||
|
|
||||||
|
|
||||||
class HealthServer:
|
|
||||||
def __init__(
|
|
||||||
self,
|
|
||||||
host: str,
|
|
||||||
port: int,
|
|
||||||
path: str,
|
|
||||||
timeout: float,
|
|
||||||
health_provider: Callable[[], Awaitable[HealthPayload]],
|
|
||||||
):
|
|
||||||
"""Configure lightweight HTTP health server parameters and callback."""
|
|
||||||
self._host = host
|
|
||||||
self._port = port
|
|
||||||
self._path = path
|
|
||||||
self._timeout = timeout
|
|
||||||
self._health_provider = health_provider
|
|
||||||
self._server: Optional[asyncio.base_events.Server] = None
|
|
||||||
|
|
||||||
async def start(self) -> None:
|
|
||||||
"""Start listening for healthcheck requests if not running."""
|
|
||||||
if self._server is not None:
|
|
||||||
return
|
|
||||||
self._server = await asyncio.start_server(self._handle_connection, self._host, self._port)
|
|
||||||
|
|
||||||
async def stop(self) -> None:
|
|
||||||
"""Stop the health server and release the listening socket."""
|
|
||||||
if self._server is None:
|
|
||||||
return
|
|
||||||
self._server.close()
|
|
||||||
await self._server.wait_closed()
|
|
||||||
self._server = None
|
|
||||||
|
|
||||||
async def _handle_connection(self, reader: asyncio.StreamReader, writer: asyncio.StreamWriter) -> None:
|
|
||||||
"""Process one HTTP connection and return JSON health payload."""
|
|
||||||
status_code = 404
|
|
||||||
payload: HealthPayload = {"status": "unhealthy", "detail": "not found"}
|
|
||||||
|
|
||||||
try:
|
|
||||||
request_line = await reader.readline()
|
|
||||||
parts = request_line.decode("utf-8", errors="ignore").strip().split(" ")
|
|
||||||
if len(parts) >= 2:
|
|
||||||
method, path = parts[0], parts[1]
|
|
||||||
if method == "GET" and path == self._path:
|
|
||||||
status_code, payload = await self._build_health_response()
|
|
||||||
except Exception: # noqa: BLE001
|
|
||||||
status_code = 500
|
|
||||||
payload = {"status": "unhealthy", "detail": "internal error"}
|
|
||||||
|
|
||||||
body = json.dumps(payload).encode("utf-8")
|
|
||||||
phrase = "OK" if status_code == 200 else "SERVICE UNAVAILABLE" if status_code == 503 else "NOT FOUND"
|
|
||||||
response = (
|
|
||||||
f"HTTP/1.1 {status_code} {phrase}\r\n"
|
|
||||||
"Content-Type: application/json\r\n"
|
|
||||||
f"Content-Length: {len(body)}\r\n"
|
|
||||||
"Connection: close\r\n"
|
|
||||||
"\r\n"
|
|
||||||
).encode("utf-8") + body
|
|
||||||
|
|
||||||
writer.write(response)
|
|
||||||
await writer.drain()
|
|
||||||
writer.close()
|
|
||||||
await writer.wait_closed()
|
|
||||||
|
|
||||||
async def _build_health_response(self) -> tuple[int, HealthPayload]:
|
|
||||||
"""Build HTTP status code and body from application health callback."""
|
|
||||||
try:
|
|
||||||
payload = await asyncio.wait_for(self._health_provider(), timeout=self._timeout)
|
|
||||||
status = payload.get("status", "unhealthy")
|
|
||||||
return (200, payload) if status == "ok" else (503, payload)
|
|
||||||
except Exception as exc: # noqa: BLE001
|
|
||||||
return 503, {"status": "unhealthy", "detail": str(exc)}
|
|
||||||
14
src/config_manager/v2/management/__init__.py
Normal file
14
src/config_manager/v2/management/__init__.py
Normal file
@@ -0,0 +1,14 @@
|
|||||||
|
"""Management API: HTTP-сервер для /health и /actions/start|stop, адаптеры и сбор здоровья.
|
||||||
|
|
||||||
|
Объединяет сервер, мосты к жизненному циклу и агрегатор здоровья для единого контракта API."""
|
||||||
|
from .bridges import ControlChannelBridge, ManagementApiBridge
|
||||||
|
from .health_aggregator import HealthAggregator
|
||||||
|
from .management_server import HealthServer, ManagementServer
|
||||||
|
|
||||||
|
__all__ = [
|
||||||
|
"ControlChannelBridge",
|
||||||
|
"HealthAggregator",
|
||||||
|
"HealthServer",
|
||||||
|
"ManagementApiBridge",
|
||||||
|
"ManagementServer",
|
||||||
|
]
|
||||||
61
src/config_manager/v2/management/bridges.py
Normal file
61
src/config_manager/v2/management/bridges.py
Normal file
@@ -0,0 +1,61 @@
|
|||||||
|
"""Адаптеры, связывающие жизненный цикл менеджера с HTTP API и каналами управления.
|
||||||
|
|
||||||
|
ManagementApiBridge отдаёт start/stop в HTTP; ControlChannelBridge — start/stop/status в Telegram и др."""
|
||||||
|
from __future__ import annotations
|
||||||
|
|
||||||
|
import asyncio
|
||||||
|
from collections.abc import Awaitable, Callable
|
||||||
|
|
||||||
|
from ..types import LifecycleState
|
||||||
|
|
||||||
|
|
||||||
|
class ManagementApiBridge:
|
||||||
|
"""Предоставляет start/stop жизненного цикла как async-колбэки для ManagementServer (/actions/start, /actions/stop)."""
|
||||||
|
|
||||||
|
def __init__(
|
||||||
|
self,
|
||||||
|
start_fn: Callable[[], Awaitable[None]],
|
||||||
|
stop_fn: Callable[[], Awaitable[None]],
|
||||||
|
):
|
||||||
|
self._start_fn = start_fn
|
||||||
|
self._stop_fn = stop_fn
|
||||||
|
|
||||||
|
async def on_start(self) -> str:
|
||||||
|
"""Выполнить start и вернуть сообщение для HTTP-ответа."""
|
||||||
|
await self._start_fn()
|
||||||
|
return "start completed"
|
||||||
|
|
||||||
|
async def on_stop(self) -> str:
|
||||||
|
"""Выполнить stop и вернуть сообщение для HTTP-ответа."""
|
||||||
|
await self._stop_fn()
|
||||||
|
return "stop completed"
|
||||||
|
|
||||||
|
|
||||||
|
class ControlChannelBridge:
|
||||||
|
"""Предоставляет halt и status как обработчики start/stop/status для ControlChannel (например Telegram)."""
|
||||||
|
|
||||||
|
def __init__(
|
||||||
|
self,
|
||||||
|
halt: asyncio.Event,
|
||||||
|
get_state: Callable[[], LifecycleState],
|
||||||
|
get_status: Callable[[], Awaitable[str]],
|
||||||
|
):
|
||||||
|
self._halt = halt
|
||||||
|
self._get_state = get_state
|
||||||
|
self._get_status = get_status
|
||||||
|
|
||||||
|
async def on_start(self) -> str:
|
||||||
|
"""Обработать внешний start: сбросить halt; идемпотентно при уже running."""
|
||||||
|
if self._get_state() == LifecycleState.RUNNING:
|
||||||
|
return "already running"
|
||||||
|
self._halt.clear()
|
||||||
|
return "start signal accepted"
|
||||||
|
|
||||||
|
async def on_stop(self) -> str:
|
||||||
|
"""Обработать внешний stop: установить halt."""
|
||||||
|
self._halt.set()
|
||||||
|
return "stop signal accepted"
|
||||||
|
|
||||||
|
async def on_status(self) -> str:
|
||||||
|
"""Вернуть текущий текст статуса."""
|
||||||
|
return await self._get_status()
|
||||||
53
src/config_manager/v2/management/health_aggregator.py
Normal file
53
src/config_manager/v2/management/health_aggregator.py
Normal file
@@ -0,0 +1,53 @@
|
|||||||
|
"""Собирает состояние жизненного цикла и здоровья в один ответ для /health.
|
||||||
|
|
||||||
|
Здоровье = был успешный execute() за последние health_timeout секунд; иначе unhealthy с деталью (ошибка или таймаут)."""
|
||||||
|
from __future__ import annotations
|
||||||
|
|
||||||
|
import time
|
||||||
|
from collections.abc import Callable
|
||||||
|
|
||||||
|
from ..types import HealthPayload, LifecycleState
|
||||||
|
|
||||||
|
|
||||||
|
class HealthAggregator:
|
||||||
|
"""Формирует ответ здоровья по времени последнего успешного execute() и таймауту."""
|
||||||
|
|
||||||
|
def __init__(
|
||||||
|
self,
|
||||||
|
get_state: Callable[[], LifecycleState],
|
||||||
|
get_last_error: Callable[[], str | None],
|
||||||
|
get_last_success_timestamp: Callable[[], float | None],
|
||||||
|
health_timeout: float,
|
||||||
|
get_app_health: Callable[[], HealthPayload],
|
||||||
|
):
|
||||||
|
self._get_state = get_state
|
||||||
|
self._get_last_error = get_last_error
|
||||||
|
self._get_last_success_timestamp = get_last_success_timestamp
|
||||||
|
self._health_timeout = health_timeout
|
||||||
|
self._get_app_health = get_app_health
|
||||||
|
|
||||||
|
async def collect(self) -> HealthPayload:
|
||||||
|
"""Вернуть ok, если был успешный execute() за последние health_timeout сек; иначе unhealthy. Всегда добавляем state."""
|
||||||
|
state = self._get_state()
|
||||||
|
state_value = state.value
|
||||||
|
|
||||||
|
# Только при state=RUNNING возможен status=ok; при остановке (STOPPING/STOPPED) сразу unhealthy.
|
||||||
|
if state != LifecycleState.RUNNING:
|
||||||
|
return {"status": "unhealthy", "detail": f"state={state_value}", "state": state_value}
|
||||||
|
|
||||||
|
last_success = self._get_last_success_timestamp()
|
||||||
|
now = time.monotonic()
|
||||||
|
|
||||||
|
if last_success is None:
|
||||||
|
detail = self._get_last_error() or "no successful run yet"
|
||||||
|
return {"status": "unhealthy", "detail": detail, "state": state_value}
|
||||||
|
|
||||||
|
if (now - last_success) > self._health_timeout:
|
||||||
|
detail = self._get_last_error() or f"no successful run within {self._health_timeout}s"
|
||||||
|
return {"status": "unhealthy", "detail": detail, "state": state_value}
|
||||||
|
|
||||||
|
result = self._get_app_health()
|
||||||
|
status = result.get("status", "unhealthy")
|
||||||
|
if status != "ok":
|
||||||
|
return {"status": "unhealthy", "detail": result.get("detail", "app reported non-ok"), "state": state_value}
|
||||||
|
return {**result, "state": state_value}
|
||||||
145
src/config_manager/v2/management/management_server.py
Normal file
145
src/config_manager/v2/management/management_server.py
Normal file
@@ -0,0 +1,145 @@
|
|||||||
|
"""Management HTTP API на FastAPI: эндпоинты /health, /actions/start, /actions/stop.
|
||||||
|
|
||||||
|
Единообразное описание маршрутов через декораторы FastAPI."""
|
||||||
|
from __future__ import annotations
|
||||||
|
|
||||||
|
import asyncio
|
||||||
|
import json
|
||||||
|
from collections.abc import Awaitable, Callable
|
||||||
|
from typing import Any, Optional
|
||||||
|
|
||||||
|
from fastapi import FastAPI
|
||||||
|
from fastapi.responses import JSONResponse
|
||||||
|
from uvicorn import Config, Server
|
||||||
|
|
||||||
|
from ..types import HealthPayload
|
||||||
|
|
||||||
|
# Захардкоженные эндпоинты management API.
|
||||||
|
PATH_HEALTH = "/health"
|
||||||
|
PATH_ACTION_START = "/actions/start"
|
||||||
|
PATH_ACTION_STOP = "/actions/stop"
|
||||||
|
|
||||||
|
|
||||||
|
class ManagementServer:
|
||||||
|
def __init__(
|
||||||
|
self,
|
||||||
|
host: str,
|
||||||
|
port: int,
|
||||||
|
timeout: float,
|
||||||
|
health_provider: Callable[[], Awaitable[HealthPayload]],
|
||||||
|
on_start: Optional[Callable[[], Awaitable[str]]] = None,
|
||||||
|
on_stop: Optional[Callable[[], Awaitable[str]]] = None,
|
||||||
|
):
|
||||||
|
"""Настройка параметров и колбэков лёгкого HTTP management-сервера."""
|
||||||
|
self._host = host
|
||||||
|
self._port = port
|
||||||
|
self._timeout = timeout
|
||||||
|
self._health_provider = health_provider
|
||||||
|
self._on_start = on_start
|
||||||
|
self._on_stop = on_stop
|
||||||
|
self._uvicorn_server: Optional[Server] = None
|
||||||
|
self._serve_task: Optional[asyncio.Task[None]] = None
|
||||||
|
self._bound_port: Optional[int] = None
|
||||||
|
self._app = self._create_app()
|
||||||
|
|
||||||
|
def _create_app(self) -> FastAPI:
|
||||||
|
app = FastAPI(title="Config Manager Management API")
|
||||||
|
|
||||||
|
@app.get(PATH_HEALTH)
|
||||||
|
async def health() -> JSONResponse:
|
||||||
|
return await self._health_response()
|
||||||
|
|
||||||
|
@app.get(PATH_ACTION_START)
|
||||||
|
@app.post(PATH_ACTION_START)
|
||||||
|
async def action_start() -> JSONResponse:
|
||||||
|
return await self._action_response("start", self._on_start)
|
||||||
|
|
||||||
|
@app.get(PATH_ACTION_STOP)
|
||||||
|
@app.post(PATH_ACTION_STOP)
|
||||||
|
async def action_stop() -> JSONResponse:
|
||||||
|
return await self._action_response("stop", self._on_stop)
|
||||||
|
|
||||||
|
return app
|
||||||
|
|
||||||
|
async def _health_response(self) -> JSONResponse:
|
||||||
|
"""Сформировать HTTP-ответ из колбэка здоровья приложения."""
|
||||||
|
try:
|
||||||
|
payload = await asyncio.wait_for(self._health_provider(), timeout=self._timeout)
|
||||||
|
status = payload.get("status", "unhealthy")
|
||||||
|
status_code = 200 if status == "ok" else 503
|
||||||
|
return JSONResponse(content=payload, status_code=status_code)
|
||||||
|
except Exception as exc: # noqa: BLE001
|
||||||
|
return JSONResponse(
|
||||||
|
content={"status": "unhealthy", "detail": str(exc)},
|
||||||
|
status_code=503,
|
||||||
|
)
|
||||||
|
|
||||||
|
async def _action_response(
|
||||||
|
self,
|
||||||
|
action: str,
|
||||||
|
callback: Optional[Callable[[], Awaitable[str]]],
|
||||||
|
) -> JSONResponse:
|
||||||
|
"""Сформировать HTTP-ответ для колбэка действия start/stop."""
|
||||||
|
if callback is None:
|
||||||
|
return JSONResponse(
|
||||||
|
content={"status": "error", "detail": f"{action} handler is not configured"},
|
||||||
|
status_code=404,
|
||||||
|
)
|
||||||
|
try:
|
||||||
|
detail = await callback()
|
||||||
|
if not detail:
|
||||||
|
detail = f"{action} action accepted"
|
||||||
|
return JSONResponse(content={"status": "ok", "detail": detail}, status_code=200)
|
||||||
|
except Exception as exc: # noqa: BLE001
|
||||||
|
return JSONResponse(
|
||||||
|
content={"status": "error", "detail": str(exc)},
|
||||||
|
status_code=500,
|
||||||
|
)
|
||||||
|
|
||||||
|
def _build_health_response(self) -> Awaitable[tuple[int, HealthPayload]]:
|
||||||
|
"""Для тестов: вернуть (status_code, payload) как раньше."""
|
||||||
|
async def _run() -> tuple[int, HealthPayload]:
|
||||||
|
response = await self._health_response()
|
||||||
|
body: Any = response.body
|
||||||
|
if isinstance(body, bytes):
|
||||||
|
body = json.loads(body.decode("utf-8"))
|
||||||
|
return response.status_code, body
|
||||||
|
return _run()
|
||||||
|
|
||||||
|
async def start(self) -> None:
|
||||||
|
"""Начать приём запросов к API здоровья и действий, если ещё не запущен."""
|
||||||
|
if self._serve_task is not None:
|
||||||
|
return
|
||||||
|
config = Config(
|
||||||
|
app=self._app,
|
||||||
|
host=self._host,
|
||||||
|
port=self._port,
|
||||||
|
log_level="warning",
|
||||||
|
)
|
||||||
|
self._uvicorn_server = Server(config)
|
||||||
|
self._serve_task = asyncio.create_task(self._uvicorn_server.serve())
|
||||||
|
await asyncio.sleep(0.05)
|
||||||
|
if self._uvicorn_server.servers:
|
||||||
|
sock = self._uvicorn_server.servers[0].sockets[0]
|
||||||
|
self._bound_port = sock.getsockname()[1]
|
||||||
|
else:
|
||||||
|
self._bound_port = self._port
|
||||||
|
|
||||||
|
async def stop(self) -> None:
|
||||||
|
"""Остановить management-сервер и освободить сокет."""
|
||||||
|
if self._uvicorn_server is None or self._serve_task is None:
|
||||||
|
return
|
||||||
|
self._uvicorn_server.should_exit = True
|
||||||
|
await self._serve_task
|
||||||
|
self._uvicorn_server = None
|
||||||
|
self._serve_task = None
|
||||||
|
self._bound_port = None
|
||||||
|
|
||||||
|
@property
|
||||||
|
def port(self) -> int:
|
||||||
|
"""Порт, на котором слушает сервер (после start); при port=0 — фактически выданный ОС."""
|
||||||
|
return self._bound_port if self._bound_port is not None else self._port
|
||||||
|
|
||||||
|
|
||||||
|
# Backward-compatible alias.
|
||||||
|
HealthServer = ManagementServer
|
||||||
@@ -1,256 +0,0 @@
|
|||||||
from __future__ import annotations
|
|
||||||
|
|
||||||
import asyncio
|
|
||||||
import logging
|
|
||||||
from collections.abc import Awaitable
|
|
||||||
from typing import Any, Optional
|
|
||||||
|
|
||||||
from ..v1.log_manager import LogManager
|
|
||||||
from .config_loader import ConfigLoader
|
|
||||||
from .control.base import ControlChannel
|
|
||||||
from .health import HealthServer
|
|
||||||
from .scheduler import WorkerLoop
|
|
||||||
from .types import HealthPayload, HealthServerSettings, LifecycleState
|
|
||||||
|
|
||||||
|
|
||||||
class ConfigManagerV2:
|
|
||||||
DEFAULT_UPDATE_INTERVAL = 5.0
|
|
||||||
DEFAULT_WORK_INTERVAL = 2.0
|
|
||||||
|
|
||||||
def __init__(
|
|
||||||
self,
|
|
||||||
path: str,
|
|
||||||
log_manager: Optional[LogManager] = None,
|
|
||||||
health_settings: Optional[HealthServerSettings] = None,
|
|
||||||
control_channel: Optional[ControlChannel] = None,
|
|
||||||
):
|
|
||||||
"""Initialize manager subsystems and runtime state."""
|
|
||||||
self.path = path
|
|
||||||
self.config: Any = None
|
|
||||||
self.update_interval = self.DEFAULT_UPDATE_INTERVAL
|
|
||||||
self.work_interval = self.DEFAULT_WORK_INTERVAL
|
|
||||||
|
|
||||||
self._loader = ConfigLoader(path)
|
|
||||||
self._log_manager = log_manager or LogManager()
|
|
||||||
self._control_channel = control_channel
|
|
||||||
|
|
||||||
self._halt = asyncio.Event()
|
|
||||||
self._task: Optional[asyncio.Task] = None
|
|
||||||
self._loop: Optional[asyncio.AbstractEventLoop] = None
|
|
||||||
|
|
||||||
self._state = LifecycleState.IDLE
|
|
||||||
self._last_execute_error: Optional[str] = None
|
|
||||||
|
|
||||||
self._health_settings = health_settings or HealthServerSettings(enabled=False)
|
|
||||||
self._health_server: Optional[HealthServer] = None
|
|
||||||
if self._health_settings.enabled:
|
|
||||||
self._health_server = HealthServer(
|
|
||||||
host=self._health_settings.host,
|
|
||||||
port=self._health_settings.port,
|
|
||||||
path=self._health_settings.path,
|
|
||||||
timeout=self._health_settings.timeout,
|
|
||||||
health_provider=self._collect_health,
|
|
||||||
)
|
|
||||||
|
|
||||||
self.logger = logging.getLogger(__name__)
|
|
||||||
|
|
||||||
def _read_file_sync(self) -> str:
|
|
||||||
"""Read config file synchronously via the shared loader."""
|
|
||||||
return self._loader._read_file_sync()
|
|
||||||
|
|
||||||
async def _read_file_async(self) -> str:
|
|
||||||
"""Read config file asynchronously via a worker thread."""
|
|
||||||
return await self._loader.read_file_async()
|
|
||||||
|
|
||||||
def _parse_config(self, data: str) -> Any:
|
|
||||||
"""Parse raw config text to a Python object."""
|
|
||||||
return self._loader.parse_config(data)
|
|
||||||
|
|
||||||
def _update_intervals_from_config(self) -> None:
|
|
||||||
"""Refresh work and update intervals from current config."""
|
|
||||||
if not isinstance(self.config, dict):
|
|
||||||
return
|
|
||||||
|
|
||||||
upd = self.config.get("update_interval")
|
|
||||||
wrk = self.config.get("work_interval")
|
|
||||||
|
|
||||||
if isinstance(upd, (int, float)) and upd > 0:
|
|
||||||
self.update_interval = float(upd)
|
|
||||||
else:
|
|
||||||
self.update_interval = self.DEFAULT_UPDATE_INTERVAL
|
|
||||||
|
|
||||||
if isinstance(wrk, (int, float)) and wrk > 0:
|
|
||||||
self.work_interval = float(wrk)
|
|
||||||
else:
|
|
||||||
self.work_interval = self.DEFAULT_WORK_INTERVAL
|
|
||||||
|
|
||||||
async def _update_config(self) -> None:
|
|
||||||
"""Reload config and apply new settings when content changes."""
|
|
||||||
try:
|
|
||||||
changed, new_config = await self._loader.load_if_changed()
|
|
||||||
if not changed:
|
|
||||||
return
|
|
||||||
|
|
||||||
self.config = new_config
|
|
||||||
self._update_intervals_from_config()
|
|
||||||
if isinstance(new_config, dict):
|
|
||||||
self._log_manager.apply_config(new_config)
|
|
||||||
except Exception as exc: # noqa: BLE001
|
|
||||||
# Keep current config untouched and continue with last valid settings.
|
|
||||||
self.logger.error("Error reading/parsing config file: %s", exc)
|
|
||||||
if self._loader.last_valid_config is not None:
|
|
||||||
self.config = self._loader.last_valid_config
|
|
||||||
self._update_intervals_from_config()
|
|
||||||
|
|
||||||
def execute(self) -> None:
|
|
||||||
"""Override in subclass to implement one unit of blocking work."""
|
|
||||||
|
|
||||||
def get_health_status(self) -> HealthPayload:
|
|
||||||
"""Return application-specific health payload for /health."""
|
|
||||||
return {"status": "ok"}
|
|
||||||
|
|
||||||
async def _collect_health(self) -> HealthPayload:
|
|
||||||
"""Aggregate lifecycle and app status into one health result."""
|
|
||||||
if self._state not in {LifecycleState.RUNNING, LifecycleState.STOPPING}:
|
|
||||||
return {"status": "unhealthy", "detail": f"state={self._state.value}"}
|
|
||||||
|
|
||||||
if self._last_execute_error is not None:
|
|
||||||
return {"status": "unhealthy", "detail": self._last_execute_error}
|
|
||||||
|
|
||||||
result = self.get_health_status()
|
|
||||||
status = result.get("status", "unhealthy")
|
|
||||||
if status not in {"ok", "degraded", "unhealthy"}:
|
|
||||||
return {"status": "unhealthy", "detail": "invalid health status"}
|
|
||||||
return result
|
|
||||||
|
|
||||||
def _on_execute_success(self) -> None:
|
|
||||||
"""Clear the last execution error marker after successful run."""
|
|
||||||
self._last_execute_error = None
|
|
||||||
|
|
||||||
def _on_execute_error(self, exc: Exception) -> None:
|
|
||||||
"""Store and log execution failure details for health reporting."""
|
|
||||||
self._last_execute_error = str(exc)
|
|
||||||
self.logger.error("Execution error: %s", exc)
|
|
||||||
|
|
||||||
async def _worker_loop(self) -> None:
|
|
||||||
"""Run execute() repeatedly until shutdown is requested."""
|
|
||||||
worker = WorkerLoop(
|
|
||||||
execute=self.execute,
|
|
||||||
get_interval=lambda: self.work_interval,
|
|
||||||
halt_event=self._halt,
|
|
||||||
on_error=self._on_execute_error,
|
|
||||||
on_success=self._on_execute_success,
|
|
||||||
)
|
|
||||||
await worker.run()
|
|
||||||
|
|
||||||
async def _periodic_update_loop(self) -> None:
|
|
||||||
"""Periodically check config file for updates until shutdown."""
|
|
||||||
while not self._halt.is_set():
|
|
||||||
await self._update_config()
|
|
||||||
try:
|
|
||||||
await asyncio.wait_for(self._halt.wait(), timeout=max(self.update_interval, 0.05))
|
|
||||||
except asyncio.TimeoutError:
|
|
||||||
continue
|
|
||||||
|
|
||||||
async def _status_text(self) -> str:
|
|
||||||
"""Build human-readable runtime status for control channels."""
|
|
||||||
health = await self._collect_health()
|
|
||||||
detail = health.get("detail")
|
|
||||||
if detail:
|
|
||||||
return f"state={self._state.value}; health={health['status']}; detail={detail}"
|
|
||||||
return f"state={self._state.value}; health={health['status']}"
|
|
||||||
|
|
||||||
async def _control_start(self) -> str:
|
|
||||||
"""Handle external start command for control channels."""
|
|
||||||
if self._state == LifecycleState.RUNNING:
|
|
||||||
return "already running"
|
|
||||||
self._halt.clear()
|
|
||||||
return "start signal accepted"
|
|
||||||
|
|
||||||
async def _control_stop(self) -> str:
|
|
||||||
"""Handle external stop command for control channels."""
|
|
||||||
self._halt.set()
|
|
||||||
return "stop signal accepted"
|
|
||||||
|
|
||||||
async def _control_status(self) -> str:
|
|
||||||
"""Handle external status command for control channels."""
|
|
||||||
return await self._status_text()
|
|
||||||
|
|
||||||
async def _start_control_channel(self) -> None:
|
|
||||||
"""Start configured control channel with bound command handlers."""
|
|
||||||
if self._control_channel is None:
|
|
||||||
return
|
|
||||||
await self._control_channel.start(self._control_start, self._control_stop, self._control_status)
|
|
||||||
|
|
||||||
async def _stop_control_channel(self) -> None:
|
|
||||||
"""Stop configured control channel if it is active."""
|
|
||||||
if self._control_channel is None:
|
|
||||||
return
|
|
||||||
await self._control_channel.stop()
|
|
||||||
|
|
||||||
async def _run(self) -> None:
|
|
||||||
"""Run manager lifecycle and coordinate all background tasks."""
|
|
||||||
self._state = LifecycleState.STARTING
|
|
||||||
self._halt.clear()
|
|
||||||
await self._update_config()
|
|
||||||
|
|
||||||
if self._health_server is not None:
|
|
||||||
await self._health_server.start()
|
|
||||||
await self._start_control_channel()
|
|
||||||
|
|
||||||
self._state = LifecycleState.RUNNING
|
|
||||||
self.logger.info("ConfigManagerV2 started")
|
|
||||||
|
|
||||||
tasks = [
|
|
||||||
asyncio.create_task(self._worker_loop(), name="v2-worker-loop"),
|
|
||||||
asyncio.create_task(self._periodic_update_loop(), name="v2-config-loop"),
|
|
||||||
]
|
|
||||||
|
|
||||||
try:
|
|
||||||
await asyncio.gather(*tasks)
|
|
||||||
except asyncio.CancelledError:
|
|
||||||
raise
|
|
||||||
finally:
|
|
||||||
self._state = LifecycleState.STOPPING
|
|
||||||
self._halt.set()
|
|
||||||
for task in tasks:
|
|
||||||
task.cancel()
|
|
||||||
await asyncio.gather(*tasks, return_exceptions=True)
|
|
||||||
await self._stop_control_channel()
|
|
||||||
if self._health_server is not None:
|
|
||||||
await self._health_server.stop()
|
|
||||||
self._state = LifecycleState.STOPPED
|
|
||||||
self.logger.info("ConfigManagerV2 stopped")
|
|
||||||
|
|
||||||
async def start(self) -> None:
|
|
||||||
"""Start manager lifecycle from an active asyncio context."""
|
|
||||||
if self._task is not None and not self._task.done():
|
|
||||||
self.logger.warning("ConfigManagerV2 is already running")
|
|
||||||
return
|
|
||||||
|
|
||||||
try:
|
|
||||||
self._loop = asyncio.get_running_loop()
|
|
||||||
except RuntimeError:
|
|
||||||
self.logger.error("start() must be called from within an async context")
|
|
||||||
raise
|
|
||||||
|
|
||||||
self._task = asyncio.create_task(self._run(), name="config-manager-v2")
|
|
||||||
try:
|
|
||||||
await self._task
|
|
||||||
finally:
|
|
||||||
self._task = None
|
|
||||||
|
|
||||||
async def stop(self) -> None:
|
|
||||||
"""Request graceful shutdown and wait for manager completion."""
|
|
||||||
if self._task is None:
|
|
||||||
self.logger.warning("ConfigManagerV2 is not running")
|
|
||||||
return
|
|
||||||
|
|
||||||
self._halt.set()
|
|
||||||
if asyncio.current_task() is self._task:
|
|
||||||
return
|
|
||||||
|
|
||||||
try:
|
|
||||||
await self._task
|
|
||||||
except asyncio.CancelledError:
|
|
||||||
pass
|
|
||||||
@@ -1,3 +1,6 @@
|
|||||||
|
"""Общие типы V2: состояние здоровья, жизненного цикла и настройки management-сервера.
|
||||||
|
|
||||||
|
Используются в core, management и control для единообразных контрактов."""
|
||||||
from __future__ import annotations
|
from __future__ import annotations
|
||||||
|
|
||||||
from dataclasses import dataclass
|
from dataclasses import dataclass
|
||||||
@@ -11,6 +14,8 @@ HealthState = Literal["ok", "degraded", "unhealthy"]
|
|||||||
class HealthPayload(TypedDict, total=False):
|
class HealthPayload(TypedDict, total=False):
|
||||||
status: HealthState
|
status: HealthState
|
||||||
detail: str
|
detail: str
|
||||||
|
state: str
|
||||||
|
"""Текущее состояние жизненного цикла (idle/starting/running/stopping/stopped)."""
|
||||||
|
|
||||||
|
|
||||||
class LifecycleState(str, Enum):
|
class LifecycleState(str, Enum):
|
||||||
@@ -22,9 +27,16 @@ class LifecycleState(str, Enum):
|
|||||||
|
|
||||||
|
|
||||||
@dataclass
|
@dataclass
|
||||||
class HealthServerSettings:
|
class ManagementServerSettings:
|
||||||
|
"""Настройки management HTTP-сервера и healthcheck (один объект на оба)."""
|
||||||
enabled: bool = False
|
enabled: bool = False
|
||||||
host: str = "0.0.0.0"
|
host: str = "0.0.0.0"
|
||||||
port: int = 8000
|
port: int = 8000
|
||||||
path: str = "/health"
|
|
||||||
timeout: float = 3.0
|
timeout: float = 3.0
|
||||||
|
"""Таймаут запроса health (секунды)."""
|
||||||
|
health_timeout: float = 30.0
|
||||||
|
"""Секунды без успешного execute(), после которых health = unhealthy."""
|
||||||
|
|
||||||
|
|
||||||
|
# Backward-compatible alias.
|
||||||
|
HealthServerSettings = ManagementServerSettings
|
||||||
|
|||||||
@@ -1,34 +1,54 @@
|
|||||||
#import os
|
# import os
|
||||||
#os.chdir(os.path.dirname(__file__))
|
# os.chdir(os.path.dirname(__file__))
|
||||||
|
|
||||||
|
import asyncio
|
||||||
|
import logging
|
||||||
|
from pathlib import Path
|
||||||
|
|
||||||
from config_manager import ConfigManager
|
from config_manager import ConfigManager
|
||||||
import logging
|
from config_manager.v1.log_manager import LogManager
|
||||||
import asyncio
|
from config_manager.v2 import ManagementServerSettings
|
||||||
from typing import Optional
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
logger = logging.getLogger()
|
logger = logging.getLogger()
|
||||||
|
|
||||||
|
# Таймаут health: без успешного execute() дольше этого времени — unhealthy.
|
||||||
|
HEALTH_TIMEOUT = 3.0
|
||||||
|
|
||||||
|
|
||||||
class MyApp(ConfigManager):
|
class MyApp(ConfigManager):
|
||||||
def __init__(self, *args, **kwargs):
|
def __init__(self, *args, **kwargs):
|
||||||
super().__init__(*args, **kwargs)
|
super().__init__(*args, **kwargs)
|
||||||
self.iter = 0
|
self.iter = 0
|
||||||
|
|
||||||
def execute(self) -> None:
|
def execute(self) -> None:
|
||||||
logger.info(f"current iteration {self.iter}")
|
"""Успешный прогон сбрасывает таймер health (обновляет время последнего успеха)."""
|
||||||
|
logger.info("current iteration %s", self.iter)
|
||||||
self.iter += 1
|
self.iter += 1
|
||||||
|
|
||||||
async def main():
|
|
||||||
app = MyApp("config.yaml")
|
|
||||||
logger.info("App started")
|
|
||||||
await app.start()
|
|
||||||
|
|
||||||
logger.info("App finished")
|
async def main() -> None:
|
||||||
|
log_manager = LogManager()
|
||||||
|
# Один объект: и HTTP management-сервер (enabled, port), и health (health_timeout).
|
||||||
|
management_settings = ManagementServerSettings(
|
||||||
|
enabled=True,
|
||||||
|
port=8000,
|
||||||
|
health_timeout=HEALTH_TIMEOUT,
|
||||||
|
)
|
||||||
|
config_path = Path(__file__).parent / "config.yaml"
|
||||||
|
app = MyApp(
|
||||||
|
str(config_path),
|
||||||
|
log_manager=log_manager,
|
||||||
|
management_settings=management_settings,
|
||||||
|
)
|
||||||
|
logger.info("App starting (health_timeout=%s)", HEALTH_TIMEOUT)
|
||||||
|
# Менеджер запускаем в фоне (start() не возвращает управление до stop).
|
||||||
|
asyncio.create_task(app.start())
|
||||||
|
|
||||||
|
logger.info("App running; Ctrl+C to stop")
|
||||||
while True:
|
while True:
|
||||||
await asyncio.sleep(1)
|
await asyncio.sleep(1)
|
||||||
|
|
||||||
if __name__ == "__main__":
|
|
||||||
asyncio.run(main())
|
|
||||||
|
|
||||||
|
if __name__ == "__main__":
|
||||||
|
logging.basicConfig(level=logging.INFO)
|
||||||
|
asyncio.run(main())
|
||||||
|
|||||||
@@ -1,6 +1,7 @@
|
|||||||
import asyncio
|
import asyncio
|
||||||
|
import json
|
||||||
|
|
||||||
from config_manager.v2.health import HealthServer
|
from config_manager.v2.management import ManagementServer
|
||||||
|
|
||||||
|
|
||||||
def test_health_mapping_ok_to_200():
|
def test_health_mapping_ok_to_200():
|
||||||
@@ -8,10 +9,9 @@ def test_health_mapping_ok_to_200():
|
|||||||
return {"status": "ok"}
|
return {"status": "ok"}
|
||||||
|
|
||||||
async def scenario() -> None:
|
async def scenario() -> None:
|
||||||
server = HealthServer(
|
server = ManagementServer(
|
||||||
host="127.0.0.1",
|
host="127.0.0.1",
|
||||||
port=8000,
|
port=8000,
|
||||||
path="/health",
|
|
||||||
timeout=0.2,
|
timeout=0.2,
|
||||||
health_provider=provider,
|
health_provider=provider,
|
||||||
)
|
)
|
||||||
@@ -27,10 +27,9 @@ def test_health_mapping_unhealthy_to_503():
|
|||||||
return {"status": "unhealthy", "detail": "worker failed"}
|
return {"status": "unhealthy", "detail": "worker failed"}
|
||||||
|
|
||||||
async def scenario() -> None:
|
async def scenario() -> None:
|
||||||
server = HealthServer(
|
server = ManagementServer(
|
||||||
host="127.0.0.1",
|
host="127.0.0.1",
|
||||||
port=8000,
|
port=8000,
|
||||||
path="/health",
|
|
||||||
timeout=0.2,
|
timeout=0.2,
|
||||||
health_provider=provider,
|
health_provider=provider,
|
||||||
)
|
)
|
||||||
@@ -39,3 +38,63 @@ def test_health_mapping_unhealthy_to_503():
|
|||||||
assert payload["status"] == "unhealthy"
|
assert payload["status"] == "unhealthy"
|
||||||
|
|
||||||
asyncio.run(scenario())
|
asyncio.run(scenario())
|
||||||
|
|
||||||
|
|
||||||
|
def test_action_routes_call_callbacks():
|
||||||
|
events: list[str] = []
|
||||||
|
|
||||||
|
async def provider():
|
||||||
|
return {"status": "ok"}
|
||||||
|
|
||||||
|
async def on_start() -> str:
|
||||||
|
events.append("start")
|
||||||
|
return "start accepted"
|
||||||
|
|
||||||
|
async def on_stop() -> str:
|
||||||
|
events.append("stop")
|
||||||
|
return "stop accepted"
|
||||||
|
|
||||||
|
async def request(port: int, path: str) -> tuple[int, dict[str, str]]:
|
||||||
|
reader, writer = await asyncio.open_connection("127.0.0.1", port)
|
||||||
|
writer.write(
|
||||||
|
f"GET {path} HTTP/1.1\r\nHost: 127.0.0.1\r\nConnection: close\r\n\r\n".encode("utf-8")
|
||||||
|
)
|
||||||
|
await writer.drain()
|
||||||
|
raw = await reader.read()
|
||||||
|
writer.close()
|
||||||
|
await writer.wait_closed()
|
||||||
|
|
||||||
|
header, body = raw.split(b"\r\n\r\n", maxsplit=1)
|
||||||
|
status_code = int(header.split(b" ")[1])
|
||||||
|
payload = json.loads(body.decode("utf-8"))
|
||||||
|
return status_code, payload
|
||||||
|
|
||||||
|
async def scenario() -> None:
|
||||||
|
server = ManagementServer(
|
||||||
|
host="127.0.0.1",
|
||||||
|
port=0,
|
||||||
|
timeout=0.2,
|
||||||
|
health_provider=provider,
|
||||||
|
on_start=on_start,
|
||||||
|
on_stop=on_stop,
|
||||||
|
)
|
||||||
|
await server.start()
|
||||||
|
try:
|
||||||
|
port = server.port
|
||||||
|
assert port > 0
|
||||||
|
|
||||||
|
start_code, start_payload = await request(port, "/actions/start")
|
||||||
|
stop_code, stop_payload = await request(port, "/actions/stop")
|
||||||
|
finally:
|
||||||
|
await server.stop()
|
||||||
|
|
||||||
|
assert start_code == 200
|
||||||
|
assert start_payload["status"] == "ok"
|
||||||
|
assert start_payload["detail"] == "start accepted"
|
||||||
|
|
||||||
|
assert stop_code == 200
|
||||||
|
assert stop_payload["status"] == "ok"
|
||||||
|
assert stop_payload["detail"] == "stop accepted"
|
||||||
|
assert events == ["start", "stop"]
|
||||||
|
|
||||||
|
asyncio.run(scenario())
|
||||||
|
|||||||
Reference in New Issue
Block a user