9 Commits

Author SHA1 Message Date
2d6179d366 Рефакторинг и добавил апишку для управления 2026-02-21 15:14:34 +03:00
d888ae7acb Первая итерация рефакторинга 2026-02-21 00:20:07 +03:00
1d71ce406f Merge pull request 'Version update' (#2) from feature/healthcheck-requirements into master
Reviewed-on: #2
2026-02-19 19:51:54 +00:00
80dd69c5ec Version update 2026-02-19 22:50:51 +03:00
8da6df0b2a Merge pull request 'feature/healthcheck-requirements' (#1) from feature/healthcheck-requirements into master
Reviewed-on: #1
2026-02-19 19:49:27 +00:00
7eb3476b96 ConfigManager V2 2026-02-19 22:45:06 +03:00
da8ed4fa2b docs: add healthcheck requirements and README_DEPLOY
- Add docs/HEALTHCHECK_REQUIREMENTS.md with full spec (purpose, deploy.sh
  behaviour, endpoint contract, get_health_status(), app requirements,
  infrastructure)
- Add README_DEPLOY.md with healthcheck section, link to requirements
  and HEALTHCHECK_* env vars

Co-authored-by: Cursor <cursoragent@cursor.com>
2026-02-18 23:11:53 +03:00
8f22fcf6af method start no now blocks main cycle 2025-11-01 21:35:25 +03:00
311870fd73 Changed the algorithm of the start method 2025-11-01 21:00:14 +03:00
25 changed files with 1254 additions and 24 deletions

29
README_DEPLOY.md Normal file
View File

@@ -0,0 +1,29 @@
# Деплой / Deploy
Краткое описание процесса деплоя приложений на базе config_manager и используемых скриптов.
A short description of the deploy process for applications based on config_manager and the scripts used.
---
## Healthcheck
После поднятия контейнеров (`docker compose up -d`) скрипт деплоя может ожидать успешного ответа по URL проверки здоровья приложения. При таймауте выполняется откат и выход с ошибкой.
After bringing up containers (`docker compose up -d`), the deploy script may wait for a successful response at the application health-check URL. On timeout, rollback is performed and the script exits with an error.
**Полная спецификация:** [docs/HEALTHCHECK_REQUIREMENTS.md](docs/HEALTHCHECK_REQUIREMENTS.md).
**Full specification:** [docs/HEALTHCHECK_REQUIREMENTS.md](docs/HEALTHCHECK_REQUIREMENTS.md).
### Переменные окружения / Environment variables
| Переменная | Назначение | Пример/дефолт |
| ----------------------- | ---------------------------------------- | ------------------------------- |
| `HEALTHCHECK_URL` | URL для проверки здоровья | `http://127.0.0.1:8000/health` |
| `HEALTHCHECK_TIMEOUT` | Максимальное время ожидания (секунды) | `120` |
| `HEALTHCHECK_INTERVAL` | Интервал между попытками (секунды) | `5` |
Если задана `HEALTHCHECK_URL`, деплой после поднятия контейнеров вызывает этот URL (например, через `curl -fsS --max-time 5`); успех — HTTP 2xx, иначе повтор до истечения `HEALTHCHECK_TIMEOUT`.
If `HEALTHCHECK_URL` is set, deploy calls this URL after bringing up containers (e.g. via `curl -fsS --max-time 5`); success is HTTP 2xx, otherwise retries until `HEALTHCHECK_TIMEOUT` expires.

View File

@@ -0,0 +1,95 @@
# Требования к healthcheck / Healthcheck Requirements
Единая спецификация для реализации healthcheck в config_manager и в приложениях (в т.ч. MailOrderBot).
A unified specification for implementing healthcheck in config_manager and in applications (including MailOrderBot).
---
## 1. Назначение / Purpose
- Healthcheck используется скриптом деплоя (`deploy.sh`): после `docker compose up -d` деплой ждёт успешного ответа по `HEALTHCHECK_URL`; при таймауте — откат и выход с ошибкой.
- The healthcheck is used by the deploy script (`deploy.sh`): after `docker compose up -d`, the deploy waits for a successful response at `HEALTHCHECK_URL`; on timeout — rollback and exit with error.
- Эндпоинт должен отражать **реальное состояние приложения**, а не только факт работы HTTP-сервера (иначе деплой может считать успешным запуск «зависшего» или упавшего воркера).
- The endpoint must reflect the **actual state of the application**, not just that the HTTP server is running (otherwise deploy may consider successful a «hung» or crashed worker).
---
## 2. Поведение deploy.sh / deploy.sh behaviour
(Логика уже может быть реализована в коде deploy-скрипта.)
- Если задана переменная `HEALTHCHECK_URL` — после поднятия контейнеров вызывается `wait_for_healthcheck`: цикл с интервалом `HEALTHCHECK_INTERVAL` (по умолчанию 5 с), пока не истечёт `HEALTHCHECK_TIMEOUT` (по умолчанию 120 с).
- If `HEALTHCHECK_URL` is set — after bringing up containers, `wait_for_healthcheck` is called: a loop with interval `HEALTHCHECK_INTERVAL` (default 5 s) until `HEALTHCHECK_TIMEOUT` (default 120 s) expires.
- Проверка: `curl -fsS --max-time 5 "$HEALTHCHECK_URL"`. Флаг `-f`: любой HTTP-код 4xx/5xx считается ошибкой (повтор до таймаута).
- Check: `curl -fsS --max-time 5 "$HEALTHCHECK_URL"`. Flag `-f`: any HTTP 4xx/5xx is treated as failure (retry until timeout).
- **Успех:** HTTP 2xx. **Неуспех:** не 2xx или таймаут curl/соединения → деплой падает с откатом.
- **Success:** HTTP 2xx. **Failure:** non-2xx or curl/connection timeout → deploy fails with rollback.
### Переменные окружения / Environment variables
| Переменная / Variable | Назначение / Purpose | Пример/дефолт / Example/default |
| ------------------------- | --------------------------------- | ----------------------------------- |
| `HEALTHCHECK_URL` | URL для проверки / Check URL | `http://127.0.0.1:8000/health` |
| `HEALTHCHECK_TIMEOUT` | Макс. время ожидания (сек) / Max wait (s) | `120` |
| `HEALTHCHECK_INTERVAL` | Интервал между попытками (сек) / Interval between attempts (s) | `5` |
---
## 3. Контракт эндпоинта / Endpoint contract
- **Метод и путь / Method and path:** `GET /health` (или иной путь по соглашению; один для всех приложений на config_manager).
- **Method and path:** `GET /health` (or another agreed path; one for all applications using config_manager).
- **Успех (приложение в порядке) / Success (application healthy):** HTTP **200**, опционально тело JSON: `{"status": "ok"}`.
- **Success (application healthy):** HTTP **200**, optional JSON body: `{"status": "ok"}`.
- **Приложение не в порядке / Application not healthy:** HTTP **503**, опционально тело: `{"status": "unhealthy"|"degraded", "detail": "причина"}`.
- **Application not healthy:** HTTP **503**, optional body: `{"status": "unhealthy"|"degraded", "detail": "reason"}`.
- Ответ должен приходить в разумное время (рекомендуемый таймаут вызова логики проверки в config_manager — 25 с), иначе deploy через `curl --max-time 5` получит таймаут и будет повторять запросы.
- The response must arrive within a reasonable time (recommended timeout for the check logic in config_manager — 25 s); otherwise deploy will get a timeout via `curl --max-time 5` and will retry.
---
## 4. Обратная связь от приложения (config_manager) / Application feedback (config_manager)
- Эндпоинт реализуется в **config_manager** (опционально, при включённой опции).
- The endpoint is implemented in **config_manager** (optional, when the option is enabled).
- При обработке `GET /health` config_manager **не решает сам** «здорово ли приложение», а вызывает метод приложения, например: `app.get_health_status()`.
- When handling `GET /health`, config_manager does **not** decide by itself whether the application is healthy; it calls the application method, e.g. `app.get_health_status()`.
- **Контракт метода / Method contract** (приложение переопределяет в наследнике):
- **Method contract** (application overrides in subclass):
- Возвращает `dict`: `{"status": "ok" | "degraded" | "unhealthy", "detail": "..."}` (поле `detail` опционально).
- Returns `dict`: `{"status": "ok" | "degraded" | "unhealthy", "detail": "..."}` (`detail` optional).
- При исключении или превышении таймаута вызова — считать состояние unhealthy и отдавать 503.
- On exception or call timeout — treat as unhealthy and return 503.
Таким образом, основное приложение «даёт обратную связь» через реализацию `get_health_status()` (флаги после сбоев, проверка БД, heartbeat и т.д.).
Thus, the main application provides feedback via `get_health_status()` (flags after failures, DB check, heartbeat, etc.).
---
## 5. Требования к приложению (например, MailOrderBot) / Application requirements (e.g. MailOrderBot)
- Переопределить `get_health_status()` и возвращать:
- Override `get_health_status()` and return:
- `{"status": "ok"}` при нормальной работе;
- `{"status": "ok"}` when running normally;
- `{"status": "unhealthy", "detail": "..."}` при критичном сбое (например, последний `execute()` упал, зависимость недоступна);
- `{"status": "unhealthy", "detail": "..."}` on critical failure (e.g. last `execute()` failed, dependency unavailable);
- при желании — `{"status": "degraded", "detail": "..."}` для нефатальной деградации (в обоих случаях эндпоинт отдаёт 503 для совместимости с `curl -f`).
- optionally — `{"status": "degraded", "detail": "..."}` for non-fatal degradation (in both cases the endpoint returns 503 for compatibility with `curl -f`).
---
## 6. Инфраструктура / Infrastructure
- Для работы healthcheck приложение должно поднимать HTTP-сервер (в config_manager при включённой опции) и пробрасывать порт в `docker-compose` (например `8000:8000`), чтобы `deploy.sh` на хосте мог обращаться по `HEALTHCHECK_URL` (например `http://127.0.0.1:8000/health`).
- For healthcheck to work, the application must run an HTTP server (in config_manager when the option is enabled) and expose the port in `docker-compose` (e.g. `8000:8000`), so that `deploy.sh` on the host can call `HEALTHCHECK_URL` (e.g. `http://127.0.0.1:8000/health`).

View File

@@ -4,7 +4,7 @@ build-backend = "setuptools.build_meta"
[project]
name = "config_manager"
version = "1.2.0"
version = "2.0.0"
description = "Config manager for building applications"
authors = [
{ name = "Aleksei Zosimov", email = "lesha.spb@gmail.com" }
@@ -13,6 +13,8 @@ readme = "README.md"
requires-python = ">=3.8"
dependencies = [
"PyYAML>=6.0",
"fastapi>=0.100.0",
"uvicorn[standard]>=0.22.0",
]
[project.urls]

View File

@@ -1,2 +1,3 @@
from .cfg_manager import ConfigManager
from .log_manager import LogManager
from .v2 import ConfigManagerV2 as ConfigManager
from .v1.cfg_manager import ConfigManager as LegacyConfigManager
from .v1.log_manager import LogManager

View File

@@ -104,21 +104,21 @@ class ConfigManager:
finally:
self.logger.info("ConfigManager stopped")
def start(self) -> None:
"""Запускает менеджер конфигурации в текущем event loop"""
async def start(self) -> None:
if self._task is not None and not self._task.done():
self.logger.warning("ConfigManager is already running")
return
try:
self._loop = asyncio.get_running_loop()
except RuntimeError:
self.logger.error("start() must be called from within an async context")
raise
self._task = self._loop.create_task(self._run())
self.logger.info("ConfigManager task created")
self.logger.info("ConfigManager starting and awaiting _run()")
await self._run()
async def stop(self) -> None:
"""Останавливает менеджер конфигурации и ожидает завершения"""
if self._task is None:

View File

@@ -0,0 +1,7 @@
"""Публичный API V2: точка входа в менеджер конфигурации и настройки management-сервера.
Экспортирует ConfigManagerV2 и типы настроек для использования приложениями."""
from .core import ConfigManagerV2
from .types import HealthServerSettings, ManagementServerSettings
__all__ = ["ConfigManagerV2", "ManagementServerSettings", "HealthServerSettings"]

View File

@@ -0,0 +1,7 @@
"""Каналы внешнего управления: абстракция и реализация (например, Telegram).
Позволяет запускать, останавливать и запрашивать статус менеджера через ботов и другие интерфейсы."""
from .base import ControlChannel
from .telegram import TelegramControlChannel
__all__ = ["ControlChannel", "TelegramControlChannel"]

View File

@@ -0,0 +1,24 @@
"""Базовый абстрактный канал управления и типы обработчиков команд.
Определяет контракт: старт/стоп канала и привязка обработчиков start/stop/status."""
from __future__ import annotations
from abc import ABC, abstractmethod
from collections.abc import Awaitable, Callable
StartHandler = Callable[[], Awaitable[str]]
StopHandler = Callable[[], Awaitable[str]]
StatusHandler = Callable[[], Awaitable[str]]
class ControlChannel(ABC):
@abstractmethod
async def start(self, on_start: StartHandler, on_stop: StopHandler, on_status: StatusHandler) -> None:
"""Запустить канал и привязать обработчики команд."""
raise NotImplementedError
@abstractmethod
async def stop(self) -> None:
"""Остановить канал и освободить его ресурсы."""
raise NotImplementedError

View File

@@ -0,0 +1,114 @@
"""Реализация канала управления через Telegram Bot API (long polling).
Принимает команды /start, /stop, /status в указанном чате и вызывает привязанные обработчики."""
from __future__ import annotations
import asyncio
import json
import logging
import urllib.parse
import urllib.request
from typing import Optional
from .base import ControlChannel, StartHandler, StatusHandler, StopHandler
class TelegramControlChannel(ControlChannel):
def __init__(
self,
token: str,
chat_id: int,
poll_interval: float = 2.0,
logger: Optional[logging.Logger] = None,
):
"""Инициализация канала опроса Telegram с настройками бота и чата."""
self._token = token
self._chat_id = chat_id
self._poll_interval = poll_interval
self._offset: Optional[int] = None
self._task: Optional[asyncio.Task] = None
self._stop_event = asyncio.Event()
self._on_start: Optional[StartHandler] = None
self._on_stop: Optional[StopHandler] = None
self._on_status: Optional[StatusHandler] = None
self._logger = logger or logging.getLogger(__name__)
async def start(self, on_start: StartHandler, on_stop: StopHandler, on_status: StatusHandler) -> None:
"""Запустить опрос обновлений Telegram и зарегистрировать колбэки команд."""
if self._task is not None and not self._task.done():
return
self._on_start = on_start
self._on_stop = on_stop
self._on_status = on_status
self._stop_event.clear()
self._task = asyncio.create_task(self._poll_loop())
async def stop(self) -> None:
"""Остановить цикл опроса и дождаться завершения задачи."""
self._stop_event.set()
if self._task is not None:
self._task.cancel()
try:
await self._task
except asyncio.CancelledError:
pass
self._task = None
async def _poll_loop(self) -> None:
"""Непрерывно получать обновления и вызывать поддерживаемые команды."""
while not self._stop_event.is_set():
try:
updates = await asyncio.to_thread(self._fetch_updates)
for update in updates:
await self._process_update(update)
except Exception as exc: # noqa: BLE001
self._logger.warning("Telegram polling error: %s", exc)
try:
await asyncio.wait_for(self._stop_event.wait(), timeout=max(self._poll_interval, 0.1))
except asyncio.TimeoutError:
continue
def _fetch_updates(self) -> list[dict]:
"""Запросить новые обновления Telegram с учётом последнего offset."""
params = {"timeout": 0}
if self._offset is not None:
params["offset"] = self._offset
query = urllib.parse.urlencode(params)
url = f"https://api.telegram.org/bot{self._token}/getUpdates?{query}"
with urllib.request.urlopen(url, timeout=10) as response:
payload = json.loads(response.read().decode("utf-8"))
result = payload.get("result", [])
if result:
self._offset = max(item["update_id"] for item in result) + 1
return result
async def _process_update(self, update: dict) -> None:
"""Обработать одно обновление Telegram и выполнить соответствующую команду."""
message = update.get("message") or {}
text = (message.get("text") or "").strip().lower()
chat = message.get("chat") or {}
chat_id = chat.get("id")
if chat_id != self._chat_id:
return
if text in {"/start", "/run"} and self._on_start is not None:
reply = await self._on_start()
elif text in {"/stop", "/halt"} and self._on_stop is not None:
reply = await self._on_stop()
elif text in {"/status", "/health"} and self._on_status is not None:
reply = await self._on_status()
else:
return
await asyncio.to_thread(self._send_message, reply)
def _send_message(self, text: str) -> None:
"""Отправить текстовый ответ в настроенный чат Telegram."""
encoded = urllib.parse.urlencode({"chat_id": self._chat_id, "text": text})
url = f"https://api.telegram.org/bot{self._token}/sendMessage"
req = urllib.request.Request(url, data=encoded.encode("utf-8"), method="POST")
with urllib.request.urlopen(req, timeout=10):
return

View File

@@ -0,0 +1,8 @@
"""Ядро V2: жизненный цикл менеджера, загрузка конфига и цикл воркера.
Содержит ConfigManagerV2, загрузчик конфигурации и планировщик повторяющегося выполнения execute()."""
from .config_loader import ConfigLoader
from .manager import ConfigManagerV2
from .scheduler import WorkerLoop
__all__ = ["ConfigLoader", "ConfigManagerV2", "WorkerLoop"]

View File

@@ -0,0 +1,70 @@
"""Загрузчик конфигурации из файла (YAML/JSON) с обнаружением изменений по хешу.
Читает файл синхронно и асинхронно, парсит по расширению и возвращает последний валидный конфиг при ошибках."""
from __future__ import annotations
import asyncio
import hashlib
import json
import os
from typing import Any, Optional
import yaml
class ConfigLoader:
def __init__(self, path: str):
"""Инициализация состояния загрузчика для указанного пути к файлу конфига."""
self.path = path
self.config: Any = None
self.last_valid_config: Any = None
self._last_seen_hash: Optional[str] = None
def _read_file_sync(self) -> str:
"""Синхронно прочитать сырой текст конфига с диска."""
with open(self.path, "r", encoding="utf-8") as fh:
return fh.read()
async def read_file_async(self) -> str:
"""Прочитать сырой текст конфига с диска в рабочем потоке."""
return await asyncio.to_thread(self._read_file_sync)
def parse_config(self, data: str) -> Any:
"""Распарсить текст конфига как YAML или JSON по расширению файла."""
extension = os.path.splitext(self.path)[1].lower()
if extension in (".yaml", ".yml"):
return yaml.safe_load(data)
return json.loads(data)
@staticmethod
def _calculate_hash(data: str) -> str:
"""Вычислить устойчивый хеш содержимого для обнаружения изменений."""
return hashlib.sha256(data.encode("utf-8")).hexdigest()
async def load_if_changed(self) -> tuple[bool, Any]:
"""Загрузить и распарсить конфиг только при изменении содержимого файла."""
raw_data = await self.read_file_async()
current_hash = self._calculate_hash(raw_data)
if current_hash == self._last_seen_hash:
return False, self.config
self._last_seen_hash = current_hash
parsed = self.parse_config(raw_data)
self.config = parsed
self.last_valid_config = parsed
return True, parsed
def extract_scheduler_intervals(
config: Any,
default_update: float,
default_work: float,
) -> tuple[float, float]:
"""Извлекает update_interval и work_interval из конфига (dict). Возвращает значения по умолчанию при ошибке."""
if not isinstance(config, dict):
return default_update, default_work
upd = config.get("update_interval")
wrk = config.get("work_interval")
u = float(upd) if isinstance(upd, (int, float)) and upd > 0 else default_update
w = float(wrk) if isinstance(wrk, (int, float)) and wrk > 0 else default_work
return u, w

View File

@@ -0,0 +1,233 @@
"""Главный класс менеджера V2: оркестрация жизненного цикла, конфига, API и каналов управления.
Запускает воркер и периодическое обновление конфига, поднимает management-сервер и control-канал при наличии настроек."""
from __future__ import annotations
import asyncio
import logging
import time
from typing import Any, Optional
from ...v1.log_manager import LogManager
from ..control.base import ControlChannel
from ..management import (
ControlChannelBridge,
HealthAggregator,
ManagementApiBridge,
ManagementServer,
)
from ..types import HealthPayload, LifecycleState, ManagementServerSettings
from .config_loader import ConfigLoader, extract_scheduler_intervals
from .scheduler import WorkerLoop
class ConfigManagerV2:
DEFAULT_UPDATE_INTERVAL = 5.0
DEFAULT_WORK_INTERVAL = 2.0
def __init__(
self,
path: str,
log_manager: Optional[LogManager] = None,
management_settings: Optional[ManagementServerSettings] = None,
control_channel: Optional[ControlChannel] = None,
):
"""Инициализация подсистем менеджера и состояния рантайма."""
self.path = path
self.config: Any = None
self.update_interval = self.DEFAULT_UPDATE_INTERVAL
self.work_interval = self.DEFAULT_WORK_INTERVAL
self._loader = ConfigLoader(path)
self._log_manager = log_manager or LogManager()
self._control_channel = control_channel
self._halt = asyncio.Event()
self._task: Optional[asyncio.Task] = None
self._loop: Optional[asyncio.AbstractEventLoop] = None
self._state = LifecycleState.IDLE
self._last_execute_error: Optional[str] = None
self._last_success_timestamp: Optional[float] = None
self._management_settings = management_settings or ManagementServerSettings(enabled=True)
self._health_timeout = self._management_settings.health_timeout
self._health_aggregator = HealthAggregator(
get_state=lambda: self._state,
get_last_error=lambda: self._last_execute_error,
get_last_success_timestamp=lambda: self._last_success_timestamp,
health_timeout=self._health_timeout,
get_app_health=self.get_health_status,
)
self._api_bridge = ManagementApiBridge(start_fn=self.start, stop_fn=self.stop)
self._control_bridge = ControlChannelBridge(
halt=self._halt,
get_state=lambda: self._state,
get_status=self._status_text,
)
self._management_server: Optional[ManagementServer] = None
if self._management_settings.enabled:
self._management_server = ManagementServer(
host=self._management_settings.host,
port=self._management_settings.port,
timeout=self._management_settings.timeout,
health_provider=self._health_aggregator.collect,
on_start=self._api_bridge.on_start,
on_stop=self._api_bridge.on_stop,
)
self.logger = logging.getLogger(__name__)
def _apply_config(self, new_config: Any) -> None:
"""Применить загруженный конфиг: интервалы и log_manager. Вызывается после load_if_changed."""
self.config = new_config
self.update_interval, self.work_interval = extract_scheduler_intervals(
new_config,
self.DEFAULT_UPDATE_INTERVAL,
self.DEFAULT_WORK_INTERVAL,
)
if isinstance(new_config, dict):
self._log_manager.apply_config(new_config)
async def _update_config(self) -> None:
"""Перезагрузить конфиг при изменении файла и применить к состоянию и log_manager."""
try:
changed, new_config = await self._loader.load_if_changed()
if not changed:
return
self._apply_config(new_config)
except Exception as exc: # noqa: BLE001
self.logger.error("Error reading/parsing config file: %s", exc)
if self._loader.last_valid_config is not None:
self._apply_config(self._loader.last_valid_config)
def execute(self) -> None:
"""Переопределить в подклассе для реализации одной единицы блокирующей работы."""
def get_health_status(self) -> HealthPayload:
"""Вернуть payload здоровья приложения для /health.
Варианты ответа по статусу:
- ``{"status": "ok"}`` — сервис в норме; GET /health → 200.
- ``{"status": "degraded", "detail": "..."}`` — работает с ограничениями; GET /health → 503.
- ``{"status": "unhealthy", "detail": "..."}`` — неработоспособен; GET /health → 503.
Поле ``detail`` опционально; для ``ok`` обычно не задаётся.
Переопределить в подклассе для своей логики здоровья."""
return {"status": "ok"}
def _on_execute_success(self) -> None:
"""Обновить время последнего успешного execute() и сбросить маркер ошибки."""
self._last_success_timestamp = time.monotonic()
self._last_execute_error = None
def _on_execute_error(self, exc: Exception) -> None:
"""Сохранить и залогировать детали ошибки выполнения для отчёта здоровья."""
self._last_execute_error = str(exc)
self.logger.error("Execution error: %s", exc)
async def _worker_loop(self) -> None:
"""Вызывать execute() циклически до запроса остановки."""
worker = WorkerLoop(
execute=self.execute,
get_interval=lambda: self.work_interval,
halt_event=self._halt,
on_error=self._on_execute_error,
on_success=self._on_execute_success,
)
await worker.run()
async def _periodic_update_loop(self) -> None:
"""Периодически проверять файл конфига на обновления до остановки."""
while not self._halt.is_set():
await self._update_config()
try:
await asyncio.wait_for(self._halt.wait(), timeout=max(self.update_interval, 0.05))
except asyncio.TimeoutError:
continue
async def _status_text(self) -> str:
"""Сформировать читаемый статус рантайма для каналов управления."""
health = await self._health_aggregator.collect()
detail = health.get("detail")
if detail:
return f"state={self._state.value}; health={health['status']}; detail={detail}"
return f"state={self._state.value}; health={health['status']}"
async def _start_control_channel(self) -> None:
"""Запустить настроенный канал управления с привязанными обработчиками команд."""
if self._control_channel is None:
return
await self._control_channel.start(
self._control_bridge.on_start,
self._control_bridge.on_stop,
self._control_bridge.on_status,
)
async def _stop_control_channel(self) -> None:
"""Остановить настроенный канал управления, если он активен."""
if self._control_channel is None:
return
await self._control_channel.stop()
async def _run(self) -> None:
"""Запустить жизненный цикл менеджера и координировать фоновые задачи."""
self._state = LifecycleState.STARTING
self._halt.clear()
await self._update_config()
if self._management_server is not None:
await self._management_server.start()
await self._start_control_channel()
self._state = LifecycleState.RUNNING
self.logger.info("ConfigManagerV2 started")
tasks = [
asyncio.create_task(self._worker_loop(), name="v2-worker-loop"),
asyncio.create_task(self._periodic_update_loop(), name="v2-config-loop"),
]
try:
await asyncio.gather(*tasks)
except asyncio.CancelledError:
raise
finally:
self._state = LifecycleState.STOPPING
self._halt.set()
for task in tasks:
task.cancel()
await asyncio.gather(*tasks, return_exceptions=True)
# Management-сервер и control channel не останавливаем: API и канал управления остаются доступными.
self._state = LifecycleState.STOPPED
self._task = None
self.logger.info("ConfigManagerV2 stopped (API and control channel remain available)")
async def start(self) -> None:
"""Запустить циклы execute и конфига в фоне; возвращает управление сразу (ответ на /actions/start приходит без ожидания)."""
if self._task is not None and not self._task.done():
self.logger.warning("ConfigManagerV2 is already running")
return
try:
self._loop = asyncio.get_running_loop()
except RuntimeError:
self.logger.error("start() must be called from within an async context")
raise
self._task = asyncio.create_task(self._run(), name="config-manager-v2")
async def stop(self) -> None:
"""Запросить плавную остановку и дождаться завершения менеджера."""
if self._task is None:
self.logger.warning("ConfigManagerV2 is not running")
return
self._halt.set()
if asyncio.current_task() is self._task:
return
try:
await self._task
except asyncio.CancelledError:
pass

View File

@@ -0,0 +1,42 @@
"""Цикл воркера: повторяющийся вызов блокирующего execute() в потоке с паузой между итерациями.
Поддерживает halt-событие для остановки, колбэки on_success/on_error для учёта ошибок и здоровья."""
from __future__ import annotations
import asyncio
from collections.abc import Callable
from typing import Optional
class WorkerLoop:
def __init__(
self,
execute: Callable[[], None],
get_interval: Callable[[], float],
halt_event: asyncio.Event,
on_error: Optional[Callable[[Exception], None]] = None,
on_success: Optional[Callable[[], None]] = None,
):
"""Сохранить колбэки и примитивы синхронизации для выполнения воркера."""
self._execute = execute
self._get_interval = get_interval
self._halt_event = halt_event
self._on_error = on_error
self._on_success = on_success
async def run(self) -> None:
"""Вызывать execute циклически до запроса остановки."""
while not self._halt_event.is_set():
try:
await asyncio.to_thread(self._execute)
if self._on_success is not None:
self._on_success()
except Exception as exc: # noqa: BLE001
if self._on_error is not None:
self._on_error(exc)
timeout = max(self._get_interval(), 0.01)
try:
await asyncio.wait_for(self._halt_event.wait(), timeout=timeout)
except asyncio.TimeoutError:
continue

View File

@@ -0,0 +1,14 @@
"""Management API: HTTP-сервер для /health и /actions/start|stop, адаптеры и сбор здоровья.
Объединяет сервер, мосты к жизненному циклу и агрегатор здоровья для единого контракта API."""
from .bridges import ControlChannelBridge, ManagementApiBridge
from .health_aggregator import HealthAggregator
from .management_server import HealthServer, ManagementServer
__all__ = [
"ControlChannelBridge",
"HealthAggregator",
"HealthServer",
"ManagementApiBridge",
"ManagementServer",
]

View File

@@ -0,0 +1,61 @@
"""Адаптеры, связывающие жизненный цикл менеджера с HTTP API и каналами управления.
ManagementApiBridge отдаёт start/stop в HTTP; ControlChannelBridge — start/stop/status в Telegram и др."""
from __future__ import annotations
import asyncio
from collections.abc import Awaitable, Callable
from ..types import LifecycleState
class ManagementApiBridge:
"""Предоставляет start/stop жизненного цикла как async-колбэки для ManagementServer (/actions/start, /actions/stop)."""
def __init__(
self,
start_fn: Callable[[], Awaitable[None]],
stop_fn: Callable[[], Awaitable[None]],
):
self._start_fn = start_fn
self._stop_fn = stop_fn
async def on_start(self) -> str:
"""Выполнить start и вернуть сообщение для HTTP-ответа."""
await self._start_fn()
return "start completed"
async def on_stop(self) -> str:
"""Выполнить stop и вернуть сообщение для HTTP-ответа."""
await self._stop_fn()
return "stop completed"
class ControlChannelBridge:
"""Предоставляет halt и status как обработчики start/stop/status для ControlChannel (например Telegram)."""
def __init__(
self,
halt: asyncio.Event,
get_state: Callable[[], LifecycleState],
get_status: Callable[[], Awaitable[str]],
):
self._halt = halt
self._get_state = get_state
self._get_status = get_status
async def on_start(self) -> str:
"""Обработать внешний start: сбросить halt; идемпотентно при уже running."""
if self._get_state() == LifecycleState.RUNNING:
return "already running"
self._halt.clear()
return "start signal accepted"
async def on_stop(self) -> str:
"""Обработать внешний stop: установить halt."""
self._halt.set()
return "stop signal accepted"
async def on_status(self) -> str:
"""Вернуть текущий текст статуса."""
return await self._get_status()

View File

@@ -0,0 +1,53 @@
"""Собирает состояние жизненного цикла и здоровья в один ответ для /health.
Здоровье = был успешный execute() за последние health_timeout секунд; иначе unhealthy с деталью (ошибка или таймаут)."""
from __future__ import annotations
import time
from collections.abc import Callable
from ..types import HealthPayload, LifecycleState
class HealthAggregator:
"""Формирует ответ здоровья по времени последнего успешного execute() и таймауту."""
def __init__(
self,
get_state: Callable[[], LifecycleState],
get_last_error: Callable[[], str | None],
get_last_success_timestamp: Callable[[], float | None],
health_timeout: float,
get_app_health: Callable[[], HealthPayload],
):
self._get_state = get_state
self._get_last_error = get_last_error
self._get_last_success_timestamp = get_last_success_timestamp
self._health_timeout = health_timeout
self._get_app_health = get_app_health
async def collect(self) -> HealthPayload:
"""Вернуть ok, если был успешный execute() за последние health_timeout сек; иначе unhealthy. Всегда добавляем state."""
state = self._get_state()
state_value = state.value
# Только при state=RUNNING возможен status=ok; при остановке (STOPPING/STOPPED) сразу unhealthy.
if state != LifecycleState.RUNNING:
return {"status": "unhealthy", "detail": f"state={state_value}", "state": state_value}
last_success = self._get_last_success_timestamp()
now = time.monotonic()
if last_success is None:
detail = self._get_last_error() or "no successful run yet"
return {"status": "unhealthy", "detail": detail, "state": state_value}
if (now - last_success) > self._health_timeout:
detail = self._get_last_error() or f"no successful run within {self._health_timeout}s"
return {"status": "unhealthy", "detail": detail, "state": state_value}
result = self._get_app_health()
status = result.get("status", "unhealthy")
if status != "ok":
return {"status": "unhealthy", "detail": result.get("detail", "app reported non-ok"), "state": state_value}
return {**result, "state": state_value}

View File

@@ -0,0 +1,145 @@
"""Management HTTP API на FastAPI: эндпоинты /health, /actions/start, /actions/stop.
Единообразное описание маршрутов через декораторы FastAPI."""
from __future__ import annotations
import asyncio
import json
from collections.abc import Awaitable, Callable
from typing import Any, Optional
from fastapi import FastAPI
from fastapi.responses import JSONResponse
from uvicorn import Config, Server
from ..types import HealthPayload
# Захардкоженные эндпоинты management API.
PATH_HEALTH = "/health"
PATH_ACTION_START = "/actions/start"
PATH_ACTION_STOP = "/actions/stop"
class ManagementServer:
def __init__(
self,
host: str,
port: int,
timeout: float,
health_provider: Callable[[], Awaitable[HealthPayload]],
on_start: Optional[Callable[[], Awaitable[str]]] = None,
on_stop: Optional[Callable[[], Awaitable[str]]] = None,
):
"""Настройка параметров и колбэков лёгкого HTTP management-сервера."""
self._host = host
self._port = port
self._timeout = timeout
self._health_provider = health_provider
self._on_start = on_start
self._on_stop = on_stop
self._uvicorn_server: Optional[Server] = None
self._serve_task: Optional[asyncio.Task[None]] = None
self._bound_port: Optional[int] = None
self._app = self._create_app()
def _create_app(self) -> FastAPI:
app = FastAPI(title="Config Manager Management API")
@app.get(PATH_HEALTH)
async def health() -> JSONResponse:
return await self._health_response()
@app.get(PATH_ACTION_START)
@app.post(PATH_ACTION_START)
async def action_start() -> JSONResponse:
return await self._action_response("start", self._on_start)
@app.get(PATH_ACTION_STOP)
@app.post(PATH_ACTION_STOP)
async def action_stop() -> JSONResponse:
return await self._action_response("stop", self._on_stop)
return app
async def _health_response(self) -> JSONResponse:
"""Сформировать HTTP-ответ из колбэка здоровья приложения."""
try:
payload = await asyncio.wait_for(self._health_provider(), timeout=self._timeout)
status = payload.get("status", "unhealthy")
status_code = 200 if status == "ok" else 503
return JSONResponse(content=payload, status_code=status_code)
except Exception as exc: # noqa: BLE001
return JSONResponse(
content={"status": "unhealthy", "detail": str(exc)},
status_code=503,
)
async def _action_response(
self,
action: str,
callback: Optional[Callable[[], Awaitable[str]]],
) -> JSONResponse:
"""Сформировать HTTP-ответ для колбэка действия start/stop."""
if callback is None:
return JSONResponse(
content={"status": "error", "detail": f"{action} handler is not configured"},
status_code=404,
)
try:
detail = await callback()
if not detail:
detail = f"{action} action accepted"
return JSONResponse(content={"status": "ok", "detail": detail}, status_code=200)
except Exception as exc: # noqa: BLE001
return JSONResponse(
content={"status": "error", "detail": str(exc)},
status_code=500,
)
def _build_health_response(self) -> Awaitable[tuple[int, HealthPayload]]:
"""Для тестов: вернуть (status_code, payload) как раньше."""
async def _run() -> tuple[int, HealthPayload]:
response = await self._health_response()
body: Any = response.body
if isinstance(body, bytes):
body = json.loads(body.decode("utf-8"))
return response.status_code, body
return _run()
async def start(self) -> None:
"""Начать приём запросов к API здоровья и действий, если ещё не запущен."""
if self._serve_task is not None:
return
config = Config(
app=self._app,
host=self._host,
port=self._port,
log_level="warning",
)
self._uvicorn_server = Server(config)
self._serve_task = asyncio.create_task(self._uvicorn_server.serve())
await asyncio.sleep(0.05)
if self._uvicorn_server.servers:
sock = self._uvicorn_server.servers[0].sockets[0]
self._bound_port = sock.getsockname()[1]
else:
self._bound_port = self._port
async def stop(self) -> None:
"""Остановить management-сервер и освободить сокет."""
if self._uvicorn_server is None or self._serve_task is None:
return
self._uvicorn_server.should_exit = True
await self._serve_task
self._uvicorn_server = None
self._serve_task = None
self._bound_port = None
@property
def port(self) -> int:
"""Порт, на котором слушает сервер (после start); при port=0 — фактически выданный ОС."""
return self._bound_port if self._bound_port is not None else self._port
# Backward-compatible alias.
HealthServer = ManagementServer

View File

@@ -0,0 +1,42 @@
"""Общие типы V2: состояние здоровья, жизненного цикла и настройки management-сервера.
Используются в core, management и control для единообразных контрактов."""
from __future__ import annotations
from dataclasses import dataclass
from enum import Enum
from typing import Literal, TypedDict
HealthState = Literal["ok", "degraded", "unhealthy"]
class HealthPayload(TypedDict, total=False):
status: HealthState
detail: str
state: str
"""Текущее состояние жизненного цикла (idle/starting/running/stopping/stopped)."""
class LifecycleState(str, Enum):
IDLE = "idle"
STARTING = "starting"
RUNNING = "running"
STOPPING = "stopping"
STOPPED = "stopped"
@dataclass
class ManagementServerSettings:
"""Настройки management HTTP-сервера и healthcheck (один объект на оба)."""
enabled: bool = False
host: str = "0.0.0.0"
port: int = 8000
timeout: float = 3.0
"""Таймаут запроса health (секунды)."""
health_timeout: float = 30.0
"""Секунды без успешного execute(), после которых health = unhealthy."""
# Backward-compatible alias.
HealthServerSettings = ManagementServerSettings

View File

@@ -1,31 +1,54 @@
#import os
#os.chdir(os.path.dirname(__file__))
# import os
# os.chdir(os.path.dirname(__file__))
import asyncio
import logging
from pathlib import Path
from config_manager import ConfigManager
import logging
import asyncio
from typing import Optional
from config_manager.v1.log_manager import LogManager
from config_manager.v2 import ManagementServerSettings
logger = logging.getLogger()
# Таймаут health: без успешного execute() дольше этого времени — unhealthy.
HEALTH_TIMEOUT = 3.0
class MyApp(ConfigManager):
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.iter = 0
def execute(self) -> None:
logger.info(f"current iteration {self.iter}")
"""Успешный прогон сбрасывает таймер health (обновляет время последнего успеха)."""
logger.info("current iteration %s", self.iter)
self.iter += 1
async def main():
app = MyApp("config.yaml")
app.start()
logger.info("App started")
await asyncio.sleep(20)
await app.stop()
async def main() -> None:
log_manager = LogManager()
# Один объект: и HTTP management-сервер (enabled, port), и health (health_timeout).
management_settings = ManagementServerSettings(
enabled=True,
port=8000,
health_timeout=HEALTH_TIMEOUT,
)
config_path = Path(__file__).parent / "config.yaml"
app = MyApp(
str(config_path),
log_manager=log_manager,
management_settings=management_settings,
)
logger.info("App starting (health_timeout=%s)", HEALTH_TIMEOUT)
# Менеджер запускаем в фоне (start() не возвращает управление до stop).
asyncio.create_task(app.start())
logger.info("App running; Ctrl+C to stop")
while True:
await asyncio.sleep(1)
if __name__ == "__main__":
logging.basicConfig(level=logging.INFO)
asyncio.run(main())

View File

@@ -0,0 +1,33 @@
import asyncio
from config_manager.v2 import ConfigManagerV2
class ReloadApp(ConfigManagerV2):
def execute(self) -> None:
return
def test_invalid_config_keeps_last_valid(tmp_path):
async def scenario() -> None:
cfg = tmp_path / "config.yaml"
cfg.write_text("work_interval: 0.2\nupdate_interval: 0.05\n", encoding="utf-8")
app = ReloadApp(str(cfg))
runner = asyncio.create_task(app.start())
await asyncio.sleep(0.12)
assert app.work_interval == 0.2
assert app.update_interval == 0.05
cfg.write_text("work_interval: [broken\n", encoding="utf-8")
await asyncio.sleep(0.15)
assert app.work_interval == 0.2
assert app.update_interval == 0.05
assert isinstance(app.config, dict)
await app.stop()
await runner
asyncio.run(scenario())

View File

@@ -0,0 +1,30 @@
import asyncio
from config_manager.v2 import ConfigManagerV2
class DemoApp(ConfigManagerV2):
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.calls = 0
def execute(self) -> None:
self.calls += 1
def test_execute_loop_runs(tmp_path):
async def scenario() -> None:
cfg = tmp_path / "config.yaml"
cfg.write_text("work_interval: 0.05\nupdate_interval: 0.05\n", encoding="utf-8")
app = DemoApp(str(cfg))
runner = asyncio.create_task(app.start())
await asyncio.sleep(0.18)
await app.stop()
await runner
assert app.calls >= 2
assert isinstance(app.config, dict)
asyncio.run(scenario())

View File

@@ -0,0 +1,54 @@
import asyncio
from config_manager.v2 import ConfigManagerV2
from config_manager.v2.control.base import ControlChannel, StartHandler, StatusHandler, StopHandler
class DummyControlChannel(ControlChannel):
def __init__(self):
self.on_start: StartHandler | None = None
self.on_stop: StopHandler | None = None
self.on_status: StatusHandler | None = None
self.started = False
self.stopped = False
async def start(self, on_start: StartHandler, on_stop: StopHandler, on_status: StatusHandler) -> None:
self.on_start = on_start
self.on_stop = on_stop
self.on_status = on_status
self.started = True
async def stop(self) -> None:
self.stopped = True
class ControlledApp(ConfigManagerV2):
def execute(self) -> None:
return
def test_control_channel_can_stop_manager(tmp_path):
async def scenario() -> None:
cfg = tmp_path / "config.yaml"
cfg.write_text("work_interval: 0.05\nupdate_interval: 0.05\n", encoding="utf-8")
channel = DummyControlChannel()
app = ControlledApp(str(cfg), control_channel=channel)
runner = asyncio.create_task(app.start())
await asyncio.sleep(0.12)
assert channel.started is True
assert channel.on_status is not None
assert channel.on_stop is not None
status_text = await channel.on_status()
assert "state=running" in status_text
stop_text = await channel.on_stop()
assert "stop signal accepted" in stop_text
await runner
assert channel.stopped is True
asyncio.run(scenario())

View File

@@ -0,0 +1,100 @@
import asyncio
import json
from config_manager.v2.management import ManagementServer
def test_health_mapping_ok_to_200():
async def provider():
return {"status": "ok"}
async def scenario() -> None:
server = ManagementServer(
host="127.0.0.1",
port=8000,
timeout=0.2,
health_provider=provider,
)
code, payload = await server._build_health_response()
assert code == 200
assert payload["status"] == "ok"
asyncio.run(scenario())
def test_health_mapping_unhealthy_to_503():
async def provider():
return {"status": "unhealthy", "detail": "worker failed"}
async def scenario() -> None:
server = ManagementServer(
host="127.0.0.1",
port=8000,
timeout=0.2,
health_provider=provider,
)
code, payload = await server._build_health_response()
assert code == 503
assert payload["status"] == "unhealthy"
asyncio.run(scenario())
def test_action_routes_call_callbacks():
events: list[str] = []
async def provider():
return {"status": "ok"}
async def on_start() -> str:
events.append("start")
return "start accepted"
async def on_stop() -> str:
events.append("stop")
return "stop accepted"
async def request(port: int, path: str) -> tuple[int, dict[str, str]]:
reader, writer = await asyncio.open_connection("127.0.0.1", port)
writer.write(
f"GET {path} HTTP/1.1\r\nHost: 127.0.0.1\r\nConnection: close\r\n\r\n".encode("utf-8")
)
await writer.drain()
raw = await reader.read()
writer.close()
await writer.wait_closed()
header, body = raw.split(b"\r\n\r\n", maxsplit=1)
status_code = int(header.split(b" ")[1])
payload = json.loads(body.decode("utf-8"))
return status_code, payload
async def scenario() -> None:
server = ManagementServer(
host="127.0.0.1",
port=0,
timeout=0.2,
health_provider=provider,
on_start=on_start,
on_stop=on_stop,
)
await server.start()
try:
port = server.port
assert port > 0
start_code, start_payload = await request(port, "/actions/start")
stop_code, stop_payload = await request(port, "/actions/stop")
finally:
await server.stop()
assert start_code == 200
assert start_payload["status"] == "ok"
assert start_payload["detail"] == "start accepted"
assert stop_code == 200
assert stop_payload["status"] == "ok"
assert stop_payload["detail"] == "stop accepted"
assert events == ["start", "stop"]
asyncio.run(scenario())

View File

@@ -0,0 +1,43 @@
import asyncio
import threading
import time
from config_manager.v2 import ConfigManagerV2
class BlockingApp(ConfigManagerV2):
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.started_event = threading.Event()
self.active_event = threading.Event()
self.calls = 0
def execute(self) -> None:
self.calls += 1
self.active_event.set()
self.started_event.set()
time.sleep(0.2)
self.active_event.clear()
def test_stop_waits_for_active_execute_and_prevents_next_run(tmp_path):
async def scenario() -> None:
cfg = tmp_path / "config.yaml"
cfg.write_text("work_interval: 0.05\nupdate_interval: 0.05\n", encoding="utf-8")
app = BlockingApp(str(cfg))
runner = asyncio.create_task(app.start())
started = await asyncio.to_thread(app.started_event.wait, 1.0)
assert started is True
await app.stop()
await runner
assert app.active_event.is_set() is False
assert app.calls == 1
await asyncio.sleep(0.15)
assert app.calls == 1
asyncio.run(scenario())