diff --git a/README.md b/README.md index 49ce451..6dbea18 100644 --- a/README.md +++ b/README.md @@ -1,6 +1,11 @@ # Config Manager ## Описание -Пакет предназначен для запуска приложений с периодическим выполнением логики, перезагрузкой конфига и управлением по HTTP API. +Пакет предназначен как базовое приложение для проектов, в которых нужно периодически запускать одну и ту же функцию в одном потоке, с возможностью перезагрузки конфига и сервисным контуром вокруг прикладной логики. + +Под сервисным контуром здесь понимаются: +- логирование; +- трассировка бизнес-процессов и связанных сущностей; +- управление приложением через каналы управления (например, HTTP API). **Контракт:** приложение наследует **ConfigManagerV2**, переопределяет **execute()** (периодическая работа). Управление (старт/стоп, health) — через каналы, которые создаются снаружи и передаются в конструктор в **control_channels** (в т.ч. HttpControlChannel для API). @@ -12,6 +17,7 @@ - **ConfigLoader** — читает конфиг из файла (YAML/JSON), считает хеш и отдаёт конфиг только при изменении; при ошибке парсинга возвращает последний валидный конфиг. - **WorkerLoop** — в отдельном потоке циклически вызывает ваш метод `execute()` с паузой между вызовами; реагирует на событие остановки и колбэки успеха/ошибки. - **LogManager** — применяет секцию `log` из конфига к логированию (dictConfig). +- **TraceManager** — управляет структурированной трассировкой процессов, контекстов и сообщений. - **HealthAggregator** — собирает состояние: жизненный цикл (idle/starting/running/…), время последнего успешного `execute()` и таймаут здоровья; формирует единый ответ для health (ok/unhealthy). - **ControlChannelBridge** — один мост для всех каналов: обработчики on_start/on_stop/on_status (сброс/установка halt, текст статуса). @@ -134,6 +140,54 @@ classDiagram +apply_config(config) None } + class TraceManager { + +bind_context(alias, parent_id, type, attrs) str + +open_context(alias, parent_id, type, attrs) contextmanager + +current_trace_id() str + +step(name) None + +info(message, status, attrs) None + +warning(message, status, attrs) None + +error(message, status, attrs) None + } + + class TraceContextStore { + +current() ActiveTraceContext + +current_trace_id() str + +push(record) ActiveTraceContext + +pop() ActiveTraceContext + +set_step(step) ActiveTraceContext + } + + class TraceContextRecord { + +str trace_id + +str parent_id + +str alias + +str type + +datetime event_time + +dict attrs + } + + class TraceLogMessage { + +str trace_id + +str step + +str status + +str level + +str message + +datetime event_time + +dict attrs + } + + class TraceTransport { + <> + +write_context(record) None + +write_message(record) None + } + + class MySqlTraceTransport { + +write_context(record) None + +write_message(record) None + } + class ControlChannel { <<абстрактный>> +start(on_start, on_stop, on_status) async* @@ -182,10 +236,16 @@ classDiagram ConfigManagerV2 --|> _RuntimeController : наследует ConfigManagerV2 --> ConfigLoader : использует ConfigManagerV2 --> LogManager : использует + ConfigManagerV2 --> TraceManager : использует ConfigManagerV2 --> HealthAggregator : использует ConfigManagerV2 --> ControlChannelBridge : использует ConfigManagerV2 ..> ControlChannel : список каналов _RuntimeController ..> WorkerLoop : создаёт в _worker_loop + TraceManager --> TraceContextStore : использует + TraceManager --> TraceTransport : использует + TraceManager ..> TraceContextRecord : создаёт + TraceManager ..> TraceLogMessage : создаёт + MySqlTraceTransport --|> TraceTransport : реализует TelegramControlChannel --|> ControlChannel : реализует HttpControlChannel --|> ControlChannel : реализует HttpControlChannel --> UvicornServerRunner : использует @@ -246,10 +306,95 @@ sequenceDiagram - Убедитесь, что в конфиге есть ключ `log` с `version: 1`, `handlers` и `loggers` (пример — `tests/config.yaml`). - После старта в логе должно появиться сообщение уровня INFO: `"Logging configuration applied"` (из `config_manager.v2.core.log_manager`). Если его нет, либо секция `log` отсутствует (будет предупреждение), либо уровень root/пакета выше INFO. +## Trace +Модуль `trace` предназначен для структурированной трассировки прикладных процессов и иерархически связанных сущностей. + +Базовая идея: +- есть `TraceContextRecord` — логический контекст, который группирует сообщения; +- есть `TraceLogMessage` — отдельное событие внутри текущего контекста; +- контексты могут быть вложенными: один родительский контекст и много дочерних; +- активный контекст хранится в `TraceContextStore`, а при выходе из дочернего `with` автоматически восстанавливается родитель. + +### Архитектура +Основные части модуля: +- `TraceManager` — публичный API для приложений; +- `TraceContextStore` — хранение активного контекста и стека вложенности; +- `TraceContextRecord` — описание контекста; +- `TraceLogMessage` — описание сообщения; +- `TraceTransport` — интерфейс транспорта; +- `MySqlTraceTransport` — запись контекстов и сообщений в MySQL. + +Сущности: + +`TraceContextRecord` +- `trace_id` +- `parent_id` +- `alias` +- `type` +- `event_time` +- `attrs` + +`TraceLogMessage` +- `trace_id` +- `event_time` +- `step` +- `status` +- `level` +- `message` +- `attrs` + +### Принцип использования +1. На старте процесса создаётся контекст через `bind_context()` или `open_context()`. +2. Для серии сообщений выставляется текущий `step()`. +3. Сообщения пишутся через `info()/warning()/error()/exception()`. +4. При использовании `open_context()` дочерний контекст автоматически закрывается по выходу из `with`, а родительский становится текущим снова. + +### Пример: корневой контекст +```python +from config_manager.v2.trace import TraceManager + +trace = TraceManager() + +trace_id = trace.bind_context( + alias="job-123", + type="task", + attrs={"source": "scheduler"}, +) + +trace.step("prepare") +trace.info("Подготовка завершена", status="completed", attrs={"items_count": 2}) +``` + +### Пример: дочерний контекст +```python +with trace.open_context( + alias="subtask-1", + type="subtask", + parent_id=trace_id, + attrs={"segment": "phase-a"}, +) as child_trace_id: + trace.step("execute") + trace.info("Подзадача запущена", status="started") + trace.info("Подзадача завершена", status="completed", attrs={"duration_ms": 120}) + +# Здесь снова активен родительский контекст +trace.step("finish") +trace.info("Обработка завершена", status="completed") +``` + +### Хранение в MySQL +Для MySQL предусмотрен `MySqlTraceTransport`. Он пишет две сущности в отдельные таблицы: +- `trace_contexts` +- `trace_messages` + +Это позволяет: +- отдельно хранить структуру процесса; +- отдельно хранить историю шагов и сообщений; +- строить отчёты и трассировку без завязки на `logging`. + ## Установка ``pip install git+https://git.lesha.spb.ru/alex/config_manager.git`` ## Контакты - **e-mail**: lesha.spb@gmail.com - **telegram**: https://t.me/lesha_spb - diff --git a/pyproject.toml b/pyproject.toml index 04e7d45..48923de 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -4,8 +4,8 @@ build-backend = "setuptools.build_meta" [project] name = "config_manager" -version = "2.2.5" -description = "Фикс таймаута healthy" +version = "2.3.0" +description = "Дoбавлен пакет trace" authors = [ { name = "Aleksei Zosimov", email = "lesha.spb@gmail.com" } ] @@ -15,6 +15,7 @@ dependencies = [ "PyYAML>=6.0", "fastapi>=0.100.0", "uvicorn[standard]>=0.22.0", + "PyMySQL>=1.1.0", ] [project.urls] @@ -23,4 +24,4 @@ Documentation = "https://git.lesha.spb.ru/alex/config_manager" Repository = "https://git.lesha.spb.ru/alex/config_manager" [tool.setuptools.packages.find] -where = ["src"] \ No newline at end of file +where = ["src"] diff --git a/scripts/init_trace_mysql.sql b/scripts/init_trace_mysql.sql new file mode 100644 index 0000000..93da857 --- /dev/null +++ b/scripts/init_trace_mysql.sql @@ -0,0 +1,30 @@ +CREATE TABLE IF NOT EXISTS trace_contexts +( + trace_id CHAR(32) PRIMARY KEY, + parent_id CHAR(32) NULL, + alias VARCHAR(255) NOT NULL, + type VARCHAR(64) NULL, + event_time DATETIME(6) NOT NULL, + attrs_json JSON NOT NULL, + INDEX idx_trace_contexts_parent_id (parent_id), + INDEX idx_trace_contexts_event_time (event_time) +); + +CREATE TABLE IF NOT EXISTS trace_messages +( + id BIGINT UNSIGNED NOT NULL AUTO_INCREMENT PRIMARY KEY, + trace_id CHAR(32) NOT NULL, + event_time DATETIME(6) NOT NULL, + step VARCHAR(128) NOT NULL DEFAULT '', + status VARCHAR(64) NOT NULL DEFAULT '', + level VARCHAR(16) NOT NULL DEFAULT 'INFO', + message TEXT NOT NULL, + attrs_json JSON NOT NULL, + INDEX idx_trace_messages_trace_id (trace_id), + INDEX idx_trace_messages_event_time (event_time), + INDEX idx_trace_messages_step (step), + INDEX idx_trace_messages_status (status), + CONSTRAINT fk_trace_messages_context + FOREIGN KEY (trace_id) REFERENCES trace_contexts(trace_id) + ON DELETE CASCADE +); diff --git a/src/config_manager/__init__.py b/src/config_manager/__init__.py index 46c4f50..1d29035 100644 --- a/src/config_manager/__init__.py +++ b/src/config_manager/__init__.py @@ -1,2 +1,5 @@ from .v2 import ConfigManagerV2 as ConfigManager -from .v2.core.log_manager import LogManager \ No newline at end of file +from .v2.core.log_manager import LogManager +from .v2.trace import TraceManager + +__all__ = ["ConfigManager", "LogManager", "TraceManager"] diff --git a/src/config_manager/v2/__init__.py b/src/config_manager/v2/__init__.py index 2eb2bed..f8913ea 100644 --- a/src/config_manager/v2/__init__.py +++ b/src/config_manager/v2/__init__.py @@ -2,5 +2,6 @@ Контракт: наследование от ConfigManagerV2, переопределение execute(), управление через API (config.yaml, секция management).""" from .core import ConfigManagerV2 +from .trace import TraceManager -__all__ = ["ConfigManagerV2"] +__all__ = ["ConfigManagerV2", "TraceManager"] diff --git a/src/config_manager/v2/trace/__init__.py b/src/config_manager/v2/trace/__init__.py new file mode 100644 index 0000000..1fbb617 --- /dev/null +++ b/src/config_manager/v2/trace/__init__.py @@ -0,0 +1,16 @@ +"""Public tracing API for config_manager-based applications.""" + +from .manager import TraceManager +from .models import TraceContextRecord, TraceLogMessage +from .store import ActiveTraceContext, TraceContextStore +from .transport.base import NoOpTraceTransport, TraceTransport + +__all__ = [ + "ActiveTraceContext", + "NoOpTraceTransport", + "TraceContextRecord", + "TraceContextStore", + "TraceLogMessage", + "TraceManager", + "TraceTransport", +] diff --git a/src/config_manager/v2/trace/manager.py b/src/config_manager/v2/trace/manager.py new file mode 100644 index 0000000..5954032 --- /dev/null +++ b/src/config_manager/v2/trace/manager.py @@ -0,0 +1,96 @@ +"""High-level tracing API independent from the logging module.""" + +from __future__ import annotations + +from contextlib import contextmanager +from typing import Any, Dict, Iterator, Optional +from uuid import uuid4 + +from .models import TraceContextRecord, TraceLogMessage, utc_now +from .store import TraceContextStore +from .transport.base import NoOpTraceTransport, TraceTransport + + +class TraceManager: + """Creates trace contexts and writes trace messages for active context.""" + + def __init__(self, transport: Optional[TraceTransport] = None, store: Optional[TraceContextStore] = None) -> None: + self.transport = transport or NoOpTraceTransport() + self.store = store or TraceContextStore() + + def bind_context( + self, + *, + alias: str, + parent_id: Optional[str] = None, + type: Optional[str] = None, + attrs: Optional[Dict[str, Any]] = None, + ) -> str: + """Create and activate a trace context, returning its trace identifier.""" + record = TraceContextRecord( + trace_id=uuid4().hex, + parent_id=parent_id, + alias=str(alias or ""), + type=str(type) if type is not None else None, + event_time=utc_now(), + attrs=dict(attrs or {}), + ) + self.store.push(record) + self.transport.write_context(record) + return record.trace_id + + @contextmanager + def open_context( + self, + *, + alias: str, + parent_id: Optional[str] = None, + type: Optional[str] = None, + attrs: Optional[Dict[str, Any]] = None, + ) -> Iterator[str]: + """Open nested trace context and restore previous one after exit.""" + trace_id = self.bind_context(alias=alias, parent_id=parent_id, type=type, attrs=attrs) + try: + yield trace_id + finally: + self.store.pop() + + def current_trace_id(self) -> Optional[str]: + """Return current active trace identifier.""" + return self.store.current_trace_id() + + def close_context(self) -> Optional[str]: + """Close current context and return restored parent trace id, if any.""" + previous = self.store.pop() + return previous.record.trace_id if previous else None + + def step(self, name: str) -> None: + """Set current step for subsequent messages.""" + self.store.set_step(name) + + def info(self, message: str, *, status: str, attrs: Optional[Dict[str, Any]] = None) -> None: + self._write_message(level="INFO", message=message, status=status, attrs=attrs) + + def warning(self, message: str, *, status: str, attrs: Optional[Dict[str, Any]] = None) -> None: + self._write_message(level="WARNING", message=message, status=status, attrs=attrs) + + def error(self, message: str, *, status: str, attrs: Optional[Dict[str, Any]] = None) -> None: + self._write_message(level="ERROR", message=message, status=status, attrs=attrs) + + def exception(self, message: str, *, status: str = "failed", attrs: Optional[Dict[str, Any]] = None) -> None: + self._write_message(level="ERROR", message=message, status=status, attrs=attrs) + + def _write_message(self, *, level: str, message: str, status: str, attrs: Optional[Dict[str, Any]]) -> None: + active = self.store.current() + if active is None: + raise RuntimeError("Trace context is not bound. Call bind_context() first.") + record = TraceLogMessage( + trace_id=active.record.trace_id, + event_time=utc_now(), + step=active.step, + status=str(status or ""), + level=level, + message=str(message or ""), + attrs=dict(attrs or {}), + ) + self.transport.write_message(record) diff --git a/src/config_manager/v2/trace/models.py b/src/config_manager/v2/trace/models.py new file mode 100644 index 0000000..3cd9179 --- /dev/null +++ b/src/config_manager/v2/trace/models.py @@ -0,0 +1,37 @@ +"""Data models for trace contexts and trace messages.""" + +from __future__ import annotations + +from dataclasses import dataclass, field +from datetime import datetime, timezone +from typing import Any, Dict, Optional + + +def utc_now() -> datetime: + """Return current UTC time with timezone for trace events.""" + return datetime.now(timezone.utc) + + +@dataclass(frozen=True) +class TraceContextRecord: + """Represents a logical unit that groups trace messages.""" + + trace_id: str + alias: str + parent_id: Optional[str] = None + type: Optional[str] = None + event_time: datetime = field(default_factory=utc_now) + attrs: Dict[str, Any] = field(default_factory=dict) + + +@dataclass(frozen=True) +class TraceLogMessage: + """Represents a single trace message linked to a trace context.""" + + trace_id: str + step: str + status: str + message: str + level: str + event_time: datetime = field(default_factory=utc_now) + attrs: Dict[str, Any] = field(default_factory=dict) diff --git a/src/config_manager/v2/trace/store.py b/src/config_manager/v2/trace/store.py new file mode 100644 index 0000000..4d2616d --- /dev/null +++ b/src/config_manager/v2/trace/store.py @@ -0,0 +1,70 @@ +"""Context-local storage for active trace contexts.""" + +from __future__ import annotations + +from contextvars import ContextVar +from dataclasses import dataclass, replace +from typing import Optional, Tuple + +from .models import TraceContextRecord + + +@dataclass(frozen=True) +class ActiveTraceContext: + """Stores current trace context and active processing step.""" + + record: TraceContextRecord + step: str = "" + + +class TraceContextStore: + """Keeps active trace context stack in the current execution context.""" + + def __init__(self) -> None: + self._current: ContextVar[Optional[ActiveTraceContext]] = ContextVar("trace_current", default=None) + self._stack: ContextVar[Tuple[ActiveTraceContext, ...]] = ContextVar("trace_stack", default=()) + + def current(self) -> Optional[ActiveTraceContext]: + """Return the current active trace context, if present.""" + return self._current.get() + + def current_trace_id(self) -> Optional[str]: + """Return current trace identifier, if present.""" + current = self.current() + return current.record.trace_id if current else None + + def current_step(self) -> str: + """Return current active step or empty string.""" + current = self.current() + return current.step if current else "" + + def push(self, record: TraceContextRecord) -> ActiveTraceContext: + """Activate a new trace context and preserve the previous one in stack.""" + current = self.current() + stack = self._stack.get() + if current is not None: + stack = stack + (current,) + self._stack.set(stack) + active = ActiveTraceContext(record=record) + self._current.set(active) + return active + + def pop(self) -> Optional[ActiveTraceContext]: + """Restore the previous trace context from stack.""" + stack = self._stack.get() + if not stack: + self._current.set(None) + return None + previous = stack[-1] + self._stack.set(stack[:-1]) + self._current.set(previous) + return previous + + def set_step(self, step: str) -> Optional[ActiveTraceContext]: + """Assign step to current active context.""" + current = self.current() + if current is None: + return None + updated = replace(current, step=str(step or "")) + self._current.set(updated) + return updated diff --git a/src/config_manager/v2/trace/transport/__init__.py b/src/config_manager/v2/trace/transport/__init__.py new file mode 100644 index 0000000..54a231b --- /dev/null +++ b/src/config_manager/v2/trace/transport/__init__.py @@ -0,0 +1,5 @@ +"""Trace transports.""" + +from .base import NoOpTraceTransport, TraceTransport + +__all__ = ["NoOpTraceTransport", "TraceTransport"] diff --git a/src/config_manager/v2/trace/transport/base.py b/src/config_manager/v2/trace/transport/base.py new file mode 100644 index 0000000..26eb8ac --- /dev/null +++ b/src/config_manager/v2/trace/transport/base.py @@ -0,0 +1,27 @@ +"""Transport interfaces for trace persistence.""" + +from __future__ import annotations + +from typing import Protocol + +from ..models import TraceContextRecord, TraceLogMessage + + +class TraceTransport(Protocol): + """Writes trace records to an external destination.""" + + def write_context(self, record: TraceContextRecord) -> None: + """Persist trace context record.""" + + def write_message(self, record: TraceLogMessage) -> None: + """Persist trace log message.""" + + +class NoOpTraceTransport: + """Default transport that ignores all trace records.""" + + def write_context(self, record: TraceContextRecord) -> None: + return None + + def write_message(self, record: TraceLogMessage) -> None: + return None diff --git a/src/config_manager/v2/trace/transport/mysql.py b/src/config_manager/v2/trace/transport/mysql.py new file mode 100644 index 0000000..1300350 --- /dev/null +++ b/src/config_manager/v2/trace/transport/mysql.py @@ -0,0 +1,97 @@ +"""MySQL transport for trace contexts and trace messages.""" + +from __future__ import annotations + +import json +from datetime import datetime, timezone +from typing import Any, Dict, Optional + +import pymysql + +from ..models import TraceContextRecord, TraceLogMessage + + +class MySqlTraceTransport: + """Persists trace records into dedicated MySQL tables.""" + + TRACE_CONTEXTS_TABLE = "trace_contexts" + TRACE_MESSAGES_TABLE = "trace_messages" + + def __init__( + self, + *, + host: str, + port: int, + database: str, + user: str, + password: str, + connect_timeout: int = 5, + charset: str = "utf8mb4", + ) -> None: + self.host = host + self.port = int(port) + self.database = database + self.user = user + self.password = password + self.connect_timeout = connect_timeout + self.charset = charset + + def write_context(self, record: TraceContextRecord) -> None: + query = ( + f"INSERT INTO {self.TRACE_CONTEXTS_TABLE} " + "(trace_id, parent_id, alias, type, event_time, attrs_json) " + "VALUES (%s, %s, %s, %s, %s, %s)" + ) + params = ( + record.trace_id, + record.parent_id, + record.alias, + record.type, + self._normalize_time(record.event_time), + self._serialize_attrs(record.attrs), + ) + self._execute(query, params) + + def write_message(self, record: TraceLogMessage) -> None: + query = ( + f"INSERT INTO {self.TRACE_MESSAGES_TABLE} " + "(trace_id, event_time, step, status, level, message, attrs_json) " + "VALUES (%s, %s, %s, %s, %s, %s, %s)" + ) + params = ( + record.trace_id, + self._normalize_time(record.event_time), + record.step, + record.status, + record.level, + record.message, + self._serialize_attrs(record.attrs), + ) + self._execute(query, params) + + def _execute(self, query: str, params: tuple[Any, ...]) -> None: + connection = pymysql.connect( + host=self.host, + port=self.port, + user=self.user, + password=self.password, + database=self.database, + charset=self.charset, + connect_timeout=self.connect_timeout, + autocommit=True, + ) + try: + with connection.cursor() as cursor: + cursor.execute(query, params) + finally: + connection.close() + + @staticmethod + def _serialize_attrs(attrs: Optional[Dict[str, Any]]) -> str: + return json.dumps(attrs or {}, ensure_ascii=False, default=str, sort_keys=True) + + @staticmethod + def _normalize_time(value: datetime) -> datetime: + if value.tzinfo is None: + return value + return value.astimezone(timezone.utc).replace(tzinfo=None) diff --git a/tests/v2/test_trace_manager.py b/tests/v2/test_trace_manager.py new file mode 100644 index 0000000..fa2b1f9 --- /dev/null +++ b/tests/v2/test_trace_manager.py @@ -0,0 +1,79 @@ +from __future__ import annotations + +from dataclasses import dataclass, field + +from config_manager.v2.trace import TraceManager + + +@dataclass +class MemoryTraceTransport: + contexts: list = field(default_factory=list) + messages: list = field(default_factory=list) + + def write_context(self, record) -> None: + self.contexts.append(record) + + def write_message(self, record) -> None: + self.messages.append(record) + + +def test_bind_context_writes_context_and_returns_trace_id(): + transport = MemoryTraceTransport() + trace = TraceManager(transport=transport) + + trace_id = trace.bind_context(alias="email-1", type="email", attrs={"message_id": "abc"}) + + assert trace_id + assert transport.contexts[0].trace_id == trace_id + assert transport.contexts[0].alias == "email-1" + assert transport.contexts[0].type == "email" + assert transport.contexts[0].attrs == {"message_id": "abc"} + + +def test_open_context_restores_parent_after_exit(): + transport = MemoryTraceTransport() + trace = TraceManager(transport=transport) + + parent_id = trace.bind_context(alias="email-1", type="email") + with trace.open_context(alias="order.xlsx", type="attachment") as child_id: + assert trace.current_trace_id() == child_id + + assert trace.current_trace_id() == parent_id + + +def test_messages_use_current_step_and_attrs(): + transport = MemoryTraceTransport() + trace = TraceManager(transport=transport) + + trace.bind_context(alias="email-1", type="email") + trace.step("parse_email") + trace.info("Письмо распарсено", status="completed", attrs={"attachments_count": 2}) + + message = transport.messages[0] + assert message.step == "parse_email" + assert message.status == "completed" + assert message.level == "INFO" + assert message.message == "Письмо распарсено" + assert message.attrs == {"attachments_count": 2} + + +def test_bind_context_keeps_parent_empty_by_default(): + transport = MemoryTraceTransport() + trace = TraceManager(transport=transport) + + parent_id = trace.bind_context(alias="email-1", type="email") + child_id = trace.bind_context(alias="order.xlsx", type="attachment") + + assert child_id != parent_id + assert transport.contexts[-1].parent_id is None + + +def test_bind_context_uses_explicit_parent_id(): + transport = MemoryTraceTransport() + trace = TraceManager(transport=transport) + + parent_id = trace.bind_context(alias="email-1", type="email") + child_id = trace.bind_context(alias="order.xlsx", type="attachment", parent_id=parent_id) + + assert child_id != parent_id + assert transport.contexts[-1].parent_id == parent_id