Compare commits
26 Commits
ed33f6e9cd
..
main
| Author | SHA1 | Date | |
|---|---|---|---|
| e6509ee0cd | |||
| e2b817f785 | |||
| 8cad1d00ec | |||
| ec3198dbf1 | |||
| aed12c9c4e | |||
| df50e7acbb | |||
| 8789fcc0d1 | |||
| b2915c3987 | |||
| 2e75e53b89 | |||
| 3184ff16ca | |||
| ef8732f079 | |||
| cd4d1b3169 | |||
| 62f08776eb | |||
| 90422a0c2a | |||
| 9eb7282437 | |||
| 238c65c9c2 | |||
| 72162dd050 | |||
| fa314bc1e5 | |||
| a144fd2912 | |||
| fc4aeebfca | |||
| b2f0716a3b | |||
| 72f2d54553 | |||
| d1ceef7872 | |||
| beee0e0e4b | |||
| 85fcaae31b | |||
| 2cedacfbe5 |
@@ -40,6 +40,7 @@ PLBA (`Platform Runtime for Business Applications`) - это runtime-слой д
|
||||
- `health` (`HealthRegistry`) - агрегирование здоровья воркеров и дополнительных компонентов.
|
||||
- `workflow` (`WorkflowEngine` и persistence-слой) - исполнение шагов бизнес-процесса с переходами и фиксацией состояния.
|
||||
- `control plane` (`ControlPlaneService`, `HttpControlChannel`) - внешние health/action endpoints.
|
||||
- `application HTTP` (`ApplicationHttpService`, `HttpApplicationChannel`) - пользовательские HTTP routes бизнес-приложения.
|
||||
- `queue` (`InMemoryTaskQueue`) - локальный in-memory буфер как утилита прикладного уровня.
|
||||
|
||||
## 3. Архитектура
|
||||
@@ -89,6 +90,7 @@ classDiagram
|
||||
class HealthRegistry
|
||||
class TraceService
|
||||
class ControlPlaneService
|
||||
class ApplicationHttpService
|
||||
class WorkflowRuntimeFactory
|
||||
class WorkflowEngine
|
||||
class WorkflowPersistence
|
||||
@@ -106,6 +108,7 @@ classDiagram
|
||||
RuntimeManager --> TraceService
|
||||
RuntimeManager --> WorkerSupervisor
|
||||
RuntimeManager --> ControlPlaneService
|
||||
RuntimeManager --> ApplicationHttpService
|
||||
|
||||
WorkerSupervisor --> Worker
|
||||
Worker --> Routine : invokes
|
||||
@@ -127,7 +130,8 @@ classDiagram
|
||||
- Реализация: контракт `app_runtime.contracts.application.ApplicationModule`, регистрация через `app_runtime.core.registration.ModuleRegistry`.
|
||||
- Как работает / API / вызовы / таблицы:
|
||||
- API: `name`, `register(registry)`.
|
||||
- Типичные вызовы: `registry.add_worker(worker)`, `registry.add_health_contributor(contributor)`.
|
||||
- Типичные вызовы: `registry.add_worker(worker)`, `registry.add_health_contributor(contributor)`.
|
||||
- Для HTTP-модулей: `registry.add_http_routes(registrar)`.
|
||||
- `RuntimeManager.register_module()` вызывает `module.register(...)` и добавляет имя модуля в снимок runtime.
|
||||
- В БД напрямую не пишет.
|
||||
- Типовая схема использования:
|
||||
@@ -314,7 +318,87 @@ with traces.open_context(alias="email:123", kind="email") as message_trace_id:
|
||||
- Producer в рутине кладет элементы через `put`.
|
||||
- Consumer-воркер извлекает через `get(timeout)` и обрабатывает.
|
||||
|
||||
## 5. MVP бизнес-приложения
|
||||
## 5. Application HTTP
|
||||
`PLBA` поддерживает прикладной HTTP-слой для пользовательских страниц и API бизнес-приложения. По умолчанию он отделён от `control plane`, но при необходимости можно опубликовать control routes и business routes через один HTTP channel.
|
||||
|
||||
`Control Plane` и `Application HTTP` обслуживают разные контуры:
|
||||
- `Control Plane` используется для `/health`, `/actions/*`, `/traces/*`.
|
||||
- `Application HTTP` используется для бизнес-маршрутов приложения, например `/estimate`, `/estimate/api/tasks`, `/api/orders`.
|
||||
|
||||
### Основные компоненты
|
||||
- `ApplicationHttpService` управляет lifecycle прикладного HTTP на уровне runtime.
|
||||
- `HttpApplicationChannel` поднимает отдельный `FastAPI` app через `uvicorn`.
|
||||
- `HttpRouteRegistrar` регистрирует пользовательские routes и получает `ServiceContainer`.
|
||||
- `HttpApplicationAppFactory` собирает `FastAPI(title="PLBA Application API")`, middleware и routes.
|
||||
- `UnifiedHttpService` собирает один `FastAPI` app с control routes (`/health`, `/actions/*`, `/traces/*`) и application routes.
|
||||
|
||||
### Минимальный пример
|
||||
```python
|
||||
from fastapi import FastAPI, File, UploadFile
|
||||
from fastapi.responses import HTMLResponse
|
||||
from plba import ApplicationModule, HttpApplicationChannel, create_runtime
|
||||
|
||||
|
||||
class DemoRoutes:
|
||||
def register(self, app: FastAPI, services) -> None:
|
||||
@app.get("/demo")
|
||||
async def demo_page():
|
||||
return HTMLResponse("<h1>Demo</h1>")
|
||||
|
||||
@app.post("/demo/api/tasks")
|
||||
async def create_task(file: UploadFile = File(...)):
|
||||
payload = await file.read()
|
||||
return {"filename": file.filename, "size": len(payload)}
|
||||
|
||||
|
||||
class DemoModule(ApplicationModule):
|
||||
@property
|
||||
def name(self) -> str:
|
||||
return "demo"
|
||||
|
||||
def register(self, registry) -> None:
|
||||
registry.add_http_routes(DemoRoutes())
|
||||
|
||||
|
||||
runtime = create_runtime(DemoModule(), config_path="config.yml")
|
||||
runtime.application_http.register_channel(
|
||||
HttpApplicationChannel(host="0.0.0.0", port=15000, timeout=5)
|
||||
)
|
||||
runtime.start()
|
||||
```
|
||||
|
||||
После старта runtime приложение будет обслуживать:
|
||||
- `GET /demo`
|
||||
- `POST /demo/api/tasks`
|
||||
|
||||
### Единый HTTP API
|
||||
Если бизнес-приложению нужен один опубликованный порт для control и application routes, передайте `UnifiedHttpService` как `application_http`:
|
||||
|
||||
```python
|
||||
from plba import HttpApplicationChannel, RuntimeManager, UnifiedHttpService
|
||||
|
||||
runtime = RuntimeManager(application_http=UnifiedHttpService())
|
||||
runtime.application_http.register_channel(
|
||||
HttpApplicationChannel(host="0.0.0.0", port=15000, timeout=5)
|
||||
)
|
||||
```
|
||||
|
||||
Старый режим с отдельными `ControlPlaneService` и `ApplicationHttpService` остаётся доступен и работает как раньше.
|
||||
|
||||
### Типовые сценарии
|
||||
- `Web UI для фоновых задач`: список задач, запуск, статус, cancel, download result.
|
||||
- `Внутренний REST API`: например `POST /jobs`, `GET /jobs/{id}`, `POST /jobs/{id}/cancel`.
|
||||
- `Файловый шлюз`: загрузка входного файла, асинхронная обработка через worker, скачивание результата.
|
||||
- `Webhook endpoint`: прием callback от внешней системы и передача события в worker pipeline.
|
||||
|
||||
### Ограничения и рекомендации
|
||||
- По умолчанию держите business routes и `control plane` раздельно; используйте `UnifiedHttpService`, когда один опубликованный порт является явным эксплуатационным требованием.
|
||||
- Держите бизнес-логику в сервисах, а handlers используйте как тонкий HTTP-адаптер.
|
||||
- Runtime предоставляет доступ к зависимостям через `services`, поэтому handlers не должны зависеть от `RuntimeManager`.
|
||||
- В первой версии `Application HTTP` не решает auth, static files, шаблонизаторы, websocket и OpenAPI customization.
|
||||
- Для публичного доступа выносите auth, TLS и reverse proxy на внешний ingress.
|
||||
|
||||
## 6. MVP бизнес-приложения
|
||||
Минимальная конфигурация запуска:
|
||||
1. Создать один `ApplicationModule`.
|
||||
2. В модуле собрать одну `Routine` и один `Worker` (1 worker -> 1 routine).
|
||||
@@ -374,4 +458,4 @@ runtime = create_runtime(DemoModule(), config_path="config.yml")
|
||||
runtime.start()
|
||||
```
|
||||
|
||||
Для production-сценария после MVP обычно добавляют `tracing`, `health contributors`, `workflow` и HTTP control plane, но базовый запуск не требует этих расширений.
|
||||
Для production-сценария после MVP обычно добавляют `tracing`, `health contributors`, `workflow`, HTTP control plane и при необходимости `application HTTP`, но базовый запуск не требует этих расширений.
|
||||
|
||||
+2
-1
@@ -4,7 +4,7 @@ build-backend = "setuptools.build_meta"
|
||||
|
||||
[project]
|
||||
name = "plba"
|
||||
version = "0.2.8"
|
||||
version = "0.4.0"
|
||||
description = "Platform runtime for business applications"
|
||||
readme = "README.md"
|
||||
requires-python = ">=3.11"
|
||||
@@ -12,6 +12,7 @@ dependencies = [
|
||||
"fastapi>=0.129.0",
|
||||
"PyMySQL>=1.1",
|
||||
"PyYAML>=6.0.3",
|
||||
"python-multipart>=0.0.9",
|
||||
"uvicorn>=0.41.0",
|
||||
]
|
||||
|
||||
|
||||
@@ -58,3 +58,48 @@ class TraceTransport(Protocol):
|
||||
|
||||
def write_message(self, record: TraceLogMessage) -> None:
|
||||
"""Persist trace log message."""
|
||||
|
||||
|
||||
@dataclass(frozen=True)
|
||||
class TraceLogRecord:
|
||||
id: int
|
||||
trace_id: str
|
||||
event_time: datetime
|
||||
step: str
|
||||
status: str
|
||||
level: TraceLevel
|
||||
message: str
|
||||
attrs_json: Any
|
||||
|
||||
def as_dict(self, *, include_attrs_json: bool) -> dict[str, Any]:
|
||||
payload: dict[str, Any] = {
|
||||
"id": self.id,
|
||||
"trace_id": self.trace_id,
|
||||
"event_time": self.event_time.isoformat(),
|
||||
"step": self.step,
|
||||
"status": self.status,
|
||||
"level": self.level,
|
||||
"message": self.message,
|
||||
}
|
||||
if include_attrs_json:
|
||||
payload["attrs_json"] = self.attrs_json
|
||||
return payload
|
||||
|
||||
|
||||
@dataclass(frozen=True)
|
||||
class TraceLogView:
|
||||
trace_id: str
|
||||
parent_id: str | None
|
||||
child_ids: tuple[str, ...] = ()
|
||||
records: tuple[TraceLogRecord, ...] = ()
|
||||
ancestors: tuple[TraceLogView, ...] = ()
|
||||
|
||||
|
||||
class TraceLogReader(Protocol):
|
||||
def read_trace(
|
||||
self,
|
||||
trace_id: str,
|
||||
levels: tuple[TraceLevel, ...],
|
||||
ancestor_depth: int | None = 0,
|
||||
) -> TraceLogView | None:
|
||||
"""Load trace context and filtered log records."""
|
||||
|
||||
@@ -0,0 +1,46 @@
|
||||
from __future__ import annotations
|
||||
|
||||
import asyncio
|
||||
|
||||
from fastapi.responses import JSONResponse
|
||||
|
||||
from app_runtime.control.base import ControlActionRequest, ControlActionSet
|
||||
|
||||
|
||||
class ControlActionResponder:
|
||||
def __init__(self, actions: ControlActionSet, timeout: int) -> None:
|
||||
self._actions = actions
|
||||
self._timeout = timeout
|
||||
|
||||
async def respond(
|
||||
self,
|
||||
action: str,
|
||||
_client_source: str = "unknown",
|
||||
request: ControlActionRequest | None = None,
|
||||
) -> JSONResponse:
|
||||
callbacks = {
|
||||
"start": self._actions.start,
|
||||
"stop": self._actions.stop,
|
||||
"status": self._actions.status,
|
||||
}
|
||||
callback = callbacks.get(action)
|
||||
if callback is None:
|
||||
return JSONResponse(content={"status": "error", "detail": f"unsupported action: {action}"}, status_code=404)
|
||||
action_request = request or ControlActionRequest()
|
||||
action_timeout = self._action_timeout(action, action_request)
|
||||
try:
|
||||
detail = await asyncio.wait_for(callback(action_request), timeout=action_timeout)
|
||||
except asyncio.TimeoutError:
|
||||
return JSONResponse(
|
||||
content={"status": "accepted", "detail": f"{action} operation is still in progress"},
|
||||
status_code=202,
|
||||
)
|
||||
except Exception as exc:
|
||||
return JSONResponse(content={"status": "error", "detail": str(exc)}, status_code=500)
|
||||
return JSONResponse(content={"status": "ok", "detail": detail or f"{action} action accepted"}, status_code=200)
|
||||
|
||||
def _action_timeout(self, action: str, request: ControlActionRequest) -> float:
|
||||
base_timeout = max(float(self._timeout), 10.0) if action in {"start", "stop"} else float(self._timeout)
|
||||
if action != "stop" or request.wait is False or request.timeout is None:
|
||||
return base_timeout
|
||||
return max(base_timeout, float(request.timeout) + 1.0)
|
||||
@@ -3,7 +3,9 @@ from __future__ import annotations
|
||||
from abc import ABC, abstractmethod
|
||||
from collections.abc import Awaitable, Callable
|
||||
from dataclasses import dataclass
|
||||
from typing import Literal
|
||||
|
||||
from app_runtime.contracts.trace import TraceLevel, TraceLogView
|
||||
from app_runtime.core.types import HealthPayload
|
||||
|
||||
|
||||
@@ -17,6 +19,18 @@ class ControlActionRequest:
|
||||
ActionResult = str | dict[str, object]
|
||||
ActionHandler = Callable[[ControlActionRequest], Awaitable[ActionResult]]
|
||||
HealthHandler = Callable[[], Awaitable[HealthPayload]]
|
||||
TraceResponseFormat = Literal["json", "text", "html"]
|
||||
|
||||
|
||||
@dataclass(slots=True)
|
||||
class TraceQueryRequest:
|
||||
levels: tuple[TraceLevel, ...] = ("ERROR", "WARNING", "INFO")
|
||||
include_attrs_json: bool = False
|
||||
response_format: TraceResponseFormat = "html"
|
||||
ancestor_depth: int | None = 0
|
||||
|
||||
|
||||
TraceLookupHandler = Callable[[str, TraceQueryRequest], Awaitable[TraceLogView]]
|
||||
|
||||
|
||||
@dataclass(slots=True)
|
||||
@@ -25,6 +39,7 @@ class ControlActionSet:
|
||||
start: ActionHandler
|
||||
stop: ActionHandler
|
||||
status: ActionHandler
|
||||
trace_lookup: TraceLookupHandler | None = None
|
||||
|
||||
|
||||
class ControlChannel(ABC):
|
||||
|
||||
@@ -7,17 +7,29 @@ from collections.abc import Awaitable, Callable
|
||||
from fastapi import FastAPI, Request
|
||||
from fastapi.responses import JSONResponse
|
||||
|
||||
from app_runtime.control.base import ControlActionRequest
|
||||
from app_runtime.control.base import ControlActionRequest, TraceQueryRequest
|
||||
from app_runtime.control.trace_presenter import TraceRequestParser, TraceResponseRenderer
|
||||
from app_runtime.contracts.trace import TraceLogView
|
||||
from app_runtime.core.types import HealthPayload
|
||||
|
||||
LOGGER = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class HttpControlAppFactory:
|
||||
def __init__(
|
||||
self,
|
||||
*,
|
||||
trace_request_parser: TraceRequestParser | None = None,
|
||||
trace_response_renderer: TraceResponseRenderer | None = None,
|
||||
) -> None:
|
||||
self._trace_request_parser = trace_request_parser or TraceRequestParser()
|
||||
self._trace_response_renderer = trace_response_renderer or TraceResponseRenderer()
|
||||
|
||||
def create(
|
||||
self,
|
||||
health_provider: Callable[[], Awaitable[HealthPayload]],
|
||||
action_provider: Callable[[str, str, ControlActionRequest], Awaitable[JSONResponse]],
|
||||
trace_provider: Callable[[str, TraceQueryRequest], Awaitable[TraceLogView]] | None = None,
|
||||
) -> FastAPI:
|
||||
app = FastAPI(title="PLBA Control API")
|
||||
|
||||
@@ -46,6 +58,22 @@ class HttpControlAppFactory:
|
||||
return JSONResponse(content={"status": "error", "detail": str(exc)}, status_code=400)
|
||||
return await action_provider(action, client_source, action_request)
|
||||
|
||||
@app.get("/traces/{traceid}")
|
||||
async def trace(traceid: str, request: Request):
|
||||
if trace_provider is None:
|
||||
return JSONResponse(content={"status": "error", "detail": "trace lookup is not configured"}, status_code=503)
|
||||
try:
|
||||
trace_request = self._trace_request_parser.parse(request)
|
||||
except ValueError as exc:
|
||||
return JSONResponse(content={"status": "error", "detail": str(exc)}, status_code=400)
|
||||
try:
|
||||
payload = await trace_provider(traceid, trace_request)
|
||||
except KeyError:
|
||||
return JSONResponse(content={"status": "error", "detail": f"trace not found: {traceid}"}, status_code=404)
|
||||
except RuntimeError as exc:
|
||||
return JSONResponse(content={"status": "error", "detail": str(exc)}, status_code=503)
|
||||
return self._trace_response_renderer.render(payload, trace_request)
|
||||
|
||||
return app
|
||||
|
||||
def _action_request(self, request: Request) -> ControlActionRequest:
|
||||
|
||||
@@ -4,7 +4,9 @@ import asyncio
|
||||
|
||||
from fastapi.responses import JSONResponse
|
||||
|
||||
from app_runtime.control.base import ControlActionRequest, ControlActionSet, ControlChannel
|
||||
from app_runtime.control.action_responder import ControlActionResponder
|
||||
from app_runtime.control.base import ControlActionRequest, ControlActionSet, ControlChannel, TraceQueryRequest
|
||||
from app_runtime.contracts.trace import TraceLogView
|
||||
from app_runtime.control.http_app import HttpControlAppFactory
|
||||
from app_runtime.control.http_runner import UvicornThreadRunner
|
||||
|
||||
@@ -15,10 +17,12 @@ class HttpControlChannel(ControlChannel):
|
||||
self._runner = UvicornThreadRunner(host, port, timeout)
|
||||
self._factory = HttpControlAppFactory()
|
||||
self._actions: ControlActionSet | None = None
|
||||
self._action_responder: ControlActionResponder | None = None
|
||||
|
||||
async def start(self, actions: ControlActionSet) -> None:
|
||||
self._actions = actions
|
||||
app = self._factory.create(self._health_response, self._action_response)
|
||||
self._action_responder = ControlActionResponder(actions, self._timeout)
|
||||
app = self._factory.create(self._health_response, self._action_response, self._trace_response)
|
||||
await self._runner.start(app)
|
||||
|
||||
async def stop(self) -> None:
|
||||
@@ -39,31 +43,19 @@ class HttpControlChannel(ControlChannel):
|
||||
_client_source: str = "unknown",
|
||||
request: ControlActionRequest | None = None,
|
||||
) -> JSONResponse:
|
||||
if self._actions is None:
|
||||
return JSONResponse(content={"status": "error", "detail": f"{action} handler is not configured"}, status_code=404)
|
||||
callbacks = {
|
||||
"start": self._actions.start,
|
||||
"stop": self._actions.stop,
|
||||
"status": self._actions.status,
|
||||
}
|
||||
callback = callbacks.get(action)
|
||||
if callback is None:
|
||||
return JSONResponse(content={"status": "error", "detail": f"unsupported action: {action}"}, status_code=404)
|
||||
action_request = request or ControlActionRequest()
|
||||
action_timeout = self._action_timeout(action, action_request)
|
||||
try:
|
||||
detail = await asyncio.wait_for(callback(action_request), timeout=action_timeout)
|
||||
except asyncio.TimeoutError:
|
||||
return JSONResponse(
|
||||
content={"status": "accepted", "detail": f"{action} operation is still in progress"},
|
||||
status_code=202,
|
||||
)
|
||||
except Exception as exc:
|
||||
return JSONResponse(content={"status": "error", "detail": str(exc)}, status_code=500)
|
||||
return JSONResponse(content={"status": "ok", "detail": detail or f"{action} action accepted"}, status_code=200)
|
||||
if self._action_responder is None:
|
||||
if self._actions is None:
|
||||
return JSONResponse(
|
||||
content={"status": "error", "detail": f"{action} handler is not configured"},
|
||||
status_code=404,
|
||||
)
|
||||
self._action_responder = ControlActionResponder(self._actions, self._timeout)
|
||||
return await self._action_responder.respond(action, _client_source, request)
|
||||
|
||||
def _action_timeout(self, action: str, request: ControlActionRequest) -> float:
|
||||
base_timeout = max(float(self._timeout), 10.0) if action in {"start", "stop"} else float(self._timeout)
|
||||
if action != "stop" or request.wait is False or request.timeout is None:
|
||||
return base_timeout
|
||||
return max(base_timeout, float(request.timeout) + 1.0)
|
||||
async def _trace_response(self, trace_id: str, request: TraceQueryRequest) -> TraceLogView:
|
||||
if self._actions is None or self._actions.trace_lookup is None:
|
||||
raise RuntimeError("trace lookup is not configured")
|
||||
return await asyncio.wait_for(
|
||||
self._actions.trace_lookup(trace_id, request),
|
||||
timeout=float(self._timeout),
|
||||
)
|
||||
|
||||
@@ -8,10 +8,11 @@ from uvicorn import Config, Server
|
||||
|
||||
|
||||
class UvicornThreadRunner:
|
||||
def __init__(self, host: str, port: int, timeout: int) -> None:
|
||||
def __init__(self, host: str, port: int, timeout: int, *, thread_name: str = "plba-http-control") -> None:
|
||||
self._host = host
|
||||
self._port = port
|
||||
self._timeout = timeout
|
||||
self._thread_name = thread_name
|
||||
self._server: Server | None = None
|
||||
self._thread: Thread | None = None
|
||||
self._error: BaseException | None = None
|
||||
@@ -22,7 +23,7 @@ class UvicornThreadRunner:
|
||||
self._error = None
|
||||
config = Config(app=app, host=self._host, port=self._port, log_level="warning")
|
||||
self._server = Server(config)
|
||||
self._thread = Thread(target=self._serve, name="plba-http-control", daemon=True)
|
||||
self._thread = Thread(target=self._serve, name=self._thread_name, daemon=True)
|
||||
self._thread.start()
|
||||
await self._wait_until_started()
|
||||
|
||||
|
||||
@@ -43,6 +43,7 @@ class ControlPlaneService:
|
||||
start=runtime.start_runtime,
|
||||
stop=runtime.stop_runtime,
|
||||
status=runtime.runtime_status,
|
||||
trace_lookup=runtime.trace_logs,
|
||||
)
|
||||
for channel in self._channels:
|
||||
await channel.start(actions)
|
||||
|
||||
@@ -0,0 +1,293 @@
|
||||
from __future__ import annotations
|
||||
|
||||
import json
|
||||
from html import escape
|
||||
from urllib.parse import urlencode
|
||||
|
||||
from fastapi import Request
|
||||
from fastapi.responses import HTMLResponse, JSONResponse, PlainTextResponse, Response
|
||||
|
||||
from app_runtime.control.base import TraceQueryRequest
|
||||
from app_runtime.contracts.trace import TraceLevel, TraceLogRecord, TraceLogView
|
||||
|
||||
|
||||
TRACE_SECTION_SEPARATOR = "=" * 30
|
||||
|
||||
|
||||
class TraceRequestParser:
|
||||
def parse(self, request: Request) -> TraceQueryRequest:
|
||||
raw_levels = request.query_params.get("levels")
|
||||
raw_format = request.query_params.get("format", "html")
|
||||
response_format = raw_format.strip().lower()
|
||||
if response_format not in {"json", "text", "html"}:
|
||||
raise ValueError(f"unsupported trace format: {raw_format}")
|
||||
return TraceQueryRequest(
|
||||
levels=self._trace_levels(raw_levels),
|
||||
include_attrs_json=self._bool_param(request, "attrs_json") or False,
|
||||
response_format=response_format,
|
||||
ancestor_depth=self._ancestor_depth(request),
|
||||
)
|
||||
|
||||
def _trace_levels(self, raw_levels: str | None) -> tuple[TraceLevel, ...]:
|
||||
if raw_levels is None:
|
||||
return ("ERROR", "WARNING", "INFO")
|
||||
parts = [item.strip().upper() for item in raw_levels.split(",")]
|
||||
levels = tuple(item for item in parts if item)
|
||||
if not levels:
|
||||
raise ValueError("trace levels must not be empty")
|
||||
unsupported = [level for level in levels if level not in {"DEBUG", "INFO", "WARNING", "ERROR"}]
|
||||
if unsupported:
|
||||
raise ValueError(f"unsupported trace levels: {', '.join(unsupported)}")
|
||||
return levels
|
||||
|
||||
def _bool_param(self, request: Request, name: str) -> bool | None:
|
||||
raw_value = request.query_params.get(name)
|
||||
if raw_value is None:
|
||||
return None
|
||||
normalized = raw_value.strip().lower()
|
||||
if normalized in {"1", "true", "yes", "on"}:
|
||||
return True
|
||||
if normalized in {"0", "false", "no", "off"}:
|
||||
return False
|
||||
raise ValueError(f"invalid boolean query parameter: {name}={raw_value}")
|
||||
|
||||
def _ancestor_depth(self, request: Request) -> int | None:
|
||||
raw_value = request.query_params.get("ancestor_depth")
|
||||
if raw_value is None:
|
||||
return 0
|
||||
normalized = raw_value.strip().lower()
|
||||
if normalized == "all":
|
||||
return None
|
||||
try:
|
||||
value = int(normalized)
|
||||
except ValueError as exc:
|
||||
raise ValueError(f"invalid ancestor depth query parameter: ancestor_depth={raw_value}") from exc
|
||||
if value < 0:
|
||||
raise ValueError(f"query parameter must be >= 0: ancestor_depth={raw_value}")
|
||||
return value
|
||||
|
||||
|
||||
class TraceResponseRenderer:
|
||||
def render(self, trace_view: TraceLogView, request: TraceQueryRequest) -> Response:
|
||||
if request.response_format == "json":
|
||||
return self._render_json(trace_view, request)
|
||||
if request.response_format == "html":
|
||||
return self._render_html(trace_view, request)
|
||||
return self._render_text(trace_view, request)
|
||||
|
||||
def _render_json(self, trace_view: TraceLogView, request: TraceQueryRequest) -> JSONResponse:
|
||||
return JSONResponse(
|
||||
content={
|
||||
"trace_id": trace_view.trace_id,
|
||||
"parent_id": trace_view.parent_id or "",
|
||||
"child_ids": list(trace_view.child_ids),
|
||||
"messages": [record.as_dict(include_attrs_json=request.include_attrs_json) for record in trace_view.records],
|
||||
"ancestors": [self._trace_payload(view, request) for view in trace_view.ancestors],
|
||||
}
|
||||
)
|
||||
|
||||
def _render_text(self, trace_view: TraceLogView, request: TraceQueryRequest) -> PlainTextResponse:
|
||||
lineage = [*trace_view.ancestors, trace_view]
|
||||
lines = self._text_trace_summary_lines(trace_view)
|
||||
for index, entry in enumerate(lineage):
|
||||
if index == 0:
|
||||
lines.append("")
|
||||
else:
|
||||
lines.extend(["", ""])
|
||||
lines.extend(self._text_trace_log_lines(entry, request))
|
||||
return PlainTextResponse(content="\n".join(lines))
|
||||
|
||||
def _render_html(self, trace_view: TraceLogView, request: TraceQueryRequest) -> HTMLResponse:
|
||||
title = escape(f"Trace {trace_view.trace_id}")
|
||||
lines = self._html_lines(trace_view, request)
|
||||
html = f"""<!DOCTYPE html>
|
||||
<html lang="en">
|
||||
<head>
|
||||
<meta charset="utf-8">
|
||||
<meta name="viewport" content="width=device-width, initial-scale=1">
|
||||
<title>{title}</title>
|
||||
<style>
|
||||
:root {{
|
||||
color-scheme: dark;
|
||||
--bg: #000000;
|
||||
--fg: #ececec;
|
||||
--step: #ffffff;
|
||||
--link: #66d9ef;
|
||||
--error: #ff817d;
|
||||
--warning: #e9ebec;
|
||||
--info: #d6d7d9;
|
||||
--other: #ececec;
|
||||
}}
|
||||
body {{
|
||||
margin: 0;
|
||||
background: var(--bg);
|
||||
color: var(--fg);
|
||||
font: 13px/1.1 "SFMono-Regular", monospace;
|
||||
}}
|
||||
.page {{
|
||||
padding: 14px 16px 24px;
|
||||
}}
|
||||
a {{
|
||||
color: var(--link);
|
||||
text-decoration: underline;
|
||||
text-underline-offset: 2px;
|
||||
}}
|
||||
.line {{
|
||||
white-space: pre-wrap;
|
||||
word-break: break-word;
|
||||
}}
|
||||
.msg-error {{
|
||||
color: var(--error);
|
||||
}}
|
||||
.msg-warning {{
|
||||
color: var(--warning);
|
||||
}}
|
||||
.msg-info {{
|
||||
color: var(--info);
|
||||
}}
|
||||
.msg-debug {{
|
||||
color: var(--other);
|
||||
}}
|
||||
@media (max-width: 640px) {{
|
||||
.page {{
|
||||
padding: 12px 12px 20px;
|
||||
}}
|
||||
}}
|
||||
</style>
|
||||
</head>
|
||||
<body>
|
||||
<div class="page">
|
||||
{lines}
|
||||
</div>
|
||||
</body>
|
||||
</html>"""
|
||||
return HTMLResponse(content=html)
|
||||
|
||||
def _child_id_lines(self, child_ids: tuple[str, ...]) -> list[str]:
|
||||
lines = ["child_ids:"]
|
||||
lines.extend(f" - {child_id}" for child_id in child_ids)
|
||||
return lines
|
||||
|
||||
def _text_message(self, record: TraceLogRecord, include_attrs_json: bool) -> str:
|
||||
if not include_attrs_json:
|
||||
return record.message
|
||||
return f"{record.message}, {json.dumps(record.attrs_json, ensure_ascii=False, separators=(',', ':'))}"
|
||||
|
||||
def _trace_href(self, trace_id: str, request: TraceQueryRequest) -> str:
|
||||
params = {
|
||||
"format": "html",
|
||||
"levels": ",".join(request.levels),
|
||||
"attrs_json": "true" if request.include_attrs_json else "false",
|
||||
}
|
||||
if request.ancestor_depth is None:
|
||||
params["ancestor_depth"] = "all"
|
||||
elif request.ancestor_depth > 0:
|
||||
params["ancestor_depth"] = str(request.ancestor_depth)
|
||||
query = urlencode(params)
|
||||
return f"/traces/{trace_id}?{query}"
|
||||
|
||||
def _html_lines(self, trace_view: TraceLogView, request: TraceQueryRequest) -> str:
|
||||
lineage = [*trace_view.ancestors, trace_view]
|
||||
lines = self._html_trace_summary_lines(trace_view, request)
|
||||
for index, entry in enumerate(lineage):
|
||||
if index == 0:
|
||||
lines.append(self._html_plain_line(""))
|
||||
else:
|
||||
lines.extend([self._html_plain_line(""), self._html_plain_line("")])
|
||||
lines.extend(self._html_trace_log_lines(entry, request))
|
||||
return "".join(lines)
|
||||
|
||||
def _trace_payload(self, trace_view: TraceLogView, request: TraceQueryRequest) -> dict[str, object]:
|
||||
return {
|
||||
"trace_id": trace_view.trace_id,
|
||||
"parent_id": trace_view.parent_id or "",
|
||||
"child_ids": list(trace_view.child_ids),
|
||||
"messages": [record.as_dict(include_attrs_json=request.include_attrs_json) for record in trace_view.records],
|
||||
}
|
||||
|
||||
def _text_trace_summary_lines(self, trace_view: TraceLogView) -> list[str]:
|
||||
return [
|
||||
f"trace_id: {trace_view.trace_id}",
|
||||
f"parent_id: {trace_view.parent_id or ''}",
|
||||
*self._child_id_lines(trace_view.child_ids),
|
||||
]
|
||||
|
||||
def _text_trace_log_lines(self, trace_view: TraceLogView, request: TraceQueryRequest) -> list[str]:
|
||||
lines = [
|
||||
TRACE_SECTION_SEPARATOR,
|
||||
f"trace_id: {trace_view.trace_id}",
|
||||
"",
|
||||
]
|
||||
previous_step: str | None = None
|
||||
for record in trace_view.records:
|
||||
current_step = str(record.step or "")
|
||||
if previous_step is None:
|
||||
lines.append(f"step: {current_step}")
|
||||
elif current_step != previous_step:
|
||||
lines.append("------------------------------")
|
||||
lines.append(f"step: {current_step}")
|
||||
previous_step = current_step
|
||||
lines.append(self._text_message(record, request.include_attrs_json))
|
||||
return lines
|
||||
|
||||
def _html_trace_summary_lines(self, trace_view: TraceLogView, request: TraceQueryRequest) -> list[str]:
|
||||
return [
|
||||
self._html_plain_line(f"trace_id: {self._trace_link(trace_view.trace_id, request)}"),
|
||||
self._html_plain_line(f"parent_id: {self._optional_trace_link(trace_view.parent_id, request)}"),
|
||||
self._html_plain_line("child_ids:"),
|
||||
*(self._html_plain_line(f" - {self._trace_link(child_id, request)}") for child_id in trace_view.child_ids),
|
||||
]
|
||||
|
||||
def _html_trace_log_lines(self, trace_view: TraceLogView, request: TraceQueryRequest) -> list[str]:
|
||||
lines = [
|
||||
self._html_plain_line(TRACE_SECTION_SEPARATOR),
|
||||
self._html_plain_line(f"trace_id: {self._trace_link(trace_view.trace_id, request)}"),
|
||||
self._html_plain_line(""),
|
||||
]
|
||||
previous_step: str | None = None
|
||||
for record in trace_view.records:
|
||||
current_step = str(record.step or "")
|
||||
if previous_step is None:
|
||||
lines.append(self._html_step_line(current_step))
|
||||
lines.append(self._html_plain_line(""))
|
||||
elif current_step != previous_step:
|
||||
lines.append(self._html_plain_line(""))
|
||||
lines.append(self._html_plain_line("------------------------------"))
|
||||
lines.append(self._html_step_line(current_step))
|
||||
lines.append(self._html_plain_line(""))
|
||||
previous_step = current_step
|
||||
lines.extend(self._html_message_lines(record, request.include_attrs_json))
|
||||
return lines
|
||||
|
||||
def _html_message_lines(self, record: TraceLogRecord, include_attrs_json: bool) -> list[str]:
|
||||
lines = [self._html_colored_line(self._text_message(record, include_attrs_json), record.level)]
|
||||
return lines
|
||||
|
||||
def _html_plain_line(self, content: str) -> str:
|
||||
return f"<div class=\"line\">{content or ' '}</div>"
|
||||
|
||||
def _html_step_line(self, content: str) -> str:
|
||||
return f"<div class=\"line\" style=\"color: var(--step);\">{escape(content) or ' '}</div>"
|
||||
|
||||
def _html_colored_line(self, content: str, level: str) -> str:
|
||||
level_class = self._level_class(level)
|
||||
return f"<div class=\"line {level_class}\">{escape(content)}</div>"
|
||||
|
||||
def _trace_link(self, trace_id: str, request: TraceQueryRequest) -> str:
|
||||
href = escape(self._trace_href(trace_id, request), quote=True)
|
||||
text = escape(trace_id)
|
||||
return f"<a href=\"{href}\">{text}</a>"
|
||||
|
||||
def _optional_trace_link(self, trace_id: str | None, request: TraceQueryRequest) -> str:
|
||||
if not trace_id:
|
||||
return ""
|
||||
return self._trace_link(trace_id, request)
|
||||
|
||||
def _level_class(self, level: str) -> str:
|
||||
if level == "ERROR":
|
||||
return "msg-error"
|
||||
if level == "WARNING":
|
||||
return "msg-warning"
|
||||
if level == "INFO":
|
||||
return "msg-info"
|
||||
return "msg-debug"
|
||||
@@ -3,6 +3,7 @@ from __future__ import annotations
|
||||
from app_runtime.contracts.health import HealthContributor
|
||||
from app_runtime.contracts.worker import Worker
|
||||
from app_runtime.core.service_container import ServiceContainer
|
||||
from app_runtime.http.base import HttpRouteRegistrar
|
||||
|
||||
|
||||
class ModuleRegistry:
|
||||
@@ -10,6 +11,7 @@ class ModuleRegistry:
|
||||
self.services = services
|
||||
self.workers: list[Worker] = []
|
||||
self.health_contributors: list[HealthContributor] = []
|
||||
self.http_route_registrars: list[HttpRouteRegistrar] = []
|
||||
self.modules: list[str] = []
|
||||
|
||||
def register_module(self, name: str) -> None:
|
||||
@@ -20,3 +22,6 @@ class ModuleRegistry:
|
||||
|
||||
def add_health_contributor(self, contributor: HealthContributor) -> None:
|
||||
self.health_contributors.append(contributor)
|
||||
|
||||
def add_http_routes(self, registrar: HttpRouteRegistrar) -> None:
|
||||
self.http_route_registrars.append(registrar)
|
||||
|
||||
@@ -5,13 +5,17 @@ from time import monotonic, sleep
|
||||
from app_runtime.config.providers import FileConfigProvider
|
||||
from app_runtime.contracts.application import ApplicationModule
|
||||
from app_runtime.control.base import ControlActionRequest
|
||||
from app_runtime.control.base import TraceQueryRequest
|
||||
from app_runtime.contracts.trace import TraceLogView
|
||||
from app_runtime.control.service import ControlPlaneService
|
||||
from app_runtime.core.configuration import ConfigurationManager
|
||||
from app_runtime.core.registration import ModuleRegistry
|
||||
from app_runtime.core.service_container import ServiceContainer
|
||||
from app_runtime.core.types import HealthPayload, LifecycleState
|
||||
from app_runtime.health.registry import HealthRegistry
|
||||
from app_runtime.http.service import ApplicationHttpService
|
||||
from app_runtime.logging.manager import LogManager
|
||||
from app_runtime.tracing.reader import build_trace_log_reader
|
||||
from app_runtime.tracing.service import TraceService
|
||||
from app_runtime.workers.supervisor import WorkerSupervisor
|
||||
|
||||
@@ -29,6 +33,7 @@ class RuntimeManager:
|
||||
logs: LogManager | None = None,
|
||||
workers: WorkerSupervisor | None = None,
|
||||
control_plane: ControlPlaneService | None = None,
|
||||
application_http: ApplicationHttpService | None = None,
|
||||
) -> None:
|
||||
self.configuration = configuration or ConfigurationManager()
|
||||
self.services = services or ServiceContainer()
|
||||
@@ -37,6 +42,7 @@ class RuntimeManager:
|
||||
self.logs = logs or LogManager()
|
||||
self.workers = workers or WorkerSupervisor()
|
||||
self.control_plane = control_plane or ControlPlaneService()
|
||||
self.application_http = application_http or ApplicationHttpService()
|
||||
self.registry = ModuleRegistry(self.services)
|
||||
self._started = False
|
||||
self._state = LifecycleState.IDLE
|
||||
@@ -64,6 +70,8 @@ class RuntimeManager:
|
||||
self.workers.start()
|
||||
if start_control_plane:
|
||||
self.control_plane.start(self)
|
||||
self._register_application_http_routes()
|
||||
self.application_http.start(self)
|
||||
self._started = True
|
||||
self._refresh_state()
|
||||
|
||||
@@ -72,6 +80,7 @@ class RuntimeManager:
|
||||
return
|
||||
self._state = LifecycleState.STOPPING
|
||||
self.workers.stop(timeout=timeout, force=force)
|
||||
self.application_http.stop()
|
||||
if stop_control_plane:
|
||||
self.control_plane.stop()
|
||||
self._started = False
|
||||
@@ -117,6 +126,7 @@ class RuntimeManager:
|
||||
except TimeoutError:
|
||||
return self._action_detail("runtime stop is still in progress", timed_out=True)
|
||||
|
||||
self.application_http.stop()
|
||||
self._refresh_state()
|
||||
if self._state == LifecycleState.STOPPED:
|
||||
self._started = False
|
||||
@@ -127,6 +137,15 @@ class RuntimeManager:
|
||||
self._refresh_state()
|
||||
return self._state.value
|
||||
|
||||
async def trace_logs(self, trace_id: str, request: TraceQueryRequest) -> TraceLogView:
|
||||
reader = build_trace_log_reader(self.traces.transport)
|
||||
if reader is None:
|
||||
raise RuntimeError("trace log reader is not configured")
|
||||
trace_view = reader.read_trace(trace_id, request.levels, request.ancestor_depth)
|
||||
if trace_view is None:
|
||||
raise KeyError(trace_id)
|
||||
return trace_view
|
||||
|
||||
def _register_core_services(self) -> None:
|
||||
if self._core_registered:
|
||||
return
|
||||
@@ -136,6 +155,7 @@ class RuntimeManager:
|
||||
self.services.register("logs", self.logs)
|
||||
self.services.register("workers", self.workers)
|
||||
self.services.register("control_plane", self.control_plane)
|
||||
self.services.register("application_http", self.application_http)
|
||||
self._core_registered = True
|
||||
|
||||
def _register_health_contributors(self) -> None:
|
||||
@@ -149,6 +169,11 @@ class RuntimeManager:
|
||||
self.workers.register(worker)
|
||||
self._workers_registered = True
|
||||
|
||||
def _register_application_http_routes(self) -> None:
|
||||
for registrar in self.registry.http_route_registrars:
|
||||
self.application_http.register_routes(registrar)
|
||||
self.registry.http_route_registrars.clear()
|
||||
|
||||
def _refresh_state(self) -> None:
|
||||
lifecycle = self.workers.lifecycle_state()
|
||||
|
||||
|
||||
@@ -0,0 +1,14 @@
|
||||
from app_runtime.http.base import ApplicationHttpChannel, HttpRouteRegistrar
|
||||
from app_runtime.http.http_app import HttpApplicationAppFactory
|
||||
from app_runtime.http.http_channel import HttpApplicationChannel
|
||||
from app_runtime.http.service import ApplicationHttpService
|
||||
from app_runtime.http.unified_service import UnifiedHttpService
|
||||
|
||||
__all__ = [
|
||||
"ApplicationHttpChannel",
|
||||
"ApplicationHttpService",
|
||||
"HttpApplicationAppFactory",
|
||||
"HttpApplicationChannel",
|
||||
"HttpRouteRegistrar",
|
||||
"UnifiedHttpService",
|
||||
]
|
||||
@@ -0,0 +1,23 @@
|
||||
from __future__ import annotations
|
||||
|
||||
from abc import ABC, abstractmethod
|
||||
from typing import Protocol
|
||||
|
||||
from fastapi import FastAPI
|
||||
|
||||
from app_runtime.core.service_container import ServiceContainer
|
||||
|
||||
|
||||
class HttpRouteRegistrar(Protocol):
|
||||
def register(self, app: FastAPI, services: ServiceContainer) -> None:
|
||||
"""Register application routes on the provided FastAPI app."""
|
||||
|
||||
|
||||
class ApplicationHttpChannel(ABC):
|
||||
@abstractmethod
|
||||
async def start(self, app: FastAPI) -> None:
|
||||
"""Start the HTTP channel with the prepared application app."""
|
||||
|
||||
@abstractmethod
|
||||
async def stop(self) -> None:
|
||||
"""Stop the HTTP channel and release resources."""
|
||||
@@ -0,0 +1,37 @@
|
||||
from __future__ import annotations
|
||||
|
||||
import logging
|
||||
import time
|
||||
from collections.abc import Iterable
|
||||
|
||||
from fastapi import FastAPI, Request
|
||||
|
||||
from app_runtime.core.service_container import ServiceContainer
|
||||
from app_runtime.http.base import HttpRouteRegistrar
|
||||
|
||||
LOGGER = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class HttpApplicationAppFactory:
|
||||
def create(self, registrars: Iterable[HttpRouteRegistrar], services: ServiceContainer) -> FastAPI:
|
||||
app = FastAPI(title="PLBA Application API")
|
||||
self._register_middleware(app)
|
||||
for registrar in registrars:
|
||||
registrar.register(app, services)
|
||||
return app
|
||||
|
||||
def _register_middleware(self, app: FastAPI) -> None:
|
||||
@app.middleware("http")
|
||||
async def track_request(request: Request, call_next): # type: ignore[no-untyped-def]
|
||||
started = time.monotonic()
|
||||
response = await call_next(request)
|
||||
duration_ms = int((time.monotonic() - started) * 1000)
|
||||
response.headers["X-Response-Time-Ms"] = str(duration_ms)
|
||||
LOGGER.info(
|
||||
"Application HTTP request handled: method=%s path=%s status=%s duration_ms=%s",
|
||||
request.method,
|
||||
request.url.path,
|
||||
response.status_code,
|
||||
duration_ms,
|
||||
)
|
||||
return response
|
||||
@@ -0,0 +1,21 @@
|
||||
from __future__ import annotations
|
||||
|
||||
from fastapi import FastAPI
|
||||
|
||||
from app_runtime.control.http_runner import UvicornThreadRunner
|
||||
from app_runtime.http.base import ApplicationHttpChannel
|
||||
|
||||
|
||||
class HttpApplicationChannel(ApplicationHttpChannel):
|
||||
def __init__(self, host: str, port: int, timeout: int) -> None:
|
||||
self._runner = UvicornThreadRunner(host, port, timeout, thread_name="plba-http-application")
|
||||
|
||||
async def start(self, app: FastAPI) -> None:
|
||||
await self._runner.start(app)
|
||||
|
||||
async def stop(self) -> None:
|
||||
await self._runner.stop()
|
||||
|
||||
@property
|
||||
def port(self) -> int:
|
||||
return self._runner.port
|
||||
@@ -0,0 +1,42 @@
|
||||
from __future__ import annotations
|
||||
|
||||
import asyncio
|
||||
from typing import TYPE_CHECKING
|
||||
|
||||
from app_runtime.http.base import ApplicationHttpChannel, HttpRouteRegistrar
|
||||
from app_runtime.http.http_app import HttpApplicationAppFactory
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from app_runtime.core.runtime import RuntimeManager
|
||||
|
||||
|
||||
class ApplicationHttpService:
|
||||
def __init__(self, app_factory: HttpApplicationAppFactory | None = None) -> None:
|
||||
self._channels: list[ApplicationHttpChannel] = []
|
||||
self._registrars: list[HttpRouteRegistrar] = []
|
||||
self._app_factory = app_factory or HttpApplicationAppFactory()
|
||||
|
||||
def register_channel(self, channel: ApplicationHttpChannel) -> None:
|
||||
self._channels.append(channel)
|
||||
|
||||
def register_routes(self, registrar: HttpRouteRegistrar) -> None:
|
||||
self._registrars.append(registrar)
|
||||
|
||||
def start(self, runtime: RuntimeManager) -> None:
|
||||
if not self._channels:
|
||||
return
|
||||
asyncio.run(self._start_async(runtime))
|
||||
|
||||
def stop(self) -> None:
|
||||
if not self._channels:
|
||||
return
|
||||
asyncio.run(self._stop_async())
|
||||
|
||||
async def _start_async(self, runtime: RuntimeManager) -> None:
|
||||
app = self._app_factory.create(self._registrars, runtime.services)
|
||||
for channel in self._channels:
|
||||
await channel.start(app)
|
||||
|
||||
async def _stop_async(self) -> None:
|
||||
for channel in reversed(self._channels):
|
||||
await channel.stop()
|
||||
@@ -0,0 +1,68 @@
|
||||
from __future__ import annotations
|
||||
|
||||
import asyncio
|
||||
from typing import TYPE_CHECKING
|
||||
|
||||
from app_runtime.control.action_responder import ControlActionResponder
|
||||
from app_runtime.control.base import ControlActionSet
|
||||
from app_runtime.control.http_app import HttpControlAppFactory
|
||||
from app_runtime.http.base import ApplicationHttpChannel, HttpRouteRegistrar
|
||||
from app_runtime.http.http_app import HttpApplicationAppFactory
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from app_runtime.core.runtime import RuntimeManager
|
||||
|
||||
|
||||
class UnifiedHttpService:
|
||||
"""Publish control and application routes through the same HTTP channel."""
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
app_factory: HttpApplicationAppFactory | None = None,
|
||||
control_app_factory: HttpControlAppFactory | None = None,
|
||||
control_timeout: int = 5,
|
||||
) -> None:
|
||||
self._channels: list[ApplicationHttpChannel] = []
|
||||
self._registrars: list[HttpRouteRegistrar] = []
|
||||
self._app_factory = app_factory or HttpApplicationAppFactory()
|
||||
self._control_app_factory = control_app_factory or HttpControlAppFactory()
|
||||
self._control_timeout = control_timeout
|
||||
|
||||
def register_channel(self, channel: ApplicationHttpChannel) -> None:
|
||||
self._channels.append(channel)
|
||||
|
||||
def register_routes(self, registrar: HttpRouteRegistrar) -> None:
|
||||
self._registrars.append(registrar)
|
||||
|
||||
def start(self, runtime: RuntimeManager) -> None:
|
||||
if not self._channels:
|
||||
return
|
||||
asyncio.run(self._start_async(runtime))
|
||||
|
||||
def stop(self) -> None:
|
||||
if not self._channels:
|
||||
return
|
||||
asyncio.run(self._stop_async())
|
||||
|
||||
async def _start_async(self, runtime: RuntimeManager) -> None:
|
||||
app = self._app_factory.create(self._registrars, runtime.services)
|
||||
actions = ControlActionSet(
|
||||
health=runtime.health_status,
|
||||
start=runtime.start_runtime,
|
||||
stop=runtime.stop_runtime,
|
||||
status=runtime.runtime_status,
|
||||
trace_lookup=runtime.trace_logs,
|
||||
)
|
||||
action_responder = ControlActionResponder(actions, self._control_timeout)
|
||||
control_app = self._control_app_factory.create(
|
||||
actions.health,
|
||||
action_responder.respond,
|
||||
actions.trace_lookup,
|
||||
)
|
||||
app.router.routes.extend(control_app.router.routes)
|
||||
for channel in self._channels:
|
||||
await channel.start(app)
|
||||
|
||||
async def _stop_async(self) -> None:
|
||||
for channel in reversed(self._channels):
|
||||
await channel.stop()
|
||||
@@ -0,0 +1,140 @@
|
||||
from __future__ import annotations
|
||||
|
||||
import json
|
||||
from typing import Any
|
||||
|
||||
from app_runtime.contracts.trace import TraceLevel, TraceLogReader, TraceLogRecord, TraceLogView, TraceTransport
|
||||
from app_runtime.tracing.transport import MySqlTraceConnectionFactory, MySqlTraceTransport
|
||||
|
||||
|
||||
class MySqlTraceLogReader(TraceLogReader):
|
||||
def __init__(self, connection_factory: MySqlTraceConnectionFactory) -> None:
|
||||
self._connection_factory = connection_factory
|
||||
|
||||
def read_trace(
|
||||
self,
|
||||
trace_id: str,
|
||||
levels: tuple[TraceLevel, ...],
|
||||
ancestor_depth: int | None = 0,
|
||||
) -> TraceLogView | None:
|
||||
parent_id = self._read_parent_id(trace_id)
|
||||
if parent_id is None and not self._trace_exists(trace_id):
|
||||
return None
|
||||
ancestors = self._read_ancestors(parent_id, levels, ancestor_depth)
|
||||
child_ids = self._read_child_ids(trace_id)
|
||||
records = self._read_records(trace_id, levels)
|
||||
return TraceLogView(
|
||||
trace_id=trace_id,
|
||||
parent_id=parent_id,
|
||||
child_ids=tuple(child_ids),
|
||||
records=tuple(records),
|
||||
ancestors=tuple(ancestors),
|
||||
)
|
||||
|
||||
def _read_ancestors(
|
||||
self,
|
||||
parent_id: str | None,
|
||||
levels: tuple[TraceLevel, ...],
|
||||
ancestor_depth: int | None,
|
||||
) -> list[TraceLogView]:
|
||||
if parent_id is None or ancestor_depth == 0:
|
||||
return []
|
||||
remaining_depth = ancestor_depth
|
||||
ancestors: list[TraceLogView] = []
|
||||
current_trace_id = parent_id
|
||||
while current_trace_id is not None and (remaining_depth is None or remaining_depth > 0):
|
||||
current_parent_id = self._read_parent_id(current_trace_id)
|
||||
ancestors.append(
|
||||
TraceLogView(
|
||||
trace_id=current_trace_id,
|
||||
parent_id=current_parent_id,
|
||||
child_ids=tuple(self._read_child_ids(current_trace_id)),
|
||||
records=tuple(self._read_records(current_trace_id, levels)),
|
||||
)
|
||||
)
|
||||
current_trace_id = current_parent_id
|
||||
if remaining_depth is not None:
|
||||
remaining_depth -= 1
|
||||
ancestors.reverse()
|
||||
return ancestors
|
||||
|
||||
def _trace_exists(self, trace_id: str) -> bool:
|
||||
query = "SELECT 1 FROM trace_contexts WHERE trace_id = %s"
|
||||
with self._connection_factory.connect() as connection:
|
||||
with connection.cursor() as cursor:
|
||||
cursor.execute(query, (trace_id,))
|
||||
return cursor.fetchone() is not None
|
||||
|
||||
def _read_parent_id(self, trace_id: str) -> str | None:
|
||||
query = "SELECT parent_id FROM trace_contexts WHERE trace_id = %s"
|
||||
with self._connection_factory.connect() as connection:
|
||||
with connection.cursor() as cursor:
|
||||
cursor.execute(query, (trace_id,))
|
||||
row = cursor.fetchone()
|
||||
if row is None:
|
||||
return None
|
||||
return self._string_or_none(row.get("parent_id"))
|
||||
|
||||
def _read_records(self, trace_id: str, levels: tuple[TraceLevel, ...]) -> list[TraceLogRecord]:
|
||||
placeholders = ", ".join(["%s"] * len(levels))
|
||||
query = f"""
|
||||
SELECT id, trace_id, event_time, step, status, level, message, attrs_json
|
||||
FROM trace_messages
|
||||
WHERE trace_id = %s AND level IN ({placeholders})
|
||||
ORDER BY event_time ASC, id ASC
|
||||
"""
|
||||
params: tuple[object, ...] = (trace_id, *levels)
|
||||
with self._connection_factory.connect() as connection:
|
||||
with connection.cursor() as cursor:
|
||||
cursor.execute(query, params)
|
||||
rows = cursor.fetchall()
|
||||
return [self._build_record(row) for row in rows]
|
||||
|
||||
def _read_child_ids(self, trace_id: str) -> list[str]:
|
||||
query = """
|
||||
SELECT trace_id
|
||||
FROM trace_contexts
|
||||
WHERE parent_id = %s
|
||||
ORDER BY event_time ASC, trace_id ASC
|
||||
"""
|
||||
with self._connection_factory.connect() as connection:
|
||||
with connection.cursor() as cursor:
|
||||
cursor.execute(query, (trace_id,))
|
||||
rows = cursor.fetchall()
|
||||
return [str(row["trace_id"]) for row in rows]
|
||||
|
||||
def _build_record(self, row: dict[str, Any]) -> TraceLogRecord:
|
||||
return TraceLogRecord(
|
||||
id=int(row["id"]),
|
||||
trace_id=str(row["trace_id"]),
|
||||
event_time=row["event_time"],
|
||||
step=str(row["step"] or ""),
|
||||
status=str(row["status"] or ""),
|
||||
level=str(row["level"]),
|
||||
message=str(row["message"] or ""),
|
||||
attrs_json=self._load_json(row.get("attrs_json")),
|
||||
)
|
||||
|
||||
def _load_json(self, raw_value: Any) -> Any:
|
||||
if raw_value is None or isinstance(raw_value, (dict, list, int, float, bool)):
|
||||
return raw_value
|
||||
if isinstance(raw_value, (bytes, bytearray)):
|
||||
raw_value = raw_value.decode("utf-8")
|
||||
if isinstance(raw_value, str):
|
||||
try:
|
||||
return json.loads(raw_value)
|
||||
except json.JSONDecodeError:
|
||||
return raw_value
|
||||
return raw_value
|
||||
|
||||
def _string_or_none(self, value: Any) -> str | None:
|
||||
if value is None:
|
||||
return None
|
||||
text = str(value)
|
||||
return text or None
|
||||
|
||||
|
||||
def build_trace_log_reader(transport: TraceTransport) -> TraceLogReader | None:
|
||||
if isinstance(transport, MySqlTraceTransport):
|
||||
return MySqlTraceLogReader(transport.create_connection_factory())
|
||||
return None
|
||||
@@ -15,7 +15,7 @@ class NoOpTraceTransport(TraceTransport):
|
||||
del record
|
||||
|
||||
|
||||
class MySqlTraceTransport(TraceTransport):
|
||||
class MySqlTraceConnectionFactory:
|
||||
def __init__(
|
||||
self,
|
||||
*,
|
||||
@@ -31,6 +31,39 @@ class MySqlTraceTransport(TraceTransport):
|
||||
self._user = user
|
||||
self._password = password
|
||||
|
||||
def connect(self): # type: ignore[no-untyped-def]
|
||||
import pymysql
|
||||
|
||||
return pymysql.connect(
|
||||
host=self._host,
|
||||
port=self._port,
|
||||
user=self._user,
|
||||
password=self._password,
|
||||
database=self._database,
|
||||
charset="utf8mb4",
|
||||
autocommit=True,
|
||||
cursorclass=pymysql.cursors.DictCursor,
|
||||
)
|
||||
|
||||
|
||||
class MySqlTraceTransport(TraceTransport):
|
||||
def __init__(
|
||||
self,
|
||||
*,
|
||||
host: str,
|
||||
port: int,
|
||||
database: str,
|
||||
user: str,
|
||||
password: str,
|
||||
) -> None:
|
||||
self._connections = MySqlTraceConnectionFactory(
|
||||
host=host,
|
||||
port=port,
|
||||
database=database,
|
||||
user=user,
|
||||
password=password,
|
||||
)
|
||||
|
||||
def write_context(self, record: TraceContextRecord) -> None:
|
||||
query = """
|
||||
INSERT INTO trace_contexts (trace_id, parent_id, alias, type, event_time, attrs_json)
|
||||
@@ -69,21 +102,13 @@ class MySqlTraceTransport(TraceTransport):
|
||||
self._execute(query, params)
|
||||
|
||||
def _execute(self, query: str, params: tuple[object, ...]) -> None:
|
||||
import pymysql
|
||||
|
||||
with pymysql.connect(
|
||||
host=self._host,
|
||||
port=self._port,
|
||||
user=self._user,
|
||||
password=self._password,
|
||||
database=self._database,
|
||||
charset="utf8mb4",
|
||||
autocommit=True,
|
||||
cursorclass=pymysql.cursors.DictCursor,
|
||||
) as connection:
|
||||
with self._connections.connect() as connection:
|
||||
with connection.cursor() as cursor:
|
||||
cursor.execute(query, params)
|
||||
|
||||
def create_connection_factory(self) -> MySqlTraceConnectionFactory:
|
||||
return self._connections
|
||||
|
||||
def _dumps(self, payload: dict[str, object]) -> str:
|
||||
return json.dumps(payload, ensure_ascii=False, default=self._json_default)
|
||||
|
||||
|
||||
@@ -15,6 +15,14 @@ from plba.contracts import (
|
||||
)
|
||||
from plba.core import ConfigurationManager, RuntimeManager, ServiceContainer
|
||||
from plba.health import HealthRegistry
|
||||
from plba.http import (
|
||||
ApplicationHttpChannel,
|
||||
ApplicationHttpService,
|
||||
HttpApplicationAppFactory,
|
||||
HttpApplicationChannel,
|
||||
HttpRouteRegistrar,
|
||||
UnifiedHttpService,
|
||||
)
|
||||
from plba.logging import LogManager
|
||||
from plba.queue import InMemoryTaskQueue
|
||||
from plba.tracing import MySqlTraceTransport, NoOpTraceTransport, TraceService
|
||||
@@ -43,7 +51,13 @@ __all__ = [
|
||||
"FileConfigProvider",
|
||||
"HealthContributor",
|
||||
"HealthRegistry",
|
||||
"ApplicationHttpChannel",
|
||||
"ApplicationHttpService",
|
||||
"HttpApplicationAppFactory",
|
||||
"HttpApplicationChannel",
|
||||
"HttpRouteRegistrar",
|
||||
"HttpControlChannel",
|
||||
"UnifiedHttpService",
|
||||
"InMemoryTaskQueue",
|
||||
"LogManager",
|
||||
"MySqlTraceTransport",
|
||||
|
||||
@@ -3,6 +3,7 @@ from __future__ import annotations
|
||||
from app_runtime.control.http_channel import HttpControlChannel
|
||||
from app_runtime.core.runtime import RuntimeManager
|
||||
from app_runtime.contracts.application import ApplicationModule
|
||||
from app_runtime.http.http_channel import HttpApplicationChannel
|
||||
|
||||
|
||||
def create_runtime(
|
||||
@@ -13,6 +14,9 @@ def create_runtime(
|
||||
control_host: str = "127.0.0.1",
|
||||
control_port: int = 8080,
|
||||
control_timeout: int = 5,
|
||||
application_host: str | None = None,
|
||||
application_port: int = 15000,
|
||||
application_timeout: int = 5,
|
||||
) -> RuntimeManager:
|
||||
runtime = RuntimeManager()
|
||||
if config_path is not None:
|
||||
@@ -25,5 +29,13 @@ def create_runtime(
|
||||
timeout=control_timeout,
|
||||
)
|
||||
)
|
||||
if application_host is not None:
|
||||
runtime.application_http.register_channel(
|
||||
HttpApplicationChannel(
|
||||
host=application_host,
|
||||
port=application_port,
|
||||
timeout=application_timeout,
|
||||
)
|
||||
)
|
||||
runtime.register_module(module)
|
||||
return runtime
|
||||
|
||||
@@ -0,0 +1,14 @@
|
||||
from app_runtime.http.base import ApplicationHttpChannel, HttpRouteRegistrar
|
||||
from app_runtime.http.http_app import HttpApplicationAppFactory
|
||||
from app_runtime.http.http_channel import HttpApplicationChannel
|
||||
from app_runtime.http.service import ApplicationHttpService
|
||||
from app_runtime.http.unified_service import UnifiedHttpService
|
||||
|
||||
__all__ = [
|
||||
"ApplicationHttpChannel",
|
||||
"ApplicationHttpService",
|
||||
"HttpApplicationAppFactory",
|
||||
"HttpApplicationChannel",
|
||||
"HttpRouteRegistrar",
|
||||
"UnifiedHttpService",
|
||||
]
|
||||
@@ -0,0 +1,234 @@
|
||||
from __future__ import annotations
|
||||
|
||||
import http.client
|
||||
from dataclasses import dataclass
|
||||
from pathlib import Path
|
||||
|
||||
from fastapi import FastAPI, File, UploadFile
|
||||
from fastapi.responses import FileResponse
|
||||
from fastapi.testclient import TestClient
|
||||
import pytest
|
||||
|
||||
from app_runtime.contracts.application import ApplicationModule
|
||||
from app_runtime.control.http_channel import HttpControlChannel
|
||||
from app_runtime.core.registration import ModuleRegistry
|
||||
from app_runtime.core.runtime import RuntimeManager
|
||||
from app_runtime.http.base import ApplicationHttpChannel
|
||||
from app_runtime.http.http_channel import HttpApplicationChannel
|
||||
from app_runtime.http.unified_service import UnifiedHttpService
|
||||
|
||||
try:
|
||||
import python_multipart # noqa: F401
|
||||
except ImportError:
|
||||
HAS_MULTIPART = False
|
||||
else:
|
||||
HAS_MULTIPART = True
|
||||
|
||||
|
||||
class RecordingChannel(ApplicationHttpChannel):
|
||||
def __init__(self) -> None:
|
||||
self.apps: list[FastAPI] = []
|
||||
self.stop_calls = 0
|
||||
|
||||
async def start(self, app: FastAPI) -> None:
|
||||
self.apps.append(app)
|
||||
|
||||
async def stop(self) -> None:
|
||||
self.stop_calls += 1
|
||||
|
||||
|
||||
class PingRoutes:
|
||||
def register(self, app: FastAPI, services) -> None: # type: ignore[no-untyped-def]
|
||||
@app.get("/estimate/ping")
|
||||
async def ping() -> dict[str, str]:
|
||||
return {"status": "ok"}
|
||||
|
||||
|
||||
@dataclass
|
||||
class ServiceBackedRoutes:
|
||||
download_path: Path
|
||||
|
||||
def register(self, app: FastAPI, services) -> None: # type: ignore[no-untyped-def]
|
||||
marker = services.get("task_query_service")
|
||||
|
||||
@app.get("/estimate/api/tasks")
|
||||
async def list_tasks() -> dict[str, object]:
|
||||
return {"marker": marker["marker"]}
|
||||
|
||||
@app.post("/estimate/api/tasks")
|
||||
async def create_task(file: UploadFile = File(...)) -> dict[str, object]:
|
||||
payload = await file.read()
|
||||
return {"filename": file.filename, "size": len(payload)}
|
||||
|
||||
@app.get("/estimate/api/tasks/result")
|
||||
async def download_result() -> FileResponse:
|
||||
return FileResponse(self.download_path, filename=self.download_path.name)
|
||||
|
||||
|
||||
class MetricsRoutes:
|
||||
def register(self, app: FastAPI, services) -> None: # type: ignore[no-untyped-def]
|
||||
@app.get("/estimate/api/metrics")
|
||||
async def metrics() -> dict[str, int]:
|
||||
return {"count": 1}
|
||||
|
||||
|
||||
class HttpModule(ApplicationModule):
|
||||
def __init__(self, *registrars: object) -> None:
|
||||
self._registrars = registrars
|
||||
|
||||
@property
|
||||
def name(self) -> str:
|
||||
return "http-module"
|
||||
|
||||
def register(self, registry: ModuleRegistry) -> None:
|
||||
for registrar in self._registrars:
|
||||
registry.add_http_routes(registrar)
|
||||
|
||||
|
||||
def _application_client(channel: RecordingChannel) -> TestClient:
|
||||
assert channel.apps
|
||||
return TestClient(channel.apps[0])
|
||||
|
||||
|
||||
def _http_request(port: int, path: str) -> tuple[int, bytes]:
|
||||
connection = http.client.HTTPConnection("127.0.0.1", port, timeout=2)
|
||||
try:
|
||||
connection.request("GET", path)
|
||||
response = connection.getresponse()
|
||||
payload = response.read()
|
||||
return response.status, payload
|
||||
finally:
|
||||
connection.close()
|
||||
|
||||
|
||||
def test_runtime_starts_application_http_and_registers_routes() -> None:
|
||||
runtime = RuntimeManager()
|
||||
channel = RecordingChannel()
|
||||
runtime.application_http.register_channel(channel)
|
||||
runtime.register_module(HttpModule(PingRoutes()))
|
||||
|
||||
runtime.start(start_control_plane=False)
|
||||
try:
|
||||
assert len(channel.apps) == 1
|
||||
client = _application_client(channel)
|
||||
with client:
|
||||
response = client.get("/estimate/ping")
|
||||
assert response.status_code == 200
|
||||
assert response.json() == {"status": "ok"}
|
||||
assert response.headers["x-response-time-ms"].isdigit()
|
||||
finally:
|
||||
runtime.stop(stop_control_plane=False)
|
||||
|
||||
assert channel.stop_calls == 1
|
||||
|
||||
|
||||
def test_application_routes_see_runtime_services_and_support_upload_download(tmp_path: Path) -> None:
|
||||
if not HAS_MULTIPART:
|
||||
pytest.skip("python-multipart is not installed in the local environment")
|
||||
|
||||
runtime = RuntimeManager()
|
||||
runtime.services.register("task_query_service", {"marker": "from-container"})
|
||||
result_path = tmp_path / "result.txt"
|
||||
result_path.write_text("ready", encoding="utf-8")
|
||||
|
||||
channel = RecordingChannel()
|
||||
runtime.application_http.register_channel(channel)
|
||||
runtime.register_module(HttpModule(ServiceBackedRoutes(result_path), MetricsRoutes()))
|
||||
runtime.start(start_control_plane=False)
|
||||
client = _application_client(channel)
|
||||
|
||||
try:
|
||||
with client:
|
||||
list_response = client.get("/estimate/api/tasks")
|
||||
assert list_response.status_code == 200
|
||||
assert list_response.json() == {"marker": "from-container"}
|
||||
|
||||
upload_response = client.post(
|
||||
"/estimate/api/tasks",
|
||||
files={"file": ("input.txt", b"payload", "text/plain")},
|
||||
)
|
||||
assert upload_response.status_code == 200
|
||||
assert upload_response.json() == {"filename": "input.txt", "size": 7}
|
||||
|
||||
metrics_response = client.get("/estimate/api/metrics")
|
||||
assert metrics_response.status_code == 200
|
||||
assert metrics_response.json() == {"count": 1}
|
||||
|
||||
download_response = client.get("/estimate/api/tasks/result")
|
||||
assert download_response.status_code == 200
|
||||
assert download_response.content == b"ready"
|
||||
finally:
|
||||
runtime.stop(stop_control_plane=False)
|
||||
|
||||
|
||||
def test_application_http_stop_shuts_down_real_server() -> None:
|
||||
runtime = RuntimeManager()
|
||||
channel = HttpApplicationChannel(host="127.0.0.1", port=0, timeout=2)
|
||||
runtime.application_http.register_channel(channel)
|
||||
runtime.register_module(HttpModule(PingRoutes()))
|
||||
runtime.start(start_control_plane=False)
|
||||
|
||||
try:
|
||||
status, _ = _http_request(channel.port, "/estimate/ping")
|
||||
assert status == 200
|
||||
finally:
|
||||
runtime.stop(stop_control_plane=False)
|
||||
|
||||
try:
|
||||
_http_request(channel.port, "/estimate/ping")
|
||||
except OSError:
|
||||
pass
|
||||
else:
|
||||
raise AssertionError("application HTTP server is still reachable after stop")
|
||||
|
||||
|
||||
def test_control_plane_and_application_http_work_independently() -> None:
|
||||
runtime = RuntimeManager()
|
||||
control_channel = HttpControlChannel(host="127.0.0.1", port=0, timeout=2)
|
||||
app_channel = HttpApplicationChannel(host="127.0.0.1", port=0, timeout=2)
|
||||
runtime.control_plane.register_channel(control_channel)
|
||||
runtime.application_http.register_channel(app_channel)
|
||||
runtime.register_module(HttpModule(PingRoutes()))
|
||||
runtime.start()
|
||||
|
||||
try:
|
||||
control_status, _ = _http_request(control_channel.port, "/health")
|
||||
app_status, _ = _http_request(app_channel.port, "/estimate/ping")
|
||||
assert control_status == 200
|
||||
assert app_status == 200
|
||||
|
||||
control_missing_status, _ = _http_request(control_channel.port, "/estimate/ping")
|
||||
app_missing_status, _ = _http_request(app_channel.port, "/health")
|
||||
assert control_missing_status == 404
|
||||
assert app_missing_status == 404
|
||||
|
||||
runtime.application_http.stop()
|
||||
control_status, _ = _http_request(control_channel.port, "/health")
|
||||
assert control_status == 200
|
||||
finally:
|
||||
runtime.control_plane.stop()
|
||||
runtime.stop(stop_control_plane=False)
|
||||
|
||||
|
||||
def test_unified_http_serves_control_and_application_routes_on_one_app() -> None:
|
||||
runtime = RuntimeManager(application_http=UnifiedHttpService())
|
||||
channel = RecordingChannel()
|
||||
runtime.application_http.register_channel(channel)
|
||||
runtime.register_module(HttpModule(PingRoutes()))
|
||||
runtime.start(start_control_plane=False)
|
||||
|
||||
try:
|
||||
client = _application_client(channel)
|
||||
with client:
|
||||
health_response = client.get("/health")
|
||||
action_response = client.get("/actions/status")
|
||||
app_response = client.get("/estimate/ping")
|
||||
|
||||
assert health_response.status_code == 200
|
||||
assert health_response.json()["status"] == "ok"
|
||||
assert action_response.status_code == 200
|
||||
assert action_response.json() == {"status": "ok", "detail": "idle"}
|
||||
assert app_response.status_code == 200
|
||||
assert app_response.json() == {"status": "ok"}
|
||||
finally:
|
||||
runtime.stop(stop_control_plane=False)
|
||||
@@ -0,0 +1,738 @@
|
||||
from __future__ import annotations
|
||||
|
||||
import asyncio
|
||||
from datetime import datetime, timezone
|
||||
|
||||
from fastapi.responses import JSONResponse
|
||||
from fastapi.testclient import TestClient
|
||||
|
||||
import app_runtime.core.runtime as runtime_module
|
||||
from app_runtime.control.base import ControlActionRequest, TraceQueryRequest
|
||||
from app_runtime.control.http_app import HttpControlAppFactory
|
||||
from app_runtime.contracts.trace import TraceLogRecord, TraceLogView
|
||||
from app_runtime.core.runtime import RuntimeManager
|
||||
from app_runtime.tracing.reader import MySqlTraceLogReader
|
||||
|
||||
|
||||
def _trace_record(
|
||||
*,
|
||||
row_id: int,
|
||||
level: str,
|
||||
message: str,
|
||||
step: str = "process",
|
||||
status: str = "failed",
|
||||
attrs_json: object | None = None,
|
||||
) -> TraceLogRecord:
|
||||
return TraceLogRecord(
|
||||
id=row_id,
|
||||
trace_id="trace-1",
|
||||
event_time=datetime(2026, 4, 28, 10, 11, 12, tzinfo=timezone.utc),
|
||||
step=step,
|
||||
status=status,
|
||||
level=level, # type: ignore[arg-type]
|
||||
message=message,
|
||||
attrs_json=attrs_json if attrs_json is not None else {},
|
||||
)
|
||||
|
||||
|
||||
def _build_client(trace_provider=None) -> TestClient:
|
||||
async def health_provider():
|
||||
return {"status": "ok"}
|
||||
|
||||
async def action_provider(_action: str, _client_source: str, _request: ControlActionRequest) -> JSONResponse:
|
||||
return JSONResponse(content={"status": "ok"})
|
||||
|
||||
app = HttpControlAppFactory().create(health_provider, action_provider, trace_provider)
|
||||
return TestClient(app)
|
||||
|
||||
|
||||
def test_trace_endpoint_returns_html_by_default() -> None:
|
||||
captured: list[tuple[str, TraceQueryRequest]] = []
|
||||
|
||||
async def trace_provider(trace_id: str, request: TraceQueryRequest) -> TraceLogView:
|
||||
captured.append((trace_id, request))
|
||||
return TraceLogView(
|
||||
trace_id="trace-1",
|
||||
parent_id="root-trace",
|
||||
child_ids=("child-1", "child-2"),
|
||||
records=(
|
||||
_trace_record(row_id=1, level="ERROR", message="first error"),
|
||||
_trace_record(row_id=2, level="WARNING", message="second warning"),
|
||||
),
|
||||
)
|
||||
|
||||
client = _build_client(trace_provider)
|
||||
try:
|
||||
response = client.get("/traces/trace-1")
|
||||
finally:
|
||||
client.close()
|
||||
|
||||
assert response.status_code == 200
|
||||
assert response.headers["content-type"].startswith("text/html")
|
||||
assert "trace_id:" in response.text
|
||||
assert "first error" in response.text
|
||||
assert "second warning" in response.text
|
||||
assert captured == [
|
||||
(
|
||||
"trace-1",
|
||||
TraceQueryRequest(
|
||||
levels=("ERROR", "WARNING", "INFO"),
|
||||
include_attrs_json=False,
|
||||
response_format="html",
|
||||
ancestor_depth=0,
|
||||
),
|
||||
)
|
||||
]
|
||||
|
||||
|
||||
def test_trace_endpoint_returns_text_when_requested() -> None:
|
||||
async def trace_provider(_trace_id: str, _request: TraceQueryRequest) -> TraceLogView:
|
||||
return TraceLogView(
|
||||
trace_id="trace-1",
|
||||
parent_id="root-trace",
|
||||
child_ids=("child-1", "child-2"),
|
||||
records=(
|
||||
_trace_record(row_id=1, level="ERROR", message="first error"),
|
||||
_trace_record(row_id=2, level="WARNING", message="second warning"),
|
||||
),
|
||||
)
|
||||
|
||||
client = _build_client(trace_provider)
|
||||
try:
|
||||
response = client.get("/traces/trace-1?format=text")
|
||||
finally:
|
||||
client.close()
|
||||
|
||||
assert response.status_code == 200
|
||||
assert response.text == (
|
||||
"trace_id: trace-1\n"
|
||||
"parent_id: root-trace\n"
|
||||
"child_ids:\n"
|
||||
" - child-1\n"
|
||||
" - child-2\n"
|
||||
"\n"
|
||||
"==============================\n"
|
||||
"trace_id: trace-1\n"
|
||||
"\n"
|
||||
"step: process\n"
|
||||
"first error\n"
|
||||
"second warning"
|
||||
)
|
||||
|
||||
|
||||
def test_trace_endpoint_appends_attrs_json_in_text_mode() -> None:
|
||||
async def trace_provider(_trace_id: str, _request: TraceQueryRequest) -> TraceLogView:
|
||||
return TraceLogView(
|
||||
trace_id="trace-1",
|
||||
parent_id=None,
|
||||
child_ids=(),
|
||||
records=(
|
||||
_trace_record(row_id=1, level="ERROR", message="failure", attrs_json={"attempt": 2, "source": "crm"}),
|
||||
),
|
||||
)
|
||||
|
||||
client = _build_client(trace_provider)
|
||||
try:
|
||||
response = client.get("/traces/trace-1?format=text&attrs_json=true")
|
||||
finally:
|
||||
client.close()
|
||||
|
||||
assert response.status_code == 200
|
||||
assert response.text == (
|
||||
"trace_id: trace-1\n"
|
||||
"parent_id: \n"
|
||||
"child_ids:\n"
|
||||
"\n"
|
||||
"==============================\n"
|
||||
"trace_id: trace-1\n"
|
||||
"\n"
|
||||
"step: process\n"
|
||||
'failure, {"attempt":2,"source":"crm"}'
|
||||
)
|
||||
|
||||
|
||||
def test_trace_endpoint_separates_messages_by_step_in_text_mode() -> None:
|
||||
async def trace_provider(_trace_id: str, _request: TraceQueryRequest) -> TraceLogView:
|
||||
return TraceLogView(
|
||||
trace_id="trace-1",
|
||||
parent_id=None,
|
||||
child_ids=(),
|
||||
records=(
|
||||
_trace_record(row_id=1, level="INFO", message="load first", step="load_stocks"),
|
||||
_trace_record(row_id=2, level="INFO", message="load second", step="load_stocks"),
|
||||
_trace_record(row_id=3, level="INFO", message="filter first", step="filter_stocks"),
|
||||
),
|
||||
)
|
||||
|
||||
client = _build_client(trace_provider)
|
||||
try:
|
||||
response = client.get("/traces/trace-1?format=text")
|
||||
finally:
|
||||
client.close()
|
||||
|
||||
assert response.status_code == 200
|
||||
assert response.text == (
|
||||
"trace_id: trace-1\n"
|
||||
"parent_id: \n"
|
||||
"child_ids:\n"
|
||||
"\n"
|
||||
"==============================\n"
|
||||
"trace_id: trace-1\n"
|
||||
"\n"
|
||||
"step: load_stocks\n"
|
||||
"load first\n"
|
||||
"load second\n"
|
||||
"------------------------------\n"
|
||||
"step: filter_stocks\n"
|
||||
"filter first"
|
||||
)
|
||||
|
||||
|
||||
def test_trace_endpoint_returns_json_payload() -> None:
|
||||
async def trace_provider(_trace_id: str, _request: TraceQueryRequest) -> TraceLogView:
|
||||
return TraceLogView(
|
||||
trace_id="trace-1",
|
||||
parent_id="parent-1",
|
||||
child_ids=("child-1",),
|
||||
records=(
|
||||
_trace_record(row_id=3, level="INFO", message="done", attrs_json={"batch": 7}),
|
||||
),
|
||||
)
|
||||
|
||||
client = _build_client(trace_provider)
|
||||
try:
|
||||
response = client.get("/traces/trace-1?format=json&attrs_json=true&levels=info")
|
||||
finally:
|
||||
client.close()
|
||||
|
||||
assert response.status_code == 200
|
||||
assert response.json() == {
|
||||
"trace_id": "trace-1",
|
||||
"parent_id": "parent-1",
|
||||
"child_ids": ["child-1"],
|
||||
"messages": [
|
||||
{
|
||||
"id": 3,
|
||||
"trace_id": "trace-1",
|
||||
"event_time": "2026-04-28T10:11:12+00:00",
|
||||
"step": "process",
|
||||
"status": "failed",
|
||||
"level": "INFO",
|
||||
"message": "done",
|
||||
"attrs_json": {"batch": 7},
|
||||
}
|
||||
],
|
||||
"ancestors": [],
|
||||
}
|
||||
|
||||
|
||||
def test_trace_endpoint_returns_json_payload_with_ancestors() -> None:
|
||||
async def trace_provider(_trace_id: str, _request: TraceQueryRequest) -> TraceLogView:
|
||||
return TraceLogView(
|
||||
trace_id="trace-1",
|
||||
parent_id="parent-1",
|
||||
child_ids=("child-1",),
|
||||
records=(
|
||||
_trace_record(row_id=3, level="INFO", message="done", attrs_json={"batch": 7}),
|
||||
),
|
||||
ancestors=(
|
||||
TraceLogView(
|
||||
trace_id="root-1",
|
||||
parent_id=None,
|
||||
child_ids=("parent-1",),
|
||||
records=(
|
||||
_trace_record(row_id=4, level="INFO", message="root info"),
|
||||
),
|
||||
),
|
||||
TraceLogView(
|
||||
trace_id="parent-1",
|
||||
parent_id="root-1",
|
||||
child_ids=("trace-1", "sibling-1"),
|
||||
records=(
|
||||
_trace_record(row_id=5, level="WARNING", message="parent warning"),
|
||||
),
|
||||
),
|
||||
),
|
||||
)
|
||||
|
||||
client = _build_client(trace_provider)
|
||||
try:
|
||||
response = client.get("/traces/trace-1?format=json&ancestor_depth=1")
|
||||
finally:
|
||||
client.close()
|
||||
|
||||
assert response.status_code == 200
|
||||
assert response.json()["ancestors"] == [
|
||||
{
|
||||
"trace_id": "root-1",
|
||||
"parent_id": "",
|
||||
"child_ids": ["parent-1"],
|
||||
"messages": [
|
||||
{
|
||||
"id": 4,
|
||||
"trace_id": "trace-1",
|
||||
"event_time": "2026-04-28T10:11:12+00:00",
|
||||
"step": "process",
|
||||
"status": "failed",
|
||||
"level": "INFO",
|
||||
"message": "root info",
|
||||
}
|
||||
],
|
||||
},
|
||||
{
|
||||
"trace_id": "parent-1",
|
||||
"parent_id": "root-1",
|
||||
"child_ids": ["trace-1", "sibling-1"],
|
||||
"messages": [
|
||||
{
|
||||
"id": 5,
|
||||
"trace_id": "trace-1",
|
||||
"event_time": "2026-04-28T10:11:12+00:00",
|
||||
"step": "process",
|
||||
"status": "failed",
|
||||
"level": "WARNING",
|
||||
"message": "parent warning",
|
||||
}
|
||||
],
|
||||
}
|
||||
]
|
||||
|
||||
|
||||
def test_trace_endpoint_returns_html_page_with_related_links() -> None:
|
||||
async def trace_provider(_trace_id: str, _request: TraceQueryRequest) -> TraceLogView:
|
||||
return TraceLogView(
|
||||
trace_id="trace-1",
|
||||
parent_id="parent-1",
|
||||
child_ids=("child-1", "child-2"),
|
||||
records=(
|
||||
_trace_record(row_id=1, level="INFO", message="loaded prices", step="load_stocks", status="ok"),
|
||||
_trace_record(row_id=2, level="WARNING", message="filtered suspicious ticker", step="filter_stocks", status="degraded"),
|
||||
),
|
||||
)
|
||||
|
||||
client = _build_client(trace_provider)
|
||||
try:
|
||||
response = client.get("/traces/trace-1?format=html&attrs_json=true")
|
||||
finally:
|
||||
client.close()
|
||||
|
||||
assert response.status_code == 200
|
||||
assert response.headers["content-type"].startswith("text/html")
|
||||
assert "background: var(--bg);" in response.text
|
||||
assert "--bg: #000000;" in response.text
|
||||
assert "--fg: #ececec;" in response.text
|
||||
assert "--step: #ffffff;" in response.text
|
||||
assert "--info: #d6d7d9;" in response.text
|
||||
assert "--warning: #e9ebec;" in response.text
|
||||
assert "--error: #ff817d;" in response.text
|
||||
assert "--other: #ececec;" in response.text
|
||||
assert 'font: 13px/1.1 "SFMono-Regular", monospace;' in response.text
|
||||
assert '<div class="line">trace_id: <a href="/traces/trace-1?format=html&levels=ERROR%2CWARNING%2CINFO&attrs_json=true">trace-1</a></div>' in response.text
|
||||
assert '<div class="line">parent_id: <a href="/traces/parent-1?format=html&levels=ERROR%2CWARNING%2CINFO&attrs_json=true">parent-1</a></div>' in response.text
|
||||
assert '<div class="line">child_ids:</div>' in response.text
|
||||
assert '<div class="line"> - <a href="/traces/child-1?format=html&levels=ERROR%2CWARNING%2CINFO&attrs_json=true">child-1</a></div>' in response.text
|
||||
assert '<div class="line"> - <a href="/traces/child-2?format=html&levels=ERROR%2CWARNING%2CINFO&attrs_json=true">child-2</a></div>' in response.text
|
||||
assert '<div class="line">==============================</div>' in response.text
|
||||
assert '<div class="line" style="color: var(--step);">load_stocks</div>' in response.text
|
||||
assert '<div class="line">trace_id: <a href="/traces/trace-1?format=html&levels=ERROR%2CWARNING%2CINFO&attrs_json=true">trace-1</a></div>' in response.text
|
||||
assert '<div class="line" style="color: var(--step);">filter_stocks</div>' in response.text
|
||||
assert "loaded prices" in response.text
|
||||
assert "filtered suspicious ticker" in response.text
|
||||
assert "2026-04-28T10:11:12+00:00 | INFO | ok" not in response.text
|
||||
assert "2026-04-28T10:11:12+00:00 | WARNING | degraded" not in response.text
|
||||
assert "Related Traces" not in response.text
|
||||
|
||||
|
||||
def test_trace_endpoint_renders_ancestors_in_text_mode() -> None:
|
||||
async def trace_provider(_trace_id: str, _request: TraceQueryRequest) -> TraceLogView:
|
||||
return TraceLogView(
|
||||
trace_id="trace-1",
|
||||
parent_id="parent-1",
|
||||
child_ids=(),
|
||||
records=(_trace_record(row_id=1, level="INFO", message="child message"),),
|
||||
ancestors=(
|
||||
TraceLogView(
|
||||
trace_id="root-1",
|
||||
parent_id=None,
|
||||
child_ids=("parent-1",),
|
||||
records=(_trace_record(row_id=2, level="INFO", message="root message"),),
|
||||
),
|
||||
TraceLogView(
|
||||
trace_id="parent-1",
|
||||
parent_id="root-1",
|
||||
child_ids=("trace-1",),
|
||||
records=(_trace_record(row_id=3, level="WARNING", message="parent message"),),
|
||||
),
|
||||
),
|
||||
)
|
||||
|
||||
client = _build_client(trace_provider)
|
||||
try:
|
||||
response = client.get("/traces/trace-1?format=text&ancestor_depth=1")
|
||||
finally:
|
||||
client.close()
|
||||
|
||||
assert response.status_code == 200
|
||||
assert response.text == (
|
||||
"trace_id: trace-1\n"
|
||||
"parent_id: parent-1\n"
|
||||
"child_ids:\n"
|
||||
"\n"
|
||||
"==============================\n"
|
||||
"trace_id: root-1\n"
|
||||
"\n"
|
||||
"step: process\n"
|
||||
"root message\n"
|
||||
"\n"
|
||||
"\n"
|
||||
"==============================\n"
|
||||
"trace_id: parent-1\n"
|
||||
"\n"
|
||||
"step: process\n"
|
||||
"parent message\n"
|
||||
"\n"
|
||||
"\n"
|
||||
"==============================\n"
|
||||
"trace_id: trace-1\n"
|
||||
"\n"
|
||||
"step: process\n"
|
||||
"child message"
|
||||
)
|
||||
|
||||
|
||||
def test_trace_endpoint_preserves_ancestor_depth_in_html_links() -> None:
|
||||
async def trace_provider(_trace_id: str, _request: TraceQueryRequest) -> TraceLogView:
|
||||
return TraceLogView(
|
||||
trace_id="trace-1",
|
||||
parent_id="parent-1",
|
||||
child_ids=("child-1",),
|
||||
records=(_trace_record(row_id=1, level="INFO", message="loaded prices"),),
|
||||
ancestors=(
|
||||
TraceLogView(
|
||||
trace_id="root-1",
|
||||
parent_id=None,
|
||||
child_ids=("parent-1",),
|
||||
records=(_trace_record(row_id=2, level="INFO", message="root info"),),
|
||||
),
|
||||
TraceLogView(
|
||||
trace_id="parent-1",
|
||||
parent_id="root-1",
|
||||
child_ids=("trace-1",),
|
||||
records=(_trace_record(row_id=3, level="WARNING", message="parent warning"),),
|
||||
),
|
||||
),
|
||||
)
|
||||
|
||||
client = _build_client(trace_provider)
|
||||
try:
|
||||
response = client.get("/traces/trace-1?format=html&attrs_json=true&ancestor_depth=all")
|
||||
finally:
|
||||
client.close()
|
||||
|
||||
assert response.status_code == 200
|
||||
assert 'href="/traces/trace-1?format=html&levels=ERROR%2CWARNING%2CINFO&attrs_json=true&ancestor_depth=all"' in response.text
|
||||
assert 'href="/traces/root-1?format=html&levels=ERROR%2CWARNING%2CINFO&attrs_json=true&ancestor_depth=all"' in response.text
|
||||
assert 'href="/traces/parent-1?format=html&levels=ERROR%2CWARNING%2CINFO&attrs_json=true&ancestor_depth=all"' in response.text
|
||||
assert response.text.index("root info") < response.text.index("parent warning") < response.text.index("loaded prices")
|
||||
assert "root info" in response.text
|
||||
assert "parent warning" in response.text
|
||||
|
||||
|
||||
def test_trace_endpoint_validates_query_params() -> None:
|
||||
client = _build_client(lambda _trace_id, _request: None)
|
||||
try:
|
||||
invalid_level = client.get("/traces/trace-1?levels=error,fatal")
|
||||
invalid_format = client.get("/traces/trace-1?format=xml")
|
||||
invalid_ancestor_depth = client.get("/traces/trace-1?ancestor_depth=-1")
|
||||
invalid_ancestor_type = client.get("/traces/trace-1?ancestor_depth=up")
|
||||
finally:
|
||||
client.close()
|
||||
|
||||
assert invalid_level.status_code == 400
|
||||
assert invalid_level.json() == {"status": "error", "detail": "unsupported trace levels: FATAL"}
|
||||
assert invalid_format.status_code == 400
|
||||
assert invalid_format.json() == {"status": "error", "detail": "unsupported trace format: xml"}
|
||||
assert invalid_ancestor_depth.status_code == 400
|
||||
assert invalid_ancestor_depth.json() == {
|
||||
"status": "error",
|
||||
"detail": "query parameter must be >= 0: ancestor_depth=-1",
|
||||
}
|
||||
assert invalid_ancestor_type.status_code == 400
|
||||
assert invalid_ancestor_type.json() == {
|
||||
"status": "error",
|
||||
"detail": "invalid ancestor depth query parameter: ancestor_depth=up",
|
||||
}
|
||||
|
||||
|
||||
def test_runtime_trace_logs_uses_configured_reader(monkeypatch) -> None:
|
||||
expected = TraceLogView(
|
||||
trace_id="trace-1",
|
||||
parent_id="root",
|
||||
child_ids=("child-1",),
|
||||
records=(_trace_record(row_id=1, level="ERROR", message="boom"),),
|
||||
)
|
||||
|
||||
class StubReader:
|
||||
def read_trace(
|
||||
self,
|
||||
trace_id: str,
|
||||
levels: tuple[str, ...],
|
||||
ancestor_depth: int | None = 0,
|
||||
) -> TraceLogView | None:
|
||||
assert trace_id == "trace-1"
|
||||
assert levels == ("ERROR",)
|
||||
assert ancestor_depth is None
|
||||
return expected
|
||||
|
||||
monkeypatch.setattr(runtime_module, "build_trace_log_reader", lambda _transport: StubReader())
|
||||
runtime = RuntimeManager()
|
||||
|
||||
result = asyncio.run(runtime.trace_logs("trace-1", TraceQueryRequest(levels=("ERROR",), ancestor_depth=None)))
|
||||
|
||||
assert result == expected
|
||||
|
||||
|
||||
def test_mysql_trace_log_reader_maps_db_rows() -> None:
|
||||
class FakeCursor:
|
||||
def __init__(self) -> None:
|
||||
self.executed: list[tuple[str, tuple[object, ...]]] = []
|
||||
self._current_query = ""
|
||||
|
||||
def execute(self, query: str, params: tuple[object, ...]) -> None:
|
||||
self.executed.append((query, params))
|
||||
self._current_query = query
|
||||
|
||||
def fetchone(self) -> dict[str, object] | None:
|
||||
if self.executed[-1][1] == ("trace-1",):
|
||||
return {"parent_id": "root-77"}
|
||||
if self.executed[-1][1] == ("root-77",):
|
||||
return {"parent_id": None}
|
||||
return None
|
||||
|
||||
def fetchall(self) -> list[dict[str, object]]:
|
||||
if "WHERE parent_id = %s" in self._current_query:
|
||||
return [{"trace_id": "child-1"}, {"trace_id": "child-2"}]
|
||||
return [
|
||||
{
|
||||
"id": 8,
|
||||
"trace_id": "trace-1",
|
||||
"event_time": datetime(2026, 4, 28, 10, 11, 12, tzinfo=timezone.utc),
|
||||
"step": "parse",
|
||||
"status": "failed",
|
||||
"level": "ERROR",
|
||||
"message": "broken",
|
||||
"attrs_json": '{"attempt":1}',
|
||||
}
|
||||
]
|
||||
|
||||
def __enter__(self) -> FakeCursor:
|
||||
return self
|
||||
|
||||
def __exit__(self, exc_type, exc, tb) -> None:
|
||||
return None
|
||||
|
||||
class FakeConnection:
|
||||
def __init__(self, cursor: FakeCursor) -> None:
|
||||
self._cursor = cursor
|
||||
|
||||
def cursor(self) -> FakeCursor:
|
||||
return self._cursor
|
||||
|
||||
def __enter__(self) -> FakeConnection:
|
||||
return self
|
||||
|
||||
def __exit__(self, exc_type, exc, tb) -> None:
|
||||
return None
|
||||
|
||||
class FakeConnectionFactory:
|
||||
def __init__(self) -> None:
|
||||
self.cursor = FakeCursor()
|
||||
|
||||
def connect(self) -> FakeConnection:
|
||||
return FakeConnection(self.cursor)
|
||||
|
||||
factory = FakeConnectionFactory()
|
||||
reader = MySqlTraceLogReader(factory) # type: ignore[arg-type]
|
||||
|
||||
view = reader.read_trace("trace-1", ("ERROR", "WARNING"))
|
||||
|
||||
assert view == TraceLogView(
|
||||
trace_id="trace-1",
|
||||
parent_id="root-77",
|
||||
child_ids=("child-1", "child-2"),
|
||||
records=(
|
||||
TraceLogRecord(
|
||||
id=8,
|
||||
trace_id="trace-1",
|
||||
event_time=datetime(2026, 4, 28, 10, 11, 12, tzinfo=timezone.utc),
|
||||
step="parse",
|
||||
status="failed",
|
||||
level="ERROR",
|
||||
message="broken",
|
||||
attrs_json={"attempt": 1},
|
||||
),
|
||||
),
|
||||
ancestors=(),
|
||||
)
|
||||
assert len(factory.cursor.executed) == 3
|
||||
assert factory.cursor.executed[1][1] == ("trace-1",)
|
||||
assert factory.cursor.executed[2][1] == ("trace-1", "ERROR", "WARNING")
|
||||
|
||||
|
||||
def test_mysql_trace_log_reader_loads_requested_ancestors() -> None:
|
||||
class FakeCursor:
|
||||
def __init__(self) -> None:
|
||||
self.executed: list[tuple[str, tuple[object, ...]]] = []
|
||||
self._current_query = ""
|
||||
|
||||
def execute(self, query: str, params: tuple[object, ...]) -> None:
|
||||
self.executed.append((query, params))
|
||||
self._current_query = query
|
||||
|
||||
def fetchone(self) -> dict[str, object] | None:
|
||||
if self.executed[-1][1] == ("trace-1",):
|
||||
return {"parent_id": "parent-1"}
|
||||
if self.executed[-1][1] == ("parent-1",):
|
||||
return {"parent_id": "root-1"}
|
||||
if self.executed[-1][1] == ("root-1",):
|
||||
return {"parent_id": None}
|
||||
return None
|
||||
|
||||
def fetchall(self) -> list[dict[str, object]]:
|
||||
if "WHERE parent_id = %s" in self._current_query:
|
||||
parent_id = self.executed[-1][1][0]
|
||||
if parent_id == "trace-1":
|
||||
return []
|
||||
if parent_id == "parent-1":
|
||||
return [{"trace_id": "trace-1"}]
|
||||
if parent_id == "root-1":
|
||||
return [{"trace_id": "parent-1"}]
|
||||
return []
|
||||
trace_id = self.executed[-1][1][0]
|
||||
return [
|
||||
{
|
||||
"id": 8 if trace_id == "trace-1" else 9,
|
||||
"trace_id": trace_id,
|
||||
"event_time": datetime(2026, 4, 28, 10, 11, 12, tzinfo=timezone.utc),
|
||||
"step": "parse",
|
||||
"status": "failed",
|
||||
"level": "ERROR",
|
||||
"message": f"broken:{trace_id}",
|
||||
"attrs_json": '{"attempt":1}',
|
||||
}
|
||||
]
|
||||
|
||||
def __enter__(self) -> FakeCursor:
|
||||
return self
|
||||
|
||||
def __exit__(self, exc_type, exc, tb) -> None:
|
||||
return None
|
||||
|
||||
class FakeConnection:
|
||||
def __init__(self, cursor: FakeCursor) -> None:
|
||||
self._cursor = cursor
|
||||
|
||||
def cursor(self) -> FakeCursor:
|
||||
return self._cursor
|
||||
|
||||
def __enter__(self) -> FakeConnection:
|
||||
return self
|
||||
|
||||
def __exit__(self, exc_type, exc, tb) -> None:
|
||||
return None
|
||||
|
||||
class FakeConnectionFactory:
|
||||
def __init__(self) -> None:
|
||||
self.cursor = FakeCursor()
|
||||
|
||||
def connect(self) -> FakeConnection:
|
||||
return FakeConnection(self.cursor)
|
||||
|
||||
factory = FakeConnectionFactory()
|
||||
reader = MySqlTraceLogReader(factory) # type: ignore[arg-type]
|
||||
|
||||
view = reader.read_trace("trace-1", ("ERROR",), 1)
|
||||
|
||||
assert view is not None
|
||||
assert view.trace_id == "trace-1"
|
||||
assert view.parent_id == "parent-1"
|
||||
assert len(view.ancestors) == 1
|
||||
assert view.ancestors[0].trace_id == "parent-1"
|
||||
assert view.ancestors[0].parent_id == "root-1"
|
||||
assert view.ancestors[0].child_ids == ("trace-1",)
|
||||
|
||||
|
||||
def test_mysql_trace_log_reader_orders_ancestors_root_first() -> None:
|
||||
class FakeCursor:
|
||||
def __init__(self) -> None:
|
||||
self.executed: list[tuple[str, tuple[object, ...]]] = []
|
||||
self._current_query = ""
|
||||
|
||||
def execute(self, query: str, params: tuple[object, ...]) -> None:
|
||||
self.executed.append((query, params))
|
||||
self._current_query = query
|
||||
|
||||
def fetchone(self) -> dict[str, object] | None:
|
||||
if self.executed[-1][1] == ("trace-1",):
|
||||
return {"parent_id": "parent-1"}
|
||||
if self.executed[-1][1] == ("parent-1",):
|
||||
return {"parent_id": "root-1"}
|
||||
if self.executed[-1][1] == ("root-1",):
|
||||
return {"parent_id": None}
|
||||
return None
|
||||
|
||||
def fetchall(self) -> list[dict[str, object]]:
|
||||
if "WHERE parent_id = %s" in self._current_query:
|
||||
parent_id = self.executed[-1][1][0]
|
||||
if parent_id == "root-1":
|
||||
return [{"trace_id": "parent-1"}]
|
||||
if parent_id == "parent-1":
|
||||
return [{"trace_id": "trace-1"}]
|
||||
return []
|
||||
trace_id = self.executed[-1][1][0]
|
||||
return [
|
||||
{
|
||||
"id": 8,
|
||||
"trace_id": trace_id,
|
||||
"event_time": datetime(2026, 4, 28, 10, 11, 12, tzinfo=timezone.utc),
|
||||
"step": "parse",
|
||||
"status": "failed",
|
||||
"level": "ERROR",
|
||||
"message": f"broken:{trace_id}",
|
||||
"attrs_json": '{"attempt":1}',
|
||||
}
|
||||
]
|
||||
|
||||
def __enter__(self) -> FakeCursor:
|
||||
return self
|
||||
|
||||
def __exit__(self, exc_type, exc, tb) -> None:
|
||||
return None
|
||||
|
||||
class FakeConnection:
|
||||
def __init__(self, cursor: FakeCursor) -> None:
|
||||
self._cursor = cursor
|
||||
|
||||
def cursor(self) -> FakeCursor:
|
||||
return self._cursor
|
||||
|
||||
def __enter__(self) -> FakeConnection:
|
||||
return self
|
||||
|
||||
def __exit__(self, exc_type, exc, tb) -> None:
|
||||
return None
|
||||
|
||||
class FakeConnectionFactory:
|
||||
def __init__(self) -> None:
|
||||
self.cursor = FakeCursor()
|
||||
|
||||
def connect(self) -> FakeConnection:
|
||||
return FakeConnection(self.cursor)
|
||||
|
||||
factory = FakeConnectionFactory()
|
||||
reader = MySqlTraceLogReader(factory) # type: ignore[arg-type]
|
||||
|
||||
view = reader.read_trace("trace-1", ("ERROR",), 2)
|
||||
|
||||
assert view is not None
|
||||
assert tuple(ancestor.trace_id for ancestor in view.ancestors) == ("root-1", "parent-1")
|
||||
Reference in New Issue
Block a user