Compare commits

..

18 Commits

Author SHA1 Message Date
alex e6509ee0cd Единый http сервис на одном порту 2026-05-14 21:08:18 +03:00
alex e2b817f785 INF-3 Поправил шапку с трейс ид 2026-05-04 13:17:22 +03:00
alex 8cad1d00ec INF-3 Правка последовательности выводы сообщений 2026-05-04 11:51:40 +03:00
alex ec3198dbf1 INF-3 Апдейт версии 2026-05-04 11:18:28 +03:00
alex aed12c9c4e INF-3 Добавить параметр для вывода дочерних и родительских трейсов 2026-05-04 11:12:19 +03:00
alex df50e7acbb Скорректировал разделитель в трейс логах 2026-05-03 08:45:33 +03:00
alex 8789fcc0d1 INF-2 Новая версия 2026-05-02 23:45:20 +03:00
alex b2915c3987 Новая версия 2026-05-02 23:45:00 +03:00
alex 2e75e53b89 Поставил HTML по дефолту 2026-05-02 23:41:14 +03:00
alex 3184ff16ca Правки html формата 2026-05-02 23:38:29 +03:00
alex ef8732f079 Доработка формата html 2026-05-02 23:31:57 +03:00
alex cd4d1b3169 ДОработка html формата 2026-05-02 23:24:17 +03:00
alex 62f08776eb Доработка trace html 2026-05-02 23:14:27 +03:00
alex 90422a0c2a Добавлена возможность регистрировать прикладные апи 2026-04-30 13:47:23 +03:00
alex 9eb7282437 Убрал текущий трейс из ссылок 2026-04-30 10:25:35 +03:00
alex 238c65c9c2 Одноколоночный трейс 2026-04-30 10:15:07 +03:00
alex 72162dd050 Апдейт версии 2026-04-30 10:06:06 +03:00
alex fa314bc1e5 html рендер логов 2026-04-30 09:56:42 +03:00
23 changed files with 1450 additions and 119 deletions
+87 -3
View File
@@ -40,6 +40,7 @@ PLBA (`Platform Runtime for Business Applications`) - это runtime-слой д
- `health` (`HealthRegistry`) - агрегирование здоровья воркеров и дополнительных компонентов. - `health` (`HealthRegistry`) - агрегирование здоровья воркеров и дополнительных компонентов.
- `workflow` (`WorkflowEngine` и persistence-слой) - исполнение шагов бизнес-процесса с переходами и фиксацией состояния. - `workflow` (`WorkflowEngine` и persistence-слой) - исполнение шагов бизнес-процесса с переходами и фиксацией состояния.
- `control plane` (`ControlPlaneService`, `HttpControlChannel`) - внешние health/action endpoints. - `control plane` (`ControlPlaneService`, `HttpControlChannel`) - внешние health/action endpoints.
- `application HTTP` (`ApplicationHttpService`, `HttpApplicationChannel`) - пользовательские HTTP routes бизнес-приложения.
- `queue` (`InMemoryTaskQueue`) - локальный in-memory буфер как утилита прикладного уровня. - `queue` (`InMemoryTaskQueue`) - локальный in-memory буфер как утилита прикладного уровня.
## 3. Архитектура ## 3. Архитектура
@@ -89,6 +90,7 @@ classDiagram
class HealthRegistry class HealthRegistry
class TraceService class TraceService
class ControlPlaneService class ControlPlaneService
class ApplicationHttpService
class WorkflowRuntimeFactory class WorkflowRuntimeFactory
class WorkflowEngine class WorkflowEngine
class WorkflowPersistence class WorkflowPersistence
@@ -106,6 +108,7 @@ classDiagram
RuntimeManager --> TraceService RuntimeManager --> TraceService
RuntimeManager --> WorkerSupervisor RuntimeManager --> WorkerSupervisor
RuntimeManager --> ControlPlaneService RuntimeManager --> ControlPlaneService
RuntimeManager --> ApplicationHttpService
WorkerSupervisor --> Worker WorkerSupervisor --> Worker
Worker --> Routine : invokes Worker --> Routine : invokes
@@ -127,7 +130,8 @@ classDiagram
- Реализация: контракт `app_runtime.contracts.application.ApplicationModule`, регистрация через `app_runtime.core.registration.ModuleRegistry`. - Реализация: контракт `app_runtime.contracts.application.ApplicationModule`, регистрация через `app_runtime.core.registration.ModuleRegistry`.
- Как работает / API / вызовы / таблицы: - Как работает / API / вызовы / таблицы:
- API: `name`, `register(registry)`. - API: `name`, `register(registry)`.
- Типичные вызовы: `registry.add_worker(worker)`, `registry.add_health_contributor(contributor)`. - Типичные вызовы: `registry.add_worker(worker)`, `registry.add_health_contributor(contributor)`.
- Для HTTP-модулей: `registry.add_http_routes(registrar)`.
- `RuntimeManager.register_module()` вызывает `module.register(...)` и добавляет имя модуля в снимок runtime. - `RuntimeManager.register_module()` вызывает `module.register(...)` и добавляет имя модуля в снимок runtime.
- В БД напрямую не пишет. - В БД напрямую не пишет.
- Типовая схема использования: - Типовая схема использования:
@@ -314,7 +318,87 @@ with traces.open_context(alias="email:123", kind="email") as message_trace_id:
- Producer в рутине кладет элементы через `put`. - Producer в рутине кладет элементы через `put`.
- Consumer-воркер извлекает через `get(timeout)` и обрабатывает. - Consumer-воркер извлекает через `get(timeout)` и обрабатывает.
## 5. MVP бизнес-приложения ## 5. Application HTTP
`PLBA` поддерживает прикладной HTTP-слой для пользовательских страниц и API бизнес-приложения. По умолчанию он отделён от `control plane`, но при необходимости можно опубликовать control routes и business routes через один HTTP channel.
`Control Plane` и `Application HTTP` обслуживают разные контуры:
- `Control Plane` используется для `/health`, `/actions/*`, `/traces/*`.
- `Application HTTP` используется для бизнес-маршрутов приложения, например `/estimate`, `/estimate/api/tasks`, `/api/orders`.
### Основные компоненты
- `ApplicationHttpService` управляет lifecycle прикладного HTTP на уровне runtime.
- `HttpApplicationChannel` поднимает отдельный `FastAPI` app через `uvicorn`.
- `HttpRouteRegistrar` регистрирует пользовательские routes и получает `ServiceContainer`.
- `HttpApplicationAppFactory` собирает `FastAPI(title="PLBA Application API")`, middleware и routes.
- `UnifiedHttpService` собирает один `FastAPI` app с control routes (`/health`, `/actions/*`, `/traces/*`) и application routes.
### Минимальный пример
```python
from fastapi import FastAPI, File, UploadFile
from fastapi.responses import HTMLResponse
from plba import ApplicationModule, HttpApplicationChannel, create_runtime
class DemoRoutes:
def register(self, app: FastAPI, services) -> None:
@app.get("/demo")
async def demo_page():
return HTMLResponse("<h1>Demo</h1>")
@app.post("/demo/api/tasks")
async def create_task(file: UploadFile = File(...)):
payload = await file.read()
return {"filename": file.filename, "size": len(payload)}
class DemoModule(ApplicationModule):
@property
def name(self) -> str:
return "demo"
def register(self, registry) -> None:
registry.add_http_routes(DemoRoutes())
runtime = create_runtime(DemoModule(), config_path="config.yml")
runtime.application_http.register_channel(
HttpApplicationChannel(host="0.0.0.0", port=15000, timeout=5)
)
runtime.start()
```
После старта runtime приложение будет обслуживать:
- `GET /demo`
- `POST /demo/api/tasks`
### Единый HTTP API
Если бизнес-приложению нужен один опубликованный порт для control и application routes, передайте `UnifiedHttpService` как `application_http`:
```python
from plba import HttpApplicationChannel, RuntimeManager, UnifiedHttpService
runtime = RuntimeManager(application_http=UnifiedHttpService())
runtime.application_http.register_channel(
HttpApplicationChannel(host="0.0.0.0", port=15000, timeout=5)
)
```
Старый режим с отдельными `ControlPlaneService` и `ApplicationHttpService` остаётся доступен и работает как раньше.
### Типовые сценарии
- `Web UI для фоновых задач`: список задач, запуск, статус, cancel, download result.
- `Внутренний REST API`: например `POST /jobs`, `GET /jobs/{id}`, `POST /jobs/{id}/cancel`.
- `Файловый шлюз`: загрузка входного файла, асинхронная обработка через worker, скачивание результата.
- `Webhook endpoint`: прием callback от внешней системы и передача события в worker pipeline.
### Ограничения и рекомендации
- По умолчанию держите business routes и `control plane` раздельно; используйте `UnifiedHttpService`, когда один опубликованный порт является явным эксплуатационным требованием.
- Держите бизнес-логику в сервисах, а handlers используйте как тонкий HTTP-адаптер.
- Runtime предоставляет доступ к зависимостям через `services`, поэтому handlers не должны зависеть от `RuntimeManager`.
- В первой версии `Application HTTP` не решает auth, static files, шаблонизаторы, websocket и OpenAPI customization.
- Для публичного доступа выносите auth, TLS и reverse proxy на внешний ingress.
## 6. MVP бизнес-приложения
Минимальная конфигурация запуска: Минимальная конфигурация запуска:
1. Создать один `ApplicationModule`. 1. Создать один `ApplicationModule`.
2. В модуле собрать одну `Routine` и один `Worker` (1 worker -> 1 routine). 2. В модуле собрать одну `Routine` и один `Worker` (1 worker -> 1 routine).
@@ -374,4 +458,4 @@ runtime = create_runtime(DemoModule(), config_path="config.yml")
runtime.start() runtime.start()
``` ```
Для production-сценария после MVP обычно добавляют `tracing`, `health contributors`, `workflow` и HTTP control plane, но базовый запуск не требует этих расширений. Для production-сценария после MVP обычно добавляют `tracing`, `health contributors`, `workflow`, HTTP control plane и при необходимости `application HTTP`, но базовый запуск не требует этих расширений.
+3 -2
View File
@@ -4,7 +4,7 @@ build-backend = "setuptools.build_meta"
[project] [project]
name = "plba" name = "plba"
version = "0.3.4" version = "0.4.0"
description = "Platform runtime for business applications" description = "Platform runtime for business applications"
readme = "README.md" readme = "README.md"
requires-python = ">=3.11" requires-python = ">=3.11"
@@ -12,9 +12,10 @@ dependencies = [
"fastapi>=0.129.0", "fastapi>=0.129.0",
"PyMySQL>=1.1", "PyMySQL>=1.1",
"PyYAML>=6.0.3", "PyYAML>=6.0.3",
"python-multipart>=0.0.9",
"uvicorn>=0.41.0", "uvicorn>=0.41.0",
] ]
[tool.setuptools.packages.find] [tool.setuptools.packages.find]
where = ["src"] where = ["src"]
+7 -1
View File
@@ -92,8 +92,14 @@ class TraceLogView:
parent_id: str | None parent_id: str | None
child_ids: tuple[str, ...] = () child_ids: tuple[str, ...] = ()
records: tuple[TraceLogRecord, ...] = () records: tuple[TraceLogRecord, ...] = ()
ancestors: tuple[TraceLogView, ...] = ()
class TraceLogReader(Protocol): class TraceLogReader(Protocol):
def read_trace(self, trace_id: str, levels: tuple[TraceLevel, ...]) -> TraceLogView | None: def read_trace(
self,
trace_id: str,
levels: tuple[TraceLevel, ...],
ancestor_depth: int | None = 0,
) -> TraceLogView | None:
"""Load trace context and filtered log records.""" """Load trace context and filtered log records."""
@@ -0,0 +1,46 @@
from __future__ import annotations
import asyncio
from fastapi.responses import JSONResponse
from app_runtime.control.base import ControlActionRequest, ControlActionSet
class ControlActionResponder:
def __init__(self, actions: ControlActionSet, timeout: int) -> None:
self._actions = actions
self._timeout = timeout
async def respond(
self,
action: str,
_client_source: str = "unknown",
request: ControlActionRequest | None = None,
) -> JSONResponse:
callbacks = {
"start": self._actions.start,
"stop": self._actions.stop,
"status": self._actions.status,
}
callback = callbacks.get(action)
if callback is None:
return JSONResponse(content={"status": "error", "detail": f"unsupported action: {action}"}, status_code=404)
action_request = request or ControlActionRequest()
action_timeout = self._action_timeout(action, action_request)
try:
detail = await asyncio.wait_for(callback(action_request), timeout=action_timeout)
except asyncio.TimeoutError:
return JSONResponse(
content={"status": "accepted", "detail": f"{action} operation is still in progress"},
status_code=202,
)
except Exception as exc:
return JSONResponse(content={"status": "error", "detail": str(exc)}, status_code=500)
return JSONResponse(content={"status": "ok", "detail": detail or f"{action} action accepted"}, status_code=200)
def _action_timeout(self, action: str, request: ControlActionRequest) -> float:
base_timeout = max(float(self._timeout), 10.0) if action in {"start", "stop"} else float(self._timeout)
if action != "stop" or request.wait is False or request.timeout is None:
return base_timeout
return max(base_timeout, float(request.timeout) + 1.0)
+3 -2
View File
@@ -19,14 +19,15 @@ class ControlActionRequest:
ActionResult = str | dict[str, object] ActionResult = str | dict[str, object]
ActionHandler = Callable[[ControlActionRequest], Awaitable[ActionResult]] ActionHandler = Callable[[ControlActionRequest], Awaitable[ActionResult]]
HealthHandler = Callable[[], Awaitable[HealthPayload]] HealthHandler = Callable[[], Awaitable[HealthPayload]]
TraceResponseFormat = Literal["json", "text"] TraceResponseFormat = Literal["json", "text", "html"]
@dataclass(slots=True) @dataclass(slots=True)
class TraceQueryRequest: class TraceQueryRequest:
levels: tuple[TraceLevel, ...] = ("ERROR", "WARNING", "INFO") levels: tuple[TraceLevel, ...] = ("ERROR", "WARNING", "INFO")
include_attrs_json: bool = False include_attrs_json: bool = False
response_format: TraceResponseFormat = "text" response_format: TraceResponseFormat = "html"
ancestor_depth: int | None = 0
TraceLookupHandler = Callable[[str, TraceQueryRequest], Awaitable[TraceLogView]] TraceLookupHandler = Callable[[str, TraceQueryRequest], Awaitable[TraceLogView]]
+14 -68
View File
@@ -1,22 +1,30 @@
from __future__ import annotations from __future__ import annotations
import json
import logging import logging
import time import time
from collections.abc import Awaitable, Callable from collections.abc import Awaitable, Callable
from typing import cast
from fastapi import FastAPI, Request from fastapi import FastAPI, Request
from fastapi.responses import JSONResponse, PlainTextResponse from fastapi.responses import JSONResponse
from app_runtime.control.base import ControlActionRequest, TraceQueryRequest from app_runtime.control.base import ControlActionRequest, TraceQueryRequest
from app_runtime.contracts.trace import TraceLevel, TraceLogView from app_runtime.control.trace_presenter import TraceRequestParser, TraceResponseRenderer
from app_runtime.contracts.trace import TraceLogView
from app_runtime.core.types import HealthPayload from app_runtime.core.types import HealthPayload
LOGGER = logging.getLogger(__name__) LOGGER = logging.getLogger(__name__)
class HttpControlAppFactory: class HttpControlAppFactory:
def __init__(
self,
*,
trace_request_parser: TraceRequestParser | None = None,
trace_response_renderer: TraceResponseRenderer | None = None,
) -> None:
self._trace_request_parser = trace_request_parser or TraceRequestParser()
self._trace_response_renderer = trace_response_renderer or TraceResponseRenderer()
def create( def create(
self, self,
health_provider: Callable[[], Awaitable[HealthPayload]], health_provider: Callable[[], Awaitable[HealthPayload]],
@@ -55,7 +63,7 @@ class HttpControlAppFactory:
if trace_provider is None: if trace_provider is None:
return JSONResponse(content={"status": "error", "detail": "trace lookup is not configured"}, status_code=503) return JSONResponse(content={"status": "error", "detail": "trace lookup is not configured"}, status_code=503)
try: try:
trace_request = self._trace_request(request) trace_request = self._trace_request_parser.parse(request)
except ValueError as exc: except ValueError as exc:
return JSONResponse(content={"status": "error", "detail": str(exc)}, status_code=400) return JSONResponse(content={"status": "error", "detail": str(exc)}, status_code=400)
try: try:
@@ -64,7 +72,7 @@ class HttpControlAppFactory:
return JSONResponse(content={"status": "error", "detail": f"trace not found: {traceid}"}, status_code=404) return JSONResponse(content={"status": "error", "detail": f"trace not found: {traceid}"}, status_code=404)
except RuntimeError as exc: except RuntimeError as exc:
return JSONResponse(content={"status": "error", "detail": str(exc)}, status_code=503) return JSONResponse(content={"status": "error", "detail": str(exc)}, status_code=503)
return self._trace_response(payload, trace_request) return self._trace_response_renderer.render(payload, trace_request)
return app return app
@@ -106,65 +114,3 @@ class HttpControlAppFactory:
if value < 0: if value < 0:
raise ValueError(f"query parameter must be >= 0: {name}={raw_value}") raise ValueError(f"query parameter must be >= 0: {name}={raw_value}")
return value return value
def _trace_request(self, request: Request) -> TraceQueryRequest:
raw_levels = request.query_params.get("levels")
raw_format = request.query_params.get("format", "text")
response_format = raw_format.strip().lower()
if response_format not in {"json", "text"}:
raise ValueError(f"unsupported trace format: {raw_format}")
return TraceQueryRequest(
levels=self._trace_levels(raw_levels),
include_attrs_json=self._bool_param(request, "attrs_json") or False,
response_format=response_format,
)
def _trace_levels(self, raw_levels: str | None) -> tuple[TraceLevel, ...]:
if raw_levels is None:
return ("ERROR", "WARNING", "INFO")
parts = [item.strip().upper() for item in raw_levels.split(",")]
levels = tuple(item for item in parts if item)
if not levels:
raise ValueError("trace levels must not be empty")
unsupported = [level for level in levels if level not in {"DEBUG", "INFO", "WARNING", "ERROR"}]
if unsupported:
raise ValueError(f"unsupported trace levels: {', '.join(unsupported)}")
return cast(tuple[TraceLevel, ...], levels)
def _trace_response(self, trace_view: TraceLogView, request: TraceQueryRequest) -> JSONResponse | PlainTextResponse:
if request.response_format == "json":
return JSONResponse(
content={
"trace_id": trace_view.trace_id,
"parent_id": trace_view.parent_id or "",
"child_ids": list(trace_view.child_ids),
"messages": [record.as_dict(include_attrs_json=request.include_attrs_json) for record in trace_view.records],
}
)
lines = [
f"trace_id: {trace_view.trace_id}",
f"parent_id: {trace_view.parent_id or ''}",
]
lines.extend(self._child_id_lines(trace_view.child_ids))
lines.append("--------------------------------------------------")
previous_step: str | None = None
for record in trace_view.records:
current_step = str(record.step or "")
if previous_step is None:
lines.append(f"step: {current_step}")
elif current_step != previous_step:
lines.append("--------------------------------------------------")
lines.append(f"step: {current_step}")
previous_step = current_step
line = record.message
if request.include_attrs_json:
line = f"{line}, {json.dumps(record.attrs_json, ensure_ascii=False, separators=(',', ':'))}"
lines.append(line)
return PlainTextResponse(content="\n".join(lines))
def _child_id_lines(self, child_ids: tuple[str, ...]) -> list[str]:
lines = ["child_ids:"]
if not child_ids:
return lines
lines.extend(f" - {child_id}" for child_id in child_ids)
return lines
+11 -28
View File
@@ -4,6 +4,7 @@ import asyncio
from fastapi.responses import JSONResponse from fastapi.responses import JSONResponse
from app_runtime.control.action_responder import ControlActionResponder
from app_runtime.control.base import ControlActionRequest, ControlActionSet, ControlChannel, TraceQueryRequest from app_runtime.control.base import ControlActionRequest, ControlActionSet, ControlChannel, TraceQueryRequest
from app_runtime.contracts.trace import TraceLogView from app_runtime.contracts.trace import TraceLogView
from app_runtime.control.http_app import HttpControlAppFactory from app_runtime.control.http_app import HttpControlAppFactory
@@ -16,9 +17,11 @@ class HttpControlChannel(ControlChannel):
self._runner = UvicornThreadRunner(host, port, timeout) self._runner = UvicornThreadRunner(host, port, timeout)
self._factory = HttpControlAppFactory() self._factory = HttpControlAppFactory()
self._actions: ControlActionSet | None = None self._actions: ControlActionSet | None = None
self._action_responder: ControlActionResponder | None = None
async def start(self, actions: ControlActionSet) -> None: async def start(self, actions: ControlActionSet) -> None:
self._actions = actions self._actions = actions
self._action_responder = ControlActionResponder(actions, self._timeout)
app = self._factory.create(self._health_response, self._action_response, self._trace_response) app = self._factory.create(self._health_response, self._action_response, self._trace_response)
await self._runner.start(app) await self._runner.start(app)
@@ -40,34 +43,14 @@ class HttpControlChannel(ControlChannel):
_client_source: str = "unknown", _client_source: str = "unknown",
request: ControlActionRequest | None = None, request: ControlActionRequest | None = None,
) -> JSONResponse: ) -> JSONResponse:
if self._actions is None: if self._action_responder is None:
return JSONResponse(content={"status": "error", "detail": f"{action} handler is not configured"}, status_code=404) if self._actions is None:
callbacks = { return JSONResponse(
"start": self._actions.start, content={"status": "error", "detail": f"{action} handler is not configured"},
"stop": self._actions.stop, status_code=404,
"status": self._actions.status, )
} self._action_responder = ControlActionResponder(self._actions, self._timeout)
callback = callbacks.get(action) return await self._action_responder.respond(action, _client_source, request)
if callback is None:
return JSONResponse(content={"status": "error", "detail": f"unsupported action: {action}"}, status_code=404)
action_request = request or ControlActionRequest()
action_timeout = self._action_timeout(action, action_request)
try:
detail = await asyncio.wait_for(callback(action_request), timeout=action_timeout)
except asyncio.TimeoutError:
return JSONResponse(
content={"status": "accepted", "detail": f"{action} operation is still in progress"},
status_code=202,
)
except Exception as exc:
return JSONResponse(content={"status": "error", "detail": str(exc)}, status_code=500)
return JSONResponse(content={"status": "ok", "detail": detail or f"{action} action accepted"}, status_code=200)
def _action_timeout(self, action: str, request: ControlActionRequest) -> float:
base_timeout = max(float(self._timeout), 10.0) if action in {"start", "stop"} else float(self._timeout)
if action != "stop" or request.wait is False or request.timeout is None:
return base_timeout
return max(base_timeout, float(request.timeout) + 1.0)
async def _trace_response(self, trace_id: str, request: TraceQueryRequest) -> TraceLogView: async def _trace_response(self, trace_id: str, request: TraceQueryRequest) -> TraceLogView:
if self._actions is None or self._actions.trace_lookup is None: if self._actions is None or self._actions.trace_lookup is None:
+3 -2
View File
@@ -8,10 +8,11 @@ from uvicorn import Config, Server
class UvicornThreadRunner: class UvicornThreadRunner:
def __init__(self, host: str, port: int, timeout: int) -> None: def __init__(self, host: str, port: int, timeout: int, *, thread_name: str = "plba-http-control") -> None:
self._host = host self._host = host
self._port = port self._port = port
self._timeout = timeout self._timeout = timeout
self._thread_name = thread_name
self._server: Server | None = None self._server: Server | None = None
self._thread: Thread | None = None self._thread: Thread | None = None
self._error: BaseException | None = None self._error: BaseException | None = None
@@ -22,7 +23,7 @@ class UvicornThreadRunner:
self._error = None self._error = None
config = Config(app=app, host=self._host, port=self._port, log_level="warning") config = Config(app=app, host=self._host, port=self._port, log_level="warning")
self._server = Server(config) self._server = Server(config)
self._thread = Thread(target=self._serve, name="plba-http-control", daemon=True) self._thread = Thread(target=self._serve, name=self._thread_name, daemon=True)
self._thread.start() self._thread.start()
await self._wait_until_started() await self._wait_until_started()
+293
View File
@@ -0,0 +1,293 @@
from __future__ import annotations
import json
from html import escape
from urllib.parse import urlencode
from fastapi import Request
from fastapi.responses import HTMLResponse, JSONResponse, PlainTextResponse, Response
from app_runtime.control.base import TraceQueryRequest
from app_runtime.contracts.trace import TraceLevel, TraceLogRecord, TraceLogView
TRACE_SECTION_SEPARATOR = "=" * 30
class TraceRequestParser:
def parse(self, request: Request) -> TraceQueryRequest:
raw_levels = request.query_params.get("levels")
raw_format = request.query_params.get("format", "html")
response_format = raw_format.strip().lower()
if response_format not in {"json", "text", "html"}:
raise ValueError(f"unsupported trace format: {raw_format}")
return TraceQueryRequest(
levels=self._trace_levels(raw_levels),
include_attrs_json=self._bool_param(request, "attrs_json") or False,
response_format=response_format,
ancestor_depth=self._ancestor_depth(request),
)
def _trace_levels(self, raw_levels: str | None) -> tuple[TraceLevel, ...]:
if raw_levels is None:
return ("ERROR", "WARNING", "INFO")
parts = [item.strip().upper() for item in raw_levels.split(",")]
levels = tuple(item for item in parts if item)
if not levels:
raise ValueError("trace levels must not be empty")
unsupported = [level for level in levels if level not in {"DEBUG", "INFO", "WARNING", "ERROR"}]
if unsupported:
raise ValueError(f"unsupported trace levels: {', '.join(unsupported)}")
return levels
def _bool_param(self, request: Request, name: str) -> bool | None:
raw_value = request.query_params.get(name)
if raw_value is None:
return None
normalized = raw_value.strip().lower()
if normalized in {"1", "true", "yes", "on"}:
return True
if normalized in {"0", "false", "no", "off"}:
return False
raise ValueError(f"invalid boolean query parameter: {name}={raw_value}")
def _ancestor_depth(self, request: Request) -> int | None:
raw_value = request.query_params.get("ancestor_depth")
if raw_value is None:
return 0
normalized = raw_value.strip().lower()
if normalized == "all":
return None
try:
value = int(normalized)
except ValueError as exc:
raise ValueError(f"invalid ancestor depth query parameter: ancestor_depth={raw_value}") from exc
if value < 0:
raise ValueError(f"query parameter must be >= 0: ancestor_depth={raw_value}")
return value
class TraceResponseRenderer:
def render(self, trace_view: TraceLogView, request: TraceQueryRequest) -> Response:
if request.response_format == "json":
return self._render_json(trace_view, request)
if request.response_format == "html":
return self._render_html(trace_view, request)
return self._render_text(trace_view, request)
def _render_json(self, trace_view: TraceLogView, request: TraceQueryRequest) -> JSONResponse:
return JSONResponse(
content={
"trace_id": trace_view.trace_id,
"parent_id": trace_view.parent_id or "",
"child_ids": list(trace_view.child_ids),
"messages": [record.as_dict(include_attrs_json=request.include_attrs_json) for record in trace_view.records],
"ancestors": [self._trace_payload(view, request) for view in trace_view.ancestors],
}
)
def _render_text(self, trace_view: TraceLogView, request: TraceQueryRequest) -> PlainTextResponse:
lineage = [*trace_view.ancestors, trace_view]
lines = self._text_trace_summary_lines(trace_view)
for index, entry in enumerate(lineage):
if index == 0:
lines.append("")
else:
lines.extend(["", ""])
lines.extend(self._text_trace_log_lines(entry, request))
return PlainTextResponse(content="\n".join(lines))
def _render_html(self, trace_view: TraceLogView, request: TraceQueryRequest) -> HTMLResponse:
title = escape(f"Trace {trace_view.trace_id}")
lines = self._html_lines(trace_view, request)
html = f"""<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="utf-8">
<meta name="viewport" content="width=device-width, initial-scale=1">
<title>{title}</title>
<style>
:root {{
color-scheme: dark;
--bg: #000000;
--fg: #ececec;
--step: #ffffff;
--link: #66d9ef;
--error: #ff817d;
--warning: #e9ebec;
--info: #d6d7d9;
--other: #ececec;
}}
body {{
margin: 0;
background: var(--bg);
color: var(--fg);
font: 13px/1.1 "SFMono-Regular", monospace;
}}
.page {{
padding: 14px 16px 24px;
}}
a {{
color: var(--link);
text-decoration: underline;
text-underline-offset: 2px;
}}
.line {{
white-space: pre-wrap;
word-break: break-word;
}}
.msg-error {{
color: var(--error);
}}
.msg-warning {{
color: var(--warning);
}}
.msg-info {{
color: var(--info);
}}
.msg-debug {{
color: var(--other);
}}
@media (max-width: 640px) {{
.page {{
padding: 12px 12px 20px;
}}
}}
</style>
</head>
<body>
<div class="page">
{lines}
</div>
</body>
</html>"""
return HTMLResponse(content=html)
def _child_id_lines(self, child_ids: tuple[str, ...]) -> list[str]:
lines = ["child_ids:"]
lines.extend(f" - {child_id}" for child_id in child_ids)
return lines
def _text_message(self, record: TraceLogRecord, include_attrs_json: bool) -> str:
if not include_attrs_json:
return record.message
return f"{record.message}, {json.dumps(record.attrs_json, ensure_ascii=False, separators=(',', ':'))}"
def _trace_href(self, trace_id: str, request: TraceQueryRequest) -> str:
params = {
"format": "html",
"levels": ",".join(request.levels),
"attrs_json": "true" if request.include_attrs_json else "false",
}
if request.ancestor_depth is None:
params["ancestor_depth"] = "all"
elif request.ancestor_depth > 0:
params["ancestor_depth"] = str(request.ancestor_depth)
query = urlencode(params)
return f"/traces/{trace_id}?{query}"
def _html_lines(self, trace_view: TraceLogView, request: TraceQueryRequest) -> str:
lineage = [*trace_view.ancestors, trace_view]
lines = self._html_trace_summary_lines(trace_view, request)
for index, entry in enumerate(lineage):
if index == 0:
lines.append(self._html_plain_line(""))
else:
lines.extend([self._html_plain_line(""), self._html_plain_line("")])
lines.extend(self._html_trace_log_lines(entry, request))
return "".join(lines)
def _trace_payload(self, trace_view: TraceLogView, request: TraceQueryRequest) -> dict[str, object]:
return {
"trace_id": trace_view.trace_id,
"parent_id": trace_view.parent_id or "",
"child_ids": list(trace_view.child_ids),
"messages": [record.as_dict(include_attrs_json=request.include_attrs_json) for record in trace_view.records],
}
def _text_trace_summary_lines(self, trace_view: TraceLogView) -> list[str]:
return [
f"trace_id: {trace_view.trace_id}",
f"parent_id: {trace_view.parent_id or ''}",
*self._child_id_lines(trace_view.child_ids),
]
def _text_trace_log_lines(self, trace_view: TraceLogView, request: TraceQueryRequest) -> list[str]:
lines = [
TRACE_SECTION_SEPARATOR,
f"trace_id: {trace_view.trace_id}",
"",
]
previous_step: str | None = None
for record in trace_view.records:
current_step = str(record.step or "")
if previous_step is None:
lines.append(f"step: {current_step}")
elif current_step != previous_step:
lines.append("------------------------------")
lines.append(f"step: {current_step}")
previous_step = current_step
lines.append(self._text_message(record, request.include_attrs_json))
return lines
def _html_trace_summary_lines(self, trace_view: TraceLogView, request: TraceQueryRequest) -> list[str]:
return [
self._html_plain_line(f"trace_id: {self._trace_link(trace_view.trace_id, request)}"),
self._html_plain_line(f"parent_id: {self._optional_trace_link(trace_view.parent_id, request)}"),
self._html_plain_line("child_ids:"),
*(self._html_plain_line(f" - {self._trace_link(child_id, request)}") for child_id in trace_view.child_ids),
]
def _html_trace_log_lines(self, trace_view: TraceLogView, request: TraceQueryRequest) -> list[str]:
lines = [
self._html_plain_line(TRACE_SECTION_SEPARATOR),
self._html_plain_line(f"trace_id: {self._trace_link(trace_view.trace_id, request)}"),
self._html_plain_line(""),
]
previous_step: str | None = None
for record in trace_view.records:
current_step = str(record.step or "")
if previous_step is None:
lines.append(self._html_step_line(current_step))
lines.append(self._html_plain_line(""))
elif current_step != previous_step:
lines.append(self._html_plain_line(""))
lines.append(self._html_plain_line("------------------------------"))
lines.append(self._html_step_line(current_step))
lines.append(self._html_plain_line(""))
previous_step = current_step
lines.extend(self._html_message_lines(record, request.include_attrs_json))
return lines
def _html_message_lines(self, record: TraceLogRecord, include_attrs_json: bool) -> list[str]:
lines = [self._html_colored_line(self._text_message(record, include_attrs_json), record.level)]
return lines
def _html_plain_line(self, content: str) -> str:
return f"<div class=\"line\">{content or '&nbsp;'}</div>"
def _html_step_line(self, content: str) -> str:
return f"<div class=\"line\" style=\"color: var(--step);\">{escape(content) or '&nbsp;'}</div>"
def _html_colored_line(self, content: str, level: str) -> str:
level_class = self._level_class(level)
return f"<div class=\"line {level_class}\">{escape(content)}</div>"
def _trace_link(self, trace_id: str, request: TraceQueryRequest) -> str:
href = escape(self._trace_href(trace_id, request), quote=True)
text = escape(trace_id)
return f"<a href=\"{href}\">{text}</a>"
def _optional_trace_link(self, trace_id: str | None, request: TraceQueryRequest) -> str:
if not trace_id:
return ""
return self._trace_link(trace_id, request)
def _level_class(self, level: str) -> str:
if level == "ERROR":
return "msg-error"
if level == "WARNING":
return "msg-warning"
if level == "INFO":
return "msg-info"
return "msg-debug"
+5
View File
@@ -3,6 +3,7 @@ from __future__ import annotations
from app_runtime.contracts.health import HealthContributor from app_runtime.contracts.health import HealthContributor
from app_runtime.contracts.worker import Worker from app_runtime.contracts.worker import Worker
from app_runtime.core.service_container import ServiceContainer from app_runtime.core.service_container import ServiceContainer
from app_runtime.http.base import HttpRouteRegistrar
class ModuleRegistry: class ModuleRegistry:
@@ -10,6 +11,7 @@ class ModuleRegistry:
self.services = services self.services = services
self.workers: list[Worker] = [] self.workers: list[Worker] = []
self.health_contributors: list[HealthContributor] = [] self.health_contributors: list[HealthContributor] = []
self.http_route_registrars: list[HttpRouteRegistrar] = []
self.modules: list[str] = [] self.modules: list[str] = []
def register_module(self, name: str) -> None: def register_module(self, name: str) -> None:
@@ -20,3 +22,6 @@ class ModuleRegistry:
def add_health_contributor(self, contributor: HealthContributor) -> None: def add_health_contributor(self, contributor: HealthContributor) -> None:
self.health_contributors.append(contributor) self.health_contributors.append(contributor)
def add_http_routes(self, registrar: HttpRouteRegistrar) -> None:
self.http_route_registrars.append(registrar)
+14 -1
View File
@@ -13,6 +13,7 @@ from app_runtime.core.registration import ModuleRegistry
from app_runtime.core.service_container import ServiceContainer from app_runtime.core.service_container import ServiceContainer
from app_runtime.core.types import HealthPayload, LifecycleState from app_runtime.core.types import HealthPayload, LifecycleState
from app_runtime.health.registry import HealthRegistry from app_runtime.health.registry import HealthRegistry
from app_runtime.http.service import ApplicationHttpService
from app_runtime.logging.manager import LogManager from app_runtime.logging.manager import LogManager
from app_runtime.tracing.reader import build_trace_log_reader from app_runtime.tracing.reader import build_trace_log_reader
from app_runtime.tracing.service import TraceService from app_runtime.tracing.service import TraceService
@@ -32,6 +33,7 @@ class RuntimeManager:
logs: LogManager | None = None, logs: LogManager | None = None,
workers: WorkerSupervisor | None = None, workers: WorkerSupervisor | None = None,
control_plane: ControlPlaneService | None = None, control_plane: ControlPlaneService | None = None,
application_http: ApplicationHttpService | None = None,
) -> None: ) -> None:
self.configuration = configuration or ConfigurationManager() self.configuration = configuration or ConfigurationManager()
self.services = services or ServiceContainer() self.services = services or ServiceContainer()
@@ -40,6 +42,7 @@ class RuntimeManager:
self.logs = logs or LogManager() self.logs = logs or LogManager()
self.workers = workers or WorkerSupervisor() self.workers = workers or WorkerSupervisor()
self.control_plane = control_plane or ControlPlaneService() self.control_plane = control_plane or ControlPlaneService()
self.application_http = application_http or ApplicationHttpService()
self.registry = ModuleRegistry(self.services) self.registry = ModuleRegistry(self.services)
self._started = False self._started = False
self._state = LifecycleState.IDLE self._state = LifecycleState.IDLE
@@ -67,6 +70,8 @@ class RuntimeManager:
self.workers.start() self.workers.start()
if start_control_plane: if start_control_plane:
self.control_plane.start(self) self.control_plane.start(self)
self._register_application_http_routes()
self.application_http.start(self)
self._started = True self._started = True
self._refresh_state() self._refresh_state()
@@ -75,6 +80,7 @@ class RuntimeManager:
return return
self._state = LifecycleState.STOPPING self._state = LifecycleState.STOPPING
self.workers.stop(timeout=timeout, force=force) self.workers.stop(timeout=timeout, force=force)
self.application_http.stop()
if stop_control_plane: if stop_control_plane:
self.control_plane.stop() self.control_plane.stop()
self._started = False self._started = False
@@ -120,6 +126,7 @@ class RuntimeManager:
except TimeoutError: except TimeoutError:
return self._action_detail("runtime stop is still in progress", timed_out=True) return self._action_detail("runtime stop is still in progress", timed_out=True)
self.application_http.stop()
self._refresh_state() self._refresh_state()
if self._state == LifecycleState.STOPPED: if self._state == LifecycleState.STOPPED:
self._started = False self._started = False
@@ -134,7 +141,7 @@ class RuntimeManager:
reader = build_trace_log_reader(self.traces.transport) reader = build_trace_log_reader(self.traces.transport)
if reader is None: if reader is None:
raise RuntimeError("trace log reader is not configured") raise RuntimeError("trace log reader is not configured")
trace_view = reader.read_trace(trace_id, request.levels) trace_view = reader.read_trace(trace_id, request.levels, request.ancestor_depth)
if trace_view is None: if trace_view is None:
raise KeyError(trace_id) raise KeyError(trace_id)
return trace_view return trace_view
@@ -148,6 +155,7 @@ class RuntimeManager:
self.services.register("logs", self.logs) self.services.register("logs", self.logs)
self.services.register("workers", self.workers) self.services.register("workers", self.workers)
self.services.register("control_plane", self.control_plane) self.services.register("control_plane", self.control_plane)
self.services.register("application_http", self.application_http)
self._core_registered = True self._core_registered = True
def _register_health_contributors(self) -> None: def _register_health_contributors(self) -> None:
@@ -161,6 +169,11 @@ class RuntimeManager:
self.workers.register(worker) self.workers.register(worker)
self._workers_registered = True self._workers_registered = True
def _register_application_http_routes(self) -> None:
for registrar in self.registry.http_route_registrars:
self.application_http.register_routes(registrar)
self.registry.http_route_registrars.clear()
def _refresh_state(self) -> None: def _refresh_state(self) -> None:
lifecycle = self.workers.lifecycle_state() lifecycle = self.workers.lifecycle_state()
+14
View File
@@ -0,0 +1,14 @@
from app_runtime.http.base import ApplicationHttpChannel, HttpRouteRegistrar
from app_runtime.http.http_app import HttpApplicationAppFactory
from app_runtime.http.http_channel import HttpApplicationChannel
from app_runtime.http.service import ApplicationHttpService
from app_runtime.http.unified_service import UnifiedHttpService
__all__ = [
"ApplicationHttpChannel",
"ApplicationHttpService",
"HttpApplicationAppFactory",
"HttpApplicationChannel",
"HttpRouteRegistrar",
"UnifiedHttpService",
]
+23
View File
@@ -0,0 +1,23 @@
from __future__ import annotations
from abc import ABC, abstractmethod
from typing import Protocol
from fastapi import FastAPI
from app_runtime.core.service_container import ServiceContainer
class HttpRouteRegistrar(Protocol):
def register(self, app: FastAPI, services: ServiceContainer) -> None:
"""Register application routes on the provided FastAPI app."""
class ApplicationHttpChannel(ABC):
@abstractmethod
async def start(self, app: FastAPI) -> None:
"""Start the HTTP channel with the prepared application app."""
@abstractmethod
async def stop(self) -> None:
"""Stop the HTTP channel and release resources."""
+37
View File
@@ -0,0 +1,37 @@
from __future__ import annotations
import logging
import time
from collections.abc import Iterable
from fastapi import FastAPI, Request
from app_runtime.core.service_container import ServiceContainer
from app_runtime.http.base import HttpRouteRegistrar
LOGGER = logging.getLogger(__name__)
class HttpApplicationAppFactory:
def create(self, registrars: Iterable[HttpRouteRegistrar], services: ServiceContainer) -> FastAPI:
app = FastAPI(title="PLBA Application API")
self._register_middleware(app)
for registrar in registrars:
registrar.register(app, services)
return app
def _register_middleware(self, app: FastAPI) -> None:
@app.middleware("http")
async def track_request(request: Request, call_next): # type: ignore[no-untyped-def]
started = time.monotonic()
response = await call_next(request)
duration_ms = int((time.monotonic() - started) * 1000)
response.headers["X-Response-Time-Ms"] = str(duration_ms)
LOGGER.info(
"Application HTTP request handled: method=%s path=%s status=%s duration_ms=%s",
request.method,
request.url.path,
response.status_code,
duration_ms,
)
return response
+21
View File
@@ -0,0 +1,21 @@
from __future__ import annotations
from fastapi import FastAPI
from app_runtime.control.http_runner import UvicornThreadRunner
from app_runtime.http.base import ApplicationHttpChannel
class HttpApplicationChannel(ApplicationHttpChannel):
def __init__(self, host: str, port: int, timeout: int) -> None:
self._runner = UvicornThreadRunner(host, port, timeout, thread_name="plba-http-application")
async def start(self, app: FastAPI) -> None:
await self._runner.start(app)
async def stop(self) -> None:
await self._runner.stop()
@property
def port(self) -> int:
return self._runner.port
+42
View File
@@ -0,0 +1,42 @@
from __future__ import annotations
import asyncio
from typing import TYPE_CHECKING
from app_runtime.http.base import ApplicationHttpChannel, HttpRouteRegistrar
from app_runtime.http.http_app import HttpApplicationAppFactory
if TYPE_CHECKING:
from app_runtime.core.runtime import RuntimeManager
class ApplicationHttpService:
def __init__(self, app_factory: HttpApplicationAppFactory | None = None) -> None:
self._channels: list[ApplicationHttpChannel] = []
self._registrars: list[HttpRouteRegistrar] = []
self._app_factory = app_factory or HttpApplicationAppFactory()
def register_channel(self, channel: ApplicationHttpChannel) -> None:
self._channels.append(channel)
def register_routes(self, registrar: HttpRouteRegistrar) -> None:
self._registrars.append(registrar)
def start(self, runtime: RuntimeManager) -> None:
if not self._channels:
return
asyncio.run(self._start_async(runtime))
def stop(self) -> None:
if not self._channels:
return
asyncio.run(self._stop_async())
async def _start_async(self, runtime: RuntimeManager) -> None:
app = self._app_factory.create(self._registrars, runtime.services)
for channel in self._channels:
await channel.start(app)
async def _stop_async(self) -> None:
for channel in reversed(self._channels):
await channel.stop()
+68
View File
@@ -0,0 +1,68 @@
from __future__ import annotations
import asyncio
from typing import TYPE_CHECKING
from app_runtime.control.action_responder import ControlActionResponder
from app_runtime.control.base import ControlActionSet
from app_runtime.control.http_app import HttpControlAppFactory
from app_runtime.http.base import ApplicationHttpChannel, HttpRouteRegistrar
from app_runtime.http.http_app import HttpApplicationAppFactory
if TYPE_CHECKING:
from app_runtime.core.runtime import RuntimeManager
class UnifiedHttpService:
"""Publish control and application routes through the same HTTP channel."""
def __init__(
self,
app_factory: HttpApplicationAppFactory | None = None,
control_app_factory: HttpControlAppFactory | None = None,
control_timeout: int = 5,
) -> None:
self._channels: list[ApplicationHttpChannel] = []
self._registrars: list[HttpRouteRegistrar] = []
self._app_factory = app_factory or HttpApplicationAppFactory()
self._control_app_factory = control_app_factory or HttpControlAppFactory()
self._control_timeout = control_timeout
def register_channel(self, channel: ApplicationHttpChannel) -> None:
self._channels.append(channel)
def register_routes(self, registrar: HttpRouteRegistrar) -> None:
self._registrars.append(registrar)
def start(self, runtime: RuntimeManager) -> None:
if not self._channels:
return
asyncio.run(self._start_async(runtime))
def stop(self) -> None:
if not self._channels:
return
asyncio.run(self._stop_async())
async def _start_async(self, runtime: RuntimeManager) -> None:
app = self._app_factory.create(self._registrars, runtime.services)
actions = ControlActionSet(
health=runtime.health_status,
start=runtime.start_runtime,
stop=runtime.stop_runtime,
status=runtime.runtime_status,
trace_lookup=runtime.trace_logs,
)
action_responder = ControlActionResponder(actions, self._control_timeout)
control_app = self._control_app_factory.create(
actions.health,
action_responder.respond,
actions.trace_lookup,
)
app.router.routes.extend(control_app.router.routes)
for channel in self._channels:
await channel.start(app)
async def _stop_async(self) -> None:
for channel in reversed(self._channels):
await channel.stop()
+35 -1
View File
@@ -11,10 +11,16 @@ class MySqlTraceLogReader(TraceLogReader):
def __init__(self, connection_factory: MySqlTraceConnectionFactory) -> None: def __init__(self, connection_factory: MySqlTraceConnectionFactory) -> None:
self._connection_factory = connection_factory self._connection_factory = connection_factory
def read_trace(self, trace_id: str, levels: tuple[TraceLevel, ...]) -> TraceLogView | None: def read_trace(
self,
trace_id: str,
levels: tuple[TraceLevel, ...],
ancestor_depth: int | None = 0,
) -> TraceLogView | None:
parent_id = self._read_parent_id(trace_id) parent_id = self._read_parent_id(trace_id)
if parent_id is None and not self._trace_exists(trace_id): if parent_id is None and not self._trace_exists(trace_id):
return None return None
ancestors = self._read_ancestors(parent_id, levels, ancestor_depth)
child_ids = self._read_child_ids(trace_id) child_ids = self._read_child_ids(trace_id)
records = self._read_records(trace_id, levels) records = self._read_records(trace_id, levels)
return TraceLogView( return TraceLogView(
@@ -22,8 +28,36 @@ class MySqlTraceLogReader(TraceLogReader):
parent_id=parent_id, parent_id=parent_id,
child_ids=tuple(child_ids), child_ids=tuple(child_ids),
records=tuple(records), records=tuple(records),
ancestors=tuple(ancestors),
) )
def _read_ancestors(
self,
parent_id: str | None,
levels: tuple[TraceLevel, ...],
ancestor_depth: int | None,
) -> list[TraceLogView]:
if parent_id is None or ancestor_depth == 0:
return []
remaining_depth = ancestor_depth
ancestors: list[TraceLogView] = []
current_trace_id = parent_id
while current_trace_id is not None and (remaining_depth is None or remaining_depth > 0):
current_parent_id = self._read_parent_id(current_trace_id)
ancestors.append(
TraceLogView(
trace_id=current_trace_id,
parent_id=current_parent_id,
child_ids=tuple(self._read_child_ids(current_trace_id)),
records=tuple(self._read_records(current_trace_id, levels)),
)
)
current_trace_id = current_parent_id
if remaining_depth is not None:
remaining_depth -= 1
ancestors.reverse()
return ancestors
def _trace_exists(self, trace_id: str) -> bool: def _trace_exists(self, trace_id: str) -> bool:
query = "SELECT 1 FROM trace_contexts WHERE trace_id = %s" query = "SELECT 1 FROM trace_contexts WHERE trace_id = %s"
with self._connection_factory.connect() as connection: with self._connection_factory.connect() as connection:
+14
View File
@@ -15,6 +15,14 @@ from plba.contracts import (
) )
from plba.core import ConfigurationManager, RuntimeManager, ServiceContainer from plba.core import ConfigurationManager, RuntimeManager, ServiceContainer
from plba.health import HealthRegistry from plba.health import HealthRegistry
from plba.http import (
ApplicationHttpChannel,
ApplicationHttpService,
HttpApplicationAppFactory,
HttpApplicationChannel,
HttpRouteRegistrar,
UnifiedHttpService,
)
from plba.logging import LogManager from plba.logging import LogManager
from plba.queue import InMemoryTaskQueue from plba.queue import InMemoryTaskQueue
from plba.tracing import MySqlTraceTransport, NoOpTraceTransport, TraceService from plba.tracing import MySqlTraceTransport, NoOpTraceTransport, TraceService
@@ -43,7 +51,13 @@ __all__ = [
"FileConfigProvider", "FileConfigProvider",
"HealthContributor", "HealthContributor",
"HealthRegistry", "HealthRegistry",
"ApplicationHttpChannel",
"ApplicationHttpService",
"HttpApplicationAppFactory",
"HttpApplicationChannel",
"HttpRouteRegistrar",
"HttpControlChannel", "HttpControlChannel",
"UnifiedHttpService",
"InMemoryTaskQueue", "InMemoryTaskQueue",
"LogManager", "LogManager",
"MySqlTraceTransport", "MySqlTraceTransport",
+12
View File
@@ -3,6 +3,7 @@ from __future__ import annotations
from app_runtime.control.http_channel import HttpControlChannel from app_runtime.control.http_channel import HttpControlChannel
from app_runtime.core.runtime import RuntimeManager from app_runtime.core.runtime import RuntimeManager
from app_runtime.contracts.application import ApplicationModule from app_runtime.contracts.application import ApplicationModule
from app_runtime.http.http_channel import HttpApplicationChannel
def create_runtime( def create_runtime(
@@ -13,6 +14,9 @@ def create_runtime(
control_host: str = "127.0.0.1", control_host: str = "127.0.0.1",
control_port: int = 8080, control_port: int = 8080,
control_timeout: int = 5, control_timeout: int = 5,
application_host: str | None = None,
application_port: int = 15000,
application_timeout: int = 5,
) -> RuntimeManager: ) -> RuntimeManager:
runtime = RuntimeManager() runtime = RuntimeManager()
if config_path is not None: if config_path is not None:
@@ -25,5 +29,13 @@ def create_runtime(
timeout=control_timeout, timeout=control_timeout,
) )
) )
if application_host is not None:
runtime.application_http.register_channel(
HttpApplicationChannel(
host=application_host,
port=application_port,
timeout=application_timeout,
)
)
runtime.register_module(module) runtime.register_module(module)
return runtime return runtime
+14
View File
@@ -0,0 +1,14 @@
from app_runtime.http.base import ApplicationHttpChannel, HttpRouteRegistrar
from app_runtime.http.http_app import HttpApplicationAppFactory
from app_runtime.http.http_channel import HttpApplicationChannel
from app_runtime.http.service import ApplicationHttpService
from app_runtime.http.unified_service import UnifiedHttpService
__all__ = [
"ApplicationHttpChannel",
"ApplicationHttpService",
"HttpApplicationAppFactory",
"HttpApplicationChannel",
"HttpRouteRegistrar",
"UnifiedHttpService",
]
+234
View File
@@ -0,0 +1,234 @@
from __future__ import annotations
import http.client
from dataclasses import dataclass
from pathlib import Path
from fastapi import FastAPI, File, UploadFile
from fastapi.responses import FileResponse
from fastapi.testclient import TestClient
import pytest
from app_runtime.contracts.application import ApplicationModule
from app_runtime.control.http_channel import HttpControlChannel
from app_runtime.core.registration import ModuleRegistry
from app_runtime.core.runtime import RuntimeManager
from app_runtime.http.base import ApplicationHttpChannel
from app_runtime.http.http_channel import HttpApplicationChannel
from app_runtime.http.unified_service import UnifiedHttpService
try:
import python_multipart # noqa: F401
except ImportError:
HAS_MULTIPART = False
else:
HAS_MULTIPART = True
class RecordingChannel(ApplicationHttpChannel):
def __init__(self) -> None:
self.apps: list[FastAPI] = []
self.stop_calls = 0
async def start(self, app: FastAPI) -> None:
self.apps.append(app)
async def stop(self) -> None:
self.stop_calls += 1
class PingRoutes:
def register(self, app: FastAPI, services) -> None: # type: ignore[no-untyped-def]
@app.get("/estimate/ping")
async def ping() -> dict[str, str]:
return {"status": "ok"}
@dataclass
class ServiceBackedRoutes:
download_path: Path
def register(self, app: FastAPI, services) -> None: # type: ignore[no-untyped-def]
marker = services.get("task_query_service")
@app.get("/estimate/api/tasks")
async def list_tasks() -> dict[str, object]:
return {"marker": marker["marker"]}
@app.post("/estimate/api/tasks")
async def create_task(file: UploadFile = File(...)) -> dict[str, object]:
payload = await file.read()
return {"filename": file.filename, "size": len(payload)}
@app.get("/estimate/api/tasks/result")
async def download_result() -> FileResponse:
return FileResponse(self.download_path, filename=self.download_path.name)
class MetricsRoutes:
def register(self, app: FastAPI, services) -> None: # type: ignore[no-untyped-def]
@app.get("/estimate/api/metrics")
async def metrics() -> dict[str, int]:
return {"count": 1}
class HttpModule(ApplicationModule):
def __init__(self, *registrars: object) -> None:
self._registrars = registrars
@property
def name(self) -> str:
return "http-module"
def register(self, registry: ModuleRegistry) -> None:
for registrar in self._registrars:
registry.add_http_routes(registrar)
def _application_client(channel: RecordingChannel) -> TestClient:
assert channel.apps
return TestClient(channel.apps[0])
def _http_request(port: int, path: str) -> tuple[int, bytes]:
connection = http.client.HTTPConnection("127.0.0.1", port, timeout=2)
try:
connection.request("GET", path)
response = connection.getresponse()
payload = response.read()
return response.status, payload
finally:
connection.close()
def test_runtime_starts_application_http_and_registers_routes() -> None:
runtime = RuntimeManager()
channel = RecordingChannel()
runtime.application_http.register_channel(channel)
runtime.register_module(HttpModule(PingRoutes()))
runtime.start(start_control_plane=False)
try:
assert len(channel.apps) == 1
client = _application_client(channel)
with client:
response = client.get("/estimate/ping")
assert response.status_code == 200
assert response.json() == {"status": "ok"}
assert response.headers["x-response-time-ms"].isdigit()
finally:
runtime.stop(stop_control_plane=False)
assert channel.stop_calls == 1
def test_application_routes_see_runtime_services_and_support_upload_download(tmp_path: Path) -> None:
if not HAS_MULTIPART:
pytest.skip("python-multipart is not installed in the local environment")
runtime = RuntimeManager()
runtime.services.register("task_query_service", {"marker": "from-container"})
result_path = tmp_path / "result.txt"
result_path.write_text("ready", encoding="utf-8")
channel = RecordingChannel()
runtime.application_http.register_channel(channel)
runtime.register_module(HttpModule(ServiceBackedRoutes(result_path), MetricsRoutes()))
runtime.start(start_control_plane=False)
client = _application_client(channel)
try:
with client:
list_response = client.get("/estimate/api/tasks")
assert list_response.status_code == 200
assert list_response.json() == {"marker": "from-container"}
upload_response = client.post(
"/estimate/api/tasks",
files={"file": ("input.txt", b"payload", "text/plain")},
)
assert upload_response.status_code == 200
assert upload_response.json() == {"filename": "input.txt", "size": 7}
metrics_response = client.get("/estimate/api/metrics")
assert metrics_response.status_code == 200
assert metrics_response.json() == {"count": 1}
download_response = client.get("/estimate/api/tasks/result")
assert download_response.status_code == 200
assert download_response.content == b"ready"
finally:
runtime.stop(stop_control_plane=False)
def test_application_http_stop_shuts_down_real_server() -> None:
runtime = RuntimeManager()
channel = HttpApplicationChannel(host="127.0.0.1", port=0, timeout=2)
runtime.application_http.register_channel(channel)
runtime.register_module(HttpModule(PingRoutes()))
runtime.start(start_control_plane=False)
try:
status, _ = _http_request(channel.port, "/estimate/ping")
assert status == 200
finally:
runtime.stop(stop_control_plane=False)
try:
_http_request(channel.port, "/estimate/ping")
except OSError:
pass
else:
raise AssertionError("application HTTP server is still reachable after stop")
def test_control_plane_and_application_http_work_independently() -> None:
runtime = RuntimeManager()
control_channel = HttpControlChannel(host="127.0.0.1", port=0, timeout=2)
app_channel = HttpApplicationChannel(host="127.0.0.1", port=0, timeout=2)
runtime.control_plane.register_channel(control_channel)
runtime.application_http.register_channel(app_channel)
runtime.register_module(HttpModule(PingRoutes()))
runtime.start()
try:
control_status, _ = _http_request(control_channel.port, "/health")
app_status, _ = _http_request(app_channel.port, "/estimate/ping")
assert control_status == 200
assert app_status == 200
control_missing_status, _ = _http_request(control_channel.port, "/estimate/ping")
app_missing_status, _ = _http_request(app_channel.port, "/health")
assert control_missing_status == 404
assert app_missing_status == 404
runtime.application_http.stop()
control_status, _ = _http_request(control_channel.port, "/health")
assert control_status == 200
finally:
runtime.control_plane.stop()
runtime.stop(stop_control_plane=False)
def test_unified_http_serves_control_and_application_routes_on_one_app() -> None:
runtime = RuntimeManager(application_http=UnifiedHttpService())
channel = RecordingChannel()
runtime.application_http.register_channel(channel)
runtime.register_module(HttpModule(PingRoutes()))
runtime.start(start_control_plane=False)
try:
client = _application_client(channel)
with client:
health_response = client.get("/health")
action_response = client.get("/actions/status")
app_response = client.get("/estimate/ping")
assert health_response.status_code == 200
assert health_response.json()["status"] == "ok"
assert action_response.status_code == 200
assert action_response.json() == {"status": "ok", "detail": "idle"}
assert app_response.status_code == 200
assert app_response.json() == {"status": "ok"}
finally:
runtime.stop(stop_control_plane=False)
+450 -11
View File
@@ -46,7 +46,7 @@ def _build_client(trace_provider=None) -> TestClient:
return TestClient(app) return TestClient(app)
def test_trace_endpoint_returns_text_with_default_levels() -> None: def test_trace_endpoint_returns_html_by_default() -> None:
captured: list[tuple[str, TraceQueryRequest]] = [] captured: list[tuple[str, TraceQueryRequest]] = []
async def trace_provider(trace_id: str, request: TraceQueryRequest) -> TraceLogView: async def trace_provider(trace_id: str, request: TraceQueryRequest) -> TraceLogView:
@@ -67,6 +67,42 @@ def test_trace_endpoint_returns_text_with_default_levels() -> None:
finally: finally:
client.close() client.close()
assert response.status_code == 200
assert response.headers["content-type"].startswith("text/html")
assert "trace_id:" in response.text
assert "first error" in response.text
assert "second warning" in response.text
assert captured == [
(
"trace-1",
TraceQueryRequest(
levels=("ERROR", "WARNING", "INFO"),
include_attrs_json=False,
response_format="html",
ancestor_depth=0,
),
)
]
def test_trace_endpoint_returns_text_when_requested() -> None:
async def trace_provider(_trace_id: str, _request: TraceQueryRequest) -> TraceLogView:
return TraceLogView(
trace_id="trace-1",
parent_id="root-trace",
child_ids=("child-1", "child-2"),
records=(
_trace_record(row_id=1, level="ERROR", message="first error"),
_trace_record(row_id=2, level="WARNING", message="second warning"),
),
)
client = _build_client(trace_provider)
try:
response = client.get("/traces/trace-1?format=text")
finally:
client.close()
assert response.status_code == 200 assert response.status_code == 200
assert response.text == ( assert response.text == (
"trace_id: trace-1\n" "trace_id: trace-1\n"
@@ -74,12 +110,14 @@ def test_trace_endpoint_returns_text_with_default_levels() -> None:
"child_ids:\n" "child_ids:\n"
" - child-1\n" " - child-1\n"
" - child-2\n" " - child-2\n"
"--------------------------------------------------\n" "\n"
"==============================\n"
"trace_id: trace-1\n"
"\n"
"step: process\n" "step: process\n"
"first error\n" "first error\n"
"second warning" "second warning"
) )
assert captured == [("trace-1", TraceQueryRequest(levels=("ERROR", "WARNING", "INFO"), include_attrs_json=False, response_format="text"))]
def test_trace_endpoint_appends_attrs_json_in_text_mode() -> None: def test_trace_endpoint_appends_attrs_json_in_text_mode() -> None:
@@ -95,7 +133,7 @@ def test_trace_endpoint_appends_attrs_json_in_text_mode() -> None:
client = _build_client(trace_provider) client = _build_client(trace_provider)
try: try:
response = client.get("/traces/trace-1?attrs_json=true") response = client.get("/traces/trace-1?format=text&attrs_json=true")
finally: finally:
client.close() client.close()
@@ -104,7 +142,10 @@ def test_trace_endpoint_appends_attrs_json_in_text_mode() -> None:
"trace_id: trace-1\n" "trace_id: trace-1\n"
"parent_id: \n" "parent_id: \n"
"child_ids:\n" "child_ids:\n"
"--------------------------------------------------\n" "\n"
"==============================\n"
"trace_id: trace-1\n"
"\n"
"step: process\n" "step: process\n"
'failure, {"attempt":2,"source":"crm"}' 'failure, {"attempt":2,"source":"crm"}'
) )
@@ -125,7 +166,7 @@ def test_trace_endpoint_separates_messages_by_step_in_text_mode() -> None:
client = _build_client(trace_provider) client = _build_client(trace_provider)
try: try:
response = client.get("/traces/trace-1") response = client.get("/traces/trace-1?format=text")
finally: finally:
client.close() client.close()
@@ -134,11 +175,14 @@ def test_trace_endpoint_separates_messages_by_step_in_text_mode() -> None:
"trace_id: trace-1\n" "trace_id: trace-1\n"
"parent_id: \n" "parent_id: \n"
"child_ids:\n" "child_ids:\n"
"--------------------------------------------------\n" "\n"
"==============================\n"
"trace_id: trace-1\n"
"\n"
"step: load_stocks\n" "step: load_stocks\n"
"load first\n" "load first\n"
"load second\n" "load second\n"
"--------------------------------------------------\n" "------------------------------\n"
"step: filter_stocks\n" "step: filter_stocks\n"
"filter first" "filter first"
) )
@@ -178,14 +222,229 @@ def test_trace_endpoint_returns_json_payload() -> None:
"attrs_json": {"batch": 7}, "attrs_json": {"batch": 7},
} }
], ],
"ancestors": [],
} }
def test_trace_endpoint_returns_json_payload_with_ancestors() -> None:
async def trace_provider(_trace_id: str, _request: TraceQueryRequest) -> TraceLogView:
return TraceLogView(
trace_id="trace-1",
parent_id="parent-1",
child_ids=("child-1",),
records=(
_trace_record(row_id=3, level="INFO", message="done", attrs_json={"batch": 7}),
),
ancestors=(
TraceLogView(
trace_id="root-1",
parent_id=None,
child_ids=("parent-1",),
records=(
_trace_record(row_id=4, level="INFO", message="root info"),
),
),
TraceLogView(
trace_id="parent-1",
parent_id="root-1",
child_ids=("trace-1", "sibling-1"),
records=(
_trace_record(row_id=5, level="WARNING", message="parent warning"),
),
),
),
)
client = _build_client(trace_provider)
try:
response = client.get("/traces/trace-1?format=json&ancestor_depth=1")
finally:
client.close()
assert response.status_code == 200
assert response.json()["ancestors"] == [
{
"trace_id": "root-1",
"parent_id": "",
"child_ids": ["parent-1"],
"messages": [
{
"id": 4,
"trace_id": "trace-1",
"event_time": "2026-04-28T10:11:12+00:00",
"step": "process",
"status": "failed",
"level": "INFO",
"message": "root info",
}
],
},
{
"trace_id": "parent-1",
"parent_id": "root-1",
"child_ids": ["trace-1", "sibling-1"],
"messages": [
{
"id": 5,
"trace_id": "trace-1",
"event_time": "2026-04-28T10:11:12+00:00",
"step": "process",
"status": "failed",
"level": "WARNING",
"message": "parent warning",
}
],
}
]
def test_trace_endpoint_returns_html_page_with_related_links() -> None:
async def trace_provider(_trace_id: str, _request: TraceQueryRequest) -> TraceLogView:
return TraceLogView(
trace_id="trace-1",
parent_id="parent-1",
child_ids=("child-1", "child-2"),
records=(
_trace_record(row_id=1, level="INFO", message="loaded prices", step="load_stocks", status="ok"),
_trace_record(row_id=2, level="WARNING", message="filtered suspicious ticker", step="filter_stocks", status="degraded"),
),
)
client = _build_client(trace_provider)
try:
response = client.get("/traces/trace-1?format=html&attrs_json=true")
finally:
client.close()
assert response.status_code == 200
assert response.headers["content-type"].startswith("text/html")
assert "background: var(--bg);" in response.text
assert "--bg: #000000;" in response.text
assert "--fg: #ececec;" in response.text
assert "--step: #ffffff;" in response.text
assert "--info: #d6d7d9;" in response.text
assert "--warning: #e9ebec;" in response.text
assert "--error: #ff817d;" in response.text
assert "--other: #ececec;" in response.text
assert 'font: 13px/1.1 "SFMono-Regular", monospace;' in response.text
assert '<div class="line">trace_id: <a href="/traces/trace-1?format=html&amp;levels=ERROR%2CWARNING%2CINFO&amp;attrs_json=true">trace-1</a></div>' in response.text
assert '<div class="line">parent_id: <a href="/traces/parent-1?format=html&amp;levels=ERROR%2CWARNING%2CINFO&amp;attrs_json=true">parent-1</a></div>' in response.text
assert '<div class="line">child_ids:</div>' in response.text
assert '<div class="line"> - <a href="/traces/child-1?format=html&amp;levels=ERROR%2CWARNING%2CINFO&amp;attrs_json=true">child-1</a></div>' in response.text
assert '<div class="line"> - <a href="/traces/child-2?format=html&amp;levels=ERROR%2CWARNING%2CINFO&amp;attrs_json=true">child-2</a></div>' in response.text
assert '<div class="line">==============================</div>' in response.text
assert '<div class="line" style="color: var(--step);">load_stocks</div>' in response.text
assert '<div class="line">trace_id: <a href="/traces/trace-1?format=html&amp;levels=ERROR%2CWARNING%2CINFO&amp;attrs_json=true">trace-1</a></div>' in response.text
assert '<div class="line" style="color: var(--step);">filter_stocks</div>' in response.text
assert "loaded prices" in response.text
assert "filtered suspicious ticker" in response.text
assert "2026-04-28T10:11:12+00:00 | INFO | ok" not in response.text
assert "2026-04-28T10:11:12+00:00 | WARNING | degraded" not in response.text
assert "Related Traces" not in response.text
def test_trace_endpoint_renders_ancestors_in_text_mode() -> None:
async def trace_provider(_trace_id: str, _request: TraceQueryRequest) -> TraceLogView:
return TraceLogView(
trace_id="trace-1",
parent_id="parent-1",
child_ids=(),
records=(_trace_record(row_id=1, level="INFO", message="child message"),),
ancestors=(
TraceLogView(
trace_id="root-1",
parent_id=None,
child_ids=("parent-1",),
records=(_trace_record(row_id=2, level="INFO", message="root message"),),
),
TraceLogView(
trace_id="parent-1",
parent_id="root-1",
child_ids=("trace-1",),
records=(_trace_record(row_id=3, level="WARNING", message="parent message"),),
),
),
)
client = _build_client(trace_provider)
try:
response = client.get("/traces/trace-1?format=text&ancestor_depth=1")
finally:
client.close()
assert response.status_code == 200
assert response.text == (
"trace_id: trace-1\n"
"parent_id: parent-1\n"
"child_ids:\n"
"\n"
"==============================\n"
"trace_id: root-1\n"
"\n"
"step: process\n"
"root message\n"
"\n"
"\n"
"==============================\n"
"trace_id: parent-1\n"
"\n"
"step: process\n"
"parent message\n"
"\n"
"\n"
"==============================\n"
"trace_id: trace-1\n"
"\n"
"step: process\n"
"child message"
)
def test_trace_endpoint_preserves_ancestor_depth_in_html_links() -> None:
async def trace_provider(_trace_id: str, _request: TraceQueryRequest) -> TraceLogView:
return TraceLogView(
trace_id="trace-1",
parent_id="parent-1",
child_ids=("child-1",),
records=(_trace_record(row_id=1, level="INFO", message="loaded prices"),),
ancestors=(
TraceLogView(
trace_id="root-1",
parent_id=None,
child_ids=("parent-1",),
records=(_trace_record(row_id=2, level="INFO", message="root info"),),
),
TraceLogView(
trace_id="parent-1",
parent_id="root-1",
child_ids=("trace-1",),
records=(_trace_record(row_id=3, level="WARNING", message="parent warning"),),
),
),
)
client = _build_client(trace_provider)
try:
response = client.get("/traces/trace-1?format=html&attrs_json=true&ancestor_depth=all")
finally:
client.close()
assert response.status_code == 200
assert 'href="/traces/trace-1?format=html&amp;levels=ERROR%2CWARNING%2CINFO&amp;attrs_json=true&amp;ancestor_depth=all"' in response.text
assert 'href="/traces/root-1?format=html&amp;levels=ERROR%2CWARNING%2CINFO&amp;attrs_json=true&amp;ancestor_depth=all"' in response.text
assert 'href="/traces/parent-1?format=html&amp;levels=ERROR%2CWARNING%2CINFO&amp;attrs_json=true&amp;ancestor_depth=all"' in response.text
assert response.text.index("root info") < response.text.index("parent warning") < response.text.index("loaded prices")
assert "root info" in response.text
assert "parent warning" in response.text
def test_trace_endpoint_validates_query_params() -> None: def test_trace_endpoint_validates_query_params() -> None:
client = _build_client(lambda _trace_id, _request: None) client = _build_client(lambda _trace_id, _request: None)
try: try:
invalid_level = client.get("/traces/trace-1?levels=error,fatal") invalid_level = client.get("/traces/trace-1?levels=error,fatal")
invalid_format = client.get("/traces/trace-1?format=xml") invalid_format = client.get("/traces/trace-1?format=xml")
invalid_ancestor_depth = client.get("/traces/trace-1?ancestor_depth=-1")
invalid_ancestor_type = client.get("/traces/trace-1?ancestor_depth=up")
finally: finally:
client.close() client.close()
@@ -193,6 +452,16 @@ def test_trace_endpoint_validates_query_params() -> None:
assert invalid_level.json() == {"status": "error", "detail": "unsupported trace levels: FATAL"} assert invalid_level.json() == {"status": "error", "detail": "unsupported trace levels: FATAL"}
assert invalid_format.status_code == 400 assert invalid_format.status_code == 400
assert invalid_format.json() == {"status": "error", "detail": "unsupported trace format: xml"} assert invalid_format.json() == {"status": "error", "detail": "unsupported trace format: xml"}
assert invalid_ancestor_depth.status_code == 400
assert invalid_ancestor_depth.json() == {
"status": "error",
"detail": "query parameter must be >= 0: ancestor_depth=-1",
}
assert invalid_ancestor_type.status_code == 400
assert invalid_ancestor_type.json() == {
"status": "error",
"detail": "invalid ancestor depth query parameter: ancestor_depth=up",
}
def test_runtime_trace_logs_uses_configured_reader(monkeypatch) -> None: def test_runtime_trace_logs_uses_configured_reader(monkeypatch) -> None:
@@ -204,15 +473,21 @@ def test_runtime_trace_logs_uses_configured_reader(monkeypatch) -> None:
) )
class StubReader: class StubReader:
def read_trace(self, trace_id: str, levels: tuple[str, ...]) -> TraceLogView | None: def read_trace(
self,
trace_id: str,
levels: tuple[str, ...],
ancestor_depth: int | None = 0,
) -> TraceLogView | None:
assert trace_id == "trace-1" assert trace_id == "trace-1"
assert levels == ("ERROR",) assert levels == ("ERROR",)
assert ancestor_depth is None
return expected return expected
monkeypatch.setattr(runtime_module, "build_trace_log_reader", lambda _transport: StubReader()) monkeypatch.setattr(runtime_module, "build_trace_log_reader", lambda _transport: StubReader())
runtime = RuntimeManager() runtime = RuntimeManager()
result = asyncio.run(runtime.trace_logs("trace-1", TraceQueryRequest(levels=("ERROR",)))) result = asyncio.run(runtime.trace_logs("trace-1", TraceQueryRequest(levels=("ERROR",), ancestor_depth=None)))
assert result == expected assert result == expected
@@ -228,7 +503,11 @@ def test_mysql_trace_log_reader_maps_db_rows() -> None:
self._current_query = query self._current_query = query
def fetchone(self) -> dict[str, object] | None: def fetchone(self) -> dict[str, object] | None:
return {"parent_id": "root-77"} if self.executed[-1][1] == ("trace-1",):
return {"parent_id": "root-77"}
if self.executed[-1][1] == ("root-77",):
return {"parent_id": None}
return None
def fetchall(self) -> list[dict[str, object]]: def fetchall(self) -> list[dict[str, object]]:
if "WHERE parent_id = %s" in self._current_query: if "WHERE parent_id = %s" in self._current_query:
@@ -293,7 +572,167 @@ def test_mysql_trace_log_reader_maps_db_rows() -> None:
attrs_json={"attempt": 1}, attrs_json={"attempt": 1},
), ),
), ),
ancestors=(),
) )
assert len(factory.cursor.executed) == 3 assert len(factory.cursor.executed) == 3
assert factory.cursor.executed[1][1] == ("trace-1",) assert factory.cursor.executed[1][1] == ("trace-1",)
assert factory.cursor.executed[2][1] == ("trace-1", "ERROR", "WARNING") assert factory.cursor.executed[2][1] == ("trace-1", "ERROR", "WARNING")
def test_mysql_trace_log_reader_loads_requested_ancestors() -> None:
class FakeCursor:
def __init__(self) -> None:
self.executed: list[tuple[str, tuple[object, ...]]] = []
self._current_query = ""
def execute(self, query: str, params: tuple[object, ...]) -> None:
self.executed.append((query, params))
self._current_query = query
def fetchone(self) -> dict[str, object] | None:
if self.executed[-1][1] == ("trace-1",):
return {"parent_id": "parent-1"}
if self.executed[-1][1] == ("parent-1",):
return {"parent_id": "root-1"}
if self.executed[-1][1] == ("root-1",):
return {"parent_id": None}
return None
def fetchall(self) -> list[dict[str, object]]:
if "WHERE parent_id = %s" in self._current_query:
parent_id = self.executed[-1][1][0]
if parent_id == "trace-1":
return []
if parent_id == "parent-1":
return [{"trace_id": "trace-1"}]
if parent_id == "root-1":
return [{"trace_id": "parent-1"}]
return []
trace_id = self.executed[-1][1][0]
return [
{
"id": 8 if trace_id == "trace-1" else 9,
"trace_id": trace_id,
"event_time": datetime(2026, 4, 28, 10, 11, 12, tzinfo=timezone.utc),
"step": "parse",
"status": "failed",
"level": "ERROR",
"message": f"broken:{trace_id}",
"attrs_json": '{"attempt":1}',
}
]
def __enter__(self) -> FakeCursor:
return self
def __exit__(self, exc_type, exc, tb) -> None:
return None
class FakeConnection:
def __init__(self, cursor: FakeCursor) -> None:
self._cursor = cursor
def cursor(self) -> FakeCursor:
return self._cursor
def __enter__(self) -> FakeConnection:
return self
def __exit__(self, exc_type, exc, tb) -> None:
return None
class FakeConnectionFactory:
def __init__(self) -> None:
self.cursor = FakeCursor()
def connect(self) -> FakeConnection:
return FakeConnection(self.cursor)
factory = FakeConnectionFactory()
reader = MySqlTraceLogReader(factory) # type: ignore[arg-type]
view = reader.read_trace("trace-1", ("ERROR",), 1)
assert view is not None
assert view.trace_id == "trace-1"
assert view.parent_id == "parent-1"
assert len(view.ancestors) == 1
assert view.ancestors[0].trace_id == "parent-1"
assert view.ancestors[0].parent_id == "root-1"
assert view.ancestors[0].child_ids == ("trace-1",)
def test_mysql_trace_log_reader_orders_ancestors_root_first() -> None:
class FakeCursor:
def __init__(self) -> None:
self.executed: list[tuple[str, tuple[object, ...]]] = []
self._current_query = ""
def execute(self, query: str, params: tuple[object, ...]) -> None:
self.executed.append((query, params))
self._current_query = query
def fetchone(self) -> dict[str, object] | None:
if self.executed[-1][1] == ("trace-1",):
return {"parent_id": "parent-1"}
if self.executed[-1][1] == ("parent-1",):
return {"parent_id": "root-1"}
if self.executed[-1][1] == ("root-1",):
return {"parent_id": None}
return None
def fetchall(self) -> list[dict[str, object]]:
if "WHERE parent_id = %s" in self._current_query:
parent_id = self.executed[-1][1][0]
if parent_id == "root-1":
return [{"trace_id": "parent-1"}]
if parent_id == "parent-1":
return [{"trace_id": "trace-1"}]
return []
trace_id = self.executed[-1][1][0]
return [
{
"id": 8,
"trace_id": trace_id,
"event_time": datetime(2026, 4, 28, 10, 11, 12, tzinfo=timezone.utc),
"step": "parse",
"status": "failed",
"level": "ERROR",
"message": f"broken:{trace_id}",
"attrs_json": '{"attempt":1}',
}
]
def __enter__(self) -> FakeCursor:
return self
def __exit__(self, exc_type, exc, tb) -> None:
return None
class FakeConnection:
def __init__(self, cursor: FakeCursor) -> None:
self._cursor = cursor
def cursor(self) -> FakeCursor:
return self._cursor
def __enter__(self) -> FakeConnection:
return self
def __exit__(self, exc_type, exc, tb) -> None:
return None
class FakeConnectionFactory:
def __init__(self) -> None:
self.cursor = FakeCursor()
def connect(self) -> FakeConnection:
return FakeConnection(self.cursor)
factory = FakeConnectionFactory()
reader = MySqlTraceLogReader(factory) # type: ignore[arg-type]
view = reader.read_trace("trace-1", ("ERROR",), 2)
assert view is not None
assert tuple(ancestor.trace_id for ancestor in view.ancestors) == ("root-1", "parent-1")