16 Commits

Author SHA1 Message Date
80dd69c5ec Version update 2026-02-19 22:50:51 +03:00
7eb3476b96 ConfigManager V2 2026-02-19 22:45:06 +03:00
da8ed4fa2b docs: add healthcheck requirements and README_DEPLOY
- Add docs/HEALTHCHECK_REQUIREMENTS.md with full spec (purpose, deploy.sh
  behaviour, endpoint contract, get_health_status(), app requirements,
  infrastructure)
- Add README_DEPLOY.md with healthcheck section, link to requirements
  and HEALTHCHECK_* env vars

Co-authored-by: Cursor <cursoragent@cursor.com>
2026-02-18 23:11:53 +03:00
8f22fcf6af method start no now blocks main cycle 2025-11-01 21:35:25 +03:00
311870fd73 Changed the algorithm of the start method 2025-11-01 21:00:14 +03:00
ffd758d9a4 ready for release 2025-11-01 08:31:57 +03:00
526661e498 no message 2025-10-31 23:18:55 +03:00
b0c87a427c Merge branch 'develop' 2025-10-31 23:16:35 +03:00
7b74e0b0b8 no message 2025-10-31 23:16:25 +03:00
5faea8f69f Merge branch 'develop' 2025-10-31 18:54:18 +03:00
f491c65455 Обновил структуру и отладил, все работает 2025-10-31 18:48:52 +03:00
4eb9327628 Незначительные правки 2025-10-30 13:00:53 +03:00
e71685aad9 increment version 2025-10-30 08:12:10 +03:00
98867d69a7 Update config manager to call it with less code 2025-10-30 08:08:12 +03:00
68f4b26f00 increment version 2025-10-30 08:02:44 +03:00
3b8e28e077 config example was added 2025-10-30 07:59:35 +03:00
27 changed files with 1116 additions and 215 deletions

6
.gitignore vendored
View File

@@ -1,3 +1,5 @@
src/config_manager/__pycache__/basic_application.cpython-312.pyc __pycache__
venv/ .venv/
.vscode/ .vscode/
log*.log
config_manager.egg-info

29
README_DEPLOY.md Normal file
View File

@@ -0,0 +1,29 @@
# Деплой / Deploy
Краткое описание процесса деплоя приложений на базе config_manager и используемых скриптов.
A short description of the deploy process for applications based on config_manager and the scripts used.
---
## Healthcheck
После поднятия контейнеров (`docker compose up -d`) скрипт деплоя может ожидать успешного ответа по URL проверки здоровья приложения. При таймауте выполняется откат и выход с ошибкой.
After bringing up containers (`docker compose up -d`), the deploy script may wait for a successful response at the application health-check URL. On timeout, rollback is performed and the script exits with an error.
**Полная спецификация:** [docs/HEALTHCHECK_REQUIREMENTS.md](docs/HEALTHCHECK_REQUIREMENTS.md).
**Full specification:** [docs/HEALTHCHECK_REQUIREMENTS.md](docs/HEALTHCHECK_REQUIREMENTS.md).
### Переменные окружения / Environment variables
| Переменная | Назначение | Пример/дефолт |
| ----------------------- | ---------------------------------------- | ------------------------------- |
| `HEALTHCHECK_URL` | URL для проверки здоровья | `http://127.0.0.1:8000/health` |
| `HEALTHCHECK_TIMEOUT` | Максимальное время ожидания (секунды) | `120` |
| `HEALTHCHECK_INTERVAL` | Интервал между попытками (секунды) | `5` |
Если задана `HEALTHCHECK_URL`, деплой после поднятия контейнеров вызывает этот URL (например, через `curl -fsS --max-time 5`); успех — HTTP 2xx, иначе повтор до истечения `HEALTHCHECK_TIMEOUT`.
If `HEALTHCHECK_URL` is set, deploy calls this URL after bringing up containers (e.g. via `curl -fsS --max-time 5`); success is HTTP 2xx, otherwise retries until `HEALTHCHECK_TIMEOUT` expires.

View File

