Небольшие доработки по трейсу

This commit is contained in:
2026-04-26 20:47:13 +03:00
parent 314e6f3c46
commit ed33f6e9cd
16 changed files with 725 additions and 27 deletions
+3 -1
View File
@@ -1,3 +1,5 @@
__pycache__/
*.pyc
plba.egg-ingo/
.pytest_cache/
build/
plba.egg-info/
+7
View File
@@ -0,0 +1,7 @@
{
"python.testing.pytestArgs": [
"tests"
],
"python.testing.unittestEnabled": false,
"python.testing.pytestEnabled": true
}
+58
View File
@@ -1,5 +1,29 @@
# PLBA
## Установка
Установка пакета напрямую из Git-репозитория через `pip`:
```bash
export GIT_PLBA_TOKEN="<ваш_токен>"
pip install "git+https://oauth2:${GIT_PLBA_TOKEN}@git.lesha.spb.ru/alex/plba.git"
```
Если нужен конкретный тег, ветка или commit, добавьте ref после `.git`:
```bash
pip install "git+https://oauth2:${GIT_PLBA_TOKEN}@git.lesha.spb.ru/alex/plba.git@main"
```
### Доступ через `GIT_PLBA_TOKEN`
Для установки по HTTPS используется переменная окружения `GIT_PLBA_TOKEN`. В нее нужно положить персональный Git-токен с правом чтения репозитория `alex/plba`. Токен передается в URL установки и позволяет `pip` скачать исходники без SSH-доступа.
Рекомендуется:
- экспортировать токен только в текущую shell-сессию;
- не хардкодить токен в `requirements.txt`, `pyproject.toml` или исходниках;
- использовать отдельный read-only токен, если Git-сервер это поддерживает.
## 1. Назначение платформы
PLBA (`Platform Runtime for Business Applications`) - это runtime-слой для бизнес-приложений, который забирает на себя инфраструктурную часть исполнения. Платформа стандартизирует запуск и остановку рабочих процессов, контроль состояния приложения и эксплуатационные сервисы вокруг них. За счет этого прикладной код концентрируется на бизнес-логике, а не на lifecycle, диагностике и служебных механизмах. Базовая модель использования строится вокруг `ApplicationModule`, `Worker` и прикладной `Routine`, где каждый уровень отвечает за свою часть ответственности. В результате приложение получается предсказуемым в эксплуатации, проще в сопровождении и масштабировании.
@@ -189,6 +213,11 @@ classDiagram
- Для чего нужен: позволяет собрать дерево исполнения и связать все сообщения конкретной бизнес-операции.
- Атрибуты контекста (`TraceContextRecord`): `trace_id`, `alias`, `parent_id`, `type`, `event_time`, `attrs`.
- Иерархия: `parent_id` указывает на родительский контекст; так строится цепочка root -> child.
- Стандартная модель для бизнес-приложений:
- root-context создается на внешний триггер верхнего уровня, например входящее письмо, webhook, команду оператора или batch-запуск;
- child-context создается на каждую производную единицу работы, например вложение письма, отдельную задачу, заказ или документ;
- child-context должен создаваться через `parent_id=<trace_id root-context>` и передаваться дальше через `trace_context` / runtime metadata как активный trace конкретной бизнес-операции;
- если одно входящее сообщение порождает несколько задач, у них должен быть общий parent root-context и разные child trace contexts.
- Таблица: `trace_contexts`.
- Как объявляется в коде:
```python
@@ -199,6 +228,22 @@ with traces.open_context(alias="orders-worker", kind="worker", attrs={"routine":
root = traces.new_root("orders.sync")
child = traces.child_of(root, "orders.process_batch")
```
```python
with traces.open_context(alias="email:123", kind="email") as message_trace_id:
...
with traces.open_context(
alias="attachment:invoice.xlsx",
parent_id=message_trace_id,
kind="task",
attrs={"attachment_index": 0},
) as task_trace_id:
runtime["trace_id"] = task_trace_id
runtime["message_trace_id"] = message_trace_id
...
```
- Стандарт для workflow persistence:
- если workflow представляет отдельную бизнес-задачу, в `state.runtime.trace_id` должен лежать активный trace этой задачи;
- дополнительные идентификаторы родительских контекстов (`message_trace_id`, `email_trace_id` и т.д.) могут храниться рядом в runtime для логов и связности, но `workflow_runs.trace_id` должен ссылаться именно на активный trace текущей задачи.
- Trace Message:
- Что это: событие внутри активного context (статус шага, предупреждение, ошибка, служебная информация).
- Роль `step`: текущая стадия операции (`parse`, `validate`, `persist` и т.д.), которую выставляют через `traces.step("...")`.
@@ -245,6 +290,19 @@ child = traces.child_of(root, "orders.process_batch")
- При создании runtime включить `enable_http_control=True`.
- Использовать `/health` для readiness/liveness и `/actions/*` для операционного контроля.
#### Простой Web UI через nginx (порт 15000)
- Статические файлы UI: `web/control-ui/` (`index.html`, `app.js`, `styles.css`).
- UI использует polling (`/api/health` раз в 2 секунды) и кнопки `Start/Stop` (`POST /api/actions/start|stop`).
- Для вызовов API UI передает заголовок `X-Client-Source: web-ui`.
- Пример server-конфига nginx: `deploy/nginx/plba-control-ui.conf`.
- Слушает `15000`.
- Отдает UI из `/usr/share/nginx/html`.
- Проксирует `/api/*` на control API `http://app:8080/*` (sidecar в docker network).
- Пример запуска в составе бизнес-приложения: `deploy/docker-compose.control-ui.yml`.
- Публикуется только один внешний порт: `15000`.
- Внутренний control API остается в сети compose и доступен nginx по имени сервиса `app`.
- В `app` нужно поднять control channel на `0.0.0.0:8080`.
#### Queue
- Назначение: простой in-memory буфер задач/сообщений внутри приложения.
- Реализация: `InMemoryTaskQueue[T]`.
+1 -1
View File
@@ -4,7 +4,7 @@ build-backend = "setuptools.build_meta"
[project]
name = "plba"
version = "0.2.7"
version = "0.2.8"
description = "Platform runtime for business applications"
readme = "README.md"
requires-python = ">=3.11"
+2 -2
View File
@@ -1,5 +1,5 @@
from app_runtime.control.base import ControlActionSet, ControlChannel
from app_runtime.control.base import ControlActionRequest, ControlActionSet, ControlChannel
from app_runtime.control.http_channel import HttpControlChannel
from app_runtime.control.service import ControlPlaneService
__all__ = ["ControlActionSet", "ControlChannel", "ControlPlaneService", "HttpControlChannel"]
__all__ = ["ControlActionRequest", "ControlActionSet", "ControlChannel", "ControlPlaneService", "HttpControlChannel"]
+10 -1
View File
@@ -6,7 +6,16 @@ from dataclasses import dataclass
from app_runtime.core.types import HealthPayload
ActionHandler = Callable[[], Awaitable[str]]
@dataclass(slots=True)
class ControlActionRequest:
wait: bool | None = None
timeout: float | None = None
force: bool | None = None
ActionResult = str | dict[str, object]
ActionHandler = Callable[[ControlActionRequest], Awaitable[ActionResult]]
HealthHandler = Callable[[], Awaitable[HealthPayload]]
+53 -3
View File
@@ -1,19 +1,23 @@
from __future__ import annotations
import logging
import time
from collections.abc import Awaitable, Callable
from fastapi import FastAPI, Request
from fastapi.responses import JSONResponse
from app_runtime.control.base import ControlActionRequest
from app_runtime.core.types import HealthPayload
LOGGER = logging.getLogger(__name__)
class HttpControlAppFactory:
def create(
self,
health_provider: Callable[[], Awaitable[HealthPayload]],
action_provider: Callable[[str], Awaitable[JSONResponse]],
action_provider: Callable[[str, str, ControlActionRequest], Awaitable[JSONResponse]],
) -> FastAPI:
app = FastAPI(title="PLBA Control API")
@@ -32,7 +36,53 @@ class HttpControlAppFactory:
@app.get("/actions/{action}")
@app.post("/actions/{action}")
async def action(action: str) -> JSONResponse:
return await action_provider(action)
async def action(action: str, request: Request) -> JSONResponse:
client_source = self._client_source(request)
if action in {"start", "stop"}:
LOGGER.warning("Control action requested: /actions/%s client=%s", action, client_source)
try:
action_request = self._action_request(request)
except ValueError as exc:
return JSONResponse(content={"status": "error", "detail": str(exc)}, status_code=400)
return await action_provider(action, client_source, action_request)
return app
def _action_request(self, request: Request) -> ControlActionRequest:
return ControlActionRequest(
wait=self._bool_param(request, "wait"),
timeout=self._float_param(request, "timeout"),
force=self._bool_param(request, "force"),
)
def _client_source(self, request: Request) -> str:
explicit_header = request.headers.get("X-Client-Source", "").strip()
if explicit_header:
return explicit_header
user_agent = request.headers.get("User-Agent", "").strip()
if user_agent:
return f"user-agent:{user_agent}"
return "unknown"
def _bool_param(self, request: Request, name: str) -> bool | None:
raw_value = request.query_params.get(name)
if raw_value is None:
return None
normalized = raw_value.strip().lower()
if normalized in {"1", "true", "yes", "on"}:
return True
if normalized in {"0", "false", "no", "off"}:
return False
raise ValueError(f"invalid boolean query parameter: {name}={raw_value}")
def _float_param(self, request: Request, name: str) -> float | None:
raw_value = request.query_params.get(name)
if raw_value is None:
return None
try:
value = float(raw_value)
except ValueError as exc:
raise ValueError(f"invalid numeric query parameter: {name}={raw_value}") from exc
if value < 0:
raise ValueError(f"query parameter must be >= 0: {name}={raw_value}")
return value
+16 -4
View File
@@ -4,7 +4,7 @@ import asyncio
from fastapi.responses import JSONResponse
from app_runtime.control.base import ControlActionSet, ControlChannel
from app_runtime.control.base import ControlActionRequest, ControlActionSet, ControlChannel
from app_runtime.control.http_app import HttpControlAppFactory
from app_runtime.control.http_runner import UvicornThreadRunner
@@ -33,7 +33,12 @@ class HttpControlChannel(ControlChannel):
return {"status": "unhealthy", "detail": "control actions are not configured"}
return await asyncio.wait_for(self._actions.health(), timeout=float(self._timeout))
async def _action_response(self, action: str) -> JSONResponse:
async def _action_response(
self,
action: str,
_client_source: str = "unknown",
request: ControlActionRequest | None = None,
) -> JSONResponse:
if self._actions is None:
return JSONResponse(content={"status": "error", "detail": f"{action} handler is not configured"}, status_code=404)
callbacks = {
@@ -44,9 +49,10 @@ class HttpControlChannel(ControlChannel):
callback = callbacks.get(action)
if callback is None:
return JSONResponse(content={"status": "error", "detail": f"unsupported action: {action}"}, status_code=404)
action_timeout = max(float(self._timeout), 10.0) if action in {"start", "stop"} else float(self._timeout)
action_request = request or ControlActionRequest()
action_timeout = self._action_timeout(action, action_request)
try:
detail = await asyncio.wait_for(callback(), timeout=action_timeout)
detail = await asyncio.wait_for(callback(action_request), timeout=action_timeout)
except asyncio.TimeoutError:
return JSONResponse(
content={"status": "accepted", "detail": f"{action} operation is still in progress"},
@@ -55,3 +61,9 @@ class HttpControlChannel(ControlChannel):
except Exception as exc:
return JSONResponse(content={"status": "error", "detail": str(exc)}, status_code=500)
return JSONResponse(content={"status": "ok", "detail": detail or f"{action} action accepted"}, status_code=200)
def _action_timeout(self, action: str, request: ControlActionRequest) -> float:
base_timeout = max(float(self._timeout), 10.0) if action in {"start", "stop"} else float(self._timeout)
if action != "stop" or request.wait is False or request.timeout is None:
return base_timeout
return max(base_timeout, float(request.timeout) + 1.0)
+11 -5
View File
@@ -4,6 +4,7 @@ from time import monotonic, sleep
from app_runtime.config.providers import FileConfigProvider
from app_runtime.contracts.application import ApplicationModule
from app_runtime.control.base import ControlActionRequest
from app_runtime.control.service import ControlPlaneService
from app_runtime.core.configuration import ConfigurationManager
from app_runtime.core.registration import ModuleRegistry
@@ -87,7 +88,7 @@ class RuntimeManager:
async def health_status(self) -> HealthPayload:
return self.current_health()
async def start_runtime(self) -> dict[str, object] | str:
async def start_runtime(self, _request: ControlActionRequest) -> dict[str, object] | str:
self._refresh_state()
if self._started:
return "runtime already running"
@@ -100,24 +101,29 @@ class RuntimeManager:
return self._action_detail("runtime started", timed_out=False)
return self._action_detail("runtime start is still in progress", timed_out=True)
async def stop_runtime(self) -> dict[str, object] | str:
async def stop_runtime(self, request: ControlActionRequest) -> dict[str, object] | str:
self._refresh_state()
if not self._started:
if self._state == LifecycleState.STOPPING:
return self._action_detail("runtime stop is still in progress", timed_out=True)
return "runtime already stopped"
wait = True if request.wait is None else request.wait
timeout = self.ACTION_TIMEOUT_SECONDS if request.timeout is None else float(request.timeout)
force = False if request.force is None else request.force
self._state = LifecycleState.STOPPING
try:
self.workers.stop(timeout=self.ACTION_TIMEOUT_SECONDS, force=False)
self.workers.stop(timeout=timeout, force=force, wait=wait)
except TimeoutError:
return self._action_detail("runtime stop is still in progress", timed_out=True)
self._refresh_state()
if self._state == LifecycleState.STOPPED:
self._started = False
self._state = LifecycleState.STOPPED
return self._action_detail("runtime stopped", timed_out=False)
return self._action_detail("runtime stop requested", timed_out=False)
async def runtime_status(self) -> str:
async def runtime_status(self, _request: ControlActionRequest) -> str:
self._refresh_state()
return self._state.value
+2 -2
View File
@@ -18,10 +18,10 @@ class WorkerSupervisor:
for worker in self._workers:
worker.start()
def stop(self, timeout: float = 30.0, force: bool = False) -> None:
def stop(self, timeout: float = 30.0, force: bool = False, wait: bool = True) -> None:
for worker in self._workers:
worker.stop(force=force)
if force:
if not wait:
return
deadline = monotonic() + timeout
while True:
@@ -133,7 +133,17 @@ class WorkflowRepository:
None,
"running",
self._connection_factory.dumps(snapshot),
runtime.get("email_trace_id"),
self._resolve_trace_id(runtime),
)
@staticmethod
def _resolve_trace_id(runtime: dict[str, Any]) -> str | None:
return (
runtime.get("trace_id")
or runtime.get("task_trace_id")
or runtime.get("order_trace_id")
or runtime.get("attachment_trace_id")
or runtime.get("email_trace_id")
)
def _use_memory(self) -> bool:
+2 -1
View File
@@ -1,6 +1,6 @@
from plba.bootstrap import create_runtime
from plba.config import ConfigFileLoader, FileConfigProvider
from plba.control import ControlActionSet, ControlChannel, ControlPlaneService, HttpControlChannel
from plba.control import ControlActionRequest, ControlActionSet, ControlChannel, ControlPlaneService, HttpControlChannel
from plba.contracts import (
ApplicationModule,
ConfigProvider,
@@ -35,6 +35,7 @@ __all__ = [
"ConfigFileLoader",
"ConfigProvider",
"ConfigurationManager",
"ControlActionRequest",
"ControlActionSet",
"ControlChannel",
"ControlPlaneService",
+2 -1
View File
@@ -1,8 +1,9 @@
from app_runtime.control.base import ControlActionSet, ControlChannel
from app_runtime.control.base import ControlActionRequest, ControlActionSet, ControlChannel
from app_runtime.control.http_channel import HttpControlChannel
from app_runtime.control.service import ControlPlaneService
__all__ = [
"ControlActionRequest",
"ControlActionSet",
"ControlChannel",
"ControlPlaneService",
+394
View File
@@ -0,0 +1,394 @@
from __future__ import annotations
from dataclasses import dataclass, field
from threading import Event, Lock, Thread
from time import monotonic, sleep
from fastapi.testclient import TestClient
from app_runtime.contracts.application import ApplicationModule
from app_runtime.contracts.worker import Worker, WorkerHealth, WorkerStatus
from app_runtime.control.base import ControlActionSet
from app_runtime.control.http_channel import HttpControlChannel
from app_runtime.core.registration import ModuleRegistry
from app_runtime.core.runtime import RuntimeManager
@dataclass
class IntervalRoutine:
calls: list[float] = field(default_factory=list)
_lock: Lock = field(default_factory=Lock)
def run(self) -> None:
with self._lock:
self.calls.append(monotonic())
def wait_runs(self, count: int, timeout: float) -> bool:
deadline = monotonic() + timeout
while monotonic() < deadline:
with self._lock:
if len(self.calls) >= count:
return True
sleep(0.01)
return False
def deltas(self) -> list[float]:
with self._lock:
return [self.calls[index + 1] - self.calls[index] for index in range(len(self.calls) - 1)]
class BlockingRoutine:
def __init__(self, started: Event, release: Event) -> None:
self._started = started
self._release = release
def run(self) -> None:
self._started.set()
self._release.wait(timeout=5.0)
class ScenarioWorker(Worker):
def __init__(self, name: str, routine: object, *, interval: float = 0.05) -> None:
self._name = name
self._routine = routine
self._interval = interval
self._thread: Thread | None = None
self._stop_requested = Event()
self._in_flight = 0
self._runs = 0
self._lock = Lock()
@property
def name(self) -> str:
return self._name
@property
def critical(self) -> bool:
return True
def start(self) -> None:
if self._thread is not None and self._thread.is_alive():
return
self._stop_requested.clear()
self._thread = Thread(target=self._loop, name=f"{self._name}-thread", daemon=True)
self._thread.start()
def stop(self, force: bool = False) -> None:
self._stop_requested.set()
def health(self) -> WorkerHealth:
return WorkerHealth(name=self.name, status="ok", critical=True, meta={"runs": self._runs})
def status(self) -> WorkerStatus:
thread_alive = self._thread is not None and self._thread.is_alive()
with self._lock:
in_flight = self._in_flight
runs = self._runs
if not thread_alive:
state = "stopped"
elif self._stop_requested.is_set():
state = "stopping"
elif in_flight > 0:
state = "busy"
else:
state = "idle"
return WorkerStatus(name=self.name, state=state, in_flight=in_flight, meta={"runs": runs})
def _loop(self) -> None:
while not self._stop_requested.is_set():
with self._lock:
self._in_flight += 1
try:
self._routine.run()
with self._lock:
self._runs += 1
finally:
with self._lock:
self._in_flight -= 1
if self._stop_requested.is_set():
return
sleep(self._interval)
class ForceAwareWorker(Worker):
def __init__(self, name: str, started: Event, release: Event) -> None:
self._name = name
self._started = started
self._release = release
self._thread: Thread | None = None
self._stop_requested = Event()
self._force_stop = Event()
self.stop_calls: list[bool] = []
@property
def name(self) -> str:
return self._name
@property
def critical(self) -> bool:
return True
def start(self) -> None:
if self._thread is not None and self._thread.is_alive():
return
self._stop_requested.clear()
self._force_stop.clear()
self._thread = Thread(target=self._loop, name=f"{self._name}-thread", daemon=True)
self._thread.start()
def stop(self, force: bool = False) -> None:
self.stop_calls.append(force)
self._stop_requested.set()
if force:
self._force_stop.set()
def health(self) -> WorkerHealth:
return WorkerHealth(name=self.name, status="ok", critical=True)
def status(self) -> WorkerStatus:
thread_alive = self._thread is not None and self._thread.is_alive()
if not thread_alive:
state = "stopped"
elif self._stop_requested.is_set():
state = "stopping"
else:
state = "busy"
return WorkerStatus(name=self.name, state=state, in_flight=1 if thread_alive else 0)
def _loop(self) -> None:
self._started.set()
while not self._force_stop.is_set():
if self._release.wait(timeout=0.05):
return
class WorkerModule(ApplicationModule):
def __init__(self, worker: Worker) -> None:
self._worker = worker
@property
def name(self) -> str:
return "business-tests"
def register(self, registry: ModuleRegistry) -> None:
registry.add_worker(self._worker)
def _http_call_json(
client: TestClient,
path: str,
*,
method: str = "GET",
headers: dict[str, str] | None = None,
) -> tuple[int, dict[str, object]]:
response = client.request(method, path, headers=headers or {})
return response.status_code, response.json()
def _poll_until_stopped(client: TestClient, timeout: float = 2.0) -> bool:
deadline = monotonic() + timeout
while monotonic() < deadline:
_, payload = _http_call_json(client, "/actions/status")
if payload.get("detail") == "stopped":
return True
sleep(0.05)
return False
def _build_runtime(worker: Worker, *, control_timeout: int = 1) -> tuple[RuntimeManager, TestClient]:
runtime = RuntimeManager()
runtime.register_module(WorkerModule(worker))
runtime.start(start_control_plane=False)
channel = HttpControlChannel(host="127.0.0.1", port=0, timeout=control_timeout)
channel._actions = ControlActionSet(
health=runtime.health_status,
start=runtime.start_runtime,
stop=runtime.stop_runtime,
status=runtime.runtime_status,
)
app = channel._factory.create(channel._health_response, channel._action_response)
return runtime, TestClient(app)
def test_worker_wakes_up_with_configured_interval() -> None:
interval = 0.15
routine = IntervalRoutine()
worker = ScenarioWorker("interval-worker", routine, interval=interval)
runtime = RuntimeManager()
runtime.register_module(WorkerModule(worker))
runtime.start()
try:
assert routine.wait_runs(count=3, timeout=2.0) is True
finally:
runtime.stop()
deltas = routine.deltas()
assert len(deltas) >= 2
assert all(delta >= interval * 0.7 for delta in deltas[:2])
def test_actions_start_stop_and_health_when_worker_is_idle() -> None:
runtime, client = _build_runtime(ScenarioWorker("idle-worker", IntervalRoutine(), interval=0.2))
try:
stop_status, stop_payload = _http_call_json(client, "/actions/stop", method="POST")
assert stop_status == 200
assert isinstance(stop_payload.get("detail"), dict)
assert stop_payload["detail"]["timed_out"] is False
assert stop_payload["detail"]["state"] == "stopped"
health_stopped_status, health_stopped_payload = _http_call_json(client, "/health")
assert health_stopped_status == 503
assert health_stopped_payload["state"] == "stopped"
assert health_stopped_payload["status"] == "unhealthy"
start_status, start_payload = _http_call_json(client, "/actions/start", method="POST")
assert start_status == 200
assert isinstance(start_payload.get("detail"), dict)
assert start_payload["detail"]["timed_out"] is False
assert start_payload["detail"]["state"] in {"idle", "busy"}
health_started_status, health_started_payload = _http_call_json(client, "/health")
assert health_started_status == 200
assert health_started_payload["status"] == "ok"
assert health_started_payload["state"] in {"idle", "busy"}
finally:
client.close()
runtime.stop()
def test_actions_stop_busy_worker_before_timeout() -> None:
started = Event()
release = Event()
runtime, client = _build_runtime(ScenarioWorker("busy-worker", BlockingRoutine(started, release)))
assert started.wait(timeout=1.0) is True
def release_later() -> None:
sleep(0.2)
release.set()
Thread(target=release_later, daemon=True).start()
try:
status, payload = _http_call_json(client, "/actions/stop", method="POST")
assert status == 200
assert isinstance(payload.get("detail"), dict)
assert payload["detail"]["timed_out"] is False
assert payload["detail"]["state"] == "stopped"
health_status, health_payload = _http_call_json(client, "/health")
assert health_status == 503
assert health_payload["status"] == "unhealthy"
assert health_payload["state"] == "stopped"
finally:
client.close()
runtime.stop()
def test_actions_stop_busy_worker_after_timeout() -> None:
started = Event()
release = Event()
runtime, client = _build_runtime(ScenarioWorker("slow-stop-worker", BlockingRoutine(started, release)))
runtime.ACTION_TIMEOUT_SECONDS = 0.3
assert started.wait(timeout=1.0) is True
try:
status, payload = _http_call_json(client, "/actions/stop", method="POST")
assert status == 200
assert isinstance(payload.get("detail"), dict)
assert payload["detail"]["timed_out"] is True
assert payload["detail"]["state"] == "stopping"
health_status, health_payload = _http_call_json(client, "/health")
assert health_status == 503
assert health_payload["status"] == "degraded"
assert health_payload["state"] == "stopping"
release.set()
assert _poll_until_stopped(client, timeout=2.0) is True
stopped_health_status, stopped_health_payload = _http_call_json(client, "/health")
assert stopped_health_status == 503
assert stopped_health_payload["state"] == "stopped"
finally:
client.close()
runtime.stop()
def test_actions_stop_wait_false_returns_before_worker_finishes() -> None:
started = Event()
release = Event()
runtime, client = _build_runtime(ScenarioWorker("async-stop-worker", BlockingRoutine(started, release)))
assert started.wait(timeout=1.0) is True
try:
status, payload = _http_call_json(client, "/actions/stop?wait=false", method="POST")
assert status == 200
assert isinstance(payload.get("detail"), dict)
assert payload["detail"]["timed_out"] is False
assert payload["detail"]["state"] == "stopping"
health_status, health_payload = _http_call_json(client, "/health")
assert health_status == 503
assert health_payload["status"] == "degraded"
assert health_payload["state"] == "stopping"
release.set()
assert _poll_until_stopped(client, timeout=2.0) is True
finally:
client.close()
runtime.stop()
def test_actions_stop_honors_timeout_query_parameter() -> None:
started = Event()
release = Event()
runtime, client = _build_runtime(ScenarioWorker("timeout-stop-worker", BlockingRoutine(started, release)))
assert started.wait(timeout=1.0) is True
try:
status, payload = _http_call_json(client, "/actions/stop?timeout=0.1", method="POST")
assert status == 200
assert isinstance(payload.get("detail"), dict)
assert payload["detail"]["timed_out"] is True
assert payload["detail"]["state"] == "stopping"
release.set()
assert _poll_until_stopped(client, timeout=2.0) is True
finally:
client.close()
runtime.stop()
def test_actions_stop_honors_force_query_parameter() -> None:
started = Event()
release = Event()
worker = ForceAwareWorker("force-stop-worker", started, release)
runtime, client = _build_runtime(worker)
assert started.wait(timeout=1.0) is True
try:
status, payload = _http_call_json(client, "/actions/stop?force=true&timeout=0.5", method="POST")
assert status == 200
assert isinstance(payload.get("detail"), dict)
assert payload["detail"]["timed_out"] is False
assert payload["detail"]["state"] == "stopped"
assert worker.stop_calls == [True]
finally:
release.set()
client.close()
runtime.stop()
def test_actions_log_client_source_for_start_and_stop(caplog) -> None:
runtime, client = _build_runtime(ScenarioWorker("log-worker", IntervalRoutine(), interval=0.2))
caplog.set_level("WARNING", logger="app_runtime.control.http_app")
try:
_http_call_json(client, "/actions/stop", method="POST", headers={"X-Client-Source": "web-ui"})
_http_call_json(client, "/actions/start", method="POST", headers={"X-Client-Source": "web-ui"})
finally:
client.close()
runtime.stop()
messages = [record.getMessage() for record in caplog.records]
assert any("/actions/stop client=web-ui" in message for message in messages)
assert any("/actions/start client=web-ui" in message for message in messages)
+110 -3
View File
@@ -5,6 +5,8 @@ from dataclasses import dataclass, field
from threading import Event, Lock, Thread
from time import sleep
from fastapi.testclient import TestClient
from app_runtime.contracts.application import ApplicationModule
from app_runtime.contracts.health import HealthContributor
from app_runtime.contracts.worker import Worker, WorkerHealth, WorkerStatus
@@ -176,6 +178,28 @@ class BlockingModule(ApplicationModule):
registry.add_worker(RoutineWorker("blocking-worker", self.routine))
class WorkerModuleAdapter(ApplicationModule):
def __init__(self, worker: Worker) -> None:
self._worker = worker
@property
def name(self) -> str:
return "worker-adapter"
def register(self, registry: ModuleRegistry) -> None:
registry.add_worker(self._worker)
class ForceRecordingWorker(RoutineWorker):
def __init__(self) -> None:
super().__init__("force-recorder", CollectingRoutine(), interval=0.01)
self.stop_flags: list[bool] = []
def stop(self, force: bool = False) -> None:
self.stop_flags.append(force)
super().stop(force=force)
class RecordingTransport(NoOpTraceTransport):
def __init__(self) -> None:
self.contexts: list[object] = []
@@ -188,6 +212,21 @@ class RecordingTransport(NoOpTraceTransport):
self.messages.append(record)
def _build_control_client(runtime: RuntimeManager, *, control_timeout: int = 1) -> TestClient:
from app_runtime.control.base import ControlActionSet
from app_runtime.control.http_channel import HttpControlChannel
channel = HttpControlChannel("127.0.0.1", 0, control_timeout)
channel._actions = ControlActionSet(
health=runtime.health_status,
start=runtime.start_runtime,
stop=runtime.stop_runtime,
status=runtime.runtime_status,
)
app = channel._factory.create(channel._health_response, channel._action_response)
return TestClient(app)
def test_runtime_runs_worker_routine_and_exposes_status(tmp_path) -> None:
config_path = tmp_path / "config.yml"
config_path.write_text(
@@ -302,15 +341,15 @@ def test_http_control_channel_exposes_health_and_actions() -> None:
async def health():
return {"status": "ok" if state["started"] else "unhealthy", "state": "idle" if state["started"] else "stopped"}
async def start_handler() -> str:
async def start_handler(_request) -> str:
state["started"] = True
return "started"
async def stop_handler() -> str:
async def stop_handler(_request) -> str:
state["started"] = False
return "stopped"
async def status_handler() -> str:
async def status_handler(_request) -> str:
return "idle" if state["started"] else "stopped"
async def scenario() -> None:
@@ -339,6 +378,74 @@ def test_http_control_channel_exposes_health_and_actions() -> None:
asyncio.run(scenario())
def test_http_control_stop_wait_false_returns_immediately() -> None:
started = Event()
release = Event()
runtime = RuntimeManager()
runtime.register_module(BlockingModule(started, release))
runtime.start(start_control_plane=False)
assert started.wait(timeout=1.0) is True
client = _build_control_client(runtime)
try:
response = client.post("/actions/stop?wait=false")
payload = response.json()
assert response.status_code == 200
assert payload["detail"]["timed_out"] is False
assert payload["detail"]["state"] == "stopping"
health_response = client.get("/health")
health_payload = health_response.json()
assert health_response.status_code == 503
assert health_payload["state"] == "stopping"
assert health_payload["status"] == "degraded"
finally:
release.set()
client.close()
runtime.stop()
def test_http_control_stop_timeout_query_changes_wait_window() -> None:
started = Event()
release = Event()
runtime = RuntimeManager()
runtime.ACTION_TIMEOUT_SECONDS = 5.0
runtime.register_module(BlockingModule(started, release))
runtime.start(start_control_plane=False)
assert started.wait(timeout=1.0) is True
client = _build_control_client(runtime)
try:
response = client.post("/actions/stop?timeout=0.1")
payload = response.json()
assert response.status_code == 200
assert payload["detail"]["timed_out"] is True
assert payload["detail"]["state"] == "stopping"
finally:
release.set()
client.close()
runtime.stop()
def test_http_control_stop_force_query_propagates_to_worker() -> None:
runtime = RuntimeManager()
worker = ForceRecordingWorker()
runtime.register_module(WorkerModuleAdapter(worker))
runtime.start(start_control_plane=False)
client = _build_control_client(runtime)
try:
response = client.post("/actions/stop?force=true")
payload = response.json()
assert response.status_code == 200
assert payload["detail"]["timed_out"] is False
assert payload["detail"]["state"] == "stopped"
assert worker.stop_flags == [True]
finally:
client.close()
runtime.stop()
def test_public_plba_package_exports_runtime_builder_and_worker_contract(tmp_path) -> None:
import plba
from plba import ApplicationModule as PublicApplicationModule
+41
View File
@@ -0,0 +1,41 @@
from app_runtime.workflow.persistence.workflow_repository import WorkflowRepository
class StubConnectionFactory:
@staticmethod
def dumps(snapshot) -> str:
return str(snapshot)
def test_build_run_payload_prefers_active_task_trace_id() -> None:
repository = WorkflowRepository(StubConnectionFactory())
snapshot = {
"payload": {"inbox_message": {"external_message_id": "msg-1", "id": 7}},
"state": {
"runtime": {
"trace_id": "task-trace-1",
"email_trace_id": "message-trace-1",
"queue_task_id": 13,
}
},
}
payload = repository._build_run_payload("estimate", snapshot)
assert payload[0] == "estimate"
assert payload[2] == "msg-1"
assert payload[3] == 13
assert payload[4] == 7
assert payload[8] == "task-trace-1"
def test_build_run_payload_falls_back_to_legacy_email_trace_id() -> None:
repository = WorkflowRepository(StubConnectionFactory())
snapshot = {
"payload": {"inbox_message": {"external_message_id": "msg-2", "id": 8}},
"state": {"runtime": {"email_trace_id": "message-trace-2"}},
}
payload = repository._build_run_payload("estimate", snapshot)
assert payload[8] == "message-trace-2"