13 Commits

Author SHA1 Message Date
0f984fa42a Новый модуль trace 2026-03-03 10:41:39 +03:00
1b287a9550 FEAT-2. Удалил английское описание и добавил сиквенс 2026-02-27 20:10:19 +03:00
bfc960233b FEAT-1. Описание readme.md на англйиском языке 2026-02-27 19:34:50 +03:00
01715b76f8 Поправлен переход в ок 2026-02-26 22:55:12 +03:00
8e023d304f Поправлен переход в ОК 2026-02-26 22:54:53 +03:00
06d9ef2868 Фикс таймаута healthy 2026-02-26 22:47:42 +03:00
d239454cfb Доработал лог 2026-02-26 22:34:47 +03:00
54f9435f1d Доработка логирования 2026-02-26 22:16:56 +03:00
170c81fc5b Большой рефакторинг с кодекс 2026-02-26 22:05:21 +03:00
2b43e2f86a Поправил версию большого рефакторинга 2026-02-26 21:59:10 +03:00
a1dd495d6d Большой рефакторинг с кодексом 2026-02-26 21:58:21 +03:00
aa32c23dba Перенос LogManager в v2 и обновление документации. Обновлены импорты и исправлены ссылки на LogManager в README и тестах. Удалены устаревшие типы и рефакторинг конфигурации управления. 2026-02-26 20:37:00 +03:00
7b5d6d2156 Рефакторинг канала управления 2026-02-26 20:26:22 +03:00
38 changed files with 1759 additions and 623 deletions

405
README.md
View File

