Фикс зависаний и логирование от кодекса

This commit is contained in:
2026-02-26 11:24:34 +03:00
parent 6502f2252d
commit ad89b3db92
6 changed files with 515 additions and 209 deletions

View File

@@ -6,11 +6,14 @@ from __future__ import annotations
import asyncio import asyncio
import hashlib import hashlib
import json import json
import logging
import os import os
from typing import Any, Optional from typing import Any, Optional
import yaml import yaml
logger = logging.getLogger(__name__)
class ConfigLoader: class ConfigLoader:
def __init__(self, path: str): def __init__(self, path: str):
@@ -19,39 +22,59 @@ class ConfigLoader:
self.config: Any = None self.config: Any = None
self.last_valid_config: Any = None self.last_valid_config: Any = None
self._last_seen_hash: Optional[str] = None self._last_seen_hash: Optional[str] = None
logger.warning("ConfigLoader.__init__ result: path=%s", self.path)
def _read_file_sync(self) -> str: def _read_file_sync(self) -> str:
"""Синхронно прочитать сырой текст конфига с диска.""" """Синхронно прочитать сырой текст конфига с диска."""
with open(self.path, "r", encoding="utf-8") as fh: with open(self.path, "r", encoding="utf-8") as fh:
return fh.read() data = fh.read()
logger.warning("ConfigLoader._read_file_sync result: bytes=%s", len(data))
return data
async def read_file_async(self) -> str: async def read_file_async(self) -> str:
"""Прочитать сырой текст конфига с диска в рабочем потоке.""" """Прочитать сырой текст конфига с диска в рабочем потоке."""
return await asyncio.to_thread(self._read_file_sync) result = await asyncio.to_thread(self._read_file_sync)
logger.warning("ConfigLoader.read_file_async result: bytes=%s", len(result))
return result
def parse_config(self, data: str) -> Any: def parse_config(self, data: str) -> Any:
"""Распарсить текст конфига как YAML или JSON по расширению файла.""" """Распарсить текст конфига как YAML или JSON по расширению файла."""
extension = os.path.splitext(self.path)[1].lower() extension = os.path.splitext(self.path)[1].lower()
if extension in (".yaml", ".yml"): try:
return yaml.safe_load(data) if extension in (".yaml", ".yml"):
return json.loads(data) result = yaml.safe_load(data)
else:
result = json.loads(data)
except Exception: # noqa: BLE001
logger.exception("ConfigLoader.parse_config error: extension=%s", extension)
raise
logger.warning(
"ConfigLoader.parse_config result: extension=%s type=%s",
extension,
type(result).__name__,
)
return result
@staticmethod @staticmethod
def _calculate_hash(data: str) -> str: def _calculate_hash(data: str) -> str:
"""Вычислить устойчивый хеш содержимого для обнаружения изменений.""" """Вычислить устойчивый хеш содержимого для обнаружения изменений."""
return hashlib.sha256(data.encode("utf-8")).hexdigest() result = hashlib.sha256(data.encode("utf-8")).hexdigest()
logger.warning("ConfigLoader._calculate_hash result: hash=%s", result)
return result
async def load_if_changed(self) -> tuple[bool, Any]: async def load_if_changed(self) -> tuple[bool, Any]:
"""Загрузить и распарсить конфиг только при изменении содержимого файла.""" """Загрузить и распарсить конфиг только при изменении содержимого файла."""
raw_data = await self.read_file_async() raw_data = await self.read_file_async()
current_hash = self._calculate_hash(raw_data) current_hash = self._calculate_hash(raw_data)
if current_hash == self._last_seen_hash: if current_hash == self._last_seen_hash:
logger.warning("ConfigLoader.load_if_changed result: changed=False")
return False, self.config return False, self.config
self._last_seen_hash = current_hash self._last_seen_hash = current_hash
parsed = self.parse_config(raw_data) parsed = self.parse_config(raw_data)
self.config = parsed self.config = parsed
self.last_valid_config = parsed self.last_valid_config = parsed
logger.warning("ConfigLoader.load_if_changed result: changed=True")
return True, parsed return True, parsed

View File

@@ -1,14 +1,11 @@
"""Главный класс менеджера V2: оркестрация жизненного цикла, конфига, API и каналов управления. """Config manager v2: runtime orchestration and configuration updates."""
Запускает воркер и периодическое обновление конфига, поднимает management-сервер и control-канал при наличии настроек."""
from __future__ import annotations from __future__ import annotations
import os
import asyncio import asyncio
import logging import logging
import os
import time import time
from typing import Any, Optional from typing import Any, Optional
import logging
from ...v1.log_manager import LogManager from ...v1.log_manager import LogManager
from ..control.base import ControlChannel from ..control.base import ControlChannel
@@ -24,7 +21,197 @@ from .scheduler import WorkerLoop
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
class ConfigManagerV2:
def _read_env_interval(name: str, default_value: float) -> float:
    """Read a positive, finite float interval from an environment variable.

    Args:
        name: Name of the environment variable to read.
        default_value: Value used when the variable is unset or invalid.

    Returns:
        The parsed value, or ``float(default_value)`` when the variable is
        missing, is not a number, is non-finite (``nan``/``inf``) or is not
        greater than zero. Invalid values are logged, never raised.
    """
    import math  # function-scope import keeps this helper self-contained

    fallback = float(default_value)
    raw_value = os.environ.get(name)
    if raw_value is None:
        return fallback
    try:
        parsed = float(raw_value)
        # ``nan <= 0`` is False and ``inf`` is positive, so non-finite values
        # have to be rejected explicitly or they would be returned as valid.
        if not math.isfinite(parsed) or parsed <= 0:
            raise ValueError(f"{name} must be a finite number greater than zero")
    except ValueError:
        logger.exception(
            "ConfigManagerV2 interval parse error: env=%s raw_value=%s fallback=%s",
            name,
            raw_value,
            default_value,
        )
        return fallback
    return parsed
