diff --git a/.gitignore b/.gitignore index f4e69dd..3c0e7b3 100644 --- a/.gitignore +++ b/.gitignore @@ -1,3 +1,5 @@ __pycache__/ *.pyc -plba.egg-ingo/ +.pytest_cache/ +build/ +plba.egg-info/ diff --git a/.vscode/settings.json b/.vscode/settings.json new file mode 100644 index 0000000..9b38853 --- /dev/null +++ b/.vscode/settings.json @@ -0,0 +1,7 @@ +{ + "python.testing.pytestArgs": [ + "tests" + ], + "python.testing.unittestEnabled": false, + "python.testing.pytestEnabled": true +} \ No newline at end of file diff --git a/README.md b/README.md index ea8b2fb..354037a 100644 --- a/README.md +++ b/README.md @@ -1,5 +1,29 @@ # PLBA +## Установка + +Установка пакета напрямую из Git-репозитория через `pip`: + +```bash +export GIT_PLBA_TOKEN="<ваш_токен>" +pip install "git+https://oauth2:${GIT_PLBA_TOKEN}@git.lesha.spb.ru/alex/plba.git" +``` + +Если нужен конкретный тег, ветка или commit, добавьте ref после `.git`: + +```bash +pip install "git+https://oauth2:${GIT_PLBA_TOKEN}@git.lesha.spb.ru/alex/plba.git@main" +``` + +### Доступ через `GIT_PLBA_TOKEN` + +Для установки по HTTPS используется переменная окружения `GIT_PLBA_TOKEN`. В нее нужно положить персональный Git-токен с правом чтения репозитория `alex/plba`. Токен передается в URL установки и позволяет `pip` скачать исходники без SSH-доступа. + +Рекомендуется: +- экспортировать токен только в текущую shell-сессию; +- не хардкодить токен в `requirements.txt`, `pyproject.toml` или исходниках; +- использовать отдельный read-only токен, если Git-сервер это поддерживает. + ## 1. Назначение платформы PLBA (`Platform Runtime for Business Applications`) - это runtime-слой для бизнес-приложений, который забирает на себя инфраструктурную часть исполнения. Платформа стандартизирует запуск и остановку рабочих процессов, контроль состояния приложения и эксплуатационные сервисы вокруг них. За счет этого прикладной код концентрируется на бизнес-логике, а не на lifecycle, диагностике и служебных механизмах. Базовая модель использования строится вокруг `ApplicationModule`, `Worker` и прикладной `Routine`, где каждый уровень отвечает за свою часть ответственности. В результате приложение получается предсказуемым в эксплуатации, проще в сопровождении и масштабировании. @@ -189,6 +213,11 @@ classDiagram - Для чего нужен: позволяет собрать дерево исполнения и связать все сообщения конкретной бизнес-операции. - Атрибуты контекста (`TraceContextRecord`): `trace_id`, `alias`, `parent_id`, `type`, `event_time`, `attrs`. - Иерархия: `parent_id` указывает на родительский контекст; так строится цепочка root -> child. + - Стандартная модель для бизнес-приложений: + - root-context создается на внешний триггер верхнего уровня, например входящее письмо, webhook, команду оператора или batch-запуск; + - child-context создается на каждую производную единицу работы, например вложение письма, отдельную задачу, заказ или документ; + - child-context должен создаваться через `parent_id=` и передаваться дальше через `trace_context` / runtime metadata как активный trace конкретной бизнес-операции; + - если одно входящее сообщение порождает несколько задач, у них должен быть общий parent root-context и разные child trace contexts. - Таблица: `trace_contexts`. - Как объявляется в коде: ```python @@ -199,6 +228,22 @@ with traces.open_context(alias="orders-worker", kind="worker", attrs={"routine": root = traces.new_root("orders.sync") child = traces.child_of(root, "orders.process_batch") ``` +```python +with traces.open_context(alias="email:123", kind="email") as message_trace_id: + ... + with traces.open_context( + alias="attachment:invoice.xlsx", + parent_id=message_trace_id, + kind="task", + attrs={"attachment_index": 0}, + ) as task_trace_id: + runtime["trace_id"] = task_trace_id + runtime["message_trace_id"] = message_trace_id + ... +``` + - Стандарт для workflow persistence: + - если workflow представляет отдельную бизнес-задачу, в `state.runtime.trace_id` должен лежать активный trace этой задачи; + - дополнительные идентификаторы родительских контекстов (`message_trace_id`, `email_trace_id` и т.д.) могут храниться рядом в runtime для логов и связности, но `workflow_runs.trace_id` должен ссылаться именно на активный trace текущей задачи. - Trace Message: - Что это: событие внутри активного context (статус шага, предупреждение, ошибка, служебная информация). - Роль `step`: текущая стадия операции (`parse`, `validate`, `persist` и т.д.), которую выставляют через `traces.step("...")`. @@ -245,6 +290,19 @@ child = traces.child_of(root, "orders.process_batch") - При создании runtime включить `enable_http_control=True`. - Использовать `/health` для readiness/liveness и `/actions/*` для операционного контроля. +#### Простой Web UI через nginx (порт 15000) +- Статические файлы UI: `web/control-ui/` (`index.html`, `app.js`, `styles.css`). +- UI использует polling (`/api/health` раз в 2 секунды) и кнопки `Start/Stop` (`POST /api/actions/start|stop`). +- Для вызовов API UI передает заголовок `X-Client-Source: web-ui`. +- Пример server-конфига nginx: `deploy/nginx/plba-control-ui.conf`. + - Слушает `15000`. + - Отдает UI из `/usr/share/nginx/html`. + - Проксирует `/api/*` на control API `http://app:8080/*` (sidecar в docker network). +- Пример запуска в составе бизнес-приложения: `deploy/docker-compose.control-ui.yml`. + - Публикуется только один внешний порт: `15000`. + - Внутренний control API остается в сети compose и доступен nginx по имени сервиса `app`. + - В `app` нужно поднять control channel на `0.0.0.0:8080`. + #### Queue - Назначение: простой in-memory буфер задач/сообщений внутри приложения. - Реализация: `InMemoryTaskQueue[T]`. diff --git a/pyproject.toml b/pyproject.toml index e7986ce..d6b9e32 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -4,7 +4,7 @@ build-backend = "setuptools.build_meta" [project] name = "plba" -version = "0.2.7" +version = "0.2.8" description = "Platform runtime for business applications" readme = "README.md" requires-python = ">=3.11" diff --git a/src/app_runtime/control/__init__.py b/src/app_runtime/control/__init__.py index 09b35fe..191bf3f 100644 --- a/src/app_runtime/control/__init__.py +++ b/src/app_runtime/control/__init__.py @@ -1,5 +1,5 @@ -from app_runtime.control.base import ControlActionSet, ControlChannel +from app_runtime.control.base import ControlActionRequest, ControlActionSet, ControlChannel from app_runtime.control.http_channel import HttpControlChannel from app_runtime.control.service import ControlPlaneService -__all__ = ["ControlActionSet", "ControlChannel", "ControlPlaneService", "HttpControlChannel"] +__all__ = ["ControlActionRequest", "ControlActionSet", "ControlChannel", "ControlPlaneService", "HttpControlChannel"] diff --git a/src/app_runtime/control/base.py b/src/app_runtime/control/base.py index 06e9566..9c65fbf 100644 --- a/src/app_runtime/control/base.py +++ b/src/app_runtime/control/base.py @@ -6,7 +6,16 @@ from dataclasses import dataclass from app_runtime.core.types import HealthPayload -ActionHandler = Callable[[], Awaitable[str]] + +@dataclass(slots=True) +class ControlActionRequest: + wait: bool | None = None + timeout: float | None = None + force: bool | None = None + + +ActionResult = str | dict[str, object] +ActionHandler = Callable[[ControlActionRequest], Awaitable[ActionResult]] HealthHandler = Callable[[], Awaitable[HealthPayload]] diff --git a/src/app_runtime/control/http_app.py b/src/app_runtime/control/http_app.py index 70aa5b1..5d42c6f 100644 --- a/src/app_runtime/control/http_app.py +++ b/src/app_runtime/control/http_app.py @@ -1,19 +1,23 @@ from __future__ import annotations +import logging import time from collections.abc import Awaitable, Callable from fastapi import FastAPI, Request from fastapi.responses import JSONResponse +from app_runtime.control.base import ControlActionRequest from app_runtime.core.types import HealthPayload +LOGGER = logging.getLogger(__name__) + class HttpControlAppFactory: def create( self, health_provider: Callable[[], Awaitable[HealthPayload]], - action_provider: Callable[[str], Awaitable[JSONResponse]], + action_provider: Callable[[str, str, ControlActionRequest], Awaitable[JSONResponse]], ) -> FastAPI: app = FastAPI(title="PLBA Control API") @@ -32,7 +36,53 @@ class HttpControlAppFactory: @app.get("/actions/{action}") @app.post("/actions/{action}") - async def action(action: str) -> JSONResponse: - return await action_provider(action) + async def action(action: str, request: Request) -> JSONResponse: + client_source = self._client_source(request) + if action in {"start", "stop"}: + LOGGER.warning("Control action requested: /actions/%s client=%s", action, client_source) + try: + action_request = self._action_request(request) + except ValueError as exc: + return JSONResponse(content={"status": "error", "detail": str(exc)}, status_code=400) + return await action_provider(action, client_source, action_request) return app + + def _action_request(self, request: Request) -> ControlActionRequest: + return ControlActionRequest( + wait=self._bool_param(request, "wait"), + timeout=self._float_param(request, "timeout"), + force=self._bool_param(request, "force"), + ) + + def _client_source(self, request: Request) -> str: + explicit_header = request.headers.get("X-Client-Source", "").strip() + if explicit_header: + return explicit_header + user_agent = request.headers.get("User-Agent", "").strip() + if user_agent: + return f"user-agent:{user_agent}" + return "unknown" + + def _bool_param(self, request: Request, name: str) -> bool | None: + raw_value = request.query_params.get(name) + if raw_value is None: + return None + normalized = raw_value.strip().lower() + if normalized in {"1", "true", "yes", "on"}: + return True + if normalized in {"0", "false", "no", "off"}: + return False + raise ValueError(f"invalid boolean query parameter: {name}={raw_value}") + + def _float_param(self, request: Request, name: str) -> float | None: + raw_value = request.query_params.get(name) + if raw_value is None: + return None + try: + value = float(raw_value) + except ValueError as exc: + raise ValueError(f"invalid numeric query parameter: {name}={raw_value}") from exc + if value < 0: + raise ValueError(f"query parameter must be >= 0: {name}={raw_value}") + return value diff --git a/src/app_runtime/control/http_channel.py b/src/app_runtime/control/http_channel.py index 1073279..d6b4808 100644 --- a/src/app_runtime/control/http_channel.py +++ b/src/app_runtime/control/http_channel.py @@ -4,7 +4,7 @@ import asyncio from fastapi.responses import JSONResponse -from app_runtime.control.base import ControlActionSet, ControlChannel +from app_runtime.control.base import ControlActionRequest, ControlActionSet, ControlChannel from app_runtime.control.http_app import HttpControlAppFactory from app_runtime.control.http_runner import UvicornThreadRunner @@ -33,7 +33,12 @@ class HttpControlChannel(ControlChannel): return {"status": "unhealthy", "detail": "control actions are not configured"} return await asyncio.wait_for(self._actions.health(), timeout=float(self._timeout)) - async def _action_response(self, action: str) -> JSONResponse: + async def _action_response( + self, + action: str, + _client_source: str = "unknown", + request: ControlActionRequest | None = None, + ) -> JSONResponse: if self._actions is None: return JSONResponse(content={"status": "error", "detail": f"{action} handler is not configured"}, status_code=404) callbacks = { @@ -44,9 +49,10 @@ class HttpControlChannel(ControlChannel): callback = callbacks.get(action) if callback is None: return JSONResponse(content={"status": "error", "detail": f"unsupported action: {action}"}, status_code=404) - action_timeout = max(float(self._timeout), 10.0) if action in {"start", "stop"} else float(self._timeout) + action_request = request or ControlActionRequest() + action_timeout = self._action_timeout(action, action_request) try: - detail = await asyncio.wait_for(callback(), timeout=action_timeout) + detail = await asyncio.wait_for(callback(action_request), timeout=action_timeout) except asyncio.TimeoutError: return JSONResponse( content={"status": "accepted", "detail": f"{action} operation is still in progress"}, @@ -55,3 +61,9 @@ class HttpControlChannel(ControlChannel): except Exception as exc: return JSONResponse(content={"status": "error", "detail": str(exc)}, status_code=500) return JSONResponse(content={"status": "ok", "detail": detail or f"{action} action accepted"}, status_code=200) + + def _action_timeout(self, action: str, request: ControlActionRequest) -> float: + base_timeout = max(float(self._timeout), 10.0) if action in {"start", "stop"} else float(self._timeout) + if action != "stop" or request.wait is False or request.timeout is None: + return base_timeout + return max(base_timeout, float(request.timeout) + 1.0) diff --git a/src/app_runtime/core/runtime.py b/src/app_runtime/core/runtime.py index 521ec1f..d2ef66f 100644 --- a/src/app_runtime/core/runtime.py +++ b/src/app_runtime/core/runtime.py @@ -4,6 +4,7 @@ from time import monotonic, sleep from app_runtime.config.providers import FileConfigProvider from app_runtime.contracts.application import ApplicationModule +from app_runtime.control.base import ControlActionRequest from app_runtime.control.service import ControlPlaneService from app_runtime.core.configuration import ConfigurationManager from app_runtime.core.registration import ModuleRegistry @@ -87,7 +88,7 @@ class RuntimeManager: async def health_status(self) -> HealthPayload: return self.current_health() - async def start_runtime(self) -> dict[str, object] | str: + async def start_runtime(self, _request: ControlActionRequest) -> dict[str, object] | str: self._refresh_state() if self._started: return "runtime already running" @@ -100,24 +101,29 @@ class RuntimeManager: return self._action_detail("runtime started", timed_out=False) return self._action_detail("runtime start is still in progress", timed_out=True) - async def stop_runtime(self) -> dict[str, object] | str: + async def stop_runtime(self, request: ControlActionRequest) -> dict[str, object] | str: self._refresh_state() if not self._started: if self._state == LifecycleState.STOPPING: return self._action_detail("runtime stop is still in progress", timed_out=True) return "runtime already stopped" + wait = True if request.wait is None else request.wait + timeout = self.ACTION_TIMEOUT_SECONDS if request.timeout is None else float(request.timeout) + force = False if request.force is None else request.force self._state = LifecycleState.STOPPING try: - self.workers.stop(timeout=self.ACTION_TIMEOUT_SECONDS, force=False) + self.workers.stop(timeout=timeout, force=force, wait=wait) except TimeoutError: return self._action_detail("runtime stop is still in progress", timed_out=True) - self._started = False - self._state = LifecycleState.STOPPED - return self._action_detail("runtime stopped", timed_out=False) + self._refresh_state() + if self._state == LifecycleState.STOPPED: + self._started = False + return self._action_detail("runtime stopped", timed_out=False) + return self._action_detail("runtime stop requested", timed_out=False) - async def runtime_status(self) -> str: + async def runtime_status(self, _request: ControlActionRequest) -> str: self._refresh_state() return self._state.value diff --git a/src/app_runtime/workers/supervisor.py b/src/app_runtime/workers/supervisor.py index a73277b..5d89be7 100644 --- a/src/app_runtime/workers/supervisor.py +++ b/src/app_runtime/workers/supervisor.py @@ -18,10 +18,10 @@ class WorkerSupervisor: for worker in self._workers: worker.start() - def stop(self, timeout: float = 30.0, force: bool = False) -> None: + def stop(self, timeout: float = 30.0, force: bool = False, wait: bool = True) -> None: for worker in self._workers: worker.stop(force=force) - if force: + if not wait: return deadline = monotonic() + timeout while True: diff --git a/src/app_runtime/workflow/persistence/workflow_repository.py b/src/app_runtime/workflow/persistence/workflow_repository.py index 7c5512c..3fb8dc4 100644 --- a/src/app_runtime/workflow/persistence/workflow_repository.py +++ b/src/app_runtime/workflow/persistence/workflow_repository.py @@ -133,7 +133,17 @@ class WorkflowRepository: None, "running", self._connection_factory.dumps(snapshot), - runtime.get("email_trace_id"), + self._resolve_trace_id(runtime), + ) + + @staticmethod + def _resolve_trace_id(runtime: dict[str, Any]) -> str | None: + return ( + runtime.get("trace_id") + or runtime.get("task_trace_id") + or runtime.get("order_trace_id") + or runtime.get("attachment_trace_id") + or runtime.get("email_trace_id") ) def _use_memory(self) -> bool: diff --git a/src/plba/__init__.py b/src/plba/__init__.py index f2876d1..ba58d89 100644 --- a/src/plba/__init__.py +++ b/src/plba/__init__.py @@ -1,6 +1,6 @@ from plba.bootstrap import create_runtime from plba.config import ConfigFileLoader, FileConfigProvider -from plba.control import ControlActionSet, ControlChannel, ControlPlaneService, HttpControlChannel +from plba.control import ControlActionRequest, ControlActionSet, ControlChannel, ControlPlaneService, HttpControlChannel from plba.contracts import ( ApplicationModule, ConfigProvider, @@ -35,6 +35,7 @@ __all__ = [ "ConfigFileLoader", "ConfigProvider", "ConfigurationManager", + "ControlActionRequest", "ControlActionSet", "ControlChannel", "ControlPlaneService", diff --git a/src/plba/control.py b/src/plba/control.py index 8b817ee..f8063a2 100644 --- a/src/plba/control.py +++ b/src/plba/control.py @@ -1,8 +1,9 @@ -from app_runtime.control.base import ControlActionSet, ControlChannel +from app_runtime.control.base import ControlActionRequest, ControlActionSet, ControlChannel from app_runtime.control.http_channel import HttpControlChannel from app_runtime.control.service import ControlPlaneService __all__ = [ + "ControlActionRequest", "ControlActionSet", "ControlChannel", "ControlPlaneService", diff --git a/tests/test_business_control_actions.py b/tests/test_business_control_actions.py new file mode 100644 index 0000000..10bfe4c --- /dev/null +++ b/tests/test_business_control_actions.py @@ -0,0 +1,394 @@ +from __future__ import annotations + +from dataclasses import dataclass, field +from threading import Event, Lock, Thread +from time import monotonic, sleep + +from fastapi.testclient import TestClient + +from app_runtime.contracts.application import ApplicationModule +from app_runtime.contracts.worker import Worker, WorkerHealth, WorkerStatus +from app_runtime.control.base import ControlActionSet +from app_runtime.control.http_channel import HttpControlChannel +from app_runtime.core.registration import ModuleRegistry +from app_runtime.core.runtime import RuntimeManager + + +@dataclass +class IntervalRoutine: + calls: list[float] = field(default_factory=list) + _lock: Lock = field(default_factory=Lock) + + def run(self) -> None: + with self._lock: + self.calls.append(monotonic()) + + def wait_runs(self, count: int, timeout: float) -> bool: + deadline = monotonic() + timeout + while monotonic() < deadline: + with self._lock: + if len(self.calls) >= count: + return True + sleep(0.01) + return False + + def deltas(self) -> list[float]: + with self._lock: + return [self.calls[index + 1] - self.calls[index] for index in range(len(self.calls) - 1)] + + +class BlockingRoutine: + def __init__(self, started: Event, release: Event) -> None: + self._started = started + self._release = release + + def run(self) -> None: + self._started.set() + self._release.wait(timeout=5.0) + + +class ScenarioWorker(Worker): + def __init__(self, name: str, routine: object, *, interval: float = 0.05) -> None: + self._name = name + self._routine = routine + self._interval = interval + self._thread: Thread | None = None + self._stop_requested = Event() + self._in_flight = 0 + self._runs = 0 + self._lock = Lock() + + @property + def name(self) -> str: + return self._name + + @property + def critical(self) -> bool: + return True + + def start(self) -> None: + if self._thread is not None and self._thread.is_alive(): + return + self._stop_requested.clear() + self._thread = Thread(target=self._loop, name=f"{self._name}-thread", daemon=True) + self._thread.start() + + def stop(self, force: bool = False) -> None: + self._stop_requested.set() + + def health(self) -> WorkerHealth: + return WorkerHealth(name=self.name, status="ok", critical=True, meta={"runs": self._runs}) + + def status(self) -> WorkerStatus: + thread_alive = self._thread is not None and self._thread.is_alive() + with self._lock: + in_flight = self._in_flight + runs = self._runs + if not thread_alive: + state = "stopped" + elif self._stop_requested.is_set(): + state = "stopping" + elif in_flight > 0: + state = "busy" + else: + state = "idle" + return WorkerStatus(name=self.name, state=state, in_flight=in_flight, meta={"runs": runs}) + + def _loop(self) -> None: + while not self._stop_requested.is_set(): + with self._lock: + self._in_flight += 1 + try: + self._routine.run() + with self._lock: + self._runs += 1 + finally: + with self._lock: + self._in_flight -= 1 + if self._stop_requested.is_set(): + return + sleep(self._interval) + + +class ForceAwareWorker(Worker): + def __init__(self, name: str, started: Event, release: Event) -> None: + self._name = name + self._started = started + self._release = release + self._thread: Thread | None = None + self._stop_requested = Event() + self._force_stop = Event() + self.stop_calls: list[bool] = [] + + @property + def name(self) -> str: + return self._name + + @property + def critical(self) -> bool: + return True + + def start(self) -> None: + if self._thread is not None and self._thread.is_alive(): + return + self._stop_requested.clear() + self._force_stop.clear() + self._thread = Thread(target=self._loop, name=f"{self._name}-thread", daemon=True) + self._thread.start() + + def stop(self, force: bool = False) -> None: + self.stop_calls.append(force) + self._stop_requested.set() + if force: + self._force_stop.set() + + def health(self) -> WorkerHealth: + return WorkerHealth(name=self.name, status="ok", critical=True) + + def status(self) -> WorkerStatus: + thread_alive = self._thread is not None and self._thread.is_alive() + if not thread_alive: + state = "stopped" + elif self._stop_requested.is_set(): + state = "stopping" + else: + state = "busy" + return WorkerStatus(name=self.name, state=state, in_flight=1 if thread_alive else 0) + + def _loop(self) -> None: + self._started.set() + while not self._force_stop.is_set(): + if self._release.wait(timeout=0.05): + return + + +class WorkerModule(ApplicationModule): + def __init__(self, worker: Worker) -> None: + self._worker = worker + + @property + def name(self) -> str: + return "business-tests" + + def register(self, registry: ModuleRegistry) -> None: + registry.add_worker(self._worker) + + +def _http_call_json( + client: TestClient, + path: str, + *, + method: str = "GET", + headers: dict[str, str] | None = None, +) -> tuple[int, dict[str, object]]: + response = client.request(method, path, headers=headers or {}) + return response.status_code, response.json() + + +def _poll_until_stopped(client: TestClient, timeout: float = 2.0) -> bool: + deadline = monotonic() + timeout + while monotonic() < deadline: + _, payload = _http_call_json(client, "/actions/status") + if payload.get("detail") == "stopped": + return True + sleep(0.05) + return False + + +def _build_runtime(worker: Worker, *, control_timeout: int = 1) -> tuple[RuntimeManager, TestClient]: + runtime = RuntimeManager() + runtime.register_module(WorkerModule(worker)) + runtime.start(start_control_plane=False) + + channel = HttpControlChannel(host="127.0.0.1", port=0, timeout=control_timeout) + channel._actions = ControlActionSet( + health=runtime.health_status, + start=runtime.start_runtime, + stop=runtime.stop_runtime, + status=runtime.runtime_status, + ) + app = channel._factory.create(channel._health_response, channel._action_response) + return runtime, TestClient(app) + + +def test_worker_wakes_up_with_configured_interval() -> None: + interval = 0.15 + routine = IntervalRoutine() + worker = ScenarioWorker("interval-worker", routine, interval=interval) + runtime = RuntimeManager() + runtime.register_module(WorkerModule(worker)) + + runtime.start() + try: + assert routine.wait_runs(count=3, timeout=2.0) is True + finally: + runtime.stop() + + deltas = routine.deltas() + assert len(deltas) >= 2 + assert all(delta >= interval * 0.7 for delta in deltas[:2]) + + +def test_actions_start_stop_and_health_when_worker_is_idle() -> None: + runtime, client = _build_runtime(ScenarioWorker("idle-worker", IntervalRoutine(), interval=0.2)) + try: + stop_status, stop_payload = _http_call_json(client, "/actions/stop", method="POST") + assert stop_status == 200 + assert isinstance(stop_payload.get("detail"), dict) + assert stop_payload["detail"]["timed_out"] is False + assert stop_payload["detail"]["state"] == "stopped" + + health_stopped_status, health_stopped_payload = _http_call_json(client, "/health") + assert health_stopped_status == 503 + assert health_stopped_payload["state"] == "stopped" + assert health_stopped_payload["status"] == "unhealthy" + + start_status, start_payload = _http_call_json(client, "/actions/start", method="POST") + assert start_status == 200 + assert isinstance(start_payload.get("detail"), dict) + assert start_payload["detail"]["timed_out"] is False + assert start_payload["detail"]["state"] in {"idle", "busy"} + + health_started_status, health_started_payload = _http_call_json(client, "/health") + assert health_started_status == 200 + assert health_started_payload["status"] == "ok" + assert health_started_payload["state"] in {"idle", "busy"} + finally: + client.close() + runtime.stop() + + +def test_actions_stop_busy_worker_before_timeout() -> None: + started = Event() + release = Event() + runtime, client = _build_runtime(ScenarioWorker("busy-worker", BlockingRoutine(started, release))) + assert started.wait(timeout=1.0) is True + + def release_later() -> None: + sleep(0.2) + release.set() + + Thread(target=release_later, daemon=True).start() + try: + status, payload = _http_call_json(client, "/actions/stop", method="POST") + assert status == 200 + assert isinstance(payload.get("detail"), dict) + assert payload["detail"]["timed_out"] is False + assert payload["detail"]["state"] == "stopped" + + health_status, health_payload = _http_call_json(client, "/health") + assert health_status == 503 + assert health_payload["status"] == "unhealthy" + assert health_payload["state"] == "stopped" + finally: + client.close() + runtime.stop() + + +def test_actions_stop_busy_worker_after_timeout() -> None: + started = Event() + release = Event() + runtime, client = _build_runtime(ScenarioWorker("slow-stop-worker", BlockingRoutine(started, release))) + runtime.ACTION_TIMEOUT_SECONDS = 0.3 + assert started.wait(timeout=1.0) is True + + try: + status, payload = _http_call_json(client, "/actions/stop", method="POST") + assert status == 200 + assert isinstance(payload.get("detail"), dict) + assert payload["detail"]["timed_out"] is True + assert payload["detail"]["state"] == "stopping" + + health_status, health_payload = _http_call_json(client, "/health") + assert health_status == 503 + assert health_payload["status"] == "degraded" + assert health_payload["state"] == "stopping" + + release.set() + assert _poll_until_stopped(client, timeout=2.0) is True + stopped_health_status, stopped_health_payload = _http_call_json(client, "/health") + assert stopped_health_status == 503 + assert stopped_health_payload["state"] == "stopped" + finally: + client.close() + runtime.stop() + + +def test_actions_stop_wait_false_returns_before_worker_finishes() -> None: + started = Event() + release = Event() + runtime, client = _build_runtime(ScenarioWorker("async-stop-worker", BlockingRoutine(started, release))) + assert started.wait(timeout=1.0) is True + + try: + status, payload = _http_call_json(client, "/actions/stop?wait=false", method="POST") + assert status == 200 + assert isinstance(payload.get("detail"), dict) + assert payload["detail"]["timed_out"] is False + assert payload["detail"]["state"] == "stopping" + + health_status, health_payload = _http_call_json(client, "/health") + assert health_status == 503 + assert health_payload["status"] == "degraded" + assert health_payload["state"] == "stopping" + + release.set() + assert _poll_until_stopped(client, timeout=2.0) is True + finally: + client.close() + runtime.stop() + + +def test_actions_stop_honors_timeout_query_parameter() -> None: + started = Event() + release = Event() + runtime, client = _build_runtime(ScenarioWorker("timeout-stop-worker", BlockingRoutine(started, release))) + assert started.wait(timeout=1.0) is True + + try: + status, payload = _http_call_json(client, "/actions/stop?timeout=0.1", method="POST") + assert status == 200 + assert isinstance(payload.get("detail"), dict) + assert payload["detail"]["timed_out"] is True + assert payload["detail"]["state"] == "stopping" + + release.set() + assert _poll_until_stopped(client, timeout=2.0) is True + finally: + client.close() + runtime.stop() + + +def test_actions_stop_honors_force_query_parameter() -> None: + started = Event() + release = Event() + worker = ForceAwareWorker("force-stop-worker", started, release) + runtime, client = _build_runtime(worker) + assert started.wait(timeout=1.0) is True + + try: + status, payload = _http_call_json(client, "/actions/stop?force=true&timeout=0.5", method="POST") + assert status == 200 + assert isinstance(payload.get("detail"), dict) + assert payload["detail"]["timed_out"] is False + assert payload["detail"]["state"] == "stopped" + assert worker.stop_calls == [True] + finally: + release.set() + client.close() + runtime.stop() + + +def test_actions_log_client_source_for_start_and_stop(caplog) -> None: + runtime, client = _build_runtime(ScenarioWorker("log-worker", IntervalRoutine(), interval=0.2)) + caplog.set_level("WARNING", logger="app_runtime.control.http_app") + try: + _http_call_json(client, "/actions/stop", method="POST", headers={"X-Client-Source": "web-ui"}) + _http_call_json(client, "/actions/start", method="POST", headers={"X-Client-Source": "web-ui"}) + finally: + client.close() + runtime.stop() + + messages = [record.getMessage() for record in caplog.records] + assert any("/actions/stop client=web-ui" in message for message in messages) + assert any("/actions/start client=web-ui" in message for message in messages) diff --git a/tests/test_runtime.py b/tests/test_runtime.py index 49db511..3fdac4d 100644 --- a/tests/test_runtime.py +++ b/tests/test_runtime.py @@ -5,6 +5,8 @@ from dataclasses import dataclass, field from threading import Event, Lock, Thread from time import sleep +from fastapi.testclient import TestClient + from app_runtime.contracts.application import ApplicationModule from app_runtime.contracts.health import HealthContributor from app_runtime.contracts.worker import Worker, WorkerHealth, WorkerStatus @@ -176,6 +178,28 @@ class BlockingModule(ApplicationModule): registry.add_worker(RoutineWorker("blocking-worker", self.routine)) +class WorkerModuleAdapter(ApplicationModule): + def __init__(self, worker: Worker) -> None: + self._worker = worker + + @property + def name(self) -> str: + return "worker-adapter" + + def register(self, registry: ModuleRegistry) -> None: + registry.add_worker(self._worker) + + +class ForceRecordingWorker(RoutineWorker): + def __init__(self) -> None: + super().__init__("force-recorder", CollectingRoutine(), interval=0.01) + self.stop_flags: list[bool] = [] + + def stop(self, force: bool = False) -> None: + self.stop_flags.append(force) + super().stop(force=force) + + class RecordingTransport(NoOpTraceTransport): def __init__(self) -> None: self.contexts: list[object] = [] @@ -188,6 +212,21 @@ class RecordingTransport(NoOpTraceTransport): self.messages.append(record) +def _build_control_client(runtime: RuntimeManager, *, control_timeout: int = 1) -> TestClient: + from app_runtime.control.base import ControlActionSet + from app_runtime.control.http_channel import HttpControlChannel + + channel = HttpControlChannel("127.0.0.1", 0, control_timeout) + channel._actions = ControlActionSet( + health=runtime.health_status, + start=runtime.start_runtime, + stop=runtime.stop_runtime, + status=runtime.runtime_status, + ) + app = channel._factory.create(channel._health_response, channel._action_response) + return TestClient(app) + + def test_runtime_runs_worker_routine_and_exposes_status(tmp_path) -> None: config_path = tmp_path / "config.yml" config_path.write_text( @@ -302,15 +341,15 @@ def test_http_control_channel_exposes_health_and_actions() -> None: async def health(): return {"status": "ok" if state["started"] else "unhealthy", "state": "idle" if state["started"] else "stopped"} - async def start_handler() -> str: + async def start_handler(_request) -> str: state["started"] = True return "started" - async def stop_handler() -> str: + async def stop_handler(_request) -> str: state["started"] = False return "stopped" - async def status_handler() -> str: + async def status_handler(_request) -> str: return "idle" if state["started"] else "stopped" async def scenario() -> None: @@ -339,6 +378,74 @@ def test_http_control_channel_exposes_health_and_actions() -> None: asyncio.run(scenario()) +def test_http_control_stop_wait_false_returns_immediately() -> None: + started = Event() + release = Event() + runtime = RuntimeManager() + runtime.register_module(BlockingModule(started, release)) + runtime.start(start_control_plane=False) + assert started.wait(timeout=1.0) is True + client = _build_control_client(runtime) + + try: + response = client.post("/actions/stop?wait=false") + payload = response.json() + assert response.status_code == 200 + assert payload["detail"]["timed_out"] is False + assert payload["detail"]["state"] == "stopping" + + health_response = client.get("/health") + health_payload = health_response.json() + assert health_response.status_code == 503 + assert health_payload["state"] == "stopping" + assert health_payload["status"] == "degraded" + finally: + release.set() + client.close() + runtime.stop() + + +def test_http_control_stop_timeout_query_changes_wait_window() -> None: + started = Event() + release = Event() + runtime = RuntimeManager() + runtime.ACTION_TIMEOUT_SECONDS = 5.0 + runtime.register_module(BlockingModule(started, release)) + runtime.start(start_control_plane=False) + assert started.wait(timeout=1.0) is True + client = _build_control_client(runtime) + + try: + response = client.post("/actions/stop?timeout=0.1") + payload = response.json() + assert response.status_code == 200 + assert payload["detail"]["timed_out"] is True + assert payload["detail"]["state"] == "stopping" + finally: + release.set() + client.close() + runtime.stop() + + +def test_http_control_stop_force_query_propagates_to_worker() -> None: + runtime = RuntimeManager() + worker = ForceRecordingWorker() + runtime.register_module(WorkerModuleAdapter(worker)) + runtime.start(start_control_plane=False) + client = _build_control_client(runtime) + + try: + response = client.post("/actions/stop?force=true") + payload = response.json() + assert response.status_code == 200 + assert payload["detail"]["timed_out"] is False + assert payload["detail"]["state"] == "stopped" + assert worker.stop_flags == [True] + finally: + client.close() + runtime.stop() + + def test_public_plba_package_exports_runtime_builder_and_worker_contract(tmp_path) -> None: import plba from plba import ApplicationModule as PublicApplicationModule diff --git a/tests/test_workflow_repository.py b/tests/test_workflow_repository.py new file mode 100644 index 0000000..2385608 --- /dev/null +++ b/tests/test_workflow_repository.py @@ -0,0 +1,41 @@ +from app_runtime.workflow.persistence.workflow_repository import WorkflowRepository + + +class StubConnectionFactory: + @staticmethod + def dumps(snapshot) -> str: + return str(snapshot) + + +def test_build_run_payload_prefers_active_task_trace_id() -> None: + repository = WorkflowRepository(StubConnectionFactory()) + snapshot = { + "payload": {"inbox_message": {"external_message_id": "msg-1", "id": 7}}, + "state": { + "runtime": { + "trace_id": "task-trace-1", + "email_trace_id": "message-trace-1", + "queue_task_id": 13, + } + }, + } + + payload = repository._build_run_payload("estimate", snapshot) + + assert payload[0] == "estimate" + assert payload[2] == "msg-1" + assert payload[3] == 13 + assert payload[4] == 7 + assert payload[8] == "task-trace-1" + + +def test_build_run_payload_falls_back_to_legacy_email_trace_id() -> None: + repository = WorkflowRepository(StubConnectionFactory()) + snapshot = { + "payload": {"inbox_message": {"external_message_id": "msg-2", "id": 8}}, + "state": {"runtime": {"email_trace_id": "message-trace-2"}}, + } + + payload = repository._build_run_payload("estimate", snapshot) + + assert payload[8] == "message-trace-2"