Фикс рантайма
This commit is contained in:
@@ -44,10 +44,14 @@ class HttpControlChannel(ControlChannel):
|
|||||||
callback = callbacks.get(action)
|
callback = callbacks.get(action)
|
||||||
if callback is None:
|
if callback is None:
|
||||||
return JSONResponse(content={"status": "error", "detail": f"unsupported action: {action}"}, status_code=404)
|
return JSONResponse(content={"status": "error", "detail": f"unsupported action: {action}"}, status_code=404)
|
||||||
|
action_timeout = max(float(self._timeout), 10.0) if action in {"start", "stop"} else float(self._timeout)
|
||||||
try:
|
try:
|
||||||
detail = await asyncio.wait_for(callback(), timeout=float(self._timeout))
|
detail = await asyncio.wait_for(callback(), timeout=action_timeout)
|
||||||
except asyncio.TimeoutError:
|
except asyncio.TimeoutError:
|
||||||
return JSONResponse(content={"status": "error", "detail": f"{action} handler timeout"}, status_code=504)
|
return JSONResponse(
|
||||||
|
content={"status": "accepted", "detail": f"{action} operation is still in progress"},
|
||||||
|
status_code=202,
|
||||||
|
)
|
||||||
except Exception as exc:
|
except Exception as exc:
|
||||||
return JSONResponse(content={"status": "error", "detail": str(exc)}, status_code=500)
|
return JSONResponse(content={"status": "error", "detail": str(exc)}, status_code=500)
|
||||||
return JSONResponse(content={"status": "ok", "detail": detail or f"{action} action accepted"}, status_code=200)
|
return JSONResponse(content={"status": "ok", "detail": detail or f"{action} action accepted"}, status_code=200)
|
||||||
|
|||||||
@@ -1,5 +1,7 @@
|
|||||||
from __future__ import annotations
|
from __future__ import annotations
|
||||||
|
|
||||||
|
from time import monotonic, sleep
|
||||||
|
|
||||||
from app_runtime.config.providers import FileConfigProvider
|
from app_runtime.config.providers import FileConfigProvider
|
||||||
from app_runtime.contracts.application import ApplicationModule
|
from app_runtime.contracts.application import ApplicationModule
|
||||||
from app_runtime.control.service import ControlPlaneService
|
from app_runtime.control.service import ControlPlaneService
|
||||||
@@ -14,6 +16,9 @@ from app_runtime.workers.supervisor import WorkerSupervisor
|
|||||||
|
|
||||||
|
|
||||||
class RuntimeManager:
|
class RuntimeManager:
|
||||||
|
ACTION_TIMEOUT_SECONDS = 10.0
|
||||||
|
ACTION_POLL_INTERVAL_SECONDS = 0.05
|
||||||
|
|
||||||
def __init__(
|
def __init__(
|
||||||
self,
|
self,
|
||||||
configuration: ConfigurationManager | None = None,
|
configuration: ConfigurationManager | None = None,
|
||||||
@@ -65,12 +70,7 @@ class RuntimeManager:
|
|||||||
if not self._started:
|
if not self._started:
|
||||||
return
|
return
|
||||||
self._state = LifecycleState.STOPPING
|
self._state = LifecycleState.STOPPING
|
||||||
try:
|
|
||||||
self.workers.stop(timeout=timeout, force=force)
|
self.workers.stop(timeout=timeout, force=force)
|
||||||
except TimeoutError:
|
|
||||||
# Do not leave runtime in STOPPING forever for control-plane callers.
|
|
||||||
self.workers.stop(force=True)
|
|
||||||
finally:
|
|
||||||
if stop_control_plane:
|
if stop_control_plane:
|
||||||
self.control_plane.stop()
|
self.control_plane.stop()
|
||||||
self._started = False
|
self._started = False
|
||||||
@@ -87,19 +87,35 @@ class RuntimeManager:
|
|||||||
async def health_status(self) -> HealthPayload:
|
async def health_status(self) -> HealthPayload:
|
||||||
return self.current_health()
|
return self.current_health()
|
||||||
|
|
||||||
async def start_runtime(self) -> str:
|
async def start_runtime(self) -> dict[str, object] | str:
|
||||||
|
self._refresh_state()
|
||||||
if self._started:
|
if self._started:
|
||||||
return "runtime already running"
|
return "runtime already running"
|
||||||
if self._state == LifecycleState.STOPPING:
|
if self._state == LifecycleState.STOPPING:
|
||||||
return "runtime is stopping"
|
return self._action_detail("runtime stop is still in progress", timed_out=True)
|
||||||
self.start(start_control_plane=False)
|
|
||||||
return "runtime started"
|
|
||||||
|
|
||||||
async def stop_runtime(self) -> str:
|
self.start(start_control_plane=False)
|
||||||
|
started = self._wait_for_state({LifecycleState.IDLE, LifecycleState.BUSY}, timeout=self.ACTION_TIMEOUT_SECONDS)
|
||||||
|
if started:
|
||||||
|
return self._action_detail("runtime started", timed_out=False)
|
||||||
|
return self._action_detail("runtime start is still in progress", timed_out=True)
|
||||||
|
|
||||||
|
async def stop_runtime(self) -> dict[str, object] | str:
|
||||||
|
self._refresh_state()
|
||||||
if not self._started:
|
if not self._started:
|
||||||
|
if self._state == LifecycleState.STOPPING:
|
||||||
|
return self._action_detail("runtime stop is still in progress", timed_out=True)
|
||||||
return "runtime already stopped"
|
return "runtime already stopped"
|
||||||
self.stop(timeout=0.0, stop_control_plane=False)
|
|
||||||
return "runtime stopped"
|
self._state = LifecycleState.STOPPING
|
||||||
|
try:
|
||||||
|
self.workers.stop(timeout=self.ACTION_TIMEOUT_SECONDS, force=False)
|
||||||
|
except TimeoutError:
|
||||||
|
return self._action_detail("runtime stop is still in progress", timed_out=True)
|
||||||
|
|
||||||
|
self._started = False
|
||||||
|
self._state = LifecycleState.STOPPED
|
||||||
|
return self._action_detail("runtime stopped", timed_out=False)
|
||||||
|
|
||||||
async def runtime_status(self) -> str:
|
async def runtime_status(self) -> str:
|
||||||
self._refresh_state()
|
self._refresh_state()
|
||||||
@@ -128,6 +144,36 @@ class RuntimeManager:
|
|||||||
self._workers_registered = True
|
self._workers_registered = True
|
||||||
|
|
||||||
def _refresh_state(self) -> None:
|
def _refresh_state(self) -> None:
|
||||||
if not self._started or self._state == LifecycleState.STOPPING:
|
lifecycle = self.workers.lifecycle_state()
|
||||||
|
|
||||||
|
if self._state == LifecycleState.STOPPING:
|
||||||
|
if lifecycle == LifecycleState.STOPPED:
|
||||||
|
self._started = False
|
||||||
|
self._state = LifecycleState.STOPPED
|
||||||
return
|
return
|
||||||
self._state = self.workers.lifecycle_state()
|
|
||||||
|
if not self._started:
|
||||||
|
if lifecycle == LifecycleState.STOPPED:
|
||||||
|
self._state = LifecycleState.STOPPED
|
||||||
|
return
|
||||||
|
|
||||||
|
self._state = lifecycle
|
||||||
|
|
||||||
|
def _wait_for_state(self, target_states: set[LifecycleState], *, timeout: float) -> bool:
|
||||||
|
deadline = monotonic() + timeout
|
||||||
|
while monotonic() < deadline:
|
||||||
|
self._refresh_state()
|
||||||
|
if self._state in target_states:
|
||||||
|
return True
|
||||||
|
sleep(self.ACTION_POLL_INTERVAL_SECONDS)
|
||||||
|
self._refresh_state()
|
||||||
|
return self._state in target_states
|
||||||
|
|
||||||
|
def _action_detail(self, message: str, *, timed_out: bool) -> dict[str, object]:
|
||||||
|
self._refresh_state()
|
||||||
|
return {
|
||||||
|
"message": message,
|
||||||
|
"state": self._state.value,
|
||||||
|
"timed_out": timed_out,
|
||||||
|
"workers": self.workers.snapshot(),
|
||||||
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user