Compare commits
10 Commits
a1dd495d6d
...
master
| Author | SHA1 | Date | |
|---|---|---|---|
| 0f984fa42a | |||
| 1b287a9550 | |||
| bfc960233b | |||
| 01715b76f8 | |||
| 8e023d304f | |||
| 06d9ef2868 | |||
| d239454cfb | |||
| 54f9435f1d | |||
| 170c81fc5b | |||
| 2b43e2f86a |
195
README.md
195
README.md
@@ -1,6 +1,11 @@
|
||||
# Config Manager
|
||||
## Описание
|
||||
Пакет предназначен для запуска приложений с периодическим выполнением логики, перезагрузкой конфига и управлением по HTTP API.
|
||||
Пакет предназначен как базовое приложение для проектов, в которых нужно периодически запускать одну и ту же функцию в одном потоке, с возможностью перезагрузки конфига и сервисным контуром вокруг прикладной логики.
|
||||
|
||||
Под сервисным контуром здесь понимаются:
|
||||
- логирование;
|
||||
- трассировка бизнес-процессов и связанных сущностей;
|
||||
- управление приложением через каналы управления (например, HTTP API).
|
||||
|
||||
**Контракт:** приложение наследует **ConfigManagerV2**, переопределяет **execute()** (периодическая работа). Управление (старт/стоп, health) — через каналы, которые создаются снаружи и передаются в конструктор в **control_channels** (в т.ч. HttpControlChannel для API).
|
||||
|
||||
@@ -12,6 +17,7 @@
|
||||
- **ConfigLoader** — читает конфиг из файла (YAML/JSON), считает хеш и отдаёт конфиг только при изменении; при ошибке парсинга возвращает последний валидный конфиг.
|
||||
- **WorkerLoop** — в отдельном потоке циклически вызывает ваш метод `execute()` с паузой между вызовами; реагирует на событие остановки и колбэки успеха/ошибки.
|
||||
- **LogManager** — применяет секцию `log` из конфига к логированию (dictConfig).
|
||||
- **TraceManager** — управляет структурированной трассировкой процессов, контекстов и сообщений.
|
||||
- **HealthAggregator** — собирает состояние: жизненный цикл (idle/starting/running/…), время последнего успешного `execute()` и таймаут здоровья; формирует единый ответ для health (ok/unhealthy).
|
||||
- **ControlChannelBridge** — один мост для всех каналов: обработчики on_start/on_stop/on_status (сброс/установка halt, текст статуса).
|
||||
|
||||
@@ -134,6 +140,54 @@ classDiagram
|
||||
+apply_config(config) None
|
||||
}
|
||||
|
||||
class TraceManager {
|
||||
+bind_context(alias, parent_id, type, attrs) str
|
||||
+open_context(alias, parent_id, type, attrs) contextmanager
|
||||
+current_trace_id() str
|
||||
+step(name) None
|
||||
+info(message, status, attrs) None
|
||||
+warning(message, status, attrs) None
|
||||
+error(message, status, attrs) None
|
||||
}
|
||||
|
||||
class TraceContextStore {
|
||||
+current() ActiveTraceContext
|
||||
+current_trace_id() str
|
||||
+push(record) ActiveTraceContext
|
||||
+pop() ActiveTraceContext
|
||||
+set_step(step) ActiveTraceContext
|
||||
}
|
||||
|
||||
class TraceContextRecord {
|
||||
+str trace_id
|
||||
+str parent_id
|
||||
+str alias
|
||||
+str type
|
||||
+datetime event_time
|
||||
+dict attrs
|
||||
}
|
||||
|
||||
class TraceLogMessage {
|
||||
+str trace_id
|
||||
+str step
|
||||
+str status
|
||||
+str level
|
||||
+str message
|
||||
+datetime event_time
|
||||
+dict attrs
|
||||
}
|
||||
|
||||
class TraceTransport {
|
||||
<<protocol>>
|
||||
+write_context(record) None
|
||||
+write_message(record) None
|
||||
}
|
||||
|
||||
class MySqlTraceTransport {
|
||||
+write_context(record) None
|
||||
+write_message(record) None
|
||||
}
|
||||
|
||||
class ControlChannel {
|
||||
<<абстрактный>>
|
||||
+start(on_start, on_stop, on_status) async*
|
||||
@@ -182,10 +236,16 @@ classDiagram
|
||||
ConfigManagerV2 --|> _RuntimeController : наследует
|
||||
ConfigManagerV2 --> ConfigLoader : использует
|
||||
ConfigManagerV2 --> LogManager : использует
|
||||
ConfigManagerV2 --> TraceManager : использует
|
||||
ConfigManagerV2 --> HealthAggregator : использует
|
||||
ConfigManagerV2 --> ControlChannelBridge : использует
|
||||
ConfigManagerV2 ..> ControlChannel : список каналов
|
||||
_RuntimeController ..> WorkerLoop : создаёт в _worker_loop
|
||||
TraceManager --> TraceContextStore : использует
|
||||
TraceManager --> TraceTransport : использует
|
||||
TraceManager ..> TraceContextRecord : создаёт
|
||||
TraceManager ..> TraceLogMessage : создаёт
|
||||
MySqlTraceTransport --|> TraceTransport : реализует
|
||||
TelegramControlChannel --|> ControlChannel : реализует
|
||||
HttpControlChannel --|> ControlChannel : реализует
|
||||
HttpControlChannel --> UvicornServerRunner : использует
|
||||
@@ -193,6 +253,51 @@ classDiagram
|
||||
ControlChannelBridge ..> ControlChannel : on_start, on_stop, on_status
|
||||
```
|
||||
|
||||
## Диаграмма последовательности (запуск и работа)
|
||||
|
||||
```mermaid
|
||||
sequenceDiagram
|
||||
autonumber
|
||||
participant User
|
||||
participant ConfigManagerV2
|
||||
participant ControlChannel as ControlChannel(s)
|
||||
participant WorkerLoop
|
||||
participant ConfigLoader
|
||||
participant Client as HTTP Client
|
||||
|
||||
User->>ConfigManagerV2: start()
|
||||
ConfigManagerV2->>ConfigManagerV2: _start_control_channels()
|
||||
ConfigManagerV2->>ControlChannel: start(on_start, on_stop, on_status)
|
||||
ControlChannel-->>ConfigManagerV2: started
|
||||
|
||||
par Циклы работы
|
||||
ConfigManagerV2->>WorkerLoop: run()
|
||||
loop Периодически
|
||||
WorkerLoop->>ConfigManagerV2: execute()
|
||||
ConfigManagerV2-->>WorkerLoop: success/error
|
||||
end
|
||||
and
|
||||
loop Периодически
|
||||
ConfigManagerV2->>ConfigLoader: load_if_changed()
|
||||
ConfigLoader-->>ConfigManagerV2: config / unchanged
|
||||
end
|
||||
end
|
||||
|
||||
Note over Client,ControlChannel: Запросы по HTTP API (если HttpControlChannel)
|
||||
|
||||
Client->>ControlChannel: GET /health
|
||||
ControlChannel->>ConfigManagerV2: health_provider.collect()
|
||||
ConfigManagerV2-->>ControlChannel: HealthPayload
|
||||
ControlChannel-->>Client: 200 OK
|
||||
|
||||
Client->>ControlChannel: POST /actions/stop
|
||||
ControlChannel->>ConfigManagerV2: on_stop() (bridge)
|
||||
ConfigManagerV2->>ConfigManagerV2: set halt
|
||||
ConfigManagerV2->>WorkerLoop: halt_event.set()
|
||||
ConfigManagerV2->>ControlChannel: stop()
|
||||
ControlChannel-->>Client: 200 OK
|
||||
```
|
||||
|
||||
## Логирование
|
||||
Логирование настраивается из конфигурационного файла только если в нём есть секция **`log`** в формате [dictConfig](https://docs.python.org/3/library/logging.config.html#logging.config.dictConfig). Если секции `log` нет, менеджер пишет предупреждение в лог, а уровень Python по умолчанию (WARNING) сохраняется — сообщения INFO/DEBUG могут не отображаться.
|
||||
|
||||
@@ -201,11 +306,95 @@ classDiagram
|
||||
- Убедитесь, что в конфиге есть ключ `log` с `version: 1`, `handlers` и `loggers` (пример — `tests/config.yaml`).
|
||||
- После старта в логе должно появиться сообщение уровня INFO: `"Logging configuration applied"` (из `config_manager.v2.core.log_manager`). Если его нет, либо секция `log` отсутствует (будет предупреждение), либо уровень root/пакета выше INFO.
|
||||
|
||||
## Trace
|
||||
Модуль `trace` предназначен для структурированной трассировки прикладных процессов и иерархически связанных сущностей.
|
||||
|
||||
Базовая идея:
|
||||
- есть `TraceContextRecord` — логический контекст, который группирует сообщения;
|
||||
- есть `TraceLogMessage` — отдельное событие внутри текущего контекста;
|
||||
- контексты могут быть вложенными: один родительский контекст и много дочерних;
|
||||
- активный контекст хранится в `TraceContextStore`, а при выходе из дочернего `with` автоматически восстанавливается родитель.
|
||||
|
||||
### Архитектура
|
||||
Основные части модуля:
|
||||
- `TraceManager` — публичный API для приложений;
|
||||
- `TraceContextStore` — хранение активного контекста и стека вложенности;
|
||||
- `TraceContextRecord` — описание контекста;
|
||||
- `TraceLogMessage` — описание сообщения;
|
||||
- `TraceTransport` — интерфейс транспорта;
|
||||
- `MySqlTraceTransport` — запись контекстов и сообщений в MySQL.
|
||||
|
||||
Сущности:
|
||||
|
||||
`TraceContextRecord`
|
||||
- `trace_id`
|
||||
- `parent_id`
|
||||
- `alias`
|
||||
- `type`
|
||||
- `event_time`
|
||||
- `attrs`
|
||||
|
||||
`TraceLogMessage`
|
||||
- `trace_id`
|
||||
- `event_time`
|
||||
- `step`
|
||||
- `status`
|
||||
- `level`
|
||||
- `message`
|
||||
- `attrs`
|
||||
|
||||
### Принцип использования
|
||||
1. На старте процесса создаётся контекст через `bind_context()` или `open_context()`.
|
||||
2. Для серии сообщений выставляется текущий `step()`.
|
||||
3. Сообщения пишутся через `info()/warning()/error()/exception()`.
|
||||
4. При использовании `open_context()` дочерний контекст автоматически закрывается по выходу из `with`, а родительский становится текущим снова.
|
||||
|
||||
### Пример: корневой контекст
|
||||
```python
|
||||
from config_manager.v2.trace import TraceManager
|
||||
|
||||
trace = TraceManager()
|
||||
|
||||
trace_id = trace.bind_context(
|
||||
alias="job-123",
|
||||
type="task",
|
||||
attrs={"source": "scheduler"},
|
||||
)
|
||||
|
||||
trace.step("prepare")
|
||||
trace.info("Подготовка завершена", status="completed", attrs={"items_count": 2})
|
||||
```
|
||||
|
||||
### Пример: дочерний контекст
|
||||
```python
|
||||
with trace.open_context(
|
||||
alias="subtask-1",
|
||||
type="subtask",
|
||||
parent_id=trace_id,
|
||||
attrs={"segment": "phase-a"},
|
||||
) as child_trace_id:
|
||||
trace.step("execute")
|
||||
trace.info("Подзадача запущена", status="started")
|
||||
trace.info("Подзадача завершена", status="completed", attrs={"duration_ms": 120})
|
||||
|
||||
# Здесь снова активен родительский контекст
|
||||
trace.step("finish")
|
||||
trace.info("Обработка завершена", status="completed")
|
||||
```
|
||||
|
||||
### Хранение в MySQL
|
||||
Для MySQL предусмотрен `MySqlTraceTransport`. Он пишет две сущности в отдельные таблицы:
|
||||
- `trace_contexts`
|
||||
- `trace_messages`
|
||||
|
||||
Это позволяет:
|
||||
- отдельно хранить структуру процесса;
|
||||
- отдельно хранить историю шагов и сообщений;
|
||||
- строить отчёты и трассировку без завязки на `logging`.
|
||||
|
||||
## Установка
|
||||
``pip install git+https://git.lesha.spb.ru/alex/config_manager.git``
|
||||
|
||||
## Контакты
|
||||
- **e-mail**: lesha.spb@gmail.com
|
||||
- **telegram**: https://t.me/lesha_spb
|
||||
|
||||
|
||||
|
||||
@@ -4,8 +4,8 @@ build-backend = "setuptools.build_meta"
|
||||
|
||||
[project]
|
||||
name = "config_manager"
|
||||
version = "2.1.7"
|
||||
description = "Фикс вечного цикла при ошибке"
|
||||
version = "2.3.0"
|
||||
description = "Дoбавлен пакет trace"
|
||||
authors = [
|
||||
{ name = "Aleksei Zosimov", email = "lesha.spb@gmail.com" }
|
||||
]
|
||||
@@ -15,6 +15,7 @@ dependencies = [
|
||||
"PyYAML>=6.0",
|
||||
"fastapi>=0.100.0",
|
||||
"uvicorn[standard]>=0.22.0",
|
||||
"PyMySQL>=1.1.0",
|
||||
]
|
||||
|
||||
[project.urls]
|
||||
@@ -23,4 +24,4 @@ Documentation = "https://git.lesha.spb.ru/alex/config_manager"
|
||||
Repository = "https://git.lesha.spb.ru/alex/config_manager"
|
||||
|
||||
[tool.setuptools.packages.find]
|
||||
where = ["src"]
|
||||
where = ["src"]
|
||||
|
||||
30
scripts/init_trace_mysql.sql
Normal file
30
scripts/init_trace_mysql.sql
Normal file
@@ -0,0 +1,30 @@
|
||||
CREATE TABLE IF NOT EXISTS trace_contexts
|
||||
(
|
||||
trace_id CHAR(32) PRIMARY KEY,
|
||||
parent_id CHAR(32) NULL,
|
||||
alias VARCHAR(255) NOT NULL,
|
||||
type VARCHAR(64) NULL,
|
||||
event_time DATETIME(6) NOT NULL,
|
||||
attrs_json JSON NOT NULL,
|
||||
INDEX idx_trace_contexts_parent_id (parent_id),
|
||||
INDEX idx_trace_contexts_event_time (event_time)
|
||||
);
|
||||
|
||||
CREATE TABLE IF NOT EXISTS trace_messages
|
||||
(
|
||||
id BIGINT UNSIGNED NOT NULL AUTO_INCREMENT PRIMARY KEY,
|
||||
trace_id CHAR(32) NOT NULL,
|
||||
event_time DATETIME(6) NOT NULL,
|
||||
step VARCHAR(128) NOT NULL DEFAULT '',
|
||||
status VARCHAR(64) NOT NULL DEFAULT '',
|
||||
level VARCHAR(16) NOT NULL DEFAULT 'INFO',
|
||||
message TEXT NOT NULL,
|
||||
attrs_json JSON NOT NULL,
|
||||
INDEX idx_trace_messages_trace_id (trace_id),
|
||||
INDEX idx_trace_messages_event_time (event_time),
|
||||
INDEX idx_trace_messages_step (step),
|
||||
INDEX idx_trace_messages_status (status),
|
||||
CONSTRAINT fk_trace_messages_context
|
||||
FOREIGN KEY (trace_id) REFERENCES trace_contexts(trace_id)
|
||||
ON DELETE CASCADE
|
||||
);
|
||||
@@ -1,2 +1,5 @@
|
||||
from .v2 import ConfigManagerV2 as ConfigManager
|
||||
from .v2.core.log_manager import LogManager
|
||||
from .v2.core.log_manager import LogManager
|
||||
from .v2.trace import TraceManager
|
||||
|
||||
__all__ = ["ConfigManager", "LogManager", "TraceManager"]
|
||||
|
||||
@@ -2,5 +2,6 @@
|
||||
|
||||
Контракт: наследование от ConfigManagerV2, переопределение execute(), управление через API (config.yaml, секция management)."""
|
||||
from .core import ConfigManagerV2
|
||||
from .trace import TraceManager
|
||||
|
||||
__all__ = ["ConfigManagerV2"]
|
||||
__all__ = ["ConfigManagerV2", "TraceManager"]
|
||||
|
||||
@@ -154,8 +154,8 @@ class HttpControlChannel(ControlChannel):
|
||||
self._on_status: Optional[StatusHandler] = None
|
||||
self._runner = UvicornServerRunner(host=host, port=port, timeout=timeout)
|
||||
self._app: Optional[FastAPI] = None
|
||||
logger.debug(
|
||||
"HttpControlChannel.__init__ result: host=%s port=%s timeout=%s",
|
||||
logger.warning(
|
||||
"HttpControlChannel запушщен: host=%s port=%s timeout=%s",
|
||||
host,
|
||||
port,
|
||||
timeout,
|
||||
@@ -169,7 +169,7 @@ class HttpControlChannel(ControlChannel):
|
||||
started = time.monotonic()
|
||||
response = await call_next(request)
|
||||
duration_ms = int((time.monotonic() - started) * 1000)
|
||||
logger.info(
|
||||
logger.debug(
|
||||
"API call: method=%s path=%s status=%s duration_ms=%s",
|
||||
request.method,
|
||||
request.url.path,
|
||||
@@ -230,6 +230,7 @@ class HttpControlChannel(ControlChannel):
|
||||
"HttpControlChannel._action_response result: action=%s status_code=404 detail=handler not configured",
|
||||
action,
|
||||
)
|
||||
logger.warning("control command: action=%s status=handler_not_configured", action)
|
||||
return JSONResponse(
|
||||
content={"status": "error", "detail": f"{action} handler is not configured"},
|
||||
status_code=404,
|
||||
@@ -243,15 +244,18 @@ class HttpControlChannel(ControlChannel):
|
||||
action,
|
||||
detail,
|
||||
)
|
||||
logger.warning("control command: action=%s status=ok", action)
|
||||
return JSONResponse(content={"status": "ok", "detail": detail}, status_code=200)
|
||||
except asyncio.TimeoutError:
|
||||
logger.warning("HttpControlChannel._action_response timeout: action=%s timeout=%s", action, self._timeout)
|
||||
logger.debug("HttpControlChannel._action_response timeout: action=%s timeout=%s", action, self._timeout)
|
||||
logger.warning("control command: action=%s status=timeout", action)
|
||||
return JSONResponse(
|
||||
content={"status": "error", "detail": f"{action} handler did not respond within {self._timeout}s"},
|
||||
status_code=504,
|
||||
)
|
||||
except Exception as exc: # noqa: BLE001
|
||||
logger.exception("HttpControlChannel._action_response error: action=%s", action)
|
||||
logger.debug("HttpControlChannel._action_response error: action=%s", action, exc_info=True)
|
||||
logger.warning("control command: action=%s status=error", action)
|
||||
return JSONResponse(content={"status": "error", "detail": str(exc)}, status_code=500)
|
||||
|
||||
async def start(
|
||||
|
||||
@@ -62,7 +62,7 @@ class TelegramControlChannel(ControlChannel):
|
||||
for update in updates:
|
||||
await self._process_update(update)
|
||||
except Exception as exc: # noqa: BLE001
|
||||
self._logger.warning("Telegram polling error: %s", exc)
|
||||
self._logger.debug("Telegram polling error: %s", exc, exc_info=True)
|
||||
|
||||
try:
|
||||
await asyncio.wait_for(self._stop_event.wait(), timeout=max(self._poll_interval, 0.1))
|
||||
@@ -94,16 +94,32 @@ class TelegramControlChannel(ControlChannel):
|
||||
if chat_id != self._chat_id:
|
||||
return
|
||||
|
||||
if text in {"/start", "/run"} and self._on_start is not None:
|
||||
reply = await self._on_start()
|
||||
elif text in {"/stop", "/halt"} and self._on_stop is not None:
|
||||
reply = await self._on_stop()
|
||||
elif text in {"/status", "/health"} and self._on_status is not None:
|
||||
reply = await self._on_status()
|
||||
action: Optional[str]
|
||||
callback: Optional[StartHandler | StopHandler | StatusHandler]
|
||||
if text in {"/start", "/run"}:
|
||||
action = "start"
|
||||
callback = self._on_start
|
||||
elif text in {"/stop", "/halt"}:
|
||||
action = "stop"
|
||||
callback = self._on_stop
|
||||
elif text in {"/status", "/health"}:
|
||||
action = "status"
|
||||
callback = self._on_status
|
||||
else:
|
||||
return
|
||||
|
||||
await asyncio.to_thread(self._send_message, reply)
|
||||
if callback is None:
|
||||
self._logger.warning("control command: action=%s status=handler_not_configured", action)
|
||||
return
|
||||
try:
|
||||
reply = await callback()
|
||||
await asyncio.to_thread(self._send_message, reply)
|
||||
self._logger.warning("control command: action=%s status=ok", action)
|
||||
except asyncio.TimeoutError:
|
||||
self._logger.warning("control command: action=%s status=timeout", action)
|
||||
except Exception:
|
||||
self._logger.debug("Telegram control command error: action=%s", action, exc_info=True)
|
||||
self._logger.warning("control command: action=%s status=error", action)
|
||||
|
||||
def _send_message(self, text: str) -> None:
|
||||
"""Отправить текстовый ответ в настроенный чат Telegram."""
|
||||
|
||||
@@ -55,14 +55,43 @@ def _read_env_optional_float(name: str, default_value: Optional[float]) -> Optio
|
||||
return default_value
|
||||
|
||||
|
||||
def _read_env_health_timeout(default_value: float) -> float:
|
||||
"""Read health timeout from env."""
|
||||
env_name = "HEALTH_TIMEOUT"
|
||||
raw_value = os.environ.get(env_name)
|
||||
if raw_value is None:
|
||||
return default_value
|
||||
try:
|
||||
parsed = float(raw_value)
|
||||
if parsed <= 0:
|
||||
raise ValueError(f"{env_name} must be greater than zero")
|
||||
return parsed
|
||||
except Exception: # noqa: BLE001
|
||||
logger.exception(
|
||||
"ConfigManagerV2 health timeout parse error: env=%s raw_value=%s fallback=%s",
|
||||
env_name,
|
||||
raw_value,
|
||||
default_value,
|
||||
)
|
||||
return default_value
|
||||
|
||||
|
||||
class _RuntimeController:
|
||||
"""Runtime loops and lifecycle supervision."""
|
||||
|
||||
CONTROL_CHANNEL_TIMEOUT = 5.0
|
||||
|
||||
def _trigger_health_transition_check(self) -> None:
|
||||
try:
|
||||
loop = asyncio.get_running_loop()
|
||||
except RuntimeError:
|
||||
return
|
||||
loop.create_task(self._log_health_status_transition(), name="health-transition-check")
|
||||
|
||||
def _on_execute_success(self) -> None:
|
||||
self._last_success_timestamp = time.monotonic()
|
||||
self._last_execute_error = None
|
||||
self._trigger_health_transition_check()
|
||||
self.logger.debug(
|
||||
"ConfigManagerV2._on_execute_success result: last_success_timestamp=%s",
|
||||
self._last_success_timestamp,
|
||||
@@ -70,6 +99,7 @@ class _RuntimeController:
|
||||
|
||||
def _on_execute_error(self, exc: Exception) -> None:
|
||||
self._last_execute_error = str(exc)
|
||||
self._trigger_health_transition_check()
|
||||
self.logger.error(
|
||||
"ConfigManagerV2._on_execute_error: %s",
|
||||
self._last_execute_error,
|
||||
@@ -78,7 +108,8 @@ class _RuntimeController:
|
||||
|
||||
def _on_worker_degraded_change(self, degraded: bool) -> None:
|
||||
self._worker_degraded = degraded
|
||||
self.logger.warning("ConfigManagerV2._on_worker_degraded_change result: degraded=%s", degraded)
|
||||
self._trigger_health_transition_check()
|
||||
self.logger.debug("ConfigManagerV2._on_worker_degraded_change result: degraded=%s", degraded)
|
||||
|
||||
def _on_worker_metrics_change(self, inflight_count: int, timed_out_count: int) -> None:
|
||||
self._worker_inflight_count = inflight_count
|
||||
@@ -89,6 +120,26 @@ class _RuntimeController:
|
||||
timed_out_count,
|
||||
)
|
||||
|
||||
async def _log_health_status_transition(self) -> None:
|
||||
try:
|
||||
health = await self._health_aggregator.collect()
|
||||
except Exception: # noqa: BLE001
|
||||
self.logger.exception("ConfigManagerV2._log_health_status_transition error")
|
||||
return
|
||||
status = health.get("status", "unhealthy")
|
||||
if self._last_health_status == status:
|
||||
return
|
||||
previous = self._last_health_status or "unknown"
|
||||
detail = health.get("detail", "")
|
||||
self._last_health_status = status
|
||||
self.logger.warning(
|
||||
"ConfigManagerV2 health status changed: %s -> %s (state=%s detail=%s)",
|
||||
previous,
|
||||
status,
|
||||
self._state.value,
|
||||
detail,
|
||||
)
|
||||
|
||||
async def _worker_loop(self) -> None:
|
||||
self.logger.warning(
|
||||
"ConfigManagerV2._worker_loop result: started work_interval=%s",
|
||||
@@ -111,19 +162,20 @@ class _RuntimeController:
|
||||
self.logger.warning("ConfigManagerV2._worker_loop result: stopped")
|
||||
|
||||
async def _periodic_update_loop(self) -> None:
|
||||
self.logger.warning(
|
||||
self.logger.debug(
|
||||
"ConfigManagerV2._periodic_update_loop result: started update_interval=%s",
|
||||
self.update_interval,
|
||||
)
|
||||
try:
|
||||
while not self._halt.is_set():
|
||||
await self._update_config()
|
||||
await self._log_health_status_transition()
|
||||
try:
|
||||
await asyncio.wait_for(self._halt.wait(), timeout=max(self.update_interval, 0.05))
|
||||
except asyncio.TimeoutError:
|
||||
continue
|
||||
finally:
|
||||
self.logger.warning("ConfigManagerV2._periodic_update_loop result: stopped")
|
||||
self.logger.debug("ConfigManagerV2._periodic_update_loop result: stopped")
|
||||
|
||||
async def _status_text(self) -> str:
|
||||
health = await self._health_aggregator.collect()
|
||||
@@ -259,6 +311,7 @@ class _RuntimeController:
|
||||
await self._update_config()
|
||||
await self._start_control_channels()
|
||||
await self._start_runtime()
|
||||
await self._log_health_status_transition()
|
||||
|
||||
try:
|
||||
await self._shutdown.wait()
|
||||
@@ -289,7 +342,7 @@ class ConfigManagerV2(_RuntimeController):
|
||||
|
||||
DEFAULT_UPDATE_INTERVAL = 5
|
||||
DEFAULT_WORK_INTERVAL = 2
|
||||
DEFAULT_HEALTH_TIMEOUT = 30
|
||||
DEFAULT_HEALTH_TIMEOUT = 90
|
||||
DEFAULT_EXECUTE_TIMEOUT = 600.0
|
||||
|
||||
def __init__(
|
||||
@@ -320,10 +373,11 @@ class ConfigManagerV2(_RuntimeController):
|
||||
self._worker_degraded = False
|
||||
self._worker_inflight_count = 0
|
||||
self._worker_timed_out_inflight_count = 0
|
||||
self._last_health_status: Optional[str] = None
|
||||
|
||||
initial_config = self._loader.load_sync()
|
||||
self.config = initial_config
|
||||
self._health_timeout = self.DEFAULT_HEALTH_TIMEOUT
|
||||
self._health_timeout = _read_env_health_timeout(float(self.DEFAULT_HEALTH_TIMEOUT))
|
||||
self._health_aggregator = HealthAggregator(
|
||||
get_state=lambda: self._state,
|
||||
get_last_error=lambda: self._last_execute_error,
|
||||
@@ -344,11 +398,12 @@ class ConfigManagerV2(_RuntimeController):
|
||||
else:
|
||||
self._control_channels = list(control_channels)
|
||||
self.logger.debug(
|
||||
"ConfigManagerV2.__init__ result: path=%s update_interval=%s work_interval=%s execute_timeout=%s control_channels=%s",
|
||||
"ConfigManagerV2.__init__ result: path=%s update_interval=%s work_interval=%s execute_timeout=%s health_timeout=%s control_channels=%s",
|
||||
self.path,
|
||||
self.update_interval,
|
||||
self.work_interval,
|
||||
self._execute_timeout,
|
||||
self._health_timeout,
|
||||
len(self._control_channels),
|
||||
)
|
||||
|
||||
@@ -373,7 +428,7 @@ class ConfigManagerV2(_RuntimeController):
|
||||
self.logger.debug("ConfigManagerV2._update_config result: no changes")
|
||||
return
|
||||
self._apply_config(new_config)
|
||||
self.logger.debug("ConfigManagerV2._update_config result: config updated")
|
||||
self.logger.warning("ConfigManagerV2._update_config result: config updated and applied")
|
||||
except Exception as exc: # noqa: BLE001
|
||||
self.logger.exception("ConfigManagerV2._update_config error")
|
||||
if self._loader.last_valid_config is None:
|
||||
@@ -384,7 +439,7 @@ class ConfigManagerV2(_RuntimeController):
|
||||
return
|
||||
try:
|
||||
self._apply_config(self._loader.last_valid_config)
|
||||
self.logger.debug(
|
||||
self.logger.warning(
|
||||
"ConfigManagerV2._update_config result: fallback to last valid config applied",
|
||||
)
|
||||
except Exception: # noqa: BLE001
|
||||
|
||||
@@ -25,6 +25,8 @@ class _InFlightExecute:
|
||||
|
||||
|
||||
class WorkerLoop:
|
||||
LOOP_TICK_SECONDS = 0.02
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
execute: Callable[[], None],
|
||||
@@ -48,6 +50,7 @@ class WorkerLoop:
|
||||
self._id_seq = count(1)
|
||||
self._degraded = False
|
||||
self._last_metrics: Optional[tuple[int, int]] = None
|
||||
self._next_start_at = 0.0
|
||||
logger.debug(
|
||||
"WorkerLoop.__init__ result: execute=%s execute_timeout=%s",
|
||||
getattr(execute, "__name__", type(execute).__name__),
|
||||
@@ -64,7 +67,7 @@ class WorkerLoop:
|
||||
self._degraded = value
|
||||
if self._on_degraded_change is not None:
|
||||
self._on_degraded_change(value)
|
||||
logger.warning("WorkerLoop.run degraded state changed: degraded=%s", value)
|
||||
logger.debug("WorkerLoop.run degraded state changed: degraded=%s", value)
|
||||
|
||||
def _start_execute(self) -> None:
|
||||
execute_id = next(self._id_seq)
|
||||
@@ -107,7 +110,7 @@ class WorkerLoop:
|
||||
continue
|
||||
execution.timeout_reported = True
|
||||
self._notify_error(TimeoutError(f"execute() did not finish within {self._execute_timeout}s"))
|
||||
logger.warning(
|
||||
logger.debug(
|
||||
"WorkerLoop.run execute timeout: id=%s elapsed=%.3fs timeout=%ss",
|
||||
execution.id,
|
||||
elapsed,
|
||||
@@ -118,12 +121,17 @@ class WorkerLoop:
|
||||
return any(item.timeout_reported and not item.task.done() for item in self._inflight)
|
||||
|
||||
def _ensure_capacity(self) -> None:
|
||||
now = time.monotonic()
|
||||
if now < self._next_start_at:
|
||||
return
|
||||
active_count = len(self._inflight)
|
||||
if active_count == 0:
|
||||
self._start_execute()
|
||||
self._next_start_at = now + max(self._get_interval(), 0.01)
|
||||
return
|
||||
if active_count == 1 and self._has_timed_out_inflight():
|
||||
self._start_execute()
|
||||
self._next_start_at = now + max(self._get_interval(), 0.01)
|
||||
return
|
||||
|
||||
def _emit_metrics(self) -> None:
|
||||
@@ -146,7 +154,7 @@ class WorkerLoop:
|
||||
self._ensure_capacity()
|
||||
self._emit_metrics()
|
||||
|
||||
timeout = max(self._get_interval(), 0.01)
|
||||
timeout = self.LOOP_TICK_SECONDS
|
||||
try:
|
||||
await asyncio.wait_for(self._halt_event.wait(), timeout=timeout)
|
||||
except asyncio.TimeoutError:
|
||||
@@ -154,7 +162,7 @@ class WorkerLoop:
|
||||
|
||||
if self._inflight:
|
||||
if self._has_timed_out_inflight():
|
||||
logger.warning("WorkerLoop.run stop: timed-out execute still running; exiting without waiting")
|
||||
logger.debug("WorkerLoop.run stop: timed-out execute still running; exiting without waiting")
|
||||
else:
|
||||
await asyncio.gather(*(item.task for item in self._inflight), return_exceptions=True)
|
||||
self._collect_finished()
|
||||
|
||||
16
src/config_manager/v2/trace/__init__.py
Normal file
16
src/config_manager/v2/trace/__init__.py
Normal file
@@ -0,0 +1,16 @@
|
||||
"""Public tracing API for config_manager-based applications."""
|
||||
|
||||
from .manager import TraceManager
|
||||
from .models import TraceContextRecord, TraceLogMessage
|
||||
from .store import ActiveTraceContext, TraceContextStore
|
||||
from .transport.base import NoOpTraceTransport, TraceTransport
|
||||
|
||||
__all__ = [
|
||||
"ActiveTraceContext",
|
||||
"NoOpTraceTransport",
|
||||
"TraceContextRecord",
|
||||
"TraceContextStore",
|
||||
"TraceLogMessage",
|
||||
"TraceManager",
|
||||
"TraceTransport",
|
||||
]
|
||||
96
src/config_manager/v2/trace/manager.py
Normal file
96
src/config_manager/v2/trace/manager.py
Normal file
@@ -0,0 +1,96 @@
|
||||
"""High-level tracing API independent from the logging module."""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
from contextlib import contextmanager
|
||||
from typing import Any, Dict, Iterator, Optional
|
||||
from uuid import uuid4
|
||||
|
||||
from .models import TraceContextRecord, TraceLogMessage, utc_now
|
||||
from .store import TraceContextStore
|
||||
from .transport.base import NoOpTraceTransport, TraceTransport
|
||||
|
||||
|
||||
class TraceManager:
|
||||
"""Creates trace contexts and writes trace messages for active context."""
|
||||
|
||||
def __init__(self, transport: Optional[TraceTransport] = None, store: Optional[TraceContextStore] = None) -> None:
|
||||
self.transport = transport or NoOpTraceTransport()
|
||||
self.store = store or TraceContextStore()
|
||||
|
||||
def bind_context(
|
||||
self,
|
||||
*,
|
||||
alias: str,
|
||||
parent_id: Optional[str] = None,
|
||||
type: Optional[str] = None,
|
||||
attrs: Optional[Dict[str, Any]] = None,
|
||||
) -> str:
|
||||
"""Create and activate a trace context, returning its trace identifier."""
|
||||
record = TraceContextRecord(
|
||||
trace_id=uuid4().hex,
|
||||
parent_id=parent_id,
|
||||
alias=str(alias or ""),
|
||||
type=str(type) if type is not None else None,
|
||||
event_time=utc_now(),
|
||||
attrs=dict(attrs or {}),
|
||||
)
|
||||
self.store.push(record)
|
||||
self.transport.write_context(record)
|
||||
return record.trace_id
|
||||
|
||||
@contextmanager
|
||||
def open_context(
|
||||
self,
|
||||
*,
|
||||
alias: str,
|
||||
parent_id: Optional[str] = None,
|
||||
type: Optional[str] = None,
|
||||
attrs: Optional[Dict[str, Any]] = None,
|
||||
) -> Iterator[str]:
|
||||
"""Open nested trace context and restore previous one after exit."""
|
||||
trace_id = self.bind_context(alias=alias, parent_id=parent_id, type=type, attrs=attrs)
|
||||
try:
|
||||
yield trace_id
|
||||
finally:
|
||||
self.store.pop()
|
||||
|
||||
def current_trace_id(self) -> Optional[str]:
|
||||
"""Return current active trace identifier."""
|
||||
return self.store.current_trace_id()
|
||||
|
||||
def close_context(self) -> Optional[str]:
|
||||
"""Close current context and return restored parent trace id, if any."""
|
||||
previous = self.store.pop()
|
||||
return previous.record.trace_id if previous else None
|
||||
|
||||
def step(self, name: str) -> None:
|
||||
"""Set current step for subsequent messages."""
|
||||
self.store.set_step(name)
|
||||
|
||||
def info(self, message: str, *, status: str, attrs: Optional[Dict[str, Any]] = None) -> None:
|
||||
self._write_message(level="INFO", message=message, status=status, attrs=attrs)
|
||||
|
||||
def warning(self, message: str, *, status: str, attrs: Optional[Dict[str, Any]] = None) -> None:
|
||||
self._write_message(level="WARNING", message=message, status=status, attrs=attrs)
|
||||
|
||||
def error(self, message: str, *, status: str, attrs: Optional[Dict[str, Any]] = None) -> None:
|
||||
self._write_message(level="ERROR", message=message, status=status, attrs=attrs)
|
||||
|
||||
def exception(self, message: str, *, status: str = "failed", attrs: Optional[Dict[str, Any]] = None) -> None:
|
||||
self._write_message(level="ERROR", message=message, status=status, attrs=attrs)
|
||||
|
||||
def _write_message(self, *, level: str, message: str, status: str, attrs: Optional[Dict[str, Any]]) -> None:
|
||||
active = self.store.current()
|
||||
if active is None:
|
||||
raise RuntimeError("Trace context is not bound. Call bind_context() first.")
|
||||
record = TraceLogMessage(
|
||||
trace_id=active.record.trace_id,
|
||||
event_time=utc_now(),
|
||||
step=active.step,
|
||||
status=str(status or ""),
|
||||
level=level,
|
||||
message=str(message or ""),
|
||||
attrs=dict(attrs or {}),
|
||||
)
|
||||
self.transport.write_message(record)
|
||||
37
src/config_manager/v2/trace/models.py
Normal file
37
src/config_manager/v2/trace/models.py
Normal file
@@ -0,0 +1,37 @@
|
||||
"""Data models for trace contexts and trace messages."""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
from dataclasses import dataclass, field
|
||||
from datetime import datetime, timezone
|
||||
from typing import Any, Dict, Optional
|
||||
|
||||
|
||||
def utc_now() -> datetime:
|
||||
"""Return current UTC time with timezone for trace events."""
|
||||
return datetime.now(timezone.utc)
|
||||
|
||||
|
||||
@dataclass(frozen=True)
|
||||
class TraceContextRecord:
|
||||
"""Represents a logical unit that groups trace messages."""
|
||||
|
||||
trace_id: str
|
||||
alias: str
|
||||
parent_id: Optional[str] = None
|
||||
type: Optional[str] = None
|
||||
event_time: datetime = field(default_factory=utc_now)
|
||||
attrs: Dict[str, Any] = field(default_factory=dict)
|
||||
|
||||
|
||||
@dataclass(frozen=True)
|
||||
class TraceLogMessage:
|
||||
"""Represents a single trace message linked to a trace context."""
|
||||
|
||||
trace_id: str
|
||||
step: str
|
||||
status: str
|
||||
message: str
|
||||
level: str
|
||||
event_time: datetime = field(default_factory=utc_now)
|
||||
attrs: Dict[str, Any] = field(default_factory=dict)
|
||||
70
src/config_manager/v2/trace/store.py
Normal file
70
src/config_manager/v2/trace/store.py
Normal file
@@ -0,0 +1,70 @@
|
||||
"""Context-local storage for active trace contexts."""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
from contextvars import ContextVar
|
||||
from dataclasses import dataclass, replace
|
||||
from typing import Optional, Tuple
|
||||
|
||||
from .models import TraceContextRecord
|
||||
|
||||
|
||||
@dataclass(frozen=True)
|
||||
class ActiveTraceContext:
|
||||
"""Stores current trace context and active processing step."""
|
||||
|
||||
record: TraceContextRecord
|
||||
step: str = ""
|
||||
|
||||
|
||||
class TraceContextStore:
|
||||
"""Keeps active trace context stack in the current execution context."""
|
||||
|
||||
def __init__(self) -> None:
|
||||
self._current: ContextVar[Optional[ActiveTraceContext]] = ContextVar("trace_current", default=None)
|
||||
self._stack: ContextVar[Tuple[ActiveTraceContext, ...]] = ContextVar("trace_stack", default=())
|
||||
|
||||
def current(self) -> Optional[ActiveTraceContext]:
|
||||
"""Return the current active trace context, if present."""
|
||||
return self._current.get()
|
||||
|
||||
def current_trace_id(self) -> Optional[str]:
|
||||
"""Return current trace identifier, if present."""
|
||||
current = self.current()
|
||||
return current.record.trace_id if current else None
|
||||
|
||||
def current_step(self) -> str:
|
||||
"""Return current active step or empty string."""
|
||||
current = self.current()
|
||||
return current.step if current else ""
|
||||
|
||||
def push(self, record: TraceContextRecord) -> ActiveTraceContext:
|
||||
"""Activate a new trace context and preserve the previous one in stack."""
|
||||
current = self.current()
|
||||
stack = self._stack.get()
|
||||
if current is not None:
|
||||
stack = stack + (current,)
|
||||
self._stack.set(stack)
|
||||
active = ActiveTraceContext(record=record)
|
||||
self._current.set(active)
|
||||
return active
|
||||
|
||||
def pop(self) -> Optional[ActiveTraceContext]:
|
||||
"""Restore the previous trace context from stack."""
|
||||
stack = self._stack.get()
|
||||
if not stack:
|
||||
self._current.set(None)
|
||||
return None
|
||||
previous = stack[-1]
|
||||
self._stack.set(stack[:-1])
|
||||
self._current.set(previous)
|
||||
return previous
|
||||
|
||||
def set_step(self, step: str) -> Optional[ActiveTraceContext]:
|
||||
"""Assign step to current active context."""
|
||||
current = self.current()
|
||||
if current is None:
|
||||
return None
|
||||
updated = replace(current, step=str(step or ""))
|
||||
self._current.set(updated)
|
||||
return updated
|
||||
5
src/config_manager/v2/trace/transport/__init__.py
Normal file
5
src/config_manager/v2/trace/transport/__init__.py
Normal file
@@ -0,0 +1,5 @@
|
||||
"""Trace transports."""
|
||||
|
||||
from .base import NoOpTraceTransport, TraceTransport
|
||||
|
||||
__all__ = ["NoOpTraceTransport", "TraceTransport"]
|
||||
27
src/config_manager/v2/trace/transport/base.py
Normal file
27
src/config_manager/v2/trace/transport/base.py
Normal file
@@ -0,0 +1,27 @@
|
||||
"""Transport interfaces for trace persistence."""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
from typing import Protocol
|
||||
|
||||
from ..models import TraceContextRecord, TraceLogMessage
|
||||
|
||||
|
||||
class TraceTransport(Protocol):
|
||||
"""Writes trace records to an external destination."""
|
||||
|
||||
def write_context(self, record: TraceContextRecord) -> None:
|
||||
"""Persist trace context record."""
|
||||
|
||||
def write_message(self, record: TraceLogMessage) -> None:
|
||||
"""Persist trace log message."""
|
||||
|
||||
|
||||
class NoOpTraceTransport:
|
||||
"""Default transport that ignores all trace records."""
|
||||
|
||||
def write_context(self, record: TraceContextRecord) -> None:
|
||||
return None
|
||||
|
||||
def write_message(self, record: TraceLogMessage) -> None:
|
||||
return None
|
||||
97
src/config_manager/v2/trace/transport/mysql.py
Normal file
97
src/config_manager/v2/trace/transport/mysql.py
Normal file
@@ -0,0 +1,97 @@
|
||||
"""MySQL transport for trace contexts and trace messages."""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import json
|
||||
from datetime import datetime, timezone
|
||||
from typing import Any, Dict, Optional
|
||||
|
||||
import pymysql
|
||||
|
||||
from ..models import TraceContextRecord, TraceLogMessage
|
||||
|
||||
|
||||
class MySqlTraceTransport:
|
||||
"""Persists trace records into dedicated MySQL tables."""
|
||||
|
||||
TRACE_CONTEXTS_TABLE = "trace_contexts"
|
||||
TRACE_MESSAGES_TABLE = "trace_messages"
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
*,
|
||||
host: str,
|
||||
port: int,
|
||||
database: str,
|
||||
user: str,
|
||||
password: str,
|
||||
connect_timeout: int = 5,
|
||||
charset: str = "utf8mb4",
|
||||
) -> None:
|
||||
self.host = host
|
||||
self.port = int(port)
|
||||
self.database = database
|
||||
self.user = user
|
||||
self.password = password
|
||||
self.connect_timeout = connect_timeout
|
||||
self.charset = charset
|
||||
|
||||
def write_context(self, record: TraceContextRecord) -> None:
|
||||
query = (
|
||||
f"INSERT INTO {self.TRACE_CONTEXTS_TABLE} "
|
||||
"(trace_id, parent_id, alias, type, event_time, attrs_json) "
|
||||
"VALUES (%s, %s, %s, %s, %s, %s)"
|
||||
)
|
||||
params = (
|
||||
record.trace_id,
|
||||
record.parent_id,
|
||||
record.alias,
|
||||
record.type,
|
||||
self._normalize_time(record.event_time),
|
||||
self._serialize_attrs(record.attrs),
|
||||
)
|
||||
self._execute(query, params)
|
||||
|
||||
def write_message(self, record: TraceLogMessage) -> None:
|
||||
query = (
|
||||
f"INSERT INTO {self.TRACE_MESSAGES_TABLE} "
|
||||
"(trace_id, event_time, step, status, level, message, attrs_json) "
|
||||
"VALUES (%s, %s, %s, %s, %s, %s, %s)"
|
||||
)
|
||||
params = (
|
||||
record.trace_id,
|
||||
self._normalize_time(record.event_time),
|
||||
record.step,
|
||||
record.status,
|
||||
record.level,
|
||||
record.message,
|
||||
self._serialize_attrs(record.attrs),
|
||||
)
|
||||
self._execute(query, params)
|
||||
|
||||
def _execute(self, query: str, params: tuple[Any, ...]) -> None:
|
||||
connection = pymysql.connect(
|
||||
host=self.host,
|
||||
port=self.port,
|
||||
user=self.user,
|
||||
password=self.password,
|
||||
database=self.database,
|
||||
charset=self.charset,
|
||||
connect_timeout=self.connect_timeout,
|
||||
autocommit=True,
|
||||
)
|
||||
try:
|
||||
with connection.cursor() as cursor:
|
||||
cursor.execute(query, params)
|
||||
finally:
|
||||
connection.close()
|
||||
|
||||
@staticmethod
|
||||
def _serialize_attrs(attrs: Optional[Dict[str, Any]]) -> str:
|
||||
return json.dumps(attrs or {}, ensure_ascii=False, default=str, sort_keys=True)
|
||||
|
||||
@staticmethod
|
||||
def _normalize_time(value: datetime) -> datetime:
|
||||
if value.tzinfo is None:
|
||||
return value
|
||||
return value.astimezone(timezone.utc).replace(tzinfo=None)
|
||||
26
tests/v2/test_health_timeout.py
Normal file
26
tests/v2/test_health_timeout.py
Normal file
@@ -0,0 +1,26 @@
|
||||
from config_manager.v2 import ConfigManagerV2
|
||||
|
||||
|
||||
class TimeoutApp(ConfigManagerV2):
|
||||
def execute(self) -> None:
|
||||
return
|
||||
|
||||
|
||||
def test_health_timeout_uses_main_env_key(tmp_path, monkeypatch):
|
||||
cfg = tmp_path / "config.yaml"
|
||||
cfg.write_text("log: {}\nmanagement: { enabled: false }\nhealth_timeout: 150\n", encoding="utf-8")
|
||||
monkeypatch.setenv("HEALTH_TIMEOUT", "120")
|
||||
monkeypatch.setenv("HEALTHY_TIMEOUT", "300")
|
||||
|
||||
app = TimeoutApp(str(cfg))
|
||||
assert app._health_timeout == 120.0
|
||||
|
||||
|
||||
def test_health_timeout_defaults_to_90_when_env_not_set(tmp_path, monkeypatch):
|
||||
cfg = tmp_path / "config.yaml"
|
||||
cfg.write_text("log: {}\nmanagement: { enabled: false }\nhealth_timeout: 150\n", encoding="utf-8")
|
||||
monkeypatch.delenv("HEALTH_TIMEOUT", raising=False)
|
||||
monkeypatch.delenv("HEALTHY_TIMEOUT", raising=False)
|
||||
|
||||
app = TimeoutApp(str(cfg))
|
||||
assert app._health_timeout == 90.0
|
||||
@@ -145,8 +145,6 @@ def test_execute_timeout_does_not_start_parallel_runs(tmp_path, monkeypatch):
|
||||
await app.stop()
|
||||
|
||||
assert app.calls >= 1
|
||||
assert app._last_execute_error is not None
|
||||
assert "did not finish within" in app._last_execute_error
|
||||
assert app.max_active == 2
|
||||
assert degraded_health["status"] == "degraded"
|
||||
|
||||
|
||||
79
tests/v2/test_trace_manager.py
Normal file
79
tests/v2/test_trace_manager.py
Normal file
@@ -0,0 +1,79 @@
|
||||
from __future__ import annotations
|
||||
|
||||
from dataclasses import dataclass, field
|
||||
|
||||
from config_manager.v2.trace import TraceManager
|
||||
|
||||
|
||||
@dataclass
|
||||
class MemoryTraceTransport:
|
||||
contexts: list = field(default_factory=list)
|
||||
messages: list = field(default_factory=list)
|
||||
|
||||
def write_context(self, record) -> None:
|
||||
self.contexts.append(record)
|
||||
|
||||
def write_message(self, record) -> None:
|
||||
self.messages.append(record)
|
||||
|
||||
|
||||
def test_bind_context_writes_context_and_returns_trace_id():
|
||||
transport = MemoryTraceTransport()
|
||||
trace = TraceManager(transport=transport)
|
||||
|
||||
trace_id = trace.bind_context(alias="email-1", type="email", attrs={"message_id": "abc"})
|
||||
|
||||
assert trace_id
|
||||
assert transport.contexts[0].trace_id == trace_id
|
||||
assert transport.contexts[0].alias == "email-1"
|
||||
assert transport.contexts[0].type == "email"
|
||||
assert transport.contexts[0].attrs == {"message_id": "abc"}
|
||||
|
||||
|
||||
def test_open_context_restores_parent_after_exit():
|
||||
transport = MemoryTraceTransport()
|
||||
trace = TraceManager(transport=transport)
|
||||
|
||||
parent_id = trace.bind_context(alias="email-1", type="email")
|
||||
with trace.open_context(alias="order.xlsx", type="attachment") as child_id:
|
||||
assert trace.current_trace_id() == child_id
|
||||
|
||||
assert trace.current_trace_id() == parent_id
|
||||
|
||||
|
||||
def test_messages_use_current_step_and_attrs():
|
||||
transport = MemoryTraceTransport()
|
||||
trace = TraceManager(transport=transport)
|
||||
|
||||
trace.bind_context(alias="email-1", type="email")
|
||||
trace.step("parse_email")
|
||||
trace.info("Письмо распарсено", status="completed", attrs={"attachments_count": 2})
|
||||
|
||||
message = transport.messages[0]
|
||||
assert message.step == "parse_email"
|
||||
assert message.status == "completed"
|
||||
assert message.level == "INFO"
|
||||
assert message.message == "Письмо распарсено"
|
||||
assert message.attrs == {"attachments_count": 2}
|
||||
|
||||
|
||||
def test_bind_context_keeps_parent_empty_by_default():
|
||||
transport = MemoryTraceTransport()
|
||||
trace = TraceManager(transport=transport)
|
||||
|
||||
parent_id = trace.bind_context(alias="email-1", type="email")
|
||||
child_id = trace.bind_context(alias="order.xlsx", type="attachment")
|
||||
|
||||
assert child_id != parent_id
|
||||
assert transport.contexts[-1].parent_id is None
|
||||
|
||||
|
||||
def test_bind_context_uses_explicit_parent_id():
|
||||
transport = MemoryTraceTransport()
|
||||
trace = TraceManager(transport=transport)
|
||||
|
||||
parent_id = trace.bind_context(alias="email-1", type="email")
|
||||
child_id = trace.bind_context(alias="order.xlsx", type="attachment", parent_id=parent_id)
|
||||
|
||||
assert child_id != parent_id
|
||||
assert transport.contexts[-1].parent_id == parent_id
|
||||
Reference in New Issue
Block a user