class _RuntimeController:
    """Runtime loops and lifecycle supervision.

    The class defines no ``__init__``: every attribute used below
    (``logger``, ``_halt``, ``_state``, ``work_interval``, ...) as well as
    the ``execute`` and ``_update_config`` methods are expected to be
    provided by the concrete subclass.
    """

    def _on_execute_success(self) -> None:
        """Store the monotonic time of the successful run and clear the last error."""
        self._last_success_timestamp = time.monotonic()
        self._last_execute_error = None
        self.logger.warning(
            "ConfigManagerV2._on_execute_success result: last_success_timestamp=%s",
            self._last_success_timestamp,
        )

    def _on_execute_error(self, exc: Exception) -> None:
        """Remember the text of a failed run and log it with its traceback.

        Args:
            exc: The exception reported by the worker loop.
        """
        self._last_execute_error = str(exc)
        # Hand the exception over explicitly: logger.exception() only finds a
        # traceback while an ``except`` block is active in the caller.
        self.logger.error("ConfigManagerV2._on_execute_error", exc_info=exc)
        self.logger.warning(
            "ConfigManagerV2._on_execute_error result: last_execute_error=%s",
            self._last_execute_error,
        )

    async def _worker_loop(self) -> None:
        """Run ``WorkerLoop`` around ``self.execute`` until it returns or fails."""
        self.logger.warning(
            "ConfigManagerV2._worker_loop result: started work_interval=%s",
            self.work_interval,
        )
        worker = WorkerLoop(
            execute=self.execute,
            # Lambda so that the interval is re-read on every call.
            get_interval=lambda: self.work_interval,
            halt_event=self._halt,
            on_error=self._on_execute_error,
            on_success=self._on_execute_success,
        )
        try:
            await worker.run()
            self.logger.warning("ConfigManagerV2._worker_loop result: completed")
        except Exception:  # noqa: BLE001
            self.logger.exception("ConfigManagerV2._worker_loop error")
            raise
        finally:
            self.logger.warning("ConfigManagerV2._worker_loop result: stopped")

    async def _periodic_update_loop(self) -> None:
        """Call ``_update_config`` repeatedly until the halt event is set."""
        self.logger.warning(
            "ConfigManagerV2._periodic_update_loop result: started update_interval=%s",
            self.update_interval,
        )
        try:
            while not self._halt.is_set():
                await self._update_config()
                try:
                    # Waiting on the halt event (instead of sleeping) lets the
                    # loop end as soon as a stop is requested.
                    await asyncio.wait_for(self._halt.wait(), timeout=max(self.update_interval, 0.05))
                except asyncio.TimeoutError:
                    continue
        except Exception:  # noqa: BLE001
            self.logger.exception("ConfigManagerV2._periodic_update_loop error")
            raise
        finally:
            self.logger.warning("ConfigManagerV2._periodic_update_loop result: stopped")

    async def _status_text(self) -> str:
        """Return ``state=...; health=...`` plus ``; detail=...`` when a detail is present."""
        health = await self._health_aggregator.collect()
        status_text = f"state={self._state.value}; health={health['status']}"
        detail = health.get("detail")
        if detail:
            status_text = f"{status_text}; detail={detail}"
        self.logger.warning("ConfigManagerV2._status_text result: %s", status_text)
        return status_text

    async def _start_control_channel(self) -> None:
        """Start the control channel, if any; failures are logged, not raised."""
        if self._control_channel is None:
            self.logger.warning("ConfigManagerV2._start_control_channel result: no control channel")
            return
        try:
            await self._control_channel.start(
                self._control_bridge.on_start,
                self._control_bridge.on_stop,
                self._control_bridge.on_status,
            )
            self.logger.warning("ConfigManagerV2._start_control_channel result: started")
        except Exception:  # noqa: BLE001
            self.logger.exception("ConfigManagerV2._start_control_channel error")

    async def _stop_control_channel(self) -> None:
        """Stop the control channel, if any; failures are logged, not raised."""
        if self._control_channel is None:
            self.logger.warning("ConfigManagerV2._stop_control_channel result: no control channel")
            return
        try:
            await self._control_channel.stop()
            self.logger.warning("ConfigManagerV2._stop_control_channel result: stopped")
        except Exception:  # noqa: BLE001
            self.logger.exception("ConfigManagerV2._stop_control_channel error")

    async def _start_management_server(self) -> None:
        """Start the management server, if any; failures are logged, not raised."""
        if self._management_server is None:
            self.logger.warning("ConfigManagerV2._start_management_server result: disabled")
            return
        try:
            await self._management_server.start()
            self.logger.warning(
                "ConfigManagerV2._start_management_server result: started port=%s",
                self._management_server.port,
            )
        except Exception:  # noqa: BLE001
            self.logger.exception("ConfigManagerV2._start_management_server error")
            self.logger.warning(
                "ConfigManagerV2._start_management_server result: failed worker will continue",
            )

    def _on_runtime_task_done(self, task: asyncio.Task) -> None:
        """Log how a finished task ended: cancelled, completed or failed.

        Args:
            task: The finished task to inspect.
        """
        if task.cancelled():
            self.logger.warning("ConfigManagerV2._on_runtime_task_done result: cancelled")
            return
        try:
            exc = task.exception()
        except Exception:  # noqa: BLE001
            self.logger.exception("ConfigManagerV2._on_runtime_task_done error while reading task exception")
            return
        if exc is None:
            self.logger.warning("ConfigManagerV2._on_runtime_task_done result: completed")
            return
        self.logger.error(
            "ConfigManagerV2 background task failed",
            exc_info=(type(exc), exc, exc.__traceback__),
        )
        self.logger.warning("ConfigManagerV2._on_runtime_task_done result: failed")

    async def _run(self) -> None:
        """Drive one lifecycle: start up, run both loops, then shut them down.

        Startup happens inside the guarded region, so an interruption before
        the loops exist (for example cancellation while the config is being
        read) still ends in ``STOPPED`` with ``_task`` cleared.

        Raises:
            asyncio.CancelledError: Re-raised when the coroutine is cancelled.
            Exception: Re-raised when one of the background loops fails.
        """
        tasks: list[asyncio.Task] = []
        try:
            self._state = LifecycleState.STARTING
            self.logger.warning("ConfigManagerV2._run result: state=%s", self._state.value)
            self._halt.clear()
            await self._update_config()
            await self._start_management_server()
            await self._start_control_channel()
            self._state = LifecycleState.RUNNING
            self.logger.warning("ConfigManagerV2._run result: state=%s", self._state.value)
            # Register each task as soon as it exists so the cleanup below
            # always sees everything that was actually scheduled.
            tasks.append(asyncio.create_task(self._worker_loop(), name="v2-worker-loop"))
            tasks.append(asyncio.create_task(self._periodic_update_loop(), name="v2-config-loop"))
            await asyncio.gather(*tasks)
            self.logger.warning("ConfigManagerV2._run result: background loops completed")
        except asyncio.CancelledError:
            self.logger.warning("ConfigManagerV2._run result: cancelled")
            raise
        except Exception:  # noqa: BLE001
            self.logger.exception("ConfigManagerV2._run error")
            raise
        finally:
            self._state = LifecycleState.STOPPING
            self.logger.warning("ConfigManagerV2._run result: state=%s", self._state.value)
            self._halt.set()
            for task in tasks:
                task.cancel()
            # return_exceptions=True: wait for both loops to finish without
            # letting their cancellation errors mask the original outcome.
            await asyncio.gather(*tasks, return_exceptions=True)
            self._state = LifecycleState.STOPPED
            self._task = None
            self.logger.warning(
                "ConfigManagerV2._run result: state=%s api_and_control_available=%s",
                self._state.value,
                True,
            )
