Перенос LogManager в v2 и обновление документации. Обновлены импорты и исправлены ссылки на LogManager в README и тестах. Удалены устаревшие типы и рефакторинг конфигурации управления.

This commit is contained in:
2026-02-26 20:37:00 +03:00
parent 7b5d6d2156
commit aa32c23dba
22 changed files with 168 additions and 143 deletions

View File

@@ -10,7 +10,7 @@
**Ядро (core):**
- **ConfigLoader** — читает конфиг из файла (YAML/JSON), считает хеш и отдаёт конфиг только при изменении; при ошибке парсинга возвращает последний валидный конфиг.
- **WorkerLoop** — в отдельном потоке циклически вызывает ваш метод `execute()` с паузой между вызовами; реагирует на событие остановки и колбэки успеха/ошибки.
- **LogManager** (v1) — применяет секцию `log` из конфига к логированию (dictConfig).
- **LogManager** — применяет секцию `log` из конфига к логированию (dictConfig).
- **HealthAggregator** — собирает состояние: жизненный цикл (idle/starting/running/…), время последнего успешного `execute()` и таймаут здоровья; формирует единый ответ для health (ok/unhealthy).
- **ControlChannelBridge** — один мост для всех каналов: обработчики on_start/on_stop/on_status (сброс/установка halt, текст статуса).
@@ -19,7 +19,7 @@
- **HttpControlChannel** — HTTP API (`/health`, `/actions/start`, `/actions/stop`, `/actions/status`); использует **UvicornServerRunner**; для `/health` вызывает **HealthAggregator.collect()**, для действий — переданные обработчики из **ControlChannelBridge**.
- **TelegramControlChannel** — реализация через long polling Telegram; команды `/start`, `/stop`, `/status` вызывают переданные обработчики.
**Поток работы:** при `start()` менеджер собирает список каналов (при `management_settings.enabled` **HttpControlChannel**, плюс опционально **control_channel** / **control_channels**), поднимает все каналы с одним **ControlChannelBridge**, затем запускает два цикла: **WorkerLoop** и периодическое обновление конфига через **ConfigLoader**. Остановка по halt (через любой канал) завершает оба цикла; в конце останавливаются все каналы.
**Поток работы:** при `start()` менеджер собирает список каналов: при `management.enabled: true` в **config.yaml** (секция `management`) добавляется **HttpControlChannel**, плюс опционально **control_channel** / **control_channels** в конструкторе. Все каналы поднимаются с одним **ControlChannelBridge**, затем запускаются два цикла: **WorkerLoop** и периодическое обновление конфига через **ConfigLoader**. Остановка по halt (через любой канал) завершает оба цикла; в конце останавливаются все каналы. Настройки HTTP-канала (host, port, timeout, health_timeout) задаются в config.yaml в секции `management`.
## Диаграмма классов (v1 и v2)
@@ -156,7 +156,7 @@ classDiagram
**Как проверить, что конфигурация логирования применилась:**
- Убедитесь, что путь к файлу конфига верный и файл загружается при старте (в логах нет ошибки чтения конфига).
- Убедитесь, что в конфиге есть ключ `log` с `version: 1`, `handlers` и `loggers` (пример — `tests/config.yaml`).
- После старта в логе должно появиться сообщение уровня INFO: `"Logging configuration applied"` (из `config_manager.v1.log_manager`). Если его нет, либо секция `log` отсутствует (будет предупреждение), либо уровень root/пакета выше INFO.
- После старта в логе должно появиться сообщение уровня INFO: `"Logging configuration applied"` (из `config_manager.v2.core.log_manager`). Если его нет, либо секция `log` отсутствует (будет предупреждение), либо уровень root/пакета выше INFO.
## Установка
``pip install git+https://git.lesha.spb.ru/alex/config_manager.git``

View File

@@ -1,3 +1,3 @@
from .v2 import ConfigManagerV2 as ConfigManager
from .v1.cfg_manager import ConfigManager as LegacyConfigManager
from .v1.log_manager import LogManager
from .v2.core.log_manager import LogManager

View File

@@ -6,7 +6,7 @@ import yaml
import os
from typing import Any, Optional
from .log_manager import LogManager
from ..v2.core.log_manager import LogManager
class ConfigManager:
DEFAULT_UPDATE_INTERVAL = 5

View File

