21 Commits

Author SHA1 Message Date
2b43e2f86a Fixed the version for the big refactoring 2026-02-26 21:59:10 +03:00
a1dd495d6d Big refactoring with Codex 2026-02-26 21:58:21 +03:00
aa32c23dba Moved LogManager to v2 and updated the documentation. Updated imports and fixed LogManager references in the README and tests. Removed obsolete types and refactored the control configuration. 2026-02-26 20:37:00 +03:00
7b5d6d2156 Refactored the control channel 2026-02-26 20:26:22 +03:00
aee8f7460f Version upgrade 2026-02-26 11:25:06 +03:00
ad89b3db92 Fixed hangs and added logging from Codex 2026-02-26 11:24:34 +03:00
6502f2252d Added execute_timeout 2026-02-24 20:59:46 +03:00
67cd098f54 Version update 2026-02-22 23:03:43 +03:00
8d177b0fd1 Added logging to the worker loop 2026-02-22 23:03:11 +03:00
3293814898 vvv 2026-02-21 23:39:29 +03:00
e7d11ddf71 Added a print 2026-02-21 23:38:48 +03:00
2b02af60d5 Did it myself 2026-02-21 23:35:16 +03:00
b2442f4d91 Fixed types 2026-02-21 22:53:52 +03:00
f5bb681ddb Fixed the version 2026-02-21 22:47:01 +03:00
058c19d677 Changed the logic for setting wait timeouts 2026-02-21 22:45:41 +03:00
608cd42719 Version update 2026-02-21 15:27:27 +03:00
2d6179d366 Refactored and added a control API 2026-02-21 15:14:34 +03:00
d888ae7acb First refactoring iteration 2026-02-21 00:20:07 +03:00
1d71ce406f Merge pull request 'Version update' (#2) from feature/healthcheck-requirements into master
Reviewed-on: #2
2026-02-19 19:51:54 +00:00
80dd69c5ec Version update 2026-02-19 22:50:51 +03:00
8da6df0b2a Merge pull request 'feature/healthcheck-requirements' (#1) from feature/healthcheck-requirements into master
Reviewed-on: #1
2026-02-19 19:49:27 +00:00
30 changed files with 1701 additions and 680 deletions

208
README.md

@@ -1,12 +1,210 @@
# Config Manager # Config Manager
## Description ## Description
This package was created to run my applications. The package is designed to run applications that execute logic periodically, reload their config, and are controlled through an HTTP API.
The ConfigManager class implements the entry point for the program and provides the actual application configuration. It also simplifies logging setup.
## Installation **Contract:** the application inherits from **ConfigManagerV2** and overrides **execute()** (the periodic work). Control (start/stop, health) goes through channels that are created externally and passed to the constructor in **control_channels** (including HttpControlChannel for the API).
## ConfigManager: structure and relationships
**ConfigManager** (the ConfigManagerV2 class) is the application's entry point. It inherits its internal logic from **\_RuntimeController** (the worker and config-update loops, starting and stopping the control channels).
**Core:**
- **ConfigLoader**: reads the config from a file (YAML/JSON), computes a hash, and returns the config only when it has changed; on a parse error it returns the last valid config.
- **WorkerLoop**: calls your `execute()` method cyclically in a separate thread, with a pause between calls; reacts to the stop event and to the success/error callbacks.
- **LogManager**: applies the `log` section of the config to logging (dictConfig).
- **HealthAggregator**: collects state: the lifecycle (idle/starting/running/…), the time of the last successful `execute()`, and the health timeout; builds a single health response (ok/unhealthy).
- **ControlChannelBridge**: a single bridge for all channels: the on_start/on_stop/on_status handlers (clearing/setting halt, status text).
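The change detection described for **ConfigLoader** can be sketched roughly as follows. This is a minimal illustration, not the package's code: `SketchLoader` is a hypothetical name, it takes the raw text directly instead of reading a file, and it parses JSON only.

```python
import hashlib
import json
from typing import Any, Optional, Tuple


class SketchLoader:
    """Hypothetical stand-in for ConfigLoader that works on text, not on a file."""

    def __init__(self) -> None:
        self.config: Any = None
        self._last_seen_hash: Optional[str] = None

    def load_if_changed(self, raw: str) -> Tuple[bool, Any]:
        """Return (changed, config); parse only when the content hash differs."""
        digest = hashlib.sha256(raw.encode("utf-8")).hexdigest()
        if digest == self._last_seen_hash:
            return False, self.config  # same content, nothing to parse
        self._last_seen_hash = digest
        try:
            self.config = json.loads(raw)
        except json.JSONDecodeError:
            return False, self.config  # parse error: keep the last valid config
        return True, self.config
```

Hashing the raw text means an unchanged file never reaches the parser, and a broken edit leaves the previously loaded config in place.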
**Control channels (control):**
- **ControlChannel**: the abstract contract: `start(on_start, on_stop, on_status)`, `stop()`.
- **HttpControlChannel**: the HTTP API (`/health`, `/actions/start`, `/actions/stop`, `/actions/status`); uses **UvicornServerRunner**; for `/health` it calls **HealthAggregator.collect()**, and for actions it calls the handlers passed in from **ControlChannelBridge**.
- **TelegramControlChannel**: an implementation based on Telegram long polling; the `/start`, `/stop`, `/status` commands call the handlers passed in.
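A custom channel only has to satisfy the two-method contract listed above. The sketch below assumes that contract and nothing else: `ControlChannel` is re-declared locally so the example runs on its own, and `InMemoryControlChannel` with its `send()` method is a hypothetical illustration rather than part of the package.

```python
import asyncio
from abc import ABC, abstractmethod
from typing import Awaitable, Callable, Dict, List

Handler = Callable[[], Awaitable[str]]


class ControlChannel(ABC):
    """Local copy of the contract, so the sketch runs standalone."""

    @abstractmethod
    async def start(self, on_start: Handler, on_stop: Handler, on_status: Handler) -> None:
        raise NotImplementedError

    @abstractmethod
    async def stop(self) -> None:
        raise NotImplementedError


class InMemoryControlChannel(ControlChannel):
    """Hypothetical channel: commands arrive through send() instead of a network."""

    def __init__(self) -> None:
        self._handlers: Dict[str, Handler] = {}

    async def start(self, on_start: Handler, on_stop: Handler, on_status: Handler) -> None:
        self._handlers = {"start": on_start, "stop": on_stop, "status": on_status}

    async def stop(self) -> None:
        self._handlers = {}  # unbound handlers can no longer be called

    async def send(self, command: str) -> str:
        handler = self._handlers.get(command)
        if handler is None:
            return f"unknown command: {command}"
        return await handler()


async def demo() -> List[str]:
    async def started() -> str:
        return "started"

    async def stopped() -> str:
        return "stopped"

    async def status() -> str:
        return "running"

    channel = InMemoryControlChannel()
    await channel.start(started, stopped, status)
    replies = [await channel.send("start"), await channel.send("status")]
    await channel.stop()
    return replies


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

Because the handlers are injected in `start()`, the channel itself knows nothing about the manager; the same handlers can back an HTTP route or a chat command.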
**Workflow:** on `start()` the manager brings up the channels from **control_channels** (defined externally), then starts two loops: **WorkerLoop** and the periodic config update through **ConfigLoader**. API control (`/health`, `/actions/start`, `/actions/stop`) is available if an **HttpControlChannel** is passed in control_channels. Stopping via halt ends both loops; finally, all channels are stopped.
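The two-loop flow with a shared halt event might look like the sketch below. It is an illustration under assumptions: `periodic` and `run_for` are hypothetical names, and waiting on the event during the pause (instead of a plain sleep) is a choice made here so that a stop takes effect without waiting out the interval.

```python
import asyncio
from typing import Callable, List


async def periodic(action: Callable[[], None], interval: float,
                   halt: asyncio.Event, in_thread: bool) -> None:
    """Run action repeatedly until halt is set, pausing between calls."""
    while not halt.is_set():
        if in_thread:
            await asyncio.to_thread(action)  # blocking work stays off the event loop
        else:
            action()
        try:
            # Wake up early if halt is set during the pause.
            await asyncio.wait_for(halt.wait(), timeout=interval)
        except asyncio.TimeoutError:
            pass


async def run_for(duration: float) -> List[str]:
    halt = asyncio.Event()
    calls: List[str] = []
    loops = asyncio.gather(
        periodic(lambda: calls.append("execute"), 0.01, halt, in_thread=True),
        periodic(lambda: calls.append("reload"), 0.01, halt, in_thread=False),
    )
    await asyncio.sleep(duration)
    halt.set()  # a single event ends both loops
    await loops
    return calls


if __name__ == "__main__":
    print(asyncio.run(run_for(0.05)))
```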
## Running an application with ConfigManagerV2 and HttpControlChannel
1. **Inherit from ConfigManagerV2** and implement the `execute()` method (your periodic work goes there). If needed, override `get_health_status()` for a custom `/health` response.
2. **Create the channels externally and pass them to the constructor.** For the HTTP API create an **HttpControlChannel**; health needs a callback from the manager, so pass **control_channels** as a factory (a lambda that receives the manager):
```python
from config_manager.v2.control import HttpControlChannel
app = MyApp(
    str(path_to_config),
    control_channels=lambda m: [
        HttpControlChannel(
            host="0.0.0.0",
            port=8000,
            timeout=3,
            health_provider=m.get_health_provider(),
        )
    ],
)
```
Alternatively, pass a ready-made list of channels: `control_channels=[channel1, channel2]`.
3. **Run from an async context:** `await app.start()`, or `asyncio.create_task(app.start())` to run in the background. To stop: `await app.stop()` or an HTTP request to `/actions/stop`.
**Minimal example with HTTP API:**
```python
import asyncio
import logging
from pathlib import Path

from config_manager import ConfigManager
from config_manager.v2.control import HttpControlChannel


class MyApp(ConfigManager):
    def execute(self) -> None:
        pass  # your periodic work


async def main() -> None:
    app = MyApp(
        str(Path(__file__).parent / "config.yaml"),
        control_channels=lambda m: [
            HttpControlChannel(
                host="0.0.0.0", port=8000, timeout=3,
                health_provider=m.get_health_provider(),
            )
        ],
    )
    asyncio.create_task(app.start())
    await asyncio.sleep(3600)


if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO)
    asyncio.run(main())
```
A ready-made example: `tests/test_app.py`.
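For the `/health` response, the decision between ok and unhealthy could be sketched as below. The exact rules of **HealthAggregator** are not shown in this README, so `collect_health`, its parameters, and the `detail` strings are assumptions for illustration; the 200/503 mapping follows the `status` check used by the HTTP channel.

```python
import time
from typing import Dict, Optional


def collect_health(state: str, last_success: Optional[float],
                   health_timeout: float, now: Optional[float] = None) -> Dict[str, str]:
    """Hypothetical aggregation: healthy only while running with a recent success."""
    current = time.monotonic() if now is None else now
    if state != "running":
        return {"status": "unhealthy", "detail": f"state={state}"}
    if last_success is None or current - last_success > health_timeout:
        return {"status": "unhealthy", "detail": "no recent successful execute()"}
    return {"status": "ok", "detail": "running"}


def http_status(payload: Dict[str, str]) -> int:
    """Map a health payload to an HTTP status code: 200 for ok, 503 otherwise."""
    return 200 if payload.get("status", "unhealthy") == "ok" else 503
```

A probe that only looks at the status code then treats a stalled `execute()` the same way as a stopped manager.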
## Class diagram
```mermaid
classDiagram
direction TB
class ConfigManagerV2 {
+str path
+Any config
+float update_interval
+float work_interval
-ConfigLoader _loader
-LifecycleState _state
+start() async
+stop() async
+execute()*
+get_health_status() HealthPayload
-_run() async
-_worker_loop() async
-_periodic_update_loop() async
}
class _RuntimeController {
<<internal>>
-_on_execute_success()
-_on_execute_error(exc)
-_worker_loop() async
-_periodic_update_loop() async
-_start_control_channels() async
-_stop_control_channels() async
-_run() async
}
class ConfigLoader {
+str path
+Any config
+Any last_valid_config
+load_if_changed() async
+parse_config(data) Any
-_read_file_sync() str
-read_file_async() async
}
class WorkerLoop {
-Callable execute
-Callable get_interval
-Event halt_event
+run() async
}
class LogManager {
+apply_config(config) None
}
class ControlChannel {
<<abstract>>
+start(on_start, on_stop, on_status) async*
+stop() async*
}
class TelegramControlChannel {
-str _token
-int _chat_id
+start(on_start, on_stop, on_status) async
+stop() async
-_poll_loop() async
}
class HttpControlChannel {
-UvicornServerRunner _runner
-Callable _health_provider
+start(on_start, on_stop, on_status) async
+stop() async
+int port
}
class HealthAggregator {
-Callable get_state
-Callable get_app_health
+collect() async HealthPayload
}
class ControlChannelBridge {
-Event _halt
-Callable _get_state
-Callable _get_status
+on_start() async str
+on_stop() async str
+on_status() async str
}
class UvicornServerRunner {
-Server _server
-Task _serve_task
+start(app) async
+stop() async
+int port
}
ConfigManagerV2 --|> _RuntimeController : inherits
ConfigManagerV2 --> ConfigLoader : uses
ConfigManagerV2 --> LogManager : uses
ConfigManagerV2 --> HealthAggregator : uses
ConfigManagerV2 --> ControlChannelBridge : uses
ConfigManagerV2 ..> ControlChannel : list of channels
_RuntimeController ..> WorkerLoop : creates in _worker_loop
TelegramControlChannel --|> ControlChannel : implements
HttpControlChannel --|> ControlChannel : implements
HttpControlChannel --> UvicornServerRunner : uses
HttpControlChannel ..> HealthAggregator : health_provider
ControlChannelBridge ..> ControlChannel : on_start, on_stop, on_status
```
## Logging
Logging is configured from the config file only if it contains a **`log`** section in [dictConfig](https://docs.python.org/3/library/logging.config.html#logging.config.dictConfig) format. If there is no `log` section, the manager writes a warning to the log and Python's default level (WARNING) stays in effect, so INFO/DEBUG messages may not be shown.
**How to check that the logging configuration was applied:**
- Make sure the path to the config file is correct and the file is loaded at startup (there is no config read error in the logs).
- Make sure the config has a `log` key with `version: 1`, `handlers`, and `loggers` (example: `tests/config.yaml`).
- After startup, an INFO-level message should appear in the log: `"Logging configuration applied"` (from `config_manager.v2.core.log_manager`). If it is missing, either the `log` section is absent (there will be a warning) or the root/package level is above INFO.
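As a rough illustration of the checklist above, a `log` section with the three required keys can be applied like this. The dictionary mirrors what a parsed config file might contain; the handler name `console` and the helper `apply_log_section` are assumptions for the sketch, not names taken from the package.

```python
import logging
import logging.config

# What a parsed config file with a `log` section might look like (illustrative values).
EXAMPLE_CONFIG = {
    "log": {
        "version": 1,
        "disable_existing_loggers": False,
        "handlers": {
            "console": {"class": "logging.StreamHandler", "level": "INFO"},
        },
        "loggers": {
            "config_manager": {"handlers": ["console"], "level": "INFO"},
        },
    }
}


def apply_log_section(config: dict) -> bool:
    """Apply the `log` section via dictConfig; return False when it is missing."""
    section = config.get("log")
    if not section:
        logging.getLogger(__name__).warning("No 'log' section; logging defaults stay in place")
        return False
    logging.config.dictConfig(section)
    return True


if __name__ == "__main__":
    apply_log_section(EXAMPLE_CONFIG)
    logging.getLogger("config_manager").info("Logging configuration applied")
```

Without the `loggers` entry at INFO, the final message would be filtered by the default WARNING level, which is the symptom the checklist describes.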
## Installation
``pip install git+https://git.lesha.spb.ru/alex/config_manager.git`` ``pip install git+https://git.lesha.spb.ru/alex/config_manager.git``
## Contacts ## Contacts
- **e-mail**: lesha.spb@gmail.com - **e-mail**: lesha.spb@gmail.com
- **telegram**: https://t.me/lesha_spb - **telegram**: https://t.me/lesha_spb


@@ -4,8 +4,8 @@ build-backend = "setuptools.build_meta"
[project] [project]
name = "config_manager" name = "config_manager"
version = "1.2.2" version = "2.2.0"
description = "Config manager for building applications" description = "Major refactoring"
authors = [ authors = [
{ name = "Aleksei Zosimov", email = "lesha.spb@gmail.com" } { name = "Aleksei Zosimov", email = "lesha.spb@gmail.com" }
] ]
@@ -13,6 +13,8 @@ readme = "README.md"
requires-python = ">=3.8" requires-python = ">=3.8"
dependencies = [ dependencies = [
"PyYAML>=6.0", "PyYAML>=6.0",
"fastapi>=0.100.0",
"uvicorn[standard]>=0.22.0",
] ]
[project.urls] [project.urls]


@@ -1,3 +1,2 @@
from .v2.manager import ConfigManagerV2 as ConfigManager from .v2 import ConfigManagerV2 as ConfigManager
from .v1.cfg_manager import ConfigManager as LegacyConfigManager from .v2.core.log_manager import LogManager
from .v1.log_manager import LogManager


@@ -1,137 +0,0 @@
import logging
import logging.config
import asyncio
import json
import yaml
import os
from typing import Any, Optional
from .log_manager import LogManager
class ConfigManager:
    DEFAULT_UPDATE_INTERVAL = 5.0
    DEFAULT_WORK_INTERVAL = 2.0

    def __init__(self, path: str, log_manager: Optional[LogManager] = None):
        self.path = path
        self.config: Any = None
        self._last_hash = None
        self.update_interval = self.DEFAULT_UPDATE_INTERVAL
        self.work_interval = self.DEFAULT_WORK_INTERVAL
        self._halt = asyncio.Event()
        self._task: Optional[asyncio.Task] = None
        self._loop: Optional[asyncio.AbstractEventLoop] = None
        self._log_manager = log_manager or LogManager()
        self.logger = logging.getLogger(__name__)

    def _read_file_sync(self) -> str:
        with open(self.path, "r", encoding="utf-8") as f:
            return f.read()

    async def _read_file_async(self) -> str:
        return await asyncio.to_thread(self._read_file_sync)

    def _parse_config(self, data) -> Any:
        extension = os.path.splitext(self.path)[1].lower()
        if extension in (".yaml", ".yml"):
            return yaml.safe_load(data)
        else:
            return json.loads(data)

    def _update_intervals_from_config(self) -> None:
        if not self.config:
            return
        upd = self.config.get("update_interval")
        wrk = self.config.get("work_interval")
        if isinstance(upd, (int, float)) and upd > 0:
            self.update_interval = float(upd)
            self.logger.info(f"Update interval set to {self.update_interval} seconds")
        else:
            self.update_interval = self.DEFAULT_UPDATE_INTERVAL
        if isinstance(wrk, (int, float)) and wrk > 0:
            self.work_interval = float(wrk)
            self.logger.info(f"Work interval set to {self.work_interval} seconds")
        else:
            self.work_interval = self.DEFAULT_WORK_INTERVAL

    async def _update_config(self) -> None:
        try:
            data = await self._read_file_async()
            current_hash = hash(data)
            if current_hash != self._last_hash:
                new_config = self._parse_config(data)
                self.config = new_config
                self._last_hash = current_hash
                self._log_manager.apply_config(new_config)
                self._update_intervals_from_config()
        except Exception as e:
            self.logger.error(f"Error reading/parsing config file: {e}")

    def execute(self) -> None:
        """
        Method to override in subclasses.
        Blocking work is allowed here.
        Runs in a separate thread.
        """
        pass

    async def _worker_loop(self) -> None:
        while not self._halt.is_set():
            await asyncio.to_thread(self.execute)
            await asyncio.sleep(self.work_interval)

    async def _periodic_update_loop(self) -> None:
        while not self._halt.is_set():
            await self._update_config()
            await asyncio.sleep(self.update_interval)

    async def _run(self) -> None:
        """Internal coroutine that runs all loops"""
        self._halt.clear()
        self.logger.info("ConfigManager started")
        try:
            await asyncio.gather(
                self._worker_loop(),
                self._periodic_update_loop()
            )
        except asyncio.CancelledError:
            self.logger.info("ConfigManager tasks cancelled")
        finally:
            self.logger.info("ConfigManager stopped")

    async def start(self) -> None:
        if self._task is not None and not self._task.done():
            self.logger.warning("ConfigManager is already running")
            return
        try:
            self._loop = asyncio.get_running_loop()
        except RuntimeError:
            self.logger.error("start() must be called from within an async context")
            raise
        self.logger.info("ConfigManager starting and awaiting _run()")
        await self._run()

    async def stop(self) -> None:
        """Stops the config manager and waits for completion"""
        if self._task is None:
            self.logger.warning("ConfigManager is not running")
            return
        self.logger.info("ConfigManager stopping...")
        self._halt.set()
        try:
            await self._task
        except asyncio.CancelledError:
            pass
        self._task = None
        self.logger.info("ConfigManager stopped successfully")


@@ -1,40 +0,0 @@
import logging
from typing import Optional
class LogManager:
    """
    Manages the application's logging configuration.
    Applies configuration from a dict with error handling.
    """

    def __init__(self):
        self.logger = logging.getLogger(__name__)
        self._last_valid_config: Optional[dict] = None

    def apply_config(self, config: dict) -> None:
        """
        Applies the logging configuration from a dict.
        On error, restores the last valid config.
        Args:
            config: Dict with logging settings (from the config file)
        """
        logging_config = config.get("log")
        if not logging_config:
            return
        try:
            logging.config.dictConfig(logging_config)
            self._last_valid_config = logging_config
            self.logger.info("Logging configuration applied")
        except Exception as e:
            self.logger.error(f"Error applying logging config: {e}")
            # If there was a previous valid config, restore it
            if self._last_valid_config:
                try:
                    logging.config.dictConfig(self._last_valid_config)
                    self.logger.info("Previous logging configuration restored")
                except Exception as restore_error:
                    self.logger.error(f"Error restoring previous config: {restore_error}")


@@ -1,4 +1,6 @@
from .manager import ConfigManagerV2 """Public API: the entry point to the config manager.
from .types import HealthServerSettings
__all__ = ["ConfigManagerV2", "HealthServerSettings"] Contract: inherit from ConfigManagerV2, override execute(), control through the API (config.yaml, management section)."""
from .core import ConfigManagerV2
__all__ = ["ConfigManagerV2"]


@@ -1,52 +0,0 @@
from __future__ import annotations
import asyncio
import hashlib
import json
import os
from typing import Any, Optional
import yaml
class ConfigLoader:
    def __init__(self, path: str):
        """Initialize loader state for a specific config file path."""
        self.path = path
        self.config: Any = None
        self.last_valid_config: Any = None
        self._last_seen_hash: Optional[str] = None

    def _read_file_sync(self) -> str:
        """Read raw config text from disk synchronously."""
        with open(self.path, "r", encoding="utf-8") as fh:
            return fh.read()

    async def read_file_async(self) -> str:
        """Read raw config text from disk in a worker thread."""
        return await asyncio.to_thread(self._read_file_sync)

    def parse_config(self, data: str) -> Any:
        """Parse config text as YAML or JSON based on file extension."""
        extension = os.path.splitext(self.path)[1].lower()
        if extension in (".yaml", ".yml"):
            return yaml.safe_load(data)
        return json.loads(data)

    @staticmethod
    def _calculate_hash(data: str) -> str:
        """Calculate a stable content hash for change detection."""
        return hashlib.sha256(data.encode("utf-8")).hexdigest()

    async def load_if_changed(self) -> tuple[bool, Any]:
        """Load and parse config only when file content changed."""
        raw_data = await self.read_file_async()
        current_hash = self._calculate_hash(raw_data)
        if current_hash == self._last_seen_hash:
            return False, self.config
        self._last_seen_hash = current_hash
        parsed = self.parse_config(raw_data)
        self.config = parsed
        self.last_valid_config = parsed
        return True, parsed


@@ -1,4 +1,8 @@
"""External control channels: the abstraction and its implementations (HTTP, Telegram).
Lets you start, stop, and query the status of the manager through an HTTP API and bots."""
from .base import ControlChannel from .base import ControlChannel
from .http_channel import HttpControlChannel
from .telegram import TelegramControlChannel from .telegram import TelegramControlChannel
__all__ = ["ControlChannel", "TelegramControlChannel"] __all__ = ["ControlChannel", "HttpControlChannel", "TelegramControlChannel"]


@@ -1,3 +1,6 @@
"""Base abstract control channel and command handler types.
Defines the contract: starting/stopping the channel and binding the start/stop/status handlers."""
from __future__ import annotations from __future__ import annotations
from abc import ABC, abstractmethod from abc import ABC, abstractmethod
@@ -12,10 +15,10 @@ StatusHandler = Callable[[], Awaitable[str]]
class ControlChannel(ABC): class ControlChannel(ABC):
@abstractmethod @abstractmethod
async def start(self, on_start: StartHandler, on_stop: StopHandler, on_status: StatusHandler) -> None: async def start(self, on_start: StartHandler, on_stop: StopHandler, on_status: StatusHandler) -> None:
"""Start channel and bind command handlers.""" """Start the channel and bind the command handlers."""
raise NotImplementedError raise NotImplementedError
@abstractmethod @abstractmethod
async def stop(self) -> None: async def stop(self) -> None:
"""Stop channel and release its resources.""" """Stop the channel and release its resources."""
raise NotImplementedError raise NotImplementedError


@@ -0,0 +1,296 @@
"""HTTP control channel: /health, /actions/start, /actions/stop (a ControlChannel implementation)."""
from __future__ import annotations
import asyncio
import json
import logging
import time
from collections.abc import Awaitable, Callable
from typing import Any, Optional
from fastapi import FastAPI
from fastapi import Request
from fastapi.responses import JSONResponse
from uvicorn import Config, Server
from ..core.types import HealthPayload
from .base import ControlChannel, StartHandler, StatusHandler, StopHandler
PATH_HEALTH = "/health"
PATH_ACTION_START = "/actions/start"
PATH_ACTION_STOP = "/actions/stop"
PATH_ACTION_STATUS = "/actions/status"
logger = logging.getLogger(__name__)
class UvicornServerRunner:
    """Lifecycle wrapper around uvicorn Server."""

    def __init__(self, host: str, port: int, timeout: int):
        self._host = host
        self._port = port
        self._timeout = timeout
        self._server: Optional[Server] = None
        self._serve_task: Optional[asyncio.Task[None]] = None
        self._bound_port: Optional[int] = None
        logger.debug(
            "UvicornServerRunner.__init__ result: host=%s port=%s timeout=%s",
            self._host,
            self._port,
            self._timeout,
        )

    async def _raise_if_start_task_failed(self) -> None:
        if self._serve_task is None or not self._serve_task.done():
            return
        try:
            await self._serve_task
        except SystemExit as exc:
            raise RuntimeError(f"Management server exited during startup with code {exc.code}") from exc
        except Exception as exc:  # noqa: BLE001
            raise RuntimeError("Management server failed during startup") from exc
        raise RuntimeError("Management server stopped unexpectedly during startup")

    async def _wait_until_started(self) -> None:
        if self._server is None:
            raise RuntimeError("Management server is not initialized")
        loop = asyncio.get_running_loop()
        deadline = loop.time() + max(float(self._timeout), 1.0)
        while not self._server.started:
            await self._raise_if_start_task_failed()
            if loop.time() >= deadline:
                raise TimeoutError("Management server startup timed out")
            await asyncio.sleep(0.05)

    def _resolve_bound_port(self) -> int:
        if self._server is None:
            return self._port
        servers = getattr(self._server, "servers", None)
        if not servers:
            return self._port
        sockets = getattr(servers[0], "sockets", None)
        if not sockets:
            return self._port
        sockname = sockets[0].getsockname()
        if isinstance(sockname, tuple) and len(sockname) > 1:
            return int(sockname[1])
        return self._port

    async def _cleanup_start_failure(self) -> None:
        if self._server is not None:
            self._server.should_exit = True
        if self._serve_task is not None:
            try:
                await self._serve_task
            except BaseException:  # noqa: BLE001
                logger.exception("UvicornServerRunner._cleanup_start_failure error")
        self._server = None
        self._serve_task = None
        self._bound_port = None
        logger.debug("UvicornServerRunner._cleanup_start_failure result: state reset")

    async def start(self, app: FastAPI) -> None:
        if self._serve_task is not None and not self._serve_task.done():
            logger.debug("UvicornServerRunner.start result: already running")
            return
        if self._serve_task is not None and self._serve_task.done():
            self._serve_task = None
        try:
            config = Config(app=app, host=self._host, port=self._port, log_level="warning")
            self._server = Server(config)
            self._serve_task = asyncio.create_task(self._server.serve(), name="http-control-channel-serve")
            await self._wait_until_started()
            self._bound_port = self._resolve_bound_port()
            logger.debug(
                "UvicornServerRunner.start result: running host=%s requested_port=%s bound_port=%s",
                self._host,
                self._port,
                self._bound_port,
            )
        except Exception:
            logger.exception("UvicornServerRunner.start error")
            await self._cleanup_start_failure()
            raise

    async def stop(self) -> None:
        if self._server is None or self._serve_task is None:
            logger.debug("UvicornServerRunner.stop result: already stopped")
            return
        self._server.should_exit = True
        try:
            await self._serve_task
        except BaseException:  # noqa: BLE001
            logger.exception("UvicornServerRunner.stop error")
            raise
        finally:
            self._server = None
            self._serve_task = None
            self._bound_port = None
            logger.debug("UvicornServerRunner.stop result: stopped")

    @property
    def port(self) -> int:
        result = self._bound_port if self._bound_port is not None else self._port
        logger.debug("UvicornServerRunner.port result: %s", result)
        return result
class HttpControlChannel(ControlChannel):
    """HTTP API as a control channel: /health, /actions/start, /actions/stop, /actions/status."""

    def __init__(
        self,
        host: str,
        port: int,
        timeout: int,
        health_provider: Callable[[], Awaitable[HealthPayload]],
    ):
        self._host = host
        self._port = port
        self._timeout = timeout
        self._health_provider = health_provider
        self._on_start: Optional[StartHandler] = None
        self._on_stop: Optional[StopHandler] = None
        self._on_status: Optional[StatusHandler] = None
        self._runner = UvicornServerRunner(host=host, port=port, timeout=timeout)
        self._app: Optional[FastAPI] = None
        logger.debug(
            "HttpControlChannel.__init__ result: host=%s port=%s timeout=%s",
            host,
            port,
            timeout,
        )

    def _create_app(self) -> FastAPI:
        app = FastAPI(title="Config Manager Management API")

        @app.middleware("http")
        async def log_api_call(request: Request, call_next):  # type: ignore[no-untyped-def]
            started = time.monotonic()
            response = await call_next(request)
            duration_ms = int((time.monotonic() - started) * 1000)
            logger.info(
                "API call: method=%s path=%s status=%s duration_ms=%s",
                request.method,
                request.url.path,
                response.status_code,
                duration_ms,
            )
            return response

        @app.get(PATH_HEALTH)
        async def health() -> JSONResponse:
            return await self._health_response()

        @app.get(PATH_ACTION_START)
        @app.post(PATH_ACTION_START)
        async def action_start() -> JSONResponse:
            return await self._action_response("start", self._on_start)

        @app.get(PATH_ACTION_STOP)
        @app.post(PATH_ACTION_STOP)
        async def action_stop() -> JSONResponse:
            return await self._action_response("stop", self._on_stop)

        @app.get(PATH_ACTION_STATUS)
        @app.post(PATH_ACTION_STATUS)
        async def action_status() -> JSONResponse:
            return await self._action_response("status", self._on_status)

        logger.debug(
            "HttpControlChannel._create_app result: routes=%s,%s,%s,%s",
            PATH_HEALTH,
            PATH_ACTION_START,
            PATH_ACTION_STOP,
            PATH_ACTION_STATUS,
        )
        return app

    async def _health_response(self) -> JSONResponse:
        try:
            payload = await asyncio.wait_for(self._health_provider(), timeout=self._timeout)
            status_code = 200 if payload.get("status", "unhealthy") == "ok" else 503
            logger.debug(
                "HttpControlChannel._health_response result: status_code=%s payload=%s",
                status_code,
                payload,
            )
            return JSONResponse(content=payload, status_code=status_code)
        except Exception as exc:  # noqa: BLE001
            logger.exception("HttpControlChannel._health_response error")
            return JSONResponse(content={"status": "unhealthy", "detail": str(exc)}, status_code=503)

    async def _action_response(
        self,
        action: str,
        callback: Optional[Callable[[], Awaitable[str]]],
    ) -> JSONResponse:
        if callback is None:
            logger.debug(
                "HttpControlChannel._action_response result: action=%s status_code=404 detail=handler not configured",
                action,
            )
            return JSONResponse(
                content={"status": "error", "detail": f"{action} handler is not configured"},
                status_code=404,
            )
        try:
            detail = await asyncio.wait_for(callback(), timeout=float(self._timeout))
            if not detail:
                detail = f"{action} action accepted"
            logger.debug(
                "HttpControlChannel._action_response result: action=%s status_code=200 detail=%s",
                action,
                detail,
            )
            return JSONResponse(content={"status": "ok", "detail": detail}, status_code=200)
        except asyncio.TimeoutError:
            logger.warning("HttpControlChannel._action_response timeout: action=%s timeout=%s", action, self._timeout)
            return JSONResponse(
                content={"status": "error", "detail": f"{action} handler did not respond within {self._timeout}s"},
                status_code=504,
            )
        except Exception as exc:  # noqa: BLE001
            logger.exception("HttpControlChannel._action_response error: action=%s", action)
            return JSONResponse(content={"status": "error", "detail": str(exc)}, status_code=500)

    async def start(
        self,
        on_start: StartHandler,
        on_stop: StopHandler,
        on_status: StatusHandler,
    ) -> None:
        self._on_start = on_start
        self._on_stop = on_stop
        self._on_status = on_status
        self._app = self._create_app()
        try:
            await self._runner.start(self._app)
            logger.debug("HttpControlChannel.start result: started port=%s", self._runner.port)
        except Exception:  # noqa: BLE001
            logger.exception("HttpControlChannel.start error")
            raise

    async def stop(self) -> None:
        try:
            await self._runner.stop()
            logger.debug("HttpControlChannel.stop result: stopped")
        except BaseException:  # noqa: BLE001
            logger.exception("HttpControlChannel.stop error")
            raise

    @property
    def port(self) -> int:
        return self._runner.port

    def _build_health_response(self) -> Awaitable[tuple[int, Any]]:
        """For tests: return (status_code, payload) without starting the server."""

        async def _run() -> tuple[int, Any]:
            response = await self._health_response()
            body: Any = response.body
            if isinstance(body, bytes):
                body = json.loads(body.decode("utf-8"))
            return response.status_code, body

        return _run()


@@ -1,3 +1,6 @@
"""Control channel implementation based on the Telegram Bot API (long polling).
Accepts the /start, /stop, /status commands in the specified chat and calls the bound handlers."""
from __future__ import annotations from __future__ import annotations
import asyncio import asyncio
@@ -15,10 +18,10 @@ class TelegramControlChannel(ControlChannel):
self, self,
token: str, token: str,
chat_id: int, chat_id: int,
poll_interval: float = 2.0, poll_interval: int = 2,
logger: Optional[logging.Logger] = None, logger: Optional[logging.Logger] = None,
): ):
"""Initialize Telegram polling channel with bot and chat settings.""" """Initialize the Telegram polling channel with the bot and chat settings."""
self._token = token self._token = token
self._chat_id = chat_id self._chat_id = chat_id
self._poll_interval = poll_interval self._poll_interval = poll_interval
@@ -31,7 +34,7 @@ class TelegramControlChannel(ControlChannel):
self._logger = logger or logging.getLogger(__name__) self._logger = logger or logging.getLogger(__name__)
async def start(self, on_start: StartHandler, on_stop: StopHandler, on_status: StatusHandler) -> None: async def start(self, on_start: StartHandler, on_stop: StopHandler, on_status: StatusHandler) -> None:
"""Start polling Telegram updates and register command callbacks.""" """Start polling Telegram updates and register the command callbacks."""
if self._task is not None and not self._task.done(): if self._task is not None and not self._task.done():
return return
self._on_start = on_start self._on_start = on_start
@@ -41,7 +44,7 @@ class TelegramControlChannel(ControlChannel):
self._task = asyncio.create_task(self._poll_loop()) self._task = asyncio.create_task(self._poll_loop())
async def stop(self) -> None: async def stop(self) -> None:
"""Stop polling loop and wait until task termination.""" """Stop the polling loop and wait for the task to finish."""
self._stop_event.set() self._stop_event.set()
if self._task is not None: if self._task is not None:
self._task.cancel() self._task.cancel()
@@ -52,7 +55,7 @@ class TelegramControlChannel(ControlChannel):
self._task = None self._task = None
async def _poll_loop(self) -> None: async def _poll_loop(self) -> None:
"""Continuously fetch updates and dispatch supported commands.""" """Continuously fetch updates and invoke the supported commands."""
while not self._stop_event.is_set(): while not self._stop_event.is_set():
try: try:
updates = await asyncio.to_thread(self._fetch_updates) updates = await asyncio.to_thread(self._fetch_updates)
@@ -67,7 +70,7 @@ class TelegramControlChannel(ControlChannel):
continue continue
def _fetch_updates(self) -> list[dict]: def _fetch_updates(self) -> list[dict]:
"""Pull new Telegram updates using the latest offset.""" """Request new Telegram updates, taking the latest offset into account."""
params = {"timeout": 0} params = {"timeout": 0}
if self._offset is not None: if self._offset is not None:
params["offset"] = self._offset params["offset"] = self._offset
@@ -82,7 +85,7 @@ class TelegramControlChannel(ControlChannel):
return result return result
async def _process_update(self, update: dict) -> None: async def _process_update(self, update: dict) -> None:
"""Handle one Telegram update and execute mapped command.""" """Process one Telegram update and execute the corresponding command."""
message = update.get("message") or {} message = update.get("message") or {}
text = (message.get("text") or "").strip().lower() text = (message.get("text") or "").strip().lower()
chat = message.get("chat") or {} chat = message.get("chat") or {}
@@ -103,7 +106,7 @@ class TelegramControlChannel(ControlChannel):
await asyncio.to_thread(self._send_message, reply) await asyncio.to_thread(self._send_message, reply)
def _send_message(self, text: str) -> None: def _send_message(self, text: str) -> None:
"""Send plain-text reply to the configured Telegram chat.""" """Send a text reply to the configured Telegram chat."""
encoded = urllib.parse.urlencode({"chat_id": self._chat_id, "text": text}) encoded = urllib.parse.urlencode({"chat_id": self._chat_id, "text": text})
url = f"https://api.telegram.org/bot{self._token}/sendMessage" url = f"https://api.telegram.org/bot{self._token}/sendMessage"
req = urllib.request.Request(url, data=encoded.encode("utf-8"), method="POST") req = urllib.request.Request(url, data=encoded.encode("utf-8"), method="POST")


@@ -0,0 +1,11 @@
"""Core V2: manager lifecycle, config loading, and the worker loop.
Contains ConfigManagerV2, the configuration loader, and the scheduler for repeated execute() calls."""
from .config_loader import ConfigLoader
from .control_bridge import ControlChannelBridge
from .health_aggregator import HealthAggregator
from .log_manager import LogManager
from .manager import ConfigManagerV2
from .scheduler import WorkerLoop
__all__ = ["ConfigLoader", "ConfigManagerV2", "ControlChannelBridge", "HealthAggregator", "LogManager", "WorkerLoop"]


@@ -0,0 +1,106 @@
"""Configuration loader for YAML/JSON files with hash-based change detection.
Reads the file synchronously or asynchronously, parses it by extension, and keeps the last valid config so callers can fall back to it on errors."""
from __future__ import annotations
import asyncio
import hashlib
import json
import logging
import os
from typing import Any, Optional
import yaml
logger = logging.getLogger(__name__)
class ConfigLoader:
def __init__(self, path: str):
"""Initialize loader state for the given config file path."""
self.path = path
self.config: Any = None
self.last_valid_config: Any = None
self._last_seen_hash: Optional[str] = None
logger.debug("ConfigLoader.__init__ result: path=%s", self.path)
def _read_file_sync(self) -> str:
"""Synchronously read the raw config text from disk."""
with open(self.path, "r", encoding="utf-8") as fh:
data = fh.read()
logger.debug("ConfigLoader._read_file_sync result: bytes=%s", len(data))
return data
async def read_file_async(self) -> str:
"""Read the raw config text from disk in a worker thread."""
result = await asyncio.to_thread(self._read_file_sync)
logger.debug("ConfigLoader.read_file_async result: bytes=%s", len(result))
return result
def parse_config(self, data: str) -> Any:
"""Parse the config text as YAML or JSON, depending on the file extension."""
extension = os.path.splitext(self.path)[1].lower()
try:
if extension in (".yaml", ".yml"):
result = yaml.safe_load(data)
else:
result = json.loads(data)
except Exception: # noqa: BLE001
logger.exception("ConfigLoader.parse_config error: extension=%s", extension)
raise
logger.debug(
"ConfigLoader.parse_config result: extension=%s type=%s",
extension,
type(result).__name__,
)
return result
@staticmethod
def _calculate_hash(data: str) -> str:
"""Compute a stable content hash for change detection."""
result = hashlib.sha256(data.encode("utf-8")).hexdigest()
logger.debug("ConfigLoader._calculate_hash result: hash=%s", result)
return result
def load_sync(self) -> Any:
"""Synchronously load and parse the config once (for reading settings during initialization)."""
try:
raw_data = self._read_file_sync()
parsed = self.parse_config(raw_data)
self.config = parsed
self.last_valid_config = parsed
logger.debug("ConfigLoader.load_sync result: loaded")
return parsed
except Exception: # noqa: BLE001
logger.exception("ConfigLoader.load_sync error")
return None
async def load_if_changed(self) -> tuple[bool, Any]:
"""Load and parse the config only if the file content has changed."""
raw_data = await self.read_file_async()
current_hash = self._calculate_hash(raw_data)
if current_hash == self._last_seen_hash:
logger.debug("ConfigLoader.load_if_changed result: changed=False")
return False, self.config
self._last_seen_hash = current_hash
parsed = self.parse_config(raw_data)
self.config = parsed
self.last_valid_config = parsed
logger.debug("ConfigLoader.load_if_changed result: changed=True")
return True, parsed
def extract_scheduler_intervals(
config: Any,
default_update: float,
default_work: float,
) -> tuple[float, float]:
"""Extract update_interval and work_interval from the config (dict). Falls back to the defaults when a value is missing, non-numeric, or not positive."""
if not isinstance(config, dict):
return default_update, default_work
upd = config.get("update_interval")
wrk = config.get("work_interval")
u = float(upd) if isinstance(upd, (int, float)) and upd > 0 else default_update
w = float(wrk) if isinstance(wrk, (int, float)) and wrk > 0 else default_work
return u, w
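
The change-detection step in `load_if_changed` can be illustrated in isolation. The following is a minimal sketch, not the package's code: `HashWatcher` and `demo` are hypothetical names, and the sketch parses JSON only so that it runs on the standard library alone.

```python
import hashlib
import json
import os
import tempfile


class HashWatcher:
    """Hypothetical helper: re-parse a JSON file only when its SHA-256 changes."""

    def __init__(self, path: str) -> None:
        self.path = path
        self.config = None
        self._last_seen_hash = None

    def load_if_changed(self):
        with open(self.path, "r", encoding="utf-8") as fh:
            raw = fh.read()
        digest = hashlib.sha256(raw.encode("utf-8")).hexdigest()
        if digest == self._last_seen_hash:
            # Same content as last time: skip parsing and return the cached config.
            return False, self.config
        # As in the loader above, the hash is recorded before parsing,
        # so a broken file is not re-parsed on every poll.
        self._last_seen_hash = digest
        self.config = json.loads(raw)
        return True, self.config


def demo():
    """Write a config, poll it twice, change it, and poll again."""
    with tempfile.TemporaryDirectory() as tmp:
        path = os.path.join(tmp, "config.json")
        with open(path, "w", encoding="utf-8") as fh:
            json.dump({"work_interval": 2}, fh)
        watcher = HashWatcher(path)
        first = watcher.load_if_changed()
        second = watcher.load_if_changed()
        with open(path, "w", encoding="utf-8") as fh:
            json.dump({"work_interval": 3}, fh)
        third = watcher.load_if_changed()
    return first, second, third
```

Hashing the content rather than checking the modification time means that rewriting the file with identical content does not trigger a reload.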


@@ -0,0 +1,54 @@
"""Adapter between control channels and the manager lifecycle.
Provides start/stop/status handlers for ControlChannel (halt, state, status text)."""
from __future__ import annotations
import logging
from collections.abc import Awaitable, Callable
from .types import LifecycleState
logger = logging.getLogger(__name__)
class ControlChannelBridge:
"""Exposes halt and status as start/stop/status handlers for a ControlChannel (for example, Telegram)."""
def __init__(
self,
get_state: Callable[[], LifecycleState],
get_status: Callable[[], Awaitable[str]],
start_runtime: Callable[[], Awaitable[str]],
stop_runtime: Callable[[], Awaitable[str]],
):
self._get_state = get_state
self._get_status = get_status
self._start_runtime = start_runtime
self._stop_runtime = stop_runtime
logger.debug("ControlChannelBridge.__init__ result: callbacks configured")
async def on_start(self) -> str:
"""Handle an external start request through the manager's lifecycle method."""
if self._get_state() == LifecycleState.RUNNING:
result = "already running"
logger.debug("ControlChannelBridge.on_start result: %s", result)
return result
result = await self._start_runtime()
logger.debug("ControlChannelBridge.on_start result: %s", result)
return result
async def on_stop(self) -> str:
"""Handle an external stop request through the manager's lifecycle method."""
result = await self._stop_runtime()
logger.debug("ControlChannelBridge.on_stop result: %s", result)
return result
async def on_status(self) -> str:
"""Return the current status text."""
try:
result = await self._get_status()
except Exception: # noqa: BLE001
logger.exception("ControlChannelBridge.on_status error")
raise
logger.debug("ControlChannelBridge.on_status result: %s", result)
return result
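
To show how the bridge handlers behave when a channel calls them, here is a small self-contained sketch. `FakeRuntime` and `demo` are hypothetical stand-ins for the manager callbacks, and the enum is a trimmed copy of `LifecycleState`; only the "already running" guard mirrors `ControlChannelBridge.on_start`.

```python
import asyncio
from enum import Enum


class LifecycleState(str, Enum):
    # Trimmed copy of the enum from types.py, just enough for the sketch.
    RUNNING = "running"
    STOPPED = "stopped"


class FakeRuntime:
    """Hypothetical stand-in for the manager's start/stop/status callbacks."""

    def __init__(self) -> None:
        self.state = LifecycleState.STOPPED

    async def start(self) -> str:
        self.state = LifecycleState.RUNNING
        return "start signal accepted"

    async def stop(self) -> str:
        self.state = LifecycleState.STOPPED
        return "stop signal accepted"

    async def status(self) -> str:
        return f"state={self.state.value}"


async def on_start(runtime: FakeRuntime) -> str:
    # Same guard as ControlChannelBridge.on_start: do not start twice.
    if runtime.state == LifecycleState.RUNNING:
        return "already running"
    return await runtime.start()


async def demo() -> list[str]:
    """Send start twice, then status, then stop, and collect the replies."""
    runtime = FakeRuntime()
    replies = [await on_start(runtime), await on_start(runtime)]
    replies.append(await runtime.status())
    replies.append(await runtime.stop())
    return replies
```

Because every handler returns a plain string, a channel can forward the reply to its user without knowing anything about the manager.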


@@ -0,0 +1,71 @@
"""Combines lifecycle state and health into a single /health response.
Healthy means a successful execute() within the last health_timeout seconds; otherwise unhealthy with a detail (error or timeout)."""
from __future__ import annotations
import logging
import time
from collections.abc import Callable
from .types import HealthPayload, LifecycleState
logger = logging.getLogger(__name__)
class HealthAggregator:
"""Builds the health response from the time of the last successful execute() and the timeout."""
def __init__(
self,
get_state: Callable[[], LifecycleState],
get_last_error: Callable[[], str | None],
get_last_success_timestamp: Callable[[], float | None],
health_timeout: int,
get_app_health: Callable[[], HealthPayload],
):
self._get_state = get_state
self._get_last_error = get_last_error
self._get_last_success_timestamp = get_last_success_timestamp
self._health_timeout = health_timeout
self._get_app_health = get_app_health
logger.debug("HealthAggregator.__init__ result: health_timeout=%s", self._health_timeout)
async def collect(self) -> HealthPayload:
"""Return the app's health (ok or degraded) if execute() succeeded within the last health_timeout seconds; otherwise unhealthy. The lifecycle state is always included."""
state = self._get_state()
state_value = state.value
# status=ok is only possible when state=RUNNING; any other state (for example STOPPING/STOPPED) is reported as unhealthy immediately.
if state != LifecycleState.RUNNING:
result = {"status": "unhealthy", "detail": f"state={state_value}", "state": state_value}
logger.debug("HealthAggregator.collect result: %s", result)
return result
last_success = self._get_last_success_timestamp()
now = time.monotonic()
if last_success is None:
detail = self._get_last_error() or "no successful run yet"
result = {"status": "unhealthy", "detail": detail, "state": state_value}
logger.debug("HealthAggregator.collect result: %s", result)
return result
if (now - last_success) > self._health_timeout:
detail = self._get_last_error() or f"no successful run within {self._health_timeout}s"
result = {"status": "unhealthy", "detail": detail, "state": state_value}
logger.debug("HealthAggregator.collect result: %s", result)
return result
result = self._get_app_health()
status = result.get("status", "unhealthy")
if status == "degraded":
degraded = {"status": "degraded", "detail": result.get("detail", "app degraded"), "state": state_value}
logger.debug("HealthAggregator.collect result: %s", degraded)
return degraded
if status != "ok":
unhealthy = {"status": "unhealthy", "detail": result.get("detail", "app reported non-ok"), "state": state_value}
logger.debug("HealthAggregator.collect result: %s", unhealthy)
return unhealthy
healthy = {**result, "state": state_value}
logger.debug("HealthAggregator.collect result: %s", healthy)
return healthy
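
The order of checks in `collect()` is easier to follow as a pure function. This is a simplified sketch under assumptions: `classify_health` is a hypothetical name, the current time is passed in explicitly so the result is deterministic, and the detail strings for the non-running and app-reported cases are shortened.

```python
from typing import Optional


def classify_health(
    running: bool,
    last_success: Optional[float],
    last_error: Optional[str],
    health_timeout: float,
    now: float,
    app_status: str = "ok",
) -> tuple[str, str]:
    """Return (status, detail) following the order of checks in collect()."""
    # 1. Anything other than a running manager is unhealthy.
    if not running:
        return "unhealthy", "not running"
    # 2. No successful execute() yet: report the last error if there is one.
    if last_success is None:
        return "unhealthy", last_error or "no successful run yet"
    # 3. The last success is older than the allowed window.
    if now - last_success > health_timeout:
        return "unhealthy", last_error or f"no successful run within {health_timeout}s"
    # 4. Only now is the application's own status consulted.
    if app_status == "degraded":
        return "degraded", "app degraded"
    if app_status != "ok":
        return "unhealthy", "app reported non-ok"
    return "ok", ""
```

For example, `classify_health(True, 10.0, None, 30, now=50.0)` reports unhealthy because 40 seconds have passed since the last success, which exceeds the 30-second window.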


@@ -0,0 +1,38 @@
"""Applies logging configuration from a dictionary (the `log` section of config.yaml).
Manages the application's logging configuration via dictConfig and restores the last valid config on error."""
from __future__ import annotations
import logging
import logging.config
from typing import Optional
class LogManager:
"""Applies the `log` section of the config to logging (dictConfig). On error, restores the last valid config."""
def __init__(self) -> None:
self.logger = logging.getLogger(__name__)
self._last_valid_config: Optional[dict] = None
def apply_config(self, config: dict) -> None:
"""Apply logging configuration from a dictionary. On error, restore the last valid config."""
logging_config = config.get("log")
if not logging_config:
self.logger.warning(
"Config has no 'log' section; logging parameters from config are not applied (default level may be WARNING)."
)
return
try:
logging.config.dictConfig(logging_config)
self._last_valid_config = logging_config
self.logger.info("Logging configuration applied")
except Exception as e:
self.logger.error("Error applying logging config: %s", e)
if self._last_valid_config:
try:
logging.config.dictConfig(self._last_valid_config)
self.logger.info("Previous logging configuration restored")
except Exception as restore_error:
self.logger.error("Error restoring previous config: %s", restore_error)
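
For reference, a `log` section has to follow the standard `logging.config.dictConfig` schema. The sketch below shows one possible section and the apply-then-restore pattern in a few lines. `LOG_SECTION`, `FallbackLogging`, and the handler and formatter names are illustrative assumptions, not values taken from this repository.

```python
import logging
import logging.config
from typing import Optional

# Assumed example of a `log` section; handler and formatter names are illustrative.
LOG_SECTION = {
    "version": 1,
    "disable_existing_loggers": False,
    "formatters": {"plain": {"format": "%(asctime)s %(levelname)s %(name)s: %(message)s"}},
    "handlers": {"console": {"class": "logging.StreamHandler", "formatter": "plain"}},
    "root": {"level": "INFO", "handlers": ["console"]},
}


class FallbackLogging:
    """Hypothetical helper: apply a dictConfig section, restoring the last good one on failure."""

    def __init__(self) -> None:
        self._last_valid: Optional[dict] = None

    def apply(self, section: dict) -> bool:
        try:
            logging.config.dictConfig(section)
        except Exception:
            # dictConfig rejects the section (for example an unsupported version),
            # so re-apply the last section that worked, if any.
            if self._last_valid is not None:
                logging.config.dictConfig(self._last_valid)
            return False
        self._last_valid = section
        return True
```

Setting `disable_existing_loggers` to `False` keeps module-level loggers created before the reload working, which matters when the section is re-applied on every config change.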


@@ -0,0 +1,432 @@
"""Config manager v2: runtime orchestration and configuration updates."""
from __future__ import annotations
import asyncio
import logging
import os
import time
from collections.abc import Callable, Iterable
from typing import Any, Optional, Union
from ..control.base import ControlChannel
from .config_loader import ConfigLoader
from .control_bridge import ControlChannelBridge
from .health_aggregator import HealthAggregator
from .log_manager import LogManager
from .scheduler import WorkerLoop
from .types import HealthPayload, LifecycleState
logger = logging.getLogger(__name__)
def _read_env_interval(name: str, default_value: float) -> float:
"""Read positive float interval from env."""
raw_value = os.environ.get(name)
if raw_value is None:
return float(default_value)
try:
parsed = float(raw_value)
if parsed <= 0:
raise ValueError(f"{name} must be greater than zero")
return parsed
except Exception: # noqa: BLE001
logger.exception(
"ConfigManagerV2 interval parse error: env=%s raw_value=%s fallback=%s",
name,
raw_value,
default_value,
)
return float(default_value)
def _read_env_optional_float(name: str, default_value: Optional[float]) -> Optional[float]:
"""Read optional non-negative float from env (0 or missing => default_value)."""
raw_value = os.environ.get(name)
if raw_value is None:
return default_value
try:
parsed = float(raw_value)
if parsed < 0:
logger.warning("ConfigManagerV2 %s must be >= 0, got %s; using default %s", name, parsed, default_value)
return default_value
return parsed if parsed > 0 else default_value
except Exception: # noqa: BLE001
logger.exception("ConfigManagerV2 %s parse error: raw_value=%s fallback=%s", name, raw_value, default_value)
return default_value
class _RuntimeController:
"""Runtime loops and lifecycle supervision."""
CONTROL_CHANNEL_TIMEOUT = 5.0
def _on_execute_success(self) -> None:
self._last_success_timestamp = time.monotonic()
self._last_execute_error = None
self.logger.debug(
"ConfigManagerV2._on_execute_success result: last_success_timestamp=%s",
self._last_success_timestamp,
)
def _on_execute_error(self, exc: Exception) -> None:
self._last_execute_error = str(exc)
self.logger.error(
"ConfigManagerV2._on_execute_error: %s",
self._last_execute_error,
exc_info=(type(exc), exc, exc.__traceback__),
)
def _on_worker_degraded_change(self, degraded: bool) -> None:
self._worker_degraded = degraded
self.logger.warning("ConfigManagerV2._on_worker_degraded_change result: degraded=%s", degraded)
def _on_worker_metrics_change(self, inflight_count: int, timed_out_count: int) -> None:
self._worker_inflight_count = inflight_count
self._worker_timed_out_inflight_count = timed_out_count
self.logger.debug(
"ConfigManagerV2._on_worker_metrics_change result: inflight=%s timed_out_inflight=%s",
inflight_count,
timed_out_count,
)
async def _worker_loop(self) -> None:
self.logger.warning(
"ConfigManagerV2._worker_loop result: started work_interval=%s",
self.work_interval,
)
worker = WorkerLoop(
execute=self.execute,
get_interval=lambda: self.work_interval,
halt_event=self._halt,
on_error=self._on_execute_error,
on_success=self._on_execute_success,
execute_timeout=self._execute_timeout,
on_degraded_change=self._on_worker_degraded_change,
on_metrics_change=self._on_worker_metrics_change,
)
try:
await worker.run()
self.logger.debug("ConfigManagerV2._worker_loop result: completed")
finally:
self.logger.warning("ConfigManagerV2._worker_loop result: stopped")
async def _periodic_update_loop(self) -> None:
self.logger.warning(
"ConfigManagerV2._periodic_update_loop result: started update_interval=%s",
self.update_interval,
)
try:
while not self._halt.is_set():
await self._update_config()
try:
await asyncio.wait_for(self._halt.wait(), timeout=max(self.update_interval, 0.05))
except asyncio.TimeoutError:
continue
finally:
self.logger.warning("ConfigManagerV2._periodic_update_loop result: stopped")
async def _status_text(self) -> str:
health = await self._health_aggregator.collect()
detail = health.get("detail")
worker_tail = (
f"worker_inflight={self._worker_inflight_count}; "
f"worker_timed_out_inflight={self._worker_timed_out_inflight_count}"
)
if detail:
status_text = f"state={self._state.value}; health={health['status']}; detail={detail}; {worker_tail}"
self.logger.debug("ConfigManagerV2._status_text result: %s", status_text)
return status_text
status_text = f"state={self._state.value}; health={health['status']}; {worker_tail}"
self.logger.debug("ConfigManagerV2._status_text result: %s", status_text)
return status_text
async def _start_control_channels(self) -> None:
for channel in self._control_channels:
try:
await asyncio.wait_for(
channel.start(
self._control_bridge.on_start,
self._control_bridge.on_stop,
self._control_bridge.on_status,
),
timeout=self.CONTROL_CHANNEL_TIMEOUT,
)
self.logger.debug("ConfigManagerV2._start_control_channels result: started channel=%s", type(channel).__name__)
except asyncio.TimeoutError:
self.logger.error(
"ConfigManagerV2._start_control_channels timeout channel=%s timeout=%ss",
type(channel).__name__,
self.CONTROL_CHANNEL_TIMEOUT,
)
except Exception: # noqa: BLE001
self.logger.exception("ConfigManagerV2._start_control_channels error channel=%s", type(channel).__name__)
async def _stop_control_channels(self) -> None:
for channel in self._control_channels:
try:
await asyncio.wait_for(channel.stop(), timeout=self.CONTROL_CHANNEL_TIMEOUT)
self.logger.debug("ConfigManagerV2._stop_control_channels result: stopped channel=%s", type(channel).__name__)
except asyncio.TimeoutError:
self.logger.error(
"ConfigManagerV2._stop_control_channels timeout channel=%s timeout=%ss",
type(channel).__name__,
self.CONTROL_CHANNEL_TIMEOUT,
)
except Exception: # noqa: BLE001
self.logger.exception("ConfigManagerV2._stop_control_channels error channel=%s", type(channel).__name__)
async def _run_runtime_loops(self) -> None:
self._state = LifecycleState.RUNNING
self.logger.debug("ConfigManagerV2._run_runtime_loops result: state=%s", self._state.value)
tasks = [
asyncio.create_task(self._worker_loop(), name="v2-worker-loop"),
asyncio.create_task(self._periodic_update_loop(), name="v2-config-loop"),
]
try:
await asyncio.gather(*tasks)
self.logger.debug("ConfigManagerV2._run_runtime_loops result: loops completed")
except asyncio.CancelledError:
self.logger.debug("ConfigManagerV2._run_runtime_loops result: cancelled")
raise
except Exception: # noqa: BLE001
self.logger.exception("ConfigManagerV2._run_runtime_loops error")
raise
finally:
for task in tasks:
task.cancel()
await asyncio.gather(*tasks, return_exceptions=True)
self._state = LifecycleState.STOPPED
self.logger.debug("ConfigManagerV2._run_runtime_loops result: state=%s", self._state.value)
async def _start_runtime(self) -> str:
if self._shutdown.is_set():
result = "manager is shutting down"
self.logger.debug("ConfigManagerV2._start_runtime result: %s", result)
return result
async with self._runtime_lock:
if self._runtime_task is not None and not self._runtime_task.done():
result = "already running"
self.logger.debug("ConfigManagerV2._start_runtime result: %s", result)
return result
self._halt.clear()
self._state = LifecycleState.STARTING
self._runtime_task = asyncio.create_task(self._run_runtime_loops(), name="config-manager-v2-runtime")
self._runtime_task.add_done_callback(self._on_runtime_task_done)
result = "start signal accepted"
self.logger.debug("ConfigManagerV2._start_runtime result: %s", result)
return result
async def _stop_runtime(self) -> str:
self._halt.set()
async with self._runtime_lock:
runtime_task = self._runtime_task
if runtime_task is None:
result = "already stopped"
self.logger.debug("ConfigManagerV2._stop_runtime result: %s", result)
return result
if runtime_task.done():
result = "already stopped"
self.logger.debug("ConfigManagerV2._stop_runtime result: %s", result)
return result
result = "stop signal accepted"
self.logger.debug("ConfigManagerV2._stop_runtime result: %s", result)
return result
def _on_runtime_task_done(self, task: asyncio.Task[None]) -> None:
if task.cancelled():
self.logger.debug("ConfigManagerV2._on_runtime_task_done result: cancelled")
return
try:
exc = task.exception()
except Exception: # noqa: BLE001
self.logger.exception("ConfigManagerV2._on_runtime_task_done error while reading task exception")
return
if exc is None:
self.logger.debug("ConfigManagerV2._on_runtime_task_done result: completed")
return
self.logger.error(
"ConfigManagerV2 runtime task failed",
exc_info=(type(exc), exc, exc.__traceback__),
)
async def _run(self) -> None:
self._shutdown.clear()
self._halt.clear()
self._state = LifecycleState.STARTING
self.logger.debug("ConfigManagerV2._run result: state=%s", self._state.value)
await self._update_config()
await self._start_control_channels()
await self._start_runtime()
try:
await self._shutdown.wait()
finally:
self._state = LifecycleState.STOPPING
self.logger.debug("ConfigManagerV2._run result: state=%s", self._state.value)
self._halt.set()
async with self._runtime_lock:
runtime_task = self._runtime_task
if runtime_task is not None:
try:
await runtime_task
except asyncio.CancelledError:
self.logger.debug("ConfigManagerV2._run result: runtime task cancelled")
except Exception: # noqa: BLE001
self.logger.exception("ConfigManagerV2._run error while awaiting runtime task")
await self._stop_control_channels()
self._runtime_task = None
self._state = LifecycleState.STOPPED
self._task = None
self.logger.debug("ConfigManagerV2._run result: state=%s", self._state.value)
class ConfigManagerV2(_RuntimeController):
"""Public manager API. Control channels are supplied from outside via control_channels."""
DEFAULT_UPDATE_INTERVAL = 5
DEFAULT_WORK_INTERVAL = 2
DEFAULT_HEALTH_TIMEOUT = 30
DEFAULT_EXECUTE_TIMEOUT = 600.0
def __init__(
self,
path: str,
control_channels: Optional[
Union[Iterable[ControlChannel], Callable[["ConfigManagerV2"], Iterable[ControlChannel]]]
] = None,
):
self.logger = logging.getLogger(__name__)
self.path = path
self.config: Any = None
self.update_interval = _read_env_interval("UPDATE_INTERVAL", float(self.DEFAULT_UPDATE_INTERVAL))
self.work_interval = _read_env_interval("WORK_INTERVAL", float(self.DEFAULT_WORK_INTERVAL))
self._execute_timeout = _read_env_optional_float("EXECUTE_TIMEOUT", float(self.DEFAULT_EXECUTE_TIMEOUT))
self._loader = ConfigLoader(path)
self._log_manager = LogManager()
self._halt = asyncio.Event()
self._shutdown = asyncio.Event()
self._task: Optional[asyncio.Task[None]] = None
self._runtime_task: Optional[asyncio.Task[None]] = None
self._loop: Optional[asyncio.AbstractEventLoop] = None
self._runtime_lock = asyncio.Lock()
self._state = LifecycleState.IDLE
self._last_execute_error: Optional[str] = None
self._last_success_timestamp: Optional[float] = None
self._worker_degraded = False
self._worker_inflight_count = 0
self._worker_timed_out_inflight_count = 0
initial_config = self._loader.load_sync()
self.config = initial_config
self._health_timeout = self.DEFAULT_HEALTH_TIMEOUT
self._health_aggregator = HealthAggregator(
get_state=lambda: self._state,
get_last_error=lambda: self._last_execute_error,
get_last_success_timestamp=lambda: self._last_success_timestamp,
health_timeout=self._health_timeout,
get_app_health=self.get_health_status,
)
self._control_bridge = ControlChannelBridge(
get_state=lambda: self._state,
get_status=self._status_text,
start_runtime=self._start_runtime,
stop_runtime=self._stop_runtime,
)
if control_channels is None:
self._control_channels = []
elif callable(control_channels):
self._control_channels = list(control_channels(self))
else:
self._control_channels = list(control_channels)
self.logger.debug(
"ConfigManagerV2.__init__ result: path=%s update_interval=%s work_interval=%s execute_timeout=%s control_channels=%s",
self.path,
self.update_interval,
self.work_interval,
self._execute_timeout,
len(self._control_channels),
)
def _apply_config(self, new_config: Any) -> None:
self.config = new_config
if isinstance(new_config, dict):
try:
self._log_manager.apply_config(new_config)
except Exception: # noqa: BLE001
self.logger.exception("ConfigManagerV2._apply_config error while applying logging config")
raise
self.logger.debug(
"ConfigManagerV2._apply_config result: config_type=%s is_dict=%s",
type(new_config).__name__,
isinstance(new_config, dict),
)
async def _update_config(self) -> None:
try:
changed, new_config = await self._loader.load_if_changed()
if not changed:
self.logger.debug("ConfigManagerV2._update_config result: no changes")
return
self._apply_config(new_config)
self.logger.debug("ConfigManagerV2._update_config result: config updated")
except Exception as exc: # noqa: BLE001
self.logger.exception("ConfigManagerV2._update_config error")
if self._loader.last_valid_config is None:
self.logger.debug(
"ConfigManagerV2._update_config result: no fallback config available detail=%s",
str(exc),
)
return
try:
self._apply_config(self._loader.last_valid_config)
self.logger.debug(
"ConfigManagerV2._update_config result: fallback to last valid config applied",
)
except Exception: # noqa: BLE001
self.logger.exception("ConfigManagerV2._update_config fallback error")
def execute(self) -> None:
"""Override in subclasses."""
def get_health_status(self) -> HealthPayload:
if self._worker_degraded:
return {"status": "degraded", "detail": "worker has timed-out in-flight execute()"}
return {"status": "ok"}
def get_health_provider(self) -> Callable[[], Any]:
"""Return the health callback (to pass to HttpControlChannel when the channel is created externally)."""
return self._health_aggregator.collect
async def start(self) -> None:
if self._task is not None and not self._task.done():
await self._start_runtime()
self.logger.debug("ConfigManagerV2.start result: already running")
return
try:
self._loop = asyncio.get_running_loop()
except RuntimeError:
self.logger.exception("ConfigManagerV2.start error: must be called from within async context")
raise
self._task = asyncio.create_task(self._run(), name="config-manager-v2")
self.logger.debug("ConfigManagerV2.start result: background task started")
async def stop(self) -> None:
if self._task is None:
self.logger.debug("ConfigManagerV2.stop result: not running")
return
self._shutdown.set()
self._halt.set()
if asyncio.current_task() is self._task:
self.logger.debug("ConfigManagerV2.stop result: stop requested from supervisor task")
return
try:
await self._task
except asyncio.CancelledError:
self.logger.debug("ConfigManagerV2.stop result: supervisor task cancelled")
finally:
self.logger.debug("ConfigManagerV2.stop result: completed")
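
The periodic loops in this file sleep by waiting on the halt event with a timeout instead of calling `asyncio.sleep`, so a stop request interrupts the pause immediately. A minimal sketch of that pattern follows; `periodic` and `demo` are hypothetical names and the intervals are chosen only to keep the example fast.

```python
import asyncio
from collections.abc import Callable


async def periodic(tick: Callable[[], None], halt: asyncio.Event, interval: float) -> None:
    """Call tick() every `interval` seconds until `halt` is set."""
    while not halt.is_set():
        tick()
        try:
            # Returns as soon as halt is set; otherwise times out after `interval`.
            await asyncio.wait_for(halt.wait(), timeout=max(interval, 0.05))
        except asyncio.TimeoutError:
            continue


async def demo() -> int:
    """Run the loop briefly, request a stop, and return how many ticks happened."""
    halt = asyncio.Event()
    calls: list[int] = []
    task = asyncio.create_task(periodic(lambda: calls.append(1), halt, 0.05))
    await asyncio.sleep(0.12)
    halt.set()
    # The loop should exit well before this one-second safety timeout.
    await asyncio.wait_for(task, timeout=1.0)
    return len(calls)
```

With a plain sleep, a stop request would have to wait for the remainder of the interval; with the event wait, the loop exits on the next scheduler pass.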


@@ -0,0 +1,164 @@
"""Worker loop: repeatedly calls the blocking execute() in a thread.
Normal mode: one in-flight execute().
Degraded mode: if the active execute() exceeds the timeout, a second worker thread is started
so that work continues. At most two in-flight tasks are allowed at a time."""
from __future__ import annotations
import asyncio
import logging
import time
from collections.abc import Callable
from dataclasses import dataclass
from itertools import count
from typing import Optional
logger = logging.getLogger(__name__)
@dataclass
class _InFlightExecute:
id: int
task: asyncio.Task[None]
started_at: float
timeout_reported: bool = False
class WorkerLoop:
def __init__(
self,
execute: Callable[[], None],
get_interval: Callable[[], int | float],
halt_event: asyncio.Event,
on_error: Optional[Callable[[Exception], None]] = None,
on_success: Optional[Callable[[], None]] = None,
execute_timeout: Optional[float] = None,
on_degraded_change: Optional[Callable[[bool], None]] = None,
on_metrics_change: Optional[Callable[[int, int], None]] = None,
):
self._execute = execute
self._get_interval = get_interval
self._halt_event = halt_event
self._on_error = on_error
self._on_success = on_success
self._execute_timeout = execute_timeout
self._on_degraded_change = on_degraded_change
self._on_metrics_change = on_metrics_change
self._inflight: list[_InFlightExecute] = []
self._id_seq = count(1)
self._degraded = False
self._last_metrics: Optional[tuple[int, int]] = None
logger.debug(
"WorkerLoop.__init__ result: execute=%s execute_timeout=%s",
getattr(execute, "__name__", type(execute).__name__),
execute_timeout,
)
def _notify_error(self, exc: Exception) -> None:
if self._on_error is not None:
self._on_error(exc)
def _set_degraded(self, value: bool) -> None:
if self._degraded == value:
return
self._degraded = value
if self._on_degraded_change is not None:
self._on_degraded_change(value)
logger.warning("WorkerLoop.run degraded state changed: degraded=%s", value)
def _start_execute(self) -> None:
execute_id = next(self._id_seq)
execution = _InFlightExecute(
id=execute_id,
task=asyncio.create_task(asyncio.to_thread(self._execute), name=f"worker-loop-execute-{execute_id}"),
started_at=time.monotonic(),
)
self._inflight.append(execution)
logger.debug("WorkerLoop.run result: execute started id=%s inflight=%s", execution.id, len(self._inflight))
def _collect_finished(self) -> None:
pending: list[_InFlightExecute] = []
for execution in self._inflight:
task = execution.task
if not task.done():
pending.append(execution)
continue
try:
task.result()
if self._on_success is not None:
self._on_success()
logger.debug("WorkerLoop.run result: execute completed id=%s", execution.id)
except Exception as exc: # noqa: BLE001
self._notify_error(exc)
logger.exception("WorkerLoop.run error during execute id=%s", execution.id)
self._inflight = pending
def _mark_timeouts(self) -> None:
if self._execute_timeout is None or self._execute_timeout <= 0:
return
now = time.monotonic()
for execution in self._inflight:
if execution.timeout_reported:
continue
if execution.task.done():
continue
elapsed = now - execution.started_at
if elapsed < self._execute_timeout:
continue
execution.timeout_reported = True
self._notify_error(TimeoutError(f"execute() did not finish within {self._execute_timeout}s"))
logger.warning(
"WorkerLoop.run execute timeout: id=%s elapsed=%.3fs timeout=%ss",
execution.id,
elapsed,
self._execute_timeout,
)
def _has_timed_out_inflight(self) -> bool:
return any(item.timeout_reported and not item.task.done() for item in self._inflight)
def _ensure_capacity(self) -> None:
active_count = len(self._inflight)
if active_count == 0:
self._start_execute()
return
if active_count == 1 and self._has_timed_out_inflight():
self._start_execute()
return
def _emit_metrics(self) -> None:
if self._on_metrics_change is None:
return
inflight_count = len(self._inflight)
timed_out_count = sum(1 for item in self._inflight if item.timeout_reported and not item.task.done())
metrics = (inflight_count, timed_out_count)
if self._last_metrics == metrics:
return
self._last_metrics = metrics
self._on_metrics_change(inflight_count, timed_out_count)
async def run(self) -> None:
logger.debug("WorkerLoop.run result: started")
while not self._halt_event.is_set():
self._collect_finished()
self._mark_timeouts()
self._set_degraded(self._has_timed_out_inflight())
self._ensure_capacity()
self._emit_metrics()
timeout = max(self._get_interval(), 0.01)
try:
await asyncio.wait_for(self._halt_event.wait(), timeout=timeout)
except asyncio.TimeoutError:
continue
if self._inflight:
if self._has_timed_out_inflight():
logger.warning("WorkerLoop.run stop: timed-out execute still running; exiting without waiting")
else:
await asyncio.gather(*(item.task for item in self._inflight), return_exceptions=True)
self._collect_finished()
self._set_degraded(False)
self._emit_metrics()
logger.debug("WorkerLoop.run result: stopped")
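
The reason the loop marks a timeout instead of cancelling the call is that a function started with `asyncio.to_thread` keeps running in its thread even if the awaiting task is cancelled. The sketch below shows the detection step only; `demo` and `slow_execute` are hypothetical names, and the short durations are assumptions chosen to keep the example quick.

```python
import asyncio
import time


async def demo(timeout: float = 0.1) -> dict:
    """Run one slow blocking call and report when it crosses the timeout."""

    def slow_execute() -> None:
        time.sleep(0.3)  # stands in for a blocking execute()

    started_at = time.monotonic()
    task = asyncio.create_task(asyncio.to_thread(slow_execute))
    timeout_reported = False
    while not task.done():
        if not timeout_reported and time.monotonic() - started_at >= timeout:
            # The thread cannot be interrupted from here, so the call is only flagged.
            timeout_reported = True
        await asyncio.sleep(0.02)
    await task
    return {"timeout_reported": timeout_reported, "elapsed": time.monotonic() - started_at}
```

In `WorkerLoop` the same flag is what allows `_ensure_capacity` to start a second execute while the first one is still running.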


@@ -1,6 +1,8 @@
"""Core types: health and lifecycle state.
Used by core and control for consistent contracts."""
from __future__ import annotations
from dataclasses import dataclass
from enum import Enum
from typing import Literal, TypedDict
@@ -11,6 +13,8 @@ HealthState = Literal["ok", "degraded", "unhealthy"]
class HealthPayload(TypedDict, total=False):
status: HealthState
detail: str
state: str
"""Current lifecycle state (idle/starting/running/stopping/stopped)."""
class LifecycleState(str, Enum):
@@ -19,12 +23,3 @@ class LifecycleState(str, Enum):
RUNNING = "running"
STOPPING = "stopping"
STOPPED = "stopped"
@dataclass
class HealthServerSettings:
enabled: bool = False
host: str = "0.0.0.0"
port: int = 8000
path: str = "/health"
timeout: float = 3.0


@@ -1,80 +0,0 @@
from __future__ import annotations

import asyncio
import json
from collections.abc import Awaitable, Callable
from typing import Optional

from .types import HealthPayload


class HealthServer:
    def __init__(
        self,
        host: str,
        port: int,
        path: str,
        timeout: float,
        health_provider: Callable[[], Awaitable[HealthPayload]],
    ):
        """Configure lightweight HTTP health server parameters and callback."""
        self._host = host
        self._port = port
        self._path = path
        self._timeout = timeout
        self._health_provider = health_provider
        self._server: Optional[asyncio.base_events.Server] = None

    async def start(self) -> None:
        """Start listening for healthcheck requests if not running."""
        if self._server is not None:
            return
        self._server = await asyncio.start_server(self._handle_connection, self._host, self._port)

    async def stop(self) -> None:
        """Stop the health server and release the listening socket."""
        if self._server is None:
            return
        self._server.close()
        await self._server.wait_closed()
        self._server = None

    async def _handle_connection(self, reader: asyncio.StreamReader, writer: asyncio.StreamWriter) -> None:
        """Process one HTTP connection and return JSON health payload."""
        status_code = 404
        payload: HealthPayload = {"status": "unhealthy", "detail": "not found"}
        try:
            request_line = await reader.readline()
            parts = request_line.decode("utf-8", errors="ignore").strip().split(" ")
            if len(parts) >= 2:
                method, path = parts[0], parts[1]
                if method == "GET" and path == self._path:
                    status_code, payload = await self._build_health_response()
        except Exception:  # noqa: BLE001
            status_code = 500
            payload = {"status": "unhealthy", "detail": "internal error"}
        body = json.dumps(payload).encode("utf-8")
        phrase = "OK" if status_code == 200 else "SERVICE UNAVAILABLE" if status_code == 503 else "NOT FOUND"
        response = (
            f"HTTP/1.1 {status_code} {phrase}\r\n"
            "Content-Type: application/json\r\n"
            f"Content-Length: {len(body)}\r\n"
            "Connection: close\r\n"
            "\r\n"
        ).encode("utf-8") + body
        writer.write(response)
        await writer.drain()
        writer.close()
        await writer.wait_closed()

    async def _build_health_response(self) -> tuple[int, HealthPayload]:
        """Build HTTP status code and body from application health callback."""
        try:
            payload = await asyncio.wait_for(self._health_provider(), timeout=self._timeout)
            status = payload.get("status", "unhealthy")
            return (200, payload) if status == "ok" else (503, payload)
        except Exception as exc:  # noqa: BLE001
            return 503, {"status": "unhealthy", "detail": str(exc)}
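The removed `HealthServer` maps the provider's payload to an HTTP status: only `"ok"` yields 200, while any other status (including `"degraded"`), a provider exception, or a timeout yields 503. A minimal standalone sketch of that mapping follows. The names `health_to_http`, `ok_provider`, and `slow_provider` are hypothetical and are not part of the package.

```python
from __future__ import annotations

import asyncio
from collections.abc import Awaitable, Callable


async def health_to_http(
    provider: Callable[[], Awaitable[dict]],
    timeout: float,
) -> tuple[int, dict]:
    """Hypothetical helper mirroring the mapping in the removed HealthServer."""
    try:
        payload = await asyncio.wait_for(provider(), timeout=timeout)
    except Exception as exc:  # timeouts and provider failures both map to 503
        return 503, {"status": "unhealthy", "detail": str(exc)}
    status = payload.get("status", "unhealthy")
    return (200, payload) if status == "ok" else (503, payload)


async def ok_provider() -> dict:
    """Illustrative provider that reports a healthy application."""
    return {"status": "ok"}


async def slow_provider() -> dict:
    """Illustrative provider that answers too slowly for a short timeout."""
    await asyncio.sleep(1.0)
    return {"status": "ok"}
```

Calling `asyncio.run(health_to_http(slow_provider, 0.01))` exercises the timeout branch, because `asyncio.wait_for` cancels the provider and raises a timeout error that the helper converts into a 503 payload.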


@@ -1,256 +0,0 @@
from __future__ import annotations

import asyncio
import logging
from collections.abc import Awaitable
from typing import Any, Optional

from ..v1.log_manager import LogManager
from .config_loader import ConfigLoader
from .control.base import ControlChannel
from .health import HealthServer
from .scheduler import WorkerLoop
from .types import HealthPayload, HealthServerSettings, LifecycleState


class ConfigManagerV2:
    DEFAULT_UPDATE_INTERVAL = 5.0
    DEFAULT_WORK_INTERVAL = 2.0

    def __init__(
        self,
        path: str,
        log_manager: Optional[LogManager] = None,
        health_settings: Optional[HealthServerSettings] = None,
        control_channel: Optional[ControlChannel] = None,
    ):
        """Initialize manager subsystems and runtime state."""
        self.path = path
        self.config: Any = None
        self.update_interval = self.DEFAULT_UPDATE_INTERVAL
        self.work_interval = self.DEFAULT_WORK_INTERVAL
        self._loader = ConfigLoader(path)
        self._log_manager = log_manager or LogManager()
        self._control_channel = control_channel
        self._halt = asyncio.Event()
        self._task: Optional[asyncio.Task] = None
        self._loop: Optional[asyncio.AbstractEventLoop] = None
        self._state = LifecycleState.IDLE
        self._last_execute_error: Optional[str] = None
        self._health_settings = health_settings or HealthServerSettings(enabled=False)
        self._health_server: Optional[HealthServer] = None
        if self._health_settings.enabled:
            self._health_server = HealthServer(
                host=self._health_settings.host,
                port=self._health_settings.port,
                path=self._health_settings.path,
                timeout=self._health_settings.timeout,
                health_provider=self._collect_health,
            )
        self.logger = logging.getLogger(__name__)

    def _read_file_sync(self) -> str:
        """Read config file synchronously via the shared loader."""
        return self._loader._read_file_sync()

    async def _read_file_async(self) -> str:
        """Read config file asynchronously via a worker thread."""
        return await self._loader.read_file_async()

    def _parse_config(self, data: str) -> Any:
        """Parse raw config text to a Python object."""
        return self._loader.parse_config(data)

    def _update_intervals_from_config(self) -> None:
        """Refresh work and update intervals from current config."""
        if not isinstance(self.config, dict):
            return
        upd = self.config.get("update_interval")
        wrk = self.config.get("work_interval")
        if isinstance(upd, (int, float)) and upd > 0:
            self.update_interval = float(upd)
        else:
            self.update_interval = self.DEFAULT_UPDATE_INTERVAL
        if isinstance(wrk, (int, float)) and wrk > 0:
            self.work_interval = float(wrk)
        else:
            self.work_interval = self.DEFAULT_WORK_INTERVAL

    async def _update_config(self) -> None:
        """Reload config and apply new settings when content changes."""
        try:
            changed, new_config = await self._loader.load_if_changed()
            if not changed:
                return
            self.config = new_config
            self._update_intervals_from_config()
            if isinstance(new_config, dict):
                self._log_manager.apply_config(new_config)
        except Exception as exc:  # noqa: BLE001
            # Keep current config untouched and continue with last valid settings.
            self.logger.error("Error reading/parsing config file: %s", exc)
            if self._loader.last_valid_config is not None:
                self.config = self._loader.last_valid_config
                self._update_intervals_from_config()

    def execute(self) -> None:
        """Override in subclass to implement one unit of blocking work."""

    def get_health_status(self) -> HealthPayload:
        """Return application-specific health payload for /health."""
        return {"status": "ok"}

    async def _collect_health(self) -> HealthPayload:
        """Aggregate lifecycle and app status into one health result."""
        if self._state not in {LifecycleState.RUNNING, LifecycleState.STOPPING}:
            return {"status": "unhealthy", "detail": f"state={self._state.value}"}
        if self._last_execute_error is not None:
            return {"status": "unhealthy", "detail": self._last_execute_error}
        result = self.get_health_status()
        status = result.get("status", "unhealthy")
        if status not in {"ok", "degraded", "unhealthy"}:
            return {"status": "unhealthy", "detail": "invalid health status"}
        return result

    def _on_execute_success(self) -> None:
        """Clear the last execution error marker after successful run."""
        self._last_execute_error = None

    def _on_execute_error(self, exc: Exception) -> None:
        """Store and log execution failure details for health reporting."""
        self._last_execute_error = str(exc)
        self.logger.error("Execution error: %s", exc)

    async def _worker_loop(self) -> None:
        """Run execute() repeatedly until shutdown is requested."""
        worker = WorkerLoop(
            execute=self.execute,
            get_interval=lambda: self.work_interval,
            halt_event=self._halt,
            on_error=self._on_execute_error,
            on_success=self._on_execute_success,
        )
        await worker.run()

    async def _periodic_update_loop(self) -> None:
        """Periodically check config file for updates until shutdown."""
        while not self._halt.is_set():
            await self._update_config()
            try:
                await asyncio.wait_for(self._halt.wait(), timeout=max(self.update_interval, 0.05))
            except asyncio.TimeoutError:
                continue

    async def _status_text(self) -> str:
        """Build human-readable runtime status for control channels."""
        health = await self._collect_health()
        detail = health.get("detail")
        if detail:
            return f"state={self._state.value}; health={health['status']}; detail={detail}"
        return f"state={self._state.value}; health={health['status']}"

    async def _control_start(self) -> str:
        """Handle external start command for control channels."""
        if self._state == LifecycleState.RUNNING:
            return "already running"
        self._halt.clear()
        return "start signal accepted"

    async def _control_stop(self) -> str:
        """Handle external stop command for control channels."""
        self._halt.set()
        return "stop signal accepted"

    async def _control_status(self) -> str:
        """Handle external status command for control channels."""
        return await self._status_text()

    async def _start_control_channel(self) -> None:
        """Start configured control channel with bound command handlers."""
        if self._control_channel is None:
            return
        await self._control_channel.start(self._control_start, self._control_stop, self._control_status)

    async def _stop_control_channel(self) -> None:
        """Stop configured control channel if it is active."""
        if self._control_channel is None:
            return
        await self._control_channel.stop()

    async def _run(self) -> None:
        """Run manager lifecycle and coordinate all background tasks."""
        self._state = LifecycleState.STARTING
        self._halt.clear()
        await self._update_config()
        if self._health_server is not None:
            await self._health_server.start()
        await self._start_control_channel()
        self._state = LifecycleState.RUNNING
        self.logger.info("ConfigManagerV2 started")
        tasks = [
            asyncio.create_task(self._worker_loop(), name="v2-worker-loop"),
            asyncio.create_task(self._periodic_update_loop(), name="v2-config-loop"),
        ]
        try:
            await asyncio.gather(*tasks)
        except asyncio.CancelledError:
            raise
        finally:
            self._state = LifecycleState.STOPPING
            self._halt.set()
            for task in tasks:
                task.cancel()
            await asyncio.gather(*tasks, return_exceptions=True)
            await self._stop_control_channel()
            if self._health_server is not None:
                await self._health_server.stop()
            self._state = LifecycleState.STOPPED
            self.logger.info("ConfigManagerV2 stopped")

    async def start(self) -> None:
        """Start manager lifecycle from an active asyncio context."""
        if self._task is not None and not self._task.done():
            self.logger.warning("ConfigManagerV2 is already running")
            return
        try:
            self._loop = asyncio.get_running_loop()
        except RuntimeError:
            self.logger.error("start() must be called from within an async context")
            raise
        self._task = asyncio.create_task(self._run(), name="config-manager-v2")
        try:
            await self._task
        finally:
            self._task = None

    async def stop(self) -> None:
        """Request graceful shutdown and wait for manager completion."""
        if self._task is None:
            self.logger.warning("ConfigManagerV2 is not running")
            return
        self._halt.set()
        if asyncio.current_task() is self._task:
            return
        try:
            await self._task
        except asyncio.CancelledError:
            pass
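The removed `_update_intervals_from_config` accepts an interval only when it is a positive `int` or `float` and otherwise falls back to the class default. A small sketch of that rule as a standalone function follows. `pick_interval` is a hypothetical name, not part of the package. One deliberate departure from the removed code: since `bool` is a subclass of `int` in Python, the original `isinstance` check would treat `True` as `1.0`; the sketch assumes that is undesirable and rejects booleans.

```python
from typing import Any


def pick_interval(value: Any, default: float) -> float:
    """Hypothetical helper: return a validated interval or the default."""
    # Assumption: booleans are not meaningful intervals, even though
    # isinstance(True, int) is True in Python.
    if isinstance(value, bool):
        return default
    if isinstance(value, (int, float)) and value > 0:
        return float(value)
    return default


# Values as they might arrive from a parsed YAML config.
config = {"update_interval": 0.5, "work_interval": "fast"}
update_interval = pick_interval(config.get("update_interval"), 5.0)
work_interval = pick_interval(config.get("work_interval"), 2.0)
```

Here the string `"fast"` is not numeric, so `work_interval` falls back to the default passed in.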


@@ -1,39 +0,0 @@
from __future__ import annotations

import asyncio
from collections.abc import Callable
from typing import Optional


class WorkerLoop:
    def __init__(
        self,
        execute: Callable[[], None],
        get_interval: Callable[[], float],
        halt_event: asyncio.Event,
        on_error: Optional[Callable[[Exception], None]] = None,
        on_success: Optional[Callable[[], None]] = None,
    ):
        """Store callbacks and synchronization primitives for worker execution."""
        self._execute = execute
        self._get_interval = get_interval
        self._halt_event = halt_event
        self._on_error = on_error
        self._on_success = on_success

    async def run(self) -> None:
        """Run execute repeatedly until halt is requested."""
        while not self._halt_event.is_set():
            try:
                await asyncio.to_thread(self._execute)
                if self._on_success is not None:
                    self._on_success()
            except Exception as exc:  # noqa: BLE001
                if self._on_error is not None:
                    self._on_error(exc)

            timeout = max(self._get_interval(), 0.01)
            try:
                await asyncio.wait_for(self._halt_event.wait(), timeout=timeout)
            except asyncio.TimeoutError:
                continue
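Between iterations the removed `WorkerLoop` does not call `asyncio.sleep`; it waits on the halt event with a timeout, so a stop request interrupts the pause immediately instead of waiting out the interval. A minimal sketch of that interruptible sleep follows. The names `sleep_or_halt` and `demo` are hypothetical and are used only for illustration.

```python
from __future__ import annotations

import asyncio


async def sleep_or_halt(halt: asyncio.Event, interval: float) -> bool:
    """Wait up to `interval` seconds; return True if halt was set meanwhile."""
    try:
        await asyncio.wait_for(halt.wait(), timeout=interval)
    except asyncio.TimeoutError:
        return False  # the full interval elapsed without a stop request
    return True


async def demo() -> list[bool]:
    """Show both outcomes: a plain timeout and an immediate wake-up."""
    halt = asyncio.Event()
    results = [await sleep_or_halt(halt, 0.01)]  # nobody sets halt: times out
    halt.set()
    results.append(await sleep_or_halt(halt, 10.0))  # already set: returns at once
    return results
```

The second call returns without waiting the ten seconds because `Event.wait()` completes immediately once the event is set.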


@@ -1,6 +1,14 @@
# === General configuration parameters === # === General configuration parameters ===
runtime: 5 runtime: 5
# === HTTP control channel (ConfigManagerV2): /health, /actions/start, /actions/stop ===
management:
  enabled: true
  host: "0.0.0.0"
  port: 8000
  timeout: 3
  health_timeout: 30
# === Logging === # === Logging ===
log: log:
version: 1 version: 1
@@ -9,8 +17,7 @@ log:
formatters: formatters:
standard: standard:
format: '%(asctime)s %(module)15s [%(levelname)8s]: %(message)s' format: '%(asctime)s %(module)15s [%(levelname)8s]: %(message)s'
telegram:
format: '%(message)s'
handlers: handlers:
console: console:
@@ -26,28 +33,20 @@ log:
filename: logs/log.log filename: logs/log.log
mode: a mode: a
maxBytes: 500000 maxBytes: 500000
backupCount: 15 backupCount: 3
#telegram:
# level: CRITICAL
# formatter: telegram
# class: logging_telegram_handler.TelegramHandler
# chat_id: 211945135
# alias: "PDC"
# -- Loggers -- # -- Loggers --
loggers: root:
'':
handlers: [console, file] handlers: [console, file]
level: INFO level: INFO
propagate: False
loggers:
__main__: __main__:
handlers: [console, file] handlers: [console, file]
level: DEBUG level: DEBUG
propagate: False propagate: False
config_manager: config_manager.src.config_manager.v2.manager:
handlers: [console, file] handlers: [console, file]
level: DEBUG level: DEBUG
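This hunk moves the catch-all logger from an empty-string entry under `loggers:` to the top-level `root:` key of the logging config. A reduced sketch of the equivalent dictionary passed to `logging.config.dictConfig` follows. It keeps only the console handler (the file handler is omitted so that running the sketch creates no files), and `disable_existing_loggers` is an assumption that does not appear in the diff.

```python
import logging
import logging.config

LOG_CONFIG = {
    "version": 1,
    # Assumption: keep loggers created before configuration enabled.
    "disable_existing_loggers": False,
    "formatters": {
        "standard": {
            "format": "%(asctime)s %(module)15s [%(levelname)8s]: %(message)s",
        },
    },
    "handlers": {
        "console": {"class": "logging.StreamHandler", "formatter": "standard"},
    },
    # Root logger configured through the dedicated top-level key.
    "root": {"handlers": ["console"], "level": "INFO"},
    "loggers": {
        "__main__": {"handlers": ["console"], "level": "DEBUG", "propagate": False},
    },
}

logging.config.dictConfig(LOG_CONFIG)
logging.getLogger("__main__").debug("visible: this logger is set to DEBUG")
logging.getLogger("other").debug("hidden: falls back to the root level INFO")
```

Loggers without their own entry inherit the root level, so only explicitly listed names such as `__main__` emit DEBUG records.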


@@ -1,34 +1,54 @@
#import os # import os
#os.chdir(os.path.dirname(__file__)) # os.chdir(os.path.dirname(__file__))
import asyncio
import logging
from pathlib import Path
from config_manager import ConfigManager from config_manager import ConfigManager
import logging from config_manager.v2.control import HttpControlChannel
import asyncio
from typing import Optional
logger = logging.getLogger() logger = logging.getLogger()
class MyApp(ConfigManager): class MyApp(ConfigManager):
def __init__(self, *args, **kwargs): def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs) super().__init__(*args, **kwargs)
self.iter = 0 self.iter = 0
def execute(self) -> None: def execute(self) -> None:
logger.info(f"current iteration {self.iter}") """A successful run resets the health timer (updates the time of the last success)."""
#logger.critical("current iteration %s", self.iter)
#logger.error("current iteration %s", self.iter)
logger.warning("current iteration %s", self.iter)
#logger.info("current iteration %s", self.iter)
#logger.debug("current iteration %s", self.iter)
self.iter += 1 self.iter += 1
async def main():
app = MyApp("config.yaml")
logger.info("App started")
await app.start()
logger.info("App finished") async def main() -> None:
config_path = Path(__file__).parent / "config.yaml"
print(config_path)
app = MyApp(
str(config_path),
control_channels=lambda m: [
HttpControlChannel(
host="0.0.0.0",
port=8000,
timeout=3,
health_provider=m.get_health_provider(),
)
],
)
logger.info("App starting")
# Run the manager in the background (start() does not return until stop).
asyncio.create_task(app.start())
logger.info("App running; Ctrl+C to stop")
while True: while True:
await asyncio.sleep(1) await asyncio.sleep(1)
if __name__ == "__main__":
asyncio.run(main())
if __name__ == "__main__":
logging.basicConfig(level=logging.INFO)
asyncio.run(main())


@@ -4,6 +4,9 @@ from config_manager.v2 import ConfigManagerV2
class ReloadApp(ConfigManagerV2): class ReloadApp(ConfigManagerV2):
DEFAULT_UPDATE_INTERVAL = 0.05
DEFAULT_WORK_INTERVAL = 0.2
def execute(self) -> None: def execute(self) -> None:
return return
@@ -11,7 +14,7 @@ class ReloadApp(ConfigManagerV2):
def test_invalid_config_keeps_last_valid(tmp_path): def test_invalid_config_keeps_last_valid(tmp_path):
async def scenario() -> None: async def scenario() -> None:
cfg = tmp_path / "config.yaml" cfg = tmp_path / "config.yaml"
cfg.write_text("work_interval: 0.2\nupdate_interval: 0.05\n", encoding="utf-8") cfg.write_text("log: {}\nmanagement: { enabled: false }\n", encoding="utf-8")
app = ReloadApp(str(cfg)) app = ReloadApp(str(cfg))
runner = asyncio.create_task(app.start()) runner = asyncio.create_task(app.start())


@@ -4,6 +4,9 @@ from config_manager.v2 import ConfigManagerV2
class DemoApp(ConfigManagerV2): class DemoApp(ConfigManagerV2):
DEFAULT_UPDATE_INTERVAL = 0.05
DEFAULT_WORK_INTERVAL = 0.05
def __init__(self, *args, **kwargs): def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs) super().__init__(*args, **kwargs)
self.calls = 0 self.calls = 0
@@ -15,7 +18,7 @@ class DemoApp(ConfigManagerV2):
def test_execute_loop_runs(tmp_path): def test_execute_loop_runs(tmp_path):
async def scenario() -> None: async def scenario() -> None:
cfg = tmp_path / "config.yaml" cfg = tmp_path / "config.yaml"
cfg.write_text("work_interval: 0.05\nupdate_interval: 0.05\n", encoding="utf-8") cfg.write_text("log: {}\nmanagement: { enabled: false }\n", encoding="utf-8")
app = DemoApp(str(cfg)) app = DemoApp(str(cfg))
runner = asyncio.create_task(app.start()) runner = asyncio.create_task(app.start())


@@ -23,6 +23,9 @@ class DummyControlChannel(ControlChannel):
class ControlledApp(ConfigManagerV2): class ControlledApp(ConfigManagerV2):
DEFAULT_UPDATE_INTERVAL = 0.05
DEFAULT_WORK_INTERVAL = 0.05
def execute(self) -> None: def execute(self) -> None:
return return
@@ -30,10 +33,10 @@ class ControlledApp(ConfigManagerV2):
def test_control_channel_can_stop_manager(tmp_path): def test_control_channel_can_stop_manager(tmp_path):
async def scenario() -> None: async def scenario() -> None:
cfg = tmp_path / "config.yaml" cfg = tmp_path / "config.yaml"
cfg.write_text("work_interval: 0.05\nupdate_interval: 0.05\n", encoding="utf-8") cfg.write_text("log: {}\nmanagement: { enabled: false }\n", encoding="utf-8")
channel = DummyControlChannel() channel = DummyControlChannel()
app = ControlledApp(str(cfg), control_channel=channel) app = ControlledApp(str(cfg), control_channels=[channel])
runner = asyncio.create_task(app.start()) runner = asyncio.create_task(app.start())
await asyncio.sleep(0.12) await asyncio.sleep(0.12)
@@ -44,11 +47,14 @@ def test_control_channel_can_stop_manager(tmp_path):
status_text = await channel.on_status() status_text = await channel.on_status()
assert "state=running" in status_text assert "state=running" in status_text
assert "worker_inflight=" in status_text
assert "worker_timed_out_inflight=" in status_text
stop_text = await channel.on_stop() stop_text = await channel.on_stop()
assert "stop signal accepted" in stop_text assert "stop signal accepted" in stop_text
await runner await runner
await app.stop()
assert channel.stopped is True assert channel.stopped is True
asyncio.run(scenario()) asyncio.run(scenario())


@@ -1,6 +1,7 @@
import asyncio import asyncio
import json
from config_manager.v2.health import HealthServer from config_manager.v2.control import HttpControlChannel
def test_health_mapping_ok_to_200(): def test_health_mapping_ok_to_200():
@@ -8,10 +9,9 @@ def test_health_mapping_ok_to_200():
return {"status": "ok"} return {"status": "ok"}
async def scenario() -> None: async def scenario() -> None:
server = HealthServer( server = HttpControlChannel(
host="127.0.0.1", host="127.0.0.1",
port=8000, port=8000,
path="/health",
timeout=0.2, timeout=0.2,
health_provider=provider, health_provider=provider,
) )
@@ -27,10 +27,9 @@ def test_health_mapping_unhealthy_to_503():
return {"status": "unhealthy", "detail": "worker failed"} return {"status": "unhealthy", "detail": "worker failed"}
async def scenario() -> None: async def scenario() -> None:
server = HealthServer( server = HttpControlChannel(
host="127.0.0.1", host="127.0.0.1",
port=8000, port=8000,
path="/health",
timeout=0.2, timeout=0.2,
health_provider=provider, health_provider=provider,
) )
@@ -39,3 +38,64 @@ def test_health_mapping_unhealthy_to_503():
assert payload["status"] == "unhealthy" assert payload["status"] == "unhealthy"
asyncio.run(scenario()) asyncio.run(scenario())
def test_action_routes_call_callbacks():
events: list[str] = []
async def provider():
return {"status": "ok"}
async def on_start() -> str:
events.append("start")
return "start accepted"
async def on_stop() -> str:
events.append("stop")
return "stop accepted"
async def on_status() -> str:
return "ok"
async def request(port: int, path: str) -> tuple[int, dict[str, str]]:
reader, writer = await asyncio.open_connection("127.0.0.1", port)
writer.write(
f"GET {path} HTTP/1.1\r\nHost: 127.0.0.1\r\nConnection: close\r\n\r\n".encode("utf-8")
)
await writer.drain()
raw = await reader.read()
writer.close()
await writer.wait_closed()
header, body = raw.split(b"\r\n\r\n", maxsplit=1)
status_code = int(header.split(b" ")[1])
payload = json.loads(body.decode("utf-8"))
return status_code, payload
async def scenario() -> None:
channel = HttpControlChannel(
host="127.0.0.1",
port=0,
timeout=0.2,
health_provider=provider,
)
await channel.start(on_start, on_stop, on_status)
try:
port = channel.port
assert port > 0
start_code, start_payload = await request(port, "/actions/start")
stop_code, stop_payload = await request(port, "/actions/stop")
finally:
await channel.stop()
assert start_code == 200
assert start_payload["status"] == "ok"
assert start_payload["detail"] == "start accepted"
assert stop_code == 200
assert stop_payload["status"] == "ok"
assert stop_payload["detail"] == "stop accepted"
assert events == ["start", "stop"]
asyncio.run(scenario())
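The `request` helper in the new test reads the raw HTTP response and splits it at the blank line that separates headers from the JSON body. A standalone sketch of just that parsing step, run against a canned response, follows. `parse_response`, `BODY`, and `RAW` are hypothetical names; the payload mirrors the values asserted in the test above.

```python
from __future__ import annotations

import json


def parse_response(raw: bytes) -> tuple[int, dict]:
    """Split a raw HTTP/1.1 response into its status code and JSON payload."""
    header, body = raw.split(b"\r\n\r\n", maxsplit=1)
    # The status line looks like b"HTTP/1.1 200 OK"; the code is the second token.
    status_code = int(header.split(b" ")[1])
    return status_code, json.loads(body.decode("utf-8"))


# Canned response shaped like the one the control channel is expected to send.
BODY = json.dumps({"status": "ok", "detail": "start accepted"}).encode("utf-8")
RAW = (
    b"HTTP/1.1 200 OK\r\n"
    b"Content-Type: application/json\r\n"
    + f"Content-Length: {len(BODY)}\r\n".encode("utf-8")
    + b"Connection: close\r\n"
    b"\r\n"
    + BODY
)

status_code, payload = parse_response(RAW)
```

Splitting with `maxsplit=1` keeps the body intact even if the JSON itself happens to contain a `\r\n\r\n` sequence.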


@@ -0,0 +1,153 @@
import asyncio
import threading
import time

from config_manager.v2 import ConfigManagerV2
from config_manager.v2.control.base import ControlChannel, StartHandler, StatusHandler, StopHandler


class DummyControlChannel(ControlChannel):
    def __init__(self):
        self.on_start: StartHandler | None = None
        self.on_stop: StopHandler | None = None
        self.on_status: StatusHandler | None = None
        self.started = False
        self.stopped = False

    async def start(self, on_start: StartHandler, on_stop: StopHandler, on_status: StatusHandler) -> None:
        self.on_start = on_start
        self.on_stop = on_stop
        self.on_status = on_status
        self.started = True

    async def stop(self) -> None:
        self.stopped = True


class RestartableApp(ConfigManagerV2):
    DEFAULT_UPDATE_INTERVAL = 0.05
    DEFAULT_WORK_INTERVAL = 0.05

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.calls = 0

    def execute(self) -> None:
        self.calls += 1


class TimeoutAwareApp(ConfigManagerV2):
    DEFAULT_UPDATE_INTERVAL = 0.05
    DEFAULT_WORK_INTERVAL = 0.02

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.calls = 0
        self.active = 0
        self.max_active = 0
        self._lock = threading.Lock()

    def execute(self) -> None:
        with self._lock:
            self.calls += 1
            self.active += 1
            self.max_active = max(self.max_active, self.active)
        try:
            time.sleep(0.2)
        finally:
            with self._lock:
                self.active -= 1


class NormalSingleThreadApp(ConfigManagerV2):
    DEFAULT_UPDATE_INTERVAL = 0.05
    DEFAULT_WORK_INTERVAL = 0.02

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.calls = 0
        self.active = 0
        self.max_active = 0
        self._lock = threading.Lock()

    def execute(self) -> None:
        with self._lock:
            self.calls += 1
            self.active += 1
            self.max_active = max(self.max_active, self.active)
        try:
            time.sleep(0.03)
        finally:
            with self._lock:
                self.active -= 1


def test_control_channel_stop_and_start_resumes_execute(tmp_path):
    async def scenario() -> None:
        cfg = tmp_path / "config.yaml"
        cfg.write_text("log: {}\nmanagement: { enabled: false }\n", encoding="utf-8")
        channel = DummyControlChannel()
        app = RestartableApp(str(cfg), control_channels=[channel])
        await app.start()
        await asyncio.sleep(0.2)
        before_stop = app.calls
        assert before_stop > 0
        assert channel.on_stop is not None
        assert channel.on_start is not None
        stop_text = await channel.on_stop()
        assert "stop signal accepted" in stop_text
        await asyncio.sleep(0.2)
        after_stop = app.calls
        assert after_stop == before_stop
        start_text = await channel.on_start()
        assert "start signal accepted" in start_text
        await asyncio.sleep(0.2)
        assert app.calls > after_stop
        await app.stop()
        assert channel.stopped is True

    asyncio.run(scenario())


def test_normal_mode_uses_single_inflight_execute(tmp_path):
    async def scenario() -> None:
        cfg = tmp_path / "config.yaml"
        cfg.write_text("log: {}\nmanagement: { enabled: false }\n", encoding="utf-8")
        app = NormalSingleThreadApp(str(cfg))
        await app.start()
        await asyncio.sleep(0.25)
        health = await app.get_health_provider()()
        await app.stop()
        assert app.calls >= 2
        assert app.max_active == 1
        assert health["status"] == "ok"

    asyncio.run(scenario())


def test_execute_timeout_does_not_start_parallel_runs(tmp_path, monkeypatch):
    async def scenario() -> None:
        cfg = tmp_path / "config.yaml"
        cfg.write_text("log: {}\nmanagement: { enabled: false }\n", encoding="utf-8")
        monkeypatch.setenv("EXECUTE_TIMEOUT", "0.05")
        app = TimeoutAwareApp(str(cfg))
        await app.start()
        await asyncio.sleep(0.35)
        degraded_health = await app.get_health_provider()()
        await app.stop()
        assert app.calls >= 1
        assert app._last_execute_error is not None
        assert "did not finish within" in app._last_execute_error
        assert app.max_active == 2
        assert degraded_health["status"] == "degraded"

    asyncio.run(scenario())


@@ -6,6 +6,9 @@ from config_manager.v2 import ConfigManagerV2
class BlockingApp(ConfigManagerV2): class BlockingApp(ConfigManagerV2):
DEFAULT_UPDATE_INTERVAL = 0.05
DEFAULT_WORK_INTERVAL = 0.05
def __init__(self, *args, **kwargs): def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs) super().__init__(*args, **kwargs)
self.started_event = threading.Event() self.started_event = threading.Event()
@@ -23,7 +26,7 @@ class BlockingApp(ConfigManagerV2):
def test_stop_waits_for_active_execute_and_prevents_next_run(tmp_path): def test_stop_waits_for_active_execute_and_prevents_next_run(tmp_path):
async def scenario() -> None: async def scenario() -> None:
cfg = tmp_path / "config.yaml" cfg = tmp_path / "config.yaml"
cfg.write_text("work_interval: 0.05\nupdate_interval: 0.05\n", encoding="utf-8") cfg.write_text("log: {}\nmanagement: { enabled: false }\n", encoding="utf-8")
app = BlockingApp(str(cfg)) app = BlockingApp(str(cfg))
runner = asyncio.create_task(app.start()) runner = asyncio.create_task(app.start())