@@ -0,0 +1,95 @@
# Требования к healthcheck / Healthcheck Requirements
Единая спецификация для реализации healthcheck в config_manager и в приложениях (в т.ч. MailOrderBot).
A unified specification for implementing healthcheck in config_manager and in applications (including MailOrderBot).
---
## 1. Назначение / Purpose
- Healthcheck используется скриптом деплоя (`deploy.sh`): после `docker compose up -d` деплой ждёт успешного ответа по `HEALTHCHECK_URL`; при таймауте — откат и выход с ошибкой.
- The healthcheck is used by the deploy script (`deploy.sh`): after `docker compose up -d`, the deploy waits for a successful response at `HEALTHCHECK_URL`; on timeout — rollback and exit with error.
- Эндпоинт должен отражать **реальное состояние приложения**, а не только факт работы HTTP-сервера (иначе деплой может считать успешным запуск «зависшего» или упавшего воркера).
- The endpoint must reflect the **actual state of the application**, not just that the HTTP server is running (otherwise deploy may consider successful a «hung» or crashed worker).
---
## 2. Поведение deploy.sh / deploy.sh behaviour
(Логика уже может быть реализована в коде deploy-скрипта.)
- Если задана переменная `HEALTHCHECK_URL` — после поднятия контейнеров вызывается `wait_for_healthcheck`: цикл с интервалом `HEALTHCHECK_INTERVAL` (по умолчанию 5 с), пока не истечёт `HEALTHCHECK_TIMEOUT` (по умолчанию 120 с).
- If `HEALTHCHECK_URL` is set — after bringing up containers, `wait_for_healthcheck` is called: a loop with interval `HEALTHCHECK_INTERVAL` (default 5 s) until `HEALTHCHECK_TIMEOUT` (default 120 s) expires.
- Проверка: `curl -fsS --max-time 5 "$HEALTHCHECK_URL"`. Флаг `-f`: любой HTTP-код 4xx/5xx считается ошибкой (повтор до таймаута).
- Check: `curl -fsS --max-time 5 "$HEALTHCHECK_URL"`. Flag `-f`: any HTTP 4xx/5xx is treated as failure (retry until timeout).
- **Успех:** HTTP 2xx. **Неуспех:** не 2xx или таймаут curl/соединения → деплой падает с откатом.
- **Success:** HTTP 2xx. **Failure:** non-2xx or curl/connection timeout → deploy fails with rollback.
### Переменные окружения / Environment variables
| Переменная / Variable | Назначение / Purpose | Пример/дефолт / Example/default |
| ------------------------- | --------------------------------- | ----------------------------------- |
| `HEALTHCHECK_URL` | URL для проверки / Check URL | `http://127.0.0.1:8000/health` |
| `HEALTHCHECK_TIMEOUT` | Макс. время ожидания (сек) / Max wait (s) | `120` |
| `HEALTHCHECK_INTERVAL` | Интервал между попытками (сек) / Interval between attempts (s) | `5` |
---
## 3. Контракт эндпоинта / Endpoint contract
- **Метод и путь / Method and path:** `GET /health` (или иной путь по соглашению; один для всех приложений на config_manager).
- **Method and path:** `GET /health` (or another agreed path; one for all applications using config_manager).
- **Успех (приложение в порядке) / Success (application healthy):** HTTP **200**, опционально тело JSON: `{"status": "ok"}`.
- **Success (application healthy):** HTTP **200**, optional JSON body: `{"status": "ok"}`.
- **Приложение не в порядке / Application not healthy:** HTTP **503**, опционально тело: `{"status": "unhealthy"|"degraded", "detail": "причина"}`.
- **Application not healthy:** HTTP **503**, optional body: `{"status": "unhealthy"|"degraded", "detail": "reason"}`.
- Ответ должен приходить в разумное время (рекомендуемый таймаут вызова логики проверки в config_manager — 25 с), иначе deploy через `curl --max-time 5` получит таймаут и будет повторять запросы.
- The response must arrive within a reasonable time (recommended timeout for the check logic in config_manager — 25 s); otherwise deploy will get a timeout via `curl --max-time 5` and will retry.
---
## 4. Обратная связь от приложения (config_manager) / Application feedback (config_manager)
- Эндпоинт реализуется в **config_manager** (опционально, при включённой опции).
- The endpoint is implemented in **config_manager** (optional, when the option is enabled).
- При обработке `GET /health` config_manager **не решает сам** «здорово ли приложение», а вызывает метод приложения, например: `app.get_health_status()`.
- When handling `GET /health`, config_manager does **not** decide by itself whether the application is healthy; it calls the application method, e.g. `app.get_health_status()`.
- **Контракт метода / Method contract** (приложение переопределяет в наследнике):
- **Method contract** (application overrides in subclass):
- Возвращает `dict`: `{"status": "ok" | "degraded" | "unhealthy", "detail": "..."}` (поле `detail` опционально).
- Returns `dict`: `{"status": "ok" | "degraded" | "unhealthy", "detail": "..."}` (`detail` optional).
- При исключении или превышении таймаута вызова — считать состояние unhealthy и отдавать 503.
- On exception or call timeout — treat as unhealthy and return 503.
Таким образом, основное приложение «даёт обратную связь» через реализацию `get_health_status()` (флаги после сбоев, проверка БД, heartbeat и т.д.).
Thus, the main application provides feedback via `get_health_status()` (flags after failures, DB check, heartbeat, etc.).
---
## 5. Требования к приложению (например, MailOrderBot) / Application requirements (e.g. MailOrderBot)
- Переопределить `get_health_status()` и возвращать:
- Override `get_health_status()` and return:
- `{"status": "ok"}` при нормальной работе;
- `{"status": "ok"}` when running normally;
- `{"status": "unhealthy", "detail": "..."}` при критичном сбое (например, последний `execute()` упал, зависимость недоступна);
- `{"status": "unhealthy", "detail": "..."}` on critical failure (e.g. last `execute()` failed, dependency unavailable);
- при желании — `{"status": "degraded", "detail": "..."}` для нефатальной деградации (в обоих случаях эндпоинт отдаёт 503 для совместимости с `curl -f`).
- optionally — `{"status": "degraded", "detail": "..."}` for non-fatal degradation (in both cases the endpoint returns 503 for compatibility with `curl -f`).
---
## 6. Инфраструктура / Infrastructure
- Для работы healthcheck приложение должно поднимать HTTP-сервер (в config_manager при включённой опции) и пробрасывать порт в `docker-compose` (например `8000:8000`), чтобы `deploy.sh` на хосте мог обращаться по `HEALTHCHECK_URL` (например `http://127.0.0.1:8000/health`).
- For healthcheck to work, the application must run an HTTP server (in config_manager when the option is enabled) and expose the port in `docker-compose` (e.g. `8000:8000`), so that `deploy.sh` on the host can call `HEALTHCHECK_URL` (e.g. `http://127.0.0.1:8000/health`).

View File