class ConfigManagerV2(_RuntimeController):
"""Public manager API."""
DEFAULT_UPDATE_INTERVAL = 5 DEFAULT_UPDATE_INTERVAL = 5
DEFAULT_WORK_INTERVAL = 2 DEFAULT_WORK_INTERVAL = 2
@@ -35,29 +222,25 @@ class ConfigManagerV2:
management_settings: Optional[ManagementServerSettings] = None, management_settings: Optional[ManagementServerSettings] = None,
control_channel: Optional[ControlChannel] = None, control_channel: Optional[ControlChannel] = None,
): ):
"""Инициализация подсистем менеджера и состояния рантайма.""" self.logger = logging.getLogger(__name__)
self.path = path self.path = path
self.config: Any = None self.config: Any = None
# Интервалы опроса (минуты): только здесь, в конфиг не пишем self.update_interval = _read_env_interval("UPDATE_INTERVAL", float(self.DEFAULT_UPDATE_INTERVAL))
self.update_interval = int(os.environ.get("UPDATE_INTERVAL", self.DEFAULT_UPDATE_INTERVAL)) self.work_interval = _read_env_interval("WORK_INTERVAL", float(self.DEFAULT_WORK_INTERVAL))
self.work_interval = int(os.environ.get("WORK_INTERVAL", self.DEFAULT_WORK_INTERVAL))
print(f"self.update_interval {self.update_interval}")
print(f"self.work_interval {self.work_interval}")
self._loader = ConfigLoader(path) self._loader = ConfigLoader(path)
self._log_manager = log_manager or LogManager() self._log_manager = log_manager or LogManager()
self._control_channel = control_channel self._control_channel = control_channel
self._halt = asyncio.Event() self._halt = asyncio.Event()
self._task: Optional[asyncio.Task] = None self._task: Optional[asyncio.Task] = None
self._loop: Optional[asyncio.AbstractEventLoop] = None self._loop: Optional[asyncio.AbstractEventLoop] = None
self._state = LifecycleState.IDLE self._state = LifecycleState.IDLE
self._last_execute_error: Optional[str] = None self._last_execute_error: Optional[str] = None
self._last_success_timestamp: Optional[float] = None self._last_success_timestamp: Optional[float] = None
self._management_settings = management_settings or ManagementServerSettings(enabled=True) settings = management_settings or ManagementServerSettings(enabled=True)
self._health_timeout = self._management_settings.health_timeout self._management_settings = settings
self._health_timeout = settings.health_timeout
self._health_aggregator = HealthAggregator( self._health_aggregator = HealthAggregator(
get_state=lambda: self._state, get_state=lambda: self._state,
get_last_error=lambda: self._last_execute_error, get_last_error=lambda: self._last_execute_error,
@@ -72,168 +255,94 @@ class ConfigManagerV2:
get_status=self._status_text, get_status=self._status_text,
) )
self._management_server: Optional[ManagementServer] = None self._management_server: Optional[ManagementServer] = None
if self._management_settings.enabled: if settings.enabled:
self._management_server = ManagementServer( self._management_server = ManagementServer(
host=self._management_settings.host, host=settings.host,
port=self._management_settings.port, port=settings.port,
timeout=self._management_settings.timeout, timeout=settings.timeout,
health_provider=self._health_aggregator.collect, health_provider=self._health_aggregator.collect,
on_start=self._api_bridge.on_start, on_start=self._api_bridge.on_start,
on_stop=self._api_bridge.on_stop, on_stop=self._api_bridge.on_stop,
) )
self.logger.warning(
self.logger = logging.getLogger(__name__) "ConfigManagerV2.__init__ result: path=%s update_interval=%s work_interval=%s management_enabled=%s",
self.path,
self.update_interval,
self.work_interval,
self._management_server is not None,
)
def _apply_config(self, new_config: Any) -> None: def _apply_config(self, new_config: Any) -> None:
"""Применить загруженный конфиг: log_manager. Интервалы (update_interval, work_interval) задаются только в классе/наследнике."""
self.config = new_config self.config = new_config
if isinstance(new_config, dict): if isinstance(new_config, dict):
self._log_manager.apply_config(new_config) try:
self._log_manager.apply_config(new_config)
except Exception: # noqa: BLE001
self.logger.exception("ConfigManagerV2._apply_config error while applying logging config")
raise
self.logger.warning(
"ConfigManagerV2._apply_config result: config_type=%s is_dict=%s",
type(new_config).__name__,
isinstance(new_config, dict),
)
async def _update_config(self) -> None: async def _update_config(self) -> None:
"""Перезагрузить конфиг при изменении файла и применить к состоянию и log_manager."""
try: try:
changed, new_config = await self._loader.load_if_changed() changed, new_config = await self._loader.load_if_changed()
if not changed: if not changed:
self.logger.warning("ConfigManagerV2._update_config result: no changes")
return return
self._apply_config(new_config) self._apply_config(new_config)
self.logger.warning("ConfigManagerV2._update_config result: config updated")
except Exception as exc: # noqa: BLE001 except Exception as exc: # noqa: BLE001
self.logger.error("Error reading/parsing config file: %s", exc) self.logger.exception("ConfigManagerV2._update_config error")
if self._loader.last_valid_config is not None: if self._loader.last_valid_config is None:
self.logger.warning(
"ConfigManagerV2._update_config result: no fallback config available detail=%s",
str(exc),
)
return
try:
self._apply_config(self._loader.last_valid_config) self._apply_config(self._loader.last_valid_config)
self.logger.warning(
"ConfigManagerV2._update_config result: fallback to last valid config applied",
)
except Exception: # noqa: BLE001
self.logger.exception("ConfigManagerV2._update_config fallback error")
def execute(self) -> None: def execute(self) -> None:
"""Переопределить в подклассе для реализации одной единицы блокирующей работы.""" """Override in subclasses."""
def get_health_status(self) -> HealthPayload: def get_health_status(self) -> HealthPayload:
"""Вернуть payload здоровья приложения для /health.
Варианты ответа по статусу:
- ``{"status": "ok"}`` — сервис в норме; GET /health → 200.
- ``{"status": "degraded", "detail": "..."}`` — работает с ограничениями; GET /health → 503.
- ``{"status": "unhealthy", "detail": "..."}`` — неработоспособен; GET /health → 503.
Поле ``detail`` опционально; для ``ok`` обычно не задаётся.
Переопределить в подклассе для своей логики здоровья."""
return {"status": "ok"} return {"status": "ok"}
def _on_execute_success(self) -> None:
"""Обновить время последнего успешного execute() и сбросить маркер ошибки."""
self._last_success_timestamp = time.monotonic()
self._last_execute_error = None
def _on_execute_error(self, exc: Exception) -> None:
"""Сохранить и залогировать детали ошибки выполнения для отчёта здоровья."""
self._last_execute_error = str(exc)
self.logger.error("Execution error: %s", exc)
async def _worker_loop(self) -> None:
"""Вызывать execute() циклически до запроса остановки."""
logger.warning("Worker loop started")
logger.debug(f"Запускаем _worker_loop с интервалом {self.work_interval}")
worker = WorkerLoop(
execute=self.execute,
get_interval=lambda: self.work_interval,
halt_event=self._halt,
on_error=self._on_execute_error,
on_success=self._on_execute_success,
)
try:
await worker.run()
finally:
logger.warning("Worker loop stopped")
async def _periodic_update_loop(self) -> None:
"""Периодически проверять файл конфига на обновления до остановки."""
while not self._halt.is_set():
await self._update_config()
try:
await asyncio.wait_for(self._halt.wait(), timeout=max(self.update_interval, 0.05))
except asyncio.TimeoutError:
continue
async def _status_text(self) -> str:
"""Сформировать читаемый статус рантайма для каналов управления."""
health = await self._health_aggregator.collect()
detail = health.get("detail")
if detail:
return f"state={self._state.value}; health={health['status']}; detail={detail}"
return f"state={self._state.value}; health={health['status']}"
async def _start_control_channel(self) -> None:
"""Запустить настроенный канал управления с привязанными обработчиками команд."""
if self._control_channel is None:
return
await self._control_channel.start(
self._control_bridge.on_start,
self._control_bridge.on_stop,
self._control_bridge.on_status,
)
async def _stop_control_channel(self) -> None:
"""Остановить настроенный канал управления, если он активен."""
if self._control_channel is None:
return
await self._control_channel.stop()
async def _run(self) -> None:
"""Запустить жизненный цикл менеджера и координировать фоновые задачи."""
self._state = LifecycleState.STARTING
self._halt.clear()
await self._update_config()
if self._management_server is not None:
await self._management_server.start()
await self._start_control_channel()
self._state = LifecycleState.RUNNING
self.logger.info("ConfigManagerV2 started")
tasks = [
asyncio.create_task(self._worker_loop(), name="v2-worker-loop"),
asyncio.create_task(self._periodic_update_loop(), name="v2-config-loop"),
]
try:
await asyncio.gather(*tasks)
except asyncio.CancelledError:
raise
finally:
self._state = LifecycleState.STOPPING
self._halt.set()
for task in tasks:
task.cancel()
await asyncio.gather(*tasks, return_exceptions=True)
# Management-сервер и control channel не останавливаем: API и канал управления остаются доступными.
self._state = LifecycleState.STOPPED
self._task = None
self.logger.info("ConfigManagerV2 stopped (API and control channel remain available)")
async def start(self) -> None: async def start(self) -> None:
"""Запустить циклы execute и конфига в фоне; возвращает управление сразу (ответ на /actions/start приходит без ожидания)."""
if self._task is not None and not self._task.done(): if self._task is not None and not self._task.done():
self.logger.warning("ConfigManagerV2 is already running") self.logger.warning("ConfigManagerV2.start result: already running")
return return
try: try:
self._loop = asyncio.get_running_loop() self._loop = asyncio.get_running_loop()
except RuntimeError: except RuntimeError:
self.logger.error("start() must be called from within an async context") self.logger.exception("ConfigManagerV2.start error: must be called from within async context")
raise raise
self._task = asyncio.create_task(self._run(), name="config-manager-v2") self._task = asyncio.create_task(self._run(), name="config-manager-v2")
self._task.add_done_callback(self._on_runtime_task_done)
self.logger.warning("ConfigManagerV2.start result: background task started")
async def stop(self) -> None: async def stop(self) -> None:
"""Запросить плавную остановку и дождаться завершения менеджера."""
if self._task is None: if self._task is None:
self.logger.warning("ConfigManagerV2 is not running") self.logger.warning("ConfigManagerV2.stop result: not running")
return return
self._halt.set() self._halt.set()
if asyncio.current_task() is self._task: if asyncio.current_task() is self._task:
self.logger.warning("ConfigManagerV2.stop result: stop requested from runtime task")
return return
try: try:
await self._task await self._task
except asyncio.CancelledError: except asyncio.CancelledError:
pass self.logger.warning("ConfigManagerV2.stop result: runtime task cancelled")
except Exception: # noqa: BLE001
self.logger.exception("ConfigManagerV2.stop error while awaiting runtime task")
raise
finally:
self.logger.warning("ConfigManagerV2.stop result: completed")

