Large refactoring with Codex

2026-02-26 21:58:21 +03:00
parent aa32c23dba
commit a1dd495d6d
15 changed files with 572 additions and 339 deletions


@@ -1,11 +1,12 @@
# Config Manager # Config Manager
## Description ## Description
The package is intended for running applications. The package is intended for running applications with periodic logic execution, config reloading, and control over an HTTP API.
The ConfigManager class implements the program's entry point, provides the application's current configuration, and simplifies logging setup.
## ConfigManager v2: structure and relationships **Contract:** the application subclasses **ConfigManagerV2** and overrides **execute()** (the periodic work). Control (start/stop, health) goes through channels that are created externally and passed to the constructor in **control_channels** (including HttpControlChannel for the API).
**ConfigManager v2** is the application's entry point. It inherits its internal logic from **\_RuntimeController** (the worker and config-update loops, starting and stopping the control channels). ## ConfigManager: structure and relationships
**ConfigManager** (the ConfigManagerV2 class) is the application's entry point. It inherits its internal logic from **\_RuntimeController** (the worker and config-update loops, starting and stopping the control channels).
**Core:** **Core:**
- **ConfigLoader** reads the config from a file (YAML/JSON), computes a hash, and returns the config only when it has changed; on a parse error it returns the last valid config. - **ConfigLoader** reads the config from a file (YAML/JSON), computes a hash, and returns the config only when it has changed; on a parse error it returns the last valid config.
@@ -19,29 +20,72 @@
- **HttpControlChannel** is the HTTP API (`/health`, `/actions/start`, `/actions/stop`, `/actions/status`); it uses **UvicornServerRunner**; for `/health` it calls **HealthAggregator.collect()**, and for actions it calls the handlers passed in from **ControlChannelBridge**. - **HttpControlChannel** is the HTTP API (`/health`, `/actions/start`, `/actions/stop`, `/actions/status`); it uses **UvicornServerRunner**; for `/health` it calls **HealthAggregator.collect()**, and for actions it calls the handlers passed in from **ControlChannelBridge**.
- **TelegramControlChannel** is an implementation based on Telegram long polling; the `/start`, `/stop`, and `/status` commands call the handlers passed in. - **TelegramControlChannel** is an implementation based on Telegram long polling; the `/start`, `/stop`, and `/status` commands call the handlers passed in.
**Workflow:** on `start()` the manager builds the list of channels: with `management.enabled: true` in **config.yaml** (the `management` section) an **HttpControlChannel** is added, plus optionally **control_channel** / **control_channels** from the constructor. All channels are started with a single **ControlChannelBridge**, then two loops are launched: **WorkerLoop** and the periodic config update through **ConfigLoader**. A stop via halt (through any channel) ends both loops; finally all channels are stopped. The HTTP channel settings (host, port, timeout, health_timeout) are set in config.yaml in the `management` section. **Workflow:** on `start()` the manager starts the channels from **control_channels** (supplied externally), then launches two loops: **WorkerLoop** and the periodic config update through **ConfigLoader**. Control over the API (`/health`, `/actions/start`, `/actions/stop`) is available if an **HttpControlChannel** was passed in control_channels. A stop via halt ends both loops; finally all channels are stopped.
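
The ConfigLoader behaviour described in the list above (hash the file, return the config only when it changed, fall back to the last valid config on a parse error) can be sketched roughly as follows. This is not the package's implementation: `SketchConfigLoader` and `load_if_changed` are hypothetical names, SHA-256 is an assumed hash choice, and only JSON is parsed so the sketch stays standard-library only.

```python
import hashlib
import json
from pathlib import Path
from typing import Any, Optional


class SketchConfigLoader:
    """Hypothetical stand-in for ConfigLoader (JSON only, stdlib only)."""

    def __init__(self, path: str) -> None:
        self._path = Path(path)
        self._last_hash: Optional[str] = None
        self._last_valid: Any = None

    def load_if_changed(self) -> Optional[Any]:
        """Return the parsed config if the file changed, otherwise None."""
        raw = self._path.read_bytes()
        digest = hashlib.sha256(raw).hexdigest()
        if digest == self._last_hash:
            return None  # file content is unchanged since the last call
        self._last_hash = digest
        try:
            parsed = json.loads(raw)
        except json.JSONDecodeError:
            # Broken file: keep serving the last valid config
            # (None if nothing valid was ever loaded).
            return self._last_valid
        self._last_valid = parsed
        return parsed
```

Remembering the hash of a broken file as well means the parse error is hit once per bad edit instead of on every polling cycle.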
## Class diagram (v1 and v2) ## Running an application with ConfigManagerV2 and HttpControlChannel
1. **Subclass ConfigManagerV2** and implement the `execute()` method (it holds your periodic work). If needed, override `get_health_status()` to customise the `/health` response.
2. **Create the channels externally and pass them to the constructor.** For the HTTP API, create an **HttpControlChannel**; health needs a callback from the manager, so pass **control_channels** as a factory (a lambda that receives the manager):
```python
from config_manager.v2.control import HttpControlChannel

app = MyApp(
    str(path_to_config),
    control_channels=lambda m: [
        HttpControlChannel(
            host="0.0.0.0",
            port=8000,
            timeout=3,
            health_provider=m.get_health_provider(),
        )
    ],
)
```
Alternatively, pass a ready-made list of channels: `control_channels=[channel1, channel2]`.
3. **Start from an async context:** `await app.start()`, or `asyncio.create_task(app.start())` to run in the background. To stop: `await app.stop()` or an HTTP request to `/actions/stop`.
**Minimal example with the HTTP API:**
```python
import asyncio
import logging
from pathlib import Path

from config_manager import ConfigManager
from config_manager.v2.control import HttpControlChannel


class MyApp(ConfigManager):
    def execute(self) -> None:
        pass  # your periodic work


async def main() -> None:
    app = MyApp(
        str(Path(__file__).parent / "config.yaml"),
        control_channels=lambda m: [
            HttpControlChannel(
                host="0.0.0.0", port=8000, timeout=3,
                health_provider=m.get_health_provider(),
            )
        ],
    )
    asyncio.create_task(app.start())
    await asyncio.sleep(3600)


if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO)
    asyncio.run(main())
```
A ready-made example: `tests/test_app.py`.
## Class diagram
```mermaid ```mermaid
classDiagram classDiagram
direction TB direction TB
class ConfigManager {
+str path
+Any config
+float update_interval
+float work_interval
-Event _halt
-Task _task
+start() async
+stop() async
+execute()*
-_worker_loop() async
-_periodic_update_loop() async
-_update_config() async
}
class ConfigManagerV2 { class ConfigManagerV2 {
+str path +str path
+Any config +Any config
@@ -135,7 +179,6 @@ classDiagram
+int port +int port
} }
ConfigManager --> LogManager : uses
ConfigManagerV2 --|> _RuntimeController : inherits ConfigManagerV2 --|> _RuntimeController : inherits
ConfigManagerV2 --> ConfigLoader : uses ConfigManagerV2 --> ConfigLoader : uses
ConfigManagerV2 --> LogManager : uses ConfigManagerV2 --> LogManager : uses
@@ -150,7 +193,7 @@ classDiagram
ControlChannelBridge ..> ControlChannel : on_start, on_stop, on_status ControlChannelBridge ..> ControlChannel : on_start, on_stop, on_status
``` ```
## Logging (v2) ## Logging
Logging is configured from the configuration file only if it contains a **`log`** section in [dictConfig](https://docs.python.org/3/library/logging.config.html#logging.config.dictConfig) format. If there is no `log` section, the manager writes a warning to the log and Python's default level (WARNING) stays in effect, so INFO/DEBUG messages may not be shown. Logging is configured from the configuration file only if it contains a **`log`** section in [dictConfig](https://docs.python.org/3/library/logging.config.html#logging.config.dictConfig) format. If there is no `log` section, the manager writes a warning to the log and Python's default level (WARNING) stays in effect, so INFO/DEBUG messages may not be shown.
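
For illustration, a `log` section in dictConfig format and the way it could be applied might look like the sketch below. The config contents and the helper `apply_log_config` are assumptions made for this example; the package's own LogManager is not shown in this diff and may behave differently.

```python
import logging
import logging.config

# Hypothetical parsed config file; only the "log" key matters here.
config = {
    "log": {
        "version": 1,
        "disable_existing_loggers": False,
        "formatters": {
            "plain": {"format": "%(asctime)s %(levelname)s %(name)s: %(message)s"},
        },
        "handlers": {
            "console": {
                "class": "logging.StreamHandler",
                "formatter": "plain",
                "level": "DEBUG",
            },
        },
        "root": {"level": "INFO", "handlers": ["console"]},
    }
}


def apply_log_config(cfg: dict) -> bool:
    """Apply cfg["log"] via dictConfig; return False if the section is absent."""
    log_section = cfg.get("log")
    if not isinstance(log_section, dict):
        logging.getLogger(__name__).warning(
            "no 'log' section in config; default WARNING level stays in effect"
        )
        return False
    logging.config.dictConfig(log_section)
    return True
```

After `apply_log_config(config)` the root logger level is INFO, so `logging.getLogger("app").info(...)` messages reach the console handler.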
**How to check that the logging configuration was applied:** **How to check that the logging configuration was applied:**


@@ -1,3 +1,2 @@
from .v2 import ConfigManagerV2 as ConfigManager from .v2 import ConfigManagerV2 as ConfigManager
from .v1.cfg_manager import ConfigManager as LegacyConfigManager
from .v2.core.log_manager import LogManager from .v2.core.log_manager import LogManager


@@ -1,141 +0,0 @@
import logging
import logging.config
import asyncio
import json
import yaml
import os
from typing import Any, Optional

from ..v2.core.log_manager import LogManager


class ConfigManager:
    DEFAULT_UPDATE_INTERVAL = 5
    DEFAULT_WORK_INTERVAL = 2

    def __init__(self, path: str, log_manager: Optional[LogManager] = None):
        self.path = path
        self.config: Any = None
        self._last_hash = None
        self.update_interval = self.DEFAULT_UPDATE_INTERVAL
        self.work_interval = self.DEFAULT_WORK_INTERVAL
        self._halt = asyncio.Event()
        self._task: Optional[asyncio.Task] = None
        self._loop: Optional[asyncio.AbstractEventLoop] = None
        self._log_manager = log_manager or LogManager()
        self.logger = logging.getLogger(__name__)

    def _read_file_sync(self) -> str:
        with open(self.path, "r", encoding="utf-8") as f:
            return f.read()

    async def _read_file_async(self) -> str:
        return await asyncio.to_thread(self._read_file_sync)

    def _parse_config(self, data) -> Any:
        extension = os.path.splitext(self.path)[1].lower()
        if extension in (".yaml", ".yml"):
            return yaml.safe_load(data)
        else:
            return json.loads(data)

    def _update_intervals_from_config(self) -> None:
        if not self.config:
            return
        upd = self.config.get("update_interval")
        wrk = self.config.get("work_interval")
        if isinstance(upd, (int, float)) and upd > 0:
            self.update_interval = float(upd)
            self.logger.info(f"Update interval set to {self.update_interval} seconds")
        else:
            self.update_interval = self.DEFAULT_UPDATE_INTERVAL
        if isinstance(wrk, (int, float)) and wrk > 0:
            self.work_interval = float(wrk)
            self.logger.info(f"Work interval set to {self.work_interval} seconds")
        else:
            self.work_interval = self.DEFAULT_WORK_INTERVAL

    async def _update_config(self) -> None:
        try:
            data = await self._read_file_async()
            current_hash = hash(data)
            if current_hash != self._last_hash:
                new_config = self._parse_config(data)
                self.config = new_config
                self._last_hash = current_hash
                self._log_manager.apply_config(new_config)
                self._update_intervals_from_config()
        except Exception as e:
            self.logger.error(f"Error reading/parsing config file: {e}")

    def execute(self) -> None:
        """
        Method to be overridden in subclasses.
        It may contain blocking work.
        It runs in a separate thread.
        """
        pass

    async def _worker_loop(self) -> None:
        self.logger.warning("Worker loop started")
        try:
            while not self._halt.is_set():
                await asyncio.to_thread(self.execute)
                await asyncio.sleep(self.work_interval)
        finally:
            self.logger.warning("Worker loop stopped")

    async def _periodic_update_loop(self) -> None:
        while not self._halt.is_set():
            await self._update_config()
            await asyncio.sleep(self.update_interval)

    async def _run(self) -> None:
        """Internal coroutine that runs all the loops"""
        self._halt.clear()
        self.logger.info("ConfigManager started")
        try:
            await asyncio.gather(
                self._worker_loop(),
                self._periodic_update_loop()
            )
        except asyncio.CancelledError:
            self.logger.info("ConfigManager tasks cancelled")
        finally:
            self.logger.info("ConfigManager stopped")

    async def start(self) -> None:
        if self._task is not None and not self._task.done():
            self.logger.warning("ConfigManager is already running")
            return
        try:
            self._loop = asyncio.get_running_loop()
        except RuntimeError:
            self.logger.error("start() must be called from within an async context")
            raise
        self.logger.info("ConfigManager starting and awaiting _run()")
        await self._run()

    async def stop(self) -> None:
        """Stops the configuration manager and waits for it to finish"""
        if self._task is None:
            self.logger.warning("ConfigManager is not running")
            return
        self.logger.info("ConfigManager stopping...")
        self._halt.set()
        try:
            await self._task
        except asyncio.CancelledError:
            pass
        self._task = None
        self.logger.info("ConfigManager stopped successfully")


@@ -1,4 +0,0 @@
"""Backward compatibility: LogManager has moved to v2.core.log_manager."""
from ..v2.core.log_manager import LogManager
__all__ = ["LogManager"]


@@ -1,7 +1,6 @@
"""Public V2 API: entry point to the configuration manager and HTTP channel settings from config.yaml. """Public API: entry point to the configuration manager.
Exports ConfigManagerV2 and the settings types for use by applications.""" Contract: subclass ConfigManagerV2, override execute(), control through the API (config.yaml, management section)."""
from .core import ConfigManagerV2 from .core import ConfigManagerV2
from .core.types import HealthServerSettings, ManagementServerSettings
__all__ = ["ConfigManagerV2", "ManagementServerSettings", "HealthServerSettings"] __all__ = ["ConfigManagerV2"]


@@ -235,7 +235,7 @@ class HttpControlChannel(ControlChannel):
status_code=404, status_code=404,
) )
try: try:
detail = await callback() detail = await asyncio.wait_for(callback(), timeout=float(self._timeout))
if not detail: if not detail:
detail = f"{action} action accepted" detail = f"{action} action accepted"
logger.debug( logger.debug(
@@ -244,6 +244,12 @@ class HttpControlChannel(ControlChannel):
detail, detail,
) )
return JSONResponse(content={"status": "ok", "detail": detail}, status_code=200) return JSONResponse(content={"status": "ok", "detail": detail}, status_code=200)
except asyncio.TimeoutError:
logger.warning("HttpControlChannel._action_response timeout: action=%s timeout=%s", action, self._timeout)
return JSONResponse(
content={"status": "error", "detail": f"{action} handler did not respond within {self._timeout}s"},
status_code=504,
)
except Exception as exc: # noqa: BLE001 except Exception as exc: # noqa: BLE001
logger.exception("HttpControlChannel._action_response error: action=%s", action) logger.exception("HttpControlChannel._action_response error: action=%s", action)
return JSONResponse(content={"status": "error", "detail": str(exc)}, status_code=500) return JSONResponse(content={"status": "error", "detail": str(exc)}, status_code=500)
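
The effect of this hunk, bounding the action handler with `asyncio.wait_for` and answering 504 on timeout, can be shown without a web framework. The sketch below is an illustration under assumptions: `run_action` is a hypothetical helper that returns a `(status_code, body)` tuple instead of a `JSONResponse`, and it only mirrors the branches visible in the diff.

```python
import asyncio
from collections.abc import Awaitable, Callable


async def run_action(
    action: str,
    callback: Callable[[], Awaitable[str]],
    timeout: float,
) -> tuple[int, dict]:
    """Run a control handler with a deadline and map the outcome to a status code."""
    try:
        detail = await asyncio.wait_for(callback(), timeout=timeout)
    except asyncio.TimeoutError:
        # Handler is too slow: report a gateway timeout instead of hanging the request.
        return 504, {
            "status": "error",
            "detail": f"{action} handler did not respond within {timeout}s",
        }
    except Exception as exc:  # any other handler failure becomes a 500
        return 500, {"status": "error", "detail": str(exc)}
    return 200, {"status": "ok", "detail": detail or f"{action} action accepted"}
```

The timeout branch has to come before the generic `except Exception`, because `asyncio.TimeoutError` is itself an `Exception` subclass and would otherwise be reported as a 500.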


@@ -3,7 +3,6 @@
Provides start/stop/status handlers for ControlChannel (halt, state, status text).""" Provides start/stop/status handlers for ControlChannel (halt, state, status text)."""
from __future__ import annotations from __future__ import annotations
import asyncio
import logging import logging
from collections.abc import Awaitable, Callable from collections.abc import Awaitable, Callable
@@ -17,30 +16,30 @@ class ControlChannelBridge:
def __init__( def __init__(
self, self,
halt: asyncio.Event,
get_state: Callable[[], LifecycleState], get_state: Callable[[], LifecycleState],
get_status: Callable[[], Awaitable[str]], get_status: Callable[[], Awaitable[str]],
start_runtime: Callable[[], Awaitable[str]],
stop_runtime: Callable[[], Awaitable[str]],
): ):
self._halt = halt
self._get_state = get_state self._get_state = get_state
self._get_status = get_status self._get_status = get_status
self._start_runtime = start_runtime
self._stop_runtime = stop_runtime
logger.debug("ControlChannelBridge.__init__ result: callbacks configured") logger.debug("ControlChannelBridge.__init__ result: callbacks configured")
async def on_start(self) -> str: async def on_start(self) -> str:
"""Handle an external start: clear halt; idempotent when already running.""" """Handle an external start through the manager's lifecycle method."""
if self._get_state() == LifecycleState.RUNNING: if self._get_state() == LifecycleState.RUNNING:
result = "already running" result = "already running"
logger.debug("ControlChannelBridge.on_start result: %s", result) logger.debug("ControlChannelBridge.on_start result: %s", result)
return result return result
self._halt.clear() result = await self._start_runtime()
result = "start signal accepted"
logger.debug("ControlChannelBridge.on_start result: %s", result) logger.debug("ControlChannelBridge.on_start result: %s", result)
return result return result
async def on_stop(self) -> str: async def on_stop(self) -> str:
"""Handle an external stop: set halt.""" """Handle an external stop through the manager's lifecycle method."""
self._halt.set() result = await self._stop_runtime()
result = "stop signal accepted"
logger.debug("ControlChannelBridge.on_stop result: %s", result) logger.debug("ControlChannelBridge.on_stop result: %s", result)
return result return result
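
After this change the bridge no longer touches the halt event itself; it delegates to the manager's `start_runtime` / `stop_runtime` callbacks. A minimal sketch of what such idempotent lifecycle methods could look like is given below. `SketchRuntime` and its placeholder loop are hypothetical; the returned strings follow the ones that appear in this commit.

```python
import asyncio
from typing import Optional


class SketchRuntime:
    """Hypothetical manager with lock-protected, idempotent start/stop."""

    def __init__(self) -> None:
        self._halt = asyncio.Event()
        self._lock = asyncio.Lock()
        self._task: Optional[asyncio.Task] = None

    async def _loop(self) -> None:
        # Placeholder for the real worker/config loops: wake up periodically
        # and exit as soon as halt is set.
        while not self._halt.is_set():
            try:
                await asyncio.wait_for(self._halt.wait(), timeout=0.01)
            except asyncio.TimeoutError:
                continue

    async def start(self) -> str:
        async with self._lock:
            if self._task is not None and not self._task.done():
                return "already running"
            self._halt.clear()
            self._task = asyncio.create_task(self._loop())
            return "start signal accepted"

    async def stop(self) -> str:
        self._halt.set()
        async with self._lock:
            if self._task is None or self._task.done():
                return "already stopped"
            return "stop signal accepted"
```

Note that `stop()` only signals: it returns as soon as halt is set and does not wait for the loop task to finish, which keeps a control request short.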


@@ -58,6 +58,10 @@ class HealthAggregator:
result = self._get_app_health() result = self._get_app_health()
status = result.get("status", "unhealthy") status = result.get("status", "unhealthy")
if status == "degraded":
degraded = {"status": "degraded", "detail": result.get("detail", "app degraded"), "state": state_value}
logger.debug("HealthAggregator.collect result: %s", degraded)
return degraded
if status != "ok": if status != "ok":
unhealthy = {"status": "unhealthy", "detail": result.get("detail", "app reported non-ok"), "state": state_value} unhealthy = {"status": "unhealthy", "detail": result.get("detail", "app reported non-ok"), "state": state_value}
logger.debug("HealthAggregator.collect result: %s", unhealthy) logger.debug("HealthAggregator.collect result: %s", unhealthy)
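
The new branch adds a third health outcome next to "ok" and "unhealthy". The mapping can be summarised in a small stand-alone function. `normalize_app_health` is a hypothetical name, and the shape of the "ok" result is an assumption, since that part of `collect()` is outside the hunk.

```python
def normalize_app_health(result: dict, state_value: str) -> dict:
    """Map the app's health payload to ok / degraded / unhealthy."""
    # A missing status is treated as unhealthy, as in the hunk above.
    status = result.get("status", "unhealthy")
    if status == "ok":
        # Assumed shape: the real "ok" branch is not shown in the diff.
        return {"status": "ok", "state": state_value}
    if status == "degraded":
        return {
            "status": "degraded",
            "detail": result.get("detail", "app degraded"),
            "state": state_value,
        }
    # Anything else, including unknown status strings, is unhealthy.
    return {
        "status": "unhealthy",
        "detail": result.get("detail", "app reported non-ok"),
        "state": state_value,
    }
```

Checking "degraded" before the generic non-ok branch is what lets a degraded worker be reported separately from a failed one.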


@@ -5,16 +5,16 @@ import asyncio
import logging import logging
import os import os
import time import time
from typing import Any, Iterable, Optional from collections.abc import Callable, Iterable
from typing import Any, Optional, Union
from .log_manager import LogManager
from ..control.base import ControlChannel from ..control.base import ControlChannel
from ..control.http_channel import HttpControlChannel
from .types import HealthPayload, LifecycleState, ManagementServerSettings, management_settings_from_config
from .config_loader import ConfigLoader from .config_loader import ConfigLoader
from .control_bridge import ControlChannelBridge from .control_bridge import ControlChannelBridge
from .health_aggregator import HealthAggregator from .health_aggregator import HealthAggregator
from .log_manager import LogManager
from .scheduler import WorkerLoop from .scheduler import WorkerLoop
from .types import HealthPayload, LifecycleState
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
@@ -39,9 +39,27 @@ def _read_env_interval(name: str, default_value: float) -> float:
return float(default_value) return float(default_value)
def _read_env_optional_float(name: str, default_value: Optional[float]) -> Optional[float]:
"""Read optional non-negative float from env (0 or missing => default_value)."""
raw_value = os.environ.get(name)
if raw_value is None:
return default_value
try:
parsed = float(raw_value)
if parsed < 0:
logger.warning("ConfigManagerV2 %s must be >= 0, got %s; using default %s", name, parsed, default_value)
return default_value
return parsed if parsed > 0 else default_value
except Exception: # noqa: BLE001
logger.exception("ConfigManagerV2 %s parse error: raw_value=%s fallback=%s", name, raw_value, default_value)
return default_value
class _RuntimeController: class _RuntimeController:
"""Runtime loops and lifecycle supervision.""" """Runtime loops and lifecycle supervision."""
CONTROL_CHANNEL_TIMEOUT = 5.0
def _on_execute_success(self) -> None: def _on_execute_success(self) -> None:
self._last_success_timestamp = time.monotonic() self._last_success_timestamp = time.monotonic()
self._last_execute_error = None self._last_execute_error = None
@@ -52,10 +70,23 @@ class _RuntimeController:
def _on_execute_error(self, exc: Exception) -> None: def _on_execute_error(self, exc: Exception) -> None:
self._last_execute_error = str(exc) self._last_execute_error = str(exc)
self.logger.exception("ConfigManagerV2._on_execute_error") self.logger.error(
self.logger.debug( "ConfigManagerV2._on_execute_error: %s",
"ConfigManagerV2._on_execute_error result: last_execute_error=%s",
self._last_execute_error, self._last_execute_error,
exc_info=(type(exc), exc, exc.__traceback__),
)
def _on_worker_degraded_change(self, degraded: bool) -> None:
self._worker_degraded = degraded
self.logger.warning("ConfigManagerV2._on_worker_degraded_change result: degraded=%s", degraded)
def _on_worker_metrics_change(self, inflight_count: int, timed_out_count: int) -> None:
self._worker_inflight_count = inflight_count
self._worker_timed_out_inflight_count = timed_out_count
self.logger.debug(
"ConfigManagerV2._on_worker_metrics_change result: inflight=%s timed_out_inflight=%s",
inflight_count,
timed_out_count,
) )
async def _worker_loop(self) -> None: async def _worker_loop(self) -> None:
@@ -69,13 +100,13 @@ class _RuntimeController:
halt_event=self._halt, halt_event=self._halt,
on_error=self._on_execute_error, on_error=self._on_execute_error,
on_success=self._on_execute_success, on_success=self._on_execute_success,
execute_timeout=self._execute_timeout,
on_degraded_change=self._on_worker_degraded_change,
on_metrics_change=self._on_worker_metrics_change,
) )
try: try:
await worker.run() await worker.run()
self.logger.debug("ConfigManagerV2._worker_loop result: completed") self.logger.debug("ConfigManagerV2._worker_loop result: completed")
except Exception: # noqa: BLE001
self.logger.exception("ConfigManagerV2._worker_loop error")
raise
finally: finally:
self.logger.warning("ConfigManagerV2._worker_loop result: stopped") self.logger.warning("ConfigManagerV2._worker_loop result: stopped")
@@ -91,44 +122,119 @@ class _RuntimeController:
await asyncio.wait_for(self._halt.wait(), timeout=max(self.update_interval, 0.05)) await asyncio.wait_for(self._halt.wait(), timeout=max(self.update_interval, 0.05))
except asyncio.TimeoutError: except asyncio.TimeoutError:
continue continue
except Exception: # noqa: BLE001
self.logger.exception("ConfigManagerV2._periodic_update_loop error")
raise
finally: finally:
self.logger.warning("ConfigManagerV2._periodic_update_loop result: stopped") self.logger.warning("ConfigManagerV2._periodic_update_loop result: stopped")
async def _status_text(self) -> str: async def _status_text(self) -> str:
health = await self._health_aggregator.collect() health = await self._health_aggregator.collect()
detail = health.get("detail") detail = health.get("detail")
worker_tail = (
f"worker_inflight={self._worker_inflight_count}; "
f"worker_timed_out_inflight={self._worker_timed_out_inflight_count}"
)
if detail: if detail:
status_text = f"state={self._state.value}; health={health['status']}; detail={detail}" status_text = f"state={self._state.value}; health={health['status']}; detail={detail}; {worker_tail}"
self.logger.debug("ConfigManagerV2._status_text result: %s", status_text) self.logger.debug("ConfigManagerV2._status_text result: %s", status_text)
return status_text return status_text
status_text = f"state={self._state.value}; health={health['status']}" status_text = f"state={self._state.value}; health={health['status']}; {worker_tail}"
self.logger.debug("ConfigManagerV2._status_text result: %s", status_text) self.logger.debug("ConfigManagerV2._status_text result: %s", status_text)
return status_text return status_text
async def _start_control_channels(self) -> None: async def _start_control_channels(self) -> None:
for channel in self._control_channels: for channel in self._control_channels:
try: try:
await channel.start( await asyncio.wait_for(
self._control_bridge.on_start, channel.start(
self._control_bridge.on_stop, self._control_bridge.on_start,
self._control_bridge.on_status, self._control_bridge.on_stop,
self._control_bridge.on_status,
),
timeout=self.CONTROL_CHANNEL_TIMEOUT,
) )
self.logger.debug("ConfigManagerV2._start_control_channels result: started channel=%s", type(channel).__name__) self.logger.debug("ConfigManagerV2._start_control_channels result: started channel=%s", type(channel).__name__)
except asyncio.TimeoutError:
self.logger.error(
"ConfigManagerV2._start_control_channels timeout channel=%s timeout=%ss",
type(channel).__name__,
self.CONTROL_CHANNEL_TIMEOUT,
)
except Exception: # noqa: BLE001 except Exception: # noqa: BLE001
self.logger.exception("ConfigManagerV2._start_control_channels error channel=%s", type(channel).__name__) self.logger.exception("ConfigManagerV2._start_control_channels error channel=%s", type(channel).__name__)
async def _stop_control_channels(self) -> None: async def _stop_control_channels(self) -> None:
for channel in self._control_channels: for channel in self._control_channels:
try: try:
await channel.stop() await asyncio.wait_for(channel.stop(), timeout=self.CONTROL_CHANNEL_TIMEOUT)
self.logger.debug("ConfigManagerV2._stop_control_channels result: stopped channel=%s", type(channel).__name__) self.logger.debug("ConfigManagerV2._stop_control_channels result: stopped channel=%s", type(channel).__name__)
except asyncio.TimeoutError:
self.logger.error(
"ConfigManagerV2._stop_control_channels timeout channel=%s timeout=%ss",
type(channel).__name__,
self.CONTROL_CHANNEL_TIMEOUT,
)
except Exception: # noqa: BLE001 except Exception: # noqa: BLE001
self.logger.exception("ConfigManagerV2._stop_control_channels error channel=%s", type(channel).__name__) self.logger.exception("ConfigManagerV2._stop_control_channels error channel=%s", type(channel).__name__)
def _on_runtime_task_done(self, task: asyncio.Task) -> None: async def _run_runtime_loops(self) -> None:
self._state = LifecycleState.RUNNING
self.logger.debug("ConfigManagerV2._run_runtime_loops result: state=%s", self._state.value)
tasks = [
asyncio.create_task(self._worker_loop(), name="v2-worker-loop"),
asyncio.create_task(self._periodic_update_loop(), name="v2-config-loop"),
]
try:
await asyncio.gather(*tasks)
self.logger.debug("ConfigManagerV2._run_runtime_loops result: loops completed")
except asyncio.CancelledError:
self.logger.debug("ConfigManagerV2._run_runtime_loops result: cancelled")
raise
except Exception: # noqa: BLE001
self.logger.exception("ConfigManagerV2._run_runtime_loops error")
raise
finally:
for task in tasks:
task.cancel()
await asyncio.gather(*tasks, return_exceptions=True)
self._state = LifecycleState.STOPPED
self.logger.debug("ConfigManagerV2._run_runtime_loops result: state=%s", self._state.value)
async def _start_runtime(self) -> str:
if self._shutdown.is_set():
result = "manager is shutting down"
self.logger.debug("ConfigManagerV2._start_runtime result: %s", result)
return result
async with self._runtime_lock:
if self._runtime_task is not None and not self._runtime_task.done():
result = "already running"
self.logger.debug("ConfigManagerV2._start_runtime result: %s", result)
return result
self._halt.clear()
self._state = LifecycleState.STARTING
self._runtime_task = asyncio.create_task(self._run_runtime_loops(), name="config-manager-v2-runtime")
self._runtime_task.add_done_callback(self._on_runtime_task_done)
result = "start signal accepted"
self.logger.debug("ConfigManagerV2._start_runtime result: %s", result)
return result
async def _stop_runtime(self) -> str:
self._halt.set()
async with self._runtime_lock:
runtime_task = self._runtime_task
if runtime_task is None:
result = "already stopped"
self.logger.debug("ConfigManagerV2._stop_runtime result: %s", result)
return result
if runtime_task.done():
result = "already stopped"
self.logger.debug("ConfigManagerV2._stop_runtime result: %s", result)
return result
result = "stop signal accepted"
self.logger.debug("ConfigManagerV2._stop_runtime result: %s", result)
return result
def _on_runtime_task_done(self, task: asyncio.Task[None]) -> None:
if task.cancelled(): if task.cancelled():
self.logger.debug("ConfigManagerV2._on_runtime_task_done result: cancelled") self.logger.debug("ConfigManagerV2._on_runtime_task_done result: cancelled")
return return
@@ -141,82 +247,83 @@ class _RuntimeController:
self.logger.debug("ConfigManagerV2._on_runtime_task_done result: completed") self.logger.debug("ConfigManagerV2._on_runtime_task_done result: completed")
return return
self.logger.error( self.logger.error(
"ConfigManagerV2 background task failed", "ConfigManagerV2 runtime task failed",
exc_info=(type(exc), exc, exc.__traceback__), exc_info=(type(exc), exc, exc.__traceback__),
) )
self.logger.debug("ConfigManagerV2._on_runtime_task_done result: failed")
async def _run(self) -> None: async def _run(self) -> None:
self._shutdown.clear()
self._halt.clear()
self._state = LifecycleState.STARTING self._state = LifecycleState.STARTING
self.logger.debug("ConfigManagerV2._run result: state=%s", self._state.value) self.logger.debug("ConfigManagerV2._run result: state=%s", self._state.value)
self._halt.clear()
await self._update_config() await self._update_config()
await self._start_control_channels() await self._start_control_channels()
await self._start_runtime()
self._state = LifecycleState.RUNNING
self.logger.debug("ConfigManagerV2._run result: state=%s", self._state.value)
tasks = [
asyncio.create_task(self._worker_loop(), name="v2-worker-loop"),
asyncio.create_task(self._periodic_update_loop(), name="v2-config-loop"),
]
try: try:
await asyncio.gather(*tasks) await self._shutdown.wait()
self.logger.debug("ConfigManagerV2._run result: background loops completed")
except asyncio.CancelledError:
self.logger.debug("ConfigManagerV2._run result: cancelled")
raise
except Exception: # noqa: BLE001
self.logger.exception("ConfigManagerV2._run error")
raise
finally: finally:
self._state = LifecycleState.STOPPING self._state = LifecycleState.STOPPING
self.logger.debug("ConfigManagerV2._run result: state=%s", self._state.value) self.logger.debug("ConfigManagerV2._run result: state=%s", self._state.value)
self._halt.set() self._halt.set()
for task in tasks:
task.cancel() async with self._runtime_lock:
await asyncio.gather(*tasks, return_exceptions=True) runtime_task = self._runtime_task
if runtime_task is not None:
try:
await runtime_task
except asyncio.CancelledError:
self.logger.debug("ConfigManagerV2._run result: runtime task cancelled")
except Exception: # noqa: BLE001
self.logger.exception("ConfigManagerV2._run error while awaiting runtime task")
await self._stop_control_channels() await self._stop_control_channels()
self._runtime_task = None
self._state = LifecycleState.STOPPED self._state = LifecycleState.STOPPED
self._task = None self._task = None
self.logger.debug( self.logger.debug("ConfigManagerV2._run result: state=%s", self._state.value)
"ConfigManagerV2._run result: state=%s",
self._state.value,
)
class ConfigManagerV2(_RuntimeController): class ConfigManagerV2(_RuntimeController):
"""Public manager API.""" """Public manager API. Control channels are supplied externally through control_channels."""
DEFAULT_UPDATE_INTERVAL = 5 DEFAULT_UPDATE_INTERVAL = 5
DEFAULT_WORK_INTERVAL = 2 DEFAULT_WORK_INTERVAL = 2
DEFAULT_HEALTH_TIMEOUT = 30
DEFAULT_EXECUTE_TIMEOUT = 600.0
def __init__( def __init__(
self, self,
path: str, path: str,
log_manager: Optional[LogManager] = None, control_channels: Optional[
control_channel: Optional[ControlChannel] = None, Union[Iterable[ControlChannel], Callable[["ConfigManagerV2"], Iterable[ControlChannel]]]
control_channels: Optional[Iterable[ControlChannel]] = None, ] = None,
): ):
self.logger = logging.getLogger(__name__) self.logger = logging.getLogger(__name__)
self.path = path self.path = path
self.config: Any = None self.config: Any = None
self.update_interval = _read_env_interval("UPDATE_INTERVAL", float(self.DEFAULT_UPDATE_INTERVAL)) self.update_interval = _read_env_interval("UPDATE_INTERVAL", float(self.DEFAULT_UPDATE_INTERVAL))
self.work_interval = _read_env_interval("WORK_INTERVAL", float(self.DEFAULT_WORK_INTERVAL)) self.work_interval = _read_env_interval("WORK_INTERVAL", float(self.DEFAULT_WORK_INTERVAL))
self._execute_timeout = _read_env_optional_float("EXECUTE_TIMEOUT", float(self.DEFAULT_EXECUTE_TIMEOUT))
self._loader = ConfigLoader(path) self._loader = ConfigLoader(path)
self._log_manager = log_manager or LogManager() self._log_manager = LogManager()
self._halt = asyncio.Event() self._halt = asyncio.Event()
self._task: Optional[asyncio.Task] = None self._shutdown = asyncio.Event()
self._task: Optional[asyncio.Task[None]] = None
self._runtime_task: Optional[asyncio.Task[None]] = None
self._loop: Optional[asyncio.AbstractEventLoop] = None self._loop: Optional[asyncio.AbstractEventLoop] = None
self._runtime_lock = asyncio.Lock()
self._state = LifecycleState.IDLE self._state = LifecycleState.IDLE
self._last_execute_error: Optional[str] = None self._last_execute_error: Optional[str] = None
self._last_success_timestamp: Optional[float] = None self._last_success_timestamp: Optional[float] = None
self._worker_degraded = False
self._worker_inflight_count = 0
self._worker_timed_out_inflight_count = 0
initial_config = self._loader.load_sync() initial_config = self._loader.load_sync()
self.config = initial_config self.config = initial_config
settings = management_settings_from_config(initial_config if isinstance(initial_config, dict) else {}) self._health_timeout = self.DEFAULT_HEALTH_TIMEOUT
self._health_timeout = settings.health_timeout
self._health_aggregator = HealthAggregator( self._health_aggregator = HealthAggregator(
get_state=lambda: self._state, get_state=lambda: self._state,
get_last_error=lambda: self._last_execute_error, get_last_error=lambda: self._last_execute_error,
@@ -225,30 +332,23 @@ class ConfigManagerV2(_RuntimeController):
get_app_health=self.get_health_status, get_app_health=self.get_health_status,
) )
self._control_bridge = ControlChannelBridge( self._control_bridge = ControlChannelBridge(
halt=self._halt,
get_state=lambda: self._state, get_state=lambda: self._state,
get_status=self._status_text, get_status=self._status_text,
start_runtime=self._start_runtime,
stop_runtime=self._stop_runtime,
) )
channels: list[ControlChannel] = [] if control_channels is None:
if settings.enabled: self._control_channels = []
channels.append( elif callable(control_channels):
HttpControlChannel( self._control_channels = list(control_channels(self))
host=settings.host, else:
port=settings.port, self._control_channels = list(control_channels)
timeout=settings.timeout,
health_provider=self._health_aggregator.collect,
)
)
if control_channels is not None:
channels.extend(control_channels)
if control_channel is not None:
channels.append(control_channel)
self._control_channels = channels
self.logger.debug( self.logger.debug(
"ConfigManagerV2.__init__ result: path=%s update_interval=%s work_interval=%s control_channels=%s", "ConfigManagerV2.__init__ result: path=%s update_interval=%s work_interval=%s execute_timeout=%s control_channels=%s",
self.path, self.path,
self.update_interval, self.update_interval,
self.work_interval, self.work_interval,
self._execute_timeout,
len(self._control_channels), len(self._control_channels),
) )
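
The constructor hunk above accepts `control_channels` as `None`, a callable, or a plain iterable. A minimal standalone sketch of that normalization follows, assuming the callable form is a factory that receives the manager instance (as in `control_channels(self)`). The names `resolve_control_channels` and `FakeManager` are hypothetical and used only for illustration; they are not part of the package.

```python
from collections.abc import Callable, Iterable
from typing import Any, Optional, Union

# Hypothetical alias: a factory takes the owning manager and yields channels.
ChannelFactory = Callable[[Any], Iterable[Any]]


def resolve_control_channels(
    owner: Any,
    control_channels: Optional[Union[Iterable[Any], ChannelFactory]],
) -> list[Any]:
    """Normalize the three accepted shapes into a concrete list."""
    if control_channels is None:
        return []
    if callable(control_channels):
        # Factory form: lets a channel capture callbacks from the owner,
        # which does not exist yet when the caller writes the argument.
        return list(control_channels(owner))
    return list(control_channels)


class FakeManager:
    """Stand-in for the manager; only used to demonstrate the factory form."""


manager = FakeManager()
print(resolve_control_channels(manager, None))
print(resolve_control_channels(manager, ["channel-a", "channel-b"]))
print(resolve_control_channels(manager, lambda m: [("bound-to", m)]))
```

The factory form seems to exist so that a channel can be wired to something the manager owns (for example its health provider) before the manager is fully constructed by the caller.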
@@ -294,10 +394,17 @@ class ConfigManagerV2(_RuntimeController):
"""Override in subclasses.""" """Override in subclasses."""
def get_health_status(self) -> HealthPayload: def get_health_status(self) -> HealthPayload:
if self._worker_degraded:
return {"status": "degraded", "detail": "worker has timed-out in-flight execute()"}
return {"status": "ok"} return {"status": "ok"}
def get_health_provider(self) -> Callable[[], Any]:
"""Return the health callback (to pass to HttpControlChannel when the channel is created externally)."""
return self._health_aggregator.collect
async def start(self) -> None: async def start(self) -> None:
if self._task is not None and not self._task.done(): if self._task is not None and not self._task.done():
await self._start_runtime()
self.logger.debug("ConfigManagerV2.start result: already running") self.logger.debug("ConfigManagerV2.start result: already running")
return return
try: try:
@@ -306,23 +413,20 @@ class ConfigManagerV2(_RuntimeController):
self.logger.exception("ConfigManagerV2.start error: must be called from within async context") self.logger.exception("ConfigManagerV2.start error: must be called from within async context")
raise raise
self._task = asyncio.create_task(self._run(), name="config-manager-v2") self._task = asyncio.create_task(self._run(), name="config-manager-v2")
self._task.add_done_callback(self._on_runtime_task_done)
self.logger.debug("ConfigManagerV2.start result: background task started") self.logger.debug("ConfigManagerV2.start result: background task started")
async def stop(self) -> None: async def stop(self) -> None:
if self._task is None: if self._task is None:
self.logger.debug("ConfigManagerV2.stop result: not running") self.logger.debug("ConfigManagerV2.stop result: not running")
return return
self._shutdown.set()
self._halt.set() self._halt.set()
if asyncio.current_task() is self._task: if asyncio.current_task() is self._task:
self.logger.debug("ConfigManagerV2.stop result: stop requested from runtime task") self.logger.debug("ConfigManagerV2.stop result: stop requested from supervisor task")
return return
try: try:
await self._task await self._task
except asyncio.CancelledError: except asyncio.CancelledError:
self.logger.debug("ConfigManagerV2.stop result: runtime task cancelled") self.logger.debug("ConfigManagerV2.stop result: supervisor task cancelled")
except Exception: # noqa: BLE001
self.logger.exception("ConfigManagerV2.stop error while awaiting runtime task")
raise
finally: finally:
self.logger.debug("ConfigManagerV2.stop result: completed") self.logger.debug("ConfigManagerV2.stop result: completed")


@@ -1,16 +1,29 @@
"""Worker loop: repeatedly calls the blocking execute() in a thread with a pause between iterations. """Worker loop: repeatedly calls the blocking execute() in a thread.
Supports a halt event for stopping and on_success/on_error callbacks for error and health tracking.""" Normal mode: one in-flight execute().
Degraded mode: if the active execute() exceeds the timeout, a second worker thread is started
so that work continues. At most two in-flight tasks are allowed at a time."""
from __future__ import annotations from __future__ import annotations
import asyncio import asyncio
import logging import logging
import time
from collections.abc import Callable from collections.abc import Callable
from dataclasses import dataclass
from itertools import count
from typing import Optional from typing import Optional
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
@dataclass
class _InFlightExecute:
id: int
task: asyncio.Task[None]
started_at: float
timeout_reported: bool = False
class WorkerLoop: class WorkerLoop:
def __init__( def __init__(
self, self,
@@ -19,36 +32,133 @@ class WorkerLoop:
halt_event: asyncio.Event, halt_event: asyncio.Event,
on_error: Optional[Callable[[Exception], None]] = None, on_error: Optional[Callable[[Exception], None]] = None,
on_success: Optional[Callable[[], None]] = None, on_success: Optional[Callable[[], None]] = None,
execute_timeout: Optional[float] = None,
on_degraded_change: Optional[Callable[[bool], None]] = None,
on_metrics_change: Optional[Callable[[int, int], None]] = None,
): ):
"""Store the callbacks and synchronization primitives used to run the worker."""
self._execute = execute self._execute = execute
self._get_interval = get_interval self._get_interval = get_interval
self._halt_event = halt_event self._halt_event = halt_event
self._on_error = on_error self._on_error = on_error
self._on_success = on_success self._on_success = on_success
logger.warning( self._execute_timeout = execute_timeout
"WorkerLoop.__init__ result: execute=%s", self._on_degraded_change = on_degraded_change
getattr(execute, "__name__", execute.__class__.__name__), self._on_metrics_change = on_metrics_change
self._inflight: list[_InFlightExecute] = []
self._id_seq = count(1)
self._degraded = False
self._last_metrics: Optional[tuple[int, int]] = None
logger.debug(
"WorkerLoop.__init__ result: execute=%s execute_timeout=%s",
getattr(execute, "__name__", type(execute).__name__),
execute_timeout,
) )
async def run(self) -> None: def _notify_error(self, exc: Exception) -> None:
"""Call execute in a loop until a stop is requested.""" if self._on_error is not None:
logger.warning("WorkerLoop.run result: started") self._on_error(exc)
while not self._halt_event.is_set():
def _set_degraded(self, value: bool) -> None:
if self._degraded == value:
return
self._degraded = value
if self._on_degraded_change is not None:
self._on_degraded_change(value)
logger.warning("WorkerLoop.run degraded state changed: degraded=%s", value)
def _start_execute(self) -> None:
execute_id = next(self._id_seq)
execution = _InFlightExecute(
id=execute_id,
task=asyncio.create_task(asyncio.to_thread(self._execute), name=f"worker-loop-execute-{execute_id}"),
started_at=time.monotonic(),
)
self._inflight.append(execution)
logger.debug("WorkerLoop.run result: execute started id=%s inflight=%s", execution.id, len(self._inflight))
def _collect_finished(self) -> None:
pending: list[_InFlightExecute] = []
for execution in self._inflight:
task = execution.task
if not task.done():
pending.append(execution)
continue
try: try:
await asyncio.to_thread(self._execute) task.result()
if self._on_success is not None: if self._on_success is not None:
self._on_success() self._on_success()
logger.warning("WorkerLoop.run result: execute completed") logger.debug("WorkerLoop.run result: execute completed id=%s", execution.id)
except Exception as exc: # noqa: BLE001 except Exception as exc: # noqa: BLE001
logger.exception("WorkerLoop.run error during execute") self._notify_error(exc)
if self._on_error is not None: logger.exception("WorkerLoop.run error during execute id=%s", execution.id)
self._on_error(exc) self._inflight = pending
logger.warning("WorkerLoop.run result: execute failed")
def _mark_timeouts(self) -> None:
if self._execute_timeout is None or self._execute_timeout <= 0:
return
now = time.monotonic()
for execution in self._inflight:
if execution.timeout_reported:
continue
if execution.task.done():
continue
elapsed = now - execution.started_at
if elapsed < self._execute_timeout:
continue
execution.timeout_reported = True
self._notify_error(TimeoutError(f"execute() did not finish within {self._execute_timeout}s"))
logger.warning(
"WorkerLoop.run execute timeout: id=%s elapsed=%.3fs timeout=%ss",
execution.id,
elapsed,
self._execute_timeout,
)
def _has_timed_out_inflight(self) -> bool:
return any(item.timeout_reported and not item.task.done() for item in self._inflight)
def _ensure_capacity(self) -> None:
active_count = len(self._inflight)
if active_count == 0:
self._start_execute()
return
if active_count == 1 and self._has_timed_out_inflight():
self._start_execute()
return
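
The capacity rule in `_ensure_capacity` can be read in isolation: start a run when nothing is in flight, or when exactly one run is in flight and it has already been reported as timed out. A minimal sketch of that decision as a pure function follows. The `InFlight` record and `should_start_execute` are hypothetical simplifications (a `done` flag stands in for `task.done()`), not the module's API.

```python
from dataclasses import dataclass


@dataclass
class InFlight:
    """Simplified stand-in for one running execute() call."""
    done: bool = False
    timeout_reported: bool = False


def has_timed_out_inflight(inflight: list[InFlight]) -> bool:
    # A run counts as timed out only while it is still running.
    return any(item.timeout_reported and not item.done for item in inflight)


def should_start_execute(inflight: list[InFlight]) -> bool:
    """Return True when the loop may start another execute()."""
    active = len(inflight)
    if active == 0:
        return True  # normal mode: nothing running, start one
    # degraded mode: one stuck run, allow a second so work continues
    return active == 1 and has_timed_out_inflight(inflight)


print(should_start_execute([]))
print(should_start_execute([InFlight()]))
print(should_start_execute([InFlight(timeout_reported=True)]))
print(should_start_execute([InFlight(timeout_reported=True), InFlight()]))
```

Under this rule the number of concurrent runs never exceeds two, which matches the limit stated in the module docstring.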
def _emit_metrics(self) -> None:
if self._on_metrics_change is None:
return
inflight_count = len(self._inflight)
timed_out_count = sum(1 for item in self._inflight if item.timeout_reported and not item.task.done())
metrics = (inflight_count, timed_out_count)
if self._last_metrics == metrics:
return
self._last_metrics = metrics
self._on_metrics_change(inflight_count, timed_out_count)
async def run(self) -> None:
logger.debug("WorkerLoop.run result: started")
while not self._halt_event.is_set():
self._collect_finished()
self._mark_timeouts()
self._set_degraded(self._has_timed_out_inflight())
self._ensure_capacity()
self._emit_metrics()
timeout = max(self._get_interval(), 0.01) timeout = max(self._get_interval(), 0.01)
try: try:
await asyncio.wait_for(self._halt_event.wait(), timeout=timeout) await asyncio.wait_for(self._halt_event.wait(), timeout=timeout)
except asyncio.TimeoutError: except asyncio.TimeoutError:
continue continue
logger.warning("WorkerLoop.run result: stopped")
if self._inflight:
if self._has_timed_out_inflight():
logger.warning("WorkerLoop.run stop: timed-out execute still running; exiting without waiting")
else:
await asyncio.gather(*(item.task for item in self._inflight), return_exceptions=True)
self._collect_finished()
self._set_degraded(False)
self._emit_metrics()
logger.debug("WorkerLoop.run result: stopped")
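
The pause between iterations in `run()` is an interruptible sleep: the loop waits on the halt event with a timeout instead of calling `asyncio.sleep`, so a stop request ends the pause immediately. A minimal sketch of that pattern follows; `wait_or_halt` is a hypothetical helper name, and the 0.01 s floor mirrors the `max(self._get_interval(), 0.01)` expression in the diff.

```python
import asyncio


async def wait_or_halt(halt: asyncio.Event, interval: float) -> bool:
    """Sleep for up to `interval` seconds; return True if halt was set."""
    timeout = max(interval, 0.01)  # avoid a busy loop on zero or negative intervals
    try:
        await asyncio.wait_for(halt.wait(), timeout=timeout)
    except asyncio.TimeoutError:
        return False  # interval elapsed, no stop requested
    return True  # halt was set, the caller should leave its loop


async def demo() -> None:
    halt = asyncio.Event()
    print(await wait_or_halt(halt, 0.01))  # times out
    halt.set()
    print(await wait_or_halt(halt, 5.0))   # returns at once, halt already set


asyncio.run(demo())
```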


@@ -1,11 +1,10 @@
"""Core types: health state, lifecycle state, and HTTP channel settings from config.yaml. """Core types: health and lifecycle state.
Used in core and control for uniform contracts.""" Used in core and control for uniform contracts."""
from __future__ import annotations from __future__ import annotations
from dataclasses import dataclass
from enum import Enum from enum import Enum
from typing import Any, Literal, TypedDict from typing import Literal, TypedDict
HealthState = Literal["ok", "degraded", "unhealthy"] HealthState = Literal["ok", "degraded", "unhealthy"]
@@ -24,39 +23,3 @@ class LifecycleState(str, Enum):
RUNNING = "running" RUNNING = "running"
STOPPING = "stopping" STOPPING = "stopping"
STOPPED = "stopped" STOPPED = "stopped"
@dataclass
class ManagementServerSettings:
"""Settings for the HTTP control channel and healthcheck (read from config.yaml, section management)."""
enabled: bool = False
host: str = "0.0.0.0"
port: int = 8000
timeout: int = 3
"""Health request timeout (seconds)."""
health_timeout: int = 30
"""Seconds without a successful execute() after which health = unhealthy."""
HealthServerSettings = ManagementServerSettings
def management_settings_from_config(config: Any) -> ManagementServerSettings:
"""Extract the HTTP channel settings from the config (section `management`)."""
if not isinstance(config, dict):
return ManagementServerSettings(enabled=False)
m = config.get("management")
if not isinstance(m, dict):
return ManagementServerSettings(enabled=False)
enabled = bool(m.get("enabled", False))
host = str(m.get("host", "0.0.0.0"))
port = int(m.get("port", 8000)) if isinstance(m.get("port"), (int, float)) else 8000
timeout = int(m.get("timeout", 3)) if isinstance(m.get("timeout"), (int, float)) else 3
health_timeout = int(m.get("health_timeout", 30)) if isinstance(m.get("health_timeout"), (int, float)) else 30
return ManagementServerSettings(
enabled=enabled,
host=host,
port=port,
timeout=timeout,
health_timeout=health_timeout,
)


@@ -2,12 +2,12 @@
runtime: 5 runtime: 5
# === HTTP control channel (ConfigManagerV2): /health, /actions/start, /actions/stop === # === HTTP control channel (ConfigManagerV2): /health, /actions/start, /actions/stop ===
# management: management:
# enabled: true enabled: true
# host: "0.0.0.0" host: "0.0.0.0"
# port: 8000 port: 8000
# timeout: 3 timeout: 3
# health_timeout: 30 health_timeout: 30
# === Logging === # === Logging ===
log: log:


@@ -6,14 +6,10 @@ import logging
from pathlib import Path from pathlib import Path
from config_manager import ConfigManager from config_manager import ConfigManager
from config_manager.v2.core import LogManager from config_manager.v2.control import HttpControlChannel
from config_manager.v2 import ManagementServerSettings
logger = logging.getLogger() logger = logging.getLogger()
# Health timeout: without a successful execute() for longer than this, the status is unhealthy.
HEALTH_TIMEOUT = 3.0
class MyApp(ConfigManager): class MyApp(ConfigManager):
def __init__(self, *args, **kwargs): def __init__(self, *args, **kwargs):
@@ -31,21 +27,20 @@ class MyApp(ConfigManager):
async def main() -> None: async def main() -> None:
log_manager = LogManager()
# One object covers both the HTTP management server (enabled, port) and health (health_timeout).
management_settings = ManagementServerSettings(
enabled=True,
port=8000,
health_timeout=HEALTH_TIMEOUT,
)
config_path = Path(__file__).parent / "config.yaml" config_path = Path(__file__).parent / "config.yaml"
print(config_path) print(config_path)
app = MyApp( app = MyApp(
str(config_path), str(config_path),
log_manager=log_manager, control_channels=lambda m: [
management_settings=management_settings, HttpControlChannel(
host="0.0.0.0",
port=8000,
timeout=3,
health_provider=m.get_health_provider(),
)
],
) )
logger.info("App starting (health_timeout=%s)", HEALTH_TIMEOUT) logger.info("App starting")
# Run the manager in the background (start() does not return until stop). # Run the manager in the background (start() does not return until stop).
asyncio.create_task(app.start()) asyncio.create_task(app.start())


@@ -36,7 +36,7 @@ def test_control_channel_can_stop_manager(tmp_path):
cfg.write_text("log: {}\nmanagement: { enabled: false }\n", encoding="utf-8") cfg.write_text("log: {}\nmanagement: { enabled: false }\n", encoding="utf-8")
channel = DummyControlChannel() channel = DummyControlChannel()
app = ControlledApp(str(cfg), control_channel=channel) app = ControlledApp(str(cfg), control_channels=[channel])
runner = asyncio.create_task(app.start()) runner = asyncio.create_task(app.start())
await asyncio.sleep(0.12) await asyncio.sleep(0.12)
@@ -47,11 +47,14 @@ def test_control_channel_can_stop_manager(tmp_path):
status_text = await channel.on_status() status_text = await channel.on_status()
assert "state=running" in status_text assert "state=running" in status_text
assert "worker_inflight=" in status_text
assert "worker_timed_out_inflight=" in status_text
stop_text = await channel.on_stop() stop_text = await channel.on_stop()
assert "stop signal accepted" in stop_text assert "stop signal accepted" in stop_text
await runner await runner
# On stop, the manager does not call control_channel.stop() (the channel stays available) await app.stop()
assert channel.stopped is True
asyncio.run(scenario()) asyncio.run(scenario())


@@ -0,0 +1,153 @@
import asyncio
import threading
import time
from config_manager.v2 import ConfigManagerV2
from config_manager.v2.control.base import ControlChannel, StartHandler, StatusHandler, StopHandler
class DummyControlChannel(ControlChannel):
def __init__(self):
self.on_start: StartHandler | None = None
self.on_stop: StopHandler | None = None
self.on_status: StatusHandler | None = None
self.started = False
self.stopped = False
async def start(self, on_start: StartHandler, on_stop: StopHandler, on_status: StatusHandler) -> None:
self.on_start = on_start
self.on_stop = on_stop
self.on_status = on_status
self.started = True
async def stop(self) -> None:
self.stopped = True
class RestartableApp(ConfigManagerV2):
DEFAULT_UPDATE_INTERVAL = 0.05
DEFAULT_WORK_INTERVAL = 0.05
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.calls = 0
def execute(self) -> None:
self.calls += 1
class TimeoutAwareApp(ConfigManagerV2):
DEFAULT_UPDATE_INTERVAL = 0.05
DEFAULT_WORK_INTERVAL = 0.02
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.calls = 0
self.active = 0
self.max_active = 0
self._lock = threading.Lock()
def execute(self) -> None:
with self._lock:
self.calls += 1
self.active += 1
self.max_active = max(self.max_active, self.active)
try:
time.sleep(0.2)
finally:
with self._lock:
self.active -= 1
class NormalSingleThreadApp(ConfigManagerV2):
DEFAULT_UPDATE_INTERVAL = 0.05
DEFAULT_WORK_INTERVAL = 0.02
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.calls = 0
self.active = 0
self.max_active = 0
self._lock = threading.Lock()
def execute(self) -> None:
with self._lock:
self.calls += 1
self.active += 1
self.max_active = max(self.max_active, self.active)
try:
time.sleep(0.03)
finally:
with self._lock:
self.active -= 1
def test_control_channel_stop_and_start_resumes_execute(tmp_path):
async def scenario() -> None:
cfg = tmp_path / "config.yaml"
cfg.write_text("log: {}\nmanagement: { enabled: false }\n", encoding="utf-8")
channel = DummyControlChannel()
app = RestartableApp(str(cfg), control_channels=[channel])
await app.start()
await asyncio.sleep(0.2)
before_stop = app.calls
assert before_stop > 0
assert channel.on_stop is not None
assert channel.on_start is not None
stop_text = await channel.on_stop()
assert "stop signal accepted" in stop_text
await asyncio.sleep(0.2)
after_stop = app.calls
assert after_stop == before_stop
start_text = await channel.on_start()
assert "start signal accepted" in start_text
await asyncio.sleep(0.2)
assert app.calls > after_stop
await app.stop()
assert channel.stopped is True
asyncio.run(scenario())
def test_normal_mode_uses_single_inflight_execute(tmp_path):
async def scenario() -> None:
cfg = tmp_path / "config.yaml"
cfg.write_text("log: {}\nmanagement: { enabled: false }\n", encoding="utf-8")
app = NormalSingleThreadApp(str(cfg))
await app.start()
await asyncio.sleep(0.25)
health = await app.get_health_provider()()
await app.stop()
assert app.calls >= 2
assert app.max_active == 1
assert health["status"] == "ok"
asyncio.run(scenario())
def test_execute_timeout_starts_second_worker_in_degraded_mode(tmp_path, monkeypatch):
async def scenario() -> None:
cfg = tmp_path / "config.yaml"
cfg.write_text("log: {}\nmanagement: { enabled: false }\n", encoding="utf-8")
monkeypatch.setenv("EXECUTE_TIMEOUT", "0.05")
app = TimeoutAwareApp(str(cfg))
await app.start()
await asyncio.sleep(0.35)
degraded_health = await app.get_health_provider()()
await app.stop()
assert app.calls >= 1
assert app._last_execute_error is not None
assert "did not finish within" in app._last_execute_error
assert app.max_active == 2
assert degraded_health["status"] == "degraded"
asyncio.run(scenario())
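
The last test sets `EXECUTE_TIMEOUT` through the environment, and the constructor reads it with `_read_env_optional_float`, whose body is not shown in this diff. The sketch below is only a guess at what such a helper might do. It assumes a missing or unparsable value falls back to the default and a non-positive value disables the timeout, which is consistent with `_mark_timeouts` treating `None` or `<= 0` as "no timeout". The name `read_env_optional_float` and all of its behavior are hypothetical.

```python
import os
from typing import Optional


def read_env_optional_float(name: str, default: Optional[float]) -> Optional[float]:
    """Hypothetical reader: float from the environment, or None to disable."""
    raw = os.environ.get(name)
    if raw is None or not raw.strip():
        return default  # variable not set: keep the class-level default
    try:
        value = float(raw)
    except ValueError:
        return default  # unparsable value: assumed to fall back silently
    # Assumption: zero or negative means "timeout disabled".
    return value if value > 0 else None


os.environ["EXECUTE_TIMEOUT"] = "0.05"
print(read_env_optional_float("EXECUTE_TIMEOUT", 30.0))
```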