From 93e9f4c37e086d96141ecb0e4bea814710124810 Mon Sep 17 00:00:00 2001 From: zosimovaa Date: Sat, 7 Mar 2026 17:21:27 +0300 Subject: [PATCH] =?UTF-8?q?=D0=9F=D1=80=D0=B0=D0=B2=D0=BA=D0=B8?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .gitignore | 1 + README.md | 425 +++++++++++++++--------- pyproject.toml | 2 +- requirements/sql/trace_mysql_schema.sql | 37 +++ src/app_runtime/contracts/trace.py | 7 +- src/app_runtime/tracing/service.py | 11 +- tests/test_runtime.py | 32 ++ 7 files changed, 347 insertions(+), 168 deletions(-) create mode 100644 requirements/sql/trace_mysql_schema.sql diff --git a/.gitignore b/.gitignore index c18dd8d..7a60b85 100644 --- a/.gitignore +++ b/.gitignore @@ -1 +1,2 @@ __pycache__/ +*.pyc diff --git a/README.md b/README.md index fb3fb0d..ea8b2fb 100644 --- a/README.md +++ b/README.md @@ -1,214 +1,319 @@ # PLBA -`PLBA` (`Platform Runtime for Business Applications`) - runtime для бизнес-приложений. +## 1. Назначение платформы +PLBA (`Platform Runtime for Business Applications`) - это runtime-слой для бизнес-приложений, который забирает на себя инфраструктурную часть исполнения. Платформа стандартизирует запуск и остановку рабочих процессов, контроль состояния приложения и эксплуатационные сервисы вокруг них. За счет этого прикладной код концентрируется на бизнес-логике, а не на lifecycle, диагностике и служебных механизмах. Базовая модель использования строится вокруг `ApplicationModule`, `Worker` и прикладной `Routine`, где каждый уровень отвечает за свою часть ответственности. В результате приложение получается предсказуемым в эксплуатации, проще в сопровождении и масштабировании. -Платформа берет на себя инфраструктурные обязанности: -- lifecycle приложения -- запуск и остановку воркеров -- health/status -- tracing -- logging -- control plane -- загрузку конфигурации +## 2. Концепция использования +`ApplicationModule` - точка сборки приложения: здесь создаются зависимости, собираются рутины, создаются воркеры и регистрируются дополнительные health-контрибьюторы. -Бизнес-приложение на базе `plba` собирается вокруг трех уровней: -- `ApplicationModule` собирает приложение и регистрирует воркеры -- `Worker` управляет исполнением и lifecycle -- `Routine` реализует бизнес-функцию +`Worker` - основной runtime-контракт платформы: он управляет исполнением (потоки, цикл, стратегия остановки), возвращает `health()` и `status()`, а также определяет критичность компонента для общего health. -`Routine` не является контрактом `plba`. Это рекомендуемый архитектурный паттерн для прикладного кода. +`Routine` - прикладной паттерн (не обязательный контракт платформы): класс или набор классов, где сосредоточена бизнес-логика, которую воркер регулярно или однократно запускает. -Правила построения приложений на платформе собраны отдельно в [application_guidelines.md](/Users/alex/Dev_projects_v2/apps/plba/requirements/application_guidelines.md). +Вспомогательные сервисы платформы дополняют core-модель: +- `tracing` (`TraceService`, транспорты) - операционная трассировка контекстов и сообщений. +- `logging` (`LogManager`) - применение конфигурации логирования из runtime-конфига. +- `health` (`HealthRegistry`) - агрегирование здоровья воркеров и дополнительных компонентов. +- `workflow` (`WorkflowEngine` и persistence-слой) - исполнение шагов бизнес-процесса с переходами и фиксацией состояния. +- `control plane` (`ControlPlaneService`, `HttpControlChannel`) - внешние health/action endpoints. +- `queue` (`InMemoryTaskQueue`) - локальный in-memory буфер как утилита прикладного уровня. -## Installation +## 3. Архитектура +```mermaid +classDiagram + class ApplicationModule { + <> + +name: str + +register(registry) + } + class ModuleRegistry { + +add_worker(worker) + +add_health_contributor(contributor) + +register_module(name) + } + class RuntimeManager { + +register_module(module) + +add_config_file(path) + +start() + +stop(timeout, force) + +status() + +current_health() + } + class WorkerSupervisor { + +register(worker) + +start() + +stop(timeout, force) + +statuses() + +healths() + } + class Worker { + <> + +name: str + +critical: bool + +start() + +stop(force) + +health() + +status() + } + class Routine { + <> + +run() + } -Установка `plba` через `pip` из git-репозитория: + class ConfigurationManager + class LogManager + class HealthRegistry + class TraceService + class ControlPlaneService + class WorkflowRuntimeFactory + class WorkflowEngine + class WorkflowPersistence + class WorkflowRepository + class CheckpointRepository + class InMemoryTaskQueue + class MySqlTraceTransport -```bash -pip install "plba @ git+https://git.lesha.spb.ru/alex/plba.git" + ApplicationModule --> ModuleRegistry : register(...) + RuntimeManager --> ApplicationModule : register_module(...) + RuntimeManager --> ModuleRegistry + RuntimeManager --> ConfigurationManager + RuntimeManager --> LogManager + RuntimeManager --> HealthRegistry + RuntimeManager --> TraceService + RuntimeManager --> WorkerSupervisor + RuntimeManager --> ControlPlaneService + + WorkerSupervisor --> Worker + Worker --> Routine : invokes + + WorkflowRuntimeFactory --> WorkflowEngine : create_engine(...) + WorkflowEngine --> WorkflowPersistence + WorkflowPersistence --> WorkflowRepository + WorkflowPersistence --> CheckpointRepository + + TraceService --> MySqlTraceTransport ``` -## Runtime model +## 4. Описание компонентов -1. приложение объявляет `ApplicationModule` -2. модуль регистрирует один или несколько `Worker` -3. `RuntimeManager` запускает все воркеры -4. каждый `Worker` запускает свою бизнес-активность -5. runtime агрегирует health и status -6. runtime останавливает воркеры graceful или forcefully +### 4.1 Core модули -## Main contracts +#### ApplicationModule +- Назначение: композиция прикладного приложения и регистрация runtime-компонентов. +- Реализация: контракт `app_runtime.contracts.application.ApplicationModule`, регистрация через `app_runtime.core.registration.ModuleRegistry`. +- Как работает / API / вызовы / таблицы: + - API: `name`, `register(registry)`. + - Типичные вызовы: `registry.add_worker(worker)`, `registry.add_health_contributor(contributor)`. + - `RuntimeManager.register_module()` вызывает `module.register(...)` и добавляет имя модуля в снимок runtime. + - В БД напрямую не пишет. +- Типовая схема использования: + - Создать инфраструктурные и доменные сервисы. + - Создать `Routine`. + - Создать `Worker` с зависимостью на рутину. + - Зарегистрировать воркер и опциональные health contributors. -### `ApplicationModule` +#### Worker +- Назначение: runtime-исполнение бизнес-активности, управление lifecycle, интерпретация health/status. +- Реализация: контракт `app_runtime.contracts.worker.Worker`, оркестрация через `app_runtime.workers.supervisor.WorkerSupervisor`. +- Как работает / API / вызовы / таблицы: + - API: `start()`, `stop(force=False)`, `health() -> WorkerHealth`, `status() -> WorkerStatus`, свойства `name`, `critical`. + - `WorkerSupervisor.start()` запускает все воркеры, `stop()` останавливает и ждет state=`stopped`. + - `RuntimeManager` получает агрегированные `statuses()`/`healths()` у супервизора. + - В БД напрямую не пишет (если прикладной worker сам не реализует persistence). +- Типовая схема использования: + - Внутри `start()` поднимать поток/пул или запускать одноразовую задачу. + - В цикле вызывать `routine.run()`. + - Ошибки бизнес-логики транслировать в `health()`/`status()`. -Описывает, из чего состоит приложение. +#### Routine +- Назначение: изоляция прикладной бизнес-логики от runtime-обвязки. +- Реализация: прикладной класс (паттерн), обычно с методом `run()`; платформой не навязывается отдельный интерфейс. +- Как работает / API / вызовы / таблицы: + - Типичный API: `run()` + дополнительные domain-методы. + - Вызывается воркером в выбранной стратегии выполнения (single-run/loop, 1..N потоков). + - Может вызывать сервисы интеграций, workflow, очереди, доменные репозитории. + - Запись в таблицы определяется прикладными сервисами, а не самой платформой. +- Типовая схема использования: + - Оставлять рутину тонким оркестратором бизнес-действий. + - Сложную логику выносить в отдельные доменные сервисы. -Ответственность: -- дать имя модуля -- зарегистрировать воркеры -- зарегистрировать health contributors при необходимости -- собрать прикладные зависимости +### 4.2 Service модули -### `Worker` +#### Configuration +- Назначение: централизованная загрузка и слияние runtime-конфигурации. +- Реализация: `ConfigurationManager`, `FileConfigProvider`, `ConfigFileLoader`. +- Как работает / API / вызовы / таблицы: + - API: `add_provider()`, `load()`, `reload()`, `get()`, `section()`. + - `RuntimeManager.start()` вызывает `configuration.load()`. + - Поддерживается YAML/JSON через `ConfigFileLoader.parse()`. + - В БД не пишет. +- Типовая схема использования: + - В bootstrap добавить файл: `runtime.add_config_file("config.yml")`. + - Читать секции из `runtime.configuration.section("...")` в прикладных фабриках. -Главный runtime-контракт платформы. +#### Logging +- Назначение: применение и восстановление валидной конфигурации логирования. +- Реализация: `LogManager`. +- Как работает / API / вызовы / таблицы: + - API: `apply_config(config)`. + - `RuntimeManager.start()` вызывает `logs.apply_config(config)`. + - Если секция `log` некорректна, сервис пытается восстановить предыдущую валидную конфигурацию. + - В БД не пишет. +- Типовая схема использования: + - Передать `log` секцию в конфиг и запускать runtime стандартно через `start()`. -Контракт: -- `name` -- `critical` -- `start()` -- `stop(force=False)` -- `health()` -- `status()` - -`Worker` отвечает только за runtime-поведение: -- как запускается бизнес-активность -- в одном потоке или нескольких -- single-run или loop -- graceful shutdown -- интерпретацию ошибок в `health/status` - -### `Routine` - -Рекомендуемый application-level паттерн. - -`Routine` описывает бизнес-функцию: -- что читать -- какие сервисы вызывать -- какие бизнес-решения принимать -- что сохранять или отправлять наружу - -Обычно воркер получает одну routine через конструктор и вызывает ее в `start()` или во внутренних helper-методах. - -## Minimal example +#### Health +- Назначение: сводный health приложения по воркерам и дополнительным контрибьюторам. +- Реализация: `HealthRegistry` + контрибьюторы по контракту `HealthContributor`. +- Как работает / API / вызовы / таблицы: + - API: `register()`, `snapshot(worker_healths)`, `payload(state, worker_healths)`. + - Агрегация: `unhealthy` при критичном `unhealthy`, `degraded` при любой деградации, иначе `ok`. + - `RuntimeManager.current_health()` использует `HealthRegistry.payload(...)`. + - В БД не пишет. +- Типовая схема использования: + - Зарегистрировать contributor в `ApplicationModule.register(...)`. + - Отдавать health наружу через control plane `/health`. +#### Tracing +- Назначение: фиксация контекстов выполнения и событий по шагам операций. +- Реализация: `TraceService`, `TraceContextStore`, `NoOpTraceTransport`, `MySqlTraceTransport`. +- Как работает / API / вызовы / таблицы: + - API: `create_context()`, `open_context()`, `step()`, `info()/warning()/error()/exception()`, `new_root()`, `child_of()`, `attach()`, `resume()`. + - `TraceService` пишет записи через transport; ошибки транспорта изолируются в логах. + - При `MySqlTraceTransport` записи идут в таблицы: + - `trace_contexts` + - `trace_messages` + - Эталонная схема и migration для MySQL: `requirements/sql/trace_mysql_schema.sql`. +- Trace Context: + - Что это: запись о начале логического контекста выполнения (операция, воркер, подоперация), к которой потом привязываются trace-сообщения. + - Для чего нужен: позволяет собрать дерево исполнения и связать все сообщения конкретной бизнес-операции. + - Атрибуты контекста (`TraceContextRecord`): `trace_id`, `alias`, `parent_id`, `type`, `event_time`, `attrs`. + - Иерархия: `parent_id` указывает на родительский контекст; так строится цепочка root -> child. + - Таблица: `trace_contexts`. + - Как объявляется в коде: ```python -from threading import Event, Lock, Thread -from time import sleep +with traces.open_context(alias="orders-worker", kind="worker", attrs={"routine": "orders"}) as trace_id: + ... +``` +```python +root = traces.new_root("orders.sync") +child = traces.child_of(root, "orders.process_batch") +``` +- Trace Message: + - Что это: событие внутри активного context (статус шага, предупреждение, ошибка, служебная информация). + - Роль `step`: текущая стадия операции (`parse`, `validate`, `persist` и т.д.), которую выставляют через `traces.step("...")`. + - Уровни сообщений: `INFO`, `WARNING`, `ERROR`; `exception(...)` пишет сообщение уровня `ERROR`. + - Атрибуты сообщения (`TraceLogMessage`): `trace_id`, `step`, `status`, `message`, `level`, `event_time`, `attrs`. + - Связь с context: каждое сообщение обязательно связано с текущим активным `trace_id`; без активного контекста `TraceService` выбрасывает ошибку. + - Таблица: `trace_messages`. + - Как правильно применять в проекте: + - Открывать один root-context на единицу бизнес-работы (например, обработка одного сообщения/заказа). + - Перед важными этапами явно менять `step`. + - `info` использовать для нормального прогресса, `warning` для recoverable ситуаций (retry/fallback), `error`/`exception` для сбоев. + - В `attrs` класть диагностические ключи (`entity_id`, `attempt`, `integration`, `duration_ms`), чтобы ускорить расследование инцидентов. +- Типовая схема использования: + - В воркере открыть контекст (`open_context`) и на каждом шаге рутины писать `step()/info()`. -from plba import ( - ApplicationModule, - Worker, - WorkerHealth, - WorkerStatus, - create_runtime, -) +#### Workflow +- Назначение: исполнение step-based workflow с переходами и сохранением прогресса. +- Реализация: `WorkflowRuntimeFactory`, `WorkflowEngine`, `WorkflowPersistence`, `WorkflowRepository`, `CheckpointRepository`. +- Как работает / API / вызовы / таблицы: + - API верхнего уровня: `WorkflowRuntimeFactory.create_engine(workflow)`, `WorkflowEngine.run(context)`. + - `WorkflowEngine` по шагам вызывает `WorkflowStep.run(context)`, применяет `TransitionResolver`, вызывает hooks. + - `WorkflowPersistence` пишет состояние run/step/checkpoint через репозитории. + - При настроенном подключении к БД записи идут в таблицы: + - `workflow_runs` + - `workflow_steps` + - `workflow_checkpoints` + - Без подключения работает in-memory fallback репозиториев. +- Типовая схема использования: + - Описать `WorkflowDefinition` (узлы и transitions). + - Создать engine через фабрику. + - Запускать из рутины или воркера: `engine.run(context)`. + +#### Control Plane +- Назначение: внешний канал управления runtime и чтения health/status. +- Реализация: `ControlPlaneService`, `HttpControlChannel`, `HttpControlAppFactory`. +- Как работает / API / вызовы / таблицы: + - API: `register_channel()`, `start(runtime)`, `stop()`, `snapshot(runtime)`. + - HTTP-канал поднимает endpoint'ы: + - `GET /health` + - `GET|POST /actions/{action}` (`start`, `stop`, `status`) + - Колбэки action'ов вызывают async-методы `RuntimeManager`. + - В БД не пишет. +- Типовая схема использования: + - При создании runtime включить `enable_http_control=True`. + - Использовать `/health` для readiness/liveness и `/actions/*` для операционного контроля. + +#### Queue +- Назначение: простой in-memory буфер задач/сообщений внутри приложения. +- Реализация: `InMemoryTaskQueue[T]`. +- Как работает / API / вызовы / таблицы: + - API: `put(item)`, `get(timeout)`, `task_done()`, `qsize()`, `stats()`. + - Может использоваться между воркерами/сервисами как локальная очередь. + - В БД не пишет. +- Типовая схема использования: + - Producer в рутине кладет элементы через `put`. + - Consumer-воркер извлекает через `get(timeout)` и обрабатывает. + +## 5. MVP бизнес-приложения +Минимальная конфигурация запуска: +1. Создать один `ApplicationModule`. +2. В модуле собрать одну `Routine` и один `Worker` (1 worker -> 1 routine). +3. Зарегистрировать воркер через `registry.add_worker(...)`. +4. Создать runtime: `create_runtime(module, config_path="config.yml")`. +5. Вызвать `runtime.start()`. + +Минимальный пример: +```python +from plba import ApplicationModule, Worker, WorkerHealth, WorkerStatus, create_runtime +from app_runtime.core.registration import ModuleRegistry -class OrdersRoutine: - def __init__(self, service) -> None: - self._service = service - +class DemoRoutine: def run(self) -> None: - self._service.process_new_orders() + print("business action") -class OrdersWorker(Worker): - def __init__(self, routine: OrdersRoutine, interval: float = 1.0) -> None: +class DemoWorker(Worker): + def __init__(self, routine: DemoRoutine) -> None: self._routine = routine - self._interval = interval - self._thread: Thread | None = None - self._stop_requested = Event() - self._lock = Lock() - self._in_flight = 0 - self._failures = 0 + self._started = False @property def name(self) -> str: - return "orders-worker" + return "demo-worker" @property def critical(self) -> bool: return True def start(self) -> None: - if self._thread and self._thread.is_alive(): - return - self._stop_requested.clear() - self._thread = Thread(target=self._run_loop, daemon=True) - self._thread.start() + self._started = True + self._routine.run() def stop(self, force: bool = False) -> None: del force - self._stop_requested.set() + self._started = False def health(self) -> WorkerHealth: - if self._failures > 0: - return WorkerHealth(self.name, "degraded", self.critical) - return WorkerHealth(self.name, "ok", self.critical) + return WorkerHealth(name=self.name, status="ok", critical=self.critical) def status(self) -> WorkerStatus: - alive = self._thread is not None and self._thread.is_alive() - state = "busy" if self._in_flight else "idle" - if not alive: - state = "stopped" - elif self._stop_requested.is_set(): - state = "stopping" - return WorkerStatus(name=self.name, state=state, in_flight=self._in_flight) - - def _run_loop(self) -> None: - while not self._stop_requested.is_set(): - with self._lock: - self._in_flight += 1 - try: - self._routine.run() - except Exception: - self._failures += 1 - finally: - with self._lock: - self._in_flight -= 1 - sleep(self._interval) + return WorkerStatus(name=self.name, state="idle" if self._started else "stopped") -class OrdersModule(ApplicationModule): +class DemoModule(ApplicationModule): @property def name(self) -> str: - return "orders" + return "demo" - def register(self, registry) -> None: - service = OrderService() - routine = OrdersRoutine(service) - registry.add_worker(OrdersWorker(routine)) + def register(self, registry: ModuleRegistry) -> None: + registry.add_worker(DemoWorker(DemoRoutine())) -runtime = create_runtime(OrdersModule(), config_path="config.yml") +runtime = create_runtime(DemoModule(), config_path="config.yml") runtime.start() ``` -## Health and status - -Практика такая: -- `Routine` выполняет бизнес-работу -- `Worker` ловит ее ошибки -- `Worker` интерпретирует outcome в `health()` и `status()` - -То есть routine не выставляет health напрямую. - -## In-memory queue - -`InMemoryTaskQueue` остается в платформе как простой in-memory utility. - -Это не базовый платформенный контракт и не обязательный паттерн архитектуры. -Ее можно использовать в прикладном коде как локальный буфер между компонентами, если это действительно помогает. - -```python -from plba import InMemoryTaskQueue - -queue = InMemoryTaskQueue[str]() -queue.put("payload") -item = queue.get(timeout=0.1) -``` - -## Public API - -Основные публичные сущности: -- `ApplicationModule` -- `Worker` -- `WorkerHealth` -- `WorkerStatus` -- `RuntimeManager` -- `WorkerSupervisor` -- `TraceService` -- `HealthRegistry` -- `InMemoryTaskQueue` -- `create_runtime(...)` +Для production-сценария после MVP обычно добавляют `tracing`, `health contributors`, `workflow` и HTTP control plane, но базовый запуск не требует этих расширений. diff --git a/pyproject.toml b/pyproject.toml index f84d01f..1bc34f6 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -4,7 +4,7 @@ build-backend = "setuptools.build_meta" [project] name = "plba" -version = "0.2.1" +version = "0.2.2" description = "Platform runtime for business applications" readme = "README.md" requires-python = ">=3.11" diff --git a/requirements/sql/trace_mysql_schema.sql b/requirements/sql/trace_mysql_schema.sql new file mode 100644 index 0000000..522f93b --- /dev/null +++ b/requirements/sql/trace_mysql_schema.sql @@ -0,0 +1,37 @@ +-- MySQL schema for PLBA tracing. +-- Guarantees compatibility with TraceService levels: INFO, WARNING, ERROR. + +CREATE TABLE IF NOT EXISTS trace_contexts ( + trace_id VARCHAR(64) PRIMARY KEY, + parent_id VARCHAR(64) NULL, + alias VARCHAR(255) NOT NULL, + type VARCHAR(64) NULL, + event_time DATETIME(6) NOT NULL, + attrs_json JSON NOT NULL, + KEY idx_trace_contexts_parent_id (parent_id), + CONSTRAINT fk_trace_contexts_parent + FOREIGN KEY (parent_id) REFERENCES trace_contexts(trace_id) + ON DELETE SET NULL +) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4; + +CREATE TABLE IF NOT EXISTS trace_messages ( + id BIGINT UNSIGNED NOT NULL AUTO_INCREMENT PRIMARY KEY, + trace_id VARCHAR(64) NOT NULL, + event_time DATETIME(6) NOT NULL, + step VARCHAR(128) NOT NULL, + status VARCHAR(64) NOT NULL, + level ENUM('INFO', 'WARNING', 'ERROR') NOT NULL, + message TEXT NOT NULL, + attrs_json JSON NOT NULL, + KEY idx_trace_messages_trace_id (trace_id), + KEY idx_trace_messages_event_time (event_time), + KEY idx_trace_messages_level (level), + CONSTRAINT fk_trace_messages_context + FOREIGN KEY (trace_id) REFERENCES trace_contexts(trace_id) + ON DELETE CASCADE +) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4; + +-- Migration for existing installations. +-- If your `trace_messages.level` does not include WARNING, run this ALTER. +ALTER TABLE trace_messages + MODIFY COLUMN level ENUM('INFO', 'WARNING', 'ERROR') NOT NULL; diff --git a/src/app_runtime/contracts/trace.py b/src/app_runtime/contracts/trace.py index e9c8fee..fe339d0 100644 --- a/src/app_runtime/contracts/trace.py +++ b/src/app_runtime/contracts/trace.py @@ -3,7 +3,10 @@ from __future__ import annotations from abc import ABC, abstractmethod from dataclasses import dataclass, field from datetime import datetime, timezone -from typing import Any, Protocol +from typing import Any, Literal, Protocol + + +TraceLevel = Literal["INFO", "WARNING", "ERROR"] def utc_now() -> datetime: @@ -44,7 +47,7 @@ class TraceLogMessage: step: str status: str message: str - level: str + level: TraceLevel event_time: datetime = field(default_factory=utc_now) attrs: dict[str, Any] = field(default_factory=dict) diff --git a/src/app_runtime/tracing/service.py b/src/app_runtime/tracing/service.py index 00bc284..6b5721a 100644 --- a/src/app_runtime/tracing/service.py +++ b/src/app_runtime/tracing/service.py @@ -9,6 +9,7 @@ from app_runtime.contracts.trace import ( TraceContext, TraceContextFactory, TraceContextRecord, + TraceLevel, TraceLogMessage, TraceTransport, utc_now, @@ -86,13 +87,13 @@ class TraceService(TraceContextFactory): def step(self, name: str) -> None: self.store.set_step(str(name or "")) - def info(self, message: str, *, status: str, attrs: dict[str, Any] | None = None) -> None: + def info(self, message: str, *, status: str | None = None, attrs: dict[str, Any] | None = None) -> None: self._write_message("INFO", message, status, attrs) - def warning(self, message: str, *, status: str, attrs: dict[str, Any] | None = None) -> None: + def warning(self, message: str, *, status: str | None = None, attrs: dict[str, Any] | None = None) -> None: self._write_message("WARNING", message, status, attrs) - def error(self, message: str, *, status: str, attrs: dict[str, Any] | None = None) -> None: + def error(self, message: str, *, status: str | None = None, attrs: dict[str, Any] | None = None) -> None: self._write_message("ERROR", message, status, attrs) def exception(self, message: str, *, status: str = "failed", attrs: dict[str, Any] | None = None) -> None: @@ -142,9 +143,9 @@ class TraceService(TraceContextFactory): def _write_message( self, - level: str, + level: TraceLevel, message: str, - status: str, + status: str | None, attrs: dict[str, Any] | None, ) -> None: active = self.store.current() diff --git a/tests/test_runtime.py b/tests/test_runtime.py index fc5a1a6..dfd8f9c 100644 --- a/tests/test_runtime.py +++ b/tests/test_runtime.py @@ -259,6 +259,38 @@ def test_trace_service_writes_contexts_and_messages() -> None: assert transport.messages[0].step == "parse" +def test_trace_service_supports_warning_and_error_levels() -> None: + from app_runtime.tracing.service import TraceService + + transport = RecordingTransport() + manager = TraceService(transport=transport) + + with manager.open_context(alias="worker", kind="worker", attrs={"routine": "incoming"}): + manager.step("validate") + manager.warning("validation warning", status="degraded", attrs={"attempt": 1}) + manager.error("integration failed", status="failed", attrs={"integration": "crm"}) + manager.exception("caught exception", attrs={"exception_type": "RuntimeError"}) + + levels = [message.level for message in transport.messages] + assert levels == ["WARNING", "ERROR", "ERROR"] + + +def test_trace_service_allows_messages_without_status() -> None: + from app_runtime.tracing.service import TraceService + + transport = RecordingTransport() + manager = TraceService(transport=transport) + + with manager.open_context(alias="worker", kind="worker"): + manager.step("optional-status") + manager.info("info without status") + manager.warning("warning without status") + manager.error("error without status") + + assert [message.status for message in transport.messages] == ["", "", ""] + assert all(message.trace_id == transport.contexts[0].trace_id for message in transport.messages) + + def test_http_control_channel_exposes_health_and_actions() -> None: from app_runtime.control.base import ControlActionSet from app_runtime.control.http_channel import HttpControlChannel