View File

@@ -4,9 +4,12 @@
from __future__ import annotations from __future__ import annotations
import asyncio import asyncio
import logging
from collections.abc import Callable from collections.abc import Callable
from typing import Optional from typing import Optional
logger = logging.getLogger(__name__)
class WorkerLoop: class WorkerLoop:
def __init__( def __init__(
@@ -23,20 +26,29 @@ class WorkerLoop:
self._halt_event = halt_event self._halt_event = halt_event
self._on_error = on_error self._on_error = on_error
self._on_success = on_success self._on_success = on_success
logger.warning(
"WorkerLoop.__init__ result: execute=%s",
getattr(execute, "__name__", execute.__class__.__name__),
)
async def run(self) -> None: async def run(self) -> None:
"""Вызывать execute циклически до запроса остановки.""" """Вызывать execute циклически до запроса остановки."""
logger.warning("WorkerLoop.run result: started")
while not self._halt_event.is_set(): while not self._halt_event.is_set():
try: try:
await asyncio.to_thread(self._execute) await asyncio.to_thread(self._execute)
if self._on_success is not None: if self._on_success is not None:
self._on_success() self._on_success()
logger.warning("WorkerLoop.run result: execute completed")
except Exception as exc: # noqa: BLE001 except Exception as exc: # noqa: BLE001
logger.exception("WorkerLoop.run error during execute")
if self._on_error is not None: if self._on_error is not None:
self._on_error(exc) self._on_error(exc)
logger.warning("WorkerLoop.run result: execute failed")
timeout = max(self._get_interval(), 0.01) timeout = max(self._get_interval(), 0.01)
try: try:
await asyncio.wait_for(self._halt_event.wait(), timeout=timeout) await asyncio.wait_for(self._halt_event.wait(), timeout=timeout)
except asyncio.TimeoutError: except asyncio.TimeoutError:
continue continue
logger.warning("WorkerLoop.run result: stopped")

View File

@@ -4,10 +4,13 @@ ManagementApiBridge отдаёт start/stop в HTTP; ControlChannelBridge — st
from __future__ import annotations from __future__ import annotations
import asyncio import asyncio
import logging
from collections.abc import Awaitable, Callable from collections.abc import Awaitable, Callable
from ..types import LifecycleState from ..types import LifecycleState
logger = logging.getLogger(__name__)
class ManagementApiBridge: class ManagementApiBridge:
"""Предоставляет start/stop жизненного цикла как async-колбэки для ManagementServer (/actions/start, /actions/stop).""" """Предоставляет start/stop жизненного цикла как async-колбэки для ManagementServer (/actions/start, /actions/stop)."""
@@ -19,16 +22,29 @@ class ManagementApiBridge:
): ):
self._start_fn = start_fn self._start_fn = start_fn
self._stop_fn = stop_fn self._stop_fn = stop_fn
logger.warning("ManagementApiBridge.__init__ result: callbacks configured")
async def on_start(self) -> str: async def on_start(self) -> str:
"""Выполнить start и вернуть сообщение для HTTP-ответа.""" """Выполнить start и вернуть сообщение для HTTP-ответа."""
await self._start_fn() try:
return "start completed" await self._start_fn()
except Exception: # noqa: BLE001
logger.exception("ManagementApiBridge.on_start error")
raise
result = "start completed"
logger.warning("ManagementApiBridge.on_start result: %s", result)
return result
async def on_stop(self) -> str: async def on_stop(self) -> str:
"""Выполнить stop и вернуть сообщение для HTTP-ответа.""" """Выполнить stop и вернуть сообщение для HTTP-ответа."""
await self._stop_fn() try:
return "stop completed" await self._stop_fn()
except Exception: # noqa: BLE001
logger.exception("ManagementApiBridge.on_stop error")
raise
result = "stop completed"
logger.warning("ManagementApiBridge.on_stop result: %s", result)
return result
class ControlChannelBridge: class ControlChannelBridge:
@@ -43,19 +59,32 @@ class ControlChannelBridge:
self._halt = halt self._halt = halt
self._get_state = get_state self._get_state = get_state
self._get_status = get_status self._get_status = get_status
logger.warning("ControlChannelBridge.__init__ result: callbacks configured")
async def on_start(self) -> str: async def on_start(self) -> str:
"""Обработать внешний start: сбросить halt; идемпотентно при уже running.""" """Обработать внешний start: сбросить halt; идемпотентно при уже running."""
if self._get_state() == LifecycleState.RUNNING: if self._get_state() == LifecycleState.RUNNING:
return "already running" result = "already running"
logger.warning("ControlChannelBridge.on_start result: %s", result)
return result
self._halt.clear() self._halt.clear()
return "start signal accepted" result = "start signal accepted"
logger.warning("ControlChannelBridge.on_start result: %s", result)
return result
async def on_stop(self) -> str: async def on_stop(self) -> str:
"""Обработать внешний stop: установить halt.""" """Обработать внешний stop: установить halt."""
self._halt.set() self._halt.set()
return "stop signal accepted" result = "stop signal accepted"
logger.warning("ControlChannelBridge.on_stop result: %s", result)
return result
async def on_status(self) -> str: async def on_status(self) -> str:
"""Вернуть текущий текст статуса.""" """Вернуть текущий текст статуса."""
return await self._get_status() try:
result = await self._get_status()
except Exception: # noqa: BLE001
logger.exception("ControlChannelBridge.on_status error")
raise
logger.warning("ControlChannelBridge.on_status result: %s", result)
return result