@@ -4,7 +4,7 @@ build-backend = "setuptools.build_meta"
[project] [project]
name = "config_manager" name = "config_manager"
version = "1.0.2" version = "2.0.0"
description = "Config manager for building applications" description = "Config manager for building applications"
authors = [ authors = [
{ name = "Aleksei Zosimov", email = "lesha.spb@gmail.com" } { name = "Aleksei Zosimov", email = "lesha.spb@gmail.com" }

View File

@@ -1,24 +0,0 @@
[metadata]
name = config_manager
version = 1.0.1
author = Aleksei Zosimov
author_email = lesha.spb@gmail.com
description = Base application with configuration and logging features.
long_description = file: README.md
long_description_content_type = text/markdown
url = https://git.lesha.spb.ru/alex/config_manager
project_urls =
Bug Tracker = https://git.lesha.spb.ru/alex/config_manager/issues
classifiers =
Programming Language :: Python :: 3
License :: OSI Approved :: MIT License
Operating System :: OS Independent
[options]
package_dir =
= src
packages = find:
python_requires = >=3.10
[options.packages.find]
where = src

View File

@@ -1 +0,0 @@
from config_manager.config_manager import ConfigManager

View File

@@ -0,0 +1,3 @@
from .v2.manager import ConfigManagerV2 as ConfigManager
from .v1.cfg_manager import ConfigManager as LegacyConfigManager
from .v1.log_manager import LogManager

View File

@@ -1,26 +1,29 @@
import logging
import logging.config
import asyncio import asyncio
import json import json
import yaml import yaml
import logging
import logging.config
from typing import Any
import os import os
from typing import Any, Optional
from .log_manager import LogManager
logger = logging.getLogger(__name__)
class ConfigManager: class ConfigManager:
DEFAULT_UPDATE_INTERVAL = 5.0 DEFAULT_UPDATE_INTERVAL = 5.0
DEFAULT_WORK_INTERVAL = 2.0 DEFAULT_WORK_INTERVAL = 2.0
def __init__(self, path: str): def __init__(self, path: str, log_manager: Optional[LogManager] = None):
self.path = path self.path = path
self.config: Any = None self.config: Any = None
self._last_hash = None self._last_hash = None
self.update_interval = self.DEFAULT_UPDATE_INTERVAL self.update_interval = self.DEFAULT_UPDATE_INTERVAL
self.work_interval = self.DEFAULT_WORK_INTERVAL self.work_interval = self.DEFAULT_WORK_INTERVAL
self._halt = asyncio.Event() self._halt = asyncio.Event()
self._task: Optional[asyncio.Task] = None
self._loop: Optional[asyncio.AbstractEventLoop] = None
self._log_manager = log_manager or LogManager()
self.logger = logging.getLogger(__name__)
def _read_file_sync(self) -> str: def _read_file_sync(self) -> str:
with open(self.path, "r", encoding="utf-8") as f: with open(self.path, "r", encoding="utf-8") as f:
@@ -29,9 +32,9 @@ class ConfigManager:
async def _read_file_async(self) -> str: async def _read_file_async(self) -> str:
return await asyncio.to_thread(self._read_file_sync) return await asyncio.to_thread(self._read_file_sync)
def _parse_config(self, data: str) -> Any: def _parse_config(self, data) -> Any:
ext = os.path.splitext(self.path)[1].lower() extension = os.path.splitext(self.path)[1].lower()
if ext in (".yaml", ".yml"): if extension in (".yaml", ".yml"):
return yaml.safe_load(data) return yaml.safe_load(data)
else: else:
return json.loads(data) return json.loads(data)
@@ -39,31 +42,21 @@ class ConfigManager:
def _update_intervals_from_config(self) -> None: def _update_intervals_from_config(self) -> None:
if not self.config: if not self.config:
return return
# Берём интервалы из секции config обновления, с контролем типа и значений
upd = self.config.get("update_interval") upd = self.config.get("update_interval")
wrk = self.config.get("work_interval") wrk = self.config.get("work_interval")
if isinstance(upd, (int, float)) and upd > 0: if isinstance(upd, (int, float)) and upd > 0:
self.update_interval = float(upd) self.update_interval = float(upd)
logger.info(f"Update interval set to {self.update_interval} seconds") self.logger.info(f"Update interval set to {self.update_interval} seconds")
else: else:
self.update_interval = self.DEFAULT_UPDATE_INTERVAL self.update_interval = self.DEFAULT_UPDATE_INTERVAL
if isinstance(wrk, (int, float)) and wrk > 0: if isinstance(wrk, (int, float)) and wrk > 0:
self.work_interval = float(wrk) self.work_interval = float(wrk)
logger.info(f"Work interval set to {self.work_interval} seconds") self.logger.info(f"Work interval set to {self.work_interval} seconds")
else: else:
self.work_interval = self.DEFAULT_WORK_INTERVAL self.work_interval = self.DEFAULT_WORK_INTERVAL
def _apply_logging_config(self, config: dict) -> None:
try:
logging_config = config.get("logging")
if logging_config:
logging.config.dictConfig(logging_config)
logger.info("Logging configuration applied")
except Exception as e:
logger.error(f"Error applying logging config: {e}")
async def _update_config(self) -> None: async def _update_config(self) -> None:
try: try:
data = await self._read_file_async() data = await self._read_file_async()
@@ -73,12 +66,11 @@ class ConfigManager:
self.config = new_config self.config = new_config
self._last_hash = current_hash self._last_hash = current_hash
self._apply_logging_config(new_config) self._log_manager.apply_config(new_config)
self._update_intervals_from_config() self._update_intervals_from_config()
logger.info("Config updated: %s", self.config)
except Exception as e: except Exception as e:
logger.error(f"Error reading/parsing config file: {e}") self.logger.error(f"Error reading/parsing config file: {e}")
def execute(self) -> None: def execute(self) -> None:
""" """
@@ -98,37 +90,48 @@ class ConfigManager:
await self._update_config() await self._update_config()
await asyncio.sleep(self.update_interval) await asyncio.sleep(self.update_interval)
async def start(self) -> None: async def _run(self) -> None:
"""Внутренняя корутина, запускающая все циклы"""
self._halt.clear() self._halt.clear()
logger.info("ConfigManager started") self.logger.info("ConfigManager started")
await asyncio.gather( try:
self._worker_loop(), await asyncio.gather(
self._periodic_update_loop() self._worker_loop(),
) self._periodic_update_loop()
)
except asyncio.CancelledError:
self.logger.info("ConfigManager tasks cancelled")
finally:
self.logger.info("ConfigManager stopped")
def stop(self) -> None: async def start(self) -> None:
if self._task is not None and not self._task.done():
self.logger.warning("ConfigManager is already running")
return
try:
self._loop = asyncio.get_running_loop()
except RuntimeError:
self.logger.error("start() must be called from within an async context")
raise
self.logger.info("ConfigManager starting and awaiting _run()")
await self._run()
async def stop(self) -> None:
"""Останавливает менеджер конфигурации и ожидает завершения"""
if self._task is None:
self.logger.warning("ConfigManager is not running")
return
self.logger.info("ConfigManager stopping...")
self._halt.set() self._halt.set()
logger.info("ConfigManager stopping...")
try:
await self._task
except asyncio.CancelledError:
pass
# Пример наследования и переопределения execute
class MyApp(ConfigManager): self._task = None
def execute(self) -> None: self.logger.info("ConfigManager stopped successfully")
logger.info("Executing blocking work with config: %s", self.config)
async def main():
app = MyApp("config.yaml") # Можно config.json или config.yaml
task = asyncio.create_task(app.start())
await asyncio.sleep(20)
app.stop()
await task
logger.info("Work finished.")
if __name__ == "__main__":
logging.basicConfig(level=logging.INFO,
format="%(asctime)s [%(levelname)s] %(name)s: %(message)s")
asyncio.run(main())

View File

@@ -0,0 +1,40 @@
import logging
from typing import Optional
class LogManager:
"""
Управляет конфигурацией логирования приложения.
Применяет конфигурацию из словаря с обработкой ошибок.
"""
def __init__(self):
self.logger = logging.getLogger(__name__)
self._last_valid_config: Optional[dict] = None
def apply_config(self, config: dict) -> None:
"""
Применяет конфигурацию логирования из словаря.
При ошибке восстанавливает последний валидный конфиг.
Args:
config: Словарь с настройками логирования (из файла конфига)
"""
logging_config = config.get("log")
if not logging_config:
return
try:
logging.config.dictConfig(logging_config)
self._last_valid_config = logging_config
self.logger.info("Logging configuration applied")
except Exception as e:
self.logger.error(f"Error applying logging config: {e}")
# Если был предыдущий валидный конфиг, восстанавливаем его
if self._last_valid_config:
try:
logging.config.dictConfig(self._last_valid_config)
self.logger.info("Previous logging configuration restored")
except Exception as restore_error:
self.logger.error(f"Error restoring previous config: {restore_error}")

View File

@@ -0,0 +1,4 @@
from .manager import ConfigManagerV2
from .types import HealthServerSettings
__all__ = ["ConfigManagerV2", "HealthServerSettings"]

View File

@@ -0,0 +1,52 @@
from __future__ import annotations
import asyncio
import hashlib
import json
import os
from typing import Any, Optional
import yaml
class ConfigLoader:
def __init__(self, path: str):
"""Initialize loader state for a specific config file path."""
self.path = path
self.config: Any = None
self.last_valid_config: Any = None
self._last_seen_hash: Optional[str] = None
def _read_file_sync(self) -> str:
"""Read raw config text from disk synchronously."""
with open(self.path, "r", encoding="utf-8") as fh:
return fh.read()
async def read_file_async(self) -> str:
"""Read raw config text from disk in a worker thread."""
return await asyncio.to_thread(self._read_file_sync)
def parse_config(self, data: str) -> Any:
"""Parse config text as YAML or JSON based on file extension."""
extension = os.path.splitext(self.path)[1].lower()
if extension in (".yaml", ".yml"):
return yaml.safe_load(data)
return json.loads(data)
@staticmethod
def _calculate_hash(data: str) -> str:
"""Calculate a stable content hash for change detection."""
return hashlib.sha256(data.encode("utf-8")).hexdigest()
async def load_if_changed(self) -> tuple[bool, Any]:
"""Load and parse config only when file content changed."""
raw_data = await self.read_file_async()
current_hash = self._calculate_hash(raw_data)
if current_hash == self._last_seen_hash:
return False, self.config
self._last_seen_hash = current_hash
parsed = self.parse_config(raw_data)
self.config = parsed
self.last_valid_config = parsed
return True, parsed

View File

@@ -0,0 +1,4 @@
from .base import ControlChannel
from .telegram import TelegramControlChannel
__all__ = ["ControlChannel", "TelegramControlChannel"]

View File

@@ -0,0 +1,21 @@
from __future__ import annotations
from abc import ABC, abstractmethod
from collections.abc import Awaitable, Callable
StartHandler = Callable[[], Awaitable[str]]
StopHandler = Callable[[], Awaitable[str]]
StatusHandler = Callable[[], Awaitable[str]]
class ControlChannel(ABC):
@abstractmethod
async def start(self, on_start: StartHandler, on_stop: StopHandler, on_status: StatusHandler) -> None:
"""Start channel and bind command handlers."""
raise NotImplementedError
@abstractmethod
async def stop(self) -> None:
"""Stop channel and release its resources."""
raise NotImplementedError

View File

@@ -0,0 +1,111 @@
from __future__ import annotations
import asyncio
import json
import logging
import urllib.parse
import urllib.request
from typing import Optional
from .base import ControlChannel, StartHandler, StatusHandler, StopHandler
class TelegramControlChannel(ControlChannel):
def __init__(
self,
token: str,
chat_id: int,
poll_interval: float = 2.0,
logger: Optional[logging.Logger] = None,
):
"""Initialize Telegram polling channel with bot and chat settings."""
self._token = token
self._chat_id = chat_id
self._poll_interval = poll_interval
self._offset: Optional[int] = None
self._task: Optional[asyncio.Task] = None
self._stop_event = asyncio.Event()
self._on_start: Optional[StartHandler] = None
self._on_stop: Optional[StopHandler] = None
self._on_status: Optional[StatusHandler] = None
self._logger = logger or logging.getLogger(__name__)
async def start(self, on_start: StartHandler, on_stop: StopHandler, on_status: StatusHandler) -> None:
"""Start polling Telegram updates and register command callbacks."""
if self._task is not None and not self._task.done():
return
self._on_start = on_start
self._on_stop = on_stop
self._on_status = on_status
self._stop_event.clear()
self._task = asyncio.create_task(self._poll_loop())
async def stop(self) -> None:
"""Stop polling loop and wait until task termination."""
self._stop_event.set()
if self._task is not None:
self._task.cancel()
try:
await self._task
except asyncio.CancelledError:
pass
self._task = None
async def _poll_loop(self) -> None:
"""Continuously fetch updates and dispatch supported commands."""
while not self._stop_event.is_set():
try:
updates = await asyncio.to_thread(self._fetch_updates)
for update in updates:
await self._process_update(update)
except Exception as exc: # noqa: BLE001
self._logger.warning("Telegram polling error: %s", exc)
try:
await asyncio.wait_for(self._stop_event.wait(), timeout=max(self._poll_interval, 0.1))
except asyncio.TimeoutError:
continue
def _fetch_updates(self) -> list[dict]:
"""Pull new Telegram updates using the latest offset."""
params = {"timeout": 0}
if self._offset is not None:
params["offset"] = self._offset
query = urllib.parse.urlencode(params)
url = f"https://api.telegram.org/bot{self._token}/getUpdates?{query}"
with urllib.request.urlopen(url, timeout=10) as response:
payload = json.loads(response.read().decode("utf-8"))
result = payload.get("result", [])
if result:
self._offset = max(item["update_id"] for item in result) + 1
return result
async def _process_update(self, update: dict) -> None:
"""Handle one Telegram update and execute mapped command."""
message = update.get("message") or {}
text = (message.get("text") or "").strip().lower()
chat = message.get("chat") or {}
chat_id = chat.get("id")
if chat_id != self._chat_id:
return
if text in {"/start", "/run"} and self._on_start is not None:
reply = await self._on_start()
elif text in {"/stop", "/halt"} and self._on_stop is not None:
reply = await self._on_stop()
elif text in {"/status", "/health"} and self._on_status is not None:
reply = await self._on_status()
else:
return
await asyncio.to_thread(self._send_message, reply)
def _send_message(self, text: str) -> None:
"""Send plain-text reply to the configured Telegram chat."""
encoded = urllib.parse.urlencode({"chat_id": self._chat_id, "text": text})
url = f"https://api.telegram.org/bot{self._token}/sendMessage"
req = urllib.request.Request(url, data=encoded.encode("utf-8"), method="POST")
with urllib.request.urlopen(req, timeout=10):
return

View File

@@ -0,0 +1,80 @@
from __future__ import annotations
import asyncio
import json
from collections.abc import Awaitable, Callable
from typing import Optional
from .types import HealthPayload
class HealthServer:
def __init__(
self,
host: str,
port: int,
path: str,
timeout: float,
health_provider: Callable[[], Awaitable[HealthPayload]],
):
"""Configure lightweight HTTP health server parameters and callback."""
self._host = host
self._port = port
self._path = path
self._timeout = timeout
self._health_provider = health_provider
self._server: Optional[asyncio.base_events.Server] = None
async def start(self) -> None:
"""Start listening for healthcheck requests if not running."""
if self._server is not None:
return
self._server = await asyncio.start_server(self._handle_connection, self._host, self._port)
async def stop(self) -> None:
"""Stop the health server and release the listening socket."""
if self._server is None:
return
self._server.close()
await self._server.wait_closed()
self._server = None
async def _handle_connection(self, reader: asyncio.StreamReader, writer: asyncio.StreamWriter) -> None:
"""Process one HTTP connection and return JSON health payload."""
status_code = 404
payload: HealthPayload = {"status": "unhealthy", "detail": "not found"}
try:
request_line = await reader.readline()
parts = request_line.decode("utf-8", errors="ignore").strip().split(" ")
if len(parts) >= 2:
method, path = parts[0], parts[1]
if method == "GET" and path == self._path:
status_code, payload = await self._build_health_response()
except Exception: # noqa: BLE001
status_code = 500
payload = {"status": "unhealthy", "detail": "internal error"}
body = json.dumps(payload).encode("utf-8")
phrase = "OK" if status_code == 200 else "SERVICE UNAVAILABLE" if status_code == 503 else "NOT FOUND"
response = (
f"HTTP/1.1 {status_code} {phrase}\r\n"
"Content-Type: application/json\r\n"
f"Content-Length: {len(body)}\r\n"
"Connection: close\r\n"
"\r\n"
).encode("utf-8") + body
writer.write(response)
await writer.drain()
writer.close()
await writer.wait_closed()
async def _build_health_response(self) -> tuple[int, HealthPayload]:
"""Build HTTP status code and body from application health callback."""
try:
payload = await asyncio.wait_for(self._health_provider(), timeout=self._timeout)
status = payload.get("status", "unhealthy")
return (200, payload) if status == "ok" else (503, payload)
except Exception as exc: # noqa: BLE001
return 503, {"status": "unhealthy", "detail": str(exc)}

View File

@@ -0,0 +1,256 @@
from __future__ import annotations
import asyncio
import logging
from collections.abc import Awaitable
from typing import Any, Optional
from ..v1.log_manager import LogManager
from .config_loader import ConfigLoader
from .control.base import ControlChannel
from .health import HealthServer
from .scheduler import WorkerLoop
from .types import HealthPayload, HealthServerSettings, LifecycleState
class ConfigManagerV2:
DEFAULT_UPDATE_INTERVAL = 5.0
DEFAULT_WORK_INTERVAL = 2.0
def __init__(
self,
path: str,
log_manager: Optional[LogManager] = None,
health_settings: Optional[HealthServerSettings] = None,
control_channel: Optional[ControlChannel] = None,
):
"""Initialize manager subsystems and runtime state."""
self.path = path
self.config: Any = None
self.update_interval = self.DEFAULT_UPDATE_INTERVAL
self.work_interval = self.DEFAULT_WORK_INTERVAL
self._loader = ConfigLoader(path)
self._log_manager = log_manager or LogManager()
self._control_channel = control_channel
self._halt = asyncio.Event()
self._task: Optional[asyncio.Task] = None
self._loop: Optional[asyncio.AbstractEventLoop] = None
self._state = LifecycleState.IDLE
self._last_execute_error: Optional[str] = None
self._health_settings = health_settings or HealthServerSettings(enabled=False)
self._health_server: Optional[HealthServer] = None
if self._health_settings.enabled:
self._health_server = HealthServer(
host=self._health_settings.host,
port=self._health_settings.port,
path=self._health_settings.path,
timeout=self._health_settings.timeout,
health_provider=self._collect_health,
)
self.logger = logging.getLogger(__name__)
def _read_file_sync(self) -> str:
"""Read config file synchronously via the shared loader."""
return self._loader._read_file_sync()
async def _read_file_async(self) -> str:
"""Read config file asynchronously via a worker thread."""
return await self._loader.read_file_async()
def _parse_config(self, data: str) -> Any:
"""Parse raw config text to a Python object."""
return self._loader.parse_config(data)
def _update_intervals_from_config(self) -> None:
"""Refresh work and update intervals from current config."""
if not isinstance(self.config, dict):
return
upd = self.config.get("update_interval")
wrk = self.config.get("work_interval")
if isinstance(upd, (int, float)) and upd > 0:
self.update_interval = float(upd)
else:
self.update_interval = self.DEFAULT_UPDATE_INTERVAL
if isinstance(wrk, (int, float)) and wrk > 0:
self.work_interval = float(wrk)
else:
self.work_interval = self.DEFAULT_WORK_INTERVAL
async def _update_config(self) -> None:
"""Reload config and apply new settings when content changes."""
try:
changed, new_config = await self._loader.load_if_changed()
if not changed:
return
self.config = new_config
self._update_intervals_from_config()
if isinstance(new_config, dict):
self._log_manager.apply_config(new_config)
except Exception as exc: # noqa: BLE001
# Keep current config untouched and continue with last valid settings.
self.logger.error("Error reading/parsing config file: %s", exc)
if self._loader.last_valid_config is not None:
self.config = self._loader.last_valid_config
self._update_intervals_from_config()
def execute(self) -> None:
"""Override in subclass to implement one unit of blocking work."""
def get_health_status(self) -> HealthPayload:
"""Return application-specific health payload for /health."""
return {"status": "ok"}
async def _collect_health(self) -> HealthPayload:
"""Aggregate lifecycle and app status into one health result."""
if self._state not in {LifecycleState.RUNNING, LifecycleState.STOPPING}:
return {"status": "unhealthy", "detail": f"state={self._state.value}"}
if self._last_execute_error is not None:
return {"status": "unhealthy", "detail": self._last_execute_error}
result = self.get_health_status()
status = result.get("status", "unhealthy")
if status not in {"ok", "degraded", "unhealthy"}:
return {"status": "unhealthy", "detail": "invalid health status"}
return result
def _on_execute_success(self) -> None:
"""Clear the last execution error marker after successful run."""
self._last_execute_error = None
def _on_execute_error(self, exc: Exception) -> None:
"""Store and log execution failure details for health reporting."""
self._last_execute_error = str(exc)
self.logger.error("Execution error: %s", exc)
async def _worker_loop(self) -> None:
"""Run execute() repeatedly until shutdown is requested."""
worker = WorkerLoop(
execute=self.execute,
get_interval=lambda: self.work_interval,
halt_event=self._halt,
on_error=self._on_execute_error,
on_success=self._on_execute_success,
)
await worker.run()
async def _periodic_update_loop(self) -> None:
"""Periodically check config file for updates until shutdown."""
while not self._halt.is_set():
await self._update_config()
try:
await asyncio.wait_for(self._halt.wait(), timeout=max(self.update_interval, 0.05))
except asyncio.TimeoutError:
continue
async def _status_text(self) -> str:
"""Build human-readable runtime status for control channels."""
health = await self._collect_health()
detail = health.get("detail")
if detail:
return f"state={self._state.value}; health={health['status']}; detail={detail}"
return f"state={self._state.value}; health={health['status']}"
async def _control_start(self) -> str:
"""Handle external start command for control channels."""
if self._state == LifecycleState.RUNNING:
return "already running"
self._halt.clear()
return "start signal accepted"
async def _control_stop(self) -> str:
"""Handle external stop command for control channels."""
self._halt.set()
return "stop signal accepted"
async def _control_status(self) -> str:
"""Handle external status command for control channels."""
return await self._status_text()
async def _start_control_channel(self) -> None:
"""Start configured control channel with bound command handlers."""
if self._control_channel is None:
return
await self._control_channel.start(self._control_start, self._control_stop, self._control_status)
async def _stop_control_channel(self) -> None:
"""Stop configured control channel if it is active."""
if self._control_channel is None:
return
await self._control_channel.stop()
async def _run(self) -> None:
"""Run manager lifecycle and coordinate all background tasks."""
self._state = LifecycleState.STARTING
self._halt.clear()
await self._update_config()
if self._health_server is not None:
await self._health_server.start()
await self._start_control_channel()
self._state = LifecycleState.RUNNING
self.logger.info("ConfigManagerV2 started")
tasks = [
asyncio.create_task(self._worker_loop(), name="v2-worker-loop"),
asyncio.create_task(self._periodic_update_loop(), name="v2-config-loop"),
]
try:
await asyncio.gather(*tasks)
except asyncio.CancelledError:
raise
finally:
self._state = LifecycleState.STOPPING
self._halt.set()
for task in tasks:
task.cancel()
await asyncio.gather(*tasks, return_exceptions=True)
await self._stop_control_channel()
if self._health_server is not None:
await self._health_server.stop()
self._state = LifecycleState.STOPPED
self.logger.info("ConfigManagerV2 stopped")
async def start(self) -> None:
"""Start manager lifecycle from an active asyncio context."""
if self._task is not None and not self._task.done():
self.logger.warning("ConfigManagerV2 is already running")
return
try:
self._loop = asyncio.get_running_loop()
except RuntimeError:
self.logger.error("start() must be called from within an async context")
raise
self._task = asyncio.create_task(self._run(), name="config-manager-v2")
try:
await self._task
finally:
self._task = None
async def stop(self) -> None:
"""Request graceful shutdown and wait for manager completion."""
if self._task is None:
self.logger.warning("ConfigManagerV2 is not running")
return
self._halt.set()
if asyncio.current_task() is self._task:
return
try:
await self._task
except asyncio.CancelledError:
pass

View File

@@ -0,0 +1,39 @@
from __future__ import annotations
import asyncio
from collections.abc import Callable
from typing import Optional
class WorkerLoop:
def __init__(
self,
execute: Callable[[], None],
get_interval: Callable[[], float],
halt_event: asyncio.Event,
on_error: Optional[Callable[[Exception], None]] = None,
on_success: Optional[Callable[[], None]] = None,
):
"""Store callbacks and synchronization primitives for worker execution."""
self._execute = execute
self._get_interval = get_interval
self._halt_event = halt_event
self._on_error = on_error
self._on_success = on_success
async def run(self) -> None:
"""Run execute repeatedly until halt is requested."""
while not self._halt_event.is_set():
try:
await asyncio.to_thread(self._execute)
if self._on_success is not None:
self._on_success()
except Exception as exc: # noqa: BLE001
if self._on_error is not None:
self._on_error(exc)
timeout = max(self._get_interval(), 0.01)
try:
await asyncio.wait_for(self._halt_event.wait(), timeout=timeout)
except asyncio.TimeoutError:
continue

View File

@@ -0,0 +1,30 @@
from __future__ import annotations
from dataclasses import dataclass
from enum import Enum
from typing import Literal, TypedDict
HealthState = Literal["ok", "degraded", "unhealthy"]
class HealthPayload(TypedDict, total=False):
status: HealthState
detail: str
class LifecycleState(str, Enum):
IDLE = "idle"
STARTING = "starting"
RUNNING = "running"
STOPPING = "stopping"
STOPPED = "stopped"
@dataclass
class HealthServerSettings:
enabled: bool = False
host: str = "0.0.0.0"
port: int = 8000
path: str = "/health"
timeout: float = 3.0

53
tests/config.yaml Normal file
View File

@@ -0,0 +1,53 @@
# === Раздел с общими конфигурационными параметрами ===
runtime: 5
# === Логирование ===
log:
version: 1
disable_existing_loggers: False
formatters:
standard:
format: '%(asctime)s %(module)15s [%(levelname)8s]: %(message)s'
telegram:
format: '%(message)s'
handlers:
console:
level: DEBUG
formatter: standard
class: logging.StreamHandler
stream: ext://sys.stdout # Default is stderr
file:
level: DEBUG
formatter: standard
class: logging.handlers.RotatingFileHandler
filename: logs/log.log
mode: a
maxBytes: 500000
backupCount: 15
#telegram:
# level: CRITICAL
# formatter: telegram
# class: logging_telegram_handler.TelegramHandler
# chat_id: 211945135
# alias: "PDC"
# -- Логгеры --
loggers:
'':
handlers: [console, file]
level: INFO
propagate: False
__main__:
handlers: [console, file]
level: DEBUG
propagate: False
config_manager:
handlers: [console, file]
level: DEBUG

View File

@@ -1,131 +0,0 @@
import unittest
from unittest.mock import patch, mock_open, AsyncMock
import asyncio
import logging
import io
import json
import yaml
import sys
import os
sys.path.append(os.path.join(os.path.dirname(__file__), '..', 'src'))
from basic_application.basic_application import ConfigManager
class TestConfigManager(unittest.IsolatedAsyncioTestCase):
def setUp(self):
self.json_data = json.dumps({
"work_interval": 1,
"update_interval": 1,
"logging": {
"version": 1,
"handlers": {"console": {"class": "logging.StreamHandler", "level": "DEBUG"}},
"root": {"handlers": ["console"], "level": "DEBUG"}
},
"some_key": "some_value"
})
self.yaml_data = """
work_interval: 1
update_interval: 1
logging:
version: 1
handlers:
console:
class: logging.StreamHandler
level: DEBUG
root:
handlers: [console]
level: DEBUG
some_key: some_value
"""
@patch("builtins.open", new_callable=mock_open, read_data="")
async def test_read_file_async_json(self, mock_file):
mock_file.return_value.read = lambda: self.json_data
cm = ConfigManager("config.json")
content = await cm._read_file_async()
self.assertEqual(content, self.json_data)
@patch("builtins.open", new_callable=mock_open, read_data="")
async def test_read_file_async_yaml(self, mock_file):
mock_file.return_value.read = lambda: self.yaml_data
cm = ConfigManager("config.yaml")
content = await cm._read_file_async()
self.assertEqual(content, self.yaml_data)
def test_parse_json(self):
cm = ConfigManager("config.json")
parsed = cm._parse_config(self.json_data)
self.assertIsInstance(parsed, dict)
self.assertEqual(parsed["some_key"], "some_value")
def test_parse_yaml(self):
cm = ConfigManager("config.yaml")
parsed = cm._parse_config(self.yaml_data)
self.assertIsInstance(parsed, dict)
self.assertEqual(parsed["some_key"], "some_value")
@patch("basic_application.basic_application.logging.config.dictConfig")
def test_apply_logging_config(self, mock_dict_config):
cm = ConfigManager("config.json")
cm._apply_logging_config({"logging": {"version": 1}})
mock_dict_config.assert_called_once()
async def test_update_config_changes_config_and_intervals(self):
# Мокаем чтение файла
m = mock_open(read_data=self.json_data)
with patch("builtins.open", m):
cm = ConfigManager("config.json")
# Проверяем исходные интервалы
self.assertEqual(cm.update_interval, cm.DEFAULT_UPDATE_INTERVAL)
self.assertEqual(cm.work_interval, cm.DEFAULT_WORK_INTERVAL)
await cm._update_config()
# После обновления данные заполнены
self.assertIsInstance(cm.config, dict)
self.assertEqual(cm.update_interval, 1.0)
self.assertEqual(cm.work_interval, 1.0)
async def test_execute_called_in_worker_loop(self):
called = False
class TestCM(ConfigManager):
def execute(self2):
nonlocal called
called = True
cm = TestCM("config.json")
async def stop_after_delay():
await asyncio.sleep(0.1)
cm.stop()
# Запускаем worker_loop и через 0.1 сек останавливаем
await asyncio.gather(cm._worker_loop(), stop_after_delay())
self.assertTrue(called)
async def test_periodic_update_loop_runs(self):
count = 0
class TestCM(ConfigManager):
async def _update_config(self2):
nonlocal count
count += 1
if count >= 2:
self2.stop()
cm = TestCM("config.json")
await cm._periodic_update_loop()
self.assertGreaterEqual(count, 2)
if __name__ == "__main__":
logging.basicConfig(level=logging.WARNING) # отключаем логи во время тестов
unittest.main()

34
tests/test_app.py Normal file
View File

@@ -0,0 +1,34 @@
#import os
#os.chdir(os.path.dirname(__file__))
from config_manager import ConfigManager
import logging
import asyncio
from typing import Optional
logger = logging.getLogger()
class MyApp(ConfigManager):
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.iter = 0
def execute(self) -> None:
logger.info(f"current iteration {self.iter}")
self.iter += 1
async def main():
app = MyApp("config.yaml")
logger.info("App started")
await app.start()
logger.info("App finished")
while True:
await asyncio.sleep(1)
if __name__ == "__main__":
asyncio.run(main())

View File

View File

@@ -0,0 +1,33 @@
import asyncio
from config_manager.v2 import ConfigManagerV2
class ReloadApp(ConfigManagerV2):
def execute(self) -> None:
return
def test_invalid_config_keeps_last_valid(tmp_path):
async def scenario() -> None:
cfg = tmp_path / "config.yaml"
cfg.write_text("work_interval: 0.2\nupdate_interval: 0.05\n", encoding="utf-8")
app = ReloadApp(str(cfg))
runner = asyncio.create_task(app.start())
await asyncio.sleep(0.12)
assert app.work_interval == 0.2
assert app.update_interval == 0.05
cfg.write_text("work_interval: [broken\n", encoding="utf-8")
await asyncio.sleep(0.15)
assert app.work_interval == 0.2
assert app.update_interval == 0.05
assert isinstance(app.config, dict)
await app.stop()
await runner
asyncio.run(scenario())

View File

@@ -0,0 +1,30 @@
import asyncio
from config_manager.v2 import ConfigManagerV2
class DemoApp(ConfigManagerV2):
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.calls = 0
def execute(self) -> None:
self.calls += 1
def test_execute_loop_runs(tmp_path):
async def scenario() -> None:
cfg = tmp_path / "config.yaml"
cfg.write_text("work_interval: 0.05\nupdate_interval: 0.05\n", encoding="utf-8")
app = DemoApp(str(cfg))
runner = asyncio.create_task(app.start())
await asyncio.sleep(0.18)
await app.stop()
await runner
assert app.calls >= 2
assert isinstance(app.config, dict)
asyncio.run(scenario())

View File

@@ -0,0 +1,54 @@
import asyncio
from config_manager.v2 import ConfigManagerV2
from config_manager.v2.control.base import ControlChannel, StartHandler, StatusHandler, StopHandler
class DummyControlChannel(ControlChannel):
def __init__(self):
self.on_start: StartHandler | None = None
self.on_stop: StopHandler | None = None
self.on_status: StatusHandler | None = None
self.started = False
self.stopped = False
async def start(self, on_start: StartHandler, on_stop: StopHandler, on_status: StatusHandler) -> None:
self.on_start = on_start
self.on_stop = on_stop
self.on_status = on_status
self.started = True
async def stop(self) -> None:
self.stopped = True
class ControlledApp(ConfigManagerV2):
def execute(self) -> None:
return
def test_control_channel_can_stop_manager(tmp_path):
async def scenario() -> None:
cfg = tmp_path / "config.yaml"
cfg.write_text("work_interval: 0.05\nupdate_interval: 0.05\n", encoding="utf-8")
channel = DummyControlChannel()
app = ControlledApp(str(cfg), control_channel=channel)
runner = asyncio.create_task(app.start())
await asyncio.sleep(0.12)
assert channel.started is True
assert channel.on_status is not None
assert channel.on_stop is not None
status_text = await channel.on_status()
assert "state=running" in status_text
stop_text = await channel.on_stop()
assert "stop signal accepted" in stop_text
await runner
assert channel.stopped is True
asyncio.run(scenario())

View File

@@ -0,0 +1,41 @@
import asyncio
from config_manager.v2.health import HealthServer
def test_health_mapping_ok_to_200():
async def provider():
return {"status": "ok"}
async def scenario() -> None:
server = HealthServer(
host="127.0.0.1",
port=8000,
path="/health",
timeout=0.2,
health_provider=provider,
)
code, payload = await server._build_health_response()
assert code == 200
assert payload["status"] == "ok"
asyncio.run(scenario())
def test_health_mapping_unhealthy_to_503():
async def provider():
return {"status": "unhealthy", "detail": "worker failed"}
async def scenario() -> None:
server = HealthServer(
host="127.0.0.1",
port=8000,
path="/health",
timeout=0.2,
health_provider=provider,
)
code, payload = await server._build_health_response()
assert code == 503
assert payload["status"] == "unhealthy"
asyncio.run(scenario())

View File

@@ -0,0 +1,43 @@
import asyncio
import threading
import time
from config_manager.v2 import ConfigManagerV2
class BlockingApp(ConfigManagerV2):
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.started_event = threading.Event()
self.active_event = threading.Event()
self.calls = 0
def execute(self) -> None:
self.calls += 1
self.active_event.set()
self.started_event.set()
time.sleep(0.2)
self.active_event.clear()
def test_stop_waits_for_active_execute_and_prevents_next_run(tmp_path):
async def scenario() -> None:
cfg = tmp_path / "config.yaml"
cfg.write_text("work_interval: 0.05\nupdate_interval: 0.05\n", encoding="utf-8")
app = BlockingApp(str(cfg))
runner = asyncio.create_task(app.start())
started = await asyncio.to_thread(app.started_event.wait, 1.0)
assert started is True
await app.stop()
await runner
assert app.active_event.is_set() is False
assert app.calls == 1
await asyncio.sleep(0.15)
assert app.calls == 1
asyncio.run(scenario())