@@ -1,21 +1,400 @@
# Config Manager # Config Manager
## Description ## Описание
This package was created to run my applications. Пакет предназначен как базовое приложение для проектов, в которых нужно периодически запускать одну и ту же функцию в одном потоке, с возможностью перезагрузки конфига и сервисным контуром вокруг прикладной логики.
The ConfigManager class implements the entry point for the program and provides the actual application configuration. It also simplifies logging setup.
## Logging (v2) Под сервисным контуром здесь понимаются:
Logging is configured from the config file only if it contains a **`log`** section in [dictConfig](https://docs.python.org/3/library/logging.config.html#logging.config.dictConfig) format. If there is no `log` section, the manager logs a warning and the default Python level (WARNING) remains, so INFO/DEBUG messages may not appear. - логирование;
- трассировка бизнес-процессов и связанных сущностей;
- управление приложением через каналы управления (например, HTTP API).
**How to verify that logging config is applied:** **Контракт:** приложение наследует **ConfigManagerV2**, переопределяет **execute()** (периодическая работа). Управление (старт/стоп, health) — через каналы, которые создаются снаружи и передаются в конструктор в **control_channels** (в т.ч. HttpControlChannel для API).
- Ensure your config file path is correct and the file is loaded on startup (no error in logs about reading config).
- Ensure the config has a `log` key with `version: 1`, `handlers`, and `loggers` (see `tests/config.yaml` for an example).
- After startup you should see an INFO message: `"Logging configuration applied"` (from `config_manager.v1.log_manager`). If you do not see it, either the `log` section is missing (you will see a warning) or the root/package log level is above INFO.
## Installation ## ConfigManager: устройство и взаимосвязи
**ConfigManager** (класс ConfigManagerV2) — точка входа приложения. Он наследует внутреннюю логику от **\_RuntimeController** (циклы воркера и обновления конфига, запуск/остановка каналов управления).
**Ядро (core):**
- **ConfigLoader** — читает конфиг из файла (YAML/JSON), считает хеш и отдаёт конфиг только при изменении; при ошибке парсинга возвращает последний валидный конфиг.
- **WorkerLoop** — в отдельном потоке циклически вызывает ваш метод `execute()` с паузой между вызовами; реагирует на событие остановки и колбэки успеха/ошибки.
- **LogManager** — применяет секцию `log` из конфига к логированию (dictConfig).
- **TraceManager** — управляет структурированной трассировкой процессов, контекстов и сообщений.
- **HealthAggregator** — собирает состояние: жизненный цикл (idle/starting/running/…), время последнего успешного `execute()` и таймаут здоровья; формирует единый ответ для health (ok/unhealthy).
- **ControlChannelBridge** — один мост для всех каналов: обработчики on_start/on_stop/on_status (сброс/установка halt, текст статуса).
**Каналы управления (control):**
- **ControlChannel** — абстрактный контракт: `start(on_start, on_stop, on_status)`, `stop()`.
- **HttpControlChannel** — HTTP API (`/health`, `/actions/start`, `/actions/stop`, `/actions/status`); использует **UvicornServerRunner**; для `/health` вызывает **HealthAggregator.collect()**, для действий — переданные обработчики из **ControlChannelBridge**.
- **TelegramControlChannel** — реализация через long polling Telegram; команды `/start`, `/stop`, `/status` вызывают переданные обработчики.
**Поток работы:** при `start()` менеджер поднимает каналы из **control_channels** (заданные снаружи), затем запускает два цикла: **WorkerLoop** и периодическое обновление конфига через **ConfigLoader**. Управление по API: `/health`, `/actions/start`, `/actions/stop` — если в control_channels передан **HttpControlChannel**. Остановка по halt завершает оба цикла; в конце останавливаются все каналы.
## Запуск приложения с ConfigManagerV2 и HttpControlChannel
1. **Наследуйте ConfigManagerV2** и реализуйте метод `execute()` (в нём — ваша периодическая работа). При необходимости переопределите `get_health_status()` для кастомного ответа `/health`.
2. **Создайте каналы снаружи и передайте в конструктор.** Для HTTP API создайте **HttpControlChannel**; для health нужен колбэк менеджера — передайте **control_channels** как фабрику (lambda, получающую менеджер):
```python
from config_manager.v2.control import HttpControlChannel
app = MyApp(
str(path_to_config),
control_channels=lambda m: [
HttpControlChannel(
host="0.0.0.0",
port=8000,
timeout=3,
health_provider=m.get_health_provider(),
)
],
)
```
Либо передайте готовый список каналов: `control_channels=[channel1, channel2]`.
3. **Запустите из async-контекста:** `await app.start()` или `asyncio.create_task(app.start())` для фона. Остановка: `await app.stop()` или запрос `/actions/stop` по HTTP.
**Минимальный пример с HTTP API:**
```python
import asyncio
import logging
from pathlib import Path
from config_manager import ConfigManager
from config_manager.v2.control import HttpControlChannel
class MyApp(ConfigManager):
def execute(self) -> None:
pass # ваша периодическая работа
async def main() -> None:
app = MyApp(
str(Path(__file__).parent / "config.yaml"),
control_channels=lambda m: [
HttpControlChannel(
host="0.0.0.0", port=8000, timeout=3,
health_provider=m.get_health_provider(),
)
],
)
asyncio.create_task(app.start())
await asyncio.sleep(3600)
if __name__ == "__main__":
logging.basicConfig(level=logging.INFO)
asyncio.run(main())
```
Готовый пример: `tests/test_app.py`.
## Диаграмма классов
```mermaid
classDiagram
direction TB
class ConfigManagerV2 {
+str path
+Any config
+float update_interval
+float work_interval
-ConfigLoader _loader
-LifecycleState _state
+start() async
+stop() async
+execute()*
+get_health_status() HealthPayload
-_run() async
-_worker_loop() async
-_periodic_update_loop() async
}
class _RuntimeController {
<<внутренний>>
-_on_execute_success()
-_on_execute_error(exc)
-_worker_loop() async
-_periodic_update_loop() async
-_start_control_channels() async
-_stop_control_channels() async
-_run() async
}
class ConfigLoader {
+str path
+Any config
+Any last_valid_config
+load_if_changed() async
+parse_config(data) Any
-_read_file_sync() str
-read_file_async() async
}
class WorkerLoop {
-Callable execute
-Callable get_interval
-Event halt_event
+run() async
}
class LogManager {
+apply_config(config) None
}
class TraceManager {
+bind_context(alias, parent_id, type, attrs) str
+open_context(alias, parent_id, type, attrs) contextmanager
+current_trace_id() str
+step(name) None
+info(message, status, attrs) None
+warning(message, status, attrs) None
+error(message, status, attrs) None
}
class TraceContextStore {
+current() ActiveTraceContext
+current_trace_id() str
+push(record) ActiveTraceContext
+pop() ActiveTraceContext
+set_step(step) ActiveTraceContext
}
class TraceContextRecord {
+str trace_id
+str parent_id
+str alias
+str type
+datetime event_time
+dict attrs
}
class TraceLogMessage {
+str trace_id
+str step
+str status
+str level
+str message
+datetime event_time
+dict attrs
}
class TraceTransport {
<<protocol>>
+write_context(record) None
+write_message(record) None
}
class MySqlTraceTransport {
+write_context(record) None
+write_message(record) None
}
class ControlChannel {
<<абстрактный>>
+start(on_start, on_stop, on_status) async*
+stop() async*
}
class TelegramControlChannel {
-str _token
-int _chat_id
+start(on_start, on_stop, on_status) async
+stop() async
-_poll_loop() async
}
class HttpControlChannel {
-UvicornServerRunner _runner
-Callable _health_provider
+start(on_start, on_stop, on_status) async
+stop() async
+int port
}
class HealthAggregator {
-Callable get_state
-Callable get_app_health
+collect() async HealthPayload
}
class ControlChannelBridge {
-Event _halt
-Callable _get_state
-Callable _get_status
+on_start() async str
+on_stop() async str
+on_status() async str
}
class UvicornServerRunner {
-Server _server
-Task _serve_task
+start(app) async
+stop() async
+int port
}
ConfigManagerV2 --|> _RuntimeController : наследует
ConfigManagerV2 --> ConfigLoader : использует
ConfigManagerV2 --> LogManager : использует
ConfigManagerV2 --> TraceManager : использует
ConfigManagerV2 --> HealthAggregator : использует
ConfigManagerV2 --> ControlChannelBridge : использует
ConfigManagerV2 ..> ControlChannel : список каналов
_RuntimeController ..> WorkerLoop : создаёт в _worker_loop
TraceManager --> TraceContextStore : использует
TraceManager --> TraceTransport : использует
TraceManager ..> TraceContextRecord : создаёт
TraceManager ..> TraceLogMessage : создаёт
MySqlTraceTransport --|> TraceTransport : реализует
TelegramControlChannel --|> ControlChannel : реализует
HttpControlChannel --|> ControlChannel : реализует
HttpControlChannel --> UvicornServerRunner : использует
HttpControlChannel ..> HealthAggregator : health_provider
ControlChannelBridge ..> ControlChannel : on_start, on_stop, on_status
```
## Диаграмма последовательности (запуск и работа)
```mermaid
sequenceDiagram
autonumber
participant User
participant ConfigManagerV2
participant ControlChannel as ControlChannel(s)
participant WorkerLoop
participant ConfigLoader
participant Client as HTTP Client
User->>ConfigManagerV2: start()
ConfigManagerV2->>ConfigManagerV2: _start_control_channels()
ConfigManagerV2->>ControlChannel: start(on_start, on_stop, on_status)
ControlChannel-->>ConfigManagerV2: started
par Циклы работы
ConfigManagerV2->>WorkerLoop: run()
loop Периодически
WorkerLoop->>ConfigManagerV2: execute()
ConfigManagerV2-->>WorkerLoop: success/error
end
and
loop Периодически
ConfigManagerV2->>ConfigLoader: load_if_changed()
ConfigLoader-->>ConfigManagerV2: config / unchanged
end
end
Note over Client,ControlChannel: Запросы по HTTP API (если HttpControlChannel)
Client->>ControlChannel: GET /health
ControlChannel->>ConfigManagerV2: health_provider.collect()
ConfigManagerV2-->>ControlChannel: HealthPayload
ControlChannel-->>Client: 200 OK
Client->>ControlChannel: POST /actions/stop
ControlChannel->>ConfigManagerV2: on_stop() (bridge)
ConfigManagerV2->>ConfigManagerV2: set halt
ConfigManagerV2->>WorkerLoop: halt_event.set()
ConfigManagerV2->>ControlChannel: stop()
ControlChannel-->>Client: 200 OK
```
## Логирование
Логирование настраивается из конфигурационного файла только если в нём есть секция **`log`** в формате [dictConfig](https://docs.python.org/3/library/logging.config.html#logging.config.dictConfig). Если секции `log` нет, менеджер пишет предупреждение в лог, а уровень Python по умолчанию (WARNING) сохраняется — сообщения INFO/DEBUG могут не отображаться.
**Как проверить, что конфигурация логирования применилась:**
- Убедитесь, что путь к файлу конфига верный и файл загружается при старте (в логах нет ошибки чтения конфига).
- Убедитесь, что в конфиге есть ключ `log` с `version: 1`, `handlers` и `loggers` (пример — `tests/config.yaml`).
- После старта в логе должно появиться сообщение уровня INFO: `"Logging configuration applied"` (из `config_manager.v2.core.log_manager`). Если его нет, либо секция `log` отсутствует (будет предупреждение), либо уровень root/пакета выше INFO.
## Trace
Модуль `trace` предназначен для структурированной трассировки прикладных процессов и иерархически связанных сущностей.
Базовая идея:
- есть `TraceContextRecord` — логический контекст, который группирует сообщения;
- есть `TraceLogMessage` — отдельное событие внутри текущего контекста;
- контексты могут быть вложенными: один родительский контекст и много дочерних;
- активный контекст хранится в `TraceContextStore`, а при выходе из дочернего `with` автоматически восстанавливается родитель.
### Архитектура
Основные части модуля:
- `TraceManager` — публичный API для приложений;
- `TraceContextStore` — хранение активного контекста и стека вложенности;
- `TraceContextRecord` — описание контекста;
- `TraceLogMessage` — описание сообщения;
- `TraceTransport` — интерфейс транспорта;
- `MySqlTraceTransport` — запись контекстов и сообщений в MySQL.
Сущности:
`TraceContextRecord`
- `trace_id`
- `parent_id`
- `alias`
- `type`
- `event_time`
- `attrs`
`TraceLogMessage`
- `trace_id`
- `event_time`
- `step`
- `status`
- `level`
- `message`
- `attrs`
### Принцип использования
1. На старте процесса создаётся контекст через `bind_context()` или `open_context()`.
2. Для серии сообщений выставляется текущий `step()`.
3. Сообщения пишутся через `info()/warning()/error()/exception()`.
4. При использовании `open_context()` дочерний контекст автоматически закрывается по выходу из `with`, а родительский становится текущим снова.
### Пример: корневой контекст
```python
from config_manager.v2.trace import TraceManager
trace = TraceManager()
trace_id = trace.bind_context(
alias="job-123",
type="task",
attrs={"source": "scheduler"},
)
trace.step("prepare")
trace.info("Подготовка завершена", status="completed", attrs={"items_count": 2})
```
### Пример: дочерний контекст
```python
with trace.open_context(
alias="subtask-1",
type="subtask",
parent_id=trace_id,
attrs={"segment": "phase-a"},
) as child_trace_id:
trace.step("execute")
trace.info("Подзадача запущена", status="started")
trace.info("Подзадача завершена", status="completed", attrs={"duration_ms": 120})
# Здесь снова активен родительский контекст
trace.step("finish")
trace.info("Обработка завершена", status="completed")
```
### Хранение в MySQL
Для MySQL предусмотрен `MySqlTraceTransport`. Он пишет две сущности в отдельные таблицы:
- `trace_contexts`
- `trace_messages`
Это позволяет:
- отдельно хранить структуру процесса;
- отдельно хранить историю шагов и сообщений;
- строить отчёты и трассировку без завязки на `logging`.
## Установка
``pip install git+https://git.lesha.spb.ru/alex/config_manager.git`` ``pip install git+https://git.lesha.spb.ru/alex/config_manager.git``
## Contacts ## Контакты
- **e-mail**: lesha.spb@gmail.com - **e-mail**: lesha.spb@gmail.com
- **telegram**: https://t.me/lesha_spb - **telegram**: https://t.me/lesha_spb

View File

@@ -4,8 +4,8 @@ build-backend = "setuptools.build_meta"
[project] [project]
name = "config_manager" name = "config_manager"
version = "2.1.7" version = "2.3.0"
description = "Фикс вечного цикла при ошибке" description = "Дoбавлен пакет trace"
authors = [ authors = [
{ name = "Aleksei Zosimov", email = "lesha.spb@gmail.com" } { name = "Aleksei Zosimov", email = "lesha.spb@gmail.com" }
] ]
@@ -15,6 +15,7 @@ dependencies = [
"PyYAML>=6.0", "PyYAML>=6.0",
"fastapi>=0.100.0", "fastapi>=0.100.0",
"uvicorn[standard]>=0.22.0", "uvicorn[standard]>=0.22.0",
"PyMySQL>=1.1.0",
] ]
[project.urls] [project.urls]

View File

@@ -0,0 +1,30 @@
CREATE TABLE IF NOT EXISTS trace_contexts
(
trace_id CHAR(32) PRIMARY KEY,
parent_id CHAR(32) NULL,
alias VARCHAR(255) NOT NULL,
type VARCHAR(64) NULL,
event_time DATETIME(6) NOT NULL,
attrs_json JSON NOT NULL,
INDEX idx_trace_contexts_parent_id (parent_id),
INDEX idx_trace_contexts_event_time (event_time)
);
CREATE TABLE IF NOT EXISTS trace_messages
(
id BIGINT UNSIGNED NOT NULL AUTO_INCREMENT PRIMARY KEY,
trace_id CHAR(32) NOT NULL,
event_time DATETIME(6) NOT NULL,
step VARCHAR(128) NOT NULL DEFAULT '',
status VARCHAR(64) NOT NULL DEFAULT '',
level VARCHAR(16) NOT NULL DEFAULT 'INFO',
message TEXT NOT NULL,
attrs_json JSON NOT NULL,
INDEX idx_trace_messages_trace_id (trace_id),
INDEX idx_trace_messages_event_time (event_time),
INDEX idx_trace_messages_step (step),
INDEX idx_trace_messages_status (status),
CONSTRAINT fk_trace_messages_context
FOREIGN KEY (trace_id) REFERENCES trace_contexts(trace_id)
ON DELETE CASCADE
);

View File

@@ -1,3 +1,5 @@
from .v2 import ConfigManagerV2 as ConfigManager from .v2 import ConfigManagerV2 as ConfigManager
from .v1.cfg_manager import ConfigManager as LegacyConfigManager from .v2.core.log_manager import LogManager
from .v1.log_manager import LogManager from .v2.trace import TraceManager
__all__ = ["ConfigManager", "LogManager", "TraceManager"]

View File

@@ -1,141 +0,0 @@
import logging
import logging.config
import asyncio
import json
import yaml
import os
from typing import Any, Optional
from .log_manager import LogManager
class ConfigManager:
DEFAULT_UPDATE_INTERVAL = 5
DEFAULT_WORK_INTERVAL = 2
def __init__(self, path: str, log_manager: Optional[LogManager] = None):
self.path = path
self.config: Any = None
self._last_hash = None
self.update_interval = self.DEFAULT_UPDATE_INTERVAL
self.work_interval = self.DEFAULT_WORK_INTERVAL
self._halt = asyncio.Event()
self._task: Optional[asyncio.Task] = None
self._loop: Optional[asyncio.AbstractEventLoop] = None
self._log_manager = log_manager or LogManager()
self.logger = logging.getLogger(__name__)
def _read_file_sync(self) -> str:
with open(self.path, "r", encoding="utf-8") as f:
return f.read()
async def _read_file_async(self) -> str:
return await asyncio.to_thread(self._read_file_sync)
def _parse_config(self, data) -> Any:
extension = os.path.splitext(self.path)[1].lower()
if extension in (".yaml", ".yml"):
return yaml.safe_load(data)
else:
return json.loads(data)
def _update_intervals_from_config(self) -> None:
if not self.config:
return
upd = self.config.get("update_interval")
wrk = self.config.get("work_interval")
if isinstance(upd, (int, float)) and upd > 0:
self.update_interval = float(upd)
self.logger.info(f"Update interval set to {self.update_interval} seconds")
else:
self.update_interval = self.DEFAULT_UPDATE_INTERVAL
if isinstance(wrk, (int, float)) and wrk > 0:
self.work_interval = float(wrk)
self.logger.info(f"Work interval set to {self.work_interval} seconds")
else:
self.work_interval = self.DEFAULT_WORK_INTERVAL
async def _update_config(self) -> None:
try:
data = await self._read_file_async()
current_hash = hash(data)
if current_hash != self._last_hash:
new_config = self._parse_config(data)
self.config = new_config
self._last_hash = current_hash
self._log_manager.apply_config(new_config)
self._update_intervals_from_config()
except Exception as e:
self.logger.error(f"Error reading/parsing config file: {e}")
def execute(self) -> None:
"""
Метод для переопределения в подклассах.
Здесь может быть блокирующая работа.
Запускается в отдельном потоке.
"""
pass
async def _worker_loop(self) -> None:
self.logger.warning("Worker loop started")
try:
while not self._halt.is_set():
await asyncio.to_thread(self.execute)
await asyncio.sleep(self.work_interval)
finally:
self.logger.warning("Worker loop stopped")
async def _periodic_update_loop(self) -> None:
while not self._halt.is_set():
await self._update_config()
await asyncio.sleep(self.update_interval)
async def _run(self) -> None:
"""Внутренняя корутина, запускающая все циклы"""
self._halt.clear()
self.logger.info("ConfigManager started")
try:
await asyncio.gather(
self._worker_loop(),
self._periodic_update_loop()
)
except asyncio.CancelledError:
self.logger.info("ConfigManager tasks cancelled")
finally:
self.logger.info("ConfigManager stopped")
async def start(self) -> None:
if self._task is not None and not self._task.done():
self.logger.warning("ConfigManager is already running")
return
try:
self._loop = asyncio.get_running_loop()
except RuntimeError:
self.logger.error("start() must be called from within an async context")
raise
self.logger.info("ConfigManager starting and awaiting _run()")
await self._run()
async def stop(self) -> None:
"""Останавливает менеджер конфигурации и ожидает завершения"""
if self._task is None:
self.logger.warning("ConfigManager is not running")
return
self.logger.info("ConfigManager stopping...")
self._halt.set()
try:
await self._task
except asyncio.CancelledError:
pass
self._task = None
self.logger.info("ConfigManager stopped successfully")

View File

@@ -1,43 +0,0 @@
import logging
from typing import Optional
class LogManager:
"""
Управляет конфигурацией логирования приложения.
Применяет конфигурацию из словаря с обработкой ошибок.
"""
def __init__(self):
self.logger = logging.getLogger(__name__)
self._last_valid_config: Optional[dict] = None
def apply_config(self, config: dict) -> None:
"""
Применяет конфигурацию логирования из словаря.
При ошибке восстанавливает последний валидный конфиг.
Args:
config: Словарь с настройками логирования (из файла конфига)
"""
logging_config = config.get("log")
if not logging_config:
self.logger.warning(
"Config has no 'log' section; logging parameters from config are not applied (default level may be WARNING)."
)
return
try:
logging.config.dictConfig(logging_config)
self._last_valid_config = logging_config
self.logger.info("Logging configuration applied")
except Exception as e:
self.logger.error(f"Error applying logging config: {e}")
# Если был предыдущий валидный конфиг, восстанавливаем его
if self._last_valid_config:
try:
logging.config.dictConfig(self._last_valid_config)
self.logger.info("Previous logging configuration restored")
except Exception as restore_error:
self.logger.error(f"Error restoring previous config: {restore_error}")

View File

@@ -1,7 +1,7 @@
"""Публичный API V2: точка входа в менеджер конфигурации и настройки management-сервера. """Публичный API: точка входа в менеджер конфигурации.
Экспортирует ConfigManagerV2 и типы настроек для использования приложениями.""" Контракт: наследование от ConfigManagerV2, переопределение execute(), управление через API (config.yaml, секция management)."""
from .core import ConfigManagerV2 from .core import ConfigManagerV2
from .types import HealthServerSettings, ManagementServerSettings from .trace import TraceManager
__all__ = ["ConfigManagerV2", "ManagementServerSettings", "HealthServerSettings"] __all__ = ["ConfigManagerV2", "TraceManager"]

View File

@@ -1,7 +1,8 @@
"""Каналы внешнего управления: абстракция и реализация (например, Telegram). """Каналы внешнего управления: абстракция и реализация (HTTP, Telegram).
Позволяет запускать, останавливать и запрашивать статус менеджера через ботов и другие интерфейсы.""" Позволяет запускать, останавливать и запрашивать статус менеджера через HTTP API и ботов."""
from .base import ControlChannel from .base import ControlChannel
from .http_channel import HttpControlChannel
from .telegram import TelegramControlChannel from .telegram import TelegramControlChannel
__all__ = ["ControlChannel", "TelegramControlChannel"] __all__ = ["ControlChannel", "HttpControlChannel", "TelegramControlChannel"]

View File

@@ -1,21 +1,25 @@
"""Management HTTP API with /health, /actions/start and /actions/stop.""" """HTTP-канал управления: /health, /actions/start, /actions/stop (реализация ControlChannel)."""
from __future__ import annotations from __future__ import annotations
import asyncio import asyncio
import json import json
import logging import logging
import time
from collections.abc import Awaitable, Callable from collections.abc import Awaitable, Callable
from typing import Any, Optional from typing import Any, Optional
from fastapi import FastAPI from fastapi import FastAPI
from fastapi import Request
from fastapi.responses import JSONResponse from fastapi.responses import JSONResponse
from uvicorn import Config, Server from uvicorn import Config, Server
from ..types import HealthPayload from ..core.types import HealthPayload
from .base import ControlChannel, StartHandler, StatusHandler, StopHandler
PATH_HEALTH = "/health" PATH_HEALTH = "/health"
PATH_ACTION_START = "/actions/start" PATH_ACTION_START = "/actions/start"
PATH_ACTION_STOP = "/actions/stop" PATH_ACTION_STOP = "/actions/stop"
PATH_ACTION_STATUS = "/actions/status"
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
@@ -29,7 +33,7 @@ class UvicornServerRunner:
self._server: Optional[Server] = None self._server: Optional[Server] = None
self._serve_task: Optional[asyncio.Task[None]] = None self._serve_task: Optional[asyncio.Task[None]] = None
self._bound_port: Optional[int] = None self._bound_port: Optional[int] = None
logger.warning( logger.debug(
"UvicornServerRunner.__init__ result: host=%s port=%s timeout=%s", "UvicornServerRunner.__init__ result: host=%s port=%s timeout=%s",
self._host, self._host,
self._port, self._port,
@@ -83,21 +87,21 @@ class UvicornServerRunner:
self._server = None self._server = None
self._serve_task = None self._serve_task = None
self._bound_port = None self._bound_port = None
logger.warning("UvicornServerRunner._cleanup_start_failure result: state reset") logger.debug("UvicornServerRunner._cleanup_start_failure result: state reset")
async def start(self, app: FastAPI) -> None: async def start(self, app: FastAPI) -> None:
if self._serve_task is not None and not self._serve_task.done(): if self._serve_task is not None and not self._serve_task.done():
logger.warning("UvicornServerRunner.start result: already running") logger.debug("UvicornServerRunner.start result: already running")
return return
if self._serve_task is not None and self._serve_task.done(): if self._serve_task is not None and self._serve_task.done():
self._serve_task = None self._serve_task = None
try: try:
config = Config(app=app, host=self._host, port=self._port, log_level="warning") config = Config(app=app, host=self._host, port=self._port, log_level="warning")
self._server = Server(config) self._server = Server(config)
self._serve_task = asyncio.create_task(self._server.serve(), name="management-server-serve") self._serve_task = asyncio.create_task(self._server.serve(), name="http-control-channel-serve")
await self._wait_until_started() await self._wait_until_started()
self._bound_port = self._resolve_bound_port() self._bound_port = self._resolve_bound_port()
logger.warning( logger.debug(
"UvicornServerRunner.start result: running host=%s requested_port=%s bound_port=%s", "UvicornServerRunner.start result: running host=%s requested_port=%s bound_port=%s",
self._host, self._host,
self._port, self._port,
@@ -110,7 +114,7 @@ class UvicornServerRunner:
async def stop(self) -> None: async def stop(self) -> None:
if self._server is None or self._serve_task is None: if self._server is None or self._serve_task is None:
logger.warning("UvicornServerRunner.stop result: already stopped") logger.debug("UvicornServerRunner.stop result: already stopped")
return return
self._server.should_exit = True self._server.should_exit = True
try: try:
@@ -122,17 +126,17 @@ class UvicornServerRunner:
self._server = None self._server = None
self._serve_task = None self._serve_task = None
self._bound_port = None self._bound_port = None
logger.warning("UvicornServerRunner.stop result: stopped") logger.debug("UvicornServerRunner.stop result: stopped")
@property @property
def port(self) -> int: def port(self) -> int:
result = self._bound_port if self._bound_port is not None else self._port result = self._bound_port if self._bound_port is not None else self._port
logger.warning("UvicornServerRunner.port result: %s", result) logger.debug("UvicornServerRunner.port result: %s", result)
return result return result
class ManagementServer: class HttpControlChannel(ControlChannel):
"""Management API endpoints and callback adapters.""" """HTTP API как канал управления: /health, /actions/start, /actions/stop, /actions/status."""
def __init__( def __init__(
self, self,
@@ -140,17 +144,18 @@ class ManagementServer:
port: int, port: int,
timeout: int, timeout: int,
health_provider: Callable[[], Awaitable[HealthPayload]], health_provider: Callable[[], Awaitable[HealthPayload]],
on_start: Optional[Callable[[], Awaitable[str]]] = None,
on_stop: Optional[Callable[[], Awaitable[str]]] = None,
): ):
self._host = host
self._port = port
self._timeout = timeout self._timeout = timeout
self._health_provider = health_provider self._health_provider = health_provider
self._on_start = on_start self._on_start: Optional[StartHandler] = None
self._on_stop = on_stop self._on_stop: Optional[StopHandler] = None
self._on_status: Optional[StatusHandler] = None
self._runner = UvicornServerRunner(host=host, port=port, timeout=timeout) self._runner = UvicornServerRunner(host=host, port=port, timeout=timeout)
self._app = self._create_app() self._app: Optional[FastAPI] = None
logger.warning( logger.warning(
"ManagementServer.__init__ result: host=%s port=%s timeout=%s", "HttpControlChannel запушщен: host=%s port=%s timeout=%s",
host, host,
port, port,
timeout, timeout,
@@ -159,6 +164,20 @@ class ManagementServer:
def _create_app(self) -> FastAPI: def _create_app(self) -> FastAPI:
app = FastAPI(title="Config Manager Management API") app = FastAPI(title="Config Manager Management API")
@app.middleware("http")
async def log_api_call(request: Request, call_next): # type: ignore[no-untyped-def]
started = time.monotonic()
response = await call_next(request)
duration_ms = int((time.monotonic() - started) * 1000)
logger.debug(
"API call: method=%s path=%s status=%s duration_ms=%s",
request.method,
request.url.path,
response.status_code,
duration_ms,
)
return response
@app.get(PATH_HEALTH) @app.get(PATH_HEALTH)
async def health() -> JSONResponse: async def health() -> JSONResponse:
return await self._health_response() return await self._health_response()
@@ -173,11 +192,17 @@ class ManagementServer:
async def action_stop() -> JSONResponse: async def action_stop() -> JSONResponse:
return await self._action_response("stop", self._on_stop) return await self._action_response("stop", self._on_stop)
logger.warning( @app.get(PATH_ACTION_STATUS)
"ManagementServer._create_app result: routes=%s,%s,%s", @app.post(PATH_ACTION_STATUS)
async def action_status() -> JSONResponse:
return await self._action_response("status", self._on_status)
logger.debug(
"HttpControlChannel._create_app result: routes=%s,%s,%s,%s",
PATH_HEALTH, PATH_HEALTH,
PATH_ACTION_START, PATH_ACTION_START,
PATH_ACTION_STOP, PATH_ACTION_STOP,
PATH_ACTION_STATUS,
) )
return app return app
@@ -185,14 +210,14 @@ class ManagementServer:
try: try:
payload = await asyncio.wait_for(self._health_provider(), timeout=self._timeout) payload = await asyncio.wait_for(self._health_provider(), timeout=self._timeout)
status_code = 200 if payload.get("status", "unhealthy") == "ok" else 503 status_code = 200 if payload.get("status", "unhealthy") == "ok" else 503
logger.warning( logger.debug(
"ManagementServer._health_response result: status_code=%s payload=%s", "HttpControlChannel._health_response result: status_code=%s payload=%s",
status_code, status_code,
payload, payload,
) )
return JSONResponse(content=payload, status_code=status_code) return JSONResponse(content=payload, status_code=status_code)
except Exception as exc: # noqa: BLE001 except Exception as exc: # noqa: BLE001
logger.exception("ManagementServer._health_response error") logger.exception("HttpControlChannel._health_response error")
return JSONResponse(content={"status": "unhealthy", "detail": str(exc)}, status_code=503) return JSONResponse(content={"status": "unhealthy", "detail": str(exc)}, status_code=503)
async def _action_response( async def _action_response(
@@ -201,64 +226,75 @@ class ManagementServer:
callback: Optional[Callable[[], Awaitable[str]]], callback: Optional[Callable[[], Awaitable[str]]],
) -> JSONResponse: ) -> JSONResponse:
if callback is None: if callback is None:
logger.warning( logger.debug(
"ManagementServer._action_response result: action=%s status_code=404 detail=handler not configured", "HttpControlChannel._action_response result: action=%s status_code=404 detail=handler not configured",
action, action,
) )
logger.warning("control command: action=%s status=handler_not_configured", action)
return JSONResponse( return JSONResponse(
content={"status": "error", "detail": f"{action} handler is not configured"}, content={"status": "error", "detail": f"{action} handler is not configured"},
status_code=404, status_code=404,
) )
try: try:
detail = await callback() detail = await asyncio.wait_for(callback(), timeout=float(self._timeout))
if not detail: if not detail:
detail = f"{action} action accepted" detail = f"{action} action accepted"
logger.warning( logger.debug(
"ManagementServer._action_response result: action=%s status_code=200 detail=%s", "HttpControlChannel._action_response result: action=%s status_code=200 detail=%s",
action, action,
detail, detail,
) )
logger.warning("control command: action=%s status=ok", action)
return JSONResponse(content={"status": "ok", "detail": detail}, status_code=200) return JSONResponse(content={"status": "ok", "detail": detail}, status_code=200)
except asyncio.TimeoutError:
logger.debug("HttpControlChannel._action_response timeout: action=%s timeout=%s", action, self._timeout)
logger.warning("control command: action=%s status=timeout", action)
return JSONResponse(
content={"status": "error", "detail": f"{action} handler did not respond within {self._timeout}s"},
status_code=504,
)
except Exception as exc: # noqa: BLE001 except Exception as exc: # noqa: BLE001
logger.exception("ManagementServer._action_response error: action=%s", action) logger.debug("HttpControlChannel._action_response error: action=%s", action, exc_info=True)
logger.warning("control command: action=%s status=error", action)
return JSONResponse(content={"status": "error", "detail": str(exc)}, status_code=500) return JSONResponse(content={"status": "error", "detail": str(exc)}, status_code=500)
def _build_health_response(self) -> Awaitable[tuple[int, HealthPayload]]: async def start(
async def _run() -> tuple[int, HealthPayload]: self,
response = await self._health_response() on_start: StartHandler,
body: Any = response.body on_stop: StopHandler,
if isinstance(body, bytes): on_status: StatusHandler,
body = json.loads(body.decode("utf-8")) ) -> None:
logger.warning( self._on_start = on_start
"ManagementServer._build_health_response result: status_code=%s payload=%s", self._on_stop = on_stop
response.status_code, self._on_status = on_status
body, self._app = self._create_app()
)
return response.status_code, body
return _run()
async def start(self) -> None:
try: try:
await self._runner.start(self._app) await self._runner.start(self._app)
logger.warning("ManagementServer.start result: started") logger.debug("HttpControlChannel.start result: started port=%s", self._runner.port)
except Exception: # noqa: BLE001 except Exception: # noqa: BLE001
logger.exception("ManagementServer.start error") logger.exception("HttpControlChannel.start error")
raise raise
async def stop(self) -> None: async def stop(self) -> None:
try: try:
await self._runner.stop() await self._runner.stop()
logger.warning("ManagementServer.stop result: stopped") logger.debug("HttpControlChannel.stop result: stopped")
except BaseException: # noqa: BLE001 except BaseException: # noqa: BLE001
logger.exception("ManagementServer.stop error") logger.exception("HttpControlChannel.stop error")
raise raise
@property @property
def port(self) -> int: def port(self) -> int:
result = self._runner.port return self._runner.port
logger.warning("ManagementServer.port result: %s", result)
return result
def _build_health_response(self) -> Awaitable[tuple[int, Any]]:
"""Для тестов: вернуть (status_code, payload) без запуска сервера."""
HealthServer = ManagementServer async def _run() -> tuple[int, Any]:
response = await self._health_response()
body: Any = response.body
if isinstance(body, bytes):
body = json.loads(body.decode("utf-8"))
return response.status_code, body
return _run()

View File

@@ -62,7 +62,7 @@ class TelegramControlChannel(ControlChannel):
for update in updates: for update in updates:
await self._process_update(update) await self._process_update(update)
except Exception as exc: # noqa: BLE001 except Exception as exc: # noqa: BLE001
self._logger.warning("Telegram polling error: %s", exc) self._logger.debug("Telegram polling error: %s", exc, exc_info=True)
try: try:
await asyncio.wait_for(self._stop_event.wait(), timeout=max(self._poll_interval, 0.1)) await asyncio.wait_for(self._stop_event.wait(), timeout=max(self._poll_interval, 0.1))
@@ -94,16 +94,32 @@ class TelegramControlChannel(ControlChannel):
if chat_id != self._chat_id: if chat_id != self._chat_id:
return return
if text in {"/start", "/run"} and self._on_start is not None: action: Optional[str]
reply = await self._on_start() callback: Optional[StartHandler | StopHandler | StatusHandler]
elif text in {"/stop", "/halt"} and self._on_stop is not None: if text in {"/start", "/run"}:
reply = await self._on_stop() action = "start"
elif text in {"/status", "/health"} and self._on_status is not None: callback = self._on_start
reply = await self._on_status() elif text in {"/stop", "/halt"}:
action = "stop"
callback = self._on_stop
elif text in {"/status", "/health"}:
action = "status"
callback = self._on_status
else: else:
return return
if callback is None:
self._logger.warning("control command: action=%s status=handler_not_configured", action)
return
try:
reply = await callback()
await asyncio.to_thread(self._send_message, reply) await asyncio.to_thread(self._send_message, reply)
self._logger.warning("control command: action=%s status=ok", action)
except asyncio.TimeoutError:
self._logger.warning("control command: action=%s status=timeout", action)
except Exception:
self._logger.debug("Telegram control command error: action=%s", action, exc_info=True)
self._logger.warning("control command: action=%s status=error", action)
def _send_message(self, text: str) -> None: def _send_message(self, text: str) -> None:
"""Отправить текстовый ответ в настроенный чат Telegram.""" """Отправить текстовый ответ в настроенный чат Telegram."""

View File

@@ -2,7 +2,10 @@
Содержит ConfigManagerV2, загрузчик конфигурации и планировщик повторяющегося выполнения execute().""" Содержит ConfigManagerV2, загрузчик конфигурации и планировщик повторяющегося выполнения execute()."""
from .config_loader import ConfigLoader from .config_loader import ConfigLoader
from .control_bridge import ControlChannelBridge
from .health_aggregator import HealthAggregator
from .log_manager import LogManager
from .manager import ConfigManagerV2 from .manager import ConfigManagerV2
from .scheduler import WorkerLoop from .scheduler import WorkerLoop
__all__ = ["ConfigLoader", "ConfigManagerV2", "WorkerLoop"] __all__ = ["ConfigLoader", "ConfigManagerV2", "ControlChannelBridge", "HealthAggregator", "LogManager", "WorkerLoop"]

View File

@@ -22,19 +22,19 @@ class ConfigLoader:
self.config: Any = None self.config: Any = None
self.last_valid_config: Any = None self.last_valid_config: Any = None
self._last_seen_hash: Optional[str] = None self._last_seen_hash: Optional[str] = None
logger.warning("ConfigLoader.__init__ result: path=%s", self.path) logger.debug("ConfigLoader.__init__ result: path=%s", self.path)
def _read_file_sync(self) -> str: def _read_file_sync(self) -> str:
"""Синхронно прочитать сырой текст конфига с диска.""" """Синхронно прочитать сырой текст конфига с диска."""
with open(self.path, "r", encoding="utf-8") as fh: with open(self.path, "r", encoding="utf-8") as fh:
data = fh.read() data = fh.read()
logger.warning("ConfigLoader._read_file_sync result: bytes=%s", len(data)) logger.debug("ConfigLoader._read_file_sync result: bytes=%s", len(data))
return data return data
async def read_file_async(self) -> str: async def read_file_async(self) -> str:
"""Прочитать сырой текст конфига с диска в рабочем потоке.""" """Прочитать сырой текст конфига с диска в рабочем потоке."""
result = await asyncio.to_thread(self._read_file_sync) result = await asyncio.to_thread(self._read_file_sync)
logger.warning("ConfigLoader.read_file_async result: bytes=%s", len(result)) logger.debug("ConfigLoader.read_file_async result: bytes=%s", len(result))
return result return result
def parse_config(self, data: str) -> Any: def parse_config(self, data: str) -> Any:
@@ -48,7 +48,7 @@ class ConfigLoader:
except Exception: # noqa: BLE001 except Exception: # noqa: BLE001
logger.exception("ConfigLoader.parse_config error: extension=%s", extension) logger.exception("ConfigLoader.parse_config error: extension=%s", extension)
raise raise
logger.warning( logger.debug(
"ConfigLoader.parse_config result: extension=%s type=%s", "ConfigLoader.parse_config result: extension=%s type=%s",
extension, extension,
type(result).__name__, type(result).__name__,
@@ -59,22 +59,35 @@ class ConfigLoader:
def _calculate_hash(data: str) -> str: def _calculate_hash(data: str) -> str:
"""Вычислить устойчивый хеш содержимого для обнаружения изменений.""" """Вычислить устойчивый хеш содержимого для обнаружения изменений."""
result = hashlib.sha256(data.encode("utf-8")).hexdigest() result = hashlib.sha256(data.encode("utf-8")).hexdigest()
logger.warning("ConfigLoader._calculate_hash result: hash=%s", result) logger.debug("ConfigLoader._calculate_hash result: hash=%s", result)
return result return result
def load_sync(self) -> Any:
"""Синхронно загрузить и распарсить конфиг один раз (для чтения настроек при инициализации)."""
try:
raw_data = self._read_file_sync()
parsed = self.parse_config(raw_data)
self.config = parsed
self.last_valid_config = parsed
logger.debug("ConfigLoader.load_sync result: loaded")
return parsed
except Exception: # noqa: BLE001
logger.exception("ConfigLoader.load_sync error")
return None
async def load_if_changed(self) -> tuple[bool, Any]: async def load_if_changed(self) -> tuple[bool, Any]:
"""Загрузить и распарсить конфиг только при изменении содержимого файла.""" """Загрузить и распарсить конфиг только при изменении содержимого файла."""
raw_data = await self.read_file_async() raw_data = await self.read_file_async()
current_hash = self._calculate_hash(raw_data) current_hash = self._calculate_hash(raw_data)
if current_hash == self._last_seen_hash: if current_hash == self._last_seen_hash:
logger.warning("ConfigLoader.load_if_changed result: changed=False") logger.debug("ConfigLoader.load_if_changed result: changed=False")
return False, self.config return False, self.config
self._last_seen_hash = current_hash self._last_seen_hash = current_hash
parsed = self.parse_config(raw_data) parsed = self.parse_config(raw_data)
self.config = parsed self.config = parsed
self.last_valid_config = parsed self.last_valid_config = parsed
logger.warning("ConfigLoader.load_if_changed result: changed=True") logger.debug("ConfigLoader.load_if_changed result: changed=True")
return True, parsed return True, parsed

View File

@@ -0,0 +1,54 @@
"""Адаптер между каналами управления и жизненным циклом менеджера.
Предоставляет обработчики start/stop/status для ControlChannel (halt, state, status text)."""
from __future__ import annotations
import logging
from collections.abc import Awaitable, Callable
from .types import LifecycleState
logger = logging.getLogger(__name__)
class ControlChannelBridge:
"""Предоставляет halt и status как обработчики start/stop/status для ControlChannel (например Telegram)."""
def __init__(
self,
get_state: Callable[[], LifecycleState],
get_status: Callable[[], Awaitable[str]],
start_runtime: Callable[[], Awaitable[str]],
stop_runtime: Callable[[], Awaitable[str]],
):
self._get_state = get_state
self._get_status = get_status
self._start_runtime = start_runtime
self._stop_runtime = stop_runtime
logger.debug("ControlChannelBridge.__init__ result: callbacks configured")
async def on_start(self) -> str:
"""Обработать внешний start через lifecycle-метод менеджера."""
if self._get_state() == LifecycleState.RUNNING:
result = "already running"
logger.debug("ControlChannelBridge.on_start result: %s", result)
return result
result = await self._start_runtime()
logger.debug("ControlChannelBridge.on_start result: %s", result)
return result
async def on_stop(self) -> str:
"""Обработать внешний stop через lifecycle-метод менеджера."""
result = await self._stop_runtime()
logger.debug("ControlChannelBridge.on_stop result: %s", result)
return result
async def on_status(self) -> str:
"""Вернуть текущий текст статуса."""
try:
result = await self._get_status()
except Exception: # noqa: BLE001
logger.exception("ControlChannelBridge.on_status error")
raise
logger.debug("ControlChannelBridge.on_status result: %s", result)
return result

View File

@@ -7,7 +7,7 @@ import logging
import time import time
from collections.abc import Callable from collections.abc import Callable
from ..types import HealthPayload, LifecycleState from .types import HealthPayload, LifecycleState
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
@@ -28,7 +28,7 @@ class HealthAggregator:
self._get_last_success_timestamp = get_last_success_timestamp self._get_last_success_timestamp = get_last_success_timestamp
self._health_timeout = health_timeout self._health_timeout = health_timeout
self._get_app_health = get_app_health self._get_app_health = get_app_health
logger.warning("HealthAggregator.__init__ result: health_timeout=%s", self._health_timeout) logger.debug("HealthAggregator.__init__ result: health_timeout=%s", self._health_timeout)
async def collect(self) -> HealthPayload: async def collect(self) -> HealthPayload:
"""Вернуть ok, если был успешный execute() за последние health_timeout сек; иначе unhealthy. Всегда добавляем state.""" """Вернуть ok, если был успешный execute() за последние health_timeout сек; иначе unhealthy. Всегда добавляем state."""
@@ -38,7 +38,7 @@ class HealthAggregator:
# Только при state=RUNNING возможен status=ok; при остановке (STOPPING/STOPPED) сразу unhealthy. # Только при state=RUNNING возможен status=ok; при остановке (STOPPING/STOPPED) сразу unhealthy.
if state != LifecycleState.RUNNING: if state != LifecycleState.RUNNING:
result = {"status": "unhealthy", "detail": f"state={state_value}", "state": state_value} result = {"status": "unhealthy", "detail": f"state={state_value}", "state": state_value}
logger.warning("HealthAggregator.collect result: %s", result) logger.debug("HealthAggregator.collect result: %s", result)
return result return result
last_success = self._get_last_success_timestamp() last_success = self._get_last_success_timestamp()
@@ -47,21 +47,25 @@ class HealthAggregator:
if last_success is None: if last_success is None:
detail = self._get_last_error() or "no successful run yet" detail = self._get_last_error() or "no successful run yet"
result = {"status": "unhealthy", "detail": detail, "state": state_value} result = {"status": "unhealthy", "detail": detail, "state": state_value}
logger.warning("HealthAggregator.collect result: %s", result) logger.debug("HealthAggregator.collect result: %s", result)
return result return result
if (now - last_success) > self._health_timeout: if (now - last_success) > self._health_timeout:
detail = self._get_last_error() or f"no successful run within {self._health_timeout}s" detail = self._get_last_error() or f"no successful run within {self._health_timeout}s"
result = {"status": "unhealthy", "detail": detail, "state": state_value} result = {"status": "unhealthy", "detail": detail, "state": state_value}
logger.warning("HealthAggregator.collect result: %s", result) logger.debug("HealthAggregator.collect result: %s", result)
return result return result
result = self._get_app_health() result = self._get_app_health()
status = result.get("status", "unhealthy") status = result.get("status", "unhealthy")
if status == "degraded":
degraded = {"status": "degraded", "detail": result.get("detail", "app degraded"), "state": state_value}
logger.debug("HealthAggregator.collect result: %s", degraded)
return degraded
if status != "ok": if status != "ok":
unhealthy = {"status": "unhealthy", "detail": result.get("detail", "app reported non-ok"), "state": state_value} unhealthy = {"status": "unhealthy", "detail": result.get("detail", "app reported non-ok"), "state": state_value}
logger.warning("HealthAggregator.collect result: %s", unhealthy) logger.debug("HealthAggregator.collect result: %s", unhealthy)
return unhealthy return unhealthy
healthy = {**result, "state": state_value} healthy = {**result, "state": state_value}
logger.warning("HealthAggregator.collect result: %s", healthy) logger.debug("HealthAggregator.collect result: %s", healthy)
return healthy return healthy

View File

@@ -0,0 +1,38 @@
"""Применение конфигурации логирования из словаря (секция `log` в config.yaml).
Управляет конфигурацией логирования приложения через dictConfig, с восстановлением последнего валидного конфига при ошибке."""
from __future__ import annotations
import logging
import logging.config
from typing import Optional
class LogManager:
"""Применяет секцию `log` из конфига к логированию (dictConfig). При ошибке восстанавливает последний валидный конфиг."""
def __init__(self) -> None:
self.logger = logging.getLogger(__name__)
self._last_valid_config: Optional[dict] = None
def apply_config(self, config: dict) -> None:
"""Применить конфигурацию логирования из словаря. При ошибке восстанавливает последний валидный конфиг."""
logging_config = config.get("log")
if not logging_config:
self.logger.warning(
"Config has no 'log' section; logging parameters from config are not applied (default level may be WARNING)."
)
return
try:
logging.config.dictConfig(logging_config)
self._last_valid_config = logging_config
self.logger.info("Logging configuration applied")
except Exception as e:
self.logger.error("Error applying logging config: %s", e)
if self._last_valid_config:
try:
logging.config.dictConfig(self._last_valid_config)
self.logger.info("Previous logging configuration restored")
except Exception as restore_error:
self.logger.error("Error restoring previous config: %s", restore_error)

View File

@@ -5,19 +5,16 @@ import asyncio
import logging import logging
import os import os
import time import time
from typing import Any, Optional from collections.abc import Callable, Iterable
from typing import Any, Optional, Union
from ...v1.log_manager import LogManager
from ..control.base import ControlChannel from ..control.base import ControlChannel
from ..management import (
ControlChannelBridge,
HealthAggregator,
ManagementApiBridge,
ManagementServer,
)
from ..types import HealthPayload, LifecycleState, ManagementServerSettings
from .config_loader import ConfigLoader from .config_loader import ConfigLoader
from .control_bridge import ControlChannelBridge
from .health_aggregator import HealthAggregator
from .log_manager import LogManager
from .scheduler import WorkerLoop from .scheduler import WorkerLoop
from .types import HealthPayload, LifecycleState
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
@@ -42,23 +39,105 @@ def _read_env_interval(name: str, default_value: float) -> float:
return float(default_value) return float(default_value)
def _read_env_optional_float(name: str, default_value: Optional[float]) -> Optional[float]:
"""Read optional non-negative float from env (0 or missing => default_value)."""
raw_value = os.environ.get(name)
if raw_value is None:
return default_value
try:
parsed = float(raw_value)
if parsed < 0:
logger.warning("ConfigManagerV2 %s must be >= 0, got %s; using default %s", name, parsed, default_value)
return default_value
return parsed if parsed > 0 else default_value
except Exception: # noqa: BLE001
logger.exception("ConfigManagerV2 %s parse error: raw_value=%s fallback=%s", name, raw_value, default_value)
return default_value
def _read_env_health_timeout(default_value: float) -> float:
"""Read health timeout from env."""
env_name = "HEALTH_TIMEOUT"
raw_value = os.environ.get(env_name)
if raw_value is None:
return default_value
try:
parsed = float(raw_value)
if parsed <= 0:
raise ValueError(f"{env_name} must be greater than zero")
return parsed
except Exception: # noqa: BLE001
logger.exception(
"ConfigManagerV2 health timeout parse error: env=%s raw_value=%s fallback=%s",
env_name,
raw_value,
default_value,
)
return default_value
class _RuntimeController: class _RuntimeController:
"""Runtime loops and lifecycle supervision.""" """Runtime loops and lifecycle supervision."""
CONTROL_CHANNEL_TIMEOUT = 5.0
def _trigger_health_transition_check(self) -> None:
try:
loop = asyncio.get_running_loop()
except RuntimeError:
return
loop.create_task(self._log_health_status_transition(), name="health-transition-check")
def _on_execute_success(self) -> None: def _on_execute_success(self) -> None:
self._last_success_timestamp = time.monotonic() self._last_success_timestamp = time.monotonic()
self._last_execute_error = None self._last_execute_error = None
self.logger.warning( self._trigger_health_transition_check()
self.logger.debug(
"ConfigManagerV2._on_execute_success result: last_success_timestamp=%s", "ConfigManagerV2._on_execute_success result: last_success_timestamp=%s",
self._last_success_timestamp, self._last_success_timestamp,
) )
def _on_execute_error(self, exc: Exception) -> None: def _on_execute_error(self, exc: Exception) -> None:
self._last_execute_error = str(exc) self._last_execute_error = str(exc)
self.logger.exception("ConfigManagerV2._on_execute_error") self._trigger_health_transition_check()
self.logger.warning( self.logger.error(
"ConfigManagerV2._on_execute_error result: last_execute_error=%s", "ConfigManagerV2._on_execute_error: %s",
self._last_execute_error, self._last_execute_error,
exc_info=(type(exc), exc, exc.__traceback__),
)
def _on_worker_degraded_change(self, degraded: bool) -> None:
self._worker_degraded = degraded
self._trigger_health_transition_check()
self.logger.debug("ConfigManagerV2._on_worker_degraded_change result: degraded=%s", degraded)
def _on_worker_metrics_change(self, inflight_count: int, timed_out_count: int) -> None:
self._worker_inflight_count = inflight_count
self._worker_timed_out_inflight_count = timed_out_count
self.logger.debug(
"ConfigManagerV2._on_worker_metrics_change result: inflight=%s timed_out_inflight=%s",
inflight_count,
timed_out_count,
)
async def _log_health_status_transition(self) -> None:
try:
health = await self._health_aggregator.collect()
except Exception: # noqa: BLE001
self.logger.exception("ConfigManagerV2._log_health_status_transition error")
return
status = health.get("status", "unhealthy")
if self._last_health_status == status:
return
previous = self._last_health_status or "unknown"
detail = health.get("detail", "")
self._last_health_status = status
self.logger.warning(
"ConfigManagerV2 health status changed: %s -> %s (state=%s detail=%s)",
previous,
status,
self._state.value,
detail,
) )
async def _worker_loop(self) -> None: async def _worker_loop(self) -> None:
@@ -72,88 +151,144 @@ class _RuntimeController:
halt_event=self._halt, halt_event=self._halt,
on_error=self._on_execute_error, on_error=self._on_execute_error,
on_success=self._on_execute_success, on_success=self._on_execute_success,
execute_timeout=self._execute_timeout,
on_degraded_change=self._on_worker_degraded_change,
on_metrics_change=self._on_worker_metrics_change,
) )
try: try:
await worker.run() await worker.run()
self.logger.warning("ConfigManagerV2._worker_loop result: completed") self.logger.debug("ConfigManagerV2._worker_loop result: completed")
except Exception: # noqa: BLE001
self.logger.exception("ConfigManagerV2._worker_loop error")
raise
finally: finally:
self.logger.warning("ConfigManagerV2._worker_loop result: stopped") self.logger.warning("ConfigManagerV2._worker_loop result: stopped")
async def _periodic_update_loop(self) -> None: async def _periodic_update_loop(self) -> None:
self.logger.warning( self.logger.debug(
"ConfigManagerV2._periodic_update_loop result: started update_interval=%s", "ConfigManagerV2._periodic_update_loop result: started update_interval=%s",
self.update_interval, self.update_interval,
) )
try: try:
while not self._halt.is_set(): while not self._halt.is_set():
await self._update_config() await self._update_config()
await self._log_health_status_transition()
try: try:
await asyncio.wait_for(self._halt.wait(), timeout=max(self.update_interval, 0.05)) await asyncio.wait_for(self._halt.wait(), timeout=max(self.update_interval, 0.05))
except asyncio.TimeoutError: except asyncio.TimeoutError:
continue continue
except Exception: # noqa: BLE001
self.logger.exception("ConfigManagerV2._periodic_update_loop error")
raise
finally: finally:
self.logger.warning("ConfigManagerV2._periodic_update_loop result: stopped") self.logger.debug("ConfigManagerV2._periodic_update_loop result: stopped")
async def _status_text(self) -> str: async def _status_text(self) -> str:
health = await self._health_aggregator.collect() health = await self._health_aggregator.collect()
detail = health.get("detail") detail = health.get("detail")
worker_tail = (
f"worker_inflight={self._worker_inflight_count}; "
f"worker_timed_out_inflight={self._worker_timed_out_inflight_count}"
)
if detail: if detail:
status_text = f"state={self._state.value}; health={health['status']}; detail={detail}" status_text = f"state={self._state.value}; health={health['status']}; detail={detail}; {worker_tail}"
self.logger.warning("ConfigManagerV2._status_text result: %s", status_text) self.logger.debug("ConfigManagerV2._status_text result: %s", status_text)
return status_text return status_text
status_text = f"state={self._state.value}; health={health['status']}" status_text = f"state={self._state.value}; health={health['status']}; {worker_tail}"
self.logger.warning("ConfigManagerV2._status_text result: %s", status_text) self.logger.debug("ConfigManagerV2._status_text result: %s", status_text)
return status_text return status_text
async def _start_control_channel(self) -> None: async def _start_control_channels(self) -> None:
if self._control_channel is None: for channel in self._control_channels:
self.logger.warning("ConfigManagerV2._start_control_channel result: no control channel")
return
try: try:
await self._control_channel.start( await asyncio.wait_for(
channel.start(
self._control_bridge.on_start, self._control_bridge.on_start,
self._control_bridge.on_stop, self._control_bridge.on_stop,
self._control_bridge.on_status, self._control_bridge.on_status,
),
timeout=self.CONTROL_CHANNEL_TIMEOUT,
)
self.logger.debug("ConfigManagerV2._start_control_channels result: started channel=%s", type(channel).__name__)
except asyncio.TimeoutError:
self.logger.error(
"ConfigManagerV2._start_control_channels timeout channel=%s timeout=%ss",
type(channel).__name__,
self.CONTROL_CHANNEL_TIMEOUT,
) )
self.logger.warning("ConfigManagerV2._start_control_channel result: started")
except Exception: # noqa: BLE001 except Exception: # noqa: BLE001
self.logger.exception("ConfigManagerV2._start_control_channel error") self.logger.exception("ConfigManagerV2._start_control_channels error channel=%s", type(channel).__name__)
async def _stop_control_channel(self) -> None: async def _stop_control_channels(self) -> None:
if self._control_channel is None: for channel in self._control_channels:
self.logger.warning("ConfigManagerV2._stop_control_channel result: no control channel")
return
try: try:
await self._control_channel.stop() await asyncio.wait_for(channel.stop(), timeout=self.CONTROL_CHANNEL_TIMEOUT)
self.logger.warning("ConfigManagerV2._stop_control_channel result: stopped") self.logger.debug("ConfigManagerV2._stop_control_channels result: stopped channel=%s", type(channel).__name__)
except asyncio.TimeoutError:
self.logger.error(
"ConfigManagerV2._stop_control_channels timeout channel=%s timeout=%ss",
type(channel).__name__,
self.CONTROL_CHANNEL_TIMEOUT,
)
except Exception: # noqa: BLE001 except Exception: # noqa: BLE001
self.logger.exception("ConfigManagerV2._stop_control_channel error") self.logger.exception("ConfigManagerV2._stop_control_channels error channel=%s", type(channel).__name__)
async def _start_management_server(self) -> None: async def _run_runtime_loops(self) -> None:
if self._management_server is None: self._state = LifecycleState.RUNNING
self.logger.warning("ConfigManagerV2._start_management_server result: disabled") self.logger.debug("ConfigManagerV2._run_runtime_loops result: state=%s", self._state.value)
return tasks = [
asyncio.create_task(self._worker_loop(), name="v2-worker-loop"),
asyncio.create_task(self._periodic_update_loop(), name="v2-config-loop"),
]
try: try:
await self._management_server.start() await asyncio.gather(*tasks)
self.logger.warning( self.logger.debug("ConfigManagerV2._run_runtime_loops result: loops completed")
"ConfigManagerV2._start_management_server result: started port=%s", except asyncio.CancelledError:
self._management_server.port, self.logger.debug("ConfigManagerV2._run_runtime_loops result: cancelled")
) raise
except Exception: # noqa: BLE001 except Exception: # noqa: BLE001
self.logger.exception("ConfigManagerV2._start_management_server error") self.logger.exception("ConfigManagerV2._run_runtime_loops error")
self.logger.warning( raise
"ConfigManagerV2._start_management_server result: failed worker will continue", finally:
) for task in tasks:
task.cancel()
await asyncio.gather(*tasks, return_exceptions=True)
self._state = LifecycleState.STOPPED
self.logger.debug("ConfigManagerV2._run_runtime_loops result: state=%s", self._state.value)
def _on_runtime_task_done(self, task: asyncio.Task) -> None: async def _start_runtime(self) -> str:
if self._shutdown.is_set():
result = "manager is shutting down"
self.logger.debug("ConfigManagerV2._start_runtime result: %s", result)
return result
async with self._runtime_lock:
if self._runtime_task is not None and not self._runtime_task.done():
result = "already running"
self.logger.debug("ConfigManagerV2._start_runtime result: %s", result)
return result
self._halt.clear()
self._state = LifecycleState.STARTING
self._runtime_task = asyncio.create_task(self._run_runtime_loops(), name="config-manager-v2-runtime")
self._runtime_task.add_done_callback(self._on_runtime_task_done)
result = "start signal accepted"
self.logger.debug("ConfigManagerV2._start_runtime result: %s", result)
return result
async def _stop_runtime(self) -> str:
self._halt.set()
async with self._runtime_lock:
runtime_task = self._runtime_task
if runtime_task is None:
result = "already stopped"
self.logger.debug("ConfigManagerV2._stop_runtime result: %s", result)
return result
if runtime_task.done():
result = "already stopped"
self.logger.debug("ConfigManagerV2._stop_runtime result: %s", result)
return result
result = "stop signal accepted"
self.logger.debug("ConfigManagerV2._stop_runtime result: %s", result)
return result
def _on_runtime_task_done(self, task: asyncio.Task[None]) -> None:
if task.cancelled(): if task.cancelled():
self.logger.warning("ConfigManagerV2._on_runtime_task_done result: cancelled") self.logger.debug("ConfigManagerV2._on_runtime_task_done result: cancelled")
return return
try: try:
exc = task.exception() exc = task.exception()
@@ -161,86 +296,88 @@ class _RuntimeController:
self.logger.exception("ConfigManagerV2._on_runtime_task_done error while reading task exception") self.logger.exception("ConfigManagerV2._on_runtime_task_done error while reading task exception")
return return
if exc is None: if exc is None:
self.logger.warning("ConfigManagerV2._on_runtime_task_done result: completed") self.logger.debug("ConfigManagerV2._on_runtime_task_done result: completed")
return return
self.logger.error( self.logger.error(
"ConfigManagerV2 background task failed", "ConfigManagerV2 runtime task failed",
exc_info=(type(exc), exc, exc.__traceback__), exc_info=(type(exc), exc, exc.__traceback__),
) )
self.logger.warning("ConfigManagerV2._on_runtime_task_done result: failed")
async def _run(self) -> None: async def _run(self) -> None:
self._state = LifecycleState.STARTING self._shutdown.clear()
self.logger.warning("ConfigManagerV2._run result: state=%s", self._state.value)
self._halt.clear() self._halt.clear()
self._state = LifecycleState.STARTING
self.logger.debug("ConfigManagerV2._run result: state=%s", self._state.value)
await self._update_config() await self._update_config()
await self._start_control_channels()
await self._start_runtime()
await self._log_health_status_transition()
await self._start_management_server()
await self._start_control_channel()
self._state = LifecycleState.RUNNING
self.logger.warning("ConfigManagerV2._run result: state=%s", self._state.value)
tasks = [
asyncio.create_task(self._worker_loop(), name="v2-worker-loop"),
asyncio.create_task(self._periodic_update_loop(), name="v2-config-loop"),
]
try: try:
await asyncio.gather(*tasks) await self._shutdown.wait()
self.logger.warning("ConfigManagerV2._run result: background loops completed")
except asyncio.CancelledError:
self.logger.warning("ConfigManagerV2._run result: cancelled")
raise
except Exception: # noqa: BLE001
self.logger.exception("ConfigManagerV2._run error")
raise
finally: finally:
self._state = LifecycleState.STOPPING self._state = LifecycleState.STOPPING
self.logger.warning("ConfigManagerV2._run result: state=%s", self._state.value) self.logger.debug("ConfigManagerV2._run result: state=%s", self._state.value)
self._halt.set() self._halt.set()
for task in tasks:
task.cancel() async with self._runtime_lock:
await asyncio.gather(*tasks, return_exceptions=True) runtime_task = self._runtime_task
if runtime_task is not None:
try:
await runtime_task
except asyncio.CancelledError:
self.logger.debug("ConfigManagerV2._run result: runtime task cancelled")
except Exception: # noqa: BLE001
self.logger.exception("ConfigManagerV2._run error while awaiting runtime task")
await self._stop_control_channels()
self._runtime_task = None
self._state = LifecycleState.STOPPED self._state = LifecycleState.STOPPED
self._task = None self._task = None
self.logger.warning( self.logger.debug("ConfigManagerV2._run result: state=%s", self._state.value)
"ConfigManagerV2._run result: state=%s api_and_control_available=%s",
self._state.value,
True,
)
class ConfigManagerV2(_RuntimeController): class ConfigManagerV2(_RuntimeController):
"""Public manager API.""" """Public manager API. Каналы управления задаются снаружи через control_channels."""
DEFAULT_UPDATE_INTERVAL = 5 DEFAULT_UPDATE_INTERVAL = 5
DEFAULT_WORK_INTERVAL = 2 DEFAULT_WORK_INTERVAL = 2
DEFAULT_HEALTH_TIMEOUT = 90
DEFAULT_EXECUTE_TIMEOUT = 600.0
def __init__( def __init__(
self, self,
path: str, path: str,
log_manager: Optional[LogManager] = None, control_channels: Optional[
management_settings: Optional[ManagementServerSettings] = None, Union[Iterable[ControlChannel], Callable[["ConfigManagerV2"], Iterable[ControlChannel]]]
control_channel: Optional[ControlChannel] = None, ] = None,
): ):
self.logger = logging.getLogger(__name__) self.logger = logging.getLogger(__name__)
self.path = path self.path = path
self.config: Any = None self.config: Any = None
self.update_interval = _read_env_interval("UPDATE_INTERVAL", float(self.DEFAULT_UPDATE_INTERVAL)) self.update_interval = _read_env_interval("UPDATE_INTERVAL", float(self.DEFAULT_UPDATE_INTERVAL))
self.work_interval = _read_env_interval("WORK_INTERVAL", float(self.DEFAULT_WORK_INTERVAL)) self.work_interval = _read_env_interval("WORK_INTERVAL", float(self.DEFAULT_WORK_INTERVAL))
self._execute_timeout = _read_env_optional_float("EXECUTE_TIMEOUT", float(self.DEFAULT_EXECUTE_TIMEOUT))
self._loader = ConfigLoader(path) self._loader = ConfigLoader(path)
self._log_manager = log_manager or LogManager() self._log_manager = LogManager()
self._control_channel = control_channel
self._halt = asyncio.Event() self._halt = asyncio.Event()
self._task: Optional[asyncio.Task] = None self._shutdown = asyncio.Event()
self._task: Optional[asyncio.Task[None]] = None
self._runtime_task: Optional[asyncio.Task[None]] = None
self._loop: Optional[asyncio.AbstractEventLoop] = None self._loop: Optional[asyncio.AbstractEventLoop] = None
self._runtime_lock = asyncio.Lock()
self._state = LifecycleState.IDLE self._state = LifecycleState.IDLE
self._last_execute_error: Optional[str] = None self._last_execute_error: Optional[str] = None
self._last_success_timestamp: Optional[float] = None self._last_success_timestamp: Optional[float] = None
self._worker_degraded = False
self._worker_inflight_count = 0
self._worker_timed_out_inflight_count = 0
self._last_health_status: Optional[str] = None
settings = management_settings or ManagementServerSettings(enabled=True) initial_config = self._loader.load_sync()
self._management_settings = settings self.config = initial_config
self._health_timeout = settings.health_timeout self._health_timeout = _read_env_health_timeout(float(self.DEFAULT_HEALTH_TIMEOUT))
self._health_aggregator = HealthAggregator( self._health_aggregator = HealthAggregator(
get_state=lambda: self._state, get_state=lambda: self._state,
get_last_error=lambda: self._last_execute_error, get_last_error=lambda: self._last_execute_error,
@@ -248,28 +385,26 @@ class ConfigManagerV2(_RuntimeController):
health_timeout=self._health_timeout, health_timeout=self._health_timeout,
get_app_health=self.get_health_status, get_app_health=self.get_health_status,
) )
self._api_bridge = ManagementApiBridge(start_fn=self.start, stop_fn=self.stop)
self._control_bridge = ControlChannelBridge( self._control_bridge = ControlChannelBridge(
halt=self._halt,
get_state=lambda: self._state, get_state=lambda: self._state,
get_status=self._status_text, get_status=self._status_text,
start_runtime=self._start_runtime,
stop_runtime=self._stop_runtime,
) )
self._management_server: Optional[ManagementServer] = None if control_channels is None:
if settings.enabled: self._control_channels = []
self._management_server = ManagementServer( elif callable(control_channels):
host=settings.host, self._control_channels = list(control_channels(self))
port=settings.port, else:
timeout=settings.timeout, self._control_channels = list(control_channels)
health_provider=self._health_aggregator.collect, self.logger.debug(
on_start=self._api_bridge.on_start, "ConfigManagerV2.__init__ result: path=%s update_interval=%s work_interval=%s execute_timeout=%s health_timeout=%s control_channels=%s",
on_stop=self._api_bridge.on_stop,
)
self.logger.warning(
"ConfigManagerV2.__init__ result: path=%s update_interval=%s work_interval=%s management_enabled=%s",
self.path, self.path,
self.update_interval, self.update_interval,
self.work_interval, self.work_interval,
self._management_server is not None, self._execute_timeout,
self._health_timeout,
len(self._control_channels),
) )
def _apply_config(self, new_config: Any) -> None: def _apply_config(self, new_config: Any) -> None:
@@ -280,7 +415,7 @@ class ConfigManagerV2(_RuntimeController):
except Exception: # noqa: BLE001 except Exception: # noqa: BLE001
self.logger.exception("ConfigManagerV2._apply_config error while applying logging config") self.logger.exception("ConfigManagerV2._apply_config error while applying logging config")
raise raise
self.logger.warning( self.logger.debug(
"ConfigManagerV2._apply_config result: config_type=%s is_dict=%s", "ConfigManagerV2._apply_config result: config_type=%s is_dict=%s",
type(new_config).__name__, type(new_config).__name__,
isinstance(new_config, dict), isinstance(new_config, dict),
@@ -290,14 +425,14 @@ class ConfigManagerV2(_RuntimeController):
try: try:
changed, new_config = await self._loader.load_if_changed() changed, new_config = await self._loader.load_if_changed()
if not changed: if not changed:
self.logger.warning("ConfigManagerV2._update_config result: no changes") self.logger.debug("ConfigManagerV2._update_config result: no changes")
return return
self._apply_config(new_config) self._apply_config(new_config)
self.logger.warning("ConfigManagerV2._update_config result: config updated") self.logger.warning("ConfigManagerV2._update_config result: config updated and applied")
except Exception as exc: # noqa: BLE001 except Exception as exc: # noqa: BLE001
self.logger.exception("ConfigManagerV2._update_config error") self.logger.exception("ConfigManagerV2._update_config error")
if self._loader.last_valid_config is None: if self._loader.last_valid_config is None:
self.logger.warning( self.logger.debug(
"ConfigManagerV2._update_config result: no fallback config available detail=%s", "ConfigManagerV2._update_config result: no fallback config available detail=%s",
str(exc), str(exc),
) )
@@ -314,11 +449,18 @@ class ConfigManagerV2(_RuntimeController):
"""Override in subclasses.""" """Override in subclasses."""
def get_health_status(self) -> HealthPayload: def get_health_status(self) -> HealthPayload:
if self._worker_degraded:
return {"status": "degraded", "detail": "worker has timed-out in-flight execute()"}
return {"status": "ok"} return {"status": "ok"}
def get_health_provider(self) -> Callable[[], Any]:
"""Вернуть колбэк для health (для передачи в HttpControlChannel при создании канала снаружи)."""
return self._health_aggregator.collect
async def start(self) -> None: async def start(self) -> None:
if self._task is not None and not self._task.done(): if self._task is not None and not self._task.done():
self.logger.warning("ConfigManagerV2.start result: already running") await self._start_runtime()
self.logger.debug("ConfigManagerV2.start result: already running")
return return
try: try:
self._loop = asyncio.get_running_loop() self._loop = asyncio.get_running_loop()
@@ -326,23 +468,20 @@ class ConfigManagerV2(_RuntimeController):
self.logger.exception("ConfigManagerV2.start error: must be called from within async context") self.logger.exception("ConfigManagerV2.start error: must be called from within async context")
raise raise
self._task = asyncio.create_task(self._run(), name="config-manager-v2") self._task = asyncio.create_task(self._run(), name="config-manager-v2")
self._task.add_done_callback(self._on_runtime_task_done) self.logger.debug("ConfigManagerV2.start result: background task started")
self.logger.warning("ConfigManagerV2.start result: background task started")
async def stop(self) -> None: async def stop(self) -> None:
if self._task is None: if self._task is None:
self.logger.warning("ConfigManagerV2.stop result: not running") self.logger.debug("ConfigManagerV2.stop result: not running")
return return
self._shutdown.set()
self._halt.set() self._halt.set()
if asyncio.current_task() is self._task: if asyncio.current_task() is self._task:
self.logger.warning("ConfigManagerV2.stop result: stop requested from runtime task") self.logger.debug("ConfigManagerV2.stop result: stop requested from supervisor task")
return return
try: try:
await self._task await self._task
except asyncio.CancelledError: except asyncio.CancelledError:
self.logger.warning("ConfigManagerV2.stop result: runtime task cancelled") self.logger.debug("ConfigManagerV2.stop result: supervisor task cancelled")
except Exception: # noqa: BLE001
self.logger.exception("ConfigManagerV2.stop error while awaiting runtime task")
raise
finally: finally:
self.logger.warning("ConfigManagerV2.stop result: completed") self.logger.debug("ConfigManagerV2.stop result: completed")

View File

@@ -1,17 +1,32 @@
"""Цикл воркера: повторяющийся вызов блокирующего execute() в потоке с паузой между итерациями. """Цикл воркера: повторяющийся вызов блокирующего execute() в потоке.
Поддерживает halt-событие для остановки, колбэки on_success/on_error для учёта ошибок и здоровья.""" Базовый режим: один in-flight execute().
Режим деградации: если активный execute() превысил timeout, запускается второй worker-thread,
чтобы работа продолжалась. Одновременно допускается не более двух in-flight задач."""
from __future__ import annotations from __future__ import annotations
import asyncio import asyncio
import logging import logging
import time
from collections.abc import Callable from collections.abc import Callable
from dataclasses import dataclass
from itertools import count
from typing import Optional from typing import Optional
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
@dataclass
class _InFlightExecute:
id: int
task: asyncio.Task[None]
started_at: float
timeout_reported: bool = False
class WorkerLoop: class WorkerLoop:
LOOP_TICK_SECONDS = 0.02
def __init__( def __init__(
self, self,
execute: Callable[[], None], execute: Callable[[], None],
@@ -19,36 +34,139 @@ class WorkerLoop:
halt_event: asyncio.Event, halt_event: asyncio.Event,
on_error: Optional[Callable[[Exception], None]] = None, on_error: Optional[Callable[[Exception], None]] = None,
on_success: Optional[Callable[[], None]] = None, on_success: Optional[Callable[[], None]] = None,
execute_timeout: Optional[float] = None,
on_degraded_change: Optional[Callable[[bool], None]] = None,
on_metrics_change: Optional[Callable[[int, int], None]] = None,
): ):
"""Сохранить колбэки и примитивы синхронизации для выполнения воркера."""
self._execute = execute self._execute = execute
self._get_interval = get_interval self._get_interval = get_interval
self._halt_event = halt_event self._halt_event = halt_event
self._on_error = on_error self._on_error = on_error
self._on_success = on_success self._on_success = on_success
logger.warning( self._execute_timeout = execute_timeout
"WorkerLoop.__init__ result: execute=%s", self._on_degraded_change = on_degraded_change
getattr(execute, "__name__", execute.__class__.__name__), self._on_metrics_change = on_metrics_change
self._inflight: list[_InFlightExecute] = []
self._id_seq = count(1)
self._degraded = False
self._last_metrics: Optional[tuple[int, int]] = None
self._next_start_at = 0.0
logger.debug(
"WorkerLoop.__init__ result: execute=%s execute_timeout=%s",
getattr(execute, "__name__", type(execute).__name__),
execute_timeout,
) )
async def run(self) -> None: def _notify_error(self, exc: Exception) -> None:
"""Вызывать execute циклически до запроса остановки."""
logger.warning("WorkerLoop.run result: started")
while not self._halt_event.is_set():
try:
await asyncio.to_thread(self._execute)
if self._on_success is not None:
self._on_success()
logger.warning("WorkerLoop.run result: execute completed")
except Exception as exc: # noqa: BLE001
logger.exception("WorkerLoop.run error during execute")
if self._on_error is not None: if self._on_error is not None:
self._on_error(exc) self._on_error(exc)
logger.warning("WorkerLoop.run result: execute failed")
timeout = max(self._get_interval(), 0.01) def _set_degraded(self, value: bool) -> None:
if self._degraded == value:
return
self._degraded = value
if self._on_degraded_change is not None:
self._on_degraded_change(value)
logger.debug("WorkerLoop.run degraded state changed: degraded=%s", value)
def _start_execute(self) -> None:
execute_id = next(self._id_seq)
execution = _InFlightExecute(
id=execute_id,
task=asyncio.create_task(asyncio.to_thread(self._execute), name=f"worker-loop-execute-{execute_id}"),
started_at=time.monotonic(),
)
self._inflight.append(execution)
logger.debug("WorkerLoop.run result: execute started id=%s inflight=%s", execution.id, len(self._inflight))
def _collect_finished(self) -> None:
pending: list[_InFlightExecute] = []
for execution in self._inflight:
task = execution.task
if not task.done():
pending.append(execution)
continue
try:
task.result()
if self._on_success is not None:
self._on_success()
logger.debug("WorkerLoop.run result: execute completed id=%s", execution.id)
except Exception as exc: # noqa: BLE001
self._notify_error(exc)
logger.exception("WorkerLoop.run error during execute id=%s", execution.id)
self._inflight = pending
def _mark_timeouts(self) -> None:
if self._execute_timeout is None or self._execute_timeout <= 0:
return
now = time.monotonic()
for execution in self._inflight:
if execution.timeout_reported:
continue
if execution.task.done():
continue
elapsed = now - execution.started_at
if elapsed < self._execute_timeout:
continue
execution.timeout_reported = True
self._notify_error(TimeoutError(f"execute() did not finish within {self._execute_timeout}s"))
logger.debug(
"WorkerLoop.run execute timeout: id=%s elapsed=%.3fs timeout=%ss",
execution.id,
elapsed,
self._execute_timeout,
)
def _has_timed_out_inflight(self) -> bool:
return any(item.timeout_reported and not item.task.done() for item in self._inflight)
def _ensure_capacity(self) -> None:
now = time.monotonic()
if now < self._next_start_at:
return
active_count = len(self._inflight)
if active_count == 0:
self._start_execute()
self._next_start_at = now + max(self._get_interval(), 0.01)
return
if active_count == 1 and self._has_timed_out_inflight():
self._start_execute()
self._next_start_at = now + max(self._get_interval(), 0.01)
return
def _emit_metrics(self) -> None:
if self._on_metrics_change is None:
return
inflight_count = len(self._inflight)
timed_out_count = sum(1 for item in self._inflight if item.timeout_reported and not item.task.done())
metrics = (inflight_count, timed_out_count)
if self._last_metrics == metrics:
return
self._last_metrics = metrics
self._on_metrics_change(inflight_count, timed_out_count)
async def run(self) -> None:
logger.debug("WorkerLoop.run result: started")
while not self._halt_event.is_set():
self._collect_finished()
self._mark_timeouts()
self._set_degraded(self._has_timed_out_inflight())
self._ensure_capacity()
self._emit_metrics()
timeout = self.LOOP_TICK_SECONDS
try: try:
await asyncio.wait_for(self._halt_event.wait(), timeout=timeout) await asyncio.wait_for(self._halt_event.wait(), timeout=timeout)
except asyncio.TimeoutError: except asyncio.TimeoutError:
continue continue
logger.warning("WorkerLoop.run result: stopped")
if self._inflight:
if self._has_timed_out_inflight():
logger.debug("WorkerLoop.run stop: timed-out execute still running; exiting without waiting")
else:
await asyncio.gather(*(item.task for item in self._inflight), return_exceptions=True)
self._collect_finished()
self._set_degraded(False)
self._emit_metrics()
logger.debug("WorkerLoop.run result: stopped")

View File

@@ -0,0 +1,25 @@
"""Типы core: состояние здоровья и жизненного цикла.
Используются в core и control для единообразных контрактов."""
from __future__ import annotations
from enum import Enum
from typing import Literal, TypedDict
HealthState = Literal["ok", "degraded", "unhealthy"]
class HealthPayload(TypedDict, total=False):
status: HealthState
detail: str
state: str
"""Текущее состояние жизненного цикла (idle/starting/running/stopping/stopped)."""
class LifecycleState(str, Enum):
IDLE = "idle"
STARTING = "starting"
RUNNING = "running"
STOPPING = "stopping"
STOPPED = "stopped"

View File

@@ -1,14 +0,0 @@
"""Management API: HTTP-сервер для /health и /actions/start|stop, адаптеры и сбор здоровья.
Объединяет сервер, мосты к жизненному циклу и агрегатор здоровья для единого контракта API."""
from .bridges import ControlChannelBridge, ManagementApiBridge
from .health_aggregator import HealthAggregator
from .management_server import HealthServer, ManagementServer
__all__ = [
"ControlChannelBridge",
"HealthAggregator",
"HealthServer",
"ManagementApiBridge",
"ManagementServer",
]

View File

@@ -1,90 +0,0 @@
"""Адаптеры, связывающие жизненный цикл менеджера с HTTP API и каналами управления.
ManagementApiBridge отдаёт start/stop в HTTP; ControlChannelBridge — start/stop/status в Telegram и др."""
from __future__ import annotations
import asyncio
import logging
from collections.abc import Awaitable, Callable
from ..types import LifecycleState
logger = logging.getLogger(__name__)
class ManagementApiBridge:
"""Предоставляет start/stop жизненного цикла как async-колбэки для ManagementServer (/actions/start, /actions/stop)."""
def __init__(
self,
start_fn: Callable[[], Awaitable[None]],
stop_fn: Callable[[], Awaitable[None]],
):
self._start_fn = start_fn
self._stop_fn = stop_fn
logger.warning("ManagementApiBridge.__init__ result: callbacks configured")
async def on_start(self) -> str:
"""Выполнить start и вернуть сообщение для HTTP-ответа."""
try:
await self._start_fn()
except Exception: # noqa: BLE001
logger.exception("ManagementApiBridge.on_start error")
raise
result = "start completed"
logger.warning("ManagementApiBridge.on_start result: %s", result)
return result
async def on_stop(self) -> str:
"""Выполнить stop и вернуть сообщение для HTTP-ответа."""
try:
await self._stop_fn()
except Exception: # noqa: BLE001
logger.exception("ManagementApiBridge.on_stop error")
raise
result = "stop completed"
logger.warning("ManagementApiBridge.on_stop result: %s", result)
return result
class ControlChannelBridge:
"""Предоставляет halt и status как обработчики start/stop/status для ControlChannel (например Telegram)."""
def __init__(
self,
halt: asyncio.Event,
get_state: Callable[[], LifecycleState],
get_status: Callable[[], Awaitable[str]],
):
self._halt = halt
self._get_state = get_state
self._get_status = get_status
logger.warning("ControlChannelBridge.__init__ result: callbacks configured")
async def on_start(self) -> str:
"""Обработать внешний start: сбросить halt; идемпотентно при уже running."""
if self._get_state() == LifecycleState.RUNNING:
result = "already running"
logger.warning("ControlChannelBridge.on_start result: %s", result)
return result
self._halt.clear()
result = "start signal accepted"
logger.warning("ControlChannelBridge.on_start result: %s", result)
return result
async def on_stop(self) -> str:
"""Обработать внешний stop: установить halt."""
self._halt.set()
result = "stop signal accepted"
logger.warning("ControlChannelBridge.on_stop result: %s", result)
return result
async def on_status(self) -> str:
"""Вернуть текущий текст статуса."""
try:
result = await self._get_status()
except Exception: # noqa: BLE001
logger.exception("ControlChannelBridge.on_status error")
raise
logger.warning("ControlChannelBridge.on_status result: %s", result)
return result

View File

@@ -0,0 +1,16 @@
"""Public tracing API for config_manager-based applications."""
from .manager import TraceManager
from .models import TraceContextRecord, TraceLogMessage
from .store import ActiveTraceContext, TraceContextStore
from .transport.base import NoOpTraceTransport, TraceTransport
__all__ = [
"ActiveTraceContext",
"NoOpTraceTransport",
"TraceContextRecord",
"TraceContextStore",
"TraceLogMessage",
"TraceManager",
"TraceTransport",
]

View File

@@ -0,0 +1,96 @@
"""High-level tracing API independent from the logging module."""
from __future__ import annotations
from contextlib import contextmanager
from typing import Any, Dict, Iterator, Optional
from uuid import uuid4
from .models import TraceContextRecord, TraceLogMessage, utc_now
from .store import TraceContextStore
from .transport.base import NoOpTraceTransport, TraceTransport
class TraceManager:
"""Creates trace contexts and writes trace messages for active context."""
def __init__(self, transport: Optional[TraceTransport] = None, store: Optional[TraceContextStore] = None) -> None:
self.transport = transport or NoOpTraceTransport()
self.store = store or TraceContextStore()
def bind_context(
self,
*,
alias: str,
parent_id: Optional[str] = None,
type: Optional[str] = None,
attrs: Optional[Dict[str, Any]] = None,
) -> str:
"""Create and activate a trace context, returning its trace identifier."""
record = TraceContextRecord(
trace_id=uuid4().hex,
parent_id=parent_id,
alias=str(alias or ""),
type=str(type) if type is not None else None,
event_time=utc_now(),
attrs=dict(attrs or {}),
)
self.store.push(record)
self.transport.write_context(record)
return record.trace_id
@contextmanager
def open_context(
self,
*,
alias: str,
parent_id: Optional[str] = None,
type: Optional[str] = None,
attrs: Optional[Dict[str, Any]] = None,
) -> Iterator[str]:
"""Open nested trace context and restore previous one after exit."""
trace_id = self.bind_context(alias=alias, parent_id=parent_id, type=type, attrs=attrs)
try:
yield trace_id
finally:
self.store.pop()
def current_trace_id(self) -> Optional[str]:
"""Return current active trace identifier."""
return self.store.current_trace_id()
def close_context(self) -> Optional[str]:
"""Close current context and return restored parent trace id, if any."""
previous = self.store.pop()
return previous.record.trace_id if previous else None
def step(self, name: str) -> None:
"""Set current step for subsequent messages."""
self.store.set_step(name)
def info(self, message: str, *, status: str, attrs: Optional[Dict[str, Any]] = None) -> None:
self._write_message(level="INFO", message=message, status=status, attrs=attrs)
def warning(self, message: str, *, status: str, attrs: Optional[Dict[str, Any]] = None) -> None:
self._write_message(level="WARNING", message=message, status=status, attrs=attrs)
def error(self, message: str, *, status: str, attrs: Optional[Dict[str, Any]] = None) -> None:
self._write_message(level="ERROR", message=message, status=status, attrs=attrs)
def exception(self, message: str, *, status: str = "failed", attrs: Optional[Dict[str, Any]] = None) -> None:
self._write_message(level="ERROR", message=message, status=status, attrs=attrs)
def _write_message(self, *, level: str, message: str, status: str, attrs: Optional[Dict[str, Any]]) -> None:
active = self.store.current()
if active is None:
raise RuntimeError("Trace context is not bound. Call bind_context() first.")
record = TraceLogMessage(
trace_id=active.record.trace_id,
event_time=utc_now(),
step=active.step,
status=str(status or ""),
level=level,
message=str(message or ""),
attrs=dict(attrs or {}),
)
self.transport.write_message(record)

View File

@@ -0,0 +1,37 @@
"""Data models for trace contexts and trace messages."""
from __future__ import annotations
from dataclasses import dataclass, field
from datetime import datetime, timezone
from typing import Any, Dict, Optional
def utc_now() -> datetime:
"""Return current UTC time with timezone for trace events."""
return datetime.now(timezone.utc)
@dataclass(frozen=True)
class TraceContextRecord:
"""Represents a logical unit that groups trace messages."""
trace_id: str
alias: str
parent_id: Optional[str] = None
type: Optional[str] = None
event_time: datetime = field(default_factory=utc_now)
attrs: Dict[str, Any] = field(default_factory=dict)
@dataclass(frozen=True)
class TraceLogMessage:
"""Represents a single trace message linked to a trace context."""
trace_id: str
step: str
status: str
message: str
level: str
event_time: datetime = field(default_factory=utc_now)
attrs: Dict[str, Any] = field(default_factory=dict)

View File

@@ -0,0 +1,70 @@
"""Context-local storage for active trace contexts."""
from __future__ import annotations
from contextvars import ContextVar
from dataclasses import dataclass, replace
from typing import Optional, Tuple
from .models import TraceContextRecord
@dataclass(frozen=True)
class ActiveTraceContext:
"""Stores current trace context and active processing step."""
record: TraceContextRecord
step: str = ""
class TraceContextStore:
"""Keeps active trace context stack in the current execution context."""
def __init__(self) -> None:
self._current: ContextVar[Optional[ActiveTraceContext]] = ContextVar("trace_current", default=None)
self._stack: ContextVar[Tuple[ActiveTraceContext, ...]] = ContextVar("trace_stack", default=())
def current(self) -> Optional[ActiveTraceContext]:
"""Return the current active trace context, if present."""
return self._current.get()
def current_trace_id(self) -> Optional[str]:
"""Return current trace identifier, if present."""
current = self.current()
return current.record.trace_id if current else None
def current_step(self) -> str:
"""Return current active step or empty string."""
current = self.current()
return current.step if current else ""
def push(self, record: TraceContextRecord) -> ActiveTraceContext:
"""Activate a new trace context and preserve the previous one in stack."""
current = self.current()
stack = self._stack.get()
if current is not None:
stack = stack + (current,)
self._stack.set(stack)
active = ActiveTraceContext(record=record)
self._current.set(active)
return active
def pop(self) -> Optional[ActiveTraceContext]:
"""Restore the previous trace context from stack."""
stack = self._stack.get()
if not stack:
self._current.set(None)
return None
previous = stack[-1]
self._stack.set(stack[:-1])
self._current.set(previous)
return previous
def set_step(self, step: str) -> Optional[ActiveTraceContext]:
"""Assign step to current active context."""
current = self.current()
if current is None:
return None
updated = replace(current, step=str(step or ""))
self._current.set(updated)
return updated

View File

@@ -0,0 +1,5 @@
"""Trace transports."""
from .base import NoOpTraceTransport, TraceTransport
__all__ = ["NoOpTraceTransport", "TraceTransport"]

View File

@@ -0,0 +1,27 @@
"""Transport interfaces for trace persistence."""
from __future__ import annotations
from typing import Protocol
from ..models import TraceContextRecord, TraceLogMessage
class TraceTransport(Protocol):
"""Writes trace records to an external destination."""
def write_context(self, record: TraceContextRecord) -> None:
"""Persist trace context record."""
def write_message(self, record: TraceLogMessage) -> None:
"""Persist trace log message."""
class NoOpTraceTransport:
"""Default transport that ignores all trace records."""
def write_context(self, record: TraceContextRecord) -> None:
return None
def write_message(self, record: TraceLogMessage) -> None:
return None

View File

@@ -0,0 +1,97 @@
"""MySQL transport for trace contexts and trace messages."""
from __future__ import annotations
import json
from datetime import datetime, timezone
from typing import Any, Dict, Optional
import pymysql
from ..models import TraceContextRecord, TraceLogMessage
class MySqlTraceTransport:
"""Persists trace records into dedicated MySQL tables."""
TRACE_CONTEXTS_TABLE = "trace_contexts"
TRACE_MESSAGES_TABLE = "trace_messages"
def __init__(
self,
*,
host: str,
port: int,
database: str,
user: str,
password: str,
connect_timeout: int = 5,
charset: str = "utf8mb4",
) -> None:
self.host = host
self.port = int(port)
self.database = database
self.user = user
self.password = password
self.connect_timeout = connect_timeout
self.charset = charset
def write_context(self, record: TraceContextRecord) -> None:
query = (
f"INSERT INTO {self.TRACE_CONTEXTS_TABLE} "
"(trace_id, parent_id, alias, type, event_time, attrs_json) "
"VALUES (%s, %s, %s, %s, %s, %s)"
)
params = (
record.trace_id,
record.parent_id,
record.alias,
record.type,
self._normalize_time(record.event_time),
self._serialize_attrs(record.attrs),
)
self._execute(query, params)
def write_message(self, record: TraceLogMessage) -> None:
query = (
f"INSERT INTO {self.TRACE_MESSAGES_TABLE} "
"(trace_id, event_time, step, status, level, message, attrs_json) "
"VALUES (%s, %s, %s, %s, %s, %s, %s)"
)
params = (
record.trace_id,
self._normalize_time(record.event_time),
record.step,
record.status,
record.level,
record.message,
self._serialize_attrs(record.attrs),
)
self._execute(query, params)
def _execute(self, query: str, params: tuple[Any, ...]) -> None:
connection = pymysql.connect(
host=self.host,
port=self.port,
user=self.user,
password=self.password,
database=self.database,
charset=self.charset,
connect_timeout=self.connect_timeout,
autocommit=True,
)
try:
with connection.cursor() as cursor:
cursor.execute(query, params)
finally:
connection.close()
@staticmethod
def _serialize_attrs(attrs: Optional[Dict[str, Any]]) -> str:
return json.dumps(attrs or {}, ensure_ascii=False, default=str, sort_keys=True)
@staticmethod
def _normalize_time(value: datetime) -> datetime:
if value.tzinfo is None:
return value
return value.astimezone(timezone.utc).replace(tzinfo=None)

View File

@@ -1,42 +0,0 @@
"""Общие типы V2: состояние здоровья, жизненного цикла и настройки management-сервера.
Используются в core, management и control для единообразных контрактов."""
from __future__ import annotations
from dataclasses import dataclass
from enum import Enum
from typing import Literal, TypedDict
HealthState = Literal["ok", "degraded", "unhealthy"]
class HealthPayload(TypedDict, total=False):
status: HealthState
detail: str
state: str
"""Текущее состояние жизненного цикла (idle/starting/running/stopping/stopped)."""
class LifecycleState(str, Enum):
IDLE = "idle"
STARTING = "starting"
RUNNING = "running"
STOPPING = "stopping"
STOPPED = "stopped"
@dataclass
class ManagementServerSettings:
"""Настройки management HTTP-сервера и healthcheck (один объект на оба)."""
enabled: bool = False
host: str = "0.0.0.0"
port: int = 8000
timeout: int = 3
"""Таймаут запроса health (секунды)."""
health_timeout: int = 30
"""Секунды без успешного execute(), после которых health = unhealthy."""
# Backward-compatible alias.
HealthServerSettings = ManagementServerSettings

View File

@@ -1,6 +1,14 @@
# === Раздел с общими конфигурационными параметрами === # === Раздел с общими конфигурационными параметрами ===
runtime: 5 runtime: 5
# === HTTP-канал управления (ConfigManagerV2): /health, /actions/start, /actions/stop ===
management:
enabled: true
host: "0.0.0.0"
port: 8000
timeout: 3
health_timeout: 30
# === Логирование === # === Логирование ===
log: log:
version: 1 version: 1

View File

@@ -6,14 +6,10 @@ import logging
from pathlib import Path from pathlib import Path
from config_manager import ConfigManager from config_manager import ConfigManager
from config_manager.v1.log_manager import LogManager from config_manager.v2.control import HttpControlChannel
from config_manager.v2 import ManagementServerSettings
logger = logging.getLogger() logger = logging.getLogger()
# Таймаут health: без успешного execute() дольше этого времени — unhealthy.
HEALTH_TIMEOUT = 3.0
class MyApp(ConfigManager): class MyApp(ConfigManager):
def __init__(self, *args, **kwargs): def __init__(self, *args, **kwargs):
@@ -31,21 +27,20 @@ class MyApp(ConfigManager):
async def main() -> None: async def main() -> None:
log_manager = LogManager()
# Один объект: и HTTP management-сервер (enabled, port), и health (health_timeout).
management_settings = ManagementServerSettings(
enabled=True,
port=8000,
health_timeout=HEALTH_TIMEOUT,
)
config_path = Path(__file__).parent / "config.yaml" config_path = Path(__file__).parent / "config.yaml"
print(config_path) print(config_path)
app = MyApp( app = MyApp(
str(config_path), str(config_path),
log_manager=log_manager, control_channels=lambda m: [
management_settings=management_settings, HttpControlChannel(
host="0.0.0.0",
port=8000,
timeout=3,
health_provider=m.get_health_provider(),
) )
logger.info("App starting (health_timeout=%s)", HEALTH_TIMEOUT) ],
)
logger.info("App starting")
# Менеджер запускаем в фоне (start() не возвращает управление до stop). # Менеджер запускаем в фоне (start() не возвращает управление до stop).
asyncio.create_task(app.start()) asyncio.create_task(app.start())

View File

@@ -1,6 +1,6 @@
import asyncio import asyncio
from config_manager.v2 import ConfigManagerV2, ManagementServerSettings from config_manager.v2 import ConfigManagerV2
class ReloadApp(ConfigManagerV2): class ReloadApp(ConfigManagerV2):
@@ -14,9 +14,9 @@ class ReloadApp(ConfigManagerV2):
def test_invalid_config_keeps_last_valid(tmp_path): def test_invalid_config_keeps_last_valid(tmp_path):
async def scenario() -> None: async def scenario() -> None:
cfg = tmp_path / "config.yaml" cfg = tmp_path / "config.yaml"
cfg.write_text("log: {}\n", encoding="utf-8") cfg.write_text("log: {}\nmanagement: { enabled: false }\n", encoding="utf-8")
app = ReloadApp(str(cfg), management_settings=ManagementServerSettings(enabled=False)) app = ReloadApp(str(cfg))
runner = asyncio.create_task(app.start()) runner = asyncio.create_task(app.start())
await asyncio.sleep(0.12) await asyncio.sleep(0.12)

View File

@@ -1,6 +1,6 @@
import asyncio import asyncio
from config_manager.v2 import ConfigManagerV2, ManagementServerSettings from config_manager.v2 import ConfigManagerV2
class DemoApp(ConfigManagerV2): class DemoApp(ConfigManagerV2):
@@ -18,9 +18,9 @@ class DemoApp(ConfigManagerV2):
def test_execute_loop_runs(tmp_path): def test_execute_loop_runs(tmp_path):
async def scenario() -> None: async def scenario() -> None:
cfg = tmp_path / "config.yaml" cfg = tmp_path / "config.yaml"
cfg.write_text("log: {}\n", encoding="utf-8") cfg.write_text("log: {}\nmanagement: { enabled: false }\n", encoding="utf-8")
app = DemoApp(str(cfg), management_settings=ManagementServerSettings(enabled=False)) app = DemoApp(str(cfg))
runner = asyncio.create_task(app.start()) runner = asyncio.create_task(app.start())
await asyncio.sleep(0.18) await asyncio.sleep(0.18)

View File

@@ -1,6 +1,6 @@
import asyncio import asyncio
from config_manager.v2 import ConfigManagerV2, ManagementServerSettings from config_manager.v2 import ConfigManagerV2
from config_manager.v2.control.base import ControlChannel, StartHandler, StatusHandler, StopHandler from config_manager.v2.control.base import ControlChannel, StartHandler, StatusHandler, StopHandler
@@ -33,14 +33,10 @@ class ControlledApp(ConfigManagerV2):
def test_control_channel_can_stop_manager(tmp_path): def test_control_channel_can_stop_manager(tmp_path):
async def scenario() -> None: async def scenario() -> None:
cfg = tmp_path / "config.yaml" cfg = tmp_path / "config.yaml"
cfg.write_text("log: {}\n", encoding="utf-8") cfg.write_text("log: {}\nmanagement: { enabled: false }\n", encoding="utf-8")
channel = DummyControlChannel() channel = DummyControlChannel()
app = ControlledApp( app = ControlledApp(str(cfg), control_channels=[channel])
str(cfg),
control_channel=channel,
management_settings=ManagementServerSettings(enabled=False),
)
runner = asyncio.create_task(app.start()) runner = asyncio.create_task(app.start())
await asyncio.sleep(0.12) await asyncio.sleep(0.12)
@@ -51,11 +47,14 @@ def test_control_channel_can_stop_manager(tmp_path):
status_text = await channel.on_status() status_text = await channel.on_status()
assert "state=running" in status_text assert "state=running" in status_text
assert "worker_inflight=" in status_text
assert "worker_timed_out_inflight=" in status_text
stop_text = await channel.on_stop() stop_text = await channel.on_stop()
assert "stop signal accepted" in stop_text assert "stop signal accepted" in stop_text
await runner await runner
# Менеджер при остановке не вызывает control_channel.stop() (канал остаётся доступным) await app.stop()
assert channel.stopped is True
asyncio.run(scenario()) asyncio.run(scenario())

View File

@@ -1,7 +1,7 @@
import asyncio import asyncio
import json import json
from config_manager.v2.management import ManagementServer from config_manager.v2.control import HttpControlChannel
def test_health_mapping_ok_to_200(): def test_health_mapping_ok_to_200():
@@ -9,7 +9,7 @@ def test_health_mapping_ok_to_200():
return {"status": "ok"} return {"status": "ok"}
async def scenario() -> None: async def scenario() -> None:
server = ManagementServer( server = HttpControlChannel(
host="127.0.0.1", host="127.0.0.1",
port=8000, port=8000,
timeout=0.2, timeout=0.2,
@@ -27,7 +27,7 @@ def test_health_mapping_unhealthy_to_503():
return {"status": "unhealthy", "detail": "worker failed"} return {"status": "unhealthy", "detail": "worker failed"}
async def scenario() -> None: async def scenario() -> None:
server = ManagementServer( server = HttpControlChannel(
host="127.0.0.1", host="127.0.0.1",
port=8000, port=8000,
timeout=0.2, timeout=0.2,
@@ -54,6 +54,9 @@ def test_action_routes_call_callbacks():
events.append("stop") events.append("stop")
return "stop accepted" return "stop accepted"
async def on_status() -> str:
return "ok"
async def request(port: int, path: str) -> tuple[int, dict[str, str]]: async def request(port: int, path: str) -> tuple[int, dict[str, str]]:
reader, writer = await asyncio.open_connection("127.0.0.1", port) reader, writer = await asyncio.open_connection("127.0.0.1", port)
writer.write( writer.write(
@@ -70,23 +73,21 @@ def test_action_routes_call_callbacks():
return status_code, payload return status_code, payload
async def scenario() -> None: async def scenario() -> None:
server = ManagementServer( channel = HttpControlChannel(
host="127.0.0.1", host="127.0.0.1",
port=0, port=0,
timeout=0.2, timeout=0.2,
health_provider=provider, health_provider=provider,
on_start=on_start,
on_stop=on_stop,
) )
await server.start() await channel.start(on_start, on_stop, on_status)
try: try:
port = server.port port = channel.port
assert port > 0 assert port > 0
start_code, start_payload = await request(port, "/actions/start") start_code, start_payload = await request(port, "/actions/start")
stop_code, stop_payload = await request(port, "/actions/stop") stop_code, stop_payload = await request(port, "/actions/stop")
finally: finally:
await server.stop() await channel.stop()
assert start_code == 200 assert start_code == 200
assert start_payload["status"] == "ok" assert start_payload["status"] == "ok"

View File

@@ -0,0 +1,26 @@
from config_manager.v2 import ConfigManagerV2
class TimeoutApp(ConfigManagerV2):
def execute(self) -> None:
return
def test_health_timeout_uses_main_env_key(tmp_path, monkeypatch):
cfg = tmp_path / "config.yaml"
cfg.write_text("log: {}\nmanagement: { enabled: false }\nhealth_timeout: 150\n", encoding="utf-8")
monkeypatch.setenv("HEALTH_TIMEOUT", "120")
monkeypatch.setenv("HEALTHY_TIMEOUT", "300")
app = TimeoutApp(str(cfg))
assert app._health_timeout == 120.0
def test_health_timeout_defaults_to_90_when_env_not_set(tmp_path, monkeypatch):
cfg = tmp_path / "config.yaml"
cfg.write_text("log: {}\nmanagement: { enabled: false }\nhealth_timeout: 150\n", encoding="utf-8")
monkeypatch.delenv("HEALTH_TIMEOUT", raising=False)
monkeypatch.delenv("HEALTHY_TIMEOUT", raising=False)
app = TimeoutApp(str(cfg))
assert app._health_timeout == 90.0

View File

@@ -0,0 +1,151 @@
import asyncio
import threading
import time
from config_manager.v2 import ConfigManagerV2
from config_manager.v2.control.base import ControlChannel, StartHandler, StatusHandler, StopHandler
class DummyControlChannel(ControlChannel):
def __init__(self):
self.on_start: StartHandler | None = None
self.on_stop: StopHandler | None = None
self.on_status: StatusHandler | None = None
self.started = False
self.stopped = False
async def start(self, on_start: StartHandler, on_stop: StopHandler, on_status: StatusHandler) -> None:
self.on_start = on_start
self.on_stop = on_stop
self.on_status = on_status
self.started = True
async def stop(self) -> None:
self.stopped = True
class RestartableApp(ConfigManagerV2):
DEFAULT_UPDATE_INTERVAL = 0.05
DEFAULT_WORK_INTERVAL = 0.05
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.calls = 0
def execute(self) -> None:
self.calls += 1
class TimeoutAwareApp(ConfigManagerV2):
DEFAULT_UPDATE_INTERVAL = 0.05
DEFAULT_WORK_INTERVAL = 0.02
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.calls = 0
self.active = 0
self.max_active = 0
self._lock = threading.Lock()
def execute(self) -> None:
with self._lock:
self.calls += 1
self.active += 1
self.max_active = max(self.max_active, self.active)
try:
time.sleep(0.2)
finally:
with self._lock:
self.active -= 1
class NormalSingleThreadApp(ConfigManagerV2):
DEFAULT_UPDATE_INTERVAL = 0.05
DEFAULT_WORK_INTERVAL = 0.02
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.calls = 0
self.active = 0
self.max_active = 0
self._lock = threading.Lock()
def execute(self) -> None:
with self._lock:
self.calls += 1
self.active += 1
self.max_active = max(self.max_active, self.active)
try:
time.sleep(0.03)
finally:
with self._lock:
self.active -= 1
def test_control_channel_stop_and_start_resumes_execute(tmp_path):
async def scenario() -> None:
cfg = tmp_path / "config.yaml"
cfg.write_text("log: {}\nmanagement: { enabled: false }\n", encoding="utf-8")
channel = DummyControlChannel()
app = RestartableApp(str(cfg), control_channels=[channel])
await app.start()
await asyncio.sleep(0.2)
before_stop = app.calls
assert before_stop > 0
assert channel.on_stop is not None
assert channel.on_start is not None
stop_text = await channel.on_stop()
assert "stop signal accepted" in stop_text
await asyncio.sleep(0.2)
after_stop = app.calls
assert after_stop == before_stop
start_text = await channel.on_start()
assert "start signal accepted" in start_text
await asyncio.sleep(0.2)
assert app.calls > after_stop
await app.stop()
assert channel.stopped is True
asyncio.run(scenario())
def test_normal_mode_uses_single_inflight_execute(tmp_path):
async def scenario() -> None:
cfg = tmp_path / "config.yaml"
cfg.write_text("log: {}\nmanagement: { enabled: false }\n", encoding="utf-8")
app = NormalSingleThreadApp(str(cfg))
await app.start()
await asyncio.sleep(0.25)
health = await app.get_health_provider()()
await app.stop()
assert app.calls >= 2
assert app.max_active == 1
assert health["status"] == "ok"
asyncio.run(scenario())
def test_execute_timeout_does_not_start_parallel_runs(tmp_path, monkeypatch):
async def scenario() -> None:
cfg = tmp_path / "config.yaml"
cfg.write_text("log: {}\nmanagement: { enabled: false }\n", encoding="utf-8")
monkeypatch.setenv("EXECUTE_TIMEOUT", "0.05")
app = TimeoutAwareApp(str(cfg))
await app.start()
await asyncio.sleep(0.35)
degraded_health = await app.get_health_provider()()
await app.stop()
assert app.calls >= 1
assert app.max_active == 2
assert degraded_health["status"] == "degraded"
asyncio.run(scenario())

View File

@@ -2,7 +2,7 @@ import asyncio
import threading import threading
import time import time
from config_manager.v2 import ConfigManagerV2, ManagementServerSettings from config_manager.v2 import ConfigManagerV2
class BlockingApp(ConfigManagerV2): class BlockingApp(ConfigManagerV2):
@@ -26,9 +26,9 @@ class BlockingApp(ConfigManagerV2):
def test_stop_waits_for_active_execute_and_prevents_next_run(tmp_path): def test_stop_waits_for_active_execute_and_prevents_next_run(tmp_path):
async def scenario() -> None: async def scenario() -> None:
cfg = tmp_path / "config.yaml" cfg = tmp_path / "config.yaml"
cfg.write_text("log: {}\n", encoding="utf-8") cfg.write_text("log: {}\nmanagement: { enabled: false }\n", encoding="utf-8")
app = BlockingApp(str(cfg), management_settings=ManagementServerSettings(enabled=False)) app = BlockingApp(str(cfg))
runner = asyncio.create_task(app.start()) runner = asyncio.create_task(app.start())
started = await asyncio.to_thread(app.started_event.wait, 1.0) started = await asyncio.to_thread(app.started_event.wait, 1.0)

View File

@@ -0,0 +1,79 @@
from __future__ import annotations
from dataclasses import dataclass, field
from config_manager.v2.trace import TraceManager
@dataclass
class MemoryTraceTransport:
contexts: list = field(default_factory=list)
messages: list = field(default_factory=list)
def write_context(self, record) -> None:
self.contexts.append(record)
def write_message(self, record) -> None:
self.messages.append(record)
def test_bind_context_writes_context_and_returns_trace_id():
transport = MemoryTraceTransport()
trace = TraceManager(transport=transport)
trace_id = trace.bind_context(alias="email-1", type="email", attrs={"message_id": "abc"})
assert trace_id
assert transport.contexts[0].trace_id == trace_id
assert transport.contexts[0].alias == "email-1"
assert transport.contexts[0].type == "email"
assert transport.contexts[0].attrs == {"message_id": "abc"}
def test_open_context_restores_parent_after_exit():
transport = MemoryTraceTransport()
trace = TraceManager(transport=transport)
parent_id = trace.bind_context(alias="email-1", type="email")
with trace.open_context(alias="order.xlsx", type="attachment") as child_id:
assert trace.current_trace_id() == child_id
assert trace.current_trace_id() == parent_id
def test_messages_use_current_step_and_attrs():
transport = MemoryTraceTransport()
trace = TraceManager(transport=transport)
trace.bind_context(alias="email-1", type="email")
trace.step("parse_email")
trace.info("Письмо распарсено", status="completed", attrs={"attachments_count": 2})
message = transport.messages[0]
assert message.step == "parse_email"
assert message.status == "completed"
assert message.level == "INFO"
assert message.message == "Письмо распарсено"
assert message.attrs == {"attachments_count": 2}
def test_bind_context_keeps_parent_empty_by_default():
transport = MemoryTraceTransport()
trace = TraceManager(transport=transport)
parent_id = trace.bind_context(alias="email-1", type="email")
child_id = trace.bind_context(alias="order.xlsx", type="attachment")
assert child_id != parent_id
assert transport.contexts[-1].parent_id is None
def test_bind_context_uses_explicit_parent_id():
transport = MemoryTraceTransport()
trace = TraceManager(transport=transport)
parent_id = trace.bind_context(alias="email-1", type="email")
child_id = trace.bind_context(alias="order.xlsx", type="attachment", parent_id=parent_id)
assert child_id != parent_id
assert transport.contexts[-1].parent_id == parent_id