@@ -1,43 +1,4 @@
import logging
from typing import Optional
"""Обратная совместимость: LogManager перенесён в v2.core.log_manager."""
from ..v2.core.log_manager import LogManager
class LogManager:
"""
Управляет конфигурацией логирования приложения.
Применяет конфигурацию из словаря с обработкой ошибок.
"""
def __init__(self):
self.logger = logging.getLogger(__name__)
self._last_valid_config: Optional[dict] = None
def apply_config(self, config: dict) -> None:
"""
Применяет конфигурацию логирования из словаря.
При ошибке восстанавливает последний валидный конфиг.
Args:
config: Словарь с настройками логирования (из файла конфига)
"""
logging_config = config.get("log")
if not logging_config:
self.logger.warning(
"Config has no 'log' section; logging parameters from config are not applied (default level may be WARNING)."
)
return
try:
logging.config.dictConfig(logging_config)
self._last_valid_config = logging_config
self.logger.info("Logging configuration applied")
except Exception as e:
self.logger.error(f"Error applying logging config: {e}")
# Если был предыдущий валидный конфиг, восстанавливаем его
if self._last_valid_config:
try:
logging.config.dictConfig(self._last_valid_config)
self.logger.info("Previous logging configuration restored")
except Exception as restore_error:
self.logger.error(f"Error restoring previous config: {restore_error}")
__all__ = ["LogManager"]

View File

