Compare commits
18 Commits
features/f
...
1d71ce406f
| Author | SHA1 | Date | |
|---|---|---|---|
| 1d71ce406f | |||
| 80dd69c5ec | |||
| 8da6df0b2a | |||
| 7eb3476b96 | |||
| da8ed4fa2b | |||
| 8f22fcf6af | |||
| 311870fd73 | |||
| ffd758d9a4 | |||
| 526661e498 | |||
| b0c87a427c | |||
| 7b74e0b0b8 | |||
| 5faea8f69f | |||
| f491c65455 | |||
| 4eb9327628 | |||
| e71685aad9 | |||
| 98867d69a7 | |||
| 68f4b26f00 | |||
| 3b8e28e077 |
6
.gitignore
vendored
6
.gitignore
vendored
@@ -1,3 +1,5 @@
|
|||||||
src/config_manager/__pycache__/basic_application.cpython-312.pyc
|
__pycache__
|
||||||
venv/
|
.venv/
|
||||||
.vscode/
|
.vscode/
|
||||||
|
log*.log
|
||||||
|
config_manager.egg-info
|
||||||
29
README_DEPLOY.md
Normal file
29
README_DEPLOY.md
Normal file
@@ -0,0 +1,29 @@
|
|||||||
|
# Деплой / Deploy
|
||||||
|
|
||||||
|
Краткое описание процесса деплоя приложений на базе config_manager и используемых скриптов.
|
||||||
|
|
||||||
|
A short description of the deploy process for applications based on config_manager and the scripts used.
|
||||||
|
|
||||||
|
---
|
||||||
|
|
||||||
|
## Healthcheck
|
||||||
|
|
||||||
|
После поднятия контейнеров (`docker compose up -d`) скрипт деплоя может ожидать успешного ответа по URL проверки здоровья приложения. При таймауте выполняется откат и выход с ошибкой.
|
||||||
|
|
||||||
|
After bringing up containers (`docker compose up -d`), the deploy script may wait for a successful response at the application health-check URL. On timeout, rollback is performed and the script exits with an error.
|
||||||
|
|
||||||
|
**Полная спецификация:** [docs/HEALTHCHECK_REQUIREMENTS.md](docs/HEALTHCHECK_REQUIREMENTS.md).
|
||||||
|
|
||||||
|
**Full specification:** [docs/HEALTHCHECK_REQUIREMENTS.md](docs/HEALTHCHECK_REQUIREMENTS.md).
|
||||||
|
|
||||||
|
### Переменные окружения / Environment variables
|
||||||
|
|
||||||
|
| Переменная | Назначение | Пример/дефолт |
|
||||||
|
| ----------------------- | ---------------------------------------- | ------------------------------- |
|
||||||
|
| `HEALTHCHECK_URL` | URL для проверки здоровья | `http://127.0.0.1:8000/health` |
|
||||||
|
| `HEALTHCHECK_TIMEOUT` | Максимальное время ожидания (секунды) | `120` |
|
||||||
|
| `HEALTHCHECK_INTERVAL` | Интервал между попытками (секунды) | `5` |
|
||||||
|
|
||||||
|
Если задана `HEALTHCHECK_URL`, деплой после поднятия контейнеров вызывает этот URL (например, через `curl -fsS --max-time 5`); успех — HTTP 2xx, иначе повтор до истечения `HEALTHCHECK_TIMEOUT`.
|
||||||
|
|
||||||
|
If `HEALTHCHECK_URL` is set, deploy calls this URL after bringing up containers (e.g. via `curl -fsS --max-time 5`); success is HTTP 2xx, otherwise retries until `HEALTHCHECK_TIMEOUT` expires.
|
||||||
95
docs/HEALTHCHECK_REQUIREMENTS.md
Normal file
95
docs/HEALTHCHECK_REQUIREMENTS.md
Normal file
@@ -0,0 +1,95 @@
|
|||||||
|
# Требования к healthcheck / Healthcheck Requirements
|
||||||
|
|
||||||
|
Единая спецификация для реализации healthcheck в config_manager и в приложениях (в т.ч. MailOrderBot).
|
||||||
|
|
||||||
|
A unified specification for implementing healthcheck in config_manager and in applications (including MailOrderBot).
|
||||||
|
|
||||||
|
---
|
||||||
|
|
||||||
|
## 1. Назначение / Purpose
|
||||||
|
|
||||||
|
- Healthcheck используется скриптом деплоя (`deploy.sh`): после `docker compose up -d` деплой ждёт успешного ответа по `HEALTHCHECK_URL`; при таймауте — откат и выход с ошибкой.
|
||||||
|
- The healthcheck is used by the deploy script (`deploy.sh`): after `docker compose up -d`, the deploy waits for a successful response at `HEALTHCHECK_URL`; on timeout — rollback and exit with error.
|
||||||
|
|
||||||
|
- Эндпоинт должен отражать **реальное состояние приложения**, а не только факт работы HTTP-сервера (иначе деплой может считать успешным запуск «зависшего» или упавшего воркера).
|
||||||
|
- The endpoint must reflect the **actual state of the application**, not just that the HTTP server is running (otherwise deploy may consider successful a «hung» or crashed worker).
|
||||||
|
|
||||||
|
---
|
||||||
|
|
||||||
|
## 2. Поведение deploy.sh / deploy.sh behaviour
|
||||||
|
|
||||||
|
(Логика уже может быть реализована в коде deploy-скрипта.)
|
||||||
|
|
||||||
|
- Если задана переменная `HEALTHCHECK_URL` — после поднятия контейнеров вызывается `wait_for_healthcheck`: цикл с интервалом `HEALTHCHECK_INTERVAL` (по умолчанию 5 с), пока не истечёт `HEALTHCHECK_TIMEOUT` (по умолчанию 120 с).
|
||||||
|
- If `HEALTHCHECK_URL` is set — after bringing up containers, `wait_for_healthcheck` is called: a loop with interval `HEALTHCHECK_INTERVAL` (default 5 s) until `HEALTHCHECK_TIMEOUT` (default 120 s) expires.
|
||||||
|
|
||||||
|
- Проверка: `curl -fsS --max-time 5 "$HEALTHCHECK_URL"`. Флаг `-f`: любой HTTP-код 4xx/5xx считается ошибкой (повтор до таймаута).
|
||||||
|
- Check: `curl -fsS --max-time 5 "$HEALTHCHECK_URL"`. Flag `-f`: any HTTP 4xx/5xx is treated as failure (retry until timeout).
|
||||||
|
|
||||||
|
- **Успех:** HTTP 2xx. **Неуспех:** не 2xx или таймаут curl/соединения → деплой падает с откатом.
|
||||||
|
- **Success:** HTTP 2xx. **Failure:** non-2xx or curl/connection timeout → deploy fails with rollback.
|
||||||
|
|
||||||
|
### Переменные окружения / Environment variables
|
||||||
|
|
||||||
|
| Переменная / Variable | Назначение / Purpose | Пример/дефолт / Example/default |
|
||||||
|
| ------------------------- | --------------------------------- | ----------------------------------- |
|
||||||
|
| `HEALTHCHECK_URL` | URL для проверки / Check URL | `http://127.0.0.1:8000/health` |
|
||||||
|
| `HEALTHCHECK_TIMEOUT` | Макс. время ожидания (сек) / Max wait (s) | `120` |
|
||||||
|
| `HEALTHCHECK_INTERVAL` | Интервал между попытками (сек) / Interval between attempts (s) | `5` |
|
||||||
|
|
||||||
|
---
|
||||||
|
|
||||||
|
## 3. Контракт эндпоинта / Endpoint contract
|
||||||
|
|
||||||
|
- **Метод и путь / Method and path:** `GET /health` (или иной путь по соглашению; один для всех приложений на config_manager).
|
||||||
|
- **Method and path:** `GET /health` (or another agreed path; one for all applications using config_manager).
|
||||||
|
|
||||||
|
- **Успех (приложение в порядке) / Success (application healthy):** HTTP **200**, опционально тело JSON: `{"status": "ok"}`.
|
||||||
|
- **Success (application healthy):** HTTP **200**, optional JSON body: `{"status": "ok"}`.
|
||||||
|
|
||||||
|
- **Приложение не в порядке / Application not healthy:** HTTP **503**, опционально тело: `{"status": "unhealthy"|"degraded", "detail": "причина"}`.
|
||||||
|
- **Application not healthy:** HTTP **503**, optional body: `{"status": "unhealthy"|"degraded", "detail": "reason"}`.
|
||||||
|
|
||||||
|
- Ответ должен приходить в разумное время (рекомендуемый таймаут вызова логики проверки в config_manager — 2–5 с), иначе deploy через `curl --max-time 5` получит таймаут и будет повторять запросы.
|
||||||
|
- The response must arrive within a reasonable time (recommended timeout for the check logic in config_manager — 2–5 s); otherwise deploy will get a timeout via `curl --max-time 5` and will retry.
|
||||||
|
|
||||||
|
---
|
||||||
|
|
||||||
|
## 4. Обратная связь от приложения (config_manager) / Application feedback (config_manager)
|
||||||
|
|
||||||
|
- Эндпоинт реализуется в **config_manager** (опционально, при включённой опции).
|
||||||
|
- The endpoint is implemented in **config_manager** (optional, when the option is enabled).
|
||||||
|
|
||||||
|
- При обработке `GET /health` config_manager **не решает сам** «здорово ли приложение», а вызывает метод приложения, например: `app.get_health_status()`.
|
||||||
|
- When handling `GET /health`, config_manager does **not** decide by itself whether the application is healthy; it calls the application method, e.g. `app.get_health_status()`.
|
||||||
|
|
||||||
|
- **Контракт метода / Method contract** (приложение переопределяет в наследнике):
|
||||||
|
- **Method contract** (application overrides in subclass):
|
||||||
|
- Возвращает `dict`: `{"status": "ok" | "degraded" | "unhealthy", "detail": "..."}` (поле `detail` опционально).
|
||||||
|
- Returns `dict`: `{"status": "ok" | "degraded" | "unhealthy", "detail": "..."}` (`detail` optional).
|
||||||
|
- При исключении или превышении таймаута вызова — считать состояние unhealthy и отдавать 503.
|
||||||
|
- On exception or call timeout — treat as unhealthy and return 503.
|
||||||
|
|
||||||
|
Таким образом, основное приложение «даёт обратную связь» через реализацию `get_health_status()` (флаги после сбоев, проверка БД, heartbeat и т.д.).
|
||||||
|
|
||||||
|
Thus, the main application provides feedback via `get_health_status()` (flags after failures, DB check, heartbeat, etc.).
|
||||||
|
|
||||||
|
---
|
||||||
|
|
||||||
|
## 5. Требования к приложению (например, MailOrderBot) / Application requirements (e.g. MailOrderBot)
|
||||||
|
|
||||||
|
- Переопределить `get_health_status()` и возвращать:
|
||||||
|
- Override `get_health_status()` and return:
|
||||||
|
- `{"status": "ok"}` при нормальной работе;
|
||||||
|
- `{"status": "ok"}` when running normally;
|
||||||
|
- `{"status": "unhealthy", "detail": "..."}` при критичном сбое (например, последний `execute()` упал, зависимость недоступна);
|
||||||
|
- `{"status": "unhealthy", "detail": "..."}` on critical failure (e.g. last `execute()` failed, dependency unavailable);
|
||||||
|
- при желании — `{"status": "degraded", "detail": "..."}` для нефатальной деградации (в обоих случаях эндпоинт отдаёт 503 для совместимости с `curl -f`).
|
||||||
|
- optionally — `{"status": "degraded", "detail": "..."}` for non-fatal degradation (in both cases the endpoint returns 503 for compatibility with `curl -f`).
|
||||||
|
|
||||||
|
---
|
||||||
|
|
||||||
|
## 6. Инфраструктура / Infrastructure
|
||||||
|
|
||||||
|
- Для работы healthcheck приложение должно поднимать HTTP-сервер (в config_manager при включённой опции) и пробрасывать порт в `docker-compose` (например `8000:8000`), чтобы `deploy.sh` на хосте мог обращаться по `HEALTHCHECK_URL` (например `http://127.0.0.1:8000/health`).
|
||||||
|
- For healthcheck to work, the application must run an HTTP server (in config_manager when the option is enabled) and expose the port in `docker-compose` (e.g. `8000:8000`), so that `deploy.sh` on the host can call `HEALTHCHECK_URL` (e.g. `http://127.0.0.1:8000/health`).
|
||||||
@@ -4,7 +4,7 @@ build-backend = "setuptools.build_meta"
|
|||||||
|
|
||||||
[project]
|
[project]
|
||||||
name = "config_manager"
|
name = "config_manager"
|
||||||
version = "1.0.2"
|
version = "2.0.0"
|
||||||
description = "Config manager for building applications"
|
description = "Config manager for building applications"
|
||||||
authors = [
|
authors = [
|
||||||
{ name = "Aleksei Zosimov", email = "lesha.spb@gmail.com" }
|
{ name = "Aleksei Zosimov", email = "lesha.spb@gmail.com" }
|
||||||
|
|||||||
24
setup.cfg
24
setup.cfg
@@ -1,24 +0,0 @@
|
|||||||
[metadata]
|
|
||||||
name = config_manager
|
|
||||||
version = 1.0.1
|
|
||||||
author = Aleksei Zosimov
|
|
||||||
author_email = lesha.spb@gmail.com
|
|
||||||
description = Base application with configuration and logging features.
|
|
||||||
long_description = file: README.md
|
|
||||||
long_description_content_type = text/markdown
|
|
||||||
url = https://git.lesha.spb.ru/alex/config_manager
|
|
||||||
project_urls =
|
|
||||||
Bug Tracker = https://git.lesha.spb.ru/alex/config_manager/issues
|
|
||||||
classifiers =
|
|
||||||
Programming Language :: Python :: 3
|
|
||||||
License :: OSI Approved :: MIT License
|
|
||||||
Operating System :: OS Independent
|
|
||||||
|
|
||||||
[options]
|
|
||||||
package_dir =
|
|
||||||
= src
|
|
||||||
packages = find:
|
|
||||||
python_requires = >=3.10
|
|
||||||
|
|
||||||
[options.packages.find]
|
|
||||||
where = src
|
|
||||||
@@ -1 +0,0 @@
|
|||||||
from config_manager.config_manager import ConfigManager
|
|
||||||
3
src/config_manager/__init__.py
Normal file
3
src/config_manager/__init__.py
Normal file
@@ -0,0 +1,3 @@
|
|||||||
|
from .v2.manager import ConfigManagerV2 as ConfigManager
|
||||||
|
from .v1.cfg_manager import ConfigManager as LegacyConfigManager
|
||||||
|
from .v1.log_manager import LogManager
|
||||||
@@ -1,26 +1,29 @@
|
|||||||
|
import logging
|
||||||
|
import logging.config
|
||||||
import asyncio
|
import asyncio
|
||||||
import json
|
import json
|
||||||
import yaml
|
import yaml
|
||||||
import logging
|
|
||||||
import logging.config
|
|
||||||
from typing import Any
|
|
||||||
import os
|
import os
|
||||||
|
from typing import Any, Optional
|
||||||
|
|
||||||
|
from .log_manager import LogManager
|
||||||
logger = logging.getLogger(__name__)
|
|
||||||
|
|
||||||
|
|
||||||
class ConfigManager:
|
class ConfigManager:
|
||||||
DEFAULT_UPDATE_INTERVAL = 5.0
|
DEFAULT_UPDATE_INTERVAL = 5.0
|
||||||
DEFAULT_WORK_INTERVAL = 2.0
|
DEFAULT_WORK_INTERVAL = 2.0
|
||||||
|
|
||||||
def __init__(self, path: str):
|
def __init__(self, path: str, log_manager: Optional[LogManager] = None):
|
||||||
self.path = path
|
self.path = path
|
||||||
self.config: Any = None
|
self.config: Any = None
|
||||||
self._last_hash = None
|
self._last_hash = None
|
||||||
self.update_interval = self.DEFAULT_UPDATE_INTERVAL
|
self.update_interval = self.DEFAULT_UPDATE_INTERVAL
|
||||||
self.work_interval = self.DEFAULT_WORK_INTERVAL
|
self.work_interval = self.DEFAULT_WORK_INTERVAL
|
||||||
self._halt = asyncio.Event()
|
self._halt = asyncio.Event()
|
||||||
|
self._task: Optional[asyncio.Task] = None
|
||||||
|
self._loop: Optional[asyncio.AbstractEventLoop] = None
|
||||||
|
|
||||||
|
self._log_manager = log_manager or LogManager()
|
||||||
|
self.logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
def _read_file_sync(self) -> str:
|
def _read_file_sync(self) -> str:
|
||||||
with open(self.path, "r", encoding="utf-8") as f:
|
with open(self.path, "r", encoding="utf-8") as f:
|
||||||
@@ -29,9 +32,9 @@ class ConfigManager:
|
|||||||
async def _read_file_async(self) -> str:
|
async def _read_file_async(self) -> str:
|
||||||
return await asyncio.to_thread(self._read_file_sync)
|
return await asyncio.to_thread(self._read_file_sync)
|
||||||
|
|
||||||
def _parse_config(self, data: str) -> Any:
|
def _parse_config(self, data) -> Any:
|
||||||
ext = os.path.splitext(self.path)[1].lower()
|
extension = os.path.splitext(self.path)[1].lower()
|
||||||
if ext in (".yaml", ".yml"):
|
if extension in (".yaml", ".yml"):
|
||||||
return yaml.safe_load(data)
|
return yaml.safe_load(data)
|
||||||
else:
|
else:
|
||||||
return json.loads(data)
|
return json.loads(data)
|
||||||
@@ -39,31 +42,21 @@ class ConfigManager:
|
|||||||
def _update_intervals_from_config(self) -> None:
|
def _update_intervals_from_config(self) -> None:
|
||||||
if not self.config:
|
if not self.config:
|
||||||
return
|
return
|
||||||
# Берём интервалы из секции config обновления, с контролем типа и значений
|
|
||||||
upd = self.config.get("update_interval")
|
upd = self.config.get("update_interval")
|
||||||
wrk = self.config.get("work_interval")
|
wrk = self.config.get("work_interval")
|
||||||
|
|
||||||
if isinstance(upd, (int, float)) and upd > 0:
|
if isinstance(upd, (int, float)) and upd > 0:
|
||||||
self.update_interval = float(upd)
|
self.update_interval = float(upd)
|
||||||
logger.info(f"Update interval set to {self.update_interval} seconds")
|
self.logger.info(f"Update interval set to {self.update_interval} seconds")
|
||||||
else:
|
else:
|
||||||
self.update_interval = self.DEFAULT_UPDATE_INTERVAL
|
self.update_interval = self.DEFAULT_UPDATE_INTERVAL
|
||||||
|
|
||||||
if isinstance(wrk, (int, float)) and wrk > 0:
|
if isinstance(wrk, (int, float)) and wrk > 0:
|
||||||
self.work_interval = float(wrk)
|
self.work_interval = float(wrk)
|
||||||
logger.info(f"Work interval set to {self.work_interval} seconds")
|
self.logger.info(f"Work interval set to {self.work_interval} seconds")
|
||||||
else:
|
else:
|
||||||
self.work_interval = self.DEFAULT_WORK_INTERVAL
|
self.work_interval = self.DEFAULT_WORK_INTERVAL
|
||||||
|
|
||||||
def _apply_logging_config(self, config: dict) -> None:
|
|
||||||
try:
|
|
||||||
logging_config = config.get("logging")
|
|
||||||
if logging_config:
|
|
||||||
logging.config.dictConfig(logging_config)
|
|
||||||
logger.info("Logging configuration applied")
|
|
||||||
except Exception as e:
|
|
||||||
logger.error(f"Error applying logging config: {e}")
|
|
||||||
|
|
||||||
async def _update_config(self) -> None:
|
async def _update_config(self) -> None:
|
||||||
try:
|
try:
|
||||||
data = await self._read_file_async()
|
data = await self._read_file_async()
|
||||||
@@ -73,12 +66,11 @@ class ConfigManager:
|
|||||||
self.config = new_config
|
self.config = new_config
|
||||||
self._last_hash = current_hash
|
self._last_hash = current_hash
|
||||||
|
|
||||||
self._apply_logging_config(new_config)
|
self._log_manager.apply_config(new_config)
|
||||||
self._update_intervals_from_config()
|
self._update_intervals_from_config()
|
||||||
|
|
||||||
logger.info("Config updated: %s", self.config)
|
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
logger.error(f"Error reading/parsing config file: {e}")
|
self.logger.error(f"Error reading/parsing config file: {e}")
|
||||||
|
|
||||||
def execute(self) -> None:
|
def execute(self) -> None:
|
||||||
"""
|
"""
|
||||||
@@ -98,37 +90,48 @@ class ConfigManager:
|
|||||||
await self._update_config()
|
await self._update_config()
|
||||||
await asyncio.sleep(self.update_interval)
|
await asyncio.sleep(self.update_interval)
|
||||||
|
|
||||||
async def start(self) -> None:
|
async def _run(self) -> None:
|
||||||
|
"""Внутренняя корутина, запускающая все циклы"""
|
||||||
self._halt.clear()
|
self._halt.clear()
|
||||||
logger.info("ConfigManager started")
|
self.logger.info("ConfigManager started")
|
||||||
await asyncio.gather(
|
try:
|
||||||
self._worker_loop(),
|
await asyncio.gather(
|
||||||
self._periodic_update_loop()
|
self._worker_loop(),
|
||||||
)
|
self._periodic_update_loop()
|
||||||
|
)
|
||||||
|
except asyncio.CancelledError:
|
||||||
|
self.logger.info("ConfigManager tasks cancelled")
|
||||||
|
finally:
|
||||||
|
self.logger.info("ConfigManager stopped")
|
||||||
|
|
||||||
def stop(self) -> None:
|
async def start(self) -> None:
|
||||||
|
if self._task is not None and not self._task.done():
|
||||||
|
self.logger.warning("ConfigManager is already running")
|
||||||
|
return
|
||||||
|
|
||||||
|
try:
|
||||||
|
self._loop = asyncio.get_running_loop()
|
||||||
|
except RuntimeError:
|
||||||
|
self.logger.error("start() must be called from within an async context")
|
||||||
|
raise
|
||||||
|
|
||||||
|
self.logger.info("ConfigManager starting and awaiting _run()")
|
||||||
|
await self._run()
|
||||||
|
|
||||||
|
|
||||||
|
async def stop(self) -> None:
|
||||||
|
"""Останавливает менеджер конфигурации и ожидает завершения"""
|
||||||
|
if self._task is None:
|
||||||
|
self.logger.warning("ConfigManager is not running")
|
||||||
|
return
|
||||||
|
|
||||||
|
self.logger.info("ConfigManager stopping...")
|
||||||
self._halt.set()
|
self._halt.set()
|
||||||
logger.info("ConfigManager stopping...")
|
|
||||||
|
|
||||||
|
try:
|
||||||
|
await self._task
|
||||||
|
except asyncio.CancelledError:
|
||||||
|
pass
|
||||||
|
|
||||||
|
self._task = None
|
||||||
|
self.logger.info("ConfigManager stopped successfully")
|
||||||
# Пример наследования и переопределения execute
|
|
||||||
class MyApp(ConfigManager):
|
|
||||||
def execute(self) -> None:
|
|
||||||
logger.info("Executing blocking work with config: %s", self.config)
|
|
||||||
|
|
||||||
|
|
||||||
async def main():
|
|
||||||
app = MyApp("config.yaml") # Можно config.json или config.yaml
|
|
||||||
task = asyncio.create_task(app.start())
|
|
||||||
await asyncio.sleep(20)
|
|
||||||
app.stop()
|
|
||||||
await task
|
|
||||||
logger.info("Work finished.")
|
|
||||||
|
|
||||||
|
|
||||||
if __name__ == "__main__":
|
|
||||||
logging.basicConfig(level=logging.INFO,
|
|
||||||
format="%(asctime)s [%(levelname)s] %(name)s: %(message)s")
|
|
||||||
asyncio.run(main())
|
|
||||||
40
src/config_manager/v1/log_manager.py
Normal file
40
src/config_manager/v1/log_manager.py
Normal file
@@ -0,0 +1,40 @@
|
|||||||
|
import logging
|
||||||
|
from typing import Optional
|
||||||
|
|
||||||
|
|
||||||
|
class LogManager:
|
||||||
|
"""
|
||||||
|
Управляет конфигурацией логирования приложения.
|
||||||
|
Применяет конфигурацию из словаря с обработкой ошибок.
|
||||||
|
"""
|
||||||
|
|
||||||
|
def __init__(self):
|
||||||
|
self.logger = logging.getLogger(__name__)
|
||||||
|
self._last_valid_config: Optional[dict] = None
|
||||||
|
|
||||||
|
def apply_config(self, config: dict) -> None:
|
||||||
|
"""
|
||||||
|
Применяет конфигурацию логирования из словаря.
|
||||||
|
При ошибке восстанавливает последний валидный конфиг.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
config: Словарь с настройками логирования (из файла конфига)
|
||||||
|
"""
|
||||||
|
logging_config = config.get("log")
|
||||||
|
if not logging_config:
|
||||||
|
return
|
||||||
|
|
||||||
|
try:
|
||||||
|
logging.config.dictConfig(logging_config)
|
||||||
|
self._last_valid_config = logging_config
|
||||||
|
self.logger.info("Logging configuration applied")
|
||||||
|
except Exception as e:
|
||||||
|
self.logger.error(f"Error applying logging config: {e}")
|
||||||
|
|
||||||
|
# Если был предыдущий валидный конфиг, восстанавливаем его
|
||||||
|
if self._last_valid_config:
|
||||||
|
try:
|
||||||
|
logging.config.dictConfig(self._last_valid_config)
|
||||||
|
self.logger.info("Previous logging configuration restored")
|
||||||
|
except Exception as restore_error:
|
||||||
|
self.logger.error(f"Error restoring previous config: {restore_error}")
|
||||||
4
src/config_manager/v2/__init__.py
Normal file
4
src/config_manager/v2/__init__.py
Normal file
@@ -0,0 +1,4 @@
|
|||||||
|
from .manager import ConfigManagerV2
|
||||||
|
from .types import HealthServerSettings
|
||||||
|
|
||||||
|
__all__ = ["ConfigManagerV2", "HealthServerSettings"]
|
||||||
52
src/config_manager/v2/config_loader.py
Normal file
52
src/config_manager/v2/config_loader.py
Normal file
@@ -0,0 +1,52 @@
|
|||||||
|
from __future__ import annotations
|
||||||
|
|
||||||
|
import asyncio
|
||||||
|
import hashlib
|
||||||
|
import json
|
||||||
|
import os
|
||||||
|
from typing import Any, Optional
|
||||||
|
|
||||||
|
import yaml
|
||||||
|
|
||||||
|
|
||||||
|
class ConfigLoader:
|
||||||
|
def __init__(self, path: str):
|
||||||
|
"""Initialize loader state for a specific config file path."""
|
||||||
|
self.path = path
|
||||||
|
self.config: Any = None
|
||||||
|
self.last_valid_config: Any = None
|
||||||
|
self._last_seen_hash: Optional[str] = None
|
||||||
|
|
||||||
|
def _read_file_sync(self) -> str:
|
||||||
|
"""Read raw config text from disk synchronously."""
|
||||||
|
with open(self.path, "r", encoding="utf-8") as fh:
|
||||||
|
return fh.read()
|
||||||
|
|
||||||
|
async def read_file_async(self) -> str:
|
||||||
|
"""Read raw config text from disk in a worker thread."""
|
||||||
|
return await asyncio.to_thread(self._read_file_sync)
|
||||||
|
|
||||||
|
def parse_config(self, data: str) -> Any:
|
||||||
|
"""Parse config text as YAML or JSON based on file extension."""
|
||||||
|
extension = os.path.splitext(self.path)[1].lower()
|
||||||
|
if extension in (".yaml", ".yml"):
|
||||||
|
return yaml.safe_load(data)
|
||||||
|
return json.loads(data)
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
def _calculate_hash(data: str) -> str:
|
||||||
|
"""Calculate a stable content hash for change detection."""
|
||||||
|
return hashlib.sha256(data.encode("utf-8")).hexdigest()
|
||||||
|
|
||||||
|
async def load_if_changed(self) -> tuple[bool, Any]:
|
||||||
|
"""Load and parse config only when file content changed."""
|
||||||
|
raw_data = await self.read_file_async()
|
||||||
|
current_hash = self._calculate_hash(raw_data)
|
||||||
|
if current_hash == self._last_seen_hash:
|
||||||
|
return False, self.config
|
||||||
|
|
||||||
|
self._last_seen_hash = current_hash
|
||||||
|
parsed = self.parse_config(raw_data)
|
||||||
|
self.config = parsed
|
||||||
|
self.last_valid_config = parsed
|
||||||
|
return True, parsed
|
||||||
4
src/config_manager/v2/control/__init__.py
Normal file
4
src/config_manager/v2/control/__init__.py
Normal file
@@ -0,0 +1,4 @@
|
|||||||
|
from .base import ControlChannel
|
||||||
|
from .telegram import TelegramControlChannel
|
||||||
|
|
||||||
|
__all__ = ["ControlChannel", "TelegramControlChannel"]
|
||||||
21
src/config_manager/v2/control/base.py
Normal file
21
src/config_manager/v2/control/base.py
Normal file
@@ -0,0 +1,21 @@
|
|||||||
|
from __future__ import annotations
|
||||||
|
|
||||||
|
from abc import ABC, abstractmethod
|
||||||
|
from collections.abc import Awaitable, Callable
|
||||||
|
|
||||||
|
|
||||||
|
StartHandler = Callable[[], Awaitable[str]]
|
||||||
|
StopHandler = Callable[[], Awaitable[str]]
|
||||||
|
StatusHandler = Callable[[], Awaitable[str]]
|
||||||
|
|
||||||
|
|
||||||
|
class ControlChannel(ABC):
|
||||||
|
@abstractmethod
|
||||||
|
async def start(self, on_start: StartHandler, on_stop: StopHandler, on_status: StatusHandler) -> None:
|
||||||
|
"""Start channel and bind command handlers."""
|
||||||
|
raise NotImplementedError
|
||||||
|
|
||||||
|
@abstractmethod
|
||||||
|
async def stop(self) -> None:
|
||||||
|
"""Stop channel and release its resources."""
|
||||||
|
raise NotImplementedError
|
||||||
111
src/config_manager/v2/control/telegram.py
Normal file
111
src/config_manager/v2/control/telegram.py
Normal file
@@ -0,0 +1,111 @@
|
|||||||
|
from __future__ import annotations
|
||||||
|
|
||||||
|
import asyncio
|
||||||
|
import json
|
||||||
|
import logging
|
||||||
|
import urllib.parse
|
||||||
|
import urllib.request
|
||||||
|
from typing import Optional
|
||||||
|
|
||||||
|
from .base import ControlChannel, StartHandler, StatusHandler, StopHandler
|
||||||
|
|
||||||
|
|
||||||
|
class TelegramControlChannel(ControlChannel):
|
||||||
|
def __init__(
|
||||||
|
self,
|
||||||
|
token: str,
|
||||||
|
chat_id: int,
|
||||||
|
poll_interval: float = 2.0,
|
||||||
|
logger: Optional[logging.Logger] = None,
|
||||||
|
):
|
||||||
|
"""Initialize Telegram polling channel with bot and chat settings."""
|
||||||
|
self._token = token
|
||||||
|
self._chat_id = chat_id
|
||||||
|
self._poll_interval = poll_interval
|
||||||
|
self._offset: Optional[int] = None
|
||||||
|
self._task: Optional[asyncio.Task] = None
|
||||||
|
self._stop_event = asyncio.Event()
|
||||||
|
self._on_start: Optional[StartHandler] = None
|
||||||
|
self._on_stop: Optional[StopHandler] = None
|
||||||
|
self._on_status: Optional[StatusHandler] = None
|
||||||
|
self._logger = logger or logging.getLogger(__name__)
|
||||||
|
|
||||||
|
async def start(self, on_start: StartHandler, on_stop: StopHandler, on_status: StatusHandler) -> None:
|
||||||
|
"""Start polling Telegram updates and register command callbacks."""
|
||||||
|
if self._task is not None and not self._task.done():
|
||||||
|
return
|
||||||
|
self._on_start = on_start
|
||||||
|
self._on_stop = on_stop
|
||||||
|
self._on_status = on_status
|
||||||
|
self._stop_event.clear()
|
||||||
|
self._task = asyncio.create_task(self._poll_loop())
|
||||||
|
|
||||||
|
async def stop(self) -> None:
|
||||||
|
"""Stop polling loop and wait until task termination."""
|
||||||
|
self._stop_event.set()
|
||||||
|
if self._task is not None:
|
||||||
|
self._task.cancel()
|
||||||
|
try:
|
||||||
|
await self._task
|
||||||
|
except asyncio.CancelledError:
|
||||||
|
pass
|
||||||
|
self._task = None
|
||||||
|
|
||||||
|
async def _poll_loop(self) -> None:
|
||||||
|
"""Continuously fetch updates and dispatch supported commands."""
|
||||||
|
while not self._stop_event.is_set():
|
||||||
|
try:
|
||||||
|
updates = await asyncio.to_thread(self._fetch_updates)
|
||||||
|
for update in updates:
|
||||||
|
await self._process_update(update)
|
||||||
|
except Exception as exc: # noqa: BLE001
|
||||||
|
self._logger.warning("Telegram polling error: %s", exc)
|
||||||
|
|
||||||
|
try:
|
||||||
|
await asyncio.wait_for(self._stop_event.wait(), timeout=max(self._poll_interval, 0.1))
|
||||||
|
except asyncio.TimeoutError:
|
||||||
|
continue
|
||||||
|
|
||||||
|
def _fetch_updates(self) -> list[dict]:
|
||||||
|
"""Pull new Telegram updates using the latest offset."""
|
||||||
|
params = {"timeout": 0}
|
||||||
|
if self._offset is not None:
|
||||||
|
params["offset"] = self._offset
|
||||||
|
query = urllib.parse.urlencode(params)
|
||||||
|
url = f"https://api.telegram.org/bot{self._token}/getUpdates?{query}"
|
||||||
|
with urllib.request.urlopen(url, timeout=10) as response:
|
||||||
|
payload = json.loads(response.read().decode("utf-8"))
|
||||||
|
|
||||||
|
result = payload.get("result", [])
|
||||||
|
if result:
|
||||||
|
self._offset = max(item["update_id"] for item in result) + 1
|
||||||
|
return result
|
||||||
|
|
||||||
|
async def _process_update(self, update: dict) -> None:
|
||||||
|
"""Handle one Telegram update and execute mapped command."""
|
||||||
|
message = update.get("message") or {}
|
||||||
|
text = (message.get("text") or "").strip().lower()
|
||||||
|
chat = message.get("chat") or {}
|
||||||
|
chat_id = chat.get("id")
|
||||||
|
|
||||||
|
if chat_id != self._chat_id:
|
||||||
|
return
|
||||||
|
|
||||||
|
if text in {"/start", "/run"} and self._on_start is not None:
|
||||||
|
reply = await self._on_start()
|
||||||
|
elif text in {"/stop", "/halt"} and self._on_stop is not None:
|
||||||
|
reply = await self._on_stop()
|
||||||
|
elif text in {"/status", "/health"} and self._on_status is not None:
|
||||||
|
reply = await self._on_status()
|
||||||
|
else:
|
||||||
|
return
|
||||||
|
|
||||||
|
await asyncio.to_thread(self._send_message, reply)
|
||||||
|
|
||||||
|
def _send_message(self, text: str) -> None:
|
||||||
|
"""Send plain-text reply to the configured Telegram chat."""
|
||||||
|
encoded = urllib.parse.urlencode({"chat_id": self._chat_id, "text": text})
|
||||||
|
url = f"https://api.telegram.org/bot{self._token}/sendMessage"
|
||||||
|
req = urllib.request.Request(url, data=encoded.encode("utf-8"), method="POST")
|
||||||
|
with urllib.request.urlopen(req, timeout=10):
|
||||||
|
return
|
||||||
80
src/config_manager/v2/health.py
Normal file
80
src/config_manager/v2/health.py
Normal file
@@ -0,0 +1,80 @@
|
|||||||
|
from __future__ import annotations
|
||||||
|
|
||||||
|
import asyncio
|
||||||
|
import json
|
||||||
|
from collections.abc import Awaitable, Callable
|
||||||
|
from typing import Optional
|
||||||
|
|
||||||
|
from .types import HealthPayload
|
||||||
|
|
||||||
|
|
||||||
|
class HealthServer:
|
||||||
|
def __init__(
|
||||||
|
self,
|
||||||
|
host: str,
|
||||||
|
port: int,
|
||||||
|
path: str,
|
||||||
|
timeout: float,
|
||||||
|
health_provider: Callable[[], Awaitable[HealthPayload]],
|
||||||
|
):
|
||||||
|
"""Configure lightweight HTTP health server parameters and callback."""
|
||||||
|
self._host = host
|
||||||
|
self._port = port
|
||||||
|
self._path = path
|
||||||
|
self._timeout = timeout
|
||||||
|
self._health_provider = health_provider
|
||||||
|
self._server: Optional[asyncio.base_events.Server] = None
|
||||||
|
|
||||||
|
async def start(self) -> None:
|
||||||
|
"""Start listening for healthcheck requests if not running."""
|
||||||
|
if self._server is not None:
|
||||||
|
return
|
||||||
|
self._server = await asyncio.start_server(self._handle_connection, self._host, self._port)
|
||||||
|
|
||||||
|
async def stop(self) -> None:
|
||||||
|
"""Stop the health server and release the listening socket."""
|
||||||
|
if self._server is None:
|
||||||
|
return
|
||||||
|
self._server.close()
|
||||||
|
await self._server.wait_closed()
|
||||||
|
self._server = None
|
||||||
|
|
||||||
|
async def _handle_connection(self, reader: asyncio.StreamReader, writer: asyncio.StreamWriter) -> None:
|
||||||
|
"""Process one HTTP connection and return JSON health payload."""
|
||||||
|
status_code = 404
|
||||||
|
payload: HealthPayload = {"status": "unhealthy", "detail": "not found"}
|
||||||
|
|
||||||
|
try:
|
||||||
|
request_line = await reader.readline()
|
||||||
|
parts = request_line.decode("utf-8", errors="ignore").strip().split(" ")
|
||||||
|
if len(parts) >= 2:
|
||||||
|
method, path = parts[0], parts[1]
|
||||||
|
if method == "GET" and path == self._path:
|
||||||
|
status_code, payload = await self._build_health_response()
|
||||||
|
except Exception: # noqa: BLE001
|
||||||
|
status_code = 500
|
||||||
|
payload = {"status": "unhealthy", "detail": "internal error"}
|
||||||
|
|
||||||
|
body = json.dumps(payload).encode("utf-8")
|
||||||
|
phrase = "OK" if status_code == 200 else "SERVICE UNAVAILABLE" if status_code == 503 else "NOT FOUND"
|
||||||
|
response = (
|
||||||
|
f"HTTP/1.1 {status_code} {phrase}\r\n"
|
||||||
|
"Content-Type: application/json\r\n"
|
||||||
|
f"Content-Length: {len(body)}\r\n"
|
||||||
|
"Connection: close\r\n"
|
||||||
|
"\r\n"
|
||||||
|
).encode("utf-8") + body
|
||||||
|
|
||||||
|
writer.write(response)
|
||||||
|
await writer.drain()
|
||||||
|
writer.close()
|
||||||
|
await writer.wait_closed()
|
||||||
|
|
||||||
|
async def _build_health_response(self) -> tuple[int, HealthPayload]:
|
||||||
|
"""Build HTTP status code and body from application health callback."""
|
||||||
|
try:
|
||||||
|
payload = await asyncio.wait_for(self._health_provider(), timeout=self._timeout)
|
||||||
|
status = payload.get("status", "unhealthy")
|
||||||
|
return (200, payload) if status == "ok" else (503, payload)
|
||||||
|
except Exception as exc: # noqa: BLE001
|
||||||
|
return 503, {"status": "unhealthy", "detail": str(exc)}
|
||||||
256
src/config_manager/v2/manager.py
Normal file
256
src/config_manager/v2/manager.py
Normal file
@@ -0,0 +1,256 @@
|
|||||||
|
from __future__ import annotations
|
||||||
|
|
||||||
|
import asyncio
|
||||||
|
import logging
|
||||||
|
from collections.abc import Awaitable
|
||||||
|
from typing import Any, Optional
|
||||||
|
|
||||||
|
from ..v1.log_manager import LogManager
|
||||||
|
from .config_loader import ConfigLoader
|
||||||
|
from .control.base import ControlChannel
|
||||||
|
from .health import HealthServer
|
||||||
|
from .scheduler import WorkerLoop
|
||||||
|
from .types import HealthPayload, HealthServerSettings, LifecycleState
|
||||||
|
|
||||||
|
|
||||||
|
class ConfigManagerV2:
|
||||||
|
DEFAULT_UPDATE_INTERVAL = 5.0
|
||||||
|
DEFAULT_WORK_INTERVAL = 2.0
|
||||||
|
|
||||||
|
def __init__(
|
||||||
|
self,
|
||||||
|
path: str,
|
||||||
|
log_manager: Optional[LogManager] = None,
|
||||||
|
health_settings: Optional[HealthServerSettings] = None,
|
||||||
|
control_channel: Optional[ControlChannel] = None,
|
||||||
|
):
|
||||||
|
"""Initialize manager subsystems and runtime state."""
|
||||||
|
self.path = path
|
||||||
|
self.config: Any = None
|
||||||
|
self.update_interval = self.DEFAULT_UPDATE_INTERVAL
|
||||||
|
self.work_interval = self.DEFAULT_WORK_INTERVAL
|
||||||
|
|
||||||
|
self._loader = ConfigLoader(path)
|
||||||
|
self._log_manager = log_manager or LogManager()
|
||||||
|
self._control_channel = control_channel
|
||||||
|
|
||||||
|
self._halt = asyncio.Event()
|
||||||
|
self._task: Optional[asyncio.Task] = None
|
||||||
|
self._loop: Optional[asyncio.AbstractEventLoop] = None
|
||||||
|
|
||||||
|
self._state = LifecycleState.IDLE
|
||||||
|
self._last_execute_error: Optional[str] = None
|
||||||
|
|
||||||
|
self._health_settings = health_settings or HealthServerSettings(enabled=False)
|
||||||
|
self._health_server: Optional[HealthServer] = None
|
||||||
|
if self._health_settings.enabled:
|
||||||
|
self._health_server = HealthServer(
|
||||||
|
host=self._health_settings.host,
|
||||||
|
port=self._health_settings.port,
|
||||||
|
path=self._health_settings.path,
|
||||||
|
timeout=self._health_settings.timeout,
|
||||||
|
health_provider=self._collect_health,
|
||||||
|
)
|
||||||
|
|
||||||
|
self.logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
def _read_file_sync(self) -> str:
|
||||||
|
"""Read config file synchronously via the shared loader."""
|
||||||
|
return self._loader._read_file_sync()
|
||||||
|
|
||||||
|
async def _read_file_async(self) -> str:
|
||||||
|
"""Read config file asynchronously via a worker thread."""
|
||||||
|
return await self._loader.read_file_async()
|
||||||
|
|
||||||
|
def _parse_config(self, data: str) -> Any:
|
||||||
|
"""Parse raw config text to a Python object."""
|
||||||
|
return self._loader.parse_config(data)
|
||||||
|
|
||||||
|
def _update_intervals_from_config(self) -> None:
|
||||||
|
"""Refresh work and update intervals from current config."""
|
||||||
|
if not isinstance(self.config, dict):
|
||||||
|
return
|
||||||
|
|
||||||
|
upd = self.config.get("update_interval")
|
||||||
|
wrk = self.config.get("work_interval")
|
||||||
|
|
||||||
|
if isinstance(upd, (int, float)) and upd > 0:
|
||||||
|
self.update_interval = float(upd)
|
||||||
|
else:
|
||||||
|
self.update_interval = self.DEFAULT_UPDATE_INTERVAL
|
||||||
|
|
||||||
|
if isinstance(wrk, (int, float)) and wrk > 0:
|
||||||
|
self.work_interval = float(wrk)
|
||||||
|
else:
|
||||||
|
self.work_interval = self.DEFAULT_WORK_INTERVAL
|
||||||
|
|
||||||
|
async def _update_config(self) -> None:
|
||||||
|
"""Reload config and apply new settings when content changes."""
|
||||||
|
try:
|
||||||
|
changed, new_config = await self._loader.load_if_changed()
|
||||||
|
if not changed:
|
||||||
|
return
|
||||||
|
|
||||||
|
self.config = new_config
|
||||||
|
self._update_intervals_from_config()
|
||||||
|
if isinstance(new_config, dict):
|
||||||
|
self._log_manager.apply_config(new_config)
|
||||||
|
except Exception as exc: # noqa: BLE001
|
||||||
|
# Keep current config untouched and continue with last valid settings.
|
||||||
|
self.logger.error("Error reading/parsing config file: %s", exc)
|
||||||
|
if self._loader.last_valid_config is not None:
|
||||||
|
self.config = self._loader.last_valid_config
|
||||||
|
self._update_intervals_from_config()
|
||||||
|
|
||||||
|
def execute(self) -> None:
|
||||||
|
"""Override in subclass to implement one unit of blocking work."""
|
||||||
|
|
||||||
|
def get_health_status(self) -> HealthPayload:
|
||||||
|
"""Return application-specific health payload for /health."""
|
||||||
|
return {"status": "ok"}
|
||||||
|
|
||||||
|
async def _collect_health(self) -> HealthPayload:
|
||||||
|
"""Aggregate lifecycle and app status into one health result."""
|
||||||
|
if self._state not in {LifecycleState.RUNNING, LifecycleState.STOPPING}:
|
||||||
|
return {"status": "unhealthy", "detail": f"state={self._state.value}"}
|
||||||
|
|
||||||
|
if self._last_execute_error is not None:
|
||||||
|
return {"status": "unhealthy", "detail": self._last_execute_error}
|
||||||
|
|
||||||
|
result = self.get_health_status()
|
||||||
|
status = result.get("status", "unhealthy")
|
||||||
|
if status not in {"ok", "degraded", "unhealthy"}:
|
||||||
|
return {"status": "unhealthy", "detail": "invalid health status"}
|
||||||
|
return result
|
||||||
|
|
||||||
|
def _on_execute_success(self) -> None:
|
||||||
|
"""Clear the last execution error marker after successful run."""
|
||||||
|
self._last_execute_error = None
|
||||||
|
|
||||||
|
def _on_execute_error(self, exc: Exception) -> None:
|
||||||
|
"""Store and log execution failure details for health reporting."""
|
||||||
|
self._last_execute_error = str(exc)
|
||||||
|
self.logger.error("Execution error: %s", exc)
|
||||||
|
|
||||||
|
async def _worker_loop(self) -> None:
|
||||||
|
"""Run execute() repeatedly until shutdown is requested."""
|
||||||
|
worker = WorkerLoop(
|
||||||
|
execute=self.execute,
|
||||||
|
get_interval=lambda: self.work_interval,
|
||||||
|
halt_event=self._halt,
|
||||||
|
on_error=self._on_execute_error,
|
||||||
|
on_success=self._on_execute_success,
|
||||||
|
)
|
||||||
|
await worker.run()
|
||||||
|
|
||||||
|
async def _periodic_update_loop(self) -> None:
|
||||||
|
"""Periodically check config file for updates until shutdown."""
|
||||||
|
while not self._halt.is_set():
|
||||||
|
await self._update_config()
|
||||||
|
try:
|
||||||
|
await asyncio.wait_for(self._halt.wait(), timeout=max(self.update_interval, 0.05))
|
||||||
|
except asyncio.TimeoutError:
|
||||||
|
continue
|
||||||
|
|
||||||
|
async def _status_text(self) -> str:
|
||||||
|
"""Build human-readable runtime status for control channels."""
|
||||||
|
health = await self._collect_health()
|
||||||
|
detail = health.get("detail")
|
||||||
|
if detail:
|
||||||
|
return f"state={self._state.value}; health={health['status']}; detail={detail}"
|
||||||
|
return f"state={self._state.value}; health={health['status']}"
|
||||||
|
|
||||||
|
async def _control_start(self) -> str:
|
||||||
|
"""Handle external start command for control channels."""
|
||||||
|
if self._state == LifecycleState.RUNNING:
|
||||||
|
return "already running"
|
||||||
|
self._halt.clear()
|
||||||
|
return "start signal accepted"
|
||||||
|
|
||||||
|
async def _control_stop(self) -> str:
|
||||||
|
"""Handle external stop command for control channels."""
|
||||||
|
self._halt.set()
|
||||||
|
return "stop signal accepted"
|
||||||
|
|
||||||
|
async def _control_status(self) -> str:
|
||||||
|
"""Handle external status command for control channels."""
|
||||||
|
return await self._status_text()
|
||||||
|
|
||||||
|
async def _start_control_channel(self) -> None:
|
||||||
|
"""Start configured control channel with bound command handlers."""
|
||||||
|
if self._control_channel is None:
|
||||||
|
return
|
||||||
|
await self._control_channel.start(self._control_start, self._control_stop, self._control_status)
|
||||||
|
|
||||||
|
async def _stop_control_channel(self) -> None:
|
||||||
|
"""Stop configured control channel if it is active."""
|
||||||
|
if self._control_channel is None:
|
||||||
|
return
|
||||||
|
await self._control_channel.stop()
|
||||||
|
|
||||||
|
async def _run(self) -> None:
|
||||||
|
"""Run manager lifecycle and coordinate all background tasks."""
|
||||||
|
self._state = LifecycleState.STARTING
|
||||||
|
self._halt.clear()
|
||||||
|
await self._update_config()
|
||||||
|
|
||||||
|
if self._health_server is not None:
|
||||||
|
await self._health_server.start()
|
||||||
|
await self._start_control_channel()
|
||||||
|
|
||||||
|
self._state = LifecycleState.RUNNING
|
||||||
|
self.logger.info("ConfigManagerV2 started")
|
||||||
|
|
||||||
|
tasks = [
|
||||||
|
asyncio.create_task(self._worker_loop(), name="v2-worker-loop"),
|
||||||
|
asyncio.create_task(self._periodic_update_loop(), name="v2-config-loop"),
|
||||||
|
]
|
||||||
|
|
||||||
|
try:
|
||||||
|
await asyncio.gather(*tasks)
|
||||||
|
except asyncio.CancelledError:
|
||||||
|
raise
|
||||||
|
finally:
|
||||||
|
self._state = LifecycleState.STOPPING
|
||||||
|
self._halt.set()
|
||||||
|
for task in tasks:
|
||||||
|
task.cancel()
|
||||||
|
await asyncio.gather(*tasks, return_exceptions=True)
|
||||||
|
await self._stop_control_channel()
|
||||||
|
if self._health_server is not None:
|
||||||
|
await self._health_server.stop()
|
||||||
|
self._state = LifecycleState.STOPPED
|
||||||
|
self.logger.info("ConfigManagerV2 stopped")
|
||||||
|
|
||||||
|
async def start(self) -> None:
|
||||||
|
"""Start manager lifecycle from an active asyncio context."""
|
||||||
|
if self._task is not None and not self._task.done():
|
||||||
|
self.logger.warning("ConfigManagerV2 is already running")
|
||||||
|
return
|
||||||
|
|
||||||
|
try:
|
||||||
|
self._loop = asyncio.get_running_loop()
|
||||||
|
except RuntimeError:
|
||||||
|
self.logger.error("start() must be called from within an async context")
|
||||||
|
raise
|
||||||
|
|
||||||
|
self._task = asyncio.create_task(self._run(), name="config-manager-v2")
|
||||||
|
try:
|
||||||
|
await self._task
|
||||||
|
finally:
|
||||||
|
self._task = None
|
||||||
|
|
||||||
|
async def stop(self) -> None:
|
||||||
|
"""Request graceful shutdown and wait for manager completion."""
|
||||||
|
if self._task is None:
|
||||||
|
self.logger.warning("ConfigManagerV2 is not running")
|
||||||
|
return
|
||||||
|
|
||||||
|
self._halt.set()
|
||||||
|
if asyncio.current_task() is self._task:
|
||||||
|
return
|
||||||
|
|
||||||
|
try:
|
||||||
|
await self._task
|
||||||
|
except asyncio.CancelledError:
|
||||||
|
pass
|
||||||
39
src/config_manager/v2/scheduler.py
Normal file
39
src/config_manager/v2/scheduler.py
Normal file
@@ -0,0 +1,39 @@
|
|||||||
|
from __future__ import annotations
|
||||||
|
|
||||||
|
import asyncio
|
||||||
|
from collections.abc import Callable
|
||||||
|
from typing import Optional
|
||||||
|
|
||||||
|
|
||||||
|
class WorkerLoop:
|
||||||
|
def __init__(
|
||||||
|
self,
|
||||||
|
execute: Callable[[], None],
|
||||||
|
get_interval: Callable[[], float],
|
||||||
|
halt_event: asyncio.Event,
|
||||||
|
on_error: Optional[Callable[[Exception], None]] = None,
|
||||||
|
on_success: Optional[Callable[[], None]] = None,
|
||||||
|
):
|
||||||
|
"""Store callbacks and synchronization primitives for worker execution."""
|
||||||
|
self._execute = execute
|
||||||
|
self._get_interval = get_interval
|
||||||
|
self._halt_event = halt_event
|
||||||
|
self._on_error = on_error
|
||||||
|
self._on_success = on_success
|
||||||
|
|
||||||
|
async def run(self) -> None:
|
||||||
|
"""Run execute repeatedly until halt is requested."""
|
||||||
|
while not self._halt_event.is_set():
|
||||||
|
try:
|
||||||
|
await asyncio.to_thread(self._execute)
|
||||||
|
if self._on_success is not None:
|
||||||
|
self._on_success()
|
||||||
|
except Exception as exc: # noqa: BLE001
|
||||||
|
if self._on_error is not None:
|
||||||
|
self._on_error(exc)
|
||||||
|
|
||||||
|
timeout = max(self._get_interval(), 0.01)
|
||||||
|
try:
|
||||||
|
await asyncio.wait_for(self._halt_event.wait(), timeout=timeout)
|
||||||
|
except asyncio.TimeoutError:
|
||||||
|
continue
|
||||||
30
src/config_manager/v2/types.py
Normal file
30
src/config_manager/v2/types.py
Normal file
@@ -0,0 +1,30 @@
|
|||||||
|
from __future__ import annotations
|
||||||
|
|
||||||
|
from dataclasses import dataclass
|
||||||
|
from enum import Enum
|
||||||
|
from typing import Literal, TypedDict
|
||||||
|
|
||||||
|
|
||||||
|
HealthState = Literal["ok", "degraded", "unhealthy"]
|
||||||
|
|
||||||
|
|
||||||
|
class HealthPayload(TypedDict, total=False):
|
||||||
|
status: HealthState
|
||||||
|
detail: str
|
||||||
|
|
||||||
|
|
||||||
|
class LifecycleState(str, Enum):
|
||||||
|
IDLE = "idle"
|
||||||
|
STARTING = "starting"
|
||||||
|
RUNNING = "running"
|
||||||
|
STOPPING = "stopping"
|
||||||
|
STOPPED = "stopped"
|
||||||
|
|
||||||
|
|
||||||
|
@dataclass
|
||||||
|
class HealthServerSettings:
|
||||||
|
enabled: bool = False
|
||||||
|
host: str = "0.0.0.0"
|
||||||
|
port: int = 8000
|
||||||
|
path: str = "/health"
|
||||||
|
timeout: float = 3.0
|
||||||
53
tests/config.yaml
Normal file
53
tests/config.yaml
Normal file
@@ -0,0 +1,53 @@
|
|||||||
|
# === Раздел с общими конфигурационными параметрами ===
|
||||||
|
runtime: 5
|
||||||
|
|
||||||
|
# === Логирование ===
|
||||||
|
log:
|
||||||
|
version: 1
|
||||||
|
disable_existing_loggers: False
|
||||||
|
|
||||||
|
formatters:
|
||||||
|
standard:
|
||||||
|
format: '%(asctime)s %(module)15s [%(levelname)8s]: %(message)s'
|
||||||
|
telegram:
|
||||||
|
format: '%(message)s'
|
||||||
|
|
||||||
|
handlers:
|
||||||
|
console:
|
||||||
|
level: DEBUG
|
||||||
|
formatter: standard
|
||||||
|
class: logging.StreamHandler
|
||||||
|
stream: ext://sys.stdout # Default is stderr
|
||||||
|
|
||||||
|
file:
|
||||||
|
level: DEBUG
|
||||||
|
formatter: standard
|
||||||
|
class: logging.handlers.RotatingFileHandler
|
||||||
|
filename: logs/log.log
|
||||||
|
mode: a
|
||||||
|
maxBytes: 500000
|
||||||
|
backupCount: 15
|
||||||
|
|
||||||
|
#telegram:
|
||||||
|
# level: CRITICAL
|
||||||
|
# formatter: telegram
|
||||||
|
# class: logging_telegram_handler.TelegramHandler
|
||||||
|
# chat_id: 211945135
|
||||||
|
# alias: "PDC"
|
||||||
|
|
||||||
|
|
||||||
|
# -- Логгеры --
|
||||||
|
loggers:
|
||||||
|
'':
|
||||||
|
handlers: [console, file]
|
||||||
|
level: INFO
|
||||||
|
propagate: False
|
||||||
|
|
||||||
|
__main__:
|
||||||
|
handlers: [console, file]
|
||||||
|
level: DEBUG
|
||||||
|
propagate: False
|
||||||
|
|
||||||
|
config_manager:
|
||||||
|
handlers: [console, file]
|
||||||
|
level: DEBUG
|
||||||
131
tests/test.py
131
tests/test.py
@@ -1,131 +0,0 @@
|
|||||||
import unittest
|
|
||||||
from unittest.mock import patch, mock_open, AsyncMock
|
|
||||||
import asyncio
|
|
||||||
import logging
|
|
||||||
import io
|
|
||||||
import json
|
|
||||||
import yaml
|
|
||||||
|
|
||||||
import sys
|
|
||||||
import os
|
|
||||||
|
|
||||||
sys.path.append(os.path.join(os.path.dirname(__file__), '..', 'src'))
|
|
||||||
|
|
||||||
from basic_application.basic_application import ConfigManager
|
|
||||||
|
|
||||||
|
|
||||||
class TestConfigManager(unittest.IsolatedAsyncioTestCase):
|
|
||||||
def setUp(self):
|
|
||||||
self.json_data = json.dumps({
|
|
||||||
"work_interval": 1,
|
|
||||||
"update_interval": 1,
|
|
||||||
"logging": {
|
|
||||||
"version": 1,
|
|
||||||
"handlers": {"console": {"class": "logging.StreamHandler", "level": "DEBUG"}},
|
|
||||||
"root": {"handlers": ["console"], "level": "DEBUG"}
|
|
||||||
},
|
|
||||||
"some_key": "some_value"
|
|
||||||
})
|
|
||||||
self.yaml_data = """
|
|
||||||
work_interval: 1
|
|
||||||
update_interval: 1
|
|
||||||
logging:
|
|
||||||
version: 1
|
|
||||||
handlers:
|
|
||||||
console:
|
|
||||||
class: logging.StreamHandler
|
|
||||||
level: DEBUG
|
|
||||||
root:
|
|
||||||
handlers: [console]
|
|
||||||
level: DEBUG
|
|
||||||
some_key: some_value
|
|
||||||
"""
|
|
||||||
|
|
||||||
@patch("builtins.open", new_callable=mock_open, read_data="")
|
|
||||||
async def test_read_file_async_json(self, mock_file):
|
|
||||||
mock_file.return_value.read = lambda: self.json_data
|
|
||||||
cm = ConfigManager("config.json")
|
|
||||||
content = await cm._read_file_async()
|
|
||||||
self.assertEqual(content, self.json_data)
|
|
||||||
|
|
||||||
@patch("builtins.open", new_callable=mock_open, read_data="")
|
|
||||||
async def test_read_file_async_yaml(self, mock_file):
|
|
||||||
mock_file.return_value.read = lambda: self.yaml_data
|
|
||||||
cm = ConfigManager("config.yaml")
|
|
||||||
content = await cm._read_file_async()
|
|
||||||
self.assertEqual(content, self.yaml_data)
|
|
||||||
|
|
||||||
def test_parse_json(self):
|
|
||||||
cm = ConfigManager("config.json")
|
|
||||||
parsed = cm._parse_config(self.json_data)
|
|
||||||
self.assertIsInstance(parsed, dict)
|
|
||||||
self.assertEqual(parsed["some_key"], "some_value")
|
|
||||||
|
|
||||||
def test_parse_yaml(self):
|
|
||||||
cm = ConfigManager("config.yaml")
|
|
||||||
parsed = cm._parse_config(self.yaml_data)
|
|
||||||
self.assertIsInstance(parsed, dict)
|
|
||||||
self.assertEqual(parsed["some_key"], "some_value")
|
|
||||||
|
|
||||||
@patch("basic_application.basic_application.logging.config.dictConfig")
|
|
||||||
def test_apply_logging_config(self, mock_dict_config):
|
|
||||||
cm = ConfigManager("config.json")
|
|
||||||
cm._apply_logging_config({"logging": {"version": 1}})
|
|
||||||
mock_dict_config.assert_called_once()
|
|
||||||
|
|
||||||
async def test_update_config_changes_config_and_intervals(self):
|
|
||||||
# Мокаем чтение файла
|
|
||||||
m = mock_open(read_data=self.json_data)
|
|
||||||
with patch("builtins.open", m):
|
|
||||||
cm = ConfigManager("config.json")
|
|
||||||
|
|
||||||
# Проверяем исходные интервалы
|
|
||||||
self.assertEqual(cm.update_interval, cm.DEFAULT_UPDATE_INTERVAL)
|
|
||||||
self.assertEqual(cm.work_interval, cm.DEFAULT_WORK_INTERVAL)
|
|
||||||
|
|
||||||
await cm._update_config()
|
|
||||||
|
|
||||||
# После обновления данные заполнены
|
|
||||||
self.assertIsInstance(cm.config, dict)
|
|
||||||
self.assertEqual(cm.update_interval, 1.0)
|
|
||||||
self.assertEqual(cm.work_interval, 1.0)
|
|
||||||
|
|
||||||
async def test_execute_called_in_worker_loop(self):
|
|
||||||
called = False
|
|
||||||
|
|
||||||
class TestCM(ConfigManager):
|
|
||||||
def execute(self2):
|
|
||||||
nonlocal called
|
|
||||||
called = True
|
|
||||||
|
|
||||||
cm = TestCM("config.json")
|
|
||||||
|
|
||||||
async def stop_after_delay():
|
|
||||||
await asyncio.sleep(0.1)
|
|
||||||
cm.stop()
|
|
||||||
|
|
||||||
# Запускаем worker_loop и через 0.1 сек останавливаем
|
|
||||||
await asyncio.gather(cm._worker_loop(), stop_after_delay())
|
|
||||||
|
|
||||||
self.assertTrue(called)
|
|
||||||
|
|
||||||
async def test_periodic_update_loop_runs(self):
|
|
||||||
count = 0
|
|
||||||
|
|
||||||
class TestCM(ConfigManager):
|
|
||||||
async def _update_config(self2):
|
|
||||||
nonlocal count
|
|
||||||
count += 1
|
|
||||||
if count >= 2:
|
|
||||||
self2.stop()
|
|
||||||
|
|
||||||
cm = TestCM("config.json")
|
|
||||||
|
|
||||||
await cm._periodic_update_loop()
|
|
||||||
|
|
||||||
self.assertGreaterEqual(count, 2)
|
|
||||||
|
|
||||||
|
|
||||||
if __name__ == "__main__":
|
|
||||||
logging.basicConfig(level=logging.WARNING) # отключаем логи во время тестов
|
|
||||||
unittest.main()
|
|
||||||
34
tests/test_app.py
Normal file
34
tests/test_app.py
Normal file
@@ -0,0 +1,34 @@
|
|||||||
|
#import os
|
||||||
|
#os.chdir(os.path.dirname(__file__))
|
||||||
|
|
||||||
|
from config_manager import ConfigManager
|
||||||
|
import logging
|
||||||
|
import asyncio
|
||||||
|
from typing import Optional
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
logger = logging.getLogger()
|
||||||
|
|
||||||
|
class MyApp(ConfigManager):
|
||||||
|
def __init__(self, *args, **kwargs):
|
||||||
|
super().__init__(*args, **kwargs)
|
||||||
|
self.iter = 0
|
||||||
|
|
||||||
|
def execute(self) -> None:
|
||||||
|
logger.info(f"current iteration {self.iter}")
|
||||||
|
self.iter += 1
|
||||||
|
|
||||||
|
async def main():
|
||||||
|
app = MyApp("config.yaml")
|
||||||
|
logger.info("App started")
|
||||||
|
await app.start()
|
||||||
|
|
||||||
|
logger.info("App finished")
|
||||||
|
|
||||||
|
while True:
|
||||||
|
await asyncio.sleep(1)
|
||||||
|
|
||||||
|
if __name__ == "__main__":
|
||||||
|
asyncio.run(main())
|
||||||
|
|
||||||
33
tests/v2/test_config_reload_fallback.py
Normal file
33
tests/v2/test_config_reload_fallback.py
Normal file
@@ -0,0 +1,33 @@
|
|||||||
|
import asyncio
|
||||||
|
|
||||||
|
from config_manager.v2 import ConfigManagerV2
|
||||||
|
|
||||||
|
|
||||||
|
class ReloadApp(ConfigManagerV2):
|
||||||
|
def execute(self) -> None:
|
||||||
|
return
|
||||||
|
|
||||||
|
|
||||||
|
def test_invalid_config_keeps_last_valid(tmp_path):
|
||||||
|
async def scenario() -> None:
|
||||||
|
cfg = tmp_path / "config.yaml"
|
||||||
|
cfg.write_text("work_interval: 0.2\nupdate_interval: 0.05\n", encoding="utf-8")
|
||||||
|
|
||||||
|
app = ReloadApp(str(cfg))
|
||||||
|
runner = asyncio.create_task(app.start())
|
||||||
|
|
||||||
|
await asyncio.sleep(0.12)
|
||||||
|
assert app.work_interval == 0.2
|
||||||
|
assert app.update_interval == 0.05
|
||||||
|
|
||||||
|
cfg.write_text("work_interval: [broken\n", encoding="utf-8")
|
||||||
|
await asyncio.sleep(0.15)
|
||||||
|
|
||||||
|
assert app.work_interval == 0.2
|
||||||
|
assert app.update_interval == 0.05
|
||||||
|
assert isinstance(app.config, dict)
|
||||||
|
|
||||||
|
await app.stop()
|
||||||
|
await runner
|
||||||
|
|
||||||
|
asyncio.run(scenario())
|
||||||
30
tests/v2/test_contract_v2.py
Normal file
30
tests/v2/test_contract_v2.py
Normal file
@@ -0,0 +1,30 @@
|
|||||||
|
import asyncio
|
||||||
|
|
||||||
|
from config_manager.v2 import ConfigManagerV2
|
||||||
|
|
||||||
|
|
||||||
|
class DemoApp(ConfigManagerV2):
|
||||||
|
def __init__(self, *args, **kwargs):
|
||||||
|
super().__init__(*args, **kwargs)
|
||||||
|
self.calls = 0
|
||||||
|
|
||||||
|
def execute(self) -> None:
|
||||||
|
self.calls += 1
|
||||||
|
|
||||||
|
|
||||||
|
def test_execute_loop_runs(tmp_path):
|
||||||
|
async def scenario() -> None:
|
||||||
|
cfg = tmp_path / "config.yaml"
|
||||||
|
cfg.write_text("work_interval: 0.05\nupdate_interval: 0.05\n", encoding="utf-8")
|
||||||
|
|
||||||
|
app = DemoApp(str(cfg))
|
||||||
|
runner = asyncio.create_task(app.start())
|
||||||
|
|
||||||
|
await asyncio.sleep(0.18)
|
||||||
|
await app.stop()
|
||||||
|
await runner
|
||||||
|
|
||||||
|
assert app.calls >= 2
|
||||||
|
assert isinstance(app.config, dict)
|
||||||
|
|
||||||
|
asyncio.run(scenario())
|
||||||
54
tests/v2/test_control_channel.py
Normal file
54
tests/v2/test_control_channel.py
Normal file
@@ -0,0 +1,54 @@
|
|||||||
|
import asyncio
|
||||||
|
|
||||||
|
from config_manager.v2 import ConfigManagerV2
|
||||||
|
from config_manager.v2.control.base import ControlChannel, StartHandler, StatusHandler, StopHandler
|
||||||
|
|
||||||
|
|
||||||
|
class DummyControlChannel(ControlChannel):
|
||||||
|
def __init__(self):
|
||||||
|
self.on_start: StartHandler | None = None
|
||||||
|
self.on_stop: StopHandler | None = None
|
||||||
|
self.on_status: StatusHandler | None = None
|
||||||
|
self.started = False
|
||||||
|
self.stopped = False
|
||||||
|
|
||||||
|
async def start(self, on_start: StartHandler, on_stop: StopHandler, on_status: StatusHandler) -> None:
|
||||||
|
self.on_start = on_start
|
||||||
|
self.on_stop = on_stop
|
||||||
|
self.on_status = on_status
|
||||||
|
self.started = True
|
||||||
|
|
||||||
|
async def stop(self) -> None:
|
||||||
|
self.stopped = True
|
||||||
|
|
||||||
|
|
||||||
|
class ControlledApp(ConfigManagerV2):
|
||||||
|
def execute(self) -> None:
|
||||||
|
return
|
||||||
|
|
||||||
|
|
||||||
|
def test_control_channel_can_stop_manager(tmp_path):
|
||||||
|
async def scenario() -> None:
|
||||||
|
cfg = tmp_path / "config.yaml"
|
||||||
|
cfg.write_text("work_interval: 0.05\nupdate_interval: 0.05\n", encoding="utf-8")
|
||||||
|
|
||||||
|
channel = DummyControlChannel()
|
||||||
|
app = ControlledApp(str(cfg), control_channel=channel)
|
||||||
|
|
||||||
|
runner = asyncio.create_task(app.start())
|
||||||
|
await asyncio.sleep(0.12)
|
||||||
|
|
||||||
|
assert channel.started is True
|
||||||
|
assert channel.on_status is not None
|
||||||
|
assert channel.on_stop is not None
|
||||||
|
|
||||||
|
status_text = await channel.on_status()
|
||||||
|
assert "state=running" in status_text
|
||||||
|
|
||||||
|
stop_text = await channel.on_stop()
|
||||||
|
assert "stop signal accepted" in stop_text
|
||||||
|
|
||||||
|
await runner
|
||||||
|
assert channel.stopped is True
|
||||||
|
|
||||||
|
asyncio.run(scenario())
|
||||||
41
tests/v2/test_health_endpoint.py
Normal file
41
tests/v2/test_health_endpoint.py
Normal file
@@ -0,0 +1,41 @@
|
|||||||
|
import asyncio
|
||||||
|
|
||||||
|
from config_manager.v2.health import HealthServer
|
||||||
|
|
||||||
|
|
||||||
|
def test_health_mapping_ok_to_200():
|
||||||
|
async def provider():
|
||||||
|
return {"status": "ok"}
|
||||||
|
|
||||||
|
async def scenario() -> None:
|
||||||
|
server = HealthServer(
|
||||||
|
host="127.0.0.1",
|
||||||
|
port=8000,
|
||||||
|
path="/health",
|
||||||
|
timeout=0.2,
|
||||||
|
health_provider=provider,
|
||||||
|
)
|
||||||
|
code, payload = await server._build_health_response()
|
||||||
|
assert code == 200
|
||||||
|
assert payload["status"] == "ok"
|
||||||
|
|
||||||
|
asyncio.run(scenario())
|
||||||
|
|
||||||
|
|
||||||
|
def test_health_mapping_unhealthy_to_503():
|
||||||
|
async def provider():
|
||||||
|
return {"status": "unhealthy", "detail": "worker failed"}
|
||||||
|
|
||||||
|
async def scenario() -> None:
|
||||||
|
server = HealthServer(
|
||||||
|
host="127.0.0.1",
|
||||||
|
port=8000,
|
||||||
|
path="/health",
|
||||||
|
timeout=0.2,
|
||||||
|
health_provider=provider,
|
||||||
|
)
|
||||||
|
code, payload = await server._build_health_response()
|
||||||
|
assert code == 503
|
||||||
|
assert payload["status"] == "unhealthy"
|
||||||
|
|
||||||
|
asyncio.run(scenario())
|
||||||
43
tests/v2/test_stop_graceful.py
Normal file
43
tests/v2/test_stop_graceful.py
Normal file
@@ -0,0 +1,43 @@
|
|||||||
|
import asyncio
|
||||||
|
import threading
|
||||||
|
import time
|
||||||
|
|
||||||
|
from config_manager.v2 import ConfigManagerV2
|
||||||
|
|
||||||
|
|
||||||
|
class BlockingApp(ConfigManagerV2):
|
||||||
|
def __init__(self, *args, **kwargs):
|
||||||
|
super().__init__(*args, **kwargs)
|
||||||
|
self.started_event = threading.Event()
|
||||||
|
self.active_event = threading.Event()
|
||||||
|
self.calls = 0
|
||||||
|
|
||||||
|
def execute(self) -> None:
|
||||||
|
self.calls += 1
|
||||||
|
self.active_event.set()
|
||||||
|
self.started_event.set()
|
||||||
|
time.sleep(0.2)
|
||||||
|
self.active_event.clear()
|
||||||
|
|
||||||
|
|
||||||
|
def test_stop_waits_for_active_execute_and_prevents_next_run(tmp_path):
|
||||||
|
async def scenario() -> None:
|
||||||
|
cfg = tmp_path / "config.yaml"
|
||||||
|
cfg.write_text("work_interval: 0.05\nupdate_interval: 0.05\n", encoding="utf-8")
|
||||||
|
|
||||||
|
app = BlockingApp(str(cfg))
|
||||||
|
runner = asyncio.create_task(app.start())
|
||||||
|
|
||||||
|
started = await asyncio.to_thread(app.started_event.wait, 1.0)
|
||||||
|
assert started is True
|
||||||
|
|
||||||
|
await app.stop()
|
||||||
|
await runner
|
||||||
|
|
||||||
|
assert app.active_event.is_set() is False
|
||||||
|
assert app.calls == 1
|
||||||
|
|
||||||
|
await asyncio.sleep(0.15)
|
||||||
|
assert app.calls == 1
|
||||||
|
|
||||||
|
asyncio.run(scenario())
|
||||||
Reference in New Issue
Block a user