View File

@@ -3,11 +3,14 @@
Здоровье = был успешный execute() за последние health_timeout секунд; иначе unhealthy с деталью (ошибка или таймаут).""" Здоровье = был успешный execute() за последние health_timeout секунд; иначе unhealthy с деталью (ошибка или таймаут)."""
from __future__ import annotations from __future__ import annotations
import logging
import time import time
from collections.abc import Callable from collections.abc import Callable
from ..types import HealthPayload, LifecycleState from ..types import HealthPayload, LifecycleState
logger = logging.getLogger(__name__)
class HealthAggregator: class HealthAggregator:
"""Формирует ответ здоровья по времени последнего успешного execute() и таймауту.""" """Формирует ответ здоровья по времени последнего успешного execute() и таймауту."""
@@ -25,6 +28,7 @@ class HealthAggregator:
self._get_last_success_timestamp = get_last_success_timestamp self._get_last_success_timestamp = get_last_success_timestamp
self._health_timeout = health_timeout self._health_timeout = health_timeout
self._get_app_health = get_app_health self._get_app_health = get_app_health
logger.warning("HealthAggregator.__init__ result: health_timeout=%s", self._health_timeout)
async def collect(self) -> HealthPayload: async def collect(self) -> HealthPayload:
"""Вернуть ok, если был успешный execute() за последние health_timeout сек; иначе unhealthy. Всегда добавляем state.""" """Вернуть ok, если был успешный execute() за последние health_timeout сек; иначе unhealthy. Всегда добавляем state."""
@@ -33,21 +37,31 @@ class HealthAggregator:
# Только при state=RUNNING возможен status=ok; при остановке (STOPPING/STOPPED) сразу unhealthy. # Только при state=RUNNING возможен status=ok; при остановке (STOPPING/STOPPED) сразу unhealthy.
if state != LifecycleState.RUNNING: if state != LifecycleState.RUNNING:
return {"status": "unhealthy", "detail": f"state={state_value}", "state": state_value} result = {"status": "unhealthy", "detail": f"state={state_value}", "state": state_value}
logger.warning("HealthAggregator.collect result: %s", result)
return result
last_success = self._get_last_success_timestamp() last_success = self._get_last_success_timestamp()
now = time.monotonic() now = time.monotonic()
if last_success is None: if last_success is None:
detail = self._get_last_error() or "no successful run yet" detail = self._get_last_error() or "no successful run yet"
return {"status": "unhealthy", "detail": detail, "state": state_value} result = {"status": "unhealthy", "detail": detail, "state": state_value}
logger.warning("HealthAggregator.collect result: %s", result)
return result
if (now - last_success) > self._health_timeout: if (now - last_success) > self._health_timeout:
detail = self._get_last_error() or f"no successful run within {self._health_timeout}s" detail = self._get_last_error() or f"no successful run within {self._health_timeout}s"
return {"status": "unhealthy", "detail": detail, "state": state_value} result = {"status": "unhealthy", "detail": detail, "state": state_value}
logger.warning("HealthAggregator.collect result: %s", result)
return result
result = self._get_app_health() result = self._get_app_health()
status = result.get("status", "unhealthy") status = result.get("status", "unhealthy")
if status != "ok": if status != "ok":
return {"status": "unhealthy", "detail": result.get("detail", "app reported non-ok"), "state": state_value} unhealthy = {"status": "unhealthy", "detail": result.get("detail", "app reported non-ok"), "state": state_value}
return {**result, "state": state_value} logger.warning("HealthAggregator.collect result: %s", unhealthy)
return unhealthy
healthy = {**result, "state": state_value}
logger.warning("HealthAggregator.collect result: %s", healthy)
return healthy

