10 Commits

19 changed files with 793 additions and 35 deletions

README.md

@@ -1,6 +1,11 @@
# Config Manager
## Description
The package is intended for running applications with periodic logic execution, config reloading, and control over an HTTP API.
The package is intended as a base application for projects that need to run the same function periodically in a single thread, with config reloading and a service layer around the application logic.
Here, the service layer means:
- logging;
- tracing of business processes and related entities;
- application control through control channels (for example, an HTTP API).
**Contract:** the application inherits from **ConfigManagerV2** and overrides **execute()** (the periodic work). Control (start/stop, health) goes through channels that are created externally and passed to the constructor in **control_channels** (including HttpControlChannel for the API).
@@ -12,6 +17,7 @@
- **ConfigLoader**: reads the config from a file (YAML/JSON), computes its hash, and returns the config only when it has changed; on a parse error it returns the last valid config.
- **WorkerLoop**: calls your `execute()` method cyclically in a separate thread with a pause between calls; honors the stop event and the success/error callbacks.
- **LogManager**: applies the `log` section of the config to logging (dictConfig).
- **TraceManager**: manages structured tracing of processes, contexts, and messages.
- **HealthAggregator**: collects state: lifecycle (idle/starting/running/…), the time of the last successful `execute()`, and the health timeout; builds a single health response (ok/unhealthy).
- **ControlChannelBridge**: a single bridge for all channels: the on_start/on_stop/on_status handlers (clearing/setting halt, status text).
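The ConfigLoader behaviour described in the list (hash the file, return the config only when it changed, fall back to the last valid config on a parse error) can be sketched roughly as follows. This is an illustration under assumptions, not the package's implementation: the class name `HashingLoader` is hypothetical, and JSON is used so that the sketch needs only the standard library.

```python
import hashlib
import json
from pathlib import Path
from typing import Any, Dict, Optional


class HashingLoader:
    """Hypothetical loader: returns a config only when the file content changed."""

    def __init__(self, path: str) -> None:
        self.path = Path(path)
        self._last_hash: Optional[str] = None
        self.last_valid_config: Optional[Dict[str, Any]] = None

    def load_if_changed(self) -> Optional[Dict[str, Any]]:
        raw = self.path.read_bytes()
        digest = hashlib.sha256(raw).hexdigest()
        if digest == self._last_hash:
            return None  # unchanged since the previous call
        self._last_hash = digest
        try:
            parsed = json.loads(raw)
        except json.JSONDecodeError:
            # Parse error: keep serving the last valid config.
            return self.last_valid_config
        self.last_valid_config = parsed
        return parsed
```

Comparing a digest instead of the modification time means that a file rewritten with identical content does not trigger a reload.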
@@ -134,6 +140,54 @@ classDiagram
+apply_config(config) None
}
class TraceManager {
+bind_context(alias, parent_id, type, attrs) str
+open_context(alias, parent_id, type, attrs) contextmanager
+current_trace_id() str
+step(name) None
+info(message, status, attrs) None
+warning(message, status, attrs) None
+error(message, status, attrs) None
}
class TraceContextStore {
+current() ActiveTraceContext
+current_trace_id() str
+push(record) ActiveTraceContext
+pop() ActiveTraceContext
+set_step(step) ActiveTraceContext
}
class TraceContextRecord {
+str trace_id
+str parent_id
+str alias
+str type
+datetime event_time
+dict attrs
}
class TraceLogMessage {
+str trace_id
+str step
+str status
+str level
+str message
+datetime event_time
+dict attrs
}
class TraceTransport {
<<protocol>>
+write_context(record) None
+write_message(record) None
}
class MySqlTraceTransport {
+write_context(record) None
+write_message(record) None
}
class ControlChannel {
<<abstract>>
+start(on_start, on_stop, on_status) async*
@@ -182,10 +236,16 @@ classDiagram
ConfigManagerV2 --|> _RuntimeController : inherits
ConfigManagerV2 --> ConfigLoader : uses
ConfigManagerV2 --> LogManager : uses
ConfigManagerV2 --> TraceManager : uses
ConfigManagerV2 --> HealthAggregator : uses
ConfigManagerV2 --> ControlChannelBridge : uses
ConfigManagerV2 ..> ControlChannel : list of channels
_RuntimeController ..> WorkerLoop : creates in _worker_loop
TraceManager --> TraceContextStore : uses
TraceManager --> TraceTransport : uses
TraceManager ..> TraceContextRecord : creates
TraceManager ..> TraceLogMessage : creates
MySqlTraceTransport --|> TraceTransport : implements
TelegramControlChannel --|> ControlChannel : implements
HttpControlChannel --|> ControlChannel : implements
HttpControlChannel --> UvicornServerRunner : uses
@@ -193,6 +253,51 @@ classDiagram
ControlChannelBridge ..> ControlChannel : on_start, on_stop, on_status
```
## Sequence diagram (startup and operation)
```mermaid
sequenceDiagram
autonumber
participant User
participant ConfigManagerV2
participant ControlChannel as ControlChannel(s)
participant WorkerLoop
participant ConfigLoader
participant Client as HTTP Client
User->>ConfigManagerV2: start()
ConfigManagerV2->>ConfigManagerV2: _start_control_channels()
ConfigManagerV2->>ControlChannel: start(on_start, on_stop, on_status)
ControlChannel-->>ConfigManagerV2: started
par Work loops
ConfigManagerV2->>WorkerLoop: run()
loop Periodically
WorkerLoop->>ConfigManagerV2: execute()
ConfigManagerV2-->>WorkerLoop: success/error
end
and
loop Periodically
ConfigManagerV2->>ConfigLoader: load_if_changed()
ConfigLoader-->>ConfigManagerV2: config / unchanged
end
end
Note over Client,ControlChannel: HTTP API requests (if HttpControlChannel is used)
Client->>ControlChannel: GET /health
ControlChannel->>ConfigManagerV2: health_provider.collect()
ConfigManagerV2-->>ControlChannel: HealthPayload
ControlChannel-->>Client: 200 OK
Client->>ControlChannel: POST /actions/stop
ControlChannel->>ConfigManagerV2: on_stop() (bridge)
ConfigManagerV2->>ConfigManagerV2: set halt
ConfigManagerV2->>WorkerLoop: halt_event.set()
ConfigManagerV2->>ControlChannel: stop()
ControlChannel-->>Client: 200 OK
```
## Logging
Logging is configured from the configuration file only if it contains a **`log`** section in [dictConfig](https://docs.python.org/3/library/logging.config.html#logging.config.dictConfig) format. If there is no `log` section, the manager writes a warning to the log and Python's default level (WARNING) stays in effect, so INFO/DEBUG messages may not be shown.
@@ -201,11 +306,95 @@ classDiagram
- Make sure the config has a `log` key with `version: 1`, `handlers`, and `loggers` (see `tests/config.yaml` for an example).
- After startup, an INFO-level message `"Logging configuration applied"` (from `config_manager.v2.core.log_manager`) should appear in the log. If it does not, either the `log` section is missing (there will be a warning) or the root/package level is above INFO.
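As an illustration of the checklist above, here is a minimal sketch of a `log` section and how `dictConfig` applies it. The dictionary stands in for the parsed YAML; its formatter and handler names are assumptions chosen for this example, not the contents of `tests/config.yaml`.

```python
import logging
import logging.config

# Hypothetical minimal `log` section, as it would look after the YAML is parsed.
log_section = {
    "version": 1,
    "disable_existing_loggers": False,
    "formatters": {
        "plain": {"format": "%(asctime)s %(levelname)s %(name)s: %(message)s"},
    },
    "handlers": {
        "console": {"class": "logging.StreamHandler", "formatter": "plain", "level": "INFO"},
    },
    "loggers": {
        # The package logger is set to INFO so that INFO messages are not filtered out.
        "config_manager": {"handlers": ["console"], "level": "INFO", "propagate": False},
    },
}

logging.config.dictConfig(log_section)
logging.getLogger("config_manager.v2.core.log_manager").info("Logging configuration applied")
```

With this section, child loggers such as `config_manager.v2.core.log_manager` inherit the INFO level from the `config_manager` logger.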
## Trace
The `trace` module provides structured tracing of application processes and hierarchically related entities.
The basic idea:
- `TraceContextRecord` is a logical context that groups messages;
- `TraceLogMessage` is a single event within the current context;
- contexts can be nested: one parent context and many child contexts;
- the active context is kept in `TraceContextStore`, and on exit from a child `with` block the parent is restored automatically.
### Architecture
The main parts of the module:
- `TraceManager`: the public API for applications;
- `TraceContextStore`: storage for the active context and the nesting stack;
- `TraceContextRecord`: the context description;
- `TraceLogMessage`: the message description;
- `TraceTransport`: the transport interface;
- `MySqlTraceTransport`: writes contexts and messages to MySQL.
Entities:
`TraceContextRecord`
- `trace_id`
- `parent_id`
- `alias`
- `type`
- `event_time`
- `attrs`
`TraceLogMessage`
- `trace_id`
- `event_time`
- `step`
- `status`
- `level`
- `message`
- `attrs`
### How to use
1. At the start of a process, create a context with `bind_context()` or `open_context()`.
2. Set the current `step()` for a series of messages.
3. Write messages with `info()/warning()/error()/exception()`.
4. With `open_context()`, the child context is closed automatically on exit from `with`, and the parent context becomes current again.
### Example: root context
```python
from config_manager.v2.trace import TraceManager

trace = TraceManager()
trace_id = trace.bind_context(
    alias="job-123",
    type="task",
    attrs={"source": "scheduler"},
)
trace.step("prepare")
trace.info("Preparation finished", status="completed", attrs={"items_count": 2})
```
### Example: child context
```python
with trace.open_context(
    alias="subtask-1",
    type="subtask",
    parent_id=trace_id,
    attrs={"segment": "phase-a"},
) as child_trace_id:
    trace.step("execute")
    trace.info("Subtask started", status="started")
    trace.info("Subtask finished", status="completed", attrs={"duration_ms": 120})

# The parent context is active again here
trace.step("finish")
trace.info("Processing finished", status="completed")
```
### Storage in MySQL
`MySqlTraceTransport` is provided for MySQL. It writes the two entities to separate tables:
- `trace_contexts`
- `trace_messages`
This makes it possible to:
- store the process structure separately;
- store the history of steps and messages separately;
- build reports and traces without depending on `logging`.
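MySQL is not the only possible destination: any object with `write_context()` and `write_message()` satisfies the `TraceTransport` protocol. Below is a sketch of a JSON-lines transport. The names `JsonLinesTraceTransport` and `ContextRecord` are hypothetical; `ContextRecord` is a local stand-in for `TraceContextRecord` so that the example runs without the package installed.

```python
import io
import json
from dataclasses import asdict, dataclass, field
from datetime import datetime, timezone
from typing import Any, Dict, Optional, TextIO


@dataclass(frozen=True)
class ContextRecord:
    """Stand-in for TraceContextRecord (same fields) to keep the sketch self-contained."""

    trace_id: str
    alias: str
    parent_id: Optional[str] = None
    type: Optional[str] = None
    event_time: datetime = field(default_factory=lambda: datetime.now(timezone.utc))
    attrs: Dict[str, Any] = field(default_factory=dict)


class JsonLinesTraceTransport:
    """Hypothetical transport: writes one JSON object per line to a text stream."""

    def __init__(self, stream: TextIO) -> None:
        self._stream = stream

    def write_context(self, record) -> None:
        self._write("context", record)

    def write_message(self, record) -> None:
        self._write("message", record)

    def _write(self, kind: str, record) -> None:
        payload = {"kind": kind, **asdict(record)}
        # default=str converts datetime and other non-JSON values to strings.
        self._stream.write(json.dumps(payload, ensure_ascii=False, default=str) + "\n")


buffer = io.StringIO()
transport = JsonLinesTraceTransport(buffer)
transport.write_context(ContextRecord(trace_id="abc", alias="job-123", type="task"))
```

Such an object would presumably be passed as `TraceManager(transport=...)`, the same way the in-memory transport is passed in the tests of this change.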
## Installation
``pip install git+https://git.lesha.spb.ru/alex/config_manager.git``
## Contacts
- **e-mail**: lesha.spb@gmail.com
- **telegram**: https://t.me/lesha_spb

View File

@@ -4,8 +4,8 @@ build-backend = "setuptools.build_meta"
[project]
name = "config_manager"
version = "2.1.7"
description = "Fix for an infinite loop on error"
version = "2.3.0"
description = "Added the trace package"
authors = [
{ name = "Aleksei Zosimov", email = "lesha.spb@gmail.com" }
]
@@ -15,6 +15,7 @@ dependencies = [
"PyYAML>=6.0",
"fastapi>=0.100.0",
"uvicorn[standard]>=0.22.0",
"PyMySQL>=1.1.0",
]
[project.urls]

View File

@@ -0,0 +1,30 @@
CREATE TABLE IF NOT EXISTS trace_contexts
(
    trace_id   CHAR(32)     PRIMARY KEY,
    parent_id  CHAR(32)     NULL,
    alias      VARCHAR(255) NOT NULL,
    type       VARCHAR(64)  NULL,
    event_time DATETIME(6)  NOT NULL,
    attrs_json JSON         NOT NULL,
    INDEX idx_trace_contexts_parent_id (parent_id),
    INDEX idx_trace_contexts_event_time (event_time)
);

CREATE TABLE IF NOT EXISTS trace_messages
(
    id         BIGINT UNSIGNED NOT NULL AUTO_INCREMENT PRIMARY KEY,
    trace_id   CHAR(32)        NOT NULL,
    event_time DATETIME(6)     NOT NULL,
    step       VARCHAR(128)    NOT NULL DEFAULT '',
    status     VARCHAR(64)     NOT NULL DEFAULT '',
    level      VARCHAR(16)     NOT NULL DEFAULT 'INFO',
    message    TEXT            NOT NULL,
    attrs_json JSON            NOT NULL,
    INDEX idx_trace_messages_trace_id (trace_id),
    INDEX idx_trace_messages_event_time (event_time),
    INDEX idx_trace_messages_step (step),
    INDEX idx_trace_messages_status (status),
    CONSTRAINT fk_trace_messages_context
        FOREIGN KEY (trace_id) REFERENCES trace_contexts (trace_id)
        ON DELETE CASCADE
);
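
Because `parent_id` points at another row of `trace_contexts`, a whole context tree can be collected with a recursive query. The sketch below demonstrates the idea with the standard-library `sqlite3` module and a simplified stand-in table (the data and the `SUBTREE_QUERY` name are made up for this example). MySQL 8.0 also supports `WITH RECURSIVE`, so a similar statement should work there with the driver's `%s` placeholders, but that has not been checked against this schema.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
# Simplified stand-in for trace_contexts (SQLite types, no JSON column).
conn.execute(
    "CREATE TABLE trace_contexts ("
    "trace_id TEXT PRIMARY KEY, parent_id TEXT, alias TEXT NOT NULL)"
)
conn.executemany(
    "INSERT INTO trace_contexts VALUES (?, ?, ?)",
    [
        ("root", None, "email-1"),
        ("child-a", "root", "order.xlsx"),
        ("child-b", "root", "invoice.pdf"),
        ("grandchild", "child-a", "sheet-1"),
        ("other", None, "email-2"),
    ],
)

# Start from one context and repeatedly join children onto the rows found so far.
SUBTREE_QUERY = """
WITH RECURSIVE subtree (trace_id, parent_id, alias, depth) AS (
    SELECT trace_id, parent_id, alias, 0 FROM trace_contexts WHERE trace_id = ?
    UNION ALL
    SELECT c.trace_id, c.parent_id, c.alias, s.depth + 1
    FROM trace_contexts AS c
    JOIN subtree AS s ON c.parent_id = s.trace_id
)
SELECT trace_id, alias, depth FROM subtree ORDER BY depth, trace_id
"""

rows = conn.execute(SUBTREE_QUERY, ("root",)).fetchall()
for trace_id, alias, depth in rows:
    print("  " * depth + f"{alias} ({trace_id})")
```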

View File

@@ -1,2 +1,5 @@
from .v2 import ConfigManagerV2 as ConfigManager
from .v2.core.log_manager import LogManager
from .v2.trace import TraceManager
__all__ = ["ConfigManager", "LogManager", "TraceManager"]

View File

@@ -2,5 +2,6 @@
Contract: inherit from ConfigManagerV2, override execute(), control via the API (config.yaml, management section)."""
from .core import ConfigManagerV2
from .trace import TraceManager
__all__ = ["ConfigManagerV2"]
__all__ = ["ConfigManagerV2", "TraceManager"]

View File

@@ -154,8 +154,8 @@ class HttpControlChannel(ControlChannel):
self._on_status: Optional[StatusHandler] = None
self._runner = UvicornServerRunner(host=host, port=port, timeout=timeout)
self._app: Optional[FastAPI] = None
logger.debug(
"HttpControlChannel.__init__ result: host=%s port=%s timeout=%s",
logger.warning(
"HttpControlChannel started: host=%s port=%s timeout=%s",
host,
port,
timeout,
@@ -169,7 +169,7 @@ class HttpControlChannel(ControlChannel):
started = time.monotonic()
response = await call_next(request)
duration_ms = int((time.monotonic() - started) * 1000)
logger.info(
logger.debug(
"API call: method=%s path=%s status=%s duration_ms=%s",
request.method,
request.url.path,
@@ -230,6 +230,7 @@ class HttpControlChannel(ControlChannel):
"HttpControlChannel._action_response result: action=%s status_code=404 detail=handler not configured",
action,
)
logger.warning("control command: action=%s status=handler_not_configured", action)
return JSONResponse(
content={"status": "error", "detail": f"{action} handler is not configured"},
status_code=404,
@@ -243,15 +244,18 @@ class HttpControlChannel(ControlChannel):
action,
detail,
)
logger.warning("control command: action=%s status=ok", action)
return JSONResponse(content={"status": "ok", "detail": detail}, status_code=200)
except asyncio.TimeoutError:
logger.warning("HttpControlChannel._action_response timeout: action=%s timeout=%s", action, self._timeout)
logger.debug("HttpControlChannel._action_response timeout: action=%s timeout=%s", action, self._timeout)
logger.warning("control command: action=%s status=timeout", action)
return JSONResponse(
content={"status": "error", "detail": f"{action} handler did not respond within {self._timeout}s"},
status_code=504,
)
except Exception as exc: # noqa: BLE001
logger.exception("HttpControlChannel._action_response error: action=%s", action)
logger.debug("HttpControlChannel._action_response error: action=%s", action, exc_info=True)
logger.warning("control command: action=%s status=error", action)
return JSONResponse(content={"status": "error", "detail": str(exc)}, status_code=500)
async def start(

View File

@@ -62,7 +62,7 @@ class TelegramControlChannel(ControlChannel):
for update in updates:
await self._process_update(update)
except Exception as exc: # noqa: BLE001
self._logger.warning("Telegram polling error: %s", exc)
self._logger.debug("Telegram polling error: %s", exc, exc_info=True)
try:
await asyncio.wait_for(self._stop_event.wait(), timeout=max(self._poll_interval, 0.1))
@@ -94,16 +94,32 @@ class TelegramControlChannel(ControlChannel):
if chat_id != self._chat_id:
return
if text in {"/start", "/run"} and self._on_start is not None:
reply = await self._on_start()
elif text in {"/stop", "/halt"} and self._on_stop is not None:
reply = await self._on_stop()
elif text in {"/status", "/health"} and self._on_status is not None:
reply = await self._on_status()
action: Optional[str]
callback: Optional[StartHandler | StopHandler | StatusHandler]
if text in {"/start", "/run"}:
action = "start"
callback = self._on_start
elif text in {"/stop", "/halt"}:
action = "stop"
callback = self._on_stop
elif text in {"/status", "/health"}:
action = "status"
callback = self._on_status
else:
return
await asyncio.to_thread(self._send_message, reply)
if callback is None:
self._logger.warning("control command: action=%s status=handler_not_configured", action)
return
try:
reply = await callback()
await asyncio.to_thread(self._send_message, reply)
self._logger.warning("control command: action=%s status=ok", action)
except asyncio.TimeoutError:
self._logger.warning("control command: action=%s status=timeout", action)
except Exception:
self._logger.debug("Telegram control command error: action=%s", action, exc_info=True)
self._logger.warning("control command: action=%s status=error", action)
def _send_message(self, text: str) -> None:
"""Send a text reply to the configured Telegram chat."""

View File

@@ -55,14 +55,43 @@ def _read_env_optional_float(name: str, default_value: Optional[float]) -> Optio
return default_value
def _read_env_health_timeout(default_value: float) -> float:
"""Read health timeout from env."""
env_name = "HEALTH_TIMEOUT"
raw_value = os.environ.get(env_name)
if raw_value is None:
return default_value
try:
parsed = float(raw_value)
if parsed <= 0:
raise ValueError(f"{env_name} must be greater than zero")
return parsed
except Exception: # noqa: BLE001
logger.exception(
"ConfigManagerV2 health timeout parse error: env=%s raw_value=%s fallback=%s",
env_name,
raw_value,
default_value,
)
return default_value
class _RuntimeController:
"""Runtime loops and lifecycle supervision."""
CONTROL_CHANNEL_TIMEOUT = 5.0
def _trigger_health_transition_check(self) -> None:
try:
loop = asyncio.get_running_loop()
except RuntimeError:
return
loop.create_task(self._log_health_status_transition(), name="health-transition-check")
def _on_execute_success(self) -> None:
self._last_success_timestamp = time.monotonic()
self._last_execute_error = None
self._trigger_health_transition_check()
self.logger.debug(
"ConfigManagerV2._on_execute_success result: last_success_timestamp=%s",
self._last_success_timestamp,
@@ -70,6 +99,7 @@ class _RuntimeController:
def _on_execute_error(self, exc: Exception) -> None:
self._last_execute_error = str(exc)
self._trigger_health_transition_check()
self.logger.error(
"ConfigManagerV2._on_execute_error: %s",
self._last_execute_error,
@@ -78,7 +108,8 @@ class _RuntimeController:
def _on_worker_degraded_change(self, degraded: bool) -> None:
self._worker_degraded = degraded
self.logger.warning("ConfigManagerV2._on_worker_degraded_change result: degraded=%s", degraded)
self._trigger_health_transition_check()
self.logger.debug("ConfigManagerV2._on_worker_degraded_change result: degraded=%s", degraded)
def _on_worker_metrics_change(self, inflight_count: int, timed_out_count: int) -> None:
self._worker_inflight_count = inflight_count
@@ -89,6 +120,26 @@ class _RuntimeController:
timed_out_count,
)
async def _log_health_status_transition(self) -> None:
try:
health = await self._health_aggregator.collect()
except Exception: # noqa: BLE001
self.logger.exception("ConfigManagerV2._log_health_status_transition error")
return
status = health.get("status", "unhealthy")
if self._last_health_status == status:
return
previous = self._last_health_status or "unknown"
detail = health.get("detail", "")
self._last_health_status = status
self.logger.warning(
"ConfigManagerV2 health status changed: %s -> %s (state=%s detail=%s)",
previous,
status,
self._state.value,
detail,
)
async def _worker_loop(self) -> None:
self.logger.warning(
"ConfigManagerV2._worker_loop result: started work_interval=%s",
@@ -111,19 +162,20 @@ class _RuntimeController:
self.logger.warning("ConfigManagerV2._worker_loop result: stopped")
async def _periodic_update_loop(self) -> None:
self.logger.warning(
self.logger.debug(
"ConfigManagerV2._periodic_update_loop result: started update_interval=%s",
self.update_interval,
)
try:
while not self._halt.is_set():
await self._update_config()
await self._log_health_status_transition()
try:
await asyncio.wait_for(self._halt.wait(), timeout=max(self.update_interval, 0.05))
except asyncio.TimeoutError:
continue
finally:
self.logger.warning("ConfigManagerV2._periodic_update_loop result: stopped")
self.logger.debug("ConfigManagerV2._periodic_update_loop result: stopped")
async def _status_text(self) -> str:
health = await self._health_aggregator.collect()
@@ -259,6 +311,7 @@ class _RuntimeController:
await self._update_config()
await self._start_control_channels()
await self._start_runtime()
await self._log_health_status_transition()
try:
await self._shutdown.wait()
@@ -289,7 +342,7 @@ class ConfigManagerV2(_RuntimeController):
DEFAULT_UPDATE_INTERVAL = 5
DEFAULT_WORK_INTERVAL = 2
DEFAULT_HEALTH_TIMEOUT = 30
DEFAULT_HEALTH_TIMEOUT = 90
DEFAULT_EXECUTE_TIMEOUT = 600.0
def __init__(
@@ -320,10 +373,11 @@ class ConfigManagerV2(_RuntimeController):
self._worker_degraded = False
self._worker_inflight_count = 0
self._worker_timed_out_inflight_count = 0
self._last_health_status: Optional[str] = None
initial_config = self._loader.load_sync()
self.config = initial_config
self._health_timeout = self.DEFAULT_HEALTH_TIMEOUT
self._health_timeout = _read_env_health_timeout(float(self.DEFAULT_HEALTH_TIMEOUT))
self._health_aggregator = HealthAggregator(
get_state=lambda: self._state,
get_last_error=lambda: self._last_execute_error,
@@ -344,11 +398,12 @@ class ConfigManagerV2(_RuntimeController):
else:
self._control_channels = list(control_channels)
self.logger.debug(
"ConfigManagerV2.__init__ result: path=%s update_interval=%s work_interval=%s execute_timeout=%s control_channels=%s",
"ConfigManagerV2.__init__ result: path=%s update_interval=%s work_interval=%s execute_timeout=%s health_timeout=%s control_channels=%s",
self.path,
self.update_interval,
self.work_interval,
self._execute_timeout,
self._health_timeout,
len(self._control_channels),
)
@@ -373,7 +428,7 @@ class ConfigManagerV2(_RuntimeController):
self.logger.debug("ConfigManagerV2._update_config result: no changes")
return
self._apply_config(new_config)
self.logger.debug("ConfigManagerV2._update_config result: config updated")
self.logger.warning("ConfigManagerV2._update_config result: config updated and applied")
except Exception as exc: # noqa: BLE001
self.logger.exception("ConfigManagerV2._update_config error")
if self._loader.last_valid_config is None:
@@ -384,7 +439,7 @@ class ConfigManagerV2(_RuntimeController):
return
try:
self._apply_config(self._loader.last_valid_config)
self.logger.debug(
self.logger.warning(
"ConfigManagerV2._update_config result: fallback to last valid config applied",
)
except Exception: # noqa: BLE001
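
The idea behind `_log_health_status_transition()` in the hunks above is to log the health status only when it differs from the previously seen one, rather than on every check. A stripped-down sketch of that rule follows; the class `HealthTransitionTracker` is hypothetical and collects messages in a list instead of calling a logger.

```python
from typing import List, Optional


class HealthTransitionTracker:
    """Hypothetical helper: records a status only when it differs from the previous one."""

    def __init__(self) -> None:
        self._last_status: Optional[str] = None
        self.events: List[str] = []

    def observe(self, status: str) -> bool:
        if status == self._last_status:
            return False  # no transition, nothing to log
        previous = self._last_status or "unknown"
        self._last_status = status
        self.events.append(f"{previous} -> {status}")
        return True


tracker = HealthTransitionTracker()
for status in ["ok", "ok", "unhealthy", "unhealthy", "ok"]:
    tracker.observe(status)
```

Five observations produce three events here, which is why the transition message can be logged at WARNING level without flooding the log.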

View File

@@ -25,6 +25,8 @@ class _InFlightExecute:
class WorkerLoop:
LOOP_TICK_SECONDS = 0.02
def __init__(
self,
execute: Callable[[], None],
@@ -48,6 +50,7 @@ class WorkerLoop:
self._id_seq = count(1)
self._degraded = False
self._last_metrics: Optional[tuple[int, int]] = None
self._next_start_at = 0.0
logger.debug(
"WorkerLoop.__init__ result: execute=%s execute_timeout=%s",
getattr(execute, "__name__", type(execute).__name__),
@@ -64,7 +67,7 @@ class WorkerLoop:
self._degraded = value
if self._on_degraded_change is not None:
self._on_degraded_change(value)
logger.warning("WorkerLoop.run degraded state changed: degraded=%s", value)
logger.debug("WorkerLoop.run degraded state changed: degraded=%s", value)
def _start_execute(self) -> None:
execute_id = next(self._id_seq)
@@ -107,7 +110,7 @@ class WorkerLoop:
continue
execution.timeout_reported = True
self._notify_error(TimeoutError(f"execute() did not finish within {self._execute_timeout}s"))
logger.warning(
logger.debug(
"WorkerLoop.run execute timeout: id=%s elapsed=%.3fs timeout=%ss",
execution.id,
elapsed,
@@ -118,12 +121,17 @@ class WorkerLoop:
return any(item.timeout_reported and not item.task.done() for item in self._inflight)
def _ensure_capacity(self) -> None:
now = time.monotonic()
if now < self._next_start_at:
return
active_count = len(self._inflight)
if active_count == 0:
self._start_execute()
self._next_start_at = now + max(self._get_interval(), 0.01)
return
if active_count == 1 and self._has_timed_out_inflight():
self._start_execute()
self._next_start_at = now + max(self._get_interval(), 0.01)
return
def _emit_metrics(self) -> None:
@@ -146,7 +154,7 @@ class WorkerLoop:
self._ensure_capacity()
self._emit_metrics()
timeout = max(self._get_interval(), 0.01)
timeout = self.LOOP_TICK_SECONDS
try:
await asyncio.wait_for(self._halt_event.wait(), timeout=timeout)
except asyncio.TimeoutError:
@@ -154,7 +162,7 @@ class WorkerLoop:
if self._inflight:
if self._has_timed_out_inflight():
logger.warning("WorkerLoop.run stop: timed-out execute still running; exiting without waiting")
logger.debug("WorkerLoop.run stop: timed-out execute still running; exiting without waiting")
else:
await asyncio.gather(*(item.task for item in self._inflight), return_exceptions=True)
self._collect_finished()
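
The WorkerLoop change above separates two things: the loop now wakes up on a short fixed tick (`LOOP_TICK_SECONDS`), while `_next_start_at` decides whether a new `execute()` may start. The sketch below isolates only the interval rule, with the clock passed in explicitly so the behaviour is deterministic. `IntervalGate` is a hypothetical name, and the in-flight and timeout checks of the real `_ensure_capacity()` are deliberately left out.

```python
from typing import Callable, List


class IntervalGate:
    """Hypothetical extract of the scheduling rule: allow a start only after the interval elapsed."""

    def __init__(self, get_interval: Callable[[], float]) -> None:
        self._get_interval = get_interval
        self._next_start_at = 0.0

    def try_start(self, now: float) -> bool:
        if now < self._next_start_at:
            return False
        # Same lower bound as in the diff: never schedule closer than 0.01 s.
        self._next_start_at = now + max(self._get_interval(), 0.01)
        return True


gate = IntervalGate(lambda: 2.0)
tick = 0.5  # coarse tick for readability; the diff uses LOOP_TICK_SECONDS = 0.02
starts: List[float] = [i * tick for i in range(10) if gate.try_start(i * tick)]
```

The loop ticks ten times in this run, but only three starts pass the gate, one per two-second interval.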

View File

@@ -0,0 +1,16 @@
"""Public tracing API for config_manager-based applications."""
from .manager import TraceManager
from .models import TraceContextRecord, TraceLogMessage
from .store import ActiveTraceContext, TraceContextStore
from .transport.base import NoOpTraceTransport, TraceTransport
__all__ = [
    "ActiveTraceContext",
    "NoOpTraceTransport",
    "TraceContextRecord",
    "TraceContextStore",
    "TraceLogMessage",
    "TraceManager",
    "TraceTransport",
]

View File

@@ -0,0 +1,96 @@
"""High-level tracing API independent from the logging module."""
from __future__ import annotations

from contextlib import contextmanager
from typing import Any, Dict, Iterator, Optional
from uuid import uuid4

from .models import TraceContextRecord, TraceLogMessage, utc_now
from .store import TraceContextStore
from .transport.base import NoOpTraceTransport, TraceTransport


class TraceManager:
    """Creates trace contexts and writes trace messages for active context."""

    def __init__(self, transport: Optional[TraceTransport] = None, store: Optional[TraceContextStore] = None) -> None:
        self.transport = transport or NoOpTraceTransport()
        self.store = store or TraceContextStore()

    def bind_context(
        self,
        *,
        alias: str,
        parent_id: Optional[str] = None,
        type: Optional[str] = None,
        attrs: Optional[Dict[str, Any]] = None,
    ) -> str:
        """Create and activate a trace context, returning its trace identifier."""
        record = TraceContextRecord(
            trace_id=uuid4().hex,
            parent_id=parent_id,
            alias=str(alias or ""),
            type=str(type) if type is not None else None,
            event_time=utc_now(),
            attrs=dict(attrs or {}),
        )
        self.store.push(record)
        self.transport.write_context(record)
        return record.trace_id

    @contextmanager
    def open_context(
        self,
        *,
        alias: str,
        parent_id: Optional[str] = None,
        type: Optional[str] = None,
        attrs: Optional[Dict[str, Any]] = None,
    ) -> Iterator[str]:
        """Open nested trace context and restore previous one after exit."""
        trace_id = self.bind_context(alias=alias, parent_id=parent_id, type=type, attrs=attrs)
        try:
            yield trace_id
        finally:
            self.store.pop()

    def current_trace_id(self) -> Optional[str]:
        """Return current active trace identifier."""
        return self.store.current_trace_id()

    def close_context(self) -> Optional[str]:
        """Close current context and return restored parent trace id, if any."""
        previous = self.store.pop()
        return previous.record.trace_id if previous else None

    def step(self, name: str) -> None:
        """Set current step for subsequent messages."""
        self.store.set_step(name)

    def info(self, message: str, *, status: str, attrs: Optional[Dict[str, Any]] = None) -> None:
        self._write_message(level="INFO", message=message, status=status, attrs=attrs)

    def warning(self, message: str, *, status: str, attrs: Optional[Dict[str, Any]] = None) -> None:
        self._write_message(level="WARNING", message=message, status=status, attrs=attrs)

    def error(self, message: str, *, status: str, attrs: Optional[Dict[str, Any]] = None) -> None:
        self._write_message(level="ERROR", message=message, status=status, attrs=attrs)

    def exception(self, message: str, *, status: str = "failed", attrs: Optional[Dict[str, Any]] = None) -> None:
        self._write_message(level="ERROR", message=message, status=status, attrs=attrs)

    def _write_message(self, *, level: str, message: str, status: str, attrs: Optional[Dict[str, Any]]) -> None:
        active = self.store.current()
        if active is None:
            raise RuntimeError("Trace context is not bound. Call bind_context() first.")
        record = TraceLogMessage(
            trace_id=active.record.trace_id,
            event_time=utc_now(),
            step=active.step,
            status=str(status or ""),
            level=level,
            message=str(message or ""),
            attrs=dict(attrs or {}),
        )
        self.transport.write_message(record)

View File

@@ -0,0 +1,37 @@
"""Data models for trace contexts and trace messages."""
from __future__ import annotations

from dataclasses import dataclass, field
from datetime import datetime, timezone
from typing import Any, Dict, Optional


def utc_now() -> datetime:
    """Return current UTC time with timezone for trace events."""
    return datetime.now(timezone.utc)


@dataclass(frozen=True)
class TraceContextRecord:
    """Represents a logical unit that groups trace messages."""

    trace_id: str
    alias: str
    parent_id: Optional[str] = None
    type: Optional[str] = None
    event_time: datetime = field(default_factory=utc_now)
    attrs: Dict[str, Any] = field(default_factory=dict)


@dataclass(frozen=True)
class TraceLogMessage:
    """Represents a single trace message linked to a trace context."""

    trace_id: str
    step: str
    status: str
    message: str
    level: str
    event_time: datetime = field(default_factory=utc_now)
    attrs: Dict[str, Any] = field(default_factory=dict)

View File

@@ -0,0 +1,70 @@
"""Context-local storage for active trace contexts."""
from __future__ import annotations

from contextvars import ContextVar
from dataclasses import dataclass, replace
from typing import Optional, Tuple

from .models import TraceContextRecord


@dataclass(frozen=True)
class ActiveTraceContext:
    """Stores current trace context and active processing step."""

    record: TraceContextRecord
    step: str = ""


class TraceContextStore:
    """Keeps active trace context stack in the current execution context."""

    def __init__(self) -> None:
        self._current: ContextVar[Optional[ActiveTraceContext]] = ContextVar("trace_current", default=None)
        self._stack: ContextVar[Tuple[ActiveTraceContext, ...]] = ContextVar("trace_stack", default=())

    def current(self) -> Optional[ActiveTraceContext]:
        """Return the current active trace context, if present."""
        return self._current.get()

    def current_trace_id(self) -> Optional[str]:
        """Return current trace identifier, if present."""
        current = self.current()
        return current.record.trace_id if current else None

    def current_step(self) -> str:
        """Return current active step or empty string."""
        current = self.current()
        return current.step if current else ""

    def push(self, record: TraceContextRecord) -> ActiveTraceContext:
        """Activate a new trace context and preserve the previous one in stack."""
        current = self.current()
        stack = self._stack.get()
        if current is not None:
            stack = stack + (current,)
        self._stack.set(stack)
        active = ActiveTraceContext(record=record)
        self._current.set(active)
        return active

    def pop(self) -> Optional[ActiveTraceContext]:
        """Restore the previous trace context from stack."""
        stack = self._stack.get()
        if not stack:
            self._current.set(None)
            return None
        previous = stack[-1]
        self._stack.set(stack[:-1])
        self._current.set(previous)
        return previous

    def set_step(self, step: str) -> Optional[ActiveTraceContext]:
        """Assign step to current active context."""
        current = self.current()
        if current is None:
            return None
        updated = replace(current, step=str(step or ""))
        self._current.set(updated)
        return updated
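
Since the store is built on `ContextVar`, each asyncio task works on its own copy of the context: a context pushed inside one task is not visible to sibling tasks or to the parent. The sketch below shows this with a simplified single-variable stack. The helper names `push`, `current`, and `worker` are hypothetical; the real store keeps separate `_current` and `_stack` variables.

```python
import asyncio
from contextvars import ContextVar
from typing import List, Optional, Tuple

_stack: ContextVar[Tuple[str, ...]] = ContextVar("demo_trace_stack", default=())


def push(trace_id: str) -> None:
    # Tuples are immutable, so every set() stores a new value for this context only.
    _stack.set(_stack.get() + (trace_id,))


def current() -> Optional[str]:
    stack = _stack.get()
    return stack[-1] if stack else None


async def worker(trace_id: str) -> Optional[str]:
    push(trace_id)          # visible only inside this task's context copy
    await asyncio.sleep(0)  # let the other task run in between
    return current()


async def main() -> Tuple[List[Optional[str]], Optional[str]]:
    push("parent")
    results = await asyncio.gather(worker("child-1"), worker("child-2"))
    return list(results), current()


results, after = asyncio.run(main())
```

Each worker still sees its own identifier after yielding to the other one, and the parent's current context is unchanged once both have finished.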

View File

@@ -0,0 +1,5 @@
"""Trace transports."""
from .base import NoOpTraceTransport, TraceTransport
__all__ = ["NoOpTraceTransport", "TraceTransport"]

View File

@@ -0,0 +1,27 @@
"""Transport interfaces for trace persistence."""
from __future__ import annotations

from typing import Protocol

from ..models import TraceContextRecord, TraceLogMessage


class TraceTransport(Protocol):
    """Writes trace records to an external destination."""

    def write_context(self, record: TraceContextRecord) -> None:
        """Persist trace context record."""

    def write_message(self, record: TraceLogMessage) -> None:
        """Persist trace log message."""


class NoOpTraceTransport:
    """Default transport that ignores all trace records."""

    def write_context(self, record: TraceContextRecord) -> None:
        return None

    def write_message(self, record: TraceLogMessage) -> None:
        return None

View File

@@ -0,0 +1,97 @@
"""MySQL transport for trace contexts and trace messages."""
from __future__ import annotations

import json
from datetime import datetime, timezone
from typing import Any, Dict, Optional

import pymysql

from ..models import TraceContextRecord, TraceLogMessage


class MySqlTraceTransport:
    """Persists trace records into dedicated MySQL tables."""

    TRACE_CONTEXTS_TABLE = "trace_contexts"
    TRACE_MESSAGES_TABLE = "trace_messages"

    def __init__(
        self,
        *,
        host: str,
        port: int,
        database: str,
        user: str,
        password: str,
        connect_timeout: int = 5,
        charset: str = "utf8mb4",
    ) -> None:
        self.host = host
        self.port = int(port)
        self.database = database
        self.user = user
        self.password = password
        self.connect_timeout = connect_timeout
        self.charset = charset

    def write_context(self, record: TraceContextRecord) -> None:
        query = (
            f"INSERT INTO {self.TRACE_CONTEXTS_TABLE} "
            "(trace_id, parent_id, alias, type, event_time, attrs_json) "
            "VALUES (%s, %s, %s, %s, %s, %s)"
        )
        params = (
            record.trace_id,
            record.parent_id,
            record.alias,
            record.type,
            self._normalize_time(record.event_time),
            self._serialize_attrs(record.attrs),
        )
        self._execute(query, params)

    def write_message(self, record: TraceLogMessage) -> None:
        query = (
            f"INSERT INTO {self.TRACE_MESSAGES_TABLE} "
            "(trace_id, event_time, step, status, level, message, attrs_json) "
            "VALUES (%s, %s, %s, %s, %s, %s, %s)"
        )
        params = (
            record.trace_id,
            self._normalize_time(record.event_time),
            record.step,
            record.status,
            record.level,
            record.message,
            self._serialize_attrs(record.attrs),
        )
        self._execute(query, params)

    def _execute(self, query: str, params: tuple[Any, ...]) -> None:
        connection = pymysql.connect(
            host=self.host,
            port=self.port,
            user=self.user,
            password=self.password,
            database=self.database,
            charset=self.charset,
            connect_timeout=self.connect_timeout,
            autocommit=True,
        )
        try:
            with connection.cursor() as cursor:
                cursor.execute(query, params)
        finally:
            connection.close()

    @staticmethod
    def _serialize_attrs(attrs: Optional[Dict[str, Any]]) -> str:
        return json.dumps(attrs or {}, ensure_ascii=False, default=str, sort_keys=True)

    @staticmethod
    def _normalize_time(value: datetime) -> datetime:
        if value.tzinfo is None:
            return value
        return value.astimezone(timezone.utc).replace(tzinfo=None)
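
The `_normalize_time()` helper above turns timezone-aware values into naive UTC before they are written to the `DATETIME(6)` columns, and passes naive values through unchanged. A standalone copy of the same rule, with sample values chosen for this illustration, shows the effect:

```python
from datetime import datetime, timedelta, timezone


def normalize_time(value: datetime) -> datetime:
    """Same rule as _normalize_time above: aware values become naive UTC."""
    if value.tzinfo is None:
        return value
    return value.astimezone(timezone.utc).replace(tzinfo=None)


utc_plus_3 = timezone(timedelta(hours=3))
aware = datetime(2024, 1, 1, 15, 0, 0, tzinfo=utc_plus_3)
naive = datetime(2024, 1, 1, 15, 0, 0)

print(normalize_time(aware))  # 15:00 at UTC+3 is 12:00 UTC
print(normalize_time(naive))  # returned as-is; the helper cannot know its zone
```

Note that naive inputs are stored without conversion, so callers that build records by hand should use aware timestamps, as `utc_now()` does.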

View File

@@ -0,0 +1,26 @@
from config_manager.v2 import ConfigManagerV2


class TimeoutApp(ConfigManagerV2):
    def execute(self) -> None:
        return


def test_health_timeout_uses_main_env_key(tmp_path, monkeypatch):
    cfg = tmp_path / "config.yaml"
    cfg.write_text("log: {}\nmanagement: { enabled: false }\nhealth_timeout: 150\n", encoding="utf-8")
    monkeypatch.setenv("HEALTH_TIMEOUT", "120")
    monkeypatch.setenv("HEALTHY_TIMEOUT", "300")
    app = TimeoutApp(str(cfg))
    assert app._health_timeout == 120.0


def test_health_timeout_defaults_to_90_when_env_not_set(tmp_path, monkeypatch):
    cfg = tmp_path / "config.yaml"
    cfg.write_text("log: {}\nmanagement: { enabled: false }\nhealth_timeout: 150\n", encoding="utf-8")
    monkeypatch.delenv("HEALTH_TIMEOUT", raising=False)
    monkeypatch.delenv("HEALTHY_TIMEOUT", raising=False)
    app = TimeoutApp(str(cfg))
    assert app._health_timeout == 90.0

View File

@@ -145,8 +145,6 @@ def test_execute_timeout_does_not_start_parallel_runs(tmp_path, monkeypatch):
await app.stop()
assert app.calls >= 1
assert app._last_execute_error is not None
assert "did not finish within" in app._last_execute_error
assert app.max_active == 2
assert degraded_health["status"] == "degraded"

View File

@@ -0,0 +1,79 @@
from __future__ import annotations

from dataclasses import dataclass, field

from config_manager.v2.trace import TraceManager


@dataclass
class MemoryTraceTransport:
    contexts: list = field(default_factory=list)
    messages: list = field(default_factory=list)

    def write_context(self, record) -> None:
        self.contexts.append(record)

    def write_message(self, record) -> None:
        self.messages.append(record)


def test_bind_context_writes_context_and_returns_trace_id():
    transport = MemoryTraceTransport()
    trace = TraceManager(transport=transport)
    trace_id = trace.bind_context(alias="email-1", type="email", attrs={"message_id": "abc"})
    assert trace_id
    assert transport.contexts[0].trace_id == trace_id
    assert transport.contexts[0].alias == "email-1"
    assert transport.contexts[0].type == "email"
    assert transport.contexts[0].attrs == {"message_id": "abc"}


def test_open_context_restores_parent_after_exit():
    transport = MemoryTraceTransport()
    trace = TraceManager(transport=transport)
    parent_id = trace.bind_context(alias="email-1", type="email")
    with trace.open_context(alias="order.xlsx", type="attachment") as child_id:
        assert trace.current_trace_id() == child_id
    assert trace.current_trace_id() == parent_id


def test_messages_use_current_step_and_attrs():
    transport = MemoryTraceTransport()
    trace = TraceManager(transport=transport)
    trace.bind_context(alias="email-1", type="email")
    trace.step("parse_email")
    trace.info("Email parsed", status="completed", attrs={"attachments_count": 2})
    message = transport.messages[0]
    assert message.step == "parse_email"
    assert message.status == "completed"
    assert message.level == "INFO"
    assert message.message == "Email parsed"
    assert message.attrs == {"attachments_count": 2}


def test_bind_context_keeps_parent_empty_by_default():
    transport = MemoryTraceTransport()
    trace = TraceManager(transport=transport)
    parent_id = trace.bind_context(alias="email-1", type="email")
    child_id = trace.bind_context(alias="order.xlsx", type="attachment")
    assert child_id != parent_id
    assert transport.contexts[-1].parent_id is None


def test_bind_context_uses_explicit_parent_id():
    transport = MemoryTraceTransport()
    trace = TraceManager(transport=transport)
    parent_id = trace.bind_context(alias="email-1", type="email")
    child_id = trace.bind_context(alias="order.xlsx", type="attachment", parent_id=parent_id)
    assert child_id != parent_id
    assert transport.contexts[-1].parent_id == parent_id