From da8ed4fa2b68991d1d3f2eadb7404efb453da275 Mon Sep 17 00:00:00 2001 From: zosimovaa Date: Wed, 18 Feb 2026 23:11:53 +0300 Subject: [PATCH 1/2] docs: add healthcheck requirements and README_DEPLOY - Add docs/HEALTHCHECK_REQUIREMENTS.md with full spec (purpose, deploy.sh behaviour, endpoint contract, get_health_status(), app requirements, infrastructure) - Add README_DEPLOY.md with healthcheck section, link to requirements and HEALTHCHECK_* env vars Co-authored-by: Cursor --- README_DEPLOY.md | 29 ++++++++++ docs/HEALTHCHECK_REQUIREMENTS.md | 95 ++++++++++++++++++++++++++++++++ 2 files changed, 124 insertions(+) create mode 100644 README_DEPLOY.md create mode 100644 docs/HEALTHCHECK_REQUIREMENTS.md diff --git a/README_DEPLOY.md b/README_DEPLOY.md new file mode 100644 index 0000000..21bcad4 --- /dev/null +++ b/README_DEPLOY.md @@ -0,0 +1,29 @@ +# Деплой / Deploy + +Краткое описание процесса деплоя приложений на базе config_manager и используемых скриптов. + +A short description of the deploy process for applications based on config_manager and the scripts used. + +--- + +## Healthcheck + +После поднятия контейнеров (`docker compose up -d`) скрипт деплоя может ожидать успешного ответа по URL проверки здоровья приложения. При таймауте выполняется откат и выход с ошибкой. + +After bringing up containers (`docker compose up -d`), the deploy script may wait for a successful response at the application health-check URL. On timeout, rollback is performed and the script exits with an error. + +**Полная спецификация:** [docs/HEALTHCHECK_REQUIREMENTS.md](docs/HEALTHCHECK_REQUIREMENTS.md). + +**Full specification:** [docs/HEALTHCHECK_REQUIREMENTS.md](docs/HEALTHCHECK_REQUIREMENTS.md). 
+ +### Переменные окружения / Environment variables + +| Переменная | Назначение | Пример/дефолт | +| ----------------------- | ---------------------------------------- | ------------------------------- | +| `HEALTHCHECK_URL` | URL для проверки здоровья | `http://127.0.0.1:8000/health` | +| `HEALTHCHECK_TIMEOUT` | Максимальное время ожидания (секунды) | `120` | +| `HEALTHCHECK_INTERVAL` | Интервал между попытками (секунды) | `5` | + +Если задана `HEALTHCHECK_URL`, деплой после поднятия контейнеров вызывает этот URL (например, через `curl -fsS --max-time 5`); успех — HTTP 2xx, иначе повтор до истечения `HEALTHCHECK_TIMEOUT`. + +If `HEALTHCHECK_URL` is set, deploy calls this URL after bringing up containers (e.g. via `curl -fsS --max-time 5`); success is HTTP 2xx, otherwise retries until `HEALTHCHECK_TIMEOUT` expires. diff --git a/docs/HEALTHCHECK_REQUIREMENTS.md b/docs/HEALTHCHECK_REQUIREMENTS.md new file mode 100644 index 0000000..7cf796b --- /dev/null +++ b/docs/HEALTHCHECK_REQUIREMENTS.md @@ -0,0 +1,95 @@ +# Требования к healthcheck / Healthcheck Requirements + +Единая спецификация для реализации healthcheck в config_manager и в приложениях (в т.ч. MailOrderBot). + +A unified specification for implementing healthcheck in config_manager and in applications (including MailOrderBot). + +--- + +## 1. Назначение / Purpose + +- Healthcheck используется скриптом деплоя (`deploy.sh`): после `docker compose up -d` деплой ждёт успешного ответа по `HEALTHCHECK_URL`; при таймауте — откат и выход с ошибкой. +- The healthcheck is used by the deploy script (`deploy.sh`): after `docker compose up -d`, the deploy waits for a successful response at `HEALTHCHECK_URL`; on timeout — rollback and exit with error. + +- Эндпоинт должен отражать **реальное состояние приложения**, а не только факт работы HTTP-сервера (иначе деплой может считать успешным запуск «зависшего» или упавшего воркера). 
+- The endpoint must reflect the **actual state of the application**, not just that the HTTP server is running (otherwise deploy may treat the start of a «hung» or crashed worker as successful). + +--- + +## 2. Поведение deploy.sh / deploy.sh behaviour + +(Логика уже может быть реализована в коде deploy-скрипта.) + +- Если задана переменная `HEALTHCHECK_URL` — после поднятия контейнеров вызывается `wait_for_healthcheck`: цикл с интервалом `HEALTHCHECK_INTERVAL` (по умолчанию 5 с), пока не истечёт `HEALTHCHECK_TIMEOUT` (по умолчанию 120 с). +- If `HEALTHCHECK_URL` is set — after bringing up containers, `wait_for_healthcheck` is called: a loop with interval `HEALTHCHECK_INTERVAL` (default 5 s) until `HEALTHCHECK_TIMEOUT` (default 120 s) expires. + +- Проверка: `curl -fsS --max-time 5 "$HEALTHCHECK_URL"`. Флаг `-f`: любой HTTP-код 4xx/5xx считается ошибкой (повтор до таймаута). +- Check: `curl -fsS --max-time 5 "$HEALTHCHECK_URL"`. Flag `-f`: any HTTP 4xx/5xx is treated as failure (retry until timeout). + +- **Успех:** HTTP 2xx. **Неуспех:** не 2xx или таймаут curl/соединения → деплой падает с откатом. +- **Success:** HTTP 2xx. **Failure:** non-2xx or curl/connection timeout → deploy fails with rollback. + +### Переменные окружения / Environment variables + +| Переменная / Variable | Назначение / Purpose | Пример/дефолт / Example/default | +| ------------------------- | --------------------------------- | ----------------------------------- | +| `HEALTHCHECK_URL` | URL для проверки / Check URL | `http://127.0.0.1:8000/health` | +| `HEALTHCHECK_TIMEOUT` | Макс. время ожидания (сек) / Max wait (s) | `120` | +| `HEALTHCHECK_INTERVAL` | Интервал между попытками (сек) / Interval between attempts (s) | `5` | + +--- + +## 3. Контракт эндпоинта / Endpoint contract + +- **Метод и путь / Method and path:** `GET /health` (или иной путь по соглашению; один для всех приложений на config_manager). 
+- **Method and path:** `GET /health` (or another agreed path; one for all applications using config_manager). + +- **Успех (приложение в порядке) / Success (application healthy):** HTTP **200**, опционально тело JSON: `{"status": "ok"}`. +- **Success (application healthy):** HTTP **200**, optional JSON body: `{"status": "ok"}`. + +- **Приложение не в порядке / Application not healthy:** HTTP **503**, опционально тело: `{"status": "unhealthy"|"degraded", "detail": "причина"}`. +- **Application not healthy:** HTTP **503**, optional body: `{"status": "unhealthy"|"degraded", "detail": "reason"}`. + +- Ответ должен приходить в разумное время (рекомендуемый таймаут вызова логики проверки в config_manager — 2–5 с), иначе deploy через `curl --max-time 5` получит таймаут и будет повторять запросы. +- The response must arrive within a reasonable time (recommended timeout for the check logic in config_manager — 2–5 s); otherwise deploy will get a timeout via `curl --max-time 5` and will retry. + +--- + +## 4. Обратная связь от приложения (config_manager) / Application feedback (config_manager) + +- Эндпоинт реализуется в **config_manager** (опционально, при включённой опции). +- The endpoint is implemented in **config_manager** (optional, when the option is enabled). + +- При обработке `GET /health` config_manager **не решает сам** «здорово ли приложение», а вызывает метод приложения, например: `app.get_health_status()`. +- When handling `GET /health`, config_manager does **not** decide by itself whether the application is healthy; it calls the application method, e.g. `app.get_health_status()`. + +- **Контракт метода / Method contract** (приложение переопределяет в наследнике): + - **Method contract** (application overrides in subclass): + - Возвращает `dict`: `{"status": "ok" | "degraded" | "unhealthy", "detail": "..."}` (поле `detail` опционально). + - Returns `dict`: `{"status": "ok" | "degraded" | "unhealthy", "detail": "..."}` (`detail` optional). 
+ - При исключении или превышении таймаута вызова — считать состояние unhealthy и отдавать 503. + - On exception or call timeout — treat as unhealthy and return 503. + +Таким образом, основное приложение «даёт обратную связь» через реализацию `get_health_status()` (флаги после сбоев, проверка БД, heartbeat и т.д.). + +Thus, the main application provides feedback via `get_health_status()` (flags after failures, DB check, heartbeat, etc.). + +--- + +## 5. Требования к приложению (например, MailOrderBot) / Application requirements (e.g. MailOrderBot) + +- Переопределить `get_health_status()` и возвращать: +- Override `get_health_status()` and return: + - `{"status": "ok"}` при нормальной работе; + - `{"status": "ok"}` when running normally; + - `{"status": "unhealthy", "detail": "..."}` при критичном сбое (например, последний `execute()` упал, зависимость недоступна); + - `{"status": "unhealthy", "detail": "..."}` on critical failure (e.g. last `execute()` failed, dependency unavailable); + - при желании — `{"status": "degraded", "detail": "..."}` для нефатальной деградации (в обоих случаях эндпоинт отдаёт 503 для совместимости с `curl -f`). + - optionally — `{"status": "degraded", "detail": "..."}` for non-fatal degradation (in both cases the endpoint returns 503 for compatibility with `curl -f`). + +--- + +## 6. Инфраструктура / Infrastructure + +- Для работы healthcheck приложение должно поднимать HTTP-сервер (в config_manager при включённой опции) и пробрасывать порт в `docker-compose` (например `8000:8000`), чтобы `deploy.sh` на хосте мог обращаться по `HEALTHCHECK_URL` (например `http://127.0.0.1:8000/health`). +- For healthcheck to work, the application must run an HTTP server (in config_manager when the option is enabled) and expose the port in `docker-compose` (e.g. `8000:8000`), so that `deploy.sh` on the host can call `HEALTHCHECK_URL` (e.g. `http://127.0.0.1:8000/health`). 
From 7eb3476b96532ab260bce87bac65155a1a9e69ab Mon Sep 17 00:00:00 2001 From: zosimovaa Date: Thu, 19 Feb 2026 22:45:06 +0300 Subject: [PATCH 2/2] ConfigManager V2 --- src/config_manager/__init__.py | 5 +- src/config_manager/{ => v1}/cfg_manager.py | 0 src/config_manager/{ => v1}/log_manager.py | 0 src/config_manager/v2/__init__.py | 4 + src/config_manager/v2/config_loader.py | 52 +++++ src/config_manager/v2/control/__init__.py | 4 + src/config_manager/v2/control/base.py | 21 ++ src/config_manager/v2/control/telegram.py | 111 +++++++++ src/config_manager/v2/health.py | 80 +++++++ src/config_manager/v2/manager.py | 256 +++++++++++++++++++++ src/config_manager/v2/scheduler.py | 39 ++++ src/config_manager/v2/types.py | 30 +++ tests/v2/test_config_reload_fallback.py | 33 +++ tests/v2/test_contract_v2.py | 30 +++ tests/v2/test_control_channel.py | 54 +++++ tests/v2/test_health_endpoint.py | 41 ++++ tests/v2/test_stop_graceful.py | 43 ++++ 17 files changed, 801 insertions(+), 2 deletions(-) rename src/config_manager/{ => v1}/cfg_manager.py (100%) rename src/config_manager/{ => v1}/log_manager.py (100%) create mode 100644 src/config_manager/v2/__init__.py create mode 100644 src/config_manager/v2/config_loader.py create mode 100644 src/config_manager/v2/control/__init__.py create mode 100644 src/config_manager/v2/control/base.py create mode 100644 src/config_manager/v2/control/telegram.py create mode 100644 src/config_manager/v2/health.py create mode 100644 src/config_manager/v2/manager.py create mode 100644 src/config_manager/v2/scheduler.py create mode 100644 src/config_manager/v2/types.py create mode 100644 tests/v2/test_config_reload_fallback.py create mode 100644 tests/v2/test_contract_v2.py create mode 100644 tests/v2/test_control_channel.py create mode 100644 tests/v2/test_health_endpoint.py create mode 100644 tests/v2/test_stop_graceful.py diff --git a/src/config_manager/__init__.py b/src/config_manager/__init__.py index d09f224..99a4596 100644 --- 
a/src/config_manager/__init__.py +++ b/src/config_manager/__init__.py @@ -1,2 +1,3 @@ -from .cfg_manager import ConfigManager -from .log_manager import LogManager \ No newline at end of file +from .v2.manager import ConfigManagerV2 as ConfigManager +from .v1.cfg_manager import ConfigManager as LegacyConfigManager +from .v1.log_manager import LogManager \ No newline at end of file diff --git a/src/config_manager/cfg_manager.py b/src/config_manager/v1/cfg_manager.py similarity index 100% rename from src/config_manager/cfg_manager.py rename to src/config_manager/v1/cfg_manager.py diff --git a/src/config_manager/log_manager.py b/src/config_manager/v1/log_manager.py similarity index 100% rename from src/config_manager/log_manager.py rename to src/config_manager/v1/log_manager.py diff --git a/src/config_manager/v2/__init__.py b/src/config_manager/v2/__init__.py new file mode 100644 index 0000000..add8ff7 --- /dev/null +++ b/src/config_manager/v2/__init__.py @@ -0,0 +1,4 @@ +from .manager import ConfigManagerV2 +from .types import HealthServerSettings + +__all__ = ["ConfigManagerV2", "HealthServerSettings"] diff --git a/src/config_manager/v2/config_loader.py b/src/config_manager/v2/config_loader.py new file mode 100644 index 0000000..2800b0b --- /dev/null +++ b/src/config_manager/v2/config_loader.py @@ -0,0 +1,52 @@ +from __future__ import annotations + +import asyncio +import hashlib +import json +import os +from typing import Any, Optional + +import yaml + + +class ConfigLoader: + def __init__(self, path: str): + """Initialize loader state for a specific config file path.""" + self.path = path + self.config: Any = None + self.last_valid_config: Any = None + self._last_seen_hash: Optional[str] = None + + def _read_file_sync(self) -> str: + """Read raw config text from disk synchronously.""" + with open(self.path, "r", encoding="utf-8") as fh: + return fh.read() + + async def read_file_async(self) -> str: + """Read raw config text from disk in a worker thread.""" + return 
await asyncio.to_thread(self._read_file_sync) + + def parse_config(self, data: str) -> Any: + """Parse config text as YAML or JSON based on file extension.""" + extension = os.path.splitext(self.path)[1].lower() + if extension in (".yaml", ".yml"): + return yaml.safe_load(data) + return json.loads(data) + + @staticmethod + def _calculate_hash(data: str) -> str: + """Calculate a stable content hash for change detection.""" + return hashlib.sha256(data.encode("utf-8")).hexdigest() + + async def load_if_changed(self) -> tuple[bool, Any]: + """Load and parse config only when file content changed.""" + raw_data = await self.read_file_async() + current_hash = self._calculate_hash(raw_data) + if current_hash == self._last_seen_hash: + return False, self.config + + self._last_seen_hash = current_hash + parsed = self.parse_config(raw_data) + self.config = parsed + self.last_valid_config = parsed + return True, parsed diff --git a/src/config_manager/v2/control/__init__.py b/src/config_manager/v2/control/__init__.py new file mode 100644 index 0000000..084e95f --- /dev/null +++ b/src/config_manager/v2/control/__init__.py @@ -0,0 +1,4 @@ +from .base import ControlChannel +from .telegram import TelegramControlChannel + +__all__ = ["ControlChannel", "TelegramControlChannel"] diff --git a/src/config_manager/v2/control/base.py b/src/config_manager/v2/control/base.py new file mode 100644 index 0000000..37b4f87 --- /dev/null +++ b/src/config_manager/v2/control/base.py @@ -0,0 +1,21 @@ +from __future__ import annotations + +from abc import ABC, abstractmethod +from collections.abc import Awaitable, Callable + + +StartHandler = Callable[[], Awaitable[str]] +StopHandler = Callable[[], Awaitable[str]] +StatusHandler = Callable[[], Awaitable[str]] + + +class ControlChannel(ABC): + @abstractmethod + async def start(self, on_start: StartHandler, on_stop: StopHandler, on_status: StatusHandler) -> None: + """Start channel and bind command handlers.""" + raise NotImplementedError + + 
@abstractmethod + async def stop(self) -> None: + """Stop channel and release its resources.""" + raise NotImplementedError diff --git a/src/config_manager/v2/control/telegram.py b/src/config_manager/v2/control/telegram.py new file mode 100644 index 0000000..a47c0d5 --- /dev/null +++ b/src/config_manager/v2/control/telegram.py @@ -0,0 +1,111 @@ +from __future__ import annotations + +import asyncio +import json +import logging +import urllib.parse +import urllib.request +from typing import Optional + +from .base import ControlChannel, StartHandler, StatusHandler, StopHandler + + +class TelegramControlChannel(ControlChannel): + def __init__( + self, + token: str, + chat_id: int, + poll_interval: float = 2.0, + logger: Optional[logging.Logger] = None, + ): + """Initialize Telegram polling channel with bot and chat settings.""" + self._token = token + self._chat_id = chat_id + self._poll_interval = poll_interval + self._offset: Optional[int] = None + self._task: Optional[asyncio.Task] = None + self._stop_event = asyncio.Event() + self._on_start: Optional[StartHandler] = None + self._on_stop: Optional[StopHandler] = None + self._on_status: Optional[StatusHandler] = None + self._logger = logger or logging.getLogger(__name__) + + async def start(self, on_start: StartHandler, on_stop: StopHandler, on_status: StatusHandler) -> None: + """Start polling Telegram updates and register command callbacks.""" + if self._task is not None and not self._task.done(): + return + self._on_start = on_start + self._on_stop = on_stop + self._on_status = on_status + self._stop_event.clear() + self._task = asyncio.create_task(self._poll_loop()) + + async def stop(self) -> None: + """Stop polling loop and wait until task termination.""" + self._stop_event.set() + if self._task is not None: + self._task.cancel() + try: + await self._task + except asyncio.CancelledError: + pass + self._task = None + + async def _poll_loop(self) -> None: + """Continuously fetch updates and dispatch supported 
commands.""" + while not self._stop_event.is_set(): + try: + updates = await asyncio.to_thread(self._fetch_updates) + for update in updates: + await self._process_update(update) + except Exception as exc: # noqa: BLE001 + self._logger.warning("Telegram polling error: %s", exc) + + try: + await asyncio.wait_for(self._stop_event.wait(), timeout=max(self._poll_interval, 0.1)) + except asyncio.TimeoutError: + continue + + def _fetch_updates(self) -> list[dict]: + """Pull new Telegram updates using the latest offset.""" + params = {"timeout": 0} + if self._offset is not None: + params["offset"] = self._offset + query = urllib.parse.urlencode(params) + url = f"https://api.telegram.org/bot{self._token}/getUpdates?{query}" + with urllib.request.urlopen(url, timeout=10) as response: + payload = json.loads(response.read().decode("utf-8")) + + result = payload.get("result", []) + if result: + self._offset = max(item["update_id"] for item in result) + 1 + return result + + async def _process_update(self, update: dict) -> None: + """Handle one Telegram update and execute mapped command.""" + message = update.get("message") or {} + text = (message.get("text") or "").strip().lower() + chat = message.get("chat") or {} + chat_id = chat.get("id") + + if chat_id != self._chat_id: + return + + if text in {"/start", "/run"} and self._on_start is not None: + reply = await self._on_start() + elif text in {"/stop", "/halt"} and self._on_stop is not None: + reply = await self._on_stop() + elif text in {"/status", "/health"} and self._on_status is not None: + reply = await self._on_status() + else: + return + + await asyncio.to_thread(self._send_message, reply) + + def _send_message(self, text: str) -> None: + """Send plain-text reply to the configured Telegram chat.""" + encoded = urllib.parse.urlencode({"chat_id": self._chat_id, "text": text}) + url = f"https://api.telegram.org/bot{self._token}/sendMessage" + req = urllib.request.Request(url, data=encoded.encode("utf-8"), method="POST") + 
with urllib.request.urlopen(req, timeout=10): + return diff --git a/src/config_manager/v2/health.py b/src/config_manager/v2/health.py new file mode 100644 index 0000000..bb58241 --- /dev/null +++ b/src/config_manager/v2/health.py @@ -0,0 +1,80 @@ +from __future__ import annotations + +import asyncio +import json +from collections.abc import Awaitable, Callable +from typing import Optional + +from .types import HealthPayload + + +class HealthServer: + def __init__( + self, + host: str, + port: int, + path: str, + timeout: float, + health_provider: Callable[[], Awaitable[HealthPayload]], + ): + """Configure lightweight HTTP health server parameters and callback.""" + self._host = host + self._port = port + self._path = path + self._timeout = timeout + self._health_provider = health_provider + self._server: Optional[asyncio.base_events.Server] = None + + async def start(self) -> None: + """Start listening for healthcheck requests if not running.""" + if self._server is not None: + return + self._server = await asyncio.start_server(self._handle_connection, self._host, self._port) + + async def stop(self) -> None: + """Stop the health server and release the listening socket.""" + if self._server is None: + return + self._server.close() + await self._server.wait_closed() + self._server = None + + async def _handle_connection(self, reader: asyncio.StreamReader, writer: asyncio.StreamWriter) -> None: + """Process one HTTP connection and return JSON health payload.""" + status_code = 404 + payload: HealthPayload = {"status": "unhealthy", "detail": "not found"} + + try: + request_line = await reader.readline() + parts = request_line.decode("utf-8", errors="ignore").strip().split(" ") + if len(parts) >= 2: + method, path = parts[0], parts[1] + if method == "GET" and path == self._path: + status_code, payload = await self._build_health_response() + except Exception: # noqa: BLE001 + status_code = 500 + payload = {"status": "unhealthy", "detail": "internal error"} + + body = 
json.dumps(payload).encode("utf-8") + phrase = "OK" if status_code == 200 else "SERVICE UNAVAILABLE" if status_code == 503 else "NOT FOUND" + response = ( + f"HTTP/1.1 {status_code} {phrase}\r\n" + "Content-Type: application/json\r\n" + f"Content-Length: {len(body)}\r\n" + "Connection: close\r\n" + "\r\n" + ).encode("utf-8") + body + + writer.write(response) + await writer.drain() + writer.close() + await writer.wait_closed() + + async def _build_health_response(self) -> tuple[int, HealthPayload]: + """Build HTTP status code and body from application health callback.""" + try: + payload = await asyncio.wait_for(self._health_provider(), timeout=self._timeout) + status = payload.get("status", "unhealthy") + return (200, payload) if status == "ok" else (503, payload) + except Exception as exc: # noqa: BLE001 + return 503, {"status": "unhealthy", "detail": str(exc)} diff --git a/src/config_manager/v2/manager.py b/src/config_manager/v2/manager.py new file mode 100644 index 0000000..dbdb4a3 --- /dev/null +++ b/src/config_manager/v2/manager.py @@ -0,0 +1,256 @@ +from __future__ import annotations + +import asyncio +import logging +from collections.abc import Awaitable +from typing import Any, Optional + +from ..v1.log_manager import LogManager +from .config_loader import ConfigLoader +from .control.base import ControlChannel +from .health import HealthServer +from .scheduler import WorkerLoop +from .types import HealthPayload, HealthServerSettings, LifecycleState + + +class ConfigManagerV2: + DEFAULT_UPDATE_INTERVAL = 5.0 + DEFAULT_WORK_INTERVAL = 2.0 + + def __init__( + self, + path: str, + log_manager: Optional[LogManager] = None, + health_settings: Optional[HealthServerSettings] = None, + control_channel: Optional[ControlChannel] = None, + ): + """Initialize manager subsystems and runtime state.""" + self.path = path + self.config: Any = None + self.update_interval = self.DEFAULT_UPDATE_INTERVAL + self.work_interval = self.DEFAULT_WORK_INTERVAL + + self._loader = 
ConfigLoader(path) + self._log_manager = log_manager or LogManager() + self._control_channel = control_channel + + self._halt = asyncio.Event() + self._task: Optional[asyncio.Task] = None + self._loop: Optional[asyncio.AbstractEventLoop] = None + + self._state = LifecycleState.IDLE + self._last_execute_error: Optional[str] = None + + self._health_settings = health_settings or HealthServerSettings(enabled=False) + self._health_server: Optional[HealthServer] = None + if self._health_settings.enabled: + self._health_server = HealthServer( + host=self._health_settings.host, + port=self._health_settings.port, + path=self._health_settings.path, + timeout=self._health_settings.timeout, + health_provider=self._collect_health, + ) + + self.logger = logging.getLogger(__name__) + + def _read_file_sync(self) -> str: + """Read config file synchronously via the shared loader.""" + return self._loader._read_file_sync() + + async def _read_file_async(self) -> str: + """Read config file asynchronously via a worker thread.""" + return await self._loader.read_file_async() + + def _parse_config(self, data: str) -> Any: + """Parse raw config text to a Python object.""" + return self._loader.parse_config(data) + + def _update_intervals_from_config(self) -> None: + """Refresh work and update intervals from current config.""" + if not isinstance(self.config, dict): + return + + upd = self.config.get("update_interval") + wrk = self.config.get("work_interval") + + if isinstance(upd, (int, float)) and upd > 0: + self.update_interval = float(upd) + else: + self.update_interval = self.DEFAULT_UPDATE_INTERVAL + + if isinstance(wrk, (int, float)) and wrk > 0: + self.work_interval = float(wrk) + else: + self.work_interval = self.DEFAULT_WORK_INTERVAL + + async def _update_config(self) -> None: + """Reload config and apply new settings when content changes.""" + try: + changed, new_config = await self._loader.load_if_changed() + if not changed: + return + + self.config = new_config + 
self._update_intervals_from_config() + if isinstance(new_config, dict): + self._log_manager.apply_config(new_config) + except Exception as exc: # noqa: BLE001 + # Keep current config untouched and continue with last valid settings. + self.logger.error("Error reading/parsing config file: %s", exc) + if self._loader.last_valid_config is not None: + self.config = self._loader.last_valid_config + self._update_intervals_from_config() + + def execute(self) -> None: + """Override in subclass to implement one unit of blocking work.""" + + def get_health_status(self) -> HealthPayload: + """Return application-specific health payload for /health.""" + return {"status": "ok"} + + async def _collect_health(self) -> HealthPayload: + """Aggregate lifecycle and app status into one health result.""" + if self._state not in {LifecycleState.RUNNING, LifecycleState.STOPPING}: + return {"status": "unhealthy", "detail": f"state={self._state.value}"} + + if self._last_execute_error is not None: + return {"status": "unhealthy", "detail": self._last_execute_error} + + result = self.get_health_status() + status = result.get("status", "unhealthy") + if status not in {"ok", "degraded", "unhealthy"}: + return {"status": "unhealthy", "detail": "invalid health status"} + return result + + def _on_execute_success(self) -> None: + """Clear the last execution error marker after successful run.""" + self._last_execute_error = None + + def _on_execute_error(self, exc: Exception) -> None: + """Store and log execution failure details for health reporting.""" + self._last_execute_error = str(exc) + self.logger.error("Execution error: %s", exc) + + async def _worker_loop(self) -> None: + """Run execute() repeatedly until shutdown is requested.""" + worker = WorkerLoop( + execute=self.execute, + get_interval=lambda: self.work_interval, + halt_event=self._halt, + on_error=self._on_execute_error, + on_success=self._on_execute_success, + ) + await worker.run() + + async def _periodic_update_loop(self) -> 
None: + """Periodically check config file for updates until shutdown.""" + while not self._halt.is_set(): + await self._update_config() + try: + await asyncio.wait_for(self._halt.wait(), timeout=max(self.update_interval, 0.05)) + except asyncio.TimeoutError: + continue + + async def _status_text(self) -> str: + """Build human-readable runtime status for control channels.""" + health = await self._collect_health() + detail = health.get("detail") + if detail: + return f"state={self._state.value}; health={health['status']}; detail={detail}" + return f"state={self._state.value}; health={health['status']}" + + async def _control_start(self) -> str: + """Handle external start command for control channels.""" + if self._state == LifecycleState.RUNNING: + return "already running" + self._halt.clear() + return "start signal accepted" + + async def _control_stop(self) -> str: + """Handle external stop command for control channels.""" + self._halt.set() + return "stop signal accepted" + + async def _control_status(self) -> str: + """Handle external status command for control channels.""" + return await self._status_text() + + async def _start_control_channel(self) -> None: + """Start configured control channel with bound command handlers.""" + if self._control_channel is None: + return + await self._control_channel.start(self._control_start, self._control_stop, self._control_status) + + async def _stop_control_channel(self) -> None: + """Stop configured control channel if it is active.""" + if self._control_channel is None: + return + await self._control_channel.stop() + + async def _run(self) -> None: + """Run manager lifecycle and coordinate all background tasks.""" + self._state = LifecycleState.STARTING + self._halt.clear() + await self._update_config() + + if self._health_server is not None: + await self._health_server.start() + await self._start_control_channel() + + self._state = LifecycleState.RUNNING + self.logger.info("ConfigManagerV2 started") + + tasks = [ + 
asyncio.create_task(self._worker_loop(), name="v2-worker-loop"), + asyncio.create_task(self._periodic_update_loop(), name="v2-config-loop"), + ] + + try: + await asyncio.gather(*tasks) + except asyncio.CancelledError: + raise + finally: + self._state = LifecycleState.STOPPING + self._halt.set() + for task in tasks: + task.cancel() + await asyncio.gather(*tasks, return_exceptions=True) + await self._stop_control_channel() + if self._health_server is not None: + await self._health_server.stop() + self._state = LifecycleState.STOPPED + self.logger.info("ConfigManagerV2 stopped") + + async def start(self) -> None: + """Start manager lifecycle from an active asyncio context.""" + if self._task is not None and not self._task.done(): + self.logger.warning("ConfigManagerV2 is already running") + return + + try: + self._loop = asyncio.get_running_loop() + except RuntimeError: + self.logger.error("start() must be called from within an async context") + raise + + self._task = asyncio.create_task(self._run(), name="config-manager-v2") + try: + await self._task + finally: + self._task = None + + async def stop(self) -> None: + """Request graceful shutdown and wait for manager completion.""" + if self._task is None: + self.logger.warning("ConfigManagerV2 is not running") + return + + self._halt.set() + if asyncio.current_task() is self._task: + return + + try: + await self._task + except asyncio.CancelledError: + pass diff --git a/src/config_manager/v2/scheduler.py b/src/config_manager/v2/scheduler.py new file mode 100644 index 0000000..21d0390 --- /dev/null +++ b/src/config_manager/v2/scheduler.py @@ -0,0 +1,39 @@ +from __future__ import annotations + +import asyncio +from collections.abc import Callable +from typing import Optional + + +class WorkerLoop: + def __init__( + self, + execute: Callable[[], None], + get_interval: Callable[[], float], + halt_event: asyncio.Event, + on_error: Optional[Callable[[Exception], None]] = None, + on_success: Optional[Callable[[], None]] = 
None, + ): + """Store callbacks and synchronization primitives for worker execution.""" + self._execute = execute + self._get_interval = get_interval + self._halt_event = halt_event + self._on_error = on_error + self._on_success = on_success + + async def run(self) -> None: + """Run execute repeatedly until halt is requested.""" + while not self._halt_event.is_set(): + try: + await asyncio.to_thread(self._execute) + if self._on_success is not None: + self._on_success() + except Exception as exc: # noqa: BLE001 + if self._on_error is not None: + self._on_error(exc) + + timeout = max(self._get_interval(), 0.01) + try: + await asyncio.wait_for(self._halt_event.wait(), timeout=timeout) + except asyncio.TimeoutError: + continue diff --git a/src/config_manager/v2/types.py b/src/config_manager/v2/types.py new file mode 100644 index 0000000..9cb2f0e --- /dev/null +++ b/src/config_manager/v2/types.py @@ -0,0 +1,30 @@ +from __future__ import annotations + +from dataclasses import dataclass +from enum import Enum +from typing import Literal, TypedDict + + +HealthState = Literal["ok", "degraded", "unhealthy"] + + +class HealthPayload(TypedDict, total=False): + status: HealthState + detail: str + + +class LifecycleState(str, Enum): + IDLE = "idle" + STARTING = "starting" + RUNNING = "running" + STOPPING = "stopping" + STOPPED = "stopped" + + +@dataclass +class HealthServerSettings: + enabled: bool = False + host: str = "0.0.0.0" + port: int = 8000 + path: str = "/health" + timeout: float = 3.0 diff --git a/tests/v2/test_config_reload_fallback.py b/tests/v2/test_config_reload_fallback.py new file mode 100644 index 0000000..3295eb3 --- /dev/null +++ b/tests/v2/test_config_reload_fallback.py @@ -0,0 +1,33 @@ +import asyncio + +from config_manager.v2 import ConfigManagerV2 + + +class ReloadApp(ConfigManagerV2): + def execute(self) -> None: + return + + +def test_invalid_config_keeps_last_valid(tmp_path): + async def scenario() -> None: + cfg = tmp_path / "config.yaml" + 
cfg.write_text("work_interval: 0.2\nupdate_interval: 0.05\n", encoding="utf-8") + + app = ReloadApp(str(cfg)) + runner = asyncio.create_task(app.start()) + + await asyncio.sleep(0.12) + assert app.work_interval == 0.2 + assert app.update_interval == 0.05 + + cfg.write_text("work_interval: [broken\n", encoding="utf-8") + await asyncio.sleep(0.15) + + assert app.work_interval == 0.2 + assert app.update_interval == 0.05 + assert isinstance(app.config, dict) + + await app.stop() + await runner + + asyncio.run(scenario()) diff --git a/tests/v2/test_contract_v2.py b/tests/v2/test_contract_v2.py new file mode 100644 index 0000000..e190fd1 --- /dev/null +++ b/tests/v2/test_contract_v2.py @@ -0,0 +1,30 @@ +import asyncio + +from config_manager.v2 import ConfigManagerV2 + + +class DemoApp(ConfigManagerV2): + def __init__(self, *args, **kwargs): + super().__init__(*args, **kwargs) + self.calls = 0 + + def execute(self) -> None: + self.calls += 1 + + +def test_execute_loop_runs(tmp_path): + async def scenario() -> None: + cfg = tmp_path / "config.yaml" + cfg.write_text("work_interval: 0.05\nupdate_interval: 0.05\n", encoding="utf-8") + + app = DemoApp(str(cfg)) + runner = asyncio.create_task(app.start()) + + await asyncio.sleep(0.18) + await app.stop() + await runner + + assert app.calls >= 2 + assert isinstance(app.config, dict) + + asyncio.run(scenario()) diff --git a/tests/v2/test_control_channel.py b/tests/v2/test_control_channel.py new file mode 100644 index 0000000..8a0e95b --- /dev/null +++ b/tests/v2/test_control_channel.py @@ -0,0 +1,54 @@ +import asyncio + +from config_manager.v2 import ConfigManagerV2 +from config_manager.v2.control.base import ControlChannel, StartHandler, StatusHandler, StopHandler + + +class DummyControlChannel(ControlChannel): + def __init__(self): + self.on_start: StartHandler | None = None + self.on_stop: StopHandler | None = None + self.on_status: StatusHandler | None = None + self.started = False + self.stopped = False + + async def 
start(self, on_start: StartHandler, on_stop: StopHandler, on_status: StatusHandler) -> None: + self.on_start = on_start + self.on_stop = on_stop + self.on_status = on_status + self.started = True + + async def stop(self) -> None: + self.stopped = True + + +class ControlledApp(ConfigManagerV2): + def execute(self) -> None: + return + + +def test_control_channel_can_stop_manager(tmp_path): + async def scenario() -> None: + cfg = tmp_path / "config.yaml" + cfg.write_text("work_interval: 0.05\nupdate_interval: 0.05\n", encoding="utf-8") + + channel = DummyControlChannel() + app = ControlledApp(str(cfg), control_channel=channel) + + runner = asyncio.create_task(app.start()) + await asyncio.sleep(0.12) + + assert channel.started is True + assert channel.on_status is not None + assert channel.on_stop is not None + + status_text = await channel.on_status() + assert "state=running" in status_text + + stop_text = await channel.on_stop() + assert "stop signal accepted" in stop_text + + await runner + assert channel.stopped is True + + asyncio.run(scenario()) diff --git a/tests/v2/test_health_endpoint.py b/tests/v2/test_health_endpoint.py new file mode 100644 index 0000000..0a3679e --- /dev/null +++ b/tests/v2/test_health_endpoint.py @@ -0,0 +1,41 @@ +import asyncio + +from config_manager.v2.health import HealthServer + + +def test_health_mapping_ok_to_200(): + async def provider(): + return {"status": "ok"} + + async def scenario() -> None: + server = HealthServer( + host="127.0.0.1", + port=8000, + path="/health", + timeout=0.2, + health_provider=provider, + ) + code, payload = await server._build_health_response() + assert code == 200 + assert payload["status"] == "ok" + + asyncio.run(scenario()) + + +def test_health_mapping_unhealthy_to_503(): + async def provider(): + return {"status": "unhealthy", "detail": "worker failed"} + + async def scenario() -> None: + server = HealthServer( + host="127.0.0.1", + port=8000, + path="/health", + timeout=0.2, + 
health_provider=provider, + ) + code, payload = await server._build_health_response() + assert code == 503 + assert payload["status"] == "unhealthy" + + asyncio.run(scenario()) diff --git a/tests/v2/test_stop_graceful.py b/tests/v2/test_stop_graceful.py new file mode 100644 index 0000000..0b60fe7 --- /dev/null +++ b/tests/v2/test_stop_graceful.py @@ -0,0 +1,43 @@ +import asyncio +import threading +import time + +from config_manager.v2 import ConfigManagerV2 + + +class BlockingApp(ConfigManagerV2): + def __init__(self, *args, **kwargs): + super().__init__(*args, **kwargs) + self.started_event = threading.Event() + self.active_event = threading.Event() + self.calls = 0 + + def execute(self) -> None: + self.calls += 1 + self.active_event.set() + self.started_event.set() + time.sleep(0.2) + self.active_event.clear() + + +def test_stop_waits_for_active_execute_and_prevents_next_run(tmp_path): + async def scenario() -> None: + cfg = tmp_path / "config.yaml" + cfg.write_text("work_interval: 0.05\nupdate_interval: 0.05\n", encoding="utf-8") + + app = BlockingApp(str(cfg)) + runner = asyncio.create_task(app.start()) + + started = await asyncio.to_thread(app.started_event.wait, 1.0) + assert started is True + + await app.stop() + await runner + + assert app.active_event.is_set() is False + assert app.calls == 1 + + await asyncio.sleep(0.15) + assert app.calls == 1 + + asyncio.run(scenario())