Compare commits

..

8 Commits

15 changed files with 690 additions and 71 deletions
+17 -2
View File
@@ -319,7 +319,7 @@ with traces.open_context(alias="email:123", kind="email") as message_trace_id:
- Consumer-воркер извлекает через `get(timeout)` и обрабатывает. - Consumer-воркер извлекает через `get(timeout)` и обрабатывает.
## 5. Application HTTP ## 5. Application HTTP
`PLBA` поддерживает отдельный прикладной HTTP-слой для пользовательских страниц и API бизнес-приложения. Он не смешивается с `control plane` и поднимается отдельным сервисом внутри того же runtime. `PLBA` поддерживает прикладной HTTP-слой для пользовательских страниц и API бизнес-приложения. По умолчанию он отделён от `control plane`, но при необходимости можно опубликовать control routes и business routes через один HTTP channel.
`Control Plane` и `Application HTTP` обслуживают разные контуры: `Control Plane` и `Application HTTP` обслуживают разные контуры:
- `Control Plane` используется для `/health`, `/actions/*`, `/traces/*`. - `Control Plane` используется для `/health`, `/actions/*`, `/traces/*`.
@@ -330,6 +330,7 @@ with traces.open_context(alias="email:123", kind="email") as message_trace_id:
- `HttpApplicationChannel` поднимает отдельный `FastAPI` app через `uvicorn`. - `HttpApplicationChannel` поднимает отдельный `FastAPI` app через `uvicorn`.
- `HttpRouteRegistrar` регистрирует пользовательские routes и получает `ServiceContainer`. - `HttpRouteRegistrar` регистрирует пользовательские routes и получает `ServiceContainer`.
- `HttpApplicationAppFactory` собирает `FastAPI(title="PLBA Application API")`, middleware и routes. - `HttpApplicationAppFactory` собирает `FastAPI(title="PLBA Application API")`, middleware и routes.
- `UnifiedHttpService` собирает один `FastAPI` app с control routes (`/health`, `/actions/*`, `/traces/*`) и application routes.
### Минимальный пример ### Минимальный пример
```python ```python
@@ -370,6 +371,20 @@ runtime.start()
- `GET /demo` - `GET /demo`
- `POST /demo/api/tasks` - `POST /demo/api/tasks`
### Единый HTTP API
Если бизнес-приложению нужен один опубликованный порт для control и application routes, передайте `UnifiedHttpService` как `application_http`:
```python
from plba import HttpApplicationChannel, RuntimeManager, UnifiedHttpService
runtime = RuntimeManager(application_http=UnifiedHttpService())
runtime.application_http.register_channel(
HttpApplicationChannel(host="0.0.0.0", port=15000, timeout=5)
)
```
Старый режим с отдельными `ControlPlaneService` и `ApplicationHttpService` остаётся доступен и работает как раньше.
### Типовые сценарии ### Типовые сценарии
- `Web UI для фоновых задач`: список задач, запуск, статус, cancel, download result. - `Web UI для фоновых задач`: список задач, запуск, статус, cancel, download result.
- `Внутренний REST API`: например `POST /jobs`, `GET /jobs/{id}`, `POST /jobs/{id}/cancel`. - `Внутренний REST API`: например `POST /jobs`, `GET /jobs/{id}`, `POST /jobs/{id}/cancel`.
@@ -377,7 +392,7 @@ runtime.start()
- `Webhook endpoint`: прием callback от внешней системы и передача события в worker pipeline. - `Webhook endpoint`: прием callback от внешней системы и передача события в worker pipeline.
### Ограничения и рекомендации ### Ограничения и рекомендации
- Не смешивайте business routes с `control plane`. - По умолчанию держите business routes и `control plane` раздельно; используйте `UnifiedHttpService`, когда один опубликованный порт является явным эксплуатационным требованием.
- Держите бизнес-логику в сервисах, а handlers используйте как тонкий HTTP-адаптер. - Держите бизнес-логику в сервисах, а handlers используйте как тонкий HTTP-адаптер.
- Runtime предоставляет доступ к зависимостям через `services`, поэтому handlers не должны зависеть от `RuntimeManager`. - Runtime предоставляет доступ к зависимостям через `services`, поэтому handlers не должны зависеть от `RuntimeManager`.
- В первой версии `Application HTTP` не решает auth, static files, шаблонизаторы, websocket и OpenAPI customization. - В первой версии `Application HTTP` не решает auth, static files, шаблонизаторы, websocket и OpenAPI customization.
+2 -2
View File
@@ -4,7 +4,7 @@ build-backend = "setuptools.build_meta"
[project] [project]
name = "plba" name = "plba"
version = "0.3.14" version = "0.4.0"
description = "Platform runtime for business applications" description = "Platform runtime for business applications"
readme = "README.md" readme = "README.md"
requires-python = ">=3.11" requires-python = ">=3.11"
@@ -15,7 +15,7 @@ dependencies = [
"python-multipart>=0.0.9", "python-multipart>=0.0.9",
"uvicorn>=0.41.0", "uvicorn>=0.41.0",
] ]
[tool.setuptools.packages.find] [tool.setuptools.packages.find]
where = ["src"] where = ["src"]
+7 -1
View File
@@ -92,8 +92,14 @@ class TraceLogView:
parent_id: str | None parent_id: str | None
child_ids: tuple[str, ...] = () child_ids: tuple[str, ...] = ()
records: tuple[TraceLogRecord, ...] = () records: tuple[TraceLogRecord, ...] = ()
ancestors: tuple[TraceLogView, ...] = ()
class TraceLogReader(Protocol): class TraceLogReader(Protocol):
def read_trace(self, trace_id: str, levels: tuple[TraceLevel, ...]) -> TraceLogView | None: def read_trace(
self,
trace_id: str,
levels: tuple[TraceLevel, ...],
ancestor_depth: int | None = 0,
) -> TraceLogView | None:
"""Load trace context and filtered log records.""" """Load trace context and filtered log records."""
@@ -0,0 +1,46 @@
from __future__ import annotations
import asyncio
from fastapi.responses import JSONResponse
from app_runtime.control.base import ControlActionRequest, ControlActionSet
class ControlActionResponder:
def __init__(self, actions: ControlActionSet, timeout: int) -> None:
self._actions = actions
self._timeout = timeout
async def respond(
self,
action: str,
_client_source: str = "unknown",
request: ControlActionRequest | None = None,
) -> JSONResponse:
callbacks = {
"start": self._actions.start,
"stop": self._actions.stop,
"status": self._actions.status,
}
callback = callbacks.get(action)
if callback is None:
return JSONResponse(content={"status": "error", "detail": f"unsupported action: {action}"}, status_code=404)
action_request = request or ControlActionRequest()
action_timeout = self._action_timeout(action, action_request)
try:
detail = await asyncio.wait_for(callback(action_request), timeout=action_timeout)
except asyncio.TimeoutError:
return JSONResponse(
content={"status": "accepted", "detail": f"{action} operation is still in progress"},
status_code=202,
)
except Exception as exc:
return JSONResponse(content={"status": "error", "detail": str(exc)}, status_code=500)
return JSONResponse(content={"status": "ok", "detail": detail or f"{action} action accepted"}, status_code=200)
def _action_timeout(self, action: str, request: ControlActionRequest) -> float:
base_timeout = max(float(self._timeout), 10.0) if action in {"start", "stop"} else float(self._timeout)
if action != "stop" or request.wait is False or request.timeout is None:
return base_timeout
return max(base_timeout, float(request.timeout) + 1.0)
+1
View File
@@ -27,6 +27,7 @@ class TraceQueryRequest:
levels: tuple[TraceLevel, ...] = ("ERROR", "WARNING", "INFO") levels: tuple[TraceLevel, ...] = ("ERROR", "WARNING", "INFO")
include_attrs_json: bool = False include_attrs_json: bool = False
response_format: TraceResponseFormat = "html" response_format: TraceResponseFormat = "html"
ancestor_depth: int | None = 0
TraceLookupHandler = Callable[[str, TraceQueryRequest], Awaitable[TraceLogView]] TraceLookupHandler = Callable[[str, TraceQueryRequest], Awaitable[TraceLogView]]
+11 -28
View File
@@ -4,6 +4,7 @@ import asyncio
from fastapi.responses import JSONResponse from fastapi.responses import JSONResponse
from app_runtime.control.action_responder import ControlActionResponder
from app_runtime.control.base import ControlActionRequest, ControlActionSet, ControlChannel, TraceQueryRequest from app_runtime.control.base import ControlActionRequest, ControlActionSet, ControlChannel, TraceQueryRequest
from app_runtime.contracts.trace import TraceLogView from app_runtime.contracts.trace import TraceLogView
from app_runtime.control.http_app import HttpControlAppFactory from app_runtime.control.http_app import HttpControlAppFactory
@@ -16,9 +17,11 @@ class HttpControlChannel(ControlChannel):
self._runner = UvicornThreadRunner(host, port, timeout) self._runner = UvicornThreadRunner(host, port, timeout)
self._factory = HttpControlAppFactory() self._factory = HttpControlAppFactory()
self._actions: ControlActionSet | None = None self._actions: ControlActionSet | None = None
self._action_responder: ControlActionResponder | None = None
async def start(self, actions: ControlActionSet) -> None: async def start(self, actions: ControlActionSet) -> None:
self._actions = actions self._actions = actions
self._action_responder = ControlActionResponder(actions, self._timeout)
app = self._factory.create(self._health_response, self._action_response, self._trace_response) app = self._factory.create(self._health_response, self._action_response, self._trace_response)
await self._runner.start(app) await self._runner.start(app)
@@ -40,34 +43,14 @@ class HttpControlChannel(ControlChannel):
_client_source: str = "unknown", _client_source: str = "unknown",
request: ControlActionRequest | None = None, request: ControlActionRequest | None = None,
) -> JSONResponse: ) -> JSONResponse:
if self._actions is None: if self._action_responder is None:
return JSONResponse(content={"status": "error", "detail": f"{action} handler is not configured"}, status_code=404) if self._actions is None:
callbacks = { return JSONResponse(
"start": self._actions.start, content={"status": "error", "detail": f"{action} handler is not configured"},
"stop": self._actions.stop, status_code=404,
"status": self._actions.status, )
} self._action_responder = ControlActionResponder(self._actions, self._timeout)
callback = callbacks.get(action) return await self._action_responder.respond(action, _client_source, request)
if callback is None:
return JSONResponse(content={"status": "error", "detail": f"unsupported action: {action}"}, status_code=404)
action_request = request or ControlActionRequest()
action_timeout = self._action_timeout(action, action_request)
try:
detail = await asyncio.wait_for(callback(action_request), timeout=action_timeout)
except asyncio.TimeoutError:
return JSONResponse(
content={"status": "accepted", "detail": f"{action} operation is still in progress"},
status_code=202,
)
except Exception as exc:
return JSONResponse(content={"status": "error", "detail": str(exc)}, status_code=500)
return JSONResponse(content={"status": "ok", "detail": detail or f"{action} action accepted"}, status_code=200)
def _action_timeout(self, action: str, request: ControlActionRequest) -> float:
base_timeout = max(float(self._timeout), 10.0) if action in {"start", "stop"} else float(self._timeout)
if action != "stop" or request.wait is False or request.timeout is None:
return base_timeout
return max(base_timeout, float(request.timeout) + 1.0)
async def _trace_response(self, trace_id: str, request: TraceQueryRequest) -> TraceLogView: async def _trace_response(self, trace_id: str, request: TraceQueryRequest) -> TraceLogView:
if self._actions is None or self._actions.trace_lookup is None: if self._actions is None or self._actions.trace_lookup is None:
+92 -27
View File
@@ -11,6 +11,9 @@ from app_runtime.control.base import TraceQueryRequest
from app_runtime.contracts.trace import TraceLevel, TraceLogRecord, TraceLogView from app_runtime.contracts.trace import TraceLevel, TraceLogRecord, TraceLogView
TRACE_SECTION_SEPARATOR = "=" * 30
class TraceRequestParser: class TraceRequestParser:
def parse(self, request: Request) -> TraceQueryRequest: def parse(self, request: Request) -> TraceQueryRequest:
raw_levels = request.query_params.get("levels") raw_levels = request.query_params.get("levels")
@@ -22,6 +25,7 @@ class TraceRequestParser:
levels=self._trace_levels(raw_levels), levels=self._trace_levels(raw_levels),
include_attrs_json=self._bool_param(request, "attrs_json") or False, include_attrs_json=self._bool_param(request, "attrs_json") or False,
response_format=response_format, response_format=response_format,
ancestor_depth=self._ancestor_depth(request),
) )
def _trace_levels(self, raw_levels: str | None) -> tuple[TraceLevel, ...]: def _trace_levels(self, raw_levels: str | None) -> tuple[TraceLevel, ...]:
@@ -47,6 +51,21 @@ class TraceRequestParser:
return False return False
raise ValueError(f"invalid boolean query parameter: {name}={raw_value}") raise ValueError(f"invalid boolean query parameter: {name}={raw_value}")
def _ancestor_depth(self, request: Request) -> int | None:
raw_value = request.query_params.get("ancestor_depth")
if raw_value is None:
return 0
normalized = raw_value.strip().lower()
if normalized == "all":
return None
try:
value = int(normalized)
except ValueError as exc:
raise ValueError(f"invalid ancestor depth query parameter: ancestor_depth={raw_value}") from exc
if value < 0:
raise ValueError(f"query parameter must be >= 0: ancestor_depth={raw_value}")
return value
class TraceResponseRenderer: class TraceResponseRenderer:
def render(self, trace_view: TraceLogView, request: TraceQueryRequest) -> Response: def render(self, trace_view: TraceLogView, request: TraceQueryRequest) -> Response:
@@ -63,26 +82,19 @@ class TraceResponseRenderer:
"parent_id": trace_view.parent_id or "", "parent_id": trace_view.parent_id or "",
"child_ids": list(trace_view.child_ids), "child_ids": list(trace_view.child_ids),
"messages": [record.as_dict(include_attrs_json=request.include_attrs_json) for record in trace_view.records], "messages": [record.as_dict(include_attrs_json=request.include_attrs_json) for record in trace_view.records],
"ancestors": [self._trace_payload(view, request) for view in trace_view.ancestors],
} }
) )
def _render_text(self, trace_view: TraceLogView, request: TraceQueryRequest) -> PlainTextResponse: def _render_text(self, trace_view: TraceLogView, request: TraceQueryRequest) -> PlainTextResponse:
lines = [ lineage = [*trace_view.ancestors, trace_view]
f"trace_id: {trace_view.trace_id}", lines = self._text_trace_summary_lines(trace_view)
f"parent_id: {trace_view.parent_id or ''}", for index, entry in enumerate(lineage):
*self._child_id_lines(trace_view.child_ids), if index == 0:
"--------------------------------------------------", lines.append("")
] else:
previous_step: str | None = None lines.extend(["", ""])
for record in trace_view.records: lines.extend(self._text_trace_log_lines(entry, request))
current_step = str(record.step or "")
if previous_step is None:
lines.append(f"step: {current_step}")
elif current_step != previous_step:
lines.append("--------------------------------------------------")
lines.append(f"step: {current_step}")
previous_step = current_step
lines.append(self._text_message(record, request.include_attrs_json))
return PlainTextResponse(content="\n".join(lines)) return PlainTextResponse(content="\n".join(lines))
def _render_html(self, trace_view: TraceLogView, request: TraceQueryRequest) -> HTMLResponse: def _render_html(self, trace_view: TraceLogView, request: TraceQueryRequest) -> HTMLResponse:
@@ -162,22 +174,75 @@ class TraceResponseRenderer:
return f"{record.message}, {json.dumps(record.attrs_json, ensure_ascii=False, separators=(',', ':'))}" return f"{record.message}, {json.dumps(record.attrs_json, ensure_ascii=False, separators=(',', ':'))}"
def _trace_href(self, trace_id: str, request: TraceQueryRequest) -> str: def _trace_href(self, trace_id: str, request: TraceQueryRequest) -> str:
params = urlencode( params = {
{ "format": "html",
"format": "html", "levels": ",".join(request.levels),
"levels": ",".join(request.levels), "attrs_json": "true" if request.include_attrs_json else "false",
"attrs_json": "true" if request.include_attrs_json else "false", }
} if request.ancestor_depth is None:
) params["ancestor_depth"] = "all"
return f"/traces/{trace_id}?{params}" elif request.ancestor_depth > 0:
params["ancestor_depth"] = str(request.ancestor_depth)
query = urlencode(params)
return f"/traces/{trace_id}?{query}"
def _html_lines(self, trace_view: TraceLogView, request: TraceQueryRequest) -> str: def _html_lines(self, trace_view: TraceLogView, request: TraceQueryRequest) -> str:
lineage = [*trace_view.ancestors, trace_view]
lines = self._html_trace_summary_lines(trace_view, request)
for index, entry in enumerate(lineage):
if index == 0:
lines.append(self._html_plain_line(""))
else:
lines.extend([self._html_plain_line(""), self._html_plain_line("")])
lines.extend(self._html_trace_log_lines(entry, request))
return "".join(lines)
def _trace_payload(self, trace_view: TraceLogView, request: TraceQueryRequest) -> dict[str, object]:
return {
"trace_id": trace_view.trace_id,
"parent_id": trace_view.parent_id or "",
"child_ids": list(trace_view.child_ids),
"messages": [record.as_dict(include_attrs_json=request.include_attrs_json) for record in trace_view.records],
}
def _text_trace_summary_lines(self, trace_view: TraceLogView) -> list[str]:
return [
f"trace_id: {trace_view.trace_id}",
f"parent_id: {trace_view.parent_id or ''}",
*self._child_id_lines(trace_view.child_ids),
]
def _text_trace_log_lines(self, trace_view: TraceLogView, request: TraceQueryRequest) -> list[str]:
lines = [ lines = [
TRACE_SECTION_SEPARATOR,
f"trace_id: {trace_view.trace_id}",
"",
]
previous_step: str | None = None
for record in trace_view.records:
current_step = str(record.step or "")
if previous_step is None:
lines.append(f"step: {current_step}")
elif current_step != previous_step:
lines.append("------------------------------")
lines.append(f"step: {current_step}")
previous_step = current_step
lines.append(self._text_message(record, request.include_attrs_json))
return lines
def _html_trace_summary_lines(self, trace_view: TraceLogView, request: TraceQueryRequest) -> list[str]:
return [
self._html_plain_line(f"trace_id: {self._trace_link(trace_view.trace_id, request)}"), self._html_plain_line(f"trace_id: {self._trace_link(trace_view.trace_id, request)}"),
self._html_plain_line(f"parent_id: {self._optional_trace_link(trace_view.parent_id, request)}"), self._html_plain_line(f"parent_id: {self._optional_trace_link(trace_view.parent_id, request)}"),
self._html_plain_line("child_ids:"), self._html_plain_line("child_ids:"),
*(self._html_plain_line(f" - {self._trace_link(child_id, request)}") for child_id in trace_view.child_ids), *(self._html_plain_line(f" - {self._trace_link(child_id, request)}") for child_id in trace_view.child_ids),
self._html_plain_line("--------------------------------------------------"), ]
def _html_trace_log_lines(self, trace_view: TraceLogView, request: TraceQueryRequest) -> list[str]:
lines = [
self._html_plain_line(TRACE_SECTION_SEPARATOR),
self._html_plain_line(f"trace_id: {self._trace_link(trace_view.trace_id, request)}"),
self._html_plain_line(""),
] ]
previous_step: str | None = None previous_step: str | None = None
for record in trace_view.records: for record in trace_view.records:
@@ -187,12 +252,12 @@ class TraceResponseRenderer:
lines.append(self._html_plain_line("")) lines.append(self._html_plain_line(""))
elif current_step != previous_step: elif current_step != previous_step:
lines.append(self._html_plain_line("")) lines.append(self._html_plain_line(""))
lines.append(self._html_plain_line("--------------------------------------------------")) lines.append(self._html_plain_line("------------------------------"))
lines.append(self._html_step_line(current_step)) lines.append(self._html_step_line(current_step))
lines.append(self._html_plain_line("")) lines.append(self._html_plain_line(""))
previous_step = current_step previous_step = current_step
lines.extend(self._html_message_lines(record, request.include_attrs_json)) lines.extend(self._html_message_lines(record, request.include_attrs_json))
return "".join(lines) return lines
def _html_message_lines(self, record: TraceLogRecord, include_attrs_json: bool) -> list[str]: def _html_message_lines(self, record: TraceLogRecord, include_attrs_json: bool) -> list[str]:
lines = [self._html_colored_line(self._text_message(record, include_attrs_json), record.level)] lines = [self._html_colored_line(self._text_message(record, include_attrs_json), record.level)]
+1 -1
View File
@@ -141,7 +141,7 @@ class RuntimeManager:
reader = build_trace_log_reader(self.traces.transport) reader = build_trace_log_reader(self.traces.transport)
if reader is None: if reader is None:
raise RuntimeError("trace log reader is not configured") raise RuntimeError("trace log reader is not configured")
trace_view = reader.read_trace(trace_id, request.levels) trace_view = reader.read_trace(trace_id, request.levels, request.ancestor_depth)
if trace_view is None: if trace_view is None:
raise KeyError(trace_id) raise KeyError(trace_id)
return trace_view return trace_view
+2
View File
@@ -2,6 +2,7 @@ from app_runtime.http.base import ApplicationHttpChannel, HttpRouteRegistrar
from app_runtime.http.http_app import HttpApplicationAppFactory from app_runtime.http.http_app import HttpApplicationAppFactory
from app_runtime.http.http_channel import HttpApplicationChannel from app_runtime.http.http_channel import HttpApplicationChannel
from app_runtime.http.service import ApplicationHttpService from app_runtime.http.service import ApplicationHttpService
from app_runtime.http.unified_service import UnifiedHttpService
__all__ = [ __all__ = [
"ApplicationHttpChannel", "ApplicationHttpChannel",
@@ -9,4 +10,5 @@ __all__ = [
"HttpApplicationAppFactory", "HttpApplicationAppFactory",
"HttpApplicationChannel", "HttpApplicationChannel",
"HttpRouteRegistrar", "HttpRouteRegistrar",
"UnifiedHttpService",
] ]
+68
View File
@@ -0,0 +1,68 @@
from __future__ import annotations
import asyncio
from typing import TYPE_CHECKING
from app_runtime.control.action_responder import ControlActionResponder
from app_runtime.control.base import ControlActionSet
from app_runtime.control.http_app import HttpControlAppFactory
from app_runtime.http.base import ApplicationHttpChannel, HttpRouteRegistrar
from app_runtime.http.http_app import HttpApplicationAppFactory
if TYPE_CHECKING:
from app_runtime.core.runtime import RuntimeManager
class UnifiedHttpService:
"""Publish control and application routes through the same HTTP channel."""
def __init__(
self,
app_factory: HttpApplicationAppFactory | None = None,
control_app_factory: HttpControlAppFactory | None = None,
control_timeout: int = 5,
) -> None:
self._channels: list[ApplicationHttpChannel] = []
self._registrars: list[HttpRouteRegistrar] = []
self._app_factory = app_factory or HttpApplicationAppFactory()
self._control_app_factory = control_app_factory or HttpControlAppFactory()
self._control_timeout = control_timeout
def register_channel(self, channel: ApplicationHttpChannel) -> None:
self._channels.append(channel)
def register_routes(self, registrar: HttpRouteRegistrar) -> None:
self._registrars.append(registrar)
def start(self, runtime: RuntimeManager) -> None:
if not self._channels:
return
asyncio.run(self._start_async(runtime))
def stop(self) -> None:
if not self._channels:
return
asyncio.run(self._stop_async())
async def _start_async(self, runtime: RuntimeManager) -> None:
app = self._app_factory.create(self._registrars, runtime.services)
actions = ControlActionSet(
health=runtime.health_status,
start=runtime.start_runtime,
stop=runtime.stop_runtime,
status=runtime.runtime_status,
trace_lookup=runtime.trace_logs,
)
action_responder = ControlActionResponder(actions, self._control_timeout)
control_app = self._control_app_factory.create(
actions.health,
action_responder.respond,
actions.trace_lookup,
)
app.router.routes.extend(control_app.router.routes)
for channel in self._channels:
await channel.start(app)
async def _stop_async(self) -> None:
for channel in reversed(self._channels):
await channel.stop()
+35 -1
View File
@@ -11,10 +11,16 @@ class MySqlTraceLogReader(TraceLogReader):
def __init__(self, connection_factory: MySqlTraceConnectionFactory) -> None: def __init__(self, connection_factory: MySqlTraceConnectionFactory) -> None:
self._connection_factory = connection_factory self._connection_factory = connection_factory
def read_trace(self, trace_id: str, levels: tuple[TraceLevel, ...]) -> TraceLogView | None: def read_trace(
self,
trace_id: str,
levels: tuple[TraceLevel, ...],
ancestor_depth: int | None = 0,
) -> TraceLogView | None:
parent_id = self._read_parent_id(trace_id) parent_id = self._read_parent_id(trace_id)
if parent_id is None and not self._trace_exists(trace_id): if parent_id is None and not self._trace_exists(trace_id):
return None return None
ancestors = self._read_ancestors(parent_id, levels, ancestor_depth)
child_ids = self._read_child_ids(trace_id) child_ids = self._read_child_ids(trace_id)
records = self._read_records(trace_id, levels) records = self._read_records(trace_id, levels)
return TraceLogView( return TraceLogView(
@@ -22,8 +28,36 @@ class MySqlTraceLogReader(TraceLogReader):
parent_id=parent_id, parent_id=parent_id,
child_ids=tuple(child_ids), child_ids=tuple(child_ids),
records=tuple(records), records=tuple(records),
ancestors=tuple(ancestors),
) )
def _read_ancestors(
self,
parent_id: str | None,
levels: tuple[TraceLevel, ...],
ancestor_depth: int | None,
) -> list[TraceLogView]:
if parent_id is None or ancestor_depth == 0:
return []
remaining_depth = ancestor_depth
ancestors: list[TraceLogView] = []
current_trace_id = parent_id
while current_trace_id is not None and (remaining_depth is None or remaining_depth > 0):
current_parent_id = self._read_parent_id(current_trace_id)
ancestors.append(
TraceLogView(
trace_id=current_trace_id,
parent_id=current_parent_id,
child_ids=tuple(self._read_child_ids(current_trace_id)),
records=tuple(self._read_records(current_trace_id, levels)),
)
)
current_trace_id = current_parent_id
if remaining_depth is not None:
remaining_depth -= 1
ancestors.reverse()
return ancestors
def _trace_exists(self, trace_id: str) -> bool: def _trace_exists(self, trace_id: str) -> bool:
query = "SELECT 1 FROM trace_contexts WHERE trace_id = %s" query = "SELECT 1 FROM trace_contexts WHERE trace_id = %s"
with self._connection_factory.connect() as connection: with self._connection_factory.connect() as connection:
+2
View File
@@ -21,6 +21,7 @@ from plba.http import (
HttpApplicationAppFactory, HttpApplicationAppFactory,
HttpApplicationChannel, HttpApplicationChannel,
HttpRouteRegistrar, HttpRouteRegistrar,
UnifiedHttpService,
) )
from plba.logging import LogManager from plba.logging import LogManager
from plba.queue import InMemoryTaskQueue from plba.queue import InMemoryTaskQueue
@@ -56,6 +57,7 @@ __all__ = [
"HttpApplicationChannel", "HttpApplicationChannel",
"HttpRouteRegistrar", "HttpRouteRegistrar",
"HttpControlChannel", "HttpControlChannel",
"UnifiedHttpService",
"InMemoryTaskQueue", "InMemoryTaskQueue",
"LogManager", "LogManager",
"MySqlTraceTransport", "MySqlTraceTransport",
+2
View File
@@ -2,6 +2,7 @@ from app_runtime.http.base import ApplicationHttpChannel, HttpRouteRegistrar
from app_runtime.http.http_app import HttpApplicationAppFactory from app_runtime.http.http_app import HttpApplicationAppFactory
from app_runtime.http.http_channel import HttpApplicationChannel from app_runtime.http.http_channel import HttpApplicationChannel
from app_runtime.http.service import ApplicationHttpService from app_runtime.http.service import ApplicationHttpService
from app_runtime.http.unified_service import UnifiedHttpService
__all__ = [ __all__ = [
"ApplicationHttpChannel", "ApplicationHttpChannel",
@@ -9,4 +10,5 @@ __all__ = [
"HttpApplicationAppFactory", "HttpApplicationAppFactory",
"HttpApplicationChannel", "HttpApplicationChannel",
"HttpRouteRegistrar", "HttpRouteRegistrar",
"UnifiedHttpService",
] ]
+25
View File
@@ -15,6 +15,7 @@ from app_runtime.core.registration import ModuleRegistry
from app_runtime.core.runtime import RuntimeManager from app_runtime.core.runtime import RuntimeManager
from app_runtime.http.base import ApplicationHttpChannel from app_runtime.http.base import ApplicationHttpChannel
from app_runtime.http.http_channel import HttpApplicationChannel from app_runtime.http.http_channel import HttpApplicationChannel
from app_runtime.http.unified_service import UnifiedHttpService
try: try:
import python_multipart # noqa: F401 import python_multipart # noqa: F401
@@ -207,3 +208,27 @@ def test_control_plane_and_application_http_work_independently() -> None:
finally: finally:
runtime.control_plane.stop() runtime.control_plane.stop()
runtime.stop(stop_control_plane=False) runtime.stop(stop_control_plane=False)
def test_unified_http_serves_control_and_application_routes_on_one_app() -> None:
runtime = RuntimeManager(application_http=UnifiedHttpService())
channel = RecordingChannel()
runtime.application_http.register_channel(channel)
runtime.register_module(HttpModule(PingRoutes()))
runtime.start(start_control_plane=False)
try:
client = _application_client(channel)
with client:
health_response = client.get("/health")
action_response = client.get("/actions/status")
app_response = client.get("/estimate/ping")
assert health_response.status_code == 200
assert health_response.json()["status"] == "ok"
assert action_response.status_code == 200
assert action_response.json() == {"status": "ok", "detail": "idle"}
assert app_response.status_code == 200
assert app_response.json() == {"status": "ok"}
finally:
runtime.stop(stop_control_plane=False)
+379 -9
View File
@@ -72,7 +72,17 @@ def test_trace_endpoint_returns_html_by_default() -> None:
assert "trace_id:" in response.text assert "trace_id:" in response.text
assert "first error" in response.text assert "first error" in response.text
assert "second warning" in response.text assert "second warning" in response.text
assert captured == [("trace-1", TraceQueryRequest(levels=("ERROR", "WARNING", "INFO"), include_attrs_json=False, response_format="html"))] assert captured == [
(
"trace-1",
TraceQueryRequest(
levels=("ERROR", "WARNING", "INFO"),
include_attrs_json=False,
response_format="html",
ancestor_depth=0,
),
)
]
def test_trace_endpoint_returns_text_when_requested() -> None: def test_trace_endpoint_returns_text_when_requested() -> None:
@@ -100,7 +110,10 @@ def test_trace_endpoint_returns_text_when_requested() -> None:
"child_ids:\n" "child_ids:\n"
" - child-1\n" " - child-1\n"
" - child-2\n" " - child-2\n"
"--------------------------------------------------\n" "\n"
"==============================\n"
"trace_id: trace-1\n"
"\n"
"step: process\n" "step: process\n"
"first error\n" "first error\n"
"second warning" "second warning"
@@ -129,7 +142,10 @@ def test_trace_endpoint_appends_attrs_json_in_text_mode() -> None:
"trace_id: trace-1\n" "trace_id: trace-1\n"
"parent_id: \n" "parent_id: \n"
"child_ids:\n" "child_ids:\n"
"--------------------------------------------------\n" "\n"
"==============================\n"
"trace_id: trace-1\n"
"\n"
"step: process\n" "step: process\n"
'failure, {"attempt":2,"source":"crm"}' 'failure, {"attempt":2,"source":"crm"}'
) )
@@ -159,11 +175,14 @@ def test_trace_endpoint_separates_messages_by_step_in_text_mode() -> None:
"trace_id: trace-1\n" "trace_id: trace-1\n"
"parent_id: \n" "parent_id: \n"
"child_ids:\n" "child_ids:\n"
"--------------------------------------------------\n" "\n"
"==============================\n"
"trace_id: trace-1\n"
"\n"
"step: load_stocks\n" "step: load_stocks\n"
"load first\n" "load first\n"
"load second\n" "load second\n"
"--------------------------------------------------\n" "------------------------------\n"
"step: filter_stocks\n" "step: filter_stocks\n"
"filter first" "filter first"
) )
@@ -203,9 +222,82 @@ def test_trace_endpoint_returns_json_payload() -> None:
"attrs_json": {"batch": 7}, "attrs_json": {"batch": 7},
} }
], ],
"ancestors": [],
} }
def test_trace_endpoint_returns_json_payload_with_ancestors() -> None:
async def trace_provider(_trace_id: str, _request: TraceQueryRequest) -> TraceLogView:
return TraceLogView(
trace_id="trace-1",
parent_id="parent-1",
child_ids=("child-1",),
records=(
_trace_record(row_id=3, level="INFO", message="done", attrs_json={"batch": 7}),
),
ancestors=(
TraceLogView(
trace_id="root-1",
parent_id=None,
child_ids=("parent-1",),
records=(
_trace_record(row_id=4, level="INFO", message="root info"),
),
),
TraceLogView(
trace_id="parent-1",
parent_id="root-1",
child_ids=("trace-1", "sibling-1"),
records=(
_trace_record(row_id=5, level="WARNING", message="parent warning"),
),
),
),
)
client = _build_client(trace_provider)
try:
response = client.get("/traces/trace-1?format=json&ancestor_depth=1")
finally:
client.close()
assert response.status_code == 200
assert response.json()["ancestors"] == [
{
"trace_id": "root-1",
"parent_id": "",
"child_ids": ["parent-1"],
"messages": [
{
"id": 4,
"trace_id": "trace-1",
"event_time": "2026-04-28T10:11:12+00:00",
"step": "process",
"status": "failed",
"level": "INFO",
"message": "root info",
}
],
},
{
"trace_id": "parent-1",
"parent_id": "root-1",
"child_ids": ["trace-1", "sibling-1"],
"messages": [
{
"id": 5,
"trace_id": "trace-1",
"event_time": "2026-04-28T10:11:12+00:00",
"step": "process",
"status": "failed",
"level": "WARNING",
"message": "parent warning",
}
],
}
]
def test_trace_endpoint_returns_html_page_with_related_links() -> None: def test_trace_endpoint_returns_html_page_with_related_links() -> None:
async def trace_provider(_trace_id: str, _request: TraceQueryRequest) -> TraceLogView: async def trace_provider(_trace_id: str, _request: TraceQueryRequest) -> TraceLogView:
return TraceLogView( return TraceLogView(
@@ -240,8 +332,9 @@ def test_trace_endpoint_returns_html_page_with_related_links() -> None:
assert '<div class="line">child_ids:</div>' in response.text assert '<div class="line">child_ids:</div>' in response.text
assert '<div class="line"> - <a href="/traces/child-1?format=html&amp;levels=ERROR%2CWARNING%2CINFO&amp;attrs_json=true">child-1</a></div>' in response.text assert '<div class="line"> - <a href="/traces/child-1?format=html&amp;levels=ERROR%2CWARNING%2CINFO&amp;attrs_json=true">child-1</a></div>' in response.text
assert '<div class="line"> - <a href="/traces/child-2?format=html&amp;levels=ERROR%2CWARNING%2CINFO&amp;attrs_json=true">child-2</a></div>' in response.text assert '<div class="line"> - <a href="/traces/child-2?format=html&amp;levels=ERROR%2CWARNING%2CINFO&amp;attrs_json=true">child-2</a></div>' in response.text
assert '<div class="line">==============================</div>' in response.text
assert '<div class="line" style="color: var(--step);">load_stocks</div>' in response.text assert '<div class="line" style="color: var(--step);">load_stocks</div>' in response.text
assert '<div class="line">--------------------------------------------------</div>' in response.text assert '<div class="line">trace_id: <a href="/traces/trace-1?format=html&amp;levels=ERROR%2CWARNING%2CINFO&amp;attrs_json=true">trace-1</a></div>' in response.text
assert '<div class="line" style="color: var(--step);">filter_stocks</div>' in response.text assert '<div class="line" style="color: var(--step);">filter_stocks</div>' in response.text
assert "loaded prices" in response.text assert "loaded prices" in response.text
assert "filtered suspicious ticker" in response.text assert "filtered suspicious ticker" in response.text
@@ -250,11 +343,108 @@ def test_trace_endpoint_returns_html_page_with_related_links() -> None:
assert "Related Traces" not in response.text assert "Related Traces" not in response.text
def test_trace_endpoint_renders_ancestors_in_text_mode() -> None:
async def trace_provider(_trace_id: str, _request: TraceQueryRequest) -> TraceLogView:
return TraceLogView(
trace_id="trace-1",
parent_id="parent-1",
child_ids=(),
records=(_trace_record(row_id=1, level="INFO", message="child message"),),
ancestors=(
TraceLogView(
trace_id="root-1",
parent_id=None,
child_ids=("parent-1",),
records=(_trace_record(row_id=2, level="INFO", message="root message"),),
),
TraceLogView(
trace_id="parent-1",
parent_id="root-1",
child_ids=("trace-1",),
records=(_trace_record(row_id=3, level="WARNING", message="parent message"),),
),
),
)
client = _build_client(trace_provider)
try:
response = client.get("/traces/trace-1?format=text&ancestor_depth=1")
finally:
client.close()
assert response.status_code == 200
assert response.text == (
"trace_id: trace-1\n"
"parent_id: parent-1\n"
"child_ids:\n"
"\n"
"==============================\n"
"trace_id: root-1\n"
"\n"
"step: process\n"
"root message\n"
"\n"
"\n"
"==============================\n"
"trace_id: parent-1\n"
"\n"
"step: process\n"
"parent message\n"
"\n"
"\n"
"==============================\n"
"trace_id: trace-1\n"
"\n"
"step: process\n"
"child message"
)
def test_trace_endpoint_preserves_ancestor_depth_in_html_links() -> None:
async def trace_provider(_trace_id: str, _request: TraceQueryRequest) -> TraceLogView:
return TraceLogView(
trace_id="trace-1",
parent_id="parent-1",
child_ids=("child-1",),
records=(_trace_record(row_id=1, level="INFO", message="loaded prices"),),
ancestors=(
TraceLogView(
trace_id="root-1",
parent_id=None,
child_ids=("parent-1",),
records=(_trace_record(row_id=2, level="INFO", message="root info"),),
),
TraceLogView(
trace_id="parent-1",
parent_id="root-1",
child_ids=("trace-1",),
records=(_trace_record(row_id=3, level="WARNING", message="parent warning"),),
),
),
)
client = _build_client(trace_provider)
try:
response = client.get("/traces/trace-1?format=html&attrs_json=true&ancestor_depth=all")
finally:
client.close()
assert response.status_code == 200
assert 'href="/traces/trace-1?format=html&amp;levels=ERROR%2CWARNING%2CINFO&amp;attrs_json=true&amp;ancestor_depth=all"' in response.text
assert 'href="/traces/root-1?format=html&amp;levels=ERROR%2CWARNING%2CINFO&amp;attrs_json=true&amp;ancestor_depth=all"' in response.text
assert 'href="/traces/parent-1?format=html&amp;levels=ERROR%2CWARNING%2CINFO&amp;attrs_json=true&amp;ancestor_depth=all"' in response.text
assert response.text.index("root info") < response.text.index("parent warning") < response.text.index("loaded prices")
assert "root info" in response.text
assert "parent warning" in response.text
def test_trace_endpoint_validates_query_params() -> None: def test_trace_endpoint_validates_query_params() -> None:
client = _build_client(lambda _trace_id, _request: None) client = _build_client(lambda _trace_id, _request: None)
try: try:
invalid_level = client.get("/traces/trace-1?levels=error,fatal") invalid_level = client.get("/traces/trace-1?levels=error,fatal")
invalid_format = client.get("/traces/trace-1?format=xml") invalid_format = client.get("/traces/trace-1?format=xml")
invalid_ancestor_depth = client.get("/traces/trace-1?ancestor_depth=-1")
invalid_ancestor_type = client.get("/traces/trace-1?ancestor_depth=up")
finally: finally:
client.close() client.close()
@@ -262,6 +452,16 @@ def test_trace_endpoint_validates_query_params() -> None:
assert invalid_level.json() == {"status": "error", "detail": "unsupported trace levels: FATAL"} assert invalid_level.json() == {"status": "error", "detail": "unsupported trace levels: FATAL"}
assert invalid_format.status_code == 400 assert invalid_format.status_code == 400
assert invalid_format.json() == {"status": "error", "detail": "unsupported trace format: xml"} assert invalid_format.json() == {"status": "error", "detail": "unsupported trace format: xml"}
assert invalid_ancestor_depth.status_code == 400
assert invalid_ancestor_depth.json() == {
"status": "error",
"detail": "query parameter must be >= 0: ancestor_depth=-1",
}
assert invalid_ancestor_type.status_code == 400
assert invalid_ancestor_type.json() == {
"status": "error",
"detail": "invalid ancestor depth query parameter: ancestor_depth=up",
}
def test_runtime_trace_logs_uses_configured_reader(monkeypatch) -> None: def test_runtime_trace_logs_uses_configured_reader(monkeypatch) -> None:
@@ -273,15 +473,21 @@ def test_runtime_trace_logs_uses_configured_reader(monkeypatch) -> None:
) )
class StubReader: class StubReader:
def read_trace(self, trace_id: str, levels: tuple[str, ...]) -> TraceLogView | None: def read_trace(
self,
trace_id: str,
levels: tuple[str, ...],
ancestor_depth: int | None = 0,
) -> TraceLogView | None:
assert trace_id == "trace-1" assert trace_id == "trace-1"
assert levels == ("ERROR",) assert levels == ("ERROR",)
assert ancestor_depth is None
return expected return expected
monkeypatch.setattr(runtime_module, "build_trace_log_reader", lambda _transport: StubReader()) monkeypatch.setattr(runtime_module, "build_trace_log_reader", lambda _transport: StubReader())
runtime = RuntimeManager() runtime = RuntimeManager()
result = asyncio.run(runtime.trace_logs("trace-1", TraceQueryRequest(levels=("ERROR",)))) result = asyncio.run(runtime.trace_logs("trace-1", TraceQueryRequest(levels=("ERROR",), ancestor_depth=None)))
assert result == expected assert result == expected
@@ -297,7 +503,11 @@ def test_mysql_trace_log_reader_maps_db_rows() -> None:
self._current_query = query self._current_query = query
def fetchone(self) -> dict[str, object] | None: def fetchone(self) -> dict[str, object] | None:
return {"parent_id": "root-77"} if self.executed[-1][1] == ("trace-1",):
return {"parent_id": "root-77"}
if self.executed[-1][1] == ("root-77",):
return {"parent_id": None}
return None
def fetchall(self) -> list[dict[str, object]]: def fetchall(self) -> list[dict[str, object]]:
if "WHERE parent_id = %s" in self._current_query: if "WHERE parent_id = %s" in self._current_query:
@@ -362,7 +572,167 @@ def test_mysql_trace_log_reader_maps_db_rows() -> None:
attrs_json={"attempt": 1}, attrs_json={"attempt": 1},
), ),
), ),
ancestors=(),
) )
assert len(factory.cursor.executed) == 3 assert len(factory.cursor.executed) == 3
assert factory.cursor.executed[1][1] == ("trace-1",) assert factory.cursor.executed[1][1] == ("trace-1",)
assert factory.cursor.executed[2][1] == ("trace-1", "ERROR", "WARNING") assert factory.cursor.executed[2][1] == ("trace-1", "ERROR", "WARNING")
def test_mysql_trace_log_reader_loads_requested_ancestors() -> None:
class FakeCursor:
def __init__(self) -> None:
self.executed: list[tuple[str, tuple[object, ...]]] = []
self._current_query = ""
def execute(self, query: str, params: tuple[object, ...]) -> None:
self.executed.append((query, params))
self._current_query = query
def fetchone(self) -> dict[str, object] | None:
if self.executed[-1][1] == ("trace-1",):
return {"parent_id": "parent-1"}
if self.executed[-1][1] == ("parent-1",):
return {"parent_id": "root-1"}
if self.executed[-1][1] == ("root-1",):
return {"parent_id": None}
return None
def fetchall(self) -> list[dict[str, object]]:
if "WHERE parent_id = %s" in self._current_query:
parent_id = self.executed[-1][1][0]
if parent_id == "trace-1":
return []
if parent_id == "parent-1":
return [{"trace_id": "trace-1"}]
if parent_id == "root-1":
return [{"trace_id": "parent-1"}]
return []
trace_id = self.executed[-1][1][0]
return [
{
"id": 8 if trace_id == "trace-1" else 9,
"trace_id": trace_id,
"event_time": datetime(2026, 4, 28, 10, 11, 12, tzinfo=timezone.utc),
"step": "parse",
"status": "failed",
"level": "ERROR",
"message": f"broken:{trace_id}",
"attrs_json": '{"attempt":1}',
}
]
def __enter__(self) -> FakeCursor:
return self
def __exit__(self, exc_type, exc, tb) -> None:
return None
class FakeConnection:
def __init__(self, cursor: FakeCursor) -> None:
self._cursor = cursor
def cursor(self) -> FakeCursor:
return self._cursor
def __enter__(self) -> FakeConnection:
return self
def __exit__(self, exc_type, exc, tb) -> None:
return None
class FakeConnectionFactory:
def __init__(self) -> None:
self.cursor = FakeCursor()
def connect(self) -> FakeConnection:
return FakeConnection(self.cursor)
factory = FakeConnectionFactory()
reader = MySqlTraceLogReader(factory) # type: ignore[arg-type]
view = reader.read_trace("trace-1", ("ERROR",), 1)
assert view is not None
assert view.trace_id == "trace-1"
assert view.parent_id == "parent-1"
assert len(view.ancestors) == 1
assert view.ancestors[0].trace_id == "parent-1"
assert view.ancestors[0].parent_id == "root-1"
assert view.ancestors[0].child_ids == ("trace-1",)
def test_mysql_trace_log_reader_orders_ancestors_root_first() -> None:
class FakeCursor:
def __init__(self) -> None:
self.executed: list[tuple[str, tuple[object, ...]]] = []
self._current_query = ""
def execute(self, query: str, params: tuple[object, ...]) -> None:
self.executed.append((query, params))
self._current_query = query
def fetchone(self) -> dict[str, object] | None:
if self.executed[-1][1] == ("trace-1",):
return {"parent_id": "parent-1"}
if self.executed[-1][1] == ("parent-1",):
return {"parent_id": "root-1"}
if self.executed[-1][1] == ("root-1",):
return {"parent_id": None}
return None
def fetchall(self) -> list[dict[str, object]]:
if "WHERE parent_id = %s" in self._current_query:
parent_id = self.executed[-1][1][0]
if parent_id == "root-1":
return [{"trace_id": "parent-1"}]
if parent_id == "parent-1":
return [{"trace_id": "trace-1"}]
return []
trace_id = self.executed[-1][1][0]
return [
{
"id": 8,
"trace_id": trace_id,
"event_time": datetime(2026, 4, 28, 10, 11, 12, tzinfo=timezone.utc),
"step": "parse",
"status": "failed",
"level": "ERROR",
"message": f"broken:{trace_id}",
"attrs_json": '{"attempt":1}',
}
]
def __enter__(self) -> FakeCursor:
return self
def __exit__(self, exc_type, exc, tb) -> None:
return None
class FakeConnection:
def __init__(self, cursor: FakeCursor) -> None:
self._cursor = cursor
def cursor(self) -> FakeCursor:
return self._cursor
def __enter__(self) -> FakeConnection:
return self
def __exit__(self, exc_type, exc, tb) -> None:
return None
class FakeConnectionFactory:
def __init__(self) -> None:
self.cursor = FakeCursor()
def connect(self) -> FakeConnection:
return FakeConnection(self.cursor)
factory = FakeConnectionFactory()
reader = MySqlTraceLogReader(factory) # type: ignore[arg-type]
view = reader.read_trace("trace-1", ("ERROR",), 2)
assert view is not None
assert tuple(ancestor.trace_id for ancestor in view.ancestors) == ("root-1", "parent-1")