Files
plba/src/app_runtime/core/runtime.py
T

211 lines
8.6 KiB
Python

from __future__ import annotations

import asyncio
from time import monotonic, sleep

from app_runtime.config.providers import FileConfigProvider
from app_runtime.contracts.application import ApplicationModule
from app_runtime.contracts.trace import TraceLogView
from app_runtime.control.base import ControlActionRequest
from app_runtime.control.base import TraceQueryRequest
from app_runtime.control.service import ControlPlaneService
from app_runtime.core.configuration import ConfigurationManager
from app_runtime.core.registration import ModuleRegistry
from app_runtime.core.service_container import ServiceContainer
from app_runtime.core.types import HealthPayload, LifecycleState
from app_runtime.health.registry import HealthRegistry
from app_runtime.http.service import ApplicationHttpService
from app_runtime.logging.manager import LogManager
from app_runtime.tracing.reader import build_trace_log_reader
from app_runtime.tracing.service import TraceService
from app_runtime.workers.supervisor import WorkerSupervisor
class RuntimeManager:
    """Own the application runtime lifecycle and its core services.

    Wires together configuration, tracing, health, logging, worker
    supervision, the control plane and the application HTTP service. Exposes a
    synchronous lifecycle API (``start`` / ``stop`` / ``status`` /
    ``current_health``) plus async handlers used as control-plane actions
    (``start_runtime`` / ``stop_runtime`` / ``runtime_status`` / ``trace_logs``).
    """

    # Upper bound, in seconds, for control actions that wait on a state change.
    ACTION_TIMEOUT_SECONDS = 10.0
    # Delay, in seconds, between lifecycle-state polls while waiting.
    ACTION_POLL_INTERVAL_SECONDS = 0.05

    def __init__(
        self,
        configuration: ConfigurationManager | None = None,
        services: ServiceContainer | None = None,
        traces: TraceService | None = None,
        health: HealthRegistry | None = None,
        logs: LogManager | None = None,
        workers: WorkerSupervisor | None = None,
        control_plane: ControlPlaneService | None = None,
        application_http: ApplicationHttpService | None = None,
    ) -> None:
        """Create the runtime, defaulting any collaborator that is not supplied.

        Each argument is optional; when omitted (or falsy) a fresh default
        instance is constructed. The core services are registered in the
        service container immediately.
        """
        self.configuration = configuration or ConfigurationManager()
        self.services = services or ServiceContainer()
        self.traces = traces or TraceService()
        self.health = health or HealthRegistry()
        self.logs = logs or LogManager()
        self.workers = workers or WorkerSupervisor()
        self.control_plane = control_plane or ControlPlaneService()
        self.application_http = application_http or ApplicationHttpService()
        self.registry = ModuleRegistry(self.services)
        self._started = False
        self._state = LifecycleState.IDLE
        self._core_registered = False
        self._workers_registered = False
        # Health contributors are registered once (like workers) so that a
        # stop/start cycle does not register every contributor a second time.
        self._health_registered = False
        self._register_core_services()

    def register_module(self, module: ApplicationModule) -> None:
        """Record ``module`` by name and let it register its components."""
        self.registry.register_module(module.name)
        module.register(self.registry)

    def add_config_file(self, path: str) -> FileConfigProvider:
        """Add a file-backed configuration provider for ``path`` and return it."""
        provider = FileConfigProvider(path)
        self.configuration.add_provider(provider)
        return provider

    def start(self, *, start_control_plane: bool = True) -> None:
        """Load configuration and start workers and the HTTP services.

        Does nothing if the runtime is already started.

        Args:
            start_control_plane: Also start the control plane. ``start_runtime``
                passes ``False`` (presumably because the control plane that
                handles the action is already running).
        """
        if self._started:
            return
        self._state = LifecycleState.STARTING
        config = self.configuration.load()
        self.logs.apply_config(config)
        self._register_health_contributors()
        self._register_workers()
        self.workers.start()
        if start_control_plane:
            self.control_plane.start(self)
        self._register_application_http_routes()
        self.application_http.start(self)
        self._started = True
        self._refresh_state()

    def stop(self, timeout: float = 30.0, force: bool = False, stop_control_plane: bool = True) -> None:
        """Stop workers, the application HTTP service and optionally the control plane.

        Does nothing if the runtime is not started. ``timeout`` and ``force``
        are forwarded to the worker supervisor. The runtime is marked STOPPED
        once the calls above return.
        """
        if not self._started:
            return
        self._state = LifecycleState.STOPPING
        self.workers.stop(timeout=timeout, force=force)
        self.application_http.stop()
        if stop_control_plane:
            self.control_plane.stop()
        self._started = False
        self._state = LifecycleState.STOPPED

    def status(self) -> dict[str, object]:
        """Return the control-plane snapshot of this runtime after refreshing state."""
        self._refresh_state()
        return self.control_plane.snapshot(self)

    def current_health(self) -> HealthPayload:
        """Build the health payload from the current state and worker healths."""
        self._refresh_state()
        return self.health.payload(self._state, self.workers.healths())

    async def health_status(self) -> HealthPayload:
        """Async wrapper around ``current_health``."""
        return self.current_health()

    async def start_runtime(self, _request: ControlActionRequest) -> dict[str, object] | str:
        """Control-plane action: start the runtime and wait for it to settle.

        Returns ``"runtime already running"`` when started. Otherwise returns an
        action-detail dict; its ``timed_out`` flag is ``True`` when a stop is
        still in progress, or when the runtime did not reach IDLE/BUSY within
        ``ACTION_TIMEOUT_SECONDS``.
        """
        self._refresh_state()
        # Check for an in-progress stop first: ``stop_runtime`` leaves
        # ``_started`` True while workers are still draining, so testing
        # ``_started`` first would misreport a stopping runtime as running.
        if self._state == LifecycleState.STOPPING:
            return self._action_detail("runtime stop is still in progress", timed_out=True)
        if self._started:
            return "runtime already running"
        self.start(start_control_plane=False)
        # Poll without blocking the event loop this coroutine runs on.
        started = await self._wait_for_state_async(
            {LifecycleState.IDLE, LifecycleState.BUSY},
            timeout=self.ACTION_TIMEOUT_SECONDS,
        )
        if started:
            return self._action_detail("runtime started", timed_out=False)
        return self._action_detail("runtime start is still in progress", timed_out=True)

    async def stop_runtime(self, request: ControlActionRequest) -> dict[str, object] | str:
        """Control-plane action: stop workers and the application HTTP service.

        ``request.wait`` / ``request.timeout`` / ``request.force`` default to
        ``True`` / ``ACTION_TIMEOUT_SECONDS`` / ``False`` when ``None``. The
        control plane itself is not stopped here.
        """
        self._refresh_state()
        if not self._started:
            if self._state == LifecycleState.STOPPING:
                return self._action_detail("runtime stop is still in progress", timed_out=True)
            return "runtime already stopped"
        wait = True if request.wait is None else request.wait
        timeout = self.ACTION_TIMEOUT_SECONDS if request.timeout is None else float(request.timeout)
        force = False if request.force is None else request.force
        self._state = LifecycleState.STOPPING
        try:
            self.workers.stop(timeout=timeout, force=force, wait=wait)
        except TimeoutError:
            # NOTE(review): on this path the application HTTP service is not
            # stopped, and nothing here stops it once the workers finish —
            # confirm this is intended.
            return self._action_detail("runtime stop is still in progress", timed_out=True)
        self.application_http.stop()
        self._refresh_state()
        if self._state == LifecycleState.STOPPED:
            self._started = False
            return self._action_detail("runtime stopped", timed_out=False)
        # Workers have not reported STOPPED yet (e.g. ``wait`` was False).
        return self._action_detail("runtime stop requested", timed_out=False)

    async def runtime_status(self, _request: ControlActionRequest) -> str:
        """Control-plane action: return the current lifecycle state's value."""
        self._refresh_state()
        return self._state.value

    async def trace_logs(self, trace_id: str, request: TraceQueryRequest) -> TraceLogView:
        """Read the log view for ``trace_id`` from the configured trace transport.

        Raises:
            RuntimeError: No trace log reader can be built for the transport.
            KeyError: The reader has no trace with this id.
        """
        reader = build_trace_log_reader(self.traces.transport)
        if reader is None:
            raise RuntimeError("trace log reader is not configured")
        trace_view = reader.read_trace(trace_id, request.levels, request.ancestor_depth)
        if trace_view is None:
            raise KeyError(trace_id)
        return trace_view

    def _register_core_services(self) -> None:
        """Register the runtime's collaborators in the service container (once)."""
        if self._core_registered:
            return
        self.services.register("configuration", self.configuration)
        self.services.register("traces", self.traces)
        self.services.register("health", self.health)
        self.services.register("logs", self.logs)
        self.services.register("workers", self.workers)
        self.services.register("control_plane", self.control_plane)
        self.services.register("application_http", self.application_http)
        self._core_registered = True

    def _register_health_contributors(self) -> None:
        """Hand module health contributors to the health registry (once)."""
        if self._health_registered:
            return
        for contributor in self.registry.health_contributors:
            self.health.register(contributor)
        self._health_registered = True

    def _register_workers(self) -> None:
        """Hand module workers to the worker supervisor (once)."""
        if self._workers_registered:
            return
        for worker in self.registry.workers:
            self.workers.register(worker)
        self._workers_registered = True

    def _register_application_http_routes(self) -> None:
        """Apply pending HTTP route registrars, then drop them so none runs twice."""
        for registrar in self.registry.http_route_registrars:
            self.application_http.register_routes(registrar)
        self.registry.http_route_registrars.clear()

    def _refresh_state(self) -> None:
        """Reconcile ``_state`` (and possibly ``_started``) with the workers' lifecycle.

        - STOPPING + workers STOPPED: the stop is complete; clear ``_started``.
        - Not started: only a STOPPED worker lifecycle is adopted; otherwise
          the current state is kept.
        - Started: the workers' lifecycle becomes the runtime state (this also
          replaces a STOPPING state whose workers are not yet STOPPED).
        """
        lifecycle = self.workers.lifecycle_state()
        if self._state == LifecycleState.STOPPING:
            if lifecycle == LifecycleState.STOPPED:
                self._started = False
                self._state = LifecycleState.STOPPED
                return
        if not self._started:
            if lifecycle == LifecycleState.STOPPED:
                self._state = LifecycleState.STOPPED
            return
        self._state = lifecycle

    def _wait_for_state(self, target_states: set[LifecycleState], *, timeout: float) -> bool:
        """Block (with ``time.sleep``) until the state is in ``target_states``.

        Returns whether a target state was reached within ``timeout`` seconds.
        Do not call from a coroutine; use ``_wait_for_state_async`` there.
        """
        deadline = monotonic() + timeout
        while monotonic() < deadline:
            self._refresh_state()
            if self._state in target_states:
                return True
            sleep(self.ACTION_POLL_INTERVAL_SECONDS)
        self._refresh_state()
        return self._state in target_states

    async def _wait_for_state_async(self, target_states: set[LifecycleState], *, timeout: float) -> bool:
        """Async counterpart of ``_wait_for_state`` that yields between polls.

        Returns whether a target state was reached within ``timeout`` seconds.
        """
        deadline = monotonic() + timeout
        while monotonic() < deadline:
            self._refresh_state()
            if self._state in target_states:
                return True
            await asyncio.sleep(self.ACTION_POLL_INTERVAL_SECONDS)
        self._refresh_state()
        return self._state in target_states

    def _action_detail(self, message: str, *, timed_out: bool) -> dict[str, object]:
        """Build a control-action response with the refreshed state and worker snapshot."""
        self._refresh_state()
        return {
            "message": message,
            "state": self._state.value,
            "timed_out": timed_out,
            "workers": self.workers.snapshot(),
        }