Compare commits
8 Commits
2e75e53b89
...
main
| Author | SHA1 | Date | |
|---|---|---|---|
| e6509ee0cd | |||
| e2b817f785 | |||
| 8cad1d00ec | |||
| ec3198dbf1 | |||
| aed12c9c4e | |||
| df50e7acbb | |||
| 8789fcc0d1 | |||
| b2915c3987 |
@@ -319,7 +319,7 @@ with traces.open_context(alias="email:123", kind="email") as message_trace_id:
|
||||
- Consumer-воркер извлекает через `get(timeout)` и обрабатывает.
|
||||
|
||||
## 5. Application HTTP
|
||||
`PLBA` поддерживает отдельный прикладной HTTP-слой для пользовательских страниц и API бизнес-приложения. Он не смешивается с `control plane` и поднимается отдельным сервисом внутри того же runtime.
|
||||
`PLBA` поддерживает прикладной HTTP-слой для пользовательских страниц и API бизнес-приложения. По умолчанию он отделён от `control plane`, но при необходимости можно опубликовать control routes и business routes через один HTTP channel.
|
||||
|
||||
`Control Plane` и `Application HTTP` обслуживают разные контуры:
|
||||
- `Control Plane` используется для `/health`, `/actions/*`, `/traces/*`.
|
||||
@@ -330,6 +330,7 @@ with traces.open_context(alias="email:123", kind="email") as message_trace_id:
|
||||
- `HttpApplicationChannel` поднимает отдельный `FastAPI` app через `uvicorn`.
|
||||
- `HttpRouteRegistrar` регистрирует пользовательские routes и получает `ServiceContainer`.
|
||||
- `HttpApplicationAppFactory` собирает `FastAPI(title="PLBA Application API")`, middleware и routes.
|
||||
- `UnifiedHttpService` собирает один `FastAPI` app с control routes (`/health`, `/actions/*`, `/traces/*`) и application routes.
|
||||
|
||||
### Минимальный пример
|
||||
```python
|
||||
@@ -370,6 +371,20 @@ runtime.start()
|
||||
- `GET /demo`
|
||||
- `POST /demo/api/tasks`
|
||||
|
||||
### Единый HTTP API
|
||||
Если бизнес-приложению нужен один опубликованный порт для control и application routes, передайте `UnifiedHttpService` как `application_http`:
|
||||
|
||||
```python
|
||||
from plba import HttpApplicationChannel, RuntimeManager, UnifiedHttpService
|
||||
|
||||
runtime = RuntimeManager(application_http=UnifiedHttpService())
|
||||
runtime.application_http.register_channel(
|
||||
HttpApplicationChannel(host="0.0.0.0", port=15000, timeout=5)
|
||||
)
|
||||
```
|
||||
|
||||
Старый режим с отдельными `ControlPlaneService` и `ApplicationHttpService` остаётся доступен и работает как раньше.
|
||||
|
||||
### Типовые сценарии
|
||||
- `Web UI для фоновых задач`: список задач, запуск, статус, cancel, download result.
|
||||
- `Внутренний REST API`: например `POST /jobs`, `GET /jobs/{id}`, `POST /jobs/{id}/cancel`.
|
||||
@@ -377,7 +392,7 @@ runtime.start()
|
||||
- `Webhook endpoint`: прием callback от внешней системы и передача события в worker pipeline.
|
||||
|
||||
### Ограничения и рекомендации
|
||||
- Не смешивайте business routes с `control plane`.
|
||||
- По умолчанию держите business routes и `control plane` раздельно; используйте `UnifiedHttpService`, когда один опубликованный порт является явным эксплуатационным требованием.
|
||||
- Держите бизнес-логику в сервисах, а handlers используйте как тонкий HTTP-адаптер.
|
||||
- Runtime предоставляет доступ к зависимостям через `services`, поэтому handlers не должны зависеть от `RuntimeManager`.
|
||||
- В первой версии `Application HTTP` не решает auth, static files, шаблонизаторы, websocket и OpenAPI customization.
|
||||
|
||||
+1
-1
@@ -4,7 +4,7 @@ build-backend = "setuptools.build_meta"
|
||||
|
||||
[project]
|
||||
name = "plba"
|
||||
version = "0.3.14"
|
||||
version = "0.4.0"
|
||||
description = "Platform runtime for business applications"
|
||||
readme = "README.md"
|
||||
requires-python = ">=3.11"
|
||||
|
||||
@@ -92,8 +92,14 @@ class TraceLogView:
|
||||
parent_id: str | None
|
||||
child_ids: tuple[str, ...] = ()
|
||||
records: tuple[TraceLogRecord, ...] = ()
|
||||
ancestors: tuple[TraceLogView, ...] = ()
|
||||
|
||||
|
||||
class TraceLogReader(Protocol):
|
||||
def read_trace(self, trace_id: str, levels: tuple[TraceLevel, ...]) -> TraceLogView | None:
|
||||
def read_trace(
|
||||
self,
|
||||
trace_id: str,
|
||||
levels: tuple[TraceLevel, ...],
|
||||
ancestor_depth: int | None = 0,
|
||||
) -> TraceLogView | None:
|
||||
"""Load trace context and filtered log records."""
|
||||
|
||||
@@ -0,0 +1,46 @@
|
||||
from __future__ import annotations
|
||||
|
||||
import asyncio
|
||||
|
||||
from fastapi.responses import JSONResponse
|
||||
|
||||
from app_runtime.control.base import ControlActionRequest, ControlActionSet
|
||||
|
||||
|
||||
class ControlActionResponder:
|
||||
def __init__(self, actions: ControlActionSet, timeout: int) -> None:
|
||||
self._actions = actions
|
||||
self._timeout = timeout
|
||||
|
||||
async def respond(
|
||||
self,
|
||||
action: str,
|
||||
_client_source: str = "unknown",
|
||||
request: ControlActionRequest | None = None,
|
||||
) -> JSONResponse:
|
||||
callbacks = {
|
||||
"start": self._actions.start,
|
||||
"stop": self._actions.stop,
|
||||
"status": self._actions.status,
|
||||
}
|
||||
callback = callbacks.get(action)
|
||||
if callback is None:
|
||||
return JSONResponse(content={"status": "error", "detail": f"unsupported action: {action}"}, status_code=404)
|
||||
action_request = request or ControlActionRequest()
|
||||
action_timeout = self._action_timeout(action, action_request)
|
||||
try:
|
||||
detail = await asyncio.wait_for(callback(action_request), timeout=action_timeout)
|
||||
except asyncio.TimeoutError:
|
||||
return JSONResponse(
|
||||
content={"status": "accepted", "detail": f"{action} operation is still in progress"},
|
||||
status_code=202,
|
||||
)
|
||||
except Exception as exc:
|
||||
return JSONResponse(content={"status": "error", "detail": str(exc)}, status_code=500)
|
||||
return JSONResponse(content={"status": "ok", "detail": detail or f"{action} action accepted"}, status_code=200)
|
||||
|
||||
def _action_timeout(self, action: str, request: ControlActionRequest) -> float:
|
||||
base_timeout = max(float(self._timeout), 10.0) if action in {"start", "stop"} else float(self._timeout)
|
||||
if action != "stop" or request.wait is False or request.timeout is None:
|
||||
return base_timeout
|
||||
return max(base_timeout, float(request.timeout) + 1.0)
|
||||
@@ -27,6 +27,7 @@ class TraceQueryRequest:
|
||||
levels: tuple[TraceLevel, ...] = ("ERROR", "WARNING", "INFO")
|
||||
include_attrs_json: bool = False
|
||||
response_format: TraceResponseFormat = "html"
|
||||
ancestor_depth: int | None = 0
|
||||
|
||||
|
||||
TraceLookupHandler = Callable[[str, TraceQueryRequest], Awaitable[TraceLogView]]
|
||||
|
||||
@@ -4,6 +4,7 @@ import asyncio
|
||||
|
||||
from fastapi.responses import JSONResponse
|
||||
|
||||
from app_runtime.control.action_responder import ControlActionResponder
|
||||
from app_runtime.control.base import ControlActionRequest, ControlActionSet, ControlChannel, TraceQueryRequest
|
||||
from app_runtime.contracts.trace import TraceLogView
|
||||
from app_runtime.control.http_app import HttpControlAppFactory
|
||||
@@ -16,9 +17,11 @@ class HttpControlChannel(ControlChannel):
|
||||
self._runner = UvicornThreadRunner(host, port, timeout)
|
||||
self._factory = HttpControlAppFactory()
|
||||
self._actions: ControlActionSet | None = None
|
||||
self._action_responder: ControlActionResponder | None = None
|
||||
|
||||
async def start(self, actions: ControlActionSet) -> None:
|
||||
self._actions = actions
|
||||
self._action_responder = ControlActionResponder(actions, self._timeout)
|
||||
app = self._factory.create(self._health_response, self._action_response, self._trace_response)
|
||||
await self._runner.start(app)
|
||||
|
||||
@@ -40,34 +43,14 @@ class HttpControlChannel(ControlChannel):
|
||||
_client_source: str = "unknown",
|
||||
request: ControlActionRequest | None = None,
|
||||
) -> JSONResponse:
|
||||
if self._action_responder is None:
|
||||
if self._actions is None:
|
||||
return JSONResponse(content={"status": "error", "detail": f"{action} handler is not configured"}, status_code=404)
|
||||
callbacks = {
|
||||
"start": self._actions.start,
|
||||
"stop": self._actions.stop,
|
||||
"status": self._actions.status,
|
||||
}
|
||||
callback = callbacks.get(action)
|
||||
if callback is None:
|
||||
return JSONResponse(content={"status": "error", "detail": f"unsupported action: {action}"}, status_code=404)
|
||||
action_request = request or ControlActionRequest()
|
||||
action_timeout = self._action_timeout(action, action_request)
|
||||
try:
|
||||
detail = await asyncio.wait_for(callback(action_request), timeout=action_timeout)
|
||||
except asyncio.TimeoutError:
|
||||
return JSONResponse(
|
||||
content={"status": "accepted", "detail": f"{action} operation is still in progress"},
|
||||
status_code=202,
|
||||
content={"status": "error", "detail": f"{action} handler is not configured"},
|
||||
status_code=404,
|
||||
)
|
||||
except Exception as exc:
|
||||
return JSONResponse(content={"status": "error", "detail": str(exc)}, status_code=500)
|
||||
return JSONResponse(content={"status": "ok", "detail": detail or f"{action} action accepted"}, status_code=200)
|
||||
|
||||
def _action_timeout(self, action: str, request: ControlActionRequest) -> float:
|
||||
base_timeout = max(float(self._timeout), 10.0) if action in {"start", "stop"} else float(self._timeout)
|
||||
if action != "stop" or request.wait is False or request.timeout is None:
|
||||
return base_timeout
|
||||
return max(base_timeout, float(request.timeout) + 1.0)
|
||||
self._action_responder = ControlActionResponder(self._actions, self._timeout)
|
||||
return await self._action_responder.respond(action, _client_source, request)
|
||||
|
||||
async def _trace_response(self, trace_id: str, request: TraceQueryRequest) -> TraceLogView:
|
||||
if self._actions is None or self._actions.trace_lookup is None:
|
||||
|
||||
@@ -11,6 +11,9 @@ from app_runtime.control.base import TraceQueryRequest
|
||||
from app_runtime.contracts.trace import TraceLevel, TraceLogRecord, TraceLogView
|
||||
|
||||
|
||||
TRACE_SECTION_SEPARATOR = "=" * 30
|
||||
|
||||
|
||||
class TraceRequestParser:
|
||||
def parse(self, request: Request) -> TraceQueryRequest:
|
||||
raw_levels = request.query_params.get("levels")
|
||||
@@ -22,6 +25,7 @@ class TraceRequestParser:
|
||||
levels=self._trace_levels(raw_levels),
|
||||
include_attrs_json=self._bool_param(request, "attrs_json") or False,
|
||||
response_format=response_format,
|
||||
ancestor_depth=self._ancestor_depth(request),
|
||||
)
|
||||
|
||||
def _trace_levels(self, raw_levels: str | None) -> tuple[TraceLevel, ...]:
|
||||
@@ -47,6 +51,21 @@ class TraceRequestParser:
|
||||
return False
|
||||
raise ValueError(f"invalid boolean query parameter: {name}={raw_value}")
|
||||
|
||||
def _ancestor_depth(self, request: Request) -> int | None:
|
||||
raw_value = request.query_params.get("ancestor_depth")
|
||||
if raw_value is None:
|
||||
return 0
|
||||
normalized = raw_value.strip().lower()
|
||||
if normalized == "all":
|
||||
return None
|
||||
try:
|
||||
value = int(normalized)
|
||||
except ValueError as exc:
|
||||
raise ValueError(f"invalid ancestor depth query parameter: ancestor_depth={raw_value}") from exc
|
||||
if value < 0:
|
||||
raise ValueError(f"query parameter must be >= 0: ancestor_depth={raw_value}")
|
||||
return value
|
||||
|
||||
|
||||
class TraceResponseRenderer:
|
||||
def render(self, trace_view: TraceLogView, request: TraceQueryRequest) -> Response:
|
||||
@@ -63,26 +82,19 @@ class TraceResponseRenderer:
|
||||
"parent_id": trace_view.parent_id or "",
|
||||
"child_ids": list(trace_view.child_ids),
|
||||
"messages": [record.as_dict(include_attrs_json=request.include_attrs_json) for record in trace_view.records],
|
||||
"ancestors": [self._trace_payload(view, request) for view in trace_view.ancestors],
|
||||
}
|
||||
)
|
||||
|
||||
def _render_text(self, trace_view: TraceLogView, request: TraceQueryRequest) -> PlainTextResponse:
|
||||
lines = [
|
||||
f"trace_id: {trace_view.trace_id}",
|
||||
f"parent_id: {trace_view.parent_id or ''}",
|
||||
*self._child_id_lines(trace_view.child_ids),
|
||||
"--------------------------------------------------",
|
||||
]
|
||||
previous_step: str | None = None
|
||||
for record in trace_view.records:
|
||||
current_step = str(record.step or "")
|
||||
if previous_step is None:
|
||||
lines.append(f"step: {current_step}")
|
||||
elif current_step != previous_step:
|
||||
lines.append("--------------------------------------------------")
|
||||
lines.append(f"step: {current_step}")
|
||||
previous_step = current_step
|
||||
lines.append(self._text_message(record, request.include_attrs_json))
|
||||
lineage = [*trace_view.ancestors, trace_view]
|
||||
lines = self._text_trace_summary_lines(trace_view)
|
||||
for index, entry in enumerate(lineage):
|
||||
if index == 0:
|
||||
lines.append("")
|
||||
else:
|
||||
lines.extend(["", ""])
|
||||
lines.extend(self._text_trace_log_lines(entry, request))
|
||||
return PlainTextResponse(content="\n".join(lines))
|
||||
|
||||
def _render_html(self, trace_view: TraceLogView, request: TraceQueryRequest) -> HTMLResponse:
|
||||
@@ -162,22 +174,75 @@ class TraceResponseRenderer:
|
||||
return f"{record.message}, {json.dumps(record.attrs_json, ensure_ascii=False, separators=(',', ':'))}"
|
||||
|
||||
def _trace_href(self, trace_id: str, request: TraceQueryRequest) -> str:
|
||||
params = urlencode(
|
||||
{
|
||||
params = {
|
||||
"format": "html",
|
||||
"levels": ",".join(request.levels),
|
||||
"attrs_json": "true" if request.include_attrs_json else "false",
|
||||
}
|
||||
)
|
||||
return f"/traces/{trace_id}?{params}"
|
||||
if request.ancestor_depth is None:
|
||||
params["ancestor_depth"] = "all"
|
||||
elif request.ancestor_depth > 0:
|
||||
params["ancestor_depth"] = str(request.ancestor_depth)
|
||||
query = urlencode(params)
|
||||
return f"/traces/{trace_id}?{query}"
|
||||
|
||||
def _html_lines(self, trace_view: TraceLogView, request: TraceQueryRequest) -> str:
|
||||
lineage = [*trace_view.ancestors, trace_view]
|
||||
lines = self._html_trace_summary_lines(trace_view, request)
|
||||
for index, entry in enumerate(lineage):
|
||||
if index == 0:
|
||||
lines.append(self._html_plain_line(""))
|
||||
else:
|
||||
lines.extend([self._html_plain_line(""), self._html_plain_line("")])
|
||||
lines.extend(self._html_trace_log_lines(entry, request))
|
||||
return "".join(lines)
|
||||
|
||||
def _trace_payload(self, trace_view: TraceLogView, request: TraceQueryRequest) -> dict[str, object]:
|
||||
return {
|
||||
"trace_id": trace_view.trace_id,
|
||||
"parent_id": trace_view.parent_id or "",
|
||||
"child_ids": list(trace_view.child_ids),
|
||||
"messages": [record.as_dict(include_attrs_json=request.include_attrs_json) for record in trace_view.records],
|
||||
}
|
||||
|
||||
def _text_trace_summary_lines(self, trace_view: TraceLogView) -> list[str]:
|
||||
return [
|
||||
f"trace_id: {trace_view.trace_id}",
|
||||
f"parent_id: {trace_view.parent_id or ''}",
|
||||
*self._child_id_lines(trace_view.child_ids),
|
||||
]
|
||||
|
||||
def _text_trace_log_lines(self, trace_view: TraceLogView, request: TraceQueryRequest) -> list[str]:
|
||||
lines = [
|
||||
TRACE_SECTION_SEPARATOR,
|
||||
f"trace_id: {trace_view.trace_id}",
|
||||
"",
|
||||
]
|
||||
previous_step: str | None = None
|
||||
for record in trace_view.records:
|
||||
current_step = str(record.step or "")
|
||||
if previous_step is None:
|
||||
lines.append(f"step: {current_step}")
|
||||
elif current_step != previous_step:
|
||||
lines.append("------------------------------")
|
||||
lines.append(f"step: {current_step}")
|
||||
previous_step = current_step
|
||||
lines.append(self._text_message(record, request.include_attrs_json))
|
||||
return lines
|
||||
|
||||
def _html_trace_summary_lines(self, trace_view: TraceLogView, request: TraceQueryRequest) -> list[str]:
|
||||
return [
|
||||
self._html_plain_line(f"trace_id: {self._trace_link(trace_view.trace_id, request)}"),
|
||||
self._html_plain_line(f"parent_id: {self._optional_trace_link(trace_view.parent_id, request)}"),
|
||||
self._html_plain_line("child_ids:"),
|
||||
*(self._html_plain_line(f" - {self._trace_link(child_id, request)}") for child_id in trace_view.child_ids),
|
||||
self._html_plain_line("--------------------------------------------------"),
|
||||
]
|
||||
|
||||
def _html_trace_log_lines(self, trace_view: TraceLogView, request: TraceQueryRequest) -> list[str]:
|
||||
lines = [
|
||||
self._html_plain_line(TRACE_SECTION_SEPARATOR),
|
||||
self._html_plain_line(f"trace_id: {self._trace_link(trace_view.trace_id, request)}"),
|
||||
self._html_plain_line(""),
|
||||
]
|
||||
previous_step: str | None = None
|
||||
for record in trace_view.records:
|
||||
@@ -187,12 +252,12 @@ class TraceResponseRenderer:
|
||||
lines.append(self._html_plain_line(""))
|
||||
elif current_step != previous_step:
|
||||
lines.append(self._html_plain_line(""))
|
||||
lines.append(self._html_plain_line("--------------------------------------------------"))
|
||||
lines.append(self._html_plain_line("------------------------------"))
|
||||
lines.append(self._html_step_line(current_step))
|
||||
lines.append(self._html_plain_line(""))
|
||||
previous_step = current_step
|
||||
lines.extend(self._html_message_lines(record, request.include_attrs_json))
|
||||
return "".join(lines)
|
||||
return lines
|
||||
|
||||
def _html_message_lines(self, record: TraceLogRecord, include_attrs_json: bool) -> list[str]:
|
||||
lines = [self._html_colored_line(self._text_message(record, include_attrs_json), record.level)]
|
||||
|
||||
@@ -141,7 +141,7 @@ class RuntimeManager:
|
||||
reader = build_trace_log_reader(self.traces.transport)
|
||||
if reader is None:
|
||||
raise RuntimeError("trace log reader is not configured")
|
||||
trace_view = reader.read_trace(trace_id, request.levels)
|
||||
trace_view = reader.read_trace(trace_id, request.levels, request.ancestor_depth)
|
||||
if trace_view is None:
|
||||
raise KeyError(trace_id)
|
||||
return trace_view
|
||||
|
||||
@@ -2,6 +2,7 @@ from app_runtime.http.base import ApplicationHttpChannel, HttpRouteRegistrar
|
||||
from app_runtime.http.http_app import HttpApplicationAppFactory
|
||||
from app_runtime.http.http_channel import HttpApplicationChannel
|
||||
from app_runtime.http.service import ApplicationHttpService
|
||||
from app_runtime.http.unified_service import UnifiedHttpService
|
||||
|
||||
__all__ = [
|
||||
"ApplicationHttpChannel",
|
||||
@@ -9,4 +10,5 @@ __all__ = [
|
||||
"HttpApplicationAppFactory",
|
||||
"HttpApplicationChannel",
|
||||
"HttpRouteRegistrar",
|
||||
"UnifiedHttpService",
|
||||
]
|
||||
|
||||
@@ -0,0 +1,68 @@
|
||||
from __future__ import annotations
|
||||
|
||||
import asyncio
|
||||
from typing import TYPE_CHECKING
|
||||
|
||||
from app_runtime.control.action_responder import ControlActionResponder
|
||||
from app_runtime.control.base import ControlActionSet
|
||||
from app_runtime.control.http_app import HttpControlAppFactory
|
||||
from app_runtime.http.base import ApplicationHttpChannel, HttpRouteRegistrar
|
||||
from app_runtime.http.http_app import HttpApplicationAppFactory
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from app_runtime.core.runtime import RuntimeManager
|
||||
|
||||
|
||||
class UnifiedHttpService:
|
||||
"""Publish control and application routes through the same HTTP channel."""
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
app_factory: HttpApplicationAppFactory | None = None,
|
||||
control_app_factory: HttpControlAppFactory | None = None,
|
||||
control_timeout: int = 5,
|
||||
) -> None:
|
||||
self._channels: list[ApplicationHttpChannel] = []
|
||||
self._registrars: list[HttpRouteRegistrar] = []
|
||||
self._app_factory = app_factory or HttpApplicationAppFactory()
|
||||
self._control_app_factory = control_app_factory or HttpControlAppFactory()
|
||||
self._control_timeout = control_timeout
|
||||
|
||||
def register_channel(self, channel: ApplicationHttpChannel) -> None:
|
||||
self._channels.append(channel)
|
||||
|
||||
def register_routes(self, registrar: HttpRouteRegistrar) -> None:
|
||||
self._registrars.append(registrar)
|
||||
|
||||
def start(self, runtime: RuntimeManager) -> None:
|
||||
if not self._channels:
|
||||
return
|
||||
asyncio.run(self._start_async(runtime))
|
||||
|
||||
def stop(self) -> None:
|
||||
if not self._channels:
|
||||
return
|
||||
asyncio.run(self._stop_async())
|
||||
|
||||
async def _start_async(self, runtime: RuntimeManager) -> None:
|
||||
app = self._app_factory.create(self._registrars, runtime.services)
|
||||
actions = ControlActionSet(
|
||||
health=runtime.health_status,
|
||||
start=runtime.start_runtime,
|
||||
stop=runtime.stop_runtime,
|
||||
status=runtime.runtime_status,
|
||||
trace_lookup=runtime.trace_logs,
|
||||
)
|
||||
action_responder = ControlActionResponder(actions, self._control_timeout)
|
||||
control_app = self._control_app_factory.create(
|
||||
actions.health,
|
||||
action_responder.respond,
|
||||
actions.trace_lookup,
|
||||
)
|
||||
app.router.routes.extend(control_app.router.routes)
|
||||
for channel in self._channels:
|
||||
await channel.start(app)
|
||||
|
||||
async def _stop_async(self) -> None:
|
||||
for channel in reversed(self._channels):
|
||||
await channel.stop()
|
||||
@@ -11,10 +11,16 @@ class MySqlTraceLogReader(TraceLogReader):
|
||||
def __init__(self, connection_factory: MySqlTraceConnectionFactory) -> None:
|
||||
self._connection_factory = connection_factory
|
||||
|
||||
def read_trace(self, trace_id: str, levels: tuple[TraceLevel, ...]) -> TraceLogView | None:
|
||||
def read_trace(
|
||||
self,
|
||||
trace_id: str,
|
||||
levels: tuple[TraceLevel, ...],
|
||||
ancestor_depth: int | None = 0,
|
||||
) -> TraceLogView | None:
|
||||
parent_id = self._read_parent_id(trace_id)
|
||||
if parent_id is None and not self._trace_exists(trace_id):
|
||||
return None
|
||||
ancestors = self._read_ancestors(parent_id, levels, ancestor_depth)
|
||||
child_ids = self._read_child_ids(trace_id)
|
||||
records = self._read_records(trace_id, levels)
|
||||
return TraceLogView(
|
||||
@@ -22,8 +28,36 @@ class MySqlTraceLogReader(TraceLogReader):
|
||||
parent_id=parent_id,
|
||||
child_ids=tuple(child_ids),
|
||||
records=tuple(records),
|
||||
ancestors=tuple(ancestors),
|
||||
)
|
||||
|
||||
def _read_ancestors(
|
||||
self,
|
||||
parent_id: str | None,
|
||||
levels: tuple[TraceLevel, ...],
|
||||
ancestor_depth: int | None,
|
||||
) -> list[TraceLogView]:
|
||||
if parent_id is None or ancestor_depth == 0:
|
||||
return []
|
||||
remaining_depth = ancestor_depth
|
||||
ancestors: list[TraceLogView] = []
|
||||
current_trace_id = parent_id
|
||||
while current_trace_id is not None and (remaining_depth is None or remaining_depth > 0):
|
||||
current_parent_id = self._read_parent_id(current_trace_id)
|
||||
ancestors.append(
|
||||
TraceLogView(
|
||||
trace_id=current_trace_id,
|
||||
parent_id=current_parent_id,
|
||||
child_ids=tuple(self._read_child_ids(current_trace_id)),
|
||||
records=tuple(self._read_records(current_trace_id, levels)),
|
||||
)
|
||||
)
|
||||
current_trace_id = current_parent_id
|
||||
if remaining_depth is not None:
|
||||
remaining_depth -= 1
|
||||
ancestors.reverse()
|
||||
return ancestors
|
||||
|
||||
def _trace_exists(self, trace_id: str) -> bool:
|
||||
query = "SELECT 1 FROM trace_contexts WHERE trace_id = %s"
|
||||
with self._connection_factory.connect() as connection:
|
||||
|
||||
@@ -21,6 +21,7 @@ from plba.http import (
|
||||
HttpApplicationAppFactory,
|
||||
HttpApplicationChannel,
|
||||
HttpRouteRegistrar,
|
||||
UnifiedHttpService,
|
||||
)
|
||||
from plba.logging import LogManager
|
||||
from plba.queue import InMemoryTaskQueue
|
||||
@@ -56,6 +57,7 @@ __all__ = [
|
||||
"HttpApplicationChannel",
|
||||
"HttpRouteRegistrar",
|
||||
"HttpControlChannel",
|
||||
"UnifiedHttpService",
|
||||
"InMemoryTaskQueue",
|
||||
"LogManager",
|
||||
"MySqlTraceTransport",
|
||||
|
||||
@@ -2,6 +2,7 @@ from app_runtime.http.base import ApplicationHttpChannel, HttpRouteRegistrar
|
||||
from app_runtime.http.http_app import HttpApplicationAppFactory
|
||||
from app_runtime.http.http_channel import HttpApplicationChannel
|
||||
from app_runtime.http.service import ApplicationHttpService
|
||||
from app_runtime.http.unified_service import UnifiedHttpService
|
||||
|
||||
__all__ = [
|
||||
"ApplicationHttpChannel",
|
||||
@@ -9,4 +10,5 @@ __all__ = [
|
||||
"HttpApplicationAppFactory",
|
||||
"HttpApplicationChannel",
|
||||
"HttpRouteRegistrar",
|
||||
"UnifiedHttpService",
|
||||
]
|
||||
|
||||
@@ -15,6 +15,7 @@ from app_runtime.core.registration import ModuleRegistry
|
||||
from app_runtime.core.runtime import RuntimeManager
|
||||
from app_runtime.http.base import ApplicationHttpChannel
|
||||
from app_runtime.http.http_channel import HttpApplicationChannel
|
||||
from app_runtime.http.unified_service import UnifiedHttpService
|
||||
|
||||
try:
|
||||
import python_multipart # noqa: F401
|
||||
@@ -207,3 +208,27 @@ def test_control_plane_and_application_http_work_independently() -> None:
|
||||
finally:
|
||||
runtime.control_plane.stop()
|
||||
runtime.stop(stop_control_plane=False)
|
||||
|
||||
|
||||
def test_unified_http_serves_control_and_application_routes_on_one_app() -> None:
|
||||
runtime = RuntimeManager(application_http=UnifiedHttpService())
|
||||
channel = RecordingChannel()
|
||||
runtime.application_http.register_channel(channel)
|
||||
runtime.register_module(HttpModule(PingRoutes()))
|
||||
runtime.start(start_control_plane=False)
|
||||
|
||||
try:
|
||||
client = _application_client(channel)
|
||||
with client:
|
||||
health_response = client.get("/health")
|
||||
action_response = client.get("/actions/status")
|
||||
app_response = client.get("/estimate/ping")
|
||||
|
||||
assert health_response.status_code == 200
|
||||
assert health_response.json()["status"] == "ok"
|
||||
assert action_response.status_code == 200
|
||||
assert action_response.json() == {"status": "ok", "detail": "idle"}
|
||||
assert app_response.status_code == 200
|
||||
assert app_response.json() == {"status": "ok"}
|
||||
finally:
|
||||
runtime.stop(stop_control_plane=False)
|
||||
|
||||
@@ -72,7 +72,17 @@ def test_trace_endpoint_returns_html_by_default() -> None:
|
||||
assert "trace_id:" in response.text
|
||||
assert "first error" in response.text
|
||||
assert "second warning" in response.text
|
||||
assert captured == [("trace-1", TraceQueryRequest(levels=("ERROR", "WARNING", "INFO"), include_attrs_json=False, response_format="html"))]
|
||||
assert captured == [
|
||||
(
|
||||
"trace-1",
|
||||
TraceQueryRequest(
|
||||
levels=("ERROR", "WARNING", "INFO"),
|
||||
include_attrs_json=False,
|
||||
response_format="html",
|
||||
ancestor_depth=0,
|
||||
),
|
||||
)
|
||||
]
|
||||
|
||||
|
||||
def test_trace_endpoint_returns_text_when_requested() -> None:
|
||||
@@ -100,7 +110,10 @@ def test_trace_endpoint_returns_text_when_requested() -> None:
|
||||
"child_ids:\n"
|
||||
" - child-1\n"
|
||||
" - child-2\n"
|
||||
"--------------------------------------------------\n"
|
||||
"\n"
|
||||
"==============================\n"
|
||||
"trace_id: trace-1\n"
|
||||
"\n"
|
||||
"step: process\n"
|
||||
"first error\n"
|
||||
"second warning"
|
||||
@@ -129,7 +142,10 @@ def test_trace_endpoint_appends_attrs_json_in_text_mode() -> None:
|
||||
"trace_id: trace-1\n"
|
||||
"parent_id: \n"
|
||||
"child_ids:\n"
|
||||
"--------------------------------------------------\n"
|
||||
"\n"
|
||||
"==============================\n"
|
||||
"trace_id: trace-1\n"
|
||||
"\n"
|
||||
"step: process\n"
|
||||
'failure, {"attempt":2,"source":"crm"}'
|
||||
)
|
||||
@@ -159,11 +175,14 @@ def test_trace_endpoint_separates_messages_by_step_in_text_mode() -> None:
|
||||
"trace_id: trace-1\n"
|
||||
"parent_id: \n"
|
||||
"child_ids:\n"
|
||||
"--------------------------------------------------\n"
|
||||
"\n"
|
||||
"==============================\n"
|
||||
"trace_id: trace-1\n"
|
||||
"\n"
|
||||
"step: load_stocks\n"
|
||||
"load first\n"
|
||||
"load second\n"
|
||||
"--------------------------------------------------\n"
|
||||
"------------------------------\n"
|
||||
"step: filter_stocks\n"
|
||||
"filter first"
|
||||
)
|
||||
@@ -203,9 +222,82 @@ def test_trace_endpoint_returns_json_payload() -> None:
|
||||
"attrs_json": {"batch": 7},
|
||||
}
|
||||
],
|
||||
"ancestors": [],
|
||||
}
|
||||
|
||||
|
||||
def test_trace_endpoint_returns_json_payload_with_ancestors() -> None:
|
||||
async def trace_provider(_trace_id: str, _request: TraceQueryRequest) -> TraceLogView:
|
||||
return TraceLogView(
|
||||
trace_id="trace-1",
|
||||
parent_id="parent-1",
|
||||
child_ids=("child-1",),
|
||||
records=(
|
||||
_trace_record(row_id=3, level="INFO", message="done", attrs_json={"batch": 7}),
|
||||
),
|
||||
ancestors=(
|
||||
TraceLogView(
|
||||
trace_id="root-1",
|
||||
parent_id=None,
|
||||
child_ids=("parent-1",),
|
||||
records=(
|
||||
_trace_record(row_id=4, level="INFO", message="root info"),
|
||||
),
|
||||
),
|
||||
TraceLogView(
|
||||
trace_id="parent-1",
|
||||
parent_id="root-1",
|
||||
child_ids=("trace-1", "sibling-1"),
|
||||
records=(
|
||||
_trace_record(row_id=5, level="WARNING", message="parent warning"),
|
||||
),
|
||||
),
|
||||
),
|
||||
)
|
||||
|
||||
client = _build_client(trace_provider)
|
||||
try:
|
||||
response = client.get("/traces/trace-1?format=json&ancestor_depth=1")
|
||||
finally:
|
||||
client.close()
|
||||
|
||||
assert response.status_code == 200
|
||||
assert response.json()["ancestors"] == [
|
||||
{
|
||||
"trace_id": "root-1",
|
||||
"parent_id": "",
|
||||
"child_ids": ["parent-1"],
|
||||
"messages": [
|
||||
{
|
||||
"id": 4,
|
||||
"trace_id": "trace-1",
|
||||
"event_time": "2026-04-28T10:11:12+00:00",
|
||||
"step": "process",
|
||||
"status": "failed",
|
||||
"level": "INFO",
|
||||
"message": "root info",
|
||||
}
|
||||
],
|
||||
},
|
||||
{
|
||||
"trace_id": "parent-1",
|
||||
"parent_id": "root-1",
|
||||
"child_ids": ["trace-1", "sibling-1"],
|
||||
"messages": [
|
||||
{
|
||||
"id": 5,
|
||||
"trace_id": "trace-1",
|
||||
"event_time": "2026-04-28T10:11:12+00:00",
|
||||
"step": "process",
|
||||
"status": "failed",
|
||||
"level": "WARNING",
|
||||
"message": "parent warning",
|
||||
}
|
||||
],
|
||||
}
|
||||
]
|
||||
|
||||
|
||||
def test_trace_endpoint_returns_html_page_with_related_links() -> None:
|
||||
async def trace_provider(_trace_id: str, _request: TraceQueryRequest) -> TraceLogView:
|
||||
return TraceLogView(
|
||||
@@ -240,8 +332,9 @@ def test_trace_endpoint_returns_html_page_with_related_links() -> None:
|
||||
assert '<div class="line">child_ids:</div>' in response.text
|
||||
assert '<div class="line"> - <a href="/traces/child-1?format=html&levels=ERROR%2CWARNING%2CINFO&attrs_json=true">child-1</a></div>' in response.text
|
||||
assert '<div class="line"> - <a href="/traces/child-2?format=html&levels=ERROR%2CWARNING%2CINFO&attrs_json=true">child-2</a></div>' in response.text
|
||||
assert '<div class="line">==============================</div>' in response.text
|
||||
assert '<div class="line" style="color: var(--step);">load_stocks</div>' in response.text
|
||||
assert '<div class="line">--------------------------------------------------</div>' in response.text
|
||||
assert '<div class="line">trace_id: <a href="/traces/trace-1?format=html&levels=ERROR%2CWARNING%2CINFO&attrs_json=true">trace-1</a></div>' in response.text
|
||||
assert '<div class="line" style="color: var(--step);">filter_stocks</div>' in response.text
|
||||
assert "loaded prices" in response.text
|
||||
assert "filtered suspicious ticker" in response.text
|
||||
@@ -250,11 +343,108 @@ def test_trace_endpoint_returns_html_page_with_related_links() -> None:
|
||||
assert "Related Traces" not in response.text
|
||||
|
||||
|
||||
def test_trace_endpoint_renders_ancestors_in_text_mode() -> None:
|
||||
async def trace_provider(_trace_id: str, _request: TraceQueryRequest) -> TraceLogView:
|
||||
return TraceLogView(
|
||||
trace_id="trace-1",
|
||||
parent_id="parent-1",
|
||||
child_ids=(),
|
||||
records=(_trace_record(row_id=1, level="INFO", message="child message"),),
|
||||
ancestors=(
|
||||
TraceLogView(
|
||||
trace_id="root-1",
|
||||
parent_id=None,
|
||||
child_ids=("parent-1",),
|
||||
records=(_trace_record(row_id=2, level="INFO", message="root message"),),
|
||||
),
|
||||
TraceLogView(
|
||||
trace_id="parent-1",
|
||||
parent_id="root-1",
|
||||
child_ids=("trace-1",),
|
||||
records=(_trace_record(row_id=3, level="WARNING", message="parent message"),),
|
||||
),
|
||||
),
|
||||
)
|
||||
|
||||
client = _build_client(trace_provider)
|
||||
try:
|
||||
response = client.get("/traces/trace-1?format=text&ancestor_depth=1")
|
||||
finally:
|
||||
client.close()
|
||||
|
||||
assert response.status_code == 200
|
||||
assert response.text == (
|
||||
"trace_id: trace-1\n"
|
||||
"parent_id: parent-1\n"
|
||||
"child_ids:\n"
|
||||
"\n"
|
||||
"==============================\n"
|
||||
"trace_id: root-1\n"
|
||||
"\n"
|
||||
"step: process\n"
|
||||
"root message\n"
|
||||
"\n"
|
||||
"\n"
|
||||
"==============================\n"
|
||||
"trace_id: parent-1\n"
|
||||
"\n"
|
||||
"step: process\n"
|
||||
"parent message\n"
|
||||
"\n"
|
||||
"\n"
|
||||
"==============================\n"
|
||||
"trace_id: trace-1\n"
|
||||
"\n"
|
||||
"step: process\n"
|
||||
"child message"
|
||||
)
|
||||
|
||||
|
||||
def test_trace_endpoint_preserves_ancestor_depth_in_html_links() -> None:
|
||||
async def trace_provider(_trace_id: str, _request: TraceQueryRequest) -> TraceLogView:
|
||||
return TraceLogView(
|
||||
trace_id="trace-1",
|
||||
parent_id="parent-1",
|
||||
child_ids=("child-1",),
|
||||
records=(_trace_record(row_id=1, level="INFO", message="loaded prices"),),
|
||||
ancestors=(
|
||||
TraceLogView(
|
||||
trace_id="root-1",
|
||||
parent_id=None,
|
||||
child_ids=("parent-1",),
|
||||
records=(_trace_record(row_id=2, level="INFO", message="root info"),),
|
||||
),
|
||||
TraceLogView(
|
||||
trace_id="parent-1",
|
||||
parent_id="root-1",
|
||||
child_ids=("trace-1",),
|
||||
records=(_trace_record(row_id=3, level="WARNING", message="parent warning"),),
|
||||
),
|
||||
),
|
||||
)
|
||||
|
||||
client = _build_client(trace_provider)
|
||||
try:
|
||||
response = client.get("/traces/trace-1?format=html&attrs_json=true&ancestor_depth=all")
|
||||
finally:
|
||||
client.close()
|
||||
|
||||
assert response.status_code == 200
|
||||
assert 'href="/traces/trace-1?format=html&levels=ERROR%2CWARNING%2CINFO&attrs_json=true&ancestor_depth=all"' in response.text
|
||||
assert 'href="/traces/root-1?format=html&levels=ERROR%2CWARNING%2CINFO&attrs_json=true&ancestor_depth=all"' in response.text
|
||||
assert 'href="/traces/parent-1?format=html&levels=ERROR%2CWARNING%2CINFO&attrs_json=true&ancestor_depth=all"' in response.text
|
||||
assert response.text.index("root info") < response.text.index("parent warning") < response.text.index("loaded prices")
|
||||
assert "root info" in response.text
|
||||
assert "parent warning" in response.text
|
||||
|
||||
|
||||
def test_trace_endpoint_validates_query_params() -> None:
|
||||
client = _build_client(lambda _trace_id, _request: None)
|
||||
try:
|
||||
invalid_level = client.get("/traces/trace-1?levels=error,fatal")
|
||||
invalid_format = client.get("/traces/trace-1?format=xml")
|
||||
invalid_ancestor_depth = client.get("/traces/trace-1?ancestor_depth=-1")
|
||||
invalid_ancestor_type = client.get("/traces/trace-1?ancestor_depth=up")
|
||||
finally:
|
||||
client.close()
|
||||
|
||||
@@ -262,6 +452,16 @@ def test_trace_endpoint_validates_query_params() -> None:
|
||||
assert invalid_level.json() == {"status": "error", "detail": "unsupported trace levels: FATAL"}
|
||||
assert invalid_format.status_code == 400
|
||||
assert invalid_format.json() == {"status": "error", "detail": "unsupported trace format: xml"}
|
||||
assert invalid_ancestor_depth.status_code == 400
|
||||
assert invalid_ancestor_depth.json() == {
|
||||
"status": "error",
|
||||
"detail": "query parameter must be >= 0: ancestor_depth=-1",
|
||||
}
|
||||
assert invalid_ancestor_type.status_code == 400
|
||||
assert invalid_ancestor_type.json() == {
|
||||
"status": "error",
|
||||
"detail": "invalid ancestor depth query parameter: ancestor_depth=up",
|
||||
}
|
||||
|
||||
|
||||
def test_runtime_trace_logs_uses_configured_reader(monkeypatch) -> None:
|
||||
@@ -273,15 +473,21 @@ def test_runtime_trace_logs_uses_configured_reader(monkeypatch) -> None:
|
||||
)
|
||||
|
||||
class StubReader:
|
||||
def read_trace(self, trace_id: str, levels: tuple[str, ...]) -> TraceLogView | None:
|
||||
def read_trace(
|
||||
self,
|
||||
trace_id: str,
|
||||
levels: tuple[str, ...],
|
||||
ancestor_depth: int | None = 0,
|
||||
) -> TraceLogView | None:
|
||||
assert trace_id == "trace-1"
|
||||
assert levels == ("ERROR",)
|
||||
assert ancestor_depth is None
|
||||
return expected
|
||||
|
||||
monkeypatch.setattr(runtime_module, "build_trace_log_reader", lambda _transport: StubReader())
|
||||
runtime = RuntimeManager()
|
||||
|
||||
result = asyncio.run(runtime.trace_logs("trace-1", TraceQueryRequest(levels=("ERROR",))))
|
||||
result = asyncio.run(runtime.trace_logs("trace-1", TraceQueryRequest(levels=("ERROR",), ancestor_depth=None)))
|
||||
|
||||
assert result == expected
|
||||
|
||||
@@ -297,7 +503,11 @@ def test_mysql_trace_log_reader_maps_db_rows() -> None:
|
||||
self._current_query = query
|
||||
|
||||
def fetchone(self) -> dict[str, object] | None:
|
||||
if self.executed[-1][1] == ("trace-1",):
|
||||
return {"parent_id": "root-77"}
|
||||
if self.executed[-1][1] == ("root-77",):
|
||||
return {"parent_id": None}
|
||||
return None
|
||||
|
||||
def fetchall(self) -> list[dict[str, object]]:
|
||||
if "WHERE parent_id = %s" in self._current_query:
|
||||
@@ -362,7 +572,167 @@ def test_mysql_trace_log_reader_maps_db_rows() -> None:
|
||||
attrs_json={"attempt": 1},
|
||||
),
|
||||
),
|
||||
ancestors=(),
|
||||
)
|
||||
assert len(factory.cursor.executed) == 3
|
||||
assert factory.cursor.executed[1][1] == ("trace-1",)
|
||||
assert factory.cursor.executed[2][1] == ("trace-1", "ERROR", "WARNING")
|
||||
|
||||
|
||||
def test_mysql_trace_log_reader_loads_requested_ancestors() -> None:
|
||||
class FakeCursor:
|
||||
def __init__(self) -> None:
|
||||
self.executed: list[tuple[str, tuple[object, ...]]] = []
|
||||
self._current_query = ""
|
||||
|
||||
def execute(self, query: str, params: tuple[object, ...]) -> None:
|
||||
self.executed.append((query, params))
|
||||
self._current_query = query
|
||||
|
||||
def fetchone(self) -> dict[str, object] | None:
|
||||
if self.executed[-1][1] == ("trace-1",):
|
||||
return {"parent_id": "parent-1"}
|
||||
if self.executed[-1][1] == ("parent-1",):
|
||||
return {"parent_id": "root-1"}
|
||||
if self.executed[-1][1] == ("root-1",):
|
||||
return {"parent_id": None}
|
||||
return None
|
||||
|
||||
def fetchall(self) -> list[dict[str, object]]:
|
||||
if "WHERE parent_id = %s" in self._current_query:
|
||||
parent_id = self.executed[-1][1][0]
|
||||
if parent_id == "trace-1":
|
||||
return []
|
||||
if parent_id == "parent-1":
|
||||
return [{"trace_id": "trace-1"}]
|
||||
if parent_id == "root-1":
|
||||
return [{"trace_id": "parent-1"}]
|
||||
return []
|
||||
trace_id = self.executed[-1][1][0]
|
||||
return [
|
||||
{
|
||||
"id": 8 if trace_id == "trace-1" else 9,
|
||||
"trace_id": trace_id,
|
||||
"event_time": datetime(2026, 4, 28, 10, 11, 12, tzinfo=timezone.utc),
|
||||
"step": "parse",
|
||||
"status": "failed",
|
||||
"level": "ERROR",
|
||||
"message": f"broken:{trace_id}",
|
||||
"attrs_json": '{"attempt":1}',
|
||||
}
|
||||
]
|
||||
|
||||
def __enter__(self) -> FakeCursor:
|
||||
return self
|
||||
|
||||
def __exit__(self, exc_type, exc, tb) -> None:
|
||||
return None
|
||||
|
||||
class FakeConnection:
|
||||
def __init__(self, cursor: FakeCursor) -> None:
|
||||
self._cursor = cursor
|
||||
|
||||
def cursor(self) -> FakeCursor:
|
||||
return self._cursor
|
||||
|
||||
def __enter__(self) -> FakeConnection:
|
||||
return self
|
||||
|
||||
def __exit__(self, exc_type, exc, tb) -> None:
|
||||
return None
|
||||
|
||||
class FakeConnectionFactory:
|
||||
def __init__(self) -> None:
|
||||
self.cursor = FakeCursor()
|
||||
|
||||
def connect(self) -> FakeConnection:
|
||||
return FakeConnection(self.cursor)
|
||||
|
||||
factory = FakeConnectionFactory()
|
||||
reader = MySqlTraceLogReader(factory) # type: ignore[arg-type]
|
||||
|
||||
view = reader.read_trace("trace-1", ("ERROR",), 1)
|
||||
|
||||
assert view is not None
|
||||
assert view.trace_id == "trace-1"
|
||||
assert view.parent_id == "parent-1"
|
||||
assert len(view.ancestors) == 1
|
||||
assert view.ancestors[0].trace_id == "parent-1"
|
||||
assert view.ancestors[0].parent_id == "root-1"
|
||||
assert view.ancestors[0].child_ids == ("trace-1",)
|
||||
|
||||
|
||||
def test_mysql_trace_log_reader_orders_ancestors_root_first() -> None:
|
||||
class FakeCursor:
|
||||
def __init__(self) -> None:
|
||||
self.executed: list[tuple[str, tuple[object, ...]]] = []
|
||||
self._current_query = ""
|
||||
|
||||
def execute(self, query: str, params: tuple[object, ...]) -> None:
|
||||
self.executed.append((query, params))
|
||||
self._current_query = query
|
||||
|
||||
def fetchone(self) -> dict[str, object] | None:
|
||||
if self.executed[-1][1] == ("trace-1",):
|
||||
return {"parent_id": "parent-1"}
|
||||
if self.executed[-1][1] == ("parent-1",):
|
||||
return {"parent_id": "root-1"}
|
||||
if self.executed[-1][1] == ("root-1",):
|
||||
return {"parent_id": None}
|
||||
return None
|
||||
|
||||
def fetchall(self) -> list[dict[str, object]]:
|
||||
if "WHERE parent_id = %s" in self._current_query:
|
||||
parent_id = self.executed[-1][1][0]
|
||||
if parent_id == "root-1":
|
||||
return [{"trace_id": "parent-1"}]
|
||||
if parent_id == "parent-1":
|
||||
return [{"trace_id": "trace-1"}]
|
||||
return []
|
||||
trace_id = self.executed[-1][1][0]
|
||||
return [
|
||||
{
|
||||
"id": 8,
|
||||
"trace_id": trace_id,
|
||||
"event_time": datetime(2026, 4, 28, 10, 11, 12, tzinfo=timezone.utc),
|
||||
"step": "parse",
|
||||
"status": "failed",
|
||||
"level": "ERROR",
|
||||
"message": f"broken:{trace_id}",
|
||||
"attrs_json": '{"attempt":1}',
|
||||
}
|
||||
]
|
||||
|
||||
def __enter__(self) -> FakeCursor:
|
||||
return self
|
||||
|
||||
def __exit__(self, exc_type, exc, tb) -> None:
|
||||
return None
|
||||
|
||||
class FakeConnection:
|
||||
def __init__(self, cursor: FakeCursor) -> None:
|
||||
self._cursor = cursor
|
||||
|
||||
def cursor(self) -> FakeCursor:
|
||||
return self._cursor
|
||||
|
||||
def __enter__(self) -> FakeConnection:
|
||||
return self
|
||||
|
||||
def __exit__(self, exc_type, exc, tb) -> None:
|
||||
return None
|
||||
|
||||
class FakeConnectionFactory:
|
||||
def __init__(self) -> None:
|
||||
self.cursor = FakeCursor()
|
||||
|
||||
def connect(self) -> FakeConnection:
|
||||
return FakeConnection(self.cursor)
|
||||
|
||||
factory = FakeConnectionFactory()
|
||||
reader = MySqlTraceLogReader(factory) # type: ignore[arg-type]
|
||||
|
||||
view = reader.read_trace("trace-1", ("ERROR",), 2)
|
||||
|
||||
assert view is not None
|
||||
assert tuple(ancestor.trace_id for ancestor in view.ancestors) == ("root-1", "parent-1")
|
||||
|
||||
Reference in New Issue
Block a user