Compare commits
19 Commits
feature/he
...
a1dd495d6d
| SHA1 |
|---|
| a1dd495d6d |
| aa32c23dba |
| 7b5d6d2156 |
| aee8f7460f |
| ad89b3db92 |
| 6502f2252d |
| 67cd098f54 |
| 8d177b0fd1 |
| 3293814898 |
| e7d11ddf71 |
| 2b02af60d5 |
| b2442f4d91 |
| f5bb681ddb |
| 058c19d677 |
| 608cd42719 |
| 2d6179d366 |
| d888ae7acb |
| 1d71ce406f |
| 8da6df0b2a |
208
README.md
@@ -1,12 +1,210 @@
# Config Manager

## Description

This package was created to run my applications.

The ConfigManager class implements the entry point for the program and provides the current application configuration. It also simplifies logging setup.

## Description

The package is intended for running applications that execute their logic periodically, reload their config, and are controlled over an HTTP API.

## Installation

**Contract:** the application subclasses **ConfigManagerV2** and overrides **execute()** (the periodic work). Control (start/stop, health) goes through channels that are created externally and passed to the constructor in **control_channels** (including HttpControlChannel for the API).

## ConfigManager: structure and relationships

**ConfigManager** (the ConfigManagerV2 class) is the application entry point. It inherits its internal logic from **\_RuntimeController** (the worker and config-update loops, starting and stopping the control channels).

**Core:**

- **ConfigLoader**: reads the config from a file (YAML/JSON), computes its hash, and returns the config only when it has changed; on a parse error it returns the last valid config.
- **WorkerLoop**: calls your `execute()` method in a loop in a separate thread, pausing between calls; it reacts to the stop event and invokes the success/error callbacks.
- **LogManager**: applies the `log` section of the config to logging (dictConfig).
- **HealthAggregator**: collects state (the lifecycle idle/starting/running/…, the time of the last successful `execute()`, and the health timeout) and builds a single health response (ok/unhealthy).
- **ControlChannelBridge**: a single bridge for all channels, providing the on_start/on_stop/on_status handlers (clearing/setting halt, status text).

**Control channels:**

- **ControlChannel**: the abstract contract, `start(on_start, on_stop, on_status)` and `stop()`.
- **HttpControlChannel**: the HTTP API (`/health`, `/actions/start`, `/actions/stop`, `/actions/status`); it uses **UvicornServerRunner**, calls **HealthAggregator.collect()** for `/health`, and calls the handlers passed in from **ControlChannelBridge** for actions.
- **TelegramControlChannel**: an implementation based on Telegram long polling; the `/start`, `/stop`, `/status` commands call the handlers passed in.

**Workflow:** on `start()` the manager brings up the channels from **control_channels** (supplied externally), then starts two loops: **WorkerLoop** and the periodic config update via **ConfigLoader**. Control over the API (`/health`, `/actions/start`, `/actions/stop`) is available if an **HttpControlChannel** was passed in control_channels. A halt ends both loops; all channels are stopped at the end.
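
The worker side of this workflow can be sketched roughly as follows. This is a minimal illustration, not the package's actual `WorkerLoop`: the function name `worker_loop`, its parameters, and the `run_demo` helper are assumptions made for the example. It shows `execute()` running in a thread, an error callback that keeps the loop alive, and a pause that ends early when halt is set.

```python
import asyncio
from typing import Callable, Optional, Tuple


async def worker_loop(
    execute: Callable[[], None],
    halt: asyncio.Event,
    interval: float,
    on_success: Optional[Callable[[], None]] = None,
    on_error: Optional[Callable[[Exception], None]] = None,
) -> None:
    """Call execute() in a worker thread until halt is set (hypothetical sketch)."""
    while not halt.is_set():
        try:
            await asyncio.to_thread(execute)
        except Exception as exc:  # report the failure and keep looping
            if on_error is not None:
                on_error(exc)
        else:
            if on_success is not None:
                on_success()
        try:
            # Pause for `interval`, but wake up at once if halt is set.
            await asyncio.wait_for(halt.wait(), timeout=interval)
        except asyncio.TimeoutError:
            pass


async def run_demo(max_calls: int = 3) -> Tuple[int, int]:
    """Run the loop until execute() was called max_calls times."""
    loop = asyncio.get_running_loop()
    halt = asyncio.Event()
    calls = 0
    errors = []

    def execute() -> None:
        nonlocal calls
        calls += 1
        if calls >= max_calls:
            # asyncio.Event is not thread-safe, so set it from the loop thread.
            loop.call_soon_threadsafe(halt.set)
        if calls == 1:
            raise ValueError("first call fails")

    await worker_loop(execute, halt, interval=0.01, on_error=errors.append)
    return calls, len(errors)
```

Waiting on the halt event instead of a plain `asyncio.sleep()` is a design choice of this sketch: a stop request does not have to wait for the full work interval.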

## Running an application with ConfigManagerV2 and HttpControlChannel

1. **Subclass ConfigManagerV2** and implement the `execute()` method (your periodic work goes there). If needed, override `get_health_status()` for a custom `/health` response.

2. **Create the channels externally and pass them to the constructor.** For the HTTP API, create an **HttpControlChannel**; health needs a callback from the manager, so pass **control_channels** as a factory (a lambda that receives the manager):

```python
from config_manager.v2.control import HttpControlChannel

# MyApp is your ConfigManagerV2 subclass; path_to_config points to the config file.
app = MyApp(
    str(path_to_config),
    control_channels=lambda m: [
        HttpControlChannel(
            host="0.0.0.0",
            port=8000,
            timeout=3,
            health_provider=m.get_health_provider(),
        )
    ],
)
```

Alternatively, pass a ready-made list of channels: `control_channels=[channel1, channel2]`.

3. **Start from an async context:** `await app.start()`, or `asyncio.create_task(app.start())` to run in the background. To stop: `await app.stop()` or an HTTP request to `/actions/stop`.
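
Step 2 accepts `control_channels` either as a factory or as a ready-made list. How ConfigManagerV2 resolves the argument internally is not shown here, so the helper below is only a sketch of one plausible approach; the name `resolve_channels` and the type aliases are hypothetical.

```python
from typing import Callable, List, Sequence, Union

Channel = object  # stand-in for ControlChannel in this sketch
ChannelsArg = Union[Sequence[Channel], Callable[[object], Sequence[Channel]], None]


def resolve_channels(manager: object, control_channels: ChannelsArg) -> List[Channel]:
    """Return a concrete list of channels from a list, a factory, or None."""
    if control_channels is None:
        return []
    if callable(control_channels):
        # Factory form: called with the manager so channels can use its callbacks.
        return list(control_channels(manager))
    return list(control_channels)
```

The factory form exists because a channel such as HttpControlChannel needs something from the manager (the health provider) that is not available before the manager is constructed.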

**Minimal example with the HTTP API:**

```python
import asyncio
import logging
from pathlib import Path

from config_manager import ConfigManager
from config_manager.v2.control import HttpControlChannel


class MyApp(ConfigManager):
    def execute(self) -> None:
        pass  # your periodic work


async def main() -> None:
    app = MyApp(
        str(Path(__file__).parent / "config.yaml"),
        control_channels=lambda m: [
            HttpControlChannel(
                host="0.0.0.0", port=8000, timeout=3,
                health_provider=m.get_health_provider(),
            )
        ],
    )
    # Keep a reference so the background task is not garbage-collected.
    task = asyncio.create_task(app.start())
    await asyncio.sleep(3600)


if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO)
    asyncio.run(main())
```

A ready-made example: `tests/test_app.py`.

## Class diagram

```mermaid
classDiagram
    direction TB

    class ConfigManagerV2 {
        +str path
        +Any config
        +float update_interval
        +float work_interval
        -ConfigLoader _loader
        -LifecycleState _state
        +start() async
        +stop() async
        +execute()*
        +get_health_status() HealthPayload
        -_run() async
        -_worker_loop() async
        -_periodic_update_loop() async
    }

    class _RuntimeController {
        <<internal>>
        -_on_execute_success()
        -_on_execute_error(exc)
        -_worker_loop() async
        -_periodic_update_loop() async
        -_start_control_channels() async
        -_stop_control_channels() async
        -_run() async
    }

    class ConfigLoader {
        +str path
        +Any config
        +Any last_valid_config
        +load_if_changed() async
        +parse_config(data) Any
        -_read_file_sync() str
        -read_file_async() async
    }

    class WorkerLoop {
        -Callable execute
        -Callable get_interval
        -Event halt_event
        +run() async
    }

    class LogManager {
        +apply_config(config) None
    }

    class ControlChannel {
        <<abstract>>
        +start(on_start, on_stop, on_status) async*
        +stop() async*
    }

    class TelegramControlChannel {
        -str _token
        -int _chat_id
        +start(on_start, on_stop, on_status) async
        +stop() async
        -_poll_loop() async
    }

    class HttpControlChannel {
        -UvicornServerRunner _runner
        -Callable _health_provider
        +start(on_start, on_stop, on_status) async
        +stop() async
        +int port
    }

    class HealthAggregator {
        -Callable get_state
        -Callable get_app_health
        +collect() async HealthPayload
    }

    class ControlChannelBridge {
        -Event _halt
        -Callable _get_state
        -Callable _get_status
        +on_start() async str
        +on_stop() async str
        +on_status() async str
    }

    class UvicornServerRunner {
        -Server _server
        -Task _serve_task
        +start(app) async
        +stop() async
        +int port
    }

    ConfigManagerV2 --|> _RuntimeController : inherits
    ConfigManagerV2 --> ConfigLoader : uses
    ConfigManagerV2 --> LogManager : uses
    ConfigManagerV2 --> HealthAggregator : uses
    ConfigManagerV2 --> ControlChannelBridge : uses
    ConfigManagerV2 ..> ControlChannel : list of channels
    _RuntimeController ..> WorkerLoop : creates in _worker_loop
    TelegramControlChannel --|> ControlChannel : implements
    HttpControlChannel --|> ControlChannel : implements
    HttpControlChannel --> UvicornServerRunner : uses
    HttpControlChannel ..> HealthAggregator : health_provider
    ControlChannelBridge ..> ControlChannel : on_start, on_stop, on_status
```

## Logging

Logging is configured from the configuration file only if it contains a **`log`** section in [dictConfig](https://docs.python.org/3/library/logging.config.html#logging.config.dictConfig) format. If there is no `log` section, the manager writes a warning to the log and Python's default level (WARNING) stays in effect, so INFO/DEBUG messages may not be displayed.

**How to check that the logging configuration was applied:**

- Make sure the path to the config file is correct and the file is loaded at startup (the logs contain no config read error).
- Make sure the config has a `log` key with `version: 1`, `handlers`, and `loggers` (see `tests/config.yaml` for an example).
- After startup, an INFO-level message `"Logging configuration applied"` should appear in the log (from `config_manager.v2.core.log_manager`). If it does not, either the `log` section is missing (there will be a warning) or the root/package level is above INFO.
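
For orientation, a `log` section that satisfies the checklist above could look like the dict below once the config file has been parsed. The formatter name, handler name, and logger name are illustrative assumptions, not values taken from `tests/config.yaml`, and `apply_log_section` is a hypothetical helper that only mirrors the documented behaviour (skip when the section is missing).

```python
import logging
import logging.config

# Hypothetical content of the `log` section after parsing the config file.
LOG_SECTION = {
    "version": 1,
    "disable_existing_loggers": False,
    "formatters": {
        "plain": {"format": "%(asctime)s %(levelname)s %(name)s: %(message)s"},
    },
    "handlers": {
        "console": {
            "class": "logging.StreamHandler",
            "formatter": "plain",
            "level": "INFO",
        },
    },
    "loggers": {
        "config_manager": {
            "handlers": ["console"],
            "level": "INFO",
            "propagate": False,
        },
    },
}


def apply_log_section(config: dict) -> bool:
    """Apply config["log"] via dictConfig; return False if the section is missing."""
    section = config.get("log")
    if not section:
        return False
    logging.config.dictConfig(section)
    return True
```

Setting the `config_manager` logger to INFO is what makes the "Logging configuration applied" message visible in this sketch.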

## Installation

``pip install git+https://git.lesha.spb.ru/alex/config_manager.git``

## Contacts

## Contacts

- **e-mail**: lesha.spb@gmail.com
- **telegram**: https://t.me/lesha_spb

@@ -4,8 +4,8 @@ build-backend = "setuptools.build_meta"
|
||||
|
||||
[project]
|
||||
name = "config_manager"
|
||||
version = "2.0.0"
|
||||
description = "Config manager for building applications"
|
||||
version = "2.1.7"
|
||||
description = "Fix for an infinite loop on error"
|
||||
authors = [
|
||||
{ name = "Aleksei Zosimov", email = "lesha.spb@gmail.com" }
|
||||
]
|
||||
@@ -13,6 +13,8 @@ readme = "README.md"
|
||||
requires-python = ">=3.8"
|
||||
dependencies = [
|
||||
"PyYAML>=6.0",
|
||||
"fastapi>=0.100.0",
|
||||
"uvicorn[standard]>=0.22.0",
|
||||
]
|
||||
|
||||
[project.urls]
|
||||
|
||||
@@ -1,3 +1,2 @@
|
||||
from .v2.manager import ConfigManagerV2 as ConfigManager
|
||||
from .v1.cfg_manager import ConfigManager as LegacyConfigManager
|
||||
from .v1.log_manager import LogManager
|
||||
from .v2 import ConfigManagerV2 as ConfigManager
|
||||
from .v2.core.log_manager import LogManager
|
||||
@@ -1,137 +0,0 @@
|
||||
import logging
|
||||
import logging.config
|
||||
import asyncio
|
||||
import json
|
||||
import yaml
|
||||
import os
|
||||
from typing import Any, Optional
|
||||
|
||||
from .log_manager import LogManager
|
||||
|
||||
class ConfigManager:
|
||||
DEFAULT_UPDATE_INTERVAL = 5.0
|
||||
DEFAULT_WORK_INTERVAL = 2.0
|
||||
|
||||
def __init__(self, path: str, log_manager: Optional[LogManager] = None):
|
||||
self.path = path
|
||||
self.config: Any = None
|
||||
self._last_hash = None
|
||||
self.update_interval = self.DEFAULT_UPDATE_INTERVAL
|
||||
self.work_interval = self.DEFAULT_WORK_INTERVAL
|
||||
self._halt = asyncio.Event()
|
||||
self._task: Optional[asyncio.Task] = None
|
||||
self._loop: Optional[asyncio.AbstractEventLoop] = None
|
||||
|
||||
self._log_manager = log_manager or LogManager()
|
||||
self.logger = logging.getLogger(__name__)
|
||||
|
||||
def _read_file_sync(self) -> str:
|
||||
with open(self.path, "r", encoding="utf-8") as f:
|
||||
return f.read()
|
||||
|
||||
async def _read_file_async(self) -> str:
|
||||
return await asyncio.to_thread(self._read_file_sync)
|
||||
|
||||
def _parse_config(self, data) -> Any:
|
||||
extension = os.path.splitext(self.path)[1].lower()
|
||||
if extension in (".yaml", ".yml"):
|
||||
return yaml.safe_load(data)
|
||||
else:
|
||||
return json.loads(data)
|
||||
|
||||
def _update_intervals_from_config(self) -> None:
|
||||
if not self.config:
|
||||
return
|
||||
upd = self.config.get("update_interval")
|
||||
wrk = self.config.get("work_interval")
|
||||
|
||||
if isinstance(upd, (int, float)) and upd > 0:
|
||||
self.update_interval = float(upd)
|
||||
self.logger.info(f"Update interval set to {self.update_interval} seconds")
|
||||
else:
|
||||
self.update_interval = self.DEFAULT_UPDATE_INTERVAL
|
||||
|
||||
if isinstance(wrk, (int, float)) and wrk > 0:
|
||||
self.work_interval = float(wrk)
|
||||
self.logger.info(f"Work interval set to {self.work_interval} seconds")
|
||||
else:
|
||||
self.work_interval = self.DEFAULT_WORK_INTERVAL
|
||||
|
||||
async def _update_config(self) -> None:
|
||||
try:
|
||||
data = await self._read_file_async()
|
||||
current_hash = hash(data)
|
||||
if current_hash != self._last_hash:
|
||||
new_config = self._parse_config(data)
|
||||
self.config = new_config
|
||||
self._last_hash = current_hash
|
||||
|
||||
self._log_manager.apply_config(new_config)
|
||||
self._update_intervals_from_config()
|
||||
|
||||
except Exception as e:
|
||||
self.logger.error(f"Error reading/parsing config file: {e}")
|
||||
|
||||
def execute(self) -> None:
|
||||
"""
|
||||
Method to override in subclasses.
It may contain blocking work.
It runs in a separate thread.
|
||||
"""
|
||||
pass
|
||||
|
||||
async def _worker_loop(self) -> None:
|
||||
while not self._halt.is_set():
|
||||
await asyncio.to_thread(self.execute)
|
||||
await asyncio.sleep(self.work_interval)
|
||||
|
||||
async def _periodic_update_loop(self) -> None:
|
||||
while not self._halt.is_set():
|
||||
await self._update_config()
|
||||
await asyncio.sleep(self.update_interval)
|
||||
|
||||
async def _run(self) -> None:
"""Internal coroutine that runs all the loops."""
|
||||
self._halt.clear()
|
||||
self.logger.info("ConfigManager started")
|
||||
try:
|
||||
await asyncio.gather(
|
||||
self._worker_loop(),
|
||||
self._periodic_update_loop()
|
||||
)
|
||||
except asyncio.CancelledError:
|
||||
self.logger.info("ConfigManager tasks cancelled")
|
||||
finally:
|
||||
self.logger.info("ConfigManager stopped")
|
||||
|
||||
async def start(self) -> None:
|
||||
if self._task is not None and not self._task.done():
|
||||
self.logger.warning("ConfigManager is already running")
|
||||
return
|
||||
|
||||
try:
|
||||
self._loop = asyncio.get_running_loop()
|
||||
except RuntimeError:
|
||||
self.logger.error("start() must be called from within an async context")
|
||||
raise
|
||||
|
||||
self.logger.info("ConfigManager starting and awaiting _run()")
|
||||
await self._run()
|
||||
|
||||
|
||||
async def stop(self) -> None:
"""Stop the config manager and wait for it to finish."""
|
||||
if self._task is None:
|
||||
self.logger.warning("ConfigManager is not running")
|
||||
return
|
||||
|
||||
self.logger.info("ConfigManager stopping...")
|
||||
self._halt.set()
|
||||
|
||||
try:
|
||||
await self._task
|
||||
except asyncio.CancelledError:
|
||||
pass
|
||||
|
||||
self._task = None
|
||||
self.logger.info("ConfigManager stopped successfully")
|
||||
@@ -1,40 +0,0 @@
|
||||
import logging
|
||||
from typing import Optional
|
||||
|
||||
|
||||
class LogManager:
|
||||
"""
|
||||
Manages the application's logging configuration.
Applies the configuration from a dict, with error handling.
|
||||
"""
|
||||
|
||||
def __init__(self):
|
||||
self.logger = logging.getLogger(__name__)
|
||||
self._last_valid_config: Optional[dict] = None
|
||||
|
||||
def apply_config(self, config: dict) -> None:
|
||||
"""
|
||||
Apply the logging configuration from a dict.
On error, restore the last valid config.

Args:
config: Dict with logging settings (from the config file)
|
||||
"""
|
||||
logging_config = config.get("log")
|
||||
if not logging_config:
|
||||
return
|
||||
|
||||
try:
|
||||
logging.config.dictConfig(logging_config)
|
||||
self._last_valid_config = logging_config
|
||||
self.logger.info("Logging configuration applied")
|
||||
except Exception as e:
|
||||
self.logger.error(f"Error applying logging config: {e}")
|
||||
|
||||
# If there was a previous valid config, restore it
|
||||
if self._last_valid_config:
|
||||
try:
|
||||
logging.config.dictConfig(self._last_valid_config)
|
||||
self.logger.info("Previous logging configuration restored")
|
||||
except Exception as restore_error:
|
||||
self.logger.error(f"Error restoring previous config: {restore_error}")
|
||||
@@ -1,4 +1,6 @@
|
||||
from .manager import ConfigManagerV2
|
||||
from .types import HealthServerSettings
"""Public API: the entry point to the config manager.
|
||||
|
||||
__all__ = ["ConfigManagerV2", "HealthServerSettings"]
|
||||
Contract: subclass ConfigManagerV2, override execute(), control via the API (config.yaml, management section)."""
|
||||
from .core import ConfigManagerV2
|
||||
|
||||
__all__ = ["ConfigManagerV2"]
|
||||
|
||||
@@ -1,52 +0,0 @@
|
||||
from __future__ import annotations
|
||||
|
||||
import asyncio
|
||||
import hashlib
|
||||
import json
|
||||
import os
|
||||
from typing import Any, Optional
|
||||
|
||||
import yaml
|
||||
|
||||
|
||||
class ConfigLoader:
|
||||
def __init__(self, path: str):
|
||||
"""Initialize loader state for a specific config file path."""
|
||||
self.path = path
|
||||
self.config: Any = None
|
||||
self.last_valid_config: Any = None
|
||||
self._last_seen_hash: Optional[str] = None
|
||||
|
||||
def _read_file_sync(self) -> str:
|
||||
"""Read raw config text from disk synchronously."""
|
||||
with open(self.path, "r", encoding="utf-8") as fh:
|
||||
return fh.read()
|
||||
|
||||
async def read_file_async(self) -> str:
|
||||
"""Read raw config text from disk in a worker thread."""
|
||||
return await asyncio.to_thread(self._read_file_sync)
|
||||
|
||||
def parse_config(self, data: str) -> Any:
|
||||
"""Parse config text as YAML or JSON based on file extension."""
|
||||
extension = os.path.splitext(self.path)[1].lower()
|
||||
if extension in (".yaml", ".yml"):
|
||||
return yaml.safe_load(data)
|
||||
return json.loads(data)
|
||||
|
||||
@staticmethod
|
||||
def _calculate_hash(data: str) -> str:
|
||||
"""Calculate a stable content hash for change detection."""
|
||||
return hashlib.sha256(data.encode("utf-8")).hexdigest()
|
||||
|
||||
async def load_if_changed(self) -> tuple[bool, Any]:
|
||||
"""Load and parse config only when file content changed."""
|
||||
raw_data = await self.read_file_async()
|
||||
current_hash = self._calculate_hash(raw_data)
|
||||
if current_hash == self._last_seen_hash:
|
||||
return False, self.config
|
||||
|
||||
self._last_seen_hash = current_hash
|
||||
parsed = self.parse_config(raw_data)
|
||||
self.config = parsed
|
||||
self.last_valid_config = parsed
|
||||
return True, parsed
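
The README states that the loader returns the last valid config on a parse error, while the `load_if_changed()` shown in this hunk lets parse exceptions propagate. A minimal sketch of the fallback behaviour, assuming JSON input only and a hypothetical class name `FallbackLoader`, could look like this:

```python
import hashlib
import json
from typing import Any, Optional, Tuple


class FallbackLoader:
    """Hash-based change detection that keeps the last valid config (sketch)."""

    def __init__(self) -> None:
        self.last_valid_config: Any = None
        self._last_seen_hash: Optional[str] = None

    def load_if_changed(self, raw_data: str) -> Tuple[bool, Any]:
        current_hash = hashlib.sha256(raw_data.encode("utf-8")).hexdigest()
        if current_hash == self._last_seen_hash:
            return False, self.last_valid_config
        # Remember the hash even for broken content so it is not re-parsed
        # on every update cycle.
        self._last_seen_hash = current_hash
        try:
            parsed = json.loads(raw_data)
        except json.JSONDecodeError:
            # Broken file: report "no change" and keep serving the old config.
            return False, self.last_valid_config
        self.last_valid_config = parsed
        return True, parsed
```

Whether the real loader records the hash of a broken file is not visible in this diff; the sketch does so to avoid logging the same parse error on every cycle.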
|
||||
@@ -1,4 +1,8 @@
"""External control channels: the abstraction and its implementations (HTTP, Telegram).
|
||||
|
||||
Allows starting, stopping, and querying the status of the manager through the HTTP API and bots."""
|
||||
from .base import ControlChannel
|
||||
from .http_channel import HttpControlChannel
|
||||
from .telegram import TelegramControlChannel
|
||||
|
||||
__all__ = ["ControlChannel", "TelegramControlChannel"]
|
||||
__all__ = ["ControlChannel", "HttpControlChannel", "TelegramControlChannel"]
|
||||
|
||||
@@ -1,3 +1,6 @@
"""Base abstract control channel and command handler types.
|
||||
|
||||
Defines the contract: starting/stopping the channel and binding the start/stop/status handlers."""
|
||||
from __future__ import annotations
|
||||
|
||||
from abc import ABC, abstractmethod
|
||||
@@ -12,10 +15,10 @@ StatusHandler = Callable[[], Awaitable[str]]
|
||||
class ControlChannel(ABC):
|
||||
@abstractmethod
|
||||
async def start(self, on_start: StartHandler, on_stop: StopHandler, on_status: StatusHandler) -> None:
|
||||
"""Start channel and bind command handlers."""
"""Start the channel and bind the command handlers."""
|
||||
raise NotImplementedError
|
||||
|
||||
@abstractmethod
|
||||
async def stop(self) -> None:
|
||||
"""Stop channel and release its resources."""
"""Stop the channel and release its resources."""
|
||||
raise NotImplementedError
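
To illustrate the contract, here is a minimal sketch of a custom channel. The `ControlChannel` class below is a local copy so the example runs on its own, and `InMemoryControlChannel`, its `send()` method, and `demo()` are hypothetical names that do not exist in the package.

```python
import asyncio
from abc import ABC, abstractmethod
from typing import Awaitable, Callable, Dict, List

Handler = Callable[[], Awaitable[str]]


class ControlChannel(ABC):
    """Local copy of the contract so this sketch runs on its own."""

    @abstractmethod
    async def start(self, on_start: Handler, on_stop: Handler, on_status: Handler) -> None:
        raise NotImplementedError

    @abstractmethod
    async def stop(self) -> None:
        raise NotImplementedError


class InMemoryControlChannel(ControlChannel):
    """Hypothetical channel: commands arrive as plain strings via send()."""

    def __init__(self) -> None:
        self._handlers: Dict[str, Handler] = {}

    async def start(self, on_start: Handler, on_stop: Handler, on_status: Handler) -> None:
        # Bind the handlers supplied by the manager (or its bridge).
        self._handlers = {"start": on_start, "stop": on_stop, "status": on_status}

    async def stop(self) -> None:
        # Release the handlers so later commands are rejected.
        self._handlers = {}

    async def send(self, command: str) -> str:
        handler = self._handlers.get(command)
        if handler is None:
            return f"{command} handler is not configured"
        return await handler()


async def demo() -> List[str]:
    async def on_start() -> str:
        return "started"

    async def on_stop() -> str:
        return "stopped"

    async def on_status() -> str:
        return "running"

    channel = InMemoryControlChannel()
    await channel.start(on_start, on_stop, on_status)
    replies = [await channel.send("start"), await channel.send("status")]
    await channel.stop()
    replies.append(await channel.send("status"))
    return replies
```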
|
||||
|
||||
296
src/config_manager/v2/control/http_channel.py
Normal file
@@ -0,0 +1,296 @@
"""HTTP control channel: /health, /actions/start, /actions/stop, /actions/status (a ControlChannel implementation)."""
|
||||
from __future__ import annotations
|
||||
|
||||
import asyncio
|
||||
import json
|
||||
import logging
|
||||
import time
|
||||
from collections.abc import Awaitable, Callable
|
||||
from typing import Any, Optional
|
||||
|
||||
from fastapi import FastAPI
|
||||
from fastapi import Request
|
||||
from fastapi.responses import JSONResponse
|
||||
from uvicorn import Config, Server
|
||||
|
||||
from ..core.types import HealthPayload
|
||||
from .base import ControlChannel, StartHandler, StatusHandler, StopHandler
|
||||
|
||||
PATH_HEALTH = "/health"
|
||||
PATH_ACTION_START = "/actions/start"
|
||||
PATH_ACTION_STOP = "/actions/stop"
|
||||
PATH_ACTION_STATUS = "/actions/status"
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class UvicornServerRunner:
|
||||
"""Lifecycle wrapper around uvicorn Server."""
|
||||
|
||||
def __init__(self, host: str, port: int, timeout: int):
|
||||
self._host = host
|
||||
self._port = port
|
||||
self._timeout = timeout
|
||||
self._server: Optional[Server] = None
|
||||
self._serve_task: Optional[asyncio.Task[None]] = None
|
||||
self._bound_port: Optional[int] = None
|
||||
logger.debug(
|
||||
"UvicornServerRunner.__init__ result: host=%s port=%s timeout=%s",
|
||||
self._host,
|
||||
self._port,
|
||||
self._timeout,
|
||||
)
|
||||
|
||||
async def _raise_if_start_task_failed(self) -> None:
|
||||
if self._serve_task is None or not self._serve_task.done():
|
||||
return
|
||||
try:
|
||||
await self._serve_task
|
||||
except SystemExit as exc:
|
||||
raise RuntimeError(f"Management server exited during startup with code {exc.code}") from exc
|
||||
except Exception as exc: # noqa: BLE001
|
||||
raise RuntimeError("Management server failed during startup") from exc
|
||||
raise RuntimeError("Management server stopped unexpectedly during startup")
|
||||
|
||||
async def _wait_until_started(self) -> None:
|
||||
if self._server is None:
|
||||
raise RuntimeError("Management server is not initialized")
|
||||
loop = asyncio.get_running_loop()
|
||||
deadline = loop.time() + max(float(self._timeout), 1.0)
|
||||
while not self._server.started:
|
||||
await self._raise_if_start_task_failed()
|
||||
if loop.time() >= deadline:
|
||||
raise TimeoutError("Management server startup timed out")
|
||||
await asyncio.sleep(0.05)
|
||||
|
||||
def _resolve_bound_port(self) -> int:
|
||||
if self._server is None:
|
||||
return self._port
|
||||
servers = getattr(self._server, "servers", None)
|
||||
if not servers:
|
||||
return self._port
|
||||
sockets = getattr(servers[0], "sockets", None)
|
||||
if not sockets:
|
||||
return self._port
|
||||
sockname = sockets[0].getsockname()
|
||||
if isinstance(sockname, tuple) and len(sockname) > 1:
|
||||
return int(sockname[1])
|
||||
return self._port
|
||||
|
||||
async def _cleanup_start_failure(self) -> None:
|
||||
if self._server is not None:
|
||||
self._server.should_exit = True
|
||||
if self._serve_task is not None:
|
||||
try:
|
||||
await self._serve_task
|
||||
except BaseException: # noqa: BLE001
|
||||
logger.exception("UvicornServerRunner._cleanup_start_failure error")
|
||||
self._server = None
|
||||
self._serve_task = None
|
||||
self._bound_port = None
|
||||
logger.debug("UvicornServerRunner._cleanup_start_failure result: state reset")
|
||||
|
||||
async def start(self, app: FastAPI) -> None:
|
||||
if self._serve_task is not None and not self._serve_task.done():
|
||||
logger.debug("UvicornServerRunner.start result: already running")
|
||||
return
|
||||
if self._serve_task is not None and self._serve_task.done():
|
||||
self._serve_task = None
|
||||
try:
|
||||
config = Config(app=app, host=self._host, port=self._port, log_level="warning")
|
||||
self._server = Server(config)
|
||||
self._serve_task = asyncio.create_task(self._server.serve(), name="http-control-channel-serve")
|
||||
await self._wait_until_started()
|
||||
self._bound_port = self._resolve_bound_port()
|
||||
logger.debug(
|
||||
"UvicornServerRunner.start result: running host=%s requested_port=%s bound_port=%s",
|
||||
self._host,
|
||||
self._port,
|
||||
self._bound_port,
|
||||
)
|
||||
except Exception:
|
||||
logger.exception("UvicornServerRunner.start error")
|
||||
await self._cleanup_start_failure()
|
||||
raise
|
||||
|
||||
async def stop(self) -> None:
|
||||
if self._server is None or self._serve_task is None:
|
||||
logger.debug("UvicornServerRunner.stop result: already stopped")
|
||||
return
|
||||
self._server.should_exit = True
|
||||
try:
|
||||
await self._serve_task
|
||||
except BaseException: # noqa: BLE001
|
||||
logger.exception("UvicornServerRunner.stop error")
|
||||
raise
|
||||
finally:
|
||||
self._server = None
|
||||
self._serve_task = None
|
||||
self._bound_port = None
|
||||
logger.debug("UvicornServerRunner.stop result: stopped")
|
||||
|
||||
@property
|
||||
def port(self) -> int:
|
||||
result = self._bound_port if self._bound_port is not None else self._port
|
||||
logger.debug("UvicornServerRunner.port result: %s", result)
|
||||
return result
|
||||
|
||||
|
||||
class HttpControlChannel(ControlChannel):
"""HTTP API as a control channel: /health, /actions/start, /actions/stop, /actions/status."""
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
host: str,
|
||||
port: int,
|
||||
timeout: int,
|
||||
health_provider: Callable[[], Awaitable[HealthPayload]],
|
||||
):
|
||||
self._host = host
|
||||
self._port = port
|
||||
self._timeout = timeout
|
||||
self._health_provider = health_provider
|
||||
self._on_start: Optional[StartHandler] = None
|
||||
self._on_stop: Optional[StopHandler] = None
|
||||
self._on_status: Optional[StatusHandler] = None
|
||||
self._runner = UvicornServerRunner(host=host, port=port, timeout=timeout)
|
||||
self._app: Optional[FastAPI] = None
|
||||
logger.debug(
|
||||
"HttpControlChannel.__init__ result: host=%s port=%s timeout=%s",
|
||||
host,
|
||||
port,
|
||||
timeout,
|
||||
)
|
||||
|
||||
def _create_app(self) -> FastAPI:
|
||||
app = FastAPI(title="Config Manager Management API")
|
||||
|
||||
@app.middleware("http")
|
||||
async def log_api_call(request: Request, call_next): # type: ignore[no-untyped-def]
|
||||
started = time.monotonic()
|
||||
response = await call_next(request)
|
||||
duration_ms = int((time.monotonic() - started) * 1000)
|
||||
logger.info(
|
||||
"API call: method=%s path=%s status=%s duration_ms=%s",
|
||||
request.method,
|
||||
request.url.path,
|
||||
response.status_code,
|
||||
duration_ms,
|
||||
)
|
||||
return response
|
||||
|
||||
@app.get(PATH_HEALTH)
|
||||
async def health() -> JSONResponse:
|
||||
return await self._health_response()
|
||||
|
||||
@app.get(PATH_ACTION_START)
|
||||
@app.post(PATH_ACTION_START)
|
||||
async def action_start() -> JSONResponse:
|
||||
return await self._action_response("start", self._on_start)
|
||||
|
||||
@app.get(PATH_ACTION_STOP)
|
||||
@app.post(PATH_ACTION_STOP)
|
||||
async def action_stop() -> JSONResponse:
|
||||
return await self._action_response("stop", self._on_stop)
|
||||
|
||||
@app.get(PATH_ACTION_STATUS)
|
||||
@app.post(PATH_ACTION_STATUS)
|
||||
async def action_status() -> JSONResponse:
|
||||
return await self._action_response("status", self._on_status)
|
||||
|
||||
logger.debug(
|
||||
"HttpControlChannel._create_app result: routes=%s,%s,%s,%s",
|
||||
PATH_HEALTH,
|
||||
PATH_ACTION_START,
|
||||
PATH_ACTION_STOP,
|
||||
PATH_ACTION_STATUS,
|
||||
)
|
||||
return app
|
||||
|
||||
async def _health_response(self) -> JSONResponse:
|
||||
try:
|
||||
payload = await asyncio.wait_for(self._health_provider(), timeout=self._timeout)
|
||||
status_code = 200 if payload.get("status", "unhealthy") == "ok" else 503
|
||||
logger.debug(
|
||||
"HttpControlChannel._health_response result: status_code=%s payload=%s",
|
||||
status_code,
|
||||
payload,
|
||||
)
|
||||
return JSONResponse(content=payload, status_code=status_code)
|
||||
except Exception as exc: # noqa: BLE001
|
||||
logger.exception("HttpControlChannel._health_response error")
|
||||
return JSONResponse(content={"status": "unhealthy", "detail": str(exc)}, status_code=503)
|
||||
|
||||
async def _action_response(
|
||||
self,
|
||||
action: str,
|
||||
callback: Optional[Callable[[], Awaitable[str]]],
|
||||
) -> JSONResponse:
|
||||
if callback is None:
|
||||
logger.debug(
|
||||
"HttpControlChannel._action_response result: action=%s status_code=404 detail=handler not configured",
|
||||
action,
|
||||
)
|
||||
return JSONResponse(
|
||||
content={"status": "error", "detail": f"{action} handler is not configured"},
|
||||
status_code=404,
|
||||
)
|
||||
try:
|
||||
detail = await asyncio.wait_for(callback(), timeout=float(self._timeout))
|
||||
if not detail:
|
||||
detail = f"{action} action accepted"
|
||||
logger.debug(
|
||||
"HttpControlChannel._action_response result: action=%s status_code=200 detail=%s",
|
||||
action,
|
||||
detail,
|
||||
)
|
||||
return JSONResponse(content={"status": "ok", "detail": detail}, status_code=200)
|
||||
except asyncio.TimeoutError:
|
||||
logger.warning("HttpControlChannel._action_response timeout: action=%s timeout=%s", action, self._timeout)
|
||||
return JSONResponse(
|
||||
content={"status": "error", "detail": f"{action} handler did not respond within {self._timeout}s"},
|
||||
status_code=504,
|
||||
)
|
||||
except Exception as exc: # noqa: BLE001
|
||||
logger.exception("HttpControlChannel._action_response error: action=%s", action)
|
||||
return JSONResponse(content={"status": "error", "detail": str(exc)}, status_code=500)
|
||||
|
||||
async def start(
|
||||
self,
|
||||
on_start: StartHandler,
|
||||
on_stop: StopHandler,
|
||||
on_status: StatusHandler,
|
||||
) -> None:
|
||||
self._on_start = on_start
|
||||
self._on_stop = on_stop
|
||||
self._on_status = on_status
|
||||
self._app = self._create_app()
|
||||
try:
|
||||
await self._runner.start(self._app)
|
||||
logger.debug("HttpControlChannel.start result: started port=%s", self._runner.port)
|
||||
except Exception: # noqa: BLE001
|
||||
logger.exception("HttpControlChannel.start error")
|
||||
raise
|
||||
|
||||
async def stop(self) -> None:
|
||||
try:
|
||||
await self._runner.stop()
|
||||
logger.debug("HttpControlChannel.stop result: stopped")
|
||||
except BaseException: # noqa: BLE001
|
||||
logger.exception("HttpControlChannel.stop error")
|
||||
raise
|
||||
|
||||
@property
|
||||
def port(self) -> int:
|
||||
return self._runner.port
|
||||
|
||||
def _build_health_response(self) -> Awaitable[tuple[int, Any]]:
"""For tests: return (status_code, payload) without starting the server."""
|
||||
|
||||
async def _run() -> tuple[int, Any]:
|
||||
response = await self._health_response()
|
||||
body: Any = response.body
|
||||
if isinstance(body, bytes):
|
||||
body = json.loads(body.decode("utf-8"))
|
||||
return response.status_code, body
|
||||
|
||||
return _run()
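
The `/health` handling in this file boils down to: await the health provider with a timeout, answer 200 when the payload status is `"ok"`, and 503 otherwise. The standalone sketch below restates that rule without FastAPI. The function name `health_status` is hypothetical, and the separate timeout branch is an addition of this sketch: in the code above a timeout falls into the generic `except Exception` branch, where `str(exc)` may be empty.

```python
import asyncio
from typing import Any, Awaitable, Callable, Dict, Tuple

HealthProvider = Callable[[], Awaitable[Dict[str, Any]]]


async def health_status(provider: HealthProvider, timeout: float) -> Tuple[int, Dict[str, Any]]:
    """Map a health payload to an HTTP status code (200 for ok, 503 otherwise)."""
    try:
        payload = await asyncio.wait_for(provider(), timeout=timeout)
    except asyncio.TimeoutError:
        # Handled before the generic branch so the detail text is not empty.
        detail = f"health provider did not respond within {timeout}s"
        return 503, {"status": "unhealthy", "detail": detail}
    except Exception as exc:
        return 503, {"status": "unhealthy", "detail": str(exc)}
    code = 200 if payload.get("status", "unhealthy") == "ok" else 503
    return code, payload
```

Returning a plain tuple keeps the rule testable without starting a server, which is the same idea as the `_build_health_response()` helper above.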
|
||||
@@ -1,3 +1,6 @@
"""Control channel implementation based on the Telegram Bot API (long polling).
|
||||
|
||||
Accepts the /start, /stop, /status commands in the given chat and calls the bound handlers."""
|
||||
from __future__ import annotations
|
||||
|
||||
import asyncio
|
||||
@@ -15,10 +18,10 @@ class TelegramControlChannel(ControlChannel):
|
||||
self,
|
||||
token: str,
|
||||
chat_id: int,
|
||||
poll_interval: float = 2.0,
|
||||
poll_interval: int = 2,
|
||||
logger: Optional[logging.Logger] = None,
|
||||
):
|
||||
"""Initialize Telegram polling channel with bot and chat settings."""
|
||||
"""Инициализация канала опроса Telegram с настройками бота и чата."""
|
||||
self._token = token
|
||||
self._chat_id = chat_id
|
||||
self._poll_interval = poll_interval
|
||||
@@ -31,7 +34,7 @@ class TelegramControlChannel(ControlChannel):
         self._logger = logger or logging.getLogger(__name__)

     async def start(self, on_start: StartHandler, on_stop: StopHandler, on_status: StatusHandler) -> None:
-        """Start polling Telegram updates and register command callbacks."""
+        """Start polling for Telegram updates and register the command callbacks."""
         if self._task is not None and not self._task.done():
             return
         self._on_start = on_start
@@ -41,7 +44,7 @@ class TelegramControlChannel(ControlChannel):
         self._task = asyncio.create_task(self._poll_loop())

     async def stop(self) -> None:
-        """Stop polling loop and wait until task termination."""
+        """Stop the polling loop and wait for the task to finish."""
         self._stop_event.set()
         if self._task is not None:
             self._task.cancel()
@@ -52,7 +55,7 @@ class TelegramControlChannel(ControlChannel):
             self._task = None

     async def _poll_loop(self) -> None:
-        """Continuously fetch updates and dispatch supported commands."""
+        """Continuously fetch updates and invoke the supported commands."""
         while not self._stop_event.is_set():
             try:
                 updates = await asyncio.to_thread(self._fetch_updates)
@@ -67,7 +70,7 @@ class TelegramControlChannel(ControlChannel):
                 continue

     def _fetch_updates(self) -> list[dict]:
-        """Pull new Telegram updates using the latest offset."""
+        """Request new Telegram updates, taking the latest offset into account."""
         params = {"timeout": 0}
         if self._offset is not None:
             params["offset"] = self._offset
@@ -82,7 +85,7 @@ class TelegramControlChannel(ControlChannel):
         return result

     async def _process_update(self, update: dict) -> None:
-        """Handle one Telegram update and execute mapped command."""
+        """Process a single Telegram update and run the corresponding command."""
         message = update.get("message") or {}
         text = (message.get("text") or "").strip().lower()
         chat = message.get("chat") or {}
@@ -103,7 +106,7 @@ class TelegramControlChannel(ControlChannel):
         await asyncio.to_thread(self._send_message, reply)

     def _send_message(self, text: str) -> None:
-        """Send plain-text reply to the configured Telegram chat."""
+        """Send a text reply to the configured Telegram chat."""
         encoded = urllib.parse.urlencode({"chat_id": self._chat_id, "text": text})
         url = f"https://api.telegram.org/bot{self._token}/sendMessage"
         req = urllib.request.Request(url, data=encoded.encode("utf-8"), method="POST")
11
src/config_manager/v2/core/__init__.py
Normal file
@@ -0,0 +1,11 @@
"""V2 core: manager lifecycle, config loading, and the worker loop.

Contains ConfigManagerV2, the configuration loader, and the scheduler that repeatedly runs execute()."""
from .config_loader import ConfigLoader
from .control_bridge import ControlChannelBridge
from .health_aggregator import HealthAggregator
from .log_manager import LogManager
from .manager import ConfigManagerV2
from .scheduler import WorkerLoop

__all__ = ["ConfigLoader", "ConfigManagerV2", "ControlChannelBridge", "HealthAggregator", "LogManager", "WorkerLoop"]
106
src/config_manager/v2/core/config_loader.py
Normal file
@@ -0,0 +1,106 @@
"""Configuration loader for a file (YAML/JSON) with hash-based change detection.

Reads the file synchronously or asynchronously, parses it by extension, and keeps the last valid config so callers can fall back to it on errors."""
from __future__ import annotations

import asyncio
import hashlib
import json
import logging
import os
from typing import Any, Optional

import yaml

logger = logging.getLogger(__name__)


class ConfigLoader:
    def __init__(self, path: str):
        """Initialize loader state for the given config file path."""
        self.path = path
        self.config: Any = None
        self.last_valid_config: Any = None
        self._last_seen_hash: Optional[str] = None
        logger.debug("ConfigLoader.__init__ result: path=%s", self.path)

    def _read_file_sync(self) -> str:
        """Synchronously read the raw config text from disk."""
        with open(self.path, "r", encoding="utf-8") as fh:
            data = fh.read()
        logger.debug("ConfigLoader._read_file_sync result: bytes=%s", len(data))
        return data

    async def read_file_async(self) -> str:
        """Read the raw config text from disk in a worker thread."""
        result = await asyncio.to_thread(self._read_file_sync)
        logger.debug("ConfigLoader.read_file_async result: bytes=%s", len(result))
        return result

    def parse_config(self, data: str) -> Any:
        """Parse the config text as YAML or JSON, depending on the file extension."""
        extension = os.path.splitext(self.path)[1].lower()
        try:
            if extension in (".yaml", ".yml"):
                result = yaml.safe_load(data)
            else:
                result = json.loads(data)
        except Exception:  # noqa: BLE001
            logger.exception("ConfigLoader.parse_config error: extension=%s", extension)
            raise
        logger.debug(
            "ConfigLoader.parse_config result: extension=%s type=%s",
            extension,
            type(result).__name__,
        )
        return result

    @staticmethod
    def _calculate_hash(data: str) -> str:
        """Compute a stable content hash for change detection."""
        result = hashlib.sha256(data.encode("utf-8")).hexdigest()
        logger.debug("ConfigLoader._calculate_hash result: hash=%s", result)
        return result

    def load_sync(self) -> Any:
        """Synchronously load and parse the config once (for reading settings during initialization)."""
        try:
            raw_data = self._read_file_sync()
            parsed = self.parse_config(raw_data)
            self.config = parsed
            self.last_valid_config = parsed
            logger.debug("ConfigLoader.load_sync result: loaded")
            return parsed
        except Exception:  # noqa: BLE001
            logger.exception("ConfigLoader.load_sync error")
            return None

    async def load_if_changed(self) -> tuple[bool, Any]:
        """Load and parse the config only when the file content has changed."""
        raw_data = await self.read_file_async()
        current_hash = self._calculate_hash(raw_data)
        if current_hash == self._last_seen_hash:
            logger.debug("ConfigLoader.load_if_changed result: changed=False")
            return False, self.config

        self._last_seen_hash = current_hash
        parsed = self.parse_config(raw_data)
        self.config = parsed
        self.last_valid_config = parsed
        logger.debug("ConfigLoader.load_if_changed result: changed=True")
        return True, parsed


def extract_scheduler_intervals(
    config: Any,
    default_update: float,
    default_work: float,
) -> tuple[float, float]:
    """Extract update_interval and work_interval from the config (dict).

    Falls back to the default for any value that is missing, non-numeric, or not positive."""
    if not isinstance(config, dict):
        return default_update, default_work
    upd = config.get("update_interval")
    wrk = config.get("work_interval")
    u = float(upd) if isinstance(upd, (int, float)) and upd > 0 else default_update
    w = float(wrk) if isinstance(wrk, (int, float)) and wrk > 0 else default_work
    return u, w
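The change-detection step in `load_if_changed()` can be illustrated in isolation. The sketch below is not the package's code: `HashChangeDetector` and `feed()` are hypothetical names, and it parses JSON only so that it needs no third-party dependency. It assumes the same ordering as the loader above, where the digest is recorded before parsing.

```python
import hashlib
import json
from typing import Any, Optional


class HashChangeDetector:
    """Hypothetical helper: parse text only when its SHA-256 digest changes."""

    def __init__(self) -> None:
        self.config: Any = None
        self._last_seen_hash: Optional[str] = None

    def feed(self, raw: str) -> tuple[bool, Any]:
        digest = hashlib.sha256(raw.encode("utf-8")).hexdigest()
        if digest == self._last_seen_hash:
            # Same content as last time: skip parsing and return the cached config.
            return False, self.config
        # The digest is stored before parsing, so broken content raises once
        # and is reported as "unchanged" on the next identical read.
        self._last_seen_hash = digest
        self.config = json.loads(raw)
        return True, self.config


detector = HashChangeDetector()
first = detector.feed('{"work_interval": 2}')
second = detector.feed('{"work_interval": 2}')
third = detector.feed('{"work_interval": 3}')
```

With this ordering a malformed file is reported as an error only once per distinct content, while the previously parsed config stays available to the caller.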
54
src/config_manager/v2/core/control_bridge.py
Normal file
@@ -0,0 +1,54 @@
"""Adapter between control channels and the manager lifecycle.

Provides start/stop/status handlers for ControlChannel (halt, state, status text)."""
from __future__ import annotations

import logging
from collections.abc import Awaitable, Callable

from .types import LifecycleState

logger = logging.getLogger(__name__)


class ControlChannelBridge:
    """Exposes halt and status as start/stop/status handlers for a ControlChannel (for example, Telegram)."""

    def __init__(
        self,
        get_state: Callable[[], LifecycleState],
        get_status: Callable[[], Awaitable[str]],
        start_runtime: Callable[[], Awaitable[str]],
        stop_runtime: Callable[[], Awaitable[str]],
    ):
        self._get_state = get_state
        self._get_status = get_status
        self._start_runtime = start_runtime
        self._stop_runtime = stop_runtime
        logger.debug("ControlChannelBridge.__init__ result: callbacks configured")

    async def on_start(self) -> str:
        """Handle an external start request via the manager's lifecycle method."""
        if self._get_state() == LifecycleState.RUNNING:
            result = "already running"
            logger.debug("ControlChannelBridge.on_start result: %s", result)
            return result
        result = await self._start_runtime()
        logger.debug("ControlChannelBridge.on_start result: %s", result)
        return result

    async def on_stop(self) -> str:
        """Handle an external stop request via the manager's lifecycle method."""
        result = await self._stop_runtime()
        logger.debug("ControlChannelBridge.on_stop result: %s", result)
        return result

    async def on_status(self) -> str:
        """Return the current status text."""
        try:
            result = await self._get_status()
        except Exception:  # noqa: BLE001
            logger.exception("ControlChannelBridge.on_status error")
            raise
        logger.debug("ControlChannelBridge.on_status result: %s", result)
        return result
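To show how a channel might consume the three handlers that the bridge provides, here is a minimal in-memory sketch. `InMemoryChannel`, its `send()` method, and the toy handlers in `demo()` are hypothetical and exist only for illustration; the only thing taken from the diff is the `start(on_start, on_stop, on_status)` / `stop()` shape and the reply strings.

```python
import asyncio
from collections.abc import Awaitable, Callable

Handler = Callable[[], Awaitable[str]]


class InMemoryChannel:
    """Hypothetical channel that maps text commands to the three handlers."""

    def __init__(self) -> None:
        self._handlers: dict[str, Handler] = {}

    async def start(self, on_start: Handler, on_stop: Handler, on_status: Handler) -> None:
        self._handlers = {"/start": on_start, "/stop": on_stop, "/status": on_status}

    async def stop(self) -> None:
        self._handlers = {}

    async def send(self, command: str) -> str:
        handler = self._handlers.get(command)
        if handler is None:
            return "unknown command"
        return await handler()


async def demo() -> list[str]:
    running = False  # stand-in for the manager's lifecycle state

    async def on_start() -> str:
        nonlocal running
        if running:
            return "already running"
        running = True
        return "start signal accepted"

    async def on_stop() -> str:
        nonlocal running
        if not running:
            return "already stopped"
        running = False
        return "stop signal accepted"

    async def on_status() -> str:
        return f"running={running}"

    channel = InMemoryChannel()
    await channel.start(on_start, on_stop, on_status)
    commands = ("/start", "/start", "/status", "/stop", "/help")
    replies = [await channel.send(command) for command in commands]
    await channel.stop()
    return replies


replies = asyncio.run(demo())
```

Because every handler returns a plain string, the same callbacks can back a chat bot, an HTTP endpoint, or a test double like this one.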
71
src/config_manager/v2/core/health_aggregator.py
Normal file
@@ -0,0 +1,71 @@
"""Combines lifecycle state and health into a single response for /health.

Healthy means execute() succeeded within the last health_timeout seconds; otherwise unhealthy with a detail (error or timeout)."""
from __future__ import annotations

import logging
import time
from collections.abc import Callable

from .types import HealthPayload, LifecycleState

logger = logging.getLogger(__name__)


class HealthAggregator:
    """Builds the health response from the time of the last successful execute() and the timeout."""

    def __init__(
        self,
        get_state: Callable[[], LifecycleState],
        get_last_error: Callable[[], str | None],
        get_last_success_timestamp: Callable[[], float | None],
        health_timeout: int,
        get_app_health: Callable[[], HealthPayload],
    ):
        self._get_state = get_state
        self._get_last_error = get_last_error
        self._get_last_success_timestamp = get_last_success_timestamp
        self._health_timeout = health_timeout
        self._get_app_health = get_app_health
        logger.debug("HealthAggregator.__init__ result: health_timeout=%s", self._health_timeout)

    async def collect(self) -> HealthPayload:
        """Return ok if execute() succeeded within the last health_timeout seconds; otherwise unhealthy, or degraded when the app reports it. The state is always included."""
        state = self._get_state()
        state_value = state.value

        # status=ok is only possible when state=RUNNING; while stopping (STOPPING/STOPPED) the result is immediately unhealthy.
        if state != LifecycleState.RUNNING:
            result = {"status": "unhealthy", "detail": f"state={state_value}", "state": state_value}
            logger.debug("HealthAggregator.collect result: %s", result)
            return result

        last_success = self._get_last_success_timestamp()
        now = time.monotonic()

        if last_success is None:
            detail = self._get_last_error() or "no successful run yet"
            result = {"status": "unhealthy", "detail": detail, "state": state_value}
            logger.debug("HealthAggregator.collect result: %s", result)
            return result

        if (now - last_success) > self._health_timeout:
            detail = self._get_last_error() or f"no successful run within {self._health_timeout}s"
            result = {"status": "unhealthy", "detail": detail, "state": state_value}
            logger.debug("HealthAggregator.collect result: %s", result)
            return result

        result = self._get_app_health()
        status = result.get("status", "unhealthy")
        if status == "degraded":
            degraded = {"status": "degraded", "detail": result.get("detail", "app degraded"), "state": state_value}
            logger.debug("HealthAggregator.collect result: %s", degraded)
            return degraded
        if status != "ok":
            unhealthy = {"status": "unhealthy", "detail": result.get("detail", "app reported non-ok"), "state": state_value}
            logger.debug("HealthAggregator.collect result: %s", unhealthy)
            return unhealthy
        healthy = {**result, "state": state_value}
        logger.debug("HealthAggregator.collect result: %s", healthy)
        return healthy
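The freshness rule behind `collect()` can be reduced to a pure function, which makes the boundary cases easy to check. This is a simplified stand-in, not the aggregator itself: `evaluate_health` and its injectable `now` clock are assumptions made for the example, and the app-level `degraded` branch is left out.

```python
import time
from typing import Callable, Optional


def evaluate_health(
    running: bool,
    last_success: Optional[float],
    health_timeout: float,
    last_error: Optional[str] = None,
    now: Callable[[], float] = time.monotonic,
) -> dict:
    """Simplified freshness check: ok only if a success happened recently enough."""
    if not running:
        return {"status": "unhealthy", "detail": "not running"}
    if last_success is None:
        return {"status": "unhealthy", "detail": last_error or "no successful run yet"}
    if now() - last_success > health_timeout:
        detail = last_error or f"no successful run within {health_timeout}s"
        return {"status": "unhealthy", "detail": detail}
    return {"status": "ok"}


# A fixed clock makes the outcome deterministic.
fresh = evaluate_health(True, last_success=100.0, health_timeout=30, now=lambda: 120.0)
stale = evaluate_health(True, last_success=100.0, health_timeout=30, now=lambda: 131.0)
```

Timestamps are compared on the monotonic clock, as in the diff, so wall-clock adjustments cannot flip the result.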
38
src/config_manager/v2/core/log_manager.py
Normal file
@@ -0,0 +1,38 @@
"""Applies a logging configuration from a dictionary (the `log` section of config.yaml).

Manages the application's logging configuration through dictConfig and restores the last valid config on error."""
from __future__ import annotations

import logging
import logging.config
from typing import Optional


class LogManager:
    """Applies the `log` section of the config to logging (dictConfig). On error, restores the last valid config."""

    def __init__(self) -> None:
        self.logger = logging.getLogger(__name__)
        self._last_valid_config: Optional[dict] = None

    def apply_config(self, config: dict) -> None:
        """Apply the logging configuration from a dictionary. On error, restore the last valid config."""
        logging_config = config.get("log")
        if not logging_config:
            self.logger.warning(
                "Config has no 'log' section; logging parameters from config are not applied (default level may be WARNING)."
            )
            return

        try:
            logging.config.dictConfig(logging_config)
            self._last_valid_config = logging_config
            self.logger.info("Logging configuration applied")
        except Exception as e:
            self.logger.error("Error applying logging config: %s", e)
            if self._last_valid_config:
                try:
                    logging.config.dictConfig(self._last_valid_config)
                    self.logger.info("Previous logging configuration restored")
                except Exception as restore_error:
                    self.logger.error("Error restoring previous config: %s", restore_error)
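For reference, a `log` section that `apply_config()` could pass to `dictConfig` might look like the following. The formatter and handler names (`plain`, `console`) and the chosen levels are illustrative assumptions, not values taken from this repository; only the standard `logging.config.dictConfig` schema is relied on.

```python
import logging
import logging.config

# Hypothetical parsed config; only the "log" key is read by the log manager.
config = {
    "log": {
        "version": 1,
        "disable_existing_loggers": False,
        "formatters": {"plain": {"format": "%(levelname)s %(name)s: %(message)s"}},
        "handlers": {
            "console": {"class": "logging.StreamHandler", "formatter": "plain", "level": "DEBUG"}
        },
        "root": {"level": "INFO", "handlers": ["console"]},
    }
}

logging.config.dictConfig(config["log"])
root_level = logging.getLogger().level
```

Setting `disable_existing_loggers` to `False` keeps module-level loggers created before the reload active, which matters when the section is re-applied on every config change.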
432
src/config_manager/v2/core/manager.py
Normal file
@@ -0,0 +1,432 @@
"""Config manager v2: runtime orchestration and configuration updates."""
from __future__ import annotations

import asyncio
import logging
import os
import time
from collections.abc import Callable, Iterable
from typing import Any, Optional, Union

from ..control.base import ControlChannel
from .config_loader import ConfigLoader
from .control_bridge import ControlChannelBridge
from .health_aggregator import HealthAggregator
from .log_manager import LogManager
from .scheduler import WorkerLoop
from .types import HealthPayload, LifecycleState

logger = logging.getLogger(__name__)


def _read_env_interval(name: str, default_value: float) -> float:
    """Read positive float interval from env."""
    raw_value = os.environ.get(name)
    if raw_value is None:
        return float(default_value)
    try:
        parsed = float(raw_value)
        if parsed <= 0:
            raise ValueError(f"{name} must be greater than zero")
        return parsed
    except Exception:  # noqa: BLE001
        logger.exception(
            "ConfigManagerV2 interval parse error: env=%s raw_value=%s fallback=%s",
            name,
            raw_value,
            default_value,
        )
        return float(default_value)


def _read_env_optional_float(name: str, default_value: Optional[float]) -> Optional[float]:
    """Read optional non-negative float from env (0 or missing => default_value)."""
    raw_value = os.environ.get(name)
    if raw_value is None:
        return default_value
    try:
        parsed = float(raw_value)
        if parsed < 0:
            logger.warning("ConfigManagerV2 %s must be >= 0, got %s; using default %s", name, parsed, default_value)
            return default_value
        return parsed if parsed > 0 else default_value
    except Exception:  # noqa: BLE001
        logger.exception("ConfigManagerV2 %s parse error: raw_value=%s fallback=%s", name, raw_value, default_value)
        return default_value


class _RuntimeController:
    """Runtime loops and lifecycle supervision."""

    CONTROL_CHANNEL_TIMEOUT = 5.0

    def _on_execute_success(self) -> None:
        self._last_success_timestamp = time.monotonic()
        self._last_execute_error = None
        self.logger.debug(
            "ConfigManagerV2._on_execute_success result: last_success_timestamp=%s",
            self._last_success_timestamp,
        )

    def _on_execute_error(self, exc: Exception) -> None:
        self._last_execute_error = str(exc)
        self.logger.error(
            "ConfigManagerV2._on_execute_error: %s",
            self._last_execute_error,
            exc_info=(type(exc), exc, exc.__traceback__),
        )

    def _on_worker_degraded_change(self, degraded: bool) -> None:
        self._worker_degraded = degraded
        self.logger.warning("ConfigManagerV2._on_worker_degraded_change result: degraded=%s", degraded)

    def _on_worker_metrics_change(self, inflight_count: int, timed_out_count: int) -> None:
        self._worker_inflight_count = inflight_count
        self._worker_timed_out_inflight_count = timed_out_count
        self.logger.debug(
            "ConfigManagerV2._on_worker_metrics_change result: inflight=%s timed_out_inflight=%s",
            inflight_count,
            timed_out_count,
        )

    async def _worker_loop(self) -> None:
        self.logger.warning(
            "ConfigManagerV2._worker_loop result: started work_interval=%s",
            self.work_interval,
        )
        worker = WorkerLoop(
            execute=self.execute,
            get_interval=lambda: self.work_interval,
            halt_event=self._halt,
            on_error=self._on_execute_error,
            on_success=self._on_execute_success,
            execute_timeout=self._execute_timeout,
            on_degraded_change=self._on_worker_degraded_change,
            on_metrics_change=self._on_worker_metrics_change,
        )
        try:
            await worker.run()
            self.logger.debug("ConfigManagerV2._worker_loop result: completed")
        finally:
            self.logger.warning("ConfigManagerV2._worker_loop result: stopped")

    async def _periodic_update_loop(self) -> None:
        self.logger.warning(
            "ConfigManagerV2._periodic_update_loop result: started update_interval=%s",
            self.update_interval,
        )
        try:
            while not self._halt.is_set():
                await self._update_config()
                try:
                    await asyncio.wait_for(self._halt.wait(), timeout=max(self.update_interval, 0.05))
                except asyncio.TimeoutError:
                    continue
        finally:
            self.logger.warning("ConfigManagerV2._periodic_update_loop result: stopped")

    async def _status_text(self) -> str:
        health = await self._health_aggregator.collect()
        detail = health.get("detail")
        worker_tail = (
            f"worker_inflight={self._worker_inflight_count}; "
            f"worker_timed_out_inflight={self._worker_timed_out_inflight_count}"
        )
        if detail:
            status_text = f"state={self._state.value}; health={health['status']}; detail={detail}; {worker_tail}"
            self.logger.debug("ConfigManagerV2._status_text result: %s", status_text)
            return status_text
        status_text = f"state={self._state.value}; health={health['status']}; {worker_tail}"
        self.logger.debug("ConfigManagerV2._status_text result: %s", status_text)
        return status_text

    async def _start_control_channels(self) -> None:
        for channel in self._control_channels:
            try:
                await asyncio.wait_for(
                    channel.start(
                        self._control_bridge.on_start,
                        self._control_bridge.on_stop,
                        self._control_bridge.on_status,
                    ),
                    timeout=self.CONTROL_CHANNEL_TIMEOUT,
                )
                self.logger.debug("ConfigManagerV2._start_control_channels result: started channel=%s", type(channel).__name__)
            except asyncio.TimeoutError:
                self.logger.error(
                    "ConfigManagerV2._start_control_channels timeout channel=%s timeout=%ss",
                    type(channel).__name__,
                    self.CONTROL_CHANNEL_TIMEOUT,
                )
            except Exception:  # noqa: BLE001
                self.logger.exception("ConfigManagerV2._start_control_channels error channel=%s", type(channel).__name__)

    async def _stop_control_channels(self) -> None:
        for channel in self._control_channels:
            try:
                await asyncio.wait_for(channel.stop(), timeout=self.CONTROL_CHANNEL_TIMEOUT)
                self.logger.debug("ConfigManagerV2._stop_control_channels result: stopped channel=%s", type(channel).__name__)
            except asyncio.TimeoutError:
                self.logger.error(
                    "ConfigManagerV2._stop_control_channels timeout channel=%s timeout=%ss",
                    type(channel).__name__,
                    self.CONTROL_CHANNEL_TIMEOUT,
                )
            except Exception:  # noqa: BLE001
                self.logger.exception("ConfigManagerV2._stop_control_channels error channel=%s", type(channel).__name__)

    async def _run_runtime_loops(self) -> None:
        self._state = LifecycleState.RUNNING
        self.logger.debug("ConfigManagerV2._run_runtime_loops result: state=%s", self._state.value)
        tasks = [
            asyncio.create_task(self._worker_loop(), name="v2-worker-loop"),
            asyncio.create_task(self._periodic_update_loop(), name="v2-config-loop"),
        ]
        try:
            await asyncio.gather(*tasks)
            self.logger.debug("ConfigManagerV2._run_runtime_loops result: loops completed")
        except asyncio.CancelledError:
            self.logger.debug("ConfigManagerV2._run_runtime_loops result: cancelled")
            raise
        except Exception:  # noqa: BLE001
            self.logger.exception("ConfigManagerV2._run_runtime_loops error")
            raise
        finally:
            for task in tasks:
                task.cancel()
            await asyncio.gather(*tasks, return_exceptions=True)
            self._state = LifecycleState.STOPPED
            self.logger.debug("ConfigManagerV2._run_runtime_loops result: state=%s", self._state.value)

    async def _start_runtime(self) -> str:
        if self._shutdown.is_set():
            result = "manager is shutting down"
            self.logger.debug("ConfigManagerV2._start_runtime result: %s", result)
            return result

        async with self._runtime_lock:
            if self._runtime_task is not None and not self._runtime_task.done():
                result = "already running"
                self.logger.debug("ConfigManagerV2._start_runtime result: %s", result)
                return result
            self._halt.clear()
            self._state = LifecycleState.STARTING
            self._runtime_task = asyncio.create_task(self._run_runtime_loops(), name="config-manager-v2-runtime")
            self._runtime_task.add_done_callback(self._on_runtime_task_done)

        result = "start signal accepted"
        self.logger.debug("ConfigManagerV2._start_runtime result: %s", result)
        return result

    async def _stop_runtime(self) -> str:
        self._halt.set()
        async with self._runtime_lock:
            runtime_task = self._runtime_task
        if runtime_task is None:
            result = "already stopped"
            self.logger.debug("ConfigManagerV2._stop_runtime result: %s", result)
            return result
        if runtime_task.done():
            result = "already stopped"
            self.logger.debug("ConfigManagerV2._stop_runtime result: %s", result)
            return result
        result = "stop signal accepted"
        self.logger.debug("ConfigManagerV2._stop_runtime result: %s", result)
        return result

    def _on_runtime_task_done(self, task: asyncio.Task[None]) -> None:
        if task.cancelled():
            self.logger.debug("ConfigManagerV2._on_runtime_task_done result: cancelled")
            return
        try:
            exc = task.exception()
        except Exception:  # noqa: BLE001
            self.logger.exception("ConfigManagerV2._on_runtime_task_done error while reading task exception")
            return
        if exc is None:
            self.logger.debug("ConfigManagerV2._on_runtime_task_done result: completed")
            return
        self.logger.error(
            "ConfigManagerV2 runtime task failed",
            exc_info=(type(exc), exc, exc.__traceback__),
        )

    async def _run(self) -> None:
        self._shutdown.clear()
        self._halt.clear()
        self._state = LifecycleState.STARTING
        self.logger.debug("ConfigManagerV2._run result: state=%s", self._state.value)
        await self._update_config()
        await self._start_control_channels()
        await self._start_runtime()

        try:
            await self._shutdown.wait()
        finally:
            self._state = LifecycleState.STOPPING
            self.logger.debug("ConfigManagerV2._run result: state=%s", self._state.value)
            self._halt.set()

            async with self._runtime_lock:
                runtime_task = self._runtime_task
            if runtime_task is not None:
                try:
                    await runtime_task
                except asyncio.CancelledError:
                    self.logger.debug("ConfigManagerV2._run result: runtime task cancelled")
                except Exception:  # noqa: BLE001
                    self.logger.exception("ConfigManagerV2._run error while awaiting runtime task")

            await self._stop_control_channels()
            self._runtime_task = None
            self._state = LifecycleState.STOPPED
            self._task = None
            self.logger.debug("ConfigManagerV2._run result: state=%s", self._state.value)


class ConfigManagerV2(_RuntimeController):
|
||||
"""Public manager API. Каналы управления задаются снаружи через control_channels."""
|
||||
|
||||
DEFAULT_UPDATE_INTERVAL = 5
|
||||
DEFAULT_WORK_INTERVAL = 2
|
||||
DEFAULT_HEALTH_TIMEOUT = 30
|
||||
DEFAULT_EXECUTE_TIMEOUT = 600.0
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
path: str,
|
||||
control_channels: Optional[
|
||||
Union[Iterable[ControlChannel], Callable[["ConfigManagerV2"], Iterable[ControlChannel]]]
|
||||
] = None,
|
||||
):
|
||||
self.logger = logging.getLogger(__name__)
|
||||
self.path = path
|
||||
self.config: Any = None
|
||||
self.update_interval = _read_env_interval("UPDATE_INTERVAL", float(self.DEFAULT_UPDATE_INTERVAL))
|
||||
self.work_interval = _read_env_interval("WORK_INTERVAL", float(self.DEFAULT_WORK_INTERVAL))
|
||||
self._execute_timeout = _read_env_optional_float("EXECUTE_TIMEOUT", float(self.DEFAULT_EXECUTE_TIMEOUT))
|
||||
|
||||
self._loader = ConfigLoader(path)
|
||||
self._log_manager = LogManager()
|
||||
self._halt = asyncio.Event()
|
||||
self._shutdown = asyncio.Event()
|
||||
self._task: Optional[asyncio.Task[None]] = None
|
||||
self._runtime_task: Optional[asyncio.Task[None]] = None
|
||||
self._loop: Optional[asyncio.AbstractEventLoop] = None
|
||||
self._runtime_lock = asyncio.Lock()
|
||||
self._state = LifecycleState.IDLE
|
||||
self._last_execute_error: Optional[str] = None
|
||||
self._last_success_timestamp: Optional[float] = None
|
||||
self._worker_degraded = False
|
||||
self._worker_inflight_count = 0
|
||||
self._worker_timed_out_inflight_count = 0
|
||||
|
||||
initial_config = self._loader.load_sync()
|
||||
self.config = initial_config
|
||||
self._health_timeout = self.DEFAULT_HEALTH_TIMEOUT
|
||||
self._health_aggregator = HealthAggregator(
|
||||
get_state=lambda: self._state,
|
||||
get_last_error=lambda: self._last_execute_error,
|
||||
get_last_success_timestamp=lambda: self._last_success_timestamp,
|
||||
health_timeout=self._health_timeout,
|
||||
get_app_health=self.get_health_status,
|
||||
)
|
||||
self._control_bridge = ControlChannelBridge(
|
||||
get_state=lambda: self._state,
|
||||
get_status=self._status_text,
|
||||
start_runtime=self._start_runtime,
|
||||
stop_runtime=self._stop_runtime,
|
||||
)
|
||||
if control_channels is None:
|
||||
self._control_channels = []
|
||||
elif callable(control_channels):
|
||||
self._control_channels = list(control_channels(self))
|
||||
else:
|
||||
self._control_channels = list(control_channels)
|
||||
self.logger.debug(
|
||||
"ConfigManagerV2.__init__ result: path=%s update_interval=%s work_interval=%s execute_timeout=%s control_channels=%s",
|
||||
self.path,
|
||||
self.update_interval,
|
||||
self.work_interval,
|
||||
self._execute_timeout,
|
||||
len(self._control_channels),
|
||||
)
|
||||
|
||||
def _apply_config(self, new_config: Any) -> None:
|
||||
self.config = new_config
|
||||
if isinstance(new_config, dict):
|
||||
try:
|
||||
self._log_manager.apply_config(new_config)
|
||||
except Exception: # noqa: BLE001
|
||||
self.logger.exception("ConfigManagerV2._apply_config error while applying logging config")
|
||||
raise
|
||||
self.logger.debug(
|
||||
"ConfigManagerV2._apply_config result: config_type=%s is_dict=%s",
|
||||
type(new_config).__name__,
|
||||
isinstance(new_config, dict),
|
||||
)
|
||||
|
||||
    async def _update_config(self) -> None:
        try:
            changed, new_config = await self._loader.load_if_changed()
            if not changed:
                self.logger.debug("ConfigManagerV2._update_config result: no changes")
                return
            self._apply_config(new_config)
            self.logger.debug("ConfigManagerV2._update_config result: config updated")
        except Exception as exc:  # noqa: BLE001
            self.logger.exception("ConfigManagerV2._update_config error")
            if self._loader.last_valid_config is None:
                self.logger.debug(
                    "ConfigManagerV2._update_config result: no fallback config available detail=%s",
                    str(exc),
                )
                return
            try:
                self._apply_config(self._loader.last_valid_config)
                self.logger.debug(
                    "ConfigManagerV2._update_config result: fallback to last valid config applied",
                )
            except Exception:  # noqa: BLE001
                self.logger.exception("ConfigManagerV2._update_config fallback error")

    def execute(self) -> None:
        """Override in subclasses."""

    def get_health_status(self) -> HealthPayload:
        if self._worker_degraded:
            return {"status": "degraded", "detail": "worker has timed-out in-flight execute()"}
        return {"status": "ok"}

    def get_health_provider(self) -> Callable[[], Any]:
        """Return the health callback (to pass to HttpControlChannel when the channel is created externally)."""
        return self._health_aggregator.collect

    async def start(self) -> None:
        if self._task is not None and not self._task.done():
            await self._start_runtime()
            self.logger.debug("ConfigManagerV2.start result: already running")
            return
        try:
            self._loop = asyncio.get_running_loop()
        except RuntimeError:
            self.logger.exception("ConfigManagerV2.start error: must be called from within async context")
            raise
        self._task = asyncio.create_task(self._run(), name="config-manager-v2")
        self.logger.debug("ConfigManagerV2.start result: background task started")

    async def stop(self) -> None:
        if self._task is None:
            self.logger.debug("ConfigManagerV2.stop result: not running")
            return
        self._shutdown.set()
        self._halt.set()
        if asyncio.current_task() is self._task:
            self.logger.debug("ConfigManagerV2.stop result: stop requested from supervisor task")
            return
        try:
            await self._task
        except asyncio.CancelledError:
            self.logger.debug("ConfigManagerV2.stop result: supervisor task cancelled")
        finally:
            self.logger.debug("ConfigManagerV2.stop result: completed")
|
||||
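The reload-with-fallback flow of `_update_config` above can be tried in isolation. What follows is only a minimal sketch under stated assumptions, not the package's `ConfigLoader`: the class `SketchLoader`, the helper `update_config`, the use of JSON input and the SHA-256 hash are all hypothetical names and choices made for illustration. Only the attribute name `last_valid_config` and the method name `load_if_changed` are taken from the diff.

```python
import hashlib
import json
from typing import Any, Optional


class SketchLoader:
    """Hypothetical stand-in for a config loader; not the package's ConfigLoader."""

    def __init__(self) -> None:
        self._last_hash: Optional[str] = None
        self.last_valid_config: Optional[Any] = None

    def load_if_changed(self, raw: str) -> tuple[bool, Any]:
        """Return (changed, config); raise ValueError when the text cannot be parsed."""
        digest = hashlib.sha256(raw.encode("utf-8")).hexdigest()
        if digest == self._last_hash:
            return False, self.last_valid_config
        self._last_hash = digest
        config = json.loads(raw)  # json.JSONDecodeError is a subclass of ValueError
        self.last_valid_config = config
        return True, config


def update_config(loader: SketchLoader, raw: str, current: Any) -> Any:
    """Apply a changed config; on a parse error fall back to the last valid one."""
    try:
        changed, new_config = loader.load_if_changed(raw)
        return new_config if changed else current
    except ValueError:
        if loader.last_valid_config is None:
            return current  # nothing to fall back to
        return loader.last_valid_config


loader = SketchLoader()
config = update_config(loader, '{"work_interval": 2}', None)
config_after_bad_reload = update_config(loader, "{not valid json", config)
```

In this sketch a broken reload leaves the previously applied settings in place, which is the behaviour the debug messages in `_update_config` describe.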
164
src/config_manager/v2/core/scheduler.py
Normal file
@@ -0,0 +1,164 @@
"""Worker loop: repeatedly calls the blocking execute() in a thread.

Normal mode: a single in-flight execute().
Degraded mode: if the active execute() exceeds the timeout, a second worker thread is started
so that work continues. At most two in-flight tasks are allowed at the same time."""
from __future__ import annotations

import asyncio
import logging
import time
from collections.abc import Callable
from dataclasses import dataclass
from itertools import count
from typing import Optional

logger = logging.getLogger(__name__)


@dataclass
class _InFlightExecute:
    id: int
    task: asyncio.Task[None]
    started_at: float
    timeout_reported: bool = False


class WorkerLoop:
    def __init__(
        self,
        execute: Callable[[], None],
        get_interval: Callable[[], int | float],
        halt_event: asyncio.Event,
        on_error: Optional[Callable[[Exception], None]] = None,
        on_success: Optional[Callable[[], None]] = None,
        execute_timeout: Optional[float] = None,
        on_degraded_change: Optional[Callable[[bool], None]] = None,
        on_metrics_change: Optional[Callable[[int, int], None]] = None,
    ):
        self._execute = execute
        self._get_interval = get_interval
        self._halt_event = halt_event
        self._on_error = on_error
        self._on_success = on_success
        self._execute_timeout = execute_timeout
        self._on_degraded_change = on_degraded_change
        self._on_metrics_change = on_metrics_change
        self._inflight: list[_InFlightExecute] = []
        self._id_seq = count(1)
        self._degraded = False
        self._last_metrics: Optional[tuple[int, int]] = None
        logger.debug(
            "WorkerLoop.__init__ result: execute=%s execute_timeout=%s",
            getattr(execute, "__name__", type(execute).__name__),
            execute_timeout,
        )

    def _notify_error(self, exc: Exception) -> None:
        if self._on_error is not None:
            self._on_error(exc)

    def _set_degraded(self, value: bool) -> None:
        if self._degraded == value:
            return
        self._degraded = value
        if self._on_degraded_change is not None:
            self._on_degraded_change(value)
        logger.warning("WorkerLoop.run degraded state changed: degraded=%s", value)

    def _start_execute(self) -> None:
        execute_id = next(self._id_seq)
        execution = _InFlightExecute(
            id=execute_id,
            task=asyncio.create_task(asyncio.to_thread(self._execute), name=f"worker-loop-execute-{execute_id}"),
            started_at=time.monotonic(),
        )
        self._inflight.append(execution)
        logger.debug("WorkerLoop.run result: execute started id=%s inflight=%s", execution.id, len(self._inflight))

    def _collect_finished(self) -> None:
        pending: list[_InFlightExecute] = []
        for execution in self._inflight:
            task = execution.task
            if not task.done():
                pending.append(execution)
                continue
            try:
                task.result()
                if self._on_success is not None:
                    self._on_success()
                logger.debug("WorkerLoop.run result: execute completed id=%s", execution.id)
            except Exception as exc:  # noqa: BLE001
                self._notify_error(exc)
                logger.exception("WorkerLoop.run error during execute id=%s", execution.id)
        self._inflight = pending

    def _mark_timeouts(self) -> None:
        if self._execute_timeout is None or self._execute_timeout <= 0:
            return
        now = time.monotonic()
        for execution in self._inflight:
            if execution.timeout_reported:
                continue
            if execution.task.done():
                continue
            elapsed = now - execution.started_at
            if elapsed < self._execute_timeout:
                continue
            execution.timeout_reported = True
            self._notify_error(TimeoutError(f"execute() did not finish within {self._execute_timeout}s"))
            logger.warning(
                "WorkerLoop.run execute timeout: id=%s elapsed=%.3fs timeout=%ss",
                execution.id,
                elapsed,
                self._execute_timeout,
            )

    def _has_timed_out_inflight(self) -> bool:
        return any(item.timeout_reported and not item.task.done() for item in self._inflight)

    def _ensure_capacity(self) -> None:
        active_count = len(self._inflight)
        if active_count == 0:
            self._start_execute()
            return
        if active_count == 1 and self._has_timed_out_inflight():
            self._start_execute()
            return

    def _emit_metrics(self) -> None:
        if self._on_metrics_change is None:
            return
        inflight_count = len(self._inflight)
        timed_out_count = sum(1 for item in self._inflight if item.timeout_reported and not item.task.done())
        metrics = (inflight_count, timed_out_count)
        if self._last_metrics == metrics:
            return
        self._last_metrics = metrics
        self._on_metrics_change(inflight_count, timed_out_count)

    async def run(self) -> None:
        logger.debug("WorkerLoop.run result: started")
        while not self._halt_event.is_set():
            self._collect_finished()
            self._mark_timeouts()
            self._set_degraded(self._has_timed_out_inflight())
            self._ensure_capacity()
            self._emit_metrics()

            timeout = max(self._get_interval(), 0.01)
            try:
                await asyncio.wait_for(self._halt_event.wait(), timeout=timeout)
            except asyncio.TimeoutError:
                continue

        if self._inflight:
            if self._has_timed_out_inflight():
                logger.warning("WorkerLoop.run stop: timed-out execute still running; exiting without waiting")
            else:
                await asyncio.gather(*(item.task for item in self._inflight), return_exceptions=True)
            self._collect_finished()

        self._set_degraded(False)
        self._emit_metrics()
        logger.debug("WorkerLoop.run result: stopped")
|
||||
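The capacity rule from `_ensure_capacity` (start a call when idle, and allow a second one only while the single active call has already been reported as timed out) can be expressed as a pure function. This is a hedged sketch for reasoning about the rule, not code from the package: `Slot` and `should_start_another` are hypothetical names, and `Slot` only models the two flags the rule reads.

```python
from dataclasses import dataclass


@dataclass
class Slot:
    """Hypothetical record of one execute() call that is tracked as in flight."""

    done: bool = False
    timeout_reported: bool = False


def should_start_another(inflight: list[Slot]) -> bool:
    """Start when nothing is in flight, or when the only tracked call has timed out."""
    if len(inflight) == 0:
        return True
    has_timed_out = any(s.timeout_reported and not s.done for s in inflight)
    return len(inflight) == 1 and has_timed_out


idle = should_start_another([])                                   # nothing running
busy = should_start_another([Slot()])                             # one healthy call
degraded = should_start_another([Slot(timeout_reported=True)])    # one stuck call
full = should_start_another([Slot(timeout_reported=True), Slot()])  # already two calls
```

Because the rule never starts a call when two are already tracked, the number of worker threads stays bounded even if `execute()` hangs repeatedly.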
@@ -1,6 +1,8 @@
|
||||
"""Core types: health and lifecycle state.

Used in core and control for uniform contracts."""
|
||||
from __future__ import annotations
|
||||
|
||||
from dataclasses import dataclass
|
||||
from enum import Enum
|
||||
from typing import Literal, TypedDict
|
||||
|
||||
@@ -11,6 +13,8 @@ HealthState = Literal["ok", "degraded", "unhealthy"]
|
||||
class HealthPayload(TypedDict, total=False):
|
||||
status: HealthState
|
||||
detail: str
|
||||
state: str
|
||||
"""Current lifecycle state (idle/starting/running/stopping/stopped)."""
|
||||
|
||||
|
||||
class LifecycleState(str, Enum):
|
||||
@@ -19,12 +23,3 @@ class LifecycleState(str, Enum):
|
||||
RUNNING = "running"
|
||||
STOPPING = "stopping"
|
||||
STOPPED = "stopped"
|
||||
|
||||
|
||||
@dataclass
|
||||
class HealthServerSettings:
|
||||
enabled: bool = False
|
||||
host: str = "0.0.0.0"
|
||||
port: int = 8000
|
||||
path: str = "/health"
|
||||
timeout: float = 3.0
|
||||
@@ -1,80 +0,0 @@
|
||||
from __future__ import annotations
|
||||
|
||||
import asyncio
|
||||
import json
|
||||
from collections.abc import Awaitable, Callable
|
||||
from typing import Optional
|
||||
|
||||
from .types import HealthPayload
|
||||
|
||||
|
||||
class HealthServer:
|
||||
def __init__(
|
||||
self,
|
||||
host: str,
|
||||
port: int,
|
||||
path: str,
|
||||
timeout: float,
|
||||
health_provider: Callable[[], Awaitable[HealthPayload]],
|
||||
):
|
||||
"""Configure lightweight HTTP health server parameters and callback."""
|
||||
self._host = host
|
||||
self._port = port
|
||||
self._path = path
|
||||
self._timeout = timeout
|
||||
self._health_provider = health_provider
|
||||
self._server: Optional[asyncio.base_events.Server] = None
|
||||
|
||||
async def start(self) -> None:
|
||||
"""Start listening for healthcheck requests if not running."""
|
||||
if self._server is not None:
|
||||
return
|
||||
self._server = await asyncio.start_server(self._handle_connection, self._host, self._port)
|
||||
|
||||
async def stop(self) -> None:
|
||||
"""Stop the health server and release the listening socket."""
|
||||
if self._server is None:
|
||||
return
|
||||
self._server.close()
|
||||
await self._server.wait_closed()
|
||||
self._server = None
|
||||
|
||||
async def _handle_connection(self, reader: asyncio.StreamReader, writer: asyncio.StreamWriter) -> None:
|
||||
"""Process one HTTP connection and return JSON health payload."""
|
||||
status_code = 404
|
||||
payload: HealthPayload = {"status": "unhealthy", "detail": "not found"}
|
||||
|
||||
try:
|
||||
request_line = await reader.readline()
|
||||
parts = request_line.decode("utf-8", errors="ignore").strip().split(" ")
|
||||
if len(parts) >= 2:
|
||||
method, path = parts[0], parts[1]
|
||||
if method == "GET" and path == self._path:
|
||||
status_code, payload = await self._build_health_response()
|
||||
except Exception: # noqa: BLE001
|
||||
status_code = 500
|
||||
payload = {"status": "unhealthy", "detail": "internal error"}
|
||||
|
||||
body = json.dumps(payload).encode("utf-8")
|
||||
phrase = "OK" if status_code == 200 else "SERVICE UNAVAILABLE" if status_code == 503 else "NOT FOUND"
|
||||
response = (
|
||||
f"HTTP/1.1 {status_code} {phrase}\r\n"
|
||||
"Content-Type: application/json\r\n"
|
||||
f"Content-Length: {len(body)}\r\n"
|
||||
"Connection: close\r\n"
|
||||
"\r\n"
|
||||
).encode("utf-8") + body
|
||||
|
||||
writer.write(response)
|
||||
await writer.drain()
|
||||
writer.close()
|
||||
await writer.wait_closed()
|
||||
|
||||
async def _build_health_response(self) -> tuple[int, HealthPayload]:
|
||||
"""Build HTTP status code and body from application health callback."""
|
||||
try:
|
||||
payload = await asyncio.wait_for(self._health_provider(), timeout=self._timeout)
|
||||
status = payload.get("status", "unhealthy")
|
||||
return (200, payload) if status == "ok" else (503, payload)
|
||||
except Exception as exc: # noqa: BLE001
|
||||
return 503, {"status": "unhealthy", "detail": str(exc)}
|
||||
@@ -1,256 +0,0 @@
|
||||
from __future__ import annotations
|
||||
|
||||
import asyncio
|
||||
import logging
|
||||
from collections.abc import Awaitable
|
||||
from typing import Any, Optional
|
||||
|
||||
from ..v1.log_manager import LogManager
|
||||
from .config_loader import ConfigLoader
|
||||
from .control.base import ControlChannel
|
||||
from .health import HealthServer
|
||||
from .scheduler import WorkerLoop
|
||||
from .types import HealthPayload, HealthServerSettings, LifecycleState
|
||||
|
||||
|
||||
class ConfigManagerV2:
|
||||
DEFAULT_UPDATE_INTERVAL = 5.0
|
||||
DEFAULT_WORK_INTERVAL = 2.0
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
path: str,
|
||||
log_manager: Optional[LogManager] = None,
|
||||
health_settings: Optional[HealthServerSettings] = None,
|
||||
control_channel: Optional[ControlChannel] = None,
|
||||
):
|
||||
"""Initialize manager subsystems and runtime state."""
|
||||
self.path = path
|
||||
self.config: Any = None
|
||||
self.update_interval = self.DEFAULT_UPDATE_INTERVAL
|
||||
self.work_interval = self.DEFAULT_WORK_INTERVAL
|
||||
|
||||
self._loader = ConfigLoader(path)
|
||||
self._log_manager = log_manager or LogManager()
|
||||
self._control_channel = control_channel
|
||||
|
||||
self._halt = asyncio.Event()
|
||||
self._task: Optional[asyncio.Task] = None
|
||||
self._loop: Optional[asyncio.AbstractEventLoop] = None
|
||||
|
||||
self._state = LifecycleState.IDLE
|
||||
self._last_execute_error: Optional[str] = None
|
||||
|
||||
self._health_settings = health_settings or HealthServerSettings(enabled=False)
|
||||
self._health_server: Optional[HealthServer] = None
|
||||
if self._health_settings.enabled:
|
||||
self._health_server = HealthServer(
|
||||
host=self._health_settings.host,
|
||||
port=self._health_settings.port,
|
||||
path=self._health_settings.path,
|
||||
timeout=self._health_settings.timeout,
|
||||
health_provider=self._collect_health,
|
||||
)
|
||||
|
||||
self.logger = logging.getLogger(__name__)
|
||||
|
||||
def _read_file_sync(self) -> str:
|
||||
"""Read config file synchronously via the shared loader."""
|
||||
return self._loader._read_file_sync()
|
||||
|
||||
async def _read_file_async(self) -> str:
|
||||
"""Read config file asynchronously via a worker thread."""
|
||||
return await self._loader.read_file_async()
|
||||
|
||||
def _parse_config(self, data: str) -> Any:
|
||||
"""Parse raw config text to a Python object."""
|
||||
return self._loader.parse_config(data)
|
||||
|
||||
def _update_intervals_from_config(self) -> None:
|
||||
"""Refresh work and update intervals from current config."""
|
||||
if not isinstance(self.config, dict):
|
||||
return
|
||||
|
||||
upd = self.config.get("update_interval")
|
||||
wrk = self.config.get("work_interval")
|
||||
|
||||
if isinstance(upd, (int, float)) and upd > 0:
|
||||
self.update_interval = float(upd)
|
||||
else:
|
||||
self.update_interval = self.DEFAULT_UPDATE_INTERVAL
|
||||
|
||||
if isinstance(wrk, (int, float)) and wrk > 0:
|
||||
self.work_interval = float(wrk)
|
||||
else:
|
||||
self.work_interval = self.DEFAULT_WORK_INTERVAL
|
||||
|
||||
async def _update_config(self) -> None:
|
||||
"""Reload config and apply new settings when content changes."""
|
||||
try:
|
||||
changed, new_config = await self._loader.load_if_changed()
|
||||
if not changed:
|
||||
return
|
||||
|
||||
self.config = new_config
|
||||
self._update_intervals_from_config()
|
||||
if isinstance(new_config, dict):
|
||||
self._log_manager.apply_config(new_config)
|
||||
except Exception as exc: # noqa: BLE001
|
||||
# Keep current config untouched and continue with last valid settings.
|
||||
self.logger.error("Error reading/parsing config file: %s", exc)
|
||||
if self._loader.last_valid_config is not None:
|
||||
self.config = self._loader.last_valid_config
|
||||
self._update_intervals_from_config()
|
||||
|
||||
def execute(self) -> None:
|
||||
"""Override in subclass to implement one unit of blocking work."""
|
||||
|
||||
def get_health_status(self) -> HealthPayload:
|
||||
"""Return application-specific health payload for /health."""
|
||||
return {"status": "ok"}
|
||||
|
||||
async def _collect_health(self) -> HealthPayload:
|
||||
"""Aggregate lifecycle and app status into one health result."""
|
||||
if self._state not in {LifecycleState.RUNNING, LifecycleState.STOPPING}:
|
||||
return {"status": "unhealthy", "detail": f"state={self._state.value}"}
|
||||
|
||||
if self._last_execute_error is not None:
|
||||
return {"status": "unhealthy", "detail": self._last_execute_error}
|
||||
|
||||
result = self.get_health_status()
|
||||
status = result.get("status", "unhealthy")
|
||||
if status not in {"ok", "degraded", "unhealthy"}:
|
||||
return {"status": "unhealthy", "detail": "invalid health status"}
|
||||
return result
|
||||
|
||||
def _on_execute_success(self) -> None:
|
||||
"""Clear the last execution error marker after successful run."""
|
||||
self._last_execute_error = None
|
||||
|
||||
def _on_execute_error(self, exc: Exception) -> None:
|
||||
"""Store and log execution failure details for health reporting."""
|
||||
self._last_execute_error = str(exc)
|
||||
self.logger.error("Execution error: %s", exc)
|
||||
|
||||
async def _worker_loop(self) -> None:
|
||||
"""Run execute() repeatedly until shutdown is requested."""
|
||||
worker = WorkerLoop(
|
||||
execute=self.execute,
|
||||
get_interval=lambda: self.work_interval,
|
||||
halt_event=self._halt,
|
||||
on_error=self._on_execute_error,
|
||||
on_success=self._on_execute_success,
|
||||
)
|
||||
await worker.run()
|
||||
|
||||
async def _periodic_update_loop(self) -> None:
|
||||
"""Periodically check config file for updates until shutdown."""
|
||||
while not self._halt.is_set():
|
||||
await self._update_config()
|
||||
try:
|
||||
await asyncio.wait_for(self._halt.wait(), timeout=max(self.update_interval, 0.05))
|
||||
except asyncio.TimeoutError:
|
||||
continue
|
||||
|
||||
async def _status_text(self) -> str:
|
||||
"""Build human-readable runtime status for control channels."""
|
||||
health = await self._collect_health()
|
||||
detail = health.get("detail")
|
||||
if detail:
|
||||
return f"state={self._state.value}; health={health['status']}; detail={detail}"
|
||||
return f"state={self._state.value}; health={health['status']}"
|
||||
|
||||
async def _control_start(self) -> str:
|
||||
"""Handle external start command for control channels."""
|
||||
if self._state == LifecycleState.RUNNING:
|
||||
return "already running"
|
||||
self._halt.clear()
|
||||
return "start signal accepted"
|
||||
|
||||
async def _control_stop(self) -> str:
|
||||
"""Handle external stop command for control channels."""
|
||||
self._halt.set()
|
||||
return "stop signal accepted"
|
||||
|
||||
async def _control_status(self) -> str:
|
||||
"""Handle external status command for control channels."""
|
||||
return await self._status_text()
|
||||
|
||||
async def _start_control_channel(self) -> None:
|
||||
"""Start configured control channel with bound command handlers."""
|
||||
if self._control_channel is None:
|
||||
return
|
||||
await self._control_channel.start(self._control_start, self._control_stop, self._control_status)
|
||||
|
||||
async def _stop_control_channel(self) -> None:
|
||||
"""Stop configured control channel if it is active."""
|
||||
if self._control_channel is None:
|
||||
return
|
||||
await self._control_channel.stop()
|
||||
|
||||
async def _run(self) -> None:
|
||||
"""Run manager lifecycle and coordinate all background tasks."""
|
||||
self._state = LifecycleState.STARTING
|
||||
self._halt.clear()
|
||||
await self._update_config()
|
||||
|
||||
if self._health_server is not None:
|
||||
await self._health_server.start()
|
||||
await self._start_control_channel()
|
||||
|
||||
self._state = LifecycleState.RUNNING
|
||||
self.logger.info("ConfigManagerV2 started")
|
||||
|
||||
tasks = [
|
||||
asyncio.create_task(self._worker_loop(), name="v2-worker-loop"),
|
||||
asyncio.create_task(self._periodic_update_loop(), name="v2-config-loop"),
|
||||
]
|
||||
|
||||
try:
|
||||
await asyncio.gather(*tasks)
|
||||
except asyncio.CancelledError:
|
||||
raise
|
||||
finally:
|
||||
self._state = LifecycleState.STOPPING
|
||||
self._halt.set()
|
||||
for task in tasks:
|
||||
task.cancel()
|
||||
await asyncio.gather(*tasks, return_exceptions=True)
|
||||
await self._stop_control_channel()
|
||||
if self._health_server is not None:
|
||||
await self._health_server.stop()
|
||||
self._state = LifecycleState.STOPPED
|
||||
self.logger.info("ConfigManagerV2 stopped")
|
||||
|
||||
async def start(self) -> None:
|
||||
"""Start manager lifecycle from an active asyncio context."""
|
||||
if self._task is not None and not self._task.done():
|
||||
self.logger.warning("ConfigManagerV2 is already running")
|
||||
return
|
||||
|
||||
try:
|
||||
self._loop = asyncio.get_running_loop()
|
||||
except RuntimeError:
|
||||
self.logger.error("start() must be called from within an async context")
|
||||
raise
|
||||
|
||||
self._task = asyncio.create_task(self._run(), name="config-manager-v2")
|
||||
try:
|
||||
await self._task
|
||||
finally:
|
||||
self._task = None
|
||||
|
||||
async def stop(self) -> None:
|
||||
"""Request graceful shutdown and wait for manager completion."""
|
||||
if self._task is None:
|
||||
self.logger.warning("ConfigManagerV2 is not running")
|
||||
return
|
||||
|
||||
self._halt.set()
|
||||
if asyncio.current_task() is self._task:
|
||||
return
|
||||
|
||||
try:
|
||||
await self._task
|
||||
except asyncio.CancelledError:
|
||||
pass
|
||||
@@ -1,39 +0,0 @@
from __future__ import annotations

import asyncio
from collections.abc import Callable
from typing import Optional


class WorkerLoop:
    def __init__(
        self,
        execute: Callable[[], None],
        get_interval: Callable[[], float],
        halt_event: asyncio.Event,
        on_error: Optional[Callable[[Exception], None]] = None,
        on_success: Optional[Callable[[], None]] = None,
    ):
        """Store callbacks and synchronization primitives for worker execution."""
        self._execute = execute
        self._get_interval = get_interval
        self._halt_event = halt_event
        self._on_error = on_error
        self._on_success = on_success

    async def run(self) -> None:
        """Run execute repeatedly until halt is requested."""
        while not self._halt_event.is_set():
            try:
                await asyncio.to_thread(self._execute)
                if self._on_success is not None:
                    self._on_success()
            except Exception as exc:  # noqa: BLE001
                if self._on_error is not None:
                    self._on_error(exc)

            timeout = max(self._get_interval(), 0.01)
            try:
                await asyncio.wait_for(self._halt_event.wait(), timeout=timeout)
            except asyncio.TimeoutError:
                continue
|
||||
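Both versions of the worker loop pause between iterations by waiting on the halt event with a timeout, so a stop request interrupts the pause instead of waiting for it to elapse. A minimal sketch of that pattern follows; `wait_or_halt` and `demo` are hypothetical helper names used only for illustration.

```python
import asyncio


async def wait_or_halt(halt_event: asyncio.Event, interval: float) -> bool:
    """Pause for up to `interval` seconds; return True if halt was requested meanwhile."""
    try:
        await asyncio.wait_for(halt_event.wait(), timeout=max(interval, 0.01))
    except asyncio.TimeoutError:
        return False  # the full interval elapsed without a halt request
    return True


async def demo() -> tuple[bool, bool]:
    halt = asyncio.Event()
    # Nobody sets the event, so the short pause simply times out.
    timed_out_result = await wait_or_halt(halt, 0.02)
    # Schedule a halt shortly after; the long pause ends as soon as the event is set.
    asyncio.get_running_loop().call_later(0.01, halt.set)
    halted_result = await wait_or_halt(halt, 5.0)
    return timed_out_result, halted_result


result = asyncio.run(demo())
```

The lower bound of 0.01 seconds mirrors the `max(self._get_interval(), 0.01)` guard in the loop, which keeps a zero or negative interval from turning the loop into a busy spin.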
@@ -1,6 +1,14 @@
|
||||
# === General configuration parameters ===
|
||||
runtime: 5
|
||||
|
||||
# === HTTP control channel (ConfigManagerV2): /health, /actions/start, /actions/stop ===
|
||||
management:
|
||||
enabled: true
|
||||
host: "0.0.0.0"
|
||||
port: 8000
|
||||
timeout: 3
|
||||
health_timeout: 30
|
||||
|
||||
# === Logging ===
|
||||
log:
|
||||
version: 1
|
||||
@@ -9,8 +17,7 @@ log:
|
||||
formatters:
|
||||
standard:
|
||||
format: '%(asctime)s %(module)15s [%(levelname)8s]: %(message)s'
|
||||
telegram:
|
||||
format: '%(message)s'
|
||||
|
||||
|
||||
handlers:
|
||||
console:
|
||||
@@ -26,28 +33,20 @@ log:
|
||||
filename: logs/log.log
|
||||
mode: a
|
||||
maxBytes: 500000
|
||||
backupCount: 15
|
||||
|
||||
#telegram:
|
||||
# level: CRITICAL
|
||||
# formatter: telegram
|
||||
# class: logging_telegram_handler.TelegramHandler
|
||||
# chat_id: 211945135
|
||||
# alias: "PDC"
|
||||
|
||||
backupCount: 3
|
||||
|
||||
# -- Loggers --
|
||||
loggers:
|
||||
'':
|
||||
root:
|
||||
handlers: [console, file]
|
||||
level: INFO
|
||||
propagate: False
|
||||
|
||||
loggers:
|
||||
__main__:
|
||||
handlers: [console, file]
|
||||
level: DEBUG
|
||||
propagate: False
|
||||
|
||||
config_manager:
|
||||
config_manager.src.config_manager.v2.manager:
|
||||
handlers: [console, file]
|
||||
level: DEBUG
|
||||
|
||||
|
||||
@@ -1,34 +1,54 @@
|
||||
#import os
|
||||
#os.chdir(os.path.dirname(__file__))
|
||||
# import os
|
||||
# os.chdir(os.path.dirname(__file__))
|
||||
|
||||
import asyncio
|
||||
import logging
|
||||
from pathlib import Path
|
||||
|
||||
from config_manager import ConfigManager
|
||||
import logging
|
||||
import asyncio
|
||||
from typing import Optional
|
||||
|
||||
|
||||
from config_manager.v2.control import HttpControlChannel
|
||||
|
||||
logger = logging.getLogger()
|
||||
|
||||
|
||||
class MyApp(ConfigManager):
|
||||
def __init__(self, *args, **kwargs):
|
||||
super().__init__(*args, **kwargs)
|
||||
self.iter = 0
|
||||
|
||||
def execute(self) -> None:
|
||||
logger.info(f"current iteration {self.iter}")
|
||||
"""A successful run resets the health timer (updates the time of the last success)."""
|
||||
#logger.critical("current iteration %s", self.iter)
|
||||
#logger.error("current iteration %s", self.iter)
|
||||
logger.warning("current iteration %s", self.iter)
|
||||
#logger.info("current iteration %s", self.iter)
|
||||
#logger.debug("current iteration %s", self.iter)
|
||||
self.iter += 1
|
||||
|
||||
async def main():
|
||||
app = MyApp("config.yaml")
|
||||
logger.info("App started")
|
||||
await app.start()
|
||||
|
||||
logger.info("App finished")
|
||||
async def main() -> None:
|
||||
config_path = Path(__file__).parent / "config.yaml"
|
||||
print(config_path)
|
||||
app = MyApp(
|
||||
str(config_path),
|
||||
control_channels=lambda m: [
|
||||
HttpControlChannel(
|
||||
host="0.0.0.0",
|
||||
port=8000,
|
||||
timeout=3,
|
||||
health_provider=m.get_health_provider(),
|
||||
)
|
||||
],
|
||||
)
|
||||
logger.info("App starting")
|
||||
# Run the manager in the background (start() does not return control until stop).
|
||||
asyncio.create_task(app.start())
|
||||
|
||||
logger.info("App running; Ctrl+C to stop")
|
||||
while True:
|
||||
await asyncio.sleep(1)
|
||||
|
||||
if __name__ == "__main__":
|
||||
asyncio.run(main())
|
||||
|
||||
if __name__ == "__main__":
|
||||
logging.basicConfig(level=logging.INFO)
|
||||
asyncio.run(main())
|
||||
|
||||
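The example above starts the manager and then idles in an endless loop. One possible way to pair the start with a guaranteed stop is sketched below. It is an assumption-laden illustration, not the package's API: `FakeApp` is a hypothetical stand-in that only imitates the `start()`/`stop()` shape of the manager, and the fixed sleep stands in for "run until interrupted".

```python
import asyncio
from typing import Optional


class FakeApp:
    """Hypothetical stand-in with the same start()/stop() shape as the manager."""

    def __init__(self) -> None:
        self._halt = asyncio.Event()
        self._task: Optional[asyncio.Task] = None
        self.iterations = 0

    async def _run(self) -> None:
        # Imitates periodic work until a halt is requested.
        while not self._halt.is_set():
            self.iterations += 1
            await asyncio.sleep(0.01)

    async def start(self) -> None:
        # Keep a reference to the task so it is not garbage-collected mid-run.
        self._task = asyncio.create_task(self._run())

    async def stop(self) -> None:
        self._halt.set()
        if self._task is not None:
            await self._task


async def main() -> int:
    app = FakeApp()
    await app.start()
    try:
        await asyncio.sleep(0.05)  # stand-in for the application's lifetime
    finally:
        await app.stop()  # runs on normal exit and on cancellation alike
    return app.iterations


iterations = asyncio.run(main())
```

Putting `stop()` in a `finally` block means the background work is halted even when `main()` is cancelled, for example when `asyncio.run` handles Ctrl+C.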
@@ -4,6 +4,9 @@ from config_manager.v2 import ConfigManagerV2
|
||||
|
||||
|
||||
class ReloadApp(ConfigManagerV2):
|
||||
DEFAULT_UPDATE_INTERVAL = 0.05
|
||||
DEFAULT_WORK_INTERVAL = 0.2
|
||||
|
||||
def execute(self) -> None:
|
||||
return
|
||||
|
||||
@@ -11,7 +14,7 @@ class ReloadApp(ConfigManagerV2):
|
||||
def test_invalid_config_keeps_last_valid(tmp_path):
|
||||
async def scenario() -> None:
|
||||
cfg = tmp_path / "config.yaml"
|
||||
cfg.write_text("work_interval: 0.2\nupdate_interval: 0.05\n", encoding="utf-8")
|
||||
cfg.write_text("log: {}\nmanagement: { enabled: false }\n", encoding="utf-8")
|
||||
|
||||
app = ReloadApp(str(cfg))
|
||||
runner = asyncio.create_task(app.start())
|
||||
|
||||
@@ -4,6 +4,9 @@ from config_manager.v2 import ConfigManagerV2
|
||||
|
||||
|
||||
class DemoApp(ConfigManagerV2):
|
||||
DEFAULT_UPDATE_INTERVAL = 0.05
|
||||
DEFAULT_WORK_INTERVAL = 0.05
|
||||
|
||||
def __init__(self, *args, **kwargs):
|
||||
super().__init__(*args, **kwargs)
|
||||
self.calls = 0
|
||||
@@ -15,7 +18,7 @@ class DemoApp(ConfigManagerV2):
|
||||
def test_execute_loop_runs(tmp_path):
|
||||
async def scenario() -> None:
|
||||
cfg = tmp_path / "config.yaml"
|
||||
cfg.write_text("work_interval: 0.05\nupdate_interval: 0.05\n", encoding="utf-8")
|
||||
cfg.write_text("log: {}\nmanagement: { enabled: false }\n", encoding="utf-8")
|
||||
|
||||
app = DemoApp(str(cfg))
|
||||
runner = asyncio.create_task(app.start())
|
||||
|
||||
@@ -23,6 +23,9 @@ class DummyControlChannel(ControlChannel):
|
||||
|
||||
|
||||
class ControlledApp(ConfigManagerV2):
|
||||
DEFAULT_UPDATE_INTERVAL = 0.05
|
||||
DEFAULT_WORK_INTERVAL = 0.05
|
||||
|
||||
def execute(self) -> None:
|
||||
return
|
||||
|
||||
@@ -30,10 +33,10 @@ class ControlledApp(ConfigManagerV2):
|
||||
def test_control_channel_can_stop_manager(tmp_path):
|
||||
async def scenario() -> None:
|
||||
cfg = tmp_path / "config.yaml"
|
||||
cfg.write_text("work_interval: 0.05\nupdate_interval: 0.05\n", encoding="utf-8")
|
||||
cfg.write_text("log: {}\nmanagement: { enabled: false }\n", encoding="utf-8")
|
||||
|
||||
channel = DummyControlChannel()
|
||||
app = ControlledApp(str(cfg), control_channel=channel)
|
||||
app = ControlledApp(str(cfg), control_channels=[channel])
|
||||
|
||||
runner = asyncio.create_task(app.start())
|
||||
await asyncio.sleep(0.12)
|
||||
@@ -44,11 +47,14 @@ def test_control_channel_can_stop_manager(tmp_path):
|
||||
|
||||
status_text = await channel.on_status()
|
||||
assert "state=running" in status_text
|
||||
assert "worker_inflight=" in status_text
|
||||
assert "worker_timed_out_inflight=" in status_text
|
||||
|
||||
stop_text = await channel.on_stop()
|
||||
assert "stop signal accepted" in stop_text
|
||||
|
||||
await runner
|
||||
await app.stop()
|
||||
assert channel.stopped is True
|
||||
|
||||
asyncio.run(scenario())
|
||||
|
||||
@@ -1,6 +1,7 @@
|
||||
import asyncio
|
||||
import json
|
||||
|
||||
from config_manager.v2.health import HealthServer
|
||||
from config_manager.v2.control import HttpControlChannel
|
||||
|
||||
|
||||
def test_health_mapping_ok_to_200():
|
||||
@@ -8,10 +9,9 @@ def test_health_mapping_ok_to_200():
|
||||
return {"status": "ok"}
|
||||
|
||||
async def scenario() -> None:
|
||||
server = HealthServer(
|
||||
server = HttpControlChannel(
|
||||
host="127.0.0.1",
|
||||
port=8000,
|
||||
path="/health",
|
||||
timeout=0.2,
|
||||
health_provider=provider,
|
||||
)
|
||||
@@ -27,10 +27,9 @@ def test_health_mapping_unhealthy_to_503():
|
||||
return {"status": "unhealthy", "detail": "worker failed"}
|
||||
|
||||
async def scenario() -> None:
|
||||
server = HealthServer(
|
||||
server = HttpControlChannel(
|
||||
host="127.0.0.1",
|
||||
port=8000,
|
||||
path="/health",
|
||||
timeout=0.2,
|
||||
health_provider=provider,
|
||||
)
|
||||
@@ -39,3 +38,64 @@ def test_health_mapping_unhealthy_to_503():
|
||||
assert payload["status"] == "unhealthy"
|
||||
|
||||
asyncio.run(scenario())
|
||||
|
||||
|
||||
def test_action_routes_call_callbacks():
|
||||
events: list[str] = []
|
||||
|
||||
async def provider():
|
||||
return {"status": "ok"}
|
||||
|
||||
async def on_start() -> str:
|
||||
events.append("start")
|
||||
return "start accepted"
|
||||
|
||||
async def on_stop() -> str:
|
||||
events.append("stop")
|
||||
return "stop accepted"
|
||||
|
||||
async def on_status() -> str:
|
||||
return "ok"
|
||||
|
||||
async def request(port: int, path: str) -> tuple[int, dict[str, str]]:
|
||||
reader, writer = await asyncio.open_connection("127.0.0.1", port)
|
||||
writer.write(
|
||||
f"GET {path} HTTP/1.1\r\nHost: 127.0.0.1\r\nConnection: close\r\n\r\n".encode("utf-8")
|
||||
)
|
||||
await writer.drain()
|
||||
raw = await reader.read()
|
||||
writer.close()
|
||||
await writer.wait_closed()
|
||||
|
||||
header, body = raw.split(b"\r\n\r\n", maxsplit=1)
|
||||
status_code = int(header.split(b" ")[1])
|
||||
payload = json.loads(body.decode("utf-8"))
|
||||
return status_code, payload
|
||||
|
||||
async def scenario() -> None:
|
||||
channel = HttpControlChannel(
|
||||
host="127.0.0.1",
|
||||
port=0,
|
||||
timeout=0.2,
|
||||
health_provider=provider,
|
||||
)
|
||||
await channel.start(on_start, on_stop, on_status)
|
||||
try:
|
||||
port = channel.port
|
||||
assert port > 0
|
||||
|
||||
start_code, start_payload = await request(port, "/actions/start")
|
||||
stop_code, stop_payload = await request(port, "/actions/stop")
|
||||
finally:
|
||||
await channel.stop()
|
||||
|
||||
assert start_code == 200
|
||||
assert start_payload["status"] == "ok"
|
||||
assert start_payload["detail"] == "start accepted"
|
||||
|
||||
assert stop_code == 200
|
||||
assert stop_payload["status"] == "ok"
|
||||
assert stop_payload["detail"] == "stop accepted"
|
||||
assert events == ["start", "stop"]
|
||||
|
||||
asyncio.run(scenario())
|
||||
|
||||
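The health tests above check that an `ok` payload is served with HTTP 200 and an `unhealthy` one with 503. The mapping can be sketched as a small pure function. This is an illustration only: `http_status_for` is a hypothetical name, the types are redeclared locally so the block is self-contained, and treating `degraded` (or a missing status) as 503 is an assumption carried over from the removed `HealthServer._build_health_response`, not a confirmed property of `HttpControlChannel`.

```python
from typing import Literal, TypedDict

HealthState = Literal["ok", "degraded", "unhealthy"]


class HealthPayload(TypedDict, total=False):
    """Local copy of the payload shape, redeclared so the sketch runs on its own."""

    status: HealthState
    detail: str


def http_status_for(payload: HealthPayload) -> int:
    """Return 200 only for status "ok"; any other or missing status maps to 503."""
    return 200 if payload.get("status", "unhealthy") == "ok" else 503


ok_code = http_status_for({"status": "ok"})
unhealthy_code = http_status_for({"status": "unhealthy", "detail": "worker failed"})
degraded_code = http_status_for({"status": "degraded"})
missing_code = http_status_for({})
```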
153
tests/v2/test_runtime_resilience.py
Normal file
@@ -0,0 +1,153 @@
import asyncio
import threading
import time

from config_manager.v2 import ConfigManagerV2
from config_manager.v2.control.base import ControlChannel, StartHandler, StatusHandler, StopHandler


class DummyControlChannel(ControlChannel):
    def __init__(self):
        self.on_start: StartHandler | None = None
        self.on_stop: StopHandler | None = None
        self.on_status: StatusHandler | None = None
        self.started = False
        self.stopped = False

    async def start(self, on_start: StartHandler, on_stop: StopHandler, on_status: StatusHandler) -> None:
        self.on_start = on_start
        self.on_stop = on_stop
        self.on_status = on_status
        self.started = True

    async def stop(self) -> None:
        self.stopped = True


class RestartableApp(ConfigManagerV2):
    DEFAULT_UPDATE_INTERVAL = 0.05
    DEFAULT_WORK_INTERVAL = 0.05

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.calls = 0

    def execute(self) -> None:
        self.calls += 1


class TimeoutAwareApp(ConfigManagerV2):
    DEFAULT_UPDATE_INTERVAL = 0.05
    DEFAULT_WORK_INTERVAL = 0.02

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.calls = 0
        self.active = 0
        self.max_active = 0
        self._lock = threading.Lock()

    def execute(self) -> None:
        with self._lock:
            self.calls += 1
            self.active += 1
            self.max_active = max(self.max_active, self.active)
        try:
            time.sleep(0.2)
        finally:
            with self._lock:
                self.active -= 1


class NormalSingleThreadApp(ConfigManagerV2):
    DEFAULT_UPDATE_INTERVAL = 0.05
    DEFAULT_WORK_INTERVAL = 0.02

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.calls = 0
        self.active = 0
        self.max_active = 0
        self._lock = threading.Lock()

    def execute(self) -> None:
        with self._lock:
            self.calls += 1
            self.active += 1
            self.max_active = max(self.max_active, self.active)
        try:
            time.sleep(0.03)
        finally:
            with self._lock:
                self.active -= 1


def test_control_channel_stop_and_start_resumes_execute(tmp_path):
    async def scenario() -> None:
        cfg = tmp_path / "config.yaml"
        cfg.write_text("log: {}\nmanagement: { enabled: false }\n", encoding="utf-8")

        channel = DummyControlChannel()
        app = RestartableApp(str(cfg), control_channels=[channel])
        await app.start()
        await asyncio.sleep(0.2)
        before_stop = app.calls
        assert before_stop > 0

        assert channel.on_stop is not None
        assert channel.on_start is not None
        stop_text = await channel.on_stop()
        assert "stop signal accepted" in stop_text

        await asyncio.sleep(0.2)
        after_stop = app.calls
        assert after_stop == before_stop

        start_text = await channel.on_start()
        assert "start signal accepted" in start_text
        await asyncio.sleep(0.2)
        assert app.calls > after_stop

        await app.stop()
        assert channel.stopped is True

    asyncio.run(scenario())


def test_normal_mode_uses_single_inflight_execute(tmp_path):
    async def scenario() -> None:
        cfg = tmp_path / "config.yaml"
        cfg.write_text("log: {}\nmanagement: { enabled: false }\n", encoding="utf-8")

        app = NormalSingleThreadApp(str(cfg))
        await app.start()
        await asyncio.sleep(0.25)
        health = await app.get_health_provider()()
        await app.stop()

        assert app.calls >= 2
        assert app.max_active == 1
        assert health["status"] == "ok"

    asyncio.run(scenario())


def test_execute_timeout_does_not_start_parallel_runs(tmp_path, monkeypatch):
    async def scenario() -> None:
        cfg = tmp_path / "config.yaml"
        cfg.write_text("log: {}\nmanagement: { enabled: false }\n", encoding="utf-8")

        monkeypatch.setenv("EXECUTE_TIMEOUT", "0.05")
        app = TimeoutAwareApp(str(cfg))
        await app.start()
        await asyncio.sleep(0.35)
        degraded_health = await app.get_health_provider()()
        await app.stop()

        assert app.calls >= 1
        assert app._last_execute_error is not None
        assert "did not finish within" in app._last_execute_error
        # A timed-out execute() must never overlap with another run.
        assert app.max_active == 1
        assert degraded_health["status"] == "degraded"

    asyncio.run(scenario())
@@ -6,6 +6,9 @@ from config_manager.v2 import ConfigManagerV2
 class BlockingApp(ConfigManagerV2):
+    DEFAULT_UPDATE_INTERVAL = 0.05
+    DEFAULT_WORK_INTERVAL = 0.05
+
     def __init__(self, *args, **kwargs):
         super().__init__(*args, **kwargs)
         self.started_event = threading.Event()
@@ -23,7 +26,7 @@ class BlockingApp(ConfigManagerV2):
 def test_stop_waits_for_active_execute_and_prevents_next_run(tmp_path):
     async def scenario() -> None:
         cfg = tmp_path / "config.yaml"
-        cfg.write_text("work_interval: 0.05\nupdate_interval: 0.05\n", encoding="utf-8")
+        cfg.write_text("log: {}\nmanagement: { enabled: false }\n", encoding="utf-8")

         app = BlockingApp(str(cfg))
         runner = asyncio.create_task(app.start())