Compare commits
15 Commits
releases/v
...
2d6179d366
| Author | SHA1 | Date | |
|---|---|---|---|
| 2d6179d366 | |||
| d888ae7acb | |||
| 1d71ce406f | |||
| 80dd69c5ec | |||
| 8da6df0b2a | |||
| 7eb3476b96 | |||
| da8ed4fa2b | |||
| 8f22fcf6af | |||
| 311870fd73 | |||
| ffd758d9a4 | |||
| 526661e498 | |||
| b0c87a427c | |||
| 7b74e0b0b8 | |||
| 5faea8f69f | |||
| 4eb9327628 |
3
.gitignore
vendored
3
.gitignore
vendored
@@ -1,4 +1,5 @@
|
||||
__pycache__
|
||||
venv/
|
||||
.venv/
|
||||
.vscode/
|
||||
log*.log
|
||||
config_manager.egg-info
|
||||
29
README_DEPLOY.md
Normal file
29
README_DEPLOY.md
Normal file
@@ -0,0 +1,29 @@
|
||||
# Деплой / Deploy
|
||||
|
||||
Краткое описание процесса деплоя приложений на базе config_manager и используемых скриптов.
|
||||
|
||||
A short description of the deploy process for applications based on config_manager and the scripts used.
|
||||
|
||||
---
|
||||
|
||||
## Healthcheck
|
||||
|
||||
После поднятия контейнеров (`docker compose up -d`) скрипт деплоя может ожидать успешного ответа по URL проверки здоровья приложения. При таймауте выполняется откат и выход с ошибкой.
|
||||
|
||||
After bringing up containers (`docker compose up -d`), the deploy script may wait for a successful response at the application health-check URL. On timeout, rollback is performed and the script exits with an error.
|
||||
|
||||
**Полная спецификация:** [docs/HEALTHCHECK_REQUIREMENTS.md](docs/HEALTHCHECK_REQUIREMENTS.md).
|
||||
|
||||
**Full specification:** [docs/HEALTHCHECK_REQUIREMENTS.md](docs/HEALTHCHECK_REQUIREMENTS.md).
|
||||
|
||||
### Переменные окружения / Environment variables
|
||||
|
||||
| Переменная | Назначение | Пример/дефолт |
|
||||
| ----------------------- | ---------------------------------------- | ------------------------------- |
|
||||
| `HEALTHCHECK_URL` | URL для проверки здоровья | `http://127.0.0.1:8000/health` |
|
||||
| `HEALTHCHECK_TIMEOUT` | Максимальное время ожидания (секунды) | `120` |
|
||||
| `HEALTHCHECK_INTERVAL` | Интервал между попытками (секунды) | `5` |
|
||||
|
||||
Если задана `HEALTHCHECK_URL`, деплой после поднятия контейнеров вызывает этот URL (например, через `curl -fsS --max-time 5`); успех — HTTP 2xx, иначе повтор до истечения `HEALTHCHECK_TIMEOUT`.
|
||||
|
||||
If `HEALTHCHECK_URL` is set, deploy calls this URL after bringing up containers (e.g. via `curl -fsS --max-time 5`); success is HTTP 2xx, otherwise retries until `HEALTHCHECK_TIMEOUT` expires.
|
||||
95
docs/HEALTHCHECK_REQUIREMENTS.md
Normal file
95
docs/HEALTHCHECK_REQUIREMENTS.md
Normal file
@@ -0,0 +1,95 @@
|
||||
# Требования к healthcheck / Healthcheck Requirements
|
||||
|
||||
Единая спецификация для реализации healthcheck в config_manager и в приложениях (в т.ч. MailOrderBot).
|
||||
|
||||
A unified specification for implementing healthcheck in config_manager and in applications (including MailOrderBot).
|
||||
|
||||
---
|
||||
|
||||
## 1. Назначение / Purpose
|
||||
|
||||
- Healthcheck используется скриптом деплоя (`deploy.sh`): после `docker compose up -d` деплой ждёт успешного ответа по `HEALTHCHECK_URL`; при таймауте — откат и выход с ошибкой.
|
||||
- The healthcheck is used by the deploy script (`deploy.sh`): after `docker compose up -d`, the deploy waits for a successful response at `HEALTHCHECK_URL`; on timeout — rollback and exit with error.
|
||||
|
||||
- Эндпоинт должен отражать **реальное состояние приложения**, а не только факт работы HTTP-сервера (иначе деплой может считать успешным запуск «зависшего» или упавшего воркера).
|
||||
- The endpoint must reflect the **actual state of the application**, not just that the HTTP server is running (otherwise deploy may consider successful a «hung» or crashed worker).
|
||||
|
||||
---
|
||||
|
||||
## 2. Поведение deploy.sh / deploy.sh behaviour
|
||||
|
||||
(Логика уже может быть реализована в коде deploy-скрипта.)
|
||||
|
||||
- Если задана переменная `HEALTHCHECK_URL` — после поднятия контейнеров вызывается `wait_for_healthcheck`: цикл с интервалом `HEALTHCHECK_INTERVAL` (по умолчанию 5 с), пока не истечёт `HEALTHCHECK_TIMEOUT` (по умолчанию 120 с).
|
||||
- If `HEALTHCHECK_URL` is set — after bringing up containers, `wait_for_healthcheck` is called: a loop with interval `HEALTHCHECK_INTERVAL` (default 5 s) until `HEALTHCHECK_TIMEOUT` (default 120 s) expires.
|
||||
|
||||
- Проверка: `curl -fsS --max-time 5 "$HEALTHCHECK_URL"`. Флаг `-f`: любой HTTP-код 4xx/5xx считается ошибкой (повтор до таймаута).
|
||||
- Check: `curl -fsS --max-time 5 "$HEALTHCHECK_URL"`. Flag `-f`: any HTTP 4xx/5xx is treated as failure (retry until timeout).
|
||||
|
||||
- **Успех:** HTTP 2xx. **Неуспех:** не 2xx или таймаут curl/соединения → деплой падает с откатом.
|
||||
- **Success:** HTTP 2xx. **Failure:** non-2xx or curl/connection timeout → deploy fails with rollback.
|
||||
|
||||
### Переменные окружения / Environment variables
|
||||
|
||||
| Переменная / Variable | Назначение / Purpose | Пример/дефолт / Example/default |
|
||||
| ------------------------- | --------------------------------- | ----------------------------------- |
|
||||
| `HEALTHCHECK_URL` | URL для проверки / Check URL | `http://127.0.0.1:8000/health` |
|
||||
| `HEALTHCHECK_TIMEOUT` | Макс. время ожидания (сек) / Max wait (s) | `120` |
|
||||
| `HEALTHCHECK_INTERVAL` | Интервал между попытками (сек) / Interval between attempts (s) | `5` |
|
||||
|
||||
---
|
||||
|
||||
## 3. Контракт эндпоинта / Endpoint contract
|
||||
|
||||
- **Метод и путь / Method and path:** `GET /health` (или иной путь по соглашению; один для всех приложений на config_manager).
|
||||
- **Method and path:** `GET /health` (or another agreed path; one for all applications using config_manager).
|
||||
|
||||
- **Успех (приложение в порядке) / Success (application healthy):** HTTP **200**, опционально тело JSON: `{"status": "ok"}`.
|
||||
- **Success (application healthy):** HTTP **200**, optional JSON body: `{"status": "ok"}`.
|
||||
|
||||
- **Приложение не в порядке / Application not healthy:** HTTP **503**, опционально тело: `{"status": "unhealthy"|"degraded", "detail": "причина"}`.
|
||||
- **Application not healthy:** HTTP **503**, optional body: `{"status": "unhealthy"|"degraded", "detail": "reason"}`.
|
||||
|
||||
- Ответ должен приходить в разумное время (рекомендуемый таймаут вызова логики проверки в config_manager — 2–5 с), иначе deploy через `curl --max-time 5` получит таймаут и будет повторять запросы.
|
||||
- The response must arrive within a reasonable time (recommended timeout for the check logic in config_manager — 2–5 s); otherwise deploy will get a timeout via `curl --max-time 5` and will retry.
|
||||
|
||||
---
|
||||
|
||||
## 4. Обратная связь от приложения (config_manager) / Application feedback (config_manager)
|
||||
|
||||
- Эндпоинт реализуется в **config_manager** (опционально, при включённой опции).
|
||||
- The endpoint is implemented in **config_manager** (optional, when the option is enabled).
|
||||
|
||||
- При обработке `GET /health` config_manager **не решает сам** «здорово ли приложение», а вызывает метод приложения, например: `app.get_health_status()`.
|
||||
- When handling `GET /health`, config_manager does **not** decide by itself whether the application is healthy; it calls the application method, e.g. `app.get_health_status()`.
|
||||
|
||||
- **Контракт метода / Method contract** (приложение переопределяет в наследнике):
|
||||
- **Method contract** (application overrides in subclass):
|
||||
- Возвращает `dict`: `{"status": "ok" | "degraded" | "unhealthy", "detail": "..."}` (поле `detail` опционально).
|
||||
- Returns `dict`: `{"status": "ok" | "degraded" | "unhealthy", "detail": "..."}` (`detail` optional).
|
||||
- При исключении или превышении таймаута вызова — считать состояние unhealthy и отдавать 503.
|
||||
- On exception or call timeout — treat as unhealthy and return 503.
|
||||
|
||||
Таким образом, основное приложение «даёт обратную связь» через реализацию `get_health_status()` (флаги после сбоев, проверка БД, heartbeat и т.д.).
|
||||
|
||||
Thus, the main application provides feedback via `get_health_status()` (flags after failures, DB check, heartbeat, etc.).
|
||||
|
||||
---
|
||||
|
||||
## 5. Требования к приложению (например, MailOrderBot) / Application requirements (e.g. MailOrderBot)
|
||||
|
||||
- Переопределить `get_health_status()` и возвращать:
|
||||
- Override `get_health_status()` and return:
|
||||
- `{"status": "ok"}` при нормальной работе;
|
||||
- `{"status": "ok"}` when running normally;
|
||||
- `{"status": "unhealthy", "detail": "..."}` при критичном сбое (например, последний `execute()` упал, зависимость недоступна);
|
||||
- `{"status": "unhealthy", "detail": "..."}` on critical failure (e.g. last `execute()` failed, dependency unavailable);
|
||||
- при желании — `{"status": "degraded", "detail": "..."}` для нефатальной деградации (в обоих случаях эндпоинт отдаёт 503 для совместимости с `curl -f`).
|
||||
- optionally — `{"status": "degraded", "detail": "..."}` for non-fatal degradation (in both cases the endpoint returns 503 for compatibility with `curl -f`).
|
||||
|
||||
---
|
||||
|
||||
## 6. Инфраструктура / Infrastructure
|
||||
|
||||
- Для работы healthcheck приложение должно поднимать HTTP-сервер (в config_manager при включённой опции) и пробрасывать порт в `docker-compose` (например `8000:8000`), чтобы `deploy.sh` на хосте мог обращаться по `HEALTHCHECK_URL` (например `http://127.0.0.1:8000/health`).
|
||||
- For healthcheck to work, the application must run an HTTP server (in config_manager when the option is enabled) and expose the port in `docker-compose` (e.g. `8000:8000`), so that `deploy.sh` on the host can call `HEALTHCHECK_URL` (e.g. `http://127.0.0.1:8000/health`).
|
||||
@@ -4,7 +4,7 @@ build-backend = "setuptools.build_meta"
|
||||
|
||||
[project]
|
||||
name = "config_manager"
|
||||
version = "1.1.0"
|
||||
version = "2.0.0"
|
||||
description = "Config manager for building applications"
|
||||
authors = [
|
||||
{ name = "Aleksei Zosimov", email = "lesha.spb@gmail.com" }
|
||||
@@ -13,6 +13,8 @@ readme = "README.md"
|
||||
requires-python = ">=3.8"
|
||||
dependencies = [
|
||||
"PyYAML>=6.0",
|
||||
"fastapi>=0.100.0",
|
||||
"uvicorn[standard]>=0.22.0",
|
||||
]
|
||||
|
||||
[project.urls]
|
||||
|
||||
@@ -1,2 +0,0 @@
|
||||
from .config_manager import ConfigManager
|
||||
from .log_manager import LogManager
|
||||
3
src/config_manager/__init__.py
Normal file
3
src/config_manager/__init__.py
Normal file
@@ -0,0 +1,3 @@
|
||||
from .v2 import ConfigManagerV2 as ConfigManager
|
||||
from .v1.cfg_manager import ConfigManager as LegacyConfigManager
|
||||
from .v1.log_manager import LogManager
|
||||
@@ -104,8 +104,7 @@ class ConfigManager:
|
||||
finally:
|
||||
self.logger.info("ConfigManager stopped")
|
||||
|
||||
def start(self) -> None:
|
||||
"""Запускает менеджер конфигурации в текущем event loop"""
|
||||
async def start(self) -> None:
|
||||
if self._task is not None and not self._task.done():
|
||||
self.logger.warning("ConfigManager is already running")
|
||||
return
|
||||
@@ -116,8 +115,9 @@ class ConfigManager:
|
||||
self.logger.error("start() must be called from within an async context")
|
||||
raise
|
||||
|
||||
self._task = self._loop.create_task(self._run())
|
||||
self.logger.info("ConfigManager task created")
|
||||
self.logger.info("ConfigManager starting and awaiting _run()")
|
||||
await self._run()
|
||||
|
||||
|
||||
async def stop(self) -> None:
|
||||
"""Останавливает менеджер конфигурации и ожидает завершения"""
|
||||
7
src/config_manager/v2/__init__.py
Normal file
7
src/config_manager/v2/__init__.py
Normal file
@@ -0,0 +1,7 @@
|
||||
"""Публичный API V2: точка входа в менеджер конфигурации и настройки management-сервера.
|
||||
|
||||
Экспортирует ConfigManagerV2 и типы настроек для использования приложениями."""
|
||||
from .core import ConfigManagerV2
|
||||
from .types import HealthServerSettings, ManagementServerSettings
|
||||
|
||||
__all__ = ["ConfigManagerV2", "ManagementServerSettings", "HealthServerSettings"]
|
||||
7
src/config_manager/v2/control/__init__.py
Normal file
7
src/config_manager/v2/control/__init__.py
Normal file
@@ -0,0 +1,7 @@
|
||||
"""Каналы внешнего управления: абстракция и реализация (например, Telegram).
|
||||
|
||||
Позволяет запускать, останавливать и запрашивать статус менеджера через ботов и другие интерфейсы."""
|
||||
from .base import ControlChannel
|
||||
from .telegram import TelegramControlChannel
|
||||
|
||||
__all__ = ["ControlChannel", "TelegramControlChannel"]
|
||||
24
src/config_manager/v2/control/base.py
Normal file
24
src/config_manager/v2/control/base.py
Normal file
@@ -0,0 +1,24 @@
|
||||
"""Базовый абстрактный канал управления и типы обработчиков команд.
|
||||
|
||||
Определяет контракт: старт/стоп канала и привязка обработчиков start/stop/status."""
|
||||
from __future__ import annotations
|
||||
|
||||
from abc import ABC, abstractmethod
|
||||
from collections.abc import Awaitable, Callable
|
||||
|
||||
|
||||
StartHandler = Callable[[], Awaitable[str]]
|
||||
StopHandler = Callable[[], Awaitable[str]]
|
||||
StatusHandler = Callable[[], Awaitable[str]]
|
||||
|
||||
|
||||
class ControlChannel(ABC):
|
||||
@abstractmethod
|
||||
async def start(self, on_start: StartHandler, on_stop: StopHandler, on_status: StatusHandler) -> None:
|
||||
"""Запустить канал и привязать обработчики команд."""
|
||||
raise NotImplementedError
|
||||
|
||||
@abstractmethod
|
||||
async def stop(self) -> None:
|
||||
"""Остановить канал и освободить его ресурсы."""
|
||||
raise NotImplementedError
|
||||
114
src/config_manager/v2/control/telegram.py
Normal file
114
src/config_manager/v2/control/telegram.py
Normal file
@@ -0,0 +1,114 @@
|
||||
"""Реализация канала управления через Telegram Bot API (long polling).
|
||||
|
||||
Принимает команды /start, /stop, /status в указанном чате и вызывает привязанные обработчики."""
|
||||
from __future__ import annotations
|
||||
|
||||
import asyncio
|
||||
import json
|
||||
import logging
|
||||
import urllib.parse
|
||||
import urllib.request
|
||||
from typing import Optional
|
||||
|
||||
from .base import ControlChannel, StartHandler, StatusHandler, StopHandler
|
||||
|
||||
|
||||
class TelegramControlChannel(ControlChannel):
|
||||
def __init__(
|
||||
self,
|
||||
token: str,
|
||||
chat_id: int,
|
||||
poll_interval: float = 2.0,
|
||||
logger: Optional[logging.Logger] = None,
|
||||
):
|
||||
"""Инициализация канала опроса Telegram с настройками бота и чата."""
|
||||
self._token = token
|
||||
self._chat_id = chat_id
|
||||
self._poll_interval = poll_interval
|
||||
self._offset: Optional[int] = None
|
||||
self._task: Optional[asyncio.Task] = None
|
||||
self._stop_event = asyncio.Event()
|
||||
self._on_start: Optional[StartHandler] = None
|
||||
self._on_stop: Optional[StopHandler] = None
|
||||
self._on_status: Optional[StatusHandler] = None
|
||||
self._logger = logger or logging.getLogger(__name__)
|
||||
|
||||
async def start(self, on_start: StartHandler, on_stop: StopHandler, on_status: StatusHandler) -> None:
|
||||
"""Запустить опрос обновлений Telegram и зарегистрировать колбэки команд."""
|
||||
if self._task is not None and not self._task.done():
|
||||
return
|
||||
self._on_start = on_start
|
||||
self._on_stop = on_stop
|
||||
self._on_status = on_status
|
||||
self._stop_event.clear()
|
||||
self._task = asyncio.create_task(self._poll_loop())
|
||||
|
||||
async def stop(self) -> None:
|
||||
"""Остановить цикл опроса и дождаться завершения задачи."""
|
||||
self._stop_event.set()
|
||||
if self._task is not None:
|
||||
self._task.cancel()
|
||||
try:
|
||||
await self._task
|
||||
except asyncio.CancelledError:
|
||||
pass
|
||||
self._task = None
|
||||
|
||||
async def _poll_loop(self) -> None:
|
||||
"""Непрерывно получать обновления и вызывать поддерживаемые команды."""
|
||||
while not self._stop_event.is_set():
|
||||
try:
|
||||
updates = await asyncio.to_thread(self._fetch_updates)
|
||||
for update in updates:
|
||||
await self._process_update(update)
|
||||
except Exception as exc: # noqa: BLE001
|
||||
self._logger.warning("Telegram polling error: %s", exc)
|
||||
|
||||
try:
|
||||
await asyncio.wait_for(self._stop_event.wait(), timeout=max(self._poll_interval, 0.1))
|
||||
except asyncio.TimeoutError:
|
||||
continue
|
||||
|
||||
def _fetch_updates(self) -> list[dict]:
|
||||
"""Запросить новые обновления Telegram с учётом последнего offset."""
|
||||
params = {"timeout": 0}
|
||||
if self._offset is not None:
|
||||
params["offset"] = self._offset
|
||||
query = urllib.parse.urlencode(params)
|
||||
url = f"https://api.telegram.org/bot{self._token}/getUpdates?{query}"
|
||||
with urllib.request.urlopen(url, timeout=10) as response:
|
||||
payload = json.loads(response.read().decode("utf-8"))
|
||||
|
||||
result = payload.get("result", [])
|
||||
if result:
|
||||
self._offset = max(item["update_id"] for item in result) + 1
|
||||
return result
|
||||
|
||||
async def _process_update(self, update: dict) -> None:
|
||||
"""Обработать одно обновление Telegram и выполнить соответствующую команду."""
|
||||
message = update.get("message") or {}
|
||||
text = (message.get("text") or "").strip().lower()
|
||||
chat = message.get("chat") or {}
|
||||
chat_id = chat.get("id")
|
||||
|
||||
if chat_id != self._chat_id:
|
||||
return
|
||||
|
||||
if text in {"/start", "/run"} and self._on_start is not None:
|
||||
reply = await self._on_start()
|
||||
elif text in {"/stop", "/halt"} and self._on_stop is not None:
|
||||
reply = await self._on_stop()
|
||||
elif text in {"/status", "/health"} and self._on_status is not None:
|
||||
reply = await self._on_status()
|
||||
else:
|
||||
return
|
||||
|
||||
await asyncio.to_thread(self._send_message, reply)
|
||||
|
||||
def _send_message(self, text: str) -> None:
|
||||
"""Отправить текстовый ответ в настроенный чат Telegram."""
|
||||
encoded = urllib.parse.urlencode({"chat_id": self._chat_id, "text": text})
|
||||
url = f"https://api.telegram.org/bot{self._token}/sendMessage"
|
||||
req = urllib.request.Request(url, data=encoded.encode("utf-8"), method="POST")
|
||||
with urllib.request.urlopen(req, timeout=10):
|
||||
return
|
||||
8
src/config_manager/v2/core/__init__.py
Normal file
8
src/config_manager/v2/core/__init__.py
Normal file
@@ -0,0 +1,8 @@
|
||||
"""Ядро V2: жизненный цикл менеджера, загрузка конфига и цикл воркера.
|
||||
|
||||
Содержит ConfigManagerV2, загрузчик конфигурации и планировщик повторяющегося выполнения execute()."""
|
||||
from .config_loader import ConfigLoader
|
||||
from .manager import ConfigManagerV2
|
||||
from .scheduler import WorkerLoop
|
||||
|
||||
__all__ = ["ConfigLoader", "ConfigManagerV2", "WorkerLoop"]
|
||||
70
src/config_manager/v2/core/config_loader.py
Normal file
70
src/config_manager/v2/core/config_loader.py
Normal file
@@ -0,0 +1,70 @@
|
||||
"""Загрузчик конфигурации из файла (YAML/JSON) с обнаружением изменений по хешу.
|
||||
|
||||
Читает файл синхронно и асинхронно, парсит по расширению и возвращает последний валидный конфиг при ошибках."""
|
||||
from __future__ import annotations
|
||||
|
||||
import asyncio
|
||||
import hashlib
|
||||
import json
|
||||
import os
|
||||
from typing import Any, Optional
|
||||
|
||||
import yaml
|
||||
|
||||
|
||||
class ConfigLoader:
|
||||
def __init__(self, path: str):
|
||||
"""Инициализация состояния загрузчика для указанного пути к файлу конфига."""
|
||||
self.path = path
|
||||
self.config: Any = None
|
||||
self.last_valid_config: Any = None
|
||||
self._last_seen_hash: Optional[str] = None
|
||||
|
||||
def _read_file_sync(self) -> str:
|
||||
"""Синхронно прочитать сырой текст конфига с диска."""
|
||||
with open(self.path, "r", encoding="utf-8") as fh:
|
||||
return fh.read()
|
||||
|
||||
async def read_file_async(self) -> str:
|
||||
"""Прочитать сырой текст конфига с диска в рабочем потоке."""
|
||||
return await asyncio.to_thread(self._read_file_sync)
|
||||
|
||||
def parse_config(self, data: str) -> Any:
|
||||
"""Распарсить текст конфига как YAML или JSON по расширению файла."""
|
||||
extension = os.path.splitext(self.path)[1].lower()
|
||||
if extension in (".yaml", ".yml"):
|
||||
return yaml.safe_load(data)
|
||||
return json.loads(data)
|
||||
|
||||
@staticmethod
|
||||
def _calculate_hash(data: str) -> str:
|
||||
"""Вычислить устойчивый хеш содержимого для обнаружения изменений."""
|
||||
return hashlib.sha256(data.encode("utf-8")).hexdigest()
|
||||
|
||||
async def load_if_changed(self) -> tuple[bool, Any]:
|
||||
"""Загрузить и распарсить конфиг только при изменении содержимого файла."""
|
||||
raw_data = await self.read_file_async()
|
||||
current_hash = self._calculate_hash(raw_data)
|
||||
if current_hash == self._last_seen_hash:
|
||||
return False, self.config
|
||||
|
||||
self._last_seen_hash = current_hash
|
||||
parsed = self.parse_config(raw_data)
|
||||
self.config = parsed
|
||||
self.last_valid_config = parsed
|
||||
return True, parsed
|
||||
|
||||
|
||||
def extract_scheduler_intervals(
|
||||
config: Any,
|
||||
default_update: float,
|
||||
default_work: float,
|
||||
) -> tuple[float, float]:
|
||||
"""Извлекает update_interval и work_interval из конфига (dict). Возвращает значения по умолчанию при ошибке."""
|
||||
if not isinstance(config, dict):
|
||||
return default_update, default_work
|
||||
upd = config.get("update_interval")
|
||||
wrk = config.get("work_interval")
|
||||
u = float(upd) if isinstance(upd, (int, float)) and upd > 0 else default_update
|
||||
w = float(wrk) if isinstance(wrk, (int, float)) and wrk > 0 else default_work
|
||||
return u, w
|
||||
233
src/config_manager/v2/core/manager.py
Normal file
233
src/config_manager/v2/core/manager.py
Normal file
@@ -0,0 +1,233 @@
|
||||
"""Главный класс менеджера V2: оркестрация жизненного цикла, конфига, API и каналов управления.
|
||||
|
||||
Запускает воркер и периодическое обновление конфига, поднимает management-сервер и control-канал при наличии настроек."""
|
||||
from __future__ import annotations
|
||||
|
||||
import asyncio
|
||||
import logging
|
||||
import time
|
||||
from typing import Any, Optional
|
||||
|
||||
from ...v1.log_manager import LogManager
|
||||
from ..control.base import ControlChannel
|
||||
from ..management import (
|
||||
ControlChannelBridge,
|
||||
HealthAggregator,
|
||||
ManagementApiBridge,
|
||||
ManagementServer,
|
||||
)
|
||||
from ..types import HealthPayload, LifecycleState, ManagementServerSettings
|
||||
from .config_loader import ConfigLoader, extract_scheduler_intervals
|
||||
from .scheduler import WorkerLoop
|
||||
|
||||
|
||||
class ConfigManagerV2:
|
||||
DEFAULT_UPDATE_INTERVAL = 5.0
|
||||
DEFAULT_WORK_INTERVAL = 2.0
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
path: str,
|
||||
log_manager: Optional[LogManager] = None,
|
||||
management_settings: Optional[ManagementServerSettings] = None,
|
||||
control_channel: Optional[ControlChannel] = None,
|
||||
):
|
||||
"""Инициализация подсистем менеджера и состояния рантайма."""
|
||||
self.path = path
|
||||
self.config: Any = None
|
||||
self.update_interval = self.DEFAULT_UPDATE_INTERVAL
|
||||
self.work_interval = self.DEFAULT_WORK_INTERVAL
|
||||
|
||||
self._loader = ConfigLoader(path)
|
||||
self._log_manager = log_manager or LogManager()
|
||||
self._control_channel = control_channel
|
||||
|
||||
self._halt = asyncio.Event()
|
||||
self._task: Optional[asyncio.Task] = None
|
||||
self._loop: Optional[asyncio.AbstractEventLoop] = None
|
||||
|
||||
self._state = LifecycleState.IDLE
|
||||
self._last_execute_error: Optional[str] = None
|
||||
self._last_success_timestamp: Optional[float] = None
|
||||
|
||||
self._management_settings = management_settings or ManagementServerSettings(enabled=True)
|
||||
self._health_timeout = self._management_settings.health_timeout
|
||||
self._health_aggregator = HealthAggregator(
|
||||
get_state=lambda: self._state,
|
||||
get_last_error=lambda: self._last_execute_error,
|
||||
get_last_success_timestamp=lambda: self._last_success_timestamp,
|
||||
health_timeout=self._health_timeout,
|
||||
get_app_health=self.get_health_status,
|
||||
)
|
||||
self._api_bridge = ManagementApiBridge(start_fn=self.start, stop_fn=self.stop)
|
||||
self._control_bridge = ControlChannelBridge(
|
||||
halt=self._halt,
|
||||
get_state=lambda: self._state,
|
||||
get_status=self._status_text,
|
||||
)
|
||||
self._management_server: Optional[ManagementServer] = None
|
||||
if self._management_settings.enabled:
|
||||
self._management_server = ManagementServer(
|
||||
host=self._management_settings.host,
|
||||
port=self._management_settings.port,
|
||||
timeout=self._management_settings.timeout,
|
||||
health_provider=self._health_aggregator.collect,
|
||||
on_start=self._api_bridge.on_start,
|
||||
on_stop=self._api_bridge.on_stop,
|
||||
)
|
||||
|
||||
self.logger = logging.getLogger(__name__)
|
||||
|
||||
def _apply_config(self, new_config: Any) -> None:
|
||||
"""Применить загруженный конфиг: интервалы и log_manager. Вызывается после load_if_changed."""
|
||||
self.config = new_config
|
||||
self.update_interval, self.work_interval = extract_scheduler_intervals(
|
||||
new_config,
|
||||
self.DEFAULT_UPDATE_INTERVAL,
|
||||
self.DEFAULT_WORK_INTERVAL,
|
||||
)
|
||||
if isinstance(new_config, dict):
|
||||
self._log_manager.apply_config(new_config)
|
||||
|
||||
async def _update_config(self) -> None:
|
||||
"""Перезагрузить конфиг при изменении файла и применить к состоянию и log_manager."""
|
||||
try:
|
||||
changed, new_config = await self._loader.load_if_changed()
|
||||
if not changed:
|
||||
return
|
||||
self._apply_config(new_config)
|
||||
except Exception as exc: # noqa: BLE001
|
||||
self.logger.error("Error reading/parsing config file: %s", exc)
|
||||
if self._loader.last_valid_config is not None:
|
||||
self._apply_config(self._loader.last_valid_config)
|
||||
|
||||
def execute(self) -> None:
|
||||
"""Переопределить в подклассе для реализации одной единицы блокирующей работы."""
|
||||
|
||||
def get_health_status(self) -> HealthPayload:
|
||||
"""Вернуть payload здоровья приложения для /health.
|
||||
|
||||
Варианты ответа по статусу:
|
||||
- ``{"status": "ok"}`` — сервис в норме; GET /health → 200.
|
||||
- ``{"status": "degraded", "detail": "..."}`` — работает с ограничениями; GET /health → 503.
|
||||
- ``{"status": "unhealthy", "detail": "..."}`` — неработоспособен; GET /health → 503.
|
||||
|
||||
Поле ``detail`` опционально; для ``ok`` обычно не задаётся.
|
||||
Переопределить в подклассе для своей логики здоровья."""
|
||||
return {"status": "ok"}
|
||||
|
||||
def _on_execute_success(self) -> None:
|
||||
"""Обновить время последнего успешного execute() и сбросить маркер ошибки."""
|
||||
self._last_success_timestamp = time.monotonic()
|
||||
self._last_execute_error = None
|
||||
|
||||
def _on_execute_error(self, exc: Exception) -> None:
|
||||
"""Сохранить и залогировать детали ошибки выполнения для отчёта здоровья."""
|
||||
self._last_execute_error = str(exc)
|
||||
self.logger.error("Execution error: %s", exc)
|
||||
|
||||
async def _worker_loop(self) -> None:
|
||||
"""Вызывать execute() циклически до запроса остановки."""
|
||||
worker = WorkerLoop(
|
||||
execute=self.execute,
|
||||
get_interval=lambda: self.work_interval,
|
||||
halt_event=self._halt,
|
||||
on_error=self._on_execute_error,
|
||||
on_success=self._on_execute_success,
|
||||
)
|
||||
await worker.run()
|
||||
|
||||
async def _periodic_update_loop(self) -> None:
|
||||
"""Периодически проверять файл конфига на обновления до остановки."""
|
||||
while not self._halt.is_set():
|
||||
await self._update_config()
|
||||
try:
|
||||
await asyncio.wait_for(self._halt.wait(), timeout=max(self.update_interval, 0.05))
|
||||
except asyncio.TimeoutError:
|
||||
continue
|
||||
|
||||
async def _status_text(self) -> str:
|
||||
"""Сформировать читаемый статус рантайма для каналов управления."""
|
||||
health = await self._health_aggregator.collect()
|
||||
detail = health.get("detail")
|
||||
if detail:
|
||||
return f"state={self._state.value}; health={health['status']}; detail={detail}"
|
||||
return f"state={self._state.value}; health={health['status']}"
|
||||
|
||||
async def _start_control_channel(self) -> None:
|
||||
"""Запустить настроенный канал управления с привязанными обработчиками команд."""
|
||||
if self._control_channel is None:
|
||||
return
|
||||
await self._control_channel.start(
|
||||
self._control_bridge.on_start,
|
||||
self._control_bridge.on_stop,
|
||||
self._control_bridge.on_status,
|
||||
)
|
||||
|
||||
async def _stop_control_channel(self) -> None:
|
||||
"""Остановить настроенный канал управления, если он активен."""
|
||||
if self._control_channel is None:
|
||||
return
|
||||
await self._control_channel.stop()
|
||||
|
||||
async def _run(self) -> None:
|
||||
"""Запустить жизненный цикл менеджера и координировать фоновые задачи."""
|
||||
self._state = LifecycleState.STARTING
|
||||
self._halt.clear()
|
||||
await self._update_config()
|
||||
|
||||
if self._management_server is not None:
|
||||
await self._management_server.start()
|
||||
await self._start_control_channel()
|
||||
|
||||
self._state = LifecycleState.RUNNING
|
||||
self.logger.info("ConfigManagerV2 started")
|
||||
|
||||
tasks = [
|
||||
asyncio.create_task(self._worker_loop(), name="v2-worker-loop"),
|
||||
asyncio.create_task(self._periodic_update_loop(), name="v2-config-loop"),
|
||||
]
|
||||
|
||||
try:
|
||||
await asyncio.gather(*tasks)
|
||||
except asyncio.CancelledError:
|
||||
raise
|
||||
finally:
|
||||
self._state = LifecycleState.STOPPING
|
||||
self._halt.set()
|
||||
for task in tasks:
|
||||
task.cancel()
|
||||
await asyncio.gather(*tasks, return_exceptions=True)
|
||||
# Management-сервер и control channel не останавливаем: API и канал управления остаются доступными.
|
||||
self._state = LifecycleState.STOPPED
|
||||
self._task = None
|
||||
self.logger.info("ConfigManagerV2 stopped (API and control channel remain available)")
|
||||
|
||||
async def start(self) -> None:
|
||||
"""Запустить циклы execute и конфига в фоне; возвращает управление сразу (ответ на /actions/start приходит без ожидания)."""
|
||||
if self._task is not None and not self._task.done():
|
||||
self.logger.warning("ConfigManagerV2 is already running")
|
||||
return
|
||||
|
||||
try:
|
||||
self._loop = asyncio.get_running_loop()
|
||||
except RuntimeError:
|
||||
self.logger.error("start() must be called from within an async context")
|
||||
raise
|
||||
|
||||
self._task = asyncio.create_task(self._run(), name="config-manager-v2")
|
||||
|
||||
async def stop(self) -> None:
|
||||
"""Запросить плавную остановку и дождаться завершения менеджера."""
|
||||
if self._task is None:
|
||||
self.logger.warning("ConfigManagerV2 is not running")
|
||||
return
|
||||
|
||||
self._halt.set()
|
||||
if asyncio.current_task() is self._task:
|
||||
return
|
||||
|
||||
try:
|
||||
await self._task
|
||||
except asyncio.CancelledError:
|
||||
pass
|
||||
42
src/config_manager/v2/core/scheduler.py
Normal file
42
src/config_manager/v2/core/scheduler.py
Normal file
@@ -0,0 +1,42 @@
|
||||
"""Цикл воркера: повторяющийся вызов блокирующего execute() в потоке с паузой между итерациями.
|
||||
|
||||
Поддерживает halt-событие для остановки, колбэки on_success/on_error для учёта ошибок и здоровья."""
|
||||
from __future__ import annotations
|
||||
|
||||
import asyncio
|
||||
from collections.abc import Callable
|
||||
from typing import Optional
|
||||
|
||||
|
||||
class WorkerLoop:
|
||||
def __init__(
|
||||
self,
|
||||
execute: Callable[[], None],
|
||||
get_interval: Callable[[], float],
|
||||
halt_event: asyncio.Event,
|
||||
on_error: Optional[Callable[[Exception], None]] = None,
|
||||
on_success: Optional[Callable[[], None]] = None,
|
||||
):
|
||||
"""Сохранить колбэки и примитивы синхронизации для выполнения воркера."""
|
||||
self._execute = execute
|
||||
self._get_interval = get_interval
|
||||
self._halt_event = halt_event
|
||||
self._on_error = on_error
|
||||
self._on_success = on_success
|
||||
|
||||
async def run(self) -> None:
|
||||
"""Вызывать execute циклически до запроса остановки."""
|
||||
while not self._halt_event.is_set():
|
||||
try:
|
||||
await asyncio.to_thread(self._execute)
|
||||
if self._on_success is not None:
|
||||
self._on_success()
|
||||
except Exception as exc: # noqa: BLE001
|
||||
if self._on_error is not None:
|
||||
self._on_error(exc)
|
||||
|
||||
timeout = max(self._get_interval(), 0.01)
|
||||
try:
|
||||
await asyncio.wait_for(self._halt_event.wait(), timeout=timeout)
|
||||
except asyncio.TimeoutError:
|
||||
continue
|
||||
14
src/config_manager/v2/management/__init__.py
Normal file
14
src/config_manager/v2/management/__init__.py
Normal file
@@ -0,0 +1,14 @@
|
||||
"""Management API: HTTP-сервер для /health и /actions/start|stop, адаптеры и сбор здоровья.
|
||||
|
||||
Объединяет сервер, мосты к жизненному циклу и агрегатор здоровья для единого контракта API."""
|
||||
from .bridges import ControlChannelBridge, ManagementApiBridge
|
||||
from .health_aggregator import HealthAggregator
|
||||
from .management_server import HealthServer, ManagementServer
|
||||
|
||||
__all__ = [
|
||||
"ControlChannelBridge",
|
||||
"HealthAggregator",
|
||||
"HealthServer",
|
||||
"ManagementApiBridge",
|
||||
"ManagementServer",
|
||||
]
|
||||
61
src/config_manager/v2/management/bridges.py
Normal file
61
src/config_manager/v2/management/bridges.py
Normal file
@@ -0,0 +1,61 @@
|
||||
"""Адаптеры, связывающие жизненный цикл менеджера с HTTP API и каналами управления.
|
||||
|
||||
ManagementApiBridge отдаёт start/stop в HTTP; ControlChannelBridge — start/stop/status в Telegram и др."""
|
||||
from __future__ import annotations
|
||||
|
||||
import asyncio
|
||||
from collections.abc import Awaitable, Callable
|
||||
|
||||
from ..types import LifecycleState
|
||||
|
||||
|
||||
class ManagementApiBridge:
|
||||
"""Предоставляет start/stop жизненного цикла как async-колбэки для ManagementServer (/actions/start, /actions/stop)."""
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
start_fn: Callable[[], Awaitable[None]],
|
||||
stop_fn: Callable[[], Awaitable[None]],
|
||||
):
|
||||
self._start_fn = start_fn
|
||||
self._stop_fn = stop_fn
|
||||
|
||||
async def on_start(self) -> str:
|
||||
"""Выполнить start и вернуть сообщение для HTTP-ответа."""
|
||||
await self._start_fn()
|
||||
return "start completed"
|
||||
|
||||
async def on_stop(self) -> str:
|
||||
"""Выполнить stop и вернуть сообщение для HTTP-ответа."""
|
||||
await self._stop_fn()
|
||||
return "stop completed"
|
||||
|
||||
|
||||
class ControlChannelBridge:
|
||||
"""Предоставляет halt и status как обработчики start/stop/status для ControlChannel (например Telegram)."""
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
halt: asyncio.Event,
|
||||
get_state: Callable[[], LifecycleState],
|
||||
get_status: Callable[[], Awaitable[str]],
|
||||
):
|
||||
self._halt = halt
|
||||
self._get_state = get_state
|
||||
self._get_status = get_status
|
||||
|
||||
async def on_start(self) -> str:
|
||||
"""Обработать внешний start: сбросить halt; идемпотентно при уже running."""
|
||||
if self._get_state() == LifecycleState.RUNNING:
|
||||
return "already running"
|
||||
self._halt.clear()
|
||||
return "start signal accepted"
|
||||
|
||||
async def on_stop(self) -> str:
|
||||
"""Обработать внешний stop: установить halt."""
|
||||
self._halt.set()
|
||||
return "stop signal accepted"
|
||||
|
||||
async def on_status(self) -> str:
|
||||
"""Вернуть текущий текст статуса."""
|
||||
return await self._get_status()
|
||||
53
src/config_manager/v2/management/health_aggregator.py
Normal file
53
src/config_manager/v2/management/health_aggregator.py
Normal file
@@ -0,0 +1,53 @@
|
||||
"""Собирает состояние жизненного цикла и здоровья в один ответ для /health.
|
||||
|
||||
Здоровье = был успешный execute() за последние health_timeout секунд; иначе unhealthy с деталью (ошибка или таймаут)."""
|
||||
from __future__ import annotations
|
||||
|
||||
import time
|
||||
from collections.abc import Callable
|
||||
|
||||
from ..types import HealthPayload, LifecycleState
|
||||
|
||||
|
||||
class HealthAggregator:
|
||||
"""Формирует ответ здоровья по времени последнего успешного execute() и таймауту."""
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
get_state: Callable[[], LifecycleState],
|
||||
get_last_error: Callable[[], str | None],
|
||||
get_last_success_timestamp: Callable[[], float | None],
|
||||
health_timeout: float,
|
||||
get_app_health: Callable[[], HealthPayload],
|
||||
):
|
||||
self._get_state = get_state
|
||||
self._get_last_error = get_last_error
|
||||
self._get_last_success_timestamp = get_last_success_timestamp
|
||||
self._health_timeout = health_timeout
|
||||
self._get_app_health = get_app_health
|
||||
|
||||
async def collect(self) -> HealthPayload:
|
||||
"""Вернуть ok, если был успешный execute() за последние health_timeout сек; иначе unhealthy. Всегда добавляем state."""
|
||||
state = self._get_state()
|
||||
state_value = state.value
|
||||
|
||||
# Только при state=RUNNING возможен status=ok; при остановке (STOPPING/STOPPED) сразу unhealthy.
|
||||
if state != LifecycleState.RUNNING:
|
||||
return {"status": "unhealthy", "detail": f"state={state_value}", "state": state_value}
|
||||
|
||||
last_success = self._get_last_success_timestamp()
|
||||
now = time.monotonic()
|
||||
|
||||
if last_success is None:
|
||||
detail = self._get_last_error() or "no successful run yet"
|
||||
return {"status": "unhealthy", "detail": detail, "state": state_value}
|
||||
|
||||
if (now - last_success) > self._health_timeout:
|
||||
detail = self._get_last_error() or f"no successful run within {self._health_timeout}s"
|
||||
return {"status": "unhealthy", "detail": detail, "state": state_value}
|
||||
|
||||
result = self._get_app_health()
|
||||
status = result.get("status", "unhealthy")
|
||||
if status != "ok":
|
||||
return {"status": "unhealthy", "detail": result.get("detail", "app reported non-ok"), "state": state_value}
|
||||
return {**result, "state": state_value}
|
||||
145
src/config_manager/v2/management/management_server.py
Normal file
145
src/config_manager/v2/management/management_server.py
Normal file
@@ -0,0 +1,145 @@
|
||||
"""Management HTTP API на FastAPI: эндпоинты /health, /actions/start, /actions/stop.
|
||||
|
||||
Единообразное описание маршрутов через декораторы FastAPI."""
|
||||
from __future__ import annotations
|
||||
|
||||
import asyncio
|
||||
import json
|
||||
from collections.abc import Awaitable, Callable
|
||||
from typing import Any, Optional
|
||||
|
||||
from fastapi import FastAPI
|
||||
from fastapi.responses import JSONResponse
|
||||
from uvicorn import Config, Server
|
||||
|
||||
from ..types import HealthPayload
|
||||
|
||||
# Захардкоженные эндпоинты management API.
|
||||
PATH_HEALTH = "/health"
|
||||
PATH_ACTION_START = "/actions/start"
|
||||
PATH_ACTION_STOP = "/actions/stop"
|
||||
|
||||
|
||||
class ManagementServer:
|
||||
def __init__(
|
||||
self,
|
||||
host: str,
|
||||
port: int,
|
||||
timeout: float,
|
||||
health_provider: Callable[[], Awaitable[HealthPayload]],
|
||||
on_start: Optional[Callable[[], Awaitable[str]]] = None,
|
||||
on_stop: Optional[Callable[[], Awaitable[str]]] = None,
|
||||
):
|
||||
"""Настройка параметров и колбэков лёгкого HTTP management-сервера."""
|
||||
self._host = host
|
||||
self._port = port
|
||||
self._timeout = timeout
|
||||
self._health_provider = health_provider
|
||||
self._on_start = on_start
|
||||
self._on_stop = on_stop
|
||||
self._uvicorn_server: Optional[Server] = None
|
||||
self._serve_task: Optional[asyncio.Task[None]] = None
|
||||
self._bound_port: Optional[int] = None
|
||||
self._app = self._create_app()
|
||||
|
||||
def _create_app(self) -> FastAPI:
|
||||
app = FastAPI(title="Config Manager Management API")
|
||||
|
||||
@app.get(PATH_HEALTH)
|
||||
async def health() -> JSONResponse:
|
||||
return await self._health_response()
|
||||
|
||||
@app.get(PATH_ACTION_START)
|
||||
@app.post(PATH_ACTION_START)
|
||||
async def action_start() -> JSONResponse:
|
||||
return await self._action_response("start", self._on_start)
|
||||
|
||||
@app.get(PATH_ACTION_STOP)
|
||||
@app.post(PATH_ACTION_STOP)
|
||||
async def action_stop() -> JSONResponse:
|
||||
return await self._action_response("stop", self._on_stop)
|
||||
|
||||
return app
|
||||
|
||||
async def _health_response(self) -> JSONResponse:
|
||||
"""Сформировать HTTP-ответ из колбэка здоровья приложения."""
|
||||
try:
|
||||
payload = await asyncio.wait_for(self._health_provider(), timeout=self._timeout)
|
||||
status = payload.get("status", "unhealthy")
|
||||
status_code = 200 if status == "ok" else 503
|
||||
return JSONResponse(content=payload, status_code=status_code)
|
||||
except Exception as exc: # noqa: BLE001
|
||||
return JSONResponse(
|
||||
content={"status": "unhealthy", "detail": str(exc)},
|
||||
status_code=503,
|
||||
)
|
||||
|
||||
async def _action_response(
|
||||
self,
|
||||
action: str,
|
||||
callback: Optional[Callable[[], Awaitable[str]]],
|
||||
) -> JSONResponse:
|
||||
"""Сформировать HTTP-ответ для колбэка действия start/stop."""
|
||||
if callback is None:
|
||||
return JSONResponse(
|
||||
content={"status": "error", "detail": f"{action} handler is not configured"},
|
||||
status_code=404,
|
||||
)
|
||||
try:
|
||||
detail = await callback()
|
||||
if not detail:
|
||||
detail = f"{action} action accepted"
|
||||
return JSONResponse(content={"status": "ok", "detail": detail}, status_code=200)
|
||||
except Exception as exc: # noqa: BLE001
|
||||
return JSONResponse(
|
||||
content={"status": "error", "detail": str(exc)},
|
||||
status_code=500,
|
||||
)
|
||||
|
||||
def _build_health_response(self) -> Awaitable[tuple[int, HealthPayload]]:
|
||||
"""Для тестов: вернуть (status_code, payload) как раньше."""
|
||||
async def _run() -> tuple[int, HealthPayload]:
|
||||
response = await self._health_response()
|
||||
body: Any = response.body
|
||||
if isinstance(body, bytes):
|
||||
body = json.loads(body.decode("utf-8"))
|
||||
return response.status_code, body
|
||||
return _run()
|
||||
|
||||
async def start(self) -> None:
|
||||
"""Начать приём запросов к API здоровья и действий, если ещё не запущен."""
|
||||
if self._serve_task is not None:
|
||||
return
|
||||
config = Config(
|
||||
app=self._app,
|
||||
host=self._host,
|
||||
port=self._port,
|
||||
log_level="warning",
|
||||
)
|
||||
self._uvicorn_server = Server(config)
|
||||
self._serve_task = asyncio.create_task(self._uvicorn_server.serve())
|
||||
await asyncio.sleep(0.05)
|
||||
if self._uvicorn_server.servers:
|
||||
sock = self._uvicorn_server.servers[0].sockets[0]
|
||||
self._bound_port = sock.getsockname()[1]
|
||||
else:
|
||||
self._bound_port = self._port
|
||||
|
||||
async def stop(self) -> None:
|
||||
"""Остановить management-сервер и освободить сокет."""
|
||||
if self._uvicorn_server is None or self._serve_task is None:
|
||||
return
|
||||
self._uvicorn_server.should_exit = True
|
||||
await self._serve_task
|
||||
self._uvicorn_server = None
|
||||
self._serve_task = None
|
||||
self._bound_port = None
|
||||
|
||||
@property
|
||||
def port(self) -> int:
|
||||
"""Порт, на котором слушает сервер (после start); при port=0 — фактически выданный ОС."""
|
||||
return self._bound_port if self._bound_port is not None else self._port
|
||||
|
||||
|
||||
# Backward-compatible alias.
|
||||
HealthServer = ManagementServer
|
||||
42
src/config_manager/v2/types.py
Normal file
42
src/config_manager/v2/types.py
Normal file
@@ -0,0 +1,42 @@
|
||||
"""Общие типы V2: состояние здоровья, жизненного цикла и настройки management-сервера.
|
||||
|
||||
Используются в core, management и control для единообразных контрактов."""
|
||||
from __future__ import annotations
|
||||
|
||||
from dataclasses import dataclass
|
||||
from enum import Enum
|
||||
from typing import Literal, TypedDict
|
||||
|
||||
|
||||
HealthState = Literal["ok", "degraded", "unhealthy"]
|
||||
|
||||
|
||||
class HealthPayload(TypedDict, total=False):
|
||||
status: HealthState
|
||||
detail: str
|
||||
state: str
|
||||
"""Текущее состояние жизненного цикла (idle/starting/running/stopping/stopped)."""
|
||||
|
||||
|
||||
class LifecycleState(str, Enum):
|
||||
IDLE = "idle"
|
||||
STARTING = "starting"
|
||||
RUNNING = "running"
|
||||
STOPPING = "stopping"
|
||||
STOPPED = "stopped"
|
||||
|
||||
|
||||
@dataclass
|
||||
class ManagementServerSettings:
|
||||
"""Настройки management HTTP-сервера и healthcheck (один объект на оба)."""
|
||||
enabled: bool = False
|
||||
host: str = "0.0.0.0"
|
||||
port: int = 8000
|
||||
timeout: float = 3.0
|
||||
"""Таймаут запроса health (секунды)."""
|
||||
health_timeout: float = 30.0
|
||||
"""Секунды без успешного execute(), после которых health = unhealthy."""
|
||||
|
||||
|
||||
# Backward-compatible alias.
|
||||
HealthServerSettings = ManagementServerSettings
|
||||
35
src/test.py
35
src/test.py
@@ -1,35 +0,0 @@
|
||||
from basic_application import ConfigManager
|
||||
import logging
|
||||
import asyncio
|
||||
from typing import Optional
|
||||
import os
|
||||
os.chdir(os.path.dirname(__file__))
|
||||
|
||||
logger = logging.getLogger()
|
||||
|
||||
|
||||
class MyApp(ConfigManager):
|
||||
def __init__(self, *args, **kwargs):
|
||||
super().__init__(*args, **kwargs)
|
||||
self.iter = 0
|
||||
|
||||
|
||||
def execute(self) -> None:
|
||||
logger.info(f"current iteration {self.iter}")
|
||||
self.iter += 1
|
||||
|
||||
|
||||
async def main():
|
||||
app = MyApp("config.yaml")
|
||||
app.start()
|
||||
logger.info("App started")
|
||||
await asyncio.sleep(20)
|
||||
await app.stop()
|
||||
|
||||
if __name__ == "__main__":
|
||||
asyncio.run(main())
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
131
tests/test.py
131
tests/test.py
@@ -1,131 +0,0 @@
|
||||
import unittest
|
||||
from unittest.mock import patch, mock_open, AsyncMock
|
||||
import asyncio
|
||||
import logging
|
||||
import io
|
||||
import json
|
||||
import yaml
|
||||
|
||||
import sys
|
||||
import os
|
||||
|
||||
sys.path.append(os.path.join(os.path.dirname(__file__), '..', 'src'))
|
||||
|
||||
from basic_application.basic_application import ConfigManager
|
||||
|
||||
|
||||
class TestConfigManager(unittest.IsolatedAsyncioTestCase):
|
||||
def setUp(self):
|
||||
self.json_data = json.dumps({
|
||||
"work_interval": 1,
|
||||
"update_interval": 1,
|
||||
"logging": {
|
||||
"version": 1,
|
||||
"handlers": {"console": {"class": "logging.StreamHandler", "level": "DEBUG"}},
|
||||
"root": {"handlers": ["console"], "level": "DEBUG"}
|
||||
},
|
||||
"some_key": "some_value"
|
||||
})
|
||||
self.yaml_data = """
|
||||
work_interval: 1
|
||||
update_interval: 1
|
||||
logging:
|
||||
version: 1
|
||||
handlers:
|
||||
console:
|
||||
class: logging.StreamHandler
|
||||
level: DEBUG
|
||||
root:
|
||||
handlers: [console]
|
||||
level: DEBUG
|
||||
some_key: some_value
|
||||
"""
|
||||
|
||||
@patch("builtins.open", new_callable=mock_open, read_data="")
|
||||
async def test_read_file_async_json(self, mock_file):
|
||||
mock_file.return_value.read = lambda: self.json_data
|
||||
cm = ConfigManager("config.json")
|
||||
content = await cm._read_file_async()
|
||||
self.assertEqual(content, self.json_data)
|
||||
|
||||
@patch("builtins.open", new_callable=mock_open, read_data="")
|
||||
async def test_read_file_async_yaml(self, mock_file):
|
||||
mock_file.return_value.read = lambda: self.yaml_data
|
||||
cm = ConfigManager("config.yaml")
|
||||
content = await cm._read_file_async()
|
||||
self.assertEqual(content, self.yaml_data)
|
||||
|
||||
def test_parse_json(self):
|
||||
cm = ConfigManager("config.json")
|
||||
parsed = cm._parse_config(self.json_data)
|
||||
self.assertIsInstance(parsed, dict)
|
||||
self.assertEqual(parsed["some_key"], "some_value")
|
||||
|
||||
def test_parse_yaml(self):
|
||||
cm = ConfigManager("config.yaml")
|
||||
parsed = cm._parse_config(self.yaml_data)
|
||||
self.assertIsInstance(parsed, dict)
|
||||
self.assertEqual(parsed["some_key"], "some_value")
|
||||
|
||||
@patch("basic_application.basic_application.logging.config.dictConfig")
|
||||
def test_apply_logging_config(self, mock_dict_config):
|
||||
cm = ConfigManager("config.json")
|
||||
cm._apply_logging_config({"logging": {"version": 1}})
|
||||
mock_dict_config.assert_called_once()
|
||||
|
||||
async def test_update_config_changes_config_and_intervals(self):
|
||||
# Мокаем чтение файла
|
||||
m = mock_open(read_data=self.json_data)
|
||||
with patch("builtins.open", m):
|
||||
cm = ConfigManager("config.json")
|
||||
|
||||
# Проверяем исходные интервалы
|
||||
self.assertEqual(cm.update_interval, cm.DEFAULT_UPDATE_INTERVAL)
|
||||
self.assertEqual(cm.work_interval, cm.DEFAULT_WORK_INTERVAL)
|
||||
|
||||
await cm._update_config()
|
||||
|
||||
# После обновления данные заполнены
|
||||
self.assertIsInstance(cm.config, dict)
|
||||
self.assertEqual(cm.update_interval, 1.0)
|
||||
self.assertEqual(cm.work_interval, 1.0)
|
||||
|
||||
async def test_execute_called_in_worker_loop(self):
|
||||
called = False
|
||||
|
||||
class TestCM(ConfigManager):
|
||||
def execute(self2):
|
||||
nonlocal called
|
||||
called = True
|
||||
|
||||
cm = TestCM("config.json")
|
||||
|
||||
async def stop_after_delay():
|
||||
await asyncio.sleep(0.1)
|
||||
cm.stop()
|
||||
|
||||
# Запускаем worker_loop и через 0.1 сек останавливаем
|
||||
await asyncio.gather(cm._worker_loop(), stop_after_delay())
|
||||
|
||||
self.assertTrue(called)
|
||||
|
||||
async def test_periodic_update_loop_runs(self):
|
||||
count = 0
|
||||
|
||||
class TestCM(ConfigManager):
|
||||
async def _update_config(self2):
|
||||
nonlocal count
|
||||
count += 1
|
||||
if count >= 2:
|
||||
self2.stop()
|
||||
|
||||
cm = TestCM("config.json")
|
||||
|
||||
await cm._periodic_update_loop()
|
||||
|
||||
self.assertGreaterEqual(count, 2)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
logging.basicConfig(level=logging.WARNING) # отключаем логи во время тестов
|
||||
unittest.main()
|
||||
54
tests/test_app.py
Normal file
54
tests/test_app.py
Normal file
@@ -0,0 +1,54 @@
|
||||
# import os
|
||||
# os.chdir(os.path.dirname(__file__))
|
||||
|
||||
import asyncio
|
||||
import logging
|
||||
from pathlib import Path
|
||||
|
||||
from config_manager import ConfigManager
|
||||
from config_manager.v1.log_manager import LogManager
|
||||
from config_manager.v2 import ManagementServerSettings
|
||||
|
||||
logger = logging.getLogger()
|
||||
|
||||
# Таймаут health: без успешного execute() дольше этого времени — unhealthy.
|
||||
HEALTH_TIMEOUT = 3.0
|
||||
|
||||
|
||||
class MyApp(ConfigManager):
|
||||
def __init__(self, *args, **kwargs):
|
||||
super().__init__(*args, **kwargs)
|
||||
self.iter = 0
|
||||
|
||||
def execute(self) -> None:
|
||||
"""Успешный прогон сбрасывает таймер health (обновляет время последнего успеха)."""
|
||||
logger.info("current iteration %s", self.iter)
|
||||
self.iter += 1
|
||||
|
||||
|
||||
async def main() -> None:
|
||||
log_manager = LogManager()
|
||||
# Один объект: и HTTP management-сервер (enabled, port), и health (health_timeout).
|
||||
management_settings = ManagementServerSettings(
|
||||
enabled=True,
|
||||
port=8000,
|
||||
health_timeout=HEALTH_TIMEOUT,
|
||||
)
|
||||
config_path = Path(__file__).parent / "config.yaml"
|
||||
app = MyApp(
|
||||
str(config_path),
|
||||
log_manager=log_manager,
|
||||
management_settings=management_settings,
|
||||
)
|
||||
logger.info("App starting (health_timeout=%s)", HEALTH_TIMEOUT)
|
||||
# Менеджер запускаем в фоне (start() не возвращает управление до stop).
|
||||
asyncio.create_task(app.start())
|
||||
|
||||
logger.info("App running; Ctrl+C to stop")
|
||||
while True:
|
||||
await asyncio.sleep(1)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
logging.basicConfig(level=logging.INFO)
|
||||
asyncio.run(main())
|
||||
33
tests/v2/test_config_reload_fallback.py
Normal file
33
tests/v2/test_config_reload_fallback.py
Normal file
@@ -0,0 +1,33 @@
|
||||
import asyncio
|
||||
|
||||
from config_manager.v2 import ConfigManagerV2
|
||||
|
||||
|
||||
class ReloadApp(ConfigManagerV2):
|
||||
def execute(self) -> None:
|
||||
return
|
||||
|
||||
|
||||
def test_invalid_config_keeps_last_valid(tmp_path):
|
||||
async def scenario() -> None:
|
||||
cfg = tmp_path / "config.yaml"
|
||||
cfg.write_text("work_interval: 0.2\nupdate_interval: 0.05\n", encoding="utf-8")
|
||||
|
||||
app = ReloadApp(str(cfg))
|
||||
runner = asyncio.create_task(app.start())
|
||||
|
||||
await asyncio.sleep(0.12)
|
||||
assert app.work_interval == 0.2
|
||||
assert app.update_interval == 0.05
|
||||
|
||||
cfg.write_text("work_interval: [broken\n", encoding="utf-8")
|
||||
await asyncio.sleep(0.15)
|
||||
|
||||
assert app.work_interval == 0.2
|
||||
assert app.update_interval == 0.05
|
||||
assert isinstance(app.config, dict)
|
||||
|
||||
await app.stop()
|
||||
await runner
|
||||
|
||||
asyncio.run(scenario())
|
||||
30
tests/v2/test_contract_v2.py
Normal file
30
tests/v2/test_contract_v2.py
Normal file
@@ -0,0 +1,30 @@
|
||||
import asyncio
|
||||
|
||||
from config_manager.v2 import ConfigManagerV2
|
||||
|
||||
|
||||
class DemoApp(ConfigManagerV2):
|
||||
def __init__(self, *args, **kwargs):
|
||||
super().__init__(*args, **kwargs)
|
||||
self.calls = 0
|
||||
|
||||
def execute(self) -> None:
|
||||
self.calls += 1
|
||||
|
||||
|
||||
def test_execute_loop_runs(tmp_path):
|
||||
async def scenario() -> None:
|
||||
cfg = tmp_path / "config.yaml"
|
||||
cfg.write_text("work_interval: 0.05\nupdate_interval: 0.05\n", encoding="utf-8")
|
||||
|
||||
app = DemoApp(str(cfg))
|
||||
runner = asyncio.create_task(app.start())
|
||||
|
||||
await asyncio.sleep(0.18)
|
||||
await app.stop()
|
||||
await runner
|
||||
|
||||
assert app.calls >= 2
|
||||
assert isinstance(app.config, dict)
|
||||
|
||||
asyncio.run(scenario())
|
||||
54
tests/v2/test_control_channel.py
Normal file
54
tests/v2/test_control_channel.py
Normal file
@@ -0,0 +1,54 @@
|
||||
import asyncio
|
||||
|
||||
from config_manager.v2 import ConfigManagerV2
|
||||
from config_manager.v2.control.base import ControlChannel, StartHandler, StatusHandler, StopHandler
|
||||
|
||||
|
||||
class DummyControlChannel(ControlChannel):
|
||||
def __init__(self):
|
||||
self.on_start: StartHandler | None = None
|
||||
self.on_stop: StopHandler | None = None
|
||||
self.on_status: StatusHandler | None = None
|
||||
self.started = False
|
||||
self.stopped = False
|
||||
|
||||
async def start(self, on_start: StartHandler, on_stop: StopHandler, on_status: StatusHandler) -> None:
|
||||
self.on_start = on_start
|
||||
self.on_stop = on_stop
|
||||
self.on_status = on_status
|
||||
self.started = True
|
||||
|
||||
async def stop(self) -> None:
|
||||
self.stopped = True
|
||||
|
||||
|
||||
class ControlledApp(ConfigManagerV2):
|
||||
def execute(self) -> None:
|
||||
return
|
||||
|
||||
|
||||
def test_control_channel_can_stop_manager(tmp_path):
|
||||
async def scenario() -> None:
|
||||
cfg = tmp_path / "config.yaml"
|
||||
cfg.write_text("work_interval: 0.05\nupdate_interval: 0.05\n", encoding="utf-8")
|
||||
|
||||
channel = DummyControlChannel()
|
||||
app = ControlledApp(str(cfg), control_channel=channel)
|
||||
|
||||
runner = asyncio.create_task(app.start())
|
||||
await asyncio.sleep(0.12)
|
||||
|
||||
assert channel.started is True
|
||||
assert channel.on_status is not None
|
||||
assert channel.on_stop is not None
|
||||
|
||||
status_text = await channel.on_status()
|
||||
assert "state=running" in status_text
|
||||
|
||||
stop_text = await channel.on_stop()
|
||||
assert "stop signal accepted" in stop_text
|
||||
|
||||
await runner
|
||||
assert channel.stopped is True
|
||||
|
||||
asyncio.run(scenario())
|
||||
100
tests/v2/test_health_endpoint.py
Normal file
100
tests/v2/test_health_endpoint.py
Normal file
@@ -0,0 +1,100 @@
|
||||
import asyncio
|
||||
import json
|
||||
|
||||
from config_manager.v2.management import ManagementServer
|
||||
|
||||
|
||||
def test_health_mapping_ok_to_200():
|
||||
async def provider():
|
||||
return {"status": "ok"}
|
||||
|
||||
async def scenario() -> None:
|
||||
server = ManagementServer(
|
||||
host="127.0.0.1",
|
||||
port=8000,
|
||||
timeout=0.2,
|
||||
health_provider=provider,
|
||||
)
|
||||
code, payload = await server._build_health_response()
|
||||
assert code == 200
|
||||
assert payload["status"] == "ok"
|
||||
|
||||
asyncio.run(scenario())
|
||||
|
||||
|
||||
def test_health_mapping_unhealthy_to_503():
|
||||
async def provider():
|
||||
return {"status": "unhealthy", "detail": "worker failed"}
|
||||
|
||||
async def scenario() -> None:
|
||||
server = ManagementServer(
|
||||
host="127.0.0.1",
|
||||
port=8000,
|
||||
timeout=0.2,
|
||||
health_provider=provider,
|
||||
)
|
||||
code, payload = await server._build_health_response()
|
||||
assert code == 503
|
||||
assert payload["status"] == "unhealthy"
|
||||
|
||||
asyncio.run(scenario())
|
||||
|
||||
|
||||
def test_action_routes_call_callbacks():
|
||||
events: list[str] = []
|
||||
|
||||
async def provider():
|
||||
return {"status": "ok"}
|
||||
|
||||
async def on_start() -> str:
|
||||
events.append("start")
|
||||
return "start accepted"
|
||||
|
||||
async def on_stop() -> str:
|
||||
events.append("stop")
|
||||
return "stop accepted"
|
||||
|
||||
async def request(port: int, path: str) -> tuple[int, dict[str, str]]:
|
||||
reader, writer = await asyncio.open_connection("127.0.0.1", port)
|
||||
writer.write(
|
||||
f"GET {path} HTTP/1.1\r\nHost: 127.0.0.1\r\nConnection: close\r\n\r\n".encode("utf-8")
|
||||
)
|
||||
await writer.drain()
|
||||
raw = await reader.read()
|
||||
writer.close()
|
||||
await writer.wait_closed()
|
||||
|
||||
header, body = raw.split(b"\r\n\r\n", maxsplit=1)
|
||||
status_code = int(header.split(b" ")[1])
|
||||
payload = json.loads(body.decode("utf-8"))
|
||||
return status_code, payload
|
||||
|
||||
async def scenario() -> None:
|
||||
server = ManagementServer(
|
||||
host="127.0.0.1",
|
||||
port=0,
|
||||
timeout=0.2,
|
||||
health_provider=provider,
|
||||
on_start=on_start,
|
||||
on_stop=on_stop,
|
||||
)
|
||||
await server.start()
|
||||
try:
|
||||
port = server.port
|
||||
assert port > 0
|
||||
|
||||
start_code, start_payload = await request(port, "/actions/start")
|
||||
stop_code, stop_payload = await request(port, "/actions/stop")
|
||||
finally:
|
||||
await server.stop()
|
||||
|
||||
assert start_code == 200
|
||||
assert start_payload["status"] == "ok"
|
||||
assert start_payload["detail"] == "start accepted"
|
||||
|
||||
assert stop_code == 200
|
||||
assert stop_payload["status"] == "ok"
|
||||
assert stop_payload["detail"] == "stop accepted"
|
||||
assert events == ["start", "stop"]
|
||||
|
||||
asyncio.run(scenario())
|
||||
43
tests/v2/test_stop_graceful.py
Normal file
43
tests/v2/test_stop_graceful.py
Normal file
@@ -0,0 +1,43 @@
|
||||
import asyncio
|
||||
import threading
|
||||
import time
|
||||
|
||||
from config_manager.v2 import ConfigManagerV2
|
||||
|
||||
|
||||
class BlockingApp(ConfigManagerV2):
|
||||
def __init__(self, *args, **kwargs):
|
||||
super().__init__(*args, **kwargs)
|
||||
self.started_event = threading.Event()
|
||||
self.active_event = threading.Event()
|
||||
self.calls = 0
|
||||
|
||||
def execute(self) -> None:
|
||||
self.calls += 1
|
||||
self.active_event.set()
|
||||
self.started_event.set()
|
||||
time.sleep(0.2)
|
||||
self.active_event.clear()
|
||||
|
||||
|
||||
def test_stop_waits_for_active_execute_and_prevents_next_run(tmp_path):
|
||||
async def scenario() -> None:
|
||||
cfg = tmp_path / "config.yaml"
|
||||
cfg.write_text("work_interval: 0.05\nupdate_interval: 0.05\n", encoding="utf-8")
|
||||
|
||||
app = BlockingApp(str(cfg))
|
||||
runner = asyncio.create_task(app.start())
|
||||
|
||||
started = await asyncio.to_thread(app.started_event.wait, 1.0)
|
||||
assert started is True
|
||||
|
||||
await app.stop()
|
||||
await runner
|
||||
|
||||
assert app.active_event.is_set() is False
|
||||
assert app.calls == 1
|
||||
|
||||
await asyncio.sleep(0.15)
|
||||
assert app.calls == 1
|
||||
|
||||
asyncio.run(scenario())
|
||||
Reference in New Issue
Block a user