Compare commits

28 Commits

Author SHA1 Message Date
alex e6509ee0cd Unified HTTP service on a single port 2026-05-14 21:08:18 +03:00
alex e2b817f785 INF-3 Fixed the header with the trace ID 2026-05-04 13:17:22 +03:00
alex 8cad1d00ec INF-3 Fixed the message output order 2026-05-04 11:51:40 +03:00
alex ec3198dbf1 INF-3 Version update 2026-05-04 11:18:28 +03:00
alex aed12c9c4e INF-3 Add a parameter for outputting child and parent traces 2026-05-04 11:12:19 +03:00
alex df50e7acbb Adjusted the separator in trace logs 2026-05-03 08:45:33 +03:00
alex 8789fcc0d1 INF-2 New version 2026-05-02 23:45:20 +03:00
alex b2915c3987 New version 2026-05-02 23:45:00 +03:00
alex 2e75e53b89 Made HTML the default 2026-05-02 23:41:14 +03:00
alex 3184ff16ca HTML format fixes 2026-05-02 23:38:29 +03:00
alex ef8732f079 HTML format refinement 2026-05-02 23:31:57 +03:00
alex cd4d1b3169 HTML format refinement 2026-05-02 23:24:17 +03:00
alex 62f08776eb Trace HTML refinement 2026-05-02 23:14:27 +03:00
alex 90422a0c2a Added the ability to register application APIs 2026-04-30 13:47:23 +03:00
alex 9eb7282437 Removed the current trace from links 2026-04-30 10:25:35 +03:00
alex 238c65c9c2 Single-column trace 2026-04-30 10:15:07 +03:00
alex 72162dd050 Version update 2026-04-30 10:06:06 +03:00
alex fa314bc1e5 HTML rendering of logs 2026-04-30 09:56:42 +03:00
alex a144fd2912 Version update 2026-04-30 08:05:51 +03:00
alex fc4aeebfca Added log output formatting 2026-04-30 08:05:33 +03:00
alex b2f0716a3b Extended the default list of levels 2026-04-28 17:40:19 +03:00
alex 72f2d54553 Added child trace_ids 2026-04-28 15:48:23 +03:00
alex d1ceef7872 Version fix 2026-04-28 15:39:52 +03:00
alex beee0e0e4b Log format fix 2026-04-28 15:39:37 +03:00
alex 85fcaae31b API for viewing logs 2026-04-28 14:57:09 +03:00
alex 2cedacfbe5 Version update 2026-04-26 20:52:03 +03:00
alex ed33f6e9cd Minor trace improvements 2026-04-26 20:47:13 +03:00
alex 314e6f3c46 Gitignore 2026-04-26 20:36:31 +03:00
34 changed files with 2682 additions and 65 deletions
+3
@@ -1,2 +1,5 @@
 __pycache__/
 *.pyc
+.pytest_cache/
+build/
+plba.egg-info/
+7
@@ -0,0 +1,7 @@
+{
+    "python.testing.pytestArgs": [
+        "tests"
+    ],
+    "python.testing.unittestEnabled": false,
+    "python.testing.pytestEnabled": true
+}
+145 -3
@@ -1,5 +1,29 @@
# PLBA
## Installation
Install the package directly from the Git repository with `pip`:
```bash
export GIT_PLBA_TOKEN="<your_token>"
pip install "git+https://oauth2:${GIT_PLBA_TOKEN}@git.lesha.spb.ru/alex/plba.git"
```
If you need a specific tag, branch, or commit, append the ref after `.git`:
```bash
pip install "git+https://oauth2:${GIT_PLBA_TOKEN}@git.lesha.spb.ru/alex/plba.git@main"
```
### Access via `GIT_PLBA_TOKEN`
HTTPS installation uses the `GIT_PLBA_TOKEN` environment variable. Set it to a personal Git token with read access to the `alex/plba` repository. The token is passed in the installation URL and lets `pip` download the sources without SSH access.
Recommendations:
- export the token only in the current shell session;
- do not hardcode the token in `requirements.txt`, `pyproject.toml`, or source code;
- use a separate read-only token if the Git server supports it.
## 1. Platform purpose
PLBA (`Platform Runtime for Business Applications`) is a runtime layer for business applications that takes over the infrastructure side of execution. The platform standardizes how worker processes start and stop, how application state is monitored, and the operational services around them. As a result, application code concentrates on business logic rather than on lifecycle, diagnostics, and housekeeping mechanisms. The basic usage model is built around `ApplicationModule`, `Worker`, and an application-level `Routine`, with each level owning its own share of responsibility. The resulting application is predictable in operation and easier to maintain and scale.
@@ -16,6 +40,7 @@ PLBA (`Platform Runtime for Business Applications`) - это runtime-слой д
- `health` (`HealthRegistry`) - aggregates the health of workers and additional components.
- `workflow` (`WorkflowEngine` and the persistence layer) - executes business-process steps with transitions and state persistence.
- `control plane` (`ControlPlaneService`, `HttpControlChannel`) - external health/action endpoints.
- `application HTTP` (`ApplicationHttpService`, `HttpApplicationChannel`) - custom HTTP routes of the business application.
- `queue` (`InMemoryTaskQueue`) - a local in-memory buffer provided as an application-level utility.
## 3. Architecture
@@ -65,6 +90,7 @@ classDiagram
class HealthRegistry
class TraceService
class ControlPlaneService
class ApplicationHttpService
class WorkflowRuntimeFactory
class WorkflowEngine
class WorkflowPersistence
@@ -82,6 +108,7 @@ classDiagram
RuntimeManager --> TraceService
RuntimeManager --> WorkerSupervisor
RuntimeManager --> ControlPlaneService
RuntimeManager --> ApplicationHttpService
WorkerSupervisor --> Worker
Worker --> Routine : invokes
@@ -103,7 +130,8 @@ classDiagram
- Implementation: the `app_runtime.contracts.application.ApplicationModule` contract, registered through `app_runtime.core.registration.ModuleRegistry`.
- How it works / API / calls / tables:
  - API: `name`, `register(registry)`.
  - Typical calls: `registry.add_worker(worker)`, `registry.add_health_contributor(contributor)`.
  - For HTTP modules: `registry.add_http_routes(registrar)`.
  - `RuntimeManager.register_module()` calls `module.register(...)` and adds the module name to the runtime snapshot.
  - Does not write to the database directly.
- Typical usage pattern:
@@ -189,6 +217,11 @@ classDiagram
- Purpose: lets you assemble the execution tree and link all messages of a specific business operation.
- Context attributes (`TraceContextRecord`): `trace_id`, `alias`, `parent_id`, `type`, `event_time`, `attrs`.
- Hierarchy: `parent_id` points to the parent context; this is how the root -> child chain is built.
- Standard model for business applications:
  - a root context is created for a top-level external trigger, for example an incoming email, a webhook, an operator command, or a batch run;
  - a child context is created for each derived unit of work, for example an email attachment, an individual task, an order, or a document;
  - a child context must be created with `parent_id=<trace_id of the root context>` and passed on through `trace_context` / runtime metadata as the active trace of the specific business operation;
  - if one incoming message spawns several tasks, they must share the parent root context and have distinct child trace contexts.
- Table: `trace_contexts`.
- How it is declared in code:
```python
@@ -199,6 +232,22 @@ with traces.open_context(alias="orders-worker", kind="worker", attrs={"routine":
root = traces.new_root("orders.sync")
child = traces.child_of(root, "orders.process_batch")
```
```python
with traces.open_context(alias="email:123", kind="email") as message_trace_id:
    ...
    with traces.open_context(
        alias="attachment:invoice.xlsx",
        parent_id=message_trace_id,
        kind="task",
        attrs={"attachment_index": 0},
    ) as task_trace_id:
        runtime["trace_id"] = task_trace_id
        runtime["message_trace_id"] = message_trace_id
        ...
```
- Standard for workflow persistence:
  - if a workflow represents a separate business task, `state.runtime.trace_id` must hold the active trace of that task;
  - additional identifiers of parent contexts (`message_trace_id`, `email_trace_id`, etc.) may be stored alongside it in runtime for logging and correlation, but `workflow_runs.trace_id` must reference the active trace of the current task.
- Trace Message:
  - What it is: an event inside the active context (step status, warning, error, service information).
  - Role of `step`: the current stage of the operation (`parse`, `validate`, `persist`, etc.), set through `traces.step("...")`.
@@ -245,6 +294,19 @@ child = traces.child_of(root, "orders.process_batch")
- When creating the runtime, set `enable_http_control=True`.
- Use `/health` for readiness/liveness and `/actions/*` for operational control.
#### Simple Web UI via nginx (port 15000)
- Static UI files: `web/control-ui/` (`index.html`, `app.js`, `styles.css`).
- The UI uses polling (`/api/health` every 2 seconds) and `Start/Stop` buttons (`POST /api/actions/start|stop`).
- For API calls the UI sends the `X-Client-Source: web-ui` header.
- Example nginx server config: `deploy/nginx/plba-control-ui.conf`.
  - Listens on `15000`.
  - Serves the UI from `/usr/share/nginx/html`.
  - Proxies `/api/*` to the control API at `http://app:8080/*` (sidecar in the docker network).
- Example of running it as part of a business application: `deploy/docker-compose.control-ui.yml`.
  - Only one external port is published: `15000`.
  - The internal control API stays inside the compose network and is reachable by nginx under the service name `app`.
  - In `app`, the control channel must be started on `0.0.0.0:8080`.
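The requests the UI sends can be reproduced from a script, for example for a smoke test of the nginx proxy. The sketch below only builds the requests with the standard library; the base URL `http://localhost:15000` and the helper names `health_request` and `action_request` are assumptions made for illustration and are not part of PLBA.

```python
from urllib.request import Request

# Assumption: nginx from the compose example is published on localhost:15000.
BASE_URL = "http://localhost:15000"
# Header the UI attaches to every API call, as described above.
CLIENT_HEADERS = {"X-Client-Source": "web-ui"}


def health_request(base_url: str = BASE_URL) -> Request:
    """Build the polling request the UI repeats every 2 seconds."""
    return Request(f"{base_url}/api/health", headers=CLIENT_HEADERS, method="GET")


def action_request(action: str, base_url: str = BASE_URL) -> Request:
    """Build a Start/Stop request; only the two UI buttons are supported here."""
    if action not in {"start", "stop"}:
        raise ValueError(f"unsupported UI action: {action}")
    return Request(f"{base_url}/api/actions/{action}", headers=CLIENT_HEADERS, method="POST")
```

Passing either object to `urllib.request.urlopen` would send it; nginx is then expected to forward `/api/*` to the control API as described in the list above.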
#### Queue
- Purpose: a simple in-memory buffer of tasks/messages inside the application.
- Implementation: `InMemoryTaskQueue[T]`.
@@ -256,7 +318,87 @@ child = traces.child_of(root, "orders.process_batch")
- A producer in a routine adds items with `put`.
- A consumer worker retrieves them with `get(timeout)` and processes them.
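The producer/consumer split can be illustrated with the standard library. This is a sketch only: `queue.Queue` stands in for `InMemoryTaskQueue[T]`, whose exact behaviour on an empty queue is not shown in this README, so the `queue.Empty` handling below is an assumption about the stand-in, not about PLBA.

```python
import queue
import threading

# Stand-in for InMemoryTaskQueue[str]; the real class is not used here.
tasks: "queue.Queue[str]" = queue.Queue()
processed: list[str] = []


def producer() -> None:
    """Routine side: enqueue work items with put."""
    for order_id in ("order-1", "order-2", "order-3"):
        tasks.put(order_id)


def consumer(timeout: float = 0.2) -> None:
    """Worker side: pull items with a timeout until the queue stays empty."""
    while True:
        try:
            item = tasks.get(timeout=timeout)
        except queue.Empty:
            return  # nothing arrived within the timeout, so end this pass
        processed.append(item.upper())


producer()
worker = threading.Thread(target=consumer)
worker.start()
worker.join()
```

The timeout on `get` keeps the consumer from blocking forever, which matters when the surrounding worker has to react to a stop request.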
## 5. Application HTTP
`PLBA` supports an application HTTP layer for the custom pages and API of a business application. By default it is separate from the `control plane`, but control routes and business routes can be published through a single HTTP channel when needed.
`Control Plane` and `Application HTTP` serve different scopes:
- `Control Plane` is used for `/health`, `/actions/*`, `/traces/*`.
- `Application HTTP` is used for the application's business routes, for example `/estimate`, `/estimate/api/tasks`, `/api/orders`.
### Main components
- `ApplicationHttpService` manages the lifecycle of application HTTP at the runtime level.
- `HttpApplicationChannel` starts a separate `FastAPI` app through `uvicorn`.
- `HttpRouteRegistrar` registers custom routes and receives the `ServiceContainer`.
- `HttpApplicationAppFactory` assembles `FastAPI(title="PLBA Application API")`, middleware, and routes.
- `UnifiedHttpService` assembles a single `FastAPI` app with control routes (`/health`, `/actions/*`, `/traces/*`) and application routes.
### Minimal example
```python
from fastapi import FastAPI, File, UploadFile
from fastapi.responses import HTMLResponse
from plba import ApplicationModule, HttpApplicationChannel, create_runtime


class DemoRoutes:
    def register(self, app: FastAPI, services) -> None:
        @app.get("/demo")
        async def demo_page():
            return HTMLResponse("<h1>Demo</h1>")

        @app.post("/demo/api/tasks")
        async def create_task(file: UploadFile = File(...)):
            payload = await file.read()
            return {"filename": file.filename, "size": len(payload)}


class DemoModule(ApplicationModule):
    @property
    def name(self) -> str:
        return "demo"

    def register(self, registry) -> None:
        registry.add_http_routes(DemoRoutes())


runtime = create_runtime(DemoModule(), config_path="config.yml")
runtime.application_http.register_channel(
    HttpApplicationChannel(host="0.0.0.0", port=15000, timeout=5)
)
runtime.start()
```
After the runtime starts, the application serves:
- `GET /demo`
- `POST /demo/api/tasks`
### Unified HTTP API
If a business application needs a single published port for control and application routes, pass `UnifiedHttpService` as `application_http`:
```python
from plba import HttpApplicationChannel, RuntimeManager, UnifiedHttpService
runtime = RuntimeManager(application_http=UnifiedHttpService())
runtime.application_http.register_channel(
HttpApplicationChannel(host="0.0.0.0", port=15000, timeout=5)
)
```
The old mode with separate `ControlPlaneService` and `ApplicationHttpService` remains available and works as before.
### Typical scenarios
- `Web UI for background tasks`: task list, launch, status, cancel, download result.
- `Internal REST API`: for example `POST /jobs`, `GET /jobs/{id}`, `POST /jobs/{id}/cancel`.
- `File gateway`: upload of an input file, asynchronous processing by a worker, download of the result.
- `Webhook endpoint`: receives a callback from an external system and passes the event to the worker pipeline.
### Limitations and recommendations
- By default keep business routes and the `control plane` separate; use `UnifiedHttpService` when a single published port is an explicit operational requirement.
- Keep business logic in services and use handlers as a thin HTTP adapter.
- The runtime exposes dependencies through `services`, so handlers should not depend on `RuntimeManager`.
- In its first version, `Application HTTP` does not handle auth, static files, template engines, websocket, or OpenAPI customization.
- For public access, delegate auth, TLS, and reverse proxying to an external ingress.
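The "thin handler" recommendation can be sketched without any web framework. Everything below is hypothetical: `JobService` and `create_job_handler` are illustrative names, and a plain `dict` stands in for the `services` container, whose real API is not shown in this README.

```python
from dataclasses import dataclass, field


@dataclass
class JobService:
    """Hypothetical business service: owns all job state and rules."""

    jobs: dict[int, str] = field(default_factory=dict)

    def create(self, filename: str) -> int:
        job_id = len(self.jobs) + 1
        self.jobs[job_id] = filename
        return job_id


def create_job_handler(services: dict[str, object], filename: str) -> dict[str, object]:
    """Thin adapter: look up the service, delegate, shape the response."""
    job_service = services["jobs"]
    if not isinstance(job_service, JobService):
        raise TypeError("services['jobs'] must be a JobService")
    job_id = job_service.create(filename)
    return {"id": job_id, "filename": filename}
```

Because the handler only touches `services`, it can be exercised in a unit test with a hand-built container and never needs a reference to `RuntimeManager`.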
## 6. MVP business application
Minimal launch configuration:
1. Create one `ApplicationModule`.
2. In the module, assemble one `Routine` and one `Worker` (1 worker -> 1 routine).
@@ -316,4 +458,4 @@ runtime = create_runtime(DemoModule(), config_path="config.yml")
runtime.start()
```
For a production scenario, after the MVP teams usually add `tracing`, `health contributors`, `workflow`, the HTTP control plane, and, when needed, `application HTTP`, but the basic launch does not require these extensions.
+3 -2
@@ -4,7 +4,7 @@ build-backend = "setuptools.build_meta"
 [project]
 name = "plba"
-version = "0.2.7"
+version = "0.4.0"
 description = "Platform runtime for business applications"
 readme = "README.md"
 requires-python = ">=3.11"
@@ -12,9 +12,10 @@ dependencies = [
     "fastapi>=0.129.0",
     "PyMySQL>=1.1",
     "PyYAML>=6.0.3",
+    "python-multipart>=0.0.9",
     "uvicorn>=0.41.0",
 ]
 [tool.setuptools.packages.find]
 where = ["src"]
+45
@@ -58,3 +58,48 @@ class TraceTransport(Protocol):
     def write_message(self, record: TraceLogMessage) -> None:
         """Persist trace log message."""
+
+
+@dataclass(frozen=True)
+class TraceLogRecord:
+    id: int
+    trace_id: str
+    event_time: datetime
+    step: str
+    status: str
+    level: TraceLevel
+    message: str
+    attrs_json: Any
+
+    def as_dict(self, *, include_attrs_json: bool) -> dict[str, Any]:
+        payload: dict[str, Any] = {
+            "id": self.id,
+            "trace_id": self.trace_id,
+            "event_time": self.event_time.isoformat(),
+            "step": self.step,
+            "status": self.status,
+            "level": self.level,
+            "message": self.message,
+        }
+        if include_attrs_json:
+            payload["attrs_json"] = self.attrs_json
+        return payload
+
+
+@dataclass(frozen=True)
+class TraceLogView:
+    trace_id: str
+    parent_id: str | None
+    child_ids: tuple[str, ...] = ()
+    records: tuple[TraceLogRecord, ...] = ()
+    ancestors: tuple[TraceLogView, ...] = ()
+
+
+class TraceLogReader(Protocol):
+    def read_trace(
+        self,
+        trace_id: str,
+        levels: tuple[TraceLevel, ...],
+        ancestor_depth: int | None = 0,
+    ) -> TraceLogView | None:
+        """Load trace context and filtered log records."""
+2 -2
@@ -1,5 +1,5 @@
-from app_runtime.control.base import ControlActionSet, ControlChannel
+from app_runtime.control.base import ControlActionRequest, ControlActionSet, ControlChannel
 from app_runtime.control.http_channel import HttpControlChannel
 from app_runtime.control.service import ControlPlaneService
-__all__ = ["ControlActionSet", "ControlChannel", "ControlPlaneService", "HttpControlChannel"]
+__all__ = ["ControlActionRequest", "ControlActionSet", "ControlChannel", "ControlPlaneService", "HttpControlChannel"]
@@ -0,0 +1,46 @@
+from __future__ import annotations
+
+import asyncio
+
+from fastapi.responses import JSONResponse
+
+from app_runtime.control.base import ControlActionRequest, ControlActionSet
+
+
+class ControlActionResponder:
+    def __init__(self, actions: ControlActionSet, timeout: int) -> None:
+        self._actions = actions
+        self._timeout = timeout
+
+    async def respond(
+        self,
+        action: str,
+        _client_source: str = "unknown",
+        request: ControlActionRequest | None = None,
+    ) -> JSONResponse:
+        callbacks = {
+            "start": self._actions.start,
+            "stop": self._actions.stop,
+            "status": self._actions.status,
+        }
+        callback = callbacks.get(action)
+        if callback is None:
+            return JSONResponse(content={"status": "error", "detail": f"unsupported action: {action}"}, status_code=404)
+        action_request = request or ControlActionRequest()
+        action_timeout = self._action_timeout(action, action_request)
+        try:
+            detail = await asyncio.wait_for(callback(action_request), timeout=action_timeout)
+        except asyncio.TimeoutError:
+            return JSONResponse(
+                content={"status": "accepted", "detail": f"{action} operation is still in progress"},
+                status_code=202,
+            )
+        except Exception as exc:
+            return JSONResponse(content={"status": "error", "detail": str(exc)}, status_code=500)
+        return JSONResponse(content={"status": "ok", "detail": detail or f"{action} action accepted"}, status_code=200)
+
+    def _action_timeout(self, action: str, request: ControlActionRequest) -> float:
+        base_timeout = max(float(self._timeout), 10.0) if action in {"start", "stop"} else float(self._timeout)
+        if action != "stop" or request.wait is False or request.timeout is None:
+            return base_timeout
+        return max(base_timeout, float(request.timeout) + 1.0)
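The timeout rule in `_action_timeout` is easier to follow with concrete numbers. The sketch below restates it as a standalone function so the cases can be checked in isolation; `action_timeout` and its flat parameters are illustrative names, not part of the change set.

```python
from typing import Optional


def action_timeout(
    action: str,
    channel_timeout: float,
    wait: Optional[bool] = None,
    requested: Optional[float] = None,
) -> float:
    """Standalone restatement of ControlActionResponder._action_timeout."""
    # start/stop always get at least 10 seconds; other actions use the channel timeout.
    if action in {"start", "stop"}:
        base = max(float(channel_timeout), 10.0)
    else:
        base = float(channel_timeout)
    # Only a waiting stop with an explicit timeout may extend the deadline.
    if action != "stop" or wait is False or requested is None:
        return base
    # One extra second on top of the requested stop timeout.
    return max(base, float(requested) + 1.0)


print(action_timeout("status", 5))                            # 5.0
print(action_timeout("start", 5))                             # 10.0
print(action_timeout("stop", 5, requested=30))                # 31.0
print(action_timeout("stop", 5, wait=False, requested=30))    # 10.0
```

When the deadline is exceeded, the responder answers `202` with status `accepted` rather than an error, so a client should treat that response as "still running" and poll `status`.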
+25 -1
@@ -3,11 +3,34 @@ from __future__ import annotations
 from abc import ABC, abstractmethod
 from collections.abc import Awaitable, Callable
 from dataclasses import dataclass
+from typing import Literal
+from app_runtime.contracts.trace import TraceLevel, TraceLogView
 from app_runtime.core.types import HealthPayload
-ActionHandler = Callable[[], Awaitable[str]]
+@dataclass(slots=True)
+class ControlActionRequest:
+    wait: bool | None = None
+    timeout: float | None = None
+    force: bool | None = None
+ActionResult = str | dict[str, object]
+ActionHandler = Callable[[ControlActionRequest], Awaitable[ActionResult]]
 HealthHandler = Callable[[], Awaitable[HealthPayload]]
+TraceResponseFormat = Literal["json", "text", "html"]
+@dataclass(slots=True)
+class TraceQueryRequest:
+    levels: tuple[TraceLevel, ...] = ("ERROR", "WARNING", "INFO")
+    include_attrs_json: bool = False
+    response_format: TraceResponseFormat = "html"
+    ancestor_depth: int | None = 0
+TraceLookupHandler = Callable[[str, TraceQueryRequest], Awaitable[TraceLogView]]
 @dataclass(slots=True)
@@ -16,6 +39,7 @@ class ControlActionSet:
     start: ActionHandler
     stop: ActionHandler
     status: ActionHandler
+    trace_lookup: TraceLookupHandler | None = None
 class ControlChannel(ABC):
+81 -3
@@ -1,19 +1,35 @@
 from __future__ import annotations
+import logging
 import time
 from collections.abc import Awaitable, Callable
 from fastapi import FastAPI, Request
 from fastapi.responses import JSONResponse
+from app_runtime.control.base import ControlActionRequest, TraceQueryRequest
+from app_runtime.control.trace_presenter import TraceRequestParser, TraceResponseRenderer
+from app_runtime.contracts.trace import TraceLogView
 from app_runtime.core.types import HealthPayload
+LOGGER = logging.getLogger(__name__)
 class HttpControlAppFactory:
+    def __init__(
+        self,
+        *,
+        trace_request_parser: TraceRequestParser | None = None,
+        trace_response_renderer: TraceResponseRenderer | None = None,
+    ) -> None:
+        self._trace_request_parser = trace_request_parser or TraceRequestParser()
+        self._trace_response_renderer = trace_response_renderer or TraceResponseRenderer()
     def create(
         self,
         health_provider: Callable[[], Awaitable[HealthPayload]],
-        action_provider: Callable[[str], Awaitable[JSONResponse]],
+        action_provider: Callable[[str, str, ControlActionRequest], Awaitable[JSONResponse]],
+        trace_provider: Callable[[str, TraceQueryRequest], Awaitable[TraceLogView]] | None = None,
     ) -> FastAPI:
         app = FastAPI(title="PLBA Control API")
@@ -32,7 +48,69 @@ class HttpControlAppFactory:
         @app.get("/actions/{action}")
         @app.post("/actions/{action}")
-        async def action(action: str) -> JSONResponse:
-            return await action_provider(action)
+        async def action(action: str, request: Request) -> JSONResponse:
+            client_source = self._client_source(request)
+            if action in {"start", "stop"}:
+                LOGGER.warning("Control action requested: /actions/%s client=%s", action, client_source)
+            try:
+                action_request = self._action_request(request)
+            except ValueError as exc:
+                return JSONResponse(content={"status": "error", "detail": str(exc)}, status_code=400)
+            return await action_provider(action, client_source, action_request)
+
+        @app.get("/traces/{traceid}")
+        async def trace(traceid: str, request: Request):
+            if trace_provider is None:
+                return JSONResponse(content={"status": "error", "detail": "trace lookup is not configured"}, status_code=503)
+            try:
+                trace_request = self._trace_request_parser.parse(request)
+            except ValueError as exc:
+                return JSONResponse(content={"status": "error", "detail": str(exc)}, status_code=400)
+            try:
+                payload = await trace_provider(traceid, trace_request)
+            except KeyError:
+                return JSONResponse(content={"status": "error", "detail": f"trace not found: {traceid}"}, status_code=404)
+            except RuntimeError as exc:
+                return JSONResponse(content={"status": "error", "detail": str(exc)}, status_code=503)
+            return self._trace_response_renderer.render(payload, trace_request)
+
         return app
+
+    def _action_request(self, request: Request) -> ControlActionRequest:
+        return ControlActionRequest(
+            wait=self._bool_param(request, "wait"),
+            timeout=self._float_param(request, "timeout"),
+            force=self._bool_param(request, "force"),
+        )
+
+    def _client_source(self, request: Request) -> str:
+        explicit_header = request.headers.get("X-Client-Source", "").strip()
+        if explicit_header:
+            return explicit_header
+        user_agent = request.headers.get("User-Agent", "").strip()
+        if user_agent:
+            return f"user-agent:{user_agent}"
+        return "unknown"
+
+    def _bool_param(self, request: Request, name: str) -> bool | None:
+        raw_value = request.query_params.get(name)
+        if raw_value is None:
+            return None
+        normalized = raw_value.strip().lower()
+        if normalized in {"1", "true", "yes", "on"}:
+            return True
+        if normalized in {"0", "false", "no", "off"}:
+            return False
+        raise ValueError(f"invalid boolean query parameter: {name}={raw_value}")
+
+    def _float_param(self, request: Request, name: str) -> float | None:
+        raw_value = request.query_params.get(name)
+        if raw_value is None:
+            return None
+        try:
+            value = float(raw_value)
+        except ValueError as exc:
+            raise ValueError(f"invalid numeric query parameter: {name}={raw_value}") from exc
+        if value < 0:
+            raise ValueError(f"query parameter must be >= 0: {name}={raw_value}")
+        return value
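From the client side, the new query parameters translate into URLs like the ones built below. This is a sketch under assumptions: the helper names `stop_url` and `trace_url` and the base URL used in the usage note are illustrative, while the parameter names (`wait`, `timeout`, `levels`, `format`, `attrs_json`) are taken from the parsers in this change set.

```python
from urllib.parse import quote, urlencode


def stop_url(base_url: str, *, wait: bool = True, timeout: float = 30.0) -> str:
    """URL for /actions/stop with the wait/timeout query parameters."""
    query = urlencode({"wait": "true" if wait else "false", "timeout": timeout})
    return f"{base_url}/actions/stop?{query}"


def trace_url(
    base_url: str,
    trace_id: str,
    *,
    levels: tuple[str, ...] = ("ERROR", "WARNING"),
    fmt: str = "json",
    attrs_json: bool = False,
) -> str:
    """URL for /traces/{traceid} with level filter and response format."""
    query = urlencode(
        {
            "levels": ",".join(levels),  # parsed server-side by splitting on commas
            "format": fmt,               # one of json, text, html
            "attrs_json": "1" if attrs_json else "0",
        }
    )
    return f"{base_url}/traces/{quote(trace_id, safe='')}?{query}"
```

For example, `stop_url("http://app:8080")` yields a URL ending in `/actions/stop?wait=true&timeout=30.0`. Values outside the accepted sets (an unknown `format`, a negative `timeout`, a boolean other than `1/true/yes/on` or `0/false/no/off`) are answered with HTTP 400 by the handlers above.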
+28 -24
@@ -4,7 +4,9 @@ import asyncio
 from fastapi.responses import JSONResponse
-from app_runtime.control.base import ControlActionSet, ControlChannel
+from app_runtime.control.action_responder import ControlActionResponder
+from app_runtime.control.base import ControlActionRequest, ControlActionSet, ControlChannel, TraceQueryRequest
+from app_runtime.contracts.trace import TraceLogView
 from app_runtime.control.http_app import HttpControlAppFactory
 from app_runtime.control.http_runner import UvicornThreadRunner
@@ -15,10 +17,12 @@ class HttpControlChannel(ControlChannel):
         self._runner = UvicornThreadRunner(host, port, timeout)
         self._factory = HttpControlAppFactory()
         self._actions: ControlActionSet | None = None
+        self._action_responder: ControlActionResponder | None = None
     async def start(self, actions: ControlActionSet) -> None:
         self._actions = actions
-        app = self._factory.create(self._health_response, self._action_response)
+        self._action_responder = ControlActionResponder(actions, self._timeout)
+        app = self._factory.create(self._health_response, self._action_response, self._trace_response)
         await self._runner.start(app)
     async def stop(self) -> None:
@@ -33,25 +37,25 @@ class HttpControlChannel(ControlChannel):
             return {"status": "unhealthy", "detail": "control actions are not configured"}
         return await asyncio.wait_for(self._actions.health(), timeout=float(self._timeout))
-    async def _action_response(self, action: str) -> JSONResponse:
-        if self._actions is None:
-            return JSONResponse(content={"status": "error", "detail": f"{action} handler is not configured"}, status_code=404)
-        callbacks = {
-            "start": self._actions.start,
-            "stop": self._actions.stop,
-            "status": self._actions.status,
-        }
-        callback = callbacks.get(action)
-        if callback is None:
-            return JSONResponse(content={"status": "error", "detail": f"unsupported action: {action}"}, status_code=404)
-        action_timeout = max(float(self._timeout), 10.0) if action in {"start", "stop"} else float(self._timeout)
-        try:
-            detail = await asyncio.wait_for(callback(), timeout=action_timeout)
-        except asyncio.TimeoutError:
-            return JSONResponse(
-                content={"status": "accepted", "detail": f"{action} operation is still in progress"},
-                status_code=202,
-            )
-        except Exception as exc:
-            return JSONResponse(content={"status": "error", "detail": str(exc)}, status_code=500)
-        return JSONResponse(content={"status": "ok", "detail": detail or f"{action} action accepted"}, status_code=200)
+    async def _action_response(
+        self,
+        action: str,
+        _client_source: str = "unknown",
+        request: ControlActionRequest | None = None,
+    ) -> JSONResponse:
+        if self._action_responder is None:
+            if self._actions is None:
+                return JSONResponse(
+                    content={"status": "error", "detail": f"{action} handler is not configured"},
+                    status_code=404,
+                )
+            self._action_responder = ControlActionResponder(self._actions, self._timeout)
+        return await self._action_responder.respond(action, _client_source, request)
+
+    async def _trace_response(self, trace_id: str, request: TraceQueryRequest) -> TraceLogView:
+        if self._actions is None or self._actions.trace_lookup is None:
+            raise RuntimeError("trace lookup is not configured")
+        return await asyncio.wait_for(
+            self._actions.trace_lookup(trace_id, request),
+            timeout=float(self._timeout),
+        )
+3 -2
@@ -8,10 +8,11 @@ from uvicorn import Config, Server
 class UvicornThreadRunner:
-    def __init__(self, host: str, port: int, timeout: int) -> None:
+    def __init__(self, host: str, port: int, timeout: int, *, thread_name: str = "plba-http-control") -> None:
         self._host = host
         self._port = port
         self._timeout = timeout
+        self._thread_name = thread_name
         self._server: Server | None = None
         self._thread: Thread | None = None
         self._error: BaseException | None = None
@@ -22,7 +23,7 @@ class UvicornThreadRunner:
         self._error = None
         config = Config(app=app, host=self._host, port=self._port, log_level="warning")
         self._server = Server(config)
-        self._thread = Thread(target=self._serve, name="plba-http-control", daemon=True)
+        self._thread = Thread(target=self._serve, name=self._thread_name, daemon=True)
         self._thread.start()
         await self._wait_until_started()
+1
@@ -43,6 +43,7 @@ class ControlPlaneService:
             start=runtime.start_runtime,
             stop=runtime.stop_runtime,
             status=runtime.runtime_status,
+            trace_lookup=runtime.trace_logs,
         )
         for channel in self._channels:
             await channel.start(actions)
+293
@@ -0,0 +1,293 @@
from __future__ import annotations
import json
from html import escape
from urllib.parse import urlencode
from fastapi import Request
from fastapi.responses import HTMLResponse, JSONResponse, PlainTextResponse, Response
from app_runtime.control.base import TraceQueryRequest
from app_runtime.contracts.trace import TraceLevel, TraceLogRecord, TraceLogView
TRACE_SECTION_SEPARATOR = "=" * 30
class TraceRequestParser:
def parse(self, request: Request) -> TraceQueryRequest:
raw_levels = request.query_params.get("levels")
raw_format = request.query_params.get("format", "html")
response_format = raw_format.strip().lower()
if response_format not in {"json", "text", "html"}:
raise ValueError(f"unsupported trace format: {raw_format}")
return TraceQueryRequest(
levels=self._trace_levels(raw_levels),
include_attrs_json=self._bool_param(request, "attrs_json") or False,
response_format=response_format,
ancestor_depth=self._ancestor_depth(request),
)
def _trace_levels(self, raw_levels: str | None) -> tuple[TraceLevel, ...]:
if raw_levels is None:
return ("ERROR", "WARNING", "INFO")
parts = [item.strip().upper() for item in raw_levels.split(",")]
levels = tuple(item for item in parts if item)
if not levels:
raise ValueError("trace levels must not be empty")
unsupported = [level for level in levels if level not in {"DEBUG", "INFO", "WARNING", "ERROR"}]
if unsupported:
raise ValueError(f"unsupported trace levels: {', '.join(unsupported)}")
return levels
def _bool_param(self, request: Request, name: str) -> bool | None:
raw_value = request.query_params.get(name)
if raw_value is None:
return None
normalized = raw_value.strip().lower()
if normalized in {"1", "true", "yes", "on"}:
return True
if normalized in {"0", "false", "no", "off"}:
return False
raise ValueError(f"invalid boolean query parameter: {name}={raw_value}")
def _ancestor_depth(self, request: Request) -> int | None:
raw_value = request.query_params.get("ancestor_depth")
if raw_value is None:
return 0
normalized = raw_value.strip().lower()
if normalized == "all":
return None
try:
value = int(normalized)
except ValueError as exc:
raise ValueError(f"invalid ancestor depth query parameter: ancestor_depth={raw_value}") from exc
if value < 0:
raise ValueError(f"query parameter must be >= 0: ancestor_depth={raw_value}")
return value
class TraceResponseRenderer:
def render(self, trace_view: TraceLogView, request: TraceQueryRequest) -> Response:
if request.response_format == "json":
return self._render_json(trace_view, request)
if request.response_format == "html":
return self._render_html(trace_view, request)
return self._render_text(trace_view, request)
def _render_json(self, trace_view: TraceLogView, request: TraceQueryRequest) -> JSONResponse:
return JSONResponse(
content={
"trace_id": trace_view.trace_id,
"parent_id": trace_view.parent_id or "",
"child_ids": list(trace_view.child_ids),
"messages": [record.as_dict(include_attrs_json=request.include_attrs_json) for record in trace_view.records],
"ancestors": [self._trace_payload(view, request) for view in trace_view.ancestors],
}
)
def _render_text(self, trace_view: TraceLogView, request: TraceQueryRequest) -> PlainTextResponse:
lineage = [*trace_view.ancestors, trace_view]
lines = self._text_trace_summary_lines(trace_view)
for index, entry in enumerate(lineage):
if index == 0:
lines.append("")
else:
lines.extend(["", ""])
lines.extend(self._text_trace_log_lines(entry, request))
return PlainTextResponse(content="\n".join(lines))
def _render_html(self, trace_view: TraceLogView, request: TraceQueryRequest) -> HTMLResponse:
title = escape(f"Trace {trace_view.trace_id}")
lines = self._html_lines(trace_view, request)
html = f"""<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="utf-8">
<meta name="viewport" content="width=device-width, initial-scale=1">
<title>{title}</title>
<style>
:root {{
color-scheme: dark;
--bg: #000000;
--fg: #ececec;
--step: #ffffff;
--link: #66d9ef;
--error: #ff817d;
--warning: #e9ebec;
--info: #d6d7d9;
--other: #ececec;
}}
body {{
margin: 0;
background: var(--bg);
color: var(--fg);
font: 13px/1.1 "SFMono-Regular", monospace;
}}
.page {{
padding: 14px 16px 24px;
}}
a {{
color: var(--link);
text-decoration: underline;
text-underline-offset: 2px;
}}
.line {{
white-space: pre-wrap;
word-break: break-word;
}}
.msg-error {{
color: var(--error);
}}
.msg-warning {{
color: var(--warning);
}}
.msg-info {{
color: var(--info);
}}
.msg-debug {{
color: var(--other);
}}
@media (max-width: 640px) {{
.page {{
padding: 12px 12px 20px;
}}
}}
</style>
</head>
<body>
<div class="page">
{lines}
</div>
</body>
</html>"""
return HTMLResponse(content=html)
def _child_id_lines(self, child_ids: tuple[str, ...]) -> list[str]:
lines = ["child_ids:"]
lines.extend(f" - {child_id}" for child_id in child_ids)
return lines
def _text_message(self, record: TraceLogRecord, include_attrs_json: bool) -> str:
if not include_attrs_json:
return record.message
return f"{record.message}, {json.dumps(record.attrs_json, ensure_ascii=False, separators=(',', ':'))}"
def _trace_href(self, trace_id: str, request: TraceQueryRequest) -> str:
params = {
"format": "html",
"levels": ",".join(request.levels),
"attrs_json": "true" if request.include_attrs_json else "false",
}
if request.ancestor_depth is None:
params["ancestor_depth"] = "all"
elif request.ancestor_depth > 0:
params["ancestor_depth"] = str(request.ancestor_depth)
query = urlencode(params)
return f"/traces/{trace_id}?{query}"
def _html_lines(self, trace_view: TraceLogView, request: TraceQueryRequest) -> str:
lineage = [*trace_view.ancestors, trace_view]
lines = self._html_trace_summary_lines(trace_view, request)
for index, entry in enumerate(lineage):
if index == 0:
lines.append(self._html_plain_line(""))
else:
lines.extend([self._html_plain_line(""), self._html_plain_line("")])
lines.extend(self._html_trace_log_lines(entry, request))
return "".join(lines)
def _trace_payload(self, trace_view: TraceLogView, request: TraceQueryRequest) -> dict[str, object]:
return {
"trace_id": trace_view.trace_id,
"parent_id": trace_view.parent_id or "",
"child_ids": list(trace_view.child_ids),
"messages": [record.as_dict(include_attrs_json=request.include_attrs_json) for record in trace_view.records],
}
def _text_trace_summary_lines(self, trace_view: TraceLogView) -> list[str]:
return [
f"trace_id: {trace_view.trace_id}",
f"parent_id: {trace_view.parent_id or ''}",
*self._child_id_lines(trace_view.child_ids),
]
def _text_trace_log_lines(self, trace_view: TraceLogView, request: TraceQueryRequest) -> list[str]:
lines = [
TRACE_SECTION_SEPARATOR,
f"trace_id: {trace_view.trace_id}",
"",
]
previous_step: str | None = None
for record in trace_view.records:
current_step = str(record.step or "")
if previous_step is None:
lines.append(f"step: {current_step}")
elif current_step != previous_step:
lines.append("------------------------------")
lines.append(f"step: {current_step}")
previous_step = current_step
lines.append(self._text_message(record, request.include_attrs_json))
return lines
def _html_trace_summary_lines(self, trace_view: TraceLogView, request: TraceQueryRequest) -> list[str]:
return [
self._html_plain_line(f"trace_id: {self._trace_link(trace_view.trace_id, request)}"),
self._html_plain_line(f"parent_id: {self._optional_trace_link(trace_view.parent_id, request)}"),
self._html_plain_line("child_ids:"),
*(self._html_plain_line(f" - {self._trace_link(child_id, request)}") for child_id in trace_view.child_ids),
]
def _html_trace_log_lines(self, trace_view: TraceLogView, request: TraceQueryRequest) -> list[str]:
lines = [
self._html_plain_line(TRACE_SECTION_SEPARATOR),
self._html_plain_line(f"trace_id: {self._trace_link(trace_view.trace_id, request)}"),
self._html_plain_line(""),
]
previous_step: str | None = None
for record in trace_view.records:
current_step = str(record.step or "")
if previous_step is None:
lines.append(self._html_step_line(current_step))
lines.append(self._html_plain_line(""))
elif current_step != previous_step:
lines.append(self._html_plain_line(""))
lines.append(self._html_plain_line("------------------------------"))
lines.append(self._html_step_line(current_step))
lines.append(self._html_plain_line(""))
previous_step = current_step
lines.extend(self._html_message_lines(record, request.include_attrs_json))
return lines
def _html_message_lines(self, record: TraceLogRecord, include_attrs_json: bool) -> list[str]:
lines = [self._html_colored_line(self._text_message(record, include_attrs_json), record.level)]
return lines
def _html_plain_line(self, content: str) -> str:
return f"<div class=\"line\">{content or '&nbsp;'}</div>"
def _html_step_line(self, content: str) -> str:
return f"<div class=\"line\" style=\"color: var(--step);\">{escape(content) or '&nbsp;'}</div>"
def _html_colored_line(self, content: str, level: str) -> str:
level_class = self._level_class(level)
return f"<div class=\"line {level_class}\">{escape(content)}</div>"
def _trace_link(self, trace_id: str, request: TraceQueryRequest) -> str:
href = escape(self._trace_href(trace_id, request), quote=True)
text = escape(trace_id)
return f"<a href=\"{href}\">{text}</a>"
def _optional_trace_link(self, trace_id: str | None, request: TraceQueryRequest) -> str:
if not trace_id:
return ""
return self._trace_link(trace_id, request)
def _level_class(self, level: str) -> str:
if level == "ERROR":
return "msg-error"
if level == "WARNING":
return "msg-warning"
if level == "INFO":
return "msg-info"
return "msg-debug"
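The query-parameter rules in `TraceRequestParser` above can be tried without FastAPI. The following is a minimal standalone sketch, not the module's code: `parse_levels` and `parse_ancestor_depth` are hypothetical helper names that mirror `_trace_levels` and `_ancestor_depth`, taking the raw string directly instead of a `Request`.

```python
from __future__ import annotations

SUPPORTED_LEVELS = {"DEBUG", "INFO", "WARNING", "ERROR"}


def parse_levels(raw: str | None) -> tuple[str, ...]:
    """Hypothetical mirror of `_trace_levels`: comma-separated, case-insensitive."""
    if raw is None:
        # A missing parameter falls back to the default level set.
        return ("ERROR", "WARNING", "INFO")
    levels = tuple(part.strip().upper() for part in raw.split(",") if part.strip())
    if not levels:
        raise ValueError("trace levels must not be empty")
    unsupported = [level for level in levels if level not in SUPPORTED_LEVELS]
    if unsupported:
        raise ValueError(f"unsupported trace levels: {', '.join(unsupported)}")
    return levels


def parse_ancestor_depth(raw: str | None) -> int | None:
    """Hypothetical mirror of `_ancestor_depth`: 0 by default, None means "all"."""
    if raw is None:
        return 0
    normalized = raw.strip().lower()
    if normalized == "all":
        return None
    value = int(normalized)  # raises ValueError for non-numeric input
    if value < 0:
        raise ValueError(f"query parameter must be >= 0: ancestor_depth={raw}")
    return value
```

Under these assumptions, a request such as `/traces/<trace_id>?levels=error,debug&ancestor_depth=all` would select the `ERROR` and `DEBUG` records and ask for the full parent chain.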
+5
@@ -3,6 +3,7 @@ from __future__ import annotations
from app_runtime.contracts.health import HealthContributor from app_runtime.contracts.health import HealthContributor
from app_runtime.contracts.worker import Worker from app_runtime.contracts.worker import Worker
from app_runtime.core.service_container import ServiceContainer from app_runtime.core.service_container import ServiceContainer
from app_runtime.http.base import HttpRouteRegistrar
class ModuleRegistry: class ModuleRegistry:
@@ -10,6 +11,7 @@ class ModuleRegistry:
self.services = services self.services = services
self.workers: list[Worker] = [] self.workers: list[Worker] = []
self.health_contributors: list[HealthContributor] = [] self.health_contributors: list[HealthContributor] = []
self.http_route_registrars: list[HttpRouteRegistrar] = []
self.modules: list[str] = [] self.modules: list[str] = []
def register_module(self, name: str) -> None: def register_module(self, name: str) -> None:
@@ -20,3 +22,6 @@ class ModuleRegistry:
def add_health_contributor(self, contributor: HealthContributor) -> None: def add_health_contributor(self, contributor: HealthContributor) -> None:
self.health_contributors.append(contributor) self.health_contributors.append(contributor)
def add_http_routes(self, registrar: HttpRouteRegistrar) -> None:
self.http_route_registrars.append(registrar)
+38 -7
@@ -4,13 +4,18 @@ from time import monotonic, sleep
from app_runtime.config.providers import FileConfigProvider from app_runtime.config.providers import FileConfigProvider
from app_runtime.contracts.application import ApplicationModule from app_runtime.contracts.application import ApplicationModule
from app_runtime.control.base import ControlActionRequest
from app_runtime.control.base import TraceQueryRequest
from app_runtime.contracts.trace import TraceLogView
from app_runtime.control.service import ControlPlaneService from app_runtime.control.service import ControlPlaneService
from app_runtime.core.configuration import ConfigurationManager from app_runtime.core.configuration import ConfigurationManager
from app_runtime.core.registration import ModuleRegistry from app_runtime.core.registration import ModuleRegistry
from app_runtime.core.service_container import ServiceContainer from app_runtime.core.service_container import ServiceContainer
from app_runtime.core.types import HealthPayload, LifecycleState from app_runtime.core.types import HealthPayload, LifecycleState
from app_runtime.health.registry import HealthRegistry from app_runtime.health.registry import HealthRegistry
from app_runtime.http.service import ApplicationHttpService
from app_runtime.logging.manager import LogManager from app_runtime.logging.manager import LogManager
from app_runtime.tracing.reader import build_trace_log_reader
from app_runtime.tracing.service import TraceService from app_runtime.tracing.service import TraceService
from app_runtime.workers.supervisor import WorkerSupervisor from app_runtime.workers.supervisor import WorkerSupervisor
@@ -28,6 +33,7 @@ class RuntimeManager:
logs: LogManager | None = None, logs: LogManager | None = None,
workers: WorkerSupervisor | None = None, workers: WorkerSupervisor | None = None,
control_plane: ControlPlaneService | None = None, control_plane: ControlPlaneService | None = None,
application_http: ApplicationHttpService | None = None,
) -> None: ) -> None:
self.configuration = configuration or ConfigurationManager() self.configuration = configuration or ConfigurationManager()
self.services = services or ServiceContainer() self.services = services or ServiceContainer()
@@ -36,6 +42,7 @@ class RuntimeManager:
self.logs = logs or LogManager() self.logs = logs or LogManager()
self.workers = workers or WorkerSupervisor() self.workers = workers or WorkerSupervisor()
self.control_plane = control_plane or ControlPlaneService() self.control_plane = control_plane or ControlPlaneService()
self.application_http = application_http or ApplicationHttpService()
self.registry = ModuleRegistry(self.services) self.registry = ModuleRegistry(self.services)
self._started = False self._started = False
self._state = LifecycleState.IDLE self._state = LifecycleState.IDLE
@@ -63,6 +70,8 @@ class RuntimeManager:
self.workers.start() self.workers.start()
if start_control_plane: if start_control_plane:
self.control_plane.start(self) self.control_plane.start(self)
self._register_application_http_routes()
self.application_http.start(self)
self._started = True self._started = True
self._refresh_state() self._refresh_state()
@@ -71,6 +80,7 @@ class RuntimeManager:
return return
self._state = LifecycleState.STOPPING self._state = LifecycleState.STOPPING
self.workers.stop(timeout=timeout, force=force) self.workers.stop(timeout=timeout, force=force)
self.application_http.stop()
if stop_control_plane: if stop_control_plane:
self.control_plane.stop() self.control_plane.stop()
self._started = False self._started = False
@@ -87,7 +97,7 @@ class RuntimeManager:
async def health_status(self) -> HealthPayload: async def health_status(self) -> HealthPayload:
return self.current_health() return self.current_health()
async def start_runtime(self) -> dict[str, object] | str: async def start_runtime(self, _request: ControlActionRequest) -> dict[str, object] | str:
self._refresh_state() self._refresh_state()
if self._started: if self._started:
return "runtime already running" return "runtime already running"
@@ -100,27 +110,42 @@ class RuntimeManager:
return self._action_detail("runtime started", timed_out=False) return self._action_detail("runtime started", timed_out=False)
return self._action_detail("runtime start is still in progress", timed_out=True) return self._action_detail("runtime start is still in progress", timed_out=True)
async def stop_runtime(self) -> dict[str, object] | str: async def stop_runtime(self, request: ControlActionRequest) -> dict[str, object] | str:
self._refresh_state() self._refresh_state()
if not self._started: if not self._started:
if self._state == LifecycleState.STOPPING: if self._state == LifecycleState.STOPPING:
return self._action_detail("runtime stop is still in progress", timed_out=True) return self._action_detail("runtime stop is still in progress", timed_out=True)
return "runtime already stopped" return "runtime already stopped"
wait = True if request.wait is None else request.wait
timeout = self.ACTION_TIMEOUT_SECONDS if request.timeout is None else float(request.timeout)
force = False if request.force is None else request.force
self._state = LifecycleState.STOPPING self._state = LifecycleState.STOPPING
try: try:
self.workers.stop(timeout=self.ACTION_TIMEOUT_SECONDS, force=False) self.workers.stop(timeout=timeout, force=force, wait=wait)
except TimeoutError: except TimeoutError:
return self._action_detail("runtime stop is still in progress", timed_out=True) return self._action_detail("runtime stop is still in progress", timed_out=True)
self._started = False self.application_http.stop()
self._state = LifecycleState.STOPPED self._refresh_state()
return self._action_detail("runtime stopped", timed_out=False) if self._state == LifecycleState.STOPPED:
self._started = False
return self._action_detail("runtime stopped", timed_out=False)
return self._action_detail("runtime stop requested", timed_out=False)
async def runtime_status(self) -> str: async def runtime_status(self, _request: ControlActionRequest) -> str:
self._refresh_state() self._refresh_state()
return self._state.value return self._state.value
async def trace_logs(self, trace_id: str, request: TraceQueryRequest) -> TraceLogView:
reader = build_trace_log_reader(self.traces.transport)
if reader is None:
raise RuntimeError("trace log reader is not configured")
trace_view = reader.read_trace(trace_id, request.levels, request.ancestor_depth)
if trace_view is None:
raise KeyError(trace_id)
return trace_view
def _register_core_services(self) -> None: def _register_core_services(self) -> None:
if self._core_registered: if self._core_registered:
return return
@@ -130,6 +155,7 @@ class RuntimeManager:
self.services.register("logs", self.logs) self.services.register("logs", self.logs)
self.services.register("workers", self.workers) self.services.register("workers", self.workers)
self.services.register("control_plane", self.control_plane) self.services.register("control_plane", self.control_plane)
self.services.register("application_http", self.application_http)
self._core_registered = True self._core_registered = True
def _register_health_contributors(self) -> None: def _register_health_contributors(self) -> None:
@@ -143,6 +169,11 @@ class RuntimeManager:
self.workers.register(worker) self.workers.register(worker)
self._workers_registered = True self._workers_registered = True
def _register_application_http_routes(self) -> None:
for registrar in self.registry.http_route_registrars:
self.application_http.register_routes(registrar)
self.registry.http_route_registrars.clear()
def _refresh_state(self) -> None: def _refresh_state(self) -> None:
lifecycle = self.workers.lifecycle_state() lifecycle = self.workers.lifecycle_state()
+14
@@ -0,0 +1,14 @@
from app_runtime.http.base import ApplicationHttpChannel, HttpRouteRegistrar
from app_runtime.http.http_app import HttpApplicationAppFactory
from app_runtime.http.http_channel import HttpApplicationChannel
from app_runtime.http.service import ApplicationHttpService
from app_runtime.http.unified_service import UnifiedHttpService
__all__ = [
"ApplicationHttpChannel",
"ApplicationHttpService",
"HttpApplicationAppFactory",
"HttpApplicationChannel",
"HttpRouteRegistrar",
"UnifiedHttpService",
]
+23
@@ -0,0 +1,23 @@
from __future__ import annotations
from abc import ABC, abstractmethod
from typing import Protocol
from fastapi import FastAPI
from app_runtime.core.service_container import ServiceContainer
class HttpRouteRegistrar(Protocol):
def register(self, app: FastAPI, services: ServiceContainer) -> None:
"""Register application routes on the provided FastAPI app."""
class ApplicationHttpChannel(ABC):
@abstractmethod
async def start(self, app: FastAPI) -> None:
"""Start the HTTP channel with the prepared application app."""
@abstractmethod
async def stop(self) -> None:
"""Stop the HTTP channel and release resources."""
+37
@@ -0,0 +1,37 @@
from __future__ import annotations
import logging
import time
from collections.abc import Iterable
from fastapi import FastAPI, Request
from app_runtime.core.service_container import ServiceContainer
from app_runtime.http.base import HttpRouteRegistrar
LOGGER = logging.getLogger(__name__)
class HttpApplicationAppFactory:
def create(self, registrars: Iterable[HttpRouteRegistrar], services: ServiceContainer) -> FastAPI:
app = FastAPI(title="PLBA Application API")
self._register_middleware(app)
for registrar in registrars:
registrar.register(app, services)
return app
def _register_middleware(self, app: FastAPI) -> None:
@app.middleware("http")
async def track_request(request: Request, call_next): # type: ignore[no-untyped-def]
started = time.monotonic()
response = await call_next(request)
duration_ms = int((time.monotonic() - started) * 1000)
response.headers["X-Response-Time-Ms"] = str(duration_ms)
LOGGER.info(
"Application HTTP request handled: method=%s path=%s status=%s duration_ms=%s",
request.method,
request.url.path,
response.status_code,
duration_ms,
)
return response
+21
@@ -0,0 +1,21 @@
from __future__ import annotations
from fastapi import FastAPI
from app_runtime.control.http_runner import UvicornThreadRunner
from app_runtime.http.base import ApplicationHttpChannel
class HttpApplicationChannel(ApplicationHttpChannel):
def __init__(self, host: str, port: int, timeout: int) -> None:
self._runner = UvicornThreadRunner(host, port, timeout, thread_name="plba-http-application")
async def start(self, app: FastAPI) -> None:
await self._runner.start(app)
async def stop(self) -> None:
await self._runner.stop()
@property
def port(self) -> int:
return self._runner.port
+42
@@ -0,0 +1,42 @@
from __future__ import annotations
import asyncio
from typing import TYPE_CHECKING
from app_runtime.http.base import ApplicationHttpChannel, HttpRouteRegistrar
from app_runtime.http.http_app import HttpApplicationAppFactory
if TYPE_CHECKING:
from app_runtime.core.runtime import RuntimeManager
class ApplicationHttpService:
def __init__(self, app_factory: HttpApplicationAppFactory | None = None) -> None:
self._channels: list[ApplicationHttpChannel] = []
self._registrars: list[HttpRouteRegistrar] = []
self._app_factory = app_factory or HttpApplicationAppFactory()
def register_channel(self, channel: ApplicationHttpChannel) -> None:
self._channels.append(channel)
def register_routes(self, registrar: HttpRouteRegistrar) -> None:
self._registrars.append(registrar)
def start(self, runtime: RuntimeManager) -> None:
if not self._channels:
return
asyncio.run(self._start_async(runtime))
def stop(self) -> None:
if not self._channels:
return
asyncio.run(self._stop_async())
async def _start_async(self, runtime: RuntimeManager) -> None:
app = self._app_factory.create(self._registrars, runtime.services)
for channel in self._channels:
await channel.start(app)
async def _stop_async(self) -> None:
for channel in reversed(self._channels):
await channel.stop()
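The synchronous `start` and `stop` wrappers above are built on `asyncio.run`, which refuses to start when an event loop is already running in the current thread. That matters for callers that are themselves coroutines; `RuntimeManager.stop_runtime`, for instance, is `async` and calls `application_http.stop()`. Whether that path is actually reached depends on whether any channel is registered. The sketch below uses hypothetical stand-ins (`stop_sync`, `_stop_async`) rather than the real service, and shows both the failure and one possible workaround via `asyncio.to_thread` (Python 3.9+).

```python
import asyncio


async def _stop_async() -> str:
    # Stand-in for the real coroutine that stops the channels.
    return "stopped"


def stop_sync() -> str:
    # Hypothetical sync wrapper in the style of ApplicationHttpService.stop().
    coro = _stop_async()
    try:
        return asyncio.run(coro)
    except RuntimeError:
        coro.close()  # avoid a "never awaited" warning when asyncio.run refuses to start
        raise


async def call_directly() -> str:
    # Calling the wrapper from inside a running loop fails.
    try:
        return stop_sync()
    except RuntimeError as exc:
        return f"error: {exc}"


async def call_in_thread() -> str:
    # A worker thread has no running loop, so asyncio.run works there.
    return await asyncio.to_thread(stop_sync)


direct_result = asyncio.run(call_directly())
thread_result = asyncio.run(call_in_thread())
```

This is only one option; making the caller await the service's async method directly would avoid the extra thread, at the cost of a different public interface.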
+68
@@ -0,0 +1,68 @@
from __future__ import annotations
import asyncio
from typing import TYPE_CHECKING
from app_runtime.control.action_responder import ControlActionResponder
from app_runtime.control.base import ControlActionSet
from app_runtime.control.http_app import HttpControlAppFactory
from app_runtime.http.base import ApplicationHttpChannel, HttpRouteRegistrar
from app_runtime.http.http_app import HttpApplicationAppFactory
if TYPE_CHECKING:
from app_runtime.core.runtime import RuntimeManager
class UnifiedHttpService:
"""Publish control and application routes through the same HTTP channel."""
def __init__(
self,
app_factory: HttpApplicationAppFactory | None = None,
control_app_factory: HttpControlAppFactory | None = None,
control_timeout: int = 5,
) -> None:
self._channels: list[ApplicationHttpChannel] = []
self._registrars: list[HttpRouteRegistrar] = []
self._app_factory = app_factory or HttpApplicationAppFactory()
self._control_app_factory = control_app_factory or HttpControlAppFactory()
self._control_timeout = control_timeout
def register_channel(self, channel: ApplicationHttpChannel) -> None:
self._channels.append(channel)
def register_routes(self, registrar: HttpRouteRegistrar) -> None:
self._registrars.append(registrar)
def start(self, runtime: RuntimeManager) -> None:
if not self._channels:
return
asyncio.run(self._start_async(runtime))
def stop(self) -> None:
if not self._channels:
return
asyncio.run(self._stop_async())
async def _start_async(self, runtime: RuntimeManager) -> None:
app = self._app_factory.create(self._registrars, runtime.services)
actions = ControlActionSet(
health=runtime.health_status,
start=runtime.start_runtime,
stop=runtime.stop_runtime,
status=runtime.runtime_status,
trace_lookup=runtime.trace_logs,
)
action_responder = ControlActionResponder(actions, self._control_timeout)
control_app = self._control_app_factory.create(
actions.health,
action_responder.respond,
actions.trace_lookup,
)
app.router.routes.extend(control_app.router.routes)
for channel in self._channels:
await channel.start(app)
async def _stop_async(self) -> None:
for channel in reversed(self._channels):
await channel.stop()
+140
@@ -0,0 +1,140 @@
from __future__ import annotations
import json
from typing import Any
from app_runtime.contracts.trace import TraceLevel, TraceLogReader, TraceLogRecord, TraceLogView, TraceTransport
from app_runtime.tracing.transport import MySqlTraceConnectionFactory, MySqlTraceTransport
class MySqlTraceLogReader(TraceLogReader):
def __init__(self, connection_factory: MySqlTraceConnectionFactory) -> None:
self._connection_factory = connection_factory
def read_trace(
self,
trace_id: str,
levels: tuple[TraceLevel, ...],
ancestor_depth: int | None = 0,
) -> TraceLogView | None:
parent_id = self._read_parent_id(trace_id)
if parent_id is None and not self._trace_exists(trace_id):
return None
ancestors = self._read_ancestors(parent_id, levels, ancestor_depth)
child_ids = self._read_child_ids(trace_id)
records = self._read_records(trace_id, levels)
return TraceLogView(
trace_id=trace_id,
parent_id=parent_id,
child_ids=tuple(child_ids),
records=tuple(records),
ancestors=tuple(ancestors),
)
def _read_ancestors(
self,
parent_id: str | None,
levels: tuple[TraceLevel, ...],
ancestor_depth: int | None,
) -> list[TraceLogView]:
if parent_id is None or ancestor_depth == 0:
return []
remaining_depth = ancestor_depth
ancestors: list[TraceLogView] = []
current_trace_id = parent_id
while current_trace_id is not None and (remaining_depth is None or remaining_depth > 0):
current_parent_id = self._read_parent_id(current_trace_id)
ancestors.append(
TraceLogView(
trace_id=current_trace_id,
parent_id=current_parent_id,
child_ids=tuple(self._read_child_ids(current_trace_id)),
records=tuple(self._read_records(current_trace_id, levels)),
)
)
current_trace_id = current_parent_id
if remaining_depth is not None:
remaining_depth -= 1
ancestors.reverse()
return ancestors
def _trace_exists(self, trace_id: str) -> bool:
query = "SELECT 1 FROM trace_contexts WHERE trace_id = %s"
with self._connection_factory.connect() as connection:
with connection.cursor() as cursor:
cursor.execute(query, (trace_id,))
return cursor.fetchone() is not None
def _read_parent_id(self, trace_id: str) -> str | None:
query = "SELECT parent_id FROM trace_contexts WHERE trace_id = %s"
with self._connection_factory.connect() as connection:
with connection.cursor() as cursor:
cursor.execute(query, (trace_id,))
row = cursor.fetchone()
if row is None:
return None
return self._string_or_none(row.get("parent_id"))
def _read_records(self, trace_id: str, levels: tuple[TraceLevel, ...]) -> list[TraceLogRecord]:
placeholders = ", ".join(["%s"] * len(levels))
query = f"""
SELECT id, trace_id, event_time, step, status, level, message, attrs_json
FROM trace_messages
WHERE trace_id = %s AND level IN ({placeholders})
ORDER BY event_time ASC, id ASC
"""
params: tuple[object, ...] = (trace_id, *levels)
with self._connection_factory.connect() as connection:
with connection.cursor() as cursor:
cursor.execute(query, params)
rows = cursor.fetchall()
return [self._build_record(row) for row in rows]
def _read_child_ids(self, trace_id: str) -> list[str]:
query = """
SELECT trace_id
FROM trace_contexts
WHERE parent_id = %s
ORDER BY event_time ASC, trace_id ASC
"""
with self._connection_factory.connect() as connection:
with connection.cursor() as cursor:
cursor.execute(query, (trace_id,))
rows = cursor.fetchall()
return [str(row["trace_id"]) for row in rows]
def _build_record(self, row: dict[str, Any]) -> TraceLogRecord:
return TraceLogRecord(
id=int(row["id"]),
trace_id=str(row["trace_id"]),
event_time=row["event_time"],
step=str(row["step"] or ""),
status=str(row["status"] or ""),
level=str(row["level"]),
message=str(row["message"] or ""),
attrs_json=self._load_json(row.get("attrs_json")),
)
def _load_json(self, raw_value: Any) -> Any:
if raw_value is None or isinstance(raw_value, (dict, list, int, float, bool)):
return raw_value
if isinstance(raw_value, (bytes, bytearray)):
raw_value = raw_value.decode("utf-8")
if isinstance(raw_value, str):
try:
return json.loads(raw_value)
except json.JSONDecodeError:
return raw_value
return raw_value
def _string_or_none(self, value: Any) -> str | None:
if value is None:
return None
text = str(value)
return text or None
def build_trace_log_reader(transport: TraceTransport) -> TraceLogReader | None:
if isinstance(transport, MySqlTraceTransport):
return MySqlTraceLogReader(transport.create_connection_factory())
return None
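The depth handling in `_read_ancestors` above can be illustrated without a database. This is a sketch under stated assumptions: `walk_ancestors` is a hypothetical function, and the `parents` dict stands in for the `trace_contexts` table (mapping `trace_id` to `parent_id`). It returns only ids, ordered root-first, where the reader builds full `TraceLogView` objects.

```python
from __future__ import annotations


def walk_ancestors(
    parents: dict[str, str | None],
    parent_id: str | None,
    depth: int | None,
) -> list[str]:
    """Collect ancestor ids root-first; depth=None means "walk to the root"."""
    if parent_id is None or depth == 0:
        return []
    remaining = depth
    chain: list[str] = []
    current = parent_id
    while current is not None and (remaining is None or remaining > 0):
        chain.append(current)
        current = parents.get(current)  # unknown ids end the walk, like a missing row
        if remaining is not None:
            remaining -= 1
    chain.reverse()  # nearest parent was collected first; flip to root-first order
    return chain


# In-memory stand-in for trace_contexts: leaf -> mid -> root.
parents = {"leaf": "mid", "mid": "root", "root": None}
one_level = walk_ancestors(parents, "mid", 1)
full_chain = walk_ancestors(parents, "mid", None)
```

Note that with `depth=None` this sketch does not terminate on a cyclic parent chain; tracking visited ids would be one way to guard against that if such data can occur.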
+38 -13
@@ -15,7 +15,7 @@ class NoOpTraceTransport(TraceTransport):
del record del record
class MySqlTraceTransport(TraceTransport): class MySqlTraceConnectionFactory:
def __init__( def __init__(
self, self,
*, *,
@@ -31,6 +31,39 @@ class MySqlTraceTransport(TraceTransport):
self._user = user self._user = user
self._password = password self._password = password
def connect(self): # type: ignore[no-untyped-def]
import pymysql
return pymysql.connect(
host=self._host,
port=self._port,
user=self._user,
password=self._password,
database=self._database,
charset="utf8mb4",
autocommit=True,
cursorclass=pymysql.cursors.DictCursor,
)
class MySqlTraceTransport(TraceTransport):
def __init__(
self,
*,
host: str,
port: int,
database: str,
user: str,
password: str,
) -> None:
self._connections = MySqlTraceConnectionFactory(
host=host,
port=port,
database=database,
user=user,
password=password,
)
def write_context(self, record: TraceContextRecord) -> None: def write_context(self, record: TraceContextRecord) -> None:
query = """ query = """
INSERT INTO trace_contexts (trace_id, parent_id, alias, type, event_time, attrs_json) INSERT INTO trace_contexts (trace_id, parent_id, alias, type, event_time, attrs_json)
@@ -69,21 +102,13 @@ class MySqlTraceTransport(TraceTransport):
self._execute(query, params) self._execute(query, params)
def _execute(self, query: str, params: tuple[object, ...]) -> None: def _execute(self, query: str, params: tuple[object, ...]) -> None:
import pymysql with self._connections.connect() as connection:
with pymysql.connect(
host=self._host,
port=self._port,
user=self._user,
password=self._password,
database=self._database,
charset="utf8mb4",
autocommit=True,
cursorclass=pymysql.cursors.DictCursor,
) as connection:
with connection.cursor() as cursor: with connection.cursor() as cursor:
cursor.execute(query, params) cursor.execute(query, params)
def create_connection_factory(self) -> MySqlTraceConnectionFactory:
return self._connections
def _dumps(self, payload: dict[str, object]) -> str: def _dumps(self, payload: dict[str, object]) -> str:
return json.dumps(payload, ensure_ascii=False, default=self._json_default) return json.dumps(payload, ensure_ascii=False, default=self._json_default)
+2 -2
@@ -18,10 +18,10 @@ class WorkerSupervisor:
         for worker in self._workers:
             worker.start()
 
-    def stop(self, timeout: float = 30.0, force: bool = False) -> None:
+    def stop(self, timeout: float = 30.0, force: bool = False, wait: bool = True) -> None:
         for worker in self._workers:
             worker.stop(force=force)
-        if force:
+        if not wait:
             return
         deadline = monotonic() + timeout
         while True:
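The hunk above decouples "do not wait" from "force": the early return now depends on `wait`, while `force` is only passed through to each worker. The rest of the loop is not visible in this diff, so the following is only a minimal sketch of how such a stop could behave. `SketchWorker`, `stop_all`, and the boolean return value are hypothetical names and assumptions, not the project's API.

```python
from __future__ import annotations

from threading import Event, Thread
from time import monotonic, sleep


class SketchWorker:
    """Hypothetical worker: loops until a stop is requested."""

    def __init__(self) -> None:
        self._stop_requested = Event()
        self._thread: Thread | None = None
        self.stop_flags: list[bool] = []  # records the force flag of every stop call

    def start(self) -> None:
        self._stop_requested.clear()
        self._thread = Thread(target=self._loop, daemon=True)
        self._thread.start()

    def stop(self, force: bool = False) -> None:
        self.stop_flags.append(force)
        self._stop_requested.set()

    def is_alive(self) -> bool:
        return self._thread is not None and self._thread.is_alive()

    def _loop(self) -> None:
        # Event.wait returns True once the stop event is set.
        while not self._stop_requested.wait(timeout=0.01):
            pass


def stop_all(workers, timeout: float = 30.0, force: bool = False, wait: bool = True) -> bool:
    """Signal every worker; return True only if all of them were seen stopped.

    The return value is an assumption made for this sketch.
    """
    for worker in workers:
        worker.stop(force=force)
    if not wait:
        return False  # the caller chose not to wait, so completion is unknown
    deadline = monotonic() + timeout
    while True:
        if not any(worker.is_alive() for worker in workers):
            return True
        if monotonic() >= deadline:
            return False
        sleep(0.01)
```

With this shape, `force=True` together with the default `wait=True` still blocks until the workers exit or the deadline passes, which is the behavior the `if force:` to `if not wait:` change appears to aim for.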
@@ -133,7 +133,17 @@ class WorkflowRepository:
             None,
             "running",
             self._connection_factory.dumps(snapshot),
-            runtime.get("email_trace_id"),
+            self._resolve_trace_id(runtime),
+        )
+
+    @staticmethod
+    def _resolve_trace_id(runtime: dict[str, Any]) -> str | None:
+        return (
+            runtime.get("trace_id")
+            or runtime.get("task_trace_id")
+            or runtime.get("order_trace_id")
+            or runtime.get("attachment_trace_id")
+            or runtime.get("email_trace_id")
         )
 
     def _use_memory(self) -> bool:
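The `or` chain in `_resolve_trace_id` picks the first truthy value in priority order, so missing keys and empty strings both fall through to the next candidate. A standalone sketch of the same expression (the module-level function name `resolve_trace_id` is hypothetical; the keys are taken from the diff):

```python
from __future__ import annotations

from typing import Any


def resolve_trace_id(runtime: dict[str, Any]) -> str | None:
    # Same priority order as the diff: generic id first, email id last.
    return (
        runtime.get("trace_id")
        or runtime.get("task_trace_id")
        or runtime.get("order_trace_id")
        or runtime.get("attachment_trace_id")
        or runtime.get("email_trace_id")
    )


# An empty string is falsy, so the chain skips it and uses the next key.
print(resolve_trace_id({"trace_id": "", "task_trace_id": "task-7"}))
# Older payloads that only carry email_trace_id keep working.
print(resolve_trace_id({"email_trace_id": "mail-1"}))
```

One consequence of using `or` rather than an explicit `is not None` check: if every key is falsy, the result is whatever the last lookup returned, which is `None` for a missing `email_trace_id`.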
+16 -1
@@ -1,6 +1,6 @@
 from plba.bootstrap import create_runtime
 from plba.config import ConfigFileLoader, FileConfigProvider
-from plba.control import ControlActionSet, ControlChannel, ControlPlaneService, HttpControlChannel
+from plba.control import ControlActionRequest, ControlActionSet, ControlChannel, ControlPlaneService, HttpControlChannel
 from plba.contracts import (
     ApplicationModule,
     ConfigProvider,
@@ -15,6 +15,14 @@ from plba.contracts import (
 )
 from plba.core import ConfigurationManager, RuntimeManager, ServiceContainer
 from plba.health import HealthRegistry
+from plba.http import (
+    ApplicationHttpChannel,
+    ApplicationHttpService,
+    HttpApplicationAppFactory,
+    HttpApplicationChannel,
+    HttpRouteRegistrar,
+    UnifiedHttpService,
+)
 from plba.logging import LogManager
 from plba.queue import InMemoryTaskQueue
 from plba.tracing import MySqlTraceTransport, NoOpTraceTransport, TraceService
@@ -35,6 +43,7 @@ __all__ = [
     "ConfigFileLoader",
     "ConfigProvider",
     "ConfigurationManager",
+    "ControlActionRequest",
     "ControlActionSet",
     "ControlChannel",
     "ControlPlaneService",
@@ -42,7 +51,13 @@ __all__ = [
     "FileConfigProvider",
     "HealthContributor",
     "HealthRegistry",
+    "ApplicationHttpChannel",
+    "ApplicationHttpService",
+    "HttpApplicationAppFactory",
+    "HttpApplicationChannel",
+    "HttpRouteRegistrar",
     "HttpControlChannel",
+    "UnifiedHttpService",
     "InMemoryTaskQueue",
     "LogManager",
     "MySqlTraceTransport",
+12
@@ -3,6 +3,7 @@ from __future__ import annotations
 from app_runtime.control.http_channel import HttpControlChannel
 from app_runtime.core.runtime import RuntimeManager
 from app_runtime.contracts.application import ApplicationModule
+from app_runtime.http.http_channel import HttpApplicationChannel
 
 
 def create_runtime(
@@ -13,6 +14,9 @@ def create_runtime(
     control_host: str = "127.0.0.1",
     control_port: int = 8080,
     control_timeout: int = 5,
+    application_host: str | None = None,
+    application_port: int = 15000,
+    application_timeout: int = 5,
 ) -> RuntimeManager:
     runtime = RuntimeManager()
     if config_path is not None:
@@ -25,5 +29,13 @@ def create_runtime(
                 timeout=control_timeout,
             )
         )
+    if application_host is not None:
+        runtime.application_http.register_channel(
+            HttpApplicationChannel(
+                host=application_host,
+                port=application_port,
+                timeout=application_timeout,
+            )
+        )
     runtime.register_module(module)
     return runtime
+2 -1
@@ -1,8 +1,9 @@
-from app_runtime.control.base import ControlActionSet, ControlChannel
+from app_runtime.control.base import ControlActionRequest, ControlActionSet, ControlChannel
 from app_runtime.control.http_channel import HttpControlChannel
 from app_runtime.control.service import ControlPlaneService
 
 __all__ = [
+    "ControlActionRequest",
     "ControlActionSet",
     "ControlChannel",
     "ControlPlaneService",
+14
@@ -0,0 +1,14 @@
from app_runtime.http.base import ApplicationHttpChannel, HttpRouteRegistrar
from app_runtime.http.http_app import HttpApplicationAppFactory
from app_runtime.http.http_channel import HttpApplicationChannel
from app_runtime.http.service import ApplicationHttpService
from app_runtime.http.unified_service import UnifiedHttpService
__all__ = [
"ApplicationHttpChannel",
"ApplicationHttpService",
"HttpApplicationAppFactory",
"HttpApplicationChannel",
"HttpRouteRegistrar",
"UnifiedHttpService",
]
+234
@@ -0,0 +1,234 @@
from __future__ import annotations
import http.client
from dataclasses import dataclass
from pathlib import Path
from fastapi import FastAPI, File, UploadFile
from fastapi.responses import FileResponse
from fastapi.testclient import TestClient
import pytest
from app_runtime.contracts.application import ApplicationModule
from app_runtime.control.http_channel import HttpControlChannel
from app_runtime.core.registration import ModuleRegistry
from app_runtime.core.runtime import RuntimeManager
from app_runtime.http.base import ApplicationHttpChannel
from app_runtime.http.http_channel import HttpApplicationChannel
from app_runtime.http.unified_service import UnifiedHttpService
try:
import python_multipart # noqa: F401
except ImportError:
HAS_MULTIPART = False
else:
HAS_MULTIPART = True
class RecordingChannel(ApplicationHttpChannel):
def __init__(self) -> None:
self.apps: list[FastAPI] = []
self.stop_calls = 0
async def start(self, app: FastAPI) -> None:
self.apps.append(app)
async def stop(self) -> None:
self.stop_calls += 1
class PingRoutes:
def register(self, app: FastAPI, services) -> None: # type: ignore[no-untyped-def]
@app.get("/estimate/ping")
async def ping() -> dict[str, str]:
return {"status": "ok"}
@dataclass
class ServiceBackedRoutes:
download_path: Path
def register(self, app: FastAPI, services) -> None: # type: ignore[no-untyped-def]
marker = services.get("task_query_service")
@app.get("/estimate/api/tasks")
async def list_tasks() -> dict[str, object]:
return {"marker": marker["marker"]}
@app.post("/estimate/api/tasks")
async def create_task(file: UploadFile = File(...)) -> dict[str, object]:
payload = await file.read()
return {"filename": file.filename, "size": len(payload)}
@app.get("/estimate/api/tasks/result")
async def download_result() -> FileResponse:
return FileResponse(self.download_path, filename=self.download_path.name)
class MetricsRoutes:
def register(self, app: FastAPI, services) -> None: # type: ignore[no-untyped-def]
@app.get("/estimate/api/metrics")
async def metrics() -> dict[str, int]:
return {"count": 1}
class HttpModule(ApplicationModule):
def __init__(self, *registrars: object) -> None:
self._registrars = registrars
@property
def name(self) -> str:
return "http-module"
def register(self, registry: ModuleRegistry) -> None:
for registrar in self._registrars:
registry.add_http_routes(registrar)
def _application_client(channel: RecordingChannel) -> TestClient:
assert channel.apps
return TestClient(channel.apps[0])
def _http_request(port: int, path: str) -> tuple[int, bytes]:
connection = http.client.HTTPConnection("127.0.0.1", port, timeout=2)
try:
connection.request("GET", path)
response = connection.getresponse()
payload = response.read()
return response.status, payload
finally:
connection.close()
def test_runtime_starts_application_http_and_registers_routes() -> None:
runtime = RuntimeManager()
channel = RecordingChannel()
runtime.application_http.register_channel(channel)
runtime.register_module(HttpModule(PingRoutes()))
runtime.start(start_control_plane=False)
try:
assert len(channel.apps) == 1
client = _application_client(channel)
with client:
response = client.get("/estimate/ping")
assert response.status_code == 200
assert response.json() == {"status": "ok"}
assert response.headers["x-response-time-ms"].isdigit()
finally:
runtime.stop(stop_control_plane=False)
assert channel.stop_calls == 1
def test_application_routes_see_runtime_services_and_support_upload_download(tmp_path: Path) -> None:
if not HAS_MULTIPART:
pytest.skip("python-multipart is not installed in the local environment")
runtime = RuntimeManager()
runtime.services.register("task_query_service", {"marker": "from-container"})
result_path = tmp_path / "result.txt"
result_path.write_text("ready", encoding="utf-8")
channel = RecordingChannel()
runtime.application_http.register_channel(channel)
runtime.register_module(HttpModule(ServiceBackedRoutes(result_path), MetricsRoutes()))
runtime.start(start_control_plane=False)
client = _application_client(channel)
try:
with client:
list_response = client.get("/estimate/api/tasks")
assert list_response.status_code == 200
assert list_response.json() == {"marker": "from-container"}
upload_response = client.post(
"/estimate/api/tasks",
files={"file": ("input.txt", b"payload", "text/plain")},
)
assert upload_response.status_code == 200
assert upload_response.json() == {"filename": "input.txt", "size": 7}
metrics_response = client.get("/estimate/api/metrics")
assert metrics_response.status_code == 200
assert metrics_response.json() == {"count": 1}
download_response = client.get("/estimate/api/tasks/result")
assert download_response.status_code == 200
assert download_response.content == b"ready"
finally:
runtime.stop(stop_control_plane=False)
def test_application_http_stop_shuts_down_real_server() -> None:
runtime = RuntimeManager()
channel = HttpApplicationChannel(host="127.0.0.1", port=0, timeout=2)
runtime.application_http.register_channel(channel)
runtime.register_module(HttpModule(PingRoutes()))
runtime.start(start_control_plane=False)
try:
status, _ = _http_request(channel.port, "/estimate/ping")
assert status == 200
finally:
runtime.stop(stop_control_plane=False)
try:
_http_request(channel.port, "/estimate/ping")
except OSError:
pass
else:
raise AssertionError("application HTTP server is still reachable after stop")
def test_control_plane_and_application_http_work_independently() -> None:
runtime = RuntimeManager()
control_channel = HttpControlChannel(host="127.0.0.1", port=0, timeout=2)
app_channel = HttpApplicationChannel(host="127.0.0.1", port=0, timeout=2)
runtime.control_plane.register_channel(control_channel)
runtime.application_http.register_channel(app_channel)
runtime.register_module(HttpModule(PingRoutes()))
runtime.start()
try:
control_status, _ = _http_request(control_channel.port, "/health")
app_status, _ = _http_request(app_channel.port, "/estimate/ping")
assert control_status == 200
assert app_status == 200
control_missing_status, _ = _http_request(control_channel.port, "/estimate/ping")
app_missing_status, _ = _http_request(app_channel.port, "/health")
assert control_missing_status == 404
assert app_missing_status == 404
runtime.application_http.stop()
control_status, _ = _http_request(control_channel.port, "/health")
assert control_status == 200
finally:
runtime.control_plane.stop()
runtime.stop(stop_control_plane=False)
def test_unified_http_serves_control_and_application_routes_on_one_app() -> None:
runtime = RuntimeManager(application_http=UnifiedHttpService())
channel = RecordingChannel()
runtime.application_http.register_channel(channel)
runtime.register_module(HttpModule(PingRoutes()))
runtime.start(start_control_plane=False)
try:
client = _application_client(channel)
with client:
health_response = client.get("/health")
action_response = client.get("/actions/status")
app_response = client.get("/estimate/ping")
assert health_response.status_code == 200
assert health_response.json()["status"] == "ok"
assert action_response.status_code == 200
assert action_response.json() == {"status": "ok", "detail": "idle"}
assert app_response.status_code == 200
assert app_response.json() == {"status": "ok"}
finally:
runtime.stop(stop_control_plane=False)
+394
@@ -0,0 +1,394 @@
from __future__ import annotations
from dataclasses import dataclass, field
from threading import Event, Lock, Thread
from time import monotonic, sleep
from fastapi.testclient import TestClient
from app_runtime.contracts.application import ApplicationModule
from app_runtime.contracts.worker import Worker, WorkerHealth, WorkerStatus
from app_runtime.control.base import ControlActionSet
from app_runtime.control.http_channel import HttpControlChannel
from app_runtime.core.registration import ModuleRegistry
from app_runtime.core.runtime import RuntimeManager
@dataclass
class IntervalRoutine:
calls: list[float] = field(default_factory=list)
_lock: Lock = field(default_factory=Lock)
def run(self) -> None:
with self._lock:
self.calls.append(monotonic())
def wait_runs(self, count: int, timeout: float) -> bool:
deadline = monotonic() + timeout
while monotonic() < deadline:
with self._lock:
if len(self.calls) >= count:
return True
sleep(0.01)
return False
def deltas(self) -> list[float]:
with self._lock:
return [self.calls[index + 1] - self.calls[index] for index in range(len(self.calls) - 1)]
class BlockingRoutine:
def __init__(self, started: Event, release: Event) -> None:
self._started = started
self._release = release
def run(self) -> None:
self._started.set()
self._release.wait(timeout=5.0)
class ScenarioWorker(Worker):
def __init__(self, name: str, routine: object, *, interval: float = 0.05) -> None:
self._name = name
self._routine = routine
self._interval = interval
self._thread: Thread | None = None
self._stop_requested = Event()
self._in_flight = 0
self._runs = 0
self._lock = Lock()
@property
def name(self) -> str:
return self._name
@property
def critical(self) -> bool:
return True
def start(self) -> None:
if self._thread is not None and self._thread.is_alive():
return
self._stop_requested.clear()
self._thread = Thread(target=self._loop, name=f"{self._name}-thread", daemon=True)
self._thread.start()
def stop(self, force: bool = False) -> None:
self._stop_requested.set()
def health(self) -> WorkerHealth:
return WorkerHealth(name=self.name, status="ok", critical=True, meta={"runs": self._runs})
def status(self) -> WorkerStatus:
thread_alive = self._thread is not None and self._thread.is_alive()
with self._lock:
in_flight = self._in_flight
runs = self._runs
if not thread_alive:
state = "stopped"
elif self._stop_requested.is_set():
state = "stopping"
elif in_flight > 0:
state = "busy"
else:
state = "idle"
return WorkerStatus(name=self.name, state=state, in_flight=in_flight, meta={"runs": runs})
def _loop(self) -> None:
while not self._stop_requested.is_set():
with self._lock:
self._in_flight += 1
try:
self._routine.run()
with self._lock:
self._runs += 1
finally:
with self._lock:
self._in_flight -= 1
if self._stop_requested.is_set():
return
sleep(self._interval)
class ForceAwareWorker(Worker):
def __init__(self, name: str, started: Event, release: Event) -> None:
self._name = name
self._started = started
self._release = release
self._thread: Thread | None = None
self._stop_requested = Event()
self._force_stop = Event()
self.stop_calls: list[bool] = []
@property
def name(self) -> str:
return self._name
@property
def critical(self) -> bool:
return True
def start(self) -> None:
if self._thread is not None and self._thread.is_alive():
return
self._stop_requested.clear()
self._force_stop.clear()
self._thread = Thread(target=self._loop, name=f"{self._name}-thread", daemon=True)
self._thread.start()
def stop(self, force: bool = False) -> None:
self.stop_calls.append(force)
self._stop_requested.set()
if force:
self._force_stop.set()
def health(self) -> WorkerHealth:
return WorkerHealth(name=self.name, status="ok", critical=True)
def status(self) -> WorkerStatus:
thread_alive = self._thread is not None and self._thread.is_alive()
if not thread_alive:
state = "stopped"
elif self._stop_requested.is_set():
state = "stopping"
else:
state = "busy"
return WorkerStatus(name=self.name, state=state, in_flight=1 if thread_alive else 0)
def _loop(self) -> None:
self._started.set()
while not self._force_stop.is_set():
if self._release.wait(timeout=0.05):
return
class WorkerModule(ApplicationModule):
def __init__(self, worker: Worker) -> None:
self._worker = worker
@property
def name(self) -> str:
return "business-tests"
def register(self, registry: ModuleRegistry) -> None:
registry.add_worker(self._worker)
def _http_call_json(
client: TestClient,
path: str,
*,
method: str = "GET",
headers: dict[str, str] | None = None,
) -> tuple[int, dict[str, object]]:
response = client.request(method, path, headers=headers or {})
return response.status_code, response.json()
def _poll_until_stopped(client: TestClient, timeout: float = 2.0) -> bool:
deadline = monotonic() + timeout
while monotonic() < deadline:
_, payload = _http_call_json(client, "/actions/status")
if payload.get("detail") == "stopped":
return True
sleep(0.05)
return False
def _build_runtime(worker: Worker, *, control_timeout: int = 1) -> tuple[RuntimeManager, TestClient]:
runtime = RuntimeManager()
runtime.register_module(WorkerModule(worker))
runtime.start(start_control_plane=False)
channel = HttpControlChannel(host="127.0.0.1", port=0, timeout=control_timeout)
channel._actions = ControlActionSet(
health=runtime.health_status,
start=runtime.start_runtime,
stop=runtime.stop_runtime,
status=runtime.runtime_status,
)
app = channel._factory.create(channel._health_response, channel._action_response)
return runtime, TestClient(app)
def test_worker_wakes_up_with_configured_interval() -> None:
interval = 0.15
routine = IntervalRoutine()
worker = ScenarioWorker("interval-worker", routine, interval=interval)
runtime = RuntimeManager()
runtime.register_module(WorkerModule(worker))
runtime.start()
try:
assert routine.wait_runs(count=3, timeout=2.0) is True
finally:
runtime.stop()
deltas = routine.deltas()
assert len(deltas) >= 2
assert all(delta >= interval * 0.7 for delta in deltas[:2])
def test_actions_start_stop_and_health_when_worker_is_idle() -> None:
runtime, client = _build_runtime(ScenarioWorker("idle-worker", IntervalRoutine(), interval=0.2))
try:
stop_status, stop_payload = _http_call_json(client, "/actions/stop", method="POST")
assert stop_status == 200
assert isinstance(stop_payload.get("detail"), dict)
assert stop_payload["detail"]["timed_out"] is False
assert stop_payload["detail"]["state"] == "stopped"
health_stopped_status, health_stopped_payload = _http_call_json(client, "/health")
assert health_stopped_status == 503
assert health_stopped_payload["state"] == "stopped"
assert health_stopped_payload["status"] == "unhealthy"
start_status, start_payload = _http_call_json(client, "/actions/start", method="POST")
assert start_status == 200
assert isinstance(start_payload.get("detail"), dict)
assert start_payload["detail"]["timed_out"] is False
assert start_payload["detail"]["state"] in {"idle", "busy"}
health_started_status, health_started_payload = _http_call_json(client, "/health")
assert health_started_status == 200
assert health_started_payload["status"] == "ok"
assert health_started_payload["state"] in {"idle", "busy"}
finally:
client.close()
runtime.stop()
def test_actions_stop_busy_worker_before_timeout() -> None:
started = Event()
release = Event()
runtime, client = _build_runtime(ScenarioWorker("busy-worker", BlockingRoutine(started, release)))
assert started.wait(timeout=1.0) is True
def release_later() -> None:
sleep(0.2)
release.set()
Thread(target=release_later, daemon=True).start()
try:
status, payload = _http_call_json(client, "/actions/stop", method="POST")
assert status == 200
assert isinstance(payload.get("detail"), dict)
assert payload["detail"]["timed_out"] is False
assert payload["detail"]["state"] == "stopped"
health_status, health_payload = _http_call_json(client, "/health")
assert health_status == 503
assert health_payload["status"] == "unhealthy"
assert health_payload["state"] == "stopped"
finally:
client.close()
runtime.stop()
def test_actions_stop_busy_worker_after_timeout() -> None:
started = Event()
release = Event()
runtime, client = _build_runtime(ScenarioWorker("slow-stop-worker", BlockingRoutine(started, release)))
runtime.ACTION_TIMEOUT_SECONDS = 0.3
assert started.wait(timeout=1.0) is True
try:
status, payload = _http_call_json(client, "/actions/stop", method="POST")
assert status == 200
assert isinstance(payload.get("detail"), dict)
assert payload["detail"]["timed_out"] is True
assert payload["detail"]["state"] == "stopping"
health_status, health_payload = _http_call_json(client, "/health")
assert health_status == 503
assert health_payload["status"] == "degraded"
assert health_payload["state"] == "stopping"
release.set()
assert _poll_until_stopped(client, timeout=2.0) is True
stopped_health_status, stopped_health_payload = _http_call_json(client, "/health")
assert stopped_health_status == 503
assert stopped_health_payload["state"] == "stopped"
finally:
client.close()
runtime.stop()
def test_actions_stop_wait_false_returns_before_worker_finishes() -> None:
started = Event()
release = Event()
runtime, client = _build_runtime(ScenarioWorker("async-stop-worker", BlockingRoutine(started, release)))
assert started.wait(timeout=1.0) is True
try:
status, payload = _http_call_json(client, "/actions/stop?wait=false", method="POST")
assert status == 200
assert isinstance(payload.get("detail"), dict)
assert payload["detail"]["timed_out"] is False
assert payload["detail"]["state"] == "stopping"
health_status, health_payload = _http_call_json(client, "/health")
assert health_status == 503
assert health_payload["status"] == "degraded"
assert health_payload["state"] == "stopping"
release.set()
assert _poll_until_stopped(client, timeout=2.0) is True
finally:
client.close()
runtime.stop()
def test_actions_stop_honors_timeout_query_parameter() -> None:
started = Event()
release = Event()
runtime, client = _build_runtime(ScenarioWorker("timeout-stop-worker", BlockingRoutine(started, release)))
assert started.wait(timeout=1.0) is True
try:
status, payload = _http_call_json(client, "/actions/stop?timeout=0.1", method="POST")
assert status == 200
assert isinstance(payload.get("detail"), dict)
assert payload["detail"]["timed_out"] is True
assert payload["detail"]["state"] == "stopping"
release.set()
assert _poll_until_stopped(client, timeout=2.0) is True
finally:
client.close()
runtime.stop()
def test_actions_stop_honors_force_query_parameter() -> None:
started = Event()
release = Event()
worker = ForceAwareWorker("force-stop-worker", started, release)
runtime, client = _build_runtime(worker)
assert started.wait(timeout=1.0) is True
try:
status, payload = _http_call_json(client, "/actions/stop?force=true&timeout=0.5", method="POST")
assert status == 200
assert isinstance(payload.get("detail"), dict)
assert payload["detail"]["timed_out"] is False
assert payload["detail"]["state"] == "stopped"
assert worker.stop_calls == [True]
finally:
release.set()
client.close()
runtime.stop()
def test_actions_log_client_source_for_start_and_stop(caplog) -> None:
runtime, client = _build_runtime(ScenarioWorker("log-worker", IntervalRoutine(), interval=0.2))
caplog.set_level("WARNING", logger="app_runtime.control.http_app")
try:
_http_call_json(client, "/actions/stop", method="POST", headers={"X-Client-Source": "web-ui"})
_http_call_json(client, "/actions/start", method="POST", headers={"X-Client-Source": "web-ui"})
finally:
client.close()
runtime.stop()
messages = [record.getMessage() for record in caplog.records]
assert any("/actions/stop client=web-ui" in message for message in messages)
assert any("/actions/start client=web-ui" in message for message in messages)
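Both `IntervalRoutine.wait_runs` and `_poll_until_stopped` in the tests above follow the same pattern: compute a deadline from `monotonic()`, re-check a condition at short intervals, and report whether it became true in time. A generic sketch of that pattern, assuming a hypothetical helper name `poll_until` that does not appear in the diff:

```python
from time import monotonic, sleep
from typing import Callable


def poll_until(predicate: Callable[[], bool], timeout: float, interval: float = 0.01) -> bool:
    """Re-check predicate until it returns True or the timeout elapses."""
    deadline = monotonic() + timeout  # monotonic() is immune to wall-clock jumps
    while monotonic() < deadline:
        if predicate():
            return True
        sleep(interval)
    return False


# Usage: the condition becomes true on the third check.
calls = {"count": 0}


def third_time_lucky() -> bool:
    calls["count"] += 1
    return calls["count"] >= 3


print(poll_until(third_time_lucky, timeout=1.0))
```

Returning a boolean instead of raising keeps the assertion in the test body (`assert ... is True`), which matches how the tests above report timeouts.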
+110 -3
@@ -5,6 +5,8 @@ from dataclasses import dataclass, field
 from threading import Event, Lock, Thread
 from time import sleep
 
+from fastapi.testclient import TestClient
+
 from app_runtime.contracts.application import ApplicationModule
 from app_runtime.contracts.health import HealthContributor
 from app_runtime.contracts.worker import Worker, WorkerHealth, WorkerStatus
@@ -176,6 +178,28 @@ class BlockingModule(ApplicationModule):
         registry.add_worker(RoutineWorker("blocking-worker", self.routine))
 
 
+class WorkerModuleAdapter(ApplicationModule):
+    def __init__(self, worker: Worker) -> None:
+        self._worker = worker
+
+    @property
+    def name(self) -> str:
+        return "worker-adapter"
+
+    def register(self, registry: ModuleRegistry) -> None:
+        registry.add_worker(self._worker)
+
+
+class ForceRecordingWorker(RoutineWorker):
+    def __init__(self) -> None:
+        super().__init__("force-recorder", CollectingRoutine(), interval=0.01)
+        self.stop_flags: list[bool] = []
+
+    def stop(self, force: bool = False) -> None:
+        self.stop_flags.append(force)
+        super().stop(force=force)
+
+
 class RecordingTransport(NoOpTraceTransport):
     def __init__(self) -> None:
         self.contexts: list[object] = []
@@ -188,6 +212,21 @@ class RecordingTransport(NoOpTraceTransport):
         self.messages.append(record)
 
 
+def _build_control_client(runtime: RuntimeManager, *, control_timeout: int = 1) -> TestClient:
+    from app_runtime.control.base import ControlActionSet
+    from app_runtime.control.http_channel import HttpControlChannel
+
+    channel = HttpControlChannel("127.0.0.1", 0, control_timeout)
+    channel._actions = ControlActionSet(
+        health=runtime.health_status,
+        start=runtime.start_runtime,
+        stop=runtime.stop_runtime,
+        status=runtime.runtime_status,
+    )
+    app = channel._factory.create(channel._health_response, channel._action_response)
+    return TestClient(app)
+
+
 def test_runtime_runs_worker_routine_and_exposes_status(tmp_path) -> None:
     config_path = tmp_path / "config.yml"
     config_path.write_text(
@@ -302,15 +341,15 @@ def test_http_control_channel_exposes_health_and_actions() -> None:
     async def health():
         return {"status": "ok" if state["started"] else "unhealthy", "state": "idle" if state["started"] else "stopped"}
 
-    async def start_handler() -> str:
+    async def start_handler(_request) -> str:
         state["started"] = True
         return "started"
 
-    async def stop_handler() -> str:
+    async def stop_handler(_request) -> str:
         state["started"] = False
         return "stopped"
 
-    async def status_handler() -> str:
+    async def status_handler(_request) -> str:
         return "idle" if state["started"] else "stopped"
 
     async def scenario() -> None:
@@ -339,6 +378,74 @@ def test_http_control_channel_exposes_health_and_actions() -> None:
     asyncio.run(scenario())
 
 
+def test_http_control_stop_wait_false_returns_immediately() -> None:
+    started = Event()
+    release = Event()
+    runtime = RuntimeManager()
+    runtime.register_module(BlockingModule(started, release))
+    runtime.start(start_control_plane=False)
+    assert started.wait(timeout=1.0) is True
+    client = _build_control_client(runtime)
+
+    try:
+        response = client.post("/actions/stop?wait=false")
+        payload = response.json()
+        assert response.status_code == 200
+        assert payload["detail"]["timed_out"] is False
+        assert payload["detail"]["state"] == "stopping"
+
+        health_response = client.get("/health")
+        health_payload = health_response.json()
+        assert health_response.status_code == 503
+        assert health_payload["state"] == "stopping"
+        assert health_payload["status"] == "degraded"
+    finally:
+        release.set()
+        client.close()
+        runtime.stop()
+
+
+def test_http_control_stop_timeout_query_changes_wait_window() -> None:
+    started = Event()
+    release = Event()
+    runtime = RuntimeManager()
+    runtime.ACTION_TIMEOUT_SECONDS = 5.0
+    runtime.register_module(BlockingModule(started, release))
+    runtime.start(start_control_plane=False)
+    assert started.wait(timeout=1.0) is True
+    client = _build_control_client(runtime)
+
+    try:
+        response = client.post("/actions/stop?timeout=0.1")
+        payload = response.json()
+        assert response.status_code == 200
+        assert payload["detail"]["timed_out"] is True
+        assert payload["detail"]["state"] == "stopping"
+    finally:
+        release.set()
+        client.close()
+        runtime.stop()
+
+
+def test_http_control_stop_force_query_propagates_to_worker() -> None:
+    runtime = RuntimeManager()
+    worker = ForceRecordingWorker()
+    runtime.register_module(WorkerModuleAdapter(worker))
+    runtime.start(start_control_plane=False)
+    client = _build_control_client(runtime)
+
+    try:
+        response = client.post("/actions/stop?force=true")
+        payload = response.json()
+        assert response.status_code == 200
+        assert payload["detail"]["timed_out"] is False
+        assert payload["detail"]["state"] == "stopped"
+        assert worker.stop_flags == [True]
+    finally:
+        client.close()
+        runtime.stop()
+
+
 def test_public_plba_package_exports_runtime_builder_and_worker_contract(tmp_path) -> None:
     import plba
     from plba import ApplicationModule as PublicApplicationModule
+738
@@ -0,0 +1,738 @@
from __future__ import annotations
import asyncio
from datetime import datetime, timezone
from fastapi.responses import JSONResponse
from fastapi.testclient import TestClient
import app_runtime.core.runtime as runtime_module
from app_runtime.control.base import ControlActionRequest, TraceQueryRequest
from app_runtime.control.http_app import HttpControlAppFactory
from app_runtime.contracts.trace import TraceLogRecord, TraceLogView
from app_runtime.core.runtime import RuntimeManager
from app_runtime.tracing.reader import MySqlTraceLogReader
def _trace_record(
*,
row_id: int,
level: str,
message: str,
step: str = "process",
status: str = "failed",
attrs_json: object | None = None,
) -> TraceLogRecord:
return TraceLogRecord(
id=row_id,
trace_id="trace-1",
event_time=datetime(2026, 4, 28, 10, 11, 12, tzinfo=timezone.utc),
step=step,
status=status,
level=level, # type: ignore[arg-type]
message=message,
attrs_json=attrs_json if attrs_json is not None else {},
)
def _build_client(trace_provider=None) -> TestClient:
async def health_provider():
return {"status": "ok"}
async def action_provider(_action: str, _client_source: str, _request: ControlActionRequest) -> JSONResponse:
return JSONResponse(content={"status": "ok"})
app = HttpControlAppFactory().create(health_provider, action_provider, trace_provider)
return TestClient(app)


def test_trace_endpoint_returns_html_by_default() -> None:
    captured: list[tuple[str, TraceQueryRequest]] = []

    async def trace_provider(trace_id: str, request: TraceQueryRequest) -> TraceLogView:
        captured.append((trace_id, request))
        return TraceLogView(
            trace_id="trace-1",
            parent_id="root-trace",
            child_ids=("child-1", "child-2"),
            records=(
                _trace_record(row_id=1, level="ERROR", message="first error"),
                _trace_record(row_id=2, level="WARNING", message="second warning"),
            ),
        )

    client = _build_client(trace_provider)
    try:
        response = client.get("/traces/trace-1")
    finally:
        client.close()

    assert response.status_code == 200
    assert response.headers["content-type"].startswith("text/html")
    assert "trace_id:" in response.text
    assert "first error" in response.text
    assert "second warning" in response.text
    assert captured == [
        (
            "trace-1",
            TraceQueryRequest(
                levels=("ERROR", "WARNING", "INFO"),
                include_attrs_json=False,
                response_format="html",
                ancestor_depth=0,
            ),
        )
    ]


def test_trace_endpoint_returns_text_when_requested() -> None:
    async def trace_provider(_trace_id: str, _request: TraceQueryRequest) -> TraceLogView:
        return TraceLogView(
            trace_id="trace-1",
            parent_id="root-trace",
            child_ids=("child-1", "child-2"),
            records=(
                _trace_record(row_id=1, level="ERROR", message="first error"),
                _trace_record(row_id=2, level="WARNING", message="second warning"),
            ),
        )

    client = _build_client(trace_provider)
    try:
        response = client.get("/traces/trace-1?format=text")
    finally:
        client.close()

    assert response.status_code == 200
    assert response.text == (
        "trace_id: trace-1\n"
        "parent_id: root-trace\n"
        "child_ids:\n"
        " - child-1\n"
        " - child-2\n"
        "\n"
        "==============================\n"
        "trace_id: trace-1\n"
        "\n"
        "step: process\n"
        "first error\n"
        "second warning"
    )


def test_trace_endpoint_appends_attrs_json_in_text_mode() -> None:
    async def trace_provider(_trace_id: str, _request: TraceQueryRequest) -> TraceLogView:
        return TraceLogView(
            trace_id="trace-1",
            parent_id=None,
            child_ids=(),
            records=(
                _trace_record(row_id=1, level="ERROR", message="failure", attrs_json={"attempt": 2, "source": "crm"}),
            ),
        )

    client = _build_client(trace_provider)
    try:
        response = client.get("/traces/trace-1?format=text&attrs_json=true")
    finally:
        client.close()

    assert response.status_code == 200
    assert response.text == (
        "trace_id: trace-1\n"
        "parent_id: \n"
        "child_ids:\n"
        "\n"
        "==============================\n"
        "trace_id: trace-1\n"
        "\n"
        "step: process\n"
        'failure, {"attempt":2,"source":"crm"}'
    )


def test_trace_endpoint_separates_messages_by_step_in_text_mode() -> None:
    async def trace_provider(_trace_id: str, _request: TraceQueryRequest) -> TraceLogView:
        return TraceLogView(
            trace_id="trace-1",
            parent_id=None,
            child_ids=(),
            records=(
                _trace_record(row_id=1, level="INFO", message="load first", step="load_stocks"),
                _trace_record(row_id=2, level="INFO", message="load second", step="load_stocks"),
                _trace_record(row_id=3, level="INFO", message="filter first", step="filter_stocks"),
            ),
        )

    client = _build_client(trace_provider)
    try:
        response = client.get("/traces/trace-1?format=text")
    finally:
        client.close()

    assert response.status_code == 200
    assert response.text == (
        "trace_id: trace-1\n"
        "parent_id: \n"
        "child_ids:\n"
        "\n"
        "==============================\n"
        "trace_id: trace-1\n"
        "\n"
        "step: load_stocks\n"
        "load first\n"
        "load second\n"
        "------------------------------\n"
        "step: filter_stocks\n"
        "filter first"
    )


def test_trace_endpoint_returns_json_payload() -> None:
    async def trace_provider(_trace_id: str, _request: TraceQueryRequest) -> TraceLogView:
        return TraceLogView(
            trace_id="trace-1",
            parent_id="parent-1",
            child_ids=("child-1",),
            records=(
                _trace_record(row_id=3, level="INFO", message="done", attrs_json={"batch": 7}),
            ),
        )

    client = _build_client(trace_provider)
    try:
        response = client.get("/traces/trace-1?format=json&attrs_json=true&levels=info")
    finally:
        client.close()

    assert response.status_code == 200
    assert response.json() == {
        "trace_id": "trace-1",
        "parent_id": "parent-1",
        "child_ids": ["child-1"],
        "messages": [
            {
                "id": 3,
                "trace_id": "trace-1",
                "event_time": "2026-04-28T10:11:12+00:00",
                "step": "process",
                "status": "failed",
                "level": "INFO",
                "message": "done",
                "attrs_json": {"batch": 7},
            }
        ],
        "ancestors": [],
    }


def test_trace_endpoint_returns_json_payload_with_ancestors() -> None:
    async def trace_provider(_trace_id: str, _request: TraceQueryRequest) -> TraceLogView:
        return TraceLogView(
            trace_id="trace-1",
            parent_id="parent-1",
            child_ids=("child-1",),
            records=(
                _trace_record(row_id=3, level="INFO", message="done", attrs_json={"batch": 7}),
            ),
            ancestors=(
                TraceLogView(
                    trace_id="root-1",
                    parent_id=None,
                    child_ids=("parent-1",),
                    records=(
                        _trace_record(row_id=4, level="INFO", message="root info"),
                    ),
                ),
                TraceLogView(
                    trace_id="parent-1",
                    parent_id="root-1",
                    child_ids=("trace-1", "sibling-1"),
                    records=(
                        _trace_record(row_id=5, level="WARNING", message="parent warning"),
                    ),
                ),
            ),
        )

    client = _build_client(trace_provider)
    try:
        response = client.get("/traces/trace-1?format=json&ancestor_depth=1")
    finally:
        client.close()

    assert response.status_code == 200
    assert response.json()["ancestors"] == [
        {
            "trace_id": "root-1",
            "parent_id": "",
            "child_ids": ["parent-1"],
            "messages": [
                {
                    "id": 4,
                    "trace_id": "trace-1",
                    "event_time": "2026-04-28T10:11:12+00:00",
                    "step": "process",
                    "status": "failed",
                    "level": "INFO",
                    "message": "root info",
                }
            ],
        },
        {
            "trace_id": "parent-1",
            "parent_id": "root-1",
            "child_ids": ["trace-1", "sibling-1"],
            "messages": [
                {
                    "id": 5,
                    "trace_id": "trace-1",
                    "event_time": "2026-04-28T10:11:12+00:00",
                    "step": "process",
                    "status": "failed",
                    "level": "WARNING",
                    "message": "parent warning",
                }
            ],
        }
    ]


def test_trace_endpoint_returns_html_page_with_related_links() -> None:
    async def trace_provider(_trace_id: str, _request: TraceQueryRequest) -> TraceLogView:
        return TraceLogView(
            trace_id="trace-1",
            parent_id="parent-1",
            child_ids=("child-1", "child-2"),
            records=(
                _trace_record(row_id=1, level="INFO", message="loaded prices", step="load_stocks", status="ok"),
                _trace_record(row_id=2, level="WARNING", message="filtered suspicious ticker", step="filter_stocks", status="degraded"),
            ),
        )

    client = _build_client(trace_provider)
    try:
        response = client.get("/traces/trace-1?format=html&attrs_json=true")
    finally:
        client.close()

    assert response.status_code == 200
    assert response.headers["content-type"].startswith("text/html")
    assert "background: var(--bg);" in response.text
    assert "--bg: #000000;" in response.text
    assert "--fg: #ececec;" in response.text
    assert "--step: #ffffff;" in response.text
    assert "--info: #d6d7d9;" in response.text
    assert "--warning: #e9ebec;" in response.text
    assert "--error: #ff817d;" in response.text
    assert "--other: #ececec;" in response.text
    assert 'font: 13px/1.1 "SFMono-Regular", monospace;' in response.text
    assert '<div class="line">trace_id: <a href="/traces/trace-1?format=html&amp;levels=ERROR%2CWARNING%2CINFO&amp;attrs_json=true">trace-1</a></div>' in response.text
    assert '<div class="line">parent_id: <a href="/traces/parent-1?format=html&amp;levels=ERROR%2CWARNING%2CINFO&amp;attrs_json=true">parent-1</a></div>' in response.text
    assert '<div class="line">child_ids:</div>' in response.text
    assert '<div class="line"> - <a href="/traces/child-1?format=html&amp;levels=ERROR%2CWARNING%2CINFO&amp;attrs_json=true">child-1</a></div>' in response.text
    assert '<div class="line"> - <a href="/traces/child-2?format=html&amp;levels=ERROR%2CWARNING%2CINFO&amp;attrs_json=true">child-2</a></div>' in response.text
    assert '<div class="line">==============================</div>' in response.text
    assert '<div class="line" style="color: var(--step);">load_stocks</div>' in response.text
    assert '<div class="line">trace_id: <a href="/traces/trace-1?format=html&amp;levels=ERROR%2CWARNING%2CINFO&amp;attrs_json=true">trace-1</a></div>' in response.text
    assert '<div class="line" style="color: var(--step);">filter_stocks</div>' in response.text
    assert "loaded prices" in response.text
    assert "filtered suspicious ticker" in response.text
    assert "2026-04-28T10:11:12+00:00 | INFO | ok" not in response.text
    assert "2026-04-28T10:11:12+00:00 | WARNING | degraded" not in response.text
    assert "Related Traces" not in response.text


def test_trace_endpoint_renders_ancestors_in_text_mode() -> None:
    async def trace_provider(_trace_id: str, _request: TraceQueryRequest) -> TraceLogView:
        return TraceLogView(
            trace_id="trace-1",
            parent_id="parent-1",
            child_ids=(),
            records=(_trace_record(row_id=1, level="INFO", message="child message"),),
            ancestors=(
                TraceLogView(
                    trace_id="root-1",
                    parent_id=None,
                    child_ids=("parent-1",),
                    records=(_trace_record(row_id=2, level="INFO", message="root message"),),
                ),
                TraceLogView(
                    trace_id="parent-1",
                    parent_id="root-1",
                    child_ids=("trace-1",),
                    records=(_trace_record(row_id=3, level="WARNING", message="parent message"),),
                ),
            ),
        )

    client = _build_client(trace_provider)
    try:
        response = client.get("/traces/trace-1?format=text&ancestor_depth=1")
    finally:
        client.close()

    assert response.status_code == 200
    assert response.text == (
        "trace_id: trace-1\n"
        "parent_id: parent-1\n"
        "child_ids:\n"
        "\n"
        "==============================\n"
        "trace_id: root-1\n"
        "\n"
        "step: process\n"
        "root message\n"
        "\n"
        "\n"
        "==============================\n"
        "trace_id: parent-1\n"
        "\n"
        "step: process\n"
        "parent message\n"
        "\n"
        "\n"
        "==============================\n"
        "trace_id: trace-1\n"
        "\n"
        "step: process\n"
        "child message"
    )


def test_trace_endpoint_preserves_ancestor_depth_in_html_links() -> None:
    async def trace_provider(_trace_id: str, _request: TraceQueryRequest) -> TraceLogView:
        return TraceLogView(
            trace_id="trace-1",
            parent_id="parent-1",
            child_ids=("child-1",),
            records=(_trace_record(row_id=1, level="INFO", message="loaded prices"),),
            ancestors=(
                TraceLogView(
                    trace_id="root-1",
                    parent_id=None,
                    child_ids=("parent-1",),
                    records=(_trace_record(row_id=2, level="INFO", message="root info"),),
                ),
                TraceLogView(
                    trace_id="parent-1",
                    parent_id="root-1",
                    child_ids=("trace-1",),
                    records=(_trace_record(row_id=3, level="WARNING", message="parent warning"),),
                ),
            ),
        )

    client = _build_client(trace_provider)
    try:
        response = client.get("/traces/trace-1?format=html&attrs_json=true&ancestor_depth=all")
    finally:
        client.close()

    assert response.status_code == 200
    assert 'href="/traces/trace-1?format=html&amp;levels=ERROR%2CWARNING%2CINFO&amp;attrs_json=true&amp;ancestor_depth=all"' in response.text
    assert 'href="/traces/root-1?format=html&amp;levels=ERROR%2CWARNING%2CINFO&amp;attrs_json=true&amp;ancestor_depth=all"' in response.text
    assert 'href="/traces/parent-1?format=html&amp;levels=ERROR%2CWARNING%2CINFO&amp;attrs_json=true&amp;ancestor_depth=all"' in response.text
    assert response.text.index("root info") < response.text.index("parent warning") < response.text.index("loaded prices")
    assert "root info" in response.text
    assert "parent warning" in response.text


def test_trace_endpoint_validates_query_params() -> None:
    client = _build_client(lambda _trace_id, _request: None)
    try:
        invalid_level = client.get("/traces/trace-1?levels=error,fatal")
        invalid_format = client.get("/traces/trace-1?format=xml")
        invalid_ancestor_depth = client.get("/traces/trace-1?ancestor_depth=-1")
        invalid_ancestor_type = client.get("/traces/trace-1?ancestor_depth=up")
    finally:
        client.close()

    assert invalid_level.status_code == 400
    assert invalid_level.json() == {"status": "error", "detail": "unsupported trace levels: FATAL"}
    assert invalid_format.status_code == 400
    assert invalid_format.json() == {"status": "error", "detail": "unsupported trace format: xml"}
    assert invalid_ancestor_depth.status_code == 400
    assert invalid_ancestor_depth.json() == {
        "status": "error",
        "detail": "query parameter must be >= 0: ancestor_depth=-1",
    }
    assert invalid_ancestor_type.status_code == 400
    assert invalid_ancestor_type.json() == {
        "status": "error",
        "detail": "invalid ancestor depth query parameter: ancestor_depth=up",
    }


def test_runtime_trace_logs_uses_configured_reader(monkeypatch) -> None:
    expected = TraceLogView(
        trace_id="trace-1",
        parent_id="root",
        child_ids=("child-1",),
        records=(_trace_record(row_id=1, level="ERROR", message="boom"),),
    )

    class StubReader:
        def read_trace(
            self,
            trace_id: str,
            levels: tuple[str, ...],
            ancestor_depth: int | None = 0,
        ) -> TraceLogView | None:
            assert trace_id == "trace-1"
            assert levels == ("ERROR",)
            assert ancestor_depth is None
            return expected

    monkeypatch.setattr(runtime_module, "build_trace_log_reader", lambda _transport: StubReader())
    runtime = RuntimeManager()

    result = asyncio.run(runtime.trace_logs("trace-1", TraceQueryRequest(levels=("ERROR",), ancestor_depth=None)))

    assert result == expected


def test_mysql_trace_log_reader_maps_db_rows() -> None:
    class FakeCursor:
        def __init__(self) -> None:
            self.executed: list[tuple[str, tuple[object, ...]]] = []
            self._current_query = ""

        def execute(self, query: str, params: tuple[object, ...]) -> None:
            self.executed.append((query, params))
            self._current_query = query

        def fetchone(self) -> dict[str, object] | None:
            if self.executed[-1][1] == ("trace-1",):
                return {"parent_id": "root-77"}
            if self.executed[-1][1] == ("root-77",):
                return {"parent_id": None}
            return None

        def fetchall(self) -> list[dict[str, object]]:
            if "WHERE parent_id = %s" in self._current_query:
                return [{"trace_id": "child-1"}, {"trace_id": "child-2"}]
            return [
                {
                    "id": 8,
                    "trace_id": "trace-1",
                    "event_time": datetime(2026, 4, 28, 10, 11, 12, tzinfo=timezone.utc),
                    "step": "parse",
                    "status": "failed",
                    "level": "ERROR",
                    "message": "broken",
                    "attrs_json": '{"attempt":1}',
                }
            ]

        def __enter__(self) -> FakeCursor:
            return self

        def __exit__(self, exc_type, exc, tb) -> None:
            return None

    class FakeConnection:
        def __init__(self, cursor: FakeCursor) -> None:
            self._cursor = cursor

        def cursor(self) -> FakeCursor:
            return self._cursor

        def __enter__(self) -> FakeConnection:
            return self

        def __exit__(self, exc_type, exc, tb) -> None:
            return None

    class FakeConnectionFactory:
        def __init__(self) -> None:
            self.cursor = FakeCursor()

        def connect(self) -> FakeConnection:
            return FakeConnection(self.cursor)

    factory = FakeConnectionFactory()
    reader = MySqlTraceLogReader(factory)  # type: ignore[arg-type]

    view = reader.read_trace("trace-1", ("ERROR", "WARNING"))

    assert view == TraceLogView(
        trace_id="trace-1",
        parent_id="root-77",
        child_ids=("child-1", "child-2"),
        records=(
            TraceLogRecord(
                id=8,
                trace_id="trace-1",
                event_time=datetime(2026, 4, 28, 10, 11, 12, tzinfo=timezone.utc),
                step="parse",
                status="failed",
                level="ERROR",
                message="broken",
                attrs_json={"attempt": 1},
            ),
        ),
        ancestors=(),
    )
    assert len(factory.cursor.executed) == 3
    assert factory.cursor.executed[1][1] == ("trace-1",)
    assert factory.cursor.executed[2][1] == ("trace-1", "ERROR", "WARNING")


def test_mysql_trace_log_reader_loads_requested_ancestors() -> None:
    class FakeCursor:
        def __init__(self) -> None:
            self.executed: list[tuple[str, tuple[object, ...]]] = []
            self._current_query = ""

        def execute(self, query: str, params: tuple[object, ...]) -> None:
            self.executed.append((query, params))
            self._current_query = query

        def fetchone(self) -> dict[str, object] | None:
            if self.executed[-1][1] == ("trace-1",):
                return {"parent_id": "parent-1"}
            if self.executed[-1][1] == ("parent-1",):
                return {"parent_id": "root-1"}
            if self.executed[-1][1] == ("root-1",):
                return {"parent_id": None}
            return None

        def fetchall(self) -> list[dict[str, object]]:
            if "WHERE parent_id = %s" in self._current_query:
                parent_id = self.executed[-1][1][0]
                if parent_id == "trace-1":
                    return []
                if parent_id == "parent-1":
                    return [{"trace_id": "trace-1"}]
                if parent_id == "root-1":
                    return [{"trace_id": "parent-1"}]
                return []
            trace_id = self.executed[-1][1][0]
            return [
                {
                    "id": 8 if trace_id == "trace-1" else 9,
                    "trace_id": trace_id,
                    "event_time": datetime(2026, 4, 28, 10, 11, 12, tzinfo=timezone.utc),
                    "step": "parse",
                    "status": "failed",
                    "level": "ERROR",
                    "message": f"broken:{trace_id}",
                    "attrs_json": '{"attempt":1}',
                }
            ]

        def __enter__(self) -> FakeCursor:
            return self

        def __exit__(self, exc_type, exc, tb) -> None:
            return None

    class FakeConnection:
        def __init__(self, cursor: FakeCursor) -> None:
            self._cursor = cursor

        def cursor(self) -> FakeCursor:
            return self._cursor

        def __enter__(self) -> FakeConnection:
            return self

        def __exit__(self, exc_type, exc, tb) -> None:
            return None

    class FakeConnectionFactory:
        def __init__(self) -> None:
            self.cursor = FakeCursor()

        def connect(self) -> FakeConnection:
            return FakeConnection(self.cursor)

    factory = FakeConnectionFactory()
    reader = MySqlTraceLogReader(factory)  # type: ignore[arg-type]

    view = reader.read_trace("trace-1", ("ERROR",), 1)

    assert view is not None
    assert view.trace_id == "trace-1"
    assert view.parent_id == "parent-1"
    assert len(view.ancestors) == 1
    assert view.ancestors[0].trace_id == "parent-1"
    assert view.ancestors[0].parent_id == "root-1"
    assert view.ancestors[0].child_ids == ("trace-1",)


def test_mysql_trace_log_reader_orders_ancestors_root_first() -> None:
    class FakeCursor:
        def __init__(self) -> None:
            self.executed: list[tuple[str, tuple[object, ...]]] = []
            self._current_query = ""

        def execute(self, query: str, params: tuple[object, ...]) -> None:
            self.executed.append((query, params))
            self._current_query = query

        def fetchone(self) -> dict[str, object] | None:
            if self.executed[-1][1] == ("trace-1",):
                return {"parent_id": "parent-1"}
            if self.executed[-1][1] == ("parent-1",):
                return {"parent_id": "root-1"}
            if self.executed[-1][1] == ("root-1",):
                return {"parent_id": None}
            return None

        def fetchall(self) -> list[dict[str, object]]:
            if "WHERE parent_id = %s" in self._current_query:
                parent_id = self.executed[-1][1][0]
                if parent_id == "root-1":
                    return [{"trace_id": "parent-1"}]
                if parent_id == "parent-1":
                    return [{"trace_id": "trace-1"}]
                return []
            trace_id = self.executed[-1][1][0]
            return [
                {
                    "id": 8,
                    "trace_id": trace_id,
                    "event_time": datetime(2026, 4, 28, 10, 11, 12, tzinfo=timezone.utc),
                    "step": "parse",
                    "status": "failed",
                    "level": "ERROR",
                    "message": f"broken:{trace_id}",
                    "attrs_json": '{"attempt":1}',
                }
            ]

        def __enter__(self) -> FakeCursor:
            return self

        def __exit__(self, exc_type, exc, tb) -> None:
            return None

    class FakeConnection:
        def __init__(self, cursor: FakeCursor) -> None:
            self._cursor = cursor

        def cursor(self) -> FakeCursor:
            return self._cursor

        def __enter__(self) -> FakeConnection:
            return self

        def __exit__(self, exc_type, exc, tb) -> None:
            return None

    class FakeConnectionFactory:
        def __init__(self) -> None:
            self.cursor = FakeCursor()

        def connect(self) -> FakeConnection:
            return FakeConnection(self.cursor)

    factory = FakeConnectionFactory()
    reader = MySqlTraceLogReader(factory)  # type: ignore[arg-type]

    view = reader.read_trace("trace-1", ("ERROR",), 2)

    assert view is not None
    assert tuple(ancestor.trace_id for ancestor in view.ancestors) == ("root-1", "parent-1")
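
The last two reader tests pin down two behaviours: `ancestor_depth` caps how many levels of parents are loaded, and the loaded ancestors come back root-first. A minimal sketch of that traversal follows, assuming an in-memory `parents` mapping in place of the MySQL lookups. `collect_ancestors` and `parents` are hypothetical names used for illustration only and are not part of `MySqlTraceLogReader`; treating `None` as "no limit" is an assumption modelled on the `ancestor_depth=all` query value seen in the endpoint tests.

```python
from typing import Dict, Optional, Tuple


def collect_ancestors(
    trace_id: str,
    parents: Dict[str, Optional[str]],
    depth: Optional[int] = 0,
) -> Tuple[str, ...]:
    """Return ancestor trace ids ordered root-first.

    depth=0 loads no ancestors, depth=N walks at most N levels up,
    and depth=None (assumed to mean "all") walks the whole chain.
    """
    chain = []
    seen = {trace_id}  # guard against cycles in malformed parent links
    current = parents.get(trace_id)
    while current is not None and current not in seen:
        if depth is not None and len(chain) >= depth:
            break
        chain.append(current)
        seen.add(current)
        current = parents.get(current)
    # The walk collects the nearest parent first; reverse it to get root-first order.
    return tuple(reversed(chain))


# Same hierarchy as in the fake cursors: trace-1 -> parent-1 -> root-1.
example_parents = {"trace-1": "parent-1", "parent-1": "root-1", "root-1": None}
nearest_only = collect_ancestors("trace-1", example_parents, 1)
two_levels = collect_ancestors("trace-1", example_parents, 2)
```

With the example hierarchy, a depth of 1 yields only `parent-1`, while a depth of 2 yields `root-1` followed by `parent-1`, which matches what the two tests assert about `view.ancestors`.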
+41
@@ -0,0 +1,41 @@
from app_runtime.workflow.persistence.workflow_repository import WorkflowRepository


class StubConnectionFactory:
    @staticmethod
    def dumps(snapshot) -> str:
        return str(snapshot)


def test_build_run_payload_prefers_active_task_trace_id() -> None:
    repository = WorkflowRepository(StubConnectionFactory())
    snapshot = {
        "payload": {"inbox_message": {"external_message_id": "msg-1", "id": 7}},
        "state": {
            "runtime": {
                "trace_id": "task-trace-1",
                "email_trace_id": "message-trace-1",
                "queue_task_id": 13,
            }
        },
    }

    payload = repository._build_run_payload("estimate", snapshot)

    assert payload[0] == "estimate"
    assert payload[2] == "msg-1"
    assert payload[3] == 13
    assert payload[4] == 7
    assert payload[8] == "task-trace-1"


def test_build_run_payload_falls_back_to_legacy_email_trace_id() -> None:
    repository = WorkflowRepository(StubConnectionFactory())
    snapshot = {
        "payload": {"inbox_message": {"external_message_id": "msg-2", "id": 8}},
        "state": {"runtime": {"email_trace_id": "message-trace-2"}},
    }

    payload = repository._build_run_payload("estimate", snapshot)

    assert payload[8] == "message-trace-2"
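
Both tests above expect the trace id to be read from `state.runtime`, with `trace_id` taking precedence and the legacy `email_trace_id` used only as a fallback. A minimal sketch of that lookup follows, assuming the snapshot is a plain nested dict. `resolve_trace_id` is a hypothetical helper for illustration; the actual body of `WorkflowRepository._build_run_payload` is not shown in this diff and may differ.

```python
from typing import Any, Mapping, Optional


def resolve_trace_id(snapshot: Mapping[str, Any]) -> Optional[str]:
    """Pick the trace id for a workflow run from a snapshot dict.

    Prefers state.runtime.trace_id and falls back to the legacy
    state.runtime.email_trace_id; returns None when neither is set.
    """
    runtime = snapshot.get("state", {}).get("runtime", {})
    # `or` also skips an empty-string trace_id, which this sketch treats as missing.
    return runtime.get("trace_id") or runtime.get("email_trace_id")


# Snapshot shapes copied from the two tests.
current_snapshot = {
    "state": {"runtime": {"trace_id": "task-trace-1", "email_trace_id": "message-trace-1"}}
}
legacy_snapshot = {"state": {"runtime": {"email_trace_id": "message-trace-2"}}}
```

Keeping the fallback in one small function makes it easy to drop the legacy key later without touching the payload layout that the tests index into.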