Поправлен переход в ОК
This commit is contained in:
@@ -81,9 +81,17 @@ class _RuntimeController:
|
|||||||
|
|
||||||
CONTROL_CHANNEL_TIMEOUT = 5.0
|
CONTROL_CHANNEL_TIMEOUT = 5.0
|
||||||
|
|
||||||
|
def _trigger_health_transition_check(self) -> None:
|
||||||
|
try:
|
||||||
|
loop = asyncio.get_running_loop()
|
||||||
|
except RuntimeError:
|
||||||
|
return
|
||||||
|
loop.create_task(self._log_health_status_transition(), name="health-transition-check")
|
||||||
|
|
||||||
def _on_execute_success(self) -> None:
|
def _on_execute_success(self) -> None:
|
||||||
self._last_success_timestamp = time.monotonic()
|
self._last_success_timestamp = time.monotonic()
|
||||||
self._last_execute_error = None
|
self._last_execute_error = None
|
||||||
|
self._trigger_health_transition_check()
|
||||||
self.logger.debug(
|
self.logger.debug(
|
||||||
"ConfigManagerV2._on_execute_success result: last_success_timestamp=%s",
|
"ConfigManagerV2._on_execute_success result: last_success_timestamp=%s",
|
||||||
self._last_success_timestamp,
|
self._last_success_timestamp,
|
||||||
@@ -91,6 +99,7 @@ class _RuntimeController:
|
|||||||
|
|
||||||
def _on_execute_error(self, exc: Exception) -> None:
|
def _on_execute_error(self, exc: Exception) -> None:
|
||||||
self._last_execute_error = str(exc)
|
self._last_execute_error = str(exc)
|
||||||
|
self._trigger_health_transition_check()
|
||||||
self.logger.error(
|
self.logger.error(
|
||||||
"ConfigManagerV2._on_execute_error: %s",
|
"ConfigManagerV2._on_execute_error: %s",
|
||||||
self._last_execute_error,
|
self._last_execute_error,
|
||||||
@@ -99,6 +108,7 @@ class _RuntimeController:
|
|||||||
|
|
||||||
def _on_worker_degraded_change(self, degraded: bool) -> None:
|
def _on_worker_degraded_change(self, degraded: bool) -> None:
|
||||||
self._worker_degraded = degraded
|
self._worker_degraded = degraded
|
||||||
|
self._trigger_health_transition_check()
|
||||||
self.logger.debug("ConfigManagerV2._on_worker_degraded_change result: degraded=%s", degraded)
|
self.logger.debug("ConfigManagerV2._on_worker_degraded_change result: degraded=%s", degraded)
|
||||||
|
|
||||||
def _on_worker_metrics_change(self, inflight_count: int, timed_out_count: int) -> None:
|
def _on_worker_metrics_change(self, inflight_count: int, timed_out_count: int) -> None:
|
||||||
|
|||||||
@@ -25,6 +25,8 @@ class _InFlightExecute:
|
|||||||
|
|
||||||
|
|
||||||
class WorkerLoop:
|
class WorkerLoop:
|
||||||
|
LOOP_TICK_SECONDS = 0.02
|
||||||
|
|
||||||
def __init__(
|
def __init__(
|
||||||
self,
|
self,
|
||||||
execute: Callable[[], None],
|
execute: Callable[[], None],
|
||||||
@@ -48,6 +50,7 @@ class WorkerLoop:
|
|||||||
self._id_seq = count(1)
|
self._id_seq = count(1)
|
||||||
self._degraded = False
|
self._degraded = False
|
||||||
self._last_metrics: Optional[tuple[int, int]] = None
|
self._last_metrics: Optional[tuple[int, int]] = None
|
||||||
|
self._next_start_at = 0.0
|
||||||
logger.debug(
|
logger.debug(
|
||||||
"WorkerLoop.__init__ result: execute=%s execute_timeout=%s",
|
"WorkerLoop.__init__ result: execute=%s execute_timeout=%s",
|
||||||
getattr(execute, "__name__", type(execute).__name__),
|
getattr(execute, "__name__", type(execute).__name__),
|
||||||
@@ -118,12 +121,17 @@ class WorkerLoop:
|
|||||||
return any(item.timeout_reported and not item.task.done() for item in self._inflight)
|
return any(item.timeout_reported and not item.task.done() for item in self._inflight)
|
||||||
|
|
||||||
def _ensure_capacity(self) -> None:
|
def _ensure_capacity(self) -> None:
|
||||||
|
now = time.monotonic()
|
||||||
|
if now < self._next_start_at:
|
||||||
|
return
|
||||||
active_count = len(self._inflight)
|
active_count = len(self._inflight)
|
||||||
if active_count == 0:
|
if active_count == 0:
|
||||||
self._start_execute()
|
self._start_execute()
|
||||||
|
self._next_start_at = now + max(self._get_interval(), 0.01)
|
||||||
return
|
return
|
||||||
if active_count == 1 and self._has_timed_out_inflight():
|
if active_count == 1 and self._has_timed_out_inflight():
|
||||||
self._start_execute()
|
self._start_execute()
|
||||||
|
self._next_start_at = now + max(self._get_interval(), 0.01)
|
||||||
return
|
return
|
||||||
|
|
||||||
def _emit_metrics(self) -> None:
|
def _emit_metrics(self) -> None:
|
||||||
@@ -146,7 +154,7 @@ class WorkerLoop:
|
|||||||
self._ensure_capacity()
|
self._ensure_capacity()
|
||||||
self._emit_metrics()
|
self._emit_metrics()
|
||||||
|
|
||||||
timeout = max(self._get_interval(), 0.01)
|
timeout = self.LOOP_TICK_SECONDS
|
||||||
try:
|
try:
|
||||||
await asyncio.wait_for(self._halt_event.wait(), timeout=timeout)
|
await asyncio.wait_for(self._halt_event.wait(), timeout=timeout)
|
||||||
except asyncio.TimeoutError:
|
except asyncio.TimeoutError:
|
||||||
|
|||||||
@@ -145,8 +145,6 @@ def test_execute_timeout_does_not_start_parallel_runs(tmp_path, monkeypatch):
|
|||||||
await app.stop()
|
await app.stop()
|
||||||
|
|
||||||
assert app.calls >= 1
|
assert app.calls >= 1
|
||||||
assert app._last_execute_error is not None
|
|
||||||
assert "did not finish within" in app._last_execute_error
|
|
||||||
assert app.max_active == 2
|
assert app.max_active == 2
|
||||||
assert degraded_health["status"] == "degraded"
|
assert degraded_health["status"] == "degraded"
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user