@@ -1,7 +1,7 @@
"""Публичный API V2: точка входа в менеджер конфигурации и настройки management-сервера.
"""Публичный API V2: точка входа в менеджер конфигурации и настройки HTTP-канала из config.yaml.
Экспортирует ConfigManagerV2 и типы настроек для использования приложениями."""
from .core import ConfigManagerV2
from .types import HealthServerSettings, ManagementServerSettings
from .core.types import HealthServerSettings, ManagementServerSettings
__all__ = ["ConfigManagerV2", "ManagementServerSettings", "HealthServerSettings"]

View File

@@ -13,7 +13,7 @@ from fastapi import Request
from fastapi.responses import JSONResponse
from uvicorn import Config, Server
from ..types import HealthPayload
from ..core.types import HealthPayload
from .base import ControlChannel, StartHandler, StatusHandler, StopHandler
PATH_HEALTH = "/health"

View File

@@ -4,7 +4,8 @@
from .config_loader import ConfigLoader
from .control_bridge import ControlChannelBridge
from .health_aggregator import HealthAggregator
from .log_manager import LogManager
from .manager import ConfigManagerV2
from .scheduler import WorkerLoop
__all__ = ["ConfigLoader", "ConfigManagerV2", "ControlChannelBridge", "HealthAggregator", "WorkerLoop"]
__all__ = ["ConfigLoader", "ConfigManagerV2", "ControlChannelBridge", "HealthAggregator", "LogManager", "WorkerLoop"]

View File

@@ -22,19 +22,19 @@ class ConfigLoader:
self.config: Any = None
self.last_valid_config: Any = None
self._last_seen_hash: Optional[str] = None
logger.warning("ConfigLoader.__init__ result: path=%s", self.path)
logger.debug("ConfigLoader.__init__ result: path=%s", self.path)
def _read_file_sync(self) -> str:
"""Синхронно прочитать сырой текст конфига с диска."""
with open(self.path, "r", encoding="utf-8") as fh:
data = fh.read()
logger.warning("ConfigLoader._read_file_sync result: bytes=%s", len(data))
logger.debug("ConfigLoader._read_file_sync result: bytes=%s", len(data))
return data
async def read_file_async(self) -> str:
"""Прочитать сырой текст конфига с диска в рабочем потоке."""
result = await asyncio.to_thread(self._read_file_sync)
logger.warning("ConfigLoader.read_file_async result: bytes=%s", len(result))
logger.debug("ConfigLoader.read_file_async result: bytes=%s", len(result))
return result
def parse_config(self, data: str) -> Any:
@@ -48,7 +48,7 @@ class ConfigLoader:
except Exception: # noqa: BLE001
logger.exception("ConfigLoader.parse_config error: extension=%s", extension)
raise
logger.warning(
logger.debug(
"ConfigLoader.parse_config result: extension=%s type=%s",
extension,
type(result).__name__,
@@ -59,22 +59,35 @@ class ConfigLoader:
def _calculate_hash(data: str) -> str:
"""Вычислить устойчивый хеш содержимого для обнаружения изменений."""
result = hashlib.sha256(data.encode("utf-8")).hexdigest()
logger.warning("ConfigLoader._calculate_hash result: hash=%s", result)
logger.debug("ConfigLoader._calculate_hash result: hash=%s", result)
return result
def load_sync(self) -> Any:
"""Синхронно загрузить и распарсить конфиг один раз (для чтения настроек при инициализации)."""
try:
raw_data = self._read_file_sync()
parsed = self.parse_config(raw_data)
self.config = parsed
self.last_valid_config = parsed
logger.debug("ConfigLoader.load_sync result: loaded")
return parsed
except Exception: # noqa: BLE001
logger.exception("ConfigLoader.load_sync error")
return None
async def load_if_changed(self) -> tuple[bool, Any]:
"""Загрузить и распарсить конфиг только при изменении содержимого файла."""
raw_data = await self.read_file_async()
current_hash = self._calculate_hash(raw_data)
if current_hash == self._last_seen_hash:
logger.warning("ConfigLoader.load_if_changed result: changed=False")
logger.debug("ConfigLoader.load_if_changed result: changed=False")
return False, self.config
self._last_seen_hash = current_hash
parsed = self.parse_config(raw_data)
self.config = parsed
self.last_valid_config = parsed
logger.warning("ConfigLoader.load_if_changed result: changed=True")
logger.debug("ConfigLoader.load_if_changed result: changed=True")
return True, parsed

View File

@@ -7,7 +7,7 @@ import asyncio
import logging
from collections.abc import Awaitable, Callable
from ..types import LifecycleState
from .types import LifecycleState
logger = logging.getLogger(__name__)

View File

@@ -7,7 +7,7 @@ import logging
import time
from collections.abc import Callable
from ..types import HealthPayload, LifecycleState
from .types import HealthPayload, LifecycleState
logger = logging.getLogger(__name__)

View File

@@ -0,0 +1,38 @@
"""Применение конфигурации логирования из словаря (секция `log` в config.yaml).
Управляет конфигурацией логирования приложения через dictConfig, с восстановлением последнего валидного конфига при ошибке."""
from __future__ import annotations
import logging
import logging.config
from typing import Optional
class LogManager:
"""Применяет секцию `log` из конфига к логированию (dictConfig). При ошибке восстанавливает последний валидный конфиг."""
def __init__(self) -> None:
self.logger = logging.getLogger(__name__)
self._last_valid_config: Optional[dict] = None
def apply_config(self, config: dict) -> None:
"""Применить конфигурацию логирования из словаря. При ошибке восстанавливает последний валидный конфиг."""
logging_config = config.get("log")
if not logging_config:
self.logger.warning(
"Config has no 'log' section; logging parameters from config are not applied (default level may be WARNING)."
)
return
try:
logging.config.dictConfig(logging_config)
self._last_valid_config = logging_config
self.logger.info("Logging configuration applied")
except Exception as e:
self.logger.error("Error applying logging config: %s", e)
if self._last_valid_config:
try:
logging.config.dictConfig(self._last_valid_config)
self.logger.info("Previous logging configuration restored")
except Exception as restore_error:
self.logger.error("Error restoring previous config: %s", restore_error)

View File

@@ -7,10 +7,10 @@ import os
import time
from typing import Any, Iterable, Optional
from ...v1.log_manager import LogManager
from .log_manager import LogManager
from ..control.base import ControlChannel
from ..control.http_channel import HttpControlChannel
from ..types import HealthPayload, LifecycleState, ManagementServerSettings
from .types import HealthPayload, LifecycleState, ManagementServerSettings, management_settings_from_config
from .config_loader import ConfigLoader
from .control_bridge import ControlChannelBridge
from .health_aggregator import HealthAggregator
@@ -195,7 +195,6 @@ class ConfigManagerV2(_RuntimeController):
self,
path: str,
log_manager: Optional[LogManager] = None,
management_settings: Optional[ManagementServerSettings] = None,
control_channel: Optional[ControlChannel] = None,
control_channels: Optional[Iterable[ControlChannel]] = None,
):
@@ -214,7 +213,9 @@ class ConfigManagerV2(_RuntimeController):
self._last_execute_error: Optional[str] = None
self._last_success_timestamp: Optional[float] = None
settings = management_settings or ManagementServerSettings(enabled=True)
initial_config = self._loader.load_sync()
self.config = initial_config
settings = management_settings_from_config(initial_config if isinstance(initial_config, dict) else {})
self._health_timeout = settings.health_timeout
self._health_aggregator = HealthAggregator(
get_state=lambda: self._state,

View File

@@ -0,0 +1,62 @@
"""Типы core: состояние здоровья, жизненного цикла и настройки HTTP-канала из config.yaml.
Используются в core и control для единообразных контрактов."""
from __future__ import annotations
from dataclasses import dataclass
from enum import Enum
from typing import Any, Literal, TypedDict
HealthState = Literal["ok", "degraded", "unhealthy"]
class HealthPayload(TypedDict, total=False):
status: HealthState
detail: str
state: str
"""Текущее состояние жизненного цикла (idle/starting/running/stopping/stopped)."""
class LifecycleState(str, Enum):
IDLE = "idle"
STARTING = "starting"
RUNNING = "running"
STOPPING = "stopping"
STOPPED = "stopped"
@dataclass
class ManagementServerSettings:
"""Настройки HTTP-канала управления и healthcheck (читаются из config.yaml, секция management)."""
enabled: bool = False
host: str = "0.0.0.0"
port: int = 8000
timeout: int = 3
"""Таймаут запроса health (секунды)."""
health_timeout: int = 30
"""Секунды без успешного execute(), после которых health = unhealthy."""
HealthServerSettings = ManagementServerSettings
def management_settings_from_config(config: Any) -> ManagementServerSettings:
"""Извлечь настройки HTTP-канала из конфига (секция `management`)."""
if not isinstance(config, dict):
return ManagementServerSettings(enabled=False)
m = config.get("management")
if not isinstance(m, dict):
return ManagementServerSettings(enabled=False)
enabled = bool(m.get("enabled", False))
host = str(m.get("host", "0.0.0.0"))
port = int(m.get("port", 8000)) if isinstance(m.get("port"), (int, float)) else 8000
timeout = int(m.get("timeout", 3)) if isinstance(m.get("timeout"), (int, float)) else 3
health_timeout = int(m.get("health_timeout", 30)) if isinstance(m.get("health_timeout"), (int, float)) else 30
return ManagementServerSettings(
enabled=enabled,
host=host,
port=port,
timeout=timeout,
health_timeout=health_timeout,
)

View File

@@ -1,13 +0,0 @@
"""Re-exports для обратной совместимости: HealthAggregator, ControlChannelBridge, ManagementServer/HealthServer (HttpControlChannel)."""
from ..control import HttpControlChannel
from ..core import ControlChannelBridge, HealthAggregator
ManagementServer = HttpControlChannel
HealthServer = HttpControlChannel
__all__ = [
"ControlChannelBridge",
"HealthAggregator",
"HealthServer",
"ManagementServer",
]

View File

@@ -1,42 +0,0 @@
"""Общие типы V2: состояние здоровья, жизненного цикла и настройки management-сервера.
Используются в core, management и control для единообразных контрактов."""
from __future__ import annotations
from dataclasses import dataclass
from enum import Enum
from typing import Literal, TypedDict
HealthState = Literal["ok", "degraded", "unhealthy"]
class HealthPayload(TypedDict, total=False):
status: HealthState
detail: str
state: str
"""Текущее состояние жизненного цикла (idle/starting/running/stopping/stopped)."""
class LifecycleState(str, Enum):
IDLE = "idle"
STARTING = "starting"
RUNNING = "running"
STOPPING = "stopping"
STOPPED = "stopped"
@dataclass
class ManagementServerSettings:
"""Настройки management HTTP-сервера и healthcheck (один объект на оба)."""
enabled: bool = False
host: str = "0.0.0.0"
port: int = 8000
timeout: int = 3
"""Таймаут запроса health (секунды)."""
health_timeout: int = 30
"""Секунды без успешного execute(), после которых health = unhealthy."""
# Backward-compatible alias.
HealthServerSettings = ManagementServerSettings

View File

@@ -1,6 +1,14 @@
# === Раздел с общими конфигурационными параметрами ===
runtime: 5
# === HTTP-канал управления (ConfigManagerV2): /health, /actions/start, /actions/stop ===
# management:
# enabled: true
# host: "0.0.0.0"
# port: 8000
# timeout: 3
# health_timeout: 30
# === Логирование ===
log:
version: 1

View File

@@ -6,7 +6,7 @@ import logging
from pathlib import Path
from config_manager import ConfigManager
from config_manager.v1.log_manager import LogManager
from config_manager.v2.core import LogManager
from config_manager.v2 import ManagementServerSettings
logger = logging.getLogger()

View File

@@ -1,6 +1,6 @@
import asyncio
from config_manager.v2 import ConfigManagerV2, ManagementServerSettings
from config_manager.v2 import ConfigManagerV2
class ReloadApp(ConfigManagerV2):
@@ -14,9 +14,9 @@ class ReloadApp(ConfigManagerV2):
def test_invalid_config_keeps_last_valid(tmp_path):
async def scenario() -> None:
cfg = tmp_path / "config.yaml"
cfg.write_text("log: {}\n", encoding="utf-8")
cfg.write_text("log: {}\nmanagement: { enabled: false }\n", encoding="utf-8")
app = ReloadApp(str(cfg), management_settings=ManagementServerSettings(enabled=False))
app = ReloadApp(str(cfg))
runner = asyncio.create_task(app.start())
await asyncio.sleep(0.12)

View File

@@ -1,6 +1,6 @@
import asyncio
from config_manager.v2 import ConfigManagerV2, ManagementServerSettings
from config_manager.v2 import ConfigManagerV2
class DemoApp(ConfigManagerV2):
@@ -18,9 +18,9 @@ class DemoApp(ConfigManagerV2):
def test_execute_loop_runs(tmp_path):
async def scenario() -> None:
cfg = tmp_path / "config.yaml"
cfg.write_text("log: {}\n", encoding="utf-8")
cfg.write_text("log: {}\nmanagement: { enabled: false }\n", encoding="utf-8")
app = DemoApp(str(cfg), management_settings=ManagementServerSettings(enabled=False))
app = DemoApp(str(cfg))
runner = asyncio.create_task(app.start())
await asyncio.sleep(0.18)

View File

@@ -1,6 +1,6 @@
import asyncio
from config_manager.v2 import ConfigManagerV2, ManagementServerSettings
from config_manager.v2 import ConfigManagerV2
from config_manager.v2.control.base import ControlChannel, StartHandler, StatusHandler, StopHandler
@@ -33,14 +33,10 @@ class ControlledApp(ConfigManagerV2):
def test_control_channel_can_stop_manager(tmp_path):
async def scenario() -> None:
cfg = tmp_path / "config.yaml"
cfg.write_text("log: {}\n", encoding="utf-8")
cfg.write_text("log: {}\nmanagement: { enabled: false }\n", encoding="utf-8")
channel = DummyControlChannel()
app = ControlledApp(
str(cfg),
control_channel=channel,
management_settings=ManagementServerSettings(enabled=False),
)
app = ControlledApp(str(cfg), control_channel=channel)
runner = asyncio.create_task(app.start())
await asyncio.sleep(0.12)

View File

@@ -1,7 +1,7 @@
import asyncio
import json
from config_manager.v2.management import ManagementServer
from config_manager.v2.control import HttpControlChannel
def test_health_mapping_ok_to_200():
@@ -9,7 +9,7 @@ def test_health_mapping_ok_to_200():
return {"status": "ok"}
async def scenario() -> None:
server = ManagementServer(
server = HttpControlChannel(
host="127.0.0.1",
port=8000,
timeout=0.2,
@@ -27,7 +27,7 @@ def test_health_mapping_unhealthy_to_503():
return {"status": "unhealthy", "detail": "worker failed"}
async def scenario() -> None:
server = ManagementServer(
server = HttpControlChannel(
host="127.0.0.1",
port=8000,
timeout=0.2,
@@ -73,21 +73,21 @@ def test_action_routes_call_callbacks():
return status_code, payload
async def scenario() -> None:
server = ManagementServer(
channel = HttpControlChannel(
host="127.0.0.1",
port=0,
timeout=0.2,
health_provider=provider,
)
await server.start(on_start, on_stop, on_status)
await channel.start(on_start, on_stop, on_status)
try:
port = server.port
port = channel.port
assert port > 0
start_code, start_payload = await request(port, "/actions/start")
stop_code, stop_payload = await request(port, "/actions/stop")
finally:
await server.stop()
await channel.stop()
assert start_code == 200
assert start_payload["status"] == "ok"

View File

@@ -2,7 +2,7 @@ import asyncio
import threading
import time
from config_manager.v2 import ConfigManagerV2, ManagementServerSettings
from config_manager.v2 import ConfigManagerV2
class BlockingApp(ConfigManagerV2):
@@ -26,9 +26,9 @@ class BlockingApp(ConfigManagerV2):
def test_stop_waits_for_active_execute_and_prevents_next_run(tmp_path):
async def scenario() -> None:
cfg = tmp_path / "config.yaml"
cfg.write_text("log: {}\n", encoding="utf-8")
cfg.write_text("log: {}\nmanagement: { enabled: false }\n", encoding="utf-8")
app = BlockingApp(str(cfg), management_settings=ManagementServerSettings(enabled=False))
app = BlockingApp(str(cfg))
runner = asyncio.create_task(app.start())
started = await asyncio.to_thread(app.started_event.wait, 1.0)