19 Commits

Author SHA1 Message Date
aee8f7460f Version upgrade 2026-02-26 11:25:06 +03:00
ad89b3db92 Fix hangs; add logging generated by Codex 2026-02-26 11:24:34 +03:00
6502f2252d Added execute_timeout 2026-02-24 20:59:46 +03:00
67cd098f54 Version update 2026-02-22 23:03:43 +03:00
8d177b0fd1 Added logging to the worker loop 2026-02-22 23:03:11 +03:00
3293814898 ввв 2026-02-21 23:39:29 +03:00
e7d11ddf71 Added a print 2026-02-21 23:38:48 +03:00
2b02af60d5 Did it myself 2026-02-21 23:35:16 +03:00
b2442f4d91 Fixed types 2026-02-21 22:53:52 +03:00
f5bb681ddb Fixed the version 2026-02-21 22:47:01 +03:00
058c19d677 Changed how wait timeouts are set 2026-02-21 22:45:41 +03:00
608cd42719 Version update 2026-02-21 15:27:27 +03:00
2d6179d366 Refactoring; added a management API 2026-02-21 15:14:34 +03:00
d888ae7acb First refactoring iteration 2026-02-21 00:20:07 +03:00
1d71ce406f Merge pull request 'Version update' (#2) from feature/healthcheck-requirements into master
Reviewed-on: #2
2026-02-19 19:51:54 +00:00
80dd69c5ec Version update 2026-02-19 22:50:51 +03:00
8da6df0b2a Merge pull request 'feature/healthcheck-requirements' (#1) from feature/healthcheck-requirements into master
Reviewed-on: #1
2026-02-19 19:49:27 +00:00
7eb3476b96 ConfigManager V2 2026-02-19 22:45:06 +03:00
da8ed4fa2b docs: add healthcheck requirements and README_DEPLOY
- Add docs/HEALTHCHECK_REQUIREMENTS.md with full spec (purpose, deploy.sh
  behaviour, endpoint contract, get_health_status(), app requirements,
  infrastructure)
- Add README_DEPLOY.md with healthcheck section, link to requirements
  and HEALTHCHECK_* env vars

Co-authored-by: Cursor <cursoragent@cursor.com>
2026-02-18 23:11:53 +03:00
27 changed files with 1607 additions and 41 deletions


@@ -3,6 +3,14 @@
This package was created to run my applications.
The ConfigManager class implements the entry point for the program and provides the actual application configuration. It also simplifies logging setup.
## Logging (v2)
Logging is configured from the config file only if it contains a **`log`** section in [dictConfig](https://docs.python.org/3/library/logging.config.html#logging.config.dictConfig) format. If there is no `log` section, the manager logs a warning and the default Python level (WARNING) remains, so INFO/DEBUG messages may not appear.
**How to verify that logging config is applied:**
- Ensure your config file path is correct and the file is loaded on startup (no error in logs about reading config).
- Ensure the config has a `log` key with `version: 1`, `handlers`, and `loggers` (see `tests/config.yaml` for an example).
- After startup you should see an INFO message: `"Logging configuration applied"` (from `config_manager.v1.log_manager`). If you do not see it, either the `log` section is missing (you will see a warning) or the root/package log level is above INFO.
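A minimal sketch of what a `log` section in dictConfig format can look like once the config file has been parsed into a Python dict. The formatter name, handler name, and the `apply_logging` helper are illustrative assumptions and are not taken from the package; only `logging.config.dictConfig` is the real standard-library call.

```python
import logging
import logging.config

# Hypothetical parsed config; only the "log" key matters for logging setup.
config = {
    "log": {
        "version": 1,
        "disable_existing_loggers": False,
        "formatters": {
            "plain": {"format": "%(asctime)s %(levelname)s %(name)s: %(message)s"}
        },
        "handlers": {
            "console": {
                "class": "logging.StreamHandler",
                "formatter": "plain",
                "level": "INFO",
            }
        },
        "loggers": {
            "config_manager": {
                "handlers": ["console"],
                "level": "INFO",
                "propagate": False,
            }
        },
    }
}


def apply_logging(cfg: dict) -> bool:
    """Apply the "log" section if present; report whether it was applied."""
    section = cfg.get("log")
    if not section:
        # Without a "log" section the default WARNING level stays in effect.
        return False
    logging.config.dictConfig(section)
    return True


applied = apply_logging(config)
logging.getLogger("config_manager").info("Logging configuration applied")
```

With this section in place, INFO messages from the `config_manager` logger reach the console; with an empty config the helper returns `False` and nothing changes.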
## Installation
``pip install git+https://git.lesha.spb.ru/alex/config_manager.git``

README_DEPLOY.md Normal file

@@ -0,0 +1,29 @@
# Deploy
A short description of the deploy process for applications based on config_manager and the scripts used.
---
## Healthcheck
After bringing up the containers (`docker compose up -d`), the deploy script may wait for a successful response from the application health-check URL. On timeout, it rolls back and exits with an error.
**Full specification:** [docs/HEALTHCHECK_REQUIREMENTS.md](docs/HEALTHCHECK_REQUIREMENTS.md).
### Environment variables
| Variable | Purpose | Example/default |
| ----------------------- | ---------------------------------------- | ------------------------------- |
| `HEALTHCHECK_URL` | Health-check URL | `http://127.0.0.1:8000/health` |
| `HEALTHCHECK_TIMEOUT` | Maximum wait time (seconds) | `120` |
| `HEALTHCHECK_INTERVAL` | Interval between attempts (seconds) | `5` |
If `HEALTHCHECK_URL` is set, the deploy script calls this URL after bringing up the containers (e.g. via `curl -fsS --max-time 5`). An HTTP 2xx response counts as success; anything else is retried until `HEALTHCHECK_TIMEOUT` expires.


@@ -0,0 +1,95 @@
# Healthcheck Requirements
A unified specification for implementing healthcheck in config_manager and in applications (including MailOrderBot).
---
## 1. Purpose
- The healthcheck is used by the deploy script (`deploy.sh`): after `docker compose up -d`, the deploy waits for a successful response from `HEALTHCHECK_URL`; on timeout it rolls back and exits with an error.
- The endpoint must reflect the **actual state of the application**, not just the fact that the HTTP server is running; otherwise the deploy may treat a hung or crashed worker as a successful start.
---
## 2. deploy.sh behaviour
(This logic may already be implemented in the deploy script.)
- If `HEALTHCHECK_URL` is set, `wait_for_healthcheck` is called after the containers are up: it loops with an interval of `HEALTHCHECK_INTERVAL` (default 5 s) until `HEALTHCHECK_TIMEOUT` (default 120 s) expires.
- Check: `curl -fsS --max-time 5 "$HEALTHCHECK_URL"`. The `-f` flag makes any HTTP 4xx/5xx response count as a failure (retried until the timeout).
- **Success:** HTTP 2xx. **Failure:** non-2xx or a curl/connection timeout → the deploy fails and rolls back.
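The retry loop described above can be sketched in Python. This is only an illustration of the polling logic, not the actual `deploy.sh` code, which is a shell script built around `curl`. The `probe` helper and the injectable `check`, `sleep`, and `clock` parameters are assumptions added so the loop can be exercised without a network; `urllib` also follows redirects, so it only roughly approximates `curl -f`.

```python
import time
import urllib.error
import urllib.request
from typing import Callable


def probe(url: str, max_time: float = 5.0) -> bool:
    """Return True only for an HTTP 2xx answer received within max_time seconds."""
    try:
        with urllib.request.urlopen(url, timeout=max_time) as response:
            return 200 <= response.status < 300
    except (urllib.error.URLError, OSError, ValueError):
        # HTTP 4xx/5xx, refused connections, timeouts and bad URLs all count as failure.
        return False


def wait_for_healthcheck(
    url: str,
    timeout: float = 120.0,   # mirrors the HEALTHCHECK_TIMEOUT default
    interval: float = 5.0,    # mirrors the HEALTHCHECK_INTERVAL default
    check: Callable[[str], bool] = probe,
    sleep: Callable[[float], None] = time.sleep,
    clock: Callable[[], float] = time.monotonic,
) -> bool:
    """Poll until the check succeeds or the overall timeout would be exceeded."""
    deadline = clock() + timeout
    while True:
        if check(url):
            return True
        if clock() + interval > deadline:
            return False  # the caller is expected to roll back and exit non-zero
        sleep(interval)
```

A caller would roll back and exit with an error when the function returns `False`, matching the failure path in the list above.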
### Environment variables
| Variable | Purpose | Example/default |
| ------------------------- | --------------------------------- | ----------------------------------- |
| `HEALTHCHECK_URL` | Check URL | `http://127.0.0.1:8000/health` |
| `HEALTHCHECK_TIMEOUT` | Max wait (s) | `120` |
| `HEALTHCHECK_INTERVAL` | Interval between attempts (s) | `5` |
---
## 3. Endpoint contract
- **Method and path:** `GET /health` (or another agreed path; the same one for all applications using config_manager).
- **Success (application healthy):** HTTP **200**, optional JSON body: `{"status": "ok"}`.
- **Application not healthy:** HTTP **503**, optional body: `{"status": "unhealthy"|"degraded", "detail": "reason"}`.
- The response must arrive within a reasonable time (recommended timeout for the check logic in config_manager: 25 s); otherwise the deploy's `curl --max-time 5` call times out and the request is retried.
---
## 4. Application feedback (config_manager)
- The endpoint is implemented in **config_manager** (optional, when the option is enabled).
- When handling `GET /health`, config_manager does **not** decide by itself whether the application is healthy; it calls an application method, e.g. `app.get_health_status()`.
- **Method contract** (the application overrides it in a subclass):
  - Returns a `dict`: `{"status": "ok" | "degraded" | "unhealthy", "detail": "..."}` (`detail` is optional).
  - On an exception or a call timeout, the state is treated as unhealthy and 503 is returned.
Thus, the main application provides feedback through its `get_health_status()` implementation (failure flags, DB check, heartbeat, etc.).
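One way the contract above could be turned into a status code is sketched below. The function name `handle_health`, the default timeout value, and the detail strings are assumptions made for illustration; the real handler in config_manager may be structured differently. The blocking application callback is run in a worker thread so that a hung check cannot block the event loop.

```python
import asyncio
from typing import Any, Callable, Dict, Tuple

VALID_STATUSES = {"ok", "degraded", "unhealthy"}


async def handle_health(
    get_health_status: Callable[[], Dict[str, Any]],
    timeout: float = 5.0,  # assumed value, not taken from the package
) -> Tuple[int, Dict[str, Any]]:
    """Map the application's health payload to (HTTP status code, JSON body)."""
    loop = asyncio.get_running_loop()
    try:
        payload = await asyncio.wait_for(
            loop.run_in_executor(None, get_health_status), timeout=timeout
        )
    except asyncio.TimeoutError:
        return 503, {"status": "unhealthy", "detail": "health check timed out"}
    except Exception as exc:  # any failure in the callback means unhealthy
        return 503, {"status": "unhealthy", "detail": str(exc)}

    if not isinstance(payload, dict) or payload.get("status") not in VALID_STATUSES:
        return 503, {"status": "unhealthy", "detail": "invalid health payload"}
    # Only "ok" maps to 200; "degraded" and "unhealthy" both map to 503.
    return (200 if payload["status"] == "ok" else 503), payload
```

Note that a timed-out callback keeps running in its thread; the sketch only stops waiting for it.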
---
## 5. Application requirements (e.g. MailOrderBot)
- Override `get_health_status()` and return:
  - `{"status": "ok"}` when running normally;
  - `{"status": "unhealthy", "detail": "..."}` on a critical failure (e.g. the last `execute()` failed or a dependency is unavailable);
  - optionally, `{"status": "degraded", "detail": "..."}` for non-fatal degradation (for both `unhealthy` and `degraded` the endpoint returns 503, for compatibility with `curl -f`).
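A sketch of such an override, assuming the application tracks the outcome of its last run itself. `ManagerStub` is a hypothetical stand-in for the config_manager base class so the example is self-contained; `MailWorker`, `run_once`, and `MAX_SILENCE` are likewise illustrative names.

```python
import time
from typing import Callable, Dict, Optional


class ManagerStub:
    """Hypothetical stand-in for the config_manager base class."""

    def get_health_status(self) -> Dict[str, str]:
        return {"status": "ok"}


class MailWorker(ManagerStub):
    MAX_SILENCE = 300.0  # assumed threshold (seconds) without a successful run

    def __init__(self) -> None:
        self._last_error: Optional[str] = None
        self._last_ok = time.monotonic()

    def run_once(self, job: Callable[[], None]) -> None:
        """Run one unit of work and remember whether it succeeded."""
        try:
            job()
        except Exception as exc:
            self._last_error = str(exc)
        else:
            self._last_error = None
            self._last_ok = time.monotonic()

    def get_health_status(self) -> Dict[str, str]:
        if self._last_error is not None:
            return {"status": "unhealthy", "detail": f"last execute() failed: {self._last_error}"}
        if time.monotonic() - self._last_ok > self.MAX_SILENCE:
            return {"status": "degraded", "detail": "no successful run recently"}
        return {"status": "ok"}
```

The health method only reads flags that the worker already maintains, so it stays fast and does not perform work of its own.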
---
## 6. Infrastructure
- For the healthcheck to work, the application must run an HTTP server (provided by config_manager when the option is enabled) and publish the port in `docker-compose` (e.g. `8000:8000`), so that `deploy.sh` on the host can reach `HEALTHCHECK_URL` (e.g. `http://127.0.0.1:8000/health`).


@@ -4,8 +4,8 @@ build-backend = "setuptools.build_meta"
[project]
name = "config_manager"
-version = "1.2.2"
-description = "Config manager for building applications"
+version = "2.1.7"
+description = "Fix for the infinite loop on error"
authors = [
{ name = "Aleksei Zosimov", email = "lesha.spb@gmail.com" }
]
@@ -13,6 +13,8 @@ readme = "README.md"
requires-python = ">=3.8"
 dependencies = [
     "PyYAML>=6.0",
+    "fastapi>=0.100.0",
+    "uvicorn[standard]>=0.22.0",
]
[project.urls]


@@ -1,2 +1,3 @@
-from .cfg_manager import ConfigManager
-from .log_manager import LogManager
+from .v2 import ConfigManagerV2 as ConfigManager
+from .v1.cfg_manager import ConfigManager as LegacyConfigManager
+from .v1.log_manager import LogManager


@@ -9,8 +9,8 @@ from typing import Any, Optional
 from .log_manager import LogManager
 class ConfigManager:
-    DEFAULT_UPDATE_INTERVAL = 5.0
-    DEFAULT_WORK_INTERVAL = 2.0
+    DEFAULT_UPDATE_INTERVAL = 5
+    DEFAULT_WORK_INTERVAL = 2
     def __init__(self, path: str, log_manager: Optional[LogManager] = None):
         self.path = path
@@ -81,9 +81,13 @@ class ConfigManager:
         pass
     async def _worker_loop(self) -> None:
-        while not self._halt.is_set():
-            await asyncio.to_thread(self.execute)
-            await asyncio.sleep(self.work_interval)
+        self.logger.warning("Worker loop started")
+        try:
+            while not self._halt.is_set():
+                await asyncio.to_thread(self.execute)
+                await asyncio.sleep(self.work_interval)
+        finally:
+            self.logger.warning("Worker loop stopped")
     async def _periodic_update_loop(self) -> None:
         while not self._halt.is_set():

@@ -22,6 +22,9 @@ class LogManager:
         """
         logging_config = config.get("log")
         if not logging_config:
+            self.logger.warning(
+                "Config has no 'log' section; logging parameters from config are not applied (default level may be WARNING)."
+            )
             return
         try:

@@ -0,0 +1,7 @@
"""Public API V2: the entry point to the configuration manager and the management-server settings.
Exports ConfigManagerV2 and the settings types for use by applications."""
from .core import ConfigManagerV2
from .types import HealthServerSettings, ManagementServerSettings
__all__ = ["ConfigManagerV2", "ManagementServerSettings", "HealthServerSettings"]


@@ -0,0 +1,7 @@
"""External control channels: the abstraction and an implementation (for example, Telegram).
Allows starting, stopping, and querying the status of the manager through bots and other interfaces."""
from .base import ControlChannel
from .telegram import TelegramControlChannel
__all__ = ["ControlChannel", "TelegramControlChannel"]


@@ -0,0 +1,24 @@
"""Abstract base control channel and the command-handler types.
Defines the contract: starting/stopping the channel and binding the start/stop/status handlers."""
from __future__ import annotations

from abc import ABC, abstractmethod
from collections.abc import Awaitable, Callable

StartHandler = Callable[[], Awaitable[str]]
StopHandler = Callable[[], Awaitable[str]]
StatusHandler = Callable[[], Awaitable[str]]


class ControlChannel(ABC):
    @abstractmethod
    async def start(self, on_start: StartHandler, on_stop: StopHandler, on_status: StatusHandler) -> None:
        """Start the channel and bind the command handlers."""
        raise NotImplementedError

    @abstractmethod
    async def stop(self) -> None:
        """Stop the channel and release its resources."""
        raise NotImplementedError
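To illustrate the contract, here is a minimal in-memory channel that could be used in tests instead of a real bot. The `ControlChannel` class is repeated only to keep the sketch self-contained; `InMemoryControlChannel`, its `send` method, and the command strings are hypothetical and do not exist in the package.

```python
import asyncio
from abc import ABC, abstractmethod
from typing import Awaitable, Callable, Dict, List, Optional

Handler = Callable[[], Awaitable[str]]


class ControlChannel(ABC):
    """Local copy of the contract, repeated so the sketch runs on its own."""

    @abstractmethod
    async def start(self, on_start: Handler, on_stop: Handler, on_status: Handler) -> None: ...

    @abstractmethod
    async def stop(self) -> None: ...


class InMemoryControlChannel(ControlChannel):
    """Hypothetical channel that dispatches commands passed in directly."""

    def __init__(self) -> None:
        self._handlers: Dict[str, Handler] = {}

    async def start(self, on_start: Handler, on_stop: Handler, on_status: Handler) -> None:
        self._handlers = {"/start": on_start, "/stop": on_stop, "/status": on_status}

    async def stop(self) -> None:
        self._handlers.clear()

    async def send(self, command: str) -> Optional[str]:
        """Dispatch one command; unknown or unbound commands yield None."""
        handler = self._handlers.get(command.strip().lower())
        return await handler() if handler is not None else None


async def demo() -> List[Optional[str]]:
    async def on_start() -> str:
        return "started"

    async def on_stop() -> str:
        return "stopped"

    async def on_status() -> str:
        return "state=running"

    channel = InMemoryControlChannel()
    await channel.start(on_start, on_stop, on_status)
    replies = [await channel.send("/start"), await channel.send("/status"), await channel.send("/unknown")]
    await channel.stop()
    replies.append(await channel.send("/start"))  # handlers are gone after stop()
    return replies
```

The same three handlers would be supplied by the manager; the channel only decides how commands arrive and how replies are delivered.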


@@ -0,0 +1,114 @@
"""Control channel implementation over the Telegram Bot API (long polling).
Accepts the /start, /stop and /status commands in the configured chat and calls the bound handlers."""
from __future__ import annotations

import asyncio
import json
import logging
import urllib.parse
import urllib.request
from typing import Optional

from .base import ControlChannel, StartHandler, StatusHandler, StopHandler


class TelegramControlChannel(ControlChannel):
    def __init__(
        self,
        token: str,
        chat_id: int,
        poll_interval: int = 2,
        logger: Optional[logging.Logger] = None,
    ):
        """Initialise the Telegram polling channel with the bot and chat settings."""
        self._token = token
        self._chat_id = chat_id
        self._poll_interval = poll_interval
        self._offset: Optional[int] = None
        self._task: Optional[asyncio.Task] = None
        self._stop_event = asyncio.Event()
        self._on_start: Optional[StartHandler] = None
        self._on_stop: Optional[StopHandler] = None
        self._on_status: Optional[StatusHandler] = None
        self._logger = logger or logging.getLogger(__name__)

    async def start(self, on_start: StartHandler, on_stop: StopHandler, on_status: StatusHandler) -> None:
        """Start polling Telegram updates and register the command callbacks."""
        if self._task is not None and not self._task.done():
            return
        self._on_start = on_start
        self._on_stop = on_stop
        self._on_status = on_status
        self._stop_event.clear()
        self._task = asyncio.create_task(self._poll_loop())

    async def stop(self) -> None:
        """Stop the polling loop and wait for the task to finish."""
        self._stop_event.set()
        if self._task is not None:
            self._task.cancel()
            try:
                await self._task
            except asyncio.CancelledError:
                pass
            self._task = None

    async def _poll_loop(self) -> None:
        """Continuously fetch updates and dispatch the supported commands."""
        while not self._stop_event.is_set():
            try:
                updates = await asyncio.to_thread(self._fetch_updates)
                for update in updates:
                    await self._process_update(update)
            except Exception as exc:  # noqa: BLE001
                self._logger.warning("Telegram polling error: %s", exc)
            try:
                await asyncio.wait_for(self._stop_event.wait(), timeout=max(self._poll_interval, 0.1))
            except asyncio.TimeoutError:
                continue

    def _fetch_updates(self) -> list[dict]:
        """Request new Telegram updates, taking the last offset into account."""
        params = {"timeout": 0}
        if self._offset is not None:
            params["offset"] = self._offset
        query = urllib.parse.urlencode(params)
        url = f"https://api.telegram.org/bot{self._token}/getUpdates?{query}"
        with urllib.request.urlopen(url, timeout=10) as response:
            payload = json.loads(response.read().decode("utf-8"))
        result = payload.get("result", [])
        if result:
            self._offset = max(item["update_id"] for item in result) + 1
        return result

    async def _process_update(self, update: dict) -> None:
        """Process a single Telegram update and run the matching command."""
        message = update.get("message") or {}
        text = (message.get("text") or "").strip().lower()
        chat = message.get("chat") or {}
        chat_id = chat.get("id")
        if chat_id != self._chat_id:
            return
        if text in {"/start", "/run"} and self._on_start is not None:
            reply = await self._on_start()
        elif text in {"/stop", "/halt"} and self._on_stop is not None:
            reply = await self._on_stop()
        elif text in {"/status", "/health"} and self._on_status is not None:
            reply = await self._on_status()
        else:
            return
        await asyncio.to_thread(self._send_message, reply)

    def _send_message(self, text: str) -> None:
        """Send a text reply to the configured Telegram chat."""
        encoded = urllib.parse.urlencode({"chat_id": self._chat_id, "text": text})
        url = f"https://api.telegram.org/bot{self._token}/sendMessage"
        req = urllib.request.Request(url, data=encoded.encode("utf-8"), method="POST")
        with urllib.request.urlopen(req, timeout=10):
            return


@@ -0,0 +1,8 @@
"""V2 core: the manager lifecycle, config loading, and the worker loop.
Contains ConfigManagerV2, the configuration loader, and the scheduler that repeatedly runs execute()."""
from .config_loader import ConfigLoader
from .manager import ConfigManagerV2
from .scheduler import WorkerLoop
__all__ = ["ConfigLoader", "ConfigManagerV2", "WorkerLoop"]


@@ -0,0 +1,93 @@
"""Configuration loader for a file (YAML/JSON) with hash-based change detection.
Reads the file synchronously and asynchronously, parses it by extension, and keeps the last valid config available for fallback when a later load fails."""
from __future__ import annotations

import asyncio
import hashlib
import json
import logging
import os
from typing import Any, Optional

import yaml

logger = logging.getLogger(__name__)


class ConfigLoader:
    def __init__(self, path: str):
        """Initialise the loader state for the given config file path."""
        self.path = path
        self.config: Any = None
        self.last_valid_config: Any = None
        self._last_seen_hash: Optional[str] = None
        logger.warning("ConfigLoader.__init__ result: path=%s", self.path)

    def _read_file_sync(self) -> str:
        """Synchronously read the raw config text from disk."""
        with open(self.path, "r", encoding="utf-8") as fh:
            data = fh.read()
        logger.warning("ConfigLoader._read_file_sync result: bytes=%s", len(data))
        return data

    async def read_file_async(self) -> str:
        """Read the raw config text from disk in a worker thread."""
        result = await asyncio.to_thread(self._read_file_sync)
        logger.warning("ConfigLoader.read_file_async result: bytes=%s", len(result))
        return result

    def parse_config(self, data: str) -> Any:
        """Parse the config text as YAML or JSON, depending on the file extension."""
        extension = os.path.splitext(self.path)[1].lower()
        try:
            if extension in (".yaml", ".yml"):
                result = yaml.safe_load(data)
            else:
                result = json.loads(data)
        except Exception:  # noqa: BLE001
            logger.exception("ConfigLoader.parse_config error: extension=%s", extension)
            raise
        logger.warning(
            "ConfigLoader.parse_config result: extension=%s type=%s",
            extension,
            type(result).__name__,
        )
        return result

    @staticmethod
    def _calculate_hash(data: str) -> str:
        """Compute a stable content hash for change detection."""
        result = hashlib.sha256(data.encode("utf-8")).hexdigest()
        logger.warning("ConfigLoader._calculate_hash result: hash=%s", result)
        return result

    async def load_if_changed(self) -> tuple[bool, Any]:
        """Load and parse the config only when the file content has changed."""
        raw_data = await self.read_file_async()
        current_hash = self._calculate_hash(raw_data)
        if current_hash == self._last_seen_hash:
            logger.warning("ConfigLoader.load_if_changed result: changed=False")
            return False, self.config
        self._last_seen_hash = current_hash
        parsed = self.parse_config(raw_data)
        self.config = parsed
        self.last_valid_config = parsed
        logger.warning("ConfigLoader.load_if_changed result: changed=True")
        return True, parsed


def extract_scheduler_intervals(
    config: Any,
    default_update: float,
    default_work: float,
) -> tuple[float, float]:
    """Extract update_interval and work_interval from the config (a dict). Falls back to the defaults for missing or invalid values."""
    if not isinstance(config, dict):
        return default_update, default_work
    upd = config.get("update_interval")
    wrk = config.get("work_interval")
    u = float(upd) if isinstance(upd, (int, float)) and upd > 0 else default_update
    w = float(wrk) if isinstance(wrk, (int, float)) and wrk > 0 else default_work
    return u, w
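The hash-based change detection used by the loader can be shown in isolation. The sketch below is a simplified, synchronous, JSON-only variant written for illustration; `ChangeDetector` and `demo` are hypothetical names, and it returns `None` instead of the cached config when nothing changed.

```python
import hashlib
import json
import os
import tempfile
from typing import Any, List, Optional, Tuple


class ChangeDetector:
    """Re-parse a JSON file only when its content hash differs from the last one seen."""

    def __init__(self, path: str) -> None:
        self.path = path
        self._last_hash: Optional[str] = None

    def load_if_changed(self) -> Tuple[bool, Any]:
        with open(self.path, "r", encoding="utf-8") as fh:
            raw = fh.read()
        digest = hashlib.sha256(raw.encode("utf-8")).hexdigest()
        if digest == self._last_hash:
            return False, None
        # As in the loader above, the hash is remembered before parsing,
        # so a broken file is not re-parsed on every poll.
        self._last_hash = digest
        return True, json.loads(raw)


def demo() -> List[Tuple[bool, Any]]:
    """Write, poll twice, rewrite, and poll again; collect the results."""
    results = []
    with tempfile.TemporaryDirectory() as tmp:
        path = os.path.join(tmp, "config.json")
        with open(path, "w", encoding="utf-8") as fh:
            json.dump({"work_interval": 2}, fh)
        detector = ChangeDetector(path)
        results.append(detector.load_if_changed())
        results.append(detector.load_if_changed())
        with open(path, "w", encoding="utf-8") as fh:
            json.dump({"work_interval": 3}, fh)
        results.append(detector.load_if_changed())
    return results
```

Comparing content hashes rather than modification times means that touching the file without changing it does not trigger a reload.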


@@ -0,0 +1,348 @@
"""Config manager v2: runtime orchestration and configuration updates."""
from __future__ import annotations

import asyncio
import logging
import os
import time
from typing import Any, Optional

from ...v1.log_manager import LogManager
from ..control.base import ControlChannel
from ..management import (
    ControlChannelBridge,
    HealthAggregator,
    ManagementApiBridge,
    ManagementServer,
)
from ..types import HealthPayload, LifecycleState, ManagementServerSettings
from .config_loader import ConfigLoader
from .scheduler import WorkerLoop

logger = logging.getLogger(__name__)


def _read_env_interval(name: str, default_value: float) -> float:
    """Read positive float interval from env."""
    raw_value = os.environ.get(name)
    if raw_value is None:
        return float(default_value)
    try:
        parsed = float(raw_value)
        if parsed <= 0:
            raise ValueError(f"{name} must be greater than zero")
        return parsed
    except Exception:  # noqa: BLE001
        logger.exception(
            "ConfigManagerV2 interval parse error: env=%s raw_value=%s fallback=%s",
            name,
            raw_value,
            default_value,
        )
        return float(default_value)
class _RuntimeController:
    """Runtime loops and lifecycle supervision."""

    def _on_execute_success(self) -> None:
        self._last_success_timestamp = time.monotonic()
        self._last_execute_error = None
        self.logger.warning(
            "ConfigManagerV2._on_execute_success result: last_success_timestamp=%s",
            self._last_success_timestamp,
        )

    def _on_execute_error(self, exc: Exception) -> None:
        self._last_execute_error = str(exc)
        self.logger.exception("ConfigManagerV2._on_execute_error")
        self.logger.warning(
            "ConfigManagerV2._on_execute_error result: last_execute_error=%s",
            self._last_execute_error,
        )

    async def _worker_loop(self) -> None:
        self.logger.warning(
            "ConfigManagerV2._worker_loop result: started work_interval=%s",
            self.work_interval,
        )
        worker = WorkerLoop(
            execute=self.execute,
            get_interval=lambda: self.work_interval,
            halt_event=self._halt,
            on_error=self._on_execute_error,
            on_success=self._on_execute_success,
        )
        try:
            await worker.run()
            self.logger.warning("ConfigManagerV2._worker_loop result: completed")
        except Exception:  # noqa: BLE001
            self.logger.exception("ConfigManagerV2._worker_loop error")
            raise
        finally:
            self.logger.warning("ConfigManagerV2._worker_loop result: stopped")

    async def _periodic_update_loop(self) -> None:
        self.logger.warning(
            "ConfigManagerV2._periodic_update_loop result: started update_interval=%s",
            self.update_interval,
        )
        try:
            while not self._halt.is_set():
                await self._update_config()
                try:
                    await asyncio.wait_for(self._halt.wait(), timeout=max(self.update_interval, 0.05))
                except asyncio.TimeoutError:
                    continue
        except Exception:  # noqa: BLE001
            self.logger.exception("ConfigManagerV2._periodic_update_loop error")
            raise
        finally:
            self.logger.warning("ConfigManagerV2._periodic_update_loop result: stopped")

    async def _status_text(self) -> str:
        health = await self._health_aggregator.collect()
        detail = health.get("detail")
        if detail:
            status_text = f"state={self._state.value}; health={health['status']}; detail={detail}"
            self.logger.warning("ConfigManagerV2._status_text result: %s", status_text)
            return status_text
        status_text = f"state={self._state.value}; health={health['status']}"
        self.logger.warning("ConfigManagerV2._status_text result: %s", status_text)
        return status_text

    async def _start_control_channel(self) -> None:
        if self._control_channel is None:
            self.logger.warning("ConfigManagerV2._start_control_channel result: no control channel")
            return
        try:
            await self._control_channel.start(
                self._control_bridge.on_start,
                self._control_bridge.on_stop,
                self._control_bridge.on_status,
            )
            self.logger.warning("ConfigManagerV2._start_control_channel result: started")
        except Exception:  # noqa: BLE001
            self.logger.exception("ConfigManagerV2._start_control_channel error")

    async def _stop_control_channel(self) -> None:
        if self._control_channel is None:
            self.logger.warning("ConfigManagerV2._stop_control_channel result: no control channel")
            return
        try:
            await self._control_channel.stop()
            self.logger.warning("ConfigManagerV2._stop_control_channel result: stopped")
        except Exception:  # noqa: BLE001
            self.logger.exception("ConfigManagerV2._stop_control_channel error")

    async def _start_management_server(self) -> None:
        if self._management_server is None:
            self.logger.warning("ConfigManagerV2._start_management_server result: disabled")
            return
        try:
            await self._management_server.start()
            self.logger.warning(
                "ConfigManagerV2._start_management_server result: started port=%s",
                self._management_server.port,
            )
        except Exception:  # noqa: BLE001
            self.logger.exception("ConfigManagerV2._start_management_server error")
            self.logger.warning(
                "ConfigManagerV2._start_management_server result: failed worker will continue",
            )

    def _on_runtime_task_done(self, task: asyncio.Task) -> None:
        if task.cancelled():
            self.logger.warning("ConfigManagerV2._on_runtime_task_done result: cancelled")
            return
        try:
            exc = task.exception()
        except Exception:  # noqa: BLE001
            self.logger.exception("ConfigManagerV2._on_runtime_task_done error while reading task exception")
            return
        if exc is None:
            self.logger.warning("ConfigManagerV2._on_runtime_task_done result: completed")
            return
        self.logger.error(
            "ConfigManagerV2 background task failed",
            exc_info=(type(exc), exc, exc.__traceback__),
        )
        self.logger.warning("ConfigManagerV2._on_runtime_task_done result: failed")

    async def _run(self) -> None:
        self._state = LifecycleState.STARTING
        self.logger.warning("ConfigManagerV2._run result: state=%s", self._state.value)
        self._halt.clear()
        await self._update_config()
        await self._start_management_server()
        await self._start_control_channel()
        self._state = LifecycleState.RUNNING
        self.logger.warning("ConfigManagerV2._run result: state=%s", self._state.value)
        tasks = [
            asyncio.create_task(self._worker_loop(), name="v2-worker-loop"),
            asyncio.create_task(self._periodic_update_loop(), name="v2-config-loop"),
        ]
        try:
            await asyncio.gather(*tasks)
            self.logger.warning("ConfigManagerV2._run result: background loops completed")
        except asyncio.CancelledError:
            self.logger.warning("ConfigManagerV2._run result: cancelled")
            raise
        except Exception:  # noqa: BLE001
            self.logger.exception("ConfigManagerV2._run error")
            raise
        finally:
            self._state = LifecycleState.STOPPING
            self.logger.warning("ConfigManagerV2._run result: state=%s", self._state.value)
            self._halt.set()
            for task in tasks:
                task.cancel()
            await asyncio.gather(*tasks, return_exceptions=True)
            self._state = LifecycleState.STOPPED
            self._task = None
            self.logger.warning(
                "ConfigManagerV2._run result: state=%s api_and_control_available=%s",
                self._state.value,
                True,
            )
class ConfigManagerV2(_RuntimeController):
"""Public manager API."""
DEFAULT_UPDATE_INTERVAL = 5
DEFAULT_WORK_INTERVAL = 2
def __init__(
self,
path: str,
log_manager: Optional[LogManager] = None,
management_settings: Optional[ManagementServerSettings] = None,
control_channel: Optional[ControlChannel] = None,
):
self.logger = logging.getLogger(__name__)
self.path = path
self.config: Any = None
self.update_interval = _read_env_interval("UPDATE_INTERVAL", float(self.DEFAULT_UPDATE_INTERVAL))
self.work_interval = _read_env_interval("WORK_INTERVAL", float(self.DEFAULT_WORK_INTERVAL))
self._loader = ConfigLoader(path)
self._log_manager = log_manager or LogManager()
self._control_channel = control_channel
self._halt = asyncio.Event()
self._task: Optional[asyncio.Task] = None
self._loop: Optional[asyncio.AbstractEventLoop] = None
self._state = LifecycleState.IDLE
self._last_execute_error: Optional[str] = None
self._last_success_timestamp: Optional[float] = None
settings = management_settings or ManagementServerSettings(enabled=True)
self._management_settings = settings
self._health_timeout = settings.health_timeout
self._health_aggregator = HealthAggregator(
get_state=lambda: self._state,
get_last_error=lambda: self._last_execute_error,
get_last_success_timestamp=lambda: self._last_success_timestamp,
health_timeout=self._health_timeout,
get_app_health=self.get_health_status,
)
self._api_bridge = ManagementApiBridge(start_fn=self.start, stop_fn=self.stop)
self._control_bridge = ControlChannelBridge(
halt=self._halt,
get_state=lambda: self._state,
get_status=self._status_text,
)
self._management_server: Optional[ManagementServer] = None
if settings.enabled:
self._management_server = ManagementServer(
host=settings.host,
port=settings.port,
timeout=settings.timeout,
health_provider=self._health_aggregator.collect,
on_start=self._api_bridge.on_start,
on_stop=self._api_bridge.on_stop,
)
self.logger.warning(
"ConfigManagerV2.__init__ result: path=%s update_interval=%s work_interval=%s management_enabled=%s",
self.path,
self.update_interval,
self.work_interval,
self._management_server is not None,
)
def _apply_config(self, new_config: Any) -> None:
self.config = new_config
if isinstance(new_config, dict):
try:
self._log_manager.apply_config(new_config)
except Exception: # noqa: BLE001
self.logger.exception("ConfigManagerV2._apply_config error while applying logging config")
raise
self.logger.warning(
"ConfigManagerV2._apply_config result: config_type=%s is_dict=%s",
type(new_config).__name__,
isinstance(new_config, dict),
)
async def _update_config(self) -> None:
try:
changed, new_config = await self._loader.load_if_changed()
if not changed:
self.logger.warning("ConfigManagerV2._update_config result: no changes")
return
self._apply_config(new_config)
self.logger.warning("ConfigManagerV2._update_config result: config updated")
except Exception as exc: # noqa: BLE001
self.logger.exception("ConfigManagerV2._update_config error")
if self._loader.last_valid_config is None:
self.logger.warning(
"ConfigManagerV2._update_config result: no fallback config available detail=%s",
str(exc),
)
return
try:
self._apply_config(self._loader.last_valid_config)
self.logger.warning(
"ConfigManagerV2._update_config result: fallback to last valid config applied",
)
except Exception: # noqa: BLE001
self.logger.exception("ConfigManagerV2._update_config fallback error")
def execute(self) -> None:
"""Override in subclasses."""
def get_health_status(self) -> HealthPayload:
return {"status": "ok"}
async def start(self) -> None:
if self._task is not None and not self._task.done():
self.logger.warning("ConfigManagerV2.start result: already running")
return
try:
self._loop = asyncio.get_running_loop()
except RuntimeError:
self.logger.exception("ConfigManagerV2.start error: must be called from within async context")
raise
self._task = asyncio.create_task(self._run(), name="config-manager-v2")
self._task.add_done_callback(self._on_runtime_task_done)
self.logger.warning("ConfigManagerV2.start result: background task started")
async def stop(self) -> None:
if self._task is None:
self.logger.warning("ConfigManagerV2.stop result: not running")
return
self._halt.set()
if asyncio.current_task() is self._task:
self.logger.warning("ConfigManagerV2.stop result: stop requested from runtime task")
return
try:
await self._task
except asyncio.CancelledError:
self.logger.warning("ConfigManagerV2.stop result: runtime task cancelled")
except Exception: # noqa: BLE001
self.logger.exception("ConfigManagerV2.stop error while awaiting runtime task")
raise
finally:
self.logger.warning("ConfigManagerV2.stop result: completed")
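
The fallback path in `_update_config` above can be sketched in isolation. This is a minimal illustration, not the package's API: `reload_config`, `load`, `apply` and `last_valid` are hypothetical names, and unlike the original it lets a failure of the fallback itself propagate instead of logging it.

```python
from typing import Any, Callable, Optional


def reload_config(
    load: Callable[[], Any],
    apply: Callable[[Any], None],
    last_valid: Optional[Any],
) -> str:
    """Try to load and apply a new config; fall back to the last valid one on failure."""
    try:
        apply(load())
        return "updated"
    except Exception:  # any load/apply failure triggers the fallback path
        if last_valid is None:
            # Nothing to fall back to: keep whatever config is currently active.
            return "no fallback"
        apply(last_valid)
        return "fallback applied"
```

Returning a short status string mirrors the "result: ..." log lines in the original and keeps the three outcomes easy to assert on.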


@@ -0,0 +1,54 @@
"""Worker loop: repeatedly calls the blocking execute() in a thread, pausing between iterations.
Supports a halt event for stopping and on_success/on_error callbacks for tracking errors and health."""
from __future__ import annotations

import asyncio
import logging
from collections.abc import Callable
from typing import Optional

logger = logging.getLogger(__name__)


class WorkerLoop:
    def __init__(
        self,
        execute: Callable[[], None],
        get_interval: Callable[[], int | float],
        halt_event: asyncio.Event,
        on_error: Optional[Callable[[Exception], None]] = None,
        on_success: Optional[Callable[[], None]] = None,
    ):
        """Store the callbacks and synchronization primitives used to run the worker."""
        self._execute = execute
        self._get_interval = get_interval
        self._halt_event = halt_event
        self._on_error = on_error
        self._on_success = on_success
        logger.warning(
            "WorkerLoop.__init__ result: execute=%s",
            getattr(execute, "__name__", execute.__class__.__name__),
        )

    async def run(self) -> None:
        """Call execute repeatedly until a stop is requested."""
        logger.warning("WorkerLoop.run result: started")
        while not self._halt_event.is_set():
            try:
                # Run the blocking execute() in a thread so the event loop stays responsive.
                await asyncio.to_thread(self._execute)
                if self._on_success is not None:
                    self._on_success()
                logger.warning("WorkerLoop.run result: execute completed")
            except Exception as exc:  # noqa: BLE001
                logger.exception("WorkerLoop.run error during execute")
                if self._on_error is not None:
                    self._on_error(exc)
                logger.warning("WorkerLoop.run result: execute failed")
            timeout = max(self._get_interval(), 0.01)
            try:
                # Pause between iterations, but wake up immediately if halt is set.
                await asyncio.wait_for(self._halt_event.wait(), timeout=timeout)
            except asyncio.TimeoutError:
                continue
        logger.warning("WorkerLoop.run result: stopped")
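
The interruptible pause used by `WorkerLoop.run` can be tried on its own. The sketch below assumes Python 3.9+ (for `asyncio.to_thread`); `run_until_halted` and `demo` are hypothetical names introduced only for illustration, and callbacks and logging are left out.

```python
import asyncio


async def run_until_halted(execute, interval: float, halt: asyncio.Event) -> int:
    """Call blocking `execute` in a worker thread until `halt` is set; return the call count."""
    calls = 0
    while not halt.is_set():
        await asyncio.to_thread(execute)  # keep the event loop free while execute() blocks
        calls += 1
        try:
            # Sleep for `interval`, but return early as soon as halt is set.
            await asyncio.wait_for(halt.wait(), timeout=max(interval, 0.01))
        except asyncio.TimeoutError:
            continue  # interval elapsed without a halt request: run the next iteration
    return calls


async def demo() -> int:
    halt = asyncio.Event()
    task = asyncio.create_task(run_until_halted(lambda: None, 0.02, halt))
    await asyncio.sleep(0.1)
    halt.set()  # wakes the pending wait_for instead of waiting out the interval
    return await task
```

Waiting on the event with a timeout, rather than calling `asyncio.sleep`, is what lets a stop request take effect without waiting for the rest of the interval.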


@@ -0,0 +1,14 @@
"""Management API: HTTP server for /health and /actions/start|stop, adapters, and health collection.
Combines the server, lifecycle bridges, and the health aggregator behind a single API contract."""
from .bridges import ControlChannelBridge, ManagementApiBridge
from .health_aggregator import HealthAggregator
from .management_server import HealthServer, ManagementServer
__all__ = [
"ControlChannelBridge",
"HealthAggregator",
"HealthServer",
"ManagementApiBridge",
"ManagementServer",
]


@@ -0,0 +1,90 @@
"""Adapters that connect the manager lifecycle to the HTTP API and to control channels.
ManagementApiBridge exposes start/stop over HTTP; ControlChannelBridge exposes start/stop/status to Telegram and other channels."""
from __future__ import annotations

import asyncio
import logging
from collections.abc import Awaitable, Callable

from ..types import LifecycleState

logger = logging.getLogger(__name__)


class ManagementApiBridge:
    """Exposes lifecycle start/stop as async callbacks for ManagementServer (/actions/start, /actions/stop)."""

    def __init__(
        self,
        start_fn: Callable[[], Awaitable[None]],
        stop_fn: Callable[[], Awaitable[None]],
    ):
        self._start_fn = start_fn
        self._stop_fn = stop_fn
        logger.warning("ManagementApiBridge.__init__ result: callbacks configured")

    async def on_start(self) -> str:
        """Run start and return the message for the HTTP response."""
        try:
            await self._start_fn()
        except Exception:  # noqa: BLE001
            logger.exception("ManagementApiBridge.on_start error")
            raise
        result = "start completed"
        logger.warning("ManagementApiBridge.on_start result: %s", result)
        return result

    async def on_stop(self) -> str:
        """Run stop and return the message for the HTTP response."""
        try:
            await self._stop_fn()
        except Exception:  # noqa: BLE001
            logger.exception("ManagementApiBridge.on_stop error")
            raise
        result = "stop completed"
        logger.warning("ManagementApiBridge.on_stop result: %s", result)
        return result


class ControlChannelBridge:
    """Exposes halt and status as start/stop/status handlers for a ControlChannel (e.g. Telegram)."""

    def __init__(
        self,
        halt: asyncio.Event,
        get_state: Callable[[], LifecycleState],
        get_status: Callable[[], Awaitable[str]],
    ):
        self._halt = halt
        self._get_state = get_state
        self._get_status = get_status
        logger.warning("ControlChannelBridge.__init__ result: callbacks configured")

    async def on_start(self) -> str:
        """Handle an external start: clear halt; idempotent if already running."""
        if self._get_state() == LifecycleState.RUNNING:
            result = "already running"
            logger.warning("ControlChannelBridge.on_start result: %s", result)
            return result
        self._halt.clear()
        result = "start signal accepted"
        logger.warning("ControlChannelBridge.on_start result: %s", result)
        return result

    async def on_stop(self) -> str:
        """Handle an external stop: set halt."""
        self._halt.set()
        result = "stop signal accepted"
        logger.warning("ControlChannelBridge.on_stop result: %s", result)
        return result

    async def on_status(self) -> str:
        """Return the current status text."""
        try:
            result = await self._get_status()
        except Exception:  # noqa: BLE001
            logger.exception("ControlChannelBridge.on_status error")
            raise
        logger.warning("ControlChannelBridge.on_status result: %s", result)
        return result


@@ -0,0 +1,67 @@
"""Combines lifecycle state and health into a single response for /health.
Healthy means a successful execute() occurred within the last health_timeout seconds; otherwise unhealthy with a detail (error or timeout)."""
from __future__ import annotations

import logging
import time
from collections.abc import Callable

from ..types import HealthPayload, LifecycleState

logger = logging.getLogger(__name__)


class HealthAggregator:
    """Builds the health response from the time of the last successful execute() and the timeout."""

    def __init__(
        self,
        get_state: Callable[[], LifecycleState],
        get_last_error: Callable[[], str | None],
        get_last_success_timestamp: Callable[[], float | None],
        health_timeout: int,
        get_app_health: Callable[[], HealthPayload],
    ):
        self._get_state = get_state
        self._get_last_error = get_last_error
        self._get_last_success_timestamp = get_last_success_timestamp
        self._health_timeout = health_timeout
        self._get_app_health = get_app_health
        logger.warning("HealthAggregator.__init__ result: health_timeout=%s", self._health_timeout)

    async def collect(self) -> HealthPayload:
        """Return ok if execute() succeeded within the last health_timeout seconds; otherwise unhealthy. Always include state."""
        state = self._get_state()
        state_value = state.value
        # status=ok is possible only when state=RUNNING; any other state (e.g. STOPPING/STOPPED) is unhealthy immediately.
        if state != LifecycleState.RUNNING:
            result = {"status": "unhealthy", "detail": f"state={state_value}", "state": state_value}
            logger.warning("HealthAggregator.collect result: %s", result)
            return result
        last_success = self._get_last_success_timestamp()
        # The comparison below is only meaningful if the timestamp also comes from time.monotonic().
        now = time.monotonic()
        if last_success is None:
            detail = self._get_last_error() or "no successful run yet"
            result = {"status": "unhealthy", "detail": detail, "state": state_value}
            logger.warning("HealthAggregator.collect result: %s", result)
            return result
        if (now - last_success) > self._health_timeout:
            detail = self._get_last_error() or f"no successful run within {self._health_timeout}s"
            result = {"status": "unhealthy", "detail": detail, "state": state_value}
            logger.warning("HealthAggregator.collect result: %s", result)
            return result
        result = self._get_app_health()
        status = result.get("status", "unhealthy")
        if status != "ok":
            unhealthy = {"status": "unhealthy", "detail": result.get("detail", "app reported non-ok"), "state": state_value}
            logger.warning("HealthAggregator.collect result: %s", unhealthy)
            return unhealthy
        healthy = {**result, "state": state_value}
        logger.warning("HealthAggregator.collect result: %s", healthy)
        return healthy
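
The decision order in `collect` (lifecycle state first, then last success, then staleness) can be expressed as a pure function that is easy to test. This is a simplified sketch: `evaluate_health` and its parameters are hypothetical, the lifecycle state is reduced to a boolean, and the application-level health check is omitted.

```python
from typing import Optional


def evaluate_health(
    running: bool,
    last_success: Optional[float],
    now: float,
    health_timeout: float,
    last_error: Optional[str] = None,
) -> dict:
    """Return a health payload based on how recently the worker last succeeded."""
    if not running:
        return {"status": "unhealthy", "detail": "not running"}
    if last_success is None:
        # No successful run so far: report the last error if there is one.
        return {"status": "unhealthy", "detail": last_error or "no successful run yet"}
    if now - last_success > health_timeout:
        # The last success is too old to count as healthy.
        return {"status": "unhealthy", "detail": last_error or f"no successful run within {health_timeout}s"}
    return {"status": "ok"}
```

Passing `now` in as an argument, instead of reading the clock inside the function, makes the timeout branch deterministic in tests.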


@@ -0,0 +1,264 @@
"""Management HTTP API with /health, /actions/start and /actions/stop."""
from __future__ import annotations
import asyncio
import json
import logging
from collections.abc import Awaitable, Callable
from typing import Any, Optional
from fastapi import FastAPI
from fastapi.responses import JSONResponse
from uvicorn import Config, Server
from ..types import HealthPayload
PATH_HEALTH = "/health"
PATH_ACTION_START = "/actions/start"
PATH_ACTION_STOP = "/actions/stop"
logger = logging.getLogger(__name__)
class UvicornServerRunner:
"""Lifecycle wrapper around uvicorn Server."""
def __init__(self, host: str, port: int, timeout: int):
self._host = host
self._port = port
self._timeout = timeout
self._server: Optional[Server] = None
self._serve_task: Optional[asyncio.Task[None]] = None
self._bound_port: Optional[int] = None
logger.warning(
"UvicornServerRunner.__init__ result: host=%s port=%s timeout=%s",
self._host,
self._port,
self._timeout,
)
async def _raise_if_start_task_failed(self) -> None:
if self._serve_task is None or not self._serve_task.done():
return
try:
await self._serve_task
except SystemExit as exc:
raise RuntimeError(f"Management server exited during startup with code {exc.code}") from exc
except Exception as exc: # noqa: BLE001
raise RuntimeError("Management server failed during startup") from exc
raise RuntimeError("Management server stopped unexpectedly during startup")
async def _wait_until_started(self) -> None:
if self._server is None:
raise RuntimeError("Management server is not initialized")
loop = asyncio.get_running_loop()
deadline = loop.time() + max(float(self._timeout), 1.0)
while not self._server.started:
await self._raise_if_start_task_failed()
if loop.time() >= deadline:
raise TimeoutError("Management server startup timed out")
await asyncio.sleep(0.05)
def _resolve_bound_port(self) -> int:
if self._server is None:
return self._port
servers = getattr(self._server, "servers", None)
if not servers:
return self._port
sockets = getattr(servers[0], "sockets", None)
if not sockets:
return self._port
sockname = sockets[0].getsockname()
if isinstance(sockname, tuple) and len(sockname) > 1:
return int(sockname[1])
return self._port
async def _cleanup_start_failure(self) -> None:
if self._server is not None:
self._server.should_exit = True
if self._serve_task is not None:
try:
await self._serve_task
except BaseException: # noqa: BLE001
logger.exception("UvicornServerRunner._cleanup_start_failure error")
self._server = None
self._serve_task = None
self._bound_port = None
logger.warning("UvicornServerRunner._cleanup_start_failure result: state reset")
async def start(self, app: FastAPI) -> None:
if self._serve_task is not None and not self._serve_task.done():
logger.warning("UvicornServerRunner.start result: already running")
return
if self._serve_task is not None and self._serve_task.done():
self._serve_task = None
try:
config = Config(app=app, host=self._host, port=self._port, log_level="warning")
self._server = Server(config)
self._serve_task = asyncio.create_task(self._server.serve(), name="management-server-serve")
await self._wait_until_started()
self._bound_port = self._resolve_bound_port()
logger.warning(
"UvicornServerRunner.start result: running host=%s requested_port=%s bound_port=%s",
self._host,
self._port,
self._bound_port,
)
except Exception:
logger.exception("UvicornServerRunner.start error")
await self._cleanup_start_failure()
raise
async def stop(self) -> None:
if self._server is None or self._serve_task is None:
logger.warning("UvicornServerRunner.stop result: already stopped")
return
self._server.should_exit = True
try:
await self._serve_task
except BaseException: # noqa: BLE001
logger.exception("UvicornServerRunner.stop error")
raise
finally:
self._server = None
self._serve_task = None
self._bound_port = None
logger.warning("UvicornServerRunner.stop result: stopped")
@property
def port(self) -> int:
result = self._bound_port if self._bound_port is not None else self._port
logger.warning("UvicornServerRunner.port result: %s", result)
return result
class ManagementServer:
"""Management API endpoints and callback adapters."""
def __init__(
self,
host: str,
port: int,
timeout: int,
health_provider: Callable[[], Awaitable[HealthPayload]],
on_start: Optional[Callable[[], Awaitable[str]]] = None,
on_stop: Optional[Callable[[], Awaitable[str]]] = None,
):
self._timeout = timeout
self._health_provider = health_provider
self._on_start = on_start
self._on_stop = on_stop
self._runner = UvicornServerRunner(host=host, port=port, timeout=timeout)
self._app = self._create_app()
logger.warning(
"ManagementServer.__init__ result: host=%s port=%s timeout=%s",
host,
port,
timeout,
)
def _create_app(self) -> FastAPI:
app = FastAPI(title="Config Manager Management API")
@app.get(PATH_HEALTH)
async def health() -> JSONResponse:
return await self._health_response()
@app.get(PATH_ACTION_START)
@app.post(PATH_ACTION_START)
async def action_start() -> JSONResponse:
return await self._action_response("start", self._on_start)
@app.get(PATH_ACTION_STOP)
@app.post(PATH_ACTION_STOP)
async def action_stop() -> JSONResponse:
return await self._action_response("stop", self._on_stop)
logger.warning(
"ManagementServer._create_app result: routes=%s,%s,%s",
PATH_HEALTH,
PATH_ACTION_START,
PATH_ACTION_STOP,
)
return app
async def _health_response(self) -> JSONResponse:
try:
payload = await asyncio.wait_for(self._health_provider(), timeout=self._timeout)
status_code = 200 if payload.get("status", "unhealthy") == "ok" else 503
logger.warning(
"ManagementServer._health_response result: status_code=%s payload=%s",
status_code,
payload,
)
return JSONResponse(content=payload, status_code=status_code)
except Exception as exc: # noqa: BLE001
logger.exception("ManagementServer._health_response error")
return JSONResponse(content={"status": "unhealthy", "detail": str(exc)}, status_code=503)
async def _action_response(
self,
action: str,
callback: Optional[Callable[[], Awaitable[str]]],
) -> JSONResponse:
if callback is None:
logger.warning(
"ManagementServer._action_response result: action=%s status_code=404 detail=handler not configured",
action,
)
return JSONResponse(
content={"status": "error", "detail": f"{action} handler is not configured"},
status_code=404,
)
try:
detail = await callback()
if not detail:
detail = f"{action} action accepted"
logger.warning(
"ManagementServer._action_response result: action=%s status_code=200 detail=%s",
action,
detail,
)
return JSONResponse(content={"status": "ok", "detail": detail}, status_code=200)
except Exception as exc: # noqa: BLE001
logger.exception("ManagementServer._action_response error: action=%s", action)
return JSONResponse(content={"status": "error", "detail": str(exc)}, status_code=500)
def _build_health_response(self) -> Awaitable[tuple[int, HealthPayload]]:
async def _run() -> tuple[int, HealthPayload]:
response = await self._health_response()
body: Any = response.body
if isinstance(body, bytes):
body = json.loads(body.decode("utf-8"))
logger.warning(
"ManagementServer._build_health_response result: status_code=%s payload=%s",
response.status_code,
body,
)
return response.status_code, body
return _run()
async def start(self) -> None:
try:
await self._runner.start(self._app)
logger.warning("ManagementServer.start result: started")
except Exception: # noqa: BLE001
logger.exception("ManagementServer.start error")
raise
async def stop(self) -> None:
try:
await self._runner.stop()
logger.warning("ManagementServer.stop result: stopped")
except BaseException: # noqa: BLE001
logger.exception("ManagementServer.stop error")
raise
@property
def port(self) -> int:
result = self._runner.port
logger.warning("ManagementServer.port result: %s", result)
return result
HealthServer = ManagementServer
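
The status mapping in `_health_response` (200 only for `status == "ok"`, 503 for anything else, including a provider timeout) can be sketched without FastAPI. The names `health_status_code`, `ok_provider` and `slow_provider` are hypothetical and exist only for this illustration.

```python
import asyncio


async def health_status_code(provider, timeout: float) -> tuple[int, dict]:
    """Await the health provider with a timeout and map its payload to an HTTP status code."""
    try:
        payload = await asyncio.wait_for(provider(), timeout=timeout)
    except Exception as exc:  # a timeout raised by wait_for is handled here as well
        return 503, {"status": "unhealthy", "detail": str(exc) or type(exc).__name__}
    return (200 if payload.get("status") == "ok" else 503), payload


async def ok_provider() -> dict:
    return {"status": "ok"}


async def slow_provider() -> dict:
    await asyncio.sleep(1)  # longer than the timeout used in the example
    return {"status": "ok"}
```

For example, `asyncio.run(health_status_code(slow_provider, 0.05))` yields a 503 because the provider does not answer within the timeout, even though it would eventually report ok.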


@@ -0,0 +1,42 @@
"""Shared V2 types: health state, lifecycle state, and management server settings.
Used in core, management, and control for consistent contracts."""
from __future__ import annotations

from dataclasses import dataclass
from enum import Enum
from typing import Literal, TypedDict

HealthState = Literal["ok", "degraded", "unhealthy"]


class HealthPayload(TypedDict, total=False):
    status: HealthState
    detail: str
    state: str
    """Current lifecycle state (idle/starting/running/stopping/stopped)."""


class LifecycleState(str, Enum):
    IDLE = "idle"
    STARTING = "starting"
    RUNNING = "running"
    STOPPING = "stopping"
    STOPPED = "stopped"


@dataclass
class ManagementServerSettings:
    """Settings for the management HTTP server and healthcheck (one object for both)."""

    enabled: bool = False
    host: str = "0.0.0.0"
    port: int = 8000
    timeout: int = 3
    """Timeout for a health request (seconds)."""
    health_timeout: int = 30
    """Seconds without a successful execute() after which health becomes unhealthy."""


# Backward-compatible alias.
HealthServerSettings = ManagementServerSettings


@@ -9,8 +9,7 @@ log:
formatters:
standard:
format: '%(asctime)s %(module)15s [%(levelname)8s]: %(message)s'
telegram:
format: '%(message)s'
handlers:
console:
@@ -26,28 +25,20 @@ log:
filename: logs/log.log
mode: a
maxBytes: 500000
backupCount: 15
#telegram:
# level: CRITICAL
# formatter: telegram
# class: logging_telegram_handler.TelegramHandler
# chat_id: 211945135
# alias: "PDC"
backupCount: 3
# -- Loggers --
loggers:
'':
handlers: [console, file]
level: INFO
propagate: False
root:
handlers: [console, file]
level: INFO
loggers:
__main__:
handlers: [console, file]
level: DEBUG
propagate: False
config_manager:
config_manager.src.config_manager.v2.manager:
handlers: [console, file]
level: DEBUG
level: DEBUG


@@ -1,34 +1,59 @@
#import os
#os.chdir(os.path.dirname(__file__))
# import os
# os.chdir(os.path.dirname(__file__))
import asyncio
import logging
from pathlib import Path
from config_manager import ConfigManager
import logging
import asyncio
from typing import Optional
from config_manager.v1.log_manager import LogManager
from config_manager.v2 import ManagementServerSettings
logger = logging.getLogger()
# Health timeout: if no execute() succeeds for longer than this, the status is unhealthy.
HEALTH_TIMEOUT = 3.0
class MyApp(ConfigManager):
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.iter = 0
def execute(self) -> None:
logger.info(f"current iteration {self.iter}")
"""A successful run resets the health timer (updates the last-success time)."""
#logger.critical("current iteration %s", self.iter)
#logger.error("current iteration %s", self.iter)
logger.warning("current iteration %s", self.iter)
#logger.info("current iteration %s", self.iter)
#logger.debug("current iteration %s", self.iter)
self.iter += 1
async def main():
app = MyApp("config.yaml")
logger.info("App started")
await app.start()
logger.info("App finished")
async def main() -> None:
log_manager = LogManager()
# One object configures both the HTTP management server (enabled, port) and health (health_timeout).
management_settings = ManagementServerSettings(
enabled=True,
port=8000,
health_timeout=HEALTH_TIMEOUT,
)
config_path = Path(__file__).parent / "config.yaml"
print(config_path)
app = MyApp(
str(config_path),
log_manager=log_manager,
management_settings=management_settings,
)
logger.info("App starting (health_timeout=%s)", HEALTH_TIMEOUT)
# Run the manager in the background (start() does not return until stop).
asyncio.create_task(app.start())
logger.info("App running; Ctrl+C to stop")
while True:
await asyncio.sleep(1)
if __name__ == "__main__":
logging.basicConfig(level=logging.INFO)
asyncio.run(main())


@@ -0,0 +1,36 @@
import asyncio
from config_manager.v2 import ConfigManagerV2, ManagementServerSettings
class ReloadApp(ConfigManagerV2):
DEFAULT_UPDATE_INTERVAL = 0.05
DEFAULT_WORK_INTERVAL = 0.2
def execute(self) -> None:
return
def test_invalid_config_keeps_last_valid(tmp_path):
async def scenario() -> None:
cfg = tmp_path / "config.yaml"
cfg.write_text("log: {}\n", encoding="utf-8")
app = ReloadApp(str(cfg), management_settings=ManagementServerSettings(enabled=False))
runner = asyncio.create_task(app.start())
await asyncio.sleep(0.12)
assert app.work_interval == 0.2
assert app.update_interval == 0.05
cfg.write_text("work_interval: [broken\n", encoding="utf-8")
await asyncio.sleep(0.15)
assert app.work_interval == 0.2
assert app.update_interval == 0.05
assert isinstance(app.config, dict)
await app.stop()
await runner
asyncio.run(scenario())


@@ -0,0 +1,33 @@
import asyncio
from config_manager.v2 import ConfigManagerV2, ManagementServerSettings
class DemoApp(ConfigManagerV2):
DEFAULT_UPDATE_INTERVAL = 0.05
DEFAULT_WORK_INTERVAL = 0.05
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.calls = 0
def execute(self) -> None:
self.calls += 1
def test_execute_loop_runs(tmp_path):
async def scenario() -> None:
cfg = tmp_path / "config.yaml"
cfg.write_text("log: {}\n", encoding="utf-8")
app = DemoApp(str(cfg), management_settings=ManagementServerSettings(enabled=False))
runner = asyncio.create_task(app.start())
await asyncio.sleep(0.18)
await app.stop()
await runner
assert app.calls >= 2
assert isinstance(app.config, dict)
asyncio.run(scenario())


@@ -0,0 +1,61 @@
import asyncio
from config_manager.v2 import ConfigManagerV2, ManagementServerSettings
from config_manager.v2.control.base import ControlChannel, StartHandler, StatusHandler, StopHandler
class DummyControlChannel(ControlChannel):
def __init__(self):
self.on_start: StartHandler | None = None
self.on_stop: StopHandler | None = None
self.on_status: StatusHandler | None = None
self.started = False
self.stopped = False
async def start(self, on_start: StartHandler, on_stop: StopHandler, on_status: StatusHandler) -> None:
self.on_start = on_start
self.on_stop = on_stop
self.on_status = on_status
self.started = True
async def stop(self) -> None:
self.stopped = True
class ControlledApp(ConfigManagerV2):
DEFAULT_UPDATE_INTERVAL = 0.05
DEFAULT_WORK_INTERVAL = 0.05
def execute(self) -> None:
return
def test_control_channel_can_stop_manager(tmp_path):
async def scenario() -> None:
cfg = tmp_path / "config.yaml"
cfg.write_text("log: {}\n", encoding="utf-8")
channel = DummyControlChannel()
app = ControlledApp(
str(cfg),
control_channel=channel,
management_settings=ManagementServerSettings(enabled=False),
)
runner = asyncio.create_task(app.start())
await asyncio.sleep(0.12)
assert channel.started is True
assert channel.on_status is not None
assert channel.on_stop is not None
status_text = await channel.on_status()
assert "state=running" in status_text
stop_text = await channel.on_stop()
assert "stop signal accepted" in stop_text
await runner
# On shutdown the manager does not call control_channel.stop() (the channel stays available).
asyncio.run(scenario())


@@ -0,0 +1,100 @@
import asyncio
import json
from config_manager.v2.management import ManagementServer
def test_health_mapping_ok_to_200():
async def provider():
return {"status": "ok"}
async def scenario() -> None:
server = ManagementServer(
host="127.0.0.1",
port=8000,
timeout=0.2,
health_provider=provider,
)
code, payload = await server._build_health_response()
assert code == 200
assert payload["status"] == "ok"
asyncio.run(scenario())
def test_health_mapping_unhealthy_to_503():
async def provider():
return {"status": "unhealthy", "detail": "worker failed"}
async def scenario() -> None:
server = ManagementServer(
host="127.0.0.1",
port=8000,
timeout=0.2,
health_provider=provider,
)
code, payload = await server._build_health_response()
assert code == 503
assert payload["status"] == "unhealthy"
asyncio.run(scenario())
def test_action_routes_call_callbacks():
events: list[str] = []
async def provider():
return {"status": "ok"}
async def on_start() -> str:
events.append("start")
return "start accepted"
async def on_stop() -> str:
events.append("stop")
return "stop accepted"
async def request(port: int, path: str) -> tuple[int, dict[str, str]]:
reader, writer = await asyncio.open_connection("127.0.0.1", port)
writer.write(
f"GET {path} HTTP/1.1\r\nHost: 127.0.0.1\r\nConnection: close\r\n\r\n".encode("utf-8")
)
await writer.drain()
raw = await reader.read()
writer.close()
await writer.wait_closed()
header, body = raw.split(b"\r\n\r\n", maxsplit=1)
status_code = int(header.split(b" ")[1])
payload = json.loads(body.decode("utf-8"))
return status_code, payload
async def scenario() -> None:
server = ManagementServer(
host="127.0.0.1",
port=0,
timeout=0.2,
health_provider=provider,
on_start=on_start,
on_stop=on_stop,
)
await server.start()
try:
port = server.port
assert port > 0
start_code, start_payload = await request(port, "/actions/start")
stop_code, stop_payload = await request(port, "/actions/stop")
finally:
await server.stop()
assert start_code == 200
assert start_payload["status"] == "ok"
assert start_payload["detail"] == "start accepted"
assert stop_code == 200
assert stop_payload["status"] == "ok"
assert stop_payload["detail"] == "stop accepted"
assert events == ["start", "stop"]
asyncio.run(scenario())


@@ -0,0 +1,46 @@
import asyncio
import threading
import time
from config_manager.v2 import ConfigManagerV2, ManagementServerSettings
class BlockingApp(ConfigManagerV2):
DEFAULT_UPDATE_INTERVAL = 0.05
DEFAULT_WORK_INTERVAL = 0.05
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.started_event = threading.Event()
self.active_event = threading.Event()
self.calls = 0
def execute(self) -> None:
self.calls += 1
self.active_event.set()
self.started_event.set()
time.sleep(0.2)
self.active_event.clear()
def test_stop_waits_for_active_execute_and_prevents_next_run(tmp_path):
async def scenario() -> None:
cfg = tmp_path / "config.yaml"
cfg.write_text("log: {}\n", encoding="utf-8")
app = BlockingApp(str(cfg), management_settings=ManagementServerSettings(enabled=False))
runner = asyncio.create_task(app.start())
started = await asyncio.to_thread(app.started_event.wait, 1.0)
assert started is True
await app.stop()
await runner
assert app.active_event.is_set() is False
assert app.calls == 1
await asyncio.sleep(0.15)
assert app.calls == 1
asyncio.run(scenario())