From e6509ee0cd23214409a6e8280596a09317f48280 Mon Sep 17 00:00:00 2001 From: zosimovaa Date: Thu, 14 May 2026 21:08:18 +0300 Subject: [PATCH] =?UTF-8?q?=D0=95=D0=B4=D0=B8=D0=BD=D1=8B=D0=B9=20http=20?= =?UTF-8?q?=D1=81=D0=B5=D1=80=D0=B2=D0=B8=D1=81=20=D0=BD=D0=B0=20=D0=BE?= =?UTF-8?q?=D0=B4=D0=BD=D0=BE=D0=BC=20=D0=BF=D0=BE=D1=80=D1=82=D1=83?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- README.md | 19 +++++- pyproject.toml | 2 +- src/app_runtime/control/action_responder.py | 46 ++++++++++++++ src/app_runtime/control/http_channel.py | 39 ++++-------- src/app_runtime/http/__init__.py | 2 + src/app_runtime/http/unified_service.py | 68 +++++++++++++++++++++ src/plba/__init__.py | 2 + src/plba/http.py | 2 + tests/test_application_http.py | 25 ++++++++ 9 files changed, 174 insertions(+), 31 deletions(-) create mode 100644 src/app_runtime/control/action_responder.py create mode 100644 src/app_runtime/http/unified_service.py diff --git a/README.md b/README.md index a46af04..4dacd66 100644 --- a/README.md +++ b/README.md @@ -319,7 +319,7 @@ with traces.open_context(alias="email:123", kind="email") as message_trace_id: - Consumer-воркер извлекает через `get(timeout)` и обрабатывает. ## 5. Application HTTP -`PLBA` поддерживает отдельный прикладной HTTP-слой для пользовательских страниц и API бизнес-приложения. Он не смешивается с `control plane` и поднимается отдельным сервисом внутри того же runtime. +`PLBA` поддерживает прикладной HTTP-слой для пользовательских страниц и API бизнес-приложения. По умолчанию он отделён от `control plane`, но при необходимости можно опубликовать control routes и business routes через один HTTP channel. `Control Plane` и `Application HTTP` обслуживают разные контуры: - `Control Plane` используется для `/health`, `/actions/*`, `/traces/*`. @@ -330,6 +330,7 @@ with traces.open_context(alias="email:123", kind="email") as message_trace_id: - `HttpApplicationChannel` поднимает отдельный `FastAPI` app через `uvicorn`. - `HttpRouteRegistrar` регистрирует пользовательские routes и получает `ServiceContainer`. - `HttpApplicationAppFactory` собирает `FastAPI(title="PLBA Application API")`, middleware и routes. +- `UnifiedHttpService` собирает один `FastAPI` app с control routes (`/health`, `/actions/*`, `/traces/*`) и application routes. ### Минимальный пример ```python @@ -370,6 +371,20 @@ runtime.start() - `GET /demo` - `POST /demo/api/tasks` +### Единый HTTP API +Если бизнес-приложению нужен один опубликованный порт для control и application routes, передайте `UnifiedHttpService` как `application_http`: + +```python +from plba import HttpApplicationChannel, RuntimeManager, UnifiedHttpService + +runtime = RuntimeManager(application_http=UnifiedHttpService()) +runtime.application_http.register_channel( + HttpApplicationChannel(host="0.0.0.0", port=15000, timeout=5) +) +``` + +Старый режим с отдельными `ControlPlaneService` и `ApplicationHttpService` остаётся доступен и работает как раньше. + ### Типовые сценарии - `Web UI для фоновых задач`: список задач, запуск, статус, cancel, download result. - `Внутренний REST API`: например `POST /jobs`, `GET /jobs/{id}`, `POST /jobs/{id}/cancel`. @@ -377,7 +392,7 @@ runtime.start() - `Webhook endpoint`: прием callback от внешней системы и передача события в worker pipeline. ### Ограничения и рекомендации -- Не смешивайте business routes с `control plane`. +- По умолчанию держите business routes и `control plane` раздельно; используйте `UnifiedHttpService`, когда один опубликованный порт является явным эксплуатационным требованием. - Держите бизнес-логику в сервисах, а handlers используйте как тонкий HTTP-адаптер. - Runtime предоставляет доступ к зависимостям через `services`, поэтому handlers не должны зависеть от `RuntimeManager`. - В первой версии `Application HTTP` не решает auth, static files, шаблонизаторы, websocket и OpenAPI customization. diff --git a/pyproject.toml b/pyproject.toml index af0457d..1e8fa9e 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -4,7 +4,7 @@ build-backend = "setuptools.build_meta" [project] name = "plba" -version = "0.3.20" +version = "0.4.0" description = "Platform runtime for business applications" readme = "README.md" requires-python = ">=3.11" diff --git a/src/app_runtime/control/action_responder.py b/src/app_runtime/control/action_responder.py new file mode 100644 index 0000000..deeb94f --- /dev/null +++ b/src/app_runtime/control/action_responder.py @@ -0,0 +1,46 @@ +from __future__ import annotations + +import asyncio + +from fastapi.responses import JSONResponse + +from app_runtime.control.base import ControlActionRequest, ControlActionSet + + +class ControlActionResponder: + def __init__(self, actions: ControlActionSet, timeout: int) -> None: + self._actions = actions + self._timeout = timeout + + async def respond( + self, + action: str, + _client_source: str = "unknown", + request: ControlActionRequest | None = None, + ) -> JSONResponse: + callbacks = { + "start": self._actions.start, + "stop": self._actions.stop, + "status": self._actions.status, + } + callback = callbacks.get(action) + if callback is None: + return JSONResponse(content={"status": "error", "detail": f"unsupported action: {action}"}, status_code=404) + action_request = request or ControlActionRequest() + action_timeout = self._action_timeout(action, action_request) + try: + detail = await asyncio.wait_for(callback(action_request), timeout=action_timeout) + except asyncio.TimeoutError: + return JSONResponse( + content={"status": "accepted", "detail": f"{action} operation is still in progress"}, + status_code=202, + ) + except Exception as exc: + return JSONResponse(content={"status": "error", "detail": str(exc)}, status_code=500) + return JSONResponse(content={"status": "ok", "detail": detail or f"{action} action accepted"}, status_code=200) + + def _action_timeout(self, action: str, request: ControlActionRequest) -> float: + base_timeout = max(float(self._timeout), 10.0) if action in {"start", "stop"} else float(self._timeout) + if action != "stop" or request.wait is False or request.timeout is None: + return base_timeout + return max(base_timeout, float(request.timeout) + 1.0) diff --git a/src/app_runtime/control/http_channel.py b/src/app_runtime/control/http_channel.py index b1b1166..af6013c 100644 --- a/src/app_runtime/control/http_channel.py +++ b/src/app_runtime/control/http_channel.py @@ -4,6 +4,7 @@ import asyncio from fastapi.responses import JSONResponse +from app_runtime.control.action_responder import ControlActionResponder from app_runtime.control.base import ControlActionRequest, ControlActionSet, ControlChannel, TraceQueryRequest from app_runtime.contracts.trace import TraceLogView from app_runtime.control.http_app import HttpControlAppFactory @@ -16,9 +17,11 @@ class HttpControlChannel(ControlChannel): self._runner = UvicornThreadRunner(host, port, timeout) self._factory = HttpControlAppFactory() self._actions: ControlActionSet | None = None + self._action_responder: ControlActionResponder | None = None async def start(self, actions: ControlActionSet) -> None: self._actions = actions + self._action_responder = ControlActionResponder(actions, self._timeout) app = self._factory.create(self._health_response, self._action_response, self._trace_response) await self._runner.start(app) @@ -40,34 +43,14 @@ class HttpControlChannel(ControlChannel): _client_source: str = "unknown", request: ControlActionRequest | None = None, ) -> JSONResponse: - if self._actions is None: - return JSONResponse(content={"status": "error", "detail": f"{action} handler is not configured"}, status_code=404) - callbacks = { - "start": self._actions.start, - "stop": self._actions.stop, - "status": self._actions.status, - } - callback = callbacks.get(action) - if callback is None: - return JSONResponse(content={"status": "error", "detail": f"unsupported action: {action}"}, status_code=404) - action_request = request or ControlActionRequest() - action_timeout = self._action_timeout(action, action_request) - try: - detail = await asyncio.wait_for(callback(action_request), timeout=action_timeout) - except asyncio.TimeoutError: - return JSONResponse( - content={"status": "accepted", "detail": f"{action} operation is still in progress"}, - status_code=202, - ) - except Exception as exc: - return JSONResponse(content={"status": "error", "detail": str(exc)}, status_code=500) - return JSONResponse(content={"status": "ok", "detail": detail or f"{action} action accepted"}, status_code=200) - - def _action_timeout(self, action: str, request: ControlActionRequest) -> float: - base_timeout = max(float(self._timeout), 10.0) if action in {"start", "stop"} else float(self._timeout) - if action != "stop" or request.wait is False or request.timeout is None: - return base_timeout - return max(base_timeout, float(request.timeout) + 1.0) + if self._action_responder is None: + if self._actions is None: + return JSONResponse( + content={"status": "error", "detail": f"{action} handler is not configured"}, + status_code=404, + ) + self._action_responder = ControlActionResponder(self._actions, self._timeout) + return await self._action_responder.respond(action, _client_source, request) async def _trace_response(self, trace_id: str, request: TraceQueryRequest) -> TraceLogView: if self._actions is None or self._actions.trace_lookup is None: diff --git a/src/app_runtime/http/__init__.py b/src/app_runtime/http/__init__.py index bf9b28b..53a2cbe 100644 --- a/src/app_runtime/http/__init__.py +++ b/src/app_runtime/http/__init__.py @@ -2,6 +2,7 @@ from app_runtime.http.base import ApplicationHttpChannel, HttpRouteRegistrar from app_runtime.http.http_app import HttpApplicationAppFactory from app_runtime.http.http_channel import HttpApplicationChannel from app_runtime.http.service import ApplicationHttpService +from app_runtime.http.unified_service import UnifiedHttpService __all__ = [ "ApplicationHttpChannel", @@ -9,4 +10,5 @@ __all__ = [ "HttpApplicationAppFactory", "HttpApplicationChannel", "HttpRouteRegistrar", + "UnifiedHttpService", ] diff --git a/src/app_runtime/http/unified_service.py b/src/app_runtime/http/unified_service.py new file mode 100644 index 0000000..d1335e4 --- /dev/null +++ b/src/app_runtime/http/unified_service.py @@ -0,0 +1,68 @@ +from __future__ import annotations + +import asyncio +from typing import TYPE_CHECKING + +from app_runtime.control.action_responder import ControlActionResponder +from app_runtime.control.base import ControlActionSet +from app_runtime.control.http_app import HttpControlAppFactory +from app_runtime.http.base import ApplicationHttpChannel, HttpRouteRegistrar +from app_runtime.http.http_app import HttpApplicationAppFactory + +if TYPE_CHECKING: + from app_runtime.core.runtime import RuntimeManager + + +class UnifiedHttpService: + """Publish control and application routes through the same HTTP channel.""" + + def __init__( + self, + app_factory: HttpApplicationAppFactory | None = None, + control_app_factory: HttpControlAppFactory | None = None, + control_timeout: int = 5, + ) -> None: + self._channels: list[ApplicationHttpChannel] = [] + self._registrars: list[HttpRouteRegistrar] = [] + self._app_factory = app_factory or HttpApplicationAppFactory() + self._control_app_factory = control_app_factory or HttpControlAppFactory() + self._control_timeout = control_timeout + + def register_channel(self, channel: ApplicationHttpChannel) -> None: + self._channels.append(channel) + + def register_routes(self, registrar: HttpRouteRegistrar) -> None: + self._registrars.append(registrar) + + def start(self, runtime: RuntimeManager) -> None: + if not self._channels: + return + asyncio.run(self._start_async(runtime)) + + def stop(self) -> None: + if not self._channels: + return + asyncio.run(self._stop_async()) + + async def _start_async(self, runtime: RuntimeManager) -> None: + app = self._app_factory.create(self._registrars, runtime.services) + actions = ControlActionSet( + health=runtime.health_status, + start=runtime.start_runtime, + stop=runtime.stop_runtime, + status=runtime.runtime_status, + trace_lookup=runtime.trace_logs, + ) + action_responder = ControlActionResponder(actions, self._control_timeout) + control_app = self._control_app_factory.create( + actions.health, + action_responder.respond, + actions.trace_lookup, + ) + app.router.routes.extend(control_app.router.routes) + for channel in self._channels: + await channel.start(app) + + async def _stop_async(self) -> None: + for channel in reversed(self._channels): + await channel.stop() diff --git a/src/plba/__init__.py b/src/plba/__init__.py index 2690d00..ee42c79 100644 --- a/src/plba/__init__.py +++ b/src/plba/__init__.py @@ -21,6 +21,7 @@ from plba.http import ( HttpApplicationAppFactory, HttpApplicationChannel, HttpRouteRegistrar, + UnifiedHttpService, ) from plba.logging import LogManager from plba.queue import InMemoryTaskQueue @@ -56,6 +57,7 @@ __all__ = [ "HttpApplicationChannel", "HttpRouteRegistrar", "HttpControlChannel", + "UnifiedHttpService", "InMemoryTaskQueue", "LogManager", "MySqlTraceTransport", diff --git a/src/plba/http.py b/src/plba/http.py index bf9b28b..53a2cbe 100644 --- a/src/plba/http.py +++ b/src/plba/http.py @@ -2,6 +2,7 @@ from app_runtime.http.base import ApplicationHttpChannel, HttpRouteRegistrar from app_runtime.http.http_app import HttpApplicationAppFactory from app_runtime.http.http_channel import HttpApplicationChannel from app_runtime.http.service import ApplicationHttpService +from app_runtime.http.unified_service import UnifiedHttpService __all__ = [ "ApplicationHttpChannel", @@ -9,4 +10,5 @@ __all__ = [ "HttpApplicationAppFactory", "HttpApplicationChannel", "HttpRouteRegistrar", + "UnifiedHttpService", ] diff --git a/tests/test_application_http.py b/tests/test_application_http.py index 87869b4..2f9e32d 100644 --- a/tests/test_application_http.py +++ b/tests/test_application_http.py @@ -15,6 +15,7 @@ from app_runtime.core.registration import ModuleRegistry from app_runtime.core.runtime import RuntimeManager from app_runtime.http.base import ApplicationHttpChannel from app_runtime.http.http_channel import HttpApplicationChannel +from app_runtime.http.unified_service import UnifiedHttpService try: import python_multipart # noqa: F401 @@ -207,3 +208,27 @@ def test_control_plane_and_application_http_work_independently() -> None: finally: runtime.control_plane.stop() runtime.stop(stop_control_plane=False) + + +def test_unified_http_serves_control_and_application_routes_on_one_app() -> None: + runtime = RuntimeManager(application_http=UnifiedHttpService()) + channel = RecordingChannel() + runtime.application_http.register_channel(channel) + runtime.register_module(HttpModule(PingRoutes())) + runtime.start(start_control_plane=False) + + try: + client = _application_client(channel) + with client: + health_response = client.get("/health") + action_response = client.get("/actions/status") + app_response = client.get("/estimate/ping") + + assert health_response.status_code == 200 + assert health_response.json()["status"] == "ok" + assert action_response.status_code == 200 + assert action_response.json() == {"status": "ok", "detail": "idle"} + assert app_response.status_code == 200 + assert app_response.json() == {"status": "ok"} + finally: + runtime.stop(stop_control_plane=False)