View File

@@ -1,10 +1,9 @@
"""Management HTTP API на FastAPI: эндпоинты /health, /actions/start, /actions/stop. """Management HTTP API with /health, /actions/start and /actions/stop."""
Единообразное описание маршрутов через декораторы FastAPI."""
from __future__ import annotations from __future__ import annotations
import asyncio import asyncio
import json import json
import logging
from collections.abc import Awaitable, Callable from collections.abc import Awaitable, Callable
from typing import Any, Optional from typing import Any, Optional
@@ -14,13 +13,127 @@ from uvicorn import Config, Server
from ..types import HealthPayload from ..types import HealthPayload
# Захардкоженные эндпоинты management API.
PATH_HEALTH = "/health" PATH_HEALTH = "/health"
PATH_ACTION_START = "/actions/start" PATH_ACTION_START = "/actions/start"
PATH_ACTION_STOP = "/actions/stop" PATH_ACTION_STOP = "/actions/stop"
logger = logging.getLogger(__name__)
class UvicornServerRunner:
    """Lifecycle wrapper around uvicorn Server.

    Runs ``Server.serve()`` as a background asyncio task, waits until the
    server reports ``started``, and exposes the port that was actually bound.
    """

    # Seconds a failed startup gets to unwind on its own (and, after that, to
    # react to cancellation) before cleanup stops waiting for the serve task.
    _CLEANUP_GRACE_SECONDS: float = 1.0

    def __init__(self, host: str, port: int, timeout: int):
        """Remember the listen address and the startup timeout in seconds.

        Nothing is started here; call :meth:`start` to launch the server.
        """
        self._host = host
        self._port = port
        self._timeout = timeout
        self._server: Optional[Server] = None
        self._serve_task: Optional[asyncio.Task[None]] = None
        self._bound_port: Optional[int] = None
        logger.warning(
            "UvicornServerRunner.__init__ result: host=%s port=%s timeout=%s",
            self._host,
            self._port,
            self._timeout,
        )

    async def _raise_if_start_task_failed(self) -> None:
        """Raise ``RuntimeError`` if the serve task finished while startup is pending.

        Returns silently when there is no serve task or it is still running.
        """
        if self._serve_task is None or not self._serve_task.done():
            return
        try:
            await self._serve_task
        except SystemExit as exc:
            raise RuntimeError(f"Management server exited during startup with code {exc.code}") from exc
        except Exception as exc:  # noqa: BLE001
            raise RuntimeError("Management server failed during startup") from exc
        # The task completed without an exception, yet the server never reported "started".
        raise RuntimeError("Management server stopped unexpectedly during startup")

    async def _wait_until_started(self) -> None:
        """Poll the server's ``started`` flag until it is set.

        Raises:
            RuntimeError: no server object exists, or the serve task ended early.
            TimeoutError: the flag was not set within ``max(timeout, 1.0)`` seconds.
        """
        if self._server is None:
            raise RuntimeError("Management server is not initialized")
        loop = asyncio.get_running_loop()
        deadline = loop.time() + max(float(self._timeout), 1.0)
        while not self._server.started:
            await self._raise_if_start_task_failed()
            if loop.time() >= deadline:
                raise TimeoutError("Management server startup timed out")
            await asyncio.sleep(0.05)

    def _resolve_bound_port(self) -> int:
        """Return the port of the first listening socket, or the requested port.

        The requested port is returned whenever the server, its ``servers``
        list, or the sockets are unavailable, or when ``getsockname()`` does
        not yield a ``(host, port, ...)`` tuple.
        """
        if self._server is None:
            return self._port
        servers = getattr(self._server, "servers", None)
        if not servers:
            return self._port
        sockets = getattr(servers[0], "sockets", None)
        if not sockets:
            return self._port
        sockname = sockets[0].getsockname()
        if isinstance(sockname, tuple) and len(sockname) > 1:
            return int(sockname[1])
        return self._port

    async def _drain_failed_serve_task(self, task: asyncio.Task[None]) -> None:
        """Wait a bounded time for *task* to finish, cancelling it if it does not."""
        grace = self._CLEANUP_GRACE_SECONDS
        try:
            if not task.done():
                # asyncio.wait never re-raises the task's own exception, so a
                # CancelledError here can only mean that the caller was cancelled.
                await asyncio.wait({task}, timeout=grace)
            if not task.done():
                logger.warning(
                    "UvicornServerRunner._cleanup_start_failure result: "
                    "serve task still running after %ss, cancelling",
                    grace,
                )
                task.cancel()
                await asyncio.wait({task}, timeout=grace)
        finally:
            # Also reached when the caller is cancelled mid-wait: never leave
            # an unreferenced serve task running in the background.
            if not task.done():
                task.cancel()
        if task.done() and not task.cancelled():
            exc = task.exception()
            if exc is not None:
                logger.error("UvicornServerRunner._cleanup_start_failure error", exc_info=exc)

    async def _cleanup_start_failure(self) -> None:
        """Roll back a failed startup: stop the serve task and reset all state.

        The wait for the serve task is bounded. A server that is stuck before
        its main loop never observes ``should_exit``, so an unbounded
        ``await`` on the task would hang forever.
        """
        task = self._serve_task
        if self._server is not None:
            self._server.should_exit = True
        try:
            if task is not None:
                await self._drain_failed_serve_task(task)
        finally:
            self._server = None
            self._serve_task = None
            self._bound_port = None
        logger.warning("UvicornServerRunner._cleanup_start_failure result: state reset")

    async def start(self, app: FastAPI) -> None:
        """Start serving *app* and wait until the server is up.

        Does nothing if a serve task is still running. If startup raises an
        ``Exception``, the runner state is rolled back and the exception is
        re-raised.
        """
        if self._serve_task is not None:
            if not self._serve_task.done():
                logger.warning("UvicornServerRunner.start result: already running")
                return
            # Drop a finished task left from an earlier run so the server can be restarted.
            self._serve_task = None
        try:
            config = Config(app=app, host=self._host, port=self._port, log_level="warning")
            self._server = Server(config)
            self._serve_task = asyncio.create_task(self._server.serve(), name="management-server-serve")
            await self._wait_until_started()
            self._bound_port = self._resolve_bound_port()
            logger.warning(
                "UvicornServerRunner.start result: running host=%s requested_port=%s bound_port=%s",
                self._host,
                self._port,
                self._bound_port,
            )
        except Exception:
            logger.exception("UvicornServerRunner.start error")
            await self._cleanup_start_failure()
            raise

    async def stop(self) -> None:
        """Ask the server to exit and wait for the serve task to finish.

        Does nothing if the runner is not started. The runner state is always
        cleared; anything raised while awaiting the serve task is logged and
        re-raised.
        """
        if self._server is None or self._serve_task is None:
            logger.warning("UvicornServerRunner.stop result: already stopped")
            return
        self._server.should_exit = True
        try:
            await self._serve_task
        except BaseException:  # noqa: BLE001
            logger.exception("UvicornServerRunner.stop error")
            raise
        finally:
            self._server = None
            self._serve_task = None
            self._bound_port = None
        logger.warning("UvicornServerRunner.stop result: stopped")

    @property
    def port(self) -> int:
        """Bound port if one was resolved by ``start``, otherwise the requested port."""
        result = self._bound_port if self._bound_port is not None else self._port
        logger.warning("UvicornServerRunner.port result: %s", result)
        return result
