Compare commits
8 Commits
2e75e53b89
..
main
| Author | SHA1 | Date | |
|---|---|---|---|
| e6509ee0cd | |||
| e2b817f785 | |||
| 8cad1d00ec | |||
| ec3198dbf1 | |||
| aed12c9c4e | |||
| df50e7acbb | |||
| 8789fcc0d1 | |||
| b2915c3987 |
@@ -319,7 +319,7 @@ with traces.open_context(alias="email:123", kind="email") as message_trace_id:
|
|||||||
- Consumer-воркер извлекает через `get(timeout)` и обрабатывает.
|
- Consumer-воркер извлекает через `get(timeout)` и обрабатывает.
|
||||||
|
|
||||||
## 5. Application HTTP
|
## 5. Application HTTP
|
||||||
`PLBA` поддерживает отдельный прикладной HTTP-слой для пользовательских страниц и API бизнес-приложения. Он не смешивается с `control plane` и поднимается отдельным сервисом внутри того же runtime.
|
`PLBA` поддерживает прикладной HTTP-слой для пользовательских страниц и API бизнес-приложения. По умолчанию он отделён от `control plane`, но при необходимости можно опубликовать control routes и business routes через один HTTP channel.
|
||||||
|
|
||||||
`Control Plane` и `Application HTTP` обслуживают разные контуры:
|
`Control Plane` и `Application HTTP` обслуживают разные контуры:
|
||||||
- `Control Plane` используется для `/health`, `/actions/*`, `/traces/*`.
|
- `Control Plane` используется для `/health`, `/actions/*`, `/traces/*`.
|
||||||
@@ -330,6 +330,7 @@ with traces.open_context(alias="email:123", kind="email") as message_trace_id:
|
|||||||
- `HttpApplicationChannel` поднимает отдельный `FastAPI` app через `uvicorn`.
|
- `HttpApplicationChannel` поднимает отдельный `FastAPI` app через `uvicorn`.
|
||||||
- `HttpRouteRegistrar` регистрирует пользовательские routes и получает `ServiceContainer`.
|
- `HttpRouteRegistrar` регистрирует пользовательские routes и получает `ServiceContainer`.
|
||||||
- `HttpApplicationAppFactory` собирает `FastAPI(title="PLBA Application API")`, middleware и routes.
|
- `HttpApplicationAppFactory` собирает `FastAPI(title="PLBA Application API")`, middleware и routes.
|
||||||
|
- `UnifiedHttpService` собирает один `FastAPI` app с control routes (`/health`, `/actions/*`, `/traces/*`) и application routes.
|
||||||
|
|
||||||
### Минимальный пример
|
### Минимальный пример
|
||||||
```python
|
```python
|
||||||
@@ -370,6 +371,20 @@ runtime.start()
|
|||||||
- `GET /demo`
|
- `GET /demo`
|
||||||
- `POST /demo/api/tasks`
|
- `POST /demo/api/tasks`
|
||||||
|
|
||||||
|
### Единый HTTP API
|
||||||
|
Если бизнес-приложению нужен один опубликованный порт для control и application routes, передайте `UnifiedHttpService` как `application_http`:
|
||||||
|
|
||||||
|
```python
|
||||||
|
from plba import HttpApplicationChannel, RuntimeManager, UnifiedHttpService
|
||||||
|
|
||||||
|
runtime = RuntimeManager(application_http=UnifiedHttpService())
|
||||||
|
runtime.application_http.register_channel(
|
||||||
|
HttpApplicationChannel(host="0.0.0.0", port=15000, timeout=5)
|
||||||
|
)
|
||||||
|
```
|
||||||
|
|
||||||
|
Старый режим с отдельными `ControlPlaneService` и `ApplicationHttpService` остаётся доступен и работает как раньше.
|
||||||
|
|
||||||
### Типовые сценарии
|
### Типовые сценарии
|
||||||
- `Web UI для фоновых задач`: список задач, запуск, статус, cancel, download result.
|
- `Web UI для фоновых задач`: список задач, запуск, статус, cancel, download result.
|
||||||
- `Внутренний REST API`: например `POST /jobs`, `GET /jobs/{id}`, `POST /jobs/{id}/cancel`.
|
- `Внутренний REST API`: например `POST /jobs`, `GET /jobs/{id}`, `POST /jobs/{id}/cancel`.
|
||||||
@@ -377,7 +392,7 @@ runtime.start()
|
|||||||
- `Webhook endpoint`: прием callback от внешней системы и передача события в worker pipeline.
|
- `Webhook endpoint`: прием callback от внешней системы и передача события в worker pipeline.
|
||||||
|
|
||||||
### Ограничения и рекомендации
|
### Ограничения и рекомендации
|
||||||
- Не смешивайте business routes с `control plane`.
|
- По умолчанию держите business routes и `control plane` раздельно; используйте `UnifiedHttpService`, когда один опубликованный порт является явным эксплуатационным требованием.
|
||||||
- Держите бизнес-логику в сервисах, а handlers используйте как тонкий HTTP-адаптер.
|
- Держите бизнес-логику в сервисах, а handlers используйте как тонкий HTTP-адаптер.
|
||||||
- Runtime предоставляет доступ к зависимостям через `services`, поэтому handlers не должны зависеть от `RuntimeManager`.
|
- Runtime предоставляет доступ к зависимостям через `services`, поэтому handlers не должны зависеть от `RuntimeManager`.
|
||||||
- В первой версии `Application HTTP` не решает auth, static files, шаблонизаторы, websocket и OpenAPI customization.
|
- В первой версии `Application HTTP` не решает auth, static files, шаблонизаторы, websocket и OpenAPI customization.
|
||||||
|
|||||||
+1
-1
@@ -4,7 +4,7 @@ build-backend = "setuptools.build_meta"
|
|||||||
|
|
||||||
[project]
|
[project]
|
||||||
name = "plba"
|
name = "plba"
|
||||||
version = "0.3.14"
|
version = "0.4.0"
|
||||||
description = "Platform runtime for business applications"
|
description = "Platform runtime for business applications"
|
||||||
readme = "README.md"
|
readme = "README.md"
|
||||||
requires-python = ">=3.11"
|
requires-python = ">=3.11"
|
||||||
|
|||||||
@@ -92,8 +92,14 @@ class TraceLogView:
|
|||||||
parent_id: str | None
|
parent_id: str | None
|
||||||
child_ids: tuple[str, ...] = ()
|
child_ids: tuple[str, ...] = ()
|
||||||
records: tuple[TraceLogRecord, ...] = ()
|
records: tuple[TraceLogRecord, ...] = ()
|
||||||
|
ancestors: tuple[TraceLogView, ...] = ()
|
||||||
|
|
||||||
|
|
||||||
class TraceLogReader(Protocol):
|
class TraceLogReader(Protocol):
|
||||||
def read_trace(self, trace_id: str, levels: tuple[TraceLevel, ...]) -> TraceLogView | None:
|
def read_trace(
|
||||||
|
self,
|
||||||
|
trace_id: str,
|
||||||
|
levels: tuple[TraceLevel, ...],
|
||||||
|
ancestor_depth: int | None = 0,
|
||||||
|
) -> TraceLogView | None:
|
||||||
"""Load trace context and filtered log records."""
|
"""Load trace context and filtered log records."""
|
||||||
|
|||||||
@@ -0,0 +1,46 @@
|
|||||||
|
from __future__ import annotations
|
||||||
|
|
||||||
|
import asyncio
|
||||||
|
|
||||||
|
from fastapi.responses import JSONResponse
|
||||||
|
|
||||||
|
from app_runtime.control.base import ControlActionRequest, ControlActionSet
|
||||||
|
|
||||||
|
|
||||||
|
class ControlActionResponder:
|
||||||
|
def __init__(self, actions: ControlActionSet, timeout: int) -> None:
|
||||||
|
self._actions = actions
|
||||||
|
self._timeout = timeout
|
||||||
|
|
||||||
|
async def respond(
|
||||||
|
self,
|
||||||
|
action: str,
|
||||||
|
_client_source: str = "unknown",
|
||||||
|
request: ControlActionRequest | None = None,
|
||||||
|
) -> JSONResponse:
|
||||||
|
callbacks = {
|
||||||
|
"start": self._actions.start,
|
||||||
|
"stop": self._actions.stop,
|
||||||
|
"status": self._actions.status,
|
||||||
|
}
|
||||||
|
callback = callbacks.get(action)
|
||||||
|
if callback is None:
|
||||||
|
return JSONResponse(content={"status": "error", "detail": f"unsupported action: {action}"}, status_code=404)
|
||||||
|
action_request = request or ControlActionRequest()
|
||||||
|
action_timeout = self._action_timeout(action, action_request)
|
||||||
|
try:
|
||||||
|
detail = await asyncio.wait_for(callback(action_request), timeout=action_timeout)
|
||||||
|
except asyncio.TimeoutError:
|
||||||
|
return JSONResponse(
|
||||||
|
content={"status": "accepted", "detail": f"{action} operation is still in progress"},
|
||||||
|
status_code=202,
|
||||||
|
)
|
||||||
|
except Exception as exc:
|
||||||
|
return JSONResponse(content={"status": "error", "detail": str(exc)}, status_code=500)
|
||||||
|
return JSONResponse(content={"status": "ok", "detail": detail or f"{action} action accepted"}, status_code=200)
|
||||||
|
|
||||||
|
def _action_timeout(self, action: str, request: ControlActionRequest) -> float:
|
||||||
|
base_timeout = max(float(self._timeout), 10.0) if action in {"start", "stop"} else float(self._timeout)
|
||||||
|
if action != "stop" or request.wait is False or request.timeout is None:
|
||||||
|
return base_timeout
|
||||||
|
return max(base_timeout, float(request.timeout) + 1.0)
|
||||||
@@ -27,6 +27,7 @@ class TraceQueryRequest:
|
|||||||
levels: tuple[TraceLevel, ...] = ("ERROR", "WARNING", "INFO")
|
levels: tuple[TraceLevel, ...] = ("ERROR", "WARNING", "INFO")
|
||||||
include_attrs_json: bool = False
|
include_attrs_json: bool = False
|
||||||
response_format: TraceResponseFormat = "html"
|
response_format: TraceResponseFormat = "html"
|
||||||
|
ancestor_depth: int | None = 0
|
||||||
|
|
||||||
|
|
||||||
TraceLookupHandler = Callable[[str, TraceQueryRequest], Awaitable[TraceLogView]]
|
TraceLookupHandler = Callable[[str, TraceQueryRequest], Awaitable[TraceLogView]]
|
||||||
|
|||||||
@@ -4,6 +4,7 @@ import asyncio
|
|||||||
|
|
||||||
from fastapi.responses import JSONResponse
|
from fastapi.responses import JSONResponse
|
||||||
|
|
||||||
|
from app_runtime.control.action_responder import ControlActionResponder
|
||||||
from app_runtime.control.base import ControlActionRequest, ControlActionSet, ControlChannel, TraceQueryRequest
|
from app_runtime.control.base import ControlActionRequest, ControlActionSet, ControlChannel, TraceQueryRequest
|
||||||
from app_runtime.contracts.trace import TraceLogView
|
from app_runtime.contracts.trace import TraceLogView
|
||||||
from app_runtime.control.http_app import HttpControlAppFactory
|
from app_runtime.control.http_app import HttpControlAppFactory
|
||||||
@@ -16,9 +17,11 @@ class HttpControlChannel(ControlChannel):
|
|||||||
self._runner = UvicornThreadRunner(host, port, timeout)
|
self._runner = UvicornThreadRunner(host, port, timeout)
|
||||||
self._factory = HttpControlAppFactory()
|
self._factory = HttpControlAppFactory()
|
||||||
self._actions: ControlActionSet | None = None
|
self._actions: ControlActionSet | None = None
|
||||||
|
self._action_responder: ControlActionResponder | None = None
|
||||||
|
|
||||||
async def start(self, actions: ControlActionSet) -> None:
|
async def start(self, actions: ControlActionSet) -> None:
|
||||||
self._actions = actions
|
self._actions = actions
|
||||||
|
self._action_responder = ControlActionResponder(actions, self._timeout)
|
||||||
app = self._factory.create(self._health_response, self._action_response, self._trace_response)
|
app = self._factory.create(self._health_response, self._action_response, self._trace_response)
|
||||||
await self._runner.start(app)
|
await self._runner.start(app)
|
||||||
|
|
||||||
@@ -40,34 +43,14 @@ class HttpControlChannel(ControlChannel):
|
|||||||
_client_source: str = "unknown",
|
_client_source: str = "unknown",
|
||||||
request: ControlActionRequest | None = None,
|
request: ControlActionRequest | None = None,
|
||||||
) -> JSONResponse:
|
) -> JSONResponse:
|
||||||
|
if self._action_responder is None:
|
||||||
if self._actions is None:
|
if self._actions is None:
|
||||||
return JSONResponse(content={"status": "error", "detail": f"{action} handler is not configured"}, status_code=404)
|
|
||||||
callbacks = {
|
|
||||||
"start": self._actions.start,
|
|
||||||
"stop": self._actions.stop,
|
|
||||||
"status": self._actions.status,
|
|
||||||
}
|
|
||||||
callback = callbacks.get(action)
|
|
||||||
if callback is None:
|
|
||||||
return JSONResponse(content={"status": "error", "detail": f"unsupported action: {action}"}, status_code=404)
|
|
||||||
action_request = request or ControlActionRequest()
|
|
||||||
action_timeout = self._action_timeout(action, action_request)
|
|
||||||
try:
|
|
||||||
detail = await asyncio.wait_for(callback(action_request), timeout=action_timeout)
|
|
||||||
except asyncio.TimeoutError:
|
|
||||||
return JSONResponse(
|
return JSONResponse(
|
||||||
content={"status": "accepted", "detail": f"{action} operation is still in progress"},
|
content={"status": "error", "detail": f"{action} handler is not configured"},
|
||||||
status_code=202,
|
status_code=404,
|
||||||
)
|
)
|
||||||
except Exception as exc:
|
self._action_responder = ControlActionResponder(self._actions, self._timeout)
|
||||||
return JSONResponse(content={"status": "error", "detail": str(exc)}, status_code=500)
|
return await self._action_responder.respond(action, _client_source, request)
|
||||||
return JSONResponse(content={"status": "ok", "detail": detail or f"{action} action accepted"}, status_code=200)
|
|
||||||
|
|
||||||
def _action_timeout(self, action: str, request: ControlActionRequest) -> float:
|
|
||||||
base_timeout = max(float(self._timeout), 10.0) if action in {"start", "stop"} else float(self._timeout)
|
|
||||||
if action != "stop" or request.wait is False or request.timeout is None:
|
|
||||||
return base_timeout
|
|
||||||
return max(base_timeout, float(request.timeout) + 1.0)
|
|
||||||
|
|
||||||
async def _trace_response(self, trace_id: str, request: TraceQueryRequest) -> TraceLogView:
|
async def _trace_response(self, trace_id: str, request: TraceQueryRequest) -> TraceLogView:
|
||||||
if self._actions is None or self._actions.trace_lookup is None:
|
if self._actions is None or self._actions.trace_lookup is None:
|
||||||
|
|||||||
@@ -11,6 +11,9 @@ from app_runtime.control.base import TraceQueryRequest
|
|||||||
from app_runtime.contracts.trace import TraceLevel, TraceLogRecord, TraceLogView
|
from app_runtime.contracts.trace import TraceLevel, TraceLogRecord, TraceLogView
|
||||||
|
|
||||||
|
|
||||||
|
TRACE_SECTION_SEPARATOR = "=" * 30
|
||||||
|
|
||||||
|
|
||||||
class TraceRequestParser:
|
class TraceRequestParser:
|
||||||
def parse(self, request: Request) -> TraceQueryRequest:
|
def parse(self, request: Request) -> TraceQueryRequest:
|
||||||
raw_levels = request.query_params.get("levels")
|
raw_levels = request.query_params.get("levels")
|
||||||
@@ -22,6 +25,7 @@ class TraceRequestParser:
|
|||||||
levels=self._trace_levels(raw_levels),
|
levels=self._trace_levels(raw_levels),
|
||||||
include_attrs_json=self._bool_param(request, "attrs_json") or False,
|
include_attrs_json=self._bool_param(request, "attrs_json") or False,
|
||||||
response_format=response_format,
|
response_format=response_format,
|
||||||
|
ancestor_depth=self._ancestor_depth(request),
|
||||||
)
|
)
|
||||||
|
|
||||||
def _trace_levels(self, raw_levels: str | None) -> tuple[TraceLevel, ...]:
|
def _trace_levels(self, raw_levels: str | None) -> tuple[TraceLevel, ...]:
|
||||||
@@ -47,6 +51,21 @@ class TraceRequestParser:
|
|||||||
return False
|
return False
|
||||||
raise ValueError(f"invalid boolean query parameter: {name}={raw_value}")
|
raise ValueError(f"invalid boolean query parameter: {name}={raw_value}")
|
||||||
|
|
||||||
|
def _ancestor_depth(self, request: Request) -> int | None:
|
||||||
|
raw_value = request.query_params.get("ancestor_depth")
|
||||||
|
if raw_value is None:
|
||||||
|
return 0
|
||||||
|
normalized = raw_value.strip().lower()
|
||||||
|
if normalized == "all":
|
||||||
|
return None
|
||||||
|
try:
|
||||||
|
value = int(normalized)
|
||||||
|
except ValueError as exc:
|
||||||
|
raise ValueError(f"invalid ancestor depth query parameter: ancestor_depth={raw_value}") from exc
|
||||||
|
if value < 0:
|
||||||
|
raise ValueError(f"query parameter must be >= 0: ancestor_depth={raw_value}")
|
||||||
|
return value
|
||||||
|
|
||||||
|
|
||||||
class TraceResponseRenderer:
|
class TraceResponseRenderer:
|
||||||
def render(self, trace_view: TraceLogView, request: TraceQueryRequest) -> Response:
|
def render(self, trace_view: TraceLogView, request: TraceQueryRequest) -> Response:
|
||||||
@@ -63,26 +82,19 @@ class TraceResponseRenderer:
|
|||||||
"parent_id": trace_view.parent_id or "",
|
"parent_id": trace_view.parent_id or "",
|
||||||
"child_ids": list(trace_view.child_ids),
|
"child_ids": list(trace_view.child_ids),
|
||||||
"messages": [record.as_dict(include_attrs_json=request.include_attrs_json) for record in trace_view.records],
|
"messages": [record.as_dict(include_attrs_json=request.include_attrs_json) for record in trace_view.records],
|
||||||
|
"ancestors": [self._trace_payload(view, request) for view in trace_view.ancestors],
|
||||||
}
|
}
|
||||||
)
|
)
|
||||||
|
|
||||||
def _render_text(self, trace_view: TraceLogView, request: TraceQueryRequest) -> PlainTextResponse:
|
def _render_text(self, trace_view: TraceLogView, request: TraceQueryRequest) -> PlainTextResponse:
|
||||||
lines = [
|
lineage = [*trace_view.ancestors, trace_view]
|
||||||
f"trace_id: {trace_view.trace_id}",
|
lines = self._text_trace_summary_lines(trace_view)
|
||||||
f"parent_id: {trace_view.parent_id or ''}",
|
for index, entry in enumerate(lineage):
|
||||||
*self._child_id_lines(trace_view.child_ids),
|
if index == 0:
|
||||||
"--------------------------------------------------",
|
lines.append("")
|
||||||
]
|
else:
|
||||||
previous_step: str | None = None
|
lines.extend(["", ""])
|
||||||
for record in trace_view.records:
|
lines.extend(self._text_trace_log_lines(entry, request))
|
||||||
current_step = str(record.step or "")
|
|
||||||
if previous_step is None:
|
|
||||||
lines.append(f"step: {current_step}")
|
|
||||||
elif current_step != previous_step:
|
|
||||||
lines.append("--------------------------------------------------")
|
|
||||||
lines.append(f"step: {current_step}")
|
|
||||||
previous_step = current_step
|
|
||||||
lines.append(self._text_message(record, request.include_attrs_json))
|
|
||||||
return PlainTextResponse(content="\n".join(lines))
|
return PlainTextResponse(content="\n".join(lines))
|
||||||
|
|
||||||
def _render_html(self, trace_view: TraceLogView, request: TraceQueryRequest) -> HTMLResponse:
|
def _render_html(self, trace_view: TraceLogView, request: TraceQueryRequest) -> HTMLResponse:
|
||||||
@@ -162,22 +174,75 @@ class TraceResponseRenderer:
|
|||||||
return f"{record.message}, {json.dumps(record.attrs_json, ensure_ascii=False, separators=(',', ':'))}"
|
return f"{record.message}, {json.dumps(record.attrs_json, ensure_ascii=False, separators=(',', ':'))}"
|
||||||
|
|
||||||
def _trace_href(self, trace_id: str, request: TraceQueryRequest) -> str:
|
def _trace_href(self, trace_id: str, request: TraceQueryRequest) -> str:
|
||||||
params = urlencode(
|
params = {
|
||||||
{
|
|
||||||
"format": "html",
|
"format": "html",
|
||||||
"levels": ",".join(request.levels),
|
"levels": ",".join(request.levels),
|
||||||
"attrs_json": "true" if request.include_attrs_json else "false",
|
"attrs_json": "true" if request.include_attrs_json else "false",
|
||||||
}
|
}
|
||||||
)
|
if request.ancestor_depth is None:
|
||||||
return f"/traces/{trace_id}?{params}"
|
params["ancestor_depth"] = "all"
|
||||||
|
elif request.ancestor_depth > 0:
|
||||||
|
params["ancestor_depth"] = str(request.ancestor_depth)
|
||||||
|
query = urlencode(params)
|
||||||
|
return f"/traces/{trace_id}?{query}"
|
||||||
|
|
||||||
def _html_lines(self, trace_view: TraceLogView, request: TraceQueryRequest) -> str:
|
def _html_lines(self, trace_view: TraceLogView, request: TraceQueryRequest) -> str:
|
||||||
|
lineage = [*trace_view.ancestors, trace_view]
|
||||||
|
lines = self._html_trace_summary_lines(trace_view, request)
|
||||||
|
for index, entry in enumerate(lineage):
|
||||||
|
if index == 0:
|
||||||
|
lines.append(self._html_plain_line(""))
|
||||||
|
else:
|
||||||
|
lines.extend([self._html_plain_line(""), self._html_plain_line("")])
|
||||||
|
lines.extend(self._html_trace_log_lines(entry, request))
|
||||||
|
return "".join(lines)
|
||||||
|
|
||||||
|
def _trace_payload(self, trace_view: TraceLogView, request: TraceQueryRequest) -> dict[str, object]:
|
||||||
|
return {
|
||||||
|
"trace_id": trace_view.trace_id,
|
||||||
|
"parent_id": trace_view.parent_id or "",
|
||||||
|
"child_ids": list(trace_view.child_ids),
|
||||||
|
"messages": [record.as_dict(include_attrs_json=request.include_attrs_json) for record in trace_view.records],
|
||||||
|
}
|
||||||
|
|
||||||
|
def _text_trace_summary_lines(self, trace_view: TraceLogView) -> list[str]:
|
||||||
|
return [
|
||||||
|
f"trace_id: {trace_view.trace_id}",
|
||||||
|
f"parent_id: {trace_view.parent_id or ''}",
|
||||||
|
*self._child_id_lines(trace_view.child_ids),
|
||||||
|
]
|
||||||
|
|
||||||
|
def _text_trace_log_lines(self, trace_view: TraceLogView, request: TraceQueryRequest) -> list[str]:
|
||||||
lines = [
|
lines = [
|
||||||
|
TRACE_SECTION_SEPARATOR,
|
||||||
|
f"trace_id: {trace_view.trace_id}",
|
||||||
|
"",
|
||||||
|
]
|
||||||
|
previous_step: str | None = None
|
||||||
|
for record in trace_view.records:
|
||||||
|
current_step = str(record.step or "")
|
||||||
|
if previous_step is None:
|
||||||
|
lines.append(f"step: {current_step}")
|
||||||
|
elif current_step != previous_step:
|
||||||
|
lines.append("------------------------------")
|
||||||
|
lines.append(f"step: {current_step}")
|
||||||
|
previous_step = current_step
|
||||||
|
lines.append(self._text_message(record, request.include_attrs_json))
|
||||||
|
return lines
|
||||||
|
|
||||||
|
def _html_trace_summary_lines(self, trace_view: TraceLogView, request: TraceQueryRequest) -> list[str]:
|
||||||
|
return [
|
||||||
self._html_plain_line(f"trace_id: {self._trace_link(trace_view.trace_id, request)}"),
|
self._html_plain_line(f"trace_id: {self._trace_link(trace_view.trace_id, request)}"),
|
||||||
self._html_plain_line(f"parent_id: {self._optional_trace_link(trace_view.parent_id, request)}"),
|
self._html_plain_line(f"parent_id: {self._optional_trace_link(trace_view.parent_id, request)}"),
|
||||||
self._html_plain_line("child_ids:"),
|
self._html_plain_line("child_ids:"),
|
||||||
*(self._html_plain_line(f" - {self._trace_link(child_id, request)}") for child_id in trace_view.child_ids),
|
*(self._html_plain_line(f" - {self._trace_link(child_id, request)}") for child_id in trace_view.child_ids),
|
||||||
self._html_plain_line("--------------------------------------------------"),
|
]
|
||||||
|
|
||||||
|
def _html_trace_log_lines(self, trace_view: TraceLogView, request: TraceQueryRequest) -> list[str]:
|
||||||
|
lines = [
|
||||||
|
self._html_plain_line(TRACE_SECTION_SEPARATOR),
|
||||||
|
self._html_plain_line(f"trace_id: {self._trace_link(trace_view.trace_id, request)}"),
|
||||||
|
self._html_plain_line(""),
|
||||||
]
|
]
|
||||||
previous_step: str | None = None
|
previous_step: str | None = None
|
||||||
for record in trace_view.records:
|
for record in trace_view.records:
|
||||||
@@ -187,12 +252,12 @@ class TraceResponseRenderer:
|
|||||||
lines.append(self._html_plain_line(""))
|
lines.append(self._html_plain_line(""))
|
||||||
elif current_step != previous_step:
|
elif current_step != previous_step:
|
||||||
lines.append(self._html_plain_line(""))
|
lines.append(self._html_plain_line(""))
|
||||||
lines.append(self._html_plain_line("--------------------------------------------------"))
|
lines.append(self._html_plain_line("------------------------------"))
|
||||||
lines.append(self._html_step_line(current_step))
|
lines.append(self._html_step_line(current_step))
|
||||||
lines.append(self._html_plain_line(""))
|
lines.append(self._html_plain_line(""))
|
||||||
previous_step = current_step
|
previous_step = current_step
|
||||||
lines.extend(self._html_message_lines(record, request.include_attrs_json))
|
lines.extend(self._html_message_lines(record, request.include_attrs_json))
|
||||||
return "".join(lines)
|
return lines
|
||||||
|
|
||||||
def _html_message_lines(self, record: TraceLogRecord, include_attrs_json: bool) -> list[str]:
|
def _html_message_lines(self, record: TraceLogRecord, include_attrs_json: bool) -> list[str]:
|
||||||
lines = [self._html_colored_line(self._text_message(record, include_attrs_json), record.level)]
|
lines = [self._html_colored_line(self._text_message(record, include_attrs_json), record.level)]
|
||||||
|
|||||||
@@ -141,7 +141,7 @@ class RuntimeManager:
|
|||||||
reader = build_trace_log_reader(self.traces.transport)
|
reader = build_trace_log_reader(self.traces.transport)
|
||||||
if reader is None:
|
if reader is None:
|
||||||
raise RuntimeError("trace log reader is not configured")
|
raise RuntimeError("trace log reader is not configured")
|
||||||
trace_view = reader.read_trace(trace_id, request.levels)
|
trace_view = reader.read_trace(trace_id, request.levels, request.ancestor_depth)
|
||||||
if trace_view is None:
|
if trace_view is None:
|
||||||
raise KeyError(trace_id)
|
raise KeyError(trace_id)
|
||||||
return trace_view
|
return trace_view
|
||||||
|
|||||||
@@ -2,6 +2,7 @@ from app_runtime.http.base import ApplicationHttpChannel, HttpRouteRegistrar
|
|||||||
from app_runtime.http.http_app import HttpApplicationAppFactory
|
from app_runtime.http.http_app import HttpApplicationAppFactory
|
||||||
from app_runtime.http.http_channel import HttpApplicationChannel
|
from app_runtime.http.http_channel import HttpApplicationChannel
|
||||||
from app_runtime.http.service import ApplicationHttpService
|
from app_runtime.http.service import ApplicationHttpService
|
||||||
|
from app_runtime.http.unified_service import UnifiedHttpService
|
||||||
|
|
||||||
__all__ = [
|
__all__ = [
|
||||||
"ApplicationHttpChannel",
|
"ApplicationHttpChannel",
|
||||||
@@ -9,4 +10,5 @@ __all__ = [
|
|||||||
"HttpApplicationAppFactory",
|
"HttpApplicationAppFactory",
|
||||||
"HttpApplicationChannel",
|
"HttpApplicationChannel",
|
||||||
"HttpRouteRegistrar",
|
"HttpRouteRegistrar",
|
||||||
|
"UnifiedHttpService",
|
||||||
]
|
]
|
||||||
|
|||||||
@@ -0,0 +1,68 @@
|
|||||||
|
from __future__ import annotations
|
||||||
|
|
||||||
|
import asyncio
|
||||||
|
from typing import TYPE_CHECKING
|
||||||
|
|
||||||
|
from app_runtime.control.action_responder import ControlActionResponder
|
||||||
|
from app_runtime.control.base import ControlActionSet
|
||||||
|
from app_runtime.control.http_app import HttpControlAppFactory
|
||||||
|
from app_runtime.http.base import ApplicationHttpChannel, HttpRouteRegistrar
|
||||||
|
from app_runtime.http.http_app import HttpApplicationAppFactory
|
||||||
|
|
||||||
|
if TYPE_CHECKING:
|
||||||
|
from app_runtime.core.runtime import RuntimeManager
|
||||||
|
|
||||||
|
|
||||||
|
class UnifiedHttpService:
|
||||||
|
"""Publish control and application routes through the same HTTP channel."""
|
||||||
|
|
||||||
|
def __init__(
|
||||||
|
self,
|
||||||
|
app_factory: HttpApplicationAppFactory | None = None,
|
||||||
|
control_app_factory: HttpControlAppFactory | None = None,
|
||||||
|
control_timeout: int = 5,
|
||||||
|
) -> None:
|
||||||
|
self._channels: list[ApplicationHttpChannel] = []
|
||||||
|
self._registrars: list[HttpRouteRegistrar] = []
|
||||||
|
self._app_factory = app_factory or HttpApplicationAppFactory()
|
||||||
|
self._control_app_factory = control_app_factory or HttpControlAppFactory()
|
||||||
|
self._control_timeout = control_timeout
|
||||||
|
|
||||||
|
def register_channel(self, channel: ApplicationHttpChannel) -> None:
|
||||||
|
self._channels.append(channel)
|
||||||
|
|
||||||
|
def register_routes(self, registrar: HttpRouteRegistrar) -> None:
|
||||||
|
self._registrars.append(registrar)
|
||||||
|
|
||||||
|
def start(self, runtime: RuntimeManager) -> None:
|
||||||
|
if not self._channels:
|
||||||
|
return
|
||||||
|
asyncio.run(self._start_async(runtime))
|
||||||
|
|
||||||
|
def stop(self) -> None:
|
||||||
|
if not self._channels:
|
||||||
|
return
|
||||||
|
asyncio.run(self._stop_async())
|
||||||
|
|
||||||
|
async def _start_async(self, runtime: RuntimeManager) -> None:
|
||||||
|
app = self._app_factory.create(self._registrars, runtime.services)
|
||||||
|
actions = ControlActionSet(
|
||||||
|
health=runtime.health_status,
|
||||||
|
start=runtime.start_runtime,
|
||||||
|
stop=runtime.stop_runtime,
|
||||||
|
status=runtime.runtime_status,
|
||||||
|
trace_lookup=runtime.trace_logs,
|
||||||
|
)
|
||||||
|
action_responder = ControlActionResponder(actions, self._control_timeout)
|
||||||
|
control_app = self._control_app_factory.create(
|
||||||
|
actions.health,
|
||||||
|
action_responder.respond,
|
||||||
|
actions.trace_lookup,
|
||||||
|
)
|
||||||
|
app.router.routes.extend(control_app.router.routes)
|
||||||
|
for channel in self._channels:
|
||||||
|
await channel.start(app)
|
||||||
|
|
||||||
|
async def _stop_async(self) -> None:
|
||||||
|
for channel in reversed(self._channels):
|
||||||
|
await channel.stop()
|
||||||
@@ -11,10 +11,16 @@ class MySqlTraceLogReader(TraceLogReader):
|
|||||||
def __init__(self, connection_factory: MySqlTraceConnectionFactory) -> None:
|
def __init__(self, connection_factory: MySqlTraceConnectionFactory) -> None:
|
||||||
self._connection_factory = connection_factory
|
self._connection_factory = connection_factory
|
||||||
|
|
||||||
def read_trace(self, trace_id: str, levels: tuple[TraceLevel, ...]) -> TraceLogView | None:
|
def read_trace(
|
||||||
|
self,
|
||||||
|
trace_id: str,
|
||||||
|
levels: tuple[TraceLevel, ...],
|
||||||
|
ancestor_depth: int | None = 0,
|
||||||
|
) -> TraceLogView | None:
|
||||||
parent_id = self._read_parent_id(trace_id)
|
parent_id = self._read_parent_id(trace_id)
|
||||||
if parent_id is None and not self._trace_exists(trace_id):
|
if parent_id is None and not self._trace_exists(trace_id):
|
||||||
return None
|
return None
|
||||||
|
ancestors = self._read_ancestors(parent_id, levels, ancestor_depth)
|
||||||
child_ids = self._read_child_ids(trace_id)
|
child_ids = self._read_child_ids(trace_id)
|
||||||
records = self._read_records(trace_id, levels)
|
records = self._read_records(trace_id, levels)
|
||||||
return TraceLogView(
|
return TraceLogView(
|
||||||
@@ -22,8 +28,36 @@ class MySqlTraceLogReader(TraceLogReader):
|
|||||||
parent_id=parent_id,
|
parent_id=parent_id,
|
||||||
child_ids=tuple(child_ids),
|
child_ids=tuple(child_ids),
|
||||||
records=tuple(records),
|
records=tuple(records),
|
||||||
|
ancestors=tuple(ancestors),
|
||||||
)
|
)
|
||||||
|
|
||||||
|
def _read_ancestors(
|
||||||
|
self,
|
||||||
|
parent_id: str | None,
|
||||||
|
levels: tuple[TraceLevel, ...],
|
||||||
|
ancestor_depth: int | None,
|
||||||
|
) -> list[TraceLogView]:
|
||||||
|
if parent_id is None or ancestor_depth == 0:
|
||||||
|
return []
|
||||||
|
remaining_depth = ancestor_depth
|
||||||
|
ancestors: list[TraceLogView] = []
|
||||||
|
current_trace_id = parent_id
|
||||||
|
while current_trace_id is not None and (remaining_depth is None or remaining_depth > 0):
|
||||||
|
current_parent_id = self._read_parent_id(current_trace_id)
|
||||||
|
ancestors.append(
|
||||||
|
TraceLogView(
|
||||||
|
trace_id=current_trace_id,
|
||||||
|
parent_id=current_parent_id,
|
||||||
|
child_ids=tuple(self._read_child_ids(current_trace_id)),
|
||||||
|
records=tuple(self._read_records(current_trace_id, levels)),
|
||||||
|
)
|
||||||
|
)
|
||||||
|
current_trace_id = current_parent_id
|
||||||
|
if remaining_depth is not None:
|
||||||
|
remaining_depth -= 1
|
||||||
|
ancestors.reverse()
|
||||||
|
return ancestors
|
||||||
|
|
||||||
def _trace_exists(self, trace_id: str) -> bool:
|
def _trace_exists(self, trace_id: str) -> bool:
|
||||||
query = "SELECT 1 FROM trace_contexts WHERE trace_id = %s"
|
query = "SELECT 1 FROM trace_contexts WHERE trace_id = %s"
|
||||||
with self._connection_factory.connect() as connection:
|
with self._connection_factory.connect() as connection:
|
||||||
|
|||||||
@@ -21,6 +21,7 @@ from plba.http import (
|
|||||||
HttpApplicationAppFactory,
|
HttpApplicationAppFactory,
|
||||||
HttpApplicationChannel,
|
HttpApplicationChannel,
|
||||||
HttpRouteRegistrar,
|
HttpRouteRegistrar,
|
||||||
|
UnifiedHttpService,
|
||||||
)
|
)
|
||||||
from plba.logging import LogManager
|
from plba.logging import LogManager
|
||||||
from plba.queue import InMemoryTaskQueue
|
from plba.queue import InMemoryTaskQueue
|
||||||
@@ -56,6 +57,7 @@ __all__ = [
|
|||||||
"HttpApplicationChannel",
|
"HttpApplicationChannel",
|
||||||
"HttpRouteRegistrar",
|
"HttpRouteRegistrar",
|
||||||
"HttpControlChannel",
|
"HttpControlChannel",
|
||||||
|
"UnifiedHttpService",
|
||||||
"InMemoryTaskQueue",
|
"InMemoryTaskQueue",
|
||||||
"LogManager",
|
"LogManager",
|
||||||
"MySqlTraceTransport",
|
"MySqlTraceTransport",
|
||||||
|
|||||||
@@ -2,6 +2,7 @@ from app_runtime.http.base import ApplicationHttpChannel, HttpRouteRegistrar
|
|||||||
from app_runtime.http.http_app import HttpApplicationAppFactory
|
from app_runtime.http.http_app import HttpApplicationAppFactory
|
||||||
from app_runtime.http.http_channel import HttpApplicationChannel
|
from app_runtime.http.http_channel import HttpApplicationChannel
|
||||||
from app_runtime.http.service import ApplicationHttpService
|
from app_runtime.http.service import ApplicationHttpService
|
||||||
|
from app_runtime.http.unified_service import UnifiedHttpService
|
||||||
|
|
||||||
__all__ = [
|
__all__ = [
|
||||||
"ApplicationHttpChannel",
|
"ApplicationHttpChannel",
|
||||||
@@ -9,4 +10,5 @@ __all__ = [
|
|||||||
"HttpApplicationAppFactory",
|
"HttpApplicationAppFactory",
|
||||||
"HttpApplicationChannel",
|
"HttpApplicationChannel",
|
||||||
"HttpRouteRegistrar",
|
"HttpRouteRegistrar",
|
||||||
|
"UnifiedHttpService",
|
||||||
]
|
]
|
||||||
|
|||||||
@@ -15,6 +15,7 @@ from app_runtime.core.registration import ModuleRegistry
|
|||||||
from app_runtime.core.runtime import RuntimeManager
|
from app_runtime.core.runtime import RuntimeManager
|
||||||
from app_runtime.http.base import ApplicationHttpChannel
|
from app_runtime.http.base import ApplicationHttpChannel
|
||||||
from app_runtime.http.http_channel import HttpApplicationChannel
|
from app_runtime.http.http_channel import HttpApplicationChannel
|
||||||
|
from app_runtime.http.unified_service import UnifiedHttpService
|
||||||
|
|
||||||
try:
|
try:
|
||||||
import python_multipart # noqa: F401
|
import python_multipart # noqa: F401
|
||||||
@@ -207,3 +208,27 @@ def test_control_plane_and_application_http_work_independently() -> None:
|
|||||||
finally:
|
finally:
|
||||||
runtime.control_plane.stop()
|
runtime.control_plane.stop()
|
||||||
runtime.stop(stop_control_plane=False)
|
runtime.stop(stop_control_plane=False)
|
||||||
|
|
||||||
|
|
||||||
|
def test_unified_http_serves_control_and_application_routes_on_one_app() -> None:
|
||||||
|
runtime = RuntimeManager(application_http=UnifiedHttpService())
|
||||||
|
channel = RecordingChannel()
|
||||||
|
runtime.application_http.register_channel(channel)
|
||||||
|
runtime.register_module(HttpModule(PingRoutes()))
|
||||||
|
runtime.start(start_control_plane=False)
|
||||||
|
|
||||||
|
try:
|
||||||
|
client = _application_client(channel)
|
||||||
|
with client:
|
||||||
|
health_response = client.get("/health")
|
||||||
|
action_response = client.get("/actions/status")
|
||||||
|
app_response = client.get("/estimate/ping")
|
||||||
|
|
||||||
|
assert health_response.status_code == 200
|
||||||
|
assert health_response.json()["status"] == "ok"
|
||||||
|
assert action_response.status_code == 200
|
||||||
|
assert action_response.json() == {"status": "ok", "detail": "idle"}
|
||||||
|
assert app_response.status_code == 200
|
||||||
|
assert app_response.json() == {"status": "ok"}
|
||||||
|
finally:
|
||||||
|
runtime.stop(stop_control_plane=False)
|
||||||
|
|||||||
@@ -72,7 +72,17 @@ def test_trace_endpoint_returns_html_by_default() -> None:
|
|||||||
assert "trace_id:" in response.text
|
assert "trace_id:" in response.text
|
||||||
assert "first error" in response.text
|
assert "first error" in response.text
|
||||||
assert "second warning" in response.text
|
assert "second warning" in response.text
|
||||||
assert captured == [("trace-1", TraceQueryRequest(levels=("ERROR", "WARNING", "INFO"), include_attrs_json=False, response_format="html"))]
|
assert captured == [
|
||||||
|
(
|
||||||
|
"trace-1",
|
||||||
|
TraceQueryRequest(
|
||||||
|
levels=("ERROR", "WARNING", "INFO"),
|
||||||
|
include_attrs_json=False,
|
||||||
|
response_format="html",
|
||||||
|
ancestor_depth=0,
|
||||||
|
),
|
||||||
|
)
|
||||||
|
]
|
||||||
|
|
||||||
|
|
||||||
def test_trace_endpoint_returns_text_when_requested() -> None:
|
def test_trace_endpoint_returns_text_when_requested() -> None:
|
||||||
@@ -100,7 +110,10 @@ def test_trace_endpoint_returns_text_when_requested() -> None:
|
|||||||
"child_ids:\n"
|
"child_ids:\n"
|
||||||
" - child-1\n"
|
" - child-1\n"
|
||||||
" - child-2\n"
|
" - child-2\n"
|
||||||
"--------------------------------------------------\n"
|
"\n"
|
||||||
|
"==============================\n"
|
||||||
|
"trace_id: trace-1\n"
|
||||||
|
"\n"
|
||||||
"step: process\n"
|
"step: process\n"
|
||||||
"first error\n"
|
"first error\n"
|
||||||
"second warning"
|
"second warning"
|
||||||
@@ -129,7 +142,10 @@ def test_trace_endpoint_appends_attrs_json_in_text_mode() -> None:
|
|||||||
"trace_id: trace-1\n"
|
"trace_id: trace-1\n"
|
||||||
"parent_id: \n"
|
"parent_id: \n"
|
||||||
"child_ids:\n"
|
"child_ids:\n"
|
||||||
"--------------------------------------------------\n"
|
"\n"
|
||||||
|
"==============================\n"
|
||||||
|
"trace_id: trace-1\n"
|
||||||
|
"\n"
|
||||||
"step: process\n"
|
"step: process\n"
|
||||||
'failure, {"attempt":2,"source":"crm"}'
|
'failure, {"attempt":2,"source":"crm"}'
|
||||||
)
|
)
|
||||||
@@ -159,11 +175,14 @@ def test_trace_endpoint_separates_messages_by_step_in_text_mode() -> None:
|
|||||||
"trace_id: trace-1\n"
|
"trace_id: trace-1\n"
|
||||||
"parent_id: \n"
|
"parent_id: \n"
|
||||||
"child_ids:\n"
|
"child_ids:\n"
|
||||||
"--------------------------------------------------\n"
|
"\n"
|
||||||
|
"==============================\n"
|
||||||
|
"trace_id: trace-1\n"
|
||||||
|
"\n"
|
||||||
"step: load_stocks\n"
|
"step: load_stocks\n"
|
||||||
"load first\n"
|
"load first\n"
|
||||||
"load second\n"
|
"load second\n"
|
||||||
"--------------------------------------------------\n"
|
"------------------------------\n"
|
||||||
"step: filter_stocks\n"
|
"step: filter_stocks\n"
|
||||||
"filter first"
|
"filter first"
|
||||||
)
|
)
|
||||||
@@ -203,9 +222,82 @@ def test_trace_endpoint_returns_json_payload() -> None:
|
|||||||
"attrs_json": {"batch": 7},
|
"attrs_json": {"batch": 7},
|
||||||
}
|
}
|
||||||
],
|
],
|
||||||
|
"ancestors": [],
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
def test_trace_endpoint_returns_json_payload_with_ancestors() -> None:
|
||||||
|
async def trace_provider(_trace_id: str, _request: TraceQueryRequest) -> TraceLogView:
|
||||||
|
return TraceLogView(
|
||||||
|
trace_id="trace-1",
|
||||||
|
parent_id="parent-1",
|
||||||
|
child_ids=("child-1",),
|
||||||
|
records=(
|
||||||
|
_trace_record(row_id=3, level="INFO", message="done", attrs_json={"batch": 7}),
|
||||||
|
),
|
||||||
|
ancestors=(
|
||||||
|
TraceLogView(
|
||||||
|
trace_id="root-1",
|
||||||
|
parent_id=None,
|
||||||
|
child_ids=("parent-1",),
|
||||||
|
records=(
|
||||||
|
_trace_record(row_id=4, level="INFO", message="root info"),
|
||||||
|
),
|
||||||
|
),
|
||||||
|
TraceLogView(
|
||||||
|
trace_id="parent-1",
|
||||||
|
parent_id="root-1",
|
||||||
|
child_ids=("trace-1", "sibling-1"),
|
||||||
|
records=(
|
||||||
|
_trace_record(row_id=5, level="WARNING", message="parent warning"),
|
||||||
|
),
|
||||||
|
),
|
||||||
|
),
|
||||||
|
)
|
||||||
|
|
||||||
|
client = _build_client(trace_provider)
|
||||||
|
try:
|
||||||
|
response = client.get("/traces/trace-1?format=json&ancestor_depth=1")
|
||||||
|
finally:
|
||||||
|
client.close()
|
||||||
|
|
||||||
|
assert response.status_code == 200
|
||||||
|
assert response.json()["ancestors"] == [
|
||||||
|
{
|
||||||
|
"trace_id": "root-1",
|
||||||
|
"parent_id": "",
|
||||||
|
"child_ids": ["parent-1"],
|
||||||
|
"messages": [
|
||||||
|
{
|
||||||
|
"id": 4,
|
||||||
|
"trace_id": "trace-1",
|
||||||
|
"event_time": "2026-04-28T10:11:12+00:00",
|
||||||
|
"step": "process",
|
||||||
|
"status": "failed",
|
||||||
|
"level": "INFO",
|
||||||
|
"message": "root info",
|
||||||
|
}
|
||||||
|
],
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"trace_id": "parent-1",
|
||||||
|
"parent_id": "root-1",
|
||||||
|
"child_ids": ["trace-1", "sibling-1"],
|
||||||
|
"messages": [
|
||||||
|
{
|
||||||
|
"id": 5,
|
||||||
|
"trace_id": "trace-1",
|
||||||
|
"event_time": "2026-04-28T10:11:12+00:00",
|
||||||
|
"step": "process",
|
||||||
|
"status": "failed",
|
||||||
|
"level": "WARNING",
|
||||||
|
"message": "parent warning",
|
||||||
|
}
|
||||||
|
],
|
||||||
|
}
|
||||||
|
]
|
||||||
|
|
||||||
|
|
||||||
def test_trace_endpoint_returns_html_page_with_related_links() -> None:
|
def test_trace_endpoint_returns_html_page_with_related_links() -> None:
|
||||||
async def trace_provider(_trace_id: str, _request: TraceQueryRequest) -> TraceLogView:
|
async def trace_provider(_trace_id: str, _request: TraceQueryRequest) -> TraceLogView:
|
||||||
return TraceLogView(
|
return TraceLogView(
|
||||||
@@ -240,8 +332,9 @@ def test_trace_endpoint_returns_html_page_with_related_links() -> None:
|
|||||||
assert '<div class="line">child_ids:</div>' in response.text
|
assert '<div class="line">child_ids:</div>' in response.text
|
||||||
assert '<div class="line"> - <a href="/traces/child-1?format=html&levels=ERROR%2CWARNING%2CINFO&attrs_json=true">child-1</a></div>' in response.text
|
assert '<div class="line"> - <a href="/traces/child-1?format=html&levels=ERROR%2CWARNING%2CINFO&attrs_json=true">child-1</a></div>' in response.text
|
||||||
assert '<div class="line"> - <a href="/traces/child-2?format=html&levels=ERROR%2CWARNING%2CINFO&attrs_json=true">child-2</a></div>' in response.text
|
assert '<div class="line"> - <a href="/traces/child-2?format=html&levels=ERROR%2CWARNING%2CINFO&attrs_json=true">child-2</a></div>' in response.text
|
||||||
|
assert '<div class="line">==============================</div>' in response.text
|
||||||
assert '<div class="line" style="color: var(--step);">load_stocks</div>' in response.text
|
assert '<div class="line" style="color: var(--step);">load_stocks</div>' in response.text
|
||||||
assert '<div class="line">--------------------------------------------------</div>' in response.text
|
assert '<div class="line">trace_id: <a href="/traces/trace-1?format=html&levels=ERROR%2CWARNING%2CINFO&attrs_json=true">trace-1</a></div>' in response.text
|
||||||
assert '<div class="line" style="color: var(--step);">filter_stocks</div>' in response.text
|
assert '<div class="line" style="color: var(--step);">filter_stocks</div>' in response.text
|
||||||
assert "loaded prices" in response.text
|
assert "loaded prices" in response.text
|
||||||
assert "filtered suspicious ticker" in response.text
|
assert "filtered suspicious ticker" in response.text
|
||||||
@@ -250,11 +343,108 @@ def test_trace_endpoint_returns_html_page_with_related_links() -> None:
|
|||||||
assert "Related Traces" not in response.text
|
assert "Related Traces" not in response.text
|
||||||
|
|
||||||
|
|
||||||
|
def test_trace_endpoint_renders_ancestors_in_text_mode() -> None:
|
||||||
|
async def trace_provider(_trace_id: str, _request: TraceQueryRequest) -> TraceLogView:
|
||||||
|
return TraceLogView(
|
||||||
|
trace_id="trace-1",
|
||||||
|
parent_id="parent-1",
|
||||||
|
child_ids=(),
|
||||||
|
records=(_trace_record(row_id=1, level="INFO", message="child message"),),
|
||||||
|
ancestors=(
|
||||||
|
TraceLogView(
|
||||||
|
trace_id="root-1",
|
||||||
|
parent_id=None,
|
||||||
|
child_ids=("parent-1",),
|
||||||
|
records=(_trace_record(row_id=2, level="INFO", message="root message"),),
|
||||||
|
),
|
||||||
|
TraceLogView(
|
||||||
|
trace_id="parent-1",
|
||||||
|
parent_id="root-1",
|
||||||
|
child_ids=("trace-1",),
|
||||||
|
records=(_trace_record(row_id=3, level="WARNING", message="parent message"),),
|
||||||
|
),
|
||||||
|
),
|
||||||
|
)
|
||||||
|
|
||||||
|
client = _build_client(trace_provider)
|
||||||
|
try:
|
||||||
|
response = client.get("/traces/trace-1?format=text&ancestor_depth=1")
|
||||||
|
finally:
|
||||||
|
client.close()
|
||||||
|
|
||||||
|
assert response.status_code == 200
|
||||||
|
assert response.text == (
|
||||||
|
"trace_id: trace-1\n"
|
||||||
|
"parent_id: parent-1\n"
|
||||||
|
"child_ids:\n"
|
||||||
|
"\n"
|
||||||
|
"==============================\n"
|
||||||
|
"trace_id: root-1\n"
|
||||||
|
"\n"
|
||||||
|
"step: process\n"
|
||||||
|
"root message\n"
|
||||||
|
"\n"
|
||||||
|
"\n"
|
||||||
|
"==============================\n"
|
||||||
|
"trace_id: parent-1\n"
|
||||||
|
"\n"
|
||||||
|
"step: process\n"
|
||||||
|
"parent message\n"
|
||||||
|
"\n"
|
||||||
|
"\n"
|
||||||
|
"==============================\n"
|
||||||
|
"trace_id: trace-1\n"
|
||||||
|
"\n"
|
||||||
|
"step: process\n"
|
||||||
|
"child message"
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
def test_trace_endpoint_preserves_ancestor_depth_in_html_links() -> None:
|
||||||
|
async def trace_provider(_trace_id: str, _request: TraceQueryRequest) -> TraceLogView:
|
||||||
|
return TraceLogView(
|
||||||
|
trace_id="trace-1",
|
||||||
|
parent_id="parent-1",
|
||||||
|
child_ids=("child-1",),
|
||||||
|
records=(_trace_record(row_id=1, level="INFO", message="loaded prices"),),
|
||||||
|
ancestors=(
|
||||||
|
TraceLogView(
|
||||||
|
trace_id="root-1",
|
||||||
|
parent_id=None,
|
||||||
|
child_ids=("parent-1",),
|
||||||
|
records=(_trace_record(row_id=2, level="INFO", message="root info"),),
|
||||||
|
),
|
||||||
|
TraceLogView(
|
||||||
|
trace_id="parent-1",
|
||||||
|
parent_id="root-1",
|
||||||
|
child_ids=("trace-1",),
|
||||||
|
records=(_trace_record(row_id=3, level="WARNING", message="parent warning"),),
|
||||||
|
),
|
||||||
|
),
|
||||||
|
)
|
||||||
|
|
||||||
|
client = _build_client(trace_provider)
|
||||||
|
try:
|
||||||
|
response = client.get("/traces/trace-1?format=html&attrs_json=true&ancestor_depth=all")
|
||||||
|
finally:
|
||||||
|
client.close()
|
||||||
|
|
||||||
|
assert response.status_code == 200
|
||||||
|
assert 'href="/traces/trace-1?format=html&levels=ERROR%2CWARNING%2CINFO&attrs_json=true&ancestor_depth=all"' in response.text
|
||||||
|
assert 'href="/traces/root-1?format=html&levels=ERROR%2CWARNING%2CINFO&attrs_json=true&ancestor_depth=all"' in response.text
|
||||||
|
assert 'href="/traces/parent-1?format=html&levels=ERROR%2CWARNING%2CINFO&attrs_json=true&ancestor_depth=all"' in response.text
|
||||||
|
assert response.text.index("root info") < response.text.index("parent warning") < response.text.index("loaded prices")
|
||||||
|
assert "root info" in response.text
|
||||||
|
assert "parent warning" in response.text
|
||||||
|
|
||||||
|
|
||||||
def test_trace_endpoint_validates_query_params() -> None:
|
def test_trace_endpoint_validates_query_params() -> None:
|
||||||
client = _build_client(lambda _trace_id, _request: None)
|
client = _build_client(lambda _trace_id, _request: None)
|
||||||
try:
|
try:
|
||||||
invalid_level = client.get("/traces/trace-1?levels=error,fatal")
|
invalid_level = client.get("/traces/trace-1?levels=error,fatal")
|
||||||
invalid_format = client.get("/traces/trace-1?format=xml")
|
invalid_format = client.get("/traces/trace-1?format=xml")
|
||||||
|
invalid_ancestor_depth = client.get("/traces/trace-1?ancestor_depth=-1")
|
||||||
|
invalid_ancestor_type = client.get("/traces/trace-1?ancestor_depth=up")
|
||||||
finally:
|
finally:
|
||||||
client.close()
|
client.close()
|
||||||
|
|
||||||
@@ -262,6 +452,16 @@ def test_trace_endpoint_validates_query_params() -> None:
|
|||||||
assert invalid_level.json() == {"status": "error", "detail": "unsupported trace levels: FATAL"}
|
assert invalid_level.json() == {"status": "error", "detail": "unsupported trace levels: FATAL"}
|
||||||
assert invalid_format.status_code == 400
|
assert invalid_format.status_code == 400
|
||||||
assert invalid_format.json() == {"status": "error", "detail": "unsupported trace format: xml"}
|
assert invalid_format.json() == {"status": "error", "detail": "unsupported trace format: xml"}
|
||||||
|
assert invalid_ancestor_depth.status_code == 400
|
||||||
|
assert invalid_ancestor_depth.json() == {
|
||||||
|
"status": "error",
|
||||||
|
"detail": "query parameter must be >= 0: ancestor_depth=-1",
|
||||||
|
}
|
||||||
|
assert invalid_ancestor_type.status_code == 400
|
||||||
|
assert invalid_ancestor_type.json() == {
|
||||||
|
"status": "error",
|
||||||
|
"detail": "invalid ancestor depth query parameter: ancestor_depth=up",
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
def test_runtime_trace_logs_uses_configured_reader(monkeypatch) -> None:
|
def test_runtime_trace_logs_uses_configured_reader(monkeypatch) -> None:
|
||||||
@@ -273,15 +473,21 @@ def test_runtime_trace_logs_uses_configured_reader(monkeypatch) -> None:
|
|||||||
)
|
)
|
||||||
|
|
||||||
class StubReader:
|
class StubReader:
|
||||||
def read_trace(self, trace_id: str, levels: tuple[str, ...]) -> TraceLogView | None:
|
def read_trace(
|
||||||
|
self,
|
||||||
|
trace_id: str,
|
||||||
|
levels: tuple[str, ...],
|
||||||
|
ancestor_depth: int | None = 0,
|
||||||
|
) -> TraceLogView | None:
|
||||||
assert trace_id == "trace-1"
|
assert trace_id == "trace-1"
|
||||||
assert levels == ("ERROR",)
|
assert levels == ("ERROR",)
|
||||||
|
assert ancestor_depth is None
|
||||||
return expected
|
return expected
|
||||||
|
|
||||||
monkeypatch.setattr(runtime_module, "build_trace_log_reader", lambda _transport: StubReader())
|
monkeypatch.setattr(runtime_module, "build_trace_log_reader", lambda _transport: StubReader())
|
||||||
runtime = RuntimeManager()
|
runtime = RuntimeManager()
|
||||||
|
|
||||||
result = asyncio.run(runtime.trace_logs("trace-1", TraceQueryRequest(levels=("ERROR",))))
|
result = asyncio.run(runtime.trace_logs("trace-1", TraceQueryRequest(levels=("ERROR",), ancestor_depth=None)))
|
||||||
|
|
||||||
assert result == expected
|
assert result == expected
|
||||||
|
|
||||||
@@ -297,7 +503,11 @@ def test_mysql_trace_log_reader_maps_db_rows() -> None:
|
|||||||
self._current_query = query
|
self._current_query = query
|
||||||
|
|
||||||
def fetchone(self) -> dict[str, object] | None:
|
def fetchone(self) -> dict[str, object] | None:
|
||||||
|
if self.executed[-1][1] == ("trace-1",):
|
||||||
return {"parent_id": "root-77"}
|
return {"parent_id": "root-77"}
|
||||||
|
if self.executed[-1][1] == ("root-77",):
|
||||||
|
return {"parent_id": None}
|
||||||
|
return None
|
||||||
|
|
||||||
def fetchall(self) -> list[dict[str, object]]:
|
def fetchall(self) -> list[dict[str, object]]:
|
||||||
if "WHERE parent_id = %s" in self._current_query:
|
if "WHERE parent_id = %s" in self._current_query:
|
||||||
@@ -362,7 +572,167 @@ def test_mysql_trace_log_reader_maps_db_rows() -> None:
|
|||||||
attrs_json={"attempt": 1},
|
attrs_json={"attempt": 1},
|
||||||
),
|
),
|
||||||
),
|
),
|
||||||
|
ancestors=(),
|
||||||
)
|
)
|
||||||
assert len(factory.cursor.executed) == 3
|
assert len(factory.cursor.executed) == 3
|
||||||
assert factory.cursor.executed[1][1] == ("trace-1",)
|
assert factory.cursor.executed[1][1] == ("trace-1",)
|
||||||
assert factory.cursor.executed[2][1] == ("trace-1", "ERROR", "WARNING")
|
assert factory.cursor.executed[2][1] == ("trace-1", "ERROR", "WARNING")
|
||||||
|
|
||||||
|
|
||||||
|
def test_mysql_trace_log_reader_loads_requested_ancestors() -> None:
|
||||||
|
class FakeCursor:
|
||||||
|
def __init__(self) -> None:
|
||||||
|
self.executed: list[tuple[str, tuple[object, ...]]] = []
|
||||||
|
self._current_query = ""
|
||||||
|
|
||||||
|
def execute(self, query: str, params: tuple[object, ...]) -> None:
|
||||||
|
self.executed.append((query, params))
|
||||||
|
self._current_query = query
|
||||||
|
|
||||||
|
def fetchone(self) -> dict[str, object] | None:
|
||||||
|
if self.executed[-1][1] == ("trace-1",):
|
||||||
|
return {"parent_id": "parent-1"}
|
||||||
|
if self.executed[-1][1] == ("parent-1",):
|
||||||
|
return {"parent_id": "root-1"}
|
||||||
|
if self.executed[-1][1] == ("root-1",):
|
||||||
|
return {"parent_id": None}
|
||||||
|
return None
|
||||||
|
|
||||||
|
def fetchall(self) -> list[dict[str, object]]:
|
||||||
|
if "WHERE parent_id = %s" in self._current_query:
|
||||||
|
parent_id = self.executed[-1][1][0]
|
||||||
|
if parent_id == "trace-1":
|
||||||
|
return []
|
||||||
|
if parent_id == "parent-1":
|
||||||
|
return [{"trace_id": "trace-1"}]
|
||||||
|
if parent_id == "root-1":
|
||||||
|
return [{"trace_id": "parent-1"}]
|
||||||
|
return []
|
||||||
|
trace_id = self.executed[-1][1][0]
|
||||||
|
return [
|
||||||
|
{
|
||||||
|
"id": 8 if trace_id == "trace-1" else 9,
|
||||||
|
"trace_id": trace_id,
|
||||||
|
"event_time": datetime(2026, 4, 28, 10, 11, 12, tzinfo=timezone.utc),
|
||||||
|
"step": "parse",
|
||||||
|
"status": "failed",
|
||||||
|
"level": "ERROR",
|
||||||
|
"message": f"broken:{trace_id}",
|
||||||
|
"attrs_json": '{"attempt":1}',
|
||||||
|
}
|
||||||
|
]
|
||||||
|
|
||||||
|
def __enter__(self) -> FakeCursor:
|
||||||
|
return self
|
||||||
|
|
||||||
|
def __exit__(self, exc_type, exc, tb) -> None:
|
||||||
|
return None
|
||||||
|
|
||||||
|
class FakeConnection:
|
||||||
|
def __init__(self, cursor: FakeCursor) -> None:
|
||||||
|
self._cursor = cursor
|
||||||
|
|
||||||
|
def cursor(self) -> FakeCursor:
|
||||||
|
return self._cursor
|
||||||
|
|
||||||
|
def __enter__(self) -> FakeConnection:
|
||||||
|
return self
|
||||||
|
|
||||||
|
def __exit__(self, exc_type, exc, tb) -> None:
|
||||||
|
return None
|
||||||
|
|
||||||
|
class FakeConnectionFactory:
|
||||||
|
def __init__(self) -> None:
|
||||||
|
self.cursor = FakeCursor()
|
||||||
|
|
||||||
|
def connect(self) -> FakeConnection:
|
||||||
|
return FakeConnection(self.cursor)
|
||||||
|
|
||||||
|
factory = FakeConnectionFactory()
|
||||||
|
reader = MySqlTraceLogReader(factory) # type: ignore[arg-type]
|
||||||
|
|
||||||
|
view = reader.read_trace("trace-1", ("ERROR",), 1)
|
||||||
|
|
||||||
|
assert view is not None
|
||||||
|
assert view.trace_id == "trace-1"
|
||||||
|
assert view.parent_id == "parent-1"
|
||||||
|
assert len(view.ancestors) == 1
|
||||||
|
assert view.ancestors[0].trace_id == "parent-1"
|
||||||
|
assert view.ancestors[0].parent_id == "root-1"
|
||||||
|
assert view.ancestors[0].child_ids == ("trace-1",)
|
||||||
|
|
||||||
|
|
||||||
|
def test_mysql_trace_log_reader_orders_ancestors_root_first() -> None:
|
||||||
|
class FakeCursor:
|
||||||
|
def __init__(self) -> None:
|
||||||
|
self.executed: list[tuple[str, tuple[object, ...]]] = []
|
||||||
|
self._current_query = ""
|
||||||
|
|
||||||
|
def execute(self, query: str, params: tuple[object, ...]) -> None:
|
||||||
|
self.executed.append((query, params))
|
||||||
|
self._current_query = query
|
||||||
|
|
||||||
|
def fetchone(self) -> dict[str, object] | None:
|
||||||
|
if self.executed[-1][1] == ("trace-1",):
|
||||||
|
return {"parent_id": "parent-1"}
|
||||||
|
if self.executed[-1][1] == ("parent-1",):
|
||||||
|
return {"parent_id": "root-1"}
|
||||||
|
if self.executed[-1][1] == ("root-1",):
|
||||||
|
return {"parent_id": None}
|
||||||
|
return None
|
||||||
|
|
||||||
|
def fetchall(self) -> list[dict[str, object]]:
|
||||||
|
if "WHERE parent_id = %s" in self._current_query:
|
||||||
|
parent_id = self.executed[-1][1][0]
|
||||||
|
if parent_id == "root-1":
|
||||||
|
return [{"trace_id": "parent-1"}]
|
||||||
|
if parent_id == "parent-1":
|
||||||
|
return [{"trace_id": "trace-1"}]
|
||||||
|
return []
|
||||||
|
trace_id = self.executed[-1][1][0]
|
||||||
|
return [
|
||||||
|
{
|
||||||
|
"id": 8,
|
||||||
|
"trace_id": trace_id,
|
||||||
|
"event_time": datetime(2026, 4, 28, 10, 11, 12, tzinfo=timezone.utc),
|
||||||
|
"step": "parse",
|
||||||
|
"status": "failed",
|
||||||
|
"level": "ERROR",
|
||||||
|
"message": f"broken:{trace_id}",
|
||||||
|
"attrs_json": '{"attempt":1}',
|
||||||
|
}
|
||||||
|
]
|
||||||
|
|
||||||
|
def __enter__(self) -> FakeCursor:
|
||||||
|
return self
|
||||||
|
|
||||||
|
def __exit__(self, exc_type, exc, tb) -> None:
|
||||||
|
return None
|
||||||
|
|
||||||
|
class FakeConnection:
|
||||||
|
def __init__(self, cursor: FakeCursor) -> None:
|
||||||
|
self._cursor = cursor
|
||||||
|
|
||||||
|
def cursor(self) -> FakeCursor:
|
||||||
|
return self._cursor
|
||||||
|
|
||||||
|
def __enter__(self) -> FakeConnection:
|
||||||
|
return self
|
||||||
|
|
||||||
|
def __exit__(self, exc_type, exc, tb) -> None:
|
||||||
|
return None
|
||||||
|
|
||||||
|
class FakeConnectionFactory:
|
||||||
|
def __init__(self) -> None:
|
||||||
|
self.cursor = FakeCursor()
|
||||||
|
|
||||||
|
def connect(self) -> FakeConnection:
|
||||||
|
return FakeConnection(self.cursor)
|
||||||
|
|
||||||
|
factory = FakeConnectionFactory()
|
||||||
|
reader = MySqlTraceLogReader(factory) # type: ignore[arg-type]
|
||||||
|
|
||||||
|
view = reader.read_trace("trace-1", ("ERROR",), 2)
|
||||||
|
|
||||||
|
assert view is not None
|
||||||
|
assert tuple(ancestor.trace_id for ancestor in view.ancestors) == ("root-1", "parent-1")
|
||||||
|
|||||||
Reference in New Issue
Block a user