from __future__ import annotations

from time import monotonic, sleep

from app_runtime.config.providers import FileConfigProvider
from app_runtime.contracts.application import ApplicationModule
from app_runtime.contracts.trace import TraceLogView
from app_runtime.control.base import ControlActionRequest, TraceQueryRequest
from app_runtime.control.service import ControlPlaneService
from app_runtime.core.configuration import ConfigurationManager
from app_runtime.core.registration import ModuleRegistry
from app_runtime.core.service_container import ServiceContainer
from app_runtime.core.types import HealthPayload, LifecycleState
from app_runtime.health.registry import HealthRegistry
from app_runtime.http.service import ApplicationHttpService
from app_runtime.logging.manager import LogManager
from app_runtime.tracing.reader import build_trace_log_reader
from app_runtime.tracing.service import TraceService
from app_runtime.workers.supervisor import WorkerSupervisor


class RuntimeManager:
    """Wire the runtime's core services together and drive their lifecycle.

    The manager owns (or is handed) the configuration, service container,
    tracing, health, logging, worker supervisor, control plane and
    application HTTP services, registers them in the service container, and
    exposes synchronous ``start``/``stop`` plus the async handlers used for
    control actions (``start_runtime``, ``stop_runtime``, ...).
    """

    # Upper bound, in seconds, for waiting on a control action to settle.
    ACTION_TIMEOUT_SECONDS = 10.0
    # Delay, in seconds, between two state polls in ``_wait_for_state``.
    ACTION_POLL_INTERVAL_SECONDS = 0.05

    def __init__(
        self,
        configuration: ConfigurationManager | None = None,
        services: ServiceContainer | None = None,
        traces: TraceService | None = None,
        health: HealthRegistry | None = None,
        logs: LogManager | None = None,
        workers: WorkerSupervisor | None = None,
        control_plane: ControlPlaneService | None = None,
        application_http: ApplicationHttpService | None = None,
    ) -> None:
        """Create the manager, building a default for every omitted service.

        Each argument is used as given unless it is ``None``. The check is an
        explicit ``is None`` so that a caller-supplied object that happens to
        be falsy (e.g. an empty container) is not silently replaced.
        """
        self.configuration = ConfigurationManager() if configuration is None else configuration
        self.services = ServiceContainer() if services is None else services
        self.traces = TraceService() if traces is None else traces
        self.health = HealthRegistry() if health is None else health
        self.logs = LogManager() if logs is None else logs
        self.workers = WorkerSupervisor() if workers is None else workers
        self.control_plane = ControlPlaneService() if control_plane is None else control_plane
        self.application_http = (
            ApplicationHttpService() if application_http is None else application_http
        )
        self.registry = ModuleRegistry(self.services)
        self._started = False
        self._state = LifecycleState.IDLE
        self._core_registered = False
        self._workers_registered = False
        # Contributors already handed to ``self.health``; strong references
        # are kept so identity comparison stays valid across restarts.
        self._registered_health_contributors: list[object] = []
        self._register_core_services()

    def register_module(self, module: ApplicationModule) -> None:
        """Record ``module`` by name and let it register itself in the registry."""
        self.registry.register_module(module.name)
        module.register(self.registry)

    def add_config_file(self, path: str) -> FileConfigProvider:
        """Add a file-backed configuration provider for ``path`` and return it."""
        provider = FileConfigProvider(path)
        self.configuration.add_provider(provider)
        return provider

    def start(self, *, start_control_plane: bool = True) -> None:
        """Start the runtime; a no-op when it is already started.

        Loads configuration, applies it to logging, registers health
        contributors and workers, starts the workers, optionally starts the
        control plane, then registers HTTP routes and starts the application
        HTTP service.

        Args:
            start_control_plane: When false the control plane is left
                untouched (used when the start is itself triggered through
                the control plane).
        """
        if self._started:
            return
        self._state = LifecycleState.STARTING
        config = self.configuration.load()
        self.logs.apply_config(config)
        self._register_health_contributors()
        self._register_workers()
        self.workers.start()
        if start_control_plane:
            self.control_plane.start(self)
        self._register_application_http_routes()
        self.application_http.start(self)
        self._started = True
        self._refresh_state()

    def stop(self, timeout: float = 30.0, force: bool = False, stop_control_plane: bool = True) -> None:
        """Stop the runtime; a no-op when it is not started.

        Args:
            timeout: Passed to the worker supervisor's ``stop``.
            force: Passed to the worker supervisor's ``stop``.
            stop_control_plane: When false the control plane keeps running.
        """
        if not self._started:
            return
        self._state = LifecycleState.STOPPING
        self.workers.stop(timeout=timeout, force=force)
        self.application_http.stop()
        if stop_control_plane:
            self.control_plane.stop()
        self._started = False
        self._state = LifecycleState.STOPPED

    def status(self) -> dict[str, object]:
        """Refresh the lifecycle state and return the control-plane snapshot."""
        self._refresh_state()
        return self.control_plane.snapshot(self)

    def current_health(self) -> HealthPayload:
        """Refresh the lifecycle state and build the current health payload."""
        self._refresh_state()
        return self.health.payload(self._state, self.workers.healths())

    async def health_status(self) -> HealthPayload:
        """Async wrapper around ``current_health``."""
        return self.current_health()

    async def start_runtime(self, _request: ControlActionRequest) -> dict[str, object] | str:
        """Control action: start the runtime without touching the control plane.

        Returns:
            The string ``"runtime already running"`` when already started,
            otherwise an action-detail dict (see ``_action_detail``) whose
            ``timed_out`` flag tells whether the runtime reached ``IDLE`` or
            ``BUSY`` within ``ACTION_TIMEOUT_SECONDS``.

        NOTE(review): this coroutine calls the synchronous ``start`` and
        ``_wait_for_state`` (which uses ``time.sleep``), so the calling thread
        is blocked while it waits - confirm that is acceptable for the loop
        that runs control actions.
        """
        self._refresh_state()
        # Check for an in-flight stop first: ``_started`` stays true until the
        # workers report STOPPED, so testing ``_started`` first would answer
        # "already running" while the runtime is actually shutting down.
        if self._state == LifecycleState.STOPPING:
            return self._action_detail("runtime stop is still in progress", timed_out=True)
        if self._started:
            return "runtime already running"
        self.start(start_control_plane=False)
        started = self._wait_for_state(
            {LifecycleState.IDLE, LifecycleState.BUSY},
            timeout=self.ACTION_TIMEOUT_SECONDS,
        )
        if started:
            return self._action_detail("runtime started", timed_out=False)
        return self._action_detail("runtime start is still in progress", timed_out=True)

    async def stop_runtime(self, request: ControlActionRequest) -> dict[str, object] | str:
        """Control action: stop workers and the application HTTP service.

        ``request.wait``, ``request.timeout`` and ``request.force`` default to
        ``True``, ``ACTION_TIMEOUT_SECONDS`` and ``False`` when ``None``. The
        control plane itself is not stopped here.

        Returns:
            The string ``"runtime already stopped"`` when not started,
            otherwise an action-detail dict (see ``_action_detail``).
        """
        self._refresh_state()
        if not self._started:
            if self._state == LifecycleState.STOPPING:
                return self._action_detail("runtime stop is still in progress", timed_out=True)
            return "runtime already stopped"
        wait = True if request.wait is None else request.wait
        timeout = self.ACTION_TIMEOUT_SECONDS if request.timeout is None else float(request.timeout)
        force = False if request.force is None else request.force
        self._state = LifecycleState.STOPPING
        try:
            self.workers.stop(timeout=timeout, force=force, wait=wait)
        except TimeoutError:
            # Workers did not stop in time; report progress instead of failing.
            return self._action_detail("runtime stop is still in progress", timed_out=True)
        self.application_http.stop()
        self._refresh_state()
        if self._state == LifecycleState.STOPPED:
            self._started = False
            return self._action_detail("runtime stopped", timed_out=False)
        return self._action_detail("runtime stop requested", timed_out=False)

    async def runtime_status(self, _request: ControlActionRequest) -> str:
        """Control action: return the refreshed lifecycle state's value."""
        self._refresh_state()
        return self._state.value

    async def trace_logs(self, trace_id: str, request: TraceQueryRequest) -> TraceLogView:
        """Read the log view of ``trace_id`` through the configured trace reader.

        Raises:
            RuntimeError: No reader could be built for the trace transport.
            KeyError: The reader returned nothing for ``trace_id``.
        """
        reader = build_trace_log_reader(self.traces.transport)
        if reader is None:
            raise RuntimeError("trace log reader is not configured")
        trace_view = reader.read_trace(trace_id, request.levels, request.ancestor_depth)
        if trace_view is None:
            raise KeyError(trace_id)
        return trace_view

    def _register_core_services(self) -> None:
        """Register the core services in the container, once per manager."""
        if self._core_registered:
            return
        self.services.register("configuration", self.configuration)
        self.services.register("traces", self.traces)
        self.services.register("health", self.health)
        self.services.register("logs", self.logs)
        self.services.register("workers", self.workers)
        self.services.register("control_plane", self.control_plane)
        self.services.register("application_http", self.application_http)
        self._core_registered = True

    def _register_health_contributors(self) -> None:
        """Register module health contributors that were not registered yet.

        ``start`` calls this on every (re)start, so contributors already
        handed to the health registry are skipped by identity to avoid
        registering the same object again after a stop/start cycle.
        """
        already_registered = self._registered_health_contributors
        for contributor in self.registry.health_contributors:
            if any(contributor is known for known in already_registered):
                continue
            self.health.register(contributor)
            already_registered.append(contributor)

    def _register_workers(self) -> None:
        """Hand the registry's workers to the supervisor, once per manager."""
        if self._workers_registered:
            return
        for worker in self.registry.workers:
            self.workers.register(worker)
        self._workers_registered = True

    def _register_application_http_routes(self) -> None:
        """Apply pending HTTP route registrars, then clear the pending list."""
        for registrar in self.registry.http_route_registrars:
            self.application_http.register_routes(registrar)
        # Cleared so a later start does not register the same routes again.
        self.registry.http_route_registrars.clear()

    def _refresh_state(self) -> None:
        """Synchronise ``_state`` with the worker supervisor's lifecycle state.

        While stopping, or while not started, the state only moves to
        ``STOPPED`` (once the workers report it); otherwise it mirrors the
        workers' lifecycle state.
        """
        lifecycle = self.workers.lifecycle_state()
        if self._state == LifecycleState.STOPPING:
            if lifecycle == LifecycleState.STOPPED:
                self._started = False
                self._state = LifecycleState.STOPPED
            return
        if not self._started:
            if lifecycle == LifecycleState.STOPPED:
                self._state = LifecycleState.STOPPED
            return
        self._state = lifecycle

    def _wait_for_state(self, target_states: set[LifecycleState], *, timeout: float) -> bool:
        """Poll until the state is in ``target_states`` or ``timeout`` elapses.

        Returns:
            ``True`` when a target state was observed; the state is checked
            one final time after the deadline before giving up.
        """
        deadline = monotonic() + timeout
        while monotonic() < deadline:
            self._refresh_state()
            if self._state in target_states:
                return True
            sleep(self.ACTION_POLL_INTERVAL_SECONDS)
        self._refresh_state()
        return self._state in target_states

    def _action_detail(self, message: str, *, timed_out: bool) -> dict[str, object]:
        """Build a control-action response with the refreshed state and workers."""
        self._refresh_state()
        return {
            "message": message,
            "state": self._state.value,
            "timed_out": timed_out,
            "workers": self.workers.snapshot(),
        }