Compare commits

...

2 Commits

Author SHA1 Message Date
alex ed33f6e9cd Небольшие доработки по трейсу 2026-04-26 20:47:13 +03:00
alex 314e6f3c46 Гитигнор 2026-04-26 20:36:31 +03:00
16 changed files with 725 additions and 26 deletions
+3
View File
@@ -1,2 +1,5 @@
__pycache__/ __pycache__/
*.pyc *.pyc
.pytest_cache/
build/
plba.egg-info/
+7
View File
@@ -0,0 +1,7 @@
{
"python.testing.pytestArgs": [
"tests"
],
"python.testing.unittestEnabled": false,
"python.testing.pytestEnabled": true
}
+58
View File
@@ -1,5 +1,29 @@
# PLBA # PLBA
## Установка
Установка пакета напрямую из Git-репозитория через `pip`:
```bash
export GIT_PLBA_TOKEN="<ваш_токен>"
pip install "git+https://oauth2:${GIT_PLBA_TOKEN}@git.lesha.spb.ru/alex/plba.git"
```
Если нужен конкретный тег, ветка или commit, добавьте ref после `.git`:
```bash
pip install "git+https://oauth2:${GIT_PLBA_TOKEN}@git.lesha.spb.ru/alex/plba.git@main"
```
### Доступ через `GIT_PLBA_TOKEN`
Для установки по HTTPS используется переменная окружения `GIT_PLBA_TOKEN`. В нее нужно положить персональный Git-токен с правом чтения репозитория `alex/plba`. Токен передается в URL установки и позволяет `pip` скачать исходники без SSH-доступа.
Рекомендуется:
- экспортировать токен только в текущую shell-сессию;
- не хардкодить токен в `requirements.txt`, `pyproject.toml` или исходниках;
- использовать отдельный read-only токен, если Git-сервер это поддерживает.
## 1. Назначение платформы ## 1. Назначение платформы
PLBA (`Platform Runtime for Business Applications`) - это runtime-слой для бизнес-приложений, который забирает на себя инфраструктурную часть исполнения. Платформа стандартизирует запуск и остановку рабочих процессов, контроль состояния приложения и эксплуатационные сервисы вокруг них. За счет этого прикладной код концентрируется на бизнес-логике, а не на lifecycle, диагностике и служебных механизмах. Базовая модель использования строится вокруг `ApplicationModule`, `Worker` и прикладной `Routine`, где каждый уровень отвечает за свою часть ответственности. В результате приложение получается предсказуемым в эксплуатации, проще в сопровождении и масштабировании. PLBA (`Platform Runtime for Business Applications`) - это runtime-слой для бизнес-приложений, который забирает на себя инфраструктурную часть исполнения. Платформа стандартизирует запуск и остановку рабочих процессов, контроль состояния приложения и эксплуатационные сервисы вокруг них. За счет этого прикладной код концентрируется на бизнес-логике, а не на lifecycle, диагностике и служебных механизмах. Базовая модель использования строится вокруг `ApplicationModule`, `Worker` и прикладной `Routine`, где каждый уровень отвечает за свою часть ответственности. В результате приложение получается предсказуемым в эксплуатации, проще в сопровождении и масштабировании.
@@ -189,6 +213,11 @@ classDiagram
- Для чего нужен: позволяет собрать дерево исполнения и связать все сообщения конкретной бизнес-операции. - Для чего нужен: позволяет собрать дерево исполнения и связать все сообщения конкретной бизнес-операции.
- Атрибуты контекста (`TraceContextRecord`): `trace_id`, `alias`, `parent_id`, `type`, `event_time`, `attrs`. - Атрибуты контекста (`TraceContextRecord`): `trace_id`, `alias`, `parent_id`, `type`, `event_time`, `attrs`.
- Иерархия: `parent_id` указывает на родительский контекст; так строится цепочка root -> child. - Иерархия: `parent_id` указывает на родительский контекст; так строится цепочка root -> child.
- Стандартная модель для бизнес-приложений:
- root-context создается на внешний триггер верхнего уровня, например входящее письмо, webhook, команду оператора или batch-запуск;
- child-context создается на каждую производную единицу работы, например вложение письма, отдельную задачу, заказ или документ;
- child-context должен создаваться через `parent_id=<trace_id root-context>` и передаваться дальше через `trace_context` / runtime metadata как активный trace конкретной бизнес-операции;
- если одно входящее сообщение порождает несколько задач, у них должен быть общий parent root-context и разные child trace contexts.
- Таблица: `trace_contexts`. - Таблица: `trace_contexts`.
- Как объявляется в коде: - Как объявляется в коде:
```python ```python
@@ -199,6 +228,22 @@ with traces.open_context(alias="orders-worker", kind="worker", attrs={"routine":
root = traces.new_root("orders.sync") root = traces.new_root("orders.sync")
child = traces.child_of(root, "orders.process_batch") child = traces.child_of(root, "orders.process_batch")
``` ```
```python
with traces.open_context(alias="email:123", kind="email") as message_trace_id:
...
with traces.open_context(
alias="attachment:invoice.xlsx",
parent_id=message_trace_id,
kind="task",
attrs={"attachment_index": 0},
) as task_trace_id:
runtime["trace_id"] = task_trace_id
runtime["message_trace_id"] = message_trace_id
...
```
- Стандарт для workflow persistence:
- если workflow представляет отдельную бизнес-задачу, в `state.runtime.trace_id` должен лежать активный trace этой задачи;
- дополнительные идентификаторы родительских контекстов (`message_trace_id`, `email_trace_id` и т.д.) могут храниться рядом в runtime для логов и связности, но `workflow_runs.trace_id` должен ссылаться именно на активный trace текущей задачи.
- Trace Message: - Trace Message:
- Что это: событие внутри активного context (статус шага, предупреждение, ошибка, служебная информация). - Что это: событие внутри активного context (статус шага, предупреждение, ошибка, служебная информация).
- Роль `step`: текущая стадия операции (`parse`, `validate`, `persist` и т.д.), которую выставляют через `traces.step("...")`. - Роль `step`: текущая стадия операции (`parse`, `validate`, `persist` и т.д.), которую выставляют через `traces.step("...")`.
@@ -245,6 +290,19 @@ child = traces.child_of(root, "orders.process_batch")
- При создании runtime включить `enable_http_control=True`. - При создании runtime включить `enable_http_control=True`.
- Использовать `/health` для readiness/liveness и `/actions/*` для операционного контроля. - Использовать `/health` для readiness/liveness и `/actions/*` для операционного контроля.
#### Простой Web UI через nginx (порт 15000)
- Статические файлы UI: `web/control-ui/` (`index.html`, `app.js`, `styles.css`).
- UI использует polling (`/api/health` раз в 2 секунды) и кнопки `Start/Stop` (`POST /api/actions/start|stop`).
- Для вызовов API UI передает заголовок `X-Client-Source: web-ui`.
- Пример server-конфига nginx: `deploy/nginx/plba-control-ui.conf`.
- Слушает `15000`.
- Отдает UI из `/usr/share/nginx/html`.
- Проксирует `/api/*` на control API `http://app:8080/*` (sidecar в docker network).
- Пример запуска в составе бизнес-приложения: `deploy/docker-compose.control-ui.yml`.
- Публикуется только один внешний порт: `15000`.
- Внутренний control API остается в сети compose и доступен nginx по имени сервиса `app`.
- В `app` нужно поднять control channel на `0.0.0.0:8080`.
#### Queue #### Queue
- Назначение: простой in-memory буфер задач/сообщений внутри приложения. - Назначение: простой in-memory буфер задач/сообщений внутри приложения.
- Реализация: `InMemoryTaskQueue[T]`. - Реализация: `InMemoryTaskQueue[T]`.
+1 -1
View File
@@ -4,7 +4,7 @@ build-backend = "setuptools.build_meta"
[project] [project]
name = "plba" name = "plba"
version = "0.2.7" version = "0.2.8"
description = "Platform runtime for business applications" description = "Platform runtime for business applications"
readme = "README.md" readme = "README.md"
requires-python = ">=3.11" requires-python = ">=3.11"
+2 -2
View File
@@ -1,5 +1,5 @@
from app_runtime.control.base import ControlActionSet, ControlChannel from app_runtime.control.base import ControlActionRequest, ControlActionSet, ControlChannel
from app_runtime.control.http_channel import HttpControlChannel from app_runtime.control.http_channel import HttpControlChannel
from app_runtime.control.service import ControlPlaneService from app_runtime.control.service import ControlPlaneService
__all__ = ["ControlActionSet", "ControlChannel", "ControlPlaneService", "HttpControlChannel"] __all__ = ["ControlActionRequest", "ControlActionSet", "ControlChannel", "ControlPlaneService", "HttpControlChannel"]
+10 -1
View File
@@ -6,7 +6,16 @@ from dataclasses import dataclass
from app_runtime.core.types import HealthPayload from app_runtime.core.types import HealthPayload
ActionHandler = Callable[[], Awaitable[str]]
@dataclass(slots=True)
class ControlActionRequest:
wait: bool | None = None
timeout: float | None = None
force: bool | None = None
ActionResult = str | dict[str, object]
ActionHandler = Callable[[ControlActionRequest], Awaitable[ActionResult]]
HealthHandler = Callable[[], Awaitable[HealthPayload]] HealthHandler = Callable[[], Awaitable[HealthPayload]]
+53 -3
View File
@@ -1,19 +1,23 @@
from __future__ import annotations from __future__ import annotations
import logging
import time import time
from collections.abc import Awaitable, Callable from collections.abc import Awaitable, Callable
from fastapi import FastAPI, Request from fastapi import FastAPI, Request
from fastapi.responses import JSONResponse from fastapi.responses import JSONResponse
from app_runtime.control.base import ControlActionRequest
from app_runtime.core.types import HealthPayload from app_runtime.core.types import HealthPayload
LOGGER = logging.getLogger(__name__)
class HttpControlAppFactory: class HttpControlAppFactory:
def create( def create(
self, self,
health_provider: Callable[[], Awaitable[HealthPayload]], health_provider: Callable[[], Awaitable[HealthPayload]],
action_provider: Callable[[str], Awaitable[JSONResponse]], action_provider: Callable[[str, str, ControlActionRequest], Awaitable[JSONResponse]],
) -> FastAPI: ) -> FastAPI:
app = FastAPI(title="PLBA Control API") app = FastAPI(title="PLBA Control API")
@@ -32,7 +36,53 @@ class HttpControlAppFactory:
@app.get("/actions/{action}") @app.get("/actions/{action}")
@app.post("/actions/{action}") @app.post("/actions/{action}")
async def action(action: str) -> JSONResponse: async def action(action: str, request: Request) -> JSONResponse:
return await action_provider(action) client_source = self._client_source(request)
if action in {"start", "stop"}:
LOGGER.warning("Control action requested: /actions/%s client=%s", action, client_source)
try:
action_request = self._action_request(request)
except ValueError as exc:
return JSONResponse(content={"status": "error", "detail": str(exc)}, status_code=400)
return await action_provider(action, client_source, action_request)
return app return app
def _action_request(self, request: Request) -> ControlActionRequest:
return ControlActionRequest(
wait=self._bool_param(request, "wait"),
timeout=self._float_param(request, "timeout"),
force=self._bool_param(request, "force"),
)
def _client_source(self, request: Request) -> str:
explicit_header = request.headers.get("X-Client-Source", "").strip()
if explicit_header:
return explicit_header
user_agent = request.headers.get("User-Agent", "").strip()
if user_agent:
return f"user-agent:{user_agent}"
return "unknown"
def _bool_param(self, request: Request, name: str) -> bool | None:
raw_value = request.query_params.get(name)
if raw_value is None:
return None
normalized = raw_value.strip().lower()
if normalized in {"1", "true", "yes", "on"}:
return True
if normalized in {"0", "false", "no", "off"}:
return False
raise ValueError(f"invalid boolean query parameter: {name}={raw_value}")
def _float_param(self, request: Request, name: str) -> float | None:
raw_value = request.query_params.get(name)
if raw_value is None:
return None
try:
value = float(raw_value)
except ValueError as exc:
raise ValueError(f"invalid numeric query parameter: {name}={raw_value}") from exc
if value < 0:
raise ValueError(f"query parameter must be >= 0: {name}={raw_value}")
return value
+16 -4
View File
@@ -4,7 +4,7 @@ import asyncio
from fastapi.responses import JSONResponse from fastapi.responses import JSONResponse
from app_runtime.control.base import ControlActionSet, ControlChannel from app_runtime.control.base import ControlActionRequest, ControlActionSet, ControlChannel
from app_runtime.control.http_app import HttpControlAppFactory from app_runtime.control.http_app import HttpControlAppFactory
from app_runtime.control.http_runner import UvicornThreadRunner from app_runtime.control.http_runner import UvicornThreadRunner
@@ -33,7 +33,12 @@ class HttpControlChannel(ControlChannel):
return {"status": "unhealthy", "detail": "control actions are not configured"} return {"status": "unhealthy", "detail": "control actions are not configured"}
return await asyncio.wait_for(self._actions.health(), timeout=float(self._timeout)) return await asyncio.wait_for(self._actions.health(), timeout=float(self._timeout))
async def _action_response(self, action: str) -> JSONResponse: async def _action_response(
self,
action: str,
_client_source: str = "unknown",
request: ControlActionRequest | None = None,
) -> JSONResponse:
if self._actions is None: if self._actions is None:
return JSONResponse(content={"status": "error", "detail": f"{action} handler is not configured"}, status_code=404) return JSONResponse(content={"status": "error", "detail": f"{action} handler is not configured"}, status_code=404)
callbacks = { callbacks = {
@@ -44,9 +49,10 @@ class HttpControlChannel(ControlChannel):
callback = callbacks.get(action) callback = callbacks.get(action)
if callback is None: if callback is None:
return JSONResponse(content={"status": "error", "detail": f"unsupported action: {action}"}, status_code=404) return JSONResponse(content={"status": "error", "detail": f"unsupported action: {action}"}, status_code=404)
action_timeout = max(float(self._timeout), 10.0) if action in {"start", "stop"} else float(self._timeout) action_request = request or ControlActionRequest()
action_timeout = self._action_timeout(action, action_request)
try: try:
detail = await asyncio.wait_for(callback(), timeout=action_timeout) detail = await asyncio.wait_for(callback(action_request), timeout=action_timeout)
except asyncio.TimeoutError: except asyncio.TimeoutError:
return JSONResponse( return JSONResponse(
content={"status": "accepted", "detail": f"{action} operation is still in progress"}, content={"status": "accepted", "detail": f"{action} operation is still in progress"},
@@ -55,3 +61,9 @@ class HttpControlChannel(ControlChannel):
except Exception as exc: except Exception as exc:
return JSONResponse(content={"status": "error", "detail": str(exc)}, status_code=500) return JSONResponse(content={"status": "error", "detail": str(exc)}, status_code=500)
return JSONResponse(content={"status": "ok", "detail": detail or f"{action} action accepted"}, status_code=200) return JSONResponse(content={"status": "ok", "detail": detail or f"{action} action accepted"}, status_code=200)
def _action_timeout(self, action: str, request: ControlActionRequest) -> float:
base_timeout = max(float(self._timeout), 10.0) if action in {"start", "stop"} else float(self._timeout)
if action != "stop" or request.wait is False or request.timeout is None:
return base_timeout
return max(base_timeout, float(request.timeout) + 1.0)
+11 -5
View File
@@ -4,6 +4,7 @@ from time import monotonic, sleep
from app_runtime.config.providers import FileConfigProvider from app_runtime.config.providers import FileConfigProvider
from app_runtime.contracts.application import ApplicationModule from app_runtime.contracts.application import ApplicationModule
from app_runtime.control.base import ControlActionRequest
from app_runtime.control.service import ControlPlaneService from app_runtime.control.service import ControlPlaneService
from app_runtime.core.configuration import ConfigurationManager from app_runtime.core.configuration import ConfigurationManager
from app_runtime.core.registration import ModuleRegistry from app_runtime.core.registration import ModuleRegistry
@@ -87,7 +88,7 @@ class RuntimeManager:
async def health_status(self) -> HealthPayload: async def health_status(self) -> HealthPayload:
return self.current_health() return self.current_health()
async def start_runtime(self) -> dict[str, object] | str: async def start_runtime(self, _request: ControlActionRequest) -> dict[str, object] | str:
self._refresh_state() self._refresh_state()
if self._started: if self._started:
return "runtime already running" return "runtime already running"
@@ -100,24 +101,29 @@ class RuntimeManager:
return self._action_detail("runtime started", timed_out=False) return self._action_detail("runtime started", timed_out=False)
return self._action_detail("runtime start is still in progress", timed_out=True) return self._action_detail("runtime start is still in progress", timed_out=True)
async def stop_runtime(self) -> dict[str, object] | str: async def stop_runtime(self, request: ControlActionRequest) -> dict[str, object] | str:
self._refresh_state() self._refresh_state()
if not self._started: if not self._started:
if self._state == LifecycleState.STOPPING: if self._state == LifecycleState.STOPPING:
return self._action_detail("runtime stop is still in progress", timed_out=True) return self._action_detail("runtime stop is still in progress", timed_out=True)
return "runtime already stopped" return "runtime already stopped"
wait = True if request.wait is None else request.wait
timeout = self.ACTION_TIMEOUT_SECONDS if request.timeout is None else float(request.timeout)
force = False if request.force is None else request.force
self._state = LifecycleState.STOPPING self._state = LifecycleState.STOPPING
try: try:
self.workers.stop(timeout=self.ACTION_TIMEOUT_SECONDS, force=False) self.workers.stop(timeout=timeout, force=force, wait=wait)
except TimeoutError: except TimeoutError:
return self._action_detail("runtime stop is still in progress", timed_out=True) return self._action_detail("runtime stop is still in progress", timed_out=True)
self._refresh_state()
if self._state == LifecycleState.STOPPED:
self._started = False self._started = False
self._state = LifecycleState.STOPPED
return self._action_detail("runtime stopped", timed_out=False) return self._action_detail("runtime stopped", timed_out=False)
return self._action_detail("runtime stop requested", timed_out=False)
async def runtime_status(self) -> str: async def runtime_status(self, _request: ControlActionRequest) -> str:
self._refresh_state() self._refresh_state()
return self._state.value return self._state.value
+2 -2
View File
@@ -18,10 +18,10 @@ class WorkerSupervisor:
for worker in self._workers: for worker in self._workers:
worker.start() worker.start()
def stop(self, timeout: float = 30.0, force: bool = False) -> None: def stop(self, timeout: float = 30.0, force: bool = False, wait: bool = True) -> None:
for worker in self._workers: for worker in self._workers:
worker.stop(force=force) worker.stop(force=force)
if force: if not wait:
return return
deadline = monotonic() + timeout deadline = monotonic() + timeout
while True: while True:
@@ -133,7 +133,17 @@ class WorkflowRepository:
None, None,
"running", "running",
self._connection_factory.dumps(snapshot), self._connection_factory.dumps(snapshot),
runtime.get("email_trace_id"), self._resolve_trace_id(runtime),
)
@staticmethod
def _resolve_trace_id(runtime: dict[str, Any]) -> str | None:
return (
runtime.get("trace_id")
or runtime.get("task_trace_id")
or runtime.get("order_trace_id")
or runtime.get("attachment_trace_id")
or runtime.get("email_trace_id")
) )
def _use_memory(self) -> bool: def _use_memory(self) -> bool:
+2 -1
View File
@@ -1,6 +1,6 @@
from plba.bootstrap import create_runtime from plba.bootstrap import create_runtime
from plba.config import ConfigFileLoader, FileConfigProvider from plba.config import ConfigFileLoader, FileConfigProvider
from plba.control import ControlActionSet, ControlChannel, ControlPlaneService, HttpControlChannel from plba.control import ControlActionRequest, ControlActionSet, ControlChannel, ControlPlaneService, HttpControlChannel
from plba.contracts import ( from plba.contracts import (
ApplicationModule, ApplicationModule,
ConfigProvider, ConfigProvider,
@@ -35,6 +35,7 @@ __all__ = [
"ConfigFileLoader", "ConfigFileLoader",
"ConfigProvider", "ConfigProvider",
"ConfigurationManager", "ConfigurationManager",
"ControlActionRequest",
"ControlActionSet", "ControlActionSet",
"ControlChannel", "ControlChannel",
"ControlPlaneService", "ControlPlaneService",
+2 -1
View File
@@ -1,8 +1,9 @@
from app_runtime.control.base import ControlActionSet, ControlChannel from app_runtime.control.base import ControlActionRequest, ControlActionSet, ControlChannel
from app_runtime.control.http_channel import HttpControlChannel from app_runtime.control.http_channel import HttpControlChannel
from app_runtime.control.service import ControlPlaneService from app_runtime.control.service import ControlPlaneService
__all__ = [ __all__ = [
"ControlActionRequest",
"ControlActionSet", "ControlActionSet",
"ControlChannel", "ControlChannel",
"ControlPlaneService", "ControlPlaneService",
+394
View File
@@ -0,0 +1,394 @@
from __future__ import annotations
from dataclasses import dataclass, field
from threading import Event, Lock, Thread
from time import monotonic, sleep
from fastapi.testclient import TestClient
from app_runtime.contracts.application import ApplicationModule
from app_runtime.contracts.worker import Worker, WorkerHealth, WorkerStatus
from app_runtime.control.base import ControlActionSet
from app_runtime.control.http_channel import HttpControlChannel
from app_runtime.core.registration import ModuleRegistry
from app_runtime.core.runtime import RuntimeManager
@dataclass
class IntervalRoutine:
calls: list[float] = field(default_factory=list)
_lock: Lock = field(default_factory=Lock)
def run(self) -> None:
with self._lock:
self.calls.append(monotonic())
def wait_runs(self, count: int, timeout: float) -> bool:
deadline = monotonic() + timeout
while monotonic() < deadline:
with self._lock:
if len(self.calls) >= count:
return True
sleep(0.01)
return False
def deltas(self) -> list[float]:
with self._lock:
return [self.calls[index + 1] - self.calls[index] for index in range(len(self.calls) - 1)]
class BlockingRoutine:
def __init__(self, started: Event, release: Event) -> None:
self._started = started
self._release = release
def run(self) -> None:
self._started.set()
self._release.wait(timeout=5.0)
class ScenarioWorker(Worker):
def __init__(self, name: str, routine: object, *, interval: float = 0.05) -> None:
self._name = name
self._routine = routine
self._interval = interval
self._thread: Thread | None = None
self._stop_requested = Event()
self._in_flight = 0
self._runs = 0
self._lock = Lock()
@property
def name(self) -> str:
return self._name
@property
def critical(self) -> bool:
return True
def start(self) -> None:
if self._thread is not None and self._thread.is_alive():
return
self._stop_requested.clear()
self._thread = Thread(target=self._loop, name=f"{self._name}-thread", daemon=True)
self._thread.start()
def stop(self, force: bool = False) -> None:
self._stop_requested.set()
def health(self) -> WorkerHealth:
return WorkerHealth(name=self.name, status="ok", critical=True, meta={"runs": self._runs})
def status(self) -> WorkerStatus:
thread_alive = self._thread is not None and self._thread.is_alive()
with self._lock:
in_flight = self._in_flight
runs = self._runs
if not thread_alive:
state = "stopped"
elif self._stop_requested.is_set():
state = "stopping"
elif in_flight > 0:
state = "busy"
else:
state = "idle"
return WorkerStatus(name=self.name, state=state, in_flight=in_flight, meta={"runs": runs})
def _loop(self) -> None:
while not self._stop_requested.is_set():
with self._lock:
self._in_flight += 1
try:
self._routine.run()
with self._lock:
self._runs += 1
finally:
with self._lock:
self._in_flight -= 1
if self._stop_requested.is_set():
return
sleep(self._interval)
class ForceAwareWorker(Worker):
def __init__(self, name: str, started: Event, release: Event) -> None:
self._name = name
self._started = started
self._release = release
self._thread: Thread | None = None
self._stop_requested = Event()
self._force_stop = Event()
self.stop_calls: list[bool] = []
@property
def name(self) -> str:
return self._name
@property
def critical(self) -> bool:
return True
def start(self) -> None:
if self._thread is not None and self._thread.is_alive():
return
self._stop_requested.clear()
self._force_stop.clear()
self._thread = Thread(target=self._loop, name=f"{self._name}-thread", daemon=True)
self._thread.start()
def stop(self, force: bool = False) -> None:
self.stop_calls.append(force)
self._stop_requested.set()
if force:
self._force_stop.set()
def health(self) -> WorkerHealth:
return WorkerHealth(name=self.name, status="ok", critical=True)
def status(self) -> WorkerStatus:
thread_alive = self._thread is not None and self._thread.is_alive()
if not thread_alive:
state = "stopped"
elif self._stop_requested.is_set():
state = "stopping"
else:
state = "busy"
return WorkerStatus(name=self.name, state=state, in_flight=1 if thread_alive else 0)
def _loop(self) -> None:
self._started.set()
while not self._force_stop.is_set():
if self._release.wait(timeout=0.05):
return
class WorkerModule(ApplicationModule):
def __init__(self, worker: Worker) -> None:
self._worker = worker
@property
def name(self) -> str:
return "business-tests"
def register(self, registry: ModuleRegistry) -> None:
registry.add_worker(self._worker)
def _http_call_json(
client: TestClient,
path: str,
*,
method: str = "GET",
headers: dict[str, str] | None = None,
) -> tuple[int, dict[str, object]]:
response = client.request(method, path, headers=headers or {})
return response.status_code, response.json()
def _poll_until_stopped(client: TestClient, timeout: float = 2.0) -> bool:
deadline = monotonic() + timeout
while monotonic() < deadline:
_, payload = _http_call_json(client, "/actions/status")
if payload.get("detail") == "stopped":
return True
sleep(0.05)
return False
def _build_runtime(worker: Worker, *, control_timeout: int = 1) -> tuple[RuntimeManager, TestClient]:
runtime = RuntimeManager()
runtime.register_module(WorkerModule(worker))
runtime.start(start_control_plane=False)
channel = HttpControlChannel(host="127.0.0.1", port=0, timeout=control_timeout)
channel._actions = ControlActionSet(
health=runtime.health_status,
start=runtime.start_runtime,
stop=runtime.stop_runtime,
status=runtime.runtime_status,
)
app = channel._factory.create(channel._health_response, channel._action_response)
return runtime, TestClient(app)
def test_worker_wakes_up_with_configured_interval() -> None:
interval = 0.15
routine = IntervalRoutine()
worker = ScenarioWorker("interval-worker", routine, interval=interval)
runtime = RuntimeManager()
runtime.register_module(WorkerModule(worker))
runtime.start()
try:
assert routine.wait_runs(count=3, timeout=2.0) is True
finally:
runtime.stop()
deltas = routine.deltas()
assert len(deltas) >= 2
assert all(delta >= interval * 0.7 for delta in deltas[:2])
def test_actions_start_stop_and_health_when_worker_is_idle() -> None:
runtime, client = _build_runtime(ScenarioWorker("idle-worker", IntervalRoutine(), interval=0.2))
try:
stop_status, stop_payload = _http_call_json(client, "/actions/stop", method="POST")
assert stop_status == 200
assert isinstance(stop_payload.get("detail"), dict)
assert stop_payload["detail"]["timed_out"] is False
assert stop_payload["detail"]["state"] == "stopped"
health_stopped_status, health_stopped_payload = _http_call_json(client, "/health")
assert health_stopped_status == 503
assert health_stopped_payload["state"] == "stopped"
assert health_stopped_payload["status"] == "unhealthy"
start_status, start_payload = _http_call_json(client, "/actions/start", method="POST")
assert start_status == 200
assert isinstance(start_payload.get("detail"), dict)
assert start_payload["detail"]["timed_out"] is False
assert start_payload["detail"]["state"] in {"idle", "busy"}
health_started_status, health_started_payload = _http_call_json(client, "/health")
assert health_started_status == 200
assert health_started_payload["status"] == "ok"
assert health_started_payload["state"] in {"idle", "busy"}
finally:
client.close()
runtime.stop()
def test_actions_stop_busy_worker_before_timeout() -> None:
started = Event()
release = Event()
runtime, client = _build_runtime(ScenarioWorker("busy-worker", BlockingRoutine(started, release)))
assert started.wait(timeout=1.0) is True
def release_later() -> None:
sleep(0.2)
release.set()
Thread(target=release_later, daemon=True).start()
try:
status, payload = _http_call_json(client, "/actions/stop", method="POST")
assert status == 200
assert isinstance(payload.get("detail"), dict)
assert payload["detail"]["timed_out"] is False
assert payload["detail"]["state"] == "stopped"
health_status, health_payload = _http_call_json(client, "/health")
assert health_status == 503
assert health_payload["status"] == "unhealthy"
assert health_payload["state"] == "stopped"
finally:
client.close()
runtime.stop()
def test_actions_stop_busy_worker_after_timeout() -> None:
started = Event()
release = Event()
runtime, client = _build_runtime(ScenarioWorker("slow-stop-worker", BlockingRoutine(started, release)))
runtime.ACTION_TIMEOUT_SECONDS = 0.3
assert started.wait(timeout=1.0) is True
try:
status, payload = _http_call_json(client, "/actions/stop", method="POST")
assert status == 200
assert isinstance(payload.get("detail"), dict)
assert payload["detail"]["timed_out"] is True
assert payload["detail"]["state"] == "stopping"
health_status, health_payload = _http_call_json(client, "/health")
assert health_status == 503
assert health_payload["status"] == "degraded"
assert health_payload["state"] == "stopping"
release.set()
assert _poll_until_stopped(client, timeout=2.0) is True
stopped_health_status, stopped_health_payload = _http_call_json(client, "/health")
assert stopped_health_status == 503
assert stopped_health_payload["state"] == "stopped"
finally:
client.close()
runtime.stop()
def test_actions_stop_wait_false_returns_before_worker_finishes() -> None:
started = Event()
release = Event()
runtime, client = _build_runtime(ScenarioWorker("async-stop-worker", BlockingRoutine(started, release)))
assert started.wait(timeout=1.0) is True
try:
status, payload = _http_call_json(client, "/actions/stop?wait=false", method="POST")
assert status == 200
assert isinstance(payload.get("detail"), dict)
assert payload["detail"]["timed_out"] is False
assert payload["detail"]["state"] == "stopping"
health_status, health_payload = _http_call_json(client, "/health")
assert health_status == 503
assert health_payload["status"] == "degraded"
assert health_payload["state"] == "stopping"
release.set()
assert _poll_until_stopped(client, timeout=2.0) is True
finally:
client.close()
runtime.stop()
def test_actions_stop_honors_timeout_query_parameter() -> None:
started = Event()
release = Event()
runtime, client = _build_runtime(ScenarioWorker("timeout-stop-worker", BlockingRoutine(started, release)))
assert started.wait(timeout=1.0) is True
try:
status, payload = _http_call_json(client, "/actions/stop?timeout=0.1", method="POST")
assert status == 200
assert isinstance(payload.get("detail"), dict)
assert payload["detail"]["timed_out"] is True
assert payload["detail"]["state"] == "stopping"
release.set()
assert _poll_until_stopped(client, timeout=2.0) is True
finally:
client.close()
runtime.stop()
def test_actions_stop_honors_force_query_parameter() -> None:
started = Event()
release = Event()
worker = ForceAwareWorker("force-stop-worker", started, release)
runtime, client = _build_runtime(worker)
assert started.wait(timeout=1.0) is True
try:
status, payload = _http_call_json(client, "/actions/stop?force=true&timeout=0.5", method="POST")
assert status == 200
assert isinstance(payload.get("detail"), dict)
assert payload["detail"]["timed_out"] is False
assert payload["detail"]["state"] == "stopped"
assert worker.stop_calls == [True]
finally:
release.set()
client.close()
runtime.stop()
def test_actions_log_client_source_for_start_and_stop(caplog) -> None:
runtime, client = _build_runtime(ScenarioWorker("log-worker", IntervalRoutine(), interval=0.2))
caplog.set_level("WARNING", logger="app_runtime.control.http_app")
try:
_http_call_json(client, "/actions/stop", method="POST", headers={"X-Client-Source": "web-ui"})
_http_call_json(client, "/actions/start", method="POST", headers={"X-Client-Source": "web-ui"})
finally:
client.close()
runtime.stop()
messages = [record.getMessage() for record in caplog.records]
assert any("/actions/stop client=web-ui" in message for message in messages)
assert any("/actions/start client=web-ui" in message for message in messages)
+110 -3
View File
@@ -5,6 +5,8 @@ from dataclasses import dataclass, field
from threading import Event, Lock, Thread from threading import Event, Lock, Thread
from time import sleep from time import sleep
from fastapi.testclient import TestClient
from app_runtime.contracts.application import ApplicationModule from app_runtime.contracts.application import ApplicationModule
from app_runtime.contracts.health import HealthContributor from app_runtime.contracts.health import HealthContributor
from app_runtime.contracts.worker import Worker, WorkerHealth, WorkerStatus from app_runtime.contracts.worker import Worker, WorkerHealth, WorkerStatus
@@ -176,6 +178,28 @@ class BlockingModule(ApplicationModule):
registry.add_worker(RoutineWorker("blocking-worker", self.routine)) registry.add_worker(RoutineWorker("blocking-worker", self.routine))
class WorkerModuleAdapter(ApplicationModule):
def __init__(self, worker: Worker) -> None:
self._worker = worker
@property
def name(self) -> str:
return "worker-adapter"
def register(self, registry: ModuleRegistry) -> None:
registry.add_worker(self._worker)
class ForceRecordingWorker(RoutineWorker):
def __init__(self) -> None:
super().__init__("force-recorder", CollectingRoutine(), interval=0.01)
self.stop_flags: list[bool] = []
def stop(self, force: bool = False) -> None:
self.stop_flags.append(force)
super().stop(force=force)
class RecordingTransport(NoOpTraceTransport): class RecordingTransport(NoOpTraceTransport):
def __init__(self) -> None: def __init__(self) -> None:
self.contexts: list[object] = [] self.contexts: list[object] = []
@@ -188,6 +212,21 @@ class RecordingTransport(NoOpTraceTransport):
self.messages.append(record) self.messages.append(record)
def _build_control_client(runtime: RuntimeManager, *, control_timeout: int = 1) -> TestClient:
from app_runtime.control.base import ControlActionSet
from app_runtime.control.http_channel import HttpControlChannel
channel = HttpControlChannel("127.0.0.1", 0, control_timeout)
channel._actions = ControlActionSet(
health=runtime.health_status,
start=runtime.start_runtime,
stop=runtime.stop_runtime,
status=runtime.runtime_status,
)
app = channel._factory.create(channel._health_response, channel._action_response)
return TestClient(app)
def test_runtime_runs_worker_routine_and_exposes_status(tmp_path) -> None: def test_runtime_runs_worker_routine_and_exposes_status(tmp_path) -> None:
config_path = tmp_path / "config.yml" config_path = tmp_path / "config.yml"
config_path.write_text( config_path.write_text(
@@ -302,15 +341,15 @@ def test_http_control_channel_exposes_health_and_actions() -> None:
async def health(): async def health():
return {"status": "ok" if state["started"] else "unhealthy", "state": "idle" if state["started"] else "stopped"} return {"status": "ok" if state["started"] else "unhealthy", "state": "idle" if state["started"] else "stopped"}
async def start_handler() -> str: async def start_handler(_request) -> str:
state["started"] = True state["started"] = True
return "started" return "started"
async def stop_handler() -> str: async def stop_handler(_request) -> str:
state["started"] = False state["started"] = False
return "stopped" return "stopped"
async def status_handler() -> str: async def status_handler(_request) -> str:
return "idle" if state["started"] else "stopped" return "idle" if state["started"] else "stopped"
async def scenario() -> None: async def scenario() -> None:
@@ -339,6 +378,74 @@ def test_http_control_channel_exposes_health_and_actions() -> None:
asyncio.run(scenario()) asyncio.run(scenario())
def test_http_control_stop_wait_false_returns_immediately() -> None:
started = Event()
release = Event()
runtime = RuntimeManager()
runtime.register_module(BlockingModule(started, release))
runtime.start(start_control_plane=False)
assert started.wait(timeout=1.0) is True
client = _build_control_client(runtime)
try:
response = client.post("/actions/stop?wait=false")
payload = response.json()
assert response.status_code == 200
assert payload["detail"]["timed_out"] is False
assert payload["detail"]["state"] == "stopping"
health_response = client.get("/health")
health_payload = health_response.json()
assert health_response.status_code == 503
assert health_payload["state"] == "stopping"
assert health_payload["status"] == "degraded"
finally:
release.set()
client.close()
runtime.stop()
def test_http_control_stop_timeout_query_changes_wait_window() -> None:
started = Event()
release = Event()
runtime = RuntimeManager()
runtime.ACTION_TIMEOUT_SECONDS = 5.0
runtime.register_module(BlockingModule(started, release))
runtime.start(start_control_plane=False)
assert started.wait(timeout=1.0) is True
client = _build_control_client(runtime)
try:
response = client.post("/actions/stop?timeout=0.1")
payload = response.json()
assert response.status_code == 200
assert payload["detail"]["timed_out"] is True
assert payload["detail"]["state"] == "stopping"
finally:
release.set()
client.close()
runtime.stop()
def test_http_control_stop_force_query_propagates_to_worker() -> None:
runtime = RuntimeManager()
worker = ForceRecordingWorker()
runtime.register_module(WorkerModuleAdapter(worker))
runtime.start(start_control_plane=False)
client = _build_control_client(runtime)
try:
response = client.post("/actions/stop?force=true")
payload = response.json()
assert response.status_code == 200
assert payload["detail"]["timed_out"] is False
assert payload["detail"]["state"] == "stopped"
assert worker.stop_flags == [True]
finally:
client.close()
runtime.stop()
def test_public_plba_package_exports_runtime_builder_and_worker_contract(tmp_path) -> None: def test_public_plba_package_exports_runtime_builder_and_worker_contract(tmp_path) -> None:
import plba import plba
from plba import ApplicationModule as PublicApplicationModule from plba import ApplicationModule as PublicApplicationModule
+41
View File
@@ -0,0 +1,41 @@
from app_runtime.workflow.persistence.workflow_repository import WorkflowRepository
class StubConnectionFactory:
@staticmethod
def dumps(snapshot) -> str:
return str(snapshot)
def test_build_run_payload_prefers_active_task_trace_id() -> None:
repository = WorkflowRepository(StubConnectionFactory())
snapshot = {
"payload": {"inbox_message": {"external_message_id": "msg-1", "id": 7}},
"state": {
"runtime": {
"trace_id": "task-trace-1",
"email_trace_id": "message-trace-1",
"queue_task_id": 13,
}
},
}
payload = repository._build_run_payload("estimate", snapshot)
assert payload[0] == "estimate"
assert payload[2] == "msg-1"
assert payload[3] == 13
assert payload[4] == 7
assert payload[8] == "task-trace-1"
def test_build_run_payload_falls_back_to_legacy_email_trace_id() -> None:
repository = WorkflowRepository(StubConnectionFactory())
snapshot = {
"payload": {"inbox_message": {"external_message_id": "msg-2", "id": 8}},
"state": {"runtime": {"email_trace_id": "message-trace-2"}},
}
payload = repository._build_run_payload("estimate", snapshot)
assert payload[8] == "message-trace-2"