Compare commits
10 Commits
a1dd495d6d
...
master
| Author | SHA1 | Date | |
|---|---|---|---|
| 0f984fa42a | |||
| 1b287a9550 | |||
| bfc960233b | |||
| 01715b76f8 | |||
| 8e023d304f | |||
| 06d9ef2868 | |||
| d239454cfb | |||
| 54f9435f1d | |||
| 170c81fc5b | |||
| 2b43e2f86a |
195
README.md
195
README.md
@@ -1,6 +1,11 @@
|
|||||||
# Config Manager
|
# Config Manager
|
||||||
## Описание
|
## Описание
|
||||||
Пакет предназначен для запуска приложений с периодическим выполнением логики, перезагрузкой конфига и управлением по HTTP API.
|
Пакет предназначен как базовое приложение для проектов, в которых нужно периодически запускать одну и ту же функцию в одном потоке, с возможностью перезагрузки конфига и сервисным контуром вокруг прикладной логики.
|
||||||
|
|
||||||
|
Под сервисным контуром здесь понимаются:
|
||||||
|
- логирование;
|
||||||
|
- трассировка бизнес-процессов и связанных сущностей;
|
||||||
|
- управление приложением через каналы управления (например, HTTP API).
|
||||||
|
|
||||||
**Контракт:** приложение наследует **ConfigManagerV2**, переопределяет **execute()** (периодическая работа). Управление (старт/стоп, health) — через каналы, которые создаются снаружи и передаются в конструктор в **control_channels** (в т.ч. HttpControlChannel для API).
|
**Контракт:** приложение наследует **ConfigManagerV2**, переопределяет **execute()** (периодическая работа). Управление (старт/стоп, health) — через каналы, которые создаются снаружи и передаются в конструктор в **control_channels** (в т.ч. HttpControlChannel для API).
|
||||||
|
|
||||||
@@ -12,6 +17,7 @@
|
|||||||
- **ConfigLoader** — читает конфиг из файла (YAML/JSON), считает хеш и отдаёт конфиг только при изменении; при ошибке парсинга возвращает последний валидный конфиг.
|
- **ConfigLoader** — читает конфиг из файла (YAML/JSON), считает хеш и отдаёт конфиг только при изменении; при ошибке парсинга возвращает последний валидный конфиг.
|
||||||
- **WorkerLoop** — в отдельном потоке циклически вызывает ваш метод `execute()` с паузой между вызовами; реагирует на событие остановки и колбэки успеха/ошибки.
|
- **WorkerLoop** — в отдельном потоке циклически вызывает ваш метод `execute()` с паузой между вызовами; реагирует на событие остановки и колбэки успеха/ошибки.
|
||||||
- **LogManager** — применяет секцию `log` из конфига к логированию (dictConfig).
|
- **LogManager** — применяет секцию `log` из конфига к логированию (dictConfig).
|
||||||
|
- **TraceManager** — управляет структурированной трассировкой процессов, контекстов и сообщений.
|
||||||
- **HealthAggregator** — собирает состояние: жизненный цикл (idle/starting/running/…), время последнего успешного `execute()` и таймаут здоровья; формирует единый ответ для health (ok/unhealthy).
|
- **HealthAggregator** — собирает состояние: жизненный цикл (idle/starting/running/…), время последнего успешного `execute()` и таймаут здоровья; формирует единый ответ для health (ok/unhealthy).
|
||||||
- **ControlChannelBridge** — один мост для всех каналов: обработчики on_start/on_stop/on_status (сброс/установка halt, текст статуса).
|
- **ControlChannelBridge** — один мост для всех каналов: обработчики on_start/on_stop/on_status (сброс/установка halt, текст статуса).
|
||||||
|
|
||||||
@@ -134,6 +140,54 @@ classDiagram
|
|||||||
+apply_config(config) None
|
+apply_config(config) None
|
||||||
}
|
}
|
||||||
|
|
||||||
|
class TraceManager {
|
||||||
|
+bind_context(alias, parent_id, type, attrs) str
|
||||||
|
+open_context(alias, parent_id, type, attrs) contextmanager
|
||||||
|
+current_trace_id() str
|
||||||
|
+step(name) None
|
||||||
|
+info(message, status, attrs) None
|
||||||
|
+warning(message, status, attrs) None
|
||||||
|
+error(message, status, attrs) None
|
||||||
|
}
|
||||||
|
|
||||||
|
class TraceContextStore {
|
||||||
|
+current() ActiveTraceContext
|
||||||
|
+current_trace_id() str
|
||||||
|
+push(record) ActiveTraceContext
|
||||||
|
+pop() ActiveTraceContext
|
||||||
|
+set_step(step) ActiveTraceContext
|
||||||
|
}
|
||||||
|
|
||||||
|
class TraceContextRecord {
|
||||||
|
+str trace_id
|
||||||
|
+str parent_id
|
||||||
|
+str alias
|
||||||
|
+str type
|
||||||
|
+datetime event_time
|
||||||
|
+dict attrs
|
||||||
|
}
|
||||||
|
|
||||||
|
class TraceLogMessage {
|
||||||
|
+str trace_id
|
||||||
|
+str step
|
||||||
|
+str status
|
||||||
|
+str level
|
||||||
|
+str message
|
||||||
|
+datetime event_time
|
||||||
|
+dict attrs
|
||||||
|
}
|
||||||
|
|
||||||
|
class TraceTransport {
|
||||||
|
<<protocol>>
|
||||||
|
+write_context(record) None
|
||||||
|
+write_message(record) None
|
||||||
|
}
|
||||||
|
|
||||||
|
class MySqlTraceTransport {
|
||||||
|
+write_context(record) None
|
||||||
|
+write_message(record) None
|
||||||
|
}
|
||||||
|
|
||||||
class ControlChannel {
|
class ControlChannel {
|
||||||
<<абстрактный>>
|
<<абстрактный>>
|
||||||
+start(on_start, on_stop, on_status) async*
|
+start(on_start, on_stop, on_status) async*
|
||||||
@@ -182,10 +236,16 @@ classDiagram
|
|||||||
ConfigManagerV2 --|> _RuntimeController : наследует
|
ConfigManagerV2 --|> _RuntimeController : наследует
|
||||||
ConfigManagerV2 --> ConfigLoader : использует
|
ConfigManagerV2 --> ConfigLoader : использует
|
||||||
ConfigManagerV2 --> LogManager : использует
|
ConfigManagerV2 --> LogManager : использует
|
||||||
|
ConfigManagerV2 --> TraceManager : использует
|
||||||
ConfigManagerV2 --> HealthAggregator : использует
|
ConfigManagerV2 --> HealthAggregator : использует
|
||||||
ConfigManagerV2 --> ControlChannelBridge : использует
|
ConfigManagerV2 --> ControlChannelBridge : использует
|
||||||
ConfigManagerV2 ..> ControlChannel : список каналов
|
ConfigManagerV2 ..> ControlChannel : список каналов
|
||||||
_RuntimeController ..> WorkerLoop : создаёт в _worker_loop
|
_RuntimeController ..> WorkerLoop : создаёт в _worker_loop
|
||||||
|
TraceManager --> TraceContextStore : использует
|
||||||
|
TraceManager --> TraceTransport : использует
|
||||||
|
TraceManager ..> TraceContextRecord : создаёт
|
||||||
|
TraceManager ..> TraceLogMessage : создаёт
|
||||||
|
MySqlTraceTransport --|> TraceTransport : реализует
|
||||||
TelegramControlChannel --|> ControlChannel : реализует
|
TelegramControlChannel --|> ControlChannel : реализует
|
||||||
HttpControlChannel --|> ControlChannel : реализует
|
HttpControlChannel --|> ControlChannel : реализует
|
||||||
HttpControlChannel --> UvicornServerRunner : использует
|
HttpControlChannel --> UvicornServerRunner : использует
|
||||||
@@ -193,6 +253,51 @@ classDiagram
|
|||||||
ControlChannelBridge ..> ControlChannel : on_start, on_stop, on_status
|
ControlChannelBridge ..> ControlChannel : on_start, on_stop, on_status
|
||||||
```
|
```
|
||||||
|
|
||||||
|
## Диаграмма последовательности (запуск и работа)
|
||||||
|
|
||||||
|
```mermaid
|
||||||
|
sequenceDiagram
|
||||||
|
autonumber
|
||||||
|
participant User
|
||||||
|
participant ConfigManagerV2
|
||||||
|
participant ControlChannel as ControlChannel(s)
|
||||||
|
participant WorkerLoop
|
||||||
|
participant ConfigLoader
|
||||||
|
participant Client as HTTP Client
|
||||||
|
|
||||||
|
User->>ConfigManagerV2: start()
|
||||||
|
ConfigManagerV2->>ConfigManagerV2: _start_control_channels()
|
||||||
|
ConfigManagerV2->>ControlChannel: start(on_start, on_stop, on_status)
|
||||||
|
ControlChannel-->>ConfigManagerV2: started
|
||||||
|
|
||||||
|
par Циклы работы
|
||||||
|
ConfigManagerV2->>WorkerLoop: run()
|
||||||
|
loop Периодически
|
||||||
|
WorkerLoop->>ConfigManagerV2: execute()
|
||||||
|
ConfigManagerV2-->>WorkerLoop: success/error
|
||||||
|
end
|
||||||
|
and
|
||||||
|
loop Периодически
|
||||||
|
ConfigManagerV2->>ConfigLoader: load_if_changed()
|
||||||
|
ConfigLoader-->>ConfigManagerV2: config / unchanged
|
||||||
|
end
|
||||||
|
end
|
||||||
|
|
||||||
|
Note over Client,ControlChannel: Запросы по HTTP API (если HttpControlChannel)
|
||||||
|
|
||||||
|
Client->>ControlChannel: GET /health
|
||||||
|
ControlChannel->>ConfigManagerV2: health_provider.collect()
|
||||||
|
ConfigManagerV2-->>ControlChannel: HealthPayload
|
||||||
|
ControlChannel-->>Client: 200 OK
|
||||||
|
|
||||||
|
Client->>ControlChannel: POST /actions/stop
|
||||||
|
ControlChannel->>ConfigManagerV2: on_stop() (bridge)
|
||||||
|
ConfigManagerV2->>ConfigManagerV2: set halt
|
||||||
|
ConfigManagerV2->>WorkerLoop: halt_event.set()
|
||||||
|
ConfigManagerV2->>ControlChannel: stop()
|
||||||
|
ControlChannel-->>Client: 200 OK
|
||||||
|
```
|
||||||
|
|
||||||
## Логирование
|
## Логирование
|
||||||
Логирование настраивается из конфигурационного файла только если в нём есть секция **`log`** в формате [dictConfig](https://docs.python.org/3/library/logging.config.html#logging.config.dictConfig). Если секции `log` нет, менеджер пишет предупреждение в лог, а уровень Python по умолчанию (WARNING) сохраняется — сообщения INFO/DEBUG могут не отображаться.
|
Логирование настраивается из конфигурационного файла только если в нём есть секция **`log`** в формате [dictConfig](https://docs.python.org/3/library/logging.config.html#logging.config.dictConfig). Если секции `log` нет, менеджер пишет предупреждение в лог, а уровень Python по умолчанию (WARNING) сохраняется — сообщения INFO/DEBUG могут не отображаться.
|
||||||
|
|
||||||
@@ -201,11 +306,95 @@ classDiagram
|
|||||||
- Убедитесь, что в конфиге есть ключ `log` с `version: 1`, `handlers` и `loggers` (пример — `tests/config.yaml`).
|
- Убедитесь, что в конфиге есть ключ `log` с `version: 1`, `handlers` и `loggers` (пример — `tests/config.yaml`).
|
||||||
- После старта в логе должно появиться сообщение уровня INFO: `"Logging configuration applied"` (из `config_manager.v2.core.log_manager`). Если его нет, либо секция `log` отсутствует (будет предупреждение), либо уровень root/пакета выше INFO.
|
- После старта в логе должно появиться сообщение уровня INFO: `"Logging configuration applied"` (из `config_manager.v2.core.log_manager`). Если его нет, либо секция `log` отсутствует (будет предупреждение), либо уровень root/пакета выше INFO.
|
||||||
|
|
||||||
|
## Trace
|
||||||
|
Модуль `trace` предназначен для структурированной трассировки прикладных процессов и иерархически связанных сущностей.
|
||||||
|
|
||||||
|
Базовая идея:
|
||||||
|
- есть `TraceContextRecord` — логический контекст, который группирует сообщения;
|
||||||
|
- есть `TraceLogMessage` — отдельное событие внутри текущего контекста;
|
||||||
|
- контексты могут быть вложенными: один родительский контекст и много дочерних;
|
||||||
|
- активный контекст хранится в `TraceContextStore`, а при выходе из дочернего `with` автоматически восстанавливается родитель.
|
||||||
|
|
||||||
|
### Архитектура
|
||||||
|
Основные части модуля:
|
||||||
|
- `TraceManager` — публичный API для приложений;
|
||||||
|
- `TraceContextStore` — хранение активного контекста и стека вложенности;
|
||||||
|
- `TraceContextRecord` — описание контекста;
|
||||||
|
- `TraceLogMessage` — описание сообщения;
|
||||||
|
- `TraceTransport` — интерфейс транспорта;
|
||||||
|
- `MySqlTraceTransport` — запись контекстов и сообщений в MySQL.
|
||||||
|
|
||||||
|
Сущности:
|
||||||
|
|
||||||
|
`TraceContextRecord`
|
||||||
|
- `trace_id`
|
||||||
|
- `parent_id`
|
||||||
|
- `alias`
|
||||||
|
- `type`
|
||||||
|
- `event_time`
|
||||||
|
- `attrs`
|
||||||
|
|
||||||
|
`TraceLogMessage`
|
||||||
|
- `trace_id`
|
||||||
|
- `event_time`
|
||||||
|
- `step`
|
||||||
|
- `status`
|
||||||
|
- `level`
|
||||||
|
- `message`
|
||||||
|
- `attrs`
|
||||||
|
|
||||||
|
### Принцип использования
|
||||||
|
1. На старте процесса создаётся контекст через `bind_context()` или `open_context()`.
|
||||||
|
2. Для серии сообщений выставляется текущий `step()`.
|
||||||
|
3. Сообщения пишутся через `info()/warning()/error()/exception()`.
|
||||||
|
4. При использовании `open_context()` дочерний контекст автоматически закрывается по выходу из `with`, а родительский становится текущим снова.
|
||||||
|
|
||||||
|
### Пример: корневой контекст
|
||||||
|
```python
|
||||||
|
from config_manager.v2.trace import TraceManager
|
||||||
|
|
||||||
|
trace = TraceManager()
|
||||||
|
|
||||||
|
trace_id = trace.bind_context(
|
||||||
|
alias="job-123",
|
||||||
|
type="task",
|
||||||
|
attrs={"source": "scheduler"},
|
||||||
|
)
|
||||||
|
|
||||||
|
trace.step("prepare")
|
||||||
|
trace.info("Подготовка завершена", status="completed", attrs={"items_count": 2})
|
||||||
|
```
|
||||||
|
|
||||||
|
### Пример: дочерний контекст
|
||||||
|
```python
|
||||||
|
with trace.open_context(
|
||||||
|
alias="subtask-1",
|
||||||
|
type="subtask",
|
||||||
|
parent_id=trace_id,
|
||||||
|
attrs={"segment": "phase-a"},
|
||||||
|
) as child_trace_id:
|
||||||
|
trace.step("execute")
|
||||||
|
trace.info("Подзадача запущена", status="started")
|
||||||
|
trace.info("Подзадача завершена", status="completed", attrs={"duration_ms": 120})
|
||||||
|
|
||||||
|
# Здесь снова активен родительский контекст
|
||||||
|
trace.step("finish")
|
||||||
|
trace.info("Обработка завершена", status="completed")
|
||||||
|
```
|
||||||
|
|
||||||
|
### Хранение в MySQL
|
||||||
|
Для MySQL предусмотрен `MySqlTraceTransport`. Он пишет две сущности в отдельные таблицы:
|
||||||
|
- `trace_contexts`
|
||||||
|
- `trace_messages`
|
||||||
|
|
||||||
|
Это позволяет:
|
||||||
|
- отдельно хранить структуру процесса;
|
||||||
|
- отдельно хранить историю шагов и сообщений;
|
||||||
|
- строить отчёты и трассировку без завязки на `logging`.
|
||||||
|
|
||||||
## Установка
|
## Установка
|
||||||
``pip install git+https://git.lesha.spb.ru/alex/config_manager.git``
|
``pip install git+https://git.lesha.spb.ru/alex/config_manager.git``
|
||||||
|
|
||||||
## Контакты
|
## Контакты
|
||||||
- **e-mail**: lesha.spb@gmail.com
|
- **e-mail**: lesha.spb@gmail.com
|
||||||
- **telegram**: https://t.me/lesha_spb
|
- **telegram**: https://t.me/lesha_spb
|
||||||
|
|
||||||
|
|
||||||
|
|||||||
@@ -4,8 +4,8 @@ build-backend = "setuptools.build_meta"
|
|||||||
|
|
||||||
[project]
|
[project]
|
||||||
name = "config_manager"
|
name = "config_manager"
|
||||||
version = "2.1.7"
|
version = "2.3.0"
|
||||||
description = "Фикс вечного цикла при ошибке"
|
description = "Дoбавлен пакет trace"
|
||||||
authors = [
|
authors = [
|
||||||
{ name = "Aleksei Zosimov", email = "lesha.spb@gmail.com" }
|
{ name = "Aleksei Zosimov", email = "lesha.spb@gmail.com" }
|
||||||
]
|
]
|
||||||
@@ -15,6 +15,7 @@ dependencies = [
|
|||||||
"PyYAML>=6.0",
|
"PyYAML>=6.0",
|
||||||
"fastapi>=0.100.0",
|
"fastapi>=0.100.0",
|
||||||
"uvicorn[standard]>=0.22.0",
|
"uvicorn[standard]>=0.22.0",
|
||||||
|
"PyMySQL>=1.1.0",
|
||||||
]
|
]
|
||||||
|
|
||||||
[project.urls]
|
[project.urls]
|
||||||
@@ -23,4 +24,4 @@ Documentation = "https://git.lesha.spb.ru/alex/config_manager"
|
|||||||
Repository = "https://git.lesha.spb.ru/alex/config_manager"
|
Repository = "https://git.lesha.spb.ru/alex/config_manager"
|
||||||
|
|
||||||
[tool.setuptools.packages.find]
|
[tool.setuptools.packages.find]
|
||||||
where = ["src"]
|
where = ["src"]
|
||||||
|
|||||||
30
scripts/init_trace_mysql.sql
Normal file
30
scripts/init_trace_mysql.sql
Normal file
@@ -0,0 +1,30 @@
|
|||||||
|
CREATE TABLE IF NOT EXISTS trace_contexts
|
||||||
|
(
|
||||||
|
trace_id CHAR(32) PRIMARY KEY,
|
||||||
|
parent_id CHAR(32) NULL,
|
||||||
|
alias VARCHAR(255) NOT NULL,
|
||||||
|
type VARCHAR(64) NULL,
|
||||||
|
event_time DATETIME(6) NOT NULL,
|
||||||
|
attrs_json JSON NOT NULL,
|
||||||
|
INDEX idx_trace_contexts_parent_id (parent_id),
|
||||||
|
INDEX idx_trace_contexts_event_time (event_time)
|
||||||
|
);
|
||||||
|
|
||||||
|
CREATE TABLE IF NOT EXISTS trace_messages
|
||||||
|
(
|
||||||
|
id BIGINT UNSIGNED NOT NULL AUTO_INCREMENT PRIMARY KEY,
|
||||||
|
trace_id CHAR(32) NOT NULL,
|
||||||
|
event_time DATETIME(6) NOT NULL,
|
||||||
|
step VARCHAR(128) NOT NULL DEFAULT '',
|
||||||
|
status VARCHAR(64) NOT NULL DEFAULT '',
|
||||||
|
level VARCHAR(16) NOT NULL DEFAULT 'INFO',
|
||||||
|
message TEXT NOT NULL,
|
||||||
|
attrs_json JSON NOT NULL,
|
||||||
|
INDEX idx_trace_messages_trace_id (trace_id),
|
||||||
|
INDEX idx_trace_messages_event_time (event_time),
|
||||||
|
INDEX idx_trace_messages_step (step),
|
||||||
|
INDEX idx_trace_messages_status (status),
|
||||||
|
CONSTRAINT fk_trace_messages_context
|
||||||
|
FOREIGN KEY (trace_id) REFERENCES trace_contexts(trace_id)
|
||||||
|
ON DELETE CASCADE
|
||||||
|
);
|
||||||
@@ -1,2 +1,5 @@
|
|||||||
from .v2 import ConfigManagerV2 as ConfigManager
|
from .v2 import ConfigManagerV2 as ConfigManager
|
||||||
from .v2.core.log_manager import LogManager
|
from .v2.core.log_manager import LogManager
|
||||||
|
from .v2.trace import TraceManager
|
||||||
|
|
||||||
|
__all__ = ["ConfigManager", "LogManager", "TraceManager"]
|
||||||
|
|||||||
@@ -2,5 +2,6 @@
|
|||||||
|
|
||||||
Контракт: наследование от ConfigManagerV2, переопределение execute(), управление через API (config.yaml, секция management)."""
|
Контракт: наследование от ConfigManagerV2, переопределение execute(), управление через API (config.yaml, секция management)."""
|
||||||
from .core import ConfigManagerV2
|
from .core import ConfigManagerV2
|
||||||
|
from .trace import TraceManager
|
||||||
|
|
||||||
__all__ = ["ConfigManagerV2"]
|
__all__ = ["ConfigManagerV2", "TraceManager"]
|
||||||
|
|||||||
@@ -154,8 +154,8 @@ class HttpControlChannel(ControlChannel):
|
|||||||
self._on_status: Optional[StatusHandler] = None
|
self._on_status: Optional[StatusHandler] = None
|
||||||
self._runner = UvicornServerRunner(host=host, port=port, timeout=timeout)
|
self._runner = UvicornServerRunner(host=host, port=port, timeout=timeout)
|
||||||
self._app: Optional[FastAPI] = None
|
self._app: Optional[FastAPI] = None
|
||||||
logger.debug(
|
logger.warning(
|
||||||
"HttpControlChannel.__init__ result: host=%s port=%s timeout=%s",
|
"HttpControlChannel запушщен: host=%s port=%s timeout=%s",
|
||||||
host,
|
host,
|
||||||
port,
|
port,
|
||||||
timeout,
|
timeout,
|
||||||
@@ -169,7 +169,7 @@ class HttpControlChannel(ControlChannel):
|
|||||||
started = time.monotonic()
|
started = time.monotonic()
|
||||||
response = await call_next(request)
|
response = await call_next(request)
|
||||||
duration_ms = int((time.monotonic() - started) * 1000)
|
duration_ms = int((time.monotonic() - started) * 1000)
|
||||||
logger.info(
|
logger.debug(
|
||||||
"API call: method=%s path=%s status=%s duration_ms=%s",
|
"API call: method=%s path=%s status=%s duration_ms=%s",
|
||||||
request.method,
|
request.method,
|
||||||
request.url.path,
|
request.url.path,
|
||||||
@@ -230,6 +230,7 @@ class HttpControlChannel(ControlChannel):
|
|||||||
"HttpControlChannel._action_response result: action=%s status_code=404 detail=handler not configured",
|
"HttpControlChannel._action_response result: action=%s status_code=404 detail=handler not configured",
|
||||||
action,
|
action,
|
||||||
)
|
)
|
||||||
|
logger.warning("control command: action=%s status=handler_not_configured", action)
|
||||||
return JSONResponse(
|
return JSONResponse(
|
||||||
content={"status": "error", "detail": f"{action} handler is not configured"},
|
content={"status": "error", "detail": f"{action} handler is not configured"},
|
||||||
status_code=404,
|
status_code=404,
|
||||||
@@ -243,15 +244,18 @@ class HttpControlChannel(ControlChannel):
|
|||||||
action,
|
action,
|
||||||
detail,
|
detail,
|
||||||
)
|
)
|
||||||
|
logger.warning("control command: action=%s status=ok", action)
|
||||||
return JSONResponse(content={"status": "ok", "detail": detail}, status_code=200)
|
return JSONResponse(content={"status": "ok", "detail": detail}, status_code=200)
|
||||||
except asyncio.TimeoutError:
|
except asyncio.TimeoutError:
|
||||||
logger.warning("HttpControlChannel._action_response timeout: action=%s timeout=%s", action, self._timeout)
|
logger.debug("HttpControlChannel._action_response timeout: action=%s timeout=%s", action, self._timeout)
|
||||||
|
logger.warning("control command: action=%s status=timeout", action)
|
||||||
return JSONResponse(
|
return JSONResponse(
|
||||||
content={"status": "error", "detail": f"{action} handler did not respond within {self._timeout}s"},
|
content={"status": "error", "detail": f"{action} handler did not respond within {self._timeout}s"},
|
||||||
status_code=504,
|
status_code=504,
|
||||||
)
|
)
|
||||||
except Exception as exc: # noqa: BLE001
|
except Exception as exc: # noqa: BLE001
|
||||||
logger.exception("HttpControlChannel._action_response error: action=%s", action)
|
logger.debug("HttpControlChannel._action_response error: action=%s", action, exc_info=True)
|
||||||
|
logger.warning("control command: action=%s status=error", action)
|
||||||
return JSONResponse(content={"status": "error", "detail": str(exc)}, status_code=500)
|
return JSONResponse(content={"status": "error", "detail": str(exc)}, status_code=500)
|
||||||
|
|
||||||
async def start(
|
async def start(
|
||||||
|
|||||||
@@ -62,7 +62,7 @@ class TelegramControlChannel(ControlChannel):
|
|||||||
for update in updates:
|
for update in updates:
|
||||||
await self._process_update(update)
|
await self._process_update(update)
|
||||||
except Exception as exc: # noqa: BLE001
|
except Exception as exc: # noqa: BLE001
|
||||||
self._logger.warning("Telegram polling error: %s", exc)
|
self._logger.debug("Telegram polling error: %s", exc, exc_info=True)
|
||||||
|
|
||||||
try:
|
try:
|
||||||
await asyncio.wait_for(self._stop_event.wait(), timeout=max(self._poll_interval, 0.1))
|
await asyncio.wait_for(self._stop_event.wait(), timeout=max(self._poll_interval, 0.1))
|
||||||
@@ -94,16 +94,32 @@ class TelegramControlChannel(ControlChannel):
|
|||||||
if chat_id != self._chat_id:
|
if chat_id != self._chat_id:
|
||||||
return
|
return
|
||||||
|
|
||||||
if text in {"/start", "/run"} and self._on_start is not None:
|
action: Optional[str]
|
||||||
reply = await self._on_start()
|
callback: Optional[StartHandler | StopHandler | StatusHandler]
|
||||||
elif text in {"/stop", "/halt"} and self._on_stop is not None:
|
if text in {"/start", "/run"}:
|
||||||
reply = await self._on_stop()
|
action = "start"
|
||||||
elif text in {"/status", "/health"} and self._on_status is not None:
|
callback = self._on_start
|
||||||
reply = await self._on_status()
|
elif text in {"/stop", "/halt"}:
|
||||||
|
action = "stop"
|
||||||
|
callback = self._on_stop
|
||||||
|
elif text in {"/status", "/health"}:
|
||||||
|
action = "status"
|
||||||
|
callback = self._on_status
|
||||||
else:
|
else:
|
||||||
return
|
return
|
||||||
|
|
||||||
await asyncio.to_thread(self._send_message, reply)
|
if callback is None:
|
||||||
|
self._logger.warning("control command: action=%s status=handler_not_configured", action)
|
||||||
|
return
|
||||||
|
try:
|
||||||
|
reply = await callback()
|
||||||
|
await asyncio.to_thread(self._send_message, reply)
|
||||||
|
self._logger.warning("control command: action=%s status=ok", action)
|
||||||
|
except asyncio.TimeoutError:
|
||||||
|
self._logger.warning("control command: action=%s status=timeout", action)
|
||||||
|
except Exception:
|
||||||
|
self._logger.debug("Telegram control command error: action=%s", action, exc_info=True)
|
||||||
|
self._logger.warning("control command: action=%s status=error", action)
|
||||||
|
|
||||||
def _send_message(self, text: str) -> None:
|
def _send_message(self, text: str) -> None:
|
||||||
"""Отправить текстовый ответ в настроенный чат Telegram."""
|
"""Отправить текстовый ответ в настроенный чат Telegram."""
|
||||||
|
|||||||
@@ -55,14 +55,43 @@ def _read_env_optional_float(name: str, default_value: Optional[float]) -> Optio
|
|||||||
return default_value
|
return default_value
|
||||||
|
|
||||||
|
|
||||||
|
def _read_env_health_timeout(default_value: float) -> float:
|
||||||
|
"""Read health timeout from env."""
|
||||||
|
env_name = "HEALTH_TIMEOUT"
|
||||||
|
raw_value = os.environ.get(env_name)
|
||||||
|
if raw_value is None:
|
||||||
|
return default_value
|
||||||
|
try:
|
||||||
|
parsed = float(raw_value)
|
||||||
|
if parsed <= 0:
|
||||||
|
raise ValueError(f"{env_name} must be greater than zero")
|
||||||
|
return parsed
|
||||||
|
except Exception: # noqa: BLE001
|
||||||
|
logger.exception(
|
||||||
|
"ConfigManagerV2 health timeout parse error: env=%s raw_value=%s fallback=%s",
|
||||||
|
env_name,
|
||||||
|
raw_value,
|
||||||
|
default_value,
|
||||||
|
)
|
||||||
|
return default_value
|
||||||
|
|
||||||
|
|
||||||
class _RuntimeController:
|
class _RuntimeController:
|
||||||
"""Runtime loops and lifecycle supervision."""
|
"""Runtime loops and lifecycle supervision."""
|
||||||
|
|
||||||
CONTROL_CHANNEL_TIMEOUT = 5.0
|
CONTROL_CHANNEL_TIMEOUT = 5.0
|
||||||
|
|
||||||
|
def _trigger_health_transition_check(self) -> None:
|
||||||
|
try:
|
||||||
|
loop = asyncio.get_running_loop()
|
||||||
|
except RuntimeError:
|
||||||
|
return
|
||||||
|
loop.create_task(self._log_health_status_transition(), name="health-transition-check")
|
||||||
|
|
||||||
def _on_execute_success(self) -> None:
|
def _on_execute_success(self) -> None:
|
||||||
self._last_success_timestamp = time.monotonic()
|
self._last_success_timestamp = time.monotonic()
|
||||||
self._last_execute_error = None
|
self._last_execute_error = None
|
||||||
|
self._trigger_health_transition_check()
|
||||||
self.logger.debug(
|
self.logger.debug(
|
||||||
"ConfigManagerV2._on_execute_success result: last_success_timestamp=%s",
|
"ConfigManagerV2._on_execute_success result: last_success_timestamp=%s",
|
||||||
self._last_success_timestamp,
|
self._last_success_timestamp,
|
||||||
@@ -70,6 +99,7 @@ class _RuntimeController:
|
|||||||
|
|
||||||
def _on_execute_error(self, exc: Exception) -> None:
|
def _on_execute_error(self, exc: Exception) -> None:
|
||||||
self._last_execute_error = str(exc)
|
self._last_execute_error = str(exc)
|
||||||
|
self._trigger_health_transition_check()
|
||||||
self.logger.error(
|
self.logger.error(
|
||||||
"ConfigManagerV2._on_execute_error: %s",
|
"ConfigManagerV2._on_execute_error: %s",
|
||||||
self._last_execute_error,
|
self._last_execute_error,
|
||||||
@@ -78,7 +108,8 @@ class _RuntimeController:
|
|||||||
|
|
||||||
def _on_worker_degraded_change(self, degraded: bool) -> None:
|
def _on_worker_degraded_change(self, degraded: bool) -> None:
|
||||||
self._worker_degraded = degraded
|
self._worker_degraded = degraded
|
||||||
self.logger.warning("ConfigManagerV2._on_worker_degraded_change result: degraded=%s", degraded)
|
self._trigger_health_transition_check()
|
||||||
|
self.logger.debug("ConfigManagerV2._on_worker_degraded_change result: degraded=%s", degraded)
|
||||||
|
|
||||||
def _on_worker_metrics_change(self, inflight_count: int, timed_out_count: int) -> None:
|
def _on_worker_metrics_change(self, inflight_count: int, timed_out_count: int) -> None:
|
||||||
self._worker_inflight_count = inflight_count
|
self._worker_inflight_count = inflight_count
|
||||||
@@ -89,6 +120,26 @@ class _RuntimeController:
|
|||||||
timed_out_count,
|
timed_out_count,
|
||||||
)
|
)
|
||||||
|
|
||||||
|
async def _log_health_status_transition(self) -> None:
|
||||||
|
try:
|
||||||
|
health = await self._health_aggregator.collect()
|
||||||
|
except Exception: # noqa: BLE001
|
||||||
|
self.logger.exception("ConfigManagerV2._log_health_status_transition error")
|
||||||
|
return
|
||||||
|
status = health.get("status", "unhealthy")
|
||||||
|
if self._last_health_status == status:
|
||||||
|
return
|
||||||
|
previous = self._last_health_status or "unknown"
|
||||||
|
detail = health.get("detail", "")
|
||||||
|
self._last_health_status = status
|
||||||
|
self.logger.warning(
|
||||||
|
"ConfigManagerV2 health status changed: %s -> %s (state=%s detail=%s)",
|
||||||
|
previous,
|
||||||
|
status,
|
||||||
|
self._state.value,
|
||||||
|
detail,
|
||||||
|
)
|
||||||
|
|
||||||
async def _worker_loop(self) -> None:
|
async def _worker_loop(self) -> None:
|
||||||
self.logger.warning(
|
self.logger.warning(
|
||||||
"ConfigManagerV2._worker_loop result: started work_interval=%s",
|
"ConfigManagerV2._worker_loop result: started work_interval=%s",
|
||||||
@@ -111,19 +162,20 @@ class _RuntimeController:
|
|||||||
self.logger.warning("ConfigManagerV2._worker_loop result: stopped")
|
self.logger.warning("ConfigManagerV2._worker_loop result: stopped")
|
||||||
|
|
||||||
async def _periodic_update_loop(self) -> None:
|
async def _periodic_update_loop(self) -> None:
|
||||||
self.logger.warning(
|
self.logger.debug(
|
||||||
"ConfigManagerV2._periodic_update_loop result: started update_interval=%s",
|
"ConfigManagerV2._periodic_update_loop result: started update_interval=%s",
|
||||||
self.update_interval,
|
self.update_interval,
|
||||||
)
|
)
|
||||||
try:
|
try:
|
||||||
while not self._halt.is_set():
|
while not self._halt.is_set():
|
||||||
await self._update_config()
|
await self._update_config()
|
||||||
|
await self._log_health_status_transition()
|
||||||
try:
|
try:
|
||||||
await asyncio.wait_for(self._halt.wait(), timeout=max(self.update_interval, 0.05))
|
await asyncio.wait_for(self._halt.wait(), timeout=max(self.update_interval, 0.05))
|
||||||
except asyncio.TimeoutError:
|
except asyncio.TimeoutError:
|
||||||
continue
|
continue
|
||||||
finally:
|
finally:
|
||||||
self.logger.warning("ConfigManagerV2._periodic_update_loop result: stopped")
|
self.logger.debug("ConfigManagerV2._periodic_update_loop result: stopped")
|
||||||
|
|
||||||
async def _status_text(self) -> str:
|
async def _status_text(self) -> str:
|
||||||
health = await self._health_aggregator.collect()
|
health = await self._health_aggregator.collect()
|
||||||
@@ -259,6 +311,7 @@ class _RuntimeController:
|
|||||||
await self._update_config()
|
await self._update_config()
|
||||||
await self._start_control_channels()
|
await self._start_control_channels()
|
||||||
await self._start_runtime()
|
await self._start_runtime()
|
||||||
|
await self._log_health_status_transition()
|
||||||
|
|
||||||
try:
|
try:
|
||||||
await self._shutdown.wait()
|
await self._shutdown.wait()
|
||||||
@@ -289,7 +342,7 @@ class ConfigManagerV2(_RuntimeController):
|
|||||||
|
|
||||||
DEFAULT_UPDATE_INTERVAL = 5
|
DEFAULT_UPDATE_INTERVAL = 5
|
||||||
DEFAULT_WORK_INTERVAL = 2
|
DEFAULT_WORK_INTERVAL = 2
|
||||||
DEFAULT_HEALTH_TIMEOUT = 30
|
DEFAULT_HEALTH_TIMEOUT = 90
|
||||||
DEFAULT_EXECUTE_TIMEOUT = 600.0
|
DEFAULT_EXECUTE_TIMEOUT = 600.0
|
||||||
|
|
||||||
def __init__(
|
def __init__(
|
||||||
@@ -320,10 +373,11 @@ class ConfigManagerV2(_RuntimeController):
|
|||||||
self._worker_degraded = False
|
self._worker_degraded = False
|
||||||
self._worker_inflight_count = 0
|
self._worker_inflight_count = 0
|
||||||
self._worker_timed_out_inflight_count = 0
|
self._worker_timed_out_inflight_count = 0
|
||||||
|
self._last_health_status: Optional[str] = None
|
||||||
|
|
||||||
initial_config = self._loader.load_sync()
|
initial_config = self._loader.load_sync()
|
||||||
self.config = initial_config
|
self.config = initial_config
|
||||||
self._health_timeout = self.DEFAULT_HEALTH_TIMEOUT
|
self._health_timeout = _read_env_health_timeout(float(self.DEFAULT_HEALTH_TIMEOUT))
|
||||||
self._health_aggregator = HealthAggregator(
|
self._health_aggregator = HealthAggregator(
|
||||||
get_state=lambda: self._state,
|
get_state=lambda: self._state,
|
||||||
get_last_error=lambda: self._last_execute_error,
|
get_last_error=lambda: self._last_execute_error,
|
||||||
@@ -344,11 +398,12 @@ class ConfigManagerV2(_RuntimeController):
|
|||||||
else:
|
else:
|
||||||
self._control_channels = list(control_channels)
|
self._control_channels = list(control_channels)
|
||||||
self.logger.debug(
|
self.logger.debug(
|
||||||
"ConfigManagerV2.__init__ result: path=%s update_interval=%s work_interval=%s execute_timeout=%s control_channels=%s",
|
"ConfigManagerV2.__init__ result: path=%s update_interval=%s work_interval=%s execute_timeout=%s health_timeout=%s control_channels=%s",
|
||||||
self.path,
|
self.path,
|
||||||
self.update_interval,
|
self.update_interval,
|
||||||
self.work_interval,
|
self.work_interval,
|
||||||
self._execute_timeout,
|
self._execute_timeout,
|
||||||
|
self._health_timeout,
|
||||||
len(self._control_channels),
|
len(self._control_channels),
|
||||||
)
|
)
|
||||||
|
|
||||||
@@ -373,7 +428,7 @@ class ConfigManagerV2(_RuntimeController):
|
|||||||
self.logger.debug("ConfigManagerV2._update_config result: no changes")
|
self.logger.debug("ConfigManagerV2._update_config result: no changes")
|
||||||
return
|
return
|
||||||
self._apply_config(new_config)
|
self._apply_config(new_config)
|
||||||
self.logger.debug("ConfigManagerV2._update_config result: config updated")
|
self.logger.warning("ConfigManagerV2._update_config result: config updated and applied")
|
||||||
except Exception as exc: # noqa: BLE001
|
except Exception as exc: # noqa: BLE001
|
||||||
self.logger.exception("ConfigManagerV2._update_config error")
|
self.logger.exception("ConfigManagerV2._update_config error")
|
||||||
if self._loader.last_valid_config is None:
|
if self._loader.last_valid_config is None:
|
||||||
@@ -384,7 +439,7 @@ class ConfigManagerV2(_RuntimeController):
|
|||||||
return
|
return
|
||||||
try:
|
try:
|
||||||
self._apply_config(self._loader.last_valid_config)
|
self._apply_config(self._loader.last_valid_config)
|
||||||
self.logger.debug(
|
self.logger.warning(
|
||||||
"ConfigManagerV2._update_config result: fallback to last valid config applied",
|
"ConfigManagerV2._update_config result: fallback to last valid config applied",
|
||||||
)
|
)
|
||||||
except Exception: # noqa: BLE001
|
except Exception: # noqa: BLE001
|
||||||
|
|||||||
@@ -25,6 +25,8 @@ class _InFlightExecute:
|
|||||||
|
|
||||||
|
|
||||||
class WorkerLoop:
|
class WorkerLoop:
|
||||||
|
LOOP_TICK_SECONDS = 0.02
|
||||||
|
|
||||||
def __init__(
|
def __init__(
|
||||||
self,
|
self,
|
||||||
execute: Callable[[], None],
|
execute: Callable[[], None],
|
||||||
@@ -48,6 +50,7 @@ class WorkerLoop:
|
|||||||
self._id_seq = count(1)
|
self._id_seq = count(1)
|
||||||
self._degraded = False
|
self._degraded = False
|
||||||
self._last_metrics: Optional[tuple[int, int]] = None
|
self._last_metrics: Optional[tuple[int, int]] = None
|
||||||
|
self._next_start_at = 0.0
|
||||||
logger.debug(
|
logger.debug(
|
||||||
"WorkerLoop.__init__ result: execute=%s execute_timeout=%s",
|
"WorkerLoop.__init__ result: execute=%s execute_timeout=%s",
|
||||||
getattr(execute, "__name__", type(execute).__name__),
|
getattr(execute, "__name__", type(execute).__name__),
|
||||||
@@ -64,7 +67,7 @@ class WorkerLoop:
|
|||||||
self._degraded = value
|
self._degraded = value
|
||||||
if self._on_degraded_change is not None:
|
if self._on_degraded_change is not None:
|
||||||
self._on_degraded_change(value)
|
self._on_degraded_change(value)
|
||||||
logger.warning("WorkerLoop.run degraded state changed: degraded=%s", value)
|
logger.debug("WorkerLoop.run degraded state changed: degraded=%s", value)
|
||||||
|
|
||||||
def _start_execute(self) -> None:
|
def _start_execute(self) -> None:
|
||||||
execute_id = next(self._id_seq)
|
execute_id = next(self._id_seq)
|
||||||
@@ -107,7 +110,7 @@ class WorkerLoop:
|
|||||||
continue
|
continue
|
||||||
execution.timeout_reported = True
|
execution.timeout_reported = True
|
||||||
self._notify_error(TimeoutError(f"execute() did not finish within {self._execute_timeout}s"))
|
self._notify_error(TimeoutError(f"execute() did not finish within {self._execute_timeout}s"))
|
||||||
logger.warning(
|
logger.debug(
|
||||||
"WorkerLoop.run execute timeout: id=%s elapsed=%.3fs timeout=%ss",
|
"WorkerLoop.run execute timeout: id=%s elapsed=%.3fs timeout=%ss",
|
||||||
execution.id,
|
execution.id,
|
||||||
elapsed,
|
elapsed,
|
||||||
@@ -118,12 +121,17 @@ class WorkerLoop:
|
|||||||
return any(item.timeout_reported and not item.task.done() for item in self._inflight)
|
return any(item.timeout_reported and not item.task.done() for item in self._inflight)
|
||||||
|
|
||||||
def _ensure_capacity(self) -> None:
|
def _ensure_capacity(self) -> None:
|
||||||
|
now = time.monotonic()
|
||||||
|
if now < self._next_start_at:
|
||||||
|
return
|
||||||
active_count = len(self._inflight)
|
active_count = len(self._inflight)
|
||||||
if active_count == 0:
|
if active_count == 0:
|
||||||
self._start_execute()
|
self._start_execute()
|
||||||
|
self._next_start_at = now + max(self._get_interval(), 0.01)
|
||||||
return
|
return
|
||||||
if active_count == 1 and self._has_timed_out_inflight():
|
if active_count == 1 and self._has_timed_out_inflight():
|
||||||
self._start_execute()
|
self._start_execute()
|
||||||
|
self._next_start_at = now + max(self._get_interval(), 0.01)
|
||||||
return
|
return
|
||||||
|
|
||||||
def _emit_metrics(self) -> None:
|
def _emit_metrics(self) -> None:
|
||||||
@@ -146,7 +154,7 @@ class WorkerLoop:
|
|||||||
self._ensure_capacity()
|
self._ensure_capacity()
|
||||||
self._emit_metrics()
|
self._emit_metrics()
|
||||||
|
|
||||||
timeout = max(self._get_interval(), 0.01)
|
timeout = self.LOOP_TICK_SECONDS
|
||||||
try:
|
try:
|
||||||
await asyncio.wait_for(self._halt_event.wait(), timeout=timeout)
|
await asyncio.wait_for(self._halt_event.wait(), timeout=timeout)
|
||||||
except asyncio.TimeoutError:
|
except asyncio.TimeoutError:
|
||||||
@@ -154,7 +162,7 @@ class WorkerLoop:
|
|||||||
|
|
||||||
if self._inflight:
|
if self._inflight:
|
||||||
if self._has_timed_out_inflight():
|
if self._has_timed_out_inflight():
|
||||||
logger.warning("WorkerLoop.run stop: timed-out execute still running; exiting without waiting")
|
logger.debug("WorkerLoop.run stop: timed-out execute still running; exiting without waiting")
|
||||||
else:
|
else:
|
||||||
await asyncio.gather(*(item.task for item in self._inflight), return_exceptions=True)
|
await asyncio.gather(*(item.task for item in self._inflight), return_exceptions=True)
|
||||||
self._collect_finished()
|
self._collect_finished()
|
||||||
|
|||||||
16
src/config_manager/v2/trace/__init__.py
Normal file
16
src/config_manager/v2/trace/__init__.py
Normal file
@@ -0,0 +1,16 @@
|
|||||||
|
"""Public tracing API for config_manager-based applications."""
|
||||||
|
|
||||||
|
from .manager import TraceManager
|
||||||
|
from .models import TraceContextRecord, TraceLogMessage
|
||||||
|
from .store import ActiveTraceContext, TraceContextStore
|
||||||
|
from .transport.base import NoOpTraceTransport, TraceTransport
|
||||||
|
|
||||||
|
__all__ = [
|
||||||
|
"ActiveTraceContext",
|
||||||
|
"NoOpTraceTransport",
|
||||||
|
"TraceContextRecord",
|
||||||
|
"TraceContextStore",
|
||||||
|
"TraceLogMessage",
|
||||||
|
"TraceManager",
|
||||||
|
"TraceTransport",
|
||||||
|
]
|
||||||
96
src/config_manager/v2/trace/manager.py
Normal file
96
src/config_manager/v2/trace/manager.py
Normal file
@@ -0,0 +1,96 @@
|
|||||||
|
"""High-level tracing API independent from the logging module."""
|
||||||
|
|
||||||
|
from __future__ import annotations
|
||||||
|
|
||||||
|
from contextlib import contextmanager
|
||||||
|
from typing import Any, Dict, Iterator, Optional
|
||||||
|
from uuid import uuid4
|
||||||
|
|
||||||
|
from .models import TraceContextRecord, TraceLogMessage, utc_now
|
||||||
|
from .store import TraceContextStore
|
||||||
|
from .transport.base import NoOpTraceTransport, TraceTransport
|
||||||
|
|
||||||
|
|
||||||
|
class TraceManager:
|
||||||
|
"""Creates trace contexts and writes trace messages for active context."""
|
||||||
|
|
||||||
|
def __init__(self, transport: Optional[TraceTransport] = None, store: Optional[TraceContextStore] = None) -> None:
|
||||||
|
self.transport = transport or NoOpTraceTransport()
|
||||||
|
self.store = store or TraceContextStore()
|
||||||
|
|
||||||
|
def bind_context(
|
||||||
|
self,
|
||||||
|
*,
|
||||||
|
alias: str,
|
||||||
|
parent_id: Optional[str] = None,
|
||||||
|
type: Optional[str] = None,
|
||||||
|
attrs: Optional[Dict[str, Any]] = None,
|
||||||
|
) -> str:
|
||||||
|
"""Create and activate a trace context, returning its trace identifier."""
|
||||||
|
record = TraceContextRecord(
|
||||||
|
trace_id=uuid4().hex,
|
||||||
|
parent_id=parent_id,
|
||||||
|
alias=str(alias or ""),
|
||||||
|
type=str(type) if type is not None else None,
|
||||||
|
event_time=utc_now(),
|
||||||
|
attrs=dict(attrs or {}),
|
||||||
|
)
|
||||||
|
self.store.push(record)
|
||||||
|
self.transport.write_context(record)
|
||||||
|
return record.trace_id
|
||||||
|
|
||||||
|
@contextmanager
|
||||||
|
def open_context(
|
||||||
|
self,
|
||||||
|
*,
|
||||||
|
alias: str,
|
||||||
|
parent_id: Optional[str] = None,
|
||||||
|
type: Optional[str] = None,
|
||||||
|
attrs: Optional[Dict[str, Any]] = None,
|
||||||
|
) -> Iterator[str]:
|
||||||
|
"""Open nested trace context and restore previous one after exit."""
|
||||||
|
trace_id = self.bind_context(alias=alias, parent_id=parent_id, type=type, attrs=attrs)
|
||||||
|
try:
|
||||||
|
yield trace_id
|
||||||
|
finally:
|
||||||
|
self.store.pop()
|
||||||
|
|
||||||
|
def current_trace_id(self) -> Optional[str]:
|
||||||
|
"""Return current active trace identifier."""
|
||||||
|
return self.store.current_trace_id()
|
||||||
|
|
||||||
|
def close_context(self) -> Optional[str]:
|
||||||
|
"""Close current context and return restored parent trace id, if any."""
|
||||||
|
previous = self.store.pop()
|
||||||
|
return previous.record.trace_id if previous else None
|
||||||
|
|
||||||
|
def step(self, name: str) -> None:
|
||||||
|
"""Set current step for subsequent messages."""
|
||||||
|
self.store.set_step(name)
|
||||||
|
|
||||||
|
def info(self, message: str, *, status: str, attrs: Optional[Dict[str, Any]] = None) -> None:
|
||||||
|
self._write_message(level="INFO", message=message, status=status, attrs=attrs)
|
||||||
|
|
||||||
|
def warning(self, message: str, *, status: str, attrs: Optional[Dict[str, Any]] = None) -> None:
|
||||||
|
self._write_message(level="WARNING", message=message, status=status, attrs=attrs)
|
||||||
|
|
||||||
|
def error(self, message: str, *, status: str, attrs: Optional[Dict[str, Any]] = None) -> None:
|
||||||
|
self._write_message(level="ERROR", message=message, status=status, attrs=attrs)
|
||||||
|
|
||||||
|
def exception(self, message: str, *, status: str = "failed", attrs: Optional[Dict[str, Any]] = None) -> None:
|
||||||
|
self._write_message(level="ERROR", message=message, status=status, attrs=attrs)
|
||||||
|
|
||||||
|
def _write_message(self, *, level: str, message: str, status: str, attrs: Optional[Dict[str, Any]]) -> None:
|
||||||
|
active = self.store.current()
|
||||||
|
if active is None:
|
||||||
|
raise RuntimeError("Trace context is not bound. Call bind_context() first.")
|
||||||
|
record = TraceLogMessage(
|
||||||
|
trace_id=active.record.trace_id,
|
||||||
|
event_time=utc_now(),
|
||||||
|
step=active.step,
|
||||||
|
status=str(status or ""),
|
||||||
|
level=level,
|
||||||
|
message=str(message or ""),
|
||||||
|
attrs=dict(attrs or {}),
|
||||||
|
)
|
||||||
|
self.transport.write_message(record)
|
||||||
37
src/config_manager/v2/trace/models.py
Normal file
37
src/config_manager/v2/trace/models.py
Normal file
@@ -0,0 +1,37 @@
|
|||||||
|
"""Data models for trace contexts and trace messages."""
|
||||||
|
|
||||||
|
from __future__ import annotations
|
||||||
|
|
||||||
|
from dataclasses import dataclass, field
|
||||||
|
from datetime import datetime, timezone
|
||||||
|
from typing import Any, Dict, Optional
|
||||||
|
|
||||||
|
|
||||||
|
def utc_now() -> datetime:
|
||||||
|
"""Return current UTC time with timezone for trace events."""
|
||||||
|
return datetime.now(timezone.utc)
|
||||||
|
|
||||||
|
|
||||||
|
@dataclass(frozen=True)
|
||||||
|
class TraceContextRecord:
|
||||||
|
"""Represents a logical unit that groups trace messages."""
|
||||||
|
|
||||||
|
trace_id: str
|
||||||
|
alias: str
|
||||||
|
parent_id: Optional[str] = None
|
||||||
|
type: Optional[str] = None
|
||||||
|
event_time: datetime = field(default_factory=utc_now)
|
||||||
|
attrs: Dict[str, Any] = field(default_factory=dict)
|
||||||
|
|
||||||
|
|
||||||
|
@dataclass(frozen=True)
|
||||||
|
class TraceLogMessage:
|
||||||
|
"""Represents a single trace message linked to a trace context."""
|
||||||
|
|
||||||
|
trace_id: str
|
||||||
|
step: str
|
||||||
|
status: str
|
||||||
|
message: str
|
||||||
|
level: str
|
||||||
|
event_time: datetime = field(default_factory=utc_now)
|
||||||
|
attrs: Dict[str, Any] = field(default_factory=dict)
|
||||||
70
src/config_manager/v2/trace/store.py
Normal file
70
src/config_manager/v2/trace/store.py
Normal file
@@ -0,0 +1,70 @@
|
|||||||
|
"""Context-local storage for active trace contexts."""
|
||||||
|
|
||||||
|
from __future__ import annotations
|
||||||
|
|
||||||
|
from contextvars import ContextVar
|
||||||
|
from dataclasses import dataclass, replace
|
||||||
|
from typing import Optional, Tuple
|
||||||
|
|
||||||
|
from .models import TraceContextRecord
|
||||||
|
|
||||||
|
|
||||||
|
@dataclass(frozen=True)
|
||||||
|
class ActiveTraceContext:
|
||||||
|
"""Stores current trace context and active processing step."""
|
||||||
|
|
||||||
|
record: TraceContextRecord
|
||||||
|
step: str = ""
|
||||||
|
|
||||||
|
|
||||||
|
class TraceContextStore:
|
||||||
|
"""Keeps active trace context stack in the current execution context."""
|
||||||
|
|
||||||
|
def __init__(self) -> None:
|
||||||
|
self._current: ContextVar[Optional[ActiveTraceContext]] = ContextVar("trace_current", default=None)
|
||||||
|
self._stack: ContextVar[Tuple[ActiveTraceContext, ...]] = ContextVar("trace_stack", default=())
|
||||||
|
|
||||||
|
def current(self) -> Optional[ActiveTraceContext]:
|
||||||
|
"""Return the current active trace context, if present."""
|
||||||
|
return self._current.get()
|
||||||
|
|
||||||
|
def current_trace_id(self) -> Optional[str]:
|
||||||
|
"""Return current trace identifier, if present."""
|
||||||
|
current = self.current()
|
||||||
|
return current.record.trace_id if current else None
|
||||||
|
|
||||||
|
def current_step(self) -> str:
|
||||||
|
"""Return current active step or empty string."""
|
||||||
|
current = self.current()
|
||||||
|
return current.step if current else ""
|
||||||
|
|
||||||
|
def push(self, record: TraceContextRecord) -> ActiveTraceContext:
|
||||||
|
"""Activate a new trace context and preserve the previous one in stack."""
|
||||||
|
current = self.current()
|
||||||
|
stack = self._stack.get()
|
||||||
|
if current is not None:
|
||||||
|
stack = stack + (current,)
|
||||||
|
self._stack.set(stack)
|
||||||
|
active = ActiveTraceContext(record=record)
|
||||||
|
self._current.set(active)
|
||||||
|
return active
|
||||||
|
|
||||||
|
def pop(self) -> Optional[ActiveTraceContext]:
|
||||||
|
"""Restore the previous trace context from stack."""
|
||||||
|
stack = self._stack.get()
|
||||||
|
if not stack:
|
||||||
|
self._current.set(None)
|
||||||
|
return None
|
||||||
|
previous = stack[-1]
|
||||||
|
self._stack.set(stack[:-1])
|
||||||
|
self._current.set(previous)
|
||||||
|
return previous
|
||||||
|
|
||||||
|
def set_step(self, step: str) -> Optional[ActiveTraceContext]:
|
||||||
|
"""Assign step to current active context."""
|
||||||
|
current = self.current()
|
||||||
|
if current is None:
|
||||||
|
return None
|
||||||
|
updated = replace(current, step=str(step or ""))
|
||||||
|
self._current.set(updated)
|
||||||
|
return updated
|
||||||
5
src/config_manager/v2/trace/transport/__init__.py
Normal file
5
src/config_manager/v2/trace/transport/__init__.py
Normal file
@@ -0,0 +1,5 @@
|
|||||||
|
"""Trace transports."""
|
||||||
|
|
||||||
|
from .base import NoOpTraceTransport, TraceTransport
|
||||||
|
|
||||||
|
__all__ = ["NoOpTraceTransport", "TraceTransport"]
|
||||||
27
src/config_manager/v2/trace/transport/base.py
Normal file
27
src/config_manager/v2/trace/transport/base.py
Normal file
@@ -0,0 +1,27 @@
|
|||||||
|
"""Transport interfaces for trace persistence."""
|
||||||
|
|
||||||
|
from __future__ import annotations
|
||||||
|
|
||||||
|
from typing import Protocol
|
||||||
|
|
||||||
|
from ..models import TraceContextRecord, TraceLogMessage
|
||||||
|
|
||||||
|
|
||||||
|
class TraceTransport(Protocol):
|
||||||
|
"""Writes trace records to an external destination."""
|
||||||
|
|
||||||
|
def write_context(self, record: TraceContextRecord) -> None:
|
||||||
|
"""Persist trace context record."""
|
||||||
|
|
||||||
|
def write_message(self, record: TraceLogMessage) -> None:
|
||||||
|
"""Persist trace log message."""
|
||||||
|
|
||||||
|
|
||||||
|
class NoOpTraceTransport:
|
||||||
|
"""Default transport that ignores all trace records."""
|
||||||
|
|
||||||
|
def write_context(self, record: TraceContextRecord) -> None:
|
||||||
|
return None
|
||||||
|
|
||||||
|
def write_message(self, record: TraceLogMessage) -> None:
|
||||||
|
return None
|
||||||
97
src/config_manager/v2/trace/transport/mysql.py
Normal file
97
src/config_manager/v2/trace/transport/mysql.py
Normal file
@@ -0,0 +1,97 @@
|
|||||||
|
"""MySQL transport for trace contexts and trace messages."""
|
||||||
|
|
||||||
|
from __future__ import annotations
|
||||||
|
|
||||||
|
import json
|
||||||
|
from datetime import datetime, timezone
|
||||||
|
from typing import Any, Dict, Optional
|
||||||
|
|
||||||
|
import pymysql
|
||||||
|
|
||||||
|
from ..models import TraceContextRecord, TraceLogMessage
|
||||||
|
|
||||||
|
|
||||||
|
class MySqlTraceTransport:
|
||||||
|
"""Persists trace records into dedicated MySQL tables."""
|
||||||
|
|
||||||
|
TRACE_CONTEXTS_TABLE = "trace_contexts"
|
||||||
|
TRACE_MESSAGES_TABLE = "trace_messages"
|
||||||
|
|
||||||
|
def __init__(
|
||||||
|
self,
|
||||||
|
*,
|
||||||
|
host: str,
|
||||||
|
port: int,
|
||||||
|
database: str,
|
||||||
|
user: str,
|
||||||
|
password: str,
|
||||||
|
connect_timeout: int = 5,
|
||||||
|
charset: str = "utf8mb4",
|
||||||
|
) -> None:
|
||||||
|
self.host = host
|
||||||
|
self.port = int(port)
|
||||||
|
self.database = database
|
||||||
|
self.user = user
|
||||||
|
self.password = password
|
||||||
|
self.connect_timeout = connect_timeout
|
||||||
|
self.charset = charset
|
||||||
|
|
||||||
|
def write_context(self, record: TraceContextRecord) -> None:
|
||||||
|
query = (
|
||||||
|
f"INSERT INTO {self.TRACE_CONTEXTS_TABLE} "
|
||||||
|
"(trace_id, parent_id, alias, type, event_time, attrs_json) "
|
||||||
|
"VALUES (%s, %s, %s, %s, %s, %s)"
|
||||||
|
)
|
||||||
|
params = (
|
||||||
|
record.trace_id,
|
||||||
|
record.parent_id,
|
||||||
|
record.alias,
|
||||||
|
record.type,
|
||||||
|
self._normalize_time(record.event_time),
|
||||||
|
self._serialize_attrs(record.attrs),
|
||||||
|
)
|
||||||
|
self._execute(query, params)
|
||||||
|
|
||||||
|
def write_message(self, record: TraceLogMessage) -> None:
|
||||||
|
query = (
|
||||||
|
f"INSERT INTO {self.TRACE_MESSAGES_TABLE} "
|
||||||
|
"(trace_id, event_time, step, status, level, message, attrs_json) "
|
||||||
|
"VALUES (%s, %s, %s, %s, %s, %s, %s)"
|
||||||
|
)
|
||||||
|
params = (
|
||||||
|
record.trace_id,
|
||||||
|
self._normalize_time(record.event_time),
|
||||||
|
record.step,
|
||||||
|
record.status,
|
||||||
|
record.level,
|
||||||
|
record.message,
|
||||||
|
self._serialize_attrs(record.attrs),
|
||||||
|
)
|
||||||
|
self._execute(query, params)
|
||||||
|
|
||||||
|
def _execute(self, query: str, params: tuple[Any, ...]) -> None:
|
||||||
|
connection = pymysql.connect(
|
||||||
|
host=self.host,
|
||||||
|
port=self.port,
|
||||||
|
user=self.user,
|
||||||
|
password=self.password,
|
||||||
|
database=self.database,
|
||||||
|
charset=self.charset,
|
||||||
|
connect_timeout=self.connect_timeout,
|
||||||
|
autocommit=True,
|
||||||
|
)
|
||||||
|
try:
|
||||||
|
with connection.cursor() as cursor:
|
||||||
|
cursor.execute(query, params)
|
||||||
|
finally:
|
||||||
|
connection.close()
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
def _serialize_attrs(attrs: Optional[Dict[str, Any]]) -> str:
|
||||||
|
return json.dumps(attrs or {}, ensure_ascii=False, default=str, sort_keys=True)
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
def _normalize_time(value: datetime) -> datetime:
|
||||||
|
if value.tzinfo is None:
|
||||||
|
return value
|
||||||
|
return value.astimezone(timezone.utc).replace(tzinfo=None)
|
||||||
26
tests/v2/test_health_timeout.py
Normal file
26
tests/v2/test_health_timeout.py
Normal file
@@ -0,0 +1,26 @@
|
|||||||
|
from config_manager.v2 import ConfigManagerV2
|
||||||
|
|
||||||
|
|
||||||
|
class TimeoutApp(ConfigManagerV2):
|
||||||
|
def execute(self) -> None:
|
||||||
|
return
|
||||||
|
|
||||||
|
|
||||||
|
def test_health_timeout_uses_main_env_key(tmp_path, monkeypatch):
|
||||||
|
cfg = tmp_path / "config.yaml"
|
||||||
|
cfg.write_text("log: {}\nmanagement: { enabled: false }\nhealth_timeout: 150\n", encoding="utf-8")
|
||||||
|
monkeypatch.setenv("HEALTH_TIMEOUT", "120")
|
||||||
|
monkeypatch.setenv("HEALTHY_TIMEOUT", "300")
|
||||||
|
|
||||||
|
app = TimeoutApp(str(cfg))
|
||||||
|
assert app._health_timeout == 120.0
|
||||||
|
|
||||||
|
|
||||||
|
def test_health_timeout_defaults_to_90_when_env_not_set(tmp_path, monkeypatch):
|
||||||
|
cfg = tmp_path / "config.yaml"
|
||||||
|
cfg.write_text("log: {}\nmanagement: { enabled: false }\nhealth_timeout: 150\n", encoding="utf-8")
|
||||||
|
monkeypatch.delenv("HEALTH_TIMEOUT", raising=False)
|
||||||
|
monkeypatch.delenv("HEALTHY_TIMEOUT", raising=False)
|
||||||
|
|
||||||
|
app = TimeoutApp(str(cfg))
|
||||||
|
assert app._health_timeout == 90.0
|
||||||
@@ -145,8 +145,6 @@ def test_execute_timeout_does_not_start_parallel_runs(tmp_path, monkeypatch):
|
|||||||
await app.stop()
|
await app.stop()
|
||||||
|
|
||||||
assert app.calls >= 1
|
assert app.calls >= 1
|
||||||
assert app._last_execute_error is not None
|
|
||||||
assert "did not finish within" in app._last_execute_error
|
|
||||||
assert app.max_active == 2
|
assert app.max_active == 2
|
||||||
assert degraded_health["status"] == "degraded"
|
assert degraded_health["status"] == "degraded"
|
||||||
|
|
||||||
|
|||||||
79
tests/v2/test_trace_manager.py
Normal file
79
tests/v2/test_trace_manager.py
Normal file
@@ -0,0 +1,79 @@
|
|||||||
|
from __future__ import annotations
|
||||||
|
|
||||||
|
from dataclasses import dataclass, field
|
||||||
|
|
||||||
|
from config_manager.v2.trace import TraceManager
|
||||||
|
|
||||||
|
|
||||||
|
@dataclass
|
||||||
|
class MemoryTraceTransport:
|
||||||
|
contexts: list = field(default_factory=list)
|
||||||
|
messages: list = field(default_factory=list)
|
||||||
|
|
||||||
|
def write_context(self, record) -> None:
|
||||||
|
self.contexts.append(record)
|
||||||
|
|
||||||
|
def write_message(self, record) -> None:
|
||||||
|
self.messages.append(record)
|
||||||
|
|
||||||
|
|
||||||
|
def test_bind_context_writes_context_and_returns_trace_id():
|
||||||
|
transport = MemoryTraceTransport()
|
||||||
|
trace = TraceManager(transport=transport)
|
||||||
|
|
||||||
|
trace_id = trace.bind_context(alias="email-1", type="email", attrs={"message_id": "abc"})
|
||||||
|
|
||||||
|
assert trace_id
|
||||||
|
assert transport.contexts[0].trace_id == trace_id
|
||||||
|
assert transport.contexts[0].alias == "email-1"
|
||||||
|
assert transport.contexts[0].type == "email"
|
||||||
|
assert transport.contexts[0].attrs == {"message_id": "abc"}
|
||||||
|
|
||||||
|
|
||||||
|
def test_open_context_restores_parent_after_exit():
|
||||||
|
transport = MemoryTraceTransport()
|
||||||
|
trace = TraceManager(transport=transport)
|
||||||
|
|
||||||
|
parent_id = trace.bind_context(alias="email-1", type="email")
|
||||||
|
with trace.open_context(alias="order.xlsx", type="attachment") as child_id:
|
||||||
|
assert trace.current_trace_id() == child_id
|
||||||
|
|
||||||
|
assert trace.current_trace_id() == parent_id
|
||||||
|
|
||||||
|
|
||||||
|
def test_messages_use_current_step_and_attrs():
|
||||||
|
transport = MemoryTraceTransport()
|
||||||
|
trace = TraceManager(transport=transport)
|
||||||
|
|
||||||
|
trace.bind_context(alias="email-1", type="email")
|
||||||
|
trace.step("parse_email")
|
||||||
|
trace.info("Письмо распарсено", status="completed", attrs={"attachments_count": 2})
|
||||||
|
|
||||||
|
message = transport.messages[0]
|
||||||
|
assert message.step == "parse_email"
|
||||||
|
assert message.status == "completed"
|
||||||
|
assert message.level == "INFO"
|
||||||
|
assert message.message == "Письмо распарсено"
|
||||||
|
assert message.attrs == {"attachments_count": 2}
|
||||||
|
|
||||||
|
|
||||||
|
def test_bind_context_keeps_parent_empty_by_default():
|
||||||
|
transport = MemoryTraceTransport()
|
||||||
|
trace = TraceManager(transport=transport)
|
||||||
|
|
||||||
|
parent_id = trace.bind_context(alias="email-1", type="email")
|
||||||
|
child_id = trace.bind_context(alias="order.xlsx", type="attachment")
|
||||||
|
|
||||||
|
assert child_id != parent_id
|
||||||
|
assert transport.contexts[-1].parent_id is None
|
||||||
|
|
||||||
|
|
||||||
|
def test_bind_context_uses_explicit_parent_id():
|
||||||
|
transport = MemoryTraceTransport()
|
||||||
|
trace = TraceManager(transport=transport)
|
||||||
|
|
||||||
|
parent_id = trace.bind_context(alias="email-1", type="email")
|
||||||
|
child_id = trace.bind_context(alias="order.xlsx", type="attachment", parent_id=parent_id)
|
||||||
|
|
||||||
|
assert child_id != parent_id
|
||||||
|
assert transport.contexts[-1].parent_id == parent_id
|
||||||
Reference in New Issue
Block a user