class ManagementServer: class ManagementServer:
"""Management API endpoints and callback adapters."""
def __init__( def __init__(
self, self,
host: str, host: str,
@@ -30,17 +143,18 @@ class ManagementServer:
on_start: Optional[Callable[[], Awaitable[str]]] = None, on_start: Optional[Callable[[], Awaitable[str]]] = None,
on_stop: Optional[Callable[[], Awaitable[str]]] = None, on_stop: Optional[Callable[[], Awaitable[str]]] = None,
): ):
"""Настройка параметров и колбэков лёгкого HTTP management-сервера."""
self._host = host
self._port = port
self._timeout = timeout self._timeout = timeout
self._health_provider = health_provider self._health_provider = health_provider
self._on_start = on_start self._on_start = on_start
self._on_stop = on_stop self._on_stop = on_stop
self._uvicorn_server: Optional[Server] = None self._runner = UvicornServerRunner(host=host, port=port, timeout=timeout)
self._serve_task: Optional[asyncio.Task[None]] = None
self._bound_port: Optional[int] = None
self._app = self._create_app() self._app = self._create_app()
logger.warning(
"ManagementServer.__init__ result: host=%s port=%s timeout=%s",
host,
port,
timeout,
)
def _create_app(self) -> FastAPI: def _create_app(self) -> FastAPI:
app = FastAPI(title="Config Manager Management API") app = FastAPI(title="Config Manager Management API")
@@ -59,28 +173,38 @@ class ManagementServer:
async def action_stop() -> JSONResponse: async def action_stop() -> JSONResponse:
return await self._action_response("stop", self._on_stop) return await self._action_response("stop", self._on_stop)
logger.warning(
"ManagementServer._create_app result: routes=%s,%s,%s",
PATH_HEALTH,
PATH_ACTION_START,
PATH_ACTION_STOP,
)
return app return app
async def _health_response(self) -> JSONResponse:
    """Build the HTTP response for a health request from the health provider.

    The provider is awaited with ``self._timeout`` as the time limit. A
    payload whose ``"status"`` equals ``"ok"`` yields HTTP 200; any other
    payload yields 503. Any exception, including the timeout, yields 503
    with an ``"unhealthy"`` body and is logged with its traceback.
    """
    try:
        payload = await asyncio.wait_for(self._health_provider(), timeout=self._timeout)
        status_code = 200 if payload.get("status", "unhealthy") == "ok" else 503
        logger.warning(
            "ManagementServer._health_response result: status_code=%s payload=%s",
            status_code,
            payload,
        )
        return JSONResponse(content=payload, status_code=status_code)
    except Exception as exc:  # noqa: BLE001
        logger.exception("ManagementServer._health_response error")
        # asyncio.wait_for raises TimeoutError without a message, so str(exc)
        # alone would produce an empty "detail"; fall back to the class name.
        detail = str(exc) or type(exc).__name__
        return JSONResponse(content={"status": "unhealthy", "detail": detail}, status_code=503)
async def _action_response( async def _action_response(
self, self,
action: str, action: str,
callback: Optional[Callable[[], Awaitable[str]]], callback: Optional[Callable[[], Awaitable[str]]],
) -> JSONResponse: ) -> JSONResponse:
"""Сформировать HTTP-ответ для колбэка действия start/stop."""
if callback is None: if callback is None:
logger.warning(
"ManagementServer._action_response result: action=%s status_code=404 detail=handler not configured",
action,
)
return JSONResponse( return JSONResponse(
content={"status": "error", "detail": f"{action} handler is not configured"}, content={"status": "error", "detail": f"{action} handler is not configured"},
status_code=404, status_code=404,
@@ -89,57 +213,52 @@ class ManagementServer:
detail = await callback() detail = await callback()
if not detail: if not detail:
detail = f"{action} action accepted" detail = f"{action} action accepted"
logger.warning(
"ManagementServer._action_response result: action=%s status_code=200 detail=%s",
action,
detail,
)
return JSONResponse(content={"status": "ok", "detail": detail}, status_code=200) return JSONResponse(content={"status": "ok", "detail": detail}, status_code=200)
except Exception as exc: # noqa: BLE001 except Exception as exc: # noqa: BLE001
return JSONResponse( logger.exception("ManagementServer._action_response error: action=%s", action)
content={"status": "error", "detail": str(exc)}, return JSONResponse(content={"status": "error", "detail": str(exc)}, status_code=500)
status_code=500,
)
def _build_health_response(self) -> Awaitable[tuple[int, HealthPayload]]:
    """Return an awaitable that resolves to ``(status_code, payload)``.

    The awaitable runs ``_health_response`` and decodes the response body
    from UTF-8 JSON when the body is ``bytes``.
    """

    async def _collect() -> tuple[int, HealthPayload]:
        http_response = await self._health_response()
        raw_body: Any = http_response.body
        if isinstance(raw_body, bytes):
            payload: Any = json.loads(raw_body.decode("utf-8"))
        else:
            payload = raw_body
        logger.warning(
            "ManagementServer._build_health_response result: status_code=%s payload=%s",
            http_response.status_code,
            payload,
        )
        return http_response.status_code, payload

    return _collect()
async def start(self) -> None:
    """Start the management API through the runner.

    A startup failure is logged with its traceback and re-raised.
    """
    try:
        await self._runner.start(self._app)
    except Exception:  # noqa: BLE001
        logger.exception("ManagementServer.start error")
        raise
    else:
        logger.warning("ManagementServer.start result: started")
async def stop(self) -> None:
    """Stop the management API through the runner.

    Anything raised while stopping is logged with its traceback and re-raised.
    """
    try:
        await self._runner.stop()
    except BaseException:  # noqa: BLE001
        logger.exception("ManagementServer.stop error")
        raise
    else:
        logger.warning("ManagementServer.stop result: stopped")
@property
def port(self) -> int:
    """Port reported by the underlying runner."""
    runner_port = self._runner.port
    logger.warning("ManagementServer.port result: %s", runner_port)
    return runner_port
# Backward-compatible alias.
HealthServer = ManagementServer HealthServer = ManagementServer