Добавлена возможность регистрировать прикладные апи

This commit is contained in:
2026-04-30 13:47:23 +03:00
parent 9eb7282437
commit 90422a0c2a
14 changed files with 474 additions and 5 deletions
+3 -2
View File
@@ -8,10 +8,11 @@ from uvicorn import Config, Server
class UvicornThreadRunner:
def __init__(self, host: str, port: int, timeout: int) -> None:
def __init__(self, host: str, port: int, timeout: int, *, thread_name: str = "plba-http-control") -> None:
self._host = host
self._port = port
self._timeout = timeout
self._thread_name = thread_name
self._server: Server | None = None
self._thread: Thread | None = None
self._error: BaseException | None = None
@@ -22,7 +23,7 @@ class UvicornThreadRunner:
self._error = None
config = Config(app=app, host=self._host, port=self._port, log_level="warning")
self._server = Server(config)
self._thread = Thread(target=self._serve, name="plba-http-control", daemon=True)
self._thread = Thread(target=self._serve, name=self._thread_name, daemon=True)
self._thread.start()
await self._wait_until_started()
+5
View File
@@ -3,6 +3,7 @@ from __future__ import annotations
from app_runtime.contracts.health import HealthContributor
from app_runtime.contracts.worker import Worker
from app_runtime.core.service_container import ServiceContainer
from app_runtime.http.base import HttpRouteRegistrar
class ModuleRegistry:
@@ -10,6 +11,7 @@ class ModuleRegistry:
self.services = services
self.workers: list[Worker] = []
self.health_contributors: list[HealthContributor] = []
self.http_route_registrars: list[HttpRouteRegistrar] = []
self.modules: list[str] = []
def register_module(self, name: str) -> None:
@@ -20,3 +22,6 @@ class ModuleRegistry:
def add_health_contributor(self, contributor: HealthContributor) -> None:
self.health_contributors.append(contributor)
def add_http_routes(self, registrar: HttpRouteRegistrar) -> None:
self.http_route_registrars.append(registrar)
+13
View File
@@ -13,6 +13,7 @@ from app_runtime.core.registration import ModuleRegistry
from app_runtime.core.service_container import ServiceContainer
from app_runtime.core.types import HealthPayload, LifecycleState
from app_runtime.health.registry import HealthRegistry
from app_runtime.http.service import ApplicationHttpService
from app_runtime.logging.manager import LogManager
from app_runtime.tracing.reader import build_trace_log_reader
from app_runtime.tracing.service import TraceService
@@ -32,6 +33,7 @@ class RuntimeManager:
logs: LogManager | None = None,
workers: WorkerSupervisor | None = None,
control_plane: ControlPlaneService | None = None,
application_http: ApplicationHttpService | None = None,
) -> None:
self.configuration = configuration or ConfigurationManager()
self.services = services or ServiceContainer()
@@ -40,6 +42,7 @@ class RuntimeManager:
self.logs = logs or LogManager()
self.workers = workers or WorkerSupervisor()
self.control_plane = control_plane or ControlPlaneService()
self.application_http = application_http or ApplicationHttpService()
self.registry = ModuleRegistry(self.services)
self._started = False
self._state = LifecycleState.IDLE
@@ -67,6 +70,8 @@ class RuntimeManager:
self.workers.start()
if start_control_plane:
self.control_plane.start(self)
self._register_application_http_routes()
self.application_http.start(self)
self._started = True
self._refresh_state()
@@ -75,6 +80,7 @@ class RuntimeManager:
return
self._state = LifecycleState.STOPPING
self.workers.stop(timeout=timeout, force=force)
self.application_http.stop()
if stop_control_plane:
self.control_plane.stop()
self._started = False
@@ -120,6 +126,7 @@ class RuntimeManager:
except TimeoutError:
return self._action_detail("runtime stop is still in progress", timed_out=True)
self.application_http.stop()
self._refresh_state()
if self._state == LifecycleState.STOPPED:
self._started = False
@@ -148,6 +155,7 @@ class RuntimeManager:
self.services.register("logs", self.logs)
self.services.register("workers", self.workers)
self.services.register("control_plane", self.control_plane)
self.services.register("application_http", self.application_http)
self._core_registered = True
def _register_health_contributors(self) -> None:
@@ -161,6 +169,11 @@ class RuntimeManager:
self.workers.register(worker)
self._workers_registered = True
def _register_application_http_routes(self) -> None:
for registrar in self.registry.http_route_registrars:
self.application_http.register_routes(registrar)
self.registry.http_route_registrars.clear()
def _refresh_state(self) -> None:
lifecycle = self.workers.lifecycle_state()
+12
View File
@@ -0,0 +1,12 @@
from app_runtime.http.base import ApplicationHttpChannel, HttpRouteRegistrar
from app_runtime.http.http_app import HttpApplicationAppFactory
from app_runtime.http.http_channel import HttpApplicationChannel
from app_runtime.http.service import ApplicationHttpService
__all__ = [
"ApplicationHttpChannel",
"ApplicationHttpService",
"HttpApplicationAppFactory",
"HttpApplicationChannel",
"HttpRouteRegistrar",
]
+23
View File
@@ -0,0 +1,23 @@
from __future__ import annotations
from abc import ABC, abstractmethod
from typing import Protocol
from fastapi import FastAPI
from app_runtime.core.service_container import ServiceContainer
class HttpRouteRegistrar(Protocol):
def register(self, app: FastAPI, services: ServiceContainer) -> None:
"""Register application routes on the provided FastAPI app."""
class ApplicationHttpChannel(ABC):
@abstractmethod
async def start(self, app: FastAPI) -> None:
"""Start the HTTP channel with the prepared application app."""
@abstractmethod
async def stop(self) -> None:
"""Stop the HTTP channel and release resources."""
+37
View File
@@ -0,0 +1,37 @@
from __future__ import annotations
import logging
import time
from collections.abc import Iterable
from fastapi import FastAPI, Request
from app_runtime.core.service_container import ServiceContainer
from app_runtime.http.base import HttpRouteRegistrar
LOGGER = logging.getLogger(__name__)
class HttpApplicationAppFactory:
def create(self, registrars: Iterable[HttpRouteRegistrar], services: ServiceContainer) -> FastAPI:
app = FastAPI(title="PLBA Application API")
self._register_middleware(app)
for registrar in registrars:
registrar.register(app, services)
return app
def _register_middleware(self, app: FastAPI) -> None:
@app.middleware("http")
async def track_request(request: Request, call_next): # type: ignore[no-untyped-def]
started = time.monotonic()
response = await call_next(request)
duration_ms = int((time.monotonic() - started) * 1000)
response.headers["X-Response-Time-Ms"] = str(duration_ms)
LOGGER.info(
"Application HTTP request handled: method=%s path=%s status=%s duration_ms=%s",
request.method,
request.url.path,
response.status_code,
duration_ms,
)
return response
+21
View File
@@ -0,0 +1,21 @@
from __future__ import annotations
from fastapi import FastAPI
from app_runtime.control.http_runner import UvicornThreadRunner
from app_runtime.http.base import ApplicationHttpChannel
class HttpApplicationChannel(ApplicationHttpChannel):
def __init__(self, host: str, port: int, timeout: int) -> None:
self._runner = UvicornThreadRunner(host, port, timeout, thread_name="plba-http-application")
async def start(self, app: FastAPI) -> None:
await self._runner.start(app)
async def stop(self) -> None:
await self._runner.stop()
@property
def port(self) -> int:
return self._runner.port
+42
View File
@@ -0,0 +1,42 @@
from __future__ import annotations
import asyncio
from typing import TYPE_CHECKING
from app_runtime.http.base import ApplicationHttpChannel, HttpRouteRegistrar
from app_runtime.http.http_app import HttpApplicationAppFactory
if TYPE_CHECKING:
from app_runtime.core.runtime import RuntimeManager
class ApplicationHttpService:
def __init__(self, app_factory: HttpApplicationAppFactory | None = None) -> None:
self._channels: list[ApplicationHttpChannel] = []
self._registrars: list[HttpRouteRegistrar] = []
self._app_factory = app_factory or HttpApplicationAppFactory()
def register_channel(self, channel: ApplicationHttpChannel) -> None:
self._channels.append(channel)
def register_routes(self, registrar: HttpRouteRegistrar) -> None:
self._registrars.append(registrar)
def start(self, runtime: RuntimeManager) -> None:
if not self._channels:
return
asyncio.run(self._start_async(runtime))
def stop(self) -> None:
if not self._channels:
return
asyncio.run(self._stop_async())
async def _start_async(self, runtime: RuntimeManager) -> None:
app = self._app_factory.create(self._registrars, runtime.services)
for channel in self._channels:
await channel.start(app)
async def _stop_async(self) -> None:
for channel in reversed(self._channels):
await channel.stop()
+12
View File
@@ -15,6 +15,13 @@ from plba.contracts import (
)
from plba.core import ConfigurationManager, RuntimeManager, ServiceContainer
from plba.health import HealthRegistry
from plba.http import (
ApplicationHttpChannel,
ApplicationHttpService,
HttpApplicationAppFactory,
HttpApplicationChannel,
HttpRouteRegistrar,
)
from plba.logging import LogManager
from plba.queue import InMemoryTaskQueue
from plba.tracing import MySqlTraceTransport, NoOpTraceTransport, TraceService
@@ -43,6 +50,11 @@ __all__ = [
"FileConfigProvider",
"HealthContributor",
"HealthRegistry",
"ApplicationHttpChannel",
"ApplicationHttpService",
"HttpApplicationAppFactory",
"HttpApplicationChannel",
"HttpRouteRegistrar",
"HttpControlChannel",
"InMemoryTaskQueue",
"LogManager",
+12
View File
@@ -3,6 +3,7 @@ from __future__ import annotations
from app_runtime.control.http_channel import HttpControlChannel
from app_runtime.core.runtime import RuntimeManager
from app_runtime.contracts.application import ApplicationModule
from app_runtime.http.http_channel import HttpApplicationChannel
def create_runtime(
@@ -13,6 +14,9 @@ def create_runtime(
control_host: str = "127.0.0.1",
control_port: int = 8080,
control_timeout: int = 5,
application_host: str | None = None,
application_port: int = 15000,
application_timeout: int = 5,
) -> RuntimeManager:
runtime = RuntimeManager()
if config_path is not None:
@@ -25,5 +29,13 @@ def create_runtime(
timeout=control_timeout,
)
)
if application_host is not None:
runtime.application_http.register_channel(
HttpApplicationChannel(
host=application_host,
port=application_port,
timeout=application_timeout,
)
)
runtime.register_module(module)
return runtime
+12
View File
@@ -0,0 +1,12 @@
from app_runtime.http.base import ApplicationHttpChannel, HttpRouteRegistrar
from app_runtime.http.http_app import HttpApplicationAppFactory
from app_runtime.http.http_channel import HttpApplicationChannel
from app_runtime.http.service import ApplicationHttpService
__all__ = [
"ApplicationHttpChannel",
"ApplicationHttpService",
"HttpApplicationAppFactory",
"HttpApplicationChannel",
"HttpRouteRegistrar",
]