diff --git a/src/app_runtime/control/http_channel.py b/src/app_runtime/control/http_channel.py index 9d5a374..1073279 100644 --- a/src/app_runtime/control/http_channel.py +++ b/src/app_runtime/control/http_channel.py @@ -44,10 +44,14 @@ class HttpControlChannel(ControlChannel): callback = callbacks.get(action) if callback is None: return JSONResponse(content={"status": "error", "detail": f"unsupported action: {action}"}, status_code=404) + action_timeout = max(float(self._timeout), 10.0) if action in {"start", "stop"} else float(self._timeout) try: - detail = await asyncio.wait_for(callback(), timeout=float(self._timeout)) + detail = await asyncio.wait_for(callback(), timeout=action_timeout) except asyncio.TimeoutError: - return JSONResponse(content={"status": "error", "detail": f"{action} handler timeout"}, status_code=504) + return JSONResponse( + content={"status": "accepted", "detail": f"{action} operation is still in progress"}, + status_code=202, + ) except Exception as exc: return JSONResponse(content={"status": "error", "detail": str(exc)}, status_code=500) return JSONResponse(content={"status": "ok", "detail": detail or f"{action} action accepted"}, status_code=200) diff --git a/src/app_runtime/core/runtime.py b/src/app_runtime/core/runtime.py index 9ccf539..521ec1f 100644 --- a/src/app_runtime/core/runtime.py +++ b/src/app_runtime/core/runtime.py @@ -1,5 +1,7 @@ from __future__ import annotations +from time import monotonic, sleep + from app_runtime.config.providers import FileConfigProvider from app_runtime.contracts.application import ApplicationModule from app_runtime.control.service import ControlPlaneService @@ -14,6 +16,9 @@ from app_runtime.workers.supervisor import WorkerSupervisor class RuntimeManager: + ACTION_TIMEOUT_SECONDS = 10.0 + ACTION_POLL_INTERVAL_SECONDS = 0.05 + def __init__( self, configuration: ConfigurationManager | None = None, @@ -65,16 +70,11 @@ class RuntimeManager: if not self._started: return self._state = LifecycleState.STOPPING - try: - self.workers.stop(timeout=timeout, force=force) - except TimeoutError: - # Do not leave runtime in STOPPING forever for control-plane callers. - self.workers.stop(force=True) - finally: - if stop_control_plane: - self.control_plane.stop() - self._started = False - self._state = LifecycleState.STOPPED + self.workers.stop(timeout=timeout, force=force) + if stop_control_plane: + self.control_plane.stop() + self._started = False + self._state = LifecycleState.STOPPED def status(self) -> dict[str, object]: self._refresh_state() @@ -87,19 +87,35 @@ class RuntimeManager: async def health_status(self) -> HealthPayload: return self.current_health() - async def start_runtime(self) -> str: + async def start_runtime(self) -> dict[str, object] | str: + self._refresh_state() if self._started: return "runtime already running" if self._state == LifecycleState.STOPPING: - return "runtime is stopping" - self.start(start_control_plane=False) - return "runtime started" + return self._action_detail("runtime stop is still in progress", timed_out=True) - async def stop_runtime(self) -> str: + self.start(start_control_plane=False) + started = self._wait_for_state({LifecycleState.IDLE, LifecycleState.BUSY}, timeout=self.ACTION_TIMEOUT_SECONDS) + if started: + return self._action_detail("runtime started", timed_out=False) + return self._action_detail("runtime start is still in progress", timed_out=True) + + async def stop_runtime(self) -> dict[str, object] | str: + self._refresh_state() if not self._started: + if self._state == LifecycleState.STOPPING: + return self._action_detail("runtime stop is still in progress", timed_out=True) return "runtime already stopped" - self.stop(timeout=0.0, stop_control_plane=False) - return "runtime stopped" + + self._state = LifecycleState.STOPPING + try: + self.workers.stop(timeout=self.ACTION_TIMEOUT_SECONDS, force=False) + except TimeoutError: + return self._action_detail("runtime stop is still in progress", timed_out=True) + + self._started = False + self._state = LifecycleState.STOPPED + return self._action_detail("runtime stopped", timed_out=False) async def runtime_status(self) -> str: self._refresh_state() @@ -128,6 +144,36 @@ class RuntimeManager: self._workers_registered = True def _refresh_state(self) -> None: - if not self._started or self._state == LifecycleState.STOPPING: + lifecycle = self.workers.lifecycle_state() + + if self._state == LifecycleState.STOPPING: + if lifecycle == LifecycleState.STOPPED: + self._started = False + self._state = LifecycleState.STOPPED return - self._state = self.workers.lifecycle_state() + + if not self._started: + if lifecycle == LifecycleState.STOPPED: + self._state = LifecycleState.STOPPED + return + + self._state = lifecycle + + def _wait_for_state(self, target_states: set[LifecycleState], *, timeout: float) -> bool: + deadline = monotonic() + timeout + while monotonic() < deadline: + self._refresh_state() + if self._state in target_states: + return True + sleep(self.ACTION_POLL_INTERVAL_SECONDS) + self._refresh_state() + return self._state in target_states + + def _action_detail(self, message: str, *, timed_out: bool) -> dict[str, object]: + self._refresh_state() + return { + "message": message, + "state": self._state.value, + "timed_out": timed_out, + "workers": self.workers